慢聊 Golang 的 websocket 使用和實現代碼分析

前言

【你不知道的 websocket 協議,這次給你講明白!】中介紹了 web 端即時通訊的方式,以及 websocket 如何進行連接、驗證、數據幀的格式,這些都是瞭解 websocket 的基礎知識。

本期將會繼續上次話題,上篇主要是理論還是停留在文字層面,今天帶來的是 websocket 實操,分享它使用和底層實現!

相信很多使用 Golang 的小夥伴都知道 Gorilla 這個工具包,長久以來 gorilla/websocket 都是比官方包更好的 websocket 包。

題外話 gorilla:大猩猩 (不過這個猩猩還挺可愛的)

gorilla/websocket 框架開源地址爲: https://github.com/gorilla/websocket

今天小許就用【gorilla/websocket】框架來展開本期文章內容,文章會涉及到核心代碼的走讀,會涉及到不少代碼,需要小夥伴們保持耐心往下看,然後結合之前分享的 websocket 基礎,徹底學個明白!

簡單使用

安裝 Gorilla Websocket Go 軟件包,您只需要使用即可go get

go get github.com/gorilla/websocket

在正式使用之前我們先簡單瞭解下兩個數據結構 Upgrader 和 Conn

Upgrader

Upgrader 指定用於將 HTTP 連接升級到 WebSocket 連接

type Upgrader struct {
    
    HandshakeTimeout time.Duration
    
    ReadBufferSize, WriteBufferSize int

    WriteBufferPool BufferPool

    Subprotocols []string

    Error func(w http.ResponseWriter, r *http.Request, status int, reason error)

    CheckOrigin func(r *http.Request) bool

    EnableCompression bool
}

這裏一般會設置下 CheckOrigin 來解決跨域問題

Conn

Conn 類型表示 WebSocket 連接,這個結構體的組成包括兩部分,寫入字段(Write fields)和 讀取字段(Read fields)

type Conn struct {
    conn        net.Conn
    isServer    bool
    ...

    // Write fields
    writeBuf      []byte        
    writePool     BufferPool
    writeBufSize  int
    writer        io.WriteCloser 
    isWriting     bool           
    ...
    // Read fields
    readRemaining int64
    readFinal     bool  
    readLength    int64 
    messageReader *messageReader 
    ...
}

isServer :字段來區分我們是否用 Conn 作爲客戶端還是服務端,也就是說說 gorilla/websocket 中同時編寫客戶端程序和服務器程序,但是一般是 Web 應用程序使用單獨的前端作爲客戶端程序。

部分字段說明如下圖:

服務端示例

出於說明的目的,我們將在 Go 中同時編寫客戶端程序和服務端程序(其實小許是前端小趴菜😅 🤭)。

當然我們在開發程序的時候基本都是單獨的前端,通常使用(Javascript,vue 等)實現 websocket 客戶端,這裏爲了讓大家有比較直觀的感受,用【gorilla/websocket】分別寫了服務端和客戶端示例。

var upGrader = websocket.Upgrader{
    CheckOrigin: func(r *http.Request) bool {
        return true
    },
}

func main() {
    http.HandleFunc("/ws", wsUpGrader)
    err := http.ListenAndServe("localhost:8080", nil)
    if err != nil {
        log.Println("server start err", err)
    }
}

func wsUpGrader(w http.ResponseWriter, r *http.Request) {
    //轉換爲升級爲websocket
    conn, err := upGrader.Upgrade(w, r, nil)
    if err != nil {
        log.Println(err)
        return
    }
    //釋放連接
    defer conn.Close()

    for {
        //接收消息
        messageType, message, err := conn.ReadMessage()
        if err != nil {
            log.Println(err)
            return
        }
        log.Println("server receive messageType", messageType, "message", string(message))
        //發送消息
        err = conn.WriteMessage(messageType, []byte("pong"))
        if err != nil {
            log.Println(err)
            return
        }
    }
}

我們知道 websocket 協議是基於 http 協議進行 upgrade 升級的, 這裏使用 net/http 提供原始的 http 連接。

http.HandleFunc 接受兩個參數:第一個參數是字符串表示的 url 路徑,第二個參數是該 url 實際的處理對象

http.ListenAndServe 監聽在某個端口,啓動服務,準備接受客戶端的請求

