聊一聊如何用 C- 輕鬆完成一個 SAGA 分佈式事務

背景

銀行跨行轉賬業務是一個典型分佈式事務場景,假設 A 需要跨行轉賬給  B,那麼就涉及兩個銀行的數據,無法通過一個數據庫的本地事務保證轉賬的 ACID ,只能夠通過分佈式事務來解決。

市面上使用比較多的分佈式事務框架,支持 SAGA 的,大部分都是 JAVA 爲主的,沒有提供 C# 的對接方式,或者是對接難度大,一定程度上讓人望而卻步。

這裏推薦一下葉東富大佬的分佈式事務框架 dtm,一款跨語言的開源分佈式事務管理器,優雅的解決了冪等、空補償、懸掛等分佈式事務難題。提供了簡單易用、高性能、易水平擴展的分佈式事務解決方案。

老黃在搜索相關分佈式事務資料的時候,他寫的文章都是相對比較好理解的,也就是這樣關注到了 dtm 這個項目。

下面就基於這個框架來實踐一下銀行轉賬的例子。

前置工作

dotnet add package Dtmcli --version 0.3.0

成功的 SAGA

先來看一下一個成功完成的 SAGA 時序圖。

上圖的微服務 1,對應我們示例的 OutApi,也就是轉錢出去的那個服務。

微服務 2,對應我們示例的 InApi,也就是轉錢進來的那個服務。

下面是兩個服務的正向操作和補償操作的處理。

OutApi

app.MapPost("/api/TransOut"(string branch_id, string gid, string op, TransRequest req) ={
    // 進行 數據庫操作
    Console.WriteLine($"用戶【{req.UserId}】轉出【{req.Amount}】正向操作,gid={gid}, branch_id={branch_id}, op={op}");

    return Results.Ok(TransResponse.BuildSucceedResponse());
});

app.MapPost("/api/TransOutCompensate"(string branch_id, string gid, string op, TransRequest req) =>
{
    // 進行 數據庫操作
    Console.WriteLine($"用戶【{req.UserId}】轉出【{req.Amount}】補償操作,gid={gid}, branch_id={branch_id}, op={op}");

    return Results.Ok(TransResponse.BuildSucceedResponse());
});

InApi

app.MapPost("/api/TransIn"(string branch_id, string gid, string op, TransRequest req) =>
{
    Console.WriteLine($"用戶【{req.UserId}】轉入【{req.Amount}】正向操作,gid={gid}, branch_id={branch_id}, op={op}");

    return Results.Ok(TransResponse.BuildSucceedResponse());
});

app.MapPost("/api/TransInCompensate"(string branch_id, string gid, string op, TransRequest req) =>
{
    Console.WriteLine($"用戶【{req.UserId}】轉入【{req.Amount}】補償操作,gid={gid}, branch_id={branch_id}, op={op}");

    return Results.Ok(TransResponse.BuildSucceedResponse());
});

注:示例爲了簡單,沒有進行實際的數據庫操作。

到此各個子事務的處理已經 OK 了,然後是開啓 SAGA 事務,進行分支調用

var userOutReq = new TransRequest() { UserId = "1"Amount = -30 };
var userInReq = new TransRequest() { UserId = "2"Amount = 30 };

var ct = new CancellationToken();
var gid = await dtmClient.GenGid(ct);
var saga = new Saga(dtmClient, gid)
    .Add(outApi + "/TransOut", outApi + "/TransOutCompensate", userOutReq)
    .Add(inApi + "/TransIn", inApi + "/TransInCompensate", userInReq)
    ;

var flag = await saga.Submit(ct);

Console.WriteLine($"case1, {gid} saga 提交結果 = {flag}");

到這裏,一個完整的 SAGA 分佈式事務就編寫完成了。

搭建好 dtm 的環境後,運行上面的例子,會看到下面的輸出。

當然,上面的情況太理想了,轉出轉入都是一次性就成功了。

