Python 爬蟲 -- 任務調度之 Celery 從入門到進階
作者:wedo 實驗君
整理:Python 編程時光
- 什麼是 celery
celery 是一個簡單,靈活、可靠的分佈式任務執行框架,可以支持大量任務的併發執行。celery 採用典型生產者和消費者模型。生產者提交任務到任務隊列,衆多消費者從任務隊列中取任務執行。
1.1 celery 架構
Celery 由以下三部分構成:消息中間件 (Broker)、任務執行單元 Worker、結果存儲 (Backend)
-
任務調用提交任務執行請求給 Broker 隊列
-
如果是異步任務,worker 會立即從隊列中取出任務並執行,執行結果保存在 Backend 中
-
如果是定時任務,任務由 Celery Beat 進程週期性地將任務發往 Broker 隊列,Worker 實時監視消息隊列獲取隊列中的任務執行
1.2 應用場景
-
大量的長時間任務的異步執行, 如上傳大文件
-
大規模實時任務執行,支持集羣部署,如支持高併發的機器學習推理
-
定時任務執行,如定時發送郵件,定時掃描機器運行情況
- 安裝
celery 安裝非常簡單, 除了安裝 celery,本文中使用 redis 作爲消息隊列即 Broker
# celery 安裝
pip install celery
# celery 監控 flower
pip install flower
pip install redis
# redis 安裝
yum install redis
# redis啓動
redis-server /etc/redis.conf
- 完整例子
celery 的應用開發涉及四個部分
-
celery 實例初始化
-
任務的定義(定時和實時任務)
-
任務 worker 的啓動
-
任務的調用
3.1 項目目錄
# 項目目錄
wedo
.
├── config.py
├── __init__.py
├── period_task.py
└── tasks.py
3.2 celery 實例初始化
celery 的實例化,主要包括執行 Broker 和 backend 的訪問方式,任務模塊的申明等
# celery 實例初始化
# __init__.py
from celery import Celery
app = Celery('wedo') # 創建 Celery 實例
app.config_from_object('wedo.config')
# 配置 wedo.config
# config.py
BROKER_URL = 'redis://10.8.238.2:6379/0' # Broker配置,使用Redis作爲消息中間件
CELERY_RESULT_BACKEND = 'redis://10.8.238.2:6379/0' # BACKEND配置,這裏使用redis
CELERY_RESULT_SERIALIZER = 'json' # 結果序列化方案
CELERY_TASK_RESULT_EXPIRES = 60 * 60 * 24 # 任務過期時間
CELERY_TIMEZONE='Asia/Shanghai' # 時區配置
CELERY_IMPORTS = ( # 指定導入的任務模塊,可以指定多個
'wedo.tasks',
'wedo.period_task'
)
3.3 任務的定義
celery 中通過 @task 的裝飾器來進行申明 celery 任務,其他操作無任何差別
# 任務的定義
# 簡單任務 tasks.py
import celery
import time
from celery.utils.log import get_task_logger
from wedo import app
@app.task
def sum(x, y):
return x + y
@app.task
def mul(x, y):
time.sleep(5)
return x * y
定時任務和實時任務的區別主要是要申明何時執行任務,任務本身也是通過 task 裝飾器來申明 何時執行任務有 2 種
-
指定頻率執行:sender.add_periodic_task(時間頻率單位 s, 任務函數, name='to_string')
-
crontab 方式:分鐘 / 小時 / 天 / 月 / 周粒度, 可以支持多種調度
# 任務的定義
# 定時任務 period_task.py
from wedo import app
from celery.schedules import crontab
@app.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
sender.add_periodic_task(5.0, to_string.s("celery peroid task"), name='to_string') # 每5秒執行add
sender.add_periodic_task(
crontab(minute='*/10'), #每10分鐘執行一次
send_mail.s('hello, this is a celery'), name='send_mail'
)
@app.task
def send_mail(content):
print('send mail, content is %s' % content)
@app.task
def to_string(text):
return 'this is a %s' % text
3.4 任務 worker 的啓動
任務啓動分爲 worker 啓動和定時任務 beat 啓動
# -A wedo爲應用模塊
# -l爲日誌level
# -c 爲進程數
celery worker -A wedo -l debug -c 4
# 後臺啓動
nohup celery worker -A wedo -l debug -c 4 > ./log.log 2>&1
# 從下面的日誌可以看出啓動了4個任務
# . wedo.period_task.send_mail
# . wedo.period_task.to_string
# . wedo.tasks.mul
# . wedo.tasks.sum
-------------- celery@localhost.localdomain v4.4.2 (cliffs)
--- ***** -----
-- ******* ---- Linux-3.10.0-327.28.3.el7.x86_64-x86_64-with-centos-7.2.1511-Core 2020-04-25 23:35:26
- *** --- * ---
- ** ---------- [config]
- ** ---------- .> app: wedo:0x7f05af30d320
- ** ---------- .> transport: redis://10.8.238.2:6379/0
- ** ---------- .> results: redis://10.8.238.2:6379/0
- *** --- * --- .> concurrency: 4 (prefork)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** -----
-------------- [queues]
.> celery exchange=celery(direct) key=celery
[tasks]
. celery.accumulate
. celery.backend_cleanup
...
. wedo.period_task.send_mail
. wedo.period_task.to_string
. wedo.tasks.mul
. wedo.tasks.sum
...
[2020-04-25 23:35:27,617: INFO/MainProcess] celery@localhost.localdomain ready.
[2020-04-25 23:35:27,617: DEBUG/MainProcess] basic.qos: prefetch_count->16
[2020-04-25 23:35:27,655: DEBUG/MainProcess] celery@12103675 joined the party
celery beat -A wedo.period_task
celery beat v4.4.2 (cliffs) is starting.
__ - ... __ - _
LocalTime -> 2020-04-25 23:37:08
Configuration ->
. broker -> redis://10.8.238.2:6379/0
. loader -> celery.loaders.app.AppLoader
. scheduler -> celery.beat.PersistentScheduler
. db -> celerybeat-schedule
. logfile -> [stderr]@%WARNING
. maxinterval -> 5.00 minutes (300s)
# worker啓動是4個進程
\_ /root/anaconda3/envs/post/bin/celery worker -A wedo -l debug -c 4
\_ /root/anaconda3/envs/post/bin/celery worker -A wedo -l debug -c 4
\_ /root/anaconda3/envs/post/bin/celery worker -A wedo -l debug -c 4
\_ /root/anaconda3/envs/post/bin/celery worker -A wedo -l debug -c 4
\_ /root/anaconda3/envs/post/bin/celery worker -A wedo -l debug -c 4
worker 和 beat 的停止
ps auxww | awk '/celery worker/ {print $2}' | xargs kill -9
ps auxww | awk '/celery beat/ {print $2}' | xargs kill -9
3.5 任務的調用
任務 worker 已經啓動好了,通過任務調用傳遞給 broker(redis),並返回任務執行結果 任務調用主要有兩種,本質是一致的,delay 是 apply_async 的封裝,apply_async 可以支持更多的任務調用配置
-
task.apply_async(args=[arg1, arg2], kwargs={'kwarg1': 'x', 'kwarg2': 'y'})
-
task.delay(arg1, arg2, kwarg1='x', kwarg2='y')
apply_async 和 delay 會返回一個異步的任務結果,AsyncResult 中存儲了任務的執行狀態和結果,常用的操作
value = result.get() # 任務返回值
print(result.__dict__) # 結果信息
print(result.successful()) # 是否成功
print(result.fail()) # 是否失敗
print(result.ready()) # 是否執行完成
print(result.state) # 狀態 PENDING -> STARTED -> SUCCESS/FAIL
常規任務:
from celery.utils.log import get_logger
from wedo.tasks import sum, mul, post_file
from celery import group, chain, chord
logger = get_logger(__name__)
try:
result = mul.apply_async(args=(2, 2))
value = result.get() # 等待任務執行完畢後,纔會返回任務返回值
print(value)
except mul.OperationalError as exc: # 任務異常處理
logger.exception('Sending task raised: %r', exc)
組合任務:
-
多個任務並行執行, group
-
多個任務鏈式執行,chain:第一個任務的返回值作爲第二個的輸入參數,以此類推
result = group(sum.s(i, i) for i in range(5))()
result.get()
# [0, 2, 4, 6, 8]
result = chain(sum.s(1,2), sum.s(3), mul.s(3))()
result.get()
# ((1+2)+3)*3=18
- 分佈式集羣部署
celery 作爲分佈式的任務隊列框架,worker 是可以執行在不同的服務器上的。部署過程和單機上啓動是一樣。只要把項目代碼 copy 到其他服務器,使用相同命令就可以了。可以思考下,這個是怎麼實現的?對了,就是通過共享Broker隊列
。使用合適的隊列,如 redis,單進程單線程的方式可以有效的避免同個任務被不同 worker 同時執行的情況。
celery worker -A wedo -l debug -c 4
- 分佈式集羣如下:
- 進階使用
在前面已經瞭解了 celery 的主要的功能了。celery 還爲一些特別的場景提供了需要擴展的功能
5.1 任務狀態跟蹤和日誌
有時候我們需要對任務的執行情況做一些監控,比如失敗後報警通知。
-
celery 在裝飾器 @app.task 中提供了 base 參數,傳入重寫的 Task 模塊,重新 on_* 函數就可以控制不同的任務結果
-
在 @app.task 提供 bind=True,可以通過 self 獲取 Task 中各種參數
-
- self.request:任務的各種參數
-
self.update_state: 自定義任務狀態, 原有的任務狀態:PENDING -> STARTED -> SUCCESS, 如果你想了解 STARTED -> SUCCESS 之間的一個狀態,比如執行的百分比之類,可以通過自定義狀態來實現
-
self.retry: 重試
import celery
import time
from celery.utils.log import get_task_logger
from wedo import app
logger = logger = get_task_logger(__name__)
class TaskMonitor(celery.Task):
def on_failure(self, exc, task_id, args, kwargs, einfo):
"""failed callback"""
logger.info('task id: {0!r} failed: {1!r}'.format(task_id, exc))
def on_success(self, retval, task_id, args, kwargs):
"""success callback"""
logger.info('task id:{} , arg:{} , successful !'.format(task_id,args))
def on_retry(self, exc, task_id, args, kwargs, einfo):
"""retry callback"""
logger.info('task id:{} , arg:{} , retry ! einfo: {}'.format(task_id, args, exc))
@app.task(base=TaskMonitor, bind=True, name='post_file')
def post_file(self, file_names):
logger.info(self.request.__dict__)
try:
for i, file in enumerate(file_names):
print('the file %s is posted' % file)
if not self.request.called_directly:
self.update_state(state='PROGRESS',
meta={'current': i, 'total': len(file_names)})
time.sleep(2)
except Exception as exec:
raise self.retry(exc=exec, countdown=3, max_retries=5)
5.2 任務指定特定的 worker 執行
celery 做爲支持分佈式,理論上可以無限擴展 worker。默認情況下 celery 提交任務後,任務會放入名爲 celery 的隊列,所有在線的 worker 都會從任務隊列中獲取任務,任一個 worker 都有可能執行這個任務。有時候,有時候任務的特殊性或者機器本身的限制,某些任務只能跑在某些 worker 上。celery 提供了 queue 在區別不同的 worker,很好的支持這種情況。
- 啓動 worker 時,-Q 指定 worker 支持的任務列隊名, 可以支持多個隊列名哦
celery worker -A wedo -l debug -c 4 -Q celery,hipri
- 任務調用時,
queue=*
來指定需要執行 worker
result = mul.apply_async(args=(2, 2), queue='hipri')
- 任務隊列監控
如果你想通過可視化的方式,查看 celery 的一切。flower 提供可行的解決方案,十分的方便
flower -A wedo --port=6006
# web訪問 http://10.8.238.2:6006/
- 總結
本文和大家了介紹了分佈式的隊列 celery, 妥妥的很全吧, 歡迎交流。總結下內容:
-
celery 爲分佈式隊列, 通過消息隊列連接任務提交和執行者 worker, 松耦合模式,可擴展
-
celery 消息隊列建議爲 redis
-
celery 通過 @app.task 裝飾把普通任務變成 celery Task
-
celery worker 通過不同 queue 支持特定的 worker 消費特定的任務
-
@app.task 中可以同步 base 和 bind 參數獲取更過的控制任務生命週期
-
flower 監控 celery 全過程
-
celery doc:https://docs.celeryproject.org/en/master/getting-started/index.html
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://mp.weixin.qq.com/s/Y3VRmMtku7ZpWrT-oF2q5w