HandleFunc 的作用:通過類型轉換讓我們可以將普通的函數作爲 HTTP 處理器使用

服務端代碼流程:

客戶端示例

import (
    "fmt"
    "github.com/gorilla/websocket"
    "log"
    "time"
)

func main() {
    //服務器地址 websocket 統一使用 ws://
    url := "ws://localhost:8080/ws" 
    //使用默認撥號器,向服務器發送連接請求
    ws, _, err := websocket.DefaultDialer.Dial(url, nil)
    if err != nil {
        log.Fatal(err)
    }
    //關閉連接
    defer conn.Close()
    //發送消息
    go func() {
        for {
            err := ws.WriteMessage(websocket.BinaryMessage, []byte("ping"))
            if err != nil {
                log.Fatal(err)
            }
            //休眠兩秒
            time.Sleep(time.Second * 2)
        }
    }()

    //接收消息
    for {
        _, data, err := ws.ReadMessage()
        if err != nil {
            log.Fatal(err)
        }
        fmt.Println("client receive message: ", string(data))
    }
}

客戶端的實現看起來也是簡單,先使用默認撥號器,向服務器地址發送連接請求,撥號成功時也返回一個 * Conn,開啓一個協程每隔兩秒向服務端發送消息,同樣都是使用 ReadMessage 和 W riteMessage 讀寫消息。

示例代碼運行結果如下:

源碼走讀

看完上面基本的客戶端和服務端案例之後,我們對整個消息發送和接收的使用已經熟悉了,實際開發中要做的就是如何結合業務去定義消息類型和發送場景了,我們接着走讀下底層的實現邏輯!

代碼走讀我們分了四部分,主要了解協議是如何升級、已經消息如何讀寫、解析數據幀【 🚩 🚩核心】!

Upgrade 協議升級

Upgrade 顧名思義【升級】,在進行協議升級之前是需要對協議進行校驗的,之前我們知道待升級的 http 請求是有固定請求頭的,這裏列舉幾個:

✏️ Upgrade 進行校驗的目的是看該請求是否符合協議升級的規定

Upgrade 的部分校驗代碼如下,return 處進行了省略

func (u *Upgrader) Upgrade(w http.ResponseWriter, r *http.Request, responseHeader http.Header) (*Conn, error) {

    if !tokenListContainsValue(r.Header, "Connection""upgrade") {
           return ...
    }
    if !tokenListContainsValue(r.Header, "Upgrade""websocket") {
        return ...
    }
    //必須是get請求方法
    if r.Method != http.MethodGet {
           return ...
    }

    if !tokenListContainsValue(r.Header, "Sec-Websocket-Version""13") {
        return ...
    }

    if _, ok := responseHeader["Sec-Websocket-Extensions"]; ok {
        return ...
    }
    ...
    c := newConn(netConn, true, u.ReadBufferSize, u.WriteBufferSize, u.WriteBufferPool, br, writeBuf)
    ...
}

tokenListContainsValue 的目的是校驗請求的 Header 中是否有 upgrade 需要的特定參數,比如我們上圖列舉的一些。

newConn 就是初始化部分 Conn 結構體的,方法中的第二個參數爲 true 代表這是服務端

computeAcceptKey 計算接受密鑰:

這個函數重點說下,在上一期中在 websocket【連接確認】這一章節中知道,websocket 協議升級時,需要滿足如下條件:

✏️只有當請求頭參數 Sec-WebSocket-Key 字段的值經過固定算法加密後的數據和響應頭裏的 Sec-WebSocket-Accept 的值保持一致,該連接纔會被認可建立。

var keyGUID = []byte("258EAFA5-E914-47DA-95CA-C5AB0DC85B11")

func computeAcceptKey(challengeKey string) string {
    h := sha1.New() 
    h.Write([]byte(challengeKey))
    h.Write(keyGUID)
    return base64.StdEncoding.EncodeToString(h.Sum(nil))
}

上面 computeAcceptKey 函數的實現,驗證了之前說的關於 Sec-WebSocket-Accept 的生成

服務端需將 Sec-WebSocket-Key 和固定的 GUID 字符串( 258EAFA5-E914-47DA-95CA-C5AB0DC85B11) 拼接後使用 SHA-1 進行哈希,並採用 base64 編碼後返回

ReadMessage 讀消息

