我是靠谱客的博主 独特水壶,最近开发中收集的这篇文章主要介绍Dubbo的Consumer启动过程(通过URL对象注册,订阅),觉得挺不错的,现在分享给大家,希望可以做个参考。

概述

接着上篇博客继续讲。

ReferenceConfig在生成URL的过程中,有这么一个步骤:

public abstract class AbstractInterfaceConfig extends AbstractMethodConfig {

    /**
     * 加载注册中心 URL 数组
     *
     * @param provider 是否是服务提供者
     * @return URL 数组
     */
    protected List<URL> loadRegistries(boolean provider) {

        ......
        // 解析地址,创建 Dubbo URL 数组。(数组大小可以为一),同时将map内的参数填充入每个url
        List<URL> urls = UrlUtils.parseURLs(address, map);
        // 循环 `url` ,设置 "registry" 和 "protocol" 属性。
        for (URL url : urls) {
            // 设置 `registry=${protocol}` 和 `protocol=registry` 到 URL
            url = url.addParameter(Constants.REGISTRY_KEY, url.getProtocol());
            // 首先需要使用url向zookeeper注册自身
            url = url.setProtocol(Constants.REGISTRY_PROTOCOL);
            // 添加到结果
            if ((provider && url.getParameter(Constants.REGISTER_KEY, true)) // 服务提供者 && 注册
                || (!provider && url.getParameter(Constants.SUBSCRIBE_KEY, true))) { // 服务消费者 && 订阅
                registryList.add(url);
            }
        }

    }

}

其用 Constants.REGISTRY_PROTOCOL 替换掉 URL上的protocol,将原始值放进RUL的prameter集合中,这样在下一步执行时,会用ExtensionLoader生成的Protocol$Adaptive来选择RegistryProtocol执行,这样就会先向相应的注册中心注册自己,然后订阅Providers,最后连接Provider。

public class ReferenceConfig<T> extends AbstractReferenceConfig {

    /**
     * 自适应 Protocol 实现对象
     */
    private static final Protocol refprotocol = ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension();

    /**
     * 创建 Service 代理对象
     *
     * @param map 集合
     * @return 代理对象
     */
    @SuppressWarnings({"unchecked", "rawtypes", "deprecation"})
    private T createProxy(Map<String, String> map) {
        ......
        // 单 `urls` 时,引用服务,返回 Invoker 对象
        if (urls.size() == 1) {
            // 引用服务
            invoker = refprotocol.refer(interfaceClass, urls.get(0));
        } else {
            // 循环 `urls` ,引用服务,返回 Invoker 对象
            List<Invoker<?>> invokers = new ArrayList<Invoker<?>>();
            URL registryURL = null;
            for (URL url : urls) {
                // 引用服务
                invokers.add(refprotocol.refer(interfaceClass, url));
                // 使用最后一个注册中心的 URL
                if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
                    registryURL = url; // use last registry url
                }
            }
            // 有注册中心
            if (registryURL != null) { // registry url is available
                // 对有注册中心的 Cluster 只用 AvailableCluster
                // use AvailableCluster only when register's cluster is available
                URL u = registryURL.addParameter(Constants.CLUSTER_KEY, AvailableCluster.NAME);
                // 将多个invoker组合成一个集群式的invker,以便负载均衡
                invoker = cluster.join(new StaticDirectory(u, invokers));
                // 无注册中心
            } else { // not a registry url
                // // 将多个invoker组合成一个集群式的invker,以便负载均衡
                invoker = cluster.join(new StaticDirectory(invokers));
            }
        }
    }
}

上段代码关键是 : refprotocol.refer(interfaceClass, urls.get(0)); refprotocol.refer(interfaceClass, url) ; 当只有一个URL对象时,连接生成一个Invoker,如果有多个URL,也就是说有多个Provider,那么需要将多个Invoker组合成一个Directory< T > 的实现对象,做集群容错以及负载均衡。

