概述
Spring AMQP异常处理和事物
异常处理
RabbitMQ Java客户端的许多操作都会抛出checked Exception。例如,在大多数情况下,IOException可能抛出。RabbitTemplate,SimpleMessageListenerContainer和其它的一些Spring AMQP组件将捕获这些异常,将它转为运行时异常体系中的异常。这些异常都定义在org.springframework.amqp包中,AmqpException值基础体系中的根部。
当监听器抛出一个异常的时候,它将这个异常包装为ListenerExecutionFailedException,通常这个消息会被拒绝,并且重新放在消息代理中。如果设置defaultRequeueRejected属性为false,将把这个消息直接丢弃(或者过滤到dead letter exchange)。
然而,存在一类错误,listener不能控制行为。当一种消息不能够转换的时候(例如一种不合法的content_encoding),在消息到达用户代码之前MessageConversionException会被抛出,如果你将
defaultRequeueRejected设置为true,那么这个消息将会用不停息的发送下去。在1.3.2版本之前,用户需要写一个自定义ErrorHandler来处理。1.3.2版本之后可以通过Exception Handling来避免这种情况。
自从1.3.2版本之后,默认的ErrorHandler是ConditionalRejectingErrorHandler,
如果抛出了
MessageConversionException异常,
它将被拒绝(而不是重新塞入)。这个错误处理器可以通过FatalExceptionStrategy进行配置,所以用户可以通过提供自己的规则来进行条件拒绝,例如通过实现Spring Retry中BinaryException。另外,
ListenerExecutionFailedException有failedMessage这个属性可以用于这个决定。如果FatalExceptionStrategy.isFatal()方法返回true,那么错误处理器将抛出AmqpRejectAndDontRequeueException。默认情况下,
FatalExceptionStrategy记录警告信息。
事物
Spring Rabbit Framework支持原子化事物管理,在同步和异步的情况下,存在很多不同的语义,可以进行不同的声明式选择,就和现有的Spring事物那样。这就使得很多消息模式很容易实现。
有两种方式将事物引入框架。在RabbitTemplate和SimpleMessageListenerContainer中都可以设置channelTransacted为true,来告诉框架使用事务channel,在所有的操作之后进行提交,在出现异常时,进行回滚。另外一种方式是提供一个外部的事务,Spring 的PlatformTransactionManager实现为正在进行的操作提供了事务环境。如果框架正在发送或者接收消息,恰巧这是已经有一个事务在运行,而且
channelTransacted设置为true,那么提交或者回滚消息事务将被延迟,直到当前的事务结束。如果设置
channelTransacted为false,那么就没有事务环境。
channelTransacted标志是在配置时设定的,它是在AMQP组件创建,通常是在应用启动的时候声明和处理的。外部事务在理论上更为动态,系统在运行的时候响应当前的线程状态,但是在实践中,在配置时候设定。
对于RabbitTemplate中的同步操作,外部事务由调用者提供或者显示或者隐式,下面是个现实方法,template的channelTransacted属性被设置为true。
@Transactional
public void doSomething() {
String incoming = rabbitTemplate.receiveAndConvert();
// do some more database processing...
String outgoing = processInDatabaseAndExtractReply(incoming);
rabbitTemplate.convertAndSend(outgoing);
}
一个字符串在一个事务方法中被接收,转换作为消息体发送,如果数据库操作失败,并且抛出异常,那么进入的消息将返还给消息代理,向外发送的消息将不会发送,这对于RabbitTemplate中的任何一连串事务方法都适用。
在异步的情况下,使用SimpleMessageListenerContainer,如果需要异步事务,在监听器创建的时候需要通过容器来请求。为了表示需要的外部事务,用户需要在容器配置的时候指定PlatformTransactionManager的实现。例如:
@Configuration
public class ExampleExternalTransactionAmqpConfiguration {
@Bean
publicSimpleMessageListenerContainer messageListenerContainer() {
SimpleMessageListenerContainer container = newSimpleMessageListenerContainer();
container.setConnectionFactory(rabbitConnectionFactory());
container.setTransactionManager(transactionManager());
container.setChannelTransacted(true);
container.setQueueName("some.queue");
container.setMessageListener(exampleListener());
returncontainer;
}
}
在上面的例子中,事务管理器作为注入的bean被添加进来,同时
channelTransacted属性被设置为true。起到的作用是,如果监听器处理失败,并且抛出异常,那么事务将进行回滚,那么消息将返回给消息代理。这有时被称作一阶段提交,是可信度较高的消息的一种有力的模式。如果
channelTransacted属性被设置为false,默认情况是这样,外部事务提供给监听器,那么所有的消息操作将会自动应答,效果是提交所有的消息操作,即使业务操作进行了回滚。
了解接收消息回滚
AMQP事务仅仅应用于
消息或者应答,所以当已经接收到消息
时候
,有Spring事务回滚的,Spring AMQP要做的不仅仅是回滚事务,而且需要手动的拒绝消息。消息拒绝措施和事务是独立的,并且依赖于
defaultRequeueRejected属性的设置。
使用RabbitTransactionManager
RabbitTransactionManager使用外部事务执行Rabbit操作的另外一种方式。这个事务管理器是PlatformTransactionManager接口的实现,它只能在一个Rabbit ConnectionFactory中使用。
注意:
这种策略不能够提供XA事务,例如在消息和数据库之间共享事务。
应用代码可以通过
ConnectionFactoryUtils.getTransactionalResourceHolder(ConnectionFactory,
boolean)获取事务资源,而不是先创建Connection.createChannel(),然后再使用Channel创建。当使用Spring RabbitTemplate的时候,它将自动探测线程内的Channel,并自动装载。
使用Java进行配置:
@Bean
public RabbitTransactionManager rabbitTransactionManager() {
return newRabbitTransactionManager(connectionFactory);
}
使用XML进行配置:
<bean id="rabbitTxManager"
class="org.springframework.amqp.rabbit.transaction.RabbitTransactionManager">
<property name="connectionFactory" ref="connectionFactory"/>
</bean>
最后
以上就是文静发夹为你收集整理的Spring AMQP异常处理和事物Spring AMQP异常处理和事物的全部内容,希望文章能够帮你解决Spring AMQP异常处理和事物Spring AMQP异常处理和事物所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
发表评论 取消回复