Python: 如何精確控制 asyncio 中併發運行的多個任務

之前我們瞭解瞭如何創建多個任務來併發運行程序,方式是通過 asyncio.create_task 將協程包裝成任務,如下所示:

import asyncio, time

async def main():
    task1 = asyncio.create_task(asyncio.sleep(3))
    task2 = asyncio.create_task(asyncio.sleep(3))
    task3 = asyncio.create_task(asyncio.sleep(3))

    await task1
    await task2
    await task3

start = time.perf_counter()
asyncio.run(main())
end = time.perf_counter()
print("總耗時:", end - start)
"""
總耗時: 3.003109625
"""

但這種代碼編寫方式只適用於簡單情況,如果在同時發出數百、數千甚至更多 Web 請求的情況下,這種編寫方式將變得冗長且混亂。所以 asyncio 提供了許多便利的函數,支持我們一次性等待多個任務。

等待一組任務全部完成

一個被廣泛用於等待一組任務的方式是使用 asyncio.gather,這個函數接收一系列的可等待對象,允許我們在一行代碼中同時運行它們。如果傳入的 awaitable 對象是協程,gather 函數會自動將其包裝成任務,以確保它們可以同時運行。這意味着不必像之前那樣,用 asyncio.create_task 單獨包裝,但即便如此,還是建議手動包裝一下。

asyncio.gather 同樣返回一個 awaitable 對象,在 await 表達式中使用它時,它將暫停,直到傳遞給它的所有 awaitable 對象都完成爲止。一旦所有任務都完成,asyncio.gather 將返回這些任務的結果所組成的列表。

import asyncio
import time
from aiohttp import ClientSession

async def fetch_status(session: ClientSession, url: str):
    async with session.get(url) as resp:
        return resp.status

async def main():
    async with ClientSession() as session:
        # 注意:requests 裏面是 100 個協程
        # 傳遞給 asyncio.gather 之後會自動被包裝成任務
        requests = [fetch_status(session, "http://www.baidu.com")
                    for _ in range(100)]
        # 併發運行 100 個任務,並等待這些任務全部完成
        # 相比寫 for 循環再單獨 await,這種方式就簡便多了
        status_codes = await asyncio.gather(*requests)
        print(f"{len(status_codes)} 個任務已全部完成")

start = time.perf_counter()
asyncio.run(main())
end = time.perf_counter()
print("總耗時:", end - start)
"""
100 個任務已全部完成
總耗時: 0.552532458
"""

完成 100 個請求只需要 0.55 秒鐘,由於網絡問題,測試的結果可能不準確,但異步肯定比同步要快。

另外傳給 gather 的每個 awaitable 對象可能不是按照確定性順序完成的,例如將協程 a 和 b 按順序傳遞給 gather,但 b 可能會在 a 之前完成。不過 gather 的一個很好的特性是,不管 awaitable 對象何時完成,都保證結果會按照傳遞它們的順序返回。

import asyncio
import time

async def main():
    # asyncio.sleep 還可以接收一個 result 參數,作爲 await 表達式的值
    tasks = [asyncio.sleep(second, result=f"我睡了 {second} 秒")
             for second in (5, 3, 4)]
    print(await asyncio.gather(*tasks))

start = time.perf_counter()
asyncio.run(main())
end = time.perf_counter()
print("總耗時:", end - start)
"""
['我睡了 5 秒', '我睡了 3 秒', '我睡了 4 秒']
總耗時: 5.002968417
"""

然後 gather 還可以實現分組,什麼意思呢?

import asyncio
import time

async def main():
    gather1 = asyncio.gather(
        *[asyncio.sleep(second, result=f"我睡了 {second} 秒")
          for second in (5, 3, 4)]
    )
    gather2 = asyncio.gather(
        *[asyncio.sleep(second, result=f"我睡了 {second} 秒")
          for second in (3, 3, 3)]
    )
    results = await asyncio.gather(
        gather1, gather2, asyncio.sleep(6, "我睡了 6 秒")
    )
    print(results)


