Search
Write a publication
Pull to refresh
513.08
OTUS
Развиваем технологии, обучая их создателей

Fast Lane / Slow Lane: разделение трафика через две очереди Kafka

Level of difficultyEasy
Reading time7 min
Views428

Привет, Хабр!

Сегодня мы рассмотрим Fast Lane / Slow Lane для Kafka: как одним росчерком кода защитить SLA‑критичный поток от толстых сообщений, не перекраивая пол‑стека и не устраивая зоопарк из очередей.

Kafka читает батчами и строго по порядку. Если впереди в логах стоит гигантский JSON, consumer обязан проглотить его прежде, чем добраться до маленького heartbeat. Лёгкие события застревают, медианное время обработки идёт в космос, SLA горит синим пламенем. Разнести трафик на fast lane и slow lane — самый прямой способ убрать взаимное влияние. Лёгкие события летят в приоритетный топик, тяжёлые отправляются в отдельный, медленный. Теоретически можно пытаться делать приоритизацию внутри одной очереди, но тогда упираемся в порядковую семантику Kafka и получаем latency‑капкан.

Топология на уровне брокера

Создаём два топика с разными настройками. Fast Lane держим с большим количеством партиций и жёстким лимитом на размер сообщения, Slow Lane — с меньшим количеством партиций, но с повышенным message.max.bytes. Пример Terraform‑модуля:

resource "kafka_topic" "events_fast" {
  name               = "events.input.fast"
  replication_factor = 3
  partitions         = 12

  config = {
    "max.message.bytes" = "1048576"   # 1 MB
    "retention.ms"      = "604800000" # 7 дней
  }
}

resource "kafka_topic" "events_slow" {
  name               = "events.input.slow"
  replication_factor = 3
  partitions         = 6

  config = {
    "max.message.bytes" = "8388608"   # 8 MB
    "retention.ms"      = "259200000" # 3 дня
  }
}

Продюсер

Первый вариант это решать по размеру сообщение/события:

@Service
@RequiredArgsConstructor
public class EventRouter {

    private final KafkaTemplate<String, byte[]> kafka;

    public void send(byte[] payload) {
        if (payload.length > 900_000) {          // > ~900 КБ
            kafka.send("events.input.slow", payload);
        } else {
            kafka.send("events.input.fast", payload);
        }
    }
}

Логика размазана по сервисам; если понадобится сложная классификация — придётся менять все продюсеры.

RecordInterceptor: централизуем роутинг

@Configuration
public class KafkaProducerConfig {

    @Bean
    public ProducerFactory<String, byte[]> producerFactory() {
        Map<String, Object> props = Map.of(
            ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrap,
            ProducerConfig.ACKS_CONFIG, "all",
            ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4", // дешёвое сжатие
            ProducerConfig.LINGER_MS_CONFIG, 5,
            ProducerConfig.BATCH_SIZE_CONFIG, 32_768
        );
        DefaultKafkaProducerFactory<String, byte[]> factory =
            new DefaultKafkaProducerFactory<>(props);

        factory.addPostProcessor((producer, tx) ->    // подключаем интерцептор
            producer.setInterceptor(new RoutingInterceptor()));
        return factory;
    }
}

public class RoutingInterceptor implements ProducerInterceptor<String, byte[]> {

    @Override
    public ProducerRecord<String, byte[]> onSend(ProducerRecord<String, byte[]> record) {
        byte[] payload = record.value();
        if (payload != null && payload.length > 900_000) {
            return new ProducerRecord<>("events.input.slow", record.key(), payload);
        }
        return new ProducerRecord<>("events.input.fast", record.key(), payload);
    }
}

Интерцептор прозрачен для бизнес‑кода: сервисы зовут kafkaTemplate.send("events.input", …) и ничего не знают про дорожки.

Kafka Streams для динамического бранчинга

Когда нужны более хитрые правила, например обогащение события метаданными или ML‑моделью, удобнее взять Kafka Streams:

@Bean
public Topology topology() {
    StreamsBuilder builder = new StreamsBuilder();

    KStream<String, Event> source = builder.stream("events.input");

    Predicate<String, Event> isSmall =
        (k, v) -> v.size() <= 900_000;
    Predicate<String, Event> isLarge =
        (k, v) -> v.size() > 900_000;

    KStream<String, Event>[] branches = source.branch(isSmall, isLarge); // fast / slow

    branches[0].to("events.input.fast");
    branches[1].to("events.input.slow");

    return builder.build();
}

Streams‑процесс на отдельном сервисе — и у продюсеров чистая боль, а правила меняются в одном месте.

Конфигурация consumer’ов

Быстрый контейнер

@Bean
public ConcurrentKafkaListenerContainerFactory<String, byte[]> fastFactory(
        ConsumerFactory<String, byte[]> base) {

    var f = new ConcurrentKafkaListenerContainerFactory<String, byte[]>();
    f.setConsumerFactory(base);
    f.getContainerProperties().setAckMode(AckMode.MANUAL_IMMEDIATE);
    f.setBatchListener(true);
    f.setConcurrency(6);

    // poll меньше 100 мс — реагируем быстро
    f.getContainerProperties().setIdleBetweenPolls(50);

    // после 1000 рекордов делаем commit
    f.setAckOnError(false);
    return f;
}

Медленный контейнер

@Bean
public ConcurrentKafkaListenerContainerFactory<String, byte[]> slowFactory(
        ConsumerFactory<String, byte[]> base) {

    var f = new ConcurrentKafkaListenerContainerFactory<String, byte[]>();
    f.setConsumerFactory(base);
    f.setBatchListener(true);
    f.setConcurrency(2);

    // читаем больше данных за один fetch
    Map<String, Object> props = f.getConsumerFactory().getConfigurationProperties();
    props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 2_097_152); // 2 MB
    props.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, 8_388_608); // 8 MB
    return f;
}

