В данной статье я хочу рассказать об подходе к организации параллельной обработки сообщений из Kafka, когда Kafka Topic используется как транспорт бизнес-событий, например транзакций или ордеров, которые необходимо отправить во внешнюю систему. При это важно обеспечить стабильную скорость обработки трафика и надёжность (отсутствие потерь) в условиях, когда downstream-система не на все запросы отвечает стабильно и быстро.

Покажу почему стандартные подходы, такие как обработка batch'ами, в определённых сценариях перестают работать и приводят к деградации производительности или рисками потери сообщений.

Для иллюстрации подходов далее будет использоваться код из демонстрационного проекта на Kotlin с использованием spring boot, webFlux, spring reactor и reactor-kafka. Код проекта не является production-ready: в нём, например, отсутствует обработка rebalance, а также ряд других моментов, обязательных для промышленной системы. Используется reactor-kafka, однако описываемые решения не зависят от конкретного фреймворка работы с Kafka и могут быть реализованы с использованием других.

В статье намеренно опущены детали реализации бизнес-протокола и механизма идемпотентности. В реальной системе они реализованы с использованием внутреннего хранилища и машины состояний обработки ордеров, но это за рамками данной статьи.

Исходные требования и ограничения

У нас есть Kafka Topic с транзакциями, которые необходимо по определённому протоколу отправить во внешнюю систему (контрагенту) и дождаться ответа о статусе: успешно, ошибка или таймаут. По результату обработки мы должны сформировать некоторое бизнес сообщение в следующий топик или отправить транзакцию на переобработку.

Со стороны контрагента предъявлено требование по пропускной способности: система должна обеспечивать обработку не менее 200 TPS входящего трафика транзакций, предназначенных для данного контрагента. При этом формальных ограничений на количество одновременно обрабатываемых транзакций на стороне контрагента нет. Нам не задано некое значение N - число транзакций, которые контрагент способен обрабатывать одновременно, при превышении которого начинается деградация или возникают ошибки и такое мы не фиксировали.

По собранным метрикам штатно работающего в prod сервиса наблюдается следующая картина:

  • среднее время ответа ~ 0.8 сек

  • максимальное время ответа временами достигает ~ 20 сек

Дополнительно существует требование ожидать ответ до 60 секунд и только после этого можно считать обработку завершённой по таймауту.

На первый взгляд, при таком среднем времени ответа обеспечить 200 TPS стандартными подходами не выглядит сложной задачей, даже если изредка встречаются задержки до 20 секунд или отдельные таймауты.

Однако на практике такие задержки периодически начинают носить системный характер. Были эпизоды, когда на стороне контрагента возникал сбой, и определённый процент транзакций стабильно завершался по таймауту, т.к. интеграцией с нами у контрагента стоит несколько внутренних систем, и одна из них начала деградировать. При этом на нашей стороне отсутствовала возможность заранее определить или отфильтровать «проблемные» ордера.

Первая реализация: обработка сообщений batch'ами

На начальном этапе, когда у нас ещё не было полной картины взаимодействия с контрагентом под постоянной нагрузкой, была реализована классическая схема обработки: consumer вычитывает batch сообщений, запускает их параллельную обработку, дожидается завершения всех операций и только после этого выполняет commit offset и переходит к следующему batch'y.

В упрощённом виде алгоритм выглядел следующим образом:

  1. Вычитать batch сообщений из Kafka

  2. Запустить параллельную обработку

  3. Дождаться завершения обработки всех сообщений в batch

  4. Commit offset

  5. Перейти к обработке следующего batch

Пример реализации такого consumer'a с помощью reactor-kafka:

class BatchProcessing(
    val kafkaConfigurationProperties: KafkaConfigurationProperties,
    val recordProcessor: RecordProcessor
) {

    private val logger = LoggerFactory.getLogger(BatchProcessing::class.java)
    private var kafkaTask: Disposable? = null
    private lateinit var kafkaReceiver: KafkaReceiver<String, String>
    private lateinit var sender: KafkaSender<String, String>

    @PostConstruct
    fun init() {
        sender = kafkaConfigurationProperties.createTransactionalSender()
        kafkaReceiver = kafkaConfigurationProperties.createReceiver()

        kafkaTask = kafkaReceiver.receiveExactlyOnce(sender.transactionManager())
            .concatMap<Void> { batch ->
                batch
                    .flatMapSequential { record ->
                      recordProcessor.invoke(record)
                            .onErrorResume { ex -> sendDeadLetter(record, ex) }
                    }
                    .then(sender.transactionManager().commit())

            }
            .subscribe()
    }

    private fun sendDeadLetter(record: ConsumerRecord<String, String>, ex: Throwable): Mono<Void> =
        sender.sendDeadLetter(kafkaConfigurationProperties.deadletterTopic, record, ex)


    @PreDestroy
    fun destroy() {
        kafkaTask?.dispose()
    }
}

RecordProcessor содержит логику обработки Kafka record ( post запрос в некий сервис)   и сбор метрик. Эта часть намеренно изолирована и не зависит от выбранного подхода к организации обработки трафика, поскольку далее будет использована и в альтернативной реализации:

class RecordProcessor(
    private val webClient: WebClient
) : (ConsumerRecord<String, String>) -> Mono<Void> {

    private val logger = LoggerFactory.getLogger(RecordProcessor::class.java)

    override fun invoke(record: ConsumerRecord<String, String>): Mono<Void> =
        post(record.value())
            .metricKafkaReceiver(record.topic())

    private fun post(value: String) =
        webClient.post()
            .uri("/authData")
            .bodyValue(value)
            .retrieve()
            .bodyToMono(String::class.java)
            .then()

    private fun <T> Mono<T>.metricKafkaReceiver(topicSource: String): Mono<T> =
        this.name("kafka.processed")
            .tag("topic", topicSource)
            .metrics()

}

При таком подходе, если downstream-сервис отвечает относительно стабильно и задержки или таймауты встречаются редко, пропускную способность можно регулировать за счёт увеличения размера batch. Например, если требуется поддерживать 200 TPS, а среднее время обработки одной операции составляет около 1 секунды, то установка max.poll.records ≈ 200 теоретически позволяет выйти на необходимую производительность.

Однако ситуация кардинально меняется, если задержки и таймауты начинают проявляться системно хотя бы на некотором проценте транзакций. В этом случае увеличение размера batch уже не помогает.

Batch обрабатывается столько времени, сколько занимает самая медленная операция внутри него. Если в партии присутствует транзакция с таймаутом 60 секунд, то завершение всего batch буд��т отложено на это время. При появлении таких операций пропускная способность начинает резко деградировать.

Таким образом, пропускная способность сервиса начинает определяться не средним временем ответа контрагента, а небольшим процентом задержек, возникающих при взаимодействии с ним. Иными словами, достаточно малого процента медленных операций, чтобы существенно замедлить обработку всего потока.

Параллельная обработка с гарантией at-least-once

Схематично процесс выглядит следующим образом:

At Least Once Processing
At Least Once Processing

Модель обработки включает три фазы:

  • Prepare

  • Processing

  • Post Processing

Сообщения обрабатываются параллельно и одновременно в работе находится N ( настройка maxPollRecords в демонстрационном проекте) сообщений. Как только обработка любого сообщения завершается, немедленно запрашивается следующее сообщение из Kafka. Таким образом при постоянной нагрузке   достигается постоянная загрузка consumer без ожидания завершения обработки какого-либо из сообщений.

Prepare

На данном этапе формируется специальное сообщение и отправляется в wait Topic:

  • key — сгенерированный уникальный UUID

  • value — value оригинального Kafka-сообщения

  • headers: ORIGINAL_TOPIC, PARTITION, ORIGINAL_OFFSET, ORIGINAL_KEY, ORIGINAL_TIMESTAMP, ORIGINAL_HEADERS, WAIT_EXPIRATION_TIME

