Простой сервер задач с очередью в MySQL (без проблем с блокировками)

    Почти в каждом более менее динамическом проекте бывает возникает необходимость выполнять очереди задач в фоне (отправка email, обновления кеша, реиндексация поиска и т.д.). Job сервера (Gearman и т.п.) хороши, но для большинства простых задач они избыточны. Классическая реализация очередей в MySQL (при помощи SELECT … LOCK FOR UPDATE) при росте нагрузки со временем начинает приводить к проблемам с блокировкой. Потому, как это обычно бывает, пришлось написать свой «велосипед» для работы с фоновыми задачами, который бы «точно работал» и был предельно прост.

    Основа: Cron, PHP 5.3 (mysqli), MySQL > 5.1 — легко «влепить» почти на любой хостинг.
    Операция получения (захвата) задачи — атомарна (один UPDATE запрос). Никаких проблем с блокировкой и RC.
    Возможность распределения воркерам задач по группам и приоритетам, передача массива данных в исполняемый метод (функцию).
    Три режима обработки завершенных задач: переместить запись в отдельную таблицу, удалить запись, оставить запись и отметить как успешно обработанная.
    Обработка незавершенных задач или задач, обработанных с ошибкой — на совести разработчика.
    На всё про всё 400 строк кода (с полными PHPDOC).
    Ограничения: текущая реализация не подходит для persistent соединений, но если кому-то потребуется, несложно допилить. Даже при желании переписать на другой язык :)

    Возможность неблокирующей работы с очередью реализована через использование пользовательских переменных в UPDATE запросе с их последующей выборкой. Посвящать этому приему целую статью — глупо. Гораздо приятнее конечная реализация, которую можно применить в дело (Мы же с вами практики, не так ли?). Во всём остальном исключительно классическая очередь с группами и приоритетами.

    Пример использования (клиент):
    $task_server = \DBTaskServer::create('localhost', 'root', '', 'testDB', 'jobs_queue');
    $task_server->addTask('mywork', $data);
    

    mywork — функция, которая должна быть доступна воркеру. В нее будет передан массив $data. Также возможно указывать вызов статических методов класса.
    $task_server->addTask('MyWork::doWork', $data);
    


    Пример воркера:
    \DBTaskServer::create('localhost', 'root', '', 'testDB', 'jobs_queue') // Создаем сервер.
    		->setByCLIAgruments($argv) // Устанавливаем параметры вызова из консоли.
    		->setMode(\DBTaskServer::MODE_MARK_AS_COMPLETED) // Выбираем режим обработки.
    		->run(); // Запускам воркера.
    


    Запуск воркера из консоли с параметрами:
    /path/to/script/worker.php [max_tasks_per_lifecycle] [comma_separated_group_ids]
    

    Как понятно из названия, первая опция говорит о том сколько максимум задач может выполнить воркер прежде чем завершит работу (если конечно таковые для него будут доступны), вторая опция — это значения group_id заданий, которые данный воркер должен обрабатывать. Если группы не указаны, то воркер обрабатывает любые группы.

    Например:
    /path/to/script/worker.php 100 3,5,6
    

    Выполнить 100 заданий из групп 3, 5 и 6.
    Если заданий не будет найдено, то воркер сразу завершит свою работу.

    Добавляем воркера в крон:
    0-59/5 * * * * /path/to/script/worker.php 5 3 >/dev/null 2>&1
    

    Каждые 5 минут обрабатывать по 5 заданий с group_id=3.

    В архиве примеры клиента, воркера, сам класс сервера (задокументирован), sql файл с таблицей задач.
    Качать тут (аж целых 5kB).

    Приятного вам кода.
    Поделиться публикацией

    Комментарии 35

      +14
      Чего только не придумывают, лишь бы не юзать нормальные брокеры очередей.
        +3
        Подскажите «нормальный брокер очередей», который бы нормально работал на большинстве хостингов.
          +1
          Rabbitmq
            –4
            И какой процент shared хостингов позволяют его использовать? Не говоря уже про «стрельбу из пушки по воробьям».
              +4
              Не, ну в данном случае стоит сразу определиться какая целевая аудитория у приведенного в статье решения. В статье написано, что «при росте нагрузки со временем начинает приводить к проблемам с блокировкой» и мол по этому SELECT FOR UPDATE череват deadlock-ами, и при большом количестве воркеров/тасков не эффективен — это подразумевает, что решение ориентировано на проекты с приличной нагрузкой. Но при таком раскладе люди, занимающиеся такими проектами, обычно не запариваются на тему «чорт побери, у нас shared-хостинг, мы не можем поставить so-шку для rabbitmq/zeromq». Обычно в случаях маломальской нагрузки и сервера-то соответствующие. С другой стороны если на проекте посещаемость — полтора землекопа, то возникает вопрос «а нужна ли очередь?»
                0
                Поверьте, есть множество задач которые не требуют внедрения таких комплексных систем. Не говоря уже о поддержке и KIS. У любого растущего проекта всегда есть такая стадия, когда простого cron'a уже мало, а rabbitMQ или Gearman еще не известно понадобятся ли.
                Так что говорить «Если вам нужно очереди задач — значит обязательно покупайте соответствующий хостинг и ставьте rabbitMQ», на мой взгляд, как-то опрометчиво. Не согласны?
                  –1
                  нууу допустим, что даже простецкий VPS с рутовым доступом (где можно настроить все что угодно) стоит не так уж и дорого (за пару евро в месяц) :-) но вообще согласен, что подход, предложенный в статье, имеет право на жизнь… сам использовал нечто подобное в одном своем проекте в прошлом. Правда я не использовал переменные MySQL, а делал SELECT и потом нагонял UPDATE для фиксации задачи воркером, и иногда (особенное при разрастании очереди до 50k задач) воркеры работали вхолостую по несколько раз…
              +1
              С таким подходом можно отвергнуть все предложения. Вы спросили про нормальный MQ broker, я вам ответил. Лично я не использую shared hosting, поэтому для меня это некритично. И это не стрельба из пушки по воробьям — это использование инструментов по назначению.
          0
          Почему все вокруг так любят подменять задачи очередями? Или я один имею дело с задачами с зависимостями?
          0
          Чего только не придумают, лишь бы не юзать демонов
            0
            Демоны имеют совершенно другое назначение. Самый простой пример: когда ваши процессорные мощности ограничены (чтобы не грузить продакшн) и вам нужно обрабатывать данные задания на другом сервере.
            +2
            Сдается мне в данной реализации есть один минус — если использовать mysql_pconnect(), то коннекты к MySQL после создания будут сохраняться в pool-e до следующего вызова, и соответственно вместе с ними будет сохраняться весь «мусор», в том числе и локально установленные переменные. А при большом количестве воркеров может произойти ситуация, когда между операцией UPDATE и инициализацией в коде PHP переменных, установленных в MySQL (аля ID и т.п.), может вклиниться другой процесс и изменить локально установленные переменные.
            ИМХО предложенный вариант с использованием сессионных переменными MySQL не True way, лучше уж сделать предварительный SELECT, а потом UPDATE. С другой стороны, в этом случае при большом количестве воркеров схема будет работать вхолостую.
            На самом деле про грабли реализации очереди на MySQL писал ещё Котеров, например вот тут: xpoint.ru/forums/computers/dbms/mysql/thread/43203.xhtml Так что лучше использовать специализированные средства, типо rabbitMQ и не извращаться
              +2
              Похоже, вы не совсем внимательно прочитали:
              > Ограничения: текущая реализация не подходит для persistent соединений, но если кому-то потребуется, несложно допилить.
                0
                Оу, точно, извиняюсь с persistent — упустил этот момент в статье
                0
                Сессии при pconnect-е не шарятся между работающими скриптами. Т.е. Если скрипт работает и открыл соединение с БД (сессию), то никакой другой скрипт эту же сессию не получит, пока первый скрипт не умрет.
              • НЛО прилетело и опубликовало эту надпись здесь
                  +2
                  Возьмем простой пример из жизни.

                  На проекте есть несколько десятков пользователей. Когда приходит клиент и оставляет заявку, нужно обработать его «корзину», произвести операции со счетами «поставщиков», переформировать кеш, отправить сообщения «поставщикам». Нагрузка именно посетителей — минимальна, так как клиентов мало (не соц. сеть), но заставлять посетителя ждать несколько секунд после нажатия кнопки «Отправить» — кощунство, не говоря уже о перекладывании всей этой работы в клиентский запрос.

                  В результате имеем достаточно простые операции, но и просто нежелательно выполнять при клиентском запросе. Поднимать ради этого VPS, ставить rabbitMQ, писать скрипты — абсолютно ненужная трата ресурсов, когда абсолютно тоже самое можно реализовать приведенным в статье примером. К тому же это будет значительно проще в поддержке, следовательно дешевле в доработке для заказчика. Соответственно и для разработчика.

                  Как бы вы решили данный вопрос?
                    +1
                    То есть установка rabbitmq — это трата ресурсов, а ваше решение — нет? Вы это серьезно?
                      0
                      Абсолютно. Только rabbitMQ, это не только как вы кратко выразились «установка», это поднятие и поддержка сервера, доработка кода с использованием клиента, любая последующая поддержка будет требовать от исполнителя компетенции в использовании rabbitMQ, и так далее.

                      Согласитесь, что это слегка разные трудозатраты по сравнению php+mysql, особенно для заказчика, который будет искать исполнителя.
                  –6
                  Статью не читал, но за картинку плюсик. :)
                    0
                    У меня как раз стоит задача, описанная автором в комменте выше, а именно: после нажатия какой либо кнопки, нужно сделать некую тяжелую задачу (например запрос к стороннему апи). Это все лишние секунды, причем несвязанные с перегруженными мощностями.

                    Я еще не имею опыта реализации очередей, демонов и прочего. Поэтому думал просто написать скрипт и как обычно запускать его кроном раз в период. Скрипт будет пробегаться по записям в специальной таблице и выполнять для них эту трудоемкую задачу.

                    Предлагаемое вами решение очевидно инкапсулирует более навороченную логику, не могли бы вы подробнее описать ее, для чего бывают нужны те или иные функции?
                      0
                      Да собственно вы своим вопросом сами себе и ответили.
                      Действие клиента — простое, бизнес логика за действием — сложная и не обязательно должна быть исполнена мгновенно.
                      Примеров множество:
                      Отправка большого числа email сообщений, реиндексация поиска (например при редактировании какой-либо сущности), обновление взаимосвязанных сущностей или таблиц активности (если их логика в силу каких-то причин не могла быть реализована в хранимых функциях базы), взаимодействие со сторонними api, транзакционные действия со счетами и т.д.

                      Соответственно чтобы не пихать всё в один файл, или же разделять задачи по группам и приоритетам на разных мощностях («мухи отдельно, мёд отдельно»), может пригодится такая реализация.
                      0
                      Говоря о блокировках, есть еще вариант MySQL-specific. Где-то вычитал давно, не помню где. Смысл в том, что нужно получить именованую блокировку сервера и имя каким-то образом будет зависеть от ID выбираемой записи. Код примерно такой

                      SELECT *
                      FROM `queue`
                      WHERE GET_LOCK(CONCAT("my_queue_", id), 0)
                      LIMIT 1
                      

                      id — первичный ключ в таблице. В persistent режиме вроде не будет работать без явного закрытия блокировки (не уверен, никогда не использовал). Если процесс внезапно помрет посреди обработки (и соединение закроется!), то блокировка отпускается.
                        +1
                        Что-то похожее сделал не так давно в виде модуля на Yii github.com/yupe/yupe/tree/master/protected/modules/queue
                          0
                          >>Операция получения (захвата) задачи — атомарна (один UPDATE запрос). Никаких проблем с блокировкой и RC.
                          >>Возможность неблокирующей работы с очередью реализована через использование пользовательских переменных в UPDATE запросе с их последующей выборкой. Посвящать этому приему целую статью — глупо.

                          Нет уж простите. В этом ведь и весь самый-самый цимес. Как это вы умудряетесь захватывать задачи без блокировок? Неужели сразу же после захвата фиксируете транзакцию? Уборщица помыла полы в серверной, сессия срубилась, не завершенная задача не освободилась?
                            +3
                            Вот вопрос относится не блокировкам как таковым, а скорее к обработке исключительных ситуаций. В случае с select lock for update вы получаете пачку зависающих блокировок (которые в «случае со шваброй» просто будут отпущены), в случае с update set status 'processing', вы получите в базе несколько задач, которые имеют незавершенный статус.

                            И как и было сказано в статье, их обработка на совести разработчика. Данный пример даже частично выигрывает в возможности обработки исключительных ситуаций, потому как мы можем легко понять в каком статусе мы закончили работу с заданием (соответственно проверить что было сделать и исправить ситуацию). Если же брать select lock..., то там освободившаяся «после швабры» блокировка никак не сообщит что произошло с заданием, даже если оно было полностью выполнено, но мы не успели об этом сообщить базе.

                            Иногда лучше что-то не сделать и знать об этом, чем сделать два раза ничего не подозревая (например операция со счетом).
                              0
                              У вас в коде везде в методах стоит exit(1). Мне вот интересно, как вы видите обработку исключительных ситуаций разработчиком? В ручном режиме проверять syslog? Для исключительных ситуаций придумали исключения, так и дайте разработчику право самому решать, как их обрабатывать, а не убивать его скрипт.
                                0
                                Простите, но разве я разве где-то сказал «выключите свои мозги и никогда не дорабатывайте код под свои нужды»? Не говоря уже о каких-то лицензиях.
                                Если предложенное решение вас не устраивает — сделайте как вам нравится, не вижу тут никакой проблемы.
                                  0
                                  Да, нет проблем. Хотел лишь придостеречь тех, кто вдруг захочет выключить свои мозги и использовать этот код как есть.
                                0
                                >>Иногда лучше что-то не сделать и знать об этом, чем сделать два раза ничего не подозревая (например операция со счетом).

                                Если вы выполняете саму задачу и установку статуса задачи в разных транзакциях, вы всегда имеете риск, что задача отработала(посчитала), но статус не обновился.
                              0
                              В последнее время, если нужна простенькая очередь, использую REDIS. Как-то MySQL, как реляционная БД не под то заточена.
                                0
                                А как вы сохраняете очередь на случай непредвиденных рестартов сервера и т.п.?
                                Вроде у redis с этим не очень хорошо — можно потерять данные за последнюю секунду как минимум. А там может быть важная задача.
                                Если этого можно как-то избежать, ткните, пожалуйста ссылкой или напишите тут.
                                  0
                                  Вы правы, что-то может отвалиться в процессе обработки пакета. Если исключить ошибку программиста, то может отвалиться сокет, например. Но, для моих задач это не особо критично, — тем более, что вероятность очень мала — поэтому я и сказал про «простую очередь». Если кто-то в курсе серьезных отказоустойчивых примеров на Redis, тоже с радостью прочитаю.
                                    0
                                    Еще есть Mongo (серьезный) и совсем несерьезный SQLite, который тем не менее вполне неплох для однопоточных решений

                              Только полноправные пользователи могут оставлять комментарии. Войдите, пожалуйста.

                              Самое читаемое