我是靠谱客的博主 刻苦导师,这篇文章主要介绍Spark源码学习之KV-RDD的常见算子(1)前言源码,现在分享给大家,希望可以做个参考。

文章目录

  • 前言
  • 源码
    • countByKey()
    • reduceByKey()
    • foldByKey
    • aggregateByKey
    • groupByKey
    • combineByKey

前言

之前讨论了非KV-RDD常见算子的一些实现,这次来看看KV-RDD。由于算子过多,本文只展示ByKey的常见算子。
在这里插入图片描述

同样,本文侧重的是看这些算子之间的调用关系,从上图可以发现一些特点。

  1. 分组聚合类函数最终都调用到了combineByKeyWithClassTag
  2. 多数算子都有简单版本指定分区个数的版本:
    简单版本通过defaultPartitioner调用指定分区类型的版本
    指定分区个数通过new HashPartitioner(numPartitions)调用指定分区类型的版本。
    指定分区类型的版本最终调用到combineByKeyWithClassTag

源码

下面的源码取自2.0.2版本,和1.x可能有较大差异。

countByKey()

其实就是用reduceByKey累加实现的

复制代码
1
2
3
4
def countByKey(): Map[K, Long] = self.withScope { self.mapValues(_ => 1L).reduceByKey(_ + _).collect().toMap }

mapValues本来的V都映射成了1,然后就像wordcount一样累加就行。

reduceByKey()

之前的特点2,不太懂,仔细看看下面几个函数就好理解了

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
def reduceByKey(func: (V, V) => V): RDD[(K, V)] = self.withScope { reduceByKey(defaultPartitioner(self), func) }//简单版本的添加了分区类型 def reduceByKey(func: (V, V) => V, numPartitions: Int): RDD[(K, V)] = self.withScope { reduceByKey(new HashPartitioner(numPartitions), func) }//指定分区个数,用HashPartitioner def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)] = self.withScope { combineByKeyWithClassTag[V]((v: V) => v, func, func, partitioner) }//调用combineByKeyWithClassTag

foldByKey

reduceByKey都很熟悉,这个可能没那么熟悉了。

用过Scala的reduceLeft的话,应该知道reduceLeft是foldLeft实现的:reduceLeft调用了foldLeft,初始值为集合最左的元素。

foldByKey其实就是比reduceByKey多了个初始值zeroValue而已。用这个初始值开始和集合的元素进行聚合。

复制代码
1
2
3
4
def foldByKey(zeroValue: V)(func: (V, V) => V): RDD[(K, V)] = self.withScope { foldByKey(zeroValue, defaultPartitioner(self))(func) }//简单版本

另外两个重载的函数也满足特点2

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
def foldByKey(zeroValue: V)(func: (V, V) => V): RDD[(K, V)] = self.withScope { foldByKey(zeroValue, defaultPartitioner(self))(func) }//指定分区个数的调用指定分区类型 def foldByKey( zeroValue: V, partitioner: Partitioner)(func: (V, V) => V): RDD[(K, V)] = self.withScope { // Serialize the zero value to a byte array so that we can get a new clone of it on each key val zeroBuffer = SparkEnv.get.serializer.newInstance().serialize(zeroValue) val zeroArray = new Array[Byte](zeroBuffer.limit) zeroBuffer.get(zeroArray) // When deserializing, use a lazy val to create just one instance of the serializer per task lazy val cachedSerializer = SparkEnv.get.serializer.newInstance() val createZero = () => cachedSerializer.deserialize[V](ByteBuffer.wrap(zeroArray)) val cleanedFunc = self.context.clean(func) combineByKeyWithClassTag[V]((v: V) => cleanedFunc(createZero(), v), cleanedFunc, cleanedFunc, partitioner) }//指定分区类型,调用combineByKeyWithClassTag

aggregateByKey

这个比foldByKey还要复杂,需要对Spark运行原理有一了解。除了有初始值zeroValue,合并函数变为了两个:一个是分区内部的合并、另一个是分区之间的合并。

复制代码
1
2
3
4
5
def aggregateByKey[U: ClassTag](zeroValue: U)(seqOp: (U, V) => U, combOp: (U, U) => U): RDD[(K, U)] = self.withScope { aggregateByKey(zeroValue, defaultPartitioner(self))(seqOp, combOp) }