refprotocol 在执行refer() 方法时,会从提取URL上的protocol,将执行过程传递给相应的Protocol实现对象,我们这里就是RegistryProtocol对象了。

public class RegistryProtocol implements Protocol {

    @Override
    public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
        // 获得真实的注册中心的 URL, 还原protocol的设置。已经进入了此注册器,不需要再用protocol的设置了
        url = url.setProtocol(url.getParameter(Constants.REGISTRY_KEY, Constants.DEFAULT_REGISTRY)).removeParameter(Constants.REGISTRY_KEY);
        // 通过RegistryFactory$Adaptive 获取 ZookeeperRegistryFactory
        // 获得注册中心,第一次会连接zookeeper,创建 {@link ZooKeeperClient},生成{@link ZookeeperRegistry}
        Registry registry = registryFactory.getRegistry(url);
        // TODO 芋艿
        if (RegistryService.class.equals(type)) {
            return proxyFactory.getInvoker((T)registry, type, url);
        }

        // 获得服务引用配置参数集合
        // group="a,b" or group="*"
        Map<String, String> qs = StringUtils.parseQueryString(url.getParameterAndDecoded(Constants.REFER_KEY));
        String group = qs.get(Constants.GROUP_KEY);
        // 分组聚合,参见文档 http://dubbo.io/books/dubbo-user-book/demos/group-merger.html
        if (group != null && group.length() > 0) {
            if ((Constants.COMMA_SPLIT_PATTERN.split(group)).length > 1 || "*".equals(group)) {
                // 执行服务引用
                return doRefer(getMergeableCluster(), registry, type, url);
            }
        }
        // 执行服务引用
        return doRefer(cluster, registry, type, url);
    }

    /**
     * 执行服务引用,返回 Invoker 对象
     *
     * @param cluster  Cluster 对象
     * @param registry 注册中心对象  {@link FailbackRegistry}
     * @param type     服务接口类型
     * @param url      注册中心 URL
     * @param <T>      泛型
     * @return Invoker 对象
     */
    private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
        // 创建 RegistryDirectory 对象,并设置注册中心
        RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);
        directory.setRegistry(registry);
        directory.setProtocol(protocol);
        // 创建订阅 URL
        // all attributes of REFER_KEY
        Map<String, String> parameters = new HashMap<String, String>(directory.getUrl().getParameters()); // 服务引用配置集合
        URL subscribeUrl = new URL(Constants.CONSUMER_PROTOCOL, parameters.remove(Constants.REGISTER_IP_KEY), 0, type.getName(), parameters);
        // 向注册中心注册自己(服务消费者),FailbackRegistry
        if (!Constants.ANY_VALUE.equals(url.getServiceInterface()) && url.getParameter(Constants.REGISTER_KEY, true)) {
            registry.register(subscribeUrl.addParameters(Constants.CATEGORY_KEY, Constants.CONSUMERS_CATEGORY, Constants.CHECK_KEY, String.valueOf(false)));
        }
        // 向注册中心订阅服务提供者, 同时订阅providers,configurators,routers 三个类目
        directory.subscribe(subscribeUrl.addParameter(Constants.CATEGORY_KEY, Constants.PROVIDERS_CATEGORY + "," + Constants.CONFIGURATORS_CATEGORY + "," + Constants.ROUTERS_CATEGORY));
        // 创建 Invoker 对象,【TODO 8015】集群容错
        Invoker invoker = cluster.join(directory);
        // 向本地注册表,注册消费者
        ProviderConsumerRegTable.registerConsumer(invoker, url, subscribeUrl, directory);
        return invoker;
    }

}

在refer(Class< T > type, URL url) 和 doRefer(Cluster cluster, Registry registry, Class< T > type, URL url) 里完成Invoker的初始化,也就是完成消费者对象的初始化过程。

