概述
通过Publisher Confirms and Returns机制,生产者可以判断消息是否发送到了exchange及queue,而通过消费者确认机制,Rabbitmq可以决定是否重发消息给消费者,以保证消息被处理。
1.什么是Publisher Confirms and Returns?
Delivery processing acknowledgements from consumers to RabbitMQ are known as acknowledgements in AMQP 0-9-1 parlance; broker acknowledgements to publishers are a protocol extension called publisher confirms.
地址:http://www.rabbitmq.com/confirms.html
根据RabbitMq官网定义,rabbitmq代理(broker)对发布者(publishers)的确认被称作发布者确认(publisher confirms),这种机制是Rabbitmq对标准Amqp协议的扩展。因此通过这种机制可以确认消息是否发送给了目标。
2.如何通过Spring amqp来使用Publisher Confirms and Returns机制?
Confirmed and returned messages are supported by setting the CachingConnectionFactory’s publisherConfirms and publisherReturns properties to ‘true’ respectively.When these options are set, Channel s created by the factory are wrapped in an PublisherCallbackChannel, which is used to facilitate the callbacks. When such a channel is obtained, the client can register a PublisherCallbackChannel.Listener with the Channel. The PublisherCallbackChannel implementation contains logic to route a confirm/return to the appropriate listener. These features are explained further in the following sections.
http://docs.spring.io/spring-amqp/docs/1.6.3.RELEASE/reference/html/_reference.html#cf-pub-conf-ret
通过Spring amqp文档可以看到,要使用这种机制需要将Template模版的设publisherConfirms 或publisherReturns 属性设置为true,此外ConnectionFactory要配置为CachingConnectionFactory。
<bean id="connectionFactory"
class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory">
<property name="host" value="192.168.2.133" />
<property name="port" value="5672" />
<property name="username" value="sun" />
<property name="password" value="123456" />
<property name="publisherConfirms" value="true" />
<property name="publisherReturns" value="true" />
</bean>
2.1 ConfirmCallback的使用及触发的一种场景
RabbitTemplate template = (RabbitTemplate) ctx.getBean("amqpTemplate");
int i = 0;
template.setMandatory(true);
if(!template.isConfirmListener()){
template.setConfirmCallback(new ConfirmCallback() {
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
System.out.println("ack: " + ack + ". correlationData: " + correlationData + "cause : " + cause);
}
});
}
isConfirmListener由RabbitTemplate 提供,用于判断是否创建了这个对象。
@Override
public boolean isConfirmListener() {
return this.confirmCallback != null;
}
而ConfirmListener是当消息无法发送到Exchange被触发,此时Ack为False,这时cause包含发送失败的原因,例如exchange不存在时,cause的内容如下。
ack: false. correlationData: nullcause : channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'myExchange' in vhost '/', class-id=60, method-id=40)
而消息可以发送到Exchange时Ack为true。在发送时可以通过使用RabbitTemplate 下面的方法来进行验证。
convertAndSend(exchange, routingKey, object, correlationData);
correlationData是回调时传入回调方法的参数,因此通过这个属性来区分消息,并进行重发。
2.2 ReturnCallback的使用及触发的一种场景
ReturnCallBack使用时需要通过RabbitTemplate 的setMandatory方法设置变量mandatoryExpression的值,该值可以是一个表达式或一个Boolean值。当为TRUE时,如果消息无法发送到指定的消息队列那么ReturnCallBack回调方法会被调用。
与isConfirmListener类似,也有一个isReturnListener方法,但这个方法在1.6.1版本中返回true。
@Override
public boolean isReturnListener() {
return true;
}
设置ReturnCallBack回调。
template.setReturnCallback(new ReturnCallback() {
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
System.out.println("text: " + replyText + " code: " + replyCode + " exchange: " + exchange + " routingKey :" + routingKey);
}
});
发送消息验证ReturnCallBack。
convertAndSend(String routingKey, final Object object, CorrelationData correlationData)
template.convertAndSend("not exist routing key abc", (Object)"bbb", new CorrelationData("123"));
在测试代码中指定了一个routingKey,但通过这个routingKey,Exchange无法将消息路由到任何队列,因此导致ReturnCallBack被触发,最后返回的信息如下。
text: NO_ROUTE code: 312 exchange: myExchange routingKey :not exist routing key abc
此外在vhost中若找不到Exchange时,confirmCallBack会被触发,而returnCallBack不会被触发,原因参见下面的回答。
http://stackoverflow.com/questions/29283845/spring-rabbit-returncallback-not-triggered
3.消费者对消息的确认
Rabbitmq 官网教程第2课讲述了如何通过Rabbitmq 提供的Api来实现消费者确认消息已经处理。
http://www.rabbitmq.com/tutorials/tutorial-two-java.html
而在Spring-rabbitmq中,默认启用的是自动确认机制,当消费端在回调方法中接收到消息后,并成功处理,且未抛出任何异常,那么消息会被确认。如果执行过程中产生异常,那么Rabbitmq会尝试重复发送消息给消费者。这点可以查看源码,并通过在消息处理方法中抛出异常来验证。当产生异常后,通过ip:15672登录Rabbitmq管理的Web页面,选择队列选项可以看到未处理的消息的数量(Unacked)。这个数量是与
<rabbit:listener-container/>
配置中的prefetchCount(预取数相关)。
因此消费者对消息的确认最基础的使用不需要额外进行配置。
若要在消费方配置相关参数,可以参考官方文档3.1.5小节。
http://docs.spring.io/spring-amqp/docs/1.6.3.RELEASE/reference/html/_reference.html
4.配置多个消费者并关联到同一个队列
如下,若配置多个消费者关联到一个队列,那么当发送多条消息时(消息数量大于消费者数量时),那么队列中的消息会轮流分发给各个消费者。
consumer,consumer2,consumer,consumer2,….的顺序。
<rabbit:listener-container
connection-factory="connectionFactory" prefetch="1" receive-timeout="4000">
<rabbit:listener ref="consumer" method="listen" queue-names="myQueue" />
<rabbit:listener ref="consumer2" method="listen" queue-names="myQueue" />
</rabbit:listener-container>
如果其中一个消费者接收消息后,并产生异常,那么该消息会发送给下一个消费者。例如consumer,在listen抛出异常,那么这条消息会发送给consumer2。
5.示例中的配置与代码
<bean id="connectionFactory"
class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory">
<property name="host" value="192.168.2.133" />
<property name="port" value="5672" />
<property name="username" value="sun" />
<property name="password" value="123456" />
<property name="publisherConfirms" value="true" />
<property name="publisherReturns" value="true" />
</bean>
<rabbit:admin connection-factory="connectionFactory" />
<rabbit:queue name="myQueue" id="myQueue" durable="true"
auto-delete="false" exclusive="false" />
<rabbit:queue name="myQueue" id="myQueue2" durable="true"
auto-delete="false" exclusive="false" />
<rabbit:direct-exchange name="myExchange">
<rabbit:bindings>
<rabbit:binding queue="myQueue" key="sun1" />
<rabbit:binding queue="myQueue" key="sun2" />
</rabbit:bindings>
</rabbit:direct-exchange>
<rabbit:template id="amqpTemplate" connection-factory="connectionFactory"
exchange="myExchange" routing-key="sunv5" />
<rabbit:listener-container
connection-factory="connectionFactory">
<rabbit:listener ref="consumer" method="listen" queue-names="myQueue" />
</rabbit:listener-container>
<bean id="consumer" class="springamqp.Foo" />
ClassPathXmlApplicationContext ctx =
new ClassPathXmlApplicationContext("classpath:rabbit-context.xml");
RabbitTemplate template = (RabbitTemplate) ctx.getBean("amqpTemplate");
template.setMandatory(true);
if(!template.isConfirmListener()){
template.setConfirmCallback(new ConfirmCallback() {
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
System.out.println("ack: " + ack + ". correlationData: " + correlationData + "cause : " + cause);
}
});
}
template.setReturnCallback(new ReturnCallback() {
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
System.out.println("text: " + replyText + " code: " + replyCode + " exchange: " + exchange + " routingKey :" + routingKey);
}
});
int i = 0;
while(i < 50) {
template.convertAndSend("not exist routing key abc", (Object)"bbb", new CorrelationData("123"));
Thread.sleep(7000);
}
public class Foo {
public void listen(String foo) throws InterruptedException {
System.out.println("get msg");
System.out.println(foo);
Thread.sleep(2000);
// 抛出异常,观察消息重发
throw new NullPointerException();
}
}
一些参考:
- immediate与mandatory标志位的区别,http://www.rabbitmq.com/amqp-0-9-1-reference.html#
- Introducing Publisher Confirms,http://www.rabbitmq.com/blog/2011/02/10/introducing-publisher-confirms/
- 通过dlx与ttl替代immediate http://www.tuicool.com/articles/ERJnyqr
最后
以上就是舒心太阳为你收集整理的Spring-amqp 1.6.1 生产者与消费者消息确认配置与使用1.什么是Publisher Confirms and Returns?2.如何通过Spring amqp来使用Publisher Confirms and Returns机制?3.消费者对消息的确认4.配置多个消费者并关联到同一个队列5.示例中的配置与代码一些参考:的全部内容,希望文章能够帮你解决Spring-amqp 1.6.1 生产者与消费者消息确认配置与使用1.什么是Publisher Confirms and Returns?2.如何通过Spring amqp来使用Publisher Confirms and Returns机制?3.消费者对消息的确认4.配置多个消费者并关联到同一个队列5.示例中的配置与代码一些参考:所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
发表评论 取消回复