我是靠谱客的博主 大力猫咪,最近开发中收集的这篇文章主要介绍RabbitMQ和Spring AMQP学习二(消息模型详解、Spring AMQP启动流程分析-超详细)RabbitMQ和Spring AMQP学习二,觉得挺不错的,现在分享给大家,希望可以做个参考。

概述

文章目录

  • RabbitMQ和Spring AMQP学习二
    • RabbitMQ消息模型
      • Broker
      • Connection
      • Channel
        • 背景
        • 使用
        • Java Client
    • Spring AMQP使用详解
      • 消息队列使用规范
        • 哪些配置该配置在生产者端?
        • 哪些配置该配置在消费者端?
      • 使用PoolChannelConnectionFactory
        • maxTotal设置为多少合适?
      • Spring AMQP启动流程
        • 开启消费者线程
        • 初始化消费者
        • RabbitAdmin声明Exchange、Queue、Binding
        • 扫描@RabbitListener注解
        • 完整流程
        • 生产者端
        • PoolChannenlConnectionFactory和Apache Common Pool2驱逐规则问题
        • 结论
    • 建议
    • 示例代码

RabbitMQ和Spring AMQP学习二

RabbitMQ消息模型

RabbitMQ消息模型分为了三大部分,其中隐藏了很多细节
在这里插入图片描述

Broker

第一节说的Exchange、Binding、Queue都被划分为Broker部分.Broker中文译为中间人,是不是很形象。充分说明了RabbitMQ Server的角色,只作为消息的中转站。

Connection

无论是Producer还是Consumer,要想向server发送消息、接收server的消息都要同RabbitMQ Server建立连接。

注意 :对于Rabbit MQ Server而言,无论是Producer还是Consumer在它的角度来看,都属于Client-客户端。务必要搞清这点。

每一个客户端都会与Rabbit MQ Server建立一条、TCP长连接.可以在下图中看到官方Connection文档明确指出了客户端使用一条TCP连接,并且当建立连接成功时,可以发布、订阅消息。
在这里插入图片描述

所以,Producer、Consumer和Broker之间最终都是通过各自建立的单条的TCP长连接进行消息的发布、消费的。

Producer、Consumer会定时向Rabbit MQ Server发送心跳保持连接.心跳文档

当connection连接失败时,Rabbit MQ Java Client会自动重连。官方文档
在这里插入图片描述

Channel

背景

实际应用场景中,一个生产者端应用不会只向一个Exchange发送消息,可能会向几十个Exchange发送不同的消息。由上图可知,Exchange属于Broker一部分,而生产者、消费者和Broker之间通过TCP长连接同Broker进行通信。那么就会出现,一个生产者端会打开N个TCP连接向N个Exchange发送消息。消费者消费消息也是同理(消费者订阅Broker也会打开很多TCP连接)。如果一台机器上生产者应用(指发送消息的多个应用)很多,那么打开的TCP连接就很多。造成资源的浪费。

基于此原因,Rabbit MQ使用虚拟连接-Channel.即多个Channel使用一条TCP连接。网上有很多图,如下
在这里插入图片描述

但是这个图有个小问题就是,只画出了Producer端,没有画出Consumer端,所以会对初学者曹成一些困扰。结合上述背景理解即可.

同时,一般来说,一个消费者会有一个属于它自己的特定的channel,每一个channel都有一个自己的线程。
在这里插入图片描述

并且channel的连接时间也很长
在这里插入图片描述

使用

Rabbit MQ将对Exchange、Queue、Binding的操作都由Channel完成。Channel是存在于Connection中的,如果connection不存在了,那么基于此connection的所有channel都会消失
在这里插入图片描述

Java Client

使用Rabbit MQ 提供的原生的Java Client API比较容易理解上面的概念、以及Rabbit MQ入门使用

生产者端Java代码

ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
//...省略设置username、password、virtualHost等设置

// 建立TCP连接
Connection connection = factory.createConnection();
// 使用建立好的连接建立虚拟连接channel
Channel channel = connection.createChannel();
// 声明要发送消息到的Exchange,由Channel完成
channel.exchangeDeclare("topicExchangeName", BuiltinExchangeType.Topic);
// 声明消息
byte[] message = "this is message".getBytes();
channel.basicPublish("topicExchangeName", "routingKey", null, message);