总结下上述两个函数的关键作用:

  1. 根据protocol配置地址,连接Zookeeper,生成连接客户端对象,Zookeeper Client
  2. 向注册中心注册自己作为一个Consumer
  3. 订阅Zookeeper下指定Path,获取此目录下的Providers的URL字符串
  4. 根据获取到的字符串,重解析成URL对象,然后根据Interface,Version,Group 择取匹配的URL,然后根据这些URL连接相应的Privoder
  5. 将连接Provider生成的Invokers生成Directory,指定相应的集群容错机制,负载均衡器,之后返回给Spring容器。

我们按顺序依次看,先看第一点,生成Zookeeper Client :

public class ZkclientZookeeperClient extends AbstractZookeeperClient<IZkChildListener> {

    public ZkclientZookeeperClient(URL url) {
        super(url);
        // 创建 client 对象,地址为:127.0.0.1:2181 形式
        client = new ZkClientWrapper(url.getBackupAddress(), 30000);
        // 添加连接监听器
        client.addListener(new IZkStateListener() {
            @Override
            public void handleStateChanged(KeeperState state) throws Exception {
                ZkclientZookeeperClient.this.state = state;
                if (state == KeeperState.Disconnected) {
                    stateChanged(StateListener.DISCONNECTED);
                } else if (state == KeeperState.SyncConnected) {
                    stateChanged(StateListener.CONNECTED);
                }
            }

            @Override
            public void handleNewSession() throws Exception {
                stateChanged(StateListener.RECONNECTED);
            }
        });
        // 启动 client
        client.start();
    }

}

在连接Zookeeper Server时,最终会调用上述构造函数,生成Client,同时为Zookeeper的事件注册监听器。

下面看第二点,向Zookeeper注册自身:

public class RegistryProtocol implements Protocol {

    private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
        ......
        // 向注册中心注册自己(服务消费者),FailbackRegistry
        if (!Constants.ANY_VALUE.equals(url.getServiceInterface()) && url.getParameter(Constants.REGISTER_KEY, true)) {
            registry.register(subscribeUrl.addParameters(Constants.CATEGORY_KEY, Constants.CONSUMERS_CATEGORY, Constants.CHECK_KEY, String.valueOf(false)));
        }

    }

}


public class ZookeeperRegistry extends FailbackRegistry {

    /**
     * 向Zookeeper注册URL
     *
     * @param url
     */
    @Override
    protected void doRegister(URL url) {
        try {
            zkClient.create(toUrlPath(url), url.getParameter(Constants.DYNAMIC_KEY, true));
        } catch (Throwable e) {
            throw new RpcException("Failed to register " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
        }
    }

    /**
     * 获得 URL 的路径
     *
     * Root + Service + Type + URL
     *
     * 被 {@link #doRegister(URL)} 和 {@link #doUnregister(URL)} 调用
     *
     * @param url URL
     * @return 路径
     */
    private String toUrlPath(URL url) {
        return toCategoryPath(url) + Constants.PATH_SEPARATOR + URL.encode(url.toFullString());
    }

}


public abstract class AbstractZookeeperClient<TargetChildListener> implements ZookeeperClient {

    @Override
    public void create(String path, boolean ephemeral) {
        // 循环创建父路径
        int i = path.lastIndexOf('/');
        if (i > 0) {
            String parentPath = path.substring(0, i);
            if (!checkExists(parentPath)) {
                // 创建持久化的父节点 ,/dubbo/***Service/providers, /dubbo/***Service/consumers
                create(parentPath, false);
            }
        }
        if (ephemeral) {
            // 创建临时节点
            createEphemeral(path);
        } else {
            // 创建持久节点
            createPersistent(path);
        }
    }  
}

向Zookeeper注册自身时,在父级目录下创建一个临时节点,节点的形式为 : Root + Service + Type + URL。

当Consumer与Zookeeper的长连接断开后,会移除此节点。

接下来看Consumer订阅Zookeeper里的Provider节点信息:

public class RegistryProtocol implements Protocol {