start = time.perf_counter()
asyncio.run(main())
end = time.perf_counter()
print("總耗時:", end - start)
"""
[['我睡了 5 秒', '我睡了 3 秒', '我睡了 4 秒'], 
 ['我睡了 3 秒', '我睡了 3 秒', '我睡了 3 秒'], 
 '我睡了 6 秒']
總耗時: 6.002826208
"""

asyncio.gather 裏面可以通過繼續接收 asyncio.gather 返回的對象,從而實現分組功能,還是比較強大的。

如果 gather 裏面啥都不傳的話,那麼會返回一個空列表。

問題來了,在上面的例子中,我們假設所有請求都不會失敗或拋出異常,這是理想情況。但如果請求失敗了呢?我們來看一下,當 gather 裏面的任務出現異常時會發生什麼?

import asyncio

async def normal_running():
    await asyncio.sleep(3)
    return "正常運行"

async def raise_error():
    raise ValueError("出錯啦")

async def main():
    results = await asyncio.gather(normal_running(), raise_error())
    print(results)

loop = asyncio.get_event_loop()
loop.run_until_complete(main())
"""
Traceback (most recent call last):
    ......
    raise ValueError("出錯啦")
ValueError: 出錯啦
"""

我們看到拋異常了,其實 gather 函數的原理就是等待一組任務運行完畢,當某個任務完成時,就調用它的 result 方法,拿到返回值。但我們之前介紹 Future 和 Task 的時候說過,如果出錯了,調用 result 方法會將異常拋出來。

import asyncio

async def normal_running():
    await asyncio.sleep(3)
    return "正常運行"

async def raise_error():
    raise ValueError("出錯啦")

async def main():
    try:
        await asyncio.gather(normal_running(), raise_error())
    except Exception:
        print("執行時出現了異常")
    # 但是剩餘的任務仍在執行,拿到當前的所有正在執行的任務
    all_tasks = asyncio.all_tasks()
    # task 相當於對協程做了一個封裝,那麼通過 get_coro 方法也可以拿到對應的協程
    print(f"當前剩餘的任務:"[task.get_coro().__name__ for task in all_tasks])
    # 繼續等待剩餘的任務完成
    results = await asyncio.gather(
        *[task for task in all_tasks if task.get_coro().__name__ != "main"]
    )
    print(results)

loop = asyncio.get_event_loop()
loop.run_until_complete(main())
"""
執行時出現了異常
當前剩餘的任務: ['main', 'normal_running']
['正常運行']
"""

可以看到在 await asyncio.gather() 的時候,raise_error() 協程拋異常了,那麼異常會向上傳播,在 main() 裏面 await 處產生 ValueError。我們捕獲之後查看剩餘未完成的任務,顯然只剩下 normal_running() 和 main(),因爲任務執行出現異常也代表它完成了。

需要注意的是,一個任務出現了異常,並不影響剩餘未完成的任務,它們仍在後臺運行。我們舉個例子證明這一點:

import asyncio, time

async def normal_running():
    await asyncio.sleep(5)
    return "正常運行"

async def raise_error():
    await asyncio.sleep(3)
    raise ValueError("出錯啦")

async def main():
    try:
        await asyncio.gather(normal_running(), raise_error())
    except Exception:
        print("執行時出現了異常")
    # raise_error() 會在 3 秒後拋異常,然後向上拋,被這裏捕獲
    # 而 normal_running() 不會受到影響,它仍然在後臺運行
    # 顯然接下來它只需要再過 2 秒就能運行完畢
    time.sleep(2)  # 注意:此處會阻塞整個線程
    # asyncio.sleep 是不耗費 CPU 的,因此即使 time.sleep 將整個線程阻塞了,也不影響
    # 因爲執行 time.sleep 時,normal_running() 裏面的 await asyncio.sleep(5) 已經開始執行了
    results = await asyncio.gather(*[task for task in asyncio.all_tasks()
                                     if task.get_coro().__name__ != "main"])
    print(results)

loop = asyncio.get_event_loop()
start = time.perf_counter()
loop.run_until_complete(main())
end = time.perf_counter()
print("總耗時:", end - start)
"""
執行時出現了異常
['正常運行']
總耗時: 5.004949666
"""

