清晰架構(Clean Architecture)的 Go 微服務: 事物管理

爲了支持業務層中的事務,我試圖在 Go 中查找類似 Spring 的聲明式事務管理,但是沒找到,所以我決定自己寫一個。 事務很容易在 Go 中實現,但很難做到正確地實現。

需求:
  1. 將業務邏輯與事務代碼分開。
    在編寫業務用例時,開發者應該只需考慮業務邏輯,不需要同時考慮怎樣給業務邏輯加事務管理。如果以後需要添加事務支持,你可以在現有業務邏輯的基礎上進行簡單封裝,而無需更改任何其他代碼。事務實現細節應該對業務邏輯透明。
  2. 事務邏輯應該作用於用例層(業務邏輯)
    不在持久層上。
  3. 數據服務(數據持久性)層應對事務邏輯透明。
    這意味着持久性代碼應該是相同的,無論它是否支持事務
  4. 你可以選擇延遲支持事物。
    你可以先編寫沒有事務的用例,稍後可以在不修改現有代碼的情況下給該用例加上事務。你只需添加新代碼。

我最終的解決方案還不是聲明式事務管理,但它非常接近。創建一個真正的聲明式事務管理需要付出很多努力,因此我構建了一個可以實現聲明式事務的大多數功能的事務管理,同時又沒花很多精力。

方案:

最終解決方案涉及本程序的所有層級,我將逐一解釋它們。

數據庫鏈接封裝

在 Go 的 “sql”lib 中,有兩個數據庫鏈接 sql.DB 和 sql.Tx. 不需要事務時,使用 sql.DB 訪問數據庫; 當需要事務時,你使用 sql.Tx. 爲了共享代碼,持久層需要同時支持兩者。 因此需要對數據庫鏈接進行封裝,然後把它作爲數據庫訪問方法的接收器。 我從這裏 ¹ 得到了粗略的想法。

// SqlGdbc (SQL Go database connection) is a wrapper for SQL database handler ( can be *sql.DB or *sql.Tx)
// It should be able to work with all SQL data that follows SQL standard.
type SqlGdbc interface {
    Exec(query string, args ...interface{}) (sql.Result, error)
    Prepare(query string) (*sql.Stmt, error)
    Query(query string, args ...interface{}) (*sql.Rows, error)
    QueryRow(query string, args ...interface{}) *sql.Row
    // If need transaction support, add this interface
    Transactioner
}

// SqlDBTx is the concrete implementation of sqlGdbc by using *sql.DB
type SqlDBTx struct {
    DB *sql.DB
}

// SqlConnTx is the concrete implementation of sqlGdbc by using *sql.Tx
type SqlConnTx struct {
    DB *sql.Tx
}

數據庫實現類型 SqlDBTx 和 sqlConnTx 都需要實現 SqlGdbc 接口(包括 “Transactioner”)接口才行。 需要爲每個數據庫(例如 MySQL, CouchDB)實現“Transactioner” 接口以支持事務。

// Transactioner is the transaction interface for database handler
// It should only be applicable to SQL database
type Transactioner interface {
    // Rollback a transaction
    Rollback() error
    // Commit a transaction
    Commit() error
    // TxEnd commits a transaction if no errors, otherwise rollback
    // txFunc is the operations wrapped in a transaction
    TxEnd(txFunc func() error) error
    // TxBegin gets *sql.DB from receiver and return a SqlGdbc, which has a *sql.Tx
    TxBegin() (SqlGdbc, error)
}

數據庫存儲層(datastore layer)的事物管理代碼

以下是 “Transactioner” 接口的實現代碼,其中只有 TxBegin()是在 SqlDBTx(sql.DB)上實現,因爲事務從 sql.DB 開始,然後所有事務的其他操作都在 SqlConnTx(sql.Tx)上。 我從這裏 ² 得到了這個想法。

// TransactionBegin starts a transaction
func (sdt *SqlDBTx) TxBegin() (gdbc.SqlGdbc, error) {
    tx, err := sdt.DB.Begin()
    sct := SqlConnTx{tx}
    return &sct, err
}