     /**
     * 执行服务引用,返回 Invoker 对象
     *
     * @param cluster  Cluster 对象
     * @param registry 注册中心对象  {@link FailbackRegistry}
     * @param type     服务接口类型
     * @param url      注册中心 URL
     * @param <T>      泛型
     * @return Invoker 对象
     */
    private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
        ......
        // 向注册中心订阅服务提供者, 同时订阅providers,configurators,routers 三个类目
        directory.subscribe(subscribeUrl.addParameter(Constants.CATEGORY_KEY, Constants.PROVIDERS_CATEGORY + "," + Constants.CONFIGURATORS_CATEGORY + "," + Constants.ROUTERS_CATEGORY));
        ......
    }

}


public class ZookeeperRegistry extends FailbackRegistry {

    @Override
    protected void doSubscribe(final URL url, final NotifyListener listener) {
        try {
            // 处理所有 Service 层的发起订阅,例如监控中心的订阅
            if (Constants.ANY_VALUE.equals(url.getServiceInterface())) {
                ......
                // 处理指定 Service 层的发起订阅,例如服务消费者的订阅
            } else {
                // 子节点数据数组
                List<URL> urls = new ArrayList<URL>();
                // 循环分类数组 , router, configurator, provider
                for (String path : toCategoriesPath(url)) {
                    // 获得 url 对应的监听器集合
                    ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url);
                    if (listeners == null) { // 不存在,进行创建
                        zkListeners.putIfAbsent(url, new ConcurrentHashMap<NotifyListener, ChildListener>());
                        listeners = zkListeners.get(url);
                    }
                    // 获得 ChildListener 对象
                    ChildListener zkListener = listeners.get(listener);
                    if (zkListener == null) { //  不存在子目录的监听器,进行创建 ChildListener 对象
                        // 订阅父级目录, 当有子节点发生变化时,触发此回调函数
                        listeners.putIfAbsent(listener, new ChildListener() {
                            @Override
                            public void childChanged(String parentPath, List<String> currentChilds) {
                                // 变更时,调用 `#notify(...)` 方法,回调 NotifyListener
                                ZookeeperRegistry.this.notify(url, listener, toUrlsWithEmpty(url, parentPath, currentChilds));
                            }
                        });
                        zkListener = listeners.get(listener);
                    }
                    // 创建 Type 节点。该节点为持久节点。
                    zkClient.create(path, false);
                    // 向 Zookeeper ,PATH 节点,发起订阅,返回此节点下的所有子元素 path : /根节点/接口全名/providers, 比如 : /dubbo/com.bob.service.CityService/providers
                    List<String> children = zkClient.addChildListener(path, zkListener);
                    // 添加到 `urls` 中
                    if (children != null) {
                        urls.addAll(toUrlsWithEmpty(url, path, children));
                    }
                }
                // 首次全量数据获取完成时,调用 `#notify(...)` 方法,回调 NotifyListener, 在这一步从连接Provider,实例化Invoker
                notify(url, listener, urls);
            }
        } catch (Throwable e) {
            throw new RpcException("Failed to subscribe " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
        }
    }

}

在订阅Providers目录时,会注册一个监听器Listener,当Provider的元素发生改变时,Zookeeper会通知Client,触发此回调函数,更新Consumer持有的Provider列表,更新Invoker集群。

第一次订阅时返回的字符串集合即使Provider的URL字符串集合,Dubbo解析这些字符串,生成URL,然后根据Interface,Group,Version匹配自己指定的Provider。

下面我们看这个回调函数内容:

public abstract class AbstractRegistry implements Registry {

