0

0

高效并行处理文本任务并安全写入 CSV:分离计算与 I/O 的最佳实践

碧海醫心

碧海醫心

发布时间:2026-03-12 18:48:01

|

705人浏览过

|

来源于php中文网

原创

高效并行处理文本任务并安全写入 CSV:分离计算与 I/O 的最佳实践

本文介绍如何通过分离耗时计算与轻量 I/O,利用 concurrent.futures 安全、鲁棒地并行处理大规模字符串列表,并将结果可靠写入 CSV 文件——避免共享 csv.writer 导致的序列化失败、线程竞争与进程挂起问题。

本文介绍如何通过分离耗时计算与轻量 i/o,利用 `concurrent.futures` 安全、鲁棒地并行处理大规模字符串列表,并将结果可靠写入 csv 文件——避免共享 `csv.writer` 导致的序列化失败、线程竞争与进程挂起问题。

在 Python 中对大批量文本(如医学术语、用户查询、日志条目)进行 CPU 或 I/O 密集型处理时,直接使用多线程/多进程写 CSV 常引发严重问题:csv.writer 对象不可被 pickle(导致 ProcessPoolExecutor 报 TypeError),且多线程并发调用 writerow() 会因共享文件句柄引发竞态条件、数据错乱甚至死锁。你遇到的超时后进程挂起、线程无法回收、批量中断难恢复等问题,根源正在于将计算逻辑与 I/O 操作耦合在同一函数中,并试图跨进程/线程共享状态

正确的解法是遵循“计算与输出分离”原则:
✅ 让并行任务只负责纯计算——输入字符串,返回结构化结果(如 list 或 dict),不触碰任何文件或全局状态;
✅ 将所有 I/O(CSV 写入、错误记录)移至主线程单线程串行执行,确保原子性与可预测性;
✅ 利用 concurrent.futures 的异常捕获机制实现容错:单个任务失败不影响整体流程,错误可单独记录。

以下是经过生产验证的完整实现:

import csv
import logging
import time
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor

# 配置结构化日志(便于排查超时/异常)
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s | %(levelname)-8s | %(message)s",
    handlers=[
        logging.StreamHandler(),
        logging.FileHandler("processing.log", mode="a")
    ]
)

# 示例原始数据(实际中可能来自文件或数据库)
term_list = [
    "Dementia", "HER2-positive Breast Cancer", "Stroke", "Hemiplegia",
    "Type 1 Diabetes", "IBD", "Lung Cancer", "Psoriasis", "Healthy",
    "Asthma", "Obesity", "Schizophrenia", "Cancer", "COVID-19"
]

def run_mappers(individual_string: str, other_args) -> list:
    """
    核心处理函数:仅执行业务逻辑,严格禁止 I/O 或修改全局状态。
    返回值必须是可序列化的(如 list/tuple/dict),供主进程收集。
    """
    try:
        # ✅ 模拟真实耗时操作(网络请求、NLP 分析、规则匹配等)
        time.sleep(0.1 + len(individual_string) * 0.005)  # 避免过快掩盖并发效果

        # 示例处理:标准化术语 + 添加元信息
        normalized = individual_string.strip().title()
        processed_result = [normalized, other_args, len(normalized), int(time.time() % 1000)]

        logging.debug("✓ Processed: %r → %s", individual_string, processed_result)
        return processed_result

    except Exception as e:
        # ❌ 不在此处处理异常,由主流程统一捕获
        logging.error("✗ Failed on %r: %s", individual_string, e)
        raise  # 让 future.exception() 可检测

