概述
Seata框架介绍
Seata(Simple Extensible Autonomous Transaction Architecture)是阿里巴巴开源的分布式事务中间件,以高效且对业务零侵入的方式,解决微服务场景下面临的分布式事务问题。
对于分布式事务和Seata框架本身的介绍本文就不再多赘述了,想了解更多Seata框架的细节,建议阅读Seata中文文档(相当详细和易懂):http://seata.io/zh-cn/docs/overview/what-is-seata.html
AT模式介绍
说到AT模式,就不得不说AT模式中的三大角色:
此图又是一个典型的分布式订单流程图解,其中大致的意思:TC属于服务器端,控制着整个订单流程的命脉,也是每个环节执行动作的监听者。TM和RM则是客户端,RM属于本地资源管理器,控制着自己本地的事务动作,并将事务执行结果告知TC。而TM则是定义了事务执行的范围,它的存在是为了统一多个本地事务的提交/回滚操作。
- TC:维护全局和分支事务的状态,驱动全局事务提交或回滚。
- TM:定义全局事务的范围,开始全局事务、提交或回滚全局事务。
- RM:管理分支事务处理的资源,与TC交谈以注册分支事务和报告分支事务的状态,并驱动分支事务提交或回滚。
除了以上三大重要角色,还有核心的流程控制:
- 一阶段:业务数据和回滚日志记录在同一个本地事务中提交,释放本地锁和连接资源。(执行用户SQL释放资源,生成快照)
- 二阶段:提交异步化,快速完成。回滚通过一阶段的回滚日志进行反向补偿。
源码分析
以上内容便是针对Seata框架中的AT模式进行大致的介绍,接下来为了了解更多的细节,开始本文的重点——源码分析。
Seata很好地融入了Spring体系,因此在初始化时,Seata同样借助Spring的bean机制完成初始化(GlobalTransactionScanner本身就是一个实现了Spring初始化回调接口的bean),这里重点关注GlobalTransactionScanner类:
- 扫描并初始化客户端:
/**
 * Excerpt of Seata's GlobalTransactionScanner (members not relevant here are elided with "...").
 *
 * It extends Spring's AbstractAutoProxyCreator and implements InitializingBean, so Spring calls
 * afterPropertiesSet() after the bean's properties are set; that is where the TM/RM clients are initialized.
 */
public class GlobalTransactionScanner extends AbstractAutoProxyCreator
        implements ConfigurationChangeListener, InitializingBean, ApplicationContextAware, DisposableBean { // implements Spring's bean lifecycle callback interfaces
    ...
    /**
     * Registers this scanner as a listener on the DISABLE_GLOBAL_TRANSACTION configuration key, then
     * initializes the TM/RM clients at most once, unless global transactions are disabled.
     */
    @Override
    public void afterPropertiesSet() {
        ConfigurationCache.addConfigListener(ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION, (ConfigurationChangeListener)this);
        if (disableGlobalTransaction) { // global transactions switched off: skip client initialization
            if (LOGGER.isInfoEnabled()) {
                LOGGER.info("Global transaction is disabled.");
            }
            return;
        }
        if (initialized.compareAndSet(false, true)) { // CAS on the init flag so initClient() runs at most once
            initClient(); // initialize the TM and RM clients
        }
    }
    ...
}
从以下初始化客户端的方法中可以明显看到,其中初始化了TM、RM客户端:
/**
 * Initializes the TM client and then the RM client for this application, then registers a Spring
 * shutdown hook.
 *
 * @throws IllegalArgumentException if applicationId or txServiceGroup is null or empty
 */
private void initClient() {
    if (LOGGER.isInfoEnabled()) {
        LOGGER.info("Initializing Global Transaction Clients ... ");
    }
    // Both identifiers are required to talk to the TC
    if (StringUtils.isNullOrEmpty(applicationId) || StringUtils.isNullOrEmpty(txServiceGroup)) {
        throw new IllegalArgumentException(String.format("applicationId: %s, txServiceGroup: %s", applicationId, txServiceGroup));
    }
    // init TM
    // Initialize the TM client (applicationId and txServiceGroup come from configuration)
    TMClient.init(applicationId, txServiceGroup, accessKey, secretKey);
    if (LOGGER.isInfoEnabled()) {
        LOGGER.info("Transaction Manager Client is initialized. applicationId[{}] txServiceGroup[{}]", applicationId, txServiceGroup);
    }
    // init RM
    // Initialize the RM client (same applicationId / txServiceGroup, no access/secret key)
    RMClient.init(applicationId, txServiceGroup);
    if (LOGGER.isInfoEnabled()) {
        LOGGER.info("Resource Manager is initialized. applicationId[{}] txServiceGroup[{}]", applicationId, txServiceGroup);
    }
    if (LOGGER.isInfoEnabled()) {
        LOGGER.info("Global Transaction Clients are initialized. ");
    }
    registerSpringShutdownHook();
}
- 初始化TM客户端:
/**
 * Obtains the singleton TM Netty client, stamps it with the given identity and credentials, and
 * initializes it.
 *
 * @param applicationId           application id of this service
 * @param transactionServiceGroup transaction service group name
 * @param accessKey               access key handed to the client
 * @param secretKey               secret key handed to the client
 */
public static void init(String applicationId, String transactionServiceGroup, String accessKey, String secretKey) {
    TmNettyRemotingClient
        .getInstance(applicationId, transactionServiceGroup, accessKey, secretKey)
        .init();
}
获取TM客户端实例:
/**
 * Returns the singleton TM client after setting its application id, transaction service group,
 * access key and secret key (the setters are applied on every call).
 *
 * @return the shared TmNettyRemotingClient
 */
public static TmNettyRemotingClient getInstance(String applicationId, String transactionServiceGroup, String accessKey, String secretKey) {
    final TmNettyRemotingClient client = getInstance();
    client.setApplicationId(applicationId);
    client.setTransactionServiceGroup(transactionServiceGroup);
    client.setAccessKey(accessKey);
    client.setSecretKey(secretKey);
    return client;
}
以下获取客户端实例的方法使用了双重检查锁定(double-checked locking)的单例写法(注意:该写法只有在instance字段被声明为volatile时才是线程安全的)。从中还可以看出,所说的客户端其实是基于Netty实现的:
/**
 * Returns the process-wide TM client and creates it lazily on first use.
 *
 * It does a null check, then synchronizes on TmNettyRemotingClient.class and checks again, so concurrent
 * first callers create at most one instance.
 * NOTE(review): this double-checked locking is only safe if the {@code instance} field is declared
 * volatile. The field declaration is not shown here; confirm.
 *
 * The message executor is a fixed-size pool (core == max == client worker threads) with a queue bounded by
 * MAX_QUEUE_SIZE and RejectedPolicies.runsOldestTaskPolicy() as its rejection policy.
 */
public static TmNettyRemotingClient getInstance() {
    if (instance == null) {
        synchronized (TmNettyRemotingClient.class) {
            if (instance == null) { // re-check under the lock
                NettyClientConfig nettyClientConfig = new NettyClientConfig();
                final ThreadPoolExecutor messageExecutor = new ThreadPoolExecutor(
                    nettyClientConfig.getClientWorkerThreads(), nettyClientConfig.getClientWorkerThreads(),
                    KEEP_ALIVE_TIME, TimeUnit.SECONDS,
                    new LinkedBlockingQueue<>(MAX_QUEUE_SIZE),
                    new NamedThreadFactory(nettyClientConfig.getTmDispatchThreadPrefix(),
                        nettyClientConfig.getClientWorkerThreads()),
                    RejectedPolicies.runsOldestTaskPolicy());
                instance = new TmNettyRemotingClient(nettyClientConfig, null, messageExecutor);
            }
        }
    }
    return instance;
}
了解完如何获取TM客户端的实例后,回到下一步,初始化TM客户端:
/**
 * Initializes the TM client.
 *
 * Response and heartbeat processors are registered on every call. The parent initialization, which
 * schedules reconnects and starts the Netty bootstrap, runs only on the first call.
 */
@Override
public void init() {
    registerProcessor();
    boolean firstInit = initialized.compareAndSet(false, true);
    if (!firstInit) {
        return;
    }
    super.init();
}
注册客户端对应的结果处理者:
/**
 * Registers the processors this TM client uses for inbound messages.
 *
 * A single ClientOnResponseProcessor is shared by every TC result type: merged results, global
 * begin/commit/report/rollback/status results, and client-registration results. A separate
 * ClientHeartbeatProcessor handles heartbeat messages. The third argument is always null here
 * (presumably meaning "no dedicated executor"; confirm against registerProcessor's signature).
 */
