redb kafka connector
redb kafka connector

Серия: redb ecosystem / redb.Route deep-dive

Очередная статья из цикла про redb.Route — наш Apache Camel под .NET. Если вы только подключились, вот предыдущие на Хабре:

Сегодня заходим с Kafka‑коннектора — разбираем его по косточкам, как делали с HTTP, — а потом сажаем на него два EIP‑паттерна: Scatter‑Gather и Aggregator. И главное — разбираем то, о чём в туториалах молчат: как это живёт под транзакциями. Заодно вышел 3.2.0.

Сразу спойлер

Никакого «exactly‑once из коробки» в Kafka здесь нет — и ниже по коду будет видно, почему именно. Что работает, а что нет — по факту, без округлений.

далее ниже Боевой пример: один HTTP‑запрос → шесть параллельных агрегаций


Оглавление

  1. Зачем уходить от MassTransit

  2. Что приехало в 3.2.0

  3. Kafka‑коннектор: анатомия URI и фабрика подключений

  4. Kafka‑продюсер: что происходит на .To("kafka:...")

  5. Kafka‑консьюмер: поллинг, батчи, ребаланс

  6. Заголовки и трейсинг сквозь брокер

  7. Честно про transacted=true

  8. EIP #1 — Scatter‑Gather: один процессор, и разослал, и собрал

  9. EIP #2 — Aggregator: сборка во времени

  10. Транзакции: две модели, и в этом вся соль

  11. Родня по параллели: Splitter и Multicast

  12. Saga — «немного другая»

  13. Outbox — его нет, и это правильно

  14. Итог: размен, а не «лучше/хуже»


1. Зачем уходить от MassTransit

MassTransit — отличная штука, и я не собираюсь её хоронить. Но это opinionated‑фреймворк: durable saga как state‑machine, transactional outbox, retry‑политики — всё зашито внутрь, и ты живёшь по правилам фреймворка. Когда твой сценарий совпадает с его картиной мира — это кайф, всё работает «из коробки». Когда не совпадает — ты воюешь с абстракцией, лезешь в кишки и пишешь костыли поверх чужих решений.

Apache Camel — другая школа. В фундаменте: EIP‑примитивы + коннекторы + DSL. Тебе никто не «дарит» outbox как фичу — тебе дают Splitter, Aggregator, Scatter‑Gather, транзакционные транспорты, и ты сам собираешь ровно то, что нужно под твои инварианты. Меньше магии — больше явной сборки из кубиков.

redb.Route стоит на второй школе. Поэтому в заголовке — «идём к Apache Camel»: не «у нас всё из коробки», а «у нас кубики, и они стыкуются предсказуемо». Дальше будет видно, чем именно этот размен оплачивается и что взамен получаешь.


2. Что приехало в 3.2.0

Полный список — в CHANGELOG. Для нашей темы важны два пункта:

  1. Параллельные Splitter / Multicast теперь изолируют ambient‑транзакцию по ветке. Каждая ветка получает свой DependentTransaction.DependentClone(BlockCommitUntilComplete); родительский коммит ждёт завершения всех веток. Раньше параллельные ветки шарили один Transaction.Current, а System.Transactions запрещает конкурентное использование одной транзакции из нескольких потоков. Ниже мы разберём этот фикс по коду и поймём, почему Scatter‑Gather в нём не нуждался.

  2. Throttle получил RFC 6585‑режим .RejectOnOverflow() (429 + Retry‑After). К нашей теме напрямую не относится, но если строите HTTP‑фасад перед всем этим — пригодится.

Все пакеты redb.Route.* версионируются вместе: 3.2.0.


3. Kafka‑коннектор: анатомия URI и фабрика подключений

Начнём, как просили, с Kafka. И начнём с самого простого, что есть — строкового URI эндпоинта. Вот минимальный консьюмер и минимальный продюсер:

// Консьюмер: читаем топик orders, группа order-workers, с самого начала
From("kafka:orders?brokers=kafka:9092&groupId=order-workers&autoOffsetReset=earliest")
    .Log("Получили: ${body}")
    .To("direct:process");

// Продюсер: публикуем в топик notifications с acks=all
From("direct:notify")
    .To("kafka:notifications?brokers=kafka:9092&acks=All");

Схема URI — kafka:<topic>?<query>. Имя топика берётся из пути (KafkaEndpoint):

public KafkaEndpoint(EndpointUri uri, KafkaComponent component, KafkaEndpointOptions options)
    : base(uri, component, options)
{
    TopicName = uri.Path;                       // ← топик = путь URI

    // Если в URI указан connectionFactory=<имя> — резолвим фабрику из реестра
    if (!string.IsNullOrEmpty(options.ConnectionFactory) && component.Context is not null)
        ResolvedFactory = component.Context.GetFromRegistry<KafkaConnectionFactory>(options.ConnectionFactory);

    // brokers в URI нет, но фабрика их знает — подставляем
    if (string.IsNullOrWhiteSpace(options.Brokers) && ResolvedFactory is not null)
        options.Brokers = ResolvedFactory.Brokers;
}

Обратите внимание на последний штрих: на консьюмере groupId обязателен, и если его забыть — эндпоинт не даст создать консьюмер:

public override IConsumer CreateConsumer(IProcessor processor)
{
    if (string.IsNullOrWhiteSpace(Options.GroupId))
        throw new InvalidOperationException(
            $"The 'groupId' parameter is required for Kafka consumer on topic '{TopicName}'.");
    return new KafkaConsumer(this, processor, Options);
}

Фабрика подключений — чтобы не дублировать настройки

Когда у тебя двадцать Kafka‑эндпоинтов на один кластер, расписывать brokers=...&securityProtocol=...&saslMechanism=... в каждом URI — самоубийство. Для этого есть KafkaConnectionFactory: регистрируешь её в реестре под именем и ссылаешься через connectionFactory=имя.

Полный её параметр‑сет (всё из исходника, ничего не выдумано):

Группа

Свойства

Дефолт

Брокеры

Brokers

localhost:9092

Безопасность

SecurityProtocolSaslMechanismSaslUsernameSaslPassword

Plaintext

SSL/TLS

SslCaLocationSslCertificateLocationSslKeyLocationSslKeyPasswordSslEndpointIdentificationAlgorithm

Продюсер

AcksRetries

Leader3

Консьюмер

GroupIdAutoOffsetReset

—, Latest

Тюнинг консьюмера

GroupInstanceIdSessionTimeoutMsHeartbeatIntervalMsMaxPollIntervalMsPartitionAssignmentStrategyIsolationLevel

Тюнинг продюсера

LingerMsBatchSizeCompressionTypeMessageTimeoutMs

Reconnect

ReconnectBackoffMsReconnectBackoffMaxMs

Advanced

ClientIdRequestTimeoutMsMetadataMaxAgeMsSocketTimeoutMsMaxInFlightAdditionalProperties

redb.Route, 30000, 300000, 60000, 5

Регистрация и использование:

context.AddToRegistry("prod-cluster", new KafkaConnectionFactory
{
    Brokers          = "kafka1:9092,kafka2:9092,kafka3:9092",
    SecurityProtocol = "SaslSsl",
    SaslMechanism    = "ScramSha512",
    SaslUsername     = "svc-orders",
    SaslPassword     = secret,
    CompressionType  = "Zstd",
    MaxInFlight      = 5,
});

// Теперь в URI — только то, что специфично для эндпоинта:
From("kafka:orders?connectionFactory=prod-cluster&groupId=order-workers");
To("kafka:notifications?connectionFactory=prod-cluster&acks=All");

KafkaConnectionFactory умеет собрать три разных конфига — BuildConsumerConfig()BuildProducerConfig()BuildAdminConfig() (последний используется для запроса метаданных топика). Эндпоинт‑уровневые опции применяются поверх фабрики (if (!string.IsNullOrWhiteSpace(Brokers)) config.BootstrapServers = Brokers; и т.д.) — то есть фабрика задаёт базу, URI её точечно перекрывает.

