我是靠谱客的博主 端庄斑马,最近开发中收集的这篇文章主要介绍【微服务】SpringAMQP实现RabbitMQ的消息模型,觉得挺不错的,现在分享给大家,希望可以做个参考。

概述

文章目录

    • 1、基本概念
    • 2、消息模型
      • 2.1、粗浅引入
      • 2.2、环境准备
    • 3、基本消息队列
      • 3.0、配置MQ
      • 3.1、消息发送
      • 3.2、消息接收
      • 3.3、测试
    • 4、工作消息队列
      • 4.1、消息发送
      • 4.2、消息接收
      • 4.3、能者多劳
      • 4.4、测试
    • 5、发布/订阅
      • 5.1、Fanout Exchange
        • 5.1.1.声明队列和交换机
        • 5.1.2、消息发送
        • 5.1.3、消息接收
        • 5.1.4、测试
      • 5.2、Direct Exchange
        • 5.2.1、基于注解声明队列和交换机
        • 5.2.2、消息发送
        • 5.2.3、测试
      • 5.3、Topic Exchange
        • 5.3.1、消息发送
        • 5.3.2、消息接收
        • 5.3.3、测试
    • 6、消息转换器
      • 6.1、尝试传递Map对象
        • 6.1.1、消息发送
        • 6.1.2、消息接收
        • 6.1.3、测试
      • 6.2、优化改进
        • 6.2.1、导入依赖
        • 6.2.2、配置消息转换器
        • 6.2.3、测试

1、基本概念

