
Всем привет! Меня зовут Александр Токарев, я работаю в Yandex Infrastructure и занимаюсь интеграцией Apache Spark (далее просто Spark) с YTsaurus. В этой статье я расскажу про то, как мы сначала форкнули и пропатчили Spark, а потом вернулись к использованию оригинальной версии и поддержали совместимость с множеством других версий.
YTsaurus — это разработанная Яндексом система для хранения и обработки больших объёмов данных. Она активно развивается с 2010 года, а в 2023 году была выложена в опенсорс. Подробнее почитать про историю создания и выход YTsaurus в опенсорс можно в статье Максима Бабенко.
В какой‑то момент мы решили подружить YTsaurus и Spark. Так и родился проект SPYT powered by Apache Spark (далее просто SPYT), который активно развивается с 2019 года. Основательница проекта Саша Белоусова уже рассказывала, как были реализованы SPI Spark для работы со структурами данных YTsaurus — это набор классов, интерфейсов, методов, которые мы расширяем или реализуем. Во многом эта статья и моё выступление на HighLoad++ 2024 являются продолжением её доклада.
Зачем мы форкнули Spark
В опенсорс‑мире достаточно широко распространена практика создания форков популярных библиотек. Как правило, форки создают, когда хотят быстро что‑то доработать. Не обошло это стороной и нас: в начале существования проекта SPYT мы также сделали свой форк Apache Spark чтобы внести в него несколько доработок. Перечислю лишь некоторые из них:
Кластерный режим для Python‑скриптов в Spark Standalone. В основном, в Яндексе (да и не только в Яндексе) аналитики данных любят работать, используя Python в качестве основного языка программирования. В первой реализации SPYT мы поднимали кластер Spark Standalone внутри vanilla‑операции YTsaurus.
В оригинальной версии Spark почему‑то при работе со Standalone‑кластером нельзя запускать PySpark‑приложения в кластерном режиме. Нам же это было необходимо, чтобы все компоненты Spark‑приложения работали в периметре YTsaurus. И при этом не было сетевого взаимодействия из кластера YTsaurus с клиентскими машинами, как это подразумевается при работе в client mode.
В итоге в своём форке мы доработали кластер Spark Standalone, чтобы была возможность запускать PySpark‑приложения в кластерном режиме.
Получение статуса приложения и драйвера по REST. Кластер Spark Standalone в своей базовой реализации уже содержит ряд REST‑методов для запуска и получения статуса Spark‑приложений. Мы расширили имеющийся REST API для получения более полной информации, такой как связка id драйвера с id приложения и т. д.
Нативная поддержка UInt64 и YSON. Типы данных UInt64 и YSON есть в YTsaurus, но они не поддерживаются в Spark.
YSON — это наш структурный тип данных, который чем‑то напоминает JSON (а точнее, jsonb). Он стандартен для хранения структурированных данных в таблицах YTsaurus.
UInt64 — беззнаковый целочисленный тип, который есть в ряде языков программирования, таких как C++, Rust и т. д., но нет среди jvm‑примитивов.
Поддержка IPv6. Также в Spark на тот момент не было вообще никакой поддержки IPv6, а в Яндексе IPv6 применяется достаточно давно. Поэтому нам пришлось реализовать эту поддержку самим. Собственно, IPv6 в Spark появился только в версии 3.4, которая вышла только в начале 2023 года.
Дополнительные оптимизации. Мы сделали ряд доработок для оптимизаций плана выполнения запросов, которые учитывали партиционированность и отсортированность данных в YTsaurus, чтобы не делать дополнительных сортировок, шаффлов и так далее. Подробнее про это можно узнать из статьи Алексея Шишкина по мотивам его выступления на Saint Highload++ 2023.
Почему раньше форк был приемлем
Решение с форком поначалу было вполне приемлемым для нас, потому что в целом нашим пользователям не нужен был сам Spark как таковой. Им нужно было решение, позволяющее запускать Spark‑задачи на YTsaurus в виде некоторого сервиса, который они бы могли использовать. Так и появился наш проект SPYT, который изначально представлял собой форк Spark плюс реализации Spark SPI для обеспечения работы с YTsaurus.
И это был достаточно простой и быстрый способ сделать Proof of Concept. Основной фокус разработчиков в начале проекта был на том, чтобы обеспечить производительность Spark‑приложений при работе с данными, хранящимися в YTsaurus. В первую очередь мы хотели добиться, чтобы Spark‑задачи выполнялись за хотя бы примерно такое же время, за которое аналогичные расчёты выполняются на других системах.
Так как мы добавили в свою кодовую базу форк Spark, то возник ряд ограничений, который накладывало его использование.
Во‑первых, мы стали жёстко привязаны к определённой версии Spark (3.0.1).
Во‑вторых, когда мы решили обновиться до версии Spark 3.2.2, встал вопрос о миграции наших доработок с сохранением их функциональности в новой версии Spark.
В‑третьих, увеличился объём общей кодовой базы проекта за счёт включения в неё кода самого Spark, для которой нужно было настраивать CI/CD‑пайплайны. Соответственно, это увеличивало общее время на CI/CD нашего проекта.
А в марте 2023 года произошло знаковое для нас событие — YTsaurus вышел в опенсорс. Про то, как мы выходили в опенсорс, и с какими проблемами столкнулись, рассказывал Андрей Ривкин на московском HighLoad++ 2023.
После выхода в опенсорс SPYT столкнулся с дополнительными проблемами:
Пришлось включать в наш репозиторий модифицированную версию Apache Spark, что увеличило кодовую базу проекта более чем на 2М строк кода.
Использование фиксированной версии в форке усложнило процесс миграции для процессов из других систем, которые использовали версии Spark, отличающиеся от используемой в нашем форке.
Усложнился процесс внесения изменений для сторонних контрибьюторов проекта, которым также необходимо было разбираться со сборкой форка. Из‑за этого порог входа для новых разработчиков был достаточно высокий.
Кроме того, после выхода в опенсорс, к проекту возник ряд новых требований:
Нужна возможность использовать произвольную версию Spark. Пользователи опенсорс‑версии хотели использовать не фиксированную версию Spark, которую мы им даём, а произвольную, поэтому мы должны были обеспечить совместимость не с одной конкретной версией, а с широким диапазоном современных версий Spark.
Упростить процесс сборки и тестирования проекта для сторонних контрибьюторов. В изначальной версии сборки проекта очень много было завязано на нашу внутреннюю специфику сборки и деплоя. После выхода в опенсорс пришлось существенно отрефакторить и упростить этот процесс для разработчиков, которые приносят доработки извне.
Унифицировать подходы использования Spark с другими системами распределённой обработки (Hadoop + YARN, S3 + Kubernetes …). Люди привыкли использовать Spark по каким‑то определённым паттернам. То есть, кто‑то работает в Hadoop, кто‑то в Kubernetes и хранит данные на S3, но паттерны взаимодействия со Spark у них примерно одинаковые. Мы хотели, чтобы эти же паттерны можно было применять и в YTsaurus, используя стандартные скрипты Spark, такие как spark‑submit или spark‑shell. Мы хотели, чтобы люди, которые решили попробовать YTsaurus и сравнить Spark‑процессы с другими системами, могли достаточно легко перенести свои Spark‑процессы, например, из того же Hadoop в YTsaurus.
Таким образом, после выхода в опенсорс мы поставили себе следующие задачи:
Перенести все наши доработки, которые мы сделали в форке, в наш проект SPYT.
Перейти на использование оригинального дистрибутива Spark.
Расширить перечень поддерживаемых версий Spark.
Варианты переноса наших доработок из форка в SPYT, которые не подошли
Итак, первая задача — перенести наши доработки из форка к себе. В целом, она была достаточно нетривиальная: для неё мы перепробовали несколько подходов решения. Для начала расскажу про те, что не подошли.
Выделение доработок в отдельные классы и подклассы. Мы сначала думали пойти по классическому пути и вынести наши доработки в отдельные классы и подклассы. Но, к сожалению, наши доработки достаточно глубоко проросли в код Spark, в том числе в приватные и статические методы, которые не поддаются расширению в подклассах.
Кроме того, сам код Spark тоже написан был далеко не идеально, без должного уровня декомпозиции. Иногда приходилось вносить одну строчку в какой‑нибудь метод из нескольких сотен строчек (мой любимый — это вот этот, prepareSubmitEnvironment), и стандартные способы расширения функциональности здесь никак бы не подошли.
Переписывание нужных классов заново и загрузка их в более приоритетном порядке. Следующим шагом мы думали просто переписать существующие классы и поставить их на загрузку в ClassLoader в более приоритетном порядке. Но в Spark есть много крупных классов по несколько тысяч строк — очень не хотелось копировать такой класс целиком себе в проект ради изменения всего нескольких строк. Это привело бы к большим объёмам копипасты, а это, на мой взгляд, ещё хуже, чем форк.
Внесение доработок в основной репозиторий Spark. Мы также думали пойти по пути внесения наших доработок в основной репозиторий Spark. В целом, это было бы самым правильным решением, но у него есть ряд существенных недостатков.
Во‑первых, сам процесс внесения доработок в популярный проект достаточно длительный по времени. Для Spark нам пришлось бы сначала написать SPIP, убедить мейнтейнеров Spark, что мы действительно принесли полезные доработки, потом сделать пул‑реквест, пройти ревью с ними и т. д.
Во‑вторых, даже если все наши доработки будут приняты, то появятся они только в новых версиях Spark. Мы же используем какую‑то определённую, уже вышедшую версию Spark, и миграция на новую версию — это дополнительная нетривиальная задача. Да и в целом основной наш риск был в том, что наши доработки специфичны именно под YTsaurus, и с высокой вероятностью большую их часть вряд ли бы взяли в основной репозиторий Spark.
Для нас же важно было сохранить наши доработки в используемой версии Spark, но при этом перейти на использование оригинальных версий. Перепробовав все варианты, о которых я написал чуть выше, мы поняли, что нельзя просто так взять и перенести все наши доработки из форка в SPYT.
В конце концов, мы обратили внимание на подход, используемый в разработке на Python — monkey patching. Его суть заключается в том, чтобы изменять поведение классов и методов во время выполнения программы.
Приведу небольшой пример. Допустим, есть у нас простой класс Dog — собачка, который умеет лаять. Мы создаём экземпляр этой собачки, вызываем у неё метод woof
, она предсказуемо лает.
class Dog:
def woof(self):
print("WOOF")
catdog = Dog()
catdog.woof()
# prints WOOF
Нам не нравится, что эта собачка лает, мы хотим, например, чтобы она мяукала. В Python для этого нужно написать метод с такой же сигнатурой, как оригинальный метод, и затем присвоить методу woof
класса Dog имя нового метода.
def meow(self):
print("MEOW")
Dog.woof = meow
catdog.woof()
# prints MEOW
В результате получаем, что при вызове метода woof из существующего экземпляра мы получаем изменённое поведение, то есть наша собачка вместо того, чтобы лаять, начинает мяукать.
Таким образом, в Python при помощи monkey‑patching можно получить такого замечательного котопса.

