我是靠谱客的博主 正直铅笔,最近开发中收集的这篇文章主要介绍第5篇 RabbitMQ集成SpringBoot实现Direct模式,觉得挺不错的,现在分享给大家,希望可以做个参考。

概述







直接代码  项目结构 pom需要增加对RabbitM的支持



Pom文件如下

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"

xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.haibo</groupId>
<artifactId>spring-rabbit-hello</artifactId>
<version>0.0.1-SNAPSHOT</version>
<packaging>jar</packaging>
<name>spring-rabbit-hello</name>
<description>Demo project for Spring Boot</description>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>1.5.3.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->

</parent>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
<repositories>
<repository>
<id>nexus</id>
<name>nexus</name>
<url>http://localhost:8081/nexus/content/groups/public/</url>
</repository>
</repositories>
</project>

package com.haibo;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
 * rabbitmq 的配置类
 * 赵海波 on 2017/6/6.
 */
@Configuration
public class RabbitMQConfig10 {
/**

* 消息交换机的名字

*/

public static final String EXCHANGE = "my-mq-exchange";
/**

* 队列key1

*/

public static final String ROUTINGKEY1 = "queue_one_key1";
/**

* 队列key2

*/

public static final String ROUTINGKEY2 = "queue_one_key2";
/**

* 配置链接信息
* @return

*/

@Bean

public ConnectionFactory connectionFactory() {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
connectionFactory.setAddresses("139.199.11.69:5672");
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
connectionFactory.setVirtualHost("/");
connectionFactory.setPublisherConfirms(true); // 必须要设置

return connectionFactory;
}
/**

* 配置消息交换机

* 针对消费者配置

* FanoutExchange: 将消息分发到所有的绑定队列,无routingkey的概念

* HeadersExchange :通过添加属性key-value匹配

* DirectExchange:按照routingkey分发到指定队列

* TopicExchange:多关键字匹配

*/

@Bean

public DirectExchange defaultExchange() {
return new DirectExchange(EXCHANGE, true, false);
}
/**

* 配置消息队列1

* 针对消费者配置

*

* @return

*/

@Bean

public Queue queue() {
return new Queue("queue_one", true); //队列持久

}
/**

* 将消息队列1与交换机绑定

* 针对消费者配置

*

* @return

*/

@Bean

public Binding binding() {
return BindingBuilder.bind(queue()).to(defaultExchange()).with(RabbitMQConfig10.ROUTINGKEY1);
}
/**

* 配置消息队列2

* 针对消费者配置

*

* @return

*/

@Bean

public Queue queue1() {
return new Queue("queue_one1", true); //队列持久

}
/**

* 将消息队列2与交换机绑定

* 针对消费者配置

*

* @return

*/

@Bean

public Binding binding1() {
return BindingBuilder.bind(queue1()).to(defaultExchange()).with(RabbitMQConfig10.ROUTINGKEY2);
}
/**

* 接受消息的监听,这个监听会接受消息队列1的消息

* 针对消费者配置

*

* @return

*/

@Bean

public SimpleMessageListenerContainer messageContainer() {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory());
container.setQueues(queue());
container.setExposeListenerChannel(true);
container.setMaxConcurrentConsumers(1);
container.setConcurrentConsumers(1);
container.setAcknowledgeMode(AcknowledgeMode.MANUAL); //设置确认模式手工确认

container.setMessageListener(new ChannelAwareMessageListener() {
public void onMessage(Message message, com.rabbitmq.client.Channel channel) throws Exception {
byte[] body = message.getBody();
System.out.println("收到消息 : " + new String(body));
channel.basicAck(message.getMessageProperties().getDeliveryTag(), true); //确认消息成功消费

}
});
return container;
}
/**

* @return

*/

@Bean

public SimpleMessageListenerContainer messageContainer2() {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory());
container.setQueues(queue1());
container.setExposeListenerChannel(true);
container.setMaxConcurrentConsumers(1);
container.setConcurrentConsumers(1);
container.setAcknowledgeMode(AcknowledgeMode.MANUAL); //设置确认模式手工确认

