Управление действиями процессов. Не превышение лимита RPS (QPS) API

image

Структурно-функциональная схема модуля (кликабельно)

Хочу рассказать о разработанном модуле Publisher Pulsar (github), который позволяет динамически синхронизировать действия процессов.

В качестве частного случая его использования:

Например, есть множество (десятки или сотни, тысячи) процессов, независимо друг от друга обращающихся к API Google Analytics.

При этом, GA установлен лимит в 10 queries per second с одного IP.

Если не регулировать обращения к API
то постоянно будет возвращаться ошибка превышения лимита некоторыми процессами, и либо они не смогут выполнить свою задачу без данных API, либо будут снова и снова пытаться получить данные в цикле, создавая проблему для других процессов, увеличивая количество ошибок. То есть будет хаос и отсутствие прогнозируемости в части процента корректно получаемых данных и процента ошибок при обращениях к API.

И постольку возникает задача избежать превышения лимита RPS (QPS), чтобы все процессы могли корректно получать данные.

Учитывая, что обращения к GA могут идти хаотично, из различных сторонних процессов, использующих те же IP адреса.

И таким образом, возникает вопрос — использовать более дорогую избыточность, либо ставить более скромные жесткие лимиты в статическом token bucket - https://github.com/bandwidth-throttle/token-bucket.

Данный модуль как раз справляется с достижением баланса избыточности и максимальной эффективности, динамически корректируя параметры своей активности (являясь вариантом реализации динамического token bucket).


Система призвана функционировать подобно пульсару — совершать регулярные («пульсирующие») рассылки подписчикам.

Общую структуру действий по использованию модуля можно описать так:

1. Указать параметры и запустить Пульсар как демон.

2. Настроить код процесса (Сервиса), обращающегося к API (прим. — выполняющего любое действие, которое необходимо синхронизировать), для коннекта к Пульсару, чтобы прежде выполнения действия (например, совершения запроса к API) процесс обращался бы к Пульсару и ждал разрешения на выполнение действия. И только после получения разрешения выполнял бы его.

В результате Пульсар согласно настройкам одновременно разрешает быть подписчиками только [например] 10 процессам (которые вышли из FIFO стека; т.е. 10-ти разрешили стать подписчиками, а остальные N находятся в ZMQ очереди).

И после того, как необходимое количество процессов стало подписчиками, им высылается разрешение, после которого они могут совершить свое действие (например, обратиться к API).

Таким образом, лимит будет соблюдаться независимо от количества параллельно работающих процессов (в пределах возможностей стека ZMQ).

3. После этого подписчик (исполнитель) должен послать в Пульсар сообщение о выполненном действии — присутствуют ли ошибки или все в порядке.

Т.к. если при выполнении действия присутствуют ошибки, связанные с количеством одновременно выполненных действий, то Пульсар может скорректировать свое поведение — временно, до нормализации ситуации (исчезновения ошибок) уменьшить число подписчиков, увеличить интервал между публикациями (разрешениями действий), или даже на время прекратить работу (в случае ошибки, требующей перерыва в действиях; например, превышения суточного лимита 403 DailyLimitExceeded).

1) Настройка и запуск Пульсара:

Есть режим из коробки и режим настройки. Из коробки предполагает в том числе дефолтные сокетные адреса, и это может подойти только в случае, если Пульсар и воркеры запускаются на одной машине.

Из коробки:
$pulsar = new \React\PublisherPulsar\Pulsar();
        
$publisherPulsarDto = new \React\PublisherPulsar\Inventory\PublisherPulsarDto();              
$publisherPulsarDto->setModuleName('react:pulsar'); //произвольное имя, характеризующее назначение Пульсара
$publisherPulsarDto->setReplyStackCommandName('php artisan react:pulsar-reply-stack'); // Вызов субсидиарного скрипта, выполняющего роль стека для исполнителей. Код этого скрипта не требует настроек, он приведен чуть ниже. В данном случае указан путь вызова консольной команды Laravel
$publisherPulsarDto->initDefaultPulsarSocketsParams();
        
$pulsar->setPublisherPulsarDto($publisherPulsarDto);
$pulsar->manage();

И опции настройки:
$publisherPulsarDto->setPulsationIterationPeriod(1); // количество секунд между публикациями (в результате размер будет не меньшим, чем указанный в этом параметре; и может быть большим при некоторых условиях)
$publisherPulsarDto->setSubscribersPerIteration(10); // количество подписчиков, которым высылается разрешение на действие (в т.ч. одновременное; и эта одновременность или не одновременность зависит уже от кода процесса-исполнителя/подписчика)
$publisherPulsarDto->setPerformerContainerActionMaxExecutionTime(7); // количество секунд ожидания результирующих сообщений от исполнителей для возможной коррекции поведения
$publisherPulsarDto->setLogger(\Log::getMonolog());  // чтобы использовать имеющиеся StreamHandlers. Если не сделать set, то создаст новый Logger с выводом информации в STDOUT
$publisherPulsarDto->setMaxWaitReplyStackResult(7); // количество секунд ожидания подключения нужного количества подписчиков, указанного в свойстве subscribersPerIteration выше. Если за это время нужное количество не подключится к Стеку, то Пульсар запустит процесс имитации подключения исполнителей, чтобы добрать нужное количество в виде "фантомов" и продолжить работу

