
本文详细介绍了如何在 Apache Airflow 中实现基于特定日期条件的任务流控制。通过利用 Airflow 的 Python Sensor,我们可以灵活地在 DAG 运行前检查自定义条件(例如,是否为每月的最后一个周二),并据此决定是否继续执行后续任务,从而避免不必要的资源消耗,优化工作流效率。
1. Airflow 条件执行概述
Apache Airflow 作为一个强大的工作流管理平台,通常按照预定义的任务依赖关系顺序执行任务。然而,在实际应用中,我们经常会遇到需要根据特定条件来决定是否执行某个任务流的场景。例如,一个数据同步或报告生成流程可能只在每月的特定日期(如月末、月初或某个特定的工作日)才需要运行,而在其他时间则应暂停或跳过。
在这种情况下,直接让 DAG 运行所有任务,然后在任务内部通过条件判断来跳过执行,不仅会消耗不必要的调度资源,也使得 DAG 运行日志变得复杂。更优雅的解决方案是在任务流的起点引入一个条件检查机制,只有当条件满足时,才触发后续任务的执行。Airflow 提供了多种实现方式,其中 Sensor(传感器)是处理此类前置条件检查的理想工具。
2. 深入理解 Airflow Sensor
Airflow Sensor 是一种特殊的 Operator,其主要职责是周期性地检查某个外部条件或内部状态,直到条件满足为止。一旦条件满足,Sensor 任务就会成功完成,并触发其下游任务的执行。如果条件在设定的超时时间内未能满足,Sensor 任务可以选择失败,从而阻止下游任务的运行。
在众多 Sensor 类型中,PythonSensor 提供了最大的灵活性。它允许用户定义一个任意的 Python 函数作为条件检查逻辑。该函数应返回 True 表示条件满足,False 表示条件不满足。这使得 PythonSensor 能够处理任何可以通过 Python 代码表达的复杂条件。
3. 实现“每月最后一个周二”条件检查
为了实现“只有当当前 Airflow 运行的 execution_date 是该月的最后一个周二时才继续执行”的逻辑,我们需要创建一个 Python 函数,并将其集成到 PythonSensor 中。
3.1 Python 条件函数开发
首先,定义一个 Python 函数来判断给定日期是否为该月的最后一个周二。
from datetime import datetime, timedelta
import calendar
def is_last_tuesday_of_month(execution_date: datetime, **context) -> bool:
"""
检查给定的 execution_date 是否是该月的最后一个周二。
"""
# 获取当前月份的最后一天
year = execution_date.year
month = execution_date.month
last_day_of_month = calendar.monthrange(year, month)[1]
# 从最后一天开始向前查找第一个周二
current_day = datetime(year, month, last_day_of_month)
# 遍历直到找到周二 (weekday() 返回 0-6,周一到周日)
# 周二的 weekday() 值为 1
while current_day.weekday() != calendar.TUESDAY:
current_day -= timedelta(days=1)
# 检查找到的周二是否与 execution_date 的日期部分相同
# 考虑到 execution_date 通常是 DAG 运行的开始时间,我们只关心日期部分
if current_day.date() == execution_date.date():
print(f"条件满足:{execution_date.date()} 是 {month} 月的最后一个周二。")
return True
else:
print(f"条件不满足:{execution_date.date()} 不是 {month} 月的最后一个周二。")
return False
函数说明:
- execution_date: 这是 Airflow 传递给任务的执行日期。我们将基于此日期进行判断。
- calendar.monthrange(year, month)[1]: 获取指定月份的天数,从而确定该月的最后一天。
- while current_day.weekday() != calendar.TUESDAY:: 从该月的最后一天开始,向前递减日期,直到找到一个周二。
- current_day.date() == execution_date.date(): 比较找到的最后一个周二的日期部分是否与 execution_date 的日期部分一致。
3.2 PythonSensor 配置与集成
接下来,我们将这个条件函数集成到 Airflow DAG 中,使用 PythonSensor 作为前置检查任务。
from airflow import DAG
from airflow.sensors.python import PythonSensor
from airflow.operators.bash import BashOperator
from datetime import datetime, timedelta
import calendar
# 导入上面定义的条件函数
# from your_module import is_last_tuesday_of_month
# 如果函数定义在同一个文件中,则不需要导入
# DAG 定义
with DAG(
dag_id='conditional_last_tuesday_run',
start_date=datetime(2023, 1, 1),
schedule_interval='0 0 * * *', # 每天运行一次,以便检查条件
catchup=False,
tags=['example', 'sensor', 'condition'],
) as dag:
# 定义 PythonSensor 任务
check_last_tuesday = PythonSensor(
task_id='check_if_last_tuesday_of_month',
python_callable=is_last_tuesday_of_month,
# op_kwargs 允许向 python_callable 传递额外的关键字参数
# 在这里,我们将 execution_date 显式传递,以确保函数使用正确的日期
op_kwargs={'execution_date': '{{ ds }}'}, # {{ ds }} 是 Airflow 模板,解析为执行日期字符串
poke_interval=60 * 5, # 每 5 分钟检查一次条件
timeout=60 * 60 * 24, # 最多等待 24 小时
mode='poke', # 使用 'poke' 模式,周期性检查
)
# 原始任务 T1-T5
# T1 = deletes all files from GCS
t1 = BashOperator(
task_id='delete_gcs_files',
bash_command='echo "Deleting GCS files..."',
)
# T2 = Runs SQL query 1 and outputs to a table within BigQuery
t2 = BashOperator(
task_id='run_sql_query_1',
bash_command='echo "Running SQL Query 1..."',
)
# T3 = Runs SQL query 2 and outputs to a table within BigQuery
t3 = BashOperator(
task_id='run_sql_query_2',
bash_command='echo "Running SQL Query 2..."',
)
# T4 = Runs SQL query 3 and places a copy of this output as csv into the GCS that was emptied in T1
t4 = BashOperator(
task_id='run_sql_query_3_and_upload_to_gcs',
bash_command='echo "Running SQL Query 3 and uploading to GCS..."',
)
# T5= Copies and Appends the reference numbers from the file in T4 to a history table in BigQuery.
t5 = BashOperator(
task_id='append_to_history_table',
bash_command='echo "Appending to history table..."',
)
# 定义任务依赖关系
# 只有当 check_last_tuesday 成功后,后续任务才会被执行
check_last_tuesday >> t1 >> t2 >> t3 >> t4 >> t5
代码说明:
- schedule_interval='0 0 * * *': DAG 设置为每天午夜运行一次。PythonSensor 会在每次运行开始时检查条件。
- PythonSensor 的 python_callable 参数指向我们之前定义的 is_last_tuesday_of_month 函数。
- op_kwargs={'execution_date': '{{ ds }}'}: 通过 op_kwargs 将 Airflow 提供的 execution_date(以 ds 宏表示的日期字符串)传递给 is_last_tuesday_of_month 函数。在函数内部,context 字典会包含完整的 execution_date 对象,但为了演示 op_kwargs 的用法,这里显式传递字符串,并在函数内部进行转换(如果需要,尽管 Sensor 默认会将 execution_date 作为第一个位置参数传递给 python_callable)。更简洁的方式是直接在 python_callable 中利用 context 参数获取 execution_date 对象,例如 execution_date = context['execution_date']。
- poke_interval: 定义 Sensor 检查条件的频率(秒)。
- timeout: 定义 Sensor 等待条件满足的最长时间(秒)。如果超时,Sensor 任务将失败。
- mode='poke': Sensor 运行模式。poke 模式会周期性地在调度器或工作器上运行 python_callable,直到返回 True。对于这种即时判断的条件,poke 是合适的。
4. 注意事项与最佳实践
-
Sensor 的 mode 选择:
- poke 模式:Sensor 任务会一直占用一个工作器槽位,周期性地执行 python_callable。适用于条件检查耗时短、不频繁的场景。
- reschedule 模式:当条件不满足时,Sensor 任务会释放工作器槽位,并在 poke_interval 后重新调度。这对于需要长时间等待的条件(例如等待外部文件生成)更为高效,因为它不会长时间占用工作器资源。对于本例,条件检查是即时的,poke 模式足够。
- 条件函数的幂等性: python_callable 应该是一个幂等函数,即多次调用在相同输入下应返回相同的结果,并且不应产生副作用。
- 异常处理: 如果 python_callable 在执行过程中抛出异常,Sensor 任务将立即失败。确保条件函数内部有适当的错误处理机制。
- 日志记录: 在 python_callable 中添加详细的日志输出,可以帮助调试和理解 Sensor 的行为。
- execution_date 的使用: 确保你的条件判断逻辑是基于 Airflow 的 execution_date 而不是当前的系统时间,这样可以保证 DAG 的回溯(backfill)和重试行为的一致性。
- 替代方案(BranchPythonOperator): 如果你的需求是基于条件选择不同的下游任务分支,而不是等待或阻塞整个流程,那么 BranchPythonOperator 会是更合适的选择。PythonSensor 的核心在于“等待”或“阻止”流程。
5. 总结
通过 PythonSensor,Airflow 提供了强大的机制来实现基于复杂自定义条件的任务流控制。本文演示了如何利用 PythonSensor 结合 Python 日期处理逻辑,实现“每月最后一个周二”的条件判断。这种方法不仅使 DAG 结构更清晰,避免了不必要的任务执行,而且通过灵活的 python_callable,可以应对几乎所有基于 Python 的条件检查需求,极大地提升了 Airflow 工作流的智能性和资源利用效率。










