Асинхронное программирование сегодня используется почти везде — от пользовательских интерфейсов до серверных систем с высокой нагрузкой. Оно позволяет не блокировать поток выполнения и эффективно работать с операциями ввода-вывода. Но вместе с этой гибкостью появляется и обратная сторона: пересекающиеся асинхронные вызовы начинают конкурировать друг с другом.

Представим простой пример. Пользователь печатает в строке поиска, и каждый символ запускает новый запрос. Или сервис несколько раз запрашивает одни и те же данные, потому что разные части системы одновременно обращаются к одному ресурсу. Это не редкие случаи — такое происходит постоянно.

Если такие вызовы никак не координировать, система начинает выполнять лишнюю работу. Возникают дублирующиеся запросы, гонки данных и дополнительные вычисления. В интерфейсах результаты могут приходить в неправильном порядке, а на сервере растёт нагрузка и появляются трудно воспроизводимые ошибки. Обычно проблему пытаются решать блокировками, токенами отмены или дополнительными проверками в коде, но такие решения часто оказываются лишь временными костылями.

Более системный подход — объединять пересекающиеся асинхронные операции. Если несколько вызовов требуют один и тот же результат, нет необходимости выполнять работу несколько раз. Гораздо разумнее выполнить её один раз и распределить результат между всеми ожидающими.

Начиная с .NET 9, подобный механизм частично появился в стандартной библиотеке: при загрузке данных в MemoryCache несколько параллельных запросов могут объединяться в одно выполнение. По сути это то, что я называю стратегией Use First: первый запрос запускает операцию, остальные просто ожидают её результат.

Однако в реальных системах одного такого поведения недостаточно. Иногда результат должен определяться самым последним запросом. Иногда выполнение лучше немного отложить, чтобы дождаться стабилизации входных данных. А в некоторых случаях операции вообще нужно выполнять строго по очереди.

В этой статье я разберу пять стратегий Async Coalescing — подхода, который позволяет контролировать конкуренцию асинхронных операций. Мы посмотрим, в каких ситуациях каждая стратегия полезна и какие компромиссы она вносит в систему.

Англоязычный оригинал: тут.


Рассматриваемые стратегии

В статье будут разобраны следующие стратегии:

  • Use First — операция выполняется один раз, остальные запросы используют тот же результат.

  • Use Last — предыдущие запросы отменяются, обрабатывается только самый последний.

  • Use Last with Debounce — выполнение откладывается на небольшой интервал, чтобы дождаться стабилизации входных данных.

  • Queue — запросы обрабатываются последовательно, результат возвращается последнему из них.

  • Aggregate — несколько запросов собираются в коротком временном окне и выполняются одной операцией.

Дополнительные материалы

1. Use First — выполнить один раз и использовать общий результат

В этой стратегии, если несколько асинхронных запросов одновременно обращаются к одному и тому же ресурсу, выполняется только первый запрос. Все последующие пересекающиеся вызовы не запускают новую операцию — они просто ожидают завершения первого и получают тот же результат. Таким образом удаётся избежать лишней работы при параллельных вызовах.

Этот подход полезен, когда:

  • нужно устранить дублирующиеся параллельные запросы;

  • результат операции стабилен и не требует немедленного обновления;

  • повторный запуск операции дорогой;

  • используется кэш: первый запрос инициирует загрузку значения, а все остальные просто ожидают тот же результат, вместо того чтобы запускать дополнительные загрузки.

Типичные примеры — инициализация при запуске приложения, загрузка конфигурации или получение общих данных, когда несколько компонентов интерфейса обращаются к ним одновременно. При использовании стратегии Use First параллельные запросы не выполняются независимо — они просто ожидают результат первого.

Use First — псевдокод

Чтобы понять принцип работы Use First, можно свести его к простой идее: стратегия хранит ссылку на текущую выполняемую задачу. Если новый запрос приходит в момент, когда операция уже выполняется, новая задача не создаётся — возвращается уже суествующая.

// Общая задача для конкретного ресурса
Task<T>? _runningTask;

public Task<T> Sync(Func<Task<T>> operation)
{
    // Для простоты опущены детали потокобезопасности

    // Если операция уже выполняется — возвращаем её
    if (_runningTask != null && !_runningTask.IsCompleted)
        return _runningTask;

    // Иначе запускаем новую
    _runningTask = operation();

    return _runningTask;
}

