Недавно мне в голову пришла мысль о том, как бы получать список техники, на которой, например, установлен последний патч системы или нужное ПО. Можно написать скрипты, но, как мне кажется, это не совсем удобно и безопасно, особенно когда парк машин неоднообразен. Поэтому я предлагаю реализовать систему, где вся запрашиваемая информация будет собираться с помощью агентов на конечных устройствах в реальном времени с помощью специального языка запросов.

Как я думаю, у этой системы довольно большой потенциал, ведь его можно доработать и превратить в какой-нибудь аналог Ansible. Или в базу для SIEM системы. Или прикрутить к какой-нибудь ITSM-системе, чтобы проводить инвентаризацию по нужным параметрам.

Вся система будет строиться на gRPC, так как он из коробки идёт со всем необходимы, включая простую интеграцию систем аутентификации и TLS, плюс описание интерфейсов в .proto файлах позволит писать клиенты и агентов на различных языках и фреймворках. Единственный минус, так это то, что двунаправленное взаимодействие не укладывается в философию фреймворка, но это решим в процессе написания.

Как это будет работать

Наша система будет представлять из себя агентов, которые устанавливают соединение с севером, а сервер уже будет по этим соединениям отправлять команды и получать результаты выполнения. А отправлять команды на сервер будем посредством другого, простого gRPC клиента, который также будет устанавливать соединение с сервером, но работать с другим сервисом.

Почему бы просто не прикрутить какую-нибудь командую строку серверу, чтобы сразу на нём выполнять команды? Мне показалось такое решение довольно неудобным, особенно в малоуправляемых средах по типу Docker-контейнера. Будет куда практичнее сделать именно так, как я описал выше. Это позволит даже с помощью скриптов запрашивать нужную информацию с сервера и не придётся городить для этого трёхэтажные костыли. Если я в чём-то ошибаюсь, то поправьте меня, пожалуйста, в комментариях.

gRPC

Начнём с того, что опишем весь интерфейс взаимодействия сервера с агентами и клиентами. Для этого создадим в корне проекта папку proto и создадим файл main.proto, в котором опишем сервис Excavator. Да, хотелось какое-то вычурное и необычное имя, которое будет олицетворять какую-то тяжёлую работу.

Как можете наблюдать ниже, агент-серверное общение будет происходить посредством стримов, это позволит достичь двунаправленное общение. Чтобы особо не путаться в типах сообщений, я решил просто объединить их в одно.

И наличие ExcavatorHeartbeat может показаться избыточным и ненужным, но оно на самом деле нужно для того, чтобы установить соединение.

syntax = "proto3";

package main;

service Excavator {
  rpc RunExcavator (stream ExcavatorMessage) returns (stream ExcavatorMessage);
}

message ExcavatorHeartbeat {}

message ExcavatorCommandArg {
  string key = 1;
  string value = 2;
}

message ExcavatorCommand {
  string uid = 1;
  string name = 2;
  repeated ExcavatorCommandArg args = 3;
}

message ExcavatorResponseResult {
  string key = 1;
  string value = 2;
}

message ExcavatorResponse {
  string uid = 1;
  int64 code = 2;
  repeated ExcavatorResponseResult results = 3;
}

message ExcavatorMessage {
  oneof request {
    ExcavatorHeartbeat heartbeat = 1;
    ExcavatorResponse response = 2;
    ExcavatorCommand command = 3;
  }
}

Теперь можем описать клиент-серверное общение, для этого создадим ещё один файл client.proto и опишем сервис Query, содержащий два метода: для запросов и логов. Да, будем отправлять логи сервера на клиент.

Как можете наблюдать, стримов тут нет. Это всё из-за того, что не сервер будет обращаться к клиенту, а клиент к серверу.

syntax = "proto3";

package client;

service Query {
  rpc Query (QueryRequest) returns (QueryResponse);
  rpc Logs (LogsQueryRequest) returns (stream LogsQueryResponse);
}

message LogsQueryRequest {}

message LogsQueryResponse {
  repeated string log = 1;
}

message QueryRequest {
  string command = 1;
}

message TableCol {
  string key = 1;
  string data = 2;
}

message TableRow {
  repeated TableCol cols = 1;
}

message QueryResponse {
  repeated TableRow rows = 1;
}

Сервер

Предлагаю начать от того, куда все будут пытаться подключиться, а именно с сервера. Для этого давайте инициализируем проект. Но для начала создадим Cargo.toml в корне проекта.

[workspace]
resolver = "3"

Теперь можем инициализировать проект сервера следующей командой, она автоматом добавит проект в члены воркспейса. Но также создадим проект библиотеки, которую будут использовать все проекты для генерации кода gRPC.

cargo new server
cargo new --lib build-common

Библиотека будет очень миниатюрная, будет содержать лишь пару функций для генерации кода в build.rs. Но для начала перейдём в этот проект и в зависимости добавим следующее:

[package]
name = "build-common"
version = "0.1.0"
edition = "2024"

[dependencies]
tonic-prost-build = "0.14"
glob = "0.3"

Теперь в lib.rs можем написать следующие функции:

use std::{env, fs, io, path::Path};

pub const PROTO_FOLDER: &str = "proto";

pub fn get_proto_folder() -> std::path::PathBuf {
    Path::new(&env::var("CARGO_MANIFEST_DIR").expect("CARGO_WORKSPACE not set"))
        .join("..")
        .join(PROTO_FOLDER)
}

pub fn compile_protos_folder(folder: impl AsRef<Path>) -> io::Result<()> {
    let folder = folder.as_ref();
    println!("cargo:rerun-if-changed={}/*", folder.display());
    for entry in fs::read_dir(folder)? {
        tonic_prost_build::compile_protos(entry.unwrap().path())?;
    }
    Ok(())
}

Откроем server/Cargo.toml и добавим этот проект в build-зависимости проекта сервера. Также сразу в обычные зависимости добавляем следующие крейты, они потребуются для кодогенерации.

[package]
name = "server"
version = "0.1.0"
edition = "2024"

[dependencies]
tokio = { version = "1.41", features = ["full"] }
prost = "0.14"
tonic = { version = "0.14", features = ["tls-ring"] }
tonic-prost = "0.14"
hex = "0.4"
tokio-stream = "0.1"
lazy_static = "1.5"
futures = "0.3"
small_uid = "0.2.4"
nom = "8.0.0"

[build-dependencies]
build-common = { path = "../build-common" }

Создадим файл server\build.rs и добавим туда следующие строки:

fn main() {
    let proto_folder = build_common::get_proto_folder();
    build_common::compile_protos_folder(&proto_folder).unwrap();
}

Если проект собирается, то код успешно генерируется. Чтобы взаимодействовать с ним, создадим папку server\proto с файлом mod.rs, в нём добавим следующие строчки:

tonic::include_proto!("main");
tonic::include_proto!("client");

Предлагаю продолжить с чего-то простого, как, например, реализация логов. Для этого создадим модуль client.rs в server\src\proto. Все логи будем хранить в глобальном векторе и будем динамически подгружать их клиентам с помощью стрима. Чтобы ручками каждый раз не делать всё это, создадим глобальную структуру LogsManager, которая будет помещать логи в этот вектор и отправлять в поток.

В целом, думаю, тут нечего объяснять, всё довольно просто. Кроме того, что lazy_static нужен из-за того, что Mutex из tokio не const-совместимый, как, например, Mutex из стандартной библиотеки, поэтому просто так в статике его не инициализировать.

Обёртка в виде LogsReceiver нужен чисто для того, чтобы не запутаться в каналах, компилятор будет защищать нас от глупых ошибок. Наверное.

lazy_static! {
    pub static ref LOGS_MANAGER: LogsManager = LogsManager(broadcast::channel(128).0);
    static ref LOGS: Mutex<Vec<String>> = Mutex::new(Vec::new());
}

pub struct LogsManager(broadcast::Sender<String>);

impl LogsManager {
    pub async fn send_log(&self, log: String) {
        LOGS.lock().await.push(log.clone());
        if let Err(err) = self.0.send(log) {
            match err {
                SendError(log) => {
                    LOGS.lock().await.push(format!("dropped log when no subscribers: '{log}'"));
                }
            }
        }
    }

    pub fn subscribe(&self) -> LogsReceiver {
        LogsReceiver(self.0.subscribe())
    }
}

pub struct LogsReceiver(broadcast::Receiver<String>);

impl Deref for LogsReceiver {
    type Target = broadcast::Receiver<String>;

    fn deref(&self) -> &Self::Target {
        &self.0
    }
}

Давайте реализуем сервис Query. Для этого создадим структуру QueryService, которая будет содержать ссылку на наш канал LogsReceiver.

pub struct QueryService(LogsReceiver);

impl QueryService {
    pub fn new(logs: LogsReceiver) -> Self {
        Self(logs)
    }
}

Имплементируем сгенерированный из proto-файла трейт Query для созданной структуры. Метод query будет пока пустым, чуть позже его заполним. В силу того, что методы асинхронные, а нормальной поддержки асинхронных трейтов в языке ещё нет, используется реэкспорт async_trait, помогающий этого добиться.

#[tonic::async_trait]
impl Query for QueryService {
    async fn query(&self, request: Request<QueryRequest>) -> Result<Response<QueryResponse>, Status> {
        todo!()
}

    type LogsStream = Pin<Box<dyn Stream<Item = Result<LogsQueryResponse, Status>> + Send>>;
    async fn logs(&self, _: Request<LogsQueryRequest>) -> Result<Response<Self::LogsStream>, Status> {
        let (tx, rx) = tokio::sync::mpsc::channel(128);

        let mut logs_rx = self.0.resubscribe();

        // Свободно плавающий поток, который будет слушать поток от LogsManager
        // и отправлять в стрим новые логи.
        tokio::spawn(async move {
            let log = LOGS.lock().await.clone();
            let entry = LogsQueryResponse { log };
            // при первом подключении отправляем все логи сразу
            tx.send(Ok(entry)).await.expect("failed to send log");

            // динамически отправляем новые логи
            while let Ok(log) = logs_rx.recv().await {
                let entry = LogsQueryResponse { log: vec![log] };
                tx.send(Ok(entry)).await.expect("failed to send log");
            }
        });

        // Создаём поток из канала с помощью обёртки из tokio-stream
        let stream = ReceiverStream::new(rx);
        Ok(Response::new(Box::pin(stream)))
    }
}

Перед тем, как продолжить с реализацией метода query, предлагаю сначала реализовать небольшой язык запросов. В силу того, что на момент написания статьи мне было лень его прорабатывать, он будет очень простым и будет состоять всего из нескольких выражений. Для этого создадим модуль query.rs в server\src.

Для реализации парсера воспользуемся комбинаторами из крейта nom. Что такое парсинг-комбинаторы можете ознакомиться в документации, ссылку на которую я привёл.

