Всем привет. Написал бессерверный брокер сообщений, кому интересно прошу под кат.

Проект представляет из себя динамическую библиотеку с Си-интерфейсом.

Библиотеку назвал liner (в репе крейтов раста пришлось название сделать пошире liner_broker, поскольку занято имя уже было).

Следующие фичи в наличии сейчас:

  • высокая пропускная способность (ниже посмотрим)

  • гарантия доставки - хотя бы один раз (at-least-once), используется БД Redis для этого

  • размер сообщения не определен заранее и не ограничен

  • простой API: создать клиента, запустить и можно отправлять сообщения

  • интерфейсы для Python, C++, Rust

  • кроссплатформенность (windows, linux)

  • различные варианты передачи сообщений: один к одному, один ко многим, многие ко многим, и по подписке на топик

Передача данных внутри по TCP, без шифрования.

По плану статьи: посмотрим как реализована библиотека, напишем пример отправки 10к сообщений и сравним общее время отправки-получения с либой ZeroMQ.

Сразу посмотрим как выглядит пример отправки-получения сообщений на расте:

use liner_broker::Liner;

fn  main() {

    let mut client1 = Liner::new("client1", "topic_client1", "localhost:2255", "redis://localhost/");
    let mut client2 = Liner::new("client2", "topic_client2", "localhost:2256", "redis://localhost/");
   
    client1.run(Box::new(|_to: &str, _from: &str, _data: &[u8]|{
        println!("receive_from {}", _from);
    }));
    client2.run(Box::new(|_to: &str, _from: &str, _data: &[u8]|{
        println!("receive_from {}", _from);
    }));

    let array = [0; 100];
    for _ in 0..10{
        client1.send_to("topic_client2", array.as_slice());
        println!("send_to client2");       
    }
}

Клиентов может быть несколько в одном пользовательском процессе. При создании клиента надо задать уникальное имя клиента, название топика, адрес клиента и адрес БД Redis (далее БД). При запуске клиента в работу надо задать колбек-функцию для получения данных, сообщения от всех отправителей будут приходить в нее.

Архитектура библиотеки liner
Архитектура библиотеки liner

Внутри либа состоит из двух крупных блоков: отправитель и получатель сообщений.

Отправитель - это отдельный поток для отправки сообщений клиентов, чтобы не тормозить клиентов и увеличить пропускную способность:

  • сообщения от клиента попадают сначала в промежуточный mempool (далее "пул" буду писать). Каждое сообщение получает порядковый номер.

  • отправляется уведомление в поток о новом сообщении

  • поток отправки при получении уведомления не сразу пишет сообщения в сокет, а немного ждет (по умолч 1мс, зашитый параметр компиляции), таким образом реализуется pipelining

  • сообщения для многих клиентов пишутся в сокеты независимо друг от друга в разных потоках (использую для этого крейт rayon)

  • после отправки сообщения идет приращение номера отправленного сообщения

  • этот же поток делает повторные попытки подключения к адресатам (новым либо потерявшим связь), период попыток по умолч 10сек (зашитый параметр компиляции)

  • при успешном подключении к клиенту, идет обращение в БД за прошлыми сообщениями, которые не смогли когда-то отправить. Эти старые сообщения будут отправлены снова в первую очередь.

  • сообщения не удаляем из памяти после отправки, ждем подтверждения получения и обработки

  • подтверждение получения основано на сравнении порядковых номеров сообщений, которые хранятся и обновляются в БД. Отправитель периодически читает последние обработанные номера сообщений из БД, и чистит память в пуле

  • когда не можем отправить сообщение (ловим ошибку записи в сокет), то все неподтвержденные сообщения сохраняем в БД

С отправителем закончили. Еще стоит добавить, что сообщения размером более 1мб (зашитый параметр компиляции) сжимаются, используется крейт zstd.

Получатель сообщений - здесь уже два потока, один для ожидания данных в сокетах,
второй - поток обработки, для вызова пользовательских колбеков:

  • цикл ожидания данных сделан на базе mio, благодаря ему имеем кроссплатформенность либы (в первых версиях использовал линуксовый epool, соответственно только линуксом был ограничен)

  • прием данных из сокетов также ведем в свободных потоках, пишем сообщения в пул.

  • поток обработки используется для вызова соответствующего пользовательского колбека, после успешного вызова порядковый номер сообщения обновляется в БД и сообщение освобождается в пуле

