帶你十天輕鬆搞定 Go 微服務之大結局(分佈式事務)

序言

我們通過一個系列文章跟大家詳細展示一個 go-zero 微服務示例,整個系列分十篇文章,目錄結構如下:

  1. 環境搭建:帶你十天輕鬆搞定 Go 微服務系列(一)

  2. 服務拆分:帶你十天輕鬆搞定 Go 微服務系列(二)

  3. 用戶服務:帶你十天輕鬆搞定 Go 微服務系列(三)

  4. 產品服務:帶你十天輕鬆搞定 Go 微服務系列(四)

  5. 訂單服務:帶你十天輕鬆搞定 Go 微服務系列(五)

  6. 支付服務:帶你十天輕鬆搞定 Go 微服務系列(六)

  7. RPC 服務 Auth 驗證:帶你十天輕鬆搞定 Go 微服務系列(七)

  8. 服務監控:帶你十天輕鬆搞定 Go 微服務系列(八、服務監控)

  9. 鏈路追蹤:帶你十天輕鬆搞定 Go 微服務系列(九、鏈路追蹤)

  10. 分佈式事務(本文)

期望通過本系列帶你在本機利用 Docker 環境利用 go-zero 快速開發一個商城系統,讓你快速上手微服務。

完整示例代碼:https://github.com/nivin-studio/go-zero-mall

首先,我們來看一下整體的服務拆分圖:

10.1 DTM 介紹

DTM 是一款 golang開發的分佈式事務管理器,解決了跨數據庫、跨服務、跨語言棧更新數據的一致性問題。

絕大多數的訂單系統的事務都會跨服務,因此都有更新數據一致性的需求,都可以通過 DTM 大幅簡化架構,形成一個優雅的解決方案。

而且 DTM 已經深度合作,原生的支持 go-zero 中的分佈式事務,下面就來詳細的講解如何用 DTM 來幫助我們的訂單系統解決一致性問題

10.2 go-zero 使用 DTM

首先我們回顧下 第五章 訂單服務 中 order rpc 服務中 Create 接口處理邏輯。方法裏判斷了用戶和產品的合法性,以及產品庫存是否充足,最後通過 OrderModel 創建了一個新的訂單,以及調用 product rpc 服務 Update 的接口更新了產品的庫存。

func (l *CreateLogic) Create(in *order.CreateRequest) (*order.CreateResponse, error) {
 // 查詢用戶是否存在
 _, err := l.svcCtx.UserRpc.UserInfo(l.ctx, &user.UserInfoRequest{
  Id: in.Uid,
 })
 if err != nil {
  return nil, err
 }

 // 查詢產品是否存在
 productRes, err := l.svcCtx.ProductRpc.Detail(l.ctx, &product.DetailRequest{
  Id: in.Pid,
 })
 if err != nil {
  return nil, err
 }
 // 判斷產品庫存是否充足
 if productRes.Stock <= 0 {
  return nil, status.Error(500, "產品庫存不足")
 }

 newOrder := model.Order{
  Uid:    in.Uid,
  Pid:    in.Pid,
  Amount: in.Amount,
  Status: 0,
 }

 res, err := l.svcCtx.OrderModel.Insert(&newOrder)
 if err != nil {
  return nil, status.Error(500, err.Error())
 }

 newOrder.Id, err = res.LastInsertId()
 if err != nil {
  return nil, status.Error(500, err.Error())
 }

 _, err = l.svcCtx.ProductRpc.Update(l.ctx, &product.UpdateRequest{
  Id:     productRes.Id,
  Name:   productRes.Name,
  Desc:   productRes.Desc,
  Stock:  productRes.Stock - 1,
  Amount: productRes.Amount,
  Status: productRes.Status,
 })
 if err != nil {
  return nil, err
 }

 return &order.CreateResponse{
  Id: newOrder.Id,
 }, nil
}

之前我們說過,這裏處理邏輯存在數據一致性問題,有可能訂單創建成功了,但是在更新產品庫存的時候可能會發生失敗,這時候就會存在訂單創建成功,產品庫存沒有減少的情況。

