0

0

Airflow 中实现带状态感知的条件任务调度:精准触发带宽策略变更

碧海醫心

碧海醫心

发布时间:2026-02-25 09:31:04

|

302人浏览过

|

来源于php中文网

原创

Airflow 中实现带状态感知的条件任务调度:精准触发带宽策略变更

本文介绍如何在 Apache Airflow 中设计两个独立、定时触发的 DAG,分别在高峰起始与结束时刻精确执行一次带宽调整操作,并通过幂等性设计确保任务失败可重试、重复调度不重复生效。

本文介绍如何在 apache airflow 中设计两个独立、定时触发的 dag,分别在高峰起始与结束时刻精确执行一次带宽调整操作,并通过幂等性设计确保任务失败可重试、重复调度不重复生效。

在实际网络运维场景中(如 ISP 带宽动态调控),我们常需在特定时间窗口(例如每日 09:00–13:00)仅执行一次策略变更动作:高峰开始时统一限速,高峰结束时恢复原速。关键挑战在于:

  • ✅ 动作必须严格按时触发(非轮询判断);
  • ✅ 同一时刻只执行一次,避免重复调用导致配置冲突;
  • ✅ 支持失败自动重试,但重试不应破坏业务语义(即“恢复带宽”不可因重试而误降速);
  • ❌ 不应依赖 BranchDateTimeOperator 等运行时条件分支——它无法解决“已执行过”的状态记忆问题,且每日仅调度一次的 DAG 中,分支逻辑易造成误判或漏判。

最佳实践是:将“进入高峰”和“退出高峰”拆分为两个完全解耦、独立调度的 DAG。 每个 DAG 只含一个幂等任务,通过精准的 schedule 触发,天然满足“单次执行 + 可重试”要求。

viable
viable

基于GPT-4的AI非结构化数据分析平台

下载

✅ 推荐方案:双 DAG 架构(推荐 Airflow 2.6+)

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.utils.dates import days_ago
from datetime import time
import pendulum

# 共享策略获取逻辑(建议抽取为工具函数)
def fetch_active_policy():
    """从数据库/配置中心获取当前生效的带宽策略"""
    # 示例:实际应查询 DB 或 API,返回如 {'id': 1, 'start_time': '09:00', 'end_time': '13:00'}
    return {'id': 1, 'start_time': '09:00', 'end_time': '13:00'}

# 幂等的带宽降低函数(关键!)
def decrease_bandwidth(**context):
    policy = fetch_active_policy()
    policy_id = policy['id']

    # 【关键】添加幂等检查:写入标记或查询当前带宽状态
    # 此处以伪代码示意,生产环境建议使用 Airflow Variable / DB 记录 last_executed_ts 或 status
    from airflow.models import Variable
    last_decrease = Variable.get(f"bandwidth_decrease_{policy_id}", default_var=None)
    if last_decrease:
        # 若今日已执行,跳过(或校验是否为今天)
        from datetime import datetime
        if pendulum.parse(last_decrease).date() == pendulum.today().date():
            print(f"[SKIP] Bandwidth decrease already executed today for policy {policy_id}")
            return

    # 执行真实操作(如调用网管 API)
    print(f"[EXEC] Decreasing bandwidth for policy {policy_id} at {pendulum.now()}")
    # your_network_api.set_bandwidth_limit(policy_id, "20Mbps", "5Mbps")

    # 记录执行时间(保障幂等)
    Variable.set(f"bandwidth_decrease_{policy_id}", pendulum.now().isoformat())

# 幂等的带宽恢复函数
def return_to_normal_bandwidth(**context):
    policy = fetch_active_policy()
    policy_id = policy['id']

    from airflow.models import Variable
    last_restore = Variable.get(f"bandwidth_restore_{policy_id}", default_var=None)
    if last_restore:
        if pendulum.parse(last_restore).date() == pendulum.today().date():
            print(f"[SKIP] Bandwidth restore already executed today for policy {policy_id}")
            return

    print(f"[EXEC] Restoring bandwidth for policy {policy_id} at {pendulum.now()}")
    # your_network_api.set_bandwidth_limit(policy_id, "20Mbps", "20Mbps")

    Variable.set(f"bandwidth_restore_{policy_id}", pendulum.now().isoformat())


