我是靠谱客的博主 平常海燕,最近开发中收集的这篇文章主要介绍RocketMQ消息消费(二)消息队列负载与重新分布机制消息消费过程,觉得挺不错的,现在分享给大家,希望可以做个参考。

概述

消息队列负载与重新分布机制

RocketMQ消息队列重新分布是由RebalanceService线程来实现的。一个MQClient-Instance持有一个RebalanceService实现,并随着MQClientInstance的启动而启动。

RebalanceService#run

    @Override
    public void run() {
        log.info(this.getServiceName() + " service started");

        while (!this.isStopped()) {
            this.waitForRunning(waitInterval);
            this.mqClientFactory.doRebalance();
        }

        log.info(this.getServiceName() + " service end");
    }

RebalanceService线程默认每隔20s执行一次mqClientFactory.doRebalance()方法,可以使用-Drocketmq.client.rebalance.waitInterval=interval来改变默认值。

    public void doRebalance() {
        for (Map.Entry<String, MQConsumerInner> entry : this.consumerTable.entrySet()) {
            MQConsumerInner impl = entry.getValue();
            if (impl != null) {
                try {
                    impl.doRebalance();
                } catch (Throwable e) {
                    log.error("doRebalance exception", e);
                }
            }
        }
    }

MQClientIinstance遍历已注册的消费者,对消费者执行doRebalance()方法。

    public void doRebalance(final boolean isOrder) {
        Map<String, SubscriptionData> subTable = this.getSubscriptionInner();
        if (subTable != null) {
            for (final Map.Entry<String, SubscriptionData> entry : subTable.entrySet()) {
                final String topic = entry.getKey();
                try {
                    this.rebalanceByTopic(topic, isOrder);
                } catch (Throwable e) {
                    if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                        log.warn("rebalanceByTopic Exception", e);
                    }
                }
            }
        }

        this.truncateMessageQueueNotMyTopic();
    }

每个DefaultMQPushConsumerImpl都持有一个单独的RebalanceImpl对象,该方法主要是遍历订阅信息对每个主题的队列进行重新负载。RebalanceImpl的Map<String,SubscriptionData> subTable在调用消费者DefaultMQPushConsumerImpl#subscribe方法时填充。如果订阅信息发送变化,例如调用了unsubscribe方法,则需要将不关心的主题消费队列从processQueueTable中移除。

RebalanceImpl#rebalanceByTopic对单个主题进行消息队列重新负载(以集群模式)。

    private void rebalanceByTopic(final String topic, final boolean isOrder) {
        switch (messageModel) {
            case BROADCASTING: {
                Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
                if (mqSet != null) {
                    boolean changed = this.updateProcessQueueTableInRebalance(topic, mqSet, isOrder);
                    if (changed) {
                        this.messageQueueChanged(topic, mqSet, mqSet);
                        log.info("messageQueueChanged {} {} {} {}",
                            consumerGroup,
                            topic,
                            mqSet,
                            mqSet);
                    }
                } else {
                    log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic);
                }
                break;
            }
            case CLUSTERING: {
                Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
                List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup);
                if (null == mqSet) {
                    if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                        log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic);
                    }
                }

                if (null == cidAll) {
                    log.warn("doRebalance, {} {}, get consumer id list failed", consumerGroup, topic);
                }

                if (mqSet != null && cidAll != null) {
                    List<MessageQueue> mqAll = new ArrayList<MessageQueue>();
                    mqAll.addAll(mqSet);

                    Collections.sort(mqAll);
                    Collections.sort(cidAll);

                    AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy;

                    List<MessageQueue> allocateResult = null;
                    try {
                        allocateResult = strategy.allocate(
                            this.consumerGroup,
                            this.mQClientFactory.getClientId(),
                            mqAll,
                            cidAll);
                    } catch (Throwable e) {
                        log.error("AllocateMessageQueueStrategy.allocate Exception. allocateMessageQueueStrategyName={}", strategy.getName(),
                            e);
                        return;
                    }

                    Set<MessageQueue> allocateResultSet = new HashSet<MessageQueue>();
                    if (allocateResult != null) {
                        allocateResultSet.addAll(allocateResult);
                    }

                    boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder);
                    if (changed) {
                        log.info(
                            "rebalanced result changed. allocateMessageQueueStrategyName={}, group={}, topic={}, clientId={}, mqAllSize={}, cidAllSize={}, rebalanceResultSize={}, rebalanceResultSet={}",
                            strategy.getName(), consumerGroup, topic, this.mQClientFactory.getClientId(), mqSet.size(), cidAll.size(),
                            allocateResultSet.size(), allocateResultSet);
                        this.messageQueueChanged(topic, mqSet, allocateResultSet);
                    }
                }
                break;
            }
            default:
                break;
        }
    }