Первые наши команды будут выглядеть как-то так. То есть первая команда будет составлять список агентов по какому-то критерию, а вторая будет делать запросы к конкретному агенту. Запросы будут выглядеть как функции с массивом аргументов.

list by field_name
from agent select say_hello(name = world), machine(get = name)

Начнём с того, что опишем AST структурами Rust. То есть мы будем перемещаться по тексту запроса с помощью комбинаторов и превращать его в структуры. Как можете наблюдать, AST будет очень простой, так как те же числа мы не будем парсить, поэтому язык будет пока довольно ограничен, но для примера хватит, я думаю.

#[derive(Debug, PartialEq)]
pub struct InvokeFuncArg {
    pub name: String,
    pub value: String,
}

#[derive(Debug, PartialEq)]
pub struct InvokeFunc {
    pub name: String,
    pub args: Vec<InvokeFuncArg>,
}

#[derive(Debug, PartialEq)]
pub enum QueryExpr {
    ListBy(String),
    SelectFrom { from: String, select: Vec<InvokeFunc> },
}

Начнём от частного к общему, то есть реализуем сначала составные части, по типу парсеров значений и будем двигаться в сторону парсера всего выражения. Для начала реализуем парсеры identifier и field. Всё отличие в том, что первый парсит текст с числами, а второе просто текстовые значения. То есть будет запрещено в названиях полей использовать что-то, кроме буков.

fn identifier(input: &str) -> IResult<&str, &str> {
    // берёт символы до тех пор, пока функция char::is_alphanumeric не вернёт false, 
    // а потом возвращает эту подстроку
    take_while(char::is_alphanumeric).parse(input)
}

fn field(input: &str) -> IResult<&str, &str> {
    take_while(char::is_alphabetic).parse(input)
}

Думаю, этого хватит, чтобы реализовать парсер для выражения list by field_name. Парсер tag берёт какую-то строку и проверяет, соответствует ли входная строка этому значению.

Все парсеры возвращают слайс оставшегося входного текста. То есть если на вход этому парсеру вошла строчка list by field_name, то вернёт от оставшегося текста слайс field_name.

Парсер space1 - встроенный в крейт парсер, который парсит пробелы. Число в конце функции означает то, что должен встретиться минимум один пробел. Есть аналогичный парсер space0, который делает то же самое, но пробелов может вовсе не быть.

fn list_by(input: &str) -> IResult<&str, QueryExpr> {
    let (input, _) = tag("list by").parse(input)?;
    let (input, _) = space1(input)?;
    let (input, field_name) = field(input)?;

    Ok((input, QueryExpr::ListBy(field_name.to_string())))
}

Давайте реализуем парсер функций. Для этого имплементируем парсер аргументов. Он тоже довольно простой, ведь сначала мы реализуем парсер одного аргумента invoke_arg, а потом с помощью парсера separated_list0, принимающий на вход два парсера: разделителя и значения.

Если честно, то не понял разницы между separated_list0 и separated_list1, так как у них поведение идентичное: выбрасывают ошибку, если один из парсеров на входе выдал ошибку. Так как у нас тут не может встретиться EOF, то это не проблема, но далее это создаст проблему.

fn invoke_arg(input: &str) -> IResult<&str, InvokeFuncArg> {
    // считываем название аргумента
    let (input, name) = field(input)?;

    // возможные пробелы перед и после знака '='
    let (input, _) = space0(input)?;
    let (input, _) = char('=').parse(input)?; // то же самое, что парсер `tag`, но на вход принимает исключительно символы
    let (input, _) = space0(input)?;

    // считываем значение аргумента
    let (input, value) = identifier(input)?;

    Ok((
        input,
        InvokeFuncArg {
            name: name.to_string(),
            value: value.to_string(),
        },
    ))
}

fn invoke_args(input: &str) -> IResult<&str, Vec<InvokeFuncArg>> {
    separated_list0(invoke_list_separate, invoke_arg).parse(input)
}

fn invoke_list_separate(input: &str) -> IResult<&str, ()> {
    let (input, _) = space0(input)?;
    let (input, _) = char(',').parse(input)?;
    let (input, _) = space0(input)?;
    Ok((input, ()))
}

Реализуем парсер функции и списка функций. Так как парсер функции очень простой, то не буду на нём подробно останавливаться.

У нас обязательно присутствует хоть одна функция, поэтому сначала вызываем парсер функции и добавляем результат его парсинга в вектор. Далее сепаратор и последующие вызовы функций опциональны, но если у нас может быть плавающая запятая, поэтому мы не возвращаем ошибку из парсера invoke_list_separate, а лишь прерываем цикл и возвращаем массив распарсенных функций; а вот вызов последующих функций обязателен, поэтому возвращаем ошибку из invoke_func. Но если удалось распарсить функцию, то сохраняем слайс, чтобы парсер не ушёл в бесконечный цикл парсинга одного и того же, и полученную функцию в вектор.

Как упоминалось ранее, все эти костыли нужны из-за того, что separated_list0 выкидывает ошибку при встрече EOF, который может встретиться в данном случае.

// парсер одной функции
fn invoke_func(input: &str) -> IResult<&str, InvokeFunc> {
    let (input, name) = map(field, |s| s.to_string()).parse(input)?;
    let (input, _) = space0(input)?;
    let (input, _) = char('(').parse(input)?;
    let (input, _) = space0(input)?;
    let (input, args) = invoke_args(input)?;
    let (input, _) = space0(input)?;
    let (input, _) = char(')').parse(input)?;

    Ok((input, InvokeFunc { name, args }))
}

// парсер списка функций
fn invoke_list(input: &str) -> IResult<&str, Vec<InvokeFunc>> {
    let (input, first) = invoke_func(input)?;
    let mut acc = vec![first];

    let mut input = input;
    while let Ok((i, _)) = invoke_list_separate(input) {
        let (i, func) = invoke_func(i)?;
        acc.push(func);
        input = i;
    }

    Ok((input, acc))
}

Далее можем реализовать парсер более сложного выражения from agent select say_hello(name = world), machine(get = name). Тут тоже ничего особенного, поэтому перейдём сразу к функции parse_query, где есть парсер alt, который запускает последовательно кортеж/вектор переданных в него парсеров пока один из них не вернёт Ok. То есть сначала запускаем парсер list_by и если парсер tag("list by") возвращает Err, то эта ошибка возвращается выше в комбинатор alt, который, в свою очередь, запускает далее парсер select_from.

fn select_from(input: &str) -> IResult<&str, QueryExpr> {
    let (input, _) = tag("from").parse(input)?;
    let (input, _) = space1(input)?;
    let (input, from) = map(identifier, |s| s.to_string()).parse(input)?;
    let (input, _) = space1(input)?;

    let (input, _) = tag("select").parse(input)?;
    let (input, _) = space1(input)?;
    let (input, invoke_list) = invoke_list(input)?;

    Ok((input, QueryExpr::SelectFrom { from, select: invoke_list }))
}

pub async fn parse_query(query: &str) -> Result<QueryExpr, QueryParseError> {
    let query = alt((list_by, select_from)).parse(query);

    match query {
        Ok(result) => Ok(result.1),
        Err(err) => {
            LOGS_MANAGER.send_log(format!("parse error: {err}")).await;
            Err(QueryParseError)
        }
    }
}

Но толку от этого всего, если нельзя сделать выборку нужных устройств. Поэтому предлагаю немного усложнить синтаксис нашего языка запросов и добавить стейтмент where, после которого будем вычислять boolean значение, включая запросы к агентам, которые будут выполнены в виде функций.

Первое, что нам потребуется сделать, это дополнить модель AST. Будем использовать метод рекурсивного спуска, поэтому добавляем детерминированные значения Term и Value в перечисления QueryConditionExpr и QueryConditionTerm, соответственно.

#[derive(Debug, PartialEq, Clone)]
pub struct InvokeFuncArg {
    pub name: String,
    pub value: String,
}

#[derive(Debug, PartialEq, Clone)]
pub struct InvokeFunc {
    pub name: String,
    pub args: Vec<InvokeFuncArg>,
}

#[derive(Debug, PartialEq, Clone)]
pub enum QueryValue {
    FnField { func: InvokeFunc, field: String },
    Identifier(String),
    String(String),
    Number(f64),
    Bool(bool),
    Null,
}

impl Display for QueryValue {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let str = match self {
            QueryValue::FnField { func, field } => format!(
                "{}({}).{field}",
                func.name,
                func.args
                    .iter()
                    .map(|arg| format!("{}={}", arg.name, arg.value))
                    .collect::<Vec<_>>()
                    .join(",")
            ),
            QueryValue::Identifier(s) => s.clone(),
            QueryValue::String(s) => s.clone(),
            QueryValue::Number(n) => n.to_string(),
            QueryValue::Bool(b) => b.to_string(),
            QueryValue::Null => "null".to_string(),
        };
        write!(f, "{}", str)
    }
}

#[derive(Debug, PartialEq)]
enum QueryOperation {
    Eq,
    More,
    Less,
}

#[derive(Debug, PartialEq)]
pub enum QueryConditionTerm {
    Eq {
        left: Box<QueryConditionTerm>,
        right: Box<QueryConditionTerm>,
    },
    More {
        left: Box<QueryConditionTerm>,
        right: Box<QueryConditionTerm>,
    },
    Less {
        left: Box<QueryConditionTerm>,
        right: Box<QueryConditionTerm>,
    },
    Value(QueryValue),
}

#[derive(Debug, PartialEq)]
enum QueryExprOperation {
    And,
    Or,
}

#[derive(Debug, PartialEq)]
pub enum QueryConditionExpr {
    And {
        left: Box<QueryConditionExpr>,
        right: Box<QueryConditionExpr>,
    },
    Or {
        left: Box<QueryConditionExpr>,
        right: Box<QueryConditionExpr>,
    },
    Term(QueryConditionTerm),
}

#[derive(Debug, PartialEq)]
pub enum QueryExpr {
    ListBy { field: String, condition: Option<QueryConditionExpr> },
    SelectFrom { from: String, select: Vec<InvokeFunc> },
}

Теперь можем написать новенькие парсеры. Предлагаю начать с парсера value, который как раз будет парсить конечные значения в нашей модели.

Я, если честно, не уверен, как более правильно реализовать парсер с take_while, чтоб он не выдавал пустую строчку за идентификатор, поэтому воспользовался map_opt, чтоб тот завершал работу парсера ошибкой, если строчка пустая.

У нас есть парсер string, он очень простой, поэтому пока не будет обрабатывать строчки с экранированием символов. Если кому нужно экранирование, то в nom есть парсер escaped для таких целей.

fn identifier(input: &str) -> IResult<&str, QueryValue> {
    map_opt(take_while(|c: char| c.is_alphanumeric() || c == '_' || c == '-'), |s: &str| {
        if s.is_empty() {
            None
        }
        else {
            Some(QueryValue::Identifier(s.to_string()))
        }
    })
    .parse(input)
}

