Привет, Хабр! Я Юрий Петров, руководитель отдела мобильной разработки в Friflex и автор телеграм-канала «Мобильный разработчик».
В этой статье хотел бы поделиться с вами опытом работы с брокером сообщений RabbitMQ из Dart-кода.
Разберу вопросы:
Как установить и запустить контейнер с RabbitMQ
Как настроить RabbitMQ.
Как создать Producer (отправителя) на Flutter.
Как создать Consumer (потребителя) на Flutter.
Давайте представим, что вы пишете маленький сервис на Dart, задача которого — отправить миллион сообщений. Не важно, каким способом — с помощью пушей или любого другого сервиса доставки. Но есть условие, что сообщения должны отправляться не все сразу, а по некоторому алгоритму. Например, ровно в 12 дня или, если у пользователя ночь, то только в дневное время. На самом деле не важен алгоритм, главное — понять, что у нас есть некая задержка отправки сообщений.
Есть простой способ это организовать. Например, мы будем использовать для пула сообщений задержку:
await Future.delayed(Duration(hours: 2));
По окончании задержки система отправит сообщения через два часа после получения. И в целом это будет работать, но это не самый лучший способ. Просто представьте, что таких сообщений может быть миллион, или, например, задержку отправки необходимо сделать на 10 дней, пока пользователь в отпуске. Понятно, что это не самый эффективный вариант.
Для решения таких проблем программисты придумали планировщики задач. Их называют брокеры сообщений, самый подходящий для нас — это RabbitMQ, так как у него есть нативный плагин Delayed Message Exchange, который отлично подходит для выполнения задач через установленное время.
Как работает сам брокер и что это вообще такое, я рассказывать не буду, так как это отдельная тема. Но вы можете почитать, например, здесь.
Для решения этой задачи, будем использовать следующие понятия:
Producer (Отправитель): приложение, которое будет отправлять сообщение в брокер. Сообщение должно быть доставлено потребителю не сразу, а через пять секунд.
Broker: сервис, который получает сообщение и хранит его в очереди до момента, когда нужно его отправить. Используем RabbitMQ.
Consumer (Потребитель): приложение, которое слушает очередь в брокере, получает готовые к доставке сообщения и отображает их.
Установка и запуск контейнера с RabbitMQ
И первое, с чего мы начнем, это запустим сервис RabbitMQ как докер-контейнер. Но сначала вы должны у себя установить Docker Desktop, перейти на сайт и выбрать необходимую сборку. Делается это очень просто, я думаю, труда у вас это не составит.
После того как установили docker в систему, проверяем в терминале командой:
docker --version
Вывод в консоль должен быть примерно такой:
Docker version 27.4.0, build bde2b89.
После того как разобрались с докером, нам необходимо запустить контейнер с RabbitMQ. И здесь у вас есть выбор. Вы можете использовать оригинальный контейнер, запустив команду в терминале, которая автоматически скачает и развернет докер-контейнер. И далее самим установить плагин Delayed Message Exchange.
docker run -d --name my-rabbit -p 5672:5672 -p 15672:15672 rabbitmq:4.0.5-management
И можно использовать готовый образ с уже установленным плагином.
docker run -d --name my-rabbit -p 5672:5672 -p 15672:15672 heidiks/rabbitmq-delayed-message-exchange:4.0.2-management
Обе эти команды запускают контейнер с RabbitMQ, используя нужный докер-образ с параметрами:
-d
: Флаг -d
(или --detach
) указывает Docker запускать контейнер в фоновом режиме, не блокируя терминал. Контейнер работает как отдельный процесс.--name my-rabbit
: Этот параметр задает имя для контейнера.
-p 5672:5672 -p 15672:15672
: Пробрасывает порты между хост-машиной и контейнером.
После успешного выполнения команды проверьте, запущен ли у вас контейнер командой:
docker ps
В терминале вы должны увидеть запущенный контейнер в колонке NAMES.
Настройка RabbitMQ
Теперь нам необходимо настроить RabbitMQ. Для это переходим на веб-интерфейс — вводим адрес в браузере: http://localhost:15672/
Вы должны увидеть окно для ввода логина и пароля.
Веб интерфейс RabbitMQ - ввод логина и пароля
Вводим данные по умолчанию: username - guest и password - guest.
Попадаем на главное окно:
Веб интерфейс RabbitMQ - главное окно
Далее переходим во вкладку Exchanges и добавляем новый обменник.
Заполняем поля:
Name: delayed_exchange.
Type: выбираем x-delayed-message. Если в списке нет x-delayed-message, значит, плагин не установлен — проверьте, что все установлено и включено.
Arguments: добавляем ключ:
key: x-delayed-type
value: direct.
Веб интерфейс RabbitMQ - настройки Exchanges
Нажимаем на кнопку Add exchange. Убеждаемся, что в списке появился delayed_exchange.
Веб интерфейс RabbitMQ - проверка
Осталось настроить очередь для сообщений. Для этого переходим на вкладку Queues.
Добавляем новую очередь. Заполняем поля:
Name: messages_queue.
Durability: Durable.
Нажимаем Add queue.
Веб интерфейс RabbitMQ - настройки Queues
Отлично, у нас есть настроенный exchange и queue. Осталось их соединить.
Для этого переходим в созданную очередь.
Веб интерфейс RabbitMQ - переход на Queues
Далее на странице очереди messages_queue находим секцию Bindings.
Заполняем поля:
From exchange: delayed_exchange
Routing key: messages_queue
Нажимаем кнопку Bind.
Веб интерфейс RabbitMQ - пример связки
Таким образом, когда мы публикуем в delayed_exchange, сообщение придет в messages_queue, после того как истечет задержка x-delay.
Если вам не нравится работать в веб-интерфейсе, или вы хотели бы этот процесс автоматизировать, можно это все сделать через терминал командами:
Подключаемся к контейнеру my-rabbit:
docker exec -it my-rabbit bash
Объявляем exchange внутри контейнера:
rabbitmqadmin declare exchange \
name=delayed_exchange \
type=x-delayed-message \
arguments='{"x-delayed-type":"direct"}'
Создаем очередь:
rabbitmqadmin declare queue name=messages_queue durable=true
Создаем связку:
rabbitmqadmin declare binding source=delayed_exchange destination=messages_queue routing_key="messages_queue"
Все готово, теперь нам необходимо реализовать Producer на Flutter.
Создание Producer (отправителя) на Flutter
Для реализации отправителя, создадим простой проект на Flutter с одной кнопкой, которая будет отправлять сообщение с нужной нам задержкой. Для работы с брокером будем использовать библиотеку dart_amqp
Добавляем библиотеку в проект командой:
flutter pub add dart_amqp
Создаем функцию, чтобы создать клиент для работы с RabbitMQ:
_initClient
Future<Exchange> _initClient() async {
try {
/// Настройки подключения
final settings = ConnectionSettings(
// Если у вас RabbitMQ запущен на на локальном компьютере
// Укажите нужный IP-адрес или hostname
host: 'localhost',
// Порт
port: 5672,
// Данные для входа в RabbitMQ
authProvider: const PlainAuthenticator('guest', 'guest'),
);
/// Создаем клиента для подключения к RabbitMQ
Client client = Client(settings: settings);
// Подключаемся
await client.connect();
// Создаем канал, по которому будем отправлять сообщения
final channel = await client.channel();
// Декларируем exchange. Если он уже создан, то повторная декларация
// должна совпадать по параметрам.
// Если не совпадает — будет ошибка.
// Если exchange не существует, то он будет создан
final exchange = await channel.exchange(
'delayed_exchange',
// Используем ExchangeType типа x-delayed-message
ExchangeType.custom('x-delayed-message'),
durable: true,
arguments: {
'x-delayed-type': 'direct',
},
);
log('Успешно подключились к RabbitMQ');
return exchange;
} on Object catch (error, stackTrace) {
log('Ошибка подключения к RabbitMQ', error: error, stackTrace: stackTrace);
rethrow;
}
}
Далее создаем простой виджет с одной кнопкой, которая будет отправлять сообщение в брокер. Это будет простое сообщение с датой отправки.
_sendMessage
void _sendMessage() {
// Публикуем сообщение. Ставим задержку 5000 мс (5 секунд) для примера
// Через 5 секунд сообщение будет доставлено в очередь из Exchange x-delayed-message
final int delayMs = 5000;
// Обязательно передаем задержку в хедере
final headers = {'x-delay': delayMs};
// Создаем сообщение которое будем отправлять, можно отправить объект
final message =
'Привет, я отложенное сообщение, которое было отправлено в ${DateTime.now()}';
// Настройки сообщения
final properties = MessageProperties()..headers = headers;
// Ключ очереди
final routingKey = 'messages_queue';
// Отправляем
exchange.publish(
message,
routingKey,
properties: properties,
);
log('Отправили сообщение с задержкой $delayMs мс');
}
Полный исходный код:
main.dart
import 'dart:developer';
import 'package:dart_amqp/dart_amqp.dart';
import 'package:flutter/material.dart';
void main() async {
/// Инициализируем клиента
final exchange = await _initClient();
runApp(MyApp(exchange: exchange));
}
Future<Exchange> _initClient() async {
try {
/// Настройки подключения
final settings = ConnectionSettings(
// Если у вас RabbitMQ запущен на не локальном компьютере
// укажите нужный IP-адрес или hostname
host: 'localhost',
// Порт
port: 5672,
// Данные для входа в RabbitMQ
authProvider: const PlainAuthenticator('guest', 'guest'),
);
/// Создаем клиента для подключения к RabbitMQ
Client client = Client(settings: settings);
// Подключаемся
await client.connect();
// Создаем канал, по которому будем отправлять сообщения
final channel = await client.channel();
// Декларируем exchange. Если он уже создан, то повторная декларация
// должна совпадать по параметрам.
// Если не совпадает — будет ошибка.
// Если exchange не существует, то он будет создан
final exchange = await channel.exchange(
'delayed_exchange',
// Используем ExchangeType типа x-delayed-message
ExchangeType.custom('x-delayed-message'),
durable: true,
arguments: {
'x-delayed-type': 'direct',
},
);
log('Успешно подключились к RabbitMQ');
return exchange;
} on Object catch (error, stackTrace) {
log(
'Ошибка подключения к RabbitMQ',
error: error,
stackTrace: stackTrace,
);
rethrow;
}
}
class MyApp extends StatelessWidget {
// Exchange для публикаций
final Exchange exchange;
const MyApp({super.key, required this.exchange});
@override
Widget build(BuildContext context) {
return MaterialApp(
home: Scaffold(
appBar: AppBar(title: const Text('Flutter Producer Пример')),
body: Center(
child: ElevatedButton(
onPressed: _sendMessage,
child: Text('Отправить'),
),
),
),
);
}
void _sendMessage() {
// Публикуем сообщение. Ставим задержку 5000 мс (5 секунд) для примера
// Через 5 секунд сообщение будет доставлено в очередь из Exchange x-delayed-message
final int delayMs = 5000;
// Обязательно передаем задержку в хедере
final headers = {'x-delay': delayMs};
// Создаем сообщение которое будем отправлять, можно отправить объект
final message =
'Привет, я отложенное сообщение, которое было отправлено в ${DateTime.now()}';
// Настройки сообщения
final properties = MessageProperties()..headers = headers;
// Ключ очереди
final routingKey = 'messages_queue';
// Отправляем
exchange.publish(
message,
routingKey,
properties: properties,
);
log('Отправили сообщение с задержкой $delayMs мс');
}
}
После запуска приложения отправляем сообщение в брокер.
Producer - внешний вид
После отправки переходим в web-интерфейс RabbitMQ, далее в Exchange: delayed_exchange. На графике вы должны увидеть, что сообщение было добавлено в exchange. И если перейти в очередь, можно увидеть, что данные ушли и в очередь.
Веб интерфейс RabbitMQ - графики delayed_exchange и queues
Отлично, теперь можно написать потребителя.
Создание Consumer (потребителя) на Flutter
Как и в случае с отправителем, создаем новое Flutter-приложение, и добавляем библиотеку dart_amqp. Создаем метод для создания потребителя с тегом 'my_consumer
'.
_initConsumer()
Future<Consumer> _initConsumer() async {
// Инициализируем Client
final client = Client(
settings: ConnectionSettings(
// Так как мы тестируем, используем локальный хост
host: 'localhost',
port: 5672,
authProvider: const PlainAuthenticator('guest', 'guest'),
),
);
// Создаем клиент
final channel = await client.channel();
// Создаем очередь
final queue = await channel.queue(
'messages_queue',
durable: true,
);
// Создаем потребителя, с тегом my_consumer
final consumer = await queue.consume(
consumerTag: 'my_consumer',
);
return consumer;
}
Далее создаем простой StatefulWidget
с текстовым полем по центру, и в методе initState()
создаем подписку на поток из RabbitMQ:
main.dart
import 'dart:async';
import 'package:dart_amqp/dart_amqp.dart';
import 'package:flutter/material.dart';
void main() async {
// Инициализируем Consumer
final consumer = await _initConsumer();
runApp(MyApp(consumer: consumer));
}
Future<Consumer> _initConsumer() async {
// Инициализируем Client
final client = Client(
settings: ConnectionSettings(
// Так как мы тестируем, используем локальный хост
host: 'localhost',
port: 5672,
authProvider: const PlainAuthenticator('guest', 'guest'),
),
);
// Создаем клиент
final channel = await client.channel();
// Создаем очередь
final queue = await channel.queue(
'messages_queue',
durable: true,
);
// Создаем потребителя, с тегом my_consumer
final consumer = await queue.consume(
consumerTag: 'my_consumer',
);
return consumer;
}
class MyApp extends StatefulWidget {
const MyApp({super.key, required this.consumer});
final Consumer consumer;
@override
State<MyApp> createState() => _MyAppState();
}
class _MyAppState extends State<MyApp> {
String message = 'Нет данных';
late final StreamSubscription streamSubscription;
@override
initState() {
super.initState();
// Подписываемся на получение сообщений
streamSubscription = widget.consumer.listen((AmqpMessage amqpMessage) {
message =
'Полученное сообщение: ${amqpMessage.payloadAsString} \nВремя получения: ${DateTime.now().toIso8601String()}';
setState(() {});
});
}
@override
void dispose() {
streamSubscription.cancel();
super.dispose();
}
@override
Widget build(BuildContext context) {
return MaterialApp(
home: Scaffold(
appBar: AppBar(
title: const Text(
'Flutter Consumer Пример',
),
),
body: Center(child: Text(message)),
),
);
}
}
Запускаем потребителя, и если подключение прошло успешно, можно перейти в очередь и убедиться, что мы подключены как потребители.
Веб интерфейс RabbitMQ - подключенные потребители
Нажимаем на кнопку «Отправить» на отправителе.
Пример работы отправителя и получателя.
Обратите внимание, что сообщение было отправлено в 18:42:50, а потребитель получил его в 18:42:55. То есть ровно через 5 секунд, так как мы и планировали.
Вот в целом и все, чем я хотел с вами поделиться.
Полный исходный код потребителя и отправителя можно посмотреть здесь.
А как бы вы решили такую задачу? Жду ваших комментариев.