消费者端Java代码

ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
//...省略设置username、password、virtualHost等设置

// 设置60秒心跳超时
factory.setRequestedHeartbeat(60);
// 建立TCP连接
Connection connection = factory.createConnection();
// 使用建立好的TCP连接建立虚拟连接channel
Channel channel = connection.createChannel();
// 声明要订阅的队列
channel.queueDeclare("consumerQueueName", false, false, false, null);
// 声明订阅的队列要绑定的Exchange
channel.queueBind("consumerQueueName", "topicExchangeName", "bindingKey");
// 消费订阅队列里的消息
// 是否自动确认消息,这里是为false,则需要消费者端手动确认消息
boolean autoAck = false;
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            // delivery代表的是一个Message对象
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println(" [x] Received '" + delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");
            try{
	            // 做一些业务操作
	            doWork(message);
	            // 没有异常的话,确认消息消费成功,第一个参数为消息中带的tag
	            channel.baiscAck(delivery.getEnvelope().getDeliveryTag(), true);
            }catch(Exception exception) {
            // 如果有异常,则不确认该消息,并将该消息发送会Queue.最后一个参数requeue代表是否返回到Queue中
                channel.backNack(delivery.getEnvelope().getDeliveryTag(), true, true)
            }
         };  
channel.basicConsume("consumerQueueName", autoAck, deliverCallback, consumerTag -> { });

更多Java Client的使用见官方AMQP快速使用文档、Rabbit MQ官方Java Client API这两个文档即可快速入门channel的使用。更多使用例子可以在官方GitHub示例代码找到

Spring AMQP使用详解

消息队列使用规范

实际应用场景中,生产者、消费者往往存在于不同主机上。而且二者都要进行Rabbit MQ的连接配置,那么就不能像第一篇文章示例代码那样都写在一起。

哪些配置该配置在生产者端?

个人认为,生产者同Rabbbit MQ建立连接后,只负责发送消息。Rabbit MQ 的Exchange负责接收生产者发送的消息。那么生产者端能看到的其实只有Exchange。所以,生产者端只需要添加建立连接的配置,以及Exchange的配置

MQ的配置:

建立连接connection时最后起一个名字,方便查看和管理

package com.example.rabbitmq.rabbitmqdemo.config.mq;

import com.rabbitmq.client.ConnectionFactory;
import org.springframework.amqp.core.AmqpAdmin;
import org.springframework.amqp.rabbit.connection.PooledChannelConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * 具体类型的ConnectionFactory见<a href="https://docs.spring.io/spring-amqp/docs/current/reference/html/#connections">Spring AMQP文档</a>
 * @author jacksparrow414
 * @date 2020/12/16
 */
@Configuration
public class RabbitMqConfig {

    /**
     * 官方推荐使用{@link PooledChannelConnectionFactory}来管理Channel连接.
     */
    @Bean
    public org.springframework.amqp.rabbit.connection.ConnectionFactory connectionFactory() {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        PooledChannelConnectionFactory result = new PooledChannelConnectionFactory(connectionFactory);
        result.setHost("localhost");
        result.setPort(5672);
        result.setUsername("dhb");
        result.setPassword("123456");
        result.setVirtualHost("dhb");
        // 设置连接的名字
        result.setConnectionNameStrategy(factory -> "producer-connection");
        // TODO 对象池的设置
        // 使用Apache Common Pool2,一共有两个对象池
        result.setPoolConfigurer((pool, tx) -> {
            if (tx) {
                // 用于管理事务的对象池
                pool.setMaxTotal(2);
            }else {
                // 非事务的对象池,默认最大为8
                pool.setMaxTotal(8);
            }
        });
        return result;
    }
    
    @Bean
    public AmqpAdmin rabbitAdmin() {
        return new RabbitAdmin(connectionFactory());
    }
    
    /**
     * 默认MessageConverter是{@link org.springframework.amqp.support.converter.SimpleMessageConverter}
     * @return
     */
    @Bean(name = "rabbitTemplate")
    public RabbitTemplate rabbitTemplate() {
        RabbitTemplate reuslt = new RabbitTemplate(connectionFactory());
        reuslt.setMessageConverter(new Jackson2JsonMessageConverter());
        return reuslt;
    }
}

Exchange的声明配置:

@Configuration
public class TopicExchangeConfig {