Все опции эндпоинта

Полный список — KafkaEndpointOptions. Группами, с дефолтами:

Подключение / безопасность: brokers (обязателен, иначе Validate() бросит), securityProtocol (Plaintext/Ssl/SaslPlaintext/SaslSsl), saslMechanism/saslUsername/saslPasswordsslCaLocation/sslCertificateLocation/sslKeyLocation/sslKeyPasswordconnectionFactory.

Консьюмер: groupIdautoOffsetReset (Latest/Earliest/Error, дефолт Latest), enableAutoCommit (дефолт true, framework-level — с 3.2.1), maxPollRecords (0 = одиночный режим, >0 = батч), pollTimeoutMs (1000), breakOnFirstErrorseekTo (beginning/end), topicIsPatterngroupInstanceIdsessionTimeoutMsheartbeatIntervalMsmaxPollIntervalMspartitionAssignmentStrategy (Range/RoundRobin/CooperativeSticky), isolationLevel (ReadUncommitted/ReadCommitted).

Продюсер: acks (None/Leader/All, дефолт Leader), retries (3), recordMetadatakeypartitionNumbertransactedtransactionIdPrefix (redb-kafka).

Тюнинг продюсера: lingerMsbatchSizecompressionType (None/Gzip/Snappy/Lz4/Zstd), messageTimeoutMs.

Advanced: additionalProperties — произвольные librdkafka‑свойства, применяются последними и перекрывают всё типизированное.

Validate() проверяет ровно три вещи (по коду): brokers непустой, maxPollRecords >= 0pollTimeoutMs >= 0retries >= 0. Всё остальное либо имеет дефолт, либо опционально.

Один нюанс — про коммит оффсета, и он важный. На уровне librdkafka EnableAutoCommit прошит в false всегда — библиотека сама, по таймеру, ничего не коммитит (иначе закоммитила бы непрочитанное). Из BuildConsumerConfig:

var cfg = new ConsumerConfig
{
    BootstrapServers = Brokers,
    GroupId          = GroupId,
    AutoOffsetReset  = ParseAutoOffsetReset(),
    EnableAutoCommit = false,   // librdkafka-уровень: ручной коммит всегда
};

Но коммит, конечно, есть — поверх, на уровне фреймворка. С 3.2.1 у консьюмера появилась типизированная опция EnableAutoCommit (дефолт true): после успешного Process консьюмер коммитит оффсет инлайн — ровно как RabbitMQ‑консьюмер ацкает после обработки. Ставится из URI (?enableAutoCommit=false) или флентом (.EnableAutoCommit(false)). Транзакционный маршрут имеет приоритет: если оффсет уже закоммитила .Transacted(), инлайн‑ветка пропускается. Механику разберём в §5.

То же самое флентом

Строковые URI хороши для коротких примеров и для конфигов. В коде эндпоинты обычно строят флентом — Kafka.Topic(...):

From(Kafka.Topic("orders")
        .Brokers("kafka1:9092,kafka2:9092")
        .GroupId("order-workers")
        .AutoOffsetReset("Earliest")
        .MaxPollRecords(500)
        .PartitionAssignmentStrategy("CooperativeSticky")
        .IsolationLevel("ReadCommitted")
        .SessionTimeout(30_000)
        .MaxPollInterval(300_000));

To(Kafka.Topic("notifications")
        .Brokers("kafka1:9092")
        .Acks("All")
        .Compression("zstd")
        .Linger(20)
        .BatchSize(64 * 1024)
        .Key("${header.orderId}"));   // ключ партиционирования — выражение

Флент‑билдер просто собирает ту же query‑строку (Kafka.Topic("orders").Brokers(...)...Build() → "kafka:orders?brokers=...&..."), с URL‑энкодингом значений. Сеттеры принимают IExpression, но резолвятся опции в разные моменты: ключ продюсера вычисляется ${...}‑выражением на каждое сообщение (в DetermineKey на момент отправки — см. ниже), а connection‑level опции (brokers, креды) разбираются один раз на коннекте, где exchange ещё нет, так что выражение в них смысла не имеет. Per‑message‑история — это именно про ключ партиционирования.


4. Kafka‑продюсер: что происходит на .To("kafka:...")

Теперь разберём, что реально делает KafkaProducer, когда сообщение доходит до .To("kafka:..."). Это ConnectableProducer — то есть требует Start() перед использованием (EnsureStarted() в начале Process).

Коннект

protected override Task ConnectAsync(CancellationToken ct)
{
    var config = _options.BuildProducerConfig(_endpoint.ResolvedFactory);

    _producer = new ProducerBuilder<string, byte[]>(config)
        .SetValueSerializer(Serializers.ByteArray)
        .SetErrorHandler((_, e) =>
            Logger?.LogError("Kafka producer error: {Reason} (Code: {Code})", e.Reason, e.Code))
        .Build();

    if (_options.Transacted)
    {
        _producer.InitTransactions(TimeSpan.FromSeconds(30));
        Logger?.LogInformation("Kafka transactional mode enabled: topic={Topic}", _endpoint.TopicName);
    }
    return Task.CompletedTask;
}

Ключ — string, значение — byte[]. На дисконнекте — Flush(30s) перед Dispose, чтобы не потерять буферизованные сообщения.

Подготовка сообщения

PrepareMessage превращает тело exchange в Message<string, byte[]>:

byte[] valueBytes = body switch
{
    byte[] bytes => bytes,                                  // уже байты — как есть
    string str   => Encoding.UTF8.GetBytes(str),            // строка → UTF-8
    null         => Array.Empty<byte>(),                    // null → пустой массив
    _            => Encoding.UTF8.GetBytes(body.ToString() ?? string.Empty)  // прочее → ToString → UTF-8
};

Дальше — заголовки. ContentType пробрасывается отдельным заголовком content-type, а пользовательские заголовки exchange переезжают в Kafka‑заголовки — кроме внутренних redbKafka.*, которые отфильтровываются:

foreach (var (key, value) in exchange.In.Headers)
{
    if (KafkaHeaders.IsRedbHeader(key))   // redbKafka.* — внутренние, не пробрасываем
        continue;
    msg.Headers.Add(key, Encoding.UTF8.GetBytes(value?.ToString() ?? string.Empty));
}

Ключ партиционирования

DetermineKey — три ветки, в порядке приоритета:

private string? DetermineKey(IExchange exchange)
{
    // 1. Явная партиция → ключ не нужен (партиционер обходится стороной)
    if (_options.PartitionNumber.HasValue)
        return null;

    // 2. Ключ из опции — поддерживает ${...}-выражения, резолвится на каждое сообщение
    if (!string.IsNullOrWhiteSpace(_options.Key))
    {
        var resolved = _options.ResolveOption(_options.Key, exchange);
        if (!string.IsNullOrEmpty(resolved)) return resolved;
    }

    // 3. Иначе — без ключа (round-robin по партициям)
    return null;
}

То есть key=${header.orderId} означает «партиционируй по orderId» — все события одного заказа лягут в одну партицию и сохранят порядок.

Отправка: немедленная vs отложенная

Вот развилка, которая важна для транзакций:

if (_options.Transacted)
{
    // Отложенная отправка — реальный publish случится на коммите маршрутной транзакции
    var action = new KafkaSendAction(_producer, _endpoint.TopicName, message,
        _options.PartitionNumber, _options.RecordMetadata, exchange, Logger);
    RegisterTransactedAction(exchange, $"kafka-send-{Guid.NewGuid():N}", action);
}
else
{
    // Немедленная отправка
    var result = _options.PartitionNumber.HasValue
        ? await _producer.ProduceAsync(new TopicPartition(_endpoint.TopicName,
              new Partition(_options.PartitionNumber.Value)), message, ct)
        : await _producer.ProduceAsync(_endpoint.TopicName, message, ct);

    if (_options.RecordMetadata)
        AddDeliveryMetadata(exchange, result);   // redbKafka.Sent.Topic/Partition/Offset/Timestamp
}

