Привет! Меня зовут Максим Чижов, я уже третий год работаю бэкенд-инженером в Авито. Когда только пришёл в компанию, я столкнулся с проблемой хранения больших объёмов информации. О том, как её решить, расскажу в статье.
Сервисы, которые создаёт наш юнит, работают по классической схеме ETL. Extractor извлекает сырые аналитические данные из внешнего источника, Transformer преобразует их в плоский вид и сохраняет в MongoDB. А Loader загружает трансформированные данные в хранилище Vertica.
Сейчас Авито хранит данные в шестой нормальной форме в Vertica.
Читать также: Vertica+Anchor Modeling = запусти рост своей грибницы
В чём проблема
Сырые данные хранятся в MongoDB. Несмотря на то, что мы должны хранить их не меньше трёх месяцев, их объём постоянно растет из-за добавления новых экстракторов. В какой-то момент прирост данных составил 5 ТБ в неделю. Проблема в том, что наша база данных представляет собой стандартную реплику из трёх баз данных — 1 master и 2 slave. При штатной работе репликация обрабатывала их без проблем. Но однажды replication lag начал зашкаливать из-за проблем с сетью.
Единственным вариантом стало пересоздание реплики. В итоге, мы потеряли около 40 ТБ сырых данных — это было больно. Чтобы избежать повторения такой ситуации, мы стали действовать в двух направлениях:
Разносили данные по разным репликам.
Решили хранить часть данных в архиве, а при необходимости восстанавливать их в Mongo.
Как мы выбирали хранилище
На основании запросов заказчиков и необходимого объёма данных, мы выделили требования к хранилищу:
Сырые аналитические данные должны храниться не меньше года.
Объём данных хранилища — 800 ТБ.
В нём можно быстро восстанавливать данные в горячее хранилище.
Данные можно трансформировать также, как если бы они не были заархивированы.
Должна быть возможность делать запросы к холодному хранилищу, хотя бы с примитивными фильтрами.
В итоге выбирали между Ceph, Hadoop и обычными файлами. Чтобы сравнивать хранилища было удобнее, мы собрали таблицу:
Преимуществом хранилища Ceph было то, что Авито предоставляет для него интерфейс S3. С ним написано много библиотек, в том числе и на Python. Для наших целей была выбрана библиотека aioboto3.
Результат работы — AaaS
Сервис предоставляет две ручки: архивации и восстановления. Под капотом у него работает стандартный архиватор pigz, который позволяет сжимать данные в четыре раза и уменьшать годовой объем данных до 200 ТБ. А еще он умеет работать в несколько потоков. Затем, на основе параметров от экстрактора формируется уникальный путь в хранилище.
Система работает по такому алгоритму:
Extractor по крону запускает выгрузку из внешнего источника и записывает данные в MongoDB.
После окончания работы Extractor запускает worker архивации, который режет данные на батчи размером 1 ГБ.
Каждый батч асинхронно передаётся в AaaS по websocket as is.
На стороне сервиса архивов данные сжимаются и отправляются в Ceph.
Extractor проставляет метаданные об успешной архивации extract_id.
Кроме этого, каждый день запускается крон, который удаляет устаревшие экстракты из горячего хранилища. Перед этим проверяется, заархивированы ли эти данные. Каждый экстрактор имеет свои настройки по TTL, обычно это 7 дней. В этот момент в метаданные записывается удаление extract_id.
Многие спросят, а почему нельзя было сделать так, чтобы каждый экстрактор сам архивировал данные и отправлял их в Ceph? Потому что для каждого изменения логики архивации пришлось бы обновлять каждый экстрактор — это дорого. Кроме того, другим типам сервисов, например, трансформерам, тоже может потребоваться архивация.
Объясню, как происходит разделение данных:
Для каждой коллекции считается средний размер документа в гигабайтах. За это отвечает параметр collstats.avgObjSize.
Исходя из размера батча (по умолчанию это 1 Гб), мы получаем его в записях.
Если знаем общее количество документов в коллекции для текущего extract_id, получаем количество батчей.
В Redis записывается ключ EXTRACT_ID.batches_info с полученными цифрами. Это важный момент, так как средний размер документа часто меняется, особенно когда коллекция только создана. Если этого не сделать, то при повторной архивации номера записей в батчах будут разные и возникнет путница.
Что произошло с нагрузкой на сеть
Первый рабочий вариант подразумевал, что сервис архивов читал данные напрямую из MongoDB экстракторов. Но от этого варианта быстро отказались по нескольким причинам:
Каждый раз при добавлении нового экстрактора нужно было править сервис архивов, чтобы указать ему путь к коллекциям и другие настройки.
Нарушался один из основных принципов микросервисной архитектуры в Авито — «1 база — 1 сервис». Это показатель качественной декомпозиции функционала отдельных сервисов.
Чтобы достигнуть нужной чистоты архитектуры, мы пожертвовали нагрузкой на сеть. Давайте посчитаем, как она возросла.
Сжатие, в среднем, в 4 раза — значит, 4Tx = Rx. Можем посчитать увеличение нагрузки на сеть по формуле:
Проще говоря, нагрузка на сеть увеличилась на 80%. Это оказалось допустимым и не вызвало никаких проблем с перегрузкой сети.
Восстановление данных
Для восстановления данных используется ручка Restore, которая отдает их по websocket. Сначала мы решили, что достаточно будет восстанавливать их в горячее хранилище, а потом проводить над ним нужные операции. Это может быть, например, трансформация архивных данных. Как временное решение это работало неплохо, но оно нарушало принцип «1 база – 1 сервис».
Пользователи злоупотребляли возможностями и восстанавливали данные в продуктовую MongoDB. Чтобы решить эту проблему, мы добавили экстрактору возможность отдавать не только данные, которые ещё не удалены из горячего хранилища, но и архивные из Ceph.
Объясню, что происходит на схеме:
Трансформер идёт за данным в ручку экстрактора через extract_id.
Экстрактор проверяет свои метаданные — архивированы и удалены ли данные этого extract_id.
Если не архивированы и не удалены, то читает их из MongoDB.
Если архивированы и удалены, то читает их из AaaS, который в свою очередь ищет данные в Ceph.
Если же данные удалены из архива — выдаётся сообщение об ошибке.
Трансформер не подозревает о существовании сервиса архивов и может использовать данные за любой период. В случае, когда их нет в горячем хранилище, экстрактор работает как proxy.
Что в итоге
В результате мы получили стабильный размер горячего хранилища, которое держится в районе 20 ТБ. Это комфортное значение для репликации. Но даже если снова возникнет ситуация с критичным replication lag, мы можем просто безболезненно пересоздать реплику.
Теперь мы можем трансформировать данные за любой период хранения в холодном хранилище. Трансформер ничего не знает об архиве и идет напрямую в экстрактор. Такая схема позволяет быстро вносить правки и доставлять данные в Vertica, уменьшая при этом расход ресурсов.
Несмотря на подводные камни, начиная от выбора способа хранения холодных дампов и заканчивая бесшовной трансформацией архивных данных, схема получилась довольно простой.
Мы получили чистую архитектуру, но увеличили нагрузку на сеть. Такие жертвы неизбежны, чтобы получить те качества системы, которые мы хотим достичь:
Горизонтальная масштабируемость. Можем уменьшать или увеличивать ресурсы AaaS. В нашем случае мы просто меняем количество подов в Kubernetes.
Взаимозаменяемость компонентов. Если вдруг вместо Ceph нам захочется использовать другое хранилище, мы можем внести изменения безболезненно для всех экстракторов, и они даже не заметят их.
Предыдущая статья: Trunk Based Development — кто такой и зачем нужен