Привет, Хабр!
Сегодня мы рассмотрим, как реализовать так называемую event-driven архитектуру с использованием Rust.
Архитектура на основе событий (event-driven architecture, EDA) — это подход к созданию систем, где взаимодействие между компонентами системы происходит с помощью событий. Все это позволяет развязывать компоненты друг от друга и повышать их независимость, что, в свою очередь, увеличивает масштабируемость и гибкость системы.
Как работает EDA?
События: Основные данные или действия, которые происходят в системе, например, нажатие кнопки или завершение загрузки файла.
Производители: Компоненты, которые создают события и отправляют их в систему.
Потребители: Компоненты, которые подписаны на события и реагируют на них.
Брокеры событий: Инструменты или системы, которые управляют передачей событий между производителями и потребителями.
С развитием экосистемы Rust появились хорошие инструменты для работы с архитектурой на основе событий, такие как:
Tokio: асинхронная платформа для работы с сетями.
Actix: высокопроизводительный фреймворк для создания акторных систем.
async-std: асинхронный стандарт для работы с Rust.
Установка и настройка среды
Установим Rust:
curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh
Установим необходимые библиотеки:
tokio
async-std
actix
Простая система на основе событий с использованием Tokio
Создадим простое приложение, которое будет отправлять и обрабатывать события с помощью Tokio.
cargo new event_driven_example
cd event_driven_example
Добавим зависимости в Cargo.toml
:
[dependencies]
tokio = { version = "1.0", features = ["full"] }
А теперь напишем сам код:
use tokio::sync::mpsc;
use tokio::task;
#[tokio::main]
async fn main() {
// Создаем канал для передачи сообщений
let (tx, mut rx) = mpsc::channel(32);
// Запускаем асинхронную задачу для обработки событий
task::spawn(async move {
while let Some(event) = rx.recv().await {
println!("Обработано событие: {}", event);
}
});
// Генерация событий
for i in 1..=10 {
tx.send(format!("Событие {}", i)).await.unwrap();
}
}
Здесь мы:
Создали канал для передачи сообщений между производителем и потребителем.
Запустили асинхронную задачу для обработки событий с помощью
tokio::task::spawn
.Генерируем события и отправляем их в канал.
Использование Actix для акторной модели
Теперь рассмотрим использование Actix, чтобы создать более сложную систему на основе акторов.
Добавим зависимости в Cargo.toml
:
[dependencies]
actix = "0.12"
actix-web = "4.0.0-beta.8"
serde = { version = "1.0", features = ["derive"] }
Создадим акторы:
use actix::prelude::*;
use serde::{Serialize, Deserialize};
#[derive(Message, Serialize, Deserialize)]
#[rtype(result = "()")]
struct Event {
id: u32,
message: String,
}
struct EventProducer;
impl Actor for EventProducer {
type Context = Context<Self>;
}
impl Handler<Event> for EventProducer {
type Result = ();
fn handle(&mut self, event: Event, _: &mut Context<Self>) {
println!("Произведено событие: {} - {}", event.id, event.message);
}
}
struct EventConsumer;
impl Actor for EventConsumer {
type Context = Context<Self>;
}
impl Handler<Event> for EventConsumer {
type Result = ();
fn handle(&mut self, event: Event, _: &mut Context<Self>) {
println!("Получено событие: {} - {}", event.id, event.message);
}
}
#[actix::main]
async fn main() {
let producer = EventProducer.start();
let consumer = EventConsumer.start();
for i in 1..=10 {
let event = Event {
id: i,
message: format!("Сообщение {}", i),
};
producer.do_send(event.clone());
consumer.do_send(event);
}
}
Объяснение:
Создаем акторов
EventProducer
иEventConsumer
, которые обрабатывают события.Определяем структуру
Event
и реализуем для нее сообщениеMessage
.Запускаем акторов и передаем им события с помощью
do_send
.
Реализация брокера событий
Брокеры событий могут быть хороши для управления маршрутизацией событий между различными компонентами системы. Можно создать простой брокер с использованием библиотеки tokio
.
Добавлим зависимости в Cargo.toml
:
[dependencies]
tokio = { version = "1.0", features = ["full"] }
Создадим брокер:
use tokio::sync::mpsc;
use tokio::task;
struct EventBroker {
sender: mpsc::Sender<String>,
receiver: mpsc::Receiver<String>,
}
impl EventBroker {
fn new(buffer_size: usize) -> Self {
let (sender, receiver) = mpsc::channel(buffer_size);
EventBroker { sender, receiver }
}
async fn start(&mut self) {
while let Some(event) = self.receiver.recv().await {
println!("Брокер обработал событие: {}", event);
}
}
async fn send_event(&self, event: String) {
self.sender.send(event).await.unwrap();
}
}
#[tokio::main]
async fn main() {
let mut broker = EventBroker::new(32);
task::spawn(async move {
broker.start().await;
});
for i in 1..=10 {
broker.send_event(format!("Событие {}", i)).await;
}
}
Здесь мы:
Создали
EventBroker
с каналом для передачи событий.Запутили брокер в асинхронной задаче и обрабатываем входящие события.
Отправляем события в брокер с помощью
send_event
.
Оптимизация и масштабируемость
Kafka и NATS
Apache Kafka - распределенная система для обработки потоков данных в реальном времени.
NATS - высокопроизводительная система обмена сообщениями с поддержкой pub/sub и request/reply.
[dependencies]
tokio = { version = "1.0", features = ["full"] }
rdkafka = "0.29"
nats = "0.19"
// пример подключения к Kafka
use rdkafka::config::ClientConfig;
use rdkafka::producer::{FutureProducer, FutureRecord};
async fn produce_kafka_event() {
let producer: FutureProducer = ClientConfig::new()
.set("bootstrap.servers", "localhost:9092")
.create()
.unwrap();
producer.send(
FutureRecord::to("my-topic")
.payload("Это сообщение для Kafka")
.key("ключ"),
0,
).await.unwrap();
}
// пример подключения к NATS
use nats::asynk::Connection;
async fn publish_nats_event() {
let nc = Connection::connect("localhost:4222").await.unwrap();
nc.publish("events", "Это сообщение для NATS").await.unwrap();
}
Примеры
Допустим, нужно обрабатывать большое количество событий в реальном времени и сохранять результаты в базе данных.
Для этого мы будем использовать Kafka для приема и обработки событий и Rust для обработки данных и записи результатов в базу данных:
use rdkafka::consumer::{Consumer, StreamConsumer};
use rdkafka::Message;
use tokio_postgres::{NoTls, Client};
async fn process_events() {
let consumer: StreamConsumer = ClientConfig::new()
.set("bootstrap.servers", "localhost:9092")
.set("group.id", "my-group")
.create()
.unwrap();
let (client, connection) = tokio_postgres::connect("host=localhost user=postgres", NoTls).await.unwrap();
tokio::spawn(async move {
if let Err(e) = connection.await {
eprintln!("Ошибка соединения: {}", e);
}
});
consumer.subscribe(&["my-topic"]).unwrap();
while let Some(message) = consumer.recv().await.unwrap() {
let payload = match message.payload_view::<str>() {
Some(Ok(text)) => text,
Some(Err(e)) => {
eprintln!("Ошибка декодирования сообщения: {:?}", e);
continue;
}
None => continue,
};
println!("Получено сообщение: {}", payload);
client.execute("INSERT INTO events (data) VALUES ($1)", &[&payload]).await.unwrap();
}
}
Подробнее с применяемыми библиотеками можно ознакомиться по гиперссылкам:
Tokio — асинхронная платформа для работы с сетями.
Actix — высокопроизводительный фреймворк для создания акторных систем.
async-std — асинхронный стандарт для работы с Rust.
Больше практических навыков по архитектуре приложений вы можете получить в рамках практических онлайн-курсов от экспертов отрасли.