
本文介绍一种基于 `sync.waitgroup` 和非阻塞通道发送的优雅方案,解决“工作池中每个任务可动态生成新任务”这一典型并发问题,避免死锁、竞态与资源浪费。
在构建爬虫、并行处理树状结构或执行可扩展异步任务时,常遇到一类特殊需求:初始一批任务启动后,每个任务在执行过程中可能动态产生新任务(如解析网页发现新链接),这些新任务需被同一工作池消费。此时,传统固定数量 goroutine + 简单 channel 模型易陷入僵局——所有 worker 同时阻塞在
核心挑战在于:既要保证任务不丢失、不重复,又要确保所有 worker 能安全退出(即无待处理任务且无活跃生产者)。原问题中尝试用 working 通道统计活跃 worker 数并关闭队列的方式,不仅逻辑复杂、易出竞态,还依赖对 select 执行顺序的误解(实际是随机公平选择),且无法应对“worker 自产自销”的递归场景。
推荐解法是采用 sync.WaitGroup 驱动生命周期 + 非阻塞通道回退机制,其关键设计如下:
- WaitGroup 精确跟踪“待完成任务数”:每次入队前 wg.Add(1),每次完成(无论由 worker 还是当前 goroutine 执行)后 wg.Done();
- 非阻塞发送保障不阻塞:向任务 channel 发送时使用 select { case jobs 立即降级为同步执行(j.do(enqueue)),避免任何 goroutine 卡住;
- 主流程等待全部完成:wg.Wait() 确保所有任务(含递归生成的)执行完毕后才退出;
- 无需手动关闭 channel:close(jobs) 放在 wg.Wait() 后,确保所有 worker 已自然退出(range jobs 遇到 closed channel 会自动终止)。
以下是精简可靠的实现示例:
package main
import (
"fmt"
"sync"
"time"
)
type Job struct {
URL string
Depth int
}
func (j *Job) Do(enqueue func(Job)) {
fmt.Printf("Processing %s (depth %d)\n", j.URL, j.Depth)
time.Sleep(10 * time.Millisecond) // 模拟网络请求
// 模拟发现新链接(仅在深度 < 2 时递归)
if j.Depth < 2 {
for i := 0; i < 2; i++ {
enqueue(Job{
URL: fmt.Sprintf("%s/sub%d", j.URL, i),
Depth: j.Depth + 1,
})
}
}
}
func main() {
const workers = 3
jobs := make(chan Job, 10) // 缓冲通道提升吞吐,但大小非关键
var wg sync.WaitGroup
// 启动 worker 池
for i := 0; i < workers; i++ {
go func() {
for job := range jobs {
job.Do(func(j Job) {
wg.Add(1)
select {
case jobs <- j:
// 成功入队,由其他 worker 处理
default:
// 通道满或无空闲 worker → 当前 goroutine 同步执行
j.Do(func(k Job) {
wg.Add(1)
select {
case jobs <- k:
default:
k.Do(enqueue) // 递归嵌套需同策略
wg.Done()
}
})
wg.Done()
}
})
wg.Done()
}
}()
}
// 封装安全入队函数(注意:需在 goroutine 启动后定义,避免闭包捕获未初始化变量)
var enqueue func(Job)
enqueue = func(j Job) {
wg.Add(1)
select {
case jobs <- j:
default:
j.Do(enqueue)
wg.Done()
}
}
// 提交初始任务
for i := 0; i < 5; i++ {
enqueue(Job{URL: fmt.Sprintf("https://example.com/%d", i), Depth: 0})
}
// 等待所有任务完成
wg.Wait()
close(jobs) // 通知所有 worker 退出
fmt.Println("All jobs completed.")
}⚠️ 注意事项:
- enqueue 函数必须在 worker 启动之后定义(如示例中所示),否则闭包可能捕获未初始化的 jobs 或 wg;
- 递归调用 enqueue 时仍需 wg.Add(1),确保 WaitGroup 计数准确;
- 缓冲通道大小(如 make(chan Job, 10))仅影响吞吐,不决定正确性——非阻塞 default 分支兜底消除死锁风险;
- 若任务量极大(如全网爬取),需额外加入去重(如 map[string]bool + sync.Map)、限速、错误重试等机制,但本模式的调度骨架依然适用。
该方案以极少代码达成高鲁棒性:它天然支持任意深度的动态任务生成,无竞态、无死锁、无资源泄漏,是 Go 中处理“递归式工作池”问题的标准实践。










