我是靠谱客的博主 壮观含羞草,最近开发中收集的这篇文章主要介绍RabbitMQ 之消息可靠投递可靠投递RabbitMQ中的消息可靠传递RabbitMQ的事务机制RabbitMQ的生产者确认机制如何让消息可靠传递到队列mandatory 参数备份交换机RabbitMQ消费者消息确认机制 ACK总结,觉得挺不错的,现在分享给大家,希望可以做个参考。

概述

文章目录

  • 可靠投递
  • RabbitMQ中的消息可靠传递
  • RabbitMQ的事务机制
  • RabbitMQ的生产者确认机制
  • 如何让消息可靠传递到队列
  • mandatory 参数
  • 备份交换机
  • RabbitMQ消费者消息确认机制 ACK
  • 总结

RabbitMQ进行了重启,导致该集群下所有消费者都挂了,需要将项目重启后才能正常进行消费。
RabbitMQ重启期间消息投递失败,导致消息丢失,需要手动处理和恢复。
如何才能进行RabbitMQ的消息可靠投递呢?特别是在这样比较极端的情况,RabbitMQ集群不可用的时候,无法投递的消息该如何处理呢?

可靠投递

所谓可靠投递,就是确保消息能够百分百从生产者发送到服务器。
在RabbitMQ中,一个消息从生产者发送到RabbitMQ服务器,需要经历这么几个步骤:
1、生产者准备好需要投递的消息。
2、生产者与RabbitMQ服务器建立连接。
3、生产者发送消息。
4、RabbitMQ服务器接收到消息,并将其路由到指定队列。
5、RabbitMQ服务器发起回调,告知生产者消息发送成功。
在这里插入图片描述

RabbitMQ中的消息可靠传递

默认情况下,发送消息的操作是不会返回任何信息给生产者的,也就是说,默认情况下生产者是不知道消息有没有正确地到达服务器。

那么如何解决这个问题呢?

对此,RabbitMQ中有一些相关的解决方案:

  • 使用事务机制来让生产者感知消息被成功投递到服务器。
  • 通过生产者确认机制实现。

在RabbitMQ中,所有确保消息可靠投递的机制都会对性能产生一定影响,如使用不当,可能会对吞吐量造成重大影响,只有通过执行性能基准测试,才能在确定性能与可靠投递之间的平衡。

在使用可靠投递前,需要先思考以下问题:
1、消息发布时,保证消息进入队列的重要性有多高?
2、如果消息无法进行路由,是否应该将该消息返回给发布者?
3、如果消息无法被路由,是否应该将其发送到其他地方稍后再重新进行路由?
4、如果RabbitMQ服务器崩溃了,是否可以接受消息丢失?
5、RabbitMQ在处理新消息时是否应该确认它已经为发布者执行了所有请求的路由和持久化?
6、消息发布者是否可以批量投递消息?
7、在可靠投递上是否有可以接受的平衡性?是否可以接受一部分的不可靠性来提升性能?

只考虑平衡性不考虑性能是不行的,至于这个平衡的度具体如何把握,就要具体情况具体分析了,比如像订单数据这样敏感的信息,对可靠性的要求自然要比一般的业务消息对可靠性的要求高的多,因为订单数据是跟钱直接相关的,可能会导致直接的经济损失。

所以不仅应该知道有哪些保证消息可靠性的解决方案,还应该知道每种方案对性能的影响程度,以此来进行方案的选择。

RabbitMQ的事务机制

RabbitMQ是支持AMQP事务机制的,在生产者确认机制之前,事务是确保消息被成功投递的唯一方法。

在SpringBoot项目中,使用RabbitMQ事务其实很简单,只需要声明一个事务管理的Bean,并将RabbitTemplate的事务设置为true即可。

配置文件如下:

spring:
  rabbitmq:
    host: localhost
    password: guest
    username: guest
    virtualHost: /
    listener:
      type: simple
      simple:
        default-requeue-rejected: false
        acknowledge-mode: manual

配置交换机和队列,以及事务管理器。

