我是靠谱客的博主 刻苦导师,最近开发中收集的这篇文章主要介绍Spark源码学习之KV-RDD的常见算子(1)前言源码,觉得挺不错的,现在分享给大家,希望可以做个参考。

概述

文章目录

  • 前言
  • 源码
    • countByKey()
    • reduceByKey()
    • foldByKey
    • aggregateByKey
    • groupByKey
    • combineByKey

前言

之前讨论了非KV-RDD常见算子的一些实现,这次来看看KV-RDD。由于算子过多,本文只展示ByKey的常见算子。
在这里插入图片描述

同样,本文侧重的是看这些算子之间的调用关系,从上图可以发现一些特点。

  1. 分组聚合类函数最终都调用到了combineByKeyWithClassTag
  2. 多数算子都有简单版本指定分区个数的版本:
    简单版本通过defaultPartitioner调用指定分区类型的版本
    指定分区个数通过new HashPartitioner(numPartitions)调用指定分区类型的版本。
    指定分区类型的版本最终调用到combineByKeyWithClassTag

源码

下面的源码取自2.0.2版本,和1.x可能有较大差异。

countByKey()

其实就是用reduceByKey累加实现的

  def countByKey(): Map[K, Long] = self.withScope {
    self.mapValues(_ => 1L).reduceByKey(_ + _).collect().toMap
  }

mapValues本来的V都映射成了1,然后就像wordcount一样累加就行。

reduceByKey()

之前的特点2,不太懂,仔细看看下面几个函数就好理解了

def reduceByKey(func: (V, V) => V): RDD[(K, V)] = self.withScope {
    reduceByKey(defaultPartitioner(self), func)
  }//简单版本的添加了分区类型

def reduceByKey(func: (V, V) => V, numPartitions: Int): RDD[(K, V)] = self.withScope {
    reduceByKey(new HashPartitioner(numPartitions), func)
  }//指定分区个数,用HashPartitioner

def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)] = self.withScope {
    combineByKeyWithClassTag[V]((v: V) => v, func, func, partitioner)
  }//调用combineByKeyWithClassTag

foldByKey

reduceByKey都很熟悉,这个可能没那么熟悉了。

用过Scala的reduceLeft的话,应该知道reduceLeft是foldLeft实现的:reduceLeft调用了foldLeft,初始值为集合最左的元素。

foldByKey其实就是比reduceByKey多了个初始值zeroValue而已。用这个初始值开始和集合的元素进行聚合。

  def foldByKey(zeroValue: V)(func: (V, V) => V): RDD[(K, V)] = self.withScope {
    foldByKey(zeroValue, defaultPartitioner(self))(func)
  }//简单版本

另外两个重载的函数也满足特点2

def foldByKey(zeroValue: V)(func: (V, V) => V): RDD[(K, V)] = self.withScope {
    foldByKey(zeroValue, defaultPartitioner(self))(func)
  }//指定分区个数的调用指定分区类型
def foldByKey(
      zeroValue: V,
      partitioner: Partitioner)(func: (V, V) => V): RDD[(K, V)] = self.withScope {
    // Serialize the zero value to a byte array so that we can get a new clone of it on each key
    val zeroBuffer = SparkEnv.get.serializer.newInstance().serialize(zeroValue)
    val zeroArray = new Array[Byte](zeroBuffer.limit)
    zeroBuffer.get(zeroArray)

    // When deserializing, use a lazy val to create just one instance of the serializer per task
    lazy val cachedSerializer = SparkEnv.get.serializer.newInstance()
    val createZero = () => cachedSerializer.deserialize[V](ByteBuffer.wrap(zeroArray))

    val cleanedFunc = self.context.clean(func)
    combineByKeyWithClassTag[V]((v: V) => cleanedFunc(createZero(), v),
      cleanedFunc, cleanedFunc, partitioner)
  }//指定分区类型,调用combineByKeyWithClassTag

aggregateByKey

这个比foldByKey还要复杂,需要对Spark运行原理有一了解。除了有初始值zeroValue,合并函数变为了两个:一个是分区内部的合并、另一个是分区之间的合并。

  def aggregateByKey[U: ClassTag](zeroValue: U)(seqOp: (U, V) => U,
      combOp: (U, U) => U): RDD[(K, U)] = self.withScope {
    aggregateByKey(zeroValue, defaultPartitioner(self))(seqOp, combOp)
  }

