我是靠谱客的博主 拉长香水,这篇文章主要介绍rabbitmq 延时队列,实现类似微信支付宝 阶梯性异步回调通知,现在分享给大家,希望可以做个参考。

一般支付完成后,微信会把相关支付结果和用户信息发送给商户,商户需要接收处理,并返回应答。

对后台通知交互时,如果微信收到商户的应答不是成功或超时,微信认为通知失败,微信会通过一定的策略定期重新发起通知,尽可能提高通知的成功率,但微信不保证通知最终能成功。 (通知频率为15/15/30/180/1800/1800/1800/1800/3600,单位:秒)
废话不多说 (先理下思路)  下面以生产消费者 模式  进行梳理

1.TTL:存活时间
2.死信:过了存活期的消息
3.Dead Letter Exchange:"死信" 交换器  其实就是普通的交换器而已.Exchange有4个类型:direct,topic,fanout,header
4.routingkey:消费者队列绑定到交换机时要指定路由key (详细请看:
rabbitmq-----Routing和topic模式)

1.首先  在Connection中创建一个Channel,通过Channel声明两个交换器,一个是 用来接收通知数据的交换器ExchangeA,一个是延时通知数据的交换器ExchangeB。

2.声明两个队列,一个"死信"队列queueA,一个延时通知数据队列queueB。

3.然后,交换器ExchangeA和 "死信"队列queueA  绑定,把延时通知数据的交换器ExchangeB和queueB进行绑定。交换器再和队列进行绑定时会设置routingkey

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit" xsi:schemaLocation=" http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit.xsd"> <!--配置connection-factory,指定连接rabbit server参数 --> <rabbit:connection-factory id="connectionFactory" virtual-host="/" username="${USERNAME}" password="${PASSWORD}" host="${HOST}" port="${PORT}"/> <!--定义rabbit模版,指定连接工厂以及定义exchange--> <rabbit:template id="amqpTemplate" connection-factory="connectionFactory" /> <!--MQ的管理,包括队列,交换器,声明等--> <rabbit:admin connection-factory="connectionFactory" /> <!--定义一个交换机exchangeA,路由类型为direct,所有的订单会塞给此交换机 和 然后和queueA 绑定 key testQueue--> <rabbit:direct-exchange name="exchangeA" delayed="false" id="exchangeA" durable="false" auto-delete="false"> <rabbit:bindings> <rabbit:binding queue="queueA" key="testQueue"/> </rabbit:bindings> </rabbit:direct-exchange> <!--定义一个延时交换机,路由类型为direct,延时的数据会塞给此交换机 并和exchangeB绑定--> <rabbit:direct-exchange name="exchangeB" id="exchangeB" delayed="false" durable="false" auto-delete="false"> <rabbit:bindings> <rabbit:binding queue="queueB" key="testQueue"/> </rabbit:bindings> </rabbit:direct-exchange> <!--定义队列,queueA--> <rabbit:queue name="queueA" durable="false"/> <!--定义延时队列,queueB--> <rabbit:queue name="queueB" durable="false"> <rabbit:queue-arguments> <!-- 队列绑定参数 告诉rabbit 将超时的数据推送到 死信交换机--> <entry key="x-dead-letter-exchange" value="exchangeA" /> <entry key="x-dead-letter-routing-key" value="testQueue" /> </rabbit:queue-arguments> </rabbit:queue> <!--队列监听--> <rabbit:listener-container connection-factory="connectionFactory"> <rabbit:listener ref="myCustomer" method="listen" queue-names="queueA" /> <rabbit:listener ref="myCustomer" method="listen" queue-names="queueA" /> <rabbit:listener ref="myCustomer" method="listen" queue-names="queueA" /> </rabbit:listener-container> <!--定义消费者--> <bean id="myCustomer" class="xxx.RabbitCustomer" />

4.创建生产者类(下面贴出部分代码 具体按自己需求实现) RabbitService

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public static void publish(String queue, String message, long delay) { try { Channel channel = connection.createChannel();//创建一个channel,不管是生产数据,还是消费数据,都是通过channel去操作的 AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder(); builder.contentType("text/plain"); if(delay ==0){ AMQP.BasicProperties properties = builder.build(); //如果延时为0 走死信交换器 channel.basicPublish("exchangeA", "testQueue", properties, message.getBytes() ); }else{ //否则推送到延时交换器 builder.deliveryMode(2);//设置消息持久化 builder.expiration(String.valueOf(delay*1000)); //设置延时时间 rabbitmq 是毫秒 AMQP.BasicProperties properties = builder.build(); channel.basicPublish("exchangeB","testQueue", properties, message.getBytes()); } channel.close(); } catch (Exception e) { logger.error("rabbit-publish-error:{}", e.getMessage()); } }

5.创建消费者类  RabbitCustomer(下面部分代码)

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
//message 为json数据 public void listen(String message) { try { JSONObject jsonData; Integer pushNum; try { jsonData= loadMessage(message); pushNum= paras.getInteger("pushNum"); } catch (Exception e) { logger.error("异步通知错误:", e.getMessage()); return; } String result = null; try { result = doHttpNotify(jsonData, connectionsPoolExecutor); } catch (Exception e) { logger.error("异步通知错误:"+e.getMessage()); } logger.info("异步通知返回:" + result); if (StringUtils.isNotBlank(result) && result.length() >= 60) { result = result.substring(0, 60); } if (StringUtils.isNotBlank(result) && "success".equals(result.trim())) { //返回成功的逻辑 } else { //处理失败的逻辑 然后准备 重新发起通知 paras.put("pushNum", pushNum+ 1); if (retry == 0) { RabbitService.publish(jsonData.toJSONString(), 5); //5s } else if (retry == 1) { RabbitService.publish(jsonData.toJSONString(), 30); //30s } else if (retry == 2) { RabbitService.publish( jsonData.toJSONString(), 120); //120s } else { logger.error("异步通知失败"); } } } catch (Exception e) { logger.error("异步通知失败" + e.getMessage()); } finally { }

 

最后

以上就是拉长香水最近收集整理的关于rabbitmq 延时队列,实现类似微信支付宝 阶梯性异步回调通知的全部内容,更多相关rabbitmq内容请搜索靠谱客的其他文章。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(57)

评论列表共有 0 条评论

立即
投稿
返回
顶部