Spark Streaming Source Code Analysis: Block Generation and Storage

Overview

  This article sets out to answer two questions:

  1. How is Block data generated?
  2. How are Blocks stored?

  The previous article covered how a Receiver gets started and how the ReceiverSupervisor and Receiver running on the Executor work with the ReceiverTracker on the Driver. What it did not cover is what happens once the Receiver actually receives data: how it cooperates with the ReceiverSupervisor and the BlockManager to generate and manage Blocks. That internal machinery is what we look at today.
  Let's start with how a Block comes into being, naturally beginning where the Receiver is started, namely the startReceiver method of ReceiverSupervisor introduced earlier. It calls receiver.onStart(), which turns out to be an unimplemented abstract method on Receiver; obviously each kind of Receiver supplies its own onStart() logic, doing whatever setup its data source requires.
  We'll use KafkaInputDStream as our guinea pig and see how its onStart is implemented. The code is fairly simple and clear: it creates one data stream per topic (how that stream is created involves consuming Kafka through ZooKeeper and is out of scope here), hands the processing of each stream to a thread pool, and for every message the handler simply calls store:

def run() {
      logInfo("Starting MessageHandler.")
      try {
        val streamIterator = stream.iterator()
        while (streamIterator.hasNext()) {
          val msgAndMetadata = streamIterator.next()
          store((msgAndMetadata.key, msgAndMetadata.message))
        }
      } catch {
        case e: Throwable => reportError("Error handling message; exiting", e)
      }
    }

def store(dataItem: T) {
    supervisor.pushSingle(dataItem)
  }

  The store method is exactly where the Receiver hooks into the ReceiverSupervisor, so we now understand the main path by which a Receiver obtains data. Of course every Receiver has its own reception logic, so onStart differs from one to the next; this is just one example, but the overall flow is the same. That answers half of our first question, namely where Block data comes from. So far, though, data is handled one record at a time; how those records become Blocks is the part that follows.
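  To make that common shape concrete, here is a minimal, purely hypothetical Receiver sketch (the class name and payload are made up); every concrete receiver follows the same pattern of starting its work in onStart() and funnelling each record through store():

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

// Hypothetical receiver for illustration only: start a worker thread in onStart()
// and push every record through store(), which hands it to the ReceiverSupervisor.
class ConstantReceiver extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) {

  override def onStart(): Unit = {
    // onStart() must not block, so do the work in a separate thread.
    new Thread("constant-receiver") {
      override def run(): Unit = {
        while (!isStopped()) {
          store("hello")          // ends up in ReceiverSupervisorImpl.pushSingle
          Thread.sleep(100)
        }
      }
    }.start()
  }

  override def onStop(): Unit = {
    // The worker thread exits on its own once isStopped() returns true.
  }
}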
  Let's keep following pushSingle. The path is ReceiverSupervisorImpl.pushSingle --> defaultBlockGenerator.addData, and addData looks like this:

def addData(data: Any): Unit = {
    if (state == Active) {
      waitToPush()
      synchronized {
        if (state == Active) {
          currentBuffer += data
        } else {
          throw new SparkException(
            "Cannot add data as BlockGenerator has not been started or has been stopped")
        }
      }
    } else {
      throw new SparkException(
        "Cannot add data as BlockGenerator has not been started or has been stopped")
    }
  }

  So every single record is simply appended to a buffer (currentBuffer) inside the BlockGenerator. But if everything just piles up in currentBuffer, where do the Blocks we are after actually come from?
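  (A quick aside: the waitToPush() call in addData comes from the RateLimiter base class that BlockGenerator extends. A rough, paraphrased sketch of that mechanism, assuming Guava's RateLimiter and the spark.streaming.receiver.maxRate setting:)

import com.google.common.util.concurrent.{RateLimiter => GuavaRateLimiter}
import org.apache.spark.SparkConf

// Rough sketch (names simplified): the receiver thread blocks in waitToPush() once it
// exceeds spark.streaming.receiver.maxRate records per second.
abstract class RateLimiterSketch(conf: SparkConf) {
  private val maxRateLimit =
    conf.getLong("spark.streaming.receiver.maxRate", Long.MaxValue)
  private lazy val limiter = GuavaRateLimiter.create(maxRateLimit.toDouble)

  def waitToPush(): Unit = limiter.acquire() // blocks until a permit is available
}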
  The answer lies in the onStart method invoked from ReceiverSupervisor's start method, or more precisely its implementation in the concrete subclass ReceiverSupervisorImpl:

override protected def onStart() {
    registeredBlockGenerators.foreach { _.start() }
  }

  Here registeredBlockGenerators is a collection of BlockGenerator instances, populated through createBlockGenerator:

override def createBlockGenerator(
      blockGeneratorListener: BlockGeneratorListener): BlockGenerator = {
    // Cleanup BlockGenerators that have already been stopped
    registeredBlockGenerators --= registeredBlockGenerators.filter{ _.isStopped() }

    val newBlockGenerator = new BlockGenerator(blockGeneratorListener, streamId, env.conf)
    registeredBlockGenerators += newBlockGenerator
    newBlockGenerator
  }

  So onStart calls the start method of each BlockGenerator. Before going further, let's look at the members of BlockGenerator and the main methods involved:

private[streaming] class BlockGenerator(
    listener: BlockGeneratorListener,
    receiverId: Int,
    conf: SparkConf,
    clock: Clock = new SystemClock()
  ) extends RateLimiter(conf) with Logging {

  private case class Block(id: StreamBlockId, buffer: ArrayBuffer[Any])

  // Enumeration of the generator's lifecycle states
  private object GeneratorState extends Enumeration {
    type GeneratorState = Value
    val Initialized, Active, StoppedAddingData, StoppedGeneratingBlocks, StoppedAll = Value
  }
  import GeneratorState._
  // Interval at which Blocks are generated
  private val blockIntervalMs = conf.getTimeAsMs("spark.streaming.blockInterval", "200ms")
  // Timer that periodically turns the buffer into a Block
  private val blockIntervalTimer =
    new RecurringTimer(clock, blockIntervalMs, updateCurrentBuffer, "BlockGenerator")
  // Bounded blocking queue of generated Blocks waiting to be pushed
  private val blockQueueSize = conf.getInt("spark.streaming.blockQueueSize", 10)
  private val blocksForPushing = new ArrayBlockingQueue[Block](blockQueueSize)
  // Thread that pushes generated Blocks to the BlockManager
  private val blockPushingThread = new Thread() { override def run() { keepPushingBlocks() } }
  // Buffer that accumulates incoming records
  @volatile private var currentBuffer = new ArrayBuffer[Any]
  @volatile private var state = Initialized

  /** Start block generating and pushing threads. */
  def start(): Unit = synchronized {
    if (state == Initialized) {
      state = Active
      // Start the timer that periodically turns currentBuffer into a Block and enqueues it
      blockIntervalTimer.start()
      // Start the thread that drains the queue and pushes Blocks to the BlockManager
      blockPushingThread.start()
      logInfo("Started BlockGenerator")
    } else {
      throw new SparkException(
        s"Cannot start BlockGenerator as its not in the Initialized state [state = $state]")
    }
  }
  ...

  Let's first see how the timer periodically converts currentBuffer into a Block, i.e. the updateCurrentBuffer method passed to it:

private def updateCurrentBuffer(time: Long): Unit = {
    try {
      var newBlock: Block = null
      // Take the lock first
      synchronized {
        if (currentBuffer.nonEmpty) {
          // currentBuffer is non-empty: swap it out through a local reference and wrap it into newBlock
          val newBlockBuffer = currentBuffer
          currentBuffer = new ArrayBuffer[Any]
          val blockId = StreamBlockId(receiverId, time - blockIntervalMs)
          listener.onGenerateBlock(blockId)
          newBlock = new Block(blockId, newBlockBuffer)
        }
      }
      // Enqueue the new Block; it will later be reported and stored via the BlockManager
      if (newBlock != null) {
        blocksForPushing.put(newBlock)  // put is blocking when queue is full
      }
    } catch {
      ...
    }
  }

  At this point a Block has officially been generated. Note that Blocks are produced every blockIntervalMs, a user-configurable interval that effectively controls the generation rate: the larger the interval, the fewer Blocks in a batch. Since the number of tasks used to process a batch is derived from its number of Blocks, the official tuning guide puts it this way:

The number of blocks in each batch determines the number of tasks that will be used to process the received data in a map-like transformation. If the number of tasks is too low (that is, less than the number of cores per machine), then it will be inefficient as all available cores will not be used to process the data. To increase the number of tasks for a given batch interval, reduce the block interval.

  In other words, if the data volume is large, the cluster has enough cores, and data ingestion affects job performance, you can shrink this interval to increase the number of tasks and make full use of cluster resources; otherwise some cores get no task assigned while a batch is processed. Personally I suspect this particular knob is not needed very often in practice.
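  As a concrete, purely illustrative configuration (the app name is made up): with a 2-second batch interval and the default 200 ms block interval, each receiver produces roughly 10 Blocks per batch, hence about 10 tasks per receiver; halving the block interval doubles that:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Illustrative only: 2 s batches with a 100 ms block interval give roughly
// 2000 / 100 = 20 Blocks (and hence ~20 tasks) per batch per receiver,
// instead of the default 2000 / 200 = 10.
val conf = new SparkConf()
  .setAppName("block-interval-demo")
  .set("spark.streaming.blockInterval", "100ms") // default is 200ms
val ssc = new StreamingContext(conf, Seconds(2))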
  Back on track: once a Block is generated it has to be reported and stored through the BlockManager. This is done by the blockPushingThread started in BlockGenerator's start method, whose run loop is keepPushingBlocks:

private def keepPushingBlocks() {
    logInfo("Started block pushing thread")
    // True while the BlockGenerator has not stopped generating; only then can new blocks still appear
    def areBlocksBeingGenerated: Boolean = synchronized {
      state != StoppedGeneratingBlocks
    }
    try {
      // While still generating, loop here polling the queue and processing whatever arrives
      while (areBlocksBeingGenerated) {
        Option(blocksForPushing.poll(10, TimeUnit.MILLISECONDS)) match {
          case Some(block) => pushBlock(block)
          case None =>
        }
      }
      // Reaching here means block generation has stopped; drain and push whatever Blocks are still queued
      while (!blocksForPushing.isEmpty) {
        val block = blocksForPushing.take()
        pushBlock(block)
      }
    } catch {
      ...
    }
  }
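  pushBlock itself is essentially a one-liner; paraphrased, it looks like this:

// Paraphrased sketch: hand the finished Block to the registered listener.
private def pushBlock(block: Block): Unit = {
    listener.onPushBlock(block.id, block.buffer)
    logInfo("Pushed block " + block.id)
  }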

  So the real work is in pushBlock, which is extremely simple: it just calls onPushBlock on the BlockGeneratorListener that was passed into BlockGenerator's constructor. That listener is implemented in ReceiverSupervisorImpl, and the call chain is onPushBlock --> pushArrayBuffer --> pushAndReportBlock. The interesting part is the final pushAndReportBlock:

def pushAndReportBlock(
      receivedBlock: ReceivedBlock,
      metadataOption: Option[Any],
      blockIdOption: Option[StreamBlockId]
    ) {
    val blockId = blockIdOption.getOrElse(nextBlockId)
    val time = System.currentTimeMillis
    // Store the Block through the BlockManager (via the ReceivedBlockHandler) and get the store result
    val blockStoreResult = receivedBlockHandler.storeBlock(blockId, receivedBlock)
    logDebug(s"Pushed block $blockId in ${(System.currentTimeMillis - time)} ms")
    val numRecords = blockStoreResult.numRecords
    // Wrap up the stored Block's metadata
    val blockInfo = ReceivedBlockInfo(streamId, numRecords, metadataOption, blockStoreResult)
    // Tell the ReceiverTracker about the successfully stored Block
    trackerEndpoint.askWithRetry[Boolean](AddBlock(blockInfo))
    logDebug(s"Reported block $blockId")
  }
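  For reference, the listener at the head of that chain is registered in ReceiverSupervisorImpl and looks roughly like this (a paraphrased sketch, not necessarily the exact source of your Spark version):

// Paraphrased sketch: onPushBlock receives the completed buffer and forwards it to
// pushArrayBuffer, which wraps it as an ArrayBufferBlock and calls pushAndReportBlock above.
private val defaultBlockGeneratorListener = new BlockGeneratorListener {
    def onAddData(data: Any, metadata: Any): Unit = { }
    def onGenerateBlock(blockId: StreamBlockId): Unit = { }
    def onError(message: String, throwable: Throwable): Unit = {
      reportError(message, throwable)
    }
    def onPushBlock(blockId: StreamBlockId, arrayBuffer: ArrayBuffer[_]): Unit = {
      pushArrayBuffer(arrayBuffer, None, Some(blockId))
    }
  }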

  The logic of pushAndReportBlock is straightforward. On the storage side it is worth knowing how receivedBlockHandler is chosen:

private val receivedBlockHandler: ReceivedBlockHandler = {
    if (WriteAheadLogUtils.enableReceiverLog(env.conf)) {
      if (checkpointDirOption.isEmpty) {
        throw new SparkException(
          "Cannot enable receiver write-ahead log without checkpoint directory set. " +
            "Please use streamingContext.checkpoint() to set the checkpoint directory. " +
            "See documentation for more details.")
      }
      new WriteAheadLogBasedBlockHandler(env.blockManager, receiver.streamId,
        receiver.storageLevel, env.conf, hadoopConf, checkpointDirOption.get)
    } else {
      new BlockManagerBasedBlockHandler(env.blockManager, receiver.storageLevel)
    }
  }

  So there are two ways of storing a Block: one goes straight through the BlockManager, the other uses a Write Ahead Log, writing the WAL first and then storing the data in HDFS or memory. The actual storage mechanics live in Spark core; for reasons of space we will not expand on them here and leave them to a later article.
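  A minimal sketch of switching on the WAL path (the checkpoint directory below is a hypothetical path; as the check above shows, the WAL flag and a checkpoint directory have to be set together):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Enable the receiver write-ahead log; without a checkpoint directory the
// WriteAheadLogBasedBlockHandler above refuses to start.
val conf = new SparkConf()
  .setAppName("wal-demo")
  .set("spark.streaming.receiver.writeAheadLog.enable", "true")
val ssc = new StreamingContext(conf, Seconds(2))
ssc.checkpoint("hdfs:///tmp/streaming-checkpoint") // hypothetical directory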
  At this point we know how Blocks are generated and stored. What happens after the block info is sent to the ReceiverTracker? To find out we look at the class that receives the remote RPC on the ReceiverTracker side, ReceiverTrackerEndpoint, and its message handler receiveAndReply:

private class ReceiverTrackerEndpoint(override val rpcEnv: RpcEnv) extends ThreadSafeRpcEndpoint {

  private val submitJobThreadPool = ExecutionContext.fromExecutorService(
    ThreadUtils.newDaemonCachedThreadPool("submit-job-thead-pool"))
  ...

  override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
    // Remote messages
    case RegisterReceiver(streamId, typ, hostPort, receiverEndpoint) =>
      val successful =
        registerReceiver(streamId, typ, hostPort, receiverEndpoint, context.sender.address)
      context.reply(successful)
    case AddBlock(receivedBlockInfo) =>
      context.reply(addBlock(receivedBlockInfo))
    case DeregisterReceiver(streamId, message, error) =>
      deregisterReceiver(streamId, message, error)
      context.reply(true)
    // Local messages
    case AllReceiverIds =>
      context.reply(receiverTrackingInfos.filter(_._2.state != ReceiverState.INACTIVE).keys.toSeq)
    case StopAllReceivers =>
      assert(isTrackerStopping || isTrackerStopped)
      stopReceivers()
      context.reply(true)
  }

  Note that the message sent over RPC from ReceiverSupervisorImpl was AddBlock, so the AddBlock case is the one that matches here; its handler is:

private def addBlock(receivedBlockInfo: ReceivedBlockInfo): Boolean = {
    receivedBlockTracker.addBlock(receivedBlockInfo)
  }

  This introduces ReceivedBlockTracker. It is worth looking at its basic structure, since the member fields already give a good idea of what it does:

private[streaming] class ReceivedBlockTracker(
    conf: SparkConf,
    hadoopConf: Configuration,
    streamIds: Seq[Int],
    clock: Clock,
    recoverFromWriteAheadLog: Boolean,
    checkpointDirOption: Option[String])
  extends Logging {
  
  private type ReceivedBlockQueue = mutable.Queue[ReceivedBlockInfo]
  // Every Receiver has a streamId; this map gives each Receiver its own queue holding the Block info received from it but not yet allocated to a batch
  private val streamIdToUnallocatedBlockQueues = new mutable.HashMap[Int, ReceivedBlockQueue]
  // Every Block belongs to some batch; this records which Blocks each batch owns. When processing falls behind, entries for several batches accumulate here
  private val timeToAllocatedBlocks = new mutable.HashMap[Time, AllocatedBlocks]
  private val writeAheadLogOption = createWriteAheadLog()

  private var lastAllocatedBatchTime: Time = null

  // If configured to recover previously received blocks, replay the WAL
  if (recoverFromWriteAheadLog) {
    recoverPastEvents()
  }

  def addBlock(receivedBlockInfo: ReceivedBlockInfo): Boolean = synchronized {
    try {
      // Write the event to the WAL
      writeToLog(BlockAdditionEvent(receivedBlockInfo))
      // Append the received block info to the queue of its stream
      getReceivedBlockQueue(receivedBlockInfo.streamId) += receivedBlockInfo
      logDebug(s"Stream ${receivedBlockInfo.streamId} received " +
        s"block ${receivedBlockInfo.blockStoreResult.blockId}")
      true
    } catch {
      case e: Exception =>
        logError(s"Error adding block $receivedBlockInfo", e)
        false
    }
  }
  ...
 }

  So addBlock only appends the received block info to the unallocated queue. That queue gets consumed at the start of the next batch, when generateJobs runs; that is the timer-driven method of JobGenerator analyzed in an earlier article, and it connects directly with what we are looking at here.
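  To recap, generateJobs looks roughly like this (a paraphrased sketch; the exact shape varies a bit across Spark versions):

// Paraphrased sketch of JobGenerator.generateJobs: first assign all still-unallocated
// blocks to this batch, then build the batch's jobs from the DStream graph.
private def generateJobs(time: Time) {
    Try {
      jobScheduler.receiverTracker.allocateBlocksToBatch(time) // allocate received blocks to this batch
      graph.generateJobs(time)                                 // generate jobs using the allocated blocks
    } match {
      case Success(jobs) =>
        val streamIdToInputInfos = jobScheduler.inputInfoTracker.getInfo(time)
        jobScheduler.submitJobSet(JobSet(time, jobs, streamIdToInputInfos))
      case Failure(e) =>
        jobScheduler.reportError("Error generating jobs for time " + time, e)
    }
    eventLoop.post(DoCheckpoint(time, clearCheckpointDataLater = true))
  }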
  The first thing generateJobs does is call jobScheduler.receiverTracker.allocateBlocksToBatch(time), which delegates to the allocateBlocksToBatch method of the ReceivedBlockTracker we have been discussing:

---> ReceivedBlockTracker.scala
def allocateBlocksToBatch(batchTime: Time): Unit = synchronized {
    if (lastAllocatedBatchTime == null || batchTime > lastAllocatedBatchTime) {
      // Dequeue every block that has not yet been allocated to a batch
      val streamIdToBlocks = streamIds.map { streamId =>
          (streamId, getReceivedBlockQueue(streamId).dequeueAll(x => true))
      }.toMap
      val allocatedBlocks = AllocatedBlocks(streamIdToBlocks)
      // Write the allocation event to the WAL
      writeToLog(BatchAllocationEvent(batchTime, allocatedBlocks))
      // Record everything just dequeued from streamIdToUnallocatedBlockQueues under this batch time in timeToAllocatedBlocks, fixing the set of blocks for the current batch
      timeToAllocatedBlocks(batchTime) = allocatedBlocks
      lastAllocatedBatchTime = batchTime
      allocatedBlocks
    } else {
      ...
    }
  }
 
 private def getReceivedBlockQueue(streamId: Int): ReceivedBlockQueue = {
    streamIdToUnallocatedBlockQueues.getOrElseUpdate(streamId, new ReceivedBlockQueue)
  }

  As the comments above explain, at the start of each batch all blocks still waiting to be allocated are packaged up as that batch's blocks and recorded; when the batch is processed, this is where its input data comes from.
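  The consuming side of timeToAllocatedBlocks is then just a lookup; roughly (paraphrased):

// Paraphrased sketch: at compute time a batch's blocks are simply looked up from the
// timeToAllocatedBlocks map that allocateBlocksToBatch populated.
def getBlocksOfBatch(batchTime: Time): Map[Int, Seq[ReceivedBlockInfo]] = synchronized {
    timeToAllocatedBlocks.get(batchTime).map { _.streamIdToAllocatedBlocks }.getOrElse(Map.empty)
  }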
  This brings to mind the many online explanations of how Spark Streaming works: each batch of data becomes an RDD, and a Spark job runs over each batch's RDD; in other words, the stream is sliced by time and each slice is executed as a Spark job. Four source-code articles in, we now know how each batch's set of blocks is assembled, but we still have not seen where the RDD is created. Since we know the data is fetched from timeToAllocatedBlocks, we can trace backwards from there, which leads to the compute method of ReceiverInputDStream:

override def compute(validTime: Time): Option[RDD[T]] = {
    val blockRDD = {
      if (validTime < graph.startTime) {
        new BlockRDD[T](ssc.sc, Array.empty)
      } else {
        val receiverTracker = ssc.scheduler.receiverTracker
        val blockInfos = receiverTracker.getBlocksOfBatch(validTime).getOrElse(id, Seq.empty)
        val inputInfo = StreamInputInfo(id, blockInfos.flatMap(_.numRecords).sum)
        ssc.scheduler.inputInfoTracker.reportInfo(validTime, inputInfo)
        createBlockRDD(validTime, blockInfos)
      }
    }
    Some(blockRDD)
  }
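  createBlockRDD is where those block IDs finally become an RDD, one partition per block; roughly (a paraphrased sketch, and version-dependent):

// Paraphrased sketch: if every block carries a WAL record handle, build a
// WriteAheadLogBackedBlockRDD so the data can be re-read from the log; otherwise
// build a plain BlockRDD over the blocks held by the BlockManager.
private[streaming] def createBlockRDD(time: Time, blockInfos: Seq[ReceivedBlockInfo]): RDD[T] = {
    if (blockInfos.isEmpty) {
      new BlockRDD[T](ssc.sc, Array.empty)
    } else {
      val blockIds = blockInfos.map { _.blockId.asInstanceOf[BlockId] }.toArray
      if (blockInfos.forall(_.walRecordHandleOption.nonEmpty)) {
        val walRecordHandles = blockInfos.map { _.walRecordHandleOption.get }.toArray
        val isBlockIdValid = blockInfos.map { _.isBlockIdValid() }.toArray
        new WriteAheadLogBackedBlockRDD[T](
          ssc.sparkContext, blockIds, walRecordHandles, isBlockIdValid)
      } else {
        new BlockRDD[T](ssc.sc, blockIds)
      }
    }
  }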

  And with that, the whole flow is in place: the Receiver fetches data, the data is assembled into Blocks, and the Blocks become the RDD for each batch! I have also set up a source-reading discussion group (QQ: 936037639); if you are reading the source of Spark or other big-data frameworks, feel free to join and exchange notes, since some details are genuinely hard to figure out alone.
