async函数不能直接在同步代码中调用,需用anyio.to_thread.run_sync配合asyncio.run来桥接执行环境,即在新线程中启动新event loop运行async函数并同步返回结果。

async 函数不能直接在同步代码里调用,anyio.to_thread.run_sync 是正确入口
你写了个 async def fetch_data(),但当前上下文是纯同步的(比如 CLI 主函数、旧版 Flask 视图、或单元测试 setup),直接调用会报 RuntimeWarning: coroutine 'fetch_data' was never awaited,甚至返回一个悬空的 coroutine 对象。这不是语法错误,而是执行模型不匹配——你得把 async 工作“扔进线程”再等结果回来。anyio.to_thread.run_sync 就是干这个的:它在后台线程中启动 event loop,运行你的 async 函数,再把结果同步地交还给你。
注意:anyio.to_thread.run_sync 不是把 async 函数“变同步”,而是帮你桥接两种执行环境。它内部会创建新线程 + 新 asyncio event loop,所以开销比纯同步调用大,别在热路径频繁用。
必须用 await 包裹的 async 函数,不能直接传给 run_sync
run_sync 只接受同步可调用对象(即普通函数)。如果你把 async def 函数本身传进去,比如 run_sync(fetch_data),会立刻失败——因为 fetch_data 返回的是 coroutine,不是可执行的同步函数。
正确做法是用 anyio.run_sync_in_worker_thread?不,那是旧名,已弃用。现在标准写法是:
- 用
lambda或包装函数把 await 行为收束成同步调用:run_sync(lambda: anyio.run_sync_in_worker_thread(fetch_data))?不对,这嵌套错了。 - 正确方式是用
anyio.run_sync_in_worker_thread的替代方案?等等,anyio3.x 已统一为to_thread.run_sync,但它只跑同步函数。所以你真正需要的是:anyio.run_sync_in_worker_thread已被移除,现在必须用anyio.from_thread.start_blocking_portal+portal.call?太重。 - 最简解法:用
asyncio.run在同步函数里启动新 loop:def sync_wrapper(): return asyncio.run(fetch_data()),再传给run_sync(sync_wrapper)。但注意:asyncio.run禁止在已有 event loop 中调用(比如你已在async def里),而to_thread.run_sync启动的线程没有 loop,所以安全。
示例:
import anyio import asyncioasync def fetch_data(): await asyncio.sleep(0.1) return "done"
def sync_entry():
✅ 正确:在新线程里用 asyncio.run 执行 async 函数
result = anyio.to_thread.run_sync( lambda: asyncio.run(fetch_data()) ) return result参数传递和异常传播要手动处理,
run_sync不自动解包 awaitable
run_sync不会识别你传进去的函数返回的是 coroutine;它只管执行、拿返回值。所以如果你忘了用asyncio.run或等价机制,返回的就是一个未 await 的coroutine对象,后续调用.result()会报AttributeError。常见翻车点:
- 传了
fetch_data(函数对象)而不是fetch_data()(调用结果)——后者是 coroutine,前者啥也不做。 - 用了
asyncio.get_event_loop().run_until_complete(fetch_data()),但在某些环境下(如 Windows + Python 3.12+)可能触发RuntimeError: no running event loop,因为线程里 loop 没显式创建。 - async 函数抛异常时,会原样抛到
run_sync调用处,无需额外 try,但堆栈会跨线程,可能丢失部分上下文。
建议封装一层防错:
def run_async_safely(coro_func, *args, **kwargs):
async def _runner():
return await coro_func(*args, **kwargs)
return anyio.to_thread.run_sync(lambda: asyncio.run(_runner()))
用法
result = run_async_safely(fetch_data)
替代方案对比:为什么不用 trio.to_thread.run_sync 或 asyncio.to_thread?
anyio.to_thread.run_sync 是跨 runtime 的抽象,底层可选 asyncio 或 trio,但你得确保当前 anyio 后端一致。如果项目已固定用 asyncio,且你只是临时桥接,asyncio.to_thread(Python 3.9+)更轻量——但它只能在已有 asyncio event loop 中用,无法在纯同步主线程里调用(会报 RuntimeError: no current event loop)。
trio.to_thread.run_sync 同理,依赖 trio 运行时。
所以结论很实际:只要你在同步上下文(没 loop),又想用 anyio 生态,就老实用 anyio.to_thread.run_sync + asyncio.run 组合。别试图绕过“启动新 loop”这步——它不是冗余,是必要隔离。
最后提醒:如果 async 函数本身只做 CPU 密集型工作(比如解析大 JSON),别用这个模式,直接上 anyio.to_process.run_sync 或 concurrent.futures.ProcessPoolExecutor 更合适。IO 密集才值得走 event loop 路线。










