兩萬字長文讓你徹底掌握 celery

什麼是 celery

這次我們來介紹一下 Python 的一個第三方模塊 celery,那麼 celery 是什麼呢?

所以 celery 本質上就是一個任務調度框架,類似於 Apache 的 airflow,當然 airflow 也是基於 Python 語言編寫。

不過有一點需要注意,celery 是用來調度任務的,但它本身並不具備存儲任務的功能,而調度任務的時候肯定是要把任務存起來的。因此要使用 celery 的話,還需要搭配一些具備存儲、訪問功能的工具,比如:消息隊列、Redis 緩存、數據庫等等。官方推薦的是消息隊列 RabbitMQ,個人認爲有些時候使用 Redis 也是不錯的選擇,當然我們都會介紹。

那麼 celery 都可以在哪些場景中使用呢?

celery 的架構

我們看一下 celery 的架構:

下面我們來安裝 celery,安裝比較簡單,直接 pip install celery 即可。這裏我本地的 celery 版本是 5.2.7,Python 版本是 3.8.10。

另外,由於 celery 本身不提供任務存儲的功能,所以這裏我們使用 Redis 作爲消息隊列,負責存儲任務。因此你還要在機器上安裝 Redis,我這裏有一臺雲服務器,已經安裝好了。

後續 celery 就會將任務存到 broker 裏面,當然要想實現這一點,就必須還要有能夠操作相應 broker 的驅動。Python 操作 Redis 的驅動也叫 redis,操作 RabbitMQ 的驅動叫 pika,直接 pip install ... 安裝即可。

celery 實現異步任務

我們新建一個工程,就叫 celery_demo,然後在裏面新建一個 app.py 文件。

# 文件名:app.py
import time
# 這個 Celery 就類似於 flask.Flask
# 然後實例化得到一個app
from celery import Celery

# 指定一個 name、以及 broker 的地址、backend 的地址
app = Celery(
    "satori",
    # 這裏使用我服務器上的 Redis
    # broker 用 1 號庫, backend 用 2 號庫
    broker="redis://:maverick@82.157.146.194:6379/1",
    backend="redis://:maverick@82.157.146.194:6379/2")

# 這裏通過 @app.task 對函數進行裝飾
# 那麼之後我們便可調用 task.delay 創建一個任務
@app.task
def task(name, age):
    print("準備執行任務啦")
    time.sleep(3)
    return f"name is {name}, age is {age}"

我們說執行任務的對象是 worker,那麼我們是不是需要創建一個 worker 呢?顯然是需要的,而創建 worker 可以使用如下命令創建:

注意:在 5.0 之前我們可以寫成 celery worker -A app ...,也就是把所有的參數都放在子命令 celery worker 的後面。但從 5.0 開始這種做法就不允許了,必須寫成 celery -A app worker ...,因爲 -A 變成了一個全局參數,所以它不應該放在 worker 的後面,而是要放在 worker 的前面。

下面執行該命令:

以上就前臺啓動了一個 worker,正在等待從隊列中獲取任務,圖中也顯示了相應的信息。然而此時隊列中並沒有任務,所以我們需要在另一個文件中創建任務併發送到隊列裏面去。

import time
from app import task

# 從 app 導入 task, 創建任務, 但是注意: 不要直接調用 task
# 因爲那樣的話就在本地執行了, 我們的目的是將任務發送到隊列裏面去
# 然後讓監聽隊列的 worker 從隊列裏面取任務並執行
# 而 task 被 @app.task 裝飾, 所以它不再是原來的 task 了
# 我們需要調用它的 delay 方法


# 調用 delay 之後, 就會創建一個任務
# 然後發送到隊列裏面去, 也就是我們這裏的 Redis
# 至於參數, 普通調用的時候怎麼傳, 在 delay 裏面依舊怎麼傳
start = time.perf_counter()
task.delay("古明地覺", 17)
print(
    time.perf_counter() - start
)  # 0.11716766700000003