SpringAMQP是基于RabbitMQ封装的一套模板,并且还利用SpringBoot对其实现了自动装配,可谓是十分优雅,SpringAMQP由两部分组成;spring-amqp 是基础抽象,spring-rabbit 是 RabbitMQ 实现。用官方很牛且掰的话来讲就是:(自行跳转:https://spring.io/projects/spring-amqp)

Spring AMQP 项目将核心 Spring 概念应用于基于 AMQP 的消息传递解决方案的开发。它提供了一个“模板”作为发送和接收消息的高级抽象。它还通过“侦听器容器”为消息驱动的 POJO 提供支持。这些库促进了 AMQP 资源的管理,同时促进了依赖注入和声明性配置的使用。在所有这些情况下,都会看到与 Spring Framework 中的 JMS 支持的相似之处。

SpringAMQP提供了三个功能:

  • 自动声明队列、交换机及其绑定关系;
  • 基于注解的监听器模式,异步接收消息;
  • 封装了RabbitTemplate工具,用于发送和接收消息 ,一般用于发送

image-20220731114113911

2、消息模型

2.1、粗浅引入


在RabbitMQ官方提供了五种有点相同但又没有完全相同的示例,其对应了不同的消息模型(其实我也没有找到这个示例在哪,有兴趣的小伙伴可以自己去扒拉一下再顺道告诉我[/dog])

下面的按顺序分别对应着不同的用法:

  • 基本消息队列(BasicQueue) ,对应着1. “Hello World!”
  • 工作消息队列(WorkQueue) ,对应着2. “Work queues”
  • 发布订阅(Publish、Subscribe) ,后三种都属于这一类型,又根据交换机类型不同分为三种:
    • Fanout Exchange:广播。对应着"3. Publish/Subscribe";
    • Direct Exchange:路由。对应着"4. Routing";
    • Topic Exchange:话题。对应着"5. Topics"。

image-20220731115329866

2.2、环境准备


这里是用到的是父工程mq-demo进行管理依赖,然后两个子工程publisher和consumer分别对应消息的发送者和消息的消费者。

因为这里只涉及到了MQ且所有依赖两个子工程都用得到,因此所有依赖都统一交由父工程管理,子工程共用即可。

<dependencies>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
    </dependency>
    <!--AMQP依赖,包含RabbitMQ-->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
    <!--单元测试-->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
    </dependency>
</dependencies>

在搭建好项目环境之后就会得到下面的结构啦:

image-20210717163604330

3、基本消息队列

在前面编写的《【微服务】RabbitMQ的粗浅入门》中其实已经编写过这一基本消息队列的具体实现了,但是那边文章使用的是不太优雅的代码方式,连接、通道、队列这些都需要我们逐一搭建,因此在这里使用SpringAMQP再次实现一遍,从而形成对比,让我们看看SpringAMQP究竟有多么优雅。

image-20210717163434647

3.0、配置MQ


这其实是应该放在环境准备里面去编写的,但是为了对比原生中的编写连接,最终决定放在了这里。

分别在两个子工程的application.yml文件中添加如下配置,注意主机IP和账号密码记得换成自己的,并且两个子工程都需要进行配置。在分别配置之后就可以一劳永逸啦,而不像原生的一样,每写一次发送接收都需要配置参数建立连接。优雅,永不过时。

spring:
  rabbitmq:
    host: 192.168.150.101 # 主机名
    port: 5672 # 端口
    virtual-host: / # 虚拟主机
    username: xbaozi # 用户名
    password: 123456 # 密码

3.1、消息发送


这里在publisher服务中使用测试类对发送端的代码进行发送

@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringAmqpTest {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    public void testSimpleQueue() {
        // 队列名称
        String queueName = "simple.queue";
        // 消息
        String message = "hello, spring amqp!";
        // 发送消息
        rabbitTemplate.convertAndSend(queueName, message);
    }
}

3.2、消息接收


在consumer服务中新建RabbitConfig配置类,将队列加入到容器中。

@Configuration
public class RabbitConfig {
    @Bean
    public Queue simpleQueue() {
        return new Queue("simple.queue");
    }
}

在consumer服务中新建SpringRabbitListener类对消息进行监听。

@Component
public class SpringRabbitListener {

    @RabbitListener(queues = "simple.queue")
    public void listenSimpleQueueMessage(String msg) throws InterruptedException {
        System.out.println("spring 消费者接收到消息:【" + msg + "】");
    }
}

3.3、测试


运行consumer服务运行测试类进行消息发送,因为这里只有consumer服务有注入了队列对象,如果先进行消息发送的话则会报错。

image-20220731161347000

4、工作消息队列

Work queues,也被称为(Task queues),任务模型。简单来说就是让多个消费者绑定到一个队列,共同消费队列中的消息

当消息处理比较耗时的时候,可能生产消息的速度会远远大于消息的消费速度。长此以往,消息就会堆积越来越多,无法及时处理。此时就可以使用work 模型,多个消费者共同处理消息处理,速度就能大大提高了。

4.1、消息发送


在publisher服务中的SpringAmqpTest类中添加一个测试方法,模拟大小消息的堆积现象。

@Test
public void testWorkQueue() throws InterruptedException {
    // 队列名称
    String queueName = "simple.queue";
    // 消息
    String message = "hello, message_";
    for (int i = 0; i < 50; i++) {
        // 发送消息
        rabbitTemplate.convertAndSend(queueName, message + i);
        Thread.sleep(20);
    }
}

4.2、消息接收


这里在consumer服务中的SpringRabbitListener类中定义两个监听方法,分别模拟多个消费者对同一队列进行监听处理。

  • 方法1进行sleep处理20ms,模拟性能好的服务器,1s处理五十条数据
  • 方法2进行sleep处理200ms,模拟性能较差的服务器,1s处理五条数据
@RabbitListener(queues = "simple.queue")
public void listenWorkQueue1(String msg) throws InterruptedException {
    System.out.println("消费者1接收到消息:【" + msg + "】" + LocalTime.now());
    Thread.sleep(20);
}

@RabbitListener(queues = "simple.queue")
public void listenWorkQueue2(String msg) throws InterruptedException {
    System.err.println("消费者2........接收到消息:【" + msg + "】" + LocalTime.now());
    Thread.sleep(200);
}

4.3、能者多劳


其实到这里就可以分别运行两个服务了,但是运行之后会发现结果并没有按照我们想的来执行,而是出现了一个随机的处理结果。

这是因为两个不同的消费者默认情况下,会从队列中争夺性的获取随机条信息进行处理,这就会导致性能较差的消费者撑死揽下了所有的消息。

这时我们就需要进行一定的配置,使得每个消费者一次只能获取一条消息,处理完成之后才能获取下一个消息。因此,在application.yml文件中添加以下配置:

spring:
  rabbitmq:
    listener:
      simple:
      	# 每次只能获取一条消息,处理完成才能获取下一个消息
        prefetch: 1

4.4、测试


同样运行consumer服务运行测试类进行消息发送,我们会发现成功按照我们想象中的根据性能的不同处理不同的消息,使得资源得以最大化使用。同样我们也可以看得出来这一模型的一些特点:

  • 多个消费者绑定到一个队列,同一条消息只会被一个消费者处理
  • 通过设置prefetch来控制消费者预取的消息数量

image-20220731162939666

5、发布/订阅

在发布/订阅模型与前面的模型最大的不同即为多了一个exchange角色,且过程略有变化:

  • Publisher:生产者,也就是要发送消息的程序,但是不再发送到队列中,而是发给交换机;
  • Exchange:交换机,。一方面,接收生产者发送的消息。另一方面,知道如何处理消息,例如递交给某个特别队列、递交给所有队列、或是将消息丢弃。到底如何操作取决于Exchange的类型。Exchange有以下3种类型:
    • Fanout:广播,将消息交给所有绑定到交换机的队列;
    • Direct:定向,把消息交给符合指定routing key 的队列;
    • Topic:通配符,把消息交给符合routing pattern(路由模式) 的队列;
  • Consumer:消费者,与以前一样,订阅队列,没有变化;
  • Queue:消息队列也与以前一样,接收消息、缓存消息。

image-20210717165309625

值得注意的是,Exchange(交换机)只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与Exchange绑定,或者没有符合路由规则的队列,那么消息会丢失

5.1、Fanout Exchange


在广播模式下,消息发送流程是这样的:

  1. 可以有多个队列
  2. 每个队列都要绑定到Exchange(交换机)
  3. 生产者发送的消息,只能发送到交换机,交换机来决定要发给哪个队列,生产者无法决定
  4. 交换机把消息发送给绑定过的所有队列
  5. 订阅队列的消费者都能拿到消息

image-20210717165438225

5.1.1.声明队列和交换机


在consumer服务中的RabbitConfig中声明队列和交换机

@Configuration
public class RabbitConfig {
    /**
     * 声明交换机
     * @return Fanout类型交换机
     */
    @Bean
    public FanoutExchange fanoutExchange(){
        return new FanoutExchange("xbaozi.fanout");
    }

    /**
     * 第1个队列,方法名是这一个Bean的唯一标识
     */
    @Bean
    public Queue fanoutQueue1(){
        return new Queue("fanout.queue1");
    }

    /**
     * @description: 绑定队列和交换机
     * @param: fanoutQueue1     根据队列1的方法名进行自动装配
     * @param: fanoutExchange   根据交换机的方法名进行自动装配
     **/
    @Bean
    public Binding bindingQueue1(Queue fanoutQueue1, FanoutExchange fanoutExchange){
        return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);
    }

    /**
     * 第2个队列,方法名是这一个Bean的唯一标识
     */
    @Bean
    public Queue fanoutQueue2(){
        return new Queue("fanout.queue2");
    }

    /**
     * @description: 绑定队列和交换机
     * @param: fanoutQueue2     根据队列2的方法名进行自动装配
     * @param: fanoutExchange   根据交换机的方法名进行自动装配
     **/
    @Bean
    public Binding bindingQueue2(Queue fanoutQueue2, FanoutExchange fanoutExchange){
        return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);
    }
}

