概述
直接代码 项目结构 pom需要增加对RabbitM的支持
Pom文件如下
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.haibo</groupId> <artifactId>spring-rabbit-hello</artifactId> <version>0.0.1-SNAPSHOT</version> <packaging>jar</packaging> <name>spring-rabbit-hello</name> <description>Demo project for Spring Boot</description> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>1.5.3.RELEASE</version> <relativePath/> <!-- lookup parent from repository --> </parent> <properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding> <java.version>1.8</java.version> </properties> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> </plugin> </plugins> </build> <repositories> <repository> <id>nexus</id> <name>nexus</name> <url>http://localhost:8081/nexus/content/groups/public/</url> </repository> </repositories> </project>
package com.haibo; import org.springframework.amqp.core.*; import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener; import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /** * rabbitmq 的配置类 * 赵海波 on 2017/6/6. */ @Configuration public class RabbitMQConfig10 { /** * 消息交换机的名字 */ public static final String EXCHANGE = "my-mq-exchange"; /** * 队列key1 */ public static final String ROUTINGKEY1 = "queue_one_key1"; /** * 队列key2 */ public static final String ROUTINGKEY2 = "queue_one_key2"; /** * 配置链接信息 * @return */ @Bean public ConnectionFactory connectionFactory() { CachingConnectionFactory connectionFactory = new CachingConnectionFactory(); connectionFactory.setAddresses("139.199.11.69:5672"); connectionFactory.setUsername("guest"); connectionFactory.setPassword("guest"); connectionFactory.setVirtualHost("/"); connectionFactory.setPublisherConfirms(true); // 必须要设置 return connectionFactory; } /** * 配置消息交换机 * 针对消费者配置 * FanoutExchange: 将消息分发到所有的绑定队列,无routingkey的概念 * HeadersExchange :通过添加属性key-value匹配 * DirectExchange:按照routingkey分发到指定队列 * TopicExchange:多关键字匹配 */ @Bean public DirectExchange defaultExchange() { return new DirectExchange(EXCHANGE, true, false); } /** * 配置消息队列1 * 针对消费者配置 * * @return */ @Bean public Queue queue() { return new Queue("queue_one", true); //队列持久 } /** * 将消息队列1与交换机绑定 * 针对消费者配置 * * @return */ @Bean public Binding binding() { return BindingBuilder.bind(queue()).to(defaultExchange()).with(RabbitMQConfig10.ROUTINGKEY1); } /** * 配置消息队列2 * 针对消费者配置 * * @return */ @Bean public Queue queue1() { return new Queue("queue_one1", true); //队列持久 } /** * 将消息队列2与交换机绑定 * 针对消费者配置 * * @return */ @Bean public Binding binding1() { return BindingBuilder.bind(queue1()).to(defaultExchange()).with(RabbitMQConfig10.ROUTINGKEY2); } /** * 接受消息的监听,这个监听会接受消息队列1的消息 * 针对消费者配置 * * @return */ @Bean public SimpleMessageListenerContainer messageContainer() { SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory()); container.setQueues(queue()); container.setExposeListenerChannel(true); container.setMaxConcurrentConsumers(1); container.setConcurrentConsumers(1); container.setAcknowledgeMode(AcknowledgeMode.MANUAL); //设置确认模式手工确认 container.setMessageListener(new ChannelAwareMessageListener() { public void onMessage(Message message, com.rabbitmq.client.Channel channel) throws Exception { byte[] body = message.getBody(); System.out.println("收到消息 : " + new String(body)); channel.basicAck(message.getMessageProperties().getDeliveryTag(), true); //确认消息成功消费 } }); return container; } /** * @return */ @Bean public SimpleMessageListenerContainer messageContainer2() { SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory()); container.setQueues(queue1()); container.setExposeListenerChannel(true); container.setMaxConcurrentConsumers(1); container.setConcurrentConsumers(1); container.setAcknowledgeMode(AcknowledgeMode.MANUAL); //设置确认模式手工确认 container.setMessageListener(new ChannelAwareMessageListener() { public void onMessage(Message message, com.rabbitmq.client.Channel channel) throws Exception { byte[] body = message.getBody(); System.out.println("queue1 收到消息 : " + new String(body)); channel.basicAck(message.getMessageProperties().getDeliveryTag(), true); //确认消息成功消费 } }); return container; } @Bean public SimpleMessageListenerContainer messageContainer3() { SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory()); container.setQueues(queue1()); container.setExposeListenerChannel(true); container.setMaxConcurrentConsumers(1); container.setConcurrentConsumers(1); container.setAcknowledgeMode(AcknowledgeMode.MANUAL); //设置确认模式手工确认 container.setMessageListener(new ChannelAwareMessageListener() { public void onMessage(Message message, com.rabbitmq.client.Channel channel) throws Exception { byte[] body = message.getBody(); System.out.println("========queue2 收到消息 : " + new String(body)); channel.basicAck(message.getMessageProperties().getDeliveryTag(), true); //确认消息成功消费 } }); return container; } public static void main(String[] args) { SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(new RabbitMQConfig10().connectionFactory()); container.setQueues(new RabbitMQConfig10().queue1()); container.setExposeListenerChannel(true); container.setMaxConcurrentConsumers(1); container.setConcurrentConsumers(1); container.setAcknowledgeMode(AcknowledgeMode.MANUAL); //设置确认模式手工确认 container.setMessageListener(new ChannelAwareMessageListener() { public void onMessage(Message message, com.rabbitmq.client.Channel channel) throws Exception { byte[] body = message.getBody(); System.out.println("queue1 收到消息 : " + new String(body)); channel.basicAck(message.getMessageProperties().getDeliveryTag(), true); //确认消息成功消费 } }); } }
生产者类如下
package com.haibo; import java.util.UUID; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.rabbit.support.CorrelationData; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; /** * 测试RabbitMQ发送消息的Controller */ @RestController public class SendController10 implements RabbitTemplate.ConfirmCallback{ private RabbitTemplate rabbitTemplate; /** * 配置发送消息的rabbitTemplate,因为是构造方法,所以不用注解Spring也会自动注入(应该是新版本的特性) * @param rabbitTemplate */ public SendController10(RabbitTemplate rabbitTemplate){ this.rabbitTemplate = rabbitTemplate; //设置消费回调 this.rabbitTemplate.setConfirmCallback(this); } /** * 向消息队列1中发送消息 * @param msg * @return */ @RequestMapping("send1") public String send1(String msg){ String uuid = UUID.randomUUID().toString(); CorrelationData correlationId = new CorrelationData(uuid); rabbitTemplate.convertAndSend(RabbitMQConfig10.EXCHANGE, RabbitMQConfig10.ROUTINGKEY1, msg, correlationId); return null; } /** * 向消息队列2中发送消息 * @param msg * @return */ @RequestMapping("send2") public String send2(String msg){ String uuid = UUID.randomUUID().toString(); CorrelationData correlationId = new CorrelationData(uuid); rabbitTemplate.convertAndSend(RabbitMQConfig10.EXCHANGE, RabbitMQConfig10.ROUTINGKEY2, msg, correlationId); return null; } /** * 消息的回调,主要是实现RabbitTemplate.ConfirmCallback接口 * 注意,消息回调只能代表成功消息发送到RabbitMQ服务器,不能代表消息被成功处理和接受 */ public void confirm(CorrelationData correlationData, boolean ack, String cause) { System.out.println(" 回调id:" + correlationData); if (ack) { System.out.println("消息成功消费"); } else { System.out.println("消息消费失败:" + cause+"n重新发送"); } } }
测试如下
http://localhost:8000/send1?msg=123
http://localhost:8000/send2?msg=123
最后
以上就是正直铅笔为你收集整理的第5篇 RabbitMQ集成SpringBoot实现Direct模式的全部内容,希望文章能够帮你解决第5篇 RabbitMQ集成SpringBoot实现Direct模式所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
发表评论 取消回复