Python 實現定時任務的八種方案!

在日常工作中,我們常常會用到需要週期性執行的任務,一種方式是採用 Linux 系統自帶的 crond[1] 結合命令行實現。另外一種方式是直接使用 Python。接下里整理的是常見的 Python 定時任務的實現方式。

img

利用 while True: + sleep() 實現定時任務

位於 time 模塊中的 sleep(secs) 函數,可以實現令當前執行的線程暫停 secs 秒後再繼續執行。所謂暫停,即令當前線程進入阻塞狀態,當達到 sleep() 函數規定的時間後,再由阻塞狀態轉爲就緒狀態,等待 CPU 調度。

基於這樣的特性我們可以通過 while 死循環 + sleep() 的方式實現簡單的定時任務。

代碼示例:

import datetime

import time

def time_printer():

    now = datetime.datetime.now()

    ts = now.strftime('%Y-%m-%d %H:%M:%S')

    print('do func time :', ts)

def loop_monitor():

    while True:

        time_printer()

        time.sleep(5)  # 暫停 5 秒

if __name__ == "__main__":

    loop_monitor()

主要缺點:

使用 Timeloop 庫運行定時任務

Timeloop[2] 是一個庫,可用於運行多週期任務。這是一個簡單的庫,它使用 decorator 模式在線程中運行標記函數。

示例代碼:

import time

from timeloop import Timeloop

from datetime import timedelta

tl = Timeloop()

@tl.job(interval=timedelta(seconds=2))

def sample_job_every_2s():

    print "2s job current time : {}".format(time.ctime())

@tl.job(interval=timedelta(seconds=5))

def sample_job_every_5s():

    print "5s job current time : {}".format(time.ctime())

@tl.job(interval=timedelta(seconds=10))

def sample_job_every_10s():

    print "10s job current time : {}".format(time.ctime())

利用 threading.Timer 實現定時任務

threading 模塊中的 Timer 是一個非阻塞函數,比 sleep 稍好一點,timer 最基本理解就是定時器,我們可以啓動多個定時任務,這些定時器任務是異步執行,所以不存在等待順序執行問題。

代碼示例:

import datetime

from threading import Timer

def time_printer():

    now = datetime.datetime.now()

    ts = now.strftime('%Y-%m-%d %H:%M:%S')

    print('do func time :', ts)

    loop_monitor()

def loop_monitor():

    t = Timer(5, time_printer)

    t.start()

if __name__ == "__main__":

    loop_monitor()

備註:Timer 只能執行一次,這裏需要循環調用,否則只能執行一次

利用內置模塊 sched 實現定時任務

代碼示例:

import datetime

import time

import sched

def time_printer():

    now = datetime.datetime.now()

    ts = now.strftime('%Y-%m-%d %H:%M:%S')

    print('do func time :', ts)

    loop_monitor()

def loop_monitor():

    s = sched.scheduler(time.time, time.sleep)  # 生成調度器

    s.enter(5, 1, time_printer, ())

    s.run()

if __name__ == "__main__":

    loop_monitor()

scheduler 對象主要方法:

個人點評:比 threading.Timer 更好,不需要循環調用。

利用調度模塊 schedule 實現定時任務

schedule[3] 是一個第三方輕量級的任務調度模塊,可以按照秒,分,小時,日期或者自定義事件執行時間。schedule[4] 允許用戶使用簡單、人性化的語法以預定的時間間隔定期運行 Python 函數(或其它可調用函數)。

先來看代碼,是不是不看文檔就能明白什麼意思?

import schedule

import time

def job():

    print("I'm working...")

schedule.every(10).seconds.do(job)

schedule.every(10).minutes.do(job)

schedule.every().hour.do(job)

schedule.every().day.at("10:30").do(job)

schedule.every(5).to(10).minutes.do(job)

schedule.every().monday.do(job)

schedule.every().wednesday.at("13:15").do(job)

schedule.every().minute.at(":17").do(job)

while True:

    schedule.run_pending()

    time.sleep(1)

裝飾器:通過 @repeat() 裝飾靜態方法

import time

from schedule import every, repeat, run_pending

@repeat(every().second)

def job():

    print('working...')

while True:

    run_pending()

    time.sleep(1)

傳遞參數:

import schedule

def greet(name):

    print('Hello', name)

schedule.every(2).seconds.do(greet, name='Alice')