    @Bean
    public TopicExchange topicExchange() {
        return new TopicExchange("topic");
    }
}

生产者端无需关心Exchange路由消息到哪个Queue上,所以Queue、Binding的配置不需要在生产者端配置,在客户端配置

哪些配置该配置在消费者端?

如上所说,消费者只关心自己订阅的消息队列即可。不关心具体消息从哪个Exchange转发的。所以Queue的声明以及Binding的声明放在消费者端配置

MQ配置

package com.example.rabbitmq.consumerdemo.config.mq;

import cn.hutool.extra.spring.SpringUtil;
import java.util.concurrent.ThreadPoolExecutor;
import org.springframework.amqp.core.AmqpAdmin;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.connection.PooledChannelConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.boot.autoconfigure.amqp.RabbitAutoConfiguration;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

/**
 * @author jacksparrow414
 * @date 2020/12/17
 */
@Configuration
@Import(SpringUtil.class)
public class RabbitMqConfig {
    
    @Bean
    public ConnectionFactory connectionFactory() {
        com.rabbitmq.client.ConnectionFactory connectionFactory = new com.rabbitmq.client.ConnectionFactory();
        connectionFactory.setHost("localhost");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("dhb");
        connectionFactory.setPassword("123456");
        connectionFactory.setVirtualHost("dhb");
        // 设置心跳超时时间,默认60
        connectionFactory.setRequestedHeartbeat(60);
        PooledChannelConnectionFactory result = new PooledChannelConnectionFactory(connectionFactory);
        result.setConnectionNameStrategy(factory -> "consumer-connection");
        result.setPoolConfigurer((pool, tx) -> {
            if (tx) {

            }else {
                // 设置channel数量,此数量应该等于消费者端的Queue的数量. 为什么要等于?
                // 客户端要监听所有的Queue,监听Queue的时候,客户端会同Channel建立连接,
                // 如果Channel连接小于Queue的数量,则有的Queue不会建立Channel连接,
                // 那么当Exchange向对应的Queue发送消息时,到达该Queue的消息不会被消费者消费(都没有建立连接,自然消息推不下去)
                // 下面的RabbitAdmin初始化的时候也是需要初始化RabbitTemplate的,接着会调用Channel完成,会用到channel,但是此channel用完之后会被关闭,
                // 所以在后面的Queue订阅channel时,可以拿到连接
                pool.setMaxTotal(12);
            }
        });
        return result;
    }
    @Bean
    public AmqpAdmin rabbitAdmin() {
        return new RabbitAdmin(connectionFactory());
    }

    @Bean
    public RabbitTemplate rabbitTemplate() {
        RabbitTemplate result = new RabbitTemplate(connectionFactory());
        result.setMessageConverter(new Jackson2JsonMessageConverter());
        return result;
    }
    
    @Bean
    public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() {
        SimpleRabbitListenerContainerFactory result = new SimpleRabbitListenerContainerFactory();
        result.setConnectionFactory(connectionFactory());
        result.setPrefetchCount(2);
        result.setMessageConverter(new Jackson2JsonMessageConverter());
        return result;
    }
}

Queue、Binding的声明、配置

package com.example.rabbitmq.consumerdemo.config.queue;

import cn.hutool.extra.spring.SpringUtil;
import com.example.rabbitmq.rabbitmqdemo.config.exchange.TopicExchangeConfig;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;

/**
 * @author jacksparrow414
 * @date 2020/12/17
 */
@Configuration
@Import(TopicExchangeConfig.class)
public class TopicQueueConfig {
    
    @Bean
    public Queue firstTopicQueue() {
        return new Queue("firstTopic");
    }
    
    @Bean
    public Queue secondTopicQueue() {
        return new Queue("secondTopic");
    }
    
    @Bean
    public Queue thirdTopicQueue() {
        return new Queue("thirdTopic");
    }
    
    @Bean
    public Binding firstTopicBinding() {
        return BindingBuilder.bind(firstTopicQueue()).to(getTopicExchange()).with("*.first.*");
    }
    
    @Bean
    public Binding secondTopicBinding() {
        return BindingBuilder.bind(secondTopicQueue()).to(getTopicExchange()).with("*.second.*");
    }
    
    @Bean
    public Binding thirdTopicBinding() {
        return BindingBuilder.bind(thirdTopicQueue()).to(getTopicExchange()).with("*.*.Topic");
    }
    
