Python 爬蟲 -- 任務調度之 Celery 從入門到進階

作者:wedo 實驗君

整理:Python 編程時光

  1. 什麼是 celery

celery 是一個簡單,靈活、可靠的分佈式任務執行框架,可以支持大量任務的併發執行。celery 採用典型生產者和消費者模型。生產者提交任務到任務隊列,衆多消費者從任務隊列中取任務執行。

1.1 celery 架構

Celery 由以下三部分構成:消息中間件 (Broker)、任務執行單元 Worker、結果存儲 (Backend)

1.2 應用場景

  1. 安裝

celery 安裝非常簡單, 除了安裝 celery,本文中使用 redis 作爲消息隊列即 Broker

# celery 安裝
pip install celery
# celery 監控 flower
pip install flower
pip install redis
# redis 安裝
yum install redis
# redis啓動
redis-server /etc/redis.conf
  1. 完整例子

celery 的應用開發涉及四個部分

3.1 項目目錄

# 項目目錄
wedo
.
├── config.py
├── __init__.py
├── period_task.py
└── tasks.py

3.2 celery 實例初始化

celery 的實例化,主要包括執行 Broker 和 backend 的訪問方式,任務模塊的申明等

# celery 實例初始化 
# __init__.py
from celery import Celery
app = Celery('wedo')  # 創建 Celery 實例
app.config_from_object('wedo.config') 

# 配置 wedo.config
# config.py
BROKER_URL = 'redis://10.8.238.2:6379/0' # Broker配置,使用Redis作爲消息中間件
CELERY_RESULT_BACKEND = 'redis://10.8.238.2:6379/0' # BACKEND配置,這裏使用redis
CELERY_RESULT_SERIALIZER = 'json' # 結果序列化方案
CELERY_TASK_RESULT_EXPIRES = 60 * 60 * 24 # 任務過期時間
CELERY_TIMEZONE='Asia/Shanghai'   # 時區配置
CELERY_IMPORTS = (     # 指定導入的任務模塊,可以指定多個
    'wedo.tasks',
    'wedo.period_task'
)

3.3 任務的定義

celery 中通過 @task 的裝飾器來進行申明 celery 任務,其他操作無任何差別

# 任務的定義
# 簡單任務  tasks.py
import celery
import time
from celery.utils.log import get_task_logger
from wedo import app

@app.task
def sum(x, y):
    return x + y

@app.task
def mul(x, y):
    time.sleep(5)
    return x * y

定時任務和實時任務的區別主要是要申明何時執行任務,任務本身也是通過 task 裝飾器來申明 何時執行任務有 2 種

# 任務的定義
# 定時任務  period_task.py
from wedo import app
from celery.schedules import crontab

@app.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
    sender.add_periodic_task(5.0, to_string.s("celery peroid task")name='to_string') # 每5秒執行add
    sender.add_periodic_task(
        crontab(minute='*/10'),      #每10分鐘執行一次
        send_mail.s('hello, this is a celery')name='send_mail'
    )

@app.task
def send_mail(content):
    print('send mail, content is %s' % content)

@app.task
def to_string(text):
    return 'this is a %s' % text

3.4 任務 worker 的啓動

任務啓動分爲 worker 啓動和定時任務 beat 啓動

# -A wedo爲應用模塊
# -l爲日誌level
# -c 爲進程數
celery worker -A wedo  -l debug -c 4

# 後臺啓動
nohup celery worker -A wedo -l debug -c 4 > ./log.log  2>&1

# 從下面的日誌可以看出啓動了4個任務
#   . wedo.period_task.send_mail
#   . wedo.period_task.to_string
#   . wedo.tasks.mul
#   . wedo.tasks.sum

 -------------- celery@localhost.localdomain v4.4.2 (cliffs)
--- ***** ----- 
-- ******* ---- Linux-3.10.0-327.28.3.el7.x86_64-x86_64-with-centos-7.2.1511-Core 2020-04-25 23:35:26
- *** --- * --- 
- ** ---------- [config]
- ** ---------- .> app:         wedo:0x7f05af30d320
- ** ---------- .> transport:   redis://10.8.238.2:6379/0
- ** ---------- .> results:     redis://10.8.238.2:6379/0
- *** --- * --- .> concurrency: 4 (prefork)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** ----- 
 -------------- [queues]
                .> celery           exchange=celery(direct) key=celery


[tasks]
  . celery.accumulate
  . celery.backend_cleanup
...
  . wedo.period_task.send_mail
  . wedo.period_task.to_string
  . wedo.tasks.mul
  . wedo.tasks.sum
...
[2020-04-25 23:35:27,617: INFO/MainProcess] celery@localhost.localdomain ready.
[2020-04-25 23:35:27,617: DEBUG/MainProcess] basic.qos: prefetch_count->16
[2020-04-25 23:35:27,655: DEBUG/MainProcess] celery@12103675 joined the party
celery beat -A wedo.period_task

celery beat v4.4.2 (cliffs) is starting.
__    -    ... __   -        _
LocalTime -> 2020-04-25 23:37:08
Configuration ->
    . broker -> redis://10.8.238.2:6379/0
    . loader -> celery.loaders.app.AppLoader
    . scheduler -> celery.beat.PersistentScheduler
    . db -> celerybeat-schedule
    . logfile -> [stderr]@%WARNING
    . maxinterval -> 5.00 minutes (300s)
