Pull to refresh

Comments 109

Я не автор но скажу что тащить в прокрашен неизвестный проект 1-го разработчика имеющего известную альтернативу-предшественника с тысячами статей, туториалов и рассказов о том как поднимали когда всё сломалось — интересно, но слишком рискованно.

Вопрос скорее не об «тащить в прод», а о «пробовали или нет? если да, то какие впечатления?»
Смотрели на этот проект в рамках другой задачи. Сами не использовали, т.к. у решения нет поддержки consumer groups и из-за этого нам не подошло бы в любом случае.

Судя по issues и отсутствию поддержки, проект, кажется, создавался в первую очередь как учебный (идея прикольная :) ). Как им можно пользоваться для реальных задач – не знаю, разве что подменять Кафку в интеграционных тестах (если протокол на самом деле совместим).
оффтоп: просьба уделить больше внимания мошенникам — работодателям на Авито. Форма и содержание типовое ±, каждую неделю Петя меняется на Васю и т.д… в профиле одно единственное объявление. У площадки больше шансов бороться, чем у соискателя. Заранее благодарю!

Добрый день. Очень хорошая статья!
Вопрос возник… Если разработчик контролирует commit offset, то как kafka сможет понять, что меседж не законсьюмился, если консьюмер не закомитил оффсет но и не обработал меседж. Есть какой-то тайм-аут период после которого зукипер вновь запроцессит незакомиченный меседж, либо же я что-то не правильно понял в логике работы с кафкой? Спасибо!)

Спасибо за комментарий :)

Оффет контролирует положение вашего консьюмера в логе. С точки зрения Кафки если вы не закоммитили изменения, вы не сдвинулись с места.

Вы можете не коммитить события, но тогда вы каждый раз будете перечитывать их из Кафки повторно и никогда не получите новые события.
Пасиб за ответ) А что вы имеете ввиду под «каждый раз»? Как часто будет происходить этот каждый раз? И когда он вообще произойдет и при каком условии? При реконекте? В этом мой вопрос)
И еще. Может ли измениться партиция при таком сценарии?
Обычно, consumer poll'ит события из кафки в бесконечном цикле. Внутри цикла у вас происходит логика обработки событий (в том числе commit). Как только вы сделали все необходимые действия с полученными событиями, опять вызывается poll (в простом случае) и из кафки прилетает новая пачка событий (или повторно прилетает старая пачка событий, если не было коммита).
Не вводите людей в заблуждение. При поллинге в том же цикле коммит при обработке пачки делать не обязательно. При следующем полле консьюмер получит следующие сообщения, если группа не ребалансировалась.
Более того, коммит можно делать самостоятельно (синхронно и асинхронно) из poll-loop-а или использовать автокоммит консьюмера. При большой нагрузке нет особого смысла делать это при обработке каждой пачки сообщений.
Закоммиченные оффсеты партиций топиков, на которые подписана группа — место, с которого начнут читать сообщения консьюмеры группы после ребалансировки. Таким образом, если какие-то сообщения были обработаны, но коммит по ним не произведен — группа после ребалансировки получит их повторно.
Перечитал свое сообщение – да, вы правы.

Консьюмеры будут читать из топика и без коммита до момента аварии/ребаланса. При наступлении одного из этих событий, консьюмеры перечитают закомиченные офсеты из кафки и продолжат работать с этого места.
При повторном чтении есть несколько разных сценариев:
— если один консьюмер читал из одной партиции, он продолжит из нее читать (если не было ребалансировки между двумя чтениями)
— если один консьюмер читал из нескольких партиций, он попытается опять вычитать данные из них, но на сколько я знаю, в документации нет гарантий сохранения порядка чтения из партиций (те он может и измениться).

В документации к консьюмеру есть следующая фраза «If a consumer is assigned multiple partitions to fetch data from, it will try to consume from all of them at the same time, effectively giving these partitions the same priority for consumption. However in some cases consumers may want to first focus on fetching from some subset of the assigned partitions at full speed, and only start fetching other partitions when these partitions have few or no data to consume.»

Также есть открытый KIP, который говорит о том, что сейчас поведение по выбору партиций при чтении детерминировано.

