概述
文章目录
- 1、基本概念
- 2、消息模型
- 2.1、粗浅引入
- 2.2、环境准备
- 3、基本消息队列
- 3.0、配置MQ
- 3.1、消息发送
- 3.2、消息接收
- 3.3、测试
- 4、工作消息队列
- 4.1、消息发送
- 4.2、消息接收
- 4.3、能者多劳
- 4.4、测试
- 5、发布/订阅
- 5.1、Fanout Exchange
- 5.1.1.声明队列和交换机
- 5.1.2、消息发送
- 5.1.3、消息接收
- 5.1.4、测试
- 5.2、Direct Exchange
- 5.2.1、基于注解声明队列和交换机
- 5.2.2、消息发送
- 5.2.3、测试
- 5.3、Topic Exchange
- 5.3.1、消息发送
- 5.3.2、消息接收
- 5.3.3、测试
- 6、消息转换器
- 6.1、尝试传递Map对象
- 6.1.1、消息发送
- 6.1.2、消息接收
- 6.1.3、测试
- 6.2、优化改进
- 6.2.1、导入依赖
- 6.2.2、配置消息转换器
- 6.2.3、测试
1、基本概念
SpringAMQP是基于RabbitMQ封装的一套模板,并且还利用SpringBoot对其实现了自动装配,可谓是十分优雅,SpringAMQP由两部分组成;spring-amqp 是基础抽象,spring-rabbit 是 RabbitMQ 实现。用官方很牛且掰的话来讲就是:(自行跳转:https://spring.io/projects/spring-amqp)
Spring AMQP 项目将核心 Spring 概念应用于基于 AMQP 的消息传递解决方案的开发。它提供了一个“模板”作为发送和接收消息的高级抽象。它还通过“侦听器容器”为消息驱动的 POJO 提供支持。这些库促进了 AMQP 资源的管理,同时促进了依赖注入和声明性配置的使用。在所有这些情况下,您都会看到与 Spring Framework 中的 JMS 支持的相似之处。
SpringAMQP提供了三个功能:
- 自动声明队列、交换机及其绑定关系;
- 基于注解的监听器模式,异步接收消息;
- 封装了RabbitTemplate工具,用于发送和接收消息 ,一般用于发送;
2、消息模型
2.1、粗浅引入
在RabbitMQ官方提供了五种有点相同但又没有完全相同的示例,其对应了不同的消息模型(其实我也没有找到这个示例在哪,有兴趣的小伙伴可以自己去扒拉一下再顺道告诉我[/dog])
下面的按顺序分别对应着不同的用法:
- 基本消息队列(BasicQueue) ,对应着1. “Hello World!”;
- 工作消息队列(WorkQueue) ,对应着2. “Work queues”;
- 发布订阅(Publish、Subscribe) ,后三种都属于这一类型,又根据交换机类型不同分为三种:
- Fanout Exchange:广播。对应着"3. Publish/Subscribe";
- Direct Exchange:路由。对应着"4. Routing";
- Topic Exchange:话题。对应着"5. Topics"。
2.2、环境准备
这里是用到的是父工程mq-demo进行管理依赖,然后两个子工程publisher和consumer分别对应消息的发送者和消息的消费者。
因为这里只涉及到了MQ且所有依赖两个子工程都用得到,因此所有依赖都统一交由父工程管理,子工程共用即可。
<dependencies>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
<!--AMQP依赖,包含RabbitMQ-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<!--单元测试-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
</dependency>
</dependencies>
在搭建好项目环境之后就会得到下面的结构啦:
3、基本消息队列
在前面编写的《【微服务】RabbitMQ的粗浅入门》中其实已经编写过这一基本消息队列的具体实现了,但是那边文章使用的是不太优雅的代码方式,连接、通道、队列这些都需要我们逐一搭建,因此在这里使用SpringAMQP再次实现一遍,从而形成对比,让我们看看SpringAMQP究竟有多么优雅。
3.0、配置MQ
这其实是应该放在环境准备里面去编写的,但是为了对比原生中的编写连接,最终决定放在了这里。
分别在两个子工程的application.yml文件中添加如下配置,注意主机IP和账号密码记得换成自己的,并且两个子工程都需要进行配置。在分别配置之后就可以一劳永逸啦,而不像原生的一样,每写一次发送接收都需要配置参数建立连接。优雅,永不过时。
spring:
rabbitmq:
host: 192.168.150.101 # 主机名
port: 5672 # 端口
virtual-host: / # 虚拟主机
username: xbaozi # 用户名
password: 123456 # 密码
3.1、消息发送
这里在publisher服务中使用测试类对发送端的代码进行发送
@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringAmqpTest {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void testSimpleQueue() {
// 队列名称
String queueName = "simple.queue";
// 消息
String message = "hello, spring amqp!";
// 发送消息
rabbitTemplate.convertAndSend(queueName, message);
}
}
3.2、消息接收
在consumer服务中新建RabbitConfig配置类,将队列加入到容器中。
@Configuration
public class RabbitConfig {
@Bean
public Queue simpleQueue() {
return new Queue("simple.queue");
}
}
在consumer服务中新建SpringRabbitListener类对消息进行监听。
@Component
public class SpringRabbitListener {
@RabbitListener(queues = "simple.queue")
public void listenSimpleQueueMessage(String msg) throws InterruptedException {
System.out.println("spring 消费者接收到消息:【" + msg + "】");
}
}
3.3、测试
先运行consumer服务后运行测试类进行消息发送,因为这里只有consumer服务有注入了队列对象,如果先进行消息发送的话则会报错。
4、工作消息队列
Work queues,也被称为(Task queues),任务模型。简单来说就是让多个消费者绑定到一个队列,共同消费队列中的消息。
当消息处理比较耗时的时候,可能生产消息的速度会远远大于消息的消费速度。长此以往,消息就会堆积越来越多,无法及时处理。此时就可以使用work 模型,多个消费者共同处理消息处理,速度就能大大提高了。
4.1、消息发送
在publisher服务中的SpringAmqpTest类中添加一个测试方法,模拟大小消息的堆积现象。
@Test
public void testWorkQueue() throws InterruptedException {
// 队列名称
String queueName = "simple.queue";
// 消息
String message = "hello, message_";
for (int i = 0; i < 50; i++) {
// 发送消息
rabbitTemplate.convertAndSend(queueName, message + i);
Thread.sleep(20);
}
}
4.2、消息接收
这里在consumer服务中的SpringRabbitListener类中定义两个监听方法,分别模拟多个消费者对同一队列进行监听处理。
- 方法1进行sleep处理20ms,模拟性能好的服务器,1s处理五十条数据
- 方法2进行sleep处理200ms,模拟性能较差的服务器,1s处理五条数据
@RabbitListener(queues = "simple.queue")
public void listenWorkQueue1(String msg) throws InterruptedException {
System.out.println("消费者1接收到消息:【" + msg + "】" + LocalTime.now());
Thread.sleep(20);
}
@RabbitListener(queues = "simple.queue")
public void listenWorkQueue2(String msg) throws InterruptedException {
System.err.println("消费者2........接收到消息:【" + msg + "】" + LocalTime.now());
Thread.sleep(200);
}
4.3、能者多劳
其实到这里就可以分别运行两个服务了,但是运行之后会发现结果并没有按照我们想的来执行,而是出现了一个随机的处理结果。
这是因为两个不同的消费者默认情况下,会从队列中争夺性的获取随机条信息进行处理,这就会导致性能较差的消费者撑死揽下了所有的消息。
这时我们就需要进行一定的配置,使得每个消费者一次只能获取一条消息,处理完成之后才能获取下一个消息。因此,在application.yml文件中添加以下配置:
spring:
rabbitmq:
listener:
simple:
# 每次只能获取一条消息,处理完成才能获取下一个消息
prefetch: 1
4.4、测试
同样先运行consumer服务后运行测试类进行消息发送,我们会发现成功按照我们想象中的根据性能的不同处理不同的消息,使得资源得以最大化使用。同样我们也可以看得出来这一模型的一些特点:
- 多个消费者绑定到一个队列,同一条消息只会被一个消费者处理
- 通过设置prefetch来控制消费者预取的消息数量
5、发布/订阅
在发布/订阅模型与前面的模型最大的不同即为多了一个exchange角色,且过程略有变化:
- Publisher:生产者,也就是要发送消息的程序,但是不再发送到队列中,而是发给交换机;
- Exchange:交换机,。一方面,接收生产者发送的消息。另一方面,知道如何处理消息,例如递交给某个特别队列、递交给所有队列、或是将消息丢弃。到底如何操作,取决于Exchange的类型。Exchange有以下3种类型:
- Fanout:广播,将消息交给所有绑定到交换机的队列;
- Direct:定向,把消息交给符合指定routing key 的队列;
- Topic:通配符,把消息交给符合routing pattern(路由模式) 的队列;
- Consumer:消费者,与以前一样,订阅队列,没有变化;
- Queue:消息队列也与以前一样,接收消息、缓存消息。
值得注意的是,Exchange(交换机)只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与Exchange绑定,或者没有符合路由规则的队列,那么消息会丢失!
5.1、Fanout Exchange
在广播模式下,消息发送流程是这样的:
- 可以有多个队列
- 每个队列都要绑定到Exchange(交换机)
- 生产者发送的消息,只能发送到交换机,交换机来决定要发给哪个队列,生产者无法决定
- 交换机把消息发送给绑定过的所有队列
- 订阅队列的消费者都能拿到消息
5.1.1.声明队列和交换机
在consumer服务中的RabbitConfig中声明队列和交换机
@Configuration
public class RabbitConfig {
/**
* 声明交换机
* @return Fanout类型交换机
*/
@Bean
public FanoutExchange fanoutExchange(){
return new FanoutExchange("xbaozi.fanout");
}
/**
* 第1个队列,方法名是这一个Bean的唯一标识
*/
@Bean
public Queue fanoutQueue1(){
return new Queue("fanout.queue1");
}
/**
* @description: 绑定队列和交换机
* @param: fanoutQueue1 根据队列1的方法名进行自动装配
* @param: fanoutExchange 根据交换机的方法名进行自动装配
**/
@Bean
public Binding bindingQueue1(Queue fanoutQueue1, FanoutExchange fanoutExchange){
return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);
}
/**
* 第2个队列,方法名是这一个Bean的唯一标识
*/
@Bean
public Queue fanoutQueue2(){
return new Queue("fanout.queue2");
}
/**
* @description: 绑定队列和交换机
* @param: fanoutQueue2 根据队列2的方法名进行自动装配
* @param: fanoutExchange 根据交换机的方法名进行自动装配
**/
@Bean
public Binding bindingQueue2(Queue fanoutQueue2, FanoutExchange fanoutExchange){
return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);
}
}
5.1.2、消息发送
在publisher服务的SpringAmqpTest类中添加测试方法:
@Test
public void testFanoutExchange() {
// 交换机名称
String exchangeName = "xbaozi.fanout";
// 消息
String message = "hello, everyone!";
// routingKey,即第二个参数先留空,后续Direct Exchange模型使用
rabbitTemplate.convertAndSend(exchangeName, "", message);
}
5.1.3、消息接收
在consumer服务的SpringRabbitListener
中添加两个方法,分别模拟两个不同的消费者:
@RabbitListener(queues = "fanout.queue1")
public void listenFanoutQueue1(String msg) {
System.out.println("消费者1接收到Fanout消息:【" + msg + "】");
}
@RabbitListener(queues = "fanout.queue2")
public void listenFanoutQueue2(String msg) {
System.out.println("消费者2接收到Fanout消息:【" + msg + "】");
}
5.1.4、测试
同样先运行consumer服务后运行测试类进行消息发送,得出结果后我们发现,结果和我们预料的一样,如同广播,发送的消息都被所有的消费者接收到了。同样我们也可以看得出来这一模型的一些特点:
- 接收publisher发送的消息,FanoutExchange的会将消息路由到每个绑定的队列
- 不能缓存消息,路由失败,消息丢失
5.2、Direct Exchange
在Fanout模式中,一条消息,会被所有订阅的队列都消费。但是,在某些场景下,我们希望不同的消息被不同的队列消费。这时就要用到Direct类型的Exchange。在Direct模型下:
- 队列与交换机的绑定,不能是任意绑定了,而是要指定一个RoutingKey(路由key)
- 消息的发送方在 向 Exchange发送消息时,也必须指定消息的 RoutingKey。
- Exchange不再把消息交给每一个绑定的队列,而是根据消息的Routing Key进行判断,只有队列的Routingkey与消息的 Routing key完全一致,才会接收到消息
5.2.1、基于注解声明队列和交换机
在前面的Fanout Exchange中我们大概可以感受得到手动的注入bean其实是一件很繁琐的事情,因此在Spring中还给我们提供了基于注解的方式来声明。
这里我们同样模拟两个消费者接收消息进行测试,在此之前对下面用到的注解进行一个简单的解释:
- @RabbitListener:进行MQ的系列配置;
- @QueueBinding:指定绑定关系,其中的key属性为指定绑定的路由key;
- @Exchange:指定交换机,如果没有则创建,使用枚举类ExchangeTypes指定类型是Direct(这是默认的类型);
- @Queue:指定监听的队列,如果没有则创建。
@RabbitListener(bindings = @QueueBinding(
exchange = @Exchange(name = "xbaozi.direct", type = ExchangeTypes.DIRECT),
value = @Queue(name = "direct.queue1"),
// 指定绑定的路由key,相同的是red,不同的是yellow
key = {"red", "yellow"}
))
public void listenDirectQueue1(String msg){
System.out.println("消费者1接收到direct.queue1的消息:【" + msg + "】");
}
@RabbitListener(bindings = @QueueBinding(
exchange = @Exchange(name = "xbaozi.direct", type = ExchangeTypes.DIRECT),
value = @Queue(name = "direct.queue2"),
// 指定绑定的路由key,相同的是red,不同的是blue
key = {"red", "blue"}
))
public void listenDirectQueue2(String msg){
System.out.println("消费者2接收到direct.queue2的消息:【" + msg + "】");
}
5.2.2、消息发送
在publisher服务的SpringAmqpTest
类中添加测试方法:
@Test
public void testSendDirectExchange() {
// 交换机名称
String exchangeName = "xbaozi.direct";
// 消息
String message1 = "红色警报!日本乱排核废水,导致海洋生物变异,惊现哥斯拉!";
String message2 = "蓝色警报!最近台风来袭,广东听闻终于下雨!";
String message3 = "黄色警报!近期持续高温,导致地表温度升高,广东热成焦鸡腿!";
// 发送消息
rabbitTemplate.convertAndSend(exchangeName, "red", message1);
rabbitTemplate.convertAndSend(exchangeName, "blue", message2);
rabbitTemplate.convertAndSend(exchangeName, "yellow", message3);
}
5.2.3、测试
同样先运行consumer服务后运行测试类进行消息发送,得出结果后我们发现确实是按照我们指定的key对消息进行了正确的路由。同样我们也可以看得出来这一模型的一些特点:
- Fanout交换机将消息路由给每一个与之绑定的队列
- Direct交换机根据RoutingKey判断路由给哪个队列
- 如果多个队列具有相同的RoutingKey,则与Fanout功能类似
5.3、Topic Exchange
Topic与Direct相比,都是可以根据RoutingKey把消息路由到不同的队列。只不过Topic可以让队列在绑定Routing key 的时候使用通配符,这是两者最大的区别。
Routingkey
一般都是有一个或多个单词组成,多个单词之间以 ”.” 分割,例如: china.culture
通配符规则:
#:匹配一个或多个词,如china.#
能够匹配china.beijing.news
或者 china.beijing
*****:匹配不多不少恰好1个词,如china.*
:只能匹配china.news
或者china.culture
5.3.1、消息发送
在publisher服务的SpringAmqpTest
类中添加测试方法:
@Test
public void testSendTopicExchange() {
// 交换机名称
String exchangeName = "xbaozi.topic";
// 消息
String message1 = "卖报卖报!小日子不错的那些人不顾全球安危,坚持乱排核废水!";
String message2 = "预警!日本樱岛火山爆发,当地警戒级别已升至最高,请注意安全!";
String message3 = "2022年5号台风桑达已生成,各地请注意暴雨冰雹!";
// 发送消息
rabbitTemplate.convertAndSend(exchangeName, "china.news", message1);
rabbitTemplate.convertAndSend(exchangeName, "japan.news", message2);
rabbitTemplate.convertAndSend(exchangeName, "china.weather", message3);
}
5.3.2、消息接收
在consumer服务的SpringRabbitListener
中添加方法,注解和Direct Exchange中的几乎一样,只需要将交换机的类型换成TOPIC即可。
@RabbitListener(bindings = @QueueBinding(
exchange = @Exchange(name = "xbaozi.topic", type = ExchangeTypes.TOPIC),
value = @Queue(name = "topic.queue1"),
key = "china.#"
))
public void listenTopicQueue1(String msg){
System.out.println("消费者1接收到topic.queue2的消息:【" + msg + "】");
}
@RabbitListener(bindings = @QueueBinding(
exchange = @Exchange(name = "xbaozi.topic", type = ExchangeTypes.TOPIC),
value = @Queue(name = "#.news"),
key = "#.news"
))
public void listenTopicQueue2(String msg){
System.out.println("消费者2接收到topic.queue2的消息:【" + msg + "】");
}
5.3.3、测试
同样先运行consumer服务后运行测试类进行消息发送,得出结果后我们发现确实是按照我们指定的key的通配符对消息进行了正确的路由。同样我们也可以看得出来这一模型的一些特点:
- Topic交换机接收的消息RoutingKey必须是多个单词,以
**.**
分割 - Topic交换机与队列绑定时的bindingKey可以指定通配符
#
:代表0个或多个词*
:代表1个词
6、消息转换器
其实不知道大家有没有发现,其实写了这么多个模型下来,我们一直都是使用的字符串当作消息进行通讯,而在开发过程中我们用过最多的其实就是对象,目前看来这显然是还没有学到心坎去,那么就试一下传输一个对象试试?
说搞就搞!为了偷懒,直接使用了直接消息队列进行改造,然后传递一个Map对象来玩一下,但是要记得接收时的参数也要对应换成Map类型进行接收才能够正常接收,否则会报异常。
6.1、尝试传递Map对象
6.1.1、消息发送
在测试类中定义一个发送的方法,把我们的三十米大刀传递过去试试。
@Test
public void testSendMap() throws InterruptedException {
// 准备消息
Map<String,Object> msg = new HashMap<>();
msg.put("name", "大刀");
msg.put("len", 30);
// 发送消息
rabbitTemplate.convertAndSend("simple.queue", msg);
}
6.1.2、消息接收
在监听器中定义接收方法,只需将简单队列模型进行一个简单的修改即可:
@RabbitListener(queues = "simple.queue")
public void listenSimpleQueueMessage(Map<String, Object> msg) throws InterruptedException {
System.out.println("spring 消费者接收到消息:" + msg);
}
6.1.3、测试
这里因为前面的代码,我们的MQ中已经存在了simple.queue
这一队列,因此我们直接运行消息发送代码,并在MQ的管理平台中进行查看。在发送之后我们惊奇的发现,怎么是一长串的符号,刚刚传过来的30米大刀呢???
但是当我们将consumer服务跑起来之后,其实这30米大刀是能正常接收的,这是为什么呢?其实这时因为默认情况下Spring采用的序列化方式是JDK序列化。众所周知,JDK序列化存在下列问题:
- 数据体积过大,导致发送和接收的时间过长
- 有安全漏洞,容易被不怀好意的崽截获
- 可读性差,压根看不出来发送的是什么
6.2、优化改进
显然,JDK序列化方式并不合适。我们希望消息体的体积更小、可读性更高,因此可以使用JSON方式来做序列化和反序列化。
6.2.1、导入依赖
因为需要在publisher和consumer两个服务中都引入下方依赖,因此我直接将其放到了父工程mq-demo
中去:
<!--jackson依赖-->
<dependency>
<groupId>com.fasterxml.jackson.dataformat</groupId>
<artifactId>jackson-dataformat-xml</artifactId>
<version>2.13.3</version>
</dependency>
6.2.2、配置消息转换器
这里就没什么注解是我了解的了,所以只能在配置类中将消息转换器注入到容器中了。需要注意的是,两个服务publisher
和consumer
都需要配置消息转换器,从而替换掉默认的转换器。因为在发送和接收需要对数据分别进行序列化和反序列化,这就必须使用同一个转换器,这就好像你写了一句日文,然后我用中文来阅读一样,虽然有点相似dddd,但是总归是不同的东西。
- 嫌弃麻烦的其实也可以选择性的加在启动类上,publisher同样可以检测扫描得到。
@Bean
public MessageConverter jsonMessageConverter(){
return new Jackson2JsonMessageConverter();
}
6.2.3、测试
在运行消息发送的方法之后,在管理平台中可以发现,我们的30米大刀又回来了!
同样,在将consumer服务跑起来之后,我们也可以在控制台看到正确的结果,这证明反序列化也成功了!
最后
以上就是端庄斑马为你收集整理的【微服务】SpringAMQP实现RabbitMQ的消息模型的全部内容,希望文章能够帮你解决【微服务】SpringAMQP实现RabbitMQ的消息模型所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
发表评论 取消回复