Python 多線程與多進程

在學習 Python 的過程中,有接觸到多線程編程相關的知識點,先前一直都沒有徹底的搞明白。今天準備花一些時間,把裏面的細節儘可能的梳理清楚。

線程與進程的區別

進程(process)和線程(thread)是操作系統的基本概念,但是它們比較抽象,不容易掌握。關於多進程和多線程,教科書上最經典的一句話是 “進程是資源分配的最小單位,線程是 CPU 調度的最小單位”。線程是程序中一個單一的順序控制流程。進程內一個相對獨立的、可調度的執行單元,是系統獨立調度和分派 CPU 的基本單位指運行中的程序的調度單位。在單個程序中同時運行多個線程完成不同的工作,稱爲多線程。

進程和線程區別

進程是資源分配的基本單位。所有與該進程有關的資源,都被記錄在進程控制塊 PCB 中。以表示該進程擁有這些資源或正在使用它們。另外,進程也是搶佔處理機的調度單位,它擁有一個完整的虛擬地址空間。當進程發生調度時,不同的進程擁有不同的虛擬地址空間,而同一進程內的不同線程共享同一地址空間。

與進程相對應,線程與資源分配無關,它屬於某一個進程,並與進程內的其他線程一起共享進程的資源。線程只由相關堆棧(系統棧或用戶棧)寄存器和線程控制表 TCB 組成。寄存器可被用來存儲線程內的局部變量,但不能存儲其他線程的相關變量。

通常在一個進程中可以包含若干個線程,它們可以利用進程所擁有的資源。在引入線程的操作系統中,通常都是把進程作爲分配資源的基本單位,而把線程作爲獨立運行和獨立調度的基本單位。

由於線程比進程更小,基本上不擁有系統資源,故對它的調度所付出的開銷就會小得多,能更高效的提高系統內多個程序間併發執行的程度,從而顯著提高系統資源的利用率和吞吐量。

因而近年來推出的通用操作系統都引入了線程,以便進一步提高系統的併發性,並把它視爲現代操作系統的一個重要指標。

線程與進程的區別可以歸納爲以下 4 點:

多進程和多線程的比較

| 對比維度 | 多進程 | 多線程 | 總結 | | --- | --- | --- | --- | | 數據共享、同步 | 數據共享複雜,同步簡單 | 數據共享簡單,同步複雜 | 各有優劣 | | 內存、CPU | 佔用內存多,切換複雜,CPU 利用率低 | 佔用內存少,切換簡單,CPU 利用率高 | 線程佔優 | | 創建、銷燬、切換 | 複雜,速度慢 | 簡單,速度快 | 線程佔優 | | 編程、調試 | 編程簡單,調試簡單 | 編程複雜,調試複雜 | 進程佔優 | | 可靠性 | 進程間不會互相影響 | 一個線程掛掉將導致整個進程掛掉 | 進程佔優 | | 分佈式 | 適用於多核、多機,擴展到多臺機器簡單 | 適合於多核 | 進程佔優 |

總結,進程和線程還可以類比爲火車和車廂:

Python 全局解釋器鎖 GIL

全局解釋器鎖(英語:Global Interpreter Lock,縮寫 GIL),並不是 Python 的特性,它是在實現 Python 解析器(CPython)時所引入的一個概念。由於 CPython 是大部分環境下默認的 Python 執行環境。所以在很多人的概念裏 CPython 就是 Python,也就想當然的把 GIL 歸結爲 Python 語言的缺陷。那麼 CPython 實現中的 GIL 又是什麼呢?來看看官方的解釋:

The mechanism used by the CPython interpreter to assure that only one thread executes Python bytecode at a time. This simplifies the CPython implementation by making the object model (including critical built-in types such as dict) implicitly safe against concurrent access. Locking the entire interpreter makes it easier for the interpreter to be multi-threaded, at the expense of much of the parallelism afforded by multi-processor machines.

Python 代碼的執行由 Python 虛擬機 (也叫解釋器主循環,CPython 版本) 來控制,Python 在設計之初就考慮到要在解釋器的主循環中,同時只有一個線程在執行,即在任意時刻,只有一個線程在解釋器中運行。對 Python 虛擬機的訪問由全局解釋器鎖(GIL)來控制,正是這個鎖能保證同一時刻只有一個線程在運行。