因爲這裏的產品庫存更新是跨服務操作的,也沒有辦法使用本地事務來處理,所以我們需要使用分佈式事務來處理它。這裏我們需要藉助 DTMSAGA 協議來實現訂單創建和產品庫存更新的跨服務分佈式事務操作。

大家可以先移步到 DTM 的文檔先了接下 SAGA 事務模式。

10.2.1 添加 DTM 服務配置

參見 第一章 環境搭建,修改 dtm->config.yml 配置文件。我們只要修改 MicroService 中的 TargetEndPoint 配置即可,將 dtm 註冊到 etcd 中。

# ......

# 微服務
MicroService:
  Driver: 'dtm-driver-gozero'           # 要處理註冊/發現的驅動程序的名稱
  Target: 'etcd://etcd:2379/dtmservice' # 註冊 dtm 服務的 etcd 地址
  EndPoint: 'dtm:36790'

# ......

10.2.2 添加 dtm_barrier 數據表

微服務是一個分佈式系統,因此可能發生各種異常,例如網絡抖動導致重複請求,這類的異常會讓業務處理異常複雜。而 DTM 中,首創了 子事務屏障 技術,使用該技術,能夠非常便捷的解決異常問題,極大的降低了分佈式事務的使用門檻。

使用 DTM 提供的子事務屏障技術則需要在業務數據庫中創建子事務屏障相關的表,建表語句如下:

create database if not exists dtm_barrier
/*!40100 DEFAULT CHARACTER SET utf8mb4 */
;
drop table if exists dtm_barrier.barrier;
create table if not exists dtm_barrier.barrier(
  id bigint(22) PRIMARY KEY AUTO_INCREMENT,
  trans_type varchar(45) default '',
  gid varchar(128) default '',
  branch_id varchar(128) default '',
  op varchar(45) default '',
  barrier_id varchar(45) default '',
  reason varchar(45) default '' comment 'the branch type who insert this record',
  create_time datetime DEFAULT now(),
  update_time datetime DEFAULT now(),
  key(create_time),
  key(update_time),
  UNIQUE key(gid, branch_id, op, barrier_id)
);

注意:庫名和表名請勿修改,如果您自定義了表名,請在使用前調用 dtmcli.SetBarrierTableName

10.2.3 修改 OrderModelProductModel

在每一個子事務中,很多操作邏輯,需要使用到本地事務,所以我們添加一些 model 方法兼容 DTM 的子事務屏障

$ vim mall/service/order/model/ordermodel.go
package model

......

type (
 OrderModel interface {
  TxInsert(tx *sql.Tx, data *Order) (sql.Result, error)
  TxUpdate(tx *sql.Tx, data *Order) error
 }
)

......

func (m *defaultOrderModel) TxInsert(tx *sql.Tx, data *Order) (sql.Result, error) {
 query := fmt.Sprintf("insert into %s (%s) values (?, ?, ?, ?)", m.table, orderRowsExpectAutoSet)
 ret, err := tx.Exec(query, data.Uid, data.Pid, data.Amount, data.Status)

 return ret, err
}

func (m *defaultOrderModel) TxUpdate(tx *sql.Tx, data *Order) error {
 productIdKey := fmt.Sprintf("%s%v", cacheOrderIdPrefix, data.Id)
 _, err := m.Exec(func(conn sqlx.SqlConn) (result sql.Result, err error) {
  query := fmt.Sprintf("update %s set %s where `id` = ?", m.table, orderRowsWithPlaceHolder)
  return tx.Exec(query, data.Uid, data.Pid, data.Amount, data.Status, data.Id)
 }, productIdKey)
 return err
}

func (m *defaultOrderModel) FindOneByUid(uid int64) (*Order, error) {
 var resp Order

 query := fmt.Sprintf("select %s from %s where `uid` = ? order by create_time desc limit 1", orderRows, m.table)
 err := m.QueryRowNoCache(&resp, query, uid)

 switch err {
 case nil:
  return &resp, nil
 case sqlc.ErrNotFound:
  return nil, ErrNotFound
 default:
  return nil, err
 }
}
$ vim mall/service/product/model/productmodel.go
package model

......

type (
 ProductModel interface {
  TxAdjustStock(tx *sql.Tx, id int64, delta int) (sql.Result, error)
 }
)

......

