Pull to refresh

Comments 72

Количество операций в секунду — 6, количество потоков — 4, количество запросов — 50.
6х4 = 20, должно выполниться за 2.5 секунды же? У вас вышло 5+.
Что? Нет. Эти две величины вообще между собой не связаны. Количество операций в секунду это ограничение, выставленное мной для барьера, а количество потоков относится к тестовому методу. Метод запускает проверку на 4х потоках.
И да, 6х4=24 :)
на какой диапазон допустимых значений для «количества операций в секунду» вы ориентировались, разрабатывая этот алгоритм? вы уверены что вам нужно именно точное решение этой задачи и не достаточно приближенного?
Я ориентировался на небольшие (меньше 100). Но в целом можно использовать и для бОльших значений. Тут все будет зависеть от конкретной задачи.
А почему Вы спросили про приближенное? Вообще мне нужно ограничить количество определенных запросов от пользователя конкретной цифрой.
основной вопрос — зачем вам хранить все таймстемпы запросов за последнюю секунду? почему бы не использовать leaky bucket?
Потому что я не вижу способа использовать здесь данный алгоритм. Но если вы знаете, подскажите.
Берете счетчик, устанавливаете его в число rps. Допустим 10 — для 10 rps. Допустим последний запрос был в таймстемп t_last. Счетчик будет пополняться со скоростью 10 единиц в секунду (до максимального значения равного 10).

Когда приходит новый запрос в таймстемп t_current — увеличиваете счетчик на (t_current-t_last) * 10 и уменьшаете на 1. если получилось меньше 0 — значит количество запросов превышено. Если больше — то ок, сохраняете t_last = t_current.
Я не понимаю. Устанавливаю счетчик на значение 10 (для 10 rps). Счетчик будет пополняться со скоростью 10 единиц в секунду до максимального значения равного 10. Он же уже 10. И зачем его дергать каждую секунду без надобности?

И как t_current (t_current-t_last) * 10 может получиться меньше 0?

Не понимаю короче.
Дергаете только когда новый запрос приходит. C_current = C_last-1+(t_current-t_last)*10. Достаточно хранить только t_last (таймстемп последнего запроса) и С_last (последний счетчик)
Что такое t_current и t_last? Если это отпечатки времени в миллисекундах, то
(t_current-t_last)*10 при разнице в полсекунды будет равно 5000.

Извините, может мне уже пора спать.
Вещественное число, в секундах. Ну если хотите чтобы было целое и в миллисекундах — то счетчик должен будет прирастать за 1000 мс на 10 единиц, т.е. на 0.01 единиц за 1 мс. Формула преобразуется в C_current = C_last-1+(t_current-t_last)*0.01
Не должен метод дергаться по таймеру. Да и с расчетами беда у вас. но общий смысл вроде уловил. Подумаю над этим завтра. Сегодня голова уже не варит.
хм. whatever… ключевые слова в гугле: leaky bucket, token bucket. в качестве примера можно взять исходники модуля limit_req в nginx'е.
Все, я вас понял. Я попробовал сделать через ведро размером rps. Каждый запрос добавлял в ведро 1 и вычитал дельту времени, поделенную на rps и умноженную на 1000.
c_count = l_count + 1 — (c_time-l_time) / rps * 1000

Плюс данного метода в том, что нет пула, то есть нет расхода оперативной памяти и сложность инициализации О(1).
А вот минусы:
  1. (c_time-l_time) / rps * 1000 — теряет точность при rps кратному простому числу начиная с 3.
  2. Количество математических операций больше (то есть расчеты выполняются дольше чем в моем способе)


Так вот для моей задачи эти минусы критичны, а лишняя оперативка — нет.
Конечно же, я напутал здесь в формуле. Но смысл не меняется
Блин, как тут удалять комменты? :)

Конечно же, формула будет такой
c_count = Math.Max(l_count + 1 — (c_time — l_time) /1000 * rps, 0)

Минуса с неточностью, следовательно, не будет. Но вот по производительности надо будет проверить. К ней требования выше, чем к жору оперативной памяти.
В общем способ тут не подходит. Совсем.
Проблема в том, что в ведро приходит целая операция, а уходит какая-то ее часть (эквивалент дельты времени).
В результате из ведра может утечь половина, но реально количество выполненных за последнюю секунду операций останется неизменным. В итоге функция пропускает операции, которые не должна была пропустить.

ЗЫ Я прежде чем чего то предлагать людям, стараюсь проверить сам. Попробуйте и вы. На данный момент способ видится мне нерабочим.
Есть коллекция отпечатков DateTime