import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.transaction.RabbitTransactionManager;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitMQConfig {
    public static final String BUSINESS_EXCHANGE_NAME = "rabbitmq.tx.demo.simple.business.exchange";
    public static final String BUSINESS_QUEUEA_NAME = "rabbitmq.tx.demo.simple.business.queue";

    // 声明业务Exchange
    @Bean("businessExchange")
    public FanoutExchange businessExchange() {
        return new FanoutExchange(BUSINESS_EXCHANGE_NAME);
    }

    // 声明业务队列
    @Bean("businessQueue")
    public Queue businessQueue() {
        return QueueBuilder.durable(BUSINESS_QUEUEA_NAME).build();
    }

    // 声明业务队列绑定关系
    @Bean
    public Binding businessBinding(@Qualifier("businessQueue") Queue queue,
                                   @Qualifier("businessExchange") FanoutExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange);
    }

    /**
     * 配置启用rabbitmq事务
     */
    @Bean
    public RabbitTransactionManager rabbitTransactionManager(CachingConnectionFactory connectionFactory) {
        return new RabbitTransactionManager(connectionFactory);
    }
}

消费者:

import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.io.IOException;

@Slf4j
@Component
public class BusinessMsgConsumer {
    @RabbitListener(queues = RabbitMQConfig.BUSINESS_QUEUEA_NAME)
    public void receiveMsg(Message message, Channel channel) throws IOException {
        String msg = new String(message.getBody());
        log.info("收到业务消息:{}", msg);
        channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
    }
}

生产者:
1、在初始化方法里,通过使用 rabbitTemplate.setChannelTransacted(true); 来开启事务。
2、在发送消息的方法上加上 @Transactional 注解,这样在该方法中发生异常时,消息将不会发送。

import javax.annotation.PostConstruct;

@Slf4j
@Component
public class BusinessMsgProducer {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @PostConstruct
    private void init() {
        rabbitTemplate.setChannelTransacted(true);
    }

    @Transactional
    public void sendMsg(String msg) {
        rabbitTemplate.convertAndSend(RabbitMQConfig.BUSINESS_EXCHANGE_NAME, "key", msg);
        log.info("msg:{}", msg);
        if (msg != null && msg.contains("exception"))
            throw new RuntimeException("surprise!");
        log.info("消息已发送 {}", msg);
    }
}

生产消息Controller:

@RestController
public class BusinessController {

    @Autowired
    private BusinessMsgProducer producer;

    @RequestMapping("send")
    public void sendMsg(String msg){
        producer.sendMsg(msg);
    }
}

验证:

msg:1
消息已发送 1
收到业务消息:1
msg:2
消息已发送 2
收到业务消息:2
msg:3
消息已发送 3
收到业务消息:3
msg:exception

Servlet.service() for servlet [dispatcherServlet] in context with path [] threw exception [Request processing failed; nested exception is java.lang.RuntimeException: surprise!] with root cause

java.lang.RuntimeException: surprise!
	at com.mfrank.rabbitmqdemo.producer.BusinessMsgProducer.sendMsg(BusinessMsgProducer.java:30)
    ...

当 msg 的值为 exception 时, 在调用rabbitTemplate.convertAndSend 方法之后,程序抛出了异常,消息并没有发送出去,而是被当前事务回滚了。

当然,你可以将事务管理器注释掉,或者将初始化方法的开启事务注释掉,这样事务就不会生效,即使在调用了发送消息方法之后,程序发生了异常,消息也会被正常发送和消费。

RabbitMQ中的事务使用起来虽然简单,但是对性能的影响是不可忽视的,因为每次事务的提交都是阻塞式的等待服务器处理返回结果,而默认模式下,客户端是不需要等待的,直接发送就完事了,除此之外,事务消息需要比普通消息多4次与服务器的交互,这就意味着会占用更多的处理时间,所以如果对消息处理速度有较高要求时,尽量不要采用事务机制。

RabbitMQ的生产者确认机制

