Миграция с «железа» в облако в большинстве случаев уже не кажется чем-то сложным или удивительным — тенденция на развертывание решений в облаке общая и устоявшаяся. Но если с переносом в облачную среду небольших ИТ-компонентов все просто, то в случае с глобальными системами на сотни петабайт данных все несколько иначе — такие кейсы встречаются редко. 

Меня зовут Михаил Марюфич. Я руководитель Data Platform в ОК, отвечаю за инфраструктуру для Big Data и машинного обучения. В этой статье я расскажу о нашем опыте переноса Hadoop с Bare Metal в облако: с чего стартовали, какие варианты рассматривали, как выстроили миграцию и с чем сталкивались в процессе.

Исходная инсталляция

Hadoop — один из основных компонентов ИТ-инфраструктуры ОК, который мы активно задействуем для работы с данными от разных источников.

Наш Hadoop — довольно крупная система:

  • размер кластера в дисках превышает 250 петабайт;

  • объем оперативной памяти — 200 терабайт, которые распределены по 7 дата-центрам;

  • выполняет более 10 тысяч задач ежедневно;

  • имеет федерацию из трех основных кластеров.

В Hadoop две ключевые подсистемы:

  • YARN;

  • HDFS.

YARN — подсистема выполнения расчётов. Она интенсивно использует CPU и RAM на машине, на которой развернута. Состоит из: 

  • ресурсного менеджера (resourcemanager), который выделяет ресурсы;

  • nodemanager, на которых запускаются приложения, потребляющие ресурсы.

HDFS — подсистема хранения данных. Состоит из:

  • namenode, где хранится информация о блоках;

  • datanode, где хранятся блоки данных.

В «железе» это выглядит следующим образом:

  • есть мастер-хосты, где развернуты namenode и resourcemanager;

  • есть worker-host, где развернуты nodemanager и datanode. 

Такая реализация нужна, чтобы эффективно потреблять ресурсы всей машины: например, datanode, как правило, больше задействуют диски, а nodemanager — наоборот.

Чтобы обеспечить работу Hadoop без даунтаймов, мы используем классическую схему c синхронизацией namenode через кворумные журналы (quorum journal):

  • есть активная namenode, которая пишет информацию; 

  • есть standby namenodes, которые зачитывают логи изменений из журналов, синхронизируют изменения и могут стать активными в любой момент.

К тому же наш Hadoop использует преимущества зон доступности — фактически у каждого блока реплики находятся в трех удаленных друг от друга дата-центрах. Это позволяет гарантировать отказоустойчивость решения даже в случае временного или полного выхода одного ЦОДа из строя. 

Проблемы исходной реализации и пути их решения

У нашего Hadoop на железе было несколько «слабых мест», которые хотелось устранить или улучшить.

  • Эксплуатация. Большинство задач, связанных с администрированием решения и обслуживанием инфраструктуры нам приходилось частично или полностью выполнять вручную. Замена/добавление узлов, обновление конфигураций на кластере, развертывание новых кластеров — многие моменты требовали вовлечения специалистов.

  • Мутабельные состояния компонентов. У нас хорошо развиты практики работы с физическим оборудованием, что позволяет минимизировать появление мутабельных состояний. Вместе с тем, полностью исключить ситуации, когда на части узлов оказываются неконсистентные с другими узлами кластера конфигурации или версии ПО, на практике не получилось, что чревато инцидентами.

  • Объединение Compute и Storage. Мы хотели разделить Compute и Storage, чтобы повысить эффективность потребления ресурсов. Потребление мощностей неравномерно в течение дня, на ночное время приходится пик вычислительных нагрузок — по нашему внутреннему SLA все расчёты должны быть закончены к утру. В итоге нам нужно выделять мощности на проект с большим запасом, который будет простаивать большую часть времени. В схеме с разделенным compute и storage мы получаем возможность расширить кластер в моменты высоких нагрузок, а когда ресурсы не востребованы — передавать их другим приложениям.

    График потребления CPU.
    График потребления CPU.

То есть нам нужна была реализация, которая позволила бы:

  • обеспечить независимое горизонтальное и вертикальное масштабирование compute- и storage-слоёв;

  • повысить удобство эксплуатации (ввод-вывод узлов, создание новых кластеров и других операций);

  • поддерживать иммутабельное состояние компонентов.