這裏耗時是 5 秒,說明一個任務拋異常不會影響其它任務,因爲 time.sleep(2) 執行完畢之後,normal_running() 裏面 asyncio.sleep(5) 也已經執行完畢了,說明異常捕獲之後,剩餘的任務沒有受到影響。

並且這裏我們使用了 time.sleep,在工作中千萬不要這麼做,因爲它會阻塞整個線程,導致主線程無法再做其他事情了。而這裏之所以用 time.sleep,主要是想說明一個任務出錯,那麼將異常捕獲之後,其它任務不會受到影響。

那麼問題來了,如果發生異常,我不希望它將異常向上拋該怎麼辦呢?可能有人覺得這還不簡單,直接來一個異常捕獲不就行了?這是一個解決辦法,但 asyncio.gather 提供了一個參數,可以更優雅的實現這一點。

import asyncio

async def normal_running():
    await asyncio.sleep(3)
    return "正常運行"

async def raise_error():
    raise ValueError("出錯啦")

async def main():
    results = await asyncio.gather(
        normal_running(), raise_error(),
        return_exceptions=True
    )
    print(results)


loop = asyncio.get_event_loop()
loop.run_until_complete(main())
"""
['正常運行', ValueError('出錯啦')]
"""

之前在介紹任務的時候我們說了,不管正常執行結束還是出錯,都代表任務已完成,會將結果和異常都收集起來,只不過其中肯定有一個爲 None。然後根據不同的情況,選擇是否將異常拋出來。所以在 asyncio 裏面,異常只是一個普通的屬性,會保存在任務對象裏面。

對於 asyncio.gather 也是同理,它裏面有一個 return_exceptions 參數,默認爲 False,當任務出現異常時,會拋給 await 所在的位置。如果該參數設置爲 True,那麼出現異常時,會直接把異常本身返回(此時任務也算是結束了)。

在 asyncio 裏面,異常變成了一個可控的屬性。因爲執行是以任務爲單位的,當出現異常時,也會作爲任務的一個普通的屬性。我們可以選擇將它拋出來,也可以選擇隱式處理掉。

至於我們要判斷哪些任務是正常執行,哪些任務是拋了異常,便可以通過返回值來判斷。如果 isinstance(res, Exception) 爲 True,那麼證明任務出現了異常,否則正常執行。雖然這有點笨拙,但也能湊合用,因爲 API 並不完美。

當然以上這些都不能算是缺點,gather 真正的缺點有兩個:

而 asyncio 也提供了用於解決這兩個問題的 API。

在任務完成時立即處理

如果想在某個結果生成之後就對其進行處理,這是一個問題;如果有一些可以快速完成的等待對象,和一些可能需要很長時間完成的等待對象,這也可能是一個問題。因爲 gather 需要等待所有對象執行完畢,這就導致應用程序可能變得無法響應。

想象一個用戶發出 100 個請求,其中兩個很慢,但其餘的都很快完成。如果一旦有請求完成,可以向用戶輸出一些信息,來提升用戶的使用體驗。

爲處理這種情況,asyncio 公開了一個名爲 as_completed 的 API 函數,這個函數接收一個可等待對象(awaitable)組成的列表,並返回一個生成器。通過遍歷,等待它們中的每一個對象都完成,並且哪個先完成,哪個就先被迭代。這意味着將能在結果可用時立即就處理它們,但很明顯此時就沒有所謂的順序了,因爲無法保證哪些請求先完成。

import asyncio
import time

async def delay(seconds):
    await asyncio.sleep(seconds)
    return f"我睡了 {seconds} 秒"

async def main():
    # asyncio 提供的用於等待一組 awaitable 對象的 API 都很智能
    # 如果檢測到你傳遞的是協程,那麼會自動包裝成任務
    # 不過還是建議手動包裝一下
    tasks = [asyncio.create_task(delay(seconds))
             for seconds in (3, 5, 2, 4, 6, 1)]
    for finished in asyncio.as_completed(tasks):
        print(await finished)

