大規模運行 Apache Airflow 的經驗和教訓

作者 | Sam Wheating Megan Parker

譯者 | Sambodhi

策劃 | 羅燕珊

Apache Airflow 是一個能夠開發、調度和監控工作流的編排平臺。在 Shopify,我們已經在生產中運行了兩年多的 Airflow,用於各種工作流,包括數據提取、機器學習模型訓練、Apache Iceberg 表維護和 DBT 驅動的數據建模。在撰寫本文時,我們正通過 Celery 執行器和 MySQL 8 在 Kubernetes 上來運行 Airflow 2.2。

Shopify 在 Airflow 上的應用規模在過去兩年中急劇擴大。在我們最大的應用場景中,我們使用了 10000 多個 DAG,代表了大量不同的工作負載。在這個場景中,平均有 400 多項任務正在進行,並且每天的運行次數超過 14 萬次。由於 Shopify 的內部採用率越來越高,我們的 Airflow 部署將會產生更多的負載。因爲這樣的迅速增長,我們所面臨的困難包括:文件存取速度太慢、對 DAG(Directed acyclic graph,有向無環圖)能力的控制不足、流量水平的不規則、工作負載之間的資源爭用等等。

接下來,我們將與大家分享我們所獲得的經驗以及我們爲實現大規模運行 Airflow 而構建的解決方案。

使用雲端存儲時,文件存取速度可能會變慢

對於 Airflow 環境的性能和完整性,快速的文件存取速度至關重要。一個清晰的文件存取策略可以保證調度器能夠迅速地對 DAG 文件進行處理,並且讓你的作業保持更新。

通過重複掃描和重新解析配置的 DAG 目錄中的所有文件,可以保持其工作流的內部表示最新。這些文件必須經常掃描,以保持每個工作負載的磁盤數據源和其數據庫內部表示之間的一致性。這就意味着 DAG 目錄的內容必須在單一環境中的所有調度器和工作器之間保持一致(Airflow 提供了幾種方法來實現這一目標)。

在 Shopify 中,我們利用谷歌雲存儲(Google Cloud Storage,GCS)來存儲 DAG。我們最初部署 Airflow 時,利用 GCSFuse 在單一的 Airflow 環境中的所有工作器和調度器來維護一致的文件集。然而,在規模上,這被證明是一個性能瓶頸,因爲每個文件的存取都會引起對 GCS 的請求。由於在環境中的每一個 pod 都需要單獨掛在桶,所以存取量特別大。

經過幾次試驗,我們發現,在 Kubernetes 集羣上運行一個 NFS(Network file system,網絡文件系統)服務器,可以大大改善 Airflow 環境的性能。然後,我們把 NFS 服務器當作一個多讀多寫的卷轉進工作器和調度器的 pod 中。我們編寫了一個自定義腳本,使該卷的狀態與 GCS 同步,因此,當 DAG 被上傳或者管理時,用戶可以與 GCS 進行交互。這個腳本在同一個集羣內的單獨 pod 中運行。這使得我們可以有條件地在給定的桶中僅同步 DAG 的子集,或者根據環境的配置,將多個桶中的 DAG 同步到一個文件系統中(稍後會詳細闡述)。

總而言之,這爲我們提供了快速的文件存取作爲一個穩定的外部數據源,同時保持了我們快速添加或修改 Airflow 中 DAG 文件的能力。另外,我們還可以利用谷歌雲平臺的 IAM(識別和存取管理)功能來控制哪些用戶能夠上傳文件到特定的環境。例如,我們可以讓用戶直接將 DAG 直接上傳到 staging 環境,但將生產環境的上傳限制在我們的持續部署過程中。

在大規模運行 Airflow 時,確保快速文件存取的另一個考慮因素是你的文件處理性能。Airflow 具有高度的可配置性,可以通過多種方法調整後臺文件處理(例如排序模式、並行性和超時)。這使得你可以根據需求優化環境,以實現交互式 DAG 開發或調度器性能。

