pipeline 核心是“每阶段只做一件事+channel串联”,关键在正确控制channel关闭时机和数据流向:输入用range需上游关闭,输出channel由最后使用者关闭,过滤时用select+default防阻塞。

Go 里怎么用 channel 实现基础 Pipeline 链式处理
Pipeline 的核心不是“多 goroutine”,而是“每个阶段只做一件事 + 用 channel 串起来”。你写错的关键往往在 channel 关闭时机和数据流向控制上。
常见错误现象:fatal error: all goroutines are asleep - deadlock,或者某阶段卡住、漏数据、重复消费。
- 每个阶段函数接收
chan T输入、返回chan U输出,不直接操作外部变量 - 用
range读取输入 channel,但必须确保上游会关闭它;否则range永远等下去 - 下游阶段不要自己关输出 channel —— 关闭动作应由**最后使用该 channel 的 goroutine** 承担(通常是调用方或下一个阶段)
- 如果中间阶段要过滤或跳过某些项,别用
continue后空转,要用select+default防阻塞,或提前判断再写入
示例:字符串转大写再统计长度
func upper(in <-chan string) <-chan string {
out := make(chan string)
go func() {
defer close(out)
for s := range in {
out <- strings.ToUpper(s)
}
}()
return out
}
<p>func length(in <-chan string) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for s := range in {
out <- len(s)
}
}()
return out
}为什么 pipeline 中间加 buffer channel 容易出错
加 make(chan int, 10) 看似能缓解阻塞,但会掩盖背压缺失问题,导致内存暴涨或数据丢失。
立即学习“go语言免费学习笔记(深入)”;
使用场景:仅当明确知道上游生产速率稳定、下游消费能力可预测,且延迟敏感(如实时日志采样)时才考虑缓冲。
- 缓冲 channel 不解决“谁来关 channel”的问题,反而让关闭逻辑更难追踪
-
len(ch)不是安全的判断依据 —— 它只反映当前缓冲区长度,无法反映 goroutine 是否还在往里写 - 缓冲大小设太大(比如
10000)会让 OOM 风险前移,错误出现在内存耗尽而非逻辑卡死 - 若 pipeline 某阶段 panic,带缓冲的 channel 可能滞留未消费数据,且无从感知
如何安全地终止正在运行的 pipeline
Go 没有“强制 kill goroutine”机制,终止必须靠 channel 信号协同,而不是靠 recover 或 context.WithCancel 简单包一层就完事。
常见错误现象:context canceled 报了,但 goroutine 还在跑;或者关闭了 input channel,但中间 stage 还在往 output channel 写导致 panic。
- 每个 stage 都要监听
ctx.Done(),并在 select 中优先响应取消信号 - 写入 output channel 前必须用
select判断是否已取消,避免向已关闭 channel 发送 - 不要在 stage 内部启动新 goroutine 并忽略其生命周期 —— 它们不会随 ctx 自动结束
- 如果某个 stage 依赖外部 I/O(如 HTTP 请求),需单独设置超时,并在 ctx 取消时主动中断连接
Go 1.22+ 的 iter.Seq 能替代 pipeline 吗
不能。它解决的是“遍历抽象”,不是“并发阶段解耦”。iter.Seq 是同步迭代器,底层仍是单 goroutine 顺序执行。
使用场景:当你只需要链式转换数据结构(如 slice → map → filtered slice),且无需并行、无背压需求、不涉及 I/O 或阻塞操作时,iter.Seq 更轻量。
-
iter.Seq返回值不能直接喂给另一个 goroutine —— 它不是 channel,没有并发安全保证 - 无法实现“三个 stage 分别跑在不同 CPU 核心”的真实流水线效果
- 一旦某个 step 出错(比如除零),整个迭代中断,没法像 channel pipeline 那样隔离失败影响范围
- 与现有基于 channel 的工具链(如
golang.org/x/exp/slices)不兼容,迁移成本高
真正复杂的 pipeline 得靠 channel + context + 显式错误传递。想绕开这些细节,迟早会在生产环境遇到数据不一致或 goroutine 泄露。