另外两个依旧遵循特点2

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
def aggregateByKey[U: ClassTag](zeroValue: U, numPartitions: Int)(seqOp: (U, V) => U, combOp: (U, U) => U): RDD[(K, U)] = self.withScope { aggregateByKey(zeroValue, new HashPartitioner(numPartitions))(seqOp, combOp) } def aggregateByKey[U: ClassTag](zeroValue: U, partitioner: Partitioner)(seqOp: (U, V) => U, combOp: (U, U) => U): RDD[(K, U)] = self.withScope { // Serialize the zero value to a byte array so that we can get a new clone of it on each key val zeroBuffer = SparkEnv.get.serializer.newInstance().serialize(zeroValue) val zeroArray = new Array[Byte](zeroBuffer.limit) zeroBuffer.get(zeroArray) lazy val cachedSerializer = SparkEnv.get.serializer.newInstance() val createZero = () => cachedSerializer.deserialize[U](ByteBuffer.wrap(zeroArray)) // We will clean the combiner closure later in `combineByKey` val cleanedSeqOp = self.context.clean(seqOp) combineByKeyWithClassTag[U]((v: V) => cleanedSeqOp(createZero(), v), cleanedSeqOp, combOp, partitioner) }

前三个算子有一个共性,就是聚合的时候,类型不会改变(源码中的体现是(U, U) => U)。
但是下面的算子,则会将值合并成集合而不是原的类型。

groupByKey

分组函数应该很熟悉了,值按Key被合并到了集合中。

复制代码
1
2
3
4
def groupByKey(): RDD[(K, Iterable[V])] = self.withScope { groupByKey(defaultPartitioner(self)) }

同样满足特点2

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
def groupByKey(numPartitions: Int): RDD[(K, Iterable[V])] = self.withScope { groupByKey(new HashPartitioner(numPartitions)) } def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])] = self.withScope { // groupByKey shouldn't use map side combine because map side combine does not // reduce the amount of data shuffled and requires all map side data be inserted // into a hash table, leading to more objects in the old gen. val createCombiner = (v: V) => CompactBuffer(v) val mergeValue = (buf: CompactBuffer[V], v: V) => buf += v val mergeCombiners = (c1: CompactBuffer[V], c2: CompactBuffer[V]) => c1 ++= c2 val bufs = combineByKeyWithClassTag[CompactBuffer[V]]( createCombiner, mergeValue, mergeCombiners, partitioner, mapSideCombine = false) bufs.asInstanceOf[RDD[(K, Iterable[V])]] }

combineByKey

理解了前面的,这个也好理解。createCombiner类似zeroValue,创建初始集合;mergeValue合并分区内的值到集合里,mergeCombiners分区结果已经是集合,对集合进行合并。

复制代码
1
2
3
4
5
6
7
def combineByKey[C]( createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C): RDD[(K, C)] = self.withScope { combineByKeyWithClassTag(createCombiner, mergeValue, mergeCombiners)(null) }

其实早期版本的Spark就是用这个实现的groupByKey,源码如下。

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
class PairRDDFunctions[K, V](self: RDD[(K, V)]) (implicit kt: ClassTag[K], vt: ClassTag[V], ord: Ordering[K] = null) extends Logging with SparkHadoopMapReduceUtil with Serializable { def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])] = { val createCombiner = (v: V) => CompactBuffer(v) val mergeValue = (buf: CompactBuffer[V], v: V) => buf += v val mergeCombiners = (c1: CompactBuffer[V], c2: CompactBuffer[V]) => c1 ++= c2 val bufs = combineByKey[CompactBuffer[V]]( createCombiner, mergeValue, mergeCombiners, partitioner, mapSideCombine=false) bufs.asInstanceOf[RDD[(K, Iterable[V])]] } }

分组聚合的看完了,还剩下最后一个抽样函数

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
def sampleByKey(withReplacement: Boolean, fractions: Map[K, Double], seed: Long = Utils.random.nextLong): RDD[(K, V)] = self.withScope { require(fractions.values.forall(v => v >= 0.0), "Negative sampling rates.") val samplingFunc = if (withReplacement) { StratifiedSamplingUtils.getPoissonSamplingFunction(self, fractions, false, seed) } else { StratifiedSamplingUtils.getBernoulliSamplingFunction(self, fractions, false, seed) } self.mapPartitionsWithIndex(samplingFunc, preservesPartitioning = true) }

除了这些,KV-RDD还有一类JOIN算子也很常见,下篇博客再进行讨论。

最后

以上就是刻苦导师最近收集整理的关于Spark源码学习之KV-RDD的常见算子(1)前言源码的全部内容,更多相关Spark源码学习之KV-RDD内容请搜索靠谱客的其他文章。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(56)

评论列表共有 0 条评论

立即
投稿
返回
顶部