В транзакционном режиме продюсер не отправляет сразу — он кладёт KafkaSendAction в словарь отложенных действий с уникальным ключом kafka-send-{guid}. Реальный ProduceAsync произойдёт позже, на границе .Transacted(). К этому вернёмся.

KafkaSendAction при создании клонирует сообщение (глубокая копия заголовков через Buffer.BlockCopy), чтобы отложенная отправка не зависела от того, что случится с exchange дальше:

public async Task Commit(CancellationToken ct = default)
{
    var result = _partition.HasValue
        ? await _producer.ProduceAsync(new TopicPartition(_topicName, new Partition(_partition.Value)), _message, ct)
        : await _producer.ProduceAsync(_topicName, _message, ct);
    if (_recordMetadata) { /* пишем Sent.* заголовки */ }
}

public Task Rollback(CancellationToken ct = default)
{
    // Сообщение просто выбрасывается — ничего не публиковали
    return Task.CompletedTask;
}

Трейсинг сквозь брокер

Перед отправкой продюсер инжектит W3C trace context (traceparent/tracestate) в Kafka‑заголовки через стандартный DistributedContextPropagator:

var propagator = DistributedContextPropagator.Current;
propagator.Inject(activity, headers, static (carrier, key, value) =>
{
    if (carrier is Headers h && !string.IsNullOrEmpty(value))
    {
        h.Remove(key);
        h.Add(key, Encoding.UTF8.GetBytes(value));
    }
});

Консьюмер на другом конце это поднимет — и в трейсе вы увидите непрерывную цепочку через брокер. Без единой строчки настройки с вашей стороны.


5. Kafka‑консьюмер: поллинг, батчи, ребаланс

KafkaConsumer — это DrainableConsumer (умеет gracefully дослить in‑flight сообщения при остановке). Разберём по шагам.

Старт: подписка/назначение, seek, метаданные

protected override Task OnStarting(CancellationToken ct)
{
    var config = _options.BuildConsumerConfig(_endpoint.ResolvedFactory);

    _consumer = new ConsumerBuilder<string, byte[]>(config)
        .SetValueDeserializer(Deserializers.ByteArray)
        .SetErrorHandler((_, e) => { /* fatal → LogError, иначе LogWarning */ })
        .SetPartitionsAssignedHandler((c, partitions) => { /* лог assigned */ })
        .SetPartitionsRevokedHandler((c, partitions) =>
        {
            // Перед отзывом партиций — коммитим текущие оффсеты
            try { c.Commit(partitions); }
            catch (KafkaException) { /* нечего коммитить — ок */ }
        })
        .SetPartitionsLostHandler((_, partitions) => { /* лог involuntary loss */ })
        .Build();

    if (_options.PartitionNumber.HasValue)
        _consumer.Assign(new[] { new TopicPartition(_endpoint.TopicName, new Partition(_options.PartitionNumber.Value)) });
    else
        _consumer.Subscribe(_endpoint.TopicName);

    HandleSeekTo();        // seekTo=beginning/end на первом старте
    LogTopicMetadata();    // дамп: партиции, лидеры, реплики, ISR
    return Task.CompletedTask;
}

Два режима членства: либо Subscribe(topic) (динамическое назначение партиций группой + ребаланс), либо явный Assign(partition) если задан partitionNumber (статическое назначение, без группового ребаланса).

LogTopicMetadata при старте поднимает AdminClient и логирует топологию топика — сколько партиций, кто лидер каждой, реплики, ISR. Удобно для дебага «почему меня назначило только на 2 из 6 партиций».

Поллинг‑цикл

RunAsync запускает long‑running таску с поллинг‑циклом:

private async Task PollLoop(CancellationToken pollCt, CancellationToken processingCt)
{
    while (!pollCt.IsCancellationRequested)
    {
        try
        {
            if (_options.MaxPollRecords > 0)
                await ProcessBatch(pollCt, processingCt);     // батч-режим
            else
                await ProcessSingleMessage(pollCt, processingCt);  // по одному
        }
        catch (OperationCanceledException) { break; }
        catch (Exception ex)
        {
            Logger?.LogError(ex, "Error in Kafka poll loop for topic {Topic}", _endpoint.TopicName);
            try { await Task.Delay(1000, pollCt); } catch (OperationCanceledException) { break; }
        }
    }
}

Поймали исключение в цикле — залогировали, подождали секунду, продолжили (не убиваем консьюмер из‑за одного сбойного сообщения).

Одиночный режим

private async Task ProcessSingleMessage(CancellationToken pollCt, CancellationToken processingCt)
{
    var result = _consumer!.Consume(pollCt);
    if (result?.Message is null) return;

    var exchange = CreateExchange(result);
    IncrementInflight();
    try
    {
        // Регистрируем коммит оффсета как отложенное действие — на случай транзакционного маршрута.
        var commitAction = new KafkaCommitAction(_consumer, result, Logger);
        RegisterTransactedAction(exchange, $"kafka-commit-{result.Offset.Value}", commitAction);

        await Processor.Process(exchange, processingCt);
        ProcessedCount++;

        // 3.2.1: авто-коммит после успеха — если оффсет не закоммитила транзакция (флаг Committed).
        if (_options.EnableAutoCommit && !commitAction.Committed)
            await commitAction.Commit(processingCt);
    }
    finally
    {
        await exchange.DisposeAsync();
        DecrementInflight();
    }
}

Важная деталь: консьюмер регистрирует KafkaCommitAction до обработки. То есть оффсет закоммитится не сразу, а на коммите маршрутной транзакции — вместе с любыми отложенными отправками. Если обработка упадёт — оффсет не коммитнется, сообщение приедет снова.

KafkaCommitAction.Commit — это обычный _consumer.Commit(result):

public Task Commit(CancellationToken ct = default)
{
    _consumer.Commit(_result);   // ← обычный коммит оффсета, НЕ SendOffsetsToTransaction
    return Task.CompletedTask;
}
public Task Rollback(CancellationToken ct = default) => Task.CompletedTask;  // не коммитим — сообщение перечитается

Батч‑режим

При maxPollRecords > 0 консьюмер собирает пачку до maxPollRecords штук или до дедлайна pollTimeoutMs, и отдаёт их одним exchange, в теле которого — List<IMessage>:

while (batch.Count < _options.MaxPollRecords && DateTime.UtcNow < deadline && !pollCt.IsCancellationRequested)
{
    var result = _consumer!.Consume(TimeSpan.FromMilliseconds(100));
    if (result?.Message is not null) batch.Add(result);
}
if (batch.Count == 0) return;

var exchange = CreateBatchExchange(batch);   // Body = List<IMessage>, redbKafka.BatchSize = N
// Коммитим только последний оффсет батча
var last = batch[^1];
RegisterTransactedAction(exchange, $"kafka-batch-commit-{last.Offset.Value}", new KafkaCommitAction(_consumer!, last, Logger));
await Processor.Process(exchange, processingCt);

Тонкость: коммитится только последний оффсет батча (Kafka‑оффсеты монотонны в партиции, коммит последнего покрывает всю пачку). Дальше в маршруте можно сделать .Split(body => body) и обработать каждое сообщение по отдельности — но коммит всё равно один на батч.

Как exchange наполняется метаданными

CreateExchange кладёт тело (byte[]), переносит Kafka‑заголовки (декодируя как UTF‑8), восстанавливает ContentType, и проставляет метаданные:

