一、前言
分布式事务:分布式条件下,多个节点操作的整体事务一致性。
特别是在微服务场景下,业务 A 和业务 B 关联,事务 A 成功,事务 B 失败,由于跨系统, 就会导致不被感知。 此时从整体来看,数据是不一致的。
分布式事务中的两大基本理论: CAP 理论 与 Base 理论。
分布式事务解决方案可以分为:
- 强一致性分布式事务解决方案:基于
CAP理论 - 最终一致性分布式事务解决方案:基于
Base理论
在最终一致性分布式事务解决方案中,典型的方案包括:
TCC
TCC 解决方案
适用场景:适用于具有强隔离性、严格一致性要求的业务场景,也适用于执行时间比较短的业务。
TCC 方案执行流程:

-
try阶段: 仅做业务的一致性检查和预留相应的资源。 -
confirm阶段: 当try阶段所有分支事务执行成功后开始执行confirm阶段。默认一定成功。出错(异常):就要 重试或者人工处理 ,对出错的事务进行干预。
-
cancel阶段: 在业务执行异常或出现错误的情况下,需要回滚事务的操作,执行分支事务的取消操作,并且释放try阶段预留的资源。默认一定成功。出错(异常):就要 重试或者人工处理 ,对出错的事务进行干预。
TCC 方案的优点:
- 锁定资源的粒度变小: 提升系统的性能。
- 保证分布式事务执行后数据一致性:
confirm阶段 和cancel阶段需具备幂等性。 - 解决
XA规范的单点故障问题: 主业务和分支业务都能集群部署。
TCC 方案的缺点:
- 耦合性: 代码需要耦合到具体业务。
- 开发成本: 业务方法都要拆分成
try、confirm和cancel三个阶段。
TCC 需要注意的问题
使用 TCC 方案解决分布式事务问题时,需要注意空回滚、悬挂和幂等的问题。
(1)空回滚问题
空回滚出现的原因:服务器宕机或者网络发生故障,未执行 try 阶段(或执行到一半)。
解决方案:判断是否执行了 try 阶段的方法
-
全局事务
ID:生成全局事务记录,贯穿整个分布式事务的执行流程。 -
分支事务记录表:用于记录分支事务,将全局事务
ID和 分支事务ID保存到分支事务表中。 -
执行
cancel阶段前,先读取分支事务表中的数据:try
(2)悬挂问题
悬挂问题出现的原因:预留业务资源后,无法继续往下处理。
try阶段:先注册分支事务,再执行RPC调用- 此时发生服务器宕机、应用崩溃或者网络异常等,
RPC调用超时 - 判定
RPC调用超时,就会回滚事务 - 这时,
RPC请求到了对应业务方,但此时事务已经回滚,try阶段预留的资源就无法释放了
解决方案:执行了 confirm 或 cancel 阶段,就不能再执行 try 阶段
- 在执行
try阶段的方法时,判断是否已有执行confirm或cancel阶段的记录 - 如果存在,则不再执行
try阶段的方法
(3)幂等问题
幂等主要是各业务方需要解决的业务问题。
幂等问题出现的原因:服务器宕机、应用崩溃或网络异常等原因,出现方法调用超时。
解决方案:可查状态
- 增加事务的执行状态
- 每次执行分支事务以及
confirm阶段 和cancel阶段的方法时,都查询此事务的执行状态
实际工作中 TCC 三种方案
(1)通用型
通用型,最常用的。

工作模板如下:
// 消息队列 + 事务消息
public void doBusiness() {
// 消息队列名称
String queueName = "queue";
// 消息内容:json 格式
String msg = "{}";
// 调用 MQ,预发送消息
String msgId = msgService.createPreMsg(queueName, msg);
try {
// 执行业务1 try(业务层面需要做好幂等、悬挂)
// 执行业务2 try(业务层面需要做好幂等、悬挂)
// 执行业务3 try(业务层面需要做好幂等、悬挂)
} catch (Exception e) {
// 回滚业务1 cancel(业务层面需要做好幂等、悬挂、空回滚)
// 回滚业务2 cancel(业务层面需要做好幂等、悬挂、空回滚)
// 回滚业务3 cancel(业务层面需要做好幂等、悬挂、空回滚)
}
RpcContext.getContext().asyncCall(() -> {
// 执行业务1 commit(业务层面需要做好幂等、悬挂)
// 执行业务2 commit(业务层面需要做好幂等、悬挂)
// 执行业务3 commit(业务层面需要做好幂等、悬挂)
msgService.confirmMsg(queueName, msgId);
});
}
复制代码
(2)异步确保型
你对接别人,还要别人加接口,别人肯定不愿意啊,还麻烦。
异步确保型 TCC 技术方案: 引入 可靠消息服务
- 优点:不要从业务服务进行配合改造,提供
try、confirm、cancel3个接口