元數據數量的增加,可能會降低 Airflow 運行效率

在一個正常規模的 Airflow 部署中,由於元數據的數量而造成的性能降低並不是問題,至少在最初的幾年裏是這樣。

但是,從規模上看,元數據正在迅速地累積。一段時間之後,就可能開始對數據庫產生額外的負載。這一點在 Web 用戶界面的加載時間上就可以看得出來,尤其是 Airflow 的更新,在這段時間裏,遷移可能要花費數小時。

經過反覆試驗,我們確定了 28 天的元數據保存策略,並實施了一個簡單的 DAG,在 PythonOperator 中利用 ORM(對象關係映射)查詢,從任何包含歷史數據(DagRuns、TaskInstances、Logs、TaskRetries 等)的表中刪除行。我們之所以選擇 28 天,是因爲它可以讓我們有充足的歷史記錄來管理事件和跟蹤歷史工作績效,同時將數據庫中的數據量保持在合理的水平。

db_clean.py:

import logging
from datetime import datetime, timezone, timedelta
from sqlalchemy import delete
from airflow.models import DAG, Log, DagRun, TaskInstance, TaskReschedule, Variable
from airflow.jobs.base_job import BaseJob
from airflow.utils.dates import days_ago
from airflow.operators.python import PythonOperator
from airflow.utils.state import State
from airflow.utils.session import provide_session
EXPIRATION_WEEKS = 4
@provide_session
def delete_old_database_entries_by_model(table, date_col, session=None):
    expiration_date = datetime.now(timezone.utc) - timedelta(weeks=EXPIRATION_WEEKS)
    query = delete(table).where(date_col < expiration_date)
    if "state" in dir(table):
        query = query.where(State.RUNNING != "state")
    result = session.execute(query)
    logging.info(
        "Deleted %s rows from the database for the %s table that are older than %s.",
        result.rowcount,
        table,
        expiration_date,
    )
def delete_old_database_entries():
    if Variable.get("ENABLE_DB_TRUNCATION", "") != "True":
        logging.warning("This DAG will delete all data older than %s weeks.", EXPIRATION_WEEKS)
        logging.warning("To enable this, create an Airflow Variable called ENABLE_DB_TRUNCATION set to 'True'")
        logging.warning("Skipping truncation until explicitly enabled.")
        return
    delete_old_database_entries_by_model(TaskInstance, TaskInstance.end_date)
    delete_old_database_entries_by_model(DagRun, DagRun.end_date)
    delete_old_database_entries_by_model(BaseJob, BaseJob.end_date)
    delete_old_database_entries_by_model(Log, Log.dttm)
    delete_old_database_entries_by_model(TaskReschedule, TaskReschedule.end_date)
dag = DAG(
    "airflow-utils.truncate-database",
    start_date=days_ago(1),
    max_active_runs=1,
    dagrun_timeout=timedelta(minutes=20),
    schedule_interval="@daily",
    catchup=False,
)
PythonOperator(
    task_id="cleanup-old-database-entries",
    dag=dag,
    python_callable=delete_old_database_entries,
)

遺憾的是,這就意味着,在我們的環境中,Airflow 中的那些依賴於持久作業歷史的特性(例如,長時間的回填)並不被支持。這對我們來說並不是一個問題,但是它有可能會導致問題,這要取決於你的保存期和 Airflow 的使用情況。

作爲自定義 DAG 的另一種方法,Airflow 最近增加了對 db clean 命令的支持,可以用來刪除舊的元數據。這個命令在 Airflow 2.3 版本中可用。

DAG 可能很難與用戶和團隊關聯

在多租戶環境中運行 Airflow 時(尤其是在大型組織中),能夠將 DAG 追溯到個人或團隊是很重要的。爲什麼?因爲如果一個作業失敗了,拋出錯誤或干擾其他工作負載,我們的管理員可以迅速聯繫到合適的用戶。

