В данной статье рассматриваются примечательные ошибки при работе с Kafka, в том числе при использовании библиотеки KafkaJS, а также способы их устранения и методы увеличения производительности при публикации и обработки сообщений.

Предполагается, что читатель имеет базовое представление о Kafka (раздел «Общие термины» поможет освежить информацию) и функционале библиотеки KafkaJS.

В первой части разбираются аспекты, связанные с публикацией сообщений.

Общие термины

Broker (Брокер/нода) – инстансы кластера Kafka, которые занимаются хранением данных, а также обработкой запросов от клиентов на запись и чтение сообщений. Брокеры задаются при создании инстанса класса KafkaJS: new Kafka({ clientId:'my-app', brokers:['kafka1:9092', 'kafka2:9092'] }).Ноды как правило запущены на разных серверах для отказоустойчивости.

Topic (топик) – хранилище, в которое пишут сообщения producers (продьюсеры), а читают из него consumers (консьюмеры). Это не классическая очередь, так как сообщения не удаляются после того, как их прочитал какой-либо консьюмер.

Partition (партиция) – топики делятся на partition (разделы): 0, 1, 2....100500. Это аналог шардирования. Данный механизм нужен для масштабирования чтения и сохранения порядка обработки сообщений консьюмерами.

Producer (продьюсер) – клиентское приложение, которое пишет сообщения в тот или иной топик. Запись идет в лидер ноду, с которой уже на реплики разъежаются данные.

Consumer (консьюмер) – клиентское приложение, которое читает сообщения из топика/топиков. Консьюмер в ответ отсылает брокеру информацию о том, сколько сообщений прочитано, для того чтобы зафиксировать offset. Кроме того, консьюмер отсылает heartbeart сообщения, для того чтобы брокер мог не учитывать «мертвые» консьюмеры. По умолчанию консьюмеры читают данные с лидера партиции.

Consumer Group (консьюмер групп) – консьюмеры с одним тем же groupId объединяются в группу. В KafkaJS данный параметр передается при создании консьюмера: kafka.consumer({ groupId: string }) . Консьюмер группы нужны для балансировки нагрузки, так как партиции потребляемого топика делятся между всеми участниками группы.

Offset (оффсет) – уникальный индекс (0, 1, 2 ... ) в рамках партиции топика, который определяет, с какого места консьюмеру нужно читать сообщения. Оффсет возрастает по мере того, как консьюмер читает сообщения из партиции топика. Его значение можно поменять через брокер, например, для того чтобы перечитать сообщения повторно. Оффсет закрепляется именно за целой консьюмер группой, а не отдельным консьюмером. Это предотвращает возможность прочитать одно и то же сообщение дважды несколькими консьюмерами (из одной группы). Сами оффсеты тоже хранятся в Kafka, их можно найти в топике __consumer_offsets.

Group coordinator (координатор) – брокер, который выбран Kafka для распределения партиций, отслеживания активных консьюмеров и управления ребалансом в конкретной консьюмер группе.

In-Sync replicas (ISR) – синхронизированные реплики (лидер + реплики), имеющие лаг, который не превышаетreplica.lag.time.max.ms (по умолчанию 10 cекунд). При публикации сообщения с наивысшими гарантиями записи (acks=-1) именно от них ждет подтверждения продьюсер, чтобы считать запись успешной. Цель этих реплик – обеспечивать доступность кластера на запись (смена лидера при его падении), а также предоставлять гарантию записи (сообщение записано на несколько брокеров).

На данной схеме упрощено взаимодействие с лидером партиции. Продюсер/консьюмер подключается к брокеру (ноде), на котором расположена лидер-партиция, для того чтобы записать/прочитать данные. В данной схеме все лидер партиции находятся наnode-1, но в реальности это будет не так. Лидер партиции будут распределены между нодами и соответственно консьюмеры/продьюсеры в зависимости от партиции будут обращаться к целевой ноде.

Producer

В начале разберем примечательные ошибки при публикации сообщений в Kafka.

Расчет партиции вещь важная

Допустим у продьюсера для выбора целевой партиции задана следующая функция:

