Как стать автором
Обновить

Секреты Spark в Arenadata Hadoop: как мы ускорили построение витрин для задач ML

Уровень сложностиСредний
Время на прочтение9 мин
Количество просмотров849

Привет, Хабр! Я Дмитрий Жихарев, CPO Платформы искусственного интеллекта RAISA в Лаборатории ИИ РСХБ-Интех. В этой статье я и архитектор нашей платформы Александр Рындин @aryndin9999 расскажем о том, как мы построили взаимодействие Платформы ИИ и Озера данных для работы с витринами данных моделей машинного обучения с использованием Spark.

Нашей Лаборатории ИИ всего два года, и мы имеем весьма амбициозную цель — улучшить процессы и результаты РСХБ путём внедрения технологий ИИ. А в большом банке это означает выстроить тысячу движущихся частей так, чтобы процесс работал быстро, надёжно и эффективно. Интеграция с Платформой данных — это всего одна из тысячи движущихся частей в Платформе ИИ, но краеугольная для всего жизненного цикла моделей. Давайте для начала разберём, почему.

Для чего нужны витрины

Создавать, внедрять и поддерживать модели машинного обучения — одна из основных задач Лаборатории ИИ. А поскольку жизненный цикл моделей тесно связан с жизненным циклом данных, на которых они построены и работают, то важнейшим шагом для успешной разработки и применения моделей является создание эффективного тракта данных для моделирования и инференса.

Возьмём для примера классические банковские модели — модели оттока, взыскания, LTV. Все эти модели пакетные, и создаются по однотипному процессу.

Сначала дата-инженер строит витрину, на которой модель будет обучаться. При этом эту витрину нужно ещё и периодически перестраивать, когда мы экспериментируем, добавляя и удаляя какие-то поля. Далее дата-сайентист выбирает алгоритм модели, обучает и тестирует модель.

При успешном прохождении тестов модель уходит на продуктивизацию, т.е. включается в пакетный процесс, который выполняется по расписанию (регламенту) на боевом окружении и в котором модель обрабатывает данные, предсказывая целевую переменную. Витрина для применения модели может отличаться от витрины для обучения более узким набором полей. Эту витрину нужно регулярно перестраивать, чтобы у модели были новые данные для обработки. Свои результирующие выходные векторы модель сохраняет в ещё одну витрину.

Таким образом, получается, что витрин на каждую модель минимум три — витрина для обучения, витрина для применения и результирующая витрина. В нашем случае все витрины располагаются в Платформе данных, а конкретнее в одной из систем платформы — Озере данных. Витрины могут содержать десятки миллионов записей, поэтому для работы с ними из Платформы ИИ нам необходимо использовать производительный инструмент пакетной обработки. Теперь обратимся к сценариям, в которых этот инструмент нам нужен.

Сценарии взаимодействия с витринами

Варианты взаимодействия Платформы ИИ с Озером данных
Варианты взаимодействия Платформы ИИ с Озером данных

Таких сценариев два — ad-hoc анализ в Jupyter и пакетная обработка по регламенту в Airflow. В Озере данных были для этого инструменты и раньше — у нас в банке используется дистрибутив Arenadata Hadoop 3.3.6.1, в нём есть Hive и Impala. Мы использовали Impala, но она хорошо себя показывает только на коротких ad-hoc запросах. Использовали также и Hive, но он проигрывает по скорости, так как постоянно пишет промежуточные результаты на диск. Поэтому у нас давно чесались руки перейти на Spark в качестве движка обработки данных, тем более, что он в ADH есть из коробки! Причём относительно свежий — версия 3.5.2. Вот как мы можем использовать Spark в двух сценариях:

  • при ad-hoc анализе — DS запускает в Jupyter-ноутбуке запрос через Sparkmagic;

  • при пакетной обработке на этапе применения — в DAG'е Airflow Spark вызывается через SparkSubmitOperator.

