Привет, Хабр!
Сегодня я расскажу, как с помощью Liquibase, GitLab и немного Python настроить прозрачный, безопасный и удобный процесс миграций для ClickHouse кластера.


Для Clickhouse кластера создать прозрачный инструмент для сборки, проверки и доставки миграций на окружения, исключив ручные изменения в БД.
Возможность отката миграций в один клик.
Простота: оставить разработчикам возможность писать миграции на привычном SQL.
Liquibase
Liquibase — универсальный инструмент для работы с миграциями, который поддерживает множество БД. Но есть только один маленький нюанс (как всегда) – из коробки Liquibase не умеет работать с ClickHouse кластером. Для решения этой проблемы был найден плагин Liquibase-ClickHouse от Arenadata. Плагин добавляет поддержку кластера, Liquibase создаст собственные таблицы как реплицируемые, добавляется возможность использовать ON CLUSTER в SQL скриптах.
Единственная проблема, которую не решает плагин – поддержка shared для системных таблиц Liquibase. Если ваш кластер, как и наш, настроен с несколькими shared, то имейте в виду, что системные таблицы Liquibase будут созданы лишь на том шарде и его нодах, к которому было создано подключение.
Давайте рассмотрим, как же работает процесс создания миграций с помощью Liquibase.
Liquibase использует файл changelog.xml для управления миграциями. Каждая миграция записывается как отдельный changeSet.
Плагин Liquibase-ClickHouse Arenadata обеспечивает поддержку SQL-операций с кластером, а путь до SQL-скриптов миграций можно описать changelog.xml с помощью конструкции sqlFile path="path/example_path". Таким образом, мы оставляем разработчикам возможность использовать знакомый SQL вместо изучения специфического формата Liquibase.
С помощью конструкции <tagDatabase tag="1"> мы можем фиксировать версию изменений, а значит, в дальнейшем сможем до нее откатиться.
Чтобы иметь возможность сделать rollback, после добавления скрипта создания миграций с помощью sqlFile добавляем в каждый changeSet блок rollback, в котором указываем путь до такого же sql скрипта, но уже с описанием процесса отката.
Приведу пример changelog.xml файла, который обеспечивает все описанные выше пункты.
<?xml version="1.0" encoding="UTF-8"?> <databaseChangeLog xmlns="http://www.liquibase.org/xml/ns/dbchangelog" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.liquibase.org/xml/ns/dbchangelog http://www.liquibase.org/xml/ns/dbchangelog/dbchangelog-3.8.xsd"> <changeSet id="1" author="ivanov"> <tagDatabase tag="1"/> </changeSet> <changeSet id="2" author="petrov"> <sqlFile path="migration/1_create_first_table.sql" relativeToChangelogFile="true"/> <rollback> <sqlFile path="rollback/1_delete_first_table.sql" relativeToChangelogFile="true"/> </rollback> </changeSet> <changeSet id="3" author="petrov"> <tagDatabase tag="2"/> </changeSet> </databaseChangeLog>
GitLab CI/CD: автоматизация миграций
Весь процесс доставки миграций ложится на плечи через GitLab CI/CD. Полный flow Pipeline показывать не будем, остановимся подробнее на 3х stage, которые непосредственно относятся к теме статьи.
Validate ChangeLog.xml
Так как changelog.xml разработчики заполняют вручную, нужно предотвратить возможные ошибки.
Что валидируем? Разберем на примере changelog.xml, который я привел выше.
Уникальность и правильную последовательность changeSet ID

Уникальность и валидность (все теги целое число, без sevmer и т.д.) тегов tagDatabase

