Доброго времени суток! Эта статья написана для тех, кто в общих чертах знаком с тем, что такое и для чего используется Apache Kafka, кто такие Producer и Consumer и как они друг с другом работают. Целью этой статьи является показать способ использования библиотеки modern-cpp-kafka
для работы с Apache Kafka на современном C++. В общих чертах с темой можно ознакомится, например, здесь (отсюда же взяты скриншоты), а в этой статье будет рассмотрено решение проблем с владением и (де)сериализацией наиболее простым способом.
Идея написания этого небольшого руководства появилась у меня, когда я начал изучать одну из самых популярных библиотек для работы с Apache Kafka - а именно modern-cpp-kafka
. Она основана в виде оболочки над старой доброй реализацией для C - lirdbkaka
. Версия для C++ подкупила меня тем, что она предоставляет довольно удобный интерфейс для работы, использует современные и актуальные семантики языка вроде RAII и шаблонов, а также, как утверждает создатель, отсутствие оверхеда и самую быструю скорость при работе с данными размерами в пределах 256 B ~ 2 KB.
Однако, как водится, ничто не идеально, а именно - владение данными, сериализация и десериализация, которые реализованы что в librdkafka
, что в modern-cpp-kafka
примерно никак. Для того, чтобы разобраться, что именно не так, необходимо: рассмотреть механизм работы самой Kafka; ознакомиться с реализацией функций отправки и приема в librdkafka
и modern-cpp-kafka
; понять, причем здесь владение; разработать способ сериализации и десериализации.
В конце статьи я приведу код собственного решения проблемы с сериализацией. Решение отчасти банальное и не годится для промышленного использования, но оно хорошо подойдет начинающим пользователям ввиду своей минималистичности и простоты.
Механизм работы с данными в Apache Kafka
Максимально кратко рассмотрим механизм добавления и извлечения данных в Apache Kafka. Данные доставляются, хранятся и извлекаются исключительно в двоичном представлении, что автоматически поднимает вопрос о том, как получить данные в нужном пользователю виде - то есть о сериализации и десериализации.
Как видно на скриншоте, при добавлении ключа и соответствующего ему значения требуется преобразовать их в двоичный вид (сериализовать), для чего используются сериализаторы как для ключа, так и значения. Эти задачи берет на себя Producer.
Ситуация с извлечением данных выглядит очень похоже.
На этом скриншоте показана схема работы Consumer-а, который занимается извлечением двоичных данных и преобразованием их в нужный тип (десериализацией).
Механизм работы с данными в librdkafka и modern-cpp-kafka
Библиотека librdkafka
, будучи написанной на C, использует именно C-шный механизм так называемого стирания типов (type erasure) путем использования функции rd_kafka_produce
, а также связки void
-указателя и длины данных.
RD_EXPORT
int rd_kafka_produce(rd_kafka_topic_t *rkt,
int32_t partition,
int msgflags,
void *payload,
size_t len,
const void *key,
size_t keylen,
void *msg_opaque);
Критика такого подхода стара как сам язык C, однако это единственный доступный в C способ передавать любые данные, нужно лишь их сериализовать. Тем не менее, сама библиотека не предоставляет никаких инструментов для сериализации.
Не решает ее, увы, и библиотека modern-cpp-kafka
. Рассмотрим функцию KafkaProducer::send
для отправки сообщения:
inline void
KafkaProducer::send(const producer::ProducerRecord& record, // Структура с данными (ключ, значение, хедеры, топик, партишен и id)
const producer::Callback& deliveryCb, // Callback-функция
SendOption option, // Опции для отправки
ActionWhileQueueIsFull action); // Опции действия на заполненность очереди (блокировать или нет)
Нам нужен первый тип первого параметра - ProducerRecord
. Его структура (не включая методы) выглядит так:
/**
* A key/value pair to be sent to Kafka.
* This consists of a topic name to which the record is being sent, an optional partition number, and an optional key and value.
* Note: `ProducerRecord` would not take the ownership from the memory block of `Value`.
*/
class ProducerRecord
{
public:
using Id = std::uint64_t;
// конструкторы, геттеры, сеттеры
private:
Topic _topic;
Partition _partition;
Key _key;
Value _value;
Headers _headers;
Optional<Id> _id;
};
Типы Key
и Value
являются type alias-ами на тип ConstBuffer
:
// Which is similar with `boost::const_buffer` (thus avoid the dependency towards `boost`)
class ConstBuffer
{
public:
// побайтовое копирование указателя
explicit ConstBuffer(const void* data = nullptr, std::size_t size = 0): _data(data), _size(size) {}
const void* data() const { return _data; }
std::size_t size() const { return _size; }
std::string toString() const
{
// опустим реализацию
}
private:
const void* _data;
std::size_t _size;
};
Поля класса выглядят как-то знакомо, не правда ли? Да, это именно та связка void-указателя и длины, которая используется и в функции rd_kafka_produce.
Копирование указателей выполняется побайтово, из-за этого мы сталкиваемся с проблемой "висящего" указателя и, как следствие, use-after-free. Именно реализация класса ConstBuffer
делает класс ProducerRecord
невладеющим (и крайне неудобным в нетривиальных случаях), что и сказано прямиков в коментарии в описании класса (для удобства переведенного на русский):
Примечание:
ProducerRecord
не будет принимать права владения блоком памятиValue
.
Как мы видим, что в librdkafka
, что в modern-cpp-kafka
используется невладеющий механизм для передачи или хранения параметров.
Функции Serialize
и Deserialize
будут описаны ниже, в соответствующем пункте, пока же нам важно знать, что Serialize возврашает std::vector<std::byte>
.Уже на этапе формировании данных для отправки возникает проблема с потерей данных:
// Хотим получить готовый объект из сериализованных данных
// но получаем "висящий" указатель - увы и ах!
template <typename T>
ConstBuffer ValueFromNonOwning(const T& value) {
const std::vector<byte> serialized = Serialize(value);
// глубокого копирования не происходит - "висящий" указатель
return ConstBuffer(serialized.data(), serialized.size());
}
// Не спасет тут и умный указатель: ни на локальную переменную serialized
// т.к. данные из умного указателя будут просто скопированы по значению
// вместо создания копии; ни на сам возвращаемый объект ConstBuffer
// (по изначальной причине) - и снова увы и ах!
template <typename T>
ConstBuffer ValueFromNonOwningUniquePtr(const T& value) {
// выделяем память в куче (с shared_ptr та же история, ведь счетчик ссылок равен 0)
const auto serialized = std::make_unique<std::vector<std::byte>>(Serialize(value));
// проблема остается, потому что теперь разрушается сам умный указатель
return ConstBuffer(serialized->data(), serialized->size());
}
Владеющий ConstBuffer
Чтобы починить проблему с "висящим" указателем, мне пришла идея написать тот владеющий буфер, который будет гарантировать сохранение данных на протяжении всей жизни объекта. Так появился OwningBuffer
:
// Владеющий аналог класса ConstBuffer
class OwningBuffer {
public:
explicit OwningBuffer(const void* data = nullptr, const std::size_t size = 0) {
if (data && size > 0) {
m_rawData.assign(static_cast<const std::byte*>(data),
static_cast<const std::byte*>(data) + size);
}
}
explicit OwningBuffer(const ConstBuffer& buffer)
: OwningBuffer(buffer.data(), buffer.size()) {}
// конструктор копирования для создания из сырых данных
explicit OwningBuffer(const std::vector<std::byte>& bytes) : m_rawData(bytes) {}
// конструктор перемещения для создания из сырых данных
explicit OwningBuffer(std::vector<std::byte>&& bytes) noexcept
: m_rawData(std::move(bytes)) {}
[[nodiscard]] const void* data() const { return m_rawData.data(); }
[[nodiscard]] std::size_t size() const { return m_rawData.size(); }
[[nodiscard]] std::string toString() const {
// так как создание ConstBuffer не содержит сложных операций,
// нет и оверхеда на создание временного объекта
const ConstBuffer buffer(m_rawData.data(), m_rawData.size());
return buffer.toString();
}
// получение всегда валидного объекта ConstBuffer
[[nodiscard]] ConstBuffer asConstBuffer() const {
return ConstBuffer(m_rawData.data(), m_rawData.size());
}
private:
// сырые данные объекта
std::vector<std::byte> m_rawData;
};
// Такой вариант функции работает корректно, никаких "висящих" указателей
template <typename T>
OwningBuffer ValueFrom(const T& value) {
const auto serialized = Serialize(value);
return OwningBuffer(serialized);
}
Интерфейс класса аналогичен интерфейсу ConstBuffer
, но содержит некоторые дополнения в виде новых конструкторов для создания объекта из сырых данных, а также вспомогательного метода для получения валидного объекта ConstBuffer
. Владеющий подход полностью решает проблему с "висящими" указателями и use-after-free, и теперь можно перейти к сериализации и десериализации.
Сериализация и десериализация
Теперь, решив проблему с владением, можно перейти к самому главному - сериализации и десериализации данных. Несмотря на обилие библиотек, которые предоставляют такую функциональность, я не буду их использовать, чтобы не перегружать статью лишними зависимостями и усложнять материал разбором доступных средств для сериализации. Поэтому мы будем использовать старый добрый std::byte
из C++17 в еще более старом std::vector
и reinterpret_cast
.
#include <cstring>
#include <stdexcept>
#include <type_traits>
#include <vector>
template <typename T>
std::vector<std::byte> Serialize(const T& value) {
static_assert(std::is_trivially_copyable_v<T>, "Type must be trivially copyable");
const auto begin = reinterpret_cast<const std::byte*>(&value);
return {begin, begin + sizeof(T)};
}
template <typename T>
T Deserialize(const std::vector<std::byte>& serializedData) {
static_assert(std::is_trivially_copyable_v<T>, "Type must be trivially copyable");
static_assert(std::is_default_constructible_v<T>, "Type must be default constructible");
// примечание: проверяется только размер объекта.
// никто не мешает записать в int32_t содержимое std::string размером 5 байт
if (serializedData.size() != sizeof(T)) {
throw std::runtime_error("Serialized data size does not match target type size");
}
T value;
std::memcpy(&value, serializedData.data(), sizeof(T));
return value;
}
// специализация для std::string
template <>
inline std::vector<std::byte> Serialize<std::string>(const std::string& value) {
auto begin = reinterpret_cast<const std::byte*>(value.data());
return {begin, begin + value.size()};
}
// специализация для std::string
template <>
inline std::string Deserialize<std::string>(
const std::vector<std::byte>& serializedData) {
return {reinterpret_cast<const char*>(serializedData.data()), serializedData.size()};
}
В static_assert
-ах, по большому счету, нет такой необходимости, ведь код и так и так не скомпилируется, если что-то пойдет не так, но всегда приятнее видеть красивые сообщения об ошибках, верно? Сеханизм сериализации банален: берем адрес объекта и прибавляем его размер, таким образом "охватывая" все содержимое объекта, на выходе получаем вектор байтов, то есть сырые данные объекта.
С десериализацией все немного сложнее (и не так гладко). Здесь мы используем функцию memcpy, чтобы заполнить созданный при помощи конструктора по умолчанию объект переданными сырыми байтами. Также нужно иметь в виду, что проверяются только размеры десериализуемого объекта, то есть можно записать объект, например, std::string
в объект типа float
и получить бессмыслицу.
Остается лишь создать обертки для сериализации и десериализации. В этом нам помогут функции ValueTo
и ValueFrom
:
// сериализация
template <typename T>
OwningBuffer ValueFrom(const T& value) {
const auto serialized = Serialize(value);
return OwningBuffer(serialized);
}
// десериализация
template <typename T>
T ValueTo(const Value& value) {
if (!value.data()) {
throw std::runtime_error("Received empty value");
}
// преобразуем void* к const byte* и "захватываем" все значения
const std::vector<std::byte> serializedData(
static_cast<const std::byte*>(value.data()),
static_cast<const std::byte*>(value.data()) + value.size());
// возвращаем объект, полученный при помощи десериализации
return Deserialize<T>(serializedData);
}
Демонстрация работы
В качестве примера рассмотрим классическую модель Single Producer - SIngle Consumer. Код Producer-а:
#include <kafka/KafkaProducer.h>
#include <iostream>
#include <string>
#include "KafkaUtils.h"
using namespace kafka::clients::producer;
void SendValues(const kafka::Topic& topic, KafkaProducer& producer) {
// Prepare delivery callback
auto deliveryCb = [](const RecordMetadata& metadata, const kafka::Error& error) {
if (!error) {
std::cout << "Message delivered: " << metadata.toString() << std::endl;
} else {
std::cerr << "Message failed to be delivered: " << error.message() << std::endl;
}
};
{
std::cout << "Sending messages with FLOAT type" << std::endl;
constexpr float f = 2.34;
kafka::extensions::SendValue(producer, topic, kafka::NullKey,
kafka::extensions::ValueFrom(f).AsConstBuffer(),
deliveryCb);
}
{
std::cout << "Sending messages with STRINGS type" << std::endl;
const auto values = std::vector<std::string>{"amogus", "breakpoint", "cappa",
"delta", "extension", "final"};
for (const auto& value : values) {
kafka::extensions::SendValue(producer, topic, kafka::NullKey,
kafka::extensions::ValueFrom(value).AsConstBuffer(),
deliveryCb);
}
}
}
void DoProducerWork() {
// взято из конфигурации Docker Compose
const std::string brokers = "localhost:29092";
const kafka::Topic topic = "test-topic";
// Prepare the configuration
kafka::Properties props;
props.put("bootstrap.servers", brokers);
// Create a producer
KafkaProducer producer(props);
std::println(std::cout, "SendValues function");
SendValues(topic, producer);
}
int main() {
try {
DoProducerWork();
} catch (const std::exception& e) {
std::cerr << "Error: " << e.what() << std::endl;
return 1;
}
return 0;
}
Код Consumer-а:
#include <KafkaUtils.h>
#include <kafka/KafkaConsumer.h>
#include <csignal>
#include <iostream>
#include <string>
std::atomic_bool running = {true};
void StopRunning(int sig) {
if (sig != SIGINT) return;
if (running) {
running = false;
} else {
// Restore the signal handler, -- to avoid stuck with this handler
signal(SIGINT, SIG_IGN); // NOLINT
}
}
void DoConsumerWork() {
using namespace kafka;
using namespace kafka::clients::consumer;
const std::string brokers = "localhost:29092";
const Topic topic = "test-topic";
// Prepare the configuration
Properties props;
props.put("bootstrap.servers", brokers);
// Create a consumer instance
KafkaConsumer consumer(props);
// Subscribe to topics
consumer.subscribe({topic});
while (running) {
// Poll messages from Kafka brokers
for (const auto records = consumer.poll(std::chrono::milliseconds(100));
const auto& record : records) {
if (record.error()) {
std::cerr << record.toString() << std::endl;
continue;
}
std::cout << "Got a new message..." << std::endl;
std::cout << " Topic : " << record.topic() << std::endl;
std::cout << " Partition: " << record.partition() << std::endl;
std::cout << " Offset : " << record.offset() << std::endl;
std::cout << " Timestamp: " << record.timestamp().toString() << std::endl;
std::cout << " Headers : " << toString(record.headers()) << std::endl;
try {
const auto stringValue = kafka::extensions::ValueTo<std::string>(record.value());
std::cout << " STRING [" << stringValue << "]" << std::endl;
} catch (const std::exception& e) {
std::cerr << " Failed to deserialize as string: " << e.what() << std::endl;
}
try {
const auto floatValue = kafka::extensions::ValueTo<float>(record.value());
std::cout << " FLOAT [" << floatValue << "]" << std::endl;
} catch (const std::exception& e) {
std::cerr << " Failed to deserialize as float: " << e.what() << std::endl;
}
}
}
}
int main() {
// Use Ctrl-C to terminate the program
signal(SIGINT, StopRunning); // NOLINT
DoConsumerWork();
return 0;
}
Заключение
В этой статье я постарался совместить сразу несколько вещей: рассказать про Apache Kafka, про хранение данных внутри, про наличие определенных проблем и подходы к их решению. Написание текстов (помимо курсовых и дипломной работ) для меня в новинку, поэтому призываю каждого, кто прочитал, поделиться мыслями - с удовольствием почитаю и отвечу.
Демонстрацию сериализации и десериализации прямо в браузере можно посмотреть на сайте godbolt. Исходный код с настройкой Apache Kafka в Docker Compose находится в моем github-репозитории.