另外两个依旧遵循特点2

  def aggregateByKey[U: ClassTag](zeroValue: U, numPartitions: Int)(seqOp: (U, V) => U,
      combOp: (U, U) => U): RDD[(K, U)] = self.withScope {
    aggregateByKey(zeroValue, new HashPartitioner(numPartitions))(seqOp, combOp)
  }
 def aggregateByKey[U: ClassTag](zeroValue: U, partitioner: Partitioner)(seqOp: (U, V) => U,
      combOp: (U, U) => U): RDD[(K, U)] = self.withScope {
    // Serialize the zero value to a byte array so that we can get a new clone of it on each key
    val zeroBuffer = SparkEnv.get.serializer.newInstance().serialize(zeroValue)
    val zeroArray = new Array[Byte](zeroBuffer.limit)
    zeroBuffer.get(zeroArray)

    lazy val cachedSerializer = SparkEnv.get.serializer.newInstance()
    val createZero = () => cachedSerializer.deserialize[U](ByteBuffer.wrap(zeroArray))

    // We will clean the combiner closure later in `combineByKey`
    val cleanedSeqOp = self.context.clean(seqOp)
    combineByKeyWithClassTag[U]((v: V) => cleanedSeqOp(createZero(), v),
      cleanedSeqOp, combOp, partitioner)
  }

前三个算子有一个共性,就是聚合的时候,类型不会改变(源码中的体现是(U, U) => U)。
但是下面的算子,则会将值合并成集合而不是原的类型。

groupByKey

分组函数应该很熟悉了,值按Key被合并到了集合中。

 def groupByKey(): RDD[(K, Iterable[V])] = self.withScope {
    groupByKey(defaultPartitioner(self))
  }

同样满足特点2

def groupByKey(numPartitions: Int): RDD[(K, Iterable[V])] = self.withScope {
    groupByKey(new HashPartitioner(numPartitions))
  }
def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])] = self.withScope {
    // groupByKey shouldn't use map side combine because map side combine does not
    // reduce the amount of data shuffled and requires all map side data be inserted
    // into a hash table, leading to more objects in the old gen.
    val createCombiner = (v: V) => CompactBuffer(v)
    val mergeValue = (buf: CompactBuffer[V], v: V) => buf += v
    val mergeCombiners = (c1: CompactBuffer[V], c2: CompactBuffer[V]) => c1 ++= c2
    val bufs = combineByKeyWithClassTag[CompactBuffer[V]](
      createCombiner, mergeValue, mergeCombiners, partitioner, mapSideCombine = false)
    bufs.asInstanceOf[RDD[(K, Iterable[V])]]
  }

combineByKey

理解了前面的,这个也好理解。createCombiner类似zeroValue,创建初始集合;mergeValue合并分区内的值到集合里,mergeCombiners分区结果已经是集合,对集合进行合并。

  def combineByKey[C](
      createCombiner: V => C,
      mergeValue: (C, V) => C,
      mergeCombiners: (C, C) => C): RDD[(K, C)] = self.withScope {
    combineByKeyWithClassTag(createCombiner, mergeValue, mergeCombiners)(null)
  }

其实早期版本的Spark就是用这个实现的groupByKey,源码如下。

class PairRDDFunctions[K, V](self: RDD[(K, V)])
    (implicit kt: ClassTag[K], vt: ClassTag[V], ord: Ordering[K] = null)
  extends Logging
  with SparkHadoopMapReduceUtil
  with Serializable {
    def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])] = {
        val createCombiner = (v: V) => CompactBuffer(v)
        val mergeValue = (buf: CompactBuffer[V], v: V) => buf += v
        val mergeCombiners = (c1: CompactBuffer[V], c2: CompactBuffer[V]) => c1 ++= c2
        val bufs = combineByKey[CompactBuffer[V]](
          createCombiner, mergeValue, mergeCombiners, partitioner, mapSideCombine=false)
        bufs.asInstanceOf[RDD[(K, Iterable[V])]]
      }
}

分组聚合的看完了,还剩下最后一个抽样函数

def sampleByKey(withReplacement: Boolean,
      fractions: Map[K, Double],
      seed: Long = Utils.random.nextLong): RDD[(K, V)] = self.withScope {

    require(fractions.values.forall(v => v >= 0.0), "Negative sampling rates.")

    val samplingFunc = if (withReplacement) {
      StratifiedSamplingUtils.getPoissonSamplingFunction(self, fractions, false, seed)
    } else {
      StratifiedSamplingUtils.getBernoulliSamplingFunction(self, fractions, false, seed)
    }
    self.mapPartitionsWithIndex(samplingFunc, preservesPartitioning = true)
  }

除了这些,KV-RDD还有一类JOIN算子也很常见,下篇博客再进行讨论。

最后

以上就是刻苦导师为你收集整理的Spark源码学习之KV-RDD的常见算子(1)前言源码的全部内容,希望文章能够帮你解决Spark源码学习之KV-RDD的常见算子(1)前言源码所遇到的程序开发问题。

如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(41)

评论列表共有 0 条评论

立即
投稿
返回
顶部