Всем привет, меня зовут Александр Бобряков. Я техлид в команде МТС Аналитики, занимаюсь Real-Time обработкой данных. Мы начали использовать фреймворк Apache Flink, и я решил поделиться на Хабре своим опытом внедрения этой технологии в цикле статей.

В предыдущей статье — «Apache Flink. Как работает дедупликация данных в потоке Kafka-to-Kafka?» — я рассказывал про построение пайплайна Kafka-to-Kafka с промежуточным разделением потока и дедупликацией событий. Также разобрались, что такое состояние оператора и зачем оно нужно.

В этой статье добавим возможность динамического определения топика в Kafka для каждого события, куда его нужно записать.

Список моих статей про Flink:

Весь разбираемый исходный код можно найти в репозитории. Этот пост соответствует релизной ветке с названием release/3_WrappedSinkMessage.

Оглавление статьи:

Постановка задачи динамически определяемого топика Kafka Sink

В предыдущей статье мы построили основной пайплайн, но все итоговые события ProductMessage попадали в единственный общий топик. Теперь немного усложним логику. Напомню, что входное сообщение ClickMessage имеет вид:

@Data
@Builder
@Jacksonized
@JsonIgnoreProperties(ignoreUnknown = true)
public class ClickMessage {
   @JsonProperty(required = true)
   @JsonPropertyDescription("User id")
   private UUID userId;

   @JsonPropertyDescription("Clicked object")
   private String object;

   @JsonPropertyDescription("User platform")
   @JsonDeserialize(using = Platform.Deserializer.class)
   private Platform platform;

   @JsonPropertyDescription("Product name")
   private String productName;

   @JsonPropertyDescription("Product topic")
   private String productTopic;

   @JsonProperty(required = true)
   @JsonPropertyDescription("Timestamp")
   private Long timestamp;

   @JsonPropertyDescription("Additional data")
   private Map<String, Object> data;
}

Теперь бизнес-клиентам захотелось бы записывать соответствующее сообщение ProductMessage (полученное преобразованием над ClickMessage) в топик, указанный в поле ClickMessage.productTopic, чтобы каждый продукт (клиент) мог подключаться и читать сообщения только для себя.

Статическая реализация определения топика Kafka Sink

Также в предыдущей части мы реализовали Kafka Sink. Давайте вспомним как он выглядел (за подробными объяснениями кода обратитесь к предыдущей статье), и доработаем его:

@Component
@RequiredArgsConstructor
public class ProductMessageKafkaSinkProvider implements SinkProvider<ProductMessage> {
   private final KafkaProperties kafkaProperties;
   private final SerializationSchema<ProductMessage> serializationProductMessageSchema;

   @Override
   public Sink<ProductMessage> createSink() {
       return KafkaSink.<ProductMessage>builder()
                  .setBootstrapServers(kafkaProperties.getBootstrapServers())
                  .setRecordSerializer(KafkaRecordSerializationSchema.<ProductMessage>builder()
                                           .setTopic(kafkaProperties.getTopics().getProductTopic())
                                           .setValueSerializationSchema(serializationProductMessageSchema)
                                           .build())
                  .setDeliveryGuarantee(NONE)
                  .build();
   }
}

В дополнение к этому Kafka Sink предоставляет класс-сериализатор, который отвечает за способ сериализации сообщения ProductMessage перед отправкой в топик Kafka с помощью Jackson:

@Component
@RequiredArgsConstructor
class ProductMessageSerializationSchema implements SerializationSchema<ProductMessage> {
   private static final long serialVersionUID = 1;

   private transient ObjectMapper objectMapper;

   @Override
   public void open(InitializationContext context) {
       objectMapper = createObjectMapper();
   }

   @Override
   @SneakyThrows
   public byte[] serialize(ProductMessage element) {
       return objectMapper.writeValueAsBytes(element);
   }
}

В Kafka Sink мы статически определяем выходной топик setTopic(). Он задан в наших application.yml настройках. Теперь это поведение нужно изменить — Kafka Sink должен динамически определить топик, в который нужно записать событие ProdcutMessage.

Для начала нам поможет метод setTopicSelector() вместо предыдущего setTopic(). Реализация TopicSelector динамически определяет  топик на основе текущего записываемого события, определяемого в лямбда-функции.

/**
 * Selects a topic for the incoming record.
 *
 * @param <IN> type of the incoming record
 */
@PublicEvolving
public interface TopicSelector<IN> extends Function<IN, String>, Serializable {}

Проблема в том, что в выходном ProductMessage нет информации об этом топике.  А топик хранится в поле входного сообщения ClickMessage.productTopic. Значит — нам нужна обертка, которая будет содержать сам ProductMessage и важную мета-информацию.

WrappedSinkMessage — обертка над отправляемым событием

Чтобы сохранить информацию о топике и самом событии сделаем обертку WrappedSinMessage:

@Getter
@EqualsAndHashCode
@RequiredArgsConstructor
public class WrappedSinkMessage<T> implements Serializable {
   private static final long serialVersionUID = 1L;

   private final Meta meta;
   private final T message;

