fasthttp 是如何做到比 net-http 快十倍的
小許之前分享過標準庫 net/http 的實現原理,不過有個 fasthttp 的庫號稱比 net/http 快十倍呢!(畢竟名字就帶 fast 呢😆)
哇,性能太強了吧,話不多說,本期小許和大家一起看看 fasthttp Server 端的底層實現,來看看到底是如何做到性能如此之快的,有哪些優秀的特性值得我們學習和借鑑的!
Server 端處理流程對比
在進行了解 fasthttp 底層代碼實現之前,我們先對兩者處理請求的方式進行一個回顧和對比,瞭解完兩者的基本的情況之後,再對 fasthttp 的實現最進一步分析。
net/http 處理流程
在小許文章《圖文講透 Golang 標準庫 net/http 實現原理 -- 服務端》中講的比較詳細了,這裏再把大致流程整理以下,整體流程如下:
-
1. 將路由和對應的 handler 註冊到一個 map 中,用做後續鍵值路由匹配
-
2. 註冊完之後就是開啓循環監聽連接,每獲取到一個連接就會創建一個 Goroutine 進行處理
-
3. 在創建好的 Goroutine 裏面會循環的等待接收請求數據,然後根據請求的地址去鍵值路由 map 中匹配對應的 handler
-
4. 執行匹配到的處理器 handler
net/http 的實現是一個連接新建一個 goroutine,如果在連接數非常多的時候,,每個連接都會創建一個 Goroutine 就會給系統帶來一定的壓力。這也就造成了 net/http 在處理高併發時的瓶頸。
每次來了一個連接,都要實例化一個連接對象,這誰受得了,哈哈
fasthttp 處理流程
再看看 fasthttp 處理請求的流程:
-
1. 啓動監聽
-
2. 循環監聽端口獲取連接,建立 workerPool
-
3. 循環嘗試獲取連接 net.Conn,先會去 ready 隊列裏獲取 workerChan,獲取不到就會去對象池獲取
-
4. 將獲取到的的連接 net.Conn 發送到 workerChan 的 channel 中
-
5. 開啓一個 Goroutine 一直循環獲取 workerChan 這個 channel 中的數據
-
6. 獲取到 channel 中的 net.Conn 之後就會對請求進行處理
workerChan 其實就是一個連接處理對象,這個對象裏面有一個 channel 用來傳遞連接;每個 workerChan 在後臺都會有一個 Goroutine 循環獲取 channel 中的連接,然後進行處理。
workerChan 是在 workerPool 臨時對象分別存取
fasthttp 爲什麼快
fasthttp 的優化主要有以下幾個點:
-
• 連接複用,如 slice 中有可複用的 workerChan 就從 ready 這個 slice 中獲取,沒有可複用的就在 workerChanPool 創建一個,萬一池子滿了(默認是 256 * 1024 個)就報錯。
-
• 對於內存複用,就是大量使用了 sync.Pool(你知道的,sync.Pool 複用對象有啥好處),有人統計過,用了整整 30 個 sync.Pool,context、request 對象、header、response 對象都用了 sync.Pool ....
-
• 利用 unsafe.Pointer 指針進行 []byte 和 string 轉換,避免 []byte 到 string 轉換時帶來的內存分配和拷貝帶來的消耗 。
知道了 fasthttp 爲什麼快,接下來我們看下它是如何處理監聽處理請求的,在哪些地方用到了這些特性。
底層實現
簡單案例
import (
"github.com/buaazp/fasthttprouter"
"github.com/valyala/fasthttp"
"log"
)
func main() {
//創建路由
r := fasthttprouter.New()
r.GET("/", Index)
if err := fasthttp.ListenAndServe(":8083", r.Handler); err != nil {
log.Fatalf("ListenAndServe fatal: %s", err)
}
}
func Index(ctx *fasthttp.RequestCtx) {
ctx.WriteString("hello xiaou code!")
}
這個案例同樣是幾樣代碼就啓動了一個服務。
創建路由、爲不同的路由執行關聯不同的處理函數 handler,接着跟 net/http 一樣調用 ListenAndServe 函數進行啓動服務監聽,等待請求進行處理。
workerPool 結構
workerpool 對象表示 連接處理 工作池,這樣可以控制連接建立後的處理方式,而不是像標準庫 net/http 一樣,對每個請求連接都啓動一個 goroutine 處理, 內部的 ready 字段存儲空閒的 workerChan 對象,workerChanPool 字段表示管理 workerChan 的對象池。
workerPool 結構體如下:
type workerPool struct {
//匹配請求對應的handler
WorkerFunc ServeHandler
//最大同時處理的請求數
MaxWorkersCount int
LogAllErrors bool
//最大空閒工作時間
MaxIdleWorkerDuration time.Duration
Logger Logger
//互斥鎖
lock sync.Mutex
//work數量
workersCount int
mustStop bool
// 空閒的 workerChan
ready []*workerChan
//是否關閉workerPool
stopCh chan struct{}
//sync.Pool workerChan 的對象池
workerChanPool sync.Pool
connState func(net.Conn, ConnState)
}
WorkerFunc :這個屬性挺重要的,因爲給它賦值的是 Server.serveConn
ready:存儲了空閒的 workerChan
workerChanPool:是 workerChan 的對象池,在 sync.Pool 中存取臨時對象,可減少內存分配
啓動服務
ListenAndServe 是啓動服務監聽的入口,內部的調用過程如下:
Server.Serve
Serve 方法爲來自給監聽到的連接提供處理服務,直到超過了最大限制(256 * 1024)纔會報錯。
func (s *Server) Serve(ln net.Listener) error {
//最大連接處理數
maxWorkersCount := s.getConcurrency()
s.mu.Lock()
s.ln = append(s.ln, ln)
if s.done == nil {
s.done = make(chan struct{})
}
if s.concurrencyCh == nil {
s.concurrencyCh = make(chan struct{}, maxWorkersCount)
}
s.mu.Unlock()
//workerPool進行初始化
wp := &workerPool{
WorkerFunc: s.serveConn,
MaxWorkersCount: maxWorkersCount,
LogAllErrors: s.LogAllErrors,
MaxIdleWorkerDuration: s.MaxIdleWorkerDuration,
Logger: s.logger(),
connState: s.setState,
}
//開啓協程,處理協程池的清理工作
wp.Start()
atomic.AddInt32(&s.open, 1)
defer atomic.AddInt32(&s.open, -1)
for {
// 阻塞等待,獲取連接net.Conn
if c, err = acceptConn(s, ln, &lastPerIPErrorTime); err != nil {
...
return err
}
s.setState(c, StateNew)
atomic.AddInt32(&s.open, 1)
//處理獲取到的連接net.Conn
if !wp.Serve(c) {
//未能處理,說明已達到最大worker限制
...
}
c = nil
}
}
從上面的註釋中我們可以看出 Server 方法主要做了以下幾件事:
-
1. 初始化 worker Pool,並啓動
-
2. net.Listener 循環接收請求
-
3. 將接收到的請求交給 workerChan 處理
注意:這裏如果超過了設定的最大連接數(默認是 256 * 1024 個)就直接報錯了
Start 開啓協程池
workerPool 進行初始化之後接着就調用 Start 開啓,這裏主要是指定 sync.Pool 變量 workerChanPool 的創建函數。
接着開啓一個協程,該 Goroutine 的目的是進行定時清理 workerPool 中的 ready 中保存的空閒 workerChan,清理頻率爲每 10s 啓動一次。
🚩清理規則是使用二進制搜索算法找出最近可以清理的工作者的索引
func (wp *workerPool) Start() {
//wp的關閉channel是否爲空
if wp.stopCh != nil {
return
}
wp.stopCh = make(chan struct{})
stopCh := wp.stopCh
//指定workerChanPool的創建函數
wp.workerChanPool.New = func() interface{} {
return &workerChan{
ch: make(chan net.Conn, workerChanCap),
}
}
//開啓協程
go func() {
var scratch []*workerChan
for {
//清理空閒超時的 workerChan
wp.clean(&scratch)
select {
case <-stopCh:
return
default:
// 間隔10 s
time.Sleep(wp.getMaxIdleWorkerDuration())
}
}
}()
}
開啓一個清理 Goroutine 的目的是爲了避免在流量高峯創建了大量協程,之後不再使用,造成協程浪費。
清理流程是在 wp.clean() 方法中實現的。
接收連接
acceptConn 函數通過調用 net.Listener 的 accept 方法去接受連接,這裏獲取連接的方式跟 net/http 調用的其實都是一樣的。
func acceptConn(s *Server, ln net.Listener, lastPerIPErrorTime *time.Time) (net.Conn, error) {
for {
c, err := ln.Accept()
if err != nil {
//err判斷
...
}
//校驗是否net.TCPConn連接
// 校驗每個ip對應的連接數
if s.MaxConnsPerIP > 0 {
pic := wrapPerIPConn(s, c)
if pic == nil {
...
continue
}
c = pic
}
return c, nil
}
}
獲取 workerChan
func (wp *workerPool) Serve(c net.Conn) bool {
//獲取 workerChan
ch := wp.getCh()
if ch == nil {
return false
}
//將連接放到channel中
ch.ch <- c
//返回true
return true
}
這裏調用的 getCh() 函數實現了獲取 workerChan,獲取到之後將之前接受的連接 net.Conn 放到 workerChan 結構體的 channel 通道中。
我們看下 workerChan 這個結構體
type workerChan struct {
lastUseTime time.Time
ch chan net.Conn
}
lastUseTime:最後一次被使用的時間,這個值在進行清理 workerChan 的時候是會用到的
ch:用來傳遞獲取到的連接 net.Conn,獲取到連接時接收,處理請求時獲取
getCh 方法:
func (wp *workerPool) getCh() *workerChan {
var ch *workerChan
createWorker := false
wp.lock.Lock()
//從ready隊列中拿workerChan
ready := wp.ready
n := len(ready) - 1
if n < 0 {
if wp.workersCount < wp.MaxWorkersCount {
createWorker = true
wp.workersCount++
}
} else {
//ready隊列不爲空,從隊尾拿workerChan
ch = ready[n]
//隊尾置爲nil
ready[n] = nil
//重新將ready賦值給wp.ready
wp.ready = ready[:n]
}
wp.lock.Unlock()
//ready中獲取不到workerChan,則從對象池中新建一個
if ch == nil {
if !createWorker {
return nil
}
vch := wp.workerChanPool.Get()
ch = vch.(*workerChan)
//開啓一個goroutine執行
go func() {
//處理ch中channel中的數據
wp.workerFunc(ch)
//處理完後將workerChan放回對象池
wp.workerChanPool.Put(vch)
}()
}
return ch
}
getCh() 方法的目的就是獲取 workerChan,流程如下:
-
• 先會去 ready 空閒隊列中獲取 workerChan
-
• ready 獲取不到則從對象池中創建一個新的 workerChan
-
• 並啓動 Goroutine 用來處理 channel 中的數據
workPool 中的 ready 是一個 FILO 的棧, 每次從隊尾取出 workChan
處理連接
func (wp *workerPool) workerFunc(ch *workerChan) {
var c net.Conn
var err error
for c = range ch.ch {
//channel的值是nil,退出
if c == nil {
break
}
//執行請求,並處理
if err = wp.WorkerFunc(c); err != nil && err != errHijacked {
...
}
...
//將當前workerChan放入ready隊列
if !wp.release(ch) {
break
}
}
wp.lock.Lock()
wp.workersCount--
wp.lock.Unlock()
}
執行流程
-
• 先遍歷 workerChan 的 channel,看是否有連接 net.Conn
-
• 獲取到連接之後就執行 WorkerFunc 函數處理請求
-
• 請求處理完之後將當前 workerChan 放入 ready 隊列
🚩 WorkerFunc 函數實際上是 Server 的 serveConn 方法
一開始開代碼的時候我還沒發現呢,細看了之後在 Server.Serve() 啓動服務時將 Server.serveConn() 方法賦值給了 workerPool 的 WorkerFunc()。
要想了解實現的朋友可以搜下這方面的代碼
func (s *Server) ServeConn(c net.Conn) error {
...
err := s.serveConn(c)
...
}
裏面的代碼會比較多,不過裏面的流程就是是獲取到請求的參數,找到對應的 handler 進行請求處理,然後返回 響應給客戶端。
這裏的實現代碼可以看到 context、request 對象的 sync.Pool 實現,這裏就不一一貼出來了。
總結
fasthttp 和 net/http 在實現上還是有較大區別,通過對實現原理的分析,知道了 fasthttp 速度快是利用了大量 sync.Pool 對象複用 、[]byte 和 string 利用萬能指針 unsafe.Pointer 進行轉換等優化技巧。
如果你的業務需要支撐較高的 QPS 並且保持一致的低延遲時間,那麼採用 fasthttp 是一個較好的選擇。不過 net/http 兼容性更高,在多數情況下反而是更好的選擇!
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://mp.weixin.qq.com/s/zxLO4IhLqQmIaUDzwwjU1w