用 Rust 開發跨平臺、高性能的時序數據應用

WebAssembly (Wasm) 正在成爲一個廣受歡迎的編譯目標,幫助開發者構建可遷移平臺的應用。最近 Greptime 和 WasmEdge 協作,支持了在 WasmEdge 平臺上的 Wasm 應用通過 MySQL 協議讀寫 GreptimeDB 中的時序數據。

本文來自 GreptimeDB,近期我們與 GreptimeDB 一起實現了使用 Rust 開發跨平臺、高性能的時序數據應用。GreptimeDB 本身是使用 Rust 進行編寫的。

WebAssembly (Wasm) 正在成爲一個廣受歡迎的編譯目標,幫助開發者構建可遷移平臺的應用。最近 Greptime 和 WasmEdge 協作,支持了在 WasmEdge 平臺上的 Wasm 應用通過 MySQL 協議讀寫 GreptimeDB 中的時序數據。

什麼是 WebAssembly

**WebAssembly 是一種新的指令格式,同時具備了跨平臺和接近原生機器代碼的執行速度。**通過將 C/C++ 或 Rust 代碼編譯成 WebAssembly ,可以在瀏覽器中提升程序的性能。而在瀏覽器外的其他運行環境,尤其是 CDN 或 IoT 的邊緣端,我們也可以利用 WebAssembly 實現沙盒、動態加載的插件機制等高級的功能。

什麼是 WasmEdge

WasmEdge 是 CNCF 的沙箱項目,提供上文提到的沙盒能力,允許開發者在 WebAssembly 標準的基礎上,進一步擴展其能訪問的資源和接口。例如,WasmEdge 爲 Wasm 提供了額外的 TLS、網絡能力和 AI 能力,大大豐富了使用場景。

WasmEdge GitHub 地址:

https://github.com/WasmEdge/WasmEdge

安裝 GreptimeDB 和 WasmEdge

如果你已經安裝了 GreptimeDB ,可以跳過這個步驟。

下載 GreptimeDB 並運行:

curl -L https://github.com/GreptimeTeam/greptimedb/raw/develop/scripts/install.sh | sh
./greptime standalone start

安裝 WasmEdge:

curl -sSf https://raw.githubusercontent.com/WasmEdge/WasmEdge/master/utils/install.sh | bash -s

編寫 GreptimeDB 的 WASM 應用

在 WasmEdge 中,我們可以使用 MySQL 協議,讓 Rust 語言編寫的應用程序連接到 GreptimeDB。

首先通過 cargo new 創建一個新的 Rust 項目,我們的編譯目標將是 wasm32-wasi,可以在項目根目錄下創建 .cargo/config.toml 文件,指定默認編譯目標,之後就無需在每次 cargo build 命令後專門指定 --target 了。

# .cargo/config.toml
[build]
target = "wasm32-wasi"

編輯 Cargo.toml 增加依賴。mysql_async 的應用需要 tokio 運行時,WasmEdge 維護了這兩個庫的修改版本,使他們能夠編譯成 WebAssembly 代碼,並且運行到 WasmEdge 環境中。

[package]
name = "greptimedb"
version = "0.1.0"
edition = "2021"

[dependencies]
mysql_async_wasi = "0.31"
time = "0.3"
tokio_wasi = { version = "1"features = [ "io-util""fs""net""time""rt""macros"] }

進一步編輯 src/main.rs 文件,加入數據庫訪問的邏輯。這段代碼將演示:

  1. 通過環境變量讀取數據庫地址,並創建連接池;

  2. 執行 SQL 語句創建數據表;

  3. 插入數據;

  4. 查詢數據。

定義數據結構:

#[derive(Debug)]
struct CpuMetric {
    hostname: String,
    environment: String,
    usage_user: f64,
    usage_system: f64,
    usage_idle: f64,
    ts: i64,
}

impl CpuMetric {
    fn new(
        hostname: String,
        environment: String,
        usage_user: f64,
        usage_system: f64,
        usage_idle: f64,
        ts: i64,
    ) -> Self {
        Self {
            hostname,
            environment,
            usage_user,
            usage_system,
            usage_idle,
            ts,
        }
    }

}

初始化數據庫連接池:

use mysql_async::{
    prelude::*, Opts, OptsBuilder, Pool, PoolConstraints, PoolOpts, Result,
};
use time::PrimitiveDateTime;

fn get_url() -> String {
    if let Ok(url) = std::env::var("DATABASE_URL") {
        let opts = Opts::from_url(&url).expect("DATABASE_URL invalid");
        if opts
            .db_name()
            .expect("a database name is required")
            .is_empty()
        {
            panic!("database name is empty");
        }
        url
    } else {
        "mysql://root:pass@127.0.0.1:3306/mysql".into()
    }
}