然後執行該文件,發現只用了 0.12 秒,而 task 裏面明明 sleep 了 3 秒。所以說明這一步是不會阻塞的,調用 task.delay 只是創建一個任務併發送至隊列。我們再看一下 worker 的輸出信息:

可以看到任務已經被消費者接收並且消費了,而且調用 delay 方法是不會阻塞的,花費的那 0.12 秒是用在了其它地方,比如連接 Redis 發送任務等等。

另外需要注意,函數被 @app.task 裝飾之後,可以理解爲它就變成了一個任務工廠,因爲被裝飾了嘛,然後調用任務工廠的 delay 方法即可創建任務併發送到隊列裏面。我們也可以創建很多個任務工廠,但是這些任務工廠必須要讓 worker 知道,否則不會生效。所以如果修改了某個任務工廠、或者添加、刪除了某個任務工廠,那麼一定要讓 worker 知道,而做法就是先停止 celery worker 進程,然後再重新啓動。

如果我們新建了一個任務工廠,然後在沒有重啓 worker 的情況下,就用調用它的 delay 方法創建任務、併發送到隊列的話,那麼會拋出一個 KeyError,提示找不到相應的任務工廠。

其實很好理解,因爲代碼已經加載到內存裏面了,光修改了源文件而不重啓是沒用的。因爲加載到內存裏面的還是原來的代碼,不是修改過後的。

然後我們再來看看 Redis 中存儲的信息,1 號庫用作 broker,負責存儲任務;2 號庫用作 backend,負責存儲執行結果。我們來看 2 號庫:

以上我們就啓動了一個 worker 併成功消費了隊列中的任務,並且還從 Redis 裏面拿到了執行信息。當然啦,如果只能通過查詢 backend 才能拿到信息的話,那 celery 就太不智能了。我們也可以直接從程序中獲取。

直接查詢任務執行信息

Redis(backend)裏面存儲了很多關於任務的信息,這些信息我們可以直接在程序中獲取。

from app import task

res = task.delay("古明地覺", 17)
print(type(res))
"""
<class 'celery.result.AsyncResult'>
"""
# 直接打印,顯示任務的 id
print(res)
"""
4bd48a6d-1f0e-45d6-a225-6884067253c3
"""
# 獲取狀態, 顯然此刻沒有執行完
# 因此結果是PENDING, 表示等待狀態
print(res.status)
"""
PENDING
"""
# 獲取 id,兩種方式均可
print(res.task_id)
print(res.id)
"""
4bd48a6d-1f0e-45d6-a225-6884067253c3
4bd48a6d-1f0e-45d6-a225-6884067253c3
"""
# 獲取任務執行結束時的時間
# 任務還沒有結束, 所以返回None
print(res.date_done) 
"""
None
"""
# 獲取任務的返回值, 可以通過 result 或者 get()
# 注意: 如果是 result, 那麼任務還沒有執行完的話會直接返回 None
# 如果是 get(), 那麼會阻塞直到任務完成
print(res.result) 
print(res.get()) 
"""
None
name is 古明地覺, age is 17
"""
# 再次查看狀態和執行結束時的時間
# 發現 status 變成SUCCESS
# date_done 變成了執行結束時的時間
print(res.status)  
# 但顯示的是 UTC 時間
print(res.date_done)  
"""
SUCCESS
2022-09-08 06:40:34.525492
"""

另外我們說結果需要存儲在 backend 中,如果沒有配置 backend,那麼獲取結果的時候會報錯。至於 backend,因爲它是存儲結果的,所以一般會保存在數據庫中,因爲要持久化。我這裏爲了方便,就還是保存在 Redis 中。

celery.result.AsyncResult 對象

調用完任務工廠的 delay 方法之後,會創建一個任務併發送至隊列,同時返回一個 AsyncResult 對象,基於此對象我們可以拿到任務執行時的所有信息。但是 AsyncResult 對象我們也可以手動構造,舉個例子:

