直接用 goroutine + channel 拼接易卡死,因未正确关闭 channel 和处理接收阻塞:上游须显式关闭输入 channel,下游须用 for v := range ch 而非 for { v, ok := <-ch },否则触发 deadlock 或永久阻塞。

为什么直接用 goroutine + channel 拼接容易卡死
Go 的 pipeline 模式本质是把数据流经的每个阶段抽象成独立的 goroutine,靠 channel 传递。但很多人一上来就写 in := make(chan int) 然后塞一堆 go stage1(in, out1),结果程序跑几轮就 hang 住——根本原因是没处理好 channel 关闭和接收端阻塞。
典型错误现象:fatal error: all goroutines are asleep - deadlock,或者某阶段永远等不到关闭信号,下游一直阻塞在 range ch。
- 每个 stage 的输入 channel 必须由上游显式关闭(不能靠 GC)
- 接收端必须用
for v := range ch而不是for { v, ok := —— 后者漏掉关闭通知时会丢数据 - 如果某个 stage 可能提前退出(比如过滤掉所有数据),它仍要负责关闭输出 channel,否则下游永远等不到 EOF
如何让 pipeline 支持取消和错误传播
真实业务里,pipeline 不可能只跑成功路径。用户中断、上游超时、中间 stage 解析失败,都得让整条链立刻停止并释放资源。Go 标准库的 context.Context 是唯一靠谱方案,但很多人只在入口传 ctx,中间 stage 完全忽略它。
使用场景:ETL 处理一批日志,某条记录 JSON 格式错误,应终止当前 batch 并返回错误;或用户点击取消按钮,正在处理的 5000 条数据要立即中止。
立即学习“go语言免费学习笔记(深入)”;
- 每个 stage 的函数签名必须带
ctx context.Context参数,并在select中监听ctx.Done() - 错误不能只 panic 或 log,必须通过额外的
errCh chan error向上传播(注意:不要用同一个 channel 传数据和错误) -
context.WithCancel的 cancel 函数要在最外层调用,且必须 defer 调用,否则 goroutine 泄漏
goroutine 泄漏的三个高发位置
pipeline 写完跑通了,压测一小时后内存暴涨——大概率是 goroutine 没退出。和普通并发不同,pipeline 的 goroutine 生命周期依赖 channel 关闭顺序,错一个就卡死一片。
常见错误现象:runtime: goroutine stack exceeds 1000000000-byte limit,或 pprof 显示数百个 goroutine 停在 chan receive。
- stage 函数里用了
for range in,但上游忘记 close(in),这个 goroutine 永远不会退出 - 用
select等待多个 channel 时,漏写了default:或case ,导致阻塞在无数据 channel 上 - 错误处理分支里 return 前没 close 输出 channel,下游 stage 一直在 range 等不到关闭
什么时候不该用 pipeline 模式
不是所有并发场景都适合 pipeline。它适合“线性加工流”,比如读 → 解析 → 过滤 → 转换 → 存储。一旦出现分支(如根据字段值分发到不同处理逻辑)、聚合(如统计总数)、或强状态依赖(如需要前 10 条数据才能决定第 11 条怎么处理),硬套 pipeline 只会让代码更难懂、更难 debug。
性能影响:每个 stage 都是一次 goroutine 切换 + channel 通信开销。如果单条数据处理耗时低于 10μs,pipeline 带来的调度成本可能超过收益。
- 数据量小(
- 需要跨 stage 共享状态(比如累计计数器),别用 channel 传,改用原子变量或 mutex 保护的 struct
- stage 之间有反馈回路(如限流器动态调整上游速率),标准 pipeline 模型无法表达,得换 actor 模型或手写协调 goroutine
真正难的不是写对第一个 stage,而是确保最后一个 stage 关闭后,所有中间 goroutine 都已退出、所有 channel 已被垃圾回收。这点连很多开源库都没做干净。










