Прим. переводчика: автор статьи рассказывает о процессе обновления кластера Elasticsearch размером более 3 петабайт методом последовательного включения двух кластеров, а также о том, как решались проблемы согласованности индексирования и миграции данных.
Это вторая часть из серии статей об обновлении кластера Elasticsearch без простоев и с минимальным воздействием на пользователей.
Как упоминалось в первой части, необходимо было обеспечить плавный переход между двумя версиями системы, при этом сохраняя возможность отката.
Поэтому с самого начала было очевидно, что придется запускать параллельно пару кластеров Elasticsearch, а затем постепенно переходить со старого на новый. В этой части пойдет речь о том, как решались проблемы согласованности индексирования и миграции данных.
Всё меняется
Требования к нашей системе довольно необычны по сравнению с типичным кластером Elasticsearch, собирающим логи, поскольку в ней попросту отсутствуют неизменяемые данные. Любой документ в любом индексе в любой момент времени может быть обновлен. Поэтому приходится внимательно следить за обновлениями и всегда применять их в правильном порядке.
Каждую секунду кластер получает 5000 новых документов и более 10000 обновлений. Всего в нем более 400 млрд документов, которые, вместе с репликами, «весят» более 3,5 ПБ.
Управлять этой горой данных очень непросто, даже если речь идет всего об одном кластере. К счастью, в Elasticsearch есть оптимистическая блокировка — и, смею вас заверить, ее мы использовали по полной. До начала миграции на новый кластер особых проблем с согласованностью в старом кластере не было (по крайней мере, мы ничего о них не знали).
Сценарии работы с обновлениями документов
Документы в нашей системе обновляются по ряду причин. Три распространенных примера: новая активность (например, лайк или комментарий к сообщению в соцсети), аннотации к документу, сделанные клиентом (например, метка или комментарий, прикрепленный к документу) или нормативные требования (приходится скрывать контент от поставщиков или удалять его на основании определенных правил).
Справиться со всем этим нашей системе помогают четыре базовые операции над документами и полями. Все они применяются к заданному идентификатору документа, а идентификаторы гарантированно уникальны для всего кластера.
Upsert
Добавить новый документ или полностью заменить существующий.
Обратите внимание, что удаления также проводятся как Upsert'ы. То есть Upsert просто заменяет существующий документ «пустым».
AddIfNotExists
Добавить новый документ, если его еще нет. В противном случае сохранить существующий и отбросить новый.
SetField
Заменить значение одного поля в документе, сохранив остальные поля нетронутыми. Выдать ошибку, если документ не существует.
AppendToField / RemoveFromField
Для полей массива также поддерживается функция добавления/удаления на уровне полей. Она удобнее SetField при работе с наборами значений. Во всех остальных аспектах семантика ее операций аналогична SetField.
До обновления для всех этих операций брался существующий документ из Elasticsearch, в него вносились изменения, после чего он сохранялся в кластер. За синхронизацию и версионирование этих операций отвечала сама Elasticsearch. Следование правилам оптимистической блокировки более или менее защищало нас от потери данных и/или другого непредсказуемого поведения. Но, как и ожидалось, вся эта идиллия закончилась с добавлением второго кластера.
Добавление второго кластера
Добавление в архитектуру второго кластера повлекло за собой массу новых проблем. Любой пропуск обновления или, что хуже, его потеря незамедлительно ударили бы по клиентам.
Одна из задач проекта миграции состояла в том, чтобы получить возможность перебрасывать поисковые запросы между кластерами незаметно для клиентов. То есть все полученные операции с документами должны были применяться в обоих кластерах в одном и том же порядке. Потеря операции в любом из кластеров сильно бы осложнила жизнь пользователям: например, тег, добавленный к записи, попросту исчезал бы для всех запросов, направляемых в этот кластер. При наших объемах даже маловероятная ошибка (к примеру, один шанс на миллион) случалась бы с завидным постоянством.
Также предстояло решить распространенную проблему, когда две операции применяются к одному и тому же документу почти одновременно. К примеру, за Upsert'ом сразу следует SetField. В такой ситуации неосторожное обращение грозит полной потерей данных. Если эти операции применить в кластерах в разном порядке, получится совершенно разный результат.
Требовалось сделать так, чтобы в конечном итоге оба кластера сходились к одному конечному результату при любой последовательности операций, даже если те применялись в разном порядке.
Непростая задача.
Архитектура, «заточенная» под согласованность
Почти сразу пришло понимание, что необходимо ограничить количество компонентов с прямым доступом к Elasticsearch. В исходном состоянии миграция на новый кластер была бы слишком сложной и требовала бы абсолютной координации из-за одномоментности переключения. Было решено остановиться на двух таких компонентах: один — для пакетной потоковой передачи данных в больших объемах и один — с REST API.
Главное, что предстояло сделать — убрать версионирование из Elasticsearch, поскольку было бы очень сложно обеспечить применение каждого отдельного обновления в одном и том же порядке в обоих кластерах. Также было решено отказаться от поддержки всех описанных выше операций в компонентах, которые взаимодействуют с Elasticsearch, а вместо этого реализовать две операции «низкого уровня»:
GetDocument
Извлекает документ и номер его версии по идентификатору documentId. Возвращаемую здесь версию необходимо передать UpsertIfLatest.
UpsertIfLatest
Делает Upsert для документа, если версия определена как последняя.
Если версия не новее существующей, выдать ошибку и сообщить об этом вызывающей стороне.
Например, высокоуровневая операция SetField может быть реализована так:
fun SetField(String field, String newValue, String docId) {
var success = false
while (!success) {
val document = GetDocument(docId)
document[field] = newValue
document.version = GetNextVersion(document.version)
success = UpsertIfLatest(docId, document)
}
}
Это означает, что пришлось внедрить сервис документов (Document), который использовал компоненты хранилища для реализации API высокого уровня (AddIfNotExists / SetField / AppendToField / RemoveFromField). Сервис документов создает версионированные Upsert'ы, которые дублируются в компоненты хранилища, работающие с отдельными кластерами. Компоненты хранилища независимо для каждого кластера решают, последняя ли версия у входящей операции Upsert или нет.
Дополнительная сложность состояла в том, что разные версии Elasticsearch отличались друг от друга реализацией внутреннего версионирования и оптимистической блокировки. Поэтому пришлось изобрести третью, единую причинно-согласованную схему версионирования на основе порядкового номера, работающую поверх двух других. Порядковые номера, которые управлялись «вне» кластеров, хранились в нескольких полях версий всех документов.
Не будем вдаваться в подробности, но команде из четырех человек потребовалось более года, чтобы аккуратно и постепенно перестроить весь пайплайн индексирования и обновления. В итоге получилась такая схема:
Преимущества архитектуры, «заточенной» под согласованность
Вся работа по поддержке двух кластеров была не просто «неизбежным злом», связанным с обновлением. Она принесла пользу гораздо быстрее, чем предполагалось. Вот краткий список дополнительных преимуществ, которые удалось получить:
Улучшены автоматические наборы тестов и мониторинг согласованности.
Улучшено разделение ответственности между несколькими компонентами, которые до этого были слишком сильно переплетены между собой.
Обнаружен и исправлен ряд ошибок, связанных с пограничными кейсами и возникавших в старой кластерной архитектуре: в данных были обнаружены свидетельства некоторых известных ошибок согласованности, порожденных самой Elasticsearch, о которых ранее не было известно.
Архитектуру «двух кластеров» можно использовать и в будущем:
например, для новых обновлений кластера;
на ее основе можно организовать еще один кластер в другой географической точке, чтобы обеспечить отказоустойчивость и локальность данных или снизить задержки.
Миграция данных между кластерами
Параллельно с проектированием и реализацией заточенной под согласованность архитектуры стояла задача понять, как именно копировать данные из старого кластера в новый.
Мы рассмотрели множество различных вариантов, от внедрения новых компонентов, которые считывали бы снапшоты Elasticsearch и записывали их на новый кластер, до индексации с удаленного сервера. В итоге было решено экспортировать все данные (1 ПБ) с помощью scan & scroll API и сохранять сжатые пакеты JSON-файлов в хранилище S3. После этого достаточно было пропустить эти JSON-документы через основной пайплайн.
Так мы можем не только реплицировать данные, но и одновременно улучшать их. Например, удалить кастомный код, который обрабатывал старые данные, и добавить поля, введенные с момента индексации. В результате получилась более последовательная, тонкая и понятная информационная модель для нового кластера, что само по себе огромное улучшение.
По нашим оценкам, пайплайн экспорта, переработки и реиндексации смог бы обрабатывать около 50 тыс. документов в секунду. Таким образом, работа по заполнению нового кластера должна была занять не менее 3 месяцев при условии, что процесс не будет прерываться.
Поскольку копирование заняло бы так много времени, было решено перестроить архитектуру остальной части системы на использование «частичных» данных, которые постепенно появляются в новом кластере. Это позволило бы избежать глобального переключения в самом конце. Такое решение повлияло на дальнейшие планы и концепцию и в итоге пришлось весьма кстати (подробнее об этом в одной из следующих статей).
Пиковая пропускная способность индексирования во время миграции составила 130 тыс. документов в секунду. Но, как видно из рисунка 5, по разным причинам ее не удавалось поддерживать длительное время. Кроме того, индексирование пришлось дополнительно замедлить после того, как основная часть запросов конечных пользователей начала направляться в новый кластер (чтобы не рисковать ростом поисковых задержек).
Стоит также отметить, что оценка средней пропускной способности оказалась очень точной. Весь процесс развертывания занял ~ 110 дней с момента начала индексирования до момента, когда все данные оказались на новом кластере.
На этом заканчивается вторая часть серии материалов об обновлении кластера Elasticsearch. Следите за обновлениями: очередная статья будет опубликована на следующей неделе.
P.S.
Читайте также в нашем блоге: