人人都能學會的 asyncio 教程

異步函數的定義


普通函數的定義是使用 def 關鍵詞,異步的函數,協程函數 (Coroutine) 本質上是一個函數,特點是在代碼塊中可以將執行權交給其他協程,使用 async def 來定義

如何調用協程並且得到它的運行結果?
調用普通的函數只需要 result = add2(2), 這時函數就可以得到運行,並且將結果 4 返回給 result, 如果使用 result = add3(2), 此時再打印 result 呢?
得到的是一個 coroutine 對象,,並不是 2+3=5 這個結果,怎樣才能得到結果呢?
協程函數想要執行需要放到事件循環裏執行。

事件循環 Eventloop


Eventloop 是 asyncio 應用的核心, 把一些異步函數註冊到這個事件循環上,事件循環會循環執行這些函數, 當執行到某個函數時,如果它正在等待 I/O 返回,如它正在進行網絡請求,或者 sleep 操作,事件循環會暫停它的執行去執行其他的函數;當某個函數完成 I/O 後會恢復,下次循環到它的時候繼續執行。因此,這些異步函數可以協同 (Cooperative) 運行:這就是事件循環的目標。
返回到上面的函數, 想要得到函數執行結果,需要有一個 Eventloop

或者使用 await 關鍵字來修飾函數的調用,如 result = await add3(2), 但是 await 只能用在協程函數中,所以想要用 await 關鍵字就還需要定義一個協程函數

但最終的執行還是需要放到一個事件循環中進行
稍微複雜一點的例子

這段代碼定義了兩個協程,並將它們放到另外一個協程 main 函數中,想要獲得它們運行的結果,事件循環的特點是當它遇到某個 I/O 需要等待 (如這裏的 asyncio.sleep() 函數) 的時候,可以去執行其它的函數,這樣,整個函數執行所需要的時間,應該是所有協程中執行時間最長的那個,對於上面這個代碼來說,一個 sleep 了 3 秒,一個 sleep 了 1 秒,總的用時應該是 3 秒多一點,但結果是這樣嗎?
它的輸出是這樣的

它的用時是 4 秒多一點,而且是先執行了 testa 函數,然後再執行了 testb 函數,是串行的依次執行的,並沒有像我們想象中的併發執行。那應該怎樣才能併發執行呢?

需要將協程放到 asyncio.gather() 中運行,上面的代碼得到的輸出是

可以看到,testa 和 testb 是同步在運行,由於 testb 只 sleep 了 1 秒鐘,所以 testb 先輸出了 Resuming b, 最後將每個協程函數的結果返回,注意,這裏是 gather() 函數里的每一個協程函數都執行完了,它才結果,結果是一個列表,列表裏的值順序和放到 gather 函數里的協程的順序是一致的。
除了使用 asyncio.gather 來執行協程函數以外,還可以使用 Task 任務對象

使用 asyncio.ensure_future(testa(1)) 返回一個 task 對象,此時 task 進入 pending 狀態, 並沒有執行,這時 print(taska) 得到 > 些時,taska.done() 返回 False, 表示它還沒有結束,當調用 await taska 時表示開始執行該協程,當執行結束以後,taska.done() 返回 True,這時可以調用 taska.result() 得到函數的返回值,如果協程還沒有結束就調用 result() 方法則會拋個異常,raise InvalidStateError('Result is not ready.')
創建 task 對象除了使用 asyncio.ensure_future() 方法還可以使用 loop.create_task() 方法

上面一直在使用 asyncio.gather() 函數來執行協程函數,還有一個 asyncio.wait() 函數,它的參數是協程的列表。

asyncio.wait() 返回一個 tuple 對象,對象裏又包含一個已經完成的任務 set 和未完成任務的 set,上面代碼得到的結果是

使用 wait 和 gather 有哪些區別呢?
首先,gather 是需要所有任務都執行結束,如果某一個協程函數崩潰了,則會拋異常,都不會有結果。
wait 可以定義函數返回的時機,可以是 FIRST_COMPLETED(第一個結束的), FIRST_EXCEPTION(第一個出現異常的), ALL_COMPLETED(全部執行完,默認的)