Наличие блока
<rollback>для каждого changeSet.
Так как за все сложные части CICD процессов у нас отвечает cli утилита на Python, то и валидацию changelog.xml было доверено проводить ей. Часть, которая отвечает за валидацию может быть полезна:
код туть
import logging from enum import StrEnum from functools import cached_property from lxml import etree class ValidateUniqIdsError(Exception): def __str__(self): return ( "Валидация уникальности changeset ID не пройдена! " "Проверьте, что все id changeset'ов являются уникальными." ) class ValidateChangesetIdsAscendingError(Exception): def __str__(self): return ( "Валидация корректной последовательности ID changeset бд не пройдена! " "Проверьте, что все ID указаны в порядке возрастания." ) class ValidateTagVersionsAscendingError(Exception): def __str__(self): return ( "Валидация правильной последовательности tag version схемы БД не пройдена! Проверьте, что все tag " "version указаны в порядке возрастания." ) class ValidateUniqTagVersionsError(Exception): def __str__(self): return ( "Валидация уникальности tag version схемы БД не пройдена. Проверьте, что все tag " "version являются уникальными." ) class ValidateAreValuesIncrementingByOneError(Exception): def __init__(self, tag_id: int): self.tag_id = tag_id def __str__(self): return ( f"TAG: {self.tag_id} больше предыдущего TAG'а более чем на 1. " f"Из-за этого будет невозможно выполнить корректный rollback до предыдущей версии БД." ) class ValidateExistenceRollbackInChangesetError(Exception): def __init__(self, changeset_id: int): self.changeset_id = changeset_id def __str__(self): return ( f"в ChangeSet c ID: {self.changeset_id} необходимо добавить <rollback></rollback> " f"блок с SQL скриптом отката таблицы." ) class LiquiShit(StrEnum): common = "{http://www.liquibase.org/xml/ns/dbchangelog}" rollback = f"{common}rollback" sql_file = f"{common}sqlFile" changeset = f"{common}changeSet" tag_database = f"{common}tagDatabase" class LiquibaseChangelogWorker: def __init__(self, changelog_filename: str): self.changelog_filename = changelog_filename @cached_property def xml_tree(self): return etree.parse(self.changelog_filename).getroot() @cached_property def all_changesets(self): return self.xml_tree.findall(LiquiShit.changeset) @cached_property def all_tags(self): return [ int(changeset[0].get("tag")) for changeset in self.all_changesets if changeset.findall(LiquiShit.tag_database) ] @cached_property def all_sql_changesets(self): return [ changeset for changeset in self.all_changesets if changeset.findall(LiquiShit.sql_file) ] @cached_property def all_changeset_ids(self) -> list: return [ int(changeset.attrib.get("id")) for changeset in self.all_changesets ] def get_last_tag(self) -> str: return str(self.all_tags[-1]) def validate_uniq_ids(self) -> None: if len(self.all_changeset_ids) == len(set(self.all_changeset_ids)): logging.info("Валидация uniq changeset ID прошла успешно!") return raise ValidateUniqIdsError def validate_changeset_ids_ascending(self): if self.all_changeset_ids == sorted(self.all_changeset_ids): logging.info( "Валидация корректной последовательности ID changeset прошла успешно!" ) return raise ValidateChangesetIdsAscendingError def validate_tag_versions_ascending(self) -> None: if self.all_tags == sorted(self.all_tags): logging.info( "Валидация последовательности tag version схемы БД прошла успешно!" ) return raise ValidateTagVersionsAscendingError def validate_uniq_tag_versions(self) -> None: if len(self.all_tags) == len(set(self.all_tags)): logging.info( "Валидация уникальности tag version схемы БД прошла успешно!" ) return raise ValidateUniqTagVersionsError def validate_are_values_incrementing_by_one(self) -> None: for index, _tag in enumerate(self.all_tags[:-1]): if self.all_tags[index] + 1 != self.all_tags[index + 1]: raise ValidateAreValuesIncrementingByOneError( self.all_tags[index + 1] ) logging.info( "Валидация на корректное повышение tag version прошла успешно!" ) def validate_existence_rollback_in_changeset(self) -> None: for sql_changeset in self.all_sql_changesets: if not sql_changeset.findall(LiquiShit.rollback): raise ValidateExistenceRollbackInChangesetError( sql_changeset.attrib.get("id") ) logging.info( "Валидация на наличие в ChangeSet блока с rollback прошла успешно!" )
Deploy Dev, Stage, Prod. Выкатываем изменения на контур dev/stg/prod
script: - liquibase update --changelog-file=changelog-master.xml --username=$CLICKHOUSE_USER --password=$CLICKHOUSE_PASSWORD --url=jdbc:clickhouse://$CLICKHOUSE_HOST:$CLICKHOUSE_PORT/$CLICKHOUSE_DB_NAME --driver=com.clickhouse.jdbc.ClickHouseDriver --defaultSchemaName=$CLICKHOUSE_DB_NAME --logLevel=OFF
Rollback Dev, Stage, Prod
Благодаря добавленным блокам <rollback> , наличию и описанному разработчиком в SQL формате процессу отката, tagDatabase GitlabJob в случае необходимости будет откатывать примененные изменения до предыдущей версии (для этого нам и пригодились tagDatabase). Также благодаря жесткой валидации changelog.xml мы всегда уверены, что предыдущая версия БД на 1 меньше текущей, что значительно облегчает логику скрипта rollback
script: - ROLLBACK_TO_VERSION=$((LAST_TAG_DB-1)) - liquibase rollback --tag=${ROLLBACK_TO_VERSION} --changelog-file=changelog-master.xml --username=$CLICKHOUSE_USER --password=$CLICKHOUSE_PASSWORD --url=jdbc:clickhouse://$CLICKHOUSE_HOST:$CLICKHOUSE_PORT/$CLICKHOUSE_DB_NAME --driver=com.clickhouse.jdbc.ClickHouseDriver --defaultSchemaName=$CLICKHOUSE_DB_NAME --logLevel=OFF
P.S: откуда берется LAST_TAG_DB спросите Вы? В блоке кода есть метод get_last_tag, который вычисляет последний tagDatabase. После чего мы складываем build.env пайплайна так:
def save_last_tag(self): with open("build.env", "w") as f: f.write( f"LAST_TAG_DB={self.liquibase_changelog_validator.get_last_tag()}\n" )

Полная автоматизация миграций, от проверки до применения.
Поддержка кластерных миграций ClickHouse благодаря Liquibase + добрые люди.
Минимизация ошибок благодаря валидации.
Прозрачность: каждый changeSet документирован, а состояние БД можно откатить в случае необходимости с помощью нажатия кнопки в Gitlab Pipeline
Мы успешно применили этот подход для автоматизации миграций ClickHouse кластера. Хотелось бы услышать, как в других командах решают подобные задачи, делитесь в комментариях!