1.步骤一

从主题订阅信息缓存表中获取主题的队列信息;发送请求从Broker中该消费组内当前所有的消费者客户端ID,RocketeMQ从主题的路由信息表中随机选择一个Broker。

                Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
                List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup);

2.步骤二

首先对cidAll, mqAll排序,这个很重要,同一个消费组内看到的视图保持一致,确保同一个消费队列不会被多个消费者分配。RocketMQ消息队列分配算法接口。

                    Collections.sort(mqAll);
                    Collections.sort(cidAll);

                    AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy;

                    List<MessageQueue> allocateResult = null;
                    try {
                        allocateResult = strategy.allocate(
                            this.consumerGroup,
                            this.mQClientFactory.getClientId(),
                            mqAll,
                            cidAll);
                    } catch (Throwable e) {
                        log.error("AllocateMessageQueueStrategy.allocate Exception. allocateMessageQueueStrategyName={}", strategy.getName(),
                            e);
                        return;
                    }

RocketMQ默认提供5种分配算法。

  1. AllocateMessageQueueAveragely:平均分配,推荐指数为5颗星。
  2. AllocateMessageQueueAveragelyByCircle:平均轮询分配,推荐指数为5颗星。
  3. AllocateMessageQueueConsistentHash:一致性hash。不推荐使用,因为消息队列负载信息不容易跟踪。
  4. AllocateMessageQueueByConfig:根据配置,为每一个消费者配置固定的消息队列。
  5. AllocateMessageQueueByMachineRoom:根据Broker部署机房名,对每个消费者负责不同的Broker上的队列。

消息负载算法如果没有特殊的要求,尽量使用AllocateMessageQueueAveragely、AllocateMessageQueueAveragelyByCircle,因为分配算法比较直观。消息队列分配遵循一个消费者可以分配多个消息队列,但同一个消息队列只会分配给一个消费者,故如果消费者个数大于消息队列数量,则有些消费者无法消费消息。

对比消息队列是否发生变化,主要思路是遍历当前负载队列集合,如果队列不在新分配队列集合中,需要将该队列停止消费并保存消费进度;遍历已分配的队列,如果队列不在队列负载表中(processQueueTable)则需要创建该队列拉取任务PullRequest,然后添加到PullMessageService线程的pullRequestQueue中,PullMessageService才会继续拉取任务。

3.步骤三

ConcurrentMap<MessageQueue, ProcessQueue>processQueueTable,当前消费者负载的消息队列缓存表,如果缓存表中的MessageQueue不包含在mqSet中,说明经过本次消息队列负载后,该mq被分配给其他消费者,故需要暂停该消息队列消息的消费,方法是将ProccessQueue的状态设置为droped=true,该ProcessQueue中的消息将不会再被消费,调用removeUnnecessaryMessageQueue方法判断是否将MessageQueue、ProccessQueue缓存表中移除。removeUnnecessaryMessageQueue在RebalanceImple定义为抽象方法。remove UnnecessaryMessageQueue方法主要持久化待移除MessageQueue消息消费进度。在Push模式下,如果是集群模式并且是顺序消息消费时,还需要先解锁队列。

