Как стать автором
Обновить
98.94
beeline cloud
Безопасный облачный провайдер

Custom Kafka Deserializer и Spring’овый контекст. Как инжектить в статические поля

Время на прочтение4 мин
Количество просмотров2.4K

Привет, Хабр! Меня зовут Николай Пискунов, я ведущий разработчик в подразделении Big Data. 

В блоге beeline cloud я рассказывал о Spring Data JPA, Hibernate, делился личными наблюдениями, как облегчить себе жизнь при написании тестов. Сегодня речь о другом: расскажу, как инжектить в статические поля. Как всегда — на примерах. Поехали.

На практике десериализаторов, представленных в ядре Spring, хватает в 99% случаев. Но бывают ситуации, когда всё же требуется описать свою логику предобработки входящего сообщения.

Для наглядности предлагаю рассмотреть гипотетический случай, когда на вход Kafka подается строка лога с задаваемым разделителем. В этой строке нас интересует лишь часть сообщения: мы будем делить строку по определенному символу и собирать из получившегося массива новую строку.

Задавать разделитель и индексы в файле application.properties мы планируем так:

kafka.message.deserializer.delimiter=,
kafka.message.deserializer.indexes=0,2,3,5

Сразу скажу, что org.apache.kafka.common.serialization.Deserializer и все его реализации не знают о спринговом контексте ничего — от слова совсем. Даже если попытаться сделать из него бин, пометив соответствующей аннотацией, например @Component.

 Для этого напишем сам Deserializer и попробуем прямо в него заинжектить требуемые настройки, используя спринговую аннотацию @Value:

@Slf4j
@Component
public class SmartStringDeserializer implements Deserializer<String> {
   @Value("${kafka.message.deserializer.delimiter}")
   String delimiter;
 
   @Value("${kafka.message.deserializer.indexes}")
   List<Integer> indexes;
   @Override
   public void configure(Map<String, ?> configs, boolean isKey) {
       Deserializer.super.configure(configs, isKey);
   }
 
   @Override
   public String deserialize(String s, byte[] bytes) {
       return setValueByIndex(bytes);
   }
 
   @Override
   public void close() {
       Deserializer.super.close();
   }
 
   private String setValueByIndex(byte[] bytes) {
       String msg = new String(bytes, StandardCharsets.UTF_8);
       LOGGER.debug("Income message: {}", msg);
       if (indexes.getFirst() == -1) {
       	return msg;
       }
       String[] incomeMessage = msg.split(delimiter);
       String formattedIncomeMessage = indexes.stream()
           	.map(index -> incomeMessage[index])
           	.collect(Collectors.joining(delimiter));
       LOGGER.debug("Consumed message: {}", formattedIncomeMessage);
 
       return formattedIncomeMessage;
   }
}

К сожалению, нам вернется null при попытке получить значения:

А если мы проставим стандартные значения в @Values, результат будет таким же:

 @Value("${kafka.message.deserializer.delimiter:,}")

 Но можно использовать статические поля, описав их в соседнем классе. Вот только надо придумать, как в них заинжектить данные из спрингового контекста. Конечно, это должен быть спринговый бин, например @Component, и в нем мы определим статические поля, включая поля, которые будут заполняться данными из properties:

@Component
public class DeserializerUtils {
 
   public static String DELIMITER;
 
   public static List<Integer> INDEXES;
 
   private String delimiter;
 
   private List<Integer> indexes;
   …
}

Теперь напишем сеттеры, они-то и будут присваивать значения статическим полям:

@Component
public class DeserializerUtils {
 
   public static String DELIMITER;
 
   public static List<Integer> INDEXES;
 
   private String delimiter;
 
   private List<Integer> indexes;
 
   @Value("${kafka.message.deserializer.delimiter:,}")
   private void setDelimiter(String delimiter){
       DeserializerUtils.DELIMITER = delimiter;
   }
 
   @Value("${kafka.message.deserializer.indexes:-1}")
   private void setIndexes(List<Integer> indexes){
       DeserializerUtils.INDEXES = indexes;
   }
}

Важно помнить, что @Values необходимо навешивать над методами, иначе не заработает. И, если вы решите обновить контекст без перезапуска приложения, например, поправив значения в файле конфигураций, обновятся и статические поля.

 Теперь перепишем сам десериалайзер:

@Slf4j
public class SmartStringDeserializer implements Deserializer<String> {
 
   @Override
   public void configure(Map<String, ?> configs, boolean isKey) {
       Deserializer.super.configure(configs, isKey);
   }
 
   @Override
   public String deserialize(String s, byte[] bytes) {
       return setValueByIndex(bytes);
   }
 
   @Override
   public void close() {
       Deserializer.super.close();
   }
 
   private String setValueByIndex(byte[] bytes) {
       String delimiter = DeserializerUtils.DELIMITER;
       List<Integer> indexes = DeserializerUtils.INDEXES;
       String msg = new String(bytes, StandardCharsets.UTF_8);
       LOGGER.debug("Income message: {}", msg);
       if (indexes.getFirst() == -1) {
       	return msg;
       }
       String[] incomeMessage = msg.split(delimiter);
       String formattedIncomeMessage = indexes.stream()
           	.map(index -> incomeMessage[index])
           	.collect(Collectors.joining(delimiter));
       LOGGER.debug("Consumed message: {}", formattedIncomeMessage);
 
       return formattedIncomeMessage;
   }
}

А вот теперь мы можем конфигурировать Deserializer, используя спринговый контекст:

У такого подхода есть свои минусы. По факту у нас дублируются данные в хипе, которые не удалит GC ввиду того, что первые хранятся в контексте спринга, вторые — в статических полях. Поэтому старайтесь инжектить конкретные бины и не делайте так:

public static ApplicationContext CONTEXT;
private ApplicationContext context;
 
private void setIndexes(ApplicationContext context){
   DeserializerUtils.CONTEXT = context;
}

beeline cloud — secure cloud provider. Разрабатываем облачные решения, чтобы вы предоставляли клиентам лучшие сервисы.

Теги:
Хабы:
Всего голосов 8: ↑6 и ↓2+8
Комментарии3

Публикации

Информация

Сайт
cloud.beeline.ru
Дата регистрации
Дата основания
Численность
501–1 000 человек
Местоположение
Россия