GIL 有什麼好處?簡單來說,它在單線程的情況更快,並且在和 C 庫結合時更方便,而且不用考慮線程安全問題,這也是早期 Python 最常見的應用場景和優勢。另外,GIL 的設計簡化了 CPython 的實現,使得對象模型,包括關鍵的內建類型如字典,都是隱含可以併發訪問的。鎖住全局解釋器使得比較容易的實現對多線程的支持,但也損失了多處理器主機的並行計算能力。

在多線程環境中,Python 虛擬機按以下方式執行:

  1. 設置 GIL

  2. 切換到一個線程去運行

  3. 運行直至指定數量的字節碼指令,或者線程主動讓出控制(可以調用 sleep(0))

  4. 把線程設置爲睡眠狀態

  5. 解鎖 GIL

  6. 再次重複以上所有步驟

Python3.2 前,GIL 的釋放邏輯是當前線程遇見 IO 操作或者 ticks 計數達到 100(ticks 可以看作是 python 自身的一個計數器,專門做用於 GIL,每次釋放後歸零,這個計數可以通過 sys.setcheckinterval 來調整),進行釋放。因爲計算密集型線程在釋放 GIL 之後又會立即去申請 GIL,並且通常在其它線程還沒有調度完之前它就已經重新獲取到了 GIL,就會導致一旦計算密集型線程獲得了 GIL,那麼它在很長一段時間內都將佔據 GIL,甚至一直到該線程執行結束。

Python 3.2 開始使用新的 GIL。新的 GIL 實現中用一個固定的超時時間來指示當前的線程放棄全局鎖。在當前線程保持這個鎖,且其他線程請求這個鎖時,當前線程就會在 5 毫秒後被強制釋放該鎖。該改進在單核的情況下,對於單個線程長期佔用 GIL 的情況有所好轉。

在單核 CPU 上,數百次的間隔檢查纔會導致一次線程切換。在多核 CPU 上,存在嚴重的線程顛簸(thrashing)。而每次釋放 GIL 鎖,線程進行鎖競爭、切換線程,會消耗資源。單核下多線程,每次釋放 GIL,喚醒的那個線程都能獲取到 GIL 鎖,所以能夠無縫執行,但多核下,CPU0 釋放 GIL 後,其他 CPU 上的線程都會進行競爭,但 GIL 可能會馬上又被 CPU0 拿到,導致其他幾個 CPU 上被喚醒後的線程會醒着等待到切換時間後又進入待調度狀態,這樣會造成線程顛簸 (thrashing),導致效率更低。

另外,從上面的實現機制可以推導出,Python 的多線程對 IO 密集型代碼要比 CPU 密集型代碼更加友好。

針對 GIL 的應對措施:

Python 的多進程包 multiprocessing

Python 的 threading 包主要運用多線程的開發,但由於 GIL 的存在,Python 中的多線程其實並不是真正的多線程,如果想要充分地使用多核 CPU 的資源,大部分情況需要使用多進程。在 Python 2.6 版本的時候引入了 multiprocessing 包,它完整的複製了一套 threading 所提供的接口方便遷移。唯一的不同就是它使用了多進程而不是多線程。每個進程有自己的獨立的 GIL,因此也不會出現進程之間的 GIL 爭搶。

藉助這個 multiprocessing,你可以輕鬆完成從單進程到併發執行的轉換。multiprocessing 支持子進程、通信和共享數據、執行不同形式的同步,提供了 Process、Queue、Pipe、Lock 等組件。

Multiprocessing 產生的背景

除了應對 Python 的 GIL 以外,產生 multiprocessing 的另外一個原因時 Windows 操作系統與 Linux/Unix 系統的不一致。

Unix/Linux 操作系統提供了一個 fork() 系統調用,它非常特殊。普通的函數,調用一次,返回一次,但是 fork() 調用一次,返回兩次,因爲操作系統自動把當前進程(父進程)複製了一份(子進程),然後,分別在父進程和子進程內返回。子進程永遠返回 0,而父進程返回子進程的 ID。這樣做的理由是,一個父進程可以 fork 出很多子進程,所以,父進程要記下每個子進程的 ID,而子進程只需要調用 getpid() 就可以拿到父進程的 ID。