Как ясно из названия в ORIGINAL_* headers содержится информация об оригинальном сообщении. Это критически важно для корректной реконструкции контекста при анализе инцидентов и повторной отправке.

WAIT_EXPIRATION_TIME определяет момент, после которого сообщение считается потенциально потерянным и подлежит отправке в Dead Letter Topic для разбора.

После успешной отправки в wait Topic выполняется acknowledge offset'а.
Это делает offset доступным для периодического commit. Таким образом, ответственность за сохранность сообщения переносится с offset-коммита на durable-хранилище.

Processing

На этапе Processing выполняется собственно бизнес-логика обработки сообщения. В демонстрационном проекте это POST /authData к внешнему сервису.

При возникновении ошибки сообщение отправляется в Dead Letter Topic. При этом дальнейшая логика Post Processing выполняется независимо от результата.

Post Processing

На этапе Post Processing отправляется tombstone ( сообщение с value = null ) в wait Topic с тем же ключом, который имело сообщение отправленной в wait Topic на стадии Prepare. Это означает, что сообщение успешно или с ошибкой завершило обработку и запись в wait-хранилище может быть удалена.

Важно: Post Processing выполняется всегда - и при успехе, и при ошибке. Следовательно, ситуация, когда в wait Topic присутствует только одно сообщение (без tombstone), возможна почти исключительно при аварийном завершении сервиса при правильной имплементации At Least Once Processing. Именно такие сообщения считаются потенциально потерянными и требуют внимания со стороны сопровождения.

Реализация на reactor kafka такого At Least Once Processing

class AtLeastOnceProcessing(
    val kafkaConfigurationProperties: KafkaConfigurationProperties,
    val recordProcessor: RecordProcessor
) {

    companion object {
        private val log = LoggerFactory.getLogger(this::class.java)
    }

    private lateinit var kafkaReceiver: KafkaReceiver<String, String>
    private lateinit var sender: KafkaSender<String, String>
    private lateinit var kafkaTask: Disposable


    @PostConstruct
    fun init() {
        kafkaReceiver = kafkaConfigurationProperties.createReceiver()
        sender = kafkaConfigurationProperties.createSender()

        kafkaTask = kafkaReceiver.receive(1)
            .flatMapSequential(
                { record ->
                    val recordWaitWrapper = RecordWaitWrapper(
                        record,
                        UUID.randomUUID().toString(),
                        LocalDateTime.now().plusHours(1)
                    )
                    sendWaitRecord(recordWaitWrapper)
                        .doOnSuccess { record.receiverOffset().acknowledge() }
                        .thenReturn(recordWaitWrapper)
                },
                kafkaConfigurationProperties.consumer.maxPollRecords,
                kafkaConfigurationProperties.consumer.maxPollRecords
            )
            .flatMap(
                { recordWaitWrapper ->
                    val record = recordWaitWrapper.record
                      recordProcessor.invoke(record)
                        .onErrorResume { ex ->
                            log.error("Error in processing ${record.offset()}", ex)
                            sendDeadLetter(record, ex)
                        }
                        .then(Mono.defer { sendTombstoneWaitRecord(recordWaitWrapper) })

                },
                kafkaConfigurationProperties.consumer.maxPollRecords
            )
            .subscribe()

    }

    @PreDestroy
    fun destroy() {
        kafkaTask.dispose()
    }

    private fun sendWaitRecord(recordWaitWrapper: RecordWaitWrapper): Mono<Void> =
        sender.sendWithOriginalHeaders(
            kafkaConfigurationProperties.waitTopic,
            recordWaitWrapper.record,
            recordWaitWrapper.expirationUUID,
            recordWaitWrapper.record.value(),
            recordWaitWrapper.recordHeaders
        )

    private fun sendTombstoneWaitRecord(recordWaitWrapper: RecordWaitWrapper): Mono<Void> =
        sender.sendWithOriginalHeaders(
            kafkaConfigurationProperties.waitTopic,
            recordWaitWrapper.record,
            recordWaitWrapper.expirationUUID,
            null,
            recordWaitWrapper.recordHeaders
        )

    private fun sendDeadLetter(record: ConsumerRecord<String, String>, ex: Throwable): Mono<Void> =
        sender.sendDeadLetter(kafkaConfigurationProperties.deadletterTopic, record, ex)

    private data class RecordWaitWrapper(
        val record: ReceiverRecord<String, String>,
        val expirationUUID: String,
        val expirationTime: LocalDateTime
    ) {
        val recordHeaders: Headers =
            RecordHeaders()
                .add(EXPIRATION_UUID.headerName(), expirationUUID.toByteArray())
                .add(EXPIRATION_TIME.headerName(), expirationTime.toString().toByteArray())
    }


}

