Go 組件:context 學習筆記!

最近學習 go 有一段時間了,在網上一直看到別人推薦,學 go 可以學習裏面的 context 源碼,短小精悍。看了下確實有所收穫,本文是基於我最近對 context 源碼學習的一些心得積累,望大家不吝賜教。

一、爲什麼使用 Context

(一)go 的扛把子

要論 go 最津津樂道的功能莫過於 go 強大而簡潔的併發能力。

func main(){
  go func(){
    fmt.Println("Hello World")
  }()
}

通過簡單的 go func(){},go 可以快速生成新的協程並運行。

(二)想象一個沒有 Context 的世界

有併發的地方就有江湖。每個編程語言都有各自的併發編程方式,也有不同的併發控制方法,比如 java 通過 join() 來做主子線程同步。

go 裏面常用於協程間通信和管理的有 channel 和 sync 包。比如 channel 可以通知協程做特定操作(退出,阻塞等),sync 可以加鎖和同步。

假如我要實現一個可以同時關閉所有協程的程序,可以這樣實現。

closed := make(chan struct{})
for i := 0; i < 2; i++ {
   // do something
   go func(i int) {
      select {
      case <-closed:
         fmt.Printf("%d Closed\n", i)
      }
   }(i)
}
// 發送指令關閉所有協程
close(closed)
time.Sleep(1 * time.Second)

因爲 go 的協程不支持直接從外部退出,不像 C++ 和 Java 有個線程 ID 可以操作。所以只能通過協程自己退出的方式。一般來說通過 channel 來控制是最方便的。

如果我想加點功能,比如到時間後退出,只要給 channel 增加關閉條件即可。

closed := make(chan struct{})
for i := 0; i < 2; i++ {
   go func(i int) {
      // do something
      select {
      case <-closed:
         fmt.Printf("%d Timeout\n", i)
      }
   }(i)
}
// 加個時間條件
ta := time.After(5 * time.Second)
select {
case <-ta:
   close(closed)
}
time.Sleep(1 * time.Second)

(三)用 Context 精簡代碼

上面的代碼已經夠簡單了,但是還是顯得有些複雜。比如每次都要在協程內部增加對 channel 的判斷,也要在外部設置關閉條件。試想一下,如果程序要限制的是總時長,而不是單個操作的時長,這樣每個操作要限制多少時間也是個難題。

這個時候就輪到 Context 登場了。Context 顧名思義是協程的上下文,主要用於跟蹤協程的狀態,可以做一些簡單的協程控制,也能記錄一些協程信息。

下面試着用 Context 改造下前面的例子:

// 空的父context
pctx := context.TODO()
// 子context(攜帶有超時信息),cancel函數(可以主動觸發取消)
//ctx, cancel := context.WithTimeout(pctx, 5*time.Second)
ctx, _ := context.WithTimeout(pctx, 5*time.Second)
for i := 0; i < 2; i++ {
   go func(i int) {
      // do something
    // 大部分工具庫內置了對ctx的判斷,下面的部分幾乎可以省略
      select {
      case <-ctx.Done():
         fmt.Printf("%d Done\n", i)
      }
   }(i)
}
// 調用cancel會直接關閉ctx.Done()返回的管道,不用等到超時
//cancel()
time.Sleep(6 * time.Second)

通過 Context 可以進一步簡化控制代碼,且更爲友好的是,大多數 go 庫,如 http、各種 db driver、grpc 等都內置了對 ctx.Done() 的判斷,我們只需要將 ctx 傳入即可。

二、Context 基礎用法

接下來介紹 Context 的基礎用法,最爲重要的就是 3 個基礎能力,取消、超時、附加值。

(一)新建一個 Context

ctx := context.TODO()
ctx := context.Background()

這兩個方法返回的內容是一樣的,都是返回一個空的 context,這個 context 一般用來做父 context。

(二)WithCancel

// 函數聲明
func WithCancel(parent Context) (ctx Context, cancel CancelFunc)
// 用法:返回一個子Context和主動取消函數
ctx, cancel := context.WithCancel(parentCtx)

這個函數相當重要,會根據傳入的 context 生成一個子 context 和一個取消函數。當父 context 有相關取消操作,或者直接調用 cancel 函數的話,子 context 就會被取消。

舉個日常業務中常用的例子:

// 一般操作比較耗時或者涉及遠程調用等,都會在輸入參數裏帶上一個ctx,這也是公司代碼規範裏提倡的
func Do(ctx context.Context, ...) {
  ctx, cancel := context.WithCancel(parentCtx)
  // 實現某些業務邏輯
  // 當遇到某種條件,比如程序出錯,就取消掉子Context,這樣子Context綁定的協程也可以跟着退出
  if err != nil {
    cancel()
  }
}

