Pull to refresh
VK
Building the Internet

Сервер очередей

VK corporate blog Programming *


В процессе роста во многих проектах появляется необходимость решения ряда задач, связанных с очередями. Часто очереди сообщений используют как связующее звено между различными внутренними подсистемами. Несколько классических примеров:
  • отложенная обработка пользовательских данных;
  • передача статистики;
  • сглаживание нагрузки на относительно медленные системы;
  • выполнение периодических задач.


Существует несколько подходов к организации очередей:
  • использовать реляционные базы данных;
  • применить существующие решения (RabbitMQ и т.п.);
  • написать свой велосипед.


«Мой Мир» какое-то время использовал очереди в реляционной базе, но с ростом проекта начались проблемы с производительностью. Мы встали перед выбором: применить существующие решения или разработать свою систему.

Для начала мы оценили первый вариант. Существует множество серверов очередей под различные задачи. В основном мы рассматривали наиболее известный, RabbitMQ, по праву считающийся лидером среди подобных систем с открытым исходным кодом.

Несколько достоинств:
  • неплохая производительность, достаточная для большого количества задач;
  • богатый функционал, позволяющий реализовать почти любую логику работы с очередями;
  • подсистема вытеснения сообщений на диск, которая позволяет работать на серверах с недостаточным объемом оперативной памяти, пусть и с потерей производительности;
  • возможность разрабатывать плагины для расширения функциональности.


Но в любой системе хватает и недостатков. Вот некоторые из тех, которые повлияли на наш выбор в пользу разработки своего решения:
  • Недостаточная производительность. Нам важна информация, передающаяся через очереди, поэтому мы хотим все изменения в сервере очередей писать на диск в виде бинарных логов. С включением записи сообщений на диск производительность RabbitMQ падает до слишком низких значений (<10000 вставок событий в секунду).
  • Слишком быстрое падение производительности с увеличением размера сообщения, даже без включения записи на диск.
  • Отсутствие таймаутов на обработку сообщения. Если обработчик очереди завис из-за ошибки в коде или еще по какой-то причине, то сообщение не будет передано другому обработчику, пока зависший не оборвет соединение.
  • Нестабильное время ответа на запросы, особенно на больших очередях (>10 000 000 сообщений).
  • Тот факт, что сервер определяет, кому из обработчиков отдавать события и сколько их отдавать. В сети немало статей по подгону параметров под конкретную задачу. Нас это не устраивало, хотелось вынести логику управления ресурсами обработчиков за пределы сервера. Забегая вперед, скажу — это оказалось хорошей идеей. Мы построили над серверами очередей большую инфраструктуру, отслеживающую приоритеты очередей, количество событий в очередях и свободные ресурсы на серверах с обработчиками событий. Это позволяет динамически порождать и убивать обработчики очередей, а также настраивать лимиты на допустимое множество событий, получаемых из сервера очередей за один запрос (batch processing).
  • Достаточно большой объем служебных данных на каждое сообщение. По непонятным (для меня) причинам объем занимаемой памяти на одно и то же количество сообщений может заметно отличаться от запуска к запуску.


Итак, проанализировав плюсы и минусы этого варианта, после некоторых раздумий мы выбрали разработку собственной системы. Одним из основных доводов стало то, что в 2009 году (когда состоялся первый релиз собственного сервера очередей) существующие решения работали под нагрузкой не очень стабильно. Сейчас многое исправлено и улучшено, но аргументов в пользу своего решения все еще хватает.

Осознав, что нам нужно, подготовили ТЗ и определили требования, предъявляемые к нашему серверу очередей:
  • Любое сообщение должно выдаваться потребителям только после срабатывания некоторого условия. В качестве условия выбрано устанавливаемое клиентом время, после которого событие считается активным и может быть получено обработчиком соответствующей очереди.
  • Сохранять все изменения (Persistence) событий на диск, в случае программных или аппаратных сбоев восстанавливаться со всеми данными.
  • Обеспечить возможность задать порядок выдачи событий обработчикам (сортировка по времени активации события).
  • Не обманывать клиента, отвечать OK на вставке только после записи данных на диск.
  • Обеспечивать стабильно низкую задержку при работе с очередью до 100 000 000 событий.
  • Работать с событиями различного размера от 1 байта до нескольких мегабайтов.
  • Не менее 15000 вставок в секунду.
  • Производительность не должна падать при работе как минимум с 1000 изготовителей событий и 1000 потребителей.
  • Обеспечить отказоустойчивость (хотя бы частичную) в случае аппаратного сбоя дисков и потери\порчи данных. При запуске сервера важно уметь определять корректные данные и отбрасывать битые записи.


