使用 Go 每分鐘處理百萬請求

圖片拍攝於 2021 年 4 月 4 日,杭州西湖。

介紹

偶然間看到一篇寫於 15 年的文章,說實話,標題確實吸引了我。 

關於這篇文章,我就不直接翻譯了,原文地址我放在文章最後了。 

項目的需求就是很簡單,客戶端發送請求,服務端接收請求處理數據 (原文是把資源上傳至 Amazon S3 資源中)。本質上就是這樣,

我稍微改動了原文的業務代碼,但是並不影響核心模塊。在第一版中,每收到一個 Request,開啓一個 G 進行處理,很常規的操作。

初版

package main
import (
  "fmt"
  "log"
  "net/http"
  "time"
)
type Payload struct {
  // 傳啥不重要
}
func (p *Payload) UpdateToS3() error {
  //存儲邏輯,模擬操作耗時
  time.Sleep(500 * time.Millisecond)
  fmt.Println("上傳成功")
  return nil
}
func payloadHandler(w http.ResponseWriter, r *http.Request) {
  // 業務過濾
  // 請求body解析......
  var p Payload
  go p.UpdateToS3()
  w.Write([]byte("操作成功"))
}
func main() {
  http.HandleFunc("/payload", payloadHandler)
  log.Fatal(http.ListenAndServe(":8099", nil))
}

這樣操作存在什麼問題呢?一般情況下,沒什麼問題。但是如果是高併發的場景下,不對 G 進行控制,你的 CPU 使用率暴漲,內存佔用暴漲......,直至程序奔潰。

如果此操作落地至數據庫,例如 mysql。相應的,你數據庫服務器的磁盤 IO、網絡帶寬 、CPU 負載、內存消耗都會達到非常高的情況,一併奔潰。所以,一旦程序中出現不可控的事物,往往是危險的信號。

中版

package main
import (
  "fmt"
  "log"
  "net/http"
  "time"
)
const MaxQueue = 400
var Queue chan Payload
func init() {
  Queue = make(chan Payload, MaxQueue)
}
type Payload struct {
  // 傳啥不重要
}
func (p *Payload) UpdateToS3() error {
  //存儲邏輯,模擬操作耗時
  time.Sleep(500 * time.Millisecond)
  fmt.Println("上傳成功")
  return nil
}
func payloadHandler(w http.ResponseWriter, r *http.Request) {
  // 業務過濾
  // 請求body解析......
  var p Payload
  //go p.UpdateToS3()
  Queue <- p
  w.Write([]byte("操作成功"))
}
// 處理任務
func StartProcessor() {
  for {
    select {
    case payload := <-Queue:
      payload.UpdateToS3()
    }
  }
}
func main() {
  http.HandleFunc("/payload", payloadHandler)
  //單獨開一個g接收與處理任務
  go StartProcessor()
  log.Fatal(http.ListenAndServe(":8099", nil))
}

這一版藉助帶 buffered 的 channel 來完成這個功能,這樣控制住了無限制的 G,但是依然沒有解決問題。

原因是處理請求是一個同步的操作,每次只會處理一個任務,然而高併發下請求進來的速度會遠遠超過處理的速度。這種情況,一旦 channel 滿了之後, 後續的請求將會被阻塞等待。然後你會發現,響應的時間會大幅度的開始增加, 甚至不再有任何的響應。

終版

