概述
文章目录
- 先看看接口
- 接着细化其源码
- 先看看其具体有哪些属性吧
- init
- 启动事件发布者后又做了啥
- 向发布者添加事件
- notifySubscriber
承上启下, 从NotifyCenter的publisher.publish(event)开始,进入事件发布者的源码分析
当然,以下均为个人的理解哈~!
先看看接口
public interface EventPublisher extends Closeable {
/**
* 初始化事件发布者/Initializes the event publisher.
*/
void init(Class<? extends Event> type, int bufferSize);
/**
* 事件队列长度/The number of currently staged events.
*/
long currentEventSize();
/**
* 添加订阅者/Add listener.
*/
void addSubscriber(Subscriber subscriber);
/**
* 移除订阅者/Remove listener.
*/
void removeSubscriber(Subscriber subscriber);
/**
* 发布事件/publish event.
*/
boolean publish(Event event);
/**
* 通知订阅者/Notify listener.
*/
void notifySubscriber(Subscriber subscriber, Event event);
}
⚡️继承自Closeable,事件发布者通过实现shutdown方法,来停止事件的发布(NotifyCenter进行shutdown的时候会进行调用)
⚡️ 接着看看这个接口抽象了哪些行为,就能大致了解这个事件发布者接口实现了哪些功能
接着细化其源码
先看看其具体有哪些属性吧
public class NamingEventPublisher extends Thread implements ShardedEventPublisher {
private static final String THREAD_NAME = "naming.publisher-";
private static final int DEFAULT_WAIT_TIME = 60;
private final Map<Class<? extends Event>, Set<Subscriber<? extends Event>>> subscribes = new ConcurrentHashMap<>();
private volatile boolean initialized = false;
private volatile boolean shutdown = false;
private int queueMaxSize = -1;
private BlockingQueue<Event> queue;
private String publisherName;
}
⚡️继承了Thread
接口,那我可以理解成事件发布者本质是一个线程
⚡️Map<Class<? extends Event>, Set<Subscriber<? extends Event>>> subscribes
是一个ConcurrentHashMap,其key是一个class,也就是说是一类事件为key,其value是一个订阅者的集合,Subscriber接口会在下一篇中分析,这里就当做事件的订阅者就行,所以这个map就是事件发布者的一个核心,在这里,保存了事件的订阅者的集合
⚡️queue
是一个阻塞队列,泛型是一个事件,可以暂时理解成消息暂存队列(like local mq),下文会具体阐述
init
看init方法的代码之前,我们先看看init方法是在哪里被调用的,init顾名思义就是初始化,咱从源头开始
@Override
public EventPublisher apply(final Class<? extends Event> eventType, final Integer maxQueueSize) {
Class<? extends Event> cachedEventType =
eventType.isMemberClass() ? (Class<? extends Event>) eventType.getEnclosingClass() : eventType;
publisher.computeIfAbsent(cachedEventType, eventClass -> {
NamingEventPublisher result = new NamingEventPublisher();
// here
result.init(eventClass, maxQueueSize);
return result;
});
return publisher.get(cachedEventType);
}
⚡️ 眼熟的话(因为这块代码在NotifyCenter源码分析中有贴过),这块代码是在publishmap添加订阅者的时候(如果事件发布者未初始化),就会调用EventPublisherFactory的apply实现,创建事件发布者,此时就会执行它的init()方法
接下来看看init的源码
@Override
public void init(Class<? extends Event> type, int bufferSize) {
this.queueMaxSize = bufferSize;
this.queue = new ArrayBlockingQueue<>(bufferSize);
this.publisherName = type.getSimpleName();
super.setName(THREAD_NAME + this.publisherName);
super.setDaemon(true);
super.start();
initialized = true;
}
⚡️ 其实没啥好看的,初始化了一下阻塞队列queue
的大小,因为消息发布者本质又是一个Thread
,于是在初始化的时候调用了其start
方法,并给当前线程命了个名,由此,事件发布者在其执行init方法的时候,就开始了它的线程之旅
接着,看看其启动事件发布者后都做了些什么?
启动事件发布者后又做了啥
private void handleEvents() {
while (!shutdown) {
try {
// 取出事件
final Event event = queue.take();
// 事件处理
handleEvent(event);
} catch (InterruptedException e) {
// ..
}
}
}
private void handleEvent(Event event) {
Class<? extends Event> eventType = event.getClass();
Set<Subscriber<? extends Event>> subscribers = subscribes.get(eventType);
if (null == subscribers) {
// ..
}
// 给每个订阅者发送通知
for (Subscriber subscriber : subscribers) {
notifySubscriber(subscriber, event);
}
}
⚡️ 在其run方法里面,执行的核心方法是handleEvents,于是直接跳到这里来,这里通过一个while循环来保持发布者这个线程的持续在线,终止条件是其shutdown状态
⚡️ 通过阻塞队列来获取事件,然后再发布事件,这样当队列中无事件的时候,线程就会阻塞在这里,不会造成资源的浪费
⚡️ 接着开始handleEvent
,也就是处理我们的事件,逻辑很清晰明了,拿到当前事件的Clz,然后从subscribes(保存订阅者的map)中获取到订阅者的集合,接着一个for循环给每个订阅者们发送通知,notifySubscriber
的具体代码会在后面贴出
所以,在事件发布者初始化的时候,就被当做一个线程,然后放在了NotifyCenter的publishmap里面,并且直接调用了其start方法启动了它,然后其内部保存了一个阻塞队列,用来暂存事件,当监听到里面的阻塞队列来了事件之后,就会消费它(挺像MQ的)
向发布者添加事件
上面描述了事件的保存位置,这里贴一下事件是怎么被添加订阅者的
@Override
public boolean publish(Event event) {
checkIsStart();
// 将事件放到阻塞队列
boolean success = this.queue.offer(event);
if (!success) {
handleEvent(event);
return true;
}
return true;
}
⚡️ 逻辑简单,就是往阻塞队列queue塞入了事件,如果成功就过,不成功的话打了个日志就直接处理这个事件,而不是放入阻塞队列了
⚡️ 如果不记得事件添加的源头的话,这里贴一下代码 ???? NotifyCenter#publishEvent{publisher.publish(event)}
notifySubscriber
就是在这里,进行了事件的通知,也是NotifyCenter调用publish(event)方法的落地
@Override
public void notifySubscriber(Subscriber subscriber, Event event) {
// ..
// 执行订阅者们的onEvent方法,即每个订阅者对于该事件的处理逻辑
final Runnable job = () -> subscriber.onEvent(event);
// 判断当前事件是异步还是同步
final Executor executor = subscriber.executor();
if (executor != null) {
// 异步
executor.execute(job);
} else {
try {
// 同步
job.run();
} catch (Throwable e) {
// ..
}
}
}
⚡️ 这里的逻辑很清晰,通过订阅者的具体实现,来决定这个消息通知是走的同步还是异步的方式
⚡️ 提交的任务就是调用传入的订阅者的onEvent方法的具体实现
⚡️ executor.execute(job)
如果订阅者有对 Executor的实现,那么就是走的异步
⚡️ job.run()
如果订阅者没实现Executor,那么就直接执行run,其本质就是执行了subscriber.onEvent(event)
这行代码,并且是同步执行的
如果对这块线程操作理解模糊的话,下面打印一下这块位置的线程ID:
@Override
public void notifySubscriber(Subscriber subscriber, Event event) {
System.out.println("notifySubscriber的ThreadId-1:" + + Thread.currentThread().getId());
final Runnable job = () -> {
System.out.println("notifySubscriber的ThreadId-2:" + + Thread.currentThread().getId());
subscriber.onEvent(event);
};
final Executor executor = subscriber.executor();
if (executor != null) {
executor.execute(job);
} else {
try {
job.run();
} catch (Throwable e) {
}
}
}
======================================================================================================================
日志贴在这里了,是两个事件,不过是同一类
>>取出queue事件:com.alibaba.nacos.naming.core.v2.event.client.ClientOperationEvent$ClientSubscribeServiceEvent@5dbbd6e
notifySubscriber的ThreadId-1:264
notifySubscriber的ThreadId-2:264
>>取出queue事件:com.alibaba.nacos.naming.core.v2.event.client.ClientOperationEvent$ClientSubscribeServiceEvent@13b131b1
notifySubscriber的ThreadId-1:264
notifySubscriber的ThreadId-2:264
如果你本地拉了代码能够调试的话,你就会发现:
这里就拿两次相同的事件来举例子(上面的日志可以看出,ClientOperationEvent@5dbbd6e/@13b131b1,是两个事件,但是事件类型是一样的~)
notifySubscriber的ThreadId-1的线程ID是一直是同一个,并且notifySubscriber的ThreadId-2的线程ID和notifySubscriber的ThreadId-1是相同的,这就说明这里是同步的,至于为什么大牛写这段代码的时候,同步操作依然使用Runnable来包装的话,可能是为了兼容Executor的写法吧
⚡️ 当然这段代码侧面说明了 EventPublisher 是一个线程
⚡️ 订阅者监听事件可以同步,也可以异步
这里的描述有点绕,不过尽量理解,毕竟这是事件发布的落地
简单来说:可以理解为每个事件的通知均是通过调用订阅者的onEvent方法,类似于一个listener的机制,当然订阅者也可以自己选择通知方式是同步的还是异步的
到此为止,一个事件从NotifyCenter发布事件,事件发布者接收事件,其内部对事件如何通知其订阅者的链路已经剖析完,下文会对订阅者接口Subscriber进行剖析~
Nacos源码篇
- 【Nacos源码篇(一)】Nacos源码本地环境搭建
- 【Nacos源码篇(二)】Nacos的事件通知机制
- 【Nacos源码篇(三)】NotifyCenter源码剖析
- 【Nacos源码篇(四)】EventPublisher源码分析
- 【Nacos源码篇(五)】Subscriber源码分析
语雀版文档
- 事件通知
- 事件通知核心流程
- NotifyCenter源码分析
- EventPublisher源码分析
- Subscriber源码分析
- Event源码分析
Nacos源码注释
最后
以上就是危机秋天为你收集整理的【Nacos源码篇(四)】EventPublisher源码分析的全部内容,希望文章能够帮你解决【Nacos源码篇(四)】EventPublisher源码分析所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
发表评论 取消回复