fn string(input: &str) -> IResult<&str, QueryValue> {
    map_opt(delimited(char('"'), take_till(|c: char| c == '"'), char('"')), |s: &str| {
        if s.is_empty() { None } else { Some(QueryValue::String(s.to_string())) }
    })
    .parse(input)
}

fn number(input: &str) -> IResult<&str, QueryValue> {
    map(double, QueryValue::Number).parse(input)
}

fn boolean(input: &str) -> IResult<&str, QueryValue> {
    map(alt((tag("true"), tag("false"))), |s: &str| QueryValue::Bool(s == "true")).parse(input)
}

fn null(input: &str) -> IResult<&str, QueryValue> {
    map(tag("null"), |_| QueryValue::Null).parse(input)
}

fn func_field(input: &str) -> IResult<&str, QueryValue> {
    map(separated_pair(invoke_func, char('.'), identifier), |(f, i)| QueryValue::FnField {
        func: f,
        field: i.to_string(),
    })
    .parse(input)
}

fn value(input: &str) -> IResult<&str, QueryValue> {
    alt((string, func_field, identifier, number, boolean, null)).parse(input)
}

Теперь можем рекурсивно распарсить выражение по типу info(type = modules).info = version, написав небольшой парсер, где мы сначала парсим гарантированно какое-то значение, а потом проверяем, идёт ли какая-нибудь операции далее, и если есть, то запускаем рекурсию.

fn operation(input: &str) -> IResult<&str, QueryOperation> {
    alt((
        map(char('='), |_| QueryOperation::Eq),
        map(char('<'), |_| QueryOperation::Less),
        map(char('>'), |_| QueryOperation::More),
    ))
    .parse(input)
}

fn condition_term(input: &str) -> IResult<&str, QueryConditionTerm> {
    let (input, left) = map(value, QueryConditionTerm::Value).parse(input)?;
    let (input, _) = space0(input)?;

    let (input, op) = opt(operation).parse(input)?;
    if let Some(op) = op {
        let (input, _) = space0(input)?;
        let (input, right) = condition_term(input)?;

        Ok((
            input,
            match op {
                QueryOperation::Eq => QueryConditionTerm::Eq {
                    left: Box::new(left),
                    right: Box::new(right),
                },
                QueryOperation::More => QueryConditionTerm::More {
                    left: Box::new(left),
                    right: Box::new(right),
                },
                QueryOperation::Less => QueryConditionTerm::Less {
                    left: Box::new(left),
                    right: Box::new(right),
                },
            },
        ))
    }
    else {
        Ok((input, left))
    }
}

Можем проделать то же самое с выражениями по типу true & false, то есть term & term.

fn condition_expr_op(input: &str) -> IResult<&str, QueryExprOperation> {
    alt((map(char('&'), |_| QueryExprOperation::And), map(char('|'), |_| QueryExprOperation::Or))).parse(input)
}

fn condition_expr(input: &str) -> IResult<&str, QueryConditionExpr> {
    let (input, left) = map(condition_term, QueryConditionExpr::Term).parse(input)?;
    let (input, _) = space0(input)?;

    let (input, op) = opt(condition_expr_op).parse(input)?;
    if let Some(op) = op {
        let (input, _) = space0(input)?;
        let (input, right) = condition_expr(input)?;

        Ok((
            input,
            match op {
                QueryExprOperation::And => QueryConditionExpr::And {
                    left: Box::new(left),
                    right: Box::new(right),
                },
                QueryExprOperation::Or => QueryConditionExpr::Or {
                    left: Box::new(left),
                    right: Box::new(right),
                },
            },
        ))
    }
    else {
        Ok((input, left))
    }
}

Доработает парсер тем, что будем парсить сначала стейтмент where, а поток рекурсивно условие.

fn condition(input: &str) -> IResult<&str, QueryConditionExpr> {
    let (input, _) = space1(input)?;
    let (input, _) = tag("where").parse(input)?;
    let (input, _) = space1(input)?;

    condition_expr(input)
}

fn list_by(input: &str) -> IResult<&str, QueryExpr> {
    let (input, _) = tag("list by").parse(input)?;
    let (input, _) = space1(input)?;
    let (input, field_name) = identifier(input)?;

    let (input, condition) = opt(condition).parse(input)?;

    Ok((
        input,
        QueryExpr::ListBy {
            field: field_name.to_string(),
            condition,
        },
    ))
}
Чтобы убедиться в том, что всё работает корректно, я написал несколько простых юнит-тестов
#[cfg(test)]
mod tests {
    use super::*;

    #[tokio::test]
    async fn test_parse_field_parser() {
        let query = "list by name";
        let result = identifier(&query[8..]);
        assert_eq!(result, Ok(("", QueryValue::Identifier("name".to_owned()))));
    }

    #[tokio::test]
    async fn test_parse_list_by() {
        let query = "list by name";
        let result = list_by(query);
        assert_eq!(
            result,
            Ok((
                "",
                QueryExpr::ListBy {
                    field: "name".to_owned(),
                    condition: None
                }
            ))
        );
    }

    #[tokio::test]
    async fn test_parse_list_by_where() {
        let query = "list by name where addr = \"127.0.0.1:8080\"";
        let result = list_by(query);
        assert_eq!(
            result,
            Ok((
                "",
                QueryExpr::ListBy {
                    field: "name".to_owned(),
                    condition: Some(QueryConditionExpr::Term(QueryConditionTerm::Eq {
                        left: Box::new(QueryConditionTerm::Value(QueryValue::Identifier("addr".to_string()))),
                        right: Box::new(QueryConditionTerm::Value(QueryValue::String("127.0.0.1:8080".to_string()))),
                    }))
                }
            ))
        );
    }

    #[tokio::test]
    async fn test_parse_list_by_where_invoke_func_field() {
        let query = "list by name where version().version = \"1.0.0\"";
        let result = list_by(query);
        assert_eq!(
            result,
            Ok((
                "",
                QueryExpr::ListBy {
                    field: "name".to_owned(),
                    condition: Some(QueryConditionExpr::Term(QueryConditionTerm::Eq {
                        left: Box::new(QueryConditionTerm::Value(QueryValue::FnField {
                            func: InvokeFunc {
                                name: "version".to_owned(),
                                args: vec![]
                            },
                            field: "version".to_owned()
                        })),
                        right: Box::new(QueryConditionTerm::Value(QueryValue::String("1.0.0".to_string()))),
                    }))
                }
            ))
        );
    }

    #[tokio::test]
    async fn test_parse_list_by_where_invoke_func_field_with_args() {
        let query = "list by name where info(type = modules).version = \"1.0.0\"";
        let result = list_by(query);
        assert_eq!(
            result,
            Ok((
                "",
                QueryExpr::ListBy {
                    field: "name".to_owned(),
                    condition: Some(QueryConditionExpr::Term(QueryConditionTerm::Eq {
                        left: Box::new(QueryConditionTerm::Value(QueryValue::FnField {
                            func: InvokeFunc {
                                name: "info".to_owned(),
                                args: vec![InvokeFuncArg {
                                    name: "type".to_owned(),
                                    value: "modules".to_owned(),
                                }],
                            },
                            field: "version".to_owned()
                        })),
                        right: Box::new(QueryConditionTerm::Value(QueryValue::String("1.0.0".to_string()))),
                    }))
                }
            ))
        );
    }

    #[tokio::test]
    async fn test_parse_list_by_where_invoke_func_field_with_args_and_other() {
        let query = "list by name where info(type = modules).version = \"version\" & version().version = \"0.1.0\"";
        let result = list_by(query);
        assert_eq!(
            result,
            Ok((
                "",
                QueryExpr::ListBy {
                    field: "name".to_owned(),
                    condition: Some(QueryConditionExpr::And {
                        left: Box::new(QueryConditionExpr::Term(QueryConditionTerm::Eq {
                            left: Box::new(QueryConditionTerm::Value(QueryValue::FnField {
                                func: InvokeFunc {
                                    name: "info".to_string(),
                                    args: vec![InvokeFuncArg {
                                        name: "type".to_string(),
                                        value: "modules".to_string(),
                                    }],
                                },
                                field: "version".to_string()
                            })),
                            right: Box::new(QueryConditionTerm::Value(QueryValue::String("version".to_string()))),
                        })),
                        right: Box::new(QueryConditionExpr::Term(QueryConditionTerm::Eq {
                            left: Box::new(QueryConditionTerm::Value(QueryValue::FnField {
                                func: InvokeFunc {
                                    name: "version".to_string(),
                                    args: vec![],
                                },
                                field: "version".to_string()
                            })),
                            right: Box::new(QueryConditionTerm::Value(QueryValue::String("0.1.0".to_string()))),
                        })),
                    })
                }
            ))
        );
    }

    #[tokio::test]
    async fn test_parse_identifier_parser() {
        let query = "from name select version()";
        let (rest, _) = tag::<_, _, nom::error::Error<&str>>("from").parse(query).unwrap();
        let (rest, _) = space1::<_, nom::error::Error<&str>>(rest).unwrap();

        let result = identifier(rest);

        assert_eq!(result, Ok((" select version()", QueryValue::Identifier("name".to_string()))));
    }

    #[tokio::test]
    async fn test_parse_space_parser() {
        let query = "from name select version()";
        let (rest, _) = tag::<_, _, nom::error::Error<&str>>("from").parse(query).unwrap();
        let result = space1::<_, nom::error::Error<&str>>(rest);
        assert_eq!(result, Ok(("name select version()", " ")));
    }

    #[tokio::test]
    async fn test_parse_more_space_parser() {
        let query = "from     name select version()";
        let (rest, _) = tag::<_, _, nom::error::Error<&str>>("from").parse(query).unwrap();
        let result = space1::<_, nom::error::Error<&str>>(rest);
        assert_eq!(result, Ok(("name select version()", "     ")));
    }

    #[tokio::test]
    async fn test_parse_from_parser() {
        let query = "from name select version()";
        let result = tag::<_, _, nom::error::Error<&str>>("from").parse(query);
        assert_eq!(result, Ok((" name select version()", "from")));
    }

    #[tokio::test]
    async fn test_parse_maybe_space_no_spaces() {
        let query = "";
        let result = space0::<_, nom::error::Error<&str>>(query);
        assert_eq!(result, Ok(("", "")))
    }

    #[tokio::test]
    async fn test_parse_maybe_space() {
        let query = "   ,";
        let result = space0::<_, nom::error::Error<&str>>(query);
        assert_eq!(result, Ok((",", "   ")))
    }

    #[tokio::test]
    async fn test_parse_invoke_list_separate() {
        let query = " ,  hello";
        let result = invoke_list_separate(query);
        assert_eq!(result, Ok(("hello", ())));
    }