Python 的 os 模塊封裝了常見的系統調用,其中就包括 fork,可以在 Python 程序中輕鬆創建子進程:

import os

print('Process (%s) start...' % os.getpid())

\# Only works on Unix/Linux/Mac:

pid = os.fork()

if pid == 0:

    print('I am child process (%s) and my parent is %s.' % (os.getpid(), os.getppid()))

else:

    print('I (%s) just created a child process (%s).' % (os.getpid(), pid))

上述代碼在 Linux、Unix 和 Mac 上的執行結果爲:

Process (876) start...

I (876) just created a child process (877).

I am child process (877) and my parent is 876.

有了 fork 調用,一個進程在接到新任務時就可以複製出一個子進程來處理新任務,常見的 Apache 服務器就是由父進程監聽端口,每當有新的 http 請求時,就 fork 出子進程來處理新的 http 請求。

由於 Windows 沒有 fork 調用,上面的代碼在 Windows 上無法運行。由於 Python 是跨平臺的,自然也應該提供一個跨平臺的多進程支持。multiprocessing 模塊就是跨平臺版本的多進程模塊。multiprocessing 模塊封裝了 fork()調用,使我們不需要關注 fork()的細節。由於 Windows 沒有 fork 調用,因此,multiprocessing 需要 “模擬” 出 fork 的效果。

multiprocessing 常用組件及功能

創建管理進程模塊:

同步子進程模塊:

接下來就一起來學習下每個組件及功能的具體使用方法。

Process(用於創建進程)

multiprocessing 模塊提供了一個 Process 類來代表一個進程對象。

在 multiprocessing 中,每一個進程都用一個 Process 類來表示。

構造方法:Process([group [, target [, name [, args [, kwargs]]]]])

實例方法:

屬性介紹:

使用示例:(注意:在 windows 中 Process() 必須放到 if name == ‘main’: 下)

from multiprocessing import Process

import os

def run_proc(name):

    print('Run child process %s (%s)...' % (name, os.getpid()))

if __name__=='__main__':

    print('Parent process %s.' % os.getpid())

    p = Process(target=run_proc, args=('test',))

    print('Child process will start.')

    p.start()

    p.join()

print('Child process end.')

Pool(用於創建管理進程池)

Pool 類用於需要執行的目標很多,而手動限制進程數量又太繁瑣時,如果目標少且不用控制進程數量則可以用 Process 類。Pool 可以提供指定數量的進程,供用戶調用,當有新的請求提交到 Pool 中時,如果池還沒有滿,那麼就會創建一個新的進程用來執行該請求;但如果池中的進程數已經達到規定最大值,那麼該請求就會等待,直到池中有進程結束,就重用進程池中的進程。

構造方法:Pool([processes[, initializer[, initargs[, maxtasksperchild[, context]]]]])

實例方法:

方法 apply_async() 和 map_async() 的返回值是 AsyncResul 的實例 obj。實例具有以下方法:

使用示例:

\# -*- coding:utf-8 -*-

\# Pool+map

from multiprocessing import Pool

def test(i):

    print(i)

if __name__ == "__main__":

    lists = range(100)

    pool = Pool(8)

    pool.map(test, lists)

    pool.close()

    pool.join()
\# -*- coding:utf-8 -*-

\# 異步進程池(非阻塞)

from multiprocessing import Pool

def test(i):

    print(i)

if __name__ == "__main__":

    pool = Pool(8)

    for i in range(100):

        '''

        For循環中執行步驟:

        (1)循環遍歷,將100個子進程添加到進程池(相對父進程會阻塞)

        (2)每次執行8個子進程,等一個子進程執行完後,立馬啓動新的子進程。(相對父進程不阻塞)

        apply_async爲異步進程池寫法。異步指的是啓動子進程的過程,與父進程本身的執行(print)是異步的,而For循環中往進程池添加子進程的過程,與父進程本身的執行卻是同步的。

        '''

        pool.apply_async(test, args=(i,))  # 維持執行的進程總數爲8,當一個進程執行完後啓動一個新進程.

    print("test")

    pool.close()

    pool.join()
\# -*- coding:utf-8 -*-

\# 異步進程池(非阻塞)

from multiprocessing import Pool