private void registerProcessor() {
    // Processor for TC responses: routes each result message type to the matching handling logic
    ClientOnResponseProcessor onResponseProcessor = new ClientOnResponseProcessor(mergeMsgMap, super.getFutures(), getTransactionMessageHandler());
    super.registerProcessor(MessageType.TYPE_SEATA_MERGE_RESULT, onResponseProcessor, null);
    super.registerProcessor(MessageType.TYPE_GLOBAL_BEGIN_RESULT, onResponseProcessor, null);
    super.registerProcessor(MessageType.TYPE_GLOBAL_COMMIT_RESULT, onResponseProcessor, null);
    super.registerProcessor(MessageType.TYPE_GLOBAL_REPORT_RESULT, onResponseProcessor, null);
    super.registerProcessor(MessageType.TYPE_GLOBAL_ROLLBACK_RESULT, onResponseProcessor, null);
    super.registerProcessor(MessageType.TYPE_GLOBAL_STATUS_RESULT, onResponseProcessor, null);
    super.registerProcessor(MessageType.TYPE_REG_CLT_RESULT, onResponseProcessor, null);
    // Heartbeat processor: the TM-TC link is monitored with periodic heartbeat messages
    ClientHeartbeatProcessor clientHeartbeatProcessor = new ClientHeartbeatProcessor();
    super.registerProcessor(MessageType.TYPE_HEARTBEAT_MSG, clientHeartbeatProcessor, null);
}
调用父类初始化方法。可以看到,这里利用定时任务线程池timerExecutor按固定间隔不断地尝试与TC重连,保证TM与TC之间的通路正常。
/**
 * Starts the client's background machinery.
 *
 * It schedules clientChannelManager.reconnect(...) for this transaction service group at a fixed rate
 * (first after SCHEDULE_DELAY_MILLS, then every SCHEDULE_INTERVAL_MILLS milliseconds). When client batch
 * sending is enabled, it starts a MAX_MERGE_SEND_THREAD-sized pool running MergedSendRunnable. It then
 * calls super.init() and starts the Netty client bootstrap.
 */
@Override
public void init() {
    timerExecutor.scheduleAtFixedRate(new Runnable() {
        @Override
        public void run() {
            clientChannelManager.reconnect(getTransactionServiceGroup()); // periodic reconnect keeps channels to the TC available
        }
    }, SCHEDULE_DELAY_MILLS, SCHEDULE_INTERVAL_MILLS, TimeUnit.MILLISECONDS);
    if (NettyClientConfig.isEnableClientBatchSendRequest()) {
        // Batch ("merged") sending enabled: presumably MergedSendRunnable drains and sends merged requests
        mergeSendExecutorService = new ThreadPoolExecutor(MAX_MERGE_SEND_THREAD,
            MAX_MERGE_SEND_THREAD,
            KEEP_ALIVE_TIME, TimeUnit.MILLISECONDS,
            new LinkedBlockingQueue<>(),
            new NamedThreadFactory(getThreadPrefix(), MAX_MERGE_SEND_THREAD));
        mergeSendExecutorService.submit(new MergedSendRunnable());
    }
    super.init();
    clientBootstrap.start();
}
不断尝试重连,保持通路:
/**
 * Makes sure a channel exists to every available TC server of the given transaction service group.
 *
 * If looking up the server list throws, the error is logged and the method returns. If the list is empty,
 * it logs why and returns without connecting. The reason is either a missing service-group-to-cluster
 * mapping, or (for non-file registries) no running server found in the cluster. Otherwise it calls
 * acquireChannel for every address, logging rather than propagating per-address failures.
 *
 * @param transactionServiceGroup transaction service group name from configuration
 */
void reconnect(String transactionServiceGroup) {
    List<String> availList = null;
    try {
        availList = getAvailServerList(transactionServiceGroup); // available TC addresses for this service group
    } catch (Exception e) {
        LOGGER.error("Failed to get available servers: {}", e.getMessage(), e);
        return;
    }
    if (CollectionUtils.isEmpty(availList)) { // no address: log whether config or server availability is the cause, then skip connecting
        RegistryService registryService = RegistryFactory.getInstance();
        String clusterName = registryService.getServiceGroup(transactionServiceGroup);
        if (StringUtils.isBlank(clusterName)) { // no cluster mapped to this service group: configuration problem
            LOGGER.error("can not get cluster name in registry config '{}{}', please make sure registry config correct",
                ConfigurationKeys.SERVICE_GROUP_MAPPING_PREFIX,
                transactionServiceGroup);
            return;
        }
        if (!(registryService instanceof FileRegistryServiceImpl)) { // cluster known but empty: server may be down
            LOGGER.error("no available service found in cluster '{}', please make sure registry config correct and keep your seata server running", clusterName);
        }
        return;
    }
    for (String serverAddress : availList) { // try every available address
        try {
            acquireChannel(serverAddress); // connect (or reuse an alive channel)
        } catch (Exception e) {
            LOGGER.error("{} can not connect to {} cause:{}",FrameworkErrorCode.NetConnect.getErrCode(), serverAddress, e.getMessage(), e);
        }
    }
}
请求连接TC网络地址:
/**
 * Returns a usable channel to the given TC address, connecting if necessary.
 *
 * Fast path: if a cached channel exists and getExistAliveChannel returns it, reuse it. Otherwise connection
 * attempts for the same address are serialized on a per-address lock object from channelLocks, and
 * doConnect is called.
 */
Channel acquireChannel(String serverAddress) {
    Channel channelToServer = channels.get(serverAddress); // cached channel for this address, if any
    if (channelToServer != null) { // a channel was created earlier
        // Check whether it is still usable: return it if so, otherwise fall through and reconnect
        channelToServer = getExistAliveChannel(channelToServer, serverAddress);
        if (channelToServer != null) {
            return channelToServer;
        }
    }
    if (LOGGER.isInfoEnabled()) {
        LOGGER.info("will connect to " + serverAddress);
    }
    Object lockObj = CollectionUtils.computeIfAbsent(channelLocks, serverAddress, key -> new Object());
    synchronized (lockObj) { // at most one connect attempt per address at a time
        return doConnect(serverAddress);
    }
}
/**
 * Connects to the given TC address, reusing the cached channel when it is still active.
 *
 * Called by acquireChannel while it holds the per-address lock object. A new channel is borrowed from the
 * keyed Netty pool using the pool key stored for the address, and cached in {@code channels}.
 *
 * @param serverAddress TC address to connect to
 * @return an active channel to the address
 * @throws FrameworkException if the channel cannot be created; the original failure is kept as the cause
 */
private Channel doConnect(String serverAddress) {
    // Reuse the channel for this address if it exists and is still active
    Channel channelToServer = channels.get(serverAddress);
    if (channelToServer != null && channelToServer.isActive()) {
        return channelToServer;
    }
    // Otherwise borrow (create) a new channel from the keyed pool
    Channel channelFromPool;
    try {
        NettyPoolKey currentPoolKey = poolKeyFunction.apply(serverAddress);
        NettyPoolKey previousPoolKey = poolKeyMap.putIfAbsent(serverAddress, currentPoolKey);
        if (previousPoolKey != null && previousPoolKey.getMessage() instanceof RegisterRMRequest) {
            // A pool key with an RM registration already exists for this address: copy the current
            // resource ids onto it (presumably so the registration lists the resources known now)
            RegisterRMRequest registerRMRequest = (RegisterRMRequest) currentPoolKey.getMessage();
            ((RegisterRMRequest) previousPoolKey.getMessage()).setResourceIds(registerRMRequest.getResourceIds());
        }
        channelFromPool = nettyClientKeyPool.borrowObject(poolKeyMap.get(serverAddress));
        channels.put(serverAddress, channelFromPool); // cache the channel so later calls can reuse it
    } catch (Exception exx) {
        // NOTE(review): the log/exception text says "RM" although this path is shared with the TM client
        LOGGER.error("{} register RM failed.",FrameworkErrorCode.RegisterRM.getErrCode(), exx);
        // Keep exx as the cause so callers (and their logs) see the original stack trace,
        // not just its message
        throw new FrameworkException(exx, "can not register RM,err:" + exx.getMessage());
    }
    return channelFromPool;
}
以上就是TM客户端的初始化逻辑。
- 初始化RM客户端:
RM客户端的初始化其实与TM客户端的初始化逻辑大体一致,因此接下来在源码中遇到相同的逻辑,便不再赘述了
/**
 * Obtains the singleton RM Netty client for the given application and service group, wires in the
 * default resource manager and the default RM message handler, and initializes it.
 *
 * The message handler is what receives the TC's phase-two commit/rollback requests.
 */
