Как стать автором
Обновить

modern-cpp-kafka для C++. Решаем проблемы владения и сериализации

Уровень сложностиСредний
Время на прочтение11 мин
Количество просмотров381

Доброго времени суток! Эта статья написана для тех, кто в общих чертах знаком с тем, что такое и для чего используется Apache Kafka, кто такие Producer и Consumer и как они друг с другом работают. Целью этой статьи является показать способ использования библиотеки modern-cpp-kafka для работы с Apache Kafka на современном C++. В общих чертах с темой можно ознакомится, например, здесь (отсюда же взяты скриншоты), а в этой статье будет рассмотрено решение проблем с владением и (де)сериализацией наиболее простым способом.

Идея написания этого небольшого руководства появилась у меня, когда я начал изучать одну из самых популярных библиотек для работы с Apache Kafka - а именно modern-cpp-kafka. Она основана в виде оболочки над старой доброй реализацией для C - lirdbkaka. Версия для C++ подкупила меня тем, что она предоставляет довольно удобный интерфейс для работы, использует современные и актуальные семантики языка вроде RAII и шаблонов, а также, как утверждает создатель, отсутствие оверхеда и самую быструю скорость при работе с данными размерами в пределах 256 B ~ 2 KB.

Однако, как водится, ничто не идеально, а именно - владение данными, сериализация и десериализация, которые реализованы что в librdkafka, что в modern-cpp-kafka примерно никак. Для того, чтобы разобраться, что именно не так, необходимо: рассмотреть механизм работы самой Kafka; ознакомиться с реализацией функций отправки и приема в librdkafka и modern-cpp-kafka; понять, причем здесь владение; разработать способ сериализации и десериализации.

В конце статьи я приведу код собственного решения проблемы с сериализацией. Решение отчасти банальное и не годится для промышленного использования, но оно хорошо подойдет начинающим пользователям ввиду своей минималистичности и простоты.

Механизм работы с данными в Apache Kafka

Максимально кратко рассмотрим механизм добавления и извлечения данных в Apache Kafka. Данные доставляются, хранятся и извлекаются исключительно в двоичном представлении, что автоматически поднимает вопрос о том, как получить данные в нужном пользователю виде - то есть о сериализации и десериализации.

Пример добавления сообщения с ключом int, равным 123, и значением string, равным "hello world"
Пример добавления сообщения с ключом int, равным 123, и значением string, равным "hello world"

Как видно на скриншоте, при добавлении ключа и соответствующего ему значения требуется преобразовать их в двоичный вид (сериализовать), для чего используются сериализаторы как для ключа, так и значения. Эти задачи берет на себя Producer.

Ситуация с извлечением данных выглядит очень похоже.

Пример извлечения сообщения с ключом int, равным 123, и значением string, равным "hello world"
Пример извлечения сообщения с ключом int, равным 123, и значением string, равным "hello world"

На этом скриншоте показана схема работы Consumer-а, который занимается извлечением двоичных данных и преобразованием их в нужный тип (десериализацией).

Механизм работы с данными в librdkafka и modern-cpp-kafka

Библиотека librdkafka, будучи написанной на C, использует именно C-шный механизм так называемого стирания типов (type erasure) путем использования функции rd_kafka_produce, а также связки void-указателя и длины данных.

RD_EXPORT
int rd_kafka_produce(rd_kafka_topic_t *rkt,
                     int32_t partition,
                     int msgflags,
                     void *payload,
                     size_t len,
                     const void *key,
                     size_t keylen,
                     void *msg_opaque);

Критика такого подхода стара как сам язык C, однако это единственный доступный в C способ передавать любые данные, нужно лишь их сериализовать. Тем не менее, сама библиотека не предоставляет никаких инструментов для сериализации.

Не решает ее, увы, и библиотека modern-cpp-kafka. Рассмотрим функцию KafkaProducer::send для отправки сообщения:

inline void
KafkaProducer::send(const producer::ProducerRecord& record,     // Структура с данными (ключ, значение, хедеры, топик, партишен и id)
                    const producer::Callback&       deliveryCb, // Callback-функция
                    SendOption                      option,     // Опции для отправки
                    ActionWhileQueueIsFull          action);    // Опции действия на заполненность очереди (блокировать или нет)

Нам нужен первый тип первого параметра - ProducerRecord. Его структура (не включая методы) выглядит так:

/**
 * A key/value pair to be sent to Kafka.
 * This consists of a topic name to which the record is being sent, an optional partition number, and an optional key and value.
 * Note: `ProducerRecord` would not take the ownership from the memory block of `Value`.
 */