А нет какой-то функциональности отложенной отправки как, скажем, в RabbitMQ? Типа мы консьюмером сообщение попытались обработать, но по какой-то причине не смогли и формально подтверждаем получение, но тут же помещаем его опять в очередь с пометкой «отложенного». И оно опять «стрельнет» через заданное количество миллисекунд.
Есть. На базе data-bus запускаем Queue as a Service. И там будет возможность переложить в очередь с заданной паузой.
Поэтому потенциально возможна ситуация повторного вычитывания уже обработанного сообщения. Чтобы минимизировать проблемы крайне желательно иметь идемпотентность как основное требование к обработчику сообщений.
Вопрос в том насколько часто такое может происходить и можно ли это контролировать? Если логика консьюмера очень долгая, например услово 5 минут. Решит ли зукипер за это время, что меседж не законсьюмился и повторит ли операцию?
Если ответить просто — то Zookeeper отслеживает живой или нет консьюмер (у него с ним постоянный TCP коннект). Поэтому кейс не самый частый, но на проде (если он более-менее нагружен) вполне себе может срабатывать закон больших чисел.
Соответственно механизм контроля и исправления ошибок связанных с этим должен быть.
Решит ли зукипер за это время, что меседж не законсьюмился и повторит ли операцию?
Зукипер не повторяет, повторяет консьюмер.

Если консьюмер не успел закормить чтение в регулируеооме временное окно то он считается выбывшим и эти данные передадут другому (если их в группе несколько). А когда консьюмер доработает — он попытается закоммититься в кафку и получит ошибку.


У нас были довольно медленные таски и в них эту проблему решали параллельным тредом в консьюмере, который раз в 30 сек коммитил предыдущее положение указателя.

Если логика обработки сообщений очень долгая, на стороне консьюмера нужно увеличить значение параметра `max.poll.interval.ms`. Кафка будет считать, что консьюмер жив и не будет его трогать, если следующий poll произойдет в рамках этого интервала (при условии, что консьюмер на самом деле жив и все это время отправляет heartbits контроллеру – это делается в отдельном background треде прозрачно для разработчика)
Также можно использовать exactly once подход на основе транзакций Кафки

Exactly once semantics на только основе транзакций Кафки сделать нельзя. Вернее, можно попробовать накостылить так, чтобы оффсеты хранились там же, где идет обработка и т.д., но это будет так себе решение. Транзакции в Кафке немного не для того сделаны.

В итоге если тезисно, то какие плюсы у Kafka vs Pulsar? В упоминается, только то, что Kafka традиционно для таких задач выбирается. Но какие-то тесты были?

У Пульсара нет аналогов KStreams и не такое больше сообщество (и не так много коммерческих компаний, продвигающих эту технологию, что важно). Также под Пульсар почти невозможно найти специалистов на рынке (по крайней мере в России).

Но у него есть очень много интересных фичей – например, подписка на ключи, nack и отложенное чтение, stateless брокеры и т.д. Если вы хотите хранить ОЧЕНЬ много данных, опять же у Пульсара есть очень крутая фича – Tiered Storage, которая в Кафке пока только планируется (по слухам, Confluent обещает ее до конца года).

В нашем случае оба решения подходили под задачу. С одной стороны много продакшн опыта, много специалистов на рынке, много саппорта и большое комьюнити (Кафка). С другой стороны более фичастое и заряженное решение, по которому значительно меньше информации в сети и для которого скорее всего придется выращивать специалистов с нуля и тратить много времени на ресерч.
А что такое «специалист под Пульсар»?
Интересно услышать больше деталей про ваш data-bus и опыт эксплуатации его в продакшн-среде. А именно:
1. Планируете ли вы отдавать data-bus в open-source? Обычно внутренние технологии быстро устаревают, если разработка ведется только внутри компании.
2. Как вы решаете вопросы изменения количества партиций топика кафки без даунтайма?
3. С какими проблемами вы столкнулись при использовании? Как принимаете фидбэк от команд, которые интегрируются с вашим решением?
Планирую более подробно рассказать на DevOps Conf в этом году. Статью специально старался держать обзорной, чтобы не вышел лонгрид, который никто не дочитает до конца ;)

