sqlx: 一個優秀的 rust 異步 SQL 庫
概覽
sqlx 是一個爲 Rust 語言提供的功能齊全的數據庫訪問和查詢構建器庫。它支持多種數據庫, 包括 PostgreSQL、MySQL、SQLite 等。sqlx 的設計目標是成爲 Rust 中最直觀、高效且類型安全的數據庫客戶端。
-
真正的異步。從頭開始使用 async/await 構建,以實現最大的併發性。
-
編譯時檢查查詢(如果你需要的話)。請注意,sqlx 不是 ORM。
-
與數據庫無關。支持 PostgreSQL、MySQL、MariaDB、SQLite。
-
純 Rust。Postgres 和 MySQL/MariaDB 驅動程序是使用零不安全的代碼以純 Rust 編寫的。
-
與運行時無關。在不同的運行時(async-std/tokio/actix)和 TLS 後端(native-tls,rustls)上運行。
SQLite 驅動程序使用 libsqlite3 C 庫。
sqlx 除非啓用了 sqlite 功能,否則會使用 #![forbid(unsafe_code)]。SQLite 驅動程序通過 libsqlite3-sys 直接調用 SQLite3 API,這需要使用不安全的代碼。
另外,它還有以下特性:
-
跨平臺。作爲原生的 Rust 代碼,sqlx 可以在任何支持 Rust 的平臺上編譯。
-
內建的連接池功能,使用
sqlx::Pool
。 -
行流式處理。數據從數據庫異步讀取並按需解碼。
-
自動的語句準備和緩存。當使用高級查詢 API(
sqlx::query
)時,語句將按連接進行準備和緩存。 -
簡單的(未準備)查詢執行,包括將結果獲取到與高級 API 使用的相同 Row 類型。支持批量執行並返回所有語句的結果。
-
傳輸層安全性(TLS)在支持的平臺(MySQL、MariaDB 和 PostgreSQL)上可用。
-
使用
LISTEN
和NOTIFY
以支持 PostgreSQL 異步通知。 -
支持保存點的嵌套事務。
-
任何數據庫驅動程序,允許在運行時更改數據庫驅動程序。
AnyPool
根據 URL 方案連接到指定的驅動程序。
sqlx 支持編譯時檢查的查詢。然而,它並不是通過提供一個 Rust API 或 DSL(特定領域語言)來構建查詢來實現這一點的。相反,它提供了宏,這些宏接受常規的 SQL 作爲輸入,並確保其對於您的數據庫是有效的。其工作原理是,sqlx 在編譯時連接到您的開發數據庫,讓數據庫本身驗證(並返回一些有關)您的 SQL 查詢的信息。這有一些可能令人驚訝的含義:
-
由於 sqlx 不需要解析 SQL 字符串本身,因此可以使用開發數據庫接受的任何語法(包括數據庫擴展添加的內容)
-
由於數據庫允許您檢索的查詢信息量不同,從查詢宏獲得的 SQL 驗證的程度取決於數據庫。
它不是一個 ORM 庫,你如果想使用 ORM 庫,可以參考 ormx[3] 和 SeaORM[4]。
安裝
sqlx 支持多種異步運行時,你可以通過選擇不同的特性來使用不同的異步運行時。目前支持的異步運行時有async-std
, tokio
和 actix
(其實是 tokio 的別名)。還支持 TLS 的連接:
# Cargo.toml
[dependencies]
# P挑選下面的一行引入sqlx:
# tokio (no TLS)
sqlx = { version = "0.7", features = [ "runtime-tokio" ] }
# tokio + native-tls
sqlx = { version = "0.7", features = [ "runtime-tokio", "tls-native-tls" ] }
# tokio + rustls
sqlx = { version = "0.7", features = [ "runtime-tokio", "tls-rustls" ] }
# async-std (no TLS)
sqlx = { version = "0.7", features = [ "runtime-async-std" ] }
# async-std + native-tls
sqlx = { version = "0.7", features = [ "runtime-async-std", "tls-native-tls" ] }
# async-std + rustls
sqlx = { version = "0.7", features = [ "runtime-async-std", "tls-rustls" ] }
如果你引入了多個異步運行時,默認首選tokio
。
同時你也需要引入所需的數據庫特性:
sqlx = { version = "0.7", features = [ "postgres" ] }
sqlx = { version = "0.7", features = [ "mysql" ] }
sqlx = { version = "0.7", features = [ "sqlite" ] }
sqlx = { version = "0.7", features = [ "any" ] }
以及一些其他的關於數據類型的特性等,比如chrono
、uuid
、time
、bstr
、bigdecimal
、rust_decimal
、ipnetwork
等。
derive
支持 derive 類型的宏,如FromRow
, Type
, Encode
, Decode
.
macros
增加了對 query*! 宏的支持,該宏允許進行編譯時檢查的查詢。
一個簡單的 sqlx 示例如下:
use sqlx::postgres::PgPoolOptions;
#[tokio::main] // 異步運行時
async fn main() -> Result<(), sqlx::Error> {
// 使用一個數據庫的連接池
// 不同的數據庫選擇不同的連接池構建器
let pool = MySqlPoolOptions::new()
.max_connections(5)
.connect("mysql://root:password@localhost/test").await?;
// 執行一個簡單的查詢
let row: (i64,) = sqlx::query_as("SELECT ?")
.bind(150_i64)
.fetch_one(&pool).await?;
assert_eq!(row.0, 150);
Ok(())
}
連接數據庫
sqlx 支持多種不同的方式來連接數據庫。最常見和推薦的是使用連接池。
建立連接池
連接池可以顯著提高應用程序的性能和併發能力。通過重用連接, 減少了建立新連接的開銷。
use sqlx::postgres::PgPoolOptions;
#[tokio::main]
async fn main() -> Result<(), sqlx::Error> {
let pool = PgPoolOptions::new()
.max_connections(5)
.connect("postgres://postgres:@localhost")
.await?;
Ok(())
}
上面的代碼創建了一個最大 5 個連接的 PostgreSQL 連接池。PgPoolOptions
提供了各種配置選項。
比如你可以不通過 dsn 字符串,而是通過方法進行用戶名和密碼設置:
let conn = PgConnectOptions::new()
.host("secret-host")
.port(2525)
.username("secret-user")
.password("secret-password")
.ssl_mode(PgSslMode::Require)
.connect()
.await?;
甚至可以在解析 dsn 字符串後再修改特定的參數,如下面的 mysql 示例:
use sqlx::{Connection, ConnectOptions};
use sqlx::mysql::{MySqlConnectOptions, MySqlConnection, MySqlPool, MySqlSslMode};
// dsn string
let conn = MySqlConnection::connect("mysql://root:password@localhost/db").await?;
// 手工構造
let conn = MySqlConnectOptions::new()
.host("localhost")
.username("root")
.password("password")
.database("db")
.connect().await?;
// 從dsn字符串解析Options
let mut opts: MySqlConnectOptions = "mysql://root:password@localhost/db".parse()?;
// 修改參數
opts.log_statements(log::LevelFilter::Trace);
// 創建連接池
let pool = MySqlPool::connect_with(&opts).await?;
單個連接
有時您可能只需要一個簡單的單連接, 而不需要連接池。
use sqlx::postgres::PgConnOptions;
#[tokio::main]
async fn main() -> Result<(), sqlx::Error> {
let conn = PgConnOptions::new()
.connect("postgres://postgres:@localhost")
.await?;
Ok(())
}
查詢
在 SQL 中,查詢可以分爲預編譯(參數化)或未預編譯(簡單)的。預編譯查詢會緩存其查詢計劃,使用二進制通信模式(降低帶寬和更快的解碼速度),並利用參數來避免 SQL 注入。未預編譯的查詢是簡單的,並且僅用於無法使用預編譯語句的情況,例如各種數據庫命令(如 PRAGMA
、SET
或 BEGIN
)。
sqlx 支持使用這兩種類型的查詢進行所有操作。在 sqlx 中,一個 &str
被當作未預編譯的查詢來處理,而 Query
或 QueryAs
結構體被當作預編譯的查詢來處理。
“
在其他語言中,預編譯就是 prepared statement,未預編譯就是 unprepared statement。
// 底層執行
conn.execute("BEGIN").await?; // 未預編譯,簡單查詢
conn.execute(sqlx::query("DELETE FROM table")).await?; // 預編譯,此連接會緩存查詢
我們應該儘可能使用高級查詢接口。爲了使這更加容易,這些類型上有終結器(finalizers),這樣就不需要使用執行器(executor)來包裝它們。換句話說,sqlx 提供了高級查詢接口,這些接口使得與數據庫的交互更加簡潔和直觀。這些接口被設計爲可以獨立工作,而不需要顯式地創建一個執行器對象來執行查詢。終結器(在這裏可能指的是一些內部機制或方法)確保了這些高級接口在使用後可以正確地清理和關閉相關資源,從而簡化了開發者的工作。
sqlx::query("DELETE FROM table").execute(&mut conn).await?;
sqlx::query("DELETE FROM table").execute(&pool).await?;
在 sqlx 中,執行查詢(execute)的終結器會返回受影響的行數(如果有的話),並丟棄所有接收到的結果。此外,還提供了 fetch
、fetch_one
、fetch_optional
和 fetch_all
方法來接收結果。
sqlx::query
返回的 Query
類型, 它會從數據庫中返回 Row<'conn>
。可以使用 row.get()
方法通過索引或名稱訪問列值。由於 Row
保持了對連接的不可變借用,因此一次只能存在一個 Row
。
fetch
查詢的終結器返回一個類似流的類型,該類型會遍歷結果集中的行。你可以通過迭代這個流來訪問每一行數據。這通常用於處理查詢結果集中有多行數據的情況。
// 提供 `try_next`
use futures::TryStreamExt;
let mut rows = sqlx::query("SELECT * FROM users WHERE email = ?")
.bind(email)
.fetch(&mut conn);
while let Some(row) = rows.try_next().await? {
// 將row映射到用戶定義的領域類型
let email: &str = row.try_get("email")?;
}
爲了將 row 映射到領域類型,可以使用以下兩種模式之一:
- 手工映射
let mut stream = sqlx::query("SELECT * FROM users")
.map(|row: PgRow| {
// 映射row到用戶定義的領域類型
})
.fetch(&mut conn);
- 使用
query_as
和bind
方法
#[derive(sqlx::FromRow)]
struct User { name: String, id: i64 }
let mut stream = sqlx::query_as::<_, User>("SELECT * FROM users WHERE email = ? OR name = ?")
.bind(user_email)
.bind(user_name)
.fetch(&mut conn);
除了使用類似流的類型來遍歷結果集之外 (fetch
),我們還可以使用 fetch_one
或 fetch_optional
來從數據庫中請求一個必需的或可選的結果。
-
fetch_one
: 這個方法會嘗試從結果集中獲取第一行數據。如果結果集爲空(即沒有數據),那麼fetch_one
通常會返回一個錯誤。這個方法適用於你期望查詢結果只有一行數據的情況。 -
fetch_optional
: 這個方法類似於fetch_one
,但它返回一個可選的結果(Option<Row>
或Option<T>
,如果使用了類型映射)。如果結果集爲空,它將返回None
而不是錯誤。這使得它在處理可能返回零行或多行數據的查詢時更加靈活,但你又只關心第一行數據(如果存在)的情況下特別有用。
使用這兩個方法可以幫助你更直接地處理那些只返回單個結果(或可能不返回結果)的查詢
原生查詢和參數化查詢
sqlx 支持執行原生 SQL 查詢, 也支持使用綁定參數進行參數化查詢, 後者有助於防止 SQL 注入攻擊。
#[tokio::main]
async fn main() -> Result<(), sqlx::Error> {
let pool = /* 連接池初始化 */
// 原生查詢
let name: String = sqlx::query_scalar("SELECT name FROM users WHERE id = 1")
.fetch_one(&pool)
.await?;
// 參數化查詢
let count: (i64,) = sqlx::query_as("SELECT COUNT(*) FROM users WHERE email LIKE $1")
.bind("%@example.com")
.fetch_one(&pool)
.await?;
Ok(())
}
流式查詢
sqlx 支持流式查詢,這意味着你可以在查詢結果返回時立即處理它們,而不需要等待整個結果集加載完畢。這對於處理大量數據或需要實時處理數據的情況非常有用。
use sqlx::postgres::PgRow;
#[tokio::main]
async fn main() -> Result<(), sqlx::Error> {
let pool = /* 連接池初始化 */
let mut rows = sqlx::query("SELECT id, name FROM users")
.fetch(&pool);
while let Some(row) = rows.try_next().await? {
let id: i32 = row.try_get(0)?;
let name: &str = row.try_get(1)?;
println!("{} {}", id, name);
}
Ok(())
}
查詢結果映射到 Rust 數據結構
最常見的查詢方式是將結果映射到一個 Rust 數據結構, 比如結構體或元組結構體。sqlx 會自動將數據庫列映射到結構體字段。
比如下面這個例子是查詢一個用戶的信息:
use sqlx::FromRow;
#[derive(FromRow)]
struct User {
id: i32,
name: String,
email: String,
}
#[tokio::main]
async fn main() -> Result<(), sqlx::Error> {
let pool = /* 連接池初始化 */
let user = sqlx::query_as::<_, User>("SELECT id, name, email FROM users WHERE id = $1")
.bind(42)
.fetch_one(&pool)
.await?;
println!("{:?}", user);
Ok(())
}
又比如下面這個例子查詢一組書籍的信息:
use sqlx::postgres::PgPool;
use sqlx::FromRow;
#[derive(FromRow)]
struct Book {
id: i32,
title: String,
author: String,
}
#[tokio::main]
async fn main() -> Result<(), sqlx::Error> {
let pool = PgPool::connect("postgres://postgres:@localhost").await?;
let books = sqlx::query_as::<_, Book>("SELECT * FROM books")
.fetch_all(&pool)
.await?;
for book in books {
println!("{} - {} ({})", book.id, book.title, book.author);
}
Ok(())
}
執行語句
除了查詢, sqlx 還支持執行其他 SQL 語句, 如 INSERT、UPDATE 和 DELETE 等。它提供了多種執行這些語句的方法, 包括支持事務。
執行語句
最簡單的執行語句方式是使用execute
函數:
#[tokio::main]
async fn main() -> Result<(), sqlx::Error> {
let pool = /* 連接池初始化 */
let inserted_rows = sqlx::query("INSERT INTO users (name, email) VALUES ($1, $2)")
.bind("User1").bind("user1@example.com")
.execute(&pool)
.await?
.rows_affected();
println!("Inserted {} rows", inserted_rows);
Ok(())
}
上面這個例子是插入一行數據到users
表中,並打印出插入的行數。
“
不要被
sqlx::query
這個名字所誤導, 它不僅僅用於查詢, 還可以用於執行其他 SQL 語句。
下面是一個插入 / 更新 / 刪除數據的示例:
use sqlx::postgres::PgPool;
#[derive(Debug)]
struct User {
id: i32,
name: String,
email: String,
}
#[tokio::main]
async fn main() -> Result<(), sqlx::Error> {
let pool = PgPool::connect("postgres://postgres:@localhost").await?;
// 插入新用戶
let user = User { id: 0, name: "NewUser".into(), email: "new@example.com".into() };
let id = sqlx::query("INSERT INTO users (name, email) VALUES ($1, $2) RETURNING id")
.bind(&user.name).bind(&user.email)
.fetch_one(&pool)
.await?
.get(0);
println!("Inserted user with id: {}", id);
// 更新用戶
let updated_rows = sqlx::query("UPDATE users SET email=$1 WHERE id=$2")
.bind("updated@example.com").bind(id)
.execute(&pool)
.await?
.rows_affected();
println!("Updated {} rows", updated_rows);
// 刪除用戶
let deleted_rows = sqlx::query("DELETE FROM users WHERE id=$1")
.bind(id)
.execute(&pool)
.await?
.rows_affected();
println!("Deleted {} rows", deleted_rows);
Ok(())
}
事務
sqlx 支持事務, 你可以使用transaction
方法來執行一個事務。要執行多個語句作爲一個原子事務, 您可以使用begin
、commit
和rollback
函數:
#[tokio::main]
async fn main() -> Result<(), sqlx::Error> {
let pool = /* 連接池初始化 */
let mut tx = pool.begin().await?;
sqlx::query("UPDATE users SET email=$1 WHERE id=$2")
.bind("new@email.com").bind(42)
.execute(&mut tx)
.await?;
sqlx::query("DELETE FROM users WHERE id=$1")
.bind(43)
.execute(&mut tx)
.await?;
tx.commit().await?;
Ok(())
}
上面的示例首先開始一個新事務, 然後執行兩個語句, 最後提交事務。如果中間任何一步失敗, 可以調用rollback
回滾整個事務。
面是一個使用 sqlx 中事務和回滾 (rollback) 的示例:
use sqlx::postgres::PgPool;
use sqlx::Error;
#[tokio::main]
async fn main() -> Result<(), Error> {
let pool = PgPool::connect("postgres://postgres:@localhost").await?;
// 開始一個事務
let mut transaction = pool.begin().await?;
// 執行一些操作
sqlx::query("UPDATE accounts SET balance = balance - $1 WHERE id = $2")
.bind(100.0) // 從賬號中扣除100元
.bind(1)
.execute(&mut transaction)
.await?;
sqlx::query("UPDATE accounts SET balance = balance + $1 WHERE id = $2")
.bind(100.0) // 將100元轉賬到另一個賬號
.bind(2)
.execute(&mut transaction)
.await?;
// 模擬一個錯誤情況
if should_rollback() {
// 回滾事務
transaction.rollback().await?;
println!("Transaction rolled back");
} else {
// 提交事務
transaction.commit().await?;
println!("Transaction committed");
}
Ok(())
}
fn should_rollback() -> bool {
// 一些條件判斷,決定是否需要回滾
// 這裏爲了演示,我們隨機返回true或false
rand::thread_rng().gen_bool(0.5)
}
在這個示例中, 我們首先使用pool.begin()
開始一個新的事務。然後, 我們執行兩個查詢, 分別從一個賬戶扣除 100 元, 並將這 100 元轉賬到另一個賬戶。接下來, 我們調用should_rollback()
函數來模擬一個錯誤情況。如果should_rollback()
返回 true, 我們就調用transaction.rollback().await?
來回滾整個事務。否則, 我們調用transaction.commit().await?
來提交事務。
在真實情況下, 您可能會在遇到某些異常或錯誤時觸發回滾, 例如:
-
違反了某些業務規則或數據完整性約束
-
發生了意外的異常或錯誤
-
用戶取消或中斷了操作
-
出於某些原因, 整個事務需要被回滾
通過使用事務和回滾, 您可以確保數據庫中的更改要麼全部成功, 要麼完全回滾, 從而保持數據的一致性和完整性。這對於處理敏感操作或需要多個步驟的複雜操作非常重要。
連接池和併發
sqlx 內置了連接池支持, 這使得它天生就支持高效的併發查詢。通過連接池, 可以避免爲每個查詢創建新連接的開銷。
連接池管理
sqlx 中的連接池由PgPool
之類的類型表示。您可以直接創建一個連接池實例, 也可以使用PgPoolOptions
來定製配置:
use sqlx::postgres::PgPoolOptions;
let pool = PgPoolOptions::new()
.max_connections(10)
.connect("postgres://postgres:@localhost")
.await?;
上面的代碼創建了一個最大連接數爲 10 的 PostgreSQL 連接池。PgPoolOptions
提供了各種配置選項, 如最大連接數、最小連接數、連接超時等。
併發查詢
由於 sqlx 內置了連接池, 因此併發查詢變得非常簡單。你只需要在多個異步任務中並行執行查詢即可:
use sqlx::postgres::PgPool;
use std::time::Instant;
#[tokio::main]
async fn main() -> Result<(), sqlx::Error> {
let pool = PgPool::connect("postgres://postgres:@localhost").await?;
let tasks = (0..10)
.map(|_| {
let pool = pool.clone();
tokio::spawn(async move { // 併發
let now = Instant::now();
let _ = sqlx::query("SELECT pg_sleep(1)").execute(&pool).await;
println!("Task completed in {:?}", now.elapsed());
})
})
.collect::<Vec<_>>();
for task in tasks {
task.await?;
}
Ok(())
}
上面的代碼創建了一個包含 10 個任務的併發查詢。每個任務都會執行一個簡單的查詢, 然後打印出執行時間。通過併發查詢, 您可以同時執行多個查詢, 從而提高查詢效率。
下面是一個更實際的示例, 模擬了併發處理多個 Web 請求的場景:
use sqlx::postgres::{PgPool, PgRow};
use std::io;
#[tokio::main]
async fn main() -> Result<(), sqlx::Error> {
let pool = PgPool::connect("postgres://postgres:@localhost").await?;
let requests = vec![
"SELECT * FROM users WHERE id = $1",
"SELECT * FROM products WHERE category = $1",
"SELECT * FROM orders WHERE user_id = $1",
];
for request in requests {
let rows = sqlx::query(request)
.bind(42)
.fetch_all(&pool)
.await?;
for row in rows {
print_row(row);
}
println!();
}
Ok(())
}
fn print_row(row: PgRow) {
let cols = row.columns();
let values: Vec<&str> = row.get_refs(cols).into_iter().map(|v| v.unwrap()).collect();
println!("{}", values.join(", "));
}
在這個示例中, 我們模擬了處理多個 Web 請求的場景。我們定義了一個包含多個查詢的請求列表, 然後併發執行這些查詢。每個查詢都會返回一組行數據, 我們將這些行數據打印出來。通過併發查詢, 我們可以同時處理多個請求, 從而提高系統的性能和效率。
JSON 支持
現代數據庫廣泛支持 JSON 數據類型, sqlx 也爲此提供了非常好的支持。您可以方便地查詢 JSON 類型以及將查詢結果映射爲 JSON。
查詢 JSON 類型
在數據庫中, JSON 類型通常被存儲爲文本。sqlx 允許你直接查詢和處理 JSON 數據:
#[tokio::main]
async fn main() -> Result<(), sqlx::Error> {
let pool = /* 連接池初始化 */
let json_data: serde_json::Value = sqlx::query(r#"
SELECT '[{"id": 1, "name": "Product 1"}, {"id": 2, "name": "Product 2"}]'::json
"#)
.fetch_one(&pool)
.await?
.get(0);
println!("{:?}", json_data);
Ok(())
}
這個例子查詢了一個 JSON 數組, 並將其直接映射爲 serde_json::Value
。
將查詢結果映射爲 JSON
您還可以將常規的查詢結果映射爲 JSON 格式。這對於構建 API 或與前端交互非常有用。
use serde::{Serialize, Deserialize};
#[derive(Deserialize, Serialize)]
struct Product {
id: i32,
name: String,
price: f64,
}
#[tokio::main]
async fn main() -> Result<(), sqlx::Error> {
let pool = /* 連接池初始化 */
let products: Vec<Product> = sqlx::query_as(
"SELECT id, name, price FROM products"
)
.fetch_all(&pool)
.await?;
let json = serde_json::to_string(&products)?;
println!("{}", json);
Ok(())
}
這個例子查詢了產品列表, 並使用serde_json
將其序列化爲 JSON 格式。
使用 PostgreSQL 的 JSON 類型
這是一個更全面的示例, 展示瞭如何在 PostgreSQL 中使用 JSON 類型:
use serde::{Deserialize, Serialize};
use sqlx::postgres::PgPool;
use sqlx::types::JsonValue;
#[derive(Serialize, Deserialize)]
struct User {
id: i32,
name: String,
profile: JsonValue,
}
#[tokio::main]
async fn main() -> Result<(), sqlx::Error> {
let pool = PgPool::connect("postgres://postgres:@localhost").await?;
// 插入用戶及其JSON配置文件
let profile = serde_json::json!({
"bio": "Software Engineer",
"interests": ["coding", "reading"]
});
let user = User {
id: 0,
name: "NewUser".into(),
profile: profile.into(),
};
let id = sqlx::query("INSERT INTO users (name, profile) VALUES ($1, $2) RETURNING id")
.bind(&user.name)
.bind(&user.profile)
.fetch_one(&pool)
.await?
.get(0);
// 查詢並打印用戶及其配置文件
let user: User = sqlx::query_as("SELECT id, name, profile FROM users WHERE id = $1")
.bind(id)
.fetch_one(&pool)
.await?;
println!("{:?}", user);
Ok(())
}
在這個例子中, 我們首先使用serde_json
創建了一個 JSON 值, 作爲用戶配置文件。然後, 我們將這個 JSON 值 插入到數據庫中。最後, 我們查詢用戶並將配置文件作爲 JsonValue
類型獲取。
通知和監聽
sqlx 提供了在數據庫中監聽通知 (NOTIFY/LISTEN
) 的功能, 這使得構建基於事件的、實時應用程序成爲可能。
數據庫通知數據庫通知是一種機制, 允許應用程序在數據庫中發生某些事件時接收通知。這種功能在構建基於事件的系統 (如聊天應用程序或實時儀表板) 時非常有用。
-- 在數據庫中觸發通知
NOTIFY channel_name, 'hello';
使用監聽器 sqlx 通過DatabaseNotification
結構體來表示接收到的通知。您可以在應用程序中設置一個監聽器, 以接收並處理這些通知。
use sqlx::postgres::PgListener;
#[tokio::main]
async fn main() -> Result<(), sqlx::Error> {
let listener = PgListener::bind("postgresql://localhost/").await?;
listener.listen("channel_name").await?;
loop {
let notification = listener.recv().await?;
println!(
"Received notification: {} ({})",
notification.payload, notification.payload_pretty(),
);
}
}
上面的示例創建了一個PostgreSQL
監聽器, 並開始監聽名爲channel_name
的通道。當接收到通知時, 它會打印出通知的有效負載。
這是一個更完整的示例, 展示瞭如何在 PostgreSQL 中設置通知並在應用程序中監聽它們:
use sqlx::postgres::{PgPool, PgListener};
#[tokio::main]
async fn main() -> Result<(), sqlx::Error> {
let pool = PgPool::connect("postgres://postgres:@localhost").await?;
let listener = PgListener::bind("postgresql://localhost/").await?;
// 創建一個通知通道
sqlx::query("LISTEN channel_name").execute(&pool).await?;
// 在另一個連接上觸發通知
sqlx::query("NOTIFY channel_name, 'hello'").execute(&pool).await?;
// 等待並處理通知
if let Some(notification) = listener.recv().await? {
println!("Received notification: {}", notification.payload);
}
Ok(())
}
在這個例子中, 我們首先創建了一個 PostgreSQL 監聽器, 並在數據庫中設置了一個名爲channel_name
的通知通道。然後, 我們在另一個連接上觸發了一個通知。最後, 監聽器接收到通知並打印出了它的有效負載。
測試
編寫測試對於任何健壯的軟件系統都是必不可少的, sqlx 也不例外。幸運的是, sqlx 提供了多種方式來測試與數據庫交互的代碼。
測試連接
最基本的測試是確保您的應用程序能夠成功連接到數據庫。您可以使用 sqlx 提供的try_connect
函數進行測試:
use sqlx::PgPool;
#[tokio::test]
async fn test_connection() {
let pool = PgPool::try_connect("postgres://postgres:@localhost").await.unwrap();
// 執行一些操作來測試連接...
}
測試查詢
您還可以測試查詢, 以確保它們能夠正確地執行並返回預期的結果。您可以使用query
和query_as
函數來測試查詢:
use sqlx::PgPool;
#[tokio::test]
async fn test_query() {
let pool = PgPool::connect("postgres://postgres:@localhost").await.unwrap();
let row: (i64,) = sqlx::query_as("SELECT 1")
.fetch_one(&pool)
.await
.unwrap();
assert_eq!(row.0, 1);
}
使用內存數據庫
sqlx 支持使用內存數據庫進行測試, 例如 SQLite 內存數據庫。這種方式快速、輕量, 非常適合單元測試。
#[tokio::test]
async fn test_query() {
let pool = sqlx::SqlitePool::connect(":memory:").await.unwrap();
// 執行測試...
}
對於更全面的集成測試, 您可以在測試用例中創建一個臨時的測試數據庫, 執行所需的操作, 然後在測試結束時清理該數據庫。這種方式更接近真實的生產環境。
使用 mock 數據庫
如msql-srv
、opensrv-clickhouse
、opensrv-mysql
、
下面是一個使用集成測試數據庫進行測試的例子:
use sqlx::postgres::{PgPool, PgRow};
#[tokio::test]
async fn test_user_operations() {
let pool = create_test_pool().await;
// 準備測試數據
sqlx::query("CREATE TABLE users (id SERIAL PRIMARY KEY, name TEXT, email TEXT)")
.execute(&pool)
.await
.unwrap();
// 插入新用戶
let name = "Test User".to_owned();
let email = "test@example.com".to_owned();
let id = insert_user(&pool, &name, &email).await;
// 查詢並驗證用戶數據
let row: PgRow = sqlx::query_as("SELECT id, name, email FROM users WHERE id = $1")
.bind(id)
.fetch_one(&pool)
.await
.unwrap();
assert_eq!(row.get::<i32, _>(0), id);
assert_eq!(row.get::<String, _>(1), name);
assert_eq!(row.get::<String, _>(2), email);
}
async fn insert_user(pool: &PgPool, name: &str, email: &str) -> i32 {
sqlx::query("INSERT INTO users (name, email) VALUES ($1, $2) RETURNING id")
.bind(name)
.bind(email)
.fetch_one(pool)
.await
.unwrap()
.get(0)
}
async fn create_test_pool() -> PgPool {
let db_name = "test_database";
let pool = PgPool::connect(&format!("postgres://postgres:@localhost/{}", db_name))
.await
.unwrap();
// 清理並重新創建測試數據庫
sqlx::query(&format!("DROP DATABASE IF EXISTS {}", db_name))
.execute(&pool)
.await
.unwrap();
sqlx::query(&format!("CREATE DATABASE {}", db_name))
.execute(&pool)
.await
.unwrap();
pool
}
在這個示例中, 我們首先創建了一個專用的測試數據庫。然後我們在這個數據庫中創建了一個 users 表, 並進行了插入、查詢等操作, 最後驗證了查詢結果。
高級主題
除了基礎功能外, sqlx 還提供了一些高級功能, 如自定義類型映射、編譯時檢查和性能分析等, 可以進一步提高您的生產力和應用程序的性能。
自定義類型映射
sqlx 允許您定義自定義的數據類型映射規則, 將數據庫中的數據類型映射到 Rust 中的類型。這對於處理一些特殊的數據類型或實現自定義的邏輯非常有用。
use sqlx::types::Type;
use sqlx::postgres::{PgTypeInfo, PgValueRef};
struct MyType(String);
impl Type<PgTypeInfo> for MyType {
fn type_info() -> PgTypeInfo {
PgTypeInfo::with_name("mytype")
}
fn readable_name() -> String {
"MyType".into()
}
}
impl<'r> PgValueRef<'r> for MyType {
fn from_pg_value(value: Option<&'r [u8]>) -> Option<MyType> {
value.map(|bytes| MyType(String::from_utf8_lossy(bytes).into_owned()))
}
fn to_pg_value(&self) -> Option<Vec<u8>> {
Some(self.0.as_bytes().to_vec())
}
}
在這個例子中, 我們定義了一個名爲MyType
的自定義數據類型, 並實現了Type
和PgValueRef
trait。這樣, 我們就可以將數據庫中的mytype
類型映射到 Rust 中的MyType
類型。
編譯時檢查
sqlx 提供了一些宏和編譯時檢查功能, 可以在編譯時捕獲一些錯誤, 而不是在運行時才發現。這有助於提高代碼質量和安全性。
use sqlx::query;
#[rustfmt::skip]
let query = query!(
"
SELECT id, name, email
FROM users
WHERE id = ?
",
42
);
上面的query!
宏可以在編譯時檢查 SQL 語句的語法錯誤, 並驗證綁定參數的數量和類型。這樣可以避免在運行時才發現這些問題。
類似的宏還有query_as!
、query_scalar!
、query_file!
、query_file!
、query_file_scalar!
以及它們的變種query_xxx_unchecked!
。
執行時間
你可以通過計時來分析查詢的性能, 並根據結果進行優化。sqlx 提供了一些工具來幫助您分析查詢的性能, 如log_statements
、log_slow_statements
等。
use sqlx::query;
use sqlx::postgres::PgQueryAs;
#[tokio::main]
async fn main() -> Result<(), sqlx::Error> {
let pool = /* 連接池初始化 */
let query = query_as!(
User,
r#"
SELECT id, name, email
FROM users
WHERE id = $1
"#
);
for attempt in 0..5 {
let time = std::time::Instant::now();
let _users: Vec<User> = query.bind(42).fetch_all(&pool).await?;
let elapsed = time.elapsed();
println!("Query attempt {attempt} took: {elapsed:?}");
}
Ok(())
}
打印日誌
ConnectOptions
提供了兩個設置日誌的方法:
-
log_statements: 使用指定的級別打印執行語句
-
log_slow_statements: 使用指定的級別打印執行時間超過指定閾值的 SQL 語句。
use sqlx::postgres::PgPoolOptions;
#[tokio::main]
async fn main() -> Result<(), sqlx::Error> {
let pool = PgPoolOptions::new()
.max_connections(5)
.log_statements(log::LevelFilter::Debug) // 記錄所有SQL語句
.log_slow_statements(log::LevelFilter::Warn, std::time::Duration::from_millis(100)) // 記錄執行時間超過100ms的慢查詢
.connect("postgres://postgres:@localhost")
.await?;
// 執行一些查詢
let row: (i64,) = sqlx::query_as("SELECT 42")
.fetch_one(&pool)
.await?;
println!("Result: {}", row.0);
Ok(())
}
最佳實踐和故障排除
再囉嗦幾句。
在使用 sqlx 時, 遵循一些最佳實踐可以幫助您編寫更加安全、高效和可維護的代碼。此外, 掌握一些常見錯誤和故障排除技巧也很有幫助。
sqlx 最佳實踐
-
使用參數化查詢: 始終使用帶參數的查詢, 而不是字符串插值。這可以防止 SQL 注入攻擊。
-
監控連接池指標: 監控連接池的指標, 如活躍連接數、獲取連接等待時間等, 以確保連接池配置正確。
-
避免 ORM: sqlx 是一個查詢構建器, 而不是完整的對象關係映射 (ORM) 庫。儘量避免在 sqlx 中複製 ORM 功能。
-
使用流式查詢: 對於大型查詢結果集, 使用流式查詢可以避免一次性加載所有數據到內存中。
-
利用編譯時檢查: 使用 sqlx 提供的 query! 和 query_as! 宏, 可以在編譯時捕獲 SQL 語法錯誤和類型不匹配等問題。
-
測試覆蓋: 爲您的數據庫交互代碼編寫單元測試和集成測試, 以確保正確性和穩定性。
常見錯誤和故障排除
-
連接池耗盡: 如果出現 "連接池耗盡" 錯誤, 可能是因爲併發請求過多或連接池配置不當導致的。檢查連接池指標並適當調整
max_connections
。 -
死鎖: 在事務中執行多個查詢時, 可能會遇到死鎖情況。確保正確使用事務, 並實現重試邏輯。
-
類型不匹配: 如果遇到 "無法將 PostgreSQL 類型映射到 Rust 類型" 之類的錯誤, 檢查您的結構體字段類型是否與數據庫列類型匹配。
-
SQL 語法錯誤: 如果出現 SQL 語法錯誤, 首先檢查您是否使用了參數化查詢。如果使用了 query! 宏, 也可能是宏解析出現了問題。
-
查詢性能差: 如果查詢性能較差, 可以使用 sqlx 提供的查詢追蹤功能分析查詢執行情況, 並優化慢查詢。如果頻繁創建連接,檢查連接池配置是否合理,比如
min_connections
是否過小
生產就緒建議
-
啓用日誌記錄: 在生產環境中合理啓用 sqlx 的日誌記錄, 以便更好地調試和監控應用程序。
-
監控指標: 監控數據庫和連接池指標, 如查詢執行時間、錯誤率、連接池利用率等。
-
進行負載測試: 在部署之前, 對您的應用程序進行全面的負載測試, 以確保其能夠在生產環境中良好運行。
-
實施安全最佳實踐: 遵循安全最佳實踐, 如使用參數化查詢、限制數據庫權限、加密敏感數據等。
-
準備故障轉移計劃: 制定數據庫故障轉移計劃, 以確保應用程序在數據庫出現故障時能夠正常運行。
-
持續集成和交付: 將 sqlx 集成測試納入您的持續集成和交付流程, 以確保代碼質量。
sqlx 生態
有一些其他數據庫的擴展和支持,比如sqlx-rxqlite
、sqlx-clickhouse-ext
。
sqlx-crud
提供常見的數據庫操作的 CRUD 操作的 derive 宏:
use sqlx::FromRow;
use sqlx_crud::SqlxCrud;
#[derive(Debug, FromRow, SqlxCrud)]
struct User {
user_id: i32,
name: String,
}
if let Some(user) = User::by_id(&pool, 42) {
println!("Found user user_id=42: {:?}", user);
}
sqlx-error
提供了對sqlx::Error
的包裝。
當然還有一些其他的庫,不過當前關注度還不是很高。
參考資料
[1]
Go 生態圈的 sqlx 庫: https://colobu.com/2024/05/10/sqlx-a-brief-introduction/
[2]
sqlx 庫: https://github.com/launchbadge/sqlx
[3]
ormx: https://crates.io/crates/ormx
[4]
SeaORM: https://github.com/SeaQL/sea-orm
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://mp.weixin.qq.com/s/niddplVZmUrWP7d6iM2TgQ