def parallel_process_and_write(
    term_list: list,
    other_args,
    output_csv: str = "results.csv",
    error_log: str = "errors.txt",
    max_workers: int = 4,
    executor_type: str = "thread"  # "thread" or "process"
):
    """
    主协调函数:并行计算 → 收集结果 → 单线程写入
    """
    logging.info("Starting parallel processing of %d terms...", len(term_list))

    # Step 1: 并行提交所有任务(无状态、无共享)
    ExecutorClass = ThreadPoolExecutor if executor_type == "thread" else ProcessPoolExecutor
    with ExecutorClass(max_workers=max_workers) as executor:
        # 提交任务:每个 term 独立运行,返回 Future 对象
        futures = [
            executor.submit(run_mappers, term, other_args)
            for term in term_list
        ]

        # Step 2: 同步等待全部完成(支持 timeout,但不中断已启动任务)
        # 注意:as_completed 是可选优化,此处用 list comprehension 确保顺序无关性
        results = []
        errors = []
        for i, future in enumerate(futures):
            try:
                result = future.result(timeout=180)  # 单任务超时 3 分钟
                results.append(result)
                logging.debug("Task %d completed successfully.", i+1)
            except Exception as exc:
                error_msg = f"[Term {i+1}: {term_list[i]}] {type(exc).__name__}: {exc}"
                errors.append(error_msg)
                logging.warning("Task %d failed: %s", i+1, error_msg)

    # Step 3: 主线程单次写入 CSV(绝对安全)
    logging.info("Writing %d successful results to %s...", len(results), output_csv)
    with open(output_csv, "w", newline="", encoding="utf-8") as f:
        writer = csv.writer(f)
        # 写入表头(按需调整)
        writer.writerow(["Term", "OtherArgs", "Length", "Timestamp"])
        writer.writerows(results)

    # Step 4: 记录所有失败项(便于重试或审计)
    if errors:
        logging.warning("Encountered %d errors. Details written to %s", len(errors), error_log)
        with open(error_log, "w", encoding="utf-8") as f:
            f.write("Failed Processing Log\n" + "="*50 + "\n")
            f.write("\n".join(errors))

    logging.info("Processing completed. %d success, %d failed.", len(results), len(errors))

# 使用示例
if __name__ == "__main__":
    # ✅ 推荐:I/O 密集型(如含 HTTP 请求)用 ThreadPoolExecutor
    parallel_process_and_write(
        term_list=term_list,
        other_args="v2.1-annotation",
        output_csv="terms_output.csv",
        error_log="term_errors.log",
        max_workers=6,
        executor_type="thread"
    )

    # ⚠️ 备选:纯 CPU 密集型(如复杂 NLP 模型推理)可尝试 ProcessPoolExecutor
    # parallel_process_and_write(..., executor_type="process")

关键设计说明与注意事项

  • 为什么不用 ThreadPoolExecutor 共享 csv.writer?
    csv.writer 包含内部缓冲区和文件句柄,多线程并发调用 writerow() 会导致写入交错、换行丢失、部分行被截断。即使加锁,也会严重抵消并发收益。

  • 超时处理为何更健壮?
    future.result(timeout=180) 仅限制单个任务等待时间,超时后抛出 concurrent.futures.TimeoutError,主线程立即记录错误并继续处理下一个 future,不会阻塞或挂起整个池。这比手动管理 threading.Event 或 terminate_flag 更简洁可靠。

  • ThreadPoolExecutor vs ProcessPoolExecutor 怎么选?

    Bolt.new
    Bolt.new

    Bolt.new是一个免费的AI全栈开发工具

    下载
    • ThreadPoolExecutor:适用于 I/O 密集型任务(HTTP 调用、数据库查询、文件读取)。Python 的 GIL 在 I/O 时自动释放,线程能真正并发。
    • ProcessPoolExecutor:适用于 CPU 密集型任务(正则爆炸、数值计算、模型推理)。绕过 GIL,但有进程启动开销和对象序列化成本。建议先用线程池测试,若 CPU 使用率低且耗时长,再切换为进程池并 benchmark。
  • 内存与批量控制(呼应你的需求)
    若 term_list 极大(千万级),可分批处理:

    batch_size = 1000
    for i in range(0, len(term_list), batch_size):
        batch = term_list[i:i+batch_size]
        parallel_process_and_write(batch, other_args, 
                                 output_csv=f"batch_{i//batch_size}.csv")

    每批独立执行,失败批次可单独重试,彻底规避单次长任务失控风险。

  • 生产环境增强建议

    • 添加 tqdm 进度条:for future in tqdm(as_completed(futures), total=len(futures))
    • 结果去重/校验:在写入前对 results 做 set(tuple(r) for r in results)
    • CSV 编码容错:open(..., encoding="utf-8-sig") 防止 Excel 打开乱码
    • 错误重试:对 TimeoutError 或特定异常,可封装 retrying 逻辑(需确保 run_mappers 幂等)

通过此方案,你将获得:
? 高吞吐:并行压榨 CPU/I/O 资源;
? 强鲁棒:单任务崩溃不阻塞全局,超时自动跳过;
? 易维护:计算与 I/O 解耦,逻辑清晰,调试简单;
? 可扩展:无缝支持分批、重试、监控与分布式迁移(如改用 dask 或 Celery)。

现在,只需将你真实的 run_mappers() 业务逻辑填入函数体,保持其「纯计算、无副作用」特性,即可安全投入生产。

本站声明:本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系admin@php.cn