loop = asyncio.get_event_loop()
start = time.perf_counter()
loop.run_until_complete(main())
end = time.perf_counter()
print("總耗時:", end - start)
"""
我睡了 1 秒
我睡了 2 秒
我睡了 3 秒
我睡了 4 秒
我睡了 5 秒
我睡了 6 秒
總耗時: 6.000872417
"""

和 gather 不同,gather 是等待一組任務全部完成之後才返回,並且會自動將結果取出來,結果值的順序和添加任務的順序是一致的。對於 as_completed 而言,它會返回一個生成器,我們遍歷它,哪個任務先完成則哪個就先被處理。

那麼問題來了,如果出現異常了該怎麼辦?很簡單,直接異常捕獲即可。

然後我們再來思考一個問題,任何基於 Web 的請求都存在花費很長時間的風險,服務器可能處於過重的資源負載下,或者網絡連接可能很差。

之前我們看到了通過 wait_for 函數可以爲特定請求添加超時,但如果想爲一組請求設置超時怎麼辦?as_completed 函數通過提供一個可選的 timeout 參數來處理這種情況,它允許以秒爲單位指定超時時間。如果花費的時間超過設定的時間,那麼迭代器中的每個可等待對象都會在等待時拋出 TimeoutException。

import asyncio

async def delay(seconds):
    await asyncio.sleep(seconds)
    return f"我睡了 {seconds} 秒"

async def main():
    tasks = [asyncio.create_task(delay(seconds))
             for seconds in (1, 5, 6)]
    for finished in asyncio.as_completed(tasks, timeout=3):
        try:
            print(await finished)
        except asyncio.TimeoutError:
            print("超時啦")

loop = asyncio.get_event_loop()
loop.run_until_complete(main())
"""
我睡了 1 秒
超時啦
超時啦
"""

as_completed 非常適合用於儘快獲得結果,但它也有缺點。

第一個缺點是沒有任何方法可快速瞭解我們正在等待哪個協程或任務,因爲運行順序是完全不確定的。如果不關心順序,這可能沒問題,但如果需要以某種方式將結果與請求相關聯,那麼將面臨挑戰。

第二個缺點是超時,雖然會正確地拋出異常並繼續運行程序,但創建的所有任務仍將在後臺運行。如果想取消它們,很難確定哪些任務仍在運行,這是我們面臨的另一個挑戰。

import asyncio

async def delay(seconds):
    await asyncio.sleep(seconds)
    return f"我睡了 {seconds} 秒"

async def main():
    tasks = [asyncio.create_task(delay(seconds))
             for seconds in (1, 5, 6)]
    for finished in asyncio.as_completed(tasks, timeout=3):
        try:
            print(await finished)
        except asyncio.TimeoutError:
            print("超時啦")

    # tasks[1] 還需要 2 秒運行完畢,tasks[2] 還需要 3 秒運行完畢
    print(tasks[1].done(), tasks[2].done())

    await asyncio.sleep(2)
    # 此時只剩下 tasks[2],還需要 1 秒運行完畢
    print(tasks[1].done(), tasks[2].done())

    await asyncio.sleep(1)
    # tasks[2] 也運行完畢
    print(tasks[1].done(), tasks[2].done())


loop = asyncio.get_event_loop()
loop.run_until_complete(main())
"""
我睡了 1 秒
超時啦
超時啦
False False
True False
True True
"""

根據輸出結果可以發現,雖然因爲抵達超時時間, await 會導致 TimeoutError,但未完成的任務不會受到影響,它們仍然在後臺執行。

但這對於我們來說,有時卻不是一件好事,因爲我們希望如果抵達超時時間,那麼未完成的任務就別在執行了,這時候如何快速找到那些未完成的任務呢?爲處理這種情況,asyncio 提供了另一個 API 函數:wait。

使用 wait 進行細粒度控制

