如何在 Rust 中使用 ClickHouse

最近沉迷於學習 Rust,簡單分享一篇在 Rust 中使用 ClickHouse 的方法。

Example 工程連接會放在末尾。

目前已經有實現的 clickhouse crates 庫,地址如下:

https://crates.io/crates/clickhouse-rs/

crates.io 上的版本有點問題,所以直接依賴它在 GitHub 上的版本

Cargo.toml:

[dependencies]
clickhouse-rs = { git = "https://github.com/suharev7/clickhouse-rs", rev = "67c3bcb14751e05b83b3592fa9ee9f6ad2f408c6" }
tokio = {version = "*", features = ["full"]}

clickhouse-rs 基於異步編程實現,默認使用的是 tokio, 所以在依賴中需要一併添加。

新建一個 lib.rs,實現簡單的連接邏輯,首先新建一個結構體:

pub struct ClickHouseEngine {
    pool: Pool,
}

接着爲其定義關聯函數:

impl ClickHouseEngine {
    pub fn new(database_url: &str) -> Self {
        let pool = Pool::new(database_url);
        ClickHouseEngine { pool }
    }
 }

實現簡單的 ddl、insert 和 select 方法,支持鏈式調用:

impl ClickHouseEngine {
    pub fn new(database_url: &str) -> Self {
        let pool = Pool::new(database_url);
        ClickHouseEngine { pool }
    }
   pub async fn ddl_str(&self, ddl: &str) -> Result<(), Box<dyn Error>> {
        let mut client = self.pool.get_handle().await?;
        client.execute(ddl).await?;
        Ok(())
    }
   pub async fn query_str(&self, sql: &str) -> Result<Block<Complex>, Box<dyn Error>> {
        let mut client = self.pool.get_handle().await?;
        let block = client.query(sql).fetch_all().await?;
        Ok(block)
    }
    pub async fn insert_block(&self, table_name: &str, block: Block) -> Result<(), Box<dyn Error>> {
        let mut client = self.pool.get_handle().await?;
        client.insert(table_name, block).await?;
        Ok(())
    }
}

完成之後,直接在 lib.rs 中爲其編寫單元測試:

#[cfg(test)]
mod tests {
    use super::*;
    async fn print_row(block: Block<Complex>) -> Result<(), Box<dyn Error>> {
        println!("count:{} ", block.rows().count());
        for row in block.rows() {
            let id: u32 = row.get("product_id")?;
            let amount: u32 = row.get("price")?;
            let name: Option<&str> = row.get("product_name")?;
            println!("Found  {}: {} {:?}", id, amount, name);
        }
        Ok(())
    }
    #[tokio::test]
    async fn test_qb() -> Result<(), Box<dyn Error>> {
        let ddl = r"
        CREATE TABLE IF NOT EXISTS t_product (
            product_id  UInt32,
            price       UInt32,
            product_name Nullable(FixedString(5))
        ) Engine=Memory";
        let block = Block::new()
            .column("product_id", vec![1_u32, 3, 5, 7, 9])
            .column("price", vec![2_u32, 4, 6, 8, 10])
            .column(
                "product_name",
                vec![Some("foo"), Some("foo"), None, None, Some("bar")],
            );
        let database_url = "tcp://10.37.129.9:9000/default?compression=lz4&ping_timeout=42ms";
        let ce = ClickHouseEngine::new(database_url);
        ce.ddl_str(ddl).await?;
        ce.insert_block("t_product", block).await?;
        let block = ce.query_str("SELECT * FROM t_product").await?;
        print_row(block).await?;
        Ok(())
    }
}

執行單元測試,可以看到功能正常:

    Finished test [unoptimized + debuginfo] target(s) in 0.12s
     Running target/debug/deps/rust2ch-7eda52001fbe25f8
count:5 
Found  1: 2 Some("foo")
Found  3: 4 Some("foo")
Found  5: 6 None
Found  7: 8 None
Found  9: 10 Some("bar")
Process finished with exit code 0

接下來,我們將新增和查詢功能製作成命令行工具,在 main.rs 中定義結構體和枚舉,用來封裝 shell 接收的參數:

/// A rust2ch example
#[derive(StructOpt,Debug)]
struct Args {
    #[structopt(subcommand)]
    cmd: Option<Command>,
}
#[derive(StructOpt,Debug)]
enum Command {
    /// Add new product
    Add {product_name: String},
    /// List products
    List {id: String},
}

這裏使用了非常好用的 StructOpt,它可以幫助我們生成一系列幫助功能,例如對編譯好的二進制執行 -help 命令:

(base) nauu@Boness-MBP debug % ./rust2ch -h    
rust2ch 0.1.0
A rust2ch example
USAGE:
    rust2ch [SUBCOMMAND]
FLAGS:
    -h, --help       Prints help information
    -V, --version    Prints version information
SUBCOMMANDS:
    add     Add new product
    help    Prints this message or the help of the given subcommand(s)
    list    List products

是不是有點那意思了? 

接着在通過枚舉匹配輸入:

 match opt.cmd {
        Some(Command::Add { product_name, .. }) => {
            println!("Add new product with product_name '{}'", &product_name);
            ce.ddl_str(ddl).await?;
            ce.insert_block("t_product", block).await?;
        }
        Some(Command::List { id }) => {
            let block = ce.query_str("SELECT * FROM t_product").await?;
            println!("count:{} ", block.rows().count());
            for row in block.rows() {
                let id: u32 = row.get("product_id")?;
                let amount: u32 = row.get("price")?;
                let name: Option<&str> = row.get("product_name")?;
                println!("Found  {}: {} {:?}", id, amount, name);
            }
        }
        _ => {}
    }

將工程 Build 成二進制文件之後,就可以直接使用了,例如:

(base) nauu@Boness-MBP debug % ./rust2ch list 1
count:5 
Found  1: 2 Some("foo")
Found  3: 4 Some("foo")
Found  5: 6 None
Found  7: 8 None
Found  9: 10 Some("bar")

好了今天的分享就到這裏吧,代碼工程比較簡單,只做拋磚引玉之用,地址如下:https://github.com/nauu/rust2ch

Rust 作爲一款新興的語言十分強大,未來肯定會有所作爲,趕快學起來吧 

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/o2pJBKDKK9YsHoCwf8srRw