public static void init(String applicationId, String transactionServiceGroup) {
    RmNettyRemotingClient client = RmNettyRemotingClient.getInstance(applicationId, transactionServiceGroup);
    // Dependencies must be set before init()
    client.setResourceManager(DefaultResourceManager.get());
    client.setTransactionMessageHandler(DefaultRMHandler.get());
    client.init();
}
/**
 * Initializes the RM client.
 *
 * Processors are registered on every call. On the first call only, the parent initialization runs.
 * If resources were registered before initialization and a service group is set, it then connects to
 * the TC immediately instead of waiting for the scheduled reconnect.
 */
@Override
public void init() {
    registerProcessor();
    if (!initialized.compareAndSet(false, true)) {
        return;
    }
    super.init();
    // Found one or more resources that were registered before initialization
    boolean hasEarlyResources = resourceManager != null
        && !resourceManager.getManagedResources().isEmpty()
        && StringUtils.isNotBlank(transactionServiceGroup);
    if (hasEarlyResources) {
        getClientChannelManager().reconnect(transactionServiceGroup);
    }
}
以上便是TM、RM客户端初始化,以及连接Seata服务端的整体流程。
- 一阶段提交:
TM、RM已初始化完成后,下一步则是分布式事务的真正核心流程。如果你使用过Seata框架的AT模式,会发现在使用层面上的成本非常小,只需要在总的分布式事务执行入口方法加上注解@GlobalTransactional。有注解,相对应的就会有拦截器,接下来重点关注GlobalTransactionalInterceptor类的invoke拦截方法:
/**
 * AOP interceptor entry point for methods that may carry @GlobalTransactional or @GlobalLock.
 *
 * It resolves the most specific (bridged) target method and looks up both annotations. Unless global
 * transactions are locally disabled, it dispatches to handleGlobalTransaction (which takes precedence) or
 * handleGlobalLock. Otherwise, or when neither annotation is present, it just proceeds with the original
 * invocation.
 */
@Override
public Object invoke(final MethodInvocation methodInvocation) throws Throwable {
    Class<?> targetClass = methodInvocation.getThis() != null ? AopUtils.getTargetClass(methodInvocation.getThis()) : null;
    Method specificMethod = ClassUtils.getMostSpecificMethod(methodInvocation.getMethod(), targetClass);
    if (specificMethod != null && !specificMethod.getDeclaringClass().equals(Object.class)) { // skip methods declared on Object
        final Method method = BridgeMethodResolver.findBridgedMethod(specificMethod);
        final GlobalTransactional globalTransactionalAnnotation = getAnnotation(method, targetClass, GlobalTransactional.class); // @GlobalTransactional on the intercepted method, or null
        final GlobalLock globalLockAnnotation = getAnnotation(method, targetClass, GlobalLock.class);
        // Disabled by configuration, or (when degrade checking is on) degradeNum reached the allowed threshold
        boolean localDisable = disable || (degradeCheck && degradeNum >= degradeCheckAllowTimes);
        if (!localDisable) {
            if (globalTransactionalAnnotation != null) { // annotated with @GlobalTransactional: run as a global transaction
                return handleGlobalTransaction(methodInvocation, globalTransactionalAnnotation);
            } else if (globalLockAnnotation != null) {
                return handleGlobalLock(methodInvocation, globalLockAnnotation);
            }
        }
    }
    return methodInvocation.proceed(); // disabled or not annotated: run the original method unchanged
}
/**
 * Runs the intercepted method as a global transaction via transactionalTemplate.
 *
 * The anonymous TransactionalExecutor:
 * - execute() proceeds with the original invocation;
 * - name() is the annotation's name, or the formatted method when the name is empty;
 * - getTransactionInfo() copies timeout, propagation, lock-retry settings and rollback rules from the
 *   annotation.
 *
 * An ExecutionException is mapped by its code:
 * - RollbackDone rethrows the business method's original exception.
 * - BeginFailure and CommitFailure notify failureHandler, mark the call as not succeeded, and rethrow
 *   the cause.
 * - RollbackFailure and RollbackRetrying notify failureHandler and rethrow the original exception.
 * When degradeCheck is on, a DegradeCheckEvent carrying the succeed flag is always posted at the end.
 */
Object handleGlobalTransaction(final MethodInvocation methodInvocation, final GlobalTransactional globalTrxAnno) throws Throwable {
    boolean succeed = true;
    try {
        // Describe the business logic as a TransactionalExecutor and let the template run it transactionally
        return transactionalTemplate.execute(new TransactionalExecutor() {
            @Override
            public Object execute() throws Throwable {
                return methodInvocation.proceed(); // run the original method
            }
            public String name() {
                String name = globalTrxAnno.name();
                if (!StringUtils.isNullOrEmpty(name)) {
                    return name;
                }
                return formatMethod(methodInvocation.getMethod()); // fallback name derived from the method
            }
            @Override
            public TransactionInfo getTransactionInfo() {
                // Use the annotation's timeout unless it is non-positive or equal to the built-in default,
                // in which case the configured defaultGlobalTransactionTimeout applies
                int timeout = globalTrxAnno.timeoutMills();
                if (timeout <= 0 || timeout == DEFAULT_GLOBAL_TRANSACTION_TIMEOUT) {
                    timeout = defaultGlobalTransactionTimeout;
                }
                // Copy the remaining user-defined settings from the annotation
                TransactionInfo transactionInfo = new TransactionInfo();
                transactionInfo.setTimeOut(timeout);
                transactionInfo.setName(name());
                transactionInfo.setPropagation(globalTrxAnno.propagation());
                transactionInfo.setLockRetryInternal(globalTrxAnno.lockRetryInternal());
                transactionInfo.setLockRetryTimes(globalTrxAnno.lockRetryTimes());
                // Rollback rules keep declaration order: rollbackFor, rollbackForClassName, noRollbackFor, noRollbackForClassName
                Set<RollbackRule> rollbackRules = new LinkedHashSet<>();
                for (Class<?> rbRule : globalTrxAnno.rollbackFor()) {
                    rollbackRules.add(new RollbackRule(rbRule));
                }
                for (String rbRule : globalTrxAnno.rollbackForClassName()) {
                    rollbackRules.add(new RollbackRule(rbRule));
                }
                for (Class<?> rbRule : globalTrxAnno.noRollbackFor()) {
                    rollbackRules.add(new NoRollbackRule(rbRule));
                }
                for (String rbRule : globalTrxAnno.noRollbackForClassName()) {
                    rollbackRules.add(new NoRollbackRule(rbRule));
                }
                transactionInfo.setRollbackRules(rollbackRules);
                return transactionInfo;
            }
        });
    } catch (TransactionalExecutor.ExecutionException e) {
        // The transaction did not complete normally; translate by failure code
        TransactionalExecutor.Code code = e.getCode();
        switch (code) {
            case RollbackDone:
                throw e.getOriginalException();
            case BeginFailure:
                succeed = false;
                failureHandler.onBeginFailure(e.getTransaction(), e.getCause());
                throw e.getCause();
            case CommitFailure:
                succeed = false;
                failureHandler.onCommitFailure(e.getTransaction(), e.getCause());
                throw e.getCause();
            case RollbackFailure:
                failureHandler.onRollbackFailure(e.getTransaction(), e.getOriginalException());
                throw e.getOriginalException();
            case RollbackRetrying:
                failureHandler.onRollbackRetrying(e.getTransaction(), e.getOriginalException());
                throw e.getOriginalException();
            default:
                throw new ShouldNeverHappenException(String.format("Unknown TransactionalExecutor.Code: %s", code));
        }
    } finally {
        if (degradeCheck) {
            EVENT_BUS.post(new DegradeCheckEvent(succeed));
        }
    }
}
从上面的handleGlobalTransaction方法中可以看到,其中主要是定义了事务执行器的各种方法,而真正开启并执行事务的动作,还是通过transactionalTemplate.execute方法:
/**
 * Runs a business method inside a global transaction according to its propagation setting.
 *
 * Flow:
 * 1. Read the TransactionInfo and the current GlobalTransaction.
 * 2. Apply the propagation rules, which may suspend an existing transaction, run without one, or throw.
 * 3. Create a new transaction object if none exists.
 * 4. begin, then execute, then commit. If the business method throws, completeTransactionAfterThrowing
 *    decides how to finish the transaction, and the exception is rethrown.
 * A transaction suspended here is resumed in the outer finally block.
 *
 * @param business the business logic plus its transaction metadata
 * @return the business method's result
 * @throws Throwable whatever the business method or the transaction steps throw
 */