А теперь подумайте, что случится при смене системного времени по тем или иным причинам. Для таких целей следует использовать монотонный таймер. В .NET таковым является Stopwatch.

Да, ценное замечание. Сейчас подправлю под него :)
Как то усложнено, не так ли?
Достаточно хранить круговой массив из N дат последних запусков задач, если дата последнего больше периода — запускаете задачу, иначе ждете паузу в разницу между этим периодом и текущим временем.
Да, если вы посмотрите предыдущие ревизии на гитхабе, то увидите, что до этого использовался как раз круговой массив из дат :)
Но мне еще важна сложность поддержки. Поэтому я сделал очевидный круговой набор данных.
Кстати, ничего ждать мне не нужно. Вместо ожидания функция просто возвращает false
А задача всего проекта какая?
Похоже на сценарий для НТ :)
UFO landed and left these words here
UFO landed and left these words here
Я не знаю что делает BlockingQueue в Java, но в C# нет ничего, что мне бы подошло из коробки.
Вы знаете, весь код умещается у меня на ладошке. То есть его немного. Так что приведите конкретную реализацию. А до тех пор я не вижу плюсов. Из минусов сразу скажу, размер коллекции динамический, что значит, что при добавлении нового элемента происходит выделение памяти. А это ненужная мне дополнительная нагрузка.
UFO landed and left these words here
Я же вам сказал, повезло вам. Я в шарпе работаю. И ответил я не вам, а lisper'у :)

А про таймер, тут вы не правы ИМХО. Таймер = дополнительная нагрузка.
UFO landed and left these words here
Стандартные инструменты подходят для большинства стандартных задач.

В моем случае задача не стандартная и стандартных инструментов без потери в производительности для нее нет. В первую очередь код должен соответствовать своей задаче, не выполнять ничего лишнего (особенно там, где каждый тик на счету), и только во вторую должен быть понятным.

Поток будет спать не 99,9% времени в данном случае. Хотя это зависит от его периодичности. Тут во избежание ошибок периодичность должна быть 1 такт в миллисекунду. И то этого мало, это урезает точность выполнения, когда за 1 миллисекунду происходит больше одного запроса. И кроме всего прочего таймер (в .NET по крайней мере) гарантирует выполнение не чаще, чем установленная периодичность, а вот реже вполне может. Значит точность будет еще ниже.

Но как обстоят дела на Java, уважаемый Senior Java Developer, я не знаю. тут вам виднее.
UFO landed and left these words here
Или вы не привыкли замерять производительность вашего кода.
UFO landed and left these words here
Покажите код вашего таймера
UFO landed and left these words here
Я не уверен, как в яве разруливаются потоки, но в .NET количество активных потоков не может превышать количество ядер процессора. Если потоков запущено больше, то они будут чередоваться (один уснул, другой проснулся). То есть если я запускаю таймер с интервалом 1мс и в системе запущено еще несколько потоков, то таймер может не проснуться вовремя, так как другие активные потоки могут надолго зависнуть и не отдавать эстафету системе (среде выполнения). Почему то мне кажется, что это ограничение железа и системы по-другому обойти проблематично и в яве все устроено так же.
В любом случае вы можете это проверить, запустив 100 потоков параллельно на цикл 1-100 с выводом счетчика и номера потока.

Но даже если таймер в яве работает точно как швейцарские часы, появляется просадка по точности. И точность эта 1мс. Хотя за 1 мс может быть выполнено довольно много запросов.

В моем коде на данный момент то же самое ограничение, но я легко могу его преодолеть, заменим штамп с ElapseMilliseconds на ElapsedTicks. Количество тиков за миллисекунду зависит от процессора и настроек системы. На моем домашнем (слабеньком) компе это значение равно около 300000. То есть точность будет максимальной (за 1 тик точно больше одной операции не будет выполнено).
Как то так.

А про AtomicInteger я не в курсе. В шарпе такого нет. Так что я не понимаю как это работает.
Я не знаю что делает BlockingQueue в Java, но в C# нет ничего, что мне бы подошло из коробки.

А про AtomicInteger я не в курсе. В шарпе такого нет.

Вам бы книжку какую почитать, а то получается что Вы не в курсе, а виноват почему-то C#
BlockingCollection Overview
Interlocked Operations

Очень замечательно. А теперь предложите решение на основе BlockingCollection, которое не проигрывает по скорости.

Проигрывает по скорости чего? Скорость выполнения CanExecute()?
Нужен критерий перформанса.


