如何動態匹配 tokio 派生線程?
當我們在使用 tokio 和 MPSC(多生產者單消費者) 通道時,通常以某種固定的方式連接派生線程。然而,在最近項目中,必須在各種配置中動態匹配異步生產者和消費者。
在這篇文章中,讓我們來看看如何實現這種非常有用的動態匹配模式。
首先,我們創建一個關於餐廳的 Rust 項目:
cargo init restaurant
在 Cargo.toml 文件中加入依賴項:
[dependencies]
tokio = { version = "1.38.0", features = ["full"] }
然後,在 src/main.rs 文件中寫入業務邏輯代碼。
作爲餐廳經理,可以分配不同的烹飪臺來異步準備不同類型的食物,代碼如下 (現在不用擔心未定義的值):
async fn cooking_stand(food: char) {
loop {
somewhere.send(food.clone()).await;
}
}
食物應該被送到等待上菜的餐桌上,代碼如下:
async fn table (number: u8) {
loop {
let food = somehow.recv().await;
println!("Got {} at table {}", food, number);
}
}
現在可以組織我們的餐廳了:
#[tokio::main]
async fn main() {
// 烹飪臺
tokio::spawn(cooking_stand('🥗')); // 沙拉
tokio::spawn(cooking_stand('🍔')); // 漢堡
// 客人餐桌
tokio::spawn(table(1));
tokio::spawn(table(2));
tokio::time::sleep(Duration::from_millis(1000)).await;
}
爲簡單起見,我們假設通過應用程序接受訂單。例如,餐廳經理 (主線程) 知道餐桌 1 正在等待沙拉,餐桌 3 正在等待漢堡。但如何真正完成這些訂單呢?
初級方法:
cooking_stand -> 🥗🥗🥗🥗🥗 -> -> table 1
cooking_stand -> 🍕🍕🍕🍕🍕 -> manager -> table 2
cooking_stand -> 🍔🍔🍔🍔🍔 -> -> table 3
如果我們強迫經理做這項工作,他可以等待沙拉烹飪站準備沙拉,然後將其傳遞給餐桌 1。然後等待漢堡烹飪臺準備好漢堡,把它端到 3 號餐桌。
這顯然是一個有缺陷的設計:
-
不管是否需要,烹飪臺都會生產食物。
-
如果烹飪臺很慢,那麼經理將必須等待食物準備好。
-
經理不應該做繁重的工作,因爲這會影響他的反應能力。
我們需要服務員,幸運的是,Tokio 爲這項工作提供了完美的工具——oneshot 通道。這些通道被設計和優化爲一次傳遞單個值。
let (waiter_rx, waiter_tx) = oneshot::channel::<char>();
爲了讓服務員先把沙拉送到 1 號桌,需要修改我們的烹飪臺:
use tokio::sync::oneshot;
async fn cooking_stand (
product: char,
mut waiters: tokio::sync::mpsc::Receiver<oneshot::Sender<char>>
) {
while let Some(waiter) = waiters.recv().await {
waiter.send(product.clone());
}
}
其中 tokio::sync::mpsc::Receiver<oneshot::Sender> 是一個等待隊列。是的,你沒看錯,可以通過其他通道封裝一個 oneshot 通道。當服務員到達烹飪臺時,烹飪臺就會把食物準備好,然後交給服務員送到餐桌上。讓我們對餐桌做同樣的事情,他們有特定的服務員接收部分,會給他們送食物:
async fn table (
number: u8,
mut waiters: tokio::sync::mpsc::Receiver<oneshot::Receiver<char>>
) {
while let Some(waiter) = waiters.recv().await {
let food = waiter.await.unwrap();
println!("Got {} at table {}", food, number);
}
}
當服務員被分配到餐桌上時,顧客等待服務員送來烹飪臺生產的食物。爲了完成這個謎題,我們來修改 main 函數。經理可以僱傭服務員,而不是自己做繁重的工作,並將他們分配到匹配的烹飪臺和桌子上,以完成食物訂單。
#[tokio::main]
async fn main() {
// 經理分配服務員到烹飪臺
let (stand_salad_tx, stand_salad_rx) = mpsc::channel::<oneshot::Sender<char>>(100);
let (stand_pizza_tx, stand_pizza_rx) = mpsc::channel::<oneshot::Sender<char>>(100);
let (stand_burger_tx, stand_burger_rx) = mpsc::channel::<oneshot::Sender<char>>(100);
// 搭建烹飪臺
tokio::spawn(cooking_stand('🥗', stand_salad_rx));
tokio::spawn(cooking_stand('🍕', stand_pizza_rx));
tokio::spawn(cooking_stand('🍔', stand_burger_rx));
// 經理分配服務員到餐桌
let mut tables: Vec<tokio::sync::mpsc::Sender<oneshot::Receiver<char>>> = Vec::new();
for number in 1..=4 {
let (table_tx, table_rx) = mpsc::channel::<oneshot::Receiver<char>>(100);
tables.push(table_tx);
tokio::spawn(table(number, table_rx));
}
// t;
}
讓我們通過在 main 函數的末尾添加以下代碼來檢查這種方式是否有效:
// 創建服務員
let (waiter_tx, waiter_rx) = oneshot::channel::<char>();
// 分配到沙拉烹飪臺
stand_salad_tx.send(waiter_tx).await.unwrap();
// 讓他把食物送到1號桌
tables.first().unwrap().send(waiter_rx).await.unwrap();
運行結果:
Got 🥗 at table 1
這種通過常規通道發送 oneshot 通道的模式可以用於實現各種流量控制。以給定的比率、節流等方式傳遞消息。
完整代碼如下:
use std::time::Duration;
use tokio::sync::{mpsc, oneshot};
async fn cooking_stand(
product: char,
mut waiters: tokio::sync::mpsc::Receiver<oneshot::Sender<char>>,
) {
while let Some(waiter) = waiters.recv().await {
waiter.send(product).unwrap();
}
}
async fn table(number: u8, mut waiters: tokio::sync::mpsc::Receiver<oneshot::Receiver<char>>) {
while let Some(waiter) = waiters.recv().await {
let food = waiter.await.unwrap();
println!("Got {} at table {}", food, number);
}
}
#[tokio::main]
async fn main() {
// 經理分配服務員到烹飪臺
let (stand_salad_tx, stand_salad_rx) = mpsc::channel::<oneshot::Sender<char>>(100);
let (stand_pizza_tx, stand_pizza_rx) = mpsc::channel::<oneshot::Sender<char>>(100);
let (stand_burger_tx, stand_burger_rx) = mpsc::channel::<oneshot::Sender<char>>(100);
// 搭建烹飪臺
tokio::spawn(cooking_stand('🥗', stand_salad_rx));
tokio::spawn(cooking_stand('🍕', stand_pizza_rx));
tokio::spawn(cooking_stand('🍔', stand_burger_rx));
// 經理分配服務員到餐桌
let mut tables: Vec<tokio::sync::mpsc::Sender<oneshot::Receiver<char>>> = Vec::new();
for number in 1..=4 {
let (table_tx, table_rx) = mpsc::channel::<oneshot::Receiver<char>>(100);
tables.push(table_tx);
tokio::spawn(table(number, table_rx));
}
// 創建服務員
let (waiter_tx, waiter_rx) = oneshot::channel::<char>();
// 分配到沙拉烹飪臺
stand_salad_tx.send(waiter_tx).await.unwrap();
// 讓他把食物送到1號桌
tables.first().unwrap().send(waiter_rx).await.unwrap();
// 創建服務員
let (waiter_tx, waiter_rx) = oneshot::channel::<char>();
// 分配到披薩烹飪臺
stand_pizza_tx.send(waiter_tx).await.unwrap();
// 讓他把食物送到2號桌
tables.get(1).unwrap().send(waiter_rx).await.unwrap();
// 創建服務員
let (waiter_tx, waiter_rx) = oneshot::channel::<char>();
// 分配到披薩烹飪臺
stand_burger_tx.send(waiter_tx).await.unwrap();
// 讓他把食物送到3號桌
tables.get(2).unwrap().send(waiter_rx).await.unwrap();
tokio::time::sleep(Duration::from_millis(1000)).await;
}
運行結果:
Got 🍕 at table 2
Got 🍔 at table 3
Got 🥗 at table 1
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://mp.weixin.qq.com/s/UwKIQKwxzW3ZYw9vwIMRGw