   @Getter
   @EqualsAndHashCode
   @RequiredArgsConstructor
   public static class Meta implements Serializable {
       private static final long serialVersionUID = 1L;

       public final String targetTopicName;
   }
}

В ней мы храним само целевое сообщение ProductMessage для отправки в Sink, а также дополнительную meta-информацию — в нашем случае только целевой топик targetTopicName. Поэтому перед непосредственным применением sinkTo() в пайплайне нам нужно будет привести входное сообщение ClickMessage к этой обертке. В поле метаданных можно также хранить любую нужную вам информацию. Например, мы добавляем в него данные, требующиеся для метрик, такие как latency обработки, типы обрабатываемых событий и так далее.

Забегая вперед, скажу, что такой подход — обертка для события еще на старте — очень хорошо себя зарекомендовал. Потому что во многих задачах нужно «нести» информацию о текущем событии до самого конца пайплайна, изменяя исходное событие сколько угодно раз

Динамическая реализация определения топика Kafka Sink

Теперь нужно изменить Kafka Sink и алгоритм сериализации сообщения, ведь к нам теперь поступает WrappedSinMessage с целевымProductMessage внутри. Взглянем на изменение сериализатора:

@Component
@RequiredArgsConstructor
class WrappedProductMessageSerializationSchema implements SerializationSchema<WrappedSinkMessage<ProductMessage>> {
   private static final long serialVersionUID = 1;

   private transient ObjectMapper objectMapper;

   @Override
   public void open(InitializationContext context) {
       objectMapper = createObjectMapper();
   }

   @Override
   @SneakyThrows
   public byte[] serialize(WrappedSinkMessage<ProductMessage> element) {
       return objectMapper.writeValueAsBytes(element.getMessage());
   }
}

В классе изменилась только сериализация непосредственно внутреннего ProductMessage в строчке element.getMessage(). Тут все довольно прозрачно. А вот в самом создании Kafka Sink мы имеем доступ к WrappedSinkMessage, из которого легко получить сам целевой топик:

@Component
@RequiredArgsConstructor
public class WrappedProductMessageKafkaSinkProvider implements SinkProvider<WrappedSinkMessage<ProductMessage>> {
   private final KafkaProperties kafkaProperties;
   private final SerializationSchema<WrappedSinkMessage<ProductMessage>> serializationProductMessageSchema;

   @Override
   public Sink<WrappedSinkMessage<ProductMessage>> createSink() {
       return KafkaSink.<WrappedSinkMessage<ProductMessage>>builder()
                  .setBootstrapServers(kafkaProperties.getBootstrapServers())
                  .setRecordSerializer(KafkaRecordSerializationSchema.<WrappedSinkMessage<ProductMessage>>builder()
                                           .setTopicSelector(wrappedMessage -> wrappedMessage.getMeta().getTargetTopicName())
                                           .setValueSerializationSchema(serializationProductMessageSchema)
                                           .build())
                  .setDeliveryGuarantee(NONE)
                  .build();
   }
}

Основное изменение только в строке с методом setTopicSelector() с приведением остальных типов из ProductMessage к WrappedSinkMessage<ProductMessage>. Значит, можем  убрать в рамках репозитория настройку kafka.topics.product-topic.

Трансформация входного события к обертке

Итак, у нас был класс для трансформации входного ClickMessage в выходной ProductMessage. А сейчас нам нужно преобразовать ClickMessage в целевой WrappedSinkMessage<ProductMessage>. Для этого мы совсем немного дополним предыдущую реализацию:

@Slf4j
@RequiredArgsConstructor
public class ClickMessageToWrappedProductSinkMessageMapFunction implements FlatMapFunction<ClickMessage, WrappedSinkMessage<ProductMessage>> {
   private static final long serialVersionUID = 1L;

   @Override
   public void flatMap(ClickMessage clickMessage, Collector<WrappedSinkMessage<ProductMessage>> out) {
       try {
           final var productMessage = ProductMessage.builder()
                                          .userId(clickMessage.getUserId())
                                          .productName(clickMessage.getProductName())
                                          .object(clickMessage.getObject())
                                          .platform(clickMessage.getPlatform())
                                          .timestamp(clickMessage.getTimestamp())
                                          .build();
           final var wrappedMessage = new WrappedSinkMessage<>(
               new WrappedSinkMessage.Meta(clickMessage.getProductTopic()),
               productMessage);
           out.collect(wrappedMessage);
       } catch (Exception e) {
           log.error("Error converting ClickMessage to ProductMessage", e);
       }
   }
}

Также в коде Flink Job ClickToProductJob необходимо теперь инжектить бин SinkProvider<WrappedSinkMessage<ProductMessage>>.

Вывод

Вот мы и разобрались как динамически определять целевой топик для записи события в Kafka.

Теперь нужно все это протестировать. В следующей статье займемся Unit-тестированием всех операторов джобы. Описанные там подходы к тестированию будут применимы и в любых других приложениях, не связанных с Flink. Затем перейдем к тестированию всей Flink Job, а также E2E-тестированию.

Если остались вопросы — задавайте в комментариях!