WebSocket 集羣解決方案

問題起因

最近做項目時遇到了需要多用戶之間通信的問題,涉及到了 WebSocket 握手請求,以及集羣中 WebSocket Session 共享的問題。

期間我經過了幾天的研究,總結出了幾個實現分佈式 WebSocket 集羣的辦法,從 zuul 到 spring cloud gateway 的不同嘗試,總結出了這篇文章,希望能幫助到某些人,並且能一起分享這方面的想法與研究。

以下是我的場景描述

系統架構圖

在我的實現裏,每個應用服務器都負責 http and ws 請求,其實也可以將 ws 請求建立的聊天模型單獨成立爲一個模塊。從分佈式的角度來看,這兩種實現類型差不多,但從實現方便性來說,一個應用服務 http+ws 請求的方式更爲方便。下文會有解釋

本文涉及的技術棧

相信能走到這一步的人都瞭解過我上面列舉的技術棧了,如果還沒有,可以先去網上找找入門教程瞭解一下。下面的內容都與上述技術相關,題主默認大家都瞭解過了...

技術可行性分析

下面我將描述 session 特性,以及根據這些特性列舉出 n 個解決分佈式架構中處理 ws 請求的集羣方案

WebSocketSession 與 HttpSession

在 Spring 所集成的 WebSocket 裏面,每個 ws 連接都有一個對應的 session:WebSocketSession,在 Spring WebSocket 中,我們建立 ws 連接之後可以通過類似這樣的方式進行與客戶端的通信:

protected void handleTextMessage(WebSocketSession session, TextMessage message) {
   System.out.println("服務器接收到的消息: "+ message );
   //send message to client
   session.sendMessage(new TextMessage("message"));
}

那麼問題來了:ws 的 session 無法序列化到 redis,因此在集羣中,我們無法將所有 WebSocketSession 都緩存到 redis 進行 session 共享。每臺服務器都有各自的 session。於此相反的是 HttpSession,redis 可以支持 httpsession 共享,但是目前沒有 websocket session 共享的方案,因此走 redis websocket session 共享這條路是行不通的。

有的人可能會想:我可不可以將 sessin 關鍵信息緩存到 redis,集羣中的服務器從 redis 拿取 session 關鍵信息然後重新構建 websocket session... 我只想說這種方法如果有人能試出來,請告訴我一聲...

以上便是 websocket session 與 http session 共享的區別,總的來說就是 http session 共享已經有解決方案了,而且很簡單,只要引入相關依賴:spring-session-data-redisspring-boot-starter-redis,大家可以從網上找個 demo 玩一下就知道怎麼做了。而 websocket session 共享的方案由於 websocket 底層實現的方式,我們無法做到真正的 websocket session 共享。

解決方案的演變

Netty 與 Spring WebSocket

剛開始的時候,我嘗試着用 netty 實現了 websocket 服務端的搭建。在 netty 裏面,並沒有 websocket session 這樣的概念,與其類似的是 channel,每一個客戶端連接都代表一個 channel。前端的 ws 請求通過 netty 監聽的端口,走 websocket 協議進行 ws 握手連接之後,通過一些列的 handler(責鏈模式)進行消息處理。與 websocket session 類似地,服務端在連接建立後有一個 channel,我們可以通過 channel 進行與客戶端的通信

   /**
    * TODO 根據服務器傳進來的id,分配到不同的group
    */
   private static final ChannelGroup GROUP = new DefaultChannelGroup(ImmediateEventExecutor.INSTANCE);
 
   @Override
   protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
       //retain增加引用計數,防止接下來的調用引用失效
       System.out.println("服務器接收到來自 " + ctx.channel().id() + " 的消息: " + msg.text());
       //將消息發送給group裏面的所有channel,也就是發送消息給客戶端
       GROUP.writeAndFlush(msg.retain());
   }

那麼,服務端用 netty 還是用 spring websocket?以下我將從幾個方面列舉這兩種實現方式的優缺點

使用 netty 實現 websocket