RabbitMQ中的生产者确认功能是AMQP规范的增强功能,当生产者发布给所有队列的已路由消息被消费者应用程序直接消费时,或者消息被放入队列并根据需要进行持久化时,一个Basic.Ack请求会被发送到生产者,如果消息无法路由,代理服务器将发送一个Basic.Nack RPC请求用于表示失败。然后由生产者决定该如何处理该消息。

也就是说,通过生产者确认机制,生产者可以在消息被服务器成功接收时得到反馈,并有机会处理未被成功接收的消息。

在Springboot中开启RabbitMQ的生产者确认模式也很简单,只多了一行配置:

spring:
  rabbitmq:
    host: localhost
    password: guest
    username: guest
    virtualHost: /
    listener:
      type: simple
      simple:
        default-requeue-rejected: false
        acknowledge-mode: manual
    publisher-confirm-type: correlated

publisher-confirm-type: correlated 即表示开启生产者确认模式。

修改消息生产者代码:
让生产者继承自RabbitTemplate.ConfirmCallback 类,然后实现其confirm 方法,即可用其接收服务器回调。
发送消息时,代码也进行了调整。

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import java.util.UUID;

@Slf4j
@Component
public class BusinessMsgProducer implements RabbitTemplate.ConfirmCallback {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @PostConstruct
    private void init() {
        rabbitTemplate.setConfirmCallback(this);
    }

    public void sendMsg(String msg) {
        CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
        log.info("消息id:{}, msg:{}", correlationData.getId(), msg);
        rabbitTemplate.convertAndSend(RabbitMQConfig.BUSINESS_EXCHANGE_NAME, "key", msg, correlationData);
    }

    @Override
    public void confirm(CorrelationData correlationData, boolean b, String s) {
        String id = correlationData != null ? correlationData.getId() : "";
        if (b) {
            log.info("消息确认成功, id:{}", id);
        } else {
            log.error("消息未成功投递, id:{}, cause:{}", id, s);
        }
    }
}
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
rabbitTemplate.convertAndSend(RabbitMQConfig.BUSINESS_EXCHANGE_NAME, "key", msg, correlationData);

为消息设置了消息ID,以便在回调时通过该ID来判断是对哪个消息的回调,因为在回调函数中,我们是无法直接获取到消息内容的,所以需要将消息先暂存起来,根据消息的重要程度,可以考虑使用本地缓存,或者存入Redis中,或者Mysql中,然后在回调时更新其状态或者从缓存中移除,最后使用定时任务对一段时间内未发送的消息进行重新投递。

如何让消息可靠传递到队列

上面了解了如何保证消息被可靠投递到RabbitMQ的交换机中,但还是存在一些问题。

在仅开启了生产者确认机制的情况下,交换机接收到消息后,会直接给消息生产者发送确认消息,如果发现该消息不可路由,那么消息会被直接丢弃,此时,生产者是不知道消息被丢弃这个事件的。
在这里插入图片描述
将交换机类型改为DirectExchange,这样就只有当消息的 RoutingKey 和队列绑定时设置的 Bindingkey (这里即“key”)一致时,才会真正将该消息进行路由。

@Configuration
public class RabbitMQConfig {
    public static final String BUSINESS_EXCHANGE_NAME = "rabbitmq.tx.demo.simple.business.exchange";
    public static final String BUSINESS_QUEUEA_NAME = "rabbitmq.tx.demo.simple.business.queue";

    // 声明业务 Exchange
    @Bean("businessExchange")
    public DirectExchange businessExchange(){
        return new DirectExchange(BUSINESS_EXCHANGE_NAME);
    }

    // 声明业务队列
    @Bean("businessQueue")
    public Queue businessQueue(){
        return QueueBuilder.durable(BUSINESS_QUEUEA_NAME).build();
    }

    // 声明业务队列绑定关系
    @Bean
    public Binding businessBinding(@Qualifier("businessQueue") Queue queue,
                                   @Qualifier("businessExchange") DirectExchange exchange){
        return BindingBuilder.bind(queue).to(exchange).with("key");
    }
}