    @Bean
    public Binding fourthTopicBinding() {
        return BindingBuilder.bind(thirdTopicQueue()).to(getTopicExchange()).with("message.#");
    }
    
    private TopicExchange getTopicExchange() {
       return SpringUtil.getBean(TopicExchange.class);
    }
}

存在的问题:

这样的配置,使得生产者、消费者的配置一目了然。但是有个问题是,消费者声明Binding时候,要把Queue绑定在Exchange对象上。但是我们的Exchange是声明在服务端的工程里的。怎么办?

  1. 在消费端重新声明Exchange,并绑定。可行,但是不规范,这样就违背了上述的规范,并且由消费者端声明Exchange不可控

  2. 将生产者端的Exchange配置单独打包,在消费者端引入即可。这样既引入了生产者端的配置,又符合我们的规范
    在这里插入图片描述

    如图所示,在生产者端只将Exchange的配置打包出来。在消费者端,上述代码中通过@Import()注解注入要配置的Exchange对应的bean即可.

使用PoolChannelConnectionFactory

Spring AMQP官方建议使用PoolChannelConnectionFactory来管理客户端(这里的客户端代表生产者、消费者.上面说的消息模型)的channel复用。其依赖于Apache Common Pool2最对channel对象池。

maxTotal设置为多少合适?

对象池的maxTotal设置为多少合适?如果消费者端订阅的队列消息比较少,可以设置为小于订阅队列数的数目,这样当channel对象闲置之后,会被及时回收并利用。这就是我最初的想法。

对象池可以设置最大闲置数量、以及闲置多久之后被回收。设置了之后,启动消费者端,查看Rabbit MQ控制台,发现经过闲置时间之后,channel并没有被回收
在这里插入图片描述

此时消费者端没有消息可以消费,并经过了设置的闲置时间,没有被回收。这是为什么呢?

第一个坑:

没有被回收,说明这个channel对象是在对象池中的状态并不是闲置状态,很明显,Apache Common Pool2对象池的闲置并不等同于控制台的闲置。控制台的闲置意味着,此时channel没有接收到任何消息。而对象池中的闲置则是认为该对象不活跃。这是自己给自己挖的第一个坑

此时又想到了另外一个问题,Rabbit MQ消息模型中指出,connection是一条实际的TCP长连接,channel是基于connection的虚拟连接。那么channel是不是长连接?我并不确定。于是在Spring AMQP源码中寻找答案。顺便了解下Spring AMQP启动都干了什么

Spring AMQP启动流程

将Spring AMQP日志级别调为debug级别,并以debug模式启动消费者端代码。首先在控制台看到了如下日志
在这里插入图片描述

在日志中可以看出两点,第一,每一个被消费者订阅的queue都会有单独的channel,并且有多个不同的线程pool-9、pool-10这些有打印ConsumerOk这些日志。于是进入日志后面的类中进行进步debug

开启消费者线程

Starting consumer日志位于BlockingQueueConsumer类的start方法,在SimpleMessageListenerContainer的initialize方法中,this.consumer就是BlockingQueueConsumer的实例
在这里插入图片描述

调用位置在同一个类中的内部类AsyncMessageProcessingConsumer的run方法中
在这里插入图片描述

此类实现了Runnable接口,这个线程run在哪执行呢
在这里插入图片描述

在本类中的doStart()方法实例化并开启了一个新的线程池用一个新的线程(该线程执行的上面AsyncMessageProcessingConsumer的run方法)执行此方法。可以看到,消费者线程开启就是在这里开启的。消费者线程开启之后又干了什么呢?稍后再详细debug.

初始化消费者

从上面的步骤可知,消费者启动是由BlockingQueueConsumer类的实例调用其start()开启的.那么该类的实例是什么时候初始化的呢?注意上图中标红的上几行,有一个initializeConsumers()方法,顾名思义就是初始化consumer。点进去看看都是干嘛的
在这里插入图片描述

可以看到,是在initializeConsumers里面实例化该类的实例

这两步已经看到了,那么是哪里调用的doStart()方法呢?

该方法重写继承的抽象类的AbstractMessageListenerContainer.doStart()方法,该方法由该抽象类内部的start调用
在这里插入图片描述

doStart()上面的两个方法,由名字可知,配置RabbitAdmin,由于我们在自定义的配置文件了RabbitAdmin.所以第一个方法就是简单的将已经配置的bean赋值给当前的amqpAdmin属性

