從零到一搭建 TCC 分佈式事務框架

0 前言

在寫這篇文章的過程中,我在另一邊並行完成了一個開源項目的搭建. 這個項目是基於 golang 從零到一實現的 TCC 分佈式事務框架,當前於 github 的開源地址爲:https://github.com/xiaoxuxiansheng/gotcc

本期分享內容將會緊密圍繞着這個開源項目展開. 受限於個人水平,在項目實現以及文章講解中有着諸多不當之處,權當在此拋磚引玉,歡迎大家多多批評指正.

1 架構設計

1.1 整體架構

首先我們簡單回顧一下有關於分佈式事務以及 TCC 的概念.

所謂事務,對應的語義是 “要麼什麼都不做,要麼全都做到位”,需要針對多個執行動作,建立一套一氣呵成、不可拆解的運行機制.

在事務中包括的一些執行動作,倘若涉及到跨數據庫、跨組件、跨服務等分佈式操作,那我們就稱這樣的事務是分佈式事務.

分佈式事務在實現上存在很多技術難點,是一個頗具挑戰的有趣話題. 目前業界也形成了一套相對成熟且普遍認同的解決方案,就是——TCC:Try-Confirm/Cancel.

TCC 本質上是一種 2PC(two phase commitment protocal 兩階段提交)的實現:

在我們實現 TCC 框架的實戰環節中,首先需要明確的事情是:

最終,這兩部分內容明確如下:

1.2 TCC Component

下面是關於 TCC 組件的定位:

1.3 TX Manager

下面是關於事務協調器 TXManager 的定位.

1.4 TX Store

TXStore 是用於存儲和管理事務日誌明細記錄的模塊:

1.5 RegistryCenter

最後是 TCC 組件的註冊管理中心 RegistryCenter,負責給 txManager 提供出註冊和查詢 TCC 組件的能力.

2 TXManager 核心源碼講解

理完了基本的流程和概念,下面我們一起開啓一線實戰環節.

2.1 類圖

首先捋一下,在 gotcc 核心 sdk 中,涉及到的幾個核心類:

通過下面的 UML 類圖,展示一下幾個核心類之間的關聯性:

2.2 核心類定義

2.2.1 TXManager

下面是關於事務協調器 TXManager 的幾個核心字段:

type TXManager struct {
    ctx            context.Context
    stop           context.CancelFunc
    opts           *Options
    txStore        TXStore
    registryCenter *registryCenter
}


func NewTXManager(txStore TXStore, opts ...Option) *TXManager {
    ctx, cancel := context.WithCancel(context.Background())
    txManager := TXManager{
        opts:           &Options{},
        txStore:        txStore,
        registryCenter: newRegistryCenter(),
        ctx:            ctx,
        stop:           cancel,
    }


    for _, opt := range opts {
        opt(txManager.opts)
    }


    repair(txManager.opts)


    go txManager.run()
    return &txManager
}

2.2.2 RegistryCenter

註冊中心 registryCenter 中的內容很簡單,通過 map 存儲所有註冊進來的 TCC 組件,要求各組件都有獨立的組件 ID;通過一把讀寫鎖 rwMutex 保護 map 的併發安全性

type registryCenter struct {
    mux        sync.RWMutex
    components map[string]component.TCCComponent
}


func newRegistryCenter() *registryCenter {
    return ®istryCenter{
        components: make(map[string]component.TCCComponent),
    }
}

2.2.3 TXStore

下面 gotcc sdk 中,對事務日誌存儲模塊 TXStore interface 的定義,這個點很重要,要求後續使用方在實現具體的 TXStore 模塊時,需要實現這裏所羅列出來的所有方法,並且要保證實現方法滿足預期的功能:

type TXStore interface {
    // 創建一條事務明細記錄
    CreateTX(ctx context.Context, components ...component.TCCComponent) (txID string, err error)
    // 更新事務進度:
    // 規則爲:倘若有一個 component try 操作執行失敗,則整個事務失敗;倘若所有 component try 操作執行成功,則事務成功
    TXUpdate(ctx context.Context, txID string, componentID string, accept bool) error
    // 提交事務的最終狀態
    TXSubmit(ctx context.Context, txID string, success bool) error
    // 獲取到所有處於中間態的事務
    GetHangingTXs(ctx context.Context) ([]*Transaction, error)
    // 獲取指定的一筆事務
    GetTX(ctx context.Context, txID string) (*Transaction, error)
    // 鎖住事務日誌表
    Lock(ctx context.Context, expireDuration time.Duration) error
    // 解鎖事務日誌表
    Unlock(ctx context.Context) error
}

2.3 註冊組件

下面是註冊 TCC 組件的處理流程:

首先,使用方通過 TXManager 對外暴露的公開方法 Register,開啓註冊流程,傳入對應的 TCCComponent:

func (t *TXManager) Register(component component.TCCComponent) error {
    return t.registryCenter.register(component)
}
func (r *registryCenter) register(component component.TCCComponent) error {
    r.mux.Lock()
    defer r.mux.Unlock()
    if _, ok := r.components[component.ID()]; ok {
        return errors.New("repeat component id")
    }
    r.components[component.ID()] = component
    return nil
}

