Overview
Table of Contents
- Preface
- Source Code
- countByKey()
- reduceByKey()
- foldByKey
- aggregateByKey
- groupByKey
- combineByKey
Preface
The previous post discussed the implementations of common operators on non-KV RDDs; this time we look at KV-RDDs. Since there are too many operators to cover at once, this post only walks through the common ByKey operators.
As before, the focus is on the call relationships among these operators. From the diagram above, a few patterns stand out:
- Pattern 1: the grouping/aggregation functions all eventually call combineByKeyWithClassTag.
- Pattern 2: most operators come in a simple version and a version that takes a number of partitions:
  - the simple version calls the version that takes a Partitioner, going through defaultPartitioner;
  - the version that takes a number of partitions calls the Partitioner version via new HashPartitioner(numPartitions);
  - the Partitioner version eventually calls combineByKeyWithClassTag.
Source Code
The source code below is taken from version 2.0.2 and may differ noticeably from 1.x.
countByKey()
It is actually implemented by accumulating with reduceByKey:
def countByKey(): Map[K, Long] = self.withScope {
  self.mapValues(_ => 1L).reduceByKey(_ + _).collect().toMap
}
mapValues maps every original V to 1L, and then the 1s are summed per key, just like in word count.
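A minimal usage sketch (the SparkContext sc and the sample data are assumptions for illustration, not from the original post):

val pairs = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3)))
// every value becomes 1L, then the 1s are summed per key and collected to the driver
pairs.countByKey()  // Map(a -> 2, b -> 1)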
reduceByKey()
If pattern 2 above was unclear, a careful look at the following functions makes it easy to understand.
def reduceByKey(func: (V, V) => V): RDD[(K, V)] = self.withScope {
  reduceByKey(defaultPartitioner(self), func)
} // simple version: fills in the partitioner via defaultPartitioner

def reduceByKey(func: (V, V) => V, numPartitions: Int): RDD[(K, V)] = self.withScope {
  reduceByKey(new HashPartitioner(numPartitions), func)
} // with a number of partitions: uses HashPartitioner

def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)] = self.withScope {
  combineByKeyWithClassTag[V]((v: V) => v, func, func, partitioner)
} // with a Partitioner: calls combineByKeyWithClassTag
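To make pattern 2 concrete, here is a small sketch of the three ways to call reduceByKey (sc and the data are hypothetical):

import org.apache.spark.HashPartitioner

val pairs = sc.parallelize(Seq(("a", 1), ("a", 2), ("b", 3)))
pairs.reduceByKey(_ + _)                          // simple version: defaultPartitioner
pairs.reduceByKey(_ + _, 4)                       // number of partitions: HashPartitioner(4) internally
pairs.reduceByKey(new HashPartitioner(4), _ + _)  // explicit Partitioner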
foldByKey
Everyone is familiar with reduceByKey, but this one may be less so. If you have used Scala's reduceLeft, you may know that it can be expressed in terms of foldLeft: reduceLeft calls foldLeft with the leftmost element of the collection as the initial value. foldByKey is simply reduceByKey plus an initial value zeroValue; aggregation of each key's values starts from that initial value.
def foldByKey(zeroValue: V)(func: (V, V) => V): RDD[(K, V)] = self.withScope {
  foldByKey(zeroValue, defaultPartitioner(self))(func)
} // simple version
The other two overloads also follow pattern 2:
def foldByKey(zeroValue: V, numPartitions: Int)(func: (V, V) => V): RDD[(K, V)] = self.withScope {
  foldByKey(zeroValue, new HashPartitioner(numPartitions))(func)
} // with a number of partitions: calls the Partitioner version
def foldByKey(
    zeroValue: V,
    partitioner: Partitioner)(func: (V, V) => V): RDD[(K, V)] = self.withScope {
  // Serialize the zero value to a byte array so that we can get a new clone of it on each key
  val zeroBuffer = SparkEnv.get.serializer.newInstance().serialize(zeroValue)
  val zeroArray = new Array[Byte](zeroBuffer.limit)
  zeroBuffer.get(zeroArray)
  // When deserializing, use a lazy val to create just one instance of the serializer per task
  lazy val cachedSerializer = SparkEnv.get.serializer.newInstance()
  val createZero = () => cachedSerializer.deserialize[V](ByteBuffer.wrap(zeroArray))
  val cleanedFunc = self.context.clean(func)
  combineByKeyWithClassTag[V]((v: V) => cleanedFunc(createZero(), v),
    cleanedFunc, cleanedFunc, partitioner)
} // with a Partitioner: calls combineByKeyWithClassTag
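A small usage sketch comparing foldByKey with reduceByKey (sc and the data are made up). Note that, as the createZero closure above shows, the zero value is applied once per key per partition, so a non-neutral zeroValue interacts with the partitioning:

val pairs = sc.parallelize(Seq(("a", 1), ("a", 2), ("b", 3)))
pairs.reduceByKey(_ + _).collect()    // e.g. Array((a,3), (b,3))
pairs.foldByKey(0)(_ + _).collect()   // same result: 0 is a neutral initial value for +
pairs.foldByKey(100)(_ + _).collect() // each key starts from 100 in every partition that contains it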
aggregateByKey
This one is even more involved than foldByKey and requires some understanding of how Spark runs jobs. Besides the initial value zeroValue, there are now two merge functions: one merges values within a partition (seqOp), and the other merges results across partitions (combOp).
def aggregateByKey[U: ClassTag](zeroValue: U)(seqOp: (U, V) => U,
    combOp: (U, U) => U): RDD[(K, U)] = self.withScope {
  aggregateByKey(zeroValue, defaultPartitioner(self))(seqOp, combOp)
}
The other two still follow pattern 2:
def aggregateByKey[U: ClassTag](zeroValue: U, numPartitions: Int)(seqOp: (U, V) => U,
    combOp: (U, U) => U): RDD[(K, U)] = self.withScope {
  aggregateByKey(zeroValue, new HashPartitioner(numPartitions))(seqOp, combOp)
}

def aggregateByKey[U: ClassTag](zeroValue: U, partitioner: Partitioner)(seqOp: (U, V) => U,
    combOp: (U, U) => U): RDD[(K, U)] = self.withScope {
  // Serialize the zero value to a byte array so that we can get a new clone of it on each key
  val zeroBuffer = SparkEnv.get.serializer.newInstance().serialize(zeroValue)
  val zeroArray = new Array[Byte](zeroBuffer.limit)
  zeroBuffer.get(zeroArray)
  lazy val cachedSerializer = SparkEnv.get.serializer.newInstance()
  val createZero = () => cachedSerializer.deserialize[U](ByteBuffer.wrap(zeroArray))
  // We will clean the combiner closure later in `combineByKey`
  val cleanedSeqOp = self.context.clean(seqOp)
  combineByKeyWithClassTag[U]((v: V) => cleanedSeqOp(createZero(), v),
    cleanedSeqOp, combOp, partitioner)
}
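A common illustration (a sketch, not from the original post): using aggregateByKey to build a per-key (sum, count) pair, from which an average can be derived. seqOp folds one value into the per-partition accumulator; combOp merges accumulators coming from different partitions:

val pairs = sc.parallelize(Seq(("a", 1), ("a", 3), ("b", 5)))
val sumCount = pairs.aggregateByKey((0, 0))(
  (acc, v) => (acc._1 + v, acc._2 + 1),     // seqOp: merge one value within a partition
  (x, y) => (x._1 + y._1, x._2 + y._2))     // combOp: merge partial results across partitions
sumCount.mapValues { case (sum, cnt) => sum.toDouble / cnt }.collect()  // e.g. Array((a,2.0), (b,5.0))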
The three operators above share one trait: the merge step does not change the type being merged (visible in the source as (U, U) => U). The operators below, by contrast, combine the values into a collection (or some other combiner type) rather than keeping the original value type.
groupByKey
The grouping function should be very familiar: the values are gathered into a collection per key.
def groupByKey(): RDD[(K, Iterable[V])] = self.withScope {
  groupByKey(defaultPartitioner(self))
}
Again, it follows pattern 2:
def groupByKey(numPartitions: Int): RDD[(K, Iterable[V])] = self.withScope {
  groupByKey(new HashPartitioner(numPartitions))
}

def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])] = self.withScope {
  // groupByKey shouldn't use map side combine because map side combine does not
  // reduce the amount of data shuffled and requires all map side data be inserted
  // into a hash table, leading to more objects in the old gen.
  val createCombiner = (v: V) => CompactBuffer(v)
  val mergeValue = (buf: CompactBuffer[V], v: V) => buf += v
  val mergeCombiners = (c1: CompactBuffer[V], c2: CompactBuffer[V]) => c1 ++= c2
  val bufs = combineByKeyWithClassTag[CompactBuffer[V]](
    createCombiner, mergeValue, mergeCombiners, partitioner, mapSideCombine = false)
  bufs.asInstanceOf[RDD[(K, Iterable[V])]]
}
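A usage sketch (sc and data assumed). Since the source sets mapSideCombine = false, every value is shuffled across the network; when only a per-key aggregate is needed, reduceByKey or aggregateByKey is usually the better choice:

val pairs = sc.parallelize(Seq(("a", 1), ("a", 2), ("b", 3)))
pairs.groupByKey().mapValues(_.toList).collect()  // e.g. Array((a,List(1, 2)), (b,List(3)))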
combineByKey
Once you understand the operators above, this one is easy to follow: createCombiner plays a role similar to zeroValue and builds the initial combiner; mergeValue merges another value within a partition into the combiner; and mergeCombiners merges the per-partition results, which at that point are already combiners.
def combineByKey[C](
    createCombiner: V => C,
    mergeValue: (C, V) => C,
    mergeCombiners: (C, C) => C): RDD[(K, C)] = self.withScope {
  combineByKeyWithClassTag(createCombiner, mergeValue, mergeCombiners)(null)
}
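A classic sketch of combineByKey (hypothetical data, assuming sc): computing a per-key average by combining into a (sum, count) pair. Note that the combiner type C = (Double, Int) differs from the value type V = Double:

val scores = sc.parallelize(Seq(("a", 1.0), ("a", 3.0), ("b", 5.0)))
val sumCount = scores.combineByKey(
  (v: Double) => (v, 1),                                               // createCombiner: first value of a key in a partition
  (acc: (Double, Int), v: Double) => (acc._1 + v, acc._2 + 1),         // mergeValue: fold another value into the combiner
  (a: (Double, Int), b: (Double, Int)) => (a._1 + b._1, a._2 + b._2))  // mergeCombiners: merge per-partition combiners
sumCount.mapValues { case (sum, cnt) => sum / cnt }.collect()          // e.g. Array((a,2.0), (b,5.0))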
In fact, earlier versions of Spark implemented groupByKey with exactly this function; the 1.x source looked like this:
class PairRDDFunctions[K, V](self: RDD[(K, V)])
    (implicit kt: ClassTag[K], vt: ClassTag[V], ord: Ordering[K] = null)
  extends Logging
  with SparkHadoopMapReduceUtil
  with Serializable {

  def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])] = {
    val createCombiner = (v: V) => CompactBuffer(v)
    val mergeValue = (buf: CompactBuffer[V], v: V) => buf += v
    val mergeCombiners = (c1: CompactBuffer[V], c2: CompactBuffer[V]) => c1 ++= c2
    val bufs = combineByKey[CompactBuffer[V]](
      createCombiner, mergeValue, mergeCombiners, partitioner, mapSideCombine = false)
    bufs.asInstanceOf[RDD[(K, Iterable[V])]]
  }
}
That covers the grouping and aggregation operators; one last sampling function remains:
def sampleByKey(withReplacement: Boolean,
    fractions: Map[K, Double],
    seed: Long = Utils.random.nextLong): RDD[(K, V)] = self.withScope {
  require(fractions.values.forall(v => v >= 0.0), "Negative sampling rates.")
  val samplingFunc = if (withReplacement) {
    StratifiedSamplingUtils.getPoissonSamplingFunction(self, fractions, false, seed)
  } else {
    StratifiedSamplingUtils.getBernoulliSamplingFunction(self, fractions, false, seed)
  }
  self.mapPartitionsWithIndex(samplingFunc, preservesPartitioning = true)
}
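A usage sketch for sampleByKey (sc, data, and fractions are made up): each key gets its own sampling fraction, and the fractions map is expected to cover the keys present in the RDD:

val pairs = sc.parallelize(Seq(("a", 1), ("a", 2), ("a", 3), ("b", 4)))
val fractions = Map("a" -> 0.5, "b" -> 1.0)  // keep roughly 50% of "a" records and all "b" records
pairs.sampleByKey(withReplacement = false, fractions, seed = 42L).collect()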
Besides these, KV-RDDs also have a family of very common JOIN operators, which will be discussed in the next post.