Rust:Tokio 的異步 IO

現代應用程序經常處理大量併發 I/O 操作,例如讀寫文件、網絡通信或與外部設備交互。使用傳統的阻塞 I/O,應用程序會暫停,等待每個 I/O 操作完成後再繼續。這可能導致性能問題並限制應用程序的可伸縮性。爲了解決這些挑戰,我們轉向異步 I/O。這就是 Rust 的 Tokio 發揮作用的地方——Rust 編程語言的一個強大的異步運行時,使開發人員能夠使用 async/await 語法構建高性能、非阻塞應用程序。

在操作系統中反覆檢查完成情況會耗費大量資源,而且很麻煩。這就是像 epoll 這樣的系統調用發揮作用的地方。使用 epoll,你可以將多個未完成的 I/O 請求捆綁在一起,並要求操作系統在這些請求完成時喚醒你的線程。

AsyncRead,AsyncWrite

要異步讀取文件,我們可以使用 “tokio::fs::File” 結構體和 “AsyncReadExt” 特徵。讓我們創建一個簡單的例子,讀取文件的內容並將其打印到控制檯:

use tokio::io::{self, AsyncReadExt};
use tokio::fs::File;

#[tokio::main]
async fn main() -> io::Result<(){
    let mut f = File::open("foo.txt").await?;
    let mut buffer = [0; 10];
    // 讀取最多10個字節
    let n = f.read(&mut buffer).await?;
    println!("The bytes: {:?}"&buffer[..n]);
    Ok(())
}

在這個例子中,我們使用 “tokio::fs::File::open” 來異步打開一個文件。然後使用 “AsyncReadExt::read_to_string” 方法將文件內容讀入 String。最後,我們將內容打印到控制檯。

Tokio 還提供了 “std::io::BufRead” 特徵的對等異步特徵,稱爲 “AsyncBufRead”。此外,它還提供了分別包裝讀取器和寫入器的異步“BufReader” 和“BufWriter”結構。通過利用緩衝區,這些包裝器減少了系統調用的數量,併爲訪問所需數據提供了更方便的方法。

例如,“BufReader”與 “AsyncBufRead” 特徵一起工作,爲任何異步讀取器引入額外的方法:

use tokio::io::{self, BufReader, AsyncBufReadExt};
use tokio::fs::File;

#[tokio::main]
async fn main() -> io::Result<(){
    let f = File::open("foo.txt").await?;
    let mut reader = BufReader::new(f);
    let mut buffer = String::new();
    // 將一行讀入緩衝區
    reader.read_line(&mut buffer).await?;
    println!("{}", buffer);
    Ok(())
}

另一方面,“BufWriter” 並沒有引入新的寫入方法。相反,它會緩衝每個寫入調用。要確保寫入任何緩衝的數據,必須刷新 “BufWriter”。

use tokio::io::{self, BufWriter, AsyncWriteExt};
use tokio::fs::File;

#[tokio::main]
async fn main() -> io::Result<(){
    let f = File::create("foo.txt").await?;
    {
        let mut writer = BufWriter::new(f);
        // 向緩衝區寫入一個字節
        writer.write(&[42u8]).await?;
        // 在緩衝區超出範圍之前刷新緩衝區
        writer.flush().await?;
    } // 如果沒有刷新或關閉,緩衝區的內容將在刪除時被丟棄
    Ok(())
}

總之,Tokio 的讀寫緩衝器通過最小化系統調用和提供更友好的用戶界面,提高了異步 I/O 操作的效率。

自定義實現

讓我們創建一個簡單的自定義內存緩衝區,使用 “AsyncRead” 和“AsyncWrite”特徵實現。我們的 “MemBuffer” 將充當異步 I/O 資源,允許我們異步讀寫數據。

use std::io::Result;
use std::pin::Pin;
use std::task::{Context, Poll};
use tokio::io::{AsyncRead, AsyncWrite};
use tokio::io::ReadBuf;

