
Работа с очередями сообщений — важная часть современных систем обработки данных. В нашей команде мы используем брокер сообщений RabbitMQ, но нам пришлось столкнуться с проблемами при обработке большого объема данных. В поисках решений я начал изучать различные способы оптимизации, и таким образом познакомился с RabbitMQ Streams – плагином, добавляющим log-based потоки, работающие по аналогии с Kafka
Я потратил некоторые время, вникая в принципы работы RabbitMQ Streams с .NET и хочу представить вам краткий обзор, который призван упростить погружение в эту систему
Статья будет разбита по вопросам, ответы на которые я последовательно искал, разбираясь с RabbitMQ Streams. В основном вся информация взята прямиком с официальной документации, где в общем-то всё подробно расписано. Поэтому если возникнет желание углубиться – везде будут ссылки 😊
❓ Что это и зачем?
Если вы знакомы с современными брокерами, то скорее всего знаете, что сейчас распространены два разных подхода к обработке сообщений: очередь и поток. Для наглядности в дальнейшем буду ссылаться на ярких представителей каждого из этих подходов: RabbitMQ, собственной персоной, и Apache Kafka соответственно
RabbitMQ | Apache Kafka |
Использует очереди - сообщения хранятся в оперативной памяти Оптимизирован под быстрое «очищение» очереди | Использует потоки - сообщения пишутся на диск Оптимизирован под большие объемы данных |
Каждый из подходов имеет свои слабые и сильные стороны. Так, слабая сторона очередей в RabbitMQ в том, что они плохо справляются с огромными объемами данных:
Низкая пропускная способность в сравнении с потоковыми системами
Низкая производительность при больших очередях
Низкая производительность при большом количестве очередей в «fanout» обменнике (exchange)
Представьте, что сидите вы спокойно на RabbitMQ с 10к сообщний в секунду и вас всё устраивает. Но по одной очереди внезапно начинает приходить 100к сообщений: CPU грузится на 100%, сообщения перестают долетать, дата-центр сгорает дотла и его прах увозят инопланетяне на своем НЛО
Собственно, примерно с этим мы и столкнулись
Что же делать? Внедрять Kafka? К счастью, пересаживаться на другой брокер оказалось необязательно, ведь существует RabbitMQ Streams – плагин, который добавляет в RabbitMQ потоки (streams). Потоки работают примерно так же, как и в Kafka: пишутся прямиком в лог на диске и предоставляются по запросу клиента с любого места
⚠️Небольшой дисклеймер⚠️
Если верить следующему бенчмарку, то RabbitMQ Streams полноценно не заменяет Кафку, зато действительно увеличивает пропускную способность брокера в 3-4 раза: https://www.youtube.com/watch?v=UPkOsXKG4ns
⚙️ Как это работает?
Ссылка на полный материал: https://www.rabbitmq.com/docs/streams#usage
С RabbitMQ Streams можно работать как через отдельный протокол, доступный по порту 5552, так и через привычный AMQP-протокол (по порту 5672), но с некоторыми ограничениями, о которых речь пойдет ниже. Для начала работы достаточно выполнить команду в терминале системы, где установлен брокер. Эта команда включает плагин потоков:
# docker run -it --rm -p 5552:5552 -p 5672:5672 -p 15672:15672 rabbitmq:3-management
rabbitmq-plugins enable rabbitmq_stream
Поток остается AMQP-совместимой очередью (queue). И в принципе вы можете создать его так же, как и обычную очередь, лишь указав атрибут «x-queue-type»
await channel.QueueDeclareAsync(
"my-stream",
durable: true,
exclusive: false,
autoDelete: false,
arguments: new Dictionary<string, object> { { "x-queue-type", "stream" } }
);Небольшие нюансы кроются в создании подписчиков (consumers), так как во-первых, для них необходимо указать prefetch, а во-вторых, в них необходимо учитывать возможность считывать поток с любой точки
// указываем prefetch
await channel.BasicQosAsync(0, prefetchCount: 100, false);
var consumer = new AsyncEventingBasicConsumer(channel);
consumer.ReceivedAsync += async (model, ea) =>
{
// читаем сообщение
var message = Encoding.UTF8.GetString(ea.Body.ToArray());
// получаем позицию сообщения
var offest = ea.BasicProperties.Headers?["x-stream-offset"];
await channel.BasicAckAsync(ea.DeliveryTag, false);
};
var arguments = new Dictionary<string, object?>
{
{ "x-stream-offset", "first" } // читаем с самого первого сообщения в потоке
};
await channel.BasicConsumeAsync("my-stream", false, "my-consumer", arguments, consumer);С точки зрения издателя (producer) абсолютно ничего не меняется. И все возможности роутинга для RabbitMQ полностью сохраняются
// создание fanout обменника
await channel.ExchangeDeclareAsync(exchange: "my-exchange", type: ExchangeType.Fanout);
// бинд очереди за созданным обменником
await channel.QueueBindAsync("my-stream", "my-exchange", "key");
// публикация сообщения в обменник
await channel.BasicPublishAsync(exchange: "my-exchange", routingKey: "key", body: "Hello World!"u8.ToArray());⚠️Обратите внимание!⚠️
Поведение нескольких подписчиков (consumers), считывающих один поток, будет сильно отличаться от их же поведения, но при чтении обычной очереди. Проще говоря, если у вас есть очередь и у неё есть две подписки, то часть сообщений будет отправляться одному подписчику, а часть – другому. Но в случае с потоками, оба подписчика будут читать абсолютно все сообщения
К сожалению, масштабировать потоки, используя AMQP-протокол довольно проблематично. Сразу возникает несколько вопросов:
Что делать, если у сервиса-подписчика будет несколько реплик?
Что если поток будет слишком большим для одной машины?
У RabbitMQ Streams готов ответ на каждый из этих вопросов. Но об этом ниже
А пока хотелось бы отметить, что для нераспределенных систем RabbitMQ Streams делает настоящую магию. Благодаря потокам можно в разы увеличить пропускную способность очередей, отделавшись при этом минимальными доработками
📌Небольшое примечание📌
В статье я использую термины
подписчик / издательвместоconsumer / producer, т.к. в русском языке это звучит плохенько 😕
📈 Как это масштабируется?
Ссылка на полный материал: https://www.rabbitmq.com/blog/2022/07/13/rabbitmq-3-11-feature-preview-super-streams
А также: https://www.rabbitmq.com/tutorials/tutorial-one-dotnet-stream
И ещё это: https://www.rabbitmq.com/docs/stream-connections
🧩 Партиционирование
Возможно, вы уже представили, какой жуткий костыль вам придется пилить, чтобы раскидать данные по нескольким потокам… Но делать этого не придется, ибо разработчики всё предусмотрели и почти дословно перенесли из Kafka систему с топиками и партициями. Называется она Super Streams
На языке RabbitMQ | На языке Apache Kafka |
Super stream | Topic |
Stream | Partition |
Consumers group by Reference (я не нашел нормального термина) | Group |
В AMQP-семантике суперстрим (super stream) – это обменник (exchange) с некоторым количеством очередей-потоков в нём

