Существует спецификация протокола JSON-RPC JSON-RPC Specification. При поиске мне не удалось быстро найти реализацию этого протокола для языков C/C++, и, поскольку реализации не было, пришлось её написать. Данная статья описывает библиотеку muRPC для создания сервера и клиента для протокола JSON-RPC. Реализация имеет примеры и юнит-тесты, чтобы проверить функциональность и производительность. Обычный режим работы предполагает, что один из клиентов JSON-RPC предоставляет какие-то методы и сообщает об этом серверу. Тогда другие клиенты JSON-RPC могут эти методы вызывать и получать ответ. Сервер предоставляет маршрутизацию и валидацию сообщений между клиентами. В качестве примера работы в примерах находится код клиента, который делает вычисления на CUDA-ядрах и возвращает результат тому, кто его запросил. Область применения — это любые системы, где требуется обмен сообщениями, удобно соединяемый с вызовом программных функций. Сервер получился производительным и масштабируемым по ядрам CPU практически линейно.
Код проекта доступен по адресу
О протоколе JSON-RPC
JSON-RPC — это протокол удалённого вызова процедур (Remote Procedure Call, RPC), использующий JSON (JavaScript Object Notation) в качестве формата обмена данными.
Протокол позволяет клиенту вызывать методы на сервере так, как если бы они выполнялись локально, но при этом использует JSON для сериализации данных и, например,
TCP в качестве транспортного протокола.
Стандарт протокола
JSON-RPC 2.0 — это официальный стандарт протокола, опубликованный в 2010 году. Он определяет формат сообщений, структуру запросов, ответов и уведомлений. Основные элементы протокола:
jsonrpc: Версия протокола (должна быть "2.0")method: Имя вызываемого методаparams: Параметры вызова (массив или объект)id: Идентификатор запроса (для сопоставления ответов с запросами)result: Результат успешного вызоваerror: Описание ошибки при неудачном вызове
Пример вызова метода:
{ "jsonrpc": "2.0", "method": "subtract", "params": [42, 23], "id": 1 }
Пример ответа:
{ "jsonrpc": "2.0", "result": 19, "id": 1 }
Полная спецификация протокола приведена на страничке https://www.jsonrpc.org/specification
Маппинг вызовов JSON-RPC на C-подобные функции
Вызовы JSON-RPC маппятся на C-подобные функции следующим образом: параметры, переданные в JSON-объекте, преобразуются в аргументы целевой функции, а результат выполнения функции оборачивается в JSON-ответ. Такая схема позволяет легко интегрировать существующие C/C++ функции в RPC-интерфейс.
Вызов из примера выше:
{"jsonrpc": "2.0", "method": "subtract", "params": [42, 23], "id": 1}
Вызывает функцию с именем subtract и параметрами 42 и 23. На выходе возвращает результат:
int subtract(int a, int b) { return a - b; }
Архитектура muRPC
muRPC — это легковесная библиотека RPC на C++, обеспечивающая функциональность JSON-RPC. Она предоставляет разработчикам простой способ создания
клиент-серверных приложений с использованием вызовов методов по сети с использованием формата JSON.
Компоненты системы
Сервер: Основной компонент, занимающийся маршрутизацией сообщений между клиентами
Клиент: Предоставляет функциональность для выполнения RPC-вызовов к серверу, регистрации методов, прослушивания и отправки уведомлений.
Серверная часть
muRPC — это библиотека, которая позволяет создать сервер или клиента. Сервер muRPC реализован для поддержки большого количества одновременных соединений. Сервер в muRPC действует как посредник, позволяя клиентам регистрировать свои методы и вызывать методы других клиентов. Важно отметить, что сервер, как правило, осуществляет маршрутизацию сообщений между клиентами, а не предоставление собственных методов. Хотя muRPC также поддерживает создание собственных методов на сервере. Это позволяет строить децентрализованные системы, где каждый клиент может предоставлять свои сервисы другим клиентам через сервер.
Ключевые компоненты сервера:
Менеджер соединений
Реестр клиентов и их зарегистрированных методов
Система маршрутизации запросов между клиентами
Система уведомлений (публикация/подписка)
Механизмы управления сессиями и таймаутами
Поддержание статистики по соединениям и обработанным сообщениям
Отслеживание статуса клиентов и их зарегистрированных методов
Логирование всех операций для отладки и мониторинга
Обеспечение централизованного контроля над системой
Парсинг и валидация JSON-сообщений
Клиентская часть
Клиентская сторона muRPC пред��ставляет API для выполнения и предоставления удаленных вызовов. Она поддерживает как синхронные, так и асинхронные вызовы,
регистрацию собственных методов, уведомления и подписку на события других клиентов.
Режимы вызовов
muRPC поддерживает два режима выполнения вызовов:
Асинхронные вызовы - основной режим работы:
Используется метод
call()в C++ и PythonВызов не блокирует выполнение программы
Результат обрабатывается через callback-функцию
Подходит для высокопроизводительных приложений
Синхронные вызовы - вспомогательный режим:
Используется метод
call_sync()в C++ и PythonВызов блокирует выполнение до получения результата
Подходит для простых сценариев и инициализации
Клиент может:
Выполнять вызовы методов других клиентов (в обоих режимах)
Предоставлять свои собственные методы для вызова другими клиентами
Подписываться и отписываться от событий (уведомлений)
Отправлять уведомления другим клиентам
Регистрировать собственные методы для вызова другими клиентами
Использование cepoll в muRPC
Первую версию я написал с использованием boost::asio::read_until. В целом она работала неплохо, но нужно было поддерживать разделители между JSON-сообщениями, состоящие из нескольких символов, и мало кто понимал, как внутри работает boost::asio ввиду объёмности исходного кода. Была попытка минимизировать boost::asio, но вся используемая цепочка заголовков C++ занимала около 5 Мб. Ну и, честно говоря, хотелось полного
контроля над системой низкого уровня. Поэтому было решено переписать boost::asio::read_until на чистом C и использовать его. А поскольку основным языком является C, то название cepoll показалось мне подходящим.
cepoll — это библиотека, которая является частью этого проекта и обеспечивает многопоточный I/O на высокой скорости. Данная библиотека реализует системный вызов epoll примерно так же, как это делает nginx. При этом используется EPOLLEXCLUSIVE-распределение с привязкой соединений к потокам-обработчикам ( или воркерами от английского слова worker).
Библиотека cepoll обеспечивает:
Многопоточную обработку событий
Поддержку разделителей сообщений
Управление соединениями и таймаутами
CPU affinity для оптимизации производительности
Nginx-��одобную архитектуру с мастер-воркерами
Простую реализацию для лучшего понимания и отладки
Nginx-подобная архитектура cepoll:
Библиотека cepoll реализует архитектуру, аналогичную Nginx, с использованием мастер-воркеров:
Мастер-процесс: Принимает новые соединения и распределяет их между воркерами
Воркер-процессы: Обрабатывают I/O-операции с использованием epoll
Разделение нагрузки: Новые клиенты распределяются между воркерами по алгоритму round-robin
EPOLLEXCLUSIVE: Используется для более равномерного распределения событий между воркерами
Распределение информации по потокам:
При поступлении нового соединения:
Мастер-процесс принимает соединение
Соединение назначается одному из воркеров (по алгоритму round-robin)
Файловый дескриптор добавляется в epoll-инстанс назначенного воркера
Все последующие операции с этим соединением обрабатываются только назначенным воркером
Каждый воркер имеет свой собственный epoll-инстанс и обрабатывает только свои соединения. Это позволяет эффективно распределять нагрузку между несколькими потоками и использовать все доступные ядра процессора.
Поддержка разделителей сообщений:
Библиотека cepoll реализует функцию ce_read_until, которая позволяет читать данные до определённого разделителя, что особенно полезно для протоколов, основанных на текстовых сообщениях, таких как JSON-RPC. Эта функция эффективно обрабатывает буферизированные данные и ищет указанный разделитель, позволяя корректно разделять сообщения даже при частичном получении данных.
Управление буферами и обработка сообщений:
Библиотека cepoll управляет буферами для каждого соединения, позволяя обрабатывать большие сообщения и множественные сообщения в одном пакете.
При получении данных:
Данные помещаются в буфер соединения
Выполняется поиск разделителя в буфере
При нахождении разделителя вызывается соответствующий обработчик
Оставшиеся данные остаются в буфере для последующей обработки
Обработка ошибок и таймаутов:
cepoll включает механизмы обработки ошибок и таймаутов:
Обнаружение разрыва соединения
Управление таймаутами для неактивных соединений
Защита от атак типа "отказ в обслуживании" через ограничение размера сообщений
Корректная очистка ресурсов при закрытии соединений
Примеры использования muRPC
Рассмотрим примеры использования muRPC для создания системы с несколькими клиентами, взаимодействующими через сервер:
// Пример сервера #include <muRPC/server.hpp> int main() { // Создание сервера с конфигурацией std::string config = R"({ "port": 8080, "thread_count": 4, "keepalive_timeout_ms": 5000, "max_message_size": 4096 })"; muRPC::server srv(config); // Сервер осуществляет маршрутизацию, а не предоставление методов // Методы регистрируются клиентами return 0; }
// Пример клиента 1 — регистрирует метод #include <muRPC/client.hpp> int main() { std::string config = R"({ "host": "localhost", "port": 8080 })"; muRPC::Client cli(config); // Регистрация клиента auto reg_result = cli.register_client("client1"); if (!reg_result.contains("result")) { std::cout << "Failed to register client: " << muRPC::serialize_json(reg_result["error"]) << std::endl; return 1; } // Регистрация метода на клиенте cli.register_method("add", [](const muRPC::Json& params) { int a = static_cast<int>(params[0].get_number()); int b = static_cast<int>(params[1].get_number()); return a + b; }); return 0; }
// Пример клиента 2 — вызывает метод у другого клиента #include <muRPC/client.hpp> int main() { std::string config = R"({ "host": "localhost", "port": 8080 })"; muRPC::Client cli(config); // Регистрация клиента auto reg_result = cli.register_client("client2"); if (!reg_result.contains("result")) { std::cout << "Failed to register client: " << muRPC::serialize_json(reg_result["error"]) << std::endl; return 1; } // Подготовка параметров для вызова muRPC::Json params = muRPC::Json::array(); params.set(0, static_cast<int64_t>(5)); params.set(1, static_cast<int64_t>(3)); // Вызов метода, зарегистрированного другим клиентом auto result = cli.call_sync("add", params); std::cout << "Result: " << result.as<int>() << std::endl; return 0; }
TCP транспорт в muRPC
Текущая версия muRPC использует TCP протокол в качестве транспортного уровня. Между клиентом и сервером открывается одно TCP соединение в котором осуществляется вся коммуникация.
Клиент отправляет на сервер запросы в виде сообщений упакованных в JSON формат:
{"jsonrpc": "2.0", "method": "keepAlive" }\n{"jsonrpc": "2.0", "method": "subtract", "params": [42, 23], "id": 1}\n
И получает ответы от сервера тоже в JSON формате:
{ "jsonrpc": "2.0", "result": 19, "id": 1 }
Протокол TCP обеспечивает надёжную и упорядоченную доставку данных: байты, записанные в сокет на одной стороне, будут получены на удалённой стороне в том же порядке, без потерь и дублирования. Однако TCP работает с потоком байтов, а не с отдельными «сообщениями», поэтому приложения должны сами разделять логические сообщения (например, с помощью длины или разделителей). При условии, что каждое сообщение записывается целиком и последовательно (например, из одного потока), его содержимое не перемешается с другими сообщениями. Благодаря гарантии порядка доставки TCP, приложению не нужно самостоятельно обрабатывать переупорядочивание пакетов. Для сопоставления ответа с запросом используется поле id в формате JSON.
Интересно что при отправках сообщений меньше длины стандартного пакета (1500 байт сообщение + TCP заголовки) сообщение передастся в виде одного сетевого пакета. По этому сжимать или использовать бинарный протокол особо смысла не имеет. Если нужно передать большой объем информации, то сжимать стоит именно payload. А вот в плюс можно записать удобное средство отладки - tcpdump видит протокольные запросы ответы.
Таймауты в muRPC
muRPC предоставляет гибкие возможности настройки таймаутов для различных сценариев использования. Это позволяет адаптировать поведение системы под конкретные
требования производительности и надежности.
Типы таймаутов
В muRPC реализованы следующие типы таймаутов:
Таймаут вызова (call timeout): Определяет максимальное время ожидания ответа на RPC-вызов. Если ответ не получен в течение
указанного времени, вызов считается неудавшимся и генерируется ошибка таймаута.Таймаут keepalive: Определяет максимальное время бездействия соединения до его автоматического закрытия.
Это помогает освобождать ресурсы, занятые неактивными соединениями.Таймаут регистрации: Определяет максимальное время, за которое клиент должен зарегистрироваться на сервере после
установки соединения. Если клиент не регистрируется в течение этого времени, соединение может быть закрыто.
Конфигурация таймаутов
Таймауты можно настроить как на стороне сервера, так и на стороне клиента:
Настройка таймаутов на сервере
На сервере можно задать таймауты в конфигурационном файле или при создании экземпляра сервера:
std::string config = R"({ "port": 8080, "thread_count": 4, "keepalive_timeout_ms": 5000, // 5 секунд таймаута для бездействующих соединений "registration_timeout_ms": 10000, // 10 секунд на регистрацию клиента "max_message_size": 4096 })"; muRPC::server srv(config);
Настройка таймаутов на клиенте
На клиенте можно задать таймауты в конфигурации:
std::string config = R"({ "host": "localhost", "port": 8080, "client_id": "client1", "call_timeout_ms": 30000 // 30 секунд таймаута для вызовов })"; muRPC::client cli(config);
Python-интерфейс
В репозитории проекта muRPC есть клиентский Python-интерфейс. Python-интерфейс медленнее, чем C++, но реализует все возможности C++-версии.
Python-клиент реализован в модуле murpc_client.py и включает в себя:
Поддержку синхронных и асинхронных вызовов
Управление соединениями через пул соединений
Обработку таймаутов и потери соединений
Поддержку регистрации методов, уведомлений и подписок
Код python клиента был сгенерирован из C++ кода, он полностью рабочий, но возможно для реального использования
потребуется его чистка.
Пример использования Python-клиента:
from murpc_client import MuRPCClient # Конфигурация клиента config = { "host": "127.0.0.1", "port": 8080, "client_id": "python_example_client", "call_timeout_ms": 10000 } # Создание клиента client = MuRPCClient(config) # Регистрация метода def hello_handler(params): name = params.get("name", "World") return {"message": f"Hello, {name}!"} client.register_method("hello", hello_handler) # Подготовка параметров для вызова params = {"param": "value"} # Асинхронный вызов def callback(response): print(f"Response: {response}") client.call("some_method", params, callback) # Подготовка параметров для вызова params = {} # Синхронный вызов response = client.call_sync("get_server_stats", params) print(f"Server stats: {response}")
Производительность muRPC
Для тестирования производительности muRPC была разработана утилита perf_test, которая измеряет основные метрики:
PS (запросов в секунду) и пропускную способность канала (скорость обмена сообщениями). Вот значения при запуске
на моем ноутбуке 13th Gen Intel(R) Core(TM) i9-13900H на 12 thread_count в конфиге сервера без привязки к CPU:
Максимальная производительность
При тестировании с 10 соединениями и малым payload (50 байт) достигнута максимальная производительность:
RPS: 24 000 запросов в секунду
Пропускная способность: 33 Mbps (исходящий трафик)
При работе с большими payload (102400 байт) система демонстрирует высокую пропускную способность:
RPS: 1 700 запросов в секунду (при 50 соединениях)
Пропускная способность: 1 400 Mbps (исходящий трафик)
Нагрузка по воркерам на сервере масштабирается практически горизонтально и воркеры друг друга не блокируют.
Основная нагрузка сервера это как ра�� библиотека RapidJSON как видно на perf трейсе:
27,94% libmuRPC.so [.] void rapidjson::GenericReader<rapidjson::UTF8<char>, rapidjson::UTF8<char>, rapidjson::CrtAllocator>::ParseStringToStream<0u, rapidjson::UTF8<char>, rapidjson::UTF 18,67% libmuRPC.so [.] rapidjson::Writer<rapidjson::GenericStringBuffer<rapidjson::UTF8<char>, rapidjson::CrtAllocator>, rapidjson::UTF8<char>, rapidjson::UTF8<char>, rapidjson::CrtAlloc 12,53% libmuRPC.so [.] ce_read_until 9,94% libc.so.6 [.] __memmove_avx_unaligned_erms 2,25% [kernel] [k] sync_regs 1,71% [kernel] [k] _copy_to_iter 1,41% [kernel] [k] native_irq_return_iret 1,13% [kernel] [k] clear_page_erms 0,71% [kernel] [k] __mod_memcg_lruvec_state 0,63% [kernel] [k] __pte_offset_map_lock 0,59% [kernel] [k] __count_memcg_events 0,55% libc.so.6 [.] __strlen_avx2 0,49% libc.so.6 [.] _int_malloc 0,46% [kernel] [k] lru_gen_add_folio 0,46% [kernel] [k] _copy_from_iter
Почему именно RapidJSON?
Первый эксперимент был с libjsoncpp. Однако его скорость парсинга не показалась мне высокой. Альтернативы были nlohmann - так как это фактически один include файл или что-то еще. На github странице nlohman я нашёл интересную ссылку - "Speed. There are certainly faster JSON libraries out there." (Speed comparison of JSON libraries) на сравнение скорости работы разных JSON библиотек. Чтобы их померять на реальных тестах я решил поддерживать разные варианты. В репозитории сейчас существует RapidJSON json_adapter_rapidjson.cpp) и Nlohmann(json_adapter_nlohmann.cpp). Самым быстрым оказалась связка RapidJson и tcmalloc.
Лицензия
Данный проект muRPC и cepoll распространяется под лицензией MIT, которая позволяет свободно использовать, копировать, изменять и распространять программное обеспечение, при условии сохранения уведомления об авторских правах и отказе от гарантий.
Лицензии зависимостей
nlohmann/json: Библиотека для работы с JSON, лицензия MIT
RapidJSON: Альтернативная библиотека для работы с JSON, лицензия MIT
Также существует коммерческая версия muRPC с расширенными возможностями. За дополнительной информацией обращайтесь по адресу info@skbuff.ru.
Заключение
muRPC представляет собой мощную и гибкую библиотеку для реализации RPC-коммуникации с использованием JSON-RPC. Особенность архитектуры заключается в том,
что сервер осуществляет маршрутизацию сообщений между клиентами, а не предоставление собственных методов. Это позволяет строить децентрализованные системы, где каждый клиент может быть как потребителем, так и поставщиком сервисов.
Сочетание асинхронной обработки с использованием cepoll делает muRPC особенно подходящей для высоконагруженных приложений с большим количеством клиентов.
Возможные улучшения и будущие направления развития
Перспективы развития muRPC включают в себя несколько ключевых направлений, каждое из которых направлено на расширение функциональности, повышение безопасности
и производительности системы.
Одним из важных направлений является расширение поддержки транспортных протоколов. В настоящее время muRPC использует TCP в качестве основного транспорта,
но планируется добавить поддержку других протоколов, таких как HTTP и UDP. Это позволит использовать библиотеку в более широком спектре приложений, включая те, где важна низкая задержка (UDP) или совместимость с веб-стандартами (HTTP). Такая гибкость в выборе транспорта сделает muRPC еще более универсальным решением для различных сценариев использования.
Безопасность также остается приоритетом в планах по развитию проекта. В настоящее время рассматриваются возможности для внедрения механизмов аутентификации
и шифрования. Уже существует тестовый патч для поддержки TLS-соединений с использованием сертификатов, что позволит обеспечить защищенную передачу данных
между клиентами и сервером. Это особенно важно для приложений, обрабатывающих чувствительную информацию или работающих в ненадежных сетевых средах.
Еще одним важным направлением является расширение возможностей мониторинга и диагностики. Планируется добавить более подробные метрики производительности,
инструменты для профилирования и отладки, а также улучшить логирование. Это поможет разработчикам легче выявлять и устранять проблемы, а также лучше понимать
поведение системы в продакшене.
Что касается производительности, то в будущем планируется исследование возможностей для дальнейшей оптимизации. Одной из перспективных областей является
использование SIMD (Single Instruction, Multiple Data) для ускорения парсинга JSON. Это может значительно повысить производительность при обработке большого
объема данных, особенно на современных процессорах, поддерживающих соответствующие инструкции.
Все эти улучшения направлены на то, чтобы сделать muRPC еще более надежной, безопасной и производительной библиотекой для построения распределенных систем.
Проект продолжает развиваться как открытый исходный код, и сообщество может внести свой вклад в его развитие. Мы приглашаем всех заинтересованных разработчиков принять участие в проекте и вместе делать его лучше.
