我是靠谱客的博主 呆萌小懒虫,最近开发中收集的这篇文章主要介绍Rabbitm 延迟队列插件rabbitmq_delayed_message_exchange延迟队列:需要安装插件代码实现完整代码,觉得挺不错的,现在分享给大家,希望可以做个参考。

概述

延迟队列:

根据上面的方案我门知道我们生产中不可缺少的使用延迟队列,我们如何实现今天介绍下

需要安装插件

https://www.rabbitmq.com/community-plugins.html 这个里面有rabbitmq的插件集合
我们从中找到rabbitmq_delayed_message_exchange 点击下载,下载后的代码放到你的安装目录
下面的plugins目录下
比如我放的就是:
/usr/local/Cellar/rabbitmq/3.8.16/plugins 这个目录下面,找自己的安装目录

再执行下吗的命令:
如果没有配置全局命令需要到sbin目录下,都是基础知识不在赘述

./rabbitmq-plugins enable rabbitmq_delayed_message_exchange

刷新页面就可以看到下吗的截图中多一个type为x-delayed-message
在这里插入图片描述
rabbitmq_delayed_message_exchange
在这里插入图片描述

在这里插入图片描述

代码实现

声明x-delayed-message类型的交换机代码如下所示:

Map<String, Object> args = new HashMap<String, Object>();
args.put("x-delayed-type", "direct");
channel.exchangeDeclare(交换机名称, "x-delayed-message", true, false, args);
上面的两行代码是重点

发送消息的时候通过header添加"x-delay"参数来设置消息的延时时间,其单位为毫秒

Map<String, Object> headers = new HashMap<String, Object>();
headers.put("x-delay", 5000);
BasicProperties props = new AMQP.BasicProperties.Builder()
	                .headers(headers).build();
channel.basicPublish("delay_plugin_exchange", "delay", props , "延迟消息".getBytes());

完整代码

package com.wfg.delay;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.MessageProperties;
import com.wfg.util.MQUtils;

import java.io.IOException;
import java.time.LocalDateTime;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;

/**
 * @author wufagang
 * @description
 * @date 2021年05月23日 10:06 上午
 */
public class DelayProducer {
    public static final String EXCHANENAME1="delay-exchange999";
    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
        Channel channel = MQUtils.getChannel();
        //channel.exchangeDeclare(EXCHANENAME1,"direct");
        Map<String,Object> args1 = new HashMap<>();
        args1.put("x-delayed-type","direct");
        channel.exchangeDeclare(EXCHANENAME1,"x-delayed-message",true,false,args1);
        channel.queueDeclare("delay-queue",false,false,false,null);
        channel.queueBind("delay-queue",EXCHANENAME1,"delay-key");

        for (int i=0 ;i<5;i++){
            Thread.sleep(1000);
            System.out.println(System.currentTimeMillis());
            Map<String, Object> headers = new HashMap<String, Object>();
            headers.put("x-delay", 5000);
            AMQP.BasicProperties.Builder props = new AMQP.BasicProperties.Builder().headers(headers);
            String message = "序号:"+i +" ,时间:"+ LocalDateTime.now();
            channel.basicPublish(EXCHANENAME1,"delay-key",false,
                    props.build(),message.getBytes());


        }

    }
}

package com.wfg.delay;

import com.rabbitmq.client.*;
import com.wfg.direct.DirectProducer;
import com.wfg.util.MQUtils;

import java.io.IOException;
import java.time.LocalDateTime;
import java.util.concurrent.TimeoutException;

/**
 * @author wufagang
 * @description
 * @date 2021年05月23日 10:14 上午
 */
public class DelayConstomer {

    public static void main(String[] args) throws IOException, TimeoutException {
        Channel channel = MQUtils.getChannel();
        channel.exchangeDeclare(DelayProducer.EXCHANENAME1,"x-delayed-message",true,false,null);
        //String queue = channel.queueDeclare().getQueue();
        //System.out.println("消费者1 队列  "+ queue);
        channel.queueDeclare("deley-queue",false,false,false,null);
        channel.queueBind("delay-queue", DelayProducer.EXCHANENAME1,"delay-key");
        channel.basicQos(1);
//        channel.basicConsume(queue,true,new DefaultConsumer(channel){
//            @Override
//            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
//                System.out.println(System.currentTimeMillis());
//                System.out.println("消费者1 info1: =="+new String(body));
//            }
//        });




        Consumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消费者收到消息:" + new String(body)+",当前时间:"+ LocalDateTime.now());
                // 消费者手动发送ack应答
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        };

        System.out.println("消费延迟5秒队列中的消息======================");
        // 监听队列
        channel.basicConsume("delay-queue", false, consumer);

    }
}

效果截图
在这里插入图片描述
全部相隔5s

最后

以上就是呆萌小懒虫为你收集整理的Rabbitm 延迟队列插件rabbitmq_delayed_message_exchange延迟队列:需要安装插件代码实现完整代码的全部内容,希望文章能够帮你解决Rabbitm 延迟队列插件rabbitmq_delayed_message_exchange延迟队列:需要安装插件代码实现完整代码所遇到的程序开发问题。

如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(52)

评论列表共有 0 条评论

立即
投稿
返回
顶部