Привет, Хабр! Меня зовут Павел, я ведущий разработчик. В этой статье расскажу про Kafka, consumer groups, lag, offset commit и встроенное сжатие сообщений. Не в формате “что такое Kafka за 15 минут”, а через обычную продовую историю, где кластер начал есть диск быстрее, чем политики очистки успевали его освобождать.

Снаружи проблема выглядела банально: Kafka живет, producer’ы пишут, consumer’ы читают, retention настроен. Внутри было веселее: в пиковую нагрузку место на брокерах улетало так бодро, будто у него был отдельный KPI на исчезновение. Retention вроде бы должен был чистить старые данные, но он не маг. Если входящий поток крупных сообщений быстрее, чем Kafka успевает освобождать сегменты, диск все равно закончится. А когда диск заканчивается у Kafka, настроение портится не только у Kafka.

В итоге мы пришли к встроенному сжатию Kafka на producer’ах. Без ручного zip payload, без “давайте завернем JSON в архив и пусть каждый consumer теперь разбирается сам”. Просто включили compression на уровне Kafka-клиента. В нашем случае объем сообщений на хранении уменьшился примерно в 5 раз. Это не отменило необходимость следить за retention, lag и consumer groups, но дало кластеру перестать жить в режиме “еще один пик, и пишем посмертную”.

Ниже разберу:

  • почему retention не всегда спасает от заполнения диска;

  • чем Kafka native compression отличается от ручного zip’ования payload;

  • как включить сжатие в C# через Confluent.Kafka;

  • как consumer groups, lag и offset commit связаны с этой историей;

  • что мониторить, чтобы не узнать о проблеме по тишине сервиса.

Kafka под пиковой нагрузкой
Kafka под пиковой нагрузкой

У нас была система, где несколько producer’ов писали события в Kafka. События были достаточно крупные, поток в обычное время держался нормально, consumer’ы успевали читать, retention был настроен. В общем, все выглядело как система, которую не хочется трогать без причины.

Проблема появилась в пиковой нагрузке. Объем входящих сообщений резко увеличивался, Kafka активно писала новые log segments, а политики очистки не успевали освобождать место с той же скоростью. В какой-то момент диск на брокерах стал главным участником архитектуры: не потому что он умный, а потому что без свободного места все остальные компоненты внезапно становятся философами и начинают рассуждать о бренности SLA.

Упрощенная картина была такая:

Компонент

Что делал

Почему это стало проблемой

Producer’ы

Писали крупные сообщения в Kafka

В пике быстро занимали место на брокерах

Kafka brokers

Хранили log segments

Диск рос быстрее, чем освобождался

Retention/cleanup

Удалял старые сегменты

Не успевал компенсировать входящий поток

Consumer group

Читала сообщения

Чтение не решало проблему storage, потому что Kafka хранит данные по retention

Кластер

Пытался жить

При заполнении диска начинались неприятные эффекты для всех

Важно: consumer может читать быстро, но это не значит, что Kafka сразу удалит сообщение. Kafka не очередь в классическом смысле “прочитал — удалилось”. Она хранит log по политикам retention. Поэтому “consumer’ы же читают” не является ответом на вопрос “почему диск заканчивается”.

Почему retention не спас

Retention в Kafka обычно задают по времени, по размеру или комбинацией этих правил. Например:

  • хранить сообщения не дольше N часов/дней;

  • хранить не больше N байт на локальный лог partition;

  • периодически проверять, какие log segments можно удалить.

Но тут есть несколько важных деталей.

Kafka удаляет не отдельные сообщения, а log segments. Если segment еще активный или не подходит под условия удаления, данные внутри него продолжают занимать место. Очистка происходит не непрерывно в каждую наносекунду, а по своим циклам. И самое главное: если producer’ы в пике пишут быстрее, чем старые сегменты освобождают диск, retention не становится волшебной кнопкой.

Можно представить это как ванну:

Сценарий

Что происходит

Вода вливается медленнее, чем уходит

Все спокойно

Вода вливается примерно с той же скоростью

Живем на грани, но красиво

Вода вливается быстрее, чем уходит

Рано или поздно вода будет не только в ванной

У нас Kafka была этой ванной. Только вместо воды были сообщения, вместо пола — диск брокера, а вместо соседей снизу — сервисы, которые внезапно начинали страдать вместе с кластером.

