概述
一、概念
1、原理
生产者将信道设置成confirm模式,一旦信道进入confirm模式,所有在该信道上面发布的消息都将会被指派一个唯一的ID(从1开始)。一旦消息被投递到所有匹配的队列之后,broker就会发送一个确认给生产者(包含消息的唯一ID),这就使得生产者知道消息已经正确到达目的队列,如果消息和队列是可持久化的,那么确认消息会在将写入磁盘后发出,broker回传给生产者的确认消息中delivery-tag域包含了确认消息的序列号,此外broker也可以设置basic.ack的multiple域,表示到这个序列号之前的所有消息都已经得到了处理。
confirm模式最大的好处在于他是异步的,一旦发布一条消息,生产者应用程序就可以在等信道返回确认的同时继续发送下一条消息,当消息最终得到确认之后,生产者应用便可以通过回调方法来处理该确认消息,如果RabbitMQ因为自身内部错误导致消息丢失,就会发送一条nack消息,生产者应用程序同样可以在回调方法中处理该nack消息。
2、目的是确保生产者的消息持久化。
持久化的三个要求:1、设置队列持久化;2、设置队列中的消息持久化;3、发布确认
二、具体的步骤
1、开启发布确认的方法
发布确认默认是没有开启的,如果需要开启调用的方法confirmSelect,每当需要使用发布确认的时候都需要在channel上调用该方法。
//获取信道
Channel channel = connection.createChannel();
//开启发布确认
channel.confirmSelect();
2、单个发布确认
他是一种同步确认发布的方式,发布一个消息后只有他被确认发布,后续的消息才能继续发布。waitForConfirmOrDie(long)这个方法只有在消息被确认的时候才返回,如果在指定时间范围内这个消息没有被确认那么它会抛出异常。
它最大的缺点:发布速度慢,因为没有确认发布的消息就会阻塞所有后续消息的发布,这种方式最多提供每秒数百条的发布消息吞吐量。
3、批量发布确认
发布一批消息然后批量确认,这种方式的缺点:当发生故障导致发布出现问题,不知道是哪个消息出现问题,必须将整个批量保存在内存中,以记录重要的信息而后在重新发布消息。这种方案任然是同步的,也会阻塞消息的发布。
4、异步确认发布
这种方式编程逻辑相对复杂,但是性价比最高,无论是可靠性还是效率,他是利用回调函数来达到消息可靠性的传递,这个中间件是通过函数的回调来保证是否投递成功。
5、三种方式的代码
public class ConfirmMsg {
public static void main(String[] args) {
ConfirmMsg confirmMsg = new ConfirmMsg();
//单个发布确认
System.out.printf("单个发布确认耗时:"+confirmMsg.singleConfirm()+"n");
//批量
System.out.printf("批量发布确认耗时:"+confirmMsg.batchConfirm()+"n");
//异步
System.out.printf("批量发布确认耗时:"+confirmMsg.asynchronizationConfirm()+"n");
}
//单个消息发布确认
public long singleConfirm(){
//创建链接
Connection connection = ConnectionUtil.getConnection();
try {
//创建信道
Channel channel = connection.createChannel();
//开启发布确认
channel.confirmSelect();
/**
* 生产队列
* 1、队列名称
* 2、队列消息是否持久化
* 3、队列消息是否资源共享
* 4、是否自动删除
* 5、其他参数
* */
channel.queueDeclare(workerOne.QUEUE_NAME,true,false,false,null);
long begin = System.currentTimeMillis();
//发送消息
for (int i = 0; i <100; i++) {
String msg=i+"";
//发消息
channel.basicPublish("",workerOne.QUEUE_NAME,null,msg.getBytes());
//确认
boolean b = channel.waitForConfirms();
// if(b){
// System.out.printf("消息发送成功:"+i+"n");
// }
}
return System.currentTimeMillis()-begin;
} catch (IOException | InterruptedException e) {
e.printStackTrace();
}
return 0;
}
//批量消息发布确认
public long batchConfirm(){
//创建链接
Connection connection = ConnectionUtil.getConnection();
try {
//创建信道
Channel channel = connection.createChannel();
//开启发布确认
channel.confirmSelect();
/**
* 生产队列
* 1、队列名称
* 2、队列消息是否持久化
* 3、队列消息是否资源共享
* 4、是否自动删除
* 5、其他参数
* */
channel.queueDeclare(workerOne.QUEUE_NAME,true,false,false,null);
long begin = System.currentTimeMillis();
//发送消息
//一次性发送消息的条数
int count=0;
for (int i = 0; i <100; i++) {
String msg=i+"";
count++;
//发消息
channel.basicPublish("",workerOne.QUEUE_NAME,null,msg.getBytes());
//确认
if(count==10){
channel.waitForConfirms();
count=0;
}
}
if(count>0){
channel.waitForConfirms();
}
return System.currentTimeMillis()-begin;
} catch (IOException | InterruptedException e) {
e.printStackTrace();
}
return 0;
}
//异步消息发布确认
public long asynchronizationConfirm(){
//创建链接
Connection connection = ConnectionUtil.getConnection();
try {
//创建信道
Channel channel = connection.createChannel();
//开启发布确认
channel.confirmSelect();
/**
* 生产队列
* 1、队列名称
* 2、队列消息是否持久化
* 3、队列消息是否资源共享
* 4、是否自动删除
* 5、其他参数
* */
channel.queueDeclare(workerOne.QUEUE_NAME,true,false,false,null);
//创建一个安全的map存发送的消息
ConcurrentSkipListMap<Long,String> map=new ConcurrentSkipListMap<>();
long begin = System.currentTimeMillis();
//监听器的参数;var1消息编号,var3是否批量确认
//成功
ConfirmCallback ackCallback=(var1,var3)->{
//成功删除消息
//如果是批量
if(var3){
ConcurrentNavigableMap<Long, String> longStringConcurrentNavigableMap = map.headMap(var1);
longStringConcurrentNavigableMap.clear();
}else {
map.remove(var1);
}
System.out.printf("消息接收成功"+var1+"n");
};
//失败
ConfirmCallback nackCallback=(var1,var3)->{
//失败的消息
String s = map.get(var1);
System.out.printf("消息接收失败"+var1+"n");
};
//准备消息的监听器械
channel.addConfirmListener(ackCallback,nackCallback);
//发送消息
for (int i = 0; i <100; i++) {
String msg=i+"";
//发消息
channel.basicPublish("",workerOne.QUEUE_NAME,null,msg.getBytes());
//消息存map,方便后续确认是否成功接收
map.put(channel.getNextPublishSeqNo(),msg);
}
return System.currentTimeMillis()-begin;
} catch (IOException e) {
e.printStackTrace();
}
return 0;
}
}
最后
以上就是痴情魔镜为你收集整理的RabbitMQ--发布确认一、概念二、具体的步骤的全部内容,希望文章能够帮你解决RabbitMQ--发布确认一、概念二、具体的步骤所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
发表评论 取消回复