我是靠谱客的博主 想人陪大雁,最近开发中收集的这篇文章主要介绍RabbitMQ初步到精通-第九章-RabbitMQ整合SpringBoot第九章-RabbitMQ整合SpringBoot,觉得挺不错的,现在分享给大家,希望可以做个参考。

概述

目录

第九章-RabbitMQ整合SpringBoot

1. 前言

2. 整合方式

2.1 pom文件

2.2 配置文件

2.3 生产者

2.4 消费者

3. 整合案例

3.1 helloword

3.2 work

3.3 pub/sub

3.4 routing

4. 总结


第九章-RabbitMQ整合SpringBoot

1. 前言

前面学了很多都是基于java 的AMQP client 的内容,但到实际开发中运用呢,相信没有同学直接使用amqp client 吧。

所以,最终还是需要回归到java 的王者Spring,无论spring也好,还是SpringBoot 都是基于amqp client再做了一层封装而已,为的也是让大家用的爽,专注于业务。

由于大家基本SpringBoot用的最多,我们也就只涉及关于SpringBoot 的整合。用的时候肯定是用Springboot封装的方式,但如果大家想要系统的学的话,建议还是要基于Amqp Client去学,循序渐进,才能明白,SpringBoot 封装的那些注解,是啥东东。

2. 整合方式

2.1 pom文件

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

2.2 配置文件

spring.rabbitmq.host = 127.0.0.1
spring.rabbitmq.virtual-host = my-test-virtual
spring.rabbitmq.port = 5672
spring.rabbitmq.username = test
spring.rabbitmq.password = test

#spring.rabbitmq.listener.simple.acknowledge-mode = auto
#spring.rabbitmq.listener.simple.prefetch = 1

2.3 生产者

根据发送到不同的Exchange,代码不同

// 发送到默认Exchange
rabbitTemplate.convertAndSend("SolarWaterHeater", msg);

//发送到指定的Exchange
rabbitTemplate.convertAndSend(exchangeName, routeKey, msg);

2.4 消费者

根据Exchange的不同,队列的是否存在,消费者使用的注解方式不同

1、队列已经存在且相关的Exchange和绑定关系都OK的话,直接使用:

    @RabbitHandler
    @RabbitListener(queues = "SolarWaterHeater")
    public void receiveBAmtInMessage(Message message, Channel channel) throws InterruptedException {
        Thread.sleep(1000);
        log.info("小明洗澡用水: " + new String(message.getBody()));
    }

2. 如果使用默认的Exchange,且队列不存在的话,需要在注解上声明Queue的情况

    @RabbitHandler
    @RabbitListener(queuesToDeclare = @Queue("SolarWaterHeater"))
    public void receiveBAmtInMessage(Message message, Channel channel) throws InterruptedException {
        Thread.sleep(1000);
        log.info("小明洗澡用水: " + new String(message.getBody()));
    }

3. 使用配置类,声明Queue

@Configuration
public class QueueConfig {

    /**
     * 定义一个队列
     *
     * @return
     */
    @Bean("SolarWaterHeater")
    public Queue solarWaterHeater() {
        /*
            第二种方式:
                通过new Queue对象来创建队列
                参数1: 队列名称
                参数2: 是否持久化(默认:true)
                参数3: 是否独占(默认:false)
                参数4: 是否自动删除(默认:false)
                参数5: 队列的其他参数
         */
        return new Queue("SolarWaterHeater", true, false, false, null);
    }

  /*  @Bean("SolarWaterHeater")
    public Queue solarWaterHeater() {
         *//*
            第一种方式:
                durable():代表需要持久化
                exclusive(): 代表该队列独占(只允许有一个consumer监听)
                autoDelete(): 代表需要自动删除(没有consumer自动删除)
                withArgument(): 队列的其他参数

         *//*
        return QueueBuilder.durable("boot_work_queue").exclusive().autoDelete().withArgument("key", "val").build();
    }*/

}