Параметры fetch.max.bytes и max.partition.fetch.bytes: для slow lane разрешаем больше, иначе толстый пакет не пролезет и consumer поймает RecordTooLargeException.

Слушатели с ручным ack

@Slf4j
@Service
public class FastListener {

    @KafkaListener(
        id = "fast-listener",
        topics = "events.input.fast",
        containerFactory = "fastFactory"
    )
    public void onFast(List<byte[]> messages, Acknowledgment ack) {
        messages.forEach(this::processFast);
        ack.acknowledge(); // commit offset сразу
    }
}

@Slf4j
@Service
public class SlowListener {

    @KafkaListener(
        id = "slow-listener",
        topics = "events.input.slow",
        containerFactory = "slowFactory"
    )
    public void onSlow(List<byte[]> messages, Acknowledgment ack) {
        for (byte[] m : messages) {
            try {
                processSlow(m);
            } catch (Exception ex) {
                sendToDlq(m, ex);
            }
        }
        ack.acknowledge();
    }
}

В slow lane часто нужна Dead Letter Queue. Сразу выделяем events.input.slow.dlq и не ломаем голову.

Динамический паузинг slow‑консьюмера

Когда fast lane начинает отставать, можно временно остановить slow listener, сохранив heartbeat, чтобы не получить ребаланс. Spring‑Kafka умеет это из коробки.

@Component
@RequiredArgsConstructor
public class SlowLaneThrottler {

    private final KafkaListenerEndpointRegistry registry;

    // запускаем из Spring‑Scheduler
    @Scheduled(fixedDelay = 30_000)
    public void controlSlowLane() {
        MessageListenerContainer fast = registry.getListenerContainer("fast-listener");
        MessageListenerContainer slow = registry.getListenerContainer("slow-listener");

        long lagFast = lag("events.input.fast");
        boolean overloaded = lagFast > 10_000;

        if (overloaded && !slow.isPauseRequested()) {
            slow.pause();
            log.warn("Slow lane paused, fast lag={}", lagFast);
        } else if (!overloaded && slow.isPauseRequested()) {
            slow.resume();
            log.info("Slow lane resumed");
        }
    }
}

Метод lag берёт данные из JMX/Prometheus; реализацию опустим ради краткости.

Dynamic Throttle API

Начиная с Kafka 3.3 можно отдавать broker‑side throttle через Admin API: ограничиваем скорость отдачи для конкретных consumer‑групп. В Spring‑Kafka это делается так:

@Bean
public KafkaAdmin.NewPartitions throttleGroup() {
    return (admin) -> admin.alterConsumerGroupOffsets(
            "slow-consumer-group",
            Map.of(new TopicPartition("events.input.slow", 0),
                   new OffsetAndMetadata(0L)),
            new AlterConsumerGroupOffsetsOptions().timeoutMs(5_000)
                    .throttle(512 * 1024)); // 512 KiB/s
}

Такая мера включается по алерту и почти не требует остановки приложения.

Метрики и алерты

Подключаем Micrometer: management.metrics.enable.kafka: true — и получаем пачку готовых метрик. Главное: лейблы client.id, topic. Вывешиваем в Grafana два графика:

  • kafka_consumer_records_lag_max{topic="events.input.fast"}

  • kafka_consumer_records_lag_max{topic="events.input.slow"}

Держим fast‑lag < 1000, slow‑lag < 100 000. Алерт: если fast‑lag > 10 000 за 5 минут — пауза slow‑консьюмера и Slack‑уведомление.

Некоторые ошибки

Часто промахиваются на этапе тюнинга fast‑lane: оставляют одинаковое значение max.poll.records для обоих слушателей, и быстрый consumer захлёбывается, потому что на него ложится объём батча, рассчитанный на slow‑lane; отсюда лавинообразный рост лагов. Вторая типовая оплошность это отправлять события без сжатия: когда payload приближается к порогу 1 MB, брокер отклоняет запись, а продюсер отвечает ошибкой RecordTooLarge, хотя проблему решали бы две строчки compression.type=lz4.

Другая пара ошибок связана с ресурсными лимитами. Если забыть про quota, «медленный» consumer при высоком fetch.max.bytes идёт быстрее «быстрого» и съедает пропускную способность брокера, сводя на нет всю идею приоритизации. И, наконец, retention для slow‑topic: день хранения выглядит разумно, пока ночью не прилетит пик крупных событий; если retention меньше фактической волны, самые важные сообщения исчезнут до обработки, и восстановить их будет некуда.


Итог

Если вы уже внедряли подобную схему разделения трафика или пошли другим путём — делитесь опытом в комментариях. Интересно посмотреть и обсудить, как вы решали проблему приоритизации событий в Kafka.

Если вы работаете с высоконагруженными системами и интересуетесь архитектурными подходами, приглашаем вас на два открытых урока курса Highload Architect:

  • 12 августа в 20:00 — «Мониторинг в высоконагруженных проектах»
    Разберём, как выстроить наблюдаемость системы под постоянной нагрузкой: какие метрики важны, как быстро выявлять узкие места и реагировать на инциденты до того, как они станут проблемой.

  • 20 августа в 20:00 — «Wasm на сервере в высоконагруженных системах»
    Поговорим о применении WebAssembly на серверной стороне. Вы узнаете, зачем его интегрируют в высоконагруженные сервисы, какие преимущества даёт песочница и как подход влияет на производительность.

Кроме того, вы можете пройти тестирование, чтобы проверить свои знания и навыки в области высоконагруженных систем.

Tags:
Hubs:
+2
Comments0

Articles

Information

Website
otus.ru
Registered
Founded
Employees
101–200 employees
Location
Россия
Representative
OTUS