概述
1、创建一个空springboot项目
2、添加依赖:
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<!--RabbitMQ 依赖-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.47</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
<!--swagger-->
<dependency>
<groupId>io.springfox</groupId>
<artifactId>springfox-swagger2</artifactId>
<version>3.0.0</version>
</dependency>
<dependency>
<groupId>io.springfox</groupId>
<artifactId>springfox-swagger-ui</artifactId>
<version>3.0.0</version>
</dependency>
<!--RabbitMQ 测试依赖-->
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
3、修改配置文件(yaml)
rabbitmq:
virtual-host: root
host: 192.168.211.131
password: 123456
username: root
4、在生产者中创建config包、config中创建RabbitMQConfiguration类(用来创建队列、交换机和队列交换机的绑定)
package com.changan.producer.config;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitMQConfiguration {
/**
* 创建一个名字为fanoutExchange的bean,类型是FanoutExchange
* @return
*/
// 声明 分裂队列交换机
@Bean("fanoutExchange")
public FanoutExchange fanoutExchange(){
return new FanoutExchange("F_Exchange");
}
// 声明 死信队列交换机
@Bean("directExchange")
public DirectExchange directExchange(){
return new DirectExchange("D_Exchange");
}
// 声明 主题队列交换机
@Bean("topicExchange")
public TopicExchange topicExchange(){
return new TopicExchange("T_Exchange");
}
// 声明 分裂队列
@Bean("fanoutQueue")
public Queue fanoutQueue(){
//持久化一个名字为F_Queue的队列
return QueueBuilder.durable("F_Queue").build();
}
// 声明 主题队列
@Bean("topicQueue")
public Queue topicQueue(){
//持久化一个名字为F_Queue的队列
return QueueBuilder.durable("T_Queue").build();
}
// 声明 死信队列
@Bean("directQueue")
public Queue directQueue(){
//持久化一个名字为F_Queue的队列
return QueueBuilder.durable("D_Queue").build();
}
/**
* 实现分裂交换机和分裂队列的绑定
* @return
*/
@Bean
public Binding queueFanoutBinding(@Qualifier("directQueue") Queue queueD,
@Qualifier("fanoutExchange") FanoutExchange fExchange){
return BindingBuilder.bind(queueD).to(fExchange);
}
/**
* 实现主题交换机和主题队列的绑定
* @return
*/
@Bean
public Binding queueTopBinding(@Qualifier("topicQueue") Queue queueT,
@Qualifier("topicExchange") TopicExchange tExchange){
return BindingBuilder.bind(queueT).to(tExchange).with("error");
}
/**
* 实现死信交换机和死信队列的绑定
* @return
*/
@Bean
public Binding queueDirectBinding(@Qualifier("topicQueue") Queue queueT,
@Qualifier("directExchange") DirectExchange dExchange){
return BindingBuilder.bind(queueT).to(dExchange).with("");
}
}
5、在controller控制层中发送消息到交换机
package com.changan.producer.controller;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@Slf4j
@RestController
public class MessageController {
@Autowired
private RabbitTemplate rabbitTemplate;
@RequestMapping("/sendMsg")
public void sendMsg()throws Exception{
rabbitTemplate.convertAndSend("T_Exchange", "error","发送了,这是一个错误的消息");
}
}
6、在消费者中创建TopicConsumerListener类,绑定对应的对内并接受交换机中的消息
package com.changan.consumer.Listener;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.util.Date;
@Slf4j
@Component
public class TopicConsumerListener {
@RabbitListener(queues = "T_Queue")
public void receiveD(Message message, Channel channel) throws IOException {
String msg = new String(message.getBody());
log.info("当前时间:{},收到分裂队列信息{}", new Date().toString(), msg);
}
}
7、运行生产者(发送信息)、消费者中的控制台就接收到了发来的信息
最后
以上就是超帅菠萝为你收集整理的创建MQ中的队列、交换机,并生产消息后消费的全部内容,希望文章能够帮你解决创建MQ中的队列、交换机,并生产消息后消费所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
发表评论 取消回复