Но несмотря на то, что у нас есть большой энтерпрайз-дистрибутив, в котором всё поставлено, надеяться, что это всё поставленное ещё и заработает из коробки, не надо. Во-первых, входящие в наши платформы программные компоненты надо правильно настроить и привести версии в консистентный вид. Во-вторых, надо сделать так, чтобы после одноразового процесса настройки эта конструкция не развалилась после ближайшего релиза. Рассказываем по порядку, каким путём мы прошли.

Первый этап — интеграция Платформы ИИ с Озером данных

Для запуска заданий Spark есть три режима — YARN Cluster, YARN Client и Spark Standalone.

YARN Cluster

YARN Client

Spark Standalone

Где запускается драйвер

Application Master

Client

Client

Кто запрашивает ресурсы

Application Master

Application Master

Client

Кто стартует процесс Executor

YARN NodeManager

YARN NodeManager

Spark Slave

Сервис персистентности

YARN ResourceManager и NodeManagers

YARN ResourceManager и NodeManagers

Spark Master и Workers

Поддержка интерактивной работы

Нет

Да

Да

Режим Spark Standalone мы не используем, поскольку Платформа ИИ пока не располагает собственным кластером Spark. Таким образом, у нас остаётся два режима на выбор — YARN Cluster или YARN Client. 

Начнём с более простого сценария — запуск из Airflow с помощью оператора SparkSubmitOperator. Интерактивное взаимодействие в данном случае нам не нужно, и мы можем сэкономить ресурсы, запуская драйвер на стороне Озера данных. Поэтому для работы из Airflow мы выбираем режим YARN Cluster.

А для работы из Jupyter-ноутбуков требуется интерактивное взаимодействие, поэтому в этом случае у нас есть только режим YARN Client. Здесь главная тонкость — выбор места запуска драйвера, потому что от него сильно зависит организация сетевого взаимодействия между системами.

В чём особенность? Дело в том, что для взаимодействия Spark и YARN в интерактивном режиме нужно открыть обоюдный доступ между сетями, где они расположены. Причём в обе стороны. И не просто один порт, а целый диапазон, так как для каждого взаимодействия поднимается отдельный экземпляр драйвера на стороне инициатора и отдельный набор Spark-экзекьюторов и Application Master на стороне кластера Yarn. Есть несколько решений, рассмотрим их:

Вариант 1: запустить драйвер рядом с Jupyter, внутри Платформы ИИ (она у нас развёрнута в кластере Kubernetes). Но в этом случае драйвер будет запускаться внутри пода Kubernetes, и чтобы к нему могли обращаться Spark-экзекьюторы, его нужно будет опубликовать как сервис типа Node port. И таких публикаций будет много, т.к. заданий Spark также много:

Сетевое взаимодействие между драйвером Spark и YARN в интерактивном режиме
Сетевое взаимодействие между драйвером Spark и YARN в интерактивном режиме

Вариант 2: расширить Jupyter специальным веб-сервером Jupyter Enterprise Gateway, разместив этот сервер вне кластера Kubernetes. Этот вариант требует, очевидно, инсталляции этого самого веб-сервера, его поддержки и наличия экспертизы. Также потребуются дополнительные виртуальные машины для его запуска. 

Вариант 3: Использовать входящий в ADH компонент Apache Livy. Сам Livy навсегда завис в инкубаторе Apache и обновляется крайне редко, но обеспечивает основную функциональность — аутентификацию при подключении и имперсонацию при выполнении операций. Также он не требует дополнительных ресурсов для его поддержки:

Сетевое взаимодействие через Livy
Сетевое взаимодействие через Livy