    #[tokio::test]
    async fn test_parse_invoke_list_separate_without_space() {
        let query = ",hello";
        let result = invoke_list_separate(query);
        assert_eq!(result, Ok(("hello", ())));
    }

    #[tokio::test]
    async fn test_parse_no_invoke_args() {
        let query = "()";
        let (input, _) = char::<_, nom::error::Error<&str>>('(').parse(query).unwrap();
        let result = invoke_args(input);
        assert_eq!(result, Ok((")", vec![])));
    }

    #[tokio::test]
    async fn test_parse_invoke_args() {
        let query = "one = first, two = second)";
        let result = invoke_args(query);
        assert_eq!(
            result,
            Ok((
                ")",
                vec![
                    InvokeFuncArg {
                        name: "one".to_owned(),
                        value: "first".to_owned()
                    },
                    InvokeFuncArg {
                        name: "two".to_owned(),
                        value: "second".to_owned()
                    }
                ]
            ))
        )
    }

    #[tokio::test]
    async fn test_parse_invoke_func() {
        let query = "version()";
        let result = invoke_func(query);
        assert_eq!(
            result,
            Ok((
                "",
                InvokeFunc {
                    name: "version".to_string(),
                    args: vec![]
                }
            ))
        );
    }

    #[tokio::test]
    async fn test_parse_invoke_list() {
        let query = "version()";
        let result = invoke_list(query);
        assert_eq!(
            result,
            Ok((
                "",
                vec![InvokeFunc {
                    name: "version".to_string(),
                    args: vec![]
                }]
            ))
        )
    }

    #[tokio::test]
    async fn test_parse_invoke_list_many_invoke() {
        let query = "version(), hello()";
        let result = invoke_list(query);
        assert_eq!(
            result,
            Ok((
                "",
                vec![
                    InvokeFunc {
                        name: "version".to_string(),
                        args: vec![]
                    },
                    InvokeFunc {
                        name: "hello".to_string(),
                        args: vec![]
                    }
                ]
            ))
        )
    }

    #[tokio::test]
    async fn test_parse_select_from() {
        let query = "from name select version()";
        let result = select_from(query);
        assert_eq!(
            result,
            Ok((
                "",
                QueryExpr::SelectFrom {
                    from: "name".to_string(),
                    select: vec![InvokeFunc {
                        name: "version".to_string(),
                        args: vec![]
                    }]
                }
            ))
        )
    }

    #[tokio::test]
    async fn test_parse_query_select() {
        let query = "from name select version()";
        let result = parse_query(query).await;
        assert_eq!(
            result,
            Ok(QueryExpr::SelectFrom {
                from: "name".to_string(),
                select: vec![InvokeFunc {
                    name: "version".to_string(),
                    args: vec![]
                }]
            })
        );
    }
}

Отлично, можем продолжить, хотел бы сказать, что реализовывать трейт Query, но у нас пока нет пула соединений с агентами, поэтому предлагаю реализовать трейт сервиса Excavator. Для этого создам новый модуль server\src\excavator.rs.

Для начала давайте имплементируем AgentId, чтобы как-то идентифицировать агента в пуле.

#[derive(Eq, PartialEq, Hash)]
pub struct AgentInner {
    pub name: String,
    pub addr: SocketAddr,
}

#[derive(Clone)]
pub struct AgentId(Arc<Mutex<AgentInner>>);

impl Hash for AgentId {
    fn hash<H: Hasher>(&self, state: &mut H) {
        let inner = block_in_place(|| Handle::current().block_on(async { self.0.lock().await }));
        inner.name.hash(state);
    }
}

impl Eq for AgentId {}

impl PartialEq for AgentId {
    fn eq(&self, other: &Self) -> bool {
        block_in_place(|| {
            Handle::current().block_on(async {
                let left = self.0.lock().await;
                // 
                match other.0.try_lock() {
                    Ok(right) => *left == *right,
                    Err(_) => true,
                }
            })
        })
    }
}

impl Deref for AgentId {
    type Target = Arc<Mutex<AgentInner>>;

    fn deref(&self) -> &Self::Target {
        &self.0
    }
}

impl AgentId {
    pub fn new(name: &str, addr: SocketAddr) -> Self {
        let inner = AgentInner { name: name.to_owned(), addr };
        Self(Arc::new(Mutex::new(inner)))
    }

    pub fn generate(addr: SocketAddr) -> Self {
        // получаем слайс байтов из адреса агента
        let bytes: &[u8] = unsafe { std::slice::from_raw_parts(&addr as *const _ as *const u8, std::mem::size_of::<SocketAddr>()) };
        // кодируем половину этого массива в шестнадцатеричное представление
        let hex = hex::encode(&bytes[..bytes.len() / 2]);
        let name = format!("client{}", hex);

        let inner = AgentInner { name, addr };
        Self(Arc::new(Mutex::new(inner)))
    }
}

Ключ для идентификации агентов в пуле есть, а значением будем использовать AgentCommandManager, который будет хранить ссылки на каналы отправки-получения сообщений из стримов. И по мелочи будет отправлять команду агенту и дожидаться ответа.

pub struct AgentCommandManager {
    command_tx: mpsc::Sender<Result<ExcavatorMessage, Status>>,
    response_rx: broadcast::Receiver<ExcavatorResponse>,
}

impl AgentCommandManager {
    pub async fn send(&mut self, msg: ExcavatorCommand) -> Option<ExcavatorResponse> {
        let uid = msg.uid.clone();
        let msg = ExcavatorMessage {
            request: Some(excavator_message::Request::Command(msg)),
        };

        // отправляем команду
        self.command_tx.send(Ok(msg)).await.expect("channel closed");
        
        // дожидаемся нужного ответа из потока
        while let Ok(msg) = self.response_rx.recv().await {
            if msg.uid == uid {
                return Some(msg);
            }
        }

        None
    }
}

impl Clone for AgentCommandManager {
    fn clone(&self) -> Self {
        Self {
            command_tx: self.command_tx.clone(),
            response_rx: self.response_rx.resubscribe(),
        }
    }
}

Для хранения всего этого будем использовать обычную HashMap, обёрнутую в структуру AgentsMap.

type AgentsMapInner = Arc<RwLock<HashMap<AgentId, AgentCommandManager>>>;

#[derive(Default, Clone)]
pub struct AgentsMap(AgentsMapInner);

impl Deref for AgentsMap {
    type Target = AgentsMapInner;

    fn deref(&self) -> &Self::Target {
        &self.0
    }
}

Отлично, теперь можем реализовать трейт Excavator. Для этого создадим структуру ExcavatorService, которая будет хранить ссылку на описанную ранее AgentsMap.

pub struct ExcavatorService(AgentsMap);

impl ExcavatorService {
    pub fn new(map: AgentsMap) -> Self {
        Self(map)
    }
}

#[tonic::async_trait]
impl Excavator for ExcavatorService {
    type RunExcavatorStream = Pin<Box<dyn Stream<Item = Result<ExcavatorMessage, Status>> + Send + 'static>>;

    async fn run_excavator(&self, request: Request<Streaming<ExcavatorMessage>>) -> Result<Response<Self::RunExcavatorStream>, Status> {
        todo!()
    }
}

Предлагаю для начала получить heartbeat-сообщение из стрима и распаковать его до ExcavatorMessage. Другие сообщения нам неинтересны, поэтому мы один раз вызываем stream.next().

async fn run_excavator(&self, request: Request<Streaming<ExcavatorMessage>>) -> Result<Response<Self::RunExcavatorStream>, Status> {
        let addr = request.remote_addr().ok_or(Status::aborted("remote address not found"))?;
        let mut stream = request.into_inner();

        match stream.next().await {
            Some(msg) => match msg {
                Ok(msg) => {
                    if let Some(heartbeat) = msg.request {
                        match heartbeat {

                            excavator_message::Request::Heartbeat(_) => {}

                            // остальные сообщения нам не интересны
                            _ => {
                                LOGS_MANAGER
                                    .send_log(format!(
                                        "the agent ({addr}) tried to connect, but the first message should be a heartbeat"
                                    ))
                                    .await;

                                Err(Status::failed_precondition("no heartbeat"))
                            }
                        }
                    }
                    else {
                        LOGS_MANAGER
                            .send_log(format!("the agent {addr} tried to connect, but no heartbeat was detected"))
                            .await;
                        Err(Status::failed_precondition("empty heartbeat"))
                    }
                }
                Err(err) => {
                    LOGS_MANAGER
                        .send_log(format!(
                            "agent {addr} tried to connect, but the connection could not be established due to: {err}"
                        ))
                        .await;

                    Err(Status::unknown("failed to connect"))
                }
            },
            None => {
                LOGS_MANAGER
                    .send_log(format!(
                        "the agent ({addr}) tried to connect, but it failed because no message was received"
                    ))
                    .await;

                Err(Status::unknown("failed to connect"))
            }
        }
    }

Теперь следите за руками, так как сейчас будет много различных каналов и стримов.

excavator_message::Request::Heartbeat(_) => {
    let id = AgentId::generate(addr);

    let (command_tx, command_rx) = mpsc::channel(128);
    let (response_tx, response_rx) = broadcast::channel(128);
    let manager = AgentCommandManager { command_tx, response_rx };

    self.0.write().await.insert(id.clone(), manager);

    // ...
}

Так как соединение установлено и каналы агента сохранены, теперь можем начать слушать другие сообщения. Делать это будем в отдельном потоке. А в данном потоке возвращаем стрим.

let id = AgentId::generate(addr);

let (command_tx, command_rx) = mpsc::channel(128);
let (response_tx, response_rx) = broadcast::channel(128);
let manager = AgentCommandManager { command_tx, response_rx };

self.0.write().await.insert(id.clone(), manager);

tokio::spawn({
    let id = id.clone();
    async move {
        while let Some(msg) = stream.next().await {
            // ...
        }
    }
});

LOGS_MANAGER
    .send_log(format!("agent ({}) connected successfully", id.lock().await.name))
    .await;

let stream = ReceiverStream::new(command_rx);
Ok(Response::new(Box::pin(stream) as Self::RunExcavatorStream))

Можем реализовать цикл событий, где нас интересует лишь ответы от агентов. Как только получаем его - отправляем его всем слушателям, то есть запросам от сервиса Query.

while let Some(msg) = stream.next().await {
    match msg {
        Ok(ExcavatorMessage { request: Some(request) }) => match request {
            // отправляем результат команды соответствующему клиенту
            excavator_message::Request::Response(response) => {
                if let Err(err) = response_tx.send(response) {
                    LOGS_MANAGER.send_log(format!("occurred error: {err}")).await;
                }
            }

            // остальные сообщения нам неинтересны
            excavator_message::Request::Heartbeat(_) => {
                LOGS_MANAGER.send_log(format!("agent ({}) sent heartbeat", id.lock().await.name)).await;
            }
            excavator_message::Request::Command(_) => {
                LOGS_MANAGER.send_log(format!("agent ({}) sent command", id.lock().await.name)).await;
            }
        },
        Err(err) => {
            LOGS_MANAGER.send_log(format!("occurred error: {err}")).await;
        }
        _ => {
            LOGS_MANAGER
                .send_log(format!("received empty message from {}", id.lock().await.name))
                .await;
        }
    }
}

