BlockingCollection<T> долгое время был стандартом для producer/consumer в .NET. Он работает, но построен на блокирующих примитивах — когда очередь пуста, читающий поток висит на Monitor.Wait(). В мире async/await это антипаттерн: заблокированный поток — потраченный ресурс.

System.Threading.Channels грамотно решил эту проблему. Lock-free структуры данных, async API, контроль backpressure, интеграция с пайплайнами обработки данных. Это не замена BlockingCollection — это просто другой уровень.

Разберём, как Channels устроены, когда их использовать, и какие паттерны они открывают.

Проблема с BlockingCollection

Классический producer/consumer на BlockingCollection:

var collection = new BlockingCollection<Message>(boundedCapacity: 1000);

// Producer
Task.Run(() => {
    foreach (var msg in GetMessages())
    {
        collection.Add(msg);  // Блокируется, если очередь полная
    }
    collection.CompleteAdding();
});

// Consumer
Task.Run(() => {
    foreach (var msg in collection.GetConsumingEnumerable())
    {
        Process(msg);  // Блокируется, если очередь пустая
    }
});

Выглядит просто, но:

  1. Блокирующие вызовы. Add() блокирует поток, если очередь заполнена. Take() блокирует, если пуста. В ThreadPool это означает истощение потоков под нагрузкой.

  2. Нет async. AddAsync() не существует. Для async-кода приходится делать Task.Run(() => collection.Add(...)), по сути, притворяться асинхронным.

  3. Внутренние блокировки. BlockingCollection построен на lock и Monitor. Под высокой нагрузкой contention снижает производительность.

Channel: async-native очередь

Channel — такая вот трубка с двумя концами: Writer для записи, Reader для чтения.

var channel = Channel.CreateUnbounded<Message>();

// Producer (async)
async Task ProduceAsync()
{
    await foreach (var msg in GetMessagesAsync())
    {
        await channel.Writer.WriteAsync(msg);
    }
    channel.Writer.Complete();
}

// Consumer (async)
async Task ConsumeAsync()
{
    await foreach (var msg in channel.Reader.ReadAllAsync())
    {
        await ProcessAsync(msg);
    }
}

WriteAsync и ReadAsync — обычные async-методы. Если очередь полная/пустая, они не блокируют поток, а возвращают незавершённый ValueTask, который продолжится, когда появится место/данные.

Unbounded vs Bounded

Два типа каналов с принципиально разным поведением:

Unbounded Channel:

var channel = Channel.CreateUnbounded<T>();

Бесконечная очередь. WriteAsync никогда не ждёт — данные всегда принимаются. Просто и быстро, но если producer быстрее consumer, память растёт без ограничений. Подходит, когда уверены, что consumer справляется, или для коротких берстов.

Bounded Channel:

var channel = Channel.CreateBounded<T>(new BoundedChannelOptions(1000)
{
    FullMode = BoundedChannelFullMode.Wait
});

Ограниченная очередь. Когда полная, поведение определяется FullMode:

  • WaitWriteAsync ждёт, пока освободится место (backpressure)

  • DropNewest — новое сообщение отбрасывается

  • DropOldest — старейшее сообщение отбрасывается

  • DropWrite — запись отклоняется, но не ждёт

Backpressure (Wait) — правильный выбор для большинства сценариев. Producer замедляется, если consumer не успевает.

Внутренняя архитектура

Channels используют lock-free структуры данных где возможно. Unbounded channel построен на concurrent queue с атомарными операциями. Bounded channel сложнее,нужно отслеживать количество элементов.

Основная оптимизация — SingleReader и SingleWriter опции:

var channel = Channel.CreateUnbounded<T>(new UnboundedChannelOptions
{
    SingleReader = true,
    SingleWriter = true
});

Если канал используется одним producer и одним consumer, эти флаги позволяют убрать синхронизацию. Разница в производительности в разы.

На практике жеSingleWriter часто актуален (один источник данных), SingleReader реже (часто несколько обработчиков).

API: Writer и Reader

ChannelWriter<T>:

// Основной метод записи
await writer.WriteAsync(item, cancellationToken);

// Синхронная запись, если можно
if (writer.TryWrite(item))
{
    // Записано
}
else
{
    // Очередь полная (для bounded) или завершена
}

// Ожидание возможности записи
await writer.WaitToWriteAsync(cancellationToken);

// Сигнал завершения
writer.Complete();
writer.Complete(exception);  // С ошибкой

ChannelReader<T>:

// Чтение с ожиданием
T item = await reader.ReadAsync(cancellationToken);

// Попытка без ожидания
if (reader.TryRead(out T item))
{
    // Прочитано
}

// Ожидание данных
if (await reader.WaitToReadAsync(cancellationToken))
{
    // Данные есть, можно читать
}

// Итерация по всем элементам
await foreach (var item in reader.ReadAllAsync(cancellationToken))
{
    Process(item);
}

ReadAllAsync() — самый удобный способ для consumer. Он читает до завершения канала, корректно обрабатывает cancellation.

Pipeline обработки

Channels естественно соединяются в пайплайны:

async Task ProcessPipeline(ChannelReader<RawData> source, 
                           ChannelWriter<ProcessedData> sink)
{
    await foreach (var raw in source.ReadAllAsync())
    {
        var processed = Transform(raw);
        await sink.WriteAsync(processed);
    }
    sink.Complete();
}

// Построение пайплайна
var input = Channel.CreateBounded<RawData>(100);
var validated = Channel.CreateBounded<ValidData>(100);
var enriched = Channel.CreateBounded<EnrichedData>(100);
var output = Channel.CreateBounded<FinalData>(100);

// Запуск стадий
var tasks = new[]
{
    ValidateStage(input.Reader, validated.Writer),
    EnrichStage(validated.Reader, enriched.Writer),
    TransformStage(enriched.Reader, output.Writer),
    OutputStage(output.Reader)
};

await Task.WhenAll(tasks);

Каждая стадия как независимая async-задача. Bounded channels дают backpressure,если последняя стадия тормозит, давление распространяется назад по цепочке.

Fan-out / Fan-in

Fan-out — один producer, много consumers:

async Task FanOut(ChannelReader<Work> source, int workerCount)
{
    var tasks = Enumerable.Range(0, workerCount)
        .Select(_ => WorkerAsync(source))
        .ToArray();
    
    await Task.WhenAll(tasks);
}

async Task WorkerAsync(ChannelReader<Work> source)
{
    await foreach (var work in source.ReadAllAsync())
    {
        await ProcessAsync(work);
    }
}

Несколько consumers читают из одного канала. Channel сам распределяет работу, каждый элемент достаётся одному consumer.

Fan-in — много producers, один consumer:

async Task FanIn(IEnumerable<ChannelReader<Result>> sources,
                 ChannelWriter<Result> sink)
{
    async Task Forward(ChannelReader<Result> source)
    {
        await foreach (var item in source.ReadAllAsync())
        {
            await sink.WriteAsync(item);
        }
    }
    
    var tasks = sources.Select(Forward).ToArray();
    await Task.WhenAll(tasks);
    
    sink.Complete();
}

Результаты из нескольких источников собираются в один канал.

Пример обработки логов

Архитектура: HTTP endpoint принимает логи, они буферизуются и пакетами записываются в storage.

public class LogProcessor : IHostedService
{
    private readonly Channel<LogEntry> _channel;
    private readonly ILogStorage _storage;
    private Task _processingTask;
    
    public LogProcessor(ILogStorage storage)
    {
        _storage = storage;
        _channel = Channel.CreateBounded<LogEntry>(new BoundedChannelOptions(10000)
        {
            FullMode = BoundedChannelFullMode.DropOldest,  // При перегрузке теряем старые
            SingleReader = true
        });
    }
    
    // Вызывается из HTTP handlers
    public bool TryEnqueue(LogEntry entry)
    {
        return _channel.Writer.TryWrite(entry);
    }
    
    public Task StartAsync(CancellationToken ct)
    {
        _processingTask = ProcessLogsAsync(ct);
        return Task.CompletedTask;
    }
    
    public async Task StopAsync(CancellationToken ct)
    {
        _channel.Writer.Complete();
        await _processingTask;
    }
    
    private async Task ProcessLogsAsync(CancellationToken ct)
    {
        var batch = new List<LogEntry>(100);
        var timer = new PeriodicTimer(TimeSpan.FromSeconds(1));
        
        while (!ct.IsCancellationRequested)
        {
            // Читаем до заполнения batch или timeout
            using var cts = CancellationTokenSource.CreateLinkedTokenSource(ct);
            
            var timerTask = timer.WaitForNextTickAsync(cts.Token).AsTask();
            var readTask = ReadBatchAsync(batch, cts.Token);
            
            await Task.WhenAny(timerTask, readTask);
            cts.Cancel();  // Отменяем оставшуюся задачу
            
            if (batch.Count > 0)
            {
                await _storage.WriteBatchAsync(batch);
                batch.Clear();
            }
        }
    }
    