Благодаря этому подходу удалось перенести все доработки, которые мы делали в PySpark, в наш проект. Это выглядит следующим образом: мы пишем наши дополнительные методы для классов, определённых в PySpark, либо переопределяем существующие в них методы, и затем патчим их на стадии инициализации нашего пакета spyt в файле __init__.py.
Таким образом, если мы хотим использовать пакет PySpark с нашими расширениями, первым делом нужно импортировать наш проект SPYT. Тогда в init‑скрипте пропатчатся все необходимые методы и классы в оригинальном PySpark, а дальше мы можем работать со Spark по стандартной схеме:
import spyt
from pyspark.sql import SparkSession
spark = (SparkSession.builder
.appName("My Application")
.getOrCreate()) # Создаём Spark сессию обычным способом
try:
df = spark.read.yt("//path/to/some/table")
# используем для чтения метод .yt(...), который был добавлен нами в процессе инициализации пакета spyt
... # Application code
finally:
spark.stop()
В целом, этот подход нас вполне устроил. Прежде всего тем, что в отличие от других здесь можно сделать чёткое разделение между оригинальным кодом и кодом, который добавляем мы, не прибегая при этом к копированию.
Но так как Spark написан на нескольких языках, основные из которых это компилируемые Scala и Java, перед нами встал вопрос о том, как применить аналогичный подход к скомпилированному коду. Всё‑таки Python в первом приближении можно считать интерпретируемым языком, подразумевая под этим, что у нас есть в runtime доступ к исходному коду. Но основная часть Spark написана преимущественно на Scala, который является компилируемым JVM‑языком и у которого в runtime есть доступ только к скомпилированному байт‑коду.
Как сделать monkey patching скомпилированного кода
Мы начали искать способ применить аналогичный подход для скомпилированного кода. В JVM есть возможность подключения дополнительных модулей для различных служебных целей через параметр javaagent
, передаваемый при запуске приложения. Сам дополнительный модуль передаётся в виде jar-файла:
java -javaagent:agent.jar -jar application.jar
Внутри этого jar‑файла как минимум должен находиться класс, у которого есть статический метод premain
. Он должен принимать два параметра: первый — это пользовательские аргументы агента, и второй — это реализация интерфейса Instrumentation.