import time
# 我們不光要導入 task, 還要導入裏面的 app
from app import app, task
# 導入 AsyncResult 這個類
from celery.result import AsyncResult

# 發送任務到隊列當中
res = task.delay("古明地覺", 17)

# 傳入任務的 id 和 app, 創建 AsyncResult 對象
async_result = AsyncResult(res.id, app=app)

# 此時的這個 res 和 async_result 之間是等價的
# 兩者都是 AsyncResult 對象, 它們所擁有的方法也是一樣的
# 下面用誰都可以
while True:
    # 等價於async_result.state == "SUCCESS"
    if async_result.successful():
        print(async_result.get())
        break
    # 等價於async_result.state == "FAILURE"
    elif async_result.failed():
        print("任務執行失敗")
    elif async_result.status == "PENDING":
        print("任務正在被執行")
    elif async_result.status == "RETRY":
        print("任務執行異常正在重試")
    elif async_result.status == "REJECTED":
        print("任務被拒絕接收")
    elif async_result.status == "REVOKED":
        print("任務被取消")
    else:
        print("其它的一些狀態")
    time.sleep(0.8)

"""
任務正在被執行
任務正在被執行
任務正在被執行
任務正在被執行
name is 古明地覺, age is 17
"""

以上就是任務可能出現的一些狀態,通過輪詢的方式,我們也可以查看任務是否已經執行完畢。當然 AsyncResult 還有一些別的方法,我們來看一下:

from app import task

res = task.delay("古明地覺", 17)

# 1. ready():查看任務狀態,返回布爾值。
# 任務執行完成返回 True,否則爲 False
# 那麼問題來了,它和 successful() 有什麼區別呢?
# successful() 是在任務執行成功之後返回 True, 否則返回 False
# 而 ready() 只要是任務沒有處於阻塞狀態就會返回 True
# 比如執行成功、執行失敗、被 worker 拒收都看做是已經 ready 了
print(res.ready())  
"""
False
"""

# 2. wait():和之前的 get 一樣, 因爲在源碼中寫了: wait = get
# 所以調用哪個都可以, 不過 wait 可能會被移除,建議直接用 get 就行
print(res.wait())
print(res.get())
"""
name is 古明地覺, age is 17
name is 古明地覺, age is 17
"""

# 3. trackback:如果任務拋出了一個異常,可以獲取原始的回溯信息
# 執行成功就是 None
print(res.traceback)  
"""
None
"""

以上就是獲取任務執行結果相關的部分。

celery 的配置

celery 的配置不同,所表現出來的性能也不同,比如序列化的方式、連接隊列的方式,單線程、多線程、多進程等等。那麼 celery 都有那些配置呢?

根據情況,選擇合適的類型。如果不是跨語言的話,直接選擇 binary 即可,默認是 json。

# 將 RabbitMQ 作爲 broker 時需要使用
task_queues = {
    # 這是指定的默認隊列
    "default"{ 
        "exchange""default",
        "exchange_type""direct",
        "routing_key""default"
    },
    # 凡是 topic 開頭的 routing key
    # 都會被放到這個隊列
    "topicqueue"{ 
        "routing_key""topic.#",
        "exchange""topic_exchange",
        "exchange_type""topic",
    },
    "task_eeg"{ # 設置扇形交換機
        "exchange""tasks",
        "exchange_type""fanout",
        "binding_key""tasks",
    },
}

celery 的配置非常多,不止我們上面說的那些,更多配置可以查看官網,寫的比較詳細。

https://docs.celeryq.dev/en/latest/userguide/configuration.html#general-settings

值得一提的是,在 5.0 之前配置項都是大寫的,而從 5.0 開始配置項改成小寫了。不過老的寫法目前仍然支持,只是啓動的時候會拋警告,並且在 6.0 的時候不再兼容老的寫法。

官網也很貼心地將老版本的配置和新版本的配置羅列了出來,儘管配置有很多,但並不是每一個都要用,可以根據自身的業務合理選擇。

