The difference between cache and persist in Spark, and RDD cache source code analysis

Overview

I. The difference between cache and persist

Yesterday I was asked in an interview about the difference between cache and persist. All I could remember was that one of them calls the other, but I could not explain how they actually differ, so afterwards I went back to the source code and finally sorted it out.

Both cache and persist are used to cache an RDD so that it does not have to be recomputed when it is used again, which can save a great deal of running time.

How cache and persist differ

Looking at the Spark 1.4.1 source, we can see:

/** Persist this RDD with the default storage level (`MEMORY_ONLY`). */
def cache(): this.type = persist()

So cache() just calls persist(). To see how the two differ, we need to look at the persist function:

/** Persist this RDD with the default storage level (`MEMORY_ONLY`). */
def persist(): this.type = persist(StorageLevel.MEMORY_ONLY)

persist() in turn calls persist(StorageLevel.MEMORY_ONLY). Going one level deeper:

/**
 * Set this RDD's storage level to persist its values across operations after the first time
 * it is computed. This can only be used to assign a new storage level if the RDD does not
 * have a storage level set yet.
 */
def persist(newLevel: StorageLevel): this.type = {
  // TODO: Handle changes of StorageLevel
  if (storageLevel != StorageLevel.NONE && newLevel != storageLevel) {
    throw new UnsupportedOperationException(
      "Cannot change storage level of an RDD after it was already assigned a level")
  }
  sc.persistRDD(this)
  // Register the RDD with the ContextCleaner for automatic GC-based cleanup
  sc.cleaner.foreach(_.registerRDDForCleanup(this))
  storageLevel = newLevel
  this
}

Notice that persist takes a parameter of type StorageLevel, which is the RDD's caching level.

This gives us the difference between cache and persist: cache is fixed to the default caching level, MEMORY_ONLY, while persist lets you set any other caching level as the situation requires.
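To make this concrete, here is a minimal sketch of both calls (local mode; the input path data.txt is a placeholder):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.storage.StorageLevel

object CacheVsPersist {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setAppName("cache-vs-persist").setMaster("local[*]"))

    val words = sc.textFile("data.txt").flatMap(_.split(" "))  // placeholder input path

    // cache(): always MEMORY_ONLY, no choice of level
    words.cache()

    // persist(): any level, e.g. spill to disk when memory runs out
    val counts = words.map((_, 1)).reduceByKey(_ + _)
    counts.persist(StorageLevel.MEMORY_AND_DISK)

    println(counts.count())  // the first action computes and caches
    println(counts.count())  // later actions reuse the cached blocks

    sc.stop()
  }
}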

RDD caching levels

While we're at it, let's see which caching levels an RDD offers, in the source of the StorageLevel companion object:

object StorageLevel {
  val NONE = new StorageLevel(false, false, false, false)
  val DISK_ONLY = new StorageLevel(true, false, false, false)
  val DISK_ONLY_2 = new StorageLevel(true, false, false, false, 2)
  val MEMORY_ONLY = new StorageLevel(false, true, false, true)
  val MEMORY_ONLY_2 = new StorageLevel(false, true, false, true, 2)
  val MEMORY_ONLY_SER = new StorageLevel(false, true, false, false)
  val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, false, 2)
  val MEMORY_AND_DISK = new StorageLevel(true, true, false, true)
  val MEMORY_AND_DISK_2 = new StorageLevel(true, true, false, true, 2)
  val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, false)
  val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2)
  val OFF_HEAP = new StorageLevel(false, false, true, false)
  ......
}

Twelve caching levels are listed here, but how do they differ? Each level is defined by a StorageLevel constructor call with four or five arguments, for example:

val MEMORY_ONLY = new StorageLevel(false, true, false, true)

Its constructor:

class StorageLevel private(
    private var _useDisk: Boolean,
    private var _useMemory: Boolean,
    private var _useOffHeap: Boolean,
    private var _deserialized: Boolean,
    private var _replication: Int = 1)
  extends Externalizable {
  ......
  def useDisk: Boolean = _useDisk
  def useMemory: Boolean = _useMemory
  def useOffHeap: Boolean = _useOffHeap
  def deserialized: Boolean = _deserialized
  def replication: Int = _replication
  ......
}

The primary constructor of StorageLevel takes five parameters:

  • useDisk: store the data on disk (external storage)
  • useMemory: store the data in memory
  • useOffHeap: use off-heap memory. This is a JVM concept: off-heap memory is allocated outside the JVM heap and is managed directly by the operating system rather than by the JVM. The benefit is a smaller heap, which reduces the impact of garbage collection on the application.
  • deserialized: keep values in deserialized form. Serialization is a Java mechanism for representing an object as a sequence of bytes; deserialization is the reverse process of restoring the object from those bytes. Serialization is one way to persist an object: the object and its fields can be saved and later restored by deserializing.
  • replication: the number of replicas (copies kept on multiple nodes)

With these five parameters understood, the twelve StorageLevel caching levels are straightforward to read.

For example, val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2) means an RDD cached at this level is stored both on disk and in memory, kept in serialized form, and replicated on 2 nodes (a normal RDD keeps a single copy).
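As a quick sanity check, these flags can be read back through the public accessors listed earlier:

import org.apache.spark.storage.StorageLevel

val level = StorageLevel.MEMORY_AND_DISK_SER_2
println(level.useDisk)       // true
println(level.useMemory)     // true
println(level.useOffHeap)    // false
println(level.deserialized)  // false (i.e. stored serialized)
println(level.replication)   // 2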

There is also one special caching level:

val OFF_HEAP = new StorageLevel(false, false, true, false)

It uses off-heap memory, and a snippet in the StorageLevel source shows why it is special: it cannot be combined with any of the other options.

if (useOffHeap) {
  require(!useDisk, "Off-heap storage level does not support using disk")
  require(!useMemory, "Off-heap storage level does not support using heap memory")
  require(!deserialized, "Off-heap storage level does not support deserialized storage")
  require(replication == 1, "Off-heap storage level does not support multiple replication")
}

II. Spark RDD cache source code analysis

1. RDD caching (cache and persist)

One of Spark's most important capabilities is caching (or persisting) a dataset in memory across operations. When an RDD is persisted, each node keeps its computed partitions in memory and reuses them in other actions on that dataset. This makes subsequent actions much faster (often more than 10x) [1].

2. Caching levels

The source lives in the org.apache.spark.storage package. (The original post reproduced the official table describing each level here; see the RDD persistence table in the Spark programming guide [1].)


3. The relationship between cache and persist

The relationship between the RDD caching functions persist and cache is as follows (see Appendix, Source 1):

The source shows that cache is just persist(StorageLevel.MEMORY_ONLY): choosing cache means the data is cached in memory, which speeds up later reuse. Now let's look at the core function:

persist(newLevel: StorageLevel, allowOverride: Boolean)

  • StorageLevel: the storage level to assign
  • allowOverride: whether the new level may override a level that was already set on this RDD (true only on the internal path used after localCheckpoint())

Inside persist, the storage bookkeeping part looks like this:

// If this is the first time this RDD is marked for persisting, register it
// with the SparkContext for cleanups and accounting. Do this only once.
if (storageLevel == StorageLevel.NONE) {
  sc.cleaner.foreach(_.registerRDDForCleanup(this))
  sc.persistRDD(this)
}
storageLevel = newLevel
this
The RDD is registered through the SparkContext: on the first call, the RDD is registered with the ContextCleaner for later cleanup and then recorded; this happens only once per cached RDD.

private[spark] def cleaner: Option[ContextCleaner] = _cleaner

private var _cleaner: Option[ContextCleaner] = None

The registerRDDForCleanup function:

/** Register a RDD for cleanup when it is garbage collected. */
def registerRDDForCleanup(rdd: RDD[_]): Unit = {
  registerForCleanup(rdd, CleanRDD(rdd.id))
}
After that, sc.persistRDD(this) records the RDD:

/**
 * Register an RDD to be persisted in memory and/or disk storage
 */
private[spark] def persistRDD(rdd: RDD[_]) {
  persistentRdds(rdd.id) = rdd
}

// Keeps track of all persisted RDDs
private[spark] val persistentRdds = {
  val map: ConcurrentMap[Int, RDD[_]] = new MapMaker().weakValues().makeMap[Int, RDD[_]]()
  map.asScala
}
All cached RDDs are tracked in persistentRdds, which is simply a concurrent hash map of the form [Int, RDD[_]].

Its concrete type is:

new MapMaker().weakValues().makeMap[Int, RDD[_]]()
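The weakValues() choice matters: the registry alone does not keep an RDD alive. A tiny standalone sketch of the same Guava idiom (not Spark code):

import com.google.common.collect.MapMaker

// Values are held through weak references: once no strong reference to a
// value remains, GC may collect it and the entry vanishes from the map.
val registry = new MapMaker().weakValues().makeMap[Int, Object]()
var value: Object = new Object
registry.put(1, value)
println(registry.containsKey(1))  // true
value = null                      // drop the only strong reference
System.gc()                       // only a hint; removal is not deterministic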

As we know, when an RDD is created the system assigns it an id (see http://blog.csdn.net/legotime/article/details/51223572), and the Int key in [Int, RDD[_]] is exactly that id.
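This mapping is visible from the driver through sc.getPersistentRDDs, which exposes the same id-keyed view (a minimal sketch, again assuming an existing SparkContext sc):

val rdd = sc.parallelize(1 to 10).cache()      // registered at persist time
println(rdd.id)                                // the Int key in persistentRdds
println(sc.getPersistentRDDs.contains(rdd.id)) // true, even before any action
rdd.count()                                    // the action materializes the blocks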


4. RDD storage

Let's analyze this from the angle of reading the data: when the data is needed, how is it fetched? The real work on an RDD is triggered by an action; in other words, storage comes into play only when a stage's ResultTask runs. Looking at ResultTask (see Appendix, Source 2), its runTask calls the RDD's iterator, which looks like this:

/**
 * Internal method to this RDD; will read from cache if applicable, or otherwise compute it.
 * This should ''not'' be called by users directly, but is available for implementors of custom
 * subclasses of RDD.
 */
final def iterator(split: Partition, context: TaskContext): Iterator[T] = {
  if (storageLevel != StorageLevel.NONE) {
    getOrCompute(split, context)
  } else {
    computeOrReadCheckpoint(split, context)
  }
}
iterator first checks whether the storage level is NONE. NONE is defined as:

val NONE = new StorageLevel(false, false, false, false)

class StorageLevel private(
    private var _useDisk: Boolean,
    private var _useMemory: Boolean,
    private var _useOffHeap: Boolean,
    private var _deserialized: Boolean,
    private var _replication: Int = 1)
  extends Externalizable { /* class body omitted */ }
If the storage level is not NONE, i.e. this RDD was cached earlier, iterator calls getOrCompute. In getOrCompute we take the block id and the storage level and ask the blockManager whether the block can be fetched directly or has to be computed:

private[spark] def getOrCompute(partition: Partition, context: TaskContext): Iterator[T] = {
  val blockId = RDDBlockId(id, partition.index)  // derive this partition's block id
  var readCachedBlock = true
  // This method is called on executors, so we need call SparkEnv.get instead of sc.env.
  SparkEnv.get.blockManager.getOrElseUpdate(blockId, storageLevel, () => {
    readCachedBlock = false
    computeOrReadCheckpoint(partition, context)
  }) match {
    case Left(blockResult) =>
      if (readCachedBlock) {
        val existingMetrics = context.taskMetrics().registerInputMetrics(blockResult.readMethod)
        existingMetrics.incBytesReadInternal(blockResult.bytes)
        new InterruptibleIterator[T](context, blockResult.data.asInstanceOf[Iterator[T]]) {
          override def next(): T = {
            existingMetrics.incRecordsReadInternal(1)
            delegate.next()
          }
        }
      } else {
        new InterruptibleIterator(context, blockResult.data.asInstanceOf[Iterator[T]])
      }
    case Right(iter) =>
      new InterruptibleIterator(context, iter.asInstanceOf[Iterator[T]])
  }
}

If the block exists, we simply retrieve it; if it does not, the `makeIterator` closure we supplied takes over. That closure is:

() => {
  readCachedBlock = false
  computeOrReadCheckpoint(partition, context)
}

Its job: when the block we want does not exist, compute it via computeOrReadCheckpoint(partition, context). Here is getOrElseUpdate in full:

def getOrElseUpdate(
    blockId: BlockId,
    level: StorageLevel,
    makeIterator: () => Iterator[Any]): Either[BlockResult, Iterator[Any]] = {
  // Initially we hold no locks on this block.
  doPut(blockId, IteratorValues(makeIterator), level, keepReadLock = true) match {
    case None =>
      // doPut() didn't hand work back to us, so the block already existed or was successfully
      // stored. Therefore, we now hold a read lock on the block.
      val blockResult = get(blockId).getOrElse {
        // Since we held a read lock between the doPut() and get() calls, the block should not
        // have been evicted, so get() not returning the block indicates some internal error.
        releaseLock(blockId)
        throw new SparkException(s"get() failed for block $blockId even though we held a lock")
      }
      Left(blockResult)
    case Some(failedPutResult) =>
      // The put failed, likely because the data was too large to fit in memory and could not be
      // dropped to disk. Therefore, we need to pass the input iterator back to the caller so
      // that they can decide what to do with the values (e.g. process them without caching).
      Right(failedPutResult.data.left.get)
  }
}


doPut is the storage-management workhorse and is a large body of code, so I plan to cover it together with the storage layer; for now, its return value tells us what we need. doPut's return is documented as:

* @return `Some(PutResult)` if the block did not exist and could not be successfully cached,
*         or None if the block already existed or was successfully stored (fully consuming
*         the input data / input iterator).

The function signature:

private def doPut(
    blockId: BlockId,
    data: BlockValues,
    level: StorageLevel,
    tellMaster: Boolean = true,
    effectiveStorageLevel: Option[StorageLevel] = None,
    keepReadLock: Boolean = false): Option[PutResult] = { /* body omitted */ }




/**
 * Compute an RDD partition or read it from a checkpoint if the RDD is checkpointing.
 */
private[spark] def computeOrReadCheckpoint(split: Partition, context: TaskContext): Iterator[T] = {
  if (isCheckpointedAndMaterialized) {
    firstParent[T].iterator(split, context)
  } else {
    compute(split, context)  // recompute from the lineage
  }
}

To sum up: if the data we need is not found in the cache, Spark first checks whether a checkpoint has been materialized, and only then recomputes the partition from its lineage. The data-lookup flow is sketched below.
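(The original post showed a flow chart of the lookup here.) As a stand-in, the decision order can be written as a simplified, hypothetical sketch; fetchPartition is not a real Spark API, it just mirrors the order implemented by iterator, getOrCompute and computeOrReadCheckpoint above:

// Hypothetical helper, for illustration only.
def fetchPartition[T](cachedBlock: Option[Iterator[T]],
                      checkpoint: Option[Iterator[T]],
                      recompute: () => Iterator[T]): Iterator[T] = {
  cachedBlock.getOrElse {    // 1. try the block manager's cached copy
    checkpoint.getOrElse {   // 2. otherwise read a materialized checkpoint
      recompute()            // 3. otherwise recompute from the lineage
    }
  }
}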




Appendix

Source 1

/**
 * Mark this RDD for persisting using the specified level.
 *
 * @param newLevel the target storage level
 * @param allowOverride whether to override any existing level with the new one
 */
private def persist(newLevel: StorageLevel, allowOverride: Boolean): this.type = {
  // TODO: Handle changes of StorageLevel
  if (storageLevel != StorageLevel.NONE && newLevel != storageLevel && !allowOverride) {
    throw new UnsupportedOperationException(
      "Cannot change storage level of an RDD after it was already assigned a level")
  }
  // If this is the first time this RDD is marked for persisting, register it
  // with the SparkContext for cleanups and accounting. Do this only once.
  if (storageLevel == StorageLevel.NONE) {
    sc.cleaner.foreach(_.registerRDDForCleanup(this))
    sc.persistRDD(this)
  }
  storageLevel = newLevel
  this
}

/**
 * Set this RDD's storage level to persist its values across operations after the first time
 * it is computed. This can only be used to assign a new storage level if the RDD does not
 * have a storage level set yet. Local checkpointing is an exception.
 */
def persist(newLevel: StorageLevel): this.type = {
  if (isLocallyCheckpointed) {
    // This means the user previously called localCheckpoint(), which should have already
    // marked this RDD for persisting. Here we should override the old storage level with
    // one that is explicitly requested by the user (after adapting it to use disk).
    persist(LocalRDDCheckpointData.transformStorageLevel(newLevel), allowOverride = true)
  } else {
    persist(newLevel, allowOverride = false)
  }
}

/** Persist this RDD with the default storage level (`MEMORY_ONLY`). */
def persist(): this.type = persist(StorageLevel.MEMORY_ONLY)

/** Persist this RDD with the default storage level (`MEMORY_ONLY`). */
def cache(): this.type = persist()

/**
 * Mark the RDD as non-persistent, and remove all blocks for it from memory and disk.
 *
 * @param blocking Whether to block until all blocks are deleted.
 * @return This RDD.
 */
def unpersist(blocking: Boolean = true): this.type = {
  logInfo("Removing RDD " + id + " from persistence list")
  sc.unpersistRDD(id, blocking)
  storageLevel = StorageLevel.NONE
  this
}

/** Get the RDD's current storage level, or StorageLevel.NONE if none is set. */
def getStorageLevel: StorageLevel = storageLevel

Source 2

private[spark] class ResultTask[T, U](
    stageId: Int,
    stageAttemptId: Int,
    taskBinary: Broadcast[Array[Byte]],
    partition: Partition,
    locs: Seq[TaskLocation],
    val outputId: Int,
    _initialAccums: Seq[Accumulator[_]] = InternalAccumulator.createAll())
  extends Task[U](stageId, stageAttemptId, partition.index, _initialAccums)
  with Serializable {

  @transient private[this] val preferredLocs: Seq[TaskLocation] = {
    if (locs == null) Nil else locs.toSet.toSeq
  }

  override def runTask(context: TaskContext): U = {
    // Deserialize the RDD and the func using the broadcast variables.
    val deserializeStartTime = System.currentTimeMillis()
    val ser = SparkEnv.get.closureSerializer.newInstance()
    val (rdd, func) = ser.deserialize[(RDD[T], (TaskContext, Iterator[T]) => U)](
      ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)
    _executorDeserializeTime = System.currentTimeMillis() - deserializeStartTime
    metrics = Some(context.taskMetrics)
    func(context, rdd.iterator(partition, context))
  }

  // This is only callable on the driver side.
  override def preferredLocations: Seq[TaskLocation] = preferredLocs

  override def toString: String = "ResultTask(" + stageId + ", " + partitionId + ")"
}




References:

[1] http://spark.apache.org/docs/latest/programming-guide.html#rdd-persistence

