Как стать автором
Обновить
66.9
X5 Tech
Всё о технологиях в ритейле

Повышаем эффективность хранения данных до 300 раз с помощью таблиц SCD-2

Уровень сложностиПростой
Время на прочтение13 мин
Количество просмотров1.5K

Всем привет, меня зовут Василий. С 2021 года работаю в роли инженера данных, а в 2024 году присоединился к одной из продуктовых команд в Х5 Tech. За это время успел познакомиться с несколькими интересными проектами и подходами в области обработки данных, об одном из которых пойдет речь далее.

В этой статье расскажу о том, как можно повысить эффективность хранения данных за счет уменьшения их дублирования. 

Разберем, что из себя представляют Slowly Changing Dimensions-2 (далее SCD-2) таблицы и самостоятельно реализуем на PySpark алгоритм сохранения данных в них. Попутно поговорим о том, как находить изменения в любой таблице, даже если отсутствуют поля для выбора изменившихся записей, и научимся получать из созданной SCD-2 таблицы срезы на требуемую дату в прошлом.

Что касается используемых технологий, я считаю, что не так существенно, работаете ли вы всё еще на кластере Hadoop, храните данные в hdfs или в S3, использует Hive-таблицы или Aiceberg-таблицы, делаете обработку данных с помощью Spark, в Trino или просто в Pandas. Будет полезным понимать основы, перечисленные выше.

Но для начала, думаю, нужно сказать пару слов про сами SCD-2 таблицы. Если простыми словами – это специальные таблицы для сохранения истории изменений по каждой записи. За счет сохранения только изменяющихся данных мы убираем дублирование данных, имеющее место при хранении полных копий таблиц на каждую дату расчета. Таким образом, таблицы SCD-2 помогают в сотни раз сокращать объем хранимых данных.

Как работают SCD-2 таблицы

У нас на проекте есть реальная таблица, в которой каждый срез данных занимал около 4 Gb. Хранилась история за 12 месяцев, т.е. объем данных был 4 * 365 = 1460 Gb. После перевода на SCD-2 данные стали занимать всего около 5 Gb, т.е. удалось уменьшить объем данных в 292 раза.
Как будет выглядеть наша целевая SCD-2 таблица?

Основное отличие подобных таблиц от обычных таблиц – это наличие специальных полей:

  • valid_from_dt – дата начала действия новой версии записи;

  • valid_to_dt – дата окончания действия записи. Всегда будем заполнять его константой «9999-12-31», за исключением случаев, когда запись удаляется из исходной таблицы.

Итак, рассмотрим пример SCD-2 таблицы и разберемся, как в ней будет накапливаться история изменений по каждой записи.

primary_key

name

surname

has_child

has_cat

favorite_shop

valid_from_dt

valid_to_dt

123

Степан

П.

FALSE

FALSE

E345

2022-01-01

9999-12-31

123

Степан

P.

TRUE

FALSE

E345

2023-03-10

9999-12-31

123

Степан

P.

TRUE

TRUE

E345

2024-02-10

9999-12-31

123

Степан

P.

TRUE

TRUE

D123

2025-10-05

9999-12-31

111

Галина

С.

TRUE

TRUE

E255

2025-01-12

9999-12-31

111

Галина

С.

TRUE

TRUE

E255

2025-03-10

2025-03-09

В таблице выше мы видим данные по двум клиентам. У Степана история начинается с 2022-01-01. Тогда у него не было детей и домашних животных. Потом в 2023-03-10 у него появился ребенок, а 2024-2-10 он завел еще и котика. В 2025-10-05 он, видимо, куда-то переехал и у него сменился любимый магазин Х5 для покупок на D123.

История по Галине выглядит так. Она стала нашим клиентом 2025-01-12, а вот 2025-03-09 запись о ней была удалена в исходных системах, поэтому мы добавили запись, у которой valid_to_dt заполнен этой датой.

Если мы сделаем выборку записей, актуальных, например, на 2025-03-08 (получим срез данных), то увидим данные по Степану, как они отражены в строке с valid_from_dt = 2025-10-05 (последняя запись в истории изменений), по Галине состояние будет отражено в записи на 2025-01-12.

