
本文介绍如何通过分离耗时计算与轻量 I/O,利用 concurrent.futures 安全、鲁棒地并行处理大规模字符串列表,并将结果可靠写入 CSV 文件——避免共享 csv.writer 导致的序列化失败、线程竞争与进程挂起问题。
本文介绍如何通过分离耗时计算与轻量 i/o,利用 `concurrent.futures` 安全、鲁棒地并行处理大规模字符串列表,并将结果可靠写入 csv 文件——避免共享 `csv.writer` 导致的序列化失败、线程竞争与进程挂起问题。
在 Python 中对大批量文本(如医学术语、用户查询、日志条目)进行 CPU 或 I/O 密集型处理时,直接使用多线程/多进程写 CSV 常引发严重问题:csv.writer 对象不可被 pickle(导致 ProcessPoolExecutor 报 TypeError),且多线程并发调用 writerow() 会因共享文件句柄引发竞态条件、数据错乱甚至死锁。你遇到的超时后进程挂起、线程无法回收、批量中断难恢复等问题,根源正在于将计算逻辑与 I/O 操作耦合在同一函数中,并试图跨进程/线程共享状态。
正确的解法是遵循“计算与输出分离”原则:
✅ 让并行任务只负责纯计算——输入字符串,返回结构化结果(如 list 或 dict),不触碰任何文件或全局状态;
✅ 将所有 I/O(CSV 写入、错误记录)移至主线程单线程串行执行,确保原子性与可预测性;
✅ 利用 concurrent.futures 的异常捕获机制实现容错:单个任务失败不影响整体流程,错误可单独记录。
以下是经过生产验证的完整实现:
import csv
import logging
import time
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
# 配置结构化日志(便于排查超时/异常)
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s | %(levelname)-8s | %(message)s",
handlers=[
logging.StreamHandler(),
logging.FileHandler("processing.log", mode="a")
]
)
# 示例原始数据(实际中可能来自文件或数据库)
term_list = [
"Dementia", "HER2-positive Breast Cancer", "Stroke", "Hemiplegia",
"Type 1 Diabetes", "IBD", "Lung Cancer", "Psoriasis", "Healthy",
"Asthma", "Obesity", "Schizophrenia", "Cancer", "COVID-19"
]
def run_mappers(individual_string: str, other_args) -> list:
"""
核心处理函数:仅执行业务逻辑,严格禁止 I/O 或修改全局状态。
返回值必须是可序列化的(如 list/tuple/dict),供主进程收集。
"""
try:
# ✅ 模拟真实耗时操作(网络请求、NLP 分析、规则匹配等)
time.sleep(0.1 + len(individual_string) * 0.005) # 避免过快掩盖并发效果
# 示例处理:标准化术语 + 添加元信息
normalized = individual_string.strip().title()
processed_result = [normalized, other_args, len(normalized), int(time.time() % 1000)]
logging.debug("✓ Processed: %r → %s", individual_string, processed_result)
return processed_result
except Exception as e:
# ❌ 不在此处处理异常,由主流程统一捕获
logging.error("✗ Failed on %r: %s", individual_string, e)
raise # 让 future.exception() 可检测
def parallel_process_and_write(
term_list: list,
other_args,
output_csv: str = "results.csv",
error_log: str = "errors.txt",
max_workers: int = 4,
executor_type: str = "thread" # "thread" or "process"
):
"""
主协调函数:并行计算 → 收集结果 → 单线程写入
"""
logging.info("Starting parallel processing of %d terms...", len(term_list))
# Step 1: 并行提交所有任务(无状态、无共享)
ExecutorClass = ThreadPoolExecutor if executor_type == "thread" else ProcessPoolExecutor
with ExecutorClass(max_workers=max_workers) as executor:
# 提交任务:每个 term 独立运行,返回 Future 对象
futures = [
executor.submit(run_mappers, term, other_args)
for term in term_list
]
# Step 2: 同步等待全部完成(支持 timeout,但不中断已启动任务)
# 注意:as_completed 是可选优化,此处用 list comprehension 确保顺序无关性
results = []
errors = []
for i, future in enumerate(futures):
try:
result = future.result(timeout=180) # 单任务超时 3 分钟
results.append(result)
logging.debug("Task %d completed successfully.", i+1)
except Exception as exc:
error_msg = f"[Term {i+1}: {term_list[i]}] {type(exc).__name__}: {exc}"
errors.append(error_msg)
logging.warning("Task %d failed: %s", i+1, error_msg)
# Step 3: 主线程单次写入 CSV(绝对安全)
logging.info("Writing %d successful results to %s...", len(results), output_csv)
with open(output_csv, "w", newline="", encoding="utf-8") as f:
writer = csv.writer(f)
# 写入表头(按需调整)
writer.writerow(["Term", "OtherArgs", "Length", "Timestamp"])
writer.writerows(results)
# Step 4: 记录所有失败项(便于重试或审计)
if errors:
logging.warning("Encountered %d errors. Details written to %s", len(errors), error_log)
with open(error_log, "w", encoding="utf-8") as f:
f.write("Failed Processing Log\n" + "="*50 + "\n")
f.write("\n".join(errors))
logging.info("Processing completed. %d success, %d failed.", len(results), len(errors))
# 使用示例
if __name__ == "__main__":
# ✅ 推荐:I/O 密集型(如含 HTTP 请求)用 ThreadPoolExecutor
parallel_process_and_write(
term_list=term_list,
other_args="v2.1-annotation",
output_csv="terms_output.csv",
error_log="term_errors.log",
max_workers=6,
executor_type="thread"
)
# ⚠️ 备选:纯 CPU 密集型(如复杂 NLP 模型推理)可尝试 ProcessPoolExecutor
# parallel_process_and_write(..., executor_type="process")关键设计说明与注意事项
为什么不用 ThreadPoolExecutor 共享 csv.writer?
csv.writer 包含内部缓冲区和文件句柄,多线程并发调用 writerow() 会导致写入交错、换行丢失、部分行被截断。即使加锁,也会严重抵消并发收益。超时处理为何更健壮?
future.result(timeout=180) 仅限制单个任务等待时间,超时后抛出 concurrent.futures.TimeoutError,主线程立即记录错误并继续处理下一个 future,不会阻塞或挂起整个池。这比手动管理 threading.Event 或 terminate_flag 更简洁可靠。-
ThreadPoolExecutor vs ProcessPoolExecutor 怎么选?
- ✅ ThreadPoolExecutor:适用于 I/O 密集型任务(HTTP 调用、数据库查询、文件读取)。Python 的 GIL 在 I/O 时自动释放,线程能真正并发。
- ✅ ProcessPoolExecutor:适用于 CPU 密集型任务(正则爆炸、数值计算、模型推理)。绕过 GIL,但有进程启动开销和对象序列化成本。建议先用线程池测试,若 CPU 使用率低且耗时长,再切换为进程池并 benchmark。
-
内存与批量控制(呼应你的需求)
若 term_list 极大(千万级),可分批处理:batch_size = 1000 for i in range(0, len(term_list), batch_size): batch = term_list[i:i+batch_size] parallel_process_and_write(batch, other_args, output_csv=f"batch_{i//batch_size}.csv")每批独立执行,失败批次可单独重试,彻底规避单次长任务失控风险。
-
生产环境增强建议
- 添加 tqdm 进度条:for future in tqdm(as_completed(futures), total=len(futures))
- 结果去重/校验:在写入前对 results 做 set(tuple(r) for r in results)
- CSV 编码容错:open(..., encoding="utf-8-sig") 防止 Excel 打开乱码
- 错误重试:对 TimeoutError 或特定异常,可封装 retrying 逻辑(需确保 run_mappers 幂等)
通过此方案,你将获得:
? 高吞吐:并行压榨 CPU/I/O 资源;
? 强鲁棒:单任务崩溃不阻塞全局,超时自动跳过;
? 易维护:计算与 I/O 解耦,逻辑清晰,调试简单;
? 可扩展:无缝支持分批、重试、监控与分布式迁移(如改用 dask 或 Celery)。
现在,只需将你真实的 run_mappers() 业务逻辑填入函数体,保持其「纯计算、无副作用」特性,即可安全投入生产。










