Как стать автором
Обновить
48.37
Friflex
Мобильные приложения для миллионов пользователей🚀

Дружим RabbitMQ и Flutter/Dart

Уровень сложностиСредний
Время на прочтение9 мин
Количество просмотров251

Привет, Хабр! Я Юрий Петров, руководитель отдела мобильной разработки в Friflex и автор телеграм-канала «Мобильный разработчик».
В этой статье хотел бы поделиться с вами опытом работы с брокером сообщений RabbitMQ из Dart-кода. 

Разберу вопросы:

  1. Как установить и запустить контейнер с RabbitMQ

  2. Как настроить RabbitMQ.

  3. Как создать Producer (отправителя) на Flutter.

  4. Как создать Consumer (потребителя) на Flutter.

Давайте представим, что вы пишете маленький сервис на Dart, задача которого — отправить миллион сообщений. Не важно, каким способом — с помощью пушей или любого другого сервиса доставки. Но есть условие, что сообщения должны отправляться не все сразу, а по некоторому алгоритму.  Например, ровно в 12 дня или, если у пользователя ночь, то только в дневное время. На самом деле не важен алгоритм, главное — понять, что у нас есть некая задержка отправки сообщений.

Есть простой способ это организовать. Например, мы будем использовать для пула сообщений задержку:

await Future.delayed(Duration(hours: 2));

По окончании задержки система отправит сообщения через два часа после получения. И в целом это будет работать, но это не самый лучший способ.  Просто представьте, что таких сообщений может быть миллион, или, например, задержку отправки необходимо сделать на 10 дней, пока пользователь в отпуске. Понятно, что это не самый эффективный вариант.

Для решения таких проблем программисты придумали планировщики задач. Их называют брокеры сообщений, самый подходящий для нас — это RabbitMQ, так как у него есть нативный плагин Delayed Message Exchange, который отлично подходит для выполнения задач через установленное время. 

Как работает сам брокер и что это вообще такое, я рассказывать не буду, так как это отдельная тема. Но вы можете почитать, например, здесь.

Для решения этой задачи, будем использовать следующие понятия:

Producer (Отправитель): приложение, которое будет отправлять сообщение в брокер. Сообщение должно быть доставлено потребителю не сразу, а  через пять секунд.

Broker: сервис, который получает сообщение и хранит его в очереди  до момента, когда нужно его отправить. Используем RabbitMQ.

Consumer (Потребитель): приложение, которое слушает очередь в брокере, получает готовые к доставке сообщения и отображает их.

Установка и запуск контейнера с RabbitMQ

И первое, с чего мы начнем, это запустим сервис RabbitMQ как докер-контейнер. Но сначала вы должны у себя установить Docker Desktop, перейти на сайт и выбрать необходимую сборку. Делается это очень просто, я думаю, труда у вас это не составит.

После того как установили docker в систему, проверяем в терминале командой:

docker --version

Вывод в консоль должен быть примерно такой:

Docker version 27.4.0, build bde2b89.

После того как разобрались с докером, нам необходимо запустить контейнер с RabbitMQ. И здесь у вас есть выбор. Вы можете использовать оригинальный контейнер, запустив команду в терминале, которая автоматически скачает и развернет докер-контейнер. И далее самим установить плагин Delayed Message Exchange.

docker run -d --name my-rabbit -p 5672:5672 -p 15672:15672 rabbitmq:4.0.5-management

И можно использовать готовый образ с уже установленным плагином.

docker run -d --name my-rabbit -p 5672:5672 -p 15672:15672 heidiks/rabbitmq-delayed-message-exchange:4.0.2-management

Обе эти команды запускают контейнер с RabbitMQ, используя нужный докер-образ с параметрами:

-d: Флаг -d (или --detach) указывает Docker запускать контейнер в фоновом режиме, не блокируя терминал. Контейнер работает как отдельный процесс.
--name my-rabbit: Этот параметр задает имя для контейнера.

-p 5672:5672 -p 15672:15672: Пробрасывает порты между хост-машиной и контейнером.

После успешного выполнения команды проверьте, запущен ли у вас контейнер командой:

docker ps

В терминале вы должны увидеть запущенный контейнер в колонке NAMES.