Если сделать выборку актуальных записей после 2025-03-10, то в ней уже не будет записи про Галину, останется только Степан.

Теперь разберем, какие данные подходят для хранения в SCD-2 таблицах? Например, у вас есть клиентские данные, которые обновляются несколько раз в месяц по небольшой доле клиентов. При этом вы хотите хранить историю всех изменений по каждому из них.

Еще хороший пример – таблица, в которой хранятся данные о предоставленных согласиях клиентов на получение коммуникаций по почте, sms и т.д. Мы в Х5 заботимся об этом, а потому бережно храним всю историю изменений, чтобы можно было провести анализ соблюдения контактной политики на любой день в прошлом. Как вы понимаете, данные в такой таблице тоже меняются довольно редко по незначительному числу записей.

Плохой пример – таблица, которую совсем не получится сжать, переписав в SCD-2 формат – это таблица с клиентами, по каждому из которых считаются какие-либо агрегированные метрики, например, средняя сумма покупки, число дней с последней покупки и т.д. Поскольку каждый срез будет полностью отличаться от предыдущего, то и сокращения объема хранимых данных мы совсем не получим.

Тут время прерваться и подумать – есть ли у вас подходящие данные, которые вы можете переписать в SCD-2 таблицы и сократить тем самым объем для хранения в 10-300 раз?
Итак, вы сделали анализ и поняли, что у вас есть подходящие данные, которые меняются не часто и вы хотели бы сложить их в SCD-2 таблицы для уменьшения объема для хранения. Тут возникает вопрос, как же это сделать?

Такой же вопрос возник и у меня примерно 1,5 года назад. Тогда, да и сегодня, мне не удалось найти готового решения, которое я смог бы использовать в экосистеме Hadoop. Поэтому мной был реализован несложный алгоритм сборки SCD-2 таблиц на PySpark, которым я и хочу поделиться с вами.

Прочитав статью, надеюсь, что вы без труда сможете его реализовать у себя или переписать под другой Spark API или framework, например, Pandas.

Несмотря на простоту самого алгоритма сборки SCD-2 таблиц, есть особенность, которую нужно учитывать при работе с файловой системой hdfs – неизменяемость записанных в нее данных. Если вы используете более продвинутые форматы хранения данных, позволяющие вносить изменения в существующие записи, то задача становится немного проще, но понимать, как находить дельту изменений без использования специальных полей и получать срез данных на заданную дату, всё равно нужно.

Замечу также, что данный алгоритм совсем не претендует на звание самого оптимального. Однако, это уже вторая версия его реализации. Ключевое отличие второй версии от первой в том, что сначала я пытался постоянно переписывать SCD-2 таблицу целиком для обновления в ней дат окончания действия у записей, а во второй – я всегда только добавляю записи в партицию на очередную дату расчета. Это позволило существенно ускорить работу алгоритма и сделать его намного проще.

Данный алгоритм был протестирован и работает в пилотном режиме на одном из наших проектов в Х5.

Надеюсь, что не слишком утомил вас вводной частью, но зато сэкономил время тем, для кого данная тема не слишком актуальна.

Что ж, давайте переходить к сути!

Итак, алгоритм сборки таблиц SCD-2 (назовем его так), на вход будет принимать срезы или по-другому снимки (snapshot’ы) данных. Это могут быть результаты расчетов витрин, которые вы делали ранее и сохраняли на каждую дату расчета. Или это могут быть загруженные данные из таблиц в системах-источниках, например, из вашей CRM-системы.

При первом запуске будет создана новая таблица в целевой схеме (target_db) на основе первого доступного среза данных. Далее изменения из каждого следующего среза будут автоматически добавляться, пока не будут обработаны все имеющиеся срезы данных. Если обработка срезов будет прервана по каким-то причинам, то при следующем запуске процесс продолжится автоматически cо следующего по очереди не обработанного среза данных.

На выходе из алгоритма мы получим готовую SCD-2 таблицу с историей изменений по каждой записи.

