
This article describes a robust checkpoint-and-resume mechanism: when a batch of API calls is interrupted by token expiration, the script automatically records the last failed ClientId and, on the next run, resumes from the point of interruption, avoiding both duplicate requests and missed data.
When making batch API calls for a large number of clients, the limited lifetime of an access token often terminates the loop partway through. Restarting from the beginning on every rerun not only wastes resources and time, it can also cause business problems where the API is not idempotent (e.g., duplicate creation). The ideal solution is persistent checkpointing: remember exactly where the last run failed and resume execution precisely from there.
Core design approach
The approach combines a state file with a lazy iterator:
- Persist the ClientId at which the run was interrupted in a lightweight text file (e.g., last_failed.txt);
- Use itertools.dropwhile() to build a lazy stream of clients that starts at the given ID, skipping the part that was already processed successfully (a small demo follows this list);
- As soon as the loop detects token expiration (e.g., a response containing "Session already expired." or a 401/403 status code), save the current ClientId and exit;
- If the full traversal completes without interruption, delete the state file to signal that the job is done.
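The resume semantics are easy to check in isolation. Below is a minimal, self-contained demo; the nested ClientList shape mirrors what the full script assumes about client_file.json, and the IDs are made up. Note that dropwhile() keeps the element that ends the skipping, so the client that failed is retried:
```python
import itertools

# Toy data in the nested shape the script expects: a list of objects,
# each carrying a "ClientList" array.
data = [
    {"ClientList": [{"ClientId": "A"}, {"ClientId": "B"}]},
    {"ClientList": [{"ClientId": "C"}, {"ClientId": "D"}]},
]

def get_clients(data):
    for outer in data:
        yield from outer.get("ClientList", [])

# Resume from "C": dropwhile() discards clients while the predicate is true,
# i.e. until it first sees ClientId == "C" (inclusive).
resumed = itertools.dropwhile(lambda c: c["ClientId"] != "C", get_clients(data))
print([c["ClientId"] for c in resumed])  # ['C', 'D']
```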
Complete runnable example
import itertools
import json
import pathlib
import time

import requests

# Configuration
LAST_FAILED_PATH = pathlib.Path("last_failed.txt")
API_BASE_URL = "https://api.example.com/clients/"  # replace with your actual URL
AUTH_TOKEN = "your_bearer_token_here"  # read this from an environment variable in practice

def get_clients(data):
    """Flatten and yield every Client object (supports the nested structure)."""
    for outer in data:
        if "ClientList" in outer and isinstance(outer["ClientList"], list):
            yield from outer["ClientList"]

def get_clients_starting_from(data, client_id: str):
    """Return an iterator over clients starting from the given ClientId (inclusive)."""
    return itertools.dropwhile(
        lambda client: client["ClientId"] != client_id,
        get_clients(data)
    )

def call_api(client: dict) -> tuple[bool, str | dict]:
    """
    Call the target API and return (success, response payload or error message).
    Adjust the error-detection logic to match your API's real response structure.
    """
    url = f"{API_BASE_URL}{client['ClientId']}"
    headers = {
        "Authorization": f"Bearer {AUTH_TOKEN}",
        "Cache-Control": "no-cache",
    }
    try:
        response = requests.get(url, headers=headers, timeout=15)
        if response.status_code == 200:
            return True, response.json()
        elif response.status_code in (401, 403):
            # Typical token-expiration response
            try:
                err_data = response.json()
                msg = err_data.get("ErrorMessage", "").lower()
                if "session" in msg and "expired" in msg:
                    return False, "TokenExpired"
            except ValueError:  # response body was not valid JSON
                pass
            return False, f"AuthFailed:{response.status_code}"
        else:
            return False, f"HTTP{response.status_code}:{response.reason}"
    except requests.RequestException as e:
        return False, f"RequestError:{str(e)}"

def main():
    # 1. Load the raw JSON data
    with open("client_file.json", encoding="utf-8") as f:
        data = json.load(f)

    # 2. Choose the starting point: resume from the saved ID if last_failed.txt
    #    exists, otherwise start from the beginning
    if LAST_FAILED_PATH.exists():
        with open(LAST_FAILED_PATH, encoding="utf-8") as f:
            last_id = f.read().strip()
        print(f"[INFO] Resuming from ClientId: {last_id}")
        clients = get_clients_starting_from(data, last_id)
    else:
        print("[INFO] Starting from the beginning")
        clients = get_clients(data)

    # 3. Run the API-call loop
    results = []
    success_count = 0
    failed_count = 0
    for client in clients:
        print(f"[PROGRESS] Processing ClientId: {client['ClientId']} ({client['ClientName']})")
        is_success, payload = call_api(client)
        if is_success:
            print(f"✅ Success: {client['ClientId']} → {len(str(payload))} chars")
            results.append({"client": client, "data": payload})
            success_count += 1
        else:
            print(f"❌ Failed: {client['ClientId']} → {payload}")
            if payload == "TokenExpired":
                # Persist the checkpoint and stop
                with open(LAST_FAILED_PATH, "w", encoding="utf-8") as f:
                    f.write(client["ClientId"])
                print(f"[SAVED] Last failed ClientId saved to {LAST_FAILED_PATH}")
                break
            failed_count += 1
    else:
        # Normal completion (no break): remove the checkpoint file
        if LAST_FAILED_PATH.exists():
            LAST_FAILED_PATH.unlink()
            print("[CLEANED] last_failed.txt removed — all clients processed.")

    # 4. Save the results (optional)
    if results:
        output_file = f"results_{int(time.time())}.json"
        with open(output_file, "w", encoding="utf-8") as f:
            json.dump(results, f, indent=2, ensure_ascii=False)
        print(f"[SAVED] {len(results)} results written to {output_file}")

    print(f"\nSummary: Success={success_count}, Failed={failed_count}")

if __name__ == "__main__":
    main()

Key considerations
- ✅ Security: inject AUTH_TOKEN through an environment variable (e.g., os.getenv("API_TOKEN")) rather than hard-coding it; see the first sketch after this list;
- ✅ Adapt the error check to your real API: the example keys off 401/403 plus the "Session expired" string; adjust the branches in call_api() to match your actual response body (e.g., {"code": "TOKEN_EXPIRED"}), as in the second sketch after this list;
- ✅ Idempotency: make sure the API being called is idempotent (GET queries naturally are); for POST/PUT, add a unique request ID or server-side deduplication;
- ✅ Concurrency and rate limiting: in production, add time.sleep() calls or use the ratelimit library to throttle requests and avoid tripping abuse controls; the first sketch after this list includes a simple delay;
- ✅ Logging: replace print() with the logging module for easier troubleshooting and monitoring; the second sketch after this list shows a basic setup.
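A minimal sketch of the first and fourth points, reading the token from an environment variable and spacing requests out with a fixed delay. The variable name API_TOKEN and the 0.5-second delay are illustrative assumptions, not part of the original script:
```python
import os
import time

# Assumption: the token is exported as API_TOKEN before the script runs.
AUTH_TOKEN = os.getenv("API_TOKEN")
if not AUTH_TOKEN:
    raise RuntimeError("API_TOKEN environment variable is not set")

REQUEST_DELAY_SECONDS = 0.5  # crude throttle; tune to your API's rate limit

def call_api_throttled(client: dict):
    """Wrap call_api() from the script above and pause between requests."""
    result = call_api(client)
    time.sleep(REQUEST_DELAY_SECONDS)
    return result
```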
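And a sketch of the second and last points: detecting expiry for a hypothetical API that answers with {"code": "TOKEN_EXPIRED"}, plus a basic logging setup to replace the print() calls. Both the response shape and the logger name are assumptions for illustration:
```python
import logging

logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")
log = logging.getLogger("client_sync")  # hypothetical logger name

def is_token_expired(response) -> bool:
    """Return True if the (hypothetical) API signals expiry via {"code": "TOKEN_EXPIRED"}."""
    if response.status_code not in (401, 403):
        return False
    try:
        body = response.json()
    except ValueError:  # body was not JSON
        return False
    return body.get("code") == "TOKEN_EXPIRED"

# Inside the main loop, logging calls would replace print(), e.g.:
# log.info("Processing ClientId: %s", client["ClientId"])
# log.error("Failed: %s -> %s", client["ClientId"], payload)
```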
The approach is simple and reliable, with requests as its only third-party dependency, and it has proven effective in several enterprise data-synchronization scenarios. Swap in your own URL, authentication scheme, and error-detection logic, and it should slot straight into your workflow.










