4 发布确认
4.1 发布确认原理
生产者将信道设置成confirm模式,一旦信道进入confirm模式所有在该信道上发布的消息都将会指派一个唯一ID(从1开始),一旦消息被投递到所有匹配的队列之后,broker就会发送一个确认给生产者(包含消息的唯一ID),这样生产者就知道消息已经正确到达目的队列了,如果消息和队列是可持久化的,那么消息会在将消息写入磁盘后发出;broker回传给生产者的确认消息中delivery-tag域包含了确认消息的序列号,此外broker也可以设置basic.ack的multiple域,表示这个序列号之前的消息都已经得到了处理。
confirm最大的好处是他是异步的,一旦发布消息,生产者应用程序就可以在等待信道返回的同时继续发送下一条消息,当消息最终得到确认后,生产者应用便可以通过回调方法来处理该确认消息,如果RabbitMQ因为自身内部错误导致消息丢失,就会发送一条nack消息,生产者应用程序同样可以在回调中处理该nack消息。
4.2 单个发布确认
发布一个消息之后只有他被确认之后,后续消息才能继续发布。他是一种同步确认发布
复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32public class ConfirmMessage { public static final int MESSAGE_COUNT = 1000; public static void main(String[] args) throws Exception { publishMessageIndividually(); //Cost Time is 497ms publishMessageBatch(); //Cost Time is 61ms publishMessageSync(); //Cost Time is 17ms } //单个确认 public static void publishMessageIndividually() throws Exception { Channel channel = RabbitMqUtils.getChannel(); String queueName = UUID.randomUUID().toString(); channel.queueDeclare(queueName, false, false, false, null); //发布确认 channel.confirmSelect(); long startTime = System.currentTimeMillis(); for (int i = 0; i < MESSAGE_COUNT; i++) { String message = i + ""; channel.basicPublish("", queueName, null, message.getBytes(StandardCharsets.UTF_8)); //单个发布确认 boolean confirms = channel.waitForConfirms(); if (confirms) { System.out.println("消息发送成功"); } } long endTime = System.currentTimeMillis(); System.out.println("Cost Time is " + (endTime - startTime) + "ms"); } }
4.3 批量确认发布
与单个发布确认相比,先发一批消息然后一起确认可以极大第提高吞吐量,缺点就是:当发送出现故障时,不知道哪个消息出现了问题,我们必须将整个批处理保存在内存中,以记录信息而后重新发布消息。这种方案仍是同步的。
复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21//批量发布确认 public static void publishMessageBatch() throws Exception { Channel channel = RabbitMqUtils.getChannel(); String queueName = UUID.randomUUID().toString(); channel.queueDeclare(queueName, false, false, false, null); //发布确认 channel.confirmSelect(); //批量大小 int batchSize = 100; long startTime = System.currentTimeMillis(); for (int i = 0; i < MESSAGE_COUNT; i++) { String message = i + ""; channel.basicPublish("", queueName, null, message.getBytes(StandardCharsets.UTF_8)); if (i % batchSize == 0){ channel.confirmSelect(); } } long endTime = System.currentTimeMillis(); System.out.println("Cost Time is " + (endTime - startTime) + "ms"); }
4.4 异步确认发布
性能高,可靠性高;推荐
复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26//异步发送确认 public static void publishMessageSync() throws Exception { Channel channel = RabbitMqUtils.getChannel(); String queueName = UUID.randomUUID().toString(); channel.queueDeclare(queueName, false, false, false, null); //开启发布确认 channel.confirmSelect(); long startTime = System.currentTimeMillis(); //消息监听器,监听哪些消息发送失败或成功 channel.addConfirmListener( //消息确认成功回调 1:消息标识;2:是否批量 (deliveryTag, multiple) -> { System.out.println("确认的消息:" + deliveryTag); }, //消息确认失败回调 1:消息标识;2:是否批量 (deliveryTag, multiple) -> { System.out.println("未确认消息:" + deliveryTag); }); for (int i = 0; i < MESSAGE_COUNT; i++) { String message = i + ""; channel.basicPublish("", queueName, null, message.getBytes(StandardCharsets.UTF_8)); } long endTime = System.currentTimeMillis(); System.out.println("Cost Time is " + (endTime - startTime) + "ms"); }
4.5 异步未确认消息的处理
复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45//异步发送确认 public static void publishMessageSync() throws Exception { Channel channel = RabbitMqUtils.getChannel(); String queueName = UUID.randomUUID().toString(); channel.queueDeclare(queueName, false, false, false, null); //开启发布确认 channel.confirmSelect(); long startTime = System.currentTimeMillis(); /** * 线程安全的哈希表,适用于高并发 * 将序号与消息进行关联 * 批量删除条目 */ ConcurrentSkipListMap<Long, String> outstandingConfirms = new ConcurrentSkipListMap<>(); //消息监听器,监听哪些消息发送失败或成功 channel.addConfirmListener( //消息确认成功回调 1:消息标识;2:是否批量 (deliveryTag, multiple) -> { if (multiple) { //删除已确认的消息 headMap(deliveryTag)返回一个小于deliveryTag的子map ConcurrentNavigableMap<Long, String> confirmed = outstandingConfirms.headMap(deliveryTag); confirmed.clear(); } else { outstandingConfirms.remove(deliveryTag); } System.out.println("确认的消息:" + deliveryTag); }, //消息确认失败回调 1:消息标识;2:是否批量 (deliveryTag, multiple) -> { String message = outstandingConfirms.get(deliveryTag); System.out.println("未确认消息:" + message); }); for (int i = 0; i < MESSAGE_COUNT; i++) { String message = i + ""; channel.basicPublish("", queueName, null, message.getBytes(StandardCharsets.UTF_8)); //记录要发送的消息 outstandingConfirms.put(channel.getNextPublishSeqNo(), message); } long endTime = System.currentTimeMillis(); System.out.println("Cost Time is " + (endTime - startTime) + "ms"); }
第五章:RabbitMQ 交换机
最后
以上就是着急樱桃最近收集整理的关于RabbitMQ 发布确认4 发布确认的全部内容,更多相关RabbitMQ内容请搜索靠谱客的其他文章。
本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
发表评论 取消回复