上游 TXManager 可以通過 component id,進行 TCC 組件的查詢. 倘若某個 component id 不存在,則會拋出錯誤:

func (r *registryCenter) getComponents(componentIDs ...string) ([]component.TCCComponent, error) {
    components := make([]component.TCCComponent, 0, len(componentIDs))


    r.mux.RLock()
    defer r.mux.RUnlock()


    for _, componentID := range componentIDs {
        component, ok := r.components[componentID]
        if !ok {
            return nil, fmt.Errorf("component id: %s not existed", componentID)
        }
        components = append(components, component)
    }


    return components, nil
}

2.4 事務主流程

下面進入最核心的部分,介紹一下整個分佈式事務的運行流程.

2.4.1 主流程

用戶可以通過 txManager.Transaction 方法,一鍵啓動動一個分佈式事務流程,其中包含的幾個核心步驟展示如下圖:

txManager.Transaction 方法是用戶啓動分佈式事務的入口,需要在入參中聲明本次事務涉及到的組件以及需要在 Try 流程中傳遞給對應組件的請求參數:

type RequestEntity struct {
    // 組件名稱
    ComponentID string `json:"componentName"`
    // Try 請求時傳遞的參數
    Request map[string]interface{} `json:"request"`
}

txManager.Transaction 對應源碼如下,核心步驟均給出了註釋. 核心的 try-confirm/cancel 流程,會在後續的 txManager.twoPhaseCommit 方法中展開.

// 啓動事務
func (t *TXManager) Transaction(ctx context.Context, reqs ...*RequestEntity) (bool, error) {
    // 1 限制分佈式事務執行時長
    tctx, cancel := context.WithTimeout(ctx, t.opts.Timeout)
    defer cancel()


    // 2 獲得所有的涉及使用的 tcc 組件
    componentEntities, err := t.getComponents(tctx, reqs...)
    if err != nil {
        return false, err
    }


    // 3 調用 txStore 模塊,創建新的事務明細記錄,並取得全局唯一的事務 id
    txID, err := t.txStore.CreateTX(tctx, componentEntities.ToComponents()...)
    if err != nil {
        return false, err
    }


    // 4. 開啓兩階段提交流程:try-confirm/cancel
    return t.twoPhaseCommit(ctx, txID, componentEntities)
}

2.4.2 2PC 串聯

此處涉及 try-confirm/cancel 流程的串聯,可以說是整個 gotcc 框架的精髓所在,請大家細品斟酌.

對應流程圖展示如下,方法源碼中也給出了相對詳細的註釋:

func (t *TXManager) twoPhaseCommit(ctx context.Context, txID string, componentEntities ComponentEntities) (bool, error) {
    // 1 創建子 context 用於管理子 goroutine 生命週期
    // 手握 cancel 終止器,能保證在需要的時候終止所有子 goroutine 生命週期
    cctx, cancel := context.WithCancel(ctx)
    defer cancel()


    // 2 創建一個 chan,用於接收子 goroutine 傳遞的錯誤
    errCh := make(chan error)
    // 3 併發啓動,批量執行各 tcc 組件的 try 流程
    go func() {
        // 通過 waitGroup 進行多個子 goroutine 的彙總
        var wg sync.WaitGroup
        for _, componentEntity := range componentEntities {
            // shadow
            componentEntity := componentEntity
            wg.Add(1)
            // 併發執行各組件的 try 流程
            go func() {
                defer wg.Done()
                resp, err := componentEntity.Component.Try(cctx, &component.TCCReq{
                    ComponentID: componentEntity.Component.ID(),
                    TXID:        txID,
                    Data:        componentEntity.Request,
                })
                // 出現 tcc 組件執行 try 操作失敗,則需要對事務明細記錄進行更新,同時把錯誤通過 chan 拋給父 goroutine
                if err != nil || !resp.ACK {
                    // 對對應的事務進行更新
                    _ = t.txStore.TXUpdate(cctx, txID, componentEntity.Component.ID()false)
                    errCh <- fmt.Errorf("component: %s try failed", componentEntity.Component.ID())
                    return
                }
                // try 請求成功,則對事務明細記錄進行更新. 倘若更新失敗,也要視爲錯誤,拋給父 goroutine
                if err = t.txStore.TXUpdate(cctx, txID, componentEntity.Component.ID()true); err != nil {
                    errCh <- err
                }
            }()
        }


        // 等待所有子 goroutine 運行完成
        wg.Wait()
        // 關閉 errCh,告知父 goroutine 所有任務已運行完成的信息
        close(errCh)
    }()




    successful := true
    // 4 通過 chan,阻塞子 goroutine 執行完成
    // 4.1 但凡出現一個子 goroutine 遇到了錯誤,則會提前接收到錯誤,並調用 cancel 方法熔斷其他所有子 goroutine 流程
    // 4.2 倘若所有子 goroutine 都執行成功,則會通過 chan 的關閉事件推進流程,對應 err 爲 nil
    if err := <-errCh; err != nil {
        // 只要有一筆 try 請求出現問題,其他的都進行終止
        cancel()
        successful = false
    }


    // 5 異步執行第二階段的 confirm/cancel 流程
    // 之所以是異步,是因爲實際上在第一階段 try 的響應結果塵埃落定時,對應事務的成敗已經有了定論
    // 第二階段能夠容忍異步執行的原因在於,執行失敗時,還有輪詢任務進行兜底
    go t.advanceProgressByTXID(txID)
    
    // 6 響應結果
    // 6.1 倘若所有 try 請求都成功,則 successful 爲 try,事務成功
    // 6.2 但凡有一個 try 請求處理出現問題,successful 爲 false,事務失敗
    return successful, nil
}