Первым желанием может быть: “давайте сильнее уменьшим retention”. Иногда это нормальный путь. Но он не всегда подходит:

  • бизнесу может быть нужно окно переобработки;

  • consumer’ы могут временно отставать;

  • replay может быть частью нормальной эксплуатации;

  • слишком агрессивный retention превращает временный лаг в потерю возможности восстановиться.

Поэтому мы пошли не только в сторону “быстрее удалять”, а в сторону “меньше писать на диск”.

Что именно мы включили

Мы использовали встроенное сжатие Kafka. Это принципиальный момент.

Мы не делали так:

message.Value = Zip(JsonPayload)

И не заставляли каждый consumer вручную распаковывать payload. Мы включили compression на уровне producer config. Producer собирает batch сообщений и сжимает его. Broker обычно хранит batch в producer codec, если на broker/topic не переопределен compression.type. Если переопределен, broker может перекомпрессировать данные. Consumer получает данные и прозрачно распаковывает их через Kafka-клиент.

Kafka native compression
Kafka native compression

Разница между встроенным сжатием Kafka и ручным zip payload:

Подход

Что сжимается

Что видит приложение

Плюсы

Минусы

Kafka native compression

Batch сообщений на уровне Kafka client

Тот же payload, что и раньше

Не меняет контракт, проще consumer’ы, проще replay/DLQ

Нужен CPU на сжатие/распаковку

Ручной zip/gzip payload

Само значение сообщения

Архив/bytes вместо привычной структуры

Полный контроль над форматом

Все consumer’ы должны знать, как распаковать; сложнее дебажить

В нашем случае был нужен именно первый вариант. Мы хотели экономить место на брокерах, не переписывая контракт интеграции.

Пример настройки в C# + Confluent.Kafka

Ниже обезличенный пример producer config. В статье я показываю Gzip, потому что в нашем кейсе речь была именно про “зипование” на стороне Kafka. Если у вас используется Lz4, Snappy или Zstd, логика та же, меняется codec и профиль CPU/степени сжатия.

using Confluent.Kafka;

var producerConfig = new ProducerConfig
{
    BootstrapServers = "broker-1:9092,broker-2:9092",
    Acks = Acks.All,
    EnableIdempotence = true,

    // Kafka native compression:
    // producer сжимает batch,
    // broker обычно хранит его в producer codec
    // или перекомпрессирует, если compression.type переопределен,
    // consumer распаковывает прозрачно.
    CompressionType = CompressionType.Gzip,

    // Сжатие лучше раскрывается на batch'ах.
    // Конкретные значения нужно подбирать под нагрузку и latency.
    LingerMs = 5
};

Эквивалент на уровне librdkafka properties:

compression.type=gzip
acks=all
enable.idempotence=true
linger.ms=5

Если конфигурация живет в appsettings.json, она может выглядеть примерно так:

{
  "Kafka": {
    "Producer": {
      "BootstrapServers": "broker-1:9092,broker-2:9092",
      "Acks": "All",
      "EnableIdempotence": true,
      "CompressionType": "Gzip",
      "LingerMs": 5
    }
  }
}

Главная идея: мы не меняем бизнес-сообщение. Consumer’ы продолжают читать ту же структуру. Меняется то, как Kafka-клиент упаковывает batch при отправке.

Почему объем уменьшился примерно в 5 раз

Сжатие хорошо работает, когда в сообщениях много повторяемых структур. В типичных интеграционных событиях это встречается часто:

  • повторяются имена полей;

  • повторяются технические блоки;

  • повторяются идентификаторы типов, статусы, названия сущностей;

  • сообщения идут пачками и похожи друг на друга;

  • JSON/XML обычно сжимаются заметно лучше, чем уже сжатые бинарные данные.

Если до включения compression Kafka хранила условные 100 единиц объема, после включения встроенного gzip в нашем кейсе стало около 20. То есть примерно в 5 раз меньше.

Упрощенная таблица эффекта:

Вариант

Размер на диске Kafka

Эффект

Что получили

Без сжатия

100%

Базовая линия

Диск быстро заканчивался в пик

Kafka gzip

около 20%

примерно x5

Кластер перестал задыхаться по месту

Ручной zip payload

не использовали

не требовался

Не стали усложнять consumer’ы

Сразу оговорюсь: “примерно в 5 раз” — не универсальная константа. Если у вас уже сжатые изображения, архивы или плотный бинарный payload, эффект будет другим. Если у вас большой JSON с повторяющимися полями, gzip/zstd могут дать очень хороший результат.

Цена сжатия

У сжатия есть цена. Оно не берется из воздуха, хотя иногда очень хочется верить, что где-то в кластере есть бесплатная полка с производительностью.

Что меняется:

Ресурс

Что происходит

CPU producer’а

Растет, потому что producer сжимает batch

CPU consumer’а

Растет, потому что consumer распаковывает batch

Сеть

Обычно становится легче

Диск брокера

Обычно становится легче

Latency

Может немного измениться из-за batching и compression

Debug

При native compression почти не усложняется для приложения

Поэтому включать compression стоит не по принципу “у нас теперь модно”, а по понятной причине:

  • диск стал бутылочным горлышком;

  • сеть стала бутылочным горлышком;

  • сообщения хорошо сжимаются;

  • CPU producer/consumer позволяет это оплатить;

  • latency не выходит за допустимые границы.

В нашем случае сделка была выгодной: CPU оказался дешевле, чем свободное место на брокерах во время пика.

Почему здесь вообще consumer groups

На первый взгляд можно спросить: если статья про disk pressure и compression, зачем сюда consumer groups?

Потому что в реальной эксплуатации Kafka эти вещи редко живут отдельно. Когда диск начинает заполняться, команда почти всегда смотрит еще и на lag, consumer groups, скорость обработки, rebalance и offset commit. И очень легко перепутать симптомы.

Например:

  • lag растет, потому что consumer’ы реально не успевают;

  • lag растет, потому что downstream DB тормозит;

  • lag выглядит страшно в сообщениях, но по времени задержка терпимая;

  • consumer’ы читают нормально, но диск все равно растет, потому что retention хранит данные независимо от факта чтения;

  • rebalance временно останавливает обработку и создает ощущение, что Kafka “опять сама что-то делает”.

Consumer group в Kafka устроена просто, пока не нужно объяснять это ночью.

В рамках одной consumer group один partition назначается только одному consumer’у. Если у topic 6 partitions, а вы подняли 20 consumer’ов в одной группе, 14 из них не начнут магически ускорять чтение этого topic. Они будут стоять без partition assignment и думать о смысле своего деплоя.

Количество partitions

Количество consumer’ов в group

Что будет

6

3

Каждый consumer может читать несколько partitions

6

6

Обычно по одному partition на consumer

6

20

Лишние consumer’ы останутся без работы

Поэтому, когда вы боретесь с lag, нельзя просто сказать “добавим pod’ов”. Сначала надо посмотреть:

  • сколько partitions у topic;

  • как распределены partitions между consumer’ами;

  • нет ли горячих partitions;

  • не тормозит ли обработка внутри consumer’а;

  • не упираетесь ли вы в БД, внешний сервис или сериализацию.

Lag: цифра, которая любит притворяться ответом

Kafka lag часто показывают как количество сообщений между последним записанным offset и последним закоммиченным offset consumer group.

Это полезная метрика, но сама по себе она не отвечает на главный вопрос: насколько мы опаздываем с точки зрения бизнеса?

Пример:

Ситуация

Offset lag

Реальная задержка

100 000 маленьких сообщений

Большой

Может обработаться за минуты

5 000 тяжелых сообщений

Меньше

Может обрабатываться дольше

Один горячий partition

Средний

Один consumer страдает, остальные скучают

Consumer быстро читает, но долго пишет в БД

Растет

Kafka не виновата, но ей все равно прилетит

Для продовой картины лучше смотреть не только offset lag, но и:

  • age/event-time lag: насколько старое последнее необработанное событие;

  • скорость входящего потока;

  • скорость обработки;

  • размер сообщений;

  • disk usage broker’ов;

  • under replicated partitions;

  • consumer rebalance count;

  • ошибки produce/fetch;

  • latency записи в downstream БД.

В нашем случае lag был важен, но корень проблемы был в storage: Kafka должна была хранить слишком много данных в пиковый момент. Сжатие помогло именно там.

Offset commit: где появляются дубли

Теперь про вещь, которую лучше понимать до того, как у вас появились дубли в бизнес-операциях.

Consumer получает сообщение, обрабатывает его и коммитит offset. Вопрос в порядке действий.

Подход

Что может случиться

Commit до обработки

Если consumer упадет после commit, сообщение можно потерять

Commit после обработки

Если consumer упадет после обработки, но до commit, сообщение придет повторно

