我是靠谱客的博主 暴躁飞鸟,最近开发中收集的这篇文章主要介绍Spark序列化 & Encoders,觉得挺不错的,现在分享给大家,希望可以做个参考。

概述

spark序列化方式

分布式的程序存在着网络传输,无论是数据还是程序本身的序列化都是必不可少的。spark自身提供两种序列化方式:

  • java序列化:这是spark默认的序列化方式,使用java的ObjectOutputStream框架,只要是实现了java.io.Serializable接口的类都可以,这种方式虽然通用但是性能差,占用的空间也比较大
  • kryo序列化:相比于java序列化,kryo更高效且序列化结果紧凑,占用空间小,但是不能序列化所有数据类型,且为更好的性能,需要在程序中注册需要序列化的类

kryo不作为默认的序列化方式,是因为需要显式注册自定义的类型,自spark2.0后,对于一些简单类型的rdd(AllScalaRegistrar默认注册了一些常用的基本类型)在shuffling时内部默认使用kryo作序列化

SparkSql与序列化

SparkSql并不使用kryo或java序列化,Dataset使用的是Encoder将jvm对象转换为二进制(《spark数据格式UnsafeRow》),类似于序列化过程,但是Encoder是动态生成代码,并使用标准的InternalRow格式,使得spark可以直接基于字节上做很多操作(不需要反序列化过程),比如filtering,sorting和hashing;Encoder比kryo和java序列化更轻量级,因为它不用额外保存类的描述信息。

在spark中使用kryo

kryo使用比较麻烦,但为了更好的性能和使用更少的内存,还是建议使用kryo序列化。

  1. 初始化sparkcontext时指定使用kryo序列化
  2. 向kryo注册自定义类(registerKryoClasses->org.apache.spark.serializer.KryoSerializer.classesToRegister)
val conf=new SparkConf().setAppName("kryo-test").setMaster("local[*]")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.kryo.registrationRequired", "true")
  .registerKryoClasses(Array(classOf[com.test.spark.KryoTest.Person],  
    classOf[Array[com.test.spark.KryoTest.Person]],
    classOf[scala.collection.mutable.WrappedArray.ofRef[_]]))
val spark=SparkSession.builder().config(conf).enableHiveSupport().getOrCreate()
case class Person(val name: String, val age: Long)
val array = (1 to 100000).map(v => Person("person" + v, v)).toSeq
val rdd = spark.sparkContext.parallelize(array, 5).persist(StorageLevel.MEMORY_ONLY_SER)
rdd.count
rdd.take(10).foreach(p => println(p.age, p.name))

上面的示例执行完后在sparkui上看到使用内存为1641.3 KB

使用kryo但不注册类

val conf=new SparkConf().setAppName("kryo-test").setMaster("local[*]")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
val spark=SparkSession.builder().config(conf).enableHiveSupport().getOrCreate()
case class Person(val name: String, val age: Long)
val array = (1 to 100000).map(v => Person("person" + v, v)).toSeq
val rdd = spark.sparkContext.parallelize(array, 5).persist(StorageLevel.MEMORY_ONLY_SER)
rdd.count
rdd.take(10).foreach(p => println(p.age, p.name))

上面的示例执行完后在sparkui上看到使用内存为4.8 MB,是注册类时1641.3 KB的差不多3倍,不仅如此,如果在Stage界面观察Task Deserialization TimeResult Serialization Time两项指标,也可以看出注册了类的话耗时更少。

当序列化Person实例对象时,如果不注册Person类,那么会写入Person类的完全限定类名,如果注册了,则会使用一个int类型的ID(1-2字节)代替完全限定类名,显然注册类更加高效且节省空间

这个ID是通过com.esotericsoftware.kryo.Kryo#nextRegisterID自增生成的,与类唯一对应,在反序列化时通过ID反向找到类然后实例化。所以在spark这种分布式的程序中,Person类在所有executor中都必需对应着相同的ID值,这是如何保证的?是因为在所有的executor中代码相同所以注册类的顺序一致,还是在driver中把所有类统一注册然后广播到各个executor中?

kryo与java占用内存对比

将上面代码改为使用java序列化方式,最终得到2.7MB

序列化方式占用内存
kryo且注册类1641.3 KB
kryo不注册类4.8 MB
java2.7 MB

可以看到kryo在不注册类的情况时,rdd缓存占用的内存比使用java时还要多

Encoders.kryo vs Encoders.javaSerialization

在Dataset中如果想使用kryo序列化,可以通过工厂类org.apache.spark.sql.Encoders生成一个使用kryo序列化/java序列化的Encoder,但是创建的Dataset并不是一个标准的数据集,因为得到的数据集中有唯一一列"value",而这个列的值则是整行记录的二进制数据

val df_java_ser = spark.createDataset(array)(Encoders.javaSerialization(classOf[Person])).persist(StorageLevel.MEMORY_ONLY_SER)
df_java_ser.count
df_java_ser.show
val df_kryo_ser = spark.createDataset(array)(Encoders.kryo(classOf[Person])).persist(StorageLevel.MEMORY_ONLY_SER)
df_kryo_ser.count
df_kryo_ser.show(3)

//show的结果如下
+--------------------+
|               value|
+--------------------+
|[01 00 63 6F 6D 2...|
|[01 00 63 6F 6D 2...|
|[01 00 63 6F 6D 2...|
+--------------------+

上面分别使用kryo和java序列化时分别占用内在为:5.2M16.8M

在Dataset两种方式最后大小都比上面rdd的多,猜测这是因为行数据序列化为二进制后被包装成UnsafeRow,所以需要很多额外的空间。
但是在同样没注册到Person类的情况下,RDD是java优于kryo,Dataset是kryo优于java,这是什么原因,求留言告知!!!

撇开Dataset直接对Person对象进行序列化(一样不注册Person类):

val bf_kryo = new org.apache.spark.serializer.KryoSerializer(new SparkConf()).newInstance().serialize(Person("abcd" , 1))
val bf_java = new org.apache.spark.serializer.JavaSerializer(new SparkConf()).newInstance().serialize(Person("abcd" , 1))

kryo使用了42字节,java使用了103字节,既然单个对象的序列化结果都是kryo优于java,为什么在RDD中却相反,why?

参考

https://medium.com/@knoldus/kryo-serialization-in-spark-55b53667e7abhttps://medium.com/@knoldus/kryo-serialization-in-spark-55b53667e7ab

https://spark.apache.org/docs/latest/tuning.html#data-serializationhttps://spark.apache.org/docs/latest/tuning.html#data-serialization

最后

以上就是暴躁飞鸟为你收集整理的Spark序列化 & Encoders的全部内容,希望文章能够帮你解决Spark序列化 & Encoders所遇到的程序开发问题。

如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(69)

评论列表共有 0 条评论

立即
投稿
返回
顶部