Listener监听
Listener的yml配置参数形式如下:
复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19listener: simple: prefetch: 1 # 一次拉取的数量 concurrency: 5 # 消费端的监听个数(即@RabbitListener开启几个线程去处理数据。) max-concurrency: 10 # 消费端的监听最大个数 acknowledge-mode: manual retry: multiplier: 1 max-attempts: 3 enabled: true direct: retry: enabled: true max-attempts: 3 acknowledge-mode: manual auto-startup: true type: simple
在消费端,配置prefetch和concurrency参数便可以实现消费端MQ并发处理消息,下面详细叙述下listener下的几个参数的意思
复制代码
1
2
3listener.type 表示监听的类型 主要有两种 simple和direct 对于的监听容器是 SimpleMessageListenerContainer和DirectMessageListenerContainer
复制代码
1
2
3
4
5listener.simple.prefetch: 每个customer会在MQ预取一些消息放入内存的LinkedBlockingQueue中进行消费,这个值越高,消息传递的越快, 但非顺序处理消息的风险更高。如果ack模式为none,则忽略。如有必要,将增加此值以匹配txSize或messagePerAck。 不过在有些情况下,尤其是处理速度比较慢的大消息,消息可能在内存中大量堆积,消耗大量内存;以及对于一些严格要求顺序的消息,prefetch的值应当设置为1。
复制代码
1
2
3
4
5
6
7
8acknowledge-mode:表示消费端收到消息后的确认方式。 有三种确认方式: 自动确认:acknowledge="none" 手动确认:acknowledge="manual" 根据异常情况确认:acknowledge="auto" 其中自动确认是指,当消息一旦被Consumer接收到,则自动确认收到,并将相应 message 从 RabbitMQ 的消息缓存中移除。但是在实际业务处理中,很可能消息接收到,业务处理出现异常,那么该消息就会丢失。 如果设置了手动确认方式,则需要在业务处理成功后,调用channel.basicAck(),手动签收,如果出现异常,则调用channel.basicNack()方法,让其自动重新发送消息。
复制代码
1
2
3
4
5concurrency: concurrency =1,即每个Listener容器将开启一个线程去处理消息。 可以在 @RabbitListener(concurrency = "3")直接配置当前监听器启动线程,如果在Listener配置了exclusive参数,即确定此容器中的单个customer是否具有对队列的独占访问权限。如果为true,则容器的并发性必须为1。 如果配置了exclusive=true,但是concurrency>1则会抛错
复制代码
1
2
3
4prefetch和concurrency 若一个消费者配置prefetch=10,concurrency=2,即会开启2个线程去消费消息,每个线程都会抓取10个线程到内存中(注意不是两个线程去共享内存中抓取的消息) max-concurrency:: 表示最大能启动的线程数
复制代码
1
2
3
4
5
6
7
8
9retry: 表示消息被消费者拒收后重试发送或者因为异常原因消息重试发送 multiplier:重试间隔乘法策略 max-attempts: 是最大重试次数,默认是三 enabled: 是否开启重试 initial-interval: 初始化重试次数间隔 max-interval 最大重试间隔 stateless:重试是无状态的还是有状态的。
复制代码
1
2
3autoStartup: 当autoStartup为false的时候,监听容器就不会自动启动,然后我们可以通过使用单个容器的ID,调用RabbitListenerEndpointRegistry类的getListenerContainer(String id)方法来获得对单个容器的引用,并执行strat方法,启动容器。
举一个例子:
复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17@RabbitListener(bindings = { @QueueBinding( value = @Queue(value = "${rabbitmq.queue.routing.beijing}", durable = "true", autoDelete = "false"), exchange = @Exchange( value = "${rabbitmq.exchange.routing}", durable = "true", type = ExchangeTypes.TOPIC), key = "china.#")}, concurrency = "3", exclusive = false,id = "autoStart",autoStartup = "false") public void receive(@Payload String msg, Message message,Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) throws IOException { System.out.println(String.format("mesage:%s", message.getBody())); System.out.println("路由监听接受到发送者发送的信息:" + msg); // 确认消息 // channel.basicAck(deliveryTag, false); }
手动开启/关闭容器
复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21public void stopContainer(String containerId){ //得到容器的对象 MessageListenerContainer container = registry.getListenerContainer(containerId); //判断容器状态 if(container.isRunning()){ //开启容器 container.stop(); System.out.println("关闭容器"); } } public void startContainer(String containerId){ //得到容器的对象 MessageListenerContainer container = registry.getListenerContainer(containerId); //判断容器状态 if(!container.isRunning()){ //开启容器 container.start(); System.out.println("开启容器"); } }
Rabbitmq listener监听Message消息,其中Message主要包含两部分
复制代码
1
2
3
4
5
6
7
8
9public class Message implements Serializable { private final MessageProperties messageProperties; private final byte[] body; public Message(byte[] body, MessageProperties messageProperties) { //NOSONAR this.body = body; //NOSONAR this.messageProperties = messageProperties; } }
复制代码
1
2
3MessageProperties // 消息属性 byte[] body // 消息内容
当监听者监听到队列中有消息时则会进行接收并处理,MessageConvert 会直接转换成消息类型,并绑定在对应被注解的方法中。默认实现类为SimpleMessageConverter
复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77public class SimpleMessageConverter extends WhiteListDeserializingMessageConverter implements BeanClassLoaderAware { @Override public Object fromMessage(Message message) throws MessageConversionException { Object content = null; MessageProperties properties = message.getMessageProperties(); if (properties != null) { String contentType = properties.getContentType(); if (contentType != null && contentType.startsWith("text")) { String encoding = properties.getContentEncoding(); if (encoding == null) { encoding = this.defaultCharset; } try { content = new String(message.getBody(), encoding); } catch (UnsupportedEncodingException e) { throw new MessageConversionException( "failed to convert text-based Message content", e); } } else if (contentType != null && contentType.equals(MessageProperties.CONTENT_TYPE_SERIALIZED_OBJECT)) { try { content = SerializationUtils.deserialize( createObjectInputStream(new ByteArrayInputStream(message.getBody()), this.codebaseUrl)); } catch (IOException | IllegalArgumentException | IllegalStateException e) { throw new MessageConversionException( "failed to convert serialized Message content", e); } } } if (content == null) { content = message.getBody(); } return content; } /** * Creates an AMQP Message from the provided Object. */ @Override protected Message createMessage(Object object, MessageProperties messageProperties) throws MessageConversionException { byte[] bytes = null; if (object instanceof byte[]) { bytes = (byte[]) object; messageProperties.setContentType(MessageProperties.CONTENT_TYPE_BYTES); } else if (object instanceof String) { try { bytes = ((String) object).getBytes(this.defaultCharset); } catch (UnsupportedEncodingException e) { throw new MessageConversionException( "failed to convert to Message content", e); } messageProperties.setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN); messageProperties.setContentEncoding(this.defaultCharset); } else if (object instanceof Serializable) { try { bytes = SerializationUtils.serialize(object); } catch (IllegalArgumentException e) { throw new MessageConversionException( "failed to convert to serialized Message content", e); } messageProperties.setContentType(MessageProperties.CONTENT_TYPE_SERIALIZED_OBJECT); } if (bytes != null) { messageProperties.setContentLength(bytes.length); return new Message(bytes, messageProperties); } throw new IllegalArgumentException(getClass().getSimpleName() + " only supports String, byte[] and Serializable payloads, received: " + object.getClass().getName()); } }
最后
以上就是平常口红最近收集整理的关于Rabbitmq - rabbitmq Listener监听的全部内容,更多相关Rabbitmq内容请搜索靠谱客的其他文章。
本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
发表评论 取消回复