
在 airflow 2 迁移中,已弃用的 `{{ prev_execution_date }}` 模板变量需通过 `taskinstance.previous_ti.execution_date` 或 `dagrun.get_previous_dagrun()` 获取前次执行时间,而非依赖过时的模板变量。
Airflow 2 对调度语义进行了重构,明确区分了 逻辑时间(logical date)、数据区间(data interval) 和 执行上下文时间(execution time)。因此,旧版模板变量如 prev_execution_date 已被移除——它曾模糊地表示“上一次任务实例的 execution_date”,但该值在 Airflow 2 的 DAG 运行模型中不再具备明确定义(尤其在回填、手动触发或非标准调度场景下易引发歧义)。
✅ 正确替代方式有两种,适用于不同场景:
WebShop网上商店系统专注中小企业、个人的网上购物电子商务解决方案,淘宝商城系统用户/个人首选开店的购物系统!综合5500多用户的意见或建议,从功能上,界面美观上,安全性,易用性上等对网店系统进行了深度的优化,功能更加强大,界面模板可直接后台选择。WebShop网上商店系统特点:1 对于中小企业、个体、个人、店主和淘宝易趣等卖家,可利用WebShop快速建立购物网。2 源代码开放,利用WebS
-
获取前一个成功任务实例的执行时间(推荐用于任务级依赖判断)
使用 TaskInstance.previous_ti 属性(仅当存在且成功完成时可用):def my_task(ti: TaskInstance, **kwargs): if ti.previous_ti and ti.previous_ti.state == "success": prev_exec_time = ti.previous_ti.execution_date print(f"Previous successful task ran at: {prev_exec_time}") else: print("No previous successful task instance found") -
获取前一个 DAG 运行的执行时间(适用于 DAG 级调度逻辑)
使用 DagRun.get_previous_dagrun() 方法(返回前一个 DagRun 对象,含 execution_date):from airflow.models import DagRun def my_task(ti: TaskInstance, **kwargs): prev_dag_run = DagRun.get_previous_dagrun(dag_run=ti.dag_run) if prev_dag_run: print(f"Previous DAG run started at: {prev_dag_run.execution_date}") else: print("No previous DAG run exists")
⚠️ 注意事项:
- prev_data_interval_start_success 和 prev_data_interval_end_success 并非 prev_execution_date 的直接替代项——它们对应的是前一个成功运行所覆盖的数据区间端点(即 data interval),而 execution_date 在 Airflow 2 中已统一映射为 logical_date(即数据区间的起始时间)。二者语义不同,不可混用。
- ti.previous_ti 仅在当前任务有前置同名任务实例且其状态为 success 时才有效;若需容错(如跳过失败),应结合 state 判断。
- DagRun.get_previous_dagrun() 不校验状态,仅按 execution_date 降序查找最近的 DagRun,适合做调度链路判断,但不保证成功。
? 总结:
放弃模板变量思维,转向面向对象的上下文访问是 Airflow 2 的最佳实践。优先使用 ti.previous_ti.execution_date(任务粒度)或 DagRun.get_previous_dagrun().execution_date(DAG 粒度),并始终添加空值与状态校验,确保逻辑健壮性。









