Comments 109
Что скажете об https://github.com/travisjeffery/jocko ?
Я не автор но скажу что тащить в прокрашен неизвестный проект 1-го разработчика имеющего известную альтернативу-предшественника с тысячами статей, туториалов и рассказов о том как поднимали когда всё сломалось — интересно, но слишком рискованно.
Судя по issues и отсутствию поддержки, проект, кажется, создавался в первую очередь как учебный (идея прикольная :) ). Как им можно пользоваться для реальных задач – не знаю, разве что подменять Кафку в интеграционных тестах (если протокол на самом деле совместим).
Добрый день. Очень хорошая статья!
Вопрос возник… Если разработчик контролирует commit offset, то как kafka сможет понять, что меседж не законсьюмился, если консьюмер не закомитил оффсет но и не обработал меседж. Есть какой-то тайм-аут период после которого зукипер вновь запроцессит незакомиченный меседж, либо же я что-то не правильно понял в логике работы с кафкой? Спасибо!)
Оффет контролирует положение вашего консьюмера в логе. С точки зрения Кафки если вы не закоммитили изменения, вы не сдвинулись с места.
Вы можете не коммитить события, но тогда вы каждый раз будете перечитывать их из Кафки повторно и никогда не получите новые события.
И еще. Может ли измениться партиция при таком сценарии?
Более того, коммит можно делать самостоятельно (синхронно и асинхронно) из poll-loop-а или использовать автокоммит консьюмера. При большой нагрузке нет особого смысла делать это при обработке каждой пачки сообщений.
Закоммиченные оффсеты партиций топиков, на которые подписана группа — место, с которого начнут читать сообщения консьюмеры группы после ребалансировки. Таким образом, если какие-то сообщения были обработаны, но коммит по ним не произведен — группа после ребалансировки получит их повторно.
— если один консьюмер читал из одной партиции, он продолжит из нее читать (если не было ребалансировки между двумя чтениями)
— если один консьюмер читал из нескольких партиций, он попытается опять вычитать данные из них, но на сколько я знаю, в документации нет гарантий сохранения порядка чтения из партиций (те он может и измениться).
В документации к консьюмеру есть следующая фраза «If a consumer is assigned multiple partitions to fetch data from, it will try to consume from all of them at the same time, effectively giving these partitions the same priority for consumption. However in some cases consumers may want to first focus on fetching from some subset of the assigned partitions at full speed, and only start fetching other partitions when these partitions have few or no data to consume.»
Также есть открытый KIP, который говорит о том, что сейчас поведение по выбору партиций при чтении детерминировано.
Соответственно механизм контроля и исправления ошибок связанных с этим должен быть.
Решит ли зукипер за это время, что меседж не законсьюмился и повторит ли операцию?
Зукипер не повторяет, повторяет консьюмер.
Если консьюмер не успел закормить чтение в регулируеооме временное окно то он считается выбывшим и эти данные передадут другому (если их в группе несколько). А когда консьюмер доработает — он попытается закоммититься в кафку и получит ошибку.
У нас были довольно медленные таски и в них эту проблему решали параллельным тредом в консьюмере, который раз в 30 сек коммитил предыдущее положение указателя.
В итоге если тезисно, то какие плюсы у Kafka vs Pulsar? В упоминается, только то, что Kafka традиционно для таких задач выбирается. Но какие-то тесты были?
Но у него есть очень много интересных фичей – например, подписка на ключи, nack и отложенное чтение, stateless брокеры и т.д. Если вы хотите хранить ОЧЕНЬ много данных, опять же у Пульсара есть очень крутая фича – Tiered Storage, которая в Кафке пока только планируется (по слухам, Confluent обещает ее до конца года).
В нашем случае оба решения подходили под задачу. С одной стороны много продакшн опыта, много специалистов на рынке, много саппорта и большое комьюнити (Кафка). С другой стороны более фичастое и заряженное решение, по которому значительно меньше информации в сети и для которого скорее всего придется выращивать специалистов с нуля и тратить много времени на ресерч.
Я полагаю, аналогом Kafka Streams под Pulsar будет прикрутить в нему Flink: https://flink.apache.org/2019/05/03/pulsar-flink.html
1. Планируете ли вы отдавать data-bus в open-source? Обычно внутренние технологии быстро устаревают, если разработка ведется только внутри компании.
2. Как вы решаете вопросы изменения количества партиций топика кафки без даунтайма?
3. С какими проблемами вы столкнулись при использовании? Как принимаете фидбэк от команд, которые интегрируются с вашим решением?
По вопросам:
1. Не уверен. Вокруг data-bus у нас еще свой коннектор, свой schema registry, своя спецификация, свои либы и тд. То есть data-bus очень интегрирован в платформу Авито и его сложно оттуда отдельно отковырять :)
2. Пока нет автоматизированного решения. Происходит это довольно редко и в таких случаях мы создаем новый топик с целевым числом партиций, реплицируем в него старые данные и затем переключаем продюсеров и консьюмеров (в случае, когда надо сохранить упорядоченность). В случаях, когда у топика идет раскидывание ключей раунд робином, мы просто увеличиваем ему число партиций на лету.
3. Самый холиварный вопрос – что нужно реализовать самим, а что взять уже готовое (например, долго обсуждали брать ли Schema Registry от Confluent). Это, наверное, основные проблемы :) Фидбек получаем в основном через канал data-bus в слаке, где есть все заинтересованные. Плюс сами рассказываем про data-bus и его развитие на внутренних митапах и ведем документацию.
1. Что представляет собой дата бас? просто библиотека используемая из продюсеров и консьюмеров или что-то ближе к микросервису?
2. Насколько большая плата за использование дополнительного слоя?
2. Сложно сказать. С одной стороны, по сравнению с ванильной Кафкой производительность упала примерно в 3 раза (бенчмарки). С другой стороны, нам для этого кейса не нужна производительность в миллионы эвентов в секунду и мы никак это не ощутили. При этом дата-бас надо поддерживать. Но зато поддержка снимается со всех команд сервисов, которые хотят интегрироваться с дата-басом (им достаточно подключить библиотеку и дернуть метод. Сейчас клиенты к дата-басу появляются с помощью кодогенерации и сервисам в целом остается только начать отправлять данные без каких-либо настроек. Так что не знаю про плату – везде компромиссы.
А слово сервисы в статье употребляется 35 раз, специально посчитал :)
Микро – это маркетинговый булщит— а вот и нет)
Точнее, если человек не понимает о чем говорит, то булшит, а если понимает, то это шаблон проектирования в рамках SOA. Суть шаблона в уменьшении функциональности сервиса и возможности независимого, от других сервисов, деплоя такого сервиса (что вероятно для Вашего случая как раз верно, но статья у Вас не об этом).
Лично по моим наблюдениям (дальше сарказм) — если есть проблемы с транзакциями, то это SOA вырожденная до микросервисов, а если все нормально, то это просто SOA.
На какой технологии реализован Data-bus?
Каким образом происходит отправка данных в Data-bus? (для Apache Kafka клиентскую библиотеку подключать надо, для Data-bus тоже?)
На данный момент data-bus написан на go с использованием библиотеки sarama от Shopify.
Да, для клиентов data-bus нужно подключить простую библиотеку (обертку над Апи серверной части). Далее при вызове методов produce/consume серверная часть data-bus автоматически поднимает необходимое число инстансов (или роутит запрос на уже существующие инстансы) продюсеров и консьюмеров Кафки в кубернетесе, но все это скрыто от клиента.
as a Service используется чтобы показать, что мы абстрагируемся от слоя хранения и предоставляем сервис – брокер сообщений (а уж на Кафке он или на csv файлах – не важно).
Используете ли вы при написании Сервисов спеку OSB www.openservicebrokerapi.org?
Мы используем фактор репликации 3 и min.insync.replicas 2. При этом прямо сейчас мы ведем работы по репликации данных между кластерами в нескольких ДЦ. Т.е. в целевой схеме фактор репликации будет 3*<число ДЦ>
Фактор репликации внутри одного ДЦ выбрали равный 3, т.к. 2 мало, 4 дорого.
Далее, у нас есть задача обеспечить масштабирование, отказоустойчиовость и не потерять данные в случае полного выхода из строя ДЦ – соответственно, во всех ДЦ стоят идентичные кластера Кафки с полным набором данных.
Не знаю, почему подход вам кажется странным. Это стандартный подход – использование active active топологии кластеров в нескольких ДЦ. Подробнее он описан, например, тут.
Мы не включаем брокеров из разных ДЦ в один кластер – это скорее антипаттерн. Мы используем active-active топологию с абсолютно независимыми кластерами в разных ДЦ и репликацией данных (Confluent Replicator) между ними. Таким образом получается, что в каждом ДЦ у нас по 3 копии данных, а всего 3*<число ДЦ> копий данных
Зацикливание можно избежать введением абстракции неймспейсов или использованием относительно новой фичи Кафки – хедеров. Confluent Replicator использует второй подход, он добавляет в отправляемые события хедер с идентификатором кластера, из которого эти события были изначально получены, тем самым предотвращая зацикливание.
Но ответ породил другой вопрос, каким образом Вы производите переключение на другой кластер?
Насколько я знаю, использование репликатора не обеспечивает active-active, он просто копирует данные и настройки топиков в другой кластер, мониторит лаг и т.д. Для схемы active-active потребуется еще один репликатор в обратную сторону, но тогда как мониторится лаг, получится ведь каша?
Может Вы имели ввиду чтение, читать можно с любого кластера, запись переадресуется на основной кластер?
Да, вы правы. В нашем случае все кластеры связаны между собой и каждый реплицирует данные в другие. Ограничений никаких нет. Один сервис может раскатить своих продюсеров и консьюмеров в нескольких ДЦ, при этом каждый будет писать в кластер своего ДЦ и читать из кластера своего ДЦ. Вот тут довольно подробно описана верхнеуровневая идея.
Большая оговорка – пока архитектура на несколько ДЦ живет только на бумаге, скорее всего на этапе завершения тестовой реализации мы поймем, что все не так просто и может быть что-то еще изменим.
А почему не использовали stretched-cluster с rack awareness? Если ДЦ географически близко, то это — самый простой и правильный способ обеспечить failover
А можно какие-то пруфлинки, что это самый правильный способ? Про самый простой не спорю.
Пруфов как таковых нет, но это наследуется от логики. В случае растянутого кластера переключения потребителей услуги не требуется, работаем с живыми бутстрапами. В случае же разрыва соединения между ДЦ при поставщиков в каждом из них мы получаем split-brain, т.е. в одном и том же топике разных кластеров разная информация лежит в одних и тех же оффсетах соответствующих партиций
within a single datacenter. We assumed low latency and high bandwidth between
brokers and clients. This is apparent in default timeouts and sizing of various buffers.
For this reason, it is not recommended to install some Kafka brokers in one datacenter and others in another datacenter.» В целом, рекомендуемой топологией является именно active-active.
Могу в ответ только сообщить изустные предания от архитекторов Confluent, что stretched cluster при latency менее 7 млс между ДЦ является вполне себе решением. Просто под мультиДЦ обычно подразумевается широко географически распределённая топология, конда физика против такой latency.
Такие вопросы сложно теоретически обосновать, надо тестировать для конкретного случая.
При этом Clustering режим NATS Streaming не давал возможности сильного горизонтального масштабирования (вероятно, это уже не проблема после добавления partitioning режима в 2017 году).
Все еще проблема. NATS предельно примитивен в том, что касается кластеризации. partitioning в нем это не более чем настройка нескольких NATS Streaming серверов с ограничением на то, кто какие каналы обслуживает. Один канал может быть только на одном сервере. Т.е. это никакое не масштабирование, по сути даже. Чтобы было надежно каждый сервер еще дополнительно превращается в FT кластер. С учетом, что FT кластер требует еще и общее хранилище дисковое, то получается совсем печально.
Только недавно сами рассматривали возможности NATS Streaming как быстрой простой in-memory очереди масштабируемой. В итоге получилось, что все режимы кластеризации, по сути, бесполезны, т.к. хранение в памяти не поддерживается, а фич при этом полезных это особо не дает. И масштабирования никакого тут и нет. Только городить самому распределение очередей между сервисами и эмулировать то, что таже кафка делает сама. Посему решили, что лучше поработать над архитектурой сервисов, чтобы им было достаточно того, что предлагает NATS обычный.
С учетом общей философии NATS, я думаю в этой области вряд ли что-то изменится.
А так, кафка это конечно штука уникальная на рынке. Ей бы избавиться от зукипера и цены бы ей не было. Топик эффсеты уже перенесли. Осталось всего ничего выбор контроллера и хранение конфигурации общей.
Кафка не такая уж уникальная, технически Pulsar ее полностью превосходит, единственная проблема Пульсара — он гораздо менее распространен
Вы сами перечислили все его преимущества. Как оно работает на практике — в русскоязычной среде, увы, малоизвестно, потому что большинство митапов по Пульсару проходят в Китае на китайском языке.
Да, в теории Пульсар во многом лучше Кафки.
Но все равно как-то все печально. Браться за проект, про который даже блог постов буквально пара штук — ну такое. И не важно, что у яху какого-то оно в продакшене уже долго. Они то все про него знают и понимают.
- Что такое schema registry? Думается, это реестр схем сериализации сообщений типа proto, avro. Как с этим работают? И какие схемы сериализации вы тестили? Какие оверхеды?
- С сообщением можно отправить какой-то ключ, как его используют? Можно ли его получить на consumer-е? И зачем ключ если уже есть offset?
- Как обеспечиваете персистентность данных брокера в k8s?
UPD: перечитал про ключи, понял что используют для роутинга по партициям, но ведь можно же ключ использовать для бизнес-логики, например id объявления в качества стае ключа?
2. Ключ можно рассматривать как идентификатор события. Как минимум без ключа нельзя гарантировать упорядоченность (так как по ключу идет распределение событий по партициям топика). Офсет – индекс, указывающий на положение события в партиции. Офсеты будут повторяться между разными партициями.
3. Кафка и зукиперы у нас запущены на железных дисках в LXC. В кубернетесе запущен сервис data-bus, он stateless.
К тому же, если логика партиционирования записей основана на ключе — вы получаете гарантию последовательности чтения (консьюмер читает партицию всегда по порядку — при этом для полученных из разных партиций записей их взаимный порядок не гарантируется даже в рамках одного консьюмера) такой же, как и записи. Есть классы задач, которые отлично решаются с использованием «локального состояния» — когда, если есть гарантия, что события с одинаковыми ключами попадут в одну сущность обработчика, вы можете не использовать отдельное хранилище, к которому будете обращаться при получении каждой записи из Кафка. Вокруг этой идеи построены Kafka Streams, Flink и другие движки потоковой обработки.
На KIP с избавлением от зукиперов, кстати, я давал ссылку в этой статье :)
Он же сообщения на диск сохраняет и размер очереди в можно регулировать
--mem-queue-size=0
и тогда все сообщения будут проходить через диск. Но надо будет ещё позаботиться о репликации. NSQ мы используем для clickstream аналитики, где потеря одного события не будет иметь катастрофических последствий. В случае с шиной нам нужна гарантия что мы ничего не потеряем.Nsqlookupd ищет топики. Найдя топик (на каком nsqd он находится) consumer подключится к этому nsqd напрямую.
Сам lookupd только дискаверит.
Просто, но с кучей нюансов. Нам оказалось проще продублировать (есть набор микросервисов объедененных очередью, тоесть мы создали несколько идентичных наборов со своей тдельной очередью в каждом) nsqd несколько раз, не связывая их в "кластер", чем попытаться бороться с дублированием сообщений.
при повторной отправке по таймауту оно очень редкое.
Природа наших событий позволяет допустить редкое дублирование событий. А увеличив время таймаута мы снизили эту вероятность практически к 0, да и микросервисы обрабатывают сообщения ооочень быстро, там в основном математика и перепосылка сообщения следующему сервису.
а вот какой нюанс реально вылез боком — это не упорядоченность сообщений (нам она важна), о чем собственно и пишется в доке, но как всегда оказалось неожиданно )), пришлось реорганизовать консюмеров
в целом особой проблемы в администрировании не возникло, группа микросервсиов связывается со своей очередью. Ну как сказать жестко, все конфигурируемо через консул, можно указать другой инстанс NSQD перезапустить сервисы и все ок
Какие же это микросервисы, если для общения между собой нужна специальная библиотека? Получается, все сервисы должны быть написаны на одном яп. Или будете под каждый язык свою версию писать?
Мало ли пригодится.
www.infoq.com/news/2016/02/services-distributed-monolith
Все происходит через data-bus. В том числе есть поддержка consumer groups через data-bus.
По поводу request/reply – NATS Streaming его не поддерживает, на сколько я знаю (только NATS). Сошлюсь на объяснение ведущего разработчика NATS Streaming – тут он объясняет, почему такая фича не нужна для персистентных очередей. Но в целом, конечно, стоит внимательно изучить внутреннюю реализацию NATS Streaming (тем более тоже на go) и мб что-то нам позаимствовать/оптимизировать.
- Во сколько партиций велась запись продьюсерами?
- Использовали штатный perf-test или писали свой? Всю нагрузку с одного хоста подавали?
- Это максимальная производительность, которой удалось достичь? Настраивали ли producer (max.inflight.requests, max.linger.ms и пр)? Какой стоял acks (от всех брокеров ожидалось подтверждение записи или только от лидера — ибо это сказывается с одной стороны на гарантиях записи, а с другой — на времени выполнения записи)?
Аналогичные вопросы по консьюмерам. Тюнили?
Ну и про группы, имхо, не очень точно (и понятно) написано. При изменении состава читателей группы (в т.ч. первичном подключении) каждый консьюмер группы оказывается назначенным (assign) на одну или несколько партиций (при этом на каждую партицию назначен только один консьюмер из каждой подключенной к кластеру группы). Далее в процессе poll-инга, консьюмер запрашивает сообщения только из партиций, на которые он назначен до момента отключения или ребалансировки группы.
P.S. вообще, помимо автоматической балансировки группы и назначения консьюмеров на партиции брокером, можно сделать это «вручную».
2. Написали свой небольшой баш-фреймворк :) Под капотом он получал в конфиге список параметров, которые нужно изменять от теста к тесу и массив значений этих параметров (например, partitions [1,3,9,27,54]). Дальше крутились тесты по всем комбинациям всех параметров и по каждому снимались итоговые throughput/latency (min, max, 95%%, avg). Нагрузка шла с трех отдельных хостов (не тех, где стоит Кафка). Также кидали события разного размера.
3. Частично про конфигурацию ответил в пункте 2. Да, max.inflight.requests, max.linger.ms и другие параметры настраивали. Acks всегда стоял all, так как в проде у нас используется только acks=all. Плюс был включен довольно агрессивный fsync. Консьюмеров также тюнили. Во время тестов производительности ребалансировка консьюмеров не производилась.
В какой-то момент мы остались довольны полученными цифрами и перестали пытаться разогнаться еще больше. Я уверен, что можно получить более внушительные цифры.
Сервис А часто хочет узнать информацию, которой располагает сервис Б
Хотелось бы узнать как вы узнаете информацию из сервиса Б? То, что Вы отправляете сообщение в сервис через Кафка это понятно, но вот когда нужно сделать запрос в другой сервис тоже через кафка? Если да, то как осуществляете запрос?
Но есть и прямые вызовы сервисов, есть саги и т.д. Не все взаимодействие идет через Кафку.
Допустим: после перехода на k8s кол-во простоев выросло на Х%, это Y рублей, но также мы смогли заработать благодаря k8s N рублей и т.д.
Нигде не видел подробного экономического обоснования такого перехода.
Допустим для компаний уровня гугл и амазон, нет альтернативы. Но вот можно ли для авито сделать экономическое обоснование такого перехода, ведь и без него вы бы смогли работать.
Нигде не видел подробного экономического обоснования такого перехода.
Потому что экономическое обоснование обычно зависит от кучи факторов, которые могут сильно отличаться от компании к компании:
- скорость доставки новых фичей в продакшен до/после
- оптимальность утилизации ресурсов до/после
- стоимость поддержки и развития до/после
- etc
поэтому и задал вопрос, делали кто-нибудь такую модель, прежде чем вкладывать\закапывать деньги на подобные проекты.
Сейчас это выглядит больше как ИТишный проект с непрозрачной бизнес-ценностью.
Под прикрытием data-bus мы можем без даунтайма, незаметно обновлять версии Kafka, централизованно вести конфигурации producer’ов, consumer’ов, брокеров и т.д.
Можно поподробнее, как именно data-bus нивелирует даунтайм при обновлении? Вот «упала Kafka» и обновляется… Каким образом сервисы продолжают обмениваться собщениями при этом?
Если весь кластер целиком упал – сейчас никак не нивелируем, но в теории можем зароутить весь траффик на кластер в другом ДЦ через дата-бас прозрачно для сервисов. Если идет failover, можем корректно восстановить положение всех консьюмеров. Если делаем роллинг апдейт или рестарт, можем смотреть за состоянием абсолютно всех клиентов, которые сейчас работают с кластером. Если это обновление клиентов сарамы или конфигурации продюсеров и консьюмеров, то такой процесс происходит полностью на серверной части дата-баса без участия сервисов.
Или же как-то надо опросы по http настраивать?
databus работает и на запись и на чтение. Никаких опросов в клиенте не нужно настраивать, databus работает через push модель.
Интересно сравнить с кейсом Booking где они тоже сначала закрыли инфраструктуру (kubernetes, в их случае) за ширмой и отгребли проблем. У Авито сработало ровно обратное решение.
Kafka и микросервисы: обзор