def test(i):

    print(i)

if __name__ == "__main__":

    pool = Pool(8)

    for i in range(100):

        '''

            實際測試發現,for循環內部執行步驟:

            (1)遍歷100個可迭代對象,往進程池放一個子進程

            (2)執行這個子進程,等子進程執行完畢,再往進程池放一個子進程,再執行。(同時只執行一個子進程)

            for循環執行完畢,再執行print函數。

        '''

        pool.apply(test, args=(i,))  # 維持執行的進程總數爲8,當一個進程執行完後啓動一個新進程.

    print("test")

    pool.close()

    pool.join()

Queue(用於進程通信,資源共享)

在使用多進程的過程中,最好不要使用共享資源。普通的全局變量是不能被子進程所共享的,只有通過 Multiprocessing 組件構造的數據結構可以被共享。

Queue 是用來創建進程間資源共享的隊列的類,使用 Queue 可以達到多進程間數據傳遞的功能(缺點:只適用 Process 類,不能在 Pool 進程池中使用)。

構造方法:Queue([maxsize])

實例方法:

使用示例:

from multiprocessing import Process, Queue

import os, time, random

def write(q):

    print('Process to write: %s' % os.getpid())

    for value in ['A''B''C']:

        print('Put %s to queue...' % value)

        q.put(value)

        time.sleep(random.random())

def read(q):

    print('Process to read: %s' % os.getpid())

    while True:

        value = q.get(True)

        print('Get %s from queue.' % value)

if __name__ == "__main__":

    q = Queue()

    pw = Process(target=write, args=(q,))

    pr = Process(target=read, args=(q,))

    pw.start()

    pr.start()

    pw.join()  # 等待pw結束

    pr.terminate()  # pr進程裏是死循環,無法等待其結束,只能強行終止

JoinableQueue 就像是一個 Queue 對象,但隊列允許項目的使用者通知生成者項目已經被成功處理。通知進程是使用共享的信號和條件變量來實現的。

構造方法:JoinableQueue([maxsize])

實例方法

JoinableQueue 的實例 p 除了與 Queue 對象相同的方法之外還具有:

使用示例:

\# -*- coding:utf-8 -*-

from multiprocessing import Process, JoinableQueue

import time, random

def consumer(q):

    while True:

        res = q.get()

        print('消費者拿到了 %s' % res)

        q.task_done()

def producer(seq, q):

    for item in seq:

        time.sleep(random.randrange(1,2))

        q.put(item)

        print('生產者做好了 %s' % item)

    q.join()

if __name__ == "__main__":

    q = JoinableQueue()

    seq = ('產品%s' % i for i in range(5))

    p = Process(target=consumer, args=(q,))

    p.daemon = True  # 設置爲守護進程,在主線程停止時p也停止,但是不用擔心,producer內調用q.join保證了consumer已經處理完隊列中的所有元素

    p.start()

    producer(seq, q)

    print('主線程')

Value,Array(用於進程通信,資源共享)

multiprocessing 中 Value 和 Array 的實現原理都是在共享內存中創建 ctypes() 對象來達到共享數據的目的,兩者實現方法大同小異,只是選用不同的 ctypes 數據類型而已。

Value

