Rust 實現一個簡單聊天室(上)

這篇實現聊天室的服務端,使用 tokio 庫,tcp 協議連接。

一、主要功能需求如下:

1,用戶進入聊天室,要輸入暱稱。

2,用戶輸入的消息,按行讀取,然後轉發給聊天室中其他人。

3,用戶離開聊天室後,告知聊天室的其他人。

二、代碼實現

1,每個客戶端和服務器都要建立一條 TCP 連接,用來發送和接收消息,如果兩個客戶端之間要互發消息,那麼需要服務器進行中轉,因爲他們不知道彼此。假設 A 用戶要給 B 用戶發送 hello 信息,A 用戶需要使用他的客戶端建立 TCP 連接,然後在它的 TCP 連接上發送 hello 消息,服務器從 A 客戶端的 TCP 連接上取到 hello 消息,然後去找到 B 的消息通道放置進去(存儲消息的地方),然後 B 的 TCP 連接將這條消息發送出去,B 收到消息。反之亦然。那服務器就需要幾樣東西,1 是 TCP 連接,用來傳送和接收消息,2 是消息通道,用來存放從 TCP 連接取到的消息。3 是 MAP,記錄 TCP 連接和消息通道的關聯關係,不然容易弄錯或找不到。

代碼如下:

/// 消息通道發射端
type Tx = mpsc::UnboundedSender<String>;
/// 消息通道接收端
type Rx = mpsc::UnboundedReceiver<String>;
/// 存放服務器端的所有用戶連接和消息通道的關係。
///
/// 記錄socket連接地址和消息通道的發射端。
/// 當收到消息後,我們通過消息通道的發射端廣播給其他用戶。
struct Shared {
    peers: HashMap<SocketAddr, Tx>,
}
/// 存放客戶端的連接。
struct Peer {
    /// 線路,線纜,由TCP連接流,和編碼器組成。
    ///
    /// 可以這樣理解它的作用,它可以用來發送和接收數據流,
    /// 並通過線路編碼器進行編碼和解碼。
    lines: Framed<TcpStream, LinesCodec>,
    /// 消息通道接收端。
    ///
    /// 用來接收從其他客戶端發來的消息。
    /// 收到消息後,我們將直接發送給我們的客戶端。
    rx: Rx,
}
impl Shared {
    /// 創建一個共享倉庫。
    fn new() -> Self {
        Shared {
            peers: HashMap::new(),
        }
    }
    /// 將經過線路編碼器編碼後的消息
    /// 廣播給其他客戶端。
    async fn broadcast(&mut self, sender: SocketAddr, message: &str) {
        for peer in self.peers.iter_mut() {
            if *peer.0 != sender {
                let _ = peer.1.send(message.into());
            }
        }
    }
}
impl Peer {
    /// 創建一個客戶端的檔案信息
    async fn new(
        state: Arc<Mutex<Shared>>,
        lines: Framed<TcpStream, LinesCodec>,
    ) -> io::Result<Peer> {
        // 獲取客戶端的socket地址
        let addr = lines.get_ref().peer_addr()?;
        // 給這個客戶端創建一個消息通道
        let (tx, rx) = mpsc::unbounded_channel();
        // 將消息通道和socket地址的關聯關係記錄到map中
        state.lock().await.peers.insert(addr, tx);
        Ok(Peer { lines, rx })
    }
}

2,實現操作步驟,處理流程工藝

/// 處理步驟
async fn process(
    state: Arc<Mutex<Shared>>,
    stream: TcpStream,
    addr: SocketAddr,
) -> Result<(), Box<dyn Error>> {
    // 我們需要拿到TCP連接,並添加我們的線路編碼器,用來解碼線路上的字節流
    let mut lines = Framed::new(stream, LinesCodec::new());
    // 我們需要發送給客戶端消息,請他告訴我們他的名字
    lines.send("請輸入你的暱稱:").await?;
    // 我們把客戶端發來的第一條消息作爲它的名字
    let username = match lines.next().await {
        Some(Ok(line)) => line,
        // We didn't get a line so we return early here.
        _ => {
            println!("Failed to get username from {}. Client disconnected.", addr);
            return Ok(());
        }
    };
    // 我們建立客戶端的信息
    let mut peer = Peer::new(state.clone(), lines).await?;
    // 我們需要將新加入的客戶端告訴其他客戶端
    {
        let mut state = state.lock().await;
        let n = state.peers.len();
        let msg = format!("歡迎[{}]進入聊天室,當前人數:{}", username, n);
        println!("{}", msg);
        state.broadcast(addr, &msg).await;
    }
    // 我們處理收到的消息直到客戶端斷開連接。
    loop {
        tokio::select! {
            // 當我們的消息通道收到消息後,我們馬上告訴客戶端。
            Some(msg) = peer.rx.recv() => {
                peer.lines.send(&msg).await?;
            }
            // 當收到客戶端的消息時,我們也需要廣播給其他客戶端。
            result = peer.lines.next() => match result {
                // 收到消息
                Some(Ok(msg)) => {
                    let mut state = state.lock().await;
                    let msg = format!("{}: {}", username, msg);
                    state.broadcast(addr, &msg).await;
                }
                // An error occurred.
                Some(Err(e)) => {
                    println!(
                        "an error occurred while processing messages for {}; error = {:?}",
                        username,
                        e
                    );
                }
                // The stream has been exhausted.
                None => break,
            },
        }
    }
    // 客戶端離開,我們做些善後工作
    // 把它的關聯關係刪除掉,廣播給其他人它退出聊天室了。
    {
        let mut state = state.lock().await;
        state.peers.remove(&addr);
        let n = state.peers.len();
        let msg = format!("[{}]離開了聊天室,當前人數:{}", username, n);
        println!("{}", msg);
        state.broadcast(addr, &msg).await;
    }
    Ok(())
}

3,我們準備好了一切,開始接收客戶端建立 TCP 連接,並處理收發到的消息。

use tokio::net::{TcpListener, TcpStream};
use tokio::sync::{mpsc, Mutex};
use tokio_stream::StreamExt;
use tokio_util::codec::{Framed, LinesCodec};
use futures::SinkExt;
use std::collections::HashMap;
use std::env;
use std::error::Error;
use std::io;
use std::net::SocketAddr;
use std::sync::Arc;
#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
    // 創建一個開放倉庫
    //
    // 新建立的客戶端都可以使用它。
    let state = Arc::new(Mutex::new(Shared::new()));
    let addr = env::args()
        .nth(1)
        .unwrap_or_else(|| "0.0.0.0:9527".to_string());
    // 創建一個TCP監聽器,可以用來接受客戶端的連接。
    //
    let listener = TcpListener::bind(&addr).await?;
    println!("the server start:{}", addr);
    loop {
        // 等待客戶端的連接
        let (stream, addr) = listener.accept().await?;
        let state = Arc::clone(&state);
        // 生成一個任務進行處理這個連接
        tokio::spawn(async move {
            if let Err(e) = process(state, stream, addr).await {
                println!("an error occurred; error = {:?}", e);
            }
        });
    }
}

以上就是簡單的實現,你可以進行擴展,例如記錄聊天內容到數據庫,創建房間等。你有好的建議和問題,歡迎留言溝通!

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/UfjqHx4-6-vCvcjjOFPPyg