По вопросам:
1. Не уверен. Вокруг data-bus у нас еще свой коннектор, свой schema registry, своя спецификация, свои либы и тд. То есть data-bus очень интегрирован в платформу Авито и его сложно оттуда отдельно отковырять :)
2. Пока нет автоматизированного решения. Происходит это довольно редко и в таких случаях мы создаем новый топик с целевым числом партиций, реплицируем в него старые данные и затем переключаем продюсеров и консьюмеров (в случае, когда надо сохранить упорядоченность). В случаях, когда у топика идет раскидывание ключей раунд робином, мы просто увеличиваем ему число партиций на лету.
3. Самый холиварный вопрос – что нужно реализовать самим, а что взять уже готовое (например, долго обсуждали брать ли Schema Registry от Confluent). Это, наверное, основные проблемы :) Фидбек получаем в основном через канал data-bus в слаке, где есть все заинтересованные. Плюс сами рассказываем про data-bus и его развитие на внутренних митапах и ведем документацию.
В том же русле, ближе к уровню чайника,
1. Что представляет собой дата бас? просто библиотека используемая из продюсеров и консьюмеров или что-то ближе к микросервису?
2. Насколько большая плата за использование дополнительного слоя?
1. Это сервис. он крутится в кубернетесе и предоставляет апи для доступа к Кафке. Апи обернуто в виде клиентских библиотек под разные языки.
2. Сложно сказать. С одной стороны, по сравнению с ванильной Кафкой производительность упала примерно в 3 раза (бенчмарки). С другой стороны, нам для этого кейса не нужна производительность в миллионы эвентов в секунду и мы никак это не ощутили. При этом дата-бас надо поддерживать. Но зато поддержка снимается со всех команд сервисов, которые хотят интегрироваться с дата-басом (им достаточно подключить библиотеку и дернуть метод. Сейчас клиенты к дата-басу появляются с помощью кодогенерации и сервисам в целом остается только начать отправлять данные без каких-либо настроек. Так что не знаю про плату – везде компромиссы.
ESB какое-то.
Можно упаковывать и продавать :-)
Название статьи не соответствует содержимому. В тексте статьи слово «микросервисы» ни разу не употребляется. Из того что вы описали ясно что у вас используется SOA, а уж «микро» похоже для кликбейта добавлено.
Микро – это маркетинговый булщит. Это не я так сказал, это буквально на днях к нам в Авито прилетал на тренинг Крис Ричардсон и я его вольно цитирую. На сколько я помню, этот термин оказался на слуху как раз после конференции с участием Мартина Фаулера, где использовался для привлечения внимания к топику.

А слово сервисы в статье употребляется 35 раз, специально посчитал :)
Микро – это маркетинговый булщит
— а вот и нет)
Точнее, если человек не понимает о чем говорит, то булшит, а если понимает, то это шаблон проектирования в рамках SOA. Суть шаблона в уменьшении функциональности сервиса и возможности независимого, от других сервисов, деплоя такого сервиса (что вероятно для Вашего случая как раз верно, но статья у Вас не об этом).

Лично по моим наблюдениям (дальше сарказм) — если есть проблемы с транзакциями, то это SOA вырожденная до микросервисов, а если все нормально, то это просто SOA.
Интересная статья, спасибо!
На какой технологии реализован Data-bus?
Каким образом происходит отправка данных в Data-bus? (для Apache Kafka клиентскую библиотеку подключать надо, для Data-bus тоже?)
Спасибо за комментарий!

На данный момент data-bus написан на go с использованием библиотеки sarama от Shopify.

Да, для клиентов data-bus нужно подключить простую библиотеку (обертку над Апи серверной части). Далее при вызове методов produce/consume серверная часть data-bus автоматически поднимает необходимое число инстансов (или роутит запрос на уже существующие инстансы) продюсеров и консьюмеров Кафки в кубернетесе, но все это скрыто от клиента.
Немного дополню. Клиенты общаются с Data-Bus по вэбсокету. Сообщение отправлять можно пачкой. Обрабатывать можно по одному или пачками. Все общение происходит синхронно, т.е. клиент не получит ОК пока data-bus не получит ОК от Кафки. И обратно сервис не отправит новые сообщения пока клиент не ответит на предыдущие.
as a Service в вашем случае означает что Data-bus крутиться у вас в приватном cloud?
В нашем случае все сервисы крутятся в нашем Kubernetes кластере. В том числе и data-bus.
as a Service используется чтобы показать, что мы абстрагируемся от слоя хранения и предоставляем сервис – брокер сообщений (а уж на Кафке он или на csv файлах – не важно).
Последний уточняющий вопрос.
Используете ли вы при написании Сервисов спеку OSB www.openservicebrokerapi.org?
Добрый день! Подскажите, какой replication factor вы в итоге посчитали достаточным?
Здравствуйте!