但是實際上我們會遇到許許多多的問題,最常見的應該就是網絡故障了。

下面來看一個異常的 SAGA 示例

異常的 SAGA

做一個假設,用戶 1 的轉出是正常的,但是用戶 2 在轉入的時候出現了問題。

由於事務已經提交給 dtm 了,按照 SAGA 事務的協議,dtm 會重試未完成的操作。

這個時候用戶 2 這邊會出現什麼樣的情況呢?

  1. 轉入其實成功了,但是 dtm 收到錯誤 (網絡故障等)

  2. 轉入沒有成功,直接告訴 dtm 失敗了 (應用異常等)

無論是那一種,dtm 都會進行重試操作。這個時候會發生什麼呢?我們繼續往下看。

先看一下事務失敗交互的時序圖

再通過調整上面成功的例子,來比較直觀的看看出現的情況。

在 InApi 加多一個轉入失敗的處理接口

app.MapPost("/api/TransInError"(string branch_id, string gid, string op, TransRequest req) =>
{
    Console.WriteLine($"用戶【{req.UserId}】轉入【{req.Amount}】正向操作--失敗,gid={gid}, branch_id={branch_id}, op={op}");

    //return Results.BadRequest();
    return Results.Ok(TransResponse.BuildFailureResponse());
});

失敗的返回有兩種,一種是狀態碼大於 400,一種是狀態碼是 200 並且響應體包含 FAILURE,上面的例子是第二種

調整一下調用方,把轉入正向操作替換成上面這個返回錯誤的接口。

var saga = new Saga(dtmClient, gid)
    .Add(outApi + "/TransOut", outApi + "/TransOutCompensate", userOutReq)
    .Add(inApi + "/TransInError", inApi + "/TransInCompensate", userInReq);

運行結果如下:

在這個例子中,只考慮補償 / 重試成功的情況下。

用戶 1 轉出的 30 塊錢最終是回到了他的帳號上,他沒有出現損失。

用戶 2 就有點苦逼了,轉入沒有成功,返回了失敗,還觸發了轉入的補償機制,結果就是把用戶 2 還沒進帳的 30 塊錢給多扣了,這個就是上面的情況 2,常見的空補償問題。

這個時候就要在進行轉入補償的時候做一系列的判斷,轉入有沒有成功,轉出有沒有失敗等等,把業務變的十分複雜。

如果出現了上述的情況 1,會發生什麼呢?

用戶 2 第一次已經成功轉入 30 塊錢,返回的也是成功,但是網絡出了點問題,導致 dtm 認爲失敗了,它就會進行重試,相當於用戶 2  還會收到第二個轉入 30 塊錢的請求!也就是說這次轉帳,用戶 2 會進賬 60 塊錢,翻倍了,也就是說這個請求不是冪等。

同樣的,要處理這個問題,在進行轉入的正向操作中也要進行一系列的判斷,同樣會把複雜度上升一個級別。

前面有提到 dtm 提供了子事務屏障的功能,保證了冪等、空補償等常見問題。

再來看看這個子事務屏障的功能有沒有幫我們簡化上面異常處理。

子事務屏障

子事務屏障,需要根據 trans_typegidbranch_idop 四個內容進行創建。

這 4 個內容 dtm 在回調時會放在 querysting 上面。

客戶端裏面提供了 IBranchBarrierFactory 來供我們使用。

空補償

針對上面的異常情況 (用戶 2 憑空消失 30 塊錢),對轉入的補償進行子事務屏障的改造。