const producer = await kafka.producer(topic, {
  createPartitioner: function () {
	return ({ partitionMetadata, message }: PartitionerArgs) => {
		return partitionMetadata.length
			? partitionMetadata[Number(message.key) % partitionMetadata.length]?.partitionId || 0
			: 0;
		};
  },
});

Основная логика функции заключается в целочисленном делении значения поля key из сообщений на общее количество партиций в топике.

Но что произойдет, если keyбудет иметь тип, который не может быть корректно приведен к integer:

await producer.send({
  topic: 'user_events',
  messages: [
      // new ObjectID - это создание mongoDB id: 5f92cdce0cf217478ba93563
     { key: new ObjectID(user._id), value: 'user_created' },
  ],
});

Произойдет следующее, по логике createPartitioner все сообщения будут публиковаться в одну партицию. И консьюмеры точно не скажут спасибо, ведь только один единственный консьюмер в группе вынужден будет потреблять все сообщения, и соответственно о масштабировании можно будет забыть.

Решение проблемы cледущее:

  • Метрика на большой дисбаланс в размерах партиций.

  • Валидировать и/или типизировать ключ партиции на уровне самого продьюсера.

Партиция 404

Например, создали, топик user_events с одной партицией. После чего наше приложение опубликовало сообщение в партицию 1:

const result = await producer.send({
  topic: 'user_events',
  messages: [
    { key: '2', value: 'user_created', partition: 1 }
  ]
});

Казалось бы ничто не предвещало беды. Но консьюмер не получит сообщение, а если заглянуть в топик, то сообщение будет отсутствовать. А ответ метода send вернет пустой массив:

console.log(result); // [] - Пустой ответ ибо партиция не создана

Проблема вызвана тем, что продьюсер публикует в несуществующую партицию. Нумерование партиций идет с 0, а приложение писало в 1.

KafkaJS не бросает ошибку при публикации в несуществующую партицию, а лишь возвращает пустой массив в качестве ответа. Поведение похоже на то, как работают некоторые клиенты при публикации сообщений в RabbitMQ. Если mandatory: false, то сообщение считается опубликованным, даже ecли сообщение не подошло ни для одной очереди.

Для решения данной проблемы топики и партиции необходимо создавать через файлы конфигурации при деплое приложения. Это позволит избежать человеческих ошибок при заведении топика. Кроме того, на уровне приложения нужно валидировать ответ от producer.send:

[
  {
    topicName: 'user_events',
    partition: 0,
    errorCode: 0,
    baseOffset: '0',
    logAppendTime: '-1',
    logStartOffset: '0'
  }
]

Например, бросать ошибку, писать в логи, публиковать в запасной топик или отсылать метрику о проблеме, тут все зависит от требований бизнес-логики приложения и критичности проблемы.

Каждому по продьюсеру

Иногда в проектах, работающих с Kafka, может присутствовать нечто подобное:

app.post('/send-message', async (request) => {
  const { topic, message } = request.body;
 
  const producer = kafka.producer();

  const result = await producer.send({
    topic: topic,
    messages: [
      { 
        value: JSON.stringify({
          content: message,
          timestamp: new Date().toISOString(),
          requestId: request.id
        }),
      }
    ]
  });
  
  return {
    success: true,
    topic: topic,
    message: message,
    result: result
  };
});

Проблема тут в следующем, на каждый запрос создается новый продьюсер: kafka.producer(). Делать так не следует, ведь на каждый такой вызов делается много тяжелой сетевой машинерии (у Kafka свой протокол поверх TCP). Требуется использовать паттерн singleteon для продьюсеров: cоздали один раз и многократно используем для публикации сообщений (producer.send), это повысит пропускную способность и снимет лишнюю нагрузку с брокера.

Cеть ненадежна

Рекомендуется иметь подписку на событие request_timeout, это позволит отловить проблемы с сетью при отправке сообщений в брокер:

producer.on('producer.network.request_timeout', (error) => {
	console.error({
		code: 'producer_network_timeout',
		message: error.message,
		data: { stack: error.stack },
	});
})

Это позволит писать логи или иметь метрику по данному событию.

Стоит обратить внимание на опцию retry, которая может указываться при создании консьюмера:

kafka.producer({
  retry: { retries: 10 }
});

По умолчанию она равна 5, но ее можно увеличивать, если требуется повысить гарантии доставки. Например, при наличии проблем с сетью между продьюсером и брокером. Для того чтобы не DDoSить брокер повторными запросами можно сконфигурировать стратегию ретраев.

Большому куску и рот радуется

Одна из классических ошибок это не оценивать размер сообщения при проектировании нового топика и не иметь метрик для отслеживания реального размера сообщений в продакшене. Kafka не спроектирована для отправки данных больше 1Mb, а если таких сообщений еще и много, то кластеру будет совсем худо. Да, в целом можно настроить Kafka для того, чтобы она справлялась и с сообщениями большего размера 1Mb - 5Mb (больше 10Mb однозначно нет), но это будет не бесплатно. Ценой за это будет:

  • Размер сегмента. Если у партиции настроен небольшой размер сегмента, то большие сообщения будут заставлять Kafka часто ротировать их. Например, при максимальном размере сегмента в 512Mb, ротация сегмента cлучится при публикации всего лишь 35 файлов размером 15Mb. Если использовать большие сегменты, то в случае аварии восстановление кластера будет занимать много времени.

  • Нагрузка на сеть. Большие сообщения будут порождать большой сетевой трафик, ведь необходимо отреплицировать сообщение на все реплики кластера. Кроме того, продьюсеру при использовании acks=-1 придется увеличить таймаут для запросов к брокеру, так как нужно будет больше времени для репликации на все ISR (In-Sync Replicas) реплики.

  • Проблемы записи. Пропускная способность продьюсера упадет при передаче больших сообщений и увеличатся риски потери сообщений (буффер сообщений для отправки будет расти). Кроме того, для хост машины брокера придется выделить больше RAM под os page cache (сначала запись идет в него, а не на диск). В противном случае Kafka будет совершать много I/O операций, так как ей придется интенсивно опустошать кэш на диск.

  • Нагрузка Java. При передаче больших сообщений потребуется больше RAM не только консьюмерам и продьюсерам, но и самой JVM Kafka. Ко всему прочему у брокеров возникнет повышенная CPU нагрузка из-за работы GC.

Рекомендуется использовать следующую формулу для расчета максимально возможного размера сообщения в топике: message.max.bytes = средний размер сообщений * 2, но меньше 1Mb.

А что делать, если надо передавать большие сообщения?

  • Видео, аудио, фото, архивные файлы или большие документы (csv, pdf, xlsx и т.д) следует загружать во внешнее хранилище (например, S3), а в теле сообщения передавать лишь ссылку на данные. Возможен вариант с дроблением большего сообщения на небольшие блоки, а консьюмер на своей стороне собирает цельные данные из блоков сообщений. Однако это дополнительные хлопоты, которые можно избежать, используя внешнее хранилище.

  • В случае, если имеются проблемы с передачей больших данных, например, таких как json/xml без использования внешнего хранилища, то есть большая вероятность, что передается избыточная информация. Как правило, такое случается со «старыми» топиками, когда один топик слушает много консьюмер групп, но каждому из них не нужны все поля из сообщения. Разбейте один топик на несколько и в каждый из них отправляйте лишь то, что нужно отдельно взятому потребителю. Другой подход это передавать лишь только минимально необходимые поля, например, id cущности и тип события ({ user_id, event_type }), а все остальные данные консьюмер будет получать по api (например, HTTP) при обработке сообщения.


Далее рассмотрены способы повышения производительности продьюсинга сообщений, а также факторы, ограничивающие её.

Нужно больше отправлять

Теперь разберемся, что мешает и что может помочь разогнать производительность продьюсинга сообщений в Kafka.

Batch

Отправка сообщений пачками может крайне положительно сказаться на скорости отправки данных в брокер:

const promises = data
  .map(({ user_id, data }) => ({ key: user_id, value: data }))
  .map((data) => producer.send(data));

// ❌ - отправка по одному
await Promise.all(promises);

Продьюсер в KafkaJS интеллектуальный и зачастую он может накапливать сообщения во внутреннем буффере перед их отправкой. Но гораздо предпочтительнее, где по логике программы это возможно, использовать sendBatch :

