Python 多線程與多進程
在學習 Python 的過程中,有接觸到多線程編程相關的知識點,先前一直都沒有徹底的搞明白。今天準備花一些時間,把裏面的細節儘可能的梳理清楚。
線程與進程的區別
進程(process)和線程(thread)是操作系統的基本概念,但是它們比較抽象,不容易掌握。關於多進程和多線程,教科書上最經典的一句話是 “進程是資源分配的最小單位,線程是 CPU 調度的最小單位”。線程是程序中一個單一的順序控制流程。進程內一個相對獨立的、可調度的執行單元,是系統獨立調度和分派 CPU 的基本單位指運行中的程序的調度單位。在單個程序中同時運行多個線程完成不同的工作,稱爲多線程。
進程和線程區別
進程是資源分配的基本單位。所有與該進程有關的資源,都被記錄在進程控制塊 PCB 中。以表示該進程擁有這些資源或正在使用它們。另外,進程也是搶佔處理機的調度單位,它擁有一個完整的虛擬地址空間。當進程發生調度時,不同的進程擁有不同的虛擬地址空間,而同一進程內的不同線程共享同一地址空間。
與進程相對應,線程與資源分配無關,它屬於某一個進程,並與進程內的其他線程一起共享進程的資源。線程只由相關堆棧(系統棧或用戶棧)寄存器和線程控制表 TCB 組成。寄存器可被用來存儲線程內的局部變量,但不能存儲其他線程的相關變量。
通常在一個進程中可以包含若干個線程,它們可以利用進程所擁有的資源。在引入線程的操作系統中,通常都是把進程作爲分配資源的基本單位,而把線程作爲獨立運行和獨立調度的基本單位。
由於線程比進程更小,基本上不擁有系統資源,故對它的調度所付出的開銷就會小得多,能更高效的提高系統內多個程序間併發執行的程度,從而顯著提高系統資源的利用率和吞吐量。
因而近年來推出的通用操作系統都引入了線程,以便進一步提高系統的併發性,並把它視爲現代操作系統的一個重要指標。
線程與進程的區別可以歸納爲以下 4 點:
-
地址空間和其它資源(如打開文件):進程間相互獨立,同一進程的各線程間共享。某進程內的線程在其它進程不可見。
-
通信:進程間通信 IPC,線程間可以直接讀寫進程數據段(如全局變量)來進行通信——需要進程同步和互斥手段的輔助,以保證數據的一致性。
-
調度和切換:線程上下文切換比進程上下文切換要快得多。
-
在多線程 OS 中,進程不是一個可執行的實體。
多進程和多線程的比較
| 對比維度 | 多進程 | 多線程 | 總結 | | --- | --- | --- | --- | | 數據共享、同步 | 數據共享複雜,同步簡單 | 數據共享簡單,同步複雜 | 各有優劣 | | 內存、CPU | 佔用內存多,切換複雜,CPU 利用率低 | 佔用內存少,切換簡單,CPU 利用率高 | 線程佔優 | | 創建、銷燬、切換 | 複雜,速度慢 | 簡單,速度快 | 線程佔優 | | 編程、調試 | 編程簡單,調試簡單 | 編程複雜,調試複雜 | 進程佔優 | | 可靠性 | 進程間不會互相影響 | 一個線程掛掉將導致整個進程掛掉 | 進程佔優 | | 分佈式 | 適用於多核、多機,擴展到多臺機器簡單 | 適合於多核 | 進程佔優 |
總結,進程和線程還可以類比爲火車和車廂:
-
線程在進程下行進(單純的車廂無法運行)
-
一個進程可以包含多個線程(一輛火車可以有多個車廂)
-
不同進程間數據很難共享(一輛火車上的乘客很難換到另外一輛火車,比如站點換乘)
-
同一進程下不同線程間數據很易共享(A 車廂換到 B 車廂很容易)
-
進程要比線程消耗更多的計算機資源(採用多列火車相比多個車廂更耗資源)
-
進程間不會相互影響,一個線程掛掉將導致整個進程掛掉(一列火車不會影響到另外一列火車,但是如果一列火車上中間的一節車廂着火了,將影響到該趟火車的所有車廂)
-
進程可以拓展到多機,進程最多適合多核(不同火車可以開在多個軌道上,同一火車的車廂不能在行進的不同的軌道上)
-
進程使用的內存地址可以上鎖,即一個線程使用某些共享內存時,其他線程必須等它結束,才能使用這一塊內存。(比如火車上的洗手間)-” 互斥鎖(mutex)”
-
進程使用的內存地址可以限定使用量(比如火車上的餐廳,最多隻允許多少人進入,如果滿了需要在門口等,等有人出來了才能進去)-“信號量(semaphore)”
Python 全局解釋器鎖 GIL
全局解釋器鎖(英語:Global Interpreter Lock,縮寫 GIL),並不是 Python 的特性,它是在實現 Python 解析器(CPython)時所引入的一個概念。由於 CPython 是大部分環境下默認的 Python 執行環境。所以在很多人的概念裏 CPython 就是 Python,也就想當然的把 GIL 歸結爲 Python 語言的缺陷。那麼 CPython 實現中的 GIL 又是什麼呢?來看看官方的解釋:
The mechanism used by the CPython interpreter to assure that only one thread executes Python bytecode at a time. This simplifies the CPython implementation by making the object model (including critical built-in types such as dict) implicitly safe against concurrent access. Locking the entire interpreter makes it easier for the interpreter to be multi-threaded, at the expense of much of the parallelism afforded by multi-processor machines.
Python 代碼的執行由 Python 虛擬機 (也叫解釋器主循環,CPython 版本) 來控制,Python 在設計之初就考慮到要在解釋器的主循環中,同時只有一個線程在執行,即在任意時刻,只有一個線程在解釋器中運行。對 Python 虛擬機的訪問由全局解釋器鎖(GIL)來控制,正是這個鎖能保證同一時刻只有一個線程在運行。
GIL 有什麼好處?簡單來說,它在單線程的情況更快,並且在和 C 庫結合時更方便,而且不用考慮線程安全問題,這也是早期 Python 最常見的應用場景和優勢。另外,GIL 的設計簡化了 CPython 的實現,使得對象模型,包括關鍵的內建類型如字典,都是隱含可以併發訪問的。鎖住全局解釋器使得比較容易的實現對多線程的支持,但也損失了多處理器主機的並行計算能力。
在多線程環境中,Python 虛擬機按以下方式執行:
-
設置 GIL
-
切換到一個線程去運行
-
運行直至指定數量的字節碼指令,或者線程主動讓出控制(可以調用 sleep(0))
-
把線程設置爲睡眠狀態
-
解鎖 GIL
-
再次重複以上所有步驟
Python3.2 前,GIL 的釋放邏輯是當前線程遇見 IO 操作或者 ticks 計數達到 100(ticks 可以看作是 python 自身的一個計數器,專門做用於 GIL,每次釋放後歸零,這個計數可以通過 sys.setcheckinterval 來調整),進行釋放。因爲計算密集型線程在釋放 GIL 之後又會立即去申請 GIL,並且通常在其它線程還沒有調度完之前它就已經重新獲取到了 GIL,就會導致一旦計算密集型線程獲得了 GIL,那麼它在很長一段時間內都將佔據 GIL,甚至一直到該線程執行結束。
Python 3.2 開始使用新的 GIL。新的 GIL 實現中用一個固定的超時時間來指示當前的線程放棄全局鎖。在當前線程保持這個鎖,且其他線程請求這個鎖時,當前線程就會在 5 毫秒後被強制釋放該鎖。該改進在單核的情況下,對於單個線程長期佔用 GIL 的情況有所好轉。
在單核 CPU 上,數百次的間隔檢查纔會導致一次線程切換。在多核 CPU 上,存在嚴重的線程顛簸(thrashing)。而每次釋放 GIL 鎖,線程進行鎖競爭、切換線程,會消耗資源。單核下多線程,每次釋放 GIL,喚醒的那個線程都能獲取到 GIL 鎖,所以能夠無縫執行,但多核下,CPU0 釋放 GIL 後,其他 CPU 上的線程都會進行競爭,但 GIL 可能會馬上又被 CPU0 拿到,導致其他幾個 CPU 上被喚醒後的線程會醒着等待到切換時間後又進入待調度狀態,這樣會造成線程顛簸 (thrashing),導致效率更低。
另外,從上面的實現機制可以推導出,Python 的多線程對 IO 密集型代碼要比 CPU 密集型代碼更加友好。
針對 GIL 的應對措施:
-
使用更高版本 Python(對 GIL 機制進行了優化)
-
使用多進程替換多線程(多進程之間沒有 GIL,但是進程本身的資源消耗較多)
-
指定 cpu 運行線程(使用 affinity 模塊)
-
使用 Jython、IronPython 等無 GIL 解釋器
-
全 IO 密集型任務時才使用多線程
-
使用協程(高效的單線程模式,也稱微線程;通常與多進程配合使用)
-
將關鍵組件用 C/C++ 編寫爲 Python 擴展,通過 ctypes 使 Python 程序直接調用 C 語言編譯的動態鏈接庫的導出函數。(with nogil 調出 GIL 限制)
Python 的多進程包 multiprocessing
Python 的 threading 包主要運用多線程的開發,但由於 GIL 的存在,Python 中的多線程其實並不是真正的多線程,如果想要充分地使用多核 CPU 的資源,大部分情況需要使用多進程。在 Python 2.6 版本的時候引入了 multiprocessing 包,它完整的複製了一套 threading 所提供的接口方便遷移。唯一的不同就是它使用了多進程而不是多線程。每個進程有自己的獨立的 GIL,因此也不會出現進程之間的 GIL 爭搶。
藉助這個 multiprocessing,你可以輕鬆完成從單進程到併發執行的轉換。multiprocessing 支持子進程、通信和共享數據、執行不同形式的同步,提供了 Process、Queue、Pipe、Lock 等組件。
Multiprocessing 產生的背景
除了應對 Python 的 GIL 以外,產生 multiprocessing 的另外一個原因時 Windows 操作系統與 Linux/Unix 系統的不一致。
Unix/Linux 操作系統提供了一個 fork() 系統調用,它非常特殊。普通的函數,調用一次,返回一次,但是 fork() 調用一次,返回兩次,因爲操作系統自動把當前進程(父進程)複製了一份(子進程),然後,分別在父進程和子進程內返回。子進程永遠返回 0,而父進程返回子進程的 ID。這樣做的理由是,一個父進程可以 fork 出很多子進程,所以,父進程要記下每個子進程的 ID,而子進程只需要調用 getpid() 就可以拿到父進程的 ID。
Python 的 os 模塊封裝了常見的系統調用,其中就包括 fork,可以在 Python 程序中輕鬆創建子進程:
import os
print('Process (%s) start...' % os.getpid())
\# Only works on Unix/Linux/Mac:
pid = os.fork()
if pid == 0:
print('I am child process (%s) and my parent is %s.' % (os.getpid(), os.getppid()))
else:
print('I (%s) just created a child process (%s).' % (os.getpid(), pid))
上述代碼在 Linux、Unix 和 Mac 上的執行結果爲:
Process (876) start...
I (876) just created a child process (877).
I am child process (877) and my parent is 876.
有了 fork 調用,一個進程在接到新任務時就可以複製出一個子進程來處理新任務,常見的 Apache 服務器就是由父進程監聽端口,每當有新的 http 請求時,就 fork 出子進程來處理新的 http 請求。
由於 Windows 沒有 fork 調用,上面的代碼在 Windows 上無法運行。由於 Python 是跨平臺的,自然也應該提供一個跨平臺的多進程支持。multiprocessing 模塊就是跨平臺版本的多進程模塊。multiprocessing 模塊封裝了 fork()調用,使我們不需要關注 fork()的細節。由於 Windows 沒有 fork 調用,因此,multiprocessing 需要 “模擬” 出 fork 的效果。
multiprocessing 常用組件及功能
創建管理進程模塊:
-
Process(用於創建進程)
-
Pool(用於創建管理進程池)
-
Queue(用於進程通信,資源共享)
-
Value,Array(用於進程通信,資源共享)
-
Pipe(用於管道通信)
-
Manager(用於資源共享)
同步子進程模塊:
-
Condition(條件變量)
-
Event(事件)
-
Lock(互斥鎖)
-
RLock(可重入的互斥鎖 (同一個進程可以多次獲得它,同時不會造成阻塞)
-
Semaphore(信號量)
接下來就一起來學習下每個組件及功能的具體使用方法。
Process(用於創建進程)
multiprocessing 模塊提供了一個 Process 類來代表一個進程對象。
在 multiprocessing 中,每一個進程都用一個 Process 類來表示。
構造方法:Process([group [, target [, name [, args [, kwargs]]]]])
-
group:分組,實際上不使用,值始終爲 None
-
target:表示調用對象,即子進程要執行的任務,你可以傳入方法名
-
name:爲子進程設定名稱
-
args:要傳給 target 函數的位置參數,以元組方式進行傳入。
-
kwargs:要傳給 target 函數的字典參數,以字典方式進行傳入。
實例方法:
-
start():啓動進程,並調用該子進程中的 p.run()
-
run():進程啓動時運行的方法,正是它去調用 target 指定的函數,我們自定義類的類中一定要實現該方法
-
terminate():強制終止進程 p,不會進行任何清理操作,如果 p 創建了子進程,該子進程就成了殭屍進程,使用該方法需要特別小心這種情況。如果 p 還保存了一個鎖那麼也將不會被釋放,進而導致死鎖
-
is_alive():返回進程是否在運行。如果 p 仍然運行,返回 True
-
join([timeout]):進程同步,主進程等待子進程完成後再執行後面的代碼。線程等待 p 終止(強調:是主線程處於等的狀態,而 p 是處於運行的狀態)。timeout 是可選的超時時間(超過這個時間,父線程不再等待子線程,繼續往下執行),需要強調的是,p.join 只能 join 住 start 開啓的進程,而不能 join 住 run 開啓的進程
屬性介紹:
-
daemon:默認值爲 False,如果設爲 True,代表 p 爲後臺運行的守護進程;當 p 的父進程終止時,p 也隨之終止,並且設定爲 True 後,p 不能創建自己的新進程;必須在 p.start() 之前設置
-
name:進程的名稱
-
pid:進程的 pid
-
exitcode:進程在運行時爲 None、如果爲–N,表示被信號 N 結束 (瞭解即可)
-
authkey:進程的身份驗證鍵, 默認是由 os.urandom() 隨機生成的 32 字符的字符串。這個鍵的用途是爲涉及網絡連接的底層進程間通信提供安全性,這類連接只有在具有相同的身份驗證鍵時才能成功(瞭解即可)
使用示例:(注意:在 windows 中 Process() 必須放到 if name == ‘main’: 下)
from multiprocessing import Process
import os
def run_proc(name):
print('Run child process %s (%s)...' % (name, os.getpid()))
if __name__=='__main__':
print('Parent process %s.' % os.getpid())
p = Process(target=run_proc, args=('test',))
print('Child process will start.')
p.start()
p.join()
print('Child process end.')
Pool(用於創建管理進程池)
Pool 類用於需要執行的目標很多,而手動限制進程數量又太繁瑣時,如果目標少且不用控制進程數量則可以用 Process 類。Pool 可以提供指定數量的進程,供用戶調用,當有新的請求提交到 Pool 中時,如果池還沒有滿,那麼就會創建一個新的進程用來執行該請求;但如果池中的進程數已經達到規定最大值,那麼該請求就會等待,直到池中有進程結束,就重用進程池中的進程。
構造方法:Pool([processes[, initializer[, initargs[, maxtasksperchild[, context]]]]])
-
processes :要創建的進程數,如果省略,將默認使用 cpu_count() 返回的數量。
-
initializer:每個工作進程啓動時要執行的可調用對象,默認爲 None。如果 initializer 是 None,那麼每一個工作進程在開始的時候會調用 initializer(*initargs)。
-
initargs:是要傳給 initializer 的參數組。
-
maxtasksperchild:工作進程退出之前可以完成的任務數,完成後用一個新的工作進程來替代原進程,來讓閒置的資源被釋放。maxtasksperchild 默認是 None,意味着只要 Pool 存在工作進程就會一直存活。
-
context: 用在制定工作進程啓動時的上下文,一般使用 Pool() 或者一個 context 對象的 Pool() 方法來創建一個池,兩種方法都適當的設置了 context。
實例方法:
-
apply(func[, args[, kwargs]]):在一個池工作進程中執行 func(args,*kwargs), 然後返回結果。需要強調的是:此操作並不會在所有池工作進程中並執行 func 函數。如果要通過不同參數併發地執行 func 函數,必須從不同線程調用 p.apply() 函數或者使用 p.apply_async()。它是阻塞的。apply 很少使用
-
apply_async(func[, arg[, kwds={}[, callback=None]]]):在一個池工作進程中執行 func(args,*kwargs), 然後返回結果。此方法的結果是 AsyncResult 類的實例,callback 是可調用對象,接收輸入參數。當 func 的結果變爲可用時,將理解傳遞給 callback。callback 禁止執行任何阻塞操作,否則將接收其他異步操作中的結果。它是非阻塞。
-
map(func, iterable[, chunksize=None]):Pool 類中的 map 方法,與內置的 map 函數用法行爲基本一致,它會使進程阻塞直到返回結果。注意,雖然第二個參數是一個迭代器,但在實際使用中,必須在整個隊列都就緒後,程序纔會運行子進程。
-
map_async(func, iterable[, chunksize=None]):map_async 與 map 的關係同 apply 與 apply_async
-
imap():imap 與 map 的區別是,map 是當所有的進程都已經執行完了,並將結果返回了,imap() 則是立即返回一個 iterable 可迭代對象。
-
imap_unordered():不保證返回的結果順序與進程添加的順序一致。
-
close():關閉進程池,防止進一步操作。如果所有操作持續掛起,它們將在工作進程終止前完成。
-
join():等待所有工作進程退出。此方法只能在 close() 或 teminate() 之後調用,讓其不再接受新的 Process。
-
terminate():結束工作進程,不再處理未處理的任務。
方法 apply_async() 和 map_async() 的返回值是 AsyncResul 的實例 obj。實例具有以下方法:
-
get():返回結果,如果有必要則等待結果到達。timeout 是可選的。如果在指定時間內還沒有到達,將引發異常。如果遠程操作中引發了異常,它將在調用此方法時再次被引發。
-
ready():如果調用完成,返回 True
-
successful():如果調用完成且沒有引發異常,返回 True,如果在結果就緒之前調用此方法,引發異常
-
wait([timeout]):等待結果變爲可用。
-
terminate():立即終止所有工作進程,同時不執行任何清理或結束任何掛起工作。如果 p 被垃圾回收,將自動調用此函數
使用示例:
\# -*- coding:utf-8 -*-
\# Pool+map
from multiprocessing import Pool
def test(i):
print(i)
if __name__ == "__main__":
lists = range(100)
pool = Pool(8)
pool.map(test, lists)
pool.close()
pool.join()
\# -*- coding:utf-8 -*-
\# 異步進程池(非阻塞)
from multiprocessing import Pool
def test(i):
print(i)
if __name__ == "__main__":
pool = Pool(8)
for i in range(100):
'''
For循環中執行步驟:
(1)循環遍歷,將100個子進程添加到進程池(相對父進程會阻塞)
(2)每次執行8個子進程,等一個子進程執行完後,立馬啓動新的子進程。(相對父進程不阻塞)
apply_async爲異步進程池寫法。異步指的是啓動子進程的過程,與父進程本身的執行(print)是異步的,而For循環中往進程池添加子進程的過程,與父進程本身的執行卻是同步的。
'''
pool.apply_async(test, args=(i,)) # 維持執行的進程總數爲8,當一個進程執行完後啓動一個新進程.
print("test")
pool.close()
pool.join()
\# -*- coding:utf-8 -*-
\# 異步進程池(非阻塞)
from multiprocessing import Pool
def test(i):
print(i)
if __name__ == "__main__":
pool = Pool(8)
for i in range(100):
'''
實際測試發現,for循環內部執行步驟:
(1)遍歷100個可迭代對象,往進程池放一個子進程
(2)執行這個子進程,等子進程執行完畢,再往進程池放一個子進程,再執行。(同時只執行一個子進程)
for循環執行完畢,再執行print函數。
'''
pool.apply(test, args=(i,)) # 維持執行的進程總數爲8,當一個進程執行完後啓動一個新進程.
print("test")
pool.close()
pool.join()
Queue(用於進程通信,資源共享)
在使用多進程的過程中,最好不要使用共享資源。普通的全局變量是不能被子進程所共享的,只有通過 Multiprocessing 組件構造的數據結構可以被共享。
Queue 是用來創建進程間資源共享的隊列的類,使用 Queue 可以達到多進程間數據傳遞的功能(缺點:只適用 Process 類,不能在 Pool 進程池中使用)。
構造方法:Queue([maxsize])
- maxsize 是隊列中允許最大項數,省略則無大小限制。
實例方法:
-
put():用以插入數據到隊列。put 方法還有兩個可選參數:blocked 和 timeout。如果 blocked 爲 True(默認值),並且 timeout 爲正值,該方法會阻塞 timeout 指定的時間,直到該隊列有剩餘的空間。如果超時,會拋出 Queue.Full 異常。如果 blocked 爲 False,但該 Queue 已滿,會立即拋出 Queue.Full 異常。
-
get():可以從隊列讀取並且刪除一個元素。get 方法有兩個可選參數:blocked 和 timeout。如果 blocked 爲 True(默認值),並且 timeout 爲正值,那麼在等待時間內沒有取到任何元素,會拋出 Queue.Empty 異常。如果 blocked 爲 False,有兩種情況存在,如果 Queue 有一個值可用,則立即返回該值,否則,如果隊列爲空,則立即拋出 Queue.Empty 異常。若不希望在 empty 的時候拋出異常,令 blocked 爲 True 或者參數全部置空即可。
-
get_nowait():同 q.get(False)
-
put_nowait():同 q.put(False)
-
empty():調用此方法時 q 爲空則返回 True,該結果不可靠,比如在返回 True 的過程中,如果隊列中又加入了項目。
-
full():調用此方法時 q 已滿則返回 True,該結果不可靠,比如在返回 True 的過程中,如果隊列中的項目被取走。
-
qsize():返回隊列中目前項目的正確數量,結果也不可靠,理由同 q.empty() 和 q.full() 一樣
使用示例:
from multiprocessing import Process, Queue
import os, time, random
def write(q):
print('Process to write: %s' % os.getpid())
for value in ['A', 'B', 'C']:
print('Put %s to queue...' % value)
q.put(value)
time.sleep(random.random())
def read(q):
print('Process to read: %s' % os.getpid())
while True:
value = q.get(True)
print('Get %s from queue.' % value)
if __name__ == "__main__":
q = Queue()
pw = Process(target=write, args=(q,))
pr = Process(target=read, args=(q,))
pw.start()
pr.start()
pw.join() # 等待pw結束
pr.terminate() # pr進程裏是死循環,無法等待其結束,只能強行終止
JoinableQueue 就像是一個 Queue 對象,但隊列允許項目的使用者通知生成者項目已經被成功處理。通知進程是使用共享的信號和條件變量來實現的。
構造方法:JoinableQueue([maxsize])
- maxsize:隊列中允許最大項數,省略則無大小限制。
實例方法
JoinableQueue 的實例 p 除了與 Queue 對象相同的方法之外還具有:
-
task_done():使用者使用此方法發出信號,表示 q.get() 的返回項目已經被處理。如果調用此方法的次數大於從隊列中刪除項目的數量,將引發 ValueError 異常
-
join(): 生產者調用此方法進行阻塞,直到隊列中所有的項目均被處理。阻塞將持續到隊列中的每個項目均調用 q.task_done()方法爲止
使用示例:
\# -*- coding:utf-8 -*-
from multiprocessing import Process, JoinableQueue
import time, random
def consumer(q):
while True:
res = q.get()
print('消費者拿到了 %s' % res)
q.task_done()
def producer(seq, q):
for item in seq:
time.sleep(random.randrange(1,2))
q.put(item)
print('生產者做好了 %s' % item)
q.join()
if __name__ == "__main__":
q = JoinableQueue()
seq = ('產品%s' % i for i in range(5))
p = Process(target=consumer, args=(q,))
p.daemon = True # 設置爲守護進程,在主線程停止時p也停止,但是不用擔心,producer內調用q.join保證了consumer已經處理完隊列中的所有元素
p.start()
producer(seq, q)
print('主線程')
Value,Array(用於進程通信,資源共享)
multiprocessing 中 Value 和 Array 的實現原理都是在共享內存中創建 ctypes() 對象來達到共享數據的目的,兩者實現方法大同小異,只是選用不同的 ctypes 數據類型而已。
Value
構造方法:Value((typecode_or_type, args[, lock])
-
typecode_or_type:定義 ctypes() 對象的類型,可以傳 Type code 或 C Type,具體對照表見下文。
-
args:傳遞給 typecode_or_type 構造函數的參數
-
lock:默認爲 True,創建一個互斥鎖來限制對 Value 對象的訪問,如果傳入一個鎖,如 Lock 或 RLock 的實例,將用於同步。如果傳入 False,Value 的實例就不會被鎖保護,它將不是進程安全的。
typecode_or_type 支持的類型:
| Type code | C Type | Python Type | Minimum size in bytes |
| --------- | ------------------ | ----------------- | --------------------- |
| `'b'` | signed char | int | 1 |
| `'B'` | unsigned char | int | 1 |
| `'u'` | Py_UNICODE | Unicode character | 2 |
| `'h'` | signed short | int | 2 |
| `'H'` | unsigned short | int | 2 |
| `'i'` | signed int | int | 2 |
| `'I'` | unsigned int | int | 2 |
| `'l'` | signed long | int | 4 |
| `'L'` | unsigned long | int | 4 |
| `'q'` | signed long long | int | 8 |
| `'Q'` | unsigned long long | int | 8 |
| `'f'` | float | float | 4 |
| `'d'` | double | float | 8 |
參考地址:https://docs.python.org/3/library/array.html
Array
構造方法:Array(typecode_or_type, size_or_initializer, **kwds[, lock])
-
typecode_or_type:同上
-
size_or_initializer:如果它是一個整數,那麼它確定數組的長度,並且數組將被初始化爲零。否則,size_or_initializer 是用於初始化數組的序列,其長度決定數組的長度。
-
kwds:傳遞給 typecode_or_type 構造函數的參數
-
lock:同上
使用示例:
import multiprocessing
def f(n, a):
n.value = 3.14
a[0] = 5
if __name__ == '__main__':
num = multiprocessing.Value('d', 0.0)
arr = multiprocessing.Array('i', range(10))
p = multiprocessing.Process(target=f, args=(num, arr))
p.start()
p.join()
print(num.value)
print(arr[:])
注意:Value 和 Array 只適用於 Process 類。
Pipe(用於管道通信)
多進程還有一種數據傳遞方式叫管道原理和 Queue 相同。Pipe 可以在進程之間創建一條管道,並返回元組(conn1,conn2), 其中 conn1,conn2 表示管道兩端的連接對象,強調一點:必須在產生 Process 對象之前產生管道。
構造方法:Pipe([duplex])
- dumplex: 默認管道是全雙工的,如果將 duplex 射成 False,conn1 只能用於接收,conn2 只能用於發送。
實例方法:
-
send(obj):通過連接發送對象。obj 是與序列化兼容的任意對象
-
recv():接收 conn2.send(obj) 發送的對象。如果沒有消息可接收,recv 方法會一直阻塞。如果連接的另外一端已經關閉,那麼 recv 方法會拋出 EOFError。
-
close(): 關閉連接。如果 conn1 被垃圾回收,將自動調用此方法
-
fileno(): 返回連接使用的整數文件描述符
-
poll([timeout]): 如果連接上的數據可用,返回 True。timeout 指定等待的最長時限。如果省略此參數,方法將立即返回結果。如果將 timeout 射成 None,操作將無限期地等待數據到達。
-
recv_bytes([maxlength]): 接收 c.send_bytes() 方法發送的一條完整的字節消息。maxlength 指定要接收的最大字節數。如果進入的消息,超過了這個最大值,將引發 IOError 異常,並且在連接上無法進行進一步讀取。如果連接的另外一端已經關閉,再也不存在任何數據,將引發 EOFError 異常。
-
send_bytes(buffer [, offset [, size]]):通過連接發送字節數據緩衝區,buffer 是支持緩衝區接口的任意對象,offset 是緩衝區中的字節偏移量,而 size 是要發送字節數。結果數據以單條消息的形式發出,然後調用 c.recv_bytes() 函數進行接收
-
recv_bytes_into(buffer [, offset]): 接收一條完整的字節消息,並把它保存在 buffer 對象中,該對象支持可寫入的緩衝區接口(即 bytearray 對象或類似的對象)。offset 指定緩衝區中放置消息處的字節位移。返回值是收到的字節數。如果消息長度大於可用的緩衝區空間,將引發 BufferTooShort 異常。
使用示例:
from multiprocessing import Process, Pipe
import time
\# 子進程執行方法
def f(Subconn):
time.sleep(1)
Subconn.send("吃了嗎")
print("來自父親的問候:", Subconn.recv())
Subconn.close()
if __name__ == "__main__":
parent_conn, child_conn = Pipe() # 創建管道兩端
p = Process(target=f, args=(child_conn,)) # 創建子進程
p.start()
print("來自兒子的問候:", parent_conn.recv())
parent_conn.send("嗯")
Manager(用於資源共享)
Manager() 返回的 manager 對象控制了一個 server 進程,此進程包含的 python 對象可以被其他的進程通過 proxies 來訪問。從而達到多進程間數據通信且安全。Manager 模塊常與 Pool 模塊一起使用。
Manager 支持的類型有 list,dict,Namespace,Lock,RLock,Semaphore,BoundedSemaphore,Condition,Event,Queue,Value 和 Array。
管理器是獨立運行的子進程,其中存在真實的對象,並以服務器的形式運行,其他進程通過使用代理訪問共享對象,這些代理作爲客戶端運行。Manager() 是 BaseManager 的子類,返回一個啓動的 SyncManager() 實例,可用於創建共享對象並返回訪問這些共享對象的代理。
BaseManager,創建管理器服務器的基類
構造方法:BaseManager([address[, authkey]])
-
address:(hostname,port), 指定服務器的網址地址,默認爲簡單分配一個空閒的端口
-
authkey:連接到服務器的客戶端的身份驗證,默認爲 current_process().authkey 的值
實例方法:
-
start([initializer[, initargs]]):啓動一個單獨的子進程,並在該子進程中啓動管理器服務器
-
get_server():獲取服務器對象
-
connect():連接管理器對象
-
shutdown():關閉管理器對象,只能在調用了 start() 方法之後調用
實例屬性:
- address:只讀屬性,管理器服務器正在使用的地址
SyncManager**,**以下類型均不是進程安全的,需要加鎖..
實例方法:
-
Array(self,*args,**kwds)
-
BoundedSemaphore(self,*args,**kwds)
-
Condition(self,*args,**kwds)
-
Event(self,*args,**kwds)
-
JoinableQueue(self,*args,**kwds)
-
Lock(self,*args,**kwds)
-
Namespace(self,*args,**kwds)
-
Pool(self,*args,**kwds)
-
Queue(self,*args,**kwds)
-
RLock(self,*args,**kwds)
-
Semaphore(self,*args,**kwds)
-
Value(self,*args,**kwds)
-
dict(self,*args,**kwds)
-
list(self,*args,**kwds)
使用示例:
import multiprocessing
def f(x, arr, l, d, n):
x.value = 3.14
arr[0] = 5
l.append('Hello')
d[1] = 2
n.a = 10
if __name__ == '__main__':
server = multiprocessing.Manager()
x = server.Value('d', 0.0)
arr = server.Array('i', range(10))
l = server.list()
d = server.dict()
n = server.Namespace()
proc = multiprocessing.Process(target=f, args=(x, arr, l, d, n))
proc.start()
proc.join()
print(x.value)
print(arr)
print(l)
print(d)
print(n)
同步子進程模塊
Lock(互斥鎖)
Lock 鎖的作用是當多個進程需要訪問共享資源的時候,避免訪問的衝突。加鎖保證了多個進程修改同一塊數據時,同一時間只能有一個修改,即串行的修改,犧牲了速度但保證了數據安全。Lock 包含兩種狀態——鎖定和非鎖定,以及兩個基本的方法。
構造方法:Lock()
實例方法:
-
acquire([timeout]): 使線程進入同步阻塞狀態,嘗試獲得鎖定。
-
release(): 釋放鎖。使用前線程必須已獲得鎖定,否則將拋出異常。
使用示例:
from multiprocessing import Process, Lock
def l(lock, num):
lock.acquire()
print("Hello Num: %s" % (num))
lock.release()
if __name__ == '__main__':
lock = Lock() # 這個一定要定義爲全局
for num in range(20):
Process(target=l, args=(lock, num)).start()
RLock(可重入的互斥鎖 (同一個進程可以多次獲得它,同時不會造成阻塞)
RLock(可重入鎖)是一個可以被同一個線程請求多次的同步指令。RLock 使用了 “擁有的線程” 和“遞歸等級”的概念,處於鎖定狀態時,RLock 被某個線程擁有。擁有 RLock 的線程可以再次調用 acquire(),釋放鎖時需要調用 release()相同次數。可以認爲 RLock 包含一個鎖定池和一個初始值爲 0 的計數器,每次成功調用 acquire()/release(),計數器將 + 1/-1,爲 0 時鎖處於未鎖定狀態。
構造方法:RLock()
實例方法:
-
acquire([timeout]):同 Lock
-
release(): 同 Lock
Semaphore(信號量)
信號量是一個更高級的鎖機制。信號量內部有一個計數器而不像鎖對象內部有鎖標識,而且只有當佔用信號量的線程數超過信號量時線程才阻塞。這允許了多個線程可以同時訪問相同的代碼區。比如廁所有 3 個坑,那最多隻允許 3 個人上廁所,後面的人只能等裏面有人出來了才能再進去,如果指定信號量爲 3,那麼來一個人獲得一把鎖,計數加 1,當計數等於 3 時,後面的人均需要等待。一旦釋放,就有人可以獲得一把鎖。
構造方法:Semaphore([value])
- value:設定信號量,默認值爲 1
實例方法:
-
acquire([timeout]):同 Lock
-
release(): 同 Lock
使用示例:
from multiprocessing import Process, Semaphore
import time, random
def go_wc(sem, user):
sem.acquire()
print('%s 佔到一個茅坑' % user)
time.sleep(random.randint(0, 3))
sem.release()
print(user, 'OK')
if __name__ == '__main__':
sem = Semaphore(2)
p_l = []
for i in range(5):
p = Process(target=go_wc, args=(sem, 'user%s' % i,))
p.start()
p_l.append(p)
for i in p_l:
i.join()
Condition(條件變量)
可以把 Condition 理解爲一把高級的鎖,它提供了比 Lock, RLock 更高級的功能,允許我們能夠控制複雜的線程同步問題。Condition 在內部維護一個鎖對象(默認是 RLock),可以在創建 Condigtion 對象的時候把瑣對象作爲參數傳入。Condition 也提供了 acquire, release 方法,其含義與鎖的 acquire, release 方法一致,其實它只是簡單的調用內部鎖對象的對應的方法而已。Condition 還提供了其他的一些方法。
構造方法:Condition([lock/rlock])
- 可以傳遞一個 Lock/RLock 實例給構造方法,否則它將自己生成一個 RLock 實例。
實例方法:
-
acquire([timeout]):首先進行 acquire,然後判斷一些條件。如果條件不滿足則 wait
-
release():釋放 Lock
-
wait([timeout]): 調用這個方法將使線程進入 Condition 的等待池等待通知,並釋放鎖。使用前線程必須已獲得鎖定,否則將拋出異常。處於 wait 狀態的線程接到通知後會重新判斷條件。
-
notify(): 調用這個方法將從等待池挑選一個線程並通知,收到通知的線程將自動調用 acquire() 嘗試獲得鎖定(進入鎖定池);其他線程仍然在等待池中。調用這個方法不會釋放鎖定。使用前線程必須已獲得鎖定,否則將拋出異常。
-
notifyAll(): 調用這個方法將通知等待池中所有的線程,這些線程都將進入鎖定池嘗試獲得鎖定。調用這個方法不會釋放鎖定。使用前線程必須已獲得鎖定,否則將拋出異常。
使用示例:
import multiprocessing
import time
def stage_1(cond):
"""perform first stage of work,
then notify stage_2 to continue
"""
name = multiprocessing.current_process().name
print('Starting', name)
with cond:
print('{} done and ready for stage 2'.format(name))
cond.notify_all()
def stage_2(cond):
"""wait for the condition telling us stage_1 is done"""
name = multiprocessing.current_process().name
print('Starting', name)
with cond:
cond.wait()
print('{} running'.format(name))
if __name__ == '__main__':
condition = multiprocessing.Condition()
s1 = multiprocessing.Process(name='s1',
target=stage_1,
args=(condition,))
s2_clients = [
multiprocessing.Process(
name='stage_2[{}]'.format(i),
target=stage_2,
args=(condition,),
)
for i in range(1, 3)
]
for c in s2_clients:
c.start()
time.sleep(1)
s1.start()
s1.join()
for c in s2_clients:
c.join()
Event(事件)
Event 內部包含了一個標誌位,初始的時候爲 false。可以使用 set() 來將其設置爲 true;或者使用 clear() 將其從新設置爲 false;可以使用 is_set() 來檢查標誌位的狀態;另一個最重要的函數就是 wait(timeout=None),用來阻塞當前線程,直到 event 的內部標誌位被設置爲 true 或者 timeout 超時。如果內部標誌位爲 true 則 wait() 函數理解返回。
使用示例:
import multiprocessing
import time
def wait_for_event(e):
"""Wait for the event to be set before doing anything"""
print('wait_for_event: starting')
e.wait()
print('wait_for_event: e.is_set()->', e.is_set())
def wait_for_event_timeout(e, t):
"""Wait t seconds and then timeout"""
print('wait_for_event_timeout: starting')
e.wait(t)
print('wait_for_event_timeout: e.is_set()->', e.is_set())
if __name__ == '__main__':
e = multiprocessing.Event()
w1 = multiprocessing.Process(
name='block',
target=wait_for_event,
args=(e,),
)
w1.start()
w2 = multiprocessing.Process(
name='nonblock',
target=wait_for_event_timeout,
args=(e, 2),
)
w2.start()
print('main: waiting before calling Event.set()')
time.sleep(3)
e.set()
print('main: event is set')
其他內容
multiprocessing.dummy 模塊與 multiprocessing 模塊的區別:dummy 模塊是多線程,而 multiprocessing 是多進程, api 都是通用的。所有可以很方便將代碼在多線程和多進程之間切換。multiprocessing.dummy 通常在 IO 場景可以嘗試使用,比如使用如下方式引入線程池。
from multiprocessing.dummy import Pool as ThreadPool
multiprocessing.dummy 與早期的 threading,不同的點好像是在多多核 CPU 下,只綁定了一個核心(具體未考證)。
參考文檔:
-
https://docs.python.org/3/library/multiprocessing.html
-
https://www.rddoc.com/doc/Python/3.6.0/zh/library/multiprocessing/
Python 併發之 concurrent.futures
Python 標準庫爲我們提供了 threading 和 multiprocessing 模塊編寫相應的多線程 / 多進程代碼。從 Python3.2 開始,標準庫爲我們提供了 concurrent.futures 模塊,它提供了 ThreadPoolExecutor 和 ProcessPoolExecutor 兩個類,實現了對 threading 和 multiprocessing 的更高級的抽象,對編寫線程池 / 進程池提供了直接的支持。concurrent.futures 基礎模塊是 executor 和 future。
Executor
Executor 是一個抽象類,它不能被直接使用。它爲具體的異步執行定義了一些基本的方法。ThreadPoolExecutor 和 ProcessPoolExecutor 繼承了 Executor,分別被用來創建線程池和進程池的代碼。
ThreadPoolExecutor 對象
ThreadPoolExecutor 類是 Executor 子類,使用線程池執行異步調用。
class concurrent.futures.ThreadPoolExecutor(max_workers)
使用 max_workers 數目的線程池執行異步調用。
ProcessPoolExecutor 對象
ThreadPoolExecutor 類是 Executor 子類,使用進程池執行異步調用。
class concurrent.futures.ProcessPoolExecutor(max_workers=None)
使用 max_workers 數目的進程池執行異步調用,如果 max_workers 爲 None 則使用機器的處理器數目(如 4 核機器 max_worker 配置爲 None 時,則使用 4 個進程進行異步併發)。
submit() 方法
Executor 中定義了 submit() 方法,這個方法的作用是提交一個可執行的回調 task,並返回一個 future 實例。future 對象代表的就是給定的調用。
Executor.submit(fn, *args, **kwargs)
-
fn:需要異步執行的函數
-
*args, **kwargs:fn 參數
使用示例:
from concurrent import futures
def test(num):
import time
return time.ctime(), num
with futures.ThreadPoolExecutor(max_workers=1) as executor:
future = executor.submit(test, 1)
print(future.result())
map() 方法
除了 submit,Exectuor 還爲我們提供了 map 方法,這個方法返回一個 map(func, *iterables) 迭代器,迭代器中的回調執行返回的結果有序的。
Executor.map(func, *iterables, timeout=None)
-
func:需要異步執行的函數
-
*iterables:可迭代對象,如列表等。每一次 func 執行,都會從 iterables 中取參數。
-
timeout:設置每次異步操作的超時時間,timeout 的值可以是 int 或 float,如果操作超時,會返回 raisesTimeoutError;如果不指定 timeout 參數,則不設置超時間。
使用示例:
from concurrent import futures
def test(num):
import time
return time.ctime(), num
data = [1, 2, 3]
with futures.ThreadPoolExecutor(max_workers=1) as executor:
for future in executor.map(test, data):
print(future)
shutdown() 方法
釋放系統資源, 在 Executor.submit() 或 Executor.map() 等異步操作後調用。使用 with 語句可以避免顯式調用此方法。
Executor.shutdown(wait=True)
Future
Future 可以理解爲一個在未來完成的操作,這是異步編程的基礎。通常情況下,我們執行 io 操作,訪問 url 時(如下)在等待結果返回之前會產生阻塞,cpu 不能做其他事情,而 Future 的引入幫助我們在等待的這段時間可以完成其他的操作。
Future 類封裝了可調用的異步執行。Future 實例通過 Executor.submit() 方法創建。
-
cancel():試圖取消調用。如果調用當前正在執行,並且不能被取消,那麼該方法將返回 False,否則調用將被取消,方法將返回 True。
-
cancelled():如果成功取消調用,返回 True。
-
running():如果調用當前正在執行並且不能被取消,返回 True。
-
done():如果調用成功地取消或結束了,返回 True。
-
result(timeout=None):返回調用返回的值。如果調用還沒有完成,那麼這個方法將等待超時秒。如果調用在超時秒內沒有完成,那麼就會有一個 Futures.TimeoutError 將報出。timeout 可以是一個整形或者浮點型數值,如果 timeout 不指定或者爲 None, 等待時間無限。如果 futures 在完成之前被取消了,那麼 CancelledError 將會報出。
-
exception(timeout=None):返回調用拋出的異常,如果調用還未完成,該方法會等待 timeout 指定的時長,如果該時長後調用還未完成,就會報出超時錯誤 futures.TimeoutError。timeout 可以是一個整形或者浮點型數值,如果 timeout 不指定或者爲 None, 等待時間無限。如果 futures 在完成之前被取消了,那麼 CancelledError 將會報出。如果調用完成並且無異常報出,返回 None.
-
add_done_callback(fn):將可調用 fn 捆綁到 future 上,當 Future 被取消或者結束運行,fn 作爲 future 的唯一參數將會被調用。如果 future 已經運行完成或者取消,fn 將會被立即調用。
-
wait(fs, timeout=None, return_when=ALL_COMPLETED)
-
等待 fs 提供的 Future 實例 (possibly created by different Executor instances) 運行結束。返回一個命名的 2 元集合,分表代表已完成的和未完成的
-
return_when 表明什麼時候函數應該返回。它的值必須是一下值之一:
-
FIRST_COMPLETED : 函數在任何 future 結束或者取消的時候返回。
-
FIRST_EXCEPTION :函數在任何 future 因爲異常結束的時候返回,如果沒有 future 報錯,效果等於
-
ALL_COMPLETED : 函數在所有 future 結束後纔會返回。
-
as_completed(fs, timeout=None):參數是一個 Future 實例列表,返回值是一個迭代器,在運行結束後產出 Future 實例 。
使用示例:
from concurrent.futures import ThreadPoolExecutor, wait, as_completed
from time import sleep
from random import randint
def return_after_5_secs(num):
sleep(randint(1, 5))
return "Return of {}".format(num)
pool = ThreadPoolExecutor(5)
futures = []
for x in range(5):
futures.append(pool.submit(return_after_5_secs, x))
print(1)
for x in as_completed(futures):
print(x.result())
print(2)
參考鏈接:
- https://pythonhosted.org/futures/
作者:錢魏 Way
https://www.biaodianfu.com/python-multi-thread-and-multi-process.html
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://mp.weixin.qq.com/s/i3RvQc5uI9dVZ82pcwOQsQ