Всем привет!
Меня зовут Владимир Олохтонов, я руковожу командой разработки в отделе Message Bus, который является частью платформы Ozon. Мы занимаемся разработкой самых разных систем вокруг Kafka, etcd и Vault. В этой статье я расскажу о том, как мы строили линейно масштабируемую gRPC-прокси перед Kafka, способную обслуживать миллионы запросов в секунду, используя Go.
У нас довольно крупный кластер. Он состоит из 75 брокеров, управляет 30 000 партиций, а рейт поднимается до 5 млн запросов в секунду. Так что задачка перед нами стояла нетривиальная.
Дисклеймер
Статья написана с расчётом на читателя, хотя бы поверхностно знакомого с Kafka и gRPC streaming. Если вы пока не можете про себя такое сказать, то вот несколько ссылочек для ознакомления:
Зачем вообще нужна такая прокси?
В Ozon, как и в множестве других крупных компаний, Kafka используется в качестве корпоративной шины данных — механизма, позволяющего с минимальными трудозатратами строить асинхронные связи между системами по паттерну Pub/Sub.
Разберём стандартную историю развития шины данных на базе Kafka в большой компании.
Первые связи строятся на основе ванильных open-source-библиотек — и это не так просто, как кажется, поскольку в дизайне Kafka используется подход с «умными библиотеками», содержащими десятки параметров, значения которых надо грамотно подбирать. Более того, они поддерживают только часть функциональности Kafka и содержат немало багов.
С ростом числа связей логичным следующим шагом становится выбор из всего многообразия библиотек нескольких наиболее стабильных и написание поверх них простых адаптеров с вдумчиво подобранными параметрами и стандартизированными метриками.
Затем возникает потребность в дополнительной функциональности, специфичной для компании. В случае Ozon это автоматическое получение списка брокеров от service discovery — Warden — и OAuth-авторизация.
Адаптеры становятся всё сложнее и сложнее, их используют сотни сервисов, что означает длинный хвост старых версий и много работы по их регулярному обновлению. Более того, на горизонте маячат дополнительные фичи — и наши адаптеры уже не выглядят такими уж приятными для поддержки.
Как известно, нет таких проблем в программировании, которые нельзя решить повышением уровня абстракции 🙂 В голову приходит идея: что, если большую часть логики реализовать на стороне сервера, полностью изолировать от потребителей работу с Kafka, а клиентский протокол упростить до неприличия? Тогда мы могли бы снова сделать наши библиотеки простыми, обновлять их редко, а клиенты бы получали новые фичи практически без доработок с их стороны. Да и уметь переезжать с одного кластера на другой было бы очень кстати.
Разумеется, у такого подхода есть своя цена: появляется новая точка отказа, дополнительный хоп для трафика между сервисами и брокерами, а также уменьшается максимальная производительность каждого из клиентов в отдельности по сравнению с тщательно настроенной библиотекой.
Мы взвесили плюсы и минусы, после чего принялись за работу. Проект получил кодовое название data-bus.
Сбор требований
Вид связи
При работе с Kafka есть два основных сценария: publish и subscribe. И если для первого из них постоянное соединение нужно лишь для обеспечения высокой производительности, то в случае со вторым это требование протокола самой Kafka.
Значит, обычный RPC нам не подойдёт — нужно использовать долгоживущую двустороннюю связь.
Самый простой из доступных вариантов — это TCP. Однако он достаточно низкоуровневый. Поэтому удобнее опереться на протокол более высокого уровня: WebSocket или gRPC streaming.
В Ozon gRPC используется повсеместно, поэтому в качестве первого приближения мы выбрали второй вариант.
gRPC streaming предоставляет отличный уровень абстракции — ordered reliable channel. И на уровне приложения нам не нужно думать о деталях поддержания связи с клиентом.
Рейт и трафик
Из численных показателей начнём с рейта и трафика, поскольку их легко посчитать 🙂
Возьмём данные с самого большого из наших кластеров. Хоть сейчас и не сезон, но порядок оценить можно: fetch rate (читающие запросы) — 2 млн rps, produce (запись) — около 1,5 млн rps.
Входящего трафика у нас порядка 2,5 гигабайт в секунду, то есть 20 гигабит, исходящего — традиционно для Pub/Sub заметно больше — около 7 гигабайт в секунду, или 56 гигабит.
Переходим к оценке числа клиентов. Они бывают двух видов: консьюмеры и продюсеры.
В случае с консьюмерами всё просто, поскольку существуют kafka-scripts. kafka-consumer-groups --bootstrap-server ${broker} --all-groups --describe --members
— и мы получили IP-адреса всех консьюмеров. После небольшой обработки вывода с помощью Python мы узнали, что у нас 16 000 консьюмеров, из которых 11 000 имеют хотя бы одну назначенную партицию.
С продюсерами ситуация посложнее — пришлось исходить из оценок по косвенным метрикам. Мы знали, что подключений к кластеру у нас около 400 000, из которых примерно четверть — это соединения от консьюмеров.
Когда продюсер подключается к кластеру, он устанавливает соединения со всеми лидерами партиций, в которые собирается писать, поэтому не каждое из подключений к кластеру отражается в отдельного продюсера.
Вот что у нас получилось:
Трафик | Входящий: 20 Гбит/с | Исходящий: 56 Гбит/с |
Рейт запросов | Produce 1,5 млн rps | Fetch 2 млн rps |
Конкурентность | 50 000 продюсеров | 16 000 консьюмеров |
Строим систему методом постепенного возведения
Вообще сложно сразу придумать, как построить систему такого размера. Есть, конечно, какие-то очевидные вещи вроде того, что она должна деплоиться в k8s, скейлиться подами, писать метрики и т. д. Однако в какой-то момент всё равно придётся начать думать о деталях реализации, которых немало.
Построить мы должны что-то подобное:
Как известно, слона надо есть по частям. Я расскажу о том, как такого слоника грамотно приготовить.
Отладка транспорта
Мы начали с самого непонятного — с выяснения того, как будет себя вести gRPC streaming под нагрузкой и хватит ли нам его возможностей.
Для этого мы соорудили локальный стенд из двух сервисов на «голом» gRPC прямо по инструкции с официального сайта. data-bus реализовывал семантику echo-сервера, и это позволило нам понять, как себя ведёт один экземпляр приложения с gRPC-сервером на борту.
Получили такие результаты:
Количество клиентов | Количество переданных сообщений | Время (секунд) | Количество сообщений в секунду |
1 | 100 000 | 3,52 | 28 373 |
2 | 200 000 | 3,63 | 54 989 |
5 | 500 000 | 4,73 | 105 607 |
10 | 1 000 000 | 6,22 | 160 678 |
20 | 2 000 000 | 8,58 | 232 907 |
50 | 5 000 000 | 16,62 | 300 808 |
100 | 10 000 000 | 29,44 | 339 664 |
На этом этапе мы можем сделать следующий вывод: с одного инстанса удаётся снять не менее 300 000 rps gRPC-циклов запрос-ответ на четырёх ядрах. Это нас более чем устроило. Мы также проверили, скейлится ли gRPC по ядрам, запустив те же программы на dev-сервере и выкрутив GOMAXPROCS, — масштабирование практически линейное.
Кстати, рекомендую отличный пост с исследованием gRPC streaming performance.
Путь в Kubernetes
Все сервисы в Ozon должны жить в Kubernetes, и data-bus не исключение. Важно убедиться, что на этом этапе мы не получим серьёзной деградации производительности, чтобы потом было легче отделить проблемы с Kafka от проблем с инфраструктурой.
Для начала мы переделали нашу связку из echo-сервера с клиентом в полноценные сервисы data-bus и data-bus-checker на основе платформенного фреймворка и измерили производительность. На этом этапе из-за записи метрик мы получили деградацию около 17% по сравнению с «чистым» echo-сервером.
Причину деградации удобнее всего отслеживать с помощью go tool pprof в формате flame graph.
В дальнейшем data-bus-checker будет играть роль своеобразного внешнего скелета системы, позволяя проводить постоянное нагрузочное тестирование.
Настоящий API
Жалко, конечно, но мы не echo-сервер пишем, а слой абстракции перед Kafka. Нам придётся придумать новый интерфейс, а затем проверить, повлияет ли он как-то на производительность системы. Не должен, конечно, но мало ли 🙂
Семантически мы хотели обеспечить гарантии durable writes и at-least-once delivery, то есть чтобы каждое успешно записанное сообщение было обработано системой-потребителем не менее одного раза. Это заметно упрощает реализацию бизнес-логики, поскольку единственное, что нужно делать, — это корректно обрабатывать дубликаты сообщений, опираясь на ключи идемпотентности.
В качестве технологии описания протокола в Ozon используется Protocol Buffers. На ней остановились и мы. Настройки подключения один раз пробрасываются через grpc-заголовки при установлении соединения и в дальнейшем больше не передаются.
Протокол
После генерации новых gRPC-стабов мы заменили нашу реализацию echo-сервера на тот же самый echo-сервер, но работающий через новый протокол, после чего снова прогнали нагрузочные тесты с помощью data-bus-checker. Таким образом мы проверили, не сломали ли мы чего по дороге. Всё было в порядке.
Прежде чем переходить к прикручиванию настоящей работы с Kafka, мы решили проверить, что будет, если увеличить число подключенных клиентов.
Напомню, что наша цель — около 80 000 клиентов. Каково же было наше удивление, когда обнаружилось, что уже на 1000 подключений резко вырос latency операций!
К счастью, в этот момент система ещё была достаточно простой (банальный echo-сервер), поэтому мы быстро нашли проблему в работе с gRPC: выяснилось, что на стороне data-bus-checker для всех конкурентных подключений используется один инстанс gRPC-клиента, что, несмотря на создание отдельных gRPC-стримов, приводило к их упаковке в одно-единственное TCP-соединение, то есть требовало упорядочивания всех сообщений во всех стримах.
Как только мы решили эту проблему, начав создавать по одному клиенту на подключение, мы смогли увеличить количество соединений до 10 000 — и нам тут же разорвало data-bus по памяти. Здесь мы тоже искали причину недолго: работа с большим количеством подключений приводит к аллокации множества буферов, размер которых был выставлен неадекватно большим. Хороший ориентир — 32 Кб. Искать причины подобных проблем приятнее всего с помощью go tool pprof -heap.
После решения и этой проблемы всё наконец-то заработало как надо. С транспортом было покончено — и мы перешли непосредственно к работе с Kafka.
Прикручиваем работу с Kafka
В качестве библиотеки мы выбрали franz-go — нам хотелось, чтобы data-bus из коробки предоставлял для консьюмеров механизм ребалансировки без stop the world cooperative-sticky, а при таком требовании выбор, мягко говоря, невелик.
Начали мы с продюсера, поскольку он семантически устроен несколько проще, чем консьюмер.
Логика работы Producer
Клиент — приложение, работающее с Kafka через data-bus, например data-bus-checker.
Сервер — data-bus.
Клиент устанавливает двунаправленное соединение.
Сервер заводит Kafka producer, через который будет идти запись в целевой топик.
Клиент отправляет сообщение
PublishRequest{messages: []Message}
.Сервер синхронно записывает полученные сообщения в Kafka и отвечает сообщением
PublishResponse{ack: true}
.Клиент, прочитав
PublishResponse
, понимает статус записи.goto 3.
Единственное, что нам пришлось подкрутить, — это параметр ProducerLinger, время накопления сообщений на стороне библиотеки для отправки в Kafka батчами. Такой подход помогает увеличить throughput при работе с библиотекой напрямую, но в нашем случае его стоило сделать поменьше, поскольку клиенты между подключениями у нас не разделялись.
Логика работы Consumer
Клиент — приложение, работающее с Kafka через data-bus, например data-bus-checker.
Сервер — data-bus.
Клиент устанавливает двунаправленное соединение.
Сервер заводит Kafka consumer с
group_id=consumer_group_name
.Сервер отправляет клиенту сообщение
SubscribeResponse{messages: []Message}
.Клиент, получив
SubscribeResponse
, обрабатывает сообщения и направляет серверуSubscribeRequest{ack: true}
.Сервер отмечает сообщения как обработанные.
goto 3.
Изначально мы хотели опереться на автокоммиты, чтобы надёжно фиксировать уже обработанные сообщения, не перегружая брокеры коммитами на каждой итерации. Но натолкнулись на новый подводный камень: при разрыве соединения от клиента под капотом библиотеки происходил коммит. Это может быть нормально при работе с библиотекой напрямую, но нам не подходило совершенно, ведь мы не должны ничего коммитить без команды от клиента.
В итоге мы перешли на механику AutoCommitMarks в сочетании с автокоммитами. Это позволило нам обеспечить подходящий уровень производительности при соблюдении семантических требований.
Пришло время натурных испытаний системы в сборе.
Натурные испытания
Producer
Начали мы с продюсера. На стороне data-bus-checker было имплементировано три сценария:
latency-optimized — отправка по одному сообщению без задержек.
throughput-optimized — отправка батча из 100 сообщений без задержек.
ticker — отправка по одному сообщению с регулируемой задержкой между отправками.
С первыми двумя сценариями всё достаточно очевидно, а третий мы хотели использовать для проверки поведения системы при тысячах и десятках тысяч подключений, через каждое из которых отправляется по одному сообщению раз в секунду.
Испытание на 1000 подключений
Первое, с чем мы столкнулись, — это волнообразный рост 99 перцентиля latency с периодичностью в пять минут. Наши опытные админы тут же предположили, что проблема кроется в обновлении метаданных.
Дело в том, что каждый клиент Kafka время от времени обновляет метаданные о конфигурации кластера, и это довольно тяжёлые запросы. Если же мы запускаем тысячи клиентов одновременно, то их жизненные циклы становятся синхронизированными и они все запрашивают метаданные практически одновременно. Мы добавили случайную задержку перед началом передачи сообщений, но, к нашему удивлению, это не помогло.
Спустя два дня поисков проблемы мы обнаружили, что, хоть клиенты и спят перед отправкой первого сообщения, gRPC-соединение с data-bus устанавливается до сна, а именно в этот момент заводился Kafka-клиент, регулярно запрашивающий метаданные.
Поняли мы это после того, как добавили на стороне data-bus метрику для времени ожидания сообщений от клиентов и это время стало поразительно напоминать наше время сна :)
Когда мы это исправили, показатель latency перестал гулять и продюсеры, наконец, заработали нормально.
Испытание на 10 000 подключений
Для этого испытания мы выставили задержку между отправками сообщений в 30 секунд, чтобы не отвлекаться на скорость обработки самих сообщений.
На графиках время от времени стали появляться пики по несколько секунд. Не буду мучить вас историей о том, как мы это отлаживали, но в итоге выяснилось, что в franz-go по умолчанию довольно непродолжительное время жизни idle-подключений и, пока мы проходили по всем партициям, подключения к первым из них успевали «протухнуть» и их приходилось устанавливать заново. Проблема решилась подкручиванием параметра ConnIdleTimeout.
В этой точке мы смогли получить ровный график latency без пиков, но с 99 перцентилем в районе 300 мс — это слишком много, поскольку 99 перцентиль записи в Kafka у нас составляет примерно 35 мс.
Результаты профилирования показали, что около 75% времени сервер проводит в ожидании системных вызовов. Следовательно, нам нужно было уменьшить их число. Пришло время немного пересмотреть дизайн системы: если сначала мы не собирались разделять клиенты между подключениями, то теперь поняли, что это необходимо для более эффективной утилизации ресурсов сервера.
После прикручивания механизма агрегации клиентов у нас уменьшился показатель latency, а волосы стали гладкими и шелковистыми батчинг и сжатие стали работать заметно лучше. Однако это привело к уменьшению максимального throughput и увеличению latency для latency-optimized-продюсеров из-за необходимости ждать, пока обработаются сообщения из конкурентных подключений.
В итоге мы реализовали агрегацию опцией with_clients_aggregation, которой могут управлять клиенты, чтобы можно было выбрать наиболее эффективную стратегию для каждого из сценариев.
Consumer
Первым делом на стороне data-bus-checker был имплементирован механизм, позволяющий управлять числом и размером групп, а также их конфигурацией: размером батчей и задержкой между запросами.
Сначала проверялось максимальное количество циклов get-ack, затем — максимальная пропускная способность. Мы заметили интересный эффект: тогда как два пода data-bus из трёх выдавали порядка 10 Гбит/с трафика, третий выдавал почти в два раза больше. Это было связано с тем, что он физически находился на той же машине, где соответствующий ему под data-bus-checker, и, соответственно, между ними были околонулевые сетевые задержки. Мы запомнили это на случай, если нам понадобится ещё сильнее оптимизировать систему.
Дальше по аналогии с продюсером мы перешли к проверке максимального количества подключений — и… data-bus снова стал умирать от OOM Killer.
К тому моменту мы были уже стреляными воробьями. Расчехлили наш go tool pprof -heap — и увидели, что основной объём памяти выделяется в кишках franz-go. Это оказались буферы, куда записываются сообщения из партиций, которые затем будут передаваться на обработку.
Размер буферов регулируется параметром FetchMaxBytes. По умолчанию установлено довольно большое значение в 50 Мб — можете себе представить, сколько нужно памяти для обработки тысяч клиентов с десятками партиций.
После выбора более подходящего значения (у нас это 32 Кб) система начала стабильно работать при 10 000 подключений на под.
Полноразмерный тест
В начале статьи я упоминал о том, что мы хотим уметь обрабатывать в секунду порядка 3,5 млн запросов, разделённых между продюсерами и консьюмерами. Надо бы проверить, будет ли система корректно работать под такой нагрузкой.
К этому моменту нам было известно, что один под data-bus может обрабатывать порядка 100 000 rps, поэтому мы отмасштабировали data-bus до 48 подов (с запасом производительности в полтора раза).
Затем мы подобрали такую конфигурацию data-bus-checker, которая более-менее реалистично описывала наш боевой профиль, и запустили нагрузку.
Первое, во что мы упёрлись, — это в невозможность записывать более 1 млн rps в один топик с 21 партицией. Эту проблему мы решили, просто создав десять топиков и немного подкрутив data-bus-checker, чтобы он умел в них писать.
После реализации записи в десять топиков мы смогли получить требуемую производительность. Важно отметить, что на этом этапе мы не использовали агрегацию клиентов, поскольку это необязательная оптимизация.
Мы также проверили, как влияет на систему включение агрегации для небольших групп клиентов. И нам удалось записать почти 5 млн rps!
Что ещё интереснее, так это то, насколько сильно уменьшилась нагрузка на саму Kafka после этого.
Заключение
Безусловно, до того как система будет готова к внедрению, нам предстоит проделать ещё немало работы. Однако костяк, способный выдерживать нагрузку в миллионы rps, мы возвели — и потратили на это всего около двух человеко-месяцев.
Главный принцип: большие системы надо строить по кусочкам, непрерывно отслеживая важные свойства. Тогда, если после добавления очередного блока будет замечено падение производительности, сразу будет понятно, куда копать.
Помните: не боги горшки обжигают. Пробуйте — и у вас получится, как получилось у нас. Желаю удачи!