Auto commit без понимания процесса

Можно получить неприятный сюрприз и долго спорить с логами

Обычно для бизнес-операций безопаснее commit после успешной обработки. Это дает at-least-once: сообщение не теряется, но может прийти повторно. Значит, обработчик должен быть идемпотентным.

Offset commit и дубль
Offset commit и дубль

Типовая ситуация:

  1. Consumer получил сообщение offset=42.

  2. Записал результат в БД.

  3. Упал до commit offset.

  4. После рестарта Kafka снова отдала offset=42.

  5. Если запись в БД не идемпотентна, получили дубль.

Это не “Kafka сломалась”. Это нормальная цена at-least-once. Kafka честно говорит: “Я не видела commit, держи сообщение еще раз”. А дальше уже вопрос к приложению: умеет ли оно повторную обработку пережить без истерики.

Пример consumer config:

var consumerConfig = new ConsumerConfig
{
    BootstrapServers = "broker-1:9092,broker-2:9092",
    GroupId = "portfolio-consumer",
    EnableAutoCommit = false,
    // Важно подобрать под реальное время обработки.
    MaxPollIntervalMs = 300000,
    SessionTimeoutMs = 10000
};

Я специально не добавляю сюда AutoOffsetReset. Для новой группы в dev- или bootstrap-сценарии это бывает полезно, но в продовой статье такую строку слишком легко скопировать вслепую и внезапно перечитать весь topic.

Упрощенный пример обработки:

while (!cancellationToken.IsCancellationRequested)
{
    var consumeResult = consumer.Consume(cancellationToken);

    try
    {
        await HandleMessageAsync(consumeResult.Message, cancellationToken);

        // Commit только после успешной обработки.
        consumer.Commit(consumeResult);
    }
    catch (Exception ex)
    {
        logger.LogError(ex, "Kafka message processing failed");

        // Дальше зависит от стратегии:
        // retry, DLQ, stop consumer, manual intervention.
        // Главное — не делать вид, что commit все вылечит.
        throw;
    }
}

Какие настройки producer’а проверить

Compression редко живет одна. Она связана с batching и гарантиями доставки.

Настройка

Зачем смотреть

compression.type

Собственно codec: gzip, snappy, lz4, zstd

batch.size

Чем лучше batch, тем лучше может раскрыться сжатие

linger.ms

Позволяет немного подождать, чтобы набрать batch

acks

Уровень подтверждения записи broker’ами

enable.idempotence

Защита producer’а от дублей при retry

max.request.size

Лимит размера request producer’а

message.timeout.ms

Таймаут сообщения на стороне producer’а

Про linger.ms важно сказать отдельно. Если поставить его слишком высоким, можно получить лишнюю задержку. Если поставить слишком низким, batch’и будут маленькими и compression может раскрыться хуже. Баланс зависит от вашей нагрузки.

Для latency-sensitive сценариев я бы начинал осторожно:

compression.type=gzip
linger.ms=5
acks=all
enable.idempotence=true

А дальше смотрел бы не на веру в конфиг, а на графики. batch.size я бы тюнил отдельно, уже после замеров: это полезная ручка, но не та, которую стоит вставлять в статью как универсальную магическую цифру.

Какие настройки consumer’а проверить

Настройка

Почему важна

enable.auto.commit

Для бизнес-обработки часто лучше отключить и коммитить вручную

max.poll.interval.ms

Если обработка долгая, consumer может быть исключен из group

session.timeout.ms

Как быстро group coordinator решит, что consumer умер

heartbeat.interval.ms

Частота heartbeat

fetch.max.bytes

Сколько данных consumer может получить за fetch

max.partition.fetch.bytes

Лимит данных на partition

Если сообщение стало меньше на диске из-за compression, это не значит, что бизнес-обработка стала бесплатной. Consumer все равно получит исходный payload и обработает его. Если bottleneck был в БД, compression не сделает SQL-запросы быстрее. Она поможет диску и сети, но не вылечит все подряд.

Отдельно отмечу: в Java-мире часто вспоминают max.poll.records, но в статье у нас стек C# + Confluent.Kafka, поэтому здесь лучше держаться тех настроек, которые реально доступны и привычны для .NET-клиента.

Что мониторить после включения сжатия

После включения compression я бы смотрел минимум на такие метрики:

Метрика

Почему важна

Disk usage broker’ов

Главная метрика нашего кейса

