概述
- 延时队列
RabbitMQ本身不具有延时消息队列的功能,但是可以通过TTL(Time To Live)、DLX(Dead Letter Exchanges)特性实现。其原理给消息设置过期时间,在消息队列上为过期消息指定转发器,这样消息过期后会转发到与指定转发器匹配的队列上,变向实现延时队列。利用RabbitMQ的这种特性,应该可以实现很多现实中的业务,我们可以发挥想象。
rabbitmq-delayed-message-exchange ,我们也可以使用插件来实现延时队列。利用TTL、DLX实现的延时队列可以中断,使用插件实现的延时队列是否可以中断?留着下次。。。
- 注意要点
为每一条消息设置过期时间 :
- Builder properties=new BasicProperties.Builder();
- //指定消息过期时间为12秒,队列上也可以指定消息的过期时间,两者以较小时间为准
- properties.expiration("12000");//延时12秒,不会及时删除(在consuemr消费时判定是否过期,因为每条消息的过期时间不一致,删除过期消息就需要扫描整个队列)
- channel.basicPublish("header_exchange", "" ,properties.build(), SerializationUtils.serialize(object));
在队列上设置队列过期时间(可以不用设置)、消息过期时间、过期消息转发规则 :
- //设置消息过期时间为12秒,消息过期转发给指定转发器、匹配的routingkey(可不指定)
- Map<String, Object> args=new HashMap<String, Object>();
- args.put("x-expires", 30000);//队列过期时间
- args.put("x-message-ttl", 12000);//队列上消息过期时间,应小于队列过期时间
- args.put("x-dead-letter-exchange", "exchange-direct");//过期消息转向路由
- args.put("x-dead-letter-routing-key", "routing-delay");//过期消息转向路由相匹配routingkey
消息没有consumer消费才会过期,所以接收消息类中consumer需要注释掉
队列上设置消息过期时间和消息上设置消息过期时间,优先级以较小的为准
队列上设置消息过期时间和消息上设置消息过期时间,后者过期消息有可能不会及时删除,因为每条消息的过期时间不一致,删除过期消息就需要扫描整个队列,因此消费时判断是否过期
- 发送消息类
- package com.demo.mq.rabbitmq.example08;
- import java.io.IOException;
- import java.io.Serializable;
- import java.util.HashMap;
- import java.util.Map;
- import org.apache.commons.lang3.SerializationUtils;
- import com.demo.mq.rabbitmq.MqManager;
- import com.rabbitmq.client.AMQP.BasicProperties;
- import com.rabbitmq.client.AMQP.BasicProperties.Builder;
- import com.rabbitmq.client.BuiltinExchangeType;
- import com.rabbitmq.client.Channel;
- import com.rabbitmq.client.Connection;
- /**
- * 发送消息类
- * @author sheungxin
- *
- */
- public class Send{
- /**
- * 在topic转发器的基础上练习延时转发,发送消息时指定消息过期时间
- * 消息已发送到queue上,但未有consumer进行消费
- * @param object 消息主体
- * @throws IOException
- */
- public static void sendAToB(Serializable object) throws Exception{
- Connection conn=MqManager.newConnection();
- Channel channel=conn.createChannel();
- //声明headers转发器
- channel.exchangeDeclare("header_exchange", BuiltinExchangeType.HEADERS);
- //定义headers存储的键值对
- Map<String, Object> headers=new HashMap<String, Object>();
- headers.put("key", "123456");
- headers.put("token", "654321");
- //把键值对放在properties
- Builder properties=new BasicProperties.Builder();
- properties.headers(headers);
- properties.deliveryMode(2);//持久化
- //指定消息过期时间为12秒,队列上也可以指定消息的过期时间,两者以较小时间为准
- // properties.expiration("12000");//延时12秒,不会及时删除(在consuemr消费时判定是否过期,因为每条消息的过期时间不一致,删除过期消息就需要扫描整个队列)
- channel.basicPublish("header_exchange", "" ,properties.build(), SerializationUtils.serialize(object));
- System.out.println("Send '"+object+"'");
- channel.close();
- conn.close();
- }
- public static void main(String[] args) throws Exception {
- sendAToB("Hello World !");
- }
- }
- 接收消息类
- package com.demo.mq.rabbitmq.example08;
- import java.io.IOException;
- import java.util.HashMap;
- import java.util.Map;
- import org.apache.commons.lang3.SerializationUtils;
- import com.demo.mq.rabbitmq.MqManager;
- import com.rabbitmq.client.AMQP;
- import com.rabbitmq.client.BuiltinExchangeType;
- import com.rabbitmq.client.Channel;
- import com.rabbitmq.client.Connection;
- import com.rabbitmq.client.Consumer;
- import com.rabbitmq.client.DefaultConsumer;
- import com.rabbitmq.client.Envelope;
- /**
- * 接收消息类
- * @author sheungxin
- *
- */
- public class Recv {
- /**
- * 在topic转发器的基础上练习延时转发,设置队列过期时间(过期后自动删除),过期消息处理策略(转发给相匹配的queue)
- * 实验时启动接收类创建队列后,关闭该线程,使其进入未使用状态
- * @throws Exception
- */
- public static void recvAToB() throws Exception{
- Connection conn=MqManager.newConnection();
- Channel channel=conn.createChannel();
- channel.exchangeDeclare("header_exchange", BuiltinExchangeType.HEADERS);
- //设置队列过期时间为30秒,消息过期转发给指定转发器、匹配的routingkey(可不指定)
- Map<String, Object> args=new HashMap<String, Object>();
- args.put("x-expires", 30000);//队列过期时间
- args.put("x-message-ttl", 12000);//队列上消息过期时间
- args.put("x-dead-letter-exchange", "exchange-direct");//过期消息转向路由
- args.put("x-dead-letter-routing-key", "routing-delay");//过期消息转向路由相匹配routingkey
- //创建一个临时队列
- String queueName=channel.queueDeclare("tmp01",false,false,false,args).getQueue();
- //指定headers的匹配类型(all、any)、键值对
- Map<String, Object> headers=new HashMap<String, Object>();
- headers.put("x-match", "all");//all any(只要有一个键值对匹配即可)
- headers.put("key", "123456");
- // headers.put("token", "6543211");
- //绑定临时队列和转发器header_exchange
- channel.queueBind(queueName, "header_exchange", "", headers);
- System.out.println("Received ...");
- Consumer consumer=new DefaultConsumer(channel){
- @Override
- public void handleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body) throws IOException{
- String mes=SerializationUtils.deserialize(body);
- System.out.println(envelope.getRoutingKey()+":Received :'"+mes+"' done");
- channel.basicAck(envelope.getDeliveryTag(), false);
- }
- };
- //关闭自动应答机制,默认开启;这时候需要手动进行应该
- channel.basicConsume(queueName, false, consumer);
- }
- public static void main(String[] args) throws Exception {
- recvAToB();
- }
- }
- 延时消息处理类
- package com.demo.mq.rabbitmq.example08;
- import java.io.IOException;
- import org.apache.commons.lang3.SerializationUtils;
- import com.demo.mq.rabbitmq.MqManager;
- import com.rabbitmq.client.AMQP;
- import com.rabbitmq.client.BuiltinExchangeType;
- import com.rabbitmq.client.Channel;
- import com.rabbitmq.client.Connection;
- import com.rabbitmq.client.Consumer;
- import com.rabbitmq.client.DefaultConsumer;
- import com.rabbitmq.client.Envelope;
- /**
- * 延时消息处理类
- * @author sheungxin
- *
- */
- public class DelayRecv {
- /**
- * 创建队列并声明consumer用于处理转发过来的延时消息
- * @throws Exception
- */
- public static void delayRecv() throws Exception{
- Connection conn=MqManager.newConnection();
- Channel channel=conn.createChannel();
- channel.exchangeDeclare("exchange-direct", BuiltinExchangeType.DIRECT);
- String queueName=channel.queueDeclare().getQueue();
- channel.queueBind(queueName, "exchange-direct", "routing-delay");
- Consumer consumer=new DefaultConsumer(channel){
- @Override
- public void handleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body) throws IOException{
- String mes=SerializationUtils.deserialize(body);
- System.out.println(envelope.getRoutingKey()+":delay Received :'"+mes+"' done");
- }
- };
- //关闭自动应答机制,默认开启;这时候需要手动进行应该
- channel.basicConsume(queueName, true, consumer);
- }
- public static void main(String[] args) throws Exception {
- delayRecv();
- }
- }
最后
以上就是要减肥钢笔为你收集整理的RabbitMQ使用场景练习:延迟队列(八)的全部内容,希望文章能够帮你解决RabbitMQ使用场景练习:延迟队列(八)所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
发表评论 取消回复