async move {
    while let Some(msg) = stream.next().await {
        match msg {
            Ok(ExcavatorMessage { request: Some(request) }) => match request {
                // отправляем результат команды соответствующему клиенту
                excavator_message::Request::Response(response) => {
                    if let Err(err) = response_tx.send(response) {
                        LOGS_MANAGER.send_log(format!("occurred error: {err}")).await;
                        break;
                    }
                }

                // остальные сообщения нам неинтересны
                excavator_message::Request::Heartbeat(_) => {
                    LOGS_MANAGER.send_log(format!("agent ({}) sent heartbeat", id.lock().await.name)).await;
                }
                excavator_message::Request::Command(_) => {
                    LOGS_MANAGER.send_log(format!("agent ({}) sent command", id.lock().await.name)).await;
                }
            },
            Err(err) => {
                LOGS_MANAGER.send_log(format!("occurred error: {err}")).await;
                break;
            }
            _ => {
                LOGS_MANAGER
                    .send_log(format!("received empty message from {}", id.lock().await.name))
                    .await;
            }
        }
    }

    // если цикл завершился по какой-либо причине, то, вероятно, 
    // агент просто напросто отключился, поэтому удаляем его из пула агентов

    LOGS_MANAGER.send_log(format!("exit agent ({}) event loop", id.lock().await.name)).await;

    if map.write().await.remove_entry(&id).is_some() {
        LOGS_MANAGER.send_log(format!("agent ({}) disconnected", id.lock().await.name)).await;
    }
}

Всё, это вся реализация сервиса Excavator, довольно просто, не так ли? Теперь, после того, как реализовали AgentsMap, можем приступить к реализации метода query. Думаю, тут нет ничего сложного, поэтому предлагаю перейти к реализации запросов.

async fn query(&self, request: Request<QueryRequest>) -> Result<Response<QueryResponse>, Status> {
    let query = request.into_inner().command;

    if query.is_empty() {
        return Err(Status::invalid_argument("empty query"));
    }

    LOGS_MANAGER.send_log(format!("got query: '{query}'")).await;

    let query = parse_query(query.trim())
        .await
        .map_err(|_| Status::invalid_argument("failed to parse query"))?;

    match query {
        QueryExpr::ListBy(field) => todo!(),
        QueryExpr::SelectFrom { from, select } => todo!(),
    }
}

Но перед тем, как продолжить, давайте реализуем структуру EvalContext, которая будет хранить состояние нашего QueryConditionExpr. Вычислять конечное значение будем тоже через рекурсию.

#[derive(Error, Debug)]
enum EvalError {
    #[error("not a number: {0}")]
    NotNumber(QueryValue),
    #[error("not a boolean: {0}")]
    NotBool(QueryValue),
    #[error("invoke error {0}")]
    InvokeError(String),
}

fn must_number(val: &QueryValue) -> Result<f64, EvalError> {
    match val {
        QueryValue::Number(n) => Ok(*n),
        _ => Err(EvalError::NotNumber(val.clone())),
    }
}

struct EvalContext<'a> {
    agent: &'a mut AgentCommandManager,
}

impl<'a> EvalContext<'a> {
    fn new(agent: &'a mut AgentCommandManager) -> Self {
        Self { agent }
    }

    async fn eval(&mut self, expr: &QueryConditionExpr) -> Result<bool, EvalError> {
        match expr {
            QueryConditionExpr::And { left, right } => {
                let left = Box::pin(self.eval(left)).await?;
                let right = Box::pin(self.eval(right)).await?;
                Ok(left && right)
            }
            QueryConditionExpr::Or { left, right } => {
                let left = Box::pin(self.eval(left)).await?;
                let right = Box::pin(self.eval(right)).await?;
                Ok(left || right)
            }
            QueryConditionExpr::Term(term) => match self.eval_term(term).await? {
                QueryValue::Bool(b) => Ok(b),
                val => Err(EvalError::NotBool(val.clone())),
            },
        }
    }

    async fn eval_term(&mut self, term: &QueryConditionTerm) -> Result<QueryValue, EvalError> {
        match term {
            QueryConditionTerm::Eq { left, right } => Ok(QueryValue::Bool(Box::pin(self.eval_eq(left, right)).await?)),
            QueryConditionTerm::More { left, right } => Ok(QueryValue::Bool(Box::pin(self.eval_more(left, right)).await?)),
            QueryConditionTerm::Less { left, right } => Ok(QueryValue::Bool(Box::pin(self.eval_less(left, right)).await?)),
            QueryConditionTerm::Value(val) => Ok(val.clone()),
        }
    }

    async fn eval_eq(&mut self, left: &QueryConditionTerm, right: &QueryConditionTerm) -> Result<bool, EvalError> {
        let left = self.eval_term(left).await?;
        let right = self.eval_term(right).await?;

        if let QueryValue::FnField { func, field } = &left {
            // ...
        }
        else {
            Ok(left == right)
        }
    }

    async fn eval_less(&mut self, left: &QueryConditionTerm, right: &QueryConditionTerm) -> Result<bool, EvalError> {
        let left = must_number(&self.eval_term(left).await?)?;
        let right = must_number(&self.eval_term(right).await?)?;
        Ok(left < right)
    }

    async fn eval_more(&mut self, left: &QueryConditionTerm, right: &QueryConditionTerm) -> Result<bool, EvalError> {
        let left = must_number(&self.eval_term(left).await?)?;
        let right = must_number(&self.eval_term(right).await?)?;
        Ok(left > right)
    }
}

Всё довольно просто, но давайте реализуем выполнение функций, для чего делали интерфейс структуры асинхронным. Тут мы составляем команду, после чего отправляем её агенту и дожидаемся ответа. Как только ответ пришёл, проверяем соответствует ли хоть одна строчка ответа правому значению.

if let QueryValue::FnField { func, field } = &left {
    let cmd = ExcavatorCommand {
        uid: SmallUid::new().to_string(),
        name: func.name.clone(),
        args: func
            .args
            .iter()
            .map(|a| ExcavatorCommandArg {
                key: a.name.clone(),
                value: a.value.clone(),
            })
            .collect(),
    };

    Ok(self
        .agent
        .send(cmd)
        .await
        .ok_or(EvalError::InvokeError(format!("{}.{}", func.name, field)))?
        .results
        .iter()
        .any(|r| {
            r.key == *field
                && (match &right {
                    QueryValue::FnField { .. } => false,
                    QueryValue::Identifier(i) => r.value == *i,
                    QueryValue::String(s) => r.value == *s,
                    QueryValue::Number(n) => {
                        if let Ok(val) = r.value.parse::<f64>() {
                            val == *n
                        }
                        else {
                            false
                        }
                    }
                    QueryValue::Bool(b) => *b,
                    QueryValue::Null => r.value == "null",
                })
        }))
}

Теперь можем реализовать ListBy. Предлагаю начать с ветки без условия. Она довольно простая, ведь мы просто итерируемся по всему пулу агентов и в зависимости от вхожего значения field подставляем нужное значение, после чего мапим результат к нужному виду.

QueryExpr::ListBy { field, condition } => {
    let mut map = self.1.write().await;

    if let Some(condition) = condition {
        todo!()
    }
    else {
        let rows = stream::iter(map.keys())
            .filter_map(|k| {
                let field = field.clone();
                async move {
                    let id = k.lock().await;
                    Some(match field.as_str() {
                        "name" => id.name.clone(),
                        "addr" => id.addr.to_string(),
                        _ => return None,
                    })
                }
            })
            .map(|data| TableRow {
                cols: vec![TableCol { key: field.clone(), data }],
            })
            .collect::<Vec<_>>()
            .await;
        let response = Response::new(QueryResponse { rows });
        Ok(response)
    }
}

С условием делаем похожее, но в этот раз инициализируем наш вычислительный контекст, ловим результат и уже в зависимости от поля field подставляем результат.

if let Some(condition) = condition {
    let condition = Arc::new(condition);
    let rows = stream::iter(map.iter_mut())
        .filter_map({
            |(id, agent)| {
                let condition = condition.clone();
                let field = field.clone();
                async move {
                    let result = match EvalContext::new(agent).eval(&condition).await {
                        Ok(res) => res,
                        Err(err) => {
                            LOGS_MANAGER.send_log(format!("failed to evaluate condition: '{err}'")).await;
                            return None;
                        }
                    };

                    if result {
                        let id = id.lock().await;
                        Some(match field.as_str() {
                            "name" => id.name.clone(),
                            "addr" => id.addr.to_string(),
                            _ => return None,
                        })
                    }
                    else {
                        None
                    }
                }
            }
        })
        .map({
            let field = field.clone();
            move |r| TableRow {
                cols: vec![TableCol { key: field.clone(), data: r }],
            }
        })
        .collect::<Vec<_>>()
        .await;

    let response = Response::new(QueryResponse { rows });
    Ok(response)
}

Реализуем теперь SelectFrom. Для начала получаем агента. Если не получилось его найти по хэшу, то итерируемся по HashMap и ищем значение, которое начинается на полученную строку. Подглядел это у git, где достаточно лишь часть хэша коммита ввести, ведь у нас довольно длинное имя агента получается.

QueryExpr::SelectFrom { from, select } => {
    let map = self.1.read().await;
    let id = AgentId::new(&from, SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::new(0, 0, 0, 0), 0)));

    let mut agent = if let Some(agent) = map.get(&id).cloned() {
        agent
    }
    else {
        stream::iter(map.iter())
            .filter_map(|(k, m)| Box::pin(async { if k.lock().await.name.starts_with(&from) { Some(m.clone()) } else { None } }))
            .next()
            .await
            .ok_or(Status::not_found("failed to get agent id"))?
    };

    // ...
}

Агента мы получили, но давайте теперь реализуем выполнение команд.

