Pull to refresh
28
0
Анатолий Солдатов @EasyGrow

Исследователь

Send message
Приятного аппетита :)

Все происходит через data-bus. В том числе есть поддержка consumer groups через data-bus.

По поводу request/reply – NATS Streaming его не поддерживает, на сколько я знаю (только NATS). Сошлюсь на объяснение ведущего разработчика NATS Streaming – тут он объясняет, почему такая фича не нужна для персистентных очередей. Но в целом, конечно, стоит внимательно изучить внутреннюю реализацию NATS Streaming (тем более тоже на go) и мб что-то нам позаимствовать/оптимизировать.
У нас сейчас сервисы написаны по большей части на php, go, python. Есть немного java, rust и, наверное, что-то еще.
1. Schema registry – инструмент, позволяющий гарантировать совместимость продюсера и консьюмера (вы описываете схему отправляемых в топик событий, добавляете в schema registry, проверяете, что при produce схема соблюдается, настраиваете правила совместимости схем при эволюции). Schema registry работает только с Avro. Из минусов – не все клиентские библиотеки умеют с ней работать, но есть REST API. В целом – очень нужная штука, если вы не хотите сломать своих консьюмеров.
2. Ключ можно рассматривать как идентификатор события. Как минимум без ключа нельзя гарантировать упорядоченность (так как по ключу идет распределение событий по партициям топика). Офсет – индекс, указывающий на положение события в партиции. Офсеты будут повторяться между разными партициями.
3. Кафка и зукиперы у нас запущены на железных дисках в LXC. В кубернетесе запущен сервис data-bus, он stateless.
Спасибо за фидбек по Nats Streaming, интересно. В качестве распределенной in-memory очереди у нас был неплохой опыт с NSQ (https://nsq.io/).

На KIP с избавлением от зукиперов, кстати, я давал ссылку в этой статье :)
Спасибо за комментарий!

На данный момент data-bus написан на go с использованием библиотеки sarama от Shopify.

Да, для клиентов data-bus нужно подключить простую библиотеку (обертку над Апи серверной части). Далее при вызове методов produce/consume серверная часть data-bus автоматически поднимает необходимое число инстансов (или роутит запрос на уже существующие инстансы) продюсеров и консьюмеров Кафки в кубернетесе, но все это скрыто от клиента.
Здравствуйте!

Мы используем фактор репликации 3 и min.insync.replicas 2. При этом прямо сейчас мы ведем работы по репликации данных между кластерами в нескольких ДЦ. Т.е. в целевой схеме фактор репликации будет 3*<число ДЦ>
В нашем случае все сервисы крутятся в нашем Kubernetes кластере. В том числе и data-bus.
as a Service используется чтобы показать, что мы абстрагируемся от слоя хранения и предоставляем сервис – брокер сообщений (а уж на Кафке он или на csv файлах – не важно).
1. Это сервис. он крутится в кубернетесе и предоставляет апи для доступа к Кафке. Апи обернуто в виде клиентских библиотек под разные языки.
2. Сложно сказать. С одной стороны, по сравнению с ванильной Кафкой производительность упала примерно в 3 раза (бенчмарки). С другой стороны, нам для этого кейса не нужна производительность в миллионы эвентов в секунду и мы никак это не ощутили. При этом дата-бас надо поддерживать. Но зато поддержка снимается со всех команд сервисов, которые хотят интегрироваться с дата-басом (им достаточно подключить библиотеку и дернуть метод. Сейчас клиенты к дата-басу появляются с помощью кодогенерации и сервисам в целом остается только начать отправлять данные без каких-либо настроек. Так что не знаю про плату – везде компромиссы.
Микро – это маркетинговый булщит. Это не я так сказал, это буквально на днях к нам в Авито прилетал на тренинг Крис Ричардсон и я его вольно цитирую. На сколько я помню, этот термин оказался на слуху как раз после конференции с участием Мартина Фаулера, где использовался для привлечения внимания к топику.

А слово сервисы в статье употребляется 35 раз, специально посчитал :)
Планирую более подробно рассказать на DevOps Conf в этом году. Статью специально старался держать обзорной, чтобы не вышел лонгрид, который никто не дочитает до конца ;)

По вопросам:
1. Не уверен. Вокруг data-bus у нас еще свой коннектор, свой schema registry, своя спецификация, свои либы и тд. То есть data-bus очень интегрирован в платформу Авито и его сложно оттуда отдельно отковырять :)
2. Пока нет автоматизированного решения. Происходит это довольно редко и в таких случаях мы создаем новый топик с целевым числом партиций, реплицируем в него старые данные и затем переключаем продюсеров и консьюмеров (в случае, когда надо сохранить упорядоченность). В случаях, когда у топика идет раскидывание ключей раунд робином, мы просто увеличиваем ему число партиций на лету.
3. Самый холиварный вопрос – что нужно реализовать самим, а что взять уже готовое (например, долго обсуждали брать ли Schema Registry от Confluent). Это, наверное, основные проблемы :) Фидбек получаем в основном через канал data-bus в слаке, где есть все заинтересованные. Плюс сами рассказываем про data-bus и его развитие на внутренних митапах и ведем документацию.
У Пульсара нет аналогов KStreams и не такое больше сообщество (и не так много коммерческих компаний, продвигающих эту технологию, что важно). Также под Пульсар почти невозможно найти специалистов на рынке (по крайней мере в России).

