spring整合dubbo源代码总结

云程序员 2020年07月08日 39次浏览

Dubbo是一款高性能、轻量级的开源Java RPC框架,它提供了三大核心能力:面向接口的远程方法调用,智能容错和负载均衡,以及服务自动注册和发现。本文主要是通过Spring中Dubbo的加载过程来介绍下Dubbo的几个比较重要的类(虽然有些类已经废弃,但还是保留原文章内容,除了没空更新外,也为了更好理解Spring的初始化流程)

DubboApplicationContextInitializer(新版已变更)

DubboApplicationContextInitializer实现了ApplicationContextInitializer, Ordered接口,initialize方法执行overrideBeanDefinitions,它往applicationContext添加了OverrideBeanDefinitionRegistryPostProcessor、DubboConfigBeanDefinitionConflictProcessor(新版本已取消);getOrder返回的是HIGHEST_PRECEDENCE

public class DubboApplicationContextInitializer implements ApplicationContextInitializer, Ordered {

    @Override
    public void initialize(ConfigurableApplicationContext applicationContext) {
        overrideBeanDefinitions(applicationContext);
    }

    private void overrideBeanDefinitions(ConfigurableApplicationContext applicationContext) {
        // @since 2.7.8 OverrideBeanDefinitionRegistryPostProcessor has been removed
        // applicationContext.addBeanFactoryPostProcessor(new OverrideBeanDefinitionRegistryPostProcessor());
        // @since 2.7.5 DubboConfigBeanDefinitionConflictProcessor has been removed
        // @see {@link DubboConfigBeanDefinitionConflictApplicationListener}
        // applicationContext.addBeanFactoryPostProcessor(new DubboConfigBeanDefinitionConflictProcessor());
        // TODO Add some components in the future ( after 2.7.8 )
        //新版本已经被替代了
        //在EnableDubbo中有注册相应bean的代码
        applicationContext.addBeanFactoryPostProcessor(new OverrideBeanDefinitionRegistryPostProcessor());
        applicationContext.addBeanFactoryPostProcessor(new DubboConfigBeanDefinitionConflictProcessor());
    }

    @Override
    public int getOrder() {
        return HIGHEST_PRECEDENCE;
    }

}

DubboServiceRegistrationApplicationContextInitializer

dubbo服务注册相关功能实现

public class DubboServiceRegistrationApplicationContextInitializer
		implements ApplicationContextInitializer<ConfigurableApplicationContext> {

	@Override
	public void initialize(ConfigurableApplicationContext applicationContext) {
		// Set ApplicationContext into SpringCloudRegistryFactory before Dubbo Service
		// Register
		SpringCloudRegistryFactory.setApplicationContext(applicationContext);
	}

}

ServiceAnnotationBeanPostProcessor

ServiceAnnotationBeanPostProcessor的作用就是将Dubbo自实现的@Service标识的类注册到bean工厂去,由spring去生成bean

@Override
public void postProcessBeanDefinitionRegistry(BeanDefinitionRegistry registry) throws BeansException {

    Set<String> resolvedPackagesToScan = resolvePackagesToScan(packagesToScan);

    if (!CollectionUtils.isEmpty(resolvedPackagesToScan)) {
        registerServiceBeans(resolvedPackagesToScan, registry);
    } else {
        if (logger.isWarnEnabled()) {
            logger.warn("packagesToScan is empty , ServiceBean registry will be ignored!");
        }
    }

}

至于上述方法的执行是在PostProcessorRegistrationDelegate.invokeBeanFactoryPostProcessors这个方法中

