Go 經典阻塞式 TCP 協議流解析的實踐

1. Go 經典阻塞 I/O 的 TCP 網絡編程模型

Go 語言誕生十多年來取得了飛速發展,並得到了全世界開發者的廣泛接納和應用,其應用領域廣泛,包括:Web 服務、數據庫、網絡編程、系統編程、DevOps、安全檢測與管控、數據科學以及人工智能等。下面是 2020 年 Go 官方開發者調查的部分結果:

圖:2020 年 Go 官方開發者調查之 Go 語言的應用領域 (對比 2019)

我們看到 “Web 編程” “網絡編程” 分別位列第一名和第四名,這個應用領域數據分佈與 Go 語言最初的面向大規模分佈式網絡服務的設計目標十分契合。網絡通信這塊是服務端程序必不可少也是至關重要的一部分。Go 標準庫的 net 包是在 Go 中進行網絡編程的基礎。即便您沒有直接使用到 net 包中有關 TCP Socket[1] 方面的函數 / 方法或接口,但 net/http 包想必大家總是用過的,http 包實現的是 HTTP 這個應用層協議,其在傳輸層使用的依舊是 TCP Socket。

Go 是自帶運行時的跨平臺編程語言,由於 Go 運行時調度的需要,Go 基於 I/O 多路複用機制 (linux 上使用 epoll,macOS 和 freebsd 上使用 kqueue) 設計和實現了一套適合自己的 TCP Socket 網絡編程模型。並且,Go 秉承了自己一貫的追求簡單的設計哲學 [2],Go 向語言使用者暴露了簡單的 TCP Socket API 接口,而將 Go TCP socket 網絡編程的 “複雜性” 留給了自己並隱藏在 Go 運行時的實現中。這樣,大多數情況下,Go 開發者無需關心 Socket 是否是阻塞的,也無需親自將 Socket 文件描述符的回調函數註冊到類似 epoll 這樣的系統調用中,而只需在每個連接對應的 goroutine 中以最簡單最易用的 “阻塞 I/O 模型” 的方式進行 Socket 操作即可(像下圖所示),這種設計大大降低了網絡應用開發人員的心智負擔。

這是經典的 Go tcp 網絡編程模型。由於 TCP 是全雙工模型,每一端 (peer) 都可以單獨在已經建立的連接上進行讀寫,因此在 Go 中,我們常常針對一個已建立的 TCP 連接建立兩個 goroutine,一個負責從連接上讀取數據(如需響應(ack),也可以由該 read goroutine 直接回復),一個負責將新生成的業務數據寫入連接。

read goroutine 爲例,其典型的程序結構如下:

func handleConn(c net.Conn) {
    defer c.Close()
    for {
        // read from the connection c
        ... ...
        // write ack to the connection c
        ... ...
    }
}

func main() {
    l, err := net.Listen("tcp"":8888")
    if err != nil {
        fmt.Println("listen error:", err)
        return
    }

    for {
        c, err := l.Accept()
        if err != nil {
            fmt.Println("accept error:", err)
            break
        }
        // start a new goroutine to handle
        // the new connection.
        go handleConn(c) // start a read goroutine
    }
}

從上面代碼,我們看到,針對每一個向 server 建立成功的連接,程序都會啓動一個 reader goroutine 負責從連接讀取數據,並在處理後,返回 (向連接寫入) 響應(ack)。這樣的程序結構已經直白到無法再直白了,即便你是網絡編程小白,看懂這樣的程序想必也不會費多少腦細胞。

我們知道,TCP 傳輸控制協議是一種面向連接的、可靠的、基於字節流的傳輸層通信協議,因此 TCP socket 編程多爲流數據 (streaming) 處理。這種數據的特點是按序逐個字節傳輸,在傳輸層沒有明顯的數據邊界(只有應用層能識別出協議數據的邊界,這個依賴應用層協議的定義)。TCP 發送端發送了 1000 個字節,TCP 接收端就會接收到 1000 個字節。發送端可能通過一次發送操作就發送了這 1000 個字節,但接收端可能通過 10 次讀取操作纔讀完這 1000 個字節,也就是說發送端的發送動作與接收端的接收動作並沒有嚴格的一一對應關係。這與 UDP 協議基於數據報 (diagram) 形式的數據傳輸形式有本質差別(更多關於 tcp 與 udp 差別的內容可以詳見《TCP/IP 詳解卷 1:協議》[3] 一書)。