Ключевые свойства этой стратегии:

  • только первый запрос фактически запускает выполнение;

  • все пересекающиеся вызовы используют одну и ту же Task;

  • отмена операций не используется;

  • параллельного выполнения не происходит;

  • после завершения задачи следующий запрос может запустить новую операцию.

2. Use Last — учитывать только последний запрос

Иногда важен не первый запрос, а самый последний. Если в систему приходит новый вызов, предыдущая операция больше не имеет смысла — её результат всё равно будет устаревшим.

В стратегии Use Last каждый новый запрос инициирует отмену предыдущего и становится «активным». Старые операции получают сигнал отмены, хотя на практике они могут продолжать выполняться некоторое время, если отмена обрабатывается не сразу или вовсе игнорируется.

При этом итоговый результат определяется последним запущенным запросом. Именно его результат возвращается всем, кто ожидает завершения операции.

Этот подход полезен, когда:

  • нужно всегда учитывать самый последний запрос;

  • предыдущие запросы теряют актуальность после появления нового;

  • нет смысла тратить ресурсы на завершение старых операций;

  • данные постоянно меняются — например, при фильтрации, поиске по мере ввода или изменении параметров в интерфейсе.

Типичный пример — строка поиска. Пользователь быстро вводит текст, и каждый символ запускает новый запрос. Нет смысла дожидаться ответа для промежуточных вариантов — важен только результат для последнего введённого значения.

Стоит учитывать, что некоторое время несколько операций могут выполняться параллельно, пока происходит отмена предыдущих. Однако их результат всё равно игнорируется — итог определяется только последним запросом.

Use Last — псевдокод

// Общие данные для конкретного ресурса
TaskCompletionSource<T>? _completion;
CancellationTokenSource? _cts;
Task<T>? _latestTask;

public Task<T> Sync(Func<CancellationToken, Task<T>> operation)
{
    // Потокобезопасность опущена для простоты

    // Отменяем предыдущую операцию
    _cts?.Cancel();
    _cts = new CancellationTokenSource();

    // Запускаем новую
    var task = operation(_cts.Token);
    _latestTask = task;

    // Создаём общий источник результата
    _completion = new TaskCompletionSource<T>();

    task.ContinueWith(t =>
    {
        // Если это уже не последняя задача — игнорируем
        if (_latestTask != t)
            return;

        if (t.IsFaulted)
            _completion.TrySetException(t.Exception!);
        else
            _completion.TrySetResult(t.Result);
    });

    return _completion.Task;
}

Ключевые свойства стратегии:

  • каждый новый запрос инициирует отмену предыдущего;

  • результат определяется последней запущенной операцией;

  • предыдущие задачи могут продолжать выполняться, если игнорируют отмену;

  • всем ожидающим возвращается результат последнего запроса.

3. Debounce — дождаться “тишины”, затем выполнить один раз

Эта стратегия нужна для сценариев, когда входные данные быстро меняются, а выполнять операцию на каждое изменение бессмысленно или слишком дорого. Идея простая: мы ждём короткое окно тишины (например, 300–500 мс или 1–2 секунды). Пока запросы продолжают приходить, ничего не выполняем. Как только наступает пауза — выполняем операцию один раз, используя самое последнее значение.

Если операция уже запущена и приходит новый запрос, текущей операции отправляется сигнал отмены, после чего начинается новый debounce-цикл.

Этот подход полезен, когда:

  • нет смысла выполнять операцию на каждое изменение входных данных ;

  • важен только итоговый результат после паузы;

  • операция дорогая (БД, сеть, сложные вычисления);

  • источник изменений — пользовательский ввод: поиск, фильтры, слайдеры, валидация формы.

Типичные примеры: поиск “по мере ввода”, live-фильтры, валидация, предпросмотр. В течение окна ожидания никакого выполнения нет. Как только выполнение началось, новый запрос инициирует отмену текущей операции и запускает новый debounce-цикл. Если отмена игнорируется, старое вычисление может продолжать работать параллельно — поэтому особенно аккуратно надо быть с кодом, который меняет общее состояние.

Use Last + Debounce — псевдокод