public Object execute(TransactionalExecutor business) throws Throwable {
    TransactionInfo txInfo = business.getTransactionInfo(); // transaction metadata for this call
    if (txInfo == null) {
        throw new ShouldNeverHappenException("transactionInfo does not exist");
    }
    GlobalTransaction tx = GlobalTransactionContext.getCurrent(); // current global transaction, if any (presumably resolved from the bound XID)
    Propagation propagation = txInfo.getPropagation(); // propagation behavior requested for this call
    SuspendedResourcesHolder suspendedResourcesHolder = null;
    try {
        // Apply the propagation semantics
        switch (propagation) {
            case NOT_SUPPORTED:
                // If transaction is existing, suspend it.
                if (existingTransaction(tx)) {
                    suspendedResourcesHolder = tx.suspend();
                }
                // Execute without transaction and return.
                return business.execute();
            case REQUIRES_NEW:
                // If transaction is existing, suspend it, and then begin new transaction.
                // (If none exists, tx stays null and a new one is created below.)
                if (existingTransaction(tx)) {
                    suspendedResourcesHolder = tx.suspend();
                    tx = GlobalTransactionContext.createNew();
                }
                // Continue and execute with new transaction
                break;
            case SUPPORTS:
                // If transaction is not existing, execute without transaction.
                if (notExistingTransaction(tx)) {
                    return business.execute();
                }
                // A transaction exists: continue and execute with the current transaction
                break;
            case REQUIRED:
                // If current transaction is existing, execute with current transaction,
                // else continue and execute with new transaction.
                break;
            case NEVER:
                // If transaction is existing, throw exception.
                if (existingTransaction(tx)) {
                    throw new TransactionException(
                        String.format("Existing transaction found for transaction marked with propagation 'never', xid = %s"
                            , tx.getXid()));
                } else {
                    // Execute without transaction and return.
                    return business.execute();
                }
            case MANDATORY:
                // If transaction is not existing, throw exception.
                if (notExistingTransaction(tx)) {
                    throw new TransactionException("No existing transaction found for transaction marked with propagation 'mandatory'");
                }
                // Continue and execute with current transaction.
                break;
            default:
                throw new TransactionException("Not Supported Propagation:" + propagation);
        }
        if (tx == null) { // no transaction object yet: create one (this call will launch the global transaction)
            tx = GlobalTransactionContext.createNew();
        }
        GlobalLockConfig previousConfig = replaceGlobalLockConfig(txInfo); // apply this call's lock config, remember the old one
        try {
            beginTransaction(txInfo, tx); // begin the global transaction (key step)
            Object rs;
            try {
                rs = business.execute(); // run the original method
            } catch (Throwable ex) {
                completeTransactionAfterThrowing(txInfo, tx, ex); // business method failed: finish the transaction accordingly (key step)
                throw ex;
            }
            commitTransaction(tx); // business method succeeded: commit (key step)
            return rs;
        } finally {
            // clear
            resumeGlobalLockConfig(previousConfig);
            triggerAfterCompletion();
            cleanUp();
        }
    } finally {
        // If the transaction is suspended, resume it.
        if (suspendedResourcesHolder != null) {
            tx.resume(suspendedResourcesHolder);
        }
    }
}
开启事务:
/**
 * Begins the global transaction, running the before/after-begin hooks around it.
 *
 * @throws TransactionalExecutor.ExecutionException with code BeginFailure if beginning throws a TransactionException
 */
private void beginTransaction(TransactionInfo txInfo, GlobalTransaction tx) throws TransactionalExecutor.ExecutionException {
    try {
        triggerBeforeBegin(); // before-begin hooks
        tx.begin(txInfo.getTimeOut(), txInfo.getName()); // begin with the configured timeout and name
        triggerAfterBegin(); // after-begin hooks
    } catch (TransactionException txe) {
        throw new TransactionalExecutor.ExecutionException(tx, txe,TransactionalExecutor.Code.BeginFailure);
    }
}
/**
 * Begins the global transaction on the TC if this GlobalTransaction is the launcher.
 *
 * A non-launcher (participant) only asserts that it already has an XID and returns. A launcher:
 * 1. asserts it has no XID yet;
 * 2. refuses to start if RootContext already has an XID bound;
 * 3. asks the transaction manager to begin, which returns the new XID;
 * 4. sets status to Begin and binds the XID to RootContext.
 *
 * @param timeout transaction timeout, passed through to the transaction manager
 * @param name    transaction name
 * @throws IllegalStateException if an XID is already bound in RootContext
 * @throws TransactionException  if the transaction manager fails to begin
 */
@Override
public void begin(int timeout, String name) throws TransactionException {
    if (role != GlobalTransactionRole.Launcher) { // participant: the transaction was started elsewhere
        assertXIDNotNull();
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("Ignore Begin(): just involved in global transaction [{}]", xid);
        }
        return;
    }
    assertXIDNull();
    String currentXid = RootContext.getXID();
    if (currentXid != null) {
        throw new IllegalStateException("Global transaction already exists can't begin a new global transaction, currentXid = " + currentXid);
    }
    xid = transactionManager.begin(null, null, name, timeout); // ask the TC to open the global transaction; returns its XID
    status = GlobalStatus.Begin; // mark as begun
    RootContext.bind(xid); // bind the XID to the current context
    if (LOGGER.isInfoEnabled()) {
        LOGGER.info("Begin new global transaction [{}]", xid);
    }
}
/**
 * Sends a GlobalBeginRequest to the TC synchronously and returns the XID it assigns.
 *
 * Only the name and timeout are put into the request; applicationId and transactionServiceGroup are not
 * used by this implementation.
 *
 * @throws TmTransactionException with code BeginFailed if the TC answers with ResultCode.Failed
 */
@Override
public String begin(String applicationId, String transactionServiceGroup, String name, int timeout)
    throws TransactionException {
    GlobalBeginRequest request = new GlobalBeginRequest();
    request.setTransactionName(name);
    request.setTimeout(timeout);
    GlobalBeginResponse response = (GlobalBeginResponse) syncCall(request); // synchronous call; the response carries the XID
    if (response.getResultCode() == ResultCode.Failed) {
        throw new TmTransactionException(TransactionExceptionCode.BeginFailed, response.getMsg());
    }
    return response.getXid();
}
以上便是开启事务的源码逻辑,而之后的提交/回滚其实也是同样的操作:封装请求、发送请求、根据返回的响应码处理结果,这部分逻辑都是一致的,在此不再赘述。
但是,到此一阶段的核心才刚刚开始。Seata的AT模式依赖于底层数据库的本地事务。它之所以能够回滚本地分支事务,是因为它对原本的数据源加了一层代理:在代理中记录SQL执行前后的数据镜像并生成回滚日志(UndoLog),回滚时据此把数据恢复。Seata所代理的类主要有:DataSourceProxy(数据源)、ConnectionProxy(数据库连接)、StatementProxy与PreparedStatementProxy(SQL语句)。
其中我们需要重点关注的是SQL执行过程中的SQL解析部分,也就是Statement相关的代理类。此处选择最常用的PreparedStatementProxy.execute方法进行源码分析(其他方法最终入口都是一致的):
/**
 * Excerpt of Seata's PreparedStatementProxy (other members elided with "...").
 *
 * execute() is routed through ExecuteTemplate, which parses the SQL and picks an executor that can record
 * before/after images for the undo log.
 */
public class PreparedStatementProxy extends AbstractPreparedStatementProxy implements PreparedStatement, ParametersHolder {
    ...
    @Override
    public boolean execute() throws SQLException {
        return ExecuteTemplate.execute(this, (statement, args) -> statement.execute()); // delegate to ExecuteTemplate; the callback runs execute() on the real statement
    }
    ...
}
/**
 * Convenience overload that passes no pre-parsed SQL recognizers, so the list-taking overload parses the
 * target SQL itself.
 */
public static <T, S extends Statement> T execute(StatementProxy<S> statementProxy,
                                                 StatementCallback<T, S> statementCallback,
                                                 Object... args) throws SQLException {
    return execute(null, statementProxy, statementCallback, args);
}
/**
 * Executes a statement through the executor that matches its SQL type.
 *
 * When neither a global lock is required nor the current branch type is AT, the SQL runs directly on the
 * target statement. Otherwise the SQL is parsed (unless recognizers were supplied) and dispatched by type:
 * - INSERT: dialect-specific executor loaded by dbType;
 * - UPDATE, DELETE, SELECT_FOR_UPDATE: the matching executor;
 * - anything else, or unrecognized SQL: plain executor;
 * - more than one recognized statement: MultiExecutor.
 * Any non-SQLException thrown during execution is wrapped in a SQLException.
 */