RebalanceImpl#updateProcessQueueTableInRebalance

        Iterator<Entry<MessageQueue, ProcessQueue>> it = this.processQueueTable.entrySet().iterator();
        while (it.hasNext()) {
            Entry<MessageQueue, ProcessQueue> next = it.next();
            MessageQueue mq = next.getKey();
            ProcessQueue pq = next.getValue();

            if (mq.getTopic().equals(topic)) {
                if (!mqSet.contains(mq)) {
                    pq.setDropped(true);
                    if (this.removeUnnecessaryMessageQueue(mq, pq)) {
                        it.remove();
                        changed = true;
                        log.info("doRebalance, {}, remove unnecessary mq, {}", consumerGroup, mq);
                    }
                } else if (pq.isPullExpired()) {
                    switch (this.consumeType()) {
                        case CONSUME_ACTIVELY:
                            break;
                        case CONSUME_PASSIVELY:
                            pq.setDropped(true);
                            if (this.removeUnnecessaryMessageQueue(mq, pq)) {
                                it.remove();
                                changed = true;
                                log.error("[BUG]doRebalance, {}, remove unnecessary mq, {}, because pull is pause, so try to fixed it",
                                    consumerGroup, mq);
                            }
                            break;
                        default:
                            break;
                    }
                }
            }
        }

4.步骤四

遍历本次负载分配到的队列集合,如果processQueueTable中没有包含该消息队列,表明这是本次新增加的消息队列,首先从内存中移除该消息队列的消费进度,然后从磁盘中读取该消息队列的消费进度,创建PullRequest对象。这里有一个关键,如果读取到的消费进度小于0,则需要校对消费进度。RocketMQ提供CONSUME_FROM_LAST_OFFSET、CONSUME_FROM_FIRST_OFFSET、CONSUME_FROM_TIMESTAMP方式,在创建消费者时可以通过调用DefaultMQPushConsumer#setConsumeFromWhere方法设置。PullRequest的nextOffset计算逻辑位于:RebalancePushImpl#computePullFromWhere。

        List<PullRequest> pullRequestList = new ArrayList<PullRequest>();
        for (MessageQueue mq : mqSet) {
            if (!this.processQueueTable.containsKey(mq)) {
                if (isOrder && !this.lock(mq)) {
                    log.warn("doRebalance, {}, add a new mq failed, {}, because lock failed", consumerGroup, mq);
                    continue;
                }

                this.removeDirtyOffset(mq);
                ProcessQueue pq = new ProcessQueue();
                long nextOffset = this.computePullFromWhere(mq);
                if (nextOffset >= 0) {
                    ProcessQueue pre = this.processQueueTable.putIfAbsent(mq, pq);
                    if (pre != null) {
                        log.info("doRebalance, {}, mq already exists, {}", consumerGroup, mq);
                    } else {
                        log.info("doRebalance, {}, add a new mq, {}", consumerGroup, mq);
                        PullRequest pullRequest = new PullRequest();
                        pullRequest.setConsumerGroup(consumerGroup);
                        pullRequest.setNextOffset(nextOffset);
                        pullRequest.setMessageQueue(mq);
                        pullRequest.setProcessQueue(pq);
                        pullRequestList.add(pullRequest);
                        changed = true;
                    }
                } else {
                    log.warn("doRebalance, {}, add new mq failed, {}", consumerGroup, mq);
                }
            }
        }

ConsumeFromWhere相关消费进度校正策略只有在从磁盘中获取消费进度返回-1时才会生效,如果从消息进度存储文件中返回的消费进度小于-1,表示偏移量非法,则使用偏移量-1去拉取消息,那么会发生什么呢?首先第一次去消息服务器拉取消息时无法取到消息,但是会用-1去更新消费进度,然后将消息消费队列丢弃,在下一次消息队列负载时会再次消费。

5.步骤五

将PullRequest加入到PullMessageService中,以便唤醒PullMessageService线程。

    @Override
    public void dispatchPullRequest(List<PullRequest> pullRequestList) {
        for (PullRequest pullRequest : pullRequestList) {
            this.defaultMQPushConsumerImpl.executePullRequestImmediately(pullRequest);
            log.info("doRebalance, {}, add a new pull request {}", consumerGroup, pullRequest);
        }
    }

总结