class ProducerRecord
{
public:
    using Id  = std::uint64_t;

    // конструкторы, геттеры, сеттеры

private:
    Topic        _topic;
    Partition    _partition;
    Key          _key;
    Value        _value;
    Headers      _headers;
    Optional<Id> _id;
};

Типы Key и Value являются type alias-ами на тип ConstBuffer:

// Which is similar with `boost::const_buffer` (thus avoid the dependency towards `boost`)
class ConstBuffer
{
public:
    // побайтовое копирование указателя
    explicit ConstBuffer(const void* data = nullptr, std::size_t size = 0): _data(data), _size(size) {}
    const void* data()     const { return _data; }
    std::size_t size()     const { return _size; }
    std::string toString() const
    {
        // опустим реализацию
    }
private:
    const void* _data;
    std::size_t _size;
};

Поля класса выглядят как-то знакомо, не правда ли? Да, это именно та связка void-указателя и длины, которая используется и в функции rd_kafka_produce.

Копирование указателей выполняется побайтово, из-за этого мы сталкиваемся с проблемой "висящего" указателя и, как следствие, use-after-free. Именно реализация класса ConstBuffer делает класс ProducerRecord невладеющим (и крайне неудобным в нетривиальных случаях), что и сказано прямиков в коментарии в описании класса (для удобства переведенного на русский):

Примечание: ProducerRecord не будет принимать права владения блоком памяти Value.

Как мы видим, что в librdkafka, что в modern-cpp-kafka используется невладеющий механизм для передачи или хранения параметров.

Функции Serialize и Deserialize будут описаны ниже, в соответствующем пункте, пока же нам важно знать, что Serialize возврашает std::vector<std::byte>.Уже на этапе формировании данных для отправки возникает проблема с потерей данных:

// Хотим получить готовый объект из сериализованных данных
// но получаем "висящий" указатель - увы и ах!
template <typename T>
ConstBuffer ValueFromNonOwning(const T& value) {
  const std::vector<byte> serialized = Serialize(value);

  // глубокого копирования не происходит - "висящий" указатель
  return ConstBuffer(serialized.data(), serialized.size());
}

// Не спасет тут и умный указатель: ни на локальную переменную serialized
// т.к. данные из умного указателя будут просто скопированы по значению
// вместо создания копии; ни на сам возвращаемый объект ConstBuffer
// (по изначальной причине) - и снова увы и ах!
template <typename T>
ConstBuffer ValueFromNonOwningUniquePtr(const T& value) {
  // выделяем память в куче (с shared_ptr та же история, ведь счетчик ссылок равен 0)
  const auto serialized = std::make_unique<std::vector<std::byte>>(Serialize(value));

  // проблема остается, потому что теперь разрушается сам умный указатель
  return ConstBuffer(serialized->data(), serialized->size());
}

Владеющий ConstBuffer

Чтобы починить проблему с "висящим" указателем, мне пришла идея написать тот владеющий буфер, который будет гарантировать сохранение данных на протяжении всей жизни объекта. Так появился OwningBuffer:

// Владеющий аналог класса ConstBuffer
class OwningBuffer {
 public:
  explicit OwningBuffer(const void* data = nullptr, const std::size_t size = 0) {
    if (data && size > 0) {
      m_rawData.assign(static_cast<const std::byte*>(data),
                       static_cast<const std::byte*>(data) + size);
    }
  }

  explicit OwningBuffer(const ConstBuffer& buffer)
      : OwningBuffer(buffer.data(), buffer.size()) {}

  // конструктор копирования для создания из сырых данных
  explicit OwningBuffer(const std::vector<std::byte>& bytes) : m_rawData(bytes) {}
  
  // конструктор перемещения для создания из сырых данных
  explicit OwningBuffer(std::vector<std::byte>&& bytes) noexcept
      : m_rawData(std::move(bytes)) {}

  [[nodiscard]] const void* data() const { return m_rawData.data(); }
  [[nodiscard]] std::size_t size() const { return m_rawData.size(); }

  [[nodiscard]] std::string toString() const {
    // так как создание ConstBuffer не содержит сложных операций,
    // нет и оверхеда на создание временного объекта
    const ConstBuffer buffer(m_rawData.data(), m_rawData.size());
    return buffer.toString();
  }

  // получение всегда валидного объекта ConstBuffer
  [[nodiscard]] ConstBuffer asConstBuffer() const {
    return ConstBuffer(m_rawData.data(), m_rawData.size());
  }

 private:
  // сырые данные объекта
  std::vector<std::byte> m_rawData;
};

