我是靠谱客的博主 拉长大地,最近开发中收集的这篇文章主要介绍sparkstreaming ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access,觉得挺不错的,现在分享给大家,希望可以做个参考。

概述

最近在测试sparkstreaming的时候发现了一个问题,记录一下
环境 spark 2.x, kafka_0.10.x
示例代码:

val ssc: StreamingContext = new StreamingContext(sparkSession.sparkContext,Seconds(5))

    val kafkaBrokers:String = "hadoop01:9092,hadoop02:9092,hadoop03:9092"
    val kafkaTopics: String = "test"

    val kafkaParam = Map(
      "bootstrap.servers" -> kafkaBrokers,//用于初始化链接到集群的地址
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "group1",
      "auto.offset.reset" -> "earliest",
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )

    val inputDStream=KafkaUtils.createDirectStream[String,String](ssc, LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String,String](Array(kafkaTopics),kafkaParam))

    val valueDStream: DStream[String] = inputDStream.map(_.value())
    valueDStream.foreachRDD(rdd =>{
      val tRDD: RDD[String] = rdd.filter(_.contains("t"))
      val hRDD: RDD[String] = rdd.filter(_.contains("h"))
      tRDD.union(hRDD).foreach(println)
    })

报错信息.java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access
kafka问题
原因分析:
这里的两个rdd读取的是同一份数据,当执行action时,都会触发两次数据的读操作,(rdd中的一个分区对应着topic中的一个分区,也就是说kafka中的一个分区的数据这里被读取了2次) 但是,同一个分区的数据只能被一个consumer消费,所以这里报错。
解决方法:一个可行的解决方案是对rdd进行缓存或者checkpoint,然后要能保证,原始的kafka中的数据,只会被消费一次,然后剩下的数据消费都从缓存中获取数据。
示例代码:

val valueDStream: DStream[String] = inputDStream.map(_.value()).persist(StorageLevel.DISK_ONLY)
    valueDStream.foreachRDD(rdd =>{
      val tRDD: RDD[String] = rdd.filter(_.contains("t"))
      val hRDD: RDD[String] = rdd.filter(_.contains("h"))
      tRDD.union(hRDD).foreach(println)
    })

参考:java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access
解决KafkaConsumer多线程接入不安全问题(spark streaming 消费kafka)

最后

以上就是拉长大地为你收集整理的sparkstreaming ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access的全部内容,希望文章能够帮你解决sparkstreaming ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access所遇到的程序开发问题。

如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(70)

评论列表共有 0 条评论

立即
投稿
返回
顶部