RebalanceService线程每隔20s对消费者订阅的主题进行一次队列重新分配,每一次分配都会获取主题的所有队列、从Broker服务器实时查询当前该主题该消费组内消费者列表,对新分配的消息队列会创建对应的PullRequest对象。在一个JVM进程中,同一个消费组同一个队列只会存在一个PullRequest对象。

由于每次进行队列重新负载时会从Broker实时查询出当前消费组内所有消费者,并且对消息队列、消费者列表进行排序,这样新加入的消费者就会在队列重新分布时分配到消费队列从而消费消息。

消息消费过程

消息拉取,PullMessageService负责对消息队列进行消息拉取,从远端服务器拉取消息后将消息存入ProcessQueue消息队列处理队列中,然后调用ConsumeMessageSer-vice#submitConsumeRequest方法进行消息消费,使用线程池来消费消息,确保了消息拉取与消息消费的解耦。

消息消费

消费者消息消费服务ConsumeMessageConcurrentlyService的主要方法是submitConsumeRequest提交消费请求。

1.步骤一

consumeMessageBatchMaxSize,消息批次,在这里看来也就是一次消息消费任务ConsumeRequest中包含的消息条数,默认为1, msgs.size()默认最多为32条,受DefaultMQPushConsumer.pullBatchSize属性控制,如果msgs.size()小于consumeMessageBatchMaxSize,则直接将拉取到的消息放入到ConsumeRequest中,然后将consumeRequest提交到消息消费者线程池中,如果提交过程中出现拒绝提交异常则延迟5s再提交,这里其实是给出一种标准的拒绝提交实现方式,实际过程中由于消费者线程池使用的任务队列为LinkedBlockingQueue无界队列,故不会出现拒绝提交异常。

        final int consumeBatchSize = this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize();
        if (msgs.size() <= consumeBatchSize) {
            ConsumeRequest consumeRequest = new ConsumeRequest(msgs, processQueue, messageQueue);
            try {
                this.consumeExecutor.submit(consumeRequest);
            } catch (RejectedExecutionException e) {
                this.submitConsumeRequestLater(consumeRequest);
            }
        }

2.步骤二

如果拉取的消息条数大于consumeMessageBatchMaxSize,则对拉取消息进行分页,每页consumeMessageBatchMaxSize条消息,创建多个ConsumeRequest任务并提交到消费线程池。ConsumeRequest的run方法封装了具体消息消费逻辑。

            for (int total = 0; total < msgs.size(); ) {
                List<MessageExt> msgThis = new ArrayList<MessageExt>(consumeBatchSize);
                for (int i = 0; i < consumeBatchSize; i++, total++) {
                    if (total < msgs.size()) {
                        msgThis.add(msgs.get(total));
                    } else {
                        break;
                    }
                }

                ConsumeRequest consumeRequest = new ConsumeRequest(msgThis, processQueue, messageQueue);
                try {
                    this.consumeExecutor.submit(consumeRequest);
                } catch (RejectedExecutionException e) {
                    for (; total < msgs.size(); total++) {
                        msgThis.add(msgs.get(total));
                    }

                    this.submitConsumeRequestLater(consumeRequest);
                }
            }

3.步骤三

进入具体消息消费时会先检查processQueue的dropped,如果设置为true,则停止该队列的消费,在进行消息重新负载时如果该消息队列被分配给消费组内其他消费者后,需要droped设置为true,阻止消费者继续消费不属于自己的消息队列。

            if (this.processQueue.isDropped()) {
                log.info("the message queue not be able to consume, because it's dropped. group={} {}", ConsumeMessageConcurrentlyService.this.consumerGroup, this.messageQueue);
                return;
            }

4.步骤四

执行消息消费钩子函数ConsumeMessageHook#consumeMessageBefore函数,通过consumer.getDefaultMQPushConsumerImpl().registerConsumeMessageHook(hook)方法消息消费执行钩子函数。

5.步骤五

