Как стать автором
Обновить

Десктопный клиент для Apache Kafka, преобразуем protobuf в json

Время на прочтение5 мин
Количество просмотров2.8K

Продолжаю делать пилить свой petproject. Что нового с прошлой публикацией:

  • запись сообщений в кафку;

  • создание/удаление топиков;

  • бинарные сборки для OSX и Windows.

Сейчас подошел к тому ради чего все это затевалось: декодирование protobuf без schema registry и кодогенерации.

Чем же неудобен protobuf?

Если опустить его бинарную природу, то он позволяет писать так

//order.proto
syntax = "proto3";

package order;

import "enums.proto";
import "google/protobuf/timestamp.proto";

message EventOrderEnrichment {
	string shipment_uuid = 1;
	string order_uuid = 2;
	string place_uuid = 3;
	enums.ShipmentStatus shipment_status = 4;
	string shipment_type = 5;
	uint64 weight = 6;
	enums.Location client_location = 7;
	enums.Location place_location = 8;
	uint64 assembly_time_min = 9;
	repeated string assembly = 10;
	repeated string delivery = 11;
	optional DispatchMeta dispatch_meta = 12;
	optional Settings settings = 13;
}

message Settings {
	uint64 max_order_assign_retry_count = 1;
	uint64 avg_parking_min_vehicle = 2;
	uint64 max_current_order_assign_queue = 3;
	fixed64 order_weight_threshold_to_assign_to_vehicle_gramms = 4;
	uint64 average_speed_for_straight_distance_to_client_min = 5;
	uint64 additional_factor_for_straight_distance_to_client_min = 6;
	uint64 order_transfer_time_from_assembly_to_delivery_min = 7;
	uint64 avg_to_place_min_external = 8;
	uint64 avg_to_place_min = 9;
	bool place_location_center = 10;
	uint64 search_radius_transport_pedestrian = 11;
	uint64 search_radius_transport_auto = 12;
	uint64 search_radius_transport_bike = 13;
	uint64 last_position_expire = 14;
}

message DispatchMeta {
	uint64 dispatch_count = 1;
	google.protobuf.Timestamp dispatch_start = 2;
	repeated string dispatch_ids = 3;
	string dispatch_id = 4;
	optional Tasks decline_task = 5;
	optional string decline_performer_uuid = 6;
}

enum Tasks {
	DELIVERY = 0;
	ASSEMBLY = 1;
	ASSEMBLY_AND_DELIVERY = 2;
}

message Location {
	double latitude = 1;
	double longitude = 2;
}
// enums.proto
syntax = "proto3";

package enums;

enum ShipmentStatus {
	NEW = 0;
	POSTPONED = 1;
	AUTOMATIC_ROUTING = 2;
	MANUAL_ROUTING = 3;
	OFFERING = 4;
	OFFERED = 5;
	DECLINED = 6;
	CANCELED = 7;
}

message Location {
	double latitude = 1;
	double longitude = 2;
}

Если вы хотите декодировать protobuf, то нужно указать:

  • где найти фай proto файлы(order.proto, enums.proto, timestamp.proto);

  • тип сообщения.

Реализация на C++

Какие классы из C++ API потребуются

В Google не используют исключения, поэтому ошибки парсинга proto файла будем ловить наследником от MultiFileErrorCollector

class ProtobufErrorCollector final : public google::protobuf::compiler::MultiFileErrorCollector
{
public:
    void AddError(const std::string &filename,
                  int line,
                  int column,
                  const std::string &message) override;

    void AddWarning(const std::string &filename,
                    int line,
                    int column,
                    const std::string &message) override;

    QStringList errors() const;

    bool hasErrors() const;

private:
    QStringList m_messages;
};
/// 
void ProtobufErrorCollector::AddError(const std::string &filename,
                                      int line,
                                      int column,
                                      const std::string &message)
{
    m_messages << QString("error file: %1, line: %2, column: %3 %4")
                      .arg(QString::fromStdString(filename))
                      .arg(line)
                      .arg(column)
                      .arg(QString::fromStdString(message));
}

void ProtobufErrorCollector::AddWarning(const std::string &filename,
                                        int line,
                                        int column,
                                        const std::string &message)
{
    m_messages << QString("warning file: %1, line: %2, column: %3 %4")
                      .arg(QString::fromStdString(filename))
                      .arg(line)
                      .arg(column)
                      .arg(QString::fromStdString(message));
}

QStringList ProtobufErrorCollector::errors() const
{
    return m_messages;
}

bool ProtobufErrorCollector::hasErrors() const
{
    return !m_messages.isEmpty();
}

SourceTree это абстрактное дерево каталогов. Его наследник DiskSourceTree позволяет нам класть proto файлы в структуру каталогов

dir/
  order.proto
  enums.proto
  google/
    protobuf/
      timestamp.proto

