
python标准库不提供“读一行即删一行”的文件操作函数,但可通过外部进度文件记录已处理行号,实现断点续传式文件处理,避免重复读写全量数据。
在实际生产环境中,尤其是处理日志、批量任务或ETL流水线时,常需保证程序崩溃(如意外断电、进程被杀)后能从中断处继续执行,而非从头开始或丢失状态。直接修改原文件(如每次读取后重写剩余内容)不仅效率低下(O(n²)时间复杂度)、线程不安全,还存在数据丢失风险——若写入中途失败,原始文件可能已被截断。
更健壮的替代方案是解耦“读取”与“状态管理”:保持源文件只读不变,将处理进度单独持久化。推荐采用以下两种工业级实践:
✅ 方案一:基于行号的轻量进度追踪(推荐用于结构化文本)
使用独立的进度文件(如 progress.txt)仅保存最后成功处理的行号。启动时读取该值,跳过此前所有行:
def process_file_with_checkpoint(filepath: str, progress_filepath: str):
# 读取上次中断位置
try:
with open(progress_filepath, 'r') as f:
last_line = int(f.read().strip())
except (FileNotFoundError, ValueError):
last_line = 0
# 流式读取,避免一次性加载全部内容(适用于大文件)
with open(filepath, 'r') as f:
for line_num, line in enumerate(f, start=1):
if line_num <= last_line:
continue
try:
# ✅ 在此处执行核心业务逻辑(如解析、入库、调用API等)
print(f"✅ Processing line {line_num}: {line.rstrip()}")
# 模拟可能失败的操作
# process_line(line)
# ✅ 立即更新进度(原子写入,降低丢失风险)
with open(progress_filepath, 'w') as pf:
pf.write(str(line_num))
except Exception as e:
print(f"❌ Error at line {line_num}: {e}")
raise # 或选择记录错误后继续
# 使用示例
process_file_with_checkpoint("data.txt", "progress.txt")⚠️ 注意事项: 进度文件写入应尽量轻量(单行整数),并确保写入后调用 os.fsync() 可进一步增强可靠性(尤其在Linux上); 若需更高一致性,可结合临时文件+原子重命名(如写入 progress.tmp 后 os.replace()); 对于超大文件(GB级),避免 readlines() 全加载,改用逐行迭代器(如示例所示)。
✅ 方案二:归档已处理行(适合需审计或回溯场景)
将每行处理结果追加至归档文件(如 processed.log),源文件永久保留。重启时通过对比两文件行数确定起点:
立即学习“Python免费学习笔记(深入)”;
import os
def archive_and_process(source: str, archive: str):
source_lines = sum(1 for _ in open(source))
archive_lines = sum(1 for _ in open(archive)) if os.path.exists(archive) else 0
start_at = archive_lines + 1
with open(source, 'r') as src, open(archive, 'a') as arc:
for i, line in enumerate(src, start=1):
if i < start_at:
continue
# 处理逻辑
result = f"[{i}] {line.rstrip()}"
print(result)
# 归档(追加写入,天然幂等)
arc.write(result + '\n')? 总结
- 不要尝试“边读边删”原文件:操作系统层面无原子性支持,易致数据损坏;
- 优先选用“外部进度文件”方案:简单、高效、易调试,满足95%的断点续传需求;
- 关键操作后立即落盘进度:比“处理完全部再存”更安全;
- 补充容错设计:如添加重试机制、错误日志隔离、行号校验等,可显著提升鲁棒性。