2.4.3 事務進度推進

當一筆事務在第一階段中所有的 Try 請求都有了響應後,就需要根據第一階段的結果,執行第二階段的 Confirm 或者 Cancel 操作,並且將事務狀態推進爲成功或失敗的終態:

// 傳入一個事務 id 推進其進度
func (t *TXManager) advanceProgressByTXID(txID string) error {
    // 獲取事務日誌明細
    tx, err := t.txStore.GetTX(t.ctx, txID)
    if err != nil {
        return err
    }
    // 推進進度
    return t.advanceProgress(tx)
}
// 傳入一個事務 id 推進其進度
func (t *TXManager) advanceProgress(tx *Transaction) error {
    // 1 推斷出事務當前的狀態
    // 1.1 倘若所有組件 try 都成功,則爲 successful
    // 1.2 倘若存在組件 try 失敗,則爲 failure
    // 1.3 倘若事務超時了,則爲 failure
    // 1.4 否則事務狀態爲 hanging
    txStatus := tx.getStatus(time.Now().Add(-t.opts.Timeout))
    // hanging 狀態的事務暫時不處理
    if txStatus == TXHanging {
        return nil
    }


    // 2 根據事務是否成功,定製不同的處理函數
    success := txStatus == TXSuccessful
    var confirmOrCancel func(ctx context.Context, component component.TCCComponent) (*component.TCCResp, error)
    var txAdvanceProgress func(ctx context.Context) error
    if success {
        // 如果事務成功,則需要對組件進行 confirm
        confirmOrCancel = func(ctx context.Context, component component.TCCComponent) (*component.TCCResp, error) {
            return component.Confirm(ctx, tx.TXID)
        }
        // 如果事務成功,則需要在最後更新事務日誌記錄的狀態爲成功
        txAdvanceProgress = func(ctx context.Context) error {
            return t.txStore.TXSubmit(ctx, tx.TXID, true)
        }




    } else {
        // 如果事務失敗,則需要對組件進行 cancel
        confirmOrCancel = func(ctx context.Context, component component.TCCComponent) (*component.TCCResp, error) {           
            return component.Cancel(ctx, tx.TXID)
        }


        // 如果事務失敗,則需要在最後更新事務日誌記錄的狀態爲失敗
        txAdvanceProgress = func(ctx context.Context) error {           
            return t.txStore.TXSubmit(ctx, tx.TXID, false)
        }
    }


    // 3 批量調用組件,執行第二階段的 confirm/cancel 操作
    for _, component := range tx.Components {
        // 獲取對應的 tcc component
        components, err := t.registryCenter.getComponents(component.ComponentID)
        if err != nil || len(components) == 0 {
            return errors.New("get tcc component failed")
        }      
        resp, err := confirmOrCancel(t.ctx, components[0])
        if err != nil {
            return err
        }
        if !resp.ACK {
            return fmt.Errorf("component: %s ack failed", component.ComponentID)
        }
    }


    // 4 二階段 confirm/cancel 操作都執行完成後,對事務狀態進行提交
    return txAdvanceProgress(t.ctx)
}

2.5 異步輪詢流程

接下來聊聊 txManager 的異步輪詢流程. 這個流程同樣非常重要,是支撐 txManager 魯棒性的重要機制.

倘若存在事務已經完成第一階段 Try 操作的執行,但是第二階段沒執行成功,則需要由異步輪詢流程進行兜底處理,爲事務補齊第二階段的操作,並將事務狀態更新爲終態

2.5.1 啓動時機

異步輪詢任務是在 txManager 的初始化流程中啓動的,通過異步 goroutine 持久運行:

func NewTXManager(txStore TXStore, opts ...Option) *TXManager {
    ctx, cancel := context.WithCancel(context.Background())
    txManager := TXManager{
        opts:           &Options{},
        txStore:        txStore,
        registryCenter: NewRegistryCenter(),
        ctx:            ctx,
        stop:           cancel,
    }


    for _, opt := range opts {
        opt(txManager.opts)
    }


    repair(txManager.opts)


    go txManager.run()
    return &txManager
}

2.5.2 輪詢流程

異步輪詢任務運行時,基於 for 循環 + select 多路複用的方式,實現定時任務的執行.