如果所有的 DAG 都直接從一個倉庫部署,我們可以簡單地使用 git blame 來追蹤工作的所有者。然而,由於我們允許用戶從自己的項目中部署工作負載(甚至在部署時動態生成作業),這就變得更加困難。

爲了方便追蹤 DAG 的來源,我們引入了一個 Airflow 命名空間的註冊表,並將其稱爲 Airflow 環境的清單文件。

sample_airflow_manifest.yaml:

projects:
  defaults: &defaults
    source_repository: 'https://github.com/my_organization/dag_repo'
    dag_source_bucket: 'my_organization_dags'
    constraints: &constraints
      airflow_celery_queues:
        - 'default'
      pools:
        - 'default'
  data_extracts:
    << : *defaults
    owner_email: 'etl-team@my-organization.com'
    source_repository: 'https://github.com/my_organization/airflow_extracts'
    constraints:
      << : *constraints
      namespaces:
        - 'etl-jobs'
      pools:
        - 'extracts'
  batch_processing:
    <<: *defaults
    owner_email: 'spark-team@my-organization.com'
    source_repository: 'https://github.com/Shopify/airflow_batch_jobs'
    constraints:
      <<: *constraints

清單文件是一個 YAML 文件,用戶必須爲他們的 DAG 註冊一個命名空間。在這個文件中,他們將包括作業的所有者和源 github 倉庫(甚至是源 GCS 桶)的信息,以及爲其 DAG 定義一些基本限制。我們爲每個環境維護一個單獨的清單,並將其與 DAG 一起上傳到 GCS。

DAG 作者有很大的權力

通過允許用戶直接編寫和上傳 DAG 到共享環境,我們賦予了他們很大的權力。由於 Airflow 是我們數據平臺的核心組成部分,它與許多不同的系統相聯繫,因此作業有廣泛的訪問權。雖然我們信任我們的用戶,但我們仍然希望對他們在特定的 Airflow 環境中能做什麼和不能做什麼保持一定程度的控制。這一點在規模上尤爲重要,因爲要讓 Airflow 管理員在所有作業進入生產之前對其進行審查是不現實的。

爲了創建一些基本的 “護欄”,我們採用了一個 DAG 策略,它從之前提到的 Airflow 清單中讀取配置,並通過引發 AirflowClusterPolicyViolation 來拒絕那些不符合其命名空間約束的 DAG。

根據清單文件的內容,該策略將對 DAG 文件應用一些基本限制,例如:

下面是一個簡化的例子,演示如何創建一個 DAG 策略,該策略讀取先前共享的清單文件,並實現上述前三項控制:

airflow_local_settings.py:

import os
from typing import List
import yaml
from airflow.exceptions import AirflowClusterPolicyViolation
from airflow.models import DAG
def validate_pools(dag: DAG, pools: List[str]) -> None:
    for task in dag.tasks:
        if task.pool not in pools:
            raise AirflowClusterPolicyViolation(
                f"DAG {dag.dag_id} cannot submit tasks to the pool: {task.pool}"
            )
def validate_queues(dag: DAG, queues: List[str]) -> None:
    for task in dag.tasks:
        if task.queue not in queues:
            raise AirflowClusterPolicyViolation(
                f"DAG {dag.dag_id} cannot submit tasks to the queue: {task.queue}"
            )
def dag_policy(dag: DAG) -> None:
    airflow_home = os.environ.get('AIRFLOW_HOME', '~/airflow')
    manifest_path = f"{airflow_home}/airflow_manifest.yaml"
    with open(manifest_path, "r", encoding="UTF-8") as manifest_file:
        manifest = yaml.safe_load(manifest_file)
    dag_namespace = dag.dag_id.split(".")[0]
    if dag_namespace not in manifest["projects"]:
        raise AirflowClusterPolicyViolation(
            f"Namespace {dag_namespace} is not registered in the Airflow Manifest."
        )
    constraints = manifest["projects"][dag_namespace]["constraints"]
    validate_pools(dag, constraints["pools"])
    validate_queues(dag, constraints["queues"])

