6 張圖帶你徹底搞懂分佈式事務 XA 模式

作者 | 朱晉君

XA 協議是由 X/Open 組織提出的分佈式事務處理規範,主要定義了事務管理器 TM 和局部資源管理器 RM 之間的接口。目前主流的數據庫,比如 oracle、DB2 都是支持 XA 協議的。

mysql 從 5.0 版本開始,innoDB 存儲引擎已經支持 XA 協議,今天的源碼介紹實驗環境使用的是 mysql 數據庫。

兩階段提交

分佈式事務的兩階段提交是把整個事務提交分爲 prepare 和 commit 兩個階段。以電商系統爲例,分佈式系統中有訂單、賬戶和庫存三個服務,如下圖:

第一階段,事務協調者向事務參與者發送 prepare 請求,事務參與者收到請求後,如果可以提交事務,回覆 yes,否則回覆 no。

第二階段,如果所有事務參與者都回復了 yes,事務協調者向所有事務參與者發送 commit 請求,否則發送 rollback 請求。

兩階段提交存在三個問題:

三階段提交

爲了解決兩階段提交的問題,三階段提交做了改進:

如下圖:

引入 preCommit 階段後,協調節點會在 commit 之前再次檢查各個事務參與者的狀態,保證它們的狀態是一致的。但是也存在問題,那就是如果第三階段發出 rollback 請求,有的節點沒有收到,那沒有收到的節點會在超時之後進行提交,造成數據不一致。

XA 事務語法介紹

xa 事務的語法如下:

  1. 三階段的第一階段:開啓 xa 事務,這裏 xid 爲全局事務 id:

結束 xa 事務:

XA END xid [SUSPEND [FOR MIGRATE]]
  1. 三階段的第二階段,即 prepare:
XA PREPARE xid
  1. 三階段的第三階段,即 commit/rollback:
XA COMMIT xid [ONE PHASE]
XA ROLLBACK xid
  1. 查看處於 PREPARE 階段的所有事務:
XA RECOVER XA RECOVER [CONVERT XID]

seata XA 簡介

seata 是阿里推出的一款開源分佈式事務解決方案,目前有 AT、TCC、SAGA、XA 四種模式。

seata 的 XA 模式是利用分支事務中數據庫對 XA 協議的支持來實現的。我們看一下 seata 官網的介紹:[1]

從上面的圖可以看到,seata XA 模式的流程跟其他模式一樣:

  1. TM 開啓全局事務

  2. RM 向 TC 註冊分支事務

  3. RM 向 TC 報告分支事務狀態

  4. TC 向 RM 發送 commit/rollback 請求

  5. TM 結束全局事務

這裏介紹一下 RM 客戶端初始化關聯的 UML 類圖:[2]

這個圖中有一個類是 AbstractNettyRemotingClient,這個類的內部類 ClientHandler 來處理 TC 發來的請求並委託給父類 AbstractNettyRemoting 的 processMessage 方法來處理。processMessage 方法調用 RmBranchCommitProcessor 類的 process 方法。

需要注意的是,「seata 的 xa 模式對傳統的三階段提交做了優化,改成了兩階段提交」:

mysql 目前是支持 seata xa 模式的兩階段優化的。

「但是這個優化對 oracle 不支持,因爲 oracle 實現的是標準的 xa 協議,即 xa end 後,協調節點向事務參與者統一發送 prepare,最後再發送 commit/rollback。這也導致了 seata 的 xa 模式對 oracle 支持不太好。」

seata XA 源碼

seata 中的 XA 模式是使用數據源代理來實現的,需要手動配置數據源代理,代碼如下:

@Bean
@ConfigurationProperties(prefix = "spring.datasource")
public DruidDataSource druidDataSource() {
    return new DruidDataSource();
}
@Bean("dataSourceProxy")
public DataSource dataSource(DruidDataSource druidDataSource) {
    return new DataSourceProxyXA(druidDataSource);
}

1. XA 第一階段

當 RM 收到 DML 請求後,seata 會使用 ExecuteTemplateXA 來執行,執行方法 execute 中有一個地方很關鍵,就是把 autocommit 屬性改爲了 false,而 mysql 默認 autocommit 是 true。事務提交之後,還要把 autocommit 改回默認。

