Rust 在數據工程領域中的應用

近年來,數據工程領域發生了重大變化。數據系統日益複雜,對實時處理的需求,以及對可靠性和性能的不斷需求,促使人們尋找更健壯的編程語言。這就是 Rust 的用武之地,Rust 的增長速度非常快:

在這些競爭者中,Rust 在 2024 年作爲數據工程師的強大工具出現了,它承諾內存安全性、併發性和高效性。但是,究竟是什麼讓 Rust 成爲數據工程的合適選擇,以及如何在這個領域有效地利用它?

Rust 在數據工程領域越來越受歡迎的一些關鍵原因:

1,內存安全:Rust 的所有權系統確保在編譯時捕獲空指針解引用和緩衝區溢出等內存錯誤,從而降低運行時崩潰的風險。

2,併發性:憑藉其輕量級併發模型和嚴格的編譯時檢查,Rust 使編寫既安全又高效的併發程序變得更容易,這是數據密集型應用程序的關鍵需求。

3,性能:Rust 的性能與 C 和 C++ 相當,適合高吞吐量的數據處理任務。

4,生態系統:Rust 生態系統雖然相對年輕,但隨着庫和工具的不斷成熟,它們越來越支持數據工程任務。

Rust 構建數據工程的模塊

爲了在數據工程中有效地利用 Rust,有必要了解構建模塊以及它們如何適應更廣泛的數據管道。

  1. 數據攝取

數據攝取是任何數據管道中的第一步,它包括從各種來源收集原始數據,併爲進一步處理做好準備。Rust 的併發能力在這裏派上了用場。像 HTTP 請求的 request 和 kafka-rust 這樣的庫可以進行高效和安全的數據攝取。

例子:

use reqwest::Client;
use tokio::stream::StreamExt;

#[tokio::main]
async fn main() {
    let client = Client::new();
    let urls = vec!["http://example.com/data1""http://example.com/data2"];

    let fetches = futures::stream::iter(urls)
        .map(|url| {
            let client = client.clone();
            async move {
                let resp = client.get(url).send().await?;
                let body = resp.text().await?;
                Ok(body)
            }
        })
        .buffer_unordered(5);

    fetches
        .for_each(|res| async {
            match res {
                Ok(data) => println!("Fetched data: {}", data),
                Err(e) => eprintln!("Error: {}", e),
            }
        })
        .await;
}

2,數據轉換 

一旦數據被攝取,下一步就是處理。這可能涉及轉換、聚合和過濾。Rust 強大的迭代器系統和函數式編程範例支持富有表現力和高效的數據處理。

例子:

let data = vec![1, 2, 3, 4, 5];
let processed_data: Vec<_> = data
    .into_iter()
    .filter(|&x| x % 2 == 0)
    .map(|x| x * 2)
    .collect();

println!("Processed data: {:?}", processed_data);

3,數據存儲

有效地存儲處理過的數據對於任何數據管道都是至關重要的。Rust 支持與各種數據庫交互,包括 SQL 和 NoSQL,確保數據可以可靠地存儲和快速檢索。SQL 數據庫的 diesel 和 MongoDB 的 mongodb 等庫提供了必要的抽象。

例子:

use diesel::prelude::*;
use diesel::sqlite::SqliteConnection;

fn establish_connection() -> SqliteConnection {
    SqliteConnection::establish("test.db").expect("Error connecting to database")
}

fn main() {
    let connection = establish_connection();
    // Insert and query data using Diesel ORM
}

4,數據服務

使用 serde 以及強大而簡單的 API 框架 (如 axum),可以簡單的進行序列化和反序列化操作,以有效地提供數據,使得爲數據請求提供 API 服務變得容易。

例子:

use axum::{routing::get, Json, Router};
use serde::Serialize;
use std::net::SocketAddr;

#[derive(Serialize)]
struct HelloWorld {
    message: String,
}

#[tokio::main]
async fn main() {
    let app = Router::new().route("/", get(hello_world));

    let addr = SocketAddr::from(([127.0.0.1], 3000));
    axum::Server::bind(&addr)
        .serve(app.into_make_service())
        .await
        .unwrap();
}

async fn hello_world() -> Json<HelloWorld> {
    Json(HelloWorld {
        message: "Hello, World!".to_string(),
    })
}

5,數據挖掘

數據分析通常需要處理大型數據集和執行復雜的計算。Rust 的數據操作庫 (如 polars) 越來越多地用於這些任務,並且比現有的 Python 庫 (如 pandas) 更加高效和高性能。

例子:

use polars::prelude::*;
use std::fs::File;
use std::io::BufReader;

fn main() -> Result<(){
    let file = File::open("path/to/your/data.csv").expect("Failed to open file");
    let reader = BufReader::new(file);
    let df = CsvReader::new(reader)
        .infer_schema(None)
        .has_header(true)
        .finish()?;

    // Print the first few rows of the DataFrame
    println!("DataFrame preview:");
    println!("{:?}", df.head(Some(5)));

    // Example: Calculate the mean and sum of a column named "value"
    let mean_value = df
        .lazy()
        .select([col("value").mean().alias("mean_value")])
        .collect()?
        .column("mean_value")?
        .get(0);
    println!("Mean of 'value' column: {:?}", mean_value);

    let sum_value = df
        .lazy()
        .select([col("value").sum().alias("sum_value")])
        .collect()?
        .column("sum_value")?
        .get(0);
    println!("Sum of 'value' column: {:?}", sum_value);

    // Example: Group by a column named "category" and calculate aggregate metrics
    let grouped_df = df
        .lazy()
        .groupby([col("category")])
        .agg([
            col("value").mean().alias("mean_value"),
            col("value").sum().alias("sum_value"),
            col("value").count().alias("count"),
        ])
        .collect()?;

    println!("Grouped DataFrame with aggregate metrics:");
    println!("{:?}", grouped_df);

    Ok(())
}

數據工程的遊戲規則改變者:DataFusion

DataFusion 是 Rust 數據工程生態系統中最突出的項目之一,它是一個可擴展的查詢執行框架。DataFusion 爲構建高性能的分佈式數據處理系統提供了基礎,是 Apache Arrow 生態系統的一部分,它專注於內存中的列數據處理。

DataFusion 的主要優點

1,內存處理:DataFusion 利用 Apache Arrow 的列式進行內存處理,與傳統的基於行的存儲相比,這大大加快了分析查詢的速度。

2,SQL 支持:DataFusion 支持 SQL,使熟悉基於 SQL 數據操作的廣泛用戶可以訪問它。

3,可擴展性:它的模塊化設計允許開發人員可以根據他們的特定需求擴展和定製它,添加對新數據源、自定義函數等的支持。

4,併發性和並行性:DataFusion 基於 Rust,自然繼承了 Rust 在併發性和並行性方面的優勢,能夠在大型數據集上高效地執行復雜查詢。

爲了說明如何使用 DataFusion,我們看一個需要在大型數據集上執行 SQL 查詢的場景:

use datafusion::prelude::*;
use arrow::record_batch::RecordBatch;

#[tokio::main]
async fn main() -> datafusion::error::Result<(){
    // 創建一個新的執行上下文
    let mut ctx = ExecutionContext::new();

    // 註冊CSV文件
    ctx.register_csv("example""path/to/csv/file", CsvReadOptions::new()).await?;

    // 執行SQL查詢
    let df = ctx.sql("SELECT * FROM example WHERE column_name > 100").await?;

    // 收集結果
    let batches: Vec<RecordBatch> = df.collect().await?;

    // Print the results
    for batch in batches {
        println!("{:?}", batch);
    }

    Ok(())
}

由於 DataFusion 的出現,新的庫如 ballista 將取代 Spark 在數據處理方面的地位,Ballista 是一個用 Rust 編寫的分佈式計算平臺,專爲高性能、大規模數據處理而設計。它利用 Apache Arrow 實現高效的內存列數據表示,利用 DataFusion 實現查詢執行,允許開發人員以分佈式方式執行復雜的數據轉換和分析。

Ballista 旨在爲傳統的大數據框架 (如 Apache Spark) 提供一個現代的、可擴展的替代方案,重點關注安全性、併發性和性能。

例子:

use ballista::prelude::*;
use tokio;

#[tokio::main]
async fn main() -> Result<(){
    // 創建一個Ballista上下文
    let ctx = BallistaContext::local();

    // 註冊CSV文件
    ctx.register_csv("example""path/to/your/data.csv", CsvReadOptions::new()).await?;

    // 執行SQL查詢
    let df = ctx.sql("SELECT * FROM example WHERE some_column > 100").await?;

    // 收集並打印結果
    let batches = df.collect().await?;
    for batch in batches {
        println!("{:?}", batch);
    }

    Ok(())
}

總結

Rust 提供了令人信服的安全性、高性能和高併發性的結合,使其成爲現代數據工程任務的有力候選者,它對於數據工程的各種任務都非常有用。

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/9fWRbQaviGQhQBMa0JrFYQ