struct MemBuffer {
    data: Vec<u8>,
    read_pos: usize,
}

impl MemBuffer {
    fn new() -> Self {
        Self {
            data: Vec::new(),
            read_pos: 0,
        }
    }
}

impl AsyncRead for MemBuffer {
    fn poll_read(
        mut self: Pin<&mut Self>,
        _cx: &mut Context,
        buf: &mut ReadBuf<'_>,
    ) -> Poll<Result<()>> {
        // 計算可讀取的可用字節數
        let available = self.data.len() - self.read_pos;
        // 確定要讀取的字節數,取ReadBuf剩餘容量的最小值和可用字節數
        let bytes_to_read = available.min(buf.remaining());
        // 從MemBuffer中獲取要讀取的數據片
        let data = &self.data[self.read_pos..self.read_pos + bytes_to_read];
        // 將數據片放入ReadBuf中
        buf.put_slice(data);
        // 更新MemBuffer中的讀位置
        self.read_pos += bytes_to_read;
        Poll::Ready(Ok(()))
    }
}

impl AsyncWrite for MemBuffer {
    fn poll_write(
        mut self: Pin<&mut Self>,
        _cx: &mut Context,
        buf: &[u8],
    ) -> Poll<Result<usize>> {
        self.data.extend_from_slice(buf);
        Poll::Ready(Ok(buf.len()))
    }

    fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Result<()>> {
        // 由於我們的緩衝區在內存中,我們不需要做任何事情來刷新。
        Poll::Ready(Ok(()))
    }

    fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Result<()>> {
        // 內存中的緩衝區不需要特殊的關閉過程。
        Poll::Ready(Ok(()))
    }
}

現在我們有了自定義 “MemBuffer” 實現,我們可以在異步上下文中使用它。讓我們創建一個簡單的例子,寫入和讀取我們的“MemBuffer”。

use tokio::io::{AsyncReadExt, AsyncWriteExt};

#[tokio::main]
async fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
    let mut buffer = MemBuffer::new();
    // 將數據寫入緩衝區
    buffer.write_all(b"Hello, world!").await?;
    // 將讀取位置重置爲從頭讀取
    buffer.read_pos = 0;
    // 從緩衝區中讀取數據
    let mut read_buf = vec![0; 13];
    buffer.read_exact(&mut read_buf).await?;
    assert_eq!(read_buf, b"Hello, world!");
    println!("Successfully read data: {:?}", String::from_utf8(read_buf)?);
    Ok(())
}

對於應用程序中可能需要的其他自定義 I/O 資源,可以採用類似的方法。

支持平臺

Tokio 主要是爲類 unix 系統定製的,包括 Linux、macOS 和一些 BSD 變體,以及 Windows。然而,值得注意的是,Tokio 目前缺乏對 WebAssembly (Wasm) 的全面支持。目前正在努力使 Tokio 將來能夠在 Wasm 平臺上運行,但目前它仍然是非官方的支持。

對於 WebAssembly,你可能需要考慮其他選項,例如利用 wasm-bindgen 和 web-sys 來促進異步 I/O 操作。這些庫爲 web 應用程序提供了一種使用本地 JavaScript api 的方法,使你能夠使用 Rust 和 Wasm 開發的 web 應用程序與 DOM 元素、網絡請求和其他基本資源進行交互。

總結

憑藉其精心設計的抽象,Tokio 簡化了異步編程的複雜性,使開發人員更容易訪問和享受它。儘管它主要針對類 unix 系統和 Windows,但正在努力將其支持擴展到 WebAssembly 平臺。

通過採用 Tokio 的異步功能,你可以構建健壯的、高性能的、可擴展的應用程序,以應對當今現代計算環境的高要求工作負載。因此,繼續前進,探索 Tokio 的世界,並在 Rust 中釋放異步編程的全部潛力。

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/r4asA8mpOnk5KCoVADeXUg