概述
Spark RDD是惰性求值的,而有时由于业务需要,我们要复用一个RDD。对于这种情况,如果我们只是简单地对RDD调用行动操作,Spark 将会每次都重算RDD 以及它的所有依赖。这在迭代算法中消耗格外大,因为迭代算法常常会多次使用同一组数据。
例如:Scala 中的两次执行
val result = input.map(x => x*x)
println(result.count())
println(result.collect().mkString(","))
为了避免多次计算同一个RDD造成额外的开销,可以使用RDD.cache()函数对数据进行持久化存储。当我们让 Spark 持久化存储一个 RDD 时,计算出 RDD 的节点会分别保存它们所求出的分区数据。如果一个有持久化数据的节点发生故障,Spark 会在需要用到缓存的数据时重算丢失的数据分区。如果希望节点故障的情况不会拖累我们的执行速度,也可以把数据备份到多个节点上。
根据业务的不同需求,我们可以在persist()函数中为 RDD 选择不同的持久化级别(如下面函数所示)。上面的cache()函数等同于persist()函数存储时的MEMORY_ONLY持久化级别。
在Scala和 会把数据以序列化的形式缓存在 JVM 的堆空间中。而在 Python 中,会始终序列化要持久化存储的数据,所以持久化级别默认值就是以序列化后的对象存储在 JVM 堆空间中。当我们把数据写到磁盘或者堆外存储上时,也总是使用序列化后的数据。
如果有需要,我们也可以通过在存储级别的末尾加上“_2”来把持久化数据存为两份
默认不进行持久化可能也显得有些奇怪,不过这对于大规模数据集是很有意义的:如果不会重用该 RDD,我们就没有必要浪费存储空间,Spark 可以直接遍历一遍数据然后计算出结果。
RDD缓存策略
class StorageLevel private(
private var _useDisk :
Boolean,private var _useMemory :
Boolean,private var _useOffHeap:
Boolean,private var _deserialized:
Boolean,private var _replication : Int =1
)
object StorageLevel {
val NONE = new StorageLevel(false,false, false, false)
val DISK_ONLY = new StorageLevel(true,false, false, false)
val DISK_ONLY_2 = new StorageLevel(true,false, false, false, 2)
val MEMORY_ONLY = newStorageLevel(false, true, false, true) //系统默认
val MEMORY_ONLY_2 = newStorageLevel(false, true, false, true, 2)
val MEMORY_ONLY_SER = newStorageLevel(false, true, false, false)
val MEMORY_ONLY_SER_2 = newStorageLevel(false, true, false, false, 2)
val MEMORY_AND_DISK = newStorageLevel(true, true, false, true)
val MEMORY_AND_DISK_2 = newStorageLevel(true, true, false, true, 2)
val MEMORY_AND_DISK_SER = newStorageLevel(true, true, false, false)
val MEMORY_AND_DISK_SER_2 = newStorageLevel(true, true, false, false, 2)
val OFF_HEAP = new StorageLevel(false,false, true, false
)
最后
以上就是甜蜜烧鹅为你收集整理的spark的持久化存储的全部内容,希望文章能够帮你解决spark的持久化存储所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
发表评论 取消回复