概述
RabbitAdmin类可以很好的操作RabbitMQ, 在Spring 中 直接进行注入即可
@Bean
public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
rabbitAdmin.setAutoStartup(true); // 服务启动时候开启自动启动
return rabbitAdmin;
}
注意: autoStartup必须要设置为true,否则Spring容器不会加载RabbitAdmin类
RabbitAdmin底层实现就是从Spring容器中获取Exchange,Bingding,Routing以及Queue的@Bean声明
然后使用RabbitTemplate的execute方法执行对应的声明,修改,删除等一系列RabbitMQ基础功能操作
initialize()方法
SpringAMQP 声明
使用SpringAMQP去声明,就需要使用SpringAMQP的如下模式,即声明@Bean方法
/**
* 针对消费者配置
* 1. 设置交换机类型
* 2. 将队列绑定到交换机
FanoutExchange: 将消息分发到所有的绑定队列,无routingkey的概念
HeadersExchange :通过添加属性key-value匹配
DirectExchange:按照routingkey分发到指定队列
TopicExchange:多关键字匹配
*/
@Bean
public TopicExchange exchange001() {
return new TopicExchange("topic001", true, false);
}
@Bean
public Queue queue001() {
return new Queue("queue001", true); //队列持久
}
@Bean
public Binding binding001() {
return BindingBuilder.bind(queue001()).to(exchange001()).with("spring.*");
}
RabbitTemplate 消息模板
- 我们在与SpringAMQP整合的时候进行发送消息的关键类
2.该类提供了丰富的发送消息方法,
包括可靠性投递消息方法,
回调监听消息接口ConfirmCallBack,
返回值确认接口ReturnCallback等等。
同样我们需要进行注入到Spring容器中,然后直接使用 。
@Test
public void testSendMessage() throws Exception {
//1 创建消息
MessageProperties messageProperties = new MessageProperties();
messageProperties.getHeaders().put("desc", "信息描述..");
messageProperties.getHeaders().put("type", "自定义消息类型..");
Message message = new Message("Hello RabbitMQ".getBytes(), messageProperties);
rabbitTemplate.convertAndSend("topic001", "spring.amqp", message, new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
System.err.println("------添加额外的设置---------");
message.getMessageProperties().getHeaders().put("desc", "额外修改的信息描述");
message.getMessageProperties().getHeaders().put("attr", "额外新加的属性");
return message;
}
});
}
@Test
public void testSendMessage2() throws Exception {
//1 创建消息
MessageProperties messageProperties = new MessageProperties();
messageProperties.setContentType("text/plain");
Message message = new Message("mq 消息1234".getBytes(), messageProperties);
rabbitTemplate.send("topic001", "spring.abc", message);
rabbitTemplate.convertAndSend("topic001", "spring.amqp", "hello object message send!");
rabbitTemplate.convertAndSend("topic002", "rabbit.abc", "hello object message send!");
}
SimpleMessageListenerContainer : 简单消息监听容器 ,热形式的动态参数变更。
-
这个类也是非常的强大,我们可以对他进行很多设置,对于消费者的配置项,这个类都可以满足
-
监听队列(多个队列),自动启动,自动声明
-
设置事务特性,事务管理器,事务属性,事务容量(并发),是否开启事务,回滚消息等
-
设置消费者数量,这是最小最大数量,批量消费
-
设置消息确认和自动确认模式,是否重回队列,异常捕获handler函数
-
设置消费者标签生成策略,是否独占模式,消费者属性等等
-
设置具体的监听器,消息转换器等等
注意: SimpleMessageListenerContainer 可以进行动态设置,比如在运行中的应用动态的修改其消费者的大小,接收消息的模式等等
很多基于RabbitMQ的自制定化后端管控台在进行动态设置的时候,也是根据这一个特性去实现的,所以可以看出SpringAMQP是非常的强大。
@Bean
public SimpleMessageListenerContainer messageContainer(ConnectionFactory connectionFactory) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
container.setQueues(queue001(), queue002(), queue003(), queue_image(), queue_pdf());
container.setConcurrentConsumers(1);
container.setMaxConcurrentConsumers(5);
container.setDefaultRequeueRejected(false);
container.setAcknowledgeMode(AcknowledgeMode.AUTO);
container.setExposeListenerChannel(true);
container.setConsumerTagStrategy(new ConsumerTagStrategy() {
@Override
public String createConsumerTag(String queue) {
return queue + "_" + UUID.randomUUID().toString();
}
});
/**
container.setMessageListener(new ChannelAwareMessageListener() {
@Override
public void onMessage(Message message, Channel channel) throws Exception {
String msg = new String(message.getBody());
System.err.println("----------消费者: " + msg);
}
});
*/
/**
* 1 适配器方式. 默认是有自己的方法名字的:handleMessage
// 可以自己指定一个方法的名字: consumeMessage
// 也可以添加一个转换器: 从字节数组转换为String
MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate());
adapter.setDefaultListenerMethod("consumeMessage");
adapter.setMessageConverter(new TextMessageConverter());
container.setMessageListener(adapter);
*/
/**
* 2 适配器方式: 我们的队列名称 和 方法名称 也可以进行一一的匹配
*
MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate());
adapter.setMessageConverter(new TextMessageConverter());
Map<String, String> queueOrTagToMethodName = new HashMap<>();
queueOrTagToMethodName.put("queue001", "method1");
queueOrTagToMethodName.put("queue002", "method2");
adapter.setQueueOrTagToMethodName(queueOrTagToMethodName);
container.setMessageListener(adapter);
*/
// 1.1 支持json格式的转换器
/**
MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate());
adapter.setDefaultListenerMethod("consumeMessage");
Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();
adapter.setMessageConverter(jackson2JsonMessageConverter);
container.setMessageListener(adapter);
*/
// 1.2 DefaultJackson2JavaTypeMapper & Jackson2JsonMessageConverter 支持java对象转换
/**
MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate());
adapter.setDefaultListenerMethod("consumeMessage");
Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();
DefaultJackson2JavaTypeMapper javaTypeMapper = new DefaultJackson2JavaTypeMapper();
jackson2JsonMessageConverter.setJavaTypeMapper(javaTypeMapper);
adapter.setMessageConverter(jackson2JsonMessageConverter);
container.setMessageListener(adapter);
*/
//1.3 DefaultJackson2JavaTypeMapper & Jackson2JsonMessageConverter 支持java对象多映射转换
/**
MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate());
adapter.setDefaultListenerMethod("consumeMessage");
Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();
DefaultJackson2JavaTypeMapper javaTypeMapper = new DefaultJackson2JavaTypeMapper();
Map<String, Class<?>> idClassMapping = new HashMap<String, Class<?>>();
idClassMapping.put("order", com.bfxy.spring.entity.Order.class);
idClassMapping.put("packaged", com.bfxy.spring.entity.Packaged.class);
javaTypeMapper.setIdClassMapping(idClassMapping);
jackson2JsonMessageConverter.setJavaTypeMapper(javaTypeMapper);
adapter.setMessageConverter(jackson2JsonMessageConverter);
container.setMessageListener(adapter);
*/
//1.4 ext convert
MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate());
adapter.setDefaultListenerMethod("consumeMessage");
//全局的转换器:
ContentTypeDelegatingMessageConverter convert = new ContentTypeDelegatingMessageConverter();
TextMessageConverter textConvert = new TextMessageConverter();
convert.addDelegate("text", textConvert);
convert.addDelegate("html/text", textConvert);
convert.addDelegate("xml/text", textConvert);
convert.addDelegate("text/plain", textConvert);
Jackson2JsonMessageConverter jsonConvert = new Jackson2JsonMessageConverter();
convert.addDelegate("json", jsonConvert);
convert.addDelegate("application/json", jsonConvert);
ImageMessageConverter imageConverter = new ImageMessageConverter();
convert.addDelegate("image/png", imageConverter);
convert.addDelegate("image", imageConverter);
PDFMessageConverter pdfConverter = new PDFMessageConverter();
convert.addDelegate("application/pdf", pdfConverter);
adapter.setMessageConverter(convert);
container.setMessageListener(adapter);
return container;
}
MessageListenerAdapter 消息监听适配器
可以看到 MessageListenerAdapter 的源码的 ORIGINAL_DEFAULT_LISTENER_METHOD 变量,通过反射获取
public static final String ORIGINAL_DEFAULT_LISTENER_METHOD = "handleMessage";
public class MessageListenerAdapter extends AbstractAdaptableMessageListener {
private final Map<String, String> queueOrTagToMethodName = new HashMap<String, String>();
/**
* Out-of-the-box value for the default listener method: "handleMessage".
*/
public static final String ORIGINAL_DEFAULT_LISTENER_METHOD = "handleMessage";
private Object delegate;
private String defaultListenerMethod = ORIGINAL_DEFAULT_LISTENER_METHOD;
MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate());
adapter.setDefaultListenerMethod("consumeMessage");
adapter.setMessageConverter(new TextMessageConverter());
container.setMessageListener(adapter);
MessageDelegate
handleMessage 方法
package com.bfxy.spring.adapter;
import java.io.File;
import java.util.Map;
import com.bfxy.spring.entity.Order;
import com.bfxy.spring.entity.Packaged;
public class MessageDelegate {
public void handleMessage(byte[] messageBody) {
System.err.println("默认方法, 消息内容:" + new String(messageBody));
}
public void consumeMessage(byte[] messageBody) {
System.err.println("字节数组方法, 消息内容:" + new String(messageBody));
}
public void consumeMessage(String messageBody) {
System.err.println("字符串方法, 消息内容:" + messageBody);
}
public void method1(String messageBody) {
System.err.println("method1 收到消息内容:" + new String(messageBody));
}
public void method2(String messageBody) {
System.err.println("method2 收到消息内容:" + new String(messageBody));
}
public void consumeMessage(Map messageBody) {
System.err.println("map方法, 消息内容:" + messageBody);
}
public void consumeMessage(Order order) {
System.err.println("order对象, 消息内容, id: " + order.getId() +
", name: " + order.getName() +
", content: "+ order.getContent());
}
public void consumeMessage(Packaged pack) {
System.err.println("package对象, 消息内容, id: " + pack.getId() +
", name: " + pack.getName() +
", content: "+ pack.getDescription());
}
public void consumeMessage(File file) {
System.err.println("文件对象 方法, 消息内容:" + file.getName());
}
}
最后
以上就是清秀热狗为你收集整理的RabbitAdmin 实战的全部内容,希望文章能够帮你解决RabbitAdmin 实战所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
发表评论 取消回复