
Однажды каждый C# программист получает на работе задачу по разработке интеграции с внешней системой, где ограничена максимальная частота запросов в секунду.
Интернет яростно сопротивлялся предоставить мне инструкцию к написанию такого кода, закидывая туториалами по настройке ограничения RPS на сервере, а не клиенте.
Но теперь на Хабре есть эта статья, которая научит отправлять запросы из
HttpClient так, чтобы не получать 429 Too Many Requests.Throttling
Обозначенный процесс взаимодействия с API, где нельзя превышать заданный RPS, в общем случае именуется троттлингом (throttling).
Throttling — это широко используемая техника для увеличения производительности кода, который выполняется повторно с некоторой периодичностью.
Троттлинг функции означает, что функция вызывается не более одного раза в указанный период времени (например, раз в 10 секунд). Другими словами ― троттлинг предотвращает запуск функции, если она уже запускалась недавно. Троттлинг также обеспечивает регулярность выполнения функции с заданной периодичностью.

В экшен-играх приходится нажимать кнопки с высокой частотой для выполнения какого-либо действия (стрельба, удар). Как правило, игроки нажимают кнопки намного чаще, чем это требуется, вероятно, увлекаясь происходящим. Таким образом игрок может нажать на кнопку «удара» 10 раз в течение пяти секунд, но персонаж делает не более одного удара в секунду. В этом случае троттлинг события «удар» позволяет игнорировать повторные нажатия кнопки в течение секунды.
Как реализуется подобное ограничение?
Rate Limiting Алгоритмы
Есть ряд алгоритмов, которые реализуют технику Rate Limiting — контроль количества допущенного трафика к объекту.

Существует множество различных алгоритмов ограничения скорости для управления потоком запросов. .NET 7 представил 4 таких алгоритма.
▍ Concurrency limit
Concurrency лимитер ограничивает количество одновременных запросов, которые могут получить доступ к ресурсу. Если установленный предел равен 10, то 10 запросов могут получить доступ к ресурсу одновременно, а 11-й запрос не будет допущен. Как только запрос завершается, количество разрешённых запросов увеличивается до 1, при завершении двух запросов — до 2 и так далее. Это делается с помощью вызова
Dispose на экземпляре RateLimitLease, о которой поговорим позже.▍ Token bucket limit
Token bucket алгоритм получил своё название, исходя из принципа работы. Представьте, что есть ведро, до краёв наполненное токенами. Когда поступает запрос, он забирает токен и хранит его вечно. Через некоторое время кто-то добавляет в ведро заранее определённое количество токенов, никогда не добавляя больше, чем ведро может вместить. Если ведро пустое, то при поступлении запроса ему будет отказано в доступе к ресурсу.
Приведу более конкретный пример. Предположим, что ведро вмещает 10 токенов, и каждую минуту в него добавляется 2 токена. Когда приходит запрос, он забирает 1 токен, так что у нас остаётся 9. Ещё 3 запроса приходят и забирают 3 токена, оставляя в ведре 6 токенов. Через минуту поступает 2 новых токена, что даёт 8 в сумме. 8 запросов приходят и забирают оставшиеся токены, опустошая ведро. Если приходит ещё один запрос после этого, то у него уже не получится взять доступ к ресурсу, пока в ведре не окажется больше токенов. Они в данном примере восполняются каждую минуту. Через 5 минут отсутствия запросов в ведре снова будут все 10 токенов, и в последующие минуты они не будут добавляться, пока новые запросы не начнут забирать токены.
▍ Fixed window limit
Алгоритм с фиксированным окном использует идею окна, которая будет использоваться и в следующем алгоритме. Окно — это промежуток времени, в течение которого действует ограничение, прежде чем произойдёт переход к следующему окну. В случае с фиксированным окном переход к следующему окну означает сброс ограничения обратно в начальную точку.
Представим, что есть кинотеатр с одним залом, вмещающим 100 человек, которые пришли смотреть 2-часовой фильм. Когда фильм начинается, людям разрешается встать в очередь на следующий сеанс, который состоится через 2 часа. В очереди могут стоять до 100 человек, прежде чем им начнут говорить прийти позже. По прошествии двух часов фильм заканчивается, и очередь от 0 до 100 человек может переместиться в кинотеатр, тем самым начиная формирование новой очереди. Всё равно что двигать само окно в алгоритме фиксированного окна.
▍ Sliding window limit
Алгоритм со скользящим окном похож на алгоритм с фиксированным окном. Но туда добавляются отрезки. Отрезок — это, соответственно, часть окна. Если взять 2-часовое окно из предыдущего раздела и разбить его на 4 отрезка, то получится четыре 30-минутных отрезка. Также учитывается индекс текущего отрезка, который будет всегда указывать на самый новый отрезок в окне. Запросы в течение получасового периода попадают в текущий отрезок, и каждые 30 минут окно сдвигается на один отрезок. Если в течение отрезка, мимо которого проскользнуло окно, были запросы, они обновляются, и установленное ограничение увеличивается на эту величину. Если запросов не было, наше ограничение остаётся прежним.
Время кодить
Окей, вот и разобрались немного с теоретической базой. Менеджер не дремлет и всё ещё просит задачу по интеграции, поэтому надо как можно скорее начать писать код.
Вот так, кстати, он выглядит:

Предположим, что есть некий
DataObject, содержащий некий Content.Этот
DataObject можно получить, вызвав какой-нибудь IDataObjectExternalApiService, под капотом у которого экземпляр HttpClient делает запрос:record DataObject(string Content); interface IDataObjectExternalApiService { Task<DataObject> GetByIdAsync(int id, CancellationToken ct = default); }
Есть какой-то набор идентификаторов, исчисляющийся тысячами, и по ним нужно выкачать соответствующие контенты:
interface IDataObjectCollectionProvider { Task<IReadOnlyCollection<DataObject>> GetByIdsAsync( IReadOnlyCollection<int> ids, CancellationToken ct = default); }
Но вот незадача! Это самое внешнее API допускает максимум только 10 RPS.
▍ SemaphoreSlim
Если допускается, что речь идёт о 10 запросах одновременно, то можно попробовать реализовать Concurrency лимитер с помощью примитива синхронизации
SemaphoreSlim.SemaphoreSlim — это облегчённая альтернатива Semaphore, которая ограничивает количество потоков, имеющих одновременный доступ к ресурсу или пулу ресурсов. При этом SemaphoreSlim можно использовать в рамках async/await.Получится примерно что-то такое:
class DataObjectCollectionProvider : IDataObjectCollectionProvider { private readonly IDataObjectExternalApiService _externalApiService; public DataObjectCollectionProvider(IDataObjectExternalApiService externalApiService) => _externalApiService = externalApiService; public async Task<IReadOnlyCollection<DataObject>> GetByIdsAsync( IReadOnlyCollection<int> ids, CancellationToken ct = default) { if (ids.Count == 0) return []; var semaphoreSlim = new SemaphoreSlim( initialCount: 10, maxCount: 10); ConcurrentBag<DataObject> dataObjects = []; var tasks = ids.Select(async id => { await semaphoreSlim.WaitAsync(ct); try { var dataObject = await _externalApiService.GetByIdAsync(id, ct); dataObjects.Add(dataObject); } finally { semaphoreSlim.Release(); } }); await Task.WhenAll(tasks); return dataObjects; } }
На мой взгляд, выглядит достаточно кустарно — надо встраивать использование примитива синхронизации в логику вызова и перебора, перемешивая инфраструктурный код с бизнес-логикой.
А если понадобится другой алгоритм, что делать в таком случае?
▍ System.Threading.RateLimiting
Как уже было сказано, .NET 7 представил новый NuGet-пакет
System.Threading.RateLimiting, содержащий реализации указанных в статье алгоритмов.Все они являются наследниками абстрактного класса
RateLimiter:public abstract class RateLimiter : IAsyncDisposable, IDisposable { public abstract int GetAvailablePermits(); public abstract TimeSpan? IdleDuration { get; } public RateLimitLease Acquire(int permitCount = 1); public ValueTask<RateLimitLease> WaitAsync(int permitCount = 1, CancellationToken cancellationToken = default); public void Dispose(); public ValueTask DisposeAsync(); }
Наследники этого класса принимают в качестве параметра специальные конфигурационные настройки для регулирования поведения алгоритма.
Соответственно, тут уже выбор какой-то появляется. Например, указанная задача, по моему скромному мнению, решается интуитивнее с помощью Fixed Window.
Тогда и переписать код можно следующим образом:
class DataObjectCollectionProvider : IDataObjectCollectionProvider { private readonly IDataObjectExternalApiService _externalApiService; public DataObjectCollectionProvider(IDataObjectExternalApiService externalApiService) => _externalApiService = externalApiService; public async Task<IReadOnlyCollection<DataObject>> GetByIdsAsync( IReadOnlyCollection<int> ids, CancellationToken ct = default) { if (ids.Count == 0) return []; var limiter = new FixedWindowRateLimiter( new FixedWindowRateLimiterOptions { Window = TimeSpan.FromSeconds(1), PermitLimit = 10, QueueLimit = 10 }); ConcurrentBag<DataObject> dataObjects = []; var tasks = ids.Select(async id => { using var lease = await limiter.AcquireAsync(cancellationToken: ct); if (lease.IsAcquired) { var dataObject = await _externalApiService.GetByIdAsync(id, ct); dataObjects.Add(dataObject); } }); await Task.WhenAll(tasks); return dataObjects; } }
Уже лучше, поскольку алгоритм заменим, а опции можно внедрить через DI.
Кстати, обратите внимание, что создаётся лимитер с параметрами
PermitLimit = 10 и QueueLimit = 10. Это означает, что в окно размером в секунду пустят не больше 10 запросов, и разрешается ставить в очередь вызовы WaitAsync с общим количеством запросов на разрешение не более 10.Однако что делать, если разрешение не удастся выбить, и лимитер не пустит наш запрос? Как построить логику обработки ошибок? И всё это сделать так, чтобы можно было написать unit-тесты?
▍ Polly.RateLimiting
Тут на помощь приходит библиотека
Polly, которая построила обёртку над System.Threading.RateLimiting в виде пакета Polly.RateLimiting.Вообще, инструмент сильно изменился, добавив концепцию пайплайнов. Пайплайн оборачивает необходимый вызов, но перед этим в него накручиваются все свистелки с перделками. Подробности вот в этом разделе документации.
Так вот лимитеры тоже можно добавлять в качестве этапа пайплайна, а сам пайплайн протаскивать через DI до конкретного потребителя.
Внедрение происходит с помощью специального провайдера, который достаёт пайплайн по заданному ключу. Для простоты в качестве ключа буду использовать имя типа потребителя.
Получается, что можно задекорировать внешний сервис, обернув вызов, а затем использовать его как угодно:
class DataObjectServiceRateLimiterDecorator : IDataObjectExternalApiService { private readonly IDataObjectExternalApiService _decorated; private readonly ResiliencePipeline<DataObject> _pipeline; public DataObjectServiceRateLimiterDecorator( IDataObjectExternalApiService decorated, ResiliencePipelineProvider<string> pipelineProvider) { _decorated = decorated; _pipeline = pipelineProvider.GetPipeline<DataObject>( key: nameof(DataObjectServiceRateLimiterDecorator)); } public async Task<DataObject> GetByIdAsync(int id, CancellationToken ct = default) => await _pipeline.ExecuteAsync(async token => await _decorated.GetByIdAsync(id, token), ct); } class DataObjectCollectionProvider : IDataObjectCollectionProvider { private readonly IDataObjectExternalApiService _externalApiService; public DataObjectCollectionProvider(IDataObjectExternalApiService externalApiService) => _externalApiService = externalApiService; public async Task<IReadOnlyCollection<DataObject>> GetByIdsAsync( IReadOnlyCollection<int> ids, CancellationToken ct = default) { if (ids.Count == 0) return []; var tasks = ids.Select(id => _externalApiService.GetByIdAsync(id, ct)); return await Task.WhenAll(tasks); } }
Допустим, RPS задаётся некими опциями, а вместе с лимитером хочется настроить некоторую политику повторов. Тогда конфигурация будет примерно такая:
services.AddResiliencePipeline<string, DataObject>( nameof(DataObjectServiceRateLimiterDecorator), (builder, pollyContext) => { var allowedRps = pollyContext.ServiceProvider .GetRequiredService<IOptions<IDataObjectApiOptions>>() .Value.RequestsPerSecond; builder .ConfigureTelemetry(NullLoggerFactory.Instance) .AddRetry( new RetryStrategyOptions<DataObject> { Delay = TimeSpan.FromSeconds(1), MaxRetryAttempts = 5 }) .AddRateLimiter( new FixedWindowRateLimiter( new FixedWindowRateLimiterOptions { Window = TimeSpan.FromSeconds(1), PermitLimit = allowedRps, QueueLimit = allowedRps / 3 + 10, })); });
В конце концов, можно и unit-тесты написать, проверив поведение пайплайна в некоторых ситуациях. Например, вот такие:
public class DataObjectServiceRateLimiterDecoratorTests { [Theory] [InlineData(1)] [InlineData(2)] [InlineData(3)] [InlineData(4)] [InlineData(5)] public async Task GetByIdAsync_ApiReturnedError_RetryCallHappened(int retryCount) { // arrange var response = new DataObject(Content: Guid.NewGuid().ToString()); var apiService = new Mock<IDataObjectExternalApiService>(); var sequentialResult = apiService.SetupSequence( x => x.GetByIdAsync( It.IsAny<int>(), It.IsAny<CancellationToken>())); for (var i = 0; i < retryCount - 1; i++) sequentialResult = sequentialResult.ThrowsAsync(new Exception()); sequentialResult.ReturnsAsync(response); var pipelineProvider = new Mock<ResiliencePipelineProvider<string>>(); pipelineProvider .Setup( x => x.GetPipeline<DataObject>( nameof(DataObjectServiceRateLimiterDecorator))) .Returns( new ResiliencePipelineBuilder<DataObject>() .AddRetry( new RetryStrategyOptions<DataObject> { MaxRetryAttempts = retryCount, Delay = TimeSpan.FromMilliseconds(1) }) .Build()); var decorator = new DataObjectServiceRateLimiterDecorator( apiService.Object, pipelineProvider.Object); // act var dataObject = await decorator.GetByIdAsync(id: default, ct: default); // assert dataObject.Should().BeEquivalentTo(response); apiService .Verify( x => x.GetByIdAsync( It.IsAny<int>(), It.IsAny<CancellationToken>()), Times.Exactly(retryCount)); } [Theory] [InlineData(5, 1)] [InlineData(100, 50)] [InlineData(60, 60)] [InlineData(10, 11)] [InlineData(20, 40)] [InlineData(30, 100)] public async Task GetByIdAsync_IfRpsRateLimitExceeded_ThenExceptionIsThrown(int rps, int amount) { // arrange var apiService = new Mock<IDataObjectExternalApiService>(); apiService .Setup( x => x.GetByIdAsync( It.IsAny<int>(), It.IsAny<CancellationToken>())) .ReturnsAsync(new DataObject(Content: string.Empty)); var pipelineProvider = new Mock<ResiliencePipelineProvider<string>>(); pipelineProvider .Setup( x => x.GetPipeline<DataObject>( nameof(DataObjectServiceRateLimiterDecorator))) .Returns( new ResiliencePipelineBuilder<DataObject>() .AddRateLimiter( new FixedWindowRateLimiter( new FixedWindowRateLimiterOptions { PermitLimit = rps, Window = TimeSpan.FromSeconds(1) })) .Build()); var decorator = new DataObjectServiceRateLimiterDecorator( apiService.Object, pipelineProvider.Object); // act var tasks = Enumerable.Range(0, amount) .Select(id => decorator.GetByIdAsync(id, ct: default)); var ex = await Record.ExceptionAsync(() => Task.WhenAll(tasks)); // assert if (amount > rps) ex.Should().BeOfType<RateLimiterRejectedException>(); else ex.Should().BeNull(); } [Fact] public async Task GetByIdAsync_HappyPath() { // arrange var apiService = new Mock<IDataObjectExternalApiService>(); apiService .Setup( x => x.GetByIdAsync( It.IsAny<int>(), It.IsAny<CancellationToken>())) .ReturnsAsync(new DataObject(Content: Guid.NewGuid().ToString())); var pipelineProvider = new Mock<ResiliencePipelineProvider<string>>(); pipelineProvider .Setup( x => x.GetPipeline<DataObject>( nameof(DataObjectServiceRateLimiterDecorator))) .Returns( new ResiliencePipelineBuilder<DataObject>() .AddRetry( new RetryStrategyOptions<DataObject> { MaxRetryAttempts = 5, Delay = TimeSpan.FromSeconds(1) }) .AddRateLimiter( new FixedWindowRateLimiter( new FixedWindowRateLimiterOptions { PermitLimit = 10, Window = TimeSpan.FromSeconds(1) })) .Build()); var decorator = new DataObjectServiceRateLimiterDecorator( apiService.Object, pipelineProvider.Object); // act var tasks = Enumerable.Range(0, 100) .Select(id => decorator.GetByIdAsync(id, ct: default)); var result = await Task.WhenAll(tasks); // assert result.Length.Should().Be(100); } }
Заключение
Сегодня вы узнали, как потреблять внешнее API с ограничением по RPS несколькими способами. Итеративно было выявлено, что удобнее и гибче всего работать через
Polly. Был предоставлен подробный пример.Ещё я веду Telegram-канал StepOne, куда выкладываю много интересного контента про коммерческую разработку на C#, даю карьерные советы, рассказываю истории из личного опыта и раскрываю все тайны IT-индустрии.
Telegram-канал со скидками, розыгрышами призов и новостями IT 💻