    /**
     * 通知监听器,URL 变化结果。当Zookeeper里的节点发生变化时,Dubbo注册了监听器,到时候会回调此函数
     *
     * 数据流向 `urls` => {@link #notified} => {@link #properties} => {@link #file}
     *
     * @param url 消费者 URL
     * @param listener 监听器
     * @param urls 通知的 URL 变化结果(全量数据)
     */
    protected void  notify(URL url, NotifyListener listener, List<URL> urls) {
        if (url == null) {
            throw new IllegalArgumentException("notify url == null");
        }
        if (listener == null) {
            throw new IllegalArgumentException("notify listener == null");
        }
        if ((urls == null || urls.isEmpty())
                && !Constants.ANY_VALUE.equals(url.getServiceInterface())) {
            logger.warn("Ignore empty notify urls for subscribe url " + url);
            return;
        }
        if (logger.isInfoEnabled()) {
            logger.info("Notify urls for subscribe url " + url + ", urls: " + urls);
        }
        // 将 `urls` 按照 `url.parameter.category` 分类,添加到集合
        // 注意,特殊情况,使用 curator 连接 Zookeeper 时,若是服务消费者,连接断开,会出现 category=providers,configurations,routes
        Map<String, List<URL>> result = new HashMap<String, List<URL>>();
        for (URL u : urls) {
            // ############根据Interface,Group,Version验证Provider是否匹配Consumer的订阅
            if (UrlUtils.isMatch(url, u)) {  
                String category = u.getParameter(Constants.CATEGORY_KEY, Constants.DEFAULT_CATEGORY);
                List<URL> categoryList = result.get(category);
                if (categoryList == null) {
                    categoryList = new ArrayList<URL>();
                    result.put(category, categoryList);               // category >> 相应的urls
                }
                categoryList.add(u);
            }
        }
        if (result.size() == 0) {
            return;
        }
        // 获得消费者 URL 对应的在 `notified` 中,通知的 URL 变化结果(全量数据)
        Map<String, List<URL>> categoryNotified = notified.get(url);
        if (categoryNotified == null) {
            notified.putIfAbsent(url, new ConcurrentHashMap<String, List<URL>>());
            categoryNotified = notified.get(url);
        }
        // 处理通知的 URL 变化结果(全量数据)
        for (Map.Entry<String, List<URL>> entry : result.entrySet()) {
            String category = entry.getKey();
            List<URL> categoryList = entry.getValue();
            // 覆盖到 `notified`
            // 当某个分类的数据为空时,会依然有 urls 。其中 `urls[0].protocol = empty` ,通过这样的方式,处理所有服务提供者为空的情况。
            categoryNotified.put(category, categoryList);
            // 保存到文件
            saveProperties(url);
            // 通知监听器, 最终会调用RegistryDirectory#notify(List<URL>)
            listener.notify(categoryList);
        }
    }

}


public class UrlUtils {

    /**
     * Provider和Consumer的URL是否匹配
     *
     * @param consumerUrl
     * @param providerUrl
     * @return
     */
    public static boolean isMatch(URL consumerUrl, URL providerUrl) {
        String consumerInterface = consumerUrl.getServiceInterface();
        String providerInterface = providerUrl.getServiceInterface();
        // 如果consumer指定的接口为* 或者 consumer的接口和provider的接口名称一致
        if (!(Constants.ANY_VALUE.equals(consumerInterface) || StringUtils.isEquals(consumerInterface, providerInterface))) {
            return false;
        }
        // 匹配Category, consumer的categories 包含 provider的category
        if (!isMatchCategory(providerUrl.getParameter(Constants.CATEGORY_KEY, Constants.DEFAULT_CATEGORY),
            consumerUrl.getParameter(Constants.CATEGORY_KEY, Constants.DEFAULT_CATEGORY))) {
            return false;
        }
        // provider?enabled=true || consumer?enabled= *
        if (!providerUrl.getParameter(Constants.ENABLED_KEY, true)
            && !Constants.ANY_VALUE.equals(consumerUrl.getParameter(Constants.ENABLED_KEY))) {
            return false;
        }

        String consumerGroup = consumerUrl.getParameter(Constants.GROUP_KEY);
        String consumerVersion = consumerUrl.getParameter(Constants.VERSION_KEY);
        String consumerClassifier = consumerUrl.getParameter(Constants.CLASSIFIER_KEY, Constants.ANY_VALUE);

        String providerGroup = providerUrl.getParameter(Constants.GROUP_KEY);
        String providerVersion = providerUrl.getParameter(Constants.VERSION_KEY);
        String providerClassifier = providerUrl.getParameter(Constants.CLASSIFIER_KEY, Constants.ANY_VALUE);
        // group,version,classfier 均匹配
        return (Constants.ANY_VALUE.equals(consumerGroup) || StringUtils.isEquals(consumerGroup, providerGroup) || StringUtils.isContains(consumerGroup,
            providerGroup))
            && (Constants.ANY_VALUE.equals(consumerVersion) || StringUtils.isEquals(consumerVersion, providerVersion))
            && (consumerClassifier == null || Constants.ANY_VALUE.equals(consumerClassifier) || StringUtils.isEquals(consumerClassifier, providerClassifier));
    }

}

