5 分鐘快速掌握 Python 定時任務框架

APScheduler 簡介

在實際開發中我們經常會碰上一些重複性或週期性的任務,比如像每天定時爬取某個網站的數據、一定週期定時運行代碼訓練模型等,類似這類的任務通常需要我們手動來進行設定或調度,以便其能夠在我們設定好的時間內運行。

在 Windows 上我們可以通過計劃任務來手動實現,而在 Linux 系統上往往我們會用到更多關於 crontab 的相關操作。但手動管理並不是一個很好的選擇,如果我們需要有十幾個不同的定時任務需要管理,那麼每次通過人工來進行干預未免有些笨拙,那這時候就真的是「人工智能」了。

所以將這些定時任務的調度代碼化纔是能夠讓我們很好地從這種手動管理的純人力操作中解脫出來。

在 Python 生態中對於定時任務的一些操作主要有那麼幾個:

  1. schedule:第三方模塊,該模塊適合比較輕量級的一些調度任務,但卻不適用於複雜時間的調度

  2. APScheduler:第三方定時任務框架,是對 Java 第三方定時任務框架 Quartz 的模仿與移植,能提供比 schedule 更復雜的應用場景,並且各種組件都是模塊化,易於使用與二次開發。

  3. Celery Beat:屬於 celery 這分佈式任務隊列第三方庫下的一個定時任務組件,如果使用需要配合 RabbitMQ 或 Redis 這類的消息隊列套件,需要花費一定的時間在環境搭建上,但在高版本中已經不支持 Windows。

所以爲了滿足能夠相對複雜的時間條件,又不需要在前期的環境搭建上花費很多時間的前提下,選擇 APScheduler 來對我們的調度任務或定時任務進行管理是個性價比極高的選擇。而本文主要會帶你快速上手有關 APScheduler 的使用。

APScheduler 概念與組件

雖然說官方文檔上的內容不是很多,而且所列舉的 API 不是很多,但這側面也反映了這一框架的簡單易用。所以在使用 APScheduler 之前,我們需要對這個框架的一些概念簡單瞭解,主要有那麼以下幾個:

觸發器(trigger)

所謂的觸發器就是用以觸發定時任務的組件,在 APScheduler 中主要是指時間觸發器,並且主要有三類時間觸發器可供使用:

任務持久化(job stores)

任務持久化主要是用於將設定好的調度任務進行存儲,即便是程序因爲意外情況,如斷電、電腦或服務器重啓時,只要重新運行程序時,APScheduler 就會根據對存儲好的調度任務結果進行判斷,如果出現已經過期但未執行的情況會進行相應的操作。

APScheduler 爲我們提供了多種持久化任務的途徑,默認是使用 memory 也就是內存的形式,但內存並不是持久化最好的方式。最好的方式則是通過像數據庫這樣的載體來將我們的定時任務寫入到磁盤當中,只要磁盤沒有損壞就能將數據給恢復。

APScheduler 支持的且常用的數據庫主要有:

通常我們可以在創建 Scheduler 實例時創建,或是單獨爲任務指定。配置的方式相對簡單,我們只需要指定對應的數據庫鏈接即可。

執行器(executor)

執行器顧名思義就是執行我們任務的對象,在計算機內通常要麼是 CPU 調度任務,要麼是單獨維護一個線程來運行任務。所以 APScheduler 裏的執行器通常就是 ThreadPoolExecutorProcessPoolExecutor 這樣的線程池和進程池兩種。

當然如果是和協程或異步相關的任務調度,還可以使用對應的 AsyncIOExecutorTwistedExecutorGeventExecutor 三種執行器。

調度器(scheduler)

調度器的選擇主要取決於你當前的程序環境以及 APScheduler 的用途。根據用途的不同,APScheduler 又提供了以下幾種調度器:

通常情況下如果不是和 Web 項目或應用集成共存,那麼往往都首選 BlockingScheduler 調度器來進行操作,它會在當前進程中啓動相應的線程來進行任務調度與處理;反之,如果是和 Web 項目或應用共存,那麼需要選擇 BackgroundScheduler 調度器,因爲它不會干擾當前應用的線程或進程狀況。

基於對以上的概念和組件認識,我們就能基本上摸清 APScheduler 的運行流程:

  1. 設定調度器(scheduler)用以對任務的調度與安排進行全局統籌

  2. 對相應的函數或方法上設定相應的觸發器(trigger),並添加到調度器中

  3. 如有任務持久化(job stores)需要則需要設定對應的持久化層,否則默認使用內存存儲任務

  4. 當觸發器被觸發時,就將任務交由執行器(executor)進行執行