$pulsarSocketsParams = new \React\PublisherPulsar\Inventory\PulsarSocketsParamsDto();

//могут быть любые свободные порты
$pulsarSocketsParams->setReplyToReplyStackSocketAddress('tcp://127.0.0.1:6271');
$pulsarSocketsParams->setPushToReplyStackSocketAddress('tcp://127.0.0.1:6272');
$pulsarSocketsParams->setPublishSocketAddress('tcp://127.0.0.1:6273');
$pulsarSocketsParams->setPullSocketAddress('tcp://127.0.0.1:6274');
$pulsarSocketsParams->setReplyStackSocketAddress('tcp://127.0.0.1:6275');

$publisherPulsarDto->setPulsarSocketsParams($pulsarSocketsParams);
$pulsar->setPublisherPulsarDto($publisherPulsarDto);

$pulsar->manage();

Код скрипта ReplyStack:

$replyStack = new  \React\PublisherPulsar\ReplyStack();
$replyStack->startCommunication();

Note: важно, чтобы Пульсар был запущен раньше процессов, подключающихся к нему, иначе процессы будут стучаться в пустоту по адресам, которые еще не связаны с Пульсаром, и просто зависнут в ожидании ответа, который не придет никогда.

2) Настройка кода исполнителя (подписчика):

Включаем объект Performer пакета модуля в код процесса:

Из коробки:
$performer = new \React\PublisherPulsar\Performer();
 
$performerDto = new \React\PublisherPulsar\Inventory\PerformerDto();
$performerDto->setModuleName("YourServiceNameContainingPerformer"); // для понимания в логах какой тип исполнителей выполняет действие 

$performer->setPerformerDto($performerDto);
$performer->initDefaultPerformerSocketsParams();
 
$this->zmqPerformer = $performer;  

И опции настройки:

$performerDto->setLogger(\Log::getMonolog()); 
 
$performerSocketParams = new \React\PublisherPulsar\Inventory\PerformerSocketsParamsDto();

//эти адреса должны совпадать с адресами Пульсара в рамках ZMQ-парности (Publish/Subscribe, Push/Pull, Request/Reply)
$performerSocketParams->setPublisherPulsarSocketAddress('tcp://127.0.0.1:6273');
$performerSocketParams->setPushPulsarSocketAddress('tcp://127.0.0.1:6274');
$performerSocketParams->setRequestPulsarRsSocketAddress('tcp://127.0.0.1:6275');

$performerDto->setSocketsParams($performerSocketParams);

$performer->setPerformerDto($performerDto);

$this->zmqPerformer = $performer; 

И далее в необходимом месте, перед вызовом целевого действия, требующего синхронизации/координации, вызываем метод, отвечающий за получение разрешения от Пульсара:

$this->zmqPerformer->connectToPulsarAndWaitPermissionToAct();


3) После выполнения целевого действия необходимо отправить результирующее сообщение о том, возникли ли ошибки. Например в таком виде:

if (isUserRateLimitExceeded()) {
    $result = new ActionResultingPushDto();
    $result->setActionCompleteCorrectly(false);
    $result->setSlowDown(true);
    $result->setErrorMessage($e->getMessage());
    $result->setErrorReason(GaErrorResponsesConstants::USER_RATE_LIMIT_EXCEEDED);

    $this->zmqPerformer->pushActionResultInfo($result);
} elseif (isDailyLimitExceeded()) {
    $result = new ActionResultingPushDto();
    $result->setActionCompleteCorrectly(false);

    $sleepForPeriod = new ErrorSleepForPeriod();
    $sleepForPeriod->setSleepPeriod((60 * 60 * 1000000));

    $result->setSleepForPeriod($sleepForPeriod);
    $result->setErrorMessage($e->getMessage());
    $result->setErrorReason(GaErrorResponsesConstants::DAILY_LIMIT_EXCEEDED);

    $this->zmqPerformer->pushActionResultInfo($result);
} else {
    $this->zmqPerformer->pushActionResultInfoWithoutPulsarCorrectionBehavior();
}

Это актуально также для случая, если к Пульсару подключена только часть процессов, а другая работает в произвольном хаотичном виде. И такая механика позволит снижать количество ошибок, создаваемых совместной активностью упорядоченных и неупорядоченных процессов.

