System.Threading.Channels: адекватные lock-free очереди для producer/consumer
BlockingCollection<T> долгое время был стандартом для producer/consumer в .NET. Он работает, но построен на блокирующих примитивах — когда очередь пуста, читающий поток висит на Monitor.Wait(). В мире async/await это антипаттерн: заблокированный поток — потраченный ресурс.
System.Threading.Channels грамотно решил эту проблему. Lock-free структуры данных, async API, контроль backpressure, интеграция с пайплайнами обработки данных. Это не замена BlockingCollection — это просто другой уровень.
Разберём, как Channels устроены, когда их использовать, и какие паттерны они открывают.
Проблема с BlockingCollection
Классический producer/consumer на BlockingCollection:
var collection = new BlockingCollection<Message>(boundedCapacity: 1000);
// Producer
Task.Run(() => {
foreach (var msg in GetMessages())
{
collection.Add(msg); // Блокируется, если очередь полная
}
collection.CompleteAdding();
});
// Consumer
Task.Run(() => {
foreach (var msg in collection.GetConsumingEnumerable())
{
Process(msg); // Блокируется, если очередь пустая
}
});Выглядит просто, но:
Блокирующие вызовы.
Add()блокирует поток, если очередь заполнена.Take()блокирует, если пуста. В ThreadPool это означает истощение потоков под нагрузкой.Нет async.
AddAsync()не существует. Для async-кода приходится делатьTask.Run(() => collection.Add(...)), по сути, притворяться асинхронным.Внутренние блокировки. BlockingCollection построен на
lockиMonitor. Под высокой нагрузкой contention снижает производительность.
Channel: async-native очередь
Channel — такая вот трубка с двумя концами: Writer для записи, Reader для чтения.
var channel = Channel.CreateUnbounded<Message>();
// Producer (async)
async Task ProduceAsync()
{
await foreach (var msg in GetMessagesAsync())
{
await channel.Writer.WriteAsync(msg);
}
channel.Writer.Complete();
}
// Consumer (async)
async Task ConsumeAsync()
{
await foreach (var msg in channel.Reader.ReadAllAsync())
{
await ProcessAsync(msg);
}
}WriteAsync и ReadAsync — обычные async-методы. Если очередь полная/пустая, они не блокируют поток, а возвращают незавершённый ValueTask, который продолжится, когда появится место/данные.
Unbounded vs Bounded
Два типа каналов с принципиально разным поведением:
Unbounded Channel:
var channel = Channel.CreateUnbounded<T>();Бесконечная очередь. WriteAsync никогда не ждёт — данные всегда принимаются. Просто и быстро, но если producer быстрее consumer, память растёт без ограничений. Подходит, когда уверены, что consumer справляется, или для коротких берстов.
Bounded Channel:
var channel = Channel.CreateBounded<T>(new BoundedChannelOptions(1000)
{
FullMode = BoundedChannelFullMode.Wait
});Ограниченная очередь. Когда полная, поведение определяется FullMode:
Wait —
WriteAsyncждёт, пока освободится место (backpressure)DropNewest — новое сообщение отбрасывается
DropOldest — старейшее сообщение отбрасывается
DropWrite — запись отклоняется, но не ждёт
Backpressure (Wait) — правильный выбор для большинства сценариев. Producer замедляется, если consumer не успевает.
Внутренняя архитектура
Channels используют lock-free структуры данных где возможно. Unbounded channel построен на concurrent queue с атомарными операциями. Bounded channel сложнее,нужно отслеживать количество элементов.
Основная оптимизация — SingleReader и SingleWriter опции:
var channel = Channel.CreateUnbounded<T>(new UnboundedChannelOptions
{
SingleReader = true,
SingleWriter = true
});Если канал используется одним producer и одним consumer, эти флаги позволяют убрать синхронизацию. Разница в производительности в разы.
На практике жеSingleWriter часто актуален (один источник данных), SingleReader реже (часто несколько обработчиков).
API: Writer и Reader
ChannelWriter<T>:
// Основной метод записи
await writer.WriteAsync(item, cancellationToken);
// Синхронная запись, если можно
if (writer.TryWrite(item))
{
// Записано
}
else
{
// Очередь полная (для bounded) или завершена
}
// Ожидание возможности записи
await writer.WaitToWriteAsync(cancellationToken);
// Сигнал завершения
writer.Complete();
writer.Complete(exception); // С ошибкойChannelReader<T>:
// Чтение с ожиданием
T item = await reader.ReadAsync(cancellationToken);
// Попытка без ожидания
if (reader.TryRead(out T item))
{
// Прочитано
}
// Ожидание данных
if (await reader.WaitToReadAsync(cancellationToken))
{
// Данные есть, можно читать
}
// Итерация по всем элементам
await foreach (var item in reader.ReadAllAsync(cancellationToken))
{
Process(item);
}ReadAllAsync() — самый удобный способ для consumer. Он читает до завершения канала, корректно обрабатывает cancellation.
Pipeline обработки
Channels естественно соединяются в пайплайны:
async Task ProcessPipeline(ChannelReader<RawData> source,
ChannelWriter<ProcessedData> sink)
{
await foreach (var raw in source.ReadAllAsync())
{
var processed = Transform(raw);
await sink.WriteAsync(processed);
}
sink.Complete();
}
// Построение пайплайна
var input = Channel.CreateBounded<RawData>(100);
var validated = Channel.CreateBounded<ValidData>(100);
var enriched = Channel.CreateBounded<EnrichedData>(100);
var output = Channel.CreateBounded<FinalData>(100);
// Запуск стадий
var tasks = new[]
{
ValidateStage(input.Reader, validated.Writer),
EnrichStage(validated.Reader, enriched.Writer),
TransformStage(enriched.Reader, output.Writer),
OutputStage(output.Reader)
};
await Task.WhenAll(tasks);Каждая стадия как независимая async-задача. Bounded channels дают backpressure,если последняя стадия тормозит, давление распространяется назад по цепочке.
Fan-out / Fan-in
Fan-out — один producer, много consumers:
async Task FanOut(ChannelReader<Work> source, int workerCount)
{
var tasks = Enumerable.Range(0, workerCount)
.Select(_ => WorkerAsync(source))
.ToArray();
await Task.WhenAll(tasks);
}
async Task WorkerAsync(ChannelReader<Work> source)
{
await foreach (var work in source.ReadAllAsync())
{
await ProcessAsync(work);
}
}Несколько consumers читают из одного канала. Channel сам распределяет работу, каждый элемент достаётся одному consumer.
Fan-in — много producers, один consumer:
async Task FanIn(IEnumerable<ChannelReader<Result>> sources,
ChannelWriter<Result> sink)
{
async Task Forward(ChannelReader<Result> source)
{
await foreach (var item in source.ReadAllAsync())
{
await sink.WriteAsync(item);
}
}
var tasks = sources.Select(Forward).ToArray();
await Task.WhenAll(tasks);
sink.Complete();
}Результаты из нескольких источников собираются в один канал.
Пример обработки логов
Архитектура: HTTP endpoint принимает логи, они буферизуются и пакетами записываются в storage.
public class LogProcessor : IHostedService
{
private readonly Channel<LogEntry> _channel;
private readonly ILogStorage _storage;
private Task _processingTask;
public LogProcessor(ILogStorage storage)
{
_storage = storage;
_channel = Channel.CreateBounded<LogEntry>(new BoundedChannelOptions(10000)
{
FullMode = BoundedChannelFullMode.DropOldest, // При перегрузке теряем старые
SingleReader = true
});
}
// Вызывается из HTTP handlers
public bool TryEnqueue(LogEntry entry)
{
return _channel.Writer.TryWrite(entry);
}
public Task StartAsync(CancellationToken ct)
{
_processingTask = ProcessLogsAsync(ct);
return Task.CompletedTask;
}
public async Task StopAsync(CancellationToken ct)
{
_channel.Writer.Complete();
await _processingTask;
}
private async Task ProcessLogsAsync(CancellationToken ct)
{
var batch = new List<LogEntry>(100);
var timer = new PeriodicTimer(TimeSpan.FromSeconds(1));
while (!ct.IsCancellationRequested)
{
// Читаем до заполнения batch или timeout
using var cts = CancellationTokenSource.CreateLinkedTokenSource(ct);
var timerTask = timer.WaitForNextTickAsync(cts.Token).AsTask();
var readTask = ReadBatchAsync(batch, cts.Token);
await Task.WhenAny(timerTask, readTask);
cts.Cancel(); // Отменяем оставшуюся задачу
if (batch.Count > 0)
{
await _storage.WriteBatchAsync(batch);
batch.Clear();
}
}
}
private async Task ReadBatchAsync(List<LogEntry> batch, CancellationToken ct)
{
while (batch.Count < 100)
{
if (!await _channel.Reader.WaitToReadAsync(ct))
break;
while (batch.Count < 100 && _channel.Reader.TryRead(out var entry))
{
batch.Add(entry);
}
}
}
}DropOldest — при перегрузке теряем старые логи, а не блокируем HTTP
Batching — накапливаем до 100 записей или 1 секунды
Graceful shutdown — Complete() сигнализирует о завершении, consumer дочитывает остаток
rate limiter
Ограничение скорости запросов к внешнему API:
public class RateLimitedClient
{
private readonly Channel<Func<Task>> _requestQueue;
private readonly HttpClient _client;
public RateLimitedClient(HttpClient client, int requestsPerSecond)
{
_client = client;
_requestQueue = Channel.CreateUnbounded<Func<Task>>(
new UnboundedChannelOptions { SingleReader = true });
// Запускаем processor
_ = ProcessQueueAsync(requestsPerSecond);
}
public async Task<T> ExecuteAsync<T>(Func<HttpClient, Task<T>> request)
{
var tcs = new TaskCompletionSource<T>();
await _requestQueue.Writer.WriteAsync(async () =>
{
try
{
var result = await request(_client);
tcs.SetResult(result);
}
catch (Exception ex)
{
tcs.SetException(ex);
}
});
return await tcs.Task;
}
private async Task ProcessQueueAsync(int rps)
{
var delay = TimeSpan.FromMilliseconds(1000.0 / rps);
await foreach (var request in _requestQueue.Reader.ReadAllAsync())
{
await request();
await Task.Delay(delay);
}
}
}
// Использование
var client = new RateLimitedClient(httpClient, requestsPerSecond: 10);
var results = await Task.WhenAll(
client.ExecuteAsync(c => c.GetStringAsync("/api/1")),
client.ExecuteAsync(c => c.GetStringAsync("/api/2")),
client.ExecuteAsync(c => c.GetStringAsync("/api/3"))
);
// Запросы выполнятся с интервалом 100msChannel здесь — очередь ожидающих запросов. Processor выполняет их с заданной скоростью.
Возможные проблемы
Забыть вызвать Complete():
// Consumer будет ждать вечно
await foreach (var item in reader.ReadAllAsync())
{
Process(item);
}
// Никогда не выйдет, если writer не вызвал Complete()Всегда вызывайте Complete() когда producer закончил работу. Используйте try/finally или Complete(exception) при ошибках.
Не обработать завершение с ошибкой:
writer.Complete(new InvalidDataException("Bad input"));
// Consumer должен это обработать
try
{
await foreach (var item in reader.ReadAllAsync()) { }
}
catch (ChannelClosedException ex) when (ex.InnerException != null)
{
// Здесь ex.InnerException — InvalidDataException
}Unbounded channel + медленный consumer = OOM:
// Producer быстрый
while (true)
{
await channel.Writer.WriteAsync(GenerateData());
}
// Consumer медленный
await foreach (var data in channel.Reader.ReadAllAsync())
{
await SlowProcessAsync(data); // Очередь растёт без ограничений
}Используйте bounded channel или мониторьте размер очереди.
Итак, на основе channels можно построить пайплайны, rate limiters, буферизацию, fan-out/fan-in. Это примитив — простой и быстрый.
Если хочется прокачать не только синтаксис, но и инженерную зрелость в вебе на .NET, посмотрите программу «C# ASP.NET Core разработчик». Там много практики по высоконагруженным API, интеграционным/нагрузочным тестам, Docker, CI/CD и Kubernetes — ровно тот стек, где Channels и пайплайны становятся ежедневным инструментом. Готовы к серьезному обучению? Пройдите вступительный тест.
Для знакомства с форматом обучения и экспертами приходите на бесплатные демо-уроки:
29 января в 20:00. «Конфигурирование приложения ASP.NET CORE». Записаться
11 февраля в 20:00. «Настройка CI/CD на практике с помощью Docker для ASP.NET-приложений». Записаться
16 февраля в 20:00. «Реализация паттерна CQRS с использованием библиотеки MediatR и ASP.NET CORE». Записаться