func (m *defaultProductModel) TxAdjustStock(tx *sql.Tx, id int64, delta int) (sql.Result, error) {
 productIdKey := fmt.Sprintf("%s%v", cacheProductIdPrefix, id)
 return m.Exec(func(conn sqlx.SqlConn) (result sql.Result, err error) {
  query := fmt.Sprintf("update %s set stock=stock+? where stock >= -? and id=?", m.table)
  return tx.Exec(query, delta, delta, id)
 }, productIdKey)
}

10.2.4 修改 product rpc 服務

$ vim mall/service/product/rpc/product.proto
syntax = "proto3";

package productclient;

option go_package = "product";

......

// 減產品庫存
message DecrStockRequest {
    int64 id = 1;
    int64 num = 2;
}
message DecrStockResponse {
}
// 減產品庫存

service Product {
    ......
    rpc DecrStock(DecrStockRequest) returns(DecrStockResponse);
    rpc DecrStockRevert(DecrStockRequest) returns(DecrStockResponse);
}

提示:修改後使用 goctl 工具重新生成下代碼。

$ vim mall/service/product/rpc/internal/logic/decrstocklogic.go
package logic

import (
 "context"
 "database/sql"

 "mall/service/product/rpc/internal/svc"
 "mall/service/product/rpc/product"

 "github.com/dtm-labs/dtmcli"
 "github.com/dtm-labs/dtmgrpc"
 "github.com/tal-tech/go-zero/core/logx"
 "github.com/tal-tech/go-zero/core/stores/sqlx"
 "google.golang.org/grpc/codes"
 "google.golang.org/grpc/status"
)

type DecrStockLogic struct {
 ctx    context.Context
 svcCtx *svc.ServiceContext
 logx.Logger
}

func NewDecrStockLogic(ctx context.Context, svcCtx *svc.ServiceContext) *DecrStockLogic {
 return &DecrStockLogic{
  ctx:    ctx,
  svcCtx: svcCtx,
  Logger: logx.WithContext(ctx),
 }
}

func (l *DecrStockLogic) DecrStock(in *product.DecrStockRequest) (*product.DecrStockResponse, error) {
 // 獲取 RawDB
 db, err := sqlx.NewMysql(l.svcCtx.Config.Mysql.DataSource).RawDB()
 if err != nil {
  return nil, status.Error(500, err.Error())
 }

 // 獲取子事務屏障對象
 barrier, err := dtmgrpc.BarrierFromGrpc(l.ctx)
 if err != nil {
  return nil, status.Error(500, err.Error())
 }
 // 開啓子事務屏障
 err = barrier.CallWithDB(db, func(tx *sql.Tx) error {
  // 更新產品庫存
  result, err := l.svcCtx.ProductModel.TxAdjustStock(tx, in.Id, -1)
  if err != nil {
   return err
  }

  affected, err := result.RowsAffected()
  // 庫存不足,返回子事務失敗
  if err == nil && affected == 0 {
   return dtmcli.ErrFailure
  }

  return err
 })

 // 這種情況是庫存不足,不再重試,走回滾
 if err == dtmcli.ErrFailure {
  return nil, status.Error(codes.Aborted, dtmcli.ResultFailure)
 }

 if err != nil {
  return nil, err
 }

 return &product.DecrStockResponse{}, nil
}
$ vim mall/service/product/rpc/internal/logic/decrstockrevertlogic.go
package logic

import (
 "context"
 "database/sql"

 "mall/service/product/rpc/internal/svc"
 "mall/service/product/rpc/product"

 "github.com/dtm-labs/dtmcli"
 "github.com/dtm-labs/dtmgrpc"
 "github.com/tal-tech/go-zero/core/logx"
 "github.com/tal-tech/go-zero/core/stores/sqlx"
 "google.golang.org/grpc/status"
)

type DecrStockRevertLogic struct {
 ctx    context.Context
 svcCtx *svc.ServiceContext
 logx.Logger
}

func NewDecrStockRevertLogic(ctx context.Context, svcCtx *svc.ServiceContext) *DecrStockRevertLogic {
 return &DecrStockRevertLogic{
  ctx:    ctx,
  svcCtx: svcCtx,
  Logger: logx.WithContext(ctx),
 }
}

