什麼是 WebSocket?Go_WebSocket 編程來了!

爲什麼要有 WebSocket?

已經有了 HTTP 了爲什麼還要有 WebSocket 呢?

因爲,HTTP 的請求只能由客戶端發起,服務器接收。但是,現在想要讓服務器端也可以主動發起請求。那麼使用 HTTP 是無法滿足的。

其次,還有一種就是,如果想要監聽服務端發送的請求。那麼,可以讓客戶端始終處於一種輪詢狀態。客戶端每隔一段就發起一個詢問,看一下服務端有沒有請求信息。

使用輪詢的缺點非常明顯,流量一旦很大的話,後端服務的壓力就很大。而且在高併發的情況下,很容易引起雪崩。例如,服務器端因爲壓力大導致在輪詢的時候沒有辦法返回正常的響應,使得客戶端進一步輪詢等。綜合使用 HTTP 的效率極其低下。

就在這樣的場景下,工程師們經過不懈的思考,WebSocket 由此誕生。

WebSocket 的特點 & 場景

WebSocket 是一種在單個 TCP 連接上進行全雙工通信的協議,設計用於在客戶端(通常是瀏覽器)和服務器之間建立持久連接,以實現實時數據傳輸。

1. 實時通信

優點:

使用場景:

2. 持久連接

優點:

使用場景:

3. 簡化開發

優點:

使用場景:

4. 更高效的資源使用

優點:

使用場景:

5. 替代輪詢和長輪詢

優點:

使用場景:

WebSocket 的建立的過程

WebSocket 的初始化過程非常簡單,可以理解爲是在 HTTP 基礎上進行了協商之後,將 HTTP 協議升級成了 WebSocket 協議。

1. 建立連接(握手)

WebSocket 連接始於客戶端向服務器發送一個 HTTP 請求,以啓動 WebSocket 握手過程。

客戶端請求

客戶端發送一個 HTTP GET 請求,其中包含一些特定的頭部字段來表示請求升級到 WebSocket 協議。以下是一個典型的握手請求示例:

GET /chat HTTP/1.1
Host: example.com
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Key: dGhlIHNhbXBsZSBub25jZQ==
Sec-WebSocket-Version: 13

服務器響應

服務器接收到握手請求後,生成一個響應,並確認協議升級。以下是一個典型的握手響應示例:

HTTP/1.1 101 Switching Protocols
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Accept: s3pPLMBiTxaQ9kYGzzhZRbK+xOo=

一旦握手成功,HTTP 連接將升級爲 WebSocket 連接,並且可以開始進行數據傳輸。

2. 數據傳輸

在 WebSocket 連接建立後,客戶端和服務器可以通過這個連接進行全雙工(雙向)數據傳輸。數據通過幀(frame)進行傳輸。

發送消息

客戶端和服務器都可以發送文本幀或二進制幀。例如,發送一個文本消息: (這裏我們先使用 javascrip)

// 客戶端
const socket = new WebSocket('ws://example.com/chat');

socket.onopen = function(event) {
  socket.send('Hello Server!');
};

// 服務器(使用 Node.js WebSocket 庫)
const WebSocket = require('ws');
const wss = new WebSocket.Server({ port: 8080 });

wss.on('connection'function connection(ws) {
  ws.on('message'function incoming(message) {
    console.log('received: %s', message);
    ws.send('Hello Client!');
  });
});

接收消息

客戶端和服務器都可以接收消息:

// 客戶端
socket.onmessage = function(event) {
  console.log('Message from server: ', event.data);
};

// 服務器
ws.on('message'function incoming(message) {
  console.log('Message from client: ', message);
});

3. 關閉連接

WebSocket 連接可以由客戶端或服務器任意一方關閉。關閉連接的過程包括髮送一個關閉幀。

WebSocket 的報文

WebSocket 是一種全雙工、雙向通信協議,允許客戶端和服務器之間進行實時數據交換。WebSocket 的報文結構在設計上非常高效,具有較小的開銷。

WebSocket 幀結構

WebSocket 數據通過幀(frame)進行傳輸,每個幀由以下幾個部分組成:

  1. FIN, RSV1, RSV2, RSV3 和 Opcode(第 1 個字節)

  2. Mask 和 Payload Length(第 2 個字節)

  3. 擴展的 Payload Length(可選)(第 3-10 個字節)

  4. Masking Key(可選)(第 4-14 個字節)

  5. Payload Data(有效負載數據)(第 5-14 個字節之後)

1. FIN, RSV1, RSV2, RSV3 和 Opcode

2. Mask 和 Payload Length

3. 擴展的 Payload Length(可選)

4. Masking Key(可選)

5. Payload Data(有效負載數據)

好的,讓我們詳細講解一個 WebSocket 幀的示例,假設客戶端發送一條文本消息 "Hello"。我們將逐步拆解這個幀的各個組成部分。

示例:發送文本消息 "Hello"

1. 構建幀頭部

假設我們要發送的消息是 "Hello",它的字節表示是:0x48 0x65 0x6C 0x6C 0x6F

第 1 字節:FIN, RSV1, RSV2, RSV3, Opcode
FIN               RSV1  RSV2  RSV3  Opcode
1                 0     0     0     0001