Мемпул собственной разработки (посмотрел что есть, подходящего прямо без переделок не нашел). Сначала без мемпула было, профилировал это все дело, много вызовов было выделения памяти. Работает мемпул таким образом:

  • в основе массив байт, сообщения в него записываются последовательно друг за другом, при недостатке свободного места массив ресайзится.

  • для индексации этого массива используется мапа типа <длина, массив свободных позиций>.

  • массив никогда не уменьшается в размере, только растет.

  • чтобы не происходило постоянного роста, сделана дефрагментация свободных участков, запускается в момент запроса выделения нового участка в зависимости от текущего общего свободного места. В этом месте могут быть тормоза конечно, в этот момент.

Для гарантии доставки сообщений используется БД Redis:

  • в БД сохраняются привязки уникальных имен клиентов к их адресам и топикам

  • номера последних отправленных и полученных сообщений

  • сами сообщения, которые не были отправлены по какой-либо причине

Сейчас посмотрим пример отправки 10к сообщений, каждое размером по 1024байт.

use std::time::SystemTime;
use std::{thread, time};
use std::sync::{ Arc, Mutex};

use liner_broker::Liner;

fn  main() {

    let mut client1 = Liner::new("client1", "topic_client1", "localhost:2255", "redis://localhost/");
    let mut client2 = Liner::new("client2", "topic_client2", "localhost:2256", "redis://localhost/");
   
    client1.clear_stored_messages();
    client2.clear_stored_messages();

    client1.clear_addresses_of_topic();
    client2.clear_addresses_of_topic();

    const MESS_SEND_COUNT: usize = 10000;
    const MESS_SIZE: usize = 1024;
    const SEND_CYCLE_COUNT: usize = 10;

    let mut receive_count: i32 = 0;
    let send_end = Arc::new(Mutex::new(0));
    let _send_end = send_end.clone();

    client1.run(Box::new(|_to: &str, _from: &str,  _data: &[u8]|{
        println!("receive_from {}", _from);
    }));
    client2.run(Box::new(move |_to: &str, _from: &str,  _data: &[u8]|{
        receive_count += 1;    
        if receive_count == MESS_SEND_COUNT as i32{
            receive_count = 0;
            println!("receive_from {} ms", current_time_ms() - *_send_end.lock().unwrap());
        }
    }));

    let array = [0; MESS_SIZE];
    for _ in 0..SEND_CYCLE_COUNT{
        let send_begin = current_time_ms();
        for _ in 0..MESS_SEND_COUNT{
            client1.send_to("topic_client2", array.as_slice());
        }
        let send = current_time_ms();
        println!("send_to {} ms", send - send_begin);  
        *send_end.lock().unwrap() = send;     
    
        thread::sleep(time::Duration::from_millis(1000));
    }
}

fn current_time_ms()->u64{ 
    SystemTime::now()
    .duration_since(SystemTime::UNIX_EPOCH)
    .unwrap()
    .as_millis() as u64
}
alex@ubuntu2004:~/projects/rust/liner/cpp$ cargo build --release
   Compiling liner_broker v1.1.2 (/home/alex/projects/rust/liner)
    Finished `release` profile [optimized] target(s) in 5.62s
alex@ubuntu2004:~/projects/rust/liner/cpp$ cd ../target/release/
alex@ubuntu2004:~/projects/rust/liner/target/release$ ./throughput_10k 
send_to 8 ms
receive_from 8 ms
send_to 5 ms
receive_from 5 ms
send_to 7 ms
receive_from 3 ms
send_to 11 ms
receive_from 3 ms
send_to 6 ms
receive_from 3 ms
send_to 3 ms
receive_from 4 ms
send_to 6 ms
receive_from 4 ms
send_to 12 ms
receive_from 3 ms
send_to 7 ms
receive_from 4 ms
send_to 7 ms
receive_from 3 ms

В итоге в среднем имеем 10мс общего времени.

В плюсовом клиенте медленнее получилось в 2 раза, думаю из-за косвенного вызова колбека. Посмотрим код плюсового примера ради интереса:

#include "liner_broker.h"

#include <ctime>
#include <iostream>
#include <chrono>
#include <thread>

const int MESS_SEND_COUNT = 10000;
const int MESS_SIZE = 1024;
const int SEND_CYCLE_COUNT = 30;

