概述
Task序列化问题
问题现象
本质原因
1.封装数据的bean没有实现序列化(报错图1)
封装数据的bean需要写入本地磁盘和在网络中传输,没有实现序列化出出错
2.rdd的算子中传入的函数用到了外部引用类型(闭包),且没有实现序列化(报错图2)
本质原因的Task是在Driver端生成的,函数也是在Driver端定义的,但是真正执行是需要将Task和Task中引用都序列化,如果有没法序列化的,就会出现Task没有序列化异常
Task在底层有实现序列化接口的,所以Task可以被传输到Executor端,如果在Task的函数中传入了一个函数外部定义的引用数据类型(闭包),该引用会在Driver端被初始化,然后会被Task识别到,传输Task时,因为在函数内部使用了它,所以它需要跟着Task一起序列化到Executor端,然后再在Executor端反序列化,但如果该引用类型未实现序列化接口,就会导致Task也无法序列化,报上面图2的错误.
案例
在Driver端(函数外部)初始化一个Object
Driver初始化一个object,这个单例对象要伴随着Task发送到Executor,但是一个Executor中只有一份
注意:必须实现序列化接口, 而且使用共享成员变量有可能会出现线程安全问题(因为一个Executor进程中可能存在多个Task线程,多个Task共享一个object的实例,那么就有可能出现多个Task同时访问一个成员变量的问题,如果都是object的方法中都是局部变量就不会存在该问题,详细解决方案在下篇文章中介绍)
object RulesMapObj extends Serializable {
val rules = Map("ln" -> "辽宁省", "bj" -> "北京市", "sd" -> "山东省")
}
object SerTest01 {
def main(args: Array[String]): Unit = {
val isLocal = args(0).toBoolean
//创建SparkConf,然后创建SparkContext
val conf = new SparkConf().setAppName(this.getClass.getSimpleName)
if (isLocal) {
conf.setMaster("local[*]")
}
val sc = new SparkContext(conf)
//在Driver端初始化
val rulesMap = RulesMapObj
val lines: RDD[String] = sc.textFile(args(1))
//对数据进行map关联中文名称
val wordAndProvince = lines.map(word => {
//关联外部的规则
//存在闭包:函数内部用到了一个函数外部的引用类型,函数式在Executor中被调用的
//在Driver端引用的数据要伴随着Task一起发送到Executor中
val province = rulesMap.rules(word)
//获取分区Id(即Task的Id)
val partitionId = TaskContext.get().partitionId()
//获取当前机器的主机名
val hostname = InetAddress.getLocalHost.getHostName
//获取当前线程的Id
val threadId = Thread.currentThread().getId
(word, province, hostname, partitionId, threadId, rulesMap)
})
//将处理好的数据写回到HDFS
wordAndProvince.saveAsTextFile(args(2))
sc.stop()
}
}
通过观察HDSF中的结果,可以得出结论:
1.一个Executor中只有一个RulesMapObj实例,该Executor中的多个Task共享这一个实例,因为object的对象是单例的,在一个进程中只能有一个对象
2.RulesMapObj需要实现序列化
在Driver端new一个类的实例
Driver实例化一个类的实例,在函数内容引用了这个实例,该实例会伴随着Task发送到Executor,每个Task有一个单独的实例,必须实现序列化接口
class RulesMapClass extends Serializable {
val rules = Map("ln" -> "辽宁省", "bj" -> "北京市", "sd" -> "山东省")
}
object SerTest02 {
def main(args: Array[String]): Unit = {
val isLocal = args(0).toBoolean
//创建SparkConf,然后创建SparkContext
val conf = new SparkConf().setAppName(this.getClass.getSimpleName)
if (isLocal) {
conf.setMaster("local[*]")
}
val sc = new SparkContext(conf)
//在Driver端初始化【new一个类的实例】
val rulesMap = new RulesMapClass
//sh
val lines: RDD[String] = sc.textFile(args(1))
//对数据进行map关联中文名称
val wordAndProvince = lines.map(word => {
//关联外部的规则
//存在闭包:函数内部用到了一个函数外部的引用类型,函数式在Executor中被调用的
//在Driver端引用的数据要伴随着Task一起发送到Executor中
val province = rulesMap.rules(word)
val partitionId = TaskContext.get().partitionId()
val hostname = InetAddress.getLocalHost.getHostName
val threadId = Thread.currentThread().getId
(word, province, hostname, partitionId, threadId, rulesMap)
})
//将处理好的数据写回到HDFS
wordAndProvince.saveAsTextFile(args(2))
sc.stop()
}
}
通过观察HDSF中的结果,可以得出结论:
1.每一个Task中都有一个单独的RulesMapClass实例
2.RulesMapClass需要实现序列化
class相对object更浪费空间,但不会存在线程安全问题
在Driver端初始化对象都会存在一个共同的问题,就是Driver端new出来的对象会先序列化传输到该Application的所有Executor中,然后再反序列化,如果对象中的信息量很大,就会在序列化后传输后,然后在多个Executor反序列化浪费大量的资源和时间
在Executor端(函数内部)初始化一个Object
在函数内部使用一个object,这个object不用实现序列化,因为函数内部的逻辑不被调用,是不会执行的,所以它不会在Driver端被初始化出实例对象,直接跟着函数内部的逻辑一起到了Executor端,然后在Executor中初始化,所以不用将实例对象序列化再反序列传输,并且object修饰的对象一个Executor中只有一份,且使用共享成员变量也有可能会出现线程安全问题
如果是class的对象,new在函数内部,如果是map算子,则会一条数据就new一个对象,会浪费大量的内存空间,但可以使用mapPartitions,一个分区new一个对象,节省内存空间,且不会存在线程安全问题
object RulesMapObjNoSer {
{
//查看该静态代码块是在哪一端打印的
val hostName = InetAddress.getLocalHost.getHostName
println("¥¥¥¥¥¥¥¥ init RulesMapObjNoSer at " + hostName)
}
val rules = Map("ln" -> "辽宁省", "bj" -> "北京市", "sd" -> "山东省")
}
object SerTest03 {
def main(args: Array[String]): Unit = {
val isLocal = args(0).toBoolean
//创建SparkConf,然后创建SparkContext
val conf = new SparkConf().setAppName(this.getClass.getSimpleName)
if (isLocal) {
conf.setMaster("local[*]")
}
val sc = new SparkContext(conf)
val lines: RDD[String] = sc.textFile(args(1))
//对数据进行map关联中文名称
val wordAndProvince = lines.map(word => {
//关联外部的规则
//RulesMapObjNoSer是在Executor端初始化的
val province = RulesMapObjNoSer.rules(word)
val partitionId = TaskContext.get().partitionId()
val hostname = InetAddress.getLocalHost.getHostName
val threadId = Thread.currentThread().getId
(word, province, hostname, partitionId, threadId, RulesMapObjNoSer)
})
//将处理好的数据写回到HDFS
wordAndProvince.saveAsTextFile(args(2))
sc.stop()
}
}
通过观察HDSF中的结果,可以得出结论:
一个Executor中只有一个RulesMapObjNoSer实例
RulesMapObjNoSer是在Executor中被初始化的,不需要实现序列化
最后
以上就是多情小虾米为你收集整理的大数据之spark_spark中的序列化问题详解Task序列化问题的全部内容,希望文章能够帮你解决大数据之spark_spark中的序列化问题详解Task序列化问题所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
发表评论 取消回复