schedule.every(4).seconds.do(greet, name='Bob')

while True:

    schedule.run_pending()

裝飾器同樣能傳遞參數:

from schedule import every, repeat, run_pending

@repeat(every().second, 'World')

@repeat(every().minute, 'Mars')

def hello(planet):

    print('Hello', planet)

while True:

    run_pending()

取消任務:

import schedule

i = 0

def some_task():

    global i

    i += 1

    print(i)

    if i == 10:

        schedule.cancel_job(job)

        print('cancel job')

        exit(0)

job = schedule.every().second.do(some_task)

while True:

    schedule.run_pending()

運行一次任務:

import time

import schedule

def job_that_executes_once():

    print('Hello')

    return schedule.CancelJob

schedule.every().minute.at(':34').do(job_that_executes_once)

while True:

    schedule.run_pending()

    time.sleep(1)

根據標籤檢索任務:

# 檢索所有任務:schedule.get_jobs()

import schedule

def greet(name):

    print('Hello {}'.format(name))

schedule.every().day.do(greet, 'Andrea').tag('daily-tasks''friend')

schedule.every().hour.do(greet, 'John').tag('hourly-tasks''friend')

schedule.every().hour.do(greet, 'Monica').tag('hourly-tasks''customer')

schedule.every().day.do(greet, 'Derek').tag('daily-tasks''guest')

friends = schedule.get_jobs('friend')

print(friends)

根據標籤取消任務:

# 取消所有任務:schedule.clear()

import schedule

def greet(name):

    print('Hello {}'.format(name))

    if name == 'Cancel':

        schedule.clear('second-tasks')

        print('cancel second-tasks')

schedule.every().second.do(greet, 'Andrea').tag('second-tasks''friend')

schedule.every().second.do(greet, 'John').tag('second-tasks''friend')

schedule.every().hour.do(greet, 'Monica').tag('hourly-tasks''customer')

schedule.every(5).seconds.do(greet, 'Cancel').tag('daily-tasks''guest')

while True:

    schedule.run_pending()

運行任務到某時間:

import schedule

from datetime import datetime, timedelta, time

def job():

    print('working...')

schedule.every().second.until('23:59').do(job)  # 今天 23:59 停止

schedule.every().second.until('2030-01-01 18:30').do(job)  # 2030-01-01 18:30 停止

schedule.every().second.until(timedelta(hours=8)).do(job)  # 8 小時後停止

schedule.every().second.until(time(23, 59, 59)).do(job)  # 今天 23:59:59 停止

schedule.every().second.until(datetime(2030, 1, 1, 18, 30, 0)).do(job)  # 2030-01-01 18:30 停止

while True:

    schedule.run_pending()

馬上運行所有任務(主要用於測試):

import schedule

def job():

    print('working...')

def job1():

    print('Hello...')

schedule.every().monday.at('12:40').do(job)

schedule.every().tuesday.at('16:40').do(job1)

schedule.run_all()

schedule.run_all(delay_seconds=3)  # 任務間延遲 3 秒

並行運行:使用 Python 內置隊列實現:

import threading

import time

import schedule

def job1():

    print("I'm running on thread %s" % threading.current_thread())

def job2():

    print("I'm running on thread %s" % threading.current_thread())

def job3():

    print("I'm running on thread %s" % threading.current_thread())

def run_threaded(job_func):

    job_thread = threading.Thread(target=job_func)

    job_thread.start()

schedule.every(10).seconds.do(run_threaded, job1)

schedule.every(10).seconds.do(run_threaded, job2)

schedule.every(10).seconds.do(run_threaded, job3)

while True:

    schedule.run_pending()

    time.sleep(1)

利用任務框架 APScheduler 實現定時任務

APScheduler[5](advanceded python scheduler)基於 Quartz 的一個 Python 定時任務框架,實現了 Quartz 的所有功能,使用起來十分方便。提供了基於日期、固定時間間隔以及 crontab 類型的任務,並且可以持久化任務。基於這些功能,我們可以很方便的實現一個 Python 定時任務系統。

它有以下三個特點:

APScheduler 有四種組成部分:

示例代碼:

from apscheduler.schedulers.blocking import BlockingScheduler

from datetime import datetime

# 輸出時間

def job():

    print(datetime.now().strftime("%Y-%m-%d %H:%M:%S"))