然後下面我們就根據配置文件的方式啓動 celery,當前目錄結構如下:

celery_demo/config.py

broker_url = "redis://:maverick@82.157.146.194:6379/1"
result_backend = "redis://:maverick@82.157.146.194:6379"
# 寫倆就完事了

celery_demo/tasks/task1.py

celery 可以支持非常多的定時任務,而不同種類的定時任務我們一般都會寫在不同的模塊中(當然這裏目前只有一個),然後再將這些模塊組織在一個單獨的目錄中。

當前只有一個 task1.py,我們隨便往裏面寫點東西,當然你也可以創建更多的文件。

def add(x, y):
    return x + y 

def sub(x, y):
    return x - y

def mul(x, y):
    return x * y 

def div(x, y):
    return x / y

celery_demo/app.py

from celery import Celery
import config
from tasks.task1 import (
    add, sub, mul, div
)

# 指定一個 name 即可
app = Celery("satori")
# 其它參數通過加載配置文件的方式指定
# 和 flask 非常類似
app.config_from_object(config)

# 創建任務工廠,有了任務工廠才能創建任務
# 這種方式和裝飾器的方式是等價的
add = app.task(add)
sub = app.task(sub)
mul = app.task(mul)
div = app.task(div)

然後重新啓動 worker:

輸出結果顯示,任務工廠都已經被加載進來了,然後我們創建任務併發送至隊列。

# 在 celery_demo 目錄下
# 將 app.py 裏面的任務工廠導入進來
>>> from app import add, sub, mul, div
# 然後創建任務發送至隊列,並等待結果
>>> add.delay(3, 4).get()
7
>>> sub.delay(3, 4).get()
-1
>>> mul.delay(3, 4).get()
12
>>> div.delay(3, 4).get()
0.75
>>>

結果正常返回了,再來看看 worker 的輸出,

多個任務都被執行了。

發送任務時指定參數

我們在發送任務到隊列的時候,使用的是 delay 方法,裏面直接傳遞函數所需的參數即可,那麼除了函數需要的參數之外,還有沒有其它參數呢?

首先 delay 方法實際上是調用的 apply_async 方法,並且 delay 方法裏面只接收函數的參數,但是 apply_async 接收的參數就很多了,我們先來看看它們的函數原型:

**delay 方法的 *args 和 kwargs 就是函數的參數,它會傳遞給 apply_async 的 args 和 kwargs。而其它的參數就是發送任務時所設置的一些參數,我們這裏重點介紹一下 apply_async 的其它參數。

我們隨便挑幾個舉例說明:

>>> from app import add
# 使用 apply_async,要注意參數的傳遞
# 位置參數使用元組或者列表,關鍵字參數使用字典
# 因爲是args和kwargs, 不是 *args和 **kwargs
>>> add.apply_async([3]{"y": 4}, 
...                 task_id="戀戀", 
...                 countdown=5).get()

7
>>>

查看一下 worker 的輸出:

注意左邊的時間,16:25:16 收到的消息,但 5 秒後才執行完畢,因爲我們將 countdown 參數設置爲 5。並且任務的 id 也被我們修改了。

另外還需要注意一下那些接收時間的參數,比如 eta。如果我們手動指定了 eta,那麼一定要注意時區的問題,要保證 celery 所使用的時區和你傳遞的 datetime 的時區是統一的。

其它的參數可以自己手動測試一下,這裏不細說了,根據自身的業務選擇合適的參數即可。

創建任務工廠的另一種方式

之前在創建任務工廠的時候,是將函數導入到 app.py 中,然後通過 add = app.task(add) 的方式手動裝飾,因爲有哪些任務工廠必須要讓 worker 知道,所以一定要在 app.py 裏面出現。但是這顯然不夠優雅,那麼可不可以這麼做呢?

# celery_demo/tasks/task1.py
from app import app
# celery_demo 所在路徑位於 sys.path 中
# 因此這裏可以直接 from app import app
@app.task
def add(x, y):
    return x + y