Но у него есть очень много интересных фичей – например, подписка на ключи, nack и отложенное чтение, stateless брокеры и т.д. Если вы хотите хранить ОЧЕНЬ много данных, опять же у Пульсара есть очень крутая фича – Tiered Storage, которая в Кафке пока только планируется (по слухам, Confluent обещает ее до конца года).

В нашем случае оба решения подходили под задачу. С одной стороны много продакшн опыта, много специалистов на рынке, много саппорта и большое комьюнити (Кафка). С другой стороны более фичастое и заряженное решение, по которому значительно меньше информации в сети и для которого скорее всего придется выращивать специалистов с нуля и тратить много времени на ресерч.
Если логика обработки сообщений очень долгая, на стороне консьюмера нужно увеличить значение параметра `max.poll.interval.ms`. Кафка будет считать, что консьюмер жив и не будет его трогать, если следующий poll произойдет в рамках этого интервала (при условии, что консьюмер на самом деле жив и все это время отправляет heartbits контроллеру – это делается в отдельном background треде прозрачно для разработчика)
Также можно использовать exactly once подход на основе транзакций Кафки
При повторном чтении есть несколько разных сценариев:
— если один консьюмер читал из одной партиции, он продолжит из нее читать (если не было ребалансировки между двумя чтениями)
— если один консьюмер читал из нескольких партиций, он попытается опять вычитать данные из них, но на сколько я знаю, в документации нет гарантий сохранения порядка чтения из партиций (те он может и измениться).

В документации к консьюмеру есть следующая фраза «If a consumer is assigned multiple partitions to fetch data from, it will try to consume from all of them at the same time, effectively giving these partitions the same priority for consumption. However in some cases consumers may want to first focus on fetching from some subset of the assigned partitions at full speed, and only start fetching other partitions when these partitions have few or no data to consume.»

Также есть открытый KIP, который говорит о том, что сейчас поведение по выбору партиций при чтении детерминировано.

Обычно, consumer poll'ит события из кафки в бесконечном цикле. Внутри цикла у вас происходит логика обработки событий (в том числе commit). Как только вы сделали все необходимые действия с полученными событиями, опять вызывается poll (в простом случае) и из кафки прилетает новая пачка событий (или повторно прилетает старая пачка событий, если не было коммита).
Спасибо за комментарий :)

Оффет контролирует положение вашего консьюмера в логе. С точки зрения Кафки если вы не закоммитили изменения, вы не сдвинулись с места.

Вы можете не коммитить события, но тогда вы каждый раз будете перечитывать их из Кафки повторно и никогда не получите новые события.
Смотрели на этот проект в рамках другой задачи. Сами не использовали, т.к. у решения нет поддержки consumer groups и из-за этого нам не подошло бы в любом случае.

Судя по issues и отсутствию поддержки, проект, кажется, создавался в первую очередь как учебный (идея прикольная :) ). Как им можно пользоваться для реальных задач – не знаю, разве что подменять Кафку в интеграционных тестах (если протокол на самом деле совместим).
Сейчас работаем над переводом выпусков в онлайн. Для этого будем создавать слой АПИ на уровне БД (хранимки, версионные схемы, вьюхи). «Многопоточку» и в таком подходе вполне успешно можно применять будет.

Ядер более ста, оперативки не очень много (хватает, чтобы все индексы при построении влезали в maintenance_work_mem, например), данные распределены между SSD и SAS дисками в зависимости от профиля нагрузки. Подробно детали по нагрузке и конфигурации раскрыть не могу, к сожалению.
Обычно, все процессы выполняются в своей отдельной транзакции. Соответственно, если один диапазон завершается с ошибкой, он помечается необработанным и в лог сохраняется описание ошибки, а «многопоточка» прерывается для анализа.

Дальше все зависит от выполняемой задачи, на сколько нам критично мигрировать все данные без ошибки. Часто можно пропустить диапазон с ошибкой и дождаться завершения миграции, затем точечно разобраться с ошибками и после их устранения перезапустить «многопоточку». При перезапусках «многопоточки» работу будут взяты только еще необработанные диапазоны. Например, такой сценарий возможен при миграции полей «заранее», когда приложение их еще не видит у себя.

Есть также вариант с созданием резервной копии таблицы до миграции, если нам нужна возможность откатиться к состоянию до «многопоточки». Дальше делаем или rename или восстанавливаем из нее данные.

В крайних случаях можно переключиться на стендбай с запаздыванием, где «многопоточки» еще не было или восстановиться из бэкапа и донакатить WALы.

Тут еще могут быть и другие варианты проведения миграции и восстановления, зависит от сценариев. На проде у нас критичных случаев, когда бы понадобились варианты 2 или 3 не было.

Information

Rating
Does not participate
Works in
Registered
Activity