Go 基於 I-O 多路複用的 TCP 協議流解析實踐

《Go 經典阻塞式 TCP 協議流解析的實踐》一文中,我們基於 Go 經典的阻塞 I/O 模型實現了一個基於 TCP 流的自定義協議的解析。這種 one-connection-per-goroutine 模型的優點就是簡單、好寫以及好理解,降低開發者心智負擔。但一旦連接數上來,goroutine 的數量就會線性增加。當面對海量連接的場景,這種模型將力不從心:系統中將存在大量 goroutine,goroutine 調度和切換的開銷過多。

那麼面對海量連接場景,應該如何解決呢?業界成熟方案:使用 I/O 多路複用模型。瞭解 Go net 包實現的朋友想必都知曉 Go 在運行時底層使用的也是 I/O 多路複用,其實現爲 runtime 中的 netpoll[1]。goroutine 層面獲得的 net.Conn(無論是 Accept 的,還是 Dial 得到的)都展現出 “阻塞” 的特徵,但這些 net.Conn 底層實現的 fd(文件描述符)在 netpoll 中都是 non-blocking(非阻塞)的,Go 運行時負責調用 epoll 等多路複用機制監視這些 fd 是否可讀或可寫,並適時喚醒 goroutine 繼續網絡 I/O 操作,這種方式減少了系統調用,也減少了運行 Goroutine 的 M(操作系統線程)因系統調用陷入內核態等待的頻率以及因阻塞失去 M 而不得不去創建新線程的數量。

那麼在用戶層面建立自己的 I/O 多路複用的不足在哪裏呢?複雜,不好寫,不好理解。但似乎也沒有其他更好的辦法。除非換語言,否則就得硬着頭皮上 ^_^。好在,Go 社區已經有幾個不錯的 Go 用戶層面非阻塞 I/O 多路複用的開發框架庫可供選擇,比如:evio[2]、gnet[3]、easygo[4] 等。我們選擇 gnet。但注意:選擇不代表推薦,這裏僅是來做這個實踐而已,是否使用 gnet 開發上生產的程序,需要你自己評估確定。

1. 基於 gnet 開發 TCP 流協議解析程序

用框架的一個門檻就是你要去學習框架本身。好在 gnet 提供了幾個很典型的 examples[5],我們可以基於其中的 custom_codec[6] 來快速開發我們的 TCP 流協議解析程序。

下面是基於 gnet 框架實現 custom codec 的一個關鍵循環,瞭解這個循環,我們就知道在什麼位置調用 Frame 編解碼以及 packet 編解碼了,這樣決定了後續 demo 程序的結構:

上面圖中右邊虛框中的 frame 編解碼、packet 編解碼以及 React 是用戶需要自己實現的,gnet 框架的 eventloop.loopRead 方法會循環調用 frame 編解碼和 React 以實現 TCP 流的處理以及響應的返回。有了這樣一張 “地圖”,我們就可以明確 demo 程序中各個包的大致位置了。

我們的 demo 改自 gnet 的例子 custom_codec[7],其 main 包結構來自於 custom_codec:

// github.com/bigwhite/experiments/tree/master/tcp-stream-proto/demo4/cmd/server/main.go

type customCodecServer struct {
 *gnet.EventServer
 addr       string
 multicore  bool
 async      bool
 codec      gnet.ICodec
 workerPool *goroutine.Pool
}

func (cs *customCodecServer) OnInitComplete(srv gnet.Server) (action gnet.Action) {
 log.Printf("custom codec server is listening on %s (multi-cores: %t, loops: %d)\n",
  srv.Addr.String(), srv.Multicore, srv.NumEventLoop)
 return
}

func customCodecServe(addr string, multicore, async bool, codec gnet.ICodec) {
 var err error
 codec = frame.Frame{}
 cs := &customCodecServer{addr: addr, multicore: multicore, async: async, codec: codec, workerPool: goroutine.Default()}
 err = gnet.Serve(cs, addr, gnet.WithMulticore(multicore), gnet.WithTCPKeepAlive(time.Minute*5), gnet.WithCodec(codec))
 if err != nil {
  panic(err)
 }
}

func main() {
 var port int
 var multicore bool

 // Example command: go run server.go --port 8888 --multicore=true
 flag.IntVar(&port, "port", 8888, "server port")
 flag.BoolVar(&multicore, "multicore", true, "multicore")
 flag.Parse()
 addr := fmt.Sprintf("tcp://:%d", port)
 customCodecServe(addr, multicore, false, nil)
}

針對上面代碼,有兩點要注意:

按上面流程圖的順序,gnet 從 conn 讀取的字節流將傳遞給我們的 frame 解碼器,下面我們看看基於 gnet 的 Frame 解碼器的實現 (我們的自定義協議定義可以參考《Go 經典阻塞式 TCP 協議流解析的實踐》一文):

// github.com/bigwhite/experiments/tree/master/tcp-stream-proto/demo4/pkg/frame/frame.go

type Frame []byte

func (cc Frame) Decode(c gnet.Conn) ([]byte, error) {
 // read length
 var frameLength uint32
 if n, header := c.ReadN(4); n == 4 {
  byteBuffer := bytes.NewBuffer(header)
  _ = binary.Read(byteBuffer, binary.BigEndian, &frameLength)

  if frameLength > 100 {
   c.ResetBuffer()
   return nil, errors.New("length value is wrong")
  }

  if n, wholeFrame := c.ReadN(int(frameLength)); n == int(frameLength) {
   c.ShiftN(int(frameLength)) // shift frame length
   return wholeFrame[4:], nil // return frame payload
  } else {
   return nil, errors.New("not enough frame payload data")
  }
 }
 return nil, errors.New("not enough frame length data")
}

上面 Frame 的 Decode 實現既負責 frame 解碼,同時也會對 frame 的當前數據完整性進行校驗,如果一個完整的 frame 尚未就緒,Decode 會返回錯誤,之後 gnet 還會在連接 (conn) 可讀時再次調用該 Decode 函數。這裏實現的關鍵就是 gnet.Conn.ReadN 這個方法,這個方法本質上是一個 Peek 操作 (gnet 稱之爲 lazyRead),即只預覽數據, 不挪動數據流中的“讀指針” 的位置。frame 未完全就緒時,gnet 在底層會使用 RingBuffer 存放已經到位的 frame 的部分數據。如果 frame 所有數據都就緒了,那麼 Decode 會調用 gnet.Conn.ShiftN 方法來挪動底層 RingBuffer 的 “讀指針” 的位置,表明這段數據已經被上層讀取了。

如果預讀取到的 frame 長度過長(這裏代碼中的 100 是一個魔數,僅做 demo 演示之用,你可以根據實際情況使用 frame 可能的最大值),則會清空當前緩存並返回錯誤。(但 gnet 並沒有因此而斷開與客戶端的連接,這塊兒 gnet 的機制是否合理還有待商榷。)

如果解碼順利,根據我們自定義的協議 spec,我們會將 frame 的 payload 返回,即從 frame 的第五個字節開始返回。

從上圖看到,frame Decode 返回的 payload 將作爲輸入數據傳給 eventHandler.React 方法,這個方法也是我們自己實現的:

// github.com/bigwhite/experiments/tree/master/tcp-stream-proto/demo4/cmd/server/main.go

func (cs *customCodecServer) React(framePayload []byte, c gnet.Conn) (out []byte, action gnet.Action) {
 var p packet.Packet
 var ackFramePayload []byte
 p, err := packet.Decode(framePayload)
 if err != nil {
  fmt.Println("react: packet decode error:", err)
  action = gnet.Close // close the connection
  return
 }

 switch p.(type) {
 case *packet.Submit:
  submit := p.(*packet.Submit)
  fmt.Printf("recv submit: id = %s, payload=%s\n", submit.ID, string(submit.Payload))
  submitAck := &packet.SubmitAck{
   ID:     submit.ID,
   Result: 0,
  }
  ackFramePayload, err = packet.Encode(submitAck)
  if err != nil {
   fmt.Println("handleConn: packet encode error:", err)
   action = gnet.Close // close the connection
   return
  }
  out = []byte(ackFramePayload)
  return
 default:
  return nil, gnet.Close // close the connection
 }
}

在 React 中,我們利用 packet 包對傳入的 frame payload 進行 Decode 並處理得到的 Packet,處理後將 packet 響應進行編碼 (encode),編碼後得到的字節序列(ackFramePayload) 將作爲 React 的第一個返回值 out 返回。

frame 會對 React 返回的 ackFramePayload 進行 Encode,編碼後的字節序列將被 gnet 寫入 outbound 的 tcp 流中去:

// github.com/bigwhite/experiments/tree/master/tcp-stream-proto/demo4/pkg/frame/frame.go

func (cc Frame) Encode(c gnet.Conn, framePayload []byte) ([]byte, error) {
    result := make([]byte, 0)

    buffer := bytes.NewBuffer(result)

    // encode frame length(4+ framePayload length)
    length := uint32(4 + len([]byte(framePayload)))
    if err := binary.Write(buffer, binary.BigEndian, length); err != nil {
        s := fmt.Sprintf("Pack length error , %v", err)
        return nil, errors.New(s)
    }

    // encode frame payload
    n, err := buffer.Write(framePayload)
    if err != nil {
        s := fmt.Sprintf("Pack frame payload error , %v", err)
        return nil, errors.New(s)
    }

    if n != len(framePayload) {
        s := fmt.Sprintf("Pack frame payload length error , %v", err)
        return nil, errors.New(s)
    }

    return buffer.Bytes(), nil
}