輪詢的時間間隔會根據一輪任務處理過程中是否出現錯誤,而進行動態調整. 這裏調整規則指的是:當一次處理流程中發生了錯誤,就需要調大當前節點輪詢的時間間隔,讓其他節點的異步輪詢任務得到更大的執行機會.

func (t *TXManager) run() {
    var tick time.Duration
    var err error
    // 1 for 循環自旋式運行任務
    for {
        // 如果處理過程中出現了錯誤,需要增長輪詢時間間隔
        if err == nil {
            tick = t.opts.MonitorTick
        } else {
            tick = t.backOffTick(tick)
        }
        
        // select 多路複用
        select {
        // 倘若 txManager.ctx 被終止,則異步輪詢任務退出
        case <-t.ctx.Done():
            return


        // 2 等待 tick 對應時長後,開始執行任務
        case <-time.After(tick):
            // 對 txStore 加分佈式鎖,避免分佈式服務下多個服務節點的輪詢任務重複執行
            if err = t.txStore.Lock(t.ctx, t.opts.MonitorTick); err != nil {
                // 取鎖失敗時(大概率被其他節點佔有),不需要增加 tick 時長
                err = nil
                continue
            }


            // 3 獲取處於 hanging 狀態的事務
            var txs []*Transaction
            if txs, err = t.txStore.GetHangingTXs(t.ctx); err != nil {
                _ = t.txStore.Unlock(t.ctx)
                continue
            }
   
            // 4 批量推進事務進度
            err = t.batchAdvanceProgress(txs)
            _ = t.txStore.Unlock(t.ctx)
        }
    }
}

有關於輪詢時間間隔的退避謙讓策略爲:每次對時間間隔進行翻倍,封頂爲初始時長的 8 倍:

func (t *TXManager) backOffTick(tick time.Duration) time.Duration {
    tick <<= 1
    if threshold := t.opts.MonitorTick << 3; tick > threshold {
        return threshold
    }
    return tick
}

2.5.3 批量推進事務進度

下面是異步輪詢任務批量推進事務第二階段執行的流程,核心是開啓多個 goroutine 併發對多項事務進行處理:

func (t *TXManager) batchAdvanceProgress(txs []*Transaction) error {
    // 1 創建一個 chan,用於接收子 goroutine 傳輸的 err
    errCh := make(chan error)
    go func() {
        // 2 通過 waitGroup 聚合多個子 groutine
        var wg sync.WaitGroup
        for _, tx := range txs {
            // shadow
            tx := tx
            wg.Add(1)
            go func() {
                defer wg.Done()
                // 3 推進每筆事務的進度
                if err := t.advanceProgress(tx); err != nil {
                    // 遇到錯誤則投遞到 errCh
                    errCh <- err
                }
            }()
        }
        
        // 4 收口等待所有子 goroutine 執行完成
        wg.Wait()
        // 5 所有子 goroutine 執行完成後關閉 chan,喚醒阻塞等待的父 goroutine
        close(errCh)
    }()


    // 記錄遇到的第一個錯誤
    var firstErr error
    // 6 父 goroutine 通過 chan 阻塞在這裏,直到所有 goroutine 執行完成,chan 被 close 才能往下
    for err := range errCh {
        // 記錄遇到的第一個錯誤
        if firstErr != nil {
            continue
        }
        firstErr = err
    }


    // 7 返回錯誤,核心是標識執行過程中,是否發生過錯誤
    return firstErr
}

3 GOTCC 使用案例講解

從第 3 章開始,我們從實際應用 gotcc 框架的使用方視角出發,對所需要實現的模塊進行定義,然後給出應用 gotcc 框架的代碼示例.

3.1 TCC 組件實現

首先,我們對 TCC 組件的具體實現類進行定義:

3.1.1 類定義

定義一個 MockComponent 類,其中內置了 redis 客戶端,用於完成一些狀態數據的存取.

// 實現的 tcc 組件
type MockComponent struct {
    // tcc 組件唯一標識 id,構造時由使用方傳入
    id     string
    // redis 客戶端
    client *redis_lock.Client
}


func NewMockComponent(id string, client *redis_lock.Client) *MockComponent {
    return &MockComponent{
        id:     id,
        client: client,
    }
}


// 返回 tcc 組件的唯一標識 id
func (m *MockComponent) ID() string {
    return m.id
}

3.1.2 Try 流程

下面實現一下 TCC 組件的 Try 方法,關鍵要點已於代碼中通過註釋的形式給出:

func (m *MockComponent) Try(ctx context.Context, req *component.TCCReq) (*component.TCCResp, error) {
    // 1 基於 txID 維度加 redis 分佈式鎖
    lock := redis_lock.NewRedisLock(pkg.BuildTXLockKey(m.id, req.TXID), m.client)
    if err := lock.Lock(ctx); err != nil {
        return nil, err
    }
    defer func() {
        _ = lock.Unlock(ctx)
    }()


    // 2 基於 txID 冪等性去, 需要對事務的狀態進行檢查
    txStatus, err := m.client.Get(ctx, pkg.BuildTXKey(m.id, req.TXID))
    if err != nil && !errors.Is(err, redis_lock.ErrNil) {
        return nil, err
    }


    res := component.TCCResp{
        ComponentID: m.id,
        TXID:        req.TXID,
    }
    switch txStatus {
    case TXTried.String(), TXConfirmed.String(): // 重複的 try 請求,給予成功的響應
        res.ACK = true
        return &res, nil
    case TXCanceled.String(): // 此前該事務已 cancel,則拒絕本次 try 請求
        return &res, nil
    default:
    }


    // 3 建立 txID 與 bizID 的關聯
    bizID := gocast.ToString(req.Data["biz_id"])
    if _, err = m.client.Set(ctx, pkg.BuildTXDetailKey(m.id, req.TXID), bizID); err != nil {
        return nil, err
    }


    // 4 把 bizID 對應的業務數據置爲凍結態
    reply, err := m.client.SetNX(ctx, pkg.BuildDataKey(m.id, req.TXID, bizID), DataFrozen.String())
    if err != nil {
        return nil, err
    }
    // 倘若數據此前已凍結或已使用,則拒絕本次 try 請求
    if reply != 1 {
        return &res, nil
    }


    // 5 更新當前組件下的事務狀態爲 tried
    _, err = m.client.Set(ctx, pkg.BuildTXKey(m.id, req.TXID), TXTried.String())
    if err != nil {
        return nil, err
    }


    // 6 給予接收 try 請求的響應
    res.ACK = true
    return &res, nil
}

3.1.3 Confirm 流程

下面實現一下 TCC 組件的 Confirm 方法,關鍵要點已於代碼中通過註釋的形式給出:

func (m *MockComponent) Confirm(ctx context.Context, txID string) (*component.TCCResp, error) {
    // 1 基於 txID 維度加鎖
    lock := redis_lock.NewRedisLock(pkg.BuildTXLockKey(m.id, txID), m.client)
    if err := lock.Lock(ctx); err != nil {
        return nil, err
    }
    defer func() {
        _ = lock.Unlock(ctx)
    }()




    // 2. 校驗事務狀態,要求對應組件下,事務此前的狀態爲 tried
    txStatus, err := m.client.Get(ctx, pkg.BuildTXKey(m.id, txID))
    if err != nil {
        return nil, err
    }


    res := component.TCCResp{
        ComponentID: m.id,
        TXID:        txID,
    }
    switch txStatus {
    case TXConfirmed.String(): // 事務狀態已 confirm,直接冪等響應爲成功
        res.ACK = true
        return &res, nil
    case TXTried.String(): // 只有事務狀態爲 try 纔是合法的,會對程序放行
    default: // 其他情況直接拒絕,ack 爲 false
        return &res, nil
    }


    // 3 獲取事務對應的 bizID
    bizID, err := m.client.Get(ctx, pkg.BuildTXDetailKey(m.id, txID))
    if err != nil {
        return nil, err
    }


    // 4. 校驗業務數據此前狀態是否爲凍結
    dataStatus, err := m.client.Get(ctx, pkg.BuildDataKey(m.id, txID, bizID))
    if err != nil {
        return nil, err
    }
    
    // 如果此前非凍結態,則拒絕本次請求
    if dataStatus != DataFrozen.String() {
        return &res, nil
    }




    // 5 把業務數據的更新操作置爲 successful
    if _, err = m.client.Set(ctx, pkg.BuildDataKey(m.id, txID, bizID), DataSuccessful.String()); err != nil {
        return nil, err
    }


    // 6 把對應組件下的事務狀態更新爲成功,這一步哪怕失敗了也不阻塞主流程
    _, _ = m.client.Set(ctx, pkg.BuildTXKey(m.id, txID), TXConfirmed.String())


    // 7 處理成功,給予成功的響應
    res.ACK = true
    return &res, nil
}

3.1.4 Cancel 流程

下面實現一下 TCC 組件的 Cancel 方法,關鍵要點已於代碼中通過註釋的形式給出:

func (m *MockComponent) Cancel(ctx context.Context, txID string) (*component.TCCResp, error) {
    // 1 基於 txID 維度加鎖
    lock := redis_lock.NewRedisLock(pkg.BuildTXLockKey(m.id, txID), m.client)
    if err := lock.Lock(ctx); err != nil {
        return nil, err
    }
    defer func() {
        _ = lock.Unlock(ctx)
    }()


    // 2 校驗事務狀態,只要不是 confirmed,都允許被置爲 canceld
    txStatus, err := m.client.Get(ctx, pkg.BuildTXKey(m.id, txID))
    if err != nil && !errors.Is(err, redis_lock.ErrNil) {
        return nil, err
    }
    // 倘若組件內事務此前的狀態爲 confirmed,則說明流程有異常.
    if txStatus == TXConfirmed.String() {
        return nil, fmt.Errorf("invalid tx status: %s, txid: %s", txStatus, txID)
    }


    // 3 根據事務獲取對應的 bizID
    bizID, err := m.client.Get(ctx, pkg.BuildTXDetailKey(m.id, txID))
    if err != nil {
        return nil, err
    }


    // 4 刪除對應的 frozen 凍結記錄,代表對數據執行了回滾操作
    if err = m.client.Del(ctx, pkg.BuildDataKey(m.id, txID, bizID)); err != nil {
        return nil, err
    }


    // 5 把事務狀態更新爲 canceled
    _, err = m.client.Set(ctx, pkg.BuildTXKey(m.id, txID), TXCanceled.String())
    if err != nil {
        return nil, err
    }


    // 6 給予處理成功的 ack 
    return &component.TCCResp{
        ACK:         true,
        ComponentID: m.id,
        TXID:        txID,
    }, nil
}

