company_banner

I am Groot. Делаем свою аналитику на событиях



    Весной этого года я узнал о возможности базы данных HP Vertica создавать запросы с матчингом паттернов событий. Так называемый Events Pattern Matching хорошо ложился под задачу анализировать поведение пользователей в продуктах ivi.ru. Мы решили попробовать разобраться с воронками оплаты, с поиском проблемных мест на устройствах, глубже погрузиться в анализ трафика. Нашей команде очень нравится, как реализована аналитика у Mixpanel и Localytics (она как раз основана на событиях и их свойствах), поэтому многие идеи были позаимствованы у них.

    Что вообще происходит?

    Исторически для аналитики мы, как и большинство остальных проектов, использовали Google Analytics. В какой-то момент на наших объемах сэмплирование данных достигло немыслимых масштабов — выборки строились менее чем на 0,5% аудитории. Это делало невозможным работу с небольшими выборками — они либо вообще не были видны, либо погрешность была катастрофичной. Плюс в GA невозможно было прокинуть кучу внутренних данных о контенте, что делало невозможным глубокий анализ.

    Этот факт послужил поводом для того, чтобы заняться разработкой собственной системы. Так родился Groot — внутренняя аналитика ivi.ru.

    Мы начали со списка требований, которым должен был соответствовать Groot:

    • Отсутствие сэмплирования, все данные должны храниться в сыром виде;
    • Кроссплатфоременность. Поскольку у нас помимо сайта есть очень популярные приложения для мобильных платформ и Smart TV, система должна уметь собирать данные даже с утюга, если он подключен к интернету и на нем стоит наше приложение;
    • Возможность быстрого масштабирования;
    • Отсутствие SPOF;
    • Простота настройки и разворачивания.

    Архитектура


    Помимо колоночной базы HP Vertica, решили использовать Apache Kafka и Apache Storm, тем самым открыв для себя великий и ужасный мир Java.

    Apache Kafka — pub/sub система. Основным отличием от обычных реализаций pub/sub является то, что подписчик может начать чтение сообщений не с конца, а с начала или середины. Это решение позволяет не беспокоиться о потере данных, когда подписчик не работает.

    Apache Storm — распределенная система для вычислений большого объема данных. Вообще, на тему Storm можно говорить долго. Нам в нём понравилась интеграция с kafka из коробки, возможность горизонтально масштабировать систему и достаточно быстрая скорость работы.

    Взгляд сверху

    В целом система работает следующим образом:

    • Клиент отправляет запрос с JSON-информацией о событии;
    • web-сервер на flask асинхронно отправляет пачку событий в kafka;
    • storm постоянно забирает новые сообщения из kafka;
    • в storm топология парсит, разбирает событие и строит batch запрос в vertica и сохраняет в базу данные.

    Первые неловкие шаги



    Первая версия работала очень плохо. Точнее, проблем отправкой данных в kafka не было совсем (все работает из коробки). А с apache storm пришлось повозиться, так как нам надо было написать свою топологию на java, которую у нас в компании никто не знает.

    Топология в storm состоит из следующих частей:

    • spout — краник из которого постоянно (или нет) прилетают данные. В нашем случае это стандартный KafkaSpout;
    • bolt — собственно обработчик данных. В «болтах» происходит вся магия работы с данными;
    • tuple — стандартная структура данных. В tuple может хранить что угодно, от простого числа до объекта.

    Я реализовал простейший bolt, который получал событие, парсил json и отправлял в базу пачку. Первые тесты выявили следующие проблемы:

    • Vertica блокирует таблицу во время записи;
    • Очень сложно отследить проблемные места в топологии;
    • Thread с вставкой в базу мог отправлять то 1 запись, то сразу 100. Не было понимания почему так происходит;

    Первая версия была очень простой: есть колонки с id, name, subsite_id, user_id, ivi_id, ts. При этом возникли трудности с таблицами в Vertica тоже оказалось сложно.

    Как видите, больше никаких данных мы не записывали. Потом, правда, решили записывать еще браузер, операционную систему, размеры окна браузера, версию флэш плеера. «Ха!», — подумали мы и сделали такую таблицу:

    | id | event_id | name | int_value | string_value | double_value | datetime_value | added |


    Сделали второй bolt, который из JSON достает дополнительные параметры, проверяет тип и записывает все это в новую табличку.

    Все было прекрасно, я радовался, что так круто получилось реализовать, аналитики радовались, что можно добавлять любые параметры в события и затем по ним строить отчеты. В то время у нас главным источником событий был сам сайт ivi.ru, мобильные приложения еще ничего не отправляли. Когда же они начали отправлять, мы поняли, что все очень плохо.

    Сначала давайте посмотрим на наш запрос для простой воронки «нажал» -> «купил» для браузера Chrome:

    WITH groupped_events AS (
        SELECT MIN(e.ts) as added, MIN(e.user_id) as user_id, e.name,
          MIN(CASE WHEN ep.name = 'browser' THEN string_value ELSE NULL END) as browser
        from events.events as e 
          LEFT JOIN events.event_properties as ep ON ep.event_id = e.id 
        WHERE e.added >= '2014-07-28' and e.added < '2014-07-29' and e.subsite_id = '10' 
        GROUP BY e.id, e.name
    ) 
    SELECT COUNT(q.match_id) as count, name 
    FROM (
           SELECT event_name() as name, user_id, match_id() as match_id 
           FROM groupped_events as e 
           WHERE e.name IN ('click', 'buy') 
           MATCH ( 
           PARTITION BY user_id ORDER BY e.added ASC 
           DEFINE 
            click as e.name = 'click' and e.browser = 'Chrome',
            buy as e.name = 'buy'
           PATTERN P as (click buy | click) 
           )
         ) as q 
    GROUP BY q.match_id, q.name;
    

    Видите подвох? Мы джойним табличку (сейчас там больше миллиарда записей), группируем ее и вытаскием через CASE нужное значение. Конечно же, когда у нас стало много событий, все это стало тормозить. Запросы работали по несколько минут, что нас не устраивало. Аналитики жаловались на запросы в полчаса, продуктологи хотели устроить мне темную.

    Почему?

    Отдельно хочется пояснить факт, что все-таки HP Vertica это колоночная база данных. Она очень компактно хранит кучу данных в колонках и позволяет, например, добавлять новую колонку налету, без перелопачивания всех данных. С нашей же табличкой «все-в-одном» вертика справлялась очень плохо — она не понимала как оптимизировать эту кучу.

    Тогда было принято решение перетащить основные параметры в таблицу events отдельными колонками, и сформировать список параметров, которые часто используются в запросах. Такую процедуру мы проделали 2 раза. В первый раз у нас появилась таблица с 30 колонками, во второй раз, уже с 50. После всех этих манипуляций, среднее время выполнения всех запросов уменьшилось в 6-8 раз.

    После всех манипуляций, предыдущий запрос превратился в простой:

    SELECT COUNT(q.match_id) as count, name 
    FROM (
           SELECT event_name() as name, user_id, match_id() as match_id 
           FROM events.events as e 
           WHERE e.name IN ('click', 'buy') 
           MATCH ( 
           PARTITION BY user_id ORDER BY e.added ASC 
           DEFINE 
            click as e.name = 'click' and e.browser = 'Chrome',
            buy as e.name = 'buy'
           PATTERN P as (click buy | click) 
           )
         ) as q 
    GROUP BY q.match_id, q.name;
    

    На этом мучения с базой у нас прекратились, в таком виде все живет уже около 3х месяцев и притензий у нас к ней не было.

    Мы все равно оставили таблицу event_properties, чтобы можно было разрабатывать приложения быстрее, а не ждать обновления структуры основной таблицы.

    Apache Storm

    Разобравшись с HP Vertica, мы стали разбираться с Apache Storm: нужно было стабилизировать работу, убрать отдельный Thread и быть готовым к большим нагрузкам.

    Есть минимум два способа batch-процессинга в storm:

    1. Отдельный thread с заполняемым списком;
    2. Использование стандартной возможности принимать tickTuple;

    Сначала мы испробовали первый вариант и отбросили его — поведение нестабильным, запросы шли в почти в холостую. Второй вариант показал нам всю прелесть Storm:

    С помощью простой настройки при создании топологии мы можем указать, когда хотим получить tickTuple (у нас 10 секунд). TickTuple это пустая запись, которая отправляется в основной поток раз в 10 секунд. Можем спокойно отследить такую запись, добавить в очередь или запись все в базу.

    private static boolean isTickTuple(Tuple tuple) {
      return tuple.getSourceComponent().equals(Constants.SYSTEM_COMPONENT_ID)
         && tuple.getSourceStreamId().equals(Constants.SYSTEM_TICK_STREAM_ID);
    }
       
    @Override
    public void execute(Tuple tuple) {
      if( isTickTuple(tuple) ) {
        executeTickTuple(tuple);
      } else {
        executeTuple(tuple);
      }
    }
    

    В executeTuple мы сохраняем событие в очередь LinkedBlockingQueue, и, соответственно, в executeTickTuple мы проходим по очереди и вставляем пачкой в базу.

    Нашу топологию мы разделили на несколько Bolt:

    • KafkaRecieverBolt — получает данные из KafkaSpout, парсит JSON и отправляет в PropertiesParserBolt;
    • PropertiesParserBolt — парсит нестандратные параметры, отправляет их EventPropertiesBatchBolt, отправляет все событие дальше в EventsBatchBolt
    • EventsBatchBolt — сохраняет данные в основную таблицу;
    • EventPropertiesBatchBolt — сохраняет данные в таблицу доппараметров

    Теперь мы можем посмотреть какой из «болтов» тормозит и сколько данных через него гоняется: Статистика работы топологии из Storm UI

    Послесловие


    В следующей статье я постараюсь рассказать как это все администрировать и мониторить.
    Онлайн-кинотеатр ivi
    79,00
    Компания
    Поделиться публикацией

    Комментарии 48

      +5
      Спасибо за статью!
      Да, было бы неплохо узнать, как это все администрировать и мониторить ;)
      –2
      Извините конечно что не в тему, но вместо всей этой безумной аналитики лучше бы на ivi.ru добавили возможность посмотреть семпл для оценки качества картинки до оплаты и перестали жадничать с параметрами кодирования так называемого вашего «HD» (на каком-нибудь рутрекере за видео такого качества не только бы удалили раздачу, но и забанили бы пожизненно).
      По-моему без этих двух пунктов аналитика навсегда останется сферическим конем в вакууме.
        +2
        «Vertica блокирует таблицу во время записи;»
        вы, что данные INSERT вставляете?
          +1
          «После всех манипуляций, предыдущий запрос превратился в простой» и что такое проекции вы не читали?
            0
            Строить на все возможные варианты запросов проекции сложно, а у вертики есть ограничение по количеству.
              0
              А что, join на таблицы фактов как-то может быть оптимизирован проекцией?
              Я даже теоретически не знаю как это сделать (только если ценой очень медленной вставки).
                0
                Мне почему кажется, что да, если у вас есть постоянные запросы с не меняющейся функцией агрегации, то наверно есть смысл превратить его pre-join projection, и это проекция будет пересчитываться, про обновление таблицы фактов
                vertica.tips/2014/03/05/pre-join-projections-overview/
                  0
                  интересно. Но мне показалось, что они не хотят join на 2 fact tables
                    0
                    В этом случае нам придется постоянно обновлять или добавлять проекции, так как клиенты присылают новые параметры события. Если аналитикам очень сильно нужно, мы переносим поле в основную таблицу
                    0
                    Это можно сделать, если таблицы фактов identically segmented. В таком случае будет merge join, это очень быстро.
                  0
                  И да, и нет. Мы используем драйвер JDBC для вертики, который при batchInsert делает COPY с транзакцией
                    +1
                    Мы сначала через rsync заливаем csv.gz файл на ноду вертики, затем выполняем запрос типа

                    COPY 'table' FROM 'file' GZIP DELIMITER ',' NULL '\N' ENCLOSED BY '''' DIRECT;
                    

                    Вертика отлично обрабатывает gzip и файл в 25млн строк заливается за ~30 сек
                      0
                      да, так можно быстро заливать, но у нас чуть-чуть другой кейс. в другом кластере заливка так и происходит
                        0
                        Я вот тоже не понял где у них LOCK то возникает, если у них это COPY то по дефолту он в WOS загружает. И не должно быть никакого лока.

                        А у вас 25 млн строк это в мегах сколько?
                        А все увидел у вас метод DIRECT, да он сразу в ROS грузит, и рекомендуется при загрузке файлов более 100 мегов.
                          0
                          Да, вертика — это все-таки OLAP а не OLTP, и частые мелкие инсерты вызывают внутренние процессы по перестройке ROS контейнеров, что может сказаться на производительности всего кластера.
                          Поэтому лучше где-то буферизировать и пачкой записывать сразу в ROS.

                          Ну и делать UPDATE и DELETE тоже лучше не стоит.
                            0
                            Значит им нужно подтюнить Tuple Mover, чтоб пореже переливал из WOS в ROS
                    +1
                    Скажите, а почему HP Vertica?
                      0
                      Очень хорошая скорость на больших объемах данных. Postgesql уже не справлялся.
                        +1
                        Спрашиваю не из разряда «я самый умный, mongo is webscale».

                        Быстрая в чем? Запись, чтение и особенно аналитика?

                        Рассматривали ли вы еще какие-то базы данных?

                        В чем выигрыш с точки зрения бизнеса? Что-то вроде…

                        — Лучше (дешевле) дать менеджерам OLAP чем возиться каждый раз с Hadoop?
                        — Проще было поставить HP потому что дружите с HP или потому что к примеру Cassandra не подходила под задачу.
                          +1
                          >Быстрая в чем? Запись, чтение и особенно аналитика?

                          Все вместе, сложные запросы выполняются очень быстро. Запись, если например через CSV файл происходит тоже быстро, даже очень. По сути HP Vertica это postgres на стероидах, заточенная для аналитики.

                          >Рассматривали ли вы еще какие-то базы данных?

                          Раньше использовали postgres, на больших и сложных отчетах постгря умирала или выполняла запрос несколько часов. Пробовали еще iccube, но что-то у нас не срослось.

                          Решения типа cassandra не рассматривали сразу, так как не было опыта и аналитикам сложнее отчеты строить. + сервера, поддержка и прочее. Сейчас есть мысль старые данные убирать куда-то, но думаем про amazon redshift.

                          Аналитикам всегда проще дать возможность писать запросы на SQL, так как они его хорошо знают и умеют работать. Обучать писать на scalding и на других обертках — дорого.

                          >Проще было поставить HP потому что дружите с HP
                          с HP вроде не дружим
                            0
                            Спасибо, ответ понял.
                              +1
                              >с HP вроде не дружим
                              Дружим, дружим :) И они с нами
                                0
                                Извините, могу ошибаться, но общего между PostgreSQLи HP Vertica только то что они реализуют ACID.

                                Постгрес типичный реляционный версионник. Таблицы храняться строками, работает с индексами.
                                Vertica — колоночная система, индексы не нужны — колонка уже индекс. Со всеми вытекающими и втекающими.

                                Запросы по принципу «посчитай кучу данных и выдай пару сотен строк аггрегата по ним это быстро.
                                А вот запрос поищи мне данные и дай тесколько тысяч строк из таблицы с десятком — другим колонок будет медленно.

                                Грубо — сама операция найди ключи от записей — быстрая операция, но собери теперь по этим ключам всю строку может быть сильно медленее.
                                  0
                                  Это не совсем так.
                                  В Вертике не каждая колонка — индекс. А только та (те), которые указаны в сортировке и сегментации. Технически они являются hash индексом.
                                  Возможно создание альтернативного индекса, он называется проекцией, но с совместным использованием разных индексов по одной таблице возможны проблемы.
                                  –1
                                  А вы не пробовали такую систему, как Splunk? Она позиционируется как раз как система для анализа больших данных. У меня в базе 1,6 млрд. записей (события из логов Windows), и пока нормально справляется. Плюс из коробки есть возможность кластеризации.
                                    0
                                    Сейчас есть мысль старые данные убирать куда-то, но думаем про amazon redshift


                                    Работаете на бесплатном комьюните терабайте? :)
                                      0
                                      Не пробовали voltdb.com?
                                      Мне прям кажется это под ваши задачи.
                                        0
                                        еще нет, но посмотрим :)
                                          0
                                          Тогда будем ждать статью ;)
                                            +1
                                            Вы мне весь мир DWH сломали со своим локом.
                                            Был на highload, Николай Голов из Avito. И я спросил, откуда у вам могли взяться локи таблиц, и он сказал, что нужно смотреть в сторону Transaction Isolation Level потому, что есть глюк у JDBC драйвера выставлять не READ COMMITED
                                            Надеюсь ничего не переврал. :)
                                              0
                                              спасибо, меня на самом деле тоже смущает данная ситуация, так как сейчас пишется через 1 bolt, а хочется в несколько.

                                              в среду постараюсь разобраться, напишу тут результаты.
                                                0
                                                И вы так делаете Loading Batches Directly into ROS?
                                                  0
                                                  сейчас нет, данных не так много заливается одновременно.
                                                    0
                                                    А сколько в граммах, и скажите сколько одновременно льющих соединений.
                                                  0
                                                  Да, так и есть. Это довольно просто проверить, киньте такой запрос, когда подозрительный запрос работает:
                                                  select * from sessions s join transactions tr on tr.transaction_id =s.transaction_id
                                                  У нас, как я говорил, такую ситуацию создавал самый обычный select, сформированный Tableau через неудачно настроенное соединение.
                                                    0
                                                    Вот, кстати, еще возможная причина: «Previously, a deadlock in the JDBC driver sometimes occurred when the close() methods of a Connection and a child PreparedStatement were called concurrently. The issue has been resolved.»
                                                    Эта проблема исправлена последним сервиспаком.
                                                    0
                                                    А я вот вас не понял. Локи таблиц в вертике повсюду, в том числе с READ COMMITED.

                                                    my.vertica.com/docs/7.1.x/HTML/index.htm#Authoring/ConceptsGuide/Other/READCOMMITTEDIsolation.htm

                                                    READ COMMITTED isolation uses exclusive (X) write locks that are maintained until the end of the transaction.
                                                      0
                                                      Локи разного уровня. Вот здесь хорошая табличка на эту тему есть: my.vertica.com/docs/7.1.x/HTML/index.htm#Authoring/SQLReferenceManual/SystemTables/MONITOR/LOCKS.htm?Highlight=lock type
                                                      И здесь, в упрощенном варианте: my.vertica.com/docs/7.1.x/HTML/Content/Authoring/ConceptsGuide/Other/Transactions.htm
                                                      Если вкратце, lock от READ COMMITED другим транзакциям не мешает. А от Serializable — мешает.
                                                        0
                                                        Почитайте подробнее тот пример, откуда взята эта цитата: «READ COMMITTED isolation uses exclusive (X) write locks that are maintained until the end of the transaction.»
                                                        Там описывается, что такой lock дает операция delete (и, соответственно, update).
                                                        insert работает с уровнем блокировки I.
                                                        У меня, например, распространена практика, когда в одну единственную таблицу льет от 7 до 30 параллельных транзакций, c read commited изоляцией. На highload я как раз об одном таком процессе рассказывал.
                                                        И все отлично работает.
                                                          0
                                                          Ооок, спасибо! Очевидно, я что-то как-то упустил, я разнес вставку в разные таблицы по разным потокам (что, вообще-то не до фига хорошо, на сколько я понимаю, pre join projections мне с такой техникой не видать из-за нарушения порядка вставки). Впрочем, у меня во многие таблички делаются еще и апдейты в приличном количестве, которые я в итоге все сделал через оптимизированный-мерж. Думаю, эксклюзивные локи при мерже и сформировали у меня мнение, что вертика лочит все.
                                                            0
                                                            С апдейтами в вертике аккуратно надо.
                                                            Советую выполнить вот такой запрос: select projection_name, count(*) from delete_vectors group by projection_name;
                                                            Он позволит оценить, насколько много работы по дефрагментации нужно провести базе, чтобы устранить последствия делетов и апдейтов.
                                                        0
                                                        select * from locks
                                                        


                                                        если что
                                                  +1
                                                  Ещё мучали Sybase IQ. Но тот не потянул. Я даже где-то расстроился.
                                                    0
                                                    Ваши и ivi посты в целом наверное самые продвинутые на хабре.

                                                    Интересно что расстроились. Я бы скорее удивился если бы она взлетела.

                                                    Любопытно наши ИТшные домыслы и инстинкты работают.
                                                      –1
                                                      В прошлой жизни Sybase IQ мне очень нравился для аналитики. Очень быстро и всё такое. Но вот время прошло, а они остались ровно там, где я их видел. :(
                                                        +1
                                                        Не все успевают за временем. Подозреваю, что там как были клиенты типа страховых компаний так и остались. И ничего менять для них нет надобности.
                                              0
                                              Добрый день, был раз знакомству на HighLoad++. :)
                                              Очень рад, что еще кто-то активно использует Pattern Match в Вертике. Мы тоже его используем уже несколько месяцев, местами это дает просто фантастические результаты… Хотя есть и некоторые смешные особенности.
                                              На тему скорости — а не могли бы вы привести код, как у вас отсортирована, сегментирована и партиционирована та таблица, events.events?

                                              Только полноправные пользователи могут оставлять комментарии. Войдите, пожалуйста.

                                              Самое читаемое