我是靠谱客的博主 哭泣夕阳,最近开发中收集的这篇文章主要介绍3.3 dubbo-消费方获取目标类的代理bean,觉得挺不错的,现在分享给大家,希望可以做个参考。

概述

1 获取目标类的代理bean

此时applicationContext中有两个bean,一个id=‘demo-consumer’,对应的实例是ApplicationConfig的实例,一个是id=‘demoService’ 对应的实例是ReferenceBean的实例;

DemoService demoService = (DemoService)context.getBean("demoService");
//  ReferenceBean的继承结构:
ReferenceBean<T> extends ReferenceConfig<T> implements FactoryBean, ApplicationContextAware, InitializingBean, DisposableBean

ReferenceBean 实现了FactoryBean,那么在使用getBean(“demoService”),在BeanFactory中获取Bean时,spring在执行到DefaultListableBeanFactory.getObjectForBeanInstance方法时,会判断当前容器中的实例是否是FactoryBean的实例,如果是,则调用重写的getObject()方法,判断的逻辑和代码如下图所示:
在这里插入图片描述在这里插入图片描述
因为ReferenceBean实现了FactoryBean,因此在执行getBean(“demoService”),方法时,最终执行了ReferenceBean.getObject();代码如下:

	//RefereneceBean的结构:
	public class ReferenceBean<T> extends ReferenceConfig<T> implements FactoryBean, ApplicationContextAware, InitializingBean, DisposableBean
	
	// ReferenceBean.getObject()
    @Override
    public Object getObject() {
        return get();
    }
    //ReferenceConfig.get()
        public synchronized T get() {
        checkAndUpdateSubConfigs();

        if (destroyed) {
            throw new IllegalStateException("The invoker of ReferenceConfig(" + url + ") has already destroyed!");
        }
        if (ref == null) {
            init();
        }
        return ref;
    }

主要是init方法。init的只要代码如下:

//ReferenceConfig.init()
		...
        ref = createProxy(map);
        //map的值是:{side=consumer, application=demo-consumer, register.ip=192.168.0.103, release=2.7.3, methods=sayHello, lazy=false, sticky=false, dubbo=2.0.2, pid=8564, check=false, interface=org.apache.dubbo.demo.DemoService, timestamp=1575614824918}
        String serviceKey = URL.buildKey(interfaceName, group, version);
        ApplicationModel.initConsumerModel(serviceKey, buildConsumerModel(serviceKey, attributes));
        initialized = true;
    //ReferenceConfig.createProxy()方法
       @SuppressWarnings({"unchecked", "rawtypes", "deprecation"})
    private T createProxy(Map<String, String> map) {
    	//判断是否是本地方法调用,判断依据,referenece是否配置了url,配置了,则为远程方法调用
        if (shouldJvmRefer(map)) {
            URL url = new URL(LOCAL_PROTOCOL, LOCALHOST_VALUE, 0, interfaceClass.getName()).addParameters(map);
            invoker = REF_PROTOCOL.refer(interfaceClass, url);
            if (logger.isInfoEnabled()) {
                logger.info("Using injvm service " + interfaceClass.getName());
            }
        } else {
        	// 开始构建服务提供方的URL信息,URL中的信息见下图
            urls.clear(); // reference retry init will add url to urls, lead to OOM
            if (url != null && url.length() > 0) { // user specified URL, could be peer-to-peer address, or register center's address.
                String[] us = SEMICOLON_SPLIT_PATTERN.split(url);
                if (us != null && us.length > 0) {
                    for (String u : us) {
                        URL url = URL.valueOf(u);
                        if (StringUtils.isEmpty(url.getPath())) {
                            url = url.setPath(interfaceName);
                        }
                        if (REGISTRY_PROTOCOL.equals(url.getProtocol())) {
                            urls.add(url.addParameterAndEncoded(REFER_KEY, StringUtils.toQueryString(map)));
                        } else {
                            urls.add(ClusterUtils.mergeUrl(url, map));
                        }
                    }
                }
            } else { // assemble URL from register center's configuration
                // if protocols not injvm checkRegistry
                if (!LOCAL_PROTOCOL.equalsIgnoreCase(getProtocol())){
                    checkRegistry();
                    List<URL> us = loadRegistries(false);
                    if (CollectionUtils.isNotEmpty(us)) {
                        for (URL u : us) {
                            URL monitorUrl = loadMonitor(u);
                            if (monitorUrl != null) {
                                map.put(MONITOR_KEY, URL.encode(monitorUrl.toFullString()));
                            }
                            urls.add(u.addParameterAndEncoded(REFER_KEY, StringUtils.toQueryString(map)));
                        }
                    }
                    if (urls.isEmpty()) {
                        throw new IllegalStateException("No such any registry to reference " + interfaceName + " on the consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion() + ", please config <dubbo:registry address="..." /> to your spring config.");
                    }
                }
            }
			/*
			//private static final Protocol REF_PROTOCOL = ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension();
			上述代码涉及到Dubbo中的对SPI扩展,可以查看官方文档:
			http://dubbo.apache.org/zh-cn/docs/source_code_guide/adaptive-extension.html
			依据官方文档中可知,REF_PROTOCOL实际是一个动态生成的类的实例,类名是Protocol$Adaptive,代码如下图。
			REF_PROTOCOL.refer(),默认情况下,调用dubboProtocol.refer();	
*/
            if (urls.size() == 1) {
            	// 如果只有一个URL则,构建Invoker
               invoker = REF_PROTOCOL.refer(interfaceClass, urls.get(0));
            } else {
            	//多个URL,聚合为一个Invoker
                List<Invoker<?>> invokers = new ArrayList<Invoker<?>>();
                URL registryURL = null;
                for (URL url : urls) {
                    invokers.add(REF_PROTOCOL.refer(interfaceClass, url));
                    if (REGISTRY_PROTOCOL.equals(url.getProtocol())) {
                        registryURL = url; // use last registry url
                    }
                }
                if (registryURL != null) { // registry url is available
                    // use RegistryAwareCluster only when register's CLUSTER is available
                    URL u = registryURL.addParameter(CLUSTER_KEY, RegistryAwareCluster.NAME);
                    // The invoker wrap relation would be: RegistryAwareClusterInvoker(StaticDirectory) -> FailoverClusterInvoker(RegistryDirectory, will execute route) -> Invoker
                    invoker = CLUSTER.join(new StaticDirectory(u, invokers));
                } else { // not a registry url, must be direct invoke.
                    invoker = CLUSTER.join(new StaticDirectory(invokers));
                }
            }
        }

        if (shouldCheck() && !invoker.isAvailable()) {
            throw new IllegalStateException("Failed to check the status of the service " + interfaceName + ". No provider available for the service " + (group == null ? "" : group + "/") + interfaceName + (version == null ? "" : ":" + version) + " from the url " + invoker.getUrl() + " to the consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion());
        }
        if (logger.isInfoEnabled()) {
            logger.info("Refer dubbo service " + interfaceClass.getName() + " from url " + invoker.getUrl());
        }
        /**
         * @since 2.7.0
         * ServiceData Store
         */
        MetadataReportService metadataReportService = null;
        if ((metadataReportService = getMetadataReportService()) != null) {
            URL consumerURL = new URL(CONSUMER_PROTOCOL, map.remove(REGISTER_IP_KEY), 0, map.get(INTERFACE_KEY), map);
            metadataReportService.publishConsumer(consumerURL);
        }
        // create service proxy
        return (T) PROXY_FACTORY.getProxy(invoker);
    }

REF_PROTOCOL生成的动态类代码

public class Protocol$Adaptive implements org.apache.dubbo.rpc.Protocol {
    //没有@Adaptive注解,直接抛出异常
	public void destroy() {
		throw new UnsupportedOperationException(
				"The method public abstract void org.apache.dubbo.rpc.Protocol.destroy() of interface org.apache.dubbo.rpc.Protocol is not adaptive method!");
	}

	public int getDefaultPort() {
		throw new UnsupportedOperationException(
				"The method public abstract int org.apache.dubbo.rpc.Protocol.getDefaultPort() of interface org.apache.dubbo.rpc.Protocol is not adaptive method!");
	}
	//有@Adaptive注解,编写通过URL获取具体扩展类的代码,有代码可知调用refer()方法实际调用extension.refer()方法;extension在默认情况下,是通过关键词dubbo进行配置的扩展类。
	@SuppressWarnings("unchecked")
	public Invoker refer(Class arg0, URL arg1) throws org.apache.dubbo.rpc.RpcException {
		if (arg1 == null)
			throw new IllegalArgumentException("url == null");
		URL url = arg1;
		String extName = (url.getProtocol() == null ? "dubbo" : url.getProtocol());
		if (extName == null)
			throw new IllegalStateException("Failed to get extension (org.apache.dubbo.rpc.Protocol) name from url ("
					+ url.toString() + ") use keys([protocol])");
		Protocol extension = (Protocol) ExtensionLoader.getExtensionLoader(Protocol.class).getExtension(extName);
		return extension.refer(arg0, arg1);
	}

	@SuppressWarnings("unchecked")
	public Exporter<?> export(Invoker arg0) throws RpcException {
		if (arg0 == null)
			throw new IllegalArgumentException("org.apache.dubbo.rpc.Invoker argument == null");
		if (arg0.getUrl() == null)
			throw new IllegalArgumentException("org.apache.dubbo.rpc.Invoker argument getUrl() == null");
		URL url = arg0.getUrl();
		String extName = (url.getProtocol() == null ? "dubbo" : url.getProtocol());
		if (extName == null)
			throw new IllegalStateException("Failed to get extension (org.apache.dubbo.rpc.Protocol) name from url ("
					+ url.toString() + ") use keys([protocol])");
		Protocol extension = (Protocol) ExtensionLoader.getExtensionLoader(Protocol.class).getExtension(extName);
		return extension.export(arg0);
	}
}

URL信息
其中string=
dubbo://127.0.0.1:20880/org.apache.dubbo.demo.DemoService?application=demo-consumer&check=false&interface=org.apache.dubbo.demo.DemoService&lazy=false&pid=8564&register.ip=192.168.0.103&remote.application=&side=consumer&sticky=false

DubboProtocol的refer继承自AbstractProtocol:代码如下:

	//AbstractProtocol.refer
    @Override
    public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
        return new AsyncToSyncInvoker<>(protocolBindingRefer(type, url));
    }

其中AsyncToSyncInvoker是一个代理类,其中包含了一个目标的Invoker;目标Invoker来源于protocolBindingRefer(type, url);其代码如下:

    @Override
    public <T> Invoker<T> protocolBindingRefer(Class<T> serviceType, URL url) throws RpcException {
        optimizeSerialization(url);

        // create rpc invoker.
       	
        DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, getClients(url), invokers);
        invokers.add(invoker);

        return invoker;
    }