message.Headers[KafkaHeaders.Topic]     = result.Topic;
message.Headers[KafkaHeaders.Partition] = result.Partition.Value;
message.Headers[KafkaHeaders.Offset]    = result.Offset.Value;
if (result.Message.Timestamp.Type != TimestampType.NotAvailable)
    message.Headers[KafkaHeaders.Timestamp] = result.Message.Timestamp.UtcDateTime;
if (!string.IsNullOrEmpty(result.Message.Key))
    message.Headers[KafkaHeaders.Key] = result.Message.Key;

var exchange = Exchange.Create(message, _endpoint.ScopeFactory);
exchange.Pattern = ExchangePattern.InOnly;   // консьюмер — InOnly

Что делает консьюмер, пока сообщение идёт по маршруту

Разберём по шагам ProcessSingleMessage, потому что это объясняет и про коммит:

  1. Consume(pollCt) — забрал одно сообщение (в одиночном режиме поллинг блокирующий: следующее не заберётся, пока текущее не отработает);

  2. CreateExchange — собрал exchange;

  3. IncrementInflight() — пометил сообщение как in‑flight (это нужно DrainableConsumer для graceful‑остановки: при стопе он дождётся всех in‑flight);

  4. зарегистрировал KafkaCommitAction в TRANSACT_ACTION — но пока не закоммитил;

  5. await Processor.Process(exchange) — прогнал сообщение через весь маршрут (синхронно ждёт завершения);

  6. ProcessedCount++;

  7. если EnableAutoCommit (дефолт true) и оффсет ещё не закоммитила транзакция — коммитит инлайн прямо здесь;

  8. finallyexchange.DisposeAsync() + DecrementInflight().

Ключевой момент: по умолчанию (EnableAutoCommit=true, с 3.2.1) консьюмер коммитит оффсет инлайн после успешного Process — at‑least‑once, как и RabbitMQ. Регистрация KafkaCommitAction в TRANSACT_ACTION нужна для другого случая — когда оффсет должна закоммитить транзакция: тогда .Transacted() коммитит его на своей границе, а флаг Committed на действии не даёт консьюмеру закоммитить второй раз инлайн.

В батч‑режиме то же самое, только Process получает один exchange с List<IMessage> в теле, а коммитится (инлайн или транзакцией) последний оффсет батча.

Как коммитится оффсет: дефолт и транзакционный режим

С 3.2.1 поведение по умолчанию простое и совпадает с кроликом: обработал — закоммитил.

// ✅ Дефолт (EnableAutoCommit=true): оффсет коммитится инлайн после успешного Process.
//    Голый консьюмер настраивать не нужно — at-least-once из коробки.
From("kafka:orders?brokers=kafka:9092&groupId=w")
    .To("direct:process");

Транзакционный режим нужен, когда оффсет надо закоммитить вместе с другой работой (отложенные отправки в Kafka/Redis, redb‑запись) — атомарно на границе маршрута. Тогда выключаешь авто‑коммит и оборачиваешь в транзакцию: коммит оффсета становится частью пачки TRANSACT_ACTION.

// ✅ Транзакционно: оффсет + отложенные отправки коммитятся вместе на границе .Transacted().
//    enableAutoCommit=false → инлайн-коммита нет, оффсет коммитит транзакция.
From("kafka:orders?brokers=kafka:9092&groupId=w&enableAutoCommit=false")
    .Transacted()
        .To("kafka:orders.done?brokers=kafka:9092&transacted=true")
    .EndTransaction();

// ✅ Императивно — то же самое через явный commit.
From("kafka:orders?brokers=kafka:9092&groupId=w&enableAutoCommit=false")
    .BeginTransaction()
    .To("direct:process")
    .CommitTransaction();

Если оставить enableAutoCommit=true (дефолт) и при этом обернуть в .Transacted() — транзакция имеет приоритет: она коммитит оффсет на границе, а флаг Committed на KafkaCommitAction не даёт консьюмеру продублировать коммит инлайн. То есть в транзакционном маршруте опция де‑факто игнорится.

Кстати, в демках redb.Route.Demo/Routes Kafka используется только как продюсер (.WireTap(KafkaWireTap) на топик demo-audit в MainPipelineRoutes.cs), консьюмера там нет — поэтому боевой паттерн consume‑process‑produce смотри в разделе про транзакции.


6. Заголовки и трейсинг сквозь брокер

Полный справочник заголовков — KafkaHeaders. Префикс у всех — redbKafka.:

Заголовок

Кто ставит

Что значит

redbKafka.Topic

консьюмер

топик прочитанного сообщения

redbKafka.Partition

консьюмер

партиция

redbKafka.Offset

консьюмер

оффсет в партиции

redbKafka.Timestamp

консьюмер

таймстемп записи

redbKafka.Key

консьюмер

ключ (если был)

redbKafka.BatchSize

консьюмер

размер батча (батч‑режим)

redbKafka.Sent.Topic

продюсер

топик, куда отправили (при recordMetadata)

redbKafka.Sent.Partition

продюсер

партиция назначения

redbKafka.Sent.Offset

продюсер

присвоенный оффсет

redbKafka.Sent.Timestamp

продюсер

время отправки

В маршруте к ним обращаешься как ${header.redbKafka.Offset}:

From("kafka:orders?brokers=kafka:9092&groupId=w")
    .Log("offset=${header.redbKafka.Offset} partition=${header.redbKafka.Partition} key=${header.redbKafka.Key}");

Эти redbKafka.* при ре‑публикации не пробрасываются в исходящее Kafka‑сообщение (фильтр IsRedbHeader в продюсере) — чтобы метаданные одного хопа не утекали в следующий.


7. Честно про transacted=true

Вот место, где маркетинг любит сказать «exactly‑once». Открываем код.

Что делает transacted=true (из BuildProducerConfig):

if (Transacted)
{
    config.EnableIdempotence = true;
    config.Acks              = Acks.All;   // обязательно для идемпотентного продюсера
}

И всё: transactional.id не ставится, InitTransactions / BeginTransaction / CommitTransaction / SendOffsetsToTransaction — нигде. Транзакционный API librdkafka не задействован.

Что это значит по факту:

  • «transacted» здесь = идемпотентный продюсер (EnableIdempotence=true, дедуп ретраев в рамках сессии) + отложенная отправка (через KafkaSendAction, реальный ProduceAsync на границе маршрутной транзакции);

  • это НЕ Kafka‑EOS поверх consume‑process‑produce: нет атомарной Kafka‑транзакции через SendOffsetsToTransaction;

  • краш ровно между отправкой и коммитом оффсета на рестарте даст повтор (at‑least‑once), а не exactly‑once.

Поэтому честная формулировка — «идемпотентность + deferred‑commit на уровне роута», а не exactly‑once. Для большинства задач «не потерять сообщение и не закоммитить оффсет раньше, чем сделали работу» этого хватает; настоящий read‑process‑write EOS (BeginTransaction / SendOffsetsToTransaction / CommitTransaction) — отдельная история, и её тут нет.

Апач Кемел, к слову, EOS через transacted() тоже не даёт: там авто‑коммит по умолчанию + ручной commit (KafkaManualCommit) + idempotent‑repo для дедупа — та же модель. Так что это не «недо‑Kafka», а ровно тот же честный размен, что и у зрелого интеграционного фреймворка.

Теперь, когда Kafka разобран, можно ставить на него EIP.


8. EIP #1 — Scatter‑Gather: один процессор, и разослал, и собрал

Классический Scatter‑Gather из книги Хопе/Вульфа — это «разослать запрос N получателям и собрать их ответы в один». В redb.Route это один процессор ScatterGatherProcessor, у которого агрегатор обязательный, а параллельность включена по умолчанию.

Самый текстовый вид — Kafka на входе, fan‑out по трём сервисам, склейка, публикация результата:

From("kafka:orders.incoming?brokers=kafka:9092&groupId=enricher&autoOffsetReset=earliest")
    .RouteId("order-enrich")
    .ScatterGather(
        // (накопленное, текущее) → склеенное. Вызывается попарно.
        aggregationStrategy: (acc, cur) =>
        {
            var merged = (acc.In.Headers.TryGetValue("merged", out var m) ? m as string : "") ?? "";
            acc.In.Headers["merged"] = merged + cur.In.Body;
            return acc;
        },
        "http://pricing:8080/quote",
        "http://inventory:8080/check",
        "http://fraud:8080/score")
    .To("kafka:orders.enriched?brokers=kafka:9092&acks=All");

ℹ️ Здесь без транзакции, и это нормально: по умолчанию (3.2.1) входной оффсет коммитится инлайн после успешной обработки. Транзакция нужна, только если хочешь атомарную связку «коммит оффсета + публикация результата» — тогда enableAutoCommit=false + .Transacted() (см. §5).

Реалистичная оговорка про Kafka. Scatter‑Gather собирает ответы, поэтому его естественные цели разсылки — request/reply эндпоинты: HTTP, gRPC, SQL‑SELECT. Kafka‑продюсер fire‑and‑forget, «собирать» с него по сути нечего, кроме метаданных доставки. Поэтому Kafka здесь живёт по краям: источник (From("kafka:...")) и сток (To("kafka:...")), а fan‑out внутри идёт по сервисам. Так оно и работает в бою.

Что под капотом: параллельный путь

private async Task ProcessParallel(IExchange exchange, IReadOnlyList<string> recipients, CancellationToken ct, CancellationToken callerCt)
{
    var clones = new IExchange?[recipients.Count];
    var maxDop = _maxDegreeOfParallelism > 0 ? _maxDegreeOfParallelism : Environment.ProcessorCount;
    using var semaphore = new SemaphoreSlim(maxDop);

    async Task<IExchange> SendToRecipient(int index)
    {
        await semaphore.WaitAsync(ct);
        try
        {
            var clone = exchange.Clone();                 // ← Clone(): Properties копируются поверхностно
            clones[index] = clone;
            var producer = GetOrCreateProducer(recipients[index]);  // кеш продюсеров по URI
            await producer.Process(clone, ct);
            return clone;
        }
        finally
        {
            if (clones[index] != null)
                await clones[index]!.ReleaseScopes();      // отпускаем DI-скоупы рано, тело/заголовки живут для агрегации
            semaphore.Release();
        }
    }

    var tasks = Enumerable.Range(0, recipients.Count).Select(SendToRecipient).ToList();
    var results = await Task.WhenAll(tasks);   // (в stopOnException; в best-effort — обёрнуто в try/catch на ветку)

    // Агрегация — в детерминированном порядке индексов, не в порядке прихода
    IExchange? aggregated = null;
    foreach (var result in results)
    {
        if (result == null) continue;
        aggregated = aggregated is null ? result : _aggregationStrategy(aggregated, result);
    }
    ApplyAggregation(exchange, aggregated);
}

Несколько вещей здесь критичны:

  • exchange.Clone() — каждый получатель работает на своём клоне. Clone() копирует Properties поверхностно (об этом — в разделе транзакций), что важно для согласованного коммита.

  • SemaphoreSlim(maxDop) — параллельность ограничена. MaxDegreeOfParallelism=0 означает Environment.ProcessorCount.

  • Агрегация в порядке индексов. Даже если fraud ответил первым, склейка пойдёт pricing → inventory → fraud. Это предсказуемость, на которую можно закладываться.

  • ReleaseScopes() рано — DI‑скоупы клона отпускаются сразу после отправки, а тело и заголовки остаются живыми для агрегации.

  • Кеш продюсеров — GetOrCreateProducer хранит ConcurrentDictionary<string, Lazy<ToProcessor>>, на DisposeAsync все поднятые продюсеры останавливаются.

Последовательный путь

При ParallelProcessing(false) — другой код, и другой клон:

foreach (var uri in recipients)
{
    var clone = exchange.CloneLinked();        // ← CloneLinked(), не Clone()
    var producer = GetOrCreateProducer(uri);
    await producer.Process(clone, ct);
    aggregated = aggregated is null ? clone : _aggregationStrategy(aggregated, clone);
}

Разница Clone() vs CloneLinked() — не косметика, разберём в транзакциях.

Обработка ошибок: best‑effort vs stop‑on‑exception

StopOnException(false) (дефолт, best‑effort): упавшая ветка получает clone.Exception и всё равно идёт в агрегацию — стратегия сама решает, что делать с веткой, у которой выставлен Exception:

catch (Exception ex)
{
    if (_stopOnException) throw;
    clone.Exception = ex;
    aggregated = aggregated is null ? clone : _aggregationStrategy(aggregated, clone);
}

Таймаут в best‑effort: последовательный режим собирает частичное и выходит (break), параллельный — выкидывает протухшие ветки (null). StopOnException(true) — первая же ошибка пробрасывается, а таймаут оборачивается в TimeoutException:

catch (OperationCanceledException) when (_timeout > TimeSpan.Zero && !ct.IsCancellationRequested)
{
    throw new TimeoutException($"Scatter-gather timed out after {_timeout}.");
}

Все параметры

Полный контракт — IScatterGatherDefinition:

Параметр

По умолчанию

Что делает

Recipients(params string[])

Статический список URI получателей.

Recipients(Func<IExchange, IEnumerable<string>>)

Динамический — список вычисляется из сообщения в рантайме.

AggregationStrategy(...)

обязательна

Попарная (acc, cur) → merged. Без неё — ошибка на build.

ParallelProcessing(bool)

true

Параллельно или последовательно.

MaxDegreeOfParallelism(int)

0 → ProcessorCount

Потолок одновременных отправок.

StopOnException(bool)

false

best‑effort или fail‑fast.

Timeout(TimeSpan)

Zero (нет)

Общий дедлайн.

Полный флент‑вид со всеми ручками:

.ScatterGather(sg => sg
    .Recipients("http://pricing:8080/quote", "http://inventory:8080/check", "http://fraud:8080/score")
    .AggregationStrategy((acc, cur) => { /* ... */ return acc; })
    .ParallelProcessing(true)
    .MaxDegreeOfParallelism(3)
    .Timeout(TimeSpan.FromSeconds(2))
    .StopOnException(false))

Динамические получатели

Список получателей можно вычислять из сообщения — например, веер по шардам, номера которых пришли в заголовке:

.ScatterGather(sg => sg
    .Recipients(e =>
    {
        var shards = (e.In.Headers["shards"] as string ?? "").Split(',');
        return shards.Select(s => $"http://shard-{s.Trim()}:8080/query");
    })
    .AggregationStrategy((acc, cur) => MergeJson(acc, cur)))

Это уже почти Dynamic Recipient List внутри Scatter‑Gather — то, ради чего EIP‑примитивы и существуют.

Боевой пример: один HTTP‑запрос → шесть параллельных агрегаций

Хватит синтетики — вот реальный прод‑маршрут (дашборд мониторинга логистики). Эндпоинт POST /api/tsum/routes отдаёт страницу маршрутов плюс пять разных агрегатных блоков для виджетов. Раньше это были бы шесть последовательных запросов к БД; здесь — один Scatter‑Gather:

From("http:0.0.0.0:5090/api/tsum/routes?inOut=true&cors=true")
    .RouteId("tsum-api-routes")
    .Process(Auth.ProcessAsync)                 // аутентификация
    .ConvertBody<string>()
    .Process(ParseAndStashFilter)               // распарсили фильтр → в Properties
    .ScatterGather(sg => sg
        .Recipients(
            "direct://routes-page",             // страница результатов (пагинация)
            "direct://routes-ownership",        // агрегат: свои/наёмные × статус загрузки
            "direct://routes-route-status",     // агрегат: по статусам маршрутов
            "direct://routes-point-status",     // агрегат: по статусам точек
            "direct://routes-territory",        // агрегат: сколько машин на территории
            "direct://routes-departure")        // агрегат: статистика отправлений
        .AggregationStrategy(MergeRouteFragments)
        .ParallelProcessing(true)
        .MaxDegreeOfParallelism(4)
        .StopOnException(true)
        .Timeout(TimeSpan.FromSeconds(30)))
    .Process(ComposeRoutesResponse);            // собрали финальный JSON из фрагментов

