go 中 channel 本身无流水线语义,需靠 goroutine 组织实现;range 遍历需发送方显式关闭 channel,多生产者时须用 waitgroup 同步关闭;取消必须用 done channel(如 context.done()),各 stage 监听并及时退出。

Go 中的 channel 本身不提供“流水线”语义,所谓流水线是多个 goroutine 通过 channel 串行传递数据的模式——关键不在 channel 怎么写,而在如何组织 goroutine 生命周期、错误传播和关闭信号。
为什么 range 遍历 channel 后必须显式关闭
如果不关闭发送端的 channel,接收端 range 会永远阻塞;但过早关闭(比如在第一个 stage 就 close)会导致后续 stage 收不到关闭通知,引发 panic 或死锁。
- 只有发送方能决定何时“数据发完了”,所以 关闭操作必须由最后一个发送 goroutine 执行
- 多个 goroutine 向同一 channel 发送时,不能用
defer close(ch),应使用sync.WaitGroup等待全部发送完成后再 close - 接收方只管读,不负责关;若需提前退出(如出错),要用
select+donechannel 配合
select + done channel 是控制流水线退出的唯一可靠方式
单纯靠关闭 channel 无法实现“任意阶段出错就中止整个流水线”,因为下游 goroutine 可能还在处理旧数据,而上游已停发。必须引入独立的取消信号。
- 每个 stage 都要监听
donechannel:一旦收到信号,立即停止工作、清理资源、返回 -
done通常用context.Context的Done()替代,更易传播取消链 - 不要在 stage 内部直接 close 输入 channel,否则其他 goroutine 可能 panic:「send on closed channel」
示例片段:
立即学习“go语言免费学习笔记(深入)”;
func stage(in <-chan int, done <-chan struct{}) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for v := range in {
select {
case out <- v * 2:
case <-done:
return
}
}
}()
return out
}buffered channel 不等于“防阻塞”,反而容易掩盖背压问题
给 channel 设 buffer(如 make(chan int, 100))会让发送端暂时不阻塞,但若下游消费慢,buffer 会持续积压,最终 OOM。流水线本质是带背压的数据流,buffer 只应作为短时抖动缓冲。
- IO 密集型 stage(如 HTTP 请求)建议 buffer = 1~4,避免堆积请求
- CPU 密集型 stage(如解密、转码)buffer 建议为 0(unbuffered),强制上游等待下游就绪,天然限速
- 永远不要用
len(ch) == cap(ch)判断是否满——这是竞态行为,len和cap在并发下不可靠
error 必须随数据一起流动,不能只走日志或全局变量
流水线中任一环节出错,下游需要知道“这条数据失败了”,而不是默默跳过或全盘终止。常见做法是定义带 error 的结构体,或用两个 channel 分别传数据和 error。
- 推荐方案:用
struct{ Value T; Err error }类型统一承载结果,避免多 channel 管理复杂度 - 若某 stage 需批量处理并部分失败,应在输出结构体中标记
Skipped bool或RetryAfter time.Time - 不要在中间 stage recover panic 并吞掉错误——这会让调用方完全失去上下文
真正难的不是写出几个 goroutine 和 channel,而是让每个 stage 都能独立响应取消、正确报告错误、不泄漏 goroutine,且 buffer 大小与实际吞吐匹配。这些细节不写进代码注释里,运行时根本看不出来。










