
Привет, Хабр! Меня зовут Ольга Косарева, я инженер данных команды «Прогнозирование финансового результата» Центра разработки решений ALM в ИТ‑холдинге Т1, мы занимаемся созданием современной ALM‑системы (подробнее тут ).
Полтора года назад я пришла в команду и получила задачу дописать и внедрить инструмент для DDL‑операций над данными в экосистеме Hadoop. Моя первая реакция была: «А зачем так сложно? Какой инструмент? Почему нельзя просто выполнить команду ALTER TABLE через Hive?»
В этой статье мы с коллегами Никитой Королёвым и Алексеем Пожар расскажем, в каких случаях целесообразно именно так и сделать, а в каких это приведёт к различным проблемам с данными, что такое Schema Evolution и как мы решаем задачу периодического изменения структур таблиц с нашими отчётами.
Что такое Schema Evolution
Под термином Schema Evolution мы понимаем возможность безопасного изменения структуры данных во времени с сохранением функциональности и доступа к старым данным. Приводить к изменению структуры таблицы могут добавление, удаление, переименование колонок или изменение типа данных. Может показаться, что это решается одной операцией ALTER TABLE, и в реляционных базах данных вроде PostgreSQL так и есть: команда в одну строчку запустит изменения и таблица мгновенно поменяется, старые данные обновятся и приложение продолжит работать.
Но в экосистеме Hadoop такой подход натыкается на ряд ограничений. Это связано с тем, как хранятся данные:
метаданные — в Hive Metastore;
файлы — на HDFS (Parquet, ORC, Avro и так далее).
Через ALTER TABLE мы можем добавить или удалить колонки, изменить их имя и тип. Но при этом Hive поменяет только метаданные таблицы, сами данные на HDFS останутся без изменения. Ниже приводим сами команды:
ALTER TABLE {table_name} ADD COLUMNS ({column_name} {column_type}); — добавление колонки
ALTER TABLE {table_name} CHANGE COLUMN {column_name_old} {column_name_new} {column_type}; — переименование колонки или изменение типа
ALTER TABLE {table_name} REPLACE COLUMNS (column_name column_type); — задание нового списка колонок, команду можно использовать для удаления колонок
Особенности стратегии Schema-on-Read
Такой подход, когда изменяют только схему данных, а физическое представление в Parquet‑файлах остаётся неизменным, называется Schema‑on‑Read. К его достоинствам можно отнести простоту: нужно только выполнить команду ALTER TABLE через DBeaver или утилиту beeline. Не нужно разбираться в исходных шаблонах и схеме данных, ETL может представлять собой простое копирование файлов без изменений и преобразований. Быстрая загрузка структурированных и неструктурированных данных, возможны различные способы интерпретации данных через наложение различной схемы с метаданными.
Но у подхода есть и недостатки. Мы провели тестирование: команды по изменению таблиц запускали в DBeaver, данные пробовали считать через Hive (DBeaver) и через Spark в Jupyter (командой spark.table(“table_name”).show()). Все таблицы, с которыми мы работаем, — внешние, формат хранения — Parquet, с партициями и без. Результаты описаны в таблице 1.
Сразу отметим, что изменить колонки партиционирования при данном подходе невозможно, так как это часть пути в HDFS, и на практике таблицу пересоздают с новой требуемой схемой. Наличие партиционирования не влияет на результат, однако команды по изменению типа или удалению поля могут не отрабатывать или приводить к ошибкам чтения через Hive, что можно исправить добавлением ключевого слова CASCADE, с помощью которого достигается единство метаданных таблицы и партиций.
Таблица 1: тестирование подхода Schema‑on‑Read
Команда | Данные через Hive | Данные через Spark |
ADD COLUMN | По умолчанию новое поле со значением null | |
RENAME COLUMN | Значение заменено на null (потеря значения) | |
CAST COLUMN | Значение или null в зависимости от cast | Ошибка («parquet column cannot be converted…») |
DROP COLUMN | Без ошибок, ожидаемый результат | |
Отдельно мы проверили работу с колонками типа Struct<>: при работе с партиционированными таблицами команда по удалению поля внутри колонки Struct<> также требовала использования ключевого слова CASCADE для корректного отображения данных через Hive. Другие команды дали ожидаемые результаты.
В экспериментах мы наблюдали три возможных сценария:
корректное отображение данных через Hive и Spark;
ошибка при чтении данных Spark, чтение в Hive без ошибок, но возможна замена значения на null;
тихая замена значения на null.
В случае удаления или переименования колонок данные не «пропадают», в Parquet‑файлах они существуют под старыми именами и могут быть получены в первоначальном виде командой spark.read.parquet("путь").
Достоинства и недостатки подхода Schema-on-Write
Есть стратегия, которая позволяет поддерживать полное соответствие между данными и метаданными — это Schema‑on‑Write. Её достоинства — быстрое и предсказуемое чтение, быстрый анализ данных и их повышенная согласованность. Но в крайнем случае при изменении схемы приходится перезаписывать всю таблицу, требуются дополнительные манипуляции с данными перед записью, возможно работать только со структурированными данными, меньше гибкости в интерпретации данных — они были изменены и структурированы для определённой ограниченной цели.
Реализовать стратегию Schema‑on‑Write можно с помощью Hive командой INSERT OVERWRITE. Если это происходит нечасто, количество данных небольшое и структура несложная, то это простой и доступный способ. Но если данных много — терабайты, — то через Hive не подойдёт: медленно, сложно управлять потребляемыми ресурсами кластера, нет системы бэкапов и откатов в случае падения.
При работе с партиционированными таблицами схема может меняться от релиза к релизу. Если старые партиции не будут переписываться, это приведёт к накоплению в HDFS партиций с разной структурой. При дальнейшем чтении таких данных Hive и Spark будут сопоставлять метаданные со структурой самих файлов. Тогда при анализе результатов нужно будет отвечать на вопрос: значение не посчиталось по каким‑либо причинам, либо его не существует в самих данных (и это нормально), либо оно было заменено на null. Если таблицу нужно в дальнейшем читать Spark командой spark.read.parquet, а партиции имеют разную схему, то опция mergeSchema=true поможет Spark попробовать собрать единую схему из всех файлов, чтобы прочитать данные. Это может приводить не к тем результатам, какие вы ожидали, например если переименовали колонку. В этом случае Spark покажет сразу оба поля. Кроме того, это может быть достаточно «дорогая» операция с точки зрения времени и ресурсов.
Schema-on-Write на практике: наше решение
Что мы имеем и чего хотим
Все пункты, которые мы описали, превращают задачку по изменению структуры таблицы в полноценный ETL‑процесс. Мы поделимся своим решением и идеями, а сейчас немного контекста.
У нас релизы один раз в два месяца и несколько десятков таблиц с отчётами и справочниками. Любая доработка, которая приводит к изменению схемы таблицы, долгое время была проблемой: мы просили поддержку удалить таблицу в продуктовом контуре, запускали расчёт, он сам создавал новую схему и сохранял текущий результат. При этом удалялись исторические данные, и мы не могли ответить на вопросы о предыдущих расчётах, если те возникали.
Наши отчёты достаточно большие, чтобы делать изменения через Hive (до 7–8 Тб), есть потребность уметь изменять имя и тип колонок, поэтому нужно было какое‑то решение по стратегии Schema‑on‑Write с использованием Spark:
распределённые вычисления;
с контролируемым количеством ресурсов;
с надёжной системой отката в случае ошибки;
с сохранением версионности;
на основе доступных в контуре банка на период разработки инструментов — Spark, SQL, Python.
Сразу отметим, что варианты на основе Delta Lake и Apache Iceberg мы не рассматривали: Delta Lake в компании тестировали, но в продуктив он не прошёл, а Apache Iceberg в качестве доступного и согласованного внутри банка инструмента на момент разработки не было.
Архитектура нашего мигратора
Мы разработали свой универсальный инструмент — мигратор, которым можно делать все нужные нам DDL‑операции; история миграций хранится в отдельной таблице; изменения подаются максимально наглядно и унифицировано; в случае падения в процессе мы не потеряем данные, а бизнес может начинать делать новые расчёты максимально быстро. На рисунке 1 представлена схема работы нашего мигратора.