// Общие данные для конкретного ресурса
CancellationTokenSource? _cts;
TaskCompletionSource<T>? _completion;
Task<T>? _runningTask;
Timer? _timer;

public Task<T> Sync(Func<CancellationToken, Task<T>> operation)
{
    // Потокобезопасность опущена для простоты

    // Сбросить таймер ожидания "тишины"
    _timer?.Dispose();

    // Все запросы в рамках одного debounce-цикла ждут один и тот же результат
    _completion ??= new TaskCompletionSource<T>();

    _timer = new Timer(_ =>
    {
        // Если уже что-то выполняется — попросим отмениться
        _cts?.Cancel();
        _cts = new CancellationTokenSource();

        // Запускаем операцию один раз по окончании окна тишины
        var task = operation(_cts.Token);
        _runningTask = task;

        task.ContinueWith(t =>
        {
            if (t.IsFaulted)
                _completion.TrySetException(t.Exception!);
            else
                _completion.TrySetResult(t.Result);

            // Следующий цикл начнётся с нового completion
            _completion = null;

        }, TaskContinuationOptions.ExecuteSynchronously);

    }, null, debounceDelay, Timeout.InfiniteTimeSpan);

    return _completion.Task;
}

Ключевые свойства стратегии:

  • каждый новый запрос сбрасывает таймер debounce;

  • пока таймер не истёк, ничего не выполняется;

  • после паузы выполняется операция один раз для последнего значения;

  • если во время выполнения приходит новый запрос — текущая операция получает сигнал отмены и начинается новый цикл;

  • все запросы в рамках одного debounce-окна получают один и тот же результат.

4. Queue — последовательная обработка запросов

В этой стратегии запросы, обращающиеся к одному ресурсу, ставятся в очередь и выполняются строго по одному, в порядке поступления. Каждый следующий запрос начинает выполняться только после завершения предыдущего.

Если несколько запросов оказываются в очереди одновременно, они все ждут завершения обработки. При этом итоговый результат, который возвращается ожидающим, определяется последней выполненной операцией.

Этот подход полезен, когда:

  • требуется строго последовательное выполнение, чтобы избежать гонок данных или конфликтов ресурсов;

  • запросы должны выполняться по порядку, но важен только итоговый результат;

  • операции дорогие и их нельзя выполнять параллельно;

  • операции работают с общим ресурсом и должны выполняться последовательно — например, запись в общий файл, обновление данных или обработка задач в одном потоке.

Типичные примеры — фоновые задания, пакетные обновления или любые операции записи, где параллельное выполнение может привести к конфликтам.

При использовании стратегии Queue несколько запросов могут ожидать выполнения, пока текущая операция работает, но сами операции всегда выполняются строго последовательно. Важно учитывать, что все операции в очереди всё равно будут выполнены — даже если их индивидуальный результат в итоге не используется.

Queue — псевдокод

Стратегия Queue гарантирует, что операции, обращающиеся к одному ресурсу, выполняются по одной и в порядке поступления.

Отдельные запросы не получают собственный результат. Вместо этого все вызовы, пришедшие в течение одного цикла обработки очереди, ожидают общий результат. Когда очередь опустеет, всем возвращается результат последней выполненной операции.

Ниже приведён упрощённый псевдокод, иллюстрирующий идею.

Queue<Func<Task<T>>> _queue = new();
TaskCompletionSource<T>? _tcs;
bool _processing;

public Task<T> Sync(Func<Task<T>> op)
{
    lock (_queue)
    {
        // Добавляем запрос в очередь
        _queue.Enqueue(op);

        // При необходимости создаём новый цикл ожидания
        _tcs ??= new TaskCompletionSource<T>();

        // Если обработка ещё не запущена — запускаем её
        if (!_processing)
        {
            _processing = true;
            _ = ProcessQueue(_tcs);
        }

        // Все запросы в рамках текущего цикла ждут один и тот же результат
        return _tcs.Task;
    }
}

