我是靠谱客的博主 眯眯眼康乃馨,最近开发中收集的这篇文章主要介绍springboot集成rabbitmq 延迟队列,觉得挺不错的,现在分享给大家,希望可以做个参考。

概述

源码

  1. 引入开发包
    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <parent>
            <artifactId>springboot-integrate</artifactId>
            <groupId>springboot-integrate</groupId>
            <version>1.0-SNAPSHOT</version>
        </parent>
        <modelVersion>4.0.0</modelVersion>
    
        <artifactId>springboot-rabbitmq</artifactId>
    
        <dependencies>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-web</artifactId>
            </dependency>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-amqp</artifactId>
            </dependency>
        </dependencies>
    
        <build>
            <plugins>
                <plugin>
                    <groupId>org.springframework.boot</groupId>
                    <artifactId>spring-boot-maven-plugin</artifactId>
                    <configuration>
                        <mainClass>com.integrate.rabbitmq.RabbitmqApplication</mainClass>
                    </configuration>
                </plugin>
            </plugins>
        </build>
    </project>
    
  2. 将 rabbitmq 配置信息写入 application.yml 文件,springBoot会创建一个 RabbitTemplate 实例
    spring:
      application:
        name: springboot-rabbitmq
      rabbitmq:
        host: 52.83.239.30
        port: 5672
        username: root
        password: yunduan2019
        # 开启发送确认
        publisher-confirms: true
        # 开启发送失败退回
        publisher-returns: true
        # 开启ACK
        listener:
          direct:
            acknowledge-mode: manual
          simple:
            acknowledge-mode: manual
        template:
          mandatory: true
    server:
      port: 10002
    
  3. 创建几个队列和交换机,并将队列绑定至交换机
    package com.integrate.rabbitmq.config;
    
    import org.springframework.amqp.core.*;
    import org.springframework.beans.factory.annotation.Qualifier;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    import java.util.HashMap;
    import java.util.Map;
    
    /**
     * @author 刘志强
     * @date 2020/8/18 14:07
     */
    @Configuration
    public class RabbitmqConfig {
    
        /**
         * 创建一个队列
         * @return
         */
        @Bean(name="queue")
        public Queue queue() {
            return new Queue("queue");
        }
    
        @Bean(name="memberQueue")
        public Queue memberQueue() {
            return new Queue("memberQueue");
        }
    
    
    
        /**
         * 创建路由交换机 根据路由匹配转发消息给队列
         * @return
         */
        @Bean(name = "exchange")
        public TopicExchange exchange() {
            return new TopicExchange("exchange");
        }
    
    
        /**
         * 配置交换机(广播) 转发消息给旗下所有队列
         * @return
         */
        @Bean(name = "fanoutExchange")
        FanoutExchange fanoutExchange() {
            return new FanoutExchange("fanoutExchange");
        }
    
    
    
        /**
         * 将队列进绑定至路由交换机并设置路由键
         * 交换机会将消息传递给 满足路由键的队列
         * @param queue
         * @param exchange
         * @return
         */
        @Bean
        Binding bindingExchangeQueue(@Qualifier("queue") Queue queue, @Qualifier("exchange") TopicExchange exchange) {
            return BindingBuilder.bind(queue).to(exchange).with("exchange.queue");
        }
    
        @Bean
        Binding bindingExchangeMemberQueue(@Qualifier("memberQueue") Queue memberQueue, @Qualifier("exchange") TopicExchange exchange) {
            // *表示一个词,#表示零个或多个词
            return BindingBuilder.bind(memberQueue).to(exchange).with("exchange.*");
        }
    
        /**
         * 将队列绑定至广播交换机
         * 因为不绑定路由键 所以交换机会把消息传递给被绑定的所由队列 广播交换机无法设置路由键。因为消息会发给旗下的所有队列
         * @param queue
         * @param fanoutExchange
         * @return
         */
        @Bean
        Binding bindingFanoutExchangeQueueTwo(@Qualifier("queue") Queue queue, @Qualifier("fanoutExchange") FanoutExchange fanoutExchange) {
            return BindingBuilder.bind(queue).to(fanoutExchange);
        }
        @Bean
        Binding bindingFanoutExchangeMemberQueueTwo(@Qualifier("memberQueue") Queue memberQueue, @Qualifier("fanoutExchange") FanoutExchange fanoutExchange) {
            return BindingBuilder.bind(memberQueue).to(fanoutExchange);
        }
    }
    
    
  4. 创建订阅类,用于订阅队列
    package com.integrate.rabbitmq.consumer;
    
    
    import com.rabbitmq.client.Channel;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component;
    
    import java.io.IOException;
    
    /**
     * 消息的标识,false只确认当前一个消息收到,true确认所有consumer获得的消息
     * channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
     * ack返回false,并重新回到队列,api里面解释得很清楚
     * channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
     * 拒绝消息
     * channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);
     * 丢弃这条消息
     * channel.basicNack(message.getMessageProperties().getDeliveryTag(), false,false);
     * @author 刘志强
     * @date 2020/11/13 13:45
     */
    @Component
    @Slf4j
    public class RabbitConsumer {
    
        /**
         * 订阅queue队列 消费消息
         * @param object
         */
        @RabbitListener(queues="queue")
        public void consumerQueue(Channel channel, Object object, Message message) throws IOException {
            log.info("consumerQueue 消费来自queue队列消息,消息内容为" + object);
            // 告诉Rabbit已消费,从队列中删除
            channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
        }
    
        /**
         * 订阅queue队列 不消费消息,并将消息返回队列
         * @param object
         */
        @RabbitListener(queues="queue")
        public void noConsumerQueue(Channel channel, Object object, Message message) throws IOException {
            log.info("noConsumerQueue 消费来自queue队列消息,消息内容为" + object);
            // 不消费并返回队列
            channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
    
        }
    
    
        /**
         * 订阅memberQueue队列
         * @param object
         */
        @RabbitListener(queues="memberQueue")
        public void consumerMemberQueue(Channel channel, Object object, Message message) throws IOException {
            log.info("consumerMemberQueue 消费memberQueue队列消息,消息内容为" + object);
            // 告诉Rabbit已消费,从队列中删除
            channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
        }
    
    }
    
  5. 创建生产类 用于发送消息,并设置发送成功回调
    package com.integrate.rabbitmq.porducer;
    
    import lombok.extern.slf4j.Slf4j;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.core.MessagePostProcessor;
    import org.springframework.amqp.rabbit.connection.CorrelationData;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Component;
    
    import java.util.UUID;
    
    /**
     * @author 刘志强
     * @date 2020/8/18 14:47
     */
    @Component
    @Slf4j
    public class AckSender implements  RabbitTemplate.ConfirmCallback , RabbitTemplate.ReturnCallback{
    
        @Autowired
        private RabbitTemplate rabbitTemplate;
    
        /**
         * 指定队列发送
         * @param queue
         * @param content
         * @return
         */
        public CorrelationData convertAndSend(String queue, Object content) {
            //设置回调对象
            rabbitTemplate.setConfirmCallback(this);
            rabbitTemplate.setReturnCallback(this);
            //构建回调返回的数据
            CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
            rabbitTemplate.convertAndSend(queue,content,correlationData);
            return correlationData;
        }
    
        /**
         * 指定交换机 和 路由建 发送
         * @param exchange
         * @param routingKey
         * @param content
         * @return
         */
        public CorrelationData convertAndSend(String exchange, String routingKey, Object content) {
            //设置回调对象
            rabbitTemplate.setConfirmCallback(this);
            rabbitTemplate.setReturnCallback(this);
            //构建回调返回的数据
            CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
            rabbitTemplate.convertAndSend(exchange,routingKey, content,correlationData);
            return correlationData;
        }
    
        /**
         * 消息回调确认方法
         * 如果消息没有到exchange,则confirm回调,ack=false
         * 如果消息到达exchange,则confirm回调,ack=true
         * @param
         */
        @Override
        public void confirm(CorrelationData correlationData, boolean isSendSuccess, String s) {
            log.info("confirm--message:回调消息ID为: " + correlationData.getId());
            if (isSendSuccess) {
                log.info("消息进入交换机成功");
            } else {
                log.info("消息进入交换机失败====" + s);
            }
        }
    
        /**
         * exchange到queue成功,则不回调return
         * exchange到queue失败,则回调return(需设置mandatory=true,否则不回回调,消息就丢了)
         */
        @Override
        public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
            StringBuilder str = new StringBuilder();
            str.append("return--message:").append(message.getBody())
                    .append(",replyCode:").append(replyCode).append(",replyText:").append(replyText).append(",exchange:").append(exchange)
                    .append(",routingKey:").append(routingKey);
            log.info(String.valueOf(str));
        }
    }
    
    
  6. 测试
    @Autowired
    private AckSender ackSender;
    
    ackSender.convertAndSend("queue", msg);
    
    ackSender.convertAndSend("exchange", "exchange.queue", msg);