5.1.2、消息发送


在publisher服务的SpringAmqpTest类中添加测试方法:

@Test
public void testFanoutExchange() {
    // 交换机名称
    String exchangeName = "xbaozi.fanout";
    // 消息
    String message = "hello, everyone!";
    // routingKey,即第二个参数先留空,后续Direct Exchange模型使用
    rabbitTemplate.convertAndSend(exchangeName, "", message);
}

5.1.3、消息接收


在consumer服务的SpringRabbitListener中添加两个方法,分别模拟两个不同的消费者:

@RabbitListener(queues = "fanout.queue1")
public void listenFanoutQueue1(String msg) {
    System.out.println("消费者1接收到Fanout消息:【" + msg + "】");
}

@RabbitListener(queues = "fanout.queue2")
public void listenFanoutQueue2(String msg) {
    System.out.println("消费者2接收到Fanout消息:【" + msg + "】");
}

5.1.4、测试


同样运行consumer服务运行测试类进行消息发送,得出结果后我们发现,结果和我们预料的一样,如同广播,发送的消息都被所有的消费者接收到了。同样我们也可以看得出来这一模型的一些特点:

  • 接收publisher发送的消息,FanoutExchange的会将消息路由到每个绑定的队列
  • 不能缓存消息,路由失败,消息丢失

image-20220731181209638

5.2、Direct Exchange


在Fanout模式中,一条消息,会被所有订阅的队列都消费。但是,在某些场景下,我们希望不同的消息被不同的队列消费。这时就要用到Direct类型的Exchange。在Direct模型下:

  • 队列与交换机的绑定,不能是任意绑定了,而是要指定一个RoutingKey(路由key)
  • 消息的发送方在 向 Exchange发送消息时,也必须指定消息的 RoutingKey。
  • Exchange不再把消息交给每一个绑定的队列,而是根据消息的Routing Key进行判断,只有队列的Routingkey与消息的 Routing key完全一致,才会接收到消息

image-20210717170041447

5.2.1、基于注解声明队列和交换机


在前面的Fanout Exchange中我们大概可以感受得到手动的注入bean其实是一件很繁琐的事情,因此在Spring中还给我们提供了基于注解的方式来声明。

