概述
持续学习&持续更新中…
守破离
【黑马-SpringCloud技术栈】【09】消息队列—RabbitMQ_SpringAMQP
- 初识MQ
- 同步和异步通讯
- 同步调用
- 异步调用
- 如何选
- 什么是MQ及四大MQ对比
- RabbitMQ快速入门
- 什么是RabbitMQ
- RabbitMQ的安装
- 单机安装
- 集群安装
- RabbitMQ的结构和概念
- 常见消息模型
- HelloWorld案例
- SpringAMQP快速入门
- 什么是SpringAMQP
- 案例:利用SpringAMQP实现HelloWorld中的基础消息队列功能
- 工作队列
- 什么是工作队列
- 案例:模拟WorkQueue,实现一个队列绑定多个消费者
- 消费预取机制
- 总结
- 发布、订阅—交换机
- FanoutExchange【广播】
- 什么是FanoutExchange
- 案例:利用SpringAMQP演示FanoutExchange的使用
- DirectExchange【路由】
- 什么是DirectExchange
- 案例:利用SpringAMQP演示DirectExchange的使用
- 总结
- TopicExchange【话题】
- 什么是TopicExchange
- 案例:利用SpringAMQP演示TopicExchange的使用
- 消息转换器
- 测试发送Object类型消息
- 总结
- 参考
初识MQ
同步和异步通讯
-
微服务间通讯有同步和异步两种方式:
-
同步通讯:就像打电话,需要实时响应。
-
异步通讯:就像发邮件,不需要马上回复。
-
-
两种方式各有优劣,打电话可以立即得到响应,但是你却不能跟多个人同时通话。
-
发送邮件可以同时与多个人收发邮件,但是往往响应会有延迟。
同步调用
我们之前学习的Feign调用就属于同步方式,虽然调用可以实时得到结果,但存在下面的问题:
总结:
同步调用的优点:
- 时效性较强,可以立即得到结果
同步调用的问题:
- 耦合度高
- 性能和吞吐能力下降
- 有额外的资源消耗
- 有级联失败问题
异步调用
异步调用则可以避免上述问题:
-
我们以购买商品为例,用户支付后需要调用订单服务完成订单状态修改,也需要调用物流服务,从仓库分配响应的库存并准备发货。
-
在事件模式中,支付服务是事件发布者(publisher),在支付完成后只需要发布一个支付成功的事件(event),事件中带上订单id。
-
订单服务和物流服务是事件订阅者(consumer),订阅支付成功的事件,监听到事件后完成自己业务即可。
-
为了解除事件发布者与订阅者之间的耦合,两者并不是直接通信,而是有一个中间人(Broker)。发布者发布事件到Broker,不关心谁来订阅事件。订阅者从Broker订阅事件,不关心谁发来的消息。
-
Broker 是一个像数据总线一样的东西,所有的服务要接收数据和发送数据都发到这个总线上,这个总线就像协议一样,让服务间的通讯变得标准和可控。
异步调用常见实现就是事件驱动模式:
事件驱动优势:
-
优势一:服务解耦
-
优势二:性能提升,吞吐量提高
-
优势三:服务没有强依赖,不担心级联失败问题
-
优势四:流量消峰
总结:
-
异步调用的好处:
-
耦合度低,每个服务都可以灵活插拔,可替换
-
吞吐量高:无需等待订阅者处理完成,响应更快速,就可以处理更多的用户请求
-
故障隔离:服务没有直接调用,不存在级联失败问题
-
流量削峰:不管发布事件的流量波动多大,都由Broker接收,订阅者可以按照自己的速度去处理事件
-
-
异步调用的缺点:
- 架构复杂了,业务没有明显的流程线,不好追踪管理
- 需要依赖Broker的可靠性、可用性、稳定性、安全性、吞吐能力等(好在现在开源软件或云平台上 Broker 的软件是非常成熟的,比较常见的一种就是我们今天要学习的MQ技术。)
如何选
-
大多数情况下我们对并发并没有很高的要求,相反而言我们对实效性要求很高,需要用到服务的返回结果,因此大多数我们使用同步。
-
异步调用只适合那种不需要知道返回结果的、只需通知一下的、对并发和吞吐量要求较高的、需要服务解耦的服务调用
什么是MQ及四大MQ对比
- MQ (MessageQueue),中文是消息队列,字面来看就是存放消息(消息可以理解为“事件”)的队列。也就是事件驱动架构中的Broker。
注意:
- RockerMQ现在也是Apache社区的,但维护人员大多还是阿里员工
- Kafka适合海量数据传输但是对于数据安全要求不高的,一般用于日志数据传输
- RabbitMQ和RocketMQ更适合于业务(服务)之间的通信
- RabbitMQ有着活跃的社区并且很稳定,因此一般而言对于中小型公司来说,选择RabbitMQ即可
- 如果是大型公司,有对MQ做深度定制的需求,那么你可以选择使用RocketMQ,基于Java语言去做一定的开发
RabbitMQ快速入门
什么是RabbitMQ
- RabbitMQ是基于Erlang语言开发的开源消息通信中间件
- 官网地址:https://www.rabbitmq.com/
- Erlang是一门面向并发的语言,天生就是为了分布式系统而设计的。
RabbitMQ的安装
我们在Centos7虚拟机中使用Docker来安装。
单机安装
下载镜像:
-
方式一:在线拉取
docker pull rabbitmq:3-management
-
方式二:从本地加载:在课前资料已经提供了镜像包,上传到虚拟机中后,使用命令加载镜像即可:
docker load -i mq.tar
安装MQ:
-
执行下面的命令来运行MQ容器:
docker run -e RABBITMQ_DEFAULT_USER=itcast -e RABBITMQ_DEFAULT_PASS=123321 --name mq --hostname mq1 -p 15672:15672 -p 5672:5672 -d rabbitmq:3-management
-
访问MQ或者登录MQ的管理平台都需要用到环境变量配置的用户名和密码
-
集群部署必须配置
--hostname
-
-p 15672:15672
RabbitMQ的管理页面端口 -
-p 5672:5672
消息通信的端口
查看管理页面:
-
在浏览器中打开:http://192.168.152.134:15672
-
查看虚拟主机
集群安装
- 网上查阅相关教程
RabbitMQ的结构和概念
- channel:操作MQ的工具
- exchange:路由消息到队列中
- queue:缓存消息
- virtual host:虚拟主机,是对queue、exchange等资源的逻辑分组【RabbitMQ是一个多租户系统】
- 我们可以通过虚拟主机对不同的用户进行隔离
- 一般而言每一个用户都应该有自己独享的虚拟主机
常见消息模型
HelloWorld案例
官方的HelloWorld是基于最基础的消息队列模型来实现的,只包括三个角色:
- publisher:消息发布者,将消息发送到队列queue
- queue:消息队列,负责接受并缓存消息
- consumer:订阅队列,处理队列中的消息
实现步骤:
代码:
-
Publisher:
public class PublisherTest { @Test public void testSendMessage() throws IOException, TimeoutException { // 1.建立连接 ConnectionFactory factory = new ConnectionFactory(); // 1.1.设置连接参数,分别是:主机名、端口号、vhost、用户名、密码 factory.setHost("192.168.152.134"); factory.setPort(5672); factory.setVirtualHost("/"); factory.setUsername("itcast"); factory.setPassword("123321"); // 1.2.建立连接 Connection connection = factory.newConnection(); // 2.创建通道Channel Channel channel = connection.createChannel(); // 3.创建队列 String queueName = "simple.queue"; channel.queueDeclare(queueName, false, false, false, null); // 4.发送消息 String message = "hello, rabbitmq!"; channel.basicPublish("", queueName, null, message.getBytes()); System.out.println("发送消息成功:【" + message + "】"); // 5.关闭通道和连接 channel.close(); connection.close(); } }
-
Consumer:
public class ConsumerTest { public static void main(String[] args) throws IOException, TimeoutException { // 1.建立连接 ConnectionFactory factory = new ConnectionFactory(); // 1.1.设置连接参数,分别是:主机名、端口号、vhost、用户名、密码 factory.setHost("192.168.152.134"); factory.setPort(5672); factory.setVirtualHost("/"); factory.setUsername("itcast"); factory.setPassword("123321"); // 1.2.建立连接 Connection connection = factory.newConnection(); // 2.创建通道Channel Channel channel = connection.createChannel(); // 3.创建队列 String queueName = "simple.queue"; channel.queueDeclare(queueName, false, false, false, null); // 4.订阅消息 channel.basicConsume(queueName, true, new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { // 5.处理消息 String message = new String(body); System.out.println("接收到消息:【" + message + "】"); } }); System.out.println("等待接收消息。。。。"); } }
-
注意:由于Publisher和Consumer并不一定谁先启动,因此它俩都需要创建(声明)队列【MQ内部会优化,并不会创建两个相同的队列】
总结:
基本消息队列的消息发送流程:
- 建立connection
- 创建channel
- 利用channel声明队列
- 利用channel向队列发送消息
基本消息队列的消息接收流程:
- 建立connection
- 创建channel
- 利用channel声明队列
- 定义consumer的消费行为handleDelivery()
- 利用channel将消费者与队列绑定
SpringAMQP快速入门
什么是SpringAMQP
- SpringAmqp的官方地址:https://spring.io/projects/spring-amqp
案例:利用SpringAMQP实现HelloWorld中的基础消息队列功能
因为publisher和consumer服务都需要amqp依赖,因此这里把依赖直接放到父工程mq-demo中:
<!--AMQP依赖,包含RabbitMQ-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<!--待会儿还需要用到单元测试-->
<!--单元测试-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
</dependency>
在publisher服务中编写application.yml,添加mq连接信息:
spring:
rabbitmq:
host: 192.168.152.134
port: 5672
username: itcast
password: 123321
virtual-host: /
在publisher服务中新建一个测试类,编写测试方法:
@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringAMQPTest {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void testSendSimpleQueue() {
String queueName = "simple.queue";
String message = "Hello, SpringAMQP! I am LP!";
rabbitTemplate.convertAndSend(queueName, message);
}
}
在consumer服务中编写application.yml,添加mq连接信息:
在consumer服务中新建一个类,编写消费逻辑:
@Component
public class SpringAMQPListener {
@RabbitListener(queues = "simple.queue")
public void receiveSimpleQueueMessage(String message) {
System.out.println("接收到了simple.queue的消息:【" + message + "】");
}
}
注意:
- 消息一旦消费就会从队列删除,RabbitMQ没有消息回溯功能,也即“阅后即焚”
- 需要事先创建好simple.queue这个队列, SpringAMQP不会帮你创建队列
工作队列
什么是工作队列
- Work queue,工作队列,让多个消费者合作处理消息,可以提高消息处理速度,避免队列消息堆积
案例:模拟WorkQueue,实现一个队列绑定多个消费者
-
publisher:
@RunWith(SpringRunner.class) @SpringBootTest public class SpringAMQPTest { @Autowired private RabbitTemplate rabbitTemplate; @Test public void testSendWorkQueue() throws Exception { String queueName = "simple.queue"; String message = "Hello, I am "; for (int i = 1; i <= 50; i++) { rabbitTemplate.convertAndSend(queueName, message + i); // 1秒钟等于1000毫秒 // 模拟1秒发送50条消息 TimeUnit.MILLISECONDS.sleep(20); } } }
-
consumer:
@Component public class SpringAMQPListener { @RabbitListener(queues = "simple.queue") public void receiveWorkQueueMessage1(String message) throws Exception { System.out.println("WorkQueueMessage1接收到了消息:【" + message + "】" + LocalTime.now()); // TimeUnit.MILLISECONDS.sleep(25); // 模拟一秒钟处理40个消息 TimeUnit.MILLISECONDS.sleep(20); // 模拟一秒钟处理50个消息 } @RabbitListener(queues = "simple.queue") public void receiveWorkQueueMessage2(String message) throws Exception { System.err.println("WorkQueueMessage2-----------接收到了消息:【" + message + "】" + LocalTime.now()); // TimeUnit.MILLISECONDS.sleep(50); // 模拟一秒钟处理20个消息 // TimeUnit.MILLISECONDS.sleep(500); // 模拟一秒钟处理2个消息 TimeUnit.MILLISECONDS.sleep(100); // 模拟一秒钟处理10个消息 } }
消费预取机制
有一个问题:
- 我们使用Work Queue的目的是让多个消费者合作处理消息,并且想让:处理消息快的消费者多消费,处理消息慢的消费者少消费。所谓“能者多劳”、“多干多得”
- 但是我们通过控制台的打印输出可以看到,每个RabbitListener都会处理一半的消息,看起来也就是消息被平均的分配给了消费者们。
- 我们发现,处理速度慢的消费者揽了一堆消息在那儿慢慢处理,而处理速度快的消费者却早早地就完成了任务,在那儿空闲着,这显然不行。
- 那么究其原因,是RabbitMQ的消费预取机制造成的。
什么是消费预取机制:
- 当有大量的消息到达队列时,队列就需要将这些消息投递给消费者了
- 而消费者的Channel管他能不能处理,会先把这些消息提前拿过来,你拿一个,我拿一个,于是消息就被平均的分配给了消费者们
- 而我们现在要让消费快的拿到的消息多,消费慢的拿到的消息少,那么就需要配置一下了
配置预取机制:
总结
- Work模型的使用:
- 多个消费者绑定到一个队列,同一条消息只会被一个消费者处理
- 通过设置prefetch来控制消费者预取的消息数量
发布、订阅—交换机
- 发布订阅模式与之前案例的区别就是允许将同一消息发送给多个消费者。实现方式是加入了exchange(交换机)。
- 常见exchange类型包括:
- Fanout:广播
- Direct:路由
- Topic:话题
总结:交换机的作用是什么?
- 接收publisher发送的消息
- 将消息按照规则路由到与之绑定的队列
- exchange负责消息路由,而不是存储,路由失败则消息丢失
FanoutExchange【广播】
什么是FanoutExchange
- Fanout Exchange 会将接收到的消息广播到每一个跟其绑定的queue
案例:利用SpringAMQP演示FanoutExchange的使用
-
SpringAMQP提供了声明交换机、队列、绑定关系的API,例如:
-
consumer:
@Configuration public class FanoutExchangeConfig { // 声明Fanout交换机 @Bean public FanoutExchange fanoutExchange() { return new FanoutExchange("itcast.fanout"); } // 声明第1个队列 @Bean public Queue fanoutQueue1() { return new Queue("fanout.queue1"); } //绑定队列1和交换机 @Bean public Binding bindingQueue1(FanoutExchange fanoutExchange, Queue fanoutQueue1) { return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange); } // 声明第2个队列 @Bean public Queue fanoutQueue2() { return new Queue("fanout.queue2"); } //绑定队列2和交换机 @Bean public Binding bindingQueue2(FanoutExchange fanoutExchange, Queue fanoutQueue2) { return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange); } }
@Component public class SpringAMQPListener { @RabbitListener(queues = "fanout.queue1") public void listenFanoutQueue1(String message) { System.err.println("fanoutQueue1-----------接收到了消息:【" + message + "】"); } @RabbitListener(queues = "fanout.queue2") public void listenFanoutQueue2(String message) { System.err.println("fanoutQueue2接收到了消息:【" + message + "】"); } }
-
publisher:
DirectExchange【路由】
什么是DirectExchange
Direct Exchange 会将接收到的消息根据规则路由到指定的Queue,因此称为路由模式(routes)。
- 每一个Queue都与Exchange设置一个BindingKey
- 发布者发送消息时,指定消息的RoutingKey
- Exchange将消息路由到BindingKey与消息RoutingKey一致的队列
案例:利用SpringAMQP演示DirectExchange的使用
-
consumer:
@Component public class SpringAMQPListener { @RabbitListener( bindings = @QueueBinding( value = @Queue(name = "direct.queue1"), exchange = @Exchange(name = "itcast.direct", type = "direct"), key = {"blue", "red"} ) ) public void listenDirectExchangeQueue1(String message) { System.out.println("listenDirectExchangeQueue1:message:【" + message + "】"); } @RabbitListener( bindings = @QueueBinding( value = @Queue(name = "direct.queue2"), exchange = @Exchange(name = "itcast.direct", type = ExchangeTypes.DIRECT), key = {"yellow", "red"} ) ) public void listenDirectExchangeQueue2(String message) { System.err.println("listenDirectExchangeQueue2——>message:【" + message + "】"); } }
-
publisher:
@RunWith(SpringRunner.class) @SpringBootTest public class SpringAMQPTest { @Autowired private RabbitTemplate rabbitTemplate; @Test public void testDirectExchangeQueue() { String exchangeName = "itcast.direct"; // String message = "Hello, blue!"; // 发送消息,参数依次为:交换机名称,RoutingKey,消息 // rabbitTemplate.convertAndSend(exchangeName, "blue", message); // String message = "Hello, yellow!"; // rabbitTemplate.convertAndSend(exchangeName, "yellow", message); String message = "Hello, red!"; rabbitTemplate.convertAndSend(exchangeName, "red", message); } }
总结
- 描述下Direct交换机与Fanout交换机的差异?
- Fanout交换机将消息路由给每一个与之绑定的队列
- Direct交换机根据RoutingKey匹配BindingKey来判断应该路由给哪个队列
- 如果多个队列具有相同的BindingKey,则与Fanout功能类似,比如上边的【red】
TopicExchange【话题】
什么是TopicExchange
- TopicExchange与DirectExchange类似,区别在于routingKey必须是多个单词的列表,并且以
.
分割。 - Queue与Exchange指定bindingKey时可以使用通配符:
#
:代指0个或多个单词*
:代指一个单词
案例:利用SpringAMQP演示TopicExchange的使用
-
consumer:
@Component public class SpringAMQPListener { @RabbitListener( bindings = @QueueBinding( value = @Queue(name = "topic.queue1"), exchange = @Exchange(name = "itcast.topic", type = ExchangeTypes.TOPIC), key = "china.#" ) ) public void listenTopicExchangeQueue1(String message) { System.err.println("listenTopicExchangeQueue1——>message:【" + message + "】"); } @RabbitListener( bindings = @QueueBinding( value = @Queue(name = "topic.queue2"), exchange = @Exchange(name = "itcast.topic", type = ExchangeTypes.TOPIC), key = "#.news" ) ) public void listenTopicExchangeQueue2(String message) { System.out.println("listenTopicExchangeQueue2==>message:【" + message + "】"); } }
-
publisher:
@RunWith(SpringRunner.class) @SpringBootTest public class SpringAMQPTest { @Autowired private RabbitTemplate rabbitTemplate; @Test public void testTopicExchangeQueue() { String exchangeName = "itcast.topic"; // String message = "新闻:传智教育【教育行业IPO第一股】上市了!"; // rabbitTemplate.convertAndSend(exchangeName, "china.news", message); String message = "天气:晴天,34摄氏度"; rabbitTemplate.convertAndSend(exchangeName, "china.weather", message); } }
消息转换器
测试发送Object类型消息
PS:老师使用上述方法在consumer中声明队列的目的是在浏览器中查看队列中的消息,自己实现完全可以不这样做,直接使用注解即可。
默认的JDK序列化的方式:这样既不直观,传输的字符又过长,因此,推荐换一种使用JSON序列化的方式。
Spring的对消息对象的处理是由org.springframework.amqp.support.converter.MessageConverter
来处理的。
而默认实现是SimpleMessageConverter
,基于JDK的ObjectOutputStream完成序列化。
如果要修改只需要定义一个 MessageConverter 类型的Bean即可。
推荐用JSON方式序列化,步骤如下:
-
publisher:
-
consumer:
-
修改后的消息:
总结
- SpringAMQP中消息的序列化和反序列化是怎么实现的?
- 利用MessageConverter实现的,默认是JDK的序列化,推荐使用JSON序列化方式
- 注意发送方与接收方必须使用相同的MessageConverter
参考
黑马程序员:SpringCloud微服务技术栈.
本文完,感谢您的关注支持!
最后
以上就是无聊蚂蚁为你收集整理的【黑马-SpringCloud技术栈】【09】消息队列—RabbitMQ_SpringAMQP初识MQRabbitMQ快速入门SpringAMQP快速入门工作队列发布、订阅—交换机FanoutExchange【广播】DirectExchange【路由】TopicExchange【话题】消息转换器参考的全部内容,希望文章能够帮你解决【黑马-SpringCloud技术栈】【09】消息队列—RabbitMQ_SpringAMQP初识MQRabbitMQ快速入门SpringAMQP快速入门工作队列发布、订阅—交换机FanoutExchange【广播】DirectExchange【路由】TopicExchange【话题】消息转换器参考所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
发表评论 取消回复