我是靠谱客的博主 呆萌小懒虫,最近开发中收集的这篇文章主要介绍Rabbitm 延迟队列插件rabbitmq_delayed_message_exchange延迟队列:需要安装插件代码实现完整代码,觉得挺不错的,现在分享给大家,希望可以做个参考。
概述
延迟队列:
根据上面的方案我门知道我们生产中不可缺少的使用延迟队列,我们如何实现今天介绍下
需要安装插件
https://www.rabbitmq.com/community-plugins.html 这个里面有rabbitmq的插件集合
我们从中找到rabbitmq_delayed_message_exchange 点击下载,下载后的代码放到你的安装目录
下面的plugins目录下
比如我放的就是:
/usr/local/Cellar/rabbitmq/3.8.16/plugins 这个目录下面,找自己的安装目录
再执行下吗的命令:
如果没有配置全局命令需要到sbin目录下,都是基础知识不在赘述
./rabbitmq-plugins enable rabbitmq_delayed_message_exchange
刷新页面就可以看到下吗的截图中多一个type为x-delayed-message
rabbitmq_delayed_message_exchange
代码实现
声明x-delayed-message类型的交换机代码如下所示:
Map<String, Object> args = new HashMap<String, Object>();
args.put("x-delayed-type", "direct");
channel.exchangeDeclare(交换机名称, "x-delayed-message", true, false, args);
上面的两行代码是重点
发送消息的时候通过header添加"x-delay"参数来设置消息的延时时间,其单位为毫秒
Map<String, Object> headers = new HashMap<String, Object>();
headers.put("x-delay", 5000);
BasicProperties props = new AMQP.BasicProperties.Builder()
.headers(headers).build();
channel.basicPublish("delay_plugin_exchange", "delay", props , "延迟消息".getBytes());
完整代码
package com.wfg.delay;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.MessageProperties;
import com.wfg.util.MQUtils;
import java.io.IOException;
import java.time.LocalDateTime;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;
/**
* @author wufagang
* @description
* @date 2021年05月23日 10:06 上午
*/
public class DelayProducer {
public static final String EXCHANENAME1="delay-exchange999";
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
Channel channel = MQUtils.getChannel();
//channel.exchangeDeclare(EXCHANENAME1,"direct");
Map<String,Object> args1 = new HashMap<>();
args1.put("x-delayed-type","direct");
channel.exchangeDeclare(EXCHANENAME1,"x-delayed-message",true,false,args1);
channel.queueDeclare("delay-queue",false,false,false,null);
channel.queueBind("delay-queue",EXCHANENAME1,"delay-key");
for (int i=0 ;i<5;i++){
Thread.sleep(1000);
System.out.println(System.currentTimeMillis());
Map<String, Object> headers = new HashMap<String, Object>();
headers.put("x-delay", 5000);
AMQP.BasicProperties.Builder props = new AMQP.BasicProperties.Builder().headers(headers);
String message = "序号:"+i +" ,时间:"+ LocalDateTime.now();
channel.basicPublish(EXCHANENAME1,"delay-key",false,
props.build(),message.getBytes());
}
}
}
package com.wfg.delay;
import com.rabbitmq.client.*;
import com.wfg.direct.DirectProducer;
import com.wfg.util.MQUtils;
import java.io.IOException;
import java.time.LocalDateTime;
import java.util.concurrent.TimeoutException;
/**
* @author wufagang
* @description
* @date 2021年05月23日 10:14 上午
*/
public class DelayConstomer {
public static void main(String[] args) throws IOException, TimeoutException {
Channel channel = MQUtils.getChannel();
channel.exchangeDeclare(DelayProducer.EXCHANENAME1,"x-delayed-message",true,false,null);
//String queue = channel.queueDeclare().getQueue();
//System.out.println("消费者1 队列 "+ queue);
channel.queueDeclare("deley-queue",false,false,false,null);
channel.queueBind("delay-queue", DelayProducer.EXCHANENAME1,"delay-key");
channel.basicQos(1);
// channel.basicConsume(queue,true,new DefaultConsumer(channel){
// @Override
// public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
// System.out.println(System.currentTimeMillis());
// System.out.println("消费者1 info1: =="+new String(body));
// }
// });
Consumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消费者收到消息:" + new String(body)+",当前时间:"+ LocalDateTime.now());
// 消费者手动发送ack应答
channel.basicAck(envelope.getDeliveryTag(), false);
}
};
System.out.println("消费延迟5秒队列中的消息======================");
// 监听队列
channel.basicConsume("delay-queue", false, consumer);
}
}
效果截图
全部相隔5s
最后
以上就是呆萌小懒虫为你收集整理的Rabbitm 延迟队列插件rabbitmq_delayed_message_exchange延迟队列:需要安装插件代码实现完整代码的全部内容,希望文章能够帮你解决Rabbitm 延迟队列插件rabbitmq_delayed_message_exchange延迟队列:需要安装插件代码实现完整代码所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
发表评论 取消回复