gather 和 as_completed 的缺點之一是,當我們看到異常時,沒有簡單的方法可以取消已經在運行的任務。這在很多情況下可能沒問題,但是想象一個場景:同時發送大批量 Web 請求(參數格式是相同的),如果某個請求的參數格式錯誤(說明所有請求的參數格式都錯了),那麼剩餘的請求還有必要執行嗎?顯然是沒有必要的,而且還會消耗更多資源。另外 as_completed 的另一個缺點是,由於迭代順序是不確定的,因此很難準確跟蹤已完成的任務。

於是 asyncio 提供了 wait 函數,注意它和 wait_for 的區別,wait_for 針對的是單個任務,而 wait 則針對一組任務(不限數量)。

注:wait 函數接收的是一組 awaitable 對象,但未來的版本改爲僅接收任務對象。因此對於 gather、as_completed、wait 等函數,雖然它們會自動包裝成任務,但我們更建議先手動包裝成任務,然後再傳過去。

並且 wait 和 as_completed 接收的都是任務列表,而 gather 則要求將列表打散,以多個位置參數的方式傳遞,因此這些 API 的參數格式不要搞混了。

然後是 wait 函數的返回值,它會返回兩個集合:一個由已完成的任務(執行結束或出現異常)組成的集合,另一個由未完成的任務組成的集合。而 wait 函數的參數,它除了可以接收一個任務列表之外,還可以接收一個 timeout(超時時間)和一個 return_when(用於控制返回條件)。光說很容易亂,我們來實際演示一下。

等待所有任務完成

如果未指定 retun_when,則此選項使用默認值,並且它的行爲與 asyncio.gather 最接近,但也存在一些差異。

import asyncio

async def delay(seconds):
    await asyncio.sleep(seconds)
    return f"我睡了 {seconds} 秒"

async def main():
    tasks = [asyncio.create_task(delay(seconds)) for seconds in (3, 2, 4)]
    # 和 gather 一樣,默認會等待所有任務都完成
    donepending = await asyncio.wait(tasks)
    print(f"已完成的任務數: {len(done)}")
    print(f"未完成的任務數: {len(pending)}")

    for done_task in done:
        print(await done_task)


loop = asyncio.get_event_loop()
loop.run_until_complete(main())
"""
已完成的任務數: 3
未完成的任務數: 0
我睡了 2 秒
我睡了 4 秒
我睡了 3 秒
"""

await asynio.wait 時,會返回兩個集合,分別保存已完成的任務和仍然運行的任務。並且由於返回的是集合,所以是無序的。默認情況下,asyncio.wait 會等到所有任務都完成後才返回,所以待處理集合的長度爲 0。

然後還是要說一下異常,如果某個任務執行時出現異常了該怎麼辦呢?

import asyncio

async def delay(seconds):
    await asyncio.sleep(seconds)
    if seconds == 3:
        raise ValueError("我出錯了(second is 3)")
    return f"我睡了 {seconds} 秒"

async def main():
    tasks = [asyncio.create_task(delay(seconds)) for seconds in range(1, 6)]
    donepending = await asyncio.wait(tasks)
    print(f"已完成的任務數: {len(done)}")
    print(f"未完成的任務數: {len(pending)}")


loop = asyncio.get_event_loop()
loop.run_until_complete(main())
"""
已完成的任務數: 5
未完成的任務數: 0
Task exception was never retrieved
future: <Task finished ... coro=<delay() done, defined at .../main.py:3> 
         exception=ValueError('我出錯了(second is 3)')>
    ......
    raise ValueError("我出錯了(second is 3)")
ValueError: 我出錯了(second is 3)
"""

對於 asyncio.gather 而言,如果某個任務出現異常,那麼異常會向上拋給 await 所在的位置。如果不希望它拋,那麼可以將 gather 裏面的 return_exceptions 參數指定爲 True,這樣當出現異常時,會將異常返回。

而 asyncio.wait 也是如此,如果任務出現異常了,那麼會直接視爲已完成,異常同樣不會向上拋。但是從程序開發的角度來講,返回值可以不要,但異常不能不處理。

所以當任務執行出錯時,雖然異常不會向上拋,但 asyncio 會將它打印出來,於是就有了:Task exception was never retrieved。意思就是該任務出現異常了,但你沒有處理它。

import asyncio