這樣一個 loopRead 循環就完成了。我們可以使用《Go 經典阻塞式 TCP 協議流解析的實踐》一文中的 client 對該程序進行測試:

// demo2的client
$./client
2021/07/25 16:35:34 dial ok
send submit id = 00000001, payload=full-bluestreak-207e
the result of submit ack[00000001] is 0
send submit id = 00000002, payload=cosmic-spider-ham-2985
the result of submit ack[00000002] is 0
send submit id = 00000003, payload=true-forge-3552
the result of submit ack[00000003] is 0

// demo4的server
$./server
2021/07/25 16:35:31 custom codec server is listening on :8888 (multi-cores: true, loops: 8)
recv submit: id = 00000001, payload=full-bluestreak-207e
recv submit: id = 00000002, payload=cosmic-spider-ham-2985
recv submit: id = 00000003, payload=true-forge-3552

2. 壓測對比

gnet 針對內存分配、緩存重用等做了很多優化,我們來將其與阻塞 I/O 模型程序在性能上做一下簡單比較 (由於資源有限,我們這裏的壓測也和上一文中一樣,採用 100 個 client 連接盡力(best effort) 發送,而不是海量連接)。

下面是 demo1(阻塞 I/O 模型未優化)、demo3(阻塞 I/O 模型優化後) 以及 demo4(io 多路複用模型)的性能對比:

粗略來看,採用 gnet I/O 多路複用模型的程序 (demo4) 在性能上平均比阻塞 I/O 模型優化後的程序 (demo3) 高出 15%~20%。

不僅如此,通過 dstat 採集的系統監控數據也表明跑 demo4 時,cpu 系統時間 (sys) 佔用也比 demo3 少了 5 個點左右:

跑 demo3 時的 dstat -tcdngym 輸出:

----system---- ----total-cpu-usage---- -dsk/total- -net/total- ---paging-- ---system-- ------memory-usage-----
     time     |usr sys idl wai hiq siq| read  writ| recv  send|  in   out | int   csw | used  buff  cach  free
23-07 17:03:17|  2   1  97   0   0   0|3458B   19k|   0     0 |   0     0 | 535  2475 |1921M  225M 5354M 8386M
23-07 17:03:18| 40  45   5   0   0  11|   0     0 |  66B   54B|   0     0 |  11k   15k|1922M  225M 5354M 8384M
23-07 17:03:19| 39  46   6   0   0   9|   0     0 |  66B 1158B|   0     0 |  12k   18k|1922M  225M 5354M 8384M
23-07 17:03:20| 35  48   7   0   0  11|   0     0 |  66B  462B|   0     0 |  12k   22k|1922M  225M 5354M 8385M
23-07 17:03:21| 39  44   7   0   0  10|   0    12k|  66B  462B|   0     0 |  11k   16k|1922M  225M 5354M 8385M
23-07 17:03:22| 38  45   6   0   0  10|   0     0 |  66B  102B|   0     0 |  11k   16k|1923M  225M 5354M 8384M
23-07 17:03:23| 38  45   7   0   0  10|   0     0 |  66B  470B|   0     0 |  12k   20k|1923M  225M 5354M 8384M
23-07 17:03:24| 39  46   6   0   0   9|   0     0 |  66B  462B|   0     0 |  11k   19k|1923M  225M 5354M 8384M

跑 demo4 時的 dstat -tcdngym 輸出:

----system---- ----total-cpu-usage---- -dsk/total- -net/total- ---paging-- ---system-- ------memory-usage-----
     time     |usr sys idl wai hiq siq| read  writ| recv  send|  in   out | int   csw | used  buff  cach  free
24-07 20:28:38| 43  42   7   0   0   8|   0    20k|1050B   14k|   0     0 |  11k   18k|1954M  234M 5959M 7738M
24-07 20:28:39| 44  41   9   0   0   7|   0    16k| 396B 7626B|   0     0 |  11k   17k|1954M  234M 5959M 7739M
24-07 20:28:40| 43  42   6   0   0   8|   0     0 | 132B 7044B|   0     0 |  11k   16k|1954M  234M 5959M 7738M
24-07 20:28:41| 42  42   8   0   0   8|   0     0 | 630B   12k|   0     0 |  12k   20k|1955M  234M 5959M 7738M
24-07 20:28:42| 45  41   7   0   0   7|   0     0 | 726B 9980B|   0     0 |  11k   16k|1955M  234M 5959M 7738M

