使用 Go 實現服務端事件推送 SSE


背景

在對內部 CRM 項目進行優化時,我們發現項目中的站內信功能目前採用了 WebSocket 來實現消息推送。然而,對於站內信這種低頻的推送場景來說,維護一個長連接的成本相對較高。WebSocket 通常用於需要實時雙向通信的應用,而我們需要的只是簡單的單向推送。經過考慮,我們決定使用一種更輕量級的技術——Server-Sent Events(SSE)來實現站內信的推送。這種技術不僅可以減少服務器資源的消耗,還能簡化實現過程。

SSE 簡介

Server-Sent Events(SSE)是一種允許服務器向瀏覽器推送實時更新的技術。與 WebSocket 不同,SSE 是基於 HTTP 協議的,它通過在請求頭中添加 Accept: text/event-stream 來標識這是一個 SSE 請求。SSE 主要用於服務器向客戶端單向推送事件,例如實時更新股票價格、社交媒體通知等。其優點在於實現簡單、資源消耗低,尤其適合於低頻率的事件推送。

SSE 與 WebSocket 比較

1. 通信方式

2. 協議

3. 實現複雜度

4. 數據傳輸

5. 連接限制

SSE 與 Websocket 相比較:

SSE 與長輪詢

長輪詢是一種通信方法,由客戶端定期訪問服務器獲取新數據;

當正在構建的應用程序涉及手工操作或執行計算量大的任務時,通常使用這種形式的通信;

例如,觸發機器學習模型的訓練,此時需要很長時間才能完成;在這種情況下,可能不需要經常檢查這些任務的完成情況;

而 SSE 通常用於快速生成事件的應用程序中,例如,在 YouTube 視頻上託管喜歡的實時計數,在 UI 上顯示服務器日誌文件或將通知推送到用戶的電話,所有這些事件都近似於即時更新;

實現

以下是使用 Golang 實現 SSE 的基本步驟。

實現步驟

服務端
  1. 創建 HTTP 服務器:使用 Golang 的 net/http 包創建一個簡單的 HTTP 服務器。

  2. 設置 SSE 響應頭:在響應中設置適當的 Content-Type,並確保連接保持打開狀態。

  3. 發送事件數據:持續向客戶端發送數據,使用特定的格式,data: <message>

  4. 保持連接:確保連接持續,以便服務器可以持續推送更新。

package main

import (
 "bufio"
 "fmt"
 "net/http"
 "os"
)

/**
 * @Author: PFinal南丞
 * @Author: lampxiezi@163.com
 * @Date: 2024/8/22
 * @Desc:
 * @Project: 2024
 */

func SSEHandler(w http.ResponseWriter, r *http.Request) {
 // 設置SSE相關的響應頭
 w.Header().Set("Content-Type""text/event-stream")
 w.Header().Set("Cache-Control""no-cache")
 w.Header().Set("Connection""keep-alive")
 w.Header().Set("Access-Control-Allow-Origin""*")

 // 檢查是否支持Flush
 flusher, ok := w.(http.Flusher)
 if !ok {
  http.Error(w, "Streaming unsupported!", http.StatusInternalServerError)
  return
 }

 // 創建一個通道,用於將輸入的數據發送到SSE
 inputChan := make(chan string)

 // 啓動一個Goroutine來讀取標準輸入併發送到通道
 go func() {
  scanner := bufio.NewScanner(os.Stdin)
  for scanner.Scan() {
   text := scanner.Text()
   fmt.Println("Read from stdin:", text) // 輸出讀取到的內容
   inputChan <- text
  }
  close(inputChan)
 }()

 // 監聽通道中的數據並推送到客戶端
 for {
  select {
  case msg, ok := <-inputChan:
   if !ok {
    // 通道關閉,結束SSE
    fmt.Fprint(w, "data: Connection closed\n\n")
    flusher.Flush()
    return
   }
   fmt.Println("Pushing to client:", msg) // 輸出即將推送的內容
   _, err := fmt.Fprintf(w, "data: %s\n\n", msg)
   if err != nil {
    // 推送失敗,可能是客戶端斷開了連接
    fmt.Println("Client disconnected:", err)
    return
   }
   flusher.Flush()
  }
 }
}

func main() {
 http.HandleFunc("/events", SSEHandler)

 fmt.Println("Starting server on :8000")
 err := http.ListenAndServe(":8000", nil)
 if err != nil {
  fmt.Println("Error starting server:", err)
 }
}
客戶端
const eventSource = new EventSource("http://localhost:8080/events");

eventSource.onmessage = function(event) {
  console.log("New message:", event.data);
};

eventSource.onerror = function() {
  console.error("Error occurred while receiving events.");
};
效果如下圖

通過開源庫 eventsource 直接支持了 SSE,使用這個庫構建服務器

使用開源庫
package main

import (
    "fmt"
    "log"
    "net/http"
    "time"

    "gopkg.in/antage/eventsource.v1"
)

func main() {
    es := eventsource.New(nil, nil)
    defer es.Close()

    http.Handle("/", http.FileServer(http.Dir("./public")))
    http.Handle("/events", es)
    go func() {
        for {
            // 每2秒發送一條當前時間消息,並打印對應客戶端數量
            es.SendEventMessage(fmt.Sprintf("hello, now is: %s", time.Now())"""")
            log.Printf("Hello has been sent (consumers: %d)", es.ConsumersCount())
            time.Sleep(2 * time.Second)
        }
    }()

    log.Println("Open URL http://localhost:8080/ in your browser.")
    err := http.ListenAndServe(":8080", nil)
    if err != nil {
        log.Fatal(err)
    }
}

注意

總結

通過將站內信推送從 WebSocket 切換到 SSE,我們能夠實現更高效的低頻推送。SSE 的簡單實現和低資源消耗使其成爲處理此類場景的理想選擇。未來,如果有更多的優化需求或技術挑戰,持續探索和調整將是提升系統性能和用戶體驗的關鍵。

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/sNsH3JlrfP1u_mpqwfKPVA