APScheduler 快速上手

雖然 APScheduler 裏面的概念和組件看起來有點多,但在使用上並不算很複雜,我們可以通過本節的示例就能夠很快使用。

選擇對應的 scheduler

在使用之前我們需要先實例化一個 scheduler 對象,所有的 scheduler 對象都被放在了 apscheduler.schedulers 模塊下,我們可以直接通過查看 API 文檔或者藉助 IDE 補全的提示來獲取相應的 scheduler 對象。

這裏我直接選取了最基礎的 BlockingScheduler

1# main.py
2
3from apscheduler.schedulers.blocking import BlockingScheduler
4
5scheduler = BlockingScheduler()
6
7

配置 scheduler

對於 scheduler 的一些配置我們可以直接在實例化對象時就進行配置,當然也可以在創建實例化對象之後再進行配置。

實例化時進行參數配置:

 1# main.py
 2from datetime import datetime
 3
 4from apscheduler.executors.pool import ThreadPoolExecutor
 5from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
 6from apscheduler.schedulers.blocking import BlockingScheduler
 7
 8# 任務持久化 使用 SQLite
 9jobstores = {
10    'default': SQLAlchemyJobStore(url = 'sqlite:///jobs.db')
11}
12# 執行器配置
13executors = {
14    'default': ThreadPoolExecutor(20),
15}
16# 關於 Job 的相關配置,見官方文檔 API
17job_defaults = {
18    'coalesce': False,
19    'next_run_time': datetime.now()
20}
21scheduler = BlockingScheduler(
22  jobstores = jobstores,
23  executors = executors,
24  job_defaults = job_defaults,
25  timezone = 'Asia/Shanghai'
26)
27
28

或是通過 scheduler.configure 方法進行同樣的操作:

1scheduler = BlockingScheduler()
2scheduler.configure(jobstores=jobstores, executors=executors, job_defaults=job_defaults, timezone='Asia/Shanghai')
3
4

添加並執行你的任務

創建 scheduler 對象之後,我們需要調用其下的 add_job() 或是 scheduled_job() 方法來將我們需要執行的函數進行註冊。前者是以傳參的形式指定對應的函數名,而後者則是以裝飾器的形式直接對我們要執行的函數進行修飾。

比如我現在有一個輸出此時此刻時間的函數 now()

1from datetime import datetime
2
3def now(trigger):
4    print(f"trigger:{trigger} -> {datetime.now()}")
5
6

然後我打算每 5 秒的時候運行一次,那我們使用 add_job() 可以這樣寫:

1if __name__ == '__main__':
2    scheduler.add_job(now, trigger = "interval", args = ("interval",), seconds = 5)
3    scheduler.start()
4
5

在調用 start() 方法之後調度器就會開始執行,並在控制檯上看到對應的結果了:

1trigger:interval -> 2021-01-16 21:19:43.356674
2trigger:interval -> 2021-01-16 21:19:46.679849
3trigger:interval -> 2021-01-16 21:19:48.356595
4
5

當然使用 @scheduled_job 的方式來裝飾我們的任務或許會更加自由一些,於是上面的例子就可以寫成這樣:

1@scheduler.scheduled_job(trigger = "interval", args = ("interval",), seconds = 5)
2def now(trigger):
3    print(f"trigger:{trigger} -> {datetime.now()}")
4
5if __name__ == '__main__':
6    scheduler.start()
7
8

運行之後就會在控制檯看到同樣的結果了。

不過需要注意的是,添加任務一定要在 start() 方法執行前調用,否則會找不到任務或是拋出異常。

將 APScheduler 集成到 Web 項目中

如果你是正在做有關的 Web 項目且存在一些定時任務,那麼得益於 APScheduler 由於多樣的調度器,我們能夠將其和我們的項目結合到一起。

如果你正在使用 Flask,那麼 Flask-APScheduler 這一別人寫好的第三方包裝庫就很適合你,雖然它沒有相關的文檔,但只要你瞭解了前面我所介紹的有關於 APScheduler 的概念和組件,你就能很輕易地看懂這個第三方庫倉庫裏的示例代碼。

如果你使用的不是 Flask 框架,那麼 APScheduler 本身也提供了一些對任務或作業的增刪改查操作,我們可以自己編寫一套合適的 API。

這裏我使用的是 FastAPI 這一目前流行的 Web 框架。demo 項目結構如下:

1temp-scheduler
2├── config.py       # 配置項
3├── main.py         # API 文件
4└── scheduler.py    # APScheduler 相關設置
5
6

