概述
一、Nacos服务注册流程图
服务注册表serviceMap
/**
* Map(namespace, Map(group::serviceName, Service)).
*/
private final Map<String, Map<String, Service>> serviceMap = new ConcurrentHashMap<>();
二、Nacos服务注册入口
1、SpringBoot自动装配
使用Nacos做为注册中心,首先要在pom中引入:
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
</dependency>
嗯,然后呢?SpringBoot项目大家都知道有一个很重要的东西:spring.factories,这个是针对引入的jar包做自动装配用的;在SpringBoot项目运行时,SpringFactoriesLoader 类会去寻找META-INF/spring.factories
文件,spring.factories用键值对的方式记录了所有需要加入容器的类,key为:org.springframework.boot.autoconfigure.EnableAutoConfiguration
,value为各种xxxAutoConfiguration
;或key为:org.springframework.cloud.bootstrap.BootstrapConfiguration
,value为各种xxxBootstrapConfiguration
。
大家对EnableAutoConfiguration
都比较熟悉了,自动装配的撒;而BootstrapConfiguration
可能会相对陌生一点,它呢是做自定义启动配置的,也就是说:所有的Bean都会在SpingApplicatin启动前加入到它的上下文中。
2、Nacos中的spring.factories
查看引入的nacos-discovery包的层级结构,找到spring.factories文件;
打开这文件瞅一瞅:
上面我有聊到BootstrapConfiguration
是在SpringBoot项目启动时自动加载配置到SpingApplicatin的上下文中。
我们进去点进去看一下这个类NacosDiscoveryClientConfigServiceBootstrapConfiguration
:
emmm,它好像啥也没干,就引入了两个自动配置类:NacosDiscoveryClientAutoConfiguration
,NacosDiscoveryAutoConfiguration
;咱也不知道哪个是和服务注册相关的,先都点进去看看撒。
(1)NacosDiscoveryClientAutoConfiguration:
额额额,从命名来看,好像没有和注册相关的,先看看下一个类吧。
(2)NacosDiscoveryAutoConfiguration:
卧槽,这个好。这个类里能看到用注册register命名的方法;感觉就它了。
3、客户端自动注册入口类NacosDiscoveryClientAutoConfiguration
NacosDiscoveryAutoConfiguration
类内部内部管理了三个类:NacosServiceRegistry
(完成服务注册功能,实现ServiceRegistry接口),NacosRegistration
(注册时用来存储nacos服务端的相关信息),NacosAutoServiceRegistration
(自动注册功能)。
这个自动注册功能是怎么实现的呢?从类的命名上来看,感觉是在NacosAutoServiceRegistration
中的,我们跟进去看一下:
NacosAutoServiceRegistration
类继承自AbstractAutoServiceRegistration
类;
再看AbstractAutoServiceRegistration
类实现了ApplicationListener
接口。ApplicationListener接口实际是一个事件监听器,其监听WebServerInitializedEvent
事件。
在AbstractAutoServiceRegistration
类的bind()方法中会绑定一个事件,启动调用start()方法进行事件的绑定操作。
start()方法中再调用register()方法实现注册功能,具体的register()内部逻辑由子类实现。
这里和AQS一样,是典型模板方法设计模式。
go on,go on;我们接着看ServiceRegistry
接口的实现类,点一下发现直接蹦到了NacosServiceRegistry
类上;感觉对味了。
虽然我们找到了服务注册的入口,但是呢,其实spring-cloud-commons
包中定义了一套服务注册规范,集成Spring Cloud实现服务注册的组件都会实现ServiceRegistry
接口。
4、Nacos服务注册类NacosServiceRegistry
现在我们继续接着NacosServiceRegistry#register()方法来看:
先是组装服务实例信息,然后走入到了NamingService
接口#registerInstance()方法,我跟,我继续往里跟;点到NamingService
接口的实现类NacosNamingService
;
NacosNamingService类在初始化的时候会实例几个比较关键的类:
// 1)发送心跳的
private BeatReactor beatReactor;
// 2)事件分发器:订阅的服务发生改变时,Nacos服务端就会通知到当前Nacos Client端;
// Client端收到这个通知后,将通知的事件给到观察者,也就是我们自己实现的listener
private EventDispatcher eventDispatcher;
// 3)与Nacos服务端通信的
private NamingProxy serverProxy;
NacosNamingService
#registerInstance()方法套娃如下:
@Override
public void registerInstance(String serviceName, Instance instance) throws NacosException {
registerInstance(serviceName, Constants.DEFAULT_GROUP, instance);
}
@Override
public void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException {
if (instance.isEphemeral()) {
// 组装心跳信息
BeatInfo beatInfo = new BeatInfo();
beatInfo.setServiceName(NamingUtils.getGroupedName(serviceName, groupName));
beatInfo.setIp(instance.getIp());
beatInfo.setPort(instance.getPort());
beatInfo.setCluster(instance.getClusterName());
beatInfo.setWeight(instance.getWeight());
beatInfo.setMetadata(instance.getMetadata());
beatInfo.setScheduled(false);
long instanceInterval = instance.getInstanceHeartBeatInterval();
beatInfo.setPeriod(instanceInterval == 0 ? DEFAULT_HEART_BEAT_INTERVAL : instanceInterval);
// 1)发送心跳
beatReactor.addBeatInfo(NamingUtils.getGroupedName(serviceName, groupName), beatInfo);
}
// 2)注册服务
serverProxy.registerService(NamingUtils.getGroupedName(serviceName, groupName), groupName, instance);
}
这里有两个重要的点:
1、如果实例节点是临时节点的话(默认是临时的),会组装一个心跳信息,然后通过
BeatReactor
组件发送心跳到服务端
,也就是服务续约
;
2、调用ServerProxy
组件注册服务
到服务端,即服务上线
。
在聊服务续约和服务注册之前,我们先看一下serviceName的组成,其实就是就是上面传递下来的groupName和serviceName用@@拼接起来作为新的serviceName,以达到Group层的数据隔离。
NamingUtils.getGroupedName(serviceName, groupName)
// NamingUtils
public static String getGroupedName(String serviceName, String groupName) {
return groupName + Constants.SERVICE_INFO_SPLITER + serviceName;
}
1)Client服务续约
即Nacos Client端定时发送心跳到服务端。
在BeatReactor
#addBeatInfo()方法中,会启动一个定时任务,立即执行BeatTask这个任务。
我们接着来看BeatTask这个Runnable接口的实现类,是如何运行的?
我们可以看到它主要展现两个能力:
(1)调用
NamingProxy
向Nacos服务端发送心跳;
(2)再开启一个定时任务,延时5S再持续发送心跳到Nacos服务端,即默认每5s向Nacos服务端发送一次心跳。
发送心跳就很简单了,直接采用HTTP接口调用
的方式,调用服务端的/nacos/v1/ns/instance/beat
接口。
2)Client服务注册
在NamingProxy
#registerService()方法中直接做HTTP请求调用Nacos Server的接口/nacos/v1/ns/instance
做服务注册:
public static String WEB_CONTEXT = "/nacos";
public static String NACOS_URL_BASE = WEB_CONTEXT + "/v1/ns";
public static String NACOS_URL_INSTANCE = NACOS_URL_BASE + "/instance";
5、Nacos Server端如何接收心跳、服务注册请求
上面我们聊到了Nacos Client端分别调用服务端的/nacos/v1/ns/instance/beat
、/nacos/v1/ns/instance
接口进行服务续约和服务注册,下面我们聊一下服务端对这个两个请求是如何处理的?
0)先找入口
Nacos Client通过NamingProxy
类调用Nacos Server,以开源框架的命名规范来看,Nacos Server的源码中应该有个和naming相关命名的模块;
接着往下展开,找到controllers包下的InstanceController
:
1)Server端接收服务注册请求
InstanceController#register()
方法是Nacos Server接收服务注册请求的入口:
@CanDistro
@PostMapping
@Secured(parser = NamingResourceParser.class, action = ActionTypes.WRITE)
public String register(HttpServletRequest request) throws Exception {
final String namespaceId = WebUtils
.optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);
final String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
NamingUtils.checkServiceNameFormat(serviceName);
final Instance instance = HttpRequestInstanceBuilder.newBuilder()
.setDefaultInstanceEphemeral(switchDomain.isDefaultInstanceEphemeral()).setRequest(request).build();
// 真正注册服务的地方,serviceName为注册的服务名,namespaceId默认为public
getInstanceOperator().registerInstance(namespaceId, serviceName, instance);
return "ok";
}
其中namespaceId可以做一层数据隔离;
我们接着往里看getInstanceOperator().registerInstance()
:
InstanceOperator接口有两个实现类,分别是:InstanceOperatorClientImpl
,InstanceOperatorServiceImpl
,由于我这里是作为服务端,我们关注InstanceOperatorServiceImpl
;
1、组装服务的实例信息,包含:IP、Port
2、调用ServiceManager的registerInstanc()方法,完成真正的服务注册操作。
ServiceManager#registerInstanc()方法中主要做四件事:
1、如果服务不存在,则创建一个服务;
2、从本地缓存《Map(namespace, Map(group::serviceName, Service))》中获取服务信息;
3、校验服务不能为null;
4、将服务的实例信息添加到DataStore的dataMap缓存中
(1)创建服务
这里的操作也很简单,套娃套娃套娃,不对是封装、抽象。最终将服务信息保存到本地缓存serviceMap中。
public void createEmptyService(String namespaceId, String serviceName, boolean local) throws NacosException {
createServiceIfAbsent(namespaceId, serviceName, local, null);
}
public void createServiceIfAbsent(String namespaceId, String serviceName, boolean local, Cluster cluster)
throws NacosException {
Service service = getService(namespaceId, serviceName);
if (service == null) {
Loggers.SRV_LOG.info("creating empty service {}:{}", namespaceId, serviceName);
service = new Service();
service.setName(serviceName);
service.setNamespaceId(namespaceId);
service.setGroupName(NamingUtils.getGroupName(serviceName));
// now validate the service. if failed, exception will be thrown
service.setLastModifiedMillis(System.currentTimeMillis());
service.recalculateChecksum();
if (cluster != null) {
cluster.setService(service);
service.getClusterMap().put(cluster.getName(), cluster);
}
service.validate();
// 存储服务信息和初始化,点进去
putServiceAndInit(service);
if (!local) {
addOrReplaceService(service);
}
}
}
private void putServiceAndInit(Service service) throws NacosException {
putService(service);
service = getService(service.getNamespaceId(), service.getName());
// 服务初始化,会做健康检查
service.init();
// 添加两个监听器,使用Raft协议和 Distro协议维护数据一致性的,,包括:Nacos Client感知服务提供者实例变更
consistencyService
.listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), true), service);
consistencyService
.listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), false), service);
Loggers.SRV_LOG.info("[NEW-SERVICE] {}", service.toJson());
}
// ”服务注册“操作,也就是将服务信息保存到本地缓存serviceMap中。
public void putService(Service service) {
if (!serviceMap.containsKey(service.getNamespaceId())) {
serviceMap.putIfAbsent(service.getNamespaceId(), new ConcurrentSkipListMap<>());
}
serviceMap.get(service.getNamespaceId()).putIfAbsent(service.getName(), service);
}
注意:在service.init()初始化服务时,会启动一个定时任务做不健康服务的服务剔除
:
/**
* Service#init()
*/
public void init() {
// 开始一个定时任务,对不健康的服务实例做服务下线/剔除,点进去
HealthCheckReactor.scheduleCheck(clientBeatCheckTask);
for (Map.Entry<String, Cluster> entry : clusterMap.entrySet()) {
entry.getValue().setService(this);
entry.getValue().init();
}
}
@JsonIgnore
private ClientBeatCheckTask clientBeatCheckTask = new ClientBeatCheckTask(this);
走到HealthCheckReactor.scheduleCheck()
方法中,会延时5s开启一个固定延时5s的FixedDelay类型的定时任务执行服务剔除操作;
另外:由于ClientBeatCheckTask
不是NacosHealthCheckTask
的实现类,所以定时任务中执行的方法为ClientBeatCheckTask
中的run()方法;
看一下ClientBeatCheckTask
的run()方法:
通过判断当前时间和实例最后一次心跳时间的间隔是否大于阈值(默认15s),决定是否进行服务剔除/下线;
(2)从本地缓存Map中获取服务信息
从本地缓存serviceMap中获取服务信息,没别的啥操作
public Service getService(String namespaceId, String serviceName) {
// 如果namespaceId没有的话
if (serviceMap.get(namespaceId) == null) {
return null;
}
// 点进去
return chooseServiceMap(namespaceId).get(serviceName);
}
public Map<String, Service> chooseServiceMap(String namespaceId) {
return serviceMap.get(namespaceId);
}
(3)校验服务不能为null
一个单纯的判空处理;
public void checkServiceIsNull(Service service, String namespaceId, String serviceName) throws NacosException {
if (service == null) {
throw new NacosException(NacosException.INVALID_PARAM,
"service not found, namespace: " + namespaceId + ", serviceName: " + serviceName);
}
}
(4)**服务实例信息"持久化"
将服务的相应实例信息保存到DataStore的serviceMap中;由于存在多个服务实例同时注册的场景,所以要加一个synchronized锁。
public void addInstance(String namespaceId, String serviceName, boolean ephemeral, Instance... ips)
throws NacosException {
String key = KeyBuilder.buildInstanceListKey(namespaceId, serviceName, ephemeral);
Service service = getService(namespaceId, serviceName);
synchronized (service) {
List<Instance> instanceList = addIpAddresses(service, ephemeral, ips);
Instances instances = new Instances();
instances.setInstanceList(instanceList);
// 保存服务信息,key为namespaceId和serviceName的结合体,点进去
consistencyService.put(key, instances);
}
}
ConsistencyService
是一个接口,由于我们默认是采用ephemeral
方式(在聊Nacos Client时我们也有提到,服务端见Instance#isEphemeral()
或 SwitchDomain#isDefaultInstanceEphemeral()
),所以以临时Client为例,我们看一下DistroConsistencyServiceImpl
;如果是持久client,则关注RaftConsistencyServiceImpl。
@Override
public void put(String key, Record value) throws NacosException {
// 服务实例信息持久化,点进去
onPut(key, value);
// If upgrade to 2.0.X, do not sync for v1.
if (ApplicationUtils.getBean(UpgradeJudgement.class).isUseGrpcFeatures()) {
return;
}
// Nacos集群间数据同步
distroProtocol.sync(new DistroKey(key, KeyBuilder.INSTANCE_LIST_KEY_PREFIX), DataOperation.CHANGE,
DistroConfig.getInstance().getSyncDelayMillis());
}
进入到DistroConsistencyServiceImpl
类的onPut()方法:
public void onPut(String key, Record value) {
if (KeyBuilder.matchEphemeralInstanceListKey(key)) {
Datum<Instances> datum = new Datum<>();
datum.value = (Instances) value;
datum.key = key;
datum.timestamp.incrementAndGet();
// 往DataStore的dataMap中添加数据,点进去
dataStore.put(key, datum);
}
// 如果listener中没有这个key的话直接返回,key是在创建Service时添加进去的,见ServiceManager#putServiceAndInit()方法
if (!listeners.containsKey(key)) {
return;
}
// 通知Nacos client服务端服务实例信息发生变更,这里是先添加任务;点进去
notifier.addTask(key, DataOperation.CHANGE);
}
1. 往DataStore的dataMap中添加数据
@Component
public class DataStore {
private Map<String, Datum> dataMap = new ConcurrentHashMap<>(1024);
public void put(String key, Datum value) {
// 单纯的往一个Map缓存放一下数据
dataMap.put(key, value);
}
}
2. 进入到Notifier类内部,通知服务实例信息变更
进入到DistroConsistencyServiceImpl的内部类Notifier
,先看其addTask()方法:
最重要的一步将服务实例信息添加到tasks任务队列中,按常理来说到这也就结束了;但是添加到任务队列之后呢,怎么处理嘞?
我们注意到DistroConsistencyServiceImpl
中有一个@PostConstruct
修饰的init()方法,也就是说在DistroConsistencyServiceImpl
类构造器执行之后会执行这个方法启动Notifier通知器
:
我们走入Notifer#run()
方法:
首先从tasks任务丢列中取出任务,调用自己的handle()方法处理任务:
handle()方法中调用监听器listener的onChange()/onDelete()方法执行相应通知数据更新、删除的逻辑。
下面我着重看一下通知数据变更的onChange()方法:
RecordListener
是一个接口,它有三个实现,我们进入到它的实现类Service
中;
着重看一下updateIPs()方法,其他代码不涉及到主链路;
遍历要注册的instance集合,采用一个临时的Map记录clusterName和Instance实例的对应关系,然后更新clusterMap<clusterName, Cluster>:
维护
Cluster
中实例的代码大家感兴趣可以点进去看看,主要思想还是比对新老数据,找出新的instance,与挂掉的instance,最后更新 cluster对象里面的存放临时节点的集合或 存放永久节点的集合
。
最后调用PushService通知单客户端服务信息发生变更。
OK,fine!Nacos服务注册,服务端主要是将服务信息和服务的实例信息保存到两个Map(serviceMap、dataMap)中;启动一个延时5s执行的定时任务做服务剔除/下线;Notifier
做普通服务集群实例信息维护,调用PushService通知客户端服务信息发生变更。
2)Server端接收心跳请求
InstanceController#beat()
方法是Nacos Server接收心跳请求的入口:
其内部调用InstanceOperator
接口的handleBeat()方法做心跳判断;
InstanceOperator
有两个实现类,我们这里讨论的是服务端如何处理客户端的心跳请求,因此我们看InstanceOperatorServiceImpl
类;
InstanceOperatorServiceImpl
#handleBeat():
1、如果服务还没有注册,就先注册;
2、接着获取服务信息,校验服务不能为null;
3、最后,调用Service#processClientBeat()
处理客户端心跳;
Service#processClientBeat()
中启动一个只执行一次的任务:
任务的逻辑体现在ClientBeatProcessor
#run()方法中:
1、更新实例的最后一次心跳时间,进而决定服务是否需要下线/剔除;
2、如果服务之前是不可用的,则设置服务为可用的,并调用PushService
通知客户端;
服务端对于心跳的处理,主要两件事:维护心跳的最后一次时间,如果服务变为可用状态则通知客户端;另外在服务注册创建Service服务时会启动一个固定延时5S执行的定时任务做服务的剔除,其中以心跳时间为根本逻辑。
最后
以上就是无限项链为你收集整理的图文详述Nacos服务注册源码分析一、Nacos服务注册流程图二、Nacos服务注册入口的全部内容,希望文章能够帮你解决图文详述Nacos服务注册源码分析一、Nacos服务注册流程图二、Nacos服务注册入口所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
发表评论 取消回复