Настройка RabbitMQ

Теперь нам необходимо настроить RabbitMQ. Для это переходим на веб-интерфейс — вводим  адрес в браузере: http://localhost:15672/

Вы должны увидеть окно для ввода логина и пароля.

Веб интерфейс RabbitMQ - ввод логина и пароля

Вводим данные по умолчанию: username - guest и password - guest.

Попадаем на главное окно:

Веб интерфейс RabbitMQ - главное окно

Далее переходим во вкладку Exchanges и добавляем новый обменник.

Заполняем поля:

  •  Name: delayed_exchange.

  •  Type: выбираем x-delayed-message. Если в списке нет x-delayed-message, значит, плагин не установлен — проверьте, что все установлено и включено.

  • Arguments: добавляем ключ: 

    • key: x-delayed-type

    • value: direct.

Веб интерфейс RabbitMQ - настройки Exchanges

Нажимаем на кнопку Add exchange. Убеждаемся, что в списке появился delayed_exchange.

Веб интерфейс RabbitMQ - проверка

Осталось настроить очередь для сообщений. Для этого переходим на вкладку Queues.

Добавляем новую очередь. Заполняем поля:

  • Name: messages_queue.

  • Durability: Durable.

Нажимаем Add queue.

Веб интерфейс RabbitMQ - настройки Queues

Отлично, у нас есть настроенный exchange и queue. Осталось их соединить.

Для этого переходим в созданную очередь.

Веб интерфейс RabbitMQ - переход на Queues

Далее на странице очереди messages_queue находим секцию Bindings.

Заполняем поля:

  • From exchange: delayed_exchange

  • Routing key: messages_queue

Нажимаем кнопку Bind.

Веб интерфейс RabbitMQ - пример связки

Таким образом, когда мы публикуем в delayed_exchange, сообщение придет в messages_queue, после того как истечет задержка x-delay.

Если вам не нравится работать в веб-интерфейсе, или вы хотели бы этот процесс автоматизировать, можно это все сделать через терминал командами:

Подключаемся к контейнеру my-rabbit:

docker exec -it my-rabbit bash

Объявляем exchange внутри контейнера:

rabbitmqadmin declare exchange \
    name=delayed_exchange \
    type=x-delayed-message \
    arguments='{"x-delayed-type":"direct"}'

Создаем очередь:

rabbitmqadmin declare queue name=messages_queue durable=true

Создаем связку:

rabbitmqadmin declare binding source=delayed_exchange destination=messages_queue routing_key="messages_queue"

Все готово, теперь нам необходимо реализовать Producer на Flutter.

Создание Producer (отправителя) на Flutter

Для реализации отправителя, создадим простой проект на Flutter с одной кнопкой, которая будет отправлять сообщение с нужной нам задержкой. Для работы с брокером будем использовать библиотеку dart_amqp

Добавляем библиотеку в проект командой:

flutter pub add dart_amqp

Создаем функцию, чтобы создать клиент для работы с RabbitMQ:

_initClient
Future<Exchange> _initClient() async {
  try {
    /// Настройки подключения
    final settings = ConnectionSettings(
      // Если у вас RabbitMQ запущен на на локальном компьютере
      // Укажите нужный IP-адрес или hostname
      host: 'localhost',
      // Порт
      port: 5672,
      // Данные для входа в RabbitMQ
      authProvider: const PlainAuthenticator('guest', 'guest'),
    );

    /// Создаем клиента для подключения к RabbitMQ
    Client client = Client(settings: settings);
    // Подключаемся
    await client.connect();
    // Создаем канал, по которому будем отправлять сообщения
    final channel = await client.channel();
    // Декларируем exchange. Если он уже создан, то повторная декларация
    // должна совпадать по параметрам.
    // Если не совпадает — будет ошибка.
    // Если exchange не существует, то он будет создан
    final exchange = await channel.exchange(
      'delayed_exchange',
      // Используем ExchangeType типа x-delayed-message
      ExchangeType.custom('x-delayed-message'),
      durable: true,
      arguments: {
        'x-delayed-type': 'direct',
      },
    );

    log('Успешно подключились к RabbitMQ');
    return exchange;
  } on Object catch (error, stackTrace) {
    log('Ошибка подключения к RabbitMQ', error: error, stackTrace: stackTrace);
    rethrow;
  }
}

