
本文教你使用 `concurrent.futures.threadpoolexecutor` 并发调用深层嵌套的 api(如逐个获取 award 详情),将串行耗时从数分钟降至数秒,兼顾代码简洁性与生产可用性。
在处理类似开放采购数据(如 Paraguay 公共合同 API)这类层级嵌套的 JSON 响应时,常见的串行请求模式——即外层遍历 records,内层遍历每个 compiledRelease.awards,再为每个 award.id 发起独立 HTTP 请求——极易成为性能瓶颈。尤其当总量达数百个 compiledRelease、单个 awards 数组含数十甚至上百条记录时,同步阻塞式调用会导致总耗时呈线性增长(例如 500 次请求 × 平均 1.2s = 超 10 分钟),而网络 I/O 实际占用 CPU 极少,大量时间浪费在等待响应上。
此时,多线程并发是 Python 中最直接、稳定且无需修改业务逻辑的加速方案。concurrent.futures.ThreadPoolExecutor 正是为此类 I/O 密集型任务设计的标准库工具:它自动管理线程池、复用线程、支持结果收集与异常处理,且语法清晰易读。
以下是一个生产就绪的示例实现:
import requests
from concurrent.futures import ThreadPoolExecutor, as_completed
import time
# 示例 API 调用函数(请替换为你的真实函数)
def api_call_function(award_id):
# 注意:真实场景中应添加超时、重试、错误码判断
try:
response = requests.get(f"https://api.example.com/awards/{award_id}", timeout=10)
response.raise_for_status()
return response.json()
except Exception as e:
return {"error": str(e), "award_id": award_id}
# 主处理函数
def fetch_all_award_details(records, max_workers=20):
"""
并发获取所有 awards 的完整详情
:param records: 原始 JSON 中的 records 列表
:param max_workers: 线程池最大并发数(建议 10–30,避免被限流)
:return: 所有成功响应的列表(含 award_id 标识)
"""
award_ids = []
# 第一步:扁平化提取全部 award ID(保留上下文信息可选)
for item in records:
compiled = item.get("compiledRelease", {})
awards = compiled.get("awards", [])
for award in awards:
award_id = award.get("id")
if award_id:
award_ids.append(award_id)
print(f"共发现 {len(award_ids)} 个 award ID,启动并发请求...")
results = []
# 第二步:使用线程池并发执行
with ThreadPoolExecutor(max_workers=max_workers) as executor:
# 提交所有任务
future_to_id = {executor.submit(api_call_function, aid): aid for aid in award_ids}
# 按完成顺序收集结果(非提交顺序)
for future in as_completed(future_to_id):
award_id = future_to_id[future]
try:
data = future.result()
results.append({"award_id": award_id, "data": data})
except Exception as exc:
results.append({"award_id": award_id, "error": str(exc)})
return results
# 使用示例
if __name__ == "__main__":
# 假设你已通过 requests 获取原始 records
# response = requests.get("https://api.example.com/records?limit=100")
# records = response.json().get("records", [])
# 这里用模拟数据演示结构
sample_records = [
{
"compiledRelease": {
"awards": [
{"id": "award-001"},
{"id": "award-002"}
]
}
},
{
"compiledRelease": {
"awards": [
{"id": "award-003"},
{"id": "award-004"},
{"id": "award-005"}
]
}
}
]
start_time = time.time()
all_details = fetch_all_award_details(sample_records, max_workers=10)
end_time = time.time()
print(f"\n✅ 完成!共获取 {len(all_details)} 条响应,耗时 {end_time - start_time:.2f} 秒")
# 可进一步处理:筛选成功项、写入文件、入库等? 关键注意事项与最佳实践:
立即学习“Python免费学习笔记(深入)”;
- 合理设置 max_workers:通常 10–30 是安全起点;过高可能触发服务端限流或本地端口耗尽;可通过小规模测试(如 50 个 ID)对比不同值的吞吐量来调优。
- 务必添加超时与异常处理:HTTP 请求必须设 timeout(推荐 5–15 秒),否则单个失败请求会阻塞整个线程;as_completed() 确保不因某次失败中断整体流程。
- 避免全局共享状态:线程间不要直接修改同一字典/列表;本例中每个 future.result() 返回独立数据,天然线程安全。
- 替代方案说明:虽 asyncio + aiohttp 在理论吞吐上更高,但需全面重构为异步风格(包括 HTTP 客户端、JSON 解析等),学习成本与维护复杂度显著上升;对绝大多数 API 场景,ThreadPoolExecutor 已足够高效且稳健。
通过以上改造,你的脚本可在数秒内完成原本需数分钟的批量请求,同时保持代码逻辑清晰、易于调试和扩展。