3.2 TX Store 實現

接下來是關於事務日誌存儲模塊 TXStore 的具體實現:

3.2.1 類定義

聲明瞭一個 MockTXStore 類,裏面通過 mysql 存儲事務日誌明細數據,通過 redis 實現 TXStore 模塊的分佈式鎖.

其中和事務日誌明細數據庫直接交互的操作被封裝在 TXRecordDAO 當中.

// TXStore 模塊具體實現
type MockTXStore struct {
    // redis 客戶端,用於實現分佈式鎖
    client *redis_lock.Client
    // 事務日誌存儲 DAO 層
    dao    *expdao.TXRecordDAO
}


func NewMockTXStore(dao *expdao.TXRecordDAO, client *redis_lock.Client) *MockTXStore {
    return &MockTXStore{
        dao:    dao,
        client: client,
    }
}

事務日誌存儲 DAO 層:

type TXRecordDAO struct {
    db *gorm.DB
}


func NewTXRecordDAO(db *gorm.DB) *TXRecordDAO {
    return &TXRecordDAO{
        db: db,
    }
}

接下來是關於事務日誌明細記錄的持久化對象(PO,Persistent Object)模型定義:

type TXRecordPO struct {
    gorm.Model
    Status               string `gorm:"status"`
    ComponentTryStatuses string `gorm:"component_try_statuses"`
}


func (t TXRecordPO) TableName() string {
    return "tx_record"
}


type ComponentTryStatus struct {
    ComponentID string `json:"componentID"`
    TryStatus   string `json:"tryStatus"`
}

下面是事務日誌明細表的建表語句:

CREATE TABLE IF NOT EXISTS `tx_record`
(
    `id`                       bigint(20) unsigned NOT NULL AUTO_INCREMENT COMMENT '主鍵ID',
    `status`                   varchar(16) NOT NULL COMMENT '事務狀態 hanging/successful/failure',
    `component_try_statuses`   json DEFAULT NULL COMMENT '各組件 try 接口請求狀態 hanging/successful/failure',
    `deleted_at`        datetime     DEFAULT NULL COMMENT '刪除時間',
    `created_at`        datetime     NOT NULL COMMENT '創建時間',
    `updated_at`        datetime     DEFAULT NULL ON UPDATE CURRENT_TIMESTAMP COMMENT '更新時間',
    PRIMARY KEY (`id`) USING BTREE COMMENT '主鍵索引',
    KEY `idx_status` (`status`) COMMENT '事務狀態索引'
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8mb4 COMMENT '事務日誌記錄';

3.2.2 創建事務記錄

接下來是通過過 TXStore 模塊創建一條事務明細記錄的實現代碼:

func (m *MockTXStore) CreateTX(ctx context.Context, components ...component.TCCComponent) (string, error) {
    // 創建一個記錄組件 try 響應結果的 map,其中以組件 id 爲 key
    componentTryStatuses := make(map[string]*expdao.ComponentTryStatus, len(components))
    for _, component := range components {
        componentTryStatuses[component.ID()] = &expdao.ComponentTryStatus{
            ComponentID: component.ID(),
            TryStatus:   txmanager.TryHanging.String(),
        }
    }


    statusesBody, _ := json.Marshal(componentTryStatuses)
    // 創建事務明細記錄 po 示例,調用 dao 模塊將記錄落庫
    txID, err := m.dao.CreateTXRecord(ctx, &expdao.TXRecordPO{
        Status:               txmanager.TXHanging.String(),
        ComponentTryStatuses: string(statusesBody),
    })
    if err != nil {
        return "", err
    }


    return gocast.ToString(txID), nil
}

dao 層創建事務明細記錄的實現代碼:

func (t *TXRecordDAO) CreateTXRecord(ctx context.Context, record *TXRecordPO) (uint, error) {
    return record.ID, t.db.WithContext(ctx).Model(&TXRecordPO{}).Create(record).Error
}

3.2.3 事務明細更新

下面是更新一筆事務明細的方法,其處理流程是:

func (m *MockTXStore) TXUpdate(ctx context.Context, txID string, componentID string, accept bool) error {
    // 後續需要閉包傳入執行函數
    do := func(ctx context.Context, dao *expdao.TXRecordDAO, record *expdao.TXRecordPO) error {
        componentTryStatuses := make(map[string]*expdao.ComponentTryStatus)
        _ = json.Unmarshal([]byte(record.ComponentTryStatuses)&componentTryStatuses)
        if accept {
            componentTryStatuses[componentID].TryStatus = txmanager.TrySucceesful.String()
        } else {
            componentTryStatuses[componentID].TryStatus = txmanager.TryFailure.String()
        }
        newBody, _ := json.Marshal(componentTryStatuses)
        record.ComponentTryStatuses = string(newBody)
        return dao.UpdateTXRecord(ctx, record)
    }


    _txID := gocast.ToUint(txID)
    return m.dao.LockAndDo(ctx, _txID, do)
}
// 通過 gorm 實現數據記錄加寫鎖,並執行閉包函數的操作:
func (t *TXRecordDAO) LockAndDo(ctx context.Context, id uint, do func(ctx context.Context, dao *TXRecordDAO, record *TXRecordPO) error) error {
    // 開啓事務
    return t.db.Transaction(func(tx *gorm.DB) error {
        defer func() {
            if err := recover(); err != nil {
                tx.Rollback()
            }
        }()


        // 加寫鎖
        var record TXRecordPO
        if err := tx.Set("gorm:query_option""FOR UPDATE").WithContext(ctx).First(&record, id).Error; err != nil {
            return err
        }


        txDAO := NewTXRecordDAO(tx)
        // 執行閉包函數
        return do(ctx, txDAO, &record)
    })
}
// 更新一條事務日誌數據記錄
func (t *TXRecordDAO) UpdateTXRecord(ctx context.Context, record *TXRecordPO) error {
    return t.db.WithContext(ctx).Updates(record).Error
}

3.2.4 查詢事務

接下來是查詢事務的兩個方法:

// 根據事務 id 查詢指定的一筆事務明細記錄:
func (m *MockTXStore) GetTX(ctx context.Context, txID string) (*txmanager.Transaction, error) {
    // 通過 option 在查詢條件中注入事務 id
    records, err := m.dao.GetTXRecords(ctx, expdao.WithID(gocast.ToUint(txID)))
    if err != nil {
        return nil, err
    }
    if len(records) != 1 {
        return nil, errors.New("get tx failed")
    }
 
    // 對各組件 try 明細內容進行反序列化
    componentTryStatuses := make(map[string]*expdao.ComponentTryStatus)
    _ = json.Unmarshal([]byte(records[0].ComponentTryStatuses)&componentTryStatuses)


    components := make([]*txmanager.ComponentTryEntity, 0, len(componentTryStatuses))
    for _, tryItem := range componentTryStatuses {
        components = append(components, &txmanager.ComponentTryEntity{
            ComponentID: tryItem.ComponentID,
            TryStatus:   txmanager.ComponentTryStatus(tryItem.TryStatus),
        })
    }
    return &txmanager.Transaction{
        TXID:       txID,
        Status:     txmanager.TXStatus(records[0].Status),
        Components: components,
        CreatedAt:  records[0].CreatedAt,
    }, nil
}
// 獲取全量處於中間態的事務明細記錄
func (m *MockTXStore) GetHangingTXs(ctx context.Context) ([]*txmanager.Transaction, error) {
    // 通過 option 在查詢條件中指定事務狀態爲 hanging
    records, err := m.dao.GetTXRecords(ctx, expdao.WithStatus(txmanager.TryHanging))
    if err != nil {
        return nil, err
    }


    txs := make([]*txmanager.Transaction, 0, len(records))
    for _, record := range records {
        // 對各組件 try 響應結果進行反序列化
        componentTryStatuses := make(map[string]*expdao.ComponentTryStatus)
        _ = json.Unmarshal([]byte(record.ComponentTryStatuses)&componentTryStatuses)
        components := make([]*txmanager.ComponentTryEntity, 0, len(componentTryStatuses))
        for _, component := range componentTryStatuses {
            components = append(components, &txmanager.ComponentTryEntity{
                ComponentID: component.ComponentID,
                TryStatus:   txmanager.ComponentTryStatus(component.TryStatus),
            })
        }


        txs = append(txs, &txmanager.Transaction{
            TXID:       gocast.ToString(record.ID),
            Status:     txmanager.TXHanging,
            CreatedAt:  record.CreatedAt,
            Components: components,
        })
    }


    return txs, nil
}

在 dao 層實現了一個通用的事務日誌查詢方法,通過 option 模式實現查詢條件的靈活組裝:

func (t *TXRecordDAO) GetTXRecords(ctx context.Context, opts ...QueryOption) ([]*TXRecordPO, error) {
    db := t.db.WithContext(ctx).Model(&TXRecordPO{})
    for _, opt := range opts {
        db = opt(db)
    }


    var records []*TXRecordPO
    return records, db.Scan(&records).Error
}

下面是關於 option 的具體定義,更多有關於這種模式的設計實現思路,可以參見我之前發表的文章——Golang 設計模式之建造者模式

type QueryOption func(db *gorm.DB) *gorm.DB


// 通過事務主鍵 id 進行查詢
func WithID(id uint) QueryOption {
    return func(db *gorm.DB) *gorm.DB {
        return db.Where("id = ?", id)
    }
}


// 通過事務狀態進行查詢
func WithStatus(status txmanager.ComponentTryStatus) QueryOption {
    return func(db *gorm.DB) *gorm.DB {
        return db.Where("status = ?", status.String())
    }
}

3.2.5 提交事務結果

接下來是在事務執行完成後,將執行結果更新到事務明細記錄中的處理方法:

// 提交事務的最終狀態
func (m *MockTXStore) TXSubmit(ctx context.Context, txID string, success bool) error {
    do := func(ctx context.Context, dao *expdao.TXRecordDAO, record *expdao.TXRecordPO) error {
        if success {
            record.Status = txmanager.TXSuccessful.String()
        } else {
            record.Status = txmanager.TXFailure.String()
        }
        return dao.UpdateTXRecord(ctx, record)
    }
    return m.dao.LockAndDo(ctx, gocast.ToUint(txID)do)
}

3.2.6 加 / 解全局鎖

最後,是實現整個 txStore 模塊加 / 解鎖的處理方法,內部是基於 redis 實現的分佈式鎖:

func (m *MockTXStore) Lock(ctx context.Context, expireDuration time.Duration) error {
    lock := redis_lock.NewRedisLock(pkg.BuildTXRecordLockKey(), m.client, redis_lock.WithExpireSeconds(int64(expireDuration.Seconds())))
    return lock.Lock(ctx)
}


func (m *MockTXStore) Unlock(ctx context.Context) error {
    lock := redis_lock.NewRedisLock(pkg.BuildTXRecordLockKey(), m.client)
    return lock.Unlock(ctx)
}

到這裏爲止,所有前置準備工作都已經處理完成,接下來我們展示一個應用到 gotcc 框架的使用示例.

3.3 使用代碼示例

由於我實現的 txStore 和 tccComponent 需要依賴到 mysql 和 redis 兩個組件,因此在這裏需要輸入對應的信息.

單測代碼相對比較簡單,其中一些要點通過註釋給出:

const (
    dsn      = "請輸入你的 mysql dsn"
    network  = "tcp"
    address  = "請輸入你的 redis ip"
    password = "請輸入你的 redis 密碼"
)


// 使用 tcc 單測代碼
func Test_TCC(t *testing.T) {
    // 創建 redis 客戶端
    redisClient := pkg.NewRedisClient(network, address, password)
    // 創建 mysql 客戶端
    mysqlDB, err := pkg.NewDB(dsn)
    if err != nil {
        t.Error(err)
        return
    }


    // 構造三個 tcc 組件
    componentAID := "componentA"
    componentBID := "componentB"
    componentCID := "componentC"
    componentA := NewMockComponent(componentAID, redisClient)
    componentB := NewMockComponent(componentBID, redisClient)
    componentC := NewMockComponent(componentCID, redisClient)


    // 構造出事務日誌存儲模塊
    txRecordDAO := dao.NewTXRecordDAO(mysqlDB)
    txStore := NewMockTXStore(txRecordDAO, redisClient)


    // 構造出 txManager 模塊
    txManager := txmanager.NewTXManager(txStore, txmanager.WithMonitorTick(time.Second))
    defer txManager.Stop()


    // 完成三個組件的註冊
    if err := txManager.Register(componentA); err != nil {
        t.Error(err)
        return
    }


    if err := txManager.Register(componentB); err != nil {
        t.Error(err)
        return
    }


    if err := txManager.Register(componentC); err != nil {
        t.Error(err)
        return
    }


    
    ctx, cancel := context.WithTimeout(context.Background(), time.Second*30)
    defer cancel()
    // 啓動分佈式事務
    success, err := txManager.Transaction(ctx, []*txmanager.RequestEntity{
        {ComponentID: componentAID,
            Request: map[string]interface{}{
                "biz_id": componentAID + "_biz",
            },
        },
        {ComponentID: componentBID,
            Request: map[string]interface{}{
                "biz_id": componentBID + "_biz",
            },
        },
        {ComponentID: componentCID,
            Request: map[string]interface{}{
                "biz_id": componentCID + "_biz",
            },
        },
    }...)
    if err != nil {
        t.Errorf("tx failed, err: %v", err)
        return
    }
    if !success {
        t.Error("tx failed")
        return
    }
    
    // 分佈式事務處理成功
    t.Log("success")
}

4 總結

到這裏,本文正文內容全部結束. 這裏回頭再對本期分享的內容做個總結:

gotcc 的開源地址爲 https://github.com/xiaoxuxiansheng/gotcc . 大家走過路過,幫忙留個 star,非常感謝.

至此,有關 TCC 分佈式事務框架實戰篇內容全部講解完成. 個人水平有限,不當之處懇請大家不吝賜教.

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/aTQ6mgVUbUqn69NLmXh_7A