public static <T, S extends Statement> T execute(List<SQLRecognizer> sqlRecognizers,
                                                 StatementProxy<S> statementProxy,
                                                 StatementCallback<T, S> statementCallback,
                                                 Object... args) throws SQLException {
    if (!RootContext.requireGlobalLock() && BranchType.AT != RootContext.getBranchType()) { // no global lock required and not an AT branch
        return statementCallback.execute(statementProxy.getTargetStatement(), args); // nothing to record: run the SQL on the original statement
    }
    String dbType = statementProxy.getConnectionProxy().getDbType();
    if (CollectionUtils.isEmpty(sqlRecognizers)) {
        // Parse the SQL into recognizers for this database type (parsing differs between SQL dialects)
        sqlRecognizers = SQLVisitorFactory.get(statementProxy.getTargetSQL(), dbType);
    }
    Executor<T> executor;
    if (CollectionUtils.isEmpty(sqlRecognizers)) {
        executor = new PlainExecutor<>(statementProxy, statementCallback);
    } else {
        if (sqlRecognizers.size() == 1) {
            SQLRecognizer sqlRecognizer = sqlRecognizers.get(0);
            // Pick the executor by SQL type
            switch (sqlRecognizer.getSQLType()) {
                case INSERT:
                    executor = EnhancedServiceLoader.load(InsertExecutor.class, dbType,
                            new Class[]{StatementProxy.class, StatementCallback.class, SQLRecognizer.class},
                            new Object[]{statementProxy, statementCallback, sqlRecognizer});
                    break;
                case UPDATE:
                    executor = new UpdateExecutor<>(statementProxy, statementCallback, sqlRecognizer);
                    break;
                case DELETE:
                    executor = new DeleteExecutor<>(statementProxy, statementCallback, sqlRecognizer);
                    break;
                case SELECT_FOR_UPDATE:
                    executor = new SelectForUpdateExecutor<>(statementProxy, statementCallback, sqlRecognizer);
                    break;
                default:
                    executor = new PlainExecutor<>(statementProxy, statementCallback);
                    break;
            }
        } else {
            executor = new MultiExecutor<>(statementProxy, statementCallback, sqlRecognizers);
        }
    }
    T rs;
    try {
        rs = executor.execute(args); // run the SQL through the chosen executor
    } catch (Throwable ex) {
        if (!(ex instanceof SQLException)) {
            // Turn other exception into SQLException
            ex = new SQLException(ex);
        }
        throw (SQLException) ex;
    }
    return rs;
}
调用SQL执行器解析并执行SQL:
/**
 * Prepares the connection proxy for the current global context and runs doExecute.
 *
 * If an XID is bound in RootContext it is bound to the connection proxy. The proxy's "global lock
 * required" flag is set from RootContext on every call.
 */
@Override
public T execute(Object... args) throws Throwable {
    // Bind the current global transaction id (if any) to the proxied connection
    String xid = RootContext.getXID();
    if (xid != null) {
        statementProxy.getConnectionProxy().bind(xid);
    }
    statementProxy.getConnectionProxy().setGlobalLockRequire(RootContext.requireGlobalLock());
    return doExecute(args);
}
/**
 * Chooses the execution path from the connection's auto-commit mode: executeAutoCommitTrue when
 * auto-commit is on, otherwise executeAutoCommitFalse.
 */
@Override
public T doExecute(Object... args) throws Throwable {
    AbstractConnectionProxy connectionProxy = statementProxy.getConnectionProxy();
    return connectionProxy.getAutoCommit()
        ? executeAutoCommitTrue(args)
        : executeAutoCommitFalse(args);
}
doExecute方法中可以看到两个方法,一个是属于事务自动提交的,另一个是属于事务手动提交的。这两个方法有何差异,先来看下自动提交的executeAutoCommitTrue:
/**
 * Runs the statement on an auto-commit connection so that the business SQL and its undo log are
 * committed together.
 *
 * Auto-commit is switched off temporarily. executeAutoCommitFalse plus an explicit commit then run under
 * a LockRetryPolicy. On failure the error is logged; unless the policy is configured to roll back the
 * branch on lock conflict, the target connection is rolled back, and the exception is rethrown. The
 * connection context is always reset and auto-commit restored to true.
 */
protected T executeAutoCommitTrue(Object[] args) throws Throwable {
    ConnectionProxy connectionProxy = statementProxy.getConnectionProxy();
    try {
        connectionProxy.changeAutoCommit(); // switch auto-commit off for this execution
        return new LockRetryPolicy(connectionProxy).execute(() -> {
            T result = executeAutoCommitFalse(args); // parse/execute the SQL and prepare the undo log
            connectionProxy.commit(); // then commit explicitly
            return result;
        });
    } catch (Exception e) {
        // when exception occur in finally,this exception will lost, so just print it here
        LOGGER.error("execute executeAutoCommitTrue error:{}", e.getMessage(), e);
        if (!LockRetryPolicy.isLockRetryPolicyBranchRollbackOnConflict()) {
            connectionProxy.getTargetConnection().rollback();
        }
        throw e;
    } finally {
        connectionProxy.getContext().reset();
        connectionProxy.setAutoCommit(true); // restore auto-commit after the statement
    }
}
从自动提交事务的方法中看到,最终还是调用了手动提交事务的方法来解析执行SQL。原因是:在执行业务SQL前后需要生成数据镜像(beforeImage/afterImage),并据此构造回滚日志(UndoLog),而业务SQL与UndoLog必须在同一个本地事务中提交。因此需要在自动提交的方法中,先将自动提交改为false,执行完操作并手动提交后,再恢复为自动提交。
因此,重点又到了手动提交事务的方法executeAutoCommitFalse:
/**
 * Executes the SQL between a before-image and an after-image snapshot, then prepares the undo log.
 *
 * @throws NotSupportYetException if the table has a multi-column primary key and the database is not MySQL
 */
protected T executeAutoCommitFalse(Object[] args) throws Exception {
    if (!JdbcConstants.MYSQL.equalsIgnoreCase(getDbType()) && isMultiPk()) {
        throw new NotSupportYetException("multi pk only support mysql!");
    }
    TableRecords beforeImage = beforeImage(); // data image before the SQL runs
    T result = statementCallback.execute(statementProxy.getTargetStatement(), args); // run the business SQL
    TableRecords afterImage = afterImage(beforeImage); // data image after the SQL ran
    prepareUndoLog(beforeImage, afterImage); // build the undo log used to roll this change back
    return result;
}
到此,一阶段的部分的源码分析就结束了,这部分主要是针对于TM、RM如何初始化连接TC服务器,以及Seata在数据库底层解析执行SQL的过程代理操作源码进行了分析。
- 二阶段提交:
执行完业务SQL并生成UndoLog后,接着就是在ConnectionProxy中进行提交commit。严格来说,这一步仍属于一阶段:向TC注册分支事务,并将业务数据与UndoLog在同一个本地事务中提交。真正的二阶段由TC驱动,见后文的消息回调处理:
/**
 * Commits through LOCK_RETRY_POLICY.
 *
 * On a SQLException it rolls back, but only if there is a target connection, auto-commit is off, and
 * auto-commit was not changed by the executor (i.e. the rollback is left to executeAutoCommitTrue in that
 * case); it then rethrows. Any other exception is wrapped in a SQLException.
 */
@Override
public void commit() throws SQLException {
    try {
        LOCK_RETRY_POLICY.execute(() -> {
            doCommit(); // actual commit logic
            return null;
        });
    } catch (SQLException e) {
        if (targetConnection != null && !getAutoCommit() && !getContext().isAutoCommitChanged()) {
            rollback(); // roll back on failure
        }
        throw e;
    } catch (Exception e) {
        throw new SQLException(e);
    }
}
- 提交:
/**
 * Commits according to the connection context:
 * - inside a global transaction: the global-transaction branch commit path;
 * - not in a global transaction but a global lock is required (e.g. under @GlobalLock): local commit
 *   with global-lock checks;
 * - otherwise: a plain local commit.
 */
private void doCommit() throws SQLException {
    if (context.inGlobalTransaction()) {
        processGlobalTransactionCommit();
        return;
    }
    if (context.isGlobalLockRequire()) {
        processLocalCommitWithGlobalLocks();
        return;
    }
    targetConnection.commit();
}
/**
 * Phase-one commit of an AT branch.
 *
 * Steps:
 * 1. Register the branch with the TC.
 * 2. Write the undo log, then commit the local transaction.
 * 3. Report the result: on failure, report PhaseOne_Failed and throw; on success, report only when
 *    IS_REPORT_SUCCESS_ENABLE is set.
 * The connection context is reset after a successful commit.
 */