安裝依賴

這裏我們需要的依賴不多,只需要簡單幾個即可:

1pip install fastapi apscheduler sqlalchemy uvicorn
2
3

配置項

如果項目中模塊過多,那麼使用一個文件或模塊來進行統一管理是最好的選擇。這裏的 config.py 我們主要像 Flask 的配置那樣簡單設定:

 1from apscheduler.executors.pool import ThreadPoolExecutor
 2from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
 3from apscheduler.schedulers.blocking import BlockingScheduler
 4
 5class SchedulerConfig:
 6
 7    JOBSTORES = {"default": SQLAlchemyJobStore(url="sqlite:///job.db")}
 8    EXECUTORS = {"default": ThreadPoolExecutor(20)}
 9    JOB_DEFAULTS = {"coalesce": False}
10
11    @classmethod
12    def to_dict(cls):
13        return {
14            "jobstores": cls.JOBSTORES,
15            "executors": cls.EXECUTORS,
16            "job_defaults": cls.JOB_DEFAULTS,
17        }
18
19

SchedulerConfig 配置項中我們可以自己實現一個 to_dict() 類方法,以便我們後續傳參時通過解包的方式直接傳入配置參數即可。

Scheduler 相關設置

scheduler.py 模塊的設定也比較簡單,即設定對應的 scheduler 調度器即可。由於是演示 demo 我還將要定期執行的任務也放在了這個模塊當中:

 1import logging
 2from datetime import datetime
 3
 4from apscheduler.schedulers.background import BackgroundScheduler
 5
 6from config import SchedulerConfig
 7
 8scheduler = BackgroundScheduler()
 9logger = logging.getLogger(__name__)
10
11def init_scheduler() -> None:
12    # config scheduler
13    scheduler.configure(**SchedulerConfig.to_dict())
14
15    logger.info("scheduler is running...")
16
17    # schedule test
18    scheduler.add_job(
19        func=mytask,
20        trigger="date",
21        args=("APScheduler Initialize.",),
22        next_run_time=datetime.now(),
23    )
24    scheduler.start()
25
26def mytask(message: str) -> None:
27    print(f"[{datetime.now()}] message: {message}")
28
29

在這一部分中:

API 設置

main.py 模塊就主要存放着我們由 FastAPI 所構建的相關 API。如果在後續開發時存在多個接口,此時就需要將不同接口放在不同模塊文件中,以達到路由的分發與管理,類似於 Flask 的藍圖模式。

  1import logging
  2import uuid
  3from datetime import datetime
  4from typing import Any, Dict, Optional, Sequence, Union
  5
  6from fastapi import FastAPI
  7from pydantic import BaseModel
  8
  9from scheduler import init_scheduler, mytask, scheduler
 10
 11logger = logging.getLogger(__name__)
 12
 13app = FastAPI(title="APScheduler API")
 14app.add_event_handler("startup", init_scheduler)
 15
 16class Job(BaseModel):
 17    id: Union[int, str, uuid.UUID]
 18    name: Optional[str] = None
 19    func: Optional[str] = None
 20    args: Optional[Sequence[Optional[str]]] = None
 21    kwargs: Optional[Dict[str, Any]] = None
 22    executor: Optional[str] = None
 23    misfire_grace_time: Optional[str] = None
 24    coalesce: Optional[bool] = None
 25    max_instances: Optional[int] = None
 26    next_run_time: Optional[Union[str, datetime]] = None
 27
 28@app.post("/add")
 29def add_job(
 30    message: str,
 31    trigger: str,
 32    trigger_args: Optional[dict],
 33    id: Union[str, int, uuid.UUID],
 34):
 35    try:
 36        scheduler.add_job(
 37            func=mytask,
 38            trigger=trigger,
 39            kwargs={"message": message},
 40            id=id,
 41            **trigger_args,
 42        )
 43    except Exception as e:
 44        logger.exception(e.args)
 45        return {"status_code": 0, "message": "添加失敗"}
 46    return {"status_code": 1, "message": "添加成功"}
 47
 48@app.delete("/delete/{id}")
 49def delete_job(id: Union[str, int, uuid.UUID]):
 50    """delete exist job by id"""
 51    try:
 52        scheduler.remove_job(job_id=id)
 53    except Exception:
 54        return dict(
 55            message="刪除失敗",
 56            status_code=0,
 57        )
 58    return dict(
 59        message="刪除成功",
 60        status_code=1,
 61    )
 62
 63@app.put("/reschedule/{id}")
 64def reschedule_job(
 65    id: Union[str, int, uuid.UUID], trigger: str, trigger_args: Optional[dict]
 66):
 67    try:
 68        scheduler.reschedule_job(job_id=id, trigger=trigger, **trigger_args)
 69    except Exception as e:
 70        logger.exception(e.args)
 71        return dict(
 72            message="修改失敗",
 73            status_code=0,
 74        )
 75    return dict(
 76        message="修改成功",
 77        status_code=1,
 78    )
 79
 80@app.get("/job")
 81def get_all_jobs():
 82    jobs = None
 83    try:
 84        job_list = scheduler.get_jobs()
 85        if job_list:
 86            jobs = [Job(**task.__getstate__()) for task in job_list]
 87    except Exception as e:
 88        logger.exception(e.args)
 89        return dict(
 90            message="查詢失敗",
 91            status_code=0,
 92            jobs=jobs,
 93        )
 94    return dict(
 95        message="查詢成功",
 96        status_code=1,
 97        jobs=jobs,
 98    )
 99
