Как стать автором
Поиск
Написать публикацию
Обновить
7
0
Temirlan Amanbayev @temirlan100

Staff Software Engineer с 12+ лет опытом работы

Отправить сообщение

Точно не знаю как это делать в Kafka Streams, я лично такие вещи отлавливаю и уже ручками делаю нужную обработку удаляю окно

спасибо, согласен с вами, про этот аспект упустил в статье

многое зависит от объемов данных, если позволяют + в эко системе Apache Kafka и команда готова поддерживает то почему бы и нет для Kafka Streams, но думаю при больших объемах возможно лучше использовать другие ETL пайплайны и инструменты и да делать это вне шины

Тут наверное не нужно искать решение в плоскости Kafka Streams, на мой взгляд

Как организовать ретраи или circuit-breaker? Что будет, если один из процессоров в стриме упадет с ошибкой?

Обработка ошибок уже зависит от бизнес потребностей, также там было упомянуто про akka, у меня тут к сожалению нет опыта, но стандартно обработать можно так

KStream<String, String> stream = builder.stream("input-topic");

stream.mapValues(value -> {
    int maxRetries = 3;
    int attempt = 0;
    while (attempt < maxRetries) {
        try {
            // Ваша логика обработки
            return process(value);
        } catch (Exception e) {
            attempt++;
            if (attempt == maxRetries) {
                // Логирование и обработка после максимального количества попыток
                // Например, отправка в Dead Letter Topic
            }
        }
    }
    return null; // Или другое значение по умолчанию
});

но так же есть удобная библиотека https://github.com/resilience4j/resilience4j

import io.github.resilience4j.retry.Retry;
import io.github.resilience4j.retry.RetryConfig;

import io.github.resilience4j.circuitbreaker.CircuitBreaker;
import io.github.resilience4j.circuitbreaker.CircuitBreakerConfig;

https://github.com/resilience4j/resilience4j/tree/master/resilience4j-circuitbreaker
https://github.com/resilience4j/resilience4j/tree/master/resilience4j-retry

К примеру, можно ли хотя бы в отдельный топик направить все сообщения, которые вызвали сбои топологии?

Тут можно настроить обработчик исключений на уровне конфигурации Kafka Streams

Properties props = new Properties();
props.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
          LogAndContinueExceptionHandler.class);
props.put(StreamsConfig.DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG,
          DefaultProductionExceptionHandler.class);

или обрабатывать прям в коде

Ни в какой секте я не состою =)
Мне кажется разбираться в этом не есть плохо.

Хорошо что реализуют и знать это при коммуникациях очень полезно

В каком виде описываются схемы?

{
   "type" : "record",
   "namespace" : "someSchema",
   "name" : "Employee",
   "fields" : [
      { "name" : "Name" , "type" : "string" },
      { "name" : "Age" , "type" : "int" }
   ]
}

А как быстро стартануть то? Какой-то есть материал, который вам сильно помог?

Сейчас и не вспомню к сожалению) все через документацию

Как их закидывать? UI или по API?

API =) UI тулзы не смотрел

Совместимость обеспечивается тем, что kafka преобразует v1 в v2 каким-то внутренним механизмом, если ей на вход прийдет v1?

Просто алерты которые через логи можно как то аггрегировать и анализировать в дальнейшем, но чтобы слету как-то преобразовать, такого не знаю и не встречал в своей практики =)

насколько внедрение схем влияет на пропускную способность или насколько оно увеличивает накладные расходы на проверку?

Мы наблюдали, но просадки в performance не заметили

сервис со схемами является отдельным хранилищем в которое кафка по апи ходит

именно так, но мысль про отдельный топик очень интересна!

Если данные летят в старом deprecated формате, то подразумевается, что мы как-то записываем логи или метрики об этом?

Логирование происходит в сервисе и в самих логах Kafka

Чтобы понять уменьшается ли у нас объем таких сообщений и можем ли мы убивать обратную совместимость или как мониторинг осуществляется на соответствие схемам?

Не уверен что до конца понял Ваш вопрос, есть топики с обязательным требованиям к обратной совместимости и если там что то Deprecated, происходил переезд в новый топик

Отдельное спасибо за Jolt! Так как схожие проблемы всегда решали custom разработкой на Groovy =)

Информация

В рейтинге
Не участвует
Дата рождения
Зарегистрирован
Активность

Специализация

Бэкенд разработчик, Архитектор программного обеспечения
Ведущий
От 600 000 ₽
Java
Python