func (sct *SqlConnTx) TxEnd(txFunc func() error) error {
    var err error
    tx := sct.DB

    defer func() {
        if p := recover(); p != nil {
            tx.Rollback()
            panic(p) // re-throw panic after Rollback
        } else if err != nil {
            tx.Rollback() // err is non-nil; don't change it
        } else {
            err = tx.Commit() // if Commit returns error update err with commit err
        }
    }()
    err = txFunc()
    return err
}

func (sct *SqlConnTx) Rollback() error {
    return sct.DB.Rollback()
}

用例層的事物接口

在用例層中,你可以擁有相同業務功能的一個函數的兩個版本,一個支持事務,一個不支持,並且它們的名稱可以共享相同的前綴,而事務可以添加 “withTx” 作爲後綴。 例如,在以下代碼中,“ModifyAndUnregister”是不支持事務的那個,“ModifyAndUnregisterWithTx”是支持事務的那個。 “EnableTxer”是用例層上唯一的事務支持接口,任何支持事務的 “用例” 都需要它。 這裏的所有代碼都在是用例層級(包括“EnableTxer”)代碼,不涉及數據庫內容。

type RegistrationUseCaseInterface interface {
...
    // ModifyAndUnregister change user information and then unregister the user based on the User.Id passed in.
    // It is created to illustrate transaction, no real use.
    ModifyAndUnregister(user *model.User) error
    // ModifyAndUnregisterWithTx change user information and then unregister the user based on the User.Id passed in.
    // It supports transaction
    // It is created to illustrate transaction, no real use.
    ModifyAndUnregisterWithTx(user *model.User) error
    // EnableTx enable transaction support on use case. Need to be included for each use case needs transaction
    // It replaces the underline database handler to sql.Tx for each data service that used by this use case
    EnableTxer
}
// EnableTxer is the transaction interface for use case layer
type EnableTxer interface {
    EnableTx()
}

以下是不包含事務的業務邏輯代碼的示例。 “modifyAndUnregister(ruc,user)”是事務和非事務用例函數共享的業務功能。 你需要使用 TxBegin()和 TxEnd()(在 TxDataInterface 中)來包裝業務功能以支持事務,這些是數據服務層接口,並且與數據庫訪問層無關。 該用例還實現了 “EnableTx()” 接口,該接口實際上將底層數據庫鏈接從 sql.DB 切換到 sql.Tx.

// The use case of ModifyAndUnregister without transaction
func (ruc *RegistrationUseCase) ModifyAndUnregister(user *model.User) error {
    return modifyAndUnregister(ruc, user)
}

// The use case of ModifyAndUnregister with transaction
func (ruc *RegistrationUseCase) ModifyAndUnregisterWithTx(user *model.User) error {
    tdi, err := ruc.TxDataInterface.TxBegin()
    if err != nil {
        return errors.Wrap(err, "")
    }
    ruc.EnableTx()
    return tdi.TxEnd(func() error {
        // wrap the business function inside the TxEnd function
        return modifyAndUnregister(ruc, user)
    })
}

// The business function will be wrapped inside a transaction and inside a non-transaction function
// It needs to be written in a way that every error will be returned so it can be catched by TxEnd() function,
// which will handle commit and rollback
func modifyAndUnregister(ruc *RegistrationUseCase, user *model.User) error {
    udi := ruc.UserDataInterface
    err := modifyUser(udi, user)
    if err != nil {
        return errors.Wrap(err, "")
    }
    err = unregisterUser(udi, user.Name)
    if err != nil {
        return errors.Wrap(err, "")
    }
    return nil
}

func (ruc *RegistrationUseCase) EnableTx() {
    // Only UserDataInterface need transaction support here. If there are other data services need it,
    // then they also need to enable transaction here
    ruc.UserDataInterface.EnableTx(ruc.TxDataInterface)
}

爲什麼我需要在 “TxDataInterface” 中調用函數 “EnbaleTx” 來替換底層數據庫鏈接而不是直接在用例中執行? 因爲 sql.DB 和 sql.Tx 層級要比用例層低幾個級別,直接調用會搞砸依賴關係。 保持合理依賴關係的訣竅是在每一層上都有 TxBegin()和 TxEnd()並逐層調用它們以維持合理的依賴關係。

