Table of Contents
- Preface
- Source Code
- countByKey()
- reduceByKey()
- foldByKey
- aggregateByKey
- groupByKey
- combineByKey
Preface
The previous post looked at how some common operators on non-KV RDDs are implemented; this time we turn to KV RDDs. Since there are so many operators, this post covers only the common *ByKey ones.
As before, the focus here is on the call relationships between these operators. From the call-relationship diagram above, a few patterns stand out:
- The grouping/aggregation functions all ultimately call `combineByKeyWithClassTag`.
- Most operators have a simple version and a version that takes a number of partitions:
  - the simple version delegates to the Partitioner version via `defaultPartitioner`;
  - the partition-count version delegates to the Partitioner version via `new HashPartitioner(numPartitions)`;
  - the Partitioner version ultimately calls `combineByKeyWithClassTag`.
Source Code
The source below is taken from version 2.0.2 and may differ considerably from 1.x.
countByKey()
It is essentially implemented by counting with reduceByKey:
```scala
def countByKey(): Map[K, Long] = self.withScope {
  self.mapValues(_ => 1L).reduceByKey(_ + _).collect().toMap
}
```
mapValues maps every original V to 1L, and then the ones are summed per key, just like in word count.
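A minimal usage sketch (the data and the `sc` SparkContext are assumptions for illustration):

```scala
val pairs = sc.parallelize(Seq(("a", 10), ("b", 20), ("a", 30)))

// The values are ignored entirely; only the number of occurrences per key matters.
val counts = pairs.countByKey()
// counts == Map("a" -> 2, "b" -> 1)
```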
reduceByKey()
If pattern 2 above seemed unclear, a careful look at the next few functions makes it easy to understand:
```scala
def reduceByKey(func: (V, V) => V): RDD[(K, V)] = self.withScope {
  reduceByKey(defaultPartitioner(self), func)
}  // simple version: fills in a Partitioner

def reduceByKey(func: (V, V) => V, numPartitions: Int): RDD[(K, V)] = self.withScope {
  reduceByKey(new HashPartitioner(numPartitions), func)
}  // given a partition count, uses HashPartitioner

def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)] = self.withScope {
  combineByKeyWithClassTag[V]((v: V) => v, func, func, partitioner)
}  // given a Partitioner, calls combineByKeyWithClassTag
```
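A sketch of how the three overloads are called from user code (the data and `sc` are assumptions for illustration); all three routes end up in `combineByKeyWithClassTag`:

```scala
import org.apache.spark.HashPartitioner

val words = sc.parallelize(Seq(("spark", 1), ("rdd", 1), ("spark", 1)))

val a = words.reduceByKey(_ + _)                          // simple version -> defaultPartitioner
val b = words.reduceByKey(_ + _, 4)                       // partition count -> new HashPartitioner(4)
val c = words.reduceByKey(new HashPartitioner(4), _ + _)  // explicit Partitioner
```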
foldByKey
Everyone knows `reduceByKey`, but this one may be less familiar. If you have used Scala's `reduceLeft`, you may know that it can be seen as a `foldLeft`: `reduceLeft` calls `foldLeft` with the leftmost element of the collection as the initial value.
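A rough sketch of that idea in plain Scala (a simplified stand-in, not the actual standard-library code):

```scala
// reduceLeft expressed as a foldLeft whose initial value is the leftmost element.
def myReduceLeft[A](xs: List[A])(op: (A, A) => A): A =
  xs.tail.foldLeft(xs.head)(op)

myReduceLeft(List(1, 2, 3, 4))(_ + _)  // 10
```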
`foldByKey` is simply `reduceByKey` plus an initial value `zeroValue`; the aggregation starts from this initial value and folds in the elements for each key.
```scala
def foldByKey(zeroValue: V)(func: (V, V) => V): RDD[(K, V)] = self.withScope {
  foldByKey(zeroValue, defaultPartitioner(self))(func)
}  // simple version
```
The other two overloads also follow pattern 2:
```scala
def foldByKey(zeroValue: V, numPartitions: Int)(func: (V, V) => V): RDD[(K, V)] = self.withScope {
  foldByKey(zeroValue, new HashPartitioner(numPartitions))(func)
}  // given a partition count, calls the version that takes a Partitioner

def foldByKey(
    zeroValue: V,
    partitioner: Partitioner)(func: (V, V) => V): RDD[(K, V)] = self.withScope {
  // Serialize the zero value to a byte array so that we can get a new clone of it on each key
  val zeroBuffer = SparkEnv.get.serializer.newInstance().serialize(zeroValue)
  val zeroArray = new Array[Byte](zeroBuffer.limit)
  zeroBuffer.get(zeroArray)

  // When deserializing, use a lazy val to create just one instance of the serializer per task
  lazy val cachedSerializer = SparkEnv.get.serializer.newInstance()
  val createZero = () => cachedSerializer.deserialize[V](ByteBuffer.wrap(zeroArray))

  val cleanedFunc = self.context.clean(func)
  combineByKeyWithClassTag[V]((v: V) => cleanedFunc(createZero(), v),
    cleanedFunc, cleanedFunc, partitioner)
}  // given a Partitioner, calls combineByKeyWithClassTag
```
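A small usage sketch (data and `sc` are assumptions for illustration):

```scala
val nums = sc.parallelize(Seq(("a", 1), ("a", 2), ("b", 3)))

// With zeroValue = 0 this behaves just like reduceByKey(_ + _); the zero value is
// folded in as the starting point of the aggregation for each key.
val sums = nums.foldByKey(0)(_ + _)  // ("a", 3), ("b", 3)
```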
aggregateByKey
This one is more involved than `foldByKey` and requires some understanding of how Spark executes. Besides the initial value `zeroValue`, there are now two merge functions: one merges values within a partition, the other merges the results across partitions.
```scala
def aggregateByKey[U: ClassTag](zeroValue: U)(seqOp: (U, V) => U,
    combOp: (U, U) => U): RDD[(K, U)] = self.withScope {
  aggregateByKey(zeroValue, defaultPartitioner(self))(seqOp, combOp)
}
```
The other two again follow pattern 2:
```scala
def aggregateByKey[U: ClassTag](zeroValue: U, numPartitions: Int)(seqOp: (U, V) => U,
    combOp: (U, U) => U): RDD[(K, U)] = self.withScope {
  aggregateByKey(zeroValue, new HashPartitioner(numPartitions))(seqOp, combOp)
}

def aggregateByKey[U: ClassTag](zeroValue: U, partitioner: Partitioner)(seqOp: (U, V) => U,
    combOp: (U, U) => U): RDD[(K, U)] = self.withScope {
  // Serialize the zero value to a byte array so that we can get a new clone of it on each key
  val zeroBuffer = SparkEnv.get.serializer.newInstance().serialize(zeroValue)
  val zeroArray = new Array[Byte](zeroBuffer.limit)
  zeroBuffer.get(zeroArray)

  lazy val cachedSerializer = SparkEnv.get.serializer.newInstance()
  val createZero = () => cachedSerializer.deserialize[U](ByteBuffer.wrap(zeroArray))

  // We will clean the combiner closure later in `combineByKey`
  val cleanedSeqOp = self.context.clean(seqOp)
  combineByKeyWithClassTag[U]((v: V) => cleanedSeqOp(createZero(), v),
    cleanedSeqOp, combOp, partitioner)
}
```
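A common usage sketch, computing a per-key average (data and `sc` are assumptions for illustration); note that `seqOp` runs inside a partition while `combOp` merges the per-partition results:

```scala
val scores = sc.parallelize(Seq(("a", 1.0), ("a", 3.0), ("b", 2.0)))

// zeroValue is a running (sum, count) pair of type (Double, Int).
val sumCount = scores.aggregateByKey((0.0, 0))(
  (acc, v) => (acc._1 + v, acc._2 + 1),     // seqOp: fold one value into the accumulator
  (a, b)   => (a._1 + b._1, a._2 + b._2))   // combOp: merge two accumulators

val avg = sumCount.mapValues { case (sum, cnt) => sum / cnt }  // ("a", 2.0), ("b", 2.0)
```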
The first three operators share one trait: the combine step never changes the type (reflected in the source as `(U, U) => U`). The operators below, however, merge the values into a collection rather than keeping the original value type.
groupByKey
The grouping function should be familiar: values are gathered into a collection per key.
```scala
def groupByKey(): RDD[(K, Iterable[V])] = self.withScope {
  groupByKey(defaultPartitioner(self))
}
```
These also follow pattern 2:
```scala
def groupByKey(numPartitions: Int): RDD[(K, Iterable[V])] = self.withScope {
  groupByKey(new HashPartitioner(numPartitions))
}

def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])] = self.withScope {
  // groupByKey shouldn't use map side combine because map side combine does not
  // reduce the amount of data shuffled and requires all map side data be inserted
  // into a hash table, leading to more objects in the old gen.
  val createCombiner = (v: V) => CompactBuffer(v)
  val mergeValue = (buf: CompactBuffer[V], v: V) => buf += v
  val mergeCombiners = (c1: CompactBuffer[V], c2: CompactBuffer[V]) => c1 ++= c2
  val bufs = combineByKeyWithClassTag[CompactBuffer[V]](
    createCombiner, mergeValue, mergeCombiners, partitioner, mapSideCombine = false)
  bufs.asInstanceOf[RDD[(K, Iterable[V])]]
}
```
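A quick usage sketch (data and `sc` are assumptions for illustration):

```scala
val pets = sc.parallelize(Seq(("cat", "tom"), ("cat", "kitty"), ("dog", "rex")))

// Each key's values are gathered into an Iterable[String]; since mapSideCombine is
// off, every value is shuffled before being grouped.
val grouped = pets.groupByKey()
// ("cat", Iterable("tom", "kitty")), ("dog", Iterable("rex"))
```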
combineByKey
Once the operators above make sense, this one is easy too. `createCombiner` plays a role similar to `zeroValue`: it creates the initial collection for a key; `mergeValue` merges a value within a partition into that collection; and `mergeCombiners` merges the per-partition results, which are already collections, together.
```scala
def combineByKey[C](
    createCombiner: V => C,
    mergeValue: (C, V) => C,
    mergeCombiners: (C, C) => C): RDD[(K, C)] = self.withScope {
  combineByKeyWithClassTag(createCombiner, mergeValue, mergeCombiners)(null)
}
```
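As a sketch, combineByKey can be used to hand-roll a grouping much like groupByKey, just with a plain List instead of CompactBuffer (data and `sc` are assumptions for illustration):

```scala
val pairs = sc.parallelize(Seq(("a", 1), ("a", 2), ("b", 3)))

val grouped = pairs.combineByKey[List[Int]](
  (v: Int) => List(v),                          // createCombiner: start a collection for a new key
  (buf: List[Int], v: Int) => v :: buf,         // mergeValue: add a value seen in the same partition
  (b1: List[Int], b2: List[Int]) => b1 ::: b2)  // mergeCombiners: merge the per-partition lists
// ("a", List(1, 2)) in some order, ("b", List(3))
```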
In fact, early versions of Spark implemented groupByKey with exactly this; the source looked like this:
```scala
class PairRDDFunctions[K, V](self: RDD[(K, V)])
    (implicit kt: ClassTag[K], vt: ClassTag[V], ord: Ordering[K] = null)
  extends Logging
  with SparkHadoopMapReduceUtil
  with Serializable {

  def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])] = {
    val createCombiner = (v: V) => CompactBuffer(v)
    val mergeValue = (buf: CompactBuffer[V], v: V) => buf += v
    val mergeCombiners = (c1: CompactBuffer[V], c2: CompactBuffer[V]) => c1 ++= c2
    val bufs = combineByKey[CompactBuffer[V]](
      createCombiner, mergeValue, mergeCombiners, partitioner, mapSideCombine = false)
    bufs.asInstanceOf[RDD[(K, Iterable[V])]]
  }
}
```
That covers the grouping and aggregation operators; one last function remains, the sampling operator sampleByKey:
```scala
def sampleByKey(withReplacement: Boolean,
    fractions: Map[K, Double],
    seed: Long = Utils.random.nextLong): RDD[(K, V)] = self.withScope {

  require(fractions.values.forall(v => v >= 0.0), "Negative sampling rates.")

  val samplingFunc = if (withReplacement) {
    StratifiedSamplingUtils.getPoissonSamplingFunction(self, fractions, false, seed)
  } else {
    StratifiedSamplingUtils.getBernoulliSamplingFunction(self, fractions, false, seed)
  }
  self.mapPartitionsWithIndex(samplingFunc, preservesPartitioning = true)
}
```
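A usage sketch of stratified sampling (the data, fractions, and `sc` are assumptions for illustration):

```scala
val data = sc.parallelize(Seq(("a", 1), ("a", 2), ("a", 3), ("b", 4)))

// fractions gives the expected sampling rate for each key.
val fractions = Map("a" -> 0.5, "b" -> 1.0)
val sampled = data.sampleByKey(withReplacement = false, fractions = fractions, seed = 42L)
```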
Besides these, KV RDDs also have a very common family of JOIN operators, which will be covered in the next post.