Комментарии 3
Будет ли в следующих частях:
Как организовать ретраи или circuit-breaker? Что будет, если один из процессоров в стриме упадет с ошибкой?
К примеру, можно ли хотя бы в отдельный топик направить все сообщения, которые вызвали сбои топологии?
https://habr.com/ru/articles/747658/comments/#comment_25786932 спасибо @FruTbза наводку
Тут наверное не нужно искать решение в плоскости Kafka Streams, на мой взгляд
Как организовать ретраи или circuit-breaker? Что будет, если один из процессоров в стриме упадет с ошибкой?
Обработка ошибок уже зависит от бизнес потребностей, также там было упомянуто про akka, у меня тут к сожалению нет опыта, но стандартно обработать можно так
KStream<String, String> stream = builder.stream("input-topic");
stream.mapValues(value -> {
int maxRetries = 3;
int attempt = 0;
while (attempt < maxRetries) {
try {
// Ваша логика обработки
return process(value);
} catch (Exception e) {
attempt++;
if (attempt == maxRetries) {
// Логирование и обработка после максимального количества попыток
// Например, отправка в Dead Letter Topic
}
}
}
return null; // Или другое значение по умолчанию
});
но так же есть удобная библиотека https://github.com/resilience4j/resilience4j
import io.github.resilience4j.retry.Retry;
import io.github.resilience4j.retry.RetryConfig;
import io.github.resilience4j.circuitbreaker.CircuitBreaker;
import io.github.resilience4j.circuitbreaker.CircuitBreakerConfig;
https://github.com/resilience4j/resilience4j/tree/master/resilience4j-circuitbreaker
https://github.com/resilience4j/resilience4j/tree/master/resilience4j-retry
К примеру, можно ли хотя бы в отдельный топик направить все сообщения, которые вызвали сбои топологии?
Тут можно настроить обработчик исключений на уровне конфигурации Kafka Streams
Properties props = new Properties();
props.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
LogAndContinueExceptionHandler.class);
props.put(StreamsConfig.DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG,
DefaultProductionExceptionHandler.class);
или обрабатывать прям в коде
Обеспечение отказоустойчивости (resiliense and fault tolerance) идет от самой кафки. Т.е. самостоятельно организовывать в коде ничего не надо - только настраивать. А вот с обработкой ошибок не так всё не то, чтобы хорошо - скорее неудобно: необходимо либо их полностью обрабатывать в пределах отдельного этапа обработки, либо обрабатывать и разделять (split) дальнейший поток обработки. Это является одной из особенностей стримовой обработки сообщений в целом.
Kafka Streams ч1: Привет, мир