Теперь давайте разберем все детали алгоритма по шагам, чтобы вы смогли легко повторить его реализацию в своих ETL-инструментах.

В моем примере код реализован на PySpark и Python. Вся логика сборки собрана в функции main() модуля builder.py. Полную версию кода вы сможете найти на GitHub.

Шаг 1. Определяем первичный ключ в таблице-источнике.

Это состав колонок, которые мы будем использовать для однозначной идентификации записей в таблице-источнике. С помощью первичного ключа (далее ПК) мы будем находить новые и удаленные записи. Если вы не сможете определить ПК, то придется взять в качестве ПК все колонки.

Ваш составной ПК может состоять из одной колонки. При этом в ней не должно быть пустых значений, поскольку они будут все равно отброшены при соединении таблиц с помощью Spark.

На этом шаге можно сразу начать заполнение конфигурационного файла в модуле settings.py, в котором вы будете описывать параметры для каждой будущей таблицы SCD-2.

В моем примере конфигурационный файл – это модуль на Python, в котором есть класс Settings. Все параметры собраны в атрибут класса с типом dict для каждой SCD-2 таблицы по отдельности. Так удобнее импортировать все параметры в нужных местах далее.

class Settings:
    schema_name_table_name_hist = dict(
        start_date="2025-01-01",  # дата первого среза данных>
        valid_to_dt="9999-12-31",  # константа
        source_db="source_hive_db", # схема, где находятся срезы данных для обработки
        source_table="source_hive_table_name",  # имя таблицы-источника
        partition_column="dataflow_dt",  # имя колонки, по которой партицирована таблица с исходными данными
        primary_key=["name", "surname"],  # список колонок для формирования первичного ключа
        init_num_partitions=1,  # число файлов для сохранения данных исходной таблицы SCD-2
        num_partitions=10,  # число файлов для сохранения изменений
        target_db="my_hive_schema",  # схема для сохранения SCD-2 таблицы
        target_table="table_name_hist",  # имя целевой SCD-2 таблицы
        need_to_mask_pd=False,  # нужно ли заменят персональные данные на признаки
        pd_columns=[]  # список колонок, содержащих персональные данные
    )
settings = Settings()

Шаг 2. Создаем начальную таблицу SCD-2.

На данном шаге при первом запуске создается наша целевая таблица для хранения истории или мы получаем из нее дату последней загрузки данных.

Для этого мы делаем следующее:

  • проверяем, создана ли уже целевая SCD-2 таблица;

  • если нет, то берем самый первый срез данных на дату, которая указана в параметре start_date в настройках, добавляем в него служебные колонки valid_from_dt со значением как в start_date и valid_to_dt – константа «9999-12-31» (чтобы не оставлять это поле пустым);

  • записываем нашу будущую таблицу SCD-2 в нужное место.

Создание исходной таблицы SCD-2 у меня реализовано в функции create_init_table. Она не содержит ничего особенного, просто читаем нужные данные из таблицы-источника и переписываем их туда, где будет хранится целевая таблица.

Отмечу здесь важный момент – наша итоговая таблица SCD-2 будет партицирована (partitionBy) по датам загрузки (поле "dataflow_dt"). Это позволит нам в будущем переписывать данные за нужные даты в случае их изменения задним числом, как это иногда случается в реальной жизни, без перезаписи таблицы целиком.

На выходе мы получим записанную первоначальную таблицу SCD-2 или, если она уже была ранее создана, дату последнего расчета. Так мы сможем продолжить расчет при каждом новом запуске скрипта с момента, на котором остановились.

Шаг 3. Определяем, за какие даты у нас есть срезы для последующей обработки.

Пропускается, если у вас есть только один очередной срез данных.

Собирать SCD-таблицу удобно из готовых срезов на любые две даты, например, за t-1 (вчера) и t-2 (позавчера). 

Если у вас есть только один последний срез данных, то для получения предыдущего среза данных из таблицы SCD-2 нужно воспользоваться специальной функцией get_data_slice_on_target_date. 

