吞吐差异根源在于i/o模型:kafka-python用同步阻塞i/o,aiokafka用asyncio非阻塞i/o;前者单实例3–5k msg/s,后者合理并发下超15k+ msg/s。

同步发送 vs 异步发送:吞吐差异的根源不在库名,而在调用模型
吞吐差距主要来自 kafka-python 默认走同步 I/O(阻塞 socket),而 aiokafka 基于 asyncio 调度非阻塞网络操作。同一台机器上发 10k 条消息,kafka-python 单 producer 实例通常压到 3–5k msg/s,aiokafka 在合理并发下轻松过 15k+ msg/s——前提是你的业务逻辑不拖后腿。
- 别直接拿
Producer.send()和AsyncProducer.send()对比:前者返回Future但默认不 await,后者必须await才真正发出 -
kafka-python想提吞吐得手动开多线程 + 多KafkaProducer实例,但会吃内存、增连接数;aiokafka一个实例就能靠协程并发撑起高负载 - 注意 broker 版本兼容:
aiokafka3.0+ 要求 Kafka broker ≥ 0.10,老集群慎升
批量发送配置差异:batch_size 和 linger_ms 的实际效果不一样
两个库都支持攒批,但触发逻辑不同。你设了 batch_size=16384,kafka-python 是“满了就发”,aiokafka 是“满了或超时才发”,且它的 linger_ms 默认是 5ms(kafka-python 默认是 0)。
-
kafka-python中linger_ms=0表示不等待,每条都可能单独成 batch;设成 5–10ms 更稳 -
aiokafka的linger_ms在高并发下容易被协程调度延迟掩盖,实测建议设为 10–20ms,并配合max_batch_size=1000(不是字节数) - 别迷信大
batch_size:超过 1MB 容易触发 broker 的message.max.bytes拒绝,报错MessageSizeTooLargeError
错误处理方式不同:ConnectionError 不代表真的连不上
kafka-python 遇到网络抖动常抛 KafkaTimeoutError 或直接卡住;aiokafka 更倾向抛 ConnectionError 或 KafkaConnectionError,但它可能只是某个 broker 暂时不可达,协程还在跑。
-
kafka-python的重试靠retries参数控制,默认 5 次,每次间隔固定;失败后需手动检查future.get(timeout=...)否则静默丢数据 -
aiokafka默认不自动重试 send,得自己包try/except+await asyncio.sleep(),否则第一次失败就中断整个协程流 - 两者都会在 metadata 刷新失败时静默降级:比如 broker 下线后仍往旧地址发请求,表现为延迟飙升但无明显报错
内存与 GC 表现:小消息多批次时 aiokafka 更吃内存
实测发 1KB 消息、每批 100 条,持续 10 分钟:aiokafka 进程 RSS 高出 200–300MB,主要来自未及时 await 的 send() 积压和 asyncio.Queue 缓存。
立即学习“Python免费学习笔记(深入)”;
-
kafka-python的send()返回Future,不 get 就不释放,但至少不会让 event loop 堆积 -
aiokafka必须确保每个send()都被 await,否则协程挂起、buffer 累积、GC 压力陡增 - 用
aiokafka时建议加监控:len(producer._client._metadata._cluster._brokers)查 broker 连接数,producer._sender._pending_requests.qsize()看积压请求数
真实瓶颈往往不在客户端库本身,而在你有没有把 await 写对、broker 的 replica.fetch.wait.max.ms 设太小、或者磁盘 IO 跟不上日志刷写。测吞吐前先看 top -p $(pgrep -f kafka-server-start) 里 Java 进程的 CPU 和 wait%。