Далее создаем простой виджет с одной кнопкой, которая будет отправлять сообщение в брокер. Это будет простое сообщение с датой отправки.

_sendMessage
  void _sendMessage() {
    // Публикуем сообщение. Ставим задержку 5000 мс (5 секунд) для примера
    // Через 5 секунд сообщение будет доставлено в очередь из Exchange x-delayed-message
    final int delayMs = 5000;
    // Обязательно передаем задержку в хедере
    final headers = {'x-delay': delayMs};
    // Создаем сообщение которое будем отправлять, можно отправить объект
    final message =
        'Привет, я отложенное сообщение, которое было отправлено в ${DateTime.now()}';
    // Настройки сообщения
    final properties = MessageProperties()..headers = headers;

    // Ключ очереди
    final routingKey = 'messages_queue';
    // Отправляем
    exchange.publish(
      message,
      routingKey,
      properties: properties,
    );
    log('Отправили сообщение с задержкой $delayMs мс');
  }

Полный исходный код:

main.dart
import 'dart:developer';

import 'package:dart_amqp/dart_amqp.dart';
import 'package:flutter/material.dart';

void main() async {
  /// Инициализируем клиента
  final exchange = await _initClient();

  runApp(MyApp(exchange: exchange));
}

Future<Exchange> _initClient() async {
  try {
    /// Настройки подключения
    final settings = ConnectionSettings(
      // Если у вас RabbitMQ запущен на не локальном компьютере
      // укажите нужный IP-адрес или hostname
      host: 'localhost',
      // Порт
      port: 5672,
      // Данные для входа в RabbitMQ
      authProvider: const PlainAuthenticator('guest', 'guest'),
    );

    /// Создаем клиента для подключения к RabbitMQ
    Client client = Client(settings: settings);
    // Подключаемся
    await client.connect();
    // Создаем канал, по которому будем отправлять сообщения
    final channel = await client.channel();
    // Декларируем exchange. Если он уже создан, то повторная декларация
    // должна совпадать по параметрам.
    // Если не совпадает — будет ошибка.
    // Если exchange не существует, то он будет создан
    final exchange = await channel.exchange(
      'delayed_exchange',
      // Используем ExchangeType типа x-delayed-message
      ExchangeType.custom('x-delayed-message'),
      durable: true,
      arguments: {
        'x-delayed-type': 'direct',
      },
    );

    log('Успешно подключились к RabbitMQ');
    return exchange;
  } on Object catch (error, stackTrace) {
    log(
      'Ошибка подключения к RabbitMQ',
      error: error,
      stackTrace: stackTrace,
    );
    rethrow;
  }
}

class MyApp extends StatelessWidget {
  // Exchange для публикаций
  final Exchange exchange;

  const MyApp({super.key, required this.exchange});

  @override
  Widget build(BuildContext context) {
    return MaterialApp(
      home: Scaffold(
        appBar: AppBar(title: const Text('Flutter Producer Пример')),
        body: Center(
          child: ElevatedButton(
            onPressed: _sendMessage,
            child: Text('Отправить'),
          ),
        ),
      ),
    );
  }

  void _sendMessage() {
    // Публикуем сообщение. Ставим задержку 5000 мс (5 секунд) для примера
    // Через 5 секунд сообщение будет доставлено в очередь из Exchange x-delayed-message
    final int delayMs = 5000;
    // Обязательно передаем задержку в хедере
    final headers = {'x-delay': delayMs};
    // Создаем сообщение которое будем отправлять, можно отправить объект
    final message =
        'Привет, я отложенное сообщение, которое было отправлено в ${DateTime.now()}';
    // Настройки сообщения
    final properties = MessageProperties()..headers = headers;

    // Ключ очереди
    final routingKey = 'messages_queue';
    // Отправляем
    exchange.publish(
      message,
      routingKey,
      properties: properties,
    );
    log('Отправили сообщение с задержкой $delayMs мс');
  }
}

После запуска приложения отправляем сообщение в брокер.

Producer - внешний вид