ReadMessage 方法內部使用 NextReader 獲取讀取器並從該讀取器讀取到緩衝區,如果是一條消息由多個數據幀,則會拼接成完整的消息,返回給業務層。

func (c *Conn) ReadMessage() (messageType int, p []byte, err error) {
    var r io.Reader
    messageType, r, err = c.NextReader()
    if err != nil {
        return messageType, nil, err
    }
    //ReadAll從r讀取,直到出現錯誤或EOF,並返回讀取的數據
    p, err = io.ReadAll(r)
    return messageType, p, err
}

該方法,返回三個參數,分別是消息類型、內容、error

messageType 是 int 型,值可能是 BinaryMessage(二進制消息) 或 TextMessage(文本消息)

NextReader: 該方法得到一個消息類型 messageType,io.Reader,err

func (c *Conn) NextReader() (messageType int, r io.Reader, err error) {
        ...
        for c.readErr == nil {
        //解析數據幀方法advanceFrame
        // frameType : 幀類型
        frameType, err := c.advanceFrame()
        if err != nil {
            c.readErr = hideTempErr(err)
            break
        }
        //數據類型是 文本或二進制類型
        if frameType == TextMessage || frameType == BinaryMessage {
            c.messageReader = &messageReader{c}
            c.reader = c.messageReader
            if c.readDecompress {
                c.reader = c.newDecompressionReader(c.reader)
            }
            return frameType, c.reader, nil
        }
    }
    ...
}

c.advanceFrame() 是核心代碼,主要是實現解析這條消息,這裏在最後章節會講。

這裏有個 c.messageReader (當前的低級讀取器),賦值給 c.reader,爲什麼要這樣呢?

c.messageReader 是更低級讀取器,而 c.reader 的作用是當前讀取器返回到應用程序。簡單就是 messageReader 是實現了 c.reader 接口的結構體, 從而也實現了 io.Reader 接口

圖上加一個 bufio.Read 方法:Read 讀取數據寫入 p。本方法返回寫入 p 的字節數。本方法一次調用最多會調用下層 Reader 接口一次 Read 方法,因此返回值 n 可能小於 len(p)。讀取到達結尾時,返回值 n 將爲 0 而 err 將爲 io.EOF

messageReader 的 Read 方法: 我們看下 Read 的具體實現,Read 方法主要是讀取數據幀內容,直到出現並返回 io.EOF 或者其他錯誤爲止,而實際調用它的正是 io.ReadAll。

func (r *messageReader) Read([]byte) (int, error) {
    ...
    for c.readErr == nil {
        //當前幀中剩餘的字節
        if c.readRemaining > 0 {
            if int64(len(b)) > c.readRemaining {
                b = b[:c.readRemaining]
            }
            //讀取到切片b中
            n, err := c.br.Read(b)
            c.readErr = hideTempErr(err)
            //當Conn是服務端
            if c.isServer {
                c.readMaskPos = maskBytes(c.readMaskKey, c.readMaskPos, b[:n])
            }
            //readRemaining字節數轉int64
            rem := c.readRemaining
            rem -= int64(n)
            //跟蹤連接上剩餘的字節數
            if err := c.setReadRemaining(rem); err != nil {
                return 0, err
            }
            if c.readRemaining > 0 && c.readErr == io.EOF {
                c.readErr = errUnexpectedEOF
            }
            //返回讀後字節數
            return n, c.readErr
        }
        //標記是否最後一個數據幀
        if c.readFinal {
            // messageRader 置爲nil
            c.messageReader = nil
            return 0, io.EOF
        }
        //獲取數據幀類型
        frameType, err := c.advanceFrame()
        switch {
        case err != nil:
            c.readErr = hideTempErr(err)
        case frameType == TextMessage || frameType == BinaryMessage:
            c.readErr = errors.New("websocket: internal error, unexpected text or binary in Reader")
        }
    }

    err := c.readErr
    if err == io.EOF && c.messageReader == r {
        err = errUnexpectedEOF
    }
    return 0, err
}

io.ReadAll : ReadAll 從 r 讀取,這裏是實現如果一條消息由多個數據幀,會一直讀直到最後一幀的關鍵。