因此,代理类中的目标Invoker就是DubboInvoker,context.getBean(“demoService”)返回的就是一个持有DubboInvoker的代理类AsyncToSyncInvoker的动态代理对象,其中该动态代理对象的InvocationHandler就是InvokerInvocationHandler。

2 获取ExchangeClient

ExchangeClient的继承结构如下:

public interface ExchangeClient extends Client, ExchangeChannel{}

创建DubboInvoker的代码如下:

        DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, getClients(url), invokers);

getClients(url)返回的是ExchageClient[]列表;代码如下:

	//DubboProtocol.getClients
    private ExchangeClient[] getClients(URL url) {
        // whether to share connection

        boolean useShareConnect = false;

        int connections = url.getParameter(CONNECTIONS_KEY, 0);
        List<ReferenceCountExchangeClient> shareClients = null;
        // if not configured, connection is shared, otherwise, one connection for one service
        if (connections == 0) {
            useShareConnect = true;

            /**
             * The xml configuration should have a higher priority than properties.
             */
            String shareConnectionsStr = url.getParameter(SHARE_CONNECTIONS_KEY, (String) null);
            connections = Integer.parseInt(StringUtils.isBlank(shareConnectionsStr) ? ConfigUtils.getProperty(SHARE_CONNECTIONS_KEY,
                    DEFAULT_SHARE_CONNECTIONS) : shareConnectionsStr);
            shareClients = getSharedClient(url, connections);
        }

        ExchangeClient[] clients = new ExchangeClient[connections];
        for (int i = 0; i < clients.length; i++) {
            if (useShareConnect) {
                clients[i] = shareClients.get(i);

            } else {
                clients[i] = initClient(url);
            }
        }

        return clients;
    }

上述代码主要判断是否可以共享连接数,如果在配置文件中配置了dubbo:service,那么一个service独立使用一个连接,如果没有配置,所有的service共享一个连接。后面代码两个功能,一是获取共享连接getSharedClient,一是初始化独立连接initClient

2.1 获取共享连接getSharedClient(url, connections);

	//dubboProtocol.getSharedClient
    private List<ReferenceCountExchangeClient> getSharedClient(URL url, int connectNum) {
        //xml配置的url,
        String key = url.getAddress();
        List<ReferenceCountExchangeClient> clients = referenceClientMap.get(key);
		//如果缓存中存在,核对是否能够使用
        if (checkClientCanUse(clients)) {
            batchClientRefIncr(clients);
            return clients;
        }

        locks.putIfAbsent(key, new Object());
        synchronized (locks.get(key)) {
            clients = referenceClientMap.get(key);
            // dubbo check
            if (checkClientCanUse(clients)) {
                batchClientRefIncr(clients);
                return clients;
            }

            // connectNum must be greater than or equal to 1
            connectNum = Math.max(connectNum, 1);

            // If the clients is empty, then the first initialization is
            if (CollectionUtils.isEmpty(clients)) {
                clients = buildReferenceCountExchangeClientList(url, connectNum);
                referenceClientMap.put(key, clients);

            } else {
                for (int i = 0; i < clients.size(); i++) {
                    ReferenceCountExchangeClient referenceCountExchangeClient = clients.get(i);
         // If there is a client in the list that is no longer available, create a new one to replace him.
                    if (referenceCountExchangeClient == null || referenceCountExchangeClient.isClosed()) {
                        clients.set(i, buildReferenceCountExchangeClient(url));
                        continue;
                    }

                    referenceCountExchangeClient.incrementAndGetCount();
                }
            }

            /**
             * I understand that the purpose of the remove operation here is to avoid the expired url key
             * always occupying this memory space.
             */
            locks.remove(key);

            return clients;
        }
    }

	//DubboProtocol.buildReferenceCountExchangeClientList
    private List<ReferenceCountExchangeClient> buildReferenceCountExchangeClientList(URL url, int connectNum) {
        List<ReferenceCountExchangeClient> clients = new ArrayList<>();

        for (int i = 0; i < connectNum; i++) {
            clients.add(buildReferenceCountExchangeClient(url));
        }

        return clients;
    }

