概述
Rabbitmq的死信队列和延时队列
一、死信队列
死信队列其实和普通的队列没啥大的区别,都需要创建自己的Queue、Exchange,然后通过RoutingKey绑定到Exchange上去,只不过死信队列的RoutingKey和Exchange要作为参数,绑定到正常的队列上去,一种应用场景是正常队列里面的消息被basicNack或者reject时,消息就会被路由到正常队列绑定的死信队列中,还有一种还有常用的场景就是开启了自动签收,然后消费者消费消息时出现异常,超过了重试次数,那么这条消息也会进入死信队列,如果配置了话,当然还有其他的应用场景,这里不一一讨论。
1.1、死信队列和交换器配置
这里有两个队列,正常的业务队列emailQueue和与之绑定的死信队列,这里只演示,手动签收,消费者捕获异常Nack
1.1.2、yml配置
spring:
rabbitmq:
host: 192.168.99.12
port: 5672
username: guest
password: guest
# 发送确认
publisher-confirms: true
# 路由失败回调
publisher-returns: true
template:
# 必须设置成true 消息路由失败通知监听者,false 将消息丢弃
mandatory: true
listener:
simple:
# 每次从RabbitMQ获取的消息数量
prefetch: 1
default-requeue-rejected: false
# 每个队列启动的消费者数量
concurrency: 1
# 每个队列最大的消费者数量
max-concurrency: 1
# 签收模式为手动签收-那么需要在代码中手动ACK
acknowledge-mode: manual
#邮件队列
email:
queue:
name: demo.email
#邮件交换器名称
exchange:
name: demoTopicExchange
#死信队列
dead:
letter:
queue:
name: demo.dead.letter
exchange:
name: demoDeadLetterTopicExchange
1.1.3、死信队列配置
/**
-
rabbitmq 配置
-
@author DUCHONG
-
@since 2020-08-23 14:05
**/
@Configuration
@Slf4j
public class RabbitmqConfig {@Value(" e m a i l . q u e u e . n a m e " ) p r i v a t e S t r i n g e m a i l Q u e u e ; @ V a l u e ( " {email.queue.name}") private String emailQueue; @Value(" email.queue.name")privateStringemailQueue;@Value("{exchange.name}")
private String topicExchange;
@Value(" d e a d . l e t t e r . q u e u e . n a m e " ) p r i v a t e S t r i n g d e a d L e t t e r Q u e u e ; @ V a l u e ( " {dead.letter.queue.name}") private String deadLetterQueue; @Value(" dead.letter.queue.name")privateStringdeadLetterQueue;@Value("{dead.letter.exchange.name}")
private String deadLetterExchange;@Bean
public Queue emailQueue() {Map<String, Object> arguments = new HashMap<>(2); // 绑定死信交换机 arguments.put("x-dead-letter-exchange", deadLetterExchange); // 绑定死信的路由key arguments.put("x-dead-letter-routing-key", deadLetterQueue+".#"); return new Queue(emailQueue,true,false,false,arguments);
}
@Bean
TopicExchange emailExchange() {
return new TopicExchange(topicExchange);
}@Bean
Binding bindingEmailQueue() {
return BindingBuilder.bind(emailQueue()).to(emailExchange()).with(emailQueue+".#");
}//私信队列和交换器
@Bean
public Queue deadLetterQueue() {
return new Queue(deadLetterQueue);
}@Bean
TopicExchange deadLetterExchange() {
return new TopicExchange(deadLetterExchange);
}@Bean
Binding bindingDeadLetterQueue() {
return BindingBuilder.bind(deadLetterQueue()).to(deadLetterExchange()).with(deadLetterQueue+".#");
}
}
1.2、消息发送方
@Configuration
@EnableScheduling
@Slf4j
public class ScheduleController {
@Autowired
RabbitTemplate rabbitTemplate;
@Value("${exchange.name}")
private String topicExchange;
@Scheduled(cron = "0 0/2 * * * ?")
public void sendEmailMessage() {
String msg = RandomStringUtils.randomAlphanumeric(8);
JSONObject email=new JSONObject();
email.put("content",msg);
email.put("to","duchong@qq.com");
CorrelationData correlationData=new CorrelationData(UUID.randomUUID().toString());
rabbitTemplate.convertAndSend(topicExchange,"demo.email.x",email.toJSONString(),correlationData);
log.info("---发送 email 消息---{}---messageId---{}",email,correlationData.getId());
}
}
1.3、消息消费方
@Component
@Slf4j
public class MessageHandler {
/**
* 邮件消费者
* @param message
* @param channel
* @param headers
* @throws IOException
*/
@RabbitListener(queues =“demo.email”)
@RabbitHandler
public void handleEmailMessage(Message message, Channel channel, @Headers Map<String,Object> headers) throws IOException {
try {
String msg=new String(message.getBody(), CharEncoding.UTF_8);
JSONObject jsonObject = JSON.parseObject(msg);
jsonObject.put("messageId",headers.get("spring_returned_message_correlation"));
log.info("---接受到消息---{}",jsonObject);
//主动异常
int m=1/0;
//手动签收
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
}
catch (Exception e) {
log.info("handleEmailMessage捕获到异常,拒绝重新入队---消息ID---{}",headers.get("spring_returned_message_correlation"));
//异常,ture 重新入队,或者false,进入死信队列
channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,false);
}
}
/**
* 死信消费者,自动签收开启状态下,超过重试次数,或者手动签收,reject或者Nack
* @param message
*/
@RabbitListener(queues = "demo.dead.letter")
public void handleDeadLetterMessage(Message message, Channel channel,@Headers Map<String,Object> headers) throws IOException {
//可以考虑数据库记录,每次进来查数量,达到一定的数量,进行预警,人工介入处理
log.info("接收到死信消息:---{}---消息ID---{}", new String(message.getBody()),headers.get("spring_returned_message_correlation"));
//回复ack
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
}
}
1.4、结果
image-20200823185649078
二、延时队列
延时队列顾名思义,不是及时的队列,也就是发送者发给的消息要延时一段时间,消费者才能接受的到,这里有个典型的应用场景就是订单30分钟内未支付就关闭订单,当然死信队列也是可以实现的,这里只演示消息的延时消费逻辑,订单逻辑就一个判断,这里不做讨论。
2.1、延时队列和交换器配置
使用延时队列之前,需要先安装延时队列插件,安装方法,前面已经介绍过了,这里放个链接
延时队列插件安装
2.1.1、yml配置
spring:
rabbitmq:
host: 192.168.99.12
port: 5672
username: guest
password: guest
# 发送确认
publisher-confirms: true
# 路由失败回调
publisher-returns: true
template:
# 必须设置成true 消息路由失败通知监听者,false 将消息丢弃
mandatory: true
#消费端
listener:
simple:
# 每次从RabbitMQ获取的消息数量
prefetch: 1
default-requeue-rejected: false
# 每个队列启动的消费者数量
concurrency: 1
# 每个队列最大的消费者数量
max-concurrency: 1
# 签收模式为手动签收-那么需要在代码中手动ACK
acknowledge-mode: manual
#邮件队列
email:
queue:
name: demo.email
#邮件交换器名称
exchange:
name: demoTopicExchange
#死信队列
dead:
letter:
queue:
name: demo.dead.letter
exchange:
name: demoDeadLetterTopicExchange
#延时队列
delay:
queue:
name: demo.delay
exchange:
name: demoDelayTopicExchange
2.1.2、延时队列配置
/**
-
rabbitmq 配置
-
@author DUCHONG
-
@since 2020-08-23 14:05
**/
@Configuration
@Slf4j
public class RabbitmqConfig {@Value(" e m a i l . q u e u e . n a m e " ) p r i v a t e S t r i n g e m a i l Q u e u e ; @ V a l u e ( " {email.queue.name}") private String emailQueue; @Value(" email.queue.name")privateStringemailQueue;@Value("{exchange.name}")
private String topicExchange;
@Value(" d e a d . l e t t e r . q u e u e . n a m e " ) p r i v a t e S t r i n g d e a d L e t t e r Q u e u e ; @ V a l u e ( " {dead.letter.queue.name}") private String deadLetterQueue; @Value(" dead.letter.queue.name")privateStringdeadLetterQueue;@Value("{dead.letter.exchange.name}")
private String deadLetterExchange;
@Value(" d e l a y . q u e u e . n a m e " ) p r i v a t e S t r i n g d e l a y Q u e u e ; @ V a l u e ( " {delay.queue.name}") private String delayQueue; @Value(" delay.queue.name")privateStringdelayQueue;@Value("{delay.exchange.name}")
private String delayExchange;@Bean
public Queue emailQueue() {Map<String, Object> arguments = new HashMap<>(2); // 绑定死信交换机 arguments.put("x-dead-letter-exchange", deadLetterExchange); // 绑定死信的路由key arguments.put("x-dead-letter-routing-key", deadLetterQueue+".#"); return new Queue(emailQueue,true,false,false,arguments);
}
@Bean
TopicExchange emailExchange() {
return new TopicExchange(topicExchange);
}@Bean
Binding bindingEmailQueue() {
return BindingBuilder.bind(emailQueue()).to(emailExchange()).with(emailQueue+".#");
}//私信队列和交换器
@Bean
public Queue deadLetterQueue() {
return new Queue(deadLetterQueue);
}@Bean
TopicExchange deadLetterExchange() {
return new TopicExchange(deadLetterExchange);
}@Bean
Binding bindingDeadLetterQueue() {
return BindingBuilder.bind(deadLetterQueue()).to(deadLetterExchange()).with(deadLetterQueue+".#");
}
//延时队列
@Bean
public Queue delayQueue() {
return new Queue(delayQueue);
}@Bean
CustomExchange delayExchange() {
Map<String, Object> args = new HashMap<>();
args.put(“x-delayed-type”, “topic”);
//参数二为类型:必须是x-delayed-message
return new CustomExchange(delayExchange, “x-delayed-message”, true, false, args);}
@Bean
Binding bindingDelayQueue() {
return BindingBuilder.bind(delayQueue()).to(delayExchange()).with(delayQueue+".#").noargs();
}
}
2.2、消息发送方
30分钟时间太久了,这里延时2分钟来看效果
@Configuration
@EnableScheduling
@Slf4j
public class ScheduleController {
@Autowired
RabbitTemplate rabbitTemplate;
@Value("${exchange.name}")
private String topicExchange;
@Value("${delay.exchange.name}")
private String delayTopicExchange;
@Scheduled(cron = "0 0/1 * * * ?")
public void sendEmailMessage() {
String msg = RandomStringUtils.randomAlphanumeric(8);
JSONObject email=new JSONObject();
email.put("content",msg);
email.put("to","duchong@qq.com");
CorrelationData correlationData=new CorrelationData(UUID.randomUUID().toString());
rabbitTemplate.convertAndSend(topicExchange,"demo.email.x",email.toJSONString(),correlationData);
log.info("---发送 email 消息---{}---messageId---{}",email,correlationData.getId());
}
@Scheduled(cron = "0 0/1 * * * ?")
public void sendDelayOrderMessage() throws Exception{
//订单号 id实际是保存订单后返回的,这里用uuid代替
String orderId = UUID.randomUUID().toString();
// 模拟订单信息
JSONObject order=new JSONObject();
order.put("orderId",orderId);
order.put("goodsName","vip充值");
order.put("orderAmount","99.00");
CorrelationData correlationData=new CorrelationData(orderId);
MessageProperties messageProperties = new MessageProperties();
messageProperties.setMessageId(orderId);
//30分钟时间太长,这里延时120s消费
messageProperties.setHeader("x-delay", 120000);
Message message = new Message(order.toJSONString().getBytes(CharEncoding.UTF_8), messageProperties);
rabbitTemplate.convertAndSend(delayTopicExchange,"demo.delay.x",message,correlationData);
log.info("---发送 order 消息---{}---orderId---{}",order,correlationData.getId());
//睡一会,为了看延迟效果
TimeUnit.MINUTES.sleep(10);
}
}
2.3、消息消费方
@Component
@Slf4j
public class MessageHandler {
/**
* 邮件发送
* @param message
* @param channel
* @param headers
* @throws IOException
*/
@RabbitListener(queues ="demo.email")
@RabbitHandler
public void handleEmailMessage(Message message, Channel channel, @Headers Map<String,Object> headers) throws IOException {
try {
String msg=new String(message.getBody(), CharEncoding.UTF_8);
JSONObject jsonObject = JSON.parseObject(msg);
jsonObject.put("messageId",headers.get("spring_returned_message_correlation"));
log.info("---接受到消息---{}",jsonObject);
//主动异常
int m=1/0;
//手动签收
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
}
catch (Exception e) {
log.info("handleEmailMessage捕获到异常,拒绝重新入队---消息ID---{}", headers.get("spring_returned_message_correlation"));
//异常,ture 重新入队,或者false,进入死信队列
channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,false);
}
}
/**
* 死信消费者,自动签收开启状态下,超过重试次数,或者手动签收,reject或者Nack
* @param message
*/
@RabbitListener(queues = "demo.dead.letter")
public void handleDeadLetterMessage(Message message, Channel channel,@Headers Map<String,Object> headers) throws IOException {
//可以考虑数据库记录,每次进来查数量,达到一定的数量,进行预警,人工介入处理
log.info("接收到死信消息:---{}---消息ID---{}", new String(message.getBody()),headers.get("spring_returned_message_correlation"));
//回复ack
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
}
/**
* 延时队列消费
* @param message
* @param channel
* @param headers
* @throws IOException
*/
@RabbitListener(queues ="demo.delay")
@RabbitHandler
public void handleOrderDelayMessage(Message message, Channel channel, @Headers Map<String,Object> headers) throws IOException {
try {
String msg=new String(message.getBody(), CharEncoding.UTF_8);
JSONObject jsonObject = JSON.parseObject(msg);
log.info("---接受到订单消息---orderId---{}",message.getMessageProperties().getMessageId());
log.info("---订单信息---order---{}",jsonObject);
//业务逻辑,根据订单id获取订单信息,如果还未支付,设置关闭状态,如果已支付,不做任何处理
//手动签收
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
}
catch (Exception e) {
log.info("handleOrderDelayMessage捕获到异常,重新入队---orderId---{}", headers.get("spring_returned_message_correlation"));
//异常,ture 重新入队,或者false,进入死信队列
channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,true);
}
}
}
2.4、结果
运行结果显示,同一个订单号的消息,发送过后2分钟,消费者才接受到,符合预期。
最后
以上就是土豪板栗为你收集整理的Rabbitmq的死信队列和延时队列的全部内容,希望文章能够帮你解决Rabbitmq的死信队列和延时队列所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
发表评论 取消回复