這個字節的二進制表示是 1000 0001,即 0x81

第 2 字節:Mask, Payload Length
Mask  Payload Length
1     000 0101

這個字節的二進制表示是 1000 0101,即 0x85

第 3-6 字節:Masking Key

假設 Masking Key 是 0x37 0xFA 0x21 0x3D,這個是客戶端隨機生成的用於掩碼處理。

2. 構建幀負載數據

對負載數據進行掩碼處理:

掩碼處理是通過每個負載字節與對應的 Masking Key 字節進行異或(XOR)操作:

0x48 ^ 0x37 = 0x7F
0x65 ^ 0xFA = 0x9F
0x6C ^ 0x21 = 0x4D
0x6C ^ 0x3D = 0x51
0x6F ^ 0x37 = 0x58

掩碼後的負載數據是: 0x7F 0x9F 0x4D 0x51 0x58

3. 完整的 WebSocket 幀

將所有部分組合在一起,形成完整的 WebSocket 幀:

0x81              // FIN=1, RSV1=0, RSV2=0, RSV3=0, Opcode=1
0x85              // Mask=1, Payload Length=5
0x37 0xFA 0x21 0x3D  // Masking Key
0x7F 0x9F 0x4D 0x51 0x58  // 掩碼後的負載數據 "Hello"

因此,完整的幀表示爲:

0x81 0x85 0x37 0xFA 0x21 0x3D 0x7F 0x9F 0x4D 0x51 0x58

WebSocket 的保活原理

保活機制(Keep-Alive)旨在確保連接在長時間不活動後仍然保持打開狀態,以防止連接由於網絡設備的超時策略而被意外關閉。

保活機制主要通過發送心跳消息(通常是 Ping/Pong 幀)來實現。

心跳機制

1. Ping/Pong 幀

WebSocket 協議內置了 Ping 和 Pong 幀,用於保持連接的活躍狀態:

2. 工作原理

實現心跳機制

心跳機制的實現通常依賴於以下兩方面:客戶端實現和服務器實現。以下是一些常見的實現方式:

1. 客戶端實現

在客戶端中,可以使用 JavaScript 的 setInterval 方法定期發送 Ping 幀。例如:

const socket = new WebSocket('ws://example.com/socket');

socket.onopen = function() {
  // 每隔30秒發送一個Ping幀
  setInterval(function() {
    socket.send(JSON.stringify({ type: 'ping' }));
  }, 30000);
};

socket.onmessage = function(event) {
  const message = JSON.parse(event.data);
  if (message.type === 'pong') {
    console.log('Received pong');
  }
};

2. 服務器實現

在服務器中,可以使用 WebSocket 庫提供的功能來實現心跳機制。例如,在 Node.js 中使用 ws 庫:

const WebSocket = require('ws');

const wss = new WebSocket.Server({ port: 8080 });

wss.on('connection'function(ws) {
  ws.isAlive = true;

  ws.on('pong'function() {
    ws.isAlive = true;
  });

  const interval = setInterval(function ping() {
    wss.clients.forEach(function each(ws) {
      if (ws.isAlive === false) return ws.terminate();

      ws.isAlive = false;
      ws.ping();
    });
  }, 30000);

  ws.on('close'function() {
    clearInterval(interval);
  });
});

解釋:在這個示例中,服務器每隔 30 秒向所有連接的客戶端發送 Ping 幀,並檢查是否接收到 Pong 幀。如果沒有接收到 Pong 幀,則認爲連接已斷開並關閉該連接。

其他保活機制

除了 Ping/Pong 幀,WebSocket 連接的保活還可能依賴於以下機制:

1. TCP Keep-Alive

一些 WebSocket 實現可能依賴於底層的 TCP Keep-Alive 機制來確保連接的活躍狀態。TCP Keep-Alive 是一種在 TCP 層實現的保活機制,通過發送空的 TCP 數據包來保持連接。

2. 應用層心跳

在應用層,也可以實現自定義的心跳機制。例如,定期發送應用特定的心跳消息,並檢查是否接收到預期的響應。

WebSocket API

使用 gorilla/websocket 庫來建立 WebSocket 連接,並在連接上進行數據的讀寫。

組合 API

type WsServer struct {
 *websocket.Conn
}

WebSocket 升級

通過 websocket.Upgrader 實例將 HTTP 連接升級爲 WebSocket 連接。

upgrader := &websocket.Upgrader{}
 http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
  c, err := upgrader.Upgrade(w, r, nil)
  if err != nil {
   w.Write([]byte("upgrade error"))
   return
  }
  conn := &WsServer{Conn:c}
 })

讀取 WebSocket 消息

使用一個 goroutine 來讀取 WebSocket 消息,並根據消息類型進行處理。

go func() {
   for {
    typ, msg, err := conn.ReadMessage()
    if err != nil {
     return
    }
    switch typ {
    case websocket.CloseMessage:
     conn.Close()
     return
    default:
     t.Logf("msg:%s", msg)
    }
   }
  }()

發送 WebSocket 消息

另一個 goroutine 定期發送消息到客戶端。

