從零到一搭建 TCC 分佈式事務框架
0 前言
在寫這篇文章的過程中,我在另一邊並行完成了一個開源項目的搭建. 這個項目是基於 golang 從零到一實現的 TCC 分佈式事務框架,當前於 github 的開源地址爲:https://github.com/xiaoxuxiansheng/gotcc
本期分享內容將會緊密圍繞着這個開源項目展開. 受限於個人水平,在項目實現以及文章講解中有着諸多不當之處,權當在此拋磚引玉,歡迎大家多多批評指正.
1 架構設計
1.1 整體架構
首先我們簡單回顧一下有關於分佈式事務以及 TCC 的概念.
所謂事務,對應的語義是 “要麼什麼都不做,要麼全都做到位”,需要針對多個執行動作,建立一套一氣呵成、不可拆解的運行機制.
在事務中包括的一些執行動作,倘若涉及到跨數據庫、跨組件、跨服務等分佈式操作,那我們就稱這樣的事務是分佈式事務.
分佈式事務在實現上存在很多技術難點,是一個頗具挑戰的有趣話題. 目前業界也形成了一套相對成熟且普遍認同的解決方案,就是——TCC:Try-Confirm/Cancel.
TCC 本質上是一種 2PC(two phase commitment protocal 兩階段提交)的實現:
-
• 把分佈式事務中,負責維護狀態數據變更的模塊,封裝成一個 TCC 組件
-
• 把數據的變更狀態拆分爲對應 Try 操作的【凍結】、對應 Confirm 操作的【成功】以及對應 Cancel 操作的【失敗回滾】
-
• 抽出一個統籌全局的事務協調者角色 TXManager. 在執行分佈式事務時,分爲兩個階段:
-
• 階段 I:先對所有組件執行 Try 操作
-
• 階段 II:根據上階段 Try 操作的執行結果,決定本輪執行 Confirm 還是 Cancel 操作
在我們實現 TCC 框架的實戰環節中,首先需要明確的事情是:
-
• 哪部分內容在 TCC 架構中屬於通用的流程,這部分內容可以抽取出來放在 sdk 中,以供後續複用
-
• 哪部分內容需要給使用方預留出足夠的自由度,由使用方自行實現,然後和通用 sdk 進行接軌.
最終,這兩部分內容明確如下:
-
• 在 TCC sdk 中實現的通用邏輯包含了和事務協調器 txManager 有關的核心流程
-
• 事務協調器 TXManager 開啓事務以及 try-confirm/cancel 的 2PC 流程串聯
-
• 事務協調器 TXManager 異步輪詢任務,用於推進事務從中間態走向終態
-
• TCC 組件的註冊流程
-
• 需要預定義事務日誌存儲模塊 TXStore 的實現規範(聲明 interface)
-
• 需要預定義 TCC 組件 TCCComponent 的實現規範(聲明 interface)
-
• TCC 組件和 TXStore 兩部分內容需要由使用方自行實現:
-
• 使用方自行實現 TCCComponent 類,包括其 Try、Confirm、Cancel 方法的執行邏輯
-
• 使用方自行實現具體的 TXStore 日誌存儲模塊. 可以根據實際需要,選型合適的存儲組件和存儲方式
1.2 TCC Component
下面是關於 TCC 組件的定位:
-
• 這部分內容需要由用戶自行實現,並在 TXManager 啓動時將其註冊到註冊中心 RegistryCenter 當中.
-
• 當使用方調用 TXManager 開啓事務時,會通過 RegistryCenter 獲取這些組件,並對其進行使用
-
• TCC 組件需要具備的能力包括如下幾項:
1.3 TX Manager
下面是關於事務協調器 TXManager 的定位.
-
• TXManager 是整個 TCC 架構中最核心的角色
-
• TXManager 作爲 gotcc 的統一入口,供使用方執行啓動事務和註冊組件的操作
-
• TXManager 作爲中樞系統分別和 RegisterCenter、TXStore 交互
-
• TXManager 需要串聯起整個 Try-Confirm/Canel 的 2PC 調用流程
-
• TXManager 需要運行異步輪詢任務,推進未完成的事務走向終態
1.4 TX Store
TXStore 是用於存儲和管理事務日誌明細記錄的模塊:
-
• 需要支持事務明細數據的 CRUD 能力
-
• 通常情況下,底層需要應用到實際的存儲組件作爲支持
-
• TXStore 在 gotcc 的 sdk 中體現爲一個抽象的 interface. 需要由用戶完成具體類的實現,並將其注入到 TXManager 當中.
1.5 RegistryCenter
最後是 TCC 組件的註冊管理中心 RegistryCenter,負責給 txManager 提供出註冊和查詢 TCC 組件的能力.
2 TXManager 核心源碼講解
理完了基本的流程和概念,下面我們一起開啓一線實戰環節.
2.1 類圖
首先捋一下,在 gotcc 核心 sdk 中,涉及到的幾個核心類:
-
• TXManager:事務協調器,class
-
• TXStore:事務日誌存儲模塊,interface
-
• registryCenter:TCC 組件註冊管理中心,class
-
• TCCComponent:TCC 組件,interface
通過下面的 UML 類圖,展示一下幾個核心類之間的關聯性:
2.2 核心類定義
2.2.1 TXManager
下面是關於事務協調器 TXManager 的幾個核心字段:
-
• txStore:內置的事務日誌存儲模塊,需要由使用方實現並完成注入
-
• registryCenter:TCC 組件的註冊管理中心
-
• opts:內聚了一些 TXManager 的配置項,可以由使用方自定義,並通過 option 注入
-
• ctx:用於反映 TXManager 運行生命週期的的 context,當 ctx 終止時,異步輪詢任務也會隨之退出
-
• stop:用於停止 txManager 的控制器. 當 stop 被調用後,異步輪詢任務會被終止
type TXManager struct {
ctx context.Context
stop context.CancelFunc
opts *Options
txStore TXStore
registryCenter *registryCenter
}
func NewTXManager(txStore TXStore, opts ...Option) *TXManager {
ctx, cancel := context.WithCancel(context.Background())
txManager := TXManager{
opts: &Options{},
txStore: txStore,
registryCenter: newRegistryCenter(),
ctx: ctx,
stop: cancel,
}
for _, opt := range opts {
opt(txManager.opts)
}
repair(txManager.opts)
go txManager.run()
return &txManager
}
2.2.2 RegistryCenter
註冊中心 registryCenter 中的內容很簡單,通過 map 存儲所有註冊進來的 TCC 組件,要求各組件都有獨立的組件 ID;通過一把讀寫鎖 rwMutex 保護 map 的併發安全性
type registryCenter struct {
mux sync.RWMutex
components map[string]component.TCCComponent
}
func newRegistryCenter() *registryCenter {
return ®istryCenter{
components: make(map[string]component.TCCComponent),
}
}
2.2.3 TXStore
下面 gotcc sdk 中,對事務日誌存儲模塊 TXStore interface 的定義,這個點很重要,要求後續使用方在實現具體的 TXStore 模塊時,需要實現這裏所羅列出來的所有方法,並且要保證實現方法滿足預期的功能:
-
• CreateTX:創建一條事務明細記錄,會在入參中傳入本事務涉及的 TCC 組件列表,同時需要在出參中返回全局唯一的事務 id
-
• TXUpdate:更新一條事務明細記錄. 這裏指的更新,針對於,事務中某個 TCC 組件 Try 響應狀態的更新
-
• TXSubmit:提交一條事務的執行結果. 要麼置爲成功,要麼置爲失敗
-
• GetHangingTXs:獲取所有未完成的事務明細記錄
-
• GetTX:根據事務 id,獲取指定的一條事務明細記錄
-
• Lock:鎖住整個事務日誌存儲模塊(要求爲分佈式鎖)
-
• Unlock:解鎖整個事務日誌存儲模塊
type TXStore interface {
// 創建一條事務明細記錄
CreateTX(ctx context.Context, components ...component.TCCComponent) (txID string, err error)
// 更新事務進度:
// 規則爲:倘若有一個 component try 操作執行失敗,則整個事務失敗;倘若所有 component try 操作執行成功,則事務成功
TXUpdate(ctx context.Context, txID string, componentID string, accept bool) error
// 提交事務的最終狀態
TXSubmit(ctx context.Context, txID string, success bool) error
// 獲取到所有處於中間態的事務
GetHangingTXs(ctx context.Context) ([]*Transaction, error)
// 獲取指定的一筆事務
GetTX(ctx context.Context, txID string) (*Transaction, error)
// 鎖住事務日誌表
Lock(ctx context.Context, expireDuration time.Duration) error
// 解鎖事務日誌表
Unlock(ctx context.Context) error
}
2.3 註冊組件
下面是註冊 TCC 組件的處理流程:
首先,使用方通過 TXManager 對外暴露的公開方法 Register,開啓註冊流程,傳入對應的 TCCComponent:
func (t *TXManager) Register(component component.TCCComponent) error {
return t.registryCenter.register(component)
}
-
• TXManager 會調用註冊中心 registeryCenter 的 register 方法,將對應 component 注入到 map 中. 這裏有兩個點值得一提:
-
• Register 方法可以併發使用,其內部會通過 rwMutex 維護 map 的併發安全性
-
• TCC 組件不能重複註冊,即不能存在重複的 component id
func (r *registryCenter) register(component component.TCCComponent) error {
r.mux.Lock()
defer r.mux.Unlock()
if _, ok := r.components[component.ID()]; ok {
return errors.New("repeat component id")
}
r.components[component.ID()] = component
return nil
}
上游 TXManager 可以通過 component id,進行 TCC 組件的查詢. 倘若某個 component id 不存在,則會拋出錯誤:
func (r *registryCenter) getComponents(componentIDs ...string) ([]component.TCCComponent, error) {
components := make([]component.TCCComponent, 0, len(componentIDs))
r.mux.RLock()
defer r.mux.RUnlock()
for _, componentID := range componentIDs {
component, ok := r.components[componentID]
if !ok {
return nil, fmt.Errorf("component id: %s not existed", componentID)
}
components = append(components, component)
}
return components, nil
}
2.4 事務主流程
下面進入最核心的部分,介紹一下整個分佈式事務的運行流程.
2.4.1 主流程
用戶可以通過 txManager.Transaction 方法,一鍵啓動動一個分佈式事務流程,其中包含的幾個核心步驟展示如下圖:
txManager.Transaction 方法是用戶啓動分佈式事務的入口,需要在入參中聲明本次事務涉及到的組件以及需要在 Try 流程中傳遞給對應組件的請求參數:
type RequestEntity struct {
// 組件名稱
ComponentID string `json:"componentName"`
// Try 請求時傳遞的參數
Request map[string]interface{} `json:"request"`
}
txManager.Transaction 對應源碼如下,核心步驟均給出了註釋. 核心的 try-confirm/cancel 流程,會在後續的 txManager.twoPhaseCommit 方法中展開.
// 啓動事務
func (t *TXManager) Transaction(ctx context.Context, reqs ...*RequestEntity) (bool, error) {
// 1 限制分佈式事務執行時長
tctx, cancel := context.WithTimeout(ctx, t.opts.Timeout)
defer cancel()
// 2 獲得所有的涉及使用的 tcc 組件
componentEntities, err := t.getComponents(tctx, reqs...)
if err != nil {
return false, err
}
// 3 調用 txStore 模塊,創建新的事務明細記錄,並取得全局唯一的事務 id
txID, err := t.txStore.CreateTX(tctx, componentEntities.ToComponents()...)
if err != nil {
return false, err
}
// 4. 開啓兩階段提交流程:try-confirm/cancel
return t.twoPhaseCommit(ctx, txID, componentEntities)
}
2.4.2 2PC 串聯
此處涉及 try-confirm/cancel 流程的串聯,可以說是整個 gotcc 框架的精髓所在,請大家細品斟酌.
對應流程圖展示如下,方法源碼中也給出了相對詳細的註釋:
func (t *TXManager) twoPhaseCommit(ctx context.Context, txID string, componentEntities ComponentEntities) (bool, error) {
// 1 創建子 context 用於管理子 goroutine 生命週期
// 手握 cancel 終止器,能保證在需要的時候終止所有子 goroutine 生命週期
cctx, cancel := context.WithCancel(ctx)
defer cancel()
// 2 創建一個 chan,用於接收子 goroutine 傳遞的錯誤
errCh := make(chan error)
// 3 併發啓動,批量執行各 tcc 組件的 try 流程
go func() {
// 通過 waitGroup 進行多個子 goroutine 的彙總
var wg sync.WaitGroup
for _, componentEntity := range componentEntities {
// shadow
componentEntity := componentEntity
wg.Add(1)
// 併發執行各組件的 try 流程
go func() {
defer wg.Done()
resp, err := componentEntity.Component.Try(cctx, &component.TCCReq{
ComponentID: componentEntity.Component.ID(),
TXID: txID,
Data: componentEntity.Request,
})
// 出現 tcc 組件執行 try 操作失敗,則需要對事務明細記錄進行更新,同時把錯誤通過 chan 拋給父 goroutine
if err != nil || !resp.ACK {
// 對對應的事務進行更新
_ = t.txStore.TXUpdate(cctx, txID, componentEntity.Component.ID(), false)
errCh <- fmt.Errorf("component: %s try failed", componentEntity.Component.ID())
return
}
// try 請求成功,則對事務明細記錄進行更新. 倘若更新失敗,也要視爲錯誤,拋給父 goroutine
if err = t.txStore.TXUpdate(cctx, txID, componentEntity.Component.ID(), true); err != nil {
errCh <- err
}
}()
}
// 等待所有子 goroutine 運行完成
wg.Wait()
// 關閉 errCh,告知父 goroutine 所有任務已運行完成的信息
close(errCh)
}()
successful := true
// 4 通過 chan,阻塞子 goroutine 執行完成
// 4.1 但凡出現一個子 goroutine 遇到了錯誤,則會提前接收到錯誤,並調用 cancel 方法熔斷其他所有子 goroutine 流程
// 4.2 倘若所有子 goroutine 都執行成功,則會通過 chan 的關閉事件推進流程,對應 err 爲 nil
if err := <-errCh; err != nil {
// 只要有一筆 try 請求出現問題,其他的都進行終止
cancel()
successful = false
}
// 5 異步執行第二階段的 confirm/cancel 流程
// 之所以是異步,是因爲實際上在第一階段 try 的響應結果塵埃落定時,對應事務的成敗已經有了定論
// 第二階段能夠容忍異步執行的原因在於,執行失敗時,還有輪詢任務進行兜底
go t.advanceProgressByTXID(txID)
// 6 響應結果
// 6.1 倘若所有 try 請求都成功,則 successful 爲 try,事務成功
// 6.2 但凡有一個 try 請求處理出現問題,successful 爲 false,事務失敗
return successful, nil
}
2.4.3 事務進度推進
當一筆事務在第一階段中所有的 Try 請求都有了響應後,就需要根據第一階段的結果,執行第二階段的 Confirm 或者 Cancel 操作,並且將事務狀態推進爲成功或失敗的終態:
-
• 倘若所有組件的 Try 響應都是成功,則需要批量調用組件的 Confirm 接口,並在這之後將事務狀態更新爲成功
-
• 倘若存在某個組件 Try 響應爲失敗,則需要批量調用組件的 Cancel 接口,並在這之後將事務狀態更新爲失敗
-
• 倘若當前事務已執行超時,同樣需要批量調用組件的 Cancel 接口,並在這之後將事務狀態更新爲失敗
// 傳入一個事務 id 推進其進度
func (t *TXManager) advanceProgressByTXID(txID string) error {
// 獲取事務日誌明細
tx, err := t.txStore.GetTX(t.ctx, txID)
if err != nil {
return err
}
// 推進進度
return t.advanceProgress(tx)
}
// 傳入一個事務 id 推進其進度
func (t *TXManager) advanceProgress(tx *Transaction) error {
// 1 推斷出事務當前的狀態
// 1.1 倘若所有組件 try 都成功,則爲 successful
// 1.2 倘若存在組件 try 失敗,則爲 failure
// 1.3 倘若事務超時了,則爲 failure
// 1.4 否則事務狀態爲 hanging
txStatus := tx.getStatus(time.Now().Add(-t.opts.Timeout))
// hanging 狀態的事務暫時不處理
if txStatus == TXHanging {
return nil
}
// 2 根據事務是否成功,定製不同的處理函數
success := txStatus == TXSuccessful
var confirmOrCancel func(ctx context.Context, component component.TCCComponent) (*component.TCCResp, error)
var txAdvanceProgress func(ctx context.Context) error
if success {
// 如果事務成功,則需要對組件進行 confirm
confirmOrCancel = func(ctx context.Context, component component.TCCComponent) (*component.TCCResp, error) {
return component.Confirm(ctx, tx.TXID)
}
// 如果事務成功,則需要在最後更新事務日誌記錄的狀態爲成功
txAdvanceProgress = func(ctx context.Context) error {
return t.txStore.TXSubmit(ctx, tx.TXID, true)
}
} else {
// 如果事務失敗,則需要對組件進行 cancel
confirmOrCancel = func(ctx context.Context, component component.TCCComponent) (*component.TCCResp, error) {
return component.Cancel(ctx, tx.TXID)
}
// 如果事務失敗,則需要在最後更新事務日誌記錄的狀態爲失敗
txAdvanceProgress = func(ctx context.Context) error {
return t.txStore.TXSubmit(ctx, tx.TXID, false)
}
}
// 3 批量調用組件,執行第二階段的 confirm/cancel 操作
for _, component := range tx.Components {
// 獲取對應的 tcc component
components, err := t.registryCenter.getComponents(component.ComponentID)
if err != nil || len(components) == 0 {
return errors.New("get tcc component failed")
}
resp, err := confirmOrCancel(t.ctx, components[0])
if err != nil {
return err
}
if !resp.ACK {
return fmt.Errorf("component: %s ack failed", component.ComponentID)
}
}
// 4 二階段 confirm/cancel 操作都執行完成後,對事務狀態進行提交
return txAdvanceProgress(t.ctx)
}
2.5 異步輪詢流程
接下來聊聊 txManager 的異步輪詢流程. 這個流程同樣非常重要,是支撐 txManager 魯棒性的重要機制.
倘若存在事務已經完成第一階段 Try 操作的執行,但是第二階段沒執行成功,則需要由異步輪詢流程進行兜底處理,爲事務補齊第二階段的操作,並將事務狀態更新爲終態
2.5.1 啓動時機
異步輪詢任務是在 txManager 的初始化流程中啓動的,通過異步 goroutine 持久運行:
func NewTXManager(txStore TXStore, opts ...Option) *TXManager {
ctx, cancel := context.WithCancel(context.Background())
txManager := TXManager{
opts: &Options{},
txStore: txStore,
registryCenter: NewRegistryCenter(),
ctx: ctx,
stop: cancel,
}
for _, opt := range opts {
opt(txManager.opts)
}
repair(txManager.opts)
go txManager.run()
return &txManager
}
2.5.2 輪詢流程
異步輪詢任務運行時,基於 for 循環 + select 多路複用的方式,實現定時任務的執行.
輪詢的時間間隔會根據一輪任務處理過程中是否出現錯誤,而進行動態調整. 這裏調整規則指的是:當一次處理流程中發生了錯誤,就需要調大當前節點輪詢的時間間隔,讓其他節點的異步輪詢任務得到更大的執行機會.
func (t *TXManager) run() {
var tick time.Duration
var err error
// 1 for 循環自旋式運行任務
for {
// 如果處理過程中出現了錯誤,需要增長輪詢時間間隔
if err == nil {
tick = t.opts.MonitorTick
} else {
tick = t.backOffTick(tick)
}
// select 多路複用
select {
// 倘若 txManager.ctx 被終止,則異步輪詢任務退出
case <-t.ctx.Done():
return
// 2 等待 tick 對應時長後,開始執行任務
case <-time.After(tick):
// 對 txStore 加分佈式鎖,避免分佈式服務下多個服務節點的輪詢任務重複執行
if err = t.txStore.Lock(t.ctx, t.opts.MonitorTick); err != nil {
// 取鎖失敗時(大概率被其他節點佔有),不需要增加 tick 時長
err = nil
continue
}
// 3 獲取處於 hanging 狀態的事務
var txs []*Transaction
if txs, err = t.txStore.GetHangingTXs(t.ctx); err != nil {
_ = t.txStore.Unlock(t.ctx)
continue
}
// 4 批量推進事務進度
err = t.batchAdvanceProgress(txs)
_ = t.txStore.Unlock(t.ctx)
}
}
}
有關於輪詢時間間隔的退避謙讓策略爲:每次對時間間隔進行翻倍,封頂爲初始時長的 8 倍:
func (t *TXManager) backOffTick(tick time.Duration) time.Duration {
tick <<= 1
if threshold := t.opts.MonitorTick << 3; tick > threshold {
return threshold
}
return tick
}
2.5.3 批量推進事務進度
下面是異步輪詢任務批量推進事務第二階段執行的流程,核心是開啓多個 goroutine 併發對多項事務進行處理:
func (t *TXManager) batchAdvanceProgress(txs []*Transaction) error {
// 1 創建一個 chan,用於接收子 goroutine 傳輸的 err
errCh := make(chan error)
go func() {
// 2 通過 waitGroup 聚合多個子 groutine
var wg sync.WaitGroup
for _, tx := range txs {
// shadow
tx := tx
wg.Add(1)
go func() {
defer wg.Done()
// 3 推進每筆事務的進度
if err := t.advanceProgress(tx); err != nil {
// 遇到錯誤則投遞到 errCh
errCh <- err
}
}()
}
// 4 收口等待所有子 goroutine 執行完成
wg.Wait()
// 5 所有子 goroutine 執行完成後關閉 chan,喚醒阻塞等待的父 goroutine
close(errCh)
}()
// 記錄遇到的第一個錯誤
var firstErr error
// 6 父 goroutine 通過 chan 阻塞在這裏,直到所有 goroutine 執行完成,chan 被 close 才能往下
for err := range errCh {
// 記錄遇到的第一個錯誤
if firstErr != nil {
continue
}
firstErr = err
}
// 7 返回錯誤,核心是標識執行過程中,是否發生過錯誤
return firstErr
}
3 GOTCC 使用案例講解
從第 3 章開始,我們從實際應用 gotcc 框架的使用方視角出發,對所需要實現的模塊進行定義,然後給出應用 gotcc 框架的代碼示例.
3.1 TCC 組件實現
首先,我們對 TCC 組件的具體實現類進行定義:
3.1.1 類定義
定義一個 MockComponent 類,其中內置了 redis 客戶端,用於完成一些狀態數據的存取.
// 實現的 tcc 組件
type MockComponent struct {
// tcc 組件唯一標識 id,構造時由使用方傳入
id string
// redis 客戶端
client *redis_lock.Client
}
func NewMockComponent(id string, client *redis_lock.Client) *MockComponent {
return &MockComponent{
id: id,
client: client,
}
}
// 返回 tcc 組件的唯一標識 id
func (m *MockComponent) ID() string {
return m.id
}
3.1.2 Try 流程
下面實現一下 TCC 組件的 Try 方法,關鍵要點已於代碼中通過註釋的形式給出:
func (m *MockComponent) Try(ctx context.Context, req *component.TCCReq) (*component.TCCResp, error) {
// 1 基於 txID 維度加 redis 分佈式鎖
lock := redis_lock.NewRedisLock(pkg.BuildTXLockKey(m.id, req.TXID), m.client)
if err := lock.Lock(ctx); err != nil {
return nil, err
}
defer func() {
_ = lock.Unlock(ctx)
}()
// 2 基於 txID 冪等性去, 需要對事務的狀態進行檢查
txStatus, err := m.client.Get(ctx, pkg.BuildTXKey(m.id, req.TXID))
if err != nil && !errors.Is(err, redis_lock.ErrNil) {
return nil, err
}
res := component.TCCResp{
ComponentID: m.id,
TXID: req.TXID,
}
switch txStatus {
case TXTried.String(), TXConfirmed.String(): // 重複的 try 請求,給予成功的響應
res.ACK = true
return &res, nil
case TXCanceled.String(): // 此前該事務已 cancel,則拒絕本次 try 請求
return &res, nil
default:
}
// 3 建立 txID 與 bizID 的關聯
bizID := gocast.ToString(req.Data["biz_id"])
if _, err = m.client.Set(ctx, pkg.BuildTXDetailKey(m.id, req.TXID), bizID); err != nil {
return nil, err
}
// 4 把 bizID 對應的業務數據置爲凍結態
reply, err := m.client.SetNX(ctx, pkg.BuildDataKey(m.id, req.TXID, bizID), DataFrozen.String())
if err != nil {
return nil, err
}
// 倘若數據此前已凍結或已使用,則拒絕本次 try 請求
if reply != 1 {
return &res, nil
}
// 5 更新當前組件下的事務狀態爲 tried
_, err = m.client.Set(ctx, pkg.BuildTXKey(m.id, req.TXID), TXTried.String())
if err != nil {
return nil, err
}
// 6 給予接收 try 請求的響應
res.ACK = true
return &res, nil
}
3.1.3 Confirm 流程
下面實現一下 TCC 組件的 Confirm 方法,關鍵要點已於代碼中通過註釋的形式給出:
func (m *MockComponent) Confirm(ctx context.Context, txID string) (*component.TCCResp, error) {
// 1 基於 txID 維度加鎖
lock := redis_lock.NewRedisLock(pkg.BuildTXLockKey(m.id, txID), m.client)
if err := lock.Lock(ctx); err != nil {
return nil, err
}
defer func() {
_ = lock.Unlock(ctx)
}()
// 2. 校驗事務狀態,要求對應組件下,事務此前的狀態爲 tried
txStatus, err := m.client.Get(ctx, pkg.BuildTXKey(m.id, txID))
if err != nil {
return nil, err
}
res := component.TCCResp{
ComponentID: m.id,
TXID: txID,
}
switch txStatus {
case TXConfirmed.String(): // 事務狀態已 confirm,直接冪等響應爲成功
res.ACK = true
return &res, nil
case TXTried.String(): // 只有事務狀態爲 try 纔是合法的,會對程序放行
default: // 其他情況直接拒絕,ack 爲 false
return &res, nil
}
// 3 獲取事務對應的 bizID
bizID, err := m.client.Get(ctx, pkg.BuildTXDetailKey(m.id, txID))
if err != nil {
return nil, err
}
// 4. 校驗業務數據此前狀態是否爲凍結
dataStatus, err := m.client.Get(ctx, pkg.BuildDataKey(m.id, txID, bizID))
if err != nil {
return nil, err
}
// 如果此前非凍結態,則拒絕本次請求
if dataStatus != DataFrozen.String() {
return &res, nil
}
// 5 把業務數據的更新操作置爲 successful
if _, err = m.client.Set(ctx, pkg.BuildDataKey(m.id, txID, bizID), DataSuccessful.String()); err != nil {
return nil, err
}
// 6 把對應組件下的事務狀態更新爲成功,這一步哪怕失敗了也不阻塞主流程
_, _ = m.client.Set(ctx, pkg.BuildTXKey(m.id, txID), TXConfirmed.String())
// 7 處理成功,給予成功的響應
res.ACK = true
return &res, nil
}
3.1.4 Cancel 流程
下面實現一下 TCC 組件的 Cancel 方法,關鍵要點已於代碼中通過註釋的形式給出:
func (m *MockComponent) Cancel(ctx context.Context, txID string) (*component.TCCResp, error) {
// 1 基於 txID 維度加鎖
lock := redis_lock.NewRedisLock(pkg.BuildTXLockKey(m.id, txID), m.client)
if err := lock.Lock(ctx); err != nil {
return nil, err
}
defer func() {
_ = lock.Unlock(ctx)
}()
// 2 校驗事務狀態,只要不是 confirmed,都允許被置爲 canceld
txStatus, err := m.client.Get(ctx, pkg.BuildTXKey(m.id, txID))
if err != nil && !errors.Is(err, redis_lock.ErrNil) {
return nil, err
}
// 倘若組件內事務此前的狀態爲 confirmed,則說明流程有異常.
if txStatus == TXConfirmed.String() {
return nil, fmt.Errorf("invalid tx status: %s, txid: %s", txStatus, txID)
}
// 3 根據事務獲取對應的 bizID
bizID, err := m.client.Get(ctx, pkg.BuildTXDetailKey(m.id, txID))
if err != nil {
return nil, err
}
// 4 刪除對應的 frozen 凍結記錄,代表對數據執行了回滾操作
if err = m.client.Del(ctx, pkg.BuildDataKey(m.id, txID, bizID)); err != nil {
return nil, err
}
// 5 把事務狀態更新爲 canceled
_, err = m.client.Set(ctx, pkg.BuildTXKey(m.id, txID), TXCanceled.String())
if err != nil {
return nil, err
}
// 6 給予處理成功的 ack
return &component.TCCResp{
ACK: true,
ComponentID: m.id,
TXID: txID,
}, nil
}
3.2 TX Store 實現
接下來是關於事務日誌存儲模塊 TXStore 的具體實現:
3.2.1 類定義
聲明瞭一個 MockTXStore 類,裏面通過 mysql 存儲事務日誌明細數據,通過 redis 實現 TXStore 模塊的分佈式鎖.
其中和事務日誌明細數據庫直接交互的操作被封裝在 TXRecordDAO 當中.
// TXStore 模塊具體實現
type MockTXStore struct {
// redis 客戶端,用於實現分佈式鎖
client *redis_lock.Client
// 事務日誌存儲 DAO 層
dao *expdao.TXRecordDAO
}
func NewMockTXStore(dao *expdao.TXRecordDAO, client *redis_lock.Client) *MockTXStore {
return &MockTXStore{
dao: dao,
client: client,
}
}
事務日誌存儲 DAO 層:
type TXRecordDAO struct {
db *gorm.DB
}
func NewTXRecordDAO(db *gorm.DB) *TXRecordDAO {
return &TXRecordDAO{
db: db,
}
}
接下來是關於事務日誌明細記錄的持久化對象(PO,Persistent Object)模型定義:
-
• 內置了 gorm.Model,包含了主鍵 ID、創建時間 CreatedAt、更新時間 UpdatedAt、刪除時間 DeletedAt 幾個字段
-
• 事務狀態 Status,標識事務所處的狀態,分爲進行中 hanging、成功 successful、失敗 failure
-
• 組件 Try 響應明細記錄 ComponentTryStatuses: 記錄了事務下各組件 Try 請求響應結果,會以一個 json 字符串的格式存儲,其真實的類型爲 map[string]*ComponentTryStatus
type TXRecordPO struct {
gorm.Model
Status string `gorm:"status"`
ComponentTryStatuses string `gorm:"component_try_statuses"`
}
func (t TXRecordPO) TableName() string {
return "tx_record"
}
type ComponentTryStatus struct {
ComponentID string `json:"componentID"`
TryStatus string `json:"tryStatus"`
}
下面是事務日誌明細表的建表語句:
CREATE TABLE IF NOT EXISTS `tx_record`
(
`id` bigint(20) unsigned NOT NULL AUTO_INCREMENT COMMENT '主鍵ID',
`status` varchar(16) NOT NULL COMMENT '事務狀態 hanging/successful/failure',
`component_try_statuses` json DEFAULT NULL COMMENT '各組件 try 接口請求狀態 hanging/successful/failure',
`deleted_at` datetime DEFAULT NULL COMMENT '刪除時間',
`created_at` datetime NOT NULL COMMENT '創建時間',
`updated_at` datetime DEFAULT NULL ON UPDATE CURRENT_TIMESTAMP COMMENT '更新時間',
PRIMARY KEY (`id`) USING BTREE COMMENT '主鍵索引',
KEY `idx_status` (`status`) COMMENT '事務狀態索引'
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8mb4 COMMENT '事務日誌記錄';
3.2.2 創建事務記錄
接下來是通過過 TXStore 模塊創建一條事務明細記錄的實現代碼:
func (m *MockTXStore) CreateTX(ctx context.Context, components ...component.TCCComponent) (string, error) {
// 創建一個記錄組件 try 響應結果的 map,其中以組件 id 爲 key
componentTryStatuses := make(map[string]*expdao.ComponentTryStatus, len(components))
for _, component := range components {
componentTryStatuses[component.ID()] = &expdao.ComponentTryStatus{
ComponentID: component.ID(),
TryStatus: txmanager.TryHanging.String(),
}
}
statusesBody, _ := json.Marshal(componentTryStatuses)
// 創建事務明細記錄 po 示例,調用 dao 模塊將記錄落庫
txID, err := m.dao.CreateTXRecord(ctx, &expdao.TXRecordPO{
Status: txmanager.TXHanging.String(),
ComponentTryStatuses: string(statusesBody),
})
if err != nil {
return "", err
}
return gocast.ToString(txID), nil
}
dao 層創建事務明細記錄的實現代碼:
func (t *TXRecordDAO) CreateTXRecord(ctx context.Context, record *TXRecordPO) (uint, error) {
return record.ID, t.db.WithContext(ctx).Model(&TXRecordPO{}).Create(record).Error
}
3.2.3 事務明細更新
下面是更新一筆事務明細的方法,其處理流程是:
-
• 針對這筆事務記錄加寫鎖
-
• 根據組件的 try 響應結果,對 json 字符串進行更新
-
• 將事務記錄寫回表中.
func (m *MockTXStore) TXUpdate(ctx context.Context, txID string, componentID string, accept bool) error {
// 後續需要閉包傳入執行函數
do := func(ctx context.Context, dao *expdao.TXRecordDAO, record *expdao.TXRecordPO) error {
componentTryStatuses := make(map[string]*expdao.ComponentTryStatus)
_ = json.Unmarshal([]byte(record.ComponentTryStatuses), &componentTryStatuses)
if accept {
componentTryStatuses[componentID].TryStatus = txmanager.TrySucceesful.String()
} else {
componentTryStatuses[componentID].TryStatus = txmanager.TryFailure.String()
}
newBody, _ := json.Marshal(componentTryStatuses)
record.ComponentTryStatuses = string(newBody)
return dao.UpdateTXRecord(ctx, record)
}
_txID := gocast.ToUint(txID)
return m.dao.LockAndDo(ctx, _txID, do)
}
// 通過 gorm 實現數據記錄加寫鎖,並執行閉包函數的操作:
func (t *TXRecordDAO) LockAndDo(ctx context.Context, id uint, do func(ctx context.Context, dao *TXRecordDAO, record *TXRecordPO) error) error {
// 開啓事務
return t.db.Transaction(func(tx *gorm.DB) error {
defer func() {
if err := recover(); err != nil {
tx.Rollback()
}
}()
// 加寫鎖
var record TXRecordPO
if err := tx.Set("gorm:query_option", "FOR UPDATE").WithContext(ctx).First(&record, id).Error; err != nil {
return err
}
txDAO := NewTXRecordDAO(tx)
// 執行閉包函數
return do(ctx, txDAO, &record)
})
}
// 更新一條事務日誌數據記錄
func (t *TXRecordDAO) UpdateTXRecord(ctx context.Context, record *TXRecordPO) error {
return t.db.WithContext(ctx).Updates(record).Error
}
3.2.4 查詢事務
接下來是查詢事務的兩個方法:
// 根據事務 id 查詢指定的一筆事務明細記錄:
func (m *MockTXStore) GetTX(ctx context.Context, txID string) (*txmanager.Transaction, error) {
// 通過 option 在查詢條件中注入事務 id
records, err := m.dao.GetTXRecords(ctx, expdao.WithID(gocast.ToUint(txID)))
if err != nil {
return nil, err
}
if len(records) != 1 {
return nil, errors.New("get tx failed")
}
// 對各組件 try 明細內容進行反序列化
componentTryStatuses := make(map[string]*expdao.ComponentTryStatus)
_ = json.Unmarshal([]byte(records[0].ComponentTryStatuses), &componentTryStatuses)
components := make([]*txmanager.ComponentTryEntity, 0, len(componentTryStatuses))
for _, tryItem := range componentTryStatuses {
components = append(components, &txmanager.ComponentTryEntity{
ComponentID: tryItem.ComponentID,
TryStatus: txmanager.ComponentTryStatus(tryItem.TryStatus),
})
}
return &txmanager.Transaction{
TXID: txID,
Status: txmanager.TXStatus(records[0].Status),
Components: components,
CreatedAt: records[0].CreatedAt,
}, nil
}
// 獲取全量處於中間態的事務明細記錄
func (m *MockTXStore) GetHangingTXs(ctx context.Context) ([]*txmanager.Transaction, error) {
// 通過 option 在查詢條件中指定事務狀態爲 hanging
records, err := m.dao.GetTXRecords(ctx, expdao.WithStatus(txmanager.TryHanging))
if err != nil {
return nil, err
}
txs := make([]*txmanager.Transaction, 0, len(records))
for _, record := range records {
// 對各組件 try 響應結果進行反序列化
componentTryStatuses := make(map[string]*expdao.ComponentTryStatus)
_ = json.Unmarshal([]byte(record.ComponentTryStatuses), &componentTryStatuses)
components := make([]*txmanager.ComponentTryEntity, 0, len(componentTryStatuses))
for _, component := range componentTryStatuses {
components = append(components, &txmanager.ComponentTryEntity{
ComponentID: component.ComponentID,
TryStatus: txmanager.ComponentTryStatus(component.TryStatus),
})
}
txs = append(txs, &txmanager.Transaction{
TXID: gocast.ToString(record.ID),
Status: txmanager.TXHanging,
CreatedAt: record.CreatedAt,
Components: components,
})
}
return txs, nil
}
在 dao 層實現了一個通用的事務日誌查詢方法,通過 option 模式實現查詢條件的靈活組裝:
func (t *TXRecordDAO) GetTXRecords(ctx context.Context, opts ...QueryOption) ([]*TXRecordPO, error) {
db := t.db.WithContext(ctx).Model(&TXRecordPO{})
for _, opt := range opts {
db = opt(db)
}
var records []*TXRecordPO
return records, db.Scan(&records).Error
}
下面是關於 option 的具體定義,更多有關於這種模式的設計實現思路,可以參見我之前發表的文章——Golang 設計模式之建造者模式
type QueryOption func(db *gorm.DB) *gorm.DB
// 通過事務主鍵 id 進行查詢
func WithID(id uint) QueryOption {
return func(db *gorm.DB) *gorm.DB {
return db.Where("id = ?", id)
}
}
// 通過事務狀態進行查詢
func WithStatus(status txmanager.ComponentTryStatus) QueryOption {
return func(db *gorm.DB) *gorm.DB {
return db.Where("status = ?", status.String())
}
}
3.2.5 提交事務結果
接下來是在事務執行完成後,將執行結果更新到事務明細記錄中的處理方法:
// 提交事務的最終狀態
func (m *MockTXStore) TXSubmit(ctx context.Context, txID string, success bool) error {
do := func(ctx context.Context, dao *expdao.TXRecordDAO, record *expdao.TXRecordPO) error {
if success {
record.Status = txmanager.TXSuccessful.String()
} else {
record.Status = txmanager.TXFailure.String()
}
return dao.UpdateTXRecord(ctx, record)
}
return m.dao.LockAndDo(ctx, gocast.ToUint(txID), do)
}
3.2.6 加 / 解全局鎖
最後,是實現整個 txStore 模塊加 / 解鎖的處理方法,內部是基於 redis 實現的分佈式鎖:
func (m *MockTXStore) Lock(ctx context.Context, expireDuration time.Duration) error {
lock := redis_lock.NewRedisLock(pkg.BuildTXRecordLockKey(), m.client, redis_lock.WithExpireSeconds(int64(expireDuration.Seconds())))
return lock.Lock(ctx)
}
func (m *MockTXStore) Unlock(ctx context.Context) error {
lock := redis_lock.NewRedisLock(pkg.BuildTXRecordLockKey(), m.client)
return lock.Unlock(ctx)
}
到這裏爲止,所有前置準備工作都已經處理完成,接下來我們展示一個應用到 gotcc 框架的使用示例.
3.3 使用代碼示例
由於我實現的 txStore 和 tccComponent 需要依賴到 mysql 和 redis 兩個組件,因此在這裏需要輸入對應的信息.
單測代碼相對比較簡單,其中一些要點通過註釋給出:
const (
dsn = "請輸入你的 mysql dsn"
network = "tcp"
address = "請輸入你的 redis ip"
password = "請輸入你的 redis 密碼"
)
// 使用 tcc 單測代碼
func Test_TCC(t *testing.T) {
// 創建 redis 客戶端
redisClient := pkg.NewRedisClient(network, address, password)
// 創建 mysql 客戶端
mysqlDB, err := pkg.NewDB(dsn)
if err != nil {
t.Error(err)
return
}
// 構造三個 tcc 組件
componentAID := "componentA"
componentBID := "componentB"
componentCID := "componentC"
componentA := NewMockComponent(componentAID, redisClient)
componentB := NewMockComponent(componentBID, redisClient)
componentC := NewMockComponent(componentCID, redisClient)
// 構造出事務日誌存儲模塊
txRecordDAO := dao.NewTXRecordDAO(mysqlDB)
txStore := NewMockTXStore(txRecordDAO, redisClient)
// 構造出 txManager 模塊
txManager := txmanager.NewTXManager(txStore, txmanager.WithMonitorTick(time.Second))
defer txManager.Stop()
// 完成三個組件的註冊
if err := txManager.Register(componentA); err != nil {
t.Error(err)
return
}
if err := txManager.Register(componentB); err != nil {
t.Error(err)
return
}
if err := txManager.Register(componentC); err != nil {
t.Error(err)
return
}
ctx, cancel := context.WithTimeout(context.Background(), time.Second*30)
defer cancel()
// 啓動分佈式事務
success, err := txManager.Transaction(ctx, []*txmanager.RequestEntity{
{ComponentID: componentAID,
Request: map[string]interface{}{
"biz_id": componentAID + "_biz",
},
},
{ComponentID: componentBID,
Request: map[string]interface{}{
"biz_id": componentBID + "_biz",
},
},
{ComponentID: componentCID,
Request: map[string]interface{}{
"biz_id": componentCID + "_biz",
},
},
}...)
if err != nil {
t.Errorf("tx failed, err: %v", err)
return
}
if !success {
t.Error("tx failed")
return
}
// 分佈式事務處理成功
t.Log("success")
}
4 總結
到這裏,本文正文內容全部結束. 這裏回頭再對本期分享的內容做個總結:
-
• 本期我基於 golang 從零到一搭建了一個 TCC 分佈式事務框架的開源項目 gotcc,並圍繞着這個項目展開源碼級別的講解
-
• 在 gotcc 中,對事務協調器 TXManager 相關的核心處理邏輯,如 Try-Confirm/Cancel 兩階段流程串聯、TCC 組件註冊、異步輪詢任務等內容進行實現,並將這部分核心內容抽出放在了 SDK 中,供應用方使用
-
• 在 gotcc 中還定義了 TCC 組件和事務日誌存儲模塊的抽象 interface,這部分內容需要由應用方自行實現,並在使用 gotcc 時將其注入到 TXManager 當中
gotcc 的開源地址爲 https://github.com/xiaoxuxiansheng/gotcc . 大家走過路過,幫忙留個 star,非常感謝.
至此,有關 TCC 分佈式事務框架實戰篇內容全部講解完成. 個人水平有限,不當之處懇請大家不吝賜教.
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://mp.weixin.qq.com/s/aTQ6mgVUbUqn69NLmXh_7A