Rust Async 4- 理解 Future
這次我們利用 tokio 這個庫來嘗試理解 Future。
Tokio 運行時就是管理異步任務並安排他們在 CPU 上執行的組件。如上圖,一個程序可能生成多個任務,每個任務可能包含一個或多個 Future。
下面我們寫一個自定義的 Future,來進行深入的理解。程序結構大致如下:
程序代碼如下:
這裏:
· 第 2 行:引入了一個特殊的類型 Pin(這個在我 Rust Async 視頻教程中介紹過);
· 第 3 行:引入了 Context,它包含異步任務的上下文,可用來喚醒當前的任務;
· 第 7 行:創建了一個自定義的 struct,在第 9 行爲他實現了 Future trait;
· 第 10 行:指定了 Future 完成後將返回的數據類型爲 String;
· 第 12 行:實現了 Future trait 的 poll 方法;
· 第 23-26 行:在 main() 函數里調用了這個 Future 的實現,注意這裏調用的方式和以前的例子不同。
之所以使用 Pin(Pin 可以理解爲固定的意思):是因爲 Future 會被異步運行時反覆的 poll,所以把 Future 給 Pin(固定)到內存中的某一特定位置,這對於異步代碼塊中功能的安全性來說是必要的。Pin 這塊是比較高級的內容,不懂的話照着做就行,就當作是一個技術要求。
第 12 行的 poll 方法會被 tokio 執行器調用,執行器會嘗試解析 Future 來得到最終的值(本例中是 String 類型的值)。如果 Future 的值不可用,那麼當前的任務就註冊到 Waker(喚醒器)組件,以便當 Future 裏的值變得可用的時候,Waker 組件可以通知 tokio 運行時再次調用 Future 上的 poll 方法。
這裏,poll 方法可能返回兩種值:
· Poll::Pending,如果 Future 還沒準備好的話;
· Poll::Ready(值),如果 Future 裏的值變得可用了。
上述程序的執行步驟如下:
1. main 函數生成第一個異步任務;
2. main 函數生成第二個異步任務;
3. 第一個任務調用自定義的 Future,Future 會返回 Poll::Pending;
4. 第二個任務調用異步函數 read_file2(),2 秒後函數返回。
運行程序,結果如下:
你可以注意到程序執行沒有結束,它在等着某些東西。
再理解一下這個例子的代碼,main 函數調用兩個異步的計算 ReadFileFuture 和 read_file2(),然後 main 函數會在 tokio 運行時上分別爲他們生成異步的任務。Tokio 執行器首先 poll 第一個 Future,它會返回 Poll::Pending。然後執行器會 poll 第二個 Future,在 2 秒鐘後它會返回 Poll::Ready 並返回一個值,並打印相應的信息。然後 tokio 運行時繼續等待第一個 Future 準備好被安排下次執行,但這永遠也不會發生,因爲我們 poll 方法代碼裏返回的就是 Poll::Pending。此外還有一點注意:如果某個 Future 完成了,那麼 tokio 運行時不會再調用他了,所以第二個函數只執行了一次。
那麼 tokio 執行器怎麼知道何時再次 poll 第一個 Future 呢?它是一直不斷的對他進行 poll 嗎?肯定不會一直 poll。
Tokio(Rust 的異步設計)是使用一個 Waker 組件來處理這件事的。當被異步執行器 poll 過的任務還沒有準備好產生值的時候,這個任務就被註冊到一個 Waker。Waker 會有一個處理程序(handle),它會被存儲在任務關聯的 Context 對象中。Waker 有一個 wake() 方法,可以用來告訴異步執行器關聯的任務應該被喚醒了。當 wake() 方法被調用了,tokio 執行器就會被通知是時候再次 poll 這個異步的任務了,具體方式就是調用任務上的 poll() 函數。
我們還是看例子吧:
這裏添加了一行代碼(第 15 行)。調用了 Waker 實例上的 wake_by_ref() 這個方法,這就會通知 tokio 運行時異步任務已經準備好可以被安排再次執行了。
現在的執行步驟如下:
1. main 函數生成第一個異步任務;
2. main 函數生成第二個異步任務;
3. 第一個任務調用自定義的 Future,它會調用 Waker 的 wake() 方法,這就會告訴異步運行時(tokio)讓其再次 poll 這個 Future。這個操縱一直循環進行;
4. 第二個任務調用異步函數 read_file2(),2 秒後函數返回。
執行程序後,你會發現 poll() 方法會一直被調用,這是因爲在 poll() 方法裏我們調用了 Waker 實例上的 wake_by_ref() 方法,這就會告訴異步執行器再次 poll,然後就不斷重複這個循環。
講了這麼多 tokio 的東西,下面我們還是看一下 tokio 的組件構成吧:
Tokio 運行時需要理解操作系統(內核)的方法,例如 epoll 來開啓 I/O 操作(讀取網絡數據,讀寫文件等)。
Tokio 運行時會註冊異步的處理程序,以便在事件發生時作爲 I/O 操作的一部分進行調用。而在 Tokio 運行時裏面,從內核監聽這些事件並與 tokio 其他部分通信的組件就是反應器(reactor)。
Tokio 執行器,他會把一個 Future,當其可取得更多進展時,通過調用 Future 的 poll() 方法來驅動其完成。
那麼 Future 是如何告訴執行器他們準備好取得進展了呢?就是 Future 調用 Waker 組件上的 wake() 方法。Waker 組件就會通知執行器,然後再把 Future 放回隊列,並再次調用 poll() 方法,直到 Future 完成。
下面是 Tokio 組件的簡化工作流程:
1. Main 函數在 tokio 運行時上生成任務 1;
2. 任務 1 有一個 Future,會從一個大文件讀取內容;
3. 從文件讀取內容的請求交給到系統內核的文件子系統;
4. 與此同時,任務 2 也被 tokio 運行時安排進行處理;
5. 當任務 1 的文件操作結束時,文件子系統會觸發一個系統中斷,他會被翻譯成 tokio 響應器可識別的一個事件;
6. Tokio 響應器會通知任務 1:文件操作的數據已經準備好;
7. 任務 1 通知他註冊的 Waker 組件:說明他可以產生一個值了;
8. Waker 組件通知 tokio 執行器來調用任務 1 關聯的 poll() 方法;
9. Tokio 執行器安排任務 1 進行處理,並調用 poll() 方法;
- 任務 1 產生一個值。
再概括一下:執行異步 I/O 操作的 Future,會被 tokio 響應器通知一個 I/O 事件。一旦接收到事件,Future 就準備好可以取得進展並調用 Waker 組件。Waker 組件然後告訴 tokio 執行器說 Future 已經準備好取得進展,這就會觸發 tokio 執行器來安排 Future 的執行並調用 Future 上的 poll() 方法。
好,現在我們可以修改程序代碼從 poll() 方法返回一個合理的值:
現在的執行步驟如下:
1. main 函數生成第一個異步任務;
2. main 函數生成第二個異步任務;
3. 第一個任務調用自定義的 Future,他返回 Poll::Ready();
4. 第二個任務調用異步函數 read_file2(),2 秒後函數返回。
其執行結果如下:
這次程序不會一直等待了,他可以正常的結束。
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://mp.weixin.qq.com/s/yeOJY_i4W3qoC5nywMxwEQ