Rust 使用 websocket 創建聊天室

websocket 是基於 TCP 的一種網絡協議。它實現了瀏覽器與服務器的全雙工通信,允許服務器主動發送消息給客戶端。它適用於需要服務器實時推送的場景。例如彈幕,社交聊天,在線教育,股票,體育實況等。

我們使用 rust 實現了一個聊天室的服務端,可以搜索使用在線的 websocket 客戶端就測試,你也可以自行使用熟悉的語言開發客戶端。

1,我們使用 map 存儲 websocket 的地址和消息通道的發射端。爲什麼使用消息通道,而不直接使用 websocket,很多原因,主要的原因爲方便在多個線程之間不用鎖進行通信,增加緩存。

2,我們將收到的消息轉發給聊天室中的其它客戶端,map 存儲了所有客戶端的 socket 地址和消息通道的發射端。我們迭代 map,將消息發送給其他客戶端的消息通道。

type Tx = mpsc::UnboundedSender<Message>;
type Rx = mpsc::UnboundedReceiver<Message>;
struct Shared {
    peers: HashMap<SocketAddr, Tx>,
}
struct Peer {
    stream: WebSocketStream<TcpStream>,
    rx: Rx,
}
impl Shared {
    /// Create a new, empty, instance of `Shared`.
    fn new() -> Self {
        Shared {
            peers: HashMap::new(),
        }
    }
    // 廣播
    async fn broadcast(&mut self, sender: SocketAddr, msg: &str) {
        for peer in self.peers.iter_mut() {
            if *peer.0 != sender {
                let _ = peer.1.send(Message::Text(msg.to_string()));
            }
        }
    }
}
impl Peer {
    /// Create a new instance of `Peer`.
    async fn new(state: Arc<Mutex<Shared>>, stream: WebSocketStream<TcpStream>) -> Result<Peer> {
        // Get the client socket address
        let addr = stream.get_ref().peer_addr()?;
        // Create a channel for this peer
        let (tx, rx) = mpsc::unbounded_channel();
        // Add an entry for this `Peer` in the shared state map.
        state.lock().await.peers.insert(addr, tx);
        Ok(Peer { stream, rx })
    }
}

3,websocket 使用 rust 庫 tokio_tungstenite,通過它可以建立 websocket 連接,在此連接上,服務器和客戶端可以互相收發消息。

4,我們有一個邏輯循環處理消息通道,當消息通道中有消息時,直接將消息從消息通道取出,然後通過自身的 webscoket 發送給客戶端。

5,我們添加了心跳,服務器定期向客戶端進行發送心跳,客戶端收到心跳後進行迴應。我們設置超時定時器,超時之後沒有收到客戶端的響應,我們就就認爲客戶端已經不在線。然後刪除 socket 地址,和消息通道。

async fn handle_connection(
    state: Arc<Mutex<Shared>>,
    raw_stream: TcpStream,
    addr: SocketAddr,
) -> Result<()> {
    println!("Incoming TCP connection from: {}", addr);
    let mut ws_stream = accept_async(raw_stream)
        .await
        .expect("Error during the websocket handshake occurred");
    println!("WebSocket connection established: {}", addr);
    ws_stream
        .send(Message::Text("請輸入您的暱稱:".to_string()))
        .await?;
    let mut username = String::new();
    loop {
        let line = ws_stream.next().await;
        match line {
            Some(Ok(line)) => {
                if line.is_binary() || line.is_text() {
                    let msg = line.to_text().unwrap();
                    if msg.len() > 0 {
                        username.push_str(line.to_text().unwrap());
                        break;
                    }
                }
                ws_stream
                    .send(Message::Text("請輸入您的暱稱:".to_string()))
                    .await?;
            }
            // We didn't get a line so we return early here.
            _ => {
                println!("Failed to get username from {}. Client disconnected.", addr);
                return Ok(());
            }
        }
    }
    // 獲取到暱稱,發送消息給ta
    let hello_msg = format!("您設置的暱稱是{},歡迎進入聊天室", username);
    ws_stream.send(Message::Text(hello_msg)).await?;
    let mut peer = Peer::new(state.clone(), ws_stream).await?;
    {
        let mut state = state.lock().await;
        let n = state.peers.len();
        let msg = format!("歡迎[{}]進入聊天室,當前人數:{}", username, n);
        println!("{}", msg);
        state.broadcast(addr, &msg).await;
    }
    let mut interval = tokio::time::interval(Duration::from_secs(30));
    loop {
        tokio::select! {
            // 通道中有值時
            Some(msg) = peer.rx.recv() => {
                peer.stream.send(msg).await?;
            },
            // websocket接收到消息時
            msg = timeout(Duration::from_secs(33),peer.stream.next()) => {
                match msg {
                    Ok(Some(Ok(msg)))=>{
                        if msg.is_text() || msg.is_binary(){
                            let content = msg.to_text().unwrap();
                            let content = content.trim();
                            if content.len() > 0 {
                                let mut state = state.lock().await;
                                let msg = format!("{}: {}", username, content);
                                state.broadcast(addr, &msg).await;
                            }
                        }else if msg.is_close() {
                            println!("客戶端斷開連接");
                            break;
                        }
                    },
                    Ok(Some(Err(e)))=>{
                        println!("客戶端發生錯誤,{}",e);
                    },
                    Ok(None)=>{
                        println!("客戶端已經掉線");
                        break;
                    },
                    Err(e)=>{
                        println!("獲取消息超時,{}",e);
                        break;
                    }
                }
            }
            // 時鐘到達時
            _ = interval.tick()=> {
                let payload = Vec::new();
                let ping = Message::Ping(payload);
                peer.stream.send(ping).await?;
            }
        }
    }
    {
        let mut state = state.lock().await;
        state.peers.remove(&addr);
        let n = state.peers.len();
        let msg = format!("[{}]離開了聊天室,當前人數:{}", username, n);
        println!("{}", msg);
        state.broadcast(addr, &msg).await;
    }
    Ok(())
}

最後,我們將它添加 main 函數中。

use std::{
    collections::HashMap, env, io::Error as IoError, net::SocketAddr, sync::Arc, time::Duration,
};
use tokio::sync::{mpsc, Mutex};
use futures_util::{SinkExt, StreamExt};
use tokio::net::{TcpListener, TcpStream};
use tokio::time::timeout;
use tokio_tungstenite::WebSocketStream;
use tokio_tungstenite::{
    accept_async,
    tungstenite::{Message, Result},
};
#[tokio::main]
async fn main() -> Result<(), IoError> {
    let addr = env::args()
        .nth(1)
        .unwrap_or_else(|| "0.0.0.0:9528".to_string());
    let state = Arc::new(Mutex::new(Shared::new()));
    // Create the event loop and TCP listener we'll accept connections on.
    let listener = TcpListener::bind(&addr).await.expect("Failed to bind");
    println!("Listening on: {}", addr);
    // Let's spawn the handling of each connection in a separate task.
    while let Ok((stream, addr)) = listener.accept().await {
        let state = Arc::clone(&state);
        tokio::spawn(handle_connection(state, stream, addr));
    }
    Ok(())
}

我已經部署了一個服務器端,可以通過 ws://m.91demo.top:9528 訪問。這個和 TCP 實現的聊天室,相比有以下優點,1 可以直接使用瀏覽器進行訪問,2websocket 自帶 ping 和 pong,可方便創建心跳,3 自帶消息類型和連接狀態。

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/QlV7O3wXo81MQ4vP5HYaOQ