這段代碼要求在出現第一個異常的時候就結果,函數整體不會崩潰,只是如果這裏想要獲取結果的話它是一個異常對象。

可以在實際的工作中,由於以前寫了太多的多線程與多進程,所以對於以前編寫風格和一些由於沒有異步支持的庫函數來說,由於要寫在異步裏,所以對於編寫代碼來說還是要處理很多同步的方法,今天在這裏整理一下在異步操作中如果處理同步的函數問題。
爲了更好的演示,我準備了三個函數,一個同步的函數,兩個異步的函數

單個協程任務的運行


上面的函數,比如說我只想將 asyncfunc1() 函數運行並且得結果,可以使用 loop.create_task() 方法創建一個 task 對象,task 是 Futures 的子類,當調用 loop.run_until_complete() 以後,協程跑完以後,通過 task.result() 獲取協程函數的返回結果。

輸出結果爲

主線程和跑的協程函數是在同一個線程中。
也可以給 task 對象添加一個回調方法

輸出結果爲

loop.run_until_complete 是一個阻塞方法,只有當它裏面的協程運行結束以後這個方法才結束,纔會運行之後的代碼。
其實也可以不調用 loop.run_until_complete 方法,創建一個 task 以後,其實就已經在跑協程函數了,只不過當事件循環如果準備開始運行了,此時的 task 狀態是 pending, 如果不調用事件循環的話,則不會運行協程函數,由於主線程跑完了,子線程也就被銷燬了,如代碼寫成這樣:

得到的輸出是

所以想要使得協程函數得到執行,需要調用事件循環來執行任務,上面的 loop.run_until_complete 就是使循環開始跑了,其實也可以使用 loop.run_forever(), 這個函數就像它的名字一樣,會一直跑。只有事件循環跑起來了,那麼使用該循環註冊的協程纔會得到執行,但是如果使用 loop.run_forever() 則會阻塞在這裏,事件循環還有一個 stop 方法,可以結束循環,我們可以在 task 對象上添加一個回調方法,當協程執行結束以後,調用事件循環的 stop 方法來結束整個循環

除了使用 loop.run_until_complete 方法,還可以使用 asyncio.ensure_future() 方法來運行協程,將上面代碼中的 task = loop.create_task(asyncfunc1()) 改爲 task = asyncio.ensure_future(asyncfunc1()) 會得到相同的結果, 它的參數是協程對象或者 futures,也可以傳 task 對象,因爲 task 是 futures 的子類, 當傳入的是一個協程對象時,返回一個 task 對象,傳入一個 futures 的時候,直接返回 futures 對象, 也就是說,在調用 asyncio.ensure_future() 以後,都會返回一個 task 對象, 都可以爲它添加一個回調方法,並且可以調用 task.result() 方法得到結果 (注意如果 task 沒有執行結束就調用 result 方法,則會拋異常)。

多個協程任務的並行


最上面我準備了兩個異步的函數 asyncfunc1 和 asyncfunc2,如果我想要這兩個函數同時執行,並且得到它們的返回值該怎麼操作呢?
有了上面單協程的經驗,我們也可以使用事件循環創建兩個 task, 然後在 run_forever() 來執行,可以對 task 添加回調,將結果輸出。

輸出結果是

此時由於 loop 調用了 run_forever 方法,且沒有方法調用 stop 方法,所以程序會一直卡着。
這樣是可以將多個協程跑起來,但這樣的處理一是繁瑣,二是不方便結果的回收。
asyncio 有一個 gather 方法,可以傳入多個任務對象,當調用 await asyncio.gather(*) 時,它會將結果全部返回
由於 await 只能寫在 async def 函數中,所以這裏還需要新創建一個函數

兩種定義方式都可以,一個是向 gather 函數傳的是協程對象,一個是傳的 task 對象。之後在調用