Само название метода premain говорит, что этот код будет вызван перед вызовом метода main нашего основного приложения. Нас же в первую очередь интересует второй аргумент, реализация интерфейса Instrumentation.
Вообще, само название этого интерфейса говорит о том, что разрабатывался он, прежде всего, для поддержки различных инструментальных средств для JVM, таких как отладчики, профилировщики, мониторинг‑агенты и т. д.
В этом интерфейсе, помимо всего прочего, содержатся различные методы для работы с байт‑кодом классов на этапе их загрузки через ClassLoader. В первую очередь нас интересует метод addTransformer
, при помощи которого мы можем добавить хук, который будет модифицировать байт‑код класса при его загрузке в JVM. Таким образом, нам всего лишь остаётся изменить байт‑код нужных нам классов на этапе их загрузки, чтобы в runtime получить изменённое поведение.
К счастью, менять сам байт‑код напрямую нам не пришлось. Вместо этого мы воспользовались библиотекой Javaassist из зависимостей Spark. Она позволяет манипулировать байт‑кодом уже на высоком уровне, используя древовидную объектную модель, чем‑то напоминающую DOM‑дерево веб‑страницы. Затем можно работать с этой объектной моделью при помощи API Javassist без необходимости спускаться на уровень байт‑код‑инструкций (но при желании это также можно сделать), и затем сериализовывать полученный результат в байт‑код. Таким образом, при помощи Javaassist можно получить модифицированный байт‑код любого класса сравнительно несложным образом.
Итак, мы придумали способ делать monkey‑patching в JVM при помощи javagent и Javaassist. Он состоит из нескольких простых шагов:
Получаем доступ к исходному байт‑коду класса во время его загрузки в память через javaagent.
Модифицируем его при помощи библиотеки Javaassist.
Отдаём ClassLoader'у изменённый байт‑код.
Получаем в runtime изменённое поведение.
Способы, которые мы придумали для патчинга классов в runtime в JVM, заслуживают отдельного доклада, который я делал на конференции Joker 2024. В нём я подробно рассказываю о четырёх способах, придуманных нами для описания вносимых изменений и их реализацию. Здесь же покажу наиболее характерный пример, как мы патчим байт-код на высоком уровне.
Допустим, у нас код на языке Scala. Это object, внутри которого есть приватный метод. Object в Scala используется для определения статических методов, не привязанных к какому-либо классу, и их нельзя переопределить. В этом объекте есть приватный метод, к которому мы хотим добавить одну строчку — дополнительный case
, в инструкцию match
:
object RowEncoder {
private def serializerFor(
inputObject: Expression,
inputType: DataType): Expression = inputType match {
case dt if ScalaReflection.isNativeType(dt) => inputObject
case p: PythonUserDefinedType => serializerFor(inputObject, p.sqlType)
case udt: UserDefinedType[_] =>
val annotation = udt.userClass.getAnnotation(...)
val udtClass: Class[_] =
// ...
// Эту строку мы хотим добавить, её нет в оригинальном коде
case UInt64Type => UInt64Long.createSerializer(inputObject)
case TimestampType =>
if (SQLConf.get.datetimeJava8ApiEnabled) {
// ...
Чтобы изменить байт‑код этого метода, мы будем использовать декорирование. Суть этого способа в том, что мы должны написать декоратор исходного метода, у которого будет такая же сигнатура, как в оригинальном методе. Делать это мы будем в отдельном объекте. Затем, уже в runtime, мы переименовываем оригинальный метод по определённому правилу, например добавляем префикс, состоящий из двух знаков подчёркивания, и переносим байт‑код метода‑декоратора из нашего вспомогательного объекта в оригинальный.
Чтобы можно было вызывать оригинальный метод из метода‑декоратора, во вспомогательном объекте мы должны создать заглушку для оригинального метода с именем, соответствующим новому имени оригинального метода. Код вспомогательного объекта с декораторами выглядит следующим образом:
@Decorate
@OriginClass("org.apache.spark.sql.catalyst.encoders.RowEncoder")
object RowEncoderDecorators {
@DecoratedMethod
private def serializerFor(inputObject: Expression,
inputType: DataType): Expression = inputType match {
case UInt64Type => UInt64Long.createSerializer(inputObject)
case _ => __serializerFor(inputObject, inputType)
}
private def __serializerFor(inputObject: Expression, inputType: DataType): Expression = ???
}
Аннотации, поставленные на объект и на метод, позволяют указать, какой класс мы хотим пропатчить, и какой метод оригинального класса мы хотим декорировать. Они также разработаны нами, чтобы выбрать, какую из четырёх стратегий применять для каждого конкретного случая. Полный код модуля для runtime monkey‑patching в JVM можно посмотреть в нашем опенсорс‑репозитории.
Этот подход показал себя вполне жизнеспособным и удобным прежде всего за счёт того, что мы в runtime можем поменять практически любую функциональность любого класса, в том числе приватную. Кроме того, в этом подходе все модификации описываются на языке высокого уровня (Java и Scala), и почти не возникает необходимость править сам байт‑код.
Справедливости ради хочу отметить, что ради одной доработки нам всё‑таки пришлось делать изменения на уровне байт‑кода, но в целом JVM‑байт‑код — это несложно, особенно при работе с удобными инструментами вроде Javassist.
К недостаткам этого способа можно отнести то, что в целом сам подход немножко хакерский. Для новых разработчиков будет требоваться некоторое время, чтобы разобраться в его сути и понять принципы. Кроме того, при выводе stacktrace для пропатченных классов и методов иногда указывается строка не в коде оригинального класса, а в коде класса‑патча.
Но в целом этот способ по своей сути ничуть не хуже других, более явных способов изменения функциональности. В конце концов, и при наследовании, и при использовании dependency injection мы также меняем поведение оригинальной системы. Просто в этом способе мы это делаем на более глубоком уровне, поэтому в целом с этой технологией нужно обращаться более осторожно. Если доступны более традиционные и понятные способы расширения функциональности, то лучше воспользоваться ими. И уж совсем в крайнем случае, когда больше ничего не остаётся, прибегать к runtime‑патчингу.
Но в итоге этот подход, в конце концов, позволил нам перенести все доработки из форка к себе и перейти на использование оригинального дистрибутива Spark. В мае 2024 года мы наконец‑то избавились от форка в нашем опенсорс‑репозитории YTsaurus, уменьшив кодовую базу проекта более чем на два миллиона строк, а заодно переехали в отдельный репозиторий YTsaurus SPYT.
И самое главное: в результате избавления от форка общее время сборки SPYT сократилось с 50 минут до 2–3 минут!
Что ж, избавились от форка. Что дальше?
Теперь перед нами встала следующая задача: обеспечить совместимость SPYT с как можно более широким диапазоном версий Spark.
Избавившись от форка, мы всё ещё требовали, чтобы SPYT работал с использованием Spark версии 3.2.2, пусть теперь и уже оригинальной сборки. И мы начали постепенно ослаблять это ограничение.
Для начала мы проверили совместимость с версиями 3.2.x, которые согласно SemVer должны быть также совместимы. Прогнав весь наш набор модульных и интеграционных тестов с версиями 3.2.3 и 3.2.4, мы в этом убедились и стали смотреть в сторону более новых версий. Со следующим поколением версий Spark, начиная с версии 3.3.0, мы уже оказались несовместимыми.
Необходимое условие для того, чтобы SPYT мог быть использован с определённой версией Spark — его бинарная совместимость с ней. Соответственно, наша задача стала выглядеть так, что мы должны обеспечить бинарную совместимость со всеми современными версиями Spark.
Определение бинарной совместимости можно найти в спецификации JVM (Binary Compatibility). Основной момент, который оттуда нужно вынести, это то, что если существующая сборка, слинкованная с предыдущей версией библиотеки, также линкуется с новой версией, то они считаются бинарно совместимыми. Процесс линковки в JVM также описан в спецификации, в разделе Linking, и основное его отличие от линковки в C++, в том, что в JVM она происходит в runtime.
Линковка в JVM нужна для объединения отдельных сущностей, таких как классы, в некую исполняемую модель, которая в итоге формирует поток исполнения программы (control flow). Она происходит через процесс разрешения, который также определён в спецификации (Resolution). Самое главное здесь — сущности для взаимодействия определяются по символьным ссылкам.
Вот небольшой пример:
import org.apache.spark.sql.execution.datasources.v2.FileScan
import org.apache.spark.sql.SparkSession
// ...
case class YtScan(spark: SparkSession, ...) extends FileScan {
override def createReaderFactory(): ReaderFactory = {
spark.sparkContext.broadcast(...)
// ...
}
// ...
}
Это фрагмент кода класса YtScan
из нашего проекта (полный код доступен здесь). Он взаимодействует с несколькими классами, определёнными в Spark: классы FileScan и класс SparkSession. От класса FileScan он наследуется и переопределяет в нём метод. Переменная класса SparkSession передаётся ему в качестве аргумента конструктора, у которой он потом вызывает некоторые методы. То есть тут виден полный набор всех возможных видов взаимодействий: расширение класса, переопределение метода, использование класса в качестве параметра и вызов методов.
Чтобы понять, как выглядят символьные ссылки во всех этих случаях, давайте посмотрим соответствующий байт‑код этого фрагмента.
В общем случае байт‑код любого скомпилированного класса для JVM состоит из трёх секций. Первая секция — это заголовок. В ней определяется ряд общих параметров, такие как имя самого класса, имя суперкласса, имена и количество расширяемых интерфейсов, минимальная совместимая версия JVM и т. д. Все значения являются целыми числами, в случае имён это числовые ссылки.
Байт-код класса YtScan (Header):
public class #2 implements #6, #8, #10, #12
minor version: 0
major version: 52
flags: (0x0021) ACC_PUBLIC, ACC_SUPER
this_class: #2
super_class: #4
interfaces: 4, fields: 24, methods: 112, attributes: 6
Значения числовых ссылок определены в следующей секции, которая называется пул констант (Constant Pool). Каждая запись в нём состоит из двух компонентов: типа значения и самого значения. Все строковые константы объявлены как значения типа Utf8, значения же других типов просто ссылаются на эти константы, используя числовые ссылки.
Таким образом, если мы пойдём по числовой ссылке из заголовка для поля this_class: #2, то в пуле констант под этим номером будет сущность типа Class, и его имя определяется константой #1, которая определена как Utf8 и имеет значение org/apache/spark/sql/v2/YtScan.
Байт-код класса YtScan (Const pool):
Constant pool:
#1 = Utf8 org/apache/spark/sql/v2/YtScan
#2 = Class #1
#3 = Utf8 java/lang/Object
#4 = Class #3
#5 = Utf8 org/apache/spark/sql/.../v2/FileScan
#6 = Class #5
#7 = Utf8 org/.../read/SupportsReportPartitioning
#8 = Class #7
#9 = Utf8 scala/Product
#10 = Class #9
#11 = Utf8 scala/Serializable
#12 = Class #11
...
Для классов, методов и полей из фрагмента класса выше в пуле констант также определены соответствующие сущности. Например, методы определены как комбинация (Methodref) класса, имени метода и его сигнатуры (NameAndType), которые, в свою очередь, также определены строковыми константами Utf8:
...
#50 = Utf8 spark
...
#322 = Utf8 ()Lorg/apache/spark/sql/SparkSession;
...
#368 = Utf8 createReaderFactory
#369 = Utf8 ()Lorg/apache/spark/.../ReaderFactory;
#370 = NameAndType #50:#322
#371 = Methodref #2.#370
#372 = Utf8 org/apache/spark/sql/SparkSession
#373 = Class #372
#374 = Utf8 sparkContext
#375 = Utf8 ()Lorg/apache/spark/SparkContext;
#376 = NameAndType #374:#375
#377 = Methodref #373.#376
...
Реализации самих методов следуют в следующей после пула констант секции. Каждый метод состоит из секции метаданных и последовательности инструкций байт‑кода. Почти всегда аргументами инструкций байт‑кода являются числовые константы, определённые в пуле констант:
Байт-код класса YtScan (Method):
method: #368
descriptor: #369
flags: (0x0001) ACC_PUBLIC
Code:
stack=12, locals=3, args_size=1
0: aload_0
1: invokevirtual #371
4: invokevirtual #377
...
166: areturn
Таким образом, на уровне байт‑кода вызов метода представляет собой инструкцию invokevirtual с аргументом в виде числовой ссылки на объект типа Methodref, который, в свою очередь, определён в пуле констант через символьные ссылки. Отсюда мы видим, что если сущность, с которой мы взаимодействуем — будь то класс, метод или поле, — доступна по определённой символьной ссылке, то, значит, мы можем с нею слинковаться и получить исполняемый код.
В итоге мы можем вывести критерии бинарной совместимости, между различными библиотеками JVM:
Полные имена используемых классов и интерфейсов должны оставаться неизменными.
Имена и сигнатуры используемых нами методов также не должны меняться.
В целом, эти критерии также прописаны в спецификации JVM, но в нашем случае мы расширили их для использования приватных сущностей, чтобы работал наш механизм runtime‑патчинга.
Однако сам Spark не стоит на месте и продолжает развиваться, и время от времени в нём меняются как сами классы, которые мы используем, так и сигнатуры используемых нами методов. Для таких случаев мы решили абстрагировать наш код от изменяемых частей Spark путём введения адаптеров к различным версиям Spark:
// Обязан компилироваться с любой версией Spark
trait SparkAdapter {
def minSparkVersion: String
def createPartitioning(nPartitions: Int): Partitioning
// ...
}
object SparkAdapter {
val sparkVersion: String = org.apache.spark.SPARK_VERSION
lazy val instance: SparkAdapter = loadInstance(sparkVersion)
// ...
}
Для каждого семейства версий, то есть 3.2.x, 3.3.x и так далее, мы сделали свои реализации SparkAdapter. Каждая версия адаптера компилируется с использованием соответствующей версии Spark, SPYT же при этом может компилироваться с любой версией. Затем в runtime мы через константу SPARK_VERSION определяем, какая версия Spark у нас лежит на Classpath, и отдаём соответствующую реализацию адаптера.
Приведём небольшой пример на базе уже знакомого нам класса YtScan. Между версиями Spark 3.2.x и 3.3.x был изменён состав методов и реализации интерфейса Partitioning. Соответственно, реализация метода outputPartitioning, которая была сделана нами для версии Spark 3.2.2, стала бинарно не совместимой с версией 3.3.0. Поэтому для каждого семейства версий мы вынесли реализацию этого метода в соответствующий адаптер, а в самом классе YtScan вместо непосредственной реализации этого метода мы стали делегировать её адаптеру:
import org.apache.spark.sql.execution.datasources.v2.FileScan
import org.apache.spark.sql.SparkSession
// ...
case class YtScan(spark: SparkSession, ...) extends FileScan {
// ...
override def outputPartitioning(): Partitioning = {
SparkAdapter.instance.createPartitioning(pLength)
}
// ...
}
В реализациях соответствующих адаптеров мы создаём реализации интерфейса Partitioning, совместимые с соответствующей версией Spark. Примеры реализаций адаптеров приведены ниже:
// Обязан компилироваться со Spark 3.2.2
class SparkAdapter322 extends SparkAdapter {
override def minSparkVersion: String = "3.2.2"
override def createPartitioning(nPartitions: Int): Partitioning = {
new YtScanPartitioning(nPartitions)
}
// ...
}
// Обязан компилироваться со Spark 3.3.0
class SparkAdapter330 extends SparkAdapter {
override def minSparkVersion: String = "3.3.0"
override def createPartitioning(nPartitions: Int): Partitioning = {
new UnknownPartitioning(nPartitions)
}
// ...
}
Также нам пришлось немного доработать наш механизм JVM monkey patching для поддержки работы с различными версиями Spark. Всё ограничилось лишь тем, что мы просто ввели механизм условного патчинга путём добавления дополнительной аннотации, которая показывает, для какой версии Spark у нас применим определённый патч.
Приведём небольшой пример: в классе RowEncoder методу serializerFor добавили дополнительный аргумент начиная с версии 3.3.0. Следовательно, сигнатура этого метода изменилась, и поэтому патч, разработанный под версию 3.2.2, больше не подходит. Соответственно, мы сделали аналогичный патч под метод с новой сигнатурой, который можно использовать начиная с версии 3.3.0, и ограничили аннотацией область применения каждого патча:
Runtime Рatching и широкая совместимость:
@Decorate
@OriginClass("org.apache.spark.sql.catalyst.encoders.RowEncoder")
object RowEncoderDecorators {
@DecoratedMethod
@Applicability(to = "3.2.4")
private def serializerFor(inputObject: Expression,
inputType: DataType): Expression = ...
@DecoratedMethod
@Applicability(from = "3.3.0")
private def serializerFor(inputObject: Expression,
inputType: DataType,
lenient: Boolean): Expression = ...
}
Для запуска SPYT с использованием адаптеров все их реализации должны находиться на Classpath вместе с дистрибутивами SPYT и Spark. Определение нужной версии адаптера происходит уже во время выполнения на основании версии Spark, которую положили на Classpath:
java -cp spark-adapter-api.jar: \
spark-adapter-impl-322.jar: \
spark-adapter-impl-330.jar: \
spark-adapter-impl-340.jar: \
spark-adapter-impl-350.jar: \
spyt-***.jar: \
spark-***-3.X.X.jar: \
...
Необходимость указывать в Classpath все реализации адаптеров вместо какой‑то определённой связана с тем, что итоговая реализация адаптера является композицией из различных его компонентов. Каждый компонент представляет собой адаптер конкретной функции, и реализован не для всех версий Spark, а только для тех, в которых была нарушена бинарная совместимость этой функции. Поэтому, чтобы избежать дублирования кода, в каждый jar адаптера было помещено только то, что изменилось в сравнении с предыдущей версией.
Широкая совместимость в PySpark
В PySpark обеспечить широкую совместимость с различными версиями Spark было гораздо проще. Во‑первых, доля кода на Python в репозитории Spark составляет чуть больше 15% от всей кодовой базы. Во‑вторых, PySpark — это всего лишь Python API к JVM‑runtime, и на нём нет Spark‑Core‑функциональности. Кроме того, код на Python гораздо проще патчить в runtime, чем JVM байт‑код. В Python мы можем воспользоваться преимуществами динамической типизации, а также применить подход monkey‑patching, уже упоминавшийся в этой статье. Версия Spark также доступна в runtime, и мы можем писать код, зависимый от версии, следующим образом:
PySpark version-dependent code:
from pyspark import __version__ as spark_version
if semver_compare(spark_version, "3.3.0") < 0:
# Code for versions 3.2.x and below
...
else:
# Code for versions 3.3.0 and above
...
Тестирование совместимости
И последняя тема, на которой я хотел бы остановиться, — это тестирование SPYT с различными версиями Spark. Для запуска модульных тестов мы компилируем весь проект и тесты с использованием последней релизной версии Spark, но при запуске подкладываем на Classpath произвольную версию Spark, с которой мы хотим проверить совместимость:
# Запуск модульных тестов:
# Компиляция с использованием последней релизной версии Spark
# Запуск - с использованием произвольной версии
sbt -DtestSparkVersion=3.3.4 clean test
Также у нас есть интеграционные тесты, которые тестируют и JVM‑часть, и Python‑часть. Для этого мы используем библиотеку tox, которая определяет ряд окружений, позволяющих тестировать c различными комбинациями версий Python и PySpark. Пример конфигурации tox для тестирования приведён ниже:
# Интеграционное тестирование с использованием tox:
envlist =
py{39, 311, 312}-spark322
py311-spark{324, 330, 334}
...
deps =
pytest==8.2.1
ytsaurus-client==0.13.20
ytsaurus-yson==0.4.9
spark322: pyspark==3.2.2
spark334: pyspark==3.3.4
...
setenv =
py39: python_path=python3.9
py311: python_path=python3.11
Чтобы запустить интеграционные тесты с использованием tox, для начала необходимо собрать весь проект. Из артефактов сборки необходимо взять артефакт в виде whl‑пакета, который нужно установить в то же виртуальное окружение, где установлена библиотека tox. И затем запускаются сами тесты либо с использованием определённых конфигураций окружения, либо с использованием всех возможных комбинаций:
# Компиляция и сборка проекта
sbt clean package
# Артефакт - whl-пакет, устанавливаемый в tox-окружение
pip install target/ytsaurus-spyt-XXX.whl
# Запуск тестов с использованием tox:
tox -e py311-spark330 py312-spark322
Планы на будущее
И в заключение расскажу про наши планы. Мы хотим и дальше расширять совместимость нашего проекта — теперь уже не только с новыми версиями Spark, но и с более современными версиями JVM.
Сейчас пока что мы поддерживаем работу только с Java 11. Но начиная с версии 3.3 Spark также декларирует совместимость Java 17. Для Spark 4, который уже скоро выйдет, Java 17 будет минимально совместимой версией, наряду с Java 21. В настоящий момент SPYT не поддерживает запуск с использованием Java 17, но это в наших ближайших планах.
Кроме того, в рамках подготовки к скорому выходу Spark 4 нужно будет поддержать совместимость со Scala 2.13. Да и с самим Spark 4 мы хотим обеспечить совместимость как можно скорее после его официального выхода.
Выводы
Главный вывод, который хотелось бы сделать: к созданию форков популярных опенсорс‑библиотек нужно относиться с большой осторожностью. Перед созданием форка стоит подумать, есть ли какие‑то альтернативные пути добавления функциональности.
Ещё один важный вывод: при работе со сторонними библиотеками старайтесь использовать публичный интерфейс, потому что для него декларируются больше гарантий по широкой совместимости и будет проще мигрировать на новые версии.
Также не бойтесь пробовать нестандартные подходы. Но не увлекайтесь, чтобы вы сами потом через некоторое время могли в этом разобраться. Начните сначала с более традиционных способов расширения функциональности, и только потом, если они не подошли, переходите к более изощрённым.
И главное, не забывайте всё покрывать тестами, потому что, как правило, тесты дают нам гарантию стабильности и работоспособности при практически любых изменениях, какими бы глубокими они ни были.
Полезные ссылки
YTsaurus в опенсорс. У проекта также есть отдельный сайт, на котором доступны документация, блог со статьями и вебинарами, описание компонент, а также можно попробовать демо.
Наш код на github. Всё, про что написано в этой статье, доступно там. Ставьте звёздочки, чтобы не пропускать обновления.
Комьюнити‑чат YTsaurus в Telegram. Там вы можете задать вопросы по YTsaurus и SPYT.
И также телеграм‑канал Yandex Infrastructure — новости, анонсы, интервью и выступления нашей команды.