修改消息生产者:

@Slf4j
@Component
public class BusinessMsgProducer implements RabbitTemplate.ConfirmCallback {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @PostConstruct
    private void init() {
        rabbitTemplate.setConfirmCallback(this);
    }

    public void sendCustomMsg(String msg) {
        CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
        log.info("消息id:{}, msg:{}", correlationData.getId(), msg);
        rabbitTemplate.convertAndSend(RabbitMQConfig.BUSINESS_EXCHANGE_NAME, "key", msg, correlationData);

        correlationData = new CorrelationData(UUID.randomUUID().toString());
        log.info("消息id:{}, msg:{}", correlationData.getId(), msg);
        rabbitTemplate.convertAndSend(RabbitMQConfig.BUSINESS_EXCHANGE_NAME, "key2", msg, correlationData);
    }

    @Override
    public void confirm(CorrelationData correlationData, boolean b, String s) {
        String id = correlationData != null ? correlationData.getId() : "";
        if (b) {
            log.info("消息确认成功, id:{}", id);
        } else {
            log.error("消息未成功投递, id:{}, cause:{}", id, s);
        }
    }
}

验证:

消息id:ba6bf502-9381-4220-8dc9-313d6a289a4e, msg:1
消息id:f0040a41-dc02-4e45-b8af-e3cfa8a118b2, msg:1
消息确认成功, id:ba6bf502-9381-4220-8dc9-313d6a289a4e
消息确认成功, id:f0040a41-dc02-4e45-b8af-e3cfa8a118b2
收到业务消息:1

可以看到,发送了两条消息,第一条消息的 RoutingKey 为 “key”,第二条消息的 RoutingKey 为 “key2”,两条消息都成功被交换机接收,也收到了交换机的确认回调,但消费者只收到了一条消息,因为第二条消息的 RoutingKey 与队列的 BindingKey 不一致,也没有其它队列能接收这个消息,所有第二条消息被直接丢弃了。

那么,如何让消息被路由到队列后再返回ACK呢?
或者无法被路由的消息帮我想办法处理一下?
最起码通知我一声,我好自己处理啊。

RabbitMQ里有两个机制刚好可以解决我们上面的疑问:
1、mandatory 参数
2、备份交换机

mandatory 参数

设置 mandatory 参数可以在当消息传递过程中不可达目的地时将消息返回给生产者。

当把 mandotory 参数设置为 true 时,如果交换机无法将消息进行路由时,会将该消息返回给生产者,而如果该参数设置为false,如果发现消息无法进行路由,则直接丢弃。

设置 mandatory 参数后,如果消息无法被路由,则会返回给生产者,是通过回调的方式进行的,所以,生产者需要设置相应的回调函数才能接受该消息。

为了进行回调,我们需要实现一个接口 RabbitTemplate.ReturnCallback

@Slf4j
@Component
public class BusinessMsgProducer implements RabbitTemplate.ReturnCallback, RabbitTemplate.ConfirmCallback {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @PostConstruct
    private void init() {
        rabbitTemplate.setMandatory(true);
        rabbitTemplate.setConfirmCallback(this);
        rabbitTemplate.setReturnCallback(this);
    }

    public void sendCustomMsg(String msg) {
        CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
        log.info("消息id:{}, msg:{}", correlationData.getId(), msg);
        rabbitTemplate.convertAndSend(RabbitMQConfig.BUSINESS_EXCHANGE_NAME, "key", msg, correlationData);

        correlationData = new CorrelationData(UUID.randomUUID().toString());
        log.info("消息id:{}, msg:{}", correlationData.getId(), msg);
        rabbitTemplate.convertAndSend(RabbitMQConfig.BUSINESS_EXCHANGE_NAME, "key2", msg, correlationData);
    }

    @Override
    public void confirm(CorrelationData correlationData, boolean b, String s) {
        String id = correlationData != null ? correlationData.getId() : "";
        if (b) {
            log.info("消息确认成功, id:{}", id);
        } else {
            log.error("消息未成功投递, id:{}, cause:{}", id, s);
        }
    }

    @Override
    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
        log.info("消息被服务器退回。msg:{}, replyCode:{}. replyText:{}, exchange:{}, routingKey :{}",
                new String(message.getBody()), replyCode, replyText, exchange, routingKey);
    }
}

验证:

消息id:2e5c336a-883a-474e-b40e-b6e3499088ef, msg:1
消息id:85c771cb-c88f-47dd-adea-f0da57138423, msg:1
消息确认成功, id:2e5c336a-883a-474e-b40e-b6e3499088ef
消息无法被路由,被服务器退回。msg:1, replyCode:312. replyText:NO_ROUTE, exchange:rabbitmq.tx.demo.simple.business.exchange, routingKey :key2
消息确认成功, id:85c771cb-c88f-47dd-adea-f0da57138423
收到业务消息:1

可以看到,我们接收到了被退回的消息,并带上了消息被退回的原因:NO_ROUTE。但是要注意的是, mandatory 参数仅仅是在当消息无法被路由的时候,让生产者可以感知到这一点,只要开启了生产者确认机制,无论是否设置了 mandatory 参数,都会在交换机接收到消息时进行消息确认回调,而且通常消息的退回回调会在消息的确认回调之前。

备份交换机

有了 mandatory 参数,我们获得了对无法投递消息的感知能力,有机会在生产者的消息无法被投递时发现并处理。但有时候,我们并不知道该如何处理这些无法路由的消息,最多打个日志,然后触发报警,再来手动处理。而通过日志来处理这些无法路由的消息是很不优雅的做法,特别是当生产者所在的服务有多台机器的时候,手动复制日志会更加麻烦而且容易出错。

而且设置 mandatory 参数会增加生产者的复杂性,需要添加处理这些被退回的消息的逻辑。如果既不想丢失消息,又不想增加生产者的复杂性,该怎么做呢?

在 RabbitMQ 中,有一种备份交换机的机制存在,可以很好的应对这个问题。

什么是备份交换机呢?备份交换机可以理解为 RabbitMQ 中交换机的“备胎”,当我们为某一个交换机声明一个对应的备份交换机时,就是为它创建一个备胎,当交换机接收到一条不可路由消息时,将会将这条消息转发到备份交换机中,由备份交换机来进行转发和处理,通常备份交换机的类型为 Fanout ,这样就能把所有消息都投递到与其绑定的队列中,然后我们在备份交换机下绑定一个队列,这样所有那些原交换机无法被路由的消息,就会都进入这个队列了。当然,我们还可以建立一个报警队列,用独立的消费者来进行监测和报警。
在这里插入图片描述
设置一下备份交换机:
使用 ExchangeBuilder 来创建交换机,并为其设置备份交换机:.withArgument("alternate-exchange", BUSINESS_BACKUP_EXCHANGE_NAME);
为业务交换机绑定了一个队列,为备份交换机绑定了两个队列,一个用来存储不可投递消息,待之后人工处理,一个专门用来做报警用途。

@Configuration
public class RabbitMQConfig {

    public static final String BUSINESS_EXCHANGE_NAME = "rabbitmq.backup.test.exchange";
    public static final String BUSINESS_QUEUE_NAME = "rabbitmq.backup.test.queue";
    public static final String BUSINESS_BACKUP_EXCHANGE_NAME = "rabbitmq.backup.test.backup-exchange";
    public static final String BUSINESS_BACKUP_QUEUE_NAME = "rabbitmq.backup.test.backup-queue";
    public static final String BUSINESS_BACKUP_WARNING_QUEUE_NAME = "rabbitmq.backup.test.backup-warning-queue";

    // 声明业务 Exchange
    @Bean("businessExchange")
    public DirectExchange businessExchange() {
        ExchangeBuilder exchangeBuilder = ExchangeBuilder.directExchange(BUSINESS_EXCHANGE_NAME)
                .durable(true)
                .withArgument("alternate-exchange", BUSINESS_BACKUP_EXCHANGE_NAME);
        return exchangeBuilder.build();
    }

