我是靠谱客的博主 听话毛豆,最近开发中收集的这篇文章主要介绍关于KafkaConsumer销毁时发生的错误:KafkaConsumer is not safe for multi-threaded access,觉得挺不错的,现在分享给大家,希望可以做个参考。

概述

关于KafkaConsumer销毁时发生的错误:KafkaConsumer is not safe for multi-threaded access


问题背景:
1.web工程启动时,使用Timer来周期执行任务:KafkaConsumer.poll()获取消息
2.web工程销毁时,使用KafkaConsumer.close()方法释放资源。


问题原因:
Timer启动任务时会创建一个新的线程,例如:ThreadA,在ThreadA里面周期执行KafkaConsumer.poll()方法。
工程销毁时,执行KafkaConsumer.close()的线程却是主线程:MainThreadB。于是kafka就认为有两个线程在同时操作他,报错:KafkaConsumer is not safe for multi-threaded access


解决办法:
解决思路就是让ThreadA来执行KafkaConsumer.close(),这样就保证了操作KafkaConsumer的都是同一个线程,就不会发生错误了。
开始我想的是在Task的run方法里,通过参数来判断执行:KafkaConsumer.close(),但是这个参数在什么情况下该发生变化,这个不好控制。
比如我在Timer.cancell()之后或之前让他变化,Task会有时执行,有时不执行。这样就有的时候会销毁KafkaConsumer,有的时候销毁不了。
然后,我又改成在Timer执行cancell时,通过让该Timer再schedule一个Task:TaskC,在TaskC里执行KafkaConsumer.close(),这样Timer会使用同一个线程去执行这个TaskC。
如果要保证在Timer调用cancell时,已经完成了TaskC的执行,这就得再加点参数来判断,我是通过重写Timer的cancell方法实现的。

最后

以上就是听话毛豆为你收集整理的关于KafkaConsumer销毁时发生的错误:KafkaConsumer is not safe for multi-threaded access的全部内容,希望文章能够帮你解决关于KafkaConsumer销毁时发生的错误:KafkaConsumer is not safe for multi-threaded access所遇到的程序开发问题。

如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(64)

评论列表共有 0 条评论

立即
投稿
返回
顶部