我是靠谱客的博主 迷人网络,最近开发中收集的这篇文章主要介绍RabbitMQ到底是怎么运行的?一、核心组件二、RabbitMQ的运行流程三、实际演练小结,觉得挺不错的,现在分享给大家,希望可以做个参考。

概述

????????????????????????

哈喽!大家好,我是【一心同学】,一位上进心十足的【Java领域博主】!????????????

✨【一心同学】的写作风格:喜欢用【通俗易懂】的文笔去讲解每一个知识点,而不喜欢用【高大上】的官方陈述。

✨【一心同学】博客的领域是【面向后端技术】的学习,未来会持续更新更多的【后端技术】以及【学习心得】。

✨如果有对【后端技术】感兴趣的【小可爱】,欢迎关注一心同学】????????????

❤️❤️❤️感谢各位大可爱小可爱!❤️❤️❤️ 


目录

一、核心组件

???? 概念讲解

???? 思考

二、RabbitMQ的运行流程

???? 生产者发送消息

???? 消费者接收消息

三、实际演练

3.1 准备工作

3.2 定义生产者

???? 代码

???? 分析

3.3 定义消费者

???? 代码

???? 分析

小结


一、核心组件

???? 概念讲解

我们来看一下RabbitMQ核心组成部分,如下图:

以上这张图非常重要,因为其就是我们的RabbitMQ的核心组成,接下来,一心同学将来讲解其每一个组件。

????Producer:消息的生产者,也是一个向交换器发布消息的客户端应用程序。

????Consumer:消息的消费者,表示一个从一个消息队列中取得消息的客户端应用程序。

????Connection:生产者、消费者与broker之间的TCP链接,需要进行 TCP/IP的三次握手和四次挥手。

????Channel:网络信道,几乎所有的操作都在Channel中进行,Channel是进行消息读写的通道。

????Server:又称Broker ,接受客户端的连接,实现AMQP实体服务。

????Virtual Host:虚拟地址,用于进行逻辑隔离,是出于多租户和安全因素,把AMQP的基本组件划分到一个虚拟的分组中,可以把Virtual Host理解为mysql中的database,每个Virtual Host都可以进行单独的权限管理和逻辑操作。当多个不同的用户使用同一个RabbitMQ server提供的服务时,可以划分出多个vhost,每个用户在自己的vhost中创建exchange、queue等。

????Message:消息,服务与应用程序之间传送的数据,由Properties和body组成,Properties可以对消息进行修饰,比如消息的优先级,延迟等高级特性,Body则就是消息体的内容。

????Exchange:message到达broker的第一站,根据分发规则,匹配routing key,将消息分发到queue中去,可以绑定多个Queue也可以同时绑定其他的Exchange,常用的类型有direct(点对点)、topic(规则匹配)、fanout(广播)。

????Routing key:是一个路由规则,Exchange会根据消息的Routing Key匹配对应的Queue进行投递。

????Bindings:exchange和queue之间的虚拟链接,binding中包含routing key,binding信息保存到exchange的查询表中,用于message的分发,这样RabbitMQ就知道如何正确地将消息路由发到指定的Queue了。

????Queue:队列,也称为Message Queue(消息队列),保存消息并将它们转发给消费者。

???? 思考

我们完全可以直接使用Connection 就能完成Channel信道的工作,为什么还要引入Channel信道呢?

        我们试想这样一个场景, 一个应用程序中有很多个线程需要从RabbitMQ消费消息,或者生产消息,那么必然需要建立很多个Connection ,也就是许多个TCP 连接。然而对于操作系统而言,建立和销毁TCP 连接是非常昂贵的开销,如果遇到使用高峰,性能瓶颈也随之显现。RabbitMQ 采用类似NIO' (Non-blocking 1/0) 的做法,选择TCP 连接复用,不仅可以减少性能开销,同时也便于管理。

        每个线程把持一个信道,所以信道复用了Connection 的TCP 连接。同时RabbitMQ 可以确
保每个线程的私密性,就像拥有独立的连接一样。

        当每个信道的流量不是很大时,复用单一的Connection 可以在产生性能瓶颈的情况下有效地节省TCP 连接资源。

        但是当信道本身的流量很大时,这时候多个信道复用一个Connection 就会产生性能瓶颈,进而使整体的流量被限制了。此时就需要开辟多个Connection ,将这些信道均摊到这些Connection 中, 至于这些相关的调优策略需要根据业务自身的实际情况进行调节。

二、RabbitMQ的运行流程

???? 生产者发送消息