@app.task
def sub(x, y):
    return x - y
    
# celery_demo/app.py    
from tasks.task1 import add, sub

按照上面這種做法,理想上可以,但現實不行,因爲會發生循環導入。

所以 celery 提供了一個辦法,我們依舊在 task1.py 中 import app,但在 app.py 中不再使用 import,而是通過 include 加載的方式,我們看一下:

# celery_demo/tasks/task1.py
from app import app

@app.task
def add(x, y):
    return x + y

@app.task
def sub(x, y):
    return x - y
    
# celery_demo/app.py    
from celery import Celery
import config

# 通過 include 指定存放任務的 py 文件
# 注意它和 worker 啓動路徑之間的關係
# 我們是在 celery_demo 目錄下啓動的 worker
# 所以應該寫成 "tasks.task1"
# 如果是在 celery_demo 的上一級目錄啓動 worker
# 那麼這裏就要指定爲 "celery_demo.tasks.task1"
# 當然啓動時的 -A app 也要換成 -A celery_demo.app
app = Celery(__name__, include=["tasks.task1"])
# 如果還有其它文件,比如 task2.py, task3.py
# 那麼就把 "tasks.task2", "tasks.task3" 加進去
app.config_from_object(config)

在 celery_demo 目錄下重新啓動 worker。

爲了方便,我們只保留了兩個任務工廠。可以看到此時就成功啓動了,並且也更加方便和優雅一些。之前是在 task1.py 中定義函數,然後再把 task1.py 中的函數導入到 app.py 裏面,然後手動進行裝飾。雖然這麼做是沒問題的,但很明顯這種做法不適合管理。

所以還是要將 app.py 中的 app 導入到 task1.py 中直接創建任務工廠,但如果再將 task1.py 中的任務工廠導入到 app.py 中就會發生循環導入。於是 celery 提供了一個 include 參數,可以在創建 app 的時候自動將裏面所有的任務工廠加載進來,然後啓動並告訴 worker。

我們來測試一下:

# 通過 tasks.task1 導入任務工廠
# 然後創建任務,發送至隊列
>>> from tasks.task1 import add, sub
>>> add.delay(11, 22).get()
33
>>> sub.delay(11, 22).get()
-11
>>>

查看一下 worker 的輸出:

結果一切正常。

Task 對象

我們之前通過對一個函數使用 @app.task 即可將其變成一個任務工廠,而這個任務工廠就是一個 Task 實例對象。而我們在使用 @app.task 的時候,其實是可以加上很多的參數的,常用參數如下:

當然 app.task 還有很多不常用的參數,這裏就不說了,有興趣可以去查看官網或源碼,我們演示一下幾個常用的參數:

# celery_demo/tasks/task1.py
from app import app

@app.task()
def add(x, y):
    return x + y

@app.task(bind=True)
def sub(self, x, y):
    """
    如果 bind=True,則需要多指定一個 self
    這個 self 就是對應的任務工廠
    """
    # self.request 是一個 celery.task.Context 對象
    # 獲取它的屬性字典,即可拿到該任務的所有屬性
    print(self.request.__dict__)
    return x - y

其它代碼不變,我們重新啓動 worker:

然後創建任務發送至隊列,再由 worker 取出執行:

>>> from tasks.task1 import add, sub
>>> add.delay(111, 222).get()
333
>>> sub.delay(111, 222).get()
-111
>>>

執行沒有問題,然後看看 worker 的輸出:

創建任務工廠時,如果指定了 bind=True,那麼執行任務時會將任務工廠本身作爲第一個參數傳過去。任務工廠本質上就是 Task 實例對象,調用它的 delay 方法即可創建任務。

所以如果我們在 sub 內部繼續調用 self.delay(11, 22),會有什麼後果呢?沒錯,worker 會進入無限遞歸。因爲執行任務的時候,在任務的內部又創建了任務,所以會死循環下去。