func (l *DecrStockRevertLogic) DecrStockRevert(in *product.DecrStockRequest) (*product.DecrStockResponse, error) {
 // 獲取 RawDB
 db, err := sqlx.NewMysql(l.svcCtx.Config.Mysql.DataSource).RawDB()
 if err != nil {
  return nil, status.Error(500, err.Error())
 }

 // 獲取子事務屏障對象
 barrier, err := dtmgrpc.BarrierFromGrpc(l.ctx)
 if err != nil {
  return nil, status.Error(500, err.Error())
 }
 // 開啓子事務屏障
 err = barrier.CallWithDB(db, func(tx *sql.Tx) error {
  // 更新產品庫存
  _, err := l.svcCtx.ProductModel.TxAdjustStock(tx, in.Id, 1)
  return err
 })

 if err != nil {
  return nil, err
 }

 return &product.DecrStockResponse{}, nil
}

10.2.5 修改 order rpc 服務

$ vim mall/service/order/rpc/order.proto
syntax = "proto3";

package orderclient;

option go_package = "order";

......

service Order {
    rpc Create(CreateRequest) returns(CreateResponse);
    rpc CreateRevert(CreateRequest) returns(CreateResponse);
    ......
}

提示:修改後使用 goctl 工具重新生成下代碼。

$ vim mall/service/order/rpc/internal/logic/createlogic.go
package logic

import (
 "context"
 "database/sql"
 "fmt"

 "mall/service/order/model"
 "mall/service/order/rpc/internal/svc"
 "mall/service/order/rpc/order"
 "mall/service/user/rpc/user"

 "github.com/dtm-labs/dtmgrpc"
 "github.com/tal-tech/go-zero/core/logx"
 "github.com/tal-tech/go-zero/core/stores/sqlx"
 "google.golang.org/grpc/status"
)

type CreateLogic struct {
 ctx    context.Context
 svcCtx *svc.ServiceContext
 logx.Logger
}

func NewCreateLogic(ctx context.Context, svcCtx *svc.ServiceContext) *CreateLogic {
 return &CreateLogic{
  ctx:    ctx,
  svcCtx: svcCtx,
  Logger: logx.WithContext(ctx),
 }
}

func (l *CreateLogic) Create(in *order.CreateRequest) (*order.CreateResponse, error) {
 // 獲取 RawDB
 db, err := sqlx.NewMysql(l.svcCtx.Config.Mysql.DataSource).RawDB()
 if err != nil {
  return nil, status.Error(500, err.Error())
 }

 // 獲取子事務屏障對象
 barrier, err := dtmgrpc.BarrierFromGrpc(l.ctx)
 if err != nil {
  return nil, status.Error(500, err.Error())
 }
 // 開啓子事務屏障
 if err := barrier.CallWithDB(db, func(tx *sql.Tx) error {
  // 查詢用戶是否存在
  _, err := l.svcCtx.UserRpc.UserInfo(l.ctx, &user.UserInfoRequest{
   Id: in.Uid,
  })
  if err != nil {
   return fmt.Errorf("用戶不存在")
  }

  newOrder := model.Order{
   Uid:    in.Uid,
   Pid:    in.Pid,
   Amount: in.Amount,
   Status: 0,
  }
  // 創建訂單
  _, err = l.svcCtx.OrderModel.TxInsert(tx, &newOrder)
  if err != nil {
   return fmt.Errorf("訂單創建失敗")
  }

  return nil
 }); err != nil {
  return nil, status.Error(500, err.Error())
 }

 return &order.CreateResponse{}, nil
}
$ vim mall/service/order/rpc/internal/logic/createrevertlogic.go
package logic

import (
 "context"
 "database/sql"
 "fmt"

 "mall/service/order/rpc/internal/svc"
 "mall/service/order/rpc/order"
 "mall/service/user/rpc/user"

 "github.com/dtm-labs/dtmgrpc"
 "github.com/tal-tech/go-zero/core/logx"
 "github.com/tal-tech/go-zero/core/stores/sqlx"
 "google.golang.org/grpc/status"
)

type CreateRevertLogic struct {
 ctx    context.Context
 svcCtx *svc.ServiceContext
 logx.Logger
}

