我是靠谱客的博主 含蓄月饼,最近开发中收集的这篇文章主要介绍spark streaming 整合kafka 报错 KafkaConsumer is not safe for multi-threaded access,觉得挺不错的,现在分享给大家,希望可以做个参考。

概述

问题描述

spark streaming 使用 直连方式 读取kafka 数据,使用窗口时出现
java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access
报错信息如图:
在这里插入图片描述

代码
object testScala {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("fs01")
      .setMaster("local[*]")
    conf.set("spark.streaming.stopGracefullyOnShutdown", "true")
    conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    conf.set("spark.streaming.kafka.maxRatePerPartition","10")

    val sc = new SparkContext(conf)
    sc.setLogLevel("ERROR")

    val interval = PropertiesUtils.loadProperties("streaming.interval").toLong
    val ssc:StreamingContext = new StreamingContext(sc, Seconds(5))
    val kalfa_server_list: String = PropertiesUtils.loadProperties("kafka.broker.list")
    val kafka_group: String = "group_test_role_1"
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" ->  kalfa_server_list, 
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" ->  kafka_group, //消费者组名kafka.groupId
      "auto.offset.reset" -> "earliest", //earliest可以获取历史数据
      "enable.auto.commit" -> "false") //如果是true,则这个消费者的偏移量会在后台自动提交
    val topics = Array("t_2021-09") 
    var offsetRanges: Array[OffsetRange] = Array.empty[OffsetRange]
    val kafkaStream = KafkaUtils.createDirectStream[String, String](
      ssc,
      PreferConsistent, //多数时候采用该方式,在所有可用的executor上均匀分配kafka的主题的所有分区。
      Subscribe[String, String](topics, kafkaParams))
    val cacheOper = kafkaStream.transform(rdd=>{
      offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      val check = rdd.map(x => {
           x.value()})//.cache()
      check
    }) 
     cacheOper.window(Seconds(20),Seconds(10)).foreachRDD(rdd=>{
      rdd.foreach(x=>{
        println("data:"+x)
        Thread.sleep(2000L)
      })
    kafkaStream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
    })
    ssc.start()
    ssc.awaitTermination()
  }
}
解决办法

添加 conf.set(“spark.streaming.kafka.consumer.cache.enabled”, “false”)
这个问题发生的原因是spark 缓存问题,可以查看官网spark streaming整合kafka官网地址

最后

以上就是含蓄月饼为你收集整理的spark streaming 整合kafka 报错 KafkaConsumer is not safe for multi-threaded access的全部内容,希望文章能够帮你解决spark streaming 整合kafka 报错 KafkaConsumer is not safe for multi-threaded access所遇到的程序开发问题。

如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(56)

评论列表共有 0 条评论

立即
投稿
返回
顶部