GO 實現千萬級 WebSocket 消息推送服務
【導讀】WebSocket 是做什麼的,應用上有什麼坑?本文詳細介紹了 WebSocket 技術和 Go 實現。
拉模式和推模式區別
拉模式(定時輪詢訪問接口獲取數據)
-
數據更新頻率低,則大多數的數據請求時無效的
-
在線用戶數量多,則服務端的查詢負載很高
-
定時輪詢拉取,無法滿足時效性要求
推模式(向客戶端進行數據的推送)
-
僅在數據更新時,纔有推送
-
需要維護大量的在線長連接
-
數據更新後,可以立即推送
基於 WebSocket 協議做推送
-
瀏覽器支持的 socket 編程,輕鬆維持服務端的長連接
-
基於 TCP 協議之上的高層協議,無需開發者關心通訊細節
-
提供了高度抽象的編程接口,業務開發成本較低
WebSocket 協議的交互流程
客戶端首先發起一個 Http 請求到服務端,請求的特殊之處,在於在請求裏面帶了一個 upgrade 的字段,告訴服務端,我想生成一個 websocket 的協議,服務端收到請求後,會給客戶端一個握手的確認,返回一個 switching, 意思允許客戶端向 websocket 協議轉換,完成這個協商之後,客戶端與服務端之間的底層 TCP 協議是沒有中斷的,接下來,客戶端可以向服務端發起一個基於 websocket 協議的消息,服務端也可以主動向客戶端發起 websocket 協議的消息,websocket 協議裏面通訊的單位就叫 message。
————————————————
服務端技術選型與考慮
NodeJs
- 單線程模型(儘管可以多進程),推送性能有限
C/C++
- TCP 通訊、WebSocket 協議實現成本高
Go
-
多線程,基於協程模型併發
-
Go 語言屬於編譯型語言,運行速度並不慢
-
成熟的 WebSocket 標準庫,無需造輪子
基於 Go 實現 WebSocket 服務端
用 Go 語言對 WebSocket 做一個簡單的服務端實現,以及 HTML 頁面進行調試,並對 WebSocket 封裝,這裏就直接給出代碼了。
WebSocket 服務端
package main
import (
"net/http"
"github.com/gorilla/websocket"
"github.com/myproject/gowebsocket/impl"
"time"
)
var(
upgrader = websocket.Upgrader{
// 允許跨域
CheckOrigin:func(r *http.Request) bool{
return true
},
}
)
func wsHandler(w http.ResponseWriter , r *http.Request){
// w.Write([]byte("hello"))
var(
wsConn *websocket.Conn
err error
conn *impl.Connection
data []byte
)
// 完成ws協議的握手操作
// Upgrade:websocket
if wsConn , err = upgrader.Upgrade(w,r,nil); err != nil{
return
}
if conn , err = impl.InitConnection(wsConn); err != nil{
goto ERR
}
// 啓動線程,不斷髮消息
go func(){
var (err error)
for{
if err = conn.WriteMessage([]byte("heartbeat"));err != nil{
return
}
time.Sleep(1*time.Second)
}
}()
for {
if data , err = conn.ReadMessage();err != nil{
goto ERR
}
if err = conn.WriteMessage(data);err !=nil{
goto ERR
}
}
ERR:
conn.Close()
}
func main(){
http.HandleFunc("/ws",wsHandler)
http.ListenAndServe("0.0.0.0:7777",nil)
}
前端頁面
<!DOCTYPE html>
<html>
<head>
<title>go websocket</title>
<meta charset="utf-8" />
</head>
<body>
<script type="text/javascript">
var wsUri ="ws://127.0.0.1:7777/ws";
var output;
function init() {
output = document.getElementById("output");
testWebSocket();
}
function testWebSocket() {
websocket = new WebSocket(wsUri);
websocket.onopen = function(evt) {
onOpen(evt)
};
websocket.onclose = function(evt) {
onClose(evt)
};
websocket.onmessage = function(evt) {
onMessage(evt)
};
websocket.onerror = function(evt) {
onError(evt)
};
}
function onOpen(evt) {
writeToScreen("CONNECTED");
// doSend("WebSocket rocks");
}
function onClose(evt) {
writeToScreen("DISCONNECTED");
}
function onMessage(evt) {
writeToScreen('<span>RESPONSE: '+ evt.data+'</span>');
// websocket.close();
}
function onError(evt) {
writeToScreen('<span>ERROR:</span> '+ evt.data);
}
function doSend(message) {
writeToScreen("SENT: " + message);
websocket.send(message);
}
function writeToScreen(message) {
var pre = document.createElement("p");
pre.style.wordWrap = "break-word";
pre.innerHTML = message;
output.appendChild(pre);
}
window.addEventListener("load", init, false);
function sendBtnClick(){
var msg = document.getElementById("input").value;
doSend(msg);
document.getElementById("input").value = '';
}
function closeBtnClick(){
websocket.close();
}
</script>
<h2>WebSocket Test</h2>
<input type="text" id="input"></input>
<button onclick="sendBtnClick()" >send</button>
<button onclick="closeBtnClick()" >close</button>
<div id="output"></div>
</body>
</html>
封裝 WebSocket
package impl
import (
"github.com/gorilla/websocket"
"sync"
"errors"
)
type Connection struct{
wsConnect *websocket.Conn
inChan chan []byte
outChan chan []byte
closeChan chan byte
mutex sync.Mutex // 對closeChan關閉上鎖
isClosed bool // 防止closeChan被關閉多次
}
func InitConnection(wsConn *websocket.Conn)(conn *Connection ,err error){
conn = &Connection{
wsConnect:wsConn,
inChan: make(chan []byte,1000),
outChan: make(chan []byte,1000),
closeChan: make(chan byte,1),
}
// 啓動讀協程
go conn.readLoop();
// 啓動寫協程
go conn.writeLoop();
return
}
func (conn *Connection)ReadMessage()(data []byte , err error){
select{
case data = <- conn.inChan:
case <- conn.closeChan:
err = errors.New("connection is closeed")
}
return
}
func (conn *Connection)WriteMessage(data []byte)(err error){
select{
case conn.outChan <- data:
case <- conn.closeChan:
err = errors.New("connection is closeed")
}
return
}
func (conn *Connection)Close(){
// 線程安全,可多次調用
conn.wsConnect.Close()
// 利用標記,讓closeChan只關閉一次
conn.mutex.Lock()
if !conn.isClosed {
close(conn.closeChan)
conn.isClosed = true
}
conn.mutex.Unlock()
}
// 內部實現
func (conn *Connection)readLoop(){
var(
data []byte
err error
)
for{
if _, data , err = conn.wsConnect.ReadMessage(); err != nil{
goto ERR
}
//阻塞在這裏,等待inChan有空閒位置
select{
case conn.inChan <- data:
case <- conn.closeChan: // closeChan 感知 conn斷開
goto ERR
}
}
ERR:
conn.Close()
}
func (conn *Connection)writeLoop(){
var(
data []byte
err error
)
for{
select{
case data= <- conn.outChan:
case <- conn.closeChan:
goto ERR
}
if err = conn.wsConnect.WriteMessage(websocket.TextMessage , data); err != nil{
goto ERR
}
}
ERR:
conn.Close()
}
千萬級彈幕系統的架構設計
技術難點
- 內核瓶頸
推送量大:100W 在線 * 10 條 / 每秒 = 1000W 條 / 秒
內核瓶頸:linux 內核發送 TCP 的極限包頻 ≈ 100W / 秒
- 鎖瓶頸
需要維護在線用戶集合(100W 用戶在線),通常是一個字典結構
推送消息即遍歷整個集合,順序發送消息,耗時極長
推送期間,客戶端仍舊正常的上下線,集合面臨不停的修改,修改需要遍歷,所以集合需要上鎖
- CPU 瓶頸
瀏覽器與服務端之間一般採用的是 JSon 格式去通訊
Json 編碼非常耗費 CPU 資源
向 100W 在線推送一次,則需 100W 次 Json Encode
優化方案
- 內核瓶頸
減少網絡小包的發送,我們將網絡上幾百字節定義成網絡的小包了,小包的問題是對內核和網絡的中間設備造成處理的壓力。方案是將一秒內 N 條消息合併成 1 條消息,合併後,每秒推送數等於在線連接數。
- 鎖瓶頸
大鎖拆小鎖,將長連接打散到多個集合中去,每個集合都有自己的鎖,多線程併發推送集合,線程之間推送的集合不同,所以沒有鎖的競爭關係,避免鎖競爭。
讀寫鎖取代互斥鎖,多個推送任務可以併發遍歷相同集合
- CPU 瓶頸
減少重複計算,Json 編碼前置,1 次消息編碼 + 100W 次推送,消息合併前置,N 條消息合併後,只需要編碼一次。
- 集羣
部署多個節點,通過負載均衡,把連接打散到多個 服務器上,但推送消息的時候,不知道哪個直播間在哪個節點上,最常用的方式是將消息廣播給所有的網關節點,此時就需要做一個邏輯集羣。
- 邏輯集羣
基於 Http2 協議向 gateway 集羣分發消息(Http2 支持連接複用,用作 RPC 性能更佳,即在單個連接上可以做高吞吐的請求應答處理)
基於 Http1 協議對外提供推送 API(Http1 更加普及,對業務方更加友好)
整體分佈式架構圖如下:
任何業務方通過 Http 接口調用到邏輯集羣,邏輯集羣把消息廣播給所有網關,各個網關各自將消息推送給在線的連接即可。
本文講解了開發消息推送服務的難點與解決方案的大體思路,按照整個理論流程下來,基本能實現一套彈幕消息推送的服務。
轉自:Xin Lee
zhuanlan.zhihu.com/p/100770431
Go 開發大全
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://mp.weixin.qq.com/s/Y22bu2qwX6s7M06wxWio7A