Каждый раз раскладывать proto файлы от Google неудобно. Поэтому было принято решение таскать эти файлы в самом бинарнике. Так появился ProtobufSourceTree

class ProtobufSourceTree final : public google::protobuf::compiler::DiskSourceTree
{
public:
    google::protobuf::io::ZeroCopyInputStream *Open(const std::string &filename) override;

    void Add(const QDir &dir);

private:
    static google::protobuf::io::ZeroCopyInputStream *openFromResources(const std::string &filename);
};
///
class ByteArrayInputStream final : public google::protobuf::io::ArrayInputStream
{
public:
    explicit ByteArrayInputStream(QByteArray &&data)
        : ArrayInputStream(data.data(), data.size())
        , m_data(std::move(data))
    {}

private:
    QByteArray m_data;
};


google::protobuf::io::ZeroCopyInputStream *ProtobufSourceTree::Open(const std::string &filename)
{
    static QSet<std::string> inResources = {"google/protobuf/any.proto",
                                            "google/protobuf/api.proto",
                                            "google/protobuf/descriptor.proto",
                                            "google/protobuf/duration.proto",
                                            "google/protobuf/empty.proto",
                                            "google/protobuf/field_mask.proto",
                                            "google/protobuf/source_context.proto",
                                            "google/protobuf/struct.proto",
                                            "google/protobuf/timestamp.proto",
                                            "google/protobuf/type.proto",
                                            "google/protobuf/wrappers.proto"};

    if (inResources.contains(filename)) {
        return openFromResources(filename);
    }

    return DiskSourceTree::Open(filename);
}

google::protobuf::io::ZeroCopyInputStream *ProtobufSourceTree::openFromResources(
    const std::string &filename)
{
    using namespace google::protobuf::io;

    QString path(QString(":/%1").arg(QString::fromStdString(filename)));
    QFile file(path);
    if (!file.open(QIODevice::ReadOnly)) {
        spdlog::error("failed open file {} from resources error {}",
                      path.toStdString(),
                      file.errorString().toStdString());
        return nullptr;
    }
    auto data = file.readAll();
    file.close();

    return new ByteArrayInputStream(std::move(data));
}

void ProtobufSourceTree::Add(const QDir &dir)
{
    QString path = dir.path();
#ifdef Q_OS_WINDOWS
    if (path.front() == '/') {
        path = path.remove(0, 1);
    }
#endif
    MapPath("", path.toStdString());
}

ByteArrayInputStream откровенный костыль, за то не нужно реализовывать все методы ZeroCopyInputStream. Тут стоит обратить внимание на вызов DiskSourceTree::MapPath. Первый параметр пустой, что заставляет второй параметр интерпретировать как путь к каталогу

void DiskSourceTree::MapPath(
        const std::string & virtual_path,
        const std::string & disk_path)

Парсим proto файл и получаем список типов, который выведем в UI

    using namespace google::protobuf;
    using namespace google::protobuf::compiler;

    QFileInfo info(m_file.path());

    ProtobufErrorCollector errors;
    ProtobufSourceTree sources;
    sources.Add(info.dir());

    SourceTreeDescriptorDatabase database(&sources, nullptr);
    database.RecordErrorsTo(&errors);

    DescriptorPool pool(&database, database.GetValidationErrorCollector());
    pool.EnforceWeakDependencies(true);

    const auto *const fileDescriptor = pool.FindFileByName(info.fileName().toStdString());
    // обработка ошибок

    beginResetModel();
    m_messages.clear();
    for (int i = 0; i < fileDescriptor->message_type_count(); i++) {
        m_messages << QString::fromStdString(fileDescriptor->message_type(i)->name());
    }
    endResetModel();

Собираем Message. Обратите внимание, что имя включает в себя имя пакета

    const auto package = fileDescriptor->package();
    const auto messageType = package + "." + message.toStdString();

    const auto *const typeDescriptor = pool->FindMessageTypeByName(messageType);
    // обработка ошибок
    auto factory = std::make_unique<DynamicMessageFactory>(pool.get());
    auto *dynamicMessage = factory->GetPrototype(typeDescriptor)->New();

Само преобразование

QByteArray ProtobufConverter::toJSON(QByteArray &&binary)
{
    using namespace google::protobuf::util;

    m_message->Clear();
    if (!m_message->ParseFromArray(binary.data(), binary.size())) {
        return errParse;
    }

    JsonPrintOptions opt;
    std::string json;
    MessageToJsonString(*m_message, &json, opt);
    return QByteArray(json.c_str(), json.size());
}

На этом все. Весь код доступен на GitHub, бинарные сборки на странице релизов

Теги:
Хабы:
Если эта публикация вас вдохновила и вы хотите поддержать автора — не стесняйтесь нажать на кнопку
Рейтинг0
Комментарии0

Публикации

Истории

Работа

Программист C++
136 вакансий
QT разработчик
8 вакансий

Ближайшие события