Добиться этого можно было тремя способами.

  • Переход на Managed-инфраструктуру. Подход подразумевает использование инфраструктуры и сервисов стороннего вендора, который берет на себя задачи администрирования и обслуживания. Вариант позволяет по умолчанию получить разделение compute и storage, а также уйти от эксплуатационных трудностей. Но нам он не подходил — мы не можем хранить данные пользователей в публичных облаках, да и стоимость развертывания и поддержки такого решения под наш объем данных в стороннем облаке неоправданно велика. 

  • Смена стека технологий. Потенциально мы могли применить в качестве storage что-то compatible (ceph, apache ozone, свое решение), а в качестве compute — spark over k8s или trino over k8s. Это наиболее популярные варианты решений. Но в нашем случае такой подход не оправдан: во-первых, у нас уже есть hdfs, который нас целиком устраивает, во-вторых, у нас есть не только Spark, но и другие legacy-инструменты, а Kubernetes нет вовсе. К тому же менять весь стек ради получения нескольких преимуществ — не самый очевидный путь.

  • Развертывание решения на внутренней инфраструктуре. В VK есть собственный контейнерный оркестратор One-Cloud. Ближайший его аналог из open-source — Kubernetes. По сути, One-Cloud — внутреннее облако, которым пользуются многие бизнес-юниты холдинга (OK, ВКонтакте, Дзен и другие). То есть мы можем развернуть Hadoop в One-Cloud и разделить compute/storage (hdfs/yarn). Причём такой путь уже проторен — подобные проекты уже реализовывали Uber и Yandex. Для нас этот вариант стал приоритетным — он эволюционный и согласуется со стратегией развития компании. 

От идеи к первым проработкам

One-Cloud — это большой контейнерный оркестратор, который управляет распределением всех ресурсов. Фактически он решает, где будет размещен тот или иной инстанс приложения. Например, nodemanager и datanode могут быть автоматически размещены на разных хостах. 

Главное условие для системы — максимально использовать доступные ресурсы. Для нас основное преимущество облака — решение проблемы менеджмента ресурсов и возможность гибко масштабировать компоненты без необходимости их ручного переноса между хостами.

Чтобы развернуть кластер в облаке, достаточно небольших манифестов с указанием Docker-образа, количества ресурсов и некоторых дополнительных параметров. 

На первый взгляд, ничего сложного. Но для работы Hadoop нам нужно было предварительно решить несколько проблем.

  • Конфигурация. Во внешнем мире конфигурирование решают разными способами. В ОК для этого применяется собственная система — Portal Management System (PMS). Инструмент проинтегрирован со всем продакшеном. Он позволяет писать конфигурации к отдельному приложению и подписываться на изменения этих конфигураций. Используя PMS, можно доставлять файлы с конфигурациями в контейнеры Hadoop. В итоге задачи менеджмента конфигураций мы закрываем с помощью Portal Management System.

  • Kerberos. В Hadoop есть Kerberos — сетевой протокол аутентификации с использованием центра доверия. Он применяется для аутентификации клиентов кластера и узлов внутри кластера. Фактически это единственный способ настроить security в Hadoop. В реализации на Bare Metal мы контролировали эти процессы в полуручном режиме, но для облака сразу искали способ полной автоматизации. Для этого мы написали Kerberos Registration manager — компонент, который сканирует топологию новых кластеров, находит узлы ��ез учётки и создаёт их в Kerberos, а также создаёт keytabs и заливает их в Vault.

В итого пайплайн создания компонентов кластера выглядит так:

  • Выполняем сабмит манифеста в облако.

  • Облако определяет сервер, подходящий для запуска. Стартует контейнер.

  • В контейнер доставляются конфигурации, сертификат, kerberos keytab.

  • Стартует Hadoop. Узел Hadoop входит в кластер.

Так мы можем создать кластер, в котором будут разделены Compute и Storage, HDFS и YARN.

От теории к реальным петабайтам

После выработки алгоритма создания кластера в облаке, стал вопрос переноса сотен петабайтов данных с физического оборудования в облачную среду. 

Есть два основных подхода к миграции:

  • постепенный перевод приложений на новый кластер;

  • постепенное добавление облачных узлов.

Постепенный перевод приложений на новый кластер

Метод подразумевает развертывание облачного кластера и последовательную миграцию в него целых приложений или их крупных компонентов. Также можно реализовать несколько усложненную, но более экономичную схему, при которой облачный кластер постепенно расширяется, соразмерно «схлопыванию» физической инфраструктуры:

  • постепенно переводим запись;

  • копируем историю;

  • наращиваем кластер, освобождаем старые ресурсы.

