概述
文章目录
- 安装插件
- 代码
- 注册交换机和Queue
- 消息发送者
- 消息消费者
- 常见问题
安装插件
- 下载
rabbitmq_delayed_message_exchange
插件地址(尽量与rabbitMQ版本一致):https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases - 将
rabbitmq_delayed_message_exchange
插件拷贝到 rabbitMQ 安装目录下的 plugins 目录中 - 进入RabbitMQ安装目录中的
sbin
目录,启动插件
./rabbitmq-plugins enable rabbitmq_delayed_message_exchange
- 验证是否安装成功,如果存在下图所示的交换机类型,则表示安装成功。
代码
注册交换机和Queue
package com.wcong.concise.amqp;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.CustomExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
/**
* rabbitMQ相关配置
*
* @author: wcong
* @date: 2022/3/4 18:13
*/
@Configuration
public class RabbitMqConfiguration {
/** queue name */
public static final String QUEUE_NAME = "delay_queue_test";
/** 交换机name */
public static final String EXCHANGE_NAME = "delay_exchange_test";
/**
* 自定义交换机,这里为自定义延迟交换机
*
* @return org.springframework.amqp.core.CustomExchange
*/
@Bean
public CustomExchange customExchange() {
Map<String, Object> args = new HashMap<>(2);
// 定义交换机分发消息类型,direct、fanout、topic、header
args.put("x-delayed-type", "direct");
/* 各个参数含义解释:
* 1、交换机名称
* 2、交换机类型,延迟交换机固定为:x-delayed-message
* 3、交换机是否持久化
* 4、如果没有队列绑定到交换机,交换机是否删除
* 5、结构化参数
*/
return new CustomExchange(EXCHANGE_NAME, "x-delayed-message", true, false, args);
}
/**
* 定义queue
*
* @return org.springframework.amqp.core.Queue
*/
@Bean
public Queue queue() {
/*
* 各个参数含义解释:
* 1、队列名称
* 2、队列是否持久化到磁盘,如果设置为 false,当mq宕机后,消息会丢失
* 3、队列是否专属,专属的范围针对的是连接,也就是说,一个连接下面的多个通道是可见的,对于其他连接是不可见的.连接断开后,该队列会被删除.注意,不是通道断开,是连接断开.并且,就算设置成了持久化,也会删除.
* 4、如果所有消费者都断开连接了,是否自动删除.如果还没有消费者从该队列获取过消息或者监听该队列,那么该队列不会删除.只有在有消费者从该队列获取过消息后,该队列才有可能自动删除(当所有消费者都断开连接,不管消息是否获取完)
*/
return new Queue(QUEUE_NAME, true, false, false);
}
/**
* 将Queue绑定到交换机上
*
* @return org.springframework.amqp.core.Binding
*/
@Bean
public Binding binding() {
return BindingBuilder.bind(queue()).to(customExchange()).with(QUEUE_NAME).noargs();
}
}
消息发送者
package com.wcong.concise.test;
import com.wcong.concise.amqp.RabbitMqConfiguration;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.junit.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.boot.test.context.SpringBootTest;
import java.io.UnsupportedEncodingException;
import java.time.LocalDateTime;
/**
* TODO desc
*
* @author: wcong
* @date: 2022/3/4 18:37
*/
@SpringBootTest
@RequiredArgsConstructor
@Slf4j
public class DelayMsgApplicationTests{
private final RabbitTemplate rabbitTemplate;
@Test
public void contextLoads() throws UnsupportedEncodingException {
// 模拟消息
String msg = "this is msg: " + LocalDateTime.now();
rabbitTemplate.convertAndSend(RabbitMqConfiguration.EXCHANGE_NAME, RabbitMqConfiguration.QUEUE_NAME, msg, messagePostProcessor -> {
// 延迟10s
long delayTime = 10 * 1000L;
messagePostProcessor.getMessageProperties().setDelay(Math.toIntExact(delayTime));
return messagePostProcessor;
});
log.info(">>> msg send success!");
}
}
消息消费者
package com.wcong.concise.amqp;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
* 消息消费者
*
* @author: wcong
* @date: 2022/3/4 18:34
*/
@Component
@Slf4j
public class MsgReceiverTest{
@RabbitListener(queues = RabbitMqConfiguration.QUEUE_NAME)
public void handleMsg(String msg){
log.info(">>> 接收到消息: {}", msg);
}
}
常见问题
-
检查交换机是否为具备延迟属性,即
Type
为x-delayed-message
-
发送延迟消息没有被消费:检查队列是否被路由到了交换机:
最后
以上就是坚定煎蛋为你收集整理的RabbitMQ使用插件实现延迟消息,常见问题解决方案安装插件代码常见问题的全部内容,希望文章能够帮你解决RabbitMQ使用插件实现延迟消息,常见问题解决方案安装插件代码常见问题所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
发表评论 取消回复