Golang Websocket 實踐

這裏先簡單介紹一下 websocket,確實只是簡單介紹一下。

  1. 應用場景

有些場景下,比如交易 K 線,我們需要前端對後端進行輪詢來不斷獲取或者更新資源狀態。輪詢的問題毫無以爲是一種笨重的方式,因爲每一次 http 請求除了本身的資源信息傳輸外還有三次握手以及四次揮手。替代輪詢的一種方案是複用一個 http 連接,更準確的複用同一個 tcp 連接。這種方式可以是 http 長連接,也可以是 websocket。

  1. websocket 和 http 長連接的區別

首先 websocket 和 http 是完全不同的兩種協議,雖然底層都是 tcp/ip。http 長連接也是屬於 http 協議。http 協議和 websocket 的最大區別就是 http 是基於 request/response 模式,而 websocket 的 client 和 server 端卻可以隨意發起 data push,比如服務端向 app 端的消息下發就比較適合使用 websocket(這種場景下使用 http 長連接也是可以,client 端定時向 server 端發送消息,比如 heatbeat,然後 server 端要 push 的消息以 response 的形式返回給 client)。

這裏 https://gist.github.com/legendtkl/1922db71553c849ef0029429f737aadb 我寫一個 github gist 代碼片段,給大家體驗一下。

  1. Golang 最佳實踐

這裏先定義一下我們的使用場景:交易所有很多數據,比如 K 線,比如盤口數據都是在定時刷新的,這裏就可以用 websocket 來做。簡單來說,前端向後端請求特定的數據,比如 K 線數據,前端和後端建立 websocket 連接,後端持續不斷返回信息給前端。

在我們編寫 websocket 接口之前,需要略微考慮一下如何抽象,如何設計我們 websocket 框架從而保證代碼的良好的擴展性。

3.1 Hub

首先 hub 是什麼東西,下圖是 google image 查出來的結果。簡單做個類比,圖片中的 USB 3.0 口(藍色)就相當於一個個 tcp 連接,上面彙總的接口就是我們 hub 的上流數據源。

在我第一時間想去定義 hub 的粒度想到的是使用 controller,也就是請求的 router。但是後來想了一下這樣設計太複雜了,因爲一個 router 的參數有很多種,不同參數可能就對應不同數據。

那麼應該怎麼去定義呢?不是從功能性上去定義,而是從數據源上定義。我們只要簡單看一下需要提供多少類不停更新的數據,這裏的每一類就對應一個 hub。

3.2 Broadcast

通過 3.1 我們定義了 hub,下面要考慮的就是如何去做廣播。

最簡單的方式遍歷一個 hub 上面所有的 conn 然後進行 conn.Write()。這種方法非常的簡單粗暴,問題也很明顯:每個 conn.Write() 都是一個網絡 IO,我們這是在串行地處理多個網絡 IO,低效。

串行改並行。我們還是遍歷 hub 上面所有的 conn,然後每一個 conn.Write() 起一個 goroutine 去做,這樣其實就是 IO 多路複用。

思考一下上面這種方式還有沒有問題。其實是有的:擴展性的問題。如果 websocket 的接口參數比較多,我們要根據參數對不同的 conn 返回不同的結果,那麼應該怎麼做的?也很簡單,對上面的 conn 進行一次封裝,封裝成一個 struct。我在很久以前一篇文章討論函數的擴展性的時候也說過將函數形參設計成 struct 是一種不錯的擴展方式。

3.3 Hub 數據感知

接 3.2,broadcast 的數據怎麼得到,主動去信息源拉,還是別人 push 過來?最簡單的實現方式是構造生產者 - 消費者模型,而在 golang 中實現生產者 - 消費者模型尤其簡單。結合到我們這裏,我們只需要在 hub 中定一個 channel 即可。

我的理解,要廣播的數據如何生存應該都是業務邏輯,不應該和基礎框架耦合在一起。

  1. talk is cheap, show me the code

代碼以下面兩個 package 爲例:

  1. http://github.com/astaxie/beego

  2. http://github.com/gorilla/websocket

Controller 處理。

type WsController struct {
    beego.Controller
}
var upgrader = websocket.Upgrader{
    ReadBufferSize:  maxMessageSize,
    WriteBufferSize: maxMessageSize,
}

