概述
一、需求场景
需求:后台管理添加消息支持定时发送;
看到这个需求,我第一个想到的是查询用户消息时,只查询发送时间小于当前时间的消息;
这确实是一种解决方案,不过我这边的需求复杂一些,在手机通知中心中也能定时收到消息;【就是从手机顶部滑下来的消息】
这就要用到延迟队列了,延迟队列的实现有好几种,这里主要讲 rabbitMQ 的实现方式;
二、rabbitMQ 实现消息定时发送
具体流程:添加消息时如果发送时间大于当前时间,调用添加延迟队列的方法;
我们会先计算消息发送时间和当前时间的时间差,把这个时间差和这条消息封装成放入死信队列中,如果时间一到死信队列会根据路由键把消息发送到消息推送队列,我们有个监听消息推送队列的方法,当监听到有消息时处理相应的业务逻辑;
注:死信队列可以理解为放一些异常或无法立即被消费的消息;
三、代码
-
引入依赖;
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
-
配置文件;
spring: rabbitmq: host: 127.0.0.1 port: 5672 username: guest password: guest # 虚拟主机,可以用项目名 virtual-host: demo # 开启发送方消息抵达broker确认回调 publisher-confirm-type: correlated # 开启发送方消息抵达队列确认回调 publisher-returns: true # 只要抵达队列,以异步发送优先回调returnConfirm template: mandatory: true # 手动ack消息,不使用默认消费端确认 listener: simple: acknowledge-mode: manual
-
配置类:配置交换机,队列和路由键的关联关系;
@Configuration public class MyMQConfig { /** 交换机 */ public static final String NOTICE_PUSH_EXCHANGE = "notice_push_exchange"; /** 死信队列 */ public static final String NOTICE_DL_QUEUE = "notice_dl_queue"; /** 死信路由键 */ public static final String NOTICE_DL_KEY = "notice_dl_key"; /** 消息推送队列,死信消息最终会放到这个队列消费 */ public static final String NOTICE_PUSH_QUEUE = "notice_push_queue"; /** 消息推送路由键 */ public static final String NOTICE_PUSH_KEY = "notice_push_key"; /** * 声明交换机 */ @Bean public Exchange noticePushExchange() { // 参数说明:交换机名字,是否持久化,是否自动删除,交换机参数 return new DirectExchange(NOTICE_PUSH_EXCHANGE, true,false, null); } /** * 声明死信队列 */ @Bean public Queue deadLetterQueue() { Map<String, Object> args = new HashMap<>(2); // 声明死信交换机 args.put("x-dead-letter-exchange", NOTICE_PUSH_EXCHANGE); // 转发路由键 args.put("x-dead-letter-routing-key", NOTICE_PUSH_KEY); // 参数说明:队列名字,是否持久化,是否排它,是否自动删除,队列参数 // 排它指的是只能被一个消费者连接使用 return new Queue(NOTICE_DL_QUEUE, true, false, false, args); } /** * 声明消息推送队列 */ @Bean public Queue noticePushQueue() { return new Queue(NOTICE_PUSH_QUEUE, true, false, false, null); } /** * 死信队列,死信路由键和交换机的绑定 */ @Bean public Binding deadLetterBinding() { // 参数说明:绑定名称【队列名称/】,类型【队列/交换机】,绑定交换机名称,绑定路由键,绑定参数 return new Binding(NOTICE_DL_QUEUE, Binding.DestinationType.QUEUE, NOTICE_PUSH_EXCHANGE, NOTICE_DL_KEY, null); } /** * 消息推送队列,消息推送路由键和交换机的绑定 */ @Bean public Binding noticePushBinding() { return new Binding(NOTICE_PUSH_QUEUE, Binding.DestinationType.QUEUE, NOTICE_PUSH_EXCHANGE, NOTICE_PUSH_KEY, null); } }
-
添加延迟消息的方法;
public void addDelayNotice(Notice notice){ // 发送时间和当前时间相差的毫秒数,DateUtil.betweenMs()是hutool工具包的方法 long betweenMs = DateUtil.betweenMs(notice.getSendTime(), new Date()); MessagePostProcessor messagePostProcessor = message -> { MessageProperties messageProperties = message.getMessageProperties(); // 设置编码 messageProperties.setContentEncoding("utf-8"); // 设置过期时间,单位毫秒 messageProperties.setExpiration(Long.toString(betweenMs)); return message; }; // 加入到死信队列 rabbitTemplate.convertAndSend(MyMQConfig.NOTICE_PUSH_EXCHANGE, MyMQConfig.NOTICE_DL_KEY, notice, messagePostProcessor); } /** * 通知类 */ @Data public class Notice { // 主键id private Integer id; // 发送时间 @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss",timezone = "GMT+8") private Date sendTime; // 其他属性 }
-
监听消息推送队列;
@Service @RabbitListener(queues = {MyMQConfig.NOTICE_PUSH_QUEUE}) public class NoticePushListener { @SneakyThrows @RabbitHandler public void noticePush(Notice notice, Channel channel, Message message){ try { // TODO 监听到消息推送队列传来的notice对象,调用推送的方法 channel.basicAck(message.getMessageProperties().getDeliveryTag(),false); } catch (Exception e) { // 拒签,重新放回队列(可能自身服务问题报错等原因) channel.basicReject(message.getMessageProperties().getDeliveryTag(),true); } } }
四、消息重复消费问题
当我们修改还没发送的消息时也会将消息加到死信队列,这时有个消息重复的问题;
在项目中我是这样处理的,在发送消息的方法中比较数据库的发送时间和消息推送队列中Notice对象的发送时间是否一样,如果不一样就不发消息;也可以加个版本号字段判断,每次修改时版本号都加1;
最后
以上就是淡定悟空为你收集整理的RabbitMQ 定时消息处理场景的全部内容,希望文章能够帮你解决RabbitMQ 定时消息处理场景所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
发表评论 取消回复