2. 異步迴應答

在上面的例子中,我們採用的是 gnet 同步迴應答的方式,gnet 還支持異步迴應答的方式,即將 React 中得到的 ackFramePayload 提交給 gnet 創建的一個 goroutine Worker 池,由 worker 池中的某個空閒 goroutine 在後續將 ackFramePayload 編碼爲一個完整的 ackFrame 後返回給 client 端。

要支持異步迴應答,我們需要對 demo4 做幾處修改(見 demo5),主要修改點都在 cmd/server/main.go 中。

第一處:main 函數調用 customCodecServe 時,將第三個參數 async 設置爲 true:

// github.com/bigwhite/experiments/tree/master/tcp-stream-proto/demo5/cmd/server/main.go

func main() {
 ... ...
 customCodecServe(addr, multicore, true, nil)
}

第二處:在 customCodecServer 的 React 方法中,我們得到編碼後的 ackFramePayload 後,不要立即將其賦值給 out 並返回,而是判斷是否要異步返回應答。如果異步返回應答,則將 ackFramePayload 提交給 workerpool,workerPool 後續會分配 goroutine,並通過 gnet.Conn 的 AsyncWrite 將應答寫回 client。如果非異步,在將 ackFramePayload 賦值給 out 並返回。

// github.com/bigwhite/experiments/tree/master/tcp-stream-proto/demo5/cmd/server/main.go

func (cs *customCodecServer) React(framePayload []byte, c gnet.Conn) (out []byte, action gnet.Action) {
 ... ...
 switch p.(type) {
 case *packet.Submit:
  submit := p.(*packet.Submit)
  fmt.Printf("recv submit: id = %s, payload=%s\n", submit.ID, string(submit.Payload))
  submitAck := &packet.SubmitAck{
   ID:     submit.ID,
   Result: 0,
  }
  ackFramePayload, err = packet.Encode(submitAck)
  if err != nil {
   fmt.Println("handleConn: packet encode error:", err)
   action = gnet.Close // close the connection
   return
  }
 default:
  return nil, gnet.Close // close the connection
 }

 if cs.async {
  data := append([]byte{}, ackFramePayload...)
  _ = cs.workerPool.Submit(func() {
   fmt.Println("handleConn: async write ackFramePayload")
   c.AsyncWrite(data)
  })
  return
 }
 out = ackFramePayload
 return
}

除此之外,其他包的代碼不變。我們依然還做個壓測,看看異步迴應答的 demo5 性能究竟如何!

從上圖來看,在這個場景下通過異步迴應答的方式,性能反而下降很多,甚至還不如阻塞式 I/O 模型的程序。對此沒有做深究,但猜測可能是應答過多且同時集中回覆時 workerpool 創建了很多 goroutine,不僅沒有起到池化的作用,還帶來的 goroutine 創建和調度的開銷。

3. 小結

在本文中,我們將阻塞式 I/O 模型換成了 I/O 多路複用模型,並基於 gnet 框架重新實現了自定義 TCP 流協議的解析程序。在同步迴應答的策略下,基於 gnet 開發 TCP 流協議解析程序相比於阻塞 I/O 模型程序的性能有一定提升。

本文涉及的所有代碼可以從這裏下載 [8]:https://github.com/bigwhite/experiments/tree/master/tcp-stream-proto


我的聯繫方式:

商務合作方式:撰稿、出書、培訓、在線課程、合夥創業、諮詢、廣告合作。

參考資料

[1]  netpoll: https://github.com/golang/go/tree/master/src/runtime/netpoll.go

[2]  evio: https://github.com/tidwall/evio

[3]  gnet: https://github.com/panjf2000/gnet

[4]  easygo: https://github.com/mailru/easygo

[5]  gnet 提供了幾個很典型的 examples: https://github.com/gnet-io/gnet-examples

[6]  custom_codec: https://github.com/gnet-io/gnet-examples/tree/master/examples/custom_codec

[7]  custom_codec: https://github.com/gnet-io/gnet-examples/tree/master/examples/custom_codec

[8]  這裏下載: https://github.com/bigwhite/experiments/tree/master/tcp-stream-proto

[9]  改善 Go 語⾔編程質量的 50 個有效實踐: https://www.imooc.com/read/87

[10]  Kubernetes 實戰:高可用集羣搭建、配置、運維與應用: https://coding.imooc.com/class/284.html

[11]  我愛發短信: https://51smspush.com/

[12]  鏈接地址: https://m.do.co/c/bff6eed92687

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/FH1fiK3-3fnkUxM6MA5Y8w