概述
源码
- 引入开发包
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <parent> <artifactId>springboot-integrate</artifactId> <groupId>springboot-integrate</groupId> <version>1.0-SNAPSHOT</version> </parent> <modelVersion>4.0.0</modelVersion> <artifactId>springboot-rabbitmq</artifactId> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> <configuration> <mainClass>com.integrate.rabbitmq.RabbitmqApplication</mainClass> </configuration> </plugin> </plugins> </build> </project>
- 将 rabbitmq 配置信息写入 application.yml 文件,springBoot会创建一个 RabbitTemplate 实例
spring: application: name: springboot-rabbitmq rabbitmq: host: 52.83.239.30 port: 5672 username: root password: yunduan2019 # 开启发送确认 publisher-confirms: true # 开启发送失败退回 publisher-returns: true # 开启ACK listener: direct: acknowledge-mode: manual simple: acknowledge-mode: manual template: mandatory: true server: port: 10002
- 创建几个队列和交换机,并将队列绑定至交换机
package com.integrate.rabbitmq.config; import org.springframework.amqp.core.*; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import java.util.HashMap; import java.util.Map; /** * @author 刘志强 * @date 2020/8/18 14:07 */ @Configuration public class RabbitmqConfig { /** * 创建一个队列 * @return */ @Bean(name="queue") public Queue queue() { return new Queue("queue"); } @Bean(name="memberQueue") public Queue memberQueue() { return new Queue("memberQueue"); } /** * 创建路由交换机 根据路由匹配转发消息给队列 * @return */ @Bean(name = "exchange") public TopicExchange exchange() { return new TopicExchange("exchange"); } /** * 配置交换机(广播) 转发消息给旗下所有队列 * @return */ @Bean(name = "fanoutExchange") FanoutExchange fanoutExchange() { return new FanoutExchange("fanoutExchange"); } /** * 将队列进绑定至路由交换机并设置路由键 * 交换机会将消息传递给 满足路由键的队列 * @param queue * @param exchange * @return */ @Bean Binding bindingExchangeQueue(@Qualifier("queue") Queue queue, @Qualifier("exchange") TopicExchange exchange) { return BindingBuilder.bind(queue).to(exchange).with("exchange.queue"); } @Bean Binding bindingExchangeMemberQueue(@Qualifier("memberQueue") Queue memberQueue, @Qualifier("exchange") TopicExchange exchange) { // *表示一个词,#表示零个或多个词 return BindingBuilder.bind(memberQueue).to(exchange).with("exchange.*"); } /** * 将队列绑定至广播交换机 * 因为不绑定路由键 所以交换机会把消息传递给被绑定的所由队列 广播交换机无法设置路由键。因为消息会发给旗下的所有队列 * @param queue * @param fanoutExchange * @return */ @Bean Binding bindingFanoutExchangeQueueTwo(@Qualifier("queue") Queue queue, @Qualifier("fanoutExchange") FanoutExchange fanoutExchange) { return BindingBuilder.bind(queue).to(fanoutExchange); } @Bean Binding bindingFanoutExchangeMemberQueueTwo(@Qualifier("memberQueue") Queue memberQueue, @Qualifier("fanoutExchange") FanoutExchange fanoutExchange) { return BindingBuilder.bind(memberQueue).to(fanoutExchange); } }
- 创建订阅类,用于订阅队列
package com.integrate.rabbitmq.consumer; import com.rabbitmq.client.Channel; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; import java.io.IOException; /** * 消息的标识,false只确认当前一个消息收到,true确认所有consumer获得的消息 * channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); * ack返回false,并重新回到队列,api里面解释得很清楚 * channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true); * 拒绝消息 * channel.basicReject(message.getMessageProperties().getDeliveryTag(), true); * 丢弃这条消息 * channel.basicNack(message.getMessageProperties().getDeliveryTag(), false,false); * @author 刘志强 * @date 2020/11/13 13:45 */ @Component @Slf4j public class RabbitConsumer { /** * 订阅queue队列 消费消息 * @param object */ @RabbitListener(queues="queue") public void consumerQueue(Channel channel, Object object, Message message) throws IOException { log.info("consumerQueue 消费来自queue队列消息,消息内容为" + object); // 告诉Rabbit已消费,从队列中删除 channel.basicAck(message.getMessageProperties().getDeliveryTag(),false); } /** * 订阅queue队列 不消费消息,并将消息返回队列 * @param object */ @RabbitListener(queues="queue") public void noConsumerQueue(Channel channel, Object object, Message message) throws IOException { log.info("noConsumerQueue 消费来自queue队列消息,消息内容为" + object); // 不消费并返回队列 channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true); } /** * 订阅memberQueue队列 * @param object */ @RabbitListener(queues="memberQueue") public void consumerMemberQueue(Channel channel, Object object, Message message) throws IOException { log.info("consumerMemberQueue 消费memberQueue队列消息,消息内容为" + object); // 告诉Rabbit已消费,从队列中删除 channel.basicAck(message.getMessageProperties().getDeliveryTag(),false); } }
- 创建生产类 用于发送消息,并设置发送成功回调
package com.integrate.rabbitmq.porducer; import lombok.extern.slf4j.Slf4j; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.core.Message; import org.springframework.amqp.core.MessagePostProcessor; import org.springframework.amqp.rabbit.connection.CorrelationData; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import java.util.UUID; /** * @author 刘志强 * @date 2020/8/18 14:47 */ @Component @Slf4j public class AckSender implements RabbitTemplate.ConfirmCallback , RabbitTemplate.ReturnCallback{ @Autowired private RabbitTemplate rabbitTemplate; /** * 指定队列发送 * @param queue * @param content * @return */ public CorrelationData convertAndSend(String queue, Object content) { //设置回调对象 rabbitTemplate.setConfirmCallback(this); rabbitTemplate.setReturnCallback(this); //构建回调返回的数据 CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString()); rabbitTemplate.convertAndSend(queue,content,correlationData); return correlationData; } /** * 指定交换机 和 路由建 发送 * @param exchange * @param routingKey * @param content * @return */ public CorrelationData convertAndSend(String exchange, String routingKey, Object content) { //设置回调对象 rabbitTemplate.setConfirmCallback(this); rabbitTemplate.setReturnCallback(this); //构建回调返回的数据 CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString()); rabbitTemplate.convertAndSend(exchange,routingKey, content,correlationData); return correlationData; } /** * 消息回调确认方法 * 如果消息没有到exchange,则confirm回调,ack=false * 如果消息到达exchange,则confirm回调,ack=true * @param */ @Override public void confirm(CorrelationData correlationData, boolean isSendSuccess, String s) { log.info("confirm--message:回调消息ID为: " + correlationData.getId()); if (isSendSuccess) { log.info("消息进入交换机成功"); } else { log.info("消息进入交换机失败====" + s); } } /** * exchange到queue成功,则不回调return * exchange到queue失败,则回调return(需设置mandatory=true,否则不回回调,消息就丢了) */ @Override public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) { StringBuilder str = new StringBuilder(); str.append("return--message:").append(message.getBody()) .append(",replyCode:").append(replyCode).append(",replyText:").append(replyText).append(",exchange:").append(exchange) .append(",routingKey:").append(routingKey); log.info(String.valueOf(str)); } }
- 测试
@Autowired
private AckSender ackSender;
ackSender.convertAndSend("queue", msg);
ackSender.convertAndSend("exchange", "exchange.queue", msg);
延迟消息(实现定时需求)
- 基于TTL(消息存活时间) 和 x-dead-letter-exchange(死信)
- 意思就是 消息在死亡后,会将此消息发送至死信交换机。死信交换机在发送至(x-dead-letter-routing-key)死信路由的下的死信队列(死信交换机和死信队列也是普通的交换机和队列)
- 开始操作
-
创建 延迟队列 并绑定死信交换机 及 死信路由
/** * 创建一个队列作为延迟队列 * @return */ @Bean(name="delayQueue") public Queue delayQueue() { Map<String, Object> args = new HashMap<>(2); // x-dead-letter-exchange 将队列绑定至交换机 args.put("x-dead-letter-exchange", "deathExchange"); // x-dead-letter-routing-key 当消息死亡时,转发给delayExchange交换机的路由 args.put("x-dead-letter-routing-key", "death-route"); return new Queue("delayQueue",true, false, false,args); }
-
创建一个交换机作为 死信交换机,延迟队列里死亡的消息将像此交换机发送
/** * 创建一个交换机 用于绑定死信队列 * @return */ @Bean(name = "deathExchange") public TopicExchange deathExchange() { return new TopicExchange("deathExchange"); }
-
创建一个队列 作为死信队列。延迟队列里死亡的消息将通过 交换机发送至此队列
/** * 创建一个队列作为死信队列 * @return */ @Bean(name="deathQueue") public Queue deathQueue() { return new Queue("deathQueue"); }
-
将死信队列绑定至死信交换机
/** * 将死信列绑定至交换机 并设置路由健 * @param queue * @param exchange * @return */ @Bean Binding deathExchangeDeathQueue(@Qualifier("deathQueue") Queue queue, @Qualifier("deathExchange") TopicExchange exchange) { return BindingBuilder.bind(queue).to(exchange).with("death-route"); }
-
创建 发送消息 配置失效时间
- 自定义消息处理器追加 消息信息 MessagePostProcessor
package com.integrate.rabbitmq.porducer; import org.springframework.amqp.AmqpException; import org.springframework.amqp.core.Message; import org.springframework.amqp.core.MessagePostProcessor; /** * @author 刘志强 * @date 2020/11/24 16:40 */ public class MyMessagePostProcessor implements MessagePostProcessor { private final Long ttl; public MyMessagePostProcessor(final Long ttl) { this.ttl = ttl; } @Override public Message postProcessMessage(final Message message) throws AmqpException { // 设置消息失效时间 message.getMessageProperties().setExpiration(ttl.toString()); return message; } }
- 创建发送定时过期消息工具方法
public CorrelationData convertAndSendDelay(String queue, Object content, Long time) { //设置回调对象 rabbitTemplate.setConfirmCallback(this); rabbitTemplate.setReturnCallback(this); //构建回调返回的数据 CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString()); MessagePostProcessor messagePostProcessor = new MyMessagePostProcessor(time); rabbitTemplate.convertAndSend(queue, content,messagePostProcessor,correlationData); return correlationData; }
- 自定义消息处理器追加 消息信息 MessagePostProcessor
-
订阅死信队列
/** * 订阅死信队列,当延迟队列的消息死亡时。消息会进入死信队列 * @param channel * @param object * @param message * @throws IOException */ @RabbitListener(queues="deathQueue") public void deathQueue(Channel channel, Object object, Message message) throws IOException { log.info("deathQueue 消费deathQueue队列消息,消息内容为" + object); // 告诉Rabbit已消费,从队列中删除 channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); }
-
测试
/** * 像队列发送消息并设置 TTL(过期事件) * @param msg * @return */ @GetMapping("convertAndSendDelay") public String convertAndSendDelay(String msg,Long ttl) { ackSender.convertAndSendDelay("delayQueue",msg,ttl); return "已发送"; }
-
最后
以上就是眯眯眼康乃馨为你收集整理的springboot集成rabbitmq 延迟队列的全部内容,希望文章能够帮你解决springboot集成rabbitmq 延迟队列所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
发表评论 取消回复