(1) 生产者连接到RabbitMQ Broker , 建立一个连接( Connection) ,开启一个信道(Channel)

(2) 生产者声明一个交换器(Exchange),并设置相关属性,比如交换机类型、是否持久化等

(3) 生产者声明一个队列(Queue)井设置相关属性,比如是否排他、是否持久化、是否自动删除等

( 4 ) 生产者通过路由键(Routing key)将交换器队列绑定起来

( 5 ) 生产者发送消息至RabbitMQ Broker,其中包含路由键交换器等信息

(6) 相应的交换器根据接收到的路由键查找相匹配的队列。

( 7 ) 如果找到,则将从生产者发送过来的消息存入相应的队列中。

(8) 如果没有找到,则根据生产者配置的属性选择丢弃还是回退给生产者

(9) 关闭信道。

(1 0) 关闭连接。

???? 消费者接收消息

(1)消费者连接到RabbitMQ Broker ,建立一个连接(Connection ) ,开启一个信道(Channel) 。

(2) 消费者向RabbitMQ Broker 请求消费相应队列中的消息,可能会设置相应的回调函数,
以及做一些准备工作

(3)等待RabbitMQ Broker 回应并投递相应队列中的消息, 消费者接收消息。

(4) 消费者确认( ack) 接收到的消息。

( 5) RabbitMQ 从队列中删除相应己经被确认的消息。

( 6) 关闭信道。

( 7) 关闭连接。

三、实际演练

我们现在对RabbitMQ的基本概念以及流程熟悉了,那么我们接下来就要进行实操了。

3.1 准备工作

(1)启动RabbitMQ的服务

 systemctl start rabbitmq-server

(2)打开防火墙

我们需要对5672这个端口进行打开防火墙设置(如果已经打开,忽略此步)

firewall-cmd --zone=public --add-port=5672/tcp --permanent

重启防火墙:

firewall-cmd --reload

(3)创建一个Maven项目并导入依赖

<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>5.10.0</version>
</dependency>

如果是使用Spring Boot集成,那么则导入以下依赖:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

3.2 定义生产者

???? 代码

package com.yixin.simple;


import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;


public class Producer {
    public static void main(String[] args) {
        // 1: 创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        // 2: 设置连接属性
        connectionFactory.setHost("服务器地址");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        connectionFactory.setUsername("yixin");
        connectionFactory.setPassword("123456");
        Connection connection = null;
        Channel channel = null;
        try {
            // 3: 从连接工厂中获取连接
            connection = connectionFactory.newConnection("生产者");
            // 4: 从连接中获取通道channel
            channel = connection.createChannel();
            // 5: 申明队列queue存储消息
            /*
             *  如果队列不存在,则会创建
             *  Rabbitmq不允许创建两个相同的队列名称,否则会报错。
             *
             *  @params1: queue 队列的名称
             *  @params2: durable 队列是否持久化
             *  @params3: exclusive 是否排他,即是否私有的,如果为true,会对当前队列加锁,其他的通道不能访问,并且连接自动关闭
             *  @params4: autoDelete 是否自动删除,当最后一个消费者断开连接之后是否自动删除消息。
             *  @params5: arguments 可以设置队列附加参数,设置队列的有效期,消息的最大长度,队列的消息生命周期等等。
             * */
            channel.queueDeclare("queue1", false, false, false, null);
            // 6: 准备发送消息的内容
            String message = "你好,一心同学";
            // 7: 发送消息给中间件rabbitmq-server
            // @params1: 交换机exchange
            // @params2: 队列名称/routing
            // @params3: 属性配置
            // @params4: 发送消息的内容
            channel.basicPublish("", "queue1", null, message.getBytes());
            System.out.println("消息发送成功!");
        } catch (Exception ex) {
            ex.printStackTrace();
            System.out.println("发送消息出现异常...");
        } finally {
            // 7: 释放连接关闭通道
            if (channel != null && channel.isOpen()) {
                try {
                    channel.close();
                } catch (Exception ex) {
                    ex.printStackTrace();
                }
            }
            if (connection != null) {
                try {
                    connection.close();
                } catch (Exception ex) {
                    ex.printStackTrace();
                }
            }
        }
    }
}

我们点击运行,如果出现以下错误则说明我们没有给该用户赋予资源权限,所以其没有连接权限:

我们只需要赋予其资源权限即可,执行以下代码:

rabbitmqctl set_permissions -p / 用户名 ".*" ".*" ".*"

重新运行,就可以成功了!

???? 分析

