Apache Airflow 2.x 深度指南:用 Python 编排一切的现代化工作流引擎

Source

一、什么是 Apache Airflow

Apache Airflow 是一个由 Airbnb 于 2014 年开源、2016 年进入 Apache 孵化器的工作流编排平台。它的核心理念可以用一句话概括:用 Python 代码定义、调度和监控你的工作流

与 shell 脚本或 crontab 定时任务不同,Airflow 将工作流抽象为有向无环图(DAG),提供了任务间的依赖管理、失败重试、可视化监控等一整套生产能力。

核心概念速览

Airflow 的架构围绕四个基础概念构建:

概念 含义 类比
DAG (有向无环图) 工作流的完整定义,由 @dag 装饰器或 DAG() 构造函数创建 一次"项目"的蓝图
Task (任务) DAG 中的一个最小执行单元 蓝图里的一个步骤
Operator (操作器) 定义 Task 具体做什么——可以是 Bash 命令、Python 函数、SQL 查询、Spark 作业等 每个步骤的动作模板
Scheduler (调度器) 持续轮询 DAG 目录,解析 DAG 文件,将到期 Task 放入执行队列 整个系统的"大脑"

除上述核心概念外,Airflow 的架构还包括 Executor(决定 Task 以何种方式运行,如本地进程、Celery 分布式或多容器 Kubernetes)、Web Server(提供 UI 监控与交互)以及 Metastore(PostgreSQL / MySQL,存储所有元数据与运行状态)。

一个最小的 Airflow DAG 长这样:

from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime

# 定义一个 DAG,每天凌晨 2 点运行,带有基础重试机制
with DAG(
    dag_id="hello_airflow",
    start_date=datetime(2025, 1, 1),
    schedule_interval="0 2 * * *",
    catchup=False,
    tags=["demo"],
) as dag:

    task_say_hello = BashOperator(
        task_id="say_hello",
        bash_command='echo "Hello from Airflow 2.x!"',
    )

这就是 Airflow 的魅力:工作流就是代码,可版本控制、可代码审查、可单元测试。


二、使用优点

1. Python 原生定义,零学习曲线

Airflow DAG 文件就是标准的 Python 脚本。你可以在 DAG 定义中使用任何 Python 语法——for 循环动态生成 Task、从配置文件读取参数、用 Jinja 模板注入变量。这意味着:

  • 无 DSL 学习成本:数据工程师的 Python 技能直接复用。
  • 无黑盒配置:所有逻辑都在代码中显式呈现,CR 一目了然。
  • 动态 DAG 生成:可以从 YAML / JSON 配置文件批量生成几十甚至上百个结构相似的 DAG。
# 动态生成多个 Task 的示例
task_list = []
for table in ["users", "orders", "products"]:
    task_list.append(
        BashOperator(
            task_id=f"backup_{table}",
            bash_command=f"pg_dump -t {table} > /backup/{table}.sql",
        )
    )
# 按索引建立顺序依赖
for i in range(len(task_list) - 1):
    task_list[i] >> task_list[i + 1]

2. 丰富且活跃的 Provider 生态

Airflow 2.x 引入了 Provider 包 机制,将各类第三方集成从核心仓库中解耦,独立发布和升级。目前已有超过 80+ 个官方 Provider:

  • 云平台:AWS、Azure、GCP、阿里云
  • 数据库:PostgreSQL、MySQL、Snowflake、BigQuery、ClickHouse
  • 计算引擎:Spark、Kubernetes、Docker、Databricks
  • 消息队列:Kafka、RabbitMQ、SQS

一个 Provider 包的安装可以极度精简:pip install apache-airflow-providers-amazon,即可在 DAG 中直接使用 S3Hook、EmrOperator 等组件。这套机制同时解决了旧版本"装一个 Airflow 附带 500 个依赖"的痛点。

3. 强大的可视化监控

Airflow Web UI 是业界公认的"金牌体验",2.x 版本中新增的 Grid View 更是将监控效率提升了一个档次:

  • Tree / Graph / Grid View:从不同维度观察 DAG 运行拓扑与历史状态。
  • 甘特图:一眼定位哪个 Task 是性能瓶颈。
  • Landing Time:追踪数据实际到达时间与期望时间的延迟。
  • Task 级别操作:直接在 UI 上 Clear、Retry、Mark Success / Failed,无需登录服务器。