func ReadAll(r Reader) ([]byte, error) {
    b := make([]byte, 0, 512)
    for {
        if len(b) == cap(b) {
            // 給[]byte添加更多容量
            b = append(b, 0)[:len(b)]
        }
        n, err := r.Read(b[len(b):cap(b)])
        b = b[:len(b)+n]
        if err != nil {
            if err == EOF {
                err = nil
            }
            return b, err
        }
    }
}

可以看出在 for 循環中一直讀取,直至讀取到最後一幀,直到返回 io.EOF 或網絡原因錯誤爲止,否則一直進行阻塞讀,這些 error 可以從上面講到的 messageReader 的 Read 方法可以看出來。

總結下,整個流程如下:

整個讀消息的流程就結束了,我們繼續看如何寫消息!

WriteMessage 寫消息

既然讀消息是對數據幀進行解析,那麼寫消息就自然會聯想到將數據按照數據幀的規範組裝寫入到一個 writebuf 中,然後寫入到網絡中。

我們繼續看 WriteMessage 是如何實現的

func (c *Conn) WriteMessage(messageType int, data []byte) error {
    ...
    //w 是一個io.WriteCloser
    w, err := c.NextWriter(messageType)
    if err != nil {
        return err
    }
    //將data寫入writeBuf中
    if _, err = w.Write(data); err != nil {
        return err
    }
    return w.Close()
}

WriteMessage 方法接收一個消息類型和數據,主要邏輯是先調用 Conn 的 NextWriter 方法得到一個 io.WriteCloser,然後寫消息到這個 Conn 的 writeBuf,寫完消息後 close 它。

NextWriter 實現如下:

func (c *Conn) NextWriter(messageType int) (io.WriteCloser, error) {
    var mw messageWriter
    if err := c.beginMessage(&mw, messageType); err != nil {
        return nil, err
    }
    c.writer = &mw
    ...
    return c.writer, nil
}

注意看這裏有個 messageWriter 賦值給了 Conn 的 writer,也就是說 messageWriter 實現了 io.WriterCloser 接口。

這裏的實現跟讀消息中的 NextReader 方法中的 messageReader 很像,也是通過實現 io.Reader 接口,然後賦值給了 Conn 的 Reader,這裏可以做個小聯動,找到讀寫消息實際的實現者 messageReader、messageWriter

messageWriter 的 Write 實現:

前置知識:如果沒有設置 Conn 中 writeBufferSize, 默認情況下會設置爲 4096 個字節,另外加上 14 字節的數據幀頭部大小【這些在 newConn 中初始化的時候有代碼說明】

func (w *messageWriter) Write([]byte) (int, error) {
    ...
    //如果字節長度大於初始化的writeBuf空間大小
    if len(p) > 2*len(w.c.writeBuf) && w.c.isServer {
        //寫入方法
        err := w.flushFrame(false, p)
        ...
    }
    //字節長度不大於初始化的writeBuf空間大小
    nn := len(p)
    for len(p) > 0 {
        //內部也是調用的flushFrame
        n, err := w.ncopy(len(p))
        ...
    }
    return nn, nil
}

messageWriter 中的 Write 方法主要的目的是將數據寫入到 writeBuf 中,它主要存儲結構化的數據幀內容,所謂結構化就是按照數據幀的格式,用 Go 實現寫入的。

總結下,整個流程如下:

而 flushFrame 方法將緩衝數據和額外數據作爲幀寫入網絡,這個 final 參數表示這是消息中的最後一幀。

至於 flushFrame 內部是如何實現寫入網絡中的,你可以看看 net.Conn 是怎麼 Write 的,因爲最終就是調這個寫入網絡的,這裏就不再深究了,有興趣的同學可以自己挖一挖!

advanceFrame 解析數據幀

解析數據幀放在最後,前面的代碼走讀主要是爲了方便大家能把整體流程搞清楚,而數據幀的解析,是更加需要對 websocket 基礎有了解,特別是數據幀的組成,因爲解析就是按照協定用 Go 代碼實現的一種方式而已!

強烈推薦大家看完# 爲什麼有了 http,還需要 websocket,懂了!]

根據上圖【來自網絡】回顧下數據幀各部分代表的意思:

FIN : 1 個 bit 位,用來標記當前數據幀是不是最後一個數據幀

RSV1, RSV2, RSV3 :這三個各佔用一個 bit 位用做擴展用途,沒有這個需求的話設置爲 0