const topicMessages = data
  .map(({ user_id, data }) => ({ key: user_id, value: data }));

// ✅ - отправка пачкой
await producer.sendBatch({ topicMessages });

Внутрение отличия между send и sendBatch отсутствуют. Методsend под капотом это обертка над sendBatch, которая отсылает массив, состоящий из одного единственного сообщения.

В отличие от многих других клиентов для Kafka у продьюсера отсутствуют параметры linger.ms и batch.size. Это связано с тем, что библиотека KafkaJS cамостоятельно решает, когда и сколько отправить сообщений в брокер. Отправка происходит через заданный интервал. На интервал влияет брокер, который может сообщить продьюсеру, что надо «притормозить», так как продьюсер слишком много или быстро отправляет сообщения. На количество отправляемых за раз порций сообщений влияет параметр maxInFlightRequests, который определяет максимальное количество сообщений, находящихся в процессе отправки.

Подписка на событие request_queue_size позволяет отследить количество событий, ожидающих отправку в брокер:

producer.on('producer.network.request_queue_size', (event) => {
  /* 
   {
     broker: 'localhost:9092',
     clientId: 'my-app',
     queueSize: 1
   }
  */
  console.log('Producer request_queue_size', event.payload);
});

Используя поле queueSize можно построить метрику с целью отслеживания размера буффера сообщений.

Компрессинг

Cжатие данных это то, что может кардинально увеличить скорость отправки сообщений. В Kafka можно использовать сжатие как на стороне брокера, так и продьюсера. Где же правильно включать компрессинг? Для 95% случаев, ответ правильный, что на стороне продьюсера, ведь замасштабировать продьюсер гораздо дешевле (добавить инстанс��в приложения), чем увеличивать количество нод в кластере Kafka (намного дороже и сложнее). В сочетании с отправкой батчами (вероятность пересекающихся данных в пачке сообщений) сжатие может дать очень большой прирост в скорости продьюсинга за счет значительной экономии трафика.

Гарантии доставки

Очевидно, что уменьшение гарантий записи может сделать публикацию сообщений очень быстрой. Параметрacks на продьюсере определяет уровень гарантий. По умолчанию, acks равен -1 – требуется подтверждение от всех ISR реплик, и общее время записи будет равно времени ответа самой медленной реплики. Зачастую есть возможность понизить гарантии записи: поставить 1 (подтверждение только от лидер ноды кластера) или даже 0 (подтверждения не требуются).

Например, приложение отправляет в топик несколько раз в минуту данные c термометров, находящихся в складских помещения. Затем эти данные отображаются на дашбордах в пользовательском приложении. В случае, если будут теряться сообщения, то в самом плохом случае пользователь будет некоторое время наблюдать устаревшие данные в дашборде.

Другой пример, сервис A пишет в топик сообщения о смене статуса заказа в онлайн магазине. Cервис B отображает в личном кабинете пользователя текущее состояние заказа и при этом каждые 10 мин получает текущее состояние заказа посредством HTTP запроса в сервис A. Другими словами, если будут потеряны сообщение о смене статуса заказа, то пользователь будет видеть 10 минут «старый» cтатус.

В обоих случаях, если нет строгих требований к консистентности, то вполне нормально пожертвовать ей в пользу увеличения пропускной способности. Если все же используется самая высокая гарантия записи (acks=-1), то следует выставить параметр брокера min.insync.replicas меньше, чем общее количество реплик в кластере. Обычно, предлагается выставлять общее кол-во реплик - 1, но если имеется требование к высокой доступности кластера на запись, то рекомендуется устанавливать параметр как 60-70% от общего количества реплик.

Транзакции и Exactly Once

В контексте продьюсинга сообщений следует затронуть такой функционал Kafka как транзакции. При использовании данного функционала снижается как скорость публикации, так и обработки сообщений. Транзакции позволяют реализовать exactly once(с оговорками) при публикации сообщений. К тому же опубликованные данные в рамках транзакции будут доступны консьюмеру только после ее коммита, если явно не включить «грязное» чтение (readUncommitted: true). Под капотом транзакции работают с помощью внутреннего топика __transaction_state для хранения состояния и двухфазного коммита для обеспечения гарантии, что данные будут опубликованы даже в случае падении брокера или продьюсера. Общая схема работы транзакции:

Но семанитка exactly onceне гарантируется для версии Kafka 3 и младше. В этих версиях Kafka имеются баги, которые могут приводить к дублям при записи сообщений. В качестве примера, разберем один из них:

  • Продьюсер запускает транзакцию и отправляет брокеру сообщение, которое зависает из-за проблем с сетью.

  • После чего продьюсер ретраит транзакцию.

  • Сообщение из второй транзакции успешно записывается в топик.

  • Однако, продьюсер еще не успел сделать коммит, как к брокеру приходит сообщение из первой транзакции и записывается в партицию.

  • Продьюсер коммитит транзакцию и в результате в партиции имеются два одинаковых сообщения.

В случае необходимости применения функционала транзакции, крайне рекомендуется обновить Kafka до 4 версии. Иначе могут возникать ситуации, когда гарантии exactly once формальны.

Производительность является платой за возможность пользоваться exactly once. Если публикация сообщений без транзакций похожа на перемещение по широкополосной автостраде, то использование транзакции больше похоже на использование однополосной дороги с реверсивным движением. Почему же транзакции снижают производительность?

  • Бутылочное горлышко на продьюсере. У одного инстанса продьюсера может быть только одна активная транзакция. Следовательно, невозможно запустить несколько параллельно работающих транзакций с помощью одного инстанса продьюсера или часть сообщений публиковать в рамках транзакции, а части вне ее. Создание множества продьюсеров нагружает брокер и добавляет сетевые задержки. Ко всему прочему ограничение на количество одновременно отправляемых сообщений (maxInFlightRequests: 1) капитально снижает пропускную способность продьюсера. Использование батчинга в рамках транзакции может несколько уменьшить негативное влияние этой настройки. Ко всему прочему транзакции добавляют внутреннюю работу брокеру по фиксации результатов работы и сетевые запросы к нему: выдача id продьюсера, регистрация транзакции, коммит или откат ее.

  • Мусорные сообщения в топике. Данные, которые были опубликованны в рамках отмененной транзакций (abort) все равно пишутся в топик. Консьюмер после получения по сети пачки данных отфильтровывает незакоммиченные сообщения внутри себя. Большое количества отмененных транзакций замусоривают топик, создают между брокером и консьюмером лишний сетевой трафик и замедляют последнего.

  • Блокирование обработки сообщений. Транзакции, которые долго выполняются или зависли (например, продьюсер упал с ошибкой), блокируют обработку сообщений в партиции. Kafka использует LSO (Last Stable Offset) для определения до какой позиции в партиции консьюмер может обрабатывать сообщения. Консьюмеры, читающие с readUncommitted: false, остановятся на LSO, даже при наличии более поздних зафиксированных cообщений в партиции. Пример блокировки обработки партиции транзакцией:

    На схеме выше транзакция 1 (еще не завершилась) блокирует обработку сообщений 5-6 (успешно опубликованны транзакцией 2) и 7-8 (успешно опубликованные без транзакции) консьюмером. Снизить время блокировки чтения данных можно следующими способами: уменьшить таймаут у продьюсера для транзакций и/или разделить по разным топикам данные, которые нужно публиковать в транзакциях, а какие без их использования.

Однако, семантика exactly onceневозможна на стороне консьюмера (подробнее в следующей части), а только при публикации сообщений. Исходя из вышесказанного получается, что использование данного функционала довольно сомнительно: потеря производительности, но при сохранении необходимости на стороне консьюмера имплементации логики повторной обработки сообщения. Очень вероятно, что если есть потребность использовать транзакции, то нужно использовать вместо Kafka другой инструмент для решения задачи. Например, перейти от событийной коммуникации к последовательному вызову внешних систем с сохранением промежуточного состояния в БД (паттерн Cага). Temporal может выступать в качестве примера готового инструмента для реализации данного паттерна.


Следующая часть будет посвящена ошибкам связанным с обработкой сообщений, ребалансом в консьюмер группах и увеличением производительности консьюмеров.