Введение
В современном цифровом мире критически важна возможность обрабатывать данные в режиме реального времени и масштабировать приложения. Для этого хорошо подходит Kafka – платформа для распределённой потоковой обработки событий, особенно, если сочетать её с реактивным программированием. В данной статье будет рассказано, как создавать реактивные приложения при помощи этого инструментария.
Разберём основы
Прежде, чем перейти к деталям, давайте изучим некоторые ключевые концепции.
Kafka: Apache Kafka — это платформа для распределенной обработки потоков событий, применяемая для создания конвейеров данных, работающих в режиме реального времени, а также потоковых приложений. Kafka оперативно обрабатывает огромные объёмы данных. При работе с Kafka приложения получают возможность производить и потреблять потоки записей (событий или сообщений).
Реактивное программирование: это такой подход к обработке ПО, при котором акцент делается на обработке асинхронных потоков данных и на распространении изменений. Такой подход способствует отзывчивости, надёжности и масштабированию приложений.
Масштабируемость: это способность системы справляться с нарастающими рабочими нагрузками или потенциал для увеличения системы, чтобы подстроиться под этот рост. В контексте приложения масштабируемость — это способность справляться с растущим количеством пользователей, объёмами данных или новыми транзакциями без ущерба для производительности.
Потоковая обработка данных в режиме реального времени: это технология обработки данных по мере их поступления. Благодаря этому приложение может сразу же реагировать на изменения или события. Потоковая обработка обычно противопоставляется пакетной; в последнем случае данные аккумулируются и обрабатываются партиями через заранее намеченные интервалы.
Расстановка: Почему Kafka сочетается с реактивным программированием?
Kafka и реактивное программирование хорошо дополняют друг друга. Kafka предоставляет надёжный механизм для обработки потоков данных, а реактивное программирование позволяет эффективно и оперативно обрабатывать эти потоки. В совокупности две эти технологии удобны для создания масштабируемых приложений, работающих в режиме реального времени.
Ключевые достоинства:
- Высокая пропускная способность: Kafka с минимальными задержками обрабатывать большие объёмы данных.
- Отказоустойчивость: распределённая архитектура Kafka гарантирует репликацию данных и отказоустойчивость системы.
- Масштабируемость: как Kafka, так и реактивное программирование рассчитаны на горизонтальное масштабирование. Таким образом, можно добавлять всё новые и новые узлы, чтобы справляться с возрастающими нагрузками.
- Асинхронная обработка: поскольку реактивное программирование по своей природе асинхронное, в этой парадигме гарантируются неблокирующие операции и улучшается отзывчивость.
Настройка Kafka
Чтобы приступить к программированию масштабируемых приложений при помощи Kafka, сначала нужно настроить кластер Kafka. Кластер состоит из множества брокеров Kafka, и каждый из них отвечает за обработку потоков данных.
Пошаговая настройка:
- Скачиваем и устанавливаем Kafka:
- Переходим на страницу для скачивания Apache Kafka и берём там новейшую версию.
- Извлекаем загруженные файлы и переходим в каталог с Kafka.
- Запускаем Zookeeper:
- При работе Kafka опирается на Zookeeper, распределённый координационный сервис. Запустим Zookeeper следующей командой:
sh bin/zookeeper-server-start.sh config/zookeeper.properties
- Запускаем брокер Kafka:
- Когда Zookeeper уже работает, запускаем брокер Kafka:
sh bin/kafka-server-start.sh config/server.properties
- Создаём топик:
- Данные в Kafka организованы в виде топиков. Создадим новый топик так:
sh bin/kafka-topics.sh --create --topic my-topic --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1
- Начинаем производить и потреблять сообщения:
- Производим сообщения в топик:
sh bin/kafka-console-producer.sh --topic my-topic --bootstrap-server localhost:9092
- Потребляем сообщения из топика:
sh bin/kafka-console-consumer.sh --topic my-topic --bootstrap-server localhost:9092 --from-beginning
Знакомство с реактивным программированием
Концепции реактивного программирования:
- Асинхронность: операции являются неблокирующими, то есть, ни одна операция не должна дожидаться завершения другой операции.
- Движущая роль событий: код реагирует на события — то есть, на действия пользователя или на изменения данных.
- Наблюдаемые потоки: данные понимаются как потоки, которые можно наблюдать и на которые можно реагировать.
- Project Reactor: библиотека для реактивного программирования на Java, предоставляет мощный API для сборки асинхронных приложений, управляемых через события.
- RxJava: ещё одна популярная библиотека для реактивного программирования на Java, вдохновлённая проектом Reactive Extensions.
Простой реактивный пример
Рассмотрим простейший пример реактивного программирования, в котором используется Project Reactor.
Java
import reactor.core.publisher.Flux;
public class ReactiveExample {
public static void main(String[] args) {
Flux<String> dataStream = Flux.just("Hello", "Reactive", "World");
dataStream
.map(String::toUpperCase)
.subscribe(System.out::println);
}
}
Здесь Flux — это поток данных. Функция map преобразует каждый из элементов, а subscribe потребляет данные, выводя их в консоль.
Интеграция Kafka с реактивным программированием
Чтобы создать масштабируемое приложение, нужно интегрировать Kafka со средствами реактивного программирования. Для этого воспользуемся реактивным клиентом Kafka, который будет производить и потреблять сообщения.
Производство сообщений для Kafka
При помощи библиотеки reactor-kafka можно производить сообщения для Kafka, построив работу по реактивному принципу.
Зависимости:
Добавим следующие зависимости в файл pom.xml (для Maven):
Java
<dependency>
<groupId>io.projectreactor.kafka</groupId>
<artifactId>reactor-kafka</artifactId>
<version>1.3.5</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.7.0</version>
</dependency>
Реактивный продьюсер Kafka:
Java
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import reactor.core.publisher.Flux;
import reactor.kafka.sender.KafkaSender;
import reactor.kafka.sender.SenderOptions;
import reactor.kafka.sender.SenderRecord;
import java.util.HashMap;
import java.util.Map;
public class ReactiveKafkaProducer {
public static void main(String[] args) {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
SenderOptions<String, String> senderOptions = SenderOptions.create(props);
KafkaSender<String, String> sender = KafkaSender.create(senderOptions);
Flux<SenderRecord<String, String, String>> outboundFlux = Flux.range(1, 10)
.map(i -> SenderRecord.create(new ProducerRecord<>("my-topic", "key" + i, "value" + i), "correlationId" + i));
sender.send(outboundFlux)
.doOnError(e -> System.err.println("Send failed: " + e))
.doOnNext(r -> System.out.println("Message sent: " + r.correlationMetadata()))
.subscribe();
}
}
В этом примере мы настраиваем продьюсер Kafka при помощи реактивного программирования. Он отправляет десять сообщений в топик my-topic, занося в лог, как окончилась каждая операция — успешно или неуспешно.
Потребление сообщений из Kafka
Аналогично, можно реактивно потреблять сообщения из Kafka.
Реактивный потребитель Kafka:
Java
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import reactor.kafka.receiver.KafkaReceiver;
import reactor.kafka.receiver.ReceiverOptions;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
public class ReactiveKafkaConsumer {
public static void main(String[] args) {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "reactive-consumer-group");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
ReceiverOptions<String, String> receiverOptions = ReceiverOptions.create(props)
.subscription(Collections.singleton("my-topic"));
KafkaReceiver<String, String> receiver = KafkaReceiver.create(receiverOptions);
receiver.receive()
.doOnNext(record -> System.out.println("Received message: " + record.value()))
.subscribe();
}
}
В данном примере потребитель подписывается на my-topic и выводит в консоль все принятые сообщения.
Создаём масштабируемое приложение: разбор примера
Чтобы проиллюстрировать, как можно использовать Kafka в сочетании с реактивным программированием, рассмотрим практический пример: напишем систему, которая позволяет в режиме реального времени отслеживать движение целого парка грузовичков. Система должна обрабатывать данные о движении машин и показывать, где находится в данный момент каждая из них.
Компоненты системы
- Сенсоры грузовиков: датчики, установленные в автомобилях и отправляющие в систему данные о местоположении и обновлении статуса.
- Брокер Kafka: собирает информацию с датчиков и отправляет её в потоковом режиме.
- Реактивные микросервисы: обрабатывают и анализируют данные.
- Приложение для визуализации данных: демонстрирует пользователю данные, обновляемые в режиме реального времени.
1. Сенсоры грузовиков: сымитируем датчики, отправляющие информацию в Kafka.
Java
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import reactor.core.publisher.Flux;
import reactor.kafka.sender.KafkaSender;
import reactor.kafka.sender.SenderOptions;
import reactor.kafka.sender.SenderRecord;
import java.util.HashMap;
import java.util.Map;
import java.util.Random;
public class TruckSensorSimulator {
public static void main(String[] args) {
Map<String, Object> props = new HashMap<>();
props.put(Producer
Config.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
SenderOptions<String, String> senderOptions = SenderOptions.create(props);
KafkaSender<String, String> sender = KafkaSender.create(senderOptions);
Random random = new Random();
Flux<SenderRecord<String, String, String>> sensorDataFlux = Flux.interval(Duration.ofSeconds(1))
.map(tick -> {
String truckId = "truck-" + random.nextInt(10);
String location = "loc-" + random.nextInt(100);
String status = "status-" + random.nextInt(3);
String value = String.format("%s,%s,%s", truckId, location, status);
return SenderRecord.create(new ProducerRecord<>("truck-data", truckId, value), "correlationId" + tick);
});
sender.send(sensorDataFlux)
.doOnError(e -> System.err.println("Send failed: " + e))
.doOnNext(r -> System.out.println("Message sent: " + r.correlationMetadata()))
.subscribe();
}
}
2. Брокер Kafka: подготовим и запустим Kafka так, как описано выше.
3. Реактивные микросервисы: будем обрабатывать данные из Kafka и анализировать их в режиме реального времени.
Java
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import reactor.kafka.receiver.KafkaReceiver;
import reactor.kafka.receiver.ReceiverOptions;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
public class TruckDataProcessor {
public static void main(String[] args) {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "truck-data-processor-group");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
ReceiverOptions<String, String> receiverOptions = ReceiverOptions.create(props)
.subscription(Collections.singleton("truck-data"));
KafkaReceiver<String, String> receiver = KafkaReceiver.create(receiverOptions);
receiver.receive()
.doOnNext(record -> {
String[] data = record.value().split(",");
String truckId = data[0];
String location = data[1];
String status = data[2];
// Process and analyze the data (e.g., updating a database or triggering alerts)
System.out.println("Processed data for truck: " + truckId + ", location: " + location + ", status: " + status);
})
.subscribe();
}
}
4. Приложение для визуализации данных: в режиме реального времени показываем обработанные данные пользователю.
Чтобы создать такое приложение, можно воспользоваться веб-фреймворком, поддерживающим реактивное программирование, например, Spring Boot с WebFlux. Приложение подпишется на конечную точку WebSocket, чтобы получать обновления в режиме реального времени.
Настройка Spring Boot WebFlux:
Зависимости:
Добавим следующие зависимости в файл pom.xml (для Maven):
Java
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.7.0</version>
</dependency>
Конфигурация WebFlux:
Java
import org.springframework.context.annotation.Configuration;
import org.springframework.web.reactive.config.EnableWebFlux;
import org.springframework.web.reactive.config.WebFluxConfigurer;
@Configuration
@EnableWebFlux
public class WebFluxConfig implements WebFluxConfigurer {
// WebFlux configuration can be added here if needed
}
Конфигурация WebSocket:
Java
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.config.annotation.EnableWebSocket;
import org.springframework.web.socket.config.annotation.WebSocketConfigurer;
import org.springframework.web.socket.config.annotation.WebSocketHandlerRegistry;
@Configuration
@EnableWebSocket
public class WebSocketConfig implements WebSocketConfigurer {
@Override
public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
registry.addHandler(new TruckDataWebSocketHandler(), "/truck-data").setAllowedOrigins("*");
}
}
WebSocket Handler:
Java
import org.springframework.web.reactive.socket.WebSocketHandler;
import org.springframework.web.reactive.socket.WebSocketSession;
import org.springframework.web.reactive.socket.server.support.WebSocketHandlerAdapter;
import reactor.core.publisher.Mono;
public class TruckDataWebSocketHandler extends WebSocketHandlerAdapter implements WebSocketHandler {
@Override
public Mono<Void> handle(WebSocketSession session) {
// Simulate sending real-time updates to the client
return session.send(
session.receive()
.map(msg -> session.textMessage("Received: " + msg.getPayloadAsText()))
.doOnError(e -> System.err.println("WebSocket error: " + e))
);
}
}
Reactive Kafka Consumer Service:
Java
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.config.ContainerProperties;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Sinks;
import java.util.HashMap;
import java.util.Map;
@Service
public class TruckDataService {
private final Sinks.Many<String> sink = Sinks.many().multicast().onBackpressureBuffer();
@KafkaListener(topics = "truck-data", groupId = "truck-data-processor-group")
public void listen(String message) {
sink.tryEmitNext(message);
}
public Flux<String> getTruckDataStream() {
return sink.asFlux();
}
public ConsumerFactory<String, String> consumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "truck-data-processor-group");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return new DefaultKafkaConsumerFactory<>(props);
}
public ConcurrentMessageListenerContainer<String, String> kafkaListenerContainer() {
ContainerProperties containerProps = new ContainerProperties("truck-data");
return new ConcurrentMessageListenerContainer<>(consumerFactory(), containerProps);
}
}
REST-контроллер:
Java
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Flux;
@RestController
public class TruckDataController {
@Autowired
private TruckDataService truckDataService;
@GetMapping("/truck-data")
public Flux<String> getTruckData() {
return truckDataService.getTruckDataStream();
}
}
При такой конфигурации данные о передвижении машин в режиме реального времени отправляются на конечную точку WebSocket, к которой может подключаться приложение для визуализации данных. В нём мы и увидим обновления.
Заключение
Сочетая Kafka и реактивное программирование, можно создавать хорошо масштабируемые приложения, способные обрабатывать поступающие в режиме реального времени потоки данных и реагировать на них. Kafka обеспечивает надёжный механизм для обработки больших объёмов данных, а реактивное программирование гарантирует, что приложение будет оставаться отзывчивым, устойчивым и эффективным.
В этой статье было рассказано, как подготовить Kafka к работе, была приведена базовая информация о реактивном программировании, а также было объяснено, как интегрировать два этих инструмента и создать мониторинговую систему для работы в режиме реального времени. Если вы пишете приложение для мониторинга, обработки данных или других задач, решаемых в режиме реального времени, то именно Kafka и реактивное программирование помогут вам преуспеть в этом.
Освоив эти технологии, вы сможете создавать приложения, которые будут не только хорошо масштабироваться по мере роста запросов, но и позволят получать данные в режиме реального времени, а также обеспечат отзывчивость, которая нужна пользователям, работающим в современной динамичной цифровой среде.
P.S Обращаем ваше внимание на то, что у нас на сайте проходит распродажа.