验证 rabbitmq 消息不丢失需关闭自动 ack(设 amqp.autoack=false),手动调用 ch.ack()/ch.nack();启用 qos 限流;panic 用 recover 捕获;模拟断连用 http crash 接口或 iptables,而非 kill -9;队列与消息须持久化;重试通过 dlx 实现。

怎么验证 RabbitMQ 消息没丢——先关掉自动 ACK
默认 amqp.AutoAck 是 true,消息一到消费者就立刻被 RabbitMQ 删除,根本没法测重传。必须设为 false,靠手动调用 ch.Ack() 或 ch.Nack() 控制生命周期。
- 消费者启动时,
ch.Qos(1, 0, false)限流,避免一次拉太多导致崩溃后批量丢失 - 每条消息处理完再
ch.Ack();出错时用ch.Nack(false, true)让消息重回队列头部(注意第二个参数requeue=true) - 别在 handler 里直接 panic —— 要用
recover()捕获,否则 goroutine 退出,连接断开,未 ack 的消息会被 RabbitMQ 释放回队列(行为取决于 connection 和 channel 的关闭方式)
模拟网络中断或消费者崩溃——别 kill -9,要用优雅断连
真实场景下,消费者不是“挂了”,而是连接突然断开(如网卡抖动、k8s pod 重启)。直接 kill -9 进程无法触发 AMQP 的 connection recovery 流程,也测不出消息是否真能重入队列。
- 在 consumer 代码中加一个
http.HandleFunc("/crash", func(w http.ResponseWriter, r *http.Request) { os.Exit(1) }),用 curl 触发,观察未 ack 消息是否回到队列 - 更贴近生产:用
iptables -A OUTPUT -p tcp --dport 5672 -j DROP模拟网络闪断,等几秒再恢复,看 consumer 是否自动重连并继续消费 - RabbitMQ 管理界面里手动 close channel 或 connection,也能触发未确认消息的 requeue 行为(前提是 queue 声明时
durable=true,且 publish 时deliveryMode=2)
重试次数和死信怎么控制——靠 header + DLX,别硬写 for 循环
业务逻辑里写 for i := 0; i 重试,只是本地循环,RabbitMQ 根本不知道,消息也不会进死信队列,更没法人工干预。
- 发消息时,在
amqp.Publishing.Headers里塞map[string]interface{}{"x-death": []interface{}{}, "retry_count": 0}(初始值) - 消费者收到后,先读
msg.Headers["retry_count"],若 ≥3 就ch.Publish(..., amqp.Publishing{Headers: map[string]interface{}{"dlx": "my-dl-exchange"}})发往死信交换器 - 死信交换器绑定到 DLQ(dead-letter queue),DLQ 再绑定回原 exchange,形成闭环;但注意 TTL 必须设在 DLQ 对应的 queue 上,不是 message 上
- 别依赖消息体里的 JSON 字段存重试计数——header 更轻量、AMQP 原生支持、不会被序列化污染
测试失败时怎么定位是哪条消息丢了——加 trace_id 和日志上下文
光看 RabbitMQ 管理后台的 “Ready / Unacked” 数字没用,你不知道具体哪条消息卡住了、为什么没 ack、是不是被 nack 了又没重入成功。
立即学习“go语言免费学习笔记(深入)”;
- 所有消息 publish 前生成唯一
trace_id,通过msg.Headers["trace_id"]透传,consumer 日志统一打log.Printf("[trace:%s] processing...", traceID) - 在
ch.Confirm()模式下监听notifyPublish,记录哪些 msgId 没收到 confirm,对应查原始业务请求日志 - 开启 RabbitMQ 的
firehose(rabbitmqctl trace_on),抓全链路 AMQP 帧,但仅用于调试,别长期开——性能损耗大
真正难的不是让消息重传,而是确认它「该重传的时候重传了,不该重传的时候不重传」。比如数据库主键冲突这种错误,重试一百次也没用,得进 DLQ 人工判断。所以 header 里除了 retry_count,还得加 error_code 字段,按错误类型分流处理。