    // 声明备份 Exchange
    @Bean("backupExchange")
    public FanoutExchange backupExchange() {
        ExchangeBuilder exchangeBuilder = ExchangeBuilder.fanoutExchange(BUSINESS_BACKUP_EXCHANGE_NAME)
                .durable(true);
        return exchangeBuilder.build();
    }

    // 声明业务队列
    @Bean("businessQueue")
    public Queue businessQueue() {
        return QueueBuilder.durable(BUSINESS_QUEUE_NAME).build();
    }

    // 声明业务队列绑定关系
    @Bean
    public Binding businessBinding(@Qualifier("businessQueue") Queue queue,
                                   @Qualifier("businessExchange") DirectExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with("key");
    }

    // 声明备份队列
    @Bean("backupQueue")
    public Queue backupQueue() {
        return QueueBuilder.durable(BUSINESS_BACKUP_QUEUE_NAME).build();
    }

    // 声明报警队列
    @Bean("warningQueue")
    public Queue warningQueue() {
        return QueueBuilder.durable(BUSINESS_BACKUP_WARNING_QUEUE_NAME).build();
    }

    // 声明备份队列绑定关系
    @Bean
    public Binding backupBinding(@Qualifier("backupQueue") Queue queue,
                                 @Qualifier("backupExchange") FanoutExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange);
    }

    // 声明备份报警队列绑定关系
    @Bean
    public Binding backupWarningBinding(@Qualifier("warningQueue") Queue queue,
                                        @Qualifier("backupExchange") FanoutExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange);
    }
}

分别为业务交换机和备份交换机创建消费者:

@Slf4j
@Component
public class BusinessMsgConsumer {
    @RabbitListener(queues = RabbitMQConfig.BUSINESS_QUEUE_NAME)
    public void receiveMsg(Message message, Channel channel) throws IOException {
        String msg = new String(message.getBody());
        log.info("收到业务消息:{}", msg);
        channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
    }
}
@Slf4j
@Component
public class BusinessWaringConsumer {
    @RabbitListener(queues = RabbitMQConfig.BUSINESS_BACKUP_WARNING_QUEUE_NAME)
    public void receiveMsg(Message message, Channel channel) throws IOException {
        String msg = new String(message.getBody());
        log.error("发现不可路由消息:{}", msg);
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    }
}

分别发送一条可路由消息和不可路由消息:
这里设置 mandatory 参数会让交换机将不可路由消息退回给生产者。

@Slf4j
@Component
public class BusinessMsgProducer implements RabbitTemplate.ReturnCallback, RabbitTemplate.ConfirmCallback {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @PostConstruct
    private void init() {
        rabbitTemplate.setMandatory(true);
        rabbitTemplate.setConfirmCallback(this);
        rabbitTemplate.setReturnCallback(this);
    }

    public void sendCustomMsg(String msg) {
        CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
        log.info("消息id:{}, msg:{}", correlationData.getId(), msg);
        rabbitTemplate.convertAndSend(RabbitMQConfig.BUSINESS_EXCHANGE_NAME, "key", msg, correlationData);

        correlationData = new CorrelationData(UUID.randomUUID().toString());
        log.info("消息id:{}, msg:{}", correlationData.getId(), msg);
        rabbitTemplate.convertAndSend(RabbitMQConfig.BUSINESS_EXCHANGE_NAME, "key2", msg, correlationData);
    }

    @Override
    public void confirm(CorrelationData correlationData, boolean b, String s) {
        String id = correlationData != null ? correlationData.getId() : "";
        if (b) {
            log.info("消息确认成功, id:{}", id);
        } else {
            log.error("消息未成功投递, id:{}, cause:{}", id, s);
        }
    }

    @Override
    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
        log.info("消息被服务器退回。msg:{}, replyCode:{}. replyText:{}, exchange:{}, routingKey :{}",
                new String(message.getBody()), replyCode, replyText, exchange, routingKey);
    }
}

