sqlx: 一個優秀的 rust 異步 SQL 庫

概覽

sqlx 是一個爲 Rust 語言提供的功能齊全的數據庫訪問和查詢構建器庫。它支持多種數據庫, 包括 PostgreSQL、MySQL、SQLite 等。sqlx 的設計目標是成爲 Rust 中最直觀、高效且類型安全的數據庫客戶端。

  • SQLite 驅動程序使用 libsqlite3 C 庫。

  • sqlx 除非啓用了 sqlite 功能,否則會使用 #![forbid(unsafe_code)]。SQLite 驅動程序通過 libsqlite3-sys 直接調用 SQLite3 API,這需要使用不安全的代碼。

另外,它還有以下特性:

sqlx 支持編譯時檢查的查詢。然而,它並不是通過提供一個 Rust API 或 DSL(特定領域語言)來構建查詢來實現這一點的。相反,它提供了宏,這些宏接受常規的 SQL 作爲輸入,並確保其對於您的數據庫是有效的。其工作原理是,sqlx 在編譯時連接到您的開發數據庫,讓數據庫本身驗證(並返回一些有關)您的 SQL 查詢的信息。這有一些可能令人驚訝的含義:

它不是一個 ORM 庫,你如果想使用 ORM 庫,可以參考 ormx[3] 和 SeaORM[4]。

安裝

sqlx 支持多種異步運行時,你可以通過選擇不同的特性來使用不同的異步運行時。目前支持的異步運行時有async-std, tokioactix(其實是 tokio 的別名)。還支持 TLS 的連接:

# Cargo.toml
[dependencies]
# P挑選下面的一行引入sqlx:

# tokio (no TLS)
sqlx = { version = "0.7"features = [ "runtime-tokio" ] }
# tokio + native-tls
sqlx = { version = "0.7"features = [ "runtime-tokio""tls-native-tls" ] }
# tokio + rustls
sqlx = { version = "0.7"features = [ "runtime-tokio""tls-rustls" ] }

# async-std (no TLS)
sqlx = { version = "0.7"features = [ "runtime-async-std" ] }
# async-std + native-tls
sqlx = { version = "0.7"features = [ "runtime-async-std""tls-native-tls" ] }
# async-std + rustls
sqlx = { version = "0.7"features = [ "runtime-async-std""tls-rustls" ] }

如果你引入了多個異步運行時,默認首選tokio

同時你也需要引入所需的數據庫特性:

sqlx = { version = "0.7"features = [ "postgres" ] }
sqlx = { version = "0.7"features = [ "mysql" ] }
sqlx = { version = "0.7"features = [ "sqlite" ] }
sqlx = { version = "0.7"features = [ "any" ] }

以及一些其他的關於數據類型的特性等,比如chronouuidtimebstrbigdecimalrust_decimalipnetwork等。

derive支持 derive 類型的宏,如FromRow, Type, Encode, Decode.

macros 增加了對 query*! 宏的支持,該宏允許進行編譯時檢查的查詢。

一個簡單的 sqlx 示例如下:

use sqlx::postgres::PgPoolOptions;

#[tokio::main] // 異步運行時
async fn main() -> Result<(), sqlx::Error> {
    // 使用一個數據庫的連接池
    // 不同的數據庫選擇不同的連接池構建器
    let pool = MySqlPoolOptions::new()
        .max_connections(5)
        .connect("mysql://root:password@localhost/test").await?;

    // 執行一個簡單的查詢
    let row: (i64,) = sqlx::query_as("SELECT ?")
        .bind(150_i64)
        .fetch_one(&pool).await?;

    assert_eq!(row.0, 150);

    Ok(())
}

連接數據庫

sqlx 支持多種不同的方式來連接數據庫。最常見和推薦的是使用連接池。

建立連接池

連接池可以顯著提高應用程序的性能和併發能力。通過重用連接, 減少了建立新連接的開銷。

use sqlx::postgres::PgPoolOptions;

#[tokio::main]
async fn main() -> Result<(), sqlx::Error> {
    let pool = PgPoolOptions::new()
        .max_connections(5)
        .connect("postgres://postgres:@localhost")
        .await?;

    Ok(())
}

上面的代碼創建了一個最大 5 個連接的 PostgreSQL 連接池。PgPoolOptions提供了各種配置選項。

