WebSocket 的 6 種集成方式

由於前段時間我實現了一個庫【Spring Cloud】一個配置註解實現 WebSocket 集羣方案

以至於我對 WebSocket 的各種集成方式做了一些研究,目前我所瞭解到的就是下面這些了(就一個破 ws 都有這麼多花裏胡哨的集成方式了?)

今天主要介紹一下前 3 種方式,畢竟現在的主流框架還是 Spring Boot

而後 3 種其實和 Spring Boot 並不強行綁定,基於 Java 就可以支持,不過我也會對後 3 種做個簡單的介紹,大家先混個眼熟就行了

那麼接下來我們就來講講前 3 種方式(Javax,WebMVC,WebFlux)在 Spring Boot 中的服務端和客戶端配置(客戶端配置也超重要的有木有,平時用不到,用到了卻基本找不到文檔,這也太絕望了)

Javax

在 java 的擴展包 javax.websocket 中就定義了一套 WebSocket 的接口規範

服務端

一般使用註解的方式來進行配置

第一步

@Component
@ServerEndpoint("/websocket/{type}")
public class JavaxWebSocketServerEndpoint {

    @OnOpen
    public void onOpen(Session session, EndpointConfig config,
                       @PathParam(value = "type") String type) {
        //連接建立
    }

    @OnClose
    public void onClose(Session session, CloseReason reason) {
        //連接關閉
    }

    @OnMessage
    public void onMessage(Session session, String message) {
        //接收文本信息
    }

    @OnMessage
    public void onMessage(Session session, PongMessage message) {
        //接收pong信息
    }

    @OnMessage
    public void onMessage(Session session, ByteBuffer message) {
        //接收二進制信息,也可以用byte[]接收
    }

    @OnError
    public void onError(Session session, Throwable e) {
        //異常處理
    }
}

我們在類上添加 @ServerEndpoint 註解來表示這是一個服務端點,同時可以在註解中配置路徑,這個路徑可以配置成動態的,使用 {} 包起來就可以了

第二步

implementation 'org.springframework.boot:spring-boot-starter-websocket'
@Configuration(proxyBeanMethods = false)
public class JavaxWebSocketConfiguration {

    @Bean
    public ServerEndpointExporter serverEndpointExporter() {
        return new ServerEndpointExporter();
    }
}

依賴 Spring 的 WebSocket 模塊,手動注入 ServerEndpointExporter 就可以了

需要注意 ServerEndpointExporter 是 Spring 中的類,算是 Spring 爲了支持 javax.websocket 的原生用法所提供的支持類

冷知識

javax.websocket 庫中定義了 PongMessage 而沒有 PingMessage

通過我的測試發現基本上所有的 WebSocket 包括前端 js 自帶的,都實現了自動回覆;也就是說當接收到一個 ping 消息之後,是會自動迴應一個 pong 消息,所以沒有必要再自己接受 ping 消息來處理了,即我們不會接受到 ping 消息;

當然我上面講的 ping 和 pong 都是需要使用框架提供的 api,如果是我們自己通過 Message 來自定義心跳數據的話是沒有任何的處理的,下面是對應的 api

//發送ping
session.getAsyncRemote().sendPing(ByteBuffer buffer);

//發送pong
session.getAsyncRemote().sendPong(ByteBuffer buffer);

然後我又發現 js 自帶的 WebSocket 是沒有發送 ping 的 api 的,所以是不是可以猜想當初就是約定服務端發送 ping,客戶端回覆 pong

客戶端

客戶端也是使用註解配置

第一步

@ClientEndpoint
public class JavaxWebSocketClientEndpoint {

    @OnOpen
    public void onOpen(Session session) {
        //連接建立
    }

    @OnClose
    public void onClose(Session session, CloseReason reason) {
        //連接關閉
    }

    @OnMessage
    public void onMessage(Session session, String message) {
        //接收文本消息
    }

    @OnMessage
    public void onMessage(Session session, PongMessage message) {
        //接收pong消息
    }

    @OnMessage
    public void onMessage(Session session, ByteBuffer message) {
        //接收二進制消息
    }

    @OnError
    public void onError(Session session, Throwable e) {
        //異常處理
    }
}

客戶端使用 @ClientEndpoint 來標記,其他的 @OnOpen,@OnClose,@OnMessage,@OnError 和服務端一模一樣

第二步