在 2025 年的版本路线中,Airflow 正在向 Data-Aware Scheduling (AIP-48) 迈进——UI 将不仅展示 Task 状态,还会显示"本次运行是由哪个上游数据资产触发",进一步打通可观测性闭环。

4. 灵活到极致的调度能力

Airflow 支持多种调度触发方式,远超传统 cron 表达式的范畴:

调度方式 说明 示例
Cron 表达式 最经典的时间调度 0 6 * * 1-5(工作日早 6 点)
Timedelta 固定间隔,从 start_date 累加 datetime.timedelta(hours=4)
Dataset (AIP-48) 数据驱动的调度——"当某表有更新时触发" schedule=[Dataset("s3://bucket/sales/")]
External Trigger 由 API / CLI / 上游 DAG 主动触发 airflow dags trigger my_dag
Sensor 等待外部条件满足后继续 ExternalTaskSensor、S3KeySensor

Dataset 机制是 Airflow 2.4+ 最大亮点之一:它让跨 DAG 的依赖从"时间耦合"变成了"数据耦合"。两个独立团队分别维护的 DAG,只要声明对同一个 Dataset 的产消关系,Airflow 就能自动串起整个链路。

5. 弹性可扩展的架构设计

Airflow 的 Executor 模型支持从单机到超大规模集群的平滑演进:

本地开发: SequentialExecutor / LocalExecutor(单机多进程)
     ↓
中等规模: CeleryExecutor(多机分布式,Redis/RabbitMQ 做消息队列)
     ↓
超大规模: KubernetesExecutor(每个 Task 跑在独立 Pod 中,资源完全隔离)
     ↓
混合架构: 2025 年路线图中的 Edge Worker (AIP-72),
         可实现跨 VPC / 跨云的远程任务执行

关键点在于:切换 Executor 只需修改 airflow.cfg 一个配置项,DAG 代码完全不用改。这意味着团队可以从单机起步,等业务增长后再无缝迁到分布式架构。


三、使用场景

场景 1:数据 ETL / ELT 管道

这是 Airflow 最经典的主场。假设电商平台每天需要:

  1. 从 MySQL 抽取前一日的订单数据
  2. 在 Spark 中做聚合计算
  3. 将结果写入 ClickHouse 供 BI 查询
  4. 数据写入成功后通知下游报表系统

整条链路可以组织为一个 DAG,依赖关系和错误处理完全自动化。

场景 2:机器学习 Pipeline

模型训练不是一步到位的,而是"数据拉取 → 特征工程 → 训练 → 评估 → 部署"的级联任务。Airflow 可以将这些步骤编排在一起,并利用 BranchPythonOperator 实现条件分支——例如评估指标不达标时自动走"回退到旧模型"的分支。

结合 KubernetesPodOperator,每个训练步骤运行在独立的 GPU Pod 中,训练完即释放资源,成本可控。

场景 3:报表自动化

某金融机构每天需要生成上百份客户持仓报表,PDF 格式,通过邮件分发。传统做法是用 shell 脚本跑一堆 R / Python 脚本,出错了靠人工排查。迁移到 Airflow 后:

  • 每个报告生成步骤是一个 Task,失败自动重试并通知。
  • 使用 EmailOperator 在 DAG 末尾统一发送。
  • 耗时统计直接看甘特图,优化有据可依。

场景 4:DevOps 运维自动化

Airflow 不仅可以编排数据任务,也能编排基础设施操作:

  • 定时执行数据库备份 → 上传到 S3 → 清理过期备份。
  • 每月自动生成 SSL 证书过期清单,通知运维团队。
  • 大促前批量扩容 K8s 集群节点,大促结束后缩容。

场景 5:数据质量监控

结合 SQLCheckOperator 或 Great Expectations,可以构建数据质量监控 DAG:

每日凌晨 4 点:
  → 检查核心表行数是否 > 0
  → 检查关键字段空值率是否 < 阈值
  → 检查数值字段分布是否偏离历史基准
  → 任一检查失败 → 阻断下游 DAG + 发送告警

四、具体使用方式

4.1 安装

推荐使用 pip 配合约束文件安装,避免依赖冲突:

# 设置 Airflow 版本
AIRFLOW_VERSION=2.9.3
PYTHON_VERSION="$(python --version | cut -d ' ' -f 2 | cut -d '.' -f 1-2)"
CONSTRAINT_URL="https://raw.githubusercontent.com/apache/airflow/constraints-${AIRFLOW_VERSION}/constraints-${PYTHON_VERSION}.txt"

pip install "apache-airflow==${AIRFLOW_VERSION}" --constraint "${CONSTRAINT_URL}"

初始化数据库并创建管理员用户:

airflow db init
airflow users create \
    --username admin \
    --firstname Admin \
    --lastname User \
    --role Admin \
    --email admin@example.com

启动所有组件(开发环境):

airflow standalone  # 一键启动 WebServer + Scheduler + 初始化

生产环境建议将 WebServer、Scheduler、Worker 拆分部署,并使用 PostgreSQL 替代默认的 SQLite。

4.2 实战:构建一个完整的数据管道 DAG

以下代码展示了一个典型的"API 拉取 → 数据清洗 → 入库 → 质量检查 → 通知"管道:

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator
from airflow.providers.postgres.operators.postgres import PostgresOperator
from airflow.utils.dates import days_ago
from datetime import timedelta
import requests
import json

# ---------- 业务逻辑函数 ----------
def fetch_api_data(**context):
    """从公开 API 拉取数据并写入临时文件"""
    response = requests.get("https://jsonplaceholder.typicode.com/posts")
    response.raise_for_status()
    posts = response.json()

    # 将数据作为 XCom 传递给下游 Task
    file_path = "/tmp/posts.json"
    with open(file_path, "w") as f:
        json.dump(posts, f)
    context["task_instance"].xcom_push(key="data_file", value=file_path)
    print(f"Fetched {len(posts)} posts.")

def clean_data(**context):
    """读取原始数据,清洗后写回"""
    file_path = context["task_instance"].xcom_pull(
        key="data_file", task_ids="fetch_data"
    )
    with open(file_path, "r") as f:
        raw = json.load(f)

    cleaned = [
        {
            "id": p["id"],
            "user_id": p["userId"],
            "title": p["title"].strip(),
            "body_length": len(p["body"]),
        }
        for p in raw
    ]

    clean_path = "/tmp/posts_clean.json"
    with open(clean_path, "w") as f:
        json.dump(cleaned, f)
    print(f"Cleaned {len(cleaned)} records.")

# ---------- DAG 定义 ----------
default_args = {
    "owner": "data-team",
    "retries": 3,
    "retry_delay": timedelta(minutes=5),
    "email_on_failure": True,
    "email": ["data-alert@company.com"],
}

with DAG(
    dag_id="data_pipeline_demo",
    default_args=default_args,
    description="An end-to-end data pipeline: fetch -> clean -> load -> check",
    schedule_interval="0 5 * * *",      # 每天凌晨 5 点
    start_date=days_ago(1),
    catchup=False,
    tags=["production", "etl"],
) as dag:

    start = BashOperator(
        task_id="start_pipeline",
        bash_command='echo "Pipeline started at $(date)"',
    )

    fetch_data = PythonOperator(
        task_id="fetch_data",
        python_callable=fetch_api_data,
    )

    clean = PythonOperator(
        task_id="clean_data",
        python_callable=clean_data,
    )

    # 注意:这里仅展示语法,实际需要目标表提前建好
    create_table = PostgresOperator(
        task_id="create_table_if_not_exists",
        postgres_conn_id="my_postgres",
        sql="""
            CREATE TABLE IF NOT EXISTS public.posts (
                id INT PRIMARY KEY,
                user_id INT,
                title TEXT,
                body_length INT
            );
        """,
    )

    load_data = BashOperator(
        task_id="load_to_postgres",
        bash_command="""
            echo "Data loading simulation: $(wc -c < /tmp/posts_clean.json) bytes ready"
        """,
    )

    quality_check = BashOperator(
        task_id="quality_check",
        bash_command="""
            count=$(python -c "import json; print(len(json.load(open('/tmp/posts_clean.json'))))")
            if [ "$count" -lt 10 ]; then
                echo "ERROR: Too few records!"
                exit 1
            fi
            echo "Quality check passed: $count records"
        """,
    )

    notify = BashOperator(
        task_id="notify_success",
        bash_command='echo "Pipeline completed successfully!"',
    )

    # 声明 Task 之间的依赖关系
    start >> fetch_data >> clean >> [create_table, quality_check]
    create_table >> load_data
    [load_data, quality_check] >> notify