# worker啓動是4個進程
\_  /root/anaconda3/envs/post/bin/celery worker -A wedo -l debug -c 4    
    \_  /root/anaconda3/envs/post/bin/celery worker -A wedo -l debug -c 4
    \_  /root/anaconda3/envs/post/bin/celery worker -A wedo -l debug -c 4
    \_  /root/anaconda3/envs/post/bin/celery worker -A wedo -l debug -c 4
    \_  /root/anaconda3/envs/post/bin/celery worker -A wedo -l debug -c 4

worker 和 beat 的停止

ps auxww | awk '/celery worker/ {print $2}' | xargs kill -9
ps auxww | awk '/celery beat/ {print $2}' | xargs kill -9

3.5 任務的調用

任務 worker 已經啓動好了,通過任務調用傳遞給 broker(redis),並返回任務執行結果 任務調用主要有兩種,本質是一致的,delay 是 apply_async 的封裝,apply_async 可以支持更多的任務調用配置

apply_async 和 delay 會返回一個異步的任務結果,AsyncResult 中存儲了任務的執行狀態和結果,常用的操作

value = result.get() # 任務返回值
print(result.__dict__) # 結果信息
print(result.successful()) # 是否成功
print(result.fail()) # 是否失敗
print(result.ready()) # 是否執行完成
print(result.state) # 狀態 PENDING -> STARTED -> SUCCESS/FAIL

常規任務:

from celery.utils.log import get_logger
from wedo.tasks import sum, mul, post_file
from celery import group, chain, chord
logger = get_logger(__name__)
try:
    result = mul.apply_async(args=(2, 2))
    value = result.get() # 等待任務執行完畢後,纔會返回任務返回值
    print(value)
except mul.OperationalError as exc: # 任務異常處理
    logger.exception('Sending task raised: %r', exc)

組合任務:

result = group(sum.s(i, i) for i in range(5))()
result.get()
# [0, 2, 4, 6, 8]
result = chain(sum.s(1,2), sum.s(3), mul.s(3))()
result.get()
# ((1+2)+3)*3=18
  1. 分佈式集羣部署

celery 作爲分佈式的任務隊列框架,worker 是可以執行在不同的服務器上的。部署過程和單機上啓動是一樣。只要把項目代碼 copy 到其他服務器,使用相同命令就可以了。可以思考下,這個是怎麼實現的?對了,就是通過共享Broker隊列。使用合適的隊列,如 redis,單進程單線程的方式可以有效的避免同個任務被不同 worker 同時執行的情況。

celery worker -A wedo  -l debug -c 4

  1. 進階使用

在前面已經瞭解了 celery 的主要的功能了。celery 還爲一些特別的場景提供了需要擴展的功能

5.1 任務狀態跟蹤和日誌

有時候我們需要對任務的執行情況做一些監控,比如失敗後報警通知。

import celery
import time
from celery.utils.log import get_task_logger
from wedo import app

logger = logger = get_task_logger(__name__)
class TaskMonitor(celery.Task):
    def on_failure(self, exc, task_id, args, kwargs, einfo):
        """failed callback"""
        logger.info('task id: {0!r} failed: {1!r}'.format(task_id, exc))

    def on_success(self, retval, task_id, args, kwargs):
        """success callback"""
        logger.info('task id:{} , arg:{} , successful !'.format(task_id,args))

    def on_retry(self, exc, task_id, args, kwargs, einfo):
        """retry callback"""
        logger.info('task id:{} , arg:{} , retry !  einfo: {}'.format(task_id, args, exc))

@app.task(base=TaskMonitor, bind=True, name='post_file')
def post_file(self, file_names):
    logger.info(self.request.__dict__)
    try:
        for i, file in enumerate(file_names):
            print('the file %s is posted' % file)
            if not self.request.called_directly:
                self.update_state(state='PROGRESS',
                    meta={'current': i, 'total': len(file_names)})
            time.sleep(2)
    except Exception as exec:
        raise self.retry(exc=exec, countdown=3, max_retries=5)

5.2 任務指定特定的 worker 執行

celery 做爲支持分佈式,理論上可以無限擴展 worker。默認情況下 celery 提交任務後,任務會放入名爲 celery 的隊列,所有在線的 worker 都會從任務隊列中獲取任務,任一個 worker 都有可能執行這個任務。有時候,有時候任務的特殊性或者機器本身的限制,某些任務只能跑在某些 worker 上。celery 提供了 queue 在區別不同的 worker,很好的支持這種情況。

celery worker -A wedo  -l debug -c 4 -Q celery,hipri
result = mul.apply_async(args=(2, 2)queue='hipri')
  1. 任務隊列監控

如果你想通過可視化的方式,查看 celery 的一切。flower 提供可行的解決方案,十分的方便

flower -A wedo --port=6006
# web訪問 http://10.8.238.2:6006/

  1. 總結

本文和大家了介紹了分佈式的隊列 celery, 妥妥的很全吧, 歡迎交流。總結下內容:

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/Y3VRmMtku7ZpWrT-oF2q5w