Привет, Хабр!
Сегодня рассмотрим, как построить Kafka‑консьюмер, который не падёт при первой же проблеме, а аккуратно сложит битые события в Dead Letter Queue (DLQ).
Когда и зачем нужен DLQ
В Kafka жизненно важно различать две плоскости:
Плоскость | Что происходит без DLQ | Что хотим видеть |
|---|---|---|
Обработка | Консьюмер читает сообщение, попытка | Консьюмер коммитит оффсет, а проблемное событие перекидывает в DLQ‑топик. |
Хранение | Сообщение остаётся в основном топике и будет «досаждать» ретраями до конца ретеншена. | Чётко знаем: «в DLQ лежат только фэйлы, прод стрим идёт дальше». |
Типовые сценарии
Ситуация | Повторяемая? | Действие |
|---|---|---|
Не‑валидный JSON / Avro‑схема | Fatal | Сразу в DLQ, смысла ретраить нет. |
Временная недоступность БД | Retryable | DLQ с отложенным re‑consume после таймаута. |
Новая версия схемы (schema registry lag) | Retryable | Либо автоматический retry, либо DLQ → пересчитай позже. |
Confluent и ряд энтузиастов приводят такую практику: разделяйте ошибки на fatal и retryable, метите их в header (error.class, error.stacktrace, retryable=true/false).
Где теряются сообщения, если DLQ нет?
Консьюмер зависает на ядовитом сообщении, оффсеты не коммитятся, всё после него не читается.
DevOps убивает pod — Kubernetes рестартует, — ситуация повторяется.
Через N часов приходит ваш SRE и задаёт вопрос: «Почему в топике lag 50 млн, а бизнес‑процесс мёртв?»
Реализация DLQ на примере Kafka Consumer
Python (confluent-kafka)
from confluent_kafka import Consumer, Producer, KafkaException import json, logging, time # Базовый конфиг common = { "bootstrap.servers": "kafka-prod:9092", "group.id": "enricher-v1", "auto.offset.reset": "earliest", "enable.auto.commit": False, } consumer = Consumer(common | {"key.deserializer": str}) producer = Producer({"bootstrap.servers": common["bootstrap.servers"]}) SOURCE_TOPIC = "clicks.raw" DLQ_TOPIC = f"{SOURCE_TOPIC}.dlq" def push_dlq(msg, exc, retryable: bool): """Проксируем оригинальное сообщение в DLQ c расширенными headers.""" headers = msg.headers() or [] headers.extend([ ("error.class", str(type(exc)).encode()), ("error.message", str(exc).encode()), ("retryable", b"1" if retryable else b"0"), ("ts.failed", str(int(time.time() * 1000)).encode()), ]) producer.produce( topic=DLQ_TOPIC, key=msg.key(), value=msg.value(), headers=headers, ) producer.flush(1_000) def handle(msg): payload = json.loads(msg.value()) # может кинуть JSONDecodeError enriched = call_some_db(payload["user_id"]) # может кинуть DBError publish_downstream(enriched) while True: batch = consumer.consume(num_messages=500, timeout=1.0) for m in batch: try: handle(m) consumer.commit(m) # ручной коммит ⇒ at-least-once except json.JSONDecodeError as je: logging.exception("Bad JSON") push_dlq(m, je, retryable=False) consumer.commit(m) except TransientDBError as te: logging.warning("DB temp issue → DLQ for later retry") push_dlq(m, te, retryable=True) consumer.commit(m) except Exception as e: # last-line defense push_dlq(m, e, retryable=False) consumer.commit(m)
Коммит оффсета после пуша в DLQ — иначе мы бы «застряли» на плохом событии. retryable header — пригодится автоматическму ретраю. producer.flush() — в проде заменяем на асинхронный delivery callback + back‑pressure.
Java (Spring Kafka ≥ 2.8)
У Spring всё из коробки благодаря DefaultErrorHandler и DeadLetterPublishingRecoverer:
@Bean public ConcurrentKafkaListenerContainerFactory<String, OrderEvt> kafkaFactory( ConsumerFactory<String, OrderEvt> cf, KafkaOperations<String, OrderEvt> dlqTemplate) { var recoverer = new DeadLetterPublishingRecoverer( dlqTemplate, (rec, ex) -> new TopicPartition(rec.topic() + ".dlq", rec.partition()) ); var errorHandler = new DefaultErrorHandler( recoverer, new FixedBackOff(0L, 0) // сразу в DLQ, без ретраев ); errorHandler.addNotRetryableExceptions(JsonParseException.class); errorHandler.addRetryableExceptions(SQLException.class); var factory = new ConcurrentKafkaListenerContainerFactory<String, OrderEvt>(); factory.setConsumerFactory(cf); factory.setCommonErrorHandler(errorHandler); return factory; } @KafkaListener(topics = "orders.raw", groupId = "enricher-v1") public void onEvent(OrderEvt evt) { // enrich & persist }
DeadLetterPublishingRecoverer автоматически копирует key/value и доклеивает метадату (kafka_dlt-original-topic, kafka_dlt-exception-fqcn, & др.). Для временных сбоев можно поставить FixedBackOff(1000L, 3) — три локальных ретрая до DLQ.
Spring‑team подтянула эти фичи с Apache Kafka 2.8; до этого приходилось писать кастомные SeekToCurrentErrorHandler.
Как мониторить и обрабатывать DLQ
Метрики и алёрты
Что смотрим | Где берём | Порог |
|---|---|---|
|
| >10% от основного топика (5-мин. окно) |
Lag основного consumer‑group |
| рост экспоненциальный |
Кол‑во | ksqlDB / Kafka Streams window count | >1000 за 15 м |
Пример алерт‑правила в Prometheus:
- alert: HighDLQInRate expr: rate(kafka_server_brokertopicmetrics_messages_in_total{topic=~".*\\.dlq"}[5m]) > 100 for: 2m labels: severity: critical annotations: summary: "DLQ inflow is {{ $value }} msg/s" description: "Too many errors hitting DLQ topics"
UI для ручного ревью
kcat (kafkacat) — быстрый просмотр одного сообщения:
kcat -C -t clicks.raw.dlq -o -5 -q
AKHQ / Redpanda Console / Confluent UI — визуальный поиск по key/headers. Когда сообщений десятки тысяч, делают «DLQ‑WorkBench»: удобное SPA, где можно фильтровать по error.class, делать bulk‑reprocess.
Автоматический retry-pipeline
Архитектурный паттерн таков:
clicks.raw.dlq (-- only retryable -->) clicks.retry | | | DLQ-retry-consumer | primary-consumer +----------> clicks.raw -----------+
Retry‑consumer читает только retryable=true. Ставит задержку (например, sleep(300_000) или Scheduled Executor). Пушит обратно в исходный топик с новым header x-retry-count. При превышении MAX_RETRY отправляет в clicks.raw.dlq.permanent — это уже зона ручного разбора.
В 2024-м Confluent добавила готовый компонент — Parallel Consumer с built‑in retry — но он пока в tech‑preview.
Итог
DLQ — это не просто «корзинка для битых сообщений», а фундамент отказоустойчивой стриминговой архитектуры:
Коммит оффсета → живой консьюмер.
Разметка ошибок → осмысленные ретраи.
Метрики → SRE спит спокойно.
Собрали? Тестируем — швыряем кривой JSON и дропаем коннект к БД. Консьюмер улыбается, а в DLQ аккуратно появляется две записи: одна retryable=false, вторая retryable=true. Красота.
В заключение приглашаем на открытый урок 22 мая «Оптимизация Nginx и Angie под высокие нагрузки». Узнайте, как настроить ключевые параметры для стабильной работы серверов при большом трафике, оптимизировать TLS, кэширование и анализировать производительно��ть. Меньше узких мест — больше скорости. Записывайтесь по ссылке.
Любое развитие начинается с честной оценки. Пройдите тест на знание инфраструктуры высоконагруженных систем — он подскажет, куда расти дальше.