比如你可以不通過 dsn 字符串,而是通過方法進行用戶名和密碼設置:

let conn = PgConnectOptions::new()
    .host("secret-host")
    .port(2525)
    .username("secret-user")
    .password("secret-password")
    .ssl_mode(PgSslMode::Require)
    .connect()
    .await?;

甚至可以在解析 dsn 字符串後再修改特定的參數,如下面的 mysql 示例:

use sqlx::{Connection, ConnectOptions};
use sqlx::mysql::{MySqlConnectOptions, MySqlConnection, MySqlPool, MySqlSslMode};

// dsn string
let conn = MySqlConnection::connect("mysql://root:password@localhost/db").await?;

// 手工構造
let conn = MySqlConnectOptions::new()
    .host("localhost")
    .username("root")
    .password("password")
    .database("db")
    .connect().await?;

// 從dsn字符串解析Options
let mut opts: MySqlConnectOptions = "mysql://root:password@localhost/db".parse()?;

// 修改參數
opts.log_statements(log::LevelFilter::Trace);

// 創建連接池
let pool = MySqlPool::connect_with(&opts).await?;

單個連接

有時您可能只需要一個簡單的單連接, 而不需要連接池。

use sqlx::postgres::PgConnOptions;

#[tokio::main]
async fn main() -> Result<(), sqlx::Error> {
    let conn = PgConnOptions::new()
        .connect("postgres://postgres:@localhost")
        .await?;

    Ok(())
}

查詢

在 SQL 中,查詢可以分爲預編譯(參數化)或未預編譯(簡單)的。預編譯查詢會緩存其查詢計劃,使用二進制通信模式(降低帶寬和更快的解碼速度),並利用參數來避免 SQL 注入。未預編譯的查詢是簡單的,並且僅用於無法使用預編譯語句的情況,例如各種數據庫命令(如 PRAGMASETBEGIN)。

sqlx 支持使用這兩種類型的查詢進行所有操作。在 sqlx 中,一個 &str 被當作未預編譯的查詢來處理,而 QueryQueryAs 結構體被當作預編譯的查詢來處理。

在其他語言中,預編譯就是 prepared statement,未預編譯就是 unprepared statement。

// 底層執行
conn.execute("BEGIN").await?; // 未預編譯,簡單查詢
conn.execute(sqlx::query("DELETE FROM table")).await?; // 預編譯,此連接會緩存查詢

我們應該儘可能使用高級查詢接口。爲了使這更加容易,這些類型上有終結器(finalizers),這樣就不需要使用執行器(executor)來包裝它們。換句話說,sqlx 提供了高級查詢接口,這些接口使得與數據庫的交互更加簡潔和直觀。這些接口被設計爲可以獨立工作,而不需要顯式地創建一個執行器對象來執行查詢。終結器(在這裏可能指的是一些內部機制或方法)確保了這些高級接口在使用後可以正確地清理和關閉相關資源,從而簡化了開發者的工作。

sqlx::query("DELETE FROM table").execute(&mut conn).await?;
sqlx::query("DELETE FROM table").execute(&pool).await?;

在 sqlx 中,執行查詢(execute)的終結器會返回受影響的行數(如果有的話),並丟棄所有接收到的結果。此外,還提供了 fetchfetch_onefetch_optionalfetch_all 方法來接收結果。

sqlx::query 返回的 Query 類型, 它會從數據庫中返回 Row<'conn>。可以使用 row.get() 方法通過索引或名稱訪問列值。由於 Row 保持了對連接的不可變借用,因此一次只能存在一個 Row

fetch 查詢的終結器返回一個類似流的類型,該類型會遍歷結果集中的行。你可以通過迭代這個流來訪問每一行數據。這通常用於處理查詢結果集中有多行數據的情況。

// 提供 `try_next`
use futures::TryStreamExt;

let mut rows = sqlx::query("SELECT * FROM users WHERE email = ?")
    .bind(email)
    .fetch(&mut conn);

while let Some(row) = rows.try_next().await? {
    // 將row映射到用戶定義的領域類型
    let email: &str = row.try_get("email")?;
}

爲了將 row 映射到領域類型,可以使用以下兩種模式之一:

  1. 手工映射
