我是靠谱客的博主 淡定悟空,最近开发中收集的这篇文章主要介绍RabbitMQ 定时消息处理场景,觉得挺不错的,现在分享给大家,希望可以做个参考。

概述

一、需求场景

需求:后台管理添加消息支持定时发送;

看到这个需求,我第一个想到的是查询用户消息时,只查询发送时间小于当前时间的消息;

这确实是一种解决方案,不过我这边的需求复杂一些,在手机通知中心中也能定时收到消息;【就是从手机顶部滑下来的消息】

这就要用到延迟队列了,延迟队列的实现有好几种,这里主要讲 rabbitMQ 的实现方式;


二、rabbitMQ 实现消息定时发送

具体流程:添加消息时如果发送时间大于当前时间,调用添加延迟队列的方法;

我们会先计算消息发送时间和当前时间的时间差,把这个时间差和这条消息封装成放入死信队列中,如果时间一到死信队列会根据路由键把消息发送到消息推送队列,我们有个监听消息推送队列的方法,当监听到有消息时处理相应的业务逻辑;

注:死信队列可以理解为放一些异常或无法立即被消费的消息;


三、代码

  1. 引入依赖;

    <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
    
  2. 配置文件;

    spring:
    rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: guest
    password: guest
    # 虚拟主机,可以用项目名
    virtual-host: demo
    # 开启发送方消息抵达broker确认回调
    publisher-confirm-type: correlated
    # 开启发送方消息抵达队列确认回调
    publisher-returns: true
    # 只要抵达队列,以异步发送优先回调returnConfirm
    template:
    mandatory: true
    # 手动ack消息,不使用默认消费端确认
    listener:
    simple:
    acknowledge-mode: manual
    
  3. 配置类:配置交换机,队列和路由键的关联关系;

    @Configuration
    public class MyMQConfig {
    /** 交换机 */
    public static final String NOTICE_PUSH_EXCHANGE = "notice_push_exchange";
    /** 死信队列 */
    public static final String NOTICE_DL_QUEUE = "notice_dl_queue";
    /** 死信路由键 */
    public static final String NOTICE_DL_KEY = "notice_dl_key";
    /** 消息推送队列,死信消息最终会放到这个队列消费 */
    public static final String NOTICE_PUSH_QUEUE = "notice_push_queue";
    /** 消息推送路由键 */
    public static final String NOTICE_PUSH_KEY = "notice_push_key";
    /**
    * 声明交换机
    */
    @Bean
    public Exchange noticePushExchange() {
    // 参数说明:交换机名字,是否持久化,是否自动删除,交换机参数
    return new DirectExchange(NOTICE_PUSH_EXCHANGE, true,false, null);
    }
    /**
    * 声明死信队列
    */
    @Bean
    public Queue deadLetterQueue() {
    Map<String, Object> args = new HashMap<>(2);
    // 声明死信交换机
    args.put("x-dead-letter-exchange", NOTICE_PUSH_EXCHANGE);
    // 转发路由键
    args.put("x-dead-letter-routing-key", NOTICE_PUSH_KEY);
    // 参数说明:队列名字,是否持久化,是否排它,是否自动删除,队列参数
    // 排它指的是只能被一个消费者连接使用
    return new Queue(NOTICE_DL_QUEUE, true, false, false, args);
    }
    /**
    * 声明消息推送队列
    */
    @Bean
    public Queue noticePushQueue() {
    return new Queue(NOTICE_PUSH_QUEUE, true, false, false, null);
    }
    /**
    * 死信队列,死信路由键和交换机的绑定
    */
    @Bean
    public Binding deadLetterBinding() {
    // 参数说明:绑定名称【队列名称/】,类型【队列/交换机】,绑定交换机名称,绑定路由键,绑定参数
    return new Binding(NOTICE_DL_QUEUE, Binding.DestinationType.QUEUE, NOTICE_PUSH_EXCHANGE, NOTICE_DL_KEY, null);
    }
    /**
    * 消息推送队列,消息推送路由键和交换机的绑定
    */
    @Bean
    public Binding noticePushBinding() {
    return new Binding(NOTICE_PUSH_QUEUE, Binding.DestinationType.QUEUE, NOTICE_PUSH_EXCHANGE, NOTICE_PUSH_KEY, null);
    }
    }
    
  4. 添加延迟消息的方法;

    public void addDelayNotice(Notice notice){
    // 发送时间和当前时间相差的毫秒数,DateUtil.betweenMs()是hutool工具包的方法
    long betweenMs = DateUtil.betweenMs(notice.getSendTime(), new Date());
    MessagePostProcessor messagePostProcessor = message -> {
    MessageProperties messageProperties = message.getMessageProperties();
    // 设置编码
    messageProperties.setContentEncoding("utf-8");
    // 设置过期时间,单位毫秒
    messageProperties.setExpiration(Long.toString(betweenMs));
    return message;
    };
    // 加入到死信队列
    rabbitTemplate.convertAndSend(MyMQConfig.NOTICE_PUSH_EXCHANGE, MyMQConfig.NOTICE_DL_KEY, notice, messagePostProcessor);
    }
    /**
    * 通知类
    */
    @Data
    public class Notice {
    // 主键id
    private Integer id;
    // 发送时间
    @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss",timezone = "GMT+8")
    private Date sendTime;
    // 其他属性
    }
    
  5. 监听消息推送队列;

    @Service
    @RabbitListener(queues = {MyMQConfig.NOTICE_PUSH_QUEUE})
    public class NoticePushListener {
    @SneakyThrows
    @RabbitHandler
    public void noticePush(Notice notice, Channel channel, Message message){
    try {
    // TODO 监听到消息推送队列传来的notice对象,调用推送的方法
    channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
    } catch (Exception e) {
    // 拒签,重新放回队列(可能自身服务问题报错等原因)
    channel.basicReject(message.getMessageProperties().getDeliveryTag(),true);
    }
    }
    }
    

四、消息重复消费问题

当我们修改还没发送的消息时也会将消息加到死信队列,这时有个消息重复的问题;

在项目中我是这样处理的,在发送消息的方法中比较数据库的发送时间和消息推送队列中Notice对象的发送时间是否一样,如果不一样就不发消息;也可以加个版本号字段判断,每次修改时版本号都加1;

最后

以上就是淡定悟空为你收集整理的RabbitMQ 定时消息处理场景的全部内容,希望文章能够帮你解决RabbitMQ 定时消息处理场景所遇到的程序开发问题。

如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(42)

评论列表共有 0 条评论

立即
投稿
返回
顶部