概述
4 发布确认
4.1 发布确认原理
生产者将信道设置成confirm模式,一旦信道进入confirm模式所有在该信道上发布的消息都将会指派一个唯一ID(从1开始),一旦消息被投递到所有匹配的队列之后,broker就会发送一个确认给生产者(包含消息的唯一ID),这样生产者就知道消息已经正确到达目的队列了,如果消息和队列是可持久化的,那么消息会在将消息写入磁盘后发出;broker回传给生产者的确认消息中delivery-tag域包含了确认消息的序列号,此外broker也可以设置basic.ack的multiple域,表示这个序列号之前的消息都已经得到了处理。
confirm最大的好处是他是异步的,一旦发布消息,生产者应用程序就可以在等待信道返回的同时继续发送下一条消息,当消息最终得到确认后,生产者应用便可以通过回调方法来处理该确认消息,如果RabbitMQ因为自身内部错误导致消息丢失,就会发送一条nack消息,生产者应用程序同样可以在回调中处理该nack消息。
4.2 单个发布确认
发布一个消息之后只有他被确认之后,后续消息才能继续发布。他是一种同步确认发布
public class ConfirmMessage {
public static final int MESSAGE_COUNT = 1000;
public static void main(String[] args) throws Exception {
publishMessageIndividually(); //Cost Time is 497ms
publishMessageBatch(); //Cost Time is 61ms
publishMessageSync(); //Cost Time is 17ms
}
//单个确认
public static void publishMessageIndividually() throws Exception {
Channel channel = RabbitMqUtils.getChannel();
String queueName = UUID.randomUUID().toString();
channel.queueDeclare(queueName, false, false, false, null);
//发布确认
channel.confirmSelect();
long startTime = System.currentTimeMillis();
for (int i = 0; i < MESSAGE_COUNT; i++) {
String message = i + "";
channel.basicPublish("", queueName, null, message.getBytes(StandardCharsets.UTF_8));
//单个发布确认
boolean confirms = channel.waitForConfirms();
if (confirms) {
System.out.println("消息发送成功");
}
}
long endTime = System.currentTimeMillis();
System.out.println("Cost Time is " + (endTime - startTime) + "ms");
}
}
4.3 批量确认发布
与单个发布确认相比,先发一批消息然后一起确认可以极大第提高吞吐量,缺点就是:当发送出现故障时,不知道哪个消息出现了问题,我们必须将整个批处理保存在内存中,以记录信息而后重新发布消息。这种方案仍是同步的。
//批量发布确认
public static void publishMessageBatch() throws Exception {
Channel channel = RabbitMqUtils.getChannel();
String queueName = UUID.randomUUID().toString();
channel.queueDeclare(queueName, false, false, false, null);
//发布确认
channel.confirmSelect();
//批量大小
int batchSize = 100;
long startTime = System.currentTimeMillis();
for (int i = 0; i < MESSAGE_COUNT; i++) {
String message = i + "";
channel.basicPublish("", queueName, null, message.getBytes(StandardCharsets.UTF_8));
if (i % batchSize == 0){
channel.confirmSelect();
}
}
long endTime = System.currentTimeMillis();
System.out.println("Cost Time is " + (endTime - startTime) + "ms");
}
4.4 异步确认发布
性能高,可靠性高;推荐
//异步发送确认
public static void publishMessageSync() throws Exception {
Channel channel = RabbitMqUtils.getChannel();
String queueName = UUID.randomUUID().toString();
channel.queueDeclare(queueName, false, false, false, null);
//开启发布确认
channel.confirmSelect();
long startTime = System.currentTimeMillis();
//消息监听器,监听哪些消息发送失败或成功
channel.addConfirmListener(
//消息确认成功回调 1:消息标识;2:是否批量
(deliveryTag, multiple) -> {
System.out.println("确认的消息:" + deliveryTag);
},
//消息确认失败回调 1:消息标识;2:是否批量
(deliveryTag, multiple) -> {
System.out.println("未确认消息:" + deliveryTag);
});
for (int i = 0; i < MESSAGE_COUNT; i++) {
String message = i + "";
channel.basicPublish("", queueName, null, message.getBytes(StandardCharsets.UTF_8));
}
long endTime = System.currentTimeMillis();
System.out.println("Cost Time is " + (endTime - startTime) + "ms");
}
4.5 异步未确认消息的处理
//异步发送确认
public static void publishMessageSync() throws Exception {
Channel channel = RabbitMqUtils.getChannel();
String queueName = UUID.randomUUID().toString();
channel.queueDeclare(queueName, false, false, false, null);
//开启发布确认
channel.confirmSelect();
long startTime = System.currentTimeMillis();
/**
* 线程安全的哈希表,适用于高并发
* 将序号与消息进行关联
* 批量删除条目
*/
ConcurrentSkipListMap<Long, String> outstandingConfirms = new ConcurrentSkipListMap<>();
//消息监听器,监听哪些消息发送失败或成功
channel.addConfirmListener(
//消息确认成功回调 1:消息标识;2:是否批量
(deliveryTag, multiple) -> {
if (multiple) {
//删除已确认的消息 headMap(deliveryTag)返回一个小于deliveryTag的子map
ConcurrentNavigableMap<Long, String> confirmed =
outstandingConfirms.headMap(deliveryTag);
confirmed.clear();
} else {
outstandingConfirms.remove(deliveryTag);
}
System.out.println("确认的消息:" + deliveryTag);
},
//消息确认失败回调 1:消息标识;2:是否批量
(deliveryTag, multiple) -> {
String message = outstandingConfirms.get(deliveryTag);
System.out.println("未确认消息:" + message);
});
for (int i = 0; i < MESSAGE_COUNT; i++) {
String message = i + "";
channel.basicPublish("", queueName, null, message.getBytes(StandardCharsets.UTF_8));
//记录要发送的消息
outstandingConfirms.put(channel.getNextPublishSeqNo(), message);
}
long endTime = System.currentTimeMillis();
System.out.println("Cost Time is " + (endTime - startTime) + "ms");
}
第五章:RabbitMQ 交换机
最后
以上就是着急樱桃为你收集整理的RabbitMQ 发布确认4 发布确认的全部内容,希望文章能够帮你解决RabbitMQ 发布确认4 发布确认所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
发表评论 取消回复