Неужели нельзя обойтись без кафок и рэббитов, когда принимаешь 10 000 ивентов в секунду

    Однажды я вел вебинар про то, как принимать 10 000 ивентов в секунду. Показал вот такую картинку, зрители увидели сиреневый слой, и началось: «Ребят, а зачем нам все эти кафки и рэббиты, неужели без них не обойтись»? Мы и ответили: «Зачем-зачем, чтобы пройти собес!» 

    Очень смешно, но давайте я все-таки объясню.


    Мы можем принимать ивенты сразу в зеленой области и заставить наши приложения писать их в кликхаус.

    Но кликхаус любит, когда в него пишут сообщения пачками

    Другими словами, в него лучше запихнуть миллион сообщений, вместо того чтобы писать по одному. Kafka, Rabbit или Яндекс.Кью выступают как буфер, и мы можем контролировать с его помощью входящую нагрузку. 

    Как бывает: в одну секунду пришло 10 тысяч ивентов, в следующую — тысяча, в другую — 50 тысяч. Это нормально, пользователи рандомно включают свои мобильные приложения. В таком случае в кликхаус напрямую будет заходить то 2 тысячи, то 10 тысяч сообщений. Но с помощью буфера вы можете подкопить сообщения, потом достать из этой копилки миллион ивентов и направить в кликхаус. И вот она — желанная стабильная нагрузка на ваш кластер.

    Это все история про очереди

    — Во-первых, очереди можно использовать для передачи сообщений между различными сервисами. 

    Например, для бэкграунд задач. Вы заходите в админку магазина и генерируете отчет по продажам за год. Задача трудоемкая: нужно прочитать миллионы строк из базы, это хлопотно и очень долго. Если клиент будет висеть постоянно с открытым http-коннектом — 5, 10 минут — связь может оборваться, и он не получит файл. 

    Логично выполнить эту задачу асинхронно на фоне. Пользователь нажимает кнопку «сгенерировать отчет», ему пишут: «Все окей, отчет генерируется, он придет на вашу почту в течение часа». Задача автоматически попадает в очередь сообщений и далее на «стол» воркера, который выполнит ее и отправит пользователю на ящик.

    — Второй кейс про кучу микросервисов, которые общаются через шину. 

    Например, один сервис принимает ивенты от пользователей, передает их в очередь. Следующий сервис вытаскивает ивенты и нормализует их, к примеру, проверяет, чтобы у них был валидный e-mail или телефон. Если все хорошо, он перекладывает сообщение дальше, в следующую очередь, из которой данные будут записываться в базу.

    — Еще один поинт — это падение дата-центра, в котором хостится база.

    Конец, ничего не работает. Что будет с сообщениями? Если писать без буфера, сообщения потеряются. Кликхаус недоступен, клиенты отвалились. До какого-то предела выручит память, но с буфером безопаснее — вы просто взяли и записали сообщения в очередь (например, в кафку). И сообщения будут храниться там, пока не закончится место или пока их не прочитают и не обработают.


    Как автоматически добавлять новые виртуалки при увеличении нагрузки

    Чтобы протестить нагрузку, я написал приложение и протестил автоматически масштабируемые группы.

    Мы создаем инстанс-группу. Задаем ей имя и указываем сервисный аккаунт. Он будет использоваться для создания виртуалок.

    resource "yandex_compute_instance_group" "events-api-ig" {
      name               = "events-api-ig"
      service_account_id = yandex_iam_service_account.instances.id

    Затем указываем шаблон виртуалки. Указываем CPU, память, размер диска и т.д.

    instance_template {
        platform_id = "standard-v2"
        resources {
          memory = 2
          cores  = 2
        }
        boot_disk {
          mode = "READ_WRITE"
          initialize_params {
            image_id = data.yandex_compute_image.container-optimized-image.id
            size = 10
          }

    Указываем, к какому сетевому интерфейсу его подрубить.

    }
        network_interface {
          network_id = yandex_vpc_network.internal.id
          subnet_ids = [yandex_vpc_subnet.internal-a.id, yandex_vpc_subnet.internal-b.id, yandex_vpc_subnet.internal-c.id]
          nat = true
        }

    Самое интересное — это scale_policy.

    Можно задать группу фиксированного размера fixed scale с тремя инстансами A, B, C.

    scale_policy {
        fixed_scale {
          size = 3
        }
      }
    
      allocation_policy {
        zones = ["ru-central1-a", "ru-central1-b", "ru-central1-c"]
      }

    Либо использовать auto_scale — тогда группа будет автоматически масштабироваться в зависимости от нагрузки и параметров.

    scale_policy {
    auto_scale {
        initial_size = 3
        measurment_duration = 60
        cpu_utilization_target = 60
        min_zone_size = 1
        max_size = 6
        warmup_duration = 60
        stabilization_duration = 180
    }

    Главный параметр, на который надо обратить внимание, — это cpu utilization target. Можно выставить значение, при превышении которого Яндекс.Облако автоматически создаст нам новую виртуалку.

    Теперь протестируем автомасштабирование при увеличении нагрузки

    Наше приложение принимает различные ивенты, проверяет джейсонку и направляет в кафку.

    Перед нашей инстанс-группой стоит load-балансер. Он принимает все запросы, которые приходят на адрес 84.201.147.84 на порту 80, и направляет их на нашу инстанс-группу — на порт 8080. 

    У меня есть виртуалка, которая с помощью Yandex.Tank делает тестовую нагрузку. Для теста я установил 20 тысяч запросов в течение 5 минут. 


    Итак, нагрузка пошла.

    Сначала все ноды будут загружены во всех трех зонах (A, B и C), но когда мы превысим нагрузку, Яндекс.Облако должно развернуть дополнительные инстансы.

    По логам будет видно, что нагрузка выросла и в каждом регионе количество нод увеличилось до двух. В балансировку добавилась еще одна машина, количество инстансов тоже везде увеличилось.

    При этом у меня был интересный момент. Один инстанс, который находится в регионе С, записывал данные (от момента приема данных до записи) за 23 миллисекунды, а у инстанса из региона А было 12,8 миллисекунд. Такое происходит из-за расположения кафки. Кафка находится в регионе А, поэтому в нее записи идут быстрее.

    Ставить все инстансы кафки в одном регионе — не надо.

    Когда добавилась еще одна машина, новая нагрузка спала, показатель CPU вернулся к норме. Полную аналитику по тестовому запуску можно посмотреть по ссылке: overload.yandex.net/256194.


    Как написать приложение для работы с очередями и буферами обмена

    Приложение написано на golang. Сначала мы импортируем встроенные модули.

    package main
    
    import (
        "encoding/json"
        "flag"
        "io"
        "io/ioutil"
        "log"
        "net/http"
        "strings"
    
    )

    Затем подключаем github.com/Shopify/sarama — это библиотека для работы с кафкой.

    Прописываем github.com/prometheus/client_golang/prometheus, чтобы метрики передавались в API Metrics.

    Также подключаем github.com/streadway/amqp для работы с rabbitmq.

    Затем следуют параметры бэкендов, в которые мы будем записывать.

    var (
        // Config options
        addr     = flag.String("addr", ":8080", "TCP address to listen to")
        kafka    = flag.String("kafka", "127.0.0.1:9092", "Kafka endpoints")
        enableKafka    = flag.Bool("enable-kafka", false, "Enable Kafka or not")
    amqp    = flag.String("amqp", "amqp://guest:guest@127.0.0.1:5672/", "AMQP URI")
    enableAmqp    = flag.Bool("enable-amqp", false, "Enable AMQP or not")
    sqsUri    = flag.String("sqs-uri", "", "SQS URI")
    sqsId    = flag.String("sqs-id", "", SQS Access id")
    sqsSecret    = flag.String("sqs-secret", "", "SQS Secret key")
    enableSqs    = flag.Bool("enable-sqs", false, "Enable SQS or not")
    
        
    
        // Declaring prometheus metrics
        apiDurations = prometheus.NewSummary(
            prometheus.SummaryOpts{
                Name:       "api_durations_seconds",
                Help:       "API duration seconds",
                Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.99: 0.001},
            },
        )

    Адрес кафки — (строка).

    Включить кафку или нет — поскольку приложение может писать в несколько разных бэкендов.

    В приложении реализована возможность работы с тремя очередям.

    Первое — это кафка.

    Второе — amqp для рэббита.

    И третья очередь — sqs для Яндекс.Кью.

    Дальше мы открываем и задаем общие глобальные переменные для работы с нашим бэкендом. Прописываем настройки prometheus для отображения и визуализации.

    В main мы включаем кафку, рэббит и создаем очередь с названием Load.

    И если у нас включен sqs, мы создаем клиент для Яндекс.Кью.

    Дальше наше приложение по http принимает несколько инпоинтов:

    /status просто отдает okey, это сигнал для load-балансера, что наше приложение работает.

    Если вы кидаете запрос на /post/kafka, ваша джейсонка попадет в кафку. Также работают /post/amqp и /post/sqs.

    Как работает кафка

    Кафка — простой, некапризный и очень эффективный инструмент. Если вам нужно быстро принять и сохранить много сообщений, кафка к вашим услугам.

    Как-то на одном из проектов важно было уложиться в маленький бюджет. И вот представьте, мы берем самые дешевые машины без SSD (а кафка пишет последовательно и читает последовательно, так что можно не тратиться на дорогие диски), ставим кафку и zookeeper. Наше скромное решение на три ноды спокойно выдерживает нагрузку 200 тысяч сообщений в секунду! Кафка — это про «поставил и забыл», за пару лет работы кластер ни разу нас не потревожил. И стоил 120 евро в месяц.

    Единственное, что нужно запомнить — кафка очень требовательна к CPU, и ей очень не нравится, когда кто-то рядом поджирает проц. Если у нее будет сосед под боком, она начнет тормозить.

    Кафка устроена так: у вас есть topic, можно сказать, что это название очереди. Каждый topic бьется на части до 50 partitions. Эти партиции размещаются на разных серверах.

    Как вы видите на схемке, topic load разбит на 3 партиции. Partition 1 оказывается на Kafka 1, вторая партиция — на кафка 2, третья — на 3. Тем самым нагрузка полностью распределяется. Когда кластер начинает принимать нагрузку, сообщения пишутся в один топик, а кафка раскидывает их по партициям, гоняет их по кругу. В итоге все ноды нагружаются равномерно.

    Можно заморочиться и разбить топик на 50 партиций, поставить 50 серверов и расположить на каждом сервере по 1 партиции — нагрузка распределится на 50 нод. И это очень круто.

    Партиции могут реплицироваться благодаря zookeeper. Кафке необходимо минимум 3 ноды зукипера. Например, вы хотите, чтобы ваша партиция реплицировались на 2 ноды. Указываете репликейшн фактор 2 и каждая партиция будет закинута 2 раза на рандомные хосты. И если ваша нода упадет, то благодаря зукиперу кафка это увидит: «ага, первая нода в дауне», кафка 2 заберет себе первую партицию.

    Как я разворачивал кафку с помощью Terraform

    В репозитории у нас есть terraform-файл, он называется kafka.tf .

    Вначале мы поднимем 3 зукипера: resource “yandex compute instance” “zookeeper count = 3”.

    Потом находим “zookeeper_deploy”, который деплоит наш зукипер. Хорошо, если он будет вынесен на отдельные машины, где кроме него ничего нет. Далее собираем айдишники нод и генерируем файл. Запускаем ansible для настройки зукипера.

    Кафку поднимаем аналогично зукиперу и, что важно, после него.

    Как работает RabbitMQ 

    Благодаря тому, что кафка по сути просто сохраняет сообщения на диск и по запросу отдает клиенту с нужного места, она очень и очень шустрая. Производительность рэббита значительно ниже, но он напичкан фичами под завязку! Рэббит пытается сделать очень многое, что естественным образом влечет за собой потребление ресурсов.

    Рэббит уже не так прост — тут вам и exchanges с роутингом, и куча плагинов для delayed messages, deadletter и прочего хлама. За сообщениями следит сам кролик. Как только консьюмер подтвердил обработку сообщения, оно удаляется. Если консьюмер отвалился посередине — рэббит вернет сообщение в очередь. В общем, хороший комбайн, когда нужно перекидывать сообщения между сервисами. Цена этого — производительность.

    Рэббит практически все делает внутри себя. В нем есть много встроенных инструментов, можно подключать разные плагины, много настроек для работы с сообщениями и очередями.

    Если вам нужно перекидывать сообщения между сервисами в небольшом количестве — ваш выбор однозначно RabbitMQ. Если вам необходимо быстро сохранять кучу событий — метрики от клиентов, логи, аналитика и т.д. — ваш выбор kafka. Подробнее о сравнении двух инструментов можно прочитать в моей статье.

    И еще: рэббиту не нужен зукипер.

    Ребреин
    REBRAIN: Онлайн-практикумы для Инженеров

    Комментарии 42

      0
      Если вам нужно перекидывать сообщения между сервисами в небольшом количестве — ваш выбор однозначно RabbitMQ.

      Ваш выбор однозначно лёгкий и быстрый Redis (PubSub или Streams) или тот же NATS или NSQ, но не как не оверхед в виде RabbitMQ.
        +7
        Так паб саб вроде же не хранит сообщения если нет консюмеров… Все таки это больше для fire and forget. Раббит же будет держать месседжи пока ваши сервисы которые возможно лежат из за бага или еще чего не поднимутся, и вы не потеряете данные.
          +3

          Именно. rabbitmq и nats в совершенно разных нишах для совершенно разных задач. Мне очень нравится nats, но в последний раз я его использовал очень давно — чаще нужна надежная доставка, чем ненадежная.

            0
            Комментарий был удалён.
            +1
            pubsub не хранит, а streams хранят
              0
              Именно, но все зависит от задачи. В Redis можно и блокирующий список использовать для организации очереди если он вам подходит, и он так же будет хранить сообщения.
                0
                Действительно. Давно не следил за редисом. Похоже Redis Streams это кафка на минималках. Но все же интересно. Может пригодится когда нибудь.
              +1
              Кроль это не только очереди, но и средство маршрутизации сообщений. При использовании редиски будете эту часть писать сами.
              +5
              Я только не понял какое отношение процесса имеет к «Зачем-зачем, чтобы пройти собес!»?
                –1
                Чтобы выстроить интересную, логичную цепочку
                  +1
                  И как, получилось?
                0
                Вот на днях на собесе спрашивали, «вы пишите в своем резюме, что есть опыт работы с RabbitMQ, а почему не используете Kafka? Kafka же лучше!». Мой ответ, что это лишь инструмент, команда имеет опыт работы с RabbitMQ, и мы не умеем админить Kafka, и нету задач для него — обрабатывать кучу ивентов в реалтайме, такой ответ интервьювера не удовлетворил.
                  +2
                  Нужно было ещё указать на различия в RabbitMQ и Kafka и упомянуть объёмы данных (например если у вас 2 сообщения / в сек, то можете вообще очередь не использовать). Например какой-нибудь dead-letter-exchange для меня киллер-фича, задачи которые он красиво решает ставят в небольшой ступор пользователей Kafka, когда выясняется что нужно городить здоровенный костыль чтобы сделать это на Kafka.

                  Ну и в целом да, это основное правило собеседований — интервьюверы зачастую не любят, когда кандидат не соглашается с их единственно-верным мнением. Или принимать как есть, или подстраиваться под обстоятельства — просто соглашаться с тем что кафка круче.

                  PS: Я разбирался с протоколом AMQP и считаю его ужасным, с недавних пор ко всем решениям на AMQP отношусь с подозрением.
                    +1
                    Это больше говорит о интервьювере, чем о вас. Само заявление «Kafka же лучше!» уже вызывает фейспалм — это примерно, как заявить, что разводной ключ лучше молотка.
                      0
                      Ответ будет зависеть от вакансии, но скорее всего в данном случае интервьюер хотел проверить инициативность и любознательность кандидата при помощи провокационного вопроса.
                        0
                        Если вопрос звучал именно, как «Почему вы не используете Х?», то единственный достойный ответ будет «Потому что мы уже используем Y и нас устраивает!». То есть доказывать нужно как раз преимущества Х, а не защищать используемую технологию, если она исправно выполняет свои обязанности. А любознательность проверяется вопросом «А если вместо используемого Y использовать Х? Плюсы, минусы, подводные камни?»
                          0
                          Не всегда интервьюер хочет разжёвывать вопросы и это его право — заодно проверить soft skills кандидата.
                  0
                  Давно работаем с раббитом… топлю теперь на работе за ввод кафки в некоторые процессы. В раббите мне не хватает шардинга из коробки, ну и остальные плюшки. Иногда когда все упало нужно риплей сделать, и приходится танцевать с бубном и вытаскивать данные с тяжелых хранилищ типа snowflake. А так как кафка это аппенд онли лог, восстановление системы должно стать намного легче.

                  Роутинга там как в кролике нет конечно нет, но думаю это можно пережить…
                    0
                    В RabbitMQ есть cluster из коробки, как раз для HA/HL. Но в целом работает кролик крайне медленно.
                    Если трафик до 10к-20к сообщений / сек — всё хорошо.
                    Если выше — стоит рассматривать другие решения.
                      0
                      Проблема в том что в раббите нужно свой велосипед писать для ребалансировки шардов если вдруг нужно убрать/добавить консюмер. Насчет медленности раббита — зависит от задачи. У нас мессежди обрабатываются сравнительно медленно, поэтому скорость самого раббита вообще не интересна.
                        0
                        Есть команды для перебалансировки. Через раз даёт хороший результат =)
                        rabbitmq-queues rebalance "all" --vhost-pattern "a-vhost" --queue-pattern ".*"

                        Я тоже с этим столкнулся и решил, что меня пока устраивает такой формат.
                    0
                    Возможно я конечно не нашел в kafka, но вот как раз на прошлой неделе хотел разделить топик на несколько партиций и раскидать их по серверам, в итоге нашел что только указав replication factor можно потом раскидать партиции по серверам. Тогда получается все равно будет все писаться на все серверы. Но у меня задача была не разделить нагрузку а увеличить место для хранения сообщений. Возможно не в ту сторону смотрел.
                      0

                      kafka-reassign-partitions утилита

                        0
                        Да, я ее видел и даже читал, но видимо в завале работы понял не совсем правильно возможность задать Replica
                        Перечитал еще раз и более менее все стало яснее
                        Спасибо
                      +1
                      Каждый topic бьется на части до 50 partitions

                      Что это за лимит в 50 partitions? Откуда он?

                        0

                        Сложнее задача когда нужно разделить очередь на под очереди.
                        Чтобы сыпались ивенты в систему, а процессились у каждого пользователя свои под очереди в той последовательности в какой они сохранились.
                        Проще всего здесь использовать базу данных и пулить раз в х секунд с группировкой по пользователю.

                          +1

                          База умрет под нагрузкой, плюс всякие вакуум. Очереди в базе так себе идея, если есть нагрузка.


                          под базой имел ввиду sql.

                            0

                            Все верно. А иначе разве что динамически создавать топики под пользователя.
                            Но если много топиков, то это шардинг топиков

                            0
                            Ну так именно эту проблему шардинг кафки и решает. И автоматический ребаланс вроде как киллер фича.
                              0

                              Но как консьюмить миллионы топиков?
                              Продюсеру то все равно. Открыл соединение и если указать в настройках Кафки, что автосоздавай топики, то по user id писать.
                              Скорее корректнее разбивать на partitions где key будет user id

                                0
                                Ну да. Шардинг это не про «отдельный топик по id» (id как пример), это про обеспечение условия fifo по id. Это просто условие что мессежди одного и того же юзера будут обработаны в определенном порядке, за счет того что они прилетят на один и тот же консюмер.

                                Решение с топиком под каждого юзера кажется мне избыточным и возможно даже не реализуемым. Вот что пишут в самой кафке

                                Unlike many messaging systems Kafka topics are meant to scale up arbitrarily. Hence we encourage fewer large topics rather than many small topics. So for example if we were storing notifications for users we would encourage a design with a single notifications topic partitioned by user id rather than a separate topic per user.

                                The actual scalability is for the most part determined by the number of total partitions across all topics not the number of topics itself (see the question below for details).


                                И

                                Jun Rao (Kafka committer; now at Confluent but he was formerly in LinkedIn's Kafka team) wrote:

                                At LinkedIn, our largest cluster has more than 2K topics. 5K topics should be fine. [...]

                                With more topics, you may hit one of those limits: (1) # dirs allowed in a FS; (2) open file handlers (we keep all log segments open in the broker); (3) ZK nodes.
                                  0
                                  просто вот нужно и fifo и шардинг :)
                            0
                            Прочитал статью здесь и на медиум, но так и не понял, в каком режиме тестировался кролик, только в режиме «записи на диск», т.е. режим «in memory» не тестировался и не сравнивался априори?
                              0
                              Перед нашей инстанс-группой стоит load-балансер

                              Перед нашей группой экземпляров стоит балансировщик нагрузки.
                                0
                                Как только консьюмер подтвердил обработку сообщения, оно удаляется.

                                При этом нужно понимать, что подтверждение одностороннее в случае AMQP, что может иметь последствия. Или есть опция включить ответы на подтверждения?
                                  0
                                  Логично выполнить эту задачу асинхронно на фоне.

                                  Логично эти данные денормализовать и сохранить. Жечь процессорные мощности снова и снова на одной и той же задаче весьма странно.
                                    0
                                    Как это противоречит тому что написано? Это конечно же можно делать как часть процесса обработки месседжа. А там как хотите так и делайте. Хоти в s3 кидайте, хотите в базу. Хотите по ключу определяйте что такой рипорт уже просили и возвращайте данные с прошлой обработки. А где вы их храните — это же ваш бизнес решает.

                                    Тут же не об этом. Тут о том что браузер не будет ждать так долго и оборвет коннекшен. И в добавок если что-то временно пошло не так, можно делать попробовать позже и захендлить этот кейс а не ждать пока юзер обнаружит что что0то зафейлилось и он должен повторить просьбу.
                                    –4
                                    Есть еще вариант — нанять миллиард китайцев ))))… Латенси будет плохая, но объем обработок большой. )))
                                      +1
                                      Я ожидал минималистичный велосипед вместо известных решений, а получил так сказать минималистичный «вебинар» по кафке и еще более минималистичный по рэбиту)
                                        –1

                                        Если вы про одмин Кафки — это очень короткий рассказ, как правильно юзать Кафку вместе с контрактами в виде авро, как ее обезопасить и т.д.
                                        Если вы разраб, который левой пяткой ещё и кластеры админит — покажите ваш код, что бы было понятно, как вы совмещаете все навыки не в ущерб друг другу.
                                        И нет, вам не нужна Кафка с 10к евентов в секунду и exactly one логикой.

                                          0
                                          Спасибо, полезная статья для тех кто с кафкой дела не имел а только слышал. Появилось желание поиграться с ней, может в будущем пригодится.

                                          Только полноправные пользователи могут оставлять комментарии. Войдите, пожалуйста.

                                          Самое читаемое