Как стать автором
Обновить

Spring Boot приложение с использованием Kafka Streams

Время на прочтение6 мин
Количество просмотров13K
Привет, Хабр! В этой статье мы рассмотрим, как в МегаФоне производят потоковую обработку данных, и разработаем простое Spring Boot приложение с использованием Kafka Streams.


У нас в компании есть домен «Фабрика микросервисов» (MFactory) — подразделение, которое занимается исследованием и разработкой приложений на базе микросервисной архитектуры. Для того, чтобы микросервисы могли «общаться» между собой, мы используем брокеры сообщений RabbitMQ и Kafka, но сегодня мы поговорим подробнее о Kafka, так как именно этот брокер реализует очень мощный инструмент для потоковой обработки данных — Kafka Streams.

Рассмотрим стандартный подход к обработке данных: получаем поток данных, помещаем их в базу данных, после чего подключаемся к базе, выгружаем данные и обрабатываем. Но существует ряд задач, в которых мы можем увеличить производительность, обрабатывая поток данных в режиме реального времени, минуя базу данных. Для их решения используем интерфейс Kafka Streams. Но для начала, вспомним, что такое Kafka.



Kafka — это распределённый программный брокер сообщений. «Распределённый» значит, что брокер может работать синхронно на нескольких серверах, образуя кластер. Kafka состоит из генератора сообщений, потребителя сообщений и топика. Топик — это файл, в котором входящие записи добавляются в конец файла. Другими словами, топик — это упорядоченный по времени журнал сообщений.

Рассмотрим следующую задачу. Мы получаем поток данных с информацией об изменении баланса абонента в формате JSON — {«phone_number»:number,«balance»:balance} и записываем его в топик “src”. И нам нужно в реальном времени отфильтровать сообщения, в которых баланс меньше либо равен нулю.



Теперь рассмотрим, что же такое Kafka Streams. Apache Kafka Streams API — это клиентская библиотека для создания приложений и микросервисов, которые позволяют в режиме реального времени обрабатывать данные, хранящиеся в кластерах Kafka. Kafka Streams сочетает в себе простоту написания и развертывания стандартных приложений Java и Scala на стороне клиента с преимуществами серверной кластерной технологии Kafka.



Устройство Kafka Streams можно представить в виде графа (топологии обработки):

  1. узел-обработчик, который подписывается на топик и вычитывает входящие сообщения;
  2. узел-обработчик, который реализует бизнес-логику;
  3. узел-обработчик, который отправляет результат обработки в топик.

Представим следующую задачу. Мы получаем поток информации об изменении баланса абонентов в формате JSON вида {«phone_number»:number,«balance»:balance} и записываем его в топик “src”. Нам нужно в реальном времени отфильтровать сообщения, в которых баланс меньше и равен нулю, и записать результат в топик “out”, на который подписан сервис для обработки данных сообщений. Сымитируем подобный поток, используя консольный генератор Kafka:

PS C:\Users\User> cd C:\kafka_2.12-2.5.0\
PS C:\kafka> bin\windows\kafka-topics.bat --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic src
Created topic src.
PS C:\kafka> bin\windows\kafka-topics.bat --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic out
Created topic out.
PS C:\kafka> bin\windows\kafka-topics.bat --list --bootstrap-server localhost:9092
out
src
PS C:\kafka> bin\windows\kafka-console-producer.bat --bootstrap-server localhost:9092 --topic src
>{"phone_number":79301,"balance":100}
>{"phone_number":79302,"balance":0}
>{"phone_number":79303,"balance":-50}
>

Теперь разработаем простое Spring Boot приложение и посмотрим на Kafka Streams в действии. Для начала определим следующие зависимости:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>

Создадим объект User:

package ru.megafon.kafkastreamsdemo.model;

import com.fasterxml.jackson.annotation.JsonProperty;
import lombok.Getter;