更进一步:

点进去RabbitAdmin类,发现该类实现了InitializingBean接口。当我们配置文件中执行new RabbitAdmin()之后,在其afterPropertiesSet实现方法里
在这里插入图片描述

可以看到,调用当前factory-PoolChannelConnectionFactory的addConnectionListener方法
在这里插入图片描述

参数ConnectionListener是个函数式接口,接收一个connection

super.addConnectionListener
在这里插入图片描述

将当前函数(上面两张图的this.connectionFactory.addListener()方法)放进去。在PoolChannelConnectionFactory中ConnectionListener的实现类为CompositeConnectionListener。可以在PoolChannelConnectionFactory类中找到此属性的声明。

到此为止,当实例化RabbitAdmin该bean时,最重要的就是为当前connectionFactory添加connectionListener

RabbitAdmin声明Exchange、Queue、Binding

chenckMismatchedQueues();方法
在这里插入图片描述

第一个至关重要的方法就是initialize().这个只会满足条件时才会进入。debug时并没有进入if块,执行的是else的connection部分,createConnection()完成了什么呢?
在这里插入图片描述

可以看到,当第一此进入该方法时,connection为空,会new Connection,最后会执行onCreate方法,
在这里插入图片描述

这个方法执行对应connectionListener的OnCreate方法,就是上一步中分析的RabbitAdmin的afterPropertiesSet添加的listener的方法,因此,由上面几张关于Rabbit Admin的图可知,最终会执行addListener中的initalize方法,就是真正Rabbit Admin初始化的工作
在这里插入图片描述
在这里插入图片描述

以declareExchange方法为例,可以看到,最终还是调用了Rabbit MQ Java Client API,channel.exchangeDeclare()方法。而控制台的日志也佐证了这一点
在这里插入图片描述

再次进入该方法时,由于connection已经建立,所以直接返回。

那么connection.close方法不是又关闭了吗
在这里插入图片描述
可以看到此关闭方法什么都没有,也印证了只有一条连接,不会重复创建TCP连接

对于RabbitAdmin的作用,官方文档也给出了解释。很简短
在这里插入图片描述
可以看到,和我们分析的是一样的,对于Queue,Exchange,Binding确实是懒加载的,并且是通过ConnectionListener触发的。

扫描@RabbitListener注解

现在返回最初的位置AbstractMessageListenerContainer的start方法,该方法具体实现了rabbit MQ的connection的建立,Exchange、Queue、Binding的声明,并初始化消费者,同时又开启了一个消费者线程去进执行具体方法。那么哪里调用这个start()方法呢?哪里实例化MessageListenerContainer的实现类呢?debug中发现了
在这里插入图片描述
在RabbitListenerEndpointRegistrar中registerListenerContainer方法
在这里插入图片描述
在这个类中实例化MessageListenerContainer实例.debug发现但是并没有调用startIfNecessary方法。,这里只执行到this.listenerContainers.put之后就没执行里面的方法。

那是什么时候执行的该方法呢?

观察RabbitListenerEndpointRegistry发现其实现了SmartLifecycle接口。这个接口是干嘛用的呢。当Spring加载和初始化所有bean之后,并完成初始化后,会会接着回调实现该接口的类中对应的方法(start()方法).
在这里插入图片描述

找到RabbitListenerEndpointRegistry的start方法
在这里插入图片描述

可以看到这里拿到刚才put进去的containers

再往上debug,
在这里插入图片描述
最后来到RabbitListenerAnnotationBeanPostProcessor该类,此类顾名思义,处理@RabbitListener注解的类
在这里插入图片描述

找到所有带有@RabbitListener的方法、类
在这里插入图片描述

将会解析所有@RabbitListener,并将其注册为endpointDescriptor

对象最终在下一步循环(上面的图)为每一个endpoint注册Listenercontainer

