BlockingCollection<T> долгое время был стандартом для producer/consumer в .NET. Он работает, но построен на блокирующих примитивах — когда очередь пуста, читающий поток висит на Monitor.Wait(). В мире async/await это антипаттерн: заблокированный поток — потраченный ресурс.
System.Threading.Channels грамотно решил эту проблему. Lock-free структуры данных, async API, контроль backpressure, интеграция с пайплайнами обработки данных. Это не замена BlockingCollection — это просто другой уровень.
Разберём, как Channels устроены, когда их использовать, и какие паттерны они открывают.
Проблема с BlockingCollection
Классический producer/consumer на BlockingCollection:
var collection = new BlockingCollection<Message>(boundedCapacity: 1000); // Producer Task.Run(() => { foreach (var msg in GetMessages()) { collection.Add(msg); // Блокируется, если очередь полная } collection.CompleteAdding(); }); // Consumer Task.Run(() => { foreach (var msg in collection.GetConsumingEnumerable()) { Process(msg); // Блокируется, если очередь пустая } });
Выглядит просто, но:
Блокирующие вызовы.
Add()блокирует поток, если очередь заполнена.Take()блокирует, если пуста. В ThreadPool это означает истощение потоков под нагрузкой.Нет async.
AddAsync()не существует. Для async-кода приходится делатьTask.Run(() => collection.Add(...)), по сути, притворяться асинхронным.Внутренние блокировки. BlockingCollection построен на
lockиMonitor. Под высокой нагрузкой contention снижает производительность.
Channel: async-native очередь
Channel — такая вот трубка с двумя концами: Writer для записи, Reader для чтения.
var channel = Channel.CreateUnbounded<Message>(); // Producer (async) async Task ProduceAsync() { await foreach (var msg in GetMessagesAsync()) { await channel.Writer.WriteAsync(msg); } channel.Writer.Complete(); } // Consumer (async) async Task ConsumeAsync() { await foreach (var msg in channel.Reader.ReadAllAsync()) { await ProcessAsync(msg); } }
WriteAsync и ReadAsync — обычные async-методы. Если очередь полная/пустая, они не блокируют поток, а возвращают незавершённый ValueTask, который продолжится, когда появится место/данные.
Unbounded vs Bounded
Два типа каналов с принципиально разным поведением:
Unbounded Channel:
var channel = Channel.CreateUnbounded<T>();
Бесконечная очередь. WriteAsync никогда не ждёт — данные всегда принимаются. Просто и быстро, но если producer быстрее consumer, память растёт без ограничений. Подходит, когда уверены, что consumer справляется, или для коротких берстов.
Bounded Channel:
var channel = Channel.CreateBounded<T>(new BoundedChannelOptions(1000) { FullMode = BoundedChannelFullMode.Wait });
Ограниченная очередь. Когда полная, поведение определяется FullMode:
Wait —
WriteAsyncждёт, пока освободится место (backpressure)DropNewest — новое сообщение отбрасывается
DropOldest — старейшее сообщение отбрасывается
DropWrite — запись отклоняется, но не ждёт
Backpressure (Wait) — правильный выбор для большинства сценариев. Producer замедляется, если consumer не успевает.
Внутренняя архитектура
Channels используют lock-free структуры данных где возможно. Unbounded channel построен на concurrent queue с атомарными операциями. Bounded channel сложнее,нужно отслеживать количество элементов.
Основная оптимизация — SingleReader и SingleWriter опции:
var channel = Channel.CreateUnbounded<T>(new UnboundedChannelOptions { SingleReader = true, SingleWriter = true });
Если канал используется одним producer и одним consumer, эти флаги позволяют убрать синхронизацию. Разница в производительности в разы.
На практике жеSingleWriter часто актуален (один источник данных), SingleReader реже (часто несколько обработчиков).
API: Writer и Reader
ChannelWriter<T>:
// Основной метод записи await writer.WriteAsync(item, cancellationToken); // Синхронная запись, если можно if (writer.TryWrite(item)) { // Записано } else { // Очередь полная (для bounded) или завершена } // Ожидание возможности записи await writer.WaitToWriteAsync(cancellationToken); // Сигнал завершения writer.Complete(); writer.Complete(exception); // С ошибкой
ChannelReader<T>:
// Чтение с ожиданием T item = await reader.ReadAsync(cancellationToken); // Попытка без ожидания if (reader.TryRead(out T item)) { // Прочитано } // Ожидание данных if (await reader.WaitToReadAsync(cancellationToken)) { // Данные есть, можно читать } // Итерация по всем элементам await foreach (var item in reader.ReadAllAsync(cancellationToken)) { Process(item); }
ReadAllAsync() — самый удобный способ для consumer. Он читает до завершения канала, корректно обрабатывает cancellation.
Pipeline обработки
Channels естественно соединяются в пайплайны:
async Task ProcessPipeline(ChannelReader<RawData> source, ChannelWriter<ProcessedData> sink) { await foreach (var raw in source.ReadAllAsync()) { var processed = Transform(raw); await sink.WriteAsync(processed); } sink.Complete(); } // Построение пайплайна var input = Channel.CreateBounded<RawData>(100); var validated = Channel.CreateBounded<ValidData>(100); var enriched = Channel.CreateBounded<EnrichedData>(100); var output = Channel.CreateBounded<FinalData>(100); // Запуск стадий var tasks = new[] { ValidateStage(input.Reader, validated.Writer), EnrichStage(validated.Reader, enriched.Writer), TransformStage(enriched.Reader, output.Writer), OutputStage(output.Reader) }; await Task.WhenAll(tasks);
Каждая стадия как независимая async-задача. Bounded channels дают backpressure,если последняя стадия тормозит, давление распространяется назад по цепочке.
Fan-out / Fan-in
Fan-out — один producer, много consumers:
async Task FanOut(ChannelReader<Work> source, int workerCount) { var tasks = Enumerable.Range(0, workerCount) .Select(_ => WorkerAsync(source)) .ToArray(); await Task.WhenAll(tasks); } async Task WorkerAsync(ChannelReader<Work> source) { await foreach (var work in source.ReadAllAsync()) { await ProcessAsync(work); } }
Несколько consumers читают из одного канала. Channel сам распределяет работу, каждый элемент достаётся одному consumer.
Fan-in — много producers, один consumer:
async Task FanIn(IEnumerable<ChannelReader<Result>> sources, ChannelWriter<Result> sink) { async Task Forward(ChannelReader<Result> source) { await foreach (var item in source.ReadAllAsync()) { await sink.WriteAsync(item); } } var tasks = sources.Select(Forward).ToArray(); await Task.WhenAll(tasks); sink.Complete(); }
Результаты из нескольких источников собираются в один канал.
Пример обработки логов
Архитектура: HTTP endpoint принимает логи, они буферизуются и пакетами записываются в storage.
public class LogProcessor : IHostedService { private readonly Channel<LogEntry> _channel; private readonly ILogStorage _storage; private Task _processingTask; public LogProcessor(ILogStorage storage) { _storage = storage; _channel = Channel.CreateBounded<LogEntry>(new BoundedChannelOptions(10000) { FullMode = BoundedChannelFullMode.DropOldest, // При перегрузке теряем старые SingleReader = true }); } // Вызывается из HTTP handlers public bool TryEnqueue(LogEntry entry) { return _channel.Writer.TryWrite(entry); } public Task StartAsync(CancellationToken ct) { _processingTask = ProcessLogsAsync(ct); return Task.CompletedTask; } public async Task StopAsync(CancellationToken ct) { _channel.Writer.Complete(); await _processingTask; } private async Task ProcessLogsAsync(CancellationToken ct) { var batch = new List<LogEntry>(100); var timer = new PeriodicTimer(TimeSpan.FromSeconds(1)); while (!ct.IsCancellationRequested) { // Читаем до заполнения batch или timeout using var cts = CancellationTokenSource.CreateLinkedTokenSource(ct); var timerTask = timer.WaitForNextTickAsync(cts.Token).AsTask(); var readTask = ReadBatchAsync(batch, cts.Token); await Task.WhenAny(timerTask, readTask); cts.Cancel(); // Отменяем оставшуюся задачу if (batch.Count > 0) { await _storage.WriteBatchAsync(batch); batch.Clear(); } } } private async Task ReadBatchAsync(List<LogEntry> batch, CancellationToken ct) { while (batch.Count < 100) { if (!await _channel.Reader.WaitToReadAsync(ct)) break; while (batch.Count < 100 && _channel.Reader.TryRead(out var entry)) { batch.Add(entry); } } } }
DropOldest — при перегрузке теряем старые логи, а не блокируем HTTP
Batching — накапливаем до 100 записей или 1 секунды
Graceful shutdown — Complete() сигнализирует о завершении, consumer дочитывает остаток
rate limiter
Ограничение скорости запросов к внешнему API:
public class RateLimitedClient { private readonly Channel<Func<Task>> _requestQueue; private readonly HttpClient _client; public RateLimitedClient(HttpClient client, int requestsPerSecond) { _client = client; _requestQueue = Channel.CreateUnbounded<Func<Task>>( new UnboundedChannelOptions { SingleReader = true }); // Запускаем processor _ = ProcessQueueAsync(requestsPerSecond); } public async Task<T> ExecuteAsync<T>(Func<HttpClient, Task<T>> request) { var tcs = new TaskCompletionSource<T>(); await _requestQueue.Writer.WriteAsync(async () => { try { var result = await request(_client); tcs.SetResult(result); } catch (Exception ex) { tcs.SetException(ex); } }); return await tcs.Task; } private async Task ProcessQueueAsync(int rps) { var delay = TimeSpan.FromMilliseconds(1000.0 / rps); await foreach (var request in _requestQueue.Reader.ReadAllAsync()) { await request(); await Task.Delay(delay); } } } // Использование var client = new RateLimitedClient(httpClient, requestsPerSecond: 10); var results = await Task.WhenAll( client.ExecuteAsync(c => c.GetStringAsync("/api/1")), client.ExecuteAsync(c => c.GetStringAsync("/api/2")), client.ExecuteAsync(c => c.GetStringAsync("/api/3")) ); // Запросы выполнятся с интервалом 100ms
Channel здесь — очередь ожидающих запросов. Processor выполняет их с заданной скоростью.
Возможные проблемы
Забыть вызвать Complete():
// Consumer будет ждать вечно await foreach (var item in reader.ReadAllAsync()) { Process(item); } // Никогда не выйдет, если writer не вызвал Complete()
Всегда вызывайте Complete() когда producer закончил работу. Используйте try/finally или Complete(exception) при ошибках.
Не обработать завершение с ошибкой:
writer.Complete(new InvalidDataException("Bad input")); // Consumer должен это обработать try { await foreach (var item in reader.ReadAllAsync()) { } } catch (ChannelClosedException ex) when (ex.InnerException != null) { // Здесь ex.InnerException — InvalidDataException }
Unbounded channel + медленный consumer = OOM:
// Producer быстрый while (true) { await channel.Writer.WriteAsync(GenerateData()); } // Consumer медленный await foreach (var data in channel.Reader.ReadAllAsync()) { await SlowProcessAsync(data); // Очередь растёт без ограничений }
Используйте bounded channel или мониторьте размер очереди.
Итак, на основе channels можно построить пайплайны, rate limiters, буферизацию, fan-out/fan-in. Это примитив — простой и быстрый.

Если хочется прокачать не только синтаксис, но и инженерную зрелость в вебе на .NET, посмотрите программу «C# ASP.NET Core разработчик». Там много практики по высоконагруженным API, интеграционным/нагрузочным тестам, Docker, CI/CD и Kubernetes — ровно тот стек, где Channels и пайплайны становятся ежедневным инструментом. Готовы к серьезному обучению? Пройдите вступительный тест.
Для знакомства с форматом обучения и экспертами приходите на бесплатные демо-уроки:
29 января в 20:00. «Конфигурирование приложения ASP.NET CORE». Записаться
11 февраля в 20:00. «Настройка CI/CD на практике с помощью Docker для ASP.NET-приложений». Записаться
16 февраля в 20:00. «Реализация паттерна CQRS с использованием библиотеки MediatR и ASP.NET CORE». Записаться