Opcode : 該值定義的是數據幀的數據類型 1 表示文本 2 表示二進制

MASK:表示數據有沒有使用掩碼

Payload length :數據的長度,Payload data 的長度,佔 7bits,7+16bits,7+64bits

Masking-key :數據掩碼 (設置爲 0,則該部分可以省略,如果設置爲 1,則用來解碼客戶端發送給服務端的數據幀)

Payload data : 幀真正要發送的數據,可以是任意長度

advanceFrame 解析方法

實現代碼會比較長,如果直接貼代碼,會看不下去,該方法返回數據類型和 error, 這裏我們只會截取其中一部分

func (c *Conn) advanceFrame() (int, error) {
    ...
    //讀取前兩個字節
    p, err := c.read(2)
    if err != nil {
        return noFrame, err
    }
    //數據幀類型
    frameType := int(p[0] & 0xf)
    // FIN 標記位
    final := p[0]&finalBit != 0
    //三個擴展用
    rsv1 := p[0]&rsv1Bit != 0
    rsv2 := p[0]&rsv2Bit != 0
    rsv3 := p[0]&rsv3Bit != 0
    //mask :是否使用掩碼
    mask := p[1]&maskBit != 0
    ...
    switch c.readRemaining {
    case 126:
        p, err := c.read(2)
        if err != nil {
            return noFrame, err
        }

        if err := c.setReadRemaining(int64(binary.BigEndian.Uint16(p))); err != nil {
            return noFrame, err
        }
    case 127:
        p, err := c.read(8)
        if err != nil {
            return noFrame, err
        }

        if err := c.setReadRemaining(int64(binary.BigEndian.Uint64(p))); err != nil {
            return noFrame, err
        }
    }
    ..
}

整個流程分爲了 7 個部分:

  1. 1. 跳過前一幀的剩餘部分,畢竟這是之前幀的數據

  2. 2. 讀取並解析幀頭的前兩個字節(從上面圖中可以看出只讀取到 Payload len)

  3. 3. 根據讀取和解析幀長度(根據 Payload length 的值來獲取 Payload data 的長度)

  4. 4. 處理數據幀的 mask 掩碼

  5. 5. 如果是文本和二進制消息,強制執行讀取限制並返回 (結束)

  6. 6. 讀取控制幀有效載荷 即 play data,設置 setReadRemaining 以安全地更新此值並防止溢出

  7. 7. 過程控制幀有效載荷,如果是 ping/pong/close 消息類型,返回 -1 (noFrame) (結束)

advanceFrame 方法的主要目的就是解析數據幀,獲取數據幀的消息類型,而對於數據幀的解析都是按照上圖幀格式來的!

heartbeat 心跳

WebSocket 爲了確保客戶端、服務端之間的 TCP 通道連接沒有斷開,使用心跳機制來判斷連接狀態。如果超時時間內沒有收到應答則認爲連接斷開,關閉連接,釋放資源。流程如下

ping、pong 消息:它們對應的是 WebSocket 的兩個控制幀,opcode 分別是 0x9、0xA,對應的消息類型分別是 PingMessage, PongMessage,前提是應用程序需要先讀取連接中的消息才能處理從對等方發送的 close、ping 和 pong 消息。

⏰⏰ 當然關於源碼的部分我只是拿了其中一部分比如:控制類消息、併發、緩衝等,大家要知道有這些功能,有興趣的可以去看看

總結

本期主要和大家一起了解 gorilla/websocket 框架的使用和部分底層實現原理代碼走讀,通篇讀下來想必大家對 websocket 用程序語言實現有了更深刻的認識吧!

不過流行的開源 Go 語言 Web 工具包 Gorilla 宣佈已正式歸檔,目前已進入只讀模式。“它發出的信號是,這些庫在未來將不會有任何發展。

也就是說 gorilla/websocket 這個被廣泛使用的 websocket 庫也會停止更新了,真是個令人悲傷的消息!

正如作者所說的那樣:“沒有一個項目需要永遠存在。這可能不會讓每個人都開心,但生活就是這樣。”

好了,通過兩期對 websocket 的講解,相信大家心裏已經對它有了比較深刻的印象,還是那句話知道的越多,不知道的也越多,一起前行讓自己知道的更多一點!

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/jsAo0ntFutONLdnEGnFRzQ