我是靠谱客的博主 迷人网络,这篇文章主要介绍RabbitMQ到底是怎么运行的?一、核心组件二、RabbitMQ的运行流程三、实际演练小结,现在分享给大家,希望可以做个参考。

????????????????????????

哈喽!大家好,我是【一心同学】,一位上进心十足的【Java领域博主】!????????????

✨【一心同学】的写作风格:喜欢用【通俗易懂】的文笔去讲解每一个知识点,而不喜欢用【高大上】的官方陈述。

✨【一心同学】博客的领域是【面向后端技术】的学习,未来会持续更新更多的【后端技术】以及【学习心得】。

✨如果有对【后端技术】感兴趣的【小可爱】,欢迎关注一心同学】????????????

❤️❤️❤️感谢各位大可爱小可爱!❤️❤️❤️ 


目录

一、核心组件

???? 概念讲解

???? 思考

二、RabbitMQ的运行流程

???? 生产者发送消息

???? 消费者接收消息

三、实际演练

3.1 准备工作

3.2 定义生产者

???? 代码

???? 分析

3.3 定义消费者

???? 代码

???? 分析

小结


一、核心组件

???? 概念讲解

我们来看一下RabbitMQ核心组成部分,如下图:

以上这张图非常重要,因为其就是我们的RabbitMQ的核心组成,接下来,一心同学将来讲解其每一个组件。

????Producer:消息的生产者,也是一个向交换器发布消息的客户端应用程序。

????Consumer:消息的消费者,表示一个从一个消息队列中取得消息的客户端应用程序。

????Connection:生产者、消费者与broker之间的TCP链接,需要进行 TCP/IP的三次握手和四次挥手。

????Channel:网络信道,几乎所有的操作都在Channel中进行,Channel是进行消息读写的通道。

????Server:又称Broker ,接受客户端的连接,实现AMQP实体服务。

????Virtual Host:虚拟地址,用于进行逻辑隔离,是出于多租户和安全因素,把AMQP的基本组件划分到一个虚拟的分组中,可以把Virtual Host理解为mysql中的database,每个Virtual Host都可以进行单独的权限管理和逻辑操作。当多个不同的用户使用同一个RabbitMQ server提供的服务时,可以划分出多个vhost,每个用户在自己的vhost中创建exchange、queue等。

????Message:消息,服务与应用程序之间传送的数据,由Properties和body组成,Properties可以对消息进行修饰,比如消息的优先级,延迟等高级特性,Body则就是消息体的内容。

????Exchange:message到达broker的第一站,根据分发规则,匹配routing key,将消息分发到queue中去,可以绑定多个Queue也可以同时绑定其他的Exchange,常用的类型有direct(点对点)、topic(规则匹配)、fanout(广播)。

????Routing key:是一个路由规则,Exchange会根据消息的Routing Key匹配对应的Queue进行投递。

????Bindings:exchange和queue之间的虚拟链接,binding中包含routing key,binding信息保存到exchange的查询表中,用于message的分发,这样RabbitMQ就知道如何正确地将消息路由发到指定的Queue了。

????Queue:队列,也称为Message Queue(消息队列),保存消息并将它们转发给消费者。

???? 思考

我们完全可以直接使用Connection 就能完成Channel信道的工作,为什么还要引入Channel信道呢?

        我们试想这样一个场景, 一个应用程序中有很多个线程需要从RabbitMQ消费消息,或者生产消息,那么必然需要建立很多个Connection ,也就是许多个TCP 连接。然而对于操作系统而言,建立和销毁TCP 连接是非常昂贵的开销,如果遇到使用高峰,性能瓶颈也随之显现。RabbitMQ 采用类似NIO' (Non-blocking 1/0) 的做法,选择TCP 连接复用,不仅可以减少性能开销,同时也便于管理。

        每个线程把持一个信道,所以信道复用了Connection 的TCP 连接。同时RabbitMQ 可以确
保每个线程的私密性,就像拥有独立的连接一样。

        当每个信道的流量不是很大时,复用单一的Connection 可以在产生性能瓶颈的情况下有效地节省TCP 连接资源。

        但是当信道本身的流量很大时,这时候多个信道复用一个Connection 就会产生性能瓶颈,进而使整体的流量被限制了。此时就需要开辟多个Connection ,将这些信道均摊到这些Connection 中, 至于这些相关的调优策略需要根据业务自身的实际情况进行调节。

二、RabbitMQ的运行流程

???? 生产者发送消息