В качестве исходных данных мы используем файлы с расширением.yaml (примеры на рис.2), которые мы называем «ревизиями» (вдохновлялись библиотекой для управления миграциями Alembic). В данном случае можно использовать любой формат, в котором можно в читаемом виде передать информацию: тип действия, название колонки, тип колонки и т. д. Мы выбрали YAML, а не JSON, просто как более читаемый.

Через путь к ревизии (например, src/migrations/v1.1.0/migrate.example1/revision1.yaml) мы передаём номер версии релиза, название схемы и таблицы, которую необходимо изменить. Далее нам это понадобится при формировании патчей.
На основе YAML‑файлов мы получаем список ревизий. Сущность «ревизия» характеризуется набором трансформаций (actions), названием и номером релиза. Трансформация в данном случае — это «атомарное» изменение датафрейма, например, добавление колонки или изменение типа данных. Каждая трансформация получает на вход датафрейм, проводит изменение и возвращает новый датафрейм. В случае добавления колонки поле по умолчанию заполняется null.
Трансформация может выглядеть так:
@dataclass class ActionDropColumn(ActionABC): name: str def apply(self, df: DataFrame) -> DataFrame: if self.name not in df.columns: LOG.warning(f"skipping 'drop column {self.name}'. This column not exists.") return df return df.drop(self.name)
Из сгруппированных по сочетанию названий схемы и таблицы ревизий мы получаем патчи, которые характеризуются атрибутами: схема, таблица, список ревизий. С помощью таблицы с историей миграции (пример на рис. 3) мы проверяем, какие ревизии уже были применены к таблице, и фильтруем их.