async def delay(seconds):
    await asyncio.sleep(seconds)
    if seconds == 3:
        raise ValueError("我出錯了(second is 3)")
    return f"我睡了 {seconds} 秒"

async def main():
    tasks = [asyncio.create_task(delay(seconds)) for seconds in range(1, 6)]
    # done 裏面保存的都是已完成的任務
    donepending = await asyncio.wait(tasks)
    print(f"已完成的任務數: {len(done)}")
    print(f"未完成的任務數: {len(pending)}")

    # 所以我們直接遍歷 done 即可
    for done_task in done:
        # 這裏不能使用 await done_task,因爲當任務完成時,它就等價於 done_task.result()
        # 而任務出現異常時,調用 result() 是會將異常拋出來的,所以我們需要先檢測異常是否爲空
        exc = done_task.exception()
        if exc:
            print(exc)
        else:
            print(done_task.result())


loop = asyncio.get_event_loop()
loop.run_until_complete(main())
"""
已完成的任務數: 5
未完成的任務數: 0
我睡了 5 秒
我睡了 2 秒
我出錯了(second is 3)
我睡了 4 秒
我睡了 1 秒
"""

這裏調用 result 和 exception 有一個前提,就是任務必須處於已完成狀態,否則會拋異常:InvalidStateError: Result is not ready.。但對於我們當前是沒有問題的,因爲 done 裏面的都是已完成的任務。

這裏能再次看到和 gather 的區別,gather 會幫你把返回值都取出來,放在一個列表中,並且順序就是任務添加的順序。而 wait 返回的是集合,集合裏面是任務,我們需要手動拿到返回值。

某個完成出現異常時取消其它任務

從目前來講,wait 的作用和 gather 沒有太大的區別,都是等到任務全部結束再解除等待(出現異常也視作任務完成,並且其它任務不受影響)。那如果我希望當有任務出現異常時,立即取消其它任務該怎麼做呢?顯然這就依賴 wait 函數里面的 return_when,它有三個可選值:

顯然爲完成這個需求,我們應該將 return_when 指定爲 FIRST_EXCEPTION。

import asyncio

async def delay(seconds):
    await asyncio.sleep(seconds)
    if seconds == 3:
        raise ValueError("我出錯了(second is 3)")
    return f"我睡了 {seconds} 秒"

async def main():
    tasks = [asyncio.create_task(delay(seconds), 睡了 {seconds} 秒的任務")
             for seconds in range(1, 6)]
    done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_EXCEPTION)
    print(f"已完成的任務數: {len(done)}")
    print(f"未完成的任務數: {len(pending)}")

    print("都有哪些任務完成了?")
    for t in done:
        print("    " + t.get_name())

    print("還有哪些任務沒完成?")
    for t in pending:
        print("    " + t.get_name())

loop = asyncio.get_event_loop()
loop.run_until_complete(main())
"""
已完成的任務數: 3
未完成的任務數: 2
都有哪些任務完成了?
    睡了 2 秒的任務
    睡了 3 秒的任務
    睡了 1 秒的任務
還有哪些任務沒完成?
    睡了 4 秒的任務
    睡了 5 秒的任務
"""

當 delay(3) 失敗時,顯然 delay(1)、delay(2) 已完成,而 delay(4) 和 delay(5) 未完成。此時集合 done 裏面的就是已完成的任務,pending 裏面則是未完成的任務。

當 wait 返回時,未完成的任務仍在後臺繼續運行,如果我們希望將剩餘未完成的任務取消掉,那麼直接遍歷 pending 集合即可。

import asyncio

async def delay(seconds):
    await asyncio.sleep(seconds)
    if seconds == 3:
        raise ValueError("我出錯了(second is 3)")
    print(f"我睡了 {seconds} 秒")

async def main():
    tasks = [asyncio.create_task(delay(seconds))
             for seconds in range(1, 6)]
    donepending = await asyncio.wait(tasks, return_when=asyncio.FIRST_EXCEPTION)
    print(f"已完成的任務數: {len(done)}")
    print(f"未完成的任務數: {len(pending)}")
    # 此時未完成的任務仍然在後臺運行,這時候我們可以將它們取消掉
    for t in pending:
        t.cancel()
    # 阻塞 3 秒
    await asyncio.sleep(3)