Способ надёжен, но у него есть несколько недостатков:

  • процесс требует ручного управления и контроля;

  • копирование данных может затянуться на долгий срок;

  • на момент миграции надо перестраивать бизнес-логику обращения с данными, а пользователи вынуждены работать одновременно с несколькими кластерами: читать в одних, писать в другие.

В результате этот вариант нам не подошёл.

Постепенное добавление облачных узлов

Метод подразумевает, что параллельно со старым кластером в облаке создаётся новая, пустая datanode, которая сразу включается в кластер.

Кластер начинает писать данные на новую ноду. Параллельно начинается выво�� одной из старых нод через процедуру декомиссии (блоки постепенно реплицируются на другие узлы).

После того, как старая нода опустеет, её можно вывести из кластера.

По такому алгоритму можно бесшовно перенести в облако все данные без влияния на конечного пользователя.

Последним этапом в облако переносится namenode. 

После этого миграцию Hadoop в облако можно считать завершённой. Причём такой подход исключает даунтайм и потерю данных.

Мы начали миграцию по этому пути, но…

Миграция и первые несоответствия ожиданиям

После запуска процедуры переноса данных на ноду в облаке мы получили скорость репликации на уровне 2 тысяч блоков в минуту. Это примерно 250 Гб. С учётом того, что у нас даже в не самом большом кластере было около 150 млн блоков, на миграцию с такой скоростью нужно было около 52 дней, что непозволительно долго.

Мы начали разбираться, изучили Namenode и увидели, что ядра не загружены, но в фоне крутится RedundancyMonitor, метод — computeDatanodeWork().

Он отвечает за назначение заданий на репликацию нод. При этом алгоритм выбора датанод для репликации примерно следующий:

  • Выбираются случайные датаноды в случайных дата-центрах.

  • Проверяется соответствие политике размещения блоков — важно, чтобы были выбраны три датаноды в трех разных ЦОДах.

  • Если условие не выполняется, цикл повторяется.

Наш анализ показал, что 90% времени уходит на выбор случайных датанод и случайных ЦОДов. Примечательно, что обычно это быстрые операции. Чтобы найти первопричину проблемы, мы начали углубляться в реализацию на уровне кода. 

По дефолту порядок выбора случайной ноды такой:

  • Проверяются все датаноды в ЦОДе.

  • Проверяется наличие дисков нужного типа.

  • Если диски есть, нода добавляется в список.

  • Из списка выбирается случайная датанода.

В такой реализации основная загвоздка была с проверкой на наличие дисков соответствующего типа — вероятно при разработке не учитывали, что в списке может быть не 10 элементов, а, например, 150, что кратно увеличивает времязатраты. 

Но в нашей инсталляции только HDD-диски, то есть проверять их соответствие не нужно. Это позволило немного переписать код и существенно упростить схему — теперь выбор всегда рандомный без дополнительных согласований.

Такие манипуляции позволили нам повысить скорость репликаций до 15 тысяч блоков в минуту (2 ТБ против 250 ГБ до внесения изменений). В итоге время на миграцию сократилось с 52 до 7 дней. 

После этого оставался вопрос с миграцией мастеров.

Миграция мастеров

Алгоритм миграции мастеров довольно прозрачный. 

  • Например, есть кластер из двух namenode — одна из них активная, а другая standby. Есть три журнала, в которые они пишут с кворумом.

  • В облако добавляется новая namenode и пара журналов (для сохранения нечетности).

  • После проверки корректности, namenode в облаке делают активной.

  • Одна из старых namenode выводится из строя.

  • По такому же принципу в облако переносится вторая namenode standby. После этого оставшееся «железо» выводится из кластера. Миграция мастеров завершена.