QueryExpr::SelectFrom { from, select } => {
    // ... 

    let mut rows = Vec::new();
    for crate::query::InvokeFunc { name, args } in select {
        let uid = SmallUid::new().to_string();
        let args = args.into_iter().map(|a| ExcavatorCommandArg { key: a.name, value: a.value }).collect();

        LOGS_MANAGER.send_log(format!("executing command: '{name}' with args: '{args:?}'")).await;

        let result = agent
            .send(ExcavatorCommand {
                uid,
                name: name.clone(),
                args,
            })
            .await;

        // преобразуем результат в табличный вид


        if let Some(response) = result {
            LOGS_MANAGER
                .send_log(format!("'{name}' command is done with status '{}'", response.code))
                .await;

            let cols = response
                .results
                .into_iter()
                .fold(HashMap::new(), |mut acc: HashMap<String, Vec<String>>, res| {
                    if let Some(col) = acc.get_mut(&res.key) {
                        col.push(res.value);
                    }
                    else {
                        acc.insert(res.key, vec![res.value]);
                    }
                    acc
                });

            if rows.is_empty() {
                for (col_name, values) in cols {
                    for row in values {
                        rows.push(TableRow {
                            cols: vec![TableCol {
                                key: col_name.clone(),
                                data: row,
                            }],
                        });
                    }
                }
            }
            else {
                for (col_name, values) in &cols {
                    for (i, r) in values.iter().enumerate() {
                        let row = rows.get_mut(i);
                        if let Some(row) = row {
                            row.cols.push(TableCol {
                                key: col_name.clone(),
                                data: r.clone(),
                            });
                        }
                        else {
                            let cols_len = rows.first().map(|r| r.cols.len()).unwrap_or(0);
                            let cols = (0..cols_len)
                                .filter_map(|i| {
                                    cols.iter().nth(i).map(|(c, _)| TableCol {
                                        key: c.clone(),
                                        data: Default::default(),
                                    })
                                })
                                .chain(vec![TableCol {
                                    key: col_name.clone(),
                                    data: r.clone(),
                                }])
                                .collect();

                            rows.push(TableRow { cols });
                        }
                    }
                }
            }
        }
        else {
            return Err(Status::cancelled("failed to execute command".to_string()));
        }
    }

    let response = Response::new(QueryResponse { rows });
    Ok(response)
}

Сервисы готовы, но сам сервер пока ничего не слушает и даже не знает ничего об этих сервисах, предлагаю это исправить. Теперь наш сервер слушает 1299 порт и готов отвечать на запросы.

#[tokio::main]
async fn main() {
    // переложим бремя парсинга адреса на `SocketAddr`
    let addr = format!("{}:{}", "0.0.0.0", 1299)
        .parse()
        .expect("could not parse address");

    let client_map = AgentsMap::default();

    LOGS_MANAGER.send_log(format!("server listening on {}", addr)).await;

    Server::builder()
        .add_service(ExcavatorServer::new(ExcavatorService::new(client_map.clone())))
        .add_service(QueryServer::new(QueryService::new(LOGS_MANAGER.subscribe(), client_map)))
        .serve(addr)
        .await
        .expect("failed to serve")
}

Агент

Сервер мы реализовали, но вот того, кто будет выполнять команды, нету. Предлагаю это исправить, создав проект agent.

cargo new agent

Сразу в зависимости добавим необходимые крейты.

[package]
name = "agent"
version = "0.1.0"
edition = "2024"

[dependencies]
tokio = { version = "1.52", features = ["rt", "rt-multi-thread", "fs", "macros"] }
tokio-stream = "0.1"
tonic = "0.14"
prost = "0.14"
tonic-prost = "0.14"
async-trait = "0.1"
knus = "3.3"

[build-dependencies]
build-common = { path = "../build-common" }

Создаём agent\build.rs со следующим содержимым. Оно немного отличается от предыдущего скрипта, так как будем ещё копировать файл agent.kdl, конфиг агента, в выходную директорию.

use std::{env, fs, path::PathBuf};

const CONFIG_FILE: &str = "agent.kdl";

fn main() {
    let proto_folder = build_common::get_proto_folder();
    build_common::compile_protos_folder(&proto_folder).unwrap();

    println!("cargo:rerun-if-changed={CONFIG_FILE}");
    let target_folder = PathBuf::from_iter([
        env::var("CARGO_MANIFEST_DIR").unwrap(),
        "..".to_owned(),
        "target".to_owned(),
        env::var("PROFILE").unwrap(),
    ]);

    let config_path = target_folder.join(CONFIG_FILE);
    fs::copy(CONFIG_FILE, &config_path).unwrap();
}

Предлагаю создать этот конфиг-файл agent\agent.kdl со следующим содержимым. Да, небогато, но это пока что.

server "127.0.0.1"

Но, чтобы распарсить этот конфиг, нужна модель для knus. Предлагаю описать её, создав модуль agent\src\config.rs.

#[derive(Decode)]
pub struct Server {
    #[knus(argument)]
    pub address: String,
    #[knus(argument)]
    pub port: Option<u16>,
}

#[derive(Decode)]
pub struct Config {
    #[knus(child)]
    pub server: Server,
}

Создаём модуль agent\src\proto.rs, где подключаем сгенерированный код gRPC.

tonic::include_proto!("main");

Но что это за агент, если он ничего не умеет. Предлагаю это решить тем, что создадим трейт Module в agent\src\modules\mod.rs, чтобы добиться модульности агента. Хотя я бы предпочёл заводить модули на сервере на каком-нибудь скриптовом языке или байт-коде и рассылать их агентам в случае необходимости, но на данный момент это слишком заморочено.

#[async_trait::async_trait]
pub trait Module {
    fn name(&self) -> &'static str;
    fn description(&self) -> &'static str;
    fn args(&self) -> Vec<ModuleArg>;
    async fn execute(&self, args: Args) -> ExecuteResult;
}

pub struct ExecuteResult {
    pub code: i64,
    pub output: Vec<String>,
}

pub struct ModuleArg {
    pub name: &'static str,
    pub description: &'static str,
    pub required: bool,
    pub default: Option<String>,
}

Модули нужно где-то хранить, поэтому предлагаю создать структуру ModulesRegistry. Так как у нас HashMap внутри RwLock и чтобы каждый раз не вызывать await после каждого вызова метода, то мы залочим RwLock и будем передавать ссылку на его внутренний HashMap.

type ModulesMap = HashMap<&'static str, Arc<dyn Module + Send + Sync>>;

pub struct ModulesRegistryBuilder<'a>(&'a mut ModulesMap);

impl ModulesRegistryBuilder<'_> {
    pub fn register(&mut self, module: impl Module + 'static + Send + Sync) -> &mut Self {
        self.0.insert(module.name(), Arc::new(module));
        self
    }
}

#[derive(Default, Clone)]
pub struct ModulesRegistry(Arc<RwLock<ModulesMap>>);

impl ModulesRegistry {
    pub async fn build<F>(&self, builder: F)
    where
        F: FnOnce(&mut ModulesRegistryBuilder, &Self),
    {
        let mut s = self.0.write().await;
        builder(&mut ModulesRegistryBuilder(&mut s), self);
    }

    pub async fn get(&self, name: &str) -> Option<Arc<dyn Module + Send + Sync>> {
        self.0.read().await.get(name).cloned()
    }

    pub async fn get_all(&self) -> Vec<&'static str> {
        self.0.read().await.values().map(|v| v.name()).collect()
    }
}

Давайте создадим пару модулей, они будут очень простые.

// agent\src\modules\info.rs
pub struct GetInfoModule(ModulesRegistry);

unsafe impl Send for GetInfoModule {}

impl GetInfoModule {
    pub fn new(registry: ModulesRegistry) -> Self {
        Self(registry)
    }
}

#[async_trait::async_trait]
impl Module for GetInfoModule {
    fn name(&self) -> &'static str {
        "info"
    }

    fn description(&self) -> &'static str {
        "get information about the agent modules"
    }

    fn args(&self) -> Vec<ModuleArg> {
        vec![
            ModuleArg {
                name: "type",
                description: "type of information to retrieve (modules, etc.)",
                required: true,
                default: None,
            },
            ModuleArg {
                name: "name",
                description: "name of the module to retrieve information about",
                required: false,
                default: None,
            },
        ]
    }

    async fn execute(&self, args: Args) -> ExecuteResult {
        let ty = args.get("type");
        if let Some(ty) = ty {
            match ty.as_str() {
                "modules" => {
                    let modules = self.0.get_all().await.into_iter().map(|s| s.to_owned()).collect();
                    ExecuteResult { code: 0, output: modules }
                }
                _ => ExecuteResult {
                    code: 1,
                    output: vec!["unknown type".to_string()],
                },
            }
        }
        else {
            ExecuteResult {
                code: 1,
                output: vec!["missing type parameter".to_string()],
            }
        }
    }
}


// agent\src\modules\version.rs
pub struct AgentVersionModule;

unsafe impl Send for AgentVersionModule {}

#[async_trait::async_trait]
impl Module for AgentVersionModule {
    fn name(&self) -> &'static str {
        "version"
    }

    fn description(&self) -> &'static str {
        "get agent version"
    }

    fn args(&self) -> Vec<ModuleArg> {
        vec![]
    }

    async fn execute(&self, _: Args) -> ExecuteResult {
        ExecuteResult {
            code: 0,
            output: vec![env!("CARGO_PKG_VERSION").to_string()],
        }
    }
}

Теперь можем инициализировать и зарегистрировать модули, после чего считать конфиг с адресом сервера.

#[tokio::main]
async fn main() {
    let registry = ModulesRegistry::default();

    registry.build(|builder, registry| {
        builder
            .register(AgentVersionModule)
            .register(GetInfoModule::new(registry.clone()));
    }).await;

    let config_path = path::use_config_path();
    let config_content = tokio::fs::read_to_string(&config_path).await.expect("unable to read config file");
    let config: Config = knus::parse("config", &config_content).expect("failed to parse config");
  
    // ...
}

К счастью, клиент не нужно реализовывать, кодогенерация всё сделала за нас и наш клиент готов, достаточно указать адрес сервера. И сразу посылаем heartbeat-сообщение и получаем поток.

#[tokio::main]
async fn main() {
    // ...

    let addr = format!("http://{}:{}", config.server.address, config.server.port.unwrap_or(1299));
    println!("listening server at {}", addr);
    let mut client = ExcavatorClient::connect(addr).await.expect("failed to connect to excavator");

    let (tx, rx) = tokio::sync::mpsc::channel(128);
    tx.send(ExcavatorMessage {
        request: Some(Request::Heartbeat(ExcavatorHeartbeat::default())),
    })
    .await
    .expect("failed to send heartbeat");

    let mut stream = client
        .run_excavator(ReceiverStream::new(rx))
        .await
        .expect("failed to run excavator")
        .into_inner();

    // ...
}

Теперь можем начать слушать поток и обрабатывать входящие команды. Чтобы не блокировать поток и не пропускать другие команды от сервера, запускаем отдельный поток.

