我是靠谱客的博主 任性水杯,最近开发中收集的这篇文章主要介绍rocketmq源码-consumer负载均衡逻辑,觉得挺不错的,现在分享给大家,希望可以做个参考。

概述

前言

这篇笔记主要记录consumer在启动过程中,负载均衡的逻辑,多个消费者组成一个消费者组,对于集群模式,同一个消费者组中的多个消费者共同消费一个topic下的所有消息,所以每个consumer可能会处理N个messageQueue,至于哪个consumer消费哪个messageQueue,是由负载均衡策略决定的

源码

在消费者启动的时候,会通过负载均衡策略,来决定当前消费者处理哪几个messageQueue,入口是:

this.rebalanceService.start();

在run()方法中,会通过while循环,每20S,去进行一次负载均衡计算
在这里插入图片描述

在这里插入图片描述
无论是pull模式,还是push模式,都会调用到

org.apache.rocketmq.client.impl.consumer.RebalanceImpl#doRebalance

这个方法,是按照topic维度,进行负载均衡
在这里插入图片描述

广播模式

org.apache.rocketmq.client.impl.consumer.RebalanceImpl#rebalanceByTopic

在这个方法中,会先判断当前消费者的模式,是集群模式?还是广播模式
对于广播模式,无需进行负载均衡,因为广播模式,每个消费者都会消费所有的消息。也就是说,topic中的所有messageQueue,都是需要消费者去处理的
在这里插入图片描述

集群模式

对于集群模式,最重要的,就是这里根据负载均衡策略进行计算的逻辑
在这里插入图片描述

这里是根据负载均衡之后得到的结果,然后更新一些信息
在这里插入图片描述

这里更新的逻辑很重要,对于push模式,每个messageQueue会对应一个pullRequest请求,然后把pullRequest请求放到队列之后,线程会不停的从queue中拉取pullRequest,然后请求broker
updateProcessQueueTableInRebalance在这个方法中,就会去根据messageQueue,构建pullRequest请求,然后放到queue中

对于pull模式,是需要启动异步的pullTaskImpl任务,在这个任务中,会不停的去broker拉取消息,然后放到消费者主动拉取的队列中
messageQueueChanged() 这个方法,就会根据messageQueue,启动pullTaskImpl

所以,对于consumer,我们会发现,对于广播模式,无需进行负载均衡,每个消费者都会处理messageQueue中的消息,对于集群模式,同一个consumeGroup中的消费者,会分摊一个topic中所有的messageQueue

负载均衡策略

在consumer进行负载均衡时,默认提供了多个负载均衡策略;但是还没有仔细研究这几个负载均衡策略的细节,先列举出来

AllocateMachineRoomNearby

就近机房

AllocateMessageQueueAveragely

平均分配算法

AllocateMessageQueueAveragelyByCircle

平均轮询分配

AllocateMessageQueueByConfig

自定义配置

AllocateMessageQueueByMachineRoom

指定机房

AllocateMessageQueueConsistentHash

一致性hash算法

最后

以上就是任性水杯为你收集整理的rocketmq源码-consumer负载均衡逻辑的全部内容,希望文章能够帮你解决rocketmq源码-consumer负载均衡逻辑所遇到的程序开发问题。

如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(39)

评论列表共有 0 条评论

立即
投稿
返回
顶部