當然 self 還有很多其它屬性和方法,具體有哪些可以通過 Task 這個類來查看。這裏面比較重要的是 self.request,它包含了某個具體任務的相關信息,而且信息非常多。

比如當前傳遞的參數是什麼,就可以通過 self.request 拿到。當然啦,self.request 是一個 Context 對象,因爲不同任務獲取 self.request 的結果肯定是不同的,但 self(任務工廠)卻只有一個,所以要基於 Context 進行隔離。

我們可以通過 dict 拿到 Context 對象的屬性字典,然後再進行操作。

最後再來說一說 @app.task 裏面的 base 參數。

# celery_demo/tasks/task1.py
from celery import app
from app import Task

class MyTask(Task):
    """
    自定義一個類,繼承自celery.Task
    exc: 失敗時的錯誤的類型;
    task_id: 任務的id;
    args: 任務函數的位置參數;
    kwargs: 任務函數的關鍵字參數;
    einfo: 失敗時的異常詳細信息;
    retval: 任務成功執行的返回值;
    """
    def on_failure(self, exc, task_id, args, kwargs, einfo):
        """任務失敗時執行"""

    def on_success(self, retval, task_id, args, kwargs):
        """任務成功時執行"""
        print("任務執行成功")

    def on_retry(self, exc, task_id, args, kwargs, einfo):
        """任務重試時執行"""


# 使用 @app.task 的時候,指定 base 即可
# 然後任務在執行的時候,會觸發 MyTask 裏面的回調函數
@app.task(base=MyTask)
def add(x, y):
    print("加法計算")
    return x + y

重新啓動 worker,然後創建任務。

指定了 base,任務在執行的時候會根據執行狀態的不同,觸發 MyTask 裏面的不同方法。

自定義任務流

有時候我們也可以將執行的多個任務,劃分到一個組中。

# celery_demo/tasks/task1.py
from app import app

@app.task()
def add(x, y):
    print("加法計算")
    return x + y

@app.task()
def sub(x, y):
    print("減法計算")
    return x - y

@app.task()
def mul(x, y):
    print("乘法計算")
    return x * y

@app.task()
def div(x, y):
    print("除法計算")
    return x // y

老規矩,重啓 worker,因爲我們修改了任務工廠。

然後來導入它們,創建任務,並將這些任務劃分到一個組中。

>>> from tasks.task1 import add, sub, mul, div
>>> from celery import group
# 調用 signature 方法,得到 signature 對象
# 此時 t1.delay() 和 add.delay(2, 3) 是等價的
>>> t1 = add.signature(args=(2, 3))
>>> t2 = sub.signature(args=(2, 3))
>>> t3 = mul.signature(args=(2, 3))
>>> t4 = div.signature(args=(4, 2))
# 但是變成 signature 對象之後,
# 我們可以將其放到一個組裏面
>>> gp = group(t1, t2, t3, t4)
# 執行組任務
# 返回 celery.result.GroupResult 對象
>>> res = gp()
# 每個組也有一個唯一 id
>>> print("組id:", res.id) 
組id: 65f32cc4-b8ce-4bf8-916b-f5cc359a901a

# 調用 get 方法也會阻塞,知道組裏面任務全部完成
>>> print("組結果:", res.get())
組結果: [5, -1, 6, 2]
>>>

可以看到整個組也是有唯一 id 的,另外 signature 也可以寫成 subtask 或者 s,在源碼裏面這幾個是等價的。

我們觀察一下 worker 的輸出,任務是併發執行的,所以哪個先完成不好說。但是調用組的 get 方法時,裏面的返回值順序一定和任務添加時候的順序保持一致。

除此之外,celery 還支持將多個任務像鏈子一樣串起來,第一個任務的輸出會作爲第二個任務的輸入,傳遞給下一個任務的第一個參數。

# celery_demo/tasks/task1.py
from app import app

@app.task
def task1():
    l = []
    return l

@app.task
# task1 的返回值會傳遞給這裏的 task1_return
def task2(task1_return, value):
    task1_return.append(value)
    return task1_return