這些驗證爲我們提供了足夠的可追溯性,同時也創造了一些基本的控制,從而減少了 DAG 之間的相互干擾能力。

很難確保負載的一致分佈

對你的 DAG 的計劃間隔中使用一個絕對的間隔是很有吸引力的:簡單地設置 DAG 每運行一次 timedelta(hours=1),你就可以放心地離開,因爲你知道 DAG 將大約每小時運行一次。然而,這可能會導致規模上的問題。

當用戶合併大量自動生成的 DAG,或者編寫一個 Python 文件,在解析時生成許多 DAG,所有的 DAGRuns 將在同一時間被創建。這會導致大量的流量,使 Airflow 調度器以及作業所使用的任何外部服務或基礎設施超載,比如 Trino 集羣。

在一個 schedule_interval 通過之後,所有這些作業將在同一時間再次運行,從而導致另一個流量激增。最終,這可能導致資源利用率不理想,執行時間增加。

雖然基於 crontab 的時間表不會導致這種激增,但它們也存在自己的問題。人類偏向於人類可讀的時間表,因此傾向於創建在整點、每小時、每晚的午夜運行的作業,等等。有時候,它可以爲某一特定的應用提供一個合理的理由(比如,我們希望在每個晚上半夜收集前一天的數據),但是我們常常會發現,用戶僅僅希望在一個固定的時間間隔內運行他們的作業。如果用戶可以直接指定自己的 crontab,那麼將會造成流量的激增,這將會對 SLO 造成影響,並且使外部系統的負載不平衡。

作爲這兩個問題的解決方案,我們對所有自動生成的 DAG(代表了我們絕大多數的工作流)使用一個確定性的隨機時間表間隔。這通常是基於一個恆定種子的哈希值,如 dag_id。

下面的片段提供了一個簡單的函數示例,該函數生成確定性的、隨機的 crontab,產生恆定的時間表間隔。遺憾的是,由於並非全部間隔都可以用 crontab 表示,因此它會限制可能的間隔範圍。我們並沒有發現這種有限的時間表間隔的選擇是有侷限性的,在我們確實需要每五小時運行一個作業的情況下,我們只是接受每天會有一個四小時的間隔。

random_airflow_schedule.py:

from hashlib import md5
from random import randint, seed
def compute_schedule(dag_id, interval):
    # create deterministic randomness by seeding PRNG with a hash of the table name:
    seed(md5(dag_id.encode()).hexdigest())
    if interval.endswith("h"):
        val = int(interval[:-1])
        if 24 % val != 0:
            raise ValueError("Must use a number which evenly divides 24.")
        offset = randint(0, val-1)
        minutes = randint(0, 59)
        return f"{minutes} {offset}/{val} * * *"
    elif interval.endswith("m"):
        val = int(interval[:-1])
        if 60 % val != 0:
            raise ValueError("Minutes must use a number which evenly divides 60.")
        offset = randint(0, val-1)
        return f"{offset}/{val} * * * *"
    elif interval == "1d":
        return f"{randint(0, 59)} {randint(0, 23)} * * *"
    raise ValueError("Interval must be (1, 2, 3, 4, 5, 6, 10, 12, 15, 20, 30)m, (1, 2, 3, 4, 6, 12)h or 1d")

得益於我們的隨機時間表的實施,我們能夠大大平滑負載。下圖顯示了在我們最大的單一 Airflow 環境中,每 10 分鐘完成的任務數。

在我們的生產 Airflow 環境中,每 10 分鐘執行一次任務

存在許多資源爭用點

