Apache Flink: динамическое определение выходного топика в Kafka
Всем привет, меня зовут Александр Бобряков. Я техлид в команде МТС Аналитики, занимаюсь Real-Time обработкой данных. Мы начали использовать фреймворк Apache Flink, и я решил поделиться на Хабре своим опытом внедрения этой технологии в цикле статей.
В предыдущей статье — «Apache Flink. Как работает дедупликация данных в потоке Kafka-to-Kafka?» — я рассказывал про построение пайплайна Kafka-to-Kafka с промежуточным разделением потока и дедупликацией событий. Также разобрались, что такое состояние оператора и зачем оно нужно.
В этой статье добавим возможность динамического определения топика в Kafka для каждого события, куда его нужно записать.
Список моих статей про Flink:
Весь разбираемый исходный код можно найти в репозитории. Этот пост соответствует релизной ветке с названием release/3_WrappedSinkMessage.
Оглавление статьи:
Постановка задачи динамически определяемого топика Kafka Sink
В предыдущей статье мы построили основной пайплайн, но все итоговые события ProductMessage попадали в единственный общий топик. Теперь немного усложним логику. Напомню, что входное сообщение ClickMessage имеет вид:
@Data
@Builder
@Jacksonized
@JsonIgnoreProperties(ignoreUnknown = true)
public class ClickMessage {
@JsonProperty(required = true)
@JsonPropertyDescription("User id")
private UUID userId;
@JsonPropertyDescription("Clicked object")
private String object;
@JsonPropertyDescription("User platform")
@JsonDeserialize(using = Platform.Deserializer.class)
private Platform platform;
@JsonPropertyDescription("Product name")
private String productName;
@JsonPropertyDescription("Product topic")
private String productTopic;
@JsonProperty(required = true)
@JsonPropertyDescription("Timestamp")
private Long timestamp;
@JsonPropertyDescription("Additional data")
private Map<String, Object> data;
}
Теперь бизнес-клиентам захотелось бы записывать соответствующее сообщение ProductMessage (полученное преобразованием над ClickMessage) в топик, указанный в поле ClickMessage.productTopic, чтобы каждый продукт (клиент) мог подключаться и читать сообщения только для себя.
Статическая реализация определения топика Kafka Sink
Также в предыдущей части мы реализовали Kafka Sink. Давайте вспомним как он выглядел (за подробными объяснениями кода обратитесь к предыдущей статье), и доработаем его:
@Component
@RequiredArgsConstructor
public class ProductMessageKafkaSinkProvider implements SinkProvider<ProductMessage> {
private final KafkaProperties kafkaProperties;
private final SerializationSchema<ProductMessage> serializationProductMessageSchema;
@Override
public Sink<ProductMessage> createSink() {
return KafkaSink.<ProductMessage>builder()
.setBootstrapServers(kafkaProperties.getBootstrapServers())
.setRecordSerializer(KafkaRecordSerializationSchema.<ProductMessage>builder()
.setTopic(kafkaProperties.getTopics().getProductTopic())
.setValueSerializationSchema(serializationProductMessageSchema)
.build())
.setDeliveryGuarantee(NONE)
.build();
}
}
В дополнение к этому Kafka Sink предоставляет класс-сериализатор, который отвечает за способ сериализации сообщения ProductMessage перед отправкой в топик Kafka с помощью Jackson:
@Component
@RequiredArgsConstructor
class ProductMessageSerializationSchema implements SerializationSchema<ProductMessage> {
private static final long serialVersionUID = 1;
private transient ObjectMapper objectMapper;
@Override
public void open(InitializationContext context) {
objectMapper = createObjectMapper();
}
@Override
@SneakyThrows
public byte[] serialize(ProductMessage element) {
return objectMapper.writeValueAsBytes(element);
}
}
В Kafka Sink мы статически определяем выходной топик setTopic(). Он задан в наших application.yml настройках. Теперь это поведение нужно изменить — Kafka Sink должен динамически определить топик, в который нужно записать событие ProdcutMessage.
Для начала нам поможет метод setTopicSelector() вместо предыдущего setTopic(). Реализация TopicSelector динамически определяет топик на основе текущего записываемого события, определяемого в лямбда-функции.
/**
* Selects a topic for the incoming record.
*
* @param <IN> type of the incoming record
*/
@PublicEvolving
public interface TopicSelector<IN> extends Function<IN, String>, Serializable {}
Проблема в том, что в выходном ProductMessage нет информации об этом топике. А топик хранится в поле входного сообщения ClickMessage.productTopic. Значит — нам нужна обертка, которая будет содержать сам ProductMessage и важную мета-информацию.
WrappedSinkMessage — обертка над отправляемым событием
Чтобы сохранить информацию о топике и самом событии сделаем обертку WrappedSinMessage:
@Getter
@EqualsAndHashCode
@RequiredArgsConstructor
public class WrappedSinkMessage<T> implements Serializable {
private static final long serialVersionUID = 1L;
private final Meta meta;
private final T message;
@Getter
@EqualsAndHashCode
@RequiredArgsConstructor
public static class Meta implements Serializable {
private static final long serialVersionUID = 1L;
public final String targetTopicName;
}
}
В ней мы храним само целевое сообщение ProductMessage для отправки в Sink, а также дополнительную meta-информацию — в нашем случае только целевой топик targetTopicName. Поэтому перед непосредственным применением sinkTo() в пайплайне нам нужно будет привести входное сообщение ClickMessage к этой обертке. В поле метаданных можно также хранить любую нужную вам информацию. Например, мы добавляем в него данные, требующиеся для метрик, такие как latency обработки, типы обрабатываемых событий и так далее.
Забегая вперед, скажу, что такой подход — обертка для события еще на старте — очень хорошо себя зарекомендовал. Потому что во многих задачах нужно «нести» информацию о текущем событии до самого конца пайплайна, изменяя исходное событие сколько угодно раз
Динамическая реализация определения топика Kafka Sink
Теперь нужно изменить Kafka Sink и алгоритм сериализации сообщения, ведь к нам теперь поступает WrappedSinMessage с целевымProductMessage внутри. Взглянем на изменение сериализатора:
@Component
@RequiredArgsConstructor
class WrappedProductMessageSerializationSchema implements SerializationSchema<WrappedSinkMessage<ProductMessage>> {
private static final long serialVersionUID = 1;
private transient ObjectMapper objectMapper;
@Override
public void open(InitializationContext context) {
objectMapper = createObjectMapper();
}
@Override
@SneakyThrows
public byte[] serialize(WrappedSinkMessage<ProductMessage> element) {
return objectMapper.writeValueAsBytes(element.getMessage());
}
}
В классе изменилась только сериализация непосредственно внутреннего ProductMessage в строчке element.getMessage(). Тут все довольно прозрачно. А вот в самом создании Kafka Sink мы имеем доступ к WrappedSinkMessage, из которого легко получить сам целевой топик:
@Component
@RequiredArgsConstructor
public class WrappedProductMessageKafkaSinkProvider implements SinkProvider<WrappedSinkMessage<ProductMessage>> {
private final KafkaProperties kafkaProperties;
private final SerializationSchema<WrappedSinkMessage<ProductMessage>> serializationProductMessageSchema;
@Override
public Sink<WrappedSinkMessage<ProductMessage>> createSink() {
return KafkaSink.<WrappedSinkMessage<ProductMessage>>builder()
.setBootstrapServers(kafkaProperties.getBootstrapServers())
.setRecordSerializer(KafkaRecordSerializationSchema.<WrappedSinkMessage<ProductMessage>>builder()
.setTopicSelector(wrappedMessage -> wrappedMessage.getMeta().getTargetTopicName())
.setValueSerializationSchema(serializationProductMessageSchema)
.build())
.setDeliveryGuarantee(NONE)
.build();
}
}
Основное изменение только в строке с методом setTopicSelector() с приведением остальных типов из ProductMessage к WrappedSinkMessage<ProductMessage>. Значит, можем убрать в рамках репозитория настройку kafka.topics.product-topic.
Трансформация входного события к обертке
Итак, у нас был класс для трансформации входного ClickMessage в выходной ProductMessage. А сейчас нам нужно преобразовать ClickMessage в целевой WrappedSinkMessage<ProductMessage>. Для этого мы совсем немного дополним предыдущую реализацию:
@Slf4j
@RequiredArgsConstructor
public class ClickMessageToWrappedProductSinkMessageMapFunction implements FlatMapFunction<ClickMessage, WrappedSinkMessage<ProductMessage>> {
private static final long serialVersionUID = 1L;
@Override
public void flatMap(ClickMessage clickMessage, Collector<WrappedSinkMessage<ProductMessage>> out) {
try {
final var productMessage = ProductMessage.builder()
.userId(clickMessage.getUserId())
.productName(clickMessage.getProductName())
.object(clickMessage.getObject())
.platform(clickMessage.getPlatform())
.timestamp(clickMessage.getTimestamp())
.build();
final var wrappedMessage = new WrappedSinkMessage<>(
new WrappedSinkMessage.Meta(clickMessage.getProductTopic()),
productMessage);
out.collect(wrappedMessage);
} catch (Exception e) {
log.error("Error converting ClickMessage to ProductMessage", e);
}
}
}
Также в коде Flink Job ClickToProductJob необходимо теперь инжектить бин SinkProvider<WrappedSinkMessage<ProductMessage>>.
Вывод
Вот мы и разобрались как динамически определять целевой топик для записи события в Kafka.
Теперь нужно все это протестировать. В следующей статье займемся Unit-тестированием всех операторов джобы. Описанные там подходы к тестированию будут применимы и в любых других приложениях, не связанных с Flink. Затем перейдем к тестированию всей Flink Job, а также E2E-тестированию.
Если остались вопросы — задавайте в комментариях!