Rust 使用 websocket 創建聊天室
websocket 是基於 TCP 的一種網絡協議。它實現了瀏覽器與服務器的全雙工通信,允許服務器主動發送消息給客戶端。它適用於需要服務器實時推送的場景。例如彈幕,社交聊天,在線教育,股票,體育實況等。
我們使用 rust 實現了一個聊天室的服務端,可以搜索使用在線的 websocket 客戶端就測試,你也可以自行使用熟悉的語言開發客戶端。
1,我們使用 map 存儲 websocket 的地址和消息通道的發射端。爲什麼使用消息通道,而不直接使用 websocket,很多原因,主要的原因爲方便在多個線程之間不用鎖進行通信,增加緩存。
2,我們將收到的消息轉發給聊天室中的其它客戶端,map 存儲了所有客戶端的 socket 地址和消息通道的發射端。我們迭代 map,將消息發送給其他客戶端的消息通道。
type Tx = mpsc::UnboundedSender<Message>;
type Rx = mpsc::UnboundedReceiver<Message>;
struct Shared {
peers: HashMap<SocketAddr, Tx>,
}
struct Peer {
stream: WebSocketStream<TcpStream>,
rx: Rx,
}
impl Shared {
/// Create a new, empty, instance of `Shared`.
fn new() -> Self {
Shared {
peers: HashMap::new(),
}
}
// 廣播
async fn broadcast(&mut self, sender: SocketAddr, msg: &str) {
for peer in self.peers.iter_mut() {
if *peer.0 != sender {
let _ = peer.1.send(Message::Text(msg.to_string()));
}
}
}
}
impl Peer {
/// Create a new instance of `Peer`.
async fn new(state: Arc<Mutex<Shared>>, stream: WebSocketStream<TcpStream>) -> Result<Peer> {
// Get the client socket address
let addr = stream.get_ref().peer_addr()?;
// Create a channel for this peer
let (tx, rx) = mpsc::unbounded_channel();
// Add an entry for this `Peer` in the shared state map.
state.lock().await.peers.insert(addr, tx);
Ok(Peer { stream, rx })
}
}
3,websocket 使用 rust 庫 tokio_tungstenite,通過它可以建立 websocket 連接,在此連接上,服務器和客戶端可以互相收發消息。
4,我們有一個邏輯循環處理消息通道,當消息通道中有消息時,直接將消息從消息通道取出,然後通過自身的 webscoket 發送給客戶端。
5,我們添加了心跳,服務器定期向客戶端進行發送心跳,客戶端收到心跳後進行迴應。我們設置超時定時器,超時之後沒有收到客戶端的響應,我們就就認爲客戶端已經不在線。然後刪除 socket 地址,和消息通道。
async fn handle_connection(
state: Arc<Mutex<Shared>>,
raw_stream: TcpStream,
addr: SocketAddr,
) -> Result<()> {
println!("Incoming TCP connection from: {}", addr);
let mut ws_stream = accept_async(raw_stream)
.await
.expect("Error during the websocket handshake occurred");
println!("WebSocket connection established: {}", addr);
ws_stream
.send(Message::Text("請輸入您的暱稱:".to_string()))
.await?;
let mut username = String::new();
loop {
let line = ws_stream.next().await;
match line {
Some(Ok(line)) => {
if line.is_binary() || line.is_text() {
let msg = line.to_text().unwrap();
if msg.len() > 0 {
username.push_str(line.to_text().unwrap());
break;
}
}
ws_stream
.send(Message::Text("請輸入您的暱稱:".to_string()))
.await?;
}
// We didn't get a line so we return early here.
_ => {
println!("Failed to get username from {}. Client disconnected.", addr);
return Ok(());
}
}
}
// 獲取到暱稱,發送消息給ta
let hello_msg = format!("您設置的暱稱是{},歡迎進入聊天室", username);
ws_stream.send(Message::Text(hello_msg)).await?;
let mut peer = Peer::new(state.clone(), ws_stream).await?;
{
let mut state = state.lock().await;
let n = state.peers.len();
let msg = format!("歡迎[{}]進入聊天室,當前人數:{}", username, n);
println!("{}", msg);
state.broadcast(addr, &msg).await;
}
let mut interval = tokio::time::interval(Duration::from_secs(30));
loop {
tokio::select! {
// 通道中有值時
Some(msg) = peer.rx.recv() => {
peer.stream.send(msg).await?;
},
// websocket接收到消息時
msg = timeout(Duration::from_secs(33),peer.stream.next()) => {
match msg {
Ok(Some(Ok(msg)))=>{
if msg.is_text() || msg.is_binary(){
let content = msg.to_text().unwrap();
let content = content.trim();
if content.len() > 0 {
let mut state = state.lock().await;
let msg = format!("{}: {}", username, content);
state.broadcast(addr, &msg).await;
}
}else if msg.is_close() {
println!("客戶端斷開連接");
break;
}
},
Ok(Some(Err(e)))=>{
println!("客戶端發生錯誤,{}",e);
},
Ok(None)=>{
println!("客戶端已經掉線");
break;
},
Err(e)=>{
println!("獲取消息超時,{}",e);
break;
}
}
}
// 時鐘到達時
_ = interval.tick()=> {
let payload = Vec::new();
let ping = Message::Ping(payload);
peer.stream.send(ping).await?;
}
}
}
{
let mut state = state.lock().await;
state.peers.remove(&addr);
let n = state.peers.len();
let msg = format!("[{}]離開了聊天室,當前人數:{}", username, n);
println!("{}", msg);
state.broadcast(addr, &msg).await;
}
Ok(())
}
最後,我們將它添加 main 函數中。
use std::{
collections::HashMap, env, io::Error as IoError, net::SocketAddr, sync::Arc, time::Duration,
};
use tokio::sync::{mpsc, Mutex};
use futures_util::{SinkExt, StreamExt};
use tokio::net::{TcpListener, TcpStream};
use tokio::time::timeout;
use tokio_tungstenite::WebSocketStream;
use tokio_tungstenite::{
accept_async,
tungstenite::{Message, Result},
};
#[tokio::main]
async fn main() -> Result<(), IoError> {
let addr = env::args()
.nth(1)
.unwrap_or_else(|| "0.0.0.0:9528".to_string());
let state = Arc::new(Mutex::new(Shared::new()));
// Create the event loop and TCP listener we'll accept connections on.
let listener = TcpListener::bind(&addr).await.expect("Failed to bind");
println!("Listening on: {}", addr);
// Let's spawn the handling of each connection in a separate task.
while let Ok((stream, addr)) = listener.accept().await {
let state = Arc::clone(&state);
tokio::spawn(handle_connection(state, stream, addr));
}
Ok(())
}
我已經部署了一個服務器端,可以通過 ws://m.91demo.top:9528 訪問。這個和 TCP 實現的聊天室,相比有以下優點,1 可以直接使用瀏覽器進行訪問,2websocket 自帶 ping 和 pong,可方便創建心跳,3 自帶消息類型和連接狀態。
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://mp.weixin.qq.com/s/QlV7O3wXo81MQ4vP5HYaOQ