Python 自動化之定時任務

作者:錢魏 Way

原文:https://www.biaodianfu.com/python-schedule.html

在日常工作中,我們常常會用到需要週期性執行的任務,一種方式是採用 Linux 系統自帶的 crond 結合命令行實現。另外一種方式是直接使用 Python。接下來整理的是常見的 Python 定時任務的實現方式。

目錄

利用 while True: + sleep() 實現定時任務

位於 time 模塊中的 sleep(secs) 函數,可以實現令當前執行的線程暫停 secs 秒後再繼續執行。所謂暫停,即令當前線程進入阻塞狀態,當達到 sleep() 函數規定的時間後,再由阻塞狀態轉爲就緒狀態,等待 CPU 調度。

基於這樣的特性我們可以通過 while 死循環 + sleep() 的方式實現簡單的定時任務。

代碼示例:

import datetime
import time
def time_printer():
    now = datetime.datetime.now()
    ts = now.strftime('%Y-%m-%d %H:%M:%S')
    print('do func time :', ts)
def loop_monitor():
    while True:
        time_printer()
        time.sleep(5)  # 暫停5秒
if __name__ == "__main__":
    loop_monitor()

主要缺點:

使用 Timeloop 庫運行定時任務

Timeloop 是一個庫,可用於運行多週期任務。這是一個簡單的庫,它使用 decorator 模式在線程中運行標記函數。

示例代碼:

import time
from timeloop import Timeloop
from datetime import timedelta
tl = Timeloop()
@tl.job(interval=timedelta(seconds=2))
def sample_job_every_2s():
    print "2s job current time : {}".format(time.ctime())
@tl.job(interval=timedelta(seconds=5))
def sample_job_every_5s():
    print "5s job current time : {}".format(time.ctime())
@tl.job(interval=timedelta(seconds=10))
def sample_job_every_10s():
    print "10s job current time : {}".format(time.ctime())

利用 threading.Timer 實現定時任務

threading 模塊中的 Timer 是一個非阻塞函數,比 sleep 稍好一點,timer 最基本理解就是定時器,我們可以啓動多個定時任務,這些定時器任務是異步執行,所以不存在等待順序執行問題。

Timer(interval, function, args=[], kwargs={ })

代碼示例:

import datetime
from threading import Timer
def time_printer():
    now = datetime.datetime.now()
    ts = now.strftime('%Y-%m-%d %H:%M:%S')
    print('do func time :', ts)
    loop_monitor()
def loop_monitor():
    t = Timer(5, time_printer)
    t.start()
if __name__ == "__main__":
    loop_monitor()

備註:Timer 只能執行一次,這裏需要循環調用,否則只能執行一次

利用內置模塊 sched 實現定時任務

sched 模塊實現了一個通用事件調度器,在調度器類使用一個延遲函數等待特定的時間,執行任務。同時支持多線程應用程序,在每個任務執行後會立刻調用延時函數,以確保其他線程也能執行。

class sched.scheduler(timefunc, delayfunc) 這個類定義了調度事件的通用接口,它需要外部傳入兩個參數,timefunc 是一個沒有參數的返回時間類型數字的函數 (常用使用的如 time 模塊裏面的 time),delayfunc 應該是一個需要一個參數來調用、與 timefunc 的輸出兼容、並且作用爲延遲多個時間單位的函數 (常用的如 time 模塊的 sleep)。

代碼示例:

import datetime
import time
import sched
def time_printer():
    now = datetime.datetime.now()
    ts = now.strftime('%Y-%m-%d %H:%M:%S')
    print('do func time :', ts)
    loop_monitor()
def loop_monitor():
    s = sched.scheduler(time.time, time.sleep)  # 生成調度器
    s.enter(5, 1, time_printer, ())
    s.run()
if __name__ == "__main__":
    loop_monitor()

scheduler 對象主要方法:

個人點評:比 threading.Timer 更好,不需要循環調用。

利用調度模塊 schedule 實現定時任務

schedule 是一個第三方輕量級的任務調度模塊,可以按照秒,分,小時,日期或者自定義事件執行時間。schedule 允許用戶使用簡單、人性化的語法以預定的時間間隔定期運行 Python 函數 (或其它可調用函數)。

先來看代碼,是不是不看文檔就能明白什麼意思?