let mut stream = sqlx::query("SELECT * FROM users")
    .map(|row: PgRow| {
        // 映射row到用戶定義的領域類型
    })
    .fetch(&mut conn);
  1. 使用query_asbind方法
#[derive(sqlx::FromRow)]
struct User { name: String, id: i64 }

let mut stream = sqlx::query_as::<_, User>("SELECT * FROM users WHERE email = ? OR name = ?")
    .bind(user_email)
    .bind(user_name)
    .fetch(&mut conn);

除了使用類似流的類型來遍歷結果集之外 (fetch),我們還可以使用 fetch_onefetch_optional 來從數據庫中請求一個必需的或可選的結果。

使用這兩個方法可以幫助你更直接地處理那些只返回單個結果(或可能不返回結果)的查詢

原生查詢和參數化查詢

sqlx 支持執行原生 SQL 查詢, 也支持使用綁定參數進行參數化查詢, 後者有助於防止 SQL 注入攻擊。

#[tokio::main]
async fn main() -> Result<(), sqlx::Error> {
    let pool = /* 連接池初始化 */

    // 原生查詢
    let name: String = sqlx::query_scalar("SELECT name FROM users WHERE id = 1")
        .fetch_one(&pool)
        .await?;

    // 參數化查詢
    let count: (i64,) = sqlx::query_as("SELECT COUNT(*) FROM users WHERE email LIKE $1")
        .bind("%@example.com")
        .fetch_one(&pool)
        .await?;

    Ok(())
}

流式查詢

sqlx 支持流式查詢,這意味着你可以在查詢結果返回時立即處理它們,而不需要等待整個結果集加載完畢。這對於處理大量數據或需要實時處理數據的情況非常有用。

use sqlx::postgres::PgRow;

#[tokio::main]
async fn main() -> Result<(), sqlx::Error> {
    let pool = /* 連接池初始化 */

    let mut rows = sqlx::query("SELECT id, name FROM users")
        .fetch(&pool);

    while let Some(row) = rows.try_next().await? {
        let id: i32 = row.try_get(0)?;
        let name: &str = row.try_get(1)?;
        println!("{} {}", id, name);
    }

    Ok(())
}

查詢結果映射到 Rust 數據結構

最常見的查詢方式是將結果映射到一個 Rust 數據結構, 比如結構體或元組結構體。sqlx 會自動將數據庫列映射到結構體字段。

比如下面這個例子是查詢一個用戶的信息:

use sqlx::FromRow;

#[derive(FromRow)]
struct User {
    id: i32,
    name: String,
    email: String,
}

#[tokio::main]
async fn main() -> Result<(), sqlx::Error> {
    let pool = /* 連接池初始化 */

    let user = sqlx::query_as::<_, User>("SELECT id, name, email FROM users WHERE id = $1")
        .bind(42)
        .fetch_one(&pool)
        .await?;

    println!("{:?}", user);
    Ok(())
}

又比如下面這個例子查詢一組書籍的信息:

use sqlx::postgres::PgPool;
use sqlx::FromRow;

#[derive(FromRow)]
struct Book {
    id: i32,
    title: String,
    author: String,
}

#[tokio::main]
async fn main() -> Result<(), sqlx::Error> {
    let pool = PgPool::connect("postgres://postgres:@localhost").await?;

    let books = sqlx::query_as::<_, Book>("SELECT * FROM books")
        .fetch_all(&pool)
        .await?;

    for book in books {
        println!("{} - {} ({})", book.id, book.title, book.author);
    }

    Ok(())
}

執行語句

除了查詢, sqlx 還支持執行其他 SQL 語句, 如 INSERT、UPDATE 和 DELETE 等。它提供了多種執行這些語句的方法, 包括支持事務。

執行語句

最簡單的執行語句方式是使用execute函數:

#[tokio::main]
async fn main() -> Result<(), sqlx::Error> {
    let pool = /* 連接池初始化 */

    let inserted_rows = sqlx::query("INSERT INTO users (name, email) VALUES ($1$2)")
        .bind("User1").bind("user1@example.com")
        .execute(&pool)
        .await?
        .rows_affected();

    println!("Inserted {} rows", inserted_rows);
    Ok(())
}

上面這個例子是插入一行數據到users表中,並打印出插入的行數。

