应全局复用 *amqp.connection(sync.once 初始化),按需创建 channel 并及时 close;发送时设 deliverymode=persistent、队列 durable=true、mandatory=true;消费者需幂等校验、qos 限流、业务完成后再 ack。

怎么建立稳定连接,而不是每次发消息都 Dial
Go 应用连 RabbitMQ 最常见的翻车点,就是把 amqp.Dial 放在业务函数里——每发一条消息就建一次 TCP 连接,不出三天服务就卡死在 connect: cannot assign requested address。RabbitMQ 的 Connection 是重量级资源,必须复用;Channel 才是轻量级的、可按需创建。
正确做法是全局单例一个 *amqp.Connection,用 sync.Once 初始化,再通过 conn.Channel() 按需获取 Channel。别忘了设超时:amqp.Dial("amqp://...?connect_timeout=5"),否则 DNS 故障时会卡住整个 goroutine。
- Connection 复用:避免频繁 handshake 和端口耗尽
- Channel 不复用:每个 goroutine 用完立刻
ch.Close(),否则 Channel 泄漏会导致 RabbitMQ 报channel error: too many channels - 加重连逻辑:网络抖动时
conn.IsClosed()+ 背压重试(比如指数退避),别让一次断连导致整条链路静默
发消息前必须设置哪些关键参数
默认直连队列("" exchange)看似简单,但生产环境一上量就丢消息——因为没开持久化、没确认、没处理失败。真正可靠的发送,至少要配齐三样:mandatory、immediate(已废弃,忽略)、以及更关键的 publishing.DeliveryMode。
DeliveryMode: amqp.Transient(默认)意味着消息只存在内存,Broker 重启就消失;必须改成 amqp.Persistent 并配合队列声明时的 durable: true,才能落盘。
立即学习“go语言免费学习笔记(深入)”;
- 队列声明必须带
durable: true,否则即使 DeliveryMode=Persistent 也无效 - 发送时加
mandatory: true,这样如果路由失败(比如 exchange 不存在或 binding 错误),ch.Publish会立即返回 error,而不是静默丢弃 - 别依赖
channel.Confirm()后再发下一条——它只是开启 publisher confirm 模式,真正要等确认得调ch.NotifyPublish+ select 监听,否则还是“发了就不管”
消费者怎么写才不会重复消费或卡死
RabbitMQ 的 consumer 不是“收到就干”,而是“收到→干活→显式 Ack→再收下一条”。很多人在 handler 里直接 delivery.Ack(false),结果业务 panic 或数据库挂了,消息却已被确认,彻底丢失。
正确顺序是:先做幂等校验(比如用 redis.SetNX("processed:order_123", "1", time.Hour)),再执行业务逻辑,最后成功才 delivery.Ack(false)。同时必须设 ch.Qos(1, 0, false) 控制预取数,否则 RabbitMQ 会一口气推几百条给一个 consumer,OOM 或重启时全丢。
- Ack 必须放在业务逻辑**完全结束之后**,且包裹在 defer 或 if err == nil 分支里
- 用
context.WithTimeout包裹整个 handler,防止某条消息卡死整个 consumer(比如 DB 查询超时) - 不要用
delivery.Reject(true)重入队列来“重试”——它不保证顺序,且可能无限循环;该走死信队列(DLX)就配好x-dead-letter-exchange,让失败消息进单独队列人工干预
为什么 fanout exchange 比 direct 更适合事件广播
订单创建后要通知库存、积分、风控三个服务?别用 direct exchange + 一堆 routing key 绑定——新增一个服务就得改订单服务代码,又回到紧耦合。fanout exchange 才是真正的发布/订阅:发一次,所有绑定的 queue 都收到副本,彼此完全隔离。
注意 fanout 不看 routing key,所以 ch.Publish("", "events.fanout", ...) 第二个参数(routing key)可以填空字符串,但 exchange 名必须提前 ch.ExchangeDeclare("events.fanout", "fanout", true, false, false, false, nil) 声明为 fanout 类型。
- fanout 的 queue 必须各自独立声明,不能共用同一个 queue name,否则多个 consumer 实际在争抢同一条消息
- 如果需要按规则过滤(比如只收 error 日志),换 topic exchange,用
"logs.error"这类 routing key +"logs.*"binding key - 别在 fanout 场景下设
durable: false的 queue——服务重启后 queue 消失,消息就永远丢了,哪怕 exchange 是 durable 的