public static void invokeBeanFactoryPostProcessors(
			ConfigurableListableBeanFactory beanFactory, List<BeanFactoryPostProcessor> beanFactoryPostProcessors) {

		// Invoke BeanDefinitionRegistryPostProcessors first, if any.
		Set<String> processedBeans = new HashSet<>();

		if (beanFactory instanceof BeanDefinitionRegistry) {
			BeanDefinitionRegistry registry = (BeanDefinitionRegistry) beanFactory;
			List<BeanFactoryPostProcessor> regularPostProcessors = new ArrayList<>();
			List<BeanDefinitionRegistryPostProcessor> registryProcessors = new ArrayList<>();

			for (BeanFactoryPostProcessor postProcessor : beanFactoryPostProcessors) {
				if (postProcessor instanceof BeanDefinitionRegistryPostProcessor) {
					BeanDefinitionRegistryPostProcessor registryProcessor =
							(BeanDefinitionRegistryPostProcessor) postProcessor;
					registryProcessor.postProcessBeanDefinitionRegistry(registry);
					registryProcessors.add(registryProcessor);
				}
				else {
					regularPostProcessors.add(postProcessor);
				}
			}

			// Do not initialize FactoryBeans here: We need to leave all regular beans
			// uninitialized to let the bean factory post-processors apply to them!
			// Separate between BeanDefinitionRegistryPostProcessors that implement
			// PriorityOrdered, Ordered, and the rest.
			List<BeanDefinitionRegistryPostProcessor> currentRegistryProcessors = new ArrayList<>();

			// First, invoke the BeanDefinitionRegistryPostProcessors that implement PriorityOrdered.
			String[] postProcessorNames =
					beanFactory.getBeanNamesForType(BeanDefinitionRegistryPostProcessor.class, true, false);
			for (String ppName : postProcessorNames) {
				if (beanFactory.isTypeMatch(ppName, PriorityOrdered.class)) {
					currentRegistryProcessors.add(beanFactory.getBean(ppName, BeanDefinitionRegistryPostProcessor.class));
					processedBeans.add(ppName);
				}
			}
			sortPostProcessors(currentRegistryProcessors, beanFactory);
			registryProcessors.addAll(currentRegistryProcessors);
			invokeBeanDefinitionRegistryPostProcessors(currentRegistryProcessors, registry);
			currentRegistryProcessors.clear();

			// Next, invoke the BeanDefinitionRegistryPostProcessors that implement Ordered.
			postProcessorNames = beanFactory.getBeanNamesForType(BeanDefinitionRegistryPostProcessor.class, true, false);
			for (String ppName : postProcessorNames) {
				if (!processedBeans.contains(ppName) && beanFactory.isTypeMatch(ppName, Ordered.class)) {
					currentRegistryProcessors.add(beanFactory.getBean(ppName, BeanDefinitionRegistryPostProcessor.class));
					processedBeans.add(ppName);
				}
			}
			sortPostProcessors(currentRegistryProcessors, beanFactory);
			registryProcessors.addAll(currentRegistryProcessors);
			invokeBeanDefinitionRegistryPostProcessors(currentRegistryProcessors, registry);
			currentRegistryProcessors.clear();

			// Finally, invoke all other BeanDefinitionRegistryPostProcessors until no further ones appear.
			boolean reiterate = true;
			while (reiterate) {
				reiterate = false;
				postProcessorNames = beanFactory.getBeanNamesForType(BeanDefinitionRegistryPostProcessor.class, true, false);
				for (String ppName : postProcessorNames) {
					if (!processedBeans.contains(ppName)) {
						currentRegistryProcessors.add(beanFactory.getBean(ppName, BeanDefinitionRegistryPostProcessor.class));
						processedBeans.add(ppName);
						reiterate = true;
					}
				}
				sortPostProcessors(currentRegistryProcessors, beanFactory);
				registryProcessors.addAll(currentRegistryProcessors);
				invokeBeanDefinitionRegistryPostProcessors(currentRegistryProcessors, registry);
				currentRegistryProcessors.clear();
			}

			// Now, invoke the postProcessBeanFactory callback of all processors handled so far.
			invokeBeanFactoryPostProcessors(registryProcessors, beanFactory);
			invokeBeanFactoryPostProcessors(regularPostProcessors, beanFactory);
		}

		else {
			// Invoke factory processors registered with the context instance.
			invokeBeanFactoryPostProcessors(beanFactoryPostProcessors, beanFactory);
		}

		// Do not initialize FactoryBeans here: We need to leave all regular beans
		// uninitialized to let the bean factory post-processors apply to them!
		String[] postProcessorNames =
				beanFactory.getBeanNamesForType(BeanFactoryPostProcessor.class, true, false);

		// Separate between BeanFactoryPostProcessors that implement PriorityOrdered,
		// Ordered, and the rest.
		List<BeanFactoryPostProcessor> priorityOrderedPostProcessors = new ArrayList<>();
		List<String> orderedPostProcessorNames = new ArrayList<>();
		List<String> nonOrderedPostProcessorNames = new ArrayList<>();
		for (String ppName : postProcessorNames) {
			if (processedBeans.contains(ppName)) {
				// skip - already processed in first phase above
			}
			else if (beanFactory.isTypeMatch(ppName, PriorityOrdered.class)) {
				priorityOrderedPostProcessors.add(beanFactory.getBean(ppName, BeanFactoryPostProcessor.class));
			}
			else if (beanFactory.isTypeMatch(ppName, Ordered.class)) {
				orderedPostProcessorNames.add(ppName);
			}
			else {
				nonOrderedPostProcessorNames.add(ppName);
			}
		}

		// First, invoke the BeanFactoryPostProcessors that implement PriorityOrdered.
		sortPostProcessors(priorityOrderedPostProcessors, beanFactory);
		invokeBeanFactoryPostProcessors(priorityOrderedPostProcessors, beanFactory);

		// Next, invoke the BeanFactoryPostProcessors that implement Ordered.
		List<BeanFactoryPostProcessor> orderedPostProcessors = new ArrayList<>(orderedPostProcessorNames.size());
		for (String postProcessorName : orderedPostProcessorNames) {
			orderedPostProcessors.add(beanFactory.getBean(postProcessorName, BeanFactoryPostProcessor.class));
		}
		sortPostProcessors(orderedPostProcessors, beanFactory);
		invokeBeanFactoryPostProcessors(orderedPostProcessors, beanFactory);

		// Finally, invoke all other BeanFactoryPostProcessors.
		List<BeanFactoryPostProcessor> nonOrderedPostProcessors = new ArrayList<>(nonOrderedPostProcessorNames.size());
		for (String postProcessorName : nonOrderedPostProcessorNames) {
			nonOrderedPostProcessors.add(beanFactory.getBean(postProcessorName, BeanFactoryPostProcessor.class));
		}
		invokeBeanFactoryPostProcessors(nonOrderedPostProcessors, beanFactory);

		// Clear cached merged bean definitions since the post-processors might have
		// modified the original metadata, e.g. replacing placeholders in values...
		beanFactory.clearMetadataCache();
	}
  1. 最先执行的是已经传递过来的PostProcess,例如DubboApplicationContextInitializer中注册的OverrideBeanDefinitionRegistryPostProcessor等

  2. 第二个执行的是ConfigurationClassPostProcessor,这个接口实现了PriorityOrdered.class

  3. 执行完ConfigurationClassPostProcessor的postProcessBeanDefinitionRegistry方法后,spring能扫描到的、配置类导入的任何类都以BeanDefinition的形式存在bean工厂了(除开使用后置处理器手动添加bean的情况),所以此时,@EnableDubbo注解导入的五个beanDefinition(因为还有一个带@Bean的方法)肯定在bean工厂中(注意: 此时bean并还没有被生成)

  4. 接下来就会执行除开ConfigurationClassPostProcessor并且实现了Ordered接口的后置处理器了

  5. 因为ServiceAnnotationBeanPostProcessor并没有实现Ordered接口,所以这个是最后执行的

