Seata AT 模式代碼級詳解

文|

劉月財

seata-go 項目負責人

北京小桔科技有限公司【滴滴】開發工程師

趙新(花名:於雨 )

螞蟻集團 Seata 項目開源負責人

背景

Seata 四種事務模式中,AT 事務模式是阿里體系獨創的事務模式,對業務無侵入,也是 Seata 用戶最多的一種事務模式,兼具易用性與高性能。

目前,Seata 社區正大力推進其多語言版本建設,Go、PHP、JS 和 Python 四個語言版本基本完成了 TCC 事務模式的實現。參照 Seata v1.5.2 版本的 AT 模式的實現,並結合 Seata 官方文檔,本文嘗試從代碼角度詳解 Seata AT 事務模式的詳細流程,目的是梳理 Seata Java 版本 AT 模式的實現細節後,在多語言版本後續開發中,優先實現 AT 事務模式。

1、什麼是 AT 模式?

AT 模式是一種二階段提交的分佈式事務模式,它採用了本地 undo log 的方式來數據在修改前後的狀態,並用它來實現回滾。從性能上來說,AT 模式由於有 undo log 的存在,一階段執行完可以立即釋放鎖和連接資源,吞吐量比 XA 模式高。用戶在使用 AT 模式的時候,只需要配置好對應的數據源即可,事務提交、回滾的流程都由 Seata 自動完成,對用戶業務幾乎沒有入侵,使用便利。

2、AT 模式與 ACID 和 CAP

談論數據庫的事務模式,一般都會先談論事務相關的 ACID 特性,但在分佈式場景下,還需要考慮其 CAP 性質。

2.1 AT 與 ACID

在數據庫本地事務隔離級別讀已提交(Read Committed)或以上的基礎上,Seata(AT 模式)的默認全局隔離級別是讀未提交(Read Uncommitted)

如果應用在特定場景下,必須要求全局的讀已提交,目前 Seata 的方式是通過 SELECT FOR UPDATE 語句的代理。 

SELECT FOR UPDATE 語句的執行會查詢全局鎖,如果全局鎖被其他事務持有,則釋放本地鎖(回滾 SELECT FOR UPDATE 語句的本地執行)並重試。這個過程中,查詢是被 block 住的,直到全局鎖拿到,即讀取的相關數據是已提交的,才返回。

出於總體性能上的考慮,Seata 目前的方案並沒有對所有 SELECT 語句都進行代理,僅針對 FOR UPDATE 的 SELECT 語句。

詳細例子參考 Seata 官網:https://seata.io/zh-cn/docs/dev/mode/at-mode.html

2.1.2 AT 模式的寫隔離

AT 會對寫操作的 SQL 進行攔截,提交本地事務前,會向 TC 獲取全局鎖,未獲取到全局鎖的情況下,不能進行寫,以此來保證不會發生寫衝突:

• 一階段本地事務提交前,需要確保先拿到全局鎖

• 拿不到全局鎖,不能提交本地事務;

• 拿全局鎖的嘗試被限制在一定範圍內,超出範圍將放棄,並回滾本地事務,釋放本地鎖。

詳細例子參考 Seata 官網:https://seata.io/zh-cn/docs/dev/mode/at-mode.html

2.2 AT 與 CAP

Seata 所有的事務模式在一般情況下,是需要保證 CP,即一致性和分區容錯性,因爲分佈式事務的核心就是要保證數據的一致性(包括弱一致性)。比如,在一些交易場景下,涉及到多個系統的金額的變化,保證一致性可以避免系統產生資損。

分佈式系統不可避免地會出現服務不可用的情況,如 Seata 的 TC 出現不可用時,用戶可能希望通過服務降級,優先保證整個服務的可用性,此時 Seata 需要從 CP 系統轉換爲一個保證 AP 的系統。

比如,有一個服務是給用戶端提供用戶修改信息的功能,假如此時 TC 服務出現問題,爲了不影響用戶的使用體驗,我們希望服務仍然可用,只不過所有的 SQL 的執行降級爲不走全局事務,而是當做本地事務執行。

