Всем привет. Написал бессерверный брокер сообщений, кому интересно прошу под кат.
Проект представляет из себя динамическую библиотеку с Си-интерфейсом.
Библиотеку назвал liner (в репе крейтов раста пришлось название сделать пошире liner_broker, поскольку занято имя уже было).
Следующие фичи в наличии сейчас:
высокая пропускная способность (ниже посмотрим)
гарантия доставки - хотя бы один раз (at-least-once), используется БД Redis для этого
размер сообщения не определен заранее и не ограничен
простой API: создать клиента, запустить и можно отправлять сообщения
интерфейсы для Python, C++, Rust
кроссплатформенность (windows, linux)
различные варианты передачи сообщений: один к одному, один ко многим, многие ко многим, и по подписке на топик
Передача данных внутри по TCP, без шифрования.
По плану статьи: посмотрим как реализована библиотека, напишем пример отправки 10к сообщений и сравним общее время отправки-получения с либой ZeroMQ.
Сразу посмотрим как выглядит пример отправки-получения сообщений на расте:
use liner_broker::Liner;
fn main() {
let mut client1 = Liner::new("client1", "topic_client1", "localhost:2255", "redis://localhost/");
let mut client2 = Liner::new("client2", "topic_client2", "localhost:2256", "redis://localhost/");
client1.run(Box::new(|_to: &str, _from: &str, _data: &[u8]|{
println!("receive_from {}", _from);
}));
client2.run(Box::new(|_to: &str, _from: &str, _data: &[u8]|{
println!("receive_from {}", _from);
}));
let array = [0; 100];
for _ in 0..10{
client1.send_to("topic_client2", array.as_slice());
println!("send_to client2");
}
}
Клиентов может быть несколько в одном пользовательском процессе. При создании клиента надо задать уникальное имя клиента, название топика, адрес клиента и адрес БД Redis (далее БД). При запуске клиента в работу надо задать колбек-функцию для получения данных, сообщения от всех отправителей будут приходить в нее.

