Привет, Хабр!
Сегодня рассмотрим библиотеку Lapin в Rust. Lapin — это библиотека, реализующая протокол AMQP 0.9.1, она помогает взаимодействовать с RabbitMQ.
Многоканальная работа: один TCP‑соединение поддерживает множество каналов.
Поддержка подтверждений: безопасная обработка сообщений (ack/nack).
Интеграция с async: асинхронный API, который вписывается в экосистему Rust.
TLS: поддержка защищённых соединений через
native-tls,opensslилиrustls.
Основные сущности Lapin
Connection
Соединение с RabbitMQ устанавливается один раз и может использоваться для работы с несколькими каналами. Это основа любого взаимодействия.
Пример:
use lapin::{Connection, ConnectionProperties}; let addr = "amqp://user:password@localhost:5672/%2f"; let connection = Connection::connect(&addr, ConnectionProperties::default()) .await .expect("Ошибка подключения");
Поддерживаются как TCP, так и TLS‑соединения.
Channel
Каналы — это основа взаимодействия с RabbitMQ. Через них создаются очереди, подписки и отправляются сообщения.
Пример:
let channel = connection.create_channel() .await .expect("Ошибка создания канала");
RabbitMQ рекомендует использовать отдельные каналы для отправки и получения сообщений. В одном соединении можно создавать множество каналов.
Queue
Очереди — это хранилища сообщений. Они бывают:
Durable: сохраняются при перезапуске сервера.
Exclusive: доступны только для текущего соединения.
Auto‑delete: удаляются, когда больше не используются.
Пример:
use lapin::options::QueueDeclareOptions; use lapin::types::FieldTable; let queue = channel .queue_declare( "task_queue", QueueDeclareOptions { durable: true, // Устойчивая очередь ..Default::default() }, FieldTable::default(), ) .await .expect("Ошибка объявления очереди");
Используйте FieldTable, чтобы настроить TTL сообщений, ограничение размера очереди и т. п.
Exchange
Exchange — это маршрутизатор, который направляет сообщения в очереди на основе типов и правил маршрутизации:
Direct: сообщения отправляются в конкретную очередь по ключу маршрутизации.
Fanout: сообщение отправляется во все очереди, привязанные к exchange.
Topic: сложная маршрутизация по шаблонам.
Headers: маршрутизация на основе заголовков сообщения.
Пример настройки exchange и привязки к очереди:
use lapin::options::{ExchangeDeclareOptions, QueueBindOptions}; channel .exchange_declare( "my_exchange", lapin::ExchangeKind::Direct, // Тип exchange ExchangeDeclareOptions { durable: true, ..Default::default() }, FieldTable::default(), ) .await .expect("Ошибка создания exchange"); channel .queue_bind( "task_queue", "my_exchange", "routing_key", // Ключ маршрутизации QueueBindOptions::default(), FieldTable::default(), ) .await .expect("Ошибка привязки очереди");
TLS:
Для продакшен‑окружения требуется защищённое соединение. Настроим соединение через rustls:
use lapin::{Connection, ConnectionProperties}; use lapin::tcp::{OwnedTLSConfig, OwnedTLSStream}; let addr = "amqps://user:password@rabbitmq.example.com:5671/"; let tls_config = OwnedTLSConfig::default(); let connection = Connection::connect( addr, ConnectionProperties::default().with_tls(tls_config), ) .await .expect("Ошибка подключения через TLS");
Обработка ошибок
Ошибки неизбежны.
Используйте
retryдля повторного подключения или обработки сообщения.Обрабатывайте
nack, если сообщение нельзя обработать.
Пример обработки ошибок:
while let Some(delivery) = consumer.next().await { match delivery { Ok(delivery) => { if let Err(err) = process_message(&delivery).await { error!("Ошибка обработки сообщения: {:?}", err); delivery .nack(Default::default()) .await .expect("Ошибка отправки nack"); } else { delivery .ack(Default::default()) .await .expect("Ошибка отправки ack"); } } Err(err) => { error!("Ошибка получения сообщения: {:?}", err); } } }
Пример применения
Напишем приложение, которое:
Создаёт exchange и очередь.
Маршрутизирует сообщения.
Обрабатывает входящие сообщения с QoS и подтверждениями.
use lapin::{ options::{BasicPublishOptions, QueueBindOptions, QueueDeclareOptions}, types::FieldTable, BasicProperties, Connection, ConnectionProperties, }; use tokio; use tracing::info; #[tokio::main] async fn main() { tracing_subscriber::fmt::init(); let addr = "amqp://user:password@localhost:5672/%2f"; let connection = Connection::connect(&addr, ConnectionProperties::default()) .await .expect("Ошибка подключения"); let channel = connection.create_channel().await.expect("Ошибка создания канала"); channel .exchange_declare( "logs", lapin::ExchangeKind::Fanout, Default::default(), FieldTable::default(), ) .await .expect("Ошибка создания exchange"); let queue = channel .queue_declare( "", QueueDeclareOptions { exclusive: true, ..Default::default() }, FieldTable::default(), ) .await .expect("Ошибка объявления очереди"); channel .queue_bind( &queue.name(), "logs", "", QueueBindOptions::default(), FieldTable::default(), ) .await .expect("Ошибка привязки очереди"); info!("Очередь привязана к exchange"); tokio::spawn(async move { for i in 0..5 { let message = format!("Log message {}", i); channel .basic_publish( "logs", "", BasicPublishOptions::default(), message.as_bytes(), BasicProperties::default(), ) .await .expect("Ошибка отправки сообщения"); } }); info!("Сообщения отправлены"); }
Создаём exchange типа Fanout, объявляем временную очередь и связываем её с exchange. После этого отправляем несколько сообщений в exchange, которые автоматически маршрутизируются во все привязанные очереди.
Подробнее с библиотекой можно ознакомиться здесь.
Рекомендую обратить внимание на открытые уроки, которые в феврале проведут в Otus преподаватели-практики:
11 февраля: «Разбираем анатомию парсера на Rust».
Разберём устройство игрушечного парсера на Rust, его ключевые компоненты и архитектурные принципы, обеспечивающие надёжность и производительность кода. Записаться17 февраля: «Инцидент-менеджмент в SRE — как быстро найти, устранить и предотвратить сбои в системе».
Практическое руководство по эффективному управлению аварийными ситуациями в рамках Site Reliability Engineering (SRE). Записаться
