Почему стриминг на KSQL и Kafka Streams — это непросто

    Привет, Хабр!

    Меня зовут Саша, я лид-разработчик в GlowByte Consulting. Мы с командой сделали неплохой стриминговый движок для одного крупного банка. Сейчас в продакшене крутится онлайн обработка банковских авторизаций, визитов клиентов в офис и еще ряд более мелких процессов, при этом все работает на KSQL и Kafka Streams. Хочу поделиться тем, на какие грабли мы наступили в процессе.

    Если интересны подробности, прошу под кат.

    image

    Для тех, кто в танке


    Если вы не знакомы с Kafka, советую почитать вводную часть с официального сайта Apache: kafka.apache.org/intro, либо с сайта Confluent:
    docs.confluent.io/current/kafka/introduction.html, чтобы было понятно, о чем вообще речь.

    Если же вы работали с Kafka как с шиной для обмена сообщениями, то у вас мог возникнуть вопрос: как сделать на этом стриминговый движок? Отвечаю, все благодаря Kafka Streams API и KSQL.

    Kafka Streams — это Java API, которое позволяет перекладывать данные из топика в топик, по пути совершая различные преобразования этих данных.

    KSQL — это надстройка над Kafka Streams, которая позволяет вместо написания Java кода использовать SQL-подобный язык, который автоматически генерирует Kafka Streams приложения.

    Что мы сделали


    image

    Архитектура решения GlowByte

    Наш PROD-кластер Kafka состоит из 3 нод. Конфигурация каждой примерно такая:

    • 48 ядер CPU (96 с HT) по 2.1 ГГц;
    • 512 ГБ RAM;
    • 12x HDD 10 ТБ;
    • Внутренняя сеть 40 Гбит/с.

    В качестве входных данных используются:

    • Плоские файлы, падающие на сетевой диск с частотой раз в минуту;
    • Оперативные данные из витрин в Oracle, загружаемые через Oracle Golden Gate;
    • Медленные данные с витринами и справочниками, нужными для процессинга, которые грузятся из Hadoop.

    Все входные данные обрабатываются в режиме реального времени и кладутся в топики Kafka, которые далее вычитываются через SAS RTDM (Real-Time Decision Manager), а также выгружаются в NOSQL базу HBase, куда SAS RTDM ходит за точечными лукапами. Далее уже SAS RTDM принимает решение о проведении маркетинговых коммуникаций посредством SMS, PUSH-уведомлений и т.д.

    Пример сценария (не обязательно настоящего)

    Клиент совершает покупку по карте в каком-либо магазине. Определяется, что магазин находится в крупном торговом центре, где также есть магазины-партнеры банка, выпустившего карту. Так что клиенту почти сразу после совершения покупки приходит сообщение с предложением посетить также и эти магазины.

    Почти всю логику мы реализовали на KSQL-скриптах. Пишутся они, в отличие от обычного SQL не так просто. Но все же проще, чем Java-код. К тому же, их можно исправлять без компиляции (удобно менять какие-нибудь простые фильтры), а также они могут быть прочитаны человеком, абсолютно не знакомым с Java и с классическим программированием вообще.

    Kafka Streams API мы использовали только в тех случаях, где KSQL оказался бессилен. Например, при сложных one-to-many джойнах.

    Как надо и как не надо


    Теперь перейду к основной сути поста. А конкретно, к граблям, на которые мы напоролись.

    НЕ НАДО: Надеяться, что все заработает «из коробки»
    НАДО: Изучить все технические детали


    Казалось бы, очевидный пункт. Но когда реализуешь большое и технически сложное решение, все равно на что-то да наткнешься. Основные наши проблемы были:

    • Мы использовали везде одинаковое дефолтное число партиций.

      Партицирование в Kafka нужно для регулирования возможностей параллелизма. Мы поставили везде по 20 партиций “с запасом”. В итоге же оказалось, что большое количество партиций сильно увеличило нагрузку на диски. А диски у нас — обычные HDD, пусть и большого объема;
    • Нужно сильно «тюнить» параметры Kafka-брокеров, Zookeeper’а и т.д.

      Несколько раз при попытке планово перезапустить Kafka (для внесения пары незначительных параметров), Kafka просто не поднималась. Приходилось оперативно разбираться с проблемой и чаще всего увеличивать те или иные timeout’ы в конфигах.

    Диски и железо


    НЕ НАДО: Использовать одни и те же диски для всего, просто потому что места хватает
    НАДО: Хорошо подобрать конфигурацию железа (особенно диски)


    image

    Основная проблема, с которой мы столкнулись — использование обычных HDD под State Store.
    Kafka Streams и, соответственно, KSQL под собой всегда имеют State Store в виде RocksDB для джойнов и вычисления агрегатов. На State Store приходится очень большое количество операций чтения-записи. Собственно, мы не ожидали, насколько много. Один обычный HDD вполне тянет (по IO) пару средних KSQL-процессов (два-три джойна) или один очень большой (пять джойнов + агрегация). Но процессов у нас очень много (десятки). А дисков всего 12 на ноду, причем почти все заняты топиками Kafka и нагружать их еще сверху — плохая идея.

    В итоге мы вынесли почти весь State Store в RAM. Работает хорошо, но занимает несколько сотен ГБ памяти.

    Смена парадигмы


    НЕ НАДО: Просто переносить SQL-прототипы в стриминг, ожидая, что все заработает
    НАДО: Перейти от парадигмы ETL в парадигму стриминга


    Наша компания очень активно делает разработку регламентных ETL-процессов для различных заказчиков. ETL-процессы обычно представляют собой батчевые расчеты, проводимые раз в сутки, раз в неделю или реже. В 90%+ случаев под всеми обертками ETL-процесс — это последовательное выполнение SQL-запросов в той или иной БД.

    Так как писали мы на KSQL, то в начале вполне логичным решением казалось просто адаптировать все SQL-прототипы, сделанные аналитиками, под синтаксис KSQL. Оказалось, так это не работает. KSQL — это лишь SQL-подобный язык, но работает он совершенно по-другому.

    Ниже картинка, показывающая разницу в обновлении данных в БД и в Kafka. В Kafka записи не обновляются. Старая остается и добавляется новая.

    image

    Еще пример, на этот раз с перемешиванием записей. В процессе преобразования какие либо два сообщения могут сначала быть в одной партиции, потом сменить ключ, а потом снова оказаться в одной партиции. В итоге их порядок может измениться. А так как в некоторых случаях порядок важен, получаем проблему.

    image

    Ниже будет пример адаптации одного скрипта, который в парадигме ETL считался довольно простым:

    SQL-скрипт:

    • 95 строк кода;
    • 1 шаг;
    • 1 full join;
    • Около 1-2 человеко-дней на разработку и тестирование.

    KSQL-скрипт:

    • 874 строки кода;
    • 12 шагов;
    • 2 left join + 1 group by;
    • 40+ человеко-дней на разработку и тестирование.

    Батчи вместо стриминга


    НЕ НАДО: Делать на стриминге то, для чего хватит батчей
    НАДО: Правильно выбирать, какие задачи можно сделать, а какие нет


    Вначале планировалось, что все необходимые процессы будут реализованы на KSQL и Kafka Streams. В процессе же оказалось, что некоторые вещи, оперативность которых не столь критична, а сложность реализации очень высока, лучше вынести в батч. Такие батчи можно запускать раз в час, несколько раз в день или реже, в зависимости от критичности. Но это будет явно лучше, чем рассчитывать в онлайне все подряд.
    Для себя мы решили:

    Можно делать в стриминге:

    • Фильтрация;
    • Скалярное изменение атрибутов;
    • Точечные lookup’ы;
    • Простые агрегации.

    Лучше по возможности вынести в батч

    • Джойны один ко многим и многие ко многим;
    • Сложные агрегации;
    • Пересчет за старые даты.

    Проблема пересчета


    НЕ НАДО: Ставить небольшой retention на все входящие события
    НАДО: В задачах онлайн-расчетов быть готовыми к тому, что все придется пересчитать


    В жизни никогда нельзя быть уверенным, что ты все сразу сделал правильно. Может возникнуть ситуация, что в расчеты, которые делались онлайн половину месяца, закралась ошибка (или требования поменялись задним числом). Конечно, во всех stateless операциях это не важно. Что было, то прошло. Но вот если в онлайне считаются, например, агрегаты (например, оборот клиента по карте), то это проблема. Если ничего не сделать, то они продолжат считаться неправильно. Приходится все пересчитывать. Поэтому, в отдельных случаях мы храним данные в топиках Kafka вплоть до трех месяцев. При этом пересчет в стриминге — штука не простая.

    База данных

    1. Делается пересчет нужной витрины данных за нужные даты;
    2. Данные в целевой таблице заменяются пересчитанными.

    Streaming

    1. Отключается онлайн-обработка данных;
    2. Очищается state store;
    3. Сбрасываются оффсеты;
    4. Идет репроцессинг данных;
    5. Если нужно, данные выгружаются в базу.

    Вместо итога


    Разумеется, я описал далеко не все проблемы, с которыми мы столкнулись в процессе реализации стриминг-движка. Но это точно одни из самых основных граблей, которые первыми приходят в голову. Если кому-то отдельные пункты показались очевидными или банальными — прошу прощения. Искренне надеюсь, что было не очень скучно, и что хотя бы кому-то эта статья поможет.
    GlowByte
    Компания

    Комментарии 15

      –3

      Почему при stream+table когда приходит со стороны table нету тригера? что это за бред?


      Например у нас полностью асинхронный сайт:
      1) добавляются видео (генерим guid, отправляем в очередь)
      2) в UI можно сделать сразу допустим like (отправляем в очередь) (не придирайтесь я для простаты восприятия).


      окей для видео TABLE — всё логично.
      НО специально делать table для лайков вместо stream, из за того что не можем гарантировать кто первый придет, это бред!

        0
        Во-первых, не особо понятно, как при stream-table join должен работать триггер справа. В стриме могут быть дубли по ключу, на что джойниться?
        Во-вторых, Kafka Streams API писали не мы, возмущаться тут бессмысленно.
          –3

          учите документацию, вы пишите статью о граблях, не зная о таком лимите…

            0

            Не понял, чего и о каком лимите, по вашему, я не знаю? Про то, что stream-table join триггерит джойн только приходом записей с левой части — это очевидный факт, так работает KSQL с Kafka Streams API

        0
        пока похоже просто на рекламный пост… почему именно так сделали как на схеме? зачем там например «золотой по деньгам» golden gate оракловый? Можно про это именно рассказать?
          0
          Лицензия на Golden Gate уже была. К тому же, это довольно хорошо работает. Из альтернативы, для загрузки данных из Оракла видится только какой-нибудь JDBC-коннектор, который работает ощутимо хуже и медленнее. GoldenGate льет данные в режиме реального времени, что является явным преимуществом.
          0

          Не нашел примерные нагрузки. Не поделитесь что летает на такой конфигурации ПО и хардваре, приемные цифры потоков?

            0

            Точных цифр по памяти сейчас не назову, надо лезть, смотреть и считать.
            В целом, вопрос нагрузки интересный, вас что именно интересует? Если смотреть на голые цифры количества событий на вход и на выход, будет не очень много, но это ни о чем не скажет. Внутри происходят сложные вычисления в режиме реального времени с кучей джойнов и агрегатов, что создаёт почти всю нагрузку на кластер.

              0

              Сильно поддержку вопрос касательно нагрузки. О каких вообще цифрах входящих и выходящих событий идет речь? Хотя бы порядок чисел.

            0
            На практике столкнулся еще с одними граблями: на основе стримов реализовать ретраи (перекинуть сообщение обратно в очередь если оно было запроцессено с ошибками) с конфигурируемым временем задержки. Штатного решения я не нашел, Thread.sleep() просто усыпил поток стримов (а потоков согласно архитектуре kafka streams выделяется пропорционально кол-ву партиций). Остановился на таком решении: закидывать все сообщения для ретраев в State Store и периодически скедулером доставать все сообщения и проверять сколько времени они там лежат, если это время >= сконфигурированного времени ретрая — перекидываем в очередь. Возможно есть более простое решение?
              0
              Мы реализовывали нечто подобное. Схема примерно такая: делаем перекладку сообщений в боковой топик с задержкой N секунд от таймстемпа сообщения, потом перекладываем сообщения в исходный топик. Понятное дело, накладываем по пути нужные условия, чтобы переподкидывать не всё. Если не очень понятно, пишите в личку, объясню подробнее.
              0
              Спасибо за статью. Скажите, какая в итоге получается нагрузка «с другой стороны трубы» — на RTDM? Желательно в TPS и MBPS
                0
                В понедельник смогу посмотреть, скорее всего. На вскидку там порядка нескольких тысяч сообщений в секунду. В Mb надо измерять.
                +1
                Привет, спасибо за статью.
                С проблемой RocksDb на HDD мы тоже столкнулись и в итоге перенесли хранение стора в tmpfs. Подскажите как вы реализуете деплой без потери данных? По сути когда меняешь структуру стрима, то меняется структура хранения в сторе и новая версия приложения с ней уже не совоместима? Или с ksql таких проблем нет?
                  0
                  Мы тоже используем tmpfs. С ksql ровно те же проблемы, что и с kafka streams. Действия от изменения какого-либо шага со state store зависят от ситуации. Если меняется топик, на который идет джойн как на table, достаточно очистить state store и changelog топик от этого шага, после чего вычитать топик-таблицу заново. Если меняется структура агрегатов, приходится делать частичный репроцессинг данных.

                Только полноправные пользователи могут оставлять комментарии. Войдите, пожалуйста.

                Самое читаемое