GO 實現千萬級 WebSocket 消息推送服務

【導讀】WebSocket 是做什麼的,應用上有什麼坑?本文詳細介紹了 WebSocket 技術和 Go 實現。

拉模式和推模式區別

拉模式(定時輪詢訪問接口獲取數據)

推模式(向客戶端進行數據的推送)

基於 WebSocket 協議做推送

WebSocket 協議的交互流程

客戶端首先發起一個 Http 請求到服務端,請求的特殊之處,在於在請求裏面帶了一個 upgrade 的字段,告訴服務端,我想生成一個 websocket 的協議,服務端收到請求後,會給客戶端一個握手的確認,返回一個 switching, 意思允許客戶端向 websocket 協議轉換,完成這個協商之後,客戶端與服務端之間的底層 TCP 協議是沒有中斷的,接下來,客戶端可以向服務端發起一個基於 websocket 協議的消息,服務端也可以主動向客戶端發起 websocket 協議的消息,websocket 協議裏面通訊的單位就叫 message。

————————————————

服務端技術選型與考慮

NodeJs

C/C++

Go

基於 Go 實現 WebSocket 服務端

用 Go 語言對 WebSocket 做一個簡單的服務端實現,以及 HTML 頁面進行調試,並對 WebSocket 封裝,這裏就直接給出代碼了。

WebSocket 服務端

package main
 
import (
  "net/http"
  "github.com/gorilla/websocket"
  "github.com/myproject/gowebsocket/impl"
  "time"
  )
var(
 upgrader = websocket.Upgrader{
  // 允許跨域
  CheckOrigin:func(r *http.Request) bool{
   return true
  },
 }
)
 
func wsHandler(w http.ResponseWriter , r *http.Request){
 // w.Write([]byte("hello"))
 var(
  wsConn *websocket.Conn
  err error
  conn *impl.Connection
  data []byte
 )
 // 完成ws協議的握手操作
 // Upgrade:websocket
 if wsConn , err = upgrader.Upgrade(w,r,nil); err != nil{
  return 
 }
 
 if conn , err = impl.InitConnection(wsConn); err != nil{
  goto ERR
 }
 
 // 啓動線程,不斷髮消息
 go func(){
  var (err error)
  for{
   if err = conn.WriteMessage([]byte("heartbeat"));err != nil{
    return 
   }
   time.Sleep(1*time.Second)
  }
 }()
 
 for {
  if data , err = conn.ReadMessage();err != nil{
   goto ERR
  }
  if err = conn.WriteMessage(data);err !=nil{
   goto ERR
  }
 }
 
 ERR:
  conn.Close()
 
}
 
func main(){
 
 http.HandleFunc("/ws",wsHandler)
 http.ListenAndServe("0.0.0.0:7777",nil)
}

前端頁面

<!DOCTYPE html>
<html>
<head>
 <title>go websocket</title>
 <meta charset="utf-8" />  
</head>
<body>
 <script type="text/javascript">
  var wsUri ="ws://127.0.0.1:7777/ws"; 
     var output;  
     
     function init() { 
         output = document.getElementById("output"); 
         testWebSocket(); 
     }  
  
     function testWebSocket() { 
         websocket = new WebSocket(wsUri); 
         websocket.onopen = function(evt) { 
             onOpen(evt) 
         }; 
         websocket.onclose = function(evt) { 
             onClose(evt) 
         }; 
         websocket.onmessage = function(evt) { 
             onMessage(evt) 
         }; 
         websocket.onerror = function(evt) { 
             onError(evt) 
         }; 
     }  
  
     function onOpen(evt) { 
         writeToScreen("CONNECTED"); 
        // doSend("WebSocket rocks"); 
     }  
  
     function onClose(evt) { 
         writeToScreen("DISCONNECTED"); 
     }  
  
     function onMessage(evt) { 
         writeToScreen('<span>RESPONSE: '+ evt.data+'</span>'); 
        // websocket.close(); 
     }  
  
     function onError(evt) { 
         writeToScreen('<span>ERROR:</span> '+ evt.data); 
     }  
  
     function doSend(message) { 
         writeToScreen("SENT: " + message);  
         websocket.send(message); 
     }  
  
     function writeToScreen(message) { 
         var pre = document.createElement("p"); 
         pre.style.wordWrap = "break-word"; 
         pre.innerHTML = message; 
         output.appendChild(pre); 
     }  
  
     window.addEventListener("load", init, false);  
     function sendBtnClick(){
      var msg = document.getElementById("input").value;
      doSend(msg);
      document.getElementById("input").value = '';
     }
     function closeBtnClick(){
      websocket.close(); 
     }
 </script>
 <h2>WebSocket Test</h2>  
 <input type="text" id="input"></input>
 <button onclick="sendBtnClick()" >send</button>
 <button onclick="closeBtnClick()" >close</button>
 <div id="output"></div>  
 
