概述
我最近阅读了有关使用RabbitMQ重试的方法
在这里,并想尝试类似的方法
Spring Integration ,提供了一组很棒的集成抽象。
TL; DR解决的问题是重试一次消息(在处理失败的情况下),两次重试之间有较大的延迟(例如10分钟以上)。 该方法利用RabbitMQ支持
死信交换 ,看起来像这样
流程的要点是:
1.工作调度员创建“工作单元”,并通过交换机将其发送到RabbitMQ队列。
2.工作队列设置为
死信交换 。 如果消息处理由于任何原因失败,则“工作单元”将以“工作单元死信队列”结束。
3.依次将工作单位死信队列与工作单位交换设置为死信交换,以此方式创建一个循环。 此外,将死信队列中的消息过期设置为10分钟,这样,一旦消息过期,它将再次返回到工作单元队列中。
4.要打破周期,一旦超过某个计数阈值,处理代码就必须停止处理。
使用Spring Integration实现
我已经使用Spring Integration和RabbitMQ讲述了一条快乐的小路
在之前 ,这里我将主要基于此代码构建。
设置的一个很好的部分是适当的死信交换/队列的配置,当使用Spring的Java配置表示时,看起来像这样:
@Configuration
public class RabbitConfig {
@Autowired
private ConnectionFactory rabbitConnectionFactory;
@Bean
Exchange worksExchange() {
return ExchangeBuilder.topicExchange("work.exchange")
.durable()
.build();
}
@Bean
public Queue worksQueue() {
return QueueBuilder.durable("work.queue")
.withArgument("x-dead-letter-exchange", worksDlExchange().getName())
.build();
}
@Bean
Binding worksBinding() {
return BindingBuilder
.bind(worksQueue())
.to(worksExchange()).with("#").noargs();
}
// Dead letter exchange for holding rejected work units..
@Bean
Exchange worksDlExchange() {
return ExchangeBuilder
.topicExchange("work.exchange.dl")
.durable()
.build();
}
//Queue to hold Deadletter messages from worksQueue
@Bean
public Queue worksDLQueue() {
return QueueBuilder
.durable("works.queue.dl")
.withArgument("x-message-ttl", 20000)
.withArgument("x-dead-letter-exchange", worksExchange().getName())
.build();
}
@Bean
Binding worksDlBinding() {
return BindingBuilder
.bind(worksDLQueue())
.to(worksDlExchange()).with("#")
.noargs();
}
...
}
请注意,这里我将“死信”队列的TTL设置为20秒,这意味着20秒后,一条失败的消息将返回到处理队列中。 一旦完成此设置并在RabbitMQ中创建了适当的结构,代码的消耗部分将如下所示,使用
Spring Integration Java DSL :
@Configuration
public class WorkInbound {
@Autowired
private RabbitConfig rabbitConfig;
@Bean
public IntegrationFlow inboundFlow() {
return IntegrationFlows.from(
Amqp.inboundAdapter(rabbitConfig.workListenerContainer()))
.transform(Transformers.fromJson(WorkUnit.class))
.log()
.filter("(headers['x-death'] != null) ? headers['x-death'][0].count <= 3: true", f -> f.discardChannel("nullChannel"))
.handle("workHandler", "process")
.get();
}
}
这里的大多数重试逻辑是由RabbitMQ基础结构处理的,这里唯一的变化是通过在特定的2次重试后显式丢弃消息来打破周期。 此中断表示为上面的过滤器,查看了RabbitMQ一旦发送到Dead Letter交换后将其添加到消息的称为“ x-death”的标头。 过滤器确实有些丑陋-可以用Java代码更好地表达它。
还有一点要注意的是,重试逻辑可以使用Spring Integration在过程中表示,但是我想研究一个重试时间可能很长(例如15到20分钟)的流程,该流程在过程中无法正常工作而且也不安全,因为我希望应用程序的任何实例都可以处理消息重试。
如果您想进一步探索,请尝试在
我的github仓库 – https://github.com/bijukunjummen/si-dsl-rabbit-sample
参考:
使用RabbitMQ重试:http://dev.venntro.com/2014/07/back-off-and-retry-with-rabbitmq
翻译自: https://www.javacodegeeks.com/2016/09/rabbitmq-retries-using-spring-integration.html
最后
以上就是从容纸飞机为你收集整理的使用Spring Integration重试RabbitMQ的全部内容,希望文章能够帮你解决使用Spring Integration重试RabbitMQ所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
发表评论 取消回复