从Zookeeper处订阅到了Provider的URL信息,之后就吃将URL信息解析生成相应的Invoker对象了:

public class RegistryDirectory<T> extends AbstractDirectory<T> implements NotifyListener {

    private void refreshInvoker(List<URL> invokerUrls) {
        // 从zookeeper获取到的url已经没有合适的了,在订阅返回为空时,会手动生成一个 EMPTY_PROTOCOL 的 url
        if (invokerUrls != null && invokerUrls.size() == 1 && invokerUrls.get(0) != null
            && Constants.EMPTY_PROTOCOL.equals(invokerUrls.get(0).getProtocol())) {
            this.forbidden = true; // Forbid to access
            this.methodInvokerMap = null; // Set the method invoker map to null
            destroyAllInvokers(); // Close all invokers
        } else {
            this.forbidden = false; // Allow to access
            Map<String, Invoker<T>> oldUrlInvokerMap = this.urlInvokerMap; // local reference
            if (invokerUrls.isEmpty() && this.cachedInvokerUrls != null) {
                invokerUrls.addAll(this.cachedInvokerUrls);
            } else {
                this.cachedInvokerUrls = new HashSet<URL>();
                this.cachedInvokerUrls.addAll(invokerUrls);//Cached invoker urls, convenient for comparison
            }
            if (invokerUrls.isEmpty()) {
                return;
            }
            // 将url对象转换成Invoker对象,关键步骤
            Map<String, Invoker<T>> newUrlInvokerMap = toInvokers(invokerUrls);// Translate url list to Invoker map
            Map<String, List<Invoker<T>>> newMethodInvokerMap = toMethodInvokers(newUrlInvokerMap); // Change method name to map Invoker Map
            // state change
            // If the calculation is wrong, it is not processed.
            if (newUrlInvokerMap == null || newUrlInvokerMap.size() == 0) {
                logger.error(new IllegalStateException(
                    "urls to invokers error .invokerUrls.size :" + invokerUrls.size() + ", invoker.size :0. urls :" + invokerUrls.toString()));
                return;
            }
            this.methodInvokerMap = multiGroup ? toMergeMethodInvokerMap(newMethodInvokerMap) : newMethodInvokerMap;
            this.urlInvokerMap = newUrlInvokerMap;
            try {
                destroyUnusedInvokers(oldUrlInvokerMap, newUrlInvokerMap); // Close the unused Invoker
            } catch (Exception e) {
                logger.warn("destroyUnusedInvokers error. ", e);
            }
        }
    }

