Rust:Tokio 網絡 IO 學習

這篇文章的目標是通過構建 TCP Echo Server 來研究 Tokio 庫。計劃是逐步向服務器添加特性,從基本功能開始,逐步增加複雜性。這種方法將使我們更好地理解 Tokio,並提高在 Rust 編程方面的技能。最終目標是擁有一個功能齊全的 TCP Echo Server,可以處理多個連接。

在深入研究代碼之前,瞭解任務和阻塞部分的基本概念是很重要的。任務用於執行異步操作,允許多個操作同時執行。阻塞部分用於執行需要阻塞當前任務的操作,直到特定事件發生。

下面的代碼展示了使用 runtime::new() 方法創建一個新的運行時,然後使用 runtime.spawn 調度一個新任務。在阻塞部分,使用 tokio::time::timeout 函數,成功還是失敗取決於任務是否在指定的超時時間之前完成。

use tokio::{
    runtime::Runtime,
    time::{sleep, Duration, timeout},
};

fn main() {
    let runtime = Runtime::new().unwrap();

    let task = runtime.spawn(async {
        println!("Hello from the executor.");
        sleep(Duration::from_secs(5)).await;
        "success"
    });

    runtime.block_on(async {
        match timeout(Duration::from_secs(10), task).await {
            Ok(result) => println!("completed: {}", result.unwrap()),
            Err(_) => println!("failed: time out"),
        }
    });
}

創建新的運行時並不是執行異步任務的唯一選項。框架定義了一個屬性,該屬性還可以用於提供默認運行時,並使整個 main 函數異步化。這簡化了代碼,因爲不需要顯式地創建和管理運行時。

使用 #[tokio::main],異步代碼將在默認運行時的上下文中自動執行,允許有效地調度和執行任務。在這種情況下,代碼是流線型的,使得更容易集中精力實現異步任務本身。

#[tokio::main]
async fn main() {
    let task = tokio::spawn(async {
        println!("Hello from the executor.");
        sleep(Duration::from_secs(5)).await;
        "success"
    });

    match timeout(Duration::from_secs(10), task).await {
        Ok(result) => println!("completed: {}", result.unwrap()),
        Err(_) => println!("failed: time out"),
    }    
}

現在已經介紹了使用 Tokio 進行異步編程的基礎知識,是時候通過使用 TCP 將這些概念付諸實踐了:綁定一個接受新連接的套接字,接受的連接由一個單獨的異步任務處理,該任務讀取數據、寫回數據並關閉連接。爲了防止程序立即結束,設置了 60 秒的睡眠計時器。過了這段時間後,程序將自動關閉。

#[tokio::main]
async fn main() {
    let listener = TcpListener::bind("127.0.0.1:8080").await.unwrap();
    println!("Listening on {}", listener.local_addr().unwrap());

    let (mut socket, address) = listener.accept().await.unwrap();
    println!("Accepted connection on {}", address);

    tokio::spawn(async move {
        let (mut reader, mut writer) = socket.split();

        let mut buffer: [u8; 1024] = [0; 1024];
        let n = reader.read(&mut buffer).await.unwrap();
        println!("Read {} bytes", n);

        writer.write_all(&buffer[..n]).await.unwrap();
        println!("Write {} bytes", n);

        drop(socket);
        println!("Closed remote connection {}", address);
    });

    sleep(Duration::from_secs(60)).await;
    drop(listener);
}

爲了處理所有傳入的數據,讀寫過程可以在循環中執行。這允許程序持續處理輸入數據,直到輸入流關閉。一旦輸入流被關閉,read 函數將返回一個值 0,表示通信的結束。此時,套接字可以安全釋放,通信終止。這種基於循環的方法確保處理所有傳入的數據,允許客戶端和服務器之間進行完整的交換。

tokio::spawn(async move {
    let (mut reader, mut writer) = socket.split();

    loop {
        let mut buffer: [u8; 1024] = [0; 1024];
        let n = reader.read(&mut buffer).await.unwrap();
        if n == 0 { 
            println!("Reached end of data {}", address);
            break
        }else {
            println!("Read {} bytes: {:?}", n, &buffer[..n]);
        }

        writer.write_all(&buffer[..n]).await.unwrap();
        println!("Write {} bytes", n);
    }

    drop(socket);
    println!("Closed remote connection {}", address);
});

爲了進一步改進數據傳輸過程,正確處理錯誤並優化緩衝區的處理非常重要。將緩衝區移到循環之外可以幫助簡化過程並將開銷降至最低。此外,改進控制檯日誌記錄可以更好地瞭解執行期間發生的事情。通過採取這些步驟,可以提高數據傳輸的整體性能,從而實現更順暢、更有效的信息交換。

async fn handle_accepted_connection(mut socket: TcpStream, address: SocketAddr) {
    let mut buffer = [0; 1024];
    let (mut reader, mut writer) = socket.split();

    loop {
        println!("{}: waiting for data", address);
        match reader.read(&mut buffer).await {
            Err(error) ={
                println!("{}: something bad happened - {}", address, error);
            }
            Ok(count) if count == 0 ={
                break println!("{0}: end of stream", address);
            }
            Ok(count) ={
                println!("{}: read {} bytes", address, count);
                match writer.write_all(&buffer[..count]).await {
                    Err(error) ={
                        println!("{}: Something happened: {}", address, error);
                    }
                    Ok(_) ={
                        println!("{}: Wrote {} bytes", address, count);
                    }
                }
            }
        }
    }

    drop(socket);
    println!("{}: remote connection closed", address);
}

實現中的最後一個變化是允許在正確處理錯誤的同時接受多個連接。綁定套接字的目的還在於處理任何潛在的錯誤,確保系統保持穩定和正常運行。其結果是一個可伸縮的解決方案,可以處理數百個連接,而不需要創建額外的線程。這種設計允許在多個客戶端和服務器之間進行高效和有效的通信,使其成爲現實應用程序可靠和健壯的解決方案。

#[tokio::main]
async fn main() {
    let address = "127.0.0.1:8080";
    let listener = match TcpListener::bind(address).await {
        Ok(listener) ={
            println!("Listening on {}", address);
            listener
        },
        Err(error) ={
            panic!("{}: local port couldn't be bound - {}", address, error);
        }
    };

    loop {
        match listener.accept().await {
            Err(error) ={
                println!("{}: something bad happened - {}", address, error);
            },
            Ok((socket, address)) ={
                println!("{0}: accepting new connection ...", address);
                tokio::spawn(async move { handle_accepted_connection(socket, address).await });
                println!("{0}: accepting new connection completed", address);
            }
        }
    }
}

完成 TCP Echo Server 示例應用程序的全部功能。它啓動服務器並允許多個客戶端連接,每個客戶端請求都由服務器響應,應用程序優雅地處理關閉連接的客戶端。服務器將繼續運行,直到用戶手動終止它,從而允許與多個客戶機進行無縫通信。該應用程序演示瞭解決方案的可伸縮性,因爲它能夠處理數百個連接,而無需爲每個連接啓動一個新線程,使其成爲處理多路複用的高效和有效的解決方案。

總之,Rust 的 Tokio 庫使異步網絡編程變得非常容易。我一直對異步 I/O 着迷,但它從來沒有像現在的 Tokio 那樣簡單和高效。憑藉其強大的工具和優化的性能,使構建和擴展網絡應用程序從未如此容易。無論你是剛剛開始網絡編程,還是經驗豐富的開發人員,Tokio 都爲構建高性能和可擴展的解決方案提供了堅實的基礎。

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/jCWBQE35R8Oz9eOAxwpyQg