不要被sqlx::query這個名字所誤導, 它不僅僅用於查詢, 還可以用於執行其他 SQL 語句。

下面是一個插入 / 更新 / 刪除數據的示例:

use sqlx::postgres::PgPool;

#[derive(Debug)]
struct User {
    id: i32,
    name: String,
    email: String,
}

#[tokio::main]
async fn main() -> Result<(), sqlx::Error> {
    let pool = PgPool::connect("postgres://postgres:@localhost").await?;

    // 插入新用戶
    let user = User { id: 0, name: "NewUser".into(), email: "new@example.com".into() };
    let id = sqlx::query("INSERT INTO users (name, email) VALUES ($1$2) RETURNING id")
        .bind(&user.name).bind(&user.email)
        .fetch_one(&pool)
        .await?
        .get(0);

    println!("Inserted user with id: {}", id);

    // 更新用戶
    let updated_rows = sqlx::query("UPDATE users SET email=$1 WHERE id=$2")
        .bind("updated@example.com").bind(id)
        .execute(&pool)
        .await?
        .rows_affected();

    println!("Updated {} rows", updated_rows);

    // 刪除用戶
    let deleted_rows = sqlx::query("DELETE FROM users WHERE id=$1")
        .bind(id)
        .execute(&pool)
        .await?
        .rows_affected();

    println!("Deleted {} rows", deleted_rows);

    Ok(())
}

事務

sqlx 支持事務, 你可以使用transaction方法來執行一個事務。要執行多個語句作爲一個原子事務, 您可以使用begincommitrollback函數:

#[tokio::main]
async fn main() -> Result<(), sqlx::Error> {
    let pool = /* 連接池初始化 */

    let mut tx = pool.begin().await?;

    sqlx::query("UPDATE users SET email=$1 WHERE id=$2")
        .bind("new@email.com").bind(42)
        .execute(&mut tx)
        .await?;

    sqlx::query("DELETE FROM users WHERE id=$1")
        .bind(43)
        .execute(&mut tx)
        .await?;

    tx.commit().await?;

    Ok(())
}

上面的示例首先開始一個新事務, 然後執行兩個語句, 最後提交事務。如果中間任何一步失敗, 可以調用rollback回滾整個事務。

面是一個使用 sqlx 中事務和回滾 (rollback) 的示例:

use sqlx::postgres::PgPool;
use sqlx::Error;

#[tokio::main]
async fn main() -> Result<(), Error> {
    let pool = PgPool::connect("postgres://postgres:@localhost").await?;

    // 開始一個事務
    let mut transaction = pool.begin().await?;

    // 執行一些操作
    sqlx::query("UPDATE accounts SET balance = balance - $1 WHERE id = $2")
        .bind(100.0) // 從賬號中扣除100元
        .bind(1)
        .execute(&mut transaction)
        .await?;

    sqlx::query("UPDATE accounts SET balance = balance + $1 WHERE id = $2")
        .bind(100.0) // 將100元轉賬到另一個賬號
        .bind(2)
        .execute(&mut transaction)
        .await?;

    // 模擬一個錯誤情況
    if should_rollback() {
        // 回滾事務
        transaction.rollback().await?;
        println!("Transaction rolled back");
    } else {
        // 提交事務
        transaction.commit().await?;
        println!("Transaction committed");
    }

    Ok(())
}

fn should_rollback() -> bool {
    // 一些條件判斷,決定是否需要回滾
    // 這裏爲了演示,我們隨機返回true或false
    rand::thread_rng().gen_bool(0.5)
}

在這個示例中, 我們首先使用pool.begin()開始一個新的事務。然後, 我們執行兩個查詢, 分別從一個賬戶扣除 100 元, 並將這 100 元轉賬到另一個賬戶。接下來, 我們調用should_rollback()函數來模擬一個錯誤情況。如果should_rollback()返回 true, 我們就調用transaction.rollback().await?來回滾整個事務。否則, 我們調用transaction.commit().await?來提交事務。

在真實情況下, 您可能會在遇到某些異常或錯誤時觸發回滾, 例如:

通過使用事務和回滾, 您可以確保數據庫中的更改要麼全部成功, 要麼完全回滾, 從而保持數據的一致性和完整性。這對於處理敏感操作或需要多個步驟的複雜操作非常重要。

連接池和併發