// Такой вариант функции работает корректно, никаких "висящих" указателей
template <typename T>
OwningBuffer ValueFrom(const T& value) {
  const auto serialized = Serialize(value);

  return OwningBuffer(serialized);
}

Интерфейс класса аналогичен интерфейсу ConstBuffer, но содержит некоторые дополнения в виде новых конструкторов для создания объекта из сырых данных, а также вспомогательного метода для получения валидного объекта ConstBuffer. Владеющий подход полностью решает проблему с "висящими" указателями и use-after-free, и теперь можно перейти к сериализации и десериализации.

Сериализация и десериализация

Теперь, решив проблему с владением, можно перейти к самому главному - сериализации и десериализации данных. Несмотря на обилие библиотек, которые предоставляют такую функциональность, я не буду их использовать, чтобы не перегружать статью лишними зависимостями и усложнять материал разбором доступных средств для сериализации. Поэтому мы будем использовать старый добрый std::byte из C++17 в еще более старом std::vector и reinterpret_cast.

#include <cstring>
#include <stdexcept>
#include <type_traits>
#include <vector>

template <typename T>
std::vector<std::byte> Serialize(const T& value) {
  static_assert(std::is_trivially_copyable_v<T>, "Type must be trivially copyable");

  const auto begin = reinterpret_cast<const std::byte*>(&value);
  return {begin, begin + sizeof(T)};
}

template <typename T>
T Deserialize(const std::vector<std::byte>& serializedData) {
  static_assert(std::is_trivially_copyable_v<T>, "Type must be trivially copyable");
  static_assert(std::is_default_constructible_v<T>, "Type must be default constructible");

  // примечание: проверяется только размер объекта.
  // никто не мешает записать в int32_t содержимое std::string размером 5 байт
  if (serializedData.size() != sizeof(T)) {
    throw std::runtime_error("Serialized data size does not match target type size");
  }

  T value;
  std::memcpy(&value, serializedData.data(), sizeof(T));
  return value;
}

// специализация для std::string
template <>
inline std::vector<std::byte> Serialize<std::string>(const std::string& value) {
  auto begin = reinterpret_cast<const std::byte*>(value.data());
  return {begin, begin + value.size()};
}

// специализация для std::string
template <>
inline std::string Deserialize<std::string>(
    const std::vector<std::byte>& serializedData) {
  return {reinterpret_cast<const char*>(serializedData.data()), serializedData.size()};
}

В static_assert-ах, по большому счету, нет такой необходимости, ведь код и так и так не скомпилируется, если что-то пойдет не так, но всегда приятнее видеть красивые сообщения об ошибках, верно? Сеханизм сериализации банален: берем адрес объекта и прибавляем его размер, таким образом "охватывая" все содержимое объекта, на выходе получаем вектор байтов, то есть сырые данные объекта.

С десериализацией все немного сложнее (и не так гладко). Здесь мы используем функцию memcpy, чтобы заполнить созданный при помощи конструктора по умолчанию объект переданными сырыми байтами. Также нужно иметь в виду, что проверяются только размеры десериализуемого объекта, то есть можно записать объект, например, std::string в объект типа float и получить бессмыслицу.

Остается лишь создать обертки для сериализации и десериализации. В этом нам помогут функции ValueTo и ValueFrom:

// сериализация
template <typename T>
OwningBuffer ValueFrom(const T& value) {
  const auto serialized = Serialize(value);

  return OwningBuffer(serialized);
}

// десериализация
template <typename T>
T ValueTo(const Value& value) {
  if (!value.data()) {
    throw std::runtime_error("Received empty value");
  }

  // преобразуем void* к const byte* и "захватываем" все значения
  const std::vector<std::byte> serializedData(
      static_cast<const std::byte*>(value.data()),
      static_cast<const std::byte*>(value.data()) + value.size());

  // возвращаем объект, полученный при помощи десериализации
  return Deserialize<T>(serializedData);
}

Демонстрация работы

В качестве примера рассмотрим классическую модель Single Producer - SIngle Consumer. Код Producer-а:

#include <kafka/KafkaProducer.h>

#include <iostream>
#include <string>

#include "KafkaUtils.h"

using namespace kafka::clients::producer;