WebSocketContainer container = ContainerProvider.getWebSocketContainer();
Session session = container.connectToServer(JavaxWebSocketClientEndpoint.class, uri);

我們可以通過 ContainerProvider 來獲得一個 WebSocketContainer,然後調用 connectToServer 方法將我們的客戶端類和連接的 uri 傳入就行了

冷知識

通過 ContainerProvider#getWebSocketContainer 獲得 WebSocketContainer 其實是基於 SPI 實現的

在 Spring 的環境中我更推薦大家使用 ServletContextAware 來獲得,代碼如下

@Component
public class JavaxWebSocketContainer implements ServletContextAware {

    private volatile WebSocketContainer container;

    public WebSocketContainer getContainer() {
        if (container == null) {
            synchronized (this) {
                if (container == null) {
                    container = ContainerProvider.getWebSocketContainer();
                }
            }
        }
        return container;
    }

    @Override
    public void setServletContext(@NonNull ServletContext servletContext) {
        if (container == null) {
            container = (WebSocketContainer) servletContext
                .getAttribute("javax.websocket.server.ServerContainer");
        }
    }
}

發消息

Session session = ...

//發送文本消息
session.getAsyncRemote().sendText(String message);

//發送二進制消息
session.getAsyncRemote().sendBinary(ByteBuffer message);

//發送對象消息,會嘗試使用Encoder編碼
session.getAsyncRemote().sendObject(Object message);

//發送ping
session.getAsyncRemote().sendPing(ByteBuffer buffer);

//發送pong
session.getAsyncRemote().sendPong(ByteBuffer buffer);

WebMVC

依賴肯定是必不可少的

implementation 'org.springframework.boot:spring-boot-starter-websocket'

服務端

第一步

import org.springframework.web.socket.WebSocketHandler;
import org.springframework.web.socket.WebSocketMessage;
import org.springframework.web.socket.WebSocketSession;

public class ServletWebSocketServerHandler implements WebSocketHandler {

    @Override
    public void afterConnectionEstablished(@NonNull WebSocketSession session) throws Exception {
        //連接建立
    }

    @Override
    public void handleMessage(@NonNull WebSocketSession session, @NonNull WebSocketMessage<?> message) throws Exception {
        //接收消息
    }

    @Override
    public void handleTransportError(@NonNull WebSocketSession session, @NonNull Throwable exception) throws Exception {
        //異常處理
    }

    @Override
    public void afterConnectionClosed(@NonNull WebSocketSession session, @NonNull CloseStatus closeStatus) throws Exception {
        //連接關閉
    }

    @Override
    public boolean supportsPartialMessages() {
        //是否支持接收不完整的消息
        return false;
    }
}

我們實現一個 WebSocketHandler 來處理 WebSocket 的連接,關閉,消息和異常

第二步

@Configuration
@EnableWebSocket
public class ServletWebSocketServerConfigurer implements WebSocketConfigurer {

    @Override
    public void registerWebSocketHandlers(@NonNull WebSocketHandlerRegistry registry) {
        registry
            //添加處理器到對應的路徑
            .addHandler(new ServletWebSocketServerHandler()"/websocket")
            .setAllowedOrigins("*");
    }
}

首先需要添加 @EnableWebSocket 來啓用 WebSocket

然後實現 WebSocketConfigurer 來註冊 WebSocket 路徑以及對應的 WebSocketHandler。另外,搜索公衆號 Linux 就該這樣學後臺回覆 “Linux”,獲取一份驚喜禮包。

握手攔截

提供了 HandshakeInterceptor 來攔截握手

@Configuration
@EnableWebSocket
public class ServletWebSocketServerConfigurer implements WebSocketConfigurer {

    @Override
    public void registerWebSocketHandlers(@NonNull WebSocketHandlerRegistry registry) {
        registry
            //添加處理器到對應的路徑
            .addHandler(new ServletWebSocketServerHandler()"/websocket")
            //添加握手攔截器
            .addInterceptors(new ServletWebSocketHandshakeInterceptor())
            .setAllowedOrigins("*");
    }
    
    public static class ServletWebSocketHandshakeInterceptor implements HandshakeInterceptor {

        @Override
        public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Map<String, Object> attributes) throws Exception {
            //握手之前
            //繼續握手返回true, 中斷握手返回false
            return false;
        }

        @Override
        public void afterHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Exception exception) {
            //握手之後
        }
    }
}

冷知識

