Работа с очередями сообщений — важная часть современных систем обработки данных. В нашей команде мы используем брокер сообщений RabbitMQ, но нам пришлось столкнуться с проблемами при обработке большого объема данных. В поисках решений я начал изучать различные способы оптимизации, и таким образом познакомился с RabbitMQ Streams – плагином, добавляющим log-based потоки, работающие по аналогии с Kafka

Я потратил некоторые время, вникая в принципы работы RabbitMQ Streams с .NET и хочу представить вам краткий обзор, который призван упростить погружение в эту систему


Статья будет разбита по вопросам, ответы на которые я последовательно искал, разбираясь с RabbitMQ Streams. В основном вся информация взята прямиком с официальной документации, где в общем-то всё подробно расписано. Поэтому если возникнет желание углубиться – везде будут ссылки 😊

❓ Что это и зачем?

Если вы знакомы с современными брокерами, то скорее всего знаете, что сейчас распространены два разных подхода к обработке сообщений: очередь и поток. Для наглядности в дальнейшем буду ссылаться на ярких представителей каждого из этих подходов: RabbitMQ, собственной персоной, и Apache Kafka соответственно

RabbitMQ

Apache Kafka

Использует очереди - сообщения хранятся в оперативной памяти

Оптимизирован под быстрое «очищение» очереди

Использует потоки - сообщения пишутся на диск

Оптимизирован под большие объемы данных

Каждый из подходов имеет свои слабые и сильные стороны. Так, слабая сторона очередей в RabbitMQ в том, что они плохо справляются с огромными объемами данных:

  • Низкая пропускная способность в сравнении с потоковыми системами

  • Низкая производительность при больших очередях

  • Низкая производительность при большом количестве очередей в «fanout» обменнике (exchange)

Представьте, что сидите вы спокойно на RabbitMQ с 10к сообщний в секунду и вас всё устраивает. Но по одной очереди внезапно начинает приходить 100к сообщений: CPU грузится на 100%, сообщения перестают долетать, дата-центр сгорает дотла и его прах увозят инопланетяне на своем НЛО

Собственно, примерно с этим мы и столкнулись

Что же делать? Внедрять Kafka? К счастью, пересаживаться на другой брокер оказалось необязательно, ведь существует RabbitMQ Streams – плагин, который добавляет в RabbitMQ потоки (streams). Потоки работают примерно так же, как и в Kafka: пишутся прямиком в лог на диске и предоставляются по запросу клиента с любого места

⚠️Небольшой дисклеймер⚠️

Если верить следующему бенчмарку, то RabbitMQ Streams полноценно не заменяет Кафку, зато действительно увеличивает пропускную способность брокера в 3-4 раза: https://www.youtube.com/watch?v=UPkOsXKG4ns


⚙️ Как это работает?

Ссылка на полный материал: https://www.rabbitmq.com/docs/streams#usage

С RabbitMQ Streams можно работать как через отдельный протокол, доступный по порту 5552, так и через привычный AMQP-протокол (по порту 5672), но с некоторыми ограничениями, о которых речь пойдет ниже. Для начала работы достаточно выполнить команду в терминале системы, где установлен брокер. Эта команда включает плагин потоков:

# docker run -it --rm -p 5552:5552 -p 5672:5672 -p 15672:15672 rabbitmq:3-management
rabbitmq-plugins enable rabbitmq_stream
Запуск плагина rabbitmq_stream в терминале
Запуск плагина rabbitmq_stream в терминале

Поток остается AMQP-совместимой очередью (queue). И в принципе вы можете создать его так же, как и обычную очередь, лишь указав атрибут «x-queue-type»

await channel.QueueDeclareAsync(
    "my-stream",
    durable: true,
    exclusive: false,
    autoDelete: false,
    arguments: new Dictionary<string, object> { { "x-queue-type", "stream" } }
);

Небольшие нюансы кроются в создании подписчиков (consumers), так как во-первых, для них необходимо указать prefetch, а во-вторых, в них необходимо учитывать возможность считывать поток с любой точки

// указываем prefetch
await channel.BasicQosAsync(0, prefetchCount: 100, false);
 
var consumer = new AsyncEventingBasicConsumer(channel);
consumer.ReceivedAsync += async (model, ea) =>
{
    // читаем сообщение
    var message = Encoding.UTF8.GetString(ea.Body.ToArray());

    // получаем позицию сообщения
    var offest = ea.BasicProperties.Headers?["x-stream-offset"];

    await channel.BasicAckAsync(ea.DeliveryTag, false);
};