async Task ProcessQueue(TaskCompletionSource<T> batchTcs)
{
    T lastResult = default!;

    try
    {
        while (true)
        {
            Func<Task<T>>? op;

            lock (_queue)
            {
                // Если очередь пуста — завершаем цикл
                if (_queue.Count == 0)
                {
                    _processing = false;
                    _tcs = null;
                    break;
                }

                // Берём следующий запрос
                op = _queue.Dequeue();
            }

            // Выполняем операции строго последовательно
            lastResult = await op();
        }

        // Завершаем цикл и возвращаем итоговый результат всем ожидающим
        batchTcs.TrySetResult(lastResult);
    }
    catch (Exception ex)
    {
        lock (_queue)
        {
            _processing = false;
            _tcs = null;
        }

        batchTcs.TrySetException(ex);
    }
}

Ключевые свойства стратегии:

  • операции выполняются строго по одной;

  • пока выполняется текущая операция, другие запросы могут ожидать в очереди;

  • все операции в очереди всё равно выполняются, даже если их индивидуальный результат не используется;

  • все запросы, пришедшие в один цикл обработки, получают один общий итоговый результат;

  • после опустошения очереди следующий запрос запускает новый цикл обработки.

5. Aggregate — объединение запросов в один пакет

В этой стратегии входящие асинхронные запросы в течение короткого времени накапливаются в буфере (например, 1–2 секунды). Вместо того чтобы выполнять каждую операцию отдельно, их входные данные объединяются с помощью функции агрегации. Когда окно накопления заканчивается, выполняется одна операция, использующая объединённое значение.

Если во время выполнения этой операции приходят новые запросы, они накапливаются уже для следующего пакета. После завершения текущего выполнения обрабатывается следующий накопленный набор запросов.

Все запросы, попавшие в одно окно накопления, получают результат одной общей операции.

Этот подход полезен, когда:

  • множество небольших запросов можно объединить в одну более крупную операцию;

  • базовая операция выигрывает от пакетной обработки;

  • уменьшение количества запусков важнее, чем мгновенный отклик;

  • накопленные данные (например, суммы, коллекции или обновления) можно безопасно объединять.

Типичные примеры — пакетная запись в базу данных, группировка телеметрии, объединение обновлений интерфейса или любые сценарии, где множество входных событий можно собрать и обработать одной операцией.

При использовании Aggregate в каждом окне накопления выполняется только одна операция. Однако важно учитывать, что объединяемые данные должны быть безопасны для агрегации: функция объединения должна давать предсказуемый результат и корректно работать с несколькими входными значениями.

Aggregate — псевдокод

Стратегия Aggregate собирает входные запросы в течение короткого окна времени (например, 1.5 секунды), объединяет их значения с помощью функции агрегации и затем выполняет одну операцию с полученным результатом.

Если новые запросы приходят во время выполнения операции, они накапливаются для следующего цикла обработки.

on request(value):

    if not running:
        add value to accumulator
        reset buffer timer

    else:
        add value to nextAccumulator

when buffer timer expires:

    running = true
    result = execute(accumulator)
    complete all callers in this batch with result

    accumulator = nextAccumulator
    nextAccumulator = empty

    if accumulator not empty:
        execute immediately (no new delay)
    else:
        running = false

Ключевые свойства стратегии:

  • запросы собираются в течение короткого окна накопления;

  • за одно окно выполняется только одна операция;

  • во время выполнения новые запросы формируют следующий пакет;

  • после завершения текущего выполнения следующий пакет запускается сразу;

  • каждый запрос получает результат того пакета, в который он попал.

Важно понимать, что стратегия Aggregate решает не только проблему лишних запросов. Она снижает IO-амплификацию — ситуацию, когда большое количество мелких параллельных операций приводит к многократному выполнению одинаковой работы.

Вместо множества небольших запросов система выполняет одну агрегированную операцию, что уменьшает нагрузку на базу данных, сеть и другие ресурсы.

Я подробнее разобрал этот эффект в отдельной статье (англ.).

Сравнение стратегий

Рассмотренные стратегии отличаются тем, как они управляют порядком выполнения операций, отменой запросов и тем, какой результат возвращается ожидающим. Ниже приведено краткое сравнение их поведения.

Стратегия

Модель выполнения

Поведение при отмене

Параллельность

Возвращаемый результат

Типичные сценарии

Use First

Выполняется первый запрос; остальные не запускаются

Отмена не используется

Нет

Всем возвращается результат первого запроса

Кэш, инициализация

Use Last

