Как стать автором
Обновить

Комментарии 3

Будет ли в следующих частях:

Как организовать ретраи или circuit-breaker? Что будет, если один из процессоров в стриме упадет с ошибкой?

К примеру, можно ли хотя бы в отдельный топик направить все сообщения, которые вызвали сбои топологии?

https://habr.com/ru/articles/747658/comments/#comment_25786932 спасибо @FruTbза наводку

Тут наверное не нужно искать решение в плоскости Kafka Streams, на мой взгляд

Как организовать ретраи или circuit-breaker? Что будет, если один из процессоров в стриме упадет с ошибкой?

Обработка ошибок уже зависит от бизнес потребностей, также там было упомянуто про akka, у меня тут к сожалению нет опыта, но стандартно обработать можно так

KStream<String, String> stream = builder.stream("input-topic");

stream.mapValues(value -> {
    int maxRetries = 3;
    int attempt = 0;
    while (attempt < maxRetries) {
        try {
            // Ваша логика обработки
            return process(value);
        } catch (Exception e) {
            attempt++;
            if (attempt == maxRetries) {
                // Логирование и обработка после максимального количества попыток
                // Например, отправка в Dead Letter Topic
            }
        }
    }
    return null; // Или другое значение по умолчанию
});

но так же есть удобная библиотека https://github.com/resilience4j/resilience4j

import io.github.resilience4j.retry.Retry;
import io.github.resilience4j.retry.RetryConfig;

import io.github.resilience4j.circuitbreaker.CircuitBreaker;
import io.github.resilience4j.circuitbreaker.CircuitBreakerConfig;

https://github.com/resilience4j/resilience4j/tree/master/resilience4j-circuitbreaker
https://github.com/resilience4j/resilience4j/tree/master/resilience4j-retry

К примеру, можно ли хотя бы в отдельный топик направить все сообщения, которые вызвали сбои топологии?

Тут можно настроить обработчик исключений на уровне конфигурации Kafka Streams

Properties props = new Properties();
props.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
          LogAndContinueExceptionHandler.class);
props.put(StreamsConfig.DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG,
          DefaultProductionExceptionHandler.class);

или обрабатывать прям в коде

Обеспечение отказоустойчивости (resiliense and fault tolerance) идет от самой кафки. Т.е. самостоятельно организовывать в коде ничего не надо - только настраивать. А вот с обработкой ошибок не так всё не то, чтобы хорошо - скорее неудобно: необходимо либо их полностью обрабатывать в пределах отдельного этапа обработки, либо обрабатывать и разделять (split) дальнейший поток обработки. Это является одной из особенностей стримовой обработки сообщений в целом.

Зарегистрируйтесь на Хабре, чтобы оставить комментарий

Публикации