RocketMQ将消息存入commitlog文件时,如果发现消息的延时级别delayTimeLevel大于0,会首先将重试主题存入在消息的属性中,然后设置主题名称为SCHEDULE_TOPIC,以便时间到后重新参与消息消费。

    public void resetRetryAndNamespace(final List<MessageExt> msgs, String consumerGroup) {
        final String groupTopic = MixAll.getRetryTopic(consumerGroup);
        for (MessageExt msg : msgs) {
            String retryTopic = msg.getProperty(MessageConst.PROPERTY_RETRY_TOPIC);
            if (retryTopic != null && groupTopic.equals(msg.getTopic())) {
                msg.setTopic(retryTopic);
            }

            if (StringUtils.isNotEmpty(this.defaultMQPushConsumer.getNamespace())) {
                msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQPushConsumer.getNamespace()));
            }
        }
    }

6.步骤六

执行具体的消息消费,调用应用程序消息监听器的consumeMessage方法,进入到具体的消息消费业务逻辑,返回该批消息的消费结果。最终将返回CONSUME_SUCCESS(消费成功)或RECONSUME_LATER(需要重新消费)。

            try {
                if (msgs != null && !msgs.isEmpty()) {
                    for (MessageExt msg : msgs) {
                        MessageAccessor.setConsumeStartTimeStamp(msg, String.valueOf(System.currentTimeMillis()));
                    }
                }
                status = listener.consumeMessage(Collections.unmodifiableList(msgs), context);
            } catch (Throwable e) {
                log.warn("consumeMessage exception: {} Group: {} Msgs: {} MQ: {}",
                    RemotingHelper.exceptionSimpleDesc(e),
                    ConsumeMessageConcurrentlyService.this.consumerGroup,
                    msgs,
                    messageQueue);
                hasException = true;
            }

7.步骤七

执行消息消费钩子函数ConsumeMessageHook#consumeMessageAfter函数。

8.步骤八

执行业务消息消费后,在处理结果前再次验证一下ProcessQueue的isDroped状态值,如果设置为true,将不对结果进行处理,也就是说如果在消息消费过程中进入到Step4时,如果由于由新的消费者加入或原先的消费者出现宕机导致原先分给消费者的队列在负载之后分配给别的消费者,那么在应用程序的角度来看的话,消息会被重复消费。

            if (!processQueue.isDropped()) {
                ConsumeMessageConcurrentlyService.this.processConsumeResult(status, context, this);
            }

9.步骤九

根据消息监听器返回的结果,计算ackIndex,如果返回CONSUME_SUCCESS, ackIndex设置为msgs.size()-1,如果返回RECONSUME_LATER, ackIndex=-1,这是为发送msg back(ACK)消息做准备的。

        switch (status) {
            case CONSUME_SUCCESS:
                if (ackIndex >= consumeRequest.getMsgs().size()) {
                    ackIndex = consumeRequest.getMsgs().size() - 1;
                }
                int ok = ackIndex + 1;
                int failed = consumeRequest.getMsgs().size() - ok;
                this.getConsumerStatsManager().incConsumeOKTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), ok);
                this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), failed);
                break;
            case RECONSUME_LATER:
                ackIndex = -1;
                this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(),
                    consumeRequest.getMsgs().size());
                break;
            default:
                break;
        }

10.步骤十

如果是集群模式,业务方返回RECONSUME_LATER,消息并不会重新被消费,只是以警告级别输出到日志文件。如果是集群模式,消息消费成功,由于ackIndex=consumeRequest.getMsgs().size()-1,故i=ackIndex+1等于consumeRequest.getMsgs().size(),并不会执行sendMessageBack。只有在业务方返回RECONSUME_LATER时,该批消息都需要发ACK消息,如果消息发送ACK失败,则直接将本批ACK消费发送失败的消息再次封装为ConsumeRequest,然后延迟5s后重新消费。如果ACK消息发送成功,则该消息会延迟消费。

        switch (this.defaultMQPushConsumer.getMessageModel()) {
            case BROADCASTING:
                for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
                    MessageExt msg = consumeRequest.getMsgs().get(i);
                    log.warn("BROADCASTING, the message consume failed, drop it, {}", msg.toString());
                }
                break;
            case CLUSTERING:
                List<MessageExt> msgBackFailed = new ArrayList<MessageExt>(consumeRequest.getMsgs().size());
                for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
                    MessageExt msg = consumeRequest.getMsgs().get(i);
                    boolean result = this.sendMessageBack(msg, context);
                    if (!result) {
                        msg.setReconsumeTimes(msg.getReconsumeTimes() + 1);
                        msgBackFailed.add(msg);
                    }
                }

                if (!msgBackFailed.isEmpty()) {
                    consumeRequest.getMsgs().removeAll(msgBackFailed);

                    this.submitConsumeRequestLater(msgBackFailed, consumeRequest.getProcessQueue(), consumeRequest.getMessageQueue());
                }
                break;
            default:
                break;
        }

