Lowkiq. Зачем мы его сделали?

    Lowkiq — это новый сервер упорядоченной обработки фоновых задач для ruby и redis. Он был создан в компании BIA-Technologies, разрабатывающей логистические решения.



    В этой статье я расскажу о проблемах обработки фоновых заданий, с которыми мы столкнулись, и о их решении.


    Нам нужно индексировать в ElasticSearch документы по перевозке грузов. У нас есть 2 системы: система A асинхронно обрабатывает сообщения от системы B. Информация об одной и той же перевозке содержится в сообщениях различных типов, поэтому данные сначала собираются в БД, а уже потом индексируются. Таким образом, система A ставит в очередь задачу на индексацию заказа. Задача содержит только идентификатор перевозки, обработчик выбирает актуальные данные перевозки из БД, форматирует и отправляет в поисковый движок.


    Данные о перевозках обновляются настолько часто, что в очереди содержится много повторяющихся идентификаторов. Повторная обработка дубликатов приводит к увеличенной нагрузке на БД и поисковый движок. Кроме того при параллельной обработке задач новые данные могут перезаписываться старыми (состояние гонки):


    • обработчик 1 прочитал первую версию объекта из бд
    • объект изменился в бд
    • обработчик 2 прочитал вторую версию объекта из бд
    • обработчик 2 записал вторую версию объекта в поисковый движок
    • обработчик 1 записал первую версию объекта в поисковый движок

    Мы использовали Sidekiq для этого случая. Он использует списки в redis для хранения очередей
    и запускает параллельно работающие воркеры, выбирающие задачи из очереди, что и вызывает наши проблемы.


    Мы решили написать свой демон обработки задач под названием индексер для этого случая. Вместо очереди использовалось неупорядоченное множество в redis, где хранились идентификаторы перевозок, ведь нам не важен порядок и срок их индексации. Таким образом, мы сократили нагрузку, исключив обработку дубликатов. В случае падения задача просто ставилась обратно в очередь, ведь у нас нет порядка или запланированного времени исполнения.


    Чтобы исключить состояние гонки, "очередь" была разбита на шарды. Каждому шарду назначался единственный тред-обработчик. Ключом шардирования, очевидно, стала сама задача — идентификатор перевозки.


    Разбиение на шарды напоминает партицирование топиков в Kafka. Использование Kafka для хранения задач устранит состояние гонки, но не решит проблему с дубликатами.


    Спустя время мы решили расширить этот подход для более сложных случаев. Так появился Lowkiq.


    Как говорилось выше, система A обрабатывает сообщения от сторонней системы и сохраняет их в собственную базу данных. При плотном потоке сообщений возникало состояние гонки при обработке других типов сообщений и дэдлоки в бд. Исторически они не очень успешно решались собственным механизмом блокировок в redis.


    Теперь задача кроме идентификатора содержит данные о перевозке груза. Нам стали важны порядок обработки задач и предсказуемость обработки ошибок, поэтому обычное множество не подходит для хранения задач.


    Спустя несколько итераций появилась схема хранения задач в redis. Каждая задача при постановке в очередь содержит атрибуты:


    • идентификатор
    • полезную нагрузку
    • score полезной нагрузки
    • perform_in

    Задача в очереди имеет немного другие атрибуты:


    • идентификатор
    • сортированное множество полезных нагрузок, упорядоченное по их score
    • perform_in
    • retry_count

    score и perform_in — числа и по умолчанию равны текущему unix timestamp.


    Нагладно посмотреть процессы, протекающие в очереди, можно в презентации.


    Задачи с одинаковыми идентификаторами объединяются в одну задачу в очереди, при этом payloads объединяются в сортированное множество и попадают в обработку как одно целое. Это удобно, когда нужно обработать все версии объекта или взять только последнюю.


    Идентификаторы, подобно индексеру, хранятся в множестве, правда уже в сортированном по perform_in. Если не задавать perform_in, то Lowkiq будет похож на индексер, правда с увеличенными накладными расходами на хранение задач из-за более сложной структуры.


    Lowkiq унаследовал от индексера разбиение на шарды, но получил более гибкую систему распределения шардов между тредами. Теперь каждый тред может обрабатывать несколько шардов, что позволяет добавлять новые очереди без увеличения количества тредов.


    Lowkiq так же как и индексер гарантирует, что никакой другой обработчик не сможет параллельно обрабатывать задачу с тем же идентификатором.


    Несколько наших проектов используют Lowkiq в production в течение года. При этом мы по-прежнему используем Sidekiq и прочие системы очередей в обычных случаях. Если вы сталкиваетесь с теми же проблемами что и мы, попробуйте Lowkiq.

    AdBlock похитил этот баннер, но баннеры не зубы — отрастут

    Подробнее
    Реклама

    Комментарии 5

      0

      UPD: проект доступен под двойной LGPL or EULA лицензией.

        0

        Что-то или очень сложно, или я что-то упускаю.


        1. При сохранении объекта в БД с использованием оптимистической блокировки получаем его актуальную версию (та, что в update ... set Version = Version + 1 where Version = ...)
        2. Эту версию передаем в "задаче"
        3. Когда наступает время обработки, вычитываем из ES "индексированную" версию объекта и сравниваем с той, что пришла в задаче. Если версия из задачи меньше, то просто ничего не делаем
        4. Если индексировать всё же надо, то вычитываем сущность из БД (или получаем из API) и обновляем ES-индекс с использованием version_type=external
          0

          Если так смотреть, то все еще проще, эластик сам умеет оптимистические блокировки.
          Возможно вы просто не сталкивались с подобными проблемами.


          У нас очень много дублей. Оптимистические блокировки никак это не исправят, все равно нужно ходить в эластик/бд.


          Предположим, пришла задача с id = 1. Пока она лежит в очереди, пришло еще с десяток задач с id = 1. Все кроме последней задачи протухли и обрабатывать их уже не нужно. Опять таки оптимистическая блокировка не поможет.


          https://youtu.be/TFCLaZr6vxY?t=11622 вот тут cto из ecwid рассказывают почему не взяли kafka, а написали сами. Мотивы схожие.

          0
          Пока она лежит в очереди, пришло еще с десяток задач с id = 1. Все кроме последней задачи протухли и обрабатывать их уже не нужно.

          Я ровно об этом и писал. Если мы получили задачу с определенным ID заказа, то варианта развития событий всего два: мы либо обновляем индекс свежайшей информацией из БД, либо не делаем ничего, поскольку данные кто-то уже обновил и в индексе хранятся уже самые актуальные данные.


          Похода в ES можно избежать небольшой оптимизацией — хранить в памяти словарь "ID заказа — последняя полученная из ES версия". В таком случае если версия новой задачи меньше, чем та, что есть у нас в памяти, то даже в ES стучаться не надо.

            0
            Похода в ES можно избежать небольшой оптимизацией

            Среди прочего lowkiq делает это.

          Только полноправные пользователи могут оставлять комментарии. Войдите, пожалуйста.

          Самое читаемое