***

При этом, как уже было сказано, модуль можно использовать для любой периодической передачи информации процессам. Для этого достаточно при инициализации засетить свой класс, отнаследованный от PublisherToSubscribersDto, содержащий логику управления процессами, которые его получат.

То есть при инициализации демона в пункте 1) добавить:

$publisherToSubscribersDto = new YourNameExtendedByPublisherToSubscribersDto(); 
$publisherToSubscribersDto->setYourProperty();

$publisherPulsarDto->setPublisherToSubscribersDto($publisherToSubscribersDto);

И этот объект будет передаваться процессам.
AdBlock похитил этот баннер, но баннеры не зубы — отрастут

Подробнее
Реклама

Комментарии 11

    0
    А есть ли что-то подобное для Node.js?
      0
      Если гуглить Token Bucket algorithm node.js, то находит

      https://github.com/jhurliman/node-rate-limiter

      и еще ряд других.
        0
        Да, спасибо!
      0
      Для решения похожей задачи в php есть библиотека https://github.com/bandwidth-throttle/token-bucket. Позволяет реализовать алгоритм «текущего ведра» (равная длительность между выдачей токенов). Есть два режима работы — когда подписчик завершает работу при отсутствии свободного токена, либо ждёт до появления следюущего.
      Правда в некоторых, редких, случаях модуль выдаёт больше токенов в секунду чем запрашивалось, победить так и не удалось. Проблему решаем возвращением отклонённого запроса в очередь.
        0
        Интересная библиотека!

        «Правда в некоторых, редких, случаях модуль выдаёт больше токенов в секунду чем запрашивалось, победить так и не удалось.»

        Хм… Интересно, почему так…

        Вот тут, кажется, центральная логика
        https://github.com/bandwidth-throttle/token-bucket/blob/master/classes/TokenBucket.php#L166

        И раз он выдает токен, когда не должен, значит $delta становится равной нулю или положительной тогда, когда должна быть отрицательной.

        Там перед этим есть вызов bcsub() [пусть и до 8-го знака] и округление до int(). Возможно, какие-то маленькие положительные float значения приводятся к нулю.

        Может быть можно было бы попробовать изменить условие:

        if ($delta < 0) {

        false

        }

        if ($delta <= 0){

        false

        }

        «Для решения похожей задачи»

        Да, можно было бы задействовать эту библиотеку. Но в ней нет динамического изменения количества пропускаемых «токенов» и динамического изменения периодичности (Rate) или засыпания всех процессов в зависимости от ответов API.

        Как нет и возможности посылать произвольные управляющие сигналы в процессы.

        И получается, описанный модуль шире по функциям и отличается внутренней механикой.

        Тут конечно интересный вопрос максимальной нагрузки на модуль: zmq укладывает запросы в стек, и по их документации, может быть более 2-х млн. сообщений в секунду (http://zeromq.org/results:more-precise-0mq-tests, с необходимостью сделать поправку на влияние скорости движения по Сети и разные типы обмена сообщениями).

        И тогда вопрос упрется только в оперативную память для большинства задач [ну, просто вообще редко когда нужны десятки и выше миллионов такого рода сообщений в секунду, ставящие под вопрос поведение zmq, и в этом случае будут использованы другие языки для реализации подобной системы.)]

        А в модуле token-bucket для глобального доступа (с разных хостов) используется Redis или Memcache, и обращение через, например, Predis->set().

        И насколько это загрузит сервер, где крутится хранилище, какое значение будет записано — тут могут быть подводные камни, которые тоже будут влиять на устойчивость/корректность системы.
          0
          Тут конечно интересный вопрос максимальной нагрузки на модуль: zmq укладывает запросы в стек, и по их документации, может быть более 2-х млн. сообщений в секунду (http://zeromq.org/results:more-precise-0mq-tests, с необходимостью сделать поправку на влияние скорости движения по Сети и разные типы обмена сообщениями).

          А можете пояснить момент, из текста не понял. Допустим у нас ограничение на 10 запросов в секунду, сколько воркеров в вашей системе нужно запустить? Просто зачем 2 млн., если ограничение на вызовы api гораздо меньше?

          Наша система работает следующим образом (предположим что ограничение 10 в секунду). В RabbitMQ есть очередь со списком индетификаторов. Так же есть главный процесс php. Он получает пачку заданий (10 шт., ограничение через prefetch) и под каждое запускает свой форк. Из-за ограничения, мы не породим сразу 100500 процессов, а будем их поразжать по мере выполнения. Токены выдаются в самом форке, т.е. если нет свободных, то скрипт ждёт освободившегося.

          Спасбио за библиотеку!
            0
            В RabbitMQ есть очередь со списком индетификаторов. Так же есть главный процесс php. Он получает пачку заданий (10 шт., ограничение через prefetch) и под каждое запускает свой форк.

            При этом вы запускаете форки на одной машине. Если же запросом не 10, а например 10 тыс, эффективнее их уже запускать параллельно на нескольких машинах, что и призван решить инструмент автора.

            Что касается количества воркеров — тут уже влияет скорость выполнения заданий. Если вам разрешено 10 запросов в секунду, а после получения ответа воркер производит, например, тяжелую аналитику, так что время выполнения запроса и последующей его обработки составит 2 секунды, можно использовать уже 20 воркеров.
              0
              Допустим у нас ограничение на 10 запросов в секунду, сколько воркеров в вашей системе нужно запустить? Просто зачем 2 млн., если ограничение на вызовы api гораздо меньше?


              Тут немножко другая логика.

              Из-за ограничения, мы не породим сразу 100500 процессов, а будем их поразжать по мере выполнения.


              Действительно, как и в комментарии KarimSI: представьте, что вы хотите еще более распараллелить выполнение заданий.

              Чтобы одновременно работали бы, например, 100 или 1000 процессов.

              а после получения ответа воркер производит, например, тяжелую аналитику

              Или просто какие-то более-менее затратные действия, коннекты к БД, обработку.

              И таким образом пока один процесс выполняет обработку другой, в следующую секунду, уже может обращаться к API. И получается, что нет простоя.

              При этом, распараллеливание возможно даже на одной машине. Просто нужно скорректировать механику получения данных (например id) нужных для работы воркера.

              Пульсар — это часть более крупной системы, которая позволяет создавать параллельные процессы через proc_open (а не fork), передавать им нужные данные, и при этом количество создаваемых процессов ограничивается количеством заданий из очереди и количеством выделенной для этих целей оперативной памяти.

              Эта система уже работает, код выложен, но релиз еще не сделан (нужно сделать небольшие правки и тесты).

              Она состоит из модуля Process&Load Manager и связки с Gearman (модуль Gearman Conveyor).

              В целом всё выглядит вот так:
              image

              В ее рамках можно в запускаемом по крону Клиенте добавить в очередь Gearman нужные задания. После чего по Крону запустить процесс, запускающий Process&Load Manager (Pm&Lm), который в свою очередь начинает создавать воркеров в пределах отведенного процента свободной оперативной памяти и количества заданий.

              Затем каждый из создаваемых воркеров коннектится к Gearman, получает задачу. После чего коннектится к Пульсару и выполняет ее в описанной в статье логике, с разрешения Пульсара.

              При этом, процесс может быть поставлен на паузу или прекращен Pm&Lm, и в таком случае сведения об этом отправляются в Клиент, чтобы задача была пересоздана и выполнена при следующей итерации запуска Pm&Lm.

              Gearman хорош тем, что это не просто очередь, но и система call-бэков. Что позволяет очень быстро реагировать на изменения в процессах.

              Например, процесс-воркер посылает сигнал о каком-либо состоянии выполнения/не выполнения на другую ноду, где крутится Gearman, там в обработчике колбэка происходит анализ и принимается решение: пересоздать, отправить уведомление, отправить в другую очередь (например, RabbitMQ) и соответственно, в другую систему обработки и так далее. То есть не нужно делать переодические запросы о проверке состояния.

              Но при этом очередь Gearman в таком использовании не сохраняема (т.е. упадет клиент, создавший задачи — все задачи будут удалены из очереди), и ее нельзя администрировать и просматривать как Rabbit, например.

              ***

              Если же использовать только модуль Пульсар, например, в форке, то получается, что можно взять не 10 id за раз, а столько, сколько позволяет оперативка (тут просто вопрос, сколько можно создать процессов, когда каждый из них может потреблять по разному, в зависимости от ответа API, например. И заранее это не предугадаешь. Поэтому и был написан Pm&Lm, чтобы защитить ноду от падения в связи с перегрузом оперативки).

              Так, наверное, можно сделать и сейчас, взяв не 10, а [условно] 100 id, если есть понимание, что памяти хватит. Т.к. token-bucket будет делать ограничение в 10 обращений в секунду.
                0
                Эта система уже работает, код выложен, но релиз еще не сделан

                Соответственно, до релиза, в продакшне не стоило бы использовать, т.к. могут вноситься изменения в названия методов и какие-то небольшие, но влияющие на API вещи.
              0
              «какие-то маленькие положительные float значения приводятся к нулю.»
              поправка: конечно отрицательные float значения
              0
              [ответ отправлен не туда и стерт]

              Только полноправные пользователи могут оставлять комментарии. Войдите, пожалуйста.

              Самое читаемое