Rust Websocket 設置心跳

WebSockets 是一種先進的技術。它可以在用戶的瀏覽器和服務器之間打開交互式通信會話。服務端可以主動給客戶端推送消息。相比於 HTTP,使用 Websocket,你可以從服務器主動向客戶端發送消息,而無需客戶端通過輪詢服務器的方式以獲得響應。

它基於 TCP,可以從 HTTP 升級爲 Websocket。常見的應用場景就是媒體聊天,彈幕,協同編輯,基於位置的應用,體育實況更新、股票基金報價實時更新,天氣預報等。

爲什麼我們要設置心跳呢?

有人會說 TCP 不是有 KeepAlive 機制麼,通過這個機制來實現不就可以了嗎?但是事實上,TCP KeepAlive 的機制其實並不適用於此。Keep Alive 機制開啓後,TCP 層將在定時時間到後發送相應的 KeepAlive 探針以確定連接可用性。一般時間爲 7200 s,失敗後重試 10 次,每次超時時間 75 s。顯然默認值無法滿足我們的需求,而修改過設置後就可以滿足了嗎?答案仍舊是否定的。

我們知道 TCP 是一個基於連接的協議,其連接狀態是由一個狀態機進行維護,連接完畢後,雙方都會處於 established 狀態,這之後的狀態並不會主動進行變化。這意味着如果上層不進行任何調用,一直使 TCP 連接空閒,那麼這個連接雖然沒有任何數據,但仍是保持連接狀態,一天、一星期、甚至一個月,即使在這期間中間路由崩潰重啓無數次。舉個現實中經常遇到的例子:當我們 ssh 到自己的 VPS 上,然後不小心踢掉網線,此時的網絡變化並不會被 TCP 檢測出,當我們重新插回網線,仍舊可以正常使用 ssh,同時此時並沒有發生任何 TCP 的重連。

實際的網絡環境要非常複雜,辦公的防火牆默認配置會釋放設置的時間內沒有數據的 TCP 連接。DHCP 租約到期。如果使用移動設備,則網絡更加複雜。移動設備爲了節省電量,切到後臺的應用會關閉網絡連接。移動設備使用的運營商網絡,由於無線網絡的頻帶資源相比計算機網絡的光纖傳輸帶寬而言稀缺得多, 無線信號所受到空中干擾大, 信號隨距離的衰減快, 要達到同樣的帶寬及同樣的覆蓋範圍, 配置密集基站的成本遠比建設光纖傳輸網要高得多, 正是因爲如此, 移動通信網絡中才需要頻繁地通過釋放和重新申請無線資源來對寶貴的無線資源進行復用。

Websocket 有 ping、pong 控制幀,我們只需要在約定的時間內發送 ping 幀,對端回覆 pong 幀。

我們可以在 ping 和 pong 幀附加數據。在 websocket 層發送心跳,可以檢測網絡鏈路是否通暢,應用是否阻塞假死等。

設置心跳策略有多種。

一是固定時間發送心跳,比如客戶端每隔 30 秒給服務端發送一個 ping 控制幀,或者服務端給客戶端發送一個 ping 幀,如果未在規定的時間內收到 pong 幀,則認定對端已經關閉,將 websocket 重新連接或者關閉。這種固定時間發送心跳,對於服務端來說,容易實現,設置定時器,或者設置讀超時,在規定的時間內,未收到 ping 控制幀,則判定客戶端連接已關閉,則將服務器端對於客戶端的 websocket 連接關閉。如果再優化一下,可以設置未收到 ping 心跳次數。例如當 ping 心跳 3 次還未收到時,則斷開客戶端的 websocket 連接。這樣可以解決因爲 ping 控制幀丟包的問題。還可以再優化一下,收到客戶端的發送的數據幀時,也將服務端讀超時重新重置。

二是非固定時間發送心跳,比如初始設置默認的雙方約定的心跳時間。當手機應用在前臺時,延長心跳時間,手機應用切到後臺時,縮短心跳時間。這裏主要考慮到手機應用切到後臺時,手機操作系統會關閉網絡連接,服務器端能儘快的檢查到客戶端的狀態,並釋放服務器的 websocket 資源。而如果用戶的手機應用在前臺,手機操作系統不會對網絡連接進行限制,從社會工程學來說,用戶也會使用應用進行瀏覽消息,或發送消息。下次心跳的時間,可以在本次的 ping 控制幀獲取。這樣非固定時間發送心跳,服務端比較難實現。

你有其他的更好的心跳算法,歡迎留言。

在 Rust 中,常用的 Websocket 庫有 tungstenite-rs,tokio-tungstenite,其中,tokio-tungstenite 基於 tugstenite-rs 開發,是異步調用。我們這裏以 tokio-tugstenite 爲例,在服務端,設置固定時間的心跳。

 let ws_stream = accept_async(stream).await.expect("Failed to accept");
    info!("New WebSocket connection: {}", peer);
    let (mut ws_sender, mut ws_receiver) = ws_stream.split();
    let mut interval = tokio::time::interval(Duration::from_millis(1000));
    // Echo incoming WebSocket messages and send a message periodically every second.
    loop {
        tokio::select! {
            msg = ws_receiver.next() => {
                match msg {
                    Some(msg) => {
                        let msg = msg?;
                        if msg.is_text() ||msg.is_binary() {
                            ws_sender.send(msg).await?;
                        } else if msg.is_close() {
                            break;
                        }
                    }
                    None => break,
                }
            }
            _ = interval.tick() => {
                let v:Vec<u8> = Vec::new();
                ws_sender.send(Message::Ping(v)).await?;
            }
        }
    }

其它庫類似的思路。下節介紹 websocket 設置超時。

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/NNLmSM0UJoBv2ym9xAi8ew