private void processGlobalTransactionCommit() throws SQLException {
    try {
        register(); // register the branch with the TC
    } catch (TransactionException e) {
        // NOTE(review): presumably converts a lock-key conflict into a SQLException and throws; body not shown
        recognizeLockKeyConflictException(e, context.buildLockKeys());
    }
    try {
        UndoLogManagerFactory.getUndoLogManager(this.getDbType()).flushUndoLogs(this); // persist the undo log
        targetConnection.commit(); // commit the local transaction (business data + undo log)
    } catch (Throwable ex) {
        LOGGER.error("process connectionProxy commit error: {}", ex.getMessage(), ex);
        report(false); // local commit failed: report branch phase-one failure to the TC
        throw new SQLException(ex);
    }
    if (IS_REPORT_SUCCESS_ENABLE) {
        report(true); // local commit succeeded: report phase-one success to the TC
    }
    context.reset(); // branch done: clear the connection context
}
RM向TC注册分支事务:
/**
 * Registers an AT branch with the TC when this connection recorded both undo log and lock keys.
 *
 * The branch id returned by the resource manager is stored in the connection context. If either piece
 * is missing, nothing is registered.
 */
private void register() throws TransactionException {
    boolean hasBranchWork = context.hasUndoLog() && context.hasLockKey();
    if (hasBranchWork) {
        Long branchId = DefaultResourceManager.get().branchRegister(BranchType.AT, getDataSourceProxy().getResourceId(),
            null, context.getXid(), null, context.buildLockKeys());
        context.setBranchId(branchId);
    }
}
UndoLog入库:
/**
 * Serializes the connection's undo items and inserts them as one undo-log row on the target connection.
 *
 * Does nothing when the context has no undo log. The encoded content is compressed with
 * ROLLBACK_INFO_COMPRESS_TYPE when needCompress says so; the parser name and compressor type are stored
 * in the row's context.
 */
@Override
public void flushUndoLogs(ConnectionProxy cp) throws SQLException {
    ConnectionContext connectionContext = cp.getContext();
    if (!connectionContext.hasUndoLog()) {
        return;
    }
    // Build the branch undo log from the transaction info
    String xid = connectionContext.getXid();
    long branchId = connectionContext.getBranchId();
    BranchUndoLog branchUndoLog = new BranchUndoLog();
    branchUndoLog.setXid(xid);
    branchUndoLog.setBranchId(branchId);
    branchUndoLog.setSqlUndoLogs(connectionContext.getUndoItems());
    UndoLogParser parser = UndoLogParserFactory.getInstance();
    byte[] undoLogContent = parser.encode(branchUndoLog);
    CompressorType compressorType = CompressorType.NONE;
    if (needCompress(undoLogContent)) {
        compressorType = ROLLBACK_INFO_COMPRESS_TYPE;
        undoLogContent = CompressorFactory.getCompressor(compressorType.getCode()).compress(undoLogContent);
    }
    if (LOGGER.isDebugEnabled()) {
        // NOTE(review): when compression was applied, this decodes compressed bytes and logs unreadable text
        LOGGER.debug("Flushing UNDO LOG: {}", new String(undoLogContent, Constants.DEFAULT_CHARSET));
    }
    insertUndoLogWithNormal(xid, branchId, buildContext(parser.getName(), compressorType), undoLogContent, cp.getTargetConnection()); // insert the undo-log row
}
RM向TC汇报本地事务提交情况:
/**
 * Reports this branch's phase-one result to the TC, retrying on failure.
 *
 * Does nothing when no branch id has been registered. Sends PhaseOne_Done when commitDone is true and
 * PhaseOne_Failed otherwise. Up to REPORT_RETRY_COUNT attempts are made, with no delay between them.
 *
 * @param commitDone whether the local commit succeeded
 * @throws SQLException if every attempt fails; the last failure is attached as the cause
 */
private void report(boolean commitDone) throws SQLException {
    if (context.getBranchId() == null) {
        return;
    }
    int retry = REPORT_RETRY_COUNT; // remaining attempts
    while (retry > 0) {
        try {
            DefaultResourceManager.get().branchReport(BranchType.AT, context.getXid(), context.getBranchId(),
                commitDone ? BranchStatus.PhaseOne_Done : BranchStatus.PhaseOne_Failed, null);
            return;
        } catch (Throwable ex) {
            // Pass ex as the trailing argument so every failed attempt's stack trace is logged;
            // the previous string-concatenated message discarded it.
            LOGGER.error("Failed to report [{}/{}] commit done [{}] Retry Countdown: {}",
                context.getBranchId(), context.getXid(), commitDone, retry, ex);
            retry--;
            if (retry == 0) { // attempts exhausted
                throw new SQLException("Failed to report branch status " + commitDone, ex);
            }
        }
    }
}
到此,提交的执行逻辑似乎已经完毕。其实不然,这一步完成的只是RM向TC汇报本地分支事务的提交情况。那么,TC在全局事务结束后下发的二阶段请求(提交时异步删除UndoLog、回滚时恢复数据)又是如何处理的呢?
在本文源码分析的开头部分介绍了RM客户端的初始化,其中有一步:“设置消息回调监听器,用于接收服务器端在二阶段发出的提交/回滚请求”(向上翻看)。这个所谓的消息回调监听器,就是用于处理TC下发的二阶段提交/回滚请求的:
/**
 * Excerpt of Seata's AbstractRMHandler: the RM-side handler for phase-two requests sent by the TC
 * (other members elided with "...").
 *
 * Each request is handled inside exceptionHandleTemplate, which fills the given response object.
 */
public abstract class AbstractRMHandler extends AbstractExceptionHandler implements RMInboundHandler, TransactionMessageHandler {
    private static final Logger LOGGER = LoggerFactory.getLogger(AbstractRMHandler.class);
    /**
     * Handles a branch commit request from the TC by delegating to doBranchCommit.
     */
    @Override
    public BranchCommitResponse handle(BranchCommitRequest request) {
        BranchCommitResponse response = new BranchCommitResponse();
        exceptionHandleTemplate(new AbstractCallback<BranchCommitRequest, BranchCommitResponse>() {
            @Override
            public void execute(BranchCommitRequest request, BranchCommitResponse response) throws TransactionException {
                doBranchCommit(request, response);
            }
        }, request, response);
        return response;
    }
    /**
     * Handles a branch rollback request from the TC by delegating to doBranchRollback.
     */
    @Override
    public BranchRollbackResponse handle(BranchRollbackRequest request) {
        BranchRollbackResponse response = new BranchRollbackResponse();
        exceptionHandleTemplate(new AbstractCallback<BranchRollbackRequest, BranchRollbackResponse>() {
            @Override
            public void execute(BranchRollbackRequest request, BranchRollbackResponse response) throws TransactionException {
                doBranchRollback(request, response);
            }
        }, request, response);
        return response;
    }
    ...
}
全局事务执行成功(也就是当前TM所调用的RM分支事务全部执行并提交成功),分支事务提交。其实分支事务在汇报给TC之前就已经将本地事务提交了,因此在此处只需要将无用的UndoLog日志删除即可:
/**
 * Performs a phase-two branch commit requested by the TC.
 *
 * It delegates to the resource manager and copies xid, branch id and the resulting branch status into
 * the response.
 */
protected void doBranchCommit(BranchCommitRequest request, BranchCommitResponse response) throws TransactionException {
    String xid = request.getXid();
    long branchId = request.getBranchId();
    String resourceId = request.getResourceId();
    String applicationData = request.getApplicationData();
    if (LOGGER.isInfoEnabled()) {
        LOGGER.info("Branch committing: " + xid + " " + branchId + " " + resourceId + " " + applicationData);
    }
    BranchStatus status = getResourceManager().branchCommit(request.getBranchType(), xid, branchId, resourceId, applicationData); // branch commit (key step)
    response.setXid(xid);
    response.setBranchId(branchId);
    response.setBranchStatus(status);
    if (LOGGER.isInfoEnabled()) {
        LOGGER.info("Branch commit result: " + status);
    }
}
将事务信息加入异步队列,以异步的方式处理,提高效率:
/**
 * AT branch commit: hands the branch to the async worker, which queues it for undo-log cleanup.
 * branchType and applicationData are not used here.
 */
@Override
public BranchStatus branchCommit(BranchType branchType, String xid, long branchId, String resourceId, String applicationData) throws TransactionException {
    return asyncWorker.branchCommit(xid, branchId, resourceId); // enqueue for asynchronous processing
}
/**
 * Queues the committed branch for asynchronous undo-log deletion and immediately reports it as
 * PhaseTwo_Committed (the local data was already committed in phase one).
 */