# === DAG 1:高峰开始时执行限速 ===
dag_decrease = DAG(
    dag_id="bandwidth_decrease_on_peak_start",
    schedule="0 9 * * *",  # 每日 09:00 UTC(请按实际时区调整)
    start_date=days_ago(1),
    catchup=False,
    tags=["network", "bandwidth", "isp"],
    timezone="Europe/Istanbul",  # ⚠️ 必须显式设置时区!
)

decrease_task = PythonOperator(
    task_id="decrease_bandwidth",
    python_callable=decrease_bandwidth,
    dag=dag_decrease,
)

# === DAG 2:高峰结束时恢复带宽 ===
dag_restore = DAG(
    dag_id="bandwidth_restore_on_peak_end",
    schedule="0 13 * * *",  # 每日 13:00 UTC
    start_date=days_ago(1),
    catchup=False,
    tags=["network", "bandwidth", "isp"],
    timezone="Europe/Istanbul",
)

restore_task = PythonOperator(
    task_id="restore_bandwidth",
    python_callable=return_to_normal_bandwidth,
    dag=dag_restore,
)

? 关键设计说明

  • 精准调度替代运行时判断:schedule="0 9 * * *" 确保任务在每天 09:00(指定时区)准时触发一次,无需轮询或条件分支,语义清晰、资源开销低。
  • 幂等性保障:通过 Airflow Variable 记录当日执行时间戳,每次运行前校验,避免重复操作。也可替换为数据库状态表、Redis 锁等更健壮方案。
  • 时区安全:务必在 DAG 级别显式声明 timezone(如 "Europe/Istanbul"),避免因 Airflow 默认 UTC 导致调度偏差。
  • 失败可重试:默认 retries=1,若首次失败,Airflow 将在 retry_delay 后重试,且幂等逻辑保证重试不会引发副作用。
  • 解耦清晰:两个 DAG 完全独立,便于单独启停、监控、调试,也支持未来扩展多时段策略(如午休高峰、晚间高峰)。

⚠️ 注意事项

  • Airflow Variable 适用于轻量状态记录,高并发或强一致性场景建议使用外部数据库(如 PostgreSQL 表 bandwidth_policy_log)并加唯一约束(policy_id + date)。
  • 避免在 PythonOperator 中执行长时间阻塞操作;如带宽调整 API 响应慢,应封装为异步任务或增加超时与重试策略。
  • 生产环境请启用 email_on_failure 并接入告警系统,确保策略变更失败时及时人工介入。
  • 若策略动态变化(如高峰时段每日不同),可将 schedule 改为 @hourly,并在任务内增加「是否到达今日高峰起点/终点」的实时判断(仍需配合幂等存储)。

通过该双 DAG 设计,您将获得一个简洁、可靠、可维护的带宽策略调度系统——既符合 Airflow 的声明式哲学,又完美契合网络运维对时效性与准确性的严苛要求。

热门AI工具

更多
DeepSeek
DeepSeek

幻方量化公司旗下的开源大模型平台

豆包大模型
豆包大模型

字节跳动自主研发的一系列大型语言模型

通义千问
通义千问

阿里巴巴推出的全能AI助手

腾讯元宝
腾讯元宝

腾讯混元平台推出的AI助手

文心一言
文心一言

文心一言是百度开发的AI聊天机器人,通过对话可以生成各种形式的内容。

讯飞写作
讯飞写作

基于讯飞星火大模型的AI写作工具,可以快速生成新闻稿件、品宣文案、工作总结、心得体会等各种文文稿

即梦AI
即梦AI

一站式AI创作平台,免费AI图片和视频生成。

ChatGPT
ChatGPT

最最强大的AI聊天机器人程序,ChatGPT不单是聊天机器人,还能进行撰写邮件、视频脚本、文案、翻译、代码等任务。

智谱清言 - 免费全能的AI助手
智谱清言 - 免费全能的AI助手

智谱清言 - 免费全能的AI助手

相关专题

更多
常用的数据库软件
常用的数据库软件