Мы используем фактор репликации 3 и min.insync.replicas 2. При этом прямо сейчас мы ведем работы по репликации данных между кластерами в нескольких ДЦ. Т.е. в целевой схеме фактор репликации будет 3*<число ДЦ>
Добрый день, как Вы пришли к такой формуле? Очень странный подход.
Здравствуйте!

Фактор репликации внутри одного ДЦ выбрали равный 3, т.к. 2 мало, 4 дорого.
Далее, у нас есть задача обеспечить масштабирование, отказоустойчиовость и не потерять данные в случае полного выхода из строя ДЦ – соответственно, во всех ДЦ стоят идентичные кластера Кафки с полным набором данных.

Не знаю, почему подход вам кажется странным. Это стандартный подход – использование active active топологии кластеров в нескольких ДЦ. Подробнее он описан, например, тут.
может я не очень понял Ваш комментарий, но из него следует что Вы ставите еще N брокеров в другом ДЦ, включаете их в состав того же кластера и увеличиваете фактор репликации на 3 или я не прав?
Да, мы неправильно поняли друг друга.

Мы не включаем брокеров из разных ДЦ в один кластер – это скорее антипаттерн. Мы используем active-active топологию с абсолютно независимыми кластерами в разных ДЦ и репликацией данных (Confluent Replicator) между ними. Таким образом получается, что в каждом ДЦ у нас по 3 копии данных, а всего 3*<число ДЦ> копий данных
Т.е. у вас данные из 2 ДЦ реплицируются в обоих направлениях? Как при этом происходит балансировка нагрузки и используете ли какой-то специальный подход к именованию топиков (чтобы избежать заицикливания)? Помогает ли вам во всей этой истории контейнерная среда, в которой вы запускаете брокеры?
Слово «микросервисы» сбило. Нигде не указано, что вы кафку не на железе запускаете. — даже скорее наоборот. Последний вопрос снимается.
Кафку и зукиперы запускаем в LXC на железных машинах. Data-bus и все остальные сервисы запускаем в Kubernetes.
Про балансировку пока не думали, так как по дизайну любой кластер в любом ДЦ должен быть способен выдержать 100% нагрузку (иначе будем ловить неожиданные последствия при failover).

Зацикливание можно избежать введением абстракции неймспейсов или использованием относительно новой фичи Кафки – хедеров. Confluent Replicator использует второй подход, он добавляет в отправляемые события хедер с идентификатором кластера, из которого эти события были изначально получены, тем самым предотвращая зацикливание.
Теперь стало значительно понятнее)
Но ответ породил другой вопрос, каким образом Вы производите переключение на другой кластер?
Насколько я знаю, использование репликатора не обеспечивает active-active, он просто копирует данные и настройки топиков в другой кластер, мониторит лаг и т.д. Для схемы active-active потребуется еще один репликатор в обратную сторону, но тогда как мониторится лаг, получится ведь каша?
Может Вы имели ввиду чтение, читать можно с любого кластера, запись переадресуется на основной кластер?
Ох, это довольно интересная тема с большим числом нюансов, она заслуживает отдельной статьи.

Да, вы правы. В нашем случае все кластеры связаны между собой и каждый реплицирует данные в другие. Ограничений никаких нет. Один сервис может раскатить своих продюсеров и консьюмеров в нескольких ДЦ, при этом каждый будет писать в кластер своего ДЦ и читать из кластера своего ДЦ. Вот тут довольно подробно описана верхнеуровневая идея.

Большая оговорка – пока архитектура на несколько ДЦ живет только на бумаге, скорее всего на этапе завершения тестовой реализации мы поймем, что все не так просто и может быть что-то еще изменим.

А почему не использовали stretched-cluster с rack awareness? Если ДЦ географически близко, то это — самый простой и правильный способ обеспечить failover

rack awareness у нас и сейчас используется в рамках одного ДЦ.

А можно какие-то пруфлинки, что это самый правильный способ? Про самый простой не спорю.