func NewCreateRevertLogic(ctx context.Context, svcCtx *svc.ServiceContext) *CreateRevertLogic {
 return &CreateRevertLogic{
  ctx:    ctx,
  svcCtx: svcCtx,
  Logger: logx.WithContext(ctx),
 }
}

func (l *CreateRevertLogic) CreateRevert(in *order.CreateRequest) (*order.CreateResponse, error) {
 // 獲取 RawDB
 db, err := sqlx.NewMysql(l.svcCtx.Config.Mysql.DataSource).RawDB()
 if err != nil {
  return nil, status.Error(500, err.Error())
 }

 // 獲取子事務屏障對象
 barrier, err := dtmgrpc.BarrierFromGrpc(l.ctx)
 if err != nil {
  return nil, status.Error(500, err.Error())
 }
 // 開啓子事務屏障
 if err := barrier.CallWithDB(db, func(tx *sql.Tx) error {
  // 查詢用戶是否存在
  _, err := l.svcCtx.UserRpc.UserInfo(l.ctx, &user.UserInfoRequest{
   Id: in.Uid,
  })
  if err != nil {
   return fmt.Errorf("用戶不存在")
  }
  // 查詢用戶最新創建的訂單
  resOrder, err := l.svcCtx.OrderModel.FindOneByUid(in.Uid)
  if err != nil {
   return fmt.Errorf("訂單不存在")
  }
  // 修改訂單狀態9,標識訂單已失效,並更新訂單
  resOrder.Status = 9
  err = l.svcCtx.OrderModel.TxUpdate(tx, resOrder)
  if err != nil {
   return fmt.Errorf("訂單更新失敗")
  }

  return nil
 }); err != nil {
  return nil, status.Error(500, err.Error())
 }

 return &order.CreateResponse{}, nil
}

10.2.6 修改 order api 服務

我們把 order rpc 服務 CreateCreateRevert 接口方法,product rpc 服務 DecrStockDecrStockRevert 接口方法,提到 order api 服務中做成一個以 SAGA事務模式 的分佈式事務操作。

$ vim mall/service/order/api/etc/order.yaml
Name: Order
Host: 0.0.0.0
Port: 8002

......

OrderRpc:
  Etcd:
    Hosts:
    - etcd:2379
    Key: order.rpc

ProductRpc:
  Etcd:
    Hosts:
    - etcd:2379
    Key: product.rpc
$ vim mall/service/order/api/internal/config/config.go
package config

import (
 "github.com/tal-tech/go-zero/rest"
 "github.com/tal-tech/go-zero/zrpc"
)

type Config struct {
 rest.RestConf

 Auth struct {
  AccessSecret string
  AccessExpire int64
 }

 OrderRpc   zrpc.RpcClientConf
 ProductRpc zrpc.RpcClientConf
}
$ vim mall/service/order/api/internal/svc/servicecontext.go
package svc

import (
 "mall/service/order/api/internal/config"
 "mall/service/order/rpc/orderclient"
 "mall/service/product/rpc/productclient"

 "github.com/tal-tech/go-zero/zrpc"
)

type ServiceContext struct {
 Config config.Config

 OrderRpc   orderclient.Order
 ProductRpc productclient.Product
}

func NewServiceContext(c config.Config) *ServiceContext {
 return &ServiceContext{
  Config:     c,
  OrderRpc:   orderclient.NewOrder(zrpc.MustNewClient(c.OrderRpc)),
  ProductRpc: productclient.NewProduct(zrpc.MustNewClient(c.ProductRpc)),
 }
}
$ vim mall/service/order/api/order.go
package main

import (
 ......

 _ "github.com/dtm-labs/driver-gozero" // 添加導入 `gozero` 的 `dtm` 驅動
)

var configFile = flag.String("f""etc/order.yaml""the config file")

func main() {
 ......
}
$ vim mall/service/order/api/internal/logic/createlogic.go
package logic

import (
 "context"

 "mall/service/order/api/internal/svc"
 "mall/service/order/api/internal/types"
 "mall/service/order/rpc/order"
 "mall/service/product/rpc/product"

 "github.com/dtm-labs/dtmgrpc"
 "github.com/tal-tech/go-zero/core/logx"
 "google.golang.org/grpc/status"
)

