概述
????????????????????????
哈喽!大家好,我是【一心同学】,一位上进心十足的【Java领域博主】!????????????
✨【一心同学】的写作风格:喜欢用【通俗易懂】的文笔去讲解每一个知识点,而不喜欢用【高大上】的官方陈述。
✨【一心同学】博客的领域是【面向后端技术】的学习,未来会持续更新更多的【后端技术】以及【学习心得】。
✨如果有对【后端技术】感兴趣的【小可爱】,欢迎关注【一心同学】????????????
❤️❤️❤️感谢各位大可爱小可爱!❤️❤️❤️
目录
一、核心组件
???? 概念讲解
???? 思考
二、RabbitMQ的运行流程
???? 生产者发送消息
???? 消费者接收消息
三、实际演练
3.1 准备工作
3.2 定义生产者
???? 代码
???? 分析
3.3 定义消费者
???? 代码
???? 分析
小结
一、核心组件
???? 概念讲解
我们来看一下RabbitMQ的核心组成部分,如下图:
以上这张图非常重要,因为其就是我们的RabbitMQ的核心组成,接下来,一心同学将来讲解其每一个组件。
????Producer:消息的生产者,也是一个向交换器发布消息的客户端应用程序。
????Consumer:消息的消费者,表示一个从一个消息队列中取得消息的客户端应用程序。
????Connection:生产者、消费者与broker之间的TCP链接,需要进行 TCP/IP的三次握手和四次挥手。
????Channel:网络信道,几乎所有的操作都在Channel中进行,Channel是进行消息读写的通道。
????Server:又称Broker ,接受客户端的连接,实现AMQP实体服务。
????Virtual Host:虚拟地址,用于进行逻辑隔离,是出于多租户和安全因素,把AMQP的基本组件划分到一个虚拟的分组中,可以把Virtual Host理解为mysql中的database,每个Virtual Host都可以进行单独的权限管理和逻辑操作。当多个不同的用户使用同一个RabbitMQ server提供的服务时,可以划分出多个vhost,每个用户在自己的vhost中创建exchange、queue等。
????Message:消息,服务与应用程序之间传送的数据,由Properties和body组成,Properties可以对消息进行修饰,比如消息的优先级,延迟等高级特性,Body则就是消息体的内容。
????Exchange:message到达broker的第一站,根据分发规则,匹配routing key,将消息分发到queue中去,可以绑定多个Queue也可以同时绑定其他的Exchange,常用的类型有direct(点对点)、topic(规则匹配)、fanout(广播)。
????Routing key:是一个路由规则,Exchange会根据消息的Routing Key匹配对应的Queue进行投递。
????Bindings:exchange和queue之间的虚拟链接,binding中包含routing key,binding信息保存到exchange的查询表中,用于message的分发,这样RabbitMQ就知道如何正确地将消息路由发到指定的Queue了。
????Queue:队列,也称为Message Queue(消息队列),保存消息并将它们转发给消费者。
???? 思考
我们完全可以直接使用Connection 就能完成Channel信道的工作,为什么还要引入Channel信道呢?
我们试想这样一个场景, 一个应用程序中有很多个线程需要从RabbitMQ 中消费消息,或者生产消息,那么必然需要建立很多个Connection ,也就是许多个TCP 连接。然而对于操作系统而言,建立和销毁TCP 连接是非常昂贵的开销,如果遇到使用高峰,性能瓶颈也随之显现。RabbitMQ 采用类似NIO' (Non-blocking 1/0) 的做法,选择TCP 连接复用,不仅可以减少性能开销,同时也便于管理。
每个线程把持一个信道,所以信道复用了Connection 的TCP 连接。同时RabbitMQ 可以确
保每个线程的私密性,就像拥有独立的连接一样。
当每个信道的流量不是很大时,复用单一的Connection 可以在产生性能瓶颈的情况下有效地节省TCP 连接资源。
但是当信道本身的流量很大时,这时候多个信道复用一个Connection 就会产生性能瓶颈,进而使整体的流量被限制了。此时就需要开辟多个Connection ,将这些信道均摊到这些Connection 中, 至于这些相关的调优策略需要根据业务自身的实际情况进行调节。
二、RabbitMQ的运行流程
???? 生产者发送消息
(1) 生产者连接到RabbitMQ Broker , 建立一个连接( Connection) ,开启一个信道(Channel)
(2) 生产者声明一个交换器(Exchange),并设置相关属性,比如交换机类型、是否持久化等
(3) 生产者声明一个队列(Queue)井设置相关属性,比如是否排他、是否持久化、是否自动删除等
( 4 ) 生产者通过路由键(Routing key)将交换器和队列绑定起来
( 5 ) 生产者发送消息至RabbitMQ Broker,其中包含路由键、交换器等信息
(6) 相应的交换器根据接收到的路由键查找相匹配的队列。
( 7 ) 如果找到,则将从生产者发送过来的消息存入相应的队列中。
(8) 如果没有找到,则根据生产者配置的属性选择丢弃还是回退给生产者
(9) 关闭信道。
(1 0) 关闭连接。
???? 消费者接收消息
(1)消费者连接到RabbitMQ Broker ,建立一个连接(Connection ) ,开启一个信道(Channel) 。
(2) 消费者向RabbitMQ Broker 请求消费相应队列中的消息,可能会设置相应的回调函数,
以及做一些准备工作(3)等待RabbitMQ Broker 回应并投递相应队列中的消息, 消费者接收消息。
(4) 消费者确认( ack) 接收到的消息。
( 5) RabbitMQ 从队列中删除相应己经被确认的消息。
( 6) 关闭信道。
( 7) 关闭连接。
三、实际演练
我们现在对RabbitMQ的基本概念以及流程熟悉了,那么我们接下来就要进行实操了。
3.1 准备工作
(1)启动RabbitMQ的服务
systemctl start rabbitmq-server
(2)打开防火墙
我们需要对5672这个端口进行打开防火墙设置(如果已经打开,忽略此步)
firewall-cmd --zone=public --add-port=5672/tcp --permanent
重启防火墙:
firewall-cmd --reload
(3)创建一个Maven项目并导入依赖
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.10.0</version>
</dependency>
如果是使用Spring Boot集成,那么则导入以下依赖:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
3.2 定义生产者
???? 代码
package com.yixin.simple;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class Producer {
public static void main(String[] args) {
// 1: 创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
// 2: 设置连接属性
connectionFactory.setHost("服务器地址");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
connectionFactory.setUsername("yixin");
connectionFactory.setPassword("123456");
Connection connection = null;
Channel channel = null;
try {
// 3: 从连接工厂中获取连接
connection = connectionFactory.newConnection("生产者");
// 4: 从连接中获取通道channel
channel = connection.createChannel();
// 5: 申明队列queue存储消息
/*
* 如果队列不存在,则会创建
* Rabbitmq不允许创建两个相同的队列名称,否则会报错。
*
* @params1: queue 队列的名称
* @params2: durable 队列是否持久化
* @params3: exclusive 是否排他,即是否私有的,如果为true,会对当前队列加锁,其他的通道不能访问,并且连接自动关闭
* @params4: autoDelete 是否自动删除,当最后一个消费者断开连接之后是否自动删除消息。
* @params5: arguments 可以设置队列附加参数,设置队列的有效期,消息的最大长度,队列的消息生命周期等等。
* */
channel.queueDeclare("queue1", false, false, false, null);
// 6: 准备发送消息的内容
String message = "你好,一心同学";
// 7: 发送消息给中间件rabbitmq-server
// @params1: 交换机exchange
// @params2: 队列名称/routing
// @params3: 属性配置
// @params4: 发送消息的内容
channel.basicPublish("", "queue1", null, message.getBytes());
System.out.println("消息发送成功!");
} catch (Exception ex) {
ex.printStackTrace();
System.out.println("发送消息出现异常...");
} finally {
// 7: 释放连接关闭通道
if (channel != null && channel.isOpen()) {
try {
channel.close();
} catch (Exception ex) {
ex.printStackTrace();
}
}
if (connection != null) {
try {
connection.close();
} catch (Exception ex) {
ex.printStackTrace();
}
}
}
}
}
我们点击运行,如果出现以下错误则说明我们没有给该用户赋予资源权限,所以其没有连接权限:
我们只需要赋予其资源权限即可,执行以下代码:
rabbitmqctl set_permissions -p / 用户名 ".*" ".*" ".*"
重新运行,就可以成功了!
???? 分析
我们前往Web控制台(http://服务器地址:15672/)进行查看该队列的信息:
我们点击queue1进行查看详情,以下是我们该队列的信息的基本操作:
我们点击Get messages进行查看:
其中ACK是应答模式
(1)Nack:消息被消费不告诉rabbitmq-server消息被消费了,会重新投递队列中
(2)Ack:rabbitmq-server接收到消费信息,会从服务器中移除消息。
3.3 定义消费者
???? 代码
package com.yixin.simple;
import com.rabbitmq.client.*;
import java.io.IOException;
public class Consumer {
public static void main(String[] args) {
// 1: 创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
// 2: 设置连接属性
connectionFactory.setHost("服务器地址");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
connectionFactory.setUsername("yixin");
connectionFactory.setPassword("123456");
Connection connection = null;
Channel channel = null;
try {
// 3: 从连接工厂中获取连接
connection = connectionFactory.newConnection("消费者");
// 4: 从连接中获取通道channel
channel = connection.createChannel();
// 5: 申明队列queue存储消息
/*
* 如果队列不存在,则会创建
* Rabbitmq不允许创建两个相同的队列名称,否则会报错。
*
* @params1: queue 队列的名称
* @params2: durable 队列是否持久化
* @params3: exclusive 是否排他,即是否私有的,如果为true,会对当前队列加锁,其他的通道不能访问,并且连接自动关闭
* @params4: autoDelete 是否自动删除,当最后一个消费者断开连接之后是否自动删除消息。
* @params5: arguments 可以设置队列附加参数,设置队列的有效期,消息的最大长度,队列的消息生命周期等等。
* */
channel.basicConsume("queue1", true, new DeliverCallback() {
public void handle(String s, Delivery delivery) throws IOException {
System.out.println("收到的消息是"+new String(delivery.getBody(),"UTF-8"));
}
},new CancelCallback(){
public void handle(String consumerTag) throws IOException{
System.out.println("接收失败");
}
});
// 7: 发送消息给中间件rabbitmq-server
// @params1: 交换机exchange
// @params2: 队列名称/routing
// @params3: 属性配置
// @params4: 发送消息的内容
System.out.println("开始接收消息");
//让我们的生产者一直处于运行状态,这样就不必使用while(true)了
System.in.read();
} catch (Exception ex) {
ex.printStackTrace();
System.out.println("发送消息出现异常...");
} finally {
// 7: 释放连接关闭通道
if (channel != null && channel.isOpen()) {
try {
channel.close();
} catch (Exception ex) {
ex.printStackTrace();
}
}
if (connection != null) {
try {
connection.close();
} catch (Exception ex) {
ex.printStackTrace();
}
}
}
}
}
点击运行后,控制台打印:
可以发现我们的消费者已经成功拿到我们的数据了!
???? 分析
我们现在重新回到Web控制界面进行查看:
即使收到消息后我们的消费者也还是一直处于连接的状态( System.in.read()的功劳),也就是说我们的生产者那边只要一投递消息,我们生产者这边就可以立即收到,我们可以去控制台查看其连接信息:
而且我们消费者的信道Channel也是还处于占用的状态:
看到这里,是不是就对其工作流程基本熟悉了,我们再来看一下这张图,相信大家思路立马就清晰了:
小结
以上就是【一心同学】讲解的【RabbitMQ】的【工作流程】,学完是不是感觉很震惊,这一部分还是希望大家能够【熟练掌握】,因为在我们后面的讲解中,可以帮助我们更加清晰的理解【RabbitMQ】的其他知识点。
如果这篇【文章】有帮助到你,希望可以给【一心同学】点个赞????,创作不易,相比官方的陈述,我更喜欢用【通俗易懂】的文笔去讲解每一个知识点,如果有对【后端技术】感兴趣的小可爱,也欢迎关注❤️❤️❤️ 【一心同学】❤️❤️❤️,我将会给你带来巨大的【收获与惊喜】????????!
最后
以上就是迷人网络为你收集整理的RabbitMQ到底是怎么运行的?一、核心组件二、RabbitMQ的运行流程三、实际演练小结的全部内容,希望文章能够帮你解决RabbitMQ到底是怎么运行的?一、核心组件二、RabbitMQ的运行流程三、实际演练小结所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
发表评论 取消回复