我是靠谱客的博主 真实洋葱,最近开发中收集的这篇文章主要介绍php消息队列kafka最佳实践,发布者最佳实践,觉得挺不错的,现在分享给大家,希望可以做个参考。

概述

黏性分区策略

只有发送到相同分区的消息,才会被放到同一个Batch中,因此决定一个Batch如何形成的一个因素是消息队列Kafka版Producer端设置的分区策略。消息队列Kafka版Producer允许通过设置Partitioner的实现类来选择适合自己业务的分区。在消息指定Key的情况下,消息队列Kafka版Producer的默认策略是对消息的Key进行哈希,然后根据哈希结果选择分区,保证相同Key的消息会发送到同一个分区。

在消息没有指定Key的情况下,消息队列Kafka版2.4版本之前的默认策略是循环使用主题的所有分区,将消息以轮询的方式发送到每一个分区上。但是,这种默认策略Batch的效果会比较差,在实际使用中,可能会产生大量的小Batch,从而使得实际的延迟增加。鉴于该默认策略对无Key消息的分区效率低问题,消息队列Kafka版在2.4版本引入了黏性分区策略(Sticky Partitioning Strategy)。

黏性分区策略主要解决无Key消息分散到不同分区,造成小Batch问题。其主要策略是如果一个分区的Batch完成后,就随机选择另一个分区,然后后续的消息尽可能地使用该分区。这种策略在短时间内看,会将消息发送到同一个分区,如果拉长整个运行时间,消息还是可以均匀地发布到各个分区上的。这样可以避免消息出现分区倾斜,同时还可以降低延迟,提升服务整体性能。

如果您使用的消息队列Kafka版Producer客户端是2.4及以上版本,默认的分区策略就采用黏性分区策略。如果您使用的Producer客户端版本小于2.4,可以根据黏性分区策略原理,自行实现分区策略,然后通过参数partitioner.class设置指定的分区策略。

关于黏性分区策略实现,您可以参考如下java版代码实现。该代码的实现逻辑主要是根据一定的时间间隔,切换一次分区。public class MyStickyPartitioner implements Partitioner {

// 记录上一次切换分区时间。

private long lastPartitionChangeTimeMillis = 0L;

// 记录当前分区。

private int currentPartition = -1;

// 分区切换时间间隔,可以根据实际业务选择切换分区的时间间隔。

private long partitionChangeTimeGap = 100L;

public void configure(Map configs) {}

/**

* Compute the partition for the given record.

*

* @param topic The topic name

* @param key The key to partition on (or null if no key)

* @param keyBytes serialized key to partition on (or null if no key)

* @param value The value to partition on or null

* @param valueBytes serialized value to partition on or null

* @param cluster The current cluster metadata

*/

public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {

// 获取所有分区信息。

List partitions = cluster.partitionsForTopic(topic);

int numPartitions = partitions.size();

if (keyBytes == null) {

List availablePartitions = cluster.availablePartitionsForTopic(topic);

int availablePartitionSize = availablePartitions.size();

// 判断当前可用分区。

if (availablePartitionSize > 0) {

handlePartitionChange(availablePartitionSize);

return availablePartitions.get(currentPartition).partition();

} else {

handlePartitionChange(numPartitions);

return currentPartition;

}

} else {

// 对于有key的消息,根据key的哈希值选择分区。

return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;

}

}

private void handlePartitionChange(int partitionNum) {

long currentTimeMillis = System.currentTimeMillis();

// 如果超过分区切换时间间隔,则切换下一个分区,否则还是选择之前的分区。

if (currentTimeMillis - lastPartitionChangeTimeMillis >= partitionChangeTimeGap

|| currentPartition < 0 || currentPartition >= partitionNum) {

lastPartitionChangeTimeMillis = currentTimeMillis;

currentPartition = Utils.toPositive(ThreadLocalRandom.current().nextInt()) % partitionNum;

}

}

public void close() {}

}

最后

以上就是真实洋葱为你收集整理的php消息队列kafka最佳实践,发布者最佳实践的全部内容,希望文章能够帮你解决php消息队列kafka最佳实践,发布者最佳实践所遇到的程序开发问题。

如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(42)

评论列表共有 0 条评论

立即
投稿
返回
顶部