Привет, Хабр! Меня зовут Павел, я ведущий разработчик. В этой статье расскажу про Kafka, consumer groups, lag, offset commit и встроенное сжатие сообщений. Не в формате “что такое Kafka за 15 минут”, а через обычную продовую историю, где кластер начал есть диск быстрее, чем политики очистки успевали его освобождать.
Снаружи проблема выглядела банально: Kafka живет, producer’ы пишут, consumer’ы читают, retention настроен. Внутри было веселее: в пиковую нагрузку место на брокерах улетало так бодро, будто у него был отдельный KPI на исчезновение. Retention вроде бы должен был чистить старые данные, но он не маг. Если входящий поток крупных сообщений быстрее, чем Kafka успевает освобождать сегменты, диск все равно закончится. А когда диск заканчивается у Kafka, настроение портится не только у Kafka.
В итоге мы пришли к встроенному сжатию Kafka на producer’ах. Без ручного zip payload, без “давайте завернем JSON в архив и пусть каждый consumer теперь разбирается сам”. Просто включили compression на уровне Kafka-клиента. В нашем случае объем сообщений на хранении уменьшился примерно в 5 раз. Это не отменило необходимость следить за retention, lag и consumer groups, но дало кластеру перестать жить в режиме “еще один пик, и пишем посмертную”.
Ниже разберу:
почему retention не всегда спасает от заполнения диска;
чем Kafka native compression отличается от ручного zip’ования payload;
как включить сжатие в C# через Confluent.Kafka;
как consumer groups, lag и offset commit связаны с этой историей;
что мониторить, чтобы не узнать о проблеме по тишине сервиса.

У нас была система, где несколько producer’ов писали события в Kafka. События были достаточно крупные, поток в обычное время держался нормально, consumer’ы успевали читать, retention был настроен. В общем, все выглядело как система, которую не хочется трогать без причины.
Проблема появилась в пиковой нагрузке. Объем входящих сообщений резко увеличивался, Kafka активно писала новые log segments, а политики очистки не успевали освобождать место с той же скоростью. В какой-то момент диск на брокерах стал главным участником архитектуры: не потому что он умный, а потому что без свободного места все остальные компоненты внезапно становятся философами и начинают рассуждать о бренности SLA.
Упрощенная картина была такая:
Компонент | Что делал | Почему это стало проблемой |
|---|---|---|
Producer’ы | Писали крупные сообщения в Kafka | В пике быстро занимали место на брокерах |
Kafka brokers | Хранили log segments | Диск рос быстрее, чем освобождался |
Retention/cleanup | Удалял старые сегменты | Не успевал компенсировать входящий поток |
Consumer group | Читала сообщения | Чтение не решало проблему storage, потому что Kafka хранит данные по retention |
Кластер | Пытался жить | При заполнении диска начинались неприятные эффекты для всех |
Важно: consumer может читать быстро, но это не значит, что Kafka сразу удалит сообщение. Kafka не очередь в классическом смысле “прочитал — удалилось”. Она хранит log по политикам retention. Поэтому “consumer’ы же читают” не является ответом на вопрос “почему диск заканчивается”.
Почему retention не спас
Retention в Kafka обычно задают по времени, по размеру или комбинацией этих правил. Например:
хранить сообщения не дольше N часов/дней;
хранить не больше N байт на локальный лог partition;
периодически проверять, какие log segments можно удалить.
Но тут есть несколько важных деталей.
Kafka удаляет не отдельные сообщения, а log segments. Если segment еще активный или не подходит под условия удаления, данные внутри него продолжают занимать место. Очистка происходит не непрерывно в каждую наносекунду, а по своим циклам. И самое главное: если producer’ы в пике пишут быстрее, чем старые сегменты освобождают диск, retention не становится волшебной кнопкой.
Можно представить это как ванну:
Сценарий | Что происходит |
|---|---|
Вода вливается медленнее, чем уходит | Все спокойно |
Вода вливается примерно с той же скоростью | Живем на грани, но красиво |
Вода вливается быстрее, чем уходит | Рано или поздно вода будет не только в ванной |
У нас Kafka была этой ванной. Только вместо воды были сообщения, вместо пола — диск брокера, а вместо соседей снизу — сервисы, которые внезапно начинали страдать вместе с кластером.
Первым желанием может быть: “давайте сильнее уменьшим retention”. Иногда это нормальный путь. Но он не всегда подходит:
бизнесу может быть нужно окно переобработки;
consumer’ы могут временно отставать;
replay может быть частью нормальной эксплуатации;
слишком агрессивный retention превращает временный лаг в потерю возможности восстановиться.
Поэтому мы пошли не только в сторону “быстрее удалять”, а в сторону “меньше писать на диск”.
Что именно мы включили
Мы использовали встроенное сжатие Kafka. Это принципиальный момент.
Мы не делали так:
message.Value = Zip(JsonPayload)
И не заставляли каждый consumer вручную распаковывать payload. Мы включили compression на уровне producer config. Producer собирает batch сообщений и сжимает его. Broker обычно хранит batch в producer codec, если на broker/topic не переопределен compression.type. Если переопределен, broker может перекомпрессировать данные. Consumer получает данные и прозрачно распаковывает их через Kafka-клиент.