數據服務層的事物接口

我們討論了用例層和數據存儲層上的事務功能,我們還需要數據服務層中的事務功能將這兩者連接在一起。 以下代碼是數據服務層的事務接口(“TxDataInterface”)。 “TxDataInterface”是僅爲事物管理而創建的數據服務層接口。 每個數據庫只需要實現一次。 還有一個 “EnableTxer” 接口(這是一個數據服務層接口,不要與用例層中的 “EnableTxer” 接口混淆),實現 “EnableTxer” 接口將開啓數據服務類型對事務的支持,例如, 如果想要 “UserDataInterface” 支持事物,就需要它實現 “EnableTxer” 接口。

// TxDataInterface represents operations needed for transaction support.
// It only needs to be implemented once for each database
// For sqlGdbc, it is implemented for SqlDBTx in transaction.go
type TxDataInterface interface {
    // TxBegin starts a transaction. It gets a DB handler from the receiver and return a TxDataInterface, which has a
    // *sql.Tx inside. Any data access wrapped inside a transaction will go through the *sql.Tx
    TxBegin() (TxDataInterface, error)
    // TxEnd is called at the end of a transaction and based on whether there is an error, it commits or rollback the
    // transaction.
    // txFunc is the business function wrapped in a transaction
    TxEnd(txFunc func() error) error
    // Return the underline transaction handler, sql.Tx
    GetTx() gdbc.SqlGdbc
}

// This interface needs to be included in every data service interface that needs transaction support
type EnableTxer interface {
    // EnableTx enables transaction, basically it replaces the underling database handle sql.DB with sql.Tx
    EnableTx(dataInterface TxDataInterface)
}

// UserDataInterface represents interface for user data access through database
type UserDataInterface interface {
...
    Update(user *model.User) (rowsAffected int64, err error)
    // Insert adds a user to a database. The returned resultUser has a Id, which is auto generated by database
    Insert(user *model.User) (resultUser *model.User, err error)
    // Need to add this for transaction support
    EnableTxer
}

以下代碼是 “TxDataInterface” 的實現。 “TxDataSql”是 “TxDataInterface” 的具體類型。 它調用底層數據庫鏈接的開始和結束函數來執行真正的事務操作。

// TxDataSql is the generic implementation for transaction for SQL database
// You only need to do it once for each SQL database
type TxDataSql struct {
    DB gdbc.SqlGdbc
}

func (tds *TxDataSql) TxEnd(txFunc func() error) error {
    return tds.DB.TxEnd(txFunc)
}

func (tds *TxDataSql) TxBegin() (dataservice.TxDataInterface, error) {

    sqlTx, error := tds.DB.TxBegin()
    tdi := TxDataSql{sqlTx}
    tds.DB = tdi.DB
    return &tdi, error
}
func (tds *TxDataSql) GetTx() gdbc.SqlGdbc {
    return tds.DB
}

事物策略:

你可能會問爲什麼我在上面的代碼中需要 “TxDataSql”? 確實可以在沒有它的情況下實現事務,實際上最開的程序裏就沒有它。 但是我還是要在某些數據服務中實現“TxDataInterface” 來開始和結束事務。 由於這是在用例層中完成的,用例層不知道哪個數據服務類型實現了接口,因此必須在每個數據服務接口上實現 “TxDataInterface”(例如,“UserDataInterface” 和“CourseDataInterface”)以保證 “用例層”不會選擇沒有接口的 “數據服務(data service)”。 在創建“TxDataSql” 之後,我只需要在 “TxDataSql” 中實現一次 “TxDataInterface”,然後每個數據服務類型只需要實現“EnableTx()” 就行了。

// UserDataSql is the SQL implementation of UserDataInterface
type UserDataSql struct {
    DB gdbc.SqlGdbc
}

func (uds *UserDataSql) EnableTx(tx dataservice.TxDataInterface) {
    uds.DB = tx.GetTx()
}

func (uds *UserDataSql) FindByName(name string) (*model.User, error) {
    //logger.Log.Debug("call FindByName() and name is:", name)
    rows, err := uds.DB.Query(QUERY_USER_BY_NAME, name)
    if err != nil {
        return nil, errors.Wrap(err, "")
    }
    defer rows.Close()
    return retrieveUser(rows)
}

