Привет, Хабр!
Сегодня рассмотрим, почему отставание у Kafka‑консьюмеров — это не просто строчка в kafka-consumer-groups, а метрика, от которой зависит SLA вашего сервиса. Рассмотрим, как её считать без самообмана, как соорудить собственный мониторинг на Python и Go, а главное — чем именно тушить всплески lag»а: throttle, autoscale и backpressure.
Как считать lag правильно и почему offset ≠ задержка
Слово «lag» используют лениво в двух разных смыслах.
По количеству сообщений. Классическая формула: latest_offset – committed_offset. latest_offset — крайний смещённый офсет каждого partition»a на брокере; committed_offset — то, что консьюмер группа уже зафиксировала в __consumer_offsets. Но тот же CLI kafka-consumer-groups.sh --describe выводит ещё current_offset — номер последнего прочитанного (но не обязательно закоммиченного) сообщения. Многие путают их и получают «плавающий» lag.
По времени. Когда бизнесу важна реальная задержка доставки, считают: now() – timestamp(последнего прочитанного сообщения). Это показательно на топиках с batch‑продюсерами, где сообщения пачками пуляются раз в N секунд. Time‑lag хорош тем, что уровень нагрузки выражается в секундах и понятен продактам, но требует тянуть таймстемпы событий.
Разница между committed, latest и current offset
latest хранит брокер, он увеличивается всегда.
current живёт в памяти конкретного консьюмера и обновляется сразу после
poll().committed попадает в
__consumer_offsets, когда вы вызвалиcommitAsync()/commitSync()или это сделала framework‑обвязка.
Если группа упала до коммита — current убежит вперёд, committed останется старым, а CLI покажет аномальный всплеск lag»а. Именно поэтому производственные метрики считают по committed, а в коде полезно держать gauge и для current, чтобы ловить «разрывы».
kafka-consumer-groups vs метрики в коде
CLI‑скрипт прекрасен для ад‑хока, но запускается долго, опрашивает брокеры последовательно и нагружает зоопарк кучи RPC. По факту удобнее:
JMX‑метрики
records-lag/records-lag-maxпрямо из клиента;Prometheus‑экспортеры (
kafka-lag-exporter,kafka_exporter,Burrow). Они собирают offset»ы батчами и кэшируют.
Простая формула realtime-lag в коде
lag = latest_offset - committed_offset # сообщений time_lag_ms = int(time.time()*1000) - last_ts # миллисекунд
Считать надо для каждой пары <topic, partition>, потом суммировать поверх партиций группы.
Реализация кастомного lag-мониторинга на Python и Go
from confluent_kafka import KafkaException from confluent_kafka.admin import AdminClient, ListOffsetsRequest, ListOffsetsResult from prometheus_client import Gauge, start_http_server BROKERS = "kafka-broker-1:9092,kafka-broker-2:9092" TOPIC = "payments" GROUP = "billing-service" lag_gauge = Gauge('kafka_consumer_lag', 'Lag per {topic,partition}', ['topic', 'partition']) admin = AdminClient({'bootstrap.servers': BROKERS}) coordinator = admin.list_consumer_groups().result()[GROUP] def calc_partition_lag(tp): committed = admin.list_consumer_group_offsets(GROUP, partitions=[tp]).result()[tp].offset latest = admin.list_offsets({tp: ListOffsetsRequest.LATEST}).result()[tp].offset return latest - committed for p in admin.list_topics(TOPIC).topics[TOPIC].partitions: tp = (TOPIC, p) lag = calc_partition_lag(tp) lag_gauge.labels(TOPIC, p).set(lag)
Скрипт запускается как side‑car, открывает /metrics, и Prometheus подтягивает гейдж раз в 10 секунд.
requirements
# requirements.txt confluent-kafka~=2.5.1 # ≥ 2.5, фикс CVE-2024-02xx, поддержка ListOffsetsRequest.LATEST prometheus-client~=0.20.0 # последняя стабильная на апрель 2025
Go
package main import ( "context" "github.com/segmentio/kafka-go" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promhttp" "log" "net/http" ) var ( lag = prometheus.NewGaugeVec( prometheus.GaugeOpts{ Name: "kafka_consumer_lag", Help: "Lag per topic/partition", }, []string{"topic", "partition"}, ) ) func main() { prometheus.MustRegister(lag) conn, _ := kafka.Dial("tcp", "kafka-broker-1:9092") defer conn.Close() partitions, _ := conn.ReadPartitions("payments") for _, p := range partitions { latest, _ := conn.ReadLastOffset(p.Topic, p.ID) committed, _ := conn.ReadCommittedOffset( kafka.GroupOffset{ Group: "billing-service", Topic: p.Topic, Partition: p.ID, }) lag.WithLabelValues(p.Topic, strconv.Itoa(p.ID)). Set(float64(latest - committed)) } http.Handle("/metrics", promhttp.Handler()) log.Fatal(http.ListenAndServe(":2112", nil)) }
gomod
// go.mod module github.com/you/kafka-lag-exporter go 1.22 require ( github.com/segmentio/kafka-go v0.5.5 // ≥ 0.5 — ReadCommittedOffset переименован github.com/prometheus/client_golang v1.18.0 )
Плюс — чистый standard lib + promclient, минус — нет встроенного кеша offset»ов, поэтому таймауты и batch‑poll целиком на вас.
Построение панели
Prometheus → Grafana — самый короткий путь: sum(kafka_consumer_lag) на графике, alert на > 1000 со срабатыванием ≤ 1 минуты.
Simple UI — FastAPI + HTMX отрисовывает таблицу лагов, обновляя дифф через SSE, неплохо заходит в разработке, когда Grafana ещё недоступна.
Поддержка партиций реализуется банально: цикл по list_topics() и асинхронные list_offsets/OffsetFetch. Главное — не склеивать всё в один RPC, иначе брокер отдаст 50×1 000 партиций и упрётся в сетевой MTU.
Как реагировать на рост lag: throttle, scale, backpressure
Автоматизация реакций
Пороговый alert в Grafana стучит в PagerDuty, а параллельно метрика попадает в Kubernetes‑кластер, где KEDA дергает HorizontalPodAutoscaler. Скейл‑фактор пропорционален lag»у: каждые N сообщений добавляют под.
apiVersion: keda.sh/v1alpha1 kind: ScaledObject metadata: name: billing-consumer spec: scaleTargetRef: name: billing-consumer minReplicaCount: 1 maxReplicaCount: 10 triggers: - type: kafka metadata: bootstrapServers: kafka-broker-1:9092 consumerGroup: billing-service topic: payments lagThreshold: "500"
Черновой, но рабочий пример: как только суммарный lag переваливает 500, KEDA масштабирует deployment.
Throttle
Часто вы ограничены числом разделов или лицензиями Confluent Cloud, и скейлить некуда. Тогда:
while True: batch = consumer.poll(timeout_ms=100, max_records=100) process(batch) if lag_gauge.get() > 5000: time.sleep(0.2) # мягкий back-off
Наглядно, но важно: sleep держите маленьким (мс 200–500), иначе консьюмер выпадет из rebalance‑протокола и сломает группу.
Backpressure через pause/resume
Для Java‑клиента у вас есть consumer.pause(partitions) и resume(partitions). Они позволяют остановить приём новых сообщений, продолжая poll() и не давая группе ребаланситься. Реализуйте счётчик in‑flight задач, достигли high‑water‑mark — вызывайте pause. Закончили — resume.
В реактивных обвязках (Project Reactor‑Kafka, Spring Kafka) pause/resume уже завёрнуты, но не забывайте, что прямой вызов KafkaConsumer.pause() без ведома контейнера ломает контракт и после ребаланса partition возобновится сам.
Внешние очереди
Если бизнес‑сервису тяжело обрабатывать пики, проще буферизовать в отдельной системной очереди — Redis Streams, RabbitMQ или тот же PostgreSQL. Kafka‑консьюмер превращается в своебразный перекачивающий насос, а пользовательский воркер читает из очереди с контролируемой скоростью. Конфигурация чуть сложнее, зато lag на Kafka держится плоским, а «просадка» уходит в дешёвое дисковое хранилище.
Выводы
Отставание консьюмеров — это не просто цифра в CLI. Правильное измерение требует понимания трёх офсетов и временных таймстемпов, а стабильность достигается комбинацией:
Тонких метрик и быстрых алертов.
Гибкого автоскейлинга с порогами на lag.
Локального throttling»а и pause/resume при всплесках.
Архитектурного буфера, когда нагрузка принципиально «взрывная».
Если вы сталкиваетесь с проблемами интеграции и управления данными в микросервисах или API, то знакомы с тем, как ошибки и сложности могут возникать из-за гибкости JSON. Schema Registry решает эти проблемы, обеспечивая структуру и стандартизацию данных, что критически важно для масштабируемости и надежности.
Как выбрать между JSON и Schema Registry, когда каждый подход уместен, и как внедрить Schema Registry в своих проектах для улучшения поддержки и совместимости данных? Поговорим об этом на открытом уроке 19 мая.
Максимум практики по работе с Kafka для инженеров данных и разработчиков можно получить на онлайн-курсе "Apache Kafka".