Внутри либа состоит из двух крупных блоков: отправитель и получатель сообщений.
Отправитель - это отдельный поток для отправки сообщений клиентов, чтобы не тормозить клиентов и увеличить пропускную способность:
сообщения от клиента попадают сначала в промежуточный mempool (далее "пул" буду писать). Каждое сообщение получает порядковый номер.
отправляется уведомление в поток о новом сообщении
поток отправки при получении уведомления не сразу пишет сообщения в сокет, а немного ждет (по умолч 1мс, зашитый параметр компиляции), таким образом реализуется pipelining
сообщения для многих клиентов пишутся в сокеты независимо друг от друга в разных потоках (использую для этого крейт rayon)
после отправки сообщения идет приращение номера отправленного сообщения
этот же поток делает повторные попытки подключения к адресатам (новым либо потерявшим связь), период попыток по умолч 10сек (зашитый параметр компиляции)
при успешном подключении к клиенту, идет обращение в БД за прошлыми сообщениями, которые не смогли когда-то отправить. Эти старые сообщения будут отправлены снова в первую очередь.
сообщения не удаляем из памяти после отправки, ждем подтверждения получения и обработки
подтверждение получения основано на сравнении порядковых номеров сообщений, которые хранятся и обновляются в БД. Отправитель периодически читает последние обработанные номера сообщений из БД, и чистит память в пуле
когда не можем отправить сообщение (ловим ошибку записи в сокет), то все неподтвержденные сообщения сохраняем в БД
С отправителем закончили. Еще стоит добавить, что сообщения размером более 1мб (зашитый параметр компиляции) сжимаются, используется крейт zstd.
Получатель сообщений - здесь уже два потока, один для ожидания данных в сокетах,
второй - поток обработки, для вызова пользовательских колбеков:
цикл ожидания данных сделан на базе mio, благодаря ему имеем кроссплатформенность либы (в первых версиях использовал линуксовый epool, соответственно только линуксом был ограничен)
прием данных из сокетов также ведем в свободных потоках, пишем сообщения в пул.
поток обработки используется для вызова соответствующего пользовательского колбека, после успешного вызова порядковый номер сообщения обновляется в БД и сообщение освобождается в пуле
Мемпул собственной разработки (посмотрел что есть, подходящего прямо без переделок не нашел). Сначала без мемпула было, профилировал это все дело, много вызовов было выделения памяти. Работает мемпул таким образом:
в основе массив байт, сообщения в него записываются последовательно друг за другом, при недостатке свободного места массив ресайзится.
для индексации этого массива используется мапа типа <длина, массив свободных позиций>.
массив никогда не уменьшается в размере, только растет.
чтобы не происходило постоянного роста, сделана дефрагментация свободных участков, запускается в момент запроса выделения нового участка в зависимости от текущего общего свободного места. В этом месте могут быть тормоза конечно, в этот момент.
Для гарантии доставки сообщений используется БД Redis:
в БД сохраняются привязки уникальных имен клиентов к их адресам и топикам
номера последних отправленных и полученных сообщений
сами сообщения, которые не были отправлены по какой-либо причине
Сейчас посмотрим пример отправки 10к сообщений, каждое размером по 1024байт.
use std::time::SystemTime;
use std::{thread, time};
use std::sync::{ Arc, Mutex};
use liner_broker::Liner;
fn main() {
let mut client1 = Liner::new("client1", "topic_client1", "localhost:2255", "redis://localhost/");
let mut client2 = Liner::new("client2", "topic_client2", "localhost:2256", "redis://localhost/");
client1.clear_stored_messages();
client2.clear_stored_messages();
client1.clear_addresses_of_topic();
client2.clear_addresses_of_topic();
const MESS_SEND_COUNT: usize = 10000;
const MESS_SIZE: usize = 1024;
const SEND_CYCLE_COUNT: usize = 10;
let mut receive_count: i32 = 0;
let send_end = Arc::new(Mutex::new(0));
let _send_end = send_end.clone();
client1.run(Box::new(|_to: &str, _from: &str, _data: &[u8]|{
println!("receive_from {}", _from);
}));
client2.run(Box::new(move |_to: &str, _from: &str, _data: &[u8]|{
receive_count += 1;
if receive_count == MESS_SEND_COUNT as i32{
receive_count = 0;
println!("receive_from {} ms", current_time_ms() - *_send_end.lock().unwrap());
}
}));
let array = [0; MESS_SIZE];
for _ in 0..SEND_CYCLE_COUNT{
let send_begin = current_time_ms();
for _ in 0..MESS_SEND_COUNT{
client1.send_to("topic_client2", array.as_slice());
}
let send = current_time_ms();
println!("send_to {} ms", send - send_begin);
*send_end.lock().unwrap() = send;
thread::sleep(time::Duration::from_millis(1000));
}
}
fn current_time_ms()->u64{
SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)
.unwrap()
.as_millis() as u64
}
alex@ubuntu2004:~/projects/rust/liner/cpp$ cargo build --release
Compiling liner_broker v1.1.2 (/home/alex/projects/rust/liner)
Finished `release` profile [optimized] target(s) in 5.62s
alex@ubuntu2004:~/projects/rust/liner/cpp$ cd ../target/release/
alex@ubuntu2004:~/projects/rust/liner/target/release$ ./throughput_10k
send_to 8 ms
receive_from 8 ms
send_to 5 ms
receive_from 5 ms
send_to 7 ms
receive_from 3 ms
send_to 11 ms
receive_from 3 ms
send_to 6 ms
receive_from 3 ms
send_to 3 ms
receive_from 4 ms
send_to 6 ms
receive_from 4 ms
send_to 12 ms
receive_from 3 ms
send_to 7 ms
receive_from 4 ms
send_to 7 ms
receive_from 3 ms
В итоге в среднем имеем 10мс общего времени.
В плюсовом клиенте медленнее получилось в 2 раза, думаю из-за косвенного вызова колбека. Посмотрим код плюсового примера ради интереса:
#include "liner_broker.h"
#include <ctime>
#include <iostream>
#include <chrono>
#include <thread>
const int MESS_SEND_COUNT = 10000;
const int MESS_SIZE = 1024;
const int SEND_CYCLE_COUNT = 30;
int main(int argc, char* argv[])
{
auto client1 = LinerBroker("client1", "topic_client1", "localhost:2255", "redis://localhost/");
auto client2 = LinerBroker("client2", "topic_client2", "localhost:2256", "redis://localhost/");
int receive_count = 0;
clock_t send_begin = clock();
clock_t send_end = clock();
client1.run([](const std::string& to, const std::string& from, const std::string& data){});
client2.run([&receive_count, &send_end](const std::string& to, const std::string& from, const std::string& data){
receive_count += 1;
if (receive_count == MESS_SEND_COUNT){
receive_count = 0;
std::cout << "receive_from " << 1000.0 * (clock() - send_end) / CLOCKS_PER_SEC << " ms" << std::endl;
}
});
char data[MESS_SIZE];
for (int i = 0; i < SEND_CYCLE_COUNT; ++i){
send_begin = clock();
for (int j = 0; j < MESS_SEND_COUNT; ++j){
client1.sendTo("topic_client2", data);
}
send_end = clock();
std::cout << "send_to " << 1000.0 * (send_end - send_begin) / CLOCKS_PER_SEC << " ms" << std::endl;
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
}
}
Теперь запустим аналогичный пример для ZeroMQ
#include <iostream>
#include <zmq_addon.hpp>
#include <ctime>
#include <chrono>
#include <thread>
int main()
{
zmq::context_t ctx;
zmq::socket_t sock1(ctx, zmq::socket_type::push);
zmq::socket_t sock2(ctx, zmq::socket_type::pull);
sock1.bind("tcp://127.0.0.1:*");
const std::string last_endpoint =
sock1.get(zmq::sockopt::last_endpoint);
std::cout << "Connecting to "
<< last_endpoint << std::endl;
sock2.connect(last_endpoint);
std::vector<zmq::const_buffer> send_msgs;
char mess[1024];
for (int i = 0; i < 10000; ++i){
send_msgs.push_back(zmq::str_buffer(mess));
}
for (int i = 0; i < 10; ++i){
auto send_begin = clock();
if (!zmq::send_multipart(sock1, send_msgs))
return 1;
std::vector<zmq::message_t> recv_msgs;
const auto ret = zmq::recv_multipart(
sock2, std::back_inserter(recv_msgs));
if (!ret)
return 1;
auto receive_end = clock();
std::cout << "send_to " << 1000.0 * (receive_end - send_begin) / CLOCKS_PER_SEC << " ms" << std::endl;
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
}
return 0;
}
alex@ubuntu2004:~/projects/rust/liner$ cd benchmark/compare_with_zeromq/
alex@ubuntu2004:~/projects/rust/liner/benchmark/compare_with_zeromq$ make
g++ -Wall -O2 -std=c++17 -g -Wno-write-strings -o compare_with_zmq compare_with_zmq.cpp -lzmq
alex@ubuntu2004:~/projects/rust/liner/benchmark/compare_with_zeromq$ ./compare_with_zmq
Connecting to tcp://127.0.0.1:34079
send_to 20.198 ms
send_to 16.504 ms
send_to 11.5 ms
send_to 13.153 ms
send_to 10.964 ms
send_to 10.788 ms
send_to 10.785 ms
send_to 11.119 ms
send_to 11.348 ms
send_to 10.826 ms
Имеем примерно тоже самое. Скорее всего ZeroMQ еще можно как-то подкрутить (сокеты на пайпы заменить, буферами внутренними поиграться и тп) и он быстрее будет думаю.
Ну вот и все пожалуй.
Пока в планах следующие задачки:
тестов побольше надо сделать, сейчас есть несколько
веб-страничку сделать с описанием, примерами, кейсами типовыми
интерфейсы для других языков: Go, C#, Java
ошибки думаю возвращать через колбек, сейчас в поток ошибок пишет, мбыть неудобно
мемпул протестить, насколько он тормозит при дефрагментации, мбыть что-нидь там улучшить
опционально выбирать другие БД (добавить sqlite, duckDb) или вообще без БД, если не нужна гарантированная доставка
расширить на другие протоколы внутри (сейчас только TCP)
статьи восторженные написать надо кому-то на реддите, медиуме..
Присоединяйтесь к разработке кто хочет-может, или продвижению, если есть интерес к этому делу.
Спасибо.
PS: лицензия MIT, ссылка на гитхаб