「譯」使用 Tokio 實現 Actor 系統
譯者:Matrixtang
本文將不使用任何 Actors 庫 (例如 Actix) 而直接使用 Tokio 實現 Actors 系統。事實上這甚至是更容易的,但是還是有一些細節需要注意:
tokio::spawn
的調用位置。- 使用帶有
run
方法的結構體還是裸函數。 - Actor 的 Handle 函數。
- 背壓 (Backpressure) 和 有界信道。
- 優雅的關閉。
本文概述的技術適用於任何執行器,但爲簡單起見,我們僅討論 Tokio。與 Tokio 教程中的 spawning 和 channel chapters 章節有一些重疊, 當然啦,我建議也閱讀這些章節。
在討論如何編寫 Actor 之前,我們需要知道 Actor 是什麼。Actor 背後的基本思想是產生一個獨立的任務,該任務獨立於程序的其他部分執行某些工作。 通常,這些參與者通過使用消息傳遞信道與程序的其餘部分進行通信。 由於每個 Actor 獨立運行,因此使用它們設計的程序自然是並行的。 Actor 的一個常見用法是爲 Actor 分配你要共享的某些資源的專有所有權,然後讓其他任務通過與 Actor 通信來間接訪問彼此的資源。 例如,如果要實現聊天服務器,則可以爲每個連接生成一個任務,並在其他任務之間路由一個聊天消息的主任務。 十分有用,因爲主任務可以避免必須處理網絡 IO,而連接任務可以專門處理網絡 IO。
實現
Actor 分爲兩部分:任務和 handle。 該任務是獨立生成的 Tokio 任務,實際上執行 Actor 的職責,而 handle 是一種允許你與該任務進行通信的結構。
讓我們考慮一個簡單的 Actor 。 Actor 在內部存儲一個計數器,該計數器用於獲取某種唯一 ID。 Actor 的基本結構如下所示:
use tokio::sync::{oneshot, mpsc};
struct MyActor {
receiver: mpsc::Receiver<ActorMessage>,
next_id: u32,
}
enum ActorMessage {
GetUniqueId {
respond_to: oneshot::Sender<u32>,
},
}
impl MyActor {
fn new(receiver: mpsc::Receiver<ActorMessage>) -> Self {
MyActor {
receiver,
next_id: 0,
}
}
fn handle_message(&mut self, msg: ActorMessage) {
match msg {
ActorMessage::GetUniqueId { respond_to } => {
self.next_id += 1;
let _ = respond_to.send(self.next_id);
},
}
}
}
async fn run_my_actor(mut actor: MyActor) {
while let Some(msg) = actor.receiver.recv().await {
actor.handle_message(msg);
}
}
複製代碼
現在我們有了 Actor 本身,我們還需要一個與 actor 配套的 handle 。 handle 是其他代碼段可以用來與 actor 對話的對象,也是讓 Actor 存活的原因。
以下是 handle 的實現:
#[derive(Clone)]
pub struct MyActorHandle {
sender: mpsc::Sender<ActorMessage>,
}
impl MyActorHandle {
pub fn new() -> Self {
let (sender, receiver) = mpsc::channel(8);
let actor = MyActor::new(receiver);
tokio::spawn(run_my_actor(actor));
Self { sender }
}
pub async fn get_unique_id(&self) -> u32 {
let (send, recv) = oneshot::channel();
let msg = ActorMessage::GetUniqueId {
respond_to: send,
};
let _ = self.sender.send(msg).await;
recv.await.expect("Actor task has been killed")
}
}
複製代碼
讓我們仔細看一下本示例中的不同部分。
ActorMessage.
ActorMessage
枚舉定義了我們可以發送給 Actor 的消息類型。 通過使用這個枚舉,我們可以擁有許多不同的消息類型,並且每種消息類型都可以具有自己的參數集。我們通過 oneshot
信道向 sender 返回值 , 而這種信道只允許發送一條消息。
在上面的示例中,我們在 actor 結構的 handle_message
方法中的枚舉上進行了匹配,但這不是構造此方法的唯一辦法。 也可以在 run_my_actor
函數的枚舉中進行匹配。 然後,此匹配項中的每個分支都可以在 actor 對象上調用各種方法,例如 get_unique_id
。
發送消息時出錯 在處理信道時,並非所有錯誤都是致命 (fatal) 的。 因此,該示例有時使用 let _ =
來忽略錯誤。 通常,如果 receiver 被丟棄,那在信道上的 send
操作將失敗。 在我們的示例中,此操作的第一個實例是 actor 中我們響應已發送的消息的那行 。
let _ = respond_to.send(self.next_id);)
複製代碼
這將發生在接收方不再需要操作的結果的情形下,例如 發送消息的任務可能已被殺死。
關閉 Actor 我們可以通過查看接收消息是否失敗來決定何時關閉 Actor 。 在我們的示例中,這發生在以下 while 循環中:
while let Some(msg) = actor.receiver.recv().await {
actor.handle_message(msg);
}
複製代碼
當所有發送到receiver
的 sender
都被丟棄時,我們就知道將不會再收到其他信息了,因此可以關閉 Actor 。 當這種情況發生時,調用.recv()
將返回 None
,並且由於它與模式Some(msg)
不匹配,while 循環將退出並且函數會返回。
結構體的 run 方法
我上面給出的示例使用的頂層函數並未在任何結構上定義,因爲我們將其作爲 Tokio 任務產生 ,但是許多人發現直接在 MyActor 結構體中定義 run
方法並且啓動更加自然。 也不是不行,但是我舉這個使用頂層函數的示例的原因是,使用這種方法就可以避免很多由生命週期而產生的問題了。 爲了說清楚這種問題,我準備了一個例子,說明不熟悉該模式的人經常會想到什麼。
impl MyActor {
fn run(&mut self) {
tokio::spawn(async move {
while let Some(msg) = self.receiver.recv().await {
self.handle_message(msg);
}
});
}
pub async fn get_unique_id(&self) -> u32 {
let (send, recv) = oneshot::channel();
let msg = ActorMessage::GetUniqueId {
respond_to: send,
};
let _ = self.sender.send(msg).await;
recv.await.expect("Actor task has been killed")
}
}
... and no separate MyActorHandle
複製代碼
這個示例存在兩個問題:
tokio::spawn
在run
方法中被調用。- Actor 和 handle 其實是一個結構體。
導致問題的第一個原因是,因爲tokio :: spawn
函數要求參數爲 'static'
。那就意味着新任務必須擁有完整的所有權,這就導致了該方法借用了self
,所以它無法將 self
的所有權交給新任務。
第二個問題是,因爲 Rust 強制實施了單一所有權原則。 如果將 actor 和 handle 都合併爲同一個結構體,則(至少從編譯器的角度來看)將使每個 handle 都可以訪問 actor 的任務所擁有的全部字段。 例如, next_id
應僅由 actor 任務擁有,而且不應該讓任何 handle 直接訪問。
也就是說,有一個通過解決以上兩個問題,變得可行的版本。代碼如下:
impl MyActor {
async fn run(&mut self) {
while let Some(msg) = self.receiver.recv().await {
self.handle_message(msg);
}
}
}
impl MyActorHandle {
pub fn new() -> Self {
let (sender, receiver) = mpsc::channel(8);
let actor = MyActor::new(receiver);
tokio::spawn(async move { actor.run().await });
Self { sender }
}
}
複製代碼
該函數與頂層函數相同。 請注意,嚴格來講,可以編寫tokio :: spawn
在run
內的那種 , 但是我並不推薦。
actor 的 其他變體
我在本文中的示例使用了參與者使用消息的請求 - 響應模型 (request-response),但是這不是必須的。 在本節中,我將給你一些使用其他方式的例子,給你一些啓發。
不對消息迴應
在之前的示例中我們介紹了一種使用oneshot
信道發送對消息響應的方式,但是並不總是需要響應。在這些情況下,僅在消息枚舉中不包含 oneshot
信道是沒有問題的。當信道中有空間時,這甚至可以讓你在處理完消息之前就返回。 但是仍應確保使用有界信道,以保證在該信道中等待的消息數不會無限增長。在某些情況下,這意味着仍然需要由一個異步函數來處理發送
操作,用於處理等待信道需要更多空間的情況。 但是,還有一種替代方法可以使send
操作成爲異步的。即使用 try_send
方法,並通過簡單地殺死 Actor 來處理發送失敗的情況。這在 Aoctor 管理 TcpStream
時,用於轉發發送到連接中的任何消息的情況下是很有用的。這種情況下,如果無法繼續向 TcpStream
寫入 ,則可直接關閉連接。
多個 handle 共享一個 Actor
如果需要從不同的地方向 actor 發送消息,則可以使用多個 handle 來強制某些消息只能從某些地方發送。 當使用這種方法時,你仍然可以在內部重複使用相同的 mpsc
通道,並使用其中包含所有可能的消息類型的枚舉。 如果你不得不想要爲此使用單獨的信道,則 actor 可以使用 tokio::select!
來一次性衝多個信道中接受信息。
loop {
tokio::select! {
Some(msg) = chan1.recv() => {
},
Some(msg) = chan2.recv() => {
},
else => break,
}
}
複製代碼
需要注意的是在信道關閉時的處理方式,因爲在這種情況下,它們的 recv
方法會立即返回 None
。 幸運的是,tokio :: select!
宏允許您通過提供 Some(msg)
來處理這種情況。 如果僅關閉一個信道,則該分支將被禁用,另外一個信道依舊是可用的。 當兩者都關閉時,else 分支運行並使用break
退出循環。
Actors 間發送信息
讓 Actor 將消息發送給其他 Actor 也是可行的。 爲此,只需爲一個 Actor 提供其他 Actor 的 handle 即可。 當 Actor 形成了循環時,需要上點心,因爲爲了保持彼此的 handle 存活,防止 Actor 被關閉最後一個 sender
不會被丟棄。 爲了處理這種情況,您可以讓一個 actor 具有兩個帶有獨立的mpsc
通道的 handle ,tokio :: select!
會被用在下面這個示例裏 :
loop {
tokio::select! {
opt_msg = chan1.recv() => {
let msg = match opt_msg {
Some(msg) => msg,
None => break,
};
},
Some(msg) = chan2.recv() => {
},
}
}
複製代碼
如果 chan1
關閉,即使chan2
仍然打開,上述循環也將退出。 如果 chan2
是 Actor 循環的一部分,則這會中斷該循環並讓 Actor 關閉。
只需要簡單的在循環裏調用 abort
就可以了。
多個 Actors 共享一個 handle
就像每個 Actor 可以共享多個 handle 一樣,每個 handle 也可以共享多個 Actors 。 最常見的示例是在處理諸如 TcpStream
之類的連接時,通常會產生兩個任務:一個用於讀,一個用於寫。 使用此模式時,需要將讀和寫入任務變得儘可能簡單——它們的唯一工作就是執行 IO。 讀任務會將接收到的所有消息發送給其他任務,通常是另一個 Actor ,而寫任務會將接收到的所有消息轉發給連接。 這種模式非常有用,因爲它把與執行 IO 相關的複雜性隔離開來,這意味着其他程序部分可以假裝將某些內容立即寫入連接,儘管實際的寫入其實是在 Actor 處理消息後進行的。
當心循環
我已經在Actors 間發送信息
標題下討論了一些關於循環的問題,在此我討論瞭如何關閉循環的 Actors。但是,如何關閉並不是循環可能導致的唯一問題,因爲這種循環還會產生死鎖,循環中的每個 Actor 都在等待下一個 Actor 接收消息,但是下一個 Actor 直到它的下一個 Actor 接收到消息纔會接收到該消息,依此類推。 爲避免這種死鎖,必須確保循環的信道容量都不受限。這樣做的原因是有界信道上的 send
方法不會立即返回,而具有立即返回send
方法的信道是不記入這種循環,因爲這種send
方法是不會產生死鎖的。 當心,這意味着oneshot
信道也不會產生死鎖,因爲它們也有 立即返回的 send
方法。還要當心,如果使用的是 try_send
而不是send
來發送消息,那麼這也不是死鎖循環的一部分。
感謝 matklad 指出循環和死鎖的問題。
譯者簡介:
Matrixtang,Rust/cpp 程序員,對編譯相關領域感興趣,不會 pwn 的安全愛好者。
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://juejin.cn/post/6942410107040825374