  1. Block数据是如何生成的
  2. Block是如何存储的


def run() {
      logInfo("Starting MessageHandler.")
      try {
        val streamIterator = stream.iterator()
        while (streamIterator.hasNext()) {
          val msgAndMetadata = streamIterator.next()
          store((msgAndMetadata.key, msgAndMetadata.message))
      } catch {
        case e: Throwable => reportError("Error handling message; exiting", e)

def store(dataItem: T) {

  继续追溯刚才的pushSingle,调用路线为:ReceiverSupervisorImpl.pushSingle --> defaultBlockGenerator.addData,其中addData的代码如下:

def addData(data: Any): Unit = {
    if (state == Active) {
      synchronized {
        if (state == Active) {
          currentBuffer += data
        } else {
          throw new SparkException(
            "Cannot add data as BlockGenerator has not been started or has been stopped")
    } else {
      throw new SparkException(
        "Cannot add data as BlockGenerator has not been started or has been stopped")


override protected def onStart() {
    registeredBlockGenerators.foreach { _.start() }


override def createBlockGenerator(
      blockGeneratorListener: BlockGeneratorListener): BlockGenerator = {
    // Cleanup BlockGenerators that have already been stopped
    registeredBlockGenerators --= registeredBlockGenerators.filter{ _.isStopped() }

    val newBlockGenerator = new BlockGenerator(blockGeneratorListener, streamId, env.conf)
    registeredBlockGenerators += newBlockGenerator


private[streaming] class BlockGenerator(
    listener: BlockGeneratorListener,
    receiverId: Int,
    conf: SparkConf,
    clock: Clock = new SystemClock()
  ) extends RateLimiter(conf) with Logging {

  private case class Block(id: StreamBlockId, buffer: ArrayBuffer[Any])

  // Generator的状态枚举类
  private object GeneratorState extends Enumeration {
    type GeneratorState = Value
    val Initialized, Active, StoppedAddingData, StoppedGeneratingBlocks, StoppedAll = Value
  import GeneratorState._
  private val blockIntervalMs = conf.getTimeAsMs("spark.streaming.blockInterval", "200ms")
  private val blockIntervalTimer =
    new RecurringTimer(clock, blockIntervalMs, updateCurrentBuffer, "BlockGenerator")
  private val blockQueueSize = conf.getInt("spark.streaming.blockQueueSize", 10)
  private val blocksForPushing = new ArrayBlockingQueue[Block](blockQueueSize)
  private val blockPushingThread = new Thread() { override def run() { keepPushingBlocks() } }
  @volatile private var currentBuffer = new ArrayBuffer[Any]
  @volatile private var state = Initialized

  /** Start block generating and pushing threads. */
  def start(): Unit = synchronized {
    if (state == Initialized) {
      state = Active
      logInfo("Started BlockGenerator")
    } else {
      throw new SparkException(
        s"Cannot start BlockGenerator as its not in the Initialized state [state = $state]")


private def updateCurrentBuffer(time: Long): Unit = {
    try {
      var newBlock: Block = null
      synchronized {
        if (currentBuffer.nonEmpty) {
          val newBlockBuffer = currentBuffer
          currentBuffer = new ArrayBuffer[Any]
          val blockId = StreamBlockId(receiverId, time - blockIntervalMs)
          newBlock = new Block(blockId, newBlockBuffer)
      if (newBlock != null) {
        blocksForPushing.put(newBlock)  // put is blocking when queue is full
    } catch {


The number of blocks in each batch determines the number of tasks that will be used to process the received data in a map-like transformation. If the number of tasks is too low (that is, less than the number of cores per machine), then it will be inefficient as all available cores will not be used to process the data. To increase the number of tasks for a given batch interval, reduce the block interval.


private def keepPushingBlocks() {
    logInfo("Started block pushing thread")
    def areBlocksBeingGenerated: Boolean = synchronized {
      state != StoppedGeneratingBlocks
    try {
      // 只要还没有停止生成Block(state != StoppedGeneratingBlocks),就在这个while循环中不断从队列中取出Block并推送
      while (areBlocksBeingGenerated) {
        Option(blocksForPushing.poll(10, TimeUnit.MILLISECONDS)) match {
          case Some(block) => pushBlock(block)
          case None =>
      while (!blocksForPushing.isEmpty) {
        val block = blocksForPushing.take()
    } catch {

  很显然主要逻辑在pushBlock方法中,这个方法十分简单,就是调用BlockGenerator构造函数中传入的BlockGeneratorListener的onPushBlock方法。这个BlockGeneratorListener的具体实现在ReceiverSupervisorImpl中,追溯路线为:onPushBlock --> pushArrayBuffer --> pushAndReportBlock,重点看最后的pushAndReportBlock方法,代码如下:

def pushAndReportBlock(
      receivedBlock: ReceivedBlock,
      metadataOption: Option[Any],
      blockIdOption: Option[StreamBlockId]
    ) {
    val blockId = blockIdOption.getOrElse(nextBlockId)
    val time = System.currentTimeMillis
    val blockStoreResult = receivedBlockHandler.storeBlock(blockId, receivedBlock)
    logDebug(s"Pushed block $blockId in ${(System.currentTimeMillis - time)} ms")
    val numRecords = blockStoreResult.numRecords
    val blockInfo = ReceivedBlockInfo(streamId, numRecords, metadataOption, blockStoreResult)
    logDebug(s"Reported block $blockId")


private val receivedBlockHandler: ReceivedBlockHandler = {
    if (WriteAheadLogUtils.enableReceiverLog(env.conf)) {
      if (checkpointDirOption.isEmpty) {
        throw new SparkException(
          "Cannot enable receiver write-ahead log without checkpoint directory set. " +
            "Please use streamingContext.checkpoint() to set the checkpoint directory. " +
            "See documentation for more details.")
      new WriteAheadLogBasedBlockHandler(env.blockManager, receiver.streamId,
        receiver.storageLevel, env.conf, hadoopConf, checkpointDirOption.get)
    } else {
      new BlockManagerBasedBlockHandler(env.blockManager, receiver.storageLevel)

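  这里对应两种存储方式:一种是直接通过BlockManager进行存储(BlockManagerBasedBlockHandler);另一种是启用Write Ahead Log(WAL,预写日志)的方式(WriteAheadLogBasedBlockHandler),除了通过BlockManager存储之外,还会把数据写入checkpoint目录(通常在HDFS上)下的WAL中,以便故障后恢复。具体的存储逻辑属于Spark内部的实现,由于篇幅所限,这里暂不展开,留待后续文章继续研究。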

private class ReceiverTrackerEndpoint(override val rpcEnv: RpcEnv) extends ThreadSafeRpcEndpoint {
private val submitJobThreadPool = ExecutionContext.fromExecutorService(
      override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
      // Remote messages
      case RegisterReceiver(streamId, typ, hostPort, receiverEndpoint) =>
        val successful =
          registerReceiver(streamId, typ, hostPort, receiverEndpoint, context.sender.address)
      case AddBlock(receivedBlockInfo) =>
      case DeregisterReceiver(streamId, message, error) =>
        deregisterReceiver(streamId, message, error)
      // Local messages
      case AllReceiverIds =>
        context.reply(receiverTrackingInfos.filter(_._2.state != ReceiverState.INACTIVE).keys.toSeq)
      case StopAllReceivers =>
        assert(isTrackerStopping || isTrackerStopped)


private def addBlock(receivedBlockInfo: ReceivedBlockInfo): Boolean = {


private[streaming] class ReceivedBlockTracker(
    conf: SparkConf,
    hadoopConf: Configuration,
    streamIds: Seq[Int],
    clock: Clock,
    recoverFromWriteAheadLog: Boolean,
    checkpointDirOption: Option[String])
  extends Logging {
  private type ReceivedBlockQueue = mutable.Queue[ReceivedBlockInfo]
  // 每个Receiver对应有一个streamId,这里为每一个Receiver配置了一个队列,用来存储从该Receiver接收到的、尚未分配给批次的Block信息
  private val streamIdToUnallocatedBlockQueues = new mutable.HashMap[Int, ReceivedBlockQueue]
  private val timeToAllocatedBlocks = new mutable.HashMap[Time, AllocatedBlocks]
  private val writeAheadLogOption = createWriteAheadLog()

  private var lastAllocatedBatchTime: Time = null

  // 如果配置需要恢复之前的数据块,则依据WAL进行恢复
  if (recoverFromWriteAheadLog) {

  def addBlock(receivedBlockInfo: ReceivedBlockInfo): Boolean = synchronized {
    try {
      getReceivedBlockQueue(receivedBlockInfo.streamId) += receivedBlockInfo
      logDebug(s"Stream ${receivedBlockInfo.streamId} received " +
        s"block ${receivedBlockInfo.blockStoreResult.blockId}")
    } catch {
      case e: Exception =>
        logError(s"Error adding block $receivedBlockInfo", e)


---> ReceivedBlockTracker.scala
def allocateBlocksToBatch(batchTime: Time): Unit = synchronized {
    if (lastAllocatedBatchTime == null || batchTime > lastAllocatedBatchTime) {
      val streamIdToBlocks = streamIds.map { streamId =>
          (streamId, getReceivedBlockQueue(streamId).dequeueAll(x => true))
      val allocatedBlocks = AllocatedBlocks(streamIdToBlocks)
      writeToLog(BatchAllocationEvent(batchTime, allocatedBlocks))
      timeToAllocatedBlocks(batchTime) = allocatedBlocks
      lastAllocatedBatchTime = batchTime
    } else {
 private def getReceivedBlockQueue(streamId: Int): ReceivedBlockQueue = {
    streamIdToUnallocatedBlockQueues.getOrElseUpdate(streamId, new ReceivedBlockQueue)

  不过到这里,我回想起之前在网上看到的各种关于Spark Streaming工作原理的介绍,都说Spark Streaming其实就是把每个批次的数据作为一个个RDD,然后对每个批次的RDD执行Spark作业,即将流数据按时间划分为一个个切片,然后对每个切片执行一个Spark作业。作为源码分析的第四篇文章,到此已经知道了每个批次要处理的block数据的集合,但还没有看到生成RDD的地方。不过既然已经知道数据的源头是timeToAllocatedBlocks这个HashMap(以批次时间为key),我们就可以反向追踪了,结果发现ReceiverInputDStream的compute方法代码如下:

override def compute(validTime: Time): Option[RDD[T]] = {
    val blockRDD = {
      if (validTime < graph.startTime) {
        new BlockRDD[T](ssc.sc, Array.empty)
      } else {
        val receiverTracker = ssc.scheduler.receiverTracker
        val blockInfos = receiverTracker.getBlocksOfBatch(validTime).getOrElse(id, Seq.empty)
        val inputInfo = StreamInputInfo(id, blockInfos.flatMap(_.numRecords).sum)
        ssc.scheduler.inputInfoTracker.reportInfo(validTime, inputInfo)
        createBlockRDD(validTime, blockInfos)



