Rust 實現一個簡單聊天室(上)
這篇實現聊天室的服務端,使用 tokio 庫,tcp 協議連接。
一、主要功能需求如下:
1,用戶進入聊天室,要輸入暱稱。
2,用戶輸入的消息,按行讀取,然後轉發給聊天室中其他人。
3,用戶離開聊天室後,告知聊天室的其他人。
二、代碼實現
1,每個客戶端和服務器都要建立一條 TCP 連接,用來發送和接收消息,如果兩個客戶端之間要互發消息,那麼需要服務器進行中轉,因爲他們不知道彼此。假設 A 用戶要給 B 用戶發送 hello 信息,A 用戶需要使用他的客戶端建立 TCP 連接,然後在它的 TCP 連接上發送 hello 消息,服務器從 A 客戶端的 TCP 連接上取到 hello 消息,然後去找到 B 的消息通道放置進去(存儲消息的地方),然後 B 的 TCP 連接將這條消息發送出去,B 收到消息。反之亦然。那服務器就需要幾樣東西,1 是 TCP 連接,用來傳送和接收消息,2 是消息通道,用來存放從 TCP 連接取到的消息。3 是 MAP,記錄 TCP 連接和消息通道的關聯關係,不然容易弄錯或找不到。
代碼如下:
/// 消息通道發射端
type Tx = mpsc::UnboundedSender<String>;
/// 消息通道接收端
type Rx = mpsc::UnboundedReceiver<String>;
/// 存放服務器端的所有用戶連接和消息通道的關係。
///
/// 記錄socket連接地址和消息通道的發射端。
/// 當收到消息後,我們通過消息通道的發射端廣播給其他用戶。
struct Shared {
peers: HashMap<SocketAddr, Tx>,
}
/// 存放客戶端的連接。
struct Peer {
/// 線路,線纜,由TCP連接流,和編碼器組成。
///
/// 可以這樣理解它的作用,它可以用來發送和接收數據流,
/// 並通過線路編碼器進行編碼和解碼。
lines: Framed<TcpStream, LinesCodec>,
/// 消息通道接收端。
///
/// 用來接收從其他客戶端發來的消息。
/// 收到消息後,我們將直接發送給我們的客戶端。
rx: Rx,
}
impl Shared {
/// 創建一個共享倉庫。
fn new() -> Self {
Shared {
peers: HashMap::new(),
}
}
/// 將經過線路編碼器編碼後的消息
/// 廣播給其他客戶端。
async fn broadcast(&mut self, sender: SocketAddr, message: &str) {
for peer in self.peers.iter_mut() {
if *peer.0 != sender {
let _ = peer.1.send(message.into());
}
}
}
}
impl Peer {
/// 創建一個客戶端的檔案信息
async fn new(
state: Arc<Mutex<Shared>>,
lines: Framed<TcpStream, LinesCodec>,
) -> io::Result<Peer> {
// 獲取客戶端的socket地址
let addr = lines.get_ref().peer_addr()?;
// 給這個客戶端創建一個消息通道
let (tx, rx) = mpsc::unbounded_channel();
// 將消息通道和socket地址的關聯關係記錄到map中
state.lock().await.peers.insert(addr, tx);
Ok(Peer { lines, rx })
}
}
2,實現操作步驟,處理流程工藝
/// 處理步驟
async fn process(
state: Arc<Mutex<Shared>>,
stream: TcpStream,
addr: SocketAddr,
) -> Result<(), Box<dyn Error>> {
// 我們需要拿到TCP連接,並添加我們的線路編碼器,用來解碼線路上的字節流
let mut lines = Framed::new(stream, LinesCodec::new());
// 我們需要發送給客戶端消息,請他告訴我們他的名字
lines.send("請輸入你的暱稱:").await?;
// 我們把客戶端發來的第一條消息作爲它的名字
let username = match lines.next().await {
Some(Ok(line)) => line,
// We didn't get a line so we return early here.
_ => {
println!("Failed to get username from {}. Client disconnected.", addr);
return Ok(());
}
};
// 我們建立客戶端的信息
let mut peer = Peer::new(state.clone(), lines).await?;
// 我們需要將新加入的客戶端告訴其他客戶端
{
let mut state = state.lock().await;
let n = state.peers.len();
let msg = format!("歡迎[{}]進入聊天室,當前人數:{}", username, n);
println!("{}", msg);
state.broadcast(addr, &msg).await;
}
// 我們處理收到的消息直到客戶端斷開連接。
loop {
tokio::select! {
// 當我們的消息通道收到消息後,我們馬上告訴客戶端。
Some(msg) = peer.rx.recv() => {
peer.lines.send(&msg).await?;
}
// 當收到客戶端的消息時,我們也需要廣播給其他客戶端。
result = peer.lines.next() => match result {
// 收到消息
Some(Ok(msg)) => {
let mut state = state.lock().await;
let msg = format!("{}: {}", username, msg);
state.broadcast(addr, &msg).await;
}
// An error occurred.
Some(Err(e)) => {
println!(
"an error occurred while processing messages for {}; error = {:?}",
username,
e
);
}
// The stream has been exhausted.
None => break,
},
}
}
// 客戶端離開,我們做些善後工作
// 把它的關聯關係刪除掉,廣播給其他人它退出聊天室了。
{
let mut state = state.lock().await;
state.peers.remove(&addr);
let n = state.peers.len();
let msg = format!("[{}]離開了聊天室,當前人數:{}", username, n);
println!("{}", msg);
state.broadcast(addr, &msg).await;
}
Ok(())
}
3,我們準備好了一切,開始接收客戶端建立 TCP 連接,並處理收發到的消息。
use tokio::net::{TcpListener, TcpStream};
use tokio::sync::{mpsc, Mutex};
use tokio_stream::StreamExt;
use tokio_util::codec::{Framed, LinesCodec};
use futures::SinkExt;
use std::collections::HashMap;
use std::env;
use std::error::Error;
use std::io;
use std::net::SocketAddr;
use std::sync::Arc;
#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
// 創建一個開放倉庫
//
// 新建立的客戶端都可以使用它。
let state = Arc::new(Mutex::new(Shared::new()));
let addr = env::args()
.nth(1)
.unwrap_or_else(|| "0.0.0.0:9527".to_string());
// 創建一個TCP監聽器,可以用來接受客戶端的連接。
//
let listener = TcpListener::bind(&addr).await?;
println!("the server start:{}", addr);
loop {
// 等待客戶端的連接
let (stream, addr) = listener.accept().await?;
let state = Arc::clone(&state);
// 生成一個任務進行處理這個連接
tokio::spawn(async move {
if let Err(e) = process(state, stream, addr).await {
println!("an error occurred; error = {:?}", e);
}
});
}
}
以上就是簡單的實現,你可以進行擴展,例如記錄聊天內容到數據庫,創建房間等。你有好的建議和問題,歡迎留言溝通!
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://mp.weixin.qq.com/s/UfjqHx4-6-vCvcjjOFPPyg