得到的輸出爲

這樣就達到的協程的並行與結果的回收。
依然是之前準備的三個函數,一個阻塞的,兩個異步的。

使用傳統的多線程的方式跑同步代碼

輸出結果

可以看到,主線程和子線程跑在了不同的線程中。

在事件循環中動態的添加同步函數


解決方案是,先啓一個子線程,這個線程用來跑事件循環 loop,然後動態的將同步函數添加到事件循環中

由於使用 ping 命令得到很多輸出,所以我對函數稍稍做了修改, 只是模擬打印了一行文字,但是函數中的 time.sleep(2) 這個是一個阻塞式的函數
得到的輸出爲

從輸出結果可以看出,loop.call_soon_threadsafe() 和主線程不是跑在同一個線程中的,雖然 loop.call_soon_threadsafe() 沒有阻塞主線程的運行,但是由於需要跑的函數 ping 是阻塞式函數,所以調用了三次,這三次結果是順序執行的,並沒有實現併發。
如果想要實現併發,需要通過 run_in_executor 把同步函數在一個執行器裏去執行。該方法需要傳入三個參數,run_in_executor(self, executor, func, *args) 第一個是執行器,默認可以傳入 None,如果傳入的是 None,將使用默認的執行器,一般執行器可以使用線程或者進程執行器。

得到的輸出結果

可以看到同步函數實現了併發,但是它們跑在了不同的線程中,這個就和之前傳統的使用多線程是一樣的了。

上文說到,run_in_executor 的第一個參數是執行器,這裏執行器是使用 concurrent.futures 下的兩個類,一個是 thread 一個是 process,也就是執行器可以分爲線程執行器和進程執行器。它們在初始化的時候都有一個 max_workers 參數,如果不傳則根據系統自身決定。

這裏初始化了兩個執行器,一個是線程的,一個是進程的,它們執行的效果一樣,只是一個跑在了多線程,一個跑在了多進程
使用 concurrent.futures.ThreadPoolExecutor() 執行器的結果是

這們的進程 ID 都是 8188,是跑在了同一個進程下。另外注意一下,我這裏在初始化的時候傳一個 max_workers 爲 2,注意看結果的輸出,它是先執行了前兩個,當有一個執行完了以後再開始執行第三個,而不是三個同時運行的。

可以看出來它們的進程 ID 是不同的。
這樣看使用 run_in_executor 和使用多進程和多線程其實意義是一樣的。彆着急,在講完異步函數以後就可以看到區別了。

在事件循環中動態的添加異步函數


通過 asyncio.run_coroutine_threadsafe 方法來動態的將一個協程綁定到事件循環上,並且不會阻塞主線程

通過 asyncio.run_coroutine_threadsafe 在 loop 上綁定了四個協程函數,得到的輸出結果爲

主線程不會被阻塞,起的四個協程函數幾乎同時返回的結果,但是注意,協程所在的線程和主線程不是同一個線程,因爲此時事件循環 loop 是放到了另外的子線程中跑的,所以此時這四個協程放到事件循環的線程中運行的。
注意這裏只有 run_coroutine_threadsafe 方法,沒有 run_coroutine_thread 方法。

獲取協程的返回結果


獲取結果可以使用 asyncio.gather() 方法, 這裏面傳的是 coros_or_futures 就是協程或者 task 對象,asyncio.run_coroutine_threadsafe() 和 run_in_executor() 返回的都是 Future 對象, 所以可以將它們共同放到 gather 裏, 獲取返回值。

代碼執行結果:

總的時間是取決於所有運行的函數中耗時最長的, 這裏同步函數有個阻塞的 sleep(4) , 所以總的時間是 4 秒多一點點。
關於在異步協程中的處理流程先總結這麼多, 之後再學習總結一個與異步相關的各種庫如 aiohttp 的使用等等。

原文鏈接:https://testerhome.com/articles/19703
排版:Python 編程時光

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/9e3R4ZNneRHzfOLhQeYJzQ