Как стать автором
Обновить

BI с Redshift от ETL до бордов

Время на прочтение7 мин
Количество просмотров2.8K

Привет, Хабр, я Node.js разработчик, и я хочу поделиться с вами опытом по реализации business intelligence (BI) процесса. 

В какой-то момент бизнес вырос до размера, пусть и небольшого, когда считать различные цифры и проводить аналитику в excel таблицах уже сложно и медленно, да и количество людей работающих с данными значительно выросло. Тогда зашла речь об автоматизации этого процесса и визуализации различного рода аналитики. Так мы подошли к мысли, что пора реализовывать BI внутри компании.

BI - это просто набор механизмов, технологий для сбора, анализа и визуализации данных.

Немного о бизнесе

У нас есть заказы (orders), которые создаются в лабораториях. У этих заказов есть различные этапы (statuses) их обработки, бизнесу интересно понимать скорость обработки этих заказов, т.е. время перехода из одного статуса в другой, соотношение количества результатов заказов к общему количеству.

Выбор хранилища

В качестве главного хранилища мы используем не реляционную базу данных Cassandra. Для нашего BI она нам не подходит, т.к. она не предназначена для аналитики данных. У нее есть свой язык CQL (Cassandra Query Language), но он сильно урезан относительно SQL. к тому же мы используем такую схему данных, когда у нас в одной колонки хранится ID объекта, а в другой хранится JSON объекта. С такой схемой далеко в аналитику не уедешь. 

Т.к. у нас вся архитектура лежит в AWS, наш взор сразу пал на Redshift. Изучив ее особенности мы решили, что для наших задач ее функционала хватит более чем. Мы не занимались здесь глубокой аналитикой и сравнением Redshift с другими решениями, такими как ClickHouse, BigQuery, Azure.

ETL для Redshift

Разработчики заранее дают нам понять, что подход, когда мы в режиме реального времени пишем данные в БД с Redshift не прокатит. Лучший способ доставки данных в хранилище это копирование из S3 файлов в формате csv.

Этот способ невероятно быстрый за счет того, что позволяет обрабатывать файлы параллельно, но есть и некоторые ограничения. Файлов должно быть немного, желательно столько сколько у вас развернуто node Redshift, чтобы файлы обрабатывались параллельно и размер файлов должен быть от 1 - 125 МБ.

Подумав над всеми этими рекомендациями, мы поняли, что нам нужна реализация, которая будет писать данные в файлы, а затем заливать их в файловое хранилище AWS S3 и дальше перемещать уже в хранилище Redshift.

Первая серьезная проблема с которой мы столкнулись это то, что обновлять данные будет очень сложно и дорого как по времени, так и по ресурсам. Исходя из этого, мы поняли, что нам нужно записывать данные в разных состояниях, опираясь на какие-то события в системе, а уже по дате создания записи фильтровать их. Важно, что дата создания (timestamp) !== дате добавления строки в БД, т.к. мы пишем данные пачками.

Таким образом, каждая таблица у нас содержит данные об объекте в разный промежуток времени и выглядят они примерно так:

Сервис для подготовки данных (service-redshift)

Давайте реализуем сервис таким образом, чтобы он прослушивал все события, которые прилетают в брокер и решал, надо ли ему записать объект из события, взяв определенные поля, или на его основе сделать какую-то запись.

В наш брокер прилетают сообщения (events) в формате:

some_event: {
  method: POST,
  current_state: { object_type: order_status, … },
  previous_state: { - } 
},

another_some_event: {    
  method: PATCH,    
  current_state: { object_type: order, status: received, … },        
  previous_state: { object_type: order, status: created, … } 
},

Мы можем подключить наш сервис к брокеру и реагировать на события. Давайте напишем функцию, которая будет реагировать на метод PATCH у event на конкретный тип объекта requisition.

// index.js
const handlers = {
	"PATCH order": require("./order").onPatch,
}
const onHandleEvent = async (event) => {
	await handlers[`${event.method} ${event.object_type}`]();
}

Теперь опишем обработчик для объекта requisition

// order.js
const onPatch = async event => {
	...
}

