Overview
In this article we want to answer two questions:
- How is Block data generated?
- How are Blocks stored?
The previous article covered how a Receiver gets started, and how ReceiverSupervisor, Receiver and ReceiverTracker divide the work between the Executor and the Driver. What it did not cover is what happens after a Receiver receives data: how it cooperates with the ReceiverSupervisor and the BlockManager to generate and manage Blocks. That is the internal relationship we will look at today.
Let's start with the Block generation flow, naturally taking the Receiver startup path as the entry point: the startReceiver method of ReceiverSupervisor introduced earlier, which calls receiver.onStart(). Jumping into that call, we find that onStart() is an unimplemented abstract method on Receiver, so clearly each kind of Receiver supplies its own onStart() logic and may do different things at startup depending on its needs.
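For orientation, this is the contract every receiver fills in, as described in Spark's custom receiver guide: onStart() sets up resources and starts the threads that feed data in via store(), while onStop() cleans up. Below is a minimal sketch along those lines; the class name LineSocketReceiver and the socket source are made up for illustration:

import java.io.{BufferedReader, InputStreamReader}
import java.net.Socket
import java.nio.charset.StandardCharsets

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

// Hypothetical receiver that reads lines from a socket and hands them to Spark Streaming.
class LineSocketReceiver(host: String, port: Int)
  extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) {

  override def onStart(): Unit = {
    // Start a thread that receives data; onStart() itself must not block.
    new Thread("Line Socket Receiver") {
      override def run(): Unit = receive()
    }.start()
  }

  override def onStop(): Unit = {
    // The thread started in onStart() checks isStopped() and exits on its own.
  }

  private def receive(): Unit = {
    try {
      val socket = new Socket(host, port)
      val reader = new BufferedReader(
        new InputStreamReader(socket.getInputStream, StandardCharsets.UTF_8))
      var line = reader.readLine()
      while (!isStopped() && line != null) {
        store(line)          // the same store() call we are about to trace below
        line = reader.readLine()
      }
      reader.close()
      socket.close()
      restart("Trying to connect again")
    } catch {
      case t: Throwable => restart("Error receiving data", t)
    }
  }
}

The important part for this article is the store(line) call, which is exactly where we pick up the trail next.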
We'll pick KafkaInputDStream as our guinea pig and see how its onStart is implemented. The code structure is fairly simple and clear: it creates one data stream per topic (how that stream is created involves consuming Kafka through ZooKeeper and is out of scope here), then hands the processing of each stream to a thread pool, and the handling of each individual message boils down to calling store:
// Message-handling loop inside the Kafka receiver (KafkaInputDStream.scala)
def run() {
  logInfo("Starting MessageHandler.")
  try {
    val streamIterator = stream.iterator()
    while (streamIterator.hasNext()) {
      val msgAndMetadata = streamIterator.next()
      store((msgAndMetadata.key, msgAndMetadata.message))
    }
  } catch {
    case e: Throwable => reportError("Error handling message; exiting", e)
  }
}

// Receiver.store: hands a single record over to the ReceiverSupervisor
def store(dataItem: T) {
  supervisor.pushSingle(dataItem)
}
This store method is exactly where the Receiver and the ReceiverSupervisor meet, and with it we understand the main flow of how a Receiver obtains data. Of course, every Receiver has its own data-receiving logic, so onStart differs from one implementation to the next; Kafka is just one example, but the overall flow is the same. That answers half of our first question, namely where Block data comes from. So far, though, we have only seen records handled one at a time, so let's continue with how Blocks are generated from them.
Following pushSingle, the call chain is ReceiverSupervisorImpl.pushSingle --> defaultBlockGenerator.addData, and addData looks like this:
def addData(data: Any): Unit = {
  if (state == Active) {
    // waitToPush() is inherited from RateLimiter and blocks if the receiver
    // is pushing records faster than the configured maximum rate.
    waitToPush()
    synchronized {
      if (state == Active) {
        currentBuffer += data
      } else {
        throw new SparkException(
          "Cannot add data as BlockGenerator has not been started or has been stopped")
      }
    }
  } else {
    throw new SparkException(
      "Cannot add data as BlockGenerator has not been started or has been stopped")
  }
}
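A quick aside on that waitToPush() call: BlockGenerator extends RateLimiter, so each receiver's ingestion rate can be capped through configuration. A hedged sketch of the relevant settings (the values are illustrative, not recommendations):

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setAppName("ReceiverRateLimitSketch")
  // Hard cap on records per second per receiver; waitToPush() blocks when it is exceeded.
  .set("spark.streaming.receiver.maxRate", "10000")
  // Optionally let Spark adjust the effective rate dynamically based on batch processing delays.
  .set("spark.streaming.backpressure.enabled", "true")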
Back to addData: every single record is simply appended to a buffer (currentBuffer) inside BlockGenerator. But if all the data just piles up in currentBuffer, where do the Blocks we are after actually come from?
To answer that, we need to look at the onStart method invoked from ReceiverSupervisor.start, or more precisely the onStart of its concrete implementation ReceiverSupervisorImpl:
override protected def onStart() {
  registeredBlockGenerators.foreach { _.start() }
}
Here registeredBlockGenerators is a collection of BlockGenerator instances, populated through the createBlockGenerator method:
override def createBlockGenerator(
    blockGeneratorListener: BlockGeneratorListener): BlockGenerator = {
  // Cleanup BlockGenerators that have already been stopped
  registeredBlockGenerators --= registeredBlockGenerators.filter{ _.isStopped() }
  val newBlockGenerator = new BlockGenerator(blockGeneratorListener, streamId, env.conf)
  registeredBlockGenerators += newBlockGenerator
  newBlockGenerator
}
So onStart ends up calling BlockGenerator.start. Before going further, let's look at the members of the BlockGenerator class and the main methods involved:
private[streaming] class BlockGenerator(
    listener: BlockGeneratorListener,
    receiverId: Int,
    conf: SparkConf,
    clock: Clock = new SystemClock()
  ) extends RateLimiter(conf) with Logging {

  private case class Block(id: StreamBlockId, buffer: ArrayBuffer[Any])

  // Enumeration of the generator's lifecycle states
  private object GeneratorState extends Enumeration {
    type GeneratorState = Value
    val Initialized, Active, StoppedAddingData, StoppedGeneratingBlocks, StoppedAll = Value
  }
  import GeneratorState._

  // Interval at which Blocks are generated
  private val blockIntervalMs = conf.getTimeAsMs("spark.streaming.blockInterval", "200ms")
  // Timer that periodically turns currentBuffer into a Block
  private val blockIntervalTimer =
    new RecurringTimer(clock, blockIntervalMs, updateCurrentBuffer, "BlockGenerator")
  // Bounded blocking queue holding the generated Blocks
  private val blockQueueSize = conf.getInt("spark.streaming.blockQueueSize", 10)
  private val blocksForPushing = new ArrayBlockingQueue[Block](blockQueueSize)
  // Thread that pushes generated Block info on towards the BlockManager
  private val blockPushingThread = new Thread() { override def run() { keepPushingBlocks() } }
  // Buffer that accumulates incoming records
  @volatile private var currentBuffer = new ArrayBuffer[Any]
  @volatile private var state = Initialized

  /** Start block generating and pushing threads. */
  def start(): Unit = synchronized {
    if (state == Initialized) {
      state = Active
      // Start the timer that periodically turns currentBuffer into a Block and enqueues it
      blockIntervalTimer.start()
      // Start the thread that drains the queue and hands Blocks over for storage
      blockPushingThread.start()
      logInfo("Started BlockGenerator")
    } else {
      throw new SparkException(
        s"Cannot start BlockGenerator as its not in the Initialized state [state = $state]")
    }
  }
  ...
Let's first see how the timer periodically turns currentBuffer into a Block, i.e. the updateCurrentBuffer method it was constructed with:
private def updateCurrentBuffer(time: Long): Unit = {
  try {
    var newBlock: Block = null
    // Take the lock first
    synchronized {
      if (currentBuffer.nonEmpty) {
        // If currentBuffer is non-empty, swap it out via a temporary reference and wrap it into newBlock
        val newBlockBuffer = currentBuffer
        currentBuffer = new ArrayBuffer[Any]
        val blockId = StreamBlockId(receiverId, time - blockIntervalMs)
        listener.onGenerateBlock(blockId)
        newBlock = new Block(blockId, newBlockBuffer)
      }
    }
    // Put the new Block on the queue, where it waits to be reported and stored
    if (newBlock != null) {
      blocksForPushing.put(newBlock) // put is blocking when queue is full
    }
  } catch {
    ...
  }
}
At this point a Block has officially been produced. Note that Blocks are generated every blockIntervalMs, a user-configurable parameter that effectively controls the generation rate: the longer the interval, the fewer Blocks end up in a batch, and the number of Blocks in a batch determines how many Tasks are used to process it. The tuning section of the official documentation puts it this way:
The number of blocks in each batch determines the number of tasks that will be used to process the received data in a map-like transformation. If the number of tasks is too low (that is, less than the number of cores per machine), then it will be inefficient as all available cores will not be used to process the data. To increase the number of tasks for a given batch interval, reduce the block interval.
In other words, if the data volume is large, the cluster has enough cores, and data reception limits performance, you can lower this interval to increase the number of Tasks and make fuller use of cluster resources; otherwise some cores will sit idle with no task assigned while a batch is processed. Personally, I suspect this particular knob does not come up that often in real-world tuning.
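To make the relationship concrete: blocks per receiver per batch ≈ batch interval / block interval, so a 2 s batch with the default 200 ms block interval yields about 10 blocks, and hence roughly 10 map-side tasks per receiver. A hedged sketch of turning the knob (values are illustrative):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf()
  .setAppName("BlockIntervalTuningSketch")
  // Generate a block every 100 ms instead of the default 200 ms,
  // roughly doubling the number of blocks (and tasks) per batch.
  .set("spark.streaming.blockInterval", "100ms")

// With a 2-second batch interval this gives about 2000 ms / 100 ms = 20 blocks per receiver per batch.
val ssc = new StreamingContext(conf, Seconds(2))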
Back on track: once a Block has been generated, it needs to be reported and stored via the BlockManager. This is handled by the blockPushingThread started in BlockGenerator.start, whose body is the keepPushingBlocks method:
private def keepPushingBlocks() {
  logInfo("Started block pushing thread")

  // True as long as the BlockGenerator may still produce blocks
  def areBlocksBeingGenerated: Boolean = synchronized {
    state != StoppedGeneratingBlocks
  }

  try {
    // While blocks may still be generated, keep polling the queue and pushing whatever arrives
    while (areBlocksBeingGenerated) {
      Option(blocksForPushing.poll(10, TimeUnit.MILLISECONDS)) match {
        case Some(block) => pushBlock(block)
        case None =>
      }
    }
    // Reaching here means the BlockGenerator has stopped generating blocks,
    // but blocks already sitting in the queue still need to be reported and stored
    while (!blocksForPushing.isEmpty) {
      val block = blocksForPushing.take()
      pushBlock(block)
    }
  } catch {
    ...
  }
}
The real work clearly happens in pushBlock, which is very simple: it calls the onPushBlock method of the BlockGeneratorListener passed into BlockGenerator's constructor. That listener is implemented in ReceiverSupervisorImpl, and tracing it gives the chain onPushBlock --> pushArrayBuffer --> pushAndReportBlock. The interesting part is pushAndReportBlock:
def pushAndReportBlock(
    receivedBlock: ReceivedBlock,
    metadataOption: Option[Any],
    blockIdOption: Option[StreamBlockId]
  ) {
  val blockId = blockIdOption.getOrElse(nextBlockId)
  val time = System.currentTimeMillis
  // Store the Block data through the BlockManager and obtain the store result
  val blockStoreResult = receivedBlockHandler.storeBlock(blockId, receivedBlock)
  logDebug(s"Pushed block $blockId in ${(System.currentTimeMillis - time)} ms")
  val numRecords = blockStoreResult.numRecords
  // Wrap up the information about the stored block
  val blockInfo = ReceivedBlockInfo(streamId, numRecords, metadataOption, blockStoreResult)
  // Tell the ReceiverTracker about the successfully stored block
  trackerEndpoint.askWithRetry[Boolean](AddBlock(blockInfo))
  logDebug(s"Reported block $blockId")
}
The logic is straightforward; the one thing worth understanding on the storage side is how receivedBlockHandler is chosen:
private val receivedBlockHandler: ReceivedBlockHandler = {
  if (WriteAheadLogUtils.enableReceiverLog(env.conf)) {
    if (checkpointDirOption.isEmpty) {
      throw new SparkException(
        "Cannot enable receiver write-ahead log without checkpoint directory set. " +
        "Please use streamingContext.checkpoint() to set the checkpoint directory. " +
        "See documentation for more details.")
    }
    new WriteAheadLogBasedBlockHandler(env.blockManager, receiver.streamId,
      receiver.storageLevel, env.conf, hadoopConf, checkpointDirOption.get)
  } else {
    new BlockManagerBasedBlockHandler(env.blockManager, receiver.storageLevel)
  }
}
So there are two storage strategies: one stores blocks directly through the BlockManager, the other uses a Write Ahead Log, writing the WAL first and then storing the data in HDFS or memory. The concrete storage mechanics are implemented in Spark core; for space reasons we won't expand on them here and will come back to them in a later article.
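For completeness, here is roughly what it takes to end up on the WriteAheadLogBasedBlockHandler branch above; a hedged sketch in which the checkpoint path is made up:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf()
  .setAppName("ReceiverWALSketch")
  // Makes WriteAheadLogUtils.enableReceiverLog(conf) return true.
  .set("spark.streaming.receiver.writeAheadLog.enable", "true")

val ssc = new StreamingContext(conf, Seconds(2))
// Required when the receiver WAL is enabled: the log files are written under this directory.
ssc.checkpoint("hdfs:///tmp/streaming-checkpoint")  // hypothetical path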
Now that we know how Blocks are generated and stored, what happens after the block info is sent to the ReceiverTracker? To find out, we look at how the ReceiverTracker side handles the incoming RPC, namely the receiveAndReply method of ReceiverTrackerEndpoint:
private class ReceiverTrackerEndpoint(override val rpcEnv: RpcEnv) extends ThreadSafeRpcEndpoint {

  private val submitJobThreadPool = ExecutionContext.fromExecutorService(
    ThreadUtils.newDaemonCachedThreadPool("submit-job-thead-pool"))
  ...
  override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
    // Remote messages
    case RegisterReceiver(streamId, typ, hostPort, receiverEndpoint) =>
      val successful =
        registerReceiver(streamId, typ, hostPort, receiverEndpoint, context.sender.address)
      context.reply(successful)
    case AddBlock(receivedBlockInfo) =>
      context.reply(addBlock(receivedBlockInfo))
    case DeregisterReceiver(streamId, message, error) =>
      deregisterReceiver(streamId, message, error)
      context.reply(true)
    // Local messages
    case AllReceiverIds =>
      context.reply(receiverTrackingInfos.filter(_._2.state != ReceiverState.INACTIVE).keys.toSeq)
    case StopAllReceivers =>
      assert(isTrackerStopping || isTrackerStopped)
      stopReceivers()
      context.reply(true)
  }
Since ReceiverSupervisorImpl sends an AddBlock message over RPC, the branch matched here is the AddBlock case, whose handler is:
private def addBlock(receivedBlockInfo: ReceivedBlockInfo): Boolean = {
  receivedBlockTracker.addBlock(receivedBlockInfo)
}
This brings in the ReceivedBlockTracker class. It is worth looking at its basic structure, since its member variables already give a good idea of what it does internally:
private[streaming] class ReceivedBlockTracker(
    conf: SparkConf,
    hadoopConf: Configuration,
    streamIds: Seq[Int],
    clock: Clock,
    recoverFromWriteAheadLog: Boolean,
    checkpointDirOption: Option[String])
  extends Logging {

  private type ReceivedBlockQueue = mutable.Queue[ReceivedBlockInfo]

  // Each Receiver has a streamId; this maps every Receiver to a queue holding the block info
  // received from it that has not yet been allocated to a batch
  private val streamIdToUnallocatedBlockQueues = new mutable.HashMap[Int, ReceivedBlockQueue]
  // Every Block belongs to some batch; this records which blocks belong to which batch
  // (when processing falls behind, entries for several batches will accumulate here)
  private val timeToAllocatedBlocks = new mutable.HashMap[Time, AllocatedBlocks]
  private val writeAheadLogOption = createWriteAheadLog()

  private var lastAllocatedBatchTime: Time = null

  // If configured to recover earlier blocks, replay the write-ahead log
  if (recoverFromWriteAheadLog) {
    recoverPastEvents()
  }

  def addBlock(receivedBlockInfo: ReceivedBlockInfo): Boolean = synchronized {
    try {
      // Write the event to the WAL first
      writeToLog(BlockAdditionEvent(receivedBlockInfo))
      // Then append the received block info to its stream's queue
      getReceivedBlockQueue(receivedBlockInfo.streamId) += receivedBlockInfo
      logDebug(s"Stream ${receivedBlockInfo.streamId} received " +
        s"block ${receivedBlockInfo.blockStoreResult.blockId}")
      true
    } catch {
      case e: Exception =>
        logError(s"Error adding block $receivedBlockInfo", e)
        false
    }
  }
  ...
}
addBlock itself only appends the received block info to the unallocated queue. The blocks are picked up when the next batch starts, in generateJobs, which is exactly the place in JobGenerator that its recurring timer fires periodically, analysed in an earlier article; it ties in directly with what we are looking at here.
In JobGenerator, the timer periodically runs generateJobs, and the very first thing it does is call jobScheduler.receiverTracker.allocateBlocksToBatch(time), which is precisely the allocateBlocksToBatch method of the ReceivedBlockTracker we are discussing:
---> ReceivedBlockTracker.scala
def allocateBlocksToBatch(batchTime: Time): Unit = synchronized {
  if (lastAllocatedBatchTime == null || batchTime > lastAllocatedBatchTime) {
    // Drain all blocks that have not yet been allocated to any batch
    val streamIdToBlocks = streamIds.map { streamId =>
      (streamId, getReceivedBlockQueue(streamId).dequeueAll(x => true))
    }.toMap
    val allocatedBlocks = AllocatedBlocks(streamIdToBlocks)
    // Write the allocation event to the WAL
    writeToLog(BatchAllocationEvent(batchTime, allocatedBlocks))
    // Record everything drained from streamIdToUnallocatedBlockQueues as this batch's block set
    timeToAllocatedBlocks(batchTime) = allocatedBlocks
    lastAllocatedBatchTime = batchTime
    allocatedBlocks
  } else {
    ...
  }
}

private def getReceivedBlockQueue(streamId: Int): ReceivedBlockQueue = {
  streamIdToUnallocatedBlockQueues.getOrElseUpdate(streamId, new ReceivedBlockQueue)
}
As the comments explain: at the start of every batch, all blocks still waiting to be allocated are bundled together as that batch's block set and recorded in timeToAllocatedBlocks; when a batch starts processing, this is where it finds the data it needs to handle.
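To see the bookkeeping in isolation, here is a small runnable toy model of that allocation step, using plain Scala collections with Strings standing in for ReceivedBlockInfo (the names are made up; this is an illustration, not Spark code):

import scala.collection.mutable

object AllocationToyModel extends App {
  // One queue of not-yet-allocated "blocks" per receiver/stream.
  val streamIdToUnallocatedBlockQueues = mutable.HashMap(
    0 -> mutable.Queue("block-0-a", "block-0-b"),
    1 -> mutable.Queue("block-1-a"))
  // Which "blocks" belong to which batch time.
  val timeToAllocatedBlocks = mutable.HashMap.empty[Long, Map[Int, Seq[String]]]

  def allocateBlocksToBatch(batchTime: Long): Unit = {
    // Drain every stream's queue and bind whatever was waiting to this batch.
    val allocated = streamIdToUnallocatedBlockQueues.map {
      case (streamId, queue) => streamId -> queue.dequeueAll(_ => true).toSeq
    }.toMap
    timeToAllocatedBlocks(batchTime) = allocated
  }

  allocateBlocksToBatch(batchTime = 1000L)
  println(timeToAllocatedBlocks)            // e.g. Map(1000 -> Map(0 -> List(block-0-a, block-0-b), 1 -> List(block-1-a)))
  println(streamIdToUnallocatedBlockQueues) // the queues are now empty
}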
This brings to mind the various online explanations of how Spark Streaming works: each batch of data becomes an RDD, and a Spark job is run on each batch's RDD; in other words, the stream is sliced by time and each slice is executed as a Spark job. Four articles into this source-code series, we now know where the set of blocks for each batch comes from, but we have not yet seen where the RDD is actually created. Since we know the data is looked up from the timeToAllocatedBlocks map, we can trace backwards from there, which leads to the compute method of ReceiverInputDStream:
override def compute(validTime: Time): Option[RDD[T]] = {
  val blockRDD = {
    if (validTime < graph.startTime) {
      new BlockRDD[T](ssc.sc, Array.empty)
    } else {
      val receiverTracker = ssc.scheduler.receiverTracker
      val blockInfos = receiverTracker.getBlocksOfBatch(validTime).getOrElse(id, Seq.empty)
      val inputInfo = StreamInputInfo(id, blockInfos.flatMap(_.numRecords).sum)
      ssc.scheduler.inputInfoTracker.reportInfo(validTime, inputInfo)
      createBlockRDD(validTime, blockInfos)
    }
  }
  Some(blockRDD)
}
That completes the whole journey: the Receiver fetches data, Blocks are generated and stored, and each batch's RDD is built from them. I have also set up a source-code reading group (QQ: 936037639); if you are reading the sources of Spark or other big-data frameworks, feel free to join and discuss — some details are genuinely hard to figure out alone, and many eyes make it easier.