Через коллекцию токенов можно посмотреть как в Akka.Net реализован Throttle.


А через атомарные операции и таймер:
https://pastebin.com/fZ7EBGe3


Вызов таймера выполняется через ThreadPool

Да хватит пихать сюда таймер! Я уже описал пробелмы таймера. Если ThreadPool не найдет свободного потока, то таймер будет опаздывать! И это при том, что частоты 1 раз за 1мс и так недостаточно!

Тогда могу сказать что в Вашей задаче не хватает требований, которые Вы придумываете на лету, и которые никто кроме Вас не знает.


Способов реализации того, что описано в начале статьи (2 пункта) — масса. В том числе с помощью стандартных инструментов, с которыми Вы незнакомы.


Что в данном случае понимается под скоростью — остаётся загадкой.

Под скоростью в данном случае я имею ввиду время выполнения CanExecute()

Требования — она должна выполняться максимально быстро. Если в моем решении она выполняется быстрее, чем в вашем, вы проигрываете.

Я много с чем не знаком в платформе .NET. Как и вы, думаю. И если бы данная коллекция могла бы мне помочь, я бы с ней познакомился. Потому что я, прежде чем «изобретать велосипед», прочитал описание всех коллекций, которые смог найти. На некоторых останавливался чтобы изучить подробнее. И ничего мне не подошло.
Под скоростью в данном случае я имею ввиду время выполнения CanExecute()

Тогда спешу сообщить, что на моей машине вызов CanExecute() из моего решения с атомарными операциями занимает 12нс, а из вашего — 42нс.


Измерял BenchmarkDotNet.

Но я так понимаю в CanExecute() вы только добавляете слепок времени? А очистка происходит по таймеру?
Но я так понимаю в CanExecute() вы только добавляете слепок времени? А очистка происходит по таймеру?

Если что код я выложил: https://pastebin.com/fZ7EBGe3


В CanExecute() происходит атомарный декремент числа (никаких походов в heap, всё на стеке). Если число меньше 0, то атомарно инкрементим обратно.


По таймеру происходит обратное. Инкрементим, и если токенов стало очень много — декрементим.


Переменную можно сделать volatile чтобы она случайно незакешировалась в проце.

тут даже наверное и тред с таймером не нужен — можно обрабатывать возможное срабатывание таймера в том месте где запрос пришел (непосредственно в CanExecute). но опять таки, автору это решение как я понимаю не подходит, т.к. например в ситуации ограничения на 1 рпс последовательность «putToken -> sleep(0.9с) -> request -> sleep(0.1с) -> putToken -> sleep(0.1с) -> request» выполнит оба реквеста и между ними интервал будет всего 0.2с, что не вписывается в ограничение по рпс. нужно ли именно такое ограничение на самом деле и стоит ли ради его точного соблюдения тратить лишнюю память на хранение каждого таймстемпа запросов за последнюю секунду — это уже вопрос к автору
Что я должен передать в конструктор в качестве perInterval?
Что я должен передать в конструктор в качестве perInterval?

Допустим TimeSpan.FromSeconds(1)

Ваша реализация ломает ограничение на количество операций в секунду.
Ваша реализация ломает ограничение на количество операций в секунду.

Нет, не ломает. Желательно скриншот и объяснение.

Нет, не ломает. Желательно скриншот и объяснение.
Фактически, то же самое предложил skyramp только без таймера. Математическая модель та же. Вместо таймера мы просто замеряем дельту времени между запросами при выполнении CanExecute(). То есть у вас происходят вычисления каждые Х мс, а без таймера просто при запросе эти вычисления умножатся на на количество мс между запросами.
Так вот такое решение на том же самом тесте, на котором я тестировал свое, пропустило 8 задач вместо 6. Все дело в том, что запрос при таком подходе оценивается как усредненное значение времени. Это будет работать только в том случае, если запросы будут поступать каждые s / rps секунд. То есть такой подход ломает ограничение. Более того, он позволит пользователям обойти его намеренно.
Точно вижу один плюс — использование стандартных средств языка, а не написание своего велосипеда. Экономия на выделении памяти под элементы — экономия на спичках. Никто не запрещает перед основной работой сразу создать нужное количество элементов. Кстати, не очень понятно, зачем в условии задачи привязка к количеству запусков в секунду. Очень похоже, что это переформулированная задача об одновременном существовании определенного количества объектов в нескольких потоках. Для таких вещей придуманы семафоры. В частности в шарпе есть модификация SemaphoreSlim, которая позволяет задать сколько именно потоков будут иметь параллельный доступ к ресурсу. И нет, писать код за вас я не стану. На MSDN отличный пример для SemaphoreSlim
Я уже где то отвечал на это. У меня задача не стандартная и стандартные средства тут не годятся.

