内存事件总线适合单机服务内部解耦场景,如flask中通知日志与推荐模块,要求轻量、快速、无依赖且允许丢失;不适用于跨进程、需重试或保序的场景。

内存事件总线适合什么场景
纯内存实现(比如用 dict 存订阅者、list 存事件队列)只在进程内有效,重启即丢。它轻、快、无依赖,适合单机服务内部解耦——比如 Flask 里发个 user_registered 事件通知日志模块和推荐模块,不跨进程、不需重放、也不怕丢。
常见错误现象:EventBus.publish() 调用了但监听器没触发——大概率是监听注册和发布不在同一个实例里,比如你在单元测试里每次 new 一个新 EventBus,或者用多进程时没意识到内存不共享。
- 用类变量或单例模式确保全局唯一实例,别每次
EventBus()新建 - 避免在多线程中直接共享可变结构(如
list订阅表),加threading.Lock或改用weakref.WeakSet - 如果用 asyncio,别混用同步回调;注册异步函数时得用
asyncio.create_task()显式调度
持久化事件总线要解决哪些问题
一旦需要跨进程、保序、重试、回溯或故障恢复,就必须落盘。这时候不是“要不要持久化”,而是“选哪一层持久化”:消息队列(RabbitMQ、Kafka)、数据库(PostgreSQL LISTEN/NOTIFY 或写表轮询)、甚至本地文件(sqlite + WAL)。它们解决的不是“事件发没发”,而是“发了但消费者挂了怎么办”。
使用场景举例:订单创建后,要同时调库存服务(HTTP)、写风控日志(Kafka)、更新搜索索引(ES)。其中 ES 服务偶发延迟,你得能重推,不能靠内存里那一次 publish 就完事。
立即学习“Python免费学习笔记(深入)”;
-
Kafka保证分区有序+至少一次投递,但需自己管理 offset 和 consumer group -
Redis Stream简单易上手,支持消费者组和 pending list,但没 Kafka 那么强的吞吐和保留策略 - 用
sqlite自建事件表?可以,但别在高并发写入路径上直接INSERT+SELECT,加 WAL 模式和 PRAGMA synchronous = NORMAL
混合方案:内存兜底 + 异步落盘
很多真实服务其实走的是折中路线:先内存广播给本进程内监听器(快),再异步把事件塞进队列或 DB(稳)。这样既不卡主流程,又能兜住下游失败。
容易踩的坑:publish() 返回就以为事件“已送达”,其实只是进了队列缓冲区;或者异步落盘任务抛异常被吞掉,导致事件静默丢失。
- 异步落盘必须有失败重试 + 死信记录,比如写入
kafka失败时存到本地/var/log/eventbus/dead_letter.jsonl - 别让内存广播和持久化逻辑共用同一份事件对象;深拷贝或用不可变数据结构(如
dataclass(frozen=True)),防止一边改一边序列化出错 - 如果用
concurrent.futures.ThreadPoolExecutor做异步落盘,注意默认线程数是 CPU 核心数,突发流量下可能积压;设成max_workers=10更可控
性能与兼容性取舍点在哪
内存总线延迟在微秒级;Redis Stream 网络 RTT 加上序列化,通常 1–5ms;Kafka 批处理后可压到 2ms 内,但首次连接开销大。而 Python 的 asyncio 生态对 Kafka 官方 aiokafka 支持还行,但对 RabbitMQ 的 aio-pika 版本碎片化严重,稍不注意就遇到 ChannelClosedError。
参数差异很关键:比如 aiokafka 的 acks="all" 会等 ISR 全部写入,延迟翻倍;而 acks=1 只等 leader,更接近内存行为但有丢数据风险。
- 开发环境用内存总线 + 日志打印事件流,上线前切配置开关,别硬编码切换逻辑
- 所有事件 payload 必须 JSON-serializable,别传
datetime或Decimal;统一用isoformat()和float()预处理 - 别在事件里塞大文件或原始图片二进制——传 URL 或 ID,否则
redis会报String longer than 512 MB