var arguments = new Dictionary<string, object?>
{
   { "x-stream-offset", "first" } // читаем с самого первого сообщения в потоке
};
await channel.BasicConsumeAsync("my-stream", false, "my-consumer", arguments, consumer);

С точки зрения издателя (producer) абсолютно ничего не меняется. И все возможности роутинга для RabbitMQ полностью сохраняются

// создание fanout обменника
await channel.ExchangeDeclareAsync(exchange: "my-exchange", type: ExchangeType.Fanout);

// бинд очереди за созданным обменником
await channel.QueueBindAsync("my-stream", "my-exchange", "key");

// публикация сообщения в обменник
await channel.BasicPublishAsync(exchange: "my-exchange", routingKey: "key", body: "Hello World!"u8.ToArray());

⚠️Обратите внимание!⚠️

Поведение нескольких подписчиков (consumers), считывающих один поток, будет сильно отличаться от их же поведения, но при чтении обычной очереди. Проще говоря, если у вас есть очередь и у неё есть две подписки, то часть сообщений будет отправляться одному подписчику, а часть – другому. Но в случае с потоками, оба подписчика будут читать абсолютно все сообщения

К сожалению, масштабировать потоки, используя AMQP-протокол довольно проблематично. Сразу возникает несколько вопросов:

  • Что делать, если у сервиса-подписчика будет несколько реплик?

  • Что если поток будет слишком большим для одной машины?

У RabbitMQ Streams готов ответ на каждый из этих вопросов. Но об этом ниже

А пока хотелось бы отметить, что для нераспределенных систем RabbitMQ Streams делает настоящую магию. Благодаря потокам можно в разы увеличить пропускную способность очередей, отделавшись при этом минимальными доработками

📌Небольшое примечание📌

В статье я использую термины подписчик / издатель вместо consumer / producer, т.к. в русском языке это звучит плохенько 😕


📈  Как это масштабируется?

Ссылка на полный материал: https://www.rabbitmq.com/blog/2022/07/13/rabbitmq-3-11-feature-preview-super-streams

А также: https://www.rabbitmq.com/tutorials/tutorial-one-dotnet-stream

И ещё это:  https://www.rabbitmq.com/docs/stream-connections

🧩 Партиционирование

Возможно, вы уже представили, какой жуткий костыль вам придется пилить, чтобы раскидать данные по нескольким потокам… Но делать этого не придется, ибо разработчики всё предусмотрели и почти дословно перенесли из Kafka систему с топиками и партициями. Называется она Super Streams

На языке RabbitMQ

На языке Apache Kafka

Super stream

Topic

Stream

Partition

Consumers group by Reference (я не нашел нормального термина)

Group

В AMQP-семантике суперстрим (super stream) – это обменник (exchange) с некоторым количеством очередей-потоков в нём

A super stream is a structure that sits above streams, allowing to logically group a set of streams. AMQP 0.9.1 resources define its physical topology.
Super stream - это структура поверх обычных потоков, группирующая набор потоков

Для того, чтобы распределить потоки между репликами, используется специальная фича – Single Active Consumer (не путать с фичей с аналогичным названием, но для очередей). Благодаря ней, у конкретного потока может быть только один активный подписчик. Остальные подписчики становятся неактивными. Это исключает конкурентную обработку одного потока

Only one instance receives messages in a group of consumers when single active consumer is enabled.
Если включен Single Active Consumer, сообщения в группе потребителей получает только один инстанс

В итоге при запуске нескольких реплик получается следующая картина: по каждому подключению запускается сразу несколько подписчиков на разные потоки одного суперстрима. Но при этом соотношение активных и неактивных подписчиков равномерно распределено по всем подключениям. Ну и соответственно если подписчик на одном подключении активен, то на остальных он будет неактивен

Combining super stream consumers and single active consumer. There is only one active consumer on a partition at a time for a given group.
Сочетание Super Streams и Single Active Consumer. У каждого потока (партиции) только один активный подписчик

Также хочу обратить внимание на то, что по каждому суперстриму может быть несколько групп подписчиков. В RabbitMQ Streams для каждого подписчика можно указать Reference, по значению которого будет проверяться его принадлежность к той или иной группе. Это крайне важная особенность, ведь без неё не получится продублировать сообщения из суперстрима сразу в несколько сервисов (допустим, у вас есть domain-logic-service, который выполняет какую-то бизнес-логику, и logs-service, который просто собирает логи; и оба сервиса хотят получать все события из потока)

Сама логика маршрутизации сообщения в тот или иной поток суперстрима заложена в клиенте RabbitMQ.Stream.Client. Он отличается от обычного AMQP-клиента тем, что использует совершенно другой протокол (без которого невозможно работать с Single Active Consumer как минимум). Этот клиент умный и умеет вычислять хэш по переданному ему идентификатору сообщения. По этому хэшу он и распределяет сообщения между потоками