Предыдущий срез нам нужен для нахождения изменений относительно него в новом срезе данных.

def get_data_slice_on_target_date(
    spark: SparkSession, scd_table_name: str, primary_key: list, target_dt: str
) -> DataFrame:
    scd_df = spark.table(scd_table_name)
    data_slice = scd_df \
        .where(F.col("valid_from_dt") <= F.to_date(F.lit(target_dt))) \
        .select(
            '*',
            F.row_number().over(
                Window.partitionBy(*primary_key).orderBy(F.col('valid_from_dt').desc())
            ).alias('row_number')
        ) \
        .where(F.col('row_number') == 1) \
        .where(F.col("valid_to_dt") >= F.to_date(F.lit(target_dt))) \
        .drop('row_number')
    return data_slice

При реализации функции выше важно учесть, что мы всегда только добавляем записи в SCD-2 таблицу, не меняя ранее добавленные записи в ней.

Функция принимает на вход имя таблицы SCD-2, список из колонок составного ПК (primary_key) и целевую дату (target_dt).

Внутри мы отбираем записи, которые начали действовать ДО даты target_dt включительно. Таким образом, если запись появилась после target_dt, то она будет отброшена, что нам и нужно.

Затем мы среди всех записей с одинаковым ПК отбираем самую последнюю, сортируя по дате valid_from_dt. Так мы получим последнюю версию записи на указанную дату.

В конце проверяем, чтобы запись не была удалена из таблицы до нашей целевой даты (where(F.col("valid_to_dt") >= F.to_date(F.lit(target_dt)))). Дело в том, что при удалении записей мы ничего не удаляем из SCD-2 таблицы, а добавляем запись с тем же ПК, но у нее будет заполнено поле valid_to_dt датой удаления записи в исходной таблице.

Бизнес-пользователям, скорее всего, будет удобнее использовать SQL-запрос, который можно выполнить в любом клиенте для работы с Hive или в Spark SQL.

select 
    *
from (
    select
        *,
        row_number() over (partition by {primary_key} order by valid_from_dt desc) as rn
    from 
        {target_db}.{target_table}
    where
        valid_from_dt <= '{target_dt}'               
) t    
where 
    rn = 1
    and valid_to_dt >= '{target_dt}'

Вместо параметров в {} нужно подставить соответствующие значения. 

Если же у вас есть таблица, в которой хранятся результаты расчетов по датам (есть партиции), как у меня в примере, то функция get_dates_to_process поможет получить список всех дат, на которые у нас есть партиции с расчетами.

Для этого лучше использовать команду для работы с hdfs, которую можно запускать стандартными средствами библиотеки subprocess для Python. Так будет работать быстрее, чем чтение всех партиций в таблице.

def get_dates_to_process(src_db: str, src_table: str) -> list:
    src_path = f"/apps/hive/warehouse/{src_db}.db/{src_table}"
    proc = Popen(f"hdfs dfs -ls {src_path}", shell=True, stdout=PIPE)
    std_out = proc.communicate()[0].decode('utf-8')
    date_list = sorted([x.split("=")[-1] for x in std_out.split("\n") if "=" in x])
    return date_list

На выходе данного шага мы получим список дат, на которые у нас есть готовые срезы данных в таблице-источнике.

Шаг 4. Подготавливаем предыдущий и новый срез данных для сравнения.

Вся логика этого шага реализована в функции append_delta_to_scd_table.

Сначала мы читаем предыдущий срез (если он у нас есть готовый в исходной таблице) или получаем его с помощь функции get_data_slice_on_target_date.

Далее мы добавляем наш составной ПК. Для этого используется функция add_composite_key, которая добавляет правильный композитный ПК.

Здесь важно отметить, что в составе ПК могут быть колонки, в которых будут пустые значения, которые нужно обязательно заменить на заглушки, подходящие по типам. В своей реализации я использую: *, 0, 0.0, False. 

Это важно для исключений случаев получения одинаковых строк после их конкатенации, которые изначально не являются одинаковыми. Например, у нас есть вот такая простая таблица из 4-х колонок:

Колонка 1

Колонка 2

Колонка 3

Колонка 4

CONCAT_WS

А

null

В

null

АВ

А

null

null

В

АВ

В результате конкатенации колонок в одну строку (см. колонку CONCAT_WS) мы на выходе получим одинаковые строки, хотя записи в таблице совсем не одинаковые изначально.

Поэтому нужно сделать замену пустых значений на *, например. 

Получаем в итоге: A*B* != A**B

В функции мы сначала делам копии колонок, чтобы не испортить исходные данные заменой пустых значений.

Затем заполняем пустые значения заглушками (fillna) и собираем составной ключ в колонку с именем «composite_key». В конце удаляем созданные нами копии колонок.

def add_composite_key(input_df: DataFrame, primary_key: list) -> DataFrame:
    composite_df = input_df
    for col_nm in primary_key:
        composite_df = composite_df.withColumn(f'{col_nm}_copy', F.col(col_nm))

    composite_df = composite_df \
        .fillna(0, primary_key) \
        .fillna('*', primary_key) \
        .fillna(0.0, primary_key) \
        .fillna(False, primary_key) \
        .select(
            '*',
            F.concat_ws('|', *primary_key).alias('composite_key')
        )
    composite_df = composite_df.drop(*primary_key)

    for col_nm in primary_key:
        composite_df = composite_df.withColumnRenamed(f'{col_nm}_copy', col_nm)

    return composite_df

Также мы добавляем технические поля tech_hash_id, valid_from_dt к тому же срезу данных. Для этого используется функция add_tech_columns.

def add_tech_columns(input_df: DataFrame, valid_from_dt: str, valid_to_dt: str, hash_columns: list) -> DataFrame:
    hashed_df = input_df \
        .fillna(0) \
        .fillna('*') \
        .fillna(0.0) \
        .fillna(False) \
        .select(
            'composite_key',
            F.concat_ws('|', *hash_columns).alias('concat_ws_col')
        ) \
        .select(
            'composite_key',
            F.md5(F.col('concat_ws_col')).alias('tech_hash_id'),
            F.to_date(F.lit(valid_from_dt)).alias('valid_from_dt'),
            F.to_date(F.lit(valid_to_dt)).alias('valid_to_dt')
        )
    return hashed_df

В колонке tech_hash_id будет храниться хеш-сумма по каждой строке. Она нам потребуется дальше, чтобы найти изменившиеся записи.

Перед хешированием записей все пустые значения тоже нужно предварительно заменить на любые не пустые значения подходящего типа, как мы это делали при сборке композитного ключа выше.

На выходе из шага мы получаем два Spark DataFrame: текущий (последний) срез данных и предыдущий срез, в которых добавлены все необходимые технические колонки.

Шаг 6. Заменяем персональные данные на признаки.

Пропускается, если в наших данных из предыдущего шага нет персональными данных, которые вы разметили в конфигурационном файле.

Если в вашей таблице есть персональные данные, которые вы хотите из нее убрать, то на этом шаге будет уместно сделать это, на мой взгляд. Например, это может пригодиться, если вы хотите иметь две таблицы: с персональными данными (в отдельной строго охраняемой схеме) и без них – в схеме с общими правилами управления доступом. 

Для этой задачи мной была реализована функция mask_personal_data, которая в колонки из вашего конфигурационного файла вместо реальных значений подставляет признак наличия значения или его отсутствия. 

def mask_personal_data(df: DataFrame, column_names: list) -> DataFrame:
    output = df
    for column_name in column_names:
        output = output.withColumn(column_name, 
            F.when(F.col(column_name).isNull(), False).otherwise(F.lit(True)).cast(BooleanType())
        )
    return output

Вы, конечно, можете сделать более сложную реализацию для маскирования персональных данных при необходимости.

На выходе мы получим два среза данных (предыдущий и последний)  но уже без персональных данных.

Шаг 7. Находим новые и удаленные записи.

Для этого срезы, полученные в предыдущем шаге, соединяются (join) между собой по колонке composite_key, что мы добавили в шаге 4. 