Вариант 4: Использовать также входящий в ADH клиент-серверный компонент Spark 3 — Spark Connect. Он должен со временем заменить Livy, став встроенным в Spark (и возможно даже основным) инструментом удалённого подключения. Также Spark Connect призван обеспечить изоляцию пользовательских сессий по ресурсам и отвязать версии пользовательских библиотек от серверных. Но сейчас в нём нет встроенной аутентификации, а значит её придётся делать самим отдельно. И когда мы сделаем её, дальше нужно будет сделать ещё и имперсонацию — задание в Spark должно запускаться от учётки пользователя и с его правами.

Сейчас мы выбрали вариант №3 — Livy, но продолжаем следить за развитием Spark Connect. Теперь, когда мы выбрали два архитектурных решения, которые реализуют запуск заданий Spark как в интерактивном режиме из Jupyter, так и в пакетном режиме из Airflow, нужно было сделать так, чтобы эти решения работали стабильно.

Второй этап — синхронизация версий и настроек

Для стабильности нужно, чтобы версии и эндпоинты между платформами не расходились. В Платформе ИИ мы по умолчанию используем Python 3.11, но поддерживаем также и Python 3.8 как LTS. В дистрибутиве ADH по умолчанию стоит Python 3.10, но можно доставить любую нужную, что мы и сделали. Таким образом, для неинтерактивного режима версии Python между Платформой ИИ и Озером стали синхронизованы.

С интерактивным взаимодействием оказалось сложнее — в ADH 3.3.6.1 стоит версия Livy на базе релиза 0.7, который был разработан под Python 3.8, и поддержки Python 3.11 в ней ещё нет. Мы написали об этом вендору, он обещал нам обновить этот момент в следующих версиях. Пока же мы для интерактивного режима остались на Python 3.8. Конечно, это не блокирует работу, но фактор в любом случае сдерживающий — например, нельзя запустить несколько команд Spark в одной ячейке. 

Для правильной работы интерактивного режима понадобилось также донастроить Kerberos-аутентификацию по всем участвующим компонентам, поскольку процесс контролируется со стороны ИБ, и без этого не работает. Кроме того, мы подключили в Livy имперсонацию, чтобы задание в Spark запускалось от учётки пользователя и с его правами, а не от «дефолтного» пользователя spark.

Ещё одна задача, которую нам потребовалось решить — синхронизация настроек подключения к Озеру. Адреса сервисов платформы ADH (Hive, Impala, Livy, HDFS, YARN) в нашем случае не прибиты гвоздями и могут меняться при установке патчсетов, донастройке и расширении системы. Если эти параметры начинают расходиться с теми, что указаны в настройках у Платформы ИИ, мы получаем рандомные ошибки подключения. Поэтому мы настроили на стороне Озера процесс, который автоматически собирает текущую конфигурацию и выгружает её в боевое хранилище настроек, откуда Платформа ИИ их забирает.

Теперь все интеграции были готовы, протестированы, все настройки выполнены и синхронизованы, и мы могли запускаться на бою.

Танец с транзакционными таблицами

Вот мы радостно запустили первые отряды Spark-экзекьюторов вытащить из Озера данные, и... больше половины вернулись к нам "с пустыми руками". Примерно 60% таблиц на боевом Озере данных при попытках чтения возвращали пустые датасеты. Никаких ошибок нет, просто датасет пустой:

Мы попытались прочесть файлы в Hadoop напрямую, и результат чтения приблизил нас к пониманию сути проблемы — таблицы оказались транзакционными. Вот такая структура у транзакционного файла:

Выяснили, что Озеро данных постоянно доливает в таблицы инкременты, и чтобы не блокировать чтение со стороны всех его потребителей, при проектировании Озера многие таблицы сделали транзакционными. Это исключает возможность чтения несогласованных состояний таблиц.

Как можно видеть из структуры ORC-файла выше — в нем есть 5 технических полей и поле с данными. При этом с точки зрения Hive описаны только поля с данными. В метаданных таблицы не отражено, что строка хранится в некотором JSON-формате. Поэтому Spark читает ее как обычную таблицу, ожидая получить только строки, соответствующие схеме метаданных Hive. Признак транзакционности таблицы Spark не учитывает, и получив строки в другом формате, в итоге отбрасывает всё их содержимое — с его точки зрения оно не соответствует схеме данных.