ReferenceAnnotationBeanPostProcessor

ReferenceAnnotationBeanPostProcessor类,该类继承了InstantiationAwareBeanPostProcessor ,用来解析@Reference注解并完成依赖注入

实例化Bean后置处理器(继承BeanPostProcessor)

public class ReferenceAnnotationBeanPostProcessor extends AnnotationInjectedBeanPostProcessor implements
        ApplicationContextAware, ApplicationListener {
        
}
  1. postProcessBeforeInstantiation 在实例化目标对象之前执行,可以自定义实例化逻辑,如返回一个代理对象等。

  2. postProcessAfterInitialization Bean实例化完毕后执行的后处理操作,所有初始化逻辑、装配逻辑之前执行,如果返回false将阻止其他的InstantiationAwareBeanPostProcessor的postProcessAfterInstantiation的执行。

  3. postProcessPropertyValues 完成其他定制的一些依赖注入和依赖检查等,如AutowiredAnnotationBeanPostProcessor执行@Autowired注解注入,CommonAnnotationBeanPostProcessor执行@Resource等注解的注入,PersistenceAnnotationBeanPostProcessor执行@ PersistenceContext等JPA注解的注入,RequiredAnnotationBeanPostProcessor执行@ Required注解的检查等等。

dubbo也是采用了和@Autowired注入一样的原理,通过继承InstantiationAwareBeanPostProcessor 重写postProcessPropertyValues

Invoker

 Invoker是Dubbo中的实体域,也就是真实存在的。其他模型都向它靠拢或转换成它,它也就代表一个可执行体,可向它发起invoke调用。在服务提供方,Invoker用于调用服务提供类。在服务消费方,Invoker用于执行远程调用。
 

通过@Reference引用的时候会通过protocolBindingRefer进行客户端的创建,并返回代理对象

@Override
public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
        return new AsyncToSyncInvoker<>(protocolBindingRefer(type, url));
    }

protected abstract <T> Invoker<T> protocolBindingRefer(Class<T> type, URL url) throws RpcException;

通过@Service标记类的时候会使用DelegateProviderMetaDataInvoker进行包装,并进行发布,创建Server

 Invoker<?> invoker = PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(EXPORT_KEY, url.toFullString()));
 
 DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);

 Exporter<?> exporter = PROTOCOL.export(wrapperInvoker);

ProxyFactory

dubbo中ProxyFactory作为代理工厂,用于生产代理对象封装Invoker(reference),和使用invoker封装原对象(service)

@SPI("javassist")
public interface ProxyFactory {

