Сейчас Data Warehouse в Авито — это инсталляция на 32 серверах. Мы используем девятую версию Vertica и ClickHouse. В команде, которая отвечает за хранилище, работает 21 человек. Поток событий, который мы загружаем, достигает 20 миллионов событий в минуту.
Я расскажу, как менялось наше хранилище аналитических данных с 2013 года. Обрисую, что мы имели в начале пути, поделюсь историей борьбы за производительность, и покажу, к чему мы пришли. О части вещей мы уже говорили в статье о продуктовом подходе к DWH, но я сделаю больший акцент на технические детали и преодоление технических трудностей.
2013-2017. Начало пути
В далёком 2013 году мы выбрали методологию anchor modeling, чтобы построить наше хранилище. Это супернормализованная модель данных, где каждый атрибут хранится в отдельной таблице. Все таблицы состоят из трёх колонок: ID объекта, значения атрибута и времени, когда значение стало актуальным.
Плюс такого подхода в том, что изменения модели данных на источнике не приводят к перестроению всего, что пользуется этими данными в хранилище. При добавлении нового атрибута вы просто добавляете новую узкую таблицу, никакие старые таблицы не меняются. При изменении атрибута вы просто меняете одну узкую таблицу, не затрагивая другие. При удалении атрибута вы не делаете ничего. Лёгкость администрирования поражает. Также при супернормализованной модели данных мы не храним лишнее: каждый атрибут хранится отдельно и записывается только в том случае, если его значение изменилось с прошлой загрузки.
Возможно, это вызывает культурный шок у человека, который смотрит на хранилище в первый раз. Каждый раз при сборе следующего отчёта нужно заджойнить несколько десятков таблиц. Это может казаться безумием, но Vertica отлично умеет джойнить, поэтому сценарий прекрасно работает.
Как выглядела загрузка данных в 2013 году? В нашем Git-репозитории тогда лежали всего несколько скриптов, построенных на семи типовых операциях. Скрипты позволяли загружать данные в узкие таблички в супернормальной форме. У нас было много загрузок из разных систем источников и скрипты для обсчёта витрин или ежедневных отчётов, которые запускались после того, как все загрузки завершились. Скрипты выглядели примерно так:
loaders = []
loaders.append(Process(target = elt, args = (
ELT_Load('location_geoCodes', launch_id, conn_string),)))
loaders.append(Process(target = elt, args = (
ELT_Load('v_UniqUser', launch_id, conn_string),)))
loaders.append(Process(target = elt, args = (
ELT_Load('category', launch_id, conn_string),)))
loaders.append(Process(target = elt, args = (
ELT_Load('locations', launch_id, conn_string),)))
loaders.append(Process(target = elt, args = (
ELT_Load('users_statuses', launch_id, conn_string),)))
loaders.append(Process(target = elt, args = (
ELT_Load('items', launch_id, conn_string),)))
loaders.append(Process(target = elt, args = (
ELT_Load('users', launch_id, conn_string),)))
loaders.append(Process(target = elt, args = (
ELT_Load('paysys_operations', launch_id, conn_string),)))
map(Process.start, loaders)
map(Process.join, loaders)
Это длинная «простыня» из копипасты, много-много отдельных заданий, которые запускаются в каком-то пуле. Пулов несколько, они запускаются последовательно друг за другом.
Примерно так DWH продолжало выглядеть на протяжении четырёх лет. В 2016 году мы поняли, что встраиваться в скрипты становится сложно. Допустим, если мы хотим добавить новую витрину в скрипт расчёта витрин, нужно найти правильную строчку и добавить новую витрину так, чтобы при запуске все её зависимости уже были готовы, и при этом она стартовала максимально рано.
Пришла пора избавляться от «простыней» в файлах, чтобы сделать нашу жизнь лучше. Первое, что мы сделали, — это внедрили своё single-app приложение, которое назвали Dependency Walker. Внутри себя оно делает генерацию и планировку заданий и запускает несколько тредов, чтобы эти задания выполнять. И очередь, и планировщик, и воркеры живут в одном приложении и разделяют одну общую память. Планировщик заданий построен на системной табличке Projection Usage. Подход с Dependency Walker работал целых четыре года, пока в 2020 мы не решили написать свой распределённый планировщик заданий.
Следующая проблема, с которой мы столкнулись, — это шестая нормальная форма и трудности работы с ней. В 2017 году у нас уже были десятки тысяч таблиц в базе и порядка 100 витрин. Аналитики жаловались, что им сложно что-то найти. Поэтому мы решили сделать корпоративный портал dwh-docs, который упростил бы поиск данных.
Работало и до сих пор работает это следующим образом. У нас есть Vertica, в которую мы загружаем данные из множества источников, в том числе из связанных с workflow внутри компании.
Мы объединяем данные, начиная от задачи в Jira по построению нового отчёта или новой витрины и заканчивая логом расчётов. Данные загружаются в Vertica, и мы представляем их в поисковом движке. С его помощью пользователи DWH могут:
Найти таблицу и увидеть результат того, как она последний раз посчиталась.
Узнать, насколько можно верить этим данным, прошёл ли расчёт проверку.
Подписаться на изменения в случае, если какая-то из проверок целостности была нарушена. Человеку сразу приходит оповещение, если с данными что-то не так, чтобы он на это отреагировал.
2017-2018. Борьба за секунды
В 2017 году началась борьба за секунды. При очередном расширении кластера мы добавили пару нод и поняли, что хранилище перестало нормально работать: всё лагало и тормозило. В первый раз появилась проблема с тем, что мы не успевали загружать данные. Как следствие, мы позже запускали ежедневные расчёты витрин, и они накладывались на следующий день загрузки данных. Это копилось, и получился неприятный снежный ком, от которого нужно было избавиться.
Мы начали уделять больше времени оптимизации. Чтобы понять, что пошло не так, настроили мониторинг в Grafana на таблицах Data Collector. Так мы обнаружили, что главная проблема тормозов в том, что мы ожидали локов по 5 минут и пытались в моменте побороть симптомы, а именно максимально сократить количество транзакций.
Мы переписали скрипты так, чтобы они требовали минимального количества обращений к каталогу, и сократили в два раза количество транзакций. Это позволило жить какое-то время.
В 2018 году мы вернулись к этой проблеме и переписали самые тяжёлые загрузки, чтобы они меньше походили на «простыню» с копипастой из табличек в Excel. В итоге получились более короткие скрипты, которые берут метаданные из общего места и оптимально генерируют низкоуровневые задания на загрузку в узкие таблицы.
Казалось бы, всё заработало, но аналитики всё равно страдали и приходили к нам с жалобами, что хранилище очень медленное. Мы увидели, что простейшие запросы типа select *
из таблицы с лимитом выполняются минутами. Мы пытались разобраться, что не так, и увидели, что DBeaver и DataGrip, которыми пользуются наши пользователи, ведут себя агрессивно по отношению к Vertica. На каждый запрос клиенты пытались достать какие-то метаданные и понять, какие колонки только что вернул запрос, какие колонки были в таблицах из запроса и так далее.
Мы посмотрели, сколько времени тратит Vertica на выполнение запросов от аналитиков, и какая доля от этого времени уходит на запросы к метаданным. Ужаснулись, увидев, что это целых 30%, и решили, что с этим надо что-то сделать.
Сначала мы подумали об административном решении проблемы: можно пройтись по коллегам и заставить их правильно настроить клиенты. Но так от запросов к метаданным всё равно бы не получилось избавиться. Плюс, были смежные задачи, которые оставались бы нерешёнными, например, распределение запросов между серверами и перехват плохих запросов. Поэтому мы решили подойти к вопросу более системно, и сделали свою прослойку между клиентскими приложениями и Vertica.
Мы воспользовались тем, что Vertica в части клиентских библиотек и в части клиентского протокола очень похожа на Postgres. Мы выбирали между PgBouncer и Crunchy Proxy. Выбрали Crunchy Proxy, сделали из него форк: переписали так, чтобы он начал работать с Vertica.
В итоге, в прокси приходит запрос, прокси смотрит, что просили в запросе и видит SQL. В зависимости от запроса, он перенаправляет его в один из возможных бэкендов. Если прокси видит, что запрос в каталог, то перенаправляет его сразу же в Postgres, куда мы сложили копию каталога. Если он видит, что это какой-то страшный запрос, который кладёт наш кластер, то просто его отменяет. Если же видит, что запрос нормальный, то по round robin распределяет его к одной из нод Vertica.
Аналитики вздохнули с облегчением. Но уже год спустя, в 2018, мы поняли, что ресурсов снова не хватает. Аналитиков становилось всё больше, количество книг в Tableau росло непрерывно и достигло 6 000. Мы решили поднять реплику нашего кластера для того, чтобы аналитики могли разрабатывать там новые запросы и отчёты, и чтобы у них всё летало.
Мы развернули реплику. Со временем у нас появились сервисы, которые позволяли загружать данные одновременно в обе «Вертики». Все обсчёты и продовая нагрузка от сервисов остались на мастере, а пользователи смогли разрабатывать новые отчёты на базе, где им не нужно бороться за ресурсы с продовой нагрузкой.
Здесь вздохнули с облегчением уже мы: Vertica начала показывать скорости, которые мы от неё ожидали и которые видели в начале пути.
2018. Борьба за место
Следующая проблема, которая перед нами встала, — грядущее расширение и занятое место на дисках перед этим расширением. Мы попробовали расшириться, и поняли, что накопился огромный техдолг, от которого нужно избавиться. Мы добавили две ноды, и увидели, что потребление дисков упало всего на 10%. Кажется, что всё было зря, и что-то мы сделали не так.
Весь 2018 год мы скреблись о потолок по месту. Но при этом увидели, что в Vertica есть кодировки, и поняли, что до этого не обращали на них внимание. Мы попробовали сжать наши таблицы и оказалось, что эффект от сжатия одной был таким же, как от добавления двух нод. В итоге 2018 год получилось продержаться на тех же самых 80% занятого места на дисках за счёт сжатия таблиц, несмотря на то, что рост по данным продолжался. Мы ужимались, ужимались и ужимались.
Мы сделали собственный предсказатель для кодировок таблиц, чтобы не выбирать их руками. Мы знали, сколько весит каждая таблица и каждая колонка в ней, и знали, какая там сейчас кодировка. Мы собирали эти данные и строили модель предсказания на XGBoost. Модель говорила, на каких сжатиях лучше оценка по занимаемому месту, и мы постепенно переводили все таблицы.
Дальше мы подумали, что раз храним очень много данных, то точно есть те, которые мы трогаем редко. Их не обязательно хранить в Vertica, а можно выгружать куда-то во внешнюю систему. До 2018 года мы выгружали данные в CSV, и они занимали очень много места на дисках. Мы решили попробовать открытые колоночные форматы, например, ORC и Parquet.
Мы потестили и увидели, что ORC хорошо жмётся, лучше, чем Parquet. И увидели, что в ORC хранится статистика по блокам, что очень полезно. Плюс на тестах стало понятно, что ORC быстрее Parquet на тех запросах, которые мы писали. Поэтому мы решили пойти в его сторону.
В Vertica отсутствует расширение, которое позволяет выгружать данные в ORC. Но, недолго поискав, мы нашли расширение Export Data, которое позволяет выгружать локальные данные с каждого узла. И нашли оригинальную библиотеку Apache ORC, которая реализует стандарт. Мы скрестили их, и начали выгружать архивные партиции в каждый локальный кусочек в виде CSV, а затем перегонять его в ORC.
В 2020 году мы попробовали формировать ORC на лету из событий, которые получаем на входе, и у нас это получилось. В Vertica 11 появилась поддержка выгрузки в ORC, и мы планируем продолжать эксперименты с ORC для работы с холодными данными. В 2022 году планируем заняться разработкой нового решения для работы с холодными данными и сделать безболезненным переключение между горячими и холодными данными в наших расчётах.
2018-2019. Рост пользователей
Мы побороли проблемы, научились компактно храниться и использовать Vertica по назначению. Запросы снова летали, и всё было хорошо. Но мы понимали, что пользователи хранилища ограничены кругом аналитиков, а мы хотели, чтобы им мог пользоваться любой сотрудник Авито. Поэтому новой главной целью стал рост пользователей.
В 2018 году мы увидели, что задачи на создание новых отчётов, которые мы называем условно «витрины», мы делаем довольно долго. Сначала аналитики тратят спринт, чтобы сформулировать задачу и предложить первоначальный вариант расчёта. После этого задача попадает в спринт DWH, где расчёт перерабатывается, и из него делается что-то похожее на регламентный расчёт. После этого задача ставится на регламент, возвращается обратно аналитику, и аналитик пытается построить отчёт в Tableau или решить свою задачу. Получается по меньшей мере три недели. Это долго.
Мы хотели, чтобы всё было быстрее. Количество аналитиков росло, а количество DWH-инженеров росло не так быстро. Поэтому мы решили изобрести реестр витрин, сделать его расширяемым, зафиксировать и регламентировать формат SQL для витрин:
У нас есть шапка с параметрами для расчёта, есть DDL создания витрины, и есть код расчёта витрины. После этого настаиваем тесты и отдаём создание витрины аналитику. Мы выкатили новый флоу разработки витрин, и нагрузка на команду DWH сократилась. Мы живём в этой модели по сей день.
Мы дали карт-бланш аналитикам, но они не всегда отслеживали, насколько хорошо или плохо что-то делали. Мы решили снять эту проблему с помощью Slack-бота, который делает оповещение о плохих запросах. Если у аналитиков что-то плохо или как-то напрягает Vertica, то мы высылали уведомление, что что-то идёт не так. Мы собирали таблицы из Data Collector, в которых были интересующие нас фичи, и настраивали на них трэшхолды. Когда трэшхолды пробивались, мы высылали оповещение.
Чтобы включить в оповещение обфусцированные запросы от Tableau, мы научились их парсить и связывать с базой известных нам запросов, которые стоят на регламенте. Если это разработка нового отчёта, мы увидим имя пользователя ОС и найдём его. Если запрос из старого отчёта, который вдруг деградировал, то его мы тоже найдём, потому что свяжем с отчётами, которые есть в Tableau.
В 2019 году мы взяли в работу новую цель — снижение порога входа в хранилище. Им было сложно пользоваться ещё и потому, что сложно к нему подключиться. Пользователю нужно было настроить VPN, установить и настроить клиентские драйверы, установить клиентское приложение. И даже потом у пользователя будет периодически отваливаться сеть и будут теряться временные таблички. Плюс клиентские приложения не думают о том, чтобы их работа не оказывала влияния на СУБД, которая под ними стоит. Они просто кидают тысячи и десятки тысяч запросов к метаданным. Это плохо.
Мы понимали, что можем полечить это, просто написав свой клиент. Мы посмотрели, какие существуют в open source возможности написать свой веб-клиент к хранилищу, и выбрали из них SQLPad как наиболее простой на тот момент. Мы переписали бэкенд на Python и написали расширения и драйверы, которые позволяют подключаться к Vertica, Postgres и ClickHouse.
В своём sqlpad мы даём пользователям:
Базовую функциональность клиента, с автодополнениями и табличным просмотром.
Возможность возвращать пользователя в его сессию, потому что держим сессию за него.
Возможность сохранять запросы и делиться ими, пулить и пушить изменения в stash.
Возможность скачивать результаты выполнения запросов, формировать из запроса датасорсы для Tableau или сразу же из клиента говорить: «Построй мне график в Tableau по тому запросу, который я только что написал». Клиент сразу открывает Tableau, логинится и открывает график.
Следующим шагом для увеличения возможностей аналитиков делать привычные вещи было открытие расширений в Vertica. Аналитики — люди, которые любят Python. Периодически у них возникают задачи, которые, по их словам, невозможно решить в SQL.
В 2019 году Vertica в версии 9.2 выпустила возможность писать парсеры и трансформирующие UDF на Python. Мы пустили в Vertica аналитиков, и за год увидели настоящий бум: у нас уже 32 библиотеки и 50 функций. Они пользуются этой возможностью и пишут функции, когда не могут сделать что-то с помощью SQL. А возможность писать парсеры на Python позволила DWH-инженерам в дальнейшем упрощать загрузки из внешних систем.
В итоге у нас есть свои UDF для доступа к MongoDB, MySQL, Kafka, ClickHouse, Salesforce, для быстрого парсинга json-ов, для эффективного расчёта count distinct-ов, для разных других задач обработки данных.
2020. DWH в нескольких дата-центрах
В 2020 году нас ждал разъезд в несколько дата-центров. Общая схема примерно следующая:
У нас есть несколько классов наших инструментов. Один класс обеспечивает загрузку и обсчёт: это Cron, сервис расчёта витрин, сервисы загрузки данных. Другой класс — это входные точки в виде прокси и сервиса выгрузки данных для внешних сервисов. И третий класс — это инструменты для пользователя, которые позволяют нам быстро и легко решать свои задачи.
Мы знаем, что для узлов хранилища в текущей конфигурации принципиально важно иметь быстрый доступ к соседу. Поэтому для нас выбор, как именно разрезать хранилище, не стоял. Если всё, что на схеме обозначено оранжевым, мы можем легко переподнять, то над жёлтыми прямоугольниками надо поработать и сделать всё правильно.
Мы увезли реплику в другой дата-центр. Нам сказали: «Вот вам 6 гигабит между дата-центрами, пожалуйста, пользуйтесь, и ни в чём себе не отказывайте». С тех пор многое изменилось, но в тот момент мы поняли, что здесь есть определённые трудности, и сделали несколько исправлений.
В Vertica есть утилита VBR, которая выполняет репликацию. У неё есть некоторое количество ограничений. Например, первое из них — нельзя динамически изменять ограничение на килобайты в секунду в случае, когда под конец репликации остаётся только одна нода из тридцати. Нам нужно, чтобы этой одной ноде отдался весь канал. И мы этого добились. Плюс к этому мы понимаем, что не хотим гонять данные между дата-центрами зазря. Лучше хранить локальные бэкапы перед тем, как копировать данные с мастера, проверять, что файлик с данными есть в бэкапе, и тащить его оттуда.
Мы сделали пару патчей для VBR и попробовали их раскатить. Мы взяли один бэкап-сервер, а копировать с одного сервера оказалось примерно так же, как копировать по сети все наши десятки терабайт, которые ложатся на каждую ноду. Но мы понимаем, что если размажем бэкап по нескольким серверам, то это действительно будет быстрее, и наверное, так мы и сделаем, когда до этого доберёмся.
Плюс мы наткнулись на неприятную неожиданность с тем, что мы копируем данные, проходят сутки, и в конце мы видим сообщение “DDL statement interfered”. И до свидания все часы и сутки копирования. В итоге мы снова посмотрели на симптомы и пытались их вылечить.
Мы нашли запросы, которые мешают восстанавливать данные при репликации, и сделали так, чтобы этих запросов не было. В итоге сейчас всё более-менее штатно работает. Но мы имеем в виду что репликация — нежная штука, с которой нужно быть аккуратными, особенно когда вы разъезжаетесь в несколько дата-центров.
2021. Vertica + ClickHouse
2020 год продолжился для нас поиском удобного инструмента для realtime-аналитики. А потом случились то, что затронуло нас всех и закончилось ещё большим ростом количества данных, чем мы ожидали. Нам пришлось переформатироваться из выбора инструмента для realtime в поиск новых способов хранения таких объёмов данных.
Мы и раньше понимали, что есть один источник данных, который сильно отличается от остальных. Это пользовательские события, которые мы называем словом clickstream. В тот момент непосредственно сырые события составляли четверть того, что мы храним в DWH. Мы часто сталкивались с проблемами этой загрузки и много экспериментировали с ней в прошлом:
Начинали с Fluentd и множества различных монг в качестве транспорта — разные коллекции под разное предназначение.
В какой-то момент осознали, что все события одинаковые и удобнее работать с ними как с одним большим потоком, использовать реестр событий и SDK для их логирования.
Изобрели собственный способ хранения событий в виде массивов полей в Vertica, чтобы обеспечить загрузку данных с минимальной задержкой, найдя подходящий для нас баланс между скоростью записи и чтения.
Пересмотрели модель данных событий в DWH, чтобы обеспечить для них максимально эффективное сжатие.
Отошли от Anchor Modeling для хранения атрибутов событий и объединили обязательные атрибуты в одной таблице. Подробнее о дилемме моделирования событий в Anchor Modeling недавно писал Николай Голов в своей новой статье.
Но в какой-то момент наступило понимание, что всего этого недостаточно, и мы могли бы вовсе отказаться от хранения сырых данных в Vertica. Мы рассмотрели различные варианты архитектурных решений с учётом нашей специфики и выбранной модели данных. В итоге решили дополнить Vertica специальным местом для хранения сырых событий. Им стал ClickHouse. Это позволило нам, с одной стороны, облегчить ситуацию с ростом количества событий и обоснованностью их хранения нагорячую в Vertica, а с другой — учесть потребность в realtime-аналитике и начать строить новые решения поверх новой инфраструктуры.
Раньше мы понимали, что realtime-аналитика в наших реалиях затруднена выбором гипернормализованной модели данных и текущими нагрузками на хранилище. ClickHouse позволил начать с чистого листа.
При этом Vertica остаётся основной хранилища, в которую сливаются данные из всех систем источников. В ней всё также осуществляются ежедневные расчёты, она используется в качестве бэкенда для аналитики в Tableau, пользователи в основной своей массе продолжают пользоваться Vertica для доступа к данным в ClickHouse.
Чтобы обеспечить доступ к событиям кликстрима в ClickHouse, мы написали расширение для Vertica, которое позволяет эффективно ходить за данными в ClickHouse так, как будто они лежат в Vertica. Внутри этого расширения мы парсим запросы, достаём из них используемые колонки, предикаты и группировки, относящиеся к данным в ClickHouse, и формируем на их основе запрос в ClickHouse.
В итоге мы получили приемлемое по быстродействию решение, которое позволило совершить переход наших регламентных расчётов на новую модель данных без потери в производительности. В новой модели мы храним наши плохо структурированные события с тысячами полей в одной широкой таблице в ClickHouse.
Наши выводы
На опыте нашей DWH-команды можно сказать следующее:
Vertica действительно очень быстрая. Если возникают какие-то тормоза и проблемы, их надо полечить, и после этого пользоваться всеми благами, которые даёт СУБД.
Инструменты для Postgres с небольшим допилом работают и для Vertica. Не надо чураться брать их и применять в случае, если вам это нужно.
Много таблиц — это боль. Наша модель данных такова, что у нас много таблиц, но мы понимаем, что для репликации и каталога это минус. Пока мы с этим ничего делать не собираемся, просто имеем в виду.
Дефолтные кодировки — хорошо, а свои — лучше. Можно сжаться в два раза по сравнению с тем, что сжимает Vertica по дефолту.
ORC прекрасно себя показывает.
Репликация между дата-центрами — нежная.
Пользователи сами могут решать свои задачи, если им немножко помочь. Тогда команда DWH может сосредоточиться на интрументах и вызовах, которые стоят перед хранилищем.
Свой SQL-клиент избавляет нас от множества проблем, которые возникают у пользователей при использовании хранилища.
Vertica — достаточно простая в использовании и гибкая система, которая позволяет делать быстрые и надёжные интеграции с другими хранилищами, выступая в роли lakehouse и дополняя возможности внешних систем.
Послесловие
Нас периодически просят поделиться нашими наработками или выложить какие-то из компонентов в open source. И первым компонентом, который мы выкладываем в публичный репозиторий Авито, будет расширение для работы с кликхаусом в Vertica. У нас много интересного, чем мы бы хотели поделиться, мы планируем продолжать эту практику в будущем и надеемся, что наши разработки будут полезны кому-то еще.
Инжиниринг данных — телеграм-канал про аналитику, инжиниринг данных, технологии и карьеру в этой области.