while let Some(msg) = stream.next().await {
    let Ok(msg) = msg
    else {
        continue;
    };

    let Some(command) = msg.request
    else {
        continue;
    };

    match command {
        Request::Command(cmd) => {
            // парсим аргументы
            let args = Args::new(cmd.args);

            // получаем модуль
            let module = registry.get(&cmd.name).await;
            if let Some(module) = module {
                let tx = tx.clone();

                // спавним поток под задачу
                tokio::spawn(async move {
                    // выполняем команду
                    let ExecuteResult { code, output } = module.execute(args).await;

                    let results = output
                        .into_iter()
                        .map(|output| MessageResult {
                            key: cmd.name.clone(),
                            value: output,
                        })
                        .collect();

                    // отправляем результат обратно серверу
                    tx.send(Message::new(cmd.uid, code, results).into())
                        .await
                        .expect("failed to send response");
                });
            }
            else {
                tx.send(
                    Message::new(
                        cmd.uid,
                        1,
                        vec![MessageResult {
                            key: "error".to_string(),
                            value: format!("module '{}' not found", cmd.name),
                        }],
                    )
                    .into(),
                )
                .await
                .expect("failed to send response");
            }
        }
        Request::Response(_) => {}
        Request::Heartbeat(_) => {}
    }
}

Клиент

Отлично, всё готово, осталось намалевать клиент. Он будет очень простым, поэтому будем использовать TUI для взаимодействия с пользователем.

Инициализируем проект.

cargo new client-tui

Добавляем в зависимости нужные крейты.

[package]
name = "client-tui"
version = "0.1.0"
edition = "2024"

[dependencies]
tokio = { version = "1.52", features = ["rt", "rt-multi-thread", "macros"] }
# для кодогенерации
tonic = "0.14"
prost = "0.14"
tonic-prost = "0.14"
# для парсинга аргументов
clap = { version = "4.6.1", features = ["derive"] }
# для TUI
ratatui = "0.30"
tui-input = "0.15"

[build-dependencies]
build-common = { path = "../build-common" }

И также создаём скрипт client-tui\build.rs с таким же содержимым, как у сервера, так как никаких конфигов у нас не будет.

fn main() {
    let proto_folder = build_common::get_proto_folder();
    build_common::compile_protos_folder(&proto_folder).unwrap();
}

И включаем сгенерированный код в модуле client-tui\src\proto.rs.

tonic::include_proto!("client");

Если речь зашла о том, что конфигов у нас не будет, будем использовать аргументы. Для того, чтобы распарсить их, будем использовать clap. Давайте для него создадим простую модель.

#[derive(Subcommand)]
pub enum Command {
    #[command(about = "Connect to a Selecit server")]
    Connect { host: String, port: Option<u16> },
}

#[derive(Parser)]
#[command(about = "Selecit client for connecting to Selecit servers")]
pub struct Args {
    #[command(subcommand)]
    pub command: Command,
}

Наш TUI будет очень простым, поэтому не буду особо подробно останавливаться на этом. Что тут происходит можете посмотреть в документации ratatui и tui-input. К сожалению, библиотека не поддерживает асинхронность, но предлагает для этого акторную модель, но мне было слишком лень это делать, проще воткнуть блокировку посредством block_in_place.

#[derive(Default, Clone, Copy)]
enum Tab {
    #[default]
    Query = 0,
    Logs = 1,
}

impl Tab {
    fn next(self) -> Option<Self> {
        match self {
            Tab::Query => Some(Tab::Logs),
            Tab::Logs => None,
        }
    }

    fn back(self) -> Option<Self> {
        match self {
            Tab::Query => None,
            Tab::Logs => Some(Tab::Query),
        }
    }
}

#[derive(Default, PartialEq)]
enum AppMode {
    #[default]
    Input,
    Normal,
}

#[derive(Default)]
pub struct App {
    current_tab: Tab,
    app_mode: AppMode,
    addr: String,

    input: Input,

    client: Option<QueryClient<Channel>>,
    logs_cache: Arc<RwLock<Vec<String>>>,
    query_cache: Option<Vec<TableRow>>,
}

impl App {
    pub async fn new(addr: String, mut client: QueryClient<Channel>) -> Self {
        let mut stream = client.logs(LogsQueryRequest {}).await.expect("unable to load server logs").into_inner();
        let logs = Arc::new(RwLock::new(Vec::new()));

        tokio::spawn({
            let logs = logs.clone();
            async move {
                loop {
                    while let Some(entry) = stream.next().await {
                        match entry {
                            Ok(entry) => {
                                for entry in entry.log {
                                    logs.write().await.push(entry);
                                }
                            }
                            Err(err) => {
                                logs.write().await.push(format!("unable to read logs because: {}", err));
                            }
                        }
                    }
                }
            }
        });

        Self {
            addr,
            client: Some(client),
            logs_cache: logs,
            ..Default::default()
        }
    }

    pub fn run(mut self, terminal: &mut DefaultTerminal) -> io::Result<()> {
        loop {
            terminal.draw(|frame| self.render(frame))?;

            let event = event::read()?;
            if let Event::Key(key) = event {
                match key.code {
                    KeyCode::Char('q') if key.modifiers.contains(KeyModifiers::CONTROL) => return Ok(()),
                    KeyCode::Enter if self.app_mode == AppMode::Input && !self.input.value().is_empty() => {
                        if let Some(ref mut client) = self.client {
                            let command = self.input.value_and_reset();
                            let result = block_in_place(|| Handle::current().block_on(async move { client.query(QueryRequest { command }).await }));
                            match result {
                                Ok(response) => {
                                    let rows = response.into_inner().rows;
                                    self.query_cache = Some(rows);
                                }
                                Err(err) => block_in_place(|| {
                                    Handle::current().block_on(async {
                                        self.logs_cache
                                            .write()
                                            .await
                                            .push(format!("an error occurred while trying to execute the command: {err}"))
                                    });
                                }),
                            }
                        }
                    }
                    KeyCode::Right if let Some(tab) = self.current_tab.next() => {
                        self.stop_editing();
                        self.current_tab = tab;
                    }
                    KeyCode::Left if let Some(tab) = self.current_tab.back() => {
                        self.start_editing();
                        self.current_tab = tab;
                    }
                    _ if self.app_mode == AppMode::Input => {
                        self.input.handle_event(&event);
                    }
                    _ => {}
                }
            }
        }
    }

    fn start_editing(&mut self) {
        self.app_mode = AppMode::Input;
    }

    fn stop_editing(&mut self) {
        self.app_mode = AppMode::Normal;
    }

    fn render(&mut self, frame: &mut Frame) {
        let [title_area, tabs_area, content_area] =
            Layout::vertical([Constraint::Length(1), Constraint::Length(1), Constraint::Fill(1)]).areas(frame.area());

        let title = Line::from(format!("Selecit Client - connected to {}", self.addr)).centered();
        frame.render_widget(title, title_area);

        let tabs = Tabs::new(vec!["Query", "Logs"]).select(self.current_tab as usize);
        frame.render_widget(tabs, tabs_area);

        match self.current_tab {
            Tab::Query => self.render_query(frame, content_area),
            Tab::Logs => self.render_logs(frame, content_area),
        }
    }

    fn render_query(&self, frame: &mut Frame, content_area: Rect) {
        let [table_area, input_area] = Layout::vertical([Constraint::Fill(8), Constraint::Fill(2)]).areas(content_area);

        let (rows, widths) = if let Some(rows) = self.query_cache.as_ref() {
            // получаем ссылку на первую строчку, чтобы составить из неё заголовок таблицы
            if let Some(first) = rows.first() {
                let widths = first.cols.iter().map(|_| Constraint::Percentage(30)).collect();

                let cols = first.cols.iter().map(|c| c.key.clone()).collect::<Vec<_>>();
                let body = rows
                    .into_iter()
                    .map(|r| Row::new(r.cols.iter().map(|c| c.data.clone()).collect::<Vec<_>>()))
                    .collect::<Vec<_>>();
                let rows = vec![Row::new(cols)].into_iter().chain(body).collect();

                (rows, widths)
            }
            else {
                (vec![], vec![])
            }
        }
        else {
            (vec![], vec![])
        };

        let table = Table::new(rows, widths).block(Block::bordered());
        frame.render_widget(table, table_area);

        let input = Paragraph::new(self.input.value()).block(Block::bordered().title("Command"));
        frame.render_widget(input, input_area);
    }

    fn render_logs(&self, frame: &mut Frame, content_area: Rect) {
        let logs = block_in_place(|| Handle::current().block_on(async { self.logs_cache.read().await }));

        let list = List::new(logs.iter().cloned()).block(Block::bordered());

        frame.render_widget(list, content_area);
    }
}

Отлично, теперь можем запустить проект.

cargo run -p server
cargo run -p agent
cargo run -p client-tui -- connect 127.0.0.1

И пробуем получить список агентов с помощью команды list by name.

Или попробуем отправить команду агенту и получить результат.

Можем также с помощью стрелочек переключиться на другую вкладку и посмотреть логи.

Безопасность

TLS

Всё отлично работает, но есть одно но: весь трафик незашифрован, а это значит, что любой может прочитать наши сообщения, как пример:

Чтобы это исправить, давайте подключим TLS, чтобы гонять трафик по HTTPS. Для этого я сначала сгенерирую сертификаты в директории certs с помощью скрипта certs\generate.ps1.

$SUBJ = "/C=RU/ST=Test/L=Test/O=Selecit/OU=/CN=localhost/emailAddress="

# CA
openssl req -x509 -newkey rsa:4096 -days 36500 -keyout ca-key.pem -out ca-cert.pem -nodes -subj $SUBJ

# CSR
openssl req -newkey rsa:4096 -keyout server-key.pem -out server-req.pem -subj $SUBJ -nodes

# server cert
openssl x509 -req -in server-req.pem -CA ca-cert.pem -CAkey ca-key.pem -CAcreateserial -out server-cert.pem -extfile localhost.ext

Чтобы корректно разрешать адрес сервера, создадим файл localhost.ext со следующим содержимым:

authorityKeyIdentifier=keyid,issuer
basicConstraints=CA:FALSE
subjectAltName = @alt_names

[alt_names]
DNS.1 = localhost
IP.1 = 127.0.0.1

После корректного выполнения скрипта должна появиться пачка сертификатов, как на скриншоте:

Так как сразу во всех проектах будет работа с файлами, то для удобства создам ещё один проект common, в котором будут функции для получения путей до каких-либо директорий.

cargo new --lib common

Создам модуль common\src\path.rs со следующим содержимым:

use std::{env, path::PathBuf};

const CERTS_FOLDER: &str = "certs";

pub fn use_app_folder() -> PathBuf {
    env::current_exe()
        .expect("failed to get current executable")
        .parent()
        .expect("failed to get current executable parent")
        .to_path_buf()
}

#[cfg(debug_assertions)]
pub fn use_project_folder() -> PathBuf {
    PathBuf::from(env::var("CARGO_MANIFEST_DIR").expect("failed to get CARGO_MANIFEST_DIR"))
}

#[cfg(debug_assertions)]
pub fn use_workspace_folder() -> PathBuf {
    use_project_folder().join("..")
}

pub fn use_certs_folder() -> PathBuf {
    if cfg!(debug_assertions) {
        use_workspace_folder().join(CERTS_FOLDER)
    }
    else {
        use_app_folder().join(CERTS_FOLDER)
    }
}