这里我们同样模拟两个消费者接收消息进行测试,在此之前对下面用到的注解进行一个简单的解释:

  • @RabbitListener:进行MQ的系列配置;
  • @QueueBinding:指定绑定关系,其中的key属性为指定绑定的路由key;
  • @Exchange:指定交换机,如果没有则创建,使用枚举类ExchangeTypes指定类型是Direct(这是默认的类型);
  • @Queue:指定监听的队列,如果没有则创建。
@RabbitListener(bindings = @QueueBinding(
        exchange = @Exchange(name = "xbaozi.direct", type = ExchangeTypes.DIRECT),
        value = @Queue(name = "direct.queue1"),
        // 指定绑定的路由key,相同的是red,不同的是yellow
        key = {"red", "yellow"}
))
public void listenDirectQueue1(String msg){
    System.out.println("消费者1接收到direct.queue1的消息:【" + msg + "】");
}

@RabbitListener(bindings = @QueueBinding(
        exchange = @Exchange(name = "xbaozi.direct", type = ExchangeTypes.DIRECT),
        value = @Queue(name = "direct.queue2"),
        // 指定绑定的路由key,相同的是red,不同的是blue
        key = {"red", "blue"}
))
public void listenDirectQueue2(String msg){
    System.out.println("消费者2接收到direct.queue2的消息:【" + msg + "】");
}

5.2.2、消息发送


在publisher服务的SpringAmqpTest类中添加测试方法:

@Test
public void testSendDirectExchange() {
    // 交换机名称
    String exchangeName = "xbaozi.direct";
    // 消息
    String message1 = "红色警报!日本乱排核废水,导致海洋生物变异,惊现哥斯拉!";
    String message2 = "蓝色警报!最近台风来袭,广东听闻终于下雨!";
    String message3 = "黄色警报!近期持续高温,导致地表温度升高,广东热成焦鸡腿!";
    // 发送消息
    rabbitTemplate.convertAndSend(exchangeName, "red", message1);
    rabbitTemplate.convertAndSend(exchangeName, "blue", message2);
    rabbitTemplate.convertAndSend(exchangeName, "yellow", message3);
}

5.2.3、测试


同样运行consumer服务运行测试类进行消息发送,得出结果后我们发现确实是按照我们指定的key对消息进行了正确的路由。同样我们也可以看得出来这一模型的一些特点:

  • Fanout交换机将消息路由给每一个与之绑定的队列
  • Direct交换机根据RoutingKey判断路由给哪个队列
  • 如果多个队列具有相同的RoutingKey,则与Fanout功能类似

image-20220731182510334

5.3、Topic Exchange


Topic与Direct相比,都是可以根据RoutingKey把消息路由到不同的队列。只不过Topic可以让队列在绑定Routing key 的时候使用通配符,这是两者最大的区别。

Routingkey 一般都是有一个或多个单词组成,多个单词之间以 ”.” 分割,例如: china.culture

通配符规则:

#:匹配一个或多个词,如china.#能够匹配china.beijing.news 或者 china.beijing

*****:匹配不多不少恰好1个词,如china.*:只能匹配china.news或者china.culture

image-20210717170705380

5.3.1、消息发送


在publisher服务的SpringAmqpTest类中添加测试方法:

@Test
public void testSendTopicExchange() {
    // 交换机名称
    String exchangeName = "xbaozi.topic";
    // 消息
    String message1 = "卖报卖报!小日子不错的那些人不顾全球安危,坚持乱排核废水!";
    String message2 = "预警!日本樱岛火山爆发,当地警戒级别已升至最高,请注意安全!";
    String message3 = "2022年5号台风桑达已生成,各地请注意暴雨冰雹!";
    // 发送消息
    rabbitTemplate.convertAndSend(exchangeName, "china.news", message1);
    rabbitTemplate.convertAndSend(exchangeName, "japan.news", message2);
    rabbitTemplate.convertAndSend(exchangeName, "china.weather", message3);
}

5.3.2、消息接收


在consumer服务的SpringRabbitListener中添加方法,注解和Direct Exchange中的几乎一样,只需要将交换机的类型换成TOPIC即可。

@RabbitListener(bindings = @QueueBinding(
        exchange = @Exchange(name = "xbaozi.topic", type = ExchangeTypes.TOPIC),
        value = @Queue(name = "topic.queue1"),
        key = "china.#"
))
public void listenTopicQueue1(String msg){
    System.out.println("消费者1接收到topic.queue2的消息:【" + msg + "】");
}