11.步骤十一

从ProcessQueue中移除这批消息,这里返回的偏移量是移除该批消息后最小的偏移量,然后用该偏移量更新消息消费进度,以便在消费者重启后能从上一次的消费进度开始消费,避免消息重复消费。值得重点注意的是当消息监听器返回RECONSUME_LATER,消息消费进度也会向前推进,用ProcessQueue中最小的队列偏移量调用消息消费进度存储器OffsetStore更新消费进度,这是因为当返回RECONSUME_LATER, RocketMQ会创建一条与原先消息属性相同的消息,拥有一个唯一的新msgId,并存储原消息ID,该消息会存入到commitlog文件中,与原先的消息没有任何关联,那该消息当然也会进入到ConsuemeQueue队列中,将拥有一个全新的队列偏移量。

        long offset = consumeRequest.getProcessQueue().removeMessage(consumeRequest.getMsgs());
        if (offset >= 0 && !consumeRequest.getProcessQueue().isDropped()) {
            this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), offset, true);
        }

消息确认(ACK)

如果消息监听器返回的消费结果为RECONSUME_LATER,则需要将这些消息发送给Broker延迟消息。如果发送ACK消息失败,将延迟5s后提交线程池进行消费。ACK消息发送的网络客户端入口:MQClientAPIImpl#consumerSendMessageBack,命令编码:RequestCode.CONSUMER_SEND_MSG_BACK。

1.步骤一

获取消费组的订阅配置信息,如果配置信息为空返回配置组信息不存在错误,如果重试队列数量小于1,则直接返回成功,说明该消费组不支持重试。

SendMessageProcessor#consumerSendMsgBack

        SubscriptionGroupConfig subscriptionGroupConfig =
            this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(requestHeader.getGroup());
        if (null == subscriptionGroupConfig) {
            response.setCode(ResponseCode.SUBSCRIPTION_GROUP_NOT_EXIST);
            response.setRemark("subscription group not exist, " + requestHeader.getGroup() + " "
                + FAQUrl.suggestTodo(FAQUrl.SUBSCRIPTION_GROUP_NOT_EXIST));
            return CompletableFuture.completedFuture(response);
        }
        if (!PermName.isWriteable(this.brokerController.getBrokerConfig().getBrokerPermission())) {
            response.setCode(ResponseCode.NO_PERMISSION);
            response.setRemark("the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1() + "] sending message is forbidden");
            return CompletableFuture.completedFuture(response);
        }

        if (subscriptionGroupConfig.getRetryQueueNums() <= 0) {
            response.setCode(ResponseCode.SUCCESS);
            response.setRemark(null);
            return CompletableFuture.completedFuture(response);
        }

2.步骤二

创建重试主题,重试主题名称:%RETRY%+消费组名称,并从重试队列中随机选择一个队列,并构建TopicConfig主题配置信息。

SendMessageProcessor#consumerSendMsgBack

        String newTopic = MixAll.getRetryTopic(requestHeader.getGroup());
        int queueIdInt = Math.abs(this.random.nextInt() % 99999999) % subscriptionGroupConfig.getRetryQueueNums();
        int topicSysFlag = 0;
        if (requestHeader.isUnitMode()) {
            topicSysFlag = TopicSysFlag.buildSysFlag(false, true);
        }

3.步骤三