    /**
     * create proxy.
     *
     * @param invoker
     * @return proxy
     */
    @Adaptive({PROXY_KEY})
    <T> T getProxy(Invoker<T> invoker) throws RpcException;

    /**
     * create proxy.
     *
     * @param invoker
     * @return proxy
     */
    @Adaptive({PROXY_KEY})
    <T> T getProxy(Invoker<T> invoker, boolean generic) throws RpcException;

    /**
     * create invoker.
     *
     * @param <T>
     * @param proxy
     * @param type
     * @param url
     * @return invoker
     */
    @Adaptive({PROXY_KEY})
    <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) throws RpcException;

}

ReferenceBean

ReferenceBean实现spring的FactoryBean接口,获取bean实例的步骤如下

  1. getObject方法会调用父类ReferenceConfig的get方法

  2. get是同步方法,检查配置,同时从已经导出的ref变量中获取值。ref为空,调用init方法初始化。

  3. init方法主要添加一些运行参数至map中,然后调用createProxy创建代理

  4. createProxy首先判断是否应该引用jvm的service provider,然后再调用REF_PROTOCOL.refer(interfaceClass, urls.get(0))方法通过DubboProtocol获取invoker,最后在使用代理工厂PROXY_FACTORY,创建invoker的代理。

  5. REF_PROTOCOL是通过Dubbo spi机制ExtensionLoader加载的AdaptiveProtocol,它会根据url获取不同的Protocol实例,

/**
 * ReferenceFactoryBean
 */
public class ReferenceBean<T> extends ReferenceConfig<T> implements FactoryBean, ApplicationContextAware, InitializingBean, DisposableBean {

    private static final long serialVersionUID = 213195494150089726L;

    private transient ApplicationContext applicationContext;

    public ReferenceBean() {
        super();
    }

    public ReferenceBean(Reference reference) {
        super(reference);
    }

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) {
        this.applicationContext = applicationContext;
        SpringExtensionFactory.addApplicationContext(applicationContext);
    }

    @Override
    public Object getObject() {
        return get();
    }

    @Override
    public Class<?> getObjectType() {
        return getInterfaceClass();
    }

    @Override
    @Parameter(excluded = true)
    public boolean isSingleton() {
        return true;
    }

    @Override
    @SuppressWarnings({"unchecked"})
    public void afterPropertiesSet() throws Exception {
        if (shouldInit()) {
            getObject();
        }
    }
    @Override
    public void destroy() {
        // do nothing
    }
}

ServiceBean

ServiceBean是Dubbo中很重要的一个类,每个暴露出去的服务都会生成一个ServiceBean

ServiceBean继承自ServiceConfig,ServiceConfig是服务暴露的具体实现类。另外ServiceBean还实现了InitializingBean,DisposableBean,ApplicationContextAware,ApplicationListener,BeanNameAware
ServiceBean实现了ApplicationListener接口,当ApplicationContext启动完成事件后调用其onApplicationEvent()方法如下

@Override
public void onApplicationEvent(ContextRefreshedEvent event) {
        if (!isExported() && !isUnexported()) {
            if (logger.isInfoEnabled()) {
                logger.info("The service ready on spring started. service: " + getInterface());
            }
            export();
        }
    }

会调用其父类的ServiceConfig的export方法完成服务暴露(后面版本是使用一个DubboBootstrap进行监听事件,从ConfigManager(一个类似多个Spring scope的缓存)里取出缓存,然后异步暴露)

Exporter<?> exporter = protocol.export(invoker);

其中DubboProtocol中的export会创建一个exporter,打开一个Server,并放进exporterMap里面,到时候再从Request中取出key,获取Exporter,然后进行业务操作

public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
        URL url = invoker.getUrl();

        // export service.
        String key = serviceKey(url);
        DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
        exporterMap.put(key, exporter);

        //export an stub service for dispatching event
        Boolean isStubSupportEvent = url.getParameter(Constants.STUB_EVENT_KEY, Constants.DEFAULT_STUB_EVENT);
        Boolean isCallbackservice = url.getParameter(Constants.IS_CALLBACK_SERVICE, false);
        if (isStubSupportEvent && !isCallbackservice) {
            String stubServiceMethods = url.getParameter(Constants.STUB_EVENT_METHODS_KEY);
            if (stubServiceMethods == null || stubServiceMethods.length() == 0) {
                if (logger.isWarnEnabled()) {
                    logger.warn(new IllegalStateException("consumer [" + url.getParameter(Constants.INTERFACE_KEY) +
                            "], has set stubproxy support event ,but no stub methods founded."));
                }
            } else {
                stubServiceMethodsMap.put(url.getServiceKey(), stubServiceMethods);
            }
        }

        openServer(url);

        return exporter;
    }