这个 DAG 展示了 Airflow 2.x 中最常用的几种模式:

  • PythonOperator:运行自定义 Python 函数,适合灵活的业务逻辑。
  • PostgresOperator:原生 SQL 执行,支持 postgres_conn_id 引用 Connection 配置。
  • XCom:Task 间轻量数据传递(xcom_push / xcom_pull),适合传文件路径、ID 等小数据。大数据量请使用外部存储(S3、共享文件系统)。
  • 依赖声明:>> 运算符语法糖清晰表达上下游关系,[a, b] >> c 表示 a 和 b 都成功后触发 c。

4.3 TaskFlow API:更 Pythonic 的写法

Airflow 2.0 引入的 TaskFlow API 将 Python 函数直接映射为 Task,消除了 XCom 的显式 push/pull 模板代码:

from airflow.decorators import dag, task
from datetime import datetime

@dag(
    schedule_interval=None,
    start_date=datetime(2025, 1, 1),
    catchup=False,
    tags=["taskflow"],
)
def taskflow_demo():
    @task
    def extract():
        """从多个数据源拉取,返回一个列表"""
        return [100, 200, 300, 400]

    @task
    def transform(raw_numbers: list):
        """对每个元素做计算"""
        return [x * 1.2 for x in raw_numbers]

    @task
    def load(processed: list):
        """写入目标系统"""
        print(f"Loading {len(processed)} records: {processed}")

    # 数据流向即依赖关系
    raw = extract()
    transformed = transform(raw)
    load(transformed)

taskflow_demo()

TaskFlow API 通过 Python 类型注解和函数返回值自动完成数据流转,让 DAG 代码简洁到几乎看不出框架痕迹。对于数据科学团队来说,这种风格的学习成本接近零。

4.4 生产部署架构建议

一个经过验证的中等规模部署方案:

┌─────────────────────────────────────────────────┐
│                   Nginx (HTTPS)                 │
├─────────────────┬───────────────────────────────┤
│  WebServer × 2  │     Scheduler × 2 (HA)        │
├─────────────────┴───────────────────────────────┤
│              PostgreSQL (Metastore)             │
├─────────────────────────────────────────────────┤
│         Redis / RabbitMQ (Broker)               │
├─────────────────────────────────────────────────┤
│    Celery Workers × N  (Task 执行节点)          │
└─────────────────────────────────────────────────┘
  • WebServer 和 Scheduler 各部署 2 个实例实现高可用。
  • Metastore 使用托管的 PostgreSQL(RDS / Cloud SQL),定期备份。
  • Broker 使用 Redis Sentinel 或 RabbitMQ 集群,承载 Task 消息。
  • Worker 按需水平扩展,配置 worker_autoscale 动态调节并发数。

五、与其他方案对比

Airflow vs Prefect vs Dagster vs Luigi

维度 Airflow 2.x Prefect 2.x Dagster Luigi
核心哲学 工作流编排 现代 Python 编排 数据资产管理 任务依赖管理
动态工作流 较弱,需变通 原生支持,运行时动态 良好 不支持
数据血缘 通过 OpenLineage 外挂 基础标签级别 原生一等公民
本地开发 需要 Scheduler + DB 极简,flow.run() 优秀,dagster dev 一般
生态丰富度 最多,80+ Provider 中等 深度整合 dbt/Spark 较少
UI 体验 成熟、功能全 现代、清爽 资产视角独特 基础
社区规模 最大(GitHub 36k+ stars) 中等(17k+ stars) 快速增长(11k+ stars) 较小
最佳场景 传统 ETL、批量调度 Python 原生快速原型 分析工程、数据湖管理 简单链式任务

