我是靠谱客的博主 迅速画笔,最近开发中收集的这篇文章主要介绍RabbitMQ的通过Spring AMQP实现的五种消息模式,觉得挺不错的,现在分享给大家,希望可以做个参考。

概述


1.简单模式

Spring AMQP实现
首先需要在pom.xml中添加Spring AMQP的相关依赖;

<!--Spring AMQP依赖-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

然后修改application.yml,添加RabbitMQ的相关配置;

spring:
rabbitmq:
host: localhost
port: 5672
virtual-host: /mall
username: mall
password: mall
publisher-confirms: true #消息发送到交换器确认
publisher-returns: true #消息发送到队列确认

添加简单模式相关Java配置,创建一个名为simple.hello的队列、一个生产者和一个消费者;

/**
* Created by macro on 2020/5/19.
*/
@Configuration
public class SimpleRabbitConfig {
@Bean
public Queue hello() {
return new Queue("simple.hello");
}
@Bean
public SimpleSender simpleSender(){
return new SimpleSender();
}
@Bean
public SimpleReceiver simpleReceiver(){
return new SimpleReceiver();
}
}

生产者通过send方法向队列simple.hello中发送消息;

/**
* Created by macro on 2020/5/19.
*/
public class SimpleSender {
private static final Logger LOGGER = LoggerFactory.getLogger(SimpleSender.class);
@Autowired
private RabbitTemplate template;
private static final String queueName="simple.hello";
public void send() {
String message = "Hello World!";
this.template.convertAndSend(queueName, message);
LOGGER.info(" [x] Sent '{}'", message);
}
}

消费者从队列simple.hello中获取消息;

/**
* Created by macro on 2020/5/19.
*/
@RabbitListener(queues = "simple.hello")
public class SimpleReceiver {
private static final Logger LOGGER = LoggerFactory.getLogger(SimpleReceiver.class);
@RabbitHandler
public void receive(String in) {
LOGGER.info(" [x] Received '{}'", in);
}
}

在controller中添加测试接口,调用该接口开始发送消息;

/**
* Created by macro on 2020/5/19.
*/
@Api(tags = "RabbitController", description = "RabbitMQ功能测试")
@Controller
@RequestMapping("/rabbit")
public class RabbitController {
@Autowired
private SimpleSender simpleSender;
@ApiOperation("简单模式")
@RequestMapping(value = "/simple", method = RequestMethod.GET)
@ResponseBody
public CommonResult simpleTest() {
for(int i=0;i<10;i++){
simpleSender.send();
ThreadUtil.sleep(1000);
}
return CommonResult.success(null);
}
}

2.工厂模式

添加工作模式相关Java配置,创建一个名为work.hello的队列、一个生产者和两个消费者;

/**
* Created by macro on 2020/5/19.
*/
@Configuration
public class WorkRabbitConfig {
@Bean
public Queue workQueue() {
return new Queue("work.hello");
}
@Bean
public WorkReceiver workReceiver1() {
return new WorkReceiver(1);
}
@Bean
public WorkReceiver workReceiver2() {
return new WorkReceiver(2);
}
@Bean
public WorkSender workSender() {
return new WorkSender();
}
}

生产者通过send方法向队列work.hello中发送消息,消息中包含一定数量的.号;

/**
* Created by macro on 2020/5/19.
*/
public class WorkSender {
private static final Logger LOGGER = LoggerFactory.getLogger(WorkSender.class);
@Autowired
private RabbitTemplate template;
private static final String queueName = "work.hello";
public void send(int index) {
StringBuilder builder = new StringBuilder("Hello");
int limitIndex = index % 3+1;
for (int i = 0; i < limitIndex; i++) {
builder.append('.');
}
builder.append(index+1);
String message = builder.toString();
template.convertAndSend(queueName, message);
LOGGER.info(" [x] Sent '{}'", message);
}
}

两个消费者从队列work.hello中获取消息,名称分别为instance 1和instance 2,消息中包含.号越多,耗时越长;

