我是靠谱客的博主 想人陪老虎,最近开发中收集的这篇文章主要介绍Rabbitmq使用优先级队列实现消息插队,觉得挺不错的,现在分享给大家,希望可以做个参考。

概述

简单示例

在消息发送方给队列设置最大优先级,同时给消息设置优先级,优先级大小 0-255

1.消息发送方

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;

public class RabbitmqProducer {
    public static final String ip = "192.168.*.*";
    public static final int port = 5672;
    public static final String username = "guest";
    public static final String password = "guest";

    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setPassword(password);
        connectionFactory.setUsername(username);
        connectionFactory.setPort(port);
        connectionFactory.setHost(ip);

        // 创建信道,信道是在connection上虚拟出来的连接
        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();

        // 声明交换器
        channel.exchangeDeclare("exchange_priority","direct",true);

        // 声明队列,加入最大优先级参数 x-max-priority
        Map<String,Object> param = new HashMap<>();
        param.put("x-max-priority", 10);  // 必须给队列设置优先级,否则消息设置了优先级也不生效
        channel.queueDeclare("queue_priority", true, false, false, param);

        // 给队列绑定交换器
        channel.queueBind("queue_priority", "exchange_priority", "rk_priority");

        // 发送消息
        for(int i=0;i<10;i++) {
            AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder();
            if(i%2!=0)
                builder.priority(5);
            AMQP.BasicProperties properties = builder.build();
            // 给消息设定优先级并发送
            channel.basicPublish("exchange_priority","rk_priority",properties,("messages-"+i).getBytes());
        }

        channel.close();
        connection.close();
    }
}


2.消息接收方

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class RabbitmqConsumer {

    public static final String ip = "192.168.*.*";
    public static final int port = 5672;
    public static final String username = "guest";
    public static final String password = "guest";

    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setPassword(password);
        connectionFactory.setUsername(username);
        connectionFactory.setPort(port);
        connectionFactory.setHost(ip);

        // 声明信道
        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();

        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            Integer priority = delivery.getProperties().getPriority(); // 优先级
            System.out.println(" [x] Received '"+ message + "'"+" === priority: " + priority );
        };

        // 接受消息
        channel.basicConsume("queue_priority", true, deliverCallback, consumerTag -> { });

    }
}



结合 SpringCloud Stream 使用

1.启动类添加注解

@EnableBinding({DemoPriorityBinding.class})

2.定义一个binding接口

public interface DemoPriorityBinding {

    String OUTPUT = "abc-center-demo-priority-output";
    String INPUT = "abc-center-demo-priority-input";

    @Output(OUTPUT)
    MessageChannel output();

    @Input(INPUT)
    SubscribableChannel input();
}

3.bootstrap.properties配置

spring.cloud.stream.bindings.abc-center-demo-priority-input.destination=abc-center-demo-priority
spring.cloud.stream.bindings.abc-center-demo-priority-input.group=${spring.application.name}
spring.cloud.stream.bindings.abc-center-demo-priority-output.destination=abc-center-demo-priority
# 这里consumer.max-priority一定要有优先级的设置,这里即是给队列设置优先级
spring.cloud.stream.rabbit.bindings.abc-center-demo-priority-input.consumer.max-priority=100
spring.cloud.stream.rabbit.bindings.abc-center-demo-priority-output.producer.max-priority=100

4.发送消息

@Autowired
private DemoPriorityBinding demoPriorityBinding;

public void sendPriorityMsg(String msg, Integer priority) {
	// 这里通过setHeader可以给消息设置优先级,priority参数就是优先级的意思
	demoPriorityBinding.output().send(MessageBuilder.withPayload(msg).setHeader("priority", priority).build());
    log.info("消息发送成功");
}

5.接收消息

@StreamListener(DemoPriorityBinding.INPUT)
public void acquirePriorityMsg(String msg) throws Exception{
    // 这里休眠20秒,模拟业务处理,方便让消息排队
    Thread.sleep(20000);
    log.info("==========接收优先级消息:"+msg);
}


注意检查队列是否有优先级设置,队列设置完优先级会有如图显示
在这里插入图片描述

最后

以上就是想人陪老虎为你收集整理的Rabbitmq使用优先级队列实现消息插队的全部内容,希望文章能够帮你解决Rabbitmq使用优先级队列实现消息插队所遇到的程序开发问题。

如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(47)

评论列表共有 0 条评论

立即
投稿
返回
顶部