AT 模式默認優先保證 CP,但提供了配置通道讓用戶在 CP 和 AP 兩種模式下進行切換:

• 配置文件的 tm.degrade-check 參數,其值爲 true 則分支事務保證 AP,反之保證 CP;

• 手動修改配置中心的 service.disableGlobalTransaction 屬性爲 true,則關閉全局事務實現 AP。

3、AT 數據源代理

在 AT 模式中,用戶只需要配置好 AT 的代理數據源即可, AT 的所有流程都在代理數據源中完成,對用戶無感知。 

AT 數據源代理的整體類結構如下圖:

圖片

 AT 事務數據源代理類結構圖【from https://seata.io/zh-cn/docs/dev/mode/xa-mode.html

AT 的數據源代理中,分別對目標數據庫的 DataSource 、 Connection 和 Statement  進行了代理,在執行目標 SQL 動作之前,完成了 RM 資源註冊、 undo log 生成、分支事務註冊、分支事務提交 / 回滾等操作,而這些操作對用戶並無感知。

下面的時序圖中,展示了 AT 模式在執行過程中,這幾個代理類的動作細節:

圖片

4、AT 模式流程

以下是 AT 模式的整體流程,從這裏可以看到分佈式事務各個關鍵動作的執行時機,每個動作細節,我們後面來討論:

圖片

4.1 一階段

在 AT 模式的第一階段, Seata 會通過代理數據源,攔截用戶執行的業務 SQL ,假如用戶沒有開啓事務,會自動開啓一個新事務。如果業務 SQL 是寫操作(增、刪、改操作)類型,會解析業務 SQL 的語法,生成 SELECT SQL 語句,把要被修改的記錄查出來,保存爲 “before image” 。然後執行業務 SQL ,執行完後用同樣的原理,將已經被修改的記錄查出來,保存爲 “after image” ,至此一個 undo log 記錄就完整了。隨後 RM 會向 TC 註冊分支事務, TC 側會新加鎖記錄,鎖可以保證 AT 模式的讀、寫隔離。RM  再將 undo log 和業務 SQL 的本地事務提交,保證業務 SQL 和保存 undo log 記錄 SQL 的原子性。

圖片

4.2 二階段提交

AT 模式的二階段提交,TC 側會將該事務的鎖刪除,然後通知 RM 異步刪除 undo log 記錄即可。

圖片

4.3 二階段回滾

如果 AT 模式的二階段是回滾,那麼 RM 側需要根據一階段保存的 undo log 數據中的 before image 記錄,通過逆向 SQL 的方式,對在一階段修改過的業務數據進行還原即可。

但是在還原數據之前,需要進行髒數據校驗。因爲在一階段提交後,到現在進行回滾的中間這段時間,該記錄有可能被別的業務改動過。校驗的方式,就是用 undo log 的 after image 和現在數據庫的數據做比較,假如數據一致,說明沒有髒數據;不一致則說明有髒數據,出現髒數據就需要人工進行處理了。

圖片

5、關鍵代碼模塊

如下是 AT 模式整個流程的主要模塊,我們從中可以瞭解開發 AT 模式需要做哪些事情:

圖片

5.1 Undo log 數據格式

undo log 存在表 undo_log 表中,undo_log 表的表結構如下:

圖片

rollback_info 存放了業務數據修改前後的內容,數據表存放的是經過壓縮後的格式,他的明文格式如下:

{
    "branchId":2828558179596595558,
    "sqlUndoLogs":[
        {
            "afterImage":{
                "rows":[
                    {
                        "fields":[
                            {
                                "keyType":"PRIMARY_KEY",
                                "name":"id",
                                "type":4,
                                "value":3
                            },
                            {
                                "keyType":"NULL",
                                "name":"count",
                                "type":4,
                                "value":70
                            }
                        ]
                    }
                ],
                "tableName":"stock_tbl"
            },
            "beforeImage":{
                "rows":[
                    {
                        "fields":[
                            {
                                "keyType":"PRIMARY_KEY",
                                "name":"id",
                                "type":4,
                                "value":3
                            },
                            {
                                "keyType":"NULL",
                                "name":"count",
                                "type":4,
                                "value":100
                            }
                        ]
                    }
                ],
                "tableName":"stock_tbl"
            },
            "sqlType":"UPDATE",
            "tableName":"stock_tbl"
        }
    ],
    "xid":"192.168.51.102:8091:2828558179596595550"
}