100@app.get("/job/{id}")
101def get_job_by_id(id: Union[int, str, uuid.UUID]):
102    jobs = []
103    try:
104        job = scheduler.get_job(job_id=id)
105        if job:
106            jobs = [Job(**job.__getstate__())]
107    except Exception as e:
108        logger.exception(e.args)
109        return dict(
110            message="查詢失敗",
111            status_code=0,
112            jobs=jobs,
113        )
114    return dict(
115        message="查詢成功",
116        status_code=1,
117        jobs=jobs,
118    )
119
120

以上代碼看起來很多,其實核心的就那麼幾點:

  1. FastAPI 對象 app 的初始化。這裏用到的 add_event_handler() 方法就有點像 Flask 中的 before_first_request,會在 Web 服務請求伊始進行操作,理解爲初始化相關的操作即可。

  2. API 接口路由。路由通過 app 對象下的對應 HTTP 方法來實現,如 GETPOSTPUT 等。這裏的裝飾器用法其實也和 Flask 很類似,就不多贅述。

  3. scheduler 對象的增刪改查。從 scheduler.py 模塊中引入我們創建好的 scheduler 對象之後就可以直接用來做增刪改查的操作:

  4. 增:使用 add_job() 方法,其主要的參數是要運行的函數(或方法)、觸發器以及觸發器參數等

  5. 刪:使用 delete_job() 方法,我們需要傳入一個對應任務的 id 參數,用以能夠查找到對應的任務

  6. 改:使用 reschedule_job() 方法,這裏也需要一個對應任務的 id 參數,以及需要重新修改的觸發器及其參數

  7. 查:使用 get_jobs()get_job() 兩個方法,前者是直接獲取到當前調度的所有任務,返回的是一個包含了 APScheduler.job.Job 對象的列表,而後者是通過 id 參數來查找對應的任務對象;這裏我通過底層源碼使用 __getstate__() 來獲取到任務的相關信息,這些信息我們通過事先設定好的 Job 對象來對其進行序列化,最後將信息從接口中返回。

運行

完成以上的所有操作之後,我們就可以打開控制檯,進入到該目錄下並激活我們的虛擬環境,之後運行:

1uvicorn main:app 
2
3

之後我們就能在 FastAPI 默認的地址 http://127.0.0.1:8000/docs  中看到關於全部接口的 Swagger 文檔頁面了:

fastapi 集成的 swagger 頁面

之後我們可以直接在文檔裏面或使用 Postman 來自己進行接口測試即可。

結尾

本文介紹了有關於 APScheduler 框架的概念及其用法,並進行了簡單的實踐。

得益於 APScheduler 的模塊化設計纔可以讓我們更方便地去理解、使用它,並將其運用到我們實際的開發過程中。

從 APScheduler 目前的 Github 倉庫代碼以及 issue 來看,作者已經在開始重構 4.0 版本,當中的一些源代碼和 API 也有較大的變動,相信在 4.0 版本中將會引入更多的新特性。

但如果現階段你正打算使用或已經使用 APScheduler 用於實際生產中,那麼希望本文能對會你有所幫助。

本文所演示的 Web 集成 APScheduler 項目已經上傳到我的碼雲倉庫 fastapi-apscheduler,感興趣的朋友可以隨時借鑑。

作者:100gle,練習時長不到兩年的非正經文科生一枚,喜歡敲代碼、寫寫文章、搗鼓搗鼓各種新事物;現從事有關大數據分析與挖掘的相關工作。

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/2dN4aCQpDEN4nkRg80SPYQ