Для того, чтобы распределить потоки между репликами, используется специальная фича – Single Active Consumer (не путать с фичей с аналогичным названием, но для очередей). Благодаря ней, у конкретного потока может быть только один активный подписчик. Остальные подписчики становятся неактивными. Это исключает конкурентную обработку одного потока

В итоге при запуске нескольких реплик получается следующая картина: по каждому подключению запускается сразу несколько подписчиков на разные потоки одного суперстрима. Но при этом соотношение активных и неактивных подписчиков равномерно распределено по всем подключениям. Ну и соответственно если подписчик на одном подключении активен, то на остальных он будет неактивен

Также хочу обратить внимание на то, что по каждому суперстриму может быть несколько групп подписчиков. В RabbitMQ Streams для каждого подписчика можно указать Reference, по значению которого будет проверяться его принадлежность к той или иной группе. Это крайне важная особенность, ведь без неё не получится продублировать сообщения из суперстрима сразу в несколько сервисов (допустим, у вас есть domain-logic-service, который выполняет какую-то бизнес-логику, и logs-service, который просто собирает логи; и оба сервиса хотят получать все события из потока)
Сама логика маршрутизации сообщения в тот или иной поток суперстрима заложена в клиенте RabbitMQ.Stream.Client. Он отличается от обычного AMQP-клиента тем, что использует совершенно другой протокол (без которого невозможно работать с Single Active Consumer как минимум). Этот клиент умный и умеет вычислять хэш по переданному ему идентификатору сообщения. По этому хэшу он и распределяет сообщения между потоками
Подробнее со спецификацией протокола, используемого для взаимодействия с потоками, вы можете ознакомиться здесь: https://github.com/rabbitmq/rabbitmq-server/blob/v4.2.x/deps/rabbitmq_stream/docs/PROTOCOL.adoc#credit
И раз уж зашла речь про клиент для стримов, то предлагаю уже с ним ознакомиться
// конфигурируем подключение
var cfg = new StreamSystemConfig
{
Password = "guest",
UserName = "guest",
VirtualHost = "/",
Endpoints = new List<EndPoint>
{
new IPEndPoint(IPAddress.Loopback, 5552)
}
};
var streamSystem = await StreamSystem.Create(cfg);
// создаем суперстрим на 6 партиций (стримов)
await streamSystem.CreateSuperStream(new PartitionsSuperStreamSpec("super-stream", 6));
// создаем подписчика
var superConsumer = await streamSystem.CreateSuperStreamConsumer(new RawSuperStreamConsumerConfig("super-stream")
{
IsSingleActiveConsumer = true,
Reference = "ConsumerGroup0",
MessageHandler = async (stream, _, _, message) =>
Console.WriteLine(Encoding.UTF8.GetString(message.Data.Contents)),
});
// …
// создаем издателя
var producer = await streamSystem.CreateRawSuperStreamProducer(new RawSuperStreamProducerConfig("super-stream")
{
Routing = (message) => message.Properties.CorrelationId.ToString()
});
// пишем в суперстрим
await producer.Send(
publishingId: 0,
new Message(Encoding.UTF8.GetBytes($"Hello, World!"))
{ Properties = new Properties { CorrelationId = "1" } }
);
// что за магический нуль первым аргументом – поговорим чуть ниже 😊Примерно так и осуществляется работа с суперстримами в .NET. Хотелось бы написать какой-то комментарий, но тут больше и нечего добавить – интуитивно понятное API говорит само за себя
Единственное, что хотелось бы отметить, так это то, что всё богатство роутинга в RabbitMQ, к сожалению, исчезает в режиме суперстримов. С этим ничего не сделаешь – остается только фильтровать сообщения на стороне подписчика: https://rabbitmq.github.io/rabbitmq-stream-dotnet-client/stable/htmlsingle/index.html#_filtering
🌐 Кластеризация
В работе с кластером есть некоторые нюансы. Дело в том, что обычно неважно, к какой именно ноде вы коннектитесь. Но в случае с RabbitMQ Streams это не так: клиент должен обладать информацией о всех нодах. Это приводит к некоторым проблемам, если вы используете балансировщик нагрузки. Впрочем решение этой проблемы есть, и ознакомиться с ним вы можете здесь: https://rabbitmq.github.io/rabbitmq-stream-dotnet-client/stable/htmlsingle/index.html#address-resolver
А работает кластеризация для потоков достаточно предсказуемо. При создании нового потока выбирается лидер для него. Остальные ноды будут получать реплики этого потока. Ожидается, что только издатель будет взаимодействовать с потоком-лидером, то бишь писать в него. Подписчики же будут обращаться к одной из реплик для чтения. На этом моменте и возникают проблемы с балансировщиками нагрузки, ведь им неизвестно, где находится лидер, а где – реплика