type CreateLogic struct {
 logx.Logger
 ctx    context.Context
 svcCtx *svc.ServiceContext
}

func NewCreateLogic(ctx context.Context, svcCtx *svc.ServiceContext) CreateLogic {
 return CreateLogic{
  Logger: logx.WithContext(ctx),
  ctx:    ctx,
  svcCtx: svcCtx,
 }
}

func (l *CreateLogic) Create(req types.CreateRequest) (resp *types.CreateResponse, err error) {
 // 獲取 OrderRpc BuildTarget
 orderRpcBusiServer, err := l.svcCtx.Config.OrderRpc.BuildTarget()
 if err != nil {
  return nil, status.Error(100, "訂單創建異常")
 }

 // 獲取 ProductRpc BuildTarget
 productRpcBusiServer, err := l.svcCtx.Config.ProductRpc.BuildTarget()
 if err != nil {
  return nil, status.Error(100, "訂單創建異常")
 }

 // dtm 服務的 etcd 註冊地址
 var dtmServer = "etcd://etcd:2379/dtmservice"
 // 創建一個gid
 gid := dtmgrpc.MustGenGid(dtmServer)
 // 創建一個saga協議的事務
 saga := dtmgrpc.NewSagaGrpc(dtmServer, gid).
  Add(orderRpcBusiServer+"/orderclient.Order/Create", orderRpcBusiServer+"/orderclient.Order/CreateRevert"&order.CreateRequest{
   Uid:    req.Uid,
   Pid:    req.Pid,
   Amount: req.Amount,
   Status: 0,
  }).
  Add(productRpcBusiServer+"/productclient.Product/DecrStock", productRpcBusiServer+"/productclient.Product/DecrStockRevert"&product.DecrStockRequest{
   Id:  req.Pid,
   Num: 1,
  })

 // 事務提交
 err = saga.Submit()
 if err != nil {
  return nil, status.Error(500, err.Error())
 }

 return &types.CreateResponse{}, nil
}

提示:SagaGrpc.Add 方法第一個參數 action 是微服務 grpc 訪問的方法路徑,這個方法路徑需要分別去以下文件中尋找。
mall/service/order/rpc/order/order.pb.go
mall/service/product/rpc/product/product.pb.go
按關鍵字 Invoke 搜索即可找到。

10.3 測試 go-zero + DTM

10.3.1 測試分佈式事務正常流程

10.3.2 測試分佈式事務失敗流程 1

  1. 首先 DTM 服務會調 order rpc Create 接口進行創建訂單處理。

  2. 創建訂單完成後 DTM 服務再調 product rpc DecrStock 接口,這個接口的裏通過 pid 更新產品庫存,因產品庫存不足,拋出事務失敗。

  3. DTM 服務發起補償機制,調 order rpc CreateRevert 接口進行訂單的補償處理。

  4. DTM 服務發起補償機制,調 product rpc DecrStockRevert 接口進行產品庫存更新的補償處理。但是因爲在 product rpc DecrStock 接口的子事務屏障內,業務處理並未成功。所以在 DecrStockRevert 接口裏不會執行子事務屏障內的業務邏輯。

10.3.3 測試分佈式事務失敗流程 2

大家可以對比下 測試分佈式事務失敗流程 1 與 測試分佈式事務失敗流程 2 不同之處,是不是能發現和體會到 DTM 的這個子事務屏障技術的強大之處。

子事務屏障會自動識別正向操作是否已執行,失敗流程 1 未執行業務操作,所以補償時,也不會執行補償的業務操作;失敗流程 2 執行了業務操作,所以補償時,也會執行補償的業務操作。

項目地址

https://github.com/zeromicro/go-zero

歡迎使用 go-zerostar 支持我們!

關於勘誤

由於公衆號只能修改 20 個字,所以無法勘誤,文章如有疑惑,請查看:

https://www.zhihu.com/people/kevwan/posts

所有勘誤都會在這裏修改。

微信交流羣

關注『微服務實踐』公衆號並點擊 交流羣 獲取社區羣二維碼。

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/M1Xx_UI-62JhWVRdSfbDMg