Мы могли бы попробовать прочитать эти данные напрямую и распарсить их из JSON-формата, но наверняка должно было существовать промышленное решение, которое бы сделало это за нас.

Поговорили с владельцами Озера и с вендором Arenadata — в новейших релизах ADH в платформу добавили поддержку формата метаданных Iceberg, в котором работа с транзакционными таблицам есть уже из коробки. Это целевое решение, и Озеро на него перейдёт. Но в моменте у нас есть транзакционные таблицы Hive и нет возможности подождать несколько месяцев, пока все процессы в Озере, связанные с транзакционными таблицами, начнут сохранять данные в формате Iceberg.

Таким образом, единственным вариантом было своими силами сделать API для работы с транзакционными таблицами на текущей версии ADH. Поискали, с чего можно начать, и нашли библиотеку Hive ACID Data Source for Apache Spark. Она реализует поддержку обработки транзакционных таблиц на базе API Spark Datasource V1, поэтому есть ограничения и с производительностью, и с функциональностью, но как начальное решение нас это устроило. Библиотека сделана была для Spark 2 и Hive 3. А у нас Spark 3 и Hive 4. У создателей библиотеки скорее всего есть более новые версии, но уже с закрытым исходным кодом. Поэтому мы сделали форк от последней доступной по лицензии Apache 2.0 версии, смигрировали его исходный код на новые версии Spark и Hive, а сборку адаптировали к окружению ADH. И теперь новая версия поддерживает то, что нам нужно, и позволяет нам работать с такими табличками:

Что дальше?

В будущем мы планируем использовать помимо встроенного в ADH сервера Spark также поднять у себя на платформе собственный Spark-кластер, а также History Server. Попутно мы сделаем интеграцию журналов таким образом, чтобы разработчики и инженеры, создающие и эксплуатирующие процессы на Платформе ИИ, имели одну точку для просмотра и анализа логов Airflow и Spark. Это упростит нам задачу поддержки сетевой связности, а также позволит контролировать версию Spark на своей стороне. С точки зрения интерактивного режима многообещающим выглядит Spark Connect, но для него предстоит решить задачу обеспечения информационной безопасности. А сейчас у дата-сайентистов и дата-инженеров появился стандартный и производительный инструмент для работы с витринами.

Итак, в этот статье мы рассмотрели сценарий технологической интеграции для работы с витринами под классические модели машинного обучения. И это одна из тысячи движущихся частей. Что ждёт платформы ИИ и платформы данных в ближайшие 3-5 лет — можно подглядеть в ИИ-стратегиях крупных технологических компаний. Не забегая слишком далеко в general AI, подумаем хотя бы про agentic AI. Вместе с живыми дата-инженерами средствами обработки данных будет оперировать АИ-агент. И он будет работать не один, а в сотрудничестве с другими агентами через handoff'ы, и вся Платформа данных таким образом теснее сблизится с Платформой ИИ. Вместо хранилища файлов это решение уже станет генератором ответов на вопросы. А это означает, что над теми слоями взаимодействия, которые есть в наших платформах сегодня, нарастут новые, ещё более сложные, которые мы сейчас только осмысляем — ищем в них ценность, понимаем, как их строить и как ими управлять. И на следующем этапе эволюции платформ данных движущихся частей будет уже тысяча, а миллион. Задача нашей Лаборатории ИИ — собрать этот миллион частей так, чтобы он работал быстро, надёжно и эффективно.

Теги:
Хабы:
+11
Комментарии0

Публикации

Информация

Сайт
www.rshbdigital.ru
Дата регистрации
Дата основания
Численность
свыше 10 000 человек
Местоположение
Россия
Представитель
Юлия Князева