# BlockingScheduler

sched = BlockingScheduler()

sched.add_job(my_job, 'interval'seconds=5, id='my_job_id')

sched.start()

APScheduler 中的重要概念

Job 作業

Job 作爲 APScheduler 最小執行單位。創建 Job 時指定執行的函數,函數中所需參數,Job 執行時的一些設置信息。

構建說明:

Trigger 觸發器

Trigger 綁定到 Job,在 scheduler 調度篩選 Job 時,根據觸發器的規則計算出 Job 的觸發時間,然後與當前時間比較確定此 Job 是否會被執行,總之就是根據 trigger 規則計算出下一個執行時間。

目前 APScheduler 支持觸發器:

觸發器參數:date

date 定時,作業只執行一次。

sched.add_job(my_job, 'date'run_date=date(2009, 11, 6)args=['text'])

sched.add_job(my_job, 'date'run_date=datetime(2019, 7, 6, 16, 30, 5)args=['text'])

觸發器參數:interval

interval 間隔調度

sched.add_job(job_function, 'interval'hours=2)

觸發器參數:cron

cron 調度

CronTrigger 可用的表達式:

hhOXu7

# 6-8,11-12 月第三個週五 00:00, 01:00, 02:00, 03:00 運行

sched.add_job(job_function, 'cron'month='6-8,11-12'day='3rd fri'hour='0-3')

# 每週一到週五運行 直到 2024-05-30 00:00:00