@app.task
# task2 的返回值會傳遞給這裏的 task2_return
def task3(task2_return, num):
    return [i + num for i in task2_return]

@app.task
# task3 的返回值會傳遞給這裏的 task3_return
def task4(task3_return):
    return sum(task3_return)

然後我們看怎麼將這些任務像鏈子一樣串起來。

>>> from tasks.task1 import *
>>> from celery import chain
# 將多個 signature 對象進行與運算
# 當然內部肯定重寫了 __or__ 這個魔法方法
>>> my_chain = chain(
...     task1.s() | task2.s(123) | task3.s(5) | task4.s())
>>> 
# 執行任務鏈
>>> res = my_chain()
# 獲取返回值
>>> print(res.get())
128
>>>

這種鏈式處理的場景非常常見,比如 MapReduce。

celery 實現定時任務

既然是定時任務,那麼就意味着 worker 要後臺啓動,否則一旦遠程連接斷開,就停掉了。因此 celery 是支持我們後臺啓動的,並且可以啓動多個。

# 啓動 worker
celery multi start w1 -A app -l info 
# 可以同時啓動多個
celery multi start w2 w3 -A app -l info

# 以上我們就啓動了 3 個 worker
# 如果想停止的話
celery multi stop w1 w2 w3 -A app -l info

但是注意,這種啓動方式在 Windows 上面不支持,因爲 celery 會默認創建兩個目錄,分別是 /var/log/celery 和 /var/run/celery,顯然這是類 Unix 系統的目錄結構。

顯然啓動和關閉是沒有問題的,不過爲了更好地觀察到輸出,我們還是用之前的方式,選擇前臺啓動。

然後回顧一下 celery 的架構,裏面除了 producer 之外還有一個 celery beat,也就是調度器。我們調用任務工廠的 delay 方法,手動將任務發送到隊列,此時就相當於 producer。如果是設置定時任務,那麼會由調度器自動將任務添加到隊列。

我們在 tasks 目錄裏面再創建一個 period_task1.py 文件。

# celery_demo/tasks/period_task1.py
from celery.schedules import crontab
from app import app
from .task1 import task1, task2, task3, task4


@app.on_after_configure.connect
def period_task(sender, **kwargs):
    # 第一個參數爲 schedule,可以是 float,或者 crontab
    # crontab 後面會說,第二個參數是任務,第三個參數是名字
    sender.add_periodic_task(10.0, task1.s(),
                             )
    sender.add_periodic_task(15.0, task2.s("task2"),
                             )
    sender.add_periodic_task(20.0, task3.s(),
                             )
    sender.add_periodic_task(
        crontab(hour=18, minute=5, day_of_week=0),
        task4.s("task4"),
        
    )

# celery_demo/tasks/task1.py 
from app import app

@app.task
def task1():
    print("我是task1")
    return "task1你好"

@app.task
def task2(name):
    print(f"我是{name}")
    return f"{name}你好"

@app.task
def task3():
    print("我是task3")
    return "task3你好"

@app.task
def task4(name):
    print(f"我是{name}")
    return f"{name}你好"

既然使用了定時任務,那麼一定要設置時區。

# celery_demo/config.py
broker_url = "redis://:maverick@82.157.146.194:6379/1"
result_backend = "redis://:maverick@82.157.146.194:6379/2"
# 之前說過,celery 默認使用 utc 時間
# 其實我們是可以手動禁用的,然後手動指定時區
enable_utc = False
timezone = "Asia/Shanghai"

最後是修改 app.py,將定時任務加進去。

from celery import Celery
import config

app = Celery(
    __name__,
    include=["tasks.task1""tasks.period_task1"])
app.config_from_object(config)

下面就來啓動任務,先來啓動 worker,生產上應該後臺啓動,這裏爲了看到信息,選擇前臺啓動。

tasks.task1 裏面的 4 個任務工廠都被添加進來了,然後再來啓動調度器。