Каждая ветка — отдельный direct://‑маршрут со своим redb‑запросом, который кладёт свой кусок ответа в frag:*‑проперти (и заодно свою метрику времени):

From("direct://routes-ownership")
    .ProcessWithRedb(async (redb, ex, ct) =>
    {
        var sw = Stopwatch.StartNew();
        var query = await BuildFilteredQuery(redb, Filter(ex));   // тот же фильтр, что и у всех веток
        var groups = await query
            .GroupBy(r => new { r.Own, r.LoadStatus })            // серверная агрегация в redb
            .SelectAsync(g => new { g.Key.Own, g.Key.LoadStatus, Count = Agg.Count(g) });

        ex.Properties["frag:ownership"]      = BuildOwnershipFragment(groups);
        ex.Properties["metric:ownershipMs"]  = sw.ElapsedMilliseconds;
    });

Агрегатор просто переносит frag:* и metric:* из клона каждой ветки в аккумулятор:

private static IExchange MergeRouteFragments(IExchange aggregated, IExchange current)
{
    foreach (var kv in current.Properties)
        if (kv.Key.StartsWith("frag:") || kv.Key.StartsWith("metric:"))
            aggregated.Properties[kv.Key] = kv.Value;   // фрагмент ветки → в общий результат
    return aggregated;
}

А ComposeRoutesResponse уже собирает из шести frag:* финальный JSON.

Почему это «крайне быстро». Шесть веток бегут параллельно (потолок 4 одновременно). Латентность эндпоинта = самая медленная ветка, а не сумма шести. Если каждая агрегация ~80–150 мс, последовательно вышло бы ~0.6–0.9 с, а через Scatter‑Gather — ~150 мс. Один запрос фронта → один HTTP‑хоп → шесть серверных redb‑агрегаций разом → один JSON.

И тут всё, что мы разбирали про транзакции, сходится на практике: каждая ветка через ProcessWithRedb поднимает свой per‑exchange redb‑скоуп → свой коннект (см. §10). Шесть параллельных запросов идут по шести коннектам, не делят одну транзакцию — и поэтому не падают. Никакого .Transacted() тут не нужно: ветки только читают, склейка идёт по frag:* в памяти. StopOnException(true) означает «виджет не отдадим кривым» — если любая ветка упала, весь ответ — ошибка, а не полу‑данные.


9. EIP #2 — Aggregator: сборка во времени

Scatter‑Gather собирает ответы здесь и сейчас (веер → джойн). Aggregator — другой паттерн: он собирает независимые сообщения во времени по correlation‑ключу и отдаёт склейку, когда сработал предикат завершения.

Контракт AggregatorProcessor — четыре вещи: correlationKeyaggregationStrategycompletionPredicate, target. Ядро:

public async Task Process(IExchange exchange, CancellationToken ct = default)
{
    var key = _correlationKey(exchange);
    IExchange? completed = null;

    lock (_lock)
    {
        if (_aggregated.TryGetValue(key, out var existing))
        {
            var merged = _aggregationStrategy(existing, exchange);
            _aggregated[key] = merged;
            if (_completionPredicate(merged))     // готова ли группа?
            {
                completed = merged;
                _aggregated.Remove(key);
            }
        }
        else
        {
            _aggregated[key] = exchange;          // первая в группе
            if (_completionPredicate(exchange)) { completed = exchange; _aggregated.Remove(key); }
        }
    }

    if (completed != null)
        await _target.Process(completed, ct);     // наружу уходят только завершённые группы
}

Живой пример из демо EipRoutes.cs — собираем по 3 события с одинаковым batchId:

From("timer://agg-source?period=2000&repeatCount=9")
    .RouteId("demo-aggregator")
    .SetHeader("batchId", e => $"batch-{DateTime.UtcNow.Second % 3}")
    .SetBody(e => $"event-{DateTime.UtcNow:ss.fff}")
    .Aggregate(
        correlationKey:      e => GetHeader(e, "batchId") ?? "default",
        aggregationStrategy: (oldEx, newEx) =>
        {
            oldEx.In.Body = $"{oldEx.In.Body} + {newEx.In.Body}";
            var count = oldEx.Properties.TryGetValue("agg.count", out var c) ? (int)c! : 1;
            oldEx.Properties["agg.count"] = count + 1;
            return oldEx;
        },
        completionPredicate:  e => e.Properties.TryGetValue("agg.count", out var c) && (int)c! >= 3)
    .Log("[AGG] ✔ Собрали 3 события: ${body}");   // выполняется на завершённой группе

.Aggregate(...) открывает scope (AggregateDefinition) — шаги после неё (.Log(...)) строят target‑пайплайн, который выполняется на завершённых группах. Сообщения до завершения «съедаются молча».

На Kafka это классический «collect N by key»:

From("kafka:order-lines?brokers=kafka:9092&groupId=order-assembler")
    .Aggregate(
        correlationKey:      e => e.In.Headers["redbKafka.Key"]?.ToString() ?? "x",  // ключ = orderId
        aggregationStrategy: (acc, cur) => AppendLine(acc, cur),
        completionPredicate: e => IsOrderComplete(e))
    .To("kafka:orders.assembled?brokers=kafka:9092&acks=All");

ℹ️ По умолчанию входной оффсет коммитится после обработки. Для атомарной связки «оффсет + публикация собранного результата» — enableAutoCommit=false + .Transacted().

Жёсткое ограничение, которое надо назвать вслух

Хранилище групп — обычный Dictionary<string, IExchange> под lock:

private readonly Dictionary<string, IExchange> _aggregated = new(StringComparer.Ordinal);
private readonly object _lock = new();

Из этого следует:

  • теряется при рестарте процесса — все недозавершённые группы исчезают;

  • нет таймаута/эвикции групп — только предикат завершения. Группа, которая никогда не доберёт до условия (ждём 3 события, пришло 2, источник умер), висит в памяти вечно. Это потенциальная утечка, если ключей много и завершаются они не всегда.

Это не персистентный recovery‑аггрегатор как в «большом» Camel (с completion timeout, persistent repository, recovery). Для сценария «собрать N по ключу и эмитнуть пачку, потеря при деплое допустима» — отлично. Для «копить сутки и не потерять при рестарте» — нужен персист, которого здесь нет. Честно. Есть PendingGroupCount для мониторинга числа висящих групп — хотя бы видно, что копится.


10. Транзакции: две модели, и в этом вся соль

Вот ключевая развилка. В redb.Route две разные транзакционные модели, и коннекторы делятся на два лагеря. Понимание этой развилки — половина грамотной работы с фреймворком.

Лагерь 1: отложенные действия (ITransactedAction) — НЕ лезут в System.Transactions

Сюда входят все брокеры и redb: Kafka, Redis, RabbitMQ, AMQP, IBM MQ, Azure Service Bus и сам redb. Механика одна на всех. Транспорт не делает «настоящую» работу сразу — он кладёт ITransactedAction в общий словарь exchange.Properties["TRANSACT_ACTION"] (это ConcurrentDictionary) с уникальным на каждое сообщение ключом:

private static void RegisterTransactedAction(IExchange exchange, string key, ITransactedAction action)
{
    if (!exchange.Properties.TryGetValue("TRANSACT_ACTION", out var raw) ||
        raw is not ConcurrentDictionary<string, ITransactedAction> dict)
    {
        dict = new ConcurrentDictionary<string, ITransactedAction>(StringComparer.OrdinalIgnoreCase);
        exchange.Properties["TRANSACT_ACTION"] = dict;
    }
    dict[key] = action;
}

