概述
简单示例
在消息发送方给队列设置最大优先级,同时给消息设置优先级,优先级大小 0-255
1.消息发送方
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;
public class RabbitmqProducer {
public static final String ip = "192.168.*.*";
public static final int port = 5672;
public static final String username = "guest";
public static final String password = "guest";
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setPassword(password);
connectionFactory.setUsername(username);
connectionFactory.setPort(port);
connectionFactory.setHost(ip);
// 创建信道,信道是在connection上虚拟出来的连接
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
// 声明交换器
channel.exchangeDeclare("exchange_priority","direct",true);
// 声明队列,加入最大优先级参数 x-max-priority
Map<String,Object> param = new HashMap<>();
param.put("x-max-priority", 10); // 必须给队列设置优先级,否则消息设置了优先级也不生效
channel.queueDeclare("queue_priority", true, false, false, param);
// 给队列绑定交换器
channel.queueBind("queue_priority", "exchange_priority", "rk_priority");
// 发送消息
for(int i=0;i<10;i++) {
AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder();
if(i%2!=0)
builder.priority(5);
AMQP.BasicProperties properties = builder.build();
// 给消息设定优先级并发送
channel.basicPublish("exchange_priority","rk_priority",properties,("messages-"+i).getBytes());
}
channel.close();
connection.close();
}
}
2.消息接收方
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class RabbitmqConsumer {
public static final String ip = "192.168.*.*";
public static final int port = 5672;
public static final String username = "guest";
public static final String password = "guest";
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setPassword(password);
connectionFactory.setUsername(username);
connectionFactory.setPort(port);
connectionFactory.setHost(ip);
// 声明信道
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
Integer priority = delivery.getProperties().getPriority(); // 优先级
System.out.println(" [x] Received '"+ message + "'"+" === priority: " + priority );
};
// 接受消息
channel.basicConsume("queue_priority", true, deliverCallback, consumerTag -> { });
}
}
结合 SpringCloud Stream 使用
1.启动类添加注解
@EnableBinding({DemoPriorityBinding.class})
2.定义一个binding接口
public interface DemoPriorityBinding {
String OUTPUT = "abc-center-demo-priority-output";
String INPUT = "abc-center-demo-priority-input";
@Output(OUTPUT)
MessageChannel output();
@Input(INPUT)
SubscribableChannel input();
}
3.bootstrap.properties配置
spring.cloud.stream.bindings.abc-center-demo-priority-input.destination=abc-center-demo-priority
spring.cloud.stream.bindings.abc-center-demo-priority-input.group=${spring.application.name}
spring.cloud.stream.bindings.abc-center-demo-priority-output.destination=abc-center-demo-priority
# 这里consumer.max-priority一定要有优先级的设置,这里即是给队列设置优先级
spring.cloud.stream.rabbit.bindings.abc-center-demo-priority-input.consumer.max-priority=100
spring.cloud.stream.rabbit.bindings.abc-center-demo-priority-output.producer.max-priority=100
4.发送消息
@Autowired
private DemoPriorityBinding demoPriorityBinding;
public void sendPriorityMsg(String msg, Integer priority) {
// 这里通过setHeader可以给消息设置优先级,priority参数就是优先级的意思
demoPriorityBinding.output().send(MessageBuilder.withPayload(msg).setHeader("priority", priority).build());
log.info("消息发送成功");
}
5.接收消息
@StreamListener(DemoPriorityBinding.INPUT)
public void acquirePriorityMsg(String msg) throws Exception{
// 这里休眠20秒,模拟业务处理,方便让消息排队
Thread.sleep(20000);
log.info("==========接收优先级消息:"+msg);
}
注意
检查队列是否有优先级设置,队列设置完优先级会有如图显示
最后
以上就是想人陪老虎为你收集整理的Rabbitmq使用优先级队列实现消息插队的全部内容,希望文章能够帮你解决Rabbitmq使用优先级队列实现消息插队所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
发表评论 取消回复