int main(int argc, char* argv[])
{  
    auto client1 = LinerBroker("client1", "topic_client1", "localhost:2255", "redis://localhost/");
    auto client2 = LinerBroker("client2", "topic_client2", "localhost:2256", "redis://localhost/");
 
    int receive_count = 0;
    clock_t send_begin = clock();
    clock_t send_end = clock();

    client1.run([](const std::string& to, const std::string& from, const std::string& data){});
    client2.run([&receive_count, &send_end](const std::string& to, const std::string& from, const std::string& data){
        receive_count += 1;
        if (receive_count == MESS_SEND_COUNT){
            receive_count = 0;
            std::cout << "receive_from " << 1000.0 * (clock() - send_end) / CLOCKS_PER_SEC << " ms" << std::endl;
        }
    });
    
    char data[MESS_SIZE];
    for (int i = 0; i < SEND_CYCLE_COUNT; ++i){
        send_begin = clock();
        for (int j = 0; j < MESS_SEND_COUNT; ++j){
            client1.sendTo("topic_client2", data);
        }
        send_end = clock();
        std::cout << "send_to " << 1000.0 * (send_end - send_begin) / CLOCKS_PER_SEC << " ms" << std::endl;
        std::this_thread::sleep_for(std::chrono::milliseconds(1000));
    }    
}

Теперь запустим аналогичный пример для ZeroMQ

#include <iostream>
#include <zmq_addon.hpp>
#include <ctime>
#include <chrono>
#include <thread>

int main()
{
    zmq::context_t ctx;
    zmq::socket_t sock1(ctx, zmq::socket_type::push);
    zmq::socket_t sock2(ctx, zmq::socket_type::pull);
    sock1.bind("tcp://127.0.0.1:*");
    const std::string last_endpoint =
        sock1.get(zmq::sockopt::last_endpoint);
    std::cout << "Connecting to "
              << last_endpoint << std::endl;
    sock2.connect(last_endpoint);

    std::vector<zmq::const_buffer> send_msgs;
    char mess[1024];
    for (int i = 0; i < 10000; ++i){
        send_msgs.push_back(zmq::str_buffer(mess));
    }
    for (int i = 0; i < 10; ++i){
        auto send_begin = clock();
    
        if (!zmq::send_multipart(sock1, send_msgs))
            return 1;

        std::vector<zmq::message_t> recv_msgs;
        const auto ret = zmq::recv_multipart(
            sock2, std::back_inserter(recv_msgs));
        if (!ret)
            return 1;
        
        auto receive_end = clock();
        std::cout << "send_to " << 1000.0 * (receive_end - send_begin) / CLOCKS_PER_SEC << " ms" << std::endl;
        std::this_thread::sleep_for(std::chrono::milliseconds(1000));
    }
    return 0;
}
alex@ubuntu2004:~/projects/rust/liner$ cd benchmark/compare_with_zeromq/
alex@ubuntu2004:~/projects/rust/liner/benchmark/compare_with_zeromq$ make
g++ -Wall -O2 -std=c++17 -g -Wno-write-strings -o compare_with_zmq compare_with_zmq.cpp -lzmq
alex@ubuntu2004:~/projects/rust/liner/benchmark/compare_with_zeromq$ ./compare_with_zmq 
Connecting to tcp://127.0.0.1:34079
send_to 20.198 ms
send_to 16.504 ms
send_to 11.5 ms
send_to 13.153 ms
send_to 10.964 ms
send_to 10.788 ms
send_to 10.785 ms
send_to 11.119 ms
send_to 11.348 ms
send_to 10.826 ms

Имеем примерно тоже самое. Скорее всего ZeroMQ еще можно как-то подкрутить (сокеты на пайпы заменить, буферами внутренними поиграться и тп) и он быстрее будет думаю.

Ну вот и все пожалуй.

Пока в планах следующие задачки:

  • тестов побольше надо сделать, сейчас есть несколько

  • веб-страничку сделать с описанием, примерами, кейсами типовыми

  • интерфейсы для других языков: Go, C#, Java

  • ошибки думаю возвращать через колбек, сейчас в поток ошибок пишет, мбыть неудобно

  • мемпул протестить, насколько он тормозит при дефрагментации, мбыть что-нидь там улучшить

  • опционально выбирать другие БД (добавить sqlite, duckDb) или вообще без БД, если не нужна гарантированная доставка

  • расширить на другие протоколы внутри (сейчас только TCP)

  • статьи восторженные написать надо кому-то на реддите, медиуме..

Присоединяйтесь к разработке кто хочет-может, или продвижению, если есть интерес к этому делу.

Спасибо.

PS: лицензия MIT, ссылка на гитхаб