Контроль "зависших" сообщений

Для отслеживания ситуаций, когда tombstone не пришёл, используется таблица wait в Postgres:

CREATE TABLE wait (
    id BIGINT GENERATED ALWAYS AS IDENTITY PRIMARY KEY,

    original_topic TEXT,
    original_key TEXT,
    original_offset BIGINT,
    original_partition BIGINT,
    original_timestamp TIMESTAMP,
    original_headers TEXT,

    wait_expiration_uuid TEXT UNIQUE,
    wait_expiration_time timestamp NOT NULL,

    body TEXT,

    created_at timestamp  DEFAULT NOW()
);

Kafka Jdbc Sink Connector ( wait.sink ) сохраняет данные из wait Topic в wait - таблицу, а также удаляет соответствующую запись в wait-таблице при получении tombstone. Json конфигурация таски:

{
  "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
  "connection.url": "jdbc:postgresql://postgres:5432/wait",
  "connection.user": "postgres",
  "connection.password": "postgres",
  "connection.attempts": "10",
  "connection.backoff.ms": "10000",
  "dialect.name": "PostgreSqlDatabaseDialect",
  "key.converter": "org.apache.kafka.connect.storage.StringConverter",
  "value.converter": "ru.typik.connector.converter.WaitRecordConverter",
  "table.name.format": "wait",
  "auto.create": "false",
  "insert.mode": "upsert",
  "pk.mode": "record_key",
  "pk.fields": "wait_expiration_uuid",
  "delete.enabled": true,
  "tasks.max": "1",
  "topics": "waitTopic",
  "errors.tolerance": "none",
  "errors.log.enable": true,
  "errors.log.include.messages": true
}

В данной таске реализован свой value.converter - WaitRecordConverter - для упрощения работы преобразованиями. В целом можно было бы обойтись стандартными SMT( Single Message Transforms), но, на мой взгляд, в данном случае получилась бы довольно громоздкая и нечитабельная json-конфигурация.

Kafka Jdbc Source Connector ( expired.source ) - выполняет периодические select-запросы к wait-таблице и выгружает потенциально "зависшие" записи в Dead Letter Topic:

{
  "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
  "connection.url": "jdbc:postgresql://postgres:5432/wait",
  "connection.user": "postgres",
  "connection.password": "postgres",
  "connection.attempts": "10",
  "connection.backoff.ms": "10000",

  "dialect.name": "PostgreSqlDatabaseDialect",
  "topic.prefix": "deadLetterTopic",

  "mode": "timestamp+incrementing",
  "query": "SELECT * FROM (SELECT * FROM wait WHERE wait_expiration_time <= CURRENT_TIMESTAMP) as _t1",
  "incrementing.column.name": "id",
  "timestamp.column.name": "wait_expiration_time",

  "value.converter": "org.apache.kafka.connect.storage.StringConverter",

  "transforms": "createKey,extractKey,addHeader,copyOriginalHeaders,extractValue",

  "transforms.createKey.type": "org.apache.kafka.connect.transforms.ValueToKey",
  "transforms.createKey.fields": "original_key",

  "transforms.extractKey.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
  "transforms.extractKey.field": "original_key",

  "transforms.extractValue.type": "org.apache.kafka.connect.transforms.ExtractField$Value",
  "transforms.extractValue.field": "body",

  "transforms.addHeader.type": "org.apache.kafka.connect.transforms.InsertHeader",
  "transforms.addHeader.header": "Exception",
  "transforms.addHeader.value.literal": "ExpiredRecordException",

  "transforms.copyOriginalHeaders.type": "ru.typik.connector.transforms.JsonHeadersToKafkaHeaders",
  "transforms.copyOriginalHeaders.json.field": "original_headers"
}