(三)WithTimeout

// 函數聲明
func WithTimeout(parent Context, timeout time.Duration) (Context, CancelFunc)
// 用法:返回一個子Context(會在一段時間後自動取消),主動取消函數
ctx := context.WithTimeout(parentCtx, 5*time.Second)

這個函數在日常工作中使用得非常多,簡單來說就是給 Context 附加一個超時控制,當超時 ctx.Done() 返回的 channel 就能讀取到值,協程可以通過這個方式來判斷執行時間是否滿足要求。

舉個日常業務中常用的例子:

// 一般操作比較耗時或者涉及遠程調用等,都會在輸入參數裏帶上一個ctx,這也是公司代碼規範裏提倡的
func Do(ctx context.Context, ...) {
  ctx, cancel := context.WithTimeout(parentCtx)
  // 實現某些業務邏輯
  for {
    select {
     // 輪詢檢測是否已經超時
      case <-ctx.Done():
        return
      // 有時也會附加一些錯誤判斷
      case <-errCh:
        cancel()
      default:
    }
  }
}

現在大部分 go 庫都實現了超時判斷邏輯,我們只需要傳入 ctx 就好。

(四)WithDeadline

// 函數聲明
func WithDeadline(parent Context, d time.Time) (Context, CancelFunc)
// 用法:返回一個子Context(會在指定的時間自動取消),主動取消函數
ctx, cancel := context.WithDeadline(parentCtx, time.Now().Add(5*time.Second))

這個函數感覺用得比較少,和 WithTimeout 相比的話就是使用的是截止時間。

(五)WithValue

// 函數聲明
func WithValue(parent Context, key, val interface{}) Context
// 用法: 傳入父Context和(key, value),相當於存一個kv
ctx := context.WithValue(parentCtx, "name", 123)
// 用法:將key對應的值取出
v := ctx.Value("name")

這個函數常用來保存一些鏈路追蹤信息,比如 API 服務裏會有來保存一些來源 ip、請求參數等。

因爲這個方法實在是太常用了,比如 grpc-go 裏的 metadata 就使用這個方法將結構體存儲在 ctx 裏。

func NewOutgoingContext(ctx context.Context, md MD) context.Context {
    return context.WithValue(ctx, mdOutgoingKey{}, rawMD{md: md})
}

三、Context 源碼實現

(一)理解 Context

雖然我們平時寫代碼時直接 context.Context 拿來就用,但實際上 context.Context 是一個接口,源碼裏是有多種不同的實現的,藉此實現不同的功能。

type Context interface {
  // 返回這個ctx預期的結束時間
  Deadline() (deadline time.Time, ok bool)
  // 返回一個channel,當執行結束或者取消時被close,我們平時可以用這個來判斷ctx綁定的協程是否該退出。實現裏用的懶漢模式,所以一開始可能會返回nil
  Done() <-chan struct{}
  // 如果未完成,返回nil。已完成源碼裏目前就兩種錯誤,已被取消或者已超時
  Err() error
  // 返回ctx綁定的key對應的value值
  Value(key interface{}) interface{}
}

context 整體是一個樹形結構,不同的 ctx 間可能是兄弟節點或者是父子節點的關係。

同時由於 Context 接口有多種不同的實現,所以樹的節點可能也是多種不同的 ctx 實現。總的來說我覺得 Context 的特點是:

在源碼裏實際只有 4 種實現,要弄懂 context 的源碼其實把這 4 種對應的實現學習一下就行,他們分別是:

現在先簡單對這幾個實現有個概念,後面會對其中核心關鍵的部分講解下。

(二)Context 類圖

從類圖中可以看出,源碼裏有 4 種結構和 3 種接口,相對於其他 go 庫源碼來說是比較簡單的。

核心的接口是 Context,裏面包含了最常用的判斷是否處理完成的 Done() 方法 。其他所有結構都通過①實現方法或②組合的方式來實現該接口。

核心的結構是 cancelCtx,被 timerCtx 包含。cancelCtx 和 timerCtx 可以說代表了 Context 庫最核心的取消和超時相關的實現,也最爲複雜些。

(三)Context 源碼

因爲篇幅關係,不會把每一行源碼都拎出來,會挑比較重點的方法講下。由於平時我們使用都是通過幾個固定的方法入口,所以會圍繞這幾個方法講下

對外體現

var (
   background = new(emptyCtx)
   todo       = new(emptyCtx)
)
func Background() Context {
   return background
}
func TODO() Context {
   return todo
}

TODO(),Background() 其實都是返回一個 emptyCtx。

實現