#[tokio::main(flavor = "current_thread")]
async fn main() -> Result<(){
    // Alternative: The "easy" way with a default connection pool
    // let pool = Pool::new(Opts::from_url(&*get_url()).unwrap());
    // let mut conn = pool.get_conn().await.unwrap();

    // Below we create a customized connection pool
    let opts = Opts::from_url(&*get_url()).unwrap();
    let builder = OptsBuilder::from_opts(opts);
    // The connection pool will have a min of 1 and max of 2 connections.
    let constraints = PoolConstraints::new(1, 2).unwrap();
    let pool_opts = PoolOpts::default().with_constraints(constraints);

    let pool = Pool::new(builder.pool_opts(pool_opts));
    let mut conn = pool.get_conn().await.unwrap();
    
    
    
    Ok(())
}

創建數據表:

    // Create table if not exists
    r"CREATE TABLE IF NOT EXISTS wasmedge_example_cpu_metrics (
    hostname STRING,
    environment STRING,
    usage_user DOUBLE,
    usage_system DOUBLE,
    usage_idle DOUBLE,
    ts TIMESTAMP,
    TIME INDEX(ts),
    PRIMARY KEY(hostname, environment)
);"
    .ignore(&mut conn)
    .await?;

插入數據:

    let metrics = vec![
        CpuMetric::new(
            "host0".into(),
            "test".into(),
            32f64,
            3f64,
            4f64,
            1680307200050,
        ),
        CpuMetric::new(
            "host1".into(),
            "test".into(),
            29f64,
            32f64,
            50f64,
            1680307200050,
        ),
        CpuMetric::new(
            "host0".into(),
            "test".into(),
            32f64,
            3f64,
            4f64,
            1680307260050,
        ),
        CpuMetric::new(
            "host1".into(),
            "test".into(),
            29f64,
            32f64,
            50f64,
            1680307260050,
        ),
        CpuMetric::new(
            "host0".into(),
            "test".into(),
            32f64,
            3f64,
            4f64,
            1680307320050,
        ),
        CpuMetric::new(
            "host1".into(),
            "test".into(),
            29f64,
            32f64,
            50f64,
            1680307320050,
        ),
    ];

    r"INSERT INTO wasmedge_example_cpu_metrics (hostname, environment, usage_user, usage_system, usage_idle, ts)
      VALUES (:hostname, :environment, :usage_user, :usage_system, :usage_idle, :ts)"
        .with(metrics.iter().map(|metric| {
            params! {
                "hostname" =&metric.hostname,
                "environment" =&metric.environment,
                "usage_user" => metric.usage_user,
                "usage_system" => metric.usage_system,
                "usage_idle" => metric.usage_idle,
                "ts" => metric.ts,
            }
        }))
        .batch(&mut conn)

        .await?;

查詢數據:

    let loaded_metrics = "SELECT * FROM wasmedge_example_cpu_metrics"
        .with(())
        .map(
            &mut conn,
            |(hostname, environment, usage_user, usage_system, usage_idle, raw_ts)(
                String,
                String,
                f64,
                f64,
                f64,
                PrimitiveDateTime,
            )| {
                let ts = raw_ts.assume_utc().unix_timestamp() * 1000;
                CpuMetric::new(
                    hostname,
                    environment,
                    usage_user,
                    usage_system,
                    usage_idle,
                    ts,
                )
            },
        )
        .await?;

    println!("{:?}", loaded_metrics);

WasmEdge 團隊提供的 tokio 和 mysql_async 庫與原始版本編程接口完全一致,因此可以無縫地將普通 Rust 應用切換到 WebAssembly 平臺上。

編譯這個項目,我們可以獲得 greptimedb.wasm 文件:

cargo build
ls -lh target/wasm32-wasi/debug/greptimedb.wasm

通過 WasmEdge 運行我們的程序:

wasmedge --env "DATABASE_URL=mysql://localhost:4002/public" target/wasm32-wasi/debug/greptimedb.wasm

上面這段示例程序已經納入了 WasmEdge 的數據庫使用示例,你可以在 GitHub 倉庫找到完整的代碼:

https://github.com/WasmEdge/wasmedge-db-examples/tree/main/greptimedb。

總結

WasmEdge 爲 WebAssembly 應用提供了更多的擴展能力。如果你也將應用部署在 WebAssembly 環境裏,未來我們還可以使用 OpenTelemetry SDK 採集指標數據直接存儲到 GreptimeDB 。現在就下載 GreptimeDB 或開通 GreptimeCloud 實例運行上面的例子吧。


本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/8eCkvVjX3H-2lgVO9vXoMw