Привет, Хабр! Я Дмитрий Жихарев, CPO Платформы искусственного интеллекта RAISA в Лаборатории ИИ РСХБ-Интех. В этой статье я и архитектор нашей платформы Александр Рындин @aryndin9999 расскажем о том, как мы построили взаимодействие Платформы ИИ и Озера данных для работы с витринами данных моделей машинного обучения с использованием Spark.

Нашей Лаборатории ИИ всего два года, и мы имеем весьма амбициозную цель — улучшить процессы и результаты РСХБ путём внедрения технологий ИИ. А в большом банке это означает выстроить тысячу движущихся частей так, чтобы процесс работал быстро, надёжно и эффективно. Интеграция с Платформой данных — это всего одна из тысячи движущихся частей в Платформе ИИ, но краеугольная для всего жизненного цикла моделей. Давайте для начала разберём, почему.
Для чего нужны витрины
Создавать, внедрять и поддерживать модели машинного обучения — одна из основных задач Лаборатории ИИ. А поскольку жизненный цикл моделей тесно связан с жизненным циклом данных, на которых они построены и работают, то важнейшим шагом для успешной разработки и применения моделей является создание эффективного тракта данных для моделирования и инференса.
Возьмём для примера классические банковские модели — модели оттока, взыскания, LTV. Все эти модели пакетные, и создаются по однотипному процессу.
Сначала дата-инженер строит витрину, на которой модель будет обучаться. При этом эту витрину нужно ещё и периодически перестраивать, когда мы экспериментируем, добавляя и удаляя какие-то поля. Далее дата-сайентист выбирает алгоритм модели, обучает и тестирует модель.
При успешном прохождении тестов модель уходит на продуктивизацию, т.е. включается в пакетный процесс, который выполняется по расписанию (регламенту) на боевом окружении и в котором модель обрабатывает данные, предсказывая целевую переменную. Витрина для применения модели может отличаться от витрины для обучения более узким набором полей. Эту витрину нужно регулярно перестраивать, чтобы у модели были новые данные для обработки. Свои результирующие выходные векторы модель сохраняет в ещё одну витрину.
Таким образом, получается, что витрин на каждую модель минимум три — витрина для обучения, витрина для применения и результирующая витрина. В нашем случае все витрины располагаются в Платформе данных, а конкретнее в одной из систем платформы — Озере данных. Витрины могут содержать десятки миллионов записей, поэтому для работы с ними из Платформы ИИ нам необходимо использовать производительный инструмент пакетной обработки. Теперь обратимся к сценариям, в которых этот инструмент нам нужен.
Сценарии взаимодействия с витринами

Таких сценариев два — ad-hoc анализ в Jupyter и пакетная обработка по регламенту в Airflow. В Озере данных были для этого инструменты и раньше — у нас в банке используется дистрибутив Arenadata Hadoop 3.3.6.1, в нём есть Hive и Impala. Мы использовали Impala, но она хорошо себя показывает только на коротких ad-hoc запросах. Использовали также и Hive, но он проигрывает по скорости, так как постоянно пишет промежуточные результаты на диск. Поэтому у нас давно чесались руки перейти на Spark в качестве движка обработки данных, тем более, что он в ADH есть из коробки! Причём относительно свежий — версия 3.5.2. Вот как мы можем использовать Spark в двух сценариях:
при ad-hoc анализе — DS запускает в Jupyter-ноутбуке запрос через Sparkmagic;
при пакетной обработке на этапе применения — в DAG'е Airflow Spark вызывается через SparkSubmitOperator.
Но несмотря на то, что у нас есть большой энтерпрайз-дистрибутив, в котором всё поставлено, надеяться, что это всё поставленное ещё и заработает из коробки, не надо. Во-первых, входящие в наши платформы программные компоненты надо правильно настроить и привести версии в консистентный вид. Во-вторых, надо сделать так, чтобы после одноразового процесса настройки эта конструкция не развалилась после ближайшего релиза. Рассказываем по порядку, каким путём мы прошли.
Первый этап — интеграция Платформы ИИ с Озером данных
Для запуска заданий Spark есть три режима — YARN Cluster, YARN Client и Spark Standalone.
YARN Cluster | YARN Client | Spark Standalone | |
Где запускается драйвер | Application Master | Client | Client |
Кто запрашивает ресурсы | Application Master | Application Master | Client |
Кто стартует процесс Executor | YARN NodeManager | YARN NodeManager | Spark Slave |
Сервис персистентности | YARN ResourceManager и NodeManagers | YARN ResourceManager и NodeManagers | Spark Master и Workers |
Поддержка интерактивной работы | Нет | Да | Да |
Режим Spark Standalone мы не используем, поскольку Платформа ИИ пока не располагает собственным кластером Spark. Таким образом, у нас остаётся два режима на выбор — YARN Cluster или YARN Client.
Начнём с более простого сценария — запуск из Airflow с помощью оператора SparkSubmitOperator. Интерактивное взаимодействие в данном случае нам не нужно, и мы можем сэкономить ресурсы, запуская драйвер на стороне Озера данных. Поэтому для работы из Airflow мы выбираем режим YARN Cluster.
А для работы из Jupyter-ноутбуков требуется интерактивное взаимодействие, поэтому в этом случае у нас есть только режим YARN Client. Здесь главная тонкость — выбор места запуска драйвера, потому что от него сильно зависит организация сетевого взаимодействия между системами.
В чём особенность? Дело в том, что для взаимодействия Spark и YARN в интерактивном режиме нужно открыть обоюдный доступ между сетями, где они расположены. Причём в обе стороны. И не просто один порт, а целый диапазон, так как для каждого взаимодействия поднимается отдельный экземпляр драйвера на стороне инициатора и отдельный набор Spark-экзекьюторов и Application Master на стороне кластера Yarn. Есть несколько решений, рассмотрим их:
Вариант 1: запустить драйвер рядом с Jupyter, внутри Платформы ИИ (она у нас развёрнута в кластере Kubernetes). Но в этом случае драйвер будет запускаться внутри пода Kubernetes, и чтобы к нему могли обращаться Spark-экзекьюторы, его нужно будет опубликовать как сервис типа Node port. И таких публикаций будет много, т.к. заданий Spark также много:

Вариант 2: расширить Jupyter специальным веб-сервером Jupyter Enterprise Gateway, разместив этот сервер вне кластера Kubernetes. Этот вариант требует, очевидно, инсталляции этого самого веб-сервера, его поддержки и наличия экспертизы. Также потребуются дополнительные виртуальные машины для его запуска.
Вариант 3: Использовать входящий в ADH компонент Apache Livy. Сам Livy навсегда завис в инкубаторе Apache и обновляется крайне редко, но обеспечивает основную функциональность — аутентификацию при подключении и имперсонацию при выполнении операций. Также он не требует дополнительных ресурсов для его поддержки:

Вариант 4: Использовать также входящий в ADH клиент-серверный компонент Spark 3 — Spark Connect. Он должен со временем заменить Livy, став встроенным в Spark (и возможно даже основным) инструментом удалённого подключения. Также Spark Connect призван обеспечить изоляцию пользовательских сессий по ресурсам и отвязать версии пользовательских библиотек от серверных. Но сейчас в нём нет встроенной аутентификации, а значит её придётся делать самим отдельно. И когда мы сделаем её, дальше нужно будет сделать ещё и имперсонацию — задание в Spark должно запускаться от учётки пользователя и с его правами.
Сейчас мы выбрали вариант №3 — Livy, но продолжаем следить за развитием Spark Connect. Теперь, когда мы выбрали два архитектурных решения, которые реализуют запуск заданий Spark как в интерактивном режиме из Jupyter, так и в пакетном режиме из Airflow, нужно было сделать так, чтобы эти решения работали стабильно.
Второй этап — синхронизация версий и настроек
Для стабильности нужно, чтобы версии и эндпоинты между платформами не расходились. В Платформе ИИ мы по умолчанию используем Python 3.11, но поддерживаем также и Python 3.8 как LTS. В дистрибутиве ADH по умолчанию стоит Python 3.10, но можно доставить любую нужную, что мы и сделали. Таким образом, для неинтерактивного режима версии Python между Платформой ИИ и Озером стали синхронизованы.
С интерактивным взаимодействием оказалось сложнее — в ADH 3.3.6.1 стоит версия Livy на базе релиза 0.7, который был разработан под Python 3.8, и поддержки Python 3.11 в ней ещё нет. Мы написали об этом вендору, он обещал нам обновить этот момент в следующих версиях. Пока же мы для интерактивного режима остались на Python 3.8. Конечно, это не блокирует работу, но фактор в любом случае сдерживающий — например, нельзя запустить несколько команд Spark в одной ячейке.
Для правильной работы интерактивного режима понадобилось также донастроить Kerberos-аутентификацию по всем участвующим компонентам, поскольку процесс контролируется со стороны ИБ, и без этого не работает. Кроме того, мы подключили в Livy имперсонацию, чтобы задание в Spark запускалось от учётки пользователя и с его правами, а не от «дефолтного» пользователя spark.
Ещё одна задача, которую нам потребовалось решить — синхронизация настроек подключения к Озеру. Адреса сервисов платформы ADH (Hive, Impala, Livy, HDFS, YARN) в нашем случае не прибиты гвоздями и могут меняться при установке патчсетов, донастройке и расширении системы. Если эти параметры начинают расходиться с теми, что указаны в настройках у Платформы ИИ, мы получаем рандомные ошибки подключения. Поэтому мы настроили на стороне Озера процесс, который автоматически собирает текущую конфигурацию и выгружает её в боевое хранилище настроек, откуда Платформа ИИ их забирает.
Теперь все интеграции были готовы, протестированы, все настройки выполнены и синхронизованы, и мы могли запускаться на бою.
Танец с транзакционными таблицами
Вот мы радостно запустили первые отряды Spark-экзекьюторов вытащить из Озера данные, и... больше половины вернулись к нам "с пустыми руками". Примерно 60% таблиц на боевом Озере данных при попытках чтения возвращали пустые датасеты. Никаких ошибок нет, просто датасет пустой:

Мы попытались прочесть файлы в Hadoop напрямую, и результат чтения приблизил нас к пониманию сути проблемы — таблицы оказались транзакционными. Вот такая структура у транзакционного файла:

Выяснили, что Озеро данных постоянно доливает в таблицы инкременты, и чтобы не блокировать чтение со стороны всех его потребителей, при проектировании Озера многие таблицы сделали транзакционными. Это исключает возможность чтения несогласованных состояний таблиц.
Как можно видеть из структуры ORC-файла выше — в нем есть 5 технических полей и поле с данными. При этом с точки зрения Hive описаны только поля с данными. В метаданных таблицы не отражено, что строка хранится в некотором JSON-формате. Поэтому Spark читает ее как обычную таблицу, ожидая получить только строки, соответствующие схеме метаданных Hive. Признак транзакционности таблицы Spark не учитывает, и получив строки в другом формате, в итоге отбрасывает всё их содержимое — с его точки зрения оно не соответствует схеме данных.
Мы могли бы попробовать прочитать эти данные напрямую и распарсить их из JSON-формата, но наверняка должно было существовать промышленное решение, которое бы сделало это за нас.
Поговорили с владельцами Озера и с вендором Arenadata — в новейших релизах ADH в платформу добавили поддержку формата метаданных Iceberg, в котором работа с транзакционными таблицам есть уже из коробки. Это целевое решение, и Озеро на него перейдёт. Но в моменте у нас есть транзакционные таблицы Hive и нет возможности подождать несколько месяцев, пока все процессы в Озере, связанные с транзакционными таблицами, начнут сохранять данные в формате Iceberg.
Таким образом, единственным вариантом было своими силами сделать API для работы с транзакционными таблицами на текущей версии ADH. Поискали, с чего можно начать, и нашли библиотеку Hive ACID Data Source for Apache Spark. Она реализует поддержку обработки транзакционных таблиц на базе API Spark Datasource V1, поэтому есть ограничения и с производительностью, и с функциональностью, но как начальное решение нас это устроило. Библиотека сделана была для Spark 2 и Hive 3. А у нас Spark 3 и Hive 4. У создателей библиотеки скорее всего есть более новые версии, но уже с закрытым исходным кодом. Поэтому мы сделали форк от последней доступной по лицензии Apache 2.0 версии, смигрировали его исходный код на новые версии Spark и Hive, а сборку адаптировали к окружению ADH. И теперь новая версия поддерживает то, что нам нужно, и позволяет нам работать с такими табличками:

Что дальше?
В будущем мы планируем использовать помимо встроенного в ADH сервера Spark также поднять у себя на платформе собственный Spark-кластер, а также History Server. Попутно мы сделаем интеграцию журналов таким образом, чтобы разработчики и инженеры, создающие и эксплуатирующие процессы на Платформе ИИ, имели одну точку для просмотра и анализа логов Airflow и Spark. Это упростит нам задачу поддержки сетевой связности, а также позволит контролировать версию Spark на своей стороне. С точки зрения интерактивного режима многообещающим выглядит Spark Connect, но для него предстоит решить задачу обеспечения информационной безопасности. А сейчас у дата-сайентистов и дата-инженеров появился стандартный и производительный инструмент для работы с витринами.
Итак, в этот статье мы рассмотрели сценарий технологической интеграции для работы с витринами под классические модели машинного обучения. И это одна из тысячи движущихся частей. Что ждёт платформы ИИ и платформы данных в ближайшие 3-5 лет — можно подглядеть в ИИ-стратегиях крупных технологических компаний. Не забегая слишком далеко в general AI, подумаем хотя бы про agentic AI. Вместе с живыми дата-инженерами средствами обработки данных будет оперировать АИ-агент. И он будет работать не один, а в сотрудничестве с другими агентами через handoff'ы, и вся Платформа данных таким образом теснее сблизится с Платформой ИИ. Вместо хранилища файлов это решение уже станет генератором ответов на вопросы. А это означает, что над теми слоями взаимодействия, которые есть в наших платформах сегодня, нарастут новые, ещё более сложные, которые мы сейчас только осмысляем — ищем в них ценность, понимаем, как их строить и как ими управлять. И на следующем этапе эволюции платформ данных движущихся частей будет уже тысяча, а миллион. Задача нашей Лаборатории ИИ — собрать этот миллион частей так, чтобы он работал быстро, надёжно и эффективно.
