Python批量请求需用Session复用连接、ThreadPoolExecutor并发、分层捕获异常并统一收口结果。关键包括:连接池调优、线程安全复用Session、按错误类型重试、结构化存储成败结果。

Python调用接口时,批量请求和异常处理是提升稳定性和效率的关键。单纯用requests.get()逐个发请求,既慢又容易崩——尤其面对几十上百个URL时。核心思路就两条:并发控制 + 稳健容错。
用requests.Session复用连接,减少开销
每次新建requests.get()都会重建TCP连接、TLS握手,耗时明显。换成Session对象,能自动复用底层连接,尤其适合批量请求同一域名的场景。
说明:
- Session会缓存连接池,默认保持10个空闲连接;
- 同一Session发出的请求,若Host相同,大概率复用已有连接;
- 配合mount可为HTTP/HTTPS定制Adapter(比如设置最大连接数)。
建议写法:
import requestssession = requests.Session()
立即学习“Python免费学习笔记(深入)”;
可选:调整连接池大小(避免TooManyRedirects或ConnectionPoolSizeError)
adapter = requests.adapters.HTTPAdapter(pool_connections=20, pool_maxsize=20) session.mount('http://', adapter) session.mount('https://', adapter)
urls = ['https://www.php.cn/link/cf24f44a79866351337c1b317ffdc18d', 'https://www.php.cn/link/abc58d2523df2aea708a509fbd201437'] for url in urls: try: resp = session.get(url, timeout=5) resp.raise_for_status() print(resp.json()) except requests.exceptions.RequestException as e: print(f"请求失败 {url}:{e}")
并发请求别硬上asyncio,先试试ThreadPoolExecutor
多数业务接口是IO密集型,用多线程比多进程更轻量,也比手写async更易维护。Python标准库concurrent.futures.ThreadPoolExecutor足够应对几百以内的并发量。
关键点:
- 控制max_workers(通常设为CPU核数×2~5,或根据目标服务器承载力调低);
- 每个worker内仍用Session,避免连接池竞争;
- 用as_completed实时获取结果,不阻塞等待全部完成。
示例结构:
from concurrent.futures import ThreadPoolExecutor, as_completeddef fetch_one(session, url): try: resp = session.get(url, timeout=8) resp.raise_for_status() return {'url': url, 'status': 'success', 'data': resp.json()} except Exception as e: return {'url': url, 'status': 'error', 'error': str(e)}
复用Session实例(注意:Session不是线程安全的,但用于GET基本无问题;如需绝对安全,可在每个worker里新建)
with ThreadPoolExecutor(max_workers=10) as executor: futures = [executor.submit(fetch_one, session, url) for url in urls] for future in as_completed(futures): result = future.result() if result['status'] == 'success': print("✅", result['url']) else: print("❌", result['url'], result['error'])
异常要分层捕获,别只靠try-except包全局
接口请求失败原因多样,统一用一个except Exception掩盖细节,调试和重试策略都会失效。应按错误类型分层处理:
-
网络层异常:如
ConnectionError、Timeout——适合立即重试(加退避); -
协议/响应异常:如
HTTPError(4xx/5xx)——4xx一般不重试,5xx可考虑重试; -
解析异常:如
JSONDecodeError——说明返回非预期格式(可能是HTML错误页),需记录原始resp.text排查; -
业务逻辑异常:如API返回
{"code": 4001, "msg": "余额不足"}——属于正常业务流,不应进except,而应在response后判断字段。
推荐做法:封装一个带基础重试和分类日志的请求函数:
import time import logging from requests.exceptions import ConnectionError, Timeout, HTTPErrordef safe_get(session, url, max_retries=2, backoff_factor=1): for i in range(max_retries + 1): try: resp = session.get(url, timeout=10) resp.raise_for_status() return resp except ConnectionError: if i == max_retries: raise logging.warning(f"连接失败 {url},{backoff_factor * (2 i)}s后重试") time.sleep(backoff_factor * (2 * i)) except Timeout: if i == max_retries: raise logging.warning(f"超时 {url},重试中...") time.sleep(backoff_factor (2 i)) except HTTPError as e: if resp.status_code >= 500 and i < max_retries: logging.warning(f"服务端错误 {url}({resp.status_code}),重试...") time.sleep(backoff_factor * (2 ** i)) continue raise # 4xx直接抛出
批量结果要统一收口,别让异常中断整个流程
批量请求的目标不是“全成功”,而是“可知可控”——哪怕100个里失败20个,也要明确知道哪20个、为什么失败、返回什么原始信息。
建议:
- 结果用字典或命名元组存储,含url、status('success'/'failed')、response(成功时为json dict,失败时为exception或原始resp);
- 失败项单独写入log文件或数据库,包含url、timestamp、error_type、error_msg、response_text(如有);
- 最终汇总打印成功数/失败数/平均耗时,方便快速评估批次质量。
不复杂但容易忽略。










