我是靠谱客的博主 含蓄月饼,最近开发中收集的这篇文章主要介绍spark streaming 整合kafka 报错 KafkaConsumer is not safe for multi-threaded access,觉得挺不错的,现在分享给大家,希望可以做个参考。
概述
问题描述
spark streaming 使用 直连方式 读取kafka 数据,使用窗口时出现
java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access
报错信息如图:
代码
object testScala {
def main(args: Array[String]): Unit = {
val conf = new SparkConf()
.setAppName("fs01")
.setMaster("local[*]")
conf.set("spark.streaming.stopGracefullyOnShutdown", "true")
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
conf.set("spark.streaming.kafka.maxRatePerPartition","10")
val sc = new SparkContext(conf)
sc.setLogLevel("ERROR")
val interval = PropertiesUtils.loadProperties("streaming.interval").toLong
val ssc:StreamingContext = new StreamingContext(sc, Seconds(5))
val kalfa_server_list: String = PropertiesUtils.loadProperties("kafka.broker.list")
val kafka_group: String = "group_test_role_1"
val kafkaParams = Map[String, Object](
"bootstrap.servers" -> kalfa_server_list,
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> classOf[StringDeserializer],
"group.id" -> kafka_group, //消费者组名kafka.groupId
"auto.offset.reset" -> "earliest", //earliest可以获取历史数据
"enable.auto.commit" -> "false") //如果是true,则这个消费者的偏移量会在后台自动提交
val topics = Array("t_2021-09")
var offsetRanges: Array[OffsetRange] = Array.empty[OffsetRange]
val kafkaStream = KafkaUtils.createDirectStream[String, String](
ssc,
PreferConsistent, //多数时候采用该方式,在所有可用的executor上均匀分配kafka的主题的所有分区。
Subscribe[String, String](topics, kafkaParams))
val cacheOper = kafkaStream.transform(rdd=>{
offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
val check = rdd.map(x => {
x.value()})//.cache()
check
})
cacheOper.window(Seconds(20),Seconds(10)).foreachRDD(rdd=>{
rdd.foreach(x=>{
println("data:"+x)
Thread.sleep(2000L)
})
kafkaStream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
})
ssc.start()
ssc.awaitTermination()
}
}
解决办法
添加 conf.set(“spark.streaming.kafka.consumer.cache.enabled”, “false”)
这个问题发生的原因是spark 缓存问题,可以查看官网spark streaming整合kafka官网地址
最后
以上就是含蓄月饼为你收集整理的spark streaming 整合kafka 报错 KafkaConsumer is not safe for multi-threaded access的全部内容,希望文章能够帮你解决spark streaming 整合kafka 报错 KafkaConsumer is not safe for multi-threaded access所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
发表评论 取消回复