Привет, Хабр! Меня зовут Ольга Косарева, я инженер данных команды «Прогнозирование финансового результата» Центра разработки решений ALM в ИТ‑холдинге Т1, мы занимаемся созданием современной ALM‑системы (подробнее тут ).

Полтора года назад я пришла в команду и получила задачу дописать и внедрить инструмент для DDL‑операций над данными в экосистеме Hadoop. Моя первая реакция была: «А зачем так сложно? Какой инструмент? Почему нельзя просто выполнить команду ALTER TABLE через Hive?»

В этой статье мы с коллегами Никитой Королёвым и Алексеем Пожар расскажем, в каких случаях целесообразно именно так и сделать, а в каких это приведёт к различным проблемам с данными, что такое Schema Evolution и как мы решаем задачу периодического изменения структур таблиц с нашими отчётами.

Что такое Schema Evolution

Под термином Schema Evolution мы понимаем возможность безопасного изменения структуры данных во времени с сохранением функциональности и доступа к старым данным. Приводить к изменению структуры таблицы могут добавление, удаление, переименование колонок или изменение типа данных. Может показаться, что это решается одной операцией ALTER TABLE, и в реляционных базах данных вроде PostgreSQL так и есть: команда в одну строчку запустит изменения и таблица мгновенно поменяется, старые данные обновятся и приложение продолжит работать.

Но в экосистеме Hadoop такой подход натыкается на ряд ограничений. Это связано с тем, как хранятся данные:

  • метаданные — в Hive Metastore;

  • файлы — на HDFS (Parquet, ORC, Avro и так далее).

Через ALTER TABLE мы можем добавить или удалить колонки, изменить их имя и тип. Но при этом Hive поменяет только метаданные таблицы, сами данные на HDFS останутся без изменения. Ниже приводим сами команды:

ALTER TABLE {table_name} ADD COLUMNS ({column_name} {column_type}); — добавление колонки

ALTER TABLE {table_name} CHANGE COLUMN {column_name_old} {column_name_new} {column_type}; — переименование колонки или изменение типа

ALTER TABLE {table_name} REPLACE COLUMNS (column_name column_type); — задание нового списка колонок, команду можно использовать для удаления колонок

Особенности стратегии Schema-on-Read

Такой подход, когда изменяют только схему данных, а физическое представление в Parquet‑файлах остаётся неизменным, называется Schema‑on‑Read. К его достоинствам можно отнести простоту: нужно только выполнить команду ALTER TABLE через DBeaver или утилиту beeline. Не нужно разбираться в исходных шаблонах и схеме данных, ETL может представлять собой простое копирование файлов без изменений и преобразований. Быстрая загрузка структурированных и неструктурированных данных, возможны различные способы интерпретации данных через наложение различной схемы с метаданными.

Но у подхода есть и недостатки. Мы провели тестирование: команды по изменению таблиц запускали в DBeaver, данные пробовали считать через Hive (DBeaver) и через Spark в Jupyter (командой spark.table(“table_name”).show()). Все таблицы, с которыми мы работаем, — внешние, формат хранения — Parquet, с партициями и без. Результаты описаны в таблице 1.

Сразу отметим, что изменить колонки партиционирования при данном подходе невозможно, так как это часть пути в HDFS, и на практике таблицу пересоздают с новой требуемой схемой. Наличие партиционирования не влияет на результат, однако команды по изменению типа или удалению поля могут не отрабатывать или приводить к ошибкам чтения через Hive, что можно исправить добавлением ключевого слова CASCADE, с помощью которого достигается единство метаданных таблицы и партиций.

Таблица 1: тестирование подхода Schema‑on‑Read

Команда

Данные через Hive

Данные через Spark

ADD COLUMN

По умолчанию новое поле со значением null

RENAME COLUMN

Значение заменено на null (потеря значения)

CAST COLUMN

Значение или null в зависимости от cast

Ошибка («parquet column cannot be converted…»)

DROP COLUMN

Без ошибок, ожидаемый результат

Отдельно мы проверили работу с колонками типа Struct<>: при работе с партиционированными таблицами команда по удалению поля внутри колонки Struct<> также требовала использования ключевого слова CASCADE для корректного отображения данных через Hive. Другие команды дали ожидаемые результаты.

В экспериментах мы наблюдали три возможных сценария:

  • корректное отображение данных через Hive и Spark;

  • ошибка при чтении данных Spark, чтение в Hive без ошибок, но возможна замена значения на null;

  • тихая замена значения на null.

В случае удаления или переименования колонок данные не «пропадают», в Parquet‑файлах они существуют под старыми именами и могут быть получены в первоначальном виде командой spark.read.parquet("путь").

Достоинства и недостатки подхода Schema-on-Write

