Overview
I. The difference between cache and persist
In an interview yesterday I was asked about the difference between cache and persist. At the time I only remembered that one of them calls the other, but I could not explain how they actually differ, so afterwards I went back to the source code and sorted it out.
Both cache and persist are used to cache an RDD so that it does not have to be recomputed when it is used again later, which can save a great deal of running time.
The difference between cache and persist
Based on the Spark 1.4.1 source code, we can see:
/** Persist this RDD with the default storage level (`MEMORY_ONLY`). */
def cache(): this.type = persist()
So cache() simply calls persist(). To see how the two differ we also need to look at the persist function:
/** Persist this RDD with the default storage level (`MEMORY_ONLY`). */
def persist(): this.type = persist(StorageLevel.MEMORY_ONLY)
persist() in turn calls persist(StorageLevel.MEMORY_ONLY). Going one level deeper:
/**
 * Set this RDD's storage level to persist its values across operations after the first time
 * it is computed. This can only be used to assign a new storage level if the RDD does not
 * have a storage level set yet.
 */
def persist(newLevel: StorageLevel): this.type = {
  // TODO: Handle changes of StorageLevel
  if (storageLevel != StorageLevel.NONE && newLevel != storageLevel) {
    throw new UnsupportedOperationException(
      "Cannot change storage level of an RDD after it was already assigned a level")
  }
  sc.persistRDD(this)
  // Register the RDD with the ContextCleaner for automatic GC-based cleanup
  sc.cleaner.foreach(_.registerRDDForCleanup(this))
  storageLevel = newLevel
  this
}
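Note the UnsupportedOperationException branch above: once a storage level has been assigned, persist() refuses to change it. A minimal sketch of what this means in practice (assuming an existing SparkContext sc; the data is hypothetical):

import org.apache.spark.storage.StorageLevel

val rdd = sc.parallelize(1 to 100).map(_ * 2)
rdd.persist(StorageLevel.MEMORY_ONLY)
rdd.count()

// rdd.persist(StorageLevel.DISK_ONLY)   // would throw UnsupportedOperationException:
//   "Cannot change storage level of an RDD after it was already assigned a level"

// The supported way to switch levels is to unpersist first, then persist again
// (unpersist resets the level to StorageLevel.NONE, see the appendix):
rdd.unpersist()
rdd.persist(StorageLevel.DISK_ONLY)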
Back to the signature: persist takes a parameter of type StorageLevel, which is the RDD's caching level.
So the difference between cache and persist comes down to this: cache always uses the single default caching level MEMORY_ONLY, while persist lets you choose any other caching level as the situation requires.
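As a quick illustration of the difference in use (a minimal sketch; the input path and the SparkContext variable sc are hypothetical):

import org.apache.spark.storage.StorageLevel

// cache() always uses the default MEMORY_ONLY level
val cached = sc.textFile("hdfs://path/to/data").cache()

// persist() lets you choose the level explicitly
val persisted = sc.textFile("hdfs://path/to/data").persist(StorageLevel.MEMORY_AND_DISK_SER)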
RDD caching levels
While we are at it, let's see which caching levels an RDD can have by looking at the source of the StorageLevel companion object:
object StorageLevel {
  val NONE = new StorageLevel(false, false, false, false)
  val DISK_ONLY = new StorageLevel(true, false, false, false)
  val DISK_ONLY_2 = new StorageLevel(true, false, false, false, 2)
  val MEMORY_ONLY = new StorageLevel(false, true, false, true)
  val MEMORY_ONLY_2 = new StorageLevel(false, true, false, true, 2)
  val MEMORY_ONLY_SER = new StorageLevel(false, true, false, false)
  val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, false, 2)
  val MEMORY_AND_DISK = new StorageLevel(true, true, false, true)
  val MEMORY_AND_DISK_2 = new StorageLevel(true, true, false, true, 2)
  val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, false)
  val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2)
  val OFF_HEAP = new StorageLevel(false, false, true, false)
  // ...
}
Twelve caching levels are listed here. What distinguishes them? Each level is defined by a StorageLevel constructor call with four or five arguments, for example:
val MEMORY_ONLY = new StorageLevel(false, true, false, true)
Looking at the constructor:
class StorageLevel private(
    private var _useDisk: Boolean,
    private var _useMemory: Boolean,
    private var _useOffHeap: Boolean,
    private var _deserialized: Boolean,
    private var _replication: Int = 1)
  extends Externalizable {
  // ...
  def useDisk: Boolean = _useDisk
  def useMemory: Boolean = _useMemory
  def useOffHeap: Boolean = _useOffHeap
  def deserialized: Boolean = _deserialized
  def replication: Int = _replication
  // ...
}
So the primary constructor of StorageLevel has five parameters:
- useDisk: store the data on disk (external storage)
- useMemory: store the data in memory
- useOffHeap: use off-heap memory. This is a JVM concept: off-heap memory means allocating objects outside the JVM heap, in memory that is managed directly by the operating system rather than by the JVM. The benefit is that the heap can be kept small, reducing the impact of garbage collection on the application.
- deserialized: keep the data in deserialized form. Serialization is a mechanism provided by Java for representing an object as a sequence of bytes; deserialization is the reverse process of restoring the object from those bytes. Serialization is one way of persisting objects: an object and its attributes can be saved and later restored by deserializing them.
- replication: the number of replicas (copies kept on multiple nodes)
Once these five parameters are understood, the 12 caching levels of StorageLevel are easy to read.
For example, val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2) means that an RDD with this caching level is stored both on disk and in memory, in serialized form, and is replicated as 2 copies across nodes (a normal RDD has only one copy).
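To make the mapping between a level and its constructor arguments concrete, the accessor methods shown earlier can be used to read the flags back (a small sketch, e.g. in spark-shell):

import org.apache.spark.storage.StorageLevel

val level = StorageLevel.MEMORY_AND_DISK_SER_2
println(s"useDisk=${level.useDisk}, useMemory=${level.useMemory}, " +
  s"useOffHeap=${level.useOffHeap}, deserialized=${level.deserialized}, " +
  s"replication=${level.replication}")
// Expected, given the constructor arguments above:
// useDisk=true, useMemory=true, useOffHeap=false, deserialized=false, replication=2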
There is also one special caching level worth noting:
val OFF_HEAP = new StorageLevel(false, false, true, false)
It uses off-heap memory. A check in the StorageLevel class shows what makes it special: it cannot be combined with any of the other options.
if (useOffHeap) {
  require(!useDisk, "Off-heap storage level does not support using disk")
  require(!useMemory, "Off-heap storage level does not support using heap memory")
  require(!deserialized, "Off-heap storage level does not support deserialized storage")
  require(replication == 1, "Off-heap storage level does not support multiple replication")
}
II. Spark RDD caching source code analysis
1. RDD caching (cache and persist)
One of Spark's most important capabilities is caching (cache) or persisting (persist) a dataset in memory across operations. When an RDD is persisted, each node keeps the partitions it has computed in memory, so that they can be reused by other actions on that dataset. This makes subsequent actions much faster (often by more than 10x) [1].
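For example (a minimal sketch with a hypothetical input path): without caching, every action below would re-read and re-filter the file; with cache(), the second action reuses the partitions kept in memory by the first.

val errors = sc.textFile("hdfs://path/to/logs").filter(_.contains("ERROR")).cache()

val total    = errors.count()              // first action: computes and populates the cache
val distinct = errors.distinct().count()   // second action: reads the cached partitions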
2. Caching levels
The relevant source code lives in the package org.apache.spark.storage; the meaning the official documentation gives for each level corresponds to the StorageLevel flags described in Part I (see also the RDD persistence section of [1]).
3. The relationship between cache and persist
The relationship between the RDD caching functions persist and cache is shown in the appendix (Source 1).
From that source we can see that cache is just persist(StorageLevel.MEMORY_ONLY); in other words, choosing cache means the data is cached in memory, which speeds up subsequent reuse of the data. Now let's look at the core function:
persist(newLevel: StorageLevel, allowOverride: Boolean), where
- newLevel: the storage level to use,
- allowOverride: whether an already assigned storage level may be overridden with the new one (used internally, for example by local checkpointing).
The storage/registration part of persist looks like this:
// If this is the first time this RDD is marked for persisting, register it
// with the SparkContext for cleanups and accounting. Do this only once.
if (storageLevel == StorageLevel.NONE) {
  sc.cleaner.foreach(_.registerRDDForCleanup(this))
  sc.persistRDD(this)
}
storageLevel = newLevel
this
The RDD is registered with the SparkContext: the first time an RDD is marked for persisting it is registered with the ContextCleaner for later cleanup and then recorded via persistRDD; this happens only once per cached RDD. The cleaner itself is defined in SparkContext as:
private[spark] def cleaner: Option[ContextCleaner] = _cleaner
private var _cleaner: Option[ContextCleaner] = None
The registerRDDForCleanup function is:
/** Register a RDD for cleanup when it is garbage collected. */
def registerRDDForCleanup(rdd: RDD[_]): Unit = {
  registerForCleanup(rdd, CleanRDD(rdd.id))
}
After that, sc.persistRDD(this) registers the RDD for persistence:
/**
 * Register an RDD to be persisted in memory and/or disk storage
 */
private[spark] def persistRDD(rdd: RDD[_]) {
  persistentRdds(rdd.id) = rdd
}
// Keeps track of all persisted RDDs
private[spark] val persistentRdds = {
  val map: ConcurrentMap[Int, RDD[_]] = new MapMaker().weakValues().makeMap[Int, RDD[_]]()
  map.asScala
}
persistentRdds keeps track of all cached RDDs; it is simply a concurrent map of the form [Int, RDD[_]].
Its type is:
new MapMaker().weakValues().makeMap[Int, RDD[_]]()
We know that when an RDD is created the system assigns it an id (see http://blog.csdn.net/legotime/article/details/51223572); the Int key in [Int, RDD[_]] is exactly that id.
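To illustrate how a weak-valued map behaves, here is a small standalone sketch (assuming Guava and the Scala/Java converters are on the classpath, as they are inside Spark): entries whose values are no longer strongly referenced elsewhere are dropped automatically after garbage collection, which is what keeps persistentRdds from leaking RDD objects.

import com.google.common.collect.MapMaker
import scala.collection.JavaConverters._

// a registry keyed by id, analogous in shape to persistentRdds
val registry = new MapMaker().weakValues().makeMap[Int, Object]().asScala

val value = new Object()
registry(1) = value
println(registry.get(1).isDefined)   // true while `value` is strongly referenced
// once `value` becomes unreachable and a GC runs, the entry disappears from the map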
4. RDD storage
Let's analyze it from the angle of reading the data: when the data is needed, how is it fetched? We know that what really sets an RDD in motion is an action, i.e. the work happens when the ResultTask of a stage is triggered. Looking at ResultTask (Source 2 in the appendix), we find that runTask calls the RDD's iterator. The iterator method of RDD is:
/**
 * Internal method to this RDD; will read from cache if applicable, or otherwise compute it.
 * This should ''not'' be called by users directly, but is available for implementors of custom
 * subclasses of RDD.
 */
final def iterator(split: Partition, context: TaskContext): Iterator[T] = {
  if (storageLevel != StorageLevel.NONE) {
    getOrCompute(split, context)
  } else {
    computeOrReadCheckpoint(split, context)
  }
}
iterator first checks whether the storage level is NONE. NONE is defined as:
val NONE = new StorageLevel(false, false, false, false)
class StorageLevel private(
    private var _useDisk: Boolean,
    private var _useMemory: Boolean,
    private var _useOffHeap: Boolean,
    private var _deserialized: Boolean,
    private var _replication: Int = 1)
  extends Externalizable { /* body omitted */ }
If the storage level is not NONE, i.e. the RDD has been cached before, iterator calls getOrCompute. In getOrCompute we build a block id from the RDD id and the partition index and, together with the storage level, ask the blockManager whether the block can be fetched directly or has to be computed:
private[spark] def getOrCompute(partition: Partition, context: TaskContext): Iterator[T] = {
  val blockId = RDDBlockId(id, partition.index)  // build the block id for this RDD partition
  var readCachedBlock = true
  // This method is called on executors, so we need call SparkEnv.get instead of sc.env.
  SparkEnv.get.blockManager.getOrElseUpdate(blockId, storageLevel, () => {
    readCachedBlock = false
    computeOrReadCheckpoint(partition, context)
  }) match {
    case Left(blockResult) =>
      if (readCachedBlock) {
        val existingMetrics = context.taskMetrics().registerInputMetrics(blockResult.readMethod)
        existingMetrics.incBytesReadInternal(blockResult.bytes)
        new InterruptibleIterator[T](context, blockResult.data.asInstanceOf[Iterator[T]]) {
          override def next(): T = {
            existingMetrics.incRecordsReadInternal(1)
            delegate.next()
          }
        }
      } else {
        new InterruptibleIterator(context, blockResult.data.asInstanceOf[Iterator[T]])
      }
    case Right(iter) =>
      new InterruptibleIterator(context, iter.asInstanceOf[Iterator[T]])
  }
}
If the block exists we simply retrieve it; if it does not, we supply the `makeIterator` function, which is:
() => {
  readCachedBlock = false
  computeOrReadCheckpoint(partition, context)
}
Its job is: if the block we want does not exist, call computeOrReadCheckpoint(partition, context). The getOrElseUpdate function itself looks like this:
def getOrElseUpdate(
    blockId: BlockId,
    level: StorageLevel,
    makeIterator: () => Iterator[Any]): Either[BlockResult, Iterator[Any]] = {
  // Initially we hold no locks on this block.
  doPut(blockId, IteratorValues(makeIterator), level, keepReadLock = true) match {
    case None =>
      // doPut() didn't hand work back to us, so the block already existed or was successfully
      // stored. Therefore, we now hold a read lock on the block.
      val blockResult = get(blockId).getOrElse {
        // Since we held a read lock between the doPut() and get() calls, the block should not
        // have been evicted, so get() not returning the block indicates some internal error.
        releaseLock(blockId)
        throw new SparkException(s"get() failed for block $blockId even though we held a lock")
      }
      Left(blockResult)
    case Some(failedPutResult) =>
      // The put failed, likely because the data was too large to fit in memory and could not be
      // dropped to disk. Therefore, we need to pass the input iterator back to the caller so
      // that they can decide what to do with the values (e.g. process them without caching).
      Right(failedPutResult.data.left.get)
  }
}
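The overall pattern of getOrElseUpdate is the classic "read-through cache": look the key up, and if it is missing, run the supplied computation and store the result. A simplified, single-threaded sketch of that pattern (not the BlockManager implementation, which additionally deals with locking, memory pressure, and spilling to disk):

import scala.collection.mutable

def getOrElseCompute[K, V](cache: mutable.Map[K, V], key: K)(compute: => V): V =
  cache.get(key) match {
    case Some(v) => v           // the "block" already exists: serve it from the cache
    case None =>
      val v = compute           // otherwise run the (possibly expensive) computation
      cache.put(key, v)         // and store it so later callers can reuse it
      v
  }

val blocks = mutable.Map.empty[String, Seq[Int]]
getOrElseCompute(blocks, "rdd_0_0") { (1 to 5).map(_ * 2) }       // computed and cached
getOrElseCompute(blocks, "rdd_0_0") { sys.error("not reached") }  // served from the cache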
doPut is part of the storage-management code and is quite large, so it will be analyzed together with the storage layer. For now, its return value already tells us what we need:
/**
 * @return `Some(PutResult)` if the block did not exist and could not be successfully cached,
 *         or None if the block already existed or was successfully stored (fully consuming
 *         the input data / input iterator).
 */
Its signature is:
private def doPut(
    blockId: BlockId,
    data: BlockValues,
    level: StorageLevel,
    tellMaster: Boolean = true,
    effectiveStorageLevel: Option[StorageLevel] = None,
    keepReadLock: Boolean = false): Option[PutResult] = { /* body omitted */ }
/**
 * Compute an RDD partition or read it from a checkpoint if the RDD is checkpointing.
 */
private[spark] def computeOrReadCheckpoint(split: Partition, context: TaskContext): Iterator[T] = {
  if (isCheckpointedAndMaterialized) {
    firstParent[T].iterator(split, context)  // read from the checkpointed parent
  } else {
    compute(split, context)  // recompute the partition
  }
}
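To see both code paths from user code (a minimal sketch; the checkpoint directory is hypothetical): when an RDD is both cached and checkpointed, iterator() prefers the cached block via getOrCompute, and computeOrReadCheckpoint() falls back to the materialized checkpoint instead of recomputing.

sc.setCheckpointDir("hdfs://path/to/checkpoints")

val rdd = sc.parallelize(1 to 1000000).map(_ * 2)
rdd.cache()        // iterator() will now go through getOrCompute / BlockManager
rdd.checkpoint()   // marks the RDD for checkpointing (materialized by the next action)

rdd.count()        // computes, caches the partitions, and writes the checkpoint
rdd.count()        // served from the cache; if blocks were evicted, read from the checkpoint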
To sum up: when the data we need is not found in the cache, Spark first checks whether a checkpoint exists and, failing that, recomputes the partition. The overall lookup flow is: check the storage level -> ask the BlockManager for the cached block -> read the checkpoint -> recompute.
Appendix
Source 1
/**
 * Mark this RDD for persisting using the specified level.
 *
 * @param newLevel the target storage level
 * @param allowOverride whether to override any existing level with the new one
 */
private def persist(newLevel: StorageLevel, allowOverride: Boolean): this.type = {
  // TODO: Handle changes of StorageLevel
  if (storageLevel != StorageLevel.NONE && newLevel != storageLevel && !allowOverride) {
    throw new UnsupportedOperationException(
      "Cannot change storage level of an RDD after it was already assigned a level")
  }
  // If this is the first time this RDD is marked for persisting, register it
  // with the SparkContext for cleanups and accounting. Do this only once.
  if (storageLevel == StorageLevel.NONE) {
    sc.cleaner.foreach(_.registerRDDForCleanup(this))
    sc.persistRDD(this)
  }
  storageLevel = newLevel
  this
}

/**
 * Set this RDD's storage level to persist its values across operations after the first time
 * it is computed. This can only be used to assign a new storage level if the RDD does not
 * have a storage level set yet. Local checkpointing is an exception.
 */
def persist(newLevel: StorageLevel): this.type = {
  if (isLocallyCheckpointed) {
    // This means the user previously called localCheckpoint(), which should have already
    // marked this RDD for persisting. Here we should override the old storage level with
    // one that is explicitly requested by the user (after adapting it to use disk).
    persist(LocalRDDCheckpointData.transformStorageLevel(newLevel), allowOverride = true)
  } else {
    persist(newLevel, allowOverride = false)
  }
}

/** Persist this RDD with the default storage level (`MEMORY_ONLY`). */
def persist(): this.type = persist(StorageLevel.MEMORY_ONLY)

/** Persist this RDD with the default storage level (`MEMORY_ONLY`). */
def cache(): this.type = persist()

/**
 * Mark the RDD as non-persistent, and remove all blocks for it from memory and disk.
 *
 * @param blocking Whether to block until all blocks are deleted.
 * @return This RDD.
 */
def unpersist(blocking: Boolean = true): this.type = {
  logInfo("Removing RDD " + id + " from persistence list")
  sc.unpersistRDD(id, blocking)
  storageLevel = StorageLevel.NONE
  this
}

/** Get the RDD's current storage level, or StorageLevel.NONE if none is set. */
def getStorageLevel: StorageLevel = storageLevel
Source 2
private[spark] class ResultTask[T, U](
    stageId: Int,
    stageAttemptId: Int,
    taskBinary: Broadcast[Array[Byte]],
    partition: Partition,
    locs: Seq[TaskLocation],
    val outputId: Int,
    _initialAccums: Seq[Accumulator[_]] = InternalAccumulator.createAll())
  extends Task[U](stageId, stageAttemptId, partition.index, _initialAccums)
  with Serializable {

  @transient private[this] val preferredLocs: Seq[TaskLocation] = {
    if (locs == null) Nil else locs.toSet.toSeq
  }

  override def runTask(context: TaskContext): U = {
    // Deserialize the RDD and the func using the broadcast variables.
    val deserializeStartTime = System.currentTimeMillis()
    val ser = SparkEnv.get.closureSerializer.newInstance()
    val (rdd, func) = ser.deserialize[(RDD[T], (TaskContext, Iterator[T]) => U)](
      ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)
    _executorDeserializeTime = System.currentTimeMillis() - deserializeStartTime

    metrics = Some(context.taskMetrics)
    func(context, rdd.iterator(partition, context))
  }

  // This is only callable on the driver side.
  override def preferredLocations: Seq[TaskLocation] = preferredLocs

  override def toString: String = "ResultTask(" + stageId + ", " + partitionId + ")"
}
References
[1] Spark Programming Guide, RDD Persistence: http://spark.apache.org/docs/latest/programming-guide.html#rdd-persistence