Rust:Tokio 網絡 IO 學習
這篇文章的目標是通過構建 TCP Echo Server 來研究 Tokio 庫。計劃是逐步向服務器添加特性,從基本功能開始,逐步增加複雜性。這種方法將使我們更好地理解 Tokio,並提高在 Rust 編程方面的技能。最終目標是擁有一個功能齊全的 TCP Echo Server,可以處理多個連接。
在深入研究代碼之前,瞭解任務和阻塞部分的基本概念是很重要的。任務用於執行異步操作,允許多個操作同時執行。阻塞部分用於執行需要阻塞當前任務的操作,直到特定事件發生。
下面的代碼展示了使用 runtime::new() 方法創建一個新的運行時,然後使用 runtime.spawn 調度一個新任務。在阻塞部分,使用 tokio::time::timeout 函數,成功還是失敗取決於任務是否在指定的超時時間之前完成。
use tokio::{
runtime::Runtime,
time::{sleep, Duration, timeout},
};
fn main() {
let runtime = Runtime::new().unwrap();
let task = runtime.spawn(async {
println!("Hello from the executor.");
sleep(Duration::from_secs(5)).await;
"success"
});
runtime.block_on(async {
match timeout(Duration::from_secs(10), task).await {
Ok(result) => println!("completed: {}", result.unwrap()),
Err(_) => println!("failed: time out"),
}
});
}
創建新的運行時並不是執行異步任務的唯一選項。框架定義了一個屬性,該屬性還可以用於提供默認運行時,並使整個 main 函數異步化。這簡化了代碼,因爲不需要顯式地創建和管理運行時。
使用 #[tokio::main],異步代碼將在默認運行時的上下文中自動執行,允許有效地調度和執行任務。在這種情況下,代碼是流線型的,使得更容易集中精力實現異步任務本身。
#[tokio::main]
async fn main() {
let task = tokio::spawn(async {
println!("Hello from the executor.");
sleep(Duration::from_secs(5)).await;
"success"
});
match timeout(Duration::from_secs(10), task).await {
Ok(result) => println!("completed: {}", result.unwrap()),
Err(_) => println!("failed: time out"),
}
}
現在已經介紹了使用 Tokio 進行異步編程的基礎知識,是時候通過使用 TCP 將這些概念付諸實踐了:綁定一個接受新連接的套接字,接受的連接由一個單獨的異步任務處理,該任務讀取數據、寫回數據並關閉連接。爲了防止程序立即結束,設置了 60 秒的睡眠計時器。過了這段時間後,程序將自動關閉。
#[tokio::main]
async fn main() {
let listener = TcpListener::bind("127.0.0.1:8080").await.unwrap();
println!("Listening on {}", listener.local_addr().unwrap());
let (mut socket, address) = listener.accept().await.unwrap();
println!("Accepted connection on {}", address);
tokio::spawn(async move {
let (mut reader, mut writer) = socket.split();
let mut buffer: [u8; 1024] = [0; 1024];
let n = reader.read(&mut buffer).await.unwrap();
println!("Read {} bytes", n);
writer.write_all(&buffer[..n]).await.unwrap();
println!("Write {} bytes", n);
drop(socket);
println!("Closed remote connection {}", address);
});
sleep(Duration::from_secs(60)).await;
drop(listener);
}
爲了處理所有傳入的數據,讀寫過程可以在循環中執行。這允許程序持續處理輸入數據,直到輸入流關閉。一旦輸入流被關閉,read 函數將返回一個值 0,表示通信的結束。此時,套接字可以安全釋放,通信終止。這種基於循環的方法確保處理所有傳入的數據,允許客戶端和服務器之間進行完整的交換。
tokio::spawn(async move {
let (mut reader, mut writer) = socket.split();
loop {
let mut buffer: [u8; 1024] = [0; 1024];
let n = reader.read(&mut buffer).await.unwrap();
if n == 0 {
println!("Reached end of data {}", address);
break
}else {
println!("Read {} bytes: {:?}", n, &buffer[..n]);
}
writer.write_all(&buffer[..n]).await.unwrap();
println!("Write {} bytes", n);
}
drop(socket);
println!("Closed remote connection {}", address);
});
爲了進一步改進數據傳輸過程,正確處理錯誤並優化緩衝區的處理非常重要。將緩衝區移到循環之外可以幫助簡化過程並將開銷降至最低。此外,改進控制檯日誌記錄可以更好地瞭解執行期間發生的事情。通過採取這些步驟,可以提高數據傳輸的整體性能,從而實現更順暢、更有效的信息交換。
async fn handle_accepted_connection(mut socket: TcpStream, address: SocketAddr) {
let mut buffer = [0; 1024];
let (mut reader, mut writer) = socket.split();
loop {
println!("{}: waiting for data", address);
match reader.read(&mut buffer).await {
Err(error) => {
println!("{}: something bad happened - {}", address, error);
}
Ok(count) if count == 0 => {
break println!("{0}: end of stream", address);
}
Ok(count) => {
println!("{}: read {} bytes", address, count);
match writer.write_all(&buffer[..count]).await {
Err(error) => {
println!("{}: Something happened: {}", address, error);
}
Ok(_) => {
println!("{}: Wrote {} bytes", address, count);
}
}
}
}
}
drop(socket);
println!("{}: remote connection closed", address);
}
實現中的最後一個變化是允許在正確處理錯誤的同時接受多個連接。綁定套接字的目的還在於處理任何潛在的錯誤,確保系統保持穩定和正常運行。其結果是一個可伸縮的解決方案,可以處理數百個連接,而不需要創建額外的線程。這種設計允許在多個客戶端和服務器之間進行高效和有效的通信,使其成爲現實應用程序可靠和健壯的解決方案。
#[tokio::main]
async fn main() {
let address = "127.0.0.1:8080";
let listener = match TcpListener::bind(address).await {
Ok(listener) => {
println!("Listening on {}", address);
listener
},
Err(error) => {
panic!("{}: local port couldn't be bound - {}", address, error);
}
};
loop {
match listener.accept().await {
Err(error) => {
println!("{}: something bad happened - {}", address, error);
},
Ok((socket, address)) => {
println!("{0}: accepting new connection ...", address);
tokio::spawn(async move { handle_accepted_connection(socket, address).await });
println!("{0}: accepting new connection completed", address);
}
}
}
}
完成 TCP Echo Server 示例應用程序的全部功能。它啓動服務器並允許多個客戶端連接,每個客戶端請求都由服務器響應,應用程序優雅地處理關閉連接的客戶端。服務器將繼續運行,直到用戶手動終止它,從而允許與多個客戶機進行無縫通信。該應用程序演示瞭解決方案的可伸縮性,因爲它能夠處理數百個連接,而無需爲每個連接啓動一個新線程,使其成爲處理多路複用的高效和有效的解決方案。
總之,Rust 的 Tokio 庫使異步網絡編程變得非常容易。我一直對異步 I/O 着迷,但它從來沒有像現在的 Tokio 那樣簡單和高效。憑藉其強大的工具和優化的性能,使構建和擴展網絡應用程序從未如此容易。無論你是剛剛開始網絡編程,還是經驗豐富的開發人員,Tokio 都爲構建高性能和可擴展的解決方案提供了堅實的基礎。
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://mp.weixin.qq.com/s/jCWBQE35R8Oz9eOAxwpyQg