我是靠谱客的博主 从容纸飞机,最近开发中收集的这篇文章主要介绍使用Spring Integration重试RabbitMQ,觉得挺不错的,现在分享给大家,希望可以做个参考。

概述

我最近阅读了有关使用RabbitMQ重试的方法
在这里,并想尝试类似的方法
Spring Integration ,提供了一组很棒的集成抽象。

TL; DR解决的问题是重试一次消息(在处理失败的情况下),两次重试之间有较大的延迟(例如10分钟以上)。 该方法利用RabbitMQ支持
死信交换 ,看起来像这样

canvas-2

流程的要点是:

1.工作调度员创建“工作单元”,并通过交换机将其发送到RabbitMQ队列。

2.工作队列设置为
死信交换 。 如果消息处理由于任何原因失败,则“工作单元”将以“工作单元死信队列”结束。

3.依次将工作单位死信队列与工作单位交换设置为死信交换,以此方式创建一个循环。 此外,将死信队列中的消息过期设置为10分钟,这样,一旦消息过期,它将再次返回到工作单元队列中。

4.要打破周期,一旦超过某个计数阈值,处理代码就必须停止处理。

使用Spring Integration实现

我已经使用Spring Integration和RabbitMQ讲述了一条快乐的小路
在之前 ,这里我将主要基于此代码构建。

设置的一个很好的部分是适当的死信交换/队列的配置,当使用Spring的Java配置表示时,看起来像这样:

@Configuration
public class RabbitConfig {

    @Autowired
    private ConnectionFactory rabbitConnectionFactory;

    @Bean
    Exchange worksExchange() {
        return ExchangeBuilder.topicExchange("work.exchange")
                .durable()
                .build();
    }


    @Bean
    public Queue worksQueue() {
        return QueueBuilder.durable("work.queue")
                .withArgument("x-dead-letter-exchange", worksDlExchange().getName())
                .build();
    }

    @Bean
    Binding worksBinding() {
        return BindingBuilder
                .bind(worksQueue())
                .to(worksExchange()).with("#").noargs();
    }
    
    // Dead letter exchange for holding rejected work units..
    @Bean
    Exchange worksDlExchange() {
        return ExchangeBuilder
                .topicExchange("work.exchange.dl")
                .durable()
                .build();
    }

    //Queue to hold Deadletter messages from worksQueue
    @Bean
    public Queue worksDLQueue() {
        return QueueBuilder
                .durable("works.queue.dl")
                .withArgument("x-message-ttl", 20000)
                .withArgument("x-dead-letter-exchange", worksExchange().getName())
                .build();
    }

    @Bean
    Binding worksDlBinding() {
        return BindingBuilder
                .bind(worksDLQueue())
                .to(worksDlExchange()).with("#")
                .noargs();
    }
    ...
}

请注意,这里我将“死信”队列的TTL设置为20秒,这意味着20秒后,一条失败的消息将返回到处理队列中。 一旦完成此设置并在RabbitMQ中创建了适当的结构,代码的消耗部分将如下所示,使用
Spring Integration Java DSL :

@Configuration
public class WorkInbound {

    @Autowired
    private RabbitConfig rabbitConfig;

    @Bean
    public IntegrationFlow inboundFlow() {
        return IntegrationFlows.from(
                Amqp.inboundAdapter(rabbitConfig.workListenerContainer()))
                .transform(Transformers.fromJson(WorkUnit.class))
                .log()
                .filter("(headers['x-death'] != null) ? headers['x-death'][0].count <= 3: true", f -> f.discardChannel("nullChannel"))
                .handle("workHandler", "process")
                .get();
    }

}

这里的大多数重试逻辑是由RabbitMQ基础结构处理的,这里唯一的变化是通过在特定的2次重试后显式丢弃消息来打破周期。 此中断表示为上面的过滤器,查看了RabbitMQ一旦发送到Dead Letter交换后将其添加到消息的称为“ x-death”的标头。 过滤器确实有些丑陋-可以用Java代码更好地表达它。

还有一点要注意的是,重试逻辑可以使用Spring Integration在过程中表示,但是我想研究一个重试时间可能很长(例如15到20分钟)的流程,该流程在过程中无法正常工作而且也不安全,因为我希望应用程序的任何实例都可以处理消息重试。

如果您想进一步探索,请尝试在
我的github仓库 – https://github.com/bijukunjummen/si-dsl-rabbit-sample

参考:

使用RabbitMQ重试:http://dev.venntro.com/2014/07/back-off-and-retry-with-rabbitmq

翻译自: https://www.javacodegeeks.com/2016/09/rabbitmq-retries-using-spring-integration.html

最后

以上就是从容纸飞机为你收集整理的使用Spring Integration重试RabbitMQ的全部内容,希望文章能够帮你解决使用Spring Integration重试RabbitMQ所遇到的程序开发问题。

如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(41)

评论列表共有 0 条评论

立即
投稿
返回
顶部