(3)补偿型
在通用型基础上进行简化,只需再提供补偿接口,业务改造量小。
补偿型 TCC 技术方案: 从业务服务只需要提供 Do 和 Compensate 两个接口。
try 阶段有的话,也可能只做一些数据的校验。
Do Compensate
Tips : Saga 事务也是类似。

二、 Hmily-TCC 实战
以前不使用 hmily ,大部分是因为他很多配置还是 xml 形式,比较麻烦。
所以之前都是推荐使用 byte-tcc 。
接入 Hmily-TCC :
pom依赖导入:不同服务调用方式,导入不同的配置
<!-- dubbo -->
<dependency>
<groupId>org.dromara</groupId>
<artifactId>hmily-spring-boot-starter-dubbo</artifactId>
<version>${hmily.version}</version>
<exclusions>
<exclusion>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
</exclusion>
</exclusions>
</dependency>
<!-- springcloud -->
<dependency>
<groupId>org.dromara</groupId>
<artifactId>hmily-spring-boot-starter-springcloud</artifactId>
<version>${hmily.version}</version>
</dependency>
复制代码
- 开发:使用
hmliy注解
// 1. try 阶段
@Transactional(rollbackFor = Exception.class)
@HmilyTCC(confirmMethod = "confirm", cancelMethod = "cancel")
public void doBusiness() {
String txNo;
// 1. 幂等处理:若已处理过,则直接返回
// 2. 悬挂处理:若已处理过,则直接返回
// 3. 业务、RPC调用等
}
// 2. confirm 阶段
@Transactional(rollbackFor = Exception.class)
public void confirm(){
String txNo;
// 幂等处理
// do something
}
// 3. cancel 阶段
@Transactional(rollbackFor = Exception.class)
public void cancel(){
String txNo;
// 幂等处理
// do something
}
复制代码
- 服务间的调用
// 以 SpringCloud 中 Feign 为栗
// 需要加上注解:@Hmily
@FeignClient(value = "account-service")
public interface AccountClient {
@Hmily
@RequestMapping("/account-service/account/payment")
Boolean payment(@RequestBody AccountDTO accountDO);
}
复制代码
实战:模拟下订单减库存
以官方 demo 为栗。
- 拉取代码,编译
$ git clone git@github.com:dromara/hmily.git $ cd hmily/ $ mvn -DskipTests clean install -U 复制代码
- 构建项目,这里以
springcloud为例,使用hmily-demo-tcc-springcloud工程

- 执行
MySQL脚本
# 脚本在 $ cd ./hmily-demo/sql hmily-demo.sql 复制代码
-
修改项目配置
- 订单服务:修改
application.yml和hmily.yml中MySQL连接配置 - 账号服务:修改
application.yml和hmily.yml中MySQL连接配置 - 库存服务:修改
application.yml和hmily.yml中MySQL连接配置 - 注册中心:使用的是
eureka,不用修改
- 订单服务:修改
-
启动服务,先启动
eureka,再相继启动其他服务

-
验证:访问
swagger中/order/orderPay接口浏览器访问:
http://127.0.0.1:8090/swagger-ui.html

三、 Hmily-TCC 源码浅析
(1)框架初始化阶段
hmily-spring-boot-starter 下有 META-INF/spring.factories 文件:
org.springframework.boot.autoconfigure.EnableAutoConfiguration= org.dromara.hmily.spring.boot.starter.parent.configuration.HmilyAutoConfiguration 复制代码
hmily 框架会随着应用程序的启动而启动,并初始化类 HmilyAutoConfiguration :
@Configuration
@EnableAspectJAutoProxy(proxyTargetClass = true)
public class HmilyAutoConfiguration {
// 1. 处理添加 @HmilyTCC 注解的切面入口
@Bean
public SpringHmilyTransactionAspect hmilyTransactionAspect() {
return new SpringHmilyTransactionAspect();
}
// 2. 支持使用注解调用的 RPC 框架
@Bean
@ConditionalOnProperty(value = "hmily.support.rpc.annotation", havingValue = "true")
public BeanPostProcessor refererAnnotationBeanPostProcessor() {
return new RefererAnnotationBeanPostProcessor();
}
// 3. 框架启动初始化类
@Bean
@Qualifier("hmilyTransactionBootstrap")
@Primary
public HmilyApplicationContextAware hmilyTransactionBootstrap() {
return new HmilyApplicationContextAware();
}
}
复制代码
(2) TCC 的 Try 阶段