5.2 UndoLogManager

UndoLogManager 負責 undo log 的新加、刪除、回滾操作,不同的數據庫有不同的實現(不同數據庫的 SQL 語法會不同),公共邏輯放在了 AbstractUndoLogManager 抽象類中,整體的類繼承關係如下圖:

圖片

注:圖片建議在 PC 端查看

插入和刪除 undo log 的邏輯都比較簡單,直接操作數據表就行。這裏重點看下回滾 undo log 的邏輯:

圖片

源碼分析如下:

@Override
public void undo(DataSourceProxy dataSourceProxy, String xid, long branchId) throws TransactionException {
  Connection conn = null;b
    ResultSet rs = null;
  PreparedStatement selectPST = null;
  boolean originalAutoCommit = true;
  for (; ; ) {
    try {
      conn = dataSourceProxy.getPlainConnection();
      // The entire undo process should run in a local transaction.
      // 開啓本地事務,確保刪除undo log和恢復業務數據的SQL在一個事務中commit
      if (originalAutoCommit = conn.getAutoCommit()) {
        conn.setAutoCommit(false);
      }
      // Find UNDO LOG
      selectPST = conn.prepareStatement(SELECT_UNDO_LOG_SQL);
      selectPST.setLong(1, branchId);
      selectPST.setString(2, xid);
      // 查出branchId的所有undo log記錄,用來恢復業務數據
      rs = selectPST.executeQuery();
      boolean exists = false;
      while (rs.next()) {
        exists = true;
        // It is possible that the server repeatedly sends a rollback request to roll back
        // the same branch transaction to multiple processes,
        // ensuring that only the undo_log in the normal state is processed.
        int state = rs.getInt(ClientTableColumnsName.UNDO_LOG_LOG_STATUS);
        // 如果state=1,說明可以回滾;state=1說明不能回滾
        if (!canUndo(state)) {
          if (LOGGER.isInfoEnabled()) {
            LOGGER.info("xid {} branch {}, ignore {} undo_log", xid, branchId, state);
          }
          return;
        }
        String contextString = rs.getString(ClientTableColumnsName.UNDO_LOG_CONTEXT);
        Map<String, String> context = parseContext(contextString);
        byte[] rollbackInfo = getRollbackInfo(rs);
        String serializer = context == null ? null : context.get(UndoLogConstants.SERIALIZER_KEY);
        // 根據serializer獲取序列化工具類
        UndoLogParser parser = serializer == null ? UndoLogParserFactory.getInstance()
          : UndoLogParserFactory.getInstance(serializer);
        // 反序列化undo log,得到業務記錄修改前後的明文
        BranchUndoLog branchUndoLog = parser.decode(rollbackInfo);
        try {
          // put serializer name to local
          setCurrentSerializer(parser.getName());
          List<SQLUndoLog> sqlUndoLogs = branchUndoLog.getSqlUndoLogs();
          if (sqlUndoLogs.size() > 1) {
            Collections.reverse(sqlUndoLogs);
          }
          for (SQLUndoLog sqlUndoLog : sqlUndoLogs) {
            TableMeta tableMeta = TableMetaCacheFactory.getTableMetaCache(dataSourceProxy.getDbType()).getTableMeta(
                conn, sqlUndoLog.getTableName(), dataSourceProxy.getResourceId());
            sqlUndoLog.setTableMeta(tableMeta);
            AbstractUndoExecutor undoExecutor = UndoExecutorFactory.getUndoExecutor(
                dataSourceProxy.getDbType(), sqlUndoLog);
            undoExecutor.executeOn(conn);
          }
        } finally {
          // remove serializer name
          removeCurrentSerializer();
        }
      }
      // If undo_log exists, it means that the branch transaction has completed the first phase,
      // we can directly roll back and clean the undo_log
      // Otherwise, it indicates that there is an exception in the branch transaction,
      // causing undo_log not to be written to the database.
      // For example, the business processing timeout, the global transaction is the initiator rolls back.
      // To ensure data consistency, we can insert an undo_log with GlobalFinished state
      // to prevent the local transaction of the first phase of other programs from being correctly submitted.
      // See https://github.com/seata/seata/issues/489
      if (exists) {
        deleteUndoLog(xid, branchId, conn);
        conn.commit();
        if (LOGGER.isInfoEnabled()) {
          LOGGER.info("xid {} branch {}, undo_log deleted with {}", xid, branchId,
              State.GlobalFinished.name());
        }
      } else {
        // 如果不存在undo log,可能是因爲分支事務還未執行完成(比如,分支事務執行超時),TM發起了回滾全局事務的請求。
        // 這個時候,往undo_log表插入一條記錄,可以使分支事務提交的時候失敗(undo log)
        insertUndoLogWithGlobalFinished(xid, branchId, UndoLogParserFactory.getInstance(), conn);
        conn.commit();
        if (LOGGER.isInfoEnabled()) {
          LOGGER.info("xid {} branch {}, undo_log added with {}", xid, branchId,
              State.GlobalFinished.name());
        }
      }
      return;
    } catch (SQLIntegrityConstraintViolationException e) {
      // Possible undo_log has been inserted into the database by other processes, retrying rollback undo_log
      if (LOGGER.isInfoEnabled()) {
        LOGGER.info("xid {} branch {}, undo_log inserted, retry rollback", xid, branchId);
      }
    } catch (Throwable e) {
      if (conn != null) {
        try {
          conn.rollback();
        } catch (SQLException rollbackEx) {
          LOGGER.warn("Failed to close JDBC resource while undo ... ", rollbackEx);
        }
      }
      throw new BranchTransactionException(BranchRollbackFailed_Retriable, String
          .format("Branch session rollback failed and try again later xid = %s branchId = %s %s", xid,
            branchId, e.getMessage()), e);
    } finally {
      try {
        if (rs != null) {
          rs.close();
        }
        if (selectPST != null) {
          selectPST.close();
        }
        if (conn != null) {
          if (originalAutoCommit) {
            conn.setAutoCommit(true);
          }
          conn.close();
        }
      } catch (SQLException closeEx) {
        LOGGER.warn("Failed to close JDBC resource while undo ... ", closeEx);
      }
    }
  }
}