我在集成的時候發現這種方式沒辦法動態匹配路徑,它的路徑就是固定的,沒辦法使用如 / websocket/** 這樣的通配符

我在研究了一下之後發現可以在 UrlPathHelper 上做點文章

@Configuration
@EnableWebSocket
public class ServletWebSocketServerConfigurer implements WebSocketConfigurer {

    @Override
    public void registerWebSocketHandlers(@NonNull WebSocketHandlerRegistry registry) {
        if (registry instanceof ServletWebSocketHandlerRegistry) {
            //替換UrlPathHelper
            ((ServletWebSocketHandlerRegistry) registry)
                .setUrlPathHelper(new PrefixUrlPathHelper("/websocket"));
        }

        registry
            //添加處理器到對應的路徑
            .addHandler(new ServletWebSocketServerHandler()"/websocket/**")
            .setAllowedOrigins("*");
    }
    
    public class PrefixUrlPathHelper extends UrlPathHelper {

        private String prefix;

        @Override
        public @NonNull String resolveAndCacheLookupPath(@NonNull HttpServletRequest request) {
            //獲得原本的Path
            String path = super.resolveAndCacheLookupPath(request);
            //如果是指定前綴就返回對應的通配路徑
            if (path.startsWith(prefix)) {
                return prefix + "/**";
            }
            return path;
        }
    }
}

因爲它內部實際上就是用一個Map<String, WebSocketHandler>來存的,所以沒有辦法用通配符

主要是有現成的 AntPathMatcher 實現通配應該不麻煩纔對啊

客戶端

第一步

public class ServletWebSocketClientHandler implements WebSocketHandler {

    @Override
    public void afterConnectionEstablished(@NonNull WebSocketSession session) throws Exception {
        //連接建立
    }

    @Override
    public void handleMessage(@NonNull WebSocketSession session, @NonNull WebSocketMessage<?> message) throws Exception {
        //接收消息
    }

    @Override
    public void handleTransportError(@NonNull WebSocketSession session, @NonNull Throwable exception) throws Exception {
        //異常處理
    }

    @Override
    public void afterConnectionClosed(@NonNull WebSocketSession session, @NonNull CloseStatus closeStatus) throws Exception {
        //連接關閉
    }

    @Override
    public boolean supportsPartialMessages() {
        //是否支持接收不完整的消息
        return false;
    }
}

和服務端一樣我們需要先實現一個 WebSocketHandler 來處理 WebSocket 的連接,關閉,消息和異常

第二步

WebSocketClient client = new StandardWebSocketClient();
WebSocketHandler handler = new ServletWebSocketClientHandler();
WebSocketConnectionManager manager = new WebSocketConnectionManager(client, handler, uri);
manager.start();

首先我們需要先 new 一個 StandardWebSocketClient,可以傳入一個 WebSocketContainer 參數,獲得該對象的方式我之前已經介紹過了,這邊就先略過

然後 new 一個 WebSocketConnectionManager 傳入 WebSocketClient,WebSocketHandler 還有路徑 uri

最後調用一下 WebSocketConnectionManager 的 start 方法就可以啦

冷知識

這裏如果大家去看 WebSocketClient 的實現類就會發現有 StandardWebSocketClient 還有 JettyWebSocketClient 等等,所以大家可以根據自身項目所使用的容器來選擇不同的 WebSocketClient 實現類

這裏給大家貼一小段 Spring 適配不同容器 WebSocket 的代碼

public abstract class AbstractHandshakeHandler implements HandshakeHandler, Lifecycle {

    private static final boolean tomcatWsPresent;

    private static final boolean jettyWsPresent;

    private static final boolean jetty10WsPresent;

    private static final boolean undertowWsPresent;

    private static final boolean glassfishWsPresent;

    private static final boolean weblogicWsPresent;

    private static final boolean websphereWsPresent;

    static {
        ClassLoader classLoader = AbstractHandshakeHandler.class.getClassLoader();
        tomcatWsPresent = ClassUtils.isPresent(
            "org.apache.tomcat.websocket.server.WsHttpUpgradeHandler", classLoader);
        jetty10WsPresent = ClassUtils.isPresent(
            "org.eclipse.jetty.websocket.server.JettyWebSocketServerContainer", classLoader);
        jettyWsPresent = ClassUtils.isPresent(
            "org.eclipse.jetty.websocket.server.WebSocketServerFactory", classLoader);
        undertowWsPresent = ClassUtils.isPresent(
            "io.undertow.websockets.jsr.ServerWebSocketContainer", classLoader);
        glassfishWsPresent = ClassUtils.isPresent(
            "org.glassfish.tyrus.servlet.TyrusHttpUpgradeHandler", classLoader);
        weblogicWsPresent = ClassUtils.isPresent(
            "weblogic.websocket.tyrus.TyrusServletWriter", classLoader);
        websphereWsPresent = ClassUtils.isPresent(
            "com.ibm.websphere.wsoc.WsWsocServerContainer", classLoader);
    }
}

發消息

import org.springframework.web.socket.*;

WebSocketSession session = ...

//發送文本消息
session.sendMessage(new TextMessage(CharSequence message);

//發送二進制消息
session.sendMessage(new BinaryMessage(ByteBuffer message));

//發送ping
session.sendMessage(new PingMessage(ByteBuffer message));

//發送pong
session.sendMessage(new PongMessage(ByteBuffer message));

WebFlux

WebFlux 的 WebSocket 不需要額外的依賴包

服務端

第一步

import org.springframework.web.reactive.socket.WebSocketHandler;
import org.springframework.web.reactive.socket.WebSocketSession;

public class ReactiveWebSocketServerHandler implements WebSocketHandler {

    @NonNull
    @Override
    public Mono<Void> handle(WebSocketSession session) {
        Mono<Void> send = session.send(Flux.create(sink -> {
            //可以持有sink對象在任意時候調用next發送消息
            sink.next(WebSocketMessage message);
        })).doOnError(it -> {
            //異常處理
        });

        Mono<Void> receive = session.receive()
                .doOnNext(it -> {
                    //接收消息
                })
                .doOnError(it -> {
                    //異常處理
                })
                .then();

        @SuppressWarnings("all")
        Disposable disposable = session.closeStatus()
                .doOnError(it -> {
                    //異常處理
                })
                .subscribe(it -> {
                    //連接關閉
                });

        return Mono.zip(send, receive).then();
    }
}

首先需要注意這裏的 WebSocketHandler 和 WebSocketSession 是 reactive 包下的

第二步

@Component
public class ReactiveWebSocketServerHandlerMapping extends SimpleUrlHandlerMapping {

    public ReactiveWebSocketServerHandlerMapping() {
        Map<String, WebSocketHandler> map = new HashMap<>();
        map.put("/websocket/**", new ReactiveWebSocketServerHandler());
        setUrlMap(map);
        setOrder(100);
    }
}

註冊一個 HandlerMapping 同時配置路徑和對應的 WebSocketHandler

第三步

@Configuration(proxyBeanMethods = false)
public class ReactiveWebSocketConfiguration {

    @Bean
    public WebSocketHandlerAdapter webSocketHandlerAdapter() {
        return new WebSocketHandlerAdapter();
    }
}

注入 WebSocketHandlerAdapter

冷知識

我們自定義的 HandlerMapping 需要設置 order,如果不設置,默認爲 Ordered.LOWEST_PRECEDENCE,會導致這個 HandlerMapping 被放在最後,當有客戶端連接上來時會被其他的 HandlerMapping 優先匹配上而連接失敗

客戶端

第一步

public class ReactiveWebSocketClientHandler implements WebSocketHandler {

    @NonNull
    @Override
    public Mono<Void> handle(WebSocketSession session) {
        Mono<Void> send = session.send(Flux.create(sink -> {
            //可以持有sink對象在任意時候調用next發送消息
            sink.next(WebSocketMessage message);
        })).doOnError(it -> {
            //處理異常
        });

        Mono<Void> receive = session.receive()
                .doOnNext(it -> {
                    //接收消息
                })
                .doOnError(it -> {
                    //異常處理
                })
                .then();

        @SuppressWarnings("all")
        Disposable disposable = session.closeStatus()
                .doOnError(it -> {
                    //異常處理
                })
                .subscribe(it -> {
                    //連接關閉
                });

        return Mono.zip(send, receive).then();
    }
}

客戶端 WebSocketHandler 的寫法和服務端的一樣

第二步

import org.springframework.web.reactive.socket.client.WebSocketClient;

WebSocketClient client = ReactorNettyWebSocketClient();
WebSocketHandler handler = new ReactiveWebSocketClientHandler();
client.execute(uri, handler).subscribe();

首先我們需要先 new 一個 ReactorNettyWebSocketClient

然後調用一下 WebSocketClient 的 execute 方法傳入路徑 uri 和 WebSocketHandler 並繼續調用 subscribe 方法就行啦

冷知識

和 WebMVC 中的 WebSocketClient 一樣,Reactive 包中的 WebSocketClient 也有很多實現類,比如 ReactorNettyWebSocketClient,JettyWebSocketClient,UndertowWebSocketClient,TomcatWebSocketClient 等等,也是需要大家基於自身項目的容器使用不同的實現類

這裏也給大家貼一小段 Reactive 適配不同容器 WebSocket 的代碼

public class HandshakeWebSocketService implements WebSocketService, Lifecycle {

    private static final boolean tomcatPresent;

    private static final boolean jettyPresent;

    private static final boolean jetty10Present;

    private static final boolean undertowPresent;

    private static final boolean reactorNettyPresent;

    static {
        ClassLoader loader = HandshakeWebSocketService.class.getClassLoader();
        tomcatPresent = ClassUtils.isPresent("org.apache.tomcat.websocket.server.WsHttpUpgradeHandler", loader);
        jettyPresent = ClassUtils.isPresent("org.eclipse.jetty.websocket.server.WebSocketServerFactory", loader);
        jetty10Present = ClassUtils.isPresent("org.eclipse.jetty.websocket.server.JettyWebSocketServerContainer", loader);
        undertowPresent = ClassUtils.isPresent("io.undertow.websockets.WebSocketProtocolHandshakeHandler", loader);
        reactorNettyPresent = ClassUtils.isPresent("reactor.netty.http.server.HttpServerResponse", loader);
    }
}

發消息

我們需要使用在 WebSocketHandler 中獲得的FluxSink<WebSocketMessage>來發送消息

import org.springframework.web.reactive.socket.CloseStatus;
import org.springframework.web.reactive.socket.WebSocketMessage;
import org.springframework.web.reactive.socket.WebSocketSession;

public class ReactiveWebSocket {

    private final WebSocketSession session;

    private final FluxSink<WebSocketMessage> sender;

    public ReactiveWebSocket(WebSocketSession session, FluxSink<WebSocketMessage> sender) {
        this.session = session;
        this.sender = sender;
    }

    public String getId() {
        return session.getId();
    }

    public URI getUri() {
        return session.getHandshakeInfo().getUri();
    }

    public void send(Object message) {
        if (message instanceof WebSocketMessage) {
            sender.next((WebSocketMessage) message);
        } else if (message instanceof String) {
            //發送文本消息
            sender.next(session.textMessage((String) message));
        } else if (message instanceof DataBuffer) {
            //發送二進制消息
            sender.next(session.binaryMessage(factory -> (DataBuffer) message));
        } else if (message instanceof ByteBuffer) {
            發送二進制消息
            sender.next(session.binaryMessage(factory -> factory.wrap((ByteBuffer) message)));
        } else if (message instanceof byte[]) {
            發送二進制消息
            sender.next(session.binaryMessage(factory -> factory.wrap((byte[]) message)));
        } else {
            throw new IllegalArgumentException("Message type not match");
        }
    }

    public void ping() {
        //發送ping
        sender.next(session.pingMessage(factory -> factory.wrap(ByteBuffer.allocate(0))));
    }

    public void pong() {
        //發送pong
        sender.next(session.pongMessage(factory -> factory.wrap(ByteBuffer.allocate(0))));
    }

    public void close(CloseStatus reason) {
        sender.complete();
        session.close(reason).subscribe();
    }
}

Java-WebSocket

這是一個純 java 的第三方庫,專門用於實現 WebSocket

Github 上已經有很詳細的使用教程了,現在有 9k + 的 Star

傳送門:https://github.com/TooTallNate/Java-WebSocket

SocketIO

該庫使用的協議是經過自己封裝的,支持很多的語言,提供了統一的接口,所以需要使用它提供的 Server 和 Client 來連接,如 socket.io-server-java 和 socket.io-client-java

這個庫我瞭解下來主要用於實時聊天等場景,所以如果只是普通的 WebSocket 功能就有點大材小用了

Github 上也有非常詳細的使用文檔,大家如果有興趣可以研究一下

傳送門:https://github.com/socketio

Netty

這個大家應該都比較熟悉了,就算沒用過肯定也聽過

作者:黑白法師

來源:blog.csdn.net/m0_64360721/article/details/125384766

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/yescDWi2JR5zzUD_fLMEUg