在 Go 中如何正確重試請求?

導語 | 我們平時在開發中肯定避不開的一個問題是如何在不可靠的網絡服務中實現可靠的網絡通信,其中 http 請求重試是經常用的技術。但是 Go 標準庫 net/http 實際上是沒有重試這個功能的,所以本篇文章主要講解如何在 Go 中實現請求重試。

一、概述

要理解 cpo 機制的產生和使用,並不是一件容易的事。說實話,筆者第一次看到這個機制。

一般而言,對於網絡通信失敗的處理分爲以下幾步:

二、重試策略

重試策略可以分爲很多種,一方面要考慮到本次請求時長過長而影響到的業務忍受度,另一方面要考慮到重試會對下游服務產生過多的請求而帶來的影響,總之就是一個 trade-off 的問題。

所以對於重試算法,一般是在重試之間加一個 gap 時間,感興趣的朋友也可以去看看這篇‍文章(https://aws.amazon.com/cn/blogs/architecture/exponential-backoff-and-jitter/)。結合我們自己平時的實踐加上這篇文章的算法一般可以總結出以下幾條規則:

上面有兩種策略都加入了擾動(jitter),目的是防止驚羣問題 (Thundering Herd Problem)的發生。

所謂驚羣問題當許多進程都在等待被同一事件喚醒的時候,當事件發生後最後只有一個進程能獲得處理。其餘進程又造成阻塞,這會造成上下文切換的浪費。所以加入一個隨機時間來避免同一時間同時請求服務端還是很有必要的。

使用 net/http 重試所帶來的問題

重試這個操作其實對於 Go 來說其實還不能直接加一個 for 循環根據次數來進行,對於 Get 請求重試的時候沒有請求體,可以直接進行重試,但是對於 Post 請求來說需要把請求體放到 Reader 裏面,如下:

req, _ := http.NewRequest("POST", "localhost", strings.NewReader("hello"))

服務端收到請求之後就會從這個 Reader 中調用 Read() 函數去讀取數據,通常情況當服務端去讀取數據的時候,offset 會隨之改變,下一次再讀的時候會從 offset 位置繼續向後讀取。所以如果直接重試,會出現讀不到 Reader 的情況。

我們可以先弄一個例子:

func main() {
  go func() {
    http.HandleFunc("/", http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
      time.Sleep(time.Millisecond * 20)
      body, _ := ioutil.ReadAll(r.Body)  
      fmt.Printf("received body with length %v containing: %v\n", len(body), string(body))
      w.WriteHeader(http.StatusOK)
    }))
    http.ListenAndServe(":8090", nil)
  }()
  fmt.Print("Try with bare strings.Reader\n") 
  retryDo(req)
}
func retryDo() {
  originalBody := []byte("abcdefghigklmnopqrst")
  reader := strings.NewReader(string(originalBody))
  req, _ := http.NewRequest("POST", "http://localhost:8090/", reader)
  client := http.Client{
    Timeout: time.Millisecond * 10,
  }
  for {
    _, err := client.Do(req)
    if err != nil {
      fmt.Printf("error sending the first time: %v\n", err)
    } 
    time.Sleep(1000)
  }
}
// output:
error sending the first time: Post "http://localhost:8090/": context deadline exceeded (Client.Timeout exceeded while awaiting headers)
error sending the first time: Post "http://localhost:8090/": http: ContentLength=20 with Body length 0
error sending the first time: Post "http://localhost:8090/": http: ContentLength=20 with Body length 0
received body with length 20 containing: abcdefghigklmnopqrst
error sending the first time: Post "http://localhost:8090/": http: ContentLength=20 with Body length 0
....

在上面這個例子中,在客戶端設值了 10ms 的超時時間。在服務端模擬請求處理超時情況,先 sleep 20ms,然後再讀請求數據,這樣必然會超時。

當再次請求的時候,發現 client 請求的 Body 數據並不是我們預期的 20 個長度,而是 0,導致了 err。因此需要將 Body 這個 Reader 進行重置,如下:

func resetBody(request *http.Request, originalBody []byte) {
  request.Body = io.NopCloser(bytes.NewBuffer(originalBody))
  request.GetBody = func() (io.ReadCloser, error) {
    return io.NopCloser(bytes.NewBuffer(originalBody)), nil
  }
}

上面這段代碼中,我們使用 io.NopCloser 對請求的 Body 數據進行了重置,避免下次請求的時候出現非預期的異常。

那麼相對於上面簡陋的例子,還可以完善一下,加上我們上面說的 StatusCode 重試判斷、重試策略、重試次數等等,可以寫成這樣:

func retryDo(req *http.Request, maxRetries int, timeout time.Duration,
  backoffStrategy BackoffStrategy) (*http.Response, error) {
  var (
    originalBody []byte
    err          error
  )
  if req != nil && req.Body != nil {
    originalBody, err = copyBody(req.Body)
    resetBody(req, originalBody)
  }
  if err != nil {
    return nil, err
  }
  AttemptLimit := maxRetries
  if AttemptLimit <= 0 {
    AttemptLimit = 1
  }
  client := http.Client{
    Timeout: timeout,
  }
  var resp *http.Response
  //重試次數
  for i := 1; i <= AttemptLimit; i++ {
    resp, err = client.Do(req)
    if err != nil {
      fmt.Printf("error sending the first time: %v\n", err)
    } 
    // 重試 500 以上的錯誤碼
    if err == nil && resp.StatusCode < 500 {
      return resp, err
    }
    // 如果正在重試,那麼釋放fd
    if resp != nil {
      resp.Body.Close()
    }
    // 重置body
    if req.Body != nil {
      resetBody(req, originalBody)
    }
    time.Sleep(backoffStrategy(i) + 1*time.Microsecond)
  }
  // 到這裏,說明重試也沒用
  return resp, req.Context().Err()
}
func copyBody(src io.ReadCloser) ([]byte, error) {
  b, err := ioutil.ReadAll(src)
  if err != nil {
    return nil, ErrReadingRequestBody
  }
  src.Close()
  return b, nil
}
func resetBody(request *http.Request, originalBody []byte) {
  request.Body = io.NopCloser(bytes.NewBuffer(originalBody))
  request.GetBody = func() (io.ReadCloser, error) {
    return io.NopCloser(bytes.NewBuffer(originalBody)), nil
  }
}

三、對沖策略

上面講的是重試的概念,那麼有時候我們接口只是偶然會出問題,並且我們的下游服務並不在乎多請求幾次,那麼我們可以借用 grpc 裏面的概念:對沖策略(Hedged requests)。

對沖是指在不等待響應的情況主動發送單次調用的多個請求,然後取首個返回的回包。對沖和重試的區別點主要在:對沖在超過指定時間沒有響應就會直接發起請求,而重試則必須要服務端響應後纔會發起請求。所以對沖更像是比較激進的重試策略。

使用對沖的時候需要注意一點是,因爲下游服務可能會做負載均衡策略,所以要求請求的下游服務一般是要求冪等的,能夠在多次併發請求中是安全的,並且是符合預期的。

對沖請求一般是用來處理 “長尾” 請求的,關於”長尾“請求的概念可以看這篇文章:https://segmentfault.com/a/1190000039978117

四、併發模式的處理

因爲對沖重試加上了併發的概念,要用到 goroutine 來併發請求,所以我們可以把數據封裝到 channel 裏面來進行消息的異步處理。

並且由於是多個 goroutine 處理消息,我們需要在每個 goroutine 處理完畢,但是都失敗的情況下返回 err,不能直接由於 channel 等待卡住主流程,這一點十分重要。

但是由於在 Go 中是無法獲取每個 goroutine 的執行結果的,我們又只關注正確處理結果,需要忽略錯誤,所以需要配合 WaitGroup 來實現流程控制,示例如下:

func main() {
  totalSentRequests := &sync.WaitGroup{}
  allRequestsBackCh := make(chan struct{})
  multiplexCh := make(chan struct {
    result string
    retry  int
  })
  go func() {
    //所有請求完成之後會close掉allRequestsBackCh
    totalSentRequests.Wait()
    close(allRequestsBackCh)
  }()
  for i := 1; i <= 10; i++ {
    totalSentRequests.Add(1)
    go func() {
      // 標記已經執行完
      defer totalSentRequests.Done()
      // 模擬耗時操作
      time.Sleep(500 * time.Microsecond)
      // 模擬處理成功
      if random.Intn(500)%2 == 0 {
        multiplexCh <- struct {
          result string
          retry  int
        }{"finsh success", i}
      }
      // 處理失敗不關心,當然,也可以加入一個錯誤的channel中進一步處理
    }()
  }
  select {
  case <-multiplexCh:
    fmt.Println("finish success")
  case <-allRequestsBackCh:
    // 到這裏,說明全部的 goroutine 都執行完畢,但是都請求失敗了
    fmt.Println("all req finish,but all fail")
  }
}

從上面這段代碼看爲了進行流程控制,多用了兩個 channel:totalSentRequests、allRequestsBackCh,多用了一個 goroutine 異步關停 allRequestsBackCh,才實現的流程控制,實在太過於麻煩,有新的實現方案的同學不妨和我探討一下。

除了上面的併發請求控制的問題,對於對沖重試來說,還需要注意的是,由於請求不是串行的,所以 http.Request 的上下文會變,所以每次請求前需要 clone 一次 context,保證每個不同請求的 context 是獨立的。但是每次 clone 之後 Reader 的 offset 位置又變了,所以我們還需要進行重新 reset:

func main() {
  req, _ := http.NewRequest("POST", "localhost", strings.NewReader("hello"))
  req2 := req.Clone(req.Context())
  contents, _ := io.ReadAll(req.Body)
  contents2, _ := io.ReadAll(req2.Body)
  fmt.Printf("First read: %v\n", string(contents))
  fmt.Printf("Second read: %v\n", string(contents2))
}
//output:
First read: hello
Second read:

所以結合一下上面的例子,我們可以將對沖重試的代碼變爲:

func retryHedged(req *http.Request, maxRetries int, timeout time.Duration,
  backoffStrategy BackoffStrategy) (*http.Response, error) {
  var (
    originalBody []byte
    err          error
  )
  if req != nil && req.Body != nil {
    originalBody, err = copyBody(req.Body)
  }
  if err != nil {
    return nil, err
  }
  AttemptLimit := maxRetries
  if AttemptLimit <= 0 {
    AttemptLimit = 1
  }
  client := http.Client{
    Timeout: timeout,
  }
  // 每次請求copy新的request
  copyRequest := func() (request *http.Request) {
    request = req.Clone(req.Context())
    if request.Body != nil {
      resetBody(request, originalBody)
    }
    return
  }
  multiplexCh := make(chan struct {
    resp  *http.Response
    err   error
    retry int
  })
  totalSentRequests := &sync.WaitGroup{}
  allRequestsBackCh := make(chan struct{})
  go func() {
    totalSentRequests.Wait()
    close(allRequestsBackCh)
  }()
  var resp *http.Response
  for i := 1; i <= AttemptLimit; i++ {
    totalSentRequests.Add(1)
    go func() {
      // 標記已經執行完
      defer totalSentRequests.Done()
      req = copyRequest()
      resp, err = client.Do(req)
      if err != nil {
        fmt.Printf("error sending the first time: %v\n", err)
      }
      // 重試 500 以上的錯誤碼
      if err == nil && resp.StatusCode < 500 {
        multiplexCh <- struct {
          resp  *http.Response
          err   error
          retry int
        }{resp: resp, err: err, retry: i}
        return
      }
      // 如果正在重試,那麼釋放fd
      if resp != nil {
        resp.Body.Close()
      }
      // 重置body
      if req.Body != nil {
        resetBody(req, originalBody)
      }
      time.Sleep(backoffStrategy(i) + 1*time.Microsecond)
    }()
  }
  select {
  case res := <-multiplexCh:
    return res.resp, res.err
  case <-allRequestsBackCh:
    // 到這裏,說明全部的 goroutine 都執行完畢,但是都請求失敗了
    return nil, errors.New("all req finish,but all fail")
  }
}

五、熔斷&降級

因爲在我們使用 http 調用的時候,調用的外部服務很多時候其實並不可靠,很有可能因爲外部的服務問題導致自身服務接口調用等待,從而調用時間過長,產生大量的調用積壓,慢慢耗盡服務資源,最終導致服務調用雪崩的發生,所以在服務中使用熔斷降級是非常有必要的一件事。

其實熔斷降級的概念總體上來說,實現都差不多。核心思想就是通過全局的計數器,用來統計調用次數、成功 / 失敗次數。通過統計的計數器來判斷熔斷器的開關,熔斷器的狀態由三種狀態表示:closed、open、half open,下面借用了 sentinel 的圖來表示三者的關係:

首先初始狀態是 closed,每次調用都會經過計數器統計總次數和成功 / 失敗次數,然後在達到一定閾值或條件之後熔斷器會切換到 open 狀態,發起的請求會被拒絕。

熔斷器規則中會配置一個熔斷超時重試的時間,經過熔斷超時重試時長後熔斷器會將狀態置爲 half-open 狀態。這個狀態對於 sentinel 來說會發起定時探測,對於 go-zero 來說會允許通過一定比例的請求,不管是主動定時探測,還是被動通過的請求調用,只要請求的結果返回正常,那麼就需要重置計數器恢復到 closed 狀態。

一般而言會支持兩種熔斷策略:

比如我們使用 hystrix-go 來處理我們的服務接口的熔斷,可以結合我們上面說的重試從而進一步保障我們的服務。

hystrix.ConfigureCommand("my_service", hystrix.CommandConfig{ 
        ErrorPercentThreshold:  30,
    })
    _ = hystrix.Do("my_service", func() error { 
        req, _ := http.NewRequest("POST", "http://localhost:8090/", strings.NewReader("test"))
        _, err := retryDo(req, 5, 20*time.Millisecond, ExponentialBackoff)
        if err != nil {
            fmt.Println("get error:%v",err)
            return err
        }
        return nil
    }, func(err error) error {
        fmt.Printf("handle  error:%v\n", err)
        return nil
    })

上面這個例子中就利用 hystrix-go 設置了最大錯誤百分比等於 30,超過這個閾值就會進行熔斷。

總結

這篇文章從接口調用出發,探究了重試的幾個要點,講解了重試的幾種策略;然後在實踐環節中講解了直接使用 net/http 重試會有什麼問題,對於對沖策略使用 channel 加上 waitgroup 來實現併發請求控制;最後使用 hystrix-go 來對故障服務進行熔斷,防止請求堆積引起資源耗盡的問題。

參考資料:

  1. 從 gRPC 的重試策略說起

2.Go HTTP 如何正確重試 

  1. 熔斷原理與實現

  2. 處理過載

5.Google 怎麼解決長尾延遲問題

作者簡介

羅志贇

騰訊後臺開發工程師

騰訊後臺開發工程師,深入研究過 Go runtime 相關代碼,喜歡專研技術細節,探索技術中有趣的實現分享給大家。

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/D2mLZI9NXMjgQwLOReQCgg