app.MapPost("/api/BarrierTransInCompensate", async (string branch_id, string gid, string op, string trans_type, TransRequest req, IBranchBarrierFactory factory) =>
{
    var barrier = factory.CreateBranchBarrier(trans_type, gid, branch_id, op);

    using var db = Db.GeConn();
    await barrier.Call(db, async (tx) =>
    {
        // 轉入失敗的情況下,不應該輸出下面這個
        Console.WriteLine($"用戶【{req.UserId}】轉入【{req.Amount}】補償操作,gid={gid}, branch_id={branch_id}, op={op}");
        // tx 參數是事務,可和本地事務一起提交回滾
        await Task.CompletedTask;
    });

    Console.WriteLine($"子事務屏障-補償操作,gid={gid}, branch_id={branch_id}, op={op}");
    return Results.Ok(TransResponse.BuildSucceedResponse());
});

Call 方法就是關鍵所在了,需要傳入一個 DbConnection 和真正的業務操作,這裏的業務操作就是在控制檯輸出補償操作的信息。

同樣的,我們再調整一下調用方,把轉入補償操作替換成上面帶子事務屏障的接口。

var saga = new Saga(dtmClient, gid)
    .Add(outApi + "/TransOut", outApi + "/TransOutCompensate", userOutReq)
    .Add(inApi + "/TransInError", inApi + "/BarrierTransInCompensate", userInReq)
    ;

再來運行這個例子。

會發現轉入的補償操作並沒執行,控制檯沒有輸出補償信息,而是輸出了

Will not exec busiCall, isNullCompensation=True, isDuplicateOrPend=False

這個就表明了,這個請求是個空補償,是不應該執行業務方法的,即空操作。

再來看一下,轉入成功的,但是 dtm 收到了失敗的信號,不斷重試造成重複請求的情況。

冪等

針對用戶 2 轉入兩次 30 塊錢的異常情況,對轉入的正向操作進行子事務屏障的改造。

app.MapPost("/api/BarrierTransIn", async (string branch_id, string gid, string op, string trans_type, TransRequest req, IBranchBarrierFactory factory) =>
{
    Console.WriteLine($"用戶【{req.UserId}】轉入【{req.Amount}】請求來了!!!gid={gid}, branch_id={branch_id}, op={op}");

    var barrier = factory.CreateBranchBarrier(trans_type, gid, branch_id, op);

    using var db = Db.GeConn();
    await barrier.Call(db, async (tx) =>
    {
        var c = Interlocked.Increment(ref _errCount);

        // 模擬一個超時執行
        if (c > 0 && c < 2) await Task.Delay(10000);

        Console.WriteLine($"用戶【{req.UserId}】轉入【{req.Amount}】正向操作,gid={gid}, branch_id={branch_id}, op={op}");
        await Task.CompletedTask;
    });

    return Results.Ok(TransResponse.BuildSucceedResponse());
});

這裏通過一個超時執行來讓 dtm 進行轉入正向操作的重試。

同樣的,我們再調整一下調用方,把轉入的正向操作也替換成上面帶子事務屏障的接口。

var saga = new Saga(dtmClient, gid)
    .Add(outApi + "/TransOut", outApi + "/TransOutCompensate", userOutReq)
    .Add(inApi + "/BarrierTransIn", inApi + "/BarrierTransInCompensate", userInReq)
    ;

再來運行這個例子。

可以看到轉入的正向操作確實是觸發了多次,第一次實際上是成功,只是響應比較慢,導致 dtm 認爲是失敗了,觸發了第二次請求,但是第二次請求並沒有執行業務操作,而是輸出了

Will not exec busiCall, isNullCompensation=False, isDuplicateOrPend=True

這個就表明了,這個請求是個重複請求,是不應該執行業務方法的,保證了冪等。

到這裏,可以看出,子事務屏障確實解決了冪等和空補償的問題,大大降低了業務判斷的複雜度和出錯的可能性

寫在最後

在這篇文章裏,也通過幾個例子,完整給出了編寫一個 SAGA 事務的過程,涵蓋了正常成功完成,異常情況,以及成功回滾的情況。希望對研究分佈式事務的您有所幫助。

本文示例代碼:https://github.com/catcherwong-archive/2022/tree/main/DtmSagaSample

參考資料

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/dj7ZOX-EDKOtKsCa1GV3MQ