Разница между встроенным сжатием Kafka и ручным zip payload:
Подход | Что сжимается | Что видит приложение | Плюсы | Минусы |
|---|---|---|---|---|
Kafka native compression | Batch сообщений на уровне Kafka client | Тот же payload, что и раньше | Не меняет контракт, проще consumer’ы, проще replay/DLQ | Нужен CPU на сжатие/распаковку |
Ручной zip/gzip payload | Само значение сообщения | Архив/bytes вместо привычной структуры | Полный контроль над форматом | Все consumer’ы должны знать, как распаковать; сложнее дебажить |
В нашем случае был нужен именно первый вариант. Мы хотели экономить место на брокерах, не переписывая контракт интеграции.
Пример настройки в C# + Confluent.Kafka
Ниже обезличенный пример producer config. В статье я показываю Gzip, потому что в нашем кейсе речь была именно про “зипование” на стороне Kafka. Если у вас используется Lz4, Snappy или Zstd, логика та же, меняется codec и профиль CPU/степени сжатия.
using Confluent.Kafka; var producerConfig = new ProducerConfig { BootstrapServers = "broker-1:9092,broker-2:9092", Acks = Acks.All, EnableIdempotence = true, // Kafka native compression: // producer сжимает batch, // broker обычно хранит его в producer codec // или перекомпрессирует, если compression.type переопределен, // consumer распаковывает прозрачно. CompressionType = CompressionType.Gzip, // Сжатие лучше раскрывается на batch'ах. // Конкретные значения нужно подбирать под нагрузку и latency. LingerMs = 5 };
Эквивалент на уровне librdkafka properties:
compression.type=gzip acks=all enable.idempotence=true linger.ms=5
Если конфигурация живет в appsettings.json, она может выглядеть примерно так:
{ "Kafka": { "Producer": { "BootstrapServers": "broker-1:9092,broker-2:9092", "Acks": "All", "EnableIdempotence": true, "CompressionType": "Gzip", "LingerMs": 5 } } }
Главная идея: мы не меняем бизнес-сообщение. Consumer’ы продолжают читать ту же структуру. Меняется то, как Kafka-клиент упаковывает batch при отправке.
Почему объем уменьшился примерно в 5 раз
Сжатие хорошо работает, когда в сообщениях много повторяемых структур. В типичных интеграционных событиях это встречается часто:
повторяются имена полей;
повторяются технические блоки;
повторяются идентификаторы типов, статусы, названия сущностей;
сообщения идут пачками и похожи друг на друга;
JSON/XML обычно сжимаются заметно лучше, чем уже сжатые бинарные данные.
Если до включения compression Kafka хранила условные 100 единиц объема, после включения встроенного gzip в нашем кейсе стало около 20. То есть примерно в 5 раз меньше.
Упрощенная таблица эффекта:
Вариант | Размер на диске Kafka | Эффект | Что получили |
|---|---|---|---|
Без сжатия | 100% | Базовая линия | Диск быстро заканчивался в пик |
Kafka gzip | около 20% | примерно x5 | Кластер перестал задыхаться по месту |
Ручной zip payload | не использовали | не требовался | Не стали усложнять consumer’ы |
Сразу оговорюсь: “примерно в 5 раз” — не универсальная константа. Если у вас уже сжатые изображения, архивы или плотный бинарный payload, эффект будет другим. Если у вас большой JSON с повторяющимися полями, gzip/zstd могут дать очень хороший результат.
Цена сжатия
У сжатия есть цена. Оно не берется из воздуха, хотя иногда очень хочется верить, что где-то в кластере есть бесплатная полка с производительностью.
Что меняется:
Ресурс | Что происходит |
|---|---|
CPU producer’а | Растет, потому что producer сжимает batch |
CPU consumer’а | Растет, потому что consumer распаковывает batch |
Сеть | Обычно становится легче |
Диск брокера | Обычно становится легче |
Latency | Может немного измениться из-за batching и compression |
Debug | При native compression почти не усложняется для приложения |
Поэтому включать compression стоит не по принципу “у нас теперь модно”, а по понятной причине:
диск стал бутылочным горлышком;
сеть стала бутылочным горлышком;
сообщения хорошо сжимаются;
CPU producer/consumer позволяет это оплатить;
latency не выходит за допустимые границы.
В нашем случае сделка была выгодной: CPU оказался дешевле, чем свободное место на брокерах во время пика.
Почему здесь вообще consumer groups
На первый взгляд можно спросить: если статья про disk pressure и compression, зачем сюда consumer groups?
Потому что в реальной эксплуатации Kafka эти вещи редко живут отдельно. Когда диск начинает заполняться, команда почти всегда смотрит еще и на lag, consumer groups, скорость обработки, rebalance и offset commit. И очень легко перепутать симптомы.
Например:
lag растет, потому что consumer’ы реально не успевают;
lag растет, потому что downstream DB тормозит;
lag выглядит страшно в сообщениях, но по времени задержка терпимая;
consumer’ы читают нормально, но диск все равно растет, потому что retention хранит данные независимо от факта чтения;
rebalance временно останавливает обработку и создает ощущение, что Kafka “опять сама что-то делает”.
Consumer group в Kafka устроена просто, пока не нужно объяснять это ночью.
В рамках одной consumer group один partition назначается только одному consumer’у. Если у topic 6 partitions, а вы подняли 20 consumer’ов в одной группе, 14 из них не начнут магически ускорять чтение этого topic. Они будут стоять без partition assignment и думать о смысле своего деплоя.
Количество partitions | Количество consumer’ов в group | Что будет |
|---|---|---|
6 | 3 | Каждый consumer может читать несколько partitions |
6 | 6 | Обычно по одному partition на consumer |
6 | 20 | Лишние consumer’ы останутся без работы |
Поэтому, когда вы боретесь с lag, нельзя просто сказать “добавим pod’ов”. Сначала надо посмотреть:
сколько partitions у topic;
как распределены partitions между consumer’ами;
нет ли горячих partitions;
не тормозит ли обработка внутри consumer’а;
не упираетесь ли вы в БД, внешний сервис или сериализацию.
Lag: цифра, которая любит притворяться ответом
Kafka lag часто показывают как количество сообщений между последним записанным offset и последним закоммиченным offset consumer group.
Это полезная метрика, но сама по себе она не отвечает на главный вопрос: насколько мы опаздываем с точки зрения бизнеса?
Пример:
Ситуация | Offset lag | Реальная задержка |
|---|---|---|
100 000 маленьких сообщений | Большой | Может обработаться за минуты |
5 000 тяжелых сообщений | Меньше | Может обрабатываться дольше |
Один горячий partition | Средний | Один consumer страдает, остальные скучают |
Consumer быстро читает, но долго пишет в БД | Растет | Kafka не виновата, но ей все равно прилетит |
Для продовой картины лучше смотреть не только offset lag, но и:
age/event-time lag: насколько старое последнее необработанное событие;
скорость входящего потока;
скорость обработки;
размер сообщений;
disk usage broker’ов;
under replicated partitions;
consumer rebalance count;
ошибки produce/fetch;
latency записи в downstream БД.
В нашем случае lag был важен, но корень проблемы был в storage: Kafka должна была хранить слишком много данных в пиковый момент. Сжатие помогло именно там.
Offset commit: где появляются дубли
Теперь про вещь, которую лучше понимать до того, как у вас появились дубли в бизнес-операциях.
Consumer получает сообщение, обрабатывает его и коммитит offset. Вопрос в порядке действий.
Подход | Что может случиться |
|---|---|
Commit до обработки | Если consumer упадет после commit, сообщение можно потерять |
Commit после обработки | Если consumer упадет после обработки, но до commit, сообщение придет повторно |
Auto commit без понимания процесса | Можно получить неприятный сюрприз и долго спорить с логами |
Обычно для бизнес-операций безопаснее commit после успешной обработки. Это дает at-least-once: сообщение не теряется, но может прийти повторно. Значит, обработчик должен быть идемпотентным.

Типовая ситуация:
Consumer получил сообщение
offset=42.Записал результат в БД.
Упал до commit offset.
После рестарта Kafka снова отдала
offset=42.Если запись в БД не идемпотентна, получили дубль.
Это не “Kafka сломалась”. Это нормальная цена at-least-once. Kafka честно говорит: “Я не видела commit, держи сообщение еще раз”. А дальше уже вопрос к приложению: умеет ли оно повторную обработку пережить без истерики.
Пример consumer config:
var consumerConfig = new ConsumerConfig { BootstrapServers = "broker-1:9092,broker-2:9092", GroupId = "portfolio-consumer", EnableAutoCommit = false, // Важно подобрать под реальное время обработки. MaxPollIntervalMs = 300000, SessionTimeoutMs = 10000 };
Я специально не добавляю сюда AutoOffsetReset. Для новой группы в dev- или bootstrap-сценарии это бывает полезно, но в продовой статье такую строку слишком легко скопировать вслепую и внезапно перечитать весь topic.
Упрощенный пример обработки:
while (!cancellationToken.IsCancellationRequested) { var consumeResult = consumer.Consume(cancellationToken); try { await HandleMessageAsync(consumeResult.Message, cancellationToken); // Commit только после успешной обработки. consumer.Commit(consumeResult); } catch (Exception ex) { logger.LogError(ex, "Kafka message processing failed"); // Дальше зависит от стратегии: // retry, DLQ, stop consumer, manual intervention. // Главное — не делать вид, что commit все вылечит. throw; } }
Какие настройки producer’а проверить
Compression редко живет одна. Она связана с batching и гарантиями доставки.
Настройка | Зачем смотреть |
|---|---|
| Собственно codec: gzip, snappy, lz4, zstd |
| Чем лучше batch, тем лучше может раскрыться сжатие |
Позволяет немного подождать, чтобы набрать batch | |
| Уровень подтверждения записи broker’ами |
| Защита producer’а от дублей при retry |
| Лимит размера request producer’а |
Таймаут сообщения на стороне producer’а |
Про linger.ms важно сказать отдельно. Если поставить его слишком высоким, можно получить лишнюю задержку. Если поставить слишком низким, batch’и будут маленькими и compression может раскрыться хуже. Баланс зависит от вашей нагрузки.
Для latency-sensitive сценариев я бы начинал осторожно:
compression.type=gzip linger.ms=5 acks=all enable.idempotence=true
А дальше смотрел бы не на веру в конфиг, а на графики. batch.size я бы тюнил отдельно, уже после замеров: это полезная ручка, но не та, которую стоит вставлять в статью как универсальную магическую цифру.
Какие настройки consumer’а проверить
Настройка | Почему важна |
|---|---|
| Для бизнес-обработки часто лучше отключить и коммитить вручную |
Если обработка долгая, consumer может быть исключен из group | |
Как быстро group coordinator решит, что consumer умер | |
Частота heartbeat | |
| Сколько данных consumer может получить за fetch |
| Лимит данных на partition |
Если сообщение стало меньше на диске из-за compression, это не значит, что бизнес-обработка стала бесплатной. Consumer все равно получит исходный payload и обработает его. Если bottleneck был в БД, compression не сделает SQL-запросы быстрее. Она поможет диску и сети, но не вылечит все подряд.
Отдельно отмечу: в Java-мире часто вспоминают max.poll.records, но в статье у нас стек C# + Confluent.Kafka, поэтому здесь лучше держаться тех настроек, которые реально доступны и привычны для .NET-клиента.
Что мониторить после включения сжатия
После включения compression я бы смотрел минимум на такие метрики:
Метрика | Почему важна |
|---|---|
Disk usage broker’ов | Главная метрика нашего кейса |
Bytes in / bytes out | Должны увидеть снижение объема |
Produce request latency | Проверяем, не стало ли producer’ам больно |
Consumer fetch latency | Проверяем, не стало ли consumer’ам больно |
CPU producer’ов | Сжатие оплачивается CPU |
CPU consumer’ов | Распаковка тоже не бесплатная |
Consumer group lag | Смотрим, не ухудшилась ли обработка |
Rebalance count | Частые rebalance могут маскировать другие проблемы |
Under replicated partitions | Сигнал проблем брокеров |
Failed produce requests | Нельзя радоваться экономии диска, если запись стала падать |
Хороший результат — это не просто “диск стал меньше”. Хороший результат:
диск перестал расти до опасных значений;
producer latency осталась приемлемой;
consumer lag не ухудшился критично;
CPU не стал новым пожаром;
replay/DLQ/debug не превратились в археологию.
Что может пойти не так
1. Включили compression и забыли про CPU
Если producer’ы и так жили на пределе CPU, gzip может сделать им неприятно. В таком случае стоит сравнить codec’и. Например, lz4 часто выбирают как компромисс между скоростью и степенью сжатия, zstd может хорошо сжимать, но тоже требует проверки на вашем payload.
2. Ждали, что старые данные сразу станут меньше
Compression влияет на новые batches. Уже записанные log segments не станут внезапно меньше только потому, что вы поменяли producer config. Старые данные уйдут по retention, когда Kafka до них доберется.
3. Перепутали native compression и ручной zip payload
Ручной zip payload может быть нужен в отдельных случаях, но это уже изменение контракта. Все consumer’ы должны знать, что внутри не обычный JSON/Avro, а архив. DLQ, replay, локальная отладка и интеграционные тесты становятся веселее. Слово “веселее” здесь из тех, после которых обычно заводят отдельный регламент.
4. Решили, что compression лечит lag
Compression может помочь, если bottleneck был в сети или broker storage. Но если consumer долго пишет в БД, вызывает внешний сервис или держит транзакцию, lag останется с вами. Он просто будет смотреть на вас из другого графика.
5. Увеличили количество consumer’ов больше числа partitions
Это не ускорит чтение одного topic в рамках одной group. Сначала partitions, потом consumer’ы. Не наоборот.
Чек-лист перед включением Kafka compression
Перед изменением я бы проверил:
какой средний и p95 размер сообщения;
какой формат payload: JSON, XML, Avro, protobuf, binary;
насколько payload вообще сжимается локально;
сколько места занимают topic’и в пике;
какие retention-настройки сейчас включены;
какой прирост disk usage в GB/час;
сколько CPU свободно у producer’ов;
сколько CPU свободно у consumer’ов;
есть ли алерты на disk usage broker’ов;
есть ли алерты на consumer lag;
есть ли план отката codec’а;
есть ли тест на совместимость consumer’ов.
После изменения:
сравнить disk usage до/после;
сравнить bytes in/out;
проверить producer latency;
проверить consumer lag;
проверить CPU;
отдельно посмотреть пиковое окно, а не среднюю температуру по суткам.
Итог
Встроенное сжатие Kafka — не магия и не бесплатный тюнинг ради галочки. Это нормальный инженерный инструмент, когда вы понимаете, какой ресурс хотите сэкономить и каким ресурсом готовы заплатить.
В нашем случае проблема была не в том, что Kafka “плохо чистила” или consumer’ы “как-то не так читали”. Проблема была в балансе: в пиковую нагрузку крупные сообщения занимали диск быстрее, чем retention успевал освобождать место. Включение Kafka native compression уменьшило объем хранения примерно в 5 раз и сняло самое опасное давление с брокеров.
Но вместе с этим важно помнить:
Kafka хранит сообщения по retention, а не удаляет их сразу после чтения;
consumer group не ускоряется бесконечным добавлением consumer’ов;
lag в сообщениях не всегда равен бизнес-задержке;
commit после обработки защищает от потерь, но требует идемпотентности;
compression экономит диск и сеть, но стоит CPU.
Если коротко: сначала измеряем, потом включаем, потом снова измеряем. Иначе это не оптимизация, а шаманство с YAML.
P.S. Что бы ничего не пропустить, подписывайтесь https://t.me/pro_it_live