入口 @HmilyTCC :为 Hmily 框架处理 TCC 事务的切面:
// AbstractHmilyTransctionAspect是SpringHmilyTransactionAspect的父类。
@Aspect
public abstract class AbstractHmilyTransactionAspect {
private final HmilyTransactionInterceptor interceptor = new HmilyGlobalInterceptor();
@Pointcut("@annotation(org.dromara.hmily.annotation.HmilyTCC) || @annotation(org.dromara.hmily.annotation.HmilyTAC) || @annotation(org.dromara.hmily.annotation.HmilyXA)")
public void hmilyInterceptor() {
}
// 切面环绕执行
@Around("hmilyInterceptor()")
public Object interceptTccMethod(final ProceedingJoinPoint proceedingJoinPoint) throws Throwable {
return interceptor.invoke(proceedingJoinPoint);
}
}
复制代码
拦截器拦截,进入 HmilyTransactionInterceptor ,主要干 2 件事情:
-
select(context):根据Hmily事务上下文,获取事务处理器首次执行,事务上下文为
null,事务处理器是StarterHmilyTccTransactionHandler -
handleTransaction():执行Hmily事务
public class HmilyGlobalInterceptor implements HmilyTransactionInterceptor {
// ... ...
static {
// 根据引入不同的 RPC 支持包,获取不同的 RPC 参数加载器
// ===== 重点 =====
// 因为使用的是 SpringCloud,所以获取的是 SpringCloudParameterLoader
parameterLoader;
}
@Override
public Object invoke(final ProceedingJoinPoint pjp) throws Throwable {
HmilyTransactionContext context = parameterLoader.load();
return invokeWithinTransaction(context, pjp);
}
private Object invokeWithinTransaction(final HmilyTransactionContext context,
final ProceedingJoinPoint point) throws Throwable {
MethodSignature signature = (MethodSignature) point.getSignature();
// ===== 重点 =====
// 获取事务处理器,进行事务处理
// 首次执行,事务上下文为 null,事务处理器是 StarterHmilyTccTransactionHandler
return getRegistry(signature.getMethod()).select(context)
.handleTransaction(point, context);
}
// ... ...
}
复制代码
具体事务处理器执行:首先会做预处理 preTry ,即分布式事务开始的一些准备
public class StarterHmilyTccTransactionHandler
implements HmilyTransactionHandler, AutoCloseable {
@Override
public Object handleTransaction(final ProceedingJoinPoint point,
final HmilyTransactionContext context) throws Throwable {
Object returnValue;
try {
// 0. 这块主要做:创建主事务、存储、构建分支事务,创建事务上下文等。
HmilyTransaction hmilyTransaction = executor.preTry(point);
try {
// 执行切面进入点的原始 try 方法,也就是上文提到的 makePayment 方法
returnValue = point.proceed();
// try 执行成功事务日志状态
hmilyTransaction.setStatus(HmilyActionEnum.TRYING.getCode());
executor.updateStartStatus(hmilyTransaction);
} catch (Throwable throwable) {
// 如果出现异常, 异步执行 cancel 方法
disruptor.getProvider().onData(() -> {
executor.globalCancel(currentTransaction);
});
throw throwable;
}
// try 方法执行成功,执行 confirm方法
disruptor.getProvider().onData(() -> {
executor.globalConfirm(currentTransaction);
});
} finally {
// 清理资源与缓存
// 记录调用耗时时间
}
return returnValue;
}
}
复制代码
需要注意的是: confirm 和 cancel 是异步执行时,会有数据异常问题
场景:本先更新操作再插入操作,异步后可能变为先插入再更新了。
为了保证事务数据的一致性: 会根据事务 Id 一致性哈希算法。
- 同一个事务
Id会被同一线程顺序执行。
使用 HmilyContext 设置事务上下文有两种模式:
- 默认
ThreadLocal TransimttableThreadLocal:阿里提供的跨线程ThreadLocal的实现
RPC 调用: Feign
分布式事务的 RPC 进行调用: 通过 拦截器 在 header 里设置事务上下文
@Configuration
public class HmilyFeignConfiguration {
// 1. 对 RPC 调用进行参数的传递
@Bean
@Qualifier("hmilyFeignInterceptor")
public RequestInterceptor hmilyFeignInterceptor() {
return new HmilyFeignInterceptor();
}
// 2. 对添加了 Hmily 注解的 Bean 实例进行代理
@Bean
public HmilyFeignBeanPostProcessor feignPostProcessor() {
return new HmilyFeignBeanPostProcessor();
}
// 3. 处理 Hystrix 跨线程传递参数问题
@Bean
@ConditionalOnProperty(name = "feign.hystrix.enabled")
public HystrixConcurrencyStrategy hmilyHystrixConcurrencyStrategy() {
return new HmilyHystrixConcurrencyStrategy();
}
}
复制代码
- 对
RPC调用进行参数的传递
public class HmilyFeignInterceptor implements RequestInterceptor {
@Override
public void apply(final RequestTemplate requestTemplate) {
// 在 header 中设置事务上下文
RpcMediator.getInstance().transmit(requestTemplate::header,
HmilyContextHolder.get());
}
}
复制代码
- 对添加了
Hmily注解的Bean实例进行代理
public class HmilyFeignHandler implements InvocationHandler {
@Override
public Object invoke(final Object proxy, final Method method,
final Object[] args) throws Throwable {
if (Object.class.equals(method.getDeclaringClass())) {
return method.invoke(this, args);
} else {
// 获取事务上下文
final HmilyTransactionContext context = HmilyContextHolder.get();
if (Objects.isNull(context)) {
// 如果为空,则进行正常调用
return this.delegate.invoke(proxy, method, args);
}
final Hmily hmily = method.getAnnotation(Hmily.class);
if (Objects.isNull(hmily)) {
// 如果为空,则进行正常调用
return this.delegate.invoke(proxy, method, args);
}
try {
// 构建参与者对象,进行缓存
// ...
// 发起真正的调用
final Object invoke = delegate.invoke(proxy, method, args);
// 如果调用成功,缓存参与者对象至发起者
if (context.getRole() == HmilyRoleEnum.PARTICIPANT.getCode()) {
// ...
} else {
// ...
}
return invoke;
} catch (Throwable e) {
LOGGER.error("HmilyFeignHandler invoker exception :", e);
throw e;
}
}
}
}
复制代码
(3) TCC 的 Confirm 阶段
在所有 Try 流程执行完成,且没有异常的情况下:
- 使用
disrupto队列异步执行executor.globalConfirm(currentTransaction);
public final class HmilyTccTransactionExecutor {
public void globalConfirm(final HmilyTransaction currentTransaction)
throws HmilyRuntimeException {
// 更新事务状态为 confirm
currentTransaction.setStatus(HmilyActionEnum.CONFIRMING.getCode());
HmilyRepositoryStorage.updateHmilyTransactionStatus(currentTransaction);
// 从本地缓存里面获取所有的参与者对象
for (HmilyParticipant hmilyParticipant : hmilyParticipants) {
try {
// 如果参与者的角色是发起者
if (hmilyParticipant.getRole() == HmilyRoleEnum.START.getCode()) {
// 执行本地调用
} else {
// 执行 RPC 调用
}
successList.add(true);
} catch (Throwable e) {
//...
} finally {
HmilyContextHolder.remove();
}
}
if (successList.stream().allMatch(e -> e)) {
// 如果每个参与者都执行成功,删除主事务
HmilyRepositoryStorage.removeHmilyTransaction(currentTransaction);
}
}
}
复制代码
(4) TCC 的 Cancel 阶段
Cancel 流程是在分布式事务发起方在 Try 阶段有异常时调用:
executor.globalCancel(currentTransaction);
public final class HmilyTccTransactionExecutor {
public void globalCancel(final HmilyTransaction currentTransaction) {
// 更新事务日志状态为 cancel
currentTransaction.setStatus(HmilyActionEnum.CANCELING.getCode());
HmilyRepositoryStorage.updateHmilyTransactionStatus(currentTransaction);
for (HmilyParticipant hmilyParticipant : hmilyParticipants) {
try {
// 如果是发起者,执行本地调用
if (hmilyParticipant.getRole() == HmilyRoleEnum.START.getCode()) {
} else {
// 执行远端 RPC 调用
}
} catch (Throwable e) {
// ... ...
} finally {
HmilyContextHolder.remove();
}
}
}
}
复制代码
(5)事务恢复
事务恢复日志只针对非常特殊、极少的场景,在正常的流程中都会被清理掉。
出现场景:
- 在执行
try阶段方法时,服务宕机 - 执行
confirm阶段方法时,有RPC服务调用不成功 - 执行
cancel阶段方法时,有RPC服务调用不成功
解决方法:定时调度
在初始化 Hmily 框架启动阶段,创建并启动此定时任务。
DEATH
public class HmilyTransactionSelfRecoveryScheduled implements AutoCloseable {
private void selfTccRecovery() {
selfTccRecoveryExecutor.scheduleWithFixedDelay(() -> {
try {
// ...
for (HmilyParticipant hmilyParticipant : hmilyParticipantList) {
// 1. 判断是否超过最大重试次数
if (hmilyParticipant.getRetry() > hmilyConfig.getRetryMax()) {
// 更新日志状态为 DEATH
continue;
}
// 2. 如果事务处于 PRE_TRY 状态,即 try 还没执行,则无需处理
// 3. 锁事务日志:避免多个定时任务同时执行
// 若采用数据库来存储,则通过更新 version 字段来获取锁
final boolean successful
= hmilyRepository.lockHmilyParticipant(hmilyParticipant);
if (successful) {
// 根据全局事务id 获取全局事务对象
HmilyTransaction globalHmilyTransaction;
// 如果没有全局事务,证明事务流程已经完成
// 则根据自身的事务状态进行恢复
// 这种场景常见于 RPC 接口调用超时,但是自身执行又成功
if (Objects.isNull(globalHmilyTransaction)) {
tccRecovery(hmilyParticipant.getStatus(), hmilyParticipant);
} else {
// 根据全局事务状态进行恢复
tccRecovery(globalHmilyTransaction.getStatus(),
hmilyParticipant);
}
}
}
} catch (Exception e) {
LOGGER.error("hmily scheduled transaction log is error:", e);
}
}, hmilyConfig.getScheduledInitDelay(), hmilyConfig.getScheduledRecoveryDelay(), TimeUnit.SECONDS);
}
private void tccRecovery(final int status, final HmilyParticipant hmilyParticipant) {
// 如果事务状态是 TRYING 和 CANCELING,执行 cancel 阶段
if (status == HmilyActionEnum.TRYING.getCode()
|| status == HmilyActionEnum.CANCELING.getCode()) {
hmilyTransactionRecoveryService.cancel(hmilyParticipant);
} else if (status == HmilyActionEnum.CONFIRMING.getCode()) {
// 反之,执行 confirm 阶段
hmilyTransactionRecoveryService.confirm(hmilyParticipant);
}
}
}
复制代码
(6)事务日志存储
对于分布式事务来说,事务日志至关重要。

在事务日志的存储上, Hmily 支持多种介质 : File 、 Redis 、 MySQL 、 Zookeeper 等。
这里介绍以 MySQL 为主,其 sql 脚本位于: resource/mysql/schema.sql 。
TCC 事务日志的结构主要由 3 个类构成:
HmilyTransaction:事务主体类,包含多个HmilyParticipant, 对应hmily_transaction_global表HmilyParticipant:分支事务类,包含多个HmilyInvocation,对应hmily_transaction_participant表HmilyInvocation:事务方法的参数列表实体类
HmilyBootstrap 框架初始化时,会创建初始化事务恢复调度器:
TCC事务恢复单线程池:selfTccRecoveryExecutorTAC事务恢复单线程池:selfTacRecoveryExecutor- 事务日志清理线程池:
cleanHmilyTransactionExecutor - 物理删除线程池:
phyDeletedExecutor
Hmily 采用高性能队列 disruptor 进行事务日志的异步存储:
HmilyRepositoryEventPublisher TCC
最后
以上就是欣喜小懒猪最近收集整理的关于最终一致性性分布式事务 TCC的全部内容,更多相关最终一致性性分布式事务内容请搜索靠谱客的其他文章。
发表评论 取消回复