完整流程

  1. spring容器启动,此时会扫描所有@RabbitListener注解,并未每个@RabbitListener实例化一个单独的MessageListenerContainer实例。如果我们配置RabbitAdmin,则会执行其afterPropertiesSet()添加connectionListener,以便下面声明Exchange、Binding、Queue等。如果没有配置RabbitAdmin怎么办?不用担心,有自动配置
    在这里插入图片描述

    当我们引用了带有RabbitTemplate的包时,该配置会自动生效
    在这里插入图片描述

    当spring容器中缺少AmqpAdmin的实例时,会自动进行配置。

    关于@Conditional更多注解可看
    在这里插入图片描述

  2. 当容器所有bean启动完毕,刚刚实例好的每一个MessageListenerContainer实例,都会执行其start方法,start方法包含两个重要步骤

    • 建立connection,如果不存在,则建立。存在则直接使用。close方法是空方法,不会关闭连接
    • 声明Exchange、Binding、Queue

    最后执行其doStart()方法

  3. 接下来在每一个MessageListernerContainer实例SimpleMessageListenerContainer中的doStart方法中都会新创建一个属于该MessageListenerContainer的消费者,同时新开另外一个线程AsyncMessageProcessingConsumer去执行当前消费者(BlockingQueueConsumer)的start方法-属于initialize

    执行消费者对象start的线程会首先拿到一个channel。然后进行等待消费,并未关闭channel

  4. 当initialize之后,此线程会一直在一个while循环中,mainLoop(),也未见channel关闭

可以得出结论,对于订阅了Queue的消费者端应用,在应用启动之后,会一直监听对应Queue,当有消息达到时,即消费消息。

生产者端

那么在生产者端呢?channel应该为多少呢?

启动生产者端应用,当不发布任何消息时,idea控制台没有任何日志。同时rabbit MQ控制台也没有新的connection、channel。

当调用RabbitTemplate.convertAndSend方法发布消息时,最终会执行
在这里插入图片描述

首先第一步,还是拿到channel,然后再执行doSend方法。先看获取channel方法
在这里插入图片描述

在上图可以看到,createConnection点进去和上面消费者端一样的逻辑,建立connection,建立连接之后会调用ConnectionListener进行Exchange的声明。

再通过connection创建channel.创建Channel的代理类
在这里插入图片描述

创建好channel之后,执行doSend方法(invokeAction中调用)。最后在执行完毕会执行关闭channel、cononection操作。对于connection,和消费者端一样,由于使用了PoolChannelConnectionFatory,它的connection的close方法是个空方法,所以connection一旦建立则不会关闭。而channel的关闭则是由代理类执行上图中的handleClose方法,在其内部,Spring AMQP的实现并没有关闭Channel,仅仅是更新了状态。可以点进具体方法debug看一下。

PoolChannenlConnectionFactory和Apache Common Pool2驱逐规则问题

这里比较坑的一点就是,如果你设置了common pool的的驱逐规则,那么对象池会去修改一些东西,在达到驱逐规则的时候,会调用destoryObject方法。但是本人debug发现,最终的channel关闭操作并没有关闭。不知道调用到哪去了。现象就是,如果你开启了驱逐规则,那么观察Rabbit MQ控制台,会发现channel数量一直在增加,但是,对象池里的对象又仅有驱逐之后的channel存在。只要驱逐规则不启动,那么对象池里一直都会存在你设置的数量的channle。并且可以复用。具体逻辑,可自己debug看看,我是看累了.

而且Rabbit MQ 也确实说了channel声明周期也很长,首先要考虑重用,而不是频繁开关。有关文档图在上面的Channel那节下面。

结论

  1. 对于消费者端,其配置的PoolChannelConnectionFactory的对象池,建议channel对象数量设置为当前应用的订阅的Queue的数量
  2. 对于生产者端,其配置的PoolChannelConnectionFactory的对象池,建议channel对象数量默认(8个)即可。并且不要开启对象池的驱逐策略。

建议

建议学习时结合Rabbit MQ官方文档、Spring AMQP官方文档、以及各自官方的GitHub仓库的example学习。官方的话比网上任何的百度信息都靠谱.

示例代码

  1. 生产者端代码
  2. 消费者端代码

最后

以上就是大力猫咪为你收集整理的RabbitMQ和Spring AMQP学习二(消息模型详解、Spring AMQP启动流程分析-超详细)RabbitMQ和Spring AMQP学习二的全部内容,希望文章能够帮你解决RabbitMQ和Spring AMQP学习二(消息模型详解、Spring AMQP启动流程分析-超详细)RabbitMQ和Spring AMQP学习二所遇到的程序开发问题。

如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(228)

评论列表共有 0 条评论

立即
投稿
返回
顶部