asyncio 不能直接使用 pulsar-client,因其底层为同步阻塞 c++ 实现;正确方式是 consumer 启用 listener 回调并用 asyncio.to_thread 执行异步逻辑,producer 需线程安全复用,且必须显式 ack/nack 消息。

asyncio 不能直接用 pulsar.Client
Python pulsar-client 官方 SDK 本身是同步阻塞的,底层基于 C++ client 封装,pulsar.Client、producer.send()、consumer.receive() 全部会阻塞当前线程 —— 即使你在 async def 函数里调用,也会让整个 event loop 卡住。
常见错误现象:RuntimeWarning: coroutine 'xxx' was never awaited 没出现,但 CPU 占用低、吞吐上不去、大量协程堆积在 pending 状态,本质是同步调用拖垮了 asyncio 调度。
- 别试图给
client.create_producer()加await—— 它根本不是协程,加了会报TypeError: object XXX can't be used in 'await' expression - 不要用
loop.run_in_executor包一层就以为“异步化”了:短消息场景下线程切换开销可能比发送本身还高,且无法解决 consumer 拉取的实时性问题 - 真实使用场景:Web API 接收请求后发 Pulsar 消息(需低延迟响应)、长周期 consumer 处理流式事件(需不阻塞其他任务)
必须用 pulsar-client 的 listener 模式配 asyncio.to_thread
官方唯一推荐的轻量适配方式,是启用 consumer 的 listener callback,并用 asyncio.to_thread 把业务逻辑扔进线程池执行,避免阻塞 event loop。注意:producer 仍需手动管理线程安全,不能跨线程复用同一 Producer 实例。
关键参数差异:consumer.subscribe(listener=...) 中的 listener 函数必须是普通同步函数;它内部调用 asyncio.to_thread(your_async_handler, msg) 才真正进入异步世界。
立即学习“Python免费学习笔记(深入)”;
-
listener函数内禁止 await 任何东西,否则会 crash(Pulsar C++ 层不识别协程对象) - 每个
msg必须显式调用consumer.acknowledge(msg)或consumer.negative_acknowledge(msg),listener 不自动 ack - 若 handler 抛异常,需在 listener 里捕获并调用
negative_acknowledge,否则消息会卡在 unacked 状态
替代方案:用 pulsar-client-cpp + uvloop 自建 bridge 成本太高
有人尝试用 Cython 封装 pulsar-client-cpp 的 async 接口,或通过 uvloop 直接对接 libpulsar 的 event loop —— 这些路径在实际项目中几乎不可维护。C++ client 的 async callback 语义和 Python asyncio 的生命周期管理冲突严重,比如 MessageId 在 callback 触发时可能已被 GC,导致 acknowledge 失败并静默丢消息。
性能影响明显:每条消息多一次 Python/C 边界穿越 + 手动 refcount 管理,吞吐反而比 to_thread 低 20%~40%,且 macOS 和 musl libc 环境下容易 core dump。
- 兼容性风险:pulsar-client 3.x 的 C++ 库 ABI 不稳定,升级 minor 版本常需重编译 binding
- 调试困难:stack trace 混杂 Python/Cpp/uvloop 三层,gdb 断点难打,core 文件无 Python 上下文
- 除非你已有成熟 C++ 异步消息中间件团队,否则别碰这个方向
Consumer 启动慢、首次 receive 延迟高?检查 receiver_queue_size 和 max_pending_chunked_message
默认配置下,consumer 初始化要等 broker 返回全量 topic 分区元数据,再逐个建立 TCP 连接,冷启动耗时可达 2~5 秒。这不是异步问题,而是 client 内部同步初始化逻辑导致的 —— 即使你用 to_thread,这个初始化过程也得在主线程完成。
可缓解但无法根除:把 receiver_queue_size 从默认 1000 降到 100,能减少单次拉取的数据量;设 max_pending_chunked_message=0 关闭 chunk 支持(如果你不用 batch + compression),避免额外解析开销。
- 千万别在
async def startup_event()里直接client.subscribe(...)—— FastAPI/Uvicorn 的 reload 模式会反复 init client,触发连接泄漏 - 推荐做法:用
atexit.register()或 lifespan hook 显式 close client,且 consumer 实例应全局单例复用 - 最容易被忽略的一点:broker 端
subscriptionTypesEnabled配置为[Exclusive, Shared]时,Shared 模式 consumer 启动更快,但要注意 message redelivery 行为变化











