[Exception] SparkStreaming throws "Listener SQLListener threw an exception"
Overview
Problem description
When a SparkStreaming application has been running for a long time, it occasionally throws the following exception:
2018-01-08 18:42:03 [ SparkListenerBus:32824468 ] - [ ERROR ] Listener SQLListener threw an exception
java.lang.IllegalStateException: Attempted to access garbage collected accumulator 5618419
at org.apache.spark.util.AccumulatorContext$$anonfun$get$1.apply(AccumulatorV2.scala:268)
at org.apache.spark.util.AccumulatorContext$$anonfun$get$1.apply(AccumulatorV2.scala:264)
at scala.Option.map(Option.scala:146)
at org.apache.spark.util.AccumulatorContext$.get(AccumulatorV2.scala:264)
at org.apache.spark.util.AccumulatorV2$$anonfun$name$1.apply(AccumulatorV2.scala:90)
at org.apache.spark.util.AccumulatorV2$$anonfun$name$1.apply(AccumulatorV2.scala:90)
at scala.Option.orElse(Option.scala:289)
at org.apache.spark.util.AccumulatorV2.name(AccumulatorV2.scala:90)
at org.apache.spark.util.AccumulatorV2.toInfo(AccumulatorV2.scala:111)
at org.apache.spark.sql.execution.ui.SQLListener$$anonfun$onTaskEnd$1.apply(SQLListener.scala:216)
at org.apache.spark.sql.execution.ui.SQLListener$$anonfun$onTaskEnd$1.apply(SQLListener.scala:216)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at org.apache.spark.sql.execution.ui.SQLListener.onTaskEnd(SQLListener.scala:216)
at org.apache.spark.scheduler.SparkListenerBus$class.doPostEvent(SparkListenerBus.scala:45)
at org.apache.spark.scheduler.LiveListenerBus.doPostEvent(LiveListenerBus.scala:36)
at org.apache.spark.scheduler.LiveListenerBus.doPostEvent(LiveListenerBus.scala:36)
at org.apache.spark.util.ListenerBus$class.postToAll(ListenerBus.scala:63)
at org.apache.spark.scheduler.LiveListenerBus.postToAll(LiveListenerBus.scala:36)
at org.apache.spark.scheduler.LiveListenerBus$$anon$1$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(LiveListenerBus.scala:94)
at org.apache.spark.scheduler.LiveListenerBus$$anon$1$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(LiveListenerBus.scala:79)
at org.apache.spark.scheduler.LiveListenerBus$$anon$1$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(LiveListenerBus.scala:79)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
at org.apache.spark.scheduler.LiveListenerBus$$anon$1$$anonfun$run$1.apply$mcV$sp(LiveListenerBus.scala:78)
at org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1279)
at org.apache.spark.scheduler.LiveListenerBus$$anon$1.run(LiveListenerBus.scala:77)
Analyzing the exception through the Spark source code
When SparkContext is initialized it creates a LiveListenerBus, the event queue through which Spark posts scheduler events: job start and stop, task start and stop, stage submission, block add and remove, and so on.
The SparkContext code that creates and starts the LiveListenerBus is shown below (the trailing comments are the line numbers in SparkContext.scala):
private var _listenerBus: LiveListenerBus = _ // 195
private[spark] def listenerBus: LiveListenerBus = _listenerBus // 248
_listenerBus = new LiveListenerBus(_conf) // 417
listenerBus.start(this, _env.metricsSystem) // 2393
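To make the bus concrete before diving deeper, here is a minimal, self-contained sketch that registers a custom listener on that same bus and prints the task-end events SQLListener also consumes. The app name, master, and job are illustrative assumptions, not part of the original post:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}

object ListenerBusDemo {
  def main(args: Array[String]): Unit = {
    // Hypothetical local app, used only to show events flowing through the bus.
    val sc = new SparkContext(
      new SparkConf().setMaster("local[2]").setAppName("listener-bus-demo"))

    // addSparkListener registers this listener with the same LiveListenerBus
    // that delivers events to SQLListener.
    sc.addSparkListener(new SparkListener {
      override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit =
        println(s"task ${taskEnd.taskInfo.taskId} of stage ${taskEnd.stageId} ended")
    })

    sc.parallelize(1 to 1000).count() // runs tasks, which post SparkListenerTaskEnd events
    sc.stop()
  }
}

Running this locally prints one line per finished task, confirming that every registered listener is fed from the single event queue described above.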
The start method of LiveListenerBus, together with the event queue and the listener thread it launches:
private val eventQueue =
  new LinkedBlockingQueue[SparkListenerEvent](conf.get(LISTENER_BUS_EVENT_QUEUE_CAPACITY))

def start(sc: SparkContext, metricsSystem: MetricsSystem): Unit = {
  if (started.compareAndSet(false, true)) {
    sparkContext = sc
    metricsSystem.registerSource(metrics)
    listenerThread.start()
  } else {
    throw new IllegalStateException(s"$name already started!")
  }
}

private val listenerThread = new Thread(name) {
  setDaemon(true)
  override def run(): Unit = Utils.tryOrStopSparkContext(sparkContext) {
    LiveListenerBus.withinListenerThread.withValue(true) {
      val timer = metrics.eventProcessingTime
      while (true) {
        eventLock.acquire()
        self.synchronized {
          processingEvent = true
        }
        try {
          val event = eventQueue.poll
          if (event == null) {
            // Get out of the while loop and shutdown the daemon thread
            if (!stopped.get) {
              throw new IllegalStateException("Polling `null` from eventQueue means" +
                " the listener bus has been stopped. So `stopped` must be true")
            }
            return
          }
          val timerContext = timer.time()
          try {
            postToAll(event)
          } finally {
            timerContext.stop()
          }
        } finally {
          self.synchronized {
            processingEvent = false
          }
        }
      }
    }
  }
}
As we can see, SparkContext calls LiveListenerBus.start during initialization, which launches the daemon thread above. That thread keeps taking events off the LinkedBlockingQueue created when the LiveListenerBus was constructed and dispatches each one to the registered listeners.
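The polling pattern itself can be modeled outside Spark. The following is a simplified, hypothetical sketch of the same structure (bounded queue, semaphore counting pending events, daemon consumer thread); it is a teaching mock, not Spark's actual class:

import java.util.concurrent.{LinkedBlockingQueue, Semaphore}
import java.util.concurrent.atomic.AtomicBoolean

// A stripped-down, hypothetical model of the LiveListenerBus pattern.
class MiniBus(capacity: Int) {
  private val queue = new LinkedBlockingQueue[String](capacity)
  private val eventLock = new Semaphore(0)
  private val stopped = new AtomicBoolean(false)

  private val consumer = new Thread("mini-bus") {
    setDaemon(true)
    override def run(): Unit = {
      while (true) {
        eventLock.acquire()          // block until an event (or a stop signal) arrives
        val event = queue.poll()
        if (event == null) return    // as in Spark: null only happens after stop()
        println(s"handling $event")  // stands in for postToAll(event)
      }
    }
  }

  def start(): Unit = consumer.start()

  def post(event: String): Unit =
    if (!stopped.get && queue.offer(event)) eventLock.release() // drop if the queue is full

  def stop(): Unit = {
    stopped.set(true)
    eventLock.release()              // wake the consumer so it can exit
  }
}

Calling post from any thread enqueues the event and releases a permit; the daemon thread wakes, polls the queue, and handles the event, which is the same division of labor the real bus uses.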
From the stack trace we can see that the error is thrown while executing SQLListener.onTaskEnd. Here is its source:
override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = synchronized {
  if (taskEnd.taskMetrics != null) {
    updateTaskAccumulatorValues(
      taskEnd.taskInfo.taskId,
      taskEnd.stageId,
      taskEnd.stageAttemptId,
      taskEnd.taskMetrics.externalAccums.map(a => a.toInfo(Some(a.value), None)),
      finishTask = true)
  }
}
onTaskEnd calls AccumulatorV2.toInfo(), which in turn calls name(). On the driver side, name() looks the accumulator up by its id in AccumulatorContext's originals map, a ConcurrentHashMap that holds only weak references. The garbage collector is free to reclaim the referents, so under memory pressure some of those weak references get cleared; when get() then finds a cleared reference, it cannot retrieve the data and throws:
throw new IllegalStateException(s"Attempted to access garbage collected accumulator $id")
private val originals = new ConcurrentHashMap[Long, jl.ref.WeakReference[AccumulatorV2[_, _]]]

private[spark] def toInfo(update: Option[Any], value: Option[Any]): AccumulableInfo = {
  val isInternal = name.exists(_.startsWith(InternalAccumulator.METRICS_PREFIX))
  new AccumulableInfo(id, name, update, value, isInternal, countFailedValues)
}

final def name: Option[String] = {
  assertMetadataNotNull()
  if (atDriverSide) {
    metadata.name.orElse(AccumulatorContext.get(id).flatMap(_.metadata.name))
  } else {
    metadata.name
  }
}

def get(id: Long): Option[AccumulatorV2[_, _]] = {
  Option(originals.get(id)).map { ref =>
    // Since we are storing weak references, we must check whether the underlying data is valid.
    val acc = ref.get
    if (acc eq null) {
      throw new IllegalStateException(s"Attempted to access garbage collected accumulator $id")
    }
    acc
  }
}
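This failure mode is easy to reproduce with plain weak references. Below is a minimal sketch; the map key and value are made up for illustration, and System.gc() is only a hint to the JVM, so the cleared state is not guaranteed on every run:

import java.lang.ref.WeakReference
import java.util.concurrent.ConcurrentHashMap

object WeakAccumulatorDemo {
  def main(args: Array[String]): Unit = {
    // Mirrors AccumulatorContext.originals: the map itself holds only weak references.
    val originals = new ConcurrentHashMap[Long, WeakReference[Object]]()

    var acc: Object = new Object           // the strong reference keeps the referent alive
    originals.put(1L, new WeakReference(acc))
    println(originals.get(1L).get != null) // true: still strongly reachable

    acc = null                             // drop the last strong reference
    System.gc()                            // only a hint; a real collection may run later

    // Once the collector clears the WeakReference, get() returns null -- the
    // state that AccumulatorContext.get turns into the IllegalStateException.
    if (originals.get(1L).get == null)
      println("Attempted to access garbage collected accumulator 1")
  }
}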
The exception is caught one level up, in ListenerBus.postToAll, which wraps every listener call in a try/catch:

def postToAll(event: E): Unit = {
  // JavaConverters can create a JIterableWrapper if we use asScala.
  // However, this method will be called frequently. To avoid the wrapper cost, here we use
  // Java Iterator directly.
  val iter = listenersPlusTimers.iterator
  while (iter.hasNext) {
    val listenerAndMaybeTimer = iter.next()
    val listener = listenerAndMaybeTimer._1
    val maybeTimer = listenerAndMaybeTimer._2
    val maybeTimerContext = if (maybeTimer.isDefined) {
      maybeTimer.get.time()
    } else {
      null
    }
    try {
      doPostEvent(listener, event)
    } catch {
      case NonFatal(e) =>
        logError(s"Listener ${Utils.getFormattedClassName(listener)} threw an exception", e)
    } finally {
      if (maybeTimerContext != null) {
        maybeTimerContext.stop()
      }
    }
  }
}
The logError call in the catch block is exactly where the "Listener SQLListener threw an exception" message above is produced.
In the end, because postToAll catches non-fatal exceptions and only logs them, this error does not affect the normal operation of the application.
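That conclusion follows directly from the catch block above: NonFatal exceptions are logged and swallowed, and the dispatch thread keeps running. A tiny sketch of the same pattern (with println standing in for logError) makes this visible:

import scala.util.control.NonFatal

object SwallowDemo {
  def main(args: Array[String]): Unit = {
    // Same shape as ListenerBus.postToAll: each listener call is wrapped so a
    // failing listener cannot kill the event-dispatch thread.
    def safePost(listener: () => Unit): Unit =
      try {
        listener()
      } catch {
        case NonFatal(e) => println(s"Listener threw an exception: ${e.getMessage}")
      }

    safePost(() => throw new IllegalStateException(
      "Attempted to access garbage collected accumulator 5618419"))
    println("dispatch thread is still alive") // execution continues normally
  }
}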