loop = asyncio.get_event_loop()
loop.run_until_complete(main())
"""
我睡了 1 秒
我睡了 2 秒
已完成的任務數: 3
未完成的任務數: 2
"""

在 await asyncio.sleep(3) 的時候,剩餘兩個任務並沒有輸出,所以任務確實被取消了。注:出現異常的任務會被掛在已完成集合裏面,如果沒有任務在執行時出現異常,那麼效果等價於 ALL_COMPLETED。

當任務完成時處理結果

ALL_COMPLETED 和 FIRST_EXCEPTION 都有一個缺點,在任務成功且不拋出異常的情況下,必須等待所有任務完成。對於之前的用例,這可能是可以接受的,但如果想要在某個協程成功完成後立即處理結果,那麼現在的情況將不能滿足我們的需求。

雖然這個場景可使用 as_completed 實現,但 as_completed 的問題是沒有簡單的方法可以查看哪些任務還在運行,哪些任務已經完成。因爲遍歷的時候,我們無法得知哪個任務先完成,所以 as_completed 無法完成我們的需求。

好在 wait 函數的 return_when 參數可以接收 FIRST_COMPLETED 選項,表示只要有一個任務完成就立即返回,而返回的可以是執行出錯的任務,也可以是成功運行的任務(任務失敗也表示已完成)。然後,我們可以取消其他正在運行的任務,或者讓某些任務繼續運行,具體取決於用例。

import asyncio

async def delay(seconds):
    await asyncio.sleep(seconds)
    if seconds == 3:
        raise ValueError("我出錯了(second is 3)")
    print(f"我睡了 {seconds} 秒")

async def main():
    tasks = [asyncio.create_task(delay(seconds))
             for seconds in range(1, 6)]
    donepending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
    print(f"已完成的任務數: {len(done)}")
    print(f"未完成的任務數: {len(pending)}")

loop = asyncio.get_event_loop()
loop.run_until_complete(main())
"""
我睡了 1 秒
已完成的任務數: 1
未完成的任務數: 4
"""

當 return_when 參數爲 FIRST_COMPLETED 時,那麼只要有一個任務完成就會立即返回,然後我們處理完成的任務即可。至於剩餘的任務,它們仍在後臺運行,我們可以繼續對其使用 wait 函數。

import asyncio

async def delay(seconds):
    await asyncio.sleep(seconds)
    if seconds == 3:
        raise ValueError("我出錯了(second is 3)")
    return f"我睡了 {seconds} 秒"

async def main():
    tasks = [asyncio.create_task(delay(seconds))
             for seconds in range(1, 6)]
    while True:
        donepending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
        for t in done:
            exc = t.exception()
            print(exc) if exc else print(t.result())

        if pending:  # 還有未完成的任務,那麼繼續使用 wait
            tasks = pending
        else:
            break

loop = asyncio.get_event_loop()
loop.run_until_complete(main())
"""
我睡了 1 秒
我睡了 2 秒
我出錯了(second is 3)
我睡了 4 秒
我睡了 5 秒
"""

整個行爲和 as_completed 是一致的,但這種做法有一個好處,就是我們每一步都可以準確地知曉哪些任務已經完成,哪些任務仍然運行,並且也可以做到精確取消指定任務。

處理超時

除了允許對如何等待協程完成進行更細粒度的控制外,wait 還允許設置超時,以指定我們希望等待完成的時間。要啓用此功能,可將 timeout 參數設置爲所需的最大秒數,如果超過了這個超時時間,wait 將立即返回 done 和 pending 任務集。

不過與目前所看到的 wait_for 和 as_completed 相比,超時在 wait 中的行爲方式存在一些差異。

1)協程不會被取消。

當使用 wait_for 時,如果任務超時,則引發 TimeouError,並且任務也會自動取消。但使用 wait 的情況並非如此,它的行爲更接近我們在 as_completed 中看到的情況。如果想因爲超時而取消協程,必須顯式地遍歷任務並取消,否則它們仍在後臺運行。

