BlockingCollection<T>是C#中专为生产者-消费者模式设计的线程安全集合,基于IProducerConsumerCollection<T>实现阻塞、限容、取消及完成通知;支持多生产者多消费者并发操作,配合CompleteAdding()和GetConsumingEnumerable()可优雅退出。

BlockingCollection<T> 是 C# 中专为生产者-消费者模式设计的线程安全集合,底层基于 IProducerConsumerCollection<T>(如 ConcurrentQueue<T>),自带阻塞、限容、取消支持,用起来简洁又可靠。
为什么选 BlockingCollection?
它不是普通集合,而是“带闸门的容器”:
- 生产者调用 Add() 时,若集合已满(设置了容量),会自动阻塞等待空位;
- 消费者调用 Take() 时,若集合为空,会自动阻塞等待新项;
- 支持 CompleteAdding() 通知“不再生产”,配合 IsCompleted 和 GetConsumingEnumerable() 实现优雅退出;
- 内置取消支持(可传 CancellationToken);
- 默认使用 ConcurrentQueue
基础实现:一个生产者 + 一个消费者
下面是最简可用示例,模拟日志写入场景:
var logs = new BlockingCollection<string>(boundedCapacity: 100);
// 生产者:异步生成日志
_ = Task.Run(() =>
{
for (int i = 0; i < 500; i++)
{
logs.Add($"Log #{i} at {DateTime.Now:HH:mm:ss}");
Thread.Sleep(10); // 模拟生产间隔
}
logs.CompleteAdding(); // 标记生产结束
});
// 消费者:逐条消费并打印
foreach (var log in logs.GetConsumingEnumerable())
{
Console.WriteLine($"[CONSUMED] {log}");
}多生产者 + 多消费者(真实场景)
实际中常需多个线程并发生产和消费。注意:
- `BlockingCollection
var tasks = new List<Task>();
var messages = new BlockingCollection<string>(boundedCapacity: 50);
// 启动 3 个生产者
for (int p = 0; p < 3; p++)
{
tasks.Add(Task.Run(() =>
{
var rnd = new Random();
for (int i = 0; i < 100; i++)
{
messages.Add($"P{p}-Msg{i}");
Thread.Sleep(rnd.Next(5, 20));
}
}));
}
// 启动 2 个消费者
for (int c = 0; c < 2; c++)
{
tasks.Add(Task.Run(() =>
{
foreach (var msg in messages.GetConsumingEnumerable())
{
Console.WriteLine($"[C{c}] {msg}");
Thread.Sleep(15); // 模拟处理耗时
}
}));
}
// 等所有生产者完成后再关闭
Task.WaitAll(tasks.Take(3).ToArray()); // 等前3个生产者
messages.CompleteAdding();
// 等所有任务结束
Task.WaitAll(tasks.ToArray());带取消和超时的健壮写法
实际项目中建议加入取消令牌和超时控制,避免死等:
using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10));
try
{
// 生产者(带取消)
_ = Task.Run(() =>
{
for (int i = 0; i < 100; i++)
{
messages.Add($"Item{i}", cts.Token);
cts.Token.ThrowIfCancellationRequested();
}
messages.CompleteAdding();
}, cts.Token);
// 消费者(带取消 + 超时取值)
foreach (var item in messages.GetConsumingEnumerable(cts.Token))
{
Console.WriteLine(item);
// 可在此处做业务处理,也可用 cts.Token 判断是否该退出
}
}
catch (OperationCanceledException)
{
Console.WriteLine("操作被取消或超时");
}
catch (InvalidOperationException ex) when (ex.Message.Contains("completed"))
{
Console.WriteLine("集合已标记完成,消费结束");
}基本上就这些。BlockingCollection










