一篇文章帶你瞭解 Python 的分佈式進程接口

一、前言

    在 Thread 和 Process 中,應當優選 Process,因爲 Process 更穩定,而且,Process 可以分佈到多臺機器上,而 Thread 最多隻能分佈到同一臺機器的多個 CPU 上。

Python 的 multiprocessing 模塊不但支持多進程,其中 managers 子模塊還支持把多進程分佈到多臺機器上。可以寫一個服務進程作爲調度者,將任務分佈到其他多個進程中,依靠網絡通信進行管理。

二、案例分析

    在做爬蟲程序時,抓取某個網站的所有圖片,如果使用多進程的話,一般是一個進程負責抓取圖片的鏈接地址,將鏈接地址放到 queue 中,另外的進程負責 從 queue 中取鏈接地址進行下載和存儲到本地。

怎麼用分佈式進程實現?

     一臺機器上的進程負責抓取鏈接地址,其他機器上的進程負責系在存儲。那麼遇到的主要問題是將 queue 暴露到網絡中,讓其他機器進程都可以訪問,分佈式進程就是將這個過程進行了封裝,可以將這個過程稱爲本地隊列的網絡化。

例:

1.py

from multiprocessing.managers import BaseManager
from multiprocessing import freeze_support, Queue
# 任務個數
task_number = 10
# 收發隊列
task_quue = Queue(task_number)
result_queue = Queue(task_number)
def get_task():
    return task_quue
def get_result():
    return result_queue
# 創建類似的queueManager
class QueueManager(BaseManager):
    pass
def win_run():
    # 註冊在網絡上,callable 關聯了Queue 對象
    # 將Queue對象在網絡中暴露
    # window下綁定調用接口不能直接使用lambda,所以只能先定義函數再綁定
    QueueManager.register('get_task_queue', callable=get_task)
    QueueManager.register('get_result_queue', callable=get_result)
    # 綁定端口和設置驗證口令
    manager = QueueManager(address=('127.0.0.1', 8001), authkey='qiye'.encode())
    # 啓動管理,監聽信息通道
    manager.start()
    try:
        # 通過網絡獲取任務隊列和結果隊列
        task = manager.get_task_queue()
        result = manager.get_result_queue()
        # 添加任務
        for url in ["ImageUrl_" + str(i) for i in range(10)]:
            print('url is %s' % url)
            task.put(url)
        print('try get result')
        for i in range(10):
            print('result is %s' % result.get(timeout=10))
    except:
        print('Manager error')
    finally:
        manager.shutdown()
if __name__ == '__main__':
    freeze_support()
    win_run()

連接服務器,端口和驗證口令注意保持與服務器進程中完全一致從網絡獲取 Queue,進行本地化,從 task 隊列獲取任務,並且把結果寫入 result 隊列

2.py

#coding:utf-8
import time
from multiprocessing.managers import BaseManager
# 創建類似的Manager:
class Manager(BaseManager):
    pass
#使用QueueManager註冊獲取Queue的方法名稱
Manager.register('get_task_queue')
Manager.register('get_result_queue')
#連接到服務器:
server_addr = '127.0.0.1'
print('Connect to server %s...' % server_addr)
# 端口和驗證口令注意保持與服務進程設置的完全一致:
m = Manager(address=(server_addr, 8001), authkey='qiye')
# 從網絡連接:
m.connect()
#獲取Queue的對象:
task = m.get_task_queue()
result = m.get_result_queue()
#從task隊列取任務,並把結果寫入result隊列:
while(not task.empty()):
        image_url = task.get(True,timeout=5)
        print('run task download %s...' % image_url)
        time.sleep(1)
        result.put('%s--->success'%image_url)
#結束:
print('worker exit.')

任務進程要通過網絡連接到服務進程,所以要指定服務進程的 IP。

運行結果如下:

獲取圖片地址, 將地址傳到 2.py。

接收 1.py 傳遞的地址,進行圖片的下載,控制檯顯示爬取結果。

三、總結

    本文基於 Python 基礎,Python 的分佈式進程接口簡單,封裝良好,適合需要把繁重任務分佈到多臺機器的環境下。通過講解 Queue 的作用是用來傳遞任務和接收結果。

    歡迎大家積極嘗試,有時候看到別人實現起來很簡單,但是到自己動手實現的時候,總會有各種各樣的問題,切勿眼高手低,勤動手,纔可以理解的更加深刻。

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/19V4KyB4IPzMMF4ud0LOzA