
В 2017 году мы рассказывали о том, как спроектировали нашу систему поиска сообщений так, чтобы она могла индексировать миллиарды сообщений. Благодаря этому наша поисковая инфраструктура стала высокопроизводительной, экономной, масштабируемой и простой в использовании. Мы решили выбрать Elasticsearch, в котором сообщения Discord шардились по индексам и использовалось логическое пространство имён для сообщений Elasticsearch в двух кластерах Elasticsearch. Сообщения шардились или по серверу Discord (который ниже будем называть гильдией) или по личным сообщениям (DM). Это позволило нам хранить все сообщения гильдии рядом для обеспечения высокой скорости запросов и работать с маленькими, более удобными кластерами. Так как поиском пользуются не все, сообщения индексировались лениво, и мы создали очередь сообщений, позволявшую воркерам получать блоки сообщений для индексирования, чтобы воспользоваться возможностями массового индексирования (bulk-indexing) Elasticsearch.
Но с ростом объёмов Discord наша поисковая инфраструктура начала трещать по швам…
Наша очередь индексирования сообщений Redis теряла сообщения
В основе нашей очереди индексирования сообщений в реальном времени лежала Redis, которая поначалу справлялась отлично. Однако если по какой-то причине происходило резервное копирование нашей очереди индексирования, что часто случается при сбое узла Elasticsearch, кластер Redis становился источником сбоев, начинавшим терять сообщения при задействовании всех ресурсов CPU в случаях, когда в очереди находилось слишком много сообщений.
Наше массовое индексирование было неустойчиво при сбое индекса или узла Elasticsearch
Частично причиной такого сильного влияния сбоев узлов Elasticsearch на очередь индексирования стал способ оптимизации массового индексирования сообщений. Работавшие в реальном времени воркеры индексирования получали пакеты сообщений из очереди, чтобы индексировать их в Elasticsearch. Однако все эти сообщения принадлежали к разным индексам и узлам Elasticsearch в нашем кластере — одна операция массового индексирования пакета из 50 сообщений могла распределяться по 50 узлам Elasticsearch в кластере! Если в пакете происходил сбой операции, то сбойной считалась вся операция массового индексирования, а для повторной попытки все сообщения снова помещались в очередь.
Рассмотрим пример: допустим, наш кластер Elasticsearch состоит из 100 узлов и мы индексируем пакеты по 50 сообщений. В случае сбоя узла, если принять равномерное распределение, вероятность того, что конкретный пакет содержит хотя бы одно сообщение, отправляемое этому сбойному узлу примерно составляет 40%. Это значит, что сбой одного узла приводит к сбоям примерно 40% операций массового индексирования!