В эту функцию у нас прилетает event с типом объекта requisition и его предыдущим и текущим состоянием. Теперь мы можем сформировать какой-нибудь объект из полученных данных и отправить его в промежуточное хранилище. У нас могут быть разные события на один и тот же объект, поэтому давайте реализуем функцию, которая будет формировать объект независимо от того какой метод отработал.

// collector_order.js
const onHandleOrder = event => {
	//здесь могут быть какие-то расчеты полей, но у этого объекта 
  //все будет просто
  return {
    id: event.id,
    status: event.status,
    created_at: event.created_at,
  };
};

// order.js
const onPatch = async event => {
  const order = onHandleOrder(event);
  
  await sendToTempStore(order);
};

Промежуточное хранилище

Определимся теперь, где нам промежуточно хранить данные перед их отправкой в S3. Можно было бы писать прямо в S3 файлы, но:

  1. В файлы нельзя проводить дописывание

  2. Если бы мы загружали туда по одному файлу, а потом собирали их и где-то агрегировали, тогда наш алгоритм работал бы медленно

Т.к. мысль с файлами нас еще не покинула, наш взор пал на технологию AWS EFS. Но прошерстив документацию, стало понятно, что возникнет проблема с чтением маленьких файлов, а записывать все в один будет не так быстро, как хотелось бы.

И тут мы наконец-то добрались до самого быстрого хранилища - оперативная память, а именно: Redis.

Соберем брокер, service_redshift, Redis и сам Redshift в одну единую схему:

Было решено, что данные будут ходить в Redshift раз в какой-то промежуток времени (Migration шаг в схеме). Значит, что за это время данные должны полежать в Redis, а во время боя курантов перебраться в файлы и отправиться в S3. 

В Redis нужно каким-то образом искать данные для отправки, не зная их ID. Лучший вариант был - это сложить все данные в Set, а затем найти нужный Set и перебрать его значения. Сами данные будут представлять из себя строчки, связанные запятой, так сказать, подготовились к формату csv.

Чтобы легче всего было найти нужные данные в Redis мы воспользовались маской для ключа вот такого вида: table_name-YYYY-MM-DD-HH. Таким образом, мы можем в назначенное время, зная имена конечных таблиц, найти все необходимые нам данные.

Адаптируем функцию обработки объекта по методу, добавив запись в Redis

// temp_store.js
const moment = require("moment");
const redis = require("./connected_redis");

const generateKeyName = (data, tableName) => {
	const date = moment().format('YYYY-MM-DD-HH');
  
  return `${tableName}-${date}`;
}

const sendToTempStore = async (order, tableName) => {
  const row = Object.values(order)
  	.push(new Date().toISOString()) // Это наша колонка timestamp, она всегда идет в конце
  	.join(",");
  const key = generateKeyName(row, tableName);
  
  // В случае отсутсвия Set, редис сам создаст его
  // Поэтому никаких дополнительных проверок тут не надо
  await redis.rPush(key, row);
}

// order.js
const onPatch = async event => {
  const order = onHandleOrder(event);
  
  await sendToTempStore(order, "orders");
};

В конечно итоге мы получаем Set с нашими объектами для определенной таблицы

Миграция в Redshift

Собрав данные в Redis, мы раз в период начинаем их оттуда извлекать и записывать в файлы, причем, не забывая о рекомендациях документации о размере и количестве файлов.

Алгоритм действия будет у нас выглядеть таким образом:

Берем все сеты, которые есть в Redis (мы выделили отдельный Redis под эти манипуляции, а значит лишнего не захватим). Перебираем все таблицы, создаем папку под каждую таблицу, туда мы будем складывать файлы для последующей миграции. Отбираем набор сетов для таблицы, убираем тот, который не равен текущему часу (почему так чуть подробнее ниже), остальные складываем в структуру данных Map.

Теперь берем каждую таблицу и все ее сеты. Так как в сете у нас может быть много записей, ограничим обработку до 100к, т.к. одна запись примерно 102 bytes, значит 100к ~ 10 MB. Таким образом контролируем память.  Если записей больше, чем 100к, разбиваем на чанки, записывая пограничные индексы. В конечном счете записываем данные в файлы по сетам.