本文我們就來了解一下基於經典 Go 阻塞式網絡 I/O 模型對基於 TCP 流的自定義協議進行解析的基本模式。

2. 自定義協議簡述

爲了便於後續內容展開,我們現在這裏說明一下我們即將解析的自定義流協議。基於 TCP 的自定義應用層流協議有兩種常見的定義模式:

採用長度字段分隔,常見的包括:mqtt(物聯網最常用的應用層協議之一)、cmpp(中國移動互聯網短信網關接口協議)[4] 等。

採用特定分隔符分割和識別,常見的包括 http 等。

這裏我們使用二進制模式來定義我們即將解析的應用層協議,下面是協議的定義:

這是一個請求應答協議,請求包和應答包的第一個字段都是包總長度,這也是在應用層用於 “分割包” 的最重要字段。第二個字段則是用於標識包類型,這裏我們定義四種類型:

onst (
    CommandConn   = iota + 0x01 // 0x01,連接請求包
    CommandSubmit               // 0x02,消息發送請求包
)

const (
    CommandConnAck   = iota + 0x80 // 0x81,連接請求的響應包
    CommandSubmitAck               //0x82,消息發送請求的響應包
)

ID 是每個連接上請求的消息流水,多用於請求發送方後續匹配響應包之用。請求包與響應包唯一的不同之處在於最後一個字段,請求包定義了有效載荷 (payload),而響應包則定義了請求包的響應狀態字段 (result)。

明確了應用層協議包的定義後,我們就來看看如何解析這樣的一個流協議吧。

3. 建立 Frame 和 Packet 抽象

在真正開始編寫代碼前,我們先來針對上述應用層協議建立兩個抽象概念:Frame 和 Packet。

首先,我們設定無論是從 client 到 server,還是 server 到 client,數據流都是由一個接一個 Frame 組成的,上述的協議就封裝在這一個個的 Frame 中。我們可以通過特定的方法將 Frame 與 Frame 分割開來:

每個 Frame 由一個 totalLength 和 frame payload 構成,如下圖左側 Frame 結構所示:

這樣,我們通過 Frame header: totalLength 即可將 Frame 之間隔離開來。我們將 Frame payload 定義爲一個 packet,每個 Packet 的結構如上圖右側所示。每個 packet 包含 commandID、ID 和 payload(packet payload) 字段。

這樣我們就將上述的協議轉換爲由 Frame 和 Packet 兩個抽象組成的 TCP 流了。

4. 阻塞式 TCP 流協議解析的基本程序結構

建立完抽象後,我們就要開始解析這個協議了!下圖是該阻塞式 TCP 流協議解析的 server 流程圖:

我們看到 tcp 流數據先後經由 frame decode 和 packet decode 後得到應用層所需的 packet 數據,應用層回覆的響應則先後經過 packet 的 encode 與 frame 的 encode 後寫入 tcp 響應流中。

下面我們就先來看看 frame 編解碼的代碼。我們首先定義 frame 編碼器的接口類型:

// github.com/bigwhite/experiments/tree/master/tcp-stream-proto/demo1/pkg/frame/frame.go

type FramePayload []byte

type StreamFrameCodec interface {
    Encode(io.Writer, FramePayload) error   // data -> frame,並寫入io.Writer
    Decode(io.Reader) (FramePayload, error) // 從io.Reader中提取frame payload,並返回給上層
}

我們將流數據的輸入定義爲 io.Reader,將流數據輸出定義爲 io.Writer。和上圖中的設計意義,Decode 方法返回 framePayload,而 Encode 會將輸入的 framePayload 編碼爲 frame 並寫入 outbound 的 tcp 流。

一旦確定好接口方法集,我們就來給出一個 StreamFrameCodec 接口的實現:

// github.com/bigwhite/experiments/tree/master/tcp-stream-proto/demo1/pkg/frame/frame.go

type myFrameCodec struct{}

