Search
Write a publication
Pull to refresh
732.31
VK
Building the Internet

Сквозь тернии к апдейту: история о том, как мы обновляли стриминг ОК

Level of difficultyMedium
Reading time11 min
Views718

Стриминговые данные — важная часть многих современных ИТ-платформ. Они активно используются в разных разделах социальной сети Одноклассники уже более 10 лет. И за это время используемый нами стриминговый фреймворк успел заметно измениться, поэтому настало время обновить его в нашем проекте. Но на пути обновления легаси-систем иногда возникают сложности.

Меня зовут Алина Шестакова. Я разработчик в команде разработки DataPlatform единой облачной платформы One-сloud. В этой статье расскажу о том, как устроен стриминг данных в ОК, а также обо всех патчах и челленджах, возникших в процессе обновления стримингового фреймворка.

Дата-инфраструктура ОК

Дата-инфраструктура ОК выстроена на основе большого стека, и оперирует большим объемом данных:

  • HDFS — 3 кластера в 7 ДЦ с объемом данных более 200 ПБ (+500 ТБ ежедневно);

  • Clickhouse — более 2 ПБ (20 кластеров);

  • YARN — более 200 ТБ RAM, более 60 тысяч вычислительных ядер vcores, более 10 тысяч Spark-джоб ежедневно;

  • Kafka — более 2 ПБ (40 кластеров);

  • Airflow, Netflow, персональные инструменты Zeppelin и Jupyter Notebook и др.

Вся инфраструктура Одноклассников развернута на единой для всех сервисов VK облачной платформе One-cloud. При этом неотъемлемой частью этой дата-инфраструктуры является стриминг.

Примечание: Подробнее о One-cloud можно прочитать в статье Олега Анастасьева «One-cloud – ОС уровня дата-центра в Одноклассниках».

Стриминг ОК

Все данные на портале Одноклассников можно разделить на две группы:

  • контент — храним в объектных хранилищах: OBS, S3 и OCS;

  • события — храним в Kafka, Hadoop, ClickHouse, Druid.

Наш стриминг работает именно с событиями. При этом стриминговые данные используются практически во всех разделах и сервисах портала, среди которых:

  • подарки, рамки;

  • видео, клипы, прямые эфиры;

  • музыка;

  • друзья;

  • группы;

  • воспоминания;

  • игры;

  • реклама;

  • новости.

Сейчас наш стриминг — это:

  • 145 продовых джоб;

  • 10 млн сообщений в минуту;

  • 170 млн сообщений в минуту в пике для одной джобы;

  • 20 ТБ памяти и 2300 ядер в Yarn.

Примечание: Среднее время обработки сообщения — 0.02 мс.

Инсталляция нашего стриминга выглядит примерно следующим образом:

  • контейнеры джобы запускаются в YARN;

  • хранение state — KV-store: Kafka, HDFS, in-memory;

  • мониторинг — на инстансах + Charts + Grafana.

Примечание: Инстансы, развернутые в облаке проекта odnoklassniki-streaming-analysis, производят submit джоб и проверку их «живучести» с помощью Zookeepers.

Наша типовая джоба может читать данные из нескольких входных топиков, отправлять их в выходные топики и взаимодействовать с несколькими хранилищами состояний. Эти состояния хранятся в памяти джобы. 

Чтобы при перезапуске или сбое джобы данные state-а из памяти джобы не терялись, все изменения state-а в памяти джобы дублируются в топики Kafka.

Основными потребителями стриминговых данных являются:

Обновление стриминга ОК

Текущий стриминговый фреймворк работает уже более 10 лет на первой официальной релизной версии 0.7.0. В рамках обновления нам надо было решить довольно большой пул задач, в том числе:

  • поднять версию Apache Samza до 1.8.0;

  • добавить мониторинг в Grafana;

  • перевести сборку проекта с Ant на Gradle;

  • перевести стриминг в облако.

Примечание: Об опыте миграции «железного Hadoop» в облако можно узнать в докладе Михаила Марюфича «Hadoop в Облаке: история миграции сотен петабайт».

И если добавление мониторинга, переход на Gradle и миграция в облако нужны для повышения надежности, удобства эксплуатации и разработки, то необходимость поднятия версии Apache Samza изначально может показаться неочевидной в условиях, когда и так всё работает хорошо и стабильно. 

