概述
目录
序言
消息发布流程
发布消息确认
一、事务使用
二、Confirm发送方确认模式
方式一:普通Confirm模式
方式二:批量Confirm模式
方式三:异步Confirm模式
扩展知识
消费消息确认
总结
序言
RabbitMQ作为一种消息队列,是一个基于AMQP(Advanced Message Queuing Protocol )基础上实现的,可复用的企业消息系统。它不仅可以用于大型软件系统各个模块之间的高效通信,还支持高并发,支持可扩展。使用RabbitMQ作为各个模块之间的通信工具,不仅有利于各个模块之间的解耦,还可以在高峰期起到削峰减流的作用,这里问题就来了,那如何保证消息的可靠呢?这个就要涉及到RabbitMQ两大高级特性,也就是接下来要讲的:①发布消息确认 ②消费者消息确认。
消息发布流程
RabbitMQ发布消息流程大致为: 生产者先将消息发送到交换机,发送消息时需要携带路由键,交换机根据发送过来的路由键,再转发到符合该路由键的队列上面,消费者监听队列一有消息,系统会自动将消息分发给消费者,消费者拿到发送过来的消息,就可以处理相应的业务逻辑
发布消息确认
正常情况下,如果消息经过交换机进入队列就可以完成消息的持久化,但如果消息在没有到达broker之前出现意外,那就造成消息丢失,有没有办法可以解决这个问题?
RabbitMQ有两种方式来解决这个问题:
- 通过AMQP提供的事务机制实现;
- 使用发送者确认模式实现;
一、事务使用
事务的实现主要是对信道(Channel)的设置,主要的方法有三个:
-
channel.txSelect()声明启动事务模式;
-
channel.txComment()提交事务;
-
channel.txRollback()回滚事务;
我们来看具体的代码实现:
public class Demo {
public static void main(String[] args) throws IOException, TimeoutException {
//创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setPort(5672);
factory.setVirtualHost("/");
factory.setUsername("guest");
factory.setPassword("guest");
Connection connection = factory.newConnection();
//创建信道
Channel channel = connection.createChannel();
//声明交换机
channel.exchangeDeclare(" directExchange", BuiltinExchangeType.DIRECT,false);
//声明队列
channel.queueDeclare("direct_queue",false,false,false,null);
//将队列与交换机进行绑定
channel.queueBind("direct_queue","directExchange","route_key");
String message = String.format("时间 => %s", new Date().getTime());
try {
channel.txSelect(); //声明事务
//发送消息
channel.basicPublish("directExchange","direct",null,message.getBytes());
channel.txCommit(); //提交事务
}catch (Exception e){
channel.txRollback(); //如果发生异常,则回退事务
}
connection.close(); //关闭连接
}
}
从上面的代码可以看出,发送消息之前需要先声明 channel 为事务模式,如果其中任意一个环节出现问题,就会抛出IoException异常,这样用户就可以根据实际情况来提交或者回滚事务,以上大概的交互流程为:
- 客户端发送给服务器Tx.Select(开启事务模式)
- 服务器端返回Tx.Select-Ok(开启事务模式ok)
- 推送消息
- 客户端发送给事务提交Tx.Commit
- 服务器端返回Tx.Commit-Ok
用户就可以根据服务器返回的 Tx.Commit-Ok 来判断消息是否发布成功,然后写自己的业务逻辑。
二、Confirm发送方确认模式
Confirm的三种实现方式:
方式一:channel.waitForConfirms()普通发送方确认模式;
方式二:channel.waitForConfirmsOrDie()批量确认模式;
方式三:channel.addConfirmListener()异步监听发送方确认模式;
方式一:普通Confirm模式
public class Demo {
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
//创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setPort(5672);
factory.setVirtualHost("/");
factory.setUsername("guest");
factory.setPassword("guest");
Connection connection = factory.newConnection();
//创建信道
Channel channel = connection.createChannel();
//声明交换机
channel.exchangeDeclare(" directExchange", BuiltinExchangeType.DIRECT, false);
//声明队列
channel.queueDeclare("direct_queue", false, false, false, null);
//将队列与交换机进行绑定
channel.queueBind("direct_queue","directExchange","route_key");
String message = String.format("时间 => %s", new Date().getTime());
//开启发送方确认模式
channel.confirmSelect();
//发送消息
channel.basicPublish("directExchange", "direct", null, message.getBytes());
//消息发送确认返回值
boolean status = channel.waitForConfirms();
if (status){
System.out.println("消息发送成功");
}
}
}
从上面代码可以看出,发送消息之前只需要通过 channel开启发送方确认模式,等到消息发送完之后,可以调用channel.waitForConfirms()来确认消息是否发送成功,如果返回true则表示消息成功发送,如果返回false,则表示消息发送失败!
方式二:批量Confirm模式
public class Demo {
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
//创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setPort(5672);
factory.setVirtualHost("/");
factory.setUsername("guest");
factory.setPassword("guest");
Connection connection = factory.newConnection();
//创建信道
Channel channel = connection.createChannel();
//声明交换机
channel.exchangeDeclare(" directExchange", BuiltinExchangeType.DIRECT, false);
//声明队列
channel.queueDeclare("direct_queue", false, false, false, null);
//将队列与交换机进行绑定
channel.queueBind("direct_queue","directExchange","route_key");
String message = String.format("时间 => %s", new Date().getTime());
//开启发送方确认模式
channel.confirmSelect();
//发送消息
for (int i = 0; i < 10; i++){
channel.basicPublish("directExchange", "direct", null, message.getBytes());
}
//程序会在这里停止等待,直到所有信息都发布,只要有一个未确认就会IOException异常
channel.waitForConfirmsOrDie();
}
}
以上代码可以看出来channel.waitForConfirmsOrDie(),使用同步方式等所有的消息发送之后才会执行后面代码,只要有一个消息未被确认就会抛出IOException异常,所以可以在程序里面捕获异常,当有异常抛出时,就表示消息发送失败,然后可以处理对应逻辑。
方式三:异步Confirm模式
public class Demo {
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
//创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setPort(5672);
factory.setVirtualHost("/");
factory.setUsername("guest");
factory.setPassword("guest");
Connection connection = factory.newConnection();
//创建信道
Channel channel = connection.createChannel();
//声明交换机
channel.exchangeDeclare(" directExchange", BuiltinExchangeType.DIRECT, false);
//声明队列
channel.queueDeclare("direct_queue", false, false, false, null);
//将队列与交换机进行绑定
channel.queueBind("direct_queue","directExchange","route_key");
String message = String.format("时间 => %s", new Date().getTime());
//开启发送方确认模式
channel.confirmSelect();
//发送消息
for (int i = 0; i < 10; i++) {
channel.basicPublish("directExchange", "direct", null, message.getBytes());
}
//添加异步监听
channel.addConfirmListener(new ConfirmListener() {
@Override
public void handleAck(long l, boolean b) throws IOException {
System.out.println("已确认消息");
}
@Override
public void handleNack(long l, boolean b) throws IOException {
System.out.println("未确认消息");
//消息发送失败,在这里写相应处理逻辑,如:重新发送消息或者直接丢弃消息等等
}
});
}
}
异步模式的优点,就是执行效率高,不需要等待消息执行完,只需要监听消息即可,当消息发送成功时,回调handleAck()方法,当消息发送失败的时候,回调handleNack()方法,可以在失败的回调方法里面写相关的处理逻辑。
扩展知识
既然已经有事务了,没什么还要使用发送方确认模式呢,原因是因为事务的性能是非常差的。
发送方确认模式的性能大概要比事务快10倍左右!
消费消息确认
讲消费者确认模式之前,先来了解一下RabbitMQ一个默认的工作流程:将消息发送到队列时,服务器检测到队列中有消息,就会去寻找监听了该条队列的消费者,然后将消息丢到Stoker连接管道中,随后将服务器中的消息删除。
试想一下这样的场景:如果消息在还在管道时,消费者服务宕机了,消费者无法领取到消息,但此时RabbitMQ服务器已经把消息删除了,或者如果消费者领取消息后,还没执行操作就挂掉了呢?或者抛出了异常?消息消费失败,但是RabbitMQ无从得知,这样消息就丢失了!
因此,RabbitMQ有一个ACK机制。当消费者获取消息后,会向RabbitMQ发送回执ACK,告知消息已经被接收。不过这种回执ACK分两种情况:
-
自动ACK:消息一旦被接收,消费者自动发送ACK(默认)
-
手动ACK:消息接收后,不会发送ACK,需要手动调用
我们来看具体的代码实现
public class ConsumerDemo {
public static void main(String[] args) throws IOException, TimeoutException {
//创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setPort(5672);
factory.setVirtualHost("/");
factory.setUsername("guest");
factory.setPassword("guest");
Connection connection = factory.newConnection();
//创建信道
Channel channel = connection.createChannel();
//如果有消息到达队列,则会自动调用此方法
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消息的回调方法");
//手动确认消息已经被消费,RabbitMQ收到该消息后,才会将消息从服务器中删除
//如果RabbitMQ一直接收不到应答,哪怕你已经消费了该条消息,该条消息就会一直保留在RabbitMQ服务器中,不会被删除
channel.basicAck(envelope.getDeliveryTag(), false);
//也可以告诉RabbitMQ服务器,拒绝接收该条信息,第二个参数表示是否重新放入队列,false表示不需要
//channel.basicReject(envelope.getDeliveryTag(), false);
}
};
/**
* 监听队列
* 第一个参数: 要监听的队列名称
* 第二个参数: 是否自动ack, false:表示收到ack(也就是手动确认消息是否被消费)
* 第三个参数: 消息的回调函数(作用:有消息过来时,将消息发送到该回调上)
*/
channel.basicConsume("direct_queue", false, consumer);
}
}
从以上代码可以看出,调用channel.basicConsume()监听队列时,第二个参数传入false就表示消息接收后,采用手动ACK应答方式,消息处理完之后,再调用channel.basicAck()这个方法,第一个参数表示消息标识(唯一id,用来识别消费的是那条消息,第二个参数表示是否批量应答,如果·为true,则小于该条消息id的所有消息都会一起确认)
总结
通过以上,我们就可以保证消息的可靠性了,实际上,消息发布确认还可以再进行细分:生产者发送消息到达交换机时,RabbitMQ会有一个回调,表示消息已经送达交换机,交换机将消息路由到队列时,也会有一个回调,用来确认交换机是否将消息成功发送到队列。
PS:后续有时间再写一个SpringBoot整合RabbitMQ的发布消息和消费消息的确认,
最后
以上就是伶俐跳跳糖为你收集整理的RabbitMQ—发布消息确认和消费消息确认序言消息发布流程发布消息确认消费消息确认总结的全部内容,希望文章能够帮你解决RabbitMQ—发布消息确认和消费消息确认序言消息发布流程发布消息确认消费消息确认总结所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
发表评论 取消回复