@RabbitListener(bindings = @QueueBinding(
        exchange = @Exchange(name = "xbaozi.topic", type = ExchangeTypes.TOPIC),
        value = @Queue(name = "#.news"),
        key = "#.news"
))
public void listenTopicQueue2(String msg){
    System.out.println("消费者2接收到topic.queue2的消息:【" + msg + "】");
}

5.3.3、测试


同样运行consumer服务运行测试类进行消息发送,得出结果后我们发现确实是按照我们指定的key的通配符对消息进行了正确的路由。同样我们也可以看得出来这一模型的一些特点:

  • Topic交换机接收的消息RoutingKey必须是多个单词,以 **.** 分割
  • Topic交换机与队列绑定时的bindingKey可以指定通配符
  • #:代表0个或多个词
  • *:代表1个词

image-20220731183931312

6、消息转换器

其实不知道大家有没有发现,其实写了这么多个模型下来,我们一直都是使用的字符串当作消息进行通讯,而在开发过程中我们用过最多的其实就是对象,目前看来这显然是还没有学到心坎去,那么就试一下传输一个对象试试?

说搞就搞!为了偷懒,直接使用了直接消息队列进行改造,然后传递一个Map对象来玩一下,但是要记得接收时的参数也要对应换成Map类型进行接收才能够正常接收,否则会报异常。

6.1、尝试传递Map对象


6.1.1、消息发送


在测试类中定义一个发送的方法,把我们的三十米大刀传递过去试试。

@Test
public void testSendMap() throws InterruptedException {
    // 准备消息
    Map<String,Object> msg = new HashMap<>();
    msg.put("name", "大刀");
    msg.put("len", 30);
    // 发送消息
    rabbitTemplate.convertAndSend("simple.queue", msg);
}

6.1.2、消息接收


在监听器中定义接收方法,只需将简单队列模型进行一个简单的修改即可:

@RabbitListener(queues = "simple.queue")
public void listenSimpleQueueMessage(Map<String, Object> msg) throws InterruptedException {
    System.out.println("spring 消费者接收到消息:" + msg);
}

6.1.3、测试

这里因为前面的代码,我们的MQ中已经存在了simple.queue这一队列,因此我们直接运行消息发送代码,并在MQ的管理平台中进行查看。在发送之后我们惊奇的发现,怎么是一长串的符号,刚刚传过来的30米大刀呢???

image-20220731184810514

但是当我们将consumer服务跑起来之后,其实这30米大刀是能正常接收的,这是为什么呢?其实这时因为默认情况下Spring采用的序列化方式是JDK序列化。众所周知,JDK序列化存在下列问题:

  • 数据体积过大,导致发送和接收的时间过长
  • 有安全漏洞,容易被不怀好意的崽截获
  • 可读性差,压根看不出来发送的是什么

6.2、优化改进


显然,JDK序列化方式并不合适。我们希望消息体的体积更小、可读性更高,因此可以使用JSON方式来做序列化和反序列化。

6.2.1、导入依赖


因为需要在publisher和consumer两个服务中都引入下方依赖,因此我直接将其放到了父工程mq-demo中去:

<!--jackson依赖-->
<dependency>
    <groupId>com.fasterxml.jackson.dataformat</groupId>
    <artifactId>jackson-dataformat-xml</artifactId>
    <version>2.13.3</version>
</dependency>

6.2.2、配置消息转换器


这里就没什么注解是我了解的了,所以只能在配置类中将消息转换器注入到容器中了。需要注意的是,两个服务publisherconsumer都需要配置消息转换器,从而替换掉默认的转换器。因为在发送和接收需要对数据分别进行序列化和反序列化,这就必须使用同一个转换器,这就好像你写了一句日文,然后我用中文来阅读一样,虽然有点相似dddd,但是总归是不同的东西。

  • 嫌弃麻烦的其实也可以选择性的加在启动类上,publisher同样可以检测扫描得到。
@Bean
public MessageConverter jsonMessageConverter(){
    return new Jackson2JsonMessageConverter();
}

6.2.3、测试


在运行消息发送的方法之后,在管理平台中可以发现,我们的30米大刀又回来了!

image-20220731190206477

同样,在将consumer服务跑起来之后,我们也可以在控制台看到正确的结果,这证明反序列化也成功了!

image-20220731190326666

最后

以上就是端庄斑马为你收集整理的【微服务】SpringAMQP实现RabbitMQ的消息模型的全部内容,希望文章能够帮你解决【微服务】SpringAMQP实现RabbitMQ的消息模型所遇到的程序开发问题。

如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(42)

评论列表共有 0 条评论

立即
投稿
返回
顶部