概述
1、发布确认的原理
生产者将信道设置成confirm模式,所有在该信道上面发布的消息都将会被指派一个唯一的ID(从1开始),
一旦消息被投送到匹配的队列之后,broker就会发送一个确认消息给生产者(包含消息唯一的ID),
如果消息和队列是可持久化的,那么确认消息将在写入磁盘之后发送确认消息给生产者
confirm模式最大的好处就是异步的,可以一边等待消息的返回,同时可继续发布下一条消息。
当消息最终得到确认之后,生产者应用便可以通过回调方法来处理该确认消息,
如果 RabbitMQ 因为自身内部错误导致消息丢失,就会发送一条 nack 消息,
生产者应用程序同样可以在回调方法中处理该 nack 消息。
//开启发布确认的方法
channel.confirmSelect();
2、单个消息发布确认发布
这是简单的发布确认方式,也是一种同步确认发布的方式,也就是只要一个消息发布得到确认后,后续的消息才能继续发布;
waitForConfirmsOrDie(long)这个方法只有在消息被确认的时候才返回,如果在指定时间范围内这个消息没有被确认那么它将抛出异常
import com.example.wwy.rabbit.units.ChannelUnit;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.MessageProperties;
//发送消息
public class Task1 {
public static final String QUEUE_NAME="tsak1";
public static void main(String[] args) throws Exception {
Channel channel = ChannelUnit.getChannel();
//开启发布确认的方法
channel.confirmSelect();
/**
* 1.队列名称
* 2.是否持久化,true持久化
* 3.是否共享
* 4.是否自动删除
* 5.其它参数
*/
boolean durable =true;
channel.queueDeclare(QUEUE_NAME,durable,false,false,null);
System.out.println("正在准备发送消息。。。");
Scanner scanner=new Scanner(System.in);
while (scanner.hasNext()){
String message=scanner.next();
/**
* 1.发送到那个交换机
* 2.队列名称
* 3.其他参数 消息持久化参数:MessageProperties.PERSISTENT_TEXT_PLAIN
* 4.要发送的消息体
*/
channel.basicPublish("",QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes(StandardCharsets.UTF_8));
//服务端返回false或者超时,可以设置消息重发
boolean b = channel.waitForConfirms();
if(b) {
System.out.println("发送消息【" + message + "】完毕");
}
}
}
}
这种确认的方式有个最大的缺点:发布的速度很慢,而且还可能阻塞后续的消息发布。
3、批量确认发布
与单个消息确认相比,先发布一批消息后在一起确认,这样提高了系统的吞吐量,
缺点:如果出现问题,不知道是那个消息, 这种方案也是同步的。
import com.example.wwy.rabbit.units.ChannelUnit;
import com.rabbitmq.client.Channel;
import java.util.UUID;
public class Task2 {
public static void main(String[] args) throws Exception {
Channel channel = ChannelUnit.getChannel();
String queueName= UUID.randomUUID().toString();
/**
* 1.队列名称
* 2.是否持久化,true持久化
* 3.是否共享
* 4.是否自动删除
* 5.其它参数
*/
channel.queueDeclare(queueName,false,false,false,null);
//开启发布确认
channel.confirmSelect();
//批量确认消息的大小
int batchSize=100;
//未确认消息个数
int noaskCount=1000;
for(int i=1;i<=1000;i++){
String message=i+"";
channel.basicPublish("",queueName,null,message.getBytes());
//未确认消息个数减1
noaskCount--;
//每100个消息,确认一次
if(i%batchSize==0){
channel.waitForConfirms();
}
System.out.println("发布消息【"+message+"】");
}
}
}
4、异步确认发布
import com.example.wwy.rabbit.units.ChannelUnit;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConfirmCallback;
import java.util.UUID;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
//异步确认
public class Task3 {
public static void main(String[] args) throws Exception {
Channel channel = ChannelUnit.getChannel();
String queueName= UUID.randomUUID().toString();
/**
* 1.队列名称
* 2.是否持久化,true持久化
* 3.是否共享
* 4.是否自动删除
* 5.其它参数
*/
channel.queueDeclare(queueName,false,false,false,null);
//开启发布确认
channel.confirmSelect();
//线程安全有序的一个哈希表,适用于高并发的情况
ConcurrentSkipListMap<Long,String> concurrentSkipListMap=new ConcurrentSkipListMap<>();
/**
* 确认收到消息的回调
* 1.消息序列号
* 2.true可以确认小于或等于该序列号的消息
* false只确认当前序列号的消息
*/
ConfirmCallback askCallback=(sequenceNumber, multiple)->{
if(multiple){
//返回的是小于或等于该序列号的消息
ConcurrentNavigableMap<Long, String> longStringConcurrentNavigableMap =
concurrentSkipListMap.headMap(sequenceNumber, true);
Set<Long> longs = longStringConcurrentNavigableMap.keySet();
for(Long key:longs){
System.out.println("发布的消息【"+longStringConcurrentNavigableMap.get(key)+"】被A确认,序列号"+key);
}
//清除该部分未确认消息
longStringConcurrentNavigableMap.clear();
}else{
String message=concurrentSkipListMap.get(sequenceNumber);
System.out.println("发布的消息【"+message+"】被B确认,序列号"+sequenceNumber);
//只清除当前序列号的消息
concurrentSkipListMap.remove(sequenceNumber);
}
};
//未确认消息的回调
ConfirmCallback nackCallback = (sequenceNumber, multiple) ->{
String message = concurrentSkipListMap.get(sequenceNumber);
System.out.println("发布的消息"+message+"未被确认,序列号"+sequenceNumber);
};
/**
*
添加一个异步确认的监听器
* 1.确认收到消息的回调
* 2.未收到消息的回调
*/
channel.addConfirmListener(askCallback,nackCallback);
for (int i = 0; i < 1000; i++) {
String message = "消息" + i;
/**
* channel.getNextPublishSeqNo()获取下一个消息的序列号
* 通过序列号与消息体进行一个关联
* 全部都是未确认的消息体
*/
concurrentSkipListMap.put(channel.getNextPublishSeqNo(), message);
channel.basicPublish("", queueName, null, message.getBytes());
System.out.println("确认的消息是【"+message+"】");
}
}
}
单条发布消息
同步等待确认,简单,但吞吐量非常有限。
批量发布消息
批量同步等待确认,简单,合理的吞吐量,一旦出现问题但很难推断出是那条消息出现了问题。
异步处理:
最佳性能和资源使用,在出现错误的情况下可以很好地控制,但是实现起来稍微难些
5、发布确认与 springboot 整合
5.1、在配置文件中添加
- none:禁用发布确认模式(默认)
- correlated:发布消息成功后,会触发交换机调用回调;
- simple:经测试有两种效果,其一效果和correlated值一样会触发回调方法,
其二在发布消息成功后使用rabbitTemplate调用waitForConfirms或waitForConfirmsOrDie方法等待broker节点返回发送结果,根据返回结果来判定下一步的逻辑,要注意的点是waitForConfirmsOrDie方法如果返回false则会关闭channel,则接下来无法发送消息到broker;
5.2、设置correlated一个简单的demo
假设交换机出现问题了
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
//配置类,设置了交换机和队列,以及绑定信息
@Configuration
public class ConfirmConfig {
public static final String EXCHANGE_NAME="confirm_exchange";
public static final String QUEUE_NAME="confirm_queue";
//配置交换机
@Bean
public DirectExchange getConfirmExchange(){
return new DirectExchange(EXCHANGE_NAME);
}
//配置队列
@Bean
public Queue getConfirmQueue(){
return new Queue(QUEUE_NAME);
}
//声明队列与交换机之间的绑定
@Bean
public Binding getConfirmBinding(@Qualifier("getConfirmExchange") DirectExchange exchange,
@Qualifier("getConfirmQueue") Queue queue){
return BindingBuilder.bind(queue).to(exchange).with("key1");
}
}
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;
//交换机是否收到消息的回调
@Slf4j
@Component
public class MyCallBack implements RabbitTemplate.ConfirmCallback {
/**
*
* @param correlationData 消息相关信息
* @param b 是否收到消息
* @param s 未收到消息的异常信息
*/
@Override
public void confirm(CorrelationData correlationData, boolean b, String s) {
String id=correlationData!=null?correlationData.getId():"";
if(b){
log.info("交换机收到ID为{}的消息",id);
}else{
log.info("交换机没有收到ID为{}的消息,原因是{}",id,s);
}
}
}
import com.example.wwy.comfirm.MyCallBack;
import com.example.wwy.config.ConfirmConfig;
import com.example.wwy.rabbit.units.DateUtile;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.PostConstruct;
//发送者消息
@Slf4j
@RestController
@RequestMapping(value = "/confirm")
public class ConFirmController {
@Autowired
private RabbitTemplate rabbitTemplate;
@Autowired
private MyCallBack myCallBack;
//初始化的时候设置回调方法
@PostConstruct
public void init(){
rabbitTemplate.setConfirmCallback(myCallBack);
}
@GetMapping(value = "/sendConfirmMsg/{msg}")
public void sendConfirmMsg(@PathVariable String msg){
CorrelationData correlationData1=new CorrelationData("1");
String key="key1";
//发送一条消息到正确的交换机
rabbitTemplate.convertAndSend(ConfirmConfig.EXCHANGE_NAME,key,msg+1,correlationData1);
log.info("【{}】发送第一条消息:{}", DateUtile.getDateToString(),msg+1);
CorrelationData correlationData2=new CorrelationData("2");
//发送一条消息到错误的交换机
rabbitTemplate.convertAndSend(ConfirmConfig.EXCHANGE_NAME+"abc",key,msg+2,correlationData2);
log.info("【{}】发送第二条消息:{}", DateUtile.getDateToString(),msg+2);
}
}
import com.example.wwy.config.ConfirmConfig;
import com.example.wwy.rabbit.units.DateUtile;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.UnsupportedEncodingException;
//消费者信息
@Slf4j
@Component
public class ConfirmConsumer1 {
@RabbitListener(queues=ConfirmConfig.QUEUE_NAME)
public void confirmC1(Message message) throws UnsupportedEncodingException {
String msg=new String(message.getBody(),"UTF-8");
log.info("【{}】接受到的消息是:{}", DateUtile.getDateToString(),msg);
}
}
5.3、消息回退 Mandatory 参数
场景:交换机接受到消息之后,就会发送到确认消息给生产者,如果发现该消息不可路由,那么该消息就会被丢弃掉,此时生产者还一无所知。
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ReturnedMessage;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;
import java.io.UnsupportedEncodingException;
@Slf4j
@Component
public class MyCallBack implements RabbitTemplate.ConfirmCallback ,RabbitTemplate.ReturnsCallback{
/**
*
* @param correlationData 消息相关信息
* @param b 是否收到消息
* @param s 未收到消息的异常信息
*/
@Override
public void confirm(CorrelationData correlationData, boolean b, String s) {
String id=correlationData!=null?correlationData.getId():"";
if(b){
log.info("交换机收到ID为{}的消息",id);
}else{
log.info("交换机没有收到ID为{}的消息,原因是{}",id,s);
}
}
//消费未送达队列,回调调用的方法
@Override
public void returnedMessage(ReturnedMessage returnedMessage) {
try {
log.info("消息:{}被服务器退回,退回的原因是:{},交换机是{},路由key是{},异常编号是{}",
new String(returnedMessage.getMessage().getBody(),"utf-8"),
returnedMessage.getReplyText(),
returnedMessage.getExchange(),
returnedMessage.getRoutingKey(),
returnedMessage.getReplyCode()
);
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
}
}
//在生产者增加如下配置
@PostConstruct
public void init(){
rabbitTemplate.setConfirmCallback(myCallBack);
//设置交换机无法送达时,是否发送消息给生产者,true发送
rabbitTemplate.setMandatory(true);
//设置回退消息给谁处理
rabbitTemplate.setReturnsCallback(myCallBack);
}
最后
以上就是优雅橘子为你收集整理的RabbitMQ的发布确认机制的全部内容,希望文章能够帮你解决RabbitMQ的发布确认机制所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
发表评论 取消回复