我是靠谱客的博主 含蓄月饼,这篇文章主要介绍spark streaming 整合kafka 报错 KafkaConsumer is not safe for multi-threaded access,现在分享给大家,希望可以做个参考。
问题描述
spark streaming 使用 直连方式 读取kafka 数据,使用窗口时出现
java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access
报错信息如图:
代码
复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47object testScala { def main(args: Array[String]): Unit = { val conf = new SparkConf() .setAppName("fs01") .setMaster("local[*]") conf.set("spark.streaming.stopGracefullyOnShutdown", "true") conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") conf.set("spark.streaming.kafka.maxRatePerPartition","10") val sc = new SparkContext(conf) sc.setLogLevel("ERROR") val interval = PropertiesUtils.loadProperties("streaming.interval").toLong val ssc:StreamingContext = new StreamingContext(sc, Seconds(5)) val kalfa_server_list: String = PropertiesUtils.loadProperties("kafka.broker.list") val kafka_group: String = "group_test_role_1" val kafkaParams = Map[String, Object]( "bootstrap.servers" -> kalfa_server_list, "key.deserializer" -> classOf[StringDeserializer], "value.deserializer" -> classOf[StringDeserializer], "group.id" -> kafka_group, //消费者组名kafka.groupId "auto.offset.reset" -> "earliest", //earliest可以获取历史数据 "enable.auto.commit" -> "false") //如果是true,则这个消费者的偏移量会在后台自动提交 val topics = Array("t_2021-09") var offsetRanges: Array[OffsetRange] = Array.empty[OffsetRange] val kafkaStream = KafkaUtils.createDirectStream[String, String]( ssc, PreferConsistent, //多数时候采用该方式,在所有可用的executor上均匀分配kafka的主题的所有分区。 Subscribe[String, String](topics, kafkaParams)) val cacheOper = kafkaStream.transform(rdd=>{ offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges val check = rdd.map(x => { x.value()})//.cache() check }) cacheOper.window(Seconds(20),Seconds(10)).foreachRDD(rdd=>{ rdd.foreach(x=>{ println("data:"+x) Thread.sleep(2000L) }) kafkaStream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges) }) ssc.start() ssc.awaitTermination() } }
解决办法
添加 conf.set(“spark.streaming.kafka.consumer.cache.enabled”, “false”)
这个问题发生的原因是spark 缓存问题,可以查看官网spark streaming整合kafka官网地址
最后
以上就是含蓄月饼最近收集整理的关于spark streaming 整合kafka 报错 KafkaConsumer is not safe for multi-threaded access的全部内容,更多相关spark内容请搜索靠谱客的其他文章。
本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
发表评论 取消回复