文盤 Rust -- Connecting to OSS from Rust

Object storage is one of the foundational components of the cloud, and every major cloud vendor offers such a product. This article walks through the basics of working with object storage from Rust, along with a few practical tips.

Basic connection

We use the S3 SDK as the example for the basic connection and operations. The author has verified it against AWS, JD Cloud, and Alibaba Cloud; the main create/read/update/delete operations behave the same across them.

Basic dependencies (Cargo.toml)

  # oss
  aws-config = { git = "https://github.com/awslabs/aws-sdk-rust", branch = "main" }
  aws-sdk-s3 = { git = "https://github.com/awslabs/aws-sdk-rust", branch = "main" }
  aws-types = { git = "https://github.com/awslabs/aws-sdk-rust", branch = "main", features = ["hardcoded-credentials"] }
  aws-credential-types = { git = "https://github.com/awslabs/aws-sdk-rust", branch = "main" }
  aws-smithy-types = { git = "https://github.com/awslabs/aws-sdk-rust", branch = "main" }

Creating a client

use aws_config::SdkConfig;
use aws_credential_types::provider::SharedCredentialsProvider;
use aws_credential_types::Credentials;
use aws_types::region::Region;

// Build an SdkConfig with static credentials, a custom endpoint, and a region.
let shared_config = SdkConfig::builder()
    .credentials_provider(SharedCredentialsProvider::new(Credentials::new(
        "LTAI5t7NPuPKsXm6UeSa1",
        "DGHuK03ESXQYqQ83buKMHs9NAwz",
        None,
        None,
        "Static",
    )))
    .endpoint_url("http://oss-cn-beijing.aliyuncs.com")
    .region(Region::new("oss-cn-beijing"))
    .build();
let s3_config_builder = aws_sdk_s3::config::Builder::from(&shared_config);
let client = aws_sdk_s3::Client::from_conf(s3_config_builder.build());

The main parameters needed to build a Client are the AK and SK of the OSS you want to access, the endpoint URL, and the region the service runs in. All of them can be found in the cloud provider's documentation.
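
If you prefer not to hardcode the AK/SK, the SDK can also pick up credentials from the default provider chain (environment variables such as AWS_ACCESS_KEY_ID / AWS_SECRET_ACCESS_KEY, shared config files, and so on). A minimal sketch, assuming the same endpoint and region as above; the exact ConfigLoader methods available may depend on your SDK version:

// Load configuration from the environment instead of hardcoding credentials.
let shared_config = aws_config::from_env()
    .endpoint_url("http://oss-cn-beijing.aliyuncs.com")
    .region(Region::new("oss-cn-beijing"))
    .load()
    .await;
let client = aws_sdk_s3::Client::new(&shared_config);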

Listing objects

// bucket, max_keys, prefix_str, and token_str are assumed to be defined by the caller.
let obj_list = client
    .list_objects_v2()
    .bucket(bucket)
    .max_keys(max_keys)
    .prefix(prefix_str)
    .continuation_token(token_str);
let list = obj_list.send().await.unwrap();
println!("{:?}", list.contents());
println!("{:?}", list.next_continuation_token());

list_objects_v2 returns the object listing. Compared with list_objects, list_objects_v2 lets you control the size of each page through max_keys and continue from a previous page through continuation_token. list.contents() returns the array of objects, and list.next_continuation_token() returns the token to use for the next query.
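
To walk an entire bucket, feed next_continuation_token back into the next request until it is None. A minimal sketch (the bucket name and prefix are placeholders, and the Option-returning accessors follow the SDK version used in this article):

// Page through all objects under a prefix, up to 1000 keys per request.
let mut token: Option<String> = None;
loop {
    let resp = client
        .list_objects_v2()
        .bucket("bucket")
        .prefix("some/prefix")
        .max_keys(1000)
        .set_continuation_token(token.clone())
        .send()
        .await
        .unwrap();
    for obj in resp.contents().unwrap_or_default() {
        println!("{:?}", obj.key());
    }
    match resp.next_continuation_token() {
        Some(t) => token = Some(t.to_string()),
        None => break,
    }
}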

Uploading a file

// The object body takes a ByteStream; here it is built from an in-memory byte slice.
let content = ByteStream::from_static("content in file".as_bytes());
// Optional: set an expiry time for the object.
let exp = aws_smithy_types::DateTime::from_secs(100);
let upload = client
    .put_object()
    .bucket("bucket")
    .key("/test/key")
    .expires(exp)
    .body(content);
upload.send().await.unwrap();

Specify the bucket and the object key; body takes a ByteStream as the file content. Finally, expires sets the expiry time of the object; if the object should not expire, simply leave this setting out.
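
To upload an existing local file, a ByteStream can also be built directly from a path, which streams the file rather than loading it into memory first. A minimal sketch (the path and key are placeholders; this assumes the SDK's tokio runtime support is enabled and ByteStream is the same type imported above):

// Stream a local file as the object body.
let body = ByteStream::from_path("/tmp/file_to_upload").await.unwrap();
client
    .put_object()
    .bucket("bucket")
    .key("/test/file_to_upload")
    .body(body)
    .send()
    .await
    .unwrap();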

Downloading a file

use std::fs::OpenOptions;
use std::io::Write;

let key = "/tmp/test/key".to_string();
let resp = client
    .get_object()
    .bucket("bucket")
    .key(&key)
    .send()
    .await
    .unwrap();
// Collect the response body into memory.
let data = resp.body.collect().await.unwrap();
let bytes = data.into_bytes();

// Make sure the parent directory of the target file exists, then write the bytes out.
let path = std::path::Path::new("/tmp/key");
if let Some(p) = path.parent() {
    std::fs::create_dir_all(p).unwrap();
}
let mut file = OpenOptions::new()
    .write(true)
    .truncate(true)
    .create(true)
    .open(path)
    .unwrap();
file.write_all(&bytes).unwrap();
file.flush().unwrap();

get_object() returns a GetObjectOutput. The body of the response is the file content: collect the body into bytes, then open a local file and write them out.
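
In real code you will usually want to propagate errors instead of calling unwrap(). A minimal sketch of the same download wrapped in a helper that returns anyhow::Result (the function name and signature are just an illustration):

use anyhow::Result;
use aws_sdk_s3::Client;

// Download an object and return its content as bytes.
async fn download_object(client: &Client, bucket: &str, key: &str) -> Result<Vec<u8>> {
    let resp = client
        .get_object()
        .bucket(bucket)
        .key(key)
        .send()
        .await?;
    let data = resp.body.collect().await?;
    Ok(data.into_bytes().to_vec())
}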

Deleting files

// Build the list of object identifiers to delete.
let mut keys = vec![];
let key1 = ObjectIdentifier::builder()
    .set_key(Some("/tmp/key1".to_string()))
    .build();
let key2 = ObjectIdentifier::builder()
    .set_key(Some("/tmp/key2".to_string()))
    .build();
keys.push(key1);
keys.push(key2);
client
    .delete_objects()
    .bucket(bucket)
    .delete(Delete::builder().set_objects(Some(keys)).build())
    .send()
    .await
    .unwrap();

delete_objects deletes objects in bulk. First build a vector of keys identifying the objects to delete, then use Delete::builder() to build the Delete model and pass it to the request.
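
For a single object there is also delete_object, which takes the bucket and key directly, no Delete model required. A minimal sketch:

// Delete a single object by bucket and key.
client
    .delete_object()
    .bucket("bucket")
    .key("/tmp/key1")
    .send()
    .await
    .unwrap();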

Uploading large files

// This snippet is assumed to run inside a function that returns anyhow::Result.
let mut file = fs::File::open("/tmp/file_name").unwrap();
let bucket = "bucket";
let key = "/tmp/key";
let chunk_size = 1024 * 1024;
let mut part_number = 0;
let mut upload_parts: Vec<CompletedPart> = Vec::new();

// Get an upload id.
let multipart_upload_res: CreateMultipartUploadOutput = client
    .create_multipart_upload()
    .bucket(bucket)
    .key(key)
    .send()
    .await
    .unwrap();
let upload_id = match multipart_upload_res.upload_id() {
    Some(id) => id,
    None => {
        return Err(anyhow!("upload id is None"));
    }
};

// Upload the file in chunks and record a CompletedPart for each one.
loop {
    let mut buf = vec![0; chunk_size];
    let read_count = file.read(&mut buf)?;
    part_number += 1;

    if read_count == 0 {
        break;
    }

    let body = &buf[..read_count];
    let stream = ByteStream::from(body.to_vec());

    let upload_part_res = client
        .upload_part()
        .key(key)
        .bucket(bucket)
        .upload_id(upload_id)
        .body(stream)
        .part_number(part_number)
        .send()
        .await
        .unwrap();

    let completed_part = CompletedPart::builder()
        .e_tag(upload_part_res.e_tag.unwrap_or_default())
        .part_number(part_number)
        .build();

    upload_parts.push(completed_part);

    // The last chunk is shorter than chunk_size; stop after uploading it.
    if read_count != chunk_size {
        break;
    }
}

// Tell the server to merge the uploaded parts.
let completed_multipart_upload: CompletedMultipartUpload =
    CompletedMultipartUpload::builder()
        .set_parts(Some(upload_parts))
        .build();

let _complete_multipart_upload_res = client
    .complete_multipart_upload()
    .bucket(bucket)
    .key(key)
    .multipart_upload(completed_multipart_upload)
    .upload_id(upload_id)
    .send()
    .await
    .unwrap();

For large files, a few hundred MB or even several GB, we upload in parts to save bandwidth and memory and let the object-storage service merge the parts on the server side. The basic flow is: request an upload id for the given bucket and key; read the file as a stream, upload each chunk, and record a CompletedPart for it; finally tell the server to merge the file from the collected CompletedPart list. The code above is commented step by step, so the details are not repeated here.
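
If something goes wrong partway through, the parts uploaded so far keep occupying storage until the multipart upload is aborted. A minimal cleanup sketch, reusing the bucket, key, and upload_id from the snippet above:

// Abort an unfinished multipart upload so the already-uploaded parts are discarded.
client
    .abort_multipart_upload()
    .bucket(bucket)
    .key(key)
    .upload_id(upload_id)
    .send()
    .await
    .unwrap();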

Downloading large files

use std::io::Write;
use tokio::io::AsyncReadExt;

let mut file = OpenOptions::new()
    .truncate(true)
    .create(true)
    .write(true)
    .open("/tmp/target_file")
    .unwrap();
let key = "/tmp/test/key".to_string();
let resp = client
    .get_object()
    .bucket("bucket")
    .key(&key)
    .send()
    .await
    .unwrap();

let chunk_size = 1024 * 1024;
let content_len = resp.content_length();
// Read the body as an async stream instead of collecting it all into memory.
let mut byte_stream_async_reader = resp.body.into_async_read();
let mut content_len_usize: usize = content_len.try_into().unwrap();
loop {
    if content_len_usize > chunk_size {
        // Read a full chunk and append it to the file.
        let mut buffer = vec![0; chunk_size];
        byte_stream_async_reader.read_exact(&mut buffer).await.unwrap();
        file.write_all(&buffer).unwrap();
        content_len_usize -= chunk_size;
        continue;
    } else {
        // Read the remaining bytes and finish.
        let mut buffer = vec![0; content_len_usize];
        byte_stream_async_reader.read_exact(&mut buffer).await.unwrap();
        file.write_all(&buffer).unwrap();
        break;
    }
}
file.flush().unwrap();

Large files also come up when downloading from the object-storage service. To save bandwidth and memory, we read the byte stream and write it to the local file in chunks: get_object() returns a ByteStream, turn it into an async reader, read the object bytes chunk by chunk, and write each chunk to the file.
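
Another option for very large objects is to issue ranged GET requests, fetching one byte range per call, which also makes it easy to resume or parallelize a download. A minimal sketch; the bucket, key, and chunk size are placeholders, content_length is assumed to be known (for example from a head_object call), and file is a writable std::fs::File as above:

// Download an object in fixed-size ranges using the HTTP Range header.
let chunk_size: i64 = 8 * 1024 * 1024;
let mut start: i64 = 0;
while start < content_length {
    let end = std::cmp::min(start + chunk_size, content_length) - 1;
    let part = client
        .get_object()
        .bucket("bucket")
        .key("/tmp/test/key")
        .range(format!("bytes={}-{}", start, end))
        .send()
        .await
        .unwrap();
    let data = part.body.collect().await.unwrap();
    file.write_all(&data.into_bytes()).unwrap();
    start = end + 1;
}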

Source: https://mp.weixin.qq.com/s/MCcydxIlPYS8Oxtlv0Nl5g