В данной таске реализована свое SMT ( Single Message Transform) JsonHeadersToKafkaHeaders, т.к. при отправке в Dead Letter Topic необходимо десереализовать из ORIGINAL_HEADERS, восстановив headers оригинального сообщения.

Таким образом wait-таблица становиться некоторым durable-реестром потенциально "зависших" сообщений.

Сообщения из Dead Letter Topic обрабатываются Retry-модулем. Этот механизм универсален всех сервисов системы и позволяет сотруднику сопровождения анализировать возникающие проблемы и переотправлять проблемное сообщение в исходный Topic, если требуется.

Сравнительное тестирование batch и at-least-once моделей в демонстрационном проекте

В демонстрационном проекте в качестве downstream-системы используется mockServer, эмулирующий различные задержки ответа. В expectation.json задана следующая модель задержке:

  • каждое 10е сообщение обрабатывается с задержкой 1 сек

  • остальные - 200мс

Обращение к mockServer реализовано в RecordProcessor.kt. Именно эта часть кода формирует метрики времени обработки и является общей для обеих моделей обработки.

Демонстрационное приложение может быть запущено в двух вариантах, а переключение выполняется с помощью Spring profile: batch, atLeastOnce.

В subproject compose описани и поднимается c помощью gradle-task batchComposeUp инфрастуктура, необходимая для работы, тестирования и мониторинга приложения: Kafka, Kafka Connect, Postgres, mockServer, Prometheus, Grafana.

Для генерации тестовых данных и нагрузки используется KafkaTestDataGenerator.kt, где с помощью парамтера count задается количество сообщений, отправляемых в Kafka Topic.

Теоретическая оценка пропускной способности

В batch-модели весь набор из maxPollrecords фактически синхронизируется по самому медленному запросу внутри batch.

Для atLeastOnce один из потоков занят при такой конфигурации выполнением медленного запроса, тогда как остальные могут обрабатывать до 5 сообщений в секунду.

 

 maxPollRecords=10

 maxPollRecords=50

 profile = batch

 ~10 TPS

 ~50 TPS

 profile = atLeastOnce

 ~46 TPS

 ~246 TPS

Фактические результаты

Демонстрационный проект запускался для тестирования в четырех конфигурациях:

Profile = batch, maxPollRecords = 10, ~ 9.4 TPS

Profile = batch, maxPollRecords = 50, ~46 TPS

Profile = atLeastOnce, maxPollRecords = 10, ~34 TPS

Profile = atLeastOnce, maxPollRecords = 50, ~170 TPS

В целом результаты соответствуют теоретическому тренду. Несмотря на то что при atLeastOnce не удалось достичь значения, близкого к 246 TPS, из-за нестабильности mockServer (по графикам видно, что среднее время ответа периодически превышало 1 секунду, а максимальные значения доходили до 20 секунд), разница в пропускной способности и характер поведения системы отчётливо видны.

При увеличении доли медленных ответов или возникновении таймаутов (например, 60 секунд) деградация batch-подхода становится критической, тогда как atLeastOnce продолжает обрабатывать поток без полной блокировки.

Выводы и обоснование