import schedule
import time
def job():
    print("I'm working...")
schedule.every(10).seconds.do(job)
schedule.every(10).minutes.do(job)
schedule.every().hour.do(job)
schedule.every().day.at("10:30").do(job)
schedule.every(5).to(10).minutes.do(job)
schedule.every().monday.do(job)
schedule.every().wednesday.at("13:15").do(job)
schedule.every().minute.at(":17").do(job)
while True:
    schedule.run_pending()
    time.sleep(1)

裝飾器:通過 @repeat() 裝飾靜態方法

import time
from schedule import every, repeat, run_pending
@repeat(every().second)
def job():
    print('working...')
while True:
    run_pending()
    time.sleep(1)

傳遞參數:

import schedule
def greet(name):
    print('Hello', name)
schedule.every(2).seconds.do(greet, name='Alice')
schedule.every(4).seconds.do(greet, name='Bob')
while True:
    schedule.run_pending()

裝飾器同樣能傳遞參數:

from schedule import every, repeat, run_pending
@repeat(every().second, 'World')
@repeat(every().minute, 'Mars')
def hello(planet):
    print('Hello', planet)
while True:
    run_pending()

取消任務:

import schedule
i = 0
def some_task():
    global i
    i += 1
    print(i)
    if i == 10:
        schedule.cancel_job(job)
        print('cancel job')
        exit(0)
job = schedule.every().second.do(some_task)
while True:
    schedule.run_pending()

運行一次任務:

import time
import schedule
def job_that_executes_once():
    print('Hello')
    return schedule.CancelJob
schedule.every().minute.at(':34').do(job_that_executes_once)
while True:
    schedule.run_pending()
    time.sleep(1)

根據標籤檢索任務:

# 檢索所有任務:schedule.get_jobs()
import schedule
def greet(name):
    print('Hello {}'.format(name))
schedule.every().day.do(greet, 'Andrea').tag('daily-tasks', 'friend')
schedule.every().hour.do(greet, 'John').tag('hourly-tasks', 'friend')
schedule.every().hour.do(greet, 'Monica').tag('hourly-tasks', 'customer')
schedule.every().day.do(greet, 'Derek').tag('daily-tasks', 'guest')
friends = schedule.get_jobs('friend')
print(friends)

根據標籤取消任務:

# 取消所有任務:schedule.clear()
import schedule
def greet(name):
    print('Hello {}'.format(name))
    if name == 'Cancel':
        schedule.clear('second-tasks')
        print('cancel second-tasks')
schedule.every().second.do(greet, 'Andrea').tag('second-tasks', 'friend')
schedule.every().second.do(greet, 'John').tag('second-tasks', 'friend')
schedule.every().hour.do(greet, 'Monica').tag('hourly-tasks', 'customer')
schedule.every(5).seconds.do(greet, 'Cancel').tag('daily-tasks', 'guest')
while True:
    schedule.run_pending()

運行任務到某時間:

import schedule
from datetime import datetime, timedelta, time
def job():
    print('working...')
schedule.every().second.until('23:59').do(job)  # 今天23:59停止
schedule.every().second.until('2030-01-01 18:30').do(job)  # 2030-01-01 18:30停止
schedule.every().second.until(timedelta(hours=8)).do(job)  # 8小時後停止
schedule.every().second.until(time(23, 59, 59)).do(job)  # 今天23:59:59停止
schedule.every().second.until(datetime(2030, 1, 1, 18, 30, 0)).do(job)  # 2030-01-01 18:30停止
while True:
    schedule.run_pending()

馬上運行所有任務(主要用於測試):

import schedule
def job():
    print('working...')
def job1():
    print('Hello...')
schedule.every().monday.at('12:40').do(job)
schedule.every().tuesday.at('16:40').do(job1)
schedule.run_all()
schedule.run_all(delay_seconds=3)  # 任務間延遲3秒

並行運行:使用 Python 內置隊列實現:

import threading
import time
import schedule
def job1():
    print("I'm running on thread %s" % threading.current_thread())
def job2():
    print("I'm running on thread %s" % threading.current_thread())
def job3():
    print("I'm running on thread %s" % threading.current_thread())
def run_threaded(job_func):
    job_thread = threading.Thread(target=job_func)
    job_thread.start()