    /**
     * Turn urls into invokers, and if url has been refer, will not re-reference.
     * 将每个provider映射为相应的Invoker, Invoker内包含consumer和provider的ExchangeClient,也就是连接和信息转换对象
     *
     * @param urls
     * @return invokers
     */
    private Map<String, Invoker<T>> toInvokers(List<URL> urls) {
        Map<String, Invoker<T>> newUrlInvokerMap = new HashMap<String, Invoker<T>>();
        if (urls == null || urls.isEmpty()) {
            return newUrlInvokerMap;
        }
        Set<String> keys = new HashSet<String>();
        String queryProtocols = this.queryMap.get(Constants.PROTOCOL_KEY);
        for (URL providerUrl : urls) {
            // If protocol is configured at the reference side, only the matching protocol is selected
            if (queryProtocols != null && queryProtocols.length() > 0) {
                boolean accept = false;
                String[] acceptProtocols = queryProtocols.split(",");
                for (String acceptProtocol : acceptProtocols) {
                    // protocol协议必须符合
                    if (providerUrl.getProtocol().equals(acceptProtocol)) {
                        accept = true;
                        break;
                    }
                }
                if (!accept) {
                    continue;
                }
            }
            if (Constants.EMPTY_PROTOCOL.equals(providerUrl.getProtocol())) {
                continue;
            }
            // 如果Provider的protocol不是已知的
            if (!ExtensionLoader.getExtensionLoader(Protocol.class).hasExtension(providerUrl.getProtocol())) {
                logger.error(new IllegalStateException(
                    "Unsupported protocol " + providerUrl.getProtocol() + " in notified url: " + providerUrl + " from registry " + getUrl().getAddress()
                        + " to consumer " + NetUtils.getLocalHost()
                        + ", supported protocol: " + ExtensionLoader.getExtensionLoader(Protocol.class).getSupportedExtensions()));
                continue;
            }
            URL url = mergeUrl(providerUrl);

            String key = url.toFullString(); // The parameter urls are sorted
            if (keys.contains(key)) { // Repeated url
                continue;
            }
            // 缓存url相应的invoker,key 是url的fullString
            keys.add(key);
            // Cache key is url that does not merge with consumer side parameters, regardless of how the consumer combines parameters, if the server url
            // changes, then refer again
            Map<String, Invoker<T>> localUrlInvokerMap = this.urlInvokerMap; // local reference
            Invoker<T> invoker = localUrlInvokerMap == null ? null : localUrlInvokerMap.get(key);
            if (invoker == null) { // Not in the cache, refer again
                try {
                    // 判断URL上是否含有disable,enable的参数,也就是是否可用
                    boolean enabled = true;
                    if (url.hasParameter(Constants.DISABLED_KEY)) {
                        enabled = !url.getParameter(Constants.DISABLED_KEY, false);
                    } else {
                        enabled = url.getParameter(Constants.ENABLED_KEY, true);
                    }
                    if (enabled) {
                        // 分发式的Invoker, 便于负载均衡 , protocol.refer(serviceType, url) refer Provider,生成Invoker
                        // protocol  >> Protocol$Adaptive , 通过ExtesionLoader获取到的是 Wrapper包装,比如 ProtocolFilterWrapper
                        invoker = new InvokerDelegate<T>(protocol.refer(serviceType, url), url, providerUrl);
                    }
                } catch (Throwable t) {
                    logger.error("Failed to refer invoker for interface:" + serviceType + ",url:(" + url + ")" + t.getMessage(), t);
                }
                if (invoker != null) { // Put new invoker in cache
                    newUrlInvokerMap.put(key, invoker);
                }
            } else {
                newUrlInvokerMap.put(key, invoker);
            }
        }
        keys.clear();
        return newUrlInvokerMap;
    }

}

上文中最关键的就是这行代码 : invoker = new InvokerDelegate(protocol.refer(serviceType, url), url, providerUrl);

ReferenceConfig在初始化时,Dubbo将url上的protocol替换成了registry,然后解析时使用RegistryProtocol去想Zookeeper注册自己,然后订阅Provider。

在进入RegistryProtocol时,就将protocol还原成原始值,默认是dubbo,这样这会执行protocol.refer(serviceType, url)时,就会使用DubboRegistryrefer方法,连接Provider,生成Invoker。

Consumer连接Provider,生成Invoker的流程很长,很关键,我们在下次博客接着讲。

最后

以上就是独特水壶为你收集整理的Dubbo的Consumer启动过程(通过URL对象注册,订阅)的全部内容,希望文章能够帮你解决Dubbo的Consumer启动过程(通过URL对象注册,订阅)所遇到的程序开发问题。

如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(35)

评论列表共有 0 条评论

立即
投稿
返回
顶部