Подробнее со спецификацией протокола, используемого для взаимодействия с потоками, вы можете ознакомиться здесь: https://github.com/rabbitmq/rabbitmq-server/blob/v4.2.x/deps/rabbitmq_stream/docs/PROTOCOL.adoc#credit

И раз уж зашла речь про клиент для стримов, то предлагаю уже с ним ознакомиться

// конфигурируем подключение
var cfg = new StreamSystemConfig
{
    Password = "guest",
    UserName = "guest",
    VirtualHost = "/",
    Endpoints = new List<EndPoint>
    {
        new IPEndPoint(IPAddress.Loopback, 5552)
    }
};

var streamSystem = await StreamSystem.Create(cfg);

// создаем суперстрим на 6 партиций (стримов)
await streamSystem.CreateSuperStream(new PartitionsSuperStreamSpec("super-stream", 6));

// создаем подписчика
var superConsumer = await streamSystem.CreateSuperStreamConsumer(new RawSuperStreamConsumerConfig("super-stream")
{
    IsSingleActiveConsumer = true,
    Reference = "ConsumerGroup0",
    MessageHandler = async (stream, _, _, message) =>
        Console.WriteLine(Encoding.UTF8.GetString(message.Data.Contents)),
});

// …

// создаем издателя
var producer = await streamSystem.CreateRawSuperStreamProducer(new RawSuperStreamProducerConfig("super-stream")
{
    Routing = (message) => message.Properties.CorrelationId.ToString()
});

// пишем в суперстрим
await producer.Send(
  publishingId: 0,
  new Message(Encoding.UTF8.GetBytes($"Hello, World!"))
    { Properties = new Properties { CorrelationId = "1" } }
);
// что за магический нуль первым аргументом – поговорим чуть ниже 😊

Примерно так и осуществляется работа с суперстримами в .NET. Хотелось бы написать какой-то комментарий, но тут больше и нечего добавить – интуитивно понятное API говорит само за себя

Единственное, что хотелось бы отметить, так это то, что всё богатство роутинга в RabbitMQ, к сожалению, исчезает в режиме суперстримов. С этим ничего не сделаешь – остается только фильтровать сообщения на стороне подписчика: https://rabbitmq.github.io/rabbitmq-stream-dotnet-client/stable/htmlsingle/index.html#_filtering

🌐 Кластеризация

В работе с кластером есть некоторые нюансы. Дело в том, что обычно неважно, к какой именно ноде вы коннектитесь. Но в случае с RabbitMQ Streams это не так: клиент должен обладать информацией о всех нодах. Это приводит к некоторым проблемам, если вы используете балансировщик нагрузки. Впрочем решение этой проблемы есть, и ознакомиться с ним вы можете здесь: https://rabbitmq.github.io/rabbitmq-stream-dotnet-client/stable/htmlsingle/index.html#address-resolver

А работает кластеризация для потоков достаточно предсказуемо. При создании нового потока выбирается лидер для него. Остальные ноды будут получать реплики этого потока. Ожидается, что только издатель будет взаимодействовать с потоком-лидером, то бишь писать в него. Подписчики же будут обращаться к одной из реплик для чтения. На этом моменте и возникают проблемы с балансировщиками нагрузки, ведь им неизвестно, где находится лидер, а где – реплика

По каждому потоку выбирается лидер на одной из нод. Остальные ноды будут содержать реплики
По каждому потоку выбирается лидер н�� одной из нод. Остальные ноды будут содержать реплики

Подразумевается, что одна нода не может содержать все потоки-лидеры: они будут равномерно раскиданы по всему кластеру. И в общем-то это всё, что нужно знать для начала работы

Здесь опять же используются возможности «умного» клиента. Он, как было сказано выше, знает о существовании всех нод. А ещё он умеет запрашивать мета-информацию о кластере

На практике работа с кластером выглядит примерно так

var cfg = new StreamSystemConfig
 {
     Password = "guest",
     UserName = "guest",
     VirtualHost = "/",
     // указываем все доступные ноды
     Endpoints = new List<EndPoint>
     {
         new DnsEndPoint("rabbitmq1", 5552),
         new DnsEndPoint("rabbitmq2", 5552),
         new DnsEndPoint("rabbitmq3", 5552)
     }
 };
 var streamSystem = await StreamSystem.Create(cfg);

 
// создаем поток, применяя механизм выбора лидера
await streamSystem.CreateSuperStream(new PartitionsSuperStreamSpec("super-stream", 3)
 {
     LeaderLocator = LeaderLocator.LeastLeaders,
 });