func NewMyFrameCodec() StreamFrameCodec {
    return &myFrameCodec{}
}

func (p *myFrameCodec) Encode(w io.Writer, framePayload FramePayload) error {
    var f = framePayload
    var totalLen int32 = int32(len(framePayload)) + 4

    err := binary.Write(w, binary.BigEndian, &totalLen)
    if err != nil {
        return err
    }

    // make sure all data will be written to outbound stream
    for {
        n, err := w.Write([]byte(f)) // write the frame payload to outbound stream
        if err != nil {
            return err
        }
        if n >= len(f) {
            break
        }
        if n < len(f) {
            f = f[n:]
        }
    }
    return nil
}

func (p *myFrameCodec) Decode(r io.Reader) (FramePayload, error) {
    var totalLen int32
    err := binary.Read(r, binary.BigEndian, &totalLen)
    if err != nil {
        return nil, err
    }

    buf := make([]byte, totalLen-4)
    _, err = io.ReadFull(r, buf)
    if err != nil {
        return nil, err
    }
    return FramePayload(buf), nil
}

在上面在這段實現中,有三點要注意:

接下來,我們再看看 Packet 的編解碼。和 Frame 不同,Packet 有多種類型 (這裏僅定義了 Conn, submit,connack, submit ack)。因此我們首先抽象一下這些類型需要遵循的共同接口:

// github.com/bigwhite/experiments/tree/master/tcp-stream-proto/demo1/pkg/packet/packet.go

type Packet interface {
    Decode([]byte) error     // []byte -> struct
    Encode() ([]byte, error) //  struct -> []byte
}

其中 Decode 是將一段字節流數據解碼爲一個 Packet 類型,可能是 conn,可能是 submit 等 (根據解碼出來的 commandID 判斷)。而 Encode 則是將一個 Packet 類型編碼爲一段字節流數據。下面是 submit 和 submitack 類型的 Packet 接口實現:

// github.com/bigwhite/experiments/tree/master/tcp-stream-proto/demo1/pkg/packet/packet.go

type Submit struct {
    ID      string
    Payload []byte
}

func (s *Submit) Decode(pktBody []byte) error {
    s.ID = string(pktBody[:8])
    s.Payload = pktBody[8:]
    return nil
}

func (s *Submit) Encode() ([]byte, error) {
    return bytes.Join([][]byte{[]byte(s.ID[:8]), s.Payload}, nil), nil
}

type SubmitAck struct {
    ID     string
    Result uint8
}

func (s *SubmitAck) Decode(pktBody []byte) error {
    s.ID = string(pktBody[0:8])
    s.Result = uint8(pktBody[8])
    return nil
}

func (s *SubmitAck) Encode() ([]byte, error) {
    return bytes.Join([][]byte{[]byte(s.ID[:8])[]byte{s.Result}}, nil), nil
}

不過上述各種類型的編解碼被調用的前提是明確數據流是什麼類型的,因此我們需要在包級提供一個對外的函數 Decode,該函數負責從字節流中解析出對應的類型 (根據 commandID),並調用對應類型的 Decode 方法:

// github.com/bigwhite/experiments/tree/master/tcp-stream-proto/demo1/pkg/packet/packet.go
func Decode(packet []byte) (Packet, error) {
 commandID := packet[0]
 pktBody := packet[1:]

 switch commandID {
 case CommandConn:
  return nil, nil
 case CommandConnAck:
  return nil, nil
 case CommandSubmit:
  s := Submit{}
  err := s.Decode(pktBody)
  if err != nil {
   return nil, err
  }
  return &s, nil
 case CommandSubmitAck:
  s := SubmitAck{}
  err := s.Decode(pktBody)
  if err != nil {
   return nil, err
  }
  return &s, nil
 default:
  return nil, fmt.Errorf("unknown commandID [%d]", commandID)
 }
}

同樣,我們也需要包級的 Encode 函數,根據傳入的 packet 類型調用對應的 Encode 方法實現對象的編碼:

// github.com/bigwhite/experiments/tree/master/tcp-stream-proto/demo1/pkg/packet/packet.go
func Encode(p Packet) ([]byte, error) {
 var commandID uint8
 var pktBody []byte
 var err error

 switch t := p.(type) {
 case *Submit:
  commandID = CommandSubmit
  pktBody, err = p.Encode()
  if err != nil {
   return nil, err
  }
 case *SubmitAck:
  commandID = CommandSubmitAck
  pktBody, err = p.Encode()
  if err != nil {
   return nil, err
  }
 default:
  return nil, fmt.Errorf("unknown type [%s]", t)
 }
 return bytes.Join([][]byte{[]byte{commandID}, pktBody}, nil), nil
}

好了,萬事俱備只欠東風!下面我們就來編寫程序結構,將 tcp conn 與 Frame、Packet 連接起來:

// github.com/bigwhite/experiments/tree/master/tcp-stream-proto/demo1/cmd/server/main.go

package main

import (
 "fmt"
 "net"

 "github.com/bigwhite/tcp-stream-proto/demo1/pkg/frame"
 "github.com/bigwhite/tcp-stream-proto/demo1/pkg/packet"
)

func handlePacket(framePayload []byte) (ackFramePayload []byte, err error) {
 var p packet.Packet
 p, err = packet.Decode(framePayload)
 if err != nil {
  fmt.Println("handleConn: packet decode error:", err)
  return
 }

 switch p.(type) {
 case *packet.Submit:
  submit := p.(*packet.Submit)
  fmt.Printf("recv submit: id = %s, payload=%s\n", submit.ID, string(submit.Payload))
  submitAck := &packet.SubmitAck{
   ID:     submit.ID,
   Result: 0,
  }
  ackFramePayload, err = packet.Encode(submitAck)
  if err != nil {
   fmt.Println("handleConn: packet encode error:", err)
   return nil, err
  }
  return ackFramePayload, nil
 default:
  return nil, fmt.Errorf("unknown packet type")
 }
}

func handleConn(c net.Conn) {
 defer c.Close()
 frameCodec := frame.NewMyFrameCodec()

 for {
  // read from the connection

  // decode the frame to get the payload
  // the payload is undecoded packet
  framePayload, err := frameCodec.Decode(c)
  if err != nil {
   fmt.Println("handleConn: frame decode error:", err)
   return
  }

  // do something with the packet
  ackFramePayload, err := handlePacket(framePayload)
  if err != nil {
   fmt.Println("handleConn: handle packet error:", err)
   return
  }

  // write ack frame to the connection
  err = frameCodec.Encode(c, ackFramePayload)
  if err != nil {
   fmt.Println("handleConn: frame encode error:", err)
   return
  }
 }
}

func main() {
 l, err := net.Listen("tcp"":8888")
 if err != nil {
  fmt.Println("listen error:", err)
  return
 }

 for {
  c, err := l.Accept()
  if err != nil {
   fmt.Println("accept error:", err)
   break
  }
  // start a new goroutine to handle
  // the new connection.
  go handleConn(c)
 }
}

在上面這個程序中,main 函數是標準的 “one connection per goroutine” 的結構,重點邏輯都在 handleConn 中。在 handleConn 中,我們看到十分清晰的代碼結構:

read conn
 ->frame decode
  -> handle packet
   -> packet decode
   -> packet(ack) encode
 ->frame(ack) encode
write conn

到這裏,一個經典阻塞式 TCP 流解析的 demo 就完成了 (你可以將 demo 中提供的 client 和 server run 起來驗證一下)。

5. 可能的優化點

在上面的 demo1 中,我們直接將 net.Conn 實例傳給 frame.Decode 作爲 io.Reader 參數的實參,這樣我們每次調用 Read 方法都是直接從 Conn 中讀取數據。不過 Go runtime 使用 net poller 將 net.Conn.Read 轉換爲 io 多路複用的等待,避免了每次從 net.Conn 直接讀取都轉換爲一次系統調用。但即便如此,也可能會多一次 goroutine 的上下文切換 (在數據尚未 ready 的情況下)。雖然 goroutine 的上下文切換代價相較於線程切換要小許多,但畢竟這種切換並不是免費的,我們要減少這種切換。我們可以通過緩存讀的方式來減少 net.Conn.Read 真實調用的頻率。我們可以像下面這樣改造 demo1 的例子:

// github.com/bigwhite/experiments/tree/master/tcp-stream-proto/demo2/cmd/server/main.go

func handleConn(c net.Conn) {
    defer c.Close()
    frameCodec := frame.NewMyFrameCodec()
    rbuf := bufio.NewReader(c) // 爲io增加緩存

    for {
        // read from the connection

        // decode the frame to get the payload
        // the payload is undecoded packet
        framePayload, err := frameCodec.Decode(rbuf) // 使用bufio,減少直接read conn.Conn的次數
        if err != nil {
            fmt.Println("handleConn: frame decode error:", err)
            return
        }
        ... ...
    }
    ... ...
}

bufio 內部每次從 net.Conn 嘗試讀取其內部緩存 (buf) 大小的數據,而不是用戶傳入的希望讀取的數據大小。這些數據緩存在內存中,這樣後續 Read 就可以直接從內存中得到數據,而不是每次都從 net.Conn 讀取,從而降低 goroutine 上下文切換的頻率。

除此之外,我們在 frame 包中的 frame Decode 實現如下:

// github.com/bigwhite/experiments/tree/master/tcp-stream-proto/demo2/pkg/frame/frame.go

func (p *myFrameCodec) Decode(r io.Reader) (FramePayload, error) {
    var totalLen int32
    err := binary.Read(r, binary.BigEndian, &totalLen)
    if err != nil {
        return nil, err
    }

    buf := make([]byte, totalLen-4)
    _, err = io.ReadFull(r, buf)
    if err != nil {
        return nil, err
    }
    return FramePayload(buf), nil
}

我們看到每次調用這個方法都會分配一個 buf,並且 buf 是不定長的,這些在程序關鍵路徑上的堆內存對象分配會給 GC 帶來壓力,我們要儘量避免或減小其頻度,一個可行的辦法是儘量重用對象,在 Go 中一提到重用內存對象,我們就想到了 sync.Pool,但這裏還有一個問題,那就是 “不定長”,這給 sync.Pool 的使用增加了難度。

mcache[6] 是字節技術團隊開源的多級 sync.Pool 包,它可以根據你所要分配的對象大小選擇不同的 sync.Pool 池,有些類似 tcmalloc 的多級 (class) 內存對象管理,與 Go runtime 的 mcache 也是類似的,mcache 一共分爲 46 個等級,每個等級一個 sync.Pool:

// github.com/bytedance/gopkg/tree/master/lang/mcache/mcache.go
const maxSize = 46

// index contains []byte which cap is 1<<index
var caches [maxSize]sync.Pool

我們可以從 mcache 中分配內存來換掉每次都申請一個 []byte 的動作以達到內存對象重用,降低 GC 壓力的目的:

// github.com/bigwhite/experiments/tree/master/tcp-stream-proto/demo3/pkg/frame/frame.go

func (p *myFrameCodec) Decode(r io.Reader) (FramePayload, error) {
    var totalLen int32
    err := binary.Read(r, binary.BigEndian, &totalLen)
    if err != nil {
        return nil, err
    }

    buf := mcache.Malloc(int(totalLen - 4))  // 這裏我們重用mcache中的內存對象
    _, err = io.ReadFull(r, buf)
    if err != nil {
        return nil, err
    }
    return FramePayload(buf), nil
}

有了 mcache.Malloc,我們就需要在特定位置調用 mcache.Free 歸還內存對象,而 packet 中的 Decode 就是最好的位置:

// github.com/bigwhite/experiments/tree/master/tcp-stream-proto/demo3/pkg/packet/packet.go

func Decode(packet []byte) (Packet, error) {
    defer mcache.Free(packet) // 在decode結束後,釋放對象回mcache
    commandID := packet[0]
    pktBody := packet[1:]
    ... ...
}

上面是兩個在不動用 pprof 這樣的工具的前提下就能識別出的較爲明顯的可優化的點,可優化的點可能還有很多,這裏不一一列舉了。

6. 簡單的壓力測試

既然給出了優化的點,我們就來粗略壓測一下優化前和優化後的程序。我們爲兩個版本程序添加上基於標準庫 expvar 的計數器 (以優化前的 demo1 爲例):

