利用 Python 實現多任務進程

一、進程介紹

進程:正在執行的程序,由程序、數據和進程控制塊組成,是正在執行的程序,程序的一次執行過程,是資源調度的基本單位。

程序:沒有執行的代碼,是一個靜態的。

二、線程和進程之間的對比

由圖可知:此時電腦有 9 個應用進程,但是一個進程又會對應於多個線程,可以得出結論:

進程:能夠完成多任務,一臺電腦上可以同時運行多個 QQ

線程:能夠完成多任務,一個 QQ 中的多個聊天窗口

根本區別:進程是操作系統資源分配的基本單位,而線程是任務調度和執行的基本單位.

使用多進程的優勢:

1、擁有獨立 GIL:

首先由於進程中 GIL 的存在,Python 中的多線程並不能很好地發揮多核優勢,一個進程中的多個線程,在同 一時刻只能有一個線程運行。而對於多進程來說,每個進程都有屬於自己的 GIL,所以,在多核處理器下,多進程的運行是不會受 GIL 的影響的。因此,多進 程能更好地發揮多核的優勢。

2、效率高

當然,對於爬蟲這種 IO 密集型任務來說,多線程和多進程影響差別並不大。對於計算密集型任務來說,Python 的多進程相比多線 程,其多核運行效率會有成倍的提升。

三、Python 實現多進程

我們先用一個實例來感受一下:

1、使用 process 類

import multiprocessing 
def process(index): 
    print(f'Process: {index}') 
if __name__ == '__main__': 
    for i in range(5): 
        p = multiprocessing.Process(target=process, args=(i,)) 
        p.start()

這是一個實現多進程最基礎的方式:通過創建 Process 來新建一個子進程,其中 target 參數傳入方法名,args 是方法的參數,是以 元組的形式傳入,其和被調用的方法 process 的參數是一一對應的。

注意:這裏 args 必須要是一個元組,如果只有一個參數,那也要在元組第一個元素後面加一個逗號,如果沒有逗號則 和單個元素本身沒有區別,無法構成元組,導致參數傳遞出現問題。創建完進程之後,我們通過調用 start 方法即可啓動進程了。

運行結果如下:

Process: 0 
Process: 1 
Process: 2 
Process: 3 
Process: 4

可以看到,我們運行了 5 個子進程,每個進程都調用了 process 方法。process 方法的 index 參數通過 Process 的 args 傳入,分別是 0~4 這 5 個序號,最後打印出來,5 個子進程運行結束。

2、繼承 process 類

from multiprocessing import Process
import time

class MyProcess(Process):
    def __init__(self,loop):
        Process.__init__(self)
        self.loop = loop


    def run(self):
        for count in range(self.loop):
            time.sleep(1)
            print(f'Pid:{self.pid} LoopCount: {count}')
if __name__ == '__main__':
    for i in range(2,5):
        p = MyProcess(i)
        p.start()

我們首先聲明瞭一個構造方法,這個方法接收一個 loop 參數,代表循環次數,並將其設置爲全局變量。在 run 方法中,又使用這 個 loop 變量循環了 loop 次並打印了當前的進程號和循環次數。

在調用時,我們用 range 方法得到了 2、3、4 三個數字,並把它們分別初始化了 MyProcess 進程,然後調用 start 方法將進程啓動起 來。

注意:這裏進程的執行邏輯需要在 run 方法中實現,啓動進程需要調用 start 方法,調用之後 run 方法便會執行。

運行結果如下:

Pid:12976 LoopCount: 0
Pid:15012 LoopCount: 0
Pid:11976 LoopCount: 0
Pid:12976 LoopCount: 1
Pid:15012 LoopCount: 1
Pid:11976 LoopCount: 1
Pid:15012 LoopCount: 2
Pid:11976 LoopCount: 2
Pid:11976 LoopCount: 3

注意,這裏的進程 pid 代表進程號,不同機器、不同時刻運行結果可能不同。

四、進程之間的通信

1、Queue - 隊列 先進先出

from multiprocessing import Queue
import multiprocessing

def download(p)# 下載數據
    lst = [11,22,33,44]
    for item in lst:
        p.put(item)
    print('數據已經下載成功....')


def savedata(p):
    lst = []
    while True:
        data = p.get()
        lst.append(data)
        if p.empty():
            break
    print(lst)

def main():
    p1 = Queue()

    t1 = multiprocessing.Process(target=download,args=(p1,))
    t2 = multiprocessing.Process(target=savedata,args=(p1,))

    t1.start()
    t2.start()


if __name__ == '__main__':
    main()
數據已經下載成功....
[11, 22, 33, 44]