Архитектура




Сервер очередей реализован на C модулем к нашему первому сетевому фреймворку для построения хранилищ, из которого вырос и Tarantool. Это однопоточный асинхронный сервер, использующий libev для организации event loop'а. Все запросы обрабатываются по простому бинарному протоколу на основе IPROTO.

WAL процесс


Все изменения пишутся на диск в виде бинарных логов с помощью отдельного WAL процесса. Идеологически все очень похоже на Tarantool, сказываются общие корни. Каждая запись подписывается с помощью crc32, чтобы в процессе загрузки проверить корректность считываемых данных. Сервер очередей, пожалуй, больше всех наших модулей взаимодействует с WAL процессом, так как практически все команды, в том числе и выдача сообщений потребителям, модифицируют состояние сервера, и их необходимо писать на диск.

Dumper


Время от времени порождается процесс, сохраняющий полный образ текущего состояния пользовательских данных и необходимую служебную информацию на диск. По большому счету Dumper не необходим, но позволяет ускорить подъем сервера после перезапуска, так как достаточно прочитать последний snapshot и применить только те бинарные логи, что сделаны после записи образа данных.

Say Logger


Последний процесс отвечает за запись текстовых логов на диск. Часто логи полностью отключают на боевых серверах из-за ухудшающейся производительности; мы постарались избежать этой участи. Для этого порождается отдельный процесс, в котором выполняется внешний логгер, к примеру cronolog. Общение реализовано с помощью socket'ов таким образом, что мы можем работать в одном из двух режимов:
  • ждать запись на диск. Задержки записи логов ухудшат общую производительность.
  • игнорировать переполнение очереди сообщений к процессу логера. Это приведет к потере некоторых записей, но позволит не зависеть от производительности диска с текстовыми логами.


Погружаемся дальше




Все события в очереди находятся в одном из трех состояний:
  • Неактивно. Сообщение принято сервером сообщений, но его нельзя отдавать обработчикам очередей до наступления времени активации.
  • Активно. Время активации события наступило и событие может быть выдано обработчикам очередей.
  • Заблокировано. Событие уже выдано и ожидает подтверждения об обработке. Может быть выдано повторно, если через X секунд не придет команда на удаление события.


В каждой очереди организованы индексы под события в каждом из трех состояний и еще один центральный индекс по всем событиям очереди.

Сервер очередей работает с двумя типами очередей. Логика различается в политике выдачи ID-сообщений: либо ID выдает сервер, либо клиент. Наличие идентификатора у всех сообщений позволяет реализовать расширенную логику работы с очередями. Помимо вставки, получения и удаления, поддерживаются команды изменения данных или времени активации сообщения. Это позволяет организовать перестановку сообщений и изменение статуса обработки в рамках одной очереди. Если у вас есть периодические действия, необязательно удалять событие после его обработки — достаточно переставить время активации на нужное количество минут\часов\дней.

Сервер очередей поддерживает транзакции. Все необходимые для ответа действия (аллокация временных данных, проверка наличия нужных событий, подготовка буферов соединений) выполняются перед записью на диск. При возникновении ошибок все изменения откатываются. Не поддерживается выполнение нескольких команд в рамках одной транзакции. Механизм транзакций обеспечивает эксклюзивное владение правом на изменение события. Одновременная попытка двух клиентов совершить действия, изменяющие состояние какого-то события, закончится возвратом ошибки с соответствующим кодом второму клиенту.

Для корректной активации событий в сервере реализована служебная команда, которую нельзя вызвать по сети — она активируется по таймеру. Внутри организована служебная очередь по всем пользовательским очередям, отвечающая за активацию событий.