schedule.every(10).seconds.do(run_threaded, job1)
schedule.every(10).seconds.do(run_threaded, job2)
schedule.every(10).seconds.do(run_threaded, job3)
while True:
    schedule.run_pending()
    time.sleep(1)

利用任務框架 APScheduler 實現定時任務

APScheduler(advanceded python scheduler)基於 Quartz 的一個 Python 定時任務框架,實現了 Quartz 的所有功能,使用起來十分方便。提供了基於日期、固定時間間隔以及 crontab 類型的任務,並且可以持久化任務。基於這些功能,我們可以很方便的實現一個 Python 定時任務系統。

它有以下三個特點:

APScheduler 有四種組成部分:

示例代碼:

from apscheduler.schedulers.blocking import BlockingScheduler
from datetime import datetime
# 輸出時間
def job():
    print(datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
# BlockingScheduler
sched = BlockingScheduler()
sched.add_job(my_job, 'interval', seconds=5, id='my_job_id')
sched.start()

APScheduler 中的重要概念

Job 作業

Job 作爲 APScheduler 最小執行單位。創建 Job 時指定執行的函數,函數中所需參數,Job 執行時的一些設置信息。

構建說明:

Trigger 觸發器

Trigger 綁定到 Job,在 scheduler 調度篩選 Job 時,根據觸發器的規則計算出 Job 的觸發時間,然後與當前時間比較確定此 Job 是否會被執行,總之就是根據 trigger 規則計算出下一個執行時間。

目前 APScheduler 支持觸發器:

觸發器參數:date

date 定時,作業只執行一次。

sched.add_job(my_job, 'date', run_date=date(2009, 11, 6), args=['text'])
sched.add_job(my_job, 'date', run_date=datetime(2019, 7, 6, 16, 30, 5), args=['text'])

觸發器參數:interval

interval 間隔調度

sched.add_job(job_function, 'interval', hours=2)

觸發器參數:cron

cron 調度

CronTrigger 可用的表達式:

ptLClN

# 6-8,11-12月第三個週五 00:00, 01:00, 02:00, 03:00運行
sched.add_job(job_function, 'cron', month='6-8,11-12', day='3rd fri', hour='0-3')
# 每週一到週五運行 直到2024-05-30 00:00:00
sched.add_job(job_function, 'cron', day_of_week='mon-fri', hour=5, minute=30, end_date='2024-05-30'

Executor 執行器

Executor 在 scheduler 中初始化,另外也可通過 scheduler 的 add_executor 動態添加 Executor。每個 executor 都會綁定一個 alias,這個作爲唯一標識綁定到 Job,在實際執行時會根據 Job 綁定的 executor 找到實際的執行器對象,然後根據執行器對象執行 Job。

Executor 的種類會根據不同的調度來選擇,如果選擇 AsyncIO 作爲調度的庫,那麼選擇 AsyncIOExecutor,如果選擇 tornado 作爲調度的庫,選擇 TornadoExecutor,如果選擇啓動進程作爲調度,選擇 ThreadPoolExecutor 或者 ProcessPoolExecutor 都可以。

Executor 的選擇需要根據實際的 scheduler 來選擇不同的執行器。目前 APScheduler 支持的 Executor:

Jobstore 作業存儲

Jobstore 在 scheduler 中初始化,另外也可通過 scheduler 的 add_jobstore 動態添加 Jobstore。每個 jobstore 都會綁定一個 alias,scheduler 在 Add Job 時,根據指定的 jobstore 在 scheduler 中找到相應的 jobstore,並將 job 添加到 jobstore 中。作業存儲器決定任務的保存方式, 默認存儲在內存中(MemoryJobStore),重啓後就沒有了。APScheduler 支持的任務存儲器有:

不同的任務存儲器可以在調度器的配置中進行配置(見調度器)

Event 事件

Event 是 APScheduler 在進行某些操作時觸發相應的事件,用戶可以自定義一些函數來監聽這些事件,當觸發某些 Event 時,做一些具體的操作。常見的比如。Job 執行異常事件 EVENT_JOB_ERROR。Job 執行時間錯過事件 EVENT_JOB_MISSED。

目前 APScheduler 定義的 Event:

Listener 表示用戶自定義監聽的一些 Event,比如當 Job 觸發了 EVENT_JOB_MISSED 事件時可以根據需求做一些其他處理。

調度器

Scheduler 是 APScheduler 的核心,所有相關組件通過其定義。scheduler 啓動之後,將開始按照配置的任務進行調度。除了依據所有定義 Job 的 trigger 生成的將要調度時間喚醒調度之外。當發生 Job 信息變更時也會觸發調度。

APScheduler 支持的調度器方式如下,比較常用的爲 BlockingScheduler 和 BackgroundScheduler

Scheduler 的工作流程

Scheduler 添加 job 流程:

Scheduler 調度流程:

使用分佈式消息系統 Celery 實現定時任務

Celery 是一個簡單,靈活,可靠的分佈式系統,用於處理大量消息,同時爲操作提供維護此類系統所需的工具, 也可用於任務調度。Celery 的配置比較麻煩,如果你只是需要一個輕量級的調度工具,Celery 不會是一個好選擇。

Celery 是一個強大的分佈式任務隊列,它可以讓任務的執行完全脫離主程序,甚至可以被分配到其他主機上運行。我們通常使用它來實現異步任務(async task)和定時任務(crontab)。異步任務比如是發送郵件、或者文件上傳, 圖像處理等等一些比較耗時的操作 ,定時任務是需要在特定時間執行的任務。

需要注意,celery 本身並不具備任務的存儲功能,在調度任務的時候肯定是要把任務存起來的,因此在使用 celery 的時候還需要搭配一些具備存儲、訪問功能的工具,比如:消息隊列、Redis 緩存、數據庫等。官方推薦的是消息隊列 RabbitMQ,有些時候使用 Redis 也是不錯的選擇。

它的架構組成如下圖:

Celery 架構,它採用典型的生產者 - 消費者模式,主要由以下部分組成:

實際應用中,用戶從 Web 前端發起一個請求,我們只需要將請求所要處理的任務丟入任務隊列 broker 中,由空閒的 worker 去處理任務即可,處理的結果會暫存在後臺數據庫 backend 中。我們可以在一臺機器或多臺機器上同時起多個 worker 進程來實現分佈式地並行處理任務。

Celery 定時任務實例:

使用數據流工具 Apache Airflow 實現定時任務

Apache Airflow 是 Airbnb 開源的一款數據流程工具,目前是 Apache 孵化項目。以非常靈活的方式來支持數據的 ETL 過程,同時還支持非常多的插件來完成諸如 HDFS 監控、郵件通知等功能。Airflow 支持單機和分佈式兩種模式,支持 Master-Slave 模式,支持 Mesos 等資源調度,有非常好的擴展性。被大量公司採用。

Airflow 使用 Python 開發,它通過 DAGs(Directed Acyclic Graph, 有向無環圖) 來表達一個工作流中所要執行的任務,以及任務之間的關係和依賴。比如,如下的工作流中,任務 T1 執行完成,T2 和 T3 才能開始執行,T2 和 T3 都執行完成,T4 才能開始執行。

Airflow 提供了各種 Operator 實現,可以完成各種任務實現:

除了以上這些 Operators 還可以方便的自定義 Operators 滿足個性化的任務需求。

一些情況下,我們需要根據執行結果執行不同的任務,這樣工作流會產生分支。如:

這種需求可以使用 BranchPythonOperator 來實現。

Airflow 產生的背景

通常,在一個運維繫統,數據分析系統,或測試系統等大型系統中,我們會有各種各樣的依賴需求。包括但不限於:

crontab 可以很好地處理定時執行任務的需求,但僅能管理時間上的依賴。Airflow 的核心概念 DAG(有向無環圖)—— 來表現工作流。

Airflow 核心概念

Airflow 的架構

在一個可擴展的生產環境中,Airflow 含有以下組件:

Worker 的具體實現由配置文件中的 executor 來指定,airflow 支持多種 Executor:

生產環境一般使用 CeleryExecutor 和 KubernetesExecutor。

使用 CeleryExecutor 的架構如圖:

使用 KubernetesExecutor 的架構如圖:

其它參考:

Getting started with Apache Airflow

Understanding Apache Airflow’s key concepts

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/81HaUKI6x44jGvxKhU0Jeg