// github.com/bigwhite/experiments/tree/master/tcp-stream-proto/demo1-with-metrics/cmd/server/main.go

func handleConn(c net.Conn) {
    defer c.Close()
    frameCodec := frame.NewMyFrameCodec()

        for {
        // read from the connection
        ... ...
        // write ack frame to the connection
        err = frameCodec.Encode(c, ackFramePayload)
        if err != nil {
            fmt.Println("handleConn: frame encode error:", err)
            return
        }   
        monitor.SubmitInTotal.Add(1) // 每處理完一條消息,計數器+1
    }   
}

在 monitor 包中,我們每秒計算一下處理性能:

// github.com/bigwhite/experiments/tree/master/tcp-stream-proto/demo1-with-metrics/pkg/monitor/monitor.go
func init() {
    // register statistics index
    SubmitInTotal = expvar.NewInt("submitInTotal")
    submitInRate = expvar.NewInt("submitInRate")

    go func() {
        var lastSubmitInTotal int64

        ticker := time.NewTicker(time.Second)
        defer ticker.Stop()
        for {
            select {
            case <-ticker.C:
                newSubmitInTotal := SubmitInTotal.Value()
                submitInRate.Set(newSubmitInTotal - lastSubmitInTotal) // 兩秒處理的消息量之差作爲處理速度
                lastSubmitInTotal = newSubmitInTotal
            }
        }
    }()
}

有了基於 expvar 的計數器,我們就可以通過帶有導出 csv 功能的 expvarmon 工具獲取程序每秒的處理性能了(壓測客戶端可以使用 demo1-with-metrics 的 client)。下面的性能對比圖是在一個 4 核 8g 的雲主機上獲得的(條件有限,壓測 client 與 server 放在一臺機器上了,必然相互干擾):

我們看到,優化後的程序從趨勢上看略微好於優化前的 (雖然不是很穩定)。

如果你覺得采集瞬時值太夠專業 ^_^,也可以在被測程序上添加基於 go-metrics 的 metric,這個作業就留給大家了:)

7. 小結

在本文中,我們簡單說明了 Go 經典阻塞 I/O 的 TCP 網絡編程模型,這種模型最大的好處就是簡單,降低開發人員在處理網絡 I/O 時的心智負擔,將更多關注集中在業務層面。文中基於這種模型,給出了一個自定義流協議的解析實現框架,並說明了一些可優化的點。在非超大連接數量的場景下,這類模型會有不錯性能和開發效率。一旦連接數量猛增,相應的處理這些連接的 goroutine 數量就會線性增加,Goroutine 調度的開銷就會顯著增加,這個時候我們就要考慮是否使用其他模型應對了,這個我們在後續篇章再說。

本文涉及的所有代碼可以從這裏下載 [7]:https://github.com/bigwhite/experiments/tree/master/tcp-stream-proto

我的聯繫方式:

商務合作方式:撰稿、出書、培訓、在線課程、合夥創業、諮詢、廣告合作。

參考資料

[1]  TCP Socket: https://tonybai.com/2015/11/17/tcp-programming-in-golang/

[2]  追求簡單的設計哲學: https://www.imooc.com/read/87/article/2321

[3]  《TCP/IP 詳解卷 1:協議》: https://book.douban.com/subject/26825411/

[4]  cmpp(中國移動互聯網短信網關接口協議): https://github.com/bigwhite/gocmpp

[5]  大端字節序 (BigEndian): https://tonybai.com/2011/01/21/encounter-byte-order-problem-again/

[6]  mcache: https://github.com/bytedance/gopkg/tree/master/lang/mcache

[7]  這裏下載: https://github.com/bigwhite/experiments/tree/master/tcp-stream-proto

[8]  改善 Go 語⾔編程質量的 50 個有效實踐: https://www.imooc.com/read/87

[9]  Kubernetes 實戰:高可用集羣搭建、配置、運維與應用: https://coding.imooc.com/class/284.html

[10]  我愛發短信: https://51smspush.com/

[11]  鏈接地址: https://m.do.co/c/bff6eed92687

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/NG3f-KkjtJBTVdRHQKLXOg