/**
* Created by macro on 2020/5/19.
*/
@RabbitListener(queues = "work.hello")
public class WorkReceiver {
private static final Logger LOGGER = LoggerFactory.getLogger(WorkReceiver.class);
private final int instance;
public WorkReceiver(int i) {
this.instance = i;
}
@RabbitHandler
public void receive(String in) {
StopWatch watch = new StopWatch();
watch.start();
LOGGER.info("instance {} [x] Received '{}'", this.instance, in);
doWork(in);
watch.stop();
LOGGER.info("instance {} [x] Done in {}s", this.instance, watch.getTotalTimeSeconds());
}
private void doWork(String in) {
for (char ch : in.toCharArray()) {
if (ch == '.') {
ThreadUtil.sleep(1000);
}
}
}
}

在controller中添加测试接口,调用该接口开始发送消息;

/**
* Created by macro on 2020/5/19.
*/
@Api(tags = "RabbitController", description = "RabbitMQ功能测试")
@Controller
@RequestMapping("/rabbit")
public class RabbitController {
@Autowired
private WorkSender workSender;
@ApiOperation("工作模式")
@RequestMapping(value = "/work", method = RequestMethod.GET)
@ResponseBody
public CommonResult workTest() {
for(int i=0;i<10;i++){
workSender.send(i);
ThreadUtil.sleep(1000);
}
return CommonResult.success(null);
}
}

3.发布/订阅模式


添加发布/订阅模式相关Java配置,创建一个名为exchange.fanout的交换机、一个生产者、两个消费者和两个匿名队列,将两个匿名队列都绑定到交换机;

/**
* Created by macro on 2020/5/19.
*/
@Configuration
public class FanoutRabbitConfig {
@Bean
public FanoutExchange fanout() {
return new FanoutExchange("exchange.fanout");
}
@Bean
public Queue fanoutQueue1() {
return new AnonymousQueue();
}
@Bean
public Queue fanoutQueue2() {
return new AnonymousQueue();
}
@Bean
public Binding fanoutBinding1(FanoutExchange fanout, Queue fanoutQueue1) {
return BindingBuilder.bind(fanoutQueue1).to(fanout);
}
@Bean
public Binding fanoutBinding2(FanoutExchange fanout, Queue fanoutQueue2) {
return BindingBuilder.bind(fanoutQueue2).to(fanout);
}
@Bean
public FanoutReceiver fanoutReceiver() {
return new FanoutReceiver();
}
@Bean
public FanoutSender fanoutSender() {
return new FanoutSender();
}
}

生产者通过send方法向交换机exchange.fanout中发送消息,消息中包含一定数量的.号;

/**
* Created by macro on 2020/5/19.
*/
public class FanoutSender {
private static final Logger LOGGER = LoggerFactory.getLogger(FanoutSender.class);
@Autowired
private RabbitTemplate template;
private static final String exchangeName = "exchange.fanout";
public void send(int index) {
StringBuilder builder = new StringBuilder("Hello");
int limitIndex = index % 3 + 1;
for (int i = 0; i < limitIndex; i++) {
builder.append('.');
}
builder.append(index + 1);
String message = builder.toString();
template.convertAndSend(exchangeName, "", message);
LOGGER.info(" [x] Sent '{}'", message);
}
}

消费者从绑定的匿名队列中获取消息,消息中包含.号越多,耗时越长,由于该消费者可以从两个队列中获取并消费消息,可以看做两个消费者,名称分别为instance 1和instance 2;

/**
* Created by macro on 2020/5/19.
*/
public class FanoutReceiver {
private static final Logger LOGGER = LoggerFactory.getLogger(FanoutReceiver.class);
@RabbitListener(queues = "#{fanoutQueue1.name}")
public void receive1(String in) {
receive(in, 1);
}
@RabbitListener(queues = "#{fanoutQueue2.name}")
public void receive2(String in) {
receive(in, 2);
}
private void receive(String in, int receiver) {
StopWatch watch = new StopWatch();
watch.start();
LOGGER.info("instance {} [x] Received '{}'", receiver, in);
doWork(in);
watch.stop();
LOGGER.info("instance {} [x] Done in {}s", receiver, watch.getTotalTimeSeconds());
}
private void doWork(String in) {
for (char ch : in.toCharArray()) {
if (ch == '.') {
ThreadUtil.sleep(1000);
}
}
}
}

在controller中添加测试接口,调用该接口开始发送消息;