Дело в том, что в последней актуальной версии Apache Samza 1.8.0 было реализовано большое количество полезных фич, в том числе:

  • поддержка Java 11;

  • Transactional State Checkpointing;

  • host affinity для stateful-джоб в YARN;

  • поддержка Apache Beam pipelines;

  • поддержка Kafka 2.x.x;

  • поддержка Apache Log4j 2;

  • SamzaSQL, UDFs;

  • Multi-stage batch processing;

  • поддержка YARN node labels;

  • поддержка работы с HDFS;

  • Multi-threaded processing в рамках контейнера;

  • поддержка Broadcast Stream;

  • добавление метрик;

  • поддержка RocksDB/in-memory state management;

  • поддержка сессионных окон.

Так, для наших разработчиков была особо востребована поддержка Java 11, Kafka 2.x.x и сессионных окон.

И поскольку поднятие зависимостей в проектах — довольно типовая задача, с которой регулярно сталкиваются многие компании, мы предполагали, что и у нас при обновлении Apache Samza не возникнет трудностей. Но всё оказалось не совсем так.

Проблемы, выявленные до выкатки на прод

Для начала мы подняли зависимости, заменили вызовы удаленных в новой версии интерфейсов в коде джоб, преодолели сложности совместимости с такими библиотеками как guava, mahout и начали запускать тестовую джобу. И сразу выявили первый челлендж.

Авторизация в Kafka

При запуске джобы мы стали получать exception при попытке создать топик для хранения state джобы. Это было странно, поскольку все топики создаются заранее администраторами Kafka, а автосоздание топиков в продакшен-кластерах отключено для упрощения администрирования.

После исследования ошибки оказалось, что загвоздка в том, что Apache Samza не проверяет, создан ли уже топик, а просто пытается создать его, не обрабатывая исключение авторизации, а именно ситуацию, когда у пользователя, из-под которого работают с Kafka наши стриминговые джобы, может не хватать прав на создание топиков.

if (systemAdmin.createStream (changelogSpec)) { 
  LOG.info("created changelog stream");
} else {
  LOG.info("changelog stream already exists.");
}

Чтобы решить эту задачу, мы добавили проверку на существование топика перед его созданием. Успех.

Примечание: Этот и некоторые другие патчи, о которых я буду упоминать, лежат в моем форке на Github. Но они не мерджены в upstream, поскольку сейчас Samza поддерживается крайне неактивно. 

«Ending offset must be equal to or greater than starting offset»

Спустя время мы столкнулись с другой сложностью. Так, наша тестовая джоба работала, но в какой-то момент мы ее отключили. За это время данные в ее топике-сторе успели почиститься по retention. В результате, когда через четыре дня мы попытались запустить джобу повторно, ничего не получилось — начали получать exception с утверждением, что конечный offset должен быть больше или равен начальному offset.

При рестарте джобы происходит рестор — восстановление state из топика-стора. При этом, когда Apache Samza запрашивает информацию о топике, Kafka дает указатель за последним сообщением, в то время, как Apache Samza хочет получить указатель перед последним сообщением.

Чтобы выполнить требование и обеспечить соответствие, в Samza применяется следующий подход:

  • если топик новый — ничего не вычитаем;

  • если топик не новый — отнимаем 1 от последнего оффсета, полученного от Kafka:

0 <= start offset < end offset
0 == start offset == end offset

Но мы столкнулись с тем, что топик не новый, но пустой, то есть начальный оффсет равен конечному, но при этом они не равны нулю. 

0 <= start offset == end offset

Соответственно, мы получаем данное исключение, поскольку Apache Samza вычитает единицу из конечного оффсета и падает на этой своей же проверке. Решением оказался патч с добавлением новой ветки кода в if-else блоке. 

Проблема решена. 

Недоступность Application Master UI

У Apache Samza удобный UI, который позволяет просматривать конфиги, информацию о контейнерах, ссылки на логи в контейнерах, обрабатываемые партиции топиков и другую полезную информацию. Такая функциональность нам нужна.

Но на практике мы начали сталкиваться с недоступностью Application Master UI. В наших кластерах мы не даем доступы на порты экзекьюторов обычным пользователям, но для YARN это не проблема, поскольку он умеет проксировать запросы к WEB UI. При этом, если протокол не задан, YARN выбирает его сам. 

Сложность оказалась в том, что у нас на кластере включена безопасность, поэтому протокол, который выбирает YARN по умолчанию, — https. Samza, в свою очередь, явно определяла его как http.

Чтобы решить это, оказалось достаточно написать простой патч с явным указанием протокола в коде Samza.

Проблемы, выявленные после выкатки на прод

После устранения всех «шероховатостей», мы перешли к самой выкатке в прод. Но и здесь начали возникать трудности.

Битые записи в топиках

После выкатки в прод мы начали получать битые записи в топиках сторов джоб. Поскольку наш feature store не работает с ними и является одним из основных потребителей стриминговых данных, для нас это было критично.

