
Серия: redb ecosystem / redb.Route deep-dive
Очередная статья из цикла про redb.Route — наш Apache Camel под .NET. Если вы только подключились, вот предыдущие на Хабре:
redb.Route — Apache Camel для .NET, который мы написали потому что выхода другого не было — с чего всё началось;
redb.Route изнутри: четыре in‑memory канала и Exchange, который их связывает;
redb.Route 3.0.1 — плоская навигация по DSL, рефакторинг CRTP и тихий null;
Apache Camel под .NET, разбор по косточкам: HTTP‑коннектор без ASP.NET MVC + паттерн Content‑Based Router — предыдущая «EIP + коннектор».
Сегодня заходим с Kafka‑коннектора — разбираем его по косточкам, как делали с HTTP, — а потом сажаем на него два EIP‑паттерна: Scatter‑Gather и Aggregator. И главное — разбираем то, о чём в туториалах молчат: как это живёт под транзакциями. Заодно вышел 3.2.0.
Сразу спойлер
Никакого «exactly‑once из коробки» в Kafka здесь нет — и ниже по коду будет видно, почему именно. Что работает, а что нет — по факту, без округлений.
далее ниже Боевой пример: один HTTP‑запрос → шесть параллельных агрегаций
Оглавление
1. Зачем уходить от MassTransit
MassTransit — отличная штука, и я не собираюсь её хоронить. Но это opinionated‑фреймворк: durable saga как state‑machine, transactional outbox, retry‑политики — всё зашито внутрь, и ты живёшь по правилам фреймворка. Когда твой сценарий совпадает с его картиной мира — это кайф, всё работает «из коробки». Когда не совпадает — ты воюешь с абстракцией, лезешь в кишки и пишешь костыли поверх чужих решений.
Apache Camel — другая школа. В фундаменте: EIP‑примитивы + коннекторы + DSL. Тебе никто не «дарит» outbox как фичу — тебе дают Splitter, Aggregator, Scatter‑Gather, транзакционные транспорты, и ты сам собираешь ровно то, что нужно под твои инварианты. Меньше магии — больше явной сборки из кубиков.
redb.Route стоит на второй школе. Поэтому в заголовке — «идём к Apache Camel»: не «у нас всё из коробки», а «у нас кубики, и они стыкуются предсказуемо». Дальше будет видно, чем именно этот размен оплачивается и что взамен получаешь.
2. Что приехало в 3.2.0
Полный список — в CHANGELOG. Для нашей темы важны два пункта:
Параллельные
Splitter/Multicastтеперь изолируют ambient‑транзакцию по ветке. Каждая ветка получает свойDependentTransaction.DependentClone(BlockCommitUntilComplete); родительский коммит ждёт завершения всех веток. Раньше параллельные ветки шарили одинTransaction.Current, аSystem.Transactionsзапрещает конкурентное использование одной транзакции из нескольких потоков. Ниже мы разберём этот фикс по коду и поймём, почему Scatter‑Gather в нём не нуждался.Throttleполучил RFC 6585‑режим.RejectOnOverflow()(429 +Retry‑After). К нашей теме напрямую не относится, но если строите HTTP‑фасад перед всем этим — пригодится.
Все пакеты redb.Route.* версионируются вместе: 3.2.0.
3. Kafka‑коннектор: анатомия URI и фабрика подключений
Начнём, как просили, с Kafka. И начнём с самого простого, что есть — строкового URI эндпоинта. Вот минимальный консьюмер и минимальный продюсер:
// Консьюмер: читаем топик orders, группа order-workers, с самого начала From("kafka:orders?brokers=kafka:9092&groupId=order-workers&autoOffsetReset=earliest") .Log("Получили: ${body}") .To("direct:process"); // Продюсер: публикуем в топик notifications с acks=all From("direct:notify") .To("kafka:notifications?brokers=kafka:9092&acks=All");
Схема URI — kafka:<topic>?<query>. Имя топика берётся из пути (KafkaEndpoint):
public KafkaEndpoint(EndpointUri uri, KafkaComponent component, KafkaEndpointOptions options) : base(uri, component, options) { TopicName = uri.Path; // ← топик = путь URI // Если в URI указан connectionFactory=<имя> — резолвим фабрику из реестра if (!string.IsNullOrEmpty(options.ConnectionFactory) && component.Context is not null) ResolvedFactory = component.Context.GetFromRegistry<KafkaConnectionFactory>(options.ConnectionFactory); // brokers в URI нет, но фабрика их знает — подставляем if (string.IsNullOrWhiteSpace(options.Brokers) && ResolvedFactory is not null) options.Brokers = ResolvedFactory.Brokers; }
Обратите внимание на последний штрих: на консьюмере groupId обязателен, и если его забыть — эндпоинт не даст создать консьюмер:
public override IConsumer CreateConsumer(IProcessor processor) { if (string.IsNullOrWhiteSpace(Options.GroupId)) throw new InvalidOperationException( $"The 'groupId' parameter is required for Kafka consumer on topic '{TopicName}'."); return new KafkaConsumer(this, processor, Options); }
Фабрика подключений — чтобы не дублировать настройки
Когда у тебя двадцать Kafka‑эндпоинтов на один кластер, расписывать brokers=...&securityProtocol=...&saslMechanism=... в каждом URI — самоубийство. Для этого есть KafkaConnectionFactory: регистрируешь её в реестре под именем и ссылаешься через connectionFactory=имя.
Полный её параметр‑сет (всё из исходника, ничего не выдумано):
Группа | Свойства | Дефолт |
|---|---|---|
Брокеры |
|
|
Безопасность |
|
|
SSL/TLS |
| — |
Продюсер |
|
|
Консьюмер |
| —, |
Тюнинг консьюмера |
| — |
Тюнинг продюсера |
| — |
Reconnect |
| — |
Advanced |
|
|
Регистрация и использование:
context.AddToRegistry("prod-cluster", new KafkaConnectionFactory { Brokers = "kafka1:9092,kafka2:9092,kafka3:9092", SecurityProtocol = "SaslSsl", SaslMechanism = "ScramSha512", SaslUsername = "svc-orders", SaslPassword = secret, CompressionType = "Zstd", MaxInFlight = 5, }); // Теперь в URI — только то, что специфично для эндпоинта: From("kafka:orders?connectionFactory=prod-cluster&groupId=order-workers"); To("kafka:notifications?connectionFactory=prod-cluster&acks=All");
KafkaConnectionFactory умеет собрать три разных конфига — BuildConsumerConfig(), BuildProducerConfig(), BuildAdminConfig() (последний используется для запроса метаданных топика). Эндпоинт‑уровневые опции применяются поверх фабрики (if (!string.IsNullOrWhiteSpace(Brokers)) config.BootstrapServers = Brokers; и т.д.) — то есть фабрика задаёт базу, URI её точечно перекрывает.
Все опции эндпоинта
Полный список — KafkaEndpointOptions. Группами, с дефолтами:
Подключение / безопасность: brokers (обязателен, иначе Validate() бросит), securityProtocol (Plaintext/Ssl/SaslPlaintext/SaslSsl), saslMechanism/saslUsername/saslPassword, sslCaLocation/sslCertificateLocation/sslKeyLocation/sslKeyPassword, connectionFactory.
Консьюмер: groupId, autoOffsetReset (Latest/Earliest/Error, дефолт Latest), enableAutoCommit (дефолт true, framework-level — с 3.2.1), maxPollRecords (0 = одиночный режим, >0 = батч), pollTimeoutMs (1000), breakOnFirstError, seekTo (beginning/end), topicIsPattern, groupInstanceId, sessionTimeoutMs, heartbeatIntervalMs, maxPollIntervalMs, partitionAssignmentStrategy (Range/RoundRobin/CooperativeSticky), isolationLevel (ReadUncommitted/ReadCommitted).
Продюсер: acks (None/Leader/All, дефолт Leader), retries (3), recordMetadata, key, partitionNumber, transacted, transactionIdPrefix (redb-kafka).
Тюнинг продюсера: lingerMs, batchSize, compressionType (None/Gzip/Snappy/Lz4/Zstd), messageTimeoutMs.
Advanced: additionalProperties — произвольные librdkafka‑свойства, применяются последними и перекрывают всё типизированное.
Validate() проверяет ровно три вещи (по коду): brokers непустой, maxPollRecords >= 0, pollTimeoutMs >= 0, retries >= 0. Всё остальное либо имеет дефолт, либо опционально.
Один нюанс — про коммит оффсета, и он важный. На уровне librdkafka EnableAutoCommit прошит в false всегда — библиотека сама, по таймеру, ничего не коммитит (иначе закоммитила бы непрочитанное). Из BuildConsumerConfig:
var cfg = new ConsumerConfig { BootstrapServers = Brokers, GroupId = GroupId, AutoOffsetReset = ParseAutoOffsetReset(), EnableAutoCommit = false, // librdkafka-уровень: ручной коммит всегда };
Но коммит, конечно, есть — поверх, на уровне фреймворка. С 3.2.1 у консьюмера появилась типизированная опция EnableAutoCommit (дефолт true): после успешного Process консьюмер коммитит оффсет инлайн — ровно как RabbitMQ‑консьюмер ацкает после обработки. Ставится из URI (?enableAutoCommit=false) или флентом (.EnableAutoCommit(false)). Транзакционный маршрут имеет приоритет: если оффсет уже закоммитила .Transacted(), инлайн‑ветка пропускается. Механику разберём в §5.
То же самое флентом
Строковые URI хороши для коротких примеров и для конфигов. В коде эндпоинты обычно строят флентом — Kafka.Topic(...):
From(Kafka.Topic("orders") .Brokers("kafka1:9092,kafka2:9092") .GroupId("order-workers") .AutoOffsetReset("Earliest") .MaxPollRecords(500) .PartitionAssignmentStrategy("CooperativeSticky") .IsolationLevel("ReadCommitted") .SessionTimeout(30_000) .MaxPollInterval(300_000)); To(Kafka.Topic("notifications") .Brokers("kafka1:9092") .Acks("All") .Compression("zstd") .Linger(20) .BatchSize(64 * 1024) .Key("${header.orderId}")); // ключ партиционирования — выражение
Флент‑билдер просто собирает ту же query‑строку (Kafka.Topic("orders").Brokers(...)...Build() → "kafka:orders?brokers=...&..."), с URL‑энкодингом значений. Сеттеры принимают IExpression, но резолвятся опции в разные моменты: ключ продюсера вычисляется ${...}‑выражением на каждое сообщение (в DetermineKey на момент отправки — см. ниже), а connection‑level опции (brokers, креды) разбираются один раз на коннекте, где exchange ещё нет, так что выражение в них смысла не имеет. Per‑message‑история — это именно про ключ партиционирования.
4. Kafka‑продюсер: что происходит на .To("kafka:...")
Теперь разберём, что реально делает KafkaProducer, когда сообщение доходит до .To("kafka:..."). Это ConnectableProducer — то есть требует Start() перед использованием (EnsureStarted() в начале Process).
Коннект
protected override Task ConnectAsync(CancellationToken ct) { var config = _options.BuildProducerConfig(_endpoint.ResolvedFactory); _producer = new ProducerBuilder<string, byte[]>(config) .SetValueSerializer(Serializers.ByteArray) .SetErrorHandler((_, e) => Logger?.LogError("Kafka producer error: {Reason} (Code: {Code})", e.Reason, e.Code)) .Build(); if (_options.Transacted) { _producer.InitTransactions(TimeSpan.FromSeconds(30)); Logger?.LogInformation("Kafka transactional mode enabled: topic={Topic}", _endpoint.TopicName); } return Task.CompletedTask; }
Ключ — string, значение — byte[]. На дисконнекте — Flush(30s) перед Dispose, чтобы не потерять буферизованные сообщения.
Подготовка сообщения
PrepareMessage превращает тело exchange в Message<string, byte[]>:
byte[] valueBytes = body switch { byte[] bytes => bytes, // уже байты — как есть string str => Encoding.UTF8.GetBytes(str), // строка → UTF-8 null => Array.Empty<byte>(), // null → пустой массив _ => Encoding.UTF8.GetBytes(body.ToString() ?? string.Empty) // прочее → ToString → UTF-8 };
Дальше — заголовки. ContentType пробрасывается отдельным заголовком content-type, а пользовательские заголовки exchange переезжают в Kafka‑заголовки — кроме внутренних redbKafka.*, которые отфильтровываются:
foreach (var (key, value) in exchange.In.Headers) { if (KafkaHeaders.IsRedbHeader(key)) // redbKafka.* — внутренние, не пробрасываем continue; msg.Headers.Add(key, Encoding.UTF8.GetBytes(value?.ToString() ?? string.Empty)); }
Ключ партиционирования
DetermineKey — три ветки, в порядке приоритета:
private string? DetermineKey(IExchange exchange) { // 1. Явная партиция → ключ не нужен (партиционер обходится стороной) if (_options.PartitionNumber.HasValue) return null; // 2. Ключ из опции — поддерживает ${...}-выражения, резолвится на каждое сообщение if (!string.IsNullOrWhiteSpace(_options.Key)) { var resolved = _options.ResolveOption(_options.Key, exchange); if (!string.IsNullOrEmpty(resolved)) return resolved; } // 3. Иначе — без ключа (round-robin по партициям) return null; }
То есть key=${header.orderId} означает «партиционируй по orderId» — все события одного заказа лягут в одну партицию и сохранят порядок.
Отправка: немедленная vs отложенная
Вот развилка, которая важна для транзакций:
if (_options.Transacted) { // Отложенная отправка — реальный publish случится на коммите маршрутной транзакции var action = new KafkaSendAction(_producer, _endpoint.TopicName, message, _options.PartitionNumber, _options.RecordMetadata, exchange, Logger); RegisterTransactedAction(exchange, $"kafka-send-{Guid.NewGuid():N}", action); } else { // Немедленная отправка var result = _options.PartitionNumber.HasValue ? await _producer.ProduceAsync(new TopicPartition(_endpoint.TopicName, new Partition(_options.PartitionNumber.Value)), message, ct) : await _producer.ProduceAsync(_endpoint.TopicName, message, ct); if (_options.RecordMetadata) AddDeliveryMetadata(exchange, result); // redbKafka.Sent.Topic/Partition/Offset/Timestamp }
В транзакционном режиме продюсер не отправляет сразу — он кладёт KafkaSendAction в словарь отложенных действий с уникальным ключом kafka-send-{guid}. Реальный ProduceAsync произойдёт позже, на границе .Transacted(). К этому вернёмся.
KafkaSendAction при создании клонирует сообщение (глубокая копия заголовков через Buffer.BlockCopy), чтобы отложенная отправка не зависела от того, что случится с exchange дальше:
public async Task Commit(CancellationToken ct = default) { var result = _partition.HasValue ? await _producer.ProduceAsync(new TopicPartition(_topicName, new Partition(_partition.Value)), _message, ct) : await _producer.ProduceAsync(_topicName, _message, ct); if (_recordMetadata) { /* пишем Sent.* заголовки */ } } public Task Rollback(CancellationToken ct = default) { // Сообщение просто выбрасывается — ничего не публиковали return Task.CompletedTask; }
Трейсинг сквозь брокер
Перед отправкой продюсер инжектит W3C trace context (traceparent/tracestate) в Kafka‑заголовки через стандартный DistributedContextPropagator:
var propagator = DistributedContextPropagator.Current; propagator.Inject(activity, headers, static (carrier, key, value) => { if (carrier is Headers h && !string.IsNullOrEmpty(value)) { h.Remove(key); h.Add(key, Encoding.UTF8.GetBytes(value)); } });
Консьюмер на другом конце это поднимет — и в трейсе вы увидите непрерывную цепочку через брокер. Без единой строчки настройки с вашей стороны.
5. Kafka‑консьюмер: поллинг, батчи, ребаланс
KafkaConsumer — это DrainableConsumer (умеет gracefully дослить in‑flight сообщения при остановке). Разберём по шагам.
Старт: подписка/назначение, seek, метаданные
protected override Task OnStarting(CancellationToken ct) { var config = _options.BuildConsumerConfig(_endpoint.ResolvedFactory); _consumer = new ConsumerBuilder<string, byte[]>(config) .SetValueDeserializer(Deserializers.ByteArray) .SetErrorHandler((_, e) => { /* fatal → LogError, иначе LogWarning */ }) .SetPartitionsAssignedHandler((c, partitions) => { /* лог assigned */ }) .SetPartitionsRevokedHandler((c, partitions) => { // Перед отзывом партиций — коммитим текущие оффсеты try { c.Commit(partitions); } catch (KafkaException) { /* нечего коммитить — ок */ } }) .SetPartitionsLostHandler((_, partitions) => { /* лог involuntary loss */ }) .Build(); if (_options.PartitionNumber.HasValue) _consumer.Assign(new[] { new TopicPartition(_endpoint.TopicName, new Partition(_options.PartitionNumber.Value)) }); else _consumer.Subscribe(_endpoint.TopicName); HandleSeekTo(); // seekTo=beginning/end на первом старте LogTopicMetadata(); // дамп: партиции, лидеры, реплики, ISR return Task.CompletedTask; }
Два режима членства: либо Subscribe(topic) (динамическое назначение партиций группой + ребаланс), либо явный Assign(partition) если задан partitionNumber (статическое назначение, без группового ребаланса).
LogTopicMetadata при старте поднимает AdminClient и логирует топологию топика — сколько партиций, кто лидер каждой, реплики, ISR. Удобно для дебага «почему меня назначило только на 2 из 6 партиций».
Поллинг‑цикл
RunAsync запускает long‑running таску с поллинг‑циклом:
private async Task PollLoop(CancellationToken pollCt, CancellationToken processingCt) { while (!pollCt.IsCancellationRequested) { try { if (_options.MaxPollRecords > 0) await ProcessBatch(pollCt, processingCt); // батч-режим else await ProcessSingleMessage(pollCt, processingCt); // по одному } catch (OperationCanceledException) { break; } catch (Exception ex) { Logger?.LogError(ex, "Error in Kafka poll loop for topic {Topic}", _endpoint.TopicName); try { await Task.Delay(1000, pollCt); } catch (OperationCanceledException) { break; } } } }
Поймали исключение в цикле — залогировали, подождали секунду, продолжили (не убиваем консьюмер из‑за одного сбойного сообщения).
Одиночный режим
private async Task ProcessSingleMessage(CancellationToken pollCt, CancellationToken processingCt) { var result = _consumer!.Consume(pollCt); if (result?.Message is null) return; var exchange = CreateExchange(result); IncrementInflight(); try { // Регистрируем коммит оффсета как отложенное действие — на случай транзакционного маршрута. var commitAction = new KafkaCommitAction(_consumer, result, Logger); RegisterTransactedAction(exchange, $"kafka-commit-{result.Offset.Value}", commitAction); await Processor.Process(exchange, processingCt); ProcessedCount++; // 3.2.1: авто-коммит после успеха — если оффсет не закоммитила транзакция (флаг Committed). if (_options.EnableAutoCommit && !commitAction.Committed) await commitAction.Commit(processingCt); } finally { await exchange.DisposeAsync(); DecrementInflight(); } }
Важная деталь: консьюмер регистрирует KafkaCommitAction до обработки. То есть оффсет закоммитится не сразу, а на коммите маршрутной транзакции — вместе с любыми отложенными отправками. Если обработка упадёт — оффсет не коммитнется, сообщение приедет снова.
KafkaCommitAction.Commit — это обычный _consumer.Commit(result):
public Task Commit(CancellationToken ct = default) { _consumer.Commit(_result); // ← обычный коммит оффсета, НЕ SendOffsetsToTransaction return Task.CompletedTask; } public Task Rollback(CancellationToken ct = default) => Task.CompletedTask; // не коммитим — сообщение перечитается
Батч‑режим
При maxPollRecords > 0 консьюмер собирает пачку до maxPollRecords штук или до дедлайна pollTimeoutMs, и отдаёт их одним exchange, в теле которого — List<IMessage>:
while (batch.Count < _options.MaxPollRecords && DateTime.UtcNow < deadline && !pollCt.IsCancellationRequested) { var result = _consumer!.Consume(TimeSpan.FromMilliseconds(100)); if (result?.Message is not null) batch.Add(result); } if (batch.Count == 0) return; var exchange = CreateBatchExchange(batch); // Body = List<IMessage>, redbKafka.BatchSize = N // Коммитим только последний оффсет батча var last = batch[^1]; RegisterTransactedAction(exchange, $"kafka-batch-commit-{last.Offset.Value}", new KafkaCommitAction(_consumer!, last, Logger)); await Processor.Process(exchange, processingCt);
Тонкость: коммитится только последний оффсет батча (Kafka‑оффсеты монотонны в партиции, коммит последнего покрывает всю пачку). Дальше в маршруте можно сделать .Split(body => body) и обработать каждое сообщение по отдельности — но коммит всё равно один на батч.
Как exchange наполняется метаданными
CreateExchange кладёт тело (byte[]), переносит Kafka‑заголовки (декодируя как UTF‑8), восстанавливает ContentType, и проставляет метаданные:
message.Headers[KafkaHeaders.Topic] = result.Topic; message.Headers[KafkaHeaders.Partition] = result.Partition.Value; message.Headers[KafkaHeaders.Offset] = result.Offset.Value; if (result.Message.Timestamp.Type != TimestampType.NotAvailable) message.Headers[KafkaHeaders.Timestamp] = result.Message.Timestamp.UtcDateTime; if (!string.IsNullOrEmpty(result.Message.Key)) message.Headers[KafkaHeaders.Key] = result.Message.Key; var exchange = Exchange.Create(message, _endpoint.ScopeFactory); exchange.Pattern = ExchangePattern.InOnly; // консьюмер — InOnly
Что делает консьюмер, пока сообщение идёт по маршруту
Разберём по шагам ProcessSingleMessage, потому что это объясняет и про коммит:
Consume(pollCt)— забрал одно сообщение (в одиночном режиме поллинг блокирующий: следующее не заберётся, пока текущее не отработает);CreateExchange— собрал exchange;IncrementInflight()— пометил сообщение как in‑flight (это нужноDrainableConsumerдля graceful‑остановки: при стопе он дождётся всех in‑flight);зарегистрировал
KafkaCommitActionвTRANSACT_ACTION— но пока не закоммитил;await Processor.Process(exchange)— прогнал сообщение через весь маршрут (синхронно ждёт завершения);ProcessedCount++;если
EnableAutoCommit(дефолтtrue) и оффсет ещё не закоммитила транзакция — коммитит инлайн прямо здесь;finally:exchange.DisposeAsync()+DecrementInflight().
Ключевой момент: по умолчанию (EnableAutoCommit=true, с 3.2.1) консьюмер коммитит оффсет инлайн после успешного Process — at‑least‑once, как и RabbitMQ. Регистрация KafkaCommitAction в TRANSACT_ACTION нужна для другого случая — когда оффсет должна закоммитить транзакция: тогда .Transacted() коммитит его на своей границе, а флаг Committed на действии не даёт консьюмеру закоммитить второй раз инлайн.
В батч‑режиме то же самое, только Process получает один exchange с List<IMessage> в теле, а коммитится (инлайн или транзакцией) последний оффсет батча.
Как коммитится оффсет: дефолт и транзакционный режим
С 3.2.1 поведение по умолчанию простое и совпадает с кроликом: обработал — закоммитил.
// ✅ Дефолт (EnableAutoCommit=true): оффсет коммитится инлайн после успешного Process. // Голый консьюмер настраивать не нужно — at-least-once из коробки. From("kafka:orders?brokers=kafka:9092&groupId=w") .To("direct:process");
Транзакционный режим нужен, когда оффсет надо закоммитить вместе с другой работой (отложенные отправки в Kafka/Redis, redb‑запись) — атомарно на границе маршрута. Тогда выключаешь авто‑коммит и оборачиваешь в транзакцию: коммит оффсета становится частью пачки TRANSACT_ACTION.
// ✅ Транзакционно: оффсет + отложенные отправки коммитятся вместе на границе .Transacted(). // enableAutoCommit=false → инлайн-коммита нет, оффсет коммитит транзакция. From("kafka:orders?brokers=kafka:9092&groupId=w&enableAutoCommit=false") .Transacted() .To("kafka:orders.done?brokers=kafka:9092&transacted=true") .EndTransaction(); // ✅ Императивно — то же самое через явный commit. From("kafka:orders?brokers=kafka:9092&groupId=w&enableAutoCommit=false") .BeginTransaction() .To("direct:process") .CommitTransaction();
Если оставить enableAutoCommit=true (дефолт) и при этом обернуть в .Transacted() — транзакция имеет приоритет: она коммитит оффсет на границе, а флаг Committed на KafkaCommitAction не даёт консьюмеру продублировать коммит инлайн. То есть в транзакционном маршруте опция де‑факто игнорится.
Кстати, в демках redb.Route.Demo/Routes Kafka используется только как продюсер (
.WireTap(KafkaWireTap)на топикdemo-auditвMainPipelineRoutes.cs), консьюмера там нет — поэтому боевой паттерн consume‑process‑produce смотри в разделе про транзакции.
6. Заголовки и трейсинг сквозь брокер
Полный справочник заголовков — KafkaHeaders. Префикс у всех — redbKafka.:
Заголовок | Кто ставит | Что значит |
|---|---|---|
| консьюмер | топик прочитанного сообщения |
| консьюмер | партиция |
| консьюмер | оффсет в партиции |
| консьюмер | таймстемп записи |
| консьюмер | ключ (если был) |
| консьюмер | размер батча (батч‑режим) |
| продюсер | топик, куда отправили (при |
| продюсер | партиция назначения |
| продюсер | присвоенный оффсет |
| продюсер | время отправки |
В маршруте к ним обращаешься как ${header.redbKafka.Offset}:
From("kafka:orders?brokers=kafka:9092&groupId=w") .Log("offset=${header.redbKafka.Offset} partition=${header.redbKafka.Partition} key=${header.redbKafka.Key}");
Эти redbKafka.* при ре‑публикации не пробрасываются в исходящее Kafka‑сообщение (фильтр IsRedbHeader в продюсере) — чтобы метаданные одного хопа не утекали в следующий.
7. Честно про transacted=true
Вот место, где маркетинг любит сказать «exactly‑once». Открываем код.
Что делает transacted=true (из BuildProducerConfig):
if (Transacted) { config.EnableIdempotence = true; config.Acks = Acks.All; // обязательно для идемпотентного продюсера }
И всё: transactional.id не ставится, InitTransactions / BeginTransaction / CommitTransaction / SendOffsetsToTransaction — нигде. Транзакционный API librdkafka не задействован.
Что это значит по факту:
«transacted» здесь = идемпотентный продюсер (
EnableIdempotence=true, дедуп ретраев в рамках сессии) + отложенная отправка (черезKafkaSendAction, реальныйProduceAsyncна границе маршрутной транзакции);это НЕ Kafka‑EOS поверх consume‑process‑produce: нет атомарной Kafka‑транзакции через
SendOffsetsToTransaction;краш ровно между отправкой и коммитом оффсета на рестарте даст повтор (at‑least‑once), а не exactly‑once.
Поэтому честная формулировка — «идемпотентность + deferred‑commit на уровне роута», а не exactly‑once. Для большинства задач «не потерять сообщение и не закоммитить оффсет раньше, чем сделали работу» этого хватает; настоящий read‑process‑write EOS (BeginTransaction / SendOffsetsToTransaction / CommitTransaction) — отдельная история, и её тут нет.
Апач Кемел, к слову, EOS через transacted() тоже не даёт: там авто‑коммит по умолчанию + ручной commit (KafkaManualCommit) + idempotent‑repo для дедупа — та же модель. Так что это не «недо‑Kafka», а ровно тот же честный размен, что и у зрелого интеграционного фреймворка.
Теперь, когда Kafka разобран, можно ставить на него EIP.
8. EIP #1 — Scatter‑Gather: один процессор, и разослал, и собрал
Классический Scatter‑Gather из книги Хопе/Вульфа — это «разослать запрос N получателям и собрать их ответы в один». В redb.Route это один процессор ScatterGatherProcessor, у которого агрегатор обязательный, а параллельность включена по умолчанию.
Самый текстовый вид — Kafka на входе, fan‑out по трём сервисам, склейка, публикация результата:
From("kafka:orders.incoming?brokers=kafka:9092&groupId=enricher&autoOffsetReset=earliest") .RouteId("order-enrich") .ScatterGather( // (накопленное, текущее) → склеенное. Вызывается попарно. aggregationStrategy: (acc, cur) => { var merged = (acc.In.Headers.TryGetValue("merged", out var m) ? m as string : "") ?? ""; acc.In.Headers["merged"] = merged + cur.In.Body; return acc; }, "http://pricing:8080/quote", "http://inventory:8080/check", "http://fraud:8080/score") .To("kafka:orders.enriched?brokers=kafka:9092&acks=All");
ℹ️ Здесь без транзакции, и это нормально: по умолчанию (3.2.1) входной оффсет коммитится инлайн после успешной обработки. Транзакция нужна, только если хочешь атомарную связку «коммит оффсета + публикация результата» — тогда
enableAutoCommit=false+.Transacted()(см. §5).
Реалистичная оговорка про Kafka. Scatter‑Gather собирает ответы, поэтому его естественные цели разсылки — request/reply эндпоинты: HTTP, gRPC, SQL‑SELECT. Kafka‑продюсер fire‑and‑forget, «собирать» с него по сути нечего, кроме метаданных доставки. Поэтому Kafka здесь живёт по краям: источник (
From("kafka:...")) и сток (To("kafka:...")), а fan‑out внутри идёт по сервисам. Так оно и работает в бою.
Что под капотом: параллельный путь
private async Task ProcessParallel(IExchange exchange, IReadOnlyList<string> recipients, CancellationToken ct, CancellationToken callerCt) { var clones = new IExchange?[recipients.Count]; var maxDop = _maxDegreeOfParallelism > 0 ? _maxDegreeOfParallelism : Environment.ProcessorCount; using var semaphore = new SemaphoreSlim(maxDop); async Task<IExchange> SendToRecipient(int index) { await semaphore.WaitAsync(ct); try { var clone = exchange.Clone(); // ← Clone(): Properties копируются поверхностно clones[index] = clone; var producer = GetOrCreateProducer(recipients[index]); // кеш продюсеров по URI await producer.Process(clone, ct); return clone; } finally { if (clones[index] != null) await clones[index]!.ReleaseScopes(); // отпускаем DI-скоупы рано, тело/заголовки живут для агрегации semaphore.Release(); } } var tasks = Enumerable.Range(0, recipients.Count).Select(SendToRecipient).ToList(); var results = await Task.WhenAll(tasks); // (в stopOnException; в best-effort — обёрнуто в try/catch на ветку) // Агрегация — в детерминированном порядке индексов, не в порядке прихода IExchange? aggregated = null; foreach (var result in results) { if (result == null) continue; aggregated = aggregated is null ? result : _aggregationStrategy(aggregated, result); } ApplyAggregation(exchange, aggregated); }
Несколько вещей здесь критичны:
exchange.Clone()— каждый получатель работает на своём клоне.Clone()копируетPropertiesповерхностно (об этом — в разделе транзакций), что важно для согласованного коммита.SemaphoreSlim(maxDop)— параллельность ограничена.MaxDegreeOfParallelism=0означаетEnvironment.ProcessorCount.Агрегация в порядке индексов. Даже если
fraudответил первым, склейка пойдётpricing → inventory → fraud. Это предсказуемость, на которую можно закладываться.ReleaseScopes()рано — DI‑скоупы клона отпускаются сразу после отправки, а тело и заголовки остаются живыми для агрегации.Кеш продюсеров —
GetOrCreateProducerхранитConcurrentDictionary<string, Lazy<ToProcessor>>, наDisposeAsyncвсе поднятые продюсеры останавливаются.
Последовательный путь
При ParallelProcessing(false) — другой код, и другой клон:
foreach (var uri in recipients) { var clone = exchange.CloneLinked(); // ← CloneLinked(), не Clone() var producer = GetOrCreateProducer(uri); await producer.Process(clone, ct); aggregated = aggregated is null ? clone : _aggregationStrategy(aggregated, clone); }
Разница Clone() vs CloneLinked() — не косметика, разберём в транзакциях.
Обработка ошибок: best‑effort vs stop‑on‑exception
StopOnException(false) (дефолт, best‑effort): упавшая ветка получает clone.Exception и всё равно идёт в агрегацию — стратегия сама решает, что делать с веткой, у которой выставлен Exception:
catch (Exception ex) { if (_stopOnException) throw; clone.Exception = ex; aggregated = aggregated is null ? clone : _aggregationStrategy(aggregated, clone); }
Таймаут в best‑effort: последовательный режим собирает частичное и выходит (break), параллельный — выкидывает протухшие ветки (null). StopOnException(true) — первая же ошибка пробрасывается, а таймаут оборачивается в TimeoutException:
catch (OperationCanceledException) when (_timeout > TimeSpan.Zero && !ct.IsCancellationRequested) { throw new TimeoutException($"Scatter-gather timed out after {_timeout}."); }
Все параметры
Полный контракт — IScatterGatherDefinition:
Параметр | По умолчанию | Что делает |
|---|---|---|
| — | Статический список URI получателей. |
| — | Динамический — список вычисляется из сообщения в рантайме. |
| обязательна | Попарная |
|
| Параллельно или последовательно. |
|
| Потолок одновременных отправок. |
|
| best‑effort или fail‑fast. |
|
| Общий дедлайн. |
Полный флент‑вид со всеми ручками:
.ScatterGather(sg => sg .Recipients("http://pricing:8080/quote", "http://inventory:8080/check", "http://fraud:8080/score") .AggregationStrategy((acc, cur) => { /* ... */ return acc; }) .ParallelProcessing(true) .MaxDegreeOfParallelism(3) .Timeout(TimeSpan.FromSeconds(2)) .StopOnException(false))
Динамические получатели
Список получателей можно вычислять из сообщения — например, веер по шардам, номера которых пришли в заголовке:
.ScatterGather(sg => sg .Recipients(e => { var shards = (e.In.Headers["shards"] as string ?? "").Split(','); return shards.Select(s => $"http://shard-{s.Trim()}:8080/query"); }) .AggregationStrategy((acc, cur) => MergeJson(acc, cur)))
Это уже почти Dynamic Recipient List внутри Scatter‑Gather — то, ради чего EIP‑примитивы и существуют.
Боевой пример: один HTTP‑запрос → шесть параллельных агрегаций
Хватит синтетики — вот реальный прод‑маршрут (дашборд мониторинга логистики). Эндпоинт POST /api/tsum/routes отдаёт страницу маршрутов плюс пять разных агрегатных блоков для виджетов. Раньше это были бы шесть последовательных запросов к БД; здесь — один Scatter‑Gather:
From("http:0.0.0.0:5090/api/tsum/routes?inOut=true&cors=true") .RouteId("tsum-api-routes") .Process(Auth.ProcessAsync) // аутентификация .ConvertBody<string>() .Process(ParseAndStashFilter) // распарсили фильтр → в Properties .ScatterGather(sg => sg .Recipients( "direct://routes-page", // страница результатов (пагинация) "direct://routes-ownership", // агрегат: свои/наёмные × статус загрузки "direct://routes-route-status", // агрегат: по статусам маршрутов "direct://routes-point-status", // агрегат: по статусам точек "direct://routes-territory", // агрегат: сколько машин на территории "direct://routes-departure") // агрегат: статистика отправлений .AggregationStrategy(MergeRouteFragments) .ParallelProcessing(true) .MaxDegreeOfParallelism(4) .StopOnException(true) .Timeout(TimeSpan.FromSeconds(30))) .Process(ComposeRoutesResponse); // собрали финальный JSON из фрагментов
Каждая ветка — отдельный direct://‑маршрут со своим redb‑запросом, который кладёт свой кусок ответа в frag:*‑проперти (и заодно свою метрику времени):
From("direct://routes-ownership") .ProcessWithRedb(async (redb, ex, ct) => { var sw = Stopwatch.StartNew(); var query = await BuildFilteredQuery(redb, Filter(ex)); // тот же фильтр, что и у всех веток var groups = await query .GroupBy(r => new { r.Own, r.LoadStatus }) // серверная агрегация в redb .SelectAsync(g => new { g.Key.Own, g.Key.LoadStatus, Count = Agg.Count(g) }); ex.Properties["frag:ownership"] = BuildOwnershipFragment(groups); ex.Properties["metric:ownershipMs"] = sw.ElapsedMilliseconds; });
Агрегатор просто переносит frag:* и metric:* из клона каждой ветки в аккумулятор:
private static IExchange MergeRouteFragments(IExchange aggregated, IExchange current) { foreach (var kv in current.Properties) if (kv.Key.StartsWith("frag:") || kv.Key.StartsWith("metric:")) aggregated.Properties[kv.Key] = kv.Value; // фрагмент ветки → в общий результат return aggregated; }
А ComposeRoutesResponse уже собирает из шести frag:* финальный JSON.
Почему это «крайне быстро». Шесть веток бегут параллельно (потолок 4 одновременно). Латентность эндпоинта = самая медленная ветка, а не сумма шести. Если каждая агрегация ~80–150 мс, последовательно вышло бы ~0.6–0.9 с, а через Scatter‑Gather — ~150 мс. Один запрос фронта → один HTTP‑хоп → шесть серверных redb‑агрегаций разом → один JSON.
И тут всё, что мы разбирали про транзакции, сходится на практике: каждая ветка через ProcessWithRedb поднимает свой per‑exchange redb‑скоуп → свой коннект (см. §10). Шесть параллельных запросов идут по шести коннектам, не делят одну транзакцию — и поэтому не падают. Никакого .Transacted() тут не нужно: ветки только читают, склейка идёт по frag:* в памяти. StopOnException(true) означает «виджет не отдадим кривым» — если любая ветка упала, весь ответ — ошибка, а не полу‑данные.
9. EIP #2 — Aggregator: сборка во времени
Scatter‑Gather собирает ответы здесь и сейчас (веер → джойн). Aggregator — другой паттерн: он собирает независимые сообщения во времени по correlation‑ключу и отдаёт склейку, когда сработал предикат завершения.
Контракт AggregatorProcessor — четыре вещи: correlationKey, aggregationStrategy, completionPredicate, target. Ядро:
public async Task Process(IExchange exchange, CancellationToken ct = default) { var key = _correlationKey(exchange); IExchange? completed = null; lock (_lock) { if (_aggregated.TryGetValue(key, out var existing)) { var merged = _aggregationStrategy(existing, exchange); _aggregated[key] = merged; if (_completionPredicate(merged)) // готова ли группа? { completed = merged; _aggregated.Remove(key); } } else { _aggregated[key] = exchange; // первая в группе if (_completionPredicate(exchange)) { completed = exchange; _aggregated.Remove(key); } } } if (completed != null) await _target.Process(completed, ct); // наружу уходят только завершённые группы }
Живой пример из демо EipRoutes.cs — собираем по 3 события с одинаковым batchId:
From("timer://agg-source?period=2000&repeatCount=9") .RouteId("demo-aggregator") .SetHeader("batchId", e => $"batch-{DateTime.UtcNow.Second % 3}") .SetBody(e => $"event-{DateTime.UtcNow:ss.fff}") .Aggregate( correlationKey: e => GetHeader(e, "batchId") ?? "default", aggregationStrategy: (oldEx, newEx) => { oldEx.In.Body = $"{oldEx.In.Body} + {newEx.In.Body}"; var count = oldEx.Properties.TryGetValue("agg.count", out var c) ? (int)c! : 1; oldEx.Properties["agg.count"] = count + 1; return oldEx; }, completionPredicate: e => e.Properties.TryGetValue("agg.count", out var c) && (int)c! >= 3) .Log("[AGG] ✔ Собрали 3 события: ${body}"); // выполняется на завершённой группе
.Aggregate(...) открывает scope (AggregateDefinition) — шаги после неё (.Log(...)) строят target‑пайплайн, который выполняется на завершённых группах. Сообщения до завершения «съедаются молча».
На Kafka это классический «collect N by key»:
From("kafka:order-lines?brokers=kafka:9092&groupId=order-assembler") .Aggregate( correlationKey: e => e.In.Headers["redbKafka.Key"]?.ToString() ?? "x", // ключ = orderId aggregationStrategy: (acc, cur) => AppendLine(acc, cur), completionPredicate: e => IsOrderComplete(e)) .To("kafka:orders.assembled?brokers=kafka:9092&acks=All");
ℹ️ По умолчанию входной оффсет коммитится после обработки. Для атомарной связки «оффсет + публикация собранного результата» —
enableAutoCommit=false+.Transacted().
Жёсткое ограничение, которое надо назвать вслух
Хранилище групп — обычный Dictionary<string, IExchange> под lock:
private readonly Dictionary<string, IExchange> _aggregated = new(StringComparer.Ordinal); private readonly object _lock = new();
Из этого следует:
теряется при рестарте процесса — все недозавершённые группы исчезают;
нет таймаута/эвикции групп — только предикат завершения. Группа, которая никогда не доберёт до условия (ждём 3 события, пришло 2, источник умер), висит в памяти вечно. Это потенциальная утечка, если ключей много и завершаются они не всегда.
Это не персистентный recovery‑аггрегатор как в «большом» Camel (с completion timeout, persistent repository, recovery). Для сценария «собрать N по ключу и эмитнуть пачку, потеря при деплое допустима» — отлично. Для «копить сутки и не потерять при рестарте» — нужен персист, которого здесь нет. Честно. Есть PendingGroupCount для мониторинга числа висящих групп — хотя бы видно, что копится.
10. Транзакции: две модели, и в этом вся соль
Вот ключевая развилка. В redb.Route две разные транзакционные модели, и коннекторы делятся на два лагеря. Понимание этой развилки — половина грамотной работы с фреймворком.
Лагерь 1: отложенные действия (ITransactedAction) — НЕ лезут в System.Transactions
Сюда входят все брокеры и redb: Kafka, Redis, RabbitMQ, AMQP, IBM MQ, Azure Service Bus и сам redb. Механика одна на всех. Транспорт не делает «настоящую» работу сразу — он кладёт ITransactedAction в общий словарь exchange.Properties["TRANSACT_ACTION"] (это ConcurrentDictionary) с уникальным на каждое сообщение ключом:
private static void RegisterTransactedAction(IExchange exchange, string key, ITransactedAction action) { if (!exchange.Properties.TryGetValue("TRANSACT_ACTION", out var raw) || raw is not ConcurrentDictionary<string, ITransactedAction> dict) { dict = new ConcurrentDictionary<string, ITransactedAction>(StringComparer.OrdinalIgnoreCase); exchange.Properties["TRANSACT_ACTION"] = dict; } dict[key] = action; }
Ключи — kafka-send-{guid}, redis-write-{guid}, redb:{name}, оффсет, deliveryTag. Уникальность критична: параллельные ветки fan‑out пишут в один общий словарь (Clone() копирует Properties поверхностно → словарь у всех клонов общий), и без уникальных ключей они бы затирали друг друга.
На границе .Transacted() за дело берётся TransactedProcessor:
public async Task Process(IExchange exchange, CancellationToken ct = default) { if (!exchange.Properties.ContainsKey(TransactActionPropertyKey)) exchange.Properties[TransactActionPropertyKey] = new ConcurrentDictionary<string, ITransactedAction>(...); using var scope = _policy.CreateScope(); // System.Transactions.TransactionScope (AsyncFlow enabled) try { await _inner.Process(exchange, ct); await CommitActions(exchange, ct); // коммитим ВСЕ отложенные действия scope.Complete(); } catch (Exception ex) { await RollbackActions(exchange, ct); // на ошибке — откатываем все throw; } }
CommitActions проходит словарь последовательно и коммитит каждое действие — это нам ещё аукнется в трейд‑оффе ниже:
foreach (var kvp in actions) await kvp.Value.Commit(ct);
Эти транспорты Transaction.Current не трогают вообще. Поэтому параллельный fan‑out по ним безопасен: уникальные ключи → нет коллизий в словаре, нет конкурентного enlistment в одну транзакцию System.Transactions.
redb в этот лагерь входит через RedbTransactedAction — он оборачивает redb‑нативную IRedbTransaction (свой BEGIN/COMMIT на своём коннекте) и кладёт её в тот же словарь под ключом redb:{name}:
public async Task Commit(CancellationToken ct = default) { if (Interlocked.Exchange(ref _completed, 1) != 0) return; // single-use try { if (_tx.IsActive) await _tx.CommitAsync(); } finally { await _tx.DisposeAsync(); } }
Лагерь 2: SQL — энлистится в System.Transactions
SQL‑коннектор ведёт себя принципиально иначе (SqlProducer):
var connection = await factory.CreateConnectionAsync(readOnly: ..., ct); // Если есть ambient TransactionScope (route-level .Transacted()), коннект авто-энлистится — // локальная транзакция не нужна. Иначе всегда оборачиваем в локальную (как EF SaveChanges). var hasAmbientTx = Transaction.Current != null; DbTransaction? tx = null; if (!hasAmbientTx) tx = await connection.BeginTransactionAsync(ct); // ... выполняем команду ... if (tx != null) await tx.CommitAsync(ct);
То есть:
нет ambient — каждое выполнение оборачивается в локальный
DbTransactionна своём коннекте (атомарно, какSaveChangesв EF, без всякого?transacted=true);есть ambient
Transaction.Current(его открыл.Transacted()/.BeginTransaction()) → локальную пропускает, коннект авто‑энлистится в ambientTransactionScope.
SQL — единственный in‑box коннектор, который реально участвует в System.Transactions. Это и хорошо (одна .Transacted() обёртка вокруг нескольких SQL‑записей = одна атомарная транзакция, идеально для outbox‑write), и требует понимания границ.
Почему параллельный Scatter‑Gather не падает под транзакцией
Логичный страх: параллельные ветки бегут на разных потоках, Transaction.Current через ExecutionContext течёт в каждую — не будет ли конкурентного использования одной транзакции и промоута в MSDTC (который на PG/Linux просто кинет)?
Нет. И вот почему — по коду:
1. Скоупы делаются per‑exchange. Именованный redb резолвится через GetRedbService(name, exchange) (RedbRouteExtensions), который кладёт DI‑scope в exchange.Properties["__redb_scope:{name}"] и тянет сервис из провайдера этого скоупа:
var cacheKey = ScopeCachePrefix + cleanName; // "__redb_scope:orders-db" if (exchange.Properties.TryGetValue(cacheKey, out var cached) && cached is IServiceScope cachedScope) return cachedScope.ServiceProvider.GetRequiredService<IRedbService>(); var scope = factory.CreateScope(); // нет в кеше — создаём свой exchange.Properties[cacheKey] = scope; return scope.ServiceProvider.GetRequiredService<IRedbService>();
2. Clone() намеренно НЕ копирует скоупы. Из Exchange.cs:
foreach (var kvp in _properties) { // Именованные redb-скоупы — per-exchange; ребёнок создаст свой при первом обращении. if (kvp.Key.StartsWith("__redb_scope:", StringComparison.Ordinal)) continue; clone._properties[kvp.Key] = kvp.Value; }
Значит каждая параллельная ветка поднимает свой скоуп → свой IRedbService → свой коннект.
3. redb вообще не энлистится в System.Transactions — он использует свою IRedbTransaction, положенную в TRANSACT_ACTION.
Итог: нет общей System.Transactions‑транзакции, которую несколько коннектов дёргают конкурентно → нет промоута в MSDTC, нет «transaction context in use by another thread», не падает. Разные коннекты, не одна транзакция — и в этом фишка, а не баг.
Для SQL — то же самое. В Scatter‑Gather без обёртки .Transacted() каждая ветка поднимает свой коннект (CreateConnectionAsync на каждый Process) и свой локальный DbTransaction → независимые атомарные записи → не падает. Упасть можно только если осознанно завернуть параллельный fan‑out в .Transacted() (ambient System.Transactions) и разослать по нескольким SQL‑коннектам: тогда они энлистятся в одну ambient‑транзакцию, и на PG/Npgsql это уедет в распределённую и кинет. Это узкий край, в который просто не лезут — для брокеров/redb есть deferred‑модель.
А DependentTransactionBranch из 3.2.0 — это про что тогда?
Тот самый 3.2.0‑фикс изоляции ambient‑транзакции по ветке применяется только в Multicast и Splitter (плюс сам хелпер) — больше нигде. Вот его ядро (DependentTransactionBranch):
internal static async Task RunAsync(Func<Task> branch) { var ambient = Transaction.Current; if (ambient is null) { await branch(); return; } // нет транзакции — ноль накладных // Форкаем dependent-клон: enlistment этой ветки приватен её потоку. var dependent = ambient.DependentClone(DependentCloneOption.BlockCommitUntilComplete); using (var scope = new TransactionScope(dependent, TransactionScopeAsyncFlowOption.Enabled)) { await branch(); scope.Complete(); } dependent.Complete(); // разрешаем родительский коммит (он ждал BlockCommitUntilComplete) }
Он нужен только для ресурсов, которые реально энлистятся в Transaction.Current — то есть для случая, когда параллельные ветки Multicast/Splitter делают inline‑работу на энлистящемся коннекте. Scatter‑Gather в нём не нуждается: per‑exchange‑скоупы + deferred ITransactedAction уже дают изоляцию без шаринга коннекта. Это не асимметрия‑недоработка — это два механизма под два разных случая.
Честный трейд‑офф (это не баг, это дизайн)
TransactedProcessor.CommitActions коммитит отложенные действия последовательно в цикле, а ветки Scatter‑Gather/Multicast — это разные коннекты, разные транзакции. Значит: если ветка A закоммитилась, а коммит ветки B бросил — получишь частичный коммит. Кросс‑веточной атомарности на уровне БД здесь нет. Это плата за «не падает, не промоутится в MSDTC». Нужна реальная кросс‑веточная атомарность с enlistment — это Multicast/Splitter под .Transacted() с тем самым 3.2.0‑фиксом, не Scatter‑Gather.
Политики транзакций
.Transacted() под капотом использует TransactionPolicy — четыре готовых:
Политика |
| Поведение |
|---|---|---|
|
| Присоединиться к ambient или создать новую. Таймаут 30с, |
|
| Всегда новая транзакция, ambient подвешивается. |
|
| Выполнить без транзакции (ambient подавляется). |
| (маркер) | Требует существующую ambient, иначе |
TransactionDefinition ещё даёт Camel‑parity хуки: .Retry(attempts, delay) (оборачивает тело в RetryProcessor) и .DeadLetterChannel(uri) (на провале после отката шлёт exchange в DLC). То есть транзакционный блок с ретраями и dead‑letter — это:
From("kafka:orders?brokers=kafka:9092&groupId=w") .Transacted() .Retry(3, TimeSpan.FromMilliseconds(200)) .To("sql:INSERT INTO orders ...") .To("kafka:orders.done?brokers=kafka:9092&transacted=true") .EndTransaction();
11. Родня по параллели: Splitter и Multicast
Scatter‑Gather — не единственный fan‑out в семействе. Рядом — Splitter (разбить тело на части и обработать каждую) и Multicast (послать копию N процессорам). У обоих, как и у Scatter‑Gather, есть опциональная агрегация, параллельность и stopOnException. Разница — в источнике «веток»:
Scatter‑Gather: ветки = эндпоинты (URI продюсеров), агрегатор обязателен.
Multicast: ветки = процессоры/суб‑пайплайны, агрегатор опционален (
MulticastProcessor).Splitter: ветки = части тела (
SplitterProcessor), агрегатор опционален, и есть нюансы Camel‑совместимости.
Splitter, кстати, — самый «навороченный» по агрегации: его стратегия (IExchange?, IExchange) → IExchange вызывается даже для первой части с oldExchange == null (Camel‑контракт seed/wrap), и есть флаги parallelAggregate (агрегировать инлайн под локом из воркеров вместо детерминированного post‑pass) и aggregateOnException (включать упавшие части в агрегат). И именно у Splitter/Multicast в параллельном пути стоит тот самый DependentTransactionBranch.RunAsync:
// MulticastProcessor.ProcessParallel / SplitterProcessor.ProcessParallel await DependentTransactionBranch.RunAsync(() => _targets[idx].Process(clone, ct));
Почему у них есть, а у Scatter‑Gather нет — разобрали выше: Splitter/Multicast гоняют inline‑процессоры (которые могут открыть энлистящийся коннект прямо в ветке), а Scatter‑Gather гоняет продюсеры по URI с per‑exchange скоупами. Разные риски — разные механизмы.
Простой Multicast из демо:
From("direct://demo-multicast") .Multicast(new[] { "direct://mcast-a", "direct://mcast-b", "direct://mcast-c" }, parallelProcessing: true) .Log("[MCAST] ◀ Все получатели обработали сообщение");
12. Saga — «немного другая»
Saga в redb.Route есть, но это не MassTransit'овский durable state‑machine. Это SagaProcessor — in‑process компенсация в рамках одного exchange:
public async Task Process(IExchange exchange, CancellationToken ct = default) { var completedCount = 0; try { for (var i = 0; i < _steps.Length; i++) { ct.ThrowIfCancellationRequested(); await _steps[i].Action(exchange, ct); completedCount++; } } catch (Exception ex) when (ex is not OperationCanceledException) { // Компенсации завершённых шагов — в ОБРАТНОМ порядке for (var i = completedCount - 1; i >= 0; i--) { if (_steps[i].Compensate is null) continue; try { await _steps[i].Compensate!(exchange, ct); } catch (Exception compEx) { _logger?.LogError(compEx, "Saga compensation for step {i} failed", i); } } throw; } if (_onCompletion is not null) await _onCompletion(exchange, ct); }
Падения компенсаций логируются и не рвут откат остальных. DSL (SagaDefinition) — два стиля, callback и fluent‑scope:
From("direct:checkout") .Saga(s => s .Step(reserve, compensate: unreserve) .Step(charge, compensate: refund) .Step(ship) // forward-only, без компенсации .OnCompletion(e => Log("заказ оформлен")));
Жёстко: нет персиста состояния, нет корреляции по сообщениям/времени, не переживает рестарт процесса. Это Camel‑стиль «routing slip с компенсацией», а не state‑machine с персистом. Совпадает только имя. Метрики, кстати, есть: SagaCompleted / SagaCompensated / SagaFailed.
И это, опять же, не недостаток — другая школа. Нужен durable‑saga с персистом поверх Kafka — собираешь из Aggregator + SQL/Redis‑стора + correlation‑ключа. Кубики на месте.
13. Outbox — его нет, и это правильно
Grep по всему src на Outbox — ноль. Встроенного outbox нет. И это не пробел, а позиция: transactional outbox — это паттерн, а не кнопка фреймворка. Лепить его в ядро — оверинжиниринг; кто хочет, соберёт свой за пять строк под свои инварианты:
// 1. Запись: бизнес-данные + строка в outbox под одной транзакцией. // Для SQL это ровно тот кейс, где System.Transactions-энлист работает на тебя: // одна .Transacted() обёртка → один ambient scope → атомарно. From("direct:place-order") .Transacted() .To("sql:INSERT INTO orders(id, payload) VALUES (@id, @payload)") .To("sql:INSERT INTO outbox(id, topic, payload, sent) VALUES (@id, 'orders', @payload, false)") .EndTransaction(); // 2. Доставка: отдельный роут-поллер читает outbox и публикует. From("sql:SELECT * FROM outbox WHERE sent = false ORDER BY id?outputType=SelectList&delay=1000") .Split(body => (IEnumerable<object?>)body) // по строке на сообщение .To("kafka:orders?brokers=kafka:9092") .To("sql:UPDATE outbox SET sent = true WHERE id = @id");
Хочешь inbox‑паттерн, дедуп по messageId, TTL на строки, claim‑check для больших payload'ов — допишешь. Никто не навязывает свою схему outbox‑таблицы и свою семантику ретраев. Это и есть «идём к Camel»: меньше зашитой магии — больше явной сборки.
14. Итог: размен, а не «лучше/хуже»
MassTransit | redb.Route (Camel‑школа) | |
|---|---|---|
Saga | durable state‑machine, персист, переживает рестарт | in‑process компенсация в рамках exchange |
Outbox | фича фреймворка | твой роут из SQL/Redis за пять строк |
Транзакции | абстракция фреймворка | две явные модели: deferred |
Kafka «transactional» | EOS из коробки | идемпотентный продюсер + deferred‑commit (не EOS) |
Философия | батарейки в комплекте, делай как решили | EIP + коннекторы + DSL, композишь сам |
Это не «redb.Route лучше MassTransit». Это другой размен: меньше зашитой магии — больше явной сборки из примитивов. Если твои сценарии не лезут в чужую модель saga/outbox — Camel‑подход даёт собрать ровно своё. Если лезут — может, тебе и не надо никуда уходить.
Scatter‑Gather, к слову, в проде у нас работает отлично — один процессор, который и разсылает, и собирает, параллельно, с предсказуемым порядком склейки и понятной транзакционной семантикой. Ровно тот случай, когда EIP‑примитив закрывает реальную задачу без единой строчки инфраструктурного кода.
Версия в проде — 3.2.0, CHANGELOG здесь. Куда наступали, что перепроверить (привет, transacted‑Kafka), какие EIP разобрать дальше — в комментарии, разберём по коду.