備註:需要特別注意下,當回滾的時候,發現 undo log 不存在,需要往 undo_log 表新加一條記錄,避免因爲 RM 在 TM 發出回滾請求後,又成功提交分支事務的場景。

5.3 Compressor 壓縮算法

Compressor 接口定義了壓縮算法的規範,用來壓縮文本,節省存儲空間:

public interface Compressor {
    /**
     * compress byte[] to byte[].
     * @param bytes the bytes
     * @return the byte[]
     */
    byte[] compress(byte[] bytes);
    /**
     * decompress byte[] to byte[].
     * @param bytes the bytes
     * @return the byte[]
     */
    byte[] decompress(byte[] bytes);
}

目前已經實現的壓縮算法有如下這些:

圖片

5.4 UndoLogParser 序列化算法

Serializer 接口定義了序列化算法的規範,用來序列化代碼:

public interface UndoLogParser {
    /**
     * Get the name of parser;
     * 
     * @return the name of parser
     */
    String getName();
    /**
     * Get default context of this parser
     * 
     * @return the default content if undo log is empty
     */
    byte[] getDefaultContent();
    /**
     * Encode branch undo log to byte array.
     *
     * @param branchUndoLog the branch undo log
     * @return the byte array
     */
    byte[] encode(BranchUndoLog branchUndoLog);
    /**
     * Decode byte array to branch undo log.
     *
     * @param bytes the byte array
     * @return the branch undo log
     */
    BranchUndoLog decode(byte[] bytes);
}

目前已經實現的序列化算法有如下這些:

圖片

5.5 Executor 執行器

Executor 是 SQL 執行的入口類, AT 在執行 SQL 前後,需要管理 undo log 的 image 記錄,主要是構建 undo log ,包括根據不同的業務 SQL ,來組裝查詢 undo log 的 SQL 語句;執行查詢 undo log 的 SQL ,獲取到鏡像記錄數據;執行插入 undo log 的邏輯(未提交事務)。