go func() {
   ticker := time.NewTicker(time.Second * 3)
   for now := range ticker.C {
    err := conn.WriteMessage(websocket.TextMessage, []byte("Hello"+now.String()))
    if err != nil {
     return
    }
   }
  }()

啓動服務

// ws://localhost:8081/ws
http.ListenAndServe(":8081", nil)

以上實現了一個極其簡單的 WebSocket。

多客戶端協調

思路一

假如,我們的服務端都在同一個節點上,我們只需要在服務器內部做一個簡單的轉發機制就可以啦。

最簡單的做法就是,每次連接上一個客戶端,我們就將其保存在內存裏面。而後,如果從一個客戶端接收到了消息,就轉發給別的客戶端。

type Hub struct {
 // 封裝的 map key 爲房間號,value 爲房間內的所有連接
 conns *syncx.Map[string, *websocket.Conn]
}

func (h *Hub) AddConn(name string, conn *websocket.Conn) {
 h.conns.Store(name, conn)
 go func() {
  // 接收消息
  typ, msg, err := conn.ReadMessage()
  if err != nil {
   return
  }
  switch typ {
  case websocket.CloseMessage:
   h.conns.Delete(name)
   conn.Close()
   return
  default:
   // 廣播消息
   log.Println("from client:", typ, string(msg), name)
   h.conns.Range(func(key string, value *websocket.Conn) bool {
    if key == name {
     // 不發送給自己
     return true
    }
    log.Println("to client:", key)
    err := value.WriteMessage(typ, msg)
    if err != nil {
     log.Println(err)
    }
    return true
   })
  }
 }()
}

測試:

func TestForward(t *testing.T) {
 upgrader := websocket.Upgrader{}
 hub := &Hub{
  conns: &syncx.Map[string, *websocket.Conn]{},
 }
 http.HandleFunc("/ws", func(w http.ResponseWriter, r *http.Request) {
  c, err := upgrader.Upgrade(w, r, nil)
  if err != nil {
   w.Write([]byte("upgrade error"))
   return
  }
  name := r.URL.Query().Get("name")
  hub.AddConn(name, c)
 })
 // ws://localhost:8081/ws?name=ypb
 http.ListenAndServe(":8081", nil)
}

思路二

實現 WebSocket 多客戶端協調涉及管理多個 WebSocket 連接,並在必要時廣播消息或在客戶端之間傳遞消息。這通常通過維護一個連接池來實現。

定義一個結構體來封裝 WebSocket 連接,幷包含一個連接池來管理所有的連接。

// Ws websocket
type Ws struct {
 Conn *websocket.Conn
}

// ConnectionPool connection pool
type ConnectionPool struct {
 connections map[*websocket.Conn]bool
 lock        sync.Mutex
}

// Add new connection
func (p *ConnectionPool) Add(conn *websocket.Conn) {
 p.lock.Lock()
 defer p.lock.Unlock()
 p.connections[conn] = true
}

// Remove connection
func (p *ConnectionPool) Remove(conn *websocket.Conn) {
 p.lock.Lock()
 defer p.lock.Unlock()
 for _, ok := p.connections[conn]; ok; {
  delete(p.connections, conn)
  conn.Close()
 }
}

// Broadcast message
func (p *ConnectionPool) Broadcast(messageType int, message []byte) {
 p.lock.Lock()
 defer p.lock.Unlock()
 for conn := range p.connections {
  err := conn.WriteMessage(messageType, message)
  if err != nil {
   log.Printf("發送消息錯誤: %v", err)
   conn.Close()
   delete(p.connections, conn)
  }
 }
}

實現一個 SebSocket 處理函數。

func wsHandler(w http.ResponseWriter, r *http.Request) {
 upgrader := &websocket.Upgrader{}
 pool := &ConnectionPool{
  connections: make(map[*websocket.Conn]bool),
 }
 conn, err := upgrader.Upgrade(w, r, nil)
 if err != nil {
  http.Error(w, "無法升級到 WebSocket", http.StatusInternalServerError)
  return
 }

 // 添加新的連接到連接池
 pool.Add(conn)
 defer pool.Remove(conn)

 // 讀取消息
 for {
  messageType, message, err := conn.ReadMessage()
  if err != nil {
   log.Printf("讀取消息錯誤: %v", err)
   break
  }
  log.Printf("接收到消息: %s", message)

  // 廣播消息到所有連接
  pool.Broadcast(messageType, message)
 }
}

測試:

func Test_wsHandler(t *testing.T) {
 http.HandleFunc("/ws", wsHandler)

 go func() {
  server := gin.Default()
  server.GET("/", func(ctx *gin.Context) {
   ctx.String(http.StatusOK, "Hello, WebSocket!")
  })
  server.Run(":8082")
 }()

 log.Fatal(http.ListenAndServe(":8081", nil))
}

總結

以上就是本次關於 WebSocket 的講解,向大家介紹了 WebSocket 的發展歷程,數據報文字段(這部分不是重點),並且給大家講述瞭如何實現一個簡單的 WebSocket 樣例。

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/1SpVkCx8v3iYk91nfCjuAQ