我们前往Web控制台(http://服务器地址:15672/)进行查看该队列的信息: 

 

我们点击queue1进行查看详情,以下是我们该队列的信息的基本操作

我们点击Get messages进行查看:

其中ACK是应答模式
(1)Nack:消息被消费不告诉rabbitmq-server消息被消费了,会重新投递队列中
(2)Ack:rabbitmq-server接收到消费信息,会从服务器中移除消息。

3.3 定义消费者

???? 代码

package com.yixin.simple;

import com.rabbitmq.client.*;

import java.io.IOException;


public class Consumer {
    public static void main(String[] args) {
        // 1: 创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        // 2: 设置连接属性
        connectionFactory.setHost("服务器地址");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        connectionFactory.setUsername("yixin");
        connectionFactory.setPassword("123456");
        Connection connection = null;
        Channel channel = null;
        try {
            // 3: 从连接工厂中获取连接
            connection = connectionFactory.newConnection("消费者");
            // 4: 从连接中获取通道channel
            channel = connection.createChannel();
            // 5: 申明队列queue存储消息
            /*
             *  如果队列不存在,则会创建
             *  Rabbitmq不允许创建两个相同的队列名称,否则会报错。
             *
             *  @params1: queue 队列的名称
             *  @params2: durable 队列是否持久化
             *  @params3: exclusive 是否排他,即是否私有的,如果为true,会对当前队列加锁,其他的通道不能访问,并且连接自动关闭
             *  @params4: autoDelete 是否自动删除,当最后一个消费者断开连接之后是否自动删除消息。
             *  @params5: arguments 可以设置队列附加参数,设置队列的有效期,消息的最大长度,队列的消息生命周期等等。
             * */
            channel.basicConsume("queue1", true, new DeliverCallback() {
                public void handle(String s, Delivery delivery) throws IOException {
                    System.out.println("收到的消息是"+new String(delivery.getBody(),"UTF-8"));
                }
            },new CancelCallback(){
                public void handle(String consumerTag) throws IOException{
                    System.out.println("接收失败");
                }
            });

            // 7: 发送消息给中间件rabbitmq-server
            // @params1: 交换机exchange
            // @params2: 队列名称/routing
            // @params3: 属性配置
            // @params4: 发送消息的内容
            System.out.println("开始接收消息");
            //让我们的生产者一直处于运行状态,这样就不必使用while(true)了
            System.in.read();
        } catch (Exception ex) {
            ex.printStackTrace();
            System.out.println("发送消息出现异常...");
        } finally {
            // 7: 释放连接关闭通道
            if (channel != null && channel.isOpen()) {
                try {
                    channel.close();
                } catch (Exception ex) {
                    ex.printStackTrace();
                }
            }
            if (connection != null) {
                try {
                    connection.close();
                } catch (Exception ex) {
                    ex.printStackTrace();
                }
            }
        }
    }
}

点击运行后,控制台打印:

 可以发现我们的消费者已经成功拿到我们的数据了!

???? 分析

我们现在重新回到Web控制界面进行查看:

即使收到消息后我们的消费者也还是一直处于连接的状态( System.in.read()的功劳),也就是说我们的生产者那边只要一投递消息,我们生产者这边就可以立即收到,我们可以去控制台查看其连接信息:

而且我们消费者的信道Channel也是还处于占用的状态:

 看到这里,是不是就对其工作流程基本熟悉了,我们再来看一下这张图,相信大家思路立马就清晰了:


小结

以上就是【一心同学】讲解的【RabbitMQ】的【工作流程】,学完是不是感觉很震惊,这一部分还是希望大家能够【熟练掌握】,因为在我们后面的讲解中,可以帮助我们更加清晰的理解【RabbitMQ】的其他知识点。 

如果这篇【文章】有帮助到你,希望可以给【一心同学】点个????,创作不易,相比官方的陈述,我更喜欢用【通俗易懂】的文笔去讲解每一个知识点,如果有对【后端技术】感兴趣的小可爱,也欢迎关注❤️❤️❤️ 【一心同学】❤️❤️❤️,我将会给你带来巨大的【收获与惊喜】????????!

最后

以上就是迷人网络为你收集整理的RabbitMQ到底是怎么运行的?一、核心组件二、RabbitMQ的运行流程三、实际演练小结的全部内容,希望文章能够帮你解决RabbitMQ到底是怎么运行的?一、核心组件二、RabbitMQ的运行流程三、实际演练小结所遇到的程序开发问题。

如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(53)

评论列表共有 0 条评论

立即
投稿
返回
顶部