/**
* Created by macro on 2020/5/19.
*/
@Api(tags = "RabbitController", description = "RabbitMQ功能测试")
@Controller
@RequestMapping("/rabbit")
public class RabbitController {
@Autowired
private FanoutSender fanoutSender;
@ApiOperation("发布/订阅模式")
@RequestMapping(value = "/fanout", method = RequestMethod.GET)
@ResponseBody
public CommonResult fanoutTest() {
for(int i=0;i<10;i++){
fanoutSender.send(i);
ThreadUtil.sleep(1000);
}
return CommonResult.success(null);
}
}

4.路由模式


添加路由模式相关Java配置,创建一个名为exchange.direct的交换机、一个生产者、两个消费者和两个匿名队列,队列通过路由键都绑定到交换机,队列1的路由键为orange和black,队列2的路由键为green和black;

/**
* Created by macro on 2020/5/19.
*/
@Configuration
public class DirectRabbitConfig {
@Bean
public DirectExchange direct() {
return new DirectExchange("exchange.direct");
}
@Bean
public Queue directQueue1() {
return new AnonymousQueue();
}
@Bean
public Queue directQueue2() {
return new AnonymousQueue();
}
@Bean
public Binding directBinding1a(DirectExchange direct, Queue directQueue1) {
return BindingBuilder.bind(directQueue1).to(direct).with("orange");
}
@Bean
public Binding directBinding1b(DirectExchange direct, Queue directQueue1) {
return BindingBuilder.bind(directQueue1).to(direct).with("black");
}
@Bean
public Binding directBinding2a(DirectExchange direct, Queue directQueue2) {
return BindingBuilder.bind(directQueue2).to(direct).with("green");
}
@Bean
public Binding directBinding2b(DirectExchange direct, Queue directQueue2) {
return BindingBuilder.bind(directQueue2).to(direct).with("black");
}
@Bean
public DirectReceiver receiver() {
return new DirectReceiver();
}
@Bean
public DirectSender directSender() {
return new DirectSender();
}
}

生产者通过send方法向交换机exchange.direct中发送消息,发送时使用不同的路由键,根据路由键会被转发到不同的队列;

/**
* Created by macro on 2020/5/19.
*/
public class DirectSender {
@Autowired
private RabbitTemplate template;
private static final String exchangeName = "exchange.direct";
private final String[] keys = {"orange", "black", "green"};
private static final Logger LOGGER = LoggerFactory.getLogger(DirectSender.class);
public void send(int index) {
StringBuilder builder = new StringBuilder("Hello to ");
int limitIndex = index % 3;
String key = keys[limitIndex];
builder.append(key).append(' ');
builder.append(index+1);
String message = builder.toString();
template.convertAndSend(exchangeName, key, message);
LOGGER.info(" [x] Sent '{}'", message);
}
}

消费者从自己绑定的匿名队列中获取消息,由于该消费者可以从两个队列中获取并消费消息,可以看做两个消费者,名称分别为instance 1和instance 2;

/**
* Created by macro on 2020/5/19.
*/
public class DirectReceiver {
private static final Logger LOGGER = LoggerFactory.getLogger(DirectReceiver.class);
@RabbitListener(queues = "#{directQueue1.name}")
public void receive1(String in){
receive(in, 1);
}
@RabbitListener(queues = "#{directQueue2.name}")
public void receive2(String in){
receive(in, 2);
}
private void receive(String in, int receiver){
StopWatch watch = new StopWatch();
watch.start();
LOGGER.info("instance {} [x] Received '{}'", receiver, in);
doWork(in);
watch.stop();
LOGGER.info("instance {} [x] Done in {}s", receiver, watch.getTotalTimeSeconds());
}
private void doWork(String in){
for (char ch : in.toCharArray()) {
if (ch == '.') {
ThreadUtil.sleep(1000);
}
}
}
}

在controller中添加测试接口,调用该接口开始发送消息;

/**
* Created by macro on 2020/5/19.
*/
@Api(tags = "RabbitController", description = "RabbitMQ功能测试")
@Controller
@RequestMapping("/rabbit")
public class RabbitController {
@Autowired
private DirectSender directSender;
@ApiOperation("路由模式")
@RequestMapping(value = "/direct", method = RequestMethod.GET)
@ResponseBody
public CommonResult directTest() {
for(int i=0;i<10;i++){
directSender.send(i);
ThreadUtil.sleep(1000);
}
return CommonResult.success(null);
}
}