📊 Как соблюсти консистентность?

Ссылка на полный материал: https://rabbitmq.github.io/rabbitmq-stream-dotnet-client/stable/htmlsingle/index.html#stream-client-overview

Возможно, вас сильно смутил код издателя в разделе выше, а именно в части понимания предназначенияpublishingId и correlationId. Это действительно важные идентификаторы, ведь их правильный подбор обеспечивает поддержку консистентности внутри агрегата

PublishingId – это строго возрастающий идентификатор события. По-умолчанию, он не используется. Но если вы будете использовать дедублицирующего издателя, то по его значению сервер сможет понять, было ли отправлено новое сообщение в поток, или же это случайный дубликат, который был получен из-за лага в сети

var entity = // … ;
var entityEvent = // … ;
 
// инициализация дедублицирующего производителя
var producer = await DeduplicatingProducer.Create(
    new DeduplicatingProducerConfig(streamSystem, "super-stream", "my-producer")
    {
        SuperStreamConfig = new SuperStreamConfig
        {
            Routing = (message) => message.Properties.CorrelationId.ToString()
        }
    });

// отправка сообщения
await producer.Send(
  publishingId: entityEvent.Id,
  new Message(Encoding.UTF8.GetBytes($"Hello, World!"))
    { Properties = new Properties { CorrelationId = entity.Id } }
);

CorrelationId в свою очередь отвечает за то, чтобы все события по одному агрегату отправлялись строго в один поток. Это исключит конкурентность, что гарантирует обработку всех событий строго последовательно. Вообще вместо CorrelationId можно использовать любое другое поле – главное явно указать в Routing, какое значение использовать для вычисления хэша

⚠️Обратите внимание!⚠️

Если вы пробовали запускать код, то наверное, обратили внимание, что вы не получаете дубли уже прочитанных событий, как в случае с AMQP-клиентом. Дело в том, что при указании Reference для подписчика сервер автоматически отслеживает прочитанные им сообщения


🚀 Выводы, или что там по MassTransit?

Мем, передающий впечатления от использования MassTransit
Мем, передающий впечатления от использования MassTransit

😐 Поддержка в MassTransit

В первую очередь, хотелось бы затронуть самую животрепещущую тему: что там по MassTransit? Хорошая новость – он полностью поддерживает RabbitMQ Streams. Плохая – только в контексте AMQP-протокола. То есть со Superstreams поработать не получится по крайней мере на сегодняшний день (пишу из конца 2025 года). Имеет ли смысл использовать RabbitMQ Streams без суперстримов? Для нераспределенных систем – вполне. Если намечается high-load – не думаю

// пример конфигурации стрима в MassTransit
rabbit.ReceiveEndpoint("my-stream", e =>
{
   e.PrefetchCount = 100;
   e.Stream(c =>
   {
      c.FromFirst();
   });
   e.ConfigureConsumer<TransportConsumer>(cfg);
});

😞 Поддержка комьюнити

Немного разочаровала поддержка комьюнити. По RabbitMQ Streams очень много информации в официальных источниках, но в остальном он гуглится так себе. Есть ощущение, что большая часть разработчиков даже не знает, о его существовании

🤔 Целесообразность использования

Скорее всего RabbitMQ Streams подойдет только для тех, кто уже использует RabbitMQ. Это будет отличное решение, чтобы увеличить пропускную способность, но не устраивать зоопарк из брокеров. В остальных случаях, вероятно, лучше изначально отдать предпочтение Apache Kafka (про NATS ничего не могу сказать). Полагаю, это объясняет низкую популярность в комьюнити, вкупе с отсутствием поддержки суперстримов в MassTransit

😃 Документация и легкость в использовании

Что порадовало – так это удобный клиент и понятная документация, делающие RabbitMQ Streams крайне легким в освоении. Это опять же идеально накладывается на ситуацию, когда в вашей команде уже используется RabbitMQ, но начали появляться «высоконагруженные» очереди, которые надо как-то оптимизировать. Ваша команда сможет очень быстро внедрить потоки и поправить ситуацию


На этом у меня всё. Надеюсь, изложенный материал оказался полезен или как минимум любопытен тем, кто заинтересован в использовании RabbitMQ Streams

Only registered users can participate in poll. Log in, please.
Каким брокером сообщений вы пользуетесь?
52.63%RabbitMQ10
26.32%Apache Kafka5
10.53%NATS2
10.53%Другой ( будет интересно, если поделитесь в комментариях 🧐 )2
19 users voted. 4 users abstained.