В этой статье хочу поделиться способом, который позволил нам прекратить хаос с процессингом данных. Раньше я считал этот хаос и последующий ре-процессинг неизбежным, а теперь мы забыли что это такое. Привожу пример реализации на BiqQuery, но трюк довольно универсальный.
У нас вполне стандартный процесс работы с данными. Исходные данные в максимально сыром виде регулярно подгружаются в единое хранилище, в нашем случае в BigQuery. Из одних источников (наш собственный продакшн) данные приходят каждый час, из других (обычно сторонние источники) данные идут ежедневно.
В последствии данные обрабатываются до состояния пригодного к употреблению разнообразными пользователями. Это могут быть внутренние дашборды; отчёты партнёрам; результаты, которые идут в продакшн и влияют на поведение продукта. Эти операции могут быть довольно сложными и включать несколько источников данных. Но по большей части мы с этим справляется внутри BigQuery с помощью SQL+UDF. Результаты сохраняются в отдельные таблицы там же.
Очевидным способом организации этого процессинга является создание расписания операций. Если данные подгружаются ежедневно в час ночи, то мы настроим процессинг на 01:05. Если этот источник данных подгружается в районе 5й минуты каждого часа, то настроим процессинг на 10ю минуту каждого часа. Промежутки в 5 минут для пользователей не критичны и предполагается, что всё должно работать.
Но мир жесток! Данные не всегда приходят вовремя. Или вообще не приходят, если не починить. Если твоя часовая загрузка закончилась на 11й минуте, а трансформация запускалась на 10й – то пожалуйста, жди ещё час чтобы увидеть эти данные в дэшборде. А если операция использует несколько источников, то ситуация будет ещё веселее.
Более того, подгружаемые сырые данные не всегда верны (данные вообще всегда не верны!). Периодически данные приходится чистить или перезагружать. И тогда нужно перезапустить все операции и с корректными параметрами, чтобы всё починилось.
Это всё, конечно, проблемы с сырыми данными и нужно именно их и решать. Но это та война, в которой нельзя окончательно победить. Что-то всё равно будет поломано. Если источник данных внутренний – то ваши разработчики будут заняты новыми крутыми фичами, а не надёжностью трекинга. Если это сторонние данные, тогда вообще труба. Хотелось бы, чтобы по крайней мере процессинг не мешался по дороге и как только сырые данные починены — все клиенты сразу видели корректные результаты.
Это реально большая проблема. И как же ещё решить?
Если процессинг приводит к проблемам, то не надо его делать! Не надо делать вообще никакой процессинг и хранить промежуточные результаты. Как только пользователю нужны результаты, всё должно вычисляться на лету из сырых данных. Учитывая скорость BigQuery это вполне реалистично. Особенно если все что вы делаете с данными это GROUP BY date и count(1), и нужны только данные за последние 14 дней.
Большинство аналитики работает именно с такими запросами. Поэтому мы данное решение активно используем. Но этот подход не работает со сложными трансформациями.
Одна проблема – это сложность кода. Если сложить все операции в один SQL запрос, то его будет не прочитать. К счастью это решается за счёт таблиц типа view (представления). Это логические таблицы в BigQuery, данные в них не хранятся, а генерируются из SQL-запроса на лету. Это сильно упрощает код.
Но другая проблема – это производительность. Здесь всё плохо. Не важно какие быстрые и дешёвые современные базы данных. Если запустить сложную трансформацию на одном годе исторических данных, это займёт время и будет стоить денег. Других вариантов нет. Эта проблема делает данную стратегию неприменимой в довольно большом проценте случаев.
Если нет возможности обойтись без системы управления процессингом, то нужно построить эту систему хорошо. Не просто расписание выполнения скриптов в cron, а система мониторинга загрузки данных, которая определяет когда и какие трансформации запускать. Наверное паттерн pub/sub тут очень подходит.
Но есть проблема. Если построить сложную систему более менее просто, то вот поддерживать её и ловить баги – это очень сложно. Чем больше кода, тем больше проблем.
По счастью есть и третье решение.
Лямбда архитектура – это знаменитый подход к процессингу данных, который использует преимущества обработки данных по расписанию и в реальном времени:
*Как нормально перевести на русский не знаю, batch job – это что пакетное задание? Кто знает, подскажите!
Обычно это все строится с использованием нескольких решений. Но мы используем по сути тот же трюк просто внутри BigQuery.
И вот как это работает:
Процессинг по расписанию (Batch layer). Мы ежедневно выполняем SQL-запросы, которые трансформируют данные имеющиеся на текущий момент, и сохраняем результаты в таблицы. У всех запросов следующая структура:
Результаты этого запроса будут сохранены в table_static (перезапишут её). Да, BigQuery позволяет сохранять результаты запроса в таблице, которая использовалась в этом запросе. В итоге мы берём старые, уже посчитанные данные (чтобы их не пересчитывать) и соединяем с новыми данными. X дней – это выбранный период, за который мы хотим пересчитать данные, чтобы учесть все возможные корректировки сырых данных. Предполагается что за X дней (сколько – это индивидуально для источника) все корректировки уже будут внесены, всё что сломалось починится и данные уже больше не будут меняться.
Доступ в реальном времени (Speed layer + Serving layer). Эти обе задачи объединены в один SQL-запрос:
Да, это тот же самый запрос! Его мы сохраняем как представление (view) с именем table_live и все пользователи (дэшборды, другие запросы, и т.п.) тянут результаты из этого представления. Так как представления в BigQuery хранятся на логическом уровне (только запрос, не данные), каждый раз при обращении он будет пересчитывать последние X дней на лету и все изменения в изначальных данных будут отражены в результатах.
Так как запрос в обоих случаях одинаковый, то в реальности, чтобы избежать дупликации кода, ежедневный запрос (из batch layer) выглядит так:
Этот подход имеет ряд важных преимуществ:
PS Если любите использовать таблицы разбитые по датам в BigQuery (мы очень любим), то есть решение и для этого. Но это тема для другого поста. Подсказка – функции для работы с этими таблицами не ругаются, если часть таблиц – это только представления.
PPS Если бы представления в BigQuery поддерживали кешинг (как это работает с обычными запросами), это было бы реально круто. Это по сути сделало бы их материализованными (materialized views). И эффективность нашего подхода стала бы ещё выше. Если вы согласны – здесь можно поставить звёздочку, чтобы эту фичу быстрее реализовали.
У нас вполне стандартный процесс работы с данными. Исходные данные в максимально сыром виде регулярно подгружаются в единое хранилище, в нашем случае в BigQuery. Из одних источников (наш собственный продакшн) данные приходят каждый час, из других (обычно сторонние источники) данные идут ежедневно.
В последствии данные обрабатываются до состояния пригодного к употреблению разнообразными пользователями. Это могут быть внутренние дашборды; отчёты партнёрам; результаты, которые идут в продакшн и влияют на поведение продукта. Эти операции могут быть довольно сложными и включать несколько источников данных. Но по большей части мы с этим справляется внутри BigQuery с помощью SQL+UDF. Результаты сохраняются в отдельные таблицы там же.
Очевидным способом организации этого процессинга является создание расписания операций. Если данные подгружаются ежедневно в час ночи, то мы настроим процессинг на 01:05. Если этот источник данных подгружается в районе 5й минуты каждого часа, то настроим процессинг на 10ю минуту каждого часа. Промежутки в 5 минут для пользователей не критичны и предполагается, что всё должно работать.
Но мир жесток! Данные не всегда приходят вовремя. Или вообще не приходят, если не починить. Если твоя часовая загрузка закончилась на 11й минуте, а трансформация запускалась на 10й – то пожалуйста, жди ещё час чтобы увидеть эти данные в дэшборде. А если операция использует несколько источников, то ситуация будет ещё веселее.
Более того, подгружаемые сырые данные не всегда верны (данные вообще всегда не верны!). Периодически данные приходится чистить или перезагружать. И тогда нужно перезапустить все операции и с корректными параметрами, чтобы всё починилось.
Это всё, конечно, проблемы с сырыми данными и нужно именно их и решать. Но это та война, в которой нельзя окончательно победить. Что-то всё равно будет поломано. Если источник данных внутренний – то ваши разработчики будут заняты новыми крутыми фичами, а не надёжностью трекинга. Если это сторонние данные, тогда вообще труба. Хотелось бы, чтобы по крайней мере процессинг не мешался по дороге и как только сырые данные починены — все клиенты сразу видели корректные результаты.
Это реально большая проблема. И как же ещё решить?
Решение №1 – убрать проблемные детали
Если процессинг приводит к проблемам, то не надо его делать! Не надо делать вообще никакой процессинг и хранить промежуточные результаты. Как только пользователю нужны результаты, всё должно вычисляться на лету из сырых данных. Учитывая скорость BigQuery это вполне реалистично. Особенно если все что вы делаете с данными это GROUP BY date и count(1), и нужны только данные за последние 14 дней.
Большинство аналитики работает именно с такими запросами. Поэтому мы данное решение активно используем. Но этот подход не работает со сложными трансформациями.
Одна проблема – это сложность кода. Если сложить все операции в один SQL запрос, то его будет не прочитать. К счастью это решается за счёт таблиц типа view (представления). Это логические таблицы в BigQuery, данные в них не хранятся, а генерируются из SQL-запроса на лету. Это сильно упрощает код.
Но другая проблема – это производительность. Здесь всё плохо. Не важно какие быстрые и дешёвые современные базы данных. Если запустить сложную трансформацию на одном годе исторических данных, это займёт время и будет стоить денег. Других вариантов нет. Эта проблема делает данную стратегию неприменимой в довольно большом проценте случаев.
Решение № 2 – построить сложную систему
Если нет возможности обойтись без системы управления процессингом, то нужно построить эту систему хорошо. Не просто расписание выполнения скриптов в cron, а система мониторинга загрузки данных, которая определяет когда и какие трансформации запускать. Наверное паттерн pub/sub тут очень подходит.
Но есть проблема. Если построить сложную систему более менее просто, то вот поддерживать её и ловить баги – это очень сложно. Чем больше кода, тем больше проблем.
По счастью есть и третье решение.
Решение № 3 – лямбда архитектура! …ну, типа того
Лямбда архитектура – это знаменитый подход к процессингу данных, который использует преимущества обработки данных по расписанию и в реальном времени:
*Как нормально перевести на русский не знаю, batch job – это что пакетное задание? Кто знает, подскажите!
Обычно это все строится с использованием нескольких решений. Но мы используем по сути тот же трюк просто внутри BigQuery.
И вот как это работает:
Процессинг по расписанию (Batch layer). Мы ежедневно выполняем SQL-запросы, которые трансформируют данные имеющиеся на текущий момент, и сохраняем результаты в таблицы. У всех запросов следующая структура:
Результаты этого запроса будут сохранены в table_static (перезапишут её). Да, BigQuery позволяет сохранять результаты запроса в таблице, которая использовалась в этом запросе. В итоге мы берём старые, уже посчитанные данные (чтобы их не пересчитывать) и соединяем с новыми данными. X дней – это выбранный период, за который мы хотим пересчитать данные, чтобы учесть все возможные корректировки сырых данных. Предполагается что за X дней (сколько – это индивидуально для источника) все корректировки уже будут внесены, всё что сломалось починится и данные уже больше не будут меняться.
Доступ в реальном времени (Speed layer + Serving layer). Эти обе задачи объединены в один SQL-запрос:
Да, это тот же самый запрос! Его мы сохраняем как представление (view) с именем table_live и все пользователи (дэшборды, другие запросы, и т.п.) тянут результаты из этого представления. Так как представления в BigQuery хранятся на логическом уровне (только запрос, не данные), каждый раз при обращении он будет пересчитывать последние X дней на лету и все изменения в изначальных данных будут отражены в результатах.
Так как запрос в обоих случаях одинаковый, то в реальности, чтобы избежать дупликации кода, ежедневный запрос (из batch layer) выглядит так:
SELECT * FROM table_live
(и сохраняем результаты в table_static)Этот подход имеет ряд важных преимуществ:
- Каждый пользователь получает актуальные результаты, нет никакой ресинхронизации между сырыми данными и агрегированными (это при условии, что все косяки с сырыми данными разрешаются за X дней)
- Пользователи получают результаты быстро. Никто не пересчитывает 2 года данных каждый раз, когда к ним обращаются. Пересчёт последних X дней в BigQuery происходит приемлемо быстро. X так и выбирается, чтобы не создавать проблем с производительностью. Можно и на часы вместо дней перейти конечно, но не приходилось этого делать с нашими данными.
- Не нужно вообще думать о расписании ежедневных операций. Просто нужно сделать это раз в день, но от времени загрузки сырых данных это не зависит. Кроме того, проблем не возникает, если в один день трансформация упадёт или если её запустить дважды. Чтобы пользователь заметил проблему, нужно чтобы процессинг «лежал» больше X дней.
- Для построения такой системы нужно минимальное количество кода (читай — проблем). Сам SQL-запрос на 10 строчек длиннее оригинального и теперь хранится внутри BigQuery как представление (view). Плюс нужно запускать ежедневный процессинг, но это элементарная задача.
- Издержки данной системы могут быть даже меньше, чем при создании сложной системы расписаний. Кажется, что должно быть дороже, так как мы постоянно пересчитываем X дней. Но наш опыт показывает, что наоборот. Если вы подгружаете данные каждый час, то вам придётся делать пересчёт каждый час, включая выходные. И ещё придётся пересчитывать, на X дней назад, чтобы отразить корректировки. Итого за неделю наберётся 24*7=168 раз. Но в реальности пользователь может открывать этот дэшборд только три раза в неделю. При нашем подходе придётся пересчитать 7 раз по ежедневному расписанию и плюс 3 раза на лету. Существенно меньше.
PS Если любите использовать таблицы разбитые по датам в BigQuery (мы очень любим), то есть решение и для этого. Но это тема для другого поста. Подсказка – функции для работы с этими таблицами не ругаются, если часть таблиц – это только представления.
PPS Если бы представления в BigQuery поддерживали кешинг (как это работает с обычными запросами), это было бы реально круто. Это по сути сделало бы их материализованными (materialized views). И эффективность нашего подхода стала бы ещё выше. Если вы согласны – здесь можно поставить звёздочку, чтобы эту фичу быстрее реализовали.