本文介绍一种安全、高效的csv数据分流方法:读取原始csv,将满足条件的行(如已处理数据)写入“used.csv”,其余行保留至新文件,最后用新文件覆盖原文件,实现自动归档与清理。
本文介绍一种安全、高效的csv数据分流方法:读取原始csv,将满足条件的行(如已处理数据)写入“used.csv”,其余行保留至新文件,最后用新文件覆盖原文件,实现自动归档与清理。
在实际数据处理流程中(如批量邮件发送、任务队列消费或ETL预处理),常需将“已使用”的记录从源CSV中隔离出来,既保留历史操作痕迹,又确保后续运行只处理未处理数据。CSV本身不支持原地行删除,因此推荐采用“读–分发–覆写”策略:一次性读取全部内容,按规则分流写入两个目标文件,再用保留未使用数据的新文件替换原始文件。该方法原子性强、逻辑清晰,且避免了逐行修改带来的IO开销与并发风险。
以下是完整实现示例(以部门为"Marketing"的行作为“已使用”数据为例):
import csv
import shutil
# 定义路径
original_file = "data.csv"
used_file = "used.csv"
temp_file = "data_cleaned.csv"
# 步骤1:打开输入与两个输出文件(注意:必须指定 newline='' 防止空行)
with open(original_file, newline='') as f_in, \
open(used_file, "w", newline='') as f_used, \
open(temp_file, "w", newline='') as f_clean:
reader = csv.reader(f_in)
writer_used = csv.writer(f_used)
writer_clean = csv.writer(f_clean)
# 步骤2:读取并写入表头(如有)
try:
header = next(reader)
writer_used.writerow(header)
writer_clean.writerow(header)
except StopIteration:
# 空文件处理
pass
# 步骤3:逐行判断并分流写入
for row in reader:
# ✅ 自定义“已使用”条件:例如第2列(索引1)值为 "Marketing"
if len(row) > 1 and row[1].strip() == "Marketing":
writer_used.writerow(row)
else:
writer_clean.writerow(row)
# 步骤4:用 cleaned 文件安全覆盖原始文件(原子性替换)
shutil.move(temp_file, original_file)
print(f"✅ 已处理 {len([r for r in csv.reader(open(used_file))]) - (1 if open(used_file).readline().strip() else 0)} 行至 '{used_file}'")
print(f"✅ 原始文件 '{original_file}' 已更新为剩余未使用数据")⚠️ 关键注意事项:
- 始终使用 newline='':这是Python csv 模块的强制要求,否则在Windows下可能产生多余空行;
- 表头一致性:若原始CSV含表头,务必同步写入两个输出文件,否则后续读取易出错;
- 条件判断健壮性:示例中增加了 len(row) > 1 和 .strip() 防御空列或空白字符干扰;
- 原子性保障:通过 shutil.move() 替换原文件(而非直接写入),可避免程序中断导致原文件损坏;
- 大文件优化提示:若CSV超10万行,建议改用 pandas 的 chunksize 流式处理,或借助数据库临时表。
此方案无需第三方库,兼容Python 3.6+,可轻松适配任意布尔条件(如时间戳早于当前日期、状态字段为"processed"等)。只需修改 if 判断逻辑,即可将“移动-删除”行为精准绑定到您的业务规则上,是构建可靠数据流水线的基础实践。









