我是靠谱客的博主 热情戒指,最近开发中收集的这篇文章主要介绍RabbitMQ消息队列(四):SpringBoot整合之发布订阅模式RabbitMQ消息队列(四):SpringBoot整合之发布/订阅模式,觉得挺不错的,现在分享给大家,希望可以做个参考。

概述

RabbitMQ消息队列(四):SpringBoot整合之发布/订阅模式

在这里插入图片描述
P:生产者,也就是要发送消息的程序,但是不再发送到队列中,而是发给 X(交换机)
C:消费者,消息的接受者,会一直等待消息到来。
Queue:消息队列,接收消息、缓存消息。
Exchange:交换机,图中的 X。一方面,接收生产者发送的消息。另一方面,知道如何处理消息,例如递交给某个特别队列、递交给所有队列、或是将消息丢弃。到底如何操作,取决于 Exchange 的类型。

Exchange有常见以下 3 种类型:

  • Fanout:广播,将消息交给所有绑定到交换机的队列;
  • Direct:定向,把消息交给符合指定 routing key 的队列;
  • Topic:通配符,把消息交给符合 routing pattern(路由模式)的队列

Exchange(交换机)只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与 Exchange 绑定,或者没有符合路由规则的队列,那么消息会丢失。

1. 环境准备

参考:https://blog.csdn.net/fatestranger/article/details/126284433?spm=1001.2014.3001.5501

2. 生产者

配置交换机和绑定的队列:

RabbitMQPubSubConfig

@Configuration
public class RabbitMQPubSubConfig {

  public static final String EXCHANGE_NAME = "amq-fanout";
  private static final String TV_FANOUT_QUEUE_NAME = "TV-fanout-queue";
  private static final String PHONE_FANOUT_QUEUE_NAME = "phone-fanout-queue";
  private static final String PAPER_FANOUT_QUEUE_NAME = "paper-fanout-queue";

  /**
   * 1. 声明交换机
   *
   * @return FanoutExchange
   */
  @Bean
  public FanoutExchange fanoutExchange() {
    /*
      FanoutExchange的参数说明:
      1. 交换机名称
      2. 是否持久化 true:持久化,交换机一直保留 false:不持久化,用完就删除
      3. 是否自动删除 false:不自动删除 true:自动删除
     */
    return new FanoutExchange(EXCHANGE_NAME, true, false);
  }

  /*
    2. 声明队列

    @return Queue
   */
  @Bean
  public Queue fanoutQueue() {
    /*
      Queue构造函数参数说明
      1. 队列名
      2. 是否持久化 true:持久化 false:不持久化
     */
    return new Queue(TV_FANOUT_QUEUE_NAME, true);
  }

  @Bean
  public Queue phoneFanoutQueue() {
    /*
      Queue构造函数参数说明
      1. 队列名
      2. 是否持久化 true:持久化 false:不持久化
     */
    return new Queue(PHONE_FANOUT_QUEUE_NAME, true);
  }

  @Bean
  public Queue paperFanoutQueue() {
    /*
      Queue构造函数参数说明
      1. 队列名
      2. 是否持久化 true:持久化 false:不持久化
     */
    return new Queue(PAPER_FANOUT_QUEUE_NAME, true);
  }

  /*
   * 3. 队列与交换机绑定
   */
  @Bean
  public Binding bindingTV() {
    return BindingBuilder.bind(fanoutQueue()).to(fanoutExchange());
  }

  @Bean
  public Binding bindingPhone() {
    return BindingBuilder.bind(phoneFanoutQueue()).to(fanoutExchange());
  }

  @Bean
  public Binding bindingPaper() {
    return BindingBuilder.bind(paperFanoutQueue()).to(fanoutExchange());
  }
}

模拟产生消息:

PubSubController

@RestController
@Slf4j
public class PubSubController {

  @Autowired
  private RabbitTemplate rabbitTemplate;

  @GetMapping("/pubQueue")
  public String sendMsg(@RequestParam String message) {

    log.info("message:{}", message);
    //交换机名称
    String exchangeName = "amq-fanout";
    //fanout模式,不需要路由key
    String routeKey = "";

    rabbitTemplate.convertAndSend(exchangeName, routeKey, message);
    //返回消息
    return "OK!";
  }
}

手动触发:

http://localhost:8888/pubQueue?message=fanout

3. 消费者

三个消费者:

PubSubReceiver

@Component
@Slf4j
public class PubSubReceiver {

  @Autowired
  private MqReceiveService mqReceiveService;

  @RabbitListener(queues = "TV-fanout-queue")
  public void processTV(String msg) {
    log.info("队列:TV-fanout-queue,接收msg: {} ", msg);

    //消息消费存储
    MqReceive mqReceive = new MqReceive();
    mqReceive.setType("pubSub");
    mqReceive.setReceive("TV-fanout-queue");
    mqReceive.setContent(msg);
    mqReceiveService.insetMsg(mqReceive);
  }

  @RabbitListener(queues = "phone-fanout-queue")
  public void processPhone(String msg) {
    log.info("队列:phone-fanout-queue,接收msg: {} ", msg);

    //消息消费存储
    MqReceive mqReceive = new MqReceive();
    mqReceive.setType("pubSub");
    mqReceive.setReceive("phone-fanout-queue");
    mqReceive.setContent(msg);
    mqReceiveService.insetMsg(mqReceive);
  }

  @RabbitListener(queues = "paper-fanout-queue")
  public void processPaper(String msg) {
    log.info("队列:paper-fanout-queue,接收msg: {} ", msg);

    //消息消费存储
    MqReceive mqReceive = new MqReceive();
    mqReceive.setType("pubSub");
    mqReceive.setReceive("paper-fanout-queue");
    mqReceive.setContent(msg);
    mqReceiveService.insetMsg(mqReceive);
  }

}

在这里插入图片描述

4.总结

交换机需要与队列进行绑定,绑定之后;一个消息可以被多个消费者都收到。
发布订阅模式与工作队列模式的区别:

  1. 工作队列模式不用定义交换机,而发布/订阅模式需要定义交换机。
  2. 发布/订阅模式的生产方是面向交换机发送消息,工作队列模式的生产方是面向队列发送消息(底层使用默认交换机)。
  3. 发布/订阅模式需要设置队列和交换机的绑定,工作队列模式不需要设置,实际上工作队列模式会将队列绑 定到默认的交换机 。

最后

以上就是热情戒指为你收集整理的RabbitMQ消息队列(四):SpringBoot整合之发布订阅模式RabbitMQ消息队列(四):SpringBoot整合之发布/订阅模式的全部内容,希望文章能够帮你解决RabbitMQ消息队列(四):SpringBoot整合之发布订阅模式RabbitMQ消息队列(四):SpringBoot整合之发布/订阅模式所遇到的程序开发问题。

如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(60)

评论列表共有 0 条评论

立即
投稿
返回
顶部