В качестве альтернативы рассматривался подход работы напрямую с неким хранилищем (например, Postgres или Redis) на стадиях Prepare и Post Processing. Т.е. вместо отправки в wait Topic - напрямую сохранять в некий реестр и удалять из него, а уже отдельный scheduler ( например тот же Kafka Source Connector) периодически выгружал бы потенциально "потерянные" записи в Dead Letter Topic. С технической точки зрения получилось бы очень похожее на текущее решение. Но при таком подходе важно обеспечить высокую доступность и надежность хранилища.

Ключевая особенность нашей инфраструктуры заключается в том, что сервис не должен напрямую зависеть от доступности кластера Postgres. Согласно регламентам, в аварийных ситуациях восстановление Postgres может занимать до 8 часов. Это не означает, что такие сбои происходят регулярно, однако такой SLA предусмотрен официально. При этом есть требования, что при таком сбое обработка трафика останавливаться не должна. Поэтому на данном этапе использовать Postgres напрямую неприемлемо.

В реализованной схеме сервис не зависит от Postgres напрямую. Да, при недоступности Postgres: Kafka Connector временно не сможет работать с wait таблицей, сохранять и выгружать "просроченные" записи. Однако основной поток обработки продолжает работать, сообщения сохраняются в Kafka и после восстановления Postgres Cluster будут вычитаны, сохранены и выгружены. Потенциально часть, которые фактически были обработаны, могут оказаться на ручном разборе, но это приемлемый риск, после такого сбоя. При этом гарантии доставки сохраняются, а сервис остаётся работоспособным. Для нас это приоритетнее строгой синхронной фиксации состояния в базе данных.

Идемпотентность и state-machine в сервисе реализованы на базе: партиционирования в Kafka и Cassandra. Postgres не участвует в бизнес-логике сервиса. Попытка встроить механизм контроля «зависших» операций непосредственно в Cassandra сталкивается с архитектурными ограничениями. Для wait-реестра требуется таблица, которая одновременно позволяет:

  • Удалять запись по UUID (при получении tombstone)

  • Выполнять периодическую выборку по условию wait_expiration_time < now

В Cassandra подобную модель реализовать корректно и эффективно сложно.

В реляционной базе (Postgres) такая модель естественна. Таким образом, Cassandra подходит нам для state-machine, но плохо подходит для роли временного реестра незавершённых операций с возможностью выборки по времени.

Ранее, когда вместо Cassandra у нас использовался Redis, мы реализовали такую модель через Redis-структуры: на стадиях Prepare и Post Processing работали напрямую с Redis структурами, scheduler также выгружал данные из Redis. Позднее в рамках проекта было принято решение отказаться от Redis Cluster в пользу Cassandra, да и откровенно говоря на сложность поддержки такого функционала в Redis жаловались сотрудники сопровождения.

В результате был разработан компромиссный вариант:

  • сервис остаётся независимым от Postgres

  • Kafka обеспечивает основную гарантию

  • Postgres используется как вспомогательный реестр для операционного контроля

Когда все системы работают штатно, Kafka Connect практически нивелирует разницу между записью напрямую в Postgres и записью через Topic. При этом текущая схема устойчива к сбоям Postgres-кластера, что является критичным требованием.

Parallel Consumer

В ходе анализа натолкнулся на библиотека Parallel Consumer от Confluent, которая как я понял решает в том числе схожую задачу. Преимущества подхода: для учета необработанных операций используется только Kafka, нет лишних систем вроде Postgres, управление параллелизмом реализовано на уровне клиента. Однако на текущий момент остаются вопросы:

  • зрелость библиотеки

  • соответствие нашему стеку

  • прозрачность и удобство разбора инцидентов

  • операционная поддержка

Вариант с Postgres в роли вспомогательного реестра пока выглядит более предсказуемым и управляемым с точки зрения эксплуатации. При этом в будущем не исключаю, что изменю свое мнение в пользу Parallel Consumer.

https://github.com/typik89/kafka_batch_bottleneck_refactoring/tree/main