我是靠谱客的博主 羞涩星星,最近开发中收集的这篇文章主要介绍Spark 调优之RDD持久化级别及kryo序列化性能测试MEMORY_ONLYMEMORY_ONLY_SER 未使用kryo序列化MEMORY_ONLY_SER 使用kryo序列化未注册MEMORY_ONLY_SER 使用kryo序列化并注册注册kryo序列化并开启RDD压缩对比表格,觉得挺不错的,现在分享给大家,希望可以做个参考。

概述

我们上篇文章中讲了,RDD的持久化是spark优化中必须掌握的,并且,在内存不足的情况下,我们可以将持久化类型选择为MEMORY_ONLY_SER,减少内存的占用,持久化更多的partition,并且不同的序列化方法也会影响序列化性能。
下面,我们就来测试下,持久化级别和序列化方法的选择对RDD持久化大小的影响。
我选择了一个170.9MB的日志文件,传到了百度网盘
提取码:ffae
测试环境是windows,
IDEA参数配置
spark 调优之rdd持久化级别及kryo序列化性能测试
spark 调优之rdd持久化级别及kryo序列化性能测试

MEMORY_ONLY

代码为

case class CleanedLog(cdn:String,region:String,level:String,date:String,ip:String, domain:String, url:String, traffic:String)
object KyroTest {
def main(args: Array[String]) {
val inputPath=new Path(args(0))
val outputPath=new Path(args(1))
val fsConf=new Configuration()
val fs= FileSystem.get(fsConf)
if (fs.exists(outputPath)) {
fs.delete(outputPath,true)
val path=args(1).toString
println(s"已删除已存在的路径$path")
}
val conf = new SparkConf().setMaster("local[2]").setAppName("KyroTest")
//conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
//conf.set("spark.kryo.registrationRequired", "true")
val sc = new SparkContext(conf)
val logs = sc.textFile(args(0))
//logs.filter(_.split("t").length==8).take(10).foreach(println(_))
val logsCache=logsCahe(logs)
//序列化的方式将rdd存到内存
saveAtLocal(logsCache,args(1))
Thread.sleep(100000)
}
def logsCahe(logs:RDD[String]): RDD[CleanedLog] ={
logs.filter(_.split("t").length==8).map(x=>{
val fields=x.split("t")
CleanedLog(fields(0),fields(1),fields(2),fields(3),fields(4),fields(5),fields(6),fields(7))
}).cache()
}
def saveAtLocal(logsCache: RDD[CleanedLog], outputPath: String) = {
logsCache.map(x=>{
x.cdn+"t"+x.region+"t"+x.level+"t"+x.date+"t"+x.ip+"t"+x.domain+"t"+x.url+"t"+x.traffic
}).repartition(1).saveAsTextFile(outputPath)
}
}

代码逻辑就是输入是什么内容,输就是什么内容,在中间我将输入的文本RDD进行了memory_only持久化,我们就看这个持久化内存占多少
spark 调优之rdd持久化级别及kryo序列化性能测试

spark 调优之rdd持久化级别及kryo序列化性能测试
显然,input size大小是170.9 MB,但是持久化之后是908.5 MB,显然占据内存空间增大了好几倍,如果在生产上,内存资源不足的情况下,这种方式显然缓存不了不少partition
时间耗费14s

MEMORY_ONLY_SER 未使用kryo序列化

