在多线程编程中,线程间的数据传递和同步往往是让人头疼的问题。要么担心多线程操作导致数据混乱,要么纠结如何处理 “生产者没数据,消费者等着”“集合满了,生产者还在塞数据” 的尴尬场景。而 BlockingCollection 就是为解决这些问题而生的 —— 它像一个智能的 “数据中转站”,自动协调生产者和消费者的节奏,还自带线程安全buff,让多线程协作变得简单高效
一、BlockingCollection 到底是什么?
简单说,BlockingCollection<T> 是.NET 框架中专门为生产者 - 消费者模式设计的线程安全集合。它的核心作用就是在多线程之间 “安全地传数据”,而且能自动处理 “没数据” 和 “数据满了” 的情况。它的特点可以用三个词概括:
- 线程安全:内部自带同步机制,允许多个线程安全地访问集合,无需开发者手动处理复杂的线程同步逻辑
- 阻塞特性:
Add
和Take
等方法的阻塞特性。集合空了,消费者取数据时会自动 “等”(阻塞);集合满了(设了容量上限),生产者加数据时也会 “等”,不用手动写等待逻辑 - 灵活配置:可以指定最大容量,还能换底层集合(比如队列
ConcurrentQueue
、栈ConcurrentStack
),适配不同的业务场景
二、推荐使用场景
BlockingCollection 不是万能的,但在以下场景中能大放异彩:
- 多线程数据传递:如,一个线程负责从网络中接收数据(生产者),另一个线程负责处理这些数据(消费者),用它来传数据,不用手动控制线程休眠或唤醒
- 任务队列:可以将需要执行的任务添加到
BlockingCollection
中,然后由一个或多个工作线程从集合中取出任务并执行。能自动平衡任务负载 - 流数据处理:像日志收集场景,一个线程收集日志(生产者),多个线程分析日志(消费者),它能让日志数据 “按需流动”,不积压也不空闲
三、底层集合:选对 “容器” 很重要
- BlockingCollection 本身不存数据,而是依赖底层的 “容器”,这些容器必须实现 IProducerConsumerCollection<T> 接口,且自带线程安全特性。常用的有两种:
底层集合 | 特点 | 适用场景 |
ConcurrentQueue<T> | 先进先出(FIFO),按添加顺序取数据 | 任务队列、消息处理(先到先处理) |
ConcurrentStack<T> | 后进先出(LIFO),后添加的先取 | 模拟栈操作,比如表达式计算、撤销操作记录 |
- 默认情况下,BlockingCollection 会用 ConcurrentQueue<T> 作为底层集合,如果你需要栈的特性,创建时指定即可:
// 用栈作为底层集合var stackBased = new BlockingCollection<int>(new ConcurrentStack<int>());var stackBased = new BlockingCollection<int>(new ConcurrentStack<int>(), 10);
// 用栈作为底层集合
var stackBased = new BlockingCollection<int>(new ConcurrentStack<int>());
var stackBased = new BlockingCollection<int>(new ConcurrentStack<int>(), 10);
四、实战 Demo:生产者 - 消费者模式轻松实现
1. 基础用法:自动阻塞,节奏自洽
- 下面的例子里,生产者添加 10 个数据,消费者逐个处理
- 当集合满了(容量 10),生产者会等
- 集合空了,消费者会等,完全不用手动控制线程休眠
- 当生产者线程完成添加后,通过调用
CompleteAdding
方法通知消费者线程,消费者线程在检测到集合完成添加且为空时,结束遍历
using System;using System.Collections.Concurrent;using System.Threading;class Program{ static void Main() { // 创建容量为10的BlockingCollection(默认用ConcurrentQueue) var blockingCollection = new BlockingCollection<int>(10); // 启动生产者线程 new Thread(() => { for (int i = 0; i < 10; i++) { Console.WriteLine($"生产者线程添加元素: {i}"); blockingCollection.Add(i); Thread.Sleep(100); } blockingCollection.CompleteAdding();// 告诉消费者:不会再添数据了 }).Start(); // 启动消费者线程 new Thread(() => { // 遍历集合,没数据会等,数据取完自动结束 foreach (var item in blockingCollection.GetConsumingEnumerable()) { Console.WriteLine($"消费者线程取出元素: {item}"); Thread.Sleep(200); } Console.WriteLine("所有数据处理完毕"); }).Start(); Console.ReadKey(); }}
using System;
using System.Collections.Concurrent;
using System.Threading;
class Program
{static void Main(){// 创建容量为10的BlockingCollection(默认用ConcurrentQueue)var blockingCollection = new BlockingCollection<int>(10);// 启动生产者线程new Thread(() =>{for (int i = 0; i < 10; i++){Console.WriteLine($"生产者线程添加元素: {i}");blockingCollection.Add(i);Thread.Sleep(100);}blockingCollection.CompleteAdding();// 告诉消费者:不会再添数据了}).Start();// 启动消费者线程new Thread(() =>{// 遍历集合,没数据会等,数据取完自动结束foreach (var item in blockingCollection.GetConsumingEnumerable()){Console.WriteLine($"消费者线程取出元素: {item}");Thread.Sleep(200);}Console.WriteLine("所有数据处理完毕");}).Start();Console.ReadKey();}
}
2. 用 Take 方法手动控制消费
- 除了用 GetConsumingEnumerable 自动遍历,也可以用 Take 方法手动取数据,适合需要更灵活控制的场景:
// 启动消费者线程new Thread(() =>{ while (!blockingCollection.IsCompleted) { try { int item = blockingCollection.Take();// 没数据会阻塞,取完且生产者结束会抛异常 Console.WriteLine($"消费者线程通过 Take 方法取出元素: {item}"); } catch (InvalidOperationException) { // 当blockingCollection.CompleteAdding()被调用且集合为空时,Take会抛此异常 break; } }}).Start();
// 启动消费者线程
new Thread(() =>
{while (!blockingCollection.IsCompleted){try{int item = blockingCollection.Take();// 没数据会阻塞,取完且生产者结束会抛异常Console.WriteLine($"消费者线程通过 Take 方法取出元素: {item}");}catch (InvalidOperationException){// 当blockingCollection.CompleteAdding()被调用且集合为空时,Take会抛此异常break;}}
}).Start();
五、常用方法和变量
BlockingCollection 的 API 设计得很简洁,核心方法和属性就几个,用好它们能应对大多数场景:
方法 / 属性 | 作用 | 关键点 |
Add(T item) | 向集合添加元素 | 集合满了会阻塞,直到有空间 |
Take() | 从集合取数据 | 集合空了会阻塞,直到有数据;注意:取完且结束添加时抛异常InvalidOperationException,需处理 |
TryAdd(T item) | 尝试添加数据 | 集合满了直接返回 false,不阻塞 |
TryTake(out T item) | 尝试取数据 | 集合空了直接返回 false,不阻塞 |
CompleteAdding() | 标记 “不再添加数据” | 必须调用,否则消费者可能一直等 |
IsCompleted | 判断是否 “添加结束且数据取完” | 消费者可通过它判断是否退出 |
GetConsumingEnumerable() | 返回一个可枚举对象 | 遍历它时会自动等数据,直到取出所有数据(没有数据时会阻塞) |
Count | 当前元素数量 | 多线程下可能瞬间变化,仅作参考,别用来判断 “是否为空 / 满” |
- 小技巧:需要非阻塞操作时用 TryAdd/TryTake,比如 “能加就加,加不上就先干别的”;需要阻塞等待时用 Add/Take,简化逻辑
六、线程安全怎么保证?内部机制揭秘
BlockingCollection 的线程安全不是凭空来的,主要靠这两点:
1. 底层集合的线程安全:它依赖的 ConcurrentQueue 等底层集合,本身就用了锁或无锁算法处理多线程冲突。比如 ConcurrentQueue,多个线程同时入队 / 出队时,会自动保证数据不乱
2. 阻塞与同步机制:当需要阻塞时(比如 Take 时集合空了),通过内部的信号量(Monitor.Wait/Pulse
或 SemaphoreSlim
)让线程休眠。有数据时再唤醒,避免了线程空转浪费资源
简单说,你不用手动加锁,调用它的方法时,放心让多线程同时操作就行
七、这些坑要避开!使用注意事项
1. 别忘调用 CompleteAdding:生产者结束添加后一定要调用,否则消费者线程会一直阻塞在 Take 或 GetConsumingEnumerable 上,永远不退出
2. 慎用 Count 属性:多线程环境下,Count 的值可能刚读完就变了,别用它判断 “集合是否满了” 或 “是否为空”,应该用 TryAdd/TryTake
3. 异常处理不能少:用 Take 方法时,必须捕获 InvalidOperationException(当集合结束添加且为空时抛出),否则程序会崩溃
4. 别画蛇添足加锁:BlockingCollection 内部已经处理了线程同步,外部再加 lock 会导致性能下降,甚至死锁
5. 控制集合容量:创建时指定合适的容量(比如 new BlockingCollection<int>(100)),避免数据无限积压导致内存暴涨
总结:多线程协作,有它就够了
- BlockingCollection 是.NET 中处理生产者 - 消费者模式的 “瑞士军刀”,它的核心价值在于:用简单的 API 解决了复杂的线程同步问题,自动协调生产者和消费者的节奏,还自带线程安全保证
- 无论是简单的线程间数据传递,还是复杂的多生产者 - 多消费者场景,它都能胜任。记住它的使用场景(数据传递、任务队列)、核心方法(Add/Take/CompleteAdding)和避坑点(别忘结束标记、慎用 Count),你就能在多线程编程中少走很多弯路
- 下次再遇到多线程数据同步的问题,不妨试试 BlockingCollection,让它帮你 “管好” 数据的流转
C# 生产者+消费者 实现视频播放器,代码示例及技术文档说明:
https://download.csdn.net/download/qq_34552942/91557549