Крупные кластеры Elasticsearch имели большой оверхед, что влияло на производительность и работоспособность
С ростом количества сообщений мы масштабировали кластеры Elasticsearch, добавляя дополнительные узлы. Это позволяло нам достаточно легко выполнять горизонтальное масштабирование добавлением новых индексов и узлов, обрабатывающих большее количество серверов Discord и их сообщений. Однако из-за их добавления каждая из наших операций массового индексирования распределялась по гораздо большему количеству индексов и узлов. Операции распространялись шире, вызывая снижение скорости индексирования. Кроме того, чем больше узлов в кластере, тем выше вероятность сбоя кластера.
Отсутствие хорошего способа реализации обновлений ПО и непрерывных перезапусков
Из-за низкой устойчивости системы к сбоям единственного узла у нас не было качественного решения для выполнения непрерывных перезапусков (rolling restart) и обновлений ПО. Кроме того, наши кластеры стали очень крупными и громоздкими: более 200 узлов, а данные исчислялись в терабайтах. Любые стратегии по плавному опустошению узлов перед перезапуском для обеспечения постоянной доступности сервиса потребовали бы слишком много времени. Мы были вынуждены оставить на наших кластерах легаси-версии операционных систем и Elasticsearch, не имея возможности ни установить критически важные обновления, ни улучшить производительность.
Самым примечательным примером стал патчинг уязвимости log4shell отключением операций поиска JNDI и присваиванием log4j2.formatMsgNoLookups
значения true
— для этого потребовалось бы полностью отключить нашу поисковую систему на период технического обслуживания, пока мы перезапускаем каждый узел Elasticsearch с новой конфигурацией.
Индексы для больших гильдий стали слишком большими
Со временем некоторые из наших индексов Elasticsearch стали хранить данные некоторых очень крупных гильдий Discord. Естественно, крупные гильдии публикуют кучу сообщений, из-за чего индексы становятся очень большими. Каждый из наших индексов Elasticsearch внутри устроен, как единый индекс Lucene, а у Lucene есть ограничение MAX_DOC, равное примерно двум миллиардам сообщений на индекс. Мы прочувствовали это на своей шкуре, когда превысили этот лимит, и все операции индексирования начали завершаться сбоями.
Когда это произошло, единственным способом устранения проблемы стало то, что наша команда безопасности искала гильдии, предназначенные исключительно для спама как можно большего количества сообщений, и удаляла их. Какое-то время это работало, но мы знали, что рано или поздно нам придётся обеспечить поддержку поиска в сообществах, опубликовавших больше двух миллиардов сообщений.
Решения
Наша поисковая инфраструктура многие годы справлялась со своей задачей, но мы начали сталкиваться со всё большим количеством инцидентов: кластеры не выдерживали обработку увеличившегося количества сообщений и запросов. Учитывая уязвимость инфраструктуры к непрерывным перезапускам и обновлениям ПО, нам также нужно было обновить и устаревшее ПО.
Это должно было стать серьёзным мероприятием, поэтому мы решили, что настало время изучить существующую инфраструктуру и развить её.
Развёртывание Elasticsearch на Kubernetes
В то время Discord развёртывал многие свои stateless-сервисы на Kubernetes. Это существенно повысило удобство управления и позволило нам оптимизировать затраты, более точно выделяя ресурсы вычислительной мощи и памяти. У нас не было на Kubernetes никаких сервисов с хранением состояния, но Elastic Kubernetes Operator казался многообещающим решением для оркестрации кластера Elasticsearch на Kubernetes и управления им.
Благодаря Elasticsearch Operator мы могли бы с лёгкостью определять топологию и конфигурацию наших кластеров, а также развёртывать кластер Elasticsearch в нашем пуле узлов Kubernetes. Обновления операционных систем происходили бы автоматически, а ECK предоставлял бы эргономичные инструменты для безопасного выполнения непрерывных перезапусков и обновлений.
Многокластерная архитектура «ячеек» для запуска небольших кластеров Elasticsearch
Определившись, что будем работать на Kubernetes и использовать ECK operator, мы задумали создание более обширной архитектуры, в которой будет работать большее количество кластеров Elasticsearch меньшего размера.
В нашей легаси-системе кластеры разрослись до более чем двухсот узлов. Это означало высокую вероятность сбоев отдельных узлов в кластере и очень высокие затраты на координацию самого кластера.
Состояния кластеров в Elasticsearch не масштабируются — в процессе роста кластера, добавления в него новых узлов и индексов повышается и оверхед для мастер-узла кластера. Наши мастер-узлы кластеров стали сталкиваться с частыми OOM, вызывавшими сбои индексирования, растущий бэклог сообщений, которые не удалось индексировать в Elasticsearch, снижение скорости обработки запросов и частые таймауты.
В нашей новой поисковой инфраструктуре мы решили создавать больше индексов Elasticsearch, чтобы каждый индекс мог оставаться в пределах рекомендованных 200 миллионов сообщений и 50 ГБ данных. Эти индексы можно распределить по множеству мелких кластеров, что поможет снизить оверхед хранения состояния кластеров и их координации, а также повысить производительность.
Мы внедрили концепцию логической «ячейки» множества кластеров Elasticsearch, позволившую нам обеспечивать высокоуровневую группировку множества мелких кластеров Elasticsearch. Ниже мы расскажем о том, как это позволило реализовать новые функции поиска и возможность работы с большими гильдиями.

Внутри каждого из этих малых кластеров Elasticsearch в пределах ячейки у нас есть выделенные узлы потребления (ingest node), узлы с ролью мастера (master-eligible node) и узлы данных (data node). Наличие этих выделенных ролей узлов обеспечивает следующие условия:
Узлы с ролью мастера всегда имеют достаточное количество ресурсов для выполнения координации кластеров
Узлы потребления, выполняющие постобработку и маршрутизацию, могут работать как stateless-системы, потому что они не владеют данными и могут наращивать и уменьшать свои объёмы, чтобы справляться с пиками объёмов данных
Узлам данных можно выделить ресурсы в куче, достаточные для обработки операций индексирования и запросов
Учитывая разные потребности в ресурсах у этих типов узлов, соответствующие поды запускаются на разных типах машин и пулах узлов.
Мы спроектировали каждый кластер так, чтобы он был устойчив к зональным сбоям:
Три мастер-узла: по одному на зону
Не менее трёх узлов потребления: по одному на зону
Узлы данных: основной индекс и реплика находятся в разных зонах, учитывающих распределение по шардам с принудительной балансировкой индексов среди всех зон
Очередь сообщений PubSub
Мы провели миграцию очереди сообщений индексирования с Redis на PubSub, что позволило обеспечить гарантированную доставку сообщений и устойчивость к большим бэклогам сообщений. Благодаря этому сбои Elasticsearch теперь могут приводить к замедлению индексирования, но сообщения теряться не будут.
Удобство гарантированной доставки сообщений в PubSub привело к тому, что мы начали использовать PubSub в других сценариях, например в планировании задач Discord, но это уже тема для отдельного поста.
Группировка сообщений по кластерам и индекс перед массовым индексированием
Мы по-прежнему хотим, чтобы сообщения индексировались массово, потому что это обеспечивает высокую производительность, но воркеры должны быть интеллектуальными: собирать и индексировать группы сообщений, находящиеся в одном кластере. Это гарантирует, что каждая операция массового индексирования будет обращаться только к одному индексу и узлу Elasticsearch, чтобы в случае сбоя узла он повлиял на операции индексирования сообщений только в этом узле. Сегодня, когда у нас стало больше кластеров и индексов, это ещё более важно.
Чтобы улучшить нашу стратегию массового индексирования, мы реализовали маршрутизатор сообщений PubSub, который выполняет потоковую передачу сообщений из PubSub и собирает сообщения, сгруппированные по их Destination
, или в нашем случае по кластеру и индексу Elasticsearch, к которым относится сообщение. Маршрутизатор сообщений создаёт канал и порождает задачу tokio для каждого Destination
, куда отправляются сообщения: затем маршрутизатор отправляет сообщение задаче, собирающей блоки сообщений для индексирования, которые она получает по своему каналу.
Маршрутизатор выполняет потоковую передачу сообщений из PubSub и извлекает Destination
(ключ) для каждого сообщения. В нашем случае, Destination
— это кластер и индекс Elasticsearch, в которых будет находиться сообщение. Когда маршрутизатор видит сообщение с новым Destination
, он порождает отдельный канал и задачу tokio, получающую сообщения по этому каналу.
В процессе получения новых сообщений маршрутизатор перенаправляет их в соответствующий канал по ключу Destination
. Задача каждого destination собирает блоки сообщений, ключом для которых становится одно и то же Destination
, а затем выполняет массовое индексирование в Elasticsearch.
/// MessageRouter выполняет маршрутизацию сообщений множеству динамически создаваемых destinations.
/// Это упрощённое представление MessageRouter, используемого для индексации
/// новых сообщений Discord в Elasticsearch.
struct MessageRouter<DestinationKeyT, MessageT> {
destinations: RwLock<HashMap<DestinationKeyT, UnboundedSender<MessageT>>,
}
impl<DestinationKeyT, MessageT> MessageRouter {
/// Пытаемся отправить сообщение указанному destination, создаём его,
/// если destination не существует.
fn send_message(
&self,
destination_key: DestinationKeyT,
message: MessageT,
) -> Result<()> {
let mut destinations = self.destinations.write();
match destinations.entry(destination_key) {
Entry::Occupied(mut ent) => {
// Отправляем сообщение указанному destination
ent.get().send(message).ok();
}
Entry::Vacant(ent) => {
// Создаём новое destination и получателя
let (destination_sender, destination_receiver) = unbounded_channel();
let task = tokio::task::spawn(async move {
// Задача Destination получает сообщения с одним ключом destination
// для destination_receiver.
// В нашем случае, задача destination группирует сообщения
// в блоки и выполняет массовое индексирование в Elasticsearch.
});
ent.insert(destination_sender).send(message).ok();
}
}
Ok(())
}
}
Поддержка новых сценариев использования поиска
Создание определений «ячеек» в кластерах Elasticsearch дало нам полезную абстракцию, позволившую индексировать сообщения в разных размерностях. Пользователи давно просили о создании функции поиска по всем личным сообщениям, но мы не могли обеспечить её поддержку, потому что все сообщения были разбиты на шарды и индексировались по гильдии или личным сообщениям. Распределение поискового запроса по всем личным сообщениям пользователя было бы слишком затратной задачей.
Для эффективного поиска по всем личным сообщениям нам необходимо было бы разбить сообщения на шарды по пользователям, а не по каналам и хранить все личные сообщения пользователя вместе по заданному индексу Elasticsearch. Для этого бы потребовалось хранить вдвое больше данных, потому что каждое личное сообщение индексировалось бы по индексу recipient_a
и индексу recipient_b
.
Учитывая то, что мы уже планировали переиндексировать все сообщения Discord в рамках миграции на новую архитектуру, нам представилась редкая возможность по-другому разделить на шарды часть сообщений и индексировать их. Сообщения в гильдиях по-прежнему будут разбиваться на шарды по guild_id
, но теперь пользовательские личные сообщения будут разбиваться на шарды по user_id
и индексироваться в отдельной ячейке Elasticsearch user-dm-messages
.
Именно такая система теперь обрабатывает новую функциональность поиска по всем личным сообщениям, доступную в мобильной версии. Сегодня всех личные сообщения и поисковые запросы обрабатываются ячейкой Elasticsearch user-dm-messages
, а сообщения и запросы в гильдиях — ячейкой guild-messages
. Пользователю не придётся вспоминать, кто конкретно послал ему сообщение шесть лет назад!
Специализированные кластеры Elasticsearch для больших гильдий со множеством шардов
Чем дольше история гильдий в Discord, тем они становятся больше, и тем большее количество из них упирается в потолок ограничений Lucene MAX_DOC, примерно равное двум миллиардам сообщений. Нам необходимо было решение для масштабирования поиска в этих особых случаях. Мы назвали их BFG (Big Freaking Guilds).
Мы хотели сохранить все преимущества производительности, полученные благодаря хранению всех сообщений одной гильдии в одном и том же шарде Elasticsearch, потому что это по-прежнему подходит подавляющему большинству гильдий, но нам требовалось решение и для масштабирования поиска по BFG.
Индексы Elasticsearch могут иметь множество первичных шардов, и каждый шард находится в узле кластера. Из-за наличия множественных первичных шардов данные индекса распространяются по множеству узлов кластера, для которых хранится множество индексов Lucene.
Для оптимизации производительности запросов мы обычно создаём индексы Elasticsearch с единственным первичным шардом. Это гарантирует, что все сообщения в этом индексе будут находиться в одном узле Elasticsearch, что гарантирует отсутствие оверхеда распределения и координирования в процессе запроса. Для почти всех гильдий Discord это приводит к гораздо более быстрым поисковым запросам, потому что каждому запросу нужно общаться только с одним узлом Elasticsearch.
Однако в случае BFG мы имеем дело с огромными индексами и большим объёмом сообщений: миллиарды сообщений в одной гильдии! Наличие большего количества шардов может ускорить поисковые запросы, но только когда затраты на координирование не превосходят затраты на запросы индекса. В случае BFG мы можем реализовать рост производительности запросов благодаря параллелизму, связанному с распределением поискового запроса по нескольким шардам, и затраты на координирование это оправдывают.
Индексирование сообщений BFG в индексы со множественными первичными шардами позволяет нам масштабировать поисковую функциональность до гильдий с миллиардами сообщений, а также позволяет выполнять более быстрые запросы для этих особых гильдий. Кроме того, это улучшает производительность запросов для других гильдий Discord, ресурсы которых ранее бы потребляли затратные запросы BFG по тому же индексу.
Мы создали новую систему, позволяющую нам переиндексировать сообщения BFG в новый индекс с большим количеством первичных шардов, увеличив таким образом предел максимального числа сообщений. Важно также то, что в процессе миграции нам нужно продолжать индексировать новые сообщения и обрабатывать запросы.
Процесс переиндексирования для BFG происходит следующим образом:
Мы выявляем BFG — гильдию, в которой количество сообщений приближается к
MAX_DOCS
Lucene. Эта гильдия на текущий момент индексируется вindex-a
Создаём в выделенной для BFG ячейке Elasticsearch
new-bfg-index
, увеличив количество шардов вдвое.Новые сообщения теперь индексируются и в
index-a
, и вnew-bfg-index
Запускаем job для исторического индексирования всех сообщений гильдии в
new-bfg-index
. Текущие поисковые запросы по-прежнему обрабатываются изindex-a
После завершения исторического индексирования мы переносим новый трафик запросов с
index-a
наnew-bfg-index
Убедившись в высокой производительности и надёжности
new-bfg-index
, мы прекращаем индексировать новые входящие сообщения вindex-a
и запускаем задачу очистки для удаления всех сообщений BFG изtoo-small-index
Благодаря выделению ячейки Elasticsearch под BFG мы можем оставить стандартными конфигурации кластера и индекса для большинства гильдий и ресурсов, сохранив при этом возможность необходимой подстройки под BFG.
Получилось ли у нас добиться успеха?
Благодаря нашей новой инфраструктуре мы:
Индексируем триллионы сообщений, удвоив при этом пропускную способность по сравнению с легаси-поиском.
Улучшили медианный показатель задержки запроса, снизив его с 500 мс до менее чем 100 мс, а p99 снизился с 1 с до менее чем 500 мс.
Создали 40 кластеров Elasticsearch с тысячами индексов.
Выполняем апгрейды кластеров и непрерывные перезапуски автоматически и без ущерба для доступности сервисов.