Python 源碼解密協程隊列和線程隊列的實現原理
本次來聊一聊 Python 的隊列,首先隊列是一種特殊的線性表,具有先進先出(FIFO)的特性,這意味着元素的入隊順序和出隊順序是一致的。
隊列通常用於存儲需要按順序處理的數據,例如任務調度。當然隊列最常見的一個應用場景就是解耦,一個線程不停地生產數據,放到隊列裏,另一個線程從隊列中取數據進行消費。
而 Python 也提供了隊列,分別是協程隊列和線程隊列。
import asyncio
import queue
# 協程隊列
coroutine_queue = asyncio.Queue()
# 線程隊列
threading_queue = queue.Queue()
如果你的程序基於 asyncio,那麼應該使用協程隊列,如果你的程序採用了多線程,那麼應該使用線程隊列。
下面我們來看一看這兩種隊列的 API,以及底層實現原理。
協程隊列
協程隊列的具體實現由 asyncio 提供,以下是它的一些用法。
import asyncio
async def main():
# 創建隊列時可以指定能夠存儲的最大元素個數
# 不指定則沒有容量限制
queue = asyncio.Queue(maxsize=20)
# 返回容量
print(queue.maxsize)
"""
20
"""
# 添加元素,如果隊列滿了會阻塞,直到有剩餘空間
await queue.put(111)
# 添加元素,如果隊列滿了會拋異常
# 因爲不需要阻塞等待,所以 put_nowait 不是協程函數
queue.put_nowait(222)
# 隊列是否已滿
print(queue.full())
"""
False
"""
# 返回隊列內部的元素個數
print(queue.qsize())
"""
2
"""
# 從隊列中獲取元素,如果隊列爲空,會阻塞,直到隊列中有可用元素
print(await queue.get())
"""
111
"""
# 從隊列中獲取元素,如果隊列爲空,會拋異常
# 因爲不需要阻塞等待,所以 put_nowait 不是協程函數
print(queue.get_nowait())
"""
222
"""
# 隊列是否爲空
print(queue.empty())
"""
True
"""
asyncio.run(main())
所以協程隊列的 API 很簡單,我們再羅列一下:
然後,協程隊列還有兩個 API,需要單獨說明,分別是 task_done() 和 join()。
首先在協程隊列內部有一個 _unfinished_tasks 屬性,初始值爲 0,每當往隊列添加一個元素時,該屬性的值就會自增 1。但是從隊列取出元素時,該屬性不會自動減 1,需要手動調用 task_done() 方法。
所以 _unfinished_tasks 記錄了隊列中有多少個任務數據需要處理,每來一個自動加 1,但取走一個不會自動減 1,而是需要 task_done 來實現。
然後 join() 的作用是,當 _unfinished_tasks 不爲 0 的時候,await queue.join() 會阻塞,直到爲 0。
import asyncio
async def consumer(queue, n):
print(f"consumer{n} 開始消費")
await asyncio.sleep(3)
await queue.get()
# 獲取數據後,調用 task_done
queue.task_done()
print(f"consumer{n} 消費完畢")
async def main():
queue = asyncio.Queue()
await queue.put(123)
await queue.put(456)
await queue.put(789)
# 隊列裏面有三個數據,開啓三個消費者去消費
await asyncio.gather(
consumer(queue, 1),
consumer(queue, 2),
consumer(queue, 3),
)
# 這裏會陷入阻塞,直到 _unfinished_tasks 變爲 0
await queue.join()
print("main 解除阻塞")
asyncio.run(main())
"""
consumer1 開始消費
consumer2 開始消費
consumer3 開始消費
consumer1 消費完畢
consumer2 消費完畢
consumer3 消費完畢
main 解除阻塞
"""
還是比較簡單的,然後我們來看一下協程隊列的具體實現細節。
首先協程隊列內部有一個 _queue 屬性,它是一個雙端隊列,負責保存具體的元素。因爲要保證兩端的操作都是高效的,所以採用雙端隊列實現。
然後是 _getters 和 _putters 兩個屬性,它們是做什麼的呢?在隊列滿了的時候,協程往隊列添加元素時會陷入阻塞,等到隊列有剩餘空間時會解除阻塞。同理,在隊列爲空時,協程從隊列獲取元素時會陷入阻塞,等到隊列有可用元素時會解除阻塞。
那麼這個阻塞等待,以及自動喚醒並解除阻塞是怎麼實現的呢?在介紹鎖和信號量的時候,我們分析過整個實現過程,協程隊列與之類似。
假設協程從隊列獲取元素,但是隊列爲空,於是會創建一個 Future 對象,並保存起來,當前保存的地方就是 _getters,它也是雙端隊列。然後 await future,此時就會陷入阻塞,當其它協程往隊列中添加元素時,會將 _getters 裏面的 future 彈出,設置結果集。因此 await future 的協程就會解除阻塞,因爲隊列有可用元素了。
同理,協程往隊列添加元素也是如此,如果隊列滿了,同樣創建一個 Future 對象,並保存起來,當前保存的地方就是 _putters。然後 await future,陷入阻塞,當其它協程從隊列中取出元素,會將 _putters 裏面的 future 彈出,設置結果集。因此 await future 的協程就會解除阻塞,因爲隊列有可用空間了。
三個內部調用的方法,_get 方法負責從隊列的頭部彈出元素,_put 方法負責從隊列的尾部追加元素,比較簡單。然後是 _wakeup_next 方法,它負責喚醒阻塞的協程。參數 waiters 要麼是 _getters,要麼是 _putters,從裏面彈出一個 future,設置結果集,讓對應的協程解除阻塞。
-
qsize() 負責返回隊列的元素個數;
-
maxsize 負責返回隊列的容量;
-
empty() 負責判斷隊列是否爲空;
-
full() 負責判斷隊列是否已滿,如果容量小於等於 0,那麼表示容量無限,隊列永遠不會滿。否則判斷元素個數是否大於等於容量;
然後看看 put_nowait 和 get_nowait,首先是 put_nowait,往隊列添加元素。
如果添加時發現隊列已滿,那麼拋出異常。如果未滿,則調用 _put 方法往 _queue 裏面添加元素,因爲元素的實際存儲是由 self._queue 這個雙端隊列負責的。
添加完畢後,將 _unfinished_task 加 1。最後從 _getters 裏面彈出 future,設置結果集,讓因獲取不到元素而陷入阻塞的協程解除阻塞(同時會將添加的元素取走)。
get_nowait 的邏輯也很簡單,如果隊列爲空,直接拋異常。如果不爲空,則調用 _get 方法從隊列中彈出元素。最後從 _putters 裏面彈出 future,設置結果集,讓因隊列已滿、無法添加元素而陷入阻塞的協程解除阻塞(同時會將元素添加進隊列)。
再來看看 put 方法的實現細節:
結果和我們之前分析的一樣,只是源碼內部多做了一些異常檢測。再來看看 get 方法,它的實現細節和 put 是類似的。
比較簡單,還是沒什麼難度的,最後再來看看 task_done 和 join 兩個方法。
協程隊列裏面使用了 asyncio.Event,它表示事件,如果事件對象沒有調用 set 方法設置標誌位,那麼調用 wait 方法時會陷入阻塞。當事件對象調用 set 方法時,wait 會解除阻塞。
所以協程隊列的 join 方法的邏輯就是,當 _unfinished_tasks 大於 0 時,調用事件對象的 wait 方法陷入阻塞。
而 task_done 方法的作用就是將 _unfinished_tasks 減 1,當它的值屬性爲 0 時,調用事件對象的 set 方法,讓 join 解除阻塞。
以上就是整個協程隊列的實現細節,具體的元素存儲是由 collections.deque 來承載的。並在隊列已滿或者爲空時,通過 Future 對象來實現阻塞等待和自動喚醒。
另外除了先進先出隊列之外,還有先進後出隊列,一般稱爲 LIFO 隊列,它的效果類似於棧。
這個沒什麼好說的,因爲是先進後出,所以添加和彈出都在同一端,直接使用列表實現即可。並且由於 LifoQueue 繼承 Queue,所以它的 API 和普通的協程隊列是一樣的。
除了先進先出隊列,還有一個優先隊列。
它的 API 和普通的協程隊列也是一致的,只不過優先隊列在添加元素時,需要指定一個優先級:(優先級, 元素),優先級的值越低,表示優先級越高。然後在內部,會按照優先級的高低,維護一個小根堆,堆頂元素便是優先級最高的元素。
這幾個隊列具體使用哪一種,則取決於具體的業務場景。
線程隊列
說完了協程隊列,再來看看線程隊列,它們的 API 是類似的,但實現細節則不同。因爲操作系統感知不到協程,所以協程隊列的阻塞等待是基於 Future 實現的,而線程隊列的阻塞等待是基於條件變量(和互斥鎖)實現的。
還是先來看看線程隊列的一些 API,和協程隊列是類似的。
from queue import Queue
# 可以指定一個 maxsize 參數,表示隊列的容量
# 默認爲 0,表示隊列的容量無限
queue = Queue(maxsize=20)
# 查看容量
print(queue.maxsize)
"""
20
"""
# 查看隊列的元素個數
print(queue.qsize())
"""
0
"""
# 判斷隊列是否已滿
print(queue.full())
"""
False
"""
# 判斷隊列是否爲空
print(queue.empty())
"""
True
"""
# 往隊列中添加元素
# block 參數表示是否阻塞,默認爲 True,當隊列已滿時,線程會阻塞
# timeout 表示超時時間,默認爲 None,表示會無限等待
# 當然也可以給 timeout 傳一個具體的值
# 如果在規定時間內,沒有將元素放入隊列,那麼拋異常
queue.put(123, block=True, timeout=None)
# 也是往隊列中添加元素,但是當隊列已滿時,會直接拋異常
# put_nowait(item) 本質上就是 put(item, block=False)
queue.put_nowait(456)
# 從隊列中取出元素
# 同樣可以傳遞 block 和 timeout 參數
# block 默認爲 True,當隊列爲空時會陷入阻塞
# timeout 默認爲 None,表示會無限等待
print(queue.get(block=True, timeout=None))
"""
123
"""
# 也是從隊列中取出元素,但是當隊列爲空時,會直接拋異常
# get_nowait() 本質上就是 get(block=False)
print(queue.get_nowait())
"""
456
"""
# task_done(),將 unfinished_tasks 屬性的值減 1
print(queue.unfinished_tasks)
"""
2
"""
queue.task_done()
queue.task_done()
print(queue.unfinished_tasks)
"""
0
"""
# join(),當 unfinished_tasks 不爲 0 時,陷入阻塞
queue.join()
API 和協程隊列是相似的,我們羅列一下:
線程隊列的具體使用我們已經知道了,下面來看看它的具體實現。
線程隊列的內部依舊使用雙端隊列進行元素存儲,並且還使用了一個互斥鎖和三個條件變量。
爲了保證數據的一致性和線程安全,當隊列在多線程環境中被修改(比如添加或刪除元素)時,需要使用互斥鎖。任何需要修改隊列的操作都必須在獲取到互斥鎖之後進行,以防止多個線程同時對隊列進行修改,否則會導致數據不一致或其它錯誤。同時,一旦對隊列的修改完成,必須立即釋放互斥鎖,以便其它線程可以訪問隊列。
然後是 not_empty 條件變量,當一個新元素被添加到隊列時,應該向 not_empty 發送一個信號。這個動作會通知那些想從隊列中獲取元素,但因隊列爲空而陷入阻塞的線程,現在隊列中已經有了新的元素,它們可以繼續執行獲取元素的操作。
接下來是 not_full 條件變量,當從隊列中取走一個元素時,應該向 not_full 發送一個信號。這個動作通知那些想往隊列添加元素,但因隊列已滿而陷入阻塞的線程,現在隊列中已經有了可用空間,它們可以繼續執行添加元素的操作。
最後是 all_tasks_done 條件變量,當處理的任務全部完成,即計數器 unfinished_task 爲 0 時,應該向 all_tasks_done 發送一個信號。這個動作會通知那些執行了 join() 方法而陷入阻塞的線程,它們可以繼續往下執行了。
因爲線程隊列採用了雙端隊列存儲元素,所以雙端隊列的長度就是線程隊列的元素個數。如果元素個數爲 0,那麼隊列就是空;如果容量大於 0,並且小於等於元素個數,那麼隊列就滿了。
前面說了,put_nowait 和 get_nowait 本質上就是調用了 put 和 get,所以我們的重點是 put 和 get 兩個方法。
以上就是 put 方法的底層實現,不難理解。說完了 put,再來看看 get。
最後是 task_done 和 join 方法,看看它們的內部邏輯。
調用 join 方法,當 unfinished_task 大於 0 時,會陷入阻塞。調用 task_done 方法,會將未完成任務數減 1,如果爲 0,那麼喚醒阻塞等待的線程。
需要注意的是,喚醒調用的方法不是 notify,而是 notify_all。對於添加元素和獲取元素,每次顯然只能喚醒一個線程,此時調用 notify。而 unfinished_task 爲 0 時,應該要喚醒所有等待的線程,因此要調用 notify_all。
最後線程隊列也有相應的 PriorityQueue 和 LifoQueue,它們的用法、實現和協程裏面的這兩個隊列是一樣的。
小結
以上便是協程隊列和線程隊列的具體用法和實現原理,它們本質上都是基於雙端隊列實現具體的元素存儲,並且在隊列已滿和隊列爲空時,可以阻塞等待。
只不過協程隊列是通過 Future 對象實現的,而線程隊列是通過條件變量實現的。
當然,除了協程隊列和線程隊列,還有進程隊列,但進程隊列要複雜的多。因此關於進程隊列的實現細節,我們以後專門花篇幅去介紹。
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://mp.weixin.qq.com/s/ujbwoG9sjoYdvF9De0RUuQ