
本文介绍一种健壮的断点续传机制:当调用 api 因 bearer token 过期而中断时,程序能自动记录最后失败的 clientid,并在下次运行时精准从该节点恢复迭代,避免重复请求与数据错乱。
在批量调用外部 API 的实际场景中(如逐个查询客户信息),身份令牌(Bearer Token)常有较短有效期(例如 1 小时)。若待处理客户量较大,程序极可能在中途遭遇 401 Unauthorized 或类似 Session already expired 响应而中断——此时若简单重启脚本,将从头开始重试,既浪费资源,又可能导致状态不一致(如重复写入、漏处理)。理想的解决方案是实现可持久化的断点续传(Resume from Last Failure)。
核心思路是:将“最后失败的 ClientId”作为恢复锚点,持久化存储于本地文件;每次启动时优先读取该文件,决定迭代起点。以下是推荐的工程化实现方案:
✅ 关键设计原则
- 无状态感知:主逻辑不依赖内存中的“当前索引”,而是通过 ClientId 精确定位,兼容 JSON 结构嵌套变化;
- 原子性保障:仅在确认 Token 失效时才落盘记录,且成功跑完全量后主动清理断点文件,避免误续传;
- 解耦清晰:数据提取、断点定位、API 调用分层封装,便于测试与扩展。
? 完整可运行代码示例
import itertools
import json
import pathlib
import requests
import time
# 配置项:断点文件路径(建议使用绝对路径或确保工作目录稳定)
LAST_FAILED_PATH = pathlib.Path("last_failed.txt")
def get_clients(data):
"""扁平化提取所有 Client 对象,支持多层嵌套结构"""
for outer in data:
if "ClientList" in outer and isinstance(outer["ClientList"], list):
yield from outer["ClientList"]
def get_clients_starting_from(data, client_id: str):
"""从指定 ClientId 开始生成后续 Client 迭代器(含该 ID)"""
return itertools.dropwhile(
lambda client: client["ClientId"] != client_id,
get_clients(data)
)
def call_api_with_client(client: dict, token: str) -> tuple[bool, str]:
"""
调用目标 API,返回 (是否成功, 响应文本/错误信息)
⚠️ 实际使用时请替换为真实 URL 和请求逻辑
"""
url = f"https://api.example.com/clients/{client['ClientId']}"
headers = {
"Authorization": f"Bearer {token}",
"Cache-Control": "no-cache",
}
try:
response = requests.get(url, headers=headers, timeout=10)
if response.status_code == 200:
return True, response.text
elif response.status_code in (401, 403):
# Token 失效典型响应码
try:
err_data = response.json()
msg = err_data.get("ErrorMessage", "").lower()
if "expired" in msg or "invalid token" in msg:
return False, "Token expired"
except (json.JSONDecodeError, KeyError):
pass
return False, f"Auth failed: {response.status_code}"
else:
return False, f"HTTP {response.status_code}: {response.reason}"
except requests.RequestException as e:
return False, f"Request error: {str(e)}"
def main():
# 1. 加载原始 JSON 数据
with open("client_file.json", "r", encoding="utf-8") as f:
data = json.load(f)
# 2. 确定起始位置
if LAST_FAILED_PATH.exists():
print("⚠️ 检测到断点文件,尝试从中断处恢复...")
try:
with open(LAST_FAILED_PATH, "r", encoding="utf-8") as f:
last_id = f.read().strip()
clients = get_clients_starting_from(data, client_id=last_id)
print(f"✅ 将从 ClientId = '{last_id}' 开始继续处理")
except Exception as e:
print(f"❌ 断点文件读取失败,将从头开始:{e}")
clients = get_clients(data)
else:
print("✅ 未检测到断点,从头开始处理")
clients = get_clients(data)
# 3. 执行 API 调用循环
success_count = 0
failure_count = 0
start_time = time.time()
for client in clients:
print(f"\n? 正在处理 Client: {client['ClientId']} ({client['ClientName']})")
# 替换为你的有效 Token(生产环境建议从环境变量或密钥管理服务获取)
TOKEN = "your_actual_bearer_token_here"
success, result = call_api_with_client(client, TOKEN)
if success:
print(f"✅ 成功响应 | {len(result)} 字符")
success_count += 1
else:
print(f"❌ 请求失败 | {result}")
failure_count += 1
# 关键:仅当确认 Token 过期时,记录断点并退出
if "expired" in result.lower() or "auth" in result.lower():
print(f"? 检测到 Token 过期,保存断点 ClientId = '{client['ClientId']}'")
LAST_FAILED_PATH.write_text(client["ClientId"], encoding="utf-8")
break # 立即终止,等待下次恢复
# 4. 清理断点(仅当全部成功完成时)
else:
if LAST_FAILED_PATH.exists():
LAST_FAILED_PATH.unlink()
print("✅ 全部任务完成,已清除断点文件")
# 5. 输出统计摘要
elapsed = time.time() - start_time
print(f"\n? 总结:成功 {success_count} 个,失败 {failure_count} 个,耗时 {elapsed:.2f} 秒")
if __name__ == "__main__":
main()⚠️ 注意事项与最佳实践
- Token 安全:切勿将 Token 硬编码在源码中!推荐使用 os.getenv("API_TOKEN") 或专用配置管理工具。
- 错误判定鲁棒性:示例中通过 HTTP 状态码 + 错误消息双重校验 Token 过期。请根据你对接的 API 文档调整判断逻辑(如检查 WWW-Authenticate 头、特定错误码 498 等)。
- 并发与限流:若需提升吞吐量,可用 concurrent.futures.ThreadPoolExecutor 并发调用,但务必添加 time.sleep() 或令牌桶限流,避免触发 API 阈值限制。
- 日志增强:生产环境建议集成 logging 模块,记录详细时间戳、请求 ID、响应头等,便于问题追溯。
- JSON 结构兼容性:当前 get_clients() 函数假设结构为 [{"ClientList": [...]}, ...]。若格式变化(如多级嵌套、字段名不同),请同步调整提取逻辑。
通过该方案,你的批处理脚本具备了生产级的容错与恢复能力——即使 Token 在第 1,247 个客户处过期,下次执行也将毫秒级定位并继续,真正实现“所断即所续”。










