概述
一、前言
分布式事务:分布式条件下,多个节点操作的整体事务一致性。
特别是在微服务场景下,业务 A 和业务 B 关联,事务 A 成功,事务 B 失败,由于跨系统, 就会导致不被感知。 此时从整体来看,数据是不一致的。
分布式事务中的两大基本理论: CAP
理论 与 Base
理论。
分布式事务解决方案可以分为:
- 强一致性分布式事务解决方案:基于
CAP
理论 - 最终一致性分布式事务解决方案:基于
Base
理论
在最终一致性分布式事务解决方案中,典型的方案包括:
TCC
TCC
解决方案
适用场景:适用于具有强隔离性、严格一致性要求的业务场景,也适用于执行时间比较短的业务。
TCC
方案执行流程:
-
try
阶段: 仅做业务的一致性检查和预留相应的资源。 -
confirm
阶段: 当try
阶段所有分支事务执行成功后开始执行confirm
阶段。默认一定成功。出错(异常):就要 重试或者人工处理 ,对出错的事务进行干预。
-
cancel
阶段: 在业务执行异常或出现错误的情况下,需要回滚事务的操作,执行分支事务的取消操作,并且释放try
阶段预留的资源。默认一定成功。出错(异常):就要 重试或者人工处理 ,对出错的事务进行干预。
TCC
方案的优点:
- 锁定资源的粒度变小: 提升系统的性能。
- 保证分布式事务执行后数据一致性:
confirm
阶段 和cancel
阶段需具备幂等性。 - 解决
XA
规范的单点故障问题: 主业务和分支业务都能集群部署。
TCC
方案的缺点:
- 耦合性: 代码需要耦合到具体业务。
- 开发成本: 业务方法都要拆分成
try
、confirm
和cancel
三个阶段。
TCC
需要注意的问题
使用 TCC
方案解决分布式事务问题时,需要注意空回滚、悬挂和幂等的问题。
(1)空回滚问题
空回滚出现的原因:服务器宕机或者网络发生故障,未执行 try
阶段(或执行到一半)。
解决方案:判断是否执行了 try
阶段的方法
-
全局事务
ID
:生成全局事务记录,贯穿整个分布式事务的执行流程。 -
分支事务记录表:用于记录分支事务,将全局事务
ID
和 分支事务ID
保存到分支事务表中。 -
执行
cancel
阶段前,先读取分支事务表中的数据:try
(2)悬挂问题
悬挂问题出现的原因:预留业务资源后,无法继续往下处理。
try
阶段:先注册分支事务,再执行RPC
调用- 此时发生服务器宕机、应用崩溃或者网络异常等,
RPC
调用超时 - 判定
RPC
调用超时,就会回滚事务 - 这时,
RPC
请求到了对应业务方,但此时事务已经回滚,try
阶段预留的资源就无法释放了
解决方案:执行了 confirm
或 cancel
阶段,就不能再执行 try
阶段
- 在执行
try
阶段的方法时,判断是否已有执行confirm
或cancel
阶段的记录 - 如果存在,则不再执行
try
阶段的方法
(3)幂等问题
幂等主要是各业务方需要解决的业务问题。
幂等问题出现的原因:服务器宕机、应用崩溃或网络异常等原因,出现方法调用超时。
解决方案:可查状态
- 增加事务的执行状态
- 每次执行分支事务以及
confirm
阶段 和cancel
阶段的方法时,都查询此事务的执行状态
实际工作中 TCC
三种方案
(1)通用型
通用型,最常用的。
工作模板如下:
// 消息队列 + 事务消息 public void doBusiness() { // 消息队列名称 String queueName = "queue"; // 消息内容:json 格式 String msg = "{}"; // 调用 MQ,预发送消息 String msgId = msgService.createPreMsg(queueName, msg); try { // 执行业务1 try(业务层面需要做好幂等、悬挂) // 执行业务2 try(业务层面需要做好幂等、悬挂) // 执行业务3 try(业务层面需要做好幂等、悬挂) } catch (Exception e) { // 回滚业务1 cancel(业务层面需要做好幂等、悬挂、空回滚) // 回滚业务2 cancel(业务层面需要做好幂等、悬挂、空回滚) // 回滚业务3 cancel(业务层面需要做好幂等、悬挂、空回滚) } RpcContext.getContext().asyncCall(() -> { // 执行业务1 commit(业务层面需要做好幂等、悬挂) // 执行业务2 commit(业务层面需要做好幂等、悬挂) // 执行业务3 commit(业务层面需要做好幂等、悬挂) msgService.confirmMsg(queueName, msgId); }); } 复制代码
(2)异步确保型
你对接别人,还要别人加接口,别人肯定不愿意啊,还麻烦。
异步确保型 TCC
技术方案: 引入 可靠消息服务
- 优点:不要从业务服务进行配合改造,提供
try
、confirm
、cancel
3个接口
(3)补偿型
在通用型基础上进行简化,只需再提供补偿接口,业务改造量小。
补偿型 TCC
技术方案: 从业务服务只需要提供 Do
和 Compensate
两个接口。
try
阶段有的话,也可能只做一些数据的校验。
Do Compensate
Tips
: Saga
事务也是类似。
二、 Hmily-TCC
实战
以前不使用 hmily
,大部分是因为他很多配置还是 xml
形式,比较麻烦。
所以之前都是推荐使用 byte-tcc
。
接入 Hmily-TCC
:
pom
依赖导入:不同服务调用方式,导入不同的配置
<!-- dubbo --> <dependency> <groupId>org.dromara</groupId> <artifactId>hmily-spring-boot-starter-dubbo</artifactId> <version>${hmily.version}</version> <exclusions> <exclusion> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> </exclusion> </exclusions> </dependency> <!-- springcloud --> <dependency> <groupId>org.dromara</groupId> <artifactId>hmily-spring-boot-starter-springcloud</artifactId> <version>${hmily.version}</version> </dependency> 复制代码
- 开发:使用
hmliy
注解
// 1. try 阶段 @Transactional(rollbackFor = Exception.class) @HmilyTCC(confirmMethod = "confirm", cancelMethod = "cancel") public void doBusiness() { String txNo; // 1. 幂等处理:若已处理过,则直接返回 // 2. 悬挂处理:若已处理过,则直接返回 // 3. 业务、RPC调用等 } // 2. confirm 阶段 @Transactional(rollbackFor = Exception.class) public void confirm(){ String txNo; // 幂等处理 // do something } // 3. cancel 阶段 @Transactional(rollbackFor = Exception.class) public void cancel(){ String txNo; // 幂等处理 // do something } 复制代码
- 服务间的调用
// 以 SpringCloud 中 Feign 为栗 // 需要加上注解:@Hmily @FeignClient(value = "account-service") public interface AccountClient { @Hmily @RequestMapping("/account-service/account/payment") Boolean payment(@RequestBody AccountDTO accountDO); } 复制代码
实战:模拟下订单减库存
以官方 demo
为栗。
- 拉取代码,编译
$ git clone git@github.com:dromara/hmily.git $ cd hmily/ $ mvn -DskipTests clean install -U 复制代码
- 构建项目,这里以
springcloud
为例,使用hmily-demo-tcc-springcloud
工程
- 执行
MySQL
脚本
# 脚本在 $ cd ./hmily-demo/sql hmily-demo.sql 复制代码
-
修改项目配置
- 订单服务:修改
application.yml
和hmily.yml
中MySQL
连接配置 - 账号服务:修改
application.yml
和hmily.yml
中MySQL
连接配置 - 库存服务:修改
application.yml
和hmily.yml
中MySQL
连接配置 - 注册中心:使用的是
eureka
,不用修改
- 订单服务:修改
-
启动服务,先启动
eureka
,再相继启动其他服务
-
验证:访问
swagger
中/order/orderPay
接口浏览器访问:
http://127.0.0.1:8090/swagger-ui.html
三、 Hmily-TCC
源码浅析
(1)框架初始化阶段
hmily-spring-boot-starter
下有 META-INF/spring.factories
文件:
org.springframework.boot.autoconfigure.EnableAutoConfiguration= org.dromara.hmily.spring.boot.starter.parent.configuration.HmilyAutoConfiguration 复制代码
hmily
框架会随着应用程序的启动而启动,并初始化类 HmilyAutoConfiguration
:
@Configuration @EnableAspectJAutoProxy(proxyTargetClass = true) public class HmilyAutoConfiguration { // 1. 处理添加 @HmilyTCC 注解的切面入口 @Bean public SpringHmilyTransactionAspect hmilyTransactionAspect() { return new SpringHmilyTransactionAspect(); } // 2. 支持使用注解调用的 RPC 框架 @Bean @ConditionalOnProperty(value = "hmily.support.rpc.annotation", havingValue = "true") public BeanPostProcessor refererAnnotationBeanPostProcessor() { return new RefererAnnotationBeanPostProcessor(); } // 3. 框架启动初始化类 @Bean @Qualifier("hmilyTransactionBootstrap") @Primary public HmilyApplicationContextAware hmilyTransactionBootstrap() { return new HmilyApplicationContextAware(); } } 复制代码
(2) TCC
的 Try
阶段
入口 @HmilyTCC
:为 Hmily
框架处理 TCC
事务的切面:
// AbstractHmilyTransctionAspect是SpringHmilyTransactionAspect的父类。 @Aspect public abstract class AbstractHmilyTransactionAspect { private final HmilyTransactionInterceptor interceptor = new HmilyGlobalInterceptor(); @Pointcut("@annotation(org.dromara.hmily.annotation.HmilyTCC) || @annotation(org.dromara.hmily.annotation.HmilyTAC) || @annotation(org.dromara.hmily.annotation.HmilyXA)") public void hmilyInterceptor() { } // 切面环绕执行 @Around("hmilyInterceptor()") public Object interceptTccMethod(final ProceedingJoinPoint proceedingJoinPoint) throws Throwable { return interceptor.invoke(proceedingJoinPoint); } } 复制代码
拦截器拦截,进入 HmilyTransactionInterceptor
,主要干 2 件事情:
-
select(context)
:根据Hmily
事务上下文,获取事务处理器首次执行,事务上下文为
null
,事务处理器是StarterHmilyTccTransactionHandler
-
handleTransaction()
:执行Hmily
事务
public class HmilyGlobalInterceptor implements HmilyTransactionInterceptor { // ... ... static { // 根据引入不同的 RPC 支持包,获取不同的 RPC 参数加载器 // ===== 重点 ===== // 因为使用的是 SpringCloud,所以获取的是 SpringCloudParameterLoader parameterLoader; } @Override public Object invoke(final ProceedingJoinPoint pjp) throws Throwable { HmilyTransactionContext context = parameterLoader.load(); return invokeWithinTransaction(context, pjp); } private Object invokeWithinTransaction(final HmilyTransactionContext context, final ProceedingJoinPoint point) throws Throwable { MethodSignature signature = (MethodSignature) point.getSignature(); // ===== 重点 ===== // 获取事务处理器,进行事务处理 // 首次执行,事务上下文为 null,事务处理器是 StarterHmilyTccTransactionHandler return getRegistry(signature.getMethod()).select(context) .handleTransaction(point, context); } // ... ... } 复制代码
具体事务处理器执行:首先会做预处理 preTry
,即分布式事务开始的一些准备
public class StarterHmilyTccTransactionHandler implements HmilyTransactionHandler, AutoCloseable { @Override public Object handleTransaction(final ProceedingJoinPoint point, final HmilyTransactionContext context) throws Throwable { Object returnValue; try { // 0. 这块主要做:创建主事务、存储、构建分支事务,创建事务上下文等。 HmilyTransaction hmilyTransaction = executor.preTry(point); try { // 执行切面进入点的原始 try 方法,也就是上文提到的 makePayment 方法 returnValue = point.proceed(); // try 执行成功事务日志状态 hmilyTransaction.setStatus(HmilyActionEnum.TRYING.getCode()); executor.updateStartStatus(hmilyTransaction); } catch (Throwable throwable) { // 如果出现异常, 异步执行 cancel 方法 disruptor.getProvider().onData(() -> { executor.globalCancel(currentTransaction); }); throw throwable; } // try 方法执行成功,执行 confirm方法 disruptor.getProvider().onData(() -> { executor.globalConfirm(currentTransaction); }); } finally { // 清理资源与缓存 // 记录调用耗时时间 } return returnValue; } } 复制代码
需要注意的是: confirm
和 cancel
是异步执行时,会有数据异常问题
场景:本先更新操作再插入操作,异步后可能变为先插入再更新了。
为了保证事务数据的一致性: 会根据事务 Id
一致性哈希算法。
- 同一个事务
Id
会被同一线程顺序执行。
使用 HmilyContext
设置事务上下文有两种模式:
- 默认
ThreadLocal
TransimttableThreadLocal
:阿里提供的跨线程ThreadLocal
的实现
RPC
调用: Feign
分布式事务的 RPC
进行调用: 通过 拦截器 在 header
里设置事务上下文
@Configuration public class HmilyFeignConfiguration { // 1. 对 RPC 调用进行参数的传递 @Bean @Qualifier("hmilyFeignInterceptor") public RequestInterceptor hmilyFeignInterceptor() { return new HmilyFeignInterceptor(); } // 2. 对添加了 Hmily 注解的 Bean 实例进行代理 @Bean public HmilyFeignBeanPostProcessor feignPostProcessor() { return new HmilyFeignBeanPostProcessor(); } // 3. 处理 Hystrix 跨线程传递参数问题 @Bean @ConditionalOnProperty(name = "feign.hystrix.enabled") public HystrixConcurrencyStrategy hmilyHystrixConcurrencyStrategy() { return new HmilyHystrixConcurrencyStrategy(); } } 复制代码
- 对
RPC
调用进行参数的传递
public class HmilyFeignInterceptor implements RequestInterceptor { @Override public void apply(final RequestTemplate requestTemplate) { // 在 header 中设置事务上下文 RpcMediator.getInstance().transmit(requestTemplate::header, HmilyContextHolder.get()); } } 复制代码
- 对添加了
Hmily
注解的Bean
实例进行代理
public class HmilyFeignHandler implements InvocationHandler { @Override public Object invoke(final Object proxy, final Method method, final Object[] args) throws Throwable { if (Object.class.equals(method.getDeclaringClass())) { return method.invoke(this, args); } else { // 获取事务上下文 final HmilyTransactionContext context = HmilyContextHolder.get(); if (Objects.isNull(context)) { // 如果为空,则进行正常调用 return this.delegate.invoke(proxy, method, args); } final Hmily hmily = method.getAnnotation(Hmily.class); if (Objects.isNull(hmily)) { // 如果为空,则进行正常调用 return this.delegate.invoke(proxy, method, args); } try { // 构建参与者对象,进行缓存 // ... // 发起真正的调用 final Object invoke = delegate.invoke(proxy, method, args); // 如果调用成功,缓存参与者对象至发起者 if (context.getRole() == HmilyRoleEnum.PARTICIPANT.getCode()) { // ... } else { // ... } return invoke; } catch (Throwable e) { LOGGER.error("HmilyFeignHandler invoker exception :", e); throw e; } } } } 复制代码
(3) TCC
的 Confirm
阶段
在所有 Try
流程执行完成,且没有异常的情况下:
- 使用
disrupto
队列异步执行executor.globalConfirm(currentTransaction);
public final class HmilyTccTransactionExecutor { public void globalConfirm(final HmilyTransaction currentTransaction) throws HmilyRuntimeException { // 更新事务状态为 confirm currentTransaction.setStatus(HmilyActionEnum.CONFIRMING.getCode()); HmilyRepositoryStorage.updateHmilyTransactionStatus(currentTransaction); // 从本地缓存里面获取所有的参与者对象 for (HmilyParticipant hmilyParticipant : hmilyParticipants) { try { // 如果参与者的角色是发起者 if (hmilyParticipant.getRole() == HmilyRoleEnum.START.getCode()) { // 执行本地调用 } else { // 执行 RPC 调用 } successList.add(true); } catch (Throwable e) { //... } finally { HmilyContextHolder.remove(); } } if (successList.stream().allMatch(e -> e)) { // 如果每个参与者都执行成功,删除主事务 HmilyRepositoryStorage.removeHmilyTransaction(currentTransaction); } } } 复制代码
(4) TCC
的 Cancel
阶段
Cancel
流程是在分布式事务发起方在 Try
阶段有异常时调用:
executor.globalCancel(currentTransaction);
public final class HmilyTccTransactionExecutor { public void globalCancel(final HmilyTransaction currentTransaction) { // 更新事务日志状态为 cancel currentTransaction.setStatus(HmilyActionEnum.CANCELING.getCode()); HmilyRepositoryStorage.updateHmilyTransactionStatus(currentTransaction); for (HmilyParticipant hmilyParticipant : hmilyParticipants) { try { // 如果是发起者,执行本地调用 if (hmilyParticipant.getRole() == HmilyRoleEnum.START.getCode()) { } else { // 执行远端 RPC 调用 } } catch (Throwable e) { // ... ... } finally { HmilyContextHolder.remove(); } } } } 复制代码
(5)事务恢复
事务恢复日志只针对非常特殊、极少的场景,在正常的流程中都会被清理掉。
出现场景:
- 在执行
try
阶段方法时,服务宕机 - 执行
confirm
阶段方法时,有RPC
服务调用不成功 - 执行
cancel
阶段方法时,有RPC
服务调用不成功
解决方法:定时调度
在初始化 Hmily
框架启动阶段,创建并启动此定时任务。
DEATH
public class HmilyTransactionSelfRecoveryScheduled implements AutoCloseable { private void selfTccRecovery() { selfTccRecoveryExecutor.scheduleWithFixedDelay(() -> { try { // ... for (HmilyParticipant hmilyParticipant : hmilyParticipantList) { // 1. 判断是否超过最大重试次数 if (hmilyParticipant.getRetry() > hmilyConfig.getRetryMax()) { // 更新日志状态为 DEATH continue; } // 2. 如果事务处于 PRE_TRY 状态,即 try 还没执行,则无需处理 // 3. 锁事务日志:避免多个定时任务同时执行 // 若采用数据库来存储,则通过更新 version 字段来获取锁 final boolean successful = hmilyRepository.lockHmilyParticipant(hmilyParticipant); if (successful) { // 根据全局事务id 获取全局事务对象 HmilyTransaction globalHmilyTransaction; // 如果没有全局事务,证明事务流程已经完成 // 则根据自身的事务状态进行恢复 // 这种场景常见于 RPC 接口调用超时,但是自身执行又成功 if (Objects.isNull(globalHmilyTransaction)) { tccRecovery(hmilyParticipant.getStatus(), hmilyParticipant); } else { // 根据全局事务状态进行恢复 tccRecovery(globalHmilyTransaction.getStatus(), hmilyParticipant); } } } } catch (Exception e) { LOGGER.error("hmily scheduled transaction log is error:", e); } }, hmilyConfig.getScheduledInitDelay(), hmilyConfig.getScheduledRecoveryDelay(), TimeUnit.SECONDS); } private void tccRecovery(final int status, final HmilyParticipant hmilyParticipant) { // 如果事务状态是 TRYING 和 CANCELING,执行 cancel 阶段 if (status == HmilyActionEnum.TRYING.getCode() || status == HmilyActionEnum.CANCELING.getCode()) { hmilyTransactionRecoveryService.cancel(hmilyParticipant); } else if (status == HmilyActionEnum.CONFIRMING.getCode()) { // 反之,执行 confirm 阶段 hmilyTransactionRecoveryService.confirm(hmilyParticipant); } } } 复制代码
(6)事务日志存储
对于分布式事务来说,事务日志至关重要。
在事务日志的存储上, Hmily
支持多种介质 : File
、 Redis
、 MySQL
、 Zookeeper
等。
这里介绍以 MySQL
为主,其 sql
脚本位于: resource/mysql/schema.sql
。
TCC
事务日志的结构主要由 3 个类构成:
HmilyTransaction
:事务主体类,包含多个HmilyParticipant
, 对应hmily_transaction_global
表HmilyParticipant
:分支事务类,包含多个HmilyInvocation
,对应hmily_transaction_participant
表HmilyInvocation
:事务方法的参数列表实体类
HmilyBootstrap
框架初始化时,会创建初始化事务恢复调度器:
TCC
事务恢复单线程池:selfTccRecoveryExecutor
TAC
事务恢复单线程池:selfTacRecoveryExecutor
- 事务日志清理线程池:
cleanHmilyTransactionExecutor
- 物理删除线程池:
phyDeletedExecutor
Hmily
采用高性能队列 disruptor
进行事务日志的异步存储:
HmilyRepositoryEventPublisher TCC
最后
以上就是欣喜小懒猪为你收集整理的最终一致性性分布式事务 TCC的全部内容,希望文章能够帮你解决最终一致性性分布式事务 TCC所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
发表评论 取消回复