关键在于用 run_id 或 batch_date 作逻辑分区键,写入前显式清理对应分区数据,禁用 auto-commit 并手动提交 kafka offset,统一使用 utc 时间(如 pendulum.today('utc'))避免时区不一致。

如何让 Python ETL 任务支持多次重跑不重复写入
关键不是“防止重跑”,而是让每次跑都产出一致结果——靠状态标记 + 写入前清理,而不是靠锁或外部调度判断。否则一旦中间出错、手动触发重跑,数据就乱了。
常见错误现象:KeyError(查不到上次运行时间)、IntegrityError(主键冲突)、下游表里出现双份订单记录。
- 用
run_id或batch_date作为逻辑分区键,所有写入目标(数据库表、Parquet 路径、S3 前缀)必须显式包含它 - 写入前先执行清理:对数据库用
DELETE FROM table WHERE batch_date = '2024-04-01';对文件系统用shutil.rmtree()或fs.delete() - 避免依赖“上次成功时间”字段做增量判断——这个值可能滞后、被人工改过、或在并发重跑时不可靠
用 pandas.to_sql(..., if_exists='replace') 安全吗
不安全,尤其在有外键、索引、权限控制的生产库中。if_exists='replace' 实际是 DROP TABLE + CREATE TABLE,会丢失原表结构元信息,还可能触发级联删除或锁表。
使用场景:仅限临时表、测试环境、或你完全掌控 DDL 的宽表落地环节。
立即学习“Python免费学习笔记(深入)”;
- 生产环境一律改用
if_exists='append'+ 显式DELETE清理,确保约束、索引、注释保留 - 如果目标表没有
batch_date字段,别硬加——先ALTER TABLE ADD COLUMN,再清理写入 - 注意
pandas.to_sql默认不开启事务,大批次写入建议包在connection.begin()里
Airflow 中 task 重试时怎么避免重复消费 Kafka 数据
不是靠 Airflow 的 retries 参数控制,而是靠消费者自己管理 offset 提交时机——必须在数据落库/落盘成功后,才提交 offset。
常见错误现象:task 失败重试 → offset 已提交 → 重试时从新位置开始读 → 漏数据;或者 offset 没提交 → 每次都重读 → 重复写入。
- 用
KafkaConsumer时,禁用enable_auto_commit=True,改用手动commit() - 把
consumer.commit()放在写入逻辑的finally块之后,且只在写入成功时调用 - 如果用
confluent-kafka,注意msg.offset()是下一条,真正要 commit 的是msg.offset() + 1
为什么用 datetime.now() 生成 batch_date 总出问题
因为本地时区、Docker 容器时区、Airflow worker 时区三者不一致,导致同一批任务在不同节点上生成的 batch_date 不同,清理和覆盖失效。
性能影响:看似只是个时间函数,但间接造成跨天数据混写、分区路径错乱、甚至 Hive 表 MSCK REPAIR 失败。
- 统一用
pendulum.today('UTC').date().isoformat()或 Airflow 的{{ ds }}模板变量 - 禁止在 Python 脚本里调用
datetime.now()或date.today() - 如果必须用当前时间(比如打日志),至少用
datetime.utcnow()并明确标注 UTC
最麻烦的其实是跨系统时间对齐——比如 Airflow DAG 设的是 UTC 0 点调度,但你的 ETL 脚本读的是服务器本地时间,差 8 小时,batch_date 就永远错一天,清理动作压根找不到目标分区。










