使用pymysql_replication需满足:开启binlog且格式为row;用户有replication slave权限;初始化时设唯一server_id和resume_stream=true;解析事件时通过columns映射取值;断线重连需持久化并校验位点。

连接 MySQL 时必须开启 binlog 和 ROW 格式
不满足这两个前提,pymysql_replication 连上去也收不到任何事件——它不会报错,只是安静地空转。
检查方式:SHOW VARIABLES LIKE 'log_bin'; 必须为 ON;SHOW VARIABLES LIKE 'binlog_format'; 必须是 ROW(不是 STATEMENT 或 MIXED)。
- MySQL 5.7+ 默认关 binlog,需在
my.cnf中显式加log-bin=mysql-bin并重启 -
binlog_format=ROW必须写在[mysqld]段下,且不能被客户端 SET 覆盖(只读变量) - 用户权限要包含
REPLICATION SLAVE,仅SELECT不够
初始化 BinLogStreamReader 的关键参数
最常漏掉的是 server_id 和 resume_stream,导致重复消费或断连后丢事件。
server_id 不是 MySQL 实例的 ID,而是你这个订阅进程的唯一标识——同一网段下多个订阅程序必须互不相同,否则 MySQL 主动踢掉旧连接。
立即学习“Python免费学习笔记(深入)”;
-
server_id推荐用随机整数(如int(time.time() * 1000) % 65535),避免硬编码 -
resume_stream=True才能从上次log_file+log_pos继续;设为False每次都从当前最新位置开始 -
only_events别滥用:过滤DeleteRowsEvent等类型能减负载,但跳过RotateEvent会导致日志切换时位置错乱
解析 event 时字段名和值的取法容易出错
pymysql_replication 不像 ORM 那样返回字典,原始字段结构藏得深,直接 print(event.rows) 很难看出啥。
比如 event.rows[0]["values"]["name"] 看似合理,但实际 values 是个 dict,key 是 column 对象,不是字符串名——所以会 KeyError。
- 正确取字段值:
event.columns[2].name拿列名,event.rows[0]["values"][event.columns[2]]拿对应值 - 更稳的方式是先构建列名到索引映射:
col_map = {col.name: i for i, col in enumerate(event.columns)},再用row["values"][event.columns[col_map["id"]]] -
event.schema和event.table只在部分 event 类型里有(如WriteRowsEvent),RotateEvent里为空,别无脑访问
长连接断开后的重连和位点恢复很脆弱
网络抖动或 MySQL 重启后,BinLogStreamReader 抛 BrokenPipeError 或 OperationalError 是常态,但它不会自动重试,也不会帮你记最后消费到哪。
靠 resume_stream=True 不够——如果连接中断时还没来得及更新本地位点,重连就可能重复或跳过几条。
- 务必在每次成功处理 event 后,用
event.packet.log_pos和event.packet.log_file持久化当前位点(写文件或 DB) - 重连逻辑要自己包一层:捕获异常 → 关闭旧 stream → 用刚存的
log_file/log_pos新建 stream - 注意
log_file可能已被 MySQL 删除(过期清理),此时要查SHOW BINARY LOGS找可用最早文件,否则初始化失败
位点管理这事没标准解法,但硬编码初始位点、不存盘、或只存内存,线上跑不出三天。










