Как стать автором
Обновить

Kafka Streams ч1: Привет, мир

Уровень сложностиПростой
Время на прочтение4 мин
Количество просмотров7.5K

Привет, сообщество Хабр =)

Начав изучать Kafka Streams, я заметил, что для решения различных задач приходится искать информацию по разным источникам, поэтому со временем накопилось много собственных конспектов. Хочу поделиться ими в виде серии туториалов на Хабре.

Несмотря на обилие ресурсов по Kafka Streams и отличные статьи на Хабре [ноль, один, два], мне не хватало пошаговых руководств, которые детально раскрывают изъяны и преимущества этой технологии. Поэтому решил создать такой материал, чтобы помочь другим разобраться структурно и последовательно.

💡 Не буду рассматривать, что такое Apache Kafka. Предполагается, что вы уже знакомы с данным брокером.

Я не сразу понял ценность данной библиотеки (Kafka Streams это часть Apache Kafka) как бы приложение все равно читает сообщение применяет набор действий к сообщению и записывает снова в тот или другой топик, или в совсем другой источник данных, в общем на первый взгляд будто бы никакой разницы, но с погружением в детали начинаешь понимать почему сделали этот инструмент и почему он удобен и востребован.

Без Kafka Streams

Установим нужный пакет

implementation("org.apache.kafka:kafka-clients:3.8.0")

При прямом использовании Apache Kafka нужно использовать KafkaConsumer для чтения сообщений из топика. Вот простой пример:

import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
import java.util.Collections;

public class SimpleKafkaConsumerProducer {
    public static void main(String[] args) {
        // Настройки для consumer
        Properties consumerProps = new Properties();
        consumerProps.put("bootstrap.servers", "localhost:9092");
        consumerProps.put("group.id", "my-group");
        consumerProps.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        consumerProps.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        // Настройки для producer
        Properties producerProps = new Properties();
        producerProps.put("bootstrap.servers", "localhost:9092");
        producerProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        producerProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        var consumer = new KafkaConsumer<String, String>(consumerProps);
        var producer = new KafkaProducer<String, String>(producerProps);

        consumer.subscribe(Collections.singletonList("input-topic"));

        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(100);

                for (ConsumerRecord<String, String> record : records) {
                    // Обработка сообщения
                    String originalValue = record.value();
                    String processedValue = originalValue.toUpperCase();

                    // Отправка обработанного сообщения в другой топик
                    producer.send(new ProducerRecord<>("output-topic", record.key(), processedValue));

                    System.out.println("Обработанное сообщение: " + processedValue);
                }
            }
        } finally {
            consumer.close();
            producer.close();
        }
    }
}
  • Чтение сообщения: Используем KafkaConsumer для чтения сообщений из input-topic.

  • Обработка сообщения: В данном случае просто преобразуем текст сообщения в верхний регистр.

  • Запись сообщения: Используем KafkaProducer для отправки обработанного сообщения в output-topic.

С Kafka Streams

Установим нужный пакет

implementation("org.apache.kafka:kafka-streams:3.8.0")

Теперь тот же функционал с использованием Kafka Streams:

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.StreamsConfig;
import java.util.Properties;

public class SimpleKafkaStreamsApp {
    public static void main(String[] args) {
        // Настройки для Kafka Streams
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        // чтобы читать данные с начала топика в явном виде указываем начальное смещение
        props.put("consumer.auto.offset.reset", "earliest");

        // Строим топологию потоков
        StreamsBuilder builder = new StreamsBuilder();

        // Читаем данные из входного топика
        KStream<String, String> source = builder.stream("input-topic");

        // Обрабатываем данные (преобразуем в верхний регистр)
        KStream<String, String> processed = source.mapValues(value -> value.toUpperCase());

        // Отправляем обработанные данные в выходной топик
        processed.to("output-topic");

        // Создаем и запускаем Kafka Streams приложение
        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();

        // Добавляем shutdown hook для корректного завершения работы
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }
}
  • Чтение и обработка данных: Используем StreamsBuilder для построения топологии потоков. Метод mapValues применяется для преобразования значения сообщения.

  • Запись данных: Обработанные данные автоматически отправляются в output-topic с помощью метода to.

Без Kafka Streams:

  • Нужно самостоятельно управлять потребителем и производителем.

  • Больше кода для настройки и управления.

  • Менее декларативный подход.

С Kafka Streams:

  • Управление потоками данных абстрагировано библиотекой.

  • Код более декларативен и фокусируется на логике обработки.

  • Легче масштабировать и добавлять сложные преобразования.

Визуальная схема, того почему обработка данных с помощью Kafka Streams проще и эффективнее.

Kafka Streams
Kafka Streams

На схеме видно, что при прямом использовании Kafka нам приходится самостоятельно управлять потребителями и производителями, что усложняет код и архитектуру приложения. С Kafka Streams же мы работаем с более высоким уровнем абстракции, где многие детали управления потоками данных уже реализованы за нас. Это позволяет сосредоточиться на логике обработки данных, а не на инфраструктурных деталях.

Пока рано делать окончательные выводы — это лишь верхушка айсберга. Чем глубже мы погружаемся в Kafka Streams, тем более очевидной становится её ценность. Продолжение следует в следующей статье.

Теги:
Хабы:
Всего голосов 2: ↑2 и ↓0+4
Комментарии3

Публикации

Работа

Java разработчик
200 вакансий

Ближайшие события