根据消息物理偏移量从commitlog文件中获取消息,同时将消息的主题存入属性中。

        MessageExt msgExt = this.brokerController.getMessageStore().lookMessageByOffset(requestHeader.getOffset());
        if (null == msgExt) {
            response.setCode(ResponseCode.SYSTEM_ERROR);
            response.setRemark("look message by offset failed, " + requestHeader.getOffset());
            return CompletableFuture.completedFuture(response);
        }

        final String retryTopic = msgExt.getProperty(MessageConst.PROPERTY_RETRY_TOPIC);
        if (null == retryTopic) {
            MessageAccessor.putProperty(msgExt, MessageConst.PROPERTY_RETRY_TOPIC, msgExt.getTopic());
        }
        msgExt.setWaitStoreMsgOK(false);

4.步骤四

设置消息重试次数,如果消息已重试次数超过maxReconsumeTimes,再次改变newTopic主题为DLQ("%DLQ%"),该主题的权限为只写,说明消息一旦进入到DLQ队列中,RocketMQ将不负责再次调度进行消费了,需要人工干预。

        if (msgExt.getReconsumeTimes() >= maxReconsumeTimes 
            || delayLevel < 0) {
            newTopic = MixAll.getDLQTopic(requestHeader.getGroup());
            queueIdInt = Math.abs(this.random.nextInt() % 99999999) % DLQ_NUMS_PER_GROUP;

            topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(newTopic,
                    DLQ_NUMS_PER_GROUP,
                    PermName.PERM_WRITE, 0);
            if (null == topicConfig) {
                response.setCode(ResponseCode.SYSTEM_ERROR);
                response.setRemark("topic[" + newTopic + "] not exist");
                return CompletableFuture.completedFuture(response);
            }
        } 

5.步骤五

根据原先的消息创建一个新的消息对象,重试消息会拥有自己的唯一消息ID(msgId)并存入到commitlog文件中,并不会去更新原先消息,而是会将原先的主题、消息ID存入消息的属性中,主题名称为重试主题,其他属性与原先消息保持相同。

        MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
        msgInner.setTopic(newTopic);
        msgInner.setBody(msgExt.getBody());
        msgInner.setFlag(msgExt.getFlag());
        MessageAccessor.setProperties(msgInner, msgExt.getProperties());
        msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgExt.getProperties()));
        msgInner.setTagsCode(MessageExtBrokerInner.tagsString2tagsCode(null, msgExt.getTags()));

        msgInner.setQueueId(queueIdInt);
        msgInner.setSysFlag(msgExt.getSysFlag());
        msgInner.setBornTimestamp(msgExt.getBornTimestamp());
        msgInner.setBornHost(msgExt.getBornHost());
        msgInner.setStoreHost(msgExt.getStoreHost());
        msgInner.setReconsumeTimes(msgExt.getReconsumeTimes() + 1);

        String originMsgId = MessageAccessor.getOriginMessageId(msgExt);
        MessageAccessor.setOriginMessageId(msgInner, UtilAll.isBlank(originMsgId) ? msgExt.getMsgId() : originMsgId);
 

6.步骤六

将消息存入到CommitLog文件中。(消息重试机制依托于定时任务实现)

     CompletableFuture<PutMessageResult> putMessageResult = this.brokerController.getMessageStore().asyncPutMessage(msgInner);

ACK消息存入CommitLog文件后,将依托RocketMQ定时消息机制在延迟时间到期后再次将消息拉取,提交消费线程池。ACK消息是同步发送的,如果在发送过程中出现错误,将记录所有发送ACK消息失败的消息,然后再次封装成ConsumeRequest,延迟5s执行。

 

参考《RocketMQ技术内幕:RocketMQ架构设计与实现原理》

最后

以上就是平常海燕为你收集整理的RocketMQ消息消费(二)消息队列负载与重新分布机制消息消费过程的全部内容,希望文章能够帮你解决RocketMQ消息消费(二)消息队列负载与重新分布机制消息消费过程所遇到的程序开发问题。

如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(78)

评论列表共有 0 条评论

立即
投稿
返回
顶部