Go 併發模式: Context

在 Go 服務中,每個傳入的請求在單獨的 goroutine 中處理。請求回調函數通常啓動額外的 goroutine 以訪問後端,如數據庫和 RPC 服務。處理同一請求的一系列 goroutine 通常需要訪問請求相關的值,例如端用戶的標識、授權令牌和請求截止時間。當請求被取消或超時,處理該請求的所有 goroutine 都應該快速退出,以便系統可以回收它們正在使用的資源。

在 Google,我們開發了一個上下文包,可以輕鬆地跨越 API 邊界,將請求作用域內的值、取消信號和截止時間傳遞給所有處理請求的 goroutine。該包的公共可用版本爲 context。本文描述瞭如何使用這個包,並提供了一個完整的示例。

Context

context 包的核心是 Context 類型:

// A Context carries a deadline, cancelation signal, and request-scoped values
// across API boundaries. Its methods are safe for simultaneous use by multiple
// goroutines.
type Context interface {
    // Done returns a channel that is closed when this Context is canceled
    // or times out.
    Done() <-chan struct{}

    // Err indicates why this context was canceled, after the Done channel
    // is closed.
    Err() error

    // Deadline returns the time when this Context will be canceled, if any.
    Deadline() (deadline time.Time, ok bool)

    // Value returns the value associated with key or nil if none.
    Value(key interface{}) interface{}
}

(此描述是精簡的;godoc 是權威的。)

Done 方法返回一個 channel,充當傳遞給 Context 下運行的函數的取消信號:當 channel 關閉時,函數應該放棄它們的工作並返回。Err 方法返回一個錯誤,表明取消 context 的原因。文章 Pipelines and Cancelation 更詳細地討論了 Done channel 的習慣用法。

Context 沒有 Cancel 方法,原因與 Done channel 是隻讀的一樣:接收取消信號的函數通常不是發送信號的函數。特別是當父操作爲子操作啓動 goroutine 時,子操作不應該有能力取消父操作。相反,WithCancel 函數(如下所述)提供了一種取消新 Context 值的方法。

多個 goroutine 同時使用同一 Context 是安全的。代碼可以將單個 Context 傳遞給任意數量的 goroutine,並取消該 Context 以向所有 goroutine 發送信號。

Deadline 方法允許函數決定是否應該開始工作;如果剩下的時間太少,則可能不值得。代碼還可以使用截止時間來設置 I/O 操作超時。

Value 允許 Context 攜帶請求作用域的數據。爲使多個 goroutine 同時使用,這些數據必須是安全的。

Derived contexts

context 包提供了從現有 Context 值派生新 Context 值的函數。這些值形成一個樹:當 Context 被取消時,從它派生的所有 Context 也被取消。

Background 是所有 Context 樹的根;它永遠不會被取消:

// Background returns an empty Context. It is never canceled, has no deadline,
// and has no values. Background is typically used in main, init, and tests,
// and as the top-level Context for incoming requests.
func Background() Context

WithCancelWithTimeout 返回派生 Context 值,可以比父 Context 更早取消。當請求回調函數返回時,通常會取消與傳入請求關聯的 ContextWithCancel 還可用於使用多個副本時取消冗餘的請求。WithTimeout 用於設置對後端服務器請求的截止時間:

// WithCancel returns a copy of parent whose Done channel is closed as soon as
// parent.Done is closed or cancel is called.
func WithCancel(parent Context) (ctx Context, cancel CancelFunc)

// A CancelFunc cancels a Context.
type CancelFunc func()

// WithTimeout returns a copy of parent whose Done channel is closed as soon as
// parent.Done is closed, cancel is called, or timeout elapses. The new
// Context's Deadline is the sooner of now+timeout and the parent's deadline, if
// any. If the timer is still running, the cancel function releases its
// resources.
func WithTimeout(parent Context, timeout time.Duration) (Context, CancelFunc)

WithValue 提供了一種將請求作用域的值與 Context 關聯的方法:

// WithValue returns a copy of parent whose Value method returns val for key.
func WithValue(parent Context, key interface{}, val interface{}) Context

通過一個示例瞭解使用 context 包的最佳方法。