public interface Executor<T> {
    /**
     * Execute t.
     *
     * @param args the args
     * @return the t
     * @throws Throwable the throwable
     */
    T execute(Object... args) throws Throwable;
}

針對不同的業務 SQL ,有不同的 Executor 實現,主要是因爲不同操作 / 不同數據庫類型的業務 SQL ,生成 undo log 的 SQL 的邏輯不同,所以都分別重寫了 beforeImage() 和 afterImage() 方法。整體的繼承關係如下圖所示:

圖片

注:圖片建議在 PC 端查看

爲了直觀地看到不同類型的 SQL 生成的 before image SQL 和 after iamge SQL ,這裏做個梳理。假如目標數據表的結構如下:

CREATE TABLE `t` (
  `id` int(64) NOT NULL AUTO_INCREMENT,
  `name` varchar(64) COLLATE utf8mb4_unicode_ci DEFAULT "",
  `addr` varchar(64) COLLATE utf8mb4_unicode_ci DEFAULT "",
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci
INSERT INTO `t` (`id`, `name`, `addr`) VALUES (1, 'Tom', 'Beijing');
INSERT INTO `t` (`id`, `name`, `addr`) VALUES (2, 'Jack', 'Nanjing');

圖片

注:圖片建議在 PC 端查看

5.6 AsyncWorker

AsyncWorker 是用來做異步執行的,用來做分支事務提交和 undo log 記錄刪除等操作。

圖片

6、關於性能

並不存在某一種完美的分佈式事務機制可以適應所有場景,完美滿足所有需求。無論 AT 模式、TCC 模式還是 Saga 模式,本質上都是對 XA 規範在各種場景下安全性或者性能的不足的改進。Seata 不同的事務模式是在一致性、可靠性、易用性、性能四個特性之間進行不同的取捨。

近期  Seata 社區發現有同行,在未詳細分析 Java 版本 AT 模式的代碼的詳細實現的情況下,僅對某個早期的 Go 版本的 Seata 進行短鏈接壓測後,質疑 AT 模型的性能及其數據安全性,請具有一定思辨能力的用戶朋友們在接受這個結論前仔細查閱其測試方法與測試對象,區分好 “李鬼” 與 “李逵”。

實際上,這個早期的 Go 版本實現僅參照了 Seata v1.4.0,且未嚴格把 Seata AT 模式的所有功能都予以實現。話說回來,即便其推崇的 Seata XA 模式,其也依賴於單 DB 的 XA 模式。而當下最新版本的 MySQL XA 事務模式的 BUG 依然很多,這個地基並沒有其想象中的那樣百分百穩固。

由阿里與螞蟻集團共建的 Seata,是我們多年內部分佈式事務工程實踐與技術經驗的結晶,開源出來後得到了多達 150+ 以上行業同行生產環境的驗證。開源大道既長且寬,這個道路上可以有機動車道也有非機動車道,還可以有人行道,大家攜手把道路拓寬延長,而非站在人行道上宣傳機動車道危險性高且車速慢。

7、總結

Seata AT 模式依賴於各個 DB 廠商的不同版本的 DB Driver(數據庫驅動),每種數據庫發佈新版本後,其 SQL 語義及其使用模式都可能發生改變。隨着近年 Seata 被其用戶們廣泛應用於多種業務場景,在開發者們的努力下,Seata AT 模式保持了編程接口與其 XA 模式幾乎一致,適配了幾乎所有的主流數據庫,並覆蓋了這些數據庫的主要流行版本的 Driver:真正做到了把分佈式系統的 “複雜性” 留在了框架層面,把易用性和高性能交給了用戶。

當然,Seata Java 版本的 XA 和 AT 模式還有許多需要完善與改進的地方,遑論其它多語言版本的實現。歡迎對 Seata 及其多語言版本建設感興趣的同行參與到 Seata 的建設中來,共同努力把 Seata 打造成一個標準化分佈式事務平臺。

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/qicDuZPhbGbKgUAbvZNemQ