А в файле common\src\lib.rs добавлю константу SERVER_PORT. Всё, это всё назначение этого проекта.

pub mod path;

pub const SERVER_PORT: u16 = 1299;

Теперь можем перейти к проекту сервера и создать модуль server\src\config.rs, в котором опишем модель для конфига.

use std::path::PathBuf;
use crate::path;
use knus::Decode;

#[derive(Decode)]
pub struct Server {
    pub address: String,
    pub port: u16,
}

#[derive(Decode, Clone)]
pub struct Auth {
    #[knus(child, unwrap(argument))]
    pub token: String,
}

#[derive(Decode, Default)]
pub struct Path {
    #[knus(argument)]
    pub path: PathBuf
}

#[derive(Decode)]
pub struct Certificates {
    #[knus(child)]
    pub server_cert: Path,
    #[knus(child)]
    pub server_key: Path,
}

#[derive(Decode, Default)]
pub struct Config {
    #[knus(child)]
    pub server: Option<Server>,
    #[knus(child)]
    pub auth: Option<Auth>,
    #[knus(child)]
    pub certificates: Option<Certificates>,
}

pub async fn use_config() -> Config {
    let config_path = path::use_config_path();
    if tokio::fs::metadata(&config_path).await.is_ok() {
        let config_content = tokio::fs::read_to_string(config_path).await.expect("could not read config file");
        knus::parse("config", &config_content).expect("could not parse config file")
    }
    else {
        Config::default()
    }
}

И создадим сам конфиг по пути server\server.kdl и добавим пути до файлов сертификатов. Можно написать относительный путь, так как далее будем разрешать это.

certificates {
    server-cert "server-cert.pem"
    server-key "server-key.pem"
}

Бежим в server\src\main.rs и немного переделываем функцию main. Тут всё довольно просто, не вижу особого смысла разглагольствовать, поэтому предлагаю перейти далее.

#[tokio::main]
async fn main() {
    let config = use_config().await;

    let addr = if let Some(server) = config.server {
        format!("{}:{}", server.address, server.port).parse().expect("could not parse address")
    }
    else {
        SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), SERVER_PORT)
    };

    let client_map = AgentsMap::default();

    LOGS_MANAGER.send_log(format!("server listening on {}", addr)).await;

    let mut builder = if let Some(certs) = config.certificates {
        let cert_path = {
            let path = certs.server_cert.path;
            if path.is_absolute() { path } else { use_certs_folder().join(path) }
        };

        let key_path = {
            let path = certs.server_key.path;
            if path.is_absolute() { path } else { use_certs_folder().join(path) }
        };

        let cert = tokio::fs::read_to_string(cert_path).await.expect("could not read certificate");
        let key = tokio::fs::read_to_string(key_path).await.expect("could not read key");

        let identity = Identity::from_pem(cert, key);

        let tls_config = ServerTlsConfig::new().identity(identity);
        match Server::builder().tls_config(tls_config) {
            Ok(b) => {
                LOGS_MANAGER.send_log("server using tls".to_owned()).await;
                b
            }
            Err(err) => {
                LOGS_MANAGER.send_log(format!("failed to set tls config: {}", err)).await;
                Server::builder()
            }
        }
    }
    else {
        Server::builder()
    };

    builder
        .add_service(ExcavatorServer::new(ExcavatorService::new(client_map.clone())))
        .add_service(QueryServer::new(QueryService::new(LOGS_MANAGER.subscribe(), client_map)))
        .serve(addr)
        .await
        .expect("failed to serve")
}

Перейдём к агенту и сделаем примерно то же самое. Изменим немного модель конфига у агента в файле agent\src\config.rs.

use knus::Decode;

#[derive(Decode)]
pub struct Server {
    #[knus(argument)]
    pub address: String,
    #[knus(argument)]
    pub port: Option<u16>,
}

#[derive(Decode)]
pub struct Certificate {
    #[knus(argument)]
    pub ca_cert: String,
}

#[derive(Decode, Default)]
pub struct Auth {
    #[knus(child, unwrap(argument))]
    pub token: String,
}

#[derive(Decode)]
pub struct Config {
    #[knus(child)]
    pub server: Server,
    #[knus(child)]
    pub auth: Option<Auth>,
    #[knus(child)]
    pub certificate: Option<Certificate>,
}

Теперь можем добавить настройки TLS клиенту в main функции.

#[tokio::main]
async fn main() {
    // ...

    let config_path = path::use_config_path();
    let config_content = tokio::fs::read_to_string(&config_path).await.expect("unable to read config file");
    let config: Config = knus::parse("config", &config_content).expect("failed to parse config");

    let mut client = if let Some(cert) = config.certificate {
        let cert = {
            let path = PathBuf::from(cert.ca_cert);
            if path.is_absolute() { path } else { use_certs_folder().join(path) }
        };

        let cert_content = tokio::fs::read_to_string(cert).await.expect("unable to read certificate file");
        let cert = Certificate::from_pem(cert_content);

        let addr = format!("https://{}:{}", config.server.address, config.server.port.unwrap_or(SERVER_PORT));

        let tls = ClientTlsConfig::new().ca_certificate(cert);
        let channel = Channel::from_shared(addr.clone())
            .unwrap()
            .tls_config(tls)
            .unwrap()
            .connect()
            .await
            .expect("TLS connection failed");

        println!("listening server at {}", addr);

        ExcavatorClient::new(channel)
    }
    else {
        let addr = format!("http://{}:{}", config.server.address, config.server.port.unwrap_or(SERVER_PORT));
        println!("listening server at {}", addr);
        ExcavatorClient::connect(addr.clone()).await.expect("connection to server failed")
    };

    // ...
}

Теперь наш трафик успешно шифруется, в чём можно убедиться, посмотрев в WireShark, что у нас теперь гоняются TLS пакеты.

Аутентификация

Трафик шифруется, но подключиться может всё равно каждый. Давайте это исправим, добавив аутентификацию пользователей.

Тут есть два пути: классическая аутентификация по каком-нибудь паролю или mTLS (пример из репозитория tonic). Тут выбирайте сами, что вам удобнее или проще, но я для примера буду использовать простой токен.

Для этого добавим слой в server\src\main.rs, который будет обрабатывать все входящие запросы и проверять, есть ли в заголовках поле authorization с нужным нам токеном.

#[tokio::main]
async fn main() {
    let config = use_config().await;

    // ...

    if config.auth.is_some() {
        LOGS_MANAGER.send_log("server using token auth".to_owned()).await;
    }

    let mut auth = config.auth;
    builder
        .layer(InterceptorLayer::new(move |req: Request<()>| {
            if let Some(auth) = auth.as_mut() {
                if req
                    .metadata()
                    .get("authorization")
                    .map(|header| header.to_str())
                    .is_some_and(|s| s.is_ok_and(|s| s == auth.token))
                {
                    Ok(req)
                }
                else {
                    Err(Status::unauthenticated("unauthorized"))
                }
            }
            else {
                Ok(req)
            }
        }))
        .add_service(ExcavatorServer::new(ExcavatorService::new(client_map.clone())))
        .add_service(QueryServer::new(QueryService::new(LOGS_MANAGER.subscribe(), client_map)))
        .serve(addr)
        .await
        .expect("failed to serve")
}

Теперь можем модифицировать модель для аргументов в client-tui\src\args.

use clap::{Parser, Subcommand};

#[derive(Subcommand)]
pub enum Command {
    #[command(about = "Connect to a Selecit server")]
    Connect {
        /// Server host
        host: String,

        /// Server port
        port: Option<u16>,

        /// Path to the CA certificate
        #[arg(long)]
        ca: Option<String>,

        /// Authentication token
        #[arg(long, short)]
        token: Option<String>,
    },
}

#[derive(Parser)]
#[command(about = "Selecit client for connecting to Selecit servers")]
pub struct Args {
    #[command(subcommand)]
    pub command: Command,
}

И в client-tui\srv\main.rs создадим замыкание, которое будет вставлять в заголовки запроса токен из аргументов. После чего закидываем это замыкание в QueryClient::with_interceptor. Мне пришлось упаковать замыкание в Box, так как client дальше передаётся в App, из-за чего компилятор жаловался на несоответствие типов.

pub type InterceptFn = Box<dyn FnMut(Request<()>) -> Result<Request<()>, Status>>;

#[tokio::main]
async fn main() {
    let args = Args::parse();
    match args.command {
        Command::Connect {
            host,
            port,
            ca: ca_cert,
            mut token,
        } => {
            let interceptor: InterceptFn = Box::new(move |mut req: tonic::Request<()>| {
                if let Some(auth) = token.as_mut() {
                    req.metadata_mut().insert(AUTHORIZATION, MetadataValue::from_str(auth.as_str()).unwrap());
                }
                Ok(req)
            });

            let (client, server_addr) = if let Some(ca_cert) = ca_cert {
                // ...

                (QueryClient::with_interceptor(channel, interceptor), server_addr)
            }
            else {
                // ...

                (QueryClient::with_interceptor(channel, interceptor), server_addr)
            };

            let mut terminal = ratatui::init();
            App::new(server_addr, client).await.run(&mut terminal).expect("failed to run app");
            ratatui::restore();
        }
    }
}

Можем теперь проделать то же самое и с агентом.

#[tokio::main]
async fn main() {
    // ...

    let config_path = path::use_config_path();
    let config_content = tokio::fs::read_to_string(&config_path).await.expect("unable to read config file");
    let mut config: Config = knus::parse("config", &config_content).expect("failed to parse config");

    let interceptor = move |mut req: tonic::Request<()>| {
        if let Some(auth) = config.auth.as_mut() {
            req.metadata_mut()
                .insert(AUTHORIZATION, MetadataValue::from_str(auth.token.as_str()).unwrap());
        }
        Ok(req)
    };

    let mut client = if let Some(cert) = config.certificate {
        // ...

        ExcavatorClient::with_interceptor(channel, interceptor)
    }
    else {
        // ...

        ExcavatorClient::with_interceptor(channel, interceptor)
    };

    // ...
}

Итого

Мы реализовали систему распределённого управления, которая позволяет нам получать список агентов, подходящие под какие-то нужные нам критерии, и даже выполнять команды посредством этих агентов на удалённых хостах.

Наша система далеко не идеальна, везде используются except да unwrap, нет переподключения агентов, язык запросов пока что в зачаточном состоянии, нет никакого кэширования, нужно прикрутить БД для хранения какого-то состояния системы, и куча других проблем, но, доработав слабые места, думаю, система жизнеспособна.

Если кому нужно посмотреть полный код проекта, то вот его репозиторий.

Из похожего я нашёл bssh, который стремится или даже уже достиг что-то похожее, к чему стремился я в начале статьи, но он выбрал другой путь работы поверх SSH.