package main
import (
"fmt"
"log"
"net/http"
"time"
)
const (
  MaxWorker = 100 //隨便設置值
  MaxQueue  = 200 // 隨便設置值
)
// 一個可以發送工作請求的緩衝 channel
var JobQueue chan Job
func init() {
  JobQueue = make(chan Job, MaxQueue)
}
type Payload struct{}
type Job struct {
  PayLoad Payload
}
type Worker struct {
  WorkerPool chan chan Job
  JobChannel chan Job
  quit       chan bool
}
func NewWorker(workerPool chan chan Job) Worker {
  return Worker{
    WorkerPool: workerPool,
    JobChannel: make(chan Job),
    quit:       make(chan bool),
  }
}
// Start 方法開啓一個 worker 循環,監聽退出 channel,可按需停止這個循環
func (w Worker) Start() {
  go func() {
    for {
      // 將當前的 worker 註冊到 worker 隊列中
      w.WorkerPool <- w.JobChannel
      select {
      case job := <-w.JobChannel:
        //   真正業務的地方
        //  模擬操作耗時
        time.Sleep(500 * time.Millisecond)
        fmt.Printf("上傳成功:%v\n", job)
      case <-w.quit:
        return
      }
    }
  }()
}
func (w Worker) stop() {
  go func() {
    w.quit <- true
  }()
}
// 初始化操作
type Dispatcher struct {
  // 註冊到 dispatcher 的 worker channel 池
  WorkerPool chan chan Job
}
func NewDispatcher(maxWorkers int) *Dispatcher {
  pool := make(chan chan Job, maxWorkers)
  return &Dispatcher{WorkerPool: pool}
}
func (d *Dispatcher) Run() {
  // 開始運行 n 個 worker
  for i := 0; i < MaxWorker; i++ {
    worker := NewWorker(d.WorkerPool)
    worker.Start()
  }
  go d.dispatch()
}
func (d *Dispatcher) dispatch() {
  for {
    select {
    case job := <-JobQueue:
      go func(job Job) {
        // 嘗試獲取一個可用的 worker job channel,阻塞直到有可用的 worker
        jobChannel := <-d.WorkerPool
        // 分發任務到 worker job channel 中
        jobChannel <- job
      }(job)
    }
  }
}
// 接收請求,把任務篩入JobQueue。
func payloadHandler(w http.ResponseWriter, r *http.Request) {
  work := Job{PayLoad: Payload{}}
  JobQueue <- work
  _, _ = w.Write([]byte("操作成功"))
}
func main() {
  // 通過調度器創建worker,監聽來自 JobQueue的任務
  d := NewDispatcher(MaxWorker)
  d.Run()
  http.HandleFunc("/payload", payloadHandler)
  log.Fatal(http.ListenAndServe(":8099", nil))
}

最終採用的是兩級 channel,一級是將用戶請求數據放入到 chan Job 中,這個 channel job 相當於待處理的任務隊列。

另一級用來存放可以處理任務的 work 緩存隊列,類型爲 chan chan Job。調度器把待處理的任務放入一個空閒的緩存隊列當中,work 會一直處理它的緩存隊列。通過這種方式,實現了一個 worker 池。大致畫了一個圖幫助理解,

首先我們在接收到一個請求後,創建 Job 任務,把它放入到任務隊列中等待 work 池處理。

func payloadHandler(w http.ResponseWriter, r *http.Request) {
  job := Job{PayLoad: Payload{}}
  JobQueue <- work
  _, _ = w.Write([]byte("操作成功"))
}

調度器初始化 work 池後,在 dispatch 中,一旦我們接收到 JobQueue 的任務,就去嘗試獲取一個可用的 worker,分發任務給 worker 的 job channel 中。 注意這個過程不是同步的,而是每接收到一個 job,就開啓一個 G 去處理。這樣可以保證 JobQueue 不需要進行阻塞,對應的往 JobQueue 理論上也不需要阻塞地寫入任務。

func (d *Dispatcher) Run() {
  // 開始運行 n 個 worker
  for i := 0; i < MaxWorker; i++ {
    worker := NewWorker(d.WorkerPool)
    worker.Start()
  }
  go d.dispatch()
}
func (d *Dispatcher) dispatch() {
  for {
    select {
    case job := <-JobQueue:
      go func(job Job) {
        // 嘗試獲取一個可用的 worker job channel,阻塞直到有可用的 worker
        jobChannel := <-d.WorkerPool
        // 分發任務到 worker job channel 中
        jobChannel <- job
      }(job)
    }
  }
}

這裏 "不可控" 的 G 和上面還是又所不同的。僅僅極短時間內處於阻塞讀 Chan 狀態, 當有空閒的 worker 被喚醒,然後分發任務,整個生命週期遠遠短於上面的操作。

最後,強烈建議看一下原文,原文地址在 [1]

另外,之前一直沒有個人 blog,這兩天搭了一個,打算以文檔的形式開啓一個 Go 面試集錦,整合一些好的外部資源,敬請期待。

附錄

[1]http://marcio.io/2015/07/handling-1-million-requests-per-minute-with-golang/

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/BPCxSYEr6ww2F0b9tqPdSQ