void SendValues(const kafka::Topic& topic, KafkaProducer& producer) {
  // Prepare delivery callback
  auto deliveryCb = [](const RecordMetadata& metadata, const kafka::Error& error) {
    if (!error) {
      std::cout << "Message delivered: " << metadata.toString() << std::endl;
    } else {
      std::cerr << "Message failed to be delivered: " << error.message() << std::endl;
    }
  };

  {
    std::cout << "Sending messages with FLOAT type" << std::endl;

    constexpr float f = 2.34;

    kafka::extensions::SendValue(producer, topic, kafka::NullKey,
                                 kafka::extensions::ValueFrom(f).AsConstBuffer(),
                                 deliveryCb);
  }

  {
    std::cout << "Sending messages with STRINGS type" << std::endl;

    const auto values = std::vector<std::string>{"amogus", "breakpoint", "cappa",
                                                 "delta",  "extension",  "final"};

    for (const auto& value : values) {
      kafka::extensions::SendValue(producer, topic, kafka::NullKey,
                                   kafka::extensions::ValueFrom(value).AsConstBuffer(),
                                   deliveryCb);
    }
  }
}

void DoProducerWork() {
  // взято из конфигурации Docker Compose
  const std::string brokers = "localhost:29092";
  const kafka::Topic topic = "test-topic";

  // Prepare the configuration
  kafka::Properties props;
  props.put("bootstrap.servers", brokers);

  // Create a producer
  KafkaProducer producer(props);

  std::println(std::cout, "SendValues function");
  SendValues(topic, producer);
}

int main() {
  try {
    DoProducerWork();
  } catch (const std::exception& e) {
    std::cerr << "Error: " << e.what() << std::endl;
    return 1;
  }

  return 0;
}

Код Consumer-а:

#include <KafkaUtils.h>
#include <kafka/KafkaConsumer.h>

#include <csignal>
#include <iostream>
#include <string>

std::atomic_bool running = {true};

void StopRunning(int sig) {
  if (sig != SIGINT) return;

  if (running) {
    running = false;
  } else {
    // Restore the signal handler, -- to avoid stuck with this handler
    signal(SIGINT, SIG_IGN);  // NOLINT
  }
}

void DoConsumerWork() {
  using namespace kafka;
  using namespace kafka::clients::consumer;

  const std::string brokers = "localhost:29092";
  const Topic topic = "test-topic";

  // Prepare the configuration
  Properties props;
  props.put("bootstrap.servers", brokers);

  // Create a consumer instance
  KafkaConsumer consumer(props);

  // Subscribe to topics
  consumer.subscribe({topic});

  while (running) {
    // Poll messages from Kafka brokers
    for (const auto records = consumer.poll(std::chrono::milliseconds(100));
         const auto& record : records) {
      if (record.error()) {
        std::cerr << record.toString() << std::endl;
        continue;
      }

      std::cout << "Got a new message..." << std::endl;
      std::cout << "    Topic    : " << record.topic() << std::endl;
      std::cout << "    Partition: " << record.partition() << std::endl;
      std::cout << "    Offset   : " << record.offset() << std::endl;
      std::cout << "    Timestamp: " << record.timestamp().toString() << std::endl;
      std::cout << "    Headers  : " << toString(record.headers()) << std::endl;

      try {
        const auto stringValue = kafka::extensions::ValueTo<std::string>(record.value());
        std::cout << "    STRING [" << stringValue << "]" << std::endl;
      } catch (const std::exception& e) {
        std::cerr << "    Failed to deserialize as string: " << e.what() << std::endl;
      }

      try {
        const auto floatValue = kafka::extensions::ValueTo<float>(record.value());
        std::cout << "    FLOAT [" << floatValue << "]" << std::endl;
      } catch (const std::exception& e) {
        std::cerr << "    Failed to deserialize as float: " << e.what() << std::endl;
      }
    }
  }
}

int main() {
  // Use Ctrl-C to terminate the program
  signal(SIGINT, StopRunning);  // NOLINT

  DoConsumerWork();

  return 0;
}

Заключение

В этой статье я постарался совместить сразу несколько вещей: рассказать про Apache Kafka, про хранение данных внутри, про наличие определенных проблем и подходы к их решению. Написание текстов (помимо курсовых и дипломной работ) для меня в новинку, поэтому призываю каждого, кто прочитал, поделиться мыслями - с удовольствием почитаю и отвечу.

Демонстрацию сериализации и десериализации прямо в браузере можно посмотреть на сайте godbolt. Исходный код с настройкой Apache Kafka в Docker Compose находится в моем github-репозитории.

Теги:
Хабы:
+1
Комментарии0

Публикации

Истории

Работа

Программист С
40 вакансий
Программист C++
100 вакансий
QT разработчик
5 вакансий

Ближайшие события

11 – 13 февраля
Epic Telegram Conference
Онлайн
27 марта
Deckhouse Conf 2025
Москва
25 – 26 апреля
IT-конференция Merge Tatarstan 2025
Казань