我是靠谱客的博主 潇洒大碗,最近开发中收集的这篇文章主要介绍【知识库】--Dubbo ReferenceBean获取-- 调用RegistryDirectory.notify -- 源码过程(254),觉得挺不错的,现在分享给大家,希望可以做个参考。

概述

回顾上一篇文章:获取ReferenceBean过程中SPI自适应到RegistryProtocol类后,调用refer方法


@SuppressWarnings("unchecked")
public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
    url = url.setProtocol(url.getParameter(Constants.REGISTRY_KEY, Constants.DEFAULT_REGISTRY)).removeParameter(Constants.REGISTRY_KEY);
    Registry registry = registryFactory.getRegistry(url);
    if (RegistryService.class.equals(type)) {
        return proxyFactory.getInvoker((T) registry, type, url);
    }

    // group="a,b" or group="*"
    Map<String, String> qs = StringUtils.parseQueryString(url.getParameterAndDecoded(Constants.REFER_KEY));
    String group = qs.get(Constants.GROUP_KEY);
    if (group != null && group.length() > 0) {
        if ((Constants.COMMA_SPLIT_PATTERN.split(group)).length > 1
                || "*".equals(group)) {
            return doRefer(getMergeableCluster(), registry, type, url);
        }
    }
    return doRefer(cluster, registry, type, url);
}

private Cluster getMergeableCluster() {
    return ExtensionLoader.getExtensionLoader(Cluster.class).getExtension("mergeable");
}

private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
    RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);
    directory.setRegistry(registry);
    directory.setProtocol(protocol);
    URL subscribeUrl = new URL(Constants.CONSUMER_PROTOCOL, NetUtils.getLocalHost(), 0, type.getName(), directory.getUrl().getParameters());
    if (!Constants.ANY_VALUE.equals(url.getServiceInterface())
            && url.getParameter(Constants.REGISTER_KEY, true)) {
        registry.register(subscribeUrl.addParameters(Constants.CATEGORY_KEY, Constants.CONSUMERS_CATEGORY,
                Constants.CHECK_KEY, String.valueOf(false)));
    }
    directory.subscribe(subscribeUrl.addParameter(Constants.CATEGORY_KEY,
            Constants.PROVIDERS_CATEGORY
                    + "," + Constants.CONFIGURATORS_CATEGORY
                    + "," + Constants.ROUTERS_CATEGORY));
    return cluster.join(directory);
}
1

自适应到:com.alibaba.dubbo.registry.integration.RegistryDirectory#subscribe方法

 directory.subscribe(subscribeUrl.addParameter(Constants.CATEGORY_KEY,
            Constants.PROVIDERS_CATEGORY
                    + "," + Constants.CONFIGURATORS_CATEGORY
                    + "," + Constants.ROUTERS_CATEGORY));

2 方法实现:

public void subscribe(URL url) {
    setConsumerUrl(url);
    registry.subscribe(url, this);
}

3 订阅实现:

com.alibaba.dubbo.registry.support.FailbackRegistry#subscribe

@Override
public void subscribe(URL url, NotifyListener listener) {
    super.subscribe(url, listener);
    removeFailedSubscribed(url, listener);
    try {
        // 向服务器端发送订阅请求
        doSubscribe(url, listener);
    } catch (Exception e) {
        Throwable t = e;

        List<URL> urls = getCacheUrls(url);
        if (urls != null && urls.size() > 0) {
            notify(url, listener, urls);
            logger.error("Failed to subscribe " + url + ", Using cached list: " + urls + " from cache file: " + getUrl().getParameter(Constants.FILE_KEY, System.getProperty("user.home") + "/dubbo-registry-" + url.getHost() + ".cache") + ", cause: " + t.getMessage(), t);
        } else {
            // 如果开启了启动时检测,则直接抛出异常
            boolean check = getUrl().getParameter(Constants.CHECK_KEY, true)
                    && url.getParameter(Constants.CHECK_KEY, true);
            boolean skipFailback = t instanceof SkipFailbackWrapperException;
            if (check || skipFailback) {
                if (skipFailback) {
                    t = t.getCause();
                }
                throw new IllegalStateException("Failed to subscribe " + url + ", cause: " + t.getMessage(), t);
            } else {
                logger.error("Failed to subscribe " + url + ", waiting for retry, cause: " + t.getMessage(), t);
            }
        }

        // 将失败的订阅请求记录到失败列表,定时重试
        addFailedSubscribed(url, listener);
    }
}

