5 分鐘快速掌握 Python 定時任務框架
APScheduler 簡介
在實際開發中我們經常會碰上一些重複性或週期性的任務,比如像每天定時爬取某個網站的數據、一定週期定時運行代碼訓練模型等,類似這類的任務通常需要我們手動來進行設定或調度,以便其能夠在我們設定好的時間內運行。
在 Windows 上我們可以通過計劃任務來手動實現,而在 Linux 系統上往往我們會用到更多關於 crontab 的相關操作。但手動管理並不是一個很好的選擇,如果我們需要有十幾個不同的定時任務需要管理,那麼每次通過人工來進行干預未免有些笨拙,那這時候就真的是「人工智能」了。
所以將這些定時任務的調度代碼化纔是能夠讓我們很好地從這種手動管理的純人力操作中解脫出來。
在 Python 生態中對於定時任務的一些操作主要有那麼幾個:
-
schedule
:第三方模塊,該模塊適合比較輕量級的一些調度任務,但卻不適用於複雜時間的調度 -
APScheduler
:第三方定時任務框架,是對 Java 第三方定時任務框架Quartz
的模仿與移植,能提供比schedule
更復雜的應用場景,並且各種組件都是模塊化,易於使用與二次開發。 -
Celery Beat
:屬於celery
這分佈式任務隊列第三方庫下的一個定時任務組件,如果使用需要配合 RabbitMQ 或 Redis 這類的消息隊列套件,需要花費一定的時間在環境搭建上,但在高版本中已經不支持 Windows。
所以爲了滿足能夠相對複雜的時間條件,又不需要在前期的環境搭建上花費很多時間的前提下,選擇 APScheduler
來對我們的調度任務或定時任務進行管理是個性價比極高的選擇。而本文主要會帶你快速上手有關 APScheduler
的使用。
APScheduler 概念與組件
雖然說官方文檔上的內容不是很多,而且所列舉的 API 不是很多,但這側面也反映了這一框架的簡單易用。所以在使用 APScheduler
之前,我們需要對這個框架的一些概念簡單瞭解,主要有那麼以下幾個:
-
觸發器(trigger)
-
任務持久化(job stores)
-
執行器(executor)
-
調度器(scheduler)
觸發器(trigger)
所謂的觸發器就是用以觸發定時任務的組件,在 APScheduler
中主要是指時間觸發器,並且主要有三類時間觸發器可供使用:
-
date
:日期觸發器。日期觸發器主要是在某一日期時間點上運行任務時調用,是APScheduler
裏面最簡單的一種觸發器。所以通常也適用於一次性的任務或作業調度。 -
interval
:間隔觸發器。間隔觸發器是在日期觸發器基礎上擴展了對時間部分,比如時、分、秒、天、周這幾個部分的設定。是我們用以對重複性任務進行設定或調度的一個常用調度器。設定了時間部分之後,從起始日期開始(默認是當前)會按照設定的時間去執行任務。 -
cron
:cron
表達式觸發器。cron
表達式觸發器就等價於我們 Linux 上的 crontab,它主要用於更復雜的日期時間進行設定。但需要注意的是,APScheduler
不支持 6 位及以上的 cron 表達式,最多隻支持到 5 位。
任務持久化(job stores)
任務持久化主要是用於將設定好的調度任務進行存儲,即便是程序因爲意外情況,如斷電、電腦或服務器重啓時,只要重新運行程序時,APScheduler
就會根據對存儲好的調度任務結果進行判斷,如果出現已經過期但未執行的情況會進行相應的操作。
APScheduler
爲我們提供了多種持久化任務的途徑,默認是使用 memory
也就是內存的形式,但內存並不是持久化最好的方式。最好的方式則是通過像數據庫這樣的載體來將我們的定時任務寫入到磁盤當中,只要磁盤沒有損壞就能將數據給恢復。
APScheduler
支持的且常用的數據庫主要有:
-
sqlalchemy
形式的數據庫,這裏就主要是指各種傳統的關係型數據庫,如 MySQL、PostgreSQL、SQLite 等。 -
mongodb
非結構化的 Mongodb 數據庫,該類型數據庫經常用於對非結構化或版結構化數據的存儲或操作,如 JSON。 -
redis
內存數據庫,通常用作數據緩存來使用,當然通過一些主從複製等方式也能實現當中數據的持久化或保存。
通常我們可以在創建 Scheduler
實例時創建,或是單獨爲任務指定。配置的方式相對簡單,我們只需要指定對應的數據庫鏈接即可。
執行器(executor)
執行器顧名思義就是執行我們任務的對象,在計算機內通常要麼是 CPU 調度任務,要麼是單獨維護一個線程來運行任務。所以 APScheduler
裏的執行器通常就是 ThreadPoolExecutor
或 ProcessPoolExecutor
這樣的線程池和進程池兩種。
當然如果是和協程或異步相關的任務調度,還可以使用對應的 AsyncIOExecutor
、TwistedExecutor
和 GeventExecutor
三種執行器。
調度器(scheduler)
調度器的選擇主要取決於你當前的程序環境以及 APScheduler
的用途。根據用途的不同,APScheduler
又提供了以下幾種調度器:
-
BlockingScheduler
:阻塞調度器,當程序中沒有任何存在主進程之中運行東西時,就則使用該調度器。 -
BackgroundScheduler
:後臺調度器,在不使用後面任何的調度器且希望在應用程序內部運行時的後臺啓動時才進行使用,如當前你已經開啓了一個 Django 或 Flask 服務。 -
AsyncIOScheduler
:AsyncIO
調度器,如果代碼是通過asyncio
模塊進行異步操作,使用該調度器。 -
GeventScheduler
:Gevent
調度器,如果代碼是通過gevent
模塊進行協程操作,使用該調度器 -
TornadoScheduler
:Tornado
調度器,在Tornado
框架中使用 -
TwistedScheduler
:Twisted
調度器,在基於Twisted
的框架或應用程序中使用 -
QtScheduler
:Qt
調度器,在構建Qt
應用中進行使用。
通常情況下如果不是和 Web 項目或應用集成共存,那麼往往都首選 BlockingScheduler
調度器來進行操作,它會在當前進程中啓動相應的線程來進行任務調度與處理;反之,如果是和 Web 項目或應用共存,那麼需要選擇 BackgroundScheduler
調度器,因爲它不會干擾當前應用的線程或進程狀況。
基於對以上的概念和組件認識,我們就能基本上摸清 APScheduler
的運行流程:
-
設定調度器(scheduler)用以對任務的調度與安排進行全局統籌
-
對相應的函數或方法上設定相應的觸發器(trigger),並添加到調度器中
-
如有任務持久化(job stores)需要則需要設定對應的持久化層,否則默認使用內存存儲任務
-
當觸發器被觸發時,就將任務交由執行器(executor)進行執行
APScheduler 快速上手
雖然 APScheduler
裏面的概念和組件看起來有點多,但在使用上並不算很複雜,我們可以通過本節的示例就能夠很快使用。
選擇對應的 scheduler
在使用之前我們需要先實例化一個 scheduler
對象,所有的 scheduler
對象都被放在了 apscheduler.schedulers
模塊下,我們可以直接通過查看 API 文檔或者藉助 IDE 補全的提示來獲取相應的 scheduler
對象。
這裏我直接選取了最基礎的 BlockingScheduler
:
1# main.py
2
3from apscheduler.schedulers.blocking import BlockingScheduler
4
5scheduler = BlockingScheduler()
6
7
配置 scheduler
對於 scheduler
的一些配置我們可以直接在實例化對象時就進行配置,當然也可以在創建實例化對象之後再進行配置。
實例化時進行參數配置:
1# main.py
2from datetime import datetime
3
4from apscheduler.executors.pool import ThreadPoolExecutor
5from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
6from apscheduler.schedulers.blocking import BlockingScheduler
7
8# 任務持久化 使用 SQLite
9jobstores = {
10 'default': SQLAlchemyJobStore(url = 'sqlite:///jobs.db')
11}
12# 執行器配置
13executors = {
14 'default': ThreadPoolExecutor(20),
15}
16# 關於 Job 的相關配置,見官方文檔 API
17job_defaults = {
18 'coalesce': False,
19 'next_run_time': datetime.now()
20}
21scheduler = BlockingScheduler(
22 jobstores = jobstores,
23 executors = executors,
24 job_defaults = job_defaults,
25 timezone = 'Asia/Shanghai'
26)
27
28
或是通過 scheduler.configure
方法進行同樣的操作:
1scheduler = BlockingScheduler()
2scheduler.configure(jobstores=jobstores, executors=executors, job_defaults=job_defaults, timezone='Asia/Shanghai')
3
4
添加並執行你的任務
創建 scheduler
對象之後,我們需要調用其下的 add_job()
或是 scheduled_job()
方法來將我們需要執行的函數進行註冊。前者是以傳參的形式指定對應的函數名,而後者則是以裝飾器的形式直接對我們要執行的函數進行修飾。
比如我現在有一個輸出此時此刻時間的函數 now()
:
1from datetime import datetime
2
3def now(trigger):
4 print(f"trigger:{trigger} -> {datetime.now()}")
5
6
然後我打算每 5 秒的時候運行一次,那我們使用 add_job()
可以這樣寫:
1if __name__ == '__main__':
2 scheduler.add_job(now, trigger = "interval", args = ("interval",), seconds = 5)
3 scheduler.start()
4
5
在調用 start()
方法之後調度器就會開始執行,並在控制檯上看到對應的結果了:
1trigger:interval -> 2021-01-16 21:19:43.356674
2trigger:interval -> 2021-01-16 21:19:46.679849
3trigger:interval -> 2021-01-16 21:19:48.356595
4
5
當然使用 @scheduled_job
的方式來裝飾我們的任務或許會更加自由一些,於是上面的例子就可以寫成這樣:
1@scheduler.scheduled_job(trigger = "interval", args = ("interval",), seconds = 5)
2def now(trigger):
3 print(f"trigger:{trigger} -> {datetime.now()}")
4
5if __name__ == '__main__':
6 scheduler.start()
7
8
運行之後就會在控制檯看到同樣的結果了。
不過需要注意的是,添加任務一定要在 start()
方法執行前調用,否則會找不到任務或是拋出異常。
將 APScheduler 集成到 Web 項目中
如果你是正在做有關的 Web 項目且存在一些定時任務,那麼得益於 APScheduler
由於多樣的調度器,我們能夠將其和我們的項目結合到一起。
如果你正在使用 Flask
,那麼 Flask-APScheduler
這一別人寫好的第三方包裝庫就很適合你,雖然它沒有相關的文檔,但只要你瞭解了前面我所介紹的有關於 APScheduler
的概念和組件,你就能很輕易地看懂這個第三方庫倉庫裏的示例代碼。
如果你使用的不是 Flask 框架,那麼 APScheduler
本身也提供了一些對任務或作業的增刪改查操作,我們可以自己編寫一套合適的 API。
這裏我使用的是 FastAPI 這一目前流行的 Web 框架。demo 項目結構如下:
1temp-scheduler
2├── config.py # 配置項
3├── main.py # API 文件
4└── scheduler.py # APScheduler 相關設置
5
6
安裝依賴
這裏我們需要的依賴不多,只需要簡單幾個即可:
1pip install fastapi apscheduler sqlalchemy uvicorn
2
3
配置項
如果項目中模塊過多,那麼使用一個文件或模塊來進行統一管理是最好的選擇。這裏的 config.py
我們主要像 Flask 的配置那樣簡單設定:
1from apscheduler.executors.pool import ThreadPoolExecutor
2from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
3from apscheduler.schedulers.blocking import BlockingScheduler
4
5class SchedulerConfig:
6
7 JOBSTORES = {"default": SQLAlchemyJobStore(url="sqlite:///job.db")}
8 EXECUTORS = {"default": ThreadPoolExecutor(20)}
9 JOB_DEFAULTS = {"coalesce": False}
10
11 @classmethod
12 def to_dict(cls):
13 return {
14 "jobstores": cls.JOBSTORES,
15 "executors": cls.EXECUTORS,
16 "job_defaults": cls.JOB_DEFAULTS,
17 }
18
19
在 SchedulerConfig
配置項中我們可以自己實現一個 to_dict()
類方法,以便我們後續傳參時通過解包的方式直接傳入配置參數即可。
Scheduler 相關設置
scheduler.py
模塊的設定也比較簡單,即設定對應的 scheduler
調度器即可。由於是演示 demo 我還將要定期執行的任務也放在了這個模塊當中:
1import logging
2from datetime import datetime
3
4from apscheduler.schedulers.background import BackgroundScheduler
5
6from config import SchedulerConfig
7
8scheduler = BackgroundScheduler()
9logger = logging.getLogger(__name__)
10
11def init_scheduler() -> None:
12 # config scheduler
13 scheduler.configure(**SchedulerConfig.to_dict())
14
15 logger.info("scheduler is running...")
16
17 # schedule test
18 scheduler.add_job(
19 func=mytask,
20 trigger="date",
21 args=("APScheduler Initialize.",),
22 next_run_time=datetime.now(),
23 )
24 scheduler.start()
25
26def mytask(message: str) -> None:
27 print(f"[{datetime.now()}] message: {message}")
28
29
在這一部分中:
-
init_scheduler()
方法主要用於在 API 服務啓動時被調用,然後對scheduler
對象的配置以及測試 -
mytask()
則是我們要定期執行的任務,後續我們可以通過 APScheduler 提供的方法來自行添加任務
API 設置
在 main.py
模塊就主要存放着我們由 FastAPI 所構建的相關 API。如果在後續開發時存在多個接口,此時就需要將不同接口放在不同模塊文件中,以達到路由的分發與管理,類似於 Flask 的藍圖模式。
1import logging
2import uuid
3from datetime import datetime
4from typing import Any, Dict, Optional, Sequence, Union
5
6from fastapi import FastAPI
7from pydantic import BaseModel
8
9from scheduler import init_scheduler, mytask, scheduler
10
11logger = logging.getLogger(__name__)
12
13app = FastAPI(title="APScheduler API")
14app.add_event_handler("startup", init_scheduler)
15
16class Job(BaseModel):
17 id: Union[int, str, uuid.UUID]
18 name: Optional[str] = None
19 func: Optional[str] = None
20 args: Optional[Sequence[Optional[str]]] = None
21 kwargs: Optional[Dict[str, Any]] = None
22 executor: Optional[str] = None
23 misfire_grace_time: Optional[str] = None
24 coalesce: Optional[bool] = None
25 max_instances: Optional[int] = None
26 next_run_time: Optional[Union[str, datetime]] = None
27
28@app.post("/add")
29def add_job(
30 message: str,
31 trigger: str,
32 trigger_args: Optional[dict],
33 id: Union[str, int, uuid.UUID],
34):
35 try:
36 scheduler.add_job(
37 func=mytask,
38 trigger=trigger,
39 kwargs={"message": message},
40 id=id,
41 **trigger_args,
42 )
43 except Exception as e:
44 logger.exception(e.args)
45 return {"status_code": 0, "message": "添加失敗"}
46 return {"status_code": 1, "message": "添加成功"}
47
48@app.delete("/delete/{id}")
49def delete_job(id: Union[str, int, uuid.UUID]):
50 """delete exist job by id"""
51 try:
52 scheduler.remove_job(job_id=id)
53 except Exception:
54 return dict(
55 message="刪除失敗",
56 status_code=0,
57 )
58 return dict(
59 message="刪除成功",
60 status_code=1,
61 )
62
63@app.put("/reschedule/{id}")
64def reschedule_job(
65 id: Union[str, int, uuid.UUID], trigger: str, trigger_args: Optional[dict]
66):
67 try:
68 scheduler.reschedule_job(job_id=id, trigger=trigger, **trigger_args)
69 except Exception as e:
70 logger.exception(e.args)
71 return dict(
72 message="修改失敗",
73 status_code=0,
74 )
75 return dict(
76 message="修改成功",
77 status_code=1,
78 )
79
80@app.get("/job")
81def get_all_jobs():
82 jobs = None
83 try:
84 job_list = scheduler.get_jobs()
85 if job_list:
86 jobs = [Job(**task.__getstate__()) for task in job_list]
87 except Exception as e:
88 logger.exception(e.args)
89 return dict(
90 message="查詢失敗",
91 status_code=0,
92 jobs=jobs,
93 )
94 return dict(
95 message="查詢成功",
96 status_code=1,
97 jobs=jobs,
98 )
99
100@app.get("/job/{id}")
101def get_job_by_id(id: Union[int, str, uuid.UUID]):
102 jobs = []
103 try:
104 job = scheduler.get_job(job_id=id)
105 if job:
106 jobs = [Job(**job.__getstate__())]
107 except Exception as e:
108 logger.exception(e.args)
109 return dict(
110 message="查詢失敗",
111 status_code=0,
112 jobs=jobs,
113 )
114 return dict(
115 message="查詢成功",
116 status_code=1,
117 jobs=jobs,
118 )
119
120
以上代碼看起來很多,其實核心的就那麼幾點:
-
FastAPI 對象
app
的初始化。這裏用到的add_event_handler()
方法就有點像 Flask 中的before_first_request
,會在 Web 服務請求伊始進行操作,理解爲初始化相關的操作即可。 -
API 接口路由。路由通過
app
對象下的對應 HTTP 方法來實現,如GET
、POST
、PUT
等。這裏的裝飾器用法其實也和 Flask 很類似,就不多贅述。 -
scheduler
對象的增刪改查。從scheduler.py
模塊中引入我們創建好的scheduler
對象之後就可以直接用來做增刪改查的操作: -
增:使用
add_job()
方法,其主要的參數是要運行的函數(或方法)、觸發器以及觸發器參數等 -
刪:使用
delete_job()
方法,我們需要傳入一個對應任務的id
參數,用以能夠查找到對應的任務 -
改:使用
reschedule_job()
方法,這裏也需要一個對應任務的id
參數,以及需要重新修改的觸發器及其參數 -
查:使用
get_jobs()
和get_job()
兩個方法,前者是直接獲取到當前調度的所有任務,返回的是一個包含了APScheduler.job.Job
對象的列表,而後者是通過id
參數來查找對應的任務對象;這裏我通過底層源碼使用__getstate__()
來獲取到任務的相關信息,這些信息我們通過事先設定好的Job
對象來對其進行序列化,最後將信息從接口中返回。
運行
完成以上的所有操作之後,我們就可以打開控制檯,進入到該目錄下並激活我們的虛擬環境,之後運行:
1uvicorn main:app
2
3
之後我們就能在 FastAPI 默認的地址 http://127.0.0.1:8000/docs
中看到關於全部接口的 Swagger 文檔頁面了:
fastapi 集成的 swagger 頁面
之後我們可以直接在文檔裏面或使用 Postman 來自己進行接口測試即可。
結尾
本文介紹了有關於 APScheduler
框架的概念及其用法,並進行了簡單的實踐。
得益於 APScheduler
的模塊化設計纔可以讓我們更方便地去理解、使用它,並將其運用到我們實際的開發過程中。
從 APScheduler 目前的 Github 倉庫代碼以及 issue 來看,作者已經在開始重構 4.0 版本,當中的一些源代碼和 API 也有較大的變動,相信在 4.0 版本中將會引入更多的新特性。
但如果現階段你正打算使用或已經使用 APScheduler 用於實際生產中,那麼希望本文能對會你有所幫助。
本文所演示的 Web 集成 APScheduler 項目已經上傳到我的碼雲倉庫 fastapi-apscheduler,感興趣的朋友可以隨時借鑑。
作者:100gle,練習時長不到兩年的非正經文科生一枚,喜歡敲代碼、寫寫文章、搗鼓搗鼓各種新事物;現從事有關大數據分析與挖掘的相關工作。
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://mp.weixin.qq.com/s/2dN4aCQpDEN4nkRg80SPYQ