我是靠谱客的博主 独特长颈鹿,最近开发中收集的这篇文章主要介绍【RabbitMQ】 延时队列(delayed_message_exchange插件实现),觉得挺不错的,现在分享给大家,希望可以做个参考。
概述
一、安装 rabbitmq_delayed_message_exchange 插件
- 官方下载地址:https://www.rabbitmq.com/community-plugins.html
(1) linux下直接安装:
注意:
rabbitMQ 中必须是Disc(磁盘型)类型的节点才可以安装延时队列插件, RAM(内存型)类型节点无法安装。
- 1.通过 rpm -qa|grep rabbit 命令查看当前linux是否安装RabbitMQ
[root@linux]# rpm -qa|grep rabbit
rabbitmq-server-3.6.5-1.noarch
- 2.通过 rpm -ql rabbitmq-server-3.6.5-1.noarch 查找RabbitMQ安装路径
[root@linux]# rpm -ql rabbitmq-server-3.6.5-1.noarch
/etc/logrotate.d/rabbitmq-server
/etc/rabbitmq
/etc/rc.d/init.d/rabbitmq-server
/usr/lib/ocf/resource.d/rabbitmq/rabbitmq-server
/usr/lib/ocf/resource.d/rabbitmq/rabbitmq-server-ha
/usr/lib/rabbitmq/bin/rabbitmq-defaults
/usr/lib/rabbitmq/bin/rabbitmq-env
/usr/lib/rabbitmq/bin/rabbitmq-plugins
...
- 3.先通过 rabbitmq-plugins list 查看已安装的插件列表
[root@linux]# rabbitmq-plugins list
Configured: E = explicitly enabled; e = implicitly enabled
| Status: * = running on rabbit@apec-198
|/
[e*] amqp_client 3.6.5
[ ] cowboy 1.0.3
[ ] cowlib 1.0.1
[e*] mochiweb 2.13.1
[ ] rabbitmq_amqp1_0 3.6.5
[ ] rabbitmq_auth_backend_ldap 3.6.5
[ ] rabbitmq_auth_mechanism_ssl 3.6.5
[ ] rabbitmq_consistent_hash_exchange 3.6.5
[ ] rabbitmq_event_exchange 3.6.5
[ ] rabbitmq_federation 3.6.5
[ ] rabbitmq_federation_management 3.6.5
[ ] rabbitmq_jms_topic_exchange 3.6.5
[E*] rabbitmq_management 3.6.5
[e*] rabbitmq_management_agent 3.6.5
[ ] rabbitmq_management_visualiser 3.6.5
[ ] rabbitmq_mqtt 3.6.5
[ ] rabbitmq_recent_history_exchange 1.2.1
[ ] rabbitmq_sharding 0.1.0
[ ] rabbitmq_shovel 3.6.5
[ ] rabbitmq_shovel_management 3.6.5
[ ] rabbitmq_stomp 3.6.5
[ ] rabbitmq_top 3.6.5
[ ] rabbitmq_tracing 3.6.5
[ ] rabbitmq_trust_store 3.6.5
[e*] rabbitmq_web_dispatch 3.6.5
[ ] rabbitmq_web_stomp 3.6.5
[ ] rabbitmq_web_stomp_examples 3.6.5
[ ] sockjs 0.3.4
[e*] webmachine 1.10.3
- 4.进入 /rabbitmq/lib/rabbitmq_server-3.6.5/plugins 上传预下载好的 rabbitmq_delayed_message_exchange 插件(rabbitmq_delayed_message_exchange-20171201-3.7.x.ez)
- 5.启动插件 rabbitmq-plugins enable rabbitmq_delayed_message_exchange
- 6.重启RabbitMQ 使插件生效: service rabbitmq-server restart
(2) linux下 Docker 镜像中安装:
- 1.查看当前Docker容器中的镜像,找到RabbitMQ
- docker ps -a
[root@linux ~]# docker ps -a
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
8efd6f3add3c chenchuxin/dubbo-admin "catalina.sh run" 6 weeks ago Up 5 weeks 0.0.0.0:9090->8080/tcp dubbo-admin
6939b83d0942 zookeeper "/docker-entrypoint.…" 6 weeks ago Up 5 weeks zookeeper01
2aec2548a9f8 525bd2016729 "docker-entrypoint.s…" 6 weeks ago Up 5 weeks 0.0.0.0:27017->27017/tcp docker_mongodb
a6da9a3f6ca2 mongo-express "tini -- /docker-ent…" 6 weeks ago Up 5 weeks 0.0.0.0:8081->8081/tcp agitated_tu
d3dfb1bbfda4 mongo:4.0.4 "docker-entrypoint.s…" 6 weeks ago Up 5 weeks 27017/tcp mymongo
389a673177ea portainer/portainer "/portainer" 7 weeks ago Up 5 weeks 0.0.0.0:9000->9000/tcp prtainer-test
840e143489ac 752be83a5396 "/docker-entrypoint.…" 7 weeks ago Up 5 weeks 0.0.0.0:9200->9200/tcp, 0.0.0.0:9300->9300/tcp es2
c13f5fba3a1a mysql:5.7 "docker-entrypoint.s…" 7 weeks ago Up 4 weeks 0.0.0.0:3306->3306/tcp, 33060/tcp mysql
09512f961186 2888deb59dfc "docker-entrypoint.s…" 7 weeks ago Up 5 weeks 4369/tcp, 5671/tcp, 0.0.0.0:5672->5672/tcp, 15671/tcp, 25672/tcp, 0.0.0.0:15672->15672/tcp rabbitmq3.7.7
a817c6e95c4b redis:4.0 "docker-entrypoint.s…" 7 weeks ago Up 5 weeks 0.0.0.0:7001->6379/tcp redis7001
- 2.预先上传rabbitmq_delayed_message_exchange-20171201-3.7.x.ez插件到Linux文件夹中
- 3.拷贝插件文件到rabbitMQ的Docker容器中
[root@linux ~]# docker cp rabbitmq_delayed_message_exchange-20171201-3.7.x.ez rabbitmq3.7.7:/plugins
- 3.进入rabbitMQ的Docker容器中docker exec -it rabbitmq3.7.7 bash
[root@linux ~]# docker exec -it rabbitmq3.7.7 bash
root@myRabbit:/#
- 4.查看插件列表 rabbitmq-plugins list
- 5.启用插件
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
二、接入
- 配置x-delayed-type的交换机即可
// config
public static final String DELAYED_ROUTING_KEY = "delay.queue.job.delay.routingKey";
public static final String DELAYED_EXCHANGE_TYPE = "x-delayed-message";
@Bean
public CustomExchange customExchange() {
Map<String, Object> args = new HashMap<>(2);
args.put("x-delayed-type", "direct");
return new CustomExchange(DELAYED_EXCHANGE_NAME, DELAYED_EXCHANGE_TYPE, true, false, args);
}
// message body
/**
* Created by KINGFS on 2020/8/6.
*
* @author KINGFS
*/
@Data
@EqualsAndHashCode(callSuper = false)
public class DelayMessage {
/**
* 持久化
*/
private MessageDeliveryMode deliveryMode = MessageDeliveryMode.PERSISTENT;
/**
* 消息超时时间
*/
private String expiration;
/**
* 消息超时时间
*/
private Integer delay;
/**
* 消息数据
*/
private String message;
}
// sender
/**
* Created by KINGFS on 2020/8/6.
*
* @author KINGFS
*/
@Component
@RequiredArgsConstructor
@FieldDefaults(level = AccessLevel.PRIVATE, makeFinal = true)
public class RabbitMqSender {
RabbitTemplate rabbitTemplate;
//------------------------------------------------------------------------------------------------------------------
public void sendDirectDelayMessage(DelayMessage delayMessage) {
sendMessage(DirectDelayMqConfig.DELAY_EXCHANGE_NAME, DirectDelayMqConfig.DELAY_QUEUE_MESSAGE_TTL_ROUTING_KEY, delayMessage);
}
public void sendXDelayMessage(DelayMessage delayMessage) {
sendMessage(DelayMessageMqConfig.DELAYED_EXCHANGE_NAME, DelayMessageMqConfig.DELAYED_ROUTING_KEY, delayMessage);
}
private void sendMessage(String exchangeName, String routingKey, DelayMessage message) {
rabbitTemplate.convertAndSend(exchangeName, routingKey, message.getMessage(),
messagePostProcessor -> {
MessageProperties messageProperties = messagePostProcessor.getMessageProperties();
messageProperties.setDeliveryMode(MessageDeliveryMode.PERSISTENT);
messageProperties.setExpiration(message.getExpiration());
messageProperties.setDelay(message.getDelay());
return messagePostProcessor;
}
);
}
}
- 写的不对或者不好的地方欢迎各位斧正,万分感谢!
最后
以上就是独特长颈鹿为你收集整理的【RabbitMQ】 延时队列(delayed_message_exchange插件实现)的全部内容,希望文章能够帮你解决【RabbitMQ】 延时队列(delayed_message_exchange插件实现)所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
发表评论 取消回复