Для упрощения процесса поиска записей, которые появились в новом срезе или которых в нем уже больше нет, используется удобный тип соединения leftanti, который легко покажет нам все записи из левой таблицы, которых нет в правой таблице.

new_rows = current_pre_df \
        .join(previous_pre_df.select('composite_key'), on='composite_key', how='leftanti')

deleted_rows = previous_pre_df \
        .join(current_pre_df.select('composite_key'), on='composite_key', how='leftanti')

В коде выше:

  • current_pre_df – это предварительно подготовленный текущий или новый срез данных; 

  • previous_pre_df – предыдущий срез данных (t-2, как правило).

Шаг 8. Находим изменившиеся записи.

Поиск изменений мы будем делать, сравнивая хеш-суммы построчно для записей с одинаковыми ПК. При выборе алгоритма хеширования рекомендую обратить внимание на вероятность возникновения коллизий (когда для разных строк у нас будут получаться одинаковые хеш-суммы) и скорость его работы.

В качестве функции хеширования мной используется алгоритм md5, входящий в стандартный состав функций библиотеки pyspark.sql.

Шаг 9. Дописываем все найденные изменения в целевую SCD-2 таблицу.

Здесь мы объединяем (unionByName) новые, удаленные и изменившиеся записи в один общий датафрейм.

Напомню, что мы ничего не меняем в нашей SCD-2 таблице, а всегда только добавляем в нее изменения в очередную партицию с датой расчета. Это особенно актуально для hdfs и популярных форматов parquet и orc. 

Если вы используете, например, Aiceberg-таблицы (https://iceberg.apache.org/) или формат Delta Lake (https://docs.delta.io/latest/delta-intro.html), то здесь вы вполне можете сделать обновление дат окончания срок действия (valid_to_dt) для изменившихся и удаленных записей вместо добавления новых записей с тем же ПК. Так вы сможете еще уменьшить объем хранимых данных.

Шаг 10. Удаляем обработанные срезы данных.

Наиболее оптимальным решением, на мой взгляд, будет поддерживать некоторую глубину истории в виде привычных для пользователей полных срезов на каждую дату расчета. А вот данные, например, старше шести месяцев, можно уже складывать в SCD-2 таблицу, используя ее как архив. Так мы дополнительно обезопасим себя необходимости вносить изменения в SCD-2 и повысим удобство работы для пользователей.

В усложнении процедуры получения среза данных на нужную дату заключается основной недостаток таких таблиц. Но остальные преимущества намного более весомые, на мой взгляд.

Вот, в целом, и весь алгоритм. Остается повторить шаги 4-10 для каждого имеющегося у нас для обработки среза данных.

В заключении пару слов по поводу автоматизации сборки SCD-2 таблиц

Мы на нашем проекте регулярно считаем большое количество разных витрин. Сначала складываем каждый расчет в виде среза на дату расчета. 

Далее подходящие (медленно меняющиеся данные) сворачиваем в SCD-2 таблицы с помощь Airflow в отдельном общем DAG-е. Код для DAG-а есть также в репозитории.

Чтобы добавить новую таблицу в DAG сборки, достаточно заполнить параметры для нее в конфигурационном файле. 

Для включения автозамены персональных данных нужно параметр need_to_mask_pd сделать True и перечислить поля, содержащие персональные данные в параметре pd_columns.

Далее весть процесс обработки всех доступных срезов данных и загрузки изменений в SCD-2 происходит автоматизированно. 

Надеюсь, что статья оказалась для вас полезной? Теперь вы еще лучше знаете, как работать с таблицами SCD-2 и освоили их самостоятельную сборку.

Буду рад вашим отзывам и предложениям в комментариях по улучшению алгоритма. Думаю, что вместе мы сможем сделать его еще лучше.

Повторю, что полный код реализации вы сможете найти на GitHub.

Теги:
Хабы:
+4
Комментарии2

Публикации

Информация

Сайт
x5.tech
Дата регистрации
Дата основания
2006
Численность
свыше 10 000 человек
Местоположение
Россия