我是靠谱客的博主 怕孤单花生,最近开发中收集的这篇文章主要介绍spark之缓存,觉得挺不错的,现在分享给大家,希望可以做个参考。

概述

1、cache()和persist()

cache()和persist()注意问题:
1.cache()和persist()持久化单位是partition,cache()和persist()是懒执行算子,需要action算子触发执行。
2.对一个RDD使用cache或者persist之后可以赋值给一个变量,下次直接使用这个变量就是使用的持久化的数据。也可以直接对RDD进行cache或者persist不赋值给一个变量
3.如果采用第二种方式赋值给变量的话,后面不能紧跟action算子。
4.cache()和persist()的数据在当前application执行完成之后会自动清除。

def main(args: Array[String]): Unit = {
val conf = new SparkConf()
conf.setAppName("cacheAndPersist")
conf.setMaster("local")
val sc = new SparkContext(conf)
var lines = sc.textFile("./data/persistData.txt")
//
lines.cache()
lines = lines.persist(StorageLevel.MEMORY_ONLY)
val startTime1 = System.currentTimeMillis()
val count1 = lines.count()
val endTime1 = System.currentTimeMillis()
println("count1 = "+count1+",time = "+(endTime1-startTime1)+"ms")
val startTime2 = System.currentTimeMillis()
val count2 = lines.count()
val endTime2 = System.currentTimeMillis()
println("count2 = "+count2+",time = "+(endTime2-startTime2)+"ms")
sc.stop()
}

2、checkpoint

当RDD的lineage比较长,计算较为复杂时,可以使用checkpint对RDD进行持久化,checkpoint将数据直接持久化到磁盘中
checkpoint执行流程:
1.当spark job 执行完之后会从后往前回溯,对进行checkpoint RDD进行标记
2.回溯完成之后,Spark框架会启动一个job重新计算checkpointRDD的数据
3.计算完成之后,将计算的结果直接持久化到指定的checkpoint目录中,切断RDD之间的依赖关系。
优化:对RDD进行checkpoint之前先对RDD进行cache()下,这样第三步就不用重新从头计算当前checkpointRDD的数据

def main(args: Array[String]): Unit = {
val conf =new SparkConf()
conf.setAppName("test")
conf.setMaster("local")
val sc = new SparkContext(conf)
val lines = sc.textFile("./data/words")
val lines2 = sc.textFile("./data/words")
sc.setCheckpointDir("./data/checkpoint")
lines.checkpoint()
lines2.checkpoint()
val count = lines.count()
val count2 = lines2.count()
println(lines2.getCheckpointFile)
sc.stop()
}

最后

以上就是怕孤单花生为你收集整理的spark之缓存的全部内容,希望文章能够帮你解决spark之缓存所遇到的程序开发问题。

如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(30)

评论列表共有 0 条评论

立即
投稿
返回
顶部