Пруфов как таковых нет, но это наследуется от логики. В случае растянутого кластера переключения потребителей услуги не требуется, работаем с живыми бутстрапами. В случае же разрыва соединения между ДЦ при поставщиков в каждом из них мы получаем split-brain, т.е. в одном и том же топике разных кластеров разная информация лежит в одних и тех же оффсетах соответствующих партиций

Приведу цитату из Definitive guide от Confluent: «Apache Kafka’s brokers and clients were designed, developed, tested, and tuned all
within a single datacenter. We assumed low latency and high bandwidth between
brokers and clients. This is apparent in default timeouts and sizing of various buffers.
For this reason, it is not recommended to install some Kafka brokers in one datacenter and others in another datacenter.» В целом, рекомендуемой топологией является именно active-active.

Могу в ответ только сообщить изустные предания от архитекторов Confluent, что stretched cluster при latency менее 7 млс между ДЦ является вполне себе решением. Просто под мультиДЦ обычно подразумевается широко географически распределённая топология, конда физика против такой latency.

Может быть вы и правы :)
Такие вопросы сложно теоретически обосновать, надо тестировать для конкретного случая.
При этом Clustering режим NATS Streaming не давал возможности сильного горизонтального масштабирования (вероятно, это уже не проблема после добавления partitioning режима в 2017 году).

Все еще проблема. NATS предельно примитивен в том, что касается кластеризации. partitioning в нем это не более чем настройка нескольких NATS Streaming серверов с ограничением на то, кто какие каналы обслуживает. Один канал может быть только на одном сервере. Т.е. это никакое не масштабирование, по сути даже. Чтобы было надежно каждый сервер еще дополнительно превращается в FT кластер. С учетом, что FT кластер требует еще и общее хранилище дисковое, то получается совсем печально.

Только недавно сами рассматривали возможности NATS Streaming как быстрой простой in-memory очереди масштабируемой. В итоге получилось, что все режимы кластеризации, по сути, бесполезны, т.к. хранение в памяти не поддерживается, а фич при этом полезных это особо не дает. И масштабирования никакого тут и нет. Только городить самому распределение очередей между сервисами и эмулировать то, что таже кафка делает сама. Посему решили, что лучше поработать над архитектурой сервисов, чтобы им было достаточно того, что предлагает NATS обычный.

С учетом общей философии NATS, я думаю в этой области вряд ли что-то изменится.

А так, кафка это конечно штука уникальная на рынке. Ей бы избавиться от зукипера и цены бы ей не было. Топик эффсеты уже перенесли. Осталось всего ничего выбор контроллера и хранение конфигурации общей.
Уже лучше, но, по сути, теже яйца сбоку. Вместо зукипера все так же отдельные инстансы теперь контроллеров, которые или на отдельные сервера, или вместе с брокерами, опять ломая голову, а в каком количестве тебе они нужны. Упрощение разве что с точки зрения унификации всего этого в один какой-то продукт, а не склеенные между собой две абсолютно разные системы.

Кафка не такая уж уникальная, технически Pulsar ее полностью превосходит, единственная проблема Пульсара — он гораздо менее распространен

Как проводили сравнение?
Я поэтому и задал вопрос про практический опыт:)
Да, в теории Пульсар во многом лучше Кафки.
Единицы тестов, которые я нашел, тоже показывают преимущество. Как минимум с латентностью. А может и с пропускной способностью. С латентностью при чем вполне ясно откуда берется — репликация на основе кворума.

Но все равно как-то все печально. Браться за проект, про который даже блог постов буквально пара штук — ну такое. И не важно, что у яху какого-то оно в продакшене уже долго. Они то все про него знают и понимают.
Недавно изучал Kafka, есть пара вопросов:
  1. Что такое schema registry? Думается, это реестр схем сериализации сообщений типа proto, avro. Как с этим работают? И какие схемы сериализации вы тестили? Какие оверхеды?
  2. С сообщением можно отправить какой-то ключ, как его используют? Можно ли его получить на consumer-е? И зачем ключ если уже есть offset?
  3. Как обеспечиваете персистентность данных брокера в k8s?