Есть стратегия, которая позволяет поддерживать полное соответствие между данными и метаданными — это Schema‑on‑Write. Её достоинства — быстрое и предсказуемое чтение, быстрый анализ данных и их повышенная согласованность. Но в крайнем случае при изменении схемы приходится перезаписывать всю таблицу, требуются дополнительные манипуляции с данными перед записью, возможно работать только со структурированными данными, меньше гибкости в интерпретации данных — они были изменены и структурированы для определённой ограниченной цели.

Реализовать стратегию Schema‑on‑Write можно с помощью Hive командой INSERT OVERWRITE. Если это происходит нечасто, количество данных небольшое и структура несложная, то это простой и доступный способ. Но если данных много — терабайты, — то через Hive не подойдёт: медленно, сложно управлять потребляемыми ресурсами кластера, нет системы бэкапов и откатов в случае падения.

При работе с партиционированными таблицами схема может меняться от релиза к релизу. Если старые партиции не будут переписываться, это приведёт к накоплению в HDFS партиций с разной структурой. При дальнейшем чтении таких данных Hive и Spark будут сопоставлять метаданные со структурой самих файлов. Тогда при анализе результатов нужно будет отвечать на вопрос: значение не посчиталось по каким‑либо причинам, либо его не существует в самих данных (и это нормально), либо оно было заменено на null. Если таблицу нужно в дальнейшем читать Spark командой spark.read.parquet, а партиции имеют разную схему, то опция mergeSchema=true поможет Spark попробовать собрать единую схему из всех файлов, чтобы прочитать данные. Это может приводить не к тем результатам, какие вы ожидали, например если переименовали колонку. В этом случае Spark покажет сразу оба поля. Кроме того, это может быть достаточно «дорогая» операция с точки зрения времени и ресурсов.

Schema-on-Write на практике: наше решение

Что мы имеем и чего хотим

Все пункты, которые мы описали, превращают задачку по изменению структуры таблицы в полноценный ETL‑процесс. Мы поделимся своим решением и идеями, а сейчас немного контекста.

У нас релизы один раз в два месяца и несколько десятков таблиц с отчётами и справочниками. Любая доработка, которая приводит к изменению схемы таблицы, долгое время была проблемой: мы просили поддержку удалить таблицу в продуктовом контуре, запускали расчёт, он сам создавал новую схему и сохранял текущий результат. При этом удалялись исторические данные, и мы не могли ответить на вопросы о предыдущих расчётах, если те возникали.

Наши отчёты достаточно большие, чтобы делать изменения через Hive (до 7–8 Тб), есть потребность уметь изменять имя и тип колонок, поэтому нужно было какое‑то решение по стратегии Schema‑on‑Write с использованием Spark:

  • распределённые вычисления;

  • с контролируемым количеством ресурсов;

  • с надёжной системой отката в случае ошибки;

  • с сохранением версионности;

  • на основе доступных в контуре банка на период разработки инструментов — Spark, SQL, Python.

Сразу отметим, что варианты на основе Delta Lake и Apache Iceberg мы не рассматривали: Delta Lake в компании тестировали, но в продуктив он не прошёл, а Apache Iceberg в качестве доступного и согласованного внутри банка инструмента на момент разработки не было.

Архитектура нашего мигратора

Мы разработали свой универсальный инструмент — мигратор, которым можно делать все нужные нам DDL‑операции; история миграций хранится в отдельной таблице; изменения подаются максимально наглядно и унифицировано; в случае падения в процессе мы не потеряем данные, а бизнес может начинать делать новые расчёты максимально быстро. На рисунке 1 представлена схема работы нашего мигратора.

Рисунок 1. Схема работы мигратора
Рисунок 1. Схема работы мигратора

В качестве исходных данных мы используем файлы с расширением.yaml (примеры на рис.2), которые мы называем «ревизиями» (вдохновлялись библиотекой для управления миграциями Alembic). В данном случае можно использовать любой формат, в котором можно в читаемом виде передать информацию: тип действия, название колонки, тип колонки и т. д. Мы выбрали YAML, а не JSON, просто как более читаемый.

Рисунок 2. Примеры входных данных
Рисунок 2. Примеры входных данных

Через путь к ревизии (например, src/migrations/v1.1.0/migrate.example1/revision1.yaml) мы передаём номер версии релиза, название схемы и таблицы, которую необходимо изменить. Далее нам это понадобится при формировании патчей.

На основе YAML‑файлов мы получаем список ревизий. Сущность «ревизия» характеризуется набором трансформаций (actions), названием и номером релиза. Трансформация в данном случае — это «атомарное» изменение датафрейма, например, добавление колонки или изменение типа данных. Каждая трансформация получает на вход датафрейм, проводит изменение и возвращает новый датафрейм. В случае добавления колонки поле по умолчанию заполняется null.

