使用 Go 每分鐘處理百萬請求
圖片拍攝於 2021 年 4 月 4 日,杭州西湖。
介紹
偶然間看到一篇寫於 15 年的文章,說實話,標題確實吸引了我。
關於這篇文章,我就不直接翻譯了,原文地址我放在文章最後了。
項目的需求就是很簡單,客戶端發送請求,服務端接收請求處理數據 (原文是把資源上傳至 Amazon S3 資源中)。本質上就是這樣,
我稍微改動了原文的業務代碼,但是並不影響核心模塊。在第一版中,每收到一個 Request,開啓一個 G 進行處理,很常規的操作。
初版
package main
import (
"fmt"
"log"
"net/http"
"time"
)
type Payload struct {
// 傳啥不重要
}
func (p *Payload) UpdateToS3() error {
//存儲邏輯,模擬操作耗時
time.Sleep(500 * time.Millisecond)
fmt.Println("上傳成功")
return nil
}
func payloadHandler(w http.ResponseWriter, r *http.Request) {
// 業務過濾
// 請求body解析......
var p Payload
go p.UpdateToS3()
w.Write([]byte("操作成功"))
}
func main() {
http.HandleFunc("/payload", payloadHandler)
log.Fatal(http.ListenAndServe(":8099", nil))
}
這樣操作存在什麼問題呢?一般情況下,沒什麼問題。但是如果是高併發的場景下,不對 G 進行控制,你的 CPU 使用率暴漲,內存佔用暴漲......,直至程序奔潰。
如果此操作落地至數據庫,例如 mysql。相應的,你數據庫服務器的磁盤 IO、網絡帶寬 、CPU 負載、內存消耗都會達到非常高的情況,一併奔潰。所以,一旦程序中出現不可控的事物,往往是危險的信號。
中版
package main
import (
"fmt"
"log"
"net/http"
"time"
)
const MaxQueue = 400
var Queue chan Payload
func init() {
Queue = make(chan Payload, MaxQueue)
}
type Payload struct {
// 傳啥不重要
}
func (p *Payload) UpdateToS3() error {
//存儲邏輯,模擬操作耗時
time.Sleep(500 * time.Millisecond)
fmt.Println("上傳成功")
return nil
}
func payloadHandler(w http.ResponseWriter, r *http.Request) {
// 業務過濾
// 請求body解析......
var p Payload
//go p.UpdateToS3()
Queue <- p
w.Write([]byte("操作成功"))
}
// 處理任務
func StartProcessor() {
for {
select {
case payload := <-Queue:
payload.UpdateToS3()
}
}
}
func main() {
http.HandleFunc("/payload", payloadHandler)
//單獨開一個g接收與處理任務
go StartProcessor()
log.Fatal(http.ListenAndServe(":8099", nil))
}
這一版藉助帶 buffered 的 channel 來完成這個功能,這樣控制住了無限制的 G,但是依然沒有解決問題。
原因是處理請求是一個同步的操作,每次只會處理一個任務,然而高併發下請求進來的速度會遠遠超過處理的速度。這種情況,一旦 channel 滿了之後, 後續的請求將會被阻塞等待。然後你會發現,響應的時間會大幅度的開始增加, 甚至不再有任何的響應。
終版
package main
import (
"fmt"
"log"
"net/http"
"time"
)
const (
MaxWorker = 100 //隨便設置值
MaxQueue = 200 // 隨便設置值
)
// 一個可以發送工作請求的緩衝 channel
var JobQueue chan Job
func init() {
JobQueue = make(chan Job, MaxQueue)
}
type Payload struct{}
type Job struct {
PayLoad Payload
}
type Worker struct {
WorkerPool chan chan Job
JobChannel chan Job
quit chan bool
}
func NewWorker(workerPool chan chan Job) Worker {
return Worker{
WorkerPool: workerPool,
JobChannel: make(chan Job),
quit: make(chan bool),
}
}
// Start 方法開啓一個 worker 循環,監聽退出 channel,可按需停止這個循環
func (w Worker) Start() {
go func() {
for {
// 將當前的 worker 註冊到 worker 隊列中
w.WorkerPool <- w.JobChannel
select {
case job := <-w.JobChannel:
// 真正業務的地方
// 模擬操作耗時
time.Sleep(500 * time.Millisecond)
fmt.Printf("上傳成功:%v\n", job)
case <-w.quit:
return
}
}
}()
}
func (w Worker) stop() {
go func() {
w.quit <- true
}()
}
// 初始化操作
type Dispatcher struct {
// 註冊到 dispatcher 的 worker channel 池
WorkerPool chan chan Job
}
func NewDispatcher(maxWorkers int) *Dispatcher {
pool := make(chan chan Job, maxWorkers)
return &Dispatcher{WorkerPool: pool}
}
func (d *Dispatcher) Run() {
// 開始運行 n 個 worker
for i := 0; i < MaxWorker; i++ {
worker := NewWorker(d.WorkerPool)
worker.Start()
}
go d.dispatch()
}
func (d *Dispatcher) dispatch() {
for {
select {
case job := <-JobQueue:
go func(job Job) {
// 嘗試獲取一個可用的 worker job channel,阻塞直到有可用的 worker
jobChannel := <-d.WorkerPool
// 分發任務到 worker job channel 中
jobChannel <- job
}(job)
}
}
}
// 接收請求,把任務篩入JobQueue。
func payloadHandler(w http.ResponseWriter, r *http.Request) {
work := Job{PayLoad: Payload{}}
JobQueue <- work
_, _ = w.Write([]byte("操作成功"))
}
func main() {
// 通過調度器創建worker,監聽來自 JobQueue的任務
d := NewDispatcher(MaxWorker)
d.Run()
http.HandleFunc("/payload", payloadHandler)
log.Fatal(http.ListenAndServe(":8099", nil))
}
最終採用的是兩級 channel,一級是將用戶請求數據放入到 chan Job 中,這個 channel job 相當於待處理的任務隊列。
另一級用來存放可以處理任務的 work 緩存隊列,類型爲 chan chan Job。調度器把待處理的任務放入一個空閒的緩存隊列當中,work 會一直處理它的緩存隊列。通過這種方式,實現了一個 worker 池。大致畫了一個圖幫助理解,
首先我們在接收到一個請求後,創建 Job 任務,把它放入到任務隊列中等待 work 池處理。
func payloadHandler(w http.ResponseWriter, r *http.Request) {
job := Job{PayLoad: Payload{}}
JobQueue <- work
_, _ = w.Write([]byte("操作成功"))
}
調度器初始化 work 池後,在 dispatch 中,一旦我們接收到 JobQueue 的任務,就去嘗試獲取一個可用的 worker,分發任務給 worker 的 job channel 中。 注意這個過程不是同步的,而是每接收到一個 job,就開啓一個 G 去處理。這樣可以保證 JobQueue 不需要進行阻塞,對應的往 JobQueue 理論上也不需要阻塞地寫入任務。
func (d *Dispatcher) Run() {
// 開始運行 n 個 worker
for i := 0; i < MaxWorker; i++ {
worker := NewWorker(d.WorkerPool)
worker.Start()
}
go d.dispatch()
}
func (d *Dispatcher) dispatch() {
for {
select {
case job := <-JobQueue:
go func(job Job) {
// 嘗試獲取一個可用的 worker job channel,阻塞直到有可用的 worker
jobChannel := <-d.WorkerPool
// 分發任務到 worker job channel 中
jobChannel <- job
}(job)
}
}
}
這裏 "不可控" 的 G 和上面還是又所不同的。僅僅極短時間內處於阻塞讀 Chan 狀態, 當有空閒的 worker 被喚醒,然後分發任務,整個生命週期遠遠短於上面的操作。
最後,強烈建議看一下原文,原文地址在 [1]
另外,之前一直沒有個人 blog,這兩天搭了一個,打算以文檔的形式開啓一個 Go 面試集錦,整合一些好的外部資源,敬請期待。
附錄
[1]http://marcio.io/2015/07/handling-1-million-requests-per-minute-with-golang/
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://mp.weixin.qq.com/s/BPCxSYEr6ww2F0b9tqPdSQ