4 重点:

        // 向服务器端发送订阅请求
        doSubscribe(url, listener);

5 参考zookeeper实现:

com.alibaba.dubbo.registry.zookeeper.ZookeeperRegistry#doSubscribe

protected void doSubscribe(final URL url, final NotifyListener listener) {
    try {
        if (Constants.ANY_VALUE.equals(url.getServiceInterface())) {
            String root = toRootPath();
            ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url);
            if (listeners == null) {
                zkListeners.putIfAbsent(url, new ConcurrentHashMap<NotifyListener, ChildListener>());
                listeners = zkListeners.get(url);
            }
            ChildListener zkListener = listeners.get(listener);
            if (zkListener == null) {
                listeners.putIfAbsent(listener, new ChildListener() {
                    public void childChanged(String parentPath, List<String> currentChilds) {
                        for (String child : currentChilds) {
                            if (! anyServices.contains(child)) {
                                anyServices.add(child);
                                subscribe(url.setPath(child).addParameters(Constants.INTERFACE_KEY, child, 
                                        Constants.CHECK_KEY, String.valueOf(false)), listener);
                            }
                        }
                    }
                });
                zkListener = listeners.get(listener);
            }
            zkClient.create(root, false);
            List<String> services = zkClient.addChildListener(root, zkListener);
            if (services != null && services.size() > 0) {
                anyServices.addAll(services);
                for (String service : services) {
                    subscribe(url.setPath(service).addParameters(Constants.INTERFACE_KEY, service, 
                            Constants.CHECK_KEY, String.valueOf(false)), listener);
                }
            }
        } else {
            List<URL> urls = new ArrayList<URL>();
            for (String path : toCategoriesPath(url)) {
                ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url);
                if (listeners == null) {
                    zkListeners.putIfAbsent(url, new ConcurrentHashMap<NotifyListener, ChildListener>());
                    listeners = zkListeners.get(url);
                }
                ChildListener zkListener = listeners.get(listener);
                if (zkListener == null) {
                    listeners.putIfAbsent(listener, new ChildListener() {
                        public void childChanged(String parentPath, List<String> currentChilds) {
                           ZookeeperRegistry.this.notify(url, listener, toUrlsWithEmpty(url, parentPath, currentChilds));
                        }
                    });
                    zkListener = listeners.get(listener);
                }
                zkClient.create(path, false);
                List<String> children = zkClient.addChildListener(path, zkListener);
                if (children != null) {
                   urls.addAll(toUrlsWithEmpty(url, path, children));
                }
            }
            notify(url, listener, urls);
        }
    } catch (Throwable e) {
        throw new RpcException("Failed to subscribe " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
    }
}

6 上面除了注册回调函数外,最后调用了(回调函数,主动触发)

notify(url, listener, urls);
即:

com.alibaba.dubbo.registry.support.FailbackRegistry#notify

实现:

@Override
protected void notify(URL url, NotifyListener listener, List<URL> urls) {
    if (url == null) {
        throw new IllegalArgumentException("notify url == null");
    }
    if (listener == null) {
        throw new IllegalArgumentException("notify listener == null");
    }
    try {
        doNotify(url, listener, urls);
    } catch (Exception t) {
        // 将失败的通知请求记录到失败列表,定时重试
        Map<NotifyListener, List<URL>> listeners = failedNotified.get(url);
        if (listeners == null) {
            failedNotified.putIfAbsent(url, new ConcurrentHashMap<NotifyListener, List<URL>>());
            listeners = failedNotified.get(url);
        }
        listeners.put(listener, urls);
        logger.error("Failed to notify for subscribe " + url + ", waiting for retry, cause: " + t.getMessage(), t);
    }
}

跟进:

protected void doNotify(URL url, NotifyListener listener, List<URL> urls) {
    super.notify(url, listener, urls);
}
跟进到:com.alibaba.dubbo.registry.support.AbstractRegistry# notify
protected void notify(URL url, NotifyListener listener, List<URL> urls) {
    if (url == null) {
        throw new IllegalArgumentException("notify url == null");
    }
    if (listener == null) {
        throw new IllegalArgumentException("notify listener == null");
    }
    if ((urls == null || urls.size() == 0)
            && !Constants.ANY_VALUE.equals(url.getServiceInterface())) {
        logger.warn("Ignore empty notify urls for subscribe url " + url);
        return;
    }
    if (logger.isInfoEnabled()) {
        logger.info("Notify urls for subscribe url " + url + ", urls: " + urls);
    }
    Map<String, List<URL>> result = new HashMap<String, List<URL>>();
    for (URL u : urls) {
        if (UrlUtils.isMatch(url, u)) {
            String category = u.getParameter(Constants.CATEGORY_KEY, Constants.DEFAULT_CATEGORY);
            List<URL> categoryList = result.get(category);
            if (categoryList == null) {
                categoryList = new ArrayList<URL>();
                result.put(category, categoryList);
            }
            categoryList.add(u);
        }
    }
    if (result.size() == 0) {
        return;
    }
    Map<String, List<URL>> categoryNotified = notified.get(url);
    if (categoryNotified == null) {
        notified.putIfAbsent(url, new ConcurrentHashMap<String, List<URL>>());
        categoryNotified = notified.get(url);
    }
    for (Map.Entry<String, List<URL>> entry : result.entrySet()) {
        String category = entry.getKey();
        List<URL> categoryList = entry.getValue();
        categoryNotified.put(category, categoryList);
        saveProperties(url);
        listener.notify(categoryList);
    }
}


7 上面最终执行了 回调!

 listener.notify(categoryList);
也就是com.alibaba.dubbo.registry.integration.RegistryDirectory# notify

public synchronized void notify(List<URL> urls) {
    List<URL> invokerUrls = new ArrayList<URL>();
    List<URL> routerUrls = new ArrayList<URL>();
    List<URL> configuratorUrls = new ArrayList<URL>();

    for (URL url : urls) {
        String protocol = url.getProtocol();
        String category = url.getParameter(Constants.CATEGORY_KEY, Constants.DEFAULT_CATEGORY);
        if (Constants.ROUTERS_CATEGORY.equals(category) || Constants.ROUTE_PROTOCOL.equals(protocol)) {
            routerUrls.add(url);
        } else if (Constants.CONFIGURATORS_CATEGORY.equals(category) || Constants.OVERRIDE_PROTOCOL.equals(protocol)) {
            configuratorUrls.add(url);
        } else if (Constants.PROVIDERS_CATEGORY.equals(category)) {
            invokerUrls.add(url);
        } else {
            logger.warn("Unsupported category " + category + " in notified url: " + url + " from registry " + getUrl().getAddress() + " to consumer " + NetUtils.getLocalHost());
        }
    }

    // routers
    if (routerUrls.size() > 0) {
        List<Router> routers = toRouters(routerUrls);
        if (routers != null) { // null - do nothing
            setRouters(routers);
        }
    }

    // configurators
    if (configuratorUrls.size() > 0) {
        this.configurators = toConfigurators(configuratorUrls);
    }
    List<Configurator> localConfigurators = this.configurators; // local reference
    // 合并override参数
    this.overrideDirectoryUrl = directoryUrl;
    if (localConfigurators != null && localConfigurators.size() > 0) {
        for (Configurator configurator : localConfigurators) {
            this.overrideDirectoryUrl = configurator.configure(overrideDirectoryUrl);
        }
    }

    if (configuratorUrls.size() > 0) {
        logger.info("unconfigured directory url without provider params: " + this.directoryUrl
                + ", configured directory url without provider params: " + this.overrideDirectoryUrl);
    }

    // providers
    refreshInvoker(invokerUrls);
}

等待被动回调和本次主动回调最终都会调用RegistryDirectory.notify。

8 最后就会触发refreshInvoker
 /* 根据invokerURL列表转换为invoker列表。转换规则如下:
 * 1.如果url已经被转换为invoker,则不在重新引用,直接从缓存中获取,注意如果url中任何一个参数变更也会重新引用
 * 2.如果传入的invoker列表不为空,则表示最新的invoker列表
 * 3.如果传入的invokerUrl列表是空,则表示只是下发的override规则或route规则,需要重新交叉对比,决定是否需要重新引用。
 *
 * @param invokerUrls 传入的参数不能为null
 */