选型建议

  • 选 Airflow:团队已有一定规模,需要稳定、成熟的调度方案,生态要求高。
  • 选 Prefect:数据科学团队为主,重视开发体验,需要运行时动态生成工作流。
  • 选 Dagster:以数据资产为核心的管理视角,重度使用 dbt,重视数据血缘。
  • Luigi:轻量级场景、不想引入 Redis/RabbitMQ 等中间件,但功能天花板较低。

六、实践建议与避坑指南

1. start_date 与 catchup 的配合

这是 Airflow 新人踩坑率最高的配置。核心规则:

  • start_date 不是"首次运行日期",而是"调度周期的逻辑起点"。
  • Airflow 默认会在首次激活时回填(catchup)从 start_date 到当前的所有历史周期。
  • 设置 catchup=False 只运行当前及未来的周期。
# 正确做法:不想回填历史时显式关闭
with DAG(..., catchup=False) as dag:
    ...

2. 保持 DAG 文件轻量

Scheduler 每隔 min_file_process_interval(默认 30 秒)会重新解析所有 DAG 文件。如果一个 DAG 文件顶部写了耗时的网络请求或数据库查询,会严重拖慢整个调度循环。

正确做法:将业务逻辑放在 PythonOperator 的 python_callable 函数内部,而非 DAG 文件的顶层代码。

# ❌ 错误:顶层执行 http 请求——Scheduler 每次解析都跑一次
import requests
config = requests.get("https://api.internal/config").json()

# ✅ 正确:将请求封装在 Task 函数内
def load_config(**context):
    config = requests.get("https://api.internal/config").json()
    ...

3. 管理 XCom 数据量

XCom 默认存储在 Metastore 数据库中,适合传递少量元数据(Task ID、文件路径、短字符串)。如果你试图通过 XCom 传递一份 50MB 的 DataFrame,既拖慢执行也拖垮数据库。

替代方案:将大数据写入 S3 / GCS / NFS,通过 XCom 只传递存储路径。

4. 合理使用 Sensor

Sensor 本质上是"轮询外部条件"的死循环,默认 poke_interval 为 60 秒。当系统中有多个 Sensor 同时运行时,会占用 Worker 槽位却不做实质计算。

  • 为 Sensor 设置合理的 timeout,避免无限等待。
  • 使用 Smart Sensor(Airflow 2.2+)合并同类 Sensor 的轮询逻辑,减少资源消耗。
  • 考虑用 Deferrable Operator(异步模式)替代同步 Sensor,在等待期间释放 Worker 槽位。

5. 版本与依赖管理

  • 始终使用约束文件(constraints)安装,否则 pip 可能拉取不兼容版本的依赖库导致诡异报错。
  • Provider 包应固定版本号:apache-airflow-providers-amazon==8.20.0,而非 >= 方式,避免 CI 环境与生产环境版本漂移。
  • 升级 Airflow 前,先用 airflow db upgrade --dry-run 预览数据库迁移脚本。

6. 监控与告警

自带的邮件告警适合小团队,但对于生产环境建议:

  • 将 Airflow 日志接入 ELK / Loki 做集中采集分析。
  • 配置 Prometheus Exporter 采集 Scheduler 心跳延迟、Task 队列积压数等关键指标。
  • 对 SLA Miss、DAG 解析失败率设置 Grafana 告警规则。

写在最后

Apache Airflow 2.x 已经从一个"灵活版 crontab"进化为成熟的工作流编排平台。它的 Provider 生态、数据感知调度能力以及向 Edge 架构的演进方向,都表明它仍在积极适应现代数据栈的需求变化。

如果你正在为团队选型工作流调度引擎,Airflow 2.x 的 Python 原生亲和力、超大规模社区以及完善的托管服务(GCP Cloud Composer、AWS MWAA、Astronomer)将成为你最稳妥的选择之一。