我是靠谱客的博主 愉快路灯,最近开发中收集的这篇文章主要介绍kafka发送超大消息设置,觉得挺不错的,现在分享给大家,希望可以做个参考。

概述

最近开发一cdc应用,为了测试极端情况,需要kafka传递100万条数据过去,1个G左右,由于其他环节限制,不便进行拆包(注:测下来,大包走kafka不一定性能更好,甚至可能更低)。

测试百万以上的变更数据时,报消息超过kafka broker允许的最大值,因此需要修改如下参数,保证包能够正常发送:

socket.request.max.bytes=2147483647    # 设置了socket server接收的最大请求大小
log.segment.bytes=2147483647              # kafka数据文件的大小,确保这个数值大于一个消息的长度。一般说来使用默认值即可(一般一个消息很难大于1G,因为这是一个消息系统,而不是文件系统)。
message.max.bytes=2147483647             # 设置了kafka server接收的最大消息大小,应小于等于socket.request.max.bytes
replica.fetch.max.bytes=2147483647         #每个分区试图获取的消息字节数。要大于等于message.max.bytes,否则broker会接收此消息,但无法将此消息复制出去,从而造成数据丢失。
fetch.message.max.bytes=2147483647      #每个提取请求中为每个主题分区提取的消息字节数。要大于等于message.max.bytes,否则broker就会因为消费端无法使用这个消息而挂起。

生产者可以如下设定:

kafkaProps.put("max.request.size", 2147483647);
# 要小于 message.max.bytes,也可以设置在producer.properties配置文件中
kafkaProps.put("buffer.memory", 2147483647);
kafkaProps.put("timeout.ms", 3000000);
kafkaProps.put("request.timeout.ms", 30000000);

消费者设定如下:

props.put("request.timeout.ms", 30000000);
props.put("session.timeout.ms", "3000000");
props.put("fetch.max.wait.ms", "3000000");

各参数的含义可以参考kafka官方文档https://kafka.apache.org/documentation/#configuration。

kafka基础知识体系,请参考LZ学习笔记kafka学习指南(总结版)。

注,各参数对内存的影响如下:Brokers会为每个分区分配replica.fetch.max.bytes参数指定的内存空间,假设replica.fetch.max.bytes=1M,且有1000个分区,则需要差不多1G的内存,确保 分区数*最大的消息不会超过服务器的内存,否则会报OOM错误。同样地,消费端的fetch.message.max.bytes指定了最大消息需要的内存空间,同样,分区数*最大需要内存空间 不能超过服务器的内存。所以,如果你有大的消息要传送,则在内存一定的情况下,只能使用较少的分区数或者使用更大内存的服务器。

  虽然上面的方法可以奏效,但是并不推荐。Kafka设计的初衷是迅速处理短小的消息,一般10K大小的消息吞吐性能最好(可参见LinkedIn的kafka性能测试)。但有时候,我们需要处理更大的消息,比如XML文档或JSON内容,一个消息差不多有10-100M,这种情况下,Kakfa应该如何处理?

针对这个问题,有以下几个建议:

  1.   最好的方法是不直接传送这些大的数据。如果有共享存储,如NAS, HDFS, S3等,可以把这些大的文件存放到共享存储,然后使用Kafka来传送文件的位置信息。
  2.   第二个方法是,将大的消息数据切片或切块,在生产端将数据切片为10K大小,使用分区主键确保一个大消息的所有部分会被发送到同一个kafka分区(这样每一部分的拆分顺序得以保留),如此以来,当消费端使用时会将这些部分重新还原为原始的消息。
  3.   第三,Kafka的生产端可以压缩消息,如果原始消息是XML,当通过压缩之后,消息可能会变得不那么大。在生产端的配置参数中使用compression.codec和commpressed.topics可以开启压缩功能,压缩算法可以使用GZip或Snappy。

上面这些值太大还会造成一个问题,就是消息没有在指定时间内(max.poll.interval.ms(默认300秒))消费完,导致被rebalance,而kafka本身有个bug(服务器端的rebalance.timeout.ms(默认60秒)不生效),这会导致消费者组的rebalance时间比较长,所以这是需要注意的,参见https://blog.csdn.net/u013200380/article/details/87868696。

转载于:https://www.cnblogs.com/zhjh256/p/11369165.html

最后

以上就是愉快路灯为你收集整理的kafka发送超大消息设置的全部内容,希望文章能够帮你解决kafka发送超大消息设置所遇到的程序开发问题。

如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(71)

评论列表共有 0 条评论

立即
投稿
返回
顶部