常用的数据库软件有MySQL、Oracle、SQL Server、PostgreSQL、MongoDB、Redis、Cassandra、Hadoop、Spark和Amazon DynamoDB。更多关于数据库软件的内容详情请看本专题下面的文章。php中文网欢迎大家前来学习。

998

2023.11.02

内存数据库有哪些
内存数据库有哪些

内存数据库有Redis、Memcached、Apache Ignite、VoltDB、TimesTen、H2 Database、Aerospike、Oracle TimesTen In-Memory Database、SAP HANA和ache Cassandra。更多关于内存数据库相关问题,详情请看本专题下面的文章。php中文网欢迎大家前来学习。

664

2023.11.14

mongodb和redis哪个读取速度快
mongodb和redis哪个读取速度快

redis 的读取速度比 mongodb 更快。原因包括:1. redis 使用简单的键值存储,而 mongodb 存储 json 格式的数据,需要解析和反序列化。2. redis 使用哈希表快速查找数据,而 mongodb 使用 b-tree 索引。因此,redis 在需要高性能读取操作的应用程序中是一个更好的选择。本专题为大家提供相关的文章、下载、课程内容,供大家免费下载体验。

499

2024.04.02

redis怎么做缓存服务器
redis怎么做缓存服务器

redis 作为缓存服务器的答案:redis 是一款开源、高性能、分布式的键值存储,可作为缓存服务器使用。本专题为大家提供相关的文章、下载、课程内容,供大家免费下载体验。

410

2024.04.07

redis怎么解决数据一致性
redis怎么解决数据一致性

redis 提供了两种一致性模型,以维护副本数据一致性:强一致性 (sync) 确保写操作仅在复制到所有从节点后才完成;最终一致性 (async) 则在主节点上写操作后认为已完成,牺牲一致性换取性能。本专题为大家提供相关的文章、下载、课程内容,供大家免费下载体验。

405

2024.04.07

mysql和redis怎么保证双写一致性
mysql和redis怎么保证双写一致性

确保 mysql 和 redis 双写一致性的技术包括:1、事务性更新:同时更新 mysql 和 redis,保证一致性;2、主从复制:mysql 主服务器更改同步到 redis 从服务器;3、基于事件的更新:mysql 记录更改并发送到 redis等等。本专题为大家提供相关的文章、下载、课程内容,供大家免费下载体验。

467

2024.04.07

redis缓存一般存些什么数据
redis缓存一般存些什么数据

redis缓存中存储的数据类型包括:字符串、哈希、列表、集合、有序集合、位图、地理空间数据和hyperloglog。这些数据类型适用于存储各种数据,从简单信息到复杂对象和地理位置。本专题为大家提供相关的文章、下载、课程内容,供大家免费下载体验。

420

2024.04.07

redis的8种数据类型有哪些
redis的8种数据类型有哪些

redis 提供 8 种数据类型:字符串(文本、数字、二进制)、哈希(键值对)、列表(有序集合)、集合(无序唯一元素)、有序集合(按分数排序)、地理空间(地理位置)、hyperloglog(估计大数据基数)和位图(位序列存储)。本专题为大家提供相关的文章、下载、课程内容,供大家免费下载体验。

446

2024.04.07

batoto漫画官网入口与网页版访问指南
batoto漫画官网入口与网页版访问指南

本专题系统整理batoto漫画官方网站最新可用入口,涵盖最新官网地址、网页版登录页面及防走失访问方式说明,帮助用户快速找到batoto漫画官方平台,稳定在线阅读各类漫画内容。

32

2026.02.25

热门下载

更多
网站特效
/
网站源码
/
网站素材
/
前端模板

精品课程

更多
相关推荐
/
热门推荐
/
最新课程
10分钟--Midjourney创作自己的漫画
10分钟--Midjourney创作自己的漫画

共1课时 | 0.1万人学习

Midjourney 关键词系列整合
Midjourney 关键词系列整合

共13课时 | 0.9万人学习

AI绘画教程
AI绘画教程

共2课时 | 0.2万人学习

关于我们 免责申明 举报中心 意见反馈 讲师合作 广告合作 最新更新
php中文网:公益在线php培训,帮助PHP学习者快速成长!
关注服务号 技术交流群
PHP中文网订阅号
每天精选资源文章推送

Copyright 2014-2026 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号