热门AI工具

更多
DeepSeek
DeepSeek

幻方量化公司旗下的开源大模型平台

豆包大模型
豆包大模型

字节跳动自主研发的一系列大型语言模型

WorkBuddy
WorkBuddy

腾讯云推出的AI原生桌面智能体工作台

腾讯元宝
腾讯元宝

腾讯混元平台推出的AI助手

文心一言
文心一言

文心一言是百度开发的AI聊天机器人,通过对话可以生成各种形式的内容。

讯飞写作
讯飞写作

基于讯飞星火大模型的AI写作工具,可以快速生成新闻稿件、品宣文案、工作总结、心得体会等各种文文稿

即梦AI
即梦AI

一站式AI创作平台,免费AI图片和视频生成。

ChatGPT
ChatGPT

最最强大的AI聊天机器人程序,ChatGPT不单是聊天机器人,还能进行撰写邮件、视频脚本、文案、翻译、代码等任务。

相关专题

更多
什么是分布式
什么是分布式

分布式是一种计算和数据处理的方式,将计算任务或数据分散到多个计算机或节点中进行处理。本专题为大家提供分布式相关的文章、下载、课程内容,供大家免费下载体验。

407

2023.08.11

分布式和微服务的区别
分布式和微服务的区别

分布式和微服务的区别在定义和概念、设计思想、粒度和复杂性、服务边界和自治性、技术栈和部署方式等。本专题为大家提供分布式和微服务相关的文章、下载、课程内容,供大家免费下载体验。

251

2023.10.07

js 字符串转数组
js 字符串转数组

js字符串转数组的方法:1、使用“split()”方法;2、使用“Array.from()”方法;3、使用for循环遍历;4、使用“Array.split()”方法。本专题为大家提供js字符串转数组的相关的文章、下载、课程内容,供大家免费下载体验。

760

2023.08.03

js截取字符串的方法
js截取字符串的方法

js截取字符串的方法有substring()方法、substr()方法、slice()方法、split()方法和slice()方法。本专题为大家提供字符串相关的文章、下载、课程内容,供大家免费下载体验。

221

2023.09.04

java基础知识汇总
java基础知识汇总

java基础知识有Java的历史和特点、Java的开发环境、Java的基本数据类型、变量和常量、运算符和表达式、控制语句、数组和字符串等等知识点。想要知道更多关于java基础知识的朋友,请阅读本专题下面的的有关文章,欢迎大家来php中文网学习。

1567

2023.10.24

字符串介绍
字符串介绍

字符串是一种数据类型,它可以是任何文本,包括字母、数字、符号等。字符串可以由不同的字符组成,例如空格、标点符号、数字等。在编程中,字符串通常用引号括起来,如单引号、双引号或反引号。想了解更多字符串的相关内容,可以阅读本专题下面的文章。

649

2023.11.24

java读取文件转成字符串的方法
java读取文件转成字符串的方法

Java8引入了新的文件I/O API,使用java.nio.file.Files类读取文件内容更加方便。对于较旧版本的Java,可以使用java.io.FileReader和java.io.BufferedReader来读取文件。在这些方法中,你需要将文件路径替换为你的实际文件路径,并且可能需要处理可能的IOException异常。想了解更多java的相关内容,可以阅读本专题下面的文章。

1228

2024.03.22

php中定义字符串的方式
php中定义字符串的方式

php中定义字符串的方式:单引号;双引号;heredoc语法等等。想了解更多字符串的相关内容,可以阅读本专题下面的文章。

1204

2024.04.29

C# ASP.NET Core微服务架构与API网关实践
C# ASP.NET Core微服务架构与API网关实践

本专题围绕 C# 在现代后端架构中的微服务实践展开,系统讲解基于 ASP.NET Core 构建可扩展服务体系的核心方法。内容涵盖服务拆分策略、RESTful API 设计、服务间通信、API 网关统一入口管理以及服务治理机制。通过真实项目案例,帮助开发者掌握构建高可用微服务系统的关键技术,提高系统的可扩展性与维护效率。

76

2026.03.11

热门下载

更多
网站特效
/
网站源码
/
网站素材
/
前端模板

精品课程

更多
相关推荐
/
热门推荐
/
最新课程
关于我们 免责申明 举报中心 意见反馈 讲师合作 广告合作 最新更新
php中文网:公益在线php培训,帮助PHP学习者快速成长!
关注服务号 技术交流群
PHP中文网订阅号
每天精选资源文章推送

Copyright 2014-2026 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号