После того, как мы собрали файлы из сетов, можем переносить это все в S3, а затем переносим это в Redshift, используя команду COPY.

Стоит отметить, что сервис не работает постоянно, он представляет из себя контейнер ECS, который запускается по расписанию, что как раз в 2-ом пункте позволяет нам не писать никуда никакие интервалы. Интервал контролируется временем запуска.

Визуализация данных

К визуализации данных мы уже подошли, сравнивая различные решения. Выбор стоял между двумя - Redash, Metabase.

Tableau отпало, т.к. инструмент слишком мощный и у нас просто нет необходимости в большой части его функционала.

Grafana рассматривалась, но вскоре мы решили, что для рядовых пользователей этот инструмент будет слишком тяжелым.

Есть две ключевые особенности, которые были главными аргументами в пользу одного из решений (Redash vs Metabase).

Redash имеет такую архитектуру, что при выполнение какого либо запроса, он не ждет ответа в браузере от своего сервера, который в свою очередь ждет ответа от базы данных. Вместо этого он создает Job, которая ждет выполнения запроса и с браузера клиента он проверяет ее статус. Когда Job завершилась, он берет результат и показывает в дашборде.

Metabase посылает запрос к своему сервису и ждет результата до последнего, если у вас стоит какой-то балансировщик или прокси сервер, который отбрасывает запросы, если они идут дольше 60 секунд, вы испытаете вот такую проблему.

Redash обладает еще одной особенностью, он позволяет выстраивать графики опираясь на один запрос, в то время как Metabase нужно иметь отдельный запрос на каждый график.

В ходе долгой дискуссии мы все же решили, что с точки зрения пользователя, гораздо удобнее будет использовать Metabase, не смотря на, с нашей точки зрения, техническое превосходство Redash.

С инструментом определились, осталось сформировать запросы, учитывая, что мы храним данные в разных состояниях.

Мы используем materialized view, чтобы не собирать постоянно данные с разных таблиц при помощи join, т.к. этот процесс на большом количестве данных и связей достаточно медленный. Оно представляет из себя таблицу, собранную из разных таблиц воедино. Эта таблица имеет возможность обновляться автоматически по мере обновления данных в таблицах, из которых она соткана. Если таблицы начинают обновляться друг за другом, materialized view обновиться лишь один раз, когда закончатся обновления всех таблиц. Сам запрос выглядит примерно так:

create view orders_mv
            (order_id, status, created_at)
as
CREATE MATERIALIZED VIEW orders_mv AUTO REFRESH YES
AS
(
with order_v as (
    SELECT o.order_id,
           o.status,
           o.created_at,
    FROM (SELECT o.order_id,
                 o.status,
                 o.created_at,
                 // Здесь мы как раз проводим фильтрацию, отбирая только последнее состояние заказа в БД
                 pg_catalog.row_number() OVER (PARTITION BY o.order_id ORDER BY o."timestamp" DESC) AS row_num
          FROM orders o
         ) o
    WHERE o.row_num = 1

Подытожим

У нас получилось собрать конструкцию из сервиса и скрипта, который запускается по расписанию, воссоздав таким образом свой процесс ETL.

У него есть один минус, который мы видим сразу же - чем больше данных (колонок) мы хотим видеть в момент их выставления у объектов, тем больше записей мы получим на выходе в Redshift. Например, у ордера мы хотим видеть 2 промежуточных состояния, когда выставится some_column_one и some_column_two, таким образом, мы получим две записи:

Но при текущем количестве данных (<10кк в час) нас вполне устраивает это решение, а когда мы доберемся до больших цифр, мы начнем свой переезд в AWS Kenesis. Думаю мы еще обязательно поделимся этим опытом с вами.

Теги:
Хабы:
Всего голосов 5: ↑4 и ↓1+6
Комментарии7

Публикации

Истории

Работа

Ближайшие события

12 – 13 июля
Геймтон DatsDefense
Онлайн
14 июля
Фестиваль Selectel Day Off
Санкт-ПетербургОнлайн
19 сентября
CDI Conf 2024
Москва
24 сентября
Конференция Fin.Bot 2024
МоскваОнлайн