Данная статья является продолжением статьи - Ивентная модель данных с использованием Kafka и Kafka Connect: Построение гибкой и распределенной архитектуры.
Добро пожаловать во вторую часть статьи о построении гибкой и распределенной архитектуры с использованием Apache Kafka и Kafka Connect! В первой части мы ознакомились с ивентной моделью данных, разработали сервис отправителя и настроили интеграцию с Kafka, чтобы асинхронно отправлять сообщения. Теперь настало время рассмотреть вторую часть этого увлекательного проекта.
В этой части статьи мы сфокусируемся на реализации получателя на Java, который будет получать Avro-схемы из Schema Registry и читать сообщения из Kafka, отправленные сервисом отправителя через Kafka Connect. Получатель будет обрабатывать сообщения из топика, обогащать их своими данными и сохранять результаты в своей базе данных.
Цели статьи:
Разработка Java-приложения в качестве получателя данных из Kafka.
Изучение работы с Avro-схемами и Schema Registry для обеспечения совместимости сообщений.
Обработка с��общений из Kafka с использованием асинхронного подхода.
Интеграция с PostgreSQL для сохранения обработанных данных.
Подготовка Helm Chart для развертывания отправителя и получателя в Kubernetes.
Конфигурирование Kubernetes для обеспечения масштабируемости и отказоустойчивости архитектуры.
Подготовка Java-приложения в качестве получателя
Для начала подготовим наш POM файл
pom.xml
<dependencies> <dependency> <groupId>com.google.guava</groupId> <artifactId>guava</artifactId> <version>32.1.1-jre</version> </dependency> <dependency> <groupId>io.confluent</groupId> <artifactId>kafka-avro-serializer</artifactId> <version>7.4.0</version> </dependency> <dependency> <groupId>io.confluent</groupId> <artifactId>kafka-schema-registry-client</artifactId> <version>7.4.0</version> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-jpa</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-actuator</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> </dependency> <dependency> <groupId>org.postgresql</groupId> <artifactId>postgresql</artifactId> <scope>runtime</scope> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <optional>true</optional> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka-test</artifactId> <scope>test</scope> </dependency> </dependencies>
Здесь мы добавим библиотеки необходимые для работы со Schema Regestry, PostgreSQL, Kafka и проч. Чтобы успешно выгрузить библиотеки io.confluent необходимо подключиться к их репозиторию
<repositories> <repository> <id>confluent</id> <url>https://packages.confluent.io/maven/</url> </repository> </repositories>
Теперь напишем yaml конфигурацию для проекта:
# Порт сервера server: port: 8081 # Настройки Spring spring: main: allow-bean-definition-overriding: true application: name: kafka-example-consumer jpa: # Диалект базы данных database-platform: org.hibernate.dialect.PostgreSQLDialect generate-ddl: false hibernate: ddl-auto: update show-sql: true properties: hibernate: generate_statistics: false datasource: # URL для подключения к базе данных PostgreSQL url: jdbc:postgresql://localhost:5432/public username: postgres password: postgres kafka: schema: registry: # Пути до хранилищ схем если у нас их несколько (дополнительная конфигурация) urls: http://schema-registry-cp-schema-registry:8081 bootstrap-servers: localhost:29092 listener: # Как подтверждаем получение сообщений ack-mode: record producer: client-id: ${spring.application.name} # Сериализатор ключа key-serializer: org.apache.kafka.common.serialization.StringSerializer # Сериализатор значения в DLT очередь value-serializer: ru.marmarks.consumer.config.DltMessageSerializer retries: 3 consumer: group-id: ${spring.application.name} autoOffsetReset: earliest # Сериализаторы для ключа и значения с обработкой ошибок keyDeserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer valueDeserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer properties: # Тип значения в JSON по умолчанию spring.json.value.default.type: org.apache.avro.generic.GenericRecord spring.deserializer.key.delegate.class: org.apache.kafka.common.serialization.StringDeserializer # Сериализатор значения spring.deserializer.value.delegate.class: io.confluent.kafka.serializers.KafkaAvroDeserializer specific: avro: reader: true # Хранилище схем по умолчанию schema: registry: url: http://schema-registry-cp-schema-registry:8081 # Настройки Kafka kafka: topics: # Топик из которого будем получать записи personal-data: kafka-connect-personal_data services: # Для определения того из каких топиков мы поддерживаем чтение personal-data: kafka-connect-personal_data
Подключение к Schema Registry и Kafka брокеру
Теперь перейдём к конфигурированию Kafka. Комментарии приведены в коде. Здесь мы конфигурируем подключение к Schema Regestry, слушателя и отправку сообщений, которые не вышло обработать в dlt очередь
KafkaConfiguration.java
@Configuration @EnableKafka @RequiredArgsConstructor @EnableConfigurationProperties(SchemaRegistryProperties.class) public class KafkaConfiguration { private static final int DEFAULT_CACHE_CAPACITY = 200; private static final String DLT_TOPIC_SUFFIX = ".dlt"; private final ConsumerFactory<String, GenericRecord> consumerFactory; private final ProducerFactory<Object, Object> producerFactory; @Bean public ConcurrentKafkaListenerContainerFactory<String, GenericRecord> objectKafkaListenerContainerFactory( DefaultErrorHandler errorHandler ) { ConcurrentKafkaListenerContainerFactory<String, GenericRecord> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory); factory.setCommonErrorHandler(errorHandler); return factory; } /** * Клиент для кеширования схем */ @Bean public CachedSchemaRegistryClient cachedSchemaRegistryClient( SchemaRegistryProperties schemaRegistryProperties) { // Сервис для коммуникацией со Schema Registry RestService restService = new RestService(schemaRegistryProperties.urls()); //Также передаём размер кэша схем return new CachedSchemaRegistryClient(restService, DEFAULT_CACHE_CAPACITY); } @Bean public KafkaTemplate<Object, Object> kafkaTemplate() { return new KafkaTemplate<>(producerFactory); } /** * Публикатор в dead-letter topic. */ @Bean public DeadLetterPublishingRecoverer publisher(KafkaTemplate<Object, Object> bytesTemplate) { // Определяем логику выбора партиции для отправки сообщения в DLT. // В данном случае, создаём новый объект TopicPartition, используя имя //топика (consumerRecord.topic()) и добавляя суффикс DLT_TOPIC_SUFFIX, // а также номер партиции (consumerRecord.partition()). // Следовательно в DLT топике должно быть столько партиций //сколько и в топике откуда читаем return new DeadLetterPublishingRecoverer(bytesTemplate, (consumerRecord, exception) -> new TopicPartition(consumerRecord.topic() + DLT_TOPIC_SUFFIX, consumerRecord.partition())); } /** * Обработчик исключений при получении сообщений из kafka по умолчанию. */ @Bean public DefaultErrorHandler errorHandler( DeadLetterPublishingRecoverer deadLetterPublishingRecoverer) { final var handler = new DefaultErrorHandler(deadLetterPublishingRecoverer); // Обрабатываем любые исключения и отправляем в DLT handler.addNotRetryableExceptions(Exception.class); return handler; } }
Посмотрим на сущность, которую будем обрабатывать:
public class ClientData { @Id @GeneratedValue(strategy = GenerationType.IDENTITY) private Long id; private String bankBic; private String bankName; private String fio; private Long correlationId; private LocalDateTime lastUpdate; }
Добавили новые поля, относительно отправляемой сущности - fio, correlationId. Предположим, что мы заполняем ФИО, обращаясь в какой-то сторонний сервис.
Обработка сообщений из топика
Теперь перейдём к тому, как будем обрабатывать приходящее сообщение
KafkaConsumerListeners.java
public class KafkaConsumerListeners { // Получим сервисы, которые смогут обрабатывать приходящие сообщения private final List<ServiceResolver> serviceResolverList; // Определяем топики из которых будем читать. Их может быть несколько. //Также определим конфигурацию листнера @KafkaListener(topics = "#{'${kafka.topics.personal-data}'.split(',')}", containerFactory = "objectKafkaListenerContainerFactory") // Принимаем абстрактный класс GenericRecord. //Мы сможем обработать только то сообщение, для которого определена //схема в Schema Regestry void readOrganizationConnectorMessages(ConsumerRecord<String, GenericRecord> message) { readMessage(message); } private void readMessage(ConsumerRecord<String, GenericRecord> message) { String topic = message.topic(); long offset = message.offset(); log.info("Сообщение из топика: {} offset: {}", topic, offset); // Получаем само сообщение GenericRecord value = message.value(); if (value == null) { log.info("Пустое сообщение из топика: {} offset: {}", topic, offset); return; } // Находим сервис, который сможет обработать сообщение ServiceResolver resolver = serviceResolverList.stream() .filter(it -> it.isSupported(topic)) .findFirst() .orElseThrow(() -> new RuntimeException("Сервис не найден для топика: " + topic)); // Передаём сообщение на обработку resolver.process(value); log.info("Сообщение обработано: топик {} offset {}", topic, offset); } }
Комментарии по коду приведены непосредственно в коде. Реализация заточена на то, что мы сможем обрабатывать несколько топиков с различными сообщения. Схема каждого сообщения должна быть приведена в Schema Regestry
Работа с Avro-схемами
Посмотрим на обработку сообщения в сервисе
Обработка сообщения
public void process(GenericRecord value) { Long id = Long.valueOf(String.valueOf(value.get("id"))); String bankBic = String.valueOf(value.get("bank_bic")); String bankName = String.valueOf(value.get("bank_name")); String lastUpdate = String.valueOf(value.get("last_update")); long millis = Long.parseLong(lastUpdate); Instant instant = Instant.ofEpochMilli(millis); LocalDateTime lastUpdateTime = instant.atZone(ZoneId.systemDefault()).toLocalDateTime(); // Подготовим новую сущность на основе приходящих данных ClientData clientData = new ClientData(); clientData.setBankBic(bankBic); clientData.setFio("Рауль " + id); clientData.setBankName(bankName); clientData.setCorrelationId(id); clientData.setLastUpdate(lastUpdateTime); ClientData save = clientDataRepository.save(clientData); log.info("Данные клиента сохранены: {}", save); }
В данном случае мы примитивным способом вытаскиваем все поля из сообщения и на основе них создаём новую сущность, которую модифицируем новыми данными, которые мы можем получить обратившись куда-либо ещё или иным способом и записываем в базу
Полный код получателя доступен на GitHub.
Helm Chart
Теперь перейдём к Helm чарту, который будет разворачивать нашего отправителя и получателя.
Подготовим файл с информацией о чарте:
# Версия API Helm Chart apiVersion: v2 # Имя чарта name: marmarks-chart # Версия чарта version: 1.0.0 # Описание description: Some project # Тип type: application sources: - https://github.com/marmarks maintainers: - name: Maksim Krylov email: kryloumaksim@gmail.com url: https://habr.com/ru/users/Marmaksmark/ # Версия приложения appVersion: 1.0.0
Конфигурации и шаблоны
Следующим шагом подготовим файл values.yaml, который предоставляет конфигурации для чарта. Помимо того, что настройки будут определены в этом файле мы сможем также их переопределять используя --set при запуске чарта
producer: # Конфигурация деплоймента deployment: # Имя деплоймента name: "producer" # Число реплик replicas: 1 # Настройки контейнера container: name: "producer-container" resources: { } # Настройки сервиса service: name: "producer-service" port: 8080 # Информация об образе сервиса image: name: marmarks/producer tag: "0.2" # Загрузим образ, если локально его нет pullPolicy: IfNotPresent consumer: deployment: name: "consumer" replicas: 1 container: name: "consumer-container" resources: { } service: name: "consumer-service" port: 80 # тип сервиса: ClusterIP, NodePort, LoadBalancer type: LoadBalancer image: name: marmarks/consumer tag: "0.5" pullPolicy: IfNotPresent
Также мы определим файл _helpers.tpl содержащий вспомогательные шаблоны, такие как:
_helpers.tpl
Имена меток для связей между объектами
{{- define "marmarks-chart.selectorLabels" -}} app.kubernetes.io/name: {{ .Chart.Name }} app.kubernetes.io/instance: {{ .Release.Name }} {{- end -}}
Имя и версия чарта
{{- define "marmarks-chart.chart" -}} {{- printf "%s-%s" .Chart.Name .Chart.Version | replace "+" "_" | trunc 60 | trimSuffix "-" -}} {{- end -}}
Имена меток в манифестах
{{- define "marmarks-chart.labels" -}} helm.sh/chart: {{ include "marmarks-chart.chart" . }} {{ include "marmarks-chart.selectorLabels" . }} {{- if .Chart.AppVersion }} app.kubernetes.io/version: {{ .Chart.AppVersion | quote }} {{- end }} app.kubernetes.io/managed-by: {{ .Release.Service }} {{- end -}}
Функция для вычисления хэш суммы. Для обновления чарта при изменении секретов или конфигураций
{{- define "marmarks-chart.propertiesHash" -}} {{- $producerSecrets := include (print $.Template.BasePath "/producer-secrets.yaml") . | sha256sum -}} {{- $consumerSecrets := include (print $.Template.BasePath "/consumer-secrets.yaml") . | sha256sum -}} {{- $producerConfig := include (print $.Template.BasePath "/producer-config.yaml") . | sha256sum -}} {{- $consumerConfig := include (print $.Template.BasePath "/consumer-config.yaml") . | sha256sum -}} {{ print $producerSecrets $producerConfig $consumerSecrets $consumerConfig | sha256sum }} {{- end -}}
Имена компонентов отправителя
{{- define "marmarks-chart.producer.defaultName" -}} {{- printf "producer-%s" .Release.Name -}} {{- end -}} {{- define "marmarks-chart.producer.deployment.name" -}} {{- default (include "marmarks-chart.producer.defaultName" .) .Values.producer.deployment.name | trunc 63 | trimSuffix "-" -}} {{- end -}} {{- define "marmarks-chart.producer.container.name" -}} {{- default (include "marmarks-chart.producer.defaultName" .) .Values.producer.container.name | trunc 63 | trimSuffix "-" -}} {{- end -}} {{- define "marmarks-chart.producer.service.name" -}} {{- default (include "marmarks-chart.producer.defaultName" .) .Values.producer.service.name | trunc 63 | trimSuffix "-" -}} {{- end -}}
Имена компонентов получателя
{{- define "marmarks-chart.consumer.defaultName" -}} {{- printf "consumer-%s" .Release.Name -}} {{- end -}} {{- define "marmarks-chart.consumer.deployment.name" -}} {{- default (include "marmarks-chart.consumer.defaultName" .) .Values.consumer.deployment.name | trunc 63 | trimSuffix "-" -}} {{- end -}} {{- define "marmarks-chart.consumer.container.name" -}} {{- default (include "marmarks-chart.consumer.defaultName" .) .Values.consumer.container.name | trunc 63 | trimSuffix "-" -}} {{- end -}} {{- define "marmarks-chart.consumer.service.name" -}} {{- default (include "marmarks-chart.consumer.defaultName" .) .Values.consumer.service.name | trunc 63 | trimSuffix "-" -}} {{- end -}}
Имена прочих компонентов компонентов (секретов, конфигов)
{{- define "marmarks-chart.producerSecrets.defaultName" -}} {{- printf "producer-secrets-%s" .Release.Name -}} {{- end -}} {{- define "marmarks-chart.consumerSecrets.defaultName" -}} {{- printf "consumer-secrets-%s" .Release.Name -}} {{- end -}} {{- define "marmarks-chart.producerConfig.defaultName" -}} {{- printf "producer-config-%s" .Release.Name -}} {{- end -}} {{- define "marmarks-chart.consumerConfig.defaultName" -}} {{- printf "consumer-config-%s" .Release.Name -}} {{- end -}}
Получатель
Теперь, когда конфигурации и шаблоны описаны мы можем описать конфигурационный файл consumer-config.yaml для отправителя:
apiVersion: v1 kind: ConfigMap metadata: name: {{ include "marmarks-chart.consumerConfig.defaultName" . }} labels: {{- include "marmarks-chart.labels" . | nindent 4 }} data: SPRING_PROFILES_ACTIVE: "dev" SERVER_PORT: "8080" SPRING_KAFKA_BOOTSTRAP_SERVERS: "kafka:9092" SPRING_DATASOURCE_URL: "jdbc:postgresql://postgresql-dev:5432/postgres"
Здесь мы определили на каком порту будем запускать наше приложение, выбранный профиль, url брокера и url до базы данных. Логин-пароль для аутентификации будем хранить в секретах consumer-secrets.yaml:
# Манифест для создания секретов # Версия API apiVersion: v1 # Тип манифеста kind: Secret # Метаданные манифеста metadata: # Имя манифеста name: {{ include "marmarks-chart.consumerSecrets.defaultName" . }} labels: # Метки манифеста {{- include "marmarks-chart.labels" . | nindent 4 }} # Тип секрета: неструктурированные данные. Храни просто пару "ключ-значение" # Без автоматического обновления, управления, ротации и т.д. type: Opaque stringData: SPRING_DATASOURCE_USERNAME: "postgres" SPRING_DATASOURCE_PASSWORD: "pgpass"
Подробнее про секреты и их типы в статье - Безопасное хранение secrets в Kubernetes
Осталось описать лишь дейломент и сервис получателя consumer.yaml. Исчерпывающие комментарии оставлены в коде:
apiVersion: apps/v1 kind: Deployment metadata: name: {{ include "marmarks-chart.consumer.deployment.name" . }} labels: {{- include "marmarks-chart.labels" . | nindent 4 }} tier: consumer {{/* Спецификация развёртывания*/}} spec: {{/* Число реплик*/}} replicas: {{ .Values.consumer.deployment.replicas }} selector: {{/* Метки подов*/}} matchLabels: {{- include "marmarks-chart.selectorLabels" . | nindent 6 }} tier: consumer {{/* Стратегия обновления развёртывания*/}} strategy: {{/* Пошаговое обновление*/}} rollingUpdate: {{/* Максимальное количество реплик для обновления*/}} maxSurge: 25% {{/* Максимальное число реплик недоступных во время обновления*/}} maxUnavailable: 25% {{/* Тип стратегии развёртывания*/}} type: RollingUpdate {{/* Шаблон создания подов*/}} template: metadata: {{/* Аннотации*/}} annotations: {{/* Проверка чек суммы для обновления секретов и конфигов*/}} checksum/config: {{ include "marmarks-chart.propertiesHash" . }} labels: {{- include "marmarks-chart.selectorLabels" . | nindent 8 }} tier: consumer {{/* Спецификации пода*/}} spec: {{/* Контейнер, который будет запущен в поде*/}} containers: - name: {{ include "marmarks-chart.consumer.container.name" . }} image: "{{ .Values.consumer.image.name }}:{{ .Values.consumer.image.tag }}" imagePullPolicy: {{ .Values.consumer.image.pullPolicy }} {{/* Переменные окружения из конфиг мапы*/}} envFrom: - configMapRef: name: {{ include "marmarks-chart.consumerConfig.defaultName" . }} {{/* Переменные окружения секреты*/}} - secretRef: name: {{ include "marmarks-chart.producerSecrets.defaultName" . }} readinessProbe: {{/* Проверка доступности пода*/}} httpGet: path: /actuator/health port: 8080 initialDelaySeconds: 5 periodSeconds: 3 ports: {{/* Порты пода*/}} - containerPort: 8080 {{/* Протокол*/}} protocol: TCP resources: {{/* Ресурсы выделенные для контейнера*/}} {{- toYaml .Values.consumer.container.resources | nindent 12 }} --- apiVersion: v1 kind: Service metadata: name: {{ include "marmarks-chart.consumer.service.name" . }} labels: {{- include "marmarks-chart.labels" . | nindent 4 }} tier: consumer spec: type: {{ .Values.consumer.service.type }} ports: {{/* Порт службы*/}} - port: {{ .Values.consumer.service.port }} protocol: TCP {{/* Порт контейнера, на который будет направлен трафик*/}} targetPort: 8080 {{/* Имя порта*/}} name: http {{/* Селектор для выбора подов, которые будут обслужены сервисом*/}} selector: {{- include "marmarks-chart.selectorLabels" . | nindent 4 }} tier: consumer
Мы определили стратегию развертывания, health checks и прочее. Наш получатель уже полностью готов к приёму трафика из брокера, а брокер уже готов его нам поставлять. Однако мы сделаем аналогичную конфигурацию для отправителя
Отправитель
producer-config.yaml
apiVersion: v1 kind: ConfigMap metadata: name: {{ include "marmarks-chart.producerConfig.defaultName" . }} labels: {{- include "marmarks-chart.labels" . | nindent 4 }} data: SERVER_PORT: "8080" SPRING_PROFILES_ACTIVE: "dev" SPRING_DATASOURCE_URL: "jdbc:postgresql://postgresql-dev:5432/postgres"
producer-secrets.yaml
# Манифест для создания секретов # Версия API apiVersion: v1 # Тип манифеста kind: Secret # Метаданные манифеста metadata: # Имя манифеста name: {{ include "marmarks-chart.producerSecrets.defaultName" . }} labels: # Метки манифеста {{- include "marmarks-chart.labels" . | nindent 4 }} # Тип секрета: неструктурированные данные. Храни просто пару "ключ-значение" # Без автоматического обновления, управления, ротации и т.д. type: Opaque stringData: SPRING_DATASOURCE_USERNAME: "postgres" SPRING_DATASOURCE_PASSWORD: "pgpass"
producer.yaml
apiVersion: apps/v1 kind: Deployment metadata: name: {{ include "marmarks-chart.producer.deployment.name" . }} labels: {{- include "marmarks-chart.labels" . | nindent 4 }} tier: producer spec: replicas: {{ .Values.producer.deployment.replicas }} selector: matchLabels: {{- include "marmarks-chart.selectorLabels" . | nindent 6 }} tier: producer strategy: rollingUpdate: maxSurge: 25% maxUnavailable: 25% type: RollingUpdate template: metadata: labels: {{- include "marmarks-chart.selectorLabels" . | nindent 8 }} tier: producer spec: containers: - name: {{ include "marmarks-chart.producer.container.name" . }} image: "{{ .Values.producer.image.name }}:{{ .Values.producer.image.tag }}" imagePullPolicy: {{ .Values.producer.image.pullPolicy }} envFrom: - configMapRef: name: {{ include "marmarks-chart.producerConfig.defaultName" . }} - secretRef: name: {{ include "marmarks-chart.producerSecrets.defaultName" . }} ports: - containerPort: 8080 protocol: TCP readinessProbe: httpGet: path: /actuator/health port: 8080 initialDelaySeconds: 5 periodSeconds: 3 resources: {{- toYaml .Values.producer.container.resources | nindent 12 }} --- apiVersion: v1 kind: Service metadata: name: {{ include "marmarks-chart.producer.service.name" . }} labels: {{- include "marmarks-chart.labels" . | nindent 4 }} tier: producer spec: ports: - port: {{ .Values.producer.service.port }} protocol: TCP targetPort: 8080 name: http selector: {{- include "marmarks-chart.selectorLabels" . | nindent 4 }} tier: producer
Запуск и тестирование архитектуры
Необходимо иметь полностью развёрнутую среду из первой части статьи.
Теперь когда Helm Chart готов к запуску, находясь в директории чарта, выполним команду:
helm install <имя чарта> .
Наш чарт успешно запустится, если логи не скажут иного, и будет доступен. Зайдя в логи пода отправителя мы увидим, что он записал несколько новых сообщений в свою базу. После этого, зайдя в логи получателя, мы увидим, что он прочитал и обработал все сообщения, что были в топике раньше и появились только что. Это происходит весьма быстро.
Проверка работоспособности получателя и отправител
Если в ходе обработки сообщений произошла ошибка, то сообщения, чья обработка была безуспешна отправятся в DLT топик. Для того, чтобы убедиться в успешной записи сообщений, мы зайдём в под PostgreSQL и выполним несколько команд. Для начала авторизуемся:
psql -h postgresql-dev -p 5432 -U postgres -d postgres
Нас попросят ввести пароль, в моём случае это pgpass. После его ввода и успешной авторизации выполним:
SELECT * FROM client_data;
id | bank_bic | bank_name | correlation_id | fio | last_update
---+-------------------+-----------------+---------------------+-----------+-----------------------------------
1 | 1179896397 | null | 1 | Рауль 1| 2023-08-06 16:08:16.156
2 | -1471064429 | null | 2 | Рауль 2| 2023-08-06 16:08:16.221
3 | -604766198 | null | 3 | Рауль 3| 2023-08-06 16:08:16.228
4 | -771359675 | null | 4 | Рауль 4| 2023-08-06 16:08:16.232
5 | 1082682564 | null | 5 | Рауль 5| 2023-08-06 16:08:16.237
--------------------------------------------------------------------------------------------------------------------
Заключение
Если данные будут успешно обработаны, то записи будут получены при выполнении SQL запроса. Таким образом мы видим, что ивентная модель работает и успешно обрабатывает данные в асинхронном режиме, чем обеспечивает Evential Consistency.
Исходный код получателя и Helm Chart доступен на GitHub.
