概述
系列文章:
- 【RabbitMQ】如何保证消息可靠性投递?
- 【RabbitMQ】消息可靠性投递(一)Producer->Broker
- 【RabbitMQ】消息可靠性投递(二)Exchange->Queue
- 【RabbitMQ】消息可靠性投递(三)Queue存储消息
- 【RabbitMQ】消息可靠性投递(四)Queue–>Consumer
如果消费者收到消息后没来得及处理就发生异常,或者处理过程中发生异常,会导致④失败。服务端应该以某种方式得知消费者对消息的接收情况,并决定是否重新投递这条消息给其他消费者。
RabbitMQ提供了消费者的消息确认机制(message acknowledgement),消费者可以自动或者手动地发送ACK给服务端没有收到ACK的消息,消费者断开连接后,RabbitMQ会把这条消息发送给其他消费者。如果没有其他消费者,消费者重启后会重新消费这条消息,重复执行业务逻辑。
1.自动ACK
消费者默认采用的是自动ack(autoAck=true),所以我们可以不断的一条一条接收消息。
而自动ack的问题在于消息丢失问题,当消息到达Consumer就会给broker返回ack,若Consumer在处理中就宕机,那么当前消息就丢失了
注:在 Kafka 中,自动 ACK 是每隔一段时间 ACK 一次,而且消息的清除是根据根据配置的消息的清除策略, 所以,消息自动 ACK 容易引起的是消息重复消费(与 RabbitMQ 正好相反)
有没有一种方式,等Consumer处理完消息后,在当前消息的ack发给服务端?手动ACK
2.手动ACK
RabbitMQ会等待消费者显式地回复确认信号后才从队列中移去消息,而这种方式的问题在于,若服务器未收到消费者的ack时会一直阻塞,最终可能引起消息大量堆积。
若是采用原生API,消费者在订阅队列时可以指定 autoAck=false
public class AckConsumer {
private final static String QUEUE_NAME = "TEST_ACK_QUEUE";
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setUri(ResourceUtil.getKey("rabbitmq.uri"));
// 建立连接
Connection conn = factory.newConnection();
// 创建消息通道
final Channel channel = conn.createChannel();
// 声明队列(默认交换机AMQP default,Direct)
// String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
System.out.println(" Waiting for message....");
// 创建消费者,并接收消息
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
byte[] body) throws IOException {
String msg = new String(body, "UTF-8");
System.out.println("Received message : '" + msg + "'");
if (msg.contains("拒收")){
// 拒绝消息
// requeue:是否重新入队列,true:是;false:直接丢弃,相当于告诉队列可以直接删除掉
// TODO 如果只有这一个消费者,requeue 为true 的时候会造成消息重复消费
channel.basicReject(envelope.getDeliveryTag(), false);
} else if (msg.contains("异常")){
// 批量拒绝(拒绝deliveryTag之前的消息)
// requeue:是否重新入队列
// TODO 如果只有这一个消费者,requeue 为true 的时候会造成消息重复消费
channel.basicNack(envelope.getDeliveryTag(), true, false);
} else {
// 手工应答
// 如果不应答,队列中的消息会一直存在,重新连接的时候会重复消费
channel.basicAck(envelope.getDeliveryTag(), true);
}
}
};
// 开始获取消息,注意这里开启了手工应答
// String queue, boolean autoAck, Consumer callback
channel.basicConsume(QUEUE_NAME, false, consumer);
}
}
在 Spring AMQP中 MessageListener 相当于消费者,可以在AcknowledgeMode
枚举类中看到,关于ack的配置具体有三种选择:
- NONE:自动ACK(默认)
- MANUAL: 手动ACK
- AUTO:如果方法未抛出异常,则发送 ack。
- 当抛出 AmqpRejectAndDontRequeueException 异常的时候,则消息会被拒绝,且不重新入队。
- 当抛出 ImmediateAcknowledgeAmqpException 异常,则消费者会发送ACK。
- 其他的异常,则消息会被拒绝,且 requeue = true会重新入队。
@Bean // 构建MessageListenerContainer的Bean时配置
public SimpleMessageListenerContainer container(ConnectionFactory connectionFactory) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
container.setQueues(getSecondQueue(),getThirdQueue()); // 监听的队列
container.setConcurrentConsumers(1); // 最小消费者数
container.setMaxConcurrentConsumers(5); // 最大的消费者数量
container.setDefaultRequeueRejected(false); // 是否重回队列
container.setAcknowledgeMode(AcknowledgeMode.AUTO); // 签收模式!!!
container.setExposeListenerChannel(true);
container.setConsumerTagStrategy(new ConsumerTagStrategy() { // 消费端的标签策略
public String createConsumerTag(String queue) {
return null;
}
});
return container;
}
若在SpringBoot的配置:
spring.rabbitmq.listener.direct.acknowledge-mode=manual # 默认为NONE,自动ack
spring.rabbitmq.listener.simple.acknowledge-mode=manual
具体的调用代码如下:
@Component
@RabbitListener(queues = "SECOND_QUEUE")
public class SecondConsumer {
@RabbitHandler
// 当要手动确认时,参数中要有Channel和Message (注:Channel是rabbitmq.client.Channel不是amqp的)
public void process(String msg,Channel channel,Message message){
System.out.println("Second Queue received msg:" + msg);
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false); // 手动ack
}
}
3.拒绝策略
如果消息无法处理或者消费失败,也有两种拒绝的方式,
void basicReject(long deliveryTag, boolean requeue)
单条拒绝void basicNack(long deliveryTag, boolean multiple, boolean requeue)
批量拒绝
如果requeue参数设置为true,可以把这条消息重新存入队列,以便发给下一个消费者(当然,只有一个消费者的时候,这种方式可能会出现无限循环重复消费的情况。可以投递到新的队列中,或者只打印异常日志)。如果requeue为false,当消息投递失败就会丢弃。
但无论消费者是发送ACK还是NACK,甚至是消费者出现异常,生产者也是完全不知情的。所以,生产者最终确定消费者有没有消费成功的方式:
- 消费者收到消息,处理完毕后,调用生产者的API
- 消费者收到消息,处理完毕后,发送一条响应消息给生产者
最后
以上就是欣慰保温杯为你收集整理的【RabbitMQ】消息可靠性投递(四)Queue-->Consumer的全部内容,希望文章能够帮你解决【RabbitMQ】消息可靠性投递(四)Queue-->Consumer所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
发表评论 取消回复