上面的代碼是 “UserDataService” 接口的實現程序。 “EnableTx()”方法從 “TxDataInterface” 獲得 sql.Tx 並將 “UserDataSql” 中的 sql.DB 替換爲 sql.Tx.

數據訪問方法(例如,FindByName())在事務代碼和非事務代碼之間共享,並且不需要知道 “UserDataSql.DB” 是 sql.DB 還是 sql.Tx.

依賴關係漏洞:

上面的代碼實現中存在一個缺陷,這會破壞我的設計並使其不完美。它是 “TxDataInterface” 中的函數“GetTx()”,它是一個數據服務層接口,因此它不應該依賴於 gdbc.SqlGdbc(數據庫接口)。你可能認爲數據服務層的實現代碼無論如何都需要訪問數據庫,當前這是正確的。但是,你可以在將來更改實現去調用 gRPC 微服務(而不是數據庫)。如果接口不依賴於 SQL 接口的話,則可以自由更改實現,但如果不是,則即使你的接口實現已更改,該接口也會永久保留對 SQL 的依賴。

爲什麼它是本程序中打破依賴關係的唯一地方?因爲對於其他接口,容器負責創建具體類型,而程序的其餘部分僅使用接口。但是對於事務,在創建具體類型之後,需要將底層數據庫處理程序從 sql.DB 替換爲 sql.Tx,這破壞了設計。

它有解決方法嗎?是的,容器可以爲需要事務的函數創建 sql.Tx 而不是 sql.DB,這樣我就不需要在以後的用例級別中替換它。但是,配置文件中需要一個標誌來指示函數是否需要事務, 而且這個標誌需要配備給用例中的每個函數。這是一個太大的改動,所以我決定現在先這樣,以後再重新審視它。

好處:

通過這個實現,事務代碼對業務邏輯幾乎是透明的(除了我上面提到的缺陷)。業務邏輯中沒有數據存儲(datastore)級事務代碼,如 Tx.Begin,Tx.Commit 和 Tx.Rollback(但你確實需要業務級別事物函數 Tx.Begin 和 Tx.End),不僅如此,你的持久性代碼中也幾乎沒有數據存儲級事務代碼。 如需在用例層上啓用事務,你只需要在用例上實現 EnableTx()並將業務函數封裝在 “TxBegin()”,EnableTx()和“TxEnd()” 中,如上例所示。 在持久層上,大多數事務代碼已經由 “txDataService.go” 實現,你只需要爲特定的數據服務(例如 UserDataService)實現 “EnableTx”。 事務支持的真正操作是在“transaction.go” 文件中實現的,它實現了 “Transactioner” 接口,它有四個函數,“Rollback”, “Commit”, “TxBegin” 和 “TxEnd”。

對用例增加事物支持的步驟:

假設我們需要在用例 “listCourse” 中爲一個函數添加事務支持,以下是步驟

  1. 在列表課程用例(“listCourse.go”)中實現 “EnableTxer” 界面
  2. 在域模型(“course”)數據服務層(courseDataMysql.go)中實現 “EnableTxer” 接口
  3. 創建一個新的事務啓用函數並將現有業務函數包裝在 “TxBegin()”,EnableTx()和“TxEnd()” 中
缺陷:

首先,它仍然不是聲明​​式事物管理; 第二,它沒有完全達到需求中的#4。要將用例函數從非事務更改爲事務,你可以創建一個支持事務的新函數,它需要更改調用函數; 或者你修改現有函數並將其包裝到事務中,這也需要代碼更改。爲了實現#4,需要添加許多代碼,因此我將其推遲到以後。第三,它不支持嵌套事務(Nested Transaction),因此你需要手動確保代碼中沒有發生嵌套事務。如果代碼庫不是太複雜,這很容易做到。如果你有一個非常複雜的代碼庫,有很多事務和非事務函數混在一起,那麼手工做起來會比較困難,這是需要在程序中實現嵌套事務或找到已經支持它的方案。我沒有花時間研究添加嵌套事務所需的工作量,但這可能並不容易。如果你對它感興趣,這裏 ³ 是一些討論。到目前爲止,對於大多數情況而言,當前的解決方案可能是在代價不大的情況下的最佳方案。

