概述
黏性分区策略
只有发送到相同分区的消息,才会被放到同一个Batch中,因此决定一个Batch如何形成的一个因素是消息队列Kafka版Producer端设置的分区策略。消息队列Kafka版Producer允许通过设置Partitioner的实现类来选择适合自己业务的分区。在消息指定Key的情况下,消息队列Kafka版Producer的默认策略是对消息的Key进行哈希,然后根据哈希结果选择分区,保证相同Key的消息会发送到同一个分区。
在消息没有指定Key的情况下,消息队列Kafka版2.4版本之前的默认策略是循环使用主题的所有分区,将消息以轮询的方式发送到每一个分区上。但是,这种默认策略Batch的效果会比较差,在实际使用中,可能会产生大量的小Batch,从而使得实际的延迟增加。鉴于该默认策略对无Key消息的分区效率低问题,消息队列Kafka版在2.4版本引入了黏性分区策略(Sticky Partitioning Strategy)。
黏性分区策略主要解决无Key消息分散到不同分区,造成小Batch问题。其主要策略是如果一个分区的Batch完成后,就随机选择另一个分区,然后后续的消息尽可能地使用该分区。这种策略在短时间内看,会将消息发送到同一个分区,如果拉长整个运行时间,消息还是可以均匀地发布到各个分区上的。这样可以避免消息出现分区倾斜,同时还可以降低延迟,提升服务整体性能。
如果您使用的消息队列Kafka版Producer客户端是2.4及以上版本,默认的分区策略就采用黏性分区策略。如果您使用的Producer客户端版本小于2.4,可以根据黏性分区策略原理,自行实现分区策略,然后通过参数partitioner.class设置指定的分区策略。
关于黏性分区策略实现,您可以参考如下java版代码实现。该代码的实现逻辑主要是根据一定的时间间隔,切换一次分区。public class MyStickyPartitioner implements Partitioner {
// 记录上一次切换分区时间。
private long lastPartitionChangeTimeMillis = 0L;
// 记录当前分区。
private int currentPartition = -1;
// 分区切换时间间隔,可以根据实际业务选择切换分区的时间间隔。
private long partitionChangeTimeGap = 100L;
public void configure(Map configs) {}
/**
* Compute the partition for the given record.
*
* @param topic The topic name
* @param key The key to partition on (or null if no key)
* @param keyBytes serialized key to partition on (or null if no key)
* @param value The value to partition on or null
* @param valueBytes serialized value to partition on or null
* @param cluster The current cluster metadata
*/
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
// 获取所有分区信息。
List partitions = cluster.partitionsForTopic(topic);
int numPartitions = partitions.size();
if (keyBytes == null) {
List availablePartitions = cluster.availablePartitionsForTopic(topic);
int availablePartitionSize = availablePartitions.size();
// 判断当前可用分区。
if (availablePartitionSize > 0) {
handlePartitionChange(availablePartitionSize);
return availablePartitions.get(currentPartition).partition();
} else {
handlePartitionChange(numPartitions);
return currentPartition;
}
} else {
// 对于有key的消息,根据key的哈希值选择分区。
return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
}
}
private void handlePartitionChange(int partitionNum) {
long currentTimeMillis = System.currentTimeMillis();
// 如果超过分区切换时间间隔,则切换下一个分区,否则还是选择之前的分区。
if (currentTimeMillis - lastPartitionChangeTimeMillis >= partitionChangeTimeGap
|| currentPartition < 0 || currentPartition >= partitionNum) {
lastPartitionChangeTimeMillis = currentTimeMillis;
currentPartition = Utils.toPositive(ThreadLocalRandom.current().nextInt()) % partitionNum;
}
}
public void close() {}
}
最后
以上就是真实洋葱为你收集整理的php消息队列kafka最佳实践,发布者最佳实践的全部内容,希望文章能够帮你解决php消息队列kafka最佳实践,发布者最佳实践所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
发表评论 取消回复