Bytes in / bytes out

Должны увидеть снижение объема

Produce request latency

Проверяем, не стало ли producer’ам больно

Consumer fetch latency

Проверяем, не стало ли consumer’ам больно

CPU producer’ов

Сжатие оплачивается CPU

CPU consumer’ов

Распаковка тоже не бесплатная

Consumer group lag

Смотрим, не ухудшилась ли обработка

Rebalance count

Частые rebalance могут маскировать другие проблемы

Under replicated partitions

Сигнал проблем брокеров

Failed produce requests

Нельзя радоваться экономии диска, если запись стала падать

Хороший результат — это не просто “диск стал меньше”. Хороший результат:

  • диск перестал расти до опасных значений;

  • producer latency осталась приемлемой;

  • consumer lag не ухудшился критично;

  • CPU не стал новым пожаром;

  • replay/DLQ/debug не превратились в археологию.

Что может пойти не так

1. Включили compression и забыли про CPU

Если producer’ы и так жили на пределе CPU, gzip может сделать им неприятно. В таком случае стоит сравнить codec’и. Например, lz4 часто выбирают как компромисс между скоростью и степенью сжатия, zstd может хорошо сжимать, но тоже требует проверки на вашем payload.

2. Ждали, что старые данные сразу станут меньше

Compression влияет на новые batches. Уже записанные log segments не станут внезапно меньше только потому, что вы поменяли producer config. Старые данные уйдут по retention, когда Kafka до них доберется.

3. Перепутали native compression и ручной zip payload

Ручной zip payload может быть нужен в отдельных случаях, но это уже изменение контракта. Все consumer’ы должны знать, что внутри не обычный JSON/Avro, а архив. DLQ, replay, локальная отладка и интеграционные тесты становятся веселее. Слово “веселее” здесь из тех, после которых обычно заводят отдельный регламент.

4. Решили, что compression лечит lag

Compression может помочь, если bottleneck был в сети или broker storage. Но если consumer долго пишет в БД, вызывает внешний сервис или держит транзакцию, lag останется с вами. Он просто будет смотреть на вас из другого графика.

5. Увеличили количество consumer’ов больше числа partitions

Это не ускорит чтение одного topic в рамках одной group. Сначала partitions, потом consumer’ы. Не наоборот.

Чек-лист перед включением Kafka compression

Перед изменением я бы проверил:

  • какой средний и p95 размер сообщения;

  • какой формат payload: JSON, XML, Avro, protobuf, binary;

  • насколько payload вообще сжимается локально;

  • сколько места занимают topic’и в пике;

  • какие retention-настройки сейчас включены;

  • какой прирост disk usage в GB/час;

  • сколько CPU свободно у producer’ов;

  • сколько CPU свободно у consumer’ов;

  • есть ли алерты на disk usage broker’ов;

  • есть ли алерты на consumer lag;

  • есть ли план отката codec’а;

  • есть ли тест на совместимость consumer’ов.

После изменения:

  • сравнить disk usage до/после;

  • сравнить bytes in/out;

  • проверить producer latency;

  • проверить consumer lag;

  • проверить CPU;

  • отдельно посмотреть пиковое окно, а не среднюю температуру по суткам.

Итог

Встроенное сжатие Kafka — не магия и не бесплатный тюнинг ради галочки. Это нормальный инженерный инструмент, когда вы понимаете, какой ресурс хотите сэкономить и каким ресурсом готовы заплатить.

В нашем случае проблема была не в том, что Kafka “плохо чистила” или consumer’ы “как-то не так читали”. Проблема была в балансе: в пиковую нагрузку крупные сообщения занимали диск быстрее, чем retention успевал освобождать место. Включение Kafka native compression уменьшило объем хранения примерно в 5 раз и сняло самое опасное давление с брокеров.

Но вместе с этим важно помнить:

  • Kafka хранит сообщения по retention, а не удаляет их сразу после чтения;

  • consumer group не ускоряется бесконечным добавлением consumer’ов;

  • lag в сообщениях не всегда равен бизнес-задержке;

  • commit после обработки защищает от потерь, но требует идемпотентности;

  • compression экономит диск и сеть, но стоит CPU.

Если коротко: сначала измеряем, потом включаем, потом снова измеряем. Иначе это не оптимизация, а шаманство с YAML.

P.S. Что бы ничего не пропустить, подписывайтесь https://t.me/pro_it_live