Ключи — kafka-send-{guid}redis-write-{guid}redb:{name}, оффсет, deliveryTag. Уникальность критична: параллельные ветки fan‑out пишут в один общий словарь (Clone() копирует Properties поверхностно → словарь у всех клонов общий), и без уникальных ключей они бы затирали друг друга.

На границе .Transacted() за дело берётся TransactedProcessor:

public async Task Process(IExchange exchange, CancellationToken ct = default)
{
    if (!exchange.Properties.ContainsKey(TransactActionPropertyKey))
        exchange.Properties[TransactActionPropertyKey] = new ConcurrentDictionary<string, ITransactedAction>(...);

    using var scope = _policy.CreateScope();   // System.Transactions.TransactionScope (AsyncFlow enabled)
    try
    {
        await _inner.Process(exchange, ct);
        await CommitActions(exchange, ct);      // коммитим ВСЕ отложенные действия
        scope.Complete();
    }
    catch (Exception ex)
    {
        await RollbackActions(exchange, ct);    // на ошибке — откатываем все
        throw;
    }
}

CommitActions проходит словарь последовательно и коммитит каждое действие — это нам ещё аукнется в трейд‑оффе ниже:

foreach (var kvp in actions)
    await kvp.Value.Commit(ct);

Эти транспорты Transaction.Current не трогают вообще. Поэтому параллельный fan‑out по ним безопасен: уникальные ключи → нет коллизий в словаре, нет конкурентного enlistment в одну транзакцию System.Transactions.

redb в этот лагерь входит через RedbTransactedAction — он оборачивает redb‑нативную IRedbTransaction (свой BEGIN/COMMIT на своём коннекте) и кладёт её в тот же словарь под ключом redb:{name}:

public async Task Commit(CancellationToken ct = default)
{
    if (Interlocked.Exchange(ref _completed, 1) != 0) return;  // single-use
    try { if (_tx.IsActive) await _tx.CommitAsync(); }
    finally { await _tx.DisposeAsync(); }
}

Лагерь 2: SQL — энлистится в System.Transactions

SQL‑коннектор ведёт себя принципиально иначе (SqlProducer):

var connection = await factory.CreateConnectionAsync(readOnly: ..., ct);

// Если есть ambient TransactionScope (route-level .Transacted()), коннект авто-энлистится —
// локальная транзакция не нужна. Иначе всегда оборачиваем в локальную (как EF SaveChanges).
var hasAmbientTx = Transaction.Current != null;
DbTransaction? tx = null;
if (!hasAmbientTx)
    tx = await connection.BeginTransactionAsync(ct);
// ... выполняем команду ...
if (tx != null) await tx.CommitAsync(ct);

То есть:

  • нет ambient — каждое выполнение оборачивается в локальный DbTransaction на своём коннекте (атомарно, как SaveChanges в EF, без всякого ?transacted=true);

  • есть ambient Transaction.Current (его открыл .Transacted() / .BeginTransaction()) → локальную пропускает, коннект авто‑энлистится в ambient TransactionScope.

SQL — единственный in‑box коннектор, который реально участвует в System.Transactions. Это и хорошо (одна .Transacted() обёртка вокруг нескольких SQL‑записей = одна атомарная транзакция, идеально для outbox‑write), и требует понимания границ.

Почему параллельный Scatter‑Gather не падает под транзакцией

Логичный страх: параллельные ветки бегут на разных потоках, Transaction.Current через ExecutionContext течёт в каждую — не будет ли конкурентного использования одной транзакции и промоута в MSDTC (который на PG/Linux просто кинет)?

Нет. И вот почему — по коду:

1. Скоупы делаются per‑exchange. Именованный redb резолвится через GetRedbService(name, exchange) (RedbRouteExtensions), который кладёт DI‑scope в exchange.Properties["__redb_scope:{name}"] и тянет сервис из провайдера этого скоупа:

var cacheKey = ScopeCachePrefix + cleanName;          // "__redb_scope:orders-db"
if (exchange.Properties.TryGetValue(cacheKey, out var cached) && cached is IServiceScope cachedScope)
    return cachedScope.ServiceProvider.GetRequiredService<IRedbService>();

var scope = factory.CreateScope();                    // нет в кеше — создаём свой
exchange.Properties[cacheKey] = scope;
return scope.ServiceProvider.GetRequiredService<IRedbService>();

2. Clone() намеренно НЕ копирует скоупы. Из Exchange.cs:

foreach (var kvp in _properties)
{
    // Именованные redb-скоупы — per-exchange; ребёнок создаст свой при первом обращении.
    if (kvp.Key.StartsWith("__redb_scope:", StringComparison.Ordinal))
        continue;
    clone._properties[kvp.Key] = kvp.Value;
}

Значит каждая параллельная ветка поднимает свой скоуп → свой IRedbService → свой коннект.

3. redb вообще не энлистится в System.Transactions — он использует свою IRedbTransaction, положенную в TRANSACT_ACTION.

Итог: нет общей System.Transactions‑транзакции, которую несколько коннектов дёргают конкурентно → нет промоута в MSDTC, нет «transaction context in use by another thread», не падает. Разные коннекты, не одна транзакция — и в этом фишка, а не баг.

Для SQL — то же самое. В Scatter‑Gather без обёртки .Transacted() каждая ветка поднимает свой коннект (CreateConnectionAsync на каждый Process) и свой локальный DbTransaction → независимые атомарные записи → не падает. Упасть можно только если осознанно завернуть параллельный fan‑out в .Transacted() (ambient System.Transactions) и разослать по нескольким SQL‑коннектам: тогда они энлистятся в одну ambient‑транзакцию, и на PG/Npgsql это уедет в распределённую и кинет. Это узкий край, в который просто не лезут — для брокеров/redb есть deferred‑модель.

А DependentTransactionBranch из 3.2.0 — это про что тогда?

Тот самый 3.2.0‑фикс изоляции ambient‑транзакции по ветке применяется только в Multicast и Splitter (плюс сам хелпер) — больше нигде. Вот его ядро (DependentTransactionBranch):

internal static async Task RunAsync(Func<Task> branch)
{
    var ambient = Transaction.Current;
    if (ambient is null) { await branch(); return; }   // нет транзакции — ноль накладных

    // Форкаем dependent-клон: enlistment этой ветки приватен её потоку.
    var dependent = ambient.DependentClone(DependentCloneOption.BlockCommitUntilComplete);
    using (var scope = new TransactionScope(dependent, TransactionScopeAsyncFlowOption.Enabled))
    {
        await branch();
        scope.Complete();
    }
    dependent.Complete();   // разрешаем родительский коммит (он ждал BlockCommitUntilComplete)
}

Он нужен только для ресурсов, которые реально энлистятся в Transaction.Current — то есть для случая, когда параллельные ветки Multicast/Splitter делают inline‑работу на энлистящемся коннекте. Scatter‑Gather в нём не нуждается: per‑exchange‑скоупы + deferred ITransactedAction уже дают изоляцию без шаринга коннекта. Это не асимметрия‑недоработка — это два механизма под два разных случая.

Честный трейд‑офф (это не баг, это дизайн)

TransactedProcessor.CommitActions коммитит отложенные действия последовательно в цикле, а ветки Scatter‑Gather/Multicast — это разные коннекты, разные транзакции. Значит: если ветка A закоммитилась, а коммит ветки B бросил — получишь частичный коммит. Кросс‑веточной атомарности на уровне БД здесь нет. Это плата за «не падает, не промоутится в MSDTC». Нужна реальная кросс‑веточная атомарность с enlistment — это Multicast/Splitter под .Transacted() с тем самым 3.2.0‑фиксом, не Scatter‑Gather.

Политики транзакций