После отправки переходим в web-интерфейс RabbitMQ, далее в Exchange: delayed_exchange. На графике вы должны увидеть, что сообщение было добавлено в exchange. И если перейти в очередь, можно увидеть, что данные ушли и в очередь.

Веб интерфейс RabbitMQ - графики delayed_exchange и queues

Отлично, теперь можно написать потребителя.

Создание Consumer (потребителя) на Flutter

Как и в случае с отправителем, создаем новое Flutter-приложение, и добавляем библиотеку dart_amqp. Создаем метод для создания потребителя с тегом 'my_consumer'.

_initConsumer()
Future<Consumer> _initConsumer() async {
  // Инициализируем Client
  final client = Client(
    settings: ConnectionSettings(
      // Так как мы тестируем, используем локальный хост
      host: 'localhost',
      port: 5672,
      authProvider: const PlainAuthenticator('guest', 'guest'),
    ),
  );
  // Создаем клиент
  final channel = await client.channel();

  // Создаем очередь
  final queue = await channel.queue(
    'messages_queue',
    durable: true,
  );
  // Создаем потребителя, с тегом my_consumer
  final consumer = await queue.consume(
    consumerTag: 'my_consumer',
  );
  return consumer;
}

Далее создаем простой StatefulWidget с текстовым полем по центру, и в методе initState() создаем подписку на поток из RabbitMQ:

main.dart
import 'dart:async';

import 'package:dart_amqp/dart_amqp.dart';
import 'package:flutter/material.dart';

void main() async {
  // Инициализируем Consumer
  final consumer = await _initConsumer();

  runApp(MyApp(consumer: consumer));
}

Future<Consumer> _initConsumer() async {
  // Инициализируем Client
  final client = Client(
    settings: ConnectionSettings(
      // Так как мы тестируем, используем локальный хост
      host: 'localhost',
      port: 5672,
      authProvider: const PlainAuthenticator('guest', 'guest'),
    ),
  );
  // Создаем клиент
  final channel = await client.channel();

  // Создаем очередь
  final queue = await channel.queue(
    'messages_queue',
    durable: true,
  );
  // Создаем потребителя, с тегом my_consumer
  final consumer = await queue.consume(
    consumerTag: 'my_consumer',
  );
  return consumer;
}

class MyApp extends StatefulWidget {
  const MyApp({super.key, required this.consumer});
  final Consumer consumer;

  @override
  State<MyApp> createState() => _MyAppState();
}

class _MyAppState extends State<MyApp> {
  String message = 'Нет данных';
  late final StreamSubscription streamSubscription;

  @override
  initState() {
    super.initState();
    // Подписываемся на получение сообщений
    streamSubscription = widget.consumer.listen((AmqpMessage amqpMessage) {
      message =
          'Полученное сообщение: ${amqpMessage.payloadAsString} \nВремя получения: ${DateTime.now().toIso8601String()}';

      setState(() {});
    });
  }

  @override
  void dispose() {
    streamSubscription.cancel();
    super.dispose();
  }

  @override
  Widget build(BuildContext context) {
    return MaterialApp(
      home: Scaffold(
        appBar: AppBar(
          title: const Text(
            'Flutter Consumer Пример',
          ),
        ),
        body: Center(child: Text(message)),
      ),
    );
  }
}

Запускаем потребителя, и если подключение прошло успешно, можно перейти в очередь и убедиться, что мы подключены как потребители.

Веб интерфейс RabbitMQ - подключенные потребители

Нажимаем на кнопку «Отправить» на отправителе.

Пример работы отправителя и получателя.

Обратите внимание, что сообщение было отправлено в 18:42:50, а потребитель получил его в 18:42:55. То есть ровно через 5 секунд, так как мы и планировали.

Вот в целом и все, чем я хотел с вами поделиться. 

Полный исходный код потребителя и отправителя можно посмотреть здесь.

А как бы вы решили такую задачу? Жду ваших комментариев.

Теги:
Хабы:
0
Комментарии0

Публикации

Информация

Сайт
friflex.com
Дата регистрации
Дата основания
Численность
101–200 человек
Местоположение
Россия
Представитель
Friflex_dev

Истории