調度器啓動之後會自動檢測定時任務,如果到時間了,就發送到隊列。而啓動調度器的命令如下:

根據調度器的輸出內容,我們知道定時任務執行完了,但很明顯定時任務本質上也是任務,只不過有定時功能,但也要發到隊列裏面。然後 worker 從隊列裏面取出任務,並執行,那麼 worker 必然會有信息輸出。

調度器啓動到現在已經有一段時間了,worker 在終端中輸出了非常多的信息。

此時我們就成功實現了定時任務,並且是通過定義函數、打上裝飾器的方式實現的。除此之外,我們還可以通過配置的方式實現。

# celery_demo/tasks/period_task1.py
from celery.schedules import crontab
from app import app

# 此時也不需顯式導入任務工廠了
# 直接以字符串的方式指定即可
app.conf.beat_schedule = {
    # 參數通過 args 和 kwargs 指定
    "每10秒執行一次"{"task""tasks.task1.task1",
                 "schedule": 10.0},
    "每15秒執行一次"{"task""tasks.task1.task2",
                 "schedule": 15.0,
                 "args"("task2",)},
    "每20秒執行一次"{"task""tasks.task1.task3",
                 "schedule": 20.0},
    "每個星期天的18:05運行一次"{"task""tasks.task1.task4",
                        "schedule": crontab(hour=18,
                                            minute=5,
                                            day_of_week=0),
                        "args"("task4",)}
}

需要注意:雖然我們不用顯式導入任務工廠,但其實是 celery 自動幫我們導入。由於這些任務工廠都位於 celery_demo/tasks/task1.py 裏面,而 worker 也是在 celery_demo 目錄下啓動的,所以需要指定爲 tasks.task1.task{1234}。

這種啓動方式也是可以成功的,貌似還更方便一些,但是會多出一個文件,用來存儲配置信息。

crontab 參數

定時任務除了指定一個浮點數之外(表示每隔多少秒執行一次),還可以指定 crontab。關於 crontab 應該都知道是什麼,我們在 Linux 上想啓動定時任務的話,直接 crontab -e 然後添加即可。

而 celery 的 crontab 和 Linux 高度相似,我們看一下函數原型就知道了。

**簡單解釋一下:
**

以上就是這些參數的含義,並且參數接收的值還可以是一些特殊的通配符:

通配符之間是可以自由組合的,比如 */3,8-17 就表示能被 3 整除,或範圍處於 8-17 的時候觸發。

除此之外,還可以根據天色來設置定時任務(有點離譜)。

from celery.schedules import solar
app.conf.beat_schedule = {
    "日落"{"task""task1", 
           "schedule": solar("sunset", 
                             -37.81753, 
                             144.96715)
          },
}

solar 裏面接收三個參數,分別是 event、lat、lon,後兩個比較簡單,表示觀測者所在的緯度和經度。值大於 0,則對應東經 / 北緯,小於 0,則對應西經 / 南緯。

我們重點看第一個參數 event,可選值如下:

比如代碼中的 "sunset", -37.81753, 144.96715 就表示,當站在南緯 37.81753、東經 144.96715 的地方觀察,發現傍晚太陽的上邊緣消失在西方地平線上的時候,觸發任務執行。

個人覺得這個功能有點強悍,但估計絕大部分人應該都用不到,可能氣象領域相關的會用的比較多。

小結

以上就是 celery 的使用,另外這裏的 broker 和 backend 用的都是 Redis,其實還可以使用 RabbitMQ 和數據庫。

broker_url = "amqp://admin:123456@82.157.146.194:5672//"
result_backend = "mysql+pymysql://root:123456@82.157.146.194:3306/store"

可以自己測試一下,但不管用的是哪種存儲介質,對於我們使用 celery 而言,都是沒有區別的。

celery 在工作中用的還是比較多的,而且有一個調度工具 Apache airflow,它的核心調度功能也是基於 celery 實現的。

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/xZL2AXwH0HXbOANv5XMBJg