def logsCahe(logs:RDD[String]): RDD[CleanedLog] ={
logs.filter(_.split("t").length==8).map(x=>{
val fields=x.split("t")
CleanedLog(fields(0),fields(1),fields(2),fields(3),fields(4),fields(5),fields(6),fields(7))
}).persist(StorageLevel.MEMORY_ONLY_SER)

代码仅更改了persist(StorageLevel.MEMORY_ONLY_SER)
spark 调优之rdd持久化级别及kryo序列化性能测试
spark 调优之rdd持久化级别及kryo序列化性能测试
显然,input size大小是170.9 MB,但是持久化之后是有204.9MB,所以序列化对于节约内存空间是很有帮助的。

时间耗费11s

MEMORY_ONLY_SER 使用kryo序列化未注册

 val conf = new SparkConf().setMaster("local[2]").setAppName("KyroTest")
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")

与上一代码相比,为SparkConf设置了开启kryo序列化,不是默认的java序列化了,但是没有进行具体的类注册!
spark 调优之rdd持久化级别及kryo序列化性能测试
spark 调优之rdd持久化级别及kryo序列化性能测试
显然,input size大小是170.9 MB,但是持久化之后是有230.8MB,使用未注册的kryo序列化竟然比使用java序列化还臃肿!原因是:每一个对象实例的序列化结果都会包含一份完整的类名,造成了大量的空间浪费!

时间是9s,比java序列化快了一些。

MEMORY_ONLY_SER 使用kryo序列化并注册

val conf = new SparkConf().setMaster("local[2]").setAppName("KyroTest")
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
conf.registerKryoClasses(Array(classOf[CleanedLog], classOf[String]))

添加了String类和自定义样例类的kryo注册
spark 调优之rdd持久化级别及kryo序列化性能测试
spark 调优之rdd持久化级别及kryo序列化性能测试
显然,input size大小是170.9 MB,使用注册的kryo序列化之后,只有175.7MB,时间也才9秒,很舒服!

所以在目前为止,使用kryo序列化并注册是性能最好得了!!!

如果CPU还是那么悠闲的话,我们还有另外一个进一步优化点!

注册kryo序列化并开启RDD压缩

注意:RDD压缩只能存在于序列化的情况下

val conf = new SparkConf().setMaster("local[2]").setAppName("KyroTest")
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
conf.registerKryoClasses(Array(classOf[CleanedLog], classOf[String]))
conf.set("spark.rdd.compress","true")

spark 调优之rdd持久化级别及kryo序列化性能测试
spark 调优之rdd持久化级别及kryo序列化性能测试
持久化的大小仅有45.6MB!!!

spark.rdd.compress

这个参数决定了RDD Cache的过程中,RDD数据在序列化之后是否进一步进行压缩再储存到内存或磁盘上。当然是为了进一步减小Cache数据的尺寸,对于Cache在磁盘上而言,绝对大小大概没有太大关系,主要是考虑Disk的IO带宽。而对于Cache在内存中,那主要就是考虑尺寸的影响,是否能够Cache更多的数据,是否能减小Cache数据对GC造成的压力等。

这两者,前者通常不会是主要问题,尤其是在RDD Cache本身的目的就是追求速度,减少重算步骤,用IO换CPU的情况下。而后者,GC问题当然是需要考量的,数据量小,占用空间少,GC的问题大概会减轻,但是是否真的需要走到RDD Cache压缩这一步,或许用其它方式来解决可能更加有效。

所以这个值默认是关闭的,但是如果在磁盘IO的确成为问题或者GC问题真的没有其它更好的解决办法的时候,可以考虑启用RDD压缩。

对比表格

类型输入大小持久化大小时间
MEMORY_ONLY170.9 MB908.5 MB14s
MEMORY_ONLY_SER170.9 MB204.9MB11s
kyro序列化未注册170.9 MB230.8MB9s
kyro序列化注册170.9 MB175.7MB9s
注册kryo序列化并开启RDD压缩170.9 MB45.6MB9s

转载于:https://blog.51cto.com/14309075/2397012

最后

以上就是羞涩星星为你收集整理的Spark 调优之RDD持久化级别及kryo序列化性能测试MEMORY_ONLYMEMORY_ONLY_SER 未使用kryo序列化MEMORY_ONLY_SER 使用kryo序列化未注册MEMORY_ONLY_SER 使用kryo序列化并注册注册kryo序列化并开启RDD压缩对比表格的全部内容,希望文章能够帮你解决Spark 调优之RDD持久化级别及kryo序列化性能测试MEMORY_ONLYMEMORY_ONLY_SER 未使用kryo序列化MEMORY_ONLY_SER 使用kryo序列化未注册MEMORY_ONLY_SER 使用kryo序列化并注册注册kryo序列化并开启RDD压缩对比表格所遇到的程序开发问题。

如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(60)

评论列表共有 0 条评论

立即
投稿
返回
顶部