2、共享全局變量不適用於多進程編程

import multiprocessing

a = 1


def demo1():
    global a
    a += 1


def demo2():
    print(a)

def main():
    t1 = multiprocessing.Process(target=demo1)
    t2 = multiprocessing.Process(target=demo2)

    t1.start()
    t2.start()

if __name__ == '__main__':
    main()

運行結果:

1

有結果可知:全局變量不共享;

五、進程池之間的通信

1、進程池引入

當需要創建的子進程數量不多時,可以直接利用 multiprocessing 中的 Process 動態生成多個進程,但是如果是上百甚至上千個目標,手動的去創建的進程的工作量巨大,此時就可以用到 multiprocessing 模塊提供的 Pool 方法。

from multiprocessing import Pool
import os,time,random

def worker(a):
    t_start = time.time()
    print('%s開始執行,進程號爲%d'%(a,os.getpid()))

    time.sleep(random.random()*2)
    t_stop = time.time()
    print(a,"執行完成,耗時%0.2f"%(t_stop-t_start))


if __name__ == '__main__':
    po = Pool(3)        # 定義一個進程池
    for i in range(0,10):
        po.apply_async(worker,(i,))    # 向進程池中添加worker的任務

    print("--start--")
    po.close()      

    po.join()       
    print("--end--")

運行結果:

--start--
0開始執行,進程號爲6664
1開始執行,進程號爲4772
2開始執行,進程號爲13256
0 執行完成,耗時0.18
3開始執行,進程號爲6664
2 執行完成,耗時0.16
4開始執行,進程號爲13256
1 執行完成,耗時0.67
5開始執行,進程號爲4772
4 執行完成,耗時0.87
6開始執行,進程號爲13256
3 執行完成,耗時1.59
7開始執行,進程號爲6664
5 執行完成,耗時1.15
8開始執行,進程號爲4772
7 執行完成,耗時0.40
9開始執行,進程號爲6664
6 執行完成,耗時1.80
8 執行完成,耗時1.49
9 執行完成,耗時1.36
--end--

一個進程池只能容納 3 個進程,執行完成才能添加新的任務,在不斷的打開與釋放的過程中循環往復。

六、案例: 文件批量複製

操作思路:

代碼如下:

導包

import multiprocessing
import os
import time

定製文件複製函數

def copy_file(Q,oldfolderName,newfolderName,file_name):
    # 文件複製,不需要返回
    time.sleep(0.5)
    # print('\r從%s文件夾複製到%s文件夾的%s文件'%(oldfolderName,newfolderName,file_name),end='')

    old_file = open(oldfolderName + '/' + file_name,'rb') # 待複製文件
    content = old_file.read()
    old_file.close()

    new_file = open(newfolderName + '/' + file_name,'wb') # 複製出的新文件
    new_file.write(content)
    new_file.close()

    Q.put(file_name) # 向Q隊列中添加文件

定義主函數

def main():
    oldfolderName = input('請輸入要複製的文件夾名字:') # 步驟1獲取要複製文件夾的名字(可以手動創建,也可以通過代碼創建,這裏我們手動創建)
    newfolderName = oldfolderName + '復件'
    # 步驟二 創建一個新的文件夾
    if not os.path.exists(newfolderName):
        os.mkdir(newfolderName)

    filenames = os.listdir(oldfolderName) # 3.獲取文件夾裏面所有待複製的文件名
    # print(filenames)

    pool = multiprocessing.Pool(5) # 4.創建進程池

    Q = multiprocessing.Manager().Queue() # 創建隊列,進行通信
    for file_name in filenames:
        pool.apply_async(copy_file,args=(Q,oldfolderName,newfolderName,file_name)) # 5.向進程池添加任務
      po.close()

    copy_file_num = 0
    file_count = len(filenames)
    # 不知道什麼時候完成,所以定義一個死循環
    while True:
        file_name = Q.get()
        copy_file_num += 1
        time.sleep(0.2)
        print('\r拷貝進度%.2f %%'%(copy_file_num  * 100/file_count),end='') # 做一個拷貝進度條

        if copy_file_num >= file_count:
            break

程序運行

if __name__ == '__main__':
    main()

運行結果如下圖所示:

運行前後文件目錄結構對比

運行前

運行後

以上內容就是整體大致結果了,由於 test 裏面是隨便粘貼的測試文件,這裏就不展開演示了。

小夥伴們,快快用實踐一下吧!如果在學習過程中,有遇到任何問題,歡迎加我好友,我拉你進 Python 學習交流羣共同探討學習。

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/GTfR_XGtokqMLWM3B4ma3Q