應用範圍:

首先,它只支持 SQL 數據庫的事務。 如果你有 NoSql 數據庫,它將無法工作(大多數 NoSql 數據庫無論如何都不支持事務)。 其次,如果事務跨越了數據庫的邊界(例如在不同的微服務器之間),那麼它將無法工作。 在這種情況下,你需要使用 Saga⁴。它的原理是爲事物中的每個操作寫一個補償操作,然後在回滾階段挨個執行每一個補償操作。 在當前框架中添加 Sage 解決方案應該不難。

其他數據庫相關問題:

關閉數據庫鏈接(Close connection)

我從來沒有爲數據庫鏈接調用 Close()函數,因爲沒有必要這樣做。 你可以傳入 sql.DB 或 sql.Tx 作爲持久性函數的接收器(receiver)。 對於 sql.DB,數據庫將自動創建鏈接池併爲你管理鏈接。 鏈接完成後,它將返回到鏈接池,無需關閉。 對於 sql.Tx,在事務結束時,你可以提交或回滾,之後鏈接將返回到連接池,而無需關閉。 請參閱此處⁵ 和 此處⁶ .

對象關係映射(O/R mapping)

我簡要地查看了幾個 “O/R” 映射庫,但它們沒有提供我所需要的功能。 我認爲 “O/R 映射” 只適合兩種情況。 首先,你的應用程序主要是 CRUD,沒有太多的查詢或搜索; 第二,開發人員不熟悉 SQL。 如果不是這種情況,則 O/R 映射不會提供太多幫助。 我想從擴展數據庫模塊中獲得兩個功能,一個是將 sql.row 加載到我的域模型結構(包括處理 NULL 值)中(例如“User”),另一個是自動關閉 sql 類型,如 sql.statement 或 sql.rows。 有一些 sql 擴展庫似乎提供了至少部分這樣的功能。 我還沒有嘗試,但似乎值得一試。

延遲(Defer):

在進行數據庫訪問時,你將進行大量重複調用以關閉數據庫類型(例如 statements, rows)。例如以下代碼中的 “defer row.close()”。 你想要記住這一點,要在錯誤處理函數之後調用“defer row.close()”,因爲如果不是這樣,當出現錯誤時,“rows” 將爲 nil,這將導致恐慌並且不會執行錯誤處理代碼。

func (uds *UserDataSql) Find(id int) (*model.User, error) {
    rows, err := uds.DB.Query(QUERY_USER_BY_ID, id)
    if err != nil {
        return nil, errors.Wrap(err, "")
    }
    defer rows.Close()
    return retrieveUser(rows)
}

恐慌(panic):

我看到很多 Go 數據庫代碼在出現數據庫錯誤時拋出了恐慌(panic)而不是錯誤(error),這可能會導致微服務出現問題,因爲在微服務環境中你通常希望服務一直運行。 假設當更新語句中出現 SQL 錯誤時,用戶將無法訪問該功能,這很糟糕。 但如果因爲這個,整個微服務或網站被關閉,那就更糟了。 因此,正確的方法是將錯誤傳播到上一級並讓它決定要做什麼。 因此正確的做法是不在你的程序中拋出 panic,但如果第三方庫拋出恐慌呢? 這時你需要捕獲恐慌並從中恢復以保持你的服務正常運行。 我在另一篇文章 “日誌管理”⁸中有具體示例.

源程序:

完整的源程序鏈接 github: https://github.com/jfeng45/servicetmpl

索引:

[1]db transaction in golang

[2]database/sql Tx—detecting Commit or Rollback

[3]database/sql: nested transaction or save point support

[4]GOTO 2015 • Applying the Saga Pattern • Caitie McCaffrey — YouTube

[5]Common Pitfalls When Using database/sql in Go

[6]Go database/sql tutorial

[7]sqlx

[8]Go Microservice with Clean Architecture: Application Logging

不堆砌術語,不羅列架構,不迷信權威,不盲從流行,堅持獨立思考

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://blog.csdn.net/weixin_38748858/article/details/104067578