我是靠谱客的博主 拉长香水,最近开发中收集的这篇文章主要介绍rabbitmq 延时队列,实现类似微信支付宝 阶梯性异步回调通知,觉得挺不错的,现在分享给大家,希望可以做个参考。

概述

一般支付完成后,微信会把相关支付结果和用户信息发送给商户,商户需要接收处理,并返回应答。

对后台通知交互时,如果微信收到商户的应答不是成功或超时,微信认为通知失败,微信会通过一定的策略定期重新发起通知,尽可能提高通知的成功率,但微信不保证通知最终能成功。 (通知频率为15/15/30/180/1800/1800/1800/1800/3600,单位:秒)
废话不多说 (先理下思路)  下面以生产消费者 模式  进行梳理

1.TTL:存活时间
2.死信:过了存活期的消息
3.Dead Letter Exchange:"死信" 交换器  其实就是普通的交换器而已.Exchange有4个类型:direct,topic,fanout,header
4.routingkey:消费者队列绑定到交换机时要指定路由key (详细请看:
rabbitmq-----Routing和topic模式)

1.首先  在Connection中创建一个Channel,通过Channel声明两个交换器,一个是 用来接收通知数据的交换器ExchangeA,一个是延时通知数据的交换器ExchangeB。

2.声明两个队列,一个"死信"队列queueA,一个延时通知数据队列queueB。

3.然后,交换器ExchangeA和 "死信"队列queueA  绑定,把延时通知数据的交换器ExchangeB和queueB进行绑定。交换器再和队列进行绑定时会设置routingkey

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/rabbit
http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">
<!--配置connection-factory,指定连接rabbit server参数 -->
<rabbit:connection-factory id="connectionFactory" virtual-host="/" username="${USERNAME}" password="${PASSWORD}" host="${HOST}" port="${PORT}"/>
<!--定义rabbit模版,指定连接工厂以及定义exchange-->
<rabbit:template id="amqpTemplate" connection-factory="connectionFactory" />
<!--MQ的管理,包括队列,交换器,声明等-->
<rabbit:admin connection-factory="connectionFactory" />
<!--定义一个交换机exchangeA,路由类型为direct,所有的订单会塞给此交换机 和 然后和queueA 绑定 key testQueue-->
<rabbit:direct-exchange name="exchangeA" delayed="false"
id="exchangeA" durable="false" auto-delete="false">
<rabbit:bindings>
<rabbit:binding queue="queueA" key="testQueue"/>
</rabbit:bindings>
</rabbit:direct-exchange>
<!--定义一个延时交换机,路由类型为direct,延时的数据会塞给此交换机 并和exchangeB绑定-->
<rabbit:direct-exchange name="exchangeB"
id="exchangeB" delayed="false" durable="false" auto-delete="false">
<rabbit:bindings>
<rabbit:binding queue="queueB" key="testQueue"/>
</rabbit:bindings>
</rabbit:direct-exchange>
<!--定义队列,queueA-->
<rabbit:queue name="queueA" durable="false"/>
<!--定义延时队列,queueB-->
<rabbit:queue name="queueB" durable="false">
<rabbit:queue-arguments>
<!--
队列绑定参数 告诉rabbit 将超时的数据推送到 死信交换机-->
<entry key="x-dead-letter-exchange" value="exchangeA" />
<entry key="x-dead-letter-routing-key" value="testQueue" />
</rabbit:queue-arguments>
</rabbit:queue>
<!--队列监听-->
<rabbit:listener-container connection-factory="connectionFactory">
<rabbit:listener ref="myCustomer" method="listen" queue-names="queueA" />
<rabbit:listener ref="myCustomer" method="listen" queue-names="queueA" />
<rabbit:listener ref="myCustomer" method="listen" queue-names="queueA" />
</rabbit:listener-container>
<!--定义消费者-->
<bean id="myCustomer" class="xxx.RabbitCustomer" />

4.创建生产者类(下面贴出部分代码 具体按自己需求实现) RabbitService

public static void publish(String queue, String message, long delay) {
try {
Channel channel = connection.createChannel();//创建一个channel,不管是生产数据,还是消费数据,都是通过channel去操作的
AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder();
builder.contentType("text/plain");
if(delay ==0){
AMQP.BasicProperties properties = builder.build();
//如果延时为0 走死信交换器
channel.basicPublish("exchangeA",
"testQueue",
properties,
message.getBytes()
);
}else{
//否则推送到延时交换器
builder.deliveryMode(2);//设置消息持久化
builder.expiration(String.valueOf(delay*1000)); //设置延时时间
rabbitmq
是毫秒
AMQP.BasicProperties properties = builder.build();
channel.basicPublish("exchangeB","testQueue", properties, message.getBytes());
}
channel.close();
} catch (Exception e) {
logger.error("rabbit-publish-error:{}", e.getMessage());
}
}

5.创建消费者类  RabbitCustomer(下面部分代码)

//message 为json数据
public void listen(String message) {
try {
JSONObject jsonData;
Integer pushNum;
try {
jsonData= loadMessage(message);
pushNum= paras.getInteger("pushNum");
} catch (Exception e) {
logger.error("异步通知错误:", e.getMessage());
return;
}
String result = null;
try {
result = doHttpNotify(jsonData, connectionsPoolExecutor);
} catch (Exception e) {
logger.error("异步通知错误:"+e.getMessage());
}
logger.info("异步通知返回:" + result);
if (StringUtils.isNotBlank(result) && result.length() >= 60) {
result = result.substring(0, 60);
}
if (StringUtils.isNotBlank(result) && "success".equals(result.trim())) {
//返回成功的逻辑
} else {
//处理失败的逻辑 然后准备 重新发起通知
paras.put("pushNum", pushNum+ 1);
if (retry == 0) {
RabbitService.publish(jsonData.toJSONString(), 5);
//5s
} else if (retry == 1) {
RabbitService.publish(jsonData.toJSONString(), 30);
//30s
} else if (retry == 2) {
RabbitService.publish( jsonData.toJSONString(), 120); //120s
} else {
logger.error("异步通知失败");
}
}
} catch (Exception e) {
logger.error("异步通知失败" + e.getMessage());
} finally {
}

 

最后

以上就是拉长香水为你收集整理的rabbitmq 延时队列,实现类似微信支付宝 阶梯性异步回调通知的全部内容,希望文章能够帮你解决rabbitmq 延时队列,实现类似微信支付宝 阶梯性异步回调通知所遇到的程序开发问题。

如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(49)

评论列表共有 0 条评论

立即
投稿
返回
顶部