Но есть пара нюансов, с которыми пришлось разбираться.

  • Невозможность динамического добавления компонентов. В Hadoop нельзя динамически добавлять новые namenode и журналы. Такую функциональность обещают в ближайшее время, но пока её нет. В итоге нам пришлось вручную перезагружать ноды в нескольких кластерах. Это не критично, но некие неудобства доставило.

  • Сложность бесшовного переноса данных. С namenode работает много клиентов и разрыв связи с любым из них может привести к сбою на уровне процессов — например, к невозможности читать и писать, что равно локальному даунтайму. При этом нам было важно, чтобы на пользователях, работающих с Hadoop, миграция не отразилась.

    Трудность в том, что топология кластера хранится на клиенте в файле hdfs-site.xml.

    Если при создании новой облачной namenode не добавить информацию о ней на все клиенты и не выполнить соответствующую переконфигурацию клиента, система не сможет работать с кластером, запросы будут вести на standby namenode. Чтобы исключить такие проблемы, конфигурации топологии кластера надо доставить на каждого клиента и обновлять при их изменении. 

В нашем случае эту проблему решает, то, что конфигурации в большинстве приложений применяются динамически и хранятся в Portal Management System, который в данном случае выступает единым источником истины. Для Hadoop мы предварительно централизовали все конфиги, поэтому нам было достаточно указать информацию о новой ноде в одном месте и она автоматически подтягивается во всех клиентах кластера.

После этого миграция была завершена. Профит!

Жизнь после миграции

Довольно быстро после начала работы Hadoop в облаке мы столкнулись с двумя обстоятельствами. 

Во-первых, общая производительность расчётов упала на 10-15%. Потеря была отчасти ожидаемой и предсказуемой, поскольку мы отказались от локальной работы с данными и вызовов по сети стало больше. Для нас такая динамика не критична — потеря производительности с лихвой окупается преимуществами, полученными от работы в облаке.

Во-вторых, мы начали отмечать общую деградацию производительности расчётов. 

Так, мы ежедневно проводим расчёты DWH. «Красненькое» — наша внутренняя прокси-метрика, которая позволяет понять, что когда расчёты завершены успешно.

При работе с Hadoop на Bare Metal для нас было нормой, что расчёты заканчиваются утром — максимум до 11:00. После миграции в облако это условие соблюдалось. Но спустя некоторое время началась резкая деградация — окончание расчётов сместилось сначала на 12:00, потом на 16:00, а после и вовсе на 18:00. 

Причём, согласно статистике, расчёты занимают всю доступную память. А часть расчётов даже не успевает считаться день в день.

Дальнейший анализ показал, что проблемы локализуются на IO — с некоторых datanode наши nodemanager считывают в 10-100 раз медленнее, чем с других.

Причина оказалась в IOCost — это технология, которая позволяет изолировать приложения между собой по диску и предоставлять им гарантии чтения/записи. Например, чтобы Kafka могла комфортно «соседствовать» с любым другим приложением, использующим диск.

В нашем случае оказалось, что есть прямая зависимость от включения IOCost на новые ЦОДы. Технологию постепенно раскатывали на новые ЦОДы, чтобы обеспечить лучшие гарантии, но это только усугубляло деградацию. 

В моменте мы решили откатить изменения (отказаться от IOCost) и увидели, что негативный эффект полностью нивелирован — расчёты снова начали заканчиваться до обеда и даже раньше (к 6-7 утра). Hadoop’у стало комфортно. 

В дальнейшем, чтобы обеспечить гарантии для «приложений-соседей», мы перепрофилировали решение и смогли включить IOCost на весь production без аффекта на Hadoop и другие приложения.

Так мы закрыли последний гештальт и окончательно перешли на Hadoop в облаке.

Наши результаты и выученные уроки

Вся миграция — от идеи до финала — заняла у нас 1,5 года и была проведена силами всего трёх специалистов. 

Переход в облако дал нам ряд преимуществ. Мы:

  • получили возможность гибко масштабировать инфраструктуру и сэкономили на закупках N миллионов за счет возможности покупать только диски;

  • делегировали задачи администрирования и обслуживания железа команде One- Cloud;

  • полностью автоматизировали рутинные операции с Hadoop (расширение кластера, замена сломанных дисков/машин и другие);

  • объединили вычислительные кластеры, научились ночью брать больше ресурсов и ускорили расчёты;

  • сделали Hadoop as a service, который теперь предоставляем другим бизнес-юнитам холдинга.

На основе нашего опыта можно сформулировать несколько рекомендаций:

  • Не бойтесь читать и править исходный код системы, если это нужно для задачи.

  • Покрывайте большим количеством метрик все компоненты Hadoop и вовремя реагируйте на подозрительные сигналы.

  • Непонимание любого эффекта — потенциальная проблема, которая может стать реальной в самый неудобный момент. С любыми эффектами лучше разбираться сразу.