How is the XA mode of Seata distributed transactions implemented?

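1. 2PC in traditional distributed XA transactions

2PC is the two-phase commit protocol. It splits the whole transaction into two phases, the prepare phase and the commit phase: the "2" stands for two phases, "P" for prepare, and "C" for commit. Common relational databases such as Oracle and MySQL support two-phase commit.

[Figure: 2PC, success case]
[Figure: 2PC, failure case]

①. Prepare phase: the TM (transaction manager) sends a Prepare message to every RM (resource manager, that is, the database). Each RM executes the transaction locally and writes its local Undo/Redo logs, but does not commit yet. The Undo log records the data before modification and is used for rollback; the Redo log records the data after modification and is used to write the data files once the transaction has committed.
②. Commit phase: if the TM receives a failure or timeout message from any RM, it sends a Rollback message to every RM; otherwise it sends a Commit message. Each participant commits or rolls back as instructed by the TM and then releases the locks held during the transaction. Note: the locks must be released only in this final phase.

For the details and implementation of traditional XA transactions, see my other blog post: Implementation of MySQL database transactions and XA transactions.

2. Seata's XA mode

Sample program: implementing distributed transactions with Seata.

Within the distributed transaction framework defined by Seata, XA mode is a transaction mode that relies on the RM's (the database's) support for the XA protocol and uses the XA protocol's mechanisms to manage branch transactions.

[Figure: Seata XA mode flow]

As the figure shows, Seata's XA mode has the following 5 steps:
①. The TM (transaction manager) begins the global transaction;
②. The RM registers the branch transaction with the TC (transaction coordinator);
③. The RM reports the branch transaction status to the TC;
④. The TC sends a commit/rollback request to the RM (resource manager, that is, the database);
⑤. The TM ends the global transaction with Global Commit/Rollback.

The class responsible for the RM client is RmNettyRemotingClient.

[Figure: UML diagram of RmNettyRemotingClient]

Requests from the TC are received by ClientHandler, an inner class of AbstractNettyRemotingClient (the parent class of RmNettyRemotingClient), which delegates them to processMessage() in its own parent class AbstractNettyRemoting.

Source of AbstractNettyRemotingClient.ClientHandler:

// ... some imports omitted ...
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelHandlerContext;
// ... some imports omitted ...

public abstract class AbstractNettyRemotingClient extends AbstractNettyRemoting implements RemotingClient {
    // ... some code omitted ...

    @Sharable
    class ClientHandler extends ChannelDuplexHandler {
        ClientHandler() {
        }

        public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exception {
            if (msg instanceof RpcMessage) {
                // Delegate to AbstractNettyRemoting.processMessage()
                AbstractNettyRemotingClient.this.processMessage(ctx, (RpcMessage)msg);
            } else {
                AbstractNettyRemotingClient.LOGGER.error("rpcMessage type error");
            }
        }
    }

    // ... some code omitted ...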
}

Source of AbstractNettyRemoting.processMessage():

public abstract class AbstractNettyRemoting implements Disposable {
    protected void processMessage(ChannelHandlerContext ctx, RpcMessage rpcMessage) throws Exception {
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug(String.format("%s msgId:%s, body:%s", this, rpcMessage.getId(), rpcMessage.getBody()));
        }
        Object body = rpcMessage.getBody();
        if (body instanceof MessageTypeAware) {
            MessageTypeAware messageTypeAware = (MessageTypeAware) body;
            final Pair<RemotingProcessor, ExecutorService> pair = this.processorTable.get((int) messageTypeAware.getTypeCode());
            if (pair != null) {
                if (pair.getSecond() != null) {
                    try {
                        pair.getSecond().execute(() -> {
                            try {
                                // For a branch commit request, this ends up calling RmBranchCommitProcessor.process()
                                pair.getFirst().process(ctx, rpcMessage);
                            } catch (Throwable th) {
                                LOGGER.error(FrameworkErrorCode.NetDispatch.getErrCode(), th.getMessage(), th);
                            } finally {
                                MDC.clear();
                            }
                        });
                    } catch (RejectedExecutionException e) {
                        LOGGER.error(FrameworkErrorCode.ThreadPoolFull.getErrCode(),
                            "thread pool is full, current max pool size is " + messageExecutor.getActiveCount());
                        if (allowDumpStack) {
                            String name = ManagementFactory.getRuntimeMXBean().getName();
                            String pid = name.split("@")[0];
                            long idx = System.currentTimeMillis();
                            try {
                                String jstackFile = idx + ".log";
                                LOGGER.info("jstack command will dump to " + jstackFile);
                                Runtime.getRuntime().exec(String.format("jstack %s > %s", pid, jstackFile));
                            } catch (IOException exx) {
                                LOGGER.error(exx.getMessage());
                            }
                            allowDumpStack = false;
                        }
                    }
                } else {
                    try {
                        pair.getFirst().process(ctx, rpcMessage);
                    } catch (Throwable th) {
                        LOGGER.error(FrameworkErrorCode.NetDispatch.getErrCode(), th.getMessage(), th);
                    }
                }
            } else {
                LOGGER.error("This message type [{}] has no processor.", messageTypeAware.getTypeCode());
            }
        } else {
            LOGGER.error("This rpcMessage body[{}] is not MessageTypeAware type.", body);
        }
    }
}

Source of RmBranchCommitProcessor:

package org.apache.seata.core.rpc.processor.client;

import io.netty.channel.ChannelHandlerContext;
import org.apache.seata.common.util.NetUtil;
import org.apache.seata.core.protocol.RpcMessage;
import org.apache.seata.core.protocol.transaction.BranchCommitRequest;
import org.apache.seata.core.protocol.transaction.BranchCommitResponse;
import org.apache.seata.core.rpc.RemotingClient;
import org.apache.seata.core.rpc.RpcContext;
import org.apache.seata.core.rpc.TransactionMessageHandler;
import org.apache.seata.core.rpc.processor.RemotingProcessor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class RmBranchCommitProcessor implements RemotingProcessor {
    private static final Logger LOGGER = LoggerFactory.getLogger(RmBranchCommitProcessor.class);
    private TransactionMessageHandler handler;
    private RemotingClient remotingClient;

    public RmBranchCommitProcessor(TransactionMessageHandler handler, RemotingClient remotingClient) {
        this.handler = handler;
        this.remotingClient = remotingClient;
    }

    public void process(ChannelHandlerContext ctx, RpcMessage rpcMessage) throws Exception {
        String remoteAddress = NetUtil.toStringAddress(ctx.channel().remoteAddress());
        Object msg = rpcMessage.getBody();
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("rm client handle branch commit process:" + msg);
        }
        this.handleBranchCommit(rpcMessage, remoteAddress, (BranchCommitRequest)msg);
    }

    private void handleBranchCommit(RpcMessage request, String serverAddress, BranchCommitRequest branchCommitRequest) {
        BranchCommitResponse resultMessage = (BranchCommitResponse)this.handler.onRequest(branchCommitRequest, (RpcContext)null);
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("branch commit result:" + resultMessage);
        }
        try {
            this.remotingClient.sendAsyncResponse(serverAddress, request, resultMessage);
        } catch (Throwable var6) {
            LOGGER.error("branch commit error: {}", var6.getMessage(), var6);
        }
    }
}

Seata's XA mode is a two-phase commit:
①. Phase one runs XA Start, the SQL, and XA End, and then immediately runs XA Prepare.
②. Phase two runs XA commit/rollback.

Oracle does not fit this ordering, because Oracle implements the standard XA protocol, in which the TC (transaction coordinator) sends prepare to all RMs (resource managers, that is, the databases) together after xa end, and only then sends commit/rollback. As a result, Seata's XA mode does not support Oracle well.

2.1 Differences between the XA-mode and AT-mode data source proxies

XA mode in Seata is implemented through a data source proxy, and the proxy has to be configured manually:

import com.alibaba.druid.pool.DruidDataSource;
import org.apache.seata.rm.datasource.xa.DataSourceProxyXA;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import javax.sql.DataSource;

// The class name is illustrative; the original snippet showed only the two @Bean methods.
@Configuration
public class XaDataSourceConfig {

    @Bean
    @ConfigurationProperties(prefix = "spring.datasource")
    public DruidDataSource druidDataSource() {
        // An XAConnection can also be created from a plain DataSource, but that approach
        // has compatibility problems (for example, Oracle does not support it)
        return new DruidDataSource();
    }

    @Bean("dataSourceProxy")
    public DataSource dataSource(DruidDataSource druidDataSource) {
        // which is why Seata also lets developers configure their own XADataSource.
        // The XA data source proxy provided by Seata requires the application to use the Druid connection pool.
        return new DataSourceProxyXA(druidDataSource);
    }
}

①. When the data source proxy creates its Connection from a plain JDBC connection obtained from an ordinary DataSource, the XA-mode and AT-mode proxy mechanisms differ as follows:

[Figure: XA-mode vs. AT-mode data source proxy, based on a plain DataSource]
[Figure: UML diagram of DataSourceProxyXA, ConnectionProxyXA, and StatementProxyXA]

Partial source of DataSourceProxyXA:

// ... some imports omitted ...
import org.apache.seata.rm.datasource.util.XAUtils;
import javax.sql.DataSource;
import javax.sql.XAConnection;
// ... some imports omitted ...

public class DataSourceProxyXA extends AbstractDataSourceProxyXA {
    // ... some code omitted ...

    protected Connection getConnectionProxy(Connection connection) throws SQLException {
        return !RootContext.inGlobalTransaction() ?
            connection : this.getConnectionProxyXA(connection);
    }

    protected Connection getConnectionProxyXA() throws SQLException {
        Connection connection = this.dataSource.getConnection();
        return this.getConnectionProxyXA(connection);
    }

    // Creates the XAConnection and the ConnectionProxyXA
    private Connection getConnectionProxyXA(Connection connection) throws SQLException {
        Connection physicalConn = (Connection)connection.unwrap(Connection.class);
        XAConnection xaConnection = XAUtils.createXAConnection(physicalConn, this);
        ConnectionProxyXA connectionProxyXA = new ConnectionProxyXA(connection, xaConnection, this, RootContext.getXID());
        connectionProxyXA.init();
        return connectionProxyXA;
    }

    // ... some code omitted ...
}

Partial source of ConnectionProxyXA and AbstractConnectionProxyXA:

// ... some imports omitted ...

public class ConnectionProxyXA extends AbstractConnectionProxyXA implements Holdable {
    private static final Logger LOGGER = LoggerFactory.getLogger(ConnectionProxyXA.class);
    private static final int BRANCH_EXECUTION_TIMEOUT = ConfigurationFactory.getInstance().getInt("client.rm.branchExecutionTimeoutXA", 60000);
    private volatile boolean currentAutoCommitStatus = true;
    private volatile XAXid xaBranchXid;
    private volatile boolean xaActive = false;
    private volatile boolean xaEnded = false;
    private volatile boolean kept = false;
    private volatile boolean rollBacked = false;
    private volatile Long branchRegisterTime = null;
    private volatile Long prepareTime = null;
    private static final Integer TIMEOUT;
    private boolean shouldBeHeld = false;

    public ConnectionProxyXA(Connection originalConnection, XAConnection xaConnection, BaseDataSourceResource resource, String xid) {
        super(originalConnection, xaConnection, resource, xid);
        this.shouldBeHeld = resource.isShouldBeHeld();
    }

    // ... some code omitted ...
}

package org.apache.seata.rm.datasource.xa;

// ... some imports omitted ...
import javax.sql.XAConnection;
import java.sql.Connection;
// ... some imports omitted ...

public abstract class AbstractConnectionProxyXA implements Connection {
    // ... some code omitted ...
    protected Connection originalConnection;

    public AbstractConnectionProxyXA(Connection originalConnection, XAConnection xaConnection, BaseDataSourceResource resource, String xid) {
        this.originalConnection = originalConnection;
        this.xaConnection = xaConnection;
        this.resource = resource;
        this.xid = xid;
    }

    // Each createStatement() overload wraps the target Statement in a StatementProxyXA
    @Override
    public Statement createStatement() throws SQLException {
        Statement targetStatement = originalConnection.createStatement();
        return new StatementProxyXA(this, targetStatement);
    }

    @Override
    public Statement createStatement(int resultSetType, int resultSetConcurrency) throws SQLException {
        Statement statement = originalConnection.createStatement(resultSetType, resultSetConcurrency);
        return new StatementProxyXA(this, statement);
    }

    @Override
    public Statement createStatement(int resultSetType, int resultSetConcurrency, int resultSetHoldability) throws SQLException {
        Statement statement = originalConnection.createStatement(resultSetType, resultSetConcurrency, resultSetHoldability);
        return new StatementProxyXA(this, statement);
    }

    // ... some code omitted ...
}

Partial source of StatementProxyXA:

import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.SQLWarning;
import java.sql.Statement;

/**
 * Statement proxy for XA mode.
 */
public class StatementProxyXA implements Statement {
    protected AbstractConnectionProxyXA connectionProxyXA;
    protected Statement targetStatement;

    public StatementProxyXA(AbstractConnectionProxyXA connectionProxyXA, Statement targetStatement) {
        this.connectionProxyXA = connectionProxyXA;
        this.targetStatement = targetStatement;
    }

    // ... some code omitted ...
}

②. When the data source proxy wraps a specified XA data source (XADataSource) and creates its Connection from it, the XA-mode and AT-mode proxy mechanisms differ as follows:

[Figure: XA-mode vs. AT-mode data source proxy, based on an XADataSource]

I could not find XADataSourceProxy.class, XAConnectionProxy.class, or StatementProxyXA.class in Seata 2.3.0.

2.2 Partial source of XA phase one

XA phase one consists of XA Start, executing the SQL, XA End, and XA Prepare.

[Figure: XA phase one]

When the RM receives a DML request, Seata executes it through the static method execute() of ExecuteTemplateXA. One key point in execute() is that it sets autocommit to false, whereas MySQL's default is autocommit = true. After the transaction has been committed, autocommit has to be restored to its previous value. StatementProxyXA calls execute() as follows:

import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.SQLWarning;
import java.sql.Statement;

/**
 * Statement proxy for XA mode.
 */
public class StatementProxyXA implements Statement {
    protected AbstractConnectionProxyXA connectionProxyXA;
    protected Statement targetStatement;

    public StatementProxyXA(AbstractConnectionProxyXA connectionProxyXA, Statement targetStatement) {
        this.connectionProxyXA = connectionProxyXA;
        this.targetStatement = targetStatement;
    }

    @Override
    public int executeUpdate(String sql) throws SQLException {
        return ExecuteTemplateXA.execute(connectionProxyXA,
            (statement, args) -> statement.executeUpdate((String)args[0]), targetStatement, sql);
    }

    // ... some code omitted ...
}

2.2.1 The XA Start step

Partial source of ExecuteTemplateXA:

package org.apache.seata.rm.datasource.xa;

import org.apache.seata.rm.datasource.exec.StatementCallback;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.sql.SQLException;
import java.sql.Statement;

public class ExecuteTemplateXA {
    private static final Logger LOGGER = LoggerFactory.getLogger(ExecuteTemplateXA.class);

    public static <T, S extends Statement> T execute(AbstractConnectionProxyXA connectionProxyXA,
                                                     StatementCallback<T, S> statementCallback,
                                                     S targetStatement, Object... args) throws SQLException {
        boolean autoCommitStatus = connectionProxyXA.getAutoCommit();
        if (autoCommitStatus) {
            // XA Start step
            connectionProxyXA.setAutoCommit(false);
        }
        // ... some code omitted ...
    }

    // ... some code omitted ...
}

Partial source of ConnectionProxyXA, where XA Start is actually issued:

package org.apache.seata.rm.datasource.xa;

import java.sql.Connection;
import java.sql.SQLException;
import javax.sql.PooledConnection;
import javax.sql.XAConnection;
import javax.transaction.xa.XAException;
import javax.transaction.xa.XAResource;
import org.apache.seata.common.DefaultValues;
import org.apache.seata.common.util.StringUtils;
import org.apache.seata.config.ConfigurationFactory;
import org.apache.seata.core.exception.TransactionException;
import org.apache.seata.core.model.BranchStatus;
import org.apache.seata.core.model.BranchType;
import org.apache.seata.rm.BaseDataSourceResource;
import org.apache.seata.rm.DefaultResourceManager;
import org.apache.seata.rm.datasource.util.SeataXAResource;
import org.apache.seata.sqlparser.util.JdbcConstants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ConnectionProxyXA extends AbstractConnectionProxyXA implements Holdable {
    // ... some code omitted ...

    @Override
    public void setAutoCommit(boolean autoCommit) throws SQLException {
        if (currentAutoCommitStatus == autoCommit) {
            return;
        }
        if (isReadOnly()) {
            // If it is a read-only transaction, do nothing
            currentAutoCommitStatus = autoCommit;
            return;
        }
        if (autoCommit) {
            // According to JDBC spec:
            // If this method is called during a transaction and the
            // auto-commit mode is changed, the transaction is committed.
            if (xaActive) {
                commit();
            }
        } else {
            if (xaActive) {
                throw new SQLException("should NEVER happen: setAutoCommit from true to false while xa branch is active");
            }
            // Start a XA branch
            long branchId;
            try {
                // 1. register branch to TC then get the branch message
                branchRegisterTime = System.currentTimeMillis();
                branchId = DefaultResourceManager.get().branchRegister(BranchType.XA, resource.getResourceId(), null, xid, null, null);
            } catch (TransactionException te) {
                cleanXABranchContext();
                throw new SQLException("failed to register xa branch " + xid + " since " + te.getCode() + ":" + te.getMessage(), te);
            }
            // 2. build XA-Xid with xid and branchId
            this.xaBranchXid = XAXidBuilder.build(xid, branchId);
            // Keep the Connection if necessary
            keepIfNecessary();
            try {
                start(); // start the XA transaction
            } catch (XAException e) {
                cleanXABranchContext();
                throw new SQLException("failed to start xa branch " + xid + " since " + e.getMessage(), e);
            }
            // 4. XA is active
            this.xaActive = true;
        }
        currentAutoCommitStatus = autoCommit;
    }

    // XA Start is actually issued here, in start()
    private synchronized void start() throws XAException, SQLException {
        // 3. XA Start
        if (JdbcConstants.ORACLE.equals(resource.getDbType())) {
            xaResource.start(this.xaBranchXid, SeataXAResource.ORATRANSLOOSE);
        } else {
            xaResource.start(this.xaBranchXid, XAResource.TMNOFLAGS);
        }
        try {
            termination();
        } catch (SQLException e) {
            // the framework layer does not actively call ROLLBACK when setAutoCommit throws an SQL exception
            xaResource.end(this.xaBranchXid, XAResource.TMFAIL);
            xaRollback(xaBranchXid);
            // Branch Report to TC: Failed
            reportStatusToTC(BranchStatus.PhaseOne_Failed);
            throw e;
        }
    }

    // ... some code omitted ...
}

2.2.2 The SQL execution step

Partial source of ExecuteTemplateXA:

package org.apache.seata.rm.datasource.xa;

import org.apache.seata.rm.datasource.exec.StatementCallback;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.sql.SQLException;
import java.sql.Statement;

/**
 * The type Execute template.
 */
public class ExecuteTemplateXA {
    private static final Logger LOGGER = LoggerFactory.getLogger(ExecuteTemplateXA.class);

    public static <T, S extends Statement> T execute(AbstractConnectionProxyXA connectionProxyXA,
                                                     StatementCallback<T, S> statementCallback,
                                                     S targetStatement, Object... args) throws SQLException {
        boolean autoCommitStatus = connectionProxyXA.getAutoCommit();
        if (autoCommitStatus) {
            // XA Start
            connectionProxyXA.setAutoCommit(false);
        }
        try {
            T res = null;
            try {
                // SQL execution step: this invokes the lambda passed in by StatementProxyXA
                // execute SQL
                res = statementCallback.execute(targetStatement, args);
                // ... some code omitted ...
    }
    // ... some code omitted ...
}

Partial source of StatementProxyXA. The SQL is ultimately executed by the lambda that StatementProxyXA passes to ExecuteTemplateXA.execute():

public class StatementProxyXA implements Statement {
    protected AbstractConnectionProxyXA connectionProxyXA;
    protected Statement targetStatement;

    public StatementProxyXA(AbstractConnectionProxyXA connectionProxyXA, Statement targetStatement) {
        this.connectionProxyXA = connectionProxyXA;
        this.targetStatement = targetStatement;
    }

    @Override
    public int executeUpdate(String sql) throws SQLException {
        // The lambda below is what finally runs the SQL on the target statement
        return ExecuteTemplateXA.execute(connectionProxyXA,
            (statement, args) -> statement.executeUpdate((String)args[0]), targetStatement, sql);
    }

    // ... some code omitted ...
}

2.2.3 The XA End and XA Prepare steps

Partial source of ExecuteTemplateXA:

// ... package and imports as in section 2.2.2 ...

/**
 * The type Execute template.
 */
public class ExecuteTemplateXA {
    private static final Logger LOGGER = LoggerFactory.getLogger(ExecuteTemplateXA.class);

    public static <T, S extends Statement> T execute(AbstractConnectionProxyXA connectionProxyXA,
                                                     StatementCallback<T, S> statementCallback,
                                                     S targetStatement, Object... args) throws SQLException {
        boolean autoCommitStatus = connectionProxyXA.getAutoCommit();
        if (autoCommitStatus) {
            // XA Start
            connectionProxyXA.setAutoCommit(false);
        }
        try {
            T res = null;
            try {
                // execute SQL
                res = statementCallback.execute(targetStatement, args);
            } catch (Throwable ex) {
                if (autoCommitStatus) {
                    // XA End & Rollback
                    try {
                        connectionProxyXA.rollback();
                    } catch (SQLException sqle) {
                        // log and ignore the rollback failure.
                        LOGGER.warn(
                            "Failed to rollback xa branch of " + connectionProxyXA.xid +
                                "(caused by SQL execution failure(" + ex.getMessage() + ") since " + sqle.getMessage(),
                            sqle);
                    }
                }
                if (ex instanceof SQLException) {
                    throw ex;
                } else {
                    throw new SQLException(ex);
                }
            }
            if (autoCommitStatus) {
                try {
                    // XA End & Prepare
                    connectionProxyXA.commit();
                    // ... some code omitted ...
    }
    // ... some code omitted ...
}

Partial source of ConnectionProxyXA, where XA End and XA Prepare are actually executed:

package org.apache.seata.rm.datasource.xa;

// ... same imports as in section 2.2.1 ...

public class ConnectionProxyXA extends AbstractConnectionProxyXA implements Holdable {
    // ... some code omitted ...
    @Override
    public synchronized void commit() throws SQLException {
        if (currentAutoCommitStatus || isReadOnly()) {
            // Ignore the committing on an autocommit session and read-only transaction.
            return;
        }
        if (!xaActive || this.xaBranchXid == null) {
            throw new SQLException("should NOT commit on an inactive session", SQLSTATE_XA_NOT_END);
        }
        try {
            // XA End: Success
            try {
                end(XAResource.TMSUCCESS);
            } catch (SQLException sqle) {
                // Rollback immediately before the XA Branch Context is deleted.
                String xaBranchXid = this.xaBranchXid.toString();
                rollback();
                throw new SQLException("Branch " + xaBranchXid + " was rollbacked on committing since " + sqle.getMessage(), SQLSTATE_XA_NOT_END, sqle);
            }
            long now = System.currentTimeMillis();
            checkTimeout(now);
            setPrepareTime(now);
            int prepare = xaResource.prepare(xaBranchXid);
            // Based on the four databases: MySQL (8), Oracle (12c), Postgres (16), and MSSQL Server (2022),
            // only Oracle has read-only optimization; the others do not provide read-only feedback.
            // Therefore, the database type check can be eliminated here.
            if (prepare == XAResource.XA_RDONLY) {
                // Branch Report to TC: RDONLY
                reportStatusToTC(BranchStatus.PhaseOne_RDONLY);
            }
        } catch (XAException xe) {
            // Branch Report to TC: Failed
            reportStatusToTC(BranchStatus.PhaseOne_Failed);
            throw new SQLException(
                "Failed to end(TMSUCCESS)/prepare xa branch on " + xid + "-" + xaBranchXid.getBranchId() + " since " + xe
                    .getMessage(), xe);
        } finally {
            cleanXABranchContext();
        }
    }

    private void xaEnd(XAXid xaXid, int flags) throws XAException {
        if (!xaEnded) {
            xaResource.end(xaXid, flags);
            xaEnded = true;
        }
    }

    private synchronized void end(int flags) throws XAException, SQLException {
        xaEnd(xaBranchXid, flags);
        termination();
    }

    private void termination() throws SQLException {
        termination(this.xaBranchXid.toString());
    }

    private void termination(String xaBranchXid) throws SQLException {
        // if it is not empty, the resource will hang and need to be terminated early
        BranchStatus branchStatus = BaseDataSourceResource.getBranchStatus(xaBranchXid);
        if (branchStatus != null) {
            releaseIfNecessary();
            throw new SQLException("failed xa branch " + xid + " the global transaction has finish, branch status: " + branchStatus.getCode());
        }
    }

    // Reports the branch's phase-one status (for example PhaseOne_Failed or PhaseOne_RDONLY) to the TC
    private void reportStatusToTC(BranchStatus status) {
        try {
            DefaultResourceManager.get().branchReport(BranchType.XA, xid, xaBranchXid.getBranchId(), status, null);
        } catch (TransactionException te) {
            LOGGER.warn("Failed to report XA branch {} on {}-{} since {}:{}",
                status, xid, xaBranchXid.getBranchId(), te.getCode(), te.getMessage());
        }
    }

    // ... some code omitted ...
}

2.3 Partial source of XA phase two

XA phase two is XA Commit or XA Rollback.

[Figure: XA phase two]

Once phase one of XA mode has finished, phase two is executed according to the result of phase one.

[Figure: phase two driven by the phase-one result]

Partial source of RmBranchCommitProcessor:

// ... imports omitted ...

public class RmBranchCommitProcessor implements RemotingProcessor {
    // ... some code omitted ...
    @Override
    public void process(ChannelHandlerContext ctx, RpcMessage rpcMessage) throws Exception {
        String remoteAddress = NetUtil.toStringAddress(ctx.channel().remoteAddress());
        Object msg = rpcMessage.getBody();
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("rm client handle branch commit process:" + msg);
        }
        handleBranchCommit(rpcMessage, remoteAddress, (BranchCommitRequest) msg);
    }

    private void handleBranchCommit(RpcMessage request, String serverAddress, BranchCommitRequest branchCommitRequest) {
        BranchCommitResponse resultMessage;
        resultMessage = (BranchCommitResponse) handler.onRequest(branchCommitRequest, null);
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("branch commit result:" + resultMessage);
        }
        try {
            this.remotingClient.sendAsyncResponse(serverAddress, request, resultMessage);
        } catch (Throwable throwable) {
            LOGGER.error("branch commit error: {}", throwable.getMessage(), throwable);
        }
    }

    // ... some code omitted ...
}

Partial source of AbstractRMHandler:

// ... imports omitted ...

public abstract class AbstractRMHandler extends AbstractExceptionHandler
    implements RMInboundHandler, TransactionMessageHandler {
    // ... some code omitted ...
    @Override
    public BranchCommitResponse handle(BranchCommitRequest request) {
        BranchCommitResponse response = new BranchCommitResponse();
        exceptionHandleTemplate(new AbstractCallback<BranchCommitRequest, BranchCommitResponse>() {
            @Override
            public void execute(BranchCommitRequest request, BranchCommitResponse response) throws TransactionException {
                doBranchCommit(request, response);
            }
        }, request, response);
        return response;
    }

    @Override
    public BranchRollbackResponse handle(BranchRollbackRequest request) {
        BranchRollbackResponse response = new BranchRollbackResponse();
        exceptionHandleTemplate(new AbstractCallback<BranchRollbackRequest, BranchRollbackResponse>() {
            @Override
            public void execute(BranchRollbackRequest request, BranchRollbackResponse response) throws TransactionException {
                doBranchRollback(request, response);
            }
        }, request, response);
        return response;
    }

    protected void doBranchCommit(BranchCommitRequest request, BranchCommitResponse response) throws TransactionException {
        String xid = request.getXid();
        long branchId = request.getBranchId();
        String resourceId = request.getResourceId();
        String applicationData = request.getApplicationData();
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("Branch committing: " + xid + " " + branchId + " " + resourceId + " " + applicationData);
        }
        BranchStatus status = getResourceManager().branchCommit(request.getBranchType(), xid, branchId, resourceId, applicationData);
        response.setXid(xid);
        response.setBranchId(branchId);
        response.setBranchStatus(status);
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("Branch commit result: " + status);
        }
    }

    protected void doBranchRollback(BranchRollbackRequest request, BranchRollbackResponse response) throws TransactionException {
        String xid = request.getXid();
        long branchId = request.getBranchId();
        String resourceId = request.getResourceId();
        String applicationData = request.getApplicationData();
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("Branch Rollbacking: " + xid + " " + branchId + " " + resourceId);
        }
        BranchStatus status = getResourceManager().branchRollback(request.getBranchType(), xid, branchId, resourceId, applicationData);
        response.setXid(xid);
        response.setBranchId(branchId);
        response.setBranchStatus(status);
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("Branch Rollbacked result: " + status);
        }
    }

    @Override
    public AbstractResultMessage onRequest(AbstractMessage request, RpcContext context) {
        if (!(request instanceof AbstractTransactionRequestToRM)) {
            throw new IllegalArgumentException();
        }
        AbstractTransactionRequestToRM transactionRequest = (AbstractTransactionRequestToRM)request;
        transactionRequest.setRMInboundMessageHandler(this);
        return transactionRequest.handle(context);
    }

    // ... some code omitted ...
}

Partial source of BranchCommitRequest:

// ... imports omitted ...

public class BranchCommitRequest extends AbstractBranchEndRequest {
    @Override
    public short getTypeCode() {
        return MessageType.TYPE_BRANCH_COMMIT;
    }

    @Override
    public AbstractTransactionResponse handle(RpcContext rpcContext) {
        return handler.handle(this);
    }
}

Partial source of ResourceManagerXA:

// ... imports omitted ...

public class ResourceManagerXA extends AbstractDataSourceCacheResourceManager {
    // ... some code omitted ...

    @Override
    public BranchStatus branchCommit(BranchType branchType, String xid, long branchId, String resourceId,
                                     String applicationData) throws TransactionException {
        return finishBranch(true, branchType, xid, branchId, resourceId, applicationData);
    }

    @Override
    public BranchStatus branchRollback(BranchType branchType, String xid, long branchId, String resourceId,
                                       String applicationData) throws TransactionException {
        return finishBranch(false, branchType, xid, branchId, resourceId, applicationData);
    }

    // ... some code omitted ...
}

[Figure: UML diagram of ResourceManagerXA]
[Figure: sequence diagram of the call chain between the classes above]

ResourceManagerXA.finishBranch() ultimately calls ConnectionProxyXA.xaCommit() or ConnectionProxyXA.xaRollback():

// ... imports omitted ...

public class ConnectionProxyXA extends AbstractConnectionProxyXA implements Holdable {
    // ... some code omitted ...
    public synchronized void xaCommit(String xid, long branchId, String applicationData) throws XAException {
        XAXid xaXid = XAXidBuilder.build(xid, branchId);
        xaResource.commit(xaXid, false);
        releaseIfNecessary();
    }

    public synchronized void xaRollback(String xid, long branchId, String applicationData) throws XAException {
        if (this.xaBranchXid != null) {
            xaRollback(xaBranchXid);
        } else {
            XAXid xaXid = XAXidBuilder.build(xid, branchId);
            xaRollback(xaXid);
        }
    }

    // ... some code omitted ...
}
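
To tie the two phases together, here is a minimal, self-contained sketch of the call order described above: XA Start, SQL, XA End, and XA Prepare in phase one, then XA Commit or XA Rollback in phase two. It is not Seata code. It uses only the standard javax.transaction.xa interfaces, and XaTwoPhaseSketch, SimpleXid, RecordingXaResource, phaseOne(), phaseTwo(), and run() are all hypothetical names introduced for illustration. The XAResource here is a stub that only records the calls it receives, standing in for a real database driver.

```java
import javax.transaction.xa.XAException;
import javax.transaction.xa.XAResource;
import javax.transaction.xa.Xid;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

public class XaTwoPhaseSketch {

    /** Hypothetical Xid built from a global xid and a branch id (not Seata's XAXid). */
    static final class SimpleXid implements Xid {
        private final byte[] gtrid;
        private final byte[] bqual;

        SimpleXid(String xid, long branchId) {
            this.gtrid = xid.getBytes(StandardCharsets.UTF_8);
            this.bqual = Long.toString(branchId).getBytes(StandardCharsets.UTF_8);
        }

        @Override public int getFormatId() { return 1; }
        @Override public byte[] getGlobalTransactionId() { return gtrid; }
        @Override public byte[] getBranchQualifier() { return bqual; }
    }

    /** Stub standing in for a database XAResource: it only records the calls it receives. */
    static final class RecordingXaResource implements XAResource {
        final List<String> calls = new ArrayList<>();

        @Override public void start(Xid xid, int flags) { calls.add("start"); }
        @Override public void end(Xid xid, int flags) { calls.add("end"); }
        @Override public int prepare(Xid xid) { calls.add("prepare"); return XA_OK; }
        @Override public void commit(Xid xid, boolean onePhase) { calls.add("commit"); }
        @Override public void rollback(Xid xid) { calls.add("rollback"); }
        @Override public void forget(Xid xid) { }
        @Override public Xid[] recover(int flag) { return new Xid[0]; }
        @Override public boolean isSameRM(XAResource other) { return other == this; }
        @Override public int getTransactionTimeout() { return 0; }
        @Override public boolean setTransactionTimeout(int seconds) { return false; }
    }

    /** Phase one, run by the branch itself: XA Start, SQL, XA End, XA Prepare. */
    static void phaseOne(XAResource xa, Xid xid, Runnable sql) throws XAException {
        xa.start(xid, XAResource.TMNOFLAGS);
        try {
            sql.run();
        } catch (RuntimeException e) {
            // SQL failed: end the branch with TMFAIL and roll it back locally.
            xa.end(xid, XAResource.TMFAIL);
            xa.rollback(xid);
            throw e;
        }
        xa.end(xid, XAResource.TMSUCCESS);
        xa.prepare(xid);
    }

    /** Phase two, driven by the coordinator's decision: XA Commit or XA Rollback. */
    static void phaseTwo(XAResource xa, Xid xid, boolean commit) throws XAException {
        if (commit) {
            xa.commit(xid, false); // false: this is the second phase of 2PC, not a one-phase commit
        } else {
            xa.rollback(xid);
        }
    }

    /** Runs both phases against the stub and returns the recorded call order. */
    static List<String> run(boolean globalCommit) {
        RecordingXaResource xa = new RecordingXaResource();
        Xid xid = new SimpleXid("demo-xid", 1L);
        try {
            phaseOne(xa, xid, () -> xa.calls.add("sql"));
            phaseTwo(xa, xid, globalCommit);
        } catch (XAException e) {
            throw new IllegalStateException(e);
        }
        return xa.calls;
    }

    public static void main(String[] args) {
        System.out.println(run(true));  // prints [start, sql, end, prepare, commit]
        System.out.println(run(false)); // prints [start, sql, end, prepare, rollback]
    }
}
```

In this sketch phaseOne() plays the role that setAutoCommit(false) and commit() play in ConnectionProxyXA, and phaseTwo() plays the role of xaCommit() and xaRollback(). Branch registration and status reporting to the TC are deliberately left out.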