Kafka Streams — непростая жизнь в production

    Привет, Хабр! Вокруг меня сформировался позитивный информационный фон на тему обработки событий через Kafka Streams. Этот инструмент привлекает множеством видео-докладов и статей на Хабре, подробной документацией, понятным API и красивой архитектурой. Некоторые мои знакомые и коллеги разрабатывают с его помощью свои системы. Но что происходит в реальной жизни, когда эти системы уходят в production?

    В этой статье я опущу введение в Kafka Streams, предполагая, что читатель уже знаком с ней, и расскажу о нашем опыте жизни с этой библиотекой на примере достаточно нагруженной системы.

    Коротко о проекте

    Внутренней командой вместе с партнерами мы работаем над биржей Ad Exchange, которая помогает перепродавать рекламный трафик. Специфику подобных инструментов мы уже когда-то описывали в статье на Хабре. По мере роста числа партнеров среди SSP и DSP, нагрузка на сервера биржи растет. А для повышения ценности самой биржи мы должны собирать с этого трафика развернутую аналитику. Тут-то мы и попытались применить Kafka Streams.

    Внедрить новый инструмент у себя на продакшене - это всегда некоторый риск. Мы его осознавали, но смирились с этим, поскольку в целом Kafka Streams концептуально должен был хорошо подойти для расчета агрегатов. И хотя первое впечатление было отличным, далее я расскажу о проблемах, к которым это привело.

    Статья не претендует на объективность и описывает только лишь наш опыт и проблемы, с которыми мы столкнулись, работая с этой технологией. Надеюсь, кому-то она поможет избежать ошибок при использовании Kafka Streams. А может быть даст повод посмотреть по сторонам.

    Агрегаты

    В нашу систему приходят десятки и сотни тысяч сообщений в секунду. Первое, что нам необходимо было сделать, - агрегации по различным ключам. Например, идёт поток событий "Показ рекламы". Для него надо считать на лету общую стоимость и количество, группируя по различным ключам: стране, партнёру, рекламной площадке и т.д.

    Вроде бы стандартный кейс для Kafka Streams: используем функции groupBy и aggregate и получаем нужный результат. Всё работает именно так, причем с отличной скоростью за счёт внутреннего кэша: несколько последовательных изменений по одному и тому же ключу сначала выполняются в кэше и только в определённые моменты отправляются в changelog-топик. Далее Kafka в фоне удаляет устаревшие дублирующиеся ключи через механизм log compaction. Что здесь может пойти не так?

    Репартиционирование

    Если ваш ключ группировки отличается от ключа, под которым изначально пришло событие, то Kafka Streams создаёт специальный repartition-топик, отправляет в него событие уже под новым ключом, а потом считывает его оттуда и только после этого проводит агрегацию и отправку в changelog-топик. В нашем примере вполне может быть, что событие "Показ рекламы" пришло с ключом в виде UUID. Почему нет? Если вам надо сделать группировку, например по трём другим ключам, то это будет три разных repartition-топика. На каждый топик будет одно дополнительное чтение и одна дополнительная запись в Kafka. Чувствуете, к чему я веду?

    Предположим, на входе у вас 100 тысяч показов рекламы в секунду. В нашем примере вы создадите дополнительную нагрузку на брокер сообщений в размере +600 тысяч сообщений в секунду (300 на запись и 300 на чтение). И ведь не только на брокер. Для таких объёмов надо добавлять дополнительные сервера с сервисами Kafka Streams. Можете посчитать, во сколько тысяч долларов обойдется такое решение с учётом цен на железо.

    Для читателей, не очень хорошо знакомых с механизмом репартиционирования, я поясню один момент. Это не баг или недоработка Kafka Streams. С учётом её идеологии и архитектуры это единственное возможное поведение - его нельзя просто взять и отключить. Когда у сообщения меняется ключ, этот новый ключ надо равномерно "размазать" по кластеру так, чтобы каждый инстанс Kafka Streams имел собственный набор ключей. Для этого и служат дополнительные запись/чтение в repartition-топик. При этом если инстанс А записал в топик сообщение, то не факт, что он же его и прочитает. Это может сделать инстанс Б, В и т.д. в зависимости от того, кто какую партицию читает. В итоге каждый ключ группировки у вас будет более-менее равномерно распределён по серверам кластера (если вы его хэшируете, конечно).

    Запросы к агрегатам

    Данные пишут для того, чтобы с ними потом можно было работать. Например делать к ним запросы. И здесь наши возможности оказались серьезно ограничены. Исчерпывающий список того, как мы можем запрашивать данные у Kafka Streams, есть здесь или здесь для оконных агрегатов. Если используется стандартная реализация state-store на основе RocksDB (а скорее всего это так), фактически данные можно получать только по ключам.

    Предположу, что первое, что вам хочется сделать с полученными данными, - это фильтрация, сортировка и пагинация. Но здесь таких возможностей нет. Максимум, что вы можете, - это считать всё, что есть, используя метод all(), а потом вручную делать нужные операции. Вам повезет, если ключей не слишком много и данные уместятся в оперативной памяти. Нам не повезло. Пришлось писать дополнительный модуль, который по ночам выгружал данные из RocksDB и отправлял в Postgres.

    Ещё надо помнить, что на каждом отдельно взятом сервисе находится только часть ключей. Что если вам необходимо отдавать ваши агрегаты, скажем, по HTTP? Вот кто-то сделал запрос к одному из сервисов Kafka Streams: мол, дай мне такие-то данные. Сервис смотрит у себя локально - данных нет. Или есть, но какая-то часть. Другая часть может быть на другом сервере. Или на всех. Что делать? Документация Kafka Streams говорит, что это наши проблемы.

    Стабильность Kafka Streams

    У нас бывают проблемы с хостинг провайдером. Иногда выходит из строя оборудование, иногда человеческий фактор, иногда и то, и другое. Если по какой-то причине теряется связь с Kafka, то Kafka Streams переводит все свои потоки в состояние DEAD и ждёт, когда мы проснёмся и сделаем ей рестарт. При этом рядом стоят соседние сервисы, которые работают с той же Kafka через Spring и @KafkaListener. Они восстанавливаются сами, как ни в чём не бывало.

    Kafka Streams может умереть и по другой причине: не пойманные исключения в вашем коде. Предположим, вы делаете какой-то внешний вызов внутри потоков Kafka Streams. Пусть это будет обращение к базе данных. Когда база данных падает, у вас два варианта. Первый - вы ловите эту ошибку и продолжаете работать дальше. Но если база будет лежать долго, а работа с ней - критична, вы будете считывать большое количество сообщений из топиков и некорректно их обрабатывать. Второй вариант кажется лучше - совсем не ловить исключение и дать Kafka Streams умереть. Дальше как обычно: ночной подъём, рестарт всех серверов с Kafka Streams. "Из коробки" у вас нет варианта подождать, пока база восстановится, и продолжить работу.

    Нам пришлось дописать в каждый сервис Kafka Streams дополнительный модуль, который работает как watchdog: поднимает Kafka Streams, если видит, что она умерла.

    Кстати, если вы работаете с Kafka Streams через Spring, то не забудьте переопределить стандартный StreamsBuilderFactoryBean, указав в нём свой CleanupConfig. Иначе будете неприятно удивлены тем, что при каждом рестарте будет удаляться вся локальная база RocksDB. Напомню, что это приведёт к тому, что при каждом рестарте все сервера начнут активно считывать данные из changelog-топика. Поверьте, вам это не нужно.

    KStream-KStream Join

    Здесь можно было бы обойтись одной фразой: никогда это не используйте. Джоин двух потоков создаёт десятки топиков в Kafka и огромную нагрузку на всю систему. Просто не делайте этого. Ну или хотя бы проверьте все под нагрузкой, прежде чем ставить в production.

    Вообще Kafka Streams любит создавать топики под различные свои нужды. Если вы не изучили под лупой документацию и не протестировали, как это работает, то это может стать для вас и ваших DevOps неприятным сюрпризом. Чем больше топиков, тем сложнее их администрировать.

    Масштабируемость

    Любой человек, знакомый с Kafka, скажет, что масштабируемость - одна из главных её преимуществ: "В чём проблема? Добавляем партиций и радуемся". Всё так. Но не для Kafka Streams.

    Если вы используете джоины, то все ваши топики должны быть ко-партиционированы (co-partitioning), что, в числе прочего, означает, что у них должно быть одинаковое количество партиций. Так в чём же проблема?

    Представим, что вы сделали несколько топологий: с агрегатами, джоинами, всё как положено. Поставили в production, оно работает, вы радуетесь. Дальше бизнес пошел в гору, нагрузка начала расти, вы добавили серверов и хотите увеличить количество партиций, чтобы загрузить эти новые сервера. Но Kafka Streams наплодил десятки топиков. И у всех надо поднимать количество партиций, иначе следующий рестарт ваших сервисов может стать последним. Если Kafka Streams увидит, что топики не ко-партиционированы, она выдаст ошибку и просто не запустится.

    На вопрос, что с этим делать, у меня сегодня ответа нет. Вероятно, можно потушить все рабочие инстансы Kafka Streams, потом поднять число партиций на всех причастных топиках, затем поднять Kafka Streams обратно и молиться. А может быть последовать совету отсюда: Matthias J. Sax пишет, что это нужно делать, создавая новый топик с новым количеством партиций и подключать к нему Kafka Streams с новым application.id. Там же есть совет, что если вы знаете заранее, что нагрузка будет большая, то лучше сделать партиций с запасом.

    Заключение

    Если у вас в системе нет и не планируется высоких нагрузок, то со всеми этими проблемами вы скорее всего не столкнётесь. Всё будет работать отлично. В противном случае стоит серьёзно задуматься, нужно ли вам это, особенно если планируете использовать следующий уровень абстракции - KSQL.

    Автор статьи: Андрей Буров, Максилект.

    P.S. Мы публикуем наши статьи на нескольких площадках Рунета. Подписывайтесь на наши страницы в VK, FB, Instagram или Telegram-канал, чтобы узнавать обо всех наших публикациях и других новостях компании Maxilect.

    Maxilect
    Умные решения для вашего бизнеса

    Похожие публикации

    Комментарии 11

      0
      Kafka Streams может умереть и по другой причине: не пойманные исключения в вашем коде.… Второй вариант кажется лучше — совсем не ловить исключение и дать Kafka Streams умереть...

      Вот это меня удивляет больше всего — почему они не могут просто откатывать оффсет и репроцесить сообщение, если добавлять счетчик ретраем в метаданные то было бы вообще хорошо. И тогда пользовательский код решал что бы хватит уже ретраить или же продолжаем ретраить пока не подымется база.


      А так стримы хороши когда сами в себе :(

        0
        особенно если планируете использовать следующий уровень абстракции — KSQL.

        А можете поподробнее прокомментировать этот момент? Чем опасен KSQL?

          0
          У KSQL “под капотом” всё та же kafka-streams. То есть все проблемы, которые я описал выше, имеют место быть и в случае с KSQL. Но при этом KSQL внешне выглядит ещё проще: вроде бы просто написал SQL запрос и радуйся.
          Проблема в том, что такая простота обманчива. Без глубокого понимания, как это работает, она может стать большой проблемой при увеличении нагрузок на систему.
          При повышении уровня абстракции всегда приносится в жертву производительность.
          В случае с KSQL такая жертва, думаю, будет чересчур велика.
          Также уточню, что опыта работы с KSQL в production у нас нет, так что рассуждения выше — это только рассуждения.
          0
          Иначе будете неприятно удивлены тем, что при каждом рестарте будет удаляться вся локальная база RocksDB.

          А как организована связь RocksDB с разделами темы? На каждый раздел своя база или как-то иначе?


          И что происходит, если данных в разделе/теме много и они не помещаются в локальную базу?

            0
            И что происходит, если данных в разделе/теме много и они не помещаются в локальную базу?

            давайте либо много диска чтобы помещалось либо делаете несколько инстансов на которых бегают стримы и у каждого свой локальный сторадж

              0

              Количество инстансов со стримами чем-то ограничено (partitions?)?

                0

                и да и нет.
                у вас активных консюмеров топика не может быть больше чем кол-ва партишинов. но вы можете держать про запас у вас могут быть разные топики и следовательно кафка распределит консюмеров по ним.

              0
              Под разделами темы вы имеете ввиду партиции топика кафки?
              Если да, то выглядит это так.
              Например, у вас в топике 10 партиций и топик читают две ноды kafka streams.
              Каждая нода берёт себе по 5 партиций. Соответственно у каждой ноды в её локальной базе RocksDB будут данные только с этих 5 партиций.
              При условии, что ноды находятся в одной consumer-группе, естественно.
              Если данные не умещаются в локальную базу, то будете получать стандартные для этого исключения: no space left on device, GC overhead limit exceeded и т.п.
                0
                Под разделами темы вы имеете ввиду партиции топика кафки?

                Если точнее, я имею ввиду такую терминологию:


                • Тема — Topic
                • Раздел — Partition

                См. здесь или здесь.


                Если данные не умещаются в локальную базу, то будете получать стандартные для этого исключения: no space left on device, GC overhead limit exceeded и т.п.

                Как же тогда масштабироваться, если на входе много данных?

                  0
                  Как же тогда масштабироваться, если на входе много данных?

                  бейте на много партишинов ваш топик

                    0
                    Вы можете добавить ещё серверов с kafka-streams. Например, если у вас будет 5 серверов вместо 2, то в моём примере на каждого будет приходиться по 2 партиции и, соответственно, в 2,5 раза меньше данных.

              Только полноправные пользователи могут оставлять комментарии. Войдите, пожалуйста.

              Самое читаемое