.Transacted() под капотом использует TransactionPolicy — четыре готовых:

Политика

ScopeOption

Поведение

Default

Required

Присоединиться к ambient или создать новую. Таймаут 30с, ReadCommitted.

RequiresNew

RequiresNew

Всегда новая транзакция, ambient подвешивается.

Suppress

Suppress

Выполнить без транзакции (ambient подавляется).

Mandatory

(маркер)

Требует существующую ambient, иначе InvalidOperationException на создании scope.

TransactionDefinition ещё даёт Camel‑parity хуки: .Retry(attempts, delay) (оборачивает тело в RetryProcessor) и .DeadLetterChannel(uri) (на провале после отката шлёт exchange в DLC). То есть транзакционный блок с ретраями и dead‑letter — это:

From("kafka:orders?brokers=kafka:9092&groupId=w")
    .Transacted()
        .Retry(3, TimeSpan.FromMilliseconds(200))
        .To("sql:INSERT INTO orders ...")
        .To("kafka:orders.done?brokers=kafka:9092&transacted=true")
    .EndTransaction();

11. Родня по параллели: Splitter и Multicast

Scatter‑Gather — не единственный fan‑out в семействе. Рядом — Splitter (разбить тело на части и обработать каждую) и Multicast (послать копию N процессорам). У обоих, как и у Scatter‑Gather, есть опциональная агрегация, параллельность и stopOnException. Разница — в источнике «веток»:

  • Scatter‑Gather: ветки = эндпоинты (URI продюсеров), агрегатор обязателен.

  • Multicast: ветки = процессоры/суб‑пайплайны, агрегатор опционален (MulticastProcessor).

  • Splitter: ветки = части тела (SplitterProcessor), агрегатор опционален, и есть нюансы Camel‑совместимости.

Splitter, кстати, — самый «навороченный» по агрегации: его стратегия (IExchange?, IExchange) → IExchange вызывается даже для первой части с oldExchange == null (Camel‑контракт seed/wrap), и есть флаги parallelAggregate (агрегировать инлайн под локом из воркеров вместо детерминированного post‑pass) и aggregateOnException (включать упавшие части в агрегат). И именно у Splitter/Multicast в параллельном пути стоит тот самый DependentTransactionBranch.RunAsync:

// MulticastProcessor.ProcessParallel / SplitterProcessor.ProcessParallel
await DependentTransactionBranch.RunAsync(() => _targets[idx].Process(clone, ct));

Почему у них есть, а у Scatter‑Gather нет — разобрали выше: Splitter/Multicast гоняют inline‑процессоры (которые могут открыть энлистящийся коннект прямо в ветке), а Scatter‑Gather гоняет продюсеры по URI с per‑exchange скоупами. Разные риски — разные механизмы.

Простой Multicast из демо:

From("direct://demo-multicast")
    .Multicast(new[] { "direct://mcast-a", "direct://mcast-b", "direct://mcast-c" }, parallelProcessing: true)
    .Log("[MCAST] ◀ Все получатели обработали сообщение");

12. Saga — «немного другая»

Saga в redb.Route есть, но это не MassTransit'овский durable state‑machine. Это SagaProcessor — in‑process компенсация в рамках одного exchange:

public async Task Process(IExchange exchange, CancellationToken ct = default)
{
    var completedCount = 0;
    try
    {
        for (var i = 0; i < _steps.Length; i++)
        {
            ct.ThrowIfCancellationRequested();
            await _steps[i].Action(exchange, ct);
            completedCount++;
        }
    }
    catch (Exception ex) when (ex is not OperationCanceledException)
    {
        // Компенсации завершённых шагов — в ОБРАТНОМ порядке
        for (var i = completedCount - 1; i >= 0; i--)
        {
            if (_steps[i].Compensate is null) continue;
            try { await _steps[i].Compensate!(exchange, ct); }
            catch (Exception compEx) { _logger?.LogError(compEx, "Saga compensation for step {i} failed", i); }
        }
        throw;
    }
    if (_onCompletion is not null) await _onCompletion(exchange, ct);
}

Падения компенсаций логируются и не рвут откат остальных. DSL (SagaDefinition) — два стиля, callback и fluent‑scope:

From("direct:checkout")
    .Saga(s => s
        .Step(reserve, compensate: unreserve)
        .Step(charge,  compensate: refund)
        .Step(ship)                              // forward-only, без компенсации
        .OnCompletion(e => Log("заказ оформлен")));

Жёстко: нет персиста состояния, нет корреляции по сообщениям/времени, не переживает рестарт процесса. Это Camel‑стиль «routing slip с компенсацией», а не state‑machine с персистом. Совпадает только имя. Метрики, кстати, есть: SagaCompleted / SagaCompensated / SagaFailed.

И это, опять же, не недостаток — другая школа. Нужен durable‑saga с персистом поверх Kafka — собираешь из Aggregator + SQL/Redis‑стора + correlation‑ключа. Кубики на месте.


13. Outbox — его нет, и это правильно

Grep по всему src на Outbox — ноль. Встроенного outbox нет. И это не пробел, а позиция: transactional outbox — это паттерн, а не кнопка фреймворка. Лепить его в ядро — оверинжиниринг; кто хочет, соберёт свой за пять строк под свои инварианты:

// 1. Запись: бизнес-данные + строка в outbox под одной транзакцией.
//    Для SQL это ровно тот кейс, где System.Transactions-энлист работает на тебя:
//    одна .Transacted() обёртка → один ambient scope → атомарно.
From("direct:place-order")
    .Transacted()
        .To("sql:INSERT INTO orders(id, payload) VALUES (@id, @payload)")
        .To("sql:INSERT INTO outbox(id, topic, payload, sent) VALUES (@id, 'orders', @payload, false)")
    .EndTransaction();

// 2. Доставка: отдельный роут-поллер читает outbox и публикует.
From("sql:SELECT * FROM outbox WHERE sent = false ORDER BY id?outputType=SelectList&delay=1000")
    .Split(body => (IEnumerable<object?>)body)        // по строке на сообщение
        .To("kafka:orders?brokers=kafka:9092")
        .To("sql:UPDATE outbox SET sent = true WHERE id = @id");

Хочешь inbox‑паттерн, дедуп по messageId, TTL на строки, claim‑check для больших payload'ов — допишешь. Никто не навязывает свою схему outbox‑таблицы и свою семантику ретраев. Это и есть «идём к Camel»: меньше зашитой магии — больше явной сборки.


14. Итог: размен, а не «лучше/хуже»

MassTransit

redb.Route (Camel‑школа)

Saga

durable state‑machine, персист, переживает рестарт

in‑process компенсация в рамках exchange

Outbox

фича фреймворка

твой роут из SQL/Redis за пять строк

Транзакции

абстракция фреймворка

две явные модели: deferred ITransactedAction (брокеры+redb) / System.Transactions (SQL)

Kafka «transactional»

EOS из коробки

идемпотентный продюсер + deferred‑commit (не EOS)

Философия

батарейки в комплекте, делай как решили

EIP + коннекторы + DSL, композишь сам

Это не «redb.Route лучше MassTransit». Это другой размен: меньше зашитой магии — больше явной сборки из примитивов. Если твои сценарии не лезут в чужую модель saga/outbox — Camel‑подход даёт собрать ровно своё. Если лезут — может, тебе и не надо никуда уходить.

Scatter‑Gather, к слову, в проде у нас работает отлично — один процессор, который и разсылает, и собирает, параллельно, с предсказуемым порядком склейки и понятной транзакционной семантикой. Ровно тот случай, когда EIP‑примитив закрывает реальную задачу без единой строчки инфраструктурного кода.

Версия в проде — 3.2.0CHANGELOG здесь. Куда наступали, что перепроверить (привет, transacted‑Kafka), какие EIP разобрать дальше — в комментарии, разберём по коду.