Для всех необходимых нам DDL‑операций мы определяем пять стратегий: две описывают действия с колонками, три определяют работу с таблицами: создание, удаление и переименование. Каждая стратегия характеризуется своим типом и содержит метод run, в котором мы применяем патчи к таблице и сохраняем результат, а также пополняем таблицу с историей миграции.
Ниже показан пример стратегии переименования таблицы. Ревизия в таком случае содержит ровно одну трансформацию с новым именем таблицы:
class StrategyRenameTable(StrategyABC): revision_type = RevisionRenameTable def run(self, spark: SparkSession, patch: Patch, history: HistoryABC) -> None: # Для RevisionRenameTable ожидаем одну ревизию и одну трансформацию внутри неё revision = patch.revisions[0] rename_op: ActionRenameTable = revision.actions[0] rename_table( spark, schema=patch.schema, old_name=patch.table, new_name=rename_op.new_name, ) history.add_entry(patch)
При работе с колонками в существующей таблице мы учитываем ещё два фактора: наличие партиционирования и готовность к откату в случае ошибки. Наш мигратор устроен так, что можно использовать либо существующие колонки партиционирования, либо задавать новые. Перед применением патчей мы создаём резервную копию таблицы на случай падения. И здесь проявляется недостаток нашей реализации подхода Schema-on-Write: на период обработки таблицы мы храним её бэкап.
В качестве оркестратора мы используем Apache Airflow, где задачи определяют наши стратегии (рис. 4). Выделение отдельных стратегий migrate_create_schema и migrate_backfill для описания работы с колонками позволяют нам быстро предоставить бизнесу изменённую схему для запуска новых расчётов, а подкладывать историю мы можем параллельно с новыми расчётами с помощью записи в разные партиции. В каждой задаче есть логика отката: при ошибке мы возвращаем первоначальное состояние таблиц, а при корректной работе пополняется таблица с историей миграций и удаляется бэкап.
Приводим вариант реализации метода main из задачи по созданию обновлённой схемы таблицы (migrate_create_table). Каждая Airflow-задача запускает конкретную стратегию через общий метод migrate, внутри которого собираются патчи и применяются к таблицам.
@classmethod def main(cls, spark: SparkSession, conf: Config) -> None: LOG.info("Start strategy: create schema") strategy = StrategyDDLSchema() try: migrate(spark, conf, strategy) except Exception: LOG.error("Create schema failed. Restoring tables from backup.") rename_backup_to_origin_tables(spark, conf) raise

Мигратор — это наше кастомное решение, которое хорошо закрывает потребности и разработано на основе простых и повсеместно доступных инструментов. С недавнего времени в нашей компании стал доступен Apache Iceberg, мы планируем перевод таблиц и трансформацию мигратора, так что остаёмся на связи.
Резюмируем
Выбор подхода к Schema Evolution зависит от различных факторов: периодичности изменения таблиц, что именно нужно поменять, какие инструменты есть в доступе и какие требования к итоговому результату.
Мы предлагаем рассуждать следующим образом:
Разовое изменение при относительно небольшом объёме данных? Рассмотрите возможность использования команды
INSERT OVERWRITEчерез Hive.Нужно добавлять или удалять столбцы? Можно изменять метаданные по стратегии Schema‑on‑Read, контролируя способ обращения к данным через Spark.
Нужно менять имя или тип колонки? Schema‑on‑Write, можно ориентироваться на архитектуру нашего решения, учитывая его недостатки.
Приглашаем вас в комментариях делиться своими подходами к Schema Evolution в Hadoop или историями, когда вы внезапно теряли данные после изменения схемы таблицы!