玩過 netty 的人都知道 netty 是的線程模型是 nio 模型,併發量非常高,spring5 之前的網絡線程模型是 servlet 實現的,而 servlet 不是 nio 模型,所以在 spring5 之後,spring 的底層網絡實現採用了 netty。如果我們單獨使用 netty 來開發 websocket 服務端,速度快是絕對的,但是可能會遇到下列問題:

  1. 與系統的其他應用集成不方便,在 rpc 調用的時候,無法享受 springcloud 裏 feign 服務調用的便利性

  2. 業務邏輯可能要重複實現

  3. 使用 netty 可能需要重複造輪子

  4. 怎麼連接上服務註冊中心,也是一件麻煩的事情

  5. restful 服務與 ws 服務需要分開實現,如果在 netty 上實現 restful 服務,有多麻煩可想而知,用 spring 一站式 restful 開發相信很多人都習慣了。

使用 spring websocket 實現 ws 服務

spring websocket 已經被 springboot 很好地集成了,所以在 springboot 上開發 ws 服務非常方便,做法非常簡單

第一步:添加依賴

<dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-websocket</artifactId>
</dependency>

第二步:添加配置類

@Configuration
public class WebSocketConfig implements WebSocketConfigurer {
@Override
public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
    registry.addHandler(myHandler()"/")
        .setAllowedOrigins("*");
}
 
@Bean
 public WebSocketHandler myHandler() {
     return new MessageHandler();
 }
}

第三步:實現消息監聽類

@Component
@SuppressWarnings("unchecked")
public class MessageHandler extends TextWebSocketHandler {
   private List<WebSocketSession> clients = new ArrayList<>();
 
   @Override
   public void afterConnectionEstablished(WebSocketSession session) {
       clients.add(session);
       System.out.println("uri :" + session.getUri());
       System.out.println("連接建立: " + session.getId());
       System.out.println("current seesion: " + clients.size());
   }
 
   @Override
   public void afterConnectionClosed(WebSocketSession session, CloseStatus status) {
       clients.remove(session);
       System.out.println("斷開連接: " + session.getId());
   }
 
   @Override
   protected void handleTextMessage(WebSocketSession session, TextMessage message) {
       String payload = message.getPayload();
       Map<String, String> map = JSONObject.parseObject(payload, HashMap.class);
       System.out.println("接受到的數據" + map);
       clients.forEach(s -> {
           try {
               System.out.println("發送消息給: " + session.getId());
               s.sendMessage(new TextMessage("服務器返回收到的信息," + payload));
           } catch (Exception e) {
               e.printStackTrace();
           }
       });
   }
}

從這個 demo 中,使用 spring websocket 實現 ws 服務的便利性大家可想而知了。爲了能更好地向 spring cloud 大家族看齊,我最終採用了 spring websocket 實現 ws 服務。

因此我的應用服務架構是這樣子的:一個應用既負責 restful 服務,也負責 ws 服務。沒有將 ws 服務模塊拆分是因爲拆分出去要使用 feign 來進行服務調用。第一本人比較懶惰,第二拆分與不拆分相差在多了一層服務間的 io 調用,所以就沒有這麼做了。

學習資料:Java 進階視頻資源

從 zuul 技術轉型到 spring cloud gateway

要實現 websocket 集羣,我們必不可免地得從 zuul 轉型到 spring cloud gateway。原因如下:

zuul1.0 版本不支持 websocket 轉發,zuul 2.0 開始支持 websocket,zuul2.0 幾個月前開源了,但是 2.0 版本沒有被 spring boot 集成,而且文檔不健全。因此轉型是必須的,同時轉型也很容易實現。

在 gateway 中,爲了實現 ssl 認證和動態路由負載均衡,yml 文件中以下的某些配置是必須的,在這裏提前避免大家採坑

server:
  port: 443
  ssl:
    enabled: true
    key-store: classpath:xxx.jks
    key-store-password: xxxx
    key-store-type: JKS
    key-alias: alias
