概述
1. 概述
本文分享 dubbo-registry-api 模块,注册中心模块:基于注册中心下发地址的集群方式,以及对各种注册中心的抽象。
2. 抽象 API
2.1 RegistryFactory
注册中心工厂接口,代码如下:
/**
* 注册中心工厂
*/
@SPI("dubbo")
public interface RegistryFactory {
/**
* 连接注册中心.
* <p>
* 连接注册中心需处理契约:<br>
* 1. 当设置check=false时表示不检查连接,否则在连接不上时抛出异常。<br>
* 2. 支持URL上的username:password权限认证。<br>
* 3. 支持backup=10.20.153.10备选注册中心集群地址。<br>
* 4. 支持file=registry.cache本地磁盘文件缓存。<br>
* 5. 支持timeout=1000请求超时设置。<br>
* 6. 支持session=60000会话超时或过期设置。<br>
*
* @param url 注册中心地址,不允许为空
* @return 注册中心引用,总不返回空
*/
@Adaptive({"protocol"})
Registry getRegistry(URL url);
}
2.2 AbstractRegistryFactory
实现 RegistryFactory 接口,RegistryFactory 抽象类,实现了 Registry 的容器管理。
2.2.1 属性
// The lock for the acquisition process of the registry
private static final ReentrantLock LOCK = new ReentrantLock();
/**
* Registry 集合
*
* key:{@link URL#toServiceString()}
*/
// Registry Collection Map<RegistryAddress, Registry>
private static final Map<String, Registry> REGISTRIES = new ConcurrentHashMap<String, Registry>();
2.2.2 createRegistry
创建 Registry 对象。代码如下:
/**
* 创建 Registry 对象
*
* @param url 注册中心地址
* @return Registry 对象
*/
protected abstract Registry createRegistry(URL url);
子类实现该方法,创建其对应的 Registry 实现类。例如,ZookeeperRegistryFactory 的该方法,创建 ZookeeperRegistry 对象。
2.2.3 getRegistry
获得注册中心 Registry 对象。优先从缓存中获取,否则进行创建。
/**
* 获得注册中心 Registry 对象
*
* @param url 注册中心地址,不允许为空
* @return Registry 对象
*/
@Override
public Registry getRegistry(URL url) {
// 修改 URL
url = url.setPath(RegistryService.class.getName()) // + `path`
.addParameter(Constants.INTERFACE_KEY, RegistryService.class.getName()) // + `parameters.interface`
.removeParameters(Constants.EXPORT_KEY, Constants.REFER_KEY); // - `export`
// 计算 key
String key = url.toServiceString();
// 获得锁
// Lock the registry access process to ensure a single instance of the registry
LOCK.lock();
try {
// 从缓存中获得 Registry 对象
Registry registry = REGISTRIES.get(key);
if (registry != null) {
return registry;
}
// 缓存不存在,进行创建 Registry 对象
registry = createRegistry(url);
if (registry == null) {
throw new IllegalStateException("Can not create registry " + url);
}
// 添加到缓存
REGISTRIES.put(key, registry);
return registry;
} finally {
// 释放锁
// Release the lock
LOCK.unlock();
}
}
2.2.4 destroyAll
销毁所有 Registry 对象。
/**
* 销毁所有 Registry
*
* Close all created registries
*/
public static void destroyAll() {
if (LOGGER.isInfoEnabled()) {
LOGGER.info("Close all registries " + getRegistries());
}
// 获得锁
// Lock up the registry shutdown process
LOCK.lock();
try {
// 销毁
for (Registry registry : getRegistries()) {
try {
registry.destroy();
} catch (Throwable e) {
LOGGER.error(e.getMessage(), e);
}
}
// 清空缓存
REGISTRIES.clear();
} finally {
// 释放锁
// Release the lock
LOCK.unlock();
}
}
2.3 RegistryService
注册中心服务接口,定义了注册、订阅、查询三种操作方法
public interface RegistryService {
/**
* 注册数据,比如:提供者地址,消费者地址,路由规则,覆盖规则,等数据。
* <p>
* 注册需处理契约:<br>
* 1. 当URL设置了check=false时,注册失败后不报错,在后台定时重试,否则抛出异常。<br>
* 2. 当URL设置了dynamic=false参数,则需持久存储,否则,当注册者出现断电等情况异常退出时,需自动删除。<br>
* 3. 当URL设置了category=routers时,表示分类存储,缺省类别为providers,可按分类部分通知数据。<br>
* 4. 当注册中心重启,网络抖动,不能丢失数据,包括断线自动删除数据。<br>
* 5. 允许URI相同但参数不同的URL并存,不能覆盖。<br>
*/
void register(URL url);
/**
* 取消注册.
* <p>
* 取消注册需处理契约:<br>
* 1. 如果是dynamic=false的持久存储数据,找不到注册数据,则抛IllegalStateException,否则忽略。<br>
* 2. 按全URL匹配取消注册。<br>
*/
void unregister(URL url);
/**
* 订阅符合条件的已注册数据,当有注册数据变更时自动推送.
* <p>
* 订阅需处理契约:<br>
* 1. 当URL设置了check=false时,订阅失败后不报错,在后台定时重试。<br>
* 2. 当URL设置了category=routers,只通知指定分类的数据,多个分类用逗号分隔,并允许星号通配,表示订阅所有分类数据。<br>
* 3. 允许以interface,group,version,classifier作为条件查询,如:interface=com.alibaba.foo.BarService&version=1.0.0<br>
* 4. 并且查询条件允许星号通配,订阅所有接口的所有分组的所有版本,或:interface=*&group=*&version=*&classifier=*<br>
* 5. 当注册中心重启,网络抖动,需自动恢复订阅请求。<br>
* 6. 允许URI相同但参数不同的URL并存,不能覆盖。<br>
* 7. 必须阻塞订阅过程,等第一次通知完后再返回。<br>
*/
void subscribe(URL url, NotifyListener listener);
/**
* 取消订阅.
* <p>
* 取消订阅需处理契约:<br>
* 1. 如果没有订阅,直接忽略。<br>
* 2. 按全URL匹配取消订阅。<br>
*/
void unsubscribe(URL url, NotifyListener listener);
/**
* 查询符合条件的已注册数据,与订阅的推模式相对应,这里为拉模式,只返回一次结果。
*/
List<URL> lookup(URL url);
2.4 Registry
注册中心接口。Registry 继承了RegistryService 和Node 接口
/**
* 注册中心接口
*/
public interface Registry extends Node, RegistryService {
}
2.5 AbstractRegistry
实现 Registry 接口,Registry 抽象类
2.5.1 属性
// URL地址分隔符,用于文件缓存中,服务提供者URL分隔
// URL address separator, used in file cache, service provider URL separation
private static final char URL_SEPARATOR = ' ';
// URL地址分隔正则表达式,用于解析文件缓存中服务提供者URL列表
// URL address separated regular expression for parsing the service provider URL list in the file cache
private static final String URL_SPLIT = "\s+";
// Log output
protected final Logger logger = LoggerFactory.getLogger(getClass());
/**
* 本地磁盘缓存。
*
* 1. 其中特殊的 key 值 .registies 记录注册中心列表 TODO 8019 芋艿,特殊的 key 是
* 2. 其它均为 {@link #notified} 服务提供者列表
*/
// Local disk cache, where the special key value.registies records the list of registry centers, and the others are the list of notified service providers
private final Properties properties = new Properties();
/**
* 注册中心缓存写入执行器。
*
* 线程数=1
*/
// File cache timing writing
private final ExecutorService registryCacheExecutor = Executors.newFixedThreadPool(1, new NamedThreadFactory("DubboSaveRegistryCache", true));
/**
* 是否同步保存文件
*/
// Is it synchronized to save the file
private final boolean syncSaveFile;
/**
* 数据版本号
*
* {@link #properties}
*/
private final AtomicLong lastCacheChanged = new AtomicLong();
/**
* 已注册 URL 集合。
*
* 注意,注册的 URL 不仅仅可以是服务提供者的,也可以是服务消费者的
*/
private final Set<URL> registered = new ConcurrentHashSet<URL>();
/**
* 订阅 URL 的监听器集合
*
* key:订阅者的 URL ,例如消费者的 URL
*/
private final ConcurrentMap<URL, Set<NotifyListener>> subscribed = new ConcurrentHashMap<URL, Set<NotifyListener>>();
/**
* 被通知的 URL 集合
*
* key1:消费者的 URL ,例如消费者的 URL ,和 {@link #subscribed} 的键一致
* key2:分类,例如:providers、consumers、routes、configurators。【实际无 consumers ,因为消费者不会去订阅另外的消费者的列表】
* 在 {@link Constants} 中,以 "_CATEGORY" 结尾
*/
private final ConcurrentMap<URL, Map<String, List<URL>>> notified = new ConcurrentHashMap<URL, Map<String, List<URL>>>();
/**
* 注册中心 URL
*/
private URL registryUrl;
/**
* 本地磁盘缓存文件,缓存注册中心的数据
*/
// Local disk cache file
private File file;
/**
* 是否销毁
*/
private AtomicBoolean destroyed = new AtomicBoolean(false);
public AbstractRegistry(URL url) {
setUrl(url);
// Start file save timer
syncSaveFile = url.getParameter(Constants.REGISTRY_FILESAVE_SYNC_KEY, false);
// 获得 `file`
String filename = url.getParameter(Constants.FILE_KEY, System.getProperty("user.home") + "/.dubbo/dubbo-registry-" + url.getParameter(Constants.APPLICATION_KEY) + "-" + url.getAddress() + ".cache");
File file = null;
if (ConfigUtils.isNotEmpty(filename)) {
file = new File(filename);
if (!file.exists() && file.getParentFile() != null && !file.getParentFile().exists()) {
if (!file.getParentFile().mkdirs()) {
throw new IllegalArgumentException("Invalid registry store file " + file + ", cause: Failed to create directory " + file.getParentFile() + "!");
}
}
}
this.file = file;
// 加载本地磁盘缓存文件到内存缓存
loadProperties();
// 通知监听器,URL 变化结果
notify(url.getBackupUrls());
}
2.5.2register && unregister
注册和移除 注册中心url
@Override
public void register(URL url) {
if (url == null) {
throw new IllegalArgumentException("register url == null");
}
if (logger.isInfoEnabled()) {
logger.info("Register: " + url);
}
// 添加到 registered 集合
registered.add(url);
}
@Override
public void unregister(URL url) {
if (url == null) {
throw new IllegalArgumentException("unregister url == null");
}
if (logger.isInfoEnabled()) {
logger.info("Unregister: " + url);
}
// 移除出 registered 集合
registered.remove(url);
}
2.5.3 subscribe && unsubscribe
订阅和取消订阅注册中 url
@Override
public void subscribe(URL url, NotifyListener listener) {
if (url == null) {
throw new IllegalArgumentException("subscribe url == null");
}
if (listener == null) {
throw new IllegalArgumentException("subscribe listener == null");
}
if (logger.isInfoEnabled()) {
logger.info("Subscribe: " + url);
}
// 添加到 subscribed 集合
Set<NotifyListener> listeners = subscribed.get(url);
if (listeners == null) {
subscribed.putIfAbsent(url, new ConcurrentHashSet<NotifyListener>());
listeners = subscribed.get(url);
}
listeners.add(listener);
}
@Override
public void unsubscribe(URL url, NotifyListener listener) {
if (url == null) {
throw new IllegalArgumentException("unsubscribe url == null");
}
if (listener == null) {
throw new IllegalArgumentException("unsubscribe listener == null");
}
if (logger.isInfoEnabled()) {
logger.info("Unsubscribe: " + url);
}
// 移除出 subscribed 集合
Set<NotifyListener> listeners = subscribed.get(url);
if (listeners != null) {
listeners.remove(listener);
}
}
2.5.4 notify
第一,向注册中心发起订阅后,会获取到全量数据,此时会被调用 #notify(…) 方法,即 Registry 获取到了全量数据。
第二,每次注册中心发生变更时,会调用 #notify(…) 方法,虽然变化是增量,调用这个方法的调用方,已经进行处理,传入的 urls 依然是全量的。
/**
* 通知监听器,URL 变化结果。
*
* @param urls 通知的 URL 变化结果(全量数据)
*/
protected void notify(List<URL> urls) {
if (urls == null || urls.isEmpty()) return;
// 循环 `subscribed` ,通知监听器们
for (Map.Entry<URL, Set<NotifyListener>> entry : getSubscribed().entrySet()) {
URL url = entry.getKey();
// 匹配
if (!UrlUtils.isMatch(url, urls.get(0))) {
continue;
}
// 通知监听器
Set<NotifyListener> listeners = entry.getValue();
if (listeners != null) {
for (NotifyListener listener : listeners) {
try {
notify(url, listener, filterEmpty(url, urls));
} catch (Throwable t) {
logger.error("Failed to notify registry event, urls: " + urls + ", cause: " + t.getMessage(), t);
}
}
}
}
}
2.6 FailbackRegistry
实现 AbstractRegistry 抽象类,支持失败重试的 Registry 抽象类。
在上文中的代码中,我们可以看到,AbstractRegistry 进行的注册、订阅等操作,更多的是修改状态,而无和注册中心实际的操作。FailbackRegistry 在 AbstractRegistry 的基础上,实现了和注册中心实际的操作,并且支持失败重试的特性。
2.6.1 属性
/**
* 定时任务执行器
*/
// Scheduled executor service
private final ScheduledExecutorService retryExecutor = Executors.newScheduledThreadPool(1, new NamedThreadFactory("DubboRegistryFailedRetryTimer", true));
/**
* 失败重试定时器,定时检查是否有请求失败,如有,无限次重试
*/
// Timer for failure retry, regular check if there is a request for failure, and if there is, an unlimited retry
private final ScheduledFuture<?> retryFuture;
/**
* 失败发起注册失败的 URL 集合
*/
private final Set<URL> failedRegistered = new ConcurrentHashSet<URL>();
/**
* 失败取消注册失败的 URL 集合
*/
private final Set<URL> failedUnregistered = new ConcurrentHashSet<URL>();
/**
* 失败发起订阅失败的监听器集合
*/
private final ConcurrentMap<URL, Set<NotifyListener>> failedSubscribed = new ConcurrentHashMap<URL, Set<NotifyListener>>();
/**
* 失败取消订阅失败的监听器集合
*/
private final ConcurrentMap<URL, Set<NotifyListener>> failedUnsubscribed = new ConcurrentHashMap<URL, Set<NotifyListener>>();
/**
* 失败通知通知的 URL 集合
*/
private final ConcurrentMap<URL, Map<NotifyListener, List<URL>>> failedNotified = new ConcurrentHashMap<URL, Map<NotifyListener, List<URL>>>();
/**
* 是否销毁
*/
private AtomicBoolean destroyed = new AtomicBoolean(false);
public FailbackRegistry(URL url) {
super(url);
// 重试频率,单位:毫秒
int retryPeriod = url.getParameter(Constants.REGISTRY_RETRY_PERIOD_KEY, Constants.DEFAULT_REGISTRY_RETRY_PERIOD);
// 创建失败重试定时器
this.retryFuture = retryExecutor.scheduleWithFixedDelay(new Runnable() {
public void run() {
// Check and connect to the registry
try {
retry();
} catch (Throwable t) { // Defensive fault tolerance
logger.error("Unexpected error occur at failed retry, cause: " + t.getMessage(), t);
}
}
}, retryPeriod, retryPeriod, TimeUnit.MILLISECONDS);
}
2.7 NotifyListener
通知监听器。当收到服务变更通知时触发,代码如下
public interface NotifyListener {
/**
* 当收到服务变更通知时触发。
* <p>
* 通知需处理契约:<br>
* 1. 总是以服务接口和数据类型为维度全量通知,即不会通知一个服务的同类型的部分数据,用户不需要对比上一次通知结果。<br>
* 2. 订阅时的第一次通知,必须是一个服务的所有类型数据的全量通知。<br>
* 3. 中途变更时,允许不同类型的数据分开通知,比如:providers, consumers, routers, overrides,允许只通知其中一种类型,但该类型的数据必须是全量的,不是增量的。<br>
* 4. 如果一种类型的数据为空,需通知一个empty协议并带category参数的标识性URL数据。<br>
* 5. 通知者(即注册中心实现)需保证通知的顺序,比如:单线程推送,队列串行化,带版本对比。<br>
*
* @param urls 已注册信息列表,总不为空,含义同{@link com.alibaba.dubbo.registry.RegistryService#lookup(URL)}的返回值。
*/
void notify(List<URL> urls);
}
2.8 ProviderConsumerRegTable
服务提供者和消费者注册表,存储 JVM 进程内自己的服务提供者和消费者的 Invoker
/**
* 服务提供者 Invoker 集合
*
* key:服务提供者 URL 服务键
*/
public static ConcurrentHashMap<String, Set<ProviderInvokerWrapper>> providerInvokers = new ConcurrentHashMap<String, Set<ProviderInvokerWrapper>>();
/**
* 服务消费者 Invoker 集合
*
* key:服务消费者 URL 服务键
*/
public static ConcurrentHashMap<String, Set<ConsumerInvokerWrapper>> consumerInvokers = new ConcurrentHashMap<String, Set<ConsumerInvokerWrapper>>();
2.8.1 ProviderInvokerWrapper
实现 Invoker 接口,服务提供者 Invoker Wrapper ,代码如下:
/**
* Invoker 对象
*/
private Invoker<T> invoker;
/**
* 原始 URL
*/
private URL originUrl;
/**
* 注册中心 URL
*/
private URL registryUrl;
/**
* 服务提供者 URL
*/
private URL providerUrl;
/**
* 是否注册
*/
private volatile boolean isReg;
2.8.2 ConsumerInvokerWrapper
/**
* Invoker 对象
*/
private Invoker<T> invoker;
/**
* 原始 URL
*/
private URL originUrl;
/**
* 注册中心 URL
*/
private URL registryUrl;
/**
* 消费者 URL
*/
private URL consumerUrl;
/**
* 注册中心 Directory
*/
private RegistryDirectory registryDirectory;
最后
以上就是谦让故事为你收集整理的精尽 Dubbo 源码分析 —— 注册中心抽象 API1. 概述2. 抽象 API的全部内容,希望文章能够帮你解决精尽 Dubbo 源码分析 —— 注册中心抽象 API1. 概述2. 抽象 API所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
发表评论 取消回复