type emptyCtx int
func (*emptyCtx) Deadline() (deadline time.Time, ok bool) {
   return
}
func (*emptyCtx) Done() <-chan struct{} {
   return nil
}
func (*emptyCtx) Err() error {
   return nil
}
func (*emptyCtx) Value(key interface{}) interface{} {
   return nil
}
func (e *emptyCtx) String() string {
   switch e {
   case background:
      return "context.Background"
   case todo:
      return "context.TODO"
   }
   return "unknown empty Context"
}

這個結構非常簡單,都是返回 nil。emptyCtx 主要用於新建一個獨立的樹。比方說,我想在協程裏做些異步操作,但是又想脫離主協程的 ctx 控制如使用獨立的超時限制,就可以使用這種方式。但是在整個 go 程序裏只有 todo 和 background 兩個大根節點,所以 TODO() 和 Background() 其實是新建第二層級的子樹。

func demo(ctx context.Context){
  nctx := context.TODO()
  nctx := context.WithTimeout(nctx, 5*time.Second)
  ...
}

對外體現

// 設置key, value值
func WithValue(parent Context, key, val interface{}) Context {
   if key == nil {
      panic("nil key")
   }
   if !reflectlite.TypeOf(key).Comparable() {
      panic("key is not comparable")
   }
   // 在當前節點下生成新的子節點
   return &valueCtx{parent, key, val}
}
// 根據key讀取value
func (c *valueCtx) Value(key interface{}) interface{} {
   if c.key == key {
      return c.val
   }
   return c.Context.Value(key)
}

通過公共方法設置值,再通過 valueCtx 的內部方法獲取值。後面再仔細講下 Value 的實現方式。

實現

type valueCtx struct {
   Context
   key, val interface{}
}
// 根據key讀取value
func (c *valueCtx) Value(key interface{}) interface{} {
  // 每個ctx只綁定一個key,匹配則返回。否則向上追溯到匹配爲止
   if c.key == key {
      return c.val
   }
   return c.Context.Value(key)
}

從實現上可以看出,每當我們往 ctx 裏調 WithValue 塞值時,都會生成一個新的子節點。調用的次數多了,生成的子樹就很龐大。 

若當前節點的 key 和傳入的 key 不匹配會沿着繼承關係向上遞歸查找。遞歸到根就變成 nil,表示當前 key 在該子樹序列裏沒存。

介紹完上面兩種比較簡單的結構後,終於要來到複雜的 cancelCtx。cancelCtx 和 timerCtx 關聯性很強,基本上弄懂一個,另外一個也差不多了。

對外方法

func WithCancel(parent Context) (ctx Context, cancel CancelFunc) {
   // 新建一個cancelCtx
   c := newCancelCtx(parent)
   // 將父節點的取消函數和子節點關聯,做到父節點取消,子節點也跟着取消
   propagateCancel(parent, &c)
   // 返回當前節點和主動取消函數(調用會將自身從父節點移除,並返回一個已取消錯誤)
   return &c, func() { c.cancel(true, Canceled) }
}

對外的方法裏包含的幾個方法都是重點的方法,後面主要講下

結構

type cancelCtx struct {
   Context
   mu       sync.Mutex            // protects following fields
   done     chan struct{}         // created lazily, closed by first cancel call
   children map[canceler]struct{} // set to nil by the first cancel call
   err      error                 // set to non-nil by the first cancel call
}
type canceler interface {
   cancel(removeFromParent bool, err error)
   Done() <-chan struct{}
}

這個接口約定了可以取消的 context,比如 cancelCtx 和 timerCtx 是可以取消的,emptyCtx 和 valueCtx 是不可以取消的。

初始化

// newCancelCtx returns an initialized cancelCtx.
func newCancelCtx(parent Context) cancelCtx {
   return cancelCtx{Context: parent}
}

初始化就是將父節點設置了一下,其他不設置。

cancelCtx 的取消實現

// cancel closes c.done, cancels each of c's children, and, if
// removeFromParent is true, removes c from its parent's children.
func (c *cancelCtx) cancel(removeFromParent bool, err error) {
  // 取消無論是通過父節點還是自身主動取消,err都不爲空
   if err == nil {
      panic("context: internal error: missing cancel error")
   }
   c.mu.Lock()
   if c.err != nil {
     // c.err 不爲空表示已經被取消過,比如父節點取消時子節點可能已經主動調用過取消函數
      c.mu.Unlock()
      return // already canceled
   }
   c.err = err
   if c.done == nil {
     // closedchan 是一個已經關閉的channel,要特殊處理是因爲c.done是懶加載的方式。只有調用c.Done()時纔會實際創建
      c.done = closedchan
   } else {
      close(c.done)
   }
   // 遞歸取消子節點
   for child := range c.children {
      // NOTE: acquiring the child's lock while holding parent's lock.
      child.cancel(false, err)
   }
   c.children = nil
   c.mu.Unlock()
  // 從父節點中移除當前節點
   if removeFromParent {
      removeChild(c.Context, c)
   }
}