spring:
  application:
    name: api-gateway
  cloud:
    gateway:
      httpclient:
        ssl:
          handshake-timeout-millis: 10000
          close-notify-flush-timeout-millis: 3000
          close-notify-read-timeout-millis: 0
          useInsecureTrustManager: true
      discovery:
        locator:
          enabled: true
          lower-case-service-id: true
      routes:
      - id: dc
        uri: lb://dc
        predicates:
        - Path=/dc/**
      - id: wecheck
        uri: lb://wecheck
        predicates:
        - Path=/wecheck/**

如果要愉快地玩 https 卸載,我們還需要配置一個 filter,否則請求網關時會出現錯誤 not an SSL/TLS record

@Component
public class HttpsToHttpFilter implements GlobalFilter, Ordered {
  private static final int HTTPS_TO_HTTP_FILTER_ORDER = 10099;
  @Override
  public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
      URI originalUri = exchange.getRequest().getURI();
      ServerHttpRequest request = exchange.getRequest();
      ServerHttpRequest.Builder mutate = request.mutate();
      String forwardedUri = request.getURI().toString();
      if (forwardedUri != null && forwardedUri.startsWith("https")) {
          try {
              URI mutatedUri = new URI("http",
                      originalUri.getUserInfo(),
                      originalUri.getHost(),
                      originalUri.getPort(),
                      originalUri.getPath(),
                      originalUri.getQuery(),
                      originalUri.getFragment());
              mutate.uri(mutatedUri);
          } catch (Exception e) {
              throw new IllegalStateException(e.getMessage(), e);
          }
      }
      ServerHttpRequest build = mutate.build();
      ServerWebExchange webExchange = exchange.mutate().request(build).build();
      return chain.filter(webExchange);
  }
 
  @Override
  public int getOrder() {
      return HTTPS_TO_HTTP_FILTER_ORDER;
  }
}

這樣子我們就可以使用 gateway 來卸載 https 請求了,到目前爲止,我們的基本框架已經搭建完畢,網關既可以轉發 https 請求,也可以轉發 wss 請求。接下來就是用戶多對多之間 session 互通的通訊解決方案了。接下來,我將根據方案的優雅性,從最不優雅的方案開始講起。

session 廣播

這是最簡單的 websocket 集羣通訊解決方案。場景如下:

教師 A 想要羣發消息給他的學生們

session 廣播實現很簡單,但是有一個致命缺陷:計算力浪費現象,當服務器沒有消息接收者 session 的時候,相當於浪費了一次循環遍歷的計算力,該方案在併發需求不高的情況下可以優先考慮,實現很容易。

spring cloud 中獲取服務集羣中每臺服務器信息的方法如下

@Resource
private EurekaClient eurekaClient;
 
Application app = eurekaClient.getApplication("service-name");
//instanceInfo包括了一臺服務器ip,port等消息
InstanceInfo instanceInfo = app.getInstances().get(0);
System.out.println("ip address: " + instanceInfo.getIPAddr());

服務器需要維護關係映射表,將用戶的 id 與 session 做映射,session 建立時在映射表中添加映射關係,session 斷開後要刪除映射表內關聯關係

一致性哈希算法實現(本文的要點)

這種方法是本人認爲最優雅的實現方案,理解這種方案需要一定的時間,如果你耐心看下去,相信你一定會有所收穫。再強調一次,不瞭解一致性哈希算法的同學請先看這裏,現先假設哈希環是順時針查找的。

首先,想要將一致性哈希算法的思想應用到我們的 websocket 集羣,我們需要解決以下新問題:

在集羣中,總會出現服務 UP/DOWN 的問題。

針對節點 DOWN 的問題分析如下:

一個服務器 DOWN 的時候,其擁有的 websocket session 會自動關閉連接,並且前端會收到通知。此時會影響到哈希環的映射錯誤。我們只需要當監聽到服務器 DOWN 的時候,刪除哈希環上面對應的實際結點和虛結點,避免讓網關轉發到狀態是 DOWN 的服務器上。

實現方法:在 eureka 治理中心監聽集羣服務 DOWN 事件,並及時更新哈希環。

學習資料:Java 進階視頻資源

針對節點 UP 的問題分析如下:

現假設集羣中有服務 CacheB 上線了,該服務器的 ip 地址剛好被映射到 key1 和 cacheA 之間。那麼 key1 對應的用戶每次要發消息時都跑去 CacheB 發送消息,結果明顯是發送不了消息,因爲 CacheB 沒有 key1 對應的 session。

此時我們有兩種解決方案。

方案 A 簡單,動作大:

eureka 監聽到節點 UP 事件之後,根據現有集羣信息,更新哈希環。並且斷開所有 session 連接,讓客戶端重新連接,此時客戶端會連接到更新後的哈希環節點,以此避免消息無法送達的情況。

方案 B 複雜,動作小:

我們先看看沒有虛擬節點的情況,假設 CacheC 和 CacheA 之間上線了服務器 CacheB。所有映射在 CacheC 到 CacheB 的用戶發消息時都會去 CacheB 裏面找 session 發消息。也就是說 CacheB 一但上線,便會影響到 CacheC 到 CacheB 之間的用戶發送消息。所以我們只需要將 CacheA 斷開 CacheC 到 CacheB 的用戶所對應的 session,讓客戶端重連。

接下來是有虛擬節點的情況,假設淺色的節點是虛擬節點。我們用長括號來代表某段區域映射的結果屬於某個 Cache。首先是 C 節點未上線的情況。圖大家應該都懂吧,所有 B 的虛擬節點都會指向真實的 B 節點,所以所有 B 節點逆時針那一部分都會映射到 B(因爲我們規定哈希環順時針查找)。

接下來是 C 節點上線的情況,可以看到某些區域被 C 佔領了。

由以上情況我們可以知道:節點上線,會有許多對應虛擬節點也同時上線,因此我們需要將多段範圍 key 對應的 session 斷開連接(上圖紅色的部分)。具體算法有點複雜,實現的方式因人而異,大家可以嘗試一下自己實現算法。

哈希環應該放在哪裏?

至此我們的 spring websocket 集羣已經搭建的差不多了,最重要的地方還是一致性哈希算法。現在有最後一個技術瓶頸,網關如何根據 ws 請求轉發到指定的集羣服務器上?

答案在負載均衡。spring cloud gateway 或 zuul 都默認集成了 ribbon 作爲負載均衡,我們只需要根據建立 ws 請求時客戶端發來的 user id,重寫 ribbon 負載均衡算法,根據 user id 進行 hash,並在哈希環上尋找 ip,並將 ws 請求轉發到該 ip 便完事了。流程如下圖所示:

接下來用戶溝通的時候,只需要根據 id 進行 hash,在哈希環上獲取對應 ip,便可以知道與該用戶建立 ws 連接時的 session 存在哪臺服務器上了!

spring cloud Finchley.RELEASE 版本中 ribbon 未完善的地方

題主在實際操作的時候發現了 ribbon 兩個不完善的地方......

難道這樣子我們就沒有辦法了嗎?其實還有一個可行並且暫時可替代的辦法!

如下圖所示,客戶端發送一個普通的 http 請求(包含 id 參數)給網關,網關根據 id 進行 hash,在哈希環中尋找 ip 地址,將 ip 地址返回給客戶端,客戶端再根據該 ip 地址進行 ws 請求。

由於 ribbon 未完善 key 的處理,我們暫時無法在 ribbon 上實現一致性哈希算法。只能間接地通過客戶端發起兩次請求(一次 http,一次 ws)的方式來實現一致性哈希。希望不久之後 ribbon 能更新這個缺陷!讓我們的 websocket 集羣實現得更優雅一點。

後記

以上便是我這幾天探索的結果。期間遇到了許多問題,並逐一解決難題,列出兩個 websocket 集羣解決方案。第一個是 session 廣播,第二個是一致性哈希。

這兩種方案針對不同場景各有優缺點,本文並未用到 ActiveMQ,Karfa 等消息隊列實現消息推送,只是想通過自己的想法,不依靠消息隊列來簡單地實現多用戶之間的長連接通訊。希望能爲大家提供一條不同於尋常的思路。

來源:blog.csdn.net/weixin_34194702/

article/details/88701309

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/or5Rq-e5KkiVKVRTt48aGQ