buildReferenceCountExchangeClientList不过是循环调用buildReferenceCountExchangeClient的方法,说明共享连接数就是在List中存放多个相同的ReferenceCountExchangeClient。

   //DubboProtocol.buildReferenceCountExchangeClient
   private ReferenceCountExchangeClient buildReferenceCountExchangeClient(URL url) {
        ExchangeClient exchangeClient = initClient(url);
        return new ReferenceCountExchangeClient(exchangeClient);
  }

上面代码说明ReferenceCountExchangeClient是一个代理类,其中的目标类为initClient的返回类。init代码如下:

2.2 初始化独立连接initClient

	//DubboProtocol.initClient
    /**
     * Create new connection
     *
     * @param url
     */
    private ExchangeClient initClient(URL url) {

        // client type setting.
        String str = url.getParameter(CLIENT_KEY, url.getParameter(SERVER_KEY, DEFAULT_REMOTING_CLIENT));

        url = url.addParameter(CODEC_KEY, DubboCodec.NAME);
        // enable heartbeat by default
        url = url.addParameterIfAbsent(HEARTBEAT_KEY, String.valueOf(DEFAULT_HEARTBEAT));

        // BIO is not allowed since it has severe performance issue.
        if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)) {
            throw new RpcException("Unsupported client type: " + str + "," +
                    " supported client type is " + StringUtils.join(ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions(), " "));
        }

        ExchangeClient client;
        try {
            // connection should be lazy
            if (url.getParameter(LAZY_CONNECT_KEY, false)) {
                client = new LazyConnectExchangeClient(url, requestHandler);

            } else {
                client = Exchangers.connect(url, requestHandler);
            }

        } catch (RemotingException e) {
            throw new RpcException("Fail to create remoting client for service(" + url + "): " + e.getMessage(), e);
        }
        return client;
    }
    //Exchangers.connect
    public static ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {
        if (url == null) {
            throw new IllegalArgumentException("url == null");
        }
        if (handler == null) {
            throw new IllegalArgumentException("handler == null");
        }
        url = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange");
        return getExchanger(url).connect(url, handler);
        //getExchanger(url)是依据SPI返回不同的Exchanger.class的扩展类,默认是HeaderExchanger
    }
	//HeaderExchanger.connect
    @Override
    public ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {
        return new HeaderExchangeClient(Transporters.connect(url, new DecodeHandler(new HeaderExchangeHandler(handler))), true);
    }
   public static Client connect(URL url, ChannelHandler... handlers) throws RemotingException {
        if (url == null) {
            throw new IllegalArgumentException("url == null");
        }
        ChannelHandler handler;
        if (handlers == null || handlers.length == 0) {
            handler = new ChannelHandlerAdapter();
        } else if (handlers.length == 1) {
            handler = handlers[0];
        } else {
            handler = new ChannelHandlerDispatcher(handlers);
        }
        return getTransporter().connect(url, handler);
        //getTransporter()是依据SPI返回不同的Transporter.class的扩展类,默认是NettyTransporter
    }
   //NettyTransporter.connect
   @Override
    public Client connect(URL url, ChannelHandler listener) throws RemotingException {
        return new NettyClient(url, listener);
    }

NettyClient是Netty中相关的内容,可以查看相关文档。
总结:ExchangeClient[]中存放的是ReferenceCountExchangeClient,ReferenceCountExchangeClient是一个代理类,目标类是LazyConnectExchangeClient(xml中配置lazy=true)或者HeaderExchangeClient,其中HeaderExchangeClient中client存放NettyClient的实例。

最后

以上就是哭泣夕阳为你收集整理的3.3 dubbo-消费方获取目标类的代理bean的全部内容,希望文章能够帮你解决3.3 dubbo-消费方获取目标类的代理bean所遇到的程序开发问题。

如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(35)

评论列表共有 0 条评论

立即
投稿
返回
顶部