整個過程可以總結爲:

這裏 child.cancel(false,err) 不從父節點移除子節點是因爲當前節點操作已取過鎖,移除操作會再取鎖造成衝突,故先全部 cancel 後再將 children 置爲 nil 一次性移除。

propagateCancel 綁定父子節點的取消關係

// propagateCancel arranges for child to be canceled when parent is.
func propagateCancel(parent Context, child canceler) {
   done := parent.Done()
   if done == nil {
     // 若當前節點追溯到根沒有cancelCtx或者timerCtx的話,表示當前節點的祖先沒有可以取消的結構,後面的父子綁定的操作就可以不用做了,可參考下圖
      return // parent is never canceled
   }
   select {
   case <-done:
     // 父節點已取消就直接取消子節點,無需移除是因爲父子關係還沒加到parent.children
      // parent is already canceled
      child.cancel(false, parent.Err())
      return
   default:
   }
  // 獲取最近的可取消的祖先
   if p, ok := parentCancelCtx(parent); ok {
      p.mu.Lock()
      if p.err != nil {
      // 和前面一樣,如果祖先節點已經取消過了,後面就沒必要綁定,直接取消就好
         // parent has already been canceled
         child.cancel(false, p.err)
      } else {
        // 綁定父子關係
         if p.children == nil {
            p.children = make(map[canceler]struct{})
         }
         p.children[child] = struct{}{}
      }
      p.mu.Unlock()
   } else {
     // 當ctx是開發者自定義的並繼承context.Context接口會進入這個分支,另起一個協程來監聽取消動作,因爲開發者自定義的習慣可能和源碼中用c.done和c.err的判斷方式有所不同
      atomic.AddInt32(&goroutines, +1)
      go func() {
         select {
         case <-parent.Done():
            child.cancel(false, parent.Err())
         case <-child.Done():
         }
      }()
   }
}

①當祖先繼承鏈裏沒有 cancelCtx 或 timerCtx 等實現時,Done() 方法總是返回 nil,可以作爲前置判斷。

②parentCancelCtx 取的是可以取消的最近祖先節點。

總結

總結一下,cancelCtx 的作用其實就兩個:

結構體

type timerCtx struct {
   cancelCtx
   timer *time.Timer // Under cancelCtx.mu.
   deadline time.Time
}

相比 cancelCtx 多了一個計時器和截止時間。

取消方法

func (c *timerCtx) cancel(removeFromParent bool, err error) {
   c.cancelCtx.cancel(false, err)
   if removeFromParent {
      // Remove this timerCtx from its parent cancelCtx's children.
      removeChild(c.cancelCtx.Context, c)
   }
   c.mu.Lock()
   if c.timer != nil {
      c.timer.Stop()
      c.timer = nil
   }
   c.mu.Unlock()
}

取消方法就是直接調用 cancelCtx 的取消外加計時器停止。

對外方法

func WithDeadline(parent Context, d time.Time) (Context, CancelFunc) {
   if cur, ok := parent.Deadline(); ok && cur.Before(d) {
     // 傳入的截止時間在父節點截止時間之後,則父節點取消時會同步取消當前子節點,不需要額外再設置計費器了,可以當普通的cancelCtx對待。
      // The current deadline is already sooner than the new one.
      return WithCancel(parent)
   }
   c := &timerCtx{
      cancelCtx: newCancelCtx(parent),
      deadline:  d,
   }
   propagateCancel(parent, c)
   dur := time.Until(d)
   if dur <= 0 {
     // 已超時直接取消
      c.cancel(true, DeadlineExceeded) // deadline has already passed
      return c, func() { c.cancel(false, Canceled) }
   }
   c.mu.Lock()
   defer c.mu.Unlock()
   if c.err == nil {
     // 間隔時間到後主動觸發取消
      c.timer = time.AfterFunc(dur, func() {
         c.cancel(true, DeadlineExceeded)
      })
   }
   return c, func() { c.cancel(true, Canceled) }
}

四、總結

綜上所述,Context 的主要功能就是用於控制協程退出和附加鏈路信息。核心實現的結構體有 4 個,最複雜的是 cancelCtx,最常用的是 cancelCtx 和 valueCtx。整體呈樹狀結構,父子節點間同步取消信號。

** 作者簡介**

郭君

騰訊後臺開發工程師

騰訊後臺開發工程師,目前從事人工智能工程化落地相關工作。

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/OCpVRwtiphFRZgu9zdae5g