概述
回顾上一篇文章:获取ReferenceBean过程中SPI自适应到RegistryProtocol类后,调用refer方法:
@SuppressWarnings("unchecked") public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException { url = url.setProtocol(url.getParameter(Constants.REGISTRY_KEY, Constants.DEFAULT_REGISTRY)).removeParameter(Constants.REGISTRY_KEY); Registry registry = registryFactory.getRegistry(url); if (RegistryService.class.equals(type)) { return proxyFactory.getInvoker((T) registry, type, url); } // group="a,b" or group="*" Map<String, String> qs = StringUtils.parseQueryString(url.getParameterAndDecoded(Constants.REFER_KEY)); String group = qs.get(Constants.GROUP_KEY); if (group != null && group.length() > 0) { if ((Constants.COMMA_SPLIT_PATTERN.split(group)).length > 1 || "*".equals(group)) { return doRefer(getMergeableCluster(), registry, type, url); } } return doRefer(cluster, registry, type, url); } private Cluster getMergeableCluster() { return ExtensionLoader.getExtensionLoader(Cluster.class).getExtension("mergeable"); } private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) { RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url); directory.setRegistry(registry); directory.setProtocol(protocol); URL subscribeUrl = new URL(Constants.CONSUMER_PROTOCOL, NetUtils.getLocalHost(), 0, type.getName(), directory.getUrl().getParameters()); if (!Constants.ANY_VALUE.equals(url.getServiceInterface()) && url.getParameter(Constants.REGISTER_KEY, true)) { registry.register(subscribeUrl.addParameters(Constants.CATEGORY_KEY, Constants.CONSUMERS_CATEGORY, Constants.CHECK_KEY, String.valueOf(false))); } directory.subscribe(subscribeUrl.addParameter(Constants.CATEGORY_KEY, Constants.PROVIDERS_CATEGORY + "," + Constants.CONFIGURATORS_CATEGORY + "," + Constants.ROUTERS_CATEGORY)); return cluster.join(directory); }1
自适应到:com.alibaba.dubbo.registry.integration.RegistryDirectory#subscribe方法
directory.subscribe(subscribeUrl.addParameter(Constants.CATEGORY_KEY, Constants.PROVIDERS_CATEGORY + "," + Constants.CONFIGURATORS_CATEGORY + "," + Constants.ROUTERS_CATEGORY));
2 方法实现:
/**
 * Stores the given URL as this directory's consumer URL and then subscribes to the registry,
 * passing this directory as the listener argument.
 *
 * @param url the URL to subscribe with
 */
public void subscribe(URL url) {
    setConsumerUrl(url);
    registry.subscribe(url, this);
}
3 订阅实现:
com.alibaba.dubbo.registry.support.FailbackRegistry#subscribe
@Override public void subscribe(URL url, NotifyListener listener) { super.subscribe(url, listener); removeFailedSubscribed(url, listener); try { // 向服务器端发送订阅请求 doSubscribe(url, listener); } catch (Exception e) { Throwable t = e; List<URL> urls = getCacheUrls(url); if (urls != null && urls.size() > 0) { notify(url, listener, urls); logger.error("Failed to subscribe " + url + ", Using cached list: " + urls + " from cache file: " + getUrl().getParameter(Constants.FILE_KEY, System.getProperty("user.home") + "/dubbo-registry-" + url.getHost() + ".cache") + ", cause: " + t.getMessage(), t); } else { // 如果开启了启动时检测,则直接抛出异常 boolean check = getUrl().getParameter(Constants.CHECK_KEY, true) && url.getParameter(Constants.CHECK_KEY, true); boolean skipFailback = t instanceof SkipFailbackWrapperException; if (check || skipFailback) { if (skipFailback) { t = t.getCause(); } throw new IllegalStateException("Failed to subscribe " + url + ", cause: " + t.getMessage(), t); } else { logger.error("Failed to subscribe " + url + ", waiting for retry, cause: " + t.getMessage(), t); } } // 将失败的订阅请求记录到失败列表,定时重试 addFailedSubscribed(url, listener); } }
4 重点:
// 向服务器端发送订阅请求
doSubscribe(url, listener);
5 参考zookeeper实现:
com.alibaba.dubbo.registry.zookeeper.ZookeeperRegistry#doSubscribe
/**
 * Subscribes the listener to ZooKeeper child changes for the given URL.
 *
 * <p>When the URL's service interface is the wildcard value, a child listener is installed on
 * the root path; every service name not yet present in {@code anyServices} is added to it and
 * subscribed individually with check disabled. Otherwise a child listener is installed on each
 * category path of the URL, the current children of every path are converted with
 * {@code toUrlsWithEmpty}, and the listener is notified once with the collected URLs.
 *
 * @param url      the URL to subscribe to
 * @param listener the listener to notify
 * @throws RpcException wrapping any {@link Throwable} raised while subscribing
 */
protected void doSubscribe(final URL url, final NotifyListener listener) {
    try {
        if (Constants.ANY_VALUE.equals(url.getServiceInterface())) {
            String rootPath = toRootPath();

            ConcurrentMap<NotifyListener, ChildListener> childListeners = zkListeners.get(url);
            if (childListeners == null) {
                zkListeners.putIfAbsent(url, new ConcurrentHashMap<NotifyListener, ChildListener>());
                childListeners = zkListeners.get(url);
            }

            ChildListener rootWatcher = childListeners.get(listener);
            if (rootWatcher == null) {
                childListeners.putIfAbsent(listener, new ChildListener() {
                    public void childChanged(String parentPath, List<String> currentChilds) {
                        for (String child : currentChilds) {
                            if (anyServices.contains(child)) {
                                continue;
                            }
                            anyServices.add(child);
                            subscribe(
                                    url.setPath(child).addParameters(
                                            Constants.INTERFACE_KEY, child,
                                            Constants.CHECK_KEY, String.valueOf(false)),
                                    listener);
                        }
                    }
                });
                rootWatcher = childListeners.get(listener);
            }

            zkClient.create(rootPath, false);
            List<String> services = zkClient.addChildListener(rootPath, rootWatcher);
            if (services != null && !services.isEmpty()) {
                anyServices.addAll(services);
                for (String service : services) {
                    subscribe(
                            url.setPath(service).addParameters(
                                    Constants.INTERFACE_KEY, service,
                                    Constants.CHECK_KEY, String.valueOf(false)),
                            listener);
                }
            }
            return;
        }

        List<URL> collected = new ArrayList<URL>();
        for (String categoryPath : toCategoriesPath(url)) {
            ConcurrentMap<NotifyListener, ChildListener> childListeners = zkListeners.get(url);
            if (childListeners == null) {
                zkListeners.putIfAbsent(url, new ConcurrentHashMap<NotifyListener, ChildListener>());
                childListeners = zkListeners.get(url);
            }

            ChildListener categoryWatcher = childListeners.get(listener);
            if (categoryWatcher == null) {
                childListeners.putIfAbsent(listener, new ChildListener() {
                    public void childChanged(String parentPath, List<String> currentChilds) {
                        ZookeeperRegistry.this.notify(url, listener, toUrlsWithEmpty(url, parentPath, currentChilds));
                    }
                });
                categoryWatcher = childListeners.get(listener);
            }

            zkClient.create(categoryPath, false);
            List<String> children = zkClient.addChildListener(categoryPath, categoryWatcher);
            if (children != null) {
                collected.addAll(toUrlsWithEmpty(url, categoryPath, children));
            }
        }
        // Trigger the callback once right away with the current state.
        notify(url, listener, collected);
    } catch (Throwable e) {
        throw new RpcException("Failed to subscribe " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
    }
}
6 上面除了注册回调函数外,最后调用了(回调函数,主动触发)
notify(url, listener, urls);即:
com.alibaba.dubbo.registry.support.FailbackRegistry#notify
实现:
@Override protected void notify(URL url, NotifyListener listener, List<URL> urls) { if (url == null) { throw new IllegalArgumentException("notify url == null"); } if (listener == null) { throw new IllegalArgumentException("notify listener == null"); } try { doNotify(url, listener, urls); } catch (Exception t) { // 将失败的通知请求记录到失败列表,定时重试 Map<NotifyListener, List<URL>> listeners = failedNotified.get(url); if (listeners == null) { failedNotified.putIfAbsent(url, new ConcurrentHashMap<NotifyListener, List<URL>>()); listeners = failedNotified.get(url); } listeners.put(listener, urls); logger.error("Failed to notify for subscribe " + url + ", waiting for retry, cause: " + t.getMessage(), t); } }
跟进:
/**
 * Delivers the notification by delegating to the superclass {@code notify}.
 *
 * @param url      the subscribe URL
 * @param listener the listener to notify
 * @param urls     the URLs to deliver
 */
protected void doNotify(URL url, NotifyListener listener, List<URL> urls) {
    super.notify(url, listener, urls);
}
跟进到:com.alibaba.dubbo.registry.support.AbstractRegistry# notify
/**
 * Groups the notified URLs by category and delivers each group to the listener.
 *
 * <p>Only URLs that {@code UrlUtils.isMatch} accepts for the subscribe URL are kept. For every
 * category found, the list is stored in {@code notified} under the subscribe URL,
 * {@code saveProperties} is called, and the listener is notified with that category's list.
 * An empty or null list is ignored with a warning unless the subscribe URL's service interface
 * is the wildcard value.
 *
 * @param url      the subscribe URL; must not be null
 * @param listener the listener to notify; must not be null
 * @param urls     the notified URLs; may be null or empty
 * @throws IllegalArgumentException if {@code url} or {@code listener} is null
 */
protected void notify(URL url, NotifyListener listener, List<URL> urls) {
    if (url == null) {
        throw new IllegalArgumentException("notify url == null");
    }
    if (listener == null) {
        throw new IllegalArgumentException("notify listener == null");
    }
    if ((urls == null || urls.size() == 0)
            && !Constants.ANY_VALUE.equals(url.getServiceInterface())) {
        logger.warn("Ignore empty notify urls for subscribe url " + url);
        return;
    }
    if (logger.isInfoEnabled()) {
        logger.info("Notify urls for subscribe url " + url + ", urls: " + urls);
    }
    if (urls == null) {
        // A wildcard subscription passes the guard above even with a null list; iterating it
        // would throw NullPointerException. Treat null like an empty list: nothing to deliver.
        return;
    }

    Map<String, List<URL>> result = new HashMap<String, List<URL>>();
    for (URL u : urls) {
        if (UrlUtils.isMatch(url, u)) {
            String category = u.getParameter(Constants.CATEGORY_KEY, Constants.DEFAULT_CATEGORY);
            List<URL> categoryList = result.get(category);
            if (categoryList == null) {
                categoryList = new ArrayList<URL>();
                result.put(category, categoryList);
            }
            categoryList.add(u);
        }
    }
    if (result.size() == 0) {
        return;
    }

    Map<String, List<URL>> categoryNotified = notified.get(url);
    if (categoryNotified == null) {
        notified.putIfAbsent(url, new ConcurrentHashMap<String, List<URL>>());
        categoryNotified = notified.get(url);
    }
    for (Map.Entry<String, List<URL>> entry : result.entrySet()) {
        String category = entry.getKey();
        List<URL> categoryList = entry.getValue();
        categoryNotified.put(category, categoryList);
        saveProperties(url);
        listener.notify(categoryList);
    }
}
7 上面最终执行了 回调!
listener.notify(categoryList); 也就是 com.alibaba.dubbo.registry.integration.RegistryDirectory#notify
public synchronized void notify(List<URL> urls) { List<URL> invokerUrls = new ArrayList<URL>(); List<URL> routerUrls = new ArrayList<URL>(); List<URL> configuratorUrls = new ArrayList<URL>(); for (URL url : urls) { String protocol = url.getProtocol(); String category = url.getParameter(Constants.CATEGORY_KEY, Constants.DEFAULT_CATEGORY); if (Constants.ROUTERS_CATEGORY.equals(category) || Constants.ROUTE_PROTOCOL.equals(protocol)) { routerUrls.add(url); } else if (Constants.CONFIGURATORS_CATEGORY.equals(category) || Constants.OVERRIDE_PROTOCOL.equals(protocol)) { configuratorUrls.add(url); } else if (Constants.PROVIDERS_CATEGORY.equals(category)) { invokerUrls.add(url); } else { logger.warn("Unsupported category " + category + " in notified url: " + url + " from registry " + getUrl().getAddress() + " to consumer " + NetUtils.getLocalHost()); } } // routers if (routerUrls.size() > 0) { List<Router> routers = toRouters(routerUrls); if (routers != null) { // null - do nothing setRouters(routers); } } // configurators if (configuratorUrls.size() > 0) { this.configurators = toConfigurators(configuratorUrls); } List<Configurator> localConfigurators = this.configurators; // local reference // 合并override参数 this.overrideDirectoryUrl = directoryUrl; if (localConfigurators != null && localConfigurators.size() > 0) { for (Configurator configurator : localConfigurators) { this.overrideDirectoryUrl = configurator.configure(overrideDirectoryUrl); } } if (configuratorUrls.size() > 0) { logger.info("unconfigured directory url without provider params: " + this.directoryUrl + ", configured directory url without provider params: " + this.overrideDirectoryUrl); } // providers refreshInvoker(invokerUrls); }
等待被动回调和本次主动回调最终都会调用RegistryDirectory.notify。
8 最后就会触发refreshInvoker/* 根据invokerURL列表转换为invoker列表。转换规则如下: * 1.如果url已经被转换为invoker,则不在重新引用,直接从缓存中获取,注意如果url中任何一个参数变更也会重新引用 * 2.如果传入的invoker列表不为空,则表示最新的invoker列表 * 3.如果传入的invokerUrl列表是空,则表示只是下发的override规则或route规则,需要重新交叉对比,决定是否需要重新引用。 * * @param invokerUrls 传入的参数不能为null */ private void refreshInvoker(List<URL> invokerUrls) { if (invokerUrls != null && invokerUrls.size() == 1 && invokerUrls.get(0) != null && Constants.EMPTY_PROTOCOL.equals(invokerUrls.get(0).getProtocol())) { this.forbidden = true; // 禁止访问 overrideDirectoryUrl = overrideDirectoryUrl.addParameters(Constants.INVOKER_INSIDE_INVOKERS_KEY, "", Constants.INVOKER_INSIDE_INVOKER_COUNT_KEY, "0"); this.methodInvokerMap = null; // 置空列表 destroyAllInvokers(); // 关闭所有Invoker } else { this.forbidden = false; // 允许访问 Map<String, Invoker<T>> oldUrlInvokerMap = this.urlInvokerMap; // local reference if (invokerUrls.size() == 0 && this.lastInvokerUrls != null) { invokerUrls.addAll(this.lastInvokerUrls); } else { this.lastInvokerUrls = new HashSet<URL>(); this.lastInvokerUrls.addAll(invokerUrls);//缓存invokerUrls列表,便于交叉对比 } if (invokerUrls.size() == 0) return; Map<String, Invoker<T>> newUrlInvokerMap = toInvokers(invokerUrls);// 将URL列表转成Invoker列表 // state change //如果计算错误,则不进行处理. //比如所有provider都标记为下线状态,这样其实最后一个provider是下不了的 if (newUrlInvokerMap == null || newUrlInvokerMap.size() == 0) { logger.error(new IllegalStateException("urls to invokers error .invokerUrls.size :" + invokerUrls.size() + ", invoker.size :0. urls :" + invokerUrls.toString())); return; } Map<String, List<Invoker<T>>> newMethodInvokerMap = toMethodInvokers(newUrlInvokerMap); // 换方法名映射Invoker列表 this.methodInvokerMap = multiGroup ? toMergeMethodInvokerMap(newMethodInvokerMap) : newMethodInvokerMap; this.urlInvokerMap = newUrlInvokerMap; try { destroyUnusedInvokers(oldUrlInvokerMap, newUrlInvokerMap); // 关闭未使用的Invoker } catch (Exception e) { logger.warn("destroyUnusedInvokers error. 
", e); } List<String> invokerUrlString = new ArrayList<String>(); for (URL invoker : invokerUrls) { invokerUrlString.add(invoker.toString()); } overrideDirectoryUrl = overrideDirectoryUrl.addParameters( Constants.INVOKER_INSIDE_INVOKERS_KEY, URL.encode(CollectionUtils.join(invokerUrlString, ";")), Constants.INVOKER_INSIDE_INVOKER_COUNT_KEY, String.valueOf(invokerUrls.size())); } }
9 跟进com.alibaba.dubbo.registry.integration.RegistryDirectory#toInvokers
/** * 将urls转成invokers,如果url已经被refer过,不再重新引用。 * * @param urls * @return invokers */ private Map<String, Invoker<T>> toInvokers(List<URL> urls) { Map<String, Invoker<T>> result = new HashMap<String, Invoker<T>>(); if (urls == null || urls.size() == 0) return result; Set<String> keys = new HashSet<String>(); for (URL providerUrl : urls) { if (!acceptProtocol(providerUrl)) { continue; } if (!acceptTransport(providerUrl)) { continue; } URL url = mergeUrl(providerUrl); String key = url.toFullString(); // URL参数是排序的 if (keys.contains(key)) { // 重复URL continue; } keys.add(key); // 缓存key为没有合并消费端参数的URL,不管消费端如何合并参数,如果服务端URL发生变化,则重新refer Map<String, Invoker<T>> localUrlInvokerMap = this.urlInvokerMap; // local reference Invoker<T> invoker = localUrlInvokerMap == null ? null : localUrlInvokerMap.get(key); if (invoker == null) { // 缓存中没有,重新refer try { if (isEnable(url)) { invoker = new InvokerDelegete<T>(protocol.refer(serviceType, url), url, providerUrl); } } catch (Throwable t) { logger.error("Failed to refer invoker for interface:" + serviceType + ",url:(" + url + ")" + t.getMessage(), t); } if (invoker != null) { // 将新的引用放入缓存 result.put(key, invoker); } } else { result.put(key, invoker); } } keys.clear(); return result; }到此目录notify方法调用 源码过程完毕!
最后
以上就是潇洒大碗为你收集整理的【知识库】--Dubbo ReferenceBean获取-- 调用RegistryDirectory.notify -- 源码过程(254)的全部内容,希望文章能够帮你解决【知识库】--Dubbo ReferenceBean获取-- 调用RegistryDirectory.notify -- 源码过程(254)所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
发表评论 取消回复