Недавно мне в голову пришла мысль о том, как бы получать список техники, на которой, например, установлен последний патч системы или нужное ПО. Можно написать скрипты, но, как мне кажется, это не совсем удобно и безопасно, особенно когда парк машин неоднообразен. Поэтому я предлагаю реализовать систему, где вся запрашиваемая информация будет собираться с помощью агентов на конечных устройствах в реальном времени с помощью специального языка запросов.
Как я думаю, у этой системы довольно большой потенциал, ведь его можно доработать и превратить в какой-нибудь аналог Ansible. Или в базу для SIEM системы. Или прикрутить к какой-нибудь ITSM-системе, чтобы проводить инвентаризацию по нужным параметрам.
Вся система будет строиться на gRPC, так как он из коробки идёт со всем необходимы, включая простую интеграцию систем аутентификации и TLS, плюс описание интерфейсов в .proto файлах позволит писать клиенты и агентов на различных языках и фреймворках. Единственный минус, так это то, что двунаправленное взаимодействие не укладывается в философию фреймворка, но это решим в процессе написания.
Как это будет работать
Наша система будет представлять из себя агентов, которые устанавливают соединение с севером, а сервер уже будет по этим соединениям отправлять команды и получать результаты выполнения. А отправлять команды на сервер будем посредством другого, простого gRPC клиента, который также будет устанавливать соединение с сервером, но работать с другим сервисом.