示例:Google Web 搜索

我們的示例是一個 HTTP 服務器,它處理 URL,如 /search?q=golang&timeout=1s,將查詢 “golang” 轉發到 google web search api 並渲染結果。timeout 參數告訴服務器在該延時後取消請求。

代碼分爲三個包:

server 程序

server 程序處理諸如 /search?q=golang 的請求爲 golang 提供 Google 搜索結果。它註冊 handlesearch 來處理 /search endpoint。回調函數創建一個名爲 ctx 的初始 Context,並安排了在回調函數返回時取消它。如果請求包含 timeout URL 參數,則在超時結束時 Context 將自動取消:

func handleSearch(w http.ResponseWriter, req *http.Request) {
    // ctx is the Context for this handler. Calling cancel closes the
    // ctx.Done channel, which is the cancellation signal for requests
    // started by this handler.
    var (
        ctx    context.Context
        cancel context.CancelFunc
    )
    timeout, err := time.ParseDuration(req.FormValue("timeout"))
    if err == nil {
        // The request has a timeout, so create a context that is
        // canceled automatically when the timeout expires.
        ctx, cancel = context.WithTimeout(context.Background(), timeout)
    } else {
        ctx, cancel = context.WithCancel(context.Background())
    }
    defer cancel() // Cancel ctx as soon as handleSearch returns.

回調函數從請求中提取查詢,並通過調用 userip 包提取客戶機的 IP 地址。後端請求需要客戶端的 IP 地址,因此 handlesearch 將其附加到 ctx

// Check the search query.
   query := req.FormValue("q")
   if query == "" {
       http.Error(w, "no query", http.StatusBadRequest)
       return
   }

   // Store the user IP in ctx for use by code in other packages.
   userIP, err := userip.FromRequest(req)
   if err != nil {
       http.Error(w, err.Error(), http.StatusBadRequest)
       return
   }
   ctx = userip.NewContext(ctx, userIP)

回調函數使用 ctxquery 調用 google.Search

// Run the Google search and print the results.
start := time.Now()
results, err := google.Search(ctx, query)
elapsed := time.Since(start)

如果搜索成功,回調函數渲染結果:

if err := resultsTemplate.Execute(w, struct {
       Results          google.Results
       Timeout, Elapsed time.Duration
   }{
       Results: results,
       Timeout: timeout,
       Elapsed: elapsed,
   }); err != nil {
       log.Print(err)
       return
   }

Package userip

userip 包提供了從請求中提取用戶 IP 地址並將其與 Context 關聯的函數。Context 提供鍵 - 值映射,其中鍵和值都是 interface{} 類型。鍵類型必須支持相等,且值必須安全地供多個 goroutine 同時使用。像 userip 這樣的包隱藏映射的細節,並提供對特定 Context 值的強類型訪問。

爲了避免鍵衝突, userip 定義了一個未導出的類型 key,並使用此類型的值作爲 Context 鍵:

// The key type is unexported to prevent collisions with context keys defined in
// other packages.
type key int

// userIPkey is the context key for the user IP address.  Its value of zero is
// arbitrary.  If this package defined other context keys, they would have
// different integer values.
const userIPKey key = 0

fromrequesthttp.request 中提取 userip 的值:

func FromRequest(req *http.Request) (net.IP, error) {
    ip, _, err := net.SplitHostPort(req.RemoteAddr)
    if err != nil {
        return nil, fmt.Errorf("userip: %q is not IP:port", req.RemoteAddr)
}

newcontext 返回一個的攜帶入參 userip 值的新 Context

func NewContext(ctx context.Context, userIP net.IP) context.Context {
    return context.WithValue(ctx, userIPKey, userIP)
}

fromcontextcontext 中提取 userip:

func FromContext(ctx context.Context) (net.IP, bool) {
    // ctx.Value returns nil if ctx has no value for the key;
    // the net.IP type assertion returns ok=false for nil.
    userIP, ok := ctx.Value(userIPKey).(net.IP)
    return userIP, ok
}

Package google

該 google.Search 函數向 google web search api 發出 HTTP 請求,並解析 JSON 編碼的結果。它接受 Context 參數 ctx,請求運行時,如果 ctx.done 關閉,則立刻返回。

google web search api 請求包括搜索 queryuser ip 作爲查詢參數:

func Search(ctx context.Context, query string) (Results, error) {
    // Prepare the Google Search API request.
    req, err := http.NewRequest("GET""https://ajax.googleapis.com/ajax/services/search/web?v=1.0", nil)
    if err != nil {
        return nil, err
    }
    q := req.URL.Query()
    q.Set("q", query)

    // If ctx is carrying the user IP address, forward it to the server.
    // Google APIs use the user IP to distinguish server-initiated requests
    // from end-user requests.
    if userIP, ok := userip.FromContext(ctx); ok {
        q.Set("userip", userIP.String())
    }
    req.URL.RawQuery = q.Encode()

search 使用一個 helper 函數 httpdo 來發出 HTTP 請求;在處理請求或響應時,如果 ctx.done 關閉,將取消調用。search 將閉包傳遞給 httpdo 處理 HTTP 響應:

var results Results
  err = httpDo(ctx, req, func(resp *http.Response, err error) error {
      if err != nil {
          return err
      }
      defer resp.Body.Close()

      // Parse the JSON search result.
      // https://developers.google.com/web-search/docs/#fonje
      var data struct {
          ResponseData struct {
              Results []struct {
                  TitleNoFormatting string
                  URL               string
              }
          }
      }
      if err := json.NewDecoder(resp.Body).Decode(&data); err != nil {
          return err
      }
      for _, res := range data.ResponseData.Results {
          results = append(results, Result{Title: res.TitleNoFormatting, URL: res.URL})
      }
      return nil
  })
  // httpDo waits for the closure we provided to return, so it's safe to
  // read results here.
  return results, err

httpdo 函數運行 HTTP 請求並在新的 goroutine 中處理其響應。如果 ctx.donegoroutine 退出之前關閉,將取消請求:

func httpDo(ctx context.Context, req *http.Request, f func(*http.Response, error) error) error {
    // Run the HTTP request in a goroutine and pass the response to f.
    c := make(chan error, 1)
    req = req.WithContext(ctx)
    go func() { c <- f(http.DefaultClient.Do(req)) }()
    select {
    case <-ctx.Done():
        <-c // Wait for f to return.
        return ctx.Err()
    case err := <-c:
        return err
    }
}

根據 Context 調整代碼

許多服務框架提供了包和類型,用於承載請求作用域的值。我們可以定義 Context 接口的新實現,以便在使用現有框架的代碼和需要 Context 參數的代碼之間架起橋樑。

例如,Gorilla 的 github.com/gorilla/context 包允許處理程序通過提供從 HTTP 請求到鍵值對的映射,將數據與傳入請求相關聯。在 gorilla.go,我們提供了一個 Context 實現,其 Value 方法返回與 Gorilla 包中 HTTP 請求相關聯的值。

其他包提供了類似於 Context 的取消支持。例如,Tomb 提供了一個 kill 方法,通過關閉一個 dying channel 發出取消信號。tomb 還提供了等待這些 goroutine 退出的方法,類似於 sync.WaitGroup. 在 tomb.go,我們提供了一個 Context 實現,當其父 Context 被取消或提供的 tomb 被殺死時,該 Context 實現被取消。

總結

在 Google,我們要求 Go 程序員將 Context 參數作爲第一個參數傳遞給傳入和傳出請求之間調用路徑上的每個函數。這使得許多不同團隊開發的 Go 代碼能夠很好地互操作。它提供了對超時和取消的簡單控制,並確保諸如安全憑據之類的關鍵值正確地傳遞到程序中。

想要基於 Context 構建的服務器框架應該提供 Context 的實現,以便在其包和那些需要上下文參數的包之間架起橋樑。它們的客戶端庫接受來自調用代碼的 Context。通過爲請求作用域的數據和取消建立公共接口,Context 使包開發人員更容易共享代碼以創建可伸縮的服務。

轉自:

cyningsun.com/01-19-2021/go-concurrency-patterns-context-cn.html

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/RvJgGrzIytiiF6DWKlhNQA