sqlx 內置了連接池支持, 這使得它天生就支持高效的併發查詢。通過連接池, 可以避免爲每個查詢創建新連接的開銷。

連接池管理

sqlx 中的連接池由PgPool之類的類型表示。您可以直接創建一個連接池實例, 也可以使用PgPoolOptions來定製配置:

use sqlx::postgres::PgPoolOptions;

let pool = PgPoolOptions::new()
    .max_connections(10)
    .connect("postgres://postgres:@localhost")
    .await?;

上面的代碼創建了一個最大連接數爲 10 的 PostgreSQL 連接池。PgPoolOptions提供了各種配置選項, 如最大連接數、最小連接數、連接超時等。

併發查詢

由於 sqlx 內置了連接池, 因此併發查詢變得非常簡單。你只需要在多個異步任務中並行執行查詢即可:

use sqlx::postgres::PgPool;
use std::time::Instant;

#[tokio::main]
async fn main() -> Result<(), sqlx::Error> {
    let pool = PgPool::connect("postgres://postgres:@localhost").await?;
    let tasks = (0..10)
        .map(|_| {
            let pool = pool.clone();
            tokio::spawn(async move { // 併發
                let now = Instant::now();
                let _ = sqlx::query("SELECT pg_sleep(1)").execute(&pool).await;
                println!("Task completed in {:?}", now.elapsed());
            })
        })
        .collect::<Vec<_>>();

    for task in tasks {
        task.await?;
    }
    Ok(())
}

上面的代碼創建了一個包含 10 個任務的併發查詢。每個任務都會執行一個簡單的查詢, 然後打印出執行時間。通過併發查詢, 您可以同時執行多個查詢, 從而提高查詢效率。

下面是一個更實際的示例, 模擬了併發處理多個 Web 請求的場景:

use sqlx::postgres::{PgPool, PgRow};
use std::io;

#[tokio::main]
async fn main() -> Result<(), sqlx::Error> {
    let pool = PgPool::connect("postgres://postgres:@localhost").await?;

    let requests = vec![
        "SELECT * FROM users WHERE id = $1",
        "SELECT * FROM products WHERE category = $1",
        "SELECT * FROM orders WHERE user_id = $1",
    ];

    for request in requests {
        let rows = sqlx::query(request)
            .bind(42)
            .fetch_all(&pool)
            .await?;

        for row in rows {
            print_row(row);
        }

        println!();
    }

    Ok(())
}

fn print_row(row: PgRow) {
    let cols = row.columns();
    let values: Vec<&str> = row.get_refs(cols).into_iter().map(|v| v.unwrap()).collect();
    println!("{}", values.join(", "));
}

在這個示例中, 我們模擬了處理多個 Web 請求的場景。我們定義了一個包含多個查詢的請求列表, 然後併發執行這些查詢。每個查詢都會返回一組行數據, 我們將這些行數據打印出來。通過併發查詢, 我們可以同時處理多個請求, 從而提高系統的性能和效率。

JSON 支持

現代數據庫廣泛支持 JSON 數據類型, sqlx 也爲此提供了非常好的支持。您可以方便地查詢 JSON 類型以及將查詢結果映射爲 JSON。

查詢 JSON 類型

在數據庫中, JSON 類型通常被存儲爲文本。sqlx 允許你直接查詢和處理 JSON 數據:

#[tokio::main]
async fn main() -> Result<(), sqlx::Error> {
    let pool = /* 連接池初始化 */

    let json_data: serde_json::Value = sqlx::query(r#"
        SELECT '[{"id": 1, "name": "Product 1"}, {"id": 2, "name": "Product 2"}]'::json
    "#)
        .fetch_one(&pool)
        .await?
        .get(0);

    println!("{:?}", json_data);
    Ok(())
}

這個例子查詢了一個 JSON 數組, 並將其直接映射爲 serde_json::Value

將查詢結果映射爲 JSON

您還可以將常規的查詢結果映射爲 JSON 格式。這對於構建 API 或與前端交互非常有用。

use serde::{Serialize, Deserialize};

#[derive(Deserialize, Serialize)]
struct Product {
    id: i32,
    name: String,
    price: f64,
}

#[tokio::main]
async fn main() -> Result<(), sqlx::Error> {
    let pool = /* 連接池初始化 */

    let products: Vec<Product> = sqlx::query_as(
        "SELECT id, name, price FROM products"
    )
    .fetch_all(&pool)
    .await?;

    let json = serde_json::to_string(&products)?;
    println!("{}", json);

    Ok(())
}

這個例子查詢了產品列表, 並使用serde_json將其序列化爲 JSON 格式。

使用 PostgreSQL 的 JSON 類型

這是一個更全面的示例, 展示瞭如何在 PostgreSQL 中使用 JSON 類型:

use serde::{Deserialize, Serialize};
use sqlx::postgres::PgPool;
use sqlx::types::JsonValue;

#[derive(Serialize, Deserialize)]
struct User {
    id: i32,
    name: String,
    profile: JsonValue,
}

#[tokio::main]
async fn main() -> Result<(), sqlx::Error> {
    let pool = PgPool::connect("postgres://postgres:@localhost").await?;

    // 插入用戶及其JSON配置文件
    let profile = serde_json::json!({
        "bio""Software Engineer",
        "interests"["coding""reading"]
    });

    let user = User {
        id: 0,
        name: "NewUser".into(),
        profile: profile.into(),
    };

    let id = sqlx::query("INSERT INTO users (name, profile) VALUES ($1$2) RETURNING id")
        .bind(&user.name)
        .bind(&user.profile)
        .fetch_one(&pool)
        .await?
        .get(0);

    // 查詢並打印用戶及其配置文件
    let user: User = sqlx::query_as("SELECT id, name, profile FROM users WHERE id = $1")
        .bind(id)
        .fetch_one(&pool)
        .await?;

    println!("{:?}", user);

    Ok(())
}

在這個例子中, 我們首先使用serde_json創建了一個 JSON 值, 作爲用戶配置文件。然後, 我們將這個 JSON 值 插入到數據庫中。最後, 我們查詢用戶並將配置文件作爲 JsonValue 類型獲取。

通知和監聽

sqlx 提供了在數據庫中監聽通知 (NOTIFY/LISTEN) 的功能, 這使得構建基於事件的、實時應用程序成爲可能。

數據庫通知數據庫通知是一種機制, 允許應用程序在數據庫中發生某些事件時接收通知。這種功能在構建基於事件的系統 (如聊天應用程序或實時儀表板) 時非常有用。

-- 在數據庫中觸發通知
NOTIFY channel_name, 'hello';

使用監聽器 sqlx 通過DatabaseNotification結構體來表示接收到的通知。您可以在應用程序中設置一個監聽器, 以接收並處理這些通知。

use sqlx::postgres::PgListener;

#[tokio::main]
async fn main() -> Result<(), sqlx::Error> {
    let listener = PgListener::bind("postgresql://localhost/").await?;
    listener.listen("channel_name").await?;

    loop {
        let notification = listener.recv().await?;
        println!(
            "Received notification: {} ({})",
            notification.payload, notification.payload_pretty(),
        );
    }
}

上面的示例創建了一個PostgreSQL監聽器, 並開始監聽名爲channel_name的通道。當接收到通知時, 它會打印出通知的有效負載。

這是一個更完整的示例, 展示瞭如何在 PostgreSQL 中設置通知並在應用程序中監聽它們:

use sqlx::postgres::{PgPool, PgListener};

#[tokio::main]
async fn main() -> Result<(), sqlx::Error> {
    let pool = PgPool::connect("postgres://postgres:@localhost").await?;
    let listener = PgListener::bind("postgresql://localhost/").await?;

    // 創建一個通知通道
    sqlx::query("LISTEN channel_name").execute(&pool).await?;

    // 在另一個連接上觸發通知
    sqlx::query("NOTIFY channel_name, 'hello'").execute(&pool).await?;

    // 等待並處理通知
    if let Some(notification) = listener.recv().await? {
        println!("Received notification: {}", notification.payload);
    }

    Ok(())
}

在這個例子中, 我們首先創建了一個 PostgreSQL 監聽器, 並在數據庫中設置了一個名爲channel_name的通知通道。然後, 我們在另一個連接上觸發了一個通知。最後, 監聽器接收到通知並打印出了它的有效負載。

測試

編寫測試對於任何健壯的軟件系統都是必不可少的, sqlx 也不例外。幸運的是, sqlx 提供了多種方式來測試與數據庫交互的代碼。

測試連接

最基本的測試是確保您的應用程序能夠成功連接到數據庫。您可以使用 sqlx 提供的try_connect函數進行測試:

use sqlx::PgPool;

#[tokio::test]
async fn test_connection() {
    let pool = PgPool::try_connect("postgres://postgres:@localhost").await.unwrap();
    // 執行一些操作來測試連接...
}

測試查詢

您還可以測試查詢, 以確保它們能夠正確地執行並返回預期的結果。您可以使用queryquery_as函數來測試查詢:

use sqlx::PgPool;

#[tokio::test]
async fn test_query() {
    let pool = PgPool::connect("postgres://postgres:@localhost").await.unwrap();

    let row: (i64,) = sqlx::query_as("SELECT 1")
        .fetch_one(&pool)
        .await
        .unwrap();

    assert_eq!(row.0, 1);
}

使用內存數據庫

sqlx 支持使用內存數據庫進行測試, 例如 SQLite 內存數據庫。這種方式快速、輕量, 非常適合單元測試。

#[tokio::test]
async fn test_query() {
    let pool = sqlx::SqlitePool::connect(":memory:").await.unwrap();
    // 執行測試...
}

對於更全面的集成測試, 您可以在測試用例中創建一個臨時的測試數據庫, 執行所需的操作, 然後在測試結束時清理該數據庫。這種方式更接近真實的生產環境。

使用 mock 數據庫

msql-srvopensrv-clickhouseopensrv-mysql

下面是一個使用集成測試數據庫進行測試的例子:

use sqlx::postgres::{PgPool, PgRow};

#[tokio::test]
async fn test_user_operations() {
    let pool = create_test_pool().await;

    // 準備測試數據
    sqlx::query("CREATE TABLE users (id SERIAL PRIMARY KEY, name TEXT, email TEXT)")
        .execute(&pool)
        .await
        .unwrap();

    // 插入新用戶
    let name = "Test User".to_owned();
    let email = "test@example.com".to_owned();
    let id = insert_user(&pool, &name, &email).await;

    // 查詢並驗證用戶數據
    let row: PgRow = sqlx::query_as("SELECT id, name, email FROM users WHERE id = $1")
        .bind(id)
        .fetch_one(&pool)
        .await
        .unwrap();

    assert_eq!(row.get::<i32, _>(0), id);
    assert_eq!(row.get::<String, _>(1), name);
    assert_eq!(row.get::<String, _>(2), email);
}

async fn insert_user(pool: &PgPool, name: &str, email: &str) -> i32 {
    sqlx::query("INSERT INTO users (name, email) VALUES ($1$2) RETURNING id")
        .bind(name)
        .bind(email)
        .fetch_one(pool)
        .await
        .unwrap()
        .get(0)
}

async fn create_test_pool() -> PgPool {
    let db_name = "test_database";
    let pool = PgPool::connect(&format!("postgres://postgres:@localhost/{}", db_name))
        .await
        .unwrap();

    // 清理並重新創建測試數據庫
    sqlx::query(&format!("DROP DATABASE IF EXISTS {}", db_name))
        .execute(&pool)
        .await
        .unwrap();
    sqlx::query(&format!("CREATE DATABASE {}", db_name))
        .execute(&pool)
        .await
        .unwrap();

    pool
}

在這個示例中, 我們首先創建了一個專用的測試數據庫。然後我們在這個數據庫中創建了一個 users 表, 並進行了插入、查詢等操作, 最後驗證了查詢結果。

高級主題

除了基礎功能外, sqlx 還提供了一些高級功能, 如自定義類型映射、編譯時檢查和性能分析等, 可以進一步提高您的生產力和應用程序的性能。

自定義類型映射

sqlx 允許您定義自定義的數據類型映射規則, 將數據庫中的數據類型映射到 Rust 中的類型。這對於處理一些特殊的數據類型或實現自定義的邏輯非常有用。

use sqlx::types::Type;
use sqlx::postgres::{PgTypeInfo, PgValueRef};

struct MyType(String);

impl Type<PgTypeInfo> for MyType {
    fn type_info() -> PgTypeInfo {
        PgTypeInfo::with_name("mytype")
    }

    fn readable_name() -> String {
        "MyType".into()
    }
}

impl<'r> PgValueRef<'r> for MyType {
    fn from_pg_value(value: Option<&'[u8]>) -> Option<MyType> {
        value.map(|bytes| MyType(String::from_utf8_lossy(bytes).into_owned()))
    }

    fn to_pg_value(&self) -> Option<Vec<u8>> {
        Some(self.0.as_bytes().to_vec())
    }
}

在這個例子中, 我們定義了一個名爲MyType的自定義數據類型, 並實現了TypePgValueRef trait。這樣, 我們就可以將數據庫中的mytype類型映射到 Rust 中的MyType類型。

編譯時檢查

sqlx 提供了一些宏和編譯時檢查功能, 可以在編譯時捕獲一些錯誤, 而不是在運行時才發現。這有助於提高代碼質量和安全性。

use sqlx::query;

#[rustfmt::skip]
let query = query!(
    "
    SELECT id, name, email
    FROM users
    WHERE id = ?
    ",
    42
);

上面的query!宏可以在編譯時檢查 SQL 語句的語法錯誤, 並驗證綁定參數的數量和類型。這樣可以避免在運行時才發現這些問題。

類似的宏還有query_as!query_scalar!query_file!query_file!query_file_scalar!以及它們的變種query_xxx_unchecked!

執行時間

你可以通過計時來分析查詢的性能, 並根據結果進行優化。sqlx 提供了一些工具來幫助您分析查詢的性能, 如log_statementslog_slow_statements等。

use sqlx::query;
use sqlx::postgres::PgQueryAs;

#[tokio::main]
async fn main() -> Result<(), sqlx::Error> {
    let pool = /* 連接池初始化 */

    let query = query_as!(
        User,
        r#"
        SELECT id, name, email
        FROM users
        WHERE id = $1
        "#
    );

    for attempt in 0..5 {
        let time = std::time::Instant::now();
        let _users: Vec<User> = query.bind(42).fetch_all(&pool).await?;

        let elapsed = time.elapsed();
        println!("Query attempt {attempt} took: {elapsed:?}");
    }

    Ok(())
}

打印日誌

ConnectOptions提供了兩個設置日誌的方法:

use sqlx::postgres::PgPoolOptions;

#[tokio::main]
async fn main() -> Result<(), sqlx::Error> {
    let pool = PgPoolOptions::new()
        .max_connections(5)
        .log_statements(log::LevelFilter::Debug) // 記錄所有SQL語句
        .log_slow_statements(log::LevelFilter::Warn, std::time::Duration::from_millis(100)) // 記錄執行時間超過100ms的慢查詢
        .connect("postgres://postgres:@localhost")
        .await?;

    // 執行一些查詢
    let row: (i64,) = sqlx::query_as("SELECT 42")
        .fetch_one(&pool)
        .await?;

    println!("Result: {}", row.0);

    Ok(())
}

最佳實踐和故障排除

再囉嗦幾句。

在使用 sqlx 時, 遵循一些最佳實踐可以幫助您編寫更加安全、高效和可維護的代碼。此外, 掌握一些常見錯誤和故障排除技巧也很有幫助。

sqlx 最佳實踐

常見錯誤和故障排除

生產就緒建議

sqlx 生態

有一些其他數據庫的擴展和支持,比如sqlx-rxqlitesqlx-clickhouse-ext

sqlx-crud提供常見的數據庫操作的 CRUD 操作的 derive 宏:

use sqlx::FromRow;
use sqlx_crud::SqlxCrud;

#[derive(Debug, FromRow, SqlxCrud)]
struct User {
    user_id: i32,
    name: String,
}

if let Some(user) = User::by_id(&pool, 42) {
    println!("Found user user_id=42: {:?}", user);
}

sqlx-error提供了對sqlx::Error的包裝。

當然還有一些其他的庫,不過當前關注度還不是很高。

參考資料

[1]

Go 生態圈的 sqlx 庫: https://colobu.com/2024/05/10/sqlx-a-brief-introduction/

[2]

sqlx 庫: https://github.com/launchbadge/sqlx

[3]

ormx: https://crates.io/crates/ormx

[4]

SeaORM: https://github.com/SeaQL/sea-orm

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/niddplVZmUrWP7d6iM2TgQ