(1) 生产者连接到RabbitMQ Broker , 建立一个连接( Connection) ,开启一个信道(Channel)

(2) 生产者声明一个交换器(Exchange),并设置相关属性,比如交换机类型、是否持久化等

(3) 生产者声明一个队列(Queue)井设置相关属性,比如是否排他、是否持久化、是否自动删除等

( 4 ) 生产者通过路由键(Routing key)将交换器队列绑定起来

( 5 ) 生产者发送消息至RabbitMQ Broker,其中包含路由键交换器等信息

(6) 相应的交换器根据接收到的路由键查找相匹配的队列。

( 7 ) 如果找到,则将从生产者发送过来的消息存入相应的队列中。

(8) 如果没有找到,则根据生产者配置的属性选择丢弃还是回退给生产者

(9) 关闭信道。

(1 0) 关闭连接。

???? 消费者接收消息

(1)消费者连接到RabbitMQ Broker ,建立一个连接(Connection ) ,开启一个信道(Channel) 。

(2) 消费者向RabbitMQ Broker 请求消费相应队列中的消息,可能会设置相应的回调函数,
以及做一些准备工作

(3)等待RabbitMQ Broker 回应并投递相应队列中的消息, 消费者接收消息。

(4) 消费者确认( ack) 接收到的消息。

( 5) RabbitMQ 从队列中删除相应己经被确认的消息。

( 6) 关闭信道。

( 7) 关闭连接。

三、实际演练

我们现在对RabbitMQ的基本概念以及流程熟悉了,那么我们接下来就要进行实操了。

3.1 准备工作

(1)启动RabbitMQ的服务

复制代码
1
systemctl start rabbitmq-server

(2)打开防火墙

我们需要对5672这个端口进行打开防火墙设置(如果已经打开,忽略此步)

复制代码
1
firewall-cmd --zone=public --add-port=5672/tcp --permanent

重启防火墙:

复制代码
1
firewall-cmd --reload

(3)创建一个Maven项目并导入依赖

复制代码
1
2
3
4
5
<dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>5.10.0</version> </dependency>

如果是使用Spring Boot集成,那么则导入以下依赖:

复制代码
1
2
3
4
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>

3.2 定义生产者