下面我們看一下 XA 第一階段提交的主要代碼。

1)開啓 XA

上面代碼標註 [1] 處,調用了 ConnectionProxyXA 類的 setAutoCommit 方法,這個方法的源代碼中,XA start 主要做了三件事:

RM 並沒有直接使用 TC 返回的 branchId 作爲 xa 數據源的 branchId,而是使用全局事務 id(xid) 和 branchId 重新構建了一個。

2)執行 sql

調用 PreparedStatementProxyXA 的 execute 執行 sql。

3)XA end/prepare

調用 connectionProxyXA 的 commit 方法,代碼如下:

public void commit() throws SQLException {
    //省略部分源代碼
    try {
        // XA End: Success
        xaResource.end(xaBranchXid, XAResource.TMSUCCESS);
        // XA Prepare
        xaResource.prepare(xaBranchXid);
        // Keep the Connection if necessary
        keepIfNecessary();
    } catch (XAException xe) {
        try {
            // Branch Report to TC: Failed
            DefaultResourceManager.get().branchReport(BranchType.XA, xid, xaBranchXid.getBranchId(),
                BranchStatus.PhaseOne_Failed, null);
        } catch (TransactionException te) {
            //這兒只打印了一個warn級別的日誌
        }
        throw new SQLException(
            "Failed to end(TMSUCCESS)/prepare xa branch on " + xid + "-" + xaBranchXid.getBranchId() + " since " + xe
                .getMessage(), xe);
    } finally {
        cleanXABranchContext();
    }
}

從這個源碼我們看到,commit 主要做了三件事:

到這裏我們就可以看到,seata 把 xa 協議的前兩個階段合成了一個階段。

2. XA commit

這裏的調用關係用一個時序圖來表示:

看一下 RmBranchCommitProcessor 類的 process 方法,代碼如下:

@Override
public void process(ChannelHandlerContext ctx, RpcMessage rpcMessage) throws Exception {
    String remoteAddress = NetUtil.toStringAddress(ctx.channel().remoteAddress());
    Object msg = rpcMessage.getBody();
    if (LOGGER.isInfoEnabled()) {
        LOGGER.info("rm client handle branch commit process:" + msg);
    }
    handleBranchCommit(rpcMessage, remoteAddress, (BranchCommitRequest) msg);
}

從調用關係時序圖可以看出,上面的 handleBranchCommit 方法最終調用了 AbstractRMHandler 的 handle 方法,最後通過 branchCommit 方法調用了 ResourceManagerXA 類的 finishBranch 方法。

ResourceManagerXA 類是 XA 模式的資源管理器,看下面這個類圖,也就是 seata 中資源管理器(RM)的 UML 類圖:

上面的 finishBranch 方法調用了 connectionProxyXA.xaCommit 方法,我們最後看一下 xaCommit 方法:

public void xaCommit(String xid, long branchId, String applicationData) throws XAException {
    XAXid xaXid = XAXidBuilder.build(xid, branchId);
 //因爲使用mysql,這裏xaResource是MysqlXAConnection
    xaResource.commit(xaXid, false);
    releaseIfNecessary();
}

上面調用了數據源的 commit 方法,提交了 RM 分支事務。

到這裏,整個 RM 分支事務就結束了。Rollback 的代碼邏輯跟 commit 類似。

最後要說明的是,上面的 xaResource,是 mysql-connector-java.jar 包中的 MysqlXAConnection 類實例,它封裝了 mysql 提供的 XA 協議接口。

總結

seata 中 XA 模式的實現是使用數據源代理完成的,底層使用了數據庫對 XA 協議的原生支持。

mysql 的 java 驅動庫中,MysqlXAConnection 類封裝類 XA 協議的底層接口供外部調用。

跟 TCC 和 SAGA 模式需要在業務代碼中實現 prepare/commit/rollback 邏輯相比,XA 模式對業務代碼無侵入。

Reference

[1]:http://seata.io/zh-cn/docs/overview/what-is-seata.html

[2]:https://github.com/seata/seata

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/AEuPYwfk4fKkNcaPEaZbww