Экономия на выделении памяти под элементы — экономия на спичках.

Я не экономлю на памяти как раз.

Никто не запрещает перед основной работой сразу создать нужное количество элементов.
Да, я именно так и делаю.

Кстати, не очень понятно, зачем в условии задачи привязка к количеству запусков в секунду. Очень похоже, что это переформулированная задача об одновременном существовании определенного количества объектов в нескольких потоках.
Есть API, некоторые функции для каждого пользователя ограничены количеством выполнений в секунду. Ограничение это взято из статистики. В итоге сервер успевает обработать все входящие запросы и пользователи не ждут в очереди, пока отработают куча ненужных запросов других пользователей, которые, кстати, могут так наспамить, что мало не покажется. А так им придется организовывать работу под ограничение. Ну и плюс это какая никакая защита от ddos.

Для таких вещей придуманы семафоры. В частности в шарпе есть модификация SemaphoreSlim, которая позволяет задать сколько именно потоков будут иметь параллельный доступ к ресурсу. И нет, писать код за вас я не стану. На MSDN отличный пример для SemaphoreSlim

Вы мне рассказываете о чем моя задача? Зачем? Я не писал изначально о ее предназначении, потому что это не имеет значения.
Она никак не помогает решить мою задачу.

Товарищ omikron описал очень интересный алгоритм (который я возьму на вооружение). Раз в секунду в коллекцию кладётся нужное количество "разрешений". Например, всего 10 раз в секунду, метод в прошлую секунду выполнился 7 раз, значит в коллекции осталось 3 разрешения. Другой поток проходится по этой коллекции и "спит", если количество разрешений равно нулю. Особенность BlockingCollection в том, что цикл не заканчивается, если коллекция пустая.
Алгоритм очень лаконичный. Из математики там только одна операция вычитания.

Я тут рассуждаю про минимизацию нагрузки, а вы собираетесь вешать на функцию таймер! Еще и нагрузить выделением памяти для добавления нового элемента в очередь.
UFO landed and left these words here
Я вам мне про сложность, а про динамические коллекции. Вы забываете про выделение памяти для добавляемых элементов. В результате при сложности O(1) у вас выполняется инструкций больше, чем вы рассчитывали. Помножим это на количество обращений, в итоге картинка получится не радужная.
UFO landed and left these words here
Зачем мне использовать массив, если описание элемента связного кругового списка в моем случае занимает 3 строки и вносит ясность? К тому же не требует больше никакой логики на проверку переполнения. К тому же ничего не нужно удалять/добавлять.

Не всегда лаконичный код является наиболее верным. Далеко не всегда.

А выделять дополнительный поток для таймера — это вообще моветон! Я тут на копейках экономлю, а вы предлагаете косарик вложить :)

UFO landed and left these words here
По моему скромному опыту реализация цикла даже с такой проверкой сильно бъёт по процессорному времени и нужно это дело искусственно ограничивать через Thread.Sleep(). Уберите его в своих тестах и посмотрите что произойдёт. Уверен, погоня за памятью сразу уйдёт на второй план.
Зачем убирать Thread.Sleep()? у меня же запросы будут не сплошным потоком. Я же не файлы копирую :)
Между запросами в любом случае будет пауза. Хоть короткая, но будет.

По дороге с работы подумал над конкретными примерами применения обоих алгоритмов. Ваш стоит использовать для контроля количества запросов к API в мобильном приложении. Вариант с BlockingCollection я возьму для демона, который отправляет на вебхук агреггированные данные.
Собственно, да, разные цели и разные приоритеты — память или процессорное время. :)

и держать для этого отдельный поток с таймером?

UFO landed and left these words here
На RX можно так попробовать:

           //исходная последовательность событий
            IObservable<MyEvent> source= ...;

            //ограничения
            var count = 5;
            TimeSpan period = TimeSpan.FromSeconds(1);
            
            //"обрезанная" последовательность
            IObservable<MyEvent> cuttedStream = source
                .WithLatestFrom(Observable.Interval(period).StartWith(-1), (src, timer) => new { src, timer })
                .GroupBy(p => p.timer)
                .SelectMany(p => p.Take(count))
                .Select(p => p.src);
Sign up to leave a comment.

Articles