При получении запроса от обработчика на новую порцию событий (обычно в районе 1000) из определенной очереди сервер выполняет следующие действия:
  • создаем транзакцию;
  • ищем указанную очередь;
  • блокируем необходимое количество сообщений из вершины индекса активных сообщений;
  • формируем служебную запись о блокировке выбранных событий;
  • отправляем служебную запись в WAL процесс;
  • получаем ответ от WAL процесса;
  • применяем транзакцию к данным в памяти, переносим события в очередь заблокированных;
  • пишем ответ клиенту из заранее подготовленных данных;
  • завершаем транзакцию, снимаем внутренние блокировки со всех событий и чистим временные данные транзакции;
  • в случае каких-либо ошибок на любом из этапов откатываем транзакцию и сообщаем клиенту причину.


Обработка дублирования вставок сообщений. В случае проблем с сетью или высокой загрузкой сервера клиент может решить, что наступил таймаут, сервер не обработал сообщение и нужно его послать еще раз; однако сервер к этому времени может уже обработать сообщение. С очередями, в которых клиент выдает ID, все просто: в данных запроса есть ID — проверяем, есть ли такой в очереди, и, если это так, отсылаем ошибку.

В очередях с внутренней выдачей ID ситуация сложнее: у сервера нет однозначного признака, по которому можно понять, что текущий запрос — это на самом деле дубль запроса, обработанного несколько секунд назад. «Значит, такой признак надо добавить», — решили мы и ввели в пакет 2 дополнительных поля: RequestID, гарантированно уникальный в рамках одного процесса клиента, и PID процесса. На сервере очередей организован кэш insert'ов по ключу {ClientIP, RequestID, PID}, позволяющий отследить дубликаты запросов в течение 10–15 минут. На практике этого более чем достаточно. Потенциальный недостаток — метод не работает через NAT, так как у всех клиентов окажется один и тот же IP и, соответственно, возможны ложные срабатывания.

Создание и настройка очередей. Для упрощения конфигурации очередь автоматически создается с настройками по умолчанию при первой попытке вставки сообщения в указанную очередь. В конфигурации могут быть заданы настройки конкретной очереди, размер порций для попыток активации событий, время, после которого событие из очереди переходит из статуса заблокировано в статус активно и т. п.

Кстати, замечу, что сейчас я бы не стал делать автоматическое создание очередей. Это прижилось и нравится разработчикам бизнес-логики, использующим очереди направо и налево, но отладка всего этого великолепия отняла немало времени и сил.
Неожиданно много corner case'ов всплыло в процессе тестирования, ради редких ситуаций пришлось написать немало кода. Основные проблемы выявились при обработке отката транзакции события, порождающего новую очередь. Если во время создания новой очереди и записи события на диск другие клиенты пытаются добавить события в еще не созданную очередь, приходится понимать, что очередь в процессе создания. Ситуация осложняется, если это — очередь с внутренними ID, в которых сервер сам выдает ID сообщений в ответ на команды вставки. Все события в созданную очередь блокируются до завершения процесса создания очереди, при этом им уже назначается ID. Если транзакцию, создающую очередь, приходится откатывать, необходимо откатить и все транзакции зависимых событий, ожидающих создания очереди. Звучит страшно, а в коде еще страшнее.

Подводим итоги


Хорошее
  • Хорошие показатели производительности — не меньше 50000 rps на вставках. Производительность зависит исключительно от мощности дисков и количества записей, через которое нужно вызывать системный вызов fdatasync.
  • Работа с большими очередями. Одно время у нас были очереди по 170 000 000 сообщений на боевой сервер.
  • Стабильная работа с неравномерной нагрузкой (в каких-то очередях высокая интенсивность, в каких-то часто приходит пиковая нагрузка).
  • Хорошие результаты SLAB-аллокатора — и по производительности и по фрагментации (обычно 90% сообщений в рамках одной очереди имеют одинаковый или близкий размер).
  • Стабильность системы в целом. Ежедневно мы обрабатываем миллиарды сообщений на множестве серверов очередей. За последние 2-3 года не было ни одного сбоя по вине программной части.


Плохое и задачи на будущее
  • Ряд наследственных проблем, полученных от использованного сетевого фреймворка. Все они должны уйти с переводом на кодовую базу Tarantool.
  • Шардинг на клиенте.
  • Обработка дублирования сообщений — ее стоило бы переделать. В принципе работает, но проблема с NAT'ом смущает.
  • Необходимо выделить создание очередей отдельной командой.
  • Иногда хочется хранимых процедур на Lua для расширения возможностей по работе с очередями. Пока что не настолько часто, чтобы дошли руки до реализации.
  • Все события всегда в памяти. Теоретически хорошо бы вытеснять события, которые нескоро активируются на диск. А на практике пока что для нас важнее стабильное время ответа на запросы к серверу.