@Getter
public class User {
    @JsonProperty("phone_number")
    private Long phoneNumber;
    @JsonProperty("balance")
    private Double balance;
}

Напишем конфигурацию нашего приложения:

package ru.megafon.kafkastreamsdemo.config;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.annotation.EnableKafkaStreams;
import org.springframework.kafka.annotation.KafkaStreamsDefaultConfiguration;
import org.springframework.kafka.config.KafkaStreamsConfiguration;
import org.springframework.kafka.support.serializer.JsonDeserializer;
import org.springframework.kafka.support.serializer.JsonSerializer;
import ru.megafon.kafkastreamsdemo.model.User;

import java.util.HashMap;
import java.util.Map;

@Slf4j
@Configuration
@EnableKafka
@EnableKafkaStreams
public class KafkaStreamsConfig {
    @Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
    public KafkaStreamsConfiguration kStreamsConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "id");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        return new KafkaStreamsConfiguration(props);
    }

    @Bean
    public Serde<User> userSerde() {
        return Serdes.serdeFrom(new JsonSerializer<>(), new JsonDeserializer<>(User.class));
    }

    @Bean
    public KStream<String, User> kStream(StreamsBuilder kStreamBuilder) {
        KStream<String, String> stream = kStreamBuilder
                .stream("src", Consumed.with(Serdes.String(), Serdes.String()));
        KStream<String, User> userStream = stream
                .mapValues(this::getUserFromString)
                .filter((key, value) -> value.getBalance() <= 0);
        userStream.to("out", Produced.with(Serdes.String(), userSerde()));
        return userStream;
    }

    @Bean
    public ObjectMapper objectMapper() {
        return new ObjectMapper();
    }

    User getUserFromString(String userString) {
        User user = null;
        try {
            user = objectMapper().readValue(userString, User.class);
        } catch (JsonProcessingException e) {
            log.error(e.getMessage(), e);
        }
        return user;
    }
}

Рассмотрим подробнее каждый метод:

В методе kStreamsConfigs мы создаём минимальную конфигурацию для подключения нашего приложения к серверу Kafka: устанавливаем идентификатор нашего приложения и адрес сервера брокера, дефолтный сериализатор/десериализатор.

В методе userSerde мы определили собственный сериализатор/десериализатор для модели User.
В методе kStream мы создаём топологию нашего Kafka Streams приложения и возвращаем объект KStream<String, User>. Основным объектом для создания топологии является объект StreamsBuilder:

  1. в методе stream мы подписываемся на топик “src” и вычитываем входящие сообщения;
  2. в методе filter мы реализовали нашу бизнес-логику — фильтрацию;
  3. в методе to мы отправляем результат обработки в результирующий топик “out”.

Запускаем приложение и подписываемся на топик “out”:

PS C:\Users\User> cd C:\kafka_2.12-2.5.0\
PS C:\kafka> bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic out --from-beginning
{"phone_number":79302,"balance":0.0}
{"phone_number":79303,"balance":-50.0}

Мы видим, что данные успешно обработаны.

Преимущества Kafka Streams:

  • простая и легковесная библиотека, которая может быть легко встроена в любое Java-приложение и интегрирована с инструментами упаковки, развертывания и эксплуатации;
  • не имеет внешних зависимостей от систем кроме самого Apache Kafka;
  • поддерживает отказоустойчивое локальное состояние, что обеспечивает очень быстрые и эффективные операции с сохранением состояния;
  • поддерживает правило “обработка ровно один раз”, чтобы гарантировать, что каждая запись будет обработана один и только один раз, даже если в процессе обработки происходит сбой на клиентах Kafka Streams или брокерах Kafka;
  • использует обработку “по одной записи за раз” для достижения миллисекундной задержки.
Теги:
Хабы:
+3
Комментарии5

Публикации

Информация

Сайт
job.megafon.ru
Дата регистрации
Дата основания
Численность
свыше 10 000 человек
Местоположение
Россия