概述
ListenerBus对消费队列的实现
上图为LiveListenerBus类的申明
self => 这句相当于给this起了一个别名为self
LiveListenerBus负责将SparkListenerEvents异步发送过已经注册过的SparkListeners,在SparkContext中,首先会创建LiveListenerBus实例,这个类的功能是
- 保存有消息队列负责消息的缓存
- 保存有注册过的listener,负责消息分发
Listener链表保存在ListenerBus类中,为了保证并发的安全性,这里使用了CopyOnWriteArrayList类来存储Listener,关于CopyOnWriteArrayList的更多信息请点击CopyOnWriteArrayList
消息队列保存在LiveListenerBus中
从图中可以看到队列长度可以自己配置,但是spark设置了默认值10000,当缓存事件数量达到上限后,新来的事件被丢弃,具体的丢弃函数在LiveListenerBus类中如下所示
通过上面的代码可以看到,处理的方式输出日志,并且使用logDroppedEvent来保证输出仅为一次
我们继续看消息机制的核心,消息队列的生产与消费
下面的代码是消息的消费代码
listenerThread,来从消息队列中取得消息并进行分发,为了保证生产者和消费者对消息队列的并发访问,在每次需要获取消息的时候,调用eventLock.acquire()来获取信号量, 信号量的值就是当前队列中所含有的事件数量.如果正常获取到事件,就调用postToAll将事件分发给所有listener, 继续下一次循环. 如果获取到null值, 则有下面两种情况:
- stopped值被设置为true,整个应用正常的结束
- 系统发生了错误,立即终止运行
下面是生产者代码
该函数用来将事件放入到消息队列中,每成功放入一个就调用eventLock.release()来增加Semaphore信号量,供消费者消费,如果队列满了,就调用onDropEvent来处理。而真正的消费路由是通过SparkListenerBus的doPostEvent来处理
doPostEvent会根据不同的消息类型,调用listener对应的方法进行处理
消息队列的建立和发送流程
在SparkContext中会
- 创建LiveListenerBus类类型的成员变量listenerBus
- 创建各种listener,并加入到listenerBus中
- post一些事件到listenerBus中
- 调用listenerBus.start()来启动事件处理程序
listenerBus.start()调用之前,可以向其中post消息,这些消息会被缓存起来,等start函数启动之后,消费者线程会分发这些缓存的消息,listenerBus.start()是在SparkContext中的setupAndStartListenerBus()函数被调用。
这段代码运用反射机制来创建spark.extraListeners参数指定的类,对于被创建类的构造器有如下的要求
- 有一个单参数构造器并且参数为SparkConf类型,则该参数构造器被调用
- 第一种情况不满足的情况下,如果有无参构造器则将被调用
- 前两种情况都不满足的情况下,程序以抛出异常结束
在extraListener被构造并且被注册之后,listenerBus.start被调用
于此同时,启动消费者线程listenerThread,并且开始进行消息路由,在程序结束后,会调用stop函数
从上述stop()代码可以看到,在stop函数中调用了eventLock.release()增加信号量,然而并未向消息队列中添加消息所以,消费者线程listenerThread读取队列时会返回null值,进而达到结束listenerThread线程的目的
最后
以上就是标致含羞草为你收集整理的深入理解Spark之ListenerBus监听器的全部内容,希望文章能够帮你解决深入理解Spark之ListenerBus监听器所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
发表评论 取消回复