4. 需要声明Exchange并绑定Queue的情况

    @RabbitHandler
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue("SolarWaterHeater-RedWine"),
            key = "REDWINE",
            exchange = @Exchange(value = "routing-exchange", type = ExchangeTypes.DIRECT, durable = "false")))
    public void receiveXMInMessage(Message message, Channel channel) throws InterruptedException {
        Thread.sleep(1000);
        log.info("小明洗澡用: " + new String(message.getBody()));
    }

5. 使用配置的方式:

@Configuration
public class RoutingConfig {

    /**
     * 准备两个队列SolarWaterHeater-RedWine、SolarWaterHeater-Milk
     *
     * @return
     */
    @Bean("SolarWaterHeater-RedWine")
    public Queue bootDirectQueue1() {
        return QueueBuilder.durable("SolarWaterHeater-RedWine").build();
    }

    @Bean("SolarWaterHeater-Milk")
    public Queue bootDirectQueue2() {
        return QueueBuilder.durable("SolarWaterHeater-Milk").build();
    }

    // direct类型交换机
    @Bean("routing-exchange")
    public Exchange bootDirectExchange() {
        /*
            第一种方式: 通过ExchangeBuilder构建交换机
                durable: 是否持久化
                autoDelete: 是否自动删除
                withArgument: 交换机其他参数
         */
//        return ExchangeBuilder.directExchange("boot_direct_exchange").durable(true).autoDelete().withArgument("key","val").build();

        /*
            第二种方式:
                参数1: 是否持久化(默认false)
                参数2: 是否自动删除(默认false)
                参数3: 其他参数
         */
        return new DirectExchange("routing-exchange", true, false, null);
    }

    /**
     * 交换机与队列进行绑定
     */
    @Bean
    public Binding bindDirect1() {
        /*
            第一种方式:
                bind(Queue): 需要绑定的queue
                to(Exchange): 需要绑定到哪个交换机
                with(String): routing key
                noargs(): 进行构建
         */
//        return BindingBuilder.bind(bootDirectQueue1).to(bootDirectExchange).with("article").noargs();

        /*
            第一种方式:
                参数1: 绑定的队列
                参数2: 绑定的类型 Binding.DestinationType.QUEUE: 绑定的类型为queue(交换机不仅可以绑定queue还可以绑定exchange)
                参数3: 哪个交换机需要绑定
                参数4: routing key
                参数5: 其他参数
         */
        return new Binding("SolarWaterHeater-RedWine", Binding.DestinationType.QUEUE, "routing-exchange", "REDWINE", null);
    }

    /**
     * 交换机与队列进行绑定
     *
     * @return
     */
    @Bean
    public Binding bindDirect2() {
        Queue bootDirectQueue2 = bootDirectQueue2();
        Exchange bootDirectExchange = bootDirectExchange();
        return BindingBuilder.bind(bootDirectQueue2).to(bootDirectExchange).with("MILK").noargs();
    }

}

3. 整合案例

3.1 helloword

生产者:


@RestController
@RequestMapping("/mq")
@Slf4j
public class WaterProducerController {

    @Resource
    private RabbitTemplate rabbitTemplate;

    @PostMapping("/send")
    public void send() {
        try {
            for (int i = 1; i <= 50; i++) {
                String msg = i + "升";
                rabbitTemplate.convertAndSend("SolarWaterHeater", msg);
                Thread.sleep(1000);
                log.info("水龙头放水成功!" + i + "升");
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

}

消费者:

@Slf4j
@Component
public class XMShowerConsumer {

    @RabbitHandler
    @RabbitListener(queuesToDeclare = @Queue("SolarWaterHeater"))
    public void receiveBAmtInMessage(Message message, Channel channel) throws InterruptedException {
        Thread.sleep(1000);
        log.info("小明洗澡用水: " + new String(message.getBody()));
    }

}

3.2 work

平均模式:

生产者:


@RestController
@RequestMapping("/mqAverage")
@Slf4j
public class WaterProducerAverageController {

    @Resource
    private RabbitTemplate rabbitTemplate;

    @PostMapping("/send")
    public void send() {
        try {
            for (int i = 1; i <= 50; i++) {
                String msg = i + "升";
                rabbitTemplate.convertAndSend("SolarWaterHeater-Average", msg);
                Thread.sleep(100);
                log.info("水龙头放水成功!" + i + "升");
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

}

消费者:


/**
 * @author rabbit
 * @version 1.0.0
 * @Description 平均模式
 * 需要配置 :
 *  * spring.rabbitmq.listener.simple.acknowledge-mode = auto
 * @createTime 2022/08/27 20:22:00
 */
@Slf4j
@Component
public class AverageShowerConsumer {

    @RabbitHandler
    @RabbitListener(queuesToDeclare = @Queue("SolarWaterHeater-Average"))
    public void receiveXMInMessage(Message message, Channel channel) throws InterruptedException {
        Thread.sleep(300);
        log.info("小明洗澡用水: " + new String(message.getBody()));
    }

    @RabbitHandler
    @RabbitListener(queuesToDeclare = @Queue("SolarWaterHeater-Average"))
    public void receiveXLInMessage(Message message, Channel channel) throws InterruptedException {
        Thread.sleep(1000);
        log.info("小丽洗澡用水: " + new String(message.getBody()));
    }

}

公平模式:

生产者:


@RestController
@RequestMapping("/mqFair")
@Slf4j
public class WaterProducerFairController {

    @Resource
    private RabbitTemplate rabbitTemplate;

    @PostMapping("/send")
    public void send() {
        try {
            for (int i = 1; i <= 50; i++) {
                String msg = i + "升";
                rabbitTemplate.convertAndSend("SolarWaterHeater-Fair", msg);
                Thread.sleep(100);
                log.info("水龙头放水成功!" + i + "升");
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

}

消费者:


/**
 * @author rabbit
 * @version 1.0.0
 * @Description 公平模式
 * 需要配置 :
 * spring.rabbitmq.listener.simple.acknowledge-mode = manual
 * spring.rabbitmq.listener.simple.prefetch = 1
 * 并 消费后进行手动确认
 * @createTime 2022/08/27 20:22:00
 */
@Slf4j
@Component
public class FairShowerConsumer {

    @RabbitHandler
    @RabbitListener(queuesToDeclare = @Queue("SolarWaterHeater-Fair"))
    public void receiveXMInMessage(Message message, Channel channel) throws InterruptedException {
        Thread.sleep(1000);
        log.info("小明洗澡用水: " + new String(message.getBody()));
        try {
            final long deliveryTag = message.getMessageProperties().getDeliveryTag();
            channel.basicAck(deliveryTag, false);
        } catch (IOException e) {
            log.error(" basicAck mq处理失败", e);
        }
    }

    @RabbitHandler
    @RabbitListener(queuesToDeclare = @Queue("SolarWaterHeater-Fair"))
    public void receiveXLInMessage(Message message, Channel channel) throws InterruptedException {
        Thread.sleep(500);
        log.info("小丽洗澡用水: " + new String(message.getBody()));
        try {
            final long deliveryTag = message.getMessageProperties().getDeliveryTag();
            channel.basicAck(deliveryTag, false);
        } catch (IOException e) {
            log.error(" basicAck mq处理失败", e);
        }
    }

}

3.3 pub/sub

生产者:


@RestController
@RequestMapping("/mqPublish")
@Slf4j
public class WaterPublishProducerController {

    @Resource
    private RabbitTemplate rabbitTemplate;

    @PostMapping("/send")
    public void send() {
        try {
            for (int i = 1; i <= 50; i++) {
                String msg = i + "升";
                //交换机名称
                String exchangeName = "publish-exchange";
                //路由key 由于我们实现的是fanout模式(广播模式),不需要路由key,所有的消费者都可以进行监听和消费
                String routeKey = "";
                //发送mq消息
                rabbitTemplate.convertAndSend(exchangeName, routeKey, msg);
                Thread.sleep(1000);
                log.info("水龙头放水成功!" + i + "升");
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

}

消费者:


/**
 * @author rabbit
 * @version 1.0.0
 * @Description 广播模式
 * 需要配置 :
 * * spring.rabbitmq.listener.simple.acknowledge-mode = auto
 * @createTime 2022/08/27 20:22:00
 */
@Slf4j
@Component
public class PublishShowerConsumer {

    @RabbitHandler
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue("SolarWaterHeaterXM"),
            exchange = @Exchange(value = "publish-exchange", type = ExchangeTypes.FANOUT, durable = "false")))
    public void receiveXMInMessage(Message message, Channel channel) throws InterruptedException {
        Thread.sleep(1000);
        log.info("小明洗澡用水: " + new String(message.getBody()));
    }

    @RabbitHandler
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue("SolarWaterHeaterXL"),
            exchange = @Exchange(value = "publish-exchange", type = ExchangeTypes.FANOUT, durable = "false")))
    public void receiveXLInMessage(Message message, Channel channel) throws InterruptedException {
        Thread.sleep(1000);
        log.info("小丽洗澡用水: " + new String(message.getBody()));
    }

}

3.4 routing

生产者:


@RestController
@RequestMapping("/mqRouting")
@Slf4j
public class WaterRoutingProducerController {

    @Resource
    private RabbitTemplate rabbitTemplate;

    @PostMapping("/send")
    public void send() {
        try {
            for (int i = 1; i <= 50; i++) {
                String msg1 = "牛奶" + i + "升";
                String msg2 = "红酒" + i + "升";
                //交换机名称
                String exchangeName = "routing-exchange";
                //发送mq消息
                rabbitTemplate.convertAndSend(exchangeName, "MILK", msg1);
                rabbitTemplate.convertAndSend(exchangeName, "REDWINE", msg2);
                Thread.sleep(1000);
                log.info("水龙头放牛奶和红酒成功!" + i + "升");
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

}

消费者:


/**
 * @author rabbit
 * @version 1.0.0
 * @Description 路由模式
 * @createTime 2022/08/27 20:22:00
 */
@Slf4j
@Component
public class RoutingShowerConsumer {

    @RabbitHandler
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue("SolarWaterHeater-RedWine"),
            key = "REDWINE",
            exchange = @Exchange(value = "routing-exchange", type = ExchangeTypes.DIRECT, durable = "false")))
    public void receiveXMInMessage(Message message, Channel channel) throws InterruptedException {
        Thread.sleep(1000);
        log.info("小明洗澡用: " + new String(message.getBody()));
    }

    @RabbitHandler
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue("SolarWaterHeater-Milk"),
            key = "MILK",
            exchange = @Exchange(value = "routing-exchange", type = ExchangeTypes.DIRECT, durable = "false")))
    public void receiveXLInMessage(Message message, Channel channel) throws InterruptedException {
        Thread.sleep(1000);
        log.info("小丽洗澡用: " + new String(message.getBody()));
    }

}

4. 总结

无论是使用java amqp client 还是 spring boot 无外乎还是做那几件事:

1.创建连接

2. 声明Exchange

3. 声明queue

4. 创建绑定关系

5. 发送

6. 消费

只不过是有的需要自己手动做,有的是Spring 帮我们做,有的是靠Configuration来做,有的是靠注解来做而已。

最后

以上就是想人陪大雁为你收集整理的RabbitMQ初步到精通-第九章-RabbitMQ整合SpringBoot第九章-RabbitMQ整合SpringBoot的全部内容,希望文章能够帮你解决RabbitMQ初步到精通-第九章-RabbitMQ整合SpringBoot第九章-RabbitMQ整合SpringBoot所遇到的程序开发问题。

如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(46)

评论列表共有 0 条评论

立即
投稿
返回
顶部