UPD: перечитал про ключи, понял что используют для роутинга по партициям, но ведь можно же ключ использовать для бизнес-логики, например id объявления в качества стае ключа?
1. Schema registry – инструмент, позволяющий гарантировать совместимость продюсера и консьюмера (вы описываете схему отправляемых в топик событий, добавляете в schema registry, проверяете, что при produce схема соблюдается, настраиваете правила совместимости схем при эволюции). Schema registry работает только с Avro. Из минусов – не все клиентские библиотеки умеют с ней работать, но есть REST API. В целом – очень нужная штука, если вы не хотите сломать своих консьюмеров.
2. Ключ можно рассматривать как идентификатор события. Как минимум без ключа нельзя гарантировать упорядоченность (так как по ключу идет распределение событий по партициям топика). Офсет – индекс, указывающий на положение события в партиции. Офсеты будут повторяться между разными партициями.
3. Кафка и зукиперы у нас запущены на железных дисках в LXC. В кубернетесе запущен сервис data-bus, он stateless.
Ключи также можно использовать в compacted-топиках, практически как идентификаторы записей. При этом после компакт-процедуры в партиции по ключу будет оставаться одна запись (последняя).
К тому же, если логика партиционирования записей основана на ключе — вы получаете гарантию последовательности чтения (консьюмер читает партицию всегда по порядку — при этом для полученных из разных партиций записей их взаимный порядок не гарантируется даже в рамках одного консьюмера) такой же, как и записи. Есть классы задач, которые отлично решаются с использованием «локального состояния» — когда, если есть гарантия, что события с одинаковыми ключами попадут в одну сущность обработчика, вы можете не использовать отдельное хранилище, к которому будете обращаться при получении каждой записи из Кафка. Вокруг этой идеи построены Kafka Streams, Flink и другие движки потоковой обработки.

"Почти всегда будет оставаться одна последняя запись" имеется в виду? Компэкшн Кафки — такой компэкшн...

Спасибо за фидбек по Nats Streaming, интересно. В качестве распределенной in-memory очереди у нас был неплохой опыт с NSQ (https://nsq.io/).

На KIP с избавлением от зукиперов, кстати, я давал ссылку в этой статье :)
Почему NSQ не персиcтентный?
Он же сообщения на диск сохраняет и размер очереди в можно регулировать
NSQ может работать в режиме сохранения данных на диск, но это не совсем то же самое, что предлагает Кафка. Например, нет репликации, нет упорядоченности записи. Также слышал отзывы, что NSQ иногда теряет данные, не не могу сказать на сколько это правда.
по умолчанию NSQ всё же для быстрого обмена сообщениями с хранением в памяти. Можно настроить параметр
--mem-queue-size=0
и тогда все сообщения будут проходить через диск. Но надо будет ещё позаботиться о репликации. NSQ мы используем для clickstream аналитики, где потеря одного события не будет иметь катастрофических последствий. В случае с шиной нам нужна гарантия что мы ничего не потеряем.
NSQ выглядит интересно. По опыту, были проблемы с ним? Первое, что бросилось в глаза, это довольно странная архитектура с dnslookupd процессами, которые еще и работают в режиме eventual consistency. Любопытно, насколько хорошо это работает на деле. Ну и масштабируемость. По логике вещей, у них сделано все просто и горизонтально должно скейлиться, но как оно на деле интересно.
На деле тоже всё хорошо. миллионы сообщений clickstream в минуту через NSQ гонялись(и гоняются) без проблем (И немаловажную роль играет именно удобный скейлинг + nsqlookupd). nsqlookupd довольно удобная вещь в плане topic discovery. Т.е. достаточно знать только адресс nsqlookupd и указать ему имя топика, а в ответ список серваков на которых этот топик хранится. Официальный клиент под golang всё это делает автоматически. Единственное чего там не было — это поиск по wildcard, но это довольно просто решалось руками.
Вот тут можно немного почитать про clickstream и NSQ.

Nsqlookupd ищет топики. Найдя топик (на каком nsqd он находится) consumer подключится к этому nsqd напрямую.
Сам lookupd только дискаверит.


Просто, но с кучей нюансов. Нам оказалось проще продублировать (есть набор микросервисов объедененных очередью, тоесть мы создали несколько идентичных наборов со своей тдельной очередью в каждом) nsqd несколько раз, не связывая их в "кластер", чем попытаться бороться с дублированием сообщений.