Использование очередей в Моем Мире


  • Отложенная обработка действий пользователя. Не лучшая идея заставлять пользователя ждать, пока вы сохраните его данные в SQL базу или другое хранилище (а зачастую необходимо вносить изменения сразу в несколько систем). Хуже того, в некоторых реализациях (в основном в маленьких проектах у молодых разработчиков) данные могут вовсе не попасть в хранилище, если клиент оборвал соединение, не дождавшись ответа. Хорошей практикой является добавление события о пользовательских действиях в достаточно быстрый сервер очередей, после чего можно отвечать клиенту, что операция прошла успешно. Всю остальную работу надежно и эффективно выполнят обработчики очереди. В качестве бесплатного бонуса получите упрощение кода на frontend-серверах, которым будет достаточно общаться только с демоном очередей для внесения изменений в любые хранилища. Знания о бизнес-логике различных данных можно вынести в обработчики очередей.
  • Рассылка сообщений, писем и т.п. Вам необходимо отослать большое количество данных, при этом не перегрузив хранилища всплеском запросов. Легко! Варьируя количество обработчиков очереди, размазываем пиковую нагрузку до разумного уровня, чтобы время обработки клиентских запросов в те же источники данных не ухудшилось. И, самое главное, с помощью очередей легко избежать дублирования сообщений. Крайне неприятно получить два письма об одном и том же событии. Для периодических рассылок достаточно после обработки сообщения обновлять время его активации, а не удалять: в нужное время оно снова будет обработано.
  • Транспорт для «надежной» статистики. Передача важных (все, что связано с деньгами) данных на агрегаторы статистики. Системы агрегации статистики обычно требовательны к ресурсам процессора, и при обработке данных могут не обеспечивать необходимое для frontend-серверов время ответа. Еще одна особенность подобных систем — неравномерная загрузка, обычно связанная с обработкой данных порциями. Передача статистики через серверы очередей позволит избежать проблем с нестабильной задержкой и при этом сохранит гарантию доставки.
  • Группировка событий. Если группа событий будет обращаться к одному и тому же набору данных в других системах, имеет смысл ставить одинаковое время активации, так как даже если установить время активации в прошлом, события отсортированы по времени активации. Физический смысл ухищрений в более эффективном использовании кэшей хранилища, в которое пойдут запросы из обработчиков сообщений.
  • Каскадные очереди. Организация конечного автомата из нескольких очередей путем перекладывания данных между очередями по завершении очередного этапа обработки. Часто необходимо в процессе обработки сообщения выполнить ряд действий, сильно различающихся по необходимым ресурсам. В таком случае разнесение «быстрых» и «медленных» действий по разным этапам (очередям) позволяет эффективно управлять необходимым количеством ресурсов, варьируя число обработчиков для каждой очереди. Дополнительно выигрываем в упрощении кода обработчиков и поиске ошибок в бизнес логике. По графикам очередей можно понять, на каком из этапов копятся события и в каком обработчике нужно искать проблемы.


В проекте используем клиенты на Perl и C, в других проектах реализовали клиенты на PHP, Ruby и Java.

P.S.: Специально не стали рисовать таблички со сравнением производительности с существующими подсистемами. Нельзя (я не знаю систем с подходящими возможностями) сравнить с тем же функционалом, что мы используем в бою, а без этого получится еще один тест сферического коня в вакууме.

P.P.S.: Описание некоторых компонентов (административный интерфейс, локальная реплика и т.п.) опустили, так как они реализованы схожим образом в Tarantool.

P.P.P.S.: В одной из следующих статей постараемся рассказать о нашей инфраструктуре по работе с очередями — о том, как управляем ресурсами, как отслеживаем состояние очередей, как организован шардинг событий между серверами очередей и о многом другом.

Если есть какие-то вопросы, задавайте в комментариях.
Tags:
Hubs:
Total votes 87: ↑77 and ↓10 +67
Views 54K
Comments Comments 57

Information

Founded
Location
Россия
Website
vk.com
Employees
5,001–10,000 employees
Registered
Representative
Миша Буданов