public BranchStatus branchCommit(String xid, long branchId, String resourceId) {
    addToCommitQueue(new Phase2Context(xid, branchId, resourceId));
    return BranchStatus.PhaseTwo_Committed;
}
/**
 * Adds a phase-two context to the bounded commit queue.
 *
 * offer() returns false only when the queue is full; it does not check whether an equal element is
 * already present. In that case one drain pass (doBranchCommitSafely) is run asynchronously on
 * scheduledExecutor, and afterwards adding this context is retried.
 */
private void addToCommitQueue(Phase2Context context) {
    if (commitQueue.offer(context)) { // enqueued: done
        return;
    }
    // Queue full: drain it asynchronously, then try again (may repeat while the queue stays full)
    CompletableFuture
        .runAsync(this::doBranchCommitSafely, scheduledExecutor)
        .thenRun(() -> addToCommitQueue(context));
}
到此,只看到了将事务信息添加到异步队列中,那么阻塞队列中的事务信息如何处理?答案是在AsyncWorker的构造器中提供了计划任务线程池来处理:
/**
 * Creates the async commit worker.
 *
 * The commit queue is a LinkedBlockingQueue bounded by ASYNC_COMMIT_BUFFER_LIMIT. A 2-thread scheduled
 * pool runs doBranchCommitSafely every 1000 ms, starting 10 ms after construction.
 *
 * @param dataSourceManager used to look up the DataSourceProxy for a resource id
 */
public AsyncWorker(DataSourceManager dataSourceManager) {
    this.dataSourceManager = dataSourceManager;
    LOGGER.info("Async Commit Buffer Limit: {}", ASYNC_COMMIT_BUFFER_LIMIT);
    commitQueue = new LinkedBlockingQueue<>(ASYNC_COMMIT_BUFFER_LIMIT);
    ThreadFactory threadFactory = new NamedThreadFactory("AsyncWorker", 2, true); // third argument presumably marks threads as daemon; confirm
    scheduledExecutor = new ScheduledThreadPoolExecutor(2, threadFactory); // scheduled pool that drains the queue
    scheduledExecutor.scheduleAtFixedRate(this::doBranchCommitSafely, 10, 1000, TimeUnit.MILLISECONDS); // periodic drain of queued branch commits
}
真正处理提交事务信息的方法入口:
/**
 * Runs one drain pass of the commit queue, logging any Throwable instead of propagating it.
 * Propagating would cancel further runs of the periodic task scheduled with scheduleAtFixedRate.
 */
void doBranchCommitSafely() {
    try {
        doBranchCommit();
    } catch (Throwable e) {
        LOGGER.error("Exception occur when doing branch commit", e);
    }
}
/**
 * Drains everything currently in the commit queue, groups the contexts by resource id (one group per
 * data source), and deletes each group's undo logs.
 */
private void doBranchCommit() {
    if (commitQueue.isEmpty()) { // nothing queued
        return;
    }
    List<Phase2Context> allContexts = new LinkedList<>();
    commitQueue.drainTo(allContexts); // move queued contexts into a local list
    Map<String, List<Phase2Context>> groupedContexts = groupedByResourceId(allContexts); // group by resourceId, i.e. per data source (not per global transaction)
    groupedContexts.forEach(this::dealWithGroupedContexts);
}
/**
 * Deletes the undo logs of one resource's committed branches in batches of UNDOLOG_DELETE_LIMIT_SIZE.
 *
 * One plain (non-proxied) connection is obtained for the resource and shared by every batch. It is
 * closed exactly once, after all batches have run. Previously deleteUndoLog closed it after the first
 * batch, so every later batch ran on a closed connection; the connection also leaked when the undo-log
 * manager lookup threw or there were no batches.
 *
 * @param resourceId resource id used to look up the DataSourceProxy
 * @param contexts   queued phase-two contexts belonging to this resource
 */
private void dealWithGroupedContexts(String resourceId, List<Phase2Context> contexts) {
    DataSourceProxy dataSourceProxy = dataSourceManager.get(resourceId);
    if (dataSourceProxy == null) {
        LOGGER.warn("Failed to find resource for {}", resourceId);
        return;
    }
    Connection conn;
    try {
        conn = dataSourceProxy.getPlainConnection();
    } catch (SQLException sqle) {
        LOGGER.error("Failed to get connection for async committing on {}", resourceId, sqle);
        return;
    }
    try {
        UndoLogManager undoLogManager = UndoLogManagerFactory.getUndoLogManager(dataSourceProxy.getDbType());
        // Split into batches so each DELETE carries at most UNDOLOG_DELETE_LIMIT_SIZE branches
        List<List<Phase2Context>> splitByLimit = Lists.partition(contexts, UNDOLOG_DELETE_LIMIT_SIZE);
        splitByLimit.forEach(partition -> deleteUndoLog(conn, undoLogManager, partition));
    } finally {
        // The connection is owned here, so it is closed here, after every batch has used it
        try {
            conn.close();
        } catch (SQLException closeEx) {
            LOGGER.error("Failed to close JDBC resource after deleting undo log", closeEx);
        }
    }
}

/**
 * Deletes the undo logs of one batch of branches and commits when the connection is not in
 * auto-commit mode.
 *
 * On SQLException the error is logged and a rollback is attempted (failures of which are also logged).
 * The caller owns {@code conn} and is responsible for closing it.
 */
private void deleteUndoLog(Connection conn, UndoLogManager undoLogManager, List<Phase2Context> contexts) {
    Set<String> xids = new LinkedHashSet<>(contexts.size());
    Set<Long> branchIds = new LinkedHashSet<>(contexts.size());
    contexts.forEach(context -> {
        xids.add(context.xid);
        branchIds.add(context.branchId);
    });
    try {
        undoLogManager.batchDeleteUndoLog(xids, branchIds, conn);
        if (!conn.getAutoCommit()) {
            conn.commit();
        }
    } catch (SQLException e) {
        LOGGER.error("Failed to batch delete undo log", e);
        try {
            conn.rollback();
        } catch (SQLException rollbackEx) {
            LOGGER.error("Failed to rollback JDBC resource after deleting undo log failed", rollbackEx);
        }
    }
}
以上便是全局事务提交成功,分支事务异步删除UndoLog的整体逻辑。
最后,还剩下全局事务失败时分支事务如何回滚的逻辑:
/**
 * Handles a branch-rollback request: asks the resource manager to roll back the branch, then
 * copies the xid, branch id and the resulting branch status into {@code response}.
 *
 * @param request  carries xid, branch id, resource id, branch type and application data
 * @param response filled with the xid, branch id and rollback status
 * @throws TransactionException propagated from the resource manager
 */
protected void doBranchRollback(BranchRollbackRequest request, BranchRollbackResponse response)
    throws TransactionException {
    final String xid = request.getXid();
    final long branchId = request.getBranchId();
    final String resourceId = request.getResourceId();
    final String appData = request.getApplicationData();
    if (LOGGER.isInfoEnabled()) {
        LOGGER.info("Branch Rollbacking: " + xid + " " + branchId + " " + resourceId);
    }
    final BranchStatus rollbackStatus = getResourceManager()
        .branchRollback(request.getBranchType(), xid, branchId, resourceId, appData);
    response.setXid(xid);
    response.setBranchId(branchId);
    response.setBranchStatus(rollbackStatus);
    if (LOGGER.isInfoEnabled()) {
        LOGGER.info("Branch Rollbacked result: " + rollbackStatus);
    }
}
/**
 * Rolls back one branch by replaying its undo log against the data source registered for
 * {@code resourceId}.
 *
 * @return {@code PhaseTwo_Rollbacked} on success; {@code PhaseTwo_RollbackFailed_Unretryable} when the
 *         undo fails with {@code BranchRollbackFailed_Unretriable}; otherwise
 *         {@code PhaseTwo_RollbackFailed_Retryable}
 * @throws ShouldNeverHappenException if no data source is registered for {@code resourceId}
 */
