
This article shows how to process a large list of strings in parallel, safely and efficiently, by separating compute-heavy work from I/O, and how to write the results reliably to a CSV file — avoiding the serialization failures, race conditions, and deadlocks that arise when multiple processes or threads share a `csv.writer` directly.
When processing a large batch of items in Python (say, thousands of medical terms) one by one, naively bolting on multithreading or multiprocessing often fails in surprising ways: `csv.writer` objects cannot be pickled, so they cannot be passed across processes; concurrent writes to the same file from multiple threads corrupt data or block on I/O; and coarse-grained timeout control (such as `thread.join(timeout)`) degrades poorly — on timeout the program hangs, the thread cannot actually be killed, and subsequent batches stall.
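The pickling failure mentioned above is easy to reproduce directly, without spinning up any worker processes — a minimal demonstration:

```python
import csv
import io
import pickle

# csv.writer wraps a C-level write buffer that the pickle protocol
# cannot serialize, so any attempt to send it to another process
# fails before a single row is written.
writer = csv.writer(io.StringIO())
try:
    pickle.dumps(writer)
except TypeError as e:
    print(f"cannot be pickled: {e}")
```

This is exactly the error `multiprocessing` surfaces when a `csv.writer` is captured in a task's arguments, which is why the writer must stay in the main process.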
The fundamental fix is Separation of Concerns:
✅ The parallel layer only computes: each unit of work runs `run_mappers()` independently, taking a single string plus parameters and returning a result (e.g. a `list` or `dict`) in a purely functional way — it never touches file handles or global state.
✅ The serial layer aggregates and persists: once all parallel tasks finish, the main thread collects the results in order and calls `csv.writer.writerow()` in one place — no concurrent-write conflicts, and no object-serialization constraints to worry about.
Here is a complete, production-grade implementation:
```python
import csv
import logging
import time
from concurrent.futures import (
    ThreadPoolExecutor,
    ProcessPoolExecutor,
    as_completed,
    TimeoutError as FuturesTimeout,
)
from typing import Any, List, Tuple

# Configure structured logging (makes failed items traceable)
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s | %(levelname)-8s | %(message)s",
    handlers=[
        logging.StreamHandler(),
        logging.FileHandler("processing.log", encoding="utf-8"),
    ],
)

# Sample input (in practice, load from a file or database)
term_list = [
    "Dementia", "HER2-positive Breast Cancer", "Stroke", "Hemiplegia",
    "Type 1 Diabetes", "IBD", "Lung Cancer", "Psoriasis", "Healthy", "Asthma",
    # ... more entries (see the original question)
]


def run_mappers(individual_string: str, other_args: Any) -> List[Any]:
    """
    Core processing function: pure computation, returns a structured result.

    - No side effects: no globals, no file/network/I-O access.
    - Fails loudly: raises on error so the coordinator can catch and log it.
    """
    try:
        # Simulate slow business logic (API call, NLP parsing, rule matching, ...)
        time.sleep(0.1 + (hash(individual_string) % 300) / 1000)  # pseudo-random delay
        # Example processing: normalize the term and attach metadata
        normalized = individual_string.strip().title()
        processed_result = [normalized, len(normalized), other_args, int(time.time() % 1000)]
        # Optional: simulate an occasional error (useful for testing fault tolerance)
        if "Cancer" in normalized and hash(normalized) % 17 == 0:
            raise RuntimeError(f"Transient failure on {normalized}")
        return processed_result
    except Exception as e:
        logging.error(f"Failed to process '{individual_string}': {e}")
        raise  # let the executor surface the exception instead of swallowing it


def parallel_process_and_write(
    terms: List[str],
    other_args: Any,
    output_csv: str = "results.csv",
    error_log: str = "errors.txt",
    max_workers: int = 6,
    use_processes: bool = False,  # True for CPU-bound; False (default) for I/O-bound
) -> Tuple[int, int]:
    """
    Coordinator: parallel computation followed by serial writing.

    Args:
        terms: list of strings to process
        other_args: extra argument passed through to run_mappers
        output_csv: output CSV path
        error_log: error log path
        max_workers: number of concurrent workers
        use_processes: use ProcessPoolExecutor (suited to CPU-bound work)

    Returns:
        (rows written successfully, failed items)
    """
    logging.info(f"Starting parallel processing of {len(terms)} terms...")
    start_time = time.time()

    # Step 1: submit all tasks in parallel (non-blocking)
    executor_class = ProcessPoolExecutor if use_processes else ThreadPoolExecutor
    results = []
    errors = []
    with executor_class(max_workers=max_workers) as executor:
        # One future per term (note: the argument is a single term, not the whole list!)
        futures = [
            executor.submit(run_mappers, term, other_args)
            for term in terms
        ]

        # Step 2: collect results as they complete (timeouts prevent infinite waits)
        try:
            for future in as_completed(futures, timeout=300):  # global timeout: 5 minutes
                try:
                    result = future.result(timeout=60)  # per-task timeout: 1 minute
                    results.append(result)
                except Exception as exc:
                    # Record the concrete failure (TimeoutError, RuntimeError, ...)
                    errors.append(str(exc))
                    logging.warning(f"Task failed: {exc}")
        except FuturesTimeout:
            # as_completed itself raises when the global deadline passes;
            # count everything still pending as failed instead of crashing.
            pending = [f for f in futures if not f.done()]
            for f in pending:
                f.cancel()  # only removes tasks that have not started yet
            errors.extend("Global timeout: task never completed" for _ in pending)
            logging.error(f"Global timeout: {len(pending)} tasks unfinished")

    # Step 3: write the CSV serially (safe by construction: one writer, one thread)
    success_count = 0
    try:
        with open(output_csv, "w", newline="", encoding="utf-8") as f_csv, \
             open(error_log, "w", encoding="utf-8") as f_err:
            writer = csv.writer(f_csv)
            # Header row (adjust as needed)
            writer.writerow(["Term", "Length", "OtherArgs", "Timestamp"])
            for result in results:
                writer.writerow(result)
                success_count += 1
            # Record every error message
            for err in errors:
                f_err.write(err + "\n")
    except Exception as e:
        logging.critical(f"Fatal error during CSV write: {e}")
        raise

    elapsed = time.time() - start_time
    logging.info(
        f"Processing completed in {elapsed:.1f}s: "
        f"{success_count}/{len(terms)} succeeded, {len(errors)} failed."
    )
    return success_count, len(errors)


# Usage example
if __name__ == "__main__":
    # Process the full data set (batched to keep memory bounded)
    total_success, total_fail = 0, 0
    batch_size = 50
    for i in range(0, len(term_list), batch_size):
        batch = term_list[i:i + batch_size]
        logging.info(f"Processing batch [{i}:{i + len(batch)}]...")
        try:
            success, fail = parallel_process_and_write(
                terms=batch,
                other_args="metadata_v1",
                output_csv=f"batch_{i // batch_size}.csv",
                error_log=f"batch_{i // batch_size}_errors.txt",
                max_workers=4,
                use_processes=False,  # set to True if run_mappers is CPU-heavy
            )
            total_success += success
            total_fail += fail
        except Exception as e:
            logging.error(f"Batch [{i}:{i + len(batch)}] crashed: {e}")
            # Continue with the next batch; do not abort the whole run
    logging.info(f"Final summary: {total_success} success, {total_fail} failed.")
```

Key considerations and best practices
- Never share a `csv.writer` or file object across processes/threads: they are not thread-safe, and `csv.writer` holds an internal buffer that cannot be serialized, so `multiprocessing` fails immediately with a pickling error.
- `ThreadPoolExecutor` or `ProcessPoolExecutor`?
  - ✅ `ThreadPoolExecutor`: for I/O-bound tasks (HTTP requests, database queries, lightweight text processing); low overhead, fast startup.
  - ✅ `ProcessPoolExecutor`: for CPU-bound tasks (complex regexes, numeric computation, ML inference); it bypasses the GIL, but process creation and communication are expensive, and `other_args` must be picklable.
- Layer your timeouts: `as_completed(timeout=...)` bounds the overall wait and `future.result(timeout=...)` bounds each task — the two together keep the pipeline from hanging.
- Handle errors explicitly: catch exceptions via `future.exception()` or `try/except`. Never rely on `future.cancel()` — it cannot forcibly stop a thread or process that is already running; it only works on tasks that have not started yet.
- Batch for memory friendliness: for a very large `term_list`, slice by `batch_size` and call the coordinator in a loop rather than holding every future in memory at once.
- Logs are your monitoring: structured logs (timestamp, level, message) are the only reliable way to debug parallel failures — far more useful than `print()`.
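The points about `future.cancel()` and layered timeouts can be verified in a few lines — a minimal sketch using a deliberately slow task and a single-worker pool:

```python
import time
from concurrent.futures import ThreadPoolExecutor, TimeoutError as FuturesTimeout

def slow_task():
    time.sleep(1.0)
    return "done"

with ThreadPoolExecutor(max_workers=1) as pool:
    running = pool.submit(slow_task)
    time.sleep(0.1)                  # give the first task time to start
    queued = pool.submit(slow_task)  # cannot start: the only worker is busy

    # cancel() only affects tasks that have not started yet
    print(running.cancel())  # False: already executing, cannot be stopped
    print(queued.cancel())   # True: still queued, removed before starting

    # A per-task timeout raises instead of hanging the caller...
    try:
        running.result(timeout=0.1)
    except FuturesTimeout:
        # ...but the worker thread itself keeps running until sleep() returns.
        print("result() timed out; the thread is still running")
```

Note that the `with` block still waits for the running task to finish on exit — timeouts bound how long *you* wait, not how long the worker runs, which is why the coordinator above records timed-out tasks as errors and moves on.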
Follow this pattern and you get: ✅ stable, scalable parallel throughput, ✅ CSV output that is safe by construction, ✅ clear error traceability, and ✅ graceful degradation on failure — which is what production-grade parallelism should look like.