sched.add_job(job_function, 'cron'day_of_week='mon-fri'hour=5, minute=30, end_date='2024-05-30'

Executor 執行器

Executor 在 scheduler 中初始化,另外也可通過 scheduler 的 add_executor 動態添加 Executor。每個 executor 都會綁定一個 alias,這個作爲唯一標識綁定到 Job,在實際執行時會根據 Job 綁定的 executor 找到實際的執行器對象,然後根據執行器對象執行 Job。Executor 的種類會根據不同的調度來選擇,如果選擇 AsyncIO 作爲調度的庫,那麼選擇 AsyncIOExecutor,如果選擇 tornado 作爲調度的庫,選擇 TornadoExecutor,如果選擇啓動進程作爲調度,選擇 ThreadPoolExecutor 或者 ProcessPoolExecutor 都可以。Executor 的選擇需要根據實際的 scheduler 來選擇不同的執行器。目前 APScheduler 支持的 Executor:

Jobstore 作業存儲

Jobstore 在 scheduler 中初始化,另外也可通過 scheduler 的 add_jobstore 動態添加 Jobstore。每個 jobstore 都會綁定一個 alias,scheduler 在 Add Job 時,根據指定的 jobstore 在 scheduler 中找到相應的 jobstore,並將 job 添加到 jobstore 中。作業存儲器決定任務的保存方式, 默認存儲在內存中(MemoryJobStore),重啓後就沒有了。APScheduler 支持的任務存儲器有:

不同的任務存儲器可以在調度器的配置中進行配置(見調度器)

Event 事件

Event 是 APScheduler 在進行某些操作時觸發相應的事件,用戶可以自定義一些函數來監聽這些事件,當觸發某些 Event 時,做一些具體的操作。常見的比如。Job 執行異常事件 EVENT_JOB_ERROR。Job 執行時間錯過事件 EVENT_JOB_MISSED。

目前 APScheduler 定義的 Event:

Listener 表示用戶自定義監聽的一些 Event,比如當 Job 觸發了 EVENT_JOB_MISSED 事件時可以根據需求做一些其他處理。

調度器

Scheduler 是 APScheduler 的核心,所有相關組件通過其定義。scheduler 啓動之後,將開始按照配置的任務進行調度。除了依據所有定義 Job 的 trigger 生成的將要調度時間喚醒調度之外。當發生 Job 信息變更時也會觸發調度。

APScheduler 支持的調度器方式如下,比較常用的爲 BlockingScheduler 和 BackgroundScheduler

Scheduler 的工作流程

Scheduler 添加 job 流程:

Scheduler 調度流程:

使用分佈式消息系統 Celery 實現定時任務

Celery[6] 是一個簡單,靈活,可靠的分佈式系統,用於處理大量消息,同時爲操作提供維護此類系統所需的工具,也可用於任務調度。Celery 的配置比較麻煩,如果你只是需要一個輕量級的調度工具,Celery 不會是一個好選擇。

Celery 是一個強大的分佈式任務隊列,它可以讓任務的執行完全脫離主程序,甚至可以被分配到其他主機上運行。我們通常使用它來實現異步任務(async task)和定時任務(crontab)。異步任務比如是發送郵件、或者文件上傳,圖像處理等等一些比較耗時的操作 ,定時任務是需要在特定時間執行的任務。

需要注意,celery 本身並不具備任務的存儲功能,在調度任務的時候肯定是要把任務存起來的,因此在使用 celery 的時候還需要搭配一些具備存儲、訪問功能的工具,比如:消息隊列、Redis 緩存、數據庫等。官方推薦的是消息隊列 RabbitMQ,有些時候使用 Redis 也是不錯的選擇。

它的架構組成如下圖:

Celery 架構,它採用典型的生產者 - 消費者模式,主要由以下部分組成:

實際應用中,用戶從 Web 前端發起一個請求,我們只需要將請求所要處理的任務丟入任務隊列 broker 中,由空閒的 worker 去處理任務即可,處理的結果會暫存在後臺數據庫 backend 中。我們可以在一臺機器或多臺機器上同時起多個 worker 進程來實現分佈式地並行處理任務。

Celery 定時任務實例:

使用數據流工具 Apache Airflow 實現定時任務

Apache Airflow[9] 是 Airbnb 開源的一款數據流程工具,目前是 Apache 孵化項目。以非常靈活的方式來支持數據的 ETL 過程,同時還支持非常多的插件來完成諸如 HDFS 監控、郵件通知等功能。Airflow 支持單機和分佈式兩種模式,支持 Master-Slave 模式,支持 Mesos 等資源調度,有非常好的擴展性。被大量公司採用。

Airflow 使用 Python 開發,它通過 DAGs(Directed Acyclic Graph, 有向無環圖)來表達一個工作流中所要執行的任務,以及任務之間的關係和依賴。比如,如下的工作流中,任務 T1 執行完成,T2 和 T3 才能開始執行,T2 和 T3 都執行完成,T4 才能開始執行。

Airflow 提供了各種 Operator 實現,可以完成各種任務實現:

除了以上這些 Operators 還可以方便的自定義 Operators 滿足個性化的任務需求。

一些情況下,我們需要根據執行結果執行不同的任務,這樣工作流會產生分支。如:

這種需求可以使用 BranchPythonOperator 來實現。

Airflow 產生的背景

通常,在一個運維繫統,數據分析系統,或測試系統等大型系統中,我們會有各種各樣的依賴需求。包括但不限於:

crontab 可以很好地處理定時執行任務的需求,但僅能管理時間上的依賴。Airflow 的核心概念 DAG(有向無環圖)—— 來表現工作流。

Airflow 核心概念

Airflow 的架構

在一個可擴展的生產環境中,Airflow 含有以下組件:

Worker 的具體實現由配置文件中的 executor 來指定,airflow 支持多種 Executor:

生產環境一般使用 CeleryExecutor 和 KubernetesExecutor。

使用 CeleryExecutor 的架構如圖:

使用 KubernetesExecutor 的架構如圖:

其他參考:

參考資料

[1] Linux 系統自帶的 crond: https://www.biaodianfu.com/crontab.html

[2] Timeloop: https://github.com/sankalpjonn/timeloop

[3] schedule: https://github.com/dbader/schedule

[4] schedule: https://schedule.readthedocs.io/en/stable/

[5] APScheduler: https://github.com/agronholm/apscheduler

[6] Celery: https://github.com/celery/celery

[7] Python Celery & RabbitMQ Tutorial: https://tests4geeks.com/blog/python-celery-rabbitmq-tutorial/

[8] Celery 配置實踐筆記: https://github.com/biaodianfu/celery-demo

[9] Apache Airflow: https://airflow.apache.org/

[10] Dask: https://distributed.dask.org/en/latest/

[11] Getting started with Apache Airflow: https://towardsdatascience.com/getting-started-with-apache-airflow-df1aa77d7b1b

[12] Understanding Apache Airflow’s key concepts: https://medium.com/@dustinstansbury/understanding-apache-airflows-key-concepts-a96efed52b1a

轉自:www.biaodianfu.com/python-schedule.html

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/R711eavHKBbvPlZXtUZ8Dg