</body>
</html>

封裝 WebSocket

package impl
 
import (
  "github.com/gorilla/websocket"
  "sync"
  "errors"
  )
 
type Connection struct{
 wsConnect *websocket.Conn
 inChan chan []byte
 outChan chan []byte
 closeChan chan byte
 
 mutex sync.Mutex  // 對closeChan關閉上鎖
 isClosed bool  // 防止closeChan被關閉多次
}
 
func InitConnection(wsConn *websocket.Conn)(conn *Connection ,err error){
 conn = &Connection{
  wsConnect:wsConn,
  inChan: make(chan []byte,1000),
  outChan: make(chan []byte,1000),
  closeChan: make(chan byte,1),
 
 }
 // 啓動讀協程
 go conn.readLoop();
 // 啓動寫協程
 go conn.writeLoop();
 return
}
 
func (conn *Connection)ReadMessage()(data []byte , err error){
 
 select{
 case data = <- conn.inChan:
 case <- conn.closeChan:
  err = errors.New("connection is closeed")
 }
 return 
}
 
func (conn *Connection)WriteMessage(data []byte)(err error){
 
 select{
 case conn.outChan <- data:
 case <- conn.closeChan:
  err = errors.New("connection is closeed")
 }
 return 
}
 
func (conn *Connection)Close(){
 // 線程安全,可多次調用
 conn.wsConnect.Close()
 // 利用標記,讓closeChan只關閉一次
 conn.mutex.Lock()
 if !conn.isClosed {
  close(conn.closeChan)
  conn.isClosed = true 
 }
 conn.mutex.Unlock()
}
 
// 內部實現
func (conn *Connection)readLoop(){
 var(
  data []byte
  err error
  )
 for{
  if _, data , err = conn.wsConnect.ReadMessage(); err != nil{
   goto ERR
  }
//阻塞在這裏,等待inChan有空閒位置
  select{
   case conn.inChan <- data:
   case <- conn.closeChan:  // closeChan 感知 conn斷開
    goto ERR
  }
  
 }
 
 ERR:
  conn.Close()
}
 
func (conn *Connection)writeLoop(){
 var(
  data []byte
  err error
  )
 
 for{
  select{
   case data= <- conn.outChan:
   case <- conn.closeChan:
    goto ERR
  }
  if err = conn.wsConnect.WriteMessage(websocket.TextMessage , data); err != nil{
   goto ERR
  }
 }
 
 ERR:
  conn.Close()
 
}

千萬級彈幕系統的架構設計

技術難點

推送量大:100W 在線 * 10 條 / 每秒 = 1000W 條 / 秒

內核瓶頸:linux 內核發送 TCP 的極限包頻 ≈ 100W / 秒

需要維護在線用戶集合(100W 用戶在線),通常是一個字典結構

推送消息即遍歷整個集合,順序發送消息,耗時極長

推送期間,客戶端仍舊正常的上下線,集合面臨不停的修改,修改需要遍歷,所以集合需要上鎖

瀏覽器與服務端之間一般採用的是 JSon 格式去通訊

Json 編碼非常耗費 CPU 資源

向 100W 在線推送一次,則需 100W 次 Json Encode

優化方案

減少網絡小包的發送,我們將網絡上幾百字節定義成網絡的小包了,小包的問題是對內核和網絡的中間設備造成處理的壓力。方案是將一秒內 N 條消息合併成 1 條消息,合併後,每秒推送數等於在線連接數。

大鎖拆小鎖,將長連接打散到多個集合中去,每個集合都有自己的鎖,多線程併發推送集合,線程之間推送的集合不同,所以沒有鎖的競爭關係,避免鎖競爭。

讀寫鎖取代互斥鎖,多個推送任務可以併發遍歷相同集合

減少重複計算,Json 編碼前置,1 次消息編碼 + 100W 次推送,消息合併前置,N 條消息合併後,只需要編碼一次。

部署多個節點,通過負載均衡,把連接打散到多個 服務器上,但推送消息的時候,不知道哪個直播間在哪個節點上,最常用的方式是將消息廣播給所有的網關節點,此時就需要做一個邏輯集羣。

基於 Http2 協議向 gateway 集羣分發消息(Http2 支持連接複用,用作 RPC 性能更佳,即在單個連接上可以做高吞吐的請求應答處理)

基於 Http1 協議對外提供推送 API(Http1 更加普及,對業務方更加友好)

整體分佈式架構圖如下:

任何業務方通過 Http 接口調用到邏輯集羣,邏輯集羣把消息廣播給所有網關,各個網關各自將消息推送給在線的連接即可。

本文講解了開發消息推送服務的難點與解決方案的大體思路,按照整個理論流程下來,基本能實現一套彈幕消息推送的服務。

轉自:Xin Lee

zhuanlan.zhihu.com/p/100770431

Go 開發大全

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/Y22bu2qwX6s7M06wxWio7A