    private async Task ReadBatchAsync(List<LogEntry> batch, CancellationToken ct)
    {
        while (batch.Count < 100)
        {
            if (!await _channel.Reader.WaitToReadAsync(ct))
                break;
            
            while (batch.Count < 100 && _channel.Reader.TryRead(out var entry))
            {
                batch.Add(entry);
            }
        }
    }
}

DropOldest — при перегрузке теряем старые логи, а не блокируем HTTP

Batching — накапливаем до 100 записей или 1 секунды

Graceful shutdownComplete() сигнализирует о завершении, consumer дочитывает остаток

rate limiter

Ограничение скорости запросов к внешнему API:

public class RateLimitedClient
{
    private readonly Channel<Func<Task>> _requestQueue;
    private readonly HttpClient _client;
    
    public RateLimitedClient(HttpClient client, int requestsPerSecond)
    {
        _client = client;
        _requestQueue = Channel.CreateUnbounded<Func<Task>>(
            new UnboundedChannelOptions { SingleReader = true });
        
        // Запускаем processor
        _ = ProcessQueueAsync(requestsPerSecond);
    }
    
    public async Task<T> ExecuteAsync<T>(Func<HttpClient, Task<T>> request)
    {
        var tcs = new TaskCompletionSource<T>();
        
        await _requestQueue.Writer.WriteAsync(async () =>
        {
            try
            {
                var result = await request(_client);
                tcs.SetResult(result);
            }
            catch (Exception ex)
            {
                tcs.SetException(ex);
            }
        });
        
        return await tcs.Task;
    }
    
    private async Task ProcessQueueAsync(int rps)
    {
        var delay = TimeSpan.FromMilliseconds(1000.0 / rps);
        
        await foreach (var request in _requestQueue.Reader.ReadAllAsync())
        {
            await request();
            await Task.Delay(delay);
        }
    }
}

// Использование
var client = new RateLimitedClient(httpClient, requestsPerSecond: 10);

var results = await Task.WhenAll(
    client.ExecuteAsync(c => c.GetStringAsync("/api/1")),
    client.ExecuteAsync(c => c.GetStringAsync("/api/2")),
    client.ExecuteAsync(c => c.GetStringAsync("/api/3"))
);
// Запросы выполнятся с интервалом 100ms

Channel здесь — очередь ожидающих запросов. Processor выполняет их с заданной скоростью.

Возможные проблемы

Забыть вызвать Complete():

// Consumer будет ждать вечно
await foreach (var item in reader.ReadAllAsync())
{
    Process(item);
}
// Никогда не выйдет, если writer не вызвал Complete()

Всегда вызывайте Complete() когда producer закончил работу. Используйте try/finally или Complete(exception) при ошибках.

Не обработать завершение с ошибкой:

writer.Complete(new InvalidDataException("Bad input"));

// Consumer должен это обработать
try
{
    await foreach (var item in reader.ReadAllAsync()) { }
}
catch (ChannelClosedException ex) when (ex.InnerException != null)
{
    // Здесь ex.InnerException — InvalidDataException
}

Unbounded channel + медленный consumer = OOM:

// Producer быстрый
while (true)
{
    await channel.Writer.WriteAsync(GenerateData());
}

// Consumer медленный
await foreach (var data in channel.Reader.ReadAllAsync())
{
    await SlowProcessAsync(data);  // Очередь растёт без ограничений
}

Используйте bounded channel или мониторьте размер очереди.


Итак, на основе channels можно построить пайплайны, rate limiters, буферизацию, fan-out/fan-in. Это примитив — простой и быстрый.

Если хочется прокачать не только синтаксис, но и инженерную зрелость в вебе на .NET, посмотрите программу «C# ASP.NET Core разработчик». Там много практики по высоконагруженным API, интеграционным/нагрузочным тестам, Docker, CI/CD и Kubernetes — ровно тот стек, где Channels и пайплайны становятся ежедневным инструментом. Готовы к серьезному обучению? Пройдите вступительный тест.

Для знакомства с форматом обучения и экспертами приходите на бесплатные демо-уроки:

  • 29 января в 20:00. «Конфигурирование приложения ASP.NET CORE». Записаться

  • 11 февраля в 20:00. «Настройка CI/CD на практике с помощью Docker для ASP.NET-приложений». Записаться

  • 16 февраля в 20:00. «Реализация паттерна CQRS с использованием библиотеки MediatR и ASP.NET CORE». Записаться