Подразумевается, что одна нода не может содержать все потоки-лидеры: они будут равномерно раскиданы по всему кластеру. И в общем-то это всё, что нужно знать для начала работы
Здесь опять же используются возможности «умного» клиента. Он, как было сказано выше, знает о существовании всех нод. А ещё он умеет запрашивать мета-информацию о кластере
На практике работа с кластером выглядит примерно так
var cfg = new StreamSystemConfig
{
Password = "guest",
UserName = "guest",
VirtualHost = "/",
// указываем все доступные ноды
Endpoints = new List<EndPoint>
{
new DnsEndPoint("rabbitmq1", 5552),
new DnsEndPoint("rabbitmq2", 5552),
new DnsEndPoint("rabbitmq3", 5552)
}
};
var streamSystem = await StreamSystem.Create(cfg);
// создаем поток, применяя механизм выбора лидера
await streamSystem.CreateSuperStream(new PartitionsSuperStreamSpec("super-stream", 3)
{
LeaderLocator = LeaderLocator.LeastLeaders,
});📊 Как соблюсти консистентность?
Ссылка на полный материал: https://rabbitmq.github.io/rabbitmq-stream-dotnet-client/stable/htmlsingle/index.html#stream-client-overview
Возможно, вас сильно смутил код издателя в разделе выше, а именно в части понимания предназначенияpublishingId и correlationId. Это действительно важные идентификаторы, ведь их правильный подбор обеспечивает поддержку консистентности внутри агрегата
PublishingId – это строго возрастающий идентификатор события. По-умолчанию, он не используется. Но если вы будете использовать дедублицирующего издателя, то по его значению сервер сможет понять, было ли отправлено новое сообщение в поток, или же это случайный дубликат, который был получен из-за лага в сети
var entity = // … ;
var entityEvent = // … ;
// инициализация дедублицирующего производителя
var producer = await DeduplicatingProducer.Create(
new DeduplicatingProducerConfig(streamSystem, "super-stream", "my-producer")
{
SuperStreamConfig = new SuperStreamConfig
{
Routing = (message) => message.Properties.CorrelationId.ToString()
}
});
// отправка сообщения
await producer.Send(
publishingId: entityEvent.Id,
new Message(Encoding.UTF8.GetBytes($"Hello, World!"))
{ Properties = new Properties { CorrelationId = entity.Id } }
);CorrelationId в свою очередь отвечает за то, чтобы все события по одному агрегату отправлялись строго в один поток. Это исключит конкурентность, что гарантирует обработку всех событий строго последовательно. Вообще вместо CorrelationId можно использовать любое другое поле – главное явно указать в Routing, какое значение использовать для вычисления хэша
⚠️Обратите внимание!⚠️
Если вы пробовали запускать код, то наверное, обратили внимание, что вы не получаете дубли уже прочитанных событий, как в случае с AMQP-клиентом. Дело в том, что при указании
Referenceдля подписчика сервер автоматически отслеживает прочитанные им сообщения
🚀 Выводы, или что там по MassTransit?