Трансформация может выглядеть так:

 @dataclass
 class ActionDropColumn(ActionABC):
     name: str
 
     def apply(self, df: DataFrame) -> DataFrame:
         if self.name not in df.columns:
             LOG.warning(f"skipping 'drop column {self.name}'. This column not exists.")
             return df
         return df.drop(self.name)

Из сгруппированных по сочетанию названий схемы и таблицы ревизий мы получаем патчи, которые характеризуются атрибутами: схема, таблица, список ревизий. С помощью таблицы с историей миграции (пример на рис. 3) мы проверяем, какие ревизии уже были применены к таблице, и фильтруем их.

Рисунок 3. Пример таблицы с историей миграции
Рисунок 3. Пример таблицы с историей миграции

Для всех необходимых нам DDL‑операций мы определяем пять стратегий: две описывают действия с колонками, три определяют работу с таблицами: создание, удаление и переименование. Каждая стратегия характеризуется своим типом и содержит метод run, в котором мы применяем патчи к таблице и сохраняем результат, а также пополняем таблицу с историей миграции.

Ниже показан пример стратегии переименования таблицы. Ревизия в таком случае содержит ровно одну трансформацию с новым именем таблицы:

 class StrategyRenameTable(StrategyABC):
     revision_type = RevisionRenameTable
     def run(self, spark: SparkSession, patch: Patch, history: HistoryABC) -> None:
         # Для RevisionRenameTable ожидаем одну ревизию и одну трансформацию внутри неё
         revision = patch.revisions[0]
         rename_op: ActionRenameTable = revision.actions[0]
         rename_table(
             spark,
             schema=patch.schema,
             old_name=patch.table,
             new_name=rename_op.new_name,
         )
         history.add_entry(patch)

При работе с колонками в существующей таблице мы учитываем ещё два фактора: наличие партиционирования и готовность к откату в случае ошибки. Наш мигратор устроен так, что можно использовать либо существующие колонки партиционирования, либо задавать новые. Перед применением патчей мы создаём резервную копию таблицы на случай падения. И здесь проявляется недостаток нашей реализации подхода Schema-on-Write: на период обработки таблицы мы храним её бэкап.

В качестве оркестратора мы используем Apache Airflow, где задачи определяют наши стратегии (рис. 4). Выделение отдельных стратегий migrate_create_schema и migrate_backfill для описания работы с колонками позволяют нам быстро предоставить бизнесу изменённую схему для запуска новых расчётов, а подкладывать историю мы можем параллельно с новыми расчётами с помощью записи в разные партиции. В каждой задаче есть логика отката: при ошибке мы возвращаем первоначальное состояние таблиц, а при корректной работе пополняется таблица с историей миграций и удаляется бэкап.

Приводим вариант реализации метода main из задачи по созданию обновлённой схемы таблицы (migrate_create_table). Каждая Airflow-задача запускает конкретную стратегию через общий метод migrate, внутри которого собираются патчи и применяются к таблицам.

 @classmethod
 def main(cls, spark: SparkSession, conf: Config) -> None:

         LOG.info("Start strategy: create schema")
         strategy = StrategyDDLSchema()
 
     try:
         migrate(spark, conf, strategy)
     except Exception:
         LOG.error("Create schema failed. Restoring tables from backup.")
         rename_backup_to_origin_tables(spark, conf)
         raise
Рисунок 4. DAG мигратора в Apache Airflow
Рисунок 4. DAG мигратора в Apache Airflow

Мигратор — это наше кастомное решение, которое хорошо закрывает потребности и разработано на основе простых и повсеместно доступных инструментов. С недавнего времени в нашей компании стал доступен Apache Iceberg, мы планируем перевод таблиц и трансформацию мигратора, так что остаёмся на связи.

Резюмируем

Выбор подхода к Schema Evolution зависит от различных факторов: периодичности изменения таблиц, что именно нужно поменять, какие инструменты есть в доступе и какие требования к итоговому результату.

Мы предлагаем рассуждать следующим образом:

  • Разовое изменение при относительно небольшом объёме данных? Рассмотрите возможность использования команды INSERT OVERWRITE через Hive.

  • Нужно добавлять или удалять столбцы? Можно изменять метаданные по стратегии Schema‑on‑Read, контролируя способ обращения к данным через Spark.

  • Нужно менять имя или тип колонки? Schema‑on‑Write, можно ориентироваться на архитектуру нашего решения, учитывая его недостатки.

Приглашаем вас в комментариях делиться своими подходами к Schema Evolution в Hadoop или историями, когда вы внезапно теряли данные после изменения схемы таблицы!