我是靠谱客的博主 危机秋天,最近开发中收集的这篇文章主要介绍【Nacos源码篇(四)】EventPublisher源码分析,觉得挺不错的,现在分享给大家,希望可以做个参考。

概述

文章目录

    • 先看看接口
    • 接着细化其源码
      • 先看看其具体有哪些属性吧
      • init
      • 启动事件发布者后又做了啥
      • 向发布者添加事件
      • notifySubscriber

承上启下, 从NotifyCenter的publisher.publish(event)开始,进入事件发布者的源码分析
当然,以下均为个人的理解哈~!

先看看接口

public interface EventPublisher extends Closeable {
/**
* 初始化事件发布者/Initializes the event publisher.
*/
void init(Class<? extends Event> type, int bufferSize);
/**
* 事件队列长度/The number of currently staged events.
*/
long currentEventSize();
/**
* 添加订阅者/Add listener.
*/
void addSubscriber(Subscriber subscriber);
/**
* 移除订阅者/Remove listener.
*/
void removeSubscriber(Subscriber subscriber);
/**
* 发布事件/publish event.
*/
boolean publish(Event event);
/**
* 通知订阅者/Notify listener.
*/
void notifySubscriber(Subscriber subscriber, Event event);
}

⚡️继承自Closeable,事件发布者通过实现shutdown方法,来停止事件的发布(NotifyCenter进行shutdown的时候会进行调用)
⚡️ 接着看看这个接口抽象了哪些行为,就能大致了解这个事件发布者接口实现了哪些功能

接着细化其源码

先看看其具体有哪些属性吧

public class NamingEventPublisher extends Thread implements ShardedEventPublisher {
private static final String THREAD_NAME = "naming.publisher-";
private static final int DEFAULT_WAIT_TIME = 60;
private final Map<Class<? extends Event>, Set<Subscriber<? extends Event>>> subscribes = new ConcurrentHashMap<>();
private volatile boolean initialized = false;
private volatile boolean shutdown = false;
private int queueMaxSize = -1;
private BlockingQueue<Event> queue;
private String publisherName;
}

⚡️继承了Thread接口,那我可以理解成事件发布者本质是一个线程
⚡️Map<Class<? extends Event>, Set<Subscriber<? extends Event>>> subscribes是一个ConcurrentHashMap,其key是一个class,也就是说是一类事件为key,其value是一个订阅者的集合,Subscriber接口会在下一篇中分析,这里就当做事件的订阅者就行,所以这个map就是事件发布者的一个核心,在这里,保存了事件的订阅者的集合
⚡️queue是一个阻塞队列泛型是一个事件,可以暂时理解成消息暂存队列(like local mq),下文会具体阐述

init

看init方法的代码之前,我们先看看init方法是在哪里被调用的,init顾名思义就是初始化,咱从源头开始


@Override
public EventPublisher apply(final Class<? extends Event> eventType, final Integer maxQueueSize) {
Class<? extends Event> cachedEventType =
eventType.isMemberClass() ? (Class<? extends Event>) eventType.getEnclosingClass() : eventType;
publisher.computeIfAbsent(cachedEventType, eventClass -> {
NamingEventPublisher result = new NamingEventPublisher();
// here
result.init(eventClass, maxQueueSize);
return result;
});
return publisher.get(cachedEventType);
}

⚡️ 眼熟的话(因为这块代码在NotifyCenter源码分析中有贴过),这块代码是在publishmap添加订阅者的时候(如果事件发布者未初始化),就会调用EventPublisherFactory的apply实现,创建事件发布者,此时就会执行它的init()方法

接下来看看init的源码


@Override
public void init(Class<? extends Event> type, int bufferSize) {
this.queueMaxSize = bufferSize;
this.queue = new ArrayBlockingQueue<>(bufferSize);
this.publisherName = type.getSimpleName();
super.setName(THREAD_NAME + this.publisherName);
super.setDaemon(true);
super.start();
initialized = true;
}

⚡️ 其实没啥好看的,初始化了一下阻塞队列queue的大小,因为消息发布者本质又是一个Thread,于是在初始化的时候调用了其start方法,并给当前线程命了个名,由此,事件发布者在其执行init方法的时候,就开始了它的线程之旅

接着,看看其启动事件发布者后都做了些什么?

启动事件发布者后又做了啥


private void handleEvents() {
while (!shutdown) {
try {
// 取出事件
final Event event = queue.take();
// 事件处理
handleEvent(event);
} catch (InterruptedException e) {
// ..
}
}
}
private void handleEvent(Event event) {
Class<? extends Event> eventType = event.getClass();
Set<Subscriber<? extends Event>> subscribers = subscribes.get(eventType);
if (null == subscribers) {
// ..
}
// 给每个订阅者发送通知
for (Subscriber subscriber : subscribers) {
notifySubscriber(subscriber, event);
}
}

⚡️ 在其run方法里面,执行的核心方法是handleEvents,于是直接跳到这里来,这里通过一个while循环来保持发布者这个线程的持续在线,终止条件是其shutdown状态
⚡️ 通过阻塞队列来获取事件,然后再发布事件,这样当队列中无事件的时候,线程就会阻塞在这里,不会造成资源的浪费
⚡️ 接着开始handleEvent,也就是处理我们的事件,逻辑很清晰明了,拿到当前事件的Clz然后从subscribes(保存订阅者的map)中获取到订阅者的集合,接着一个for循环给每个订阅者们发送通知,notifySubscriber的具体代码会在后面贴出

