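Overview
I. An ErasureCodingPolicy problem (Hadoop 3.1.0) causes the Spark Streaming job to exit
1. Why the job fails: when an uncaught exception is thrown while the block executes, the SparkContext is stopped, as shown below
Anyone interested can trace the full source path of the error from the error log. It is not traced step by step here; only the most relevant source is shown.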
AsyncEventQueue
private val dispatchThread = new Thread(s"spark-listener-group-$name") {
  setDaemon(true)
  override def run(): Unit = Utils.tryOrStopSparkContext(sc) {
    dispatch()
  }
}

private def dispatch(): Unit = LiveListenerBus.withinListenerThread.withValue(true) {
  try {
    var next: SparkListenerEvent = eventQueue.take()
    while (next != POISON_PILL) {
      val ctx = processingTime.time()
      try {
        super.postToAll(next)
      } finally {
        ctx.stop()
      }
      eventCount.decrementAndGet()
      next = eventQueue.take()
    }
    eventCount.decrementAndGet()
  } catch {
    case ie: InterruptedException =>
      logInfo(s"Stopping listener queue $name.", ie)
  }
}
org.apache.spark.util.Utils
/**
 * Execute a block of code that evaluates to Unit, stop SparkContext if there is any uncaught
 * exception
 *
 * NOTE: This method is to be called by the driver-side components to avoid stopping the
 * user-started JVM process completely; in contrast, tryOrExit is to be called in the
 * spark-started JVM process .
 */
def tryOrStopSparkContext(sc: SparkContext)(block: => Unit) {
  try {
    block
  } catch {
    case e: ControlThrowable => throw e
    case t: Throwable =>
      val currentThreadName = Thread.currentThread().getName
      if (sc != null) {
        logError(s"uncaught error in thread $currentThreadName, stopping SparkContext", t)
        sc.stopInNewThread()
      }
      if (!NonFatal(t)) {
        logError(s"throw uncaught fatal error in thread $currentThreadName", t)
        throw t
      }
  }
}
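The stop-on-uncaught-exception pattern above can be sketched in plain Java. This is only an illustration under stated assumptions: DemoContext and tryOrStopContext are hypothetical stand-ins, not Spark classes, and the sketch simplifies Spark's NonFatal check to "rethrow only java.lang.Error".

```java
import java.util.concurrent.atomic.AtomicBoolean;

class TryOrStopDemo {
    // Hypothetical stand-in for SparkContext: it only records that a stop was requested.
    static class DemoContext {
        private final AtomicBoolean stopped = new AtomicBoolean(false);
        void stop() { stopped.set(true); }
        boolean isStopped() { return stopped.get(); }
    }

    // Run the block; on any Throwable, stop the context.
    // Simplification (assumption): only java.lang.Error is treated as fatal and rethrown.
    static void tryOrStopContext(DemoContext ctx, Runnable block) {
        try {
            block.run();
        } catch (Throwable t) {
            String threadName = Thread.currentThread().getName();
            if (ctx != null) {
                System.err.println("uncaught error in thread " + threadName
                        + ", stopping context: " + t);
                ctx.stop();
            }
            if (t instanceof Error) {
                throw (Error) t;
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        DemoContext ctx = new DemoContext();
        // A daemon dispatch thread whose body fails, like a listener that cannot write its log.
        Thread dispatch = new Thread(
                () -> tryOrStopContext(ctx, () -> {
                    throw new IllegalStateException("listener write failed");
                }),
                "demo-listener-group");
        dispatch.setDaemon(true);
        dispatch.start();
        dispatch.join();
        System.out.println("context stopped: " + ctx.isStopped());
    }
}
```

Running main prints "context stopped: true": one exception escaping the wrapped block is enough to request a stop of the whole context, even though it was raised on a background thread.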
2. The first sign is a pair of warnings, as follows
2018-11-30 16:35:53 WARN Client:66 - Neither spark.yarn.jars nor spark.yarn.archive is set, falling back to uploading libraries under SPARK_HOME.
2018-11-30 16:35:54 WARN ErasureCodeNative:55 - ISA-L support is not available in your platform... using builtin-java codec where applicable
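3. After the Spark Streaming job finishes its first batch, it starts reporting errors. The same error repeats in a loop, and before long the job dies. The error is as follows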
2018-11-30 16:37:21 ERROR AsyncEventQueue:91 - Listener EventLoggingListener threw an exception
java.lang.IllegalStateException
at com.google.common.base.Preconditions.checkState(Preconditions.java:133)
at org.apache.hadoop.hdfs.DFSStripedOutputStream$CellBuffers.addTo(DFSStripedOutputStream.java:238)
at org.apache.hadoop.hdfs.DFSStripedOutputStream$CellBuffers.access$700(DFSStripedOutputStream.java:203)
at org.apache.hadoop.hdfs.DFSStripedOutputStream.writeChunk(DFSStripedOutputStream.java:520)
at org.apache.hadoop.fs.FSOutputSummer.writeChecksumChunks(FSOutputSummer.java:217)
at org.apache.hadoop.fs.FSOutputSummer.flushBuffer(FSOutputSummer.java:164)
at org.apache.hadoop.fs.FSOutputSummer.flushBuffer(FSOutputSummer.java:145)
at org.apache.hadoop.fs.FSOutputSummer.write1(FSOutputSummer.java:136)
at org.apache.hadoop.fs.FSOutputSummer.write(FSOutputSummer.java:111)
at org.apache.hadoop.fs.FSDataOutputStream$PositionCache.write(FSDataOutputStream.java:57)
at java.io.DataOutputStream.write(DataOutputStream.java:107)
at java.io.BufferedOutputStream.write(BufferedOutputStream.java:122)
at sun.nio.cs.StreamEncoder.writeBytes(StreamEncoder.java:221)
at sun.nio.cs.StreamEncoder.implWrite(StreamEncoder.java:282)
at sun.nio.cs.StreamEncoder.write(StreamEncoder.java:125)
at java.io.OutputStreamWriter.write(OutputStreamWriter.java:207)
at java.io.BufferedWriter.flushBuffer(BufferedWriter.java:129)
at java.io.BufferedWriter.write(BufferedWriter.java:230)
at java.io.PrintWriter.write(PrintWriter.java:456)
at java.io.PrintWriter.write(PrintWriter.java:473)
at java.io.PrintWriter.print(PrintWriter.java:603)
at java.io.PrintWriter.println(PrintWriter.java:739)
at org.apache.spark.scheduler.EventLoggingListener$$anonfun$logEvent$1.apply(EventLoggingListener.scala:143)
at org.apache.spark.scheduler.EventLoggingListener$$anonfun$logEvent$1.apply(EventLoggingListener.scala:143)
at scala.Option.foreach(Option.scala:257)
at org.apache.spark.scheduler.EventLoggingListener.logEvent(EventLoggingListener.scala:143)
at org.apache.spark.scheduler.EventLoggingListener.onTaskEnd(EventLoggingListener.scala:164)
at org.apache.spark.scheduler.SparkListenerBus$class.doPostEvent(SparkListenerBus.scala:45)
at org.apache.spark.scheduler.AsyncEventQueue.doPostEvent(AsyncEventQueue.scala:37)
at org.apache.spark.scheduler.AsyncEventQueue.doPostEvent(AsyncEventQueue.scala:37)
at org.apache.spark.util.ListenerBus$class.postToAll(ListenerBus.scala:82)
at org.apache.spark.scheduler.AsyncEventQueue.org$apache$spark$scheduler$AsyncEventQueue$$super$postToAll(AsyncEventQueue.scala:89)
at org.apache.spark.scheduler.AsyncEventQueue$$anonfun$org$apache$spark$scheduler$AsyncEventQueue$$dispatch$1.apply(AsyncEventQueue.scala:89)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
at org.apache.spark.scheduler.AsyncEventQueue.org$apache$spark$scheduler$AsyncEventQueue$$dispatch(AsyncEventQueue.scala:83)
at org.apache.spark.scheduler.AsyncEventQueue$$anon$1$$anonfun$run$1.apply$mcV$sp(AsyncEventQueue.scala:79)
at org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1319)
at org.apache.spark.scheduler.AsyncEventQueue$$anon$1.run(AsyncEventQueue.scala:78)
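II. Problem analysis
The source code behind the error does roughly the following:
1. On onTaskEnd, a PrintWriter is called to write the event log entry.
2. That PrintWriter's write is ultimately carried out by writeChunk in DFSStripedOutputStream.
3. Before the write there is a checkState, and this is where the error is raised.
4. The relevant parameter is cellSize, which comes from the ErasureCodingPolicy.
5. When the buffer position plus the length being written exceeds cellSize, this exception is thrown.
6. tryOrStopSparkContext in org.apache.spark.util.Utils then finds an uncaught exception from the block and stops the SparkContext.
7. Seen this way, this is very likely the main reason the job died shortly after it started running on the new cluster.
8. EC analysis: when the log is written, it is split into chunks that are written into the EC buffer pool; once the pool is full, a flush empties it. Two factors matter most for the exception: (A) the chunks are large, and (B) the buffer pool is relatively small. Together they cause the exception.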
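The check described in steps 3 to 5 can be reproduced with a small stand-alone sketch. CellBufferDemo is a hypothetical class written for illustration; it copies only the position-plus-length test from the article's analysis and uses a deliberately tiny cell of 1024 bytes.

```java
import java.nio.ByteBuffer;

class CellBufferDemo {
    private final int cellSize;
    private final ByteBuffer buf;

    CellBufferDemo(int cellSize) {
        this.cellSize = cellSize;
        this.buf = ByteBuffer.allocate(cellSize);
    }

    // Same rule as in the analysis: the position after the write must not exceed cellSize.
    int addTo(byte[] b, int off, int len) {
        int pos = buf.position() + len;
        if (pos > cellSize) {
            // Guava's Preconditions.checkState also throws IllegalStateException here.
            throw new IllegalStateException();
        }
        buf.put(b, off, len);
        return pos;
    }

    // Emptying the cell, as a flush would, makes room for further writes.
    void clear() {
        buf.clear();
    }

    public static void main(String[] args) {
        CellBufferDemo cell = new CellBufferDemo(1024);
        byte[] chunk = new byte[512];
        System.out.println(cell.addTo(chunk, 0, 512)); // 512
        System.out.println(cell.addTo(chunk, 0, 512)); // 1024
        try {
            cell.addTo(chunk, 0, 1); // one byte more than the cell can hold
        } catch (IllegalStateException e) {
            System.out.println("write rejected: cell is full");
        }
    }
}
```

The third write fails only because the cell was not emptied before more data arrived, which is the situation the stack trace points to.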
III. Solution
1. Write the event log to local storage instead of HDFS, or turn off EC for the log output.
2. Upgrade Spark to 2.4; the EC problem is a bug in Spark 2.3.
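For the first option, a minimal sketch of the settings involved is shown here. spark.eventLog.enabled and spark.eventLog.dir are standard Spark properties; the EventLogConfDemo helper and the /tmp/spark-events path are assumptions made for illustration, and the directory must exist on the machine that runs the driver.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class EventLogConfDemo {
    // Settings that keep the event log on the local file system instead of HDFS.
    // localDir is an assumed example path, not a value taken from the article.
    static Map<String, String> localEventLogConf(String localDir) {
        Map<String, String> conf = new LinkedHashMap<>();
        conf.put("spark.eventLog.enabled", "true");
        conf.put("spark.eventLog.dir", "file://" + localDir);
        return conf;
    }

    // Render the settings as spark-submit "--conf key=value" arguments.
    static List<String> toSubmitArgs(Map<String, String> conf) {
        List<String> args = new ArrayList<>();
        for (Map.Entry<String, String> e : conf.entrySet()) {
            args.add("--conf");
            args.add(e.getKey() + "=" + e.getValue());
        }
        return args;
    }

    public static void main(String[] args) {
        Map<String, String> conf = localEventLogConf("/tmp/spark-events");
        System.out.println(String.join(" ", toSubmitArgs(conf)));
    }
}
```

Setting spark.eventLog.enabled to false instead would stop the EventLoggingListener from writing at all, at the cost of losing the event log for this application.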
IV. Code analysis
The code that raises the error
Class: DFSStripedOutputStream
private int addTo(int i, byte[] b, int off, int len) {
  ByteBuffer buf = this.buffers[i];
  int pos = buf.position() + len;
  Preconditions.checkState(pos <= DFSStripedOutputStream.this.cellSize);
  buf.put(b, off, len);
  return pos;
}

Class: ErasureCodingPolicy
public ErasureCodingPolicy(String name, ECSchema schema, int cellSize, byte id) {
  Preconditions.checkNotNull(name);
  Preconditions.checkNotNull(schema);
  Preconditions.checkArgument(cellSize > 0, "cellSize must be positive");
  Preconditions.checkArgument(cellSize % 1024 == 0, "cellSize must be 1024 aligned");
  this.name = name;
  this.schema = schema;
  this.cellSize = cellSize;
  this.id = id;
}
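The two cellSize checks in the constructor can be tried in isolation. CellSizeCheckDemo below is a hypothetical helper, not Hadoop code; it repeats the same two conditions with plain Java exceptions so that it runs without Guava.

```java
class CellSizeCheckDemo {
    // Same two conditions as the constructor: positive and a multiple of 1024.
    static void validateCellSize(int cellSize) {
        if (cellSize <= 0) {
            throw new IllegalArgumentException("cellSize must be positive");
        }
        if (cellSize % 1024 != 0) {
            throw new IllegalArgumentException("cellSize must be 1024 aligned");
        }
    }

    // Convenience wrapper that reports the outcome instead of throwing.
    static boolean isValid(int cellSize) {
        try {
            validateCellSize(cellSize);
            return true;
        } catch (IllegalArgumentException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(isValid(1024 * 1024)); // true: 1 MiB is a multiple of 1024
        System.out.println(isValid(1000));        // false: not 1024 aligned
        System.out.println(isValid(0));           // false: not positive
    }
}
```

These checks constrain only how a policy is defined. They do not stop a writer from exceeding the cell at run time, which is what the checkState in addTo guards against.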
The underlying problem caused by EC itself has not really been solved yet.