company_banner

Сервер очередей



    В процессе роста во многих проектах появляется необходимость решения ряда задач, связанных с очередями. Часто очереди сообщений используют как связующее звено между различными внутренними подсистемами. Несколько классических примеров:
    • отложенная обработка пользовательских данных;
    • передача статистики;
    • сглаживание нагрузки на относительно медленные системы;
    • выполнение периодических задач.


    Существует несколько подходов к организации очередей:
    • использовать реляционные базы данных;
    • применить существующие решения (RabbitMQ и т.п.);
    • написать свой велосипед.


    «Мой Мир» какое-то время использовал очереди в реляционной базе, но с ростом проекта начались проблемы с производительностью. Мы встали перед выбором: применить существующие решения или разработать свою систему.

    Для начала мы оценили первый вариант. Существует множество серверов очередей под различные задачи. В основном мы рассматривали наиболее известный, RabbitMQ, по праву считающийся лидером среди подобных систем с открытым исходным кодом.

    Несколько достоинств:
    • неплохая производительность, достаточная для большого количества задач;
    • богатый функционал, позволяющий реализовать почти любую логику работы с очередями;
    • подсистема вытеснения сообщений на диск, которая позволяет работать на серверах с недостаточным объемом оперативной памяти, пусть и с потерей производительности;
    • возможность разрабатывать плагины для расширения функциональности.


    Но в любой системе хватает и недостатков. Вот некоторые из тех, которые повлияли на наш выбор в пользу разработки своего решения:
    • Недостаточная производительность. Нам важна информация, передающаяся через очереди, поэтому мы хотим все изменения в сервере очередей писать на диск в виде бинарных логов. С включением записи сообщений на диск производительность RabbitMQ падает до слишком низких значений (<10000 вставок событий в секунду).
    • Слишком быстрое падение производительности с увеличением размера сообщения, даже без включения записи на диск.
    • Отсутствие таймаутов на обработку сообщения. Если обработчик очереди завис из-за ошибки в коде или еще по какой-то причине, то сообщение не будет передано другому обработчику, пока зависший не оборвет соединение.
    • Нестабильное время ответа на запросы, особенно на больших очередях (>10 000 000 сообщений).
    • Тот факт, что сервер определяет, кому из обработчиков отдавать события и сколько их отдавать. В сети немало статей по подгону параметров под конкретную задачу. Нас это не устраивало, хотелось вынести логику управления ресурсами обработчиков за пределы сервера. Забегая вперед, скажу — это оказалось хорошей идеей. Мы построили над серверами очередей большую инфраструктуру, отслеживающую приоритеты очередей, количество событий в очередях и свободные ресурсы на серверах с обработчиками событий. Это позволяет динамически порождать и убивать обработчики очередей, а также настраивать лимиты на допустимое множество событий, получаемых из сервера очередей за один запрос (batch processing).
    • Достаточно большой объем служебных данных на каждое сообщение. По непонятным (для меня) причинам объем занимаемой памяти на одно и то же количество сообщений может заметно отличаться от запуска к запуску.


    Итак, проанализировав плюсы и минусы этого варианта, после некоторых раздумий мы выбрали разработку собственной системы. Одним из основных доводов стало то, что в 2009 году (когда состоялся первый релиз собственного сервера очередей) существующие решения работали под нагрузкой не очень стабильно. Сейчас многое исправлено и улучшено, но аргументов в пользу своего решения все еще хватает.

    Осознав, что нам нужно, подготовили ТЗ и определили требования, предъявляемые к нашему серверу очередей:
    • Любое сообщение должно выдаваться потребителям только после срабатывания некоторого условия. В качестве условия выбрано устанавливаемое клиентом время, после которого событие считается активным и может быть получено обработчиком соответствующей очереди.
    • Сохранять все изменения (Persistence) событий на диск, в случае программных или аппаратных сбоев восстанавливаться со всеми данными.
    • Обеспечить возможность задать порядок выдачи событий обработчикам (сортировка по времени активации события).
    • Не обманывать клиента, отвечать OK на вставке только после записи данных на диск.
    • Обеспечивать стабильно низкую задержку при работе с очередью до 100 000 000 событий.
    • Работать с событиями различного размера от 1 байта до нескольких мегабайтов.
    • Не менее 15000 вставок в секунду.
    • Производительность не должна падать при работе как минимум с 1000 изготовителей событий и 1000 потребителей.
    • Обеспечить отказоустойчивость (хотя бы частичную) в случае аппаратного сбоя дисков и потери\порчи данных. При запуске сервера важно уметь определять корректные данные и отбрасывать битые записи.


    Архитектура




    Сервер очередей реализован на C модулем к нашему первому сетевому фреймворку для построения хранилищ, из которого вырос и Tarantool. Это однопоточный асинхронный сервер, использующий libev для организации event loop'а. Все запросы обрабатываются по простому бинарному протоколу на основе IPROTO.

    WAL процесс


    Все изменения пишутся на диск в виде бинарных логов с помощью отдельного WAL процесса. Идеологически все очень похоже на Tarantool, сказываются общие корни. Каждая запись подписывается с помощью crc32, чтобы в процессе загрузки проверить корректность считываемых данных. Сервер очередей, пожалуй, больше всех наших модулей взаимодействует с WAL процессом, так как практически все команды, в том числе и выдача сообщений потребителям, модифицируют состояние сервера, и их необходимо писать на диск.

    Dumper


    Время от времени порождается процесс, сохраняющий полный образ текущего состояния пользовательских данных и необходимую служебную информацию на диск. По большому счету Dumper не необходим, но позволяет ускорить подъем сервера после перезапуска, так как достаточно прочитать последний snapshot и применить только те бинарные логи, что сделаны после записи образа данных.

    Say Logger


    Последний процесс отвечает за запись текстовых логов на диск. Часто логи полностью отключают на боевых серверах из-за ухудшающейся производительности; мы постарались избежать этой участи. Для этого порождается отдельный процесс, в котором выполняется внешний логгер, к примеру cronolog. Общение реализовано с помощью socket'ов таким образом, что мы можем работать в одном из двух режимов:
    • ждать запись на диск. Задержки записи логов ухудшат общую производительность.
    • игнорировать переполнение очереди сообщений к процессу логера. Это приведет к потере некоторых записей, но позволит не зависеть от производительности диска с текстовыми логами.


    Погружаемся дальше




    Все события в очереди находятся в одном из трех состояний:
    • Неактивно. Сообщение принято сервером сообщений, но его нельзя отдавать обработчикам очередей до наступления времени активации.
    • Активно. Время активации события наступило и событие может быть выдано обработчикам очередей.
    • Заблокировано. Событие уже выдано и ожидает подтверждения об обработке. Может быть выдано повторно, если через X секунд не придет команда на удаление события.


    В каждой очереди организованы индексы под события в каждом из трех состояний и еще один центральный индекс по всем событиям очереди.

    Сервер очередей работает с двумя типами очередей. Логика различается в политике выдачи ID-сообщений: либо ID выдает сервер, либо клиент. Наличие идентификатора у всех сообщений позволяет реализовать расширенную логику работы с очередями. Помимо вставки, получения и удаления, поддерживаются команды изменения данных или времени активации сообщения. Это позволяет организовать перестановку сообщений и изменение статуса обработки в рамках одной очереди. Если у вас есть периодические действия, необязательно удалять событие после его обработки — достаточно переставить время активации на нужное количество минут\часов\дней.

    Сервер очередей поддерживает транзакции. Все необходимые для ответа действия (аллокация временных данных, проверка наличия нужных событий, подготовка буферов соединений) выполняются перед записью на диск. При возникновении ошибок все изменения откатываются. Не поддерживается выполнение нескольких команд в рамках одной транзакции. Механизм транзакций обеспечивает эксклюзивное владение правом на изменение события. Одновременная попытка двух клиентов совершить действия, изменяющие состояние какого-то события, закончится возвратом ошибки с соответствующим кодом второму клиенту.

    Для корректной активации событий в сервере реализована служебная команда, которую нельзя вызвать по сети — она активируется по таймеру. Внутри организована служебная очередь по всем пользовательским очередям, отвечающая за активацию событий.

    При получении запроса от обработчика на новую порцию событий (обычно в районе 1000) из определенной очереди сервер выполняет следующие действия:
    • создаем транзакцию;
    • ищем указанную очередь;
    • блокируем необходимое количество сообщений из вершины индекса активных сообщений;
    • формируем служебную запись о блокировке выбранных событий;
    • отправляем служебную запись в WAL процесс;
    • получаем ответ от WAL процесса;
    • применяем транзакцию к данным в памяти, переносим события в очередь заблокированных;
    • пишем ответ клиенту из заранее подготовленных данных;
    • завершаем транзакцию, снимаем внутренние блокировки со всех событий и чистим временные данные транзакции;
    • в случае каких-либо ошибок на любом из этапов откатываем транзакцию и сообщаем клиенту причину.


    Обработка дублирования вставок сообщений. В случае проблем с сетью или высокой загрузкой сервера клиент может решить, что наступил таймаут, сервер не обработал сообщение и нужно его послать еще раз; однако сервер к этому времени может уже обработать сообщение. С очередями, в которых клиент выдает ID, все просто: в данных запроса есть ID — проверяем, есть ли такой в очереди, и, если это так, отсылаем ошибку.

    В очередях с внутренней выдачей ID ситуация сложнее: у сервера нет однозначного признака, по которому можно понять, что текущий запрос — это на самом деле дубль запроса, обработанного несколько секунд назад. «Значит, такой признак надо добавить», — решили мы и ввели в пакет 2 дополнительных поля: RequestID, гарантированно уникальный в рамках одного процесса клиента, и PID процесса. На сервере очередей организован кэш insert'ов по ключу {ClientIP, RequestID, PID}, позволяющий отследить дубликаты запросов в течение 10–15 минут. На практике этого более чем достаточно. Потенциальный недостаток — метод не работает через NAT, так как у всех клиентов окажется один и тот же IP и, соответственно, возможны ложные срабатывания.

    Создание и настройка очередей. Для упрощения конфигурации очередь автоматически создается с настройками по умолчанию при первой попытке вставки сообщения в указанную очередь. В конфигурации могут быть заданы настройки конкретной очереди, размер порций для попыток активации событий, время, после которого событие из очереди переходит из статуса заблокировано в статус активно и т. п.

    Кстати, замечу, что сейчас я бы не стал делать автоматическое создание очередей. Это прижилось и нравится разработчикам бизнес-логики, использующим очереди направо и налево, но отладка всего этого великолепия отняла немало времени и сил.
    Неожиданно много corner case'ов всплыло в процессе тестирования, ради редких ситуаций пришлось написать немало кода. Основные проблемы выявились при обработке отката транзакции события, порождающего новую очередь. Если во время создания новой очереди и записи события на диск другие клиенты пытаются добавить события в еще не созданную очередь, приходится понимать, что очередь в процессе создания. Ситуация осложняется, если это — очередь с внутренними ID, в которых сервер сам выдает ID сообщений в ответ на команды вставки. Все события в созданную очередь блокируются до завершения процесса создания очереди, при этом им уже назначается ID. Если транзакцию, создающую очередь, приходится откатывать, необходимо откатить и все транзакции зависимых событий, ожидающих создания очереди. Звучит страшно, а в коде еще страшнее.

    Подводим итоги


    Хорошее
    • Хорошие показатели производительности — не меньше 50000 rps на вставках. Производительность зависит исключительно от мощности дисков и количества записей, через которое нужно вызывать системный вызов fdatasync.
    • Работа с большими очередями. Одно время у нас были очереди по 170 000 000 сообщений на боевой сервер.
    • Стабильная работа с неравномерной нагрузкой (в каких-то очередях высокая интенсивность, в каких-то часто приходит пиковая нагрузка).
    • Хорошие результаты SLAB-аллокатора — и по производительности и по фрагментации (обычно 90% сообщений в рамках одной очереди имеют одинаковый или близкий размер).
    • Стабильность системы в целом. Ежедневно мы обрабатываем миллиарды сообщений на множестве серверов очередей. За последние 2-3 года не было ни одного сбоя по вине программной части.


    Плохое и задачи на будущее
    • Ряд наследственных проблем, полученных от использованного сетевого фреймворка. Все они должны уйти с переводом на кодовую базу Tarantool.
    • Шардинг на клиенте.
    • Обработка дублирования сообщений — ее стоило бы переделать. В принципе работает, но проблема с NAT'ом смущает.
    • Необходимо выделить создание очередей отдельной командой.
    • Иногда хочется хранимых процедур на Lua для расширения возможностей по работе с очередями. Пока что не настолько часто, чтобы дошли руки до реализации.
    • Все события всегда в памяти. Теоретически хорошо бы вытеснять события, которые нескоро активируются на диск. А на практике пока что для нас важнее стабильное время ответа на запросы к серверу.


    Использование очередей в Моем Мире


    • Отложенная обработка действий пользователя. Не лучшая идея заставлять пользователя ждать, пока вы сохраните его данные в SQL базу или другое хранилище (а зачастую необходимо вносить изменения сразу в несколько систем). Хуже того, в некоторых реализациях (в основном в маленьких проектах у молодых разработчиков) данные могут вовсе не попасть в хранилище, если клиент оборвал соединение, не дождавшись ответа. Хорошей практикой является добавление события о пользовательских действиях в достаточно быстрый сервер очередей, после чего можно отвечать клиенту, что операция прошла успешно. Всю остальную работу надежно и эффективно выполнят обработчики очереди. В качестве бесплатного бонуса получите упрощение кода на frontend-серверах, которым будет достаточно общаться только с демоном очередей для внесения изменений в любые хранилища. Знания о бизнес-логике различных данных можно вынести в обработчики очередей.
    • Рассылка сообщений, писем и т.п. Вам необходимо отослать большое количество данных, при этом не перегрузив хранилища всплеском запросов. Легко! Варьируя количество обработчиков очереди, размазываем пиковую нагрузку до разумного уровня, чтобы время обработки клиентских запросов в те же источники данных не ухудшилось. И, самое главное, с помощью очередей легко избежать дублирования сообщений. Крайне неприятно получить два письма об одном и том же событии. Для периодических рассылок достаточно после обработки сообщения обновлять время его активации, а не удалять: в нужное время оно снова будет обработано.
    • Транспорт для «надежной» статистики. Передача важных (все, что связано с деньгами) данных на агрегаторы статистики. Системы агрегации статистики обычно требовательны к ресурсам процессора, и при обработке данных могут не обеспечивать необходимое для frontend-серверов время ответа. Еще одна особенность подобных систем — неравномерная загрузка, обычно связанная с обработкой данных порциями. Передача статистики через серверы очередей позволит избежать проблем с нестабильной задержкой и при этом сохранит гарантию доставки.
    • Группировка событий. Если группа событий будет обращаться к одному и тому же набору данных в других системах, имеет смысл ставить одинаковое время активации, так как даже если установить время активации в прошлом, события отсортированы по времени активации. Физический смысл ухищрений в более эффективном использовании кэшей хранилища, в которое пойдут запросы из обработчиков сообщений.
    • Каскадные очереди. Организация конечного автомата из нескольких очередей путем перекладывания данных между очередями по завершении очередного этапа обработки. Часто необходимо в процессе обработки сообщения выполнить ряд действий, сильно различающихся по необходимым ресурсам. В таком случае разнесение «быстрых» и «медленных» действий по разным этапам (очередям) позволяет эффективно управлять необходимым количеством ресурсов, варьируя число обработчиков для каждой очереди. Дополнительно выигрываем в упрощении кода обработчиков и поиске ошибок в бизнес логике. По графикам очередей можно понять, на каком из этапов копятся события и в каком обработчике нужно искать проблемы.


    В проекте используем клиенты на Perl и C, в других проектах реализовали клиенты на PHP, Ruby и Java.

    P.S.: Специально не стали рисовать таблички со сравнением производительности с существующими подсистемами. Нельзя (я не знаю систем с подходящими возможностями) сравнить с тем же функционалом, что мы используем в бою, а без этого получится еще один тест сферического коня в вакууме.

    P.P.S.: Описание некоторых компонентов (административный интерфейс, локальная реплика и т.п.) опустили, так как они реализованы схожим образом в Tarantool.

    P.P.P.S.: В одной из следующих статей постараемся рассказать о нашей инфраструктуре по работе с очередями — о том, как управляем ресурсами, как отслеживаем состояние очередей, как организован шардинг событий между серверами очередей и о многом другом.

    Если есть какие-то вопросы, задавайте в комментариях.
    Mail.ru Group
    424,00
    Строим Интернет
    Поделиться публикацией

    Похожие публикации

    Комментарии 56

      +13
      Не нашёл в тексте ссылку на исходный код проекта, он (код) закрыт?
        +4
        Проект закрытый.
        Постараемся открыть после перехода на кодовую базу Tarantool.
          0
          Уже ждем.
        +1
        В в случае, если-бы вы сегодня озадачились аналогичной проблемой, подошло-ли бы вам какое-либо из существующих решений?
          +1
          Для части задач подходит нынешняя версия RabitMQ
          они сильно улучшили производительность в районе 2.8.х

          Для остальных не знаю подходящего аналога.
          +1
          planet.jboss.org/post/8_2_million_messages_second_with_specjms

          Я просто оставлю это здесь:)

          Кстати, а как у вас с отложенной доставкой сообщений и отмены отложенной доставки?
            0
            Всего максимум 125 байт на сообщение, что существенно меньше, чем требуется авторам, судя по описанию. Мы же не знаем, для какого реального объема сообщений заявлена метрика «не меньше 50000 rps на вставках».
              0
              Сразу скажу — этой системы не видел.

              что бросилось в глаза при первом знакомстве:
              1. не указан размер сообщений которыми тестировали
              2. в тестовой конфигурации сервера указано 12 SAS дисков
              «HP StorageWorks 146GB 15K dual port SAS LFF disk drives 512547-B21»

              Если загружают все диски, то, указанная цифра в 8 млн кажется маленькой. Опять же, зависит от размера сообщения.

              я то же не указал конфигурацию.
              В нашем случае эталон это 1 инстанс сервера очередей на одном типовом SATA диске. Размер сообщений равен 4096 байт пользовательских данных.

              По другим тестам производительность не выглядит впечатляющей, возможно неправильные тесты (http://integr8consulting.blogspot.ru/2011/02/jms-speed-test-activemq-vs-hornetq.html)
                0
                Если я правильно понял вопрос про отложенную доставку:
                — если нужно отложить обработку события приходит команда на переустановку времени активации события.
                — если нужно отложенное событие снова вернуть в пул готовых к обработке приходит та же команда, устанавливающая время активации в текущее значение или в прошлое.
              +1
              И казалось бы, причем тут цепи Маркова? :)
                +1
                А в сторону Apache Kafka, Apache Flume смотрели?
                  0
                  На момент разработки Apache Kafka еще не существовала
                  сейчас Kafka выглядит крайне интересно.
                  Нет смысле переделывать существующую архитектуру ради Kafka
                  в новом проекте можно использовать Kafka для транспорта важной статистики, транспорт между внутренними хранилищами и т.п.

                  мне нравится как используется Kafka в хранилище druid (http://druid.io)
                  0
                  В рамках Rabbitmq подтверждаю все сказанное и для текущих версий. Плюс были проблемы стабильности.
                    0
                    Т.е. все перечисленные проблемы остались?
                      0
                      — непредсказуемая память
                      — очень, ОЧЕНЬ, тяжело распределять нагрузку (приложение celery-based), нормальная ситуация, что 2 потребителя загружены под завязку, а еще 8 по 1 задаче в секунду берут (это примерно при 2000-10000 вставок в секунду).
                      — катастрофическое падение производительности при больших ответах (так получилось тезнически, что в celery-based приложении пришлось отдавать ответы через rabbitmq, вес ответа доходил до 30 Мб, при около 2000 вставок/сек машина с двумя 8-core процами и 16Гб оперативы по LA уходила в 6-10)
                        0
                        Ну вот (
                        Уже думал, что это следующий шаг для нас после Gearman.
                          0
                          Попробуйте. В моем случае оснонвная проблема была в том, что разрабы не хотели вылечить свой код, и отдавать ответы через redis.

                          Думаю большая часть моих проблем и была связанно с тем, что результаты передавались через rabbit.

                          Я для себя писал с участием кролика пару проектов, где не было такой «фичи», у меня на стресс-тесте (яндекс танком) на среднего пошиба виртуалке на DO, все нормально жило при 15000 вставок/сек. На продакшене еще не смотрел, но не думаю, что что-то изменится.

                          Тут главное изначально подойти с умом к кролику, а не латать дыры. ИМХО.
                            0
                            Да, нагрузка пока маленькая, и этот этап в любом случае нужно пройти.
                              0
                              Опять же, повторюсь, с моей точки зрения — обратите внимание на размер ответов. Я делал, что в ответе идет клюс, по которому я с редиса забираю. Оверхед куда менше, чем задержки с раббитом. Ну и цена работы куда ниже.

                              Было подозрение, не подтвержденное, что как раз те потребители, которые отдавали большие данные и не брали потом больше задач. И когда загруженные потребители тоже отдавали данные, кролик все равно им пизал новые задачи.
                                0
                                А у кролика вообще заявлена балансировка, может он и не умеет?
                                  0
                                  Там есть параметры, лучше сами посмотрите.
                                  Я игрался с кластером через haproxy, проблем не обнаружил. Но и в продакшн не сталвил.
                    0
                    Отложенная обработка действий пользователя.

                    А как решаются проблемы возникающие в результате задержки записи в базу? Ведь как я понял из описания данные от клиента ушли, но в базу еще не записались, т.к. находятся в очереди на запись. Клиент повторно запросил данные и:
                    * отдались старые данные из базы?
                    * из какого-то быстрого кэша?
                    * прямо из очереди?
                    * показали клиенту на фронте данные которые он оправил? (а если открыл в другом браузере?)
                    * просто забили?
                    Не очень момент понятен. Для примера пусть это будет обновление юзером своего профайла.
                      0
                      Никакой магии.
                      — пользователь отредактировал свой профиль, сообщение ушло в очередь.
                      — пользователю скажут что все ОК и покажут обновленные данные.
                      — если пользователь нажал F5 до того как данные дошли до хранилища, он увидит старые данные.

                      При нормальной работе сервиса события из очереди обрабатываются за несколько миллисекунд, данные окажутся в хранилище раньше чем браузер пользователя отобразит ответ от сервера.
                      Выгода проявляется во время сбоев, что-то с сетью, часть хранилищ перегружено, или аппаратный сбой, да все что угодно. В таком случае данные из очереди попадут не сразу в хранилище, важно, что они туда обязательно попадут. Неважно, ждет пользователь ответ от сервера, или уже ушел.
                        0
                        А есть что-то подобное постингу топиков? Как в этом случае?
                        Пользователь запостил сообщение, оно ушло в очередь, но из-за сбоя сети сообщение попало в очередь, но не ушло в базу. Допустим, что и у пользователя произошел сбой и ему пришлось обновить страницу. Своего сообщения он не увидел, ибо в базе его еще нет, поэтому он тут же постит точно такое же (сохранил перед постингом в блокноте, я сам так делаю регулярно). Оно так же попадает в очередь. Я правильно понимаю, что при записи в базу второго сообщение будет отклонено (считаем md5 хэш) если оно дословно повторяет первое, и размещено, если оно примерно такое же?
                          0
                          Описанный случай выходит за рамки ответственности сервера очередей
                          Это чистой воды бизнес логика обработчика сообщений из очереди.
                          В каких-то случаях можно по уникальному идентификатору контента догадаться что это то же самое что пытались вставить, а в каких-то банальный MD5
                      +2
                      Мы пару лет как разрабатываем свое решение — IronMQ, есть вариант как сервиса так on-premise. Из описанных вами фич разве что еще нет приоритетов сообщений, но выруливаем при помощи разных очередей.
                        0
                        Интересный проект
                        расстраивает текстовый протокол для взаимодействия с сервером
                          0
                          Также есть частичная реализация протокола beanstalk (хотя там тоже ASCII over TCP)
                          0
                          Крутая реализация! On-premise только оп запросу?
                            0
                            Пока да.
                          0
                          У вас по ТЗ сразу получился сервер задач, а не сервер очередей сообщений. Ключевое отличие как раз в том, с чего у вас начинается первая картинка — существование такого признака как «время активации». Отсюда и причины некоторых других различий.
                          А в целом всё круто, респект. Особенно респект за каскадные очереди, мы тоже голову сломали в своё время, пока додумались.
                            0
                            Все верно,
                            нам нужен был нестандартный сервер сообщений, скорее специализированное хранилище
                            поверх которого построена ферма для выполнения разнообразных задач
                              0
                              Даже боюсь спрашивать, что думаете по поводу Gearman?
                                0
                                К сожалению, ничего не могу сказать.
                                Это относительно новая система, на тот момент в Моем Мире не один год существовал свой сервер приложений
                            0
                            Может ли очередь жить в нескольких инстансах сервера (гео распределённость) на случай нештатных ситуаций?
                              0
                              К сожалению, нет.
                              Очередь может быть пошаржена между несколькими серверами
                              одно сообщение всегда обрабатывается с конкретного мастера шарда, жить может и в репликах

                              В планах есть реализация этого функционала, но, это будет нескоро.
                              0
                              В сторону NSQ смотрели?
                                0
                                а разве производительности тарантула не хватает?
                                его вполне хорошо можно использовать для очередей

                                вопрос 2 — кто пишет сервер очередей или перефразирую вопрос:
                                команда тарантула в разработке как-то участвует ;)
                                или просто взят их код?
                                  0
                                  не хватает,
                                  очереди в тарантуле хороши для быстрого старта
                                  по сравнению с нашим решением больше дополнительных данных на каждое сообщение

                                  Разрабатывает команда Моего Мира.
                                  Команда Tarantool не участвует.
                                  По родословной все наоборот. Tarantool родился из фреймворка разработанного в Моем Мире
                                  +1
                                  Фейсбучный beanstalkd? (pевью на Quora)
                                    0
                                    По описанию очень похоже.
                                    0
                                    А запись на диск делается аналогично как в Redis? Я правильно понимаю, что запись выполняется с некоторой периодичностью (таймер? событие?) и если в очередь попало сообщение, но до момента следующей записи сервер упал, то сообщение теряется?
                                      0
                                      write вызывается на каждую команду.
                                      Мы не отвечаем клиенту пока не получили подтверждение о выполнении write (это еще не гарантия того что данные попали на диск)
                                      Каждые N записываемых записей, вызываем fdatasync, что бы данные все таки попали на диск.

                                      Что у нас получается:
                                      — Если упал софт, но, сервер не перезагружался — все нормально. Не будет потеряна ни одна команда.
                                      — Если отключилось питание на сервере, то, мы можем потерять несколько записей которые находились в файловых кэшах, но, еще не записаны операционной системой на диск (записи после последнего fdatasync)

                                      Кэш в жестких дисках считаем выключенным.
                                        0
                                        Т.е. так же как в Redis если нет кластера, то потеря данных возможно при падении всего сервера. Значит ли это, что через этот сервер очередей критичные данные (биллинг там) не ходят?
                                          0
                                          Все верно.
                                          Единственное что мы можем обеспечить, что потеряно будет не больше N записей
                                      0
                                      Коммерческие серверы очередей рассматривались? Например IBM WebSphere MQ?
                                        0
                                        Нет
                                        Мы используем только системы с открытым исходным кодом
                                        0
                                        Приветствую…

                                        Уже в который раз удивляюсь желанию скрестить «ежиков и ужиков». Тем не менее, идея протащить scheduling в messaging возникает раз за разом :-) Речь идет о, так называемой, отложенной доставке — delayed delivery.

                                        Просто интересно, вы задумывались над тем почему, например, в AMQP этого нет? А, например, TTL у сообщений есть. Почему этого нет в стандарте JMS?

                                        Желание, «с экономить на спичках» и при этом потенциально поиметь геморрой размером даже не с кулак… Я не говорю, что потребности в DD нет — она как раз возникает почти всегда когда пытаются использовать messaging. Но вот способ, которым в подавляющем большинстве случаев пытаются эту потребность удовлетворить, вызывает то смех, то слезы.

                                        Люди… ну объясните, почему вы считаете, что DD это задача «транспорта»?!
                                          0
                                          Поддерживаю, тут явно 2 системы в одной. Причем и система очередей, и система планировщика явно выиграли бы от разделения.

                                          Причем события, которые нужно выполнить «сейчас», вполне ложатся под идею планировщика. А вот отложенные события в идею очередей – никак.

                                          Рассуждая дальше приходим к выводу, что это не система очередей, а планировщик, использующий проприетарную самописанную систему очередей.

                                          Что вобщем-то довольно полезно.

                                          Хотя сервер очередей можно было бы вынести во внешний протокол и разрешить использование сторонних MQ-серверов через протокол AMQP например.
                                            0
                                            Здравствуйте,
                                            выше писал что у нас получилась специализированная система, не соответствующая классическому представлению о Message Queue

                                            Почему и зачем — у нас были задачи, которые нужно было решать.
                                            Сделать это существующими решениями с необходимой производительностью не удалось.
                                            Для нас смешение логики оказалось очень удачным.
                                            Сервер очередей одна из немногих активно используемых систем, которая не вызывает постоянного желания что-то доделывать и переделывать.
                                            Наше решение пришлось по вкусу и в других проектах, как оказалось у них то же хватает «странных» задач
                                              0
                                              И вам — не хворать.

                                              Я-то представляю себе, что примерно у вас получилось :-) Тем не менее, в заголовок вы вынесли именно «Сервер очередей».

                                              >> Почему и зачем — у нас были задачи, которые нужно было решать.
                                              >>Сделать это существующими решениями с необходимой производительностью не удалось.

                                              Сдается, мне — не больно то и хотелось :-) Но, мне действительно интересно, изучали ли предметно тот же AMQP, или просто «по втыкали» в «кролика» и успокоились? Почему ограничились только «кроликом»? Если я все правильно понял про 2009 год, то он чуть менее чем ужасен в плане реализации AMQP на тот момент. Тот же qpid от Апачей на то время — практически эталон.

                                              >> Для нас смешение логики оказалось очень удачным.

                                              Ну дай-то бог :-) Рад за вас, если это действительно так.
                                              От себя могу сказать, что за 10 лет я таких «удачных» решений видел множество. И все они — какие-то раньше, какие-то и после нескольких лет эксплуатации — показывали свое истинное лицо :-)

                                              Что касается вашей системы, то, осмелюсь предположить, что ваши «группировки событий» — на самом деле «костыль со стразиками» :-)

                                              По тексту не видно большинства интересующих нюансов… но, таки попробую полюбопытствовать. Я правильно понял, что сервер у вас не знает о наличии/отсутствии подписчик[а/ов]?
                                                0
                                                Я-то представляю себе, что примерно у вас получилось :-) Тем не менее, в заголовок вы вынесли именно «Сервер очередей».

                                                Правильно, так как система ближе всего именно к серверу очередей

                                                Сдается, мне — не больно то и хотелось :-) Но, мне действительно интересно, изучали ли предметно тот же AMQP, или просто «по втыкали» в «кролика» и успокоились? Почему ограничились только «кроликом»? Если я все правильно понял про 2009 год, то он чуть менее чем ужасен в плане реализации AMQP на тот момент. Тот же qpid от Апачей на то время — практически эталон.

                                                Пробовали разное, но, больше всего RabbitMQ.
                                                В qpid для сохранения данных на диск используется bdb, о какой производительности можно говорить… В моих экспериментах результаты были хуже RabbitMQ

                                                От себя могу сказать, что за 10 лет я таких «удачных» решений видел множество

                                                Звучит очень знакомо :)
                                                Сейчас отвлекусь от очередей, Вы затронули популярную тему.

                                                Обычно, новые сотрудники первые 3 месяца высказываются аналогично. А потом приходит понимание, зачем приходится писать свои велосипеды.
                                                Мы пытаемся упростить себе жизнь и использовать сторонние разработки, к сожалению, очень редко удается пройти этап тестирования.
                                                В подавляющем большинстве случаев разрекламированные решения не устраивают или по производительности, или по надежности или по критерию стабильности работы. Пробовали привлекать разработчиков сторонних систем, писать патчи — чаще всего не помогает.
                                                И приходится в очередной раз разрабатывать свои собственные решения, менее универсальные, зато оптимизированные под наши задачи.

                                                Это не значит что мы не используем чужие системы — используем, но, не так часто как хотелось бы.

                                                Что касается вашей системы, то, осмелюсь предположить, что ваши «группировки событий» — на самом деле «костыль со стразиками» :-)

                                                Не совсем так. Это своего рода бонус, который позволяет небольшими доработками улучшить производительность в критичных местах. Не будете же Вы говорить, что улучшать локальность данных, для того что бы уменьшить промахи кэша процессора сродни костылю? В данном случае работает тот же принцип.

                                                Я правильно понял, что сервер у вас не знает о наличии/отсутствии подписчик[а/ов]?

                                                Все верно. Вся логика управления подписчиками вынесена в отдельную подсистему. Это отдельный кластер машин со своей внутренней диспетчеризацией и управлением ресурсами выделяемыми для обработки той или иной очереди.

                                                  0
                                                  В qpid для сохранения данных на диск используется bdb, о какой производительности можно
                                                  говорить… В моих экспериментах результаты были хуже RabbitMQ


                                                  Если меня не подводит мой склероз, то в те времена «кролик» для message store использовал исключительно mnesia. А в qpid, опять же, емнип, «испокон веков» persister — вещь настраиваемая. Заменить bdb на что-то более подходящее — не проблема. По крайней мере у нас с этим никаких проблем не возникло в свое время. И отказались мы от него по совсем другой причине.

                                                  Звучит очень знакомо :)
                                                  Сейчас отвлекусь от очередей, Вы затронули популярную тему.


                                                  Я может быть не вполне ясно выразился… но, под «удачными решениями» я имел ввиду исключительно попытки «вшить» scheduling в messaging. А не какие-то абстрактные «велосипеды».

                                                  Не совсем так. Это своего рода бонус, который позволяет небольшими доработками улучшить производительность в критичных местах.


                                                  Да понятно, что бонус :-) Не было бы бонуса, не было бы и «стразиков» :-) А можно, тогда по подробней за эти группировки рассказать? Я же правильно понимаю, что речь идет о «группировке» сообщений одного и того же продюсера, которые он положил в разные очереди? «Группировку» делает сам продюсер, а не сервер?

                                                  Все верно. Вся логика управления подписчиками вынесена в отдельную подсистему.


                                                  Хм… но это означает, в том числе и то, что сервер не имеет возможности уведомить подписчика о том, что то сообщение, которое он обрабатывает, повторно активировано. Т.е. что истек timeout и данное сообщение разблокировано.

                                                  Каким образом избегаете повторной обработки сообщений, если не секрет?

                                                  И вот смотрите… отдельная подсистема «управления подписчиками» у вас есть. Но, что-то помешало вам сделать отдельную (от messaging) подсистему для DD. Хотя, казалось бы :-) Вот в чем сакральный смысл хранить активные и не активные сообщения в одних и тех же очередях? Это же еще и мониторинг системы усложняет.
                                                    0
                                                    Заменить bdb на что-то более подходящее — не проблема

                                                    Насколько помню, в то время, из альтернатив bdb qpid поддерживал только SQL базы в качестве системы записи данных

                                                    под «удачными решениями» я имел ввиду исключительно попытки «вшить» scheduling в messaging

                                                    Но, что-то помешало вам сделать отдельную (от messaging) подсистему для DD. Хотя, казалось бы :-) Вот в чем сакральный смысл хранить активные и не активные сообщения в одних и тех же очередях?

                                                    Суровая реальность вносит коррективы в идеальную картину мира :)
                                                    Потребуется в 2 раза больше дисков, 1 в сервере очередей, чтоб не потерялись события, другой в планировщике, так как в нем большое количество событий могут жить длительное время. Если писать все это на один диск, упадет производительность, на каждое событие придется делать 2 записи на диск вместо одной.
                                                    Что делать если клиент захотел внести коррективы в сообщение, уже доставленное планировщику? Можно слать через систему очередей служебные сообщения для планировщика, который будет понимать что это сообщение с мета данными полученного ранее события. Можно обучить клиентов знанию еще и о планировщике и слать изменения напрямую в него. Но, это тоже приводит к нехорошим последствиям. Сообщение по каким-то причинам может быть еще не получено планировщиком из очереди сообщений, а клиент уже прислал изменения. Получается много граничных условий, усложняющих жизнь
                                                    Это всего лишь несколько примеров.
                                                    Скрестив 2 сущности в одной, мы не очень-то усложнили реализацию сервера очередей, зато добились упрощения архитектуры проекта в целом.

                                                    Я же правильно понимаю, что речь идет о «группировке» сообщений одного и того же продюсера, которые он положил в разные очереди? «Группировку» делает сам продюсер, а не сервер?

                                                    Речь идет о группировке событий в рамках одной очереди с помощью установки одинакового времени активации. Необязательно должен быть именно один продюсер, обычно группировка делается по внутреннему id данных, например по id пользователя, это позволяет группировать события разным продюсерам.
                                                    Сервер про группировку ничего не знает.
                                                    Приведу пример:
                                                    — Анти спам решил что пользователь X потенциальный спамер (неважно почему)
                                                    — Мы хотим проверить последние Y действий пользователя попадающих в ленту к друзьям (загрузка фото, высказывания и т.п.)
                                                    — Продюсер получает список ID этих Y событий (из какого-то хранилища)
                                                    — Для каждого ID из этих Y порождает событие в очередь и устанавливает одинаковое время активации
                                                    В чем польза:
                                                    Обработчики событий будут ходить в дисковое хранилище, в котором лежит вся активность пользователей и получение каждого события, если его нет в кэше хранилища — это поход на диск.
                                                    Если же события активируются в близкий период времени, то, поход на диск если и будет, то только один. При чтении данных с диска в большинстве наших хранилищ делается prefetch, в кэше на какое-то время оседают и другие события пользователя, а не только те, что были запрошены select'ом из конкретного обработчика.
                                                    Таким образом мы уменьшаем суммарное время обработки всех Y событий пользователя X.
                                                    И все это на распределенной ферме подписчиков, без каких либо ухищрений в виде синхронизаций на уровне обработчиков.

                                                    Хм… но это означает, в том числе и то, что сервер не имеет возможности уведомить подписчика о том, что то сообщение, которое он обрабатывает, повторно активировано. Т.е. что истек timeout и данное сообщение разблокировано.

                                                    Каким образом избегаете повторной обработки сообщений, если не секрет?

                                                    Верно, сервер никого ни о чем не уведомляет.
                                                    Рецепт простой, проблем не доставляет.
                                                    Время реактивации сообщений в большинстве очередей выставляется в 1 — 2 часа.
                                                    Ни одно сообщение не может так долго обрабатываться. Если за это время оно не было обработано, значит что-то случилось или с обработчиком, или с хранилищем в которое ходит обработчик.
                                                    Если среднее время обработки сообщений начинает расти, или в очереди копятся заблокированные события — срабатывает мониторинг и разработчики бизнес логики идут искать ошибки.
                                                      0
                                                      Насколько помню, в то время, из альтернатив bdb qpid поддерживал только SQL базы в качестве системы записи данных


                                                      Сдается мне — плохо помните :-) Емнип, SQL persister был написан сторонними ребятами специально для Windows релиза, т.к. там были сложности с работой libbdbstore. Но, это даже не важно… важно то, что весь persistence слой в qpid — by design — это plug-in. В отличие от «кролика», где — на тот момент — использование dets и mnesia было «прибито гвоздями».

                                                      Так что, реализовать свой message persister для qpid — абсолютно не проблема. По крайней мере, у нас это проблем не вызвало.

                                                      Суровая реальность вносит коррективы в идеальную картину мира :)


                                                      Я как-то слабо начал Вас понимать. Вы хотите меня убедить в том, что в текущей реализации изменение состояния сообщения (active/passive) не требует дисковых операций?! Или в чем?

                                                      Рассуждение о том, что «в 2 раза больше дисков» я, с вашего позволения, спишу на выходные :-) Мне сложно представить причину зачем это может потребоваться в реальности. Но — не спорю — реализовать можно и так, что нужно будет и в 3 раза больше дисков :-)

                                                      Что делать если клиент захотел внести коррективы в сообщение, уже доставленное планировщику?… и т.д. по скипано мной


                                                      Мне совершенно не понятно, зачем продюсеру с «планировщиком» общаться асинхронно? С сервером очередей же взаимодействие синхронное. Я правильно понял? Зачем — при наличии «планировщика» — продюсеру вообще знать о сервере очередей?

                                                      Более того, если «планировщик» реализован как некий фасад сервера очередей, вполне возможно обеспечить и zero-copy сообщений.

                                                      Речь идет о группировке событий в рамках одной очереди с помощью установки одинакового времени активации. Необязательно должен быть именно один продюсер...


                                                      Хм… а можно тогда пример, с разными продюсерами? А то как-то не понятно, каким образом этим _разным_ продюсерам удается промеж собой договорится об одинаковой группировке? Плюс, из примера не ясно, в чем же именно заключается _группировка_? Только в том, что у этих событий одинаковое время активации? Т.е. все события в одной очереди с временем активации X — это одна и та же группа?

                                                      По вашему примеру вопрос: почему в описанном случае продюсер не может сам прочитать эти Y событий, и послать уже данные этих событий, а не их ID? В этом, случае, имхо, гораздо больше гарантий что пробоя кэша при чтении не случится.

                                                      Верно, сервер никого ни о чем не уведомляет.
                                                      Рецепт простой, проблем не доставляет.
                                                      Время реактивации сообщений в большинстве очередей выставляется в 1 — 2 часа.
                                                      Ни одно сообщение не может так долго обрабатываться.


                                                      Извините, но тут я вообще перестаю что-либо понимать. Понятно, что два часа на обработку сообщения — это много. Но сдается мне, если обработчик забрал у сервера 1000 сообщений, и у всех у них «время реактивации» выставлено пусть даже в 2 часа, то у обработчика на самом деле есть только 7.2 секунды на обработку одного сообщения. Иначе, последнее сообщение — будет повторно активировано.

                                                      7 секунд — это, конечно, тоже не мало… но и с выставленным «временем реактивации» у сообщения тоже имеет мало общего. Или обработчик, как-то дополнительно уведомляет сервер о начале обработки конкретного сообщения?
                                            0
                                            Если не сложно, то могли бы привести пример каскадных очередей в ваших проектах?

                                            Только полноправные пользователи могут оставлять комментарии. Войдите, пожалуйста.

                                            Самое читаемое