???? 代码

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
package com.yixin.simple; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; public class Producer { public static void main(String[] args) { // 1: 创建连接工厂 ConnectionFactory connectionFactory = new ConnectionFactory(); // 2: 设置连接属性 connectionFactory.setHost("服务器地址"); connectionFactory.setPort(5672); connectionFactory.setVirtualHost("/"); connectionFactory.setUsername("yixin"); connectionFactory.setPassword("123456"); Connection connection = null; Channel channel = null; try { // 3: 从连接工厂中获取连接 connection = connectionFactory.newConnection("生产者"); // 4: 从连接中获取通道channel channel = connection.createChannel(); // 5: 申明队列queue存储消息 /* * 如果队列不存在,则会创建 * Rabbitmq不允许创建两个相同的队列名称,否则会报错。 * * @params1: queue 队列的名称 * @params2: durable 队列是否持久化 * @params3: exclusive 是否排他,即是否私有的,如果为true,会对当前队列加锁,其他的通道不能访问,并且连接自动关闭 * @params4: autoDelete 是否自动删除,当最后一个消费者断开连接之后是否自动删除消息。 * @params5: arguments 可以设置队列附加参数,设置队列的有效期,消息的最大长度,队列的消息生命周期等等。 * */ channel.queueDeclare("queue1", false, false, false, null); // 6: 准备发送消息的内容 String message = "你好,一心同学"; // 7: 发送消息给中间件rabbitmq-server // @params1: 交换机exchange // @params2: 队列名称/routing // @params3: 属性配置 // @params4: 发送消息的内容 channel.basicPublish("", "queue1", null, message.getBytes()); System.out.println("消息发送成功!"); } catch (Exception ex) { ex.printStackTrace(); System.out.println("发送消息出现异常..."); } finally { // 7: 释放连接关闭通道 if (channel != null && channel.isOpen()) { try { channel.close(); } catch (Exception ex) { ex.printStackTrace(); } } if (connection != null) { try { connection.close(); } catch (Exception ex) { ex.printStackTrace(); } } } } }

我们点击运行,如果出现以下错误则说明我们没有给该用户赋予资源权限,所以其没有连接权限:

我们只需要赋予其资源权限即可,执行以下代码:

复制代码
1
rabbitmqctl set_permissions -p / 用户名 ".*" ".*" ".*"

重新运行,就可以成功了!

???? 分析

我们前往Web控制台(http://服务器地址:15672/)进行查看该队列的信息: 

 

我们点击queue1进行查看详情,以下是我们该队列的信息的基本操作

我们点击Get messages进行查看:

其中ACK是应答模式
(1)Nack:消息被消费不告诉rabbitmq-server消息被消费了,会重新投递队列中
(2)Ack:rabbitmq-server接收到消费信息,会从服务器中移除消息。

3.3 定义消费者

???? 代码

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
package com.yixin.simple; import com.rabbitmq.client.*; import java.io.IOException; public class Consumer { public static void main(String[] args) { // 1: 创建连接工厂 ConnectionFactory connectionFactory = new ConnectionFactory(); // 2: 设置连接属性 connectionFactory.setHost("服务器地址"); connectionFactory.setPort(5672); connectionFactory.setVirtualHost("/"); connectionFactory.setUsername("yixin"); connectionFactory.setPassword("123456"); Connection connection = null; Channel channel = null; try { // 3: 从连接工厂中获取连接 connection = connectionFactory.newConnection("消费者"); // 4: 从连接中获取通道channel channel = connection.createChannel(); // 5: 申明队列queue存储消息 /* * 如果队列不存在,则会创建 * Rabbitmq不允许创建两个相同的队列名称,否则会报错。 * * @params1: queue 队列的名称 * @params2: durable 队列是否持久化 * @params3: exclusive 是否排他,即是否私有的,如果为true,会对当前队列加锁,其他的通道不能访问,并且连接自动关闭 * @params4: autoDelete 是否自动删除,当最后一个消费者断开连接之后是否自动删除消息。 * @params5: arguments 可以设置队列附加参数,设置队列的有效期,消息的最大长度,队列的消息生命周期等等。 * */ channel.basicConsume("queue1", true, new DeliverCallback() { public void handle(String s, Delivery delivery) throws IOException { System.out.println("收到的消息是"+new String(delivery.getBody(),"UTF-8")); } },new CancelCallback(){ public void handle(String consumerTag) throws IOException{ System.out.println("接收失败"); } }); // 7: 发送消息给中间件rabbitmq-server // @params1: 交换机exchange // @params2: 队列名称/routing // @params3: 属性配置 // @params4: 发送消息的内容 System.out.println("开始接收消息"); //让我们的生产者一直处于运行状态,这样就不必使用while(true)了 System.in.read(); } catch (Exception ex) { ex.printStackTrace(); System.out.println("发送消息出现异常..."); } finally { // 7: 释放连接关闭通道 if (channel != null && channel.isOpen()) { try { channel.close(); } catch (Exception ex) { ex.printStackTrace(); } } if (connection != null) { try { connection.close(); } catch (Exception ex) { ex.printStackTrace(); } } } } }

点击运行后,控制台打印:

 可以发现我们的消费者已经成功拿到我们的数据了!

???? 分析

我们现在重新回到Web控制界面进行查看:

即使收到消息后我们的消费者也还是一直处于连接的状态( System.in.read()的功劳),也就是说我们的生产者那边只要一投递消息,我们生产者这边就可以立即收到,我们可以去控制台查看其连接信息:

而且我们消费者的信道Channel也是还处于占用的状态:

 看到这里,是不是就对其工作流程基本熟悉了,我们再来看一下这张图,相信大家思路立马就清晰了:


小结

以上就是【一心同学】讲解的【RabbitMQ】的【工作流程】,学完是不是感觉很震惊,这一部分还是希望大家能够【熟练掌握】,因为在我们后面的讲解中,可以帮助我们更加清晰的理解【RabbitMQ】的其他知识点。 

如果这篇【文章】有帮助到你,希望可以给【一心同学】点个????,创作不易,相比官方的陈述,我更喜欢用【通俗易懂】的文笔去讲解每一个知识点,如果有对【后端技术】感兴趣的小可爱,也欢迎关注❤️❤️❤️ 【一心同学】❤️❤️❤️,我将会给你带来巨大的【收获与惊喜】????????!

最后

以上就是迷人网络最近收集整理的关于RabbitMQ到底是怎么运行的?一、核心组件二、RabbitMQ的运行流程三、实际演练小结的全部内容,更多相关RabbitMQ到底是怎么运行内容请搜索靠谱客的其他文章。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(73)

评论列表共有 0 条评论

立即
投稿
返回
顶部