所以,在事件发布者初始化的时候,就被当做一个线程,然后放在了NotifyCenter的publishmap里面,并且直接调用了其start方法启动了它,然后其内部保存了一个阻塞队列,用来暂存事件,当监听到里面的阻塞队列来了事件之后,就会消费它(挺像MQ的)

向发布者添加事件

上面描述了事件的保存位置,这里贴一下事件是怎么被添加订阅者的


@Override
public boolean publish(Event event) {
checkIsStart();
// 将事件放到阻塞队列
boolean success = this.queue.offer(event);
if (!success) {
handleEvent(event);
return true;
}
return true;
}

⚡️ 逻辑简单,就是往阻塞队列queue塞入了事件,如果成功就过,不成功的话打了个日志就直接处理这个事件,而不是放入阻塞队列了
⚡️ 如果不记得事件添加的源头的话,这里贴一下代码 ???? NotifyCenter#publishEvent{publisher.publish(event)}

notifySubscriber

就是在这里,进行了事件的通知,也是NotifyCenter调用publish(event)方法的落地


@Override
public void notifySubscriber(Subscriber subscriber, Event event) {
// ..
// 执行订阅者们的onEvent方法,即每个订阅者对于该事件的处理逻辑
final Runnable job = () -> subscriber.onEvent(event);
// 判断当前事件是异步还是同步
final Executor executor = subscriber.executor();
if (executor != null) {
// 异步
executor.execute(job);
} else {
try {
// 同步
job.run();
} catch (Throwable e) {
// ..
}
}
}

⚡️ 这里的逻辑很清晰,通过订阅者的具体实现,来决定这个消息通知是走的同步还是异步的方式
⚡️ 提交的任务就是调用传入的订阅者的onEvent方法的具体实现
⚡️ executor.execute(job)如果订阅者有对 Executor的实现,那么就是走的异步
⚡️ job.run()如果订阅者没实现Executor,那么就直接执行run,其本质就是执行了subscriber.onEvent(event)这行代码,并且是同步执行的

如果对这块线程操作理解模糊的话,下面打印一下这块位置的线程ID:

 @Override
public void notifySubscriber(Subscriber subscriber, Event event) {
System.out.println("notifySubscriber的ThreadId-1:" + + Thread.currentThread().getId());
final Runnable job = () -> {
System.out.println("notifySubscriber的ThreadId-2:" + + Thread.currentThread().getId());
subscriber.onEvent(event);
};
final Executor executor = subscriber.executor();
if (executor != null) {
executor.execute(job);
} else {
try {
job.run();
} catch (Throwable e) {
}
}
}
======================================================================================================================
日志贴在这里了,是两个事件,不过是同一类
>>取出queue事件:com.alibaba.nacos.naming.core.v2.event.client.ClientOperationEvent$ClientSubscribeServiceEvent@5dbbd6e
notifySubscriber的ThreadId-1264
notifySubscriber的ThreadId-2264
>>取出queue事件:com.alibaba.nacos.naming.core.v2.event.client.ClientOperationEvent$ClientSubscribeServiceEvent@13b131b1
notifySubscriber的ThreadId-1264
notifySubscriber的ThreadId-2264

如果你本地拉了代码能够调试的话,你就会发现:
这里就拿两次相同的事件来举例子(上面的日志可以看出,ClientOperationEvent@5dbbd6e/@13b131b1,是两个事件,但是事件类型是一样的~
notifySubscriber的ThreadId-1的线程ID是一直是同一个,并且notifySubscriber的ThreadId-2的线程ID和notifySubscriber的ThreadId-1是相同的,这就说明这里是同步的,至于为什么大牛写这段代码的时候,同步操作依然使用Runnable来包装的话,可能是为了兼容Executor的写法吧
⚡️ 当然这段代码侧面说明了 EventPublisher 是一个线程
⚡️ 订阅者监听事件可以同步,也可以异步

这里的描述有点绕,不过尽量理解,毕竟这是事件发布的落地

简单来说:可以理解为每个事件的通知均是通过调用订阅者的onEvent方法,类似于一个listener的机制,当然订阅者也可以自己选择通知方式是同步的还是异步的

到此为止,一个事件从NotifyCenter发布事件,事件发布者接收事件,其内部对事件如何通知其订阅者的链路已经剖析完,下文会对订阅者接口Subscriber进行剖析~

Nacos源码篇

  • 【Nacos源码篇(一)】Nacos源码本地环境搭建
  • 【Nacos源码篇(二)】Nacos的事件通知机制
  • 【Nacos源码篇(三)】NotifyCenter源码剖析
  • 【Nacos源码篇(四)】EventPublisher源码分析
  • 【Nacos源码篇(五)】Subscriber源码分析

语雀版文档

  • 事件通知
    • 事件通知核心流程
    • NotifyCenter源码分析
    • EventPublisher源码分析
    • Subscriber源码分析
    • Event源码分析

Nacos源码注释

最后

以上就是危机秋天为你收集整理的【Nacos源码篇(四)】EventPublisher源码分析的全部内容,希望文章能够帮你解决【Nacos源码篇(四)】EventPublisher源码分析所遇到的程序开发问题。

如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(87)

评论列表共有 0 条评论

立即
投稿
返回
顶部