@Override
public BranchStatus branchRollback(BranchType branchType, String xid, long branchId, String resourceId, String applicationData) throws TransactionException {
    DataSourceProxy dataSourceProxy = get(resourceId);
    if (dataSourceProxy == null) {
        // Name the missing resource so the failure can be diagnosed from the log alone.
        throw new ShouldNeverHappenException(String.format("resource: %s not found", resourceId));
    }
    try {
        // Replay the undo log to restore the data changed by this branch.
        UndoLogManagerFactory.getUndoLogManager(dataSourceProxy.getDbType()).undo(dataSourceProxy, xid, branchId);
    } catch (TransactionException te) {
        StackTraceLogger.info(LOGGER, te,
            "branchRollback failed. branchType:[{}], xid:[{}], branchId:[{}], resourceId:[{}], applicationData:[{}]. reason:[{}]",
            new Object[]{branchType, xid, branchId, resourceId, applicationData, te.getMessage()});
        if (te.getCode() == TransactionExceptionCode.BranchRollbackFailed_Unretriable) {
            return BranchStatus.PhaseTwo_RollbackFailed_Unretryable;
        } else {
            return BranchStatus.PhaseTwo_RollbackFailed_Retryable;
        }
    }
    return BranchStatus.PhaseTwo_Rollbacked;
}
/**
 * Rolls back one branch by replaying its undo log inside a single local transaction.
 *
 * <p>Each attempt opens a plain connection, turns auto-commit off if it was on, and selects the
 * undo_log rows for {@code (branchId, xid)}:
 * <ul>
 *   <li>rows found: a row whose state fails {@code canUndo} ends the method without changes; otherwise
 *       its SQL undo logs are decoded and executed in reverse order, the undo_log is deleted and the
 *       transaction is committed;</li>
 *   <li>no rows: an undo_log with state {@code GlobalFinished} is inserted and committed instead.</li>
 * </ul>
 * If that insert hits a unique-constraint violation, another process presumably inserted the undo_log
 * concurrently; the attempt is rolled back and retried.
 *
 * @throws TransactionException a {@code BranchTransactionException} with code
 *         {@code BranchRollbackFailed_Retriable} for any other failure
 */
@Override
public void undo(DataSourceProxy dataSourceProxy, String xid, long branchId) throws TransactionException {
    for (; ; ) {
        // Declared per attempt so a retry never touches the previous attempt's closed resources.
        Connection conn = null;
        ResultSet rs = null;
        PreparedStatement selectPST = null;
        boolean originalAutoCommit = true;
        // True only while inserting the GlobalFinished undo_log. This separates the insert race
        // (retryable here) from constraint violations raised while executing undo SQL.
        boolean insertingUndoLog = false;
        try {
            conn = dataSourceProxy.getPlainConnection();
            // The entire undo process should run in a local transaction.
            if (originalAutoCommit = conn.getAutoCommit()) {
                conn.setAutoCommit(false);
            }
            // Select the undo log of this branch, keyed by branch id and global transaction id.
            selectPST = conn.prepareStatement(SELECT_UNDO_LOG_SQL);
            selectPST.setLong(1, branchId);
            selectPST.setString(2, xid);
            rs = selectPST.executeQuery();
            boolean exists = false;
            while (rs.next()) {
                exists = true;
                // Only undo logs whose status passes canUndo are replayed.
                int state = rs.getInt(ClientTableColumnsName.UNDO_LOG_LOG_STATUS);
                if (!canUndo(state)) {
                    if (LOGGER.isInfoEnabled()) {
                        LOGGER.info("xid {} branch {}, ignore {} undo_log", xid, branchId, state);
                    }
                    return;
                }
                String contextString = rs.getString(ClientTableColumnsName.UNDO_LOG_CONTEXT);
                Map<String, String> context = parseContext(contextString);
                byte[] rollbackInfo = getRollbackInfo(rs);
                // Decode with the serializer recorded in the row's context, or the default parser if none.
                String serializer = context == null ? null : context.get(UndoLogConstants.SERIALIZER_KEY);
                UndoLogParser parser = serializer == null ? UndoLogParserFactory.getInstance()
                    : UndoLogParserFactory.getInstance(serializer);
                BranchUndoLog branchUndoLog = parser.decode(rollbackInfo);
                try {
                    // put serializer name to local
                    setCurrentSerializer(parser.getName());
                    List<SQLUndoLog> sqlUndoLogs = branchUndoLog.getSqlUndoLogs();
                    // Undo in reverse order (presumably the list is stored in execution order).
                    if (sqlUndoLogs.size() > 1) {
                        Collections.reverse(sqlUndoLogs);
                    }
                    for (SQLUndoLog sqlUndoLog : sqlUndoLogs) {
                        TableMeta tableMeta = TableMetaCacheFactory.getTableMetaCache(dataSourceProxy.getDbType()).getTableMeta(
                            conn, sqlUndoLog.getTableName(), dataSourceProxy.getResourceId());
                        sqlUndoLog.setTableMeta(tableMeta);
                        AbstractUndoExecutor undoExecutor = UndoExecutorFactory.getUndoExecutor(
                            dataSourceProxy.getDbType(), sqlUndoLog);
                        undoExecutor.executeOn(conn);
                    }
                } finally {
                    // remove serializer name
                    removeCurrentSerializer();
                }
            }
            if (exists) {
                // The undo log was replayed: delete it and commit everything together.
                deleteUndoLog(xid, branchId, conn);
                conn.commit();
                if (LOGGER.isInfoEnabled()) {
                    LOGGER.info("xid {} branch {}, undo_log deleted with {}", xid, branchId, State.GlobalFinished.name());
                }
            } else {
                // No undo log yet: record one in GlobalFinished state. NOTE(review): this presumably blocks a
                // late phase-one commit of the same branch via the undo_log unique key — confirm.
                insertingUndoLog = true;
                insertUndoLogWithGlobalFinished(xid, branchId, UndoLogParserFactory.getInstance(), conn);
                conn.commit();
                if (LOGGER.isInfoEnabled()) {
                    LOGGER.info("xid {} branch {}, undo_log added with {}", xid, branchId, State.GlobalFinished.name());
                }
            }
            return;
        } catch (SQLIntegrityConstraintViolationException e) {
            // Discard uncommitted work first: restoring auto-commit in the finally block would commit it.
            rollbackUndoQuietly(conn);
            if (!insertingUndoLog) {
                // Raised while executing undo SQL: a real rollback failure, retrying here would just repeat it.
                throw newUndoFailedException(xid, branchId, e);
            }
            // Possible undo_log has been inserted into the database by other processes, retrying rollback undo_log
            if (LOGGER.isInfoEnabled()) {
                LOGGER.info("xid {} branch {}, undo_log inserted, retry rollback", xid, branchId);
            }
        } catch (Throwable e) {
            rollbackUndoQuietly(conn);
            throw newUndoFailedException(xid, branchId, e);
        } finally {
            releaseUndoResources(rs, selectPST, conn, originalAutoCommit);
        }
    }
}

/** Rolls back {@code conn} if it was opened, logging instead of propagating a rollback failure. */
private void rollbackUndoQuietly(Connection conn) {
    if (conn == null) {
        return;
    }
    try {
        conn.rollback();
    } catch (SQLException rollbackEx) {
        LOGGER.warn("Failed to rollback JDBC resource while undo ... ", rollbackEx);
    }
}

/** Builds the retriable failure reported to the caller when an undo attempt fails. */
private BranchTransactionException newUndoFailedException(String xid, long branchId, Throwable cause) {
    return new BranchTransactionException(BranchRollbackFailed_Retriable, String.format(
        "Branch session rollback failed and try again later xid = %s branchId = %s %s",
        xid, branchId, cause.getMessage()), cause);
}

/**
 * Releases the JDBC resources of one undo attempt and restores auto-commit if it was originally on.
 * Each step is guarded separately so that one failure (e.g. restoring auto-commit) cannot skip the
 * remaining steps and leak the connection.
 */
private void releaseUndoResources(ResultSet rs, PreparedStatement selectPST, Connection conn,
                                  boolean originalAutoCommit) {
    if (rs != null) {
        try {
            rs.close();
        } catch (SQLException closeEx) {
            LOGGER.warn("Failed to close JDBC resource while undo ... ", closeEx);
        }
    }
    if (selectPST != null) {
        try {
            selectPST.close();
        } catch (SQLException closeEx) {
            LOGGER.warn("Failed to close JDBC resource while undo ... ", closeEx);
        }
    }
    if (conn != null) {
        try {
            if (originalAutoCommit) {
                conn.setAutoCommit(true);
            }
        } catch (SQLException restoreEx) {
            LOGGER.warn("Failed to close JDBC resource while undo ... ", restoreEx);
        }
        try {
            conn.close();
        } catch (SQLException closeEx) {
            LOGGER.warn("Failed to close JDBC resource while undo ... ", closeEx);
        }
    }
}
至此,Seata框架AT模式的源码分析就全部结束了。
最后
以上就是强健季节为你收集整理的Seata框架源码分析——AT模式源码分析的全部内容,希望文章能够帮你解决Seata框架源码分析——AT模式源码分析所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
发表评论 取消回复