container.setMessageListener(new ChannelAwareMessageListener() {
public void onMessage(Message message, com.rabbitmq.client.Channel channel) throws Exception {
byte[] body = message.getBody();
System.out.println("queue1 收到消息 : " + new String(body));
channel.basicAck(message.getMessageProperties().getDeliveryTag(), true); //确认消息成功消费

}
});
return container;
}
@Bean

public SimpleMessageListenerContainer messageContainer3() {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory());
container.setQueues(queue1());
container.setExposeListenerChannel(true);
container.setMaxConcurrentConsumers(1);
container.setConcurrentConsumers(1);
container.setAcknowledgeMode(AcknowledgeMode.MANUAL); //设置确认模式手工确认

container.setMessageListener(new ChannelAwareMessageListener() {
public void onMessage(Message message, com.rabbitmq.client.Channel channel) throws Exception {
byte[] body = message.getBody();
System.out.println("========queue2 收到消息 : " + new String(body));
channel.basicAck(message.getMessageProperties().getDeliveryTag(), true); //确认消息成功消费

}
});
return container;
}
public static void main(String[] args) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(new RabbitMQConfig10().connectionFactory());
container.setQueues(new RabbitMQConfig10().queue1());
container.setExposeListenerChannel(true);
container.setMaxConcurrentConsumers(1);
container.setConcurrentConsumers(1);
container.setAcknowledgeMode(AcknowledgeMode.MANUAL); //设置确认模式手工确认

container.setMessageListener(new ChannelAwareMessageListener() {
public void onMessage(Message message, com.rabbitmq.client.Channel channel) throws Exception {
byte[] body = message.getBody();
System.out.println("queue1 收到消息 : " + new String(body));
channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);
//确认消息成功消费

}
});
}
}



生产者类如下




package com.haibo;
import java.util.UUID;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.support.CorrelationData;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
/**
 * 测试RabbitMQ发送消息的Controller
 */
@RestController
public class SendController10 implements RabbitTemplate.ConfirmCallback{
private RabbitTemplate rabbitTemplate;
/**

* 配置发送消息的rabbitTemplate,因为是构造方法,所以不用注解Spring也会自动注入(应该是新版本的特性)

* @param rabbitTemplate

*/

public SendController10(RabbitTemplate rabbitTemplate){
this.rabbitTemplate = rabbitTemplate;
//设置消费回调

this.rabbitTemplate.setConfirmCallback(this);
}
/**

* 向消息队列1中发送消息

* @param msg

* @return

*/

@RequestMapping("send1")
public String send1(String msg){
String uuid = UUID.randomUUID().toString();
CorrelationData correlationId = new CorrelationData(uuid);
rabbitTemplate.convertAndSend(RabbitMQConfig10.EXCHANGE, RabbitMQConfig10.ROUTINGKEY1, msg,
correlationId);
return null;
}
/**

* 向消息队列2中发送消息

* @param msg

* @return

*/

@RequestMapping("send2")
public String send2(String msg){
String uuid = UUID.randomUUID().toString();
CorrelationData correlationId = new CorrelationData(uuid);
rabbitTemplate.convertAndSend(RabbitMQConfig10.EXCHANGE, RabbitMQConfig10.ROUTINGKEY2, msg,
correlationId);
return null;
}
/**

* 消息的回调,主要是实现RabbitTemplate.ConfirmCallback接口

* 注意,消息回调只能代表成功消息发送到RabbitMQ服务器,不能代表消息被成功处理和接受

*/

public void confirm(CorrelationData correlationData, boolean ack, String cause) {
System.out.println(" 回调id:" + correlationData);
if (ack) {
System.out.println("消息成功消费");
} else {
System.out.println("消息消费失败:" + cause+"n重新发送");
}
}
}

测试如下 

http://localhost:8000/send1?msg=123

http://localhost:8000/send2?msg=123



最后

以上就是正直铅笔为你收集整理的第5篇 RabbitMQ集成SpringBoot实现Direct模式的全部内容,希望文章能够帮你解决第5篇 RabbitMQ集成SpringBoot实现Direct模式所遇到的程序开发问题。

如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(45)

评论列表共有 0 条评论

立即
投稿
返回
顶部