Почему бы просто не прикрутить какую-нибудь командую строку серверу, чтобы сразу на нём выполнять команды? Мне показалось такое решение довольно неудобным, особенно в малоуправляемых средах по типу Docker-контейнера. Будет куда практичнее сделать именно так, как я описал выше. Это позволит даже с помощью скриптов запрашивать нужную информацию с сервера и не придётся городить для этого трёхэтажные костыли. Если я в чём-то ошибаюсь, то поправьте меня, пожалуйста, в комментариях.
gRPC
Начнём с того, что опишем весь интерфейс взаимодействия сервера с агентами и клиентами. Для этого создадим в корне проекта папку proto и создадим файл main.proto, в котором опишем сервис Excavator. Да, хотелось какое-то вычурное и необычное имя, которое будет олицетворять какую-то тяжёлую работу.
Как можете наблюдать ниже, агент-серверное общение будет происходить посредством стримов, это позволит достичь двунаправленное общение. Чтобы особо не путаться в типах сообщений, я решил просто объединить их в одно.
И наличие ExcavatorHeartbeat может показаться избыточным и ненужным, но оно на самом деле нужно для того, чтобы установить соединение.
syntax = "proto3"; package main; service Excavator { rpc RunExcavator (stream ExcavatorMessage) returns (stream ExcavatorMessage); } message ExcavatorHeartbeat {} message ExcavatorCommandArg { string key = 1; string value = 2; } message ExcavatorCommand { string uid = 1; string name = 2; repeated ExcavatorCommandArg args = 3; } message ExcavatorResponseResult { string key = 1; string value = 2; } message ExcavatorResponse { string uid = 1; int64 code = 2; repeated ExcavatorResponseResult results = 3; } message ExcavatorMessage { oneof request { ExcavatorHeartbeat heartbeat = 1; ExcavatorResponse response = 2; ExcavatorCommand command = 3; } }
Теперь можем описать клиент-серверное общение, для этого создадим ещё один файл client.proto и опишем сервис Query, содержащий два метода: для запросов и логов. Да, будем отправлять логи сервера на клиент.
Как можете наблюдать, стримов тут нет. Это всё из-за того, что не сервер будет обращаться к клиенту, а клиент к серверу.
syntax = "proto3"; package client; service Query { rpc Query (QueryRequest) returns (QueryResponse); rpc Logs (LogsQueryRequest) returns (stream LogsQueryResponse); } message LogsQueryRequest {} message LogsQueryResponse { repeated string log = 1; } message QueryRequest { string command = 1; } message TableCol { string key = 1; string data = 2; } message TableRow { repeated TableCol cols = 1; } message QueryResponse { repeated TableRow rows = 1; }
Сервер
Предлагаю начать от того, куда все будут пытаться подключиться, а именно с сервера. Для этого давайте инициализируем проект. Но для начала создадим Cargo.toml в корне проекта.
[workspace] resolver = "3"
Теперь можем инициализировать проект сервера следующей командой, она автоматом добавит проект в члены воркспейса. Но также создадим проект библиотеки, которую будут использовать все проекты для генерации кода gRPC.
cargo new server cargo new --lib build-common
Библиотека будет очень миниатюрная, будет содержать лишь пару функций для генерации кода в build.rs. Но для начала перейдём в этот проект и в зависимости добавим следующее:
[package] name = "build-common" version = "0.1.0" edition = "2024" [dependencies] tonic-prost-build = "0.14" glob = "0.3"
Теперь в lib.rs можем написать следующие функции:
use std::{env, fs, io, path::Path}; pub const PROTO_FOLDER: &str = "proto"; pub fn get_proto_folder() -> std::path::PathBuf { Path::new(&env::var("CARGO_MANIFEST_DIR").expect("CARGO_WORKSPACE not set")) .join("..") .join(PROTO_FOLDER) } pub fn compile_protos_folder(folder: impl AsRef<Path>) -> io::Result<()> { let folder = folder.as_ref(); println!("cargo:rerun-if-changed={}/*", folder.display()); for entry in fs::read_dir(folder)? { tonic_prost_build::compile_protos(entry.unwrap().path())?; } Ok(()) }
Откроем server/Cargo.toml и добавим этот проект в build-зависимости проекта сервера. Также сразу в обычные зависимости добавляем следующие крейты, они потребуются для кодогенерации.
[package] name = "server" version = "0.1.0" edition = "2024" [dependencies] tokio = { version = "1.41", features = ["full"] } prost = "0.14" tonic = { version = "0.14", features = ["tls-ring"] } tonic-prost = "0.14" hex = "0.4" tokio-stream = "0.1" lazy_static = "1.5" futures = "0.3" small_uid = "0.2.4" nom = "8.0.0" [build-dependencies] build-common = { path = "../build-common" }
Создадим файл server\build.rs и добавим туда следующие строки:
fn main() { let proto_folder = build_common::get_proto_folder(); build_common::compile_protos_folder(&proto_folder).unwrap(); }
Если проект собирается, то код успешно генерируется. Чтобы взаимодействовать с ним, создадим папку server\proto с файлом mod.rs, в нём добавим следующие строчки:
tonic::include_proto!("main"); tonic::include_proto!("client");
Предлагаю продолжить с чего-то простого, как, например, реализация логов. Для этого создадим модуль client.rs в server\src\proto. Все логи будем хранить в глобальном векторе и будем динамически подгружать их клиентам с помощью стрима. Чтобы ручками каждый раз не делать всё это, создадим глобальную структуру LogsManager, которая будет помещать логи в этот вектор и отправлять в поток.
В целом, думаю, тут нечего объяснять, всё довольно просто. Кроме того, что lazy_static нужен из-за того, что Mutex из tokio не const-совместимый, как, например, Mutex из стандартной библиотеки, поэтому просто так в статике его не инициализировать.
Обёртка в виде LogsReceiver нужен чисто для того, чтобы не запутаться в каналах, компилятор будет защищать нас от глупых ошибок. Наверное.
lazy_static! { pub static ref LOGS_MANAGER: LogsManager = LogsManager(broadcast::channel(128).0); static ref LOGS: Mutex<Vec<String>> = Mutex::new(Vec::new()); } pub struct LogsManager(broadcast::Sender<String>); impl LogsManager { pub async fn send_log(&self, log: String) { LOGS.lock().await.push(log.clone()); if let Err(err) = self.0.send(log) { match err { SendError(log) => { LOGS.lock().await.push(format!("dropped log when no subscribers: '{log}'")); } } } } pub fn subscribe(&self) -> LogsReceiver { LogsReceiver(self.0.subscribe()) } } pub struct LogsReceiver(broadcast::Receiver<String>); impl Deref for LogsReceiver { type Target = broadcast::Receiver<String>; fn deref(&self) -> &Self::Target { &self.0 } }
Давайте реализуем сервис Query. Для этого создадим структуру QueryService, которая будет содержать ссылку на наш канал LogsReceiver.
pub struct QueryService(LogsReceiver); impl QueryService { pub fn new(logs: LogsReceiver) -> Self { Self(logs) } }
Имплементируем сгенерированный из proto-файла трейт Query для созданной структуры. Метод query будет пока пустым, чуть позже его заполним. В силу того, что методы асинхронные, а нормальной поддержки асинхронных трейтов в языке ещё нет, используется реэкспорт async_trait, помогающий этого добиться.
#[tonic::async_trait] impl Query for QueryService { async fn query(&self, request: Request<QueryRequest>) -> Result<Response<QueryResponse>, Status> { todo!() } type LogsStream = Pin<Box<dyn Stream<Item = Result<LogsQueryResponse, Status>> + Send>>; async fn logs(&self, _: Request<LogsQueryRequest>) -> Result<Response<Self::LogsStream>, Status> { let (tx, rx) = tokio::sync::mpsc::channel(128); let mut logs_rx = self.0.resubscribe(); // Свободно плавающий поток, который будет слушать поток от LogsManager // и отправлять в стрим новые логи. tokio::spawn(async move { let log = LOGS.lock().await.clone(); let entry = LogsQueryResponse { log }; // при первом подключении отправляем все логи сразу tx.send(Ok(entry)).await.expect("failed to send log"); // динамически отправляем новые логи while let Ok(log) = logs_rx.recv().await { let entry = LogsQueryResponse { log: vec![log] }; tx.send(Ok(entry)).await.expect("failed to send log"); } }); // Создаём поток из канала с помощью обёртки из tokio-stream let stream = ReceiverStream::new(rx); Ok(Response::new(Box::pin(stream))) } }
Перед тем, как продолжить с реализацией метода query, предлагаю сначала реализовать небольшой язык запросов. В силу того, что на момент написания статьи мне было лень его прорабатывать, он будет очень простым и будет состоять всего из нескольких выражений. Для этого создадим модуль query.rs в server\src.
Для реализации парсера воспользуемся комбинаторами из крейта nom. Что такое парсинг-комбинаторы можете ознакомиться в документации, ссылку на которую я привёл.
Первые наши команды будут выглядеть как-то так. То есть первая команда будет составлять список агентов по какому-то критерию, а вторая будет делать запросы к конкретному агенту. Запросы будут выглядеть как функции с массивом аргументов.
list by field_name from agent select say_hello(name = world), machine(get = name)
Начнём с того, что опишем AST структурами Rust. То есть мы будем перемещаться по тексту запроса с помощью комбинаторов и превращать его в структуры. Как можете наблюдать, AST будет очень простой, так как те же числа мы не будем парсить, поэтому язык будет пока довольно ограничен, но для примера хватит, я думаю.
#[derive(Debug, PartialEq)] pub struct InvokeFuncArg { pub name: String, pub value: String, } #[derive(Debug, PartialEq)] pub struct InvokeFunc { pub name: String, pub args: Vec<InvokeFuncArg>, } #[derive(Debug, PartialEq)] pub enum QueryExpr { ListBy(String), SelectFrom { from: String, select: Vec<InvokeFunc> }, }
Начнём от частного к общему, то есть реализуем сначала составные части, по типу парсеров значений и будем двигаться в сторону парсера всего выражения. Для начала реализуем парсеры identifier и field. Всё отличие в том, что первый парсит текст с числами, а второе просто текстовые значения. То есть будет запрещено в названиях полей использовать что-то, кроме буков.
fn identifier(input: &str) -> IResult<&str, &str> { // берёт символы до тех пор, пока функция char::is_alphanumeric не вернёт false, // а потом возвращает эту подстроку take_while(char::is_alphanumeric).parse(input) } fn field(input: &str) -> IResult<&str, &str> { take_while(char::is_alphabetic).parse(input) }
Думаю, этого хватит, чтобы реализовать парсер для выражения list by field_name. Парсер tag берёт какую-то строку и проверяет, соответствует ли входная строка этому значению.
Все парсеры возвращают слайс оставшегося входного текста. То есть если на вход этому парсеру вошла строчка list by field_name, то вернёт от оставшегося текста слайс field_name.
Парсер space1 - встроенный в крейт парсер, который парсит пробелы. Число в конце функции означает то, что должен встретиться минимум один пробел. Есть аналогичный парсер space0, который делает то же самое, но пробелов может вовсе не быть.
fn list_by(input: &str) -> IResult<&str, QueryExpr> { let (input, _) = tag("list by").parse(input)?; let (input, _) = space1(input)?; let (input, field_name) = field(input)?; Ok((input, QueryExpr::ListBy(field_name.to_string()))) }
Давайте реализуем парсер функций. Для этого имплементируем парсер аргументов. Он тоже довольно простой, ведь сначала мы реализуем парсер одного аргумента invoke_arg, а потом с помощью парсера separated_list0, принимающий на вход два парсера: разделителя и значения.
Если честно, то не понял разницы между
separated_list0иseparated_list1, так как у них поведение идентичное: выбрасывают ошибку, если один из парсеров на входе выдал ошибку. Так как у нас тут не может встретиться EOF, то это не проблема, но далее это создаст проблему.
fn invoke_arg(input: &str) -> IResult<&str, InvokeFuncArg> { // считываем название аргумента let (input, name) = field(input)?; // возможные пробелы перед и после знака '=' let (input, _) = space0(input)?; let (input, _) = char('=').parse(input)?; // то же самое, что парсер `tag`, но на вход принимает исключительно символы let (input, _) = space0(input)?; // считываем значение аргумента let (input, value) = identifier(input)?; Ok(( input, InvokeFuncArg { name: name.to_string(), value: value.to_string(), }, )) } fn invoke_args(input: &str) -> IResult<&str, Vec<InvokeFuncArg>> { separated_list0(invoke_list_separate, invoke_arg).parse(input) } fn invoke_list_separate(input: &str) -> IResult<&str, ()> { let (input, _) = space0(input)?; let (input, _) = char(',').parse(input)?; let (input, _) = space0(input)?; Ok((input, ())) }
Реализуем парсер функции и списка функций. Так как парсер функции очень простой, то не буду на нём подробно останавливаться.
У нас обязательно присутствует хоть одна функция, поэтому сначала вызываем парсер функции и добавляем результат его парсинга в вектор. Далее сепаратор и последующие вызовы функций опциональны, но если у нас может быть плавающая запятая, поэтому мы не возвращаем ошибку из парсера invoke_list_separate, а лишь прерываем цикл и возвращаем массив распарсенных функций; а вот вызов последующих функций обязателен, поэтому возвращаем ошибку из invoke_func. Но если удалось распарсить функцию, то сохраняем слайс, чтобы парсер не ушёл в бесконечный цикл парсинга одного и того же, и полученную функцию в вектор.
Как упоминалось ранее, все эти костыли нужны из-за того, что separated_list0 выкидывает ошибку при встрече EOF, который может встретиться в данном случае.
// парсер одной функции fn invoke_func(input: &str) -> IResult<&str, InvokeFunc> { let (input, name) = map(field, |s| s.to_string()).parse(input)?; let (input, _) = space0(input)?; let (input, _) = char('(').parse(input)?; let (input, _) = space0(input)?; let (input, args) = invoke_args(input)?; let (input, _) = space0(input)?; let (input, _) = char(')').parse(input)?; Ok((input, InvokeFunc { name, args })) } // парсер списка функций fn invoke_list(input: &str) -> IResult<&str, Vec<InvokeFunc>> { let (input, first) = invoke_func(input)?; let mut acc = vec![first]; let mut input = input; while let Ok((i, _)) = invoke_list_separate(input) { let (i, func) = invoke_func(i)?; acc.push(func); input = i; } Ok((input, acc)) }
Далее можем реализовать парсер более сложного выражения from agent select say_hello(name = world), machine(get = name). Тут тоже ничего особенного, поэтому перейдём сразу к функции parse_query, где есть парсер alt, который запускает последовательно кортеж/вектор переданных в него парсеров пока один из них не вернёт Ok. То есть сначала запускаем парсер list_by и если парсер tag("list by") возвращает Err, то эта ошибка возвращается выше в комбинатор alt, который, в свою очередь, запускает далее парсер select_from.
fn select_from(input: &str) -> IResult<&str, QueryExpr> { let (input, _) = tag("from").parse(input)?; let (input, _) = space1(input)?; let (input, from) = map(identifier, |s| s.to_string()).parse(input)?; let (input, _) = space1(input)?; let (input, _) = tag("select").parse(input)?; let (input, _) = space1(input)?; let (input, invoke_list) = invoke_list(input)?; Ok((input, QueryExpr::SelectFrom { from, select: invoke_list })) } pub async fn parse_query(query: &str) -> Result<QueryExpr, QueryParseError> { let query = alt((list_by, select_from)).parse(query); match query { Ok(result) => Ok(result.1), Err(err) => { LOGS_MANAGER.send_log(format!("parse error: {err}")).await; Err(QueryParseError) } } }
Но толку от этого всего, если нельзя сделать выборку нужных устройств. Поэтому предлагаю немного усложнить синтаксис нашего языка запросов и добавить стейтмент where, после которого будем вычислять boolean значение, включая запросы к агентам, которые будут выполнены в виде функций.
Первое, что нам потребуется сделать, это дополнить модель AST. Будем использовать метод рекурсивного спуска, поэтому добавляем детерминированные значения Term и Value в перечисления QueryConditionExpr и QueryConditionTerm, соответственно.
#[derive(Debug, PartialEq, Clone)] pub struct InvokeFuncArg { pub name: String, pub value: String, } #[derive(Debug, PartialEq, Clone)] pub struct InvokeFunc { pub name: String, pub args: Vec<InvokeFuncArg>, } #[derive(Debug, PartialEq, Clone)] pub enum QueryValue { FnField { func: InvokeFunc, field: String }, Identifier(String), String(String), Number(f64), Bool(bool), Null, } impl Display for QueryValue { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { let str = match self { QueryValue::FnField { func, field } => format!( "{}({}).{field}", func.name, func.args .iter() .map(|arg| format!("{}={}", arg.name, arg.value)) .collect::<Vec<_>>() .join(",") ), QueryValue::Identifier(s) => s.clone(), QueryValue::String(s) => s.clone(), QueryValue::Number(n) => n.to_string(), QueryValue::Bool(b) => b.to_string(), QueryValue::Null => "null".to_string(), }; write!(f, "{}", str) } } #[derive(Debug, PartialEq)] enum QueryOperation { Eq, More, Less, } #[derive(Debug, PartialEq)] pub enum QueryConditionTerm { Eq { left: Box<QueryConditionTerm>, right: Box<QueryConditionTerm>, }, More { left: Box<QueryConditionTerm>, right: Box<QueryConditionTerm>, }, Less { left: Box<QueryConditionTerm>, right: Box<QueryConditionTerm>, }, Value(QueryValue), } #[derive(Debug, PartialEq)] enum QueryExprOperation { And, Or, } #[derive(Debug, PartialEq)] pub enum QueryConditionExpr { And { left: Box<QueryConditionExpr>, right: Box<QueryConditionExpr>, }, Or { left: Box<QueryConditionExpr>, right: Box<QueryConditionExpr>, }, Term(QueryConditionTerm), } #[derive(Debug, PartialEq)] pub enum QueryExpr { ListBy { field: String, condition: Option<QueryConditionExpr> }, SelectFrom { from: String, select: Vec<InvokeFunc> }, }
Теперь можем написать новенькие парсеры. Предлагаю начать с парсера value, который как раз будет парсить конечные значения в нашей модели.
Я, если честно, не уверен, как более правильно реализовать парсер с take_while, чтоб он не выдавал пустую строчку за идентификатор, поэтому воспользовался map_opt, чтоб тот завершал работу парсера ошибкой, если строчка пустая.
У нас есть парсер string, он очень простой, поэтому пока не будет обрабатывать строчки с экранированием символов. Если кому нужно экранирование, то в nom есть парсер escaped для таких целей.
fn identifier(input: &str) -> IResult<&str, QueryValue> { map_opt(take_while(|c: char| c.is_alphanumeric() || c == '_' || c == '-'), |s: &str| { if s.is_empty() { None } else { Some(QueryValue::Identifier(s.to_string())) } }) .parse(input) } fn string(input: &str) -> IResult<&str, QueryValue> { map_opt(delimited(char('"'), take_till(|c: char| c == '"'), char('"')), |s: &str| { if s.is_empty() { None } else { Some(QueryValue::String(s.to_string())) } }) .parse(input) } fn number(input: &str) -> IResult<&str, QueryValue> { map(double, QueryValue::Number).parse(input) } fn boolean(input: &str) -> IResult<&str, QueryValue> { map(alt((tag("true"), tag("false"))), |s: &str| QueryValue::Bool(s == "true")).parse(input) } fn null(input: &str) -> IResult<&str, QueryValue> { map(tag("null"), |_| QueryValue::Null).parse(input) } fn func_field(input: &str) -> IResult<&str, QueryValue> { map(separated_pair(invoke_func, char('.'), identifier), |(f, i)| QueryValue::FnField { func: f, field: i.to_string(), }) .parse(input) } fn value(input: &str) -> IResult<&str, QueryValue> { alt((string, func_field, identifier, number, boolean, null)).parse(input) }
Теперь можем рекурсивно распарсить выражение по типу info(type = modules).info = version, написав небольшой парсер, где мы сначала парсим гарантированно какое-то значение, а потом проверяем, идёт ли какая-нибудь операции далее, и если есть, то запускаем рекурсию.
fn operation(input: &str) -> IResult<&str, QueryOperation> { alt(( map(char('='), |_| QueryOperation::Eq), map(char('<'), |_| QueryOperation::Less), map(char('>'), |_| QueryOperation::More), )) .parse(input) } fn condition_term(input: &str) -> IResult<&str, QueryConditionTerm> { let (input, left) = map(value, QueryConditionTerm::Value).parse(input)?; let (input, _) = space0(input)?; let (input, op) = opt(operation).parse(input)?; if let Some(op) = op { let (input, _) = space0(input)?; let (input, right) = condition_term(input)?; Ok(( input, match op { QueryOperation::Eq => QueryConditionTerm::Eq { left: Box::new(left), right: Box::new(right), }, QueryOperation::More => QueryConditionTerm::More { left: Box::new(left), right: Box::new(right), }, QueryOperation::Less => QueryConditionTerm::Less { left: Box::new(left), right: Box::new(right), }, }, )) } else { Ok((input, left)) } }
Можем проделать то же самое с выражениями по типу true & false, то есть term & term.
fn condition_expr_op(input: &str) -> IResult<&str, QueryExprOperation> { alt((map(char('&'), |_| QueryExprOperation::And), map(char('|'), |_| QueryExprOperation::Or))).parse(input) } fn condition_expr(input: &str) -> IResult<&str, QueryConditionExpr> { let (input, left) = map(condition_term, QueryConditionExpr::Term).parse(input)?; let (input, _) = space0(input)?; let (input, op) = opt(condition_expr_op).parse(input)?; if let Some(op) = op { let (input, _) = space0(input)?; let (input, right) = condition_expr(input)?; Ok(( input, match op { QueryExprOperation::And => QueryConditionExpr::And { left: Box::new(left), right: Box::new(right), }, QueryExprOperation::Or => QueryConditionExpr::Or { left: Box::new(left), right: Box::new(right), }, }, )) } else { Ok((input, left)) } }
Доработает парсер тем, что будем парсить сначала стейтмент where, а поток рекурсивно условие.
fn condition(input: &str) -> IResult<&str, QueryConditionExpr> { let (input, _) = space1(input)?; let (input, _) = tag("where").parse(input)?; let (input, _) = space1(input)?; condition_expr(input) } fn list_by(input: &str) -> IResult<&str, QueryExpr> { let (input, _) = tag("list by").parse(input)?; let (input, _) = space1(input)?; let (input, field_name) = identifier(input)?; let (input, condition) = opt(condition).parse(input)?; Ok(( input, QueryExpr::ListBy { field: field_name.to_string(), condition, }, )) }
Чтобы убедиться в том, что всё работает корректно, я написал несколько простых юнит-тестов
#[cfg(test)] mod tests { use super::*; #[tokio::test] async fn test_parse_field_parser() { let query = "list by name"; let result = identifier(&query[8..]); assert_eq!(result, Ok(("", QueryValue::Identifier("name".to_owned())))); } #[tokio::test] async fn test_parse_list_by() { let query = "list by name"; let result = list_by(query); assert_eq!( result, Ok(( "", QueryExpr::ListBy { field: "name".to_owned(), condition: None } )) ); } #[tokio::test] async fn test_parse_list_by_where() { let query = "list by name where addr = \"127.0.0.1:8080\""; let result = list_by(query); assert_eq!( result, Ok(( "", QueryExpr::ListBy { field: "name".to_owned(), condition: Some(QueryConditionExpr::Term(QueryConditionTerm::Eq { left: Box::new(QueryConditionTerm::Value(QueryValue::Identifier("addr".to_string()))), right: Box::new(QueryConditionTerm::Value(QueryValue::String("127.0.0.1:8080".to_string()))), })) } )) ); } #[tokio::test] async fn test_parse_list_by_where_invoke_func_field() { let query = "list by name where version().version = \"1.0.0\""; let result = list_by(query); assert_eq!( result, Ok(( "", QueryExpr::ListBy { field: "name".to_owned(), condition: Some(QueryConditionExpr::Term(QueryConditionTerm::Eq { left: Box::new(QueryConditionTerm::Value(QueryValue::FnField { func: InvokeFunc { name: "version".to_owned(), args: vec![] }, field: "version".to_owned() })), right: Box::new(QueryConditionTerm::Value(QueryValue::String("1.0.0".to_string()))), })) } )) ); } #[tokio::test] async fn test_parse_list_by_where_invoke_func_field_with_args() { let query = "list by name where info(type = modules).version = \"1.0.0\""; let result = list_by(query); assert_eq!( result, Ok(( "", QueryExpr::ListBy { field: "name".to_owned(), condition: Some(QueryConditionExpr::Term(QueryConditionTerm::Eq { left: Box::new(QueryConditionTerm::Value(QueryValue::FnField { func: InvokeFunc { name: "info".to_owned(), args: vec![InvokeFuncArg { name: "type".to_owned(), value: "modules".to_owned(), }], }, field: "version".to_owned() })), right: Box::new(QueryConditionTerm::Value(QueryValue::String("1.0.0".to_string()))), })) } )) ); } #[tokio::test] async fn test_parse_list_by_where_invoke_func_field_with_args_and_other() { let query = "list by name where info(type = modules).version = \"version\" & version().version = \"0.1.0\""; let result = list_by(query); assert_eq!( result, Ok(( "", QueryExpr::ListBy { field: "name".to_owned(), condition: Some(QueryConditionExpr::And { left: Box::new(QueryConditionExpr::Term(QueryConditionTerm::Eq { left: Box::new(QueryConditionTerm::Value(QueryValue::FnField { func: InvokeFunc { name: "info".to_string(), args: vec![InvokeFuncArg { name: "type".to_string(), value: "modules".to_string(), }], }, field: "version".to_string() })), right: Box::new(QueryConditionTerm::Value(QueryValue::String("version".to_string()))), })), right: Box::new(QueryConditionExpr::Term(QueryConditionTerm::Eq { left: Box::new(QueryConditionTerm::Value(QueryValue::FnField { func: InvokeFunc { name: "version".to_string(), args: vec![], }, field: "version".to_string() })), right: Box::new(QueryConditionTerm::Value(QueryValue::String("0.1.0".to_string()))), })), }) } )) ); } #[tokio::test] async fn test_parse_identifier_parser() { let query = "from name select version()"; let (rest, _) = tag::<_, _, nom::error::Error<&str>>("from").parse(query).unwrap(); let (rest, _) = space1::<_, nom::error::Error<&str>>(rest).unwrap(); let result = identifier(rest); assert_eq!(result, Ok((" select version()", QueryValue::Identifier("name".to_string())))); } #[tokio::test] async fn test_parse_space_parser() { let query = "from name select version()"; let (rest, _) = tag::<_, _, nom::error::Error<&str>>("from").parse(query).unwrap(); let result = space1::<_, nom::error::Error<&str>>(rest); assert_eq!(result, Ok(("name select version()", " "))); } #[tokio::test] async fn test_parse_more_space_parser() { let query = "from name select version()"; let (rest, _) = tag::<_, _, nom::error::Error<&str>>("from").parse(query).unwrap(); let result = space1::<_, nom::error::Error<&str>>(rest); assert_eq!(result, Ok(("name select version()", " "))); } #[tokio::test] async fn test_parse_from_parser() { let query = "from name select version()"; let result = tag::<_, _, nom::error::Error<&str>>("from").parse(query); assert_eq!(result, Ok((" name select version()", "from"))); } #[tokio::test] async fn test_parse_maybe_space_no_spaces() { let query = ""; let result = space0::<_, nom::error::Error<&str>>(query); assert_eq!(result, Ok(("", ""))) } #[tokio::test] async fn test_parse_maybe_space() { let query = " ,"; let result = space0::<_, nom::error::Error<&str>>(query); assert_eq!(result, Ok((",", " "))) } #[tokio::test] async fn test_parse_invoke_list_separate() { let query = " , hello"; let result = invoke_list_separate(query); assert_eq!(result, Ok(("hello", ()))); } #[tokio::test] async fn test_parse_invoke_list_separate_without_space() { let query = ",hello"; let result = invoke_list_separate(query); assert_eq!(result, Ok(("hello", ()))); } #[tokio::test] async fn test_parse_no_invoke_args() { let query = "()"; let (input, _) = char::<_, nom::error::Error<&str>>('(').parse(query).unwrap(); let result = invoke_args(input); assert_eq!(result, Ok((")", vec![]))); } #[tokio::test] async fn test_parse_invoke_args() { let query = "one = first, two = second)"; let result = invoke_args(query); assert_eq!( result, Ok(( ")", vec![ InvokeFuncArg { name: "one".to_owned(), value: "first".to_owned() }, InvokeFuncArg { name: "two".to_owned(), value: "second".to_owned() } ] )) ) } #[tokio::test] async fn test_parse_invoke_func() { let query = "version()"; let result = invoke_func(query); assert_eq!( result, Ok(( "", InvokeFunc { name: "version".to_string(), args: vec![] } )) ); } #[tokio::test] async fn test_parse_invoke_list() { let query = "version()"; let result = invoke_list(query); assert_eq!( result, Ok(( "", vec![InvokeFunc { name: "version".to_string(), args: vec![] }] )) ) } #[tokio::test] async fn test_parse_invoke_list_many_invoke() { let query = "version(), hello()"; let result = invoke_list(query); assert_eq!( result, Ok(( "", vec![ InvokeFunc { name: "version".to_string(), args: vec![] }, InvokeFunc { name: "hello".to_string(), args: vec![] } ] )) ) } #[tokio::test] async fn test_parse_select_from() { let query = "from name select version()"; let result = select_from(query); assert_eq!( result, Ok(( "", QueryExpr::SelectFrom { from: "name".to_string(), select: vec![InvokeFunc { name: "version".to_string(), args: vec![] }] } )) ) } #[tokio::test] async fn test_parse_query_select() { let query = "from name select version()"; let result = parse_query(query).await; assert_eq!( result, Ok(QueryExpr::SelectFrom { from: "name".to_string(), select: vec![InvokeFunc { name: "version".to_string(), args: vec![] }] }) ); } }
Отлично, можем продолжить, хотел бы сказать, что реализовывать трейт Query, но у нас пока нет пула соединений с агентами, поэтому предлагаю реализовать трейт сервиса Excavator. Для этого создам новый модуль server\src\excavator.rs.
Для начала давайте имплементируем AgentId, чтобы как-то идентифицировать агента в пуле.
#[derive(Eq, PartialEq, Hash)] pub struct AgentInner { pub name: String, pub addr: SocketAddr, } #[derive(Clone)] pub struct AgentId(Arc<Mutex<AgentInner>>); impl Hash for AgentId { fn hash<H: Hasher>(&self, state: &mut H) { let inner = block_in_place(|| Handle::current().block_on(async { self.0.lock().await })); inner.name.hash(state); } } impl Eq for AgentId {} impl PartialEq for AgentId { fn eq(&self, other: &Self) -> bool { block_in_place(|| { Handle::current().block_on(async { let left = self.0.lock().await; // match other.0.try_lock() { Ok(right) => *left == *right, Err(_) => true, } }) }) } } impl Deref for AgentId { type Target = Arc<Mutex<AgentInner>>; fn deref(&self) -> &Self::Target { &self.0 } } impl AgentId { pub fn new(name: &str, addr: SocketAddr) -> Self { let inner = AgentInner { name: name.to_owned(), addr }; Self(Arc::new(Mutex::new(inner))) } pub fn generate(addr: SocketAddr) -> Self { // получаем слайс байтов из адреса агента let bytes: &[u8] = unsafe { std::slice::from_raw_parts(&addr as *const _ as *const u8, std::mem::size_of::<SocketAddr>()) }; // кодируем половину этого массива в шестнадцатеричное представление let hex = hex::encode(&bytes[..bytes.len() / 2]); let name = format!("client{}", hex); let inner = AgentInner { name, addr }; Self(Arc::new(Mutex::new(inner))) } }
Ключ для идентификации агентов в пуле есть, а значением будем использовать AgentCommandManager, который будет хранить ссылки на каналы отправки-получения сообщений из стримов. И по мелочи будет отправлять команду агенту и дожидаться ответа.
pub struct AgentCommandManager { command_tx: mpsc::Sender<Result<ExcavatorMessage, Status>>, response_rx: broadcast::Receiver<ExcavatorResponse>, } impl AgentCommandManager { pub async fn send(&mut self, msg: ExcavatorCommand) -> Option<ExcavatorResponse> { let uid = msg.uid.clone(); let msg = ExcavatorMessage { request: Some(excavator_message::Request::Command(msg)), }; // отправляем команду self.command_tx.send(Ok(msg)).await.expect("channel closed"); // дожидаемся нужного ответа из потока while let Ok(msg) = self.response_rx.recv().await { if msg.uid == uid { return Some(msg); } } None } } impl Clone for AgentCommandManager { fn clone(&self) -> Self { Self { command_tx: self.command_tx.clone(), response_rx: self.response_rx.resubscribe(), } } }
Для хранения всего этого будем использовать обычную HashMap, обёрнутую в структуру AgentsMap.
type AgentsMapInner = Arc<RwLock<HashMap<AgentId, AgentCommandManager>>>; #[derive(Default, Clone)] pub struct AgentsMap(AgentsMapInner); impl Deref for AgentsMap { type Target = AgentsMapInner; fn deref(&self) -> &Self::Target { &self.0 } }
Отлично, теперь можем реализовать трейт Excavator. Для этого создадим структуру ExcavatorService, которая будет хранить ссылку на описанную ранее AgentsMap.
pub struct ExcavatorService(AgentsMap); impl ExcavatorService { pub fn new(map: AgentsMap) -> Self { Self(map) } } #[tonic::async_trait] impl Excavator for ExcavatorService { type RunExcavatorStream = Pin<Box<dyn Stream<Item = Result<ExcavatorMessage, Status>> + Send + 'static>>; async fn run_excavator(&self, request: Request<Streaming<ExcavatorMessage>>) -> Result<Response<Self::RunExcavatorStream>, Status> { todo!() } }
Предлагаю для начала получить heartbeat-сообщение из стрима и распаковать его до ExcavatorMessage. Другие сообщения нам неинтересны, поэтому мы один раз вызываем stream.next().
async fn run_excavator(&self, request: Request<Streaming<ExcavatorMessage>>) -> Result<Response<Self::RunExcavatorStream>, Status> { let addr = request.remote_addr().ok_or(Status::aborted("remote address not found"))?; let mut stream = request.into_inner(); match stream.next().await { Some(msg) => match msg { Ok(msg) => { if let Some(heartbeat) = msg.request { match heartbeat { excavator_message::Request::Heartbeat(_) => {} // остальные сообщения нам не интересны _ => { LOGS_MANAGER .send_log(format!( "the agent ({addr}) tried to connect, but the first message should be a heartbeat" )) .await; Err(Status::failed_precondition("no heartbeat")) } } } else { LOGS_MANAGER .send_log(format!("the agent {addr} tried to connect, but no heartbeat was detected")) .await; Err(Status::failed_precondition("empty heartbeat")) } } Err(err) => { LOGS_MANAGER .send_log(format!( "agent {addr} tried to connect, but the connection could not be established due to: {err}" )) .await; Err(Status::unknown("failed to connect")) } }, None => { LOGS_MANAGER .send_log(format!( "the agent ({addr}) tried to connect, but it failed because no message was received" )) .await; Err(Status::unknown("failed to connect")) } } }
Теперь следите за руками, так как сейчас будет много различных каналов и стримов.
excavator_message::Request::Heartbeat(_) => { let id = AgentId::generate(addr); let (command_tx, command_rx) = mpsc::channel(128); let (response_tx, response_rx) = broadcast::channel(128); let manager = AgentCommandManager { command_tx, response_rx }; self.0.write().await.insert(id.clone(), manager); // ... }
Так как соединение установлено и каналы агента сохранены, теперь можем начать слушать другие сообщения. Делать это будем в отдельном потоке. А в данном потоке возвращаем стрим.
let id = AgentId::generate(addr); let (command_tx, command_rx) = mpsc::channel(128); let (response_tx, response_rx) = broadcast::channel(128); let manager = AgentCommandManager { command_tx, response_rx }; self.0.write().await.insert(id.clone(), manager); tokio::spawn({ let id = id.clone(); async move { while let Some(msg) = stream.next().await { // ... } } }); LOGS_MANAGER .send_log(format!("agent ({}) connected successfully", id.lock().await.name)) .await; let stream = ReceiverStream::new(command_rx); Ok(Response::new(Box::pin(stream) as Self::RunExcavatorStream))
Можем реализовать цикл событий, где нас интересует лишь ответы от агентов. Как только получаем его - отправляем его всем слушателям, то есть запросам от сервиса Query.
while let Some(msg) = stream.next().await { match msg { Ok(ExcavatorMessage { request: Some(request) }) => match request { // отправляем результат команды соответствующему клиенту excavator_message::Request::Response(response) => { if let Err(err) = response_tx.send(response) { LOGS_MANAGER.send_log(format!("occurred error: {err}")).await; } } // остальные сообщения нам неинтересны excavator_message::Request::Heartbeat(_) => { LOGS_MANAGER.send_log(format!("agent ({}) sent heartbeat", id.lock().await.name)).await; } excavator_message::Request::Command(_) => { LOGS_MANAGER.send_log(format!("agent ({}) sent command", id.lock().await.name)).await; } }, Err(err) => { LOGS_MANAGER.send_log(format!("occurred error: {err}")).await; } _ => { LOGS_MANAGER .send_log(format!("received empty message from {}", id.lock().await.name)) .await; } } } async move { while let Some(msg) = stream.next().await { match msg { Ok(ExcavatorMessage { request: Some(request) }) => match request { // отправляем результат команды соответствующему клиенту excavator_message::Request::Response(response) => { if let Err(err) = response_tx.send(response) { LOGS_MANAGER.send_log(format!("occurred error: {err}")).await; break; } } // остальные сообщения нам неинтересны excavator_message::Request::Heartbeat(_) => { LOGS_MANAGER.send_log(format!("agent ({}) sent heartbeat", id.lock().await.name)).await; } excavator_message::Request::Command(_) => { LOGS_MANAGER.send_log(format!("agent ({}) sent command", id.lock().await.name)).await; } }, Err(err) => { LOGS_MANAGER.send_log(format!("occurred error: {err}")).await; break; } _ => { LOGS_MANAGER .send_log(format!("received empty message from {}", id.lock().await.name)) .await; } } } // если цикл завершился по какой-либо причине, то, вероятно, // агент просто напросто отключился, поэтому удаляем его из пула агентов LOGS_MANAGER.send_log(format!("exit agent ({}) event loop", id.lock().await.name)).await; if map.write().await.remove_entry(&id).is_some() { LOGS_MANAGER.send_log(format!("agent ({}) disconnected", id.lock().await.name)).await; } }
Всё, это вся реализация сервиса Excavator, довольно просто, не так ли? Теперь, после того, как реализовали AgentsMap, можем приступить к реализации метода query. Думаю, тут нет ничего сложного, поэтому предлагаю перейти к реализации запросов.
async fn query(&self, request: Request<QueryRequest>) -> Result<Response<QueryResponse>, Status> { let query = request.into_inner().command; if query.is_empty() { return Err(Status::invalid_argument("empty query")); } LOGS_MANAGER.send_log(format!("got query: '{query}'")).await; let query = parse_query(query.trim()) .await .map_err(|_| Status::invalid_argument("failed to parse query"))?; match query { QueryExpr::ListBy(field) => todo!(), QueryExpr::SelectFrom { from, select } => todo!(), } }
Но перед тем, как продолжить, давайте реализуем структуру EvalContext, которая будет хранить состояние нашего QueryConditionExpr. Вычислять конечное значение будем тоже через рекурсию.
#[derive(Error, Debug)] enum EvalError { #[error("not a number: {0}")] NotNumber(QueryValue), #[error("not a boolean: {0}")] NotBool(QueryValue), #[error("invoke error {0}")] InvokeError(String), } fn must_number(val: &QueryValue) -> Result<f64, EvalError> { match val { QueryValue::Number(n) => Ok(*n), _ => Err(EvalError::NotNumber(val.clone())), } } struct EvalContext<'a> { agent: &'a mut AgentCommandManager, } impl<'a> EvalContext<'a> { fn new(agent: &'a mut AgentCommandManager) -> Self { Self { agent } } async fn eval(&mut self, expr: &QueryConditionExpr) -> Result<bool, EvalError> { match expr { QueryConditionExpr::And { left, right } => { let left = Box::pin(self.eval(left)).await?; let right = Box::pin(self.eval(right)).await?; Ok(left && right) } QueryConditionExpr::Or { left, right } => { let left = Box::pin(self.eval(left)).await?; let right = Box::pin(self.eval(right)).await?; Ok(left || right) } QueryConditionExpr::Term(term) => match self.eval_term(term).await? { QueryValue::Bool(b) => Ok(b), val => Err(EvalError::NotBool(val.clone())), }, } } async fn eval_term(&mut self, term: &QueryConditionTerm) -> Result<QueryValue, EvalError> { match term { QueryConditionTerm::Eq { left, right } => Ok(QueryValue::Bool(Box::pin(self.eval_eq(left, right)).await?)), QueryConditionTerm::More { left, right } => Ok(QueryValue::Bool(Box::pin(self.eval_more(left, right)).await?)), QueryConditionTerm::Less { left, right } => Ok(QueryValue::Bool(Box::pin(self.eval_less(left, right)).await?)), QueryConditionTerm::Value(val) => Ok(val.clone()), } } async fn eval_eq(&mut self, left: &QueryConditionTerm, right: &QueryConditionTerm) -> Result<bool, EvalError> { let left = self.eval_term(left).await?; let right = self.eval_term(right).await?; if let QueryValue::FnField { func, field } = &left { // ... } else { Ok(left == right) } } async fn eval_less(&mut self, left: &QueryConditionTerm, right: &QueryConditionTerm) -> Result<bool, EvalError> { let left = must_number(&self.eval_term(left).await?)?; let right = must_number(&self.eval_term(right).await?)?; Ok(left < right) } async fn eval_more(&mut self, left: &QueryConditionTerm, right: &QueryConditionTerm) -> Result<bool, EvalError> { let left = must_number(&self.eval_term(left).await?)?; let right = must_number(&self.eval_term(right).await?)?; Ok(left > right) } }
Всё довольно просто, но давайте реализуем выполнение функций, для чего делали интерфейс структуры асинхронным. Тут мы составляем команду, после чего отправляем её агенту и дожидаемся ответа. Как только ответ пришёл, проверяем соответствует ли хоть одна строчка ответа правому значению.
if let QueryValue::FnField { func, field } = &left { let cmd = ExcavatorCommand { uid: SmallUid::new().to_string(), name: func.name.clone(), args: func .args .iter() .map(|a| ExcavatorCommandArg { key: a.name.clone(), value: a.value.clone(), }) .collect(), }; Ok(self .agent .send(cmd) .await .ok_or(EvalError::InvokeError(format!("{}.{}", func.name, field)))? .results .iter() .any(|r| { r.key == *field && (match &right { QueryValue::FnField { .. } => false, QueryValue::Identifier(i) => r.value == *i, QueryValue::String(s) => r.value == *s, QueryValue::Number(n) => { if let Ok(val) = r.value.parse::<f64>() { val == *n } else { false } } QueryValue::Bool(b) => *b, QueryValue::Null => r.value == "null", }) })) }
Теперь можем реализовать ListBy. Предлагаю начать с ветки без условия. Она довольно простая, ведь мы просто итерируемся по всему пулу агентов и в зависимости от вхожего значения field подставляем нужное значение, после чего мапим результат к нужному виду.
QueryExpr::ListBy { field, condition } => { let mut map = self.1.write().await; if let Some(condition) = condition { todo!() } else { let rows = stream::iter(map.keys()) .filter_map(|k| { let field = field.clone(); async move { let id = k.lock().await; Some(match field.as_str() { "name" => id.name.clone(), "addr" => id.addr.to_string(), _ => return None, }) } }) .map(|data| TableRow { cols: vec![TableCol { key: field.clone(), data }], }) .collect::<Vec<_>>() .await; let response = Response::new(QueryResponse { rows }); Ok(response) } }
С условием делаем похожее, но в этот раз инициализируем наш вычислительный контекст, ловим результат и уже в зависимости от поля field подставляем результат.
if let Some(condition) = condition { let condition = Arc::new(condition); let rows = stream::iter(map.iter_mut()) .filter_map({ |(id, agent)| { let condition = condition.clone(); let field = field.clone(); async move { let result = match EvalContext::new(agent).eval(&condition).await { Ok(res) => res, Err(err) => { LOGS_MANAGER.send_log(format!("failed to evaluate condition: '{err}'")).await; return None; } }; if result { let id = id.lock().await; Some(match field.as_str() { "name" => id.name.clone(), "addr" => id.addr.to_string(), _ => return None, }) } else { None } } } }) .map({ let field = field.clone(); move |r| TableRow { cols: vec![TableCol { key: field.clone(), data: r }], } }) .collect::<Vec<_>>() .await; let response = Response::new(QueryResponse { rows }); Ok(response) }
Реализуем теперь SelectFrom. Для начала получаем агента. Если не получилось его найти по хэшу, то итерируемся по HashMap и ищем значение, которое начинается на полученную строку. Подглядел это у git, где достаточно лишь часть хэша коммита ввести, ведь у нас довольно длинное имя агента получается.
QueryExpr::SelectFrom { from, select } => { let map = self.1.read().await; let id = AgentId::new(&from, SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::new(0, 0, 0, 0), 0))); let mut agent = if let Some(agent) = map.get(&id).cloned() { agent } else { stream::iter(map.iter()) .filter_map(|(k, m)| Box::pin(async { if k.lock().await.name.starts_with(&from) { Some(m.clone()) } else { None } })) .next() .await .ok_or(Status::not_found("failed to get agent id"))? }; // ... }
Агента мы получили, но давайте теперь реализуем выполнение команд.
QueryExpr::SelectFrom { from, select } => { // ... let mut rows = Vec::new(); for crate::query::InvokeFunc { name, args } in select { let uid = SmallUid::new().to_string(); let args = args.into_iter().map(|a| ExcavatorCommandArg { key: a.name, value: a.value }).collect(); LOGS_MANAGER.send_log(format!("executing command: '{name}' with args: '{args:?}'")).await; let result = agent .send(ExcavatorCommand { uid, name: name.clone(), args, }) .await; // преобразуем результат в табличный вид if let Some(response) = result { LOGS_MANAGER .send_log(format!("'{name}' command is done with status '{}'", response.code)) .await; let cols = response .results .into_iter() .fold(HashMap::new(), |mut acc: HashMap<String, Vec<String>>, res| { if let Some(col) = acc.get_mut(&res.key) { col.push(res.value); } else { acc.insert(res.key, vec![res.value]); } acc }); if rows.is_empty() { for (col_name, values) in cols { for row in values { rows.push(TableRow { cols: vec![TableCol { key: col_name.clone(), data: row, }], }); } } } else { for (col_name, values) in &cols { for (i, r) in values.iter().enumerate() { let row = rows.get_mut(i); if let Some(row) = row { row.cols.push(TableCol { key: col_name.clone(), data: r.clone(), }); } else { let cols_len = rows.first().map(|r| r.cols.len()).unwrap_or(0); let cols = (0..cols_len) .filter_map(|i| { cols.iter().nth(i).map(|(c, _)| TableCol { key: c.clone(), data: Default::default(), }) }) .chain(vec![TableCol { key: col_name.clone(), data: r.clone(), }]) .collect(); rows.push(TableRow { cols }); } } } } } else { return Err(Status::cancelled("failed to execute command".to_string())); } } let response = Response::new(QueryResponse { rows }); Ok(response) }
Сервисы готовы, но сам сервер пока ничего не слушает и даже не знает ничего об этих сервисах, предлагаю это исправить. Теперь наш сервер слушает 1299 порт и готов отвечать на запросы.
#[tokio::main] async fn main() { // переложим бремя парсинга адреса на `SocketAddr` let addr = format!("{}:{}", "0.0.0.0", 1299) .parse() .expect("could not parse address"); let client_map = AgentsMap::default(); LOGS_MANAGER.send_log(format!("server listening on {}", addr)).await; Server::builder() .add_service(ExcavatorServer::new(ExcavatorService::new(client_map.clone()))) .add_service(QueryServer::new(QueryService::new(LOGS_MANAGER.subscribe(), client_map))) .serve(addr) .await .expect("failed to serve") }
Агент
Сервер мы реализовали, но вот того, кто будет выполнять команды, нету. Предлагаю это исправить, создав проект agent.
cargo new agent
Сразу в зависимости добавим необходимые крейты.
[package] name = "agent" version = "0.1.0" edition = "2024" [dependencies] tokio = { version = "1.52", features = ["rt", "rt-multi-thread", "fs", "macros"] } tokio-stream = "0.1" tonic = "0.14" prost = "0.14" tonic-prost = "0.14" async-trait = "0.1" knus = "3.3" [build-dependencies] build-common = { path = "../build-common" }
Создаём agent\build.rs со следующим содержимым. Оно немного отличается от предыдущего скрипта, так как будем ещё копировать файл agent.kdl, конфиг агента, в выходную директорию.
use std::{env, fs, path::PathBuf}; const CONFIG_FILE: &str = "agent.kdl"; fn main() { let proto_folder = build_common::get_proto_folder(); build_common::compile_protos_folder(&proto_folder).unwrap(); println!("cargo:rerun-if-changed={CONFIG_FILE}"); let target_folder = PathBuf::from_iter([ env::var("CARGO_MANIFEST_DIR").unwrap(), "..".to_owned(), "target".to_owned(), env::var("PROFILE").unwrap(), ]); let config_path = target_folder.join(CONFIG_FILE); fs::copy(CONFIG_FILE, &config_path).unwrap(); }
Предлагаю создать этот конфиг-файл agent\agent.kdl со следующим содержимым. Да, небогато, но это пока что.
server "127.0.0.1"
Но, чтобы распарсить этот конфиг, нужна модель для knus. Предлагаю описать её, создав модуль agent\src\config.rs.
#[derive(Decode)] pub struct Server { #[knus(argument)] pub address: String, #[knus(argument)] pub port: Option<u16>, } #[derive(Decode)] pub struct Config { #[knus(child)] pub server: Server, }
Создаём модуль agent\src\proto.rs, где подключаем сгенерированный код gRPC.
tonic::include_proto!("main");
Но что это за агент, если он ничего не умеет. Предлагаю это решить тем, что создадим трейт Module в agent\src\modules\mod.rs, чтобы добиться модульности агента. Хотя я бы предпочёл заводить модули на сервере на каком-нибудь скриптовом языке или байт-коде и рассылать их агентам в случае необходимости, но на данный момент это слишком заморочено.
#[async_trait::async_trait] pub trait Module { fn name(&self) -> &'static str; fn description(&self) -> &'static str; fn args(&self) -> Vec<ModuleArg>; async fn execute(&self, args: Args) -> ExecuteResult; } pub struct ExecuteResult { pub code: i64, pub output: Vec<String>, } pub struct ModuleArg { pub name: &'static str, pub description: &'static str, pub required: bool, pub default: Option<String>, }
Модули нужно где-то хранить, поэтому предлагаю создать структуру ModulesRegistry. Так как у нас HashMap внутри RwLock и чтобы каждый раз не вызывать await после каждого вызова метода, то мы залочим RwLock и будем передавать ссылку на его внутренний HashMap.
type ModulesMap = HashMap<&'static str, Arc<dyn Module + Send + Sync>>; pub struct ModulesRegistryBuilder<'a>(&'a mut ModulesMap); impl ModulesRegistryBuilder<'_> { pub fn register(&mut self, module: impl Module + 'static + Send + Sync) -> &mut Self { self.0.insert(module.name(), Arc::new(module)); self } } #[derive(Default, Clone)] pub struct ModulesRegistry(Arc<RwLock<ModulesMap>>); impl ModulesRegistry { pub async fn build<F>(&self, builder: F) where F: FnOnce(&mut ModulesRegistryBuilder, &Self), { let mut s = self.0.write().await; builder(&mut ModulesRegistryBuilder(&mut s), self); } pub async fn get(&self, name: &str) -> Option<Arc<dyn Module + Send + Sync>> { self.0.read().await.get(name).cloned() } pub async fn get_all(&self) -> Vec<&'static str> { self.0.read().await.values().map(|v| v.name()).collect() } }
Давайте создадим пару модулей, они будут очень простые.
// agent\src\modules\info.rs pub struct GetInfoModule(ModulesRegistry); unsafe impl Send for GetInfoModule {} impl GetInfoModule { pub fn new(registry: ModulesRegistry) -> Self { Self(registry) } } #[async_trait::async_trait] impl Module for GetInfoModule { fn name(&self) -> &'static str { "info" } fn description(&self) -> &'static str { "get information about the agent modules" } fn args(&self) -> Vec<ModuleArg> { vec![ ModuleArg { name: "type", description: "type of information to retrieve (modules, etc.)", required: true, default: None, }, ModuleArg { name: "name", description: "name of the module to retrieve information about", required: false, default: None, }, ] } async fn execute(&self, args: Args) -> ExecuteResult { let ty = args.get("type"); if let Some(ty) = ty { match ty.as_str() { "modules" => { let modules = self.0.get_all().await.into_iter().map(|s| s.to_owned()).collect(); ExecuteResult { code: 0, output: modules } } _ => ExecuteResult { code: 1, output: vec!["unknown type".to_string()], }, } } else { ExecuteResult { code: 1, output: vec!["missing type parameter".to_string()], } } } } // agent\src\modules\version.rs pub struct AgentVersionModule; unsafe impl Send for AgentVersionModule {} #[async_trait::async_trait] impl Module for AgentVersionModule { fn name(&self) -> &'static str { "version" } fn description(&self) -> &'static str { "get agent version" } fn args(&self) -> Vec<ModuleArg> { vec![] } async fn execute(&self, _: Args) -> ExecuteResult { ExecuteResult { code: 0, output: vec![env!("CARGO_PKG_VERSION").to_string()], } } }
Теперь можем инициализировать и зарегистрировать модули, после чего считать конфиг с адресом сервера.
#[tokio::main] async fn main() { let registry = ModulesRegistry::default(); registry.build(|builder, registry| { builder .register(AgentVersionModule) .register(GetInfoModule::new(registry.clone())); }).await; let config_path = path::use_config_path(); let config_content = tokio::fs::read_to_string(&config_path).await.expect("unable to read config file"); let config: Config = knus::parse("config", &config_content).expect("failed to parse config"); // ... }
К счастью, клиент не нужно реализовывать, кодогенерация всё сделала за нас и наш клиент готов, достаточно указать адрес сервера. И сразу посылаем heartbeat-сообщение и получаем поток.
#[tokio::main] async fn main() { // ... let addr = format!("http://{}:{}", config.server.address, config.server.port.unwrap_or(1299)); println!("listening server at {}", addr); let mut client = ExcavatorClient::connect(addr).await.expect("failed to connect to excavator"); let (tx, rx) = tokio::sync::mpsc::channel(128); tx.send(ExcavatorMessage { request: Some(Request::Heartbeat(ExcavatorHeartbeat::default())), }) .await .expect("failed to send heartbeat"); let mut stream = client .run_excavator(ReceiverStream::new(rx)) .await .expect("failed to run excavator") .into_inner(); // ... }
Теперь можем начать слушать поток и обрабатывать входящие команды. Чтобы не блокировать поток и не пропускать другие команды от сервера, запускаем отдельный поток.
while let Some(msg) = stream.next().await { let Ok(msg) = msg else { continue; }; let Some(command) = msg.request else { continue; }; match command { Request::Command(cmd) => { // парсим аргументы let args = Args::new(cmd.args); // получаем модуль let module = registry.get(&cmd.name).await; if let Some(module) = module { let tx = tx.clone(); // спавним поток под задачу tokio::spawn(async move { // выполняем команду let ExecuteResult { code, output } = module.execute(args).await; let results = output .into_iter() .map(|output| MessageResult { key: cmd.name.clone(), value: output, }) .collect(); // отправляем результат обратно серверу tx.send(Message::new(cmd.uid, code, results).into()) .await .expect("failed to send response"); }); } else { tx.send( Message::new( cmd.uid, 1, vec![MessageResult { key: "error".to_string(), value: format!("module '{}' not found", cmd.name), }], ) .into(), ) .await .expect("failed to send response"); } } Request::Response(_) => {} Request::Heartbeat(_) => {} } }
Клиент
Отлично, всё готово, осталось намалевать клиент. Он будет очень простым, поэтому будем использовать TUI для взаимодействия с пользователем.
Инициализируем проект.
cargo new client-tui
Добавляем в зависимости нужные крейты.
[package] name = "client-tui" version = "0.1.0" edition = "2024" [dependencies] tokio = { version = "1.52", features = ["rt", "rt-multi-thread", "macros"] } # для кодогенерации tonic = "0.14" prost = "0.14" tonic-prost = "0.14" # для парсинга аргументов clap = { version = "4.6.1", features = ["derive"] } # для TUI ratatui = "0.30" tui-input = "0.15" [build-dependencies] build-common = { path = "../build-common" }
И также создаём скрипт client-tui\build.rs с таким же содержимым, как у сервера, так как никаких конфигов у нас не будет.
fn main() { let proto_folder = build_common::get_proto_folder(); build_common::compile_protos_folder(&proto_folder).unwrap(); }
И включаем сгенерированный код в модуле client-tui\src\proto.rs.
tonic::include_proto!("client");
Если речь зашла о том, что конфигов у нас не будет, будем использовать аргументы. Для того, чтобы распарсить их, будем использовать clap. Давайте для него создадим простую модель.
#[derive(Subcommand)] pub enum Command { #[command(about = "Connect to a Selecit server")] Connect { host: String, port: Option<u16> }, } #[derive(Parser)] #[command(about = "Selecit client for connecting to Selecit servers")] pub struct Args { #[command(subcommand)] pub command: Command, }
Наш TUI будет очень простым, поэтому не буду особо подробно останавливаться на этом. Что тут происходит можете посмотреть в документации ratatui и tui-input. К сожалению, библиотека не поддерживает асинхронность, но предлагает для этого акторную модель, но мне было слишком лень это делать, проще воткнуть блокировку посредством block_in_place.
#[derive(Default, Clone, Copy)] enum Tab { #[default] Query = 0, Logs = 1, } impl Tab { fn next(self) -> Option<Self> { match self { Tab::Query => Some(Tab::Logs), Tab::Logs => None, } } fn back(self) -> Option<Self> { match self { Tab::Query => None, Tab::Logs => Some(Tab::Query), } } } #[derive(Default, PartialEq)] enum AppMode { #[default] Input, Normal, } #[derive(Default)] pub struct App { current_tab: Tab, app_mode: AppMode, addr: String, input: Input, client: Option<QueryClient<Channel>>, logs_cache: Arc<RwLock<Vec<String>>>, query_cache: Option<Vec<TableRow>>, } impl App { pub async fn new(addr: String, mut client: QueryClient<Channel>) -> Self { let mut stream = client.logs(LogsQueryRequest {}).await.expect("unable to load server logs").into_inner(); let logs = Arc::new(RwLock::new(Vec::new())); tokio::spawn({ let logs = logs.clone(); async move { loop { while let Some(entry) = stream.next().await { match entry { Ok(entry) => { for entry in entry.log { logs.write().await.push(entry); } } Err(err) => { logs.write().await.push(format!("unable to read logs because: {}", err)); } } } } } }); Self { addr, client: Some(client), logs_cache: logs, ..Default::default() } } pub fn run(mut self, terminal: &mut DefaultTerminal) -> io::Result<()> { loop { terminal.draw(|frame| self.render(frame))?; let event = event::read()?; if let Event::Key(key) = event { match key.code { KeyCode::Char('q') if key.modifiers.contains(KeyModifiers::CONTROL) => return Ok(()), KeyCode::Enter if self.app_mode == AppMode::Input && !self.input.value().is_empty() => { if let Some(ref mut client) = self.client { let command = self.input.value_and_reset(); let result = block_in_place(|| Handle::current().block_on(async move { client.query(QueryRequest { command }).await })); match result { Ok(response) => { let rows = response.into_inner().rows; self.query_cache = Some(rows); } Err(err) => block_in_place(|| { Handle::current().block_on(async { self.logs_cache .write() .await .push(format!("an error occurred while trying to execute the command: {err}")) }); }), } } } KeyCode::Right if let Some(tab) = self.current_tab.next() => { self.stop_editing(); self.current_tab = tab; } KeyCode::Left if let Some(tab) = self.current_tab.back() => { self.start_editing(); self.current_tab = tab; } _ if self.app_mode == AppMode::Input => { self.input.handle_event(&event); } _ => {} } } } } fn start_editing(&mut self) { self.app_mode = AppMode::Input; } fn stop_editing(&mut self) { self.app_mode = AppMode::Normal; } fn render(&mut self, frame: &mut Frame) { let [title_area, tabs_area, content_area] = Layout::vertical([Constraint::Length(1), Constraint::Length(1), Constraint::Fill(1)]).areas(frame.area()); let title = Line::from(format!("Selecit Client - connected to {}", self.addr)).centered(); frame.render_widget(title, title_area); let tabs = Tabs::new(vec!["Query", "Logs"]).select(self.current_tab as usize); frame.render_widget(tabs, tabs_area); match self.current_tab { Tab::Query => self.render_query(frame, content_area), Tab::Logs => self.render_logs(frame, content_area), } } fn render_query(&self, frame: &mut Frame, content_area: Rect) { let [table_area, input_area] = Layout::vertical([Constraint::Fill(8), Constraint::Fill(2)]).areas(content_area); let (rows, widths) = if let Some(rows) = self.query_cache.as_ref() { // получаем ссылку на первую строчку, чтобы составить из неё заголовок таблицы if let Some(first) = rows.first() { let widths = first.cols.iter().map(|_| Constraint::Percentage(30)).collect(); let cols = first.cols.iter().map(|c| c.key.clone()).collect::<Vec<_>>(); let body = rows .into_iter() .map(|r| Row::new(r.cols.iter().map(|c| c.data.clone()).collect::<Vec<_>>())) .collect::<Vec<_>>(); let rows = vec![Row::new(cols)].into_iter().chain(body).collect(); (rows, widths) } else { (vec![], vec![]) } } else { (vec![], vec![]) }; let table = Table::new(rows, widths).block(Block::bordered()); frame.render_widget(table, table_area); let input = Paragraph::new(self.input.value()).block(Block::bordered().title("Command")); frame.render_widget(input, input_area); } fn render_logs(&self, frame: &mut Frame, content_area: Rect) { let logs = block_in_place(|| Handle::current().block_on(async { self.logs_cache.read().await })); let list = List::new(logs.iter().cloned()).block(Block::bordered()); frame.render_widget(list, content_area); } }
Отлично, теперь можем запустить проект.
cargo run -p server cargo run -p agent cargo run -p client-tui -- connect 127.0.0.1
И пробуем получить список агентов с помощью команды list by name.

Или попробуем отправить команду агенту и получить результат.

Можем также с помощью стрелочек переключиться на другую вкладку и посмотреть логи.

Безопасность
TLS
Всё отлично работает, но есть одно но: весь трафик незашифрован, а это значит, что любой может прочитать наши сообщения, как пример:

Чтобы это исправить, давайте подключим TLS, чтобы гонять трафик по HTTPS. Для этого я сначала сгенерирую сертификаты в директории certs с помощью скрипта certs\generate.ps1.
$SUBJ = "/C=RU/ST=Test/L=Test/O=Selecit/OU=/CN=localhost/emailAddress=" # CA openssl req -x509 -newkey rsa:4096 -days 36500 -keyout ca-key.pem -out ca-cert.pem -nodes -subj $SUBJ # CSR openssl req -newkey rsa:4096 -keyout server-key.pem -out server-req.pem -subj $SUBJ -nodes # server cert openssl x509 -req -in server-req.pem -CA ca-cert.pem -CAkey ca-key.pem -CAcreateserial -out server-cert.pem -extfile localhost.ext
Чтобы корректно разрешать адрес сервера, создадим файл localhost.ext со следующим содержимым:
authorityKeyIdentifier=keyid,issuer basicConstraints=CA:FALSE subjectAltName = @alt_names [alt_names] DNS.1 = localhost IP.1 = 127.0.0.1
После корректного выполнения скрипта должна появиться пачка сертификатов, как на скриншоте:

Так как сразу во всех проектах будет работа с файлами, то для удобства создам ещё один проект common, в котором будут функции для получения путей до каких-либо директорий.
cargo new --lib common
Создам модуль common\src\path.rs со следующим содержимым:
use std::{env, path::PathBuf}; const CERTS_FOLDER: &str = "certs"; pub fn use_app_folder() -> PathBuf { env::current_exe() .expect("failed to get current executable") .parent() .expect("failed to get current executable parent") .to_path_buf() } #[cfg(debug_assertions)] pub fn use_project_folder() -> PathBuf { PathBuf::from(env::var("CARGO_MANIFEST_DIR").expect("failed to get CARGO_MANIFEST_DIR")) } #[cfg(debug_assertions)] pub fn use_workspace_folder() -> PathBuf { use_project_folder().join("..") } pub fn use_certs_folder() -> PathBuf { if cfg!(debug_assertions) { use_workspace_folder().join(CERTS_FOLDER) } else { use_app_folder().join(CERTS_FOLDER) } }
А в файле common\src\lib.rs добавлю константу SERVER_PORT. Всё, это всё назначение этого проекта.
pub mod path; pub const SERVER_PORT: u16 = 1299;
Теперь можем перейти к проекту сервера и создать модуль server\src\config.rs, в котором опишем модель для конфига.
use std::path::PathBuf; use crate::path; use knus::Decode; #[derive(Decode)] pub struct Server { pub address: String, pub port: u16, } #[derive(Decode, Clone)] pub struct Auth { #[knus(child, unwrap(argument))] pub token: String, } #[derive(Decode, Default)] pub struct Path { #[knus(argument)] pub path: PathBuf } #[derive(Decode)] pub struct Certificates { #[knus(child)] pub server_cert: Path, #[knus(child)] pub server_key: Path, } #[derive(Decode, Default)] pub struct Config { #[knus(child)] pub server: Option<Server>, #[knus(child)] pub auth: Option<Auth>, #[knus(child)] pub certificates: Option<Certificates>, } pub async fn use_config() -> Config { let config_path = path::use_config_path(); if tokio::fs::metadata(&config_path).await.is_ok() { let config_content = tokio::fs::read_to_string(config_path).await.expect("could not read config file"); knus::parse("config", &config_content).expect("could not parse config file") } else { Config::default() } }
И создадим сам конфиг по пути server\server.kdl и добавим пути до файлов сертификатов. Можно написать относительный путь, так как далее будем разрешать это.
certificates { server-cert "server-cert.pem" server-key "server-key.pem" }
Бежим в server\src\main.rs и немного переделываем функцию main. Тут всё довольно просто, не вижу особого смысла разглагольствовать, поэтому предлагаю перейти далее.
#[tokio::main] async fn main() { let config = use_config().await; let addr = if let Some(server) = config.server { format!("{}:{}", server.address, server.port).parse().expect("could not parse address") } else { SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), SERVER_PORT) }; let client_map = AgentsMap::default(); LOGS_MANAGER.send_log(format!("server listening on {}", addr)).await; let mut builder = if let Some(certs) = config.certificates { let cert_path = { let path = certs.server_cert.path; if path.is_absolute() { path } else { use_certs_folder().join(path) } }; let key_path = { let path = certs.server_key.path; if path.is_absolute() { path } else { use_certs_folder().join(path) } }; let cert = tokio::fs::read_to_string(cert_path).await.expect("could not read certificate"); let key = tokio::fs::read_to_string(key_path).await.expect("could not read key"); let identity = Identity::from_pem(cert, key); let tls_config = ServerTlsConfig::new().identity(identity); match Server::builder().tls_config(tls_config) { Ok(b) => { LOGS_MANAGER.send_log("server using tls".to_owned()).await; b } Err(err) => { LOGS_MANAGER.send_log(format!("failed to set tls config: {}", err)).await; Server::builder() } } } else { Server::builder() }; builder .add_service(ExcavatorServer::new(ExcavatorService::new(client_map.clone()))) .add_service(QueryServer::new(QueryService::new(LOGS_MANAGER.subscribe(), client_map))) .serve(addr) .await .expect("failed to serve") }
Перейдём к агенту и сделаем примерно то же самое. Изменим немного модель конфига у агента в файле agent\src\config.rs.
use knus::Decode; #[derive(Decode)] pub struct Server { #[knus(argument)] pub address: String, #[knus(argument)] pub port: Option<u16>, } #[derive(Decode)] pub struct Certificate { #[knus(argument)] pub ca_cert: String, } #[derive(Decode, Default)] pub struct Auth { #[knus(child, unwrap(argument))] pub token: String, } #[derive(Decode)] pub struct Config { #[knus(child)] pub server: Server, #[knus(child)] pub auth: Option<Auth>, #[knus(child)] pub certificate: Option<Certificate>, }
Теперь можем добавить настройки TLS клиенту в main функции.
#[tokio::main] async fn main() { // ... let config_path = path::use_config_path(); let config_content = tokio::fs::read_to_string(&config_path).await.expect("unable to read config file"); let config: Config = knus::parse("config", &config_content).expect("failed to parse config"); let mut client = if let Some(cert) = config.certificate { let cert = { let path = PathBuf::from(cert.ca_cert); if path.is_absolute() { path } else { use_certs_folder().join(path) } }; let cert_content = tokio::fs::read_to_string(cert).await.expect("unable to read certificate file"); let cert = Certificate::from_pem(cert_content); let addr = format!("https://{}:{}", config.server.address, config.server.port.unwrap_or(SERVER_PORT)); let tls = ClientTlsConfig::new().ca_certificate(cert); let channel = Channel::from_shared(addr.clone()) .unwrap() .tls_config(tls) .unwrap() .connect() .await .expect("TLS connection failed"); println!("listening server at {}", addr); ExcavatorClient::new(channel) } else { let addr = format!("http://{}:{}", config.server.address, config.server.port.unwrap_or(SERVER_PORT)); println!("listening server at {}", addr); ExcavatorClient::connect(addr.clone()).await.expect("connection to server failed") }; // ... }
Теперь наш трафик успешно шифруется, в чём можно убедиться, посмотрев в WireShark, что у нас теперь гоняются TLS пакеты.

Аутентификация
Трафик шифруется, но подключиться может всё равно каждый. Давайте это исправим, добавив аутентификацию пользователей.
Тут есть два пути: классическая аутентификация по каком-нибудь паролю или mTLS (пример из репозитория tonic). Тут выбирайте сами, что вам удобнее или проще, но я для примера буду использовать простой токен.
Для этого добавим слой в server\src\main.rs, который будет обрабатывать все входящие запросы и проверять, есть ли в заголовках поле authorization с нужным нам токеном.
#[tokio::main] async fn main() { let config = use_config().await; // ... if config.auth.is_some() { LOGS_MANAGER.send_log("server using token auth".to_owned()).await; } let mut auth = config.auth; builder .layer(InterceptorLayer::new(move |req: Request<()>| { if let Some(auth) = auth.as_mut() { if req .metadata() .get("authorization") .map(|header| header.to_str()) .is_some_and(|s| s.is_ok_and(|s| s == auth.token)) { Ok(req) } else { Err(Status::unauthenticated("unauthorized")) } } else { Ok(req) } })) .add_service(ExcavatorServer::new(ExcavatorService::new(client_map.clone()))) .add_service(QueryServer::new(QueryService::new(LOGS_MANAGER.subscribe(), client_map))) .serve(addr) .await .expect("failed to serve") }
Теперь можем модифицировать модель для аргументов в client-tui\src\args.
use clap::{Parser, Subcommand}; #[derive(Subcommand)] pub enum Command { #[command(about = "Connect to a Selecit server")] Connect { /// Server host host: String, /// Server port port: Option<u16>, /// Path to the CA certificate #[arg(long)] ca: Option<String>, /// Authentication token #[arg(long, short)] token: Option<String>, }, } #[derive(Parser)] #[command(about = "Selecit client for connecting to Selecit servers")] pub struct Args { #[command(subcommand)] pub command: Command, }
И в client-tui\srv\main.rs создадим замыкание, которое будет вставлять в заголовки запроса токен из аргументов. После чего закидываем это замыкание в QueryClient::with_interceptor. Мне пришлось упаковать замыкание в Box, так как client дальше передаётся в App, из-за чего компилятор жаловался на несоответствие типов.
pub type InterceptFn = Box<dyn FnMut(Request<()>) -> Result<Request<()>, Status>>; #[tokio::main] async fn main() { let args = Args::parse(); match args.command { Command::Connect { host, port, ca: ca_cert, mut token, } => { let interceptor: InterceptFn = Box::new(move |mut req: tonic::Request<()>| { if let Some(auth) = token.as_mut() { req.metadata_mut().insert(AUTHORIZATION, MetadataValue::from_str(auth.as_str()).unwrap()); } Ok(req) }); let (client, server_addr) = if let Some(ca_cert) = ca_cert { // ... (QueryClient::with_interceptor(channel, interceptor), server_addr) } else { // ... (QueryClient::with_interceptor(channel, interceptor), server_addr) }; let mut terminal = ratatui::init(); App::new(server_addr, client).await.run(&mut terminal).expect("failed to run app"); ratatui::restore(); } } }
Можем теперь проделать то же самое и с агентом.
#[tokio::main] async fn main() { // ... let config_path = path::use_config_path(); let config_content = tokio::fs::read_to_string(&config_path).await.expect("unable to read config file"); let mut config: Config = knus::parse("config", &config_content).expect("failed to parse config"); let interceptor = move |mut req: tonic::Request<()>| { if let Some(auth) = config.auth.as_mut() { req.metadata_mut() .insert(AUTHORIZATION, MetadataValue::from_str(auth.token.as_str()).unwrap()); } Ok(req) }; let mut client = if let Some(cert) = config.certificate { // ... ExcavatorClient::with_interceptor(channel, interceptor) } else { // ... ExcavatorClient::with_interceptor(channel, interceptor) }; // ... }
Итого
Мы реализовали систему распределённого управления, которая позволяет нам получать список агентов, подходящие под какие-то нужные нам критерии, и даже выполнять команды посредством этих агентов на удалённых хостах.
Наша система далеко не идеальна, везде используются except да unwrap, нет переподключения агентов, язык запросов пока что в зачаточном состоянии, нет никакого кэширования, нужно прикрутить БД для хранения какого-то состояния системы, и куча других проблем, но, доработав слабые места, думаю, система жизнеспособна.
Если кому нужно посмотреть полный код проекта, то вот его репозиторий.
Из похожего я нашёл bssh, который стремится или даже уже достиг что-то похожее, к чему стремился я в начале статьи, но он выбрал другой путь работы поверх SSH.
