DSL — это то, что делает Kafka Streams таким простым и позволяет подняться на более высокий уровень абстракции. В прошлой статье мы рассматривали следующий код:
KStream<String, String> source = builder.stream("input-topic"); KStream<String, String> processed = source.mapValues(value -> value.toUpperCase()); processed.to("output-topic");
Всё это — часть DSL Kafka Streams. Теперь хотелось бы глубже погрузиться в его особенности и в следующих статьях ещё шире исследовать инструментарий.
Архитектурно Kafka Streams строится на топологии обработчиков, опираясь на парадигму Dataflow Programming (DFP). Вместо классического построения программы в виде последовательности шагов используется Directed Acyclic Graph (DAG) — направленный ациклический граф.

Узел источника (Source Processor) — точка, откуда поступают данные.
Узел обработчика потока (Stream Processor) — место применения логики обработки данных, таких методов как filter, map, flatMap и других.
Узел приёмника (Sink Processor) — куда направляются отфильтрованные, обогащённые и преобразованные данные.
Данные проходят по топологии, и за один проход через неё обрабатывается исключительно одна запись. Например, такая схема работать не будет:

В итоге у нас выстраивается пирамида абстракций: чем выше уровень абстракции, тем меньше контроля над деталями. В Kafka Streams попытались объединить преимущества разных уровней, чтобы «усидеть на двух стульях одновременно».

DSL прекрасно подходит для работы с данными — различных преобразований и трансформаций. Однако для работы с метаданными, возможности планировать периодичность функций и контролировать время выполнения определённых операций уже требуют обращения к низкоуровневому API узлов (Processor API).
Настройка Apache Kafka и наполнение данных
Для начала необходимо запустить Apache Kafka. Я использую Docker и запускаю Kafka вместе с UI с помощью docker-compose.yml. Вот конфигурация:
version: '3.8' services: kafka: image: confluentinc/confluent-local:7.5.6.arm64 hostname: kafka container_name: kafka ports: - "8082:8082" - "9092:9092" - "9101:9101" environment: KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092' KAFKA_CONTROLLER_QUORUM_VOTERS: '1@kafka:29093' KAFKA_LISTENERS: 'PLAINTEXT://kafka:29092,CONTROLLER://kafka:29093,PLAINTEXT_HOST://0.0.0.0:9092' kafka-ui: image: provectuslabs/kafka-ui:latest ports: - 9999:8080 environment: DYNAMIC_CONFIG_ENABLED: true KAFKA_CLUSTERS_0_NAME: local KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:29092
Примечание: Если вы используете другую платформу или настройки, адаптируйте конфигурацию под себя.
Запустите Docker Compose
docker-compose up -d
Создадим топик с названием users-topic и добавим в него несколько значений: Bob, Alice, John, Jake, Nancy.
Можно использовать Kafka UI для создания топика и добавления сообщений. Если вы предпочитаете командную строку, используйте следующие команды:
# Создание топика docker exec -it kafka kafka-topics --create --topic users-topic --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1 # Отправка сообщений в топик docker exec -it kafka kafka-console-producer --topic users-topic --bootstrap-server localhost:9092
После выполнения последней команды введите имена по одному в каждой строке:
Bob Alice John Jake Nancy
Нажмите Ctrl+C для выхода из продюсера.
Теперь у нас есть настроенная среда и данные для дальнейшего изучения возможностей Kafka Streams.
Конфигурация репозитория
Теперь нам нужна правильная конфигурация build.gradle.kts
plugins { id("java") id("application") } group = "kz.nitec" version = "1.0-SNAPSHOT" repositories { mavenCentral() } dependencies { implementation("org.apache.kafka:kafka-streams:3.8.0") } tasks.test { useJUnitPlatform() } // Конфигурация для запуска приложения для DSL tasks.register<JavaExec>("runDSL") { mainClass.set("kz.nitec.DslExample") classpath = sourceSets["main"].runtimeClasspath } // Конфигурация для запуска приложения для Processor API tasks.register<JavaExec>("runProcessorApi") { mainClass.set("kz.nitec.ProcessorApiExample") classpath = sourceSets["main"].runtimeClasspath }
./gradlew build библиотека Kafka Stream добавлена и готова к использованию теперь можно начать писать код для приложения
DSL решение
Создайте новый класс DslExample со следующим исходным кодом:
import java.util.Properties; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.kstream.KStream; public class DslExample { public static void main(String[] args) { // Настройка свойств приложения Properties props = new java.util.Properties(); props.put(StreamsConfig.APPLICATION_ID_CONFIG, "dsl-example-app"); props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass()); // чтобы читать данные с начала топика в явном виде указываем начальное смещение props.put("consumer.auto.offset.reset", "earliest"); // Создание билдера для построения топологии StreamsBuilder builder = new StreamsBuilder(); // Чтение данных из входного топика KStream<String, String> source = builder.stream("users-topic"); // Обработка данных и вывод в консоль source.foreach((key, value) -> System.out.println("Hello, " + value)); // Создание и запуск Kafka Streams приложения KafkaStreams streams = new KafkaStreams(builder.build(), props); streams.start(); // Добавление shutdown hook для корректного завершения работы Runtime.getRuntime().addShutdownHook(new Thread(streams::close)); } }
после запуска ./gradlew runDSL --info в консоли будут следующие значения
Hello Bob Hello Alice ... остальные имена
Processor API решение
Processor API предоставляет более низкоуровневый доступ к обработке потоков в Kafka Streams, что даёт больше контроля над процессом обработки данных.
import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.Topology; import org.apache.kafka.streams.processor.*; import java.util.Properties; public class ProcessorApiExample { public static void main(String[] args) { // Настройка свойств приложения Properties props = new Properties(); props.put(StreamsConfig.APPLICATION_ID_CONFIG, "processor-api-example-app"); props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass()); // Чтобы читать данные с начала топика, указываем начальное смещение props.put(StreamsConfig.consumerPrefix("auto.offset.reset"), "earliest"); // Создание топологии var topology = new Topology(); // Добавляем источник (source) - читаем из топика "users-topic" topology.addSource("Source", "users-topic") // Добавляем процессор (processor) для обработки сообщений .addProcessor("Processor", (ProcessorSupplier<String, String>) () -> new Processor<>() { @Override public void init(org.apache.kafka.streams.processor.ProcessorContext context) { // Инициализация процессора } @Override public void process(String key, String value) { // Обработка сообщения и вывод в консоль System.out.println("Hello, " + value); } @Override public void close() { // Закрытие ресурсов при необходимости } }, "Source"); // Создание и запуск Kafka Streams приложения KafkaStreams streams = new KafkaStreams(topology, props); streams.start(); // Добавление shutdown hook для корректного завершения работы Runtime.getRuntime().addShutdownHook(new Thread(streams::close)); } }
после запуска ./gradlew runProcessorApi --info в консоли будет идентичный результат как и при DSL примере.
DSL или Processor API
Processor API предоставляет более низкоуровневый доступ к обработке потоков в Kafka Streams. Это даёт больше контроля и гибкости, но требует большего объёма кода и глубокого понимания внутренних механизмов системы.
В конкретном случае удобнее использовать DSL, так как он предоставляет простой доступ к абстракциям потоков (Streams) и таблиц (Tables). DSL позволяет писать более понятный и лаконичный код, сосредотачиваясь на логике обработки данных, а не на инфраструктурных деталях. Это упрощает разработку и снижает вероятность ошибок, делая процесс более эффективным.
