概述
RabbitMQ本身提供了三种机制来保证消息的成功投递,成功消费,和消息丢失的处理,一个一个测试一下。
1、RabbitMQ的ACK机制
ACK机制是rabbitmq保证消息成功消费的机制,默认应该是自动签收的,也就是消息被队列取出即视为已消费,但是往往业务流程里面会存在必须等业务处理完成才能是已签收,或者处理业务的过程中发生了异常,不能签收,所以ACK机制可以保证这个问题。
1、首先要将签收设置为非自动签收
2、然后消费者消费消息,业务处理完成后,手动回应服务端,已签收
创建连接工厂:
public class RabbitmqConnectionFactory {
/**
* 获取一个rabbitmq连接工厂
* @return
*/
public static ConnectionFactory getConnectionFactory(){
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");// 主机
factory.setPort(5672);// 端口号
factory.setVirtualHost("/");// 虚拟机
factory.setUsername("admin");// 用户名
factory.setPassword("admin");// 密码
factory.setAutomaticRecoveryEnabled(true);// 是否支持自动重连
factory.setNetworkRecoveryInterval(3000);// 多久重连一次
return factory;
}
}
自定义消费,继承DefaultConsumer ,重写handleDelivery方法
重点在这里
// 成功签收
channel.basicAck(envelope.getDeliveryTag(), false);
// 未成功签收,第三个参数标识:是否重回队列,设置为true,则会重回到队列,如果设置为false则需要自己处理,写日志或其他方式
channel.basicNack(envelope.getDeliveryTag(), false, true);
:
public class MyConsumer extends DefaultConsumer {
private Channel channel;
public MyConsumer(Channel channel) {
super(channel);
this.channel = channel;
}
/**
* 重写父类方法
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body){
System.out.println("--------------- MyConsumer Message ----------------");
System.out.println("message body :" + new String(body));
try {
Thread.sleep(5000);
// 手动确认消息,false:不批量签收
channel.basicAck(envelope.getDeliveryTag(), false);
// 第三参数:是否重回队列,设置为true,则会重回到队列
//channel.basicNack(envelope.getDeliveryTag(), false, true);
} catch (Exception e) {
e.printStackTrace();
}
}
}
消费者代码,并启动:
public class AckConsumer {
private static final String QUEUE = "test.ack.queue";
private static final String EXCHANG_NAME = "test.ack.exchang";
private static final String ROUTING_KEY = "test.ack.*";
public static void main(String args[]) throws Exception {
// 1、获取连接工厂
ConnectionFactory connectionFactory = RabbitmqConnectionFactory.getConnectionFactory();
// 2、创建连接
Connection connection = connectionFactory.newConnection();
// 3、创建通道
Channel channel = connection.createChannel();
// 4、创建队列、交换机,并绑定
channel.exchangeDeclare(EXCHANG_NAME, BuiltinExchangeType.TOPIC, true);
channel.queueDeclare(QUEUE, true, false, false, null);
channel.queueBind(QUEUE, EXCHANG_NAME ,ROUTING_KEY);
// 5、创建消费者
MyConsumer consumer = new MyConsumer(channel);
// autoAck 必须设置为 false (手动签收)
channel.basicConsume(QUEUE, false, consumer);
}
}
生产端代码,并启动:
public class AckProducer {
private static final String QUEUE = "test.ack.queue";
private static final String EXCHANG_NAME = "test.ack.exchang";
private static final String ROUTING_KEY = "test.ack.one";
public static void main(String args[]) throws Exception {
// 1、获取连接工厂
ConnectionFactory connectionFactory = RabbitmqConnectionFactory.getConnectionFactory();
// 2、创建连接
Connection connection = connectionFactory.newConnection();
// 3、创建通道
Channel channel = connection.createChannel();
// 4、发送消息
String msg = "this is ack msg !";
channel.basicPublish(EXCHANG_NAME, ROUTING_KEY, null, msg.getBytes());
}
}
2、RabbitMQ的Confirm机制
confirm机制机制是保证消息成功投递到了服务端,通过回调通知生产者是否收到了消息,重点在生产者这里,要设置消息确认,然后监听回调。
生产者代码:
public class ConfirmProducer {
// 队列
private static final String QUEUE = "test.confirm.queue";
// 交换机
private static final String EXCHANG_NAME = "test.confirm.exchang";
// 路由key
private static final String ROUTING_KEY = "test.confirm.one";
public static void main(String args[]) throws Exception {
// 1、获取连接工厂
ConnectionFactory connectionFactory = RabbitmqConnectionFactory.getConnectionFactory();
// 2、创建连接
Connection connection = connectionFactory.newConnection();
// 3、创建通道、并指定消息确认模式
Channel channel = connection.createChannel();
// 设置消息确认模式
channel.confirmSelect();
// 5、发送消息
String msg = "this is confirm message !";
channel.basicPublish(EXCHANG_NAME, ROUTING_KEY, null, msg.getBytes());
// 6、添加消息确认监听
channel.addConfirmListener(new ConfirmListener() {
// 消息确认
@Override
public void handleAck(long l, boolean b) throws IOException {
System.out.println("--------handle Ack---------");
}
// 消息未确认
@Override
public void handleNack(long l, boolean b) throws IOException {
System.out.println("--------handle No Ack---------");
}
});
}
}
3、RabbitMQ的Return机制
return机制是保证消息不丢失,有些时候我们生产的消息没有投递到任何队列,或者队列名称、交换机、路由错了,导致没有投递到队列里面,这个时候return监听机制就能获取到未被成功投递的消息,然后做业务处理,生产消息的时候要设置消息确认模式,然后添加return监听器,设置mandatory为true,如果为false,那么broker端自动删除该消息。
重点也是在生产者:
public class ReturnProducer {
private static final String QUEUE = "test.return.queue";
private static final String EXCHANG_NAME = "test.return.exchang";
private static final String ROUTING_KEY = "test.return.aa.bb";
public static void main(String args[]) throws Exception {
// 1、获取连接工厂
ConnectionFactory connectionFactory = RabbitmqConnectionFactory.getConnectionFactory();
// 2、创建连接
Connection connection = connectionFactory.newConnection();
// 3、创建通道
Channel channel = connection.createChannel();
// 设置消息确认模式
channel.confirmSelect();
// 添加return监听
channel.addReturnListener(new ReturnListener() {
@Override
public void handleReturn(int i, String s, String s1, String s2, AMQP.BasicProperties basicProperties, byte[] bytes) throws IOException {
System.err.println("消息投递失败!!!");
System.out.println("i:" +i);
System.out.println("s:" +s);
System.out.println("s1:" +s1);
System.out.println("s2:" +s2);
System.out.println("basicProperties:" +basicProperties);
System.out.println("body:" +new String(bytes));
}
});
// 4、发送消息
String msg = "this is return msg !";
// mandatory, 设置为true,则监听器会接收到路由不可达的消息, 然后进行处理,如果设置为false,那么broker端自动删除该消息。
channel.basicPublish(EXCHANG_NAME, ROUTING_KEY, true,null, msg.getBytes());
}
}
源码:https://github.com/wangyi0102/RabbitmqDemo
最后
以上就是光亮蓝天为你收集整理的rabbitmq(三),ACK机制、Confirm、Return机制(附源码)的全部内容,希望文章能够帮你解决rabbitmq(三),ACK机制、Confirm、Return机制(附源码)所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
发表评论 取消回复