使用 Go 實現服務端事件推送 SSE
背景
在對內部 CRM 項目進行優化時,我們發現項目中的站內信功能目前採用了 WebSocket 來實現消息推送。然而,對於站內信這種低頻的推送場景來說,維護一個長連接的成本相對較高。WebSocket 通常用於需要實時雙向通信的應用,而我們需要的只是簡單的單向推送。經過考慮,我們決定使用一種更輕量級的技術——Server-Sent Events(SSE)來實現站內信的推送。這種技術不僅可以減少服務器資源的消耗,還能簡化實現過程。
SSE 簡介
Server-Sent Events(SSE)是一種允許服務器向瀏覽器推送實時更新的技術。與 WebSocket 不同,SSE 是基於 HTTP 協議的,它通過在請求頭中添加 Accept: text/event-stream 來標識這是一個 SSE 請求。SSE 主要用於服務器向客戶端單向推送事件,例如實時更新股票價格、社交媒體通知等。其優點在於實現簡單、資源消耗低,尤其適合於低頻率的事件推送。
SSE 與 WebSocket 比較
1. 通信方式
-
SSE 提供單向通信,即服務器向客戶端推送數據,客戶端無法直接向服務器發送數據。
-
WebSocket 提供雙向通信,允許服務器和客戶端之間進行實時的數據交換。
2. 協議
-
SSE 是通過標準的 HTTP 協議實現的,適合於大多數 Web 應用程序的需求。
-
WebSocket 是一種獨立的協議,需要在建立連接時進行協議升級。
3. 實現複雜度
-
SSE 的實現相對簡單,服務器只需維持一個 HTTP 連接即可推送數據。
-
WebSocket 的實現較爲複雜,涉及到協議握手和連接維護等操作。
4. 數據傳輸
-
SSE 僅支持文本數據的傳輸,不支持二進制數據。
-
WebSocket 支持傳輸文本數據和二進制數據,適用於更復雜的應用場景。
5. 連接限制
-
SSE 的連接數受限於瀏覽器的限制,通常在 6 到 10 個連接之間。
-
WebSocket 的連接限制較少,但管理多個 WebSocket 連接仍需額外的資源。
SSE 與 Websocket 相比較:
-
SSE 提供單向通信,Websocket 提供雙向通信;
-
SSE 是通過 HTTP 協議實現的,Websocket 是單獨的協議;
-
實現上來說 SSE 比較容易,Websocket 複雜一些;
-
SSE 有最大連接數限制;
-
WebSocket 可以傳輸二進制數據和文本數據,但 SSE 只有文本數據;
SSE 與長輪詢
長輪詢是一種通信方法,由客戶端定期訪問服務器獲取新數據;
當正在構建的應用程序涉及手工操作或執行計算量大的任務時,通常使用這種形式的通信;
例如,觸發機器學習模型的訓練,此時需要很長時間才能完成;在這種情況下,可能不需要經常檢查這些任務的完成情況;
而 SSE 通常用於快速生成事件的應用程序中,例如,在 YouTube 視頻上託管喜歡的實時計數,在 UI 上顯示服務器日誌文件或將通知推送到用戶的電話,所有這些事件都近似於即時更新;
實現
以下是使用 Golang 實現 SSE 的基本步驟。
實現步驟
服務端
-
創建 HTTP 服務器:使用 Golang 的 net/http 包創建一個簡單的 HTTP 服務器。
-
設置 SSE 響應頭:在響應中設置適當的 Content-Type,並確保連接保持打開狀態。
-
發送事件數據:持續向客戶端發送數據,使用特定的格式,
data: <message>
。 -
保持連接:確保連接持續,以便服務器可以持續推送更新。
package main
import (
"bufio"
"fmt"
"net/http"
"os"
)
/**
* @Author: PFinal南丞
* @Author: lampxiezi@163.com
* @Date: 2024/8/22
* @Desc:
* @Project: 2024
*/
func SSEHandler(w http.ResponseWriter, r *http.Request) {
// 設置SSE相關的響應頭
w.Header().Set("Content-Type", "text/event-stream")
w.Header().Set("Cache-Control", "no-cache")
w.Header().Set("Connection", "keep-alive")
w.Header().Set("Access-Control-Allow-Origin", "*")
// 檢查是否支持Flush
flusher, ok := w.(http.Flusher)
if !ok {
http.Error(w, "Streaming unsupported!", http.StatusInternalServerError)
return
}
// 創建一個通道,用於將輸入的數據發送到SSE
inputChan := make(chan string)
// 啓動一個Goroutine來讀取標準輸入併發送到通道
go func() {
scanner := bufio.NewScanner(os.Stdin)
for scanner.Scan() {
text := scanner.Text()
fmt.Println("Read from stdin:", text) // 輸出讀取到的內容
inputChan <- text
}
close(inputChan)
}()
// 監聽通道中的數據並推送到客戶端
for {
select {
case msg, ok := <-inputChan:
if !ok {
// 通道關閉,結束SSE
fmt.Fprint(w, "data: Connection closed\n\n")
flusher.Flush()
return
}
fmt.Println("Pushing to client:", msg) // 輸出即將推送的內容
_, err := fmt.Fprintf(w, "data: %s\n\n", msg)
if err != nil {
// 推送失敗,可能是客戶端斷開了連接
fmt.Println("Client disconnected:", err)
return
}
flusher.Flush()
}
}
}
func main() {
http.HandleFunc("/events", SSEHandler)
fmt.Println("Starting server on :8000")
err := http.ListenAndServe(":8000", nil)
if err != nil {
fmt.Println("Error starting server:", err)
}
}
客戶端
const eventSource = new EventSource("http://localhost:8080/events");
eventSource.onmessage = function(event) {
console.log("New message:", event.data);
};
eventSource.onerror = function() {
console.error("Error occurred while receiving events.");
};
效果如下圖
通過開源庫 eventsource 直接支持了 SSE,使用這個庫構建服務器
使用開源庫
package main
import (
"fmt"
"log"
"net/http"
"time"
"gopkg.in/antage/eventsource.v1"
)
func main() {
es := eventsource.New(nil, nil)
defer es.Close()
http.Handle("/", http.FileServer(http.Dir("./public")))
http.Handle("/events", es)
go func() {
for {
// 每2秒發送一條當前時間消息,並打印對應客戶端數量
es.SendEventMessage(fmt.Sprintf("hello, now is: %s", time.Now()), "", "")
log.Printf("Hello has been sent (consumers: %d)", es.ConsumersCount())
time.Sleep(2 * time.Second)
}
}()
log.Println("Open URL http://localhost:8080/ in your browser.")
err := http.ListenAndServe(":8080", nil)
if err != nil {
log.Fatal(err)
}
}
注意
-
SSE 的連接限制:每個瀏覽器對 SSE 連接數有一定的限制,通常在 6 到 10 個之間。
-
超時處理:需要確保服務器不會超時關閉連接,同時客戶端可能需要處理重新連接的邏輯。
總結
通過將站內信推送從 WebSocket 切換到 SSE,我們能夠實現更高效的低頻推送。SSE 的簡單實現和低資源消耗使其成爲處理此類場景的理想選擇。未來,如果有更多的優化需求或技術挑戰,持續探索和調整將是提升系統性能和用戶體驗的關鍵。
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://mp.weixin.qq.com/s/sNsH3JlrfP1u_mpqwfKPVA