概述
RocketMQ使用教程相关系列 目录
技术活,该赏
关注+一键三连(点赞,评论,收藏)再看,养成好习惯
目录
第一节:介绍
RocketMQ事务消息介绍
事务消息流程介绍
使用限制
第二节:使用场景
第三节:代码实战
事务消息生产者
事务监听器
事务消息消费者
效果:
第四节:checkLocalTransaction不会触发
调整后的生产者
效果:
第二个问题,回查的次数和定时时间是多少?
科学验证:
第一节:介绍
RocketMQ事务消息介绍
在4.3.0版本后,有了事务消息这一个特性,对于分布式事务来说,最常见的还是二阶段提交协议。
事务消息流程介绍
中文图更友好
上图说明了事务消息的大致方案,其中分为两个流程:
- 正常事务消息的发送及提交(黑线走的流程)
- 事务消息的补偿流程(黑线+红线走的流程)
1)正常事务消息的发送及提交
(1) 发送消息(half消息)。
(2) 服务端响应消息写入结果。
(3) 根据发送结果执行本地事务(如果写入失败,此时half消息对业务不可见,本地逻辑不执行)。
(4) 根据本地事务状态执行Commit或者Rollback(Commit操作生成消息索引,消息对消费者可见)
2)事务消息的补偿流程
补偿阶段用于解决消息Commit或者Rollback发生超时或者失败的情况,具体流程如下:
(1) 发送消息(half消息)。
(2) 服务端响应消息写入结果。
(3) 根据发送结果执行本地事务(如果写入失败,此时half消息对业务不可见,本地逻辑不执行)。
(4)此时执行本地事务后,并没有执行Commit或Rollback操作
(5) 对没有Commit/Rollback的事务消息(pending状态的消息),从服务端发起一次“回查”(最多重试15次(由配置参数决定),超过了默认丢弃此消息)
(6) Producer收到回查消息,检查回查消息对应的本地事务的状态
(7) 根据本地事务状态,重新Commit或者Rollback
其中,补偿阶段用于解决消息Commit或者Rollback发生超时或者失败的情况。
3)事务消息状态
事务消息共有三种状态,提交状态、回滚状态、中间状态:
- TransactionStatus.CommitTransaction: 提交事务,它允许消费者消费此消息。
- TransactionStatus.RollbackTransaction: 回滚事务,它代表该消息将被删除,不允许被消费。
- TransactionStatus.Unknown: 中间状态,它代表需要检查消息队列来确定状态。
使用限制
1. 事务消息不支持延时消息和批量消息。
2. 为了避免单个消息被检查太多次而导致半队列消息累积,我们默认将单个消息的检查次数限制为 15 次,但是用户可以通过 Broker 配置文件的 `transactionCheckMax`参数来修改此限制。如果已经检查某条消息超过 N 次的话( N = `transactionCheckMax` ) 则 Broker 将丢弃此消息,并在默认情况下同时打印错误日志。用户可以通过重写 `AbstractTransactionCheckListener` 类来修改这个行为。
3. 事务消息将在 Broker 配置文件中的参数 transactionMsgTimeout 这样的特定时间长度之后被检查。当发送事务消息时,用户还可以通过设置用户属性 CHECK_IMMUNITY_TIME_IN_SECONDS 来改变这个限制,该参数优先于 `transactionMsgTimeout` 参数。
4. 事务性消息可能不止一次被检查或消费。
5. 提交给用户的目标主题消息可能会失败,目前这依日志的记录而定。它的高可用性通过 RocketMQ 本身的高可用性机制来保证,如果希望确保事务消息不丢失、并且事务完整性得到保证,建议使用同步的双重写入机制。
6. 事务消息的生产者 ID 不能与其他类型消息的生产者 ID 共享。与其他类型的消息不同,事务消息允许反向查询、MQ服务器能通过它们的生产者 ID 查询到消费者。
第二节:使用场景
同步消息解决的问题是:消息一定投递成功(Broker 响应Send_OK状态码时才代表消息发送成功)
事务消息解决的问题是:本地事务+消息投递一起做
例子:李四要给张三转钱1万元。
同步消息
1、 银行发一个同步消息给MQ,给张三加钱1万元
2、MQ ack反馈发送成功了
3、银行给李四扣1万元
可能的问题,ack Send_OK之后,系统抛出异常,没有给李四扣钱,但是消息已经发送出去了,张三加钱成功了。
事务消息
1、银行发一个事务消息给MQ,给张三加钱1万元
2、Broker precommit成功,回调excuteCommit,真正执行李四扣款1万元
3、扣款成功ACK Commit给MQ
4、MQ收到Commit ACK时,commit消息,系统可以消费这个消息
如果系统扣款异常,则消息虽然prepareCommit在MQ中,但是对系统不可见。另外如果ACK网络丢失或者延时,MQ如果超时未接收到ACK,会发起重试确认到系统。
第三节:代码实战
事务消息生产者
/**
* 发送事务消息
*/
public class Producer {
public static void main(String[] args) throws Exception {
// 1.创建消息生产者producer,并制定生产者组名
TransactionMQProducer producer = new TransactionMQProducer("demo_transaction_group");
// 2.指定Nameserver地址
producer.setNamesrvAddr("192.168.88.131:9876");
// 添加事务监听器
TransactionListenerImpl transactionListener = new TransactionListenerImpl();
producer.setTransactionListener(transactionListener);
ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS,
new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r);
thread.setName("client-transaction-msg-check-thread");
return thread;
}
});
// 设置线程池
producer.setExecutorService(executorService);
// 3.启动producer
producer.start();
System.out.println("生产者启动");
String[] tags = { "TAGA", "TAGB", "TAGC" };
for (int i = 0; i < 3; i++) {
// 4.创建消息对象,指定主题Topic、Tag和消息体
/**
* 参数一:消息主题Topic
* 参数二:消息Tag
* 参数三:消息内容
*/
Message msg = new Message("TransactionTopic", tags[i], ("Hello xuzhu" + i).getBytes());
// 5.发送消息
SendResult result = producer.sendMessageInTransaction(msg, "hello-xuzhu_transaction");
// 发送状态
SendStatus status = result.getSendStatus();
System.out.println("发送结果:" + result);
System.out.println("发送结果状态:" + status);
// 线程睡2秒
TimeUnit.SECONDS.sleep(2);
}
// 6.关闭生产者producer
producer.shutdown();
System.out.println("生产者结束");
}
}
事务监听器
public class TransactionListenerImpl implements TransactionListener {
/**
* 完成本地事务逻辑
*
* @param message
* @param o
*@return org.apache.rocketmq.client.producer.LocalTransactionState
**/
@Override
public LocalTransactionState executeLocalTransaction(Message message, Object o) {
System.out.println("正在执行本地事务----");
if (StringUtils.equals("TAGA", message.getTags())) {
return LocalTransactionState.COMMIT_MESSAGE;
} else if (StringUtils.equals("TAGB", message.getTags())) {
return LocalTransactionState.ROLLBACK_MESSAGE;
} else if (StringUtils.equals("TAGC", message.getTags())) {
return LocalTransactionState.UNKNOW;
}
return LocalTransactionState.UNKNOW;
}
/**
* 消息回查
*
* @param messageExt
*@return org.apache.rocketmq.client.producer.LocalTransactionState
**/
@Override
public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {
System.out.println("消息的Tag:" + messageExt.getTags());
return LocalTransactionState.COMMIT_MESSAGE;
}
}
事务消息消费者
public class Consumer {
public static void main(String[] args) throws Exception {
// 1.创建消费者Consumer,制定消费者组名
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("demo_transaction_group");
// 2.指定Nameserver地址
consumer.setNamesrvAddr("192.168.88.131:9876");
// 3.订阅主题Topic和Tag
consumer.subscribe("TransactionTopic", "*");
// 4.设置回调函数,处理消息
consumer.registerMessageListener(new MessageListenerConcurrently() {
// 接受消息内容
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
for (MessageExt msg : msgs) {
System.out.println(
"consumeThread=" + Thread.currentThread().getName() + "," + new String(msg.getBody()));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
// 5.启动消费者consumer
consumer.start();
System.out.println("消费者启动");
}
}
效果:
从断点跟进,启动生产者时,会进入本地事务方法里
从本地事务方法里可知道,只有第一条数据有成功,一会看下,能消费几条数据
消费者,只会消费第一条,成功
第四节:checkLocalTransaction不会触发
细心的同学,可能会发现,事务监听器里有这么一个方法checkLocalTransaction,如果你断点,会发现怎么都执行不到这个方法。
原因是:事务一直没有rollback或者commit的时候才会触发回查
基于这个理论,我们调整下代码
调整后的生产者
/**
* 发送事务消息
*/
public class Producer {
public static void main(String[] args) throws Exception {
// 1.创建消息生产者producer,并制定生产者组名
TransactionMQProducer producer = new TransactionMQProducer("demo_transaction_group");
// 2.指定Nameserver地址
producer.setNamesrvAddr("192.168.88.131:9876");
// 添加事务监听器
TransactionListenerImpl transactionListener = new TransactionListenerImpl();
producer.setTransactionListener(transactionListener);
ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS,
new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r);
thread.setName("client-transaction-msg-check-thread");
return thread;
}
});
// 设置线程池
producer.setExecutorService(executorService);
// 3.启动producer
producer.start();
System.out.println("生产者启动");
String[] tags = { "TAGC", "TAGA", "TAGB" };
for (int i = 0; i < 3; i++) {
// 4.创建消息对象,指定主题Topic、Tag和消息体
/**
* 参数一:消息主题Topic
* 参数二:消息Tag
* 参数三:消息内容
*/
Message msg = new Message("TransactionTopic", tags[i], ("Hello xuzhu" + i).getBytes());
// 5.发送消息
SendResult result = producer.sendMessageInTransaction(msg, "hello-xuzhu_transaction");
// 发送状态
SendStatus status = result.getSendStatus();
System.out.println("发送结果:" + result);
System.out.println("发送结果状态:" + status);
// 线程睡120秒
TimeUnit.SECONDS.sleep(120);
}
// 6.关闭生产者producer
producer.shutdown();
System.out.println("生产者结束");
}
}
调整的地方:
String[] tags = { "TAGC", "TAGA", "TAGB" };
TimeUnit.SECONDS.sleep(120);
事务监听器和生产者代码保持不变
效果:
可以看到,回查消息已经进来了
第二个问题,回查的次数和定时时间是多少?
网上找的资料都说是默认15次,但定时时间没有资料有说明
不信这个邪了,我下载了rocketmq的源码,最后在BrokerConfig类中找到了答案
/**
* The maximum number of times the message was checked, if exceed this value, this message will be discarded.
*/
@ImportantField
private int transactionCheckMax = 15;
/**
* Transaction message check interval.
*/
@ImportantField
private long transactionCheckInterval = 60 * 1000;
注:
transactionCheckMax参数:默认回查次数是15次,15次后还没成功就回滚
transactionCheckInterval参数:默认定时回查间隔时间是1分钟。
这两个参数都可以在broker.properties配置
科学验证:
调整的地方:
#Producer类
TimeUnit.SECONDS.sleep(60 * 17);
#TransactionListenerImpl类
/**
* 消息回查
*
* @param messageExt
*@return org.apache.rocketmq.client.producer.LocalTransactionState
**/
@Override
public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {
System.out.println("消息的Tag:" + messageExt.getTags());
System.out.println(new Date());
return LocalTransactionState.UNKNOW;
}
回查间隔时间是1分钟,符合参数设置
回查次数是15次,也符合参数设置
同学,你坚持到这里,说明有所收获了吧,给自己加个鸡腿,犒劳下自己。顺便给博主点关注+一键三连。感谢感谢
最后
以上就是慈祥凉面为你收集整理的RocketMQ第九章:手把手教老婆代码实现-事务消息生产者和事务消息消费者 及深入源码探索事务的消息回查第一节:介绍第二节:使用场景第三节:代码实战第四节:checkLocalTransaction不会触发的全部内容,希望文章能够帮你解决RocketMQ第九章:手把手教老婆代码实现-事务消息生产者和事务消息消费者 及深入源码探索事务的消息回查第一节:介绍第二节:使用场景第三节:代码实战第四节:checkLocalTransaction不会触发所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
发表评论 取消回复