構造方法:Value((typecode_or_type, args[, lock])

typecode_or_type 支持的類型:

| Type code | C Type             | Python Type       | Minimum size in bytes |

| --------- | ------------------ | ----------------- | --------------------- |

| `'b'`     | signed char        | int               | 1                     |

| `'B'`     | unsigned char      | int               | 1                     |

| `'u'`     | Py_UNICODE         | Unicode character | 2                     |

| `'h'`     | signed short       | int               | 2                     |

| `'H'`     | unsigned short     | int               | 2                     |

| `'i'`     | signed int         | int               | 2                     |

| `'I'`     | unsigned int       | int               | 2                     |

| `'l'`     | signed long        | int               | 4                     |

| `'L'`     | unsigned long      | int               | 4                     |

| `'q'`     | signed long long   | int               | 8                     |

| `'Q'`     | unsigned long long | int               | 8                     |

| `'f'`     | float              | float             | 4                     |

| `'d'`     | double             | float             | 8                     |

參考地址:https://docs.python.org/3/library/array.html

Array

構造方法:Array(typecode_or_type, size_or_initializer, **kwds[, lock])

使用示例:

import multiprocessing

def f(n, a):

    n.value = 3.14

    a[0] = 5

if __name__ == '__main__':

    num = multiprocessing.Value('d', 0.0)

    arr = multiprocessing.Array('i', range(10))

    p = multiprocessing.Process(target=f, args=(num, arr))

    p.start()

    p.join()

    print(num.value)

    print(arr[:])

注意:Value 和 Array 只適用於 Process 類。

Pipe(用於管道通信)

多進程還有一種數據傳遞方式叫管道原理和 Queue 相同。Pipe 可以在進程之間創建一條管道,並返回元組(conn1,conn2), 其中 conn1,conn2 表示管道兩端的連接對象,強調一點:必須在產生 Process 對象之前產生管道。

構造方法:Pipe([duplex])

實例方法:

使用示例:

from multiprocessing import Process, Pipe

import time

\# 子進程執行方法

def f(Subconn):

    time.sleep(1)

    Subconn.send("吃了嗎")

    print("來自父親的問候:", Subconn.recv())

    Subconn.close()

if __name__ == "__main__":

    parent_conn, child_conn = Pipe()  # 創建管道兩端

    p = Process(target=f, args=(child_conn,))  # 創建子進程

    p.start()

    print("來自兒子的問候:", parent_conn.recv())

    parent_conn.send("嗯")

Manager(用於資源共享)

Manager() 返回的 manager 對象控制了一個 server 進程,此進程包含的 python 對象可以被其他的進程通過 proxies 來訪問。從而達到多進程間數據通信且安全。Manager 模塊常與 Pool 模塊一起使用。

Manager 支持的類型有 list,dict,Namespace,Lock,RLock,Semaphore,BoundedSemaphore,Condition,Event,Queue,Value 和 Array。

管理器是獨立運行的子進程,其中存在真實的對象,並以服務器的形式運行,其他進程通過使用代理訪問共享對象,這些代理作爲客戶端運行。Manager() 是 BaseManager 的子類,返回一個啓動的 SyncManager() 實例,可用於創建共享對象並返回訪問這些共享對象的代理。

BaseManager,創建管理器服務器的基類

構造方法:BaseManager([address[, authkey]])

實例方法:

實例屬性:

SyncManager**,**以下類型均不是進程安全的,需要加鎖..

實例方法:

使用示例:

import multiprocessing

def f(x, arr, l, d, n):

    x.value = 3.14

    arr[0] = 5

    l.append('Hello')

    d[1] = 2

    n.a = 10

if __name__ == '__main__':

    server = multiprocessing.Manager()

    x = server.Value('d', 0.0)

    arr = server.Array('i', range(10))

    l = server.list()

    d = server.dict()

    n = server.Namespace()

    proc = multiprocessing.Process(target=f, args=(x, arr, l, d, n))

    proc.start()

    proc.join()

    print(x.value)

    print(arr)

    print(l)

    print(d)

    print(n)

同步子進程模塊

Lock(互斥鎖)

Lock 鎖的作用是當多個進程需要訪問共享資源的時候,避免訪問的衝突。加鎖保證了多個進程修改同一塊數據時,同一時間只能有一個修改,即串行的修改,犧牲了速度但保證了數據安全。Lock 包含兩種狀態——鎖定和非鎖定,以及兩個基本的方法。

構造方法:Lock()

實例方法:

使用示例:

from multiprocessing import Process, Lock

def l(lock, num):

    lock.acquire()

    print("Hello Num: %s" % (num))

    lock.release()

if __name__ == '__main__':

    lock = Lock()  # 這個一定要定義爲全局

    for num in range(20):

        Process(target=l, args=(lock, num)).start()

RLock(可重入的互斥鎖 (同一個進程可以多次獲得它,同時不會造成阻塞)

RLock(可重入鎖)是一個可以被同一個線程請求多次的同步指令。RLock 使用了 “擁有的線程” 和“遞歸等級”的概念,處於鎖定狀態時,RLock 被某個線程擁有。擁有 RLock 的線程可以再次調用 acquire(),釋放鎖時需要調用 release()相同次數。可以認爲 RLock 包含一個鎖定池和一個初始值爲 0 的計數器,每次成功調用 acquire()/release(),計數器將 + 1/-1,爲 0 時鎖處於未鎖定狀態。

構造方法:RLock()

實例方法:

Semaphore(信號量)

信號量是一個更高級的鎖機制。信號量內部有一個計數器而不像鎖對象內部有鎖標識,而且只有當佔用信號量的線程數超過信號量時線程才阻塞。這允許了多個線程可以同時訪問相同的代碼區。比如廁所有 3 個坑,那最多隻允許 3 個人上廁所,後面的人只能等裏面有人出來了才能再進去,如果指定信號量爲 3,那麼來一個人獲得一把鎖,計數加 1,當計數等於 3 時,後面的人均需要等待。一旦釋放,就有人可以獲得一把鎖。

構造方法:Semaphore([value])

實例方法:

使用示例:

from multiprocessing import Process, Semaphore

import time, random

def go_wc(sem, user):

    sem.acquire()

    print('%s 佔到一個茅坑' % user)

    time.sleep(random.randint(0, 3))

    sem.release()

    print(user, 'OK')

if __name__ == '__main__':

    sem = Semaphore(2)

    p_l = []

    for i in range(5):

        p = Process(target=go_wc, args=(sem, 'user%s' % i,))

        p.start()

        p_l.append(p)

    for i in p_l:

        i.join()

Condition(條件變量)

可以把 Condition 理解爲一把高級的鎖,它提供了比 Lock, RLock 更高級的功能,允許我們能夠控制複雜的線程同步問題。Condition 在內部維護一個鎖對象(默認是 RLock),可以在創建 Condigtion 對象的時候把瑣對象作爲參數傳入。Condition 也提供了 acquire, release 方法,其含義與鎖的 acquire, release 方法一致,其實它只是簡單的調用內部鎖對象的對應的方法而已。Condition 還提供了其他的一些方法。

構造方法:Condition([lock/rlock])

實例方法:

使用示例:

import multiprocessing

import time

def stage_1(cond):

    """perform first stage of work,

    then notify stage_2 to continue

    """

    name = multiprocessing.current_process().name

    print('Starting', name)

    with cond:

        print('{} done and ready for stage 2'.format(name))

        cond.notify_all()

def stage_2(cond):

    """wait for the condition telling us stage_1 is done"""

    name = multiprocessing.current_process().name

    print('Starting', name)

    with cond:

        cond.wait()

        print('{} running'.format(name))

if __name__ == '__main__':

    condition = multiprocessing.Condition()

    s1 = multiprocessing.Process(name='s1',

                                 target=stage_1,

                                 args=(condition,))

    s2_clients = [

        multiprocessing.Process(

            name='stage_2[{}]'.format(i),

            target=stage_2,

            args=(condition,),

        )

        for i in range(1, 3)

    ]

    for c in s2_clients:

        c.start()

        time.sleep(1)

    s1.start()

    s1.join()

    for c in s2_clients:

        c.join()

Event(事件)

Event 內部包含了一個標誌位,初始的時候爲 false。可以使用 set() 來將其設置爲 true;或者使用 clear() 將其從新設置爲 false;可以使用 is_set() 來檢查標誌位的狀態;另一個最重要的函數就是 wait(timeout=None),用來阻塞當前線程,直到 event 的內部標誌位被設置爲 true 或者 timeout 超時。如果內部標誌位爲 true 則 wait() 函數理解返回。

使用示例:

import multiprocessing

import time

def wait_for_event(e):

    """Wait for the event to be set before doing anything"""

    print('wait_for_event: starting')

    e.wait()

    print('wait_for_event: e.is_set()->', e.is_set())

def wait_for_event_timeout(e, t):

    """Wait t seconds and then timeout"""

    print('wait_for_event_timeout: starting')

    e.wait(t)

    print('wait_for_event_timeout: e.is_set()->', e.is_set())

if __name__ == '__main__':

    e = multiprocessing.Event()

    w1 = multiprocessing.Process(

        name='block',

        target=wait_for_event,

        args=(e,),

    )

    w1.start()

    w2 = multiprocessing.Process(

        name='nonblock',

        target=wait_for_event_timeout,

        args=(e, 2),

    )

    w2.start()

    print('main: waiting before calling Event.set()')

    time.sleep(3)

    e.set()

    print('main: event is set')

其他內容

multiprocessing.dummy 模塊與 multiprocessing 模塊的區別:dummy 模塊是多線程,而 multiprocessing 是多進程, api 都是通用的。所有可以很方便將代碼在多線程和多進程之間切換。multiprocessing.dummy 通常在 IO 場景可以嘗試使用,比如使用如下方式引入線程池。

from multiprocessing.dummy import Pool as ThreadPool

multiprocessing.dummy 與早期的 threading,不同的點好像是在多多核 CPU 下,只綁定了一個核心(具體未考證)。

參考文檔:

Python 併發之 concurrent.futures

Python 標準庫爲我們提供了 threading 和 multiprocessing 模塊編寫相應的多線程 / 多進程代碼。從 Python3.2 開始,標準庫爲我們提供了 concurrent.futures 模塊,它提供了 ThreadPoolExecutor 和 ProcessPoolExecutor 兩個類,實現了對 threading 和 multiprocessing 的更高級的抽象,對編寫線程池 / 進程池提供了直接的支持。concurrent.futures 基礎模塊是 executor 和 future。

Executor

Executor 是一個抽象類,它不能被直接使用。它爲具體的異步執行定義了一些基本的方法。ThreadPoolExecutor 和 ProcessPoolExecutor 繼承了 Executor,分別被用來創建線程池和進程池的代碼。

ThreadPoolExecutor 對象

ThreadPoolExecutor 類是 Executor 子類,使用線程池執行異步調用。

class concurrent.futures.ThreadPoolExecutor(max_workers)

使用 max_workers 數目的線程池執行異步調用。

ProcessPoolExecutor 對象

ThreadPoolExecutor 類是 Executor 子類,使用進程池執行異步調用。

class concurrent.futures.ProcessPoolExecutor(max_workers=None)

使用 max_workers 數目的進程池執行異步調用,如果 max_workers 爲 None 則使用機器的處理器數目(如 4 核機器 max_worker 配置爲 None 時,則使用 4 個進程進行異步併發)。

submit() 方法

Executor 中定義了 submit() 方法,這個方法的作用是提交一個可執行的回調 task,並返回一個 future 實例。future 對象代表的就是給定的調用。

Executor.submit(fn, *args, **kwargs)

使用示例:

from concurrent import futures

def test(num):

    import time

    return time.ctime(), num

with futures.ThreadPoolExecutor(max_workers=1) as executor:

    future = executor.submit(test, 1)

    print(future.result())

map() 方法

除了 submit,Exectuor 還爲我們提供了 map 方法,這個方法返回一個 map(func, *iterables) 迭代器,迭代器中的回調執行返回的結果有序的。

Executor.map(func, *iterables, timeout=None)

使用示例:

from concurrent import futures

def test(num):

    import time

    return time.ctime(), num

data = [1, 2, 3]

with futures.ThreadPoolExecutor(max_workers=1) as executor:

    for future in executor.map(test, data):

        print(future)

shutdown() 方法

釋放系統資源, 在 Executor.submit() 或 Executor.map() 等異步操作後調用。使用 with 語句可以避免顯式調用此方法。

Executor.shutdown(wait=True)

Future

Future 可以理解爲一個在未來完成的操作,這是異步編程的基礎。通常情況下,我們執行 io 操作,訪問 url 時(如下)在等待結果返回之前會產生阻塞,cpu 不能做其他事情,而 Future 的引入幫助我們在等待的這段時間可以完成其他的操作。

Future 類封裝了可調用的異步執行。Future 實例通過 Executor.submit() 方法創建。

使用示例:

from concurrent.futures import ThreadPoolExecutor, wait, as_completed

from time import sleep

from random import randint

def return_after_5_secs(num):

    sleep(randint(1, 5))

    return "Return of {}".format(num)

pool = ThreadPoolExecutor(5)

futures = []

for x in range(5):

    futures.append(pool.submit(return_after_5_secs, x))

print(1)

for x in as_completed(futures):

    print(x.result())

print(2)

參考鏈接:

作者:錢魏 Way

https://www.biaodianfu.com/python-multi-thread-and-multi-process.html

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/i3RvQc5uI9dVZ82pcwOQsQ