验证:

消息id:0a3eca1e-d937-418c-a7ce-bfb8ce25fdd4, msg:1
消息id:d8c9e010-e120-46da-a42e-1ba21026ff06, msg:1
消息确认成功, id:0a3eca1e-d937-418c-a7ce-bfb8ce25fdd4
消息确认成功, id:d8c9e010-e120-46da-a42e-1ba21026ff06
发现不可路由消息:1
收到业务消息:1

可以看到,两条消息都可以收到确认成功回调,但是不可路由消息不会被回退给生产者,而是直接转发给备份交换机。可见备份交换机的处理优先级更高。

RabbitMQ消费者消息确认机制 ACK

RabbitMQ有一个ACK机制。当消费者获取消息后,会向RabbitMQ发送回执ACK,告知消息已经被接收。
默认消息一旦被接收,消费者自动发送ACK。

配置手动发送ACK:spring.rabbitmq.listener.simple.acknowledge-mode=manual

// 可以批量确认ACK
/*
 *  void basicAck(long deliveryTag, boolean multiple) throws IOException;
 *  deliveryTag:用来标识消息的id
 *  multiple:是否批量.true:将一次性ack所有小于deliveryTag的消息。
 */
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);

// 可以批量拒绝ACK
/**
 * deliveryTag: 消息的ID
 * multiple: 是否为批量拒绝
 * requeue: 表示该消息是否需要重新发送给消费者
 */
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);

// 只能拒绝一条消息
/**
 * deliveryTag: 消息的ID
 * requeue: 表示该消息是否需要重新发送给消费者
 */
channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);

总结

如果将消息发布到不存在的交换机上,那么发布用的信道将会被RabbitMQ关闭。

生产者确认机制跟事务是不能一起工作的,是事务的轻量级替代方案。因为事务和发布者确认模式都是需要先跟服务器协商,对信道启用的一种模式,不能对同一个信道同时使用两种模式。

在生产者确认模式中,消息的确认可以是异步和批量的,所以相比使用事务,性能会更好。

使用事务机制和生产者确认机制都能确保消息被正确的发送至RabbitMQ,这里的“正确发送至RabbitMQ”说的是消息成功被交换机接收,但如果找不到能接收该消息的队列,这条消息也会丢失。

所以当公司机房“断电”时,如何处理那些需要发送的消息呢?

一般来说,这种“断电”不会持续较长时间,一般几分钟到半小时之间,很快能够恢复,所以如果是重要消息,可以保存到数据库中,如果是非重要消息,可以使用redis进行保存,当然,还要根据消息的数量级来进行判断。

如果消息量比较大,可以考虑将消息发送到另一个集群的死信队列中,所以当一个集群不可用时,可以往另一个集群发消息,如果两个机房都停电了的话,当我没说。

处理生产者不丢失:
事务机制和生产者确认机制来确保消息的可靠投递,相对而言,生产者确认机制更加高效和灵活。
处理路由不丢失:
也可以通过 mandatory 参数和备份交换机来处理不可路由消息。

通过以上几种机制,我们总算是可以确保消息被万无一失的投递到目的地了。
消息可靠投递是我们使用MQ时无法逃避的话题,一次性搞定它,就不会再为其所困。

最后

以上就是壮观含羞草为你收集整理的RabbitMQ 之消息可靠投递可靠投递RabbitMQ中的消息可靠传递RabbitMQ的事务机制RabbitMQ的生产者确认机制如何让消息可靠传递到队列mandatory 参数备份交换机RabbitMQ消费者消息确认机制 ACK总结的全部内容,希望文章能够帮你解决RabbitMQ 之消息可靠投递可靠投递RabbitMQ中的消息可靠传递RabbitMQ的事务机制RabbitMQ的生产者确认机制如何让消息可靠传递到队列mandatory 参数备份交换机RabbitMQ消费者消息确认机制 ACK总结所遇到的程序开发问题。

如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(47)

评论列表共有 0 条评论

立即
投稿
返回
顶部