概述
RabbitMQ消息队列(四):SpringBoot整合之发布/订阅模式
P:生产者,也就是要发送消息的程序,但是不再发送到队列中,而是发给 X(交换机)
C:消费者,消息的接受者,会一直等待消息到来。
Queue:消息队列,接收消息、缓存消息。
Exchange:交换机,图中的 X。一方面,接收生产者发送的消息。另一方面,知道如何处理消息,例如递交给某个特别队列、递交给所有队列、或是将消息丢弃。到底如何操作,取决于 Exchange 的类型。
Exchange有常见以下 3 种类型:
- Fanout:广播,将消息交给所有绑定到交换机的队列;
- Direct:定向,把消息交给符合指定 routing key 的队列;
- Topic:通配符,把消息交给符合 routing pattern(路由模式)的队列
Exchange(交换机)只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与 Exchange 绑定,或者没有符合路由规则的队列,那么消息会丢失。
1. 环境准备
参考:https://blog.csdn.net/fatestranger/article/details/126284433?spm=1001.2014.3001.5501
2. 生产者
配置交换机和绑定的队列:
RabbitMQPubSubConfig
@Configuration
public class RabbitMQPubSubConfig {
public static final String EXCHANGE_NAME = "amq-fanout";
private static final String TV_FANOUT_QUEUE_NAME = "TV-fanout-queue";
private static final String PHONE_FANOUT_QUEUE_NAME = "phone-fanout-queue";
private static final String PAPER_FANOUT_QUEUE_NAME = "paper-fanout-queue";
/**
* 1. 声明交换机
*
* @return FanoutExchange
*/
@Bean
public FanoutExchange fanoutExchange() {
/*
FanoutExchange的参数说明:
1. 交换机名称
2. 是否持久化 true:持久化,交换机一直保留 false:不持久化,用完就删除
3. 是否自动删除 false:不自动删除 true:自动删除
*/
return new FanoutExchange(EXCHANGE_NAME, true, false);
}
/*
2. 声明队列
@return Queue
*/
@Bean
public Queue fanoutQueue() {
/*
Queue构造函数参数说明
1. 队列名
2. 是否持久化 true:持久化 false:不持久化
*/
return new Queue(TV_FANOUT_QUEUE_NAME, true);
}
@Bean
public Queue phoneFanoutQueue() {
/*
Queue构造函数参数说明
1. 队列名
2. 是否持久化 true:持久化 false:不持久化
*/
return new Queue(PHONE_FANOUT_QUEUE_NAME, true);
}
@Bean
public Queue paperFanoutQueue() {
/*
Queue构造函数参数说明
1. 队列名
2. 是否持久化 true:持久化 false:不持久化
*/
return new Queue(PAPER_FANOUT_QUEUE_NAME, true);
}
/*
* 3. 队列与交换机绑定
*/
@Bean
public Binding bindingTV() {
return BindingBuilder.bind(fanoutQueue()).to(fanoutExchange());
}
@Bean
public Binding bindingPhone() {
return BindingBuilder.bind(phoneFanoutQueue()).to(fanoutExchange());
}
@Bean
public Binding bindingPaper() {
return BindingBuilder.bind(paperFanoutQueue()).to(fanoutExchange());
}
}
模拟产生消息:
PubSubController
@RestController
@Slf4j
public class PubSubController {
@Autowired
private RabbitTemplate rabbitTemplate;
@GetMapping("/pubQueue")
public String sendMsg(@RequestParam String message) {
log.info("message:{}", message);
//交换机名称
String exchangeName = "amq-fanout";
//fanout模式,不需要路由key
String routeKey = "";
rabbitTemplate.convertAndSend(exchangeName, routeKey, message);
//返回消息
return "OK!";
}
}
手动触发:
http://localhost:8888/pubQueue?message=fanout
3. 消费者
三个消费者:
PubSubReceiver
@Component
@Slf4j
public class PubSubReceiver {
@Autowired
private MqReceiveService mqReceiveService;
@RabbitListener(queues = "TV-fanout-queue")
public void processTV(String msg) {
log.info("队列:TV-fanout-queue,接收msg: {} ", msg);
//消息消费存储
MqReceive mqReceive = new MqReceive();
mqReceive.setType("pubSub");
mqReceive.setReceive("TV-fanout-queue");
mqReceive.setContent(msg);
mqReceiveService.insetMsg(mqReceive);
}
@RabbitListener(queues = "phone-fanout-queue")
public void processPhone(String msg) {
log.info("队列:phone-fanout-queue,接收msg: {} ", msg);
//消息消费存储
MqReceive mqReceive = new MqReceive();
mqReceive.setType("pubSub");
mqReceive.setReceive("phone-fanout-queue");
mqReceive.setContent(msg);
mqReceiveService.insetMsg(mqReceive);
}
@RabbitListener(queues = "paper-fanout-queue")
public void processPaper(String msg) {
log.info("队列:paper-fanout-queue,接收msg: {} ", msg);
//消息消费存储
MqReceive mqReceive = new MqReceive();
mqReceive.setType("pubSub");
mqReceive.setReceive("paper-fanout-queue");
mqReceive.setContent(msg);
mqReceiveService.insetMsg(mqReceive);
}
}
4.总结
交换机需要与队列进行绑定,绑定之后;一个消息可以被多个消费者都收到。
发布订阅模式与工作队列模式的区别:
- 工作队列模式不用定义交换机,而发布/订阅模式需要定义交换机。
- 发布/订阅模式的生产方是面向交换机发送消息,工作队列模式的生产方是面向队列发送消息(底层使用默认交换机)。
- 发布/订阅模式需要设置队列和交换机的绑定,工作队列模式不需要设置,实际上工作队列模式会将队列绑 定到默认的交换机 。
最后
以上就是热情戒指为你收集整理的RabbitMQ消息队列(四):SpringBoot整合之发布订阅模式RabbitMQ消息队列(四):SpringBoot整合之发布/订阅模式的全部内容,希望文章能够帮你解决RabbitMQ消息队列(四):SpringBoot整合之发布订阅模式RabbitMQ消息队列(四):SpringBoot整合之发布/订阅模式所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
发表评论 取消回复