
Введение
В современном цифровом мире критически важна возможность обрабатывать данные в режиме реального времени и масштабировать приложения. Для этого хорошо подходит Kafka – платформа для распределённой потоковой обработки событий, особенно, если сочетать её с реактивным программированием. В данной статье будет рассказано, как создавать реактивные приложения при помощи этого инструментария.
Разберём основы
Прежде, чем перейти к деталям, давайте изучим некоторые ключевые концепции.
Kafka: Apache Kafka — это платформа для распределенной обработки потоков событий, применяемая для создания конвейеров данных, работающих в режиме реального времени, а также потоковых приложений. Kafka оперативно обрабатывает огромные объёмы данных. При работе с Kafka приложения получают возможность производить и потреблять потоки записей (событий или сообщений).
Реактивное программирование: это такой подход к обработке ПО, при котором акцент делается на обработке асинхронных потоков данных и на распространении изменений. Такой подход способствует отзывчивости, надёжности и масштабированию приложений.
Масштабируемость: это способность системы справляться с нарастающими рабочими нагрузками или потенциал для увеличения системы, чтобы подстроиться под этот рост. В контексте приложения масштабируемость — это способность справляться с растущим количеством пользователей, объёмами данных или новыми транзакциями без ущерба для производительности.
Потоковая обработка данных в режиме реального времени: это технология обработки данных по мере их поступления. Благодаря этому приложение может сразу же реагировать на изменения или события. Потоковая обработка обычно противопоставляется пакетной; в последнем случае данные аккумулируются и обрабатываются партиями через заранее намеченные интервалы.
Расстановка: Почему Kafka сочетается с реактивным программированием?
Kafka и реактивное программирование хорошо дополняют друг друга. Kafka предоставляет надёжный механизм для обработки потоков данных, а реактивное программирование позволяет эффективно и оперативно обрабатывать эти потоки. В совокупности две эти технологии удобны для создания масштабируемых приложений, работающих в режиме реального времени.
Ключевые достоинства:
- Высокая пропускная способность: Kafka с минимальными задержками обрабатывать большие объёмы данных.
- Отказоустойчивость: распределённая архитектура Kafka гарантирует репликацию данных и отказоустойчивость системы.
- Масштабируемость: как Kafka, так и реактивное программирование рассчитаны на горизонтальное масштабирование. Таким образом, можно добавлять всё новые и новые узлы, чтобы справляться с возрастающими нагрузками.
- Асинхронная обработка: поскольку реактивное программирование по своей природе асинхронное, в этой парадигме гарантируются неблокирующие операции и улучшается отзывчивость.
Настройка Kafka
Чтобы приступить к программированию масштабируемых приложений при помощи Kafka, сначала нужно настроить кластер Kafka. Кластер состоит из множества брокеров Kafka, и каждый из них отвечает за обработку потоков данных.
Пошаговая настройка:
- Скачиваем и устанавливаем Kafka:
- Переходим на страницу для скачивания Apache Kafka и берём там новейшую версию.
- Извлекаем загруженные файлы и переходим в каталог с Kafka.
- Запускаем Zookeeper:
- При работе Kafka опирается на Zookeeper, распределённый координационный сервис. Запустим Zookeeper следующей командой:
sh bin/zookeeper-server-start.sh config/zookeeper.properties
- Запускаем брокер Kafka:
- Когда Zookeeper уже работает, запускаем брокер Kafka:
sh bin/kafka-server-start.sh config/server.properties
- Создаём топик:
- Данные в Kafka организованы в виде топиков. Создадим новый топик так:
sh bin/kafka-topics.sh --create --topic my-topic --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1
- Начинаем производить и потреблять сообщения:
- Производим сообщения в топик:
sh bin/kafka-console-producer.sh --topic my-topic --bootstrap-server localhost:9092 - Потребляем сообщения из топика:
sh bin/kafka-console-consumer.sh --topic my-topic --bootstrap-server localhost:9092 --from-beginning
Знакомство с реактивным программированием
Концепции реактивного программирования:
- Асинхронность: операции являются неблокирующими, то есть, ни одна операция не должна дожидаться завершения другой операции.
- Движущая роль событий: код реагирует на события — то есть, на действия пользователя или на изменения данных.
- Наблюдаемые потоки: данные понимаются как потоки, которые можно наблюдать и на которые можно реагировать.
- Project Reactor: библиотека для реактивного программирования на Java, предоставляет мощный API для сборки асинхронных приложений, управляемых через события.
- RxJava: ещё одна популярная библиотека для реактивного программирования на Java, вдохновлённая проектом Reactive Extensions.
Простой реактивный пример
Рассмотрим простейший пример реактивного программирования, в котором используется Project Reactor.
Java
import reactor.core.publisher.Flux; public class ReactiveExample { public static void main(String[] args) { Flux<String> dataStream = Flux.just("Hello", "Reactive", "World"); dataStream .map(String::toUpperCase) .subscribe(System.out::println); } }
Здесь Flux — это поток данных. Функция map преобразует каждый из элементов, а subscribe потребляет данные, выводя их в консоль.
Интеграция Kafka с реактивным программированием
Чтобы создать масштабируемое приложение, нужно интегрировать Kafka со средствами реактивного программирования. Для этого воспользуемся реактивным клиентом Kafka, который будет производить и потреблять сообщения.
Производство сообщений для Kafka
При помощи библиотеки reactor-kafka можно производить сообщения для Kafka, построив работу по реактивному принципу.
Зависимости:
Добавим следующие зависимости в файл pom.xml (для Maven):
Java
<dependency> <groupId>io.projectreactor.kafka</groupId> <artifactId>reactor-kafka</artifactId> <version>1.3.5</version> </dependency> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>2.7.0</version> </dependency>
Реактивный продьюсер Kafka:
Java
import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.serialization.StringSerializer; import reactor.core.publisher.Flux; import reactor.kafka.sender.KafkaSender; import reactor.kafka.sender.SenderOptions; import reactor.kafka.sender.SenderRecord; import java.util.HashMap; import java.util.Map; public class ReactiveKafkaProducer { public static void main(String[] args) { Map<String, Object> props = new HashMap<>(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); SenderOptions<String, String> senderOptions = SenderOptions.create(props); KafkaSender<String, String> sender = KafkaSender.create(senderOptions); Flux<SenderRecord<String, String, String>> outboundFlux = Flux.range(1, 10) .map(i -> SenderRecord.create(new ProducerRecord<>("my-topic", "key" + i, "value" + i), "correlationId" + i)); sender.send(outboundFlux) .doOnError(e -> System.err.println("Send failed: " + e)) .doOnNext(r -> System.out.println("Message sent: " + r.correlationMetadata())) .subscribe(); } }
В этом примере мы настраиваем продьюсер Kafka при помощи реактивного программирования. Он отправляет десять сообщений в топик my-topic, занося в лог, как окончилась каждая операция — успешно или неуспешно.
Потребление сообщений из Kafka
Аналогично, можно реактивно потреблять сообщения из Kafka.
Реактивный потребитель Kafka:
Java
import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.common.serialization.StringDeserializer; import reactor.kafka.receiver.KafkaReceiver; import reactor.kafka.receiver.ReceiverOptions; import java.util.Collections; import java.util.HashMap; import java.util.Map; public class ReactiveKafkaConsumer { public static void main(String[] args) { Map<String, Object> props = new HashMap<>(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); props.put(ConsumerConfig.GROUP_ID_CONFIG, "reactive-consumer-group"); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); ReceiverOptions<String, String> receiverOptions = ReceiverOptions.create(props) .subscription(Collections.singleton("my-topic")); KafkaReceiver<String, String> receiver = KafkaReceiver.create(receiverOptions); receiver.receive() .doOnNext(record -> System.out.println("Received message: " + record.value())) .subscribe(); } }
В данном примере потребитель подписывается на my-topic и выводит в консоль все принятые сообщения.
Создаём масштабируемое приложение: разбор примера
Чтобы проиллюстрировать, как можно использовать Kafka в сочетании с реактивным программированием, рассмотрим практический пример: напишем систему, которая позволяет в режиме реального времени отслеживать движение целого парка грузовичков. Система должна обрабатывать данные о движении машин и показывать, где находится в данный момент каждая из них.
Компоненты системы
- Сенсоры грузовиков: датчики, установленные в автомобилях и отправляющие в систему данные о местоположении и обновлении статуса.
- Брокер Kafka: собирает информацию с датчиков и отправляет её в потоковом режиме.
- Реактивные микросервисы: обрабатывают и анализируют данные.
- Приложение для визуализации данных: демонстрирует пользователю данные, обновляемые в режиме реального времени.
1. Сенсоры грузовиков: сымитируем датчики, отправляющие информацию в Kafka.
Java
import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.serialization.StringSerializer; import reactor.core.publisher.Flux; import reactor.kafka.sender.KafkaSender; import reactor.kafka.sender.SenderOptions; import reactor.kafka.sender.SenderRecord; import java.util.HashMap; import java.util.Map; import java.util.Random; public class TruckSensorSimulator { public static void main(String[] args) { Map<String, Object> props = new HashMap<>(); props.put(Producer Config.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); SenderOptions<String, String> senderOptions = SenderOptions.create(props); KafkaSender<String, String> sender = KafkaSender.create(senderOptions); Random random = new Random(); Flux<SenderRecord<String, String, String>> sensorDataFlux = Flux.interval(Duration.ofSeconds(1)) .map(tick -> { String truckId = "truck-" + random.nextInt(10); String location = "loc-" + random.nextInt(100); String status = "status-" + random.nextInt(3); String value = String.format("%s,%s,%s", truckId, location, status); return SenderRecord.create(new ProducerRecord<>("truck-data", truckId, value), "correlationId" + tick); }); sender.send(sensorDataFlux) .doOnError(e -> System.err.println("Send failed: " + e)) .doOnNext(r -> System.out.println("Message sent: " + r.correlationMetadata())) .subscribe(); } }
2. Брокер Kafka: подготовим и запустим Kafka так, как описано выше.
3. Реактивные микросервисы: будем обрабатывать данные из Kafka и анализировать их в режиме реального времени.
Java
import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.common.serialization.StringDeserializer; import reactor.kafka.receiver.KafkaReceiver; import reactor.kafka.receiver.ReceiverOptions; import java.util.Collections; import java.util.HashMap; import java.util.Map; public class TruckDataProcessor { public static void main(String[] args) { Map<String, Object> props = new HashMap<>(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); props.put(ConsumerConfig.GROUP_ID_CONFIG, "truck-data-processor-group"); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); ReceiverOptions<String, String> receiverOptions = ReceiverOptions.create(props) .subscription(Collections.singleton("truck-data")); KafkaReceiver<String, String> receiver = KafkaReceiver.create(receiverOptions); receiver.receive() .doOnNext(record -> { String[] data = record.value().split(","); String truckId = data[0]; String location = data[1]; String status = data[2]; // Process and analyze the data (e.g., updating a database or triggering alerts) System.out.println("Processed data for truck: " + truckId + ", location: " + location + ", status: " + status); }) .subscribe(); } }
4. Приложение для визуализации данных: в режиме реального времени показываем обработанные данные пользователю.
Чтобы создать такое приложение, можно воспользоваться веб-фреймворком, поддерживающим реактивное программирование, например, Spring Boot с WebFlux. Приложение подпишется на конечную точку WebSocket, чтобы получать обновления в режиме реального времени.
Настройка Spring Boot WebFlux:
Зависимости:
Добавим следующие зависимости в файл pom.xml (для Maven):
Java
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-webflux</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-websocket</artifactId> </dependency> <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> </dependency> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>2.7.0</version> </dependency>
Конфигурация WebFlux:
Java
import org.springframework.context.annotation.Configuration; import org.springframework.web.reactive.config.EnableWebFlux; import org.springframework.web.reactive.config.WebFluxConfigurer; @Configuration @EnableWebFlux public class WebFluxConfig implements WebFluxConfigurer { // WebFlux configuration can be added here if needed }
Конфигурация WebSocket:
Java
import org.springframework.context.annotation.Configuration; import org.springframework.web.socket.config.annotation.EnableWebSocket; import org.springframework.web.socket.config.annotation.WebSocketConfigurer; import org.springframework.web.socket.config.annotation.WebSocketHandlerRegistry; @Configuration @EnableWebSocket public class WebSocketConfig implements WebSocketConfigurer { @Override public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) { registry.addHandler(new TruckDataWebSocketHandler(), "/truck-data").setAllowedOrigins("*"); } }
WebSocket Handler:
Java
import org.springframework.web.reactive.socket.WebSocketHandler; import org.springframework.web.reactive.socket.WebSocketSession; import org.springframework.web.reactive.socket.server.support.WebSocketHandlerAdapter; import reactor.core.publisher.Mono; public class TruckDataWebSocketHandler extends WebSocketHandlerAdapter implements WebSocketHandler { @Override public Mono<Void> handle(WebSocketSession session) { // Simulate sending real-time updates to the client return session.send( session.receive() .map(msg -> session.textMessage("Received: " + msg.getPayloadAsText())) .doOnError(e -> System.err.println("WebSocket error: " + e)) ); } }
Reactive Kafka Consumer Service:
Java
import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.common.serialization.StringDeserializer; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.kafka.core.ConsumerFactory; import org.springframework.kafka.core.DefaultKafkaConsumerFactory; import org.springframework.kafka.listener.ConcurrentMessageListenerContainer; import org.springframework.kafka.listener.config.ContainerProperties; import org.springframework.stereotype.Service; import reactor.core.publisher.Flux; import reactor.core.publisher.Sinks; import java.util.HashMap; import java.util.Map; @Service public class TruckDataService { private final Sinks.Many<String> sink = Sinks.many().multicast().onBackpressureBuffer(); @KafkaListener(topics = "truck-data", groupId = "truck-data-processor-group") public void listen(String message) { sink.tryEmitNext(message); } public Flux<String> getTruckDataStream() { return sink.asFlux(); } public ConsumerFactory<String, String> consumerFactory() { Map<String, Object> props = new HashMap<>(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); props.put(ConsumerConfig.GROUP_ID_CONFIG, "truck-data-processor-group"); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); return new DefaultKafkaConsumerFactory<>(props); } public ConcurrentMessageListenerContainer<String, String> kafkaListenerContainer() { ContainerProperties containerProps = new ContainerProperties("truck-data"); return new ConcurrentMessageListenerContainer<>(consumerFactory(), containerProps); } }
REST-контроллер:
Java
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RestController; import reactor.core.publisher.Flux; @RestController public class TruckDataController { @Autowired private TruckDataService truckDataService; @GetMapping("/truck-data") public Flux<String> getTruckData() { return truckDataService.getTruckDataStream(); } }
При такой конфигурации данные о передвижении машин в режиме реального времени отправляются на конечную точку WebSocket, к которой может подключаться приложение для визуализации данных. В нём мы и увидим обновления.
Заключение
Сочетая Kafka и реактивное программирование, можно создавать хорошо масштабируемые приложения, способные обрабатывать поступающие в режиме реального времени потоки данных и реагировать на них. Kafka обеспечивает надёжный механизм для обработки больших объёмов данных, а реактивное программирование гарантирует, что приложение будет оставаться отзывчивым, устойчивым и эффективным.
В этой статье было рассказано, как подготовить Kafka к работе, была приведена базовая информация о реактивном программировании, а также было объяснено, как интегрировать два этих инструмента и создать мониторинговую систему для работы в режиме реального времени. Если вы пишете приложение для мониторинга, обработки данных или других задач, решаемых в режиме реального времени, то именно Kafka и реактивное программирование помогут вам преуспеть в этом.
Освоив эти технологии, вы сможете создавать приложения, которые будут не только хорошо масштабироваться по мере роста запросов, но и позволят получать данные в режиме реального времени, а также обеспечат отзывчивость, которая нужна пользователям, работающим в современной динамичной цифровой среде.
P.S Обращаем ваше внимание на то, что у нас на сайте проходит распродажа.