Т.е. жестко прописывали какие микросервисы через какой инстанс nsqd общаются? А не было проблем с администрированием? И потом это же вроде не решает проблемы дублирования? Если сервис не успел обработать сообщение за таймаут настроенный на nsqd — то это сообщение может получить другой потребитель. А с какими нюансами столкнулись? На моей практике было достаточно настроить правильно nsqd, а лукап работал довольно стабильно и исправно дискаверил :)
вариант кластера, создавал явное и постоянное дублирование,
при повторной отправке по таймауту оно очень редкое.
Природа наших событий позволяет допустить редкое дублирование событий. А увеличив время таймаута мы снизили эту вероятность практически к 0, да и микросервисы обрабатывают сообщения ооочень быстро, там в основном математика и перепосылка сообщения следующему сервису.

а вот какой нюанс реально вылез боком — это не упорядоченность сообщений (нам она важна), о чем собственно и пишется в доке, но как всегда оказалось неожиданно )), пришлось реорганизовать консюмеров

в целом особой проблемы в администрировании не возникло, группа микросервсиов связывается со своей очередью. Ну как сказать жестко, все конфигурируемо через консул, можно указать другой инстанс NSQD перезапустить сервисы и все ок

Какие же это микросервисы, если для общения между собой нужна специальная библиотека? Получается, все сервисы должны быть написаны на одном яп. Или будете под каждый язык свою версию писать?

У нас сейчас сервисы написаны по большей части на php, go, python. Есть немного java, rust и, наверное, что-то еще.
Да, такая цена data-bus'а. Но при этом библиотека очень тонкая.
Да, спасибо, знаем про такое :)
Интересная статья, давно хотел поесть кафки, спасибо за блюдо:D Хотел бы уточнить, через сервис data-bus происходит исключительно publishing сообщений в топик? Доставка до suscriber-а осуществляется непосредственно силами Кафки, или же все происходит через вашу апишку? И разве так не дольше чем request/reply того же натса/натс-стримминга?
Приятного аппетита :)

Все происходит через data-bus. В том числе есть поддержка consumer groups через data-bus.

По поводу request/reply – NATS Streaming его не поддерживает, на сколько я знаю (только NATS). Сошлюсь на объяснение ведущего разработчика NATS Streaming – тут он объясняет, почему такая фича не нужна для персистентных очередей. Но в целом, конечно, стоит внимательно изучить внутреннюю реализацию NATS Streaming (тем более тоже на go) и мб что-то нам позаимствовать/оптимизировать.
С учетом, что NATS Streaming это всего лишь надстройка над NATS, то тут их сильно разделяться смысла мало. Даже авторы вон говорят — библиотека nats streaming дает доступ к подключению до самого nats. Там уже можно делать что угодно.
Несколько вопросов по тестированию производительности Producer:
  1. Во сколько партиций велась запись продьюсерами?
  2. Использовали штатный perf-test или писали свой? Всю нагрузку с одного хоста подавали?
  3. Это максимальная производительность, которой удалось достичь? Настраивали ли producer (max.inflight.requests, max.linger.ms и пр)? Какой стоял acks (от всех брокеров ожидалось подтверждение записи или только от лидера — ибо это сказывается с одной стороны на гарантиях записи, а с другой — на времени выполнения записи)?

Аналогичные вопросы по консьюмерам. Тюнили?
Ну и про группы, имхо, не очень точно (и понятно) написано. При изменении состава читателей группы (в т.ч. первичном подключении) каждый консьюмер группы оказывается назначенным (assign) на одну или несколько партиций (при этом на каждую партицию назначен только один консьюмер из каждой подключенной к кластеру группы). Далее в процессе poll-инга, консьюмер запрашивает сообщения только из партиций, на которые он назначен до момента отключения или ребалансировки группы.
P.S. вообще, помимо автоматической балансировки группы и назначения консьюмеров на партиции брокером, можно сделать это «вручную».
1. Динамически меняли число партиций, уже не помню на каком числе партиций нашли максимальную производительность.

2. Написали свой небольшой баш-фреймворк :) Под капотом он получал в конфиге список параметров, которые нужно изменять от теста к тесу и массив значений этих параметров (например, partitions [1,3,9,27,54]). Дальше крутились тесты по всем комбинациям всех параметров и по каждому снимались итоговые throughput/latency (min, max, 95%%, avg). Нагрузка шла с трех отдельных хостов (не тех, где стоит Кафка). Также кидали события разного размера.

3. Частично про конфигурацию ответил в пункте 2. Да, max.inflight.requests, max.linger.ms и другие параметры настраивали. Acks всегда стоял all, так как в проде у нас используется только acks=all. Плюс был включен довольно агрессивный fsync. Консьюмеров также тюнили. Во время тестов производительности ребалансировка консьюмеров не производилась.