func (this *WsController) WSTest() {
    defer this.ServeJSON()
    ws, err := upgrader.Upgrade(this.Ctx.ResponseWriter, this.Ctx.Request, nil)
    // 這裏 ws 就是 websocket.Conn,是 websocket 對 net.Conn 的封裝
    if err != nil {
        this.Data["json"] = "fail"
        return
    }

    // WsClient 是我們對 websocket.Conn 的再一層封裝,後面細說
    wsClient := &WsClient{
        WsConn:  ws,
        WsSend:  make(chan []byte, maxMessageSize),
        HttpRequest:  this.Ctx.Request,     //記錄請求參數
    }

    service.ServeWsExample(wsClient)
}

WsClient 結構。

type WsClient struct {
    WsConn  *websocket.Conn
    WsSend  chan []byte
    HttpRequest http.Request 
}

WsClient 有兩個基本方法:對 client 端發送過來的數據進行處理,以及對 server 端下發的數據進行處理。這使用函數作爲參數,也是爲了實現最大的靈活性,但是函數參數的設計不一定是最合適的,如果大家有更合適的,歡迎指教。

func (client *WsClient) ReadMsg(fn func(c *WsClient, s string)) {
    for {
        _, msg, err := client.WsConn.ReadMessage()
        if err != nil {
            break
        }
        fn(client, string(msg))
    }
}

func (client *WsClient) WriteMsg(fn func(s string) error) {
    for {
        select {
        case msg, ok := <-client.WsSend:
            if !ok {
                client.WsConn.WriteMessage(websocket.CloseMessage, []byte{})
                return 
            }
            if err := fn(string(msg)); err != nil {
                return
            }
        }
    }
}

Hub。

type WsHub struct {
    Clients    map[*WsClient]bool   // clients to be broadcast
    Broadcast  chan string
    Register   chan *WsClient
    UnRegister chan *WsClient
    LastMsg    string     // 最近一次的廣播內容。如果我們是 1 分鐘廣播一次,新來一個請求還沒有到廣播的時間,就返回最近一次廣播的內容
    Identity   string     //可以用作做標誌
}

Hub 包括一個 export 的 Run 方法和一個私有方法 broadCast()。

func (hub *WsHub) Run() {
    for {
        select {
        case c := <-hub.Register:
            hub.Clients[c] = true
            c.WsSend <- []byte(hub.LastMsg)
            break
        case c := <-hub.UnRegister:
            _, ok := hub.Clients[c]
            if ok {
                delete(hub.Clients, c)
                close(c.WsSend)
            }
            break
        case msg := <-hub.Broadcast:
            hub.LastMsg = msg
            hub.broadCast()
            break
        }
    }
}

func (hub *WsHub) broadCast() {
    for c := range hub.Clients {
        select {
        case c.WsSend <- []byte(hub.LastMsg):
            break
        default:
            close(c.WsSend)
            delete(hub.Clients, c)
        }

    }
}

我們現在把 client 和 hub 串起來,也就是第一個例子中的 service.ServeWsExample(wsClient)

// 初始化
func initWs() {
    WsHubs = make(map[string]*util.WsHub)
    hubList := []string{"hub1""hub2""hub2"}

    for _, hub := range hubList {
        WsHubs[hub] = &WsHub {
            Clients: make(map[*util.WsClient]bool),
            Broadcast: make(chan string),
            Register:  make(chan *util.WsClient),
            UnRegister: make(chan *util.WsClient),
            //Identity:   hub.String(),
        }
        go mockBroadCast(WsHubs[hub].Broadcast)
        go WsHubs[hub].Run()
    }
}

func mockBroadCast(broadCast chan string) {
    for {
        broadCast <- "hello world"
        time.Sleep(time.Second * 10)
    }
}

// controller 請求路由到相應的 ServeWsExample 函數
func ServeWsExample(c *util.WsClient, pair string) {
    defer func() {
        WsHubs[pair].UnRegister <- c
        c.WsConn.Close()
    }()

    WsHubs[pair].Register <- c
    go c.WriteMsg(func(string) error {})
    c.ReadMsg(func(*WsClient, string){})
}

還有一點需要說明的是,這裏沒有寫出生成者(也就是向 Hub 發送數據的進程),因爲生產者的寫法比較靈活,這裏還是簡單寫一個吧。

//init
func init() {
    go Producer()
}

// 生產者
func Producer() {
    for {
        // generate msg
        msg := "hello, I am legendtkl"

        // select the proper hub to send the msg
        WsHubs["hub1"].Broadcast <- msg
    }
}
  1. 寫在最後

工作之後一直思考的一個問題是,怎麼衡量代碼的擴展性以及如何寫出高擴展性的代碼?歡迎交流。

轉自:

zhuanlan.zhihu.com/p/35167916

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/wZVkWLswzN3YtSdZMXF1jg