😐 Поддержка в MassTransit
В первую очередь, хотелось бы затронуть самую животрепещущую тему: что там по MassTransit? Хорошая новость – он полностью поддерживает RabbitMQ Streams. Плохая – только в контексте AMQP-протокола. То есть со Superstreams поработать не получится по крайней мере на сегодняшний день (пишу из конца 2025 года). Имеет ли смысл использовать RabbitMQ Streams без суперстримов? Для нераспределенных систем – вполне. Если намечается high-load – не думаю
// пример конфигурации стрима в MassTransit
rabbit.ReceiveEndpoint("my-stream", e =>
{
e.PrefetchCount = 100;
e.Stream(c =>
{
c.FromFirst();
});
e.ConfigureConsumer<TransportConsumer>(cfg);
});😞 Поддержка комьюнити
Немного разочаровала поддержка комьюнити. По RabbitMQ Streams очень много информации в официальных источниках, но в остальном он гуглится так себе. Есть ощущение, что большая часть разработчиков даже не знает, о его существовании
🤔 Целесообразность использования
Скорее всего RabbitMQ Streams подойдет только для тех, кто уже использует RabbitMQ. Это будет отличное решение, чтобы увеличить пропускную способность, но не устраивать зоопарк из брокеров. В остальных случаях, вероятно, лучше изначально отдать предпочтение Apache Kafka (про NATS ничего не могу сказать). Полагаю, это объясняет низкую популярность в комьюнити, вкупе с отсутствием поддержки суперстримов в MassTransit
😃 Документация и легкость в использовании
Что порадовало – так это удобный клиент и понятная документация, делающие RabbitMQ Streams крайне легким в освоении. Это опять же идеально накладывается на ситуацию, когда в вашей команде уже используется RabbitMQ, но начали появляться «высоконагруженные» очереди, которые надо как-то оптимизировать. Ваша команда сможет очень быстро внедрить потоки и поправить ситуацию
На этом у меня всё. Надеюсь, изложенный материал оказался полезен или как минимум любопытен тем, кто заинтересован в использовании RabbitMQ Streams