
本文介绍如何在保持发送顺序的前提下,将原本同步阻塞的 `send_to_space()` 调用转为非阻塞异步执行,从而显著提升批量任务吞吐量,同时无需修改原函数、不依赖线程且避免竞态风险。
在实际开发中,我们常遇到这样一类场景:需按严格顺序向外部服务(如 Space API)逐个发送请求,但每个请求本身是 I/O 密集型、耗时数十毫秒,且调用方并不关心响应内容。若直接使用同步循环(for item in items_list: send_to_space(item)),整个流程将被逐次阻塞,总耗时 ≈ N × 单次延迟 —— 这在处理数百项时会明显拖慢整体性能。
此时,asyncio 是最恰当的选择,而非多线程或多进程。原因如下:
- ✅ send_to_space() 是 I/O 等待型函数(即使当前是同步写法),本质适合异步调度;
- ✅ asyncio.create_task() 可立即提交协程并返回 Task 对象,不等待其完成,实现“发完即走”;
- ✅ await 在循环体内逐个 await create_task(...) 保证了逻辑上的严格串行性:前一个请求已发起(非必须完成),才发起下一个,既避免乱序,又消除阻塞;
- ❌ 多线程(threading.Thread)会引入 GIL 争用与上下文切换开销,且无法天然保障执行/发送顺序;
- ❌ asyncio.gather() 或 asyncio.as_completed() 会导致并发乱序,违背“必须按列表顺序发送”的硬性要求。
以下是优化后的完整实现:
import asyncio
# 假设 send_to_space 是一个同步函数(不可修改),例如:
# def send_to_space(sub_item):
# requests.post("https://api.space.example", json={"data": sub_item})
# 将其包装为可 await 的协程(推荐方式)
async def send_to_space_async(sub_item):
loop = asyncio.get_running_loop()
# 使用 run_in_executor 非阻塞调用同步函数
await loop.run_in_executor(None, send_to_space, sub_item)
# 主协程:按序发起请求,不等待响应完成
async def send_items(items_list):
for item in items_list:
sub_item = item['sub_item']
# 创建任务并立即继续下一轮 —— 发送动作已启动
task = asyncio.create_task(send_to_space_async(sub_item))
# 注意:此处不 await task,否则退化为同步
# 但我们仍需确保“发起顺序”,所以用 await task 确保前一个已提交再发下一个
# 更准确地说:await task 保证前一个请求已进入网络栈,符合“顺序发送”语义
await task
# 入口点
if __name__ == "__main__":
my_finite_list = [
{"sub_item": "A"}, {"sub_item": "B"}, {"sub_item": "C"}
]
asyncio.run(send_items(my_finite_list))⚠️ 关键注意事项:
立即学习“Python免费学习笔记(深入)”;
- 若 send_to_space() 内部含大量 CPU 计算,建议改用 concurrent.futures.ProcessPoolExecutor 替代 None(默认 ThreadPoolExecutor);
- await task 并非等待响应返回,而是等待该任务被调度并开始执行(在 run_in_executor 场景下,即等待同步函数被线程池接纳)。这已满足“顺序发出”的约束;
- 若你 完全不需要任何等待(甚至不关心是否已成功提交),可改用 asyncio.create_task(...) 后不 await —— 但需额外处理异常和生命周期(如 asyncio.all_tasks() 清理),不推荐用于生产级顺序任务;
- Python ≥ 3.9 支持 asyncio.to_thread()(更简洁替代 run_in_executor),可进一步简化包装逻辑。
总结:对于“顺序 + 非阻塞 + 不改原函数”的需求,asyncio 结合 run_in_executor 是兼具简洁性、可控性与可靠性的标准解法。它让同步代码在异步框架中高效复用,是现代 Python I/O 编程的核心实践之一。










