如何編寫異步運行時通用庫?

如果你正在用 Rust 編寫異步應用程序,在某些情況下,你可能希望將代碼分成幾個子 crate。這樣做的好處是:

使用一個異步運行時,編寫異步運行時通用庫的好處是什麼?

下面使用三種方法來實現異步運行時通用庫。

方法 1,定義自己的異步運行時 Trait

使用 futures crate,可以編寫非常通用的庫代碼,但是 time,sleep 或 timeout 等操作必須依賴於異步運行時。這時,你可以定義自己的 AsyncRuntime trait,並要求下游實現它。

use std::{future::Future, time::Duration};

pub trait AsyncRuntime: Send + Sync + 'static {
    type Delay: Future<Output = ()> + Send;

    // 返回值必須是一個Future
    fn sleep(duration: Duration) -> Self::Delay;
}

可以像這樣使用上面的庫代碼:

async fn operation<R: AsyncRuntime>() {
    R::sleep(Duration::from_millis(1)).await;
}

下面是它如何實現的:

pub struct TokioRuntime;

impl AsyncRuntime for TokioRuntime {
    type Delay = tokio::time::Sleep;

    fn sleep(duration: Duration) -> Self::Delay {
        tokio::time::sleep(duration)
    }
}

#[tokio::main]
async fn main() {
    operation::<TokioRuntime>().await;
    println!("Hello, world!");
}

方法 2,在內部抽象異步運行時並公開特性標誌

爲了處理網絡連接或文件句柄,我們可以使用 AsyncRead / AsyncWrite trait:

#[async_trait]
pub(crate) trait AsyncRuntime: Send + Sync + 'static {
    type Connection: AsyncRead + AsyncWrite + Send + Sync + 'static;

    async fn connect(addr: SocketAddr) -> std::io::Result<Self::Connection>;
}

可以像這樣使用上面的庫代碼:

async fn operation<R: AsyncRuntime>(conn: &mut R::Connection) 
where
    R::Connection: Unpin,
{
    conn.write(b"some bytes").await;
}

然後爲每個異步運行時定義一個模塊:

#[cfg(feature = "runtime-async-std")]
mod async_std_impl;
#[cfg(feature = "runtime-async-std")]
use async_std_impl::*;

#[cfg(feature = "runtime-tokio")]
mod tokio_impl;
#[cfg(feature = "runtime-tokio")]
use tokio_impl::*;

tokio_impl 模塊:

mod tokio_impl {
    use std::net::SocketAddr;

    use async_trait::async_trait;
    use crate::AsyncRuntime;

    pub struct TokioRuntime;

    #[async_trait]
    impl AsyncRuntime for TokioRuntime {
        type Connection = tokio::net::TcpStream;

        async fn connect(addr: SocketAddr) -> std::io::Result<Self::Connection> {
            tokio::net::TcpStream::connect(addr).await
        }
    }
}

main 函數代碼:

#[tokio::main]
async fn main() {
    let mut conn =
        TokioRuntime::connect(SocketAddr::new(IpAddr::from_str("0.0.0.0").unwrap(), 8080))
            .await
            .unwrap();
    operation::<TokioRuntime>(&mut conn).await;
    println!("Hello, world!");
}

方法 3,維護一個異步運行時抽象庫

基本上,將使用的所有異步運行時 api 寫成一個包裝器庫。這樣做可能很繁瑣,但也有一個好處,即可以在一個地方爲項目指定與異步運行時的所有交互,這對於調試或跟蹤非常方便。

例如,我們定義異步運行時抽象庫的名字爲:common-async-runtime,它的異步任務處理代碼如下:

// common-async-runtime/tokio_task.rs

pub use tokio::task::{JoinHandle as TaskHandle};

pub fn spawn_task<F, T>(future: F) -> TaskHandle<T>
where
    F: Future<Output = T> + Send + 'static,
    T: Send + 'static,
{
    tokio::task::spawn(future)
}

async-std 的任務 API 與 Tokio 略有不同,這需要一些樣板文件:

// common-async-runtime/async_std_task.rs

pub struct TaskHandle<T>(async_std::task::JoinHandle<T>);

pub fn spawn_task<F, T>(future: F) -> TaskHandle<T>
where
    F: Future<Output = T> + Send + 'static,
    T: Send + 'static,
{
    TaskHandle(async_std::task::spawn(future))
}

#[derive(Debug)]
pub struct JoinError;

impl std::error::Error for JoinError {}

impl<T> Future for TaskHandle<T> {
    type Output = Result<T, JoinError>;

    fn poll(
        mut self: std::pin::Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<Self::Output> {
        match self.0.poll_unpin(cx) {
            std::task::Poll::Ready(res) => std::task::Poll::Ready(Ok(res)),
            std::task::Poll::Pending => std::task::Poll::Pending,
        }
    }
}

在 Cargo.toml 中,你可以簡單地將 common-async-runtime 作爲依賴項包含進來。這使得你的庫代碼很 “純粹”,因爲現在選擇異步運行時是由下游控制的。與方法 1 類似,這個 crate 可以在沒有任何異步運行時的情況下編譯,這很簡潔!

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/_Gj64RbzRu0rYXBlJZ5URg