概述
nacos源码解读
版本基于2.x
实例注册接口
public class InstanceControllerV2{
//开关类 SwitchManager对该类进行管理
@Autowired
private SwitchDomain switchDomain;
/**
* 实例操作类
* InstanceOperatorClientImpl 2.x版本实现类
* InstanceOperatorServiceImpl 1.x版本实现类
* 都实现接口InstanceOperator
*/
@Autowired
private InstanceOperatorClientImpl instanceServiceV2;
@CanDistro
@PostMapping
@Secured(action = ActionTypes.WRITE)
public String register(@RequestParam(defaultValue = Constants.DEFAULT_NAMESPACE_ID) String namespaceId,
@RequestParam String serviceName, @RequestParam String ip,
@RequestParam(defaultValue = UtilsAndCommons.DEFAULT_CLUSTER_NAME) String cluster,
@RequestParam Integer port, @RequestParam(defaultValue = "true") Boolean healthy,
@RequestParam(defaultValue = "1") Double weight, @RequestParam(defaultValue = "true") Boolean enabled,
@RequestParam String metadata, @RequestParam Boolean ephemeral) throws Exception {
NamingUtils.checkServiceNameFormat(serviceName);
checkWeight(weight);
final Instance instance = InstanceBuilder.newBuilder().setServiceName(serviceName).setIp(ip)
.setClusterName(cluster).setPort(port).setHealthy(healthy).setWeight(weight).setEnabled(enabled)
.setMetadata(UtilsAndCommons.parseMetadata(metadata)).setEphemeral(ephemeral).build();
if (ephemeral == null) {
instance.setEphemeral((switchDomain.isDefaultInstanceEphemeral()));
}
//注册实例
instanceServiceV2.registerInstance(namespaceId, serviceName, instance);
return "ok";
}
}
实例操作类(2.x版本使用InstanceOperatorClientImpl)
public class InstanceOperatorClientImpl implements InstanceOperator {
/**
* 针对客户端的业务类
* EphemeralClientOperationServiceImpl 临时节点业务类
* PersistentClientOperationServiceImpl 持久节点业务类
* ClientOperationServiceProxy 临时、持久节点业务代理类
* 都实现了ClientOperationService接口
*/
private final ClientOperationService clientOperationService;
@Override
public void registerInstance(String namespaceId, String serviceName, Instance instance) {
boolean ephemeral = instance.isEphemeral();
String clientId = IpPortBasedClient.getClientId(instance.toInetAddr(), ephemeral);
createIpPortClientIfAbsent(clientId);
Service service = getService(namespaceId, serviceName, ephemeral);
//注册实例
clientOperationService.registerInstance(service, instance, clientId);
}
}
临时节点业务类
public class EphemeralClientOperationServiceImpl implements ClientOperationService {
private final ClientManager clientManager;
@Override
public void registerInstance(Service service, Instance instance, String clientId) {
Service singleton = ServiceManager.getInstance().getSingleton(service);
if (!singleton.isEphemeral()) {
throw new NacosRuntimeException(NacosException.INVALID_PARAM,
String.format("Current service %s is persistent service, can't register ephemeral instance.",
singleton.getGroupedServiceName()));
}
Client client = clientManager.getClient(clientId);
if (!clientIsLegal(client, clientId)) {
return;
}
InstancePublishInfo instanceInfo = getPublishInfo(instance);
client.addServiceInstance(singleton, instanceInfo);
client.setLastUpdatedTime();
//发布 客户端注册服务事件
NotifyCenter.publishEvent(new ClientOperationEvent.ClientRegisterServiceEvent(singleton, clientId));
//发布 实例元数据事件
NotifyCenter
.publishEvent(new MetadataEvent.InstanceMetadataEvent(singleton, instanceInfo.getMetadataId(), false));
}
}
客户端注册服务事件 订阅者
public class ClientServiceIndexesManager extends SmartSubscriber {
//收到 客户端注册服务事件 后调用方法
private void addPublisherIndexes(Service service, String clientId) {
publisherIndexes.computeIfAbsent(service, (key) -> new ConcurrentHashSet<>());
publisherIndexes.get(service).add(clientId);
//发布 服务变更事件
NotifyCenter.publishEvent(new ServiceEvent.ServiceChangedEvent(service, true));
}
}
服务变更事件 订阅者1
public class NamingSubscriberServiceV2Impl extends SmartSubscriber implements NamingSubscriberService {
@Override
public void onEvent(Event event) {
if (!upgradeJudgement.isUseGrpcFeatures()) {
return;
}
if (event instanceof ServiceEvent.ServiceChangedEvent) {
// If service changed, push to all subscribers.
//服务变更事件 走这个分支
ServiceEvent.ServiceChangedEvent serviceChangedEvent = (ServiceEvent.ServiceChangedEvent) event;
Service service = serviceChangedEvent.getService();
delayTaskEngine.addTask(service, new PushDelayTask(service, PushConfig.getInstance().getPushTaskDelay()));
} else if (event instanceof ServiceEvent.ServiceSubscribedEvent) {
// If service is subscribed by one client, only push this client.
ServiceEvent.ServiceSubscribedEvent subscribedEvent = (ServiceEvent.ServiceSubscribedEvent) event;
Service service = subscribedEvent.getService();
//异步执行变更任务 任务处理器使用的是PushDelayTaskProcessor
//这里使用到了 统一的任务异步处理机制 后边单独学习
delayTaskEngine.addTask(service, new PushDelayTask(service, PushConfig.getInstance().getPushTaskDelay(),
subscribedEvent.getClientId()));
}
}
}
PushDelayTaskProcessor 任务处理器
private static class PushDelayTaskProcessor implements NacosTaskProcessor {
private final PushDelayTaskExecuteEngine executeEngine;
public PushDelayTaskProcessor(PushDelayTaskExecuteEngine executeEngine) {
this.executeEngine = executeEngine;
}
@Override
public boolean process(NacosTask task) {
PushDelayTask pushDelayTask = (PushDelayTask) task;
Service service = pushDelayTask.getService();
//异步推送数据到客户端
NamingExecuteTaskDispatcher.getInstance()
.dispatchAndExecuteTask(service, new PushExecuteTask(service, executeEngine, pushDelayTask));
return true;
}
}
异步推送数据到客户端
public class PushExecuteTask extends AbstractExecuteTask {
@Override
public void run() {
try {
PushDataWrapper wrapper = generatePushData();
ClientManager clientManager = delayTaskEngine.getClientManager();
for (String each : getTargetClientIds()) {
Client client = clientManager.getClient(each);
if (null == client) {
// means this client has disconnect
continue;
}
Subscriber subscriber = clientManager.getClient(each).getSubscriber(service);
delayTaskEngine.getPushExecutor().doPushWithCallback(each, subscriber, wrapper,
new NamingPushCallback(each, subscriber, wrapper.getOriginalData(), delayTask.isPushToAll()));
}
} catch (Exception e) {
Loggers.PUSH.error("Push task for service" + service.getGroupedServiceName() + " execute failed ", e);
delayTaskEngine.addTask(service, new PushDelayTask(service, 1000L));
}
}
}
客户端接收处理
public class NamingPushRequestHandler implements ServerRequestHandler {
private final ServiceInfoHolder serviceInfoHolder;
public NamingPushRequestHandler(ServiceInfoHolder serviceInfoHolder) {
this.serviceInfoHolder = serviceInfoHolder;
}
@Override
public Response requestReply(Request request) {
if (request instanceof NotifySubscriberRequest) {
NotifySubscriberRequest notifyResponse = (NotifySubscriberRequest) request;
serviceInfoHolder.processServiceInfo(notifyResponse.getServiceInfo());
return new NotifySubscriberResponse();
}
return null;
}
}
public class ServiceInfoHolder implements Closeable {
public ServiceInfo processServiceInfo(ServiceInfo serviceInfo) {
String serviceKey = serviceInfo.getKey();
if (serviceKey == null) {
return null;
}
ServiceInfo oldService = serviceInfoMap.get(serviceInfo.getKey());
if (isEmptyOrErrorPush(serviceInfo)) {
//empty or error push, just ignore
return oldService;
}
serviceInfoMap.put(serviceInfo.getKey(), serviceInfo);
boolean changed = isChangedServiceInfo(oldService, serviceInfo);
if (StringUtils.isBlank(serviceInfo.getJsonFromServer())) {
serviceInfo.setJsonFromServer(JacksonUtils.toJson(serviceInfo));
}
MetricsMonitor.getServiceInfoMapSizeMonitor().set(serviceInfoMap.size());
if (changed) {
NAMING_LOGGER.info("current ips:({}) service: {} -> {}", serviceInfo.ipCount(), serviceInfo.getKey(),
JacksonUtils.toJson(serviceInfo.getHosts()));
NotifyCenter.publishEvent(new InstancesChangeEvent(serviceInfo.getName(), serviceInfo.getGroupName(),
serviceInfo.getClusters(), serviceInfo.getHosts()));
DiskCache.write(serviceInfo, cacheDir);
}
return serviceInfo;
}
}
服务变更事件 订阅者2
@Component
public class DoubleWriteEventListener extends Subscriber<ServiceEvent.ServiceChangedEvent> {
@Override
public void onEvent(ServiceEvent.ServiceChangedEvent event) {
if (stopDoubleWrite) {
return;
}
if (!upgradeJudgement.isUseGrpcFeatures()) {
return;
}
String taskKey = ServiceChangeV2Task.getKey(event.getService());
//进行副本数据同步
ServiceChangeV2Task task = new ServiceChangeV2Task(event.getService(), DoubleWriteContent.INSTANCE);
doubleWriteDelayTaskEngine.addTask(taskKey, task);
}
}
源码阅读理解
统一的事件通知机制
/**
* 通知中心类可以发布事件,注册订阅者
*/
public class NotifyCenter {
private static final NotifyCenter INSTANCE = new NotifyCenter();
/**
*
EventPublisher实现原理为线程+阻塞队列,EventPublisher线程定时拉取队列通知订阅者。
*/
private final Map<String, EventPublisher> publisherMap = new ConcurrentHashMap<>(16);
//注册订阅者
//根据订阅者的注册类型找到对应的事件发布者,在该发布者上添加订阅者
public static void registerSubscriber(final Subscriber consumer, final EventPublisherFactory factory) {
// If you want to listen to multiple events, you do it separately,
// based on subclass's subscribeTypes method return list, it can register to publisher.
if (consumer instanceof SmartSubscriber) {
for (Class<? extends Event> subscribeType : ((SmartSubscriber) consumer).subscribeTypes()) {
// For case, producer: defaultSharePublisher -> consumer: smartSubscriber.
if (ClassUtils.isAssignableFrom(SlowEvent.class, subscribeType)) {
INSTANCE.sharePublisher.addSubscriber(consumer, subscribeType);
} else {
// For case, producer: defaultPublisher -> consumer: subscriber.
addSubscriber(consumer, subscribeType, factory);
}
}
return;
}
final Class<? extends Event> subscribeType = consumer.subscribeType();
if (ClassUtils.isAssignableFrom(SlowEvent.class, subscribeType)) {
INSTANCE.sharePublisher.addSubscriber(consumer, subscribeType);
return;
}
addSubscriber(consumer, subscribeType, factory);
}
//发布事件
private static boolean publishEvent(final Class<? extends Event> eventType, final Event event) {
if (ClassUtils.isAssignableFrom(SlowEvent.class, eventType)) {
return INSTANCE.sharePublisher.publish(event);
}
final String topic = ClassUtils.getCanonicalName(eventType);
EventPublisher publisher = INSTANCE.publisherMap.get(topic);
if (publisher != null) {
return publisher.publish(event);
}
LOGGER.warn("There are no [{}] publishers for this event, please register", topic);
return false;
}
//注册事件发布者
public static EventPublisher registerToPublisher(final Class<? extends Event> eventType,
final EventPublisherFactory factory, final int queueMaxSize) {
if (ClassUtils.isAssignableFrom(SlowEvent.class, eventType)) {
return INSTANCE.sharePublisher;
}
final String topic = ClassUtils.getCanonicalName(eventType);
synchronized (NotifyCenter.class) {
// MapUtils.computeIfAbsent is a unsafe method.
MapUtil.computeIfAbsent(INSTANCE.publisherMap, topic, factory, eventType, queueMaxSize);
}
return INSTANCE.publisherMap.get(topic);
}
}
1、通知中心类使用单例,可以通过静态方法调用的方式进行发布、订阅。
2、通知中心类有三个重要方法
- 注册发布者,发布者类是一个线程类,监听内部的阻塞队列,当队列有数据时,通知订阅者执行。
- 注册订阅者,根据订阅事件类型注册到对应的发布者下。
- 发布事件,调用发布方法会往阻塞队列中加数据。
最后
以上就是敏感钢笔为你收集整理的nacos-注册中心服务注册接口及订阅通知类实例注册接口源码阅读理解的全部内容,希望文章能够帮你解决nacos-注册中心服务注册接口及订阅通知类实例注册接口源码阅读理解所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
发表评论 取消回复