private void refreshInvoker(List<URL> invokerUrls) {
    if (invokerUrls != null && invokerUrls.size() == 1 && invokerUrls.get(0) != null && Constants.EMPTY_PROTOCOL.equals(invokerUrls.get(0).getProtocol())) {
        this.forbidden = true; // 禁止访问
        overrideDirectoryUrl = overrideDirectoryUrl.addParameters(Constants.INVOKER_INSIDE_INVOKERS_KEY, "", Constants.INVOKER_INSIDE_INVOKER_COUNT_KEY, "0");
        this.methodInvokerMap = null; // 置空列表
        destroyAllInvokers(); // 关闭所有Invoker
    } else {
        this.forbidden = false; // 允许访问
        Map<String, Invoker<T>> oldUrlInvokerMap = this.urlInvokerMap; // local reference
        if (invokerUrls.size() == 0 && this.lastInvokerUrls != null) {
            invokerUrls.addAll(this.lastInvokerUrls);
        } else {
            this.lastInvokerUrls = new HashSet<URL>();
            this.lastInvokerUrls.addAll(invokerUrls);//缓存invokerUrls列表,便于交叉对比
        }

        if (invokerUrls.size() == 0) return;

        Map<String, Invoker<T>> newUrlInvokerMap = toInvokers(invokerUrls);// 将URL列表转成Invoker列表
        // state change
        //如果计算错误,则不进行处理.
        //比如所有provider都标记为下线状态,这样其实最后一个provider是下不了的
        if (newUrlInvokerMap == null || newUrlInvokerMap.size() == 0) {
            logger.error(new IllegalStateException("urls to invokers error .invokerUrls.size :" + invokerUrls.size() + ", invoker.size :0. urls :" + invokerUrls.toString()));
            return;
        }
        Map<String, List<Invoker<T>>> newMethodInvokerMap = toMethodInvokers(newUrlInvokerMap); // 换方法名映射Invoker列表

        this.methodInvokerMap = multiGroup ? toMergeMethodInvokerMap(newMethodInvokerMap) : newMethodInvokerMap;
        this.urlInvokerMap = newUrlInvokerMap;
        try {
            destroyUnusedInvokers(oldUrlInvokerMap, newUrlInvokerMap); // 关闭未使用的Invoker
        } catch (Exception e) {
            logger.warn("destroyUnusedInvokers error. ", e);
        }

        List<String> invokerUrlString = new ArrayList<String>();
        for (URL invoker : invokerUrls) {
            invokerUrlString.add(invoker.toString());
        }
        overrideDirectoryUrl = overrideDirectoryUrl.addParameters(
                Constants.INVOKER_INSIDE_INVOKERS_KEY, URL.encode(CollectionUtils.join(invokerUrlString, ";")),
                Constants.INVOKER_INSIDE_INVOKER_COUNT_KEY, String.valueOf(invokerUrls.size()));
    }
}

9 跟进com.alibaba.dubbo.registry.integration.RegistryDirectory#toInvokers

/**
 * 将urls转成invokers,如果url已经被refer过,不再重新引用。
 *
 * @param urls
 * @return invokers
 */
private Map<String, Invoker<T>> toInvokers(List<URL> urls) {
    Map<String, Invoker<T>> result = new HashMap<String, Invoker<T>>();
    if (urls == null || urls.size() == 0) return result;

    Set<String> keys = new HashSet<String>();
    for (URL providerUrl : urls) {
        if (!acceptProtocol(providerUrl)) {
            continue;
        }
        if (!acceptTransport(providerUrl)) {
            continue;
        }

        URL url = mergeUrl(providerUrl);

        String key = url.toFullString(); // URL参数是排序的
        if (keys.contains(key)) { // 重复URL
            continue;
        }
        keys.add(key);

        // 缓存key为没有合并消费端参数的URL,不管消费端如何合并参数,如果服务端URL发生变化,则重新refer
        Map<String, Invoker<T>> localUrlInvokerMap = this.urlInvokerMap; // local reference
        Invoker<T> invoker = localUrlInvokerMap == null ? null : localUrlInvokerMap.get(key);
        if (invoker == null) { // 缓存中没有,重新refer
            try {
                if (isEnable(url)) {
                    invoker = new InvokerDelegete<T>(protocol.refer(serviceType, url), url, providerUrl);
                }
            } catch (Throwable t) {
                logger.error("Failed to refer invoker for interface:" + serviceType + ",url:(" + url + ")" + t.getMessage(), t);
            }
            if (invoker != null) { // 将新的引用放入缓存
                result.put(key, invoker);
            }
        } else {
            result.put(key, invoker);
        }
    }
    keys.clear();
    return result;
}
到此目录notify方法调用 源码过程完毕!

最后

以上就是潇洒大碗为你收集整理的【知识库】--Dubbo ReferenceBean获取-- 调用RegistryDirectory.notify -- 源码过程(254)的全部内容,希望文章能够帮你解决【知识库】--Dubbo ReferenceBean获取-- 调用RegistryDirectory.notify -- 源码过程(254)所遇到的程序开发问题。

如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(63)

评论列表共有 0 条评论

立即
投稿
返回
顶部