延迟消息(实现定时需求)

  1. 基于TTL(消息存活时间) 和 x-dead-letter-exchange(死信)
  2. 意思就是 消息在死亡后,会将此消息发送至死信交换机。死信交换机在发送至(x-dead-letter-routing-key)死信路由的下的死信队列(死信交换机和死信队列也是普通的交换机和队列)
  3. 开始操作
    1. 创建 延迟队列 并绑定死信交换机 及 死信路由

          /**
          * 创建一个队列作为延迟队列
          * @return
          */
          @Bean(name="delayQueue")
          public Queue delayQueue() {
              Map<String, Object> args = new HashMap<>(2);
              // x-dead-letter-exchange    将队列绑定至交换机
              args.put("x-dead-letter-exchange", "deathExchange");
              // x-dead-letter-routing-key  当消息死亡时,转发给delayExchange交换机的路由
              args.put("x-dead-letter-routing-key", "death-route");
              return new Queue("delayQueue",true, false, false,args);
          }       
      
    2. 创建一个交换机作为 死信交换机,延迟队列里死亡的消息将像此交换机发送

          /**
           * 创建一个交换机 用于绑定死信队列
           * @return
           */
          @Bean(name = "deathExchange")
          public TopicExchange deathExchange() {
              return new TopicExchange("deathExchange");
          }
      
    3. 创建一个队列 作为死信队列。延迟队列里死亡的消息将通过 交换机发送至此队列

          /**
           * 创建一个队列作为死信队列
           * @return
           */
          @Bean(name="deathQueue")
          public Queue deathQueue() {
              return new Queue("deathQueue");
          }
      
    4. 将死信队列绑定至死信交换机

          /**
           * 将死信列绑定至交换机 并设置路由健
           * @param queue
           * @param exchange
           * @return
           */
          @Bean
          Binding deathExchangeDeathQueue(@Qualifier("deathQueue") Queue queue, @Qualifier("deathExchange") TopicExchange exchange) {
              return BindingBuilder.bind(queue).to(exchange).with("death-route");
          }
      
    5. 创建 发送消息 配置失效时间

      1. 自定义消息处理器追加 消息信息 MessagePostProcessor
        package com.integrate.rabbitmq.porducer;
        
        import org.springframework.amqp.AmqpException;
        import org.springframework.amqp.core.Message;
        import org.springframework.amqp.core.MessagePostProcessor;
        
        /**
         * @author 刘志强
         * @date 2020/11/24 16:40
         */
        public class MyMessagePostProcessor implements MessagePostProcessor {
        
            private final Long ttl;
        
            public MyMessagePostProcessor(final Long ttl) {
                this.ttl = ttl;
            }
        
            @Override
            public Message postProcessMessage(final Message message) throws AmqpException {
                // 设置消息失效时间
                message.getMessageProperties().setExpiration(ttl.toString());
                return message;
            }
        }
        
      2. 创建发送定时过期消息工具方法
            public CorrelationData convertAndSendDelay(String queue, Object content, Long time) {
                //设置回调对象
                rabbitTemplate.setConfirmCallback(this);
                rabbitTemplate.setReturnCallback(this);
                //构建回调返回的数据
                CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
                MessagePostProcessor messagePostProcessor = new MyMessagePostProcessor(time);
                rabbitTemplate.convertAndSend(queue, content,messagePostProcessor,correlationData);
                return correlationData;
            }
        
    6. 订阅死信队列

          /**
           * 订阅死信队列,当延迟队列的消息死亡时。消息会进入死信队列
           * @param channel
           * @param object
           * @param message
           * @throws IOException
           */
          @RabbitListener(queues="deathQueue")
          public void deathQueue(Channel channel, Object object, Message message) throws IOException {
              log.info("deathQueue 消费deathQueue队列消息,消息内容为" + object);
              // 告诉Rabbit已消费,从队列中删除
              channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
          }
      
    7. 测试

          /**
           * 像队列发送消息并设置 TTL(过期事件)
           * @param msg
           * @return
           */
          @GetMapping("convertAndSendDelay")
          public String convertAndSendDelay(String msg,Long ttl) {
              ackSender.convertAndSendDelay("delayQueue",msg,ttl);
              return "已发送";
          }
      

最后

以上就是眯眯眼康乃馨为你收集整理的springboot集成rabbitmq 延迟队列的全部内容,希望文章能够帮你解决springboot集成rabbitmq 延迟队列所遇到的程序开发问题。

如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(47)

评论列表共有 0 条评论

立即
投稿
返回
顶部