在 Airflow 中,存在着很多可能的資源爭用點,通過一系列實驗性的配置改變,最終很容易出現瓶頸問題。其中一些資源衝突可以在 Airflow 內部處理,而另一些可能需要一些基礎設施的改變。以下是我們在 Shopify 的 Airflow 中處理資源爭用的幾種方法:

減少資源爭用的一種方法是使用 Airflow 池。池用於限制一組特定任務的併發性。這對於減少流量激增引起的中斷非常有用。雖然池是執行任務隔離的有用工具,但由於只有管理員可以通過 Web UI 編輯池,因此在管理上是一個挑戰。

我們編寫了一個自定義的 DAG,通過一些簡單的 ORM 查詢,將我們環境中的池與 Kubernetes Configmao 中指定的狀態同步。這讓我們可以在管理 Airflow 部署配置的同時管理池,並允許用戶通過審查的拉取請求來更新池,而不需要提升訪問權限。

優先級權重

Priority_weight 允許你爲一個給定的任務分配一個更高的優先級。具有較高優先級的任務將 “浮動” 到堆的頂部,被首先安排。雖然不是資源爭用的直接解決方案,但 priority_weight 對於確保延遲敏感的關鍵任務在低優先級任務之前運行是很有用的。然而,鑑於 priority_weight 是一個任意的尺度,如果不與所有其他任務進行比較,就很難確定一個任務的實際優先級。我們用它來確保我們的基本 Airflow 監控 DAG(它發出簡單的指標併爲一些警報提供動力)總是儘可能及時地運行。

同樣值得注意的是,在默認情況下,一個任務在做調度決策時使用的有效 priority_weight 是其自身和所有下游任務的權重之和。這意味着,大 DAG 中的上游任務往往比小 DAG 中的任務更受青睞。因此,使用 priority_weight 需要對環境中運行的其他 DAG 有一定了解。

Celery 隊列和孤立的工作器

如果你需要你的任務在不同的環境中執行(例如,依賴不同的 python 庫,密集型任務有更高的資源允許量,或者不同的存取級別),你可以創建額外的隊列,由作業的一個子集提交任務。然後,單獨的工作集可以被配置爲從單獨的隊列中提取。可以使用運算符中的 queue 參數將任務分配到一個單獨的隊列。要啓動一個從不同隊列運行任務的工作者,可以使用以下命令:

bashAirflow celery worker -queues <list of queues>

這可以幫助確保敏感或高優先級的工作負載有足夠的資源,因爲它們不會與其他工作負載競爭工作者的能力。

池、優先權和隊列的任何組合在減少資源爭用方面都是有用的。雖然池允許限制單個工作負載內的併發性,但 priority_weight 可以用來使單個任務以比其他任務更低的延遲運行。如果你需要更多的靈活性,工作者隔離可以對執行任務的環境進行細粒度的控制。

重要的是要記住,並不是所有的資源都可以在 Airflow 中被仔細分配:調度器吞吐量、數據庫容量和 Kubernetes IP 空間都是有限的資源,如果不創建隔離環境,就無法在每個工作負載的基礎上進行限制。

展望

以如此高的吞吐量運行 Airflow,需要考慮很多因素,任何解決方案的組合都是有用的。我們已經學到了很多,我們希望你能記住這些教訓,並在你自己的 Airflow 基礎設施和工具中應用我們的一些解決方案。

總結一下我們的主要收穫:

作者簡介:

Megan Parker,供職於 Shopify 的數據平臺團隊,致力於增強 Airflow 和 Trino 的用戶體驗,居住加拿大多倫多,愛好戶外活動,尤其是自行車和徒步運動。

Sam Wheating,來自加拿大不列顛哥倫比亞省溫哥華的高級開發人員。供職於 Shopify 的數據基礎設施和引擎基礎團隊。他是開源軟件的內部倡導者,也是 Apache Airflow 項目的貢獻者。

原文鏈接:

https://shopify.engineering/lessons-learned-apache-airflow-scale#circle=on

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/_uEHwq5fSxpCxzmbtqu8Gg