概述
接着上篇博客继续讲。
ReferenceConfig在生成URL的过程中,有这么一个步骤:
public abstract class AbstractInterfaceConfig extends AbstractMethodConfig {
/**
* 加载注册中心 URL 数组
*
* @param provider 是否是服务提供者
* @return URL 数组
*/
protected List<URL> loadRegistries(boolean provider) {
......
// 解析地址,创建 Dubbo URL 数组。(数组大小可以为一),同时将map内的参数填充入每个url
List<URL> urls = UrlUtils.parseURLs(address, map);
// 循环 `url` ,设置 "registry" 和 "protocol" 属性。
for (URL url : urls) {
// 设置 `registry=${protocol}` 和 `protocol=registry` 到 URL
url = url.addParameter(Constants.REGISTRY_KEY, url.getProtocol());
// 首先需要使用url向zookeeper注册自身
url = url.setProtocol(Constants.REGISTRY_PROTOCOL);
// 添加到结果
if ((provider && url.getParameter(Constants.REGISTER_KEY, true)) // 服务提供者 && 注册
|| (!provider && url.getParameter(Constants.SUBSCRIBE_KEY, true))) { // 服务消费者 && 订阅
registryList.add(url);
}
}
}
}
其用 Constants.REGISTRY_PROTOCOL 替换掉 URL上的protocol,将原始值放进RUL的prameter集合中,这样在下一步执行时,会用ExtensionLoader生成的Protocol$Adaptive来选择RegistryProtocol执行,这样就会先向相应的注册中心注册自己,然后订阅Providers,最后连接Provider。
public class ReferenceConfig<T> extends AbstractReferenceConfig {
/**
* 自适应 Protocol 实现对象
*/
private static final Protocol refprotocol = ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension();
/**
* 创建 Service 代理对象
*
* @param map 集合
* @return 代理对象
*/
@SuppressWarnings({"unchecked", "rawtypes", "deprecation"})
private T createProxy(Map<String, String> map) {
......
// 单 `urls` 时,引用服务,返回 Invoker 对象
if (urls.size() == 1) {
// 引用服务
invoker = refprotocol.refer(interfaceClass, urls.get(0));
} else {
// 循环 `urls` ,引用服务,返回 Invoker 对象
List<Invoker<?>> invokers = new ArrayList<Invoker<?>>();
URL registryURL = null;
for (URL url : urls) {
// 引用服务
invokers.add(refprotocol.refer(interfaceClass, url));
// 使用最后一个注册中心的 URL
if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
registryURL = url; // use last registry url
}
}
// 有注册中心
if (registryURL != null) { // registry url is available
// 对有注册中心的 Cluster 只用 AvailableCluster
// use AvailableCluster only when register's cluster is available
URL u = registryURL.addParameter(Constants.CLUSTER_KEY, AvailableCluster.NAME);
// 将多个invoker组合成一个集群式的invker,以便负载均衡
invoker = cluster.join(new StaticDirectory(u, invokers));
// 无注册中心
} else { // not a registry url
// // 将多个invoker组合成一个集群式的invker,以便负载均衡
invoker = cluster.join(new StaticDirectory(invokers));
}
}
}
}
上段代码关键是 : refprotocol.refer(interfaceClass, urls.get(0)); refprotocol.refer(interfaceClass, url) ; 当只有一个URL对象时,连接生成一个Invoker,如果有多个URL,也就是说有多个Provider,那么需要将多个Invoker组合成一个Directory< T > 的实现对象,做集群容错以及负载均衡。
refprotocol 在执行refer() 方法时,会从提取URL上的protocol,将执行过程传递给相应的Protocol实现对象,我们这里就是RegistryProtocol对象了。
public class RegistryProtocol implements Protocol {
@Override
public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
// 获得真实的注册中心的 URL, 还原protocol的设置。已经进入了此注册器,不需要再用protocol的设置了
url = url.setProtocol(url.getParameter(Constants.REGISTRY_KEY, Constants.DEFAULT_REGISTRY)).removeParameter(Constants.REGISTRY_KEY);
// 通过RegistryFactory$Adaptive 获取 ZookeeperRegistryFactory
// 获得注册中心,第一次会连接zookeeper,创建 {@link ZooKeeperClient},生成{@link ZookeeperRegistry}
Registry registry = registryFactory.getRegistry(url);
// TODO 芋艿
if (RegistryService.class.equals(type)) {
return proxyFactory.getInvoker((T)registry, type, url);
}
// 获得服务引用配置参数集合
// group="a,b" or group="*"
Map<String, String> qs = StringUtils.parseQueryString(url.getParameterAndDecoded(Constants.REFER_KEY));
String group = qs.get(Constants.GROUP_KEY);
// 分组聚合,参见文档 http://dubbo.io/books/dubbo-user-book/demos/group-merger.html
if (group != null && group.length() > 0) {
if ((Constants.COMMA_SPLIT_PATTERN.split(group)).length > 1 || "*".equals(group)) {
// 执行服务引用
return doRefer(getMergeableCluster(), registry, type, url);
}
}
// 执行服务引用
return doRefer(cluster, registry, type, url);
}
/**
* 执行服务引用,返回 Invoker 对象
*
* @param cluster Cluster 对象
* @param registry 注册中心对象 {@link FailbackRegistry}
* @param type 服务接口类型
* @param url 注册中心 URL
* @param <T> 泛型
* @return Invoker 对象
*/
private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
// 创建 RegistryDirectory 对象,并设置注册中心
RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);
directory.setRegistry(registry);
directory.setProtocol(protocol);
// 创建订阅 URL
// all attributes of REFER_KEY
Map<String, String> parameters = new HashMap<String, String>(directory.getUrl().getParameters()); // 服务引用配置集合
URL subscribeUrl = new URL(Constants.CONSUMER_PROTOCOL, parameters.remove(Constants.REGISTER_IP_KEY), 0, type.getName(), parameters);
// 向注册中心注册自己(服务消费者),FailbackRegistry
if (!Constants.ANY_VALUE.equals(url.getServiceInterface()) && url.getParameter(Constants.REGISTER_KEY, true)) {
registry.register(subscribeUrl.addParameters(Constants.CATEGORY_KEY, Constants.CONSUMERS_CATEGORY, Constants.CHECK_KEY, String.valueOf(false)));
}
// 向注册中心订阅服务提供者, 同时订阅providers,configurators,routers 三个类目
directory.subscribe(subscribeUrl.addParameter(Constants.CATEGORY_KEY, Constants.PROVIDERS_CATEGORY + "," + Constants.CONFIGURATORS_CATEGORY + "," + Constants.ROUTERS_CATEGORY));
// 创建 Invoker 对象,【TODO 8015】集群容错
Invoker invoker = cluster.join(directory);
// 向本地注册表,注册消费者
ProviderConsumerRegTable.registerConsumer(invoker, url, subscribeUrl, directory);
return invoker;
}
}
在refer(Class< T > type, URL url) 和 doRefer(Cluster cluster, Registry registry, Class< T > type, URL url) 里完成Invoker的初始化,也就是完成消费者对象的初始化过程。
总结下上述两个函数的关键作用:
- 根据protocol配置地址,连接Zookeeper,生成连接客户端对象,Zookeeper Client
- 向注册中心注册自己作为一个Consumer
- 订阅Zookeeper下指定Path,获取此目录下的Providers的URL字符串
- 根据获取到的字符串,重解析成URL对象,然后根据Interface,Version,Group 择取匹配的URL,然后根据这些URL连接相应的Privoder
- 将连接Provider生成的Invokers生成Directory,指定相应的集群容错机制,负载均衡器,之后返回给Spring容器。
我们按顺序依次看,先看第一点,生成Zookeeper Client :
public class ZkclientZookeeperClient extends AbstractZookeeperClient<IZkChildListener> {
public ZkclientZookeeperClient(URL url) {
super(url);
// 创建 client 对象,地址为:127.0.0.1:2181 形式
client = new ZkClientWrapper(url.getBackupAddress(), 30000);
// 添加连接监听器
client.addListener(new IZkStateListener() {
@Override
public void handleStateChanged(KeeperState state) throws Exception {
ZkclientZookeeperClient.this.state = state;
if (state == KeeperState.Disconnected) {
stateChanged(StateListener.DISCONNECTED);
} else if (state == KeeperState.SyncConnected) {
stateChanged(StateListener.CONNECTED);
}
}
@Override
public void handleNewSession() throws Exception {
stateChanged(StateListener.RECONNECTED);
}
});
// 启动 client
client.start();
}
}
在连接Zookeeper Server时,最终会调用上述构造函数,生成Client,同时为Zookeeper的事件注册监听器。
下面看第二点,向Zookeeper注册自身:
public class RegistryProtocol implements Protocol {
private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
......
// 向注册中心注册自己(服务消费者),FailbackRegistry
if (!Constants.ANY_VALUE.equals(url.getServiceInterface()) && url.getParameter(Constants.REGISTER_KEY, true)) {
registry.register(subscribeUrl.addParameters(Constants.CATEGORY_KEY, Constants.CONSUMERS_CATEGORY, Constants.CHECK_KEY, String.valueOf(false)));
}
}
}
public class ZookeeperRegistry extends FailbackRegistry {
/**
* 向Zookeeper注册URL
*
* @param url
*/
@Override
protected void doRegister(URL url) {
try {
zkClient.create(toUrlPath(url), url.getParameter(Constants.DYNAMIC_KEY, true));
} catch (Throwable e) {
throw new RpcException("Failed to register " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
}
}
/**
* 获得 URL 的路径
*
* Root + Service + Type + URL
*
* 被 {@link #doRegister(URL)} 和 {@link #doUnregister(URL)} 调用
*
* @param url URL
* @return 路径
*/
private String toUrlPath(URL url) {
return toCategoryPath(url) + Constants.PATH_SEPARATOR + URL.encode(url.toFullString());
}
}
public abstract class AbstractZookeeperClient<TargetChildListener> implements ZookeeperClient {
@Override
public void create(String path, boolean ephemeral) {
// 循环创建父路径
int i = path.lastIndexOf('/');
if (i > 0) {
String parentPath = path.substring(0, i);
if (!checkExists(parentPath)) {
// 创建持久化的父节点 ,/dubbo/***Service/providers, /dubbo/***Service/consumers
create(parentPath, false);
}
}
if (ephemeral) {
// 创建临时节点
createEphemeral(path);
} else {
// 创建持久节点
createPersistent(path);
}
}
}
向Zookeeper注册自身时,在父级目录下创建一个临时节点,节点的形式为 : Root + Service + Type + URL。
当Consumer与Zookeeper的长连接断开后,会移除此节点。
接下来看Consumer订阅Zookeeper里的Provider节点信息:
public class RegistryProtocol implements Protocol {
/**
* 执行服务引用,返回 Invoker 对象
*
* @param cluster Cluster 对象
* @param registry 注册中心对象 {@link FailbackRegistry}
* @param type 服务接口类型
* @param url 注册中心 URL
* @param <T> 泛型
* @return Invoker 对象
*/
private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
......
// 向注册中心订阅服务提供者, 同时订阅providers,configurators,routers 三个类目
directory.subscribe(subscribeUrl.addParameter(Constants.CATEGORY_KEY, Constants.PROVIDERS_CATEGORY + "," + Constants.CONFIGURATORS_CATEGORY + "," + Constants.ROUTERS_CATEGORY));
......
}
}
public class ZookeeperRegistry extends FailbackRegistry {
@Override
protected void doSubscribe(final URL url, final NotifyListener listener) {
try {
// 处理所有 Service 层的发起订阅,例如监控中心的订阅
if (Constants.ANY_VALUE.equals(url.getServiceInterface())) {
......
// 处理指定 Service 层的发起订阅,例如服务消费者的订阅
} else {
// 子节点数据数组
List<URL> urls = new ArrayList<URL>();
// 循环分类数组 , router, configurator, provider
for (String path : toCategoriesPath(url)) {
// 获得 url 对应的监听器集合
ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url);
if (listeners == null) { // 不存在,进行创建
zkListeners.putIfAbsent(url, new ConcurrentHashMap<NotifyListener, ChildListener>());
listeners = zkListeners.get(url);
}
// 获得 ChildListener 对象
ChildListener zkListener = listeners.get(listener);
if (zkListener == null) { // 不存在子目录的监听器,进行创建 ChildListener 对象
// 订阅父级目录, 当有子节点发生变化时,触发此回调函数
listeners.putIfAbsent(listener, new ChildListener() {
@Override
public void childChanged(String parentPath, List<String> currentChilds) {
// 变更时,调用 `#notify(...)` 方法,回调 NotifyListener
ZookeeperRegistry.this.notify(url, listener, toUrlsWithEmpty(url, parentPath, currentChilds));
}
});
zkListener = listeners.get(listener);
}
// 创建 Type 节点。该节点为持久节点。
zkClient.create(path, false);
// 向 Zookeeper ,PATH 节点,发起订阅,返回此节点下的所有子元素 path : /根节点/接口全名/providers, 比如 : /dubbo/com.bob.service.CityService/providers
List<String> children = zkClient.addChildListener(path, zkListener);
// 添加到 `urls` 中
if (children != null) {
urls.addAll(toUrlsWithEmpty(url, path, children));
}
}
// 首次全量数据获取完成时,调用 `#notify(...)` 方法,回调 NotifyListener, 在这一步从连接Provider,实例化Invoker
notify(url, listener, urls);
}
} catch (Throwable e) {
throw new RpcException("Failed to subscribe " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
}
}
}
在订阅Providers目录时,会注册一个监听器Listener,当Provider的元素发生改变时,Zookeeper会通知Client,触发此回调函数,更新Consumer持有的Provider列表,更新Invoker集群。
第一次订阅时返回的字符串集合即使Provider的URL字符串集合,Dubbo解析这些字符串,生成URL,然后根据Interface,Group,Version匹配自己指定的Provider。
下面我们看这个回调函数内容:
public abstract class AbstractRegistry implements Registry {
/**
* 通知监听器,URL 变化结果。当Zookeeper里的节点发生变化时,Dubbo注册了监听器,到时候会回调此函数
*
* 数据流向 `urls` => {@link #notified} => {@link #properties} => {@link #file}
*
* @param url 消费者 URL
* @param listener 监听器
* @param urls 通知的 URL 变化结果(全量数据)
*/
protected void notify(URL url, NotifyListener listener, List<URL> urls) {
if (url == null) {
throw new IllegalArgumentException("notify url == null");
}
if (listener == null) {
throw new IllegalArgumentException("notify listener == null");
}
if ((urls == null || urls.isEmpty())
&& !Constants.ANY_VALUE.equals(url.getServiceInterface())) {
logger.warn("Ignore empty notify urls for subscribe url " + url);
return;
}
if (logger.isInfoEnabled()) {
logger.info("Notify urls for subscribe url " + url + ", urls: " + urls);
}
// 将 `urls` 按照 `url.parameter.category` 分类,添加到集合
// 注意,特殊情况,使用 curator 连接 Zookeeper 时,若是服务消费者,连接断开,会出现 category=providers,configurations,routes
Map<String, List<URL>> result = new HashMap<String, List<URL>>();
for (URL u : urls) {
// ############根据Interface,Group,Version验证Provider是否匹配Consumer的订阅
if (UrlUtils.isMatch(url, u)) {
String category = u.getParameter(Constants.CATEGORY_KEY, Constants.DEFAULT_CATEGORY);
List<URL> categoryList = result.get(category);
if (categoryList == null) {
categoryList = new ArrayList<URL>();
result.put(category, categoryList); // category >> 相应的urls
}
categoryList.add(u);
}
}
if (result.size() == 0) {
return;
}
// 获得消费者 URL 对应的在 `notified` 中,通知的 URL 变化结果(全量数据)
Map<String, List<URL>> categoryNotified = notified.get(url);
if (categoryNotified == null) {
notified.putIfAbsent(url, new ConcurrentHashMap<String, List<URL>>());
categoryNotified = notified.get(url);
}
// 处理通知的 URL 变化结果(全量数据)
for (Map.Entry<String, List<URL>> entry : result.entrySet()) {
String category = entry.getKey();
List<URL> categoryList = entry.getValue();
// 覆盖到 `notified`
// 当某个分类的数据为空时,会依然有 urls 。其中 `urls[0].protocol = empty` ,通过这样的方式,处理所有服务提供者为空的情况。
categoryNotified.put(category, categoryList);
// 保存到文件
saveProperties(url);
// 通知监听器, 最终会调用RegistryDirectory#notify(List<URL>)
listener.notify(categoryList);
}
}
}
public class UrlUtils {
/**
* Provider和Consumer的URL是否匹配
*
* @param consumerUrl
* @param providerUrl
* @return
*/
public static boolean isMatch(URL consumerUrl, URL providerUrl) {
String consumerInterface = consumerUrl.getServiceInterface();
String providerInterface = providerUrl.getServiceInterface();
// 如果consumer指定的接口为* 或者 consumer的接口和provider的接口名称一致
if (!(Constants.ANY_VALUE.equals(consumerInterface) || StringUtils.isEquals(consumerInterface, providerInterface))) {
return false;
}
// 匹配Category, consumer的categories 包含 provider的category
if (!isMatchCategory(providerUrl.getParameter(Constants.CATEGORY_KEY, Constants.DEFAULT_CATEGORY),
consumerUrl.getParameter(Constants.CATEGORY_KEY, Constants.DEFAULT_CATEGORY))) {
return false;
}
// provider?enabled=true || consumer?enabled= *
if (!providerUrl.getParameter(Constants.ENABLED_KEY, true)
&& !Constants.ANY_VALUE.equals(consumerUrl.getParameter(Constants.ENABLED_KEY))) {
return false;
}
String consumerGroup = consumerUrl.getParameter(Constants.GROUP_KEY);
String consumerVersion = consumerUrl.getParameter(Constants.VERSION_KEY);
String consumerClassifier = consumerUrl.getParameter(Constants.CLASSIFIER_KEY, Constants.ANY_VALUE);
String providerGroup = providerUrl.getParameter(Constants.GROUP_KEY);
String providerVersion = providerUrl.getParameter(Constants.VERSION_KEY);
String providerClassifier = providerUrl.getParameter(Constants.CLASSIFIER_KEY, Constants.ANY_VALUE);
// group,version,classfier 均匹配
return (Constants.ANY_VALUE.equals(consumerGroup) || StringUtils.isEquals(consumerGroup, providerGroup) || StringUtils.isContains(consumerGroup,
providerGroup))
&& (Constants.ANY_VALUE.equals(consumerVersion) || StringUtils.isEquals(consumerVersion, providerVersion))
&& (consumerClassifier == null || Constants.ANY_VALUE.equals(consumerClassifier) || StringUtils.isEquals(consumerClassifier, providerClassifier));
}
}
从Zookeeper处订阅到了Provider的URL信息,之后就吃将URL信息解析生成相应的Invoker对象了:
public class RegistryDirectory<T> extends AbstractDirectory<T> implements NotifyListener {
private void refreshInvoker(List<URL> invokerUrls) {
// 从zookeeper获取到的url已经没有合适的了,在订阅返回为空时,会手动生成一个 EMPTY_PROTOCOL 的 url
if (invokerUrls != null && invokerUrls.size() == 1 && invokerUrls.get(0) != null
&& Constants.EMPTY_PROTOCOL.equals(invokerUrls.get(0).getProtocol())) {
this.forbidden = true; // Forbid to access
this.methodInvokerMap = null; // Set the method invoker map to null
destroyAllInvokers(); // Close all invokers
} else {
this.forbidden = false; // Allow to access
Map<String, Invoker<T>> oldUrlInvokerMap = this.urlInvokerMap; // local reference
if (invokerUrls.isEmpty() && this.cachedInvokerUrls != null) {
invokerUrls.addAll(this.cachedInvokerUrls);
} else {
this.cachedInvokerUrls = new HashSet<URL>();
this.cachedInvokerUrls.addAll(invokerUrls);//Cached invoker urls, convenient for comparison
}
if (invokerUrls.isEmpty()) {
return;
}
// 将url对象转换成Invoker对象,关键步骤
Map<String, Invoker<T>> newUrlInvokerMap = toInvokers(invokerUrls);// Translate url list to Invoker map
Map<String, List<Invoker<T>>> newMethodInvokerMap = toMethodInvokers(newUrlInvokerMap); // Change method name to map Invoker Map
// state change
// If the calculation is wrong, it is not processed.
if (newUrlInvokerMap == null || newUrlInvokerMap.size() == 0) {
logger.error(new IllegalStateException(
"urls to invokers error .invokerUrls.size :" + invokerUrls.size() + ", invoker.size :0. urls :" + invokerUrls.toString()));
return;
}
this.methodInvokerMap = multiGroup ? toMergeMethodInvokerMap(newMethodInvokerMap) : newMethodInvokerMap;
this.urlInvokerMap = newUrlInvokerMap;
try {
destroyUnusedInvokers(oldUrlInvokerMap, newUrlInvokerMap); // Close the unused Invoker
} catch (Exception e) {
logger.warn("destroyUnusedInvokers error. ", e);
}
}
}
/**
* Turn urls into invokers, and if url has been refer, will not re-reference.
* 将每个provider映射为相应的Invoker, Invoker内包含consumer和provider的ExchangeClient,也就是连接和信息转换对象
*
* @param urls
* @return invokers
*/
private Map<String, Invoker<T>> toInvokers(List<URL> urls) {
Map<String, Invoker<T>> newUrlInvokerMap = new HashMap<String, Invoker<T>>();
if (urls == null || urls.isEmpty()) {
return newUrlInvokerMap;
}
Set<String> keys = new HashSet<String>();
String queryProtocols = this.queryMap.get(Constants.PROTOCOL_KEY);
for (URL providerUrl : urls) {
// If protocol is configured at the reference side, only the matching protocol is selected
if (queryProtocols != null && queryProtocols.length() > 0) {
boolean accept = false;
String[] acceptProtocols = queryProtocols.split(",");
for (String acceptProtocol : acceptProtocols) {
// protocol协议必须符合
if (providerUrl.getProtocol().equals(acceptProtocol)) {
accept = true;
break;
}
}
if (!accept) {
continue;
}
}
if (Constants.EMPTY_PROTOCOL.equals(providerUrl.getProtocol())) {
continue;
}
// 如果Provider的protocol不是已知的
if (!ExtensionLoader.getExtensionLoader(Protocol.class).hasExtension(providerUrl.getProtocol())) {
logger.error(new IllegalStateException(
"Unsupported protocol " + providerUrl.getProtocol() + " in notified url: " + providerUrl + " from registry " + getUrl().getAddress()
+ " to consumer " + NetUtils.getLocalHost()
+ ", supported protocol: " + ExtensionLoader.getExtensionLoader(Protocol.class).getSupportedExtensions()));
continue;
}
URL url = mergeUrl(providerUrl);
String key = url.toFullString(); // The parameter urls are sorted
if (keys.contains(key)) { // Repeated url
continue;
}
// 缓存url相应的invoker,key 是url的fullString
keys.add(key);
// Cache key is url that does not merge with consumer side parameters, regardless of how the consumer combines parameters, if the server url
// changes, then refer again
Map<String, Invoker<T>> localUrlInvokerMap = this.urlInvokerMap; // local reference
Invoker<T> invoker = localUrlInvokerMap == null ? null : localUrlInvokerMap.get(key);
if (invoker == null) { // Not in the cache, refer again
try {
// 判断URL上是否含有disable,enable的参数,也就是是否可用
boolean enabled = true;
if (url.hasParameter(Constants.DISABLED_KEY)) {
enabled = !url.getParameter(Constants.DISABLED_KEY, false);
} else {
enabled = url.getParameter(Constants.ENABLED_KEY, true);
}
if (enabled) {
// 分发式的Invoker, 便于负载均衡 , protocol.refer(serviceType, url) refer Provider,生成Invoker
// protocol >> Protocol$Adaptive , 通过ExtesionLoader获取到的是 Wrapper包装,比如 ProtocolFilterWrapper
invoker = new InvokerDelegate<T>(protocol.refer(serviceType, url), url, providerUrl);
}
} catch (Throwable t) {
logger.error("Failed to refer invoker for interface:" + serviceType + ",url:(" + url + ")" + t.getMessage(), t);
}
if (invoker != null) { // Put new invoker in cache
newUrlInvokerMap.put(key, invoker);
}
} else {
newUrlInvokerMap.put(key, invoker);
}
}
keys.clear();
return newUrlInvokerMap;
}
}
上文中最关键的就是这行代码 : invoker = new InvokerDelegate(protocol.refer(serviceType, url), url, providerUrl);
ReferenceConfig在初始化时,Dubbo将url上的protocol替换成了registry,然后解析时使用RegistryProtocol去想Zookeeper注册自己,然后订阅Provider。
在进入RegistryProtocol时,就将protocol还原成原始值,默认是dubbo,这样这会执行protocol.refer(serviceType, url)时,就会使用DubboRegistry的refer方法,连接Provider,生成Invoker。
Consumer连接Provider,生成Invoker的流程很长,很关键,我们在下次博客接着讲。
最后
以上就是独特水壶为你收集整理的Dubbo的Consumer启动过程(通过URL对象注册,订阅)的全部内容,希望文章能够帮你解决Dubbo的Consumer启动过程(通过URL对象注册,订阅)所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
发表评论 取消回复