2)不會引發超時錯誤。

如果發生超時,則 wait 返回所有已完成的任務,以及在發生超時的時候仍處於運行狀態的所有任務。

import asyncio

async def delay(seconds):
    await asyncio.sleep(seconds)
    return f"我睡了 {seconds} 秒"

async def main():
    tasks = [asyncio.create_task(delay(seconds))
             for seconds in range(1, 6)]
    donepending = await asyncio.wait(tasks, timeout=3.1)
    print(f"已完成的任務數: {len(done)}")
    print(f"未完成的任務數: {len(pending)}")

loop = asyncio.get_event_loop()
loop.run_until_complete(main())
"""
已完成的任務數: 3
未完成的任務數: 2
"""

wait 調用將在 3 秒後返回 done 和 pending 集合,在 done 集合中,會有三個已完成的任務。而耗時 4 秒和 5 秒的任務,由於仍在運行,因此它們將出現在 pending 集合中。我們可以繼續等待它們完成並提取返回值,也可以將它們取消掉。

需要注意:和之前一樣,pending 集合中的任務不會被取消,並且繼續運行,儘管會超時。對於要終止待處理任務的情況,我們需要顯式地遍歷 pending 集合並在每個任務上調用 cancel。

爲什麼要先將協程包裝成任務

我們說協程在傳給 wait 的時候會自動包裝成任務,那爲什麼我們還要手動包裝呢?

import asyncio

async def delay(seconds):
    await asyncio.sleep(seconds)
    return f"我睡了 {seconds} 秒"

async def main():
    tasks = [asyncio.create_task(delay(seconds))
             for seconds in range(1, 6)]
    donepending = await asyncio.wait(tasks, timeout=3.1)
    print(all(map(lambda t: t in tasks, done)))
    print(all(map(lambda t: t in tasks, pending)))

loop = asyncio.get_event_loop()
loop.run_until_complete(main())
"""
True
True
"""

如果 wait 函數接收的就是任務,那麼 wait 函數就不會再包裝了,所以 done 和 pending 裏面的任務和 tasks 裏面的任務是相同的。基於這個條件,我們後續可以做一些比較之類的。

比如有很多 Web 請求任務,但如果當未完成的任務是 task1、task2、task3,那麼就取消掉,於是可以這麼做。

for t in pending:
    if t in (task1, task2, task3):
        t.cancel()

如果返回的 done 和 pending 裏的任務,是在 wait 函數中自動創建的,那麼我們就無法進行任何比較來查看 pending 集合中的特定任務。

小結

1)asyncio.gather 函數允許同時運行多個任務,並等待它們完成。一旦傳遞給它的所有任務全部完成,這個函數就會返回。由於 gather 會拿到裏面每個任務的返回值,所以它要求每個任務都是成功的,如果有任務執行出錯(沒有返回值),那麼獲取返回值的時候就會將異常拋出來,然後向上傳遞給 await asyncio.gather。

爲此,可以將 return_exceptions 設置爲 True,這將返回成功完成的可等待對象的結果,以及產生的異常(異常會作爲一個普通的屬性返回,和返回值是等價的)。

2)可使用 as_completed 函數在可等待對象列表完成時處理它們的結果,它會返回一個可以循環遍歷的生成器。一旦某個協程或任務完成,就能訪問結果並處理它。

3)如果希望同時運行多個任務,並且還希望能瞭解哪些任務已經完成,哪些任務在運行,則可以使用 wait。這個函數還允許在返回結果時進行更多控制,返回時,我們會得到一組已經完成的任務和一組仍在運行的任務。

然後可以取消任何想要取消的任務,或執行其他任何需要執行的任務。並且 wait 裏面的任務出現異常,也不會影響其它任務,異常會作爲任務的一個屬性,只是在我們沒有處理的時候會給出警告。至於具體的處理方式,我們直接通過 exception 方法判斷是否發生了異常即可,沒有異常返回 result(),有異常返回 exception()。

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/WQ5-kngWyA0Ce25k3c4w_g