В какой-то момент мы остались довольны полученными цифрами и перестали пытаться разогнаться еще больше. Я уверен, что можно получить более внушительные цифры.
Сервис А часто хочет узнать информацию, которой располагает сервис Б


Хотелось бы узнать как вы узнаете информацию из сервиса Б? То, что Вы отправляете сообщение в сервис через Кафка это понятно, но вот когда нужно сделать запрос в другой сервис тоже через кафка? Если да, то как осуществляете запрос?
По-разному. Если сервисы общаются асихронно, то через Кафку. В таком случае сервис Б подписывается на интересующий его топик. Как только в сервисе А что-то происходит, сервис А отправляет событие об этом в топик. Сервис Б получает это событие и предпринимает требуемые действия. Результат обработки он может записать в другой топик, на который подписан сервис А.

Но есть и прямые вызовы сервисов, есть саги и т.д. Не все взаимодействие идет через Кафку.
Расскажите, какая часть архитектуры прилегла сегодня и можно было ли этого избежать?
Привет. Были проблемы со связанностью сетей подов в Kubernetes и динамической маршрутизацией. Причины расследуем, заведён LSR.
А можно как-то сравнить экономическую составляющую.
Допустим: после перехода на k8s кол-во простоев выросло на Х%, это Y рублей, но также мы смогли заработать благодаря k8s N рублей и т.д.
Нигде не видел подробного экономического обоснования такого перехода.
Допустим для компаний уровня гугл и амазон, нет альтернативы. Но вот можно ли для авито сделать экономическое обоснование такого перехода, ведь и без него вы бы смогли работать.
Нигде не видел подробного экономического обоснования такого перехода.

Потому что экономическое обоснование обычно зависит от кучи факторов, которые могут сильно отличаться от компании к компании:


  • скорость доставки новых фичей в продакшен до/после
  • оптимальность утилизации ресурсов до/после
  • стоимость поддержки и развития до/после
  • etc
это очевидно, но есть и общие моменты — например, повышение скорости доставки на 10%, повышает скорость проверки гипотез продактами, а это влияет на прибыль или капитализацию и т.д.
поэтому и задал вопрос, делали кто-нибудь такую модель, прежде чем вкладывать\закапывать деньги на подобные проекты.
Сейчас это выглядит больше как ИТишный проект с непрозрачной бизнес-ценностью.
Очень интересная статья!

Под прикрытием data-bus мы можем без даунтайма, незаметно обновлять версии Kafka, централизованно вести конфигурации producer’ов, consumer’ов, брокеров и т.д.


Можно поподробнее, как именно data-bus нивелирует даунтайм при обновлении? Вот «упала Kafka» и обновляется… Каким образом сервисы продолжают обмениваться собщениями при этом?
Спасибо за отзыв!

Если весь кластер целиком упал – сейчас никак не нивелируем, но в теории можем зароутить весь траффик на кластер в другом ДЦ через дата-бас прозрачно для сервисов. Если идет failover, можем корректно восстановить положение всех консьюмеров. Если делаем роллинг апдейт или рестарт, можем смотреть за состоянием абсолютно всех клиентов, которые сейчас работают с кластером. Если это обновление клиентов сарамы или конфигурации продюсеров и консьюмеров, то такой процесс происходит полностью на серверной части дата-баса без участия сервисов.
Не совсем понял про databus. Он работает только как wtite model? если да то откуда чтение событий идёт, напрямую с кафки?
Или же как-то надо опросы по http настраивать?
Привет!

databus работает и на запись и на чтение. Никаких опросов в клиенте не нужно настраивать, databus работает через push модель.
Опросы клиенту делать не надо, но надо отправить первый consume/auth запрос, говоря что готов начать прием событий, а потом начинает слушать databus и получать новые события. Всё общение идет через вэб сокеты.

Интересно сравнить с кейсом Booking где они тоже сначала закрыли инфраструктуру (kubernetes, в их случае) за ширмой и отгребли проблем. У Авито сработало ровно обратное решение.

С Booking мы пару раз созванивались недавно и делились опытом. В целом и у них и у нас подход похожий.
Подход к сопровождению кластеров Кафки
Sign up to leave a comment.