5.通配符模式


添加通配符模式相关Java配置,创建一个名为exchange.topic的交换机、一个生产者、两个消费者和两个匿名队列,匹配*.orange..*.rabbit发送到队列1,匹配lazy.#发送到队列2;

/**
* Created by macro on 2020/5/19.
*/
@Configuration
public class TopicRabbitConfig {
@Bean
public TopicExchange topic() {
return new TopicExchange("exchange.topic");
}
@Bean
public Queue topicQueue1() {
return new AnonymousQueue();
}
@Bean
public Queue topicQueue2() {
return new AnonymousQueue();
}
@Bean
public Binding topicBinding1a(TopicExchange topic, Queue topicQueue1) {
return BindingBuilder.bind(topicQueue1).to(topic).with("*.orange.*");
}
@Bean
public Binding topicBinding1b(TopicExchange topic, Queue topicQueue1) {
return BindingBuilder.bind(topicQueue1).to(topic).with("*.*.rabbit");
}
@Bean
public Binding topicBinding2a(TopicExchange topic, Queue topicQueue2) {
return BindingBuilder.bind(topicQueue2).to(topic).with("lazy.#");
}
@Bean
public TopicReceiver topicReceiver() {
return new TopicReceiver();
}
@Bean
public TopicSender topicSender() {
return new TopicSender();
}
}

生产者通过send方法向交换机exchange.topic中发送消息,消息中包含不同的路由键;

/**
* Created by macro on 2020/5/19.
*/
public class TopicSender {
@Autowired
private RabbitTemplate template;
private static final String exchangeName = "exchange.topic";
private static final Logger LOGGER = LoggerFactory.getLogger(TopicSender.class);
private final String[] keys = {"quick.orange.rabbit", "lazy.orange.elephant", "quick.orange.fox",
"lazy.brown.fox", "lazy.pink.rabbit", "quick.brown.fox"};
public void send(int index) {
StringBuilder builder = new StringBuilder("Hello to ");
int limitIndex = index%keys.length;
String key = keys[limitIndex];
builder.append(key).append(' ');
builder.append(index+1);
String message = builder.toString();
template.convertAndSend(exchangeName, key, message);
LOGGER.info(" [x] Sent '{}'",message);
System.out.println(" [x] Sent '" + message + "'");
}
}

消费者从自己绑定的匿名队列中获取消息,由于该消费者可以从两个队列中获取并消费消息,可以看做两个消费者,名称分别为instance 1和instance 2;

/**
* Created by macro on 2020/5/19.
*/
public class TopicReceiver {
private static final Logger LOGGER = LoggerFactory.getLogger(TopicReceiver.class);
@RabbitListener(queues = "#{topicQueue1.name}")
public void receive1(String in){
receive(in, 1);
}
@RabbitListener(queues = "#{topicQueue2.name}")
public void receive2(String in){
receive(in, 2);
}
public void receive(String in, int receiver){
StopWatch watch = new StopWatch();
watch.start();
LOGGER.info("instance {} [x] Received '{}'", receiver, in);
doWork(in);
watch.stop();
LOGGER.info("instance {} [x] Done in {}s", receiver, watch.getTotalTimeSeconds());
}
private void doWork(String in){
for (char ch : in.toCharArray()) {
if (ch == '.') {
ThreadUtil.sleep(1000);
}
}
}
}

在controller中添加测试接口,调用该接口开始发送消息;

/**
* Created by macro on 2020/5/19.
*/
@Api(tags = "RabbitController", description = "RabbitMQ功能测试")
@Controller
@RequestMapping("/rabbit")
public class RabbitController {
@Autowired
private TopicSender topicSender;
@ApiOperation("通配符模式")
@RequestMapping(value = "/topic", method = RequestMethod.GET)
@ResponseBody
public CommonResult topicTest() {
for(int i=0;i<10;i++){
topicSender.send(i);
ThreadUtil.sleep(1000);
}
return CommonResult.success(null);
}
}

最后

以上就是迅速画笔为你收集整理的RabbitMQ的通过Spring AMQP实现的五种消息模式的全部内容,希望文章能够帮你解决RabbitMQ的通过Spring AMQP实现的五种消息模式所遇到的程序开发问题。

如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(57)

评论列表共有 0 条评论

立即
投稿
返回
顶部