概述
目录
- 1、注册中心的工作流程
- 2、注册中心的数据结构
- 3、订阅发布的实现
- 3.1 发布的实现
- 3.2 订阅的实现
- 4、缓存机制
- 4.1 缓存的加载
- 4.1 缓存的保存与更新
- 5、重试机制
- 6、设计模式
1、注册中心的工作流程
Dubbo通过注册中心实现了分布式环境中各服务之间的注册与发现,是各个分布式节点之间的纽带。其主要作用如下:
- 动态加入:服务提供者通过注册中心可以动态地把自己暴露给其他消费者
- 动态发现:消费者可以动态地感知新的配置、路由规则和新的服务提供者
- 动态调整:注册中心支持参数的动态调整,新参数自动更新到所有相关服务节点
- 统一配置:避免本地配置导致每个服务的配置不一致问题
Dubbo主要包含ZooKeeper、Nacos、Multicast 等注册中心的实现,其中ZooKeeper是官方推荐的注册中心。
- 服务提供者启动时,向注册中心注册自己提供的服务
- 消费者启动时,向注册中心订阅自己所需的服务
- 注册中心返回服务提供者地址列表给消费者,如果有变更,注册中心将基于长连接推送变更数据给消费者
- 服务消费者,从提供者地址列表中,基于负载均衡算法选一台提供者进行调用
- 服务消费者和提供者,在内存中累计调用次数和调用时间,定时每分钟发送一次统计数据到监控中心
2、注册中心的数据结构
ZooKeeper是树形结构的注册中心,存在多种节点类型,具体可分为:
- 持久节点:服务注册后保证节点不会丢失,注册中心重启也会存在
- 持久顺序节点:在持久节点特性的基础上增加了节点先后顺序的能力
- 临时节点:服务注册后连接丢失或session超时,注册的节点会自动被移除
- 临时顺序节点:在临时节点特性的基础上增加了节点先后顺序的能力
Dubbo使用ZooKeeper作为注册中心时,只会创建 持久节点 和 临时节点 两种,节点树形结构如图:
在Dubbo框架启动时,会根据用户配置的服务,在注册中心中创建4个目录,在providers和consumers目录中分别存储服务提供方、消费方元数据信息,主要包括IP、端口、权重和应用名等数据,同时服务元数据中的所有参数都是以键值对形式存储的。
3、订阅发布的实现
3.1 发布的实现
ZooKeeper发布代码非常简单,只是调用了ZooKeeper的客户端库在注册中心上创建一个目录,取消发布也很简单,只是把ZooKeeper注册中心上对应的路径删除,代码如下:
# 发布代码
zkClient.create(toUrlPath(url), url.getParameter(DYNAMIC_KEY, true));
# 取消发布
zkClient.delete(toUrlPath(url));
3.2 订阅的实现
订阅通常有pull和push两种方式,一种是客户端定时轮询注册中心拉取配置,另一种是注册中心主动推送数据给客户端。这两种方式各有利弊,目前Dubbo采用的是第一次启动拉取方式,后续接收事件重新拉取数据。
在服务暴露时,服务端会订阅configurators用于监听动态配置,在消费端启动时,消费端会订阅providers、routers和configurators这三个目录,分别对应服务提供者、路由和动态配置变更通知。 核心代码来自 ZookeeperRegistry,具体代码如下:
public void doSubscribe(final URL url, final NotifyListener listener) {
try {
checkDestroyed();
// 订阅全量服务(主要支持Dubbo服务治理平台(dubbo-admin),平台在启动时会订阅全量接口,它会感知每个服务的状态)
if (ANY_VALUE.equals(url.getServiceInterface())) {
String root = toRootPath();
boolean check = url.getParameter(CHECK_KEY, false);
// 订阅所有数据
ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.computeIfAbsent(url, k -> new ConcurrentHashMap<>());
ChildListener zkListener = listeners.computeIfAbsent(listener, k -> (parentPath, currentChildren) -> {
// 子节点有变化则会接到通知,遍历所有的子节点
for (String child : currentChildren) {
child = URL.decode(child);
// 如果存在子节点还未被订阅,说明是新的节点,则订阅
if (!anyServices.contains(child)) {
anyServices.add(child);
subscribe(url.setPath(child).addParameters(INTERFACE_KEY, child,
Constants.CHECK_KEY, String.valueOf(check)), k);
}
}
});
// 创建持久节点,接下来订阅持久节点的直接子节点
zkClient.create(root, false);
// 增加当前节点的订阅,并且会返回该节点下所有子节点列表
List<String> services = zkClient.addChildListener(root, zkListener);
if (CollectionUtils.isNotEmpty(services)) {
// 遍历所有节点进行订阅
for (String service : services) {
service = URL.decode(service);
anyServices.add(service);
subscribe(url.setPath(service).addParameters(INTERFACE_KEY, service,
Constants.CHECK_KEY, String.valueOf(check)), listener);
}
}
} else {
// 订阅类别服务(普通消费者的订阅逻辑)
CountDownLatch latch = new CountDownLatch(1);
try {
List<URL> urls = new ArrayList<>();
// 根据URL的类别得到一组需要订阅的路径。如果类别是*,则会订阅四种类型的路径(providers、routers、consumers> configurators),否则只订阅providers路径
for (String path : toCategoriesPath(url)) {
ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.computeIfAbsent(url, k -> new ConcurrentHashMap<>());
ChildListener zkListener = listeners.computeIfAbsent(listener, k -> new RegistryChildListenerImpl(url, k, latch));
if (zkListener instanceof RegistryChildListenerImpl) {
((RegistryChildListenerImpl) zkListener).setLatch(latch);
}
zkClient.create(path, false);
// 订阅,返回该节点下的子路径并缓存
List<String> children = zkClient.addChildListener(path, zkListener);
if (children != null) {
urls.addAll(toUrlsWithEmpty(url, path, children));
}
}
// 回调 NotifyListener 更新本地缓存信息
notify(url, listener, urls);
} finally {
// 告诉侦听器仅在主线程的同步通知完成后运行
latch.countDown();
}
}
} catch (Throwable e) {
throw new RpcException("Failed to subscribe " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
}
}
4、缓存机制
缓存的存在就是用空间换取时间,如果每次远程调用都要先从注册中心获取一次可调用的服务列表,则会让注册中心承受巨大的流量压力。另外,每次额外的网络请求也会让整个系统的性能下降,Dubbo的注册中心实现了通用的缓存机制,在抽象类AbstractRegistry中实现。AbstractRegistry类结构关系如图:
消费者或服务治理中心获取注册信息后会做本地缓存。内存中会有一份,保存在 Properties 对象里,磁盘上也会持久化一份文件,通过file对象引用。在AbstractRegistry抽象类中有如下定义:
// 本地磁盘缓存,其中特殊key value.registries记录注册中心列表,其他为通知服务提供者列表
private final Properties properties = new Properties();
// 盘文件服务缓存对象
private File file;
// 内存中的服务缓存对象,外层Map的key是消费者的 URL,内层 Map 的 key 是分类,包含 providers、consumers、routes、configurators四种。value则是对应的服务列表,对于没有服务提供者提供服务的URL,它会以特殊的empty://前缀开头
private final ConcurrentMap<URL, Map<String, List<URL>>> notified = new ConcurrentHashMap<>();
4.1 缓存的加载
在服务初始化的时候,AbstractRegistry构造函数里会从本地磁盘文件中把持久化的注册数据读到Properties对象里,并加载到内存缓存中,核心代码如下:
public AbstractRegistry(URL url) {
······
loadProperties();
notify(url.getBackupUrls());
}
private void loadProperties() {
······
// 读取磁盘上的文件并加载到 properties 中
try (InputStream in = Files.newInputStream(file.toPath())) {
properties.load(in);
if (logger.isInfoEnabled()) {
logger.info("Loaded registry cache file " + file);
}
}
······
}
Properties保存了所有服务提供者的URL,使用URL#serviceKey()作为key,提供者列表、 路由规则列表、配置规则列表等作为value。由于value是列表,当存在多个的时候使用空格隔开。还有一个特殊的key.registies,保存所有的注册中心的地址。如果应用在启动过程中,注册中心无法连接或宕机,则Dubbo框架会自动通过本地缓存加载Invoker
4.1 缓存的保存与更新
缓存的保存有同步和异步两种方式。异步会使用线程池异步保存,具体核心代码如下:
private void saveProperties(URL url) {
if (syncSaveFile) {
// 同步保存
doSaveProperties(version);
} else {
// 异步保存
registryCacheExecutor.execute(() -> doSaveProperties(version));
}
}
AbstractRegistry#notify方法中封装了更新内存缓存和更新文件缓存的逻辑,具体代码如下:
/**
* Notify changes from the Provider side.
*
* @param url consumer side url
* @param listener listener
* @param urls provider latest urls
*/
protected void notify(URL url, NotifyListener listener, List<URL> urls) {
if (url == null) {
throw new IllegalArgumentException("notify url == null");
}
if (listener == null) {
throw new IllegalArgumentException("notify listener == null");
}
if ((CollectionUtils.isEmpty(urls)) && !ANY_VALUE.equals(url.getServiceInterface())) {
logger.warn("Ignore empty notify urls for subscribe url " + url);
return;
}
if (logger.isInfoEnabled()) {
logger.info("Notify urls for subscribe url " + url + ", url size: " + urls.size());
}
// keep every provider's category.
Map<String, List<URL>> result = new HashMap<>();
for (URL u : urls) {
if (UrlUtils.isMatch(url, u)) {
String category = u.getCategory(DEFAULT_CATEGORY);
List<URL> categoryList = result.computeIfAbsent(category, k -> new ArrayList<>());
categoryList.add(u);
}
}
if (result.size() == 0) {
return;
}
Map<String, List<URL>> categoryNotified = notified.computeIfAbsent(url, u -> new ConcurrentHashMap<>());
for (Map.Entry<String, List<URL>> entry : result.entrySet()) {
String category = entry.getKey();
List<URL> categoryList = entry.getValue();
categoryNotified.put(category, categoryList);
listener.notify(categoryList);
// 在每次通知后更新缓存文件,当由于网络抖动导致订阅失败时,我们至少可以返回现有的缓存URL
if (localCacheEnabled) {
saveProperties(url);
}
}
}
5、重试机制
FailbackRegistry 继承了 AbstractRegistry,并在此基础上增加了失败重试机制作为抽象能力,FailbackRegistry 抽象类中定义了一个 HashedWheelTimer,每经过固定间隔(默认为5秒)调用AbstractRetryTask#doRetry()方法,AbstractRetryTask类结构信息如下:
/* retry task map */
// 取消订阅失败的监听器集合
private final ConcurrentMap<Holder, FailedUnsubscribedTask> failedUnsubscribed = new ConcurrentHashMap<>();
// 发起订阅失败的监听器集合
private final ConcurrentMap<Holder, FailedSubscribedTask> failedSubscribed = new ConcurrentHashMap<>();
// 取消注册失败的URL集合
private final ConcurrentMap<URL, FailedUnregisteredTask> failedUnregistered = new ConcurrentHashMap<>();
// 发起注册失败的URL集合
private final ConcurrentMap<URL, FailedRegisteredTask> failedRegistered = new ConcurrentHashMap<>();
在定时器中调用retry方法的时候,会把对应集合分别遍历和重试,重试成功则从集合中移除。
6、设计模式
Dubbo注册中心拥有良好的扩展性,用户可以在其基础上,快速开发出符合自己业务需求的注册中心。这种扩展性和Dubbo中使用的设计模式密不可分,注册中心模块使用的设计模式:
- 模板模式:注册中心的逻辑部分使用了模板模式
- 工厂模式:所有的注册中心实现,都是通过对应的工厂创建的
最后
以上就是美好宝马为你收集整理的Dubbo 注册中心详解1、注册中心的工作流程2、注册中心的数据结构3、订阅发布的实现4、缓存机制5、重试机制6、设计模式的全部内容,希望文章能够帮你解决Dubbo 注册中心详解1、注册中心的工作流程2、注册中心的数据结构3、订阅发布的实现4、缓存机制5、重试机制6、设计模式所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
发表评论 取消回复