Активным считается последний запрос

Предыдущим операциям отправляется сигнал отмены

Возможна во время отмены

Всем возвращается результат последнего запроса

Поиск, динамический UI

Queue

Запросы выполняются последовательно

Отмена не используется

Нет

Всем возвращается итоговый результат последовательности

Очереди задач

Debounce

Выполнение откладывается до окончания периода тишины

Текущая операция получает сигнал отмены при новом вводе

Возможна во время отмены

Всем возвращается результат после паузы

Быстро меняющийся ввод

Aggregate

Запросы накапливаются и объединяются

Явная отмена не используется

Одно выполнение на окно

Всем возвращается результат агрегированной операции

Пакетная обработка

FlowSync — применение стратегий объединения

Описанные выше стратегии — не просто теоретические паттерны. Они реализованы в библиотеке FlowSync, которая делает объединение асинхронных операций явной и настраиваемой стратегией.

Вместо того чтобы каждый раз вручную писать код с TaskCompletionSource, CancellationTokenSource, очередями, таймерами и блокировками, FlowSync разделяет две вещи:

  • бизнес-логику — что делает операция;

  • семантику конкуренции — как должны вести себя пересекающиеся вызовы.

Это разделение и является основным принципом библиотеки.

Репозиторий: https://github.com/0x1000000/FlowSync

Основная идея

Метод описывается как обычная асинхронная операция, но возвращает FlowSyncTask<T>:

public async FlowSyncTask<int> FetchAsync(int id)
{
    var ctx = await FlowSyncTask.GetCancellationContext();

    // ctx.CancellationToken учитывает:
    // 1) внешнюю отмену
    // 2) отмену, вызванную стратегией объединения
    // ctx.IsCancelledLocally == true только для случая (2)

    await Task.Delay(Random.Shared.Next(50, 501), ctx.CancellationToken);

    return id + 42;
}

Сам метод не содержит никакой логики синхронизации.
Он просто выполняет свою работу и, при необходимости, реагирует на CancellationToken, переданный через контекст.

Стратегия выбирается в месте вызова:

// Стратегия хранит состояние объединения для группы вызовов
static readonly IFlowSyncStrategy<int> Strategy =
    new UseLastCoalescingSyncStrategy<int>();

public async Task<int> CallerAsync(int id)
{
    return await FetchAsync(id)
        .CoalesceInGroupUsing(Strategy, groupKey: id);
}

Метод CallerAsync может вызываться параллельно (например, из разных потоков). Для одного и того же groupKey стратегия управляет тем, как будут обрабатываться пересекающиеся вызовы: некоторые из них могут ожидать результат, некоторые — отменяться, а некоторые — вообще не запускать собственное выполнение.

Стратегии

Каждый из рассмотренных паттернов реализован в виде отдельной стратегии:

  • UseFirstCoalescingSyncStrategy

  • UseLastCoalescingSyncStrategy

  • QueueCoalescingSyncStrategy

  • DeBounceCoalescingSyncStrategy

  • AggCoalescingSyncStrategy

Все стратегии используют одну и ту же абстракцию, но отличаются семантикой:

  • игнорировать ли предыдущие вызовы;

  • отменять ли их;

  • откладывать ли выполнение;

  • ставить ли вызовы в очередь;

  • объединять ли входные данные.

При этом сама операция не меняется.
Меняется только политика синхронизации (агрегация — небольшое исключение).

Такой подход делает поведение системы при конкуренции явным и настраиваемым, вместо того чтобы размазывать сложную логику синхронизации по коду.


Заключение

Асинхронная конкуренция — это не просто техническая деталь. Она напрямую влияет на то, как система ведёт себя под реальной нагрузкой. Если пересекающиеся асинхронные вызовы никак не координировать, появляются гонки данных, лишняя работа и трудно предсказуемые результаты.

Подход coalescing вводит в эту ситуацию структуру: он определяет, какие запросы действительно важны, когда должна происходить операция и какой результат получают ожидающие вызовы.

Стратегии Use First, Use Last, Queue, Debounce и Aggregate — не единственно возможные, но они охватывают многие практические сценарии. Явно выбранная стратегия делает поведение асинхронного кода понятнее, упрощает тестирование и повышает надёжность системы.

Ссылки