我是靠谱客的博主 超帅菠萝,最近开发中收集的这篇文章主要介绍创建MQ中的队列、交换机,并生产消息后消费,觉得挺不错的,现在分享给大家,希望可以做个参考。

概述

1、创建一个空springboot项目

2、添加依赖:

<dependencies>
   <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter</artifactId>
    </dependency>
    <!--RabbitMQ 依赖-->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.47</version>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
    </dependency>
    <!--swagger-->
    <dependency>
        <groupId>io.springfox</groupId>
        <artifactId>springfox-swagger2</artifactId>
        <version>3.0.0</version>
    </dependency>
    <dependency>
        <groupId>io.springfox</groupId>
        <artifactId>springfox-swagger-ui</artifactId>
        <version>3.0.0</version>
    </dependency>
    <!--RabbitMQ 测试依赖-->
    <dependency>
        <groupId>org.springframework.amqp</groupId>
        <artifactId>spring-rabbit-test</artifactId>
        <scope>test</scope>
    </dependency>
</dependencies>

3、修改配置文件(yaml)

rabbitmq:
    virtual-host: root
    host: 192.168.211.131
    password: 123456
    username: root

4、在生产者中创建config包、config中创建RabbitMQConfiguration类(用来创建队列、交换机和队列交换机的绑定)

package com.changan.producer.config;

import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitMQConfiguration {

    /**
     * 创建一个名字为fanoutExchange的bean,类型是FanoutExchange
     * @return
     */
    // 声明 分裂队列交换机
    @Bean("fanoutExchange")
    public FanoutExchange fanoutExchange(){
        return new FanoutExchange("F_Exchange");
    }

    // 声明 死信队列交换机
    @Bean("directExchange")
    public DirectExchange directExchange(){
        return new DirectExchange("D_Exchange");
    }

    // 声明 主题队列交换机
    @Bean("topicExchange")
    public TopicExchange topicExchange(){
        return new TopicExchange("T_Exchange");
    }

    // 声明 分裂队列
    @Bean("fanoutQueue")
    public Queue fanoutQueue(){
        //持久化一个名字为F_Queue的队列
        return QueueBuilder.durable("F_Queue").build();
    }

    // 声明 主题队列
    @Bean("topicQueue")
    public Queue topicQueue(){
        //持久化一个名字为F_Queue的队列
        return QueueBuilder.durable("T_Queue").build();
    }

    // 声明 死信队列
    @Bean("directQueue")
    public Queue directQueue(){
        //持久化一个名字为F_Queue的队列
        return QueueBuilder.durable("D_Queue").build();
    }

    /**
     * 实现分裂交换机和分裂队列的绑定
     * @return
     */
    @Bean
    public Binding queueFanoutBinding(@Qualifier("directQueue") Queue queueD,
                                      @Qualifier("fanoutExchange") FanoutExchange fExchange){
        return BindingBuilder.bind(queueD).to(fExchange);
    }

    /**
     * 实现主题交换机和主题队列的绑定
     * @return
     */
    @Bean
    public Binding queueTopBinding(@Qualifier("topicQueue") Queue queueT,
                                      @Qualifier("topicExchange") TopicExchange tExchange){
        return BindingBuilder.bind(queueT).to(tExchange).with("error");
    }


    /**
     * 实现死信交换机和死信队列的绑定
     * @return
     */
    @Bean
    public Binding queueDirectBinding(@Qualifier("topicQueue") Queue queueT,
                                   @Qualifier("directExchange") DirectExchange dExchange){
        return BindingBuilder.bind(queueT).to(dExchange).with("");
    }
}

5、在controller控制层中发送消息到交换机

package com.changan.producer.controller;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@Slf4j
@RestController
public class MessageController {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @RequestMapping("/sendMsg")
    public void sendMsg()throws Exception{
        rabbitTemplate.convertAndSend("T_Exchange", "error","发送了,这是一个错误的消息");
    }
}

6、在消费者中创建TopicConsumerListener类,绑定对应的对内并接受交换机中的消息

package com.changan.consumer.Listener;

import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.io.IOException;
import java.util.Date;


@Slf4j
@Component
public class TopicConsumerListener {

    @RabbitListener(queues = "T_Queue")
    public void receiveD(Message message, Channel channel) throws IOException {
        String msg = new String(message.getBody());
        log.info("当前时间:{},收到分裂队列信息{}", new Date().toString(), msg);
    }
}

7、运行生产者(发送信息)、消费者中的控制台就接收到了发来的信息

 

最后

以上就是超帅菠萝为你收集整理的创建MQ中的队列、交换机,并生产消息后消费的全部内容,希望文章能够帮你解决创建MQ中的队列、交换机,并生产消息后消费所遇到的程序开发问题。

如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(39)

评论列表共有 0 条评论

立即
投稿
返回
顶部