Здесь важный момент:

  • за отсутствие null'ов в топиках-сторах отвечает NullSafe Store — при попытке положить null в стор мы получим исключение;

  • отсутствие null'ов в обычных выходных топиках — ответственность разработчика.

Но при запуске stateful-джобы на новой версии мы начали получать null'ы в топике-сторе, что невозможно, поскольку мы используем NullSafe Store.

В рамках хотфикса мы сбросили оффсеты в конец и получили шестичасовой даталосс фич Ленты ОК.

Что же пошло не так?

Помимо работы с топиками сторов и выходными топиками, джобы также работают с двумя системными топиками: 

  • checkpoints — топик с чекпоинтами;

  • coordinator stream — топик с конфигами джобы.

И в новой версии Apache Samza появился механизм Transactional State Checkpointing. Раз в минуту в топик с чекпоинтами отправляется последний оффсет топика-стора. Соответственно, возможна ситуация, когда джоба упала в промежутке между двумя коммитами, и у нас остались незакомиченные сообщения. В новом механизме Apache Samza продюсером по всем незакомиченным ключам в топик отправляются null-значения.

Оказалось, что Transactional State Checkpointing стал дефолтным, что сломало обратную совместимость между версиями Samza. При этом такое изменение дефолтного поведения фреймворка не было помещено в документацию или release notes. В итоге, чтобы исключить появление проблемы, мы нашли «ручку», позволяющую отключить этот механизм:

task.transactional.state.restore.enabled=false

Беды с партиционированием

Следующее, с чем мы столкнулись — потеря части стейта. 

Так, при переводе джоб стали заметны скачки данных, указывающие на то, что мы перестали находить часть данных. Причем проверка первых гипотез показала, что данные по пути не ломаются и не теряются — проблема не в этом. 

После детального анализа оказалось, что данные просто прилетают не в те партиции, где мы их ожидаем. Например, при чтении данных из 15-й партиции и применении к их ключам UDF, вычисляющей искомые для ключей партиции, оказалось, что часть ключей, лежащих в 15 партиции, должна была попасть в совершенно другие партиции. 

Причина такой рассинхронизации оказалась в смене дефолтных алгоритмов расчета партиции на уровне Kafka, а также различий в обработке данной ситуации на нашей стороне и на стороне Samza. Так, раньше для избавления от отрицательных хэш-кодов к хэш-коду ключа в Kafka применялась маска, а теперь — от хэш-кода ключа берется модуль. 

И поскольку наши потребители читают данные из партиций, определенных старым алгоритмом, нашим решением было создать свой класс партишенера, который реализует старый алгоритм, и прокидывать его в продюсеры Samza. 

Но на этом всё не закончилось. Так, одновременно с тем, как мы создали упомянутый класс партишенера, разработчики Apache Samza зафиксировали в коде использование нового алгоритма, тем самым игнорируя возможные классы партишенеров, переданные в конфигах джоб. В результате для решения проблемы мы сделали патч Apache Samza с изменением нового алгоритма на старый, чем решили проблему потери стейта.

Новые беды с партиционированием 

После повторной попытки выкатки в прод мы стали фиксировать, что часть сообщений уходит не туда — нарушается алгоритм, при котором мы сообщения передаем в стор, а оттуда дублируем в топик Kafka.

Чтобы понять, с чем связана проблема, мы пошли в App Master UI и стали изучать таблицу распределения партиций входных топиков по таскам. 

Примечание: Apache Samza не пытается обрабатывать все данные, прилетающие в топик, в один поток. Вместо этого она использует механизм партиционирования, то есть джоба делится на отдельные таски, а каждая таска, в свою очередь, берет на себя определенные входные партиции топиков и определенные партиции топиков-сторов. 

Здесь стала все стало очевидно. Так, например, нулевая таска работает с нулевыми входными партициями, а также с нулевой партицией стора. 

Но нюанс в том, что в коде таска выбирает соответствующую партицию стора, исходя из ID таски, а партиции выходных топиков выбираются, исходя из partition name. Причем соответствие ID именам тасок определяется в TreeMap.

Из-за этого стала возможной ситуация, когда вторая таска работает со второй партицией стора и десятыми входными партициями, в то время как должна работать со вторыми входными партициями. 

Это обусловлено тем, что сравнение ID — это сравнение чисел, а сравнение тасок — сравнение строк. В таком случае становится понятным, почему при сортировке после “Partition 1” идет “Partition 10”, а не “Partition 2”.

В качестве решения мы сделали патч-фикс компаратора тасок с изменением сравнения строк на сравнение чисел для тасок:

@Override
public int compareTo(TaskName that) {
  return Integer.parseInt(taskName.substring("Partition ".length())) - 
    Integer.parseInt(that.taskName.substring("Partition ".length()));
}

Таким образом, мы второй раз разобрались с потерей стейта.

Конфигурирование Kafka

На этом сложности не закончились. 

После перевода очередной джобы на графике было замечено, что упали фидбэки со свойством «male».

Причем, в отличие от предыдущих раз, подобная ситуация воспроизводится не при каждом рестарте джоб, а лишь периодически, и только на трёх джобах. Более того, она случалась и до обновления.

Чтобы добраться до сути, мы решили изучить конфигурации топиков-сторов.

Здесь ничего интересного не оказалось:

  • cleanup.policycompact, delete;

  • retention.bytes — 1610612736 (со значительным запасом).

Но далее оказалось, что при выставлении параметров в сторе не были учтены дефолтные настройки кластера Kafka, в котором находились проблемные топики. Retention в данном кластере выставлен равным 94 часов.

retention.hours=94

Решением было либо отключить retention по времени для топиков проблемных джоб (retention.ms=-1), либо поставить cleanup.policy=compact, чтобы при достижении retention по времени данные не удалялись, а только компактились. 

Таким образом, потеря стейта была исправлена уже третий раз. Но на этом история с потерей стейта не закончилась.

Restore

При переводе очередной джобы я обнаружила, что моя джоба не просто потеряла часть стейта, а полностью его перетерла. 

Так, при переводе этой джобы на её графиках стали заметны аномалии, указывающие на то, что мы попросту перестали находить данные. Особенность ситуации в том, что мы работали с особой джобой с историческими данными. 

Здесь важно сделать отступление.

Так, перевод на новую версию каждой джобы сопровождается её рестартом. 

Механизм рестора реализован следующим образом:

  • джоба вычитывает все данные из топика стора к себе в память;

  • джоба работает с этим стором, кладёт в него данные и извлекает их;

  • все изменения этого стора дублируются в топик Kafka, чтобы при следующем рестарте снова их вычитать и восстановить стейт к себе в память. 

При анализе я заметила, что в памяти джобы, помимо первого стора, в который происходит вычитывание всех данных из топика Kafka, затем создается второй стор, который остается пустым, но при этом вся работа происходит именно с ним. То есть мы пытаемся достать из этого пустого стора данные, кладем в него данные, и все изменения над этим стором отправляются в топик Kafka, перетирая стейт с прошлого запуска джобы. 

В Apache Samza есть три места, где вызывается метод createStore:

  • NonTransactionalStateTaskRestoreManager;

  • TransactionalStateTaskRestoreManager;

  • ContainerStorageManager.

Причем первые два — взаимоисключающие. 

Здесь примечательно, что раньше мы не замечали подобную проблему с потерей всего стейта потому, что работали с джобами, ключом и значением которых являются обычные объекты:

key : Id
value: ImportantData

Но в данной ситуации мы работали с особой джобой с историческими данными, которая в качестве значения вместо объекта хранит массив объектов. 

key : Id
value: Array<ImportantData>

Таким образом, когда мы работали с другими джобами и пытались положить новое значение в стор, мы перетирали один объект на другой в значениях записей, а в текущей джобе мы стали перетирать массив с историческими данными, которых могло быть несколько десятков, на массив с одним значением. 

Решением оказалось добавление хэш-таблицы, которая хранит сторы и не позволяет Apache Samza создавать стор, если он уже существует в системе. 

object StoragesCollection {
  val STORAGES: mutable.HashMap [ (SystemStreamPartition, String), StorageEngine]
    = new mutable.HashMap[(SystemStreamPartition, String), StorageEngine]
}

Что в итоге

Обновление большого зрелого сервиса или инструмента — всегда «задача со звездочкой». Особенно, если у него «под капотом» довольно много legacy-кода. Но наш опыт показывает, что именно избавление от legacy — важная задача, реализация которой может стать точкой мощного роста инструмента и значительно упростить работу с ним в дальнейшем.

Вместе с тем, надо понимать, что в отдельных случаях поддержка привычного, но устаревшего стека — путь к медленной, но неизбежной стагнации. Поэтому, если инструмент или сервис начинает «изживать себя», лучше начать работу над сменой стека. Безусловно, это сопряжено с дополнительными издержками и трудностями, но зачастую польза, полученная от подобных инвестиций, всё оправдывает.

Tags:
Hubs:
+30
Comments0

Articles

Information

Website
vk.com
Registered
Founded
Employees
5,001–10,000 employees
Location
Россия
Representative
Миша Берггрен