Привет, Хабр! В наше время при постоянном росте объемов данных и необходимостью более быстрой и надежной обработки информации, мы сталкиваемся с требованием к эффективному обмену и синхронизации данных между различными системами. Отслеживание и обработка данных в реальном времени стало жизненно необходимым для современных приложений.
В этой статье мы рассмотрим, как Kafka Connect – мощный инструмент из экосистемы Apache Kafka – приходит на помощь при решении сложной задачи синхронизации данных между базами данных. Мы рассмотрим, как используя Kafka Connect, мы можем эффективно следить за изменениями в одной базе данных, обрабатывать их в нашем Java приложении и мгновенно записывать их в другую базу данных, обеспечивая надежность и безопасность данных.
Построим гибкую и масштабируемую архитектуру, которая позволит нам забыть о проблемах, связанных с несогласованными данными, и наслаждаться мгновенным доступом к актуальной информации для наших бизнес-процессов.
Что мы будем использовать?
Docker
Kubectl
Minikube
Helm (v3)
Java
Предполагается, что всё уже установлено и работает в штатном режиме. В связи с использованием minikube мы будем жить с некоторыми ограничениями.
Комментарии к коду приведены непосредственно в коде
Какой флоу мы построим?

Мы будем асинхронно передавать сообщения. Kafka - шина событий, в топик которой попадают ивенты, происходящие в другом сервисе. Мы сможем получить:
Независимую обработку данных
Масштабируемость (Использование нескольких брокеров)
Отказоустойчивость (В случае недоступности сервиса, данные в топики будут ждать, пока сервис не восстановится)
Гибкость архитектуры (Возможность обрабатывать ивенты в нескольких местах для разных целей)
За передачу ивентов будет отвечать Kafka Connect , предназначенный для интеграции между источниками данных и Kafka. Он имеет:
Коннекторы для подключения к базам данных, файловой системе, облачным сервисам, а также пользовательские коннекторы
Горизонтальное масштабирование
Обработку данных в реальном времени
Разработка первого Java приложения
Для начала мы напишем Java приложение, которое будет писать в нашу базу данных:
Код отправителя типовой. Мы лишь вставляем несколько записей в базу. Полный код доступен на GitHub. Единственное, что могу отметить, это сущность, которую мы будем записывать:
public class PersonalData {
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
private Long id;
private String bankBic;
private String bankName;
@Column(name = "last_update")
private LocalDateTime lastUpdate = LocalDateTime.now();
public PersonalData(String bankBic) {
this.bankBic = bankBic;
}
}bankBic будем получать, используя рандомайзер, bankName всегда будет null, в качестве даты последнего обновления берём текущую.
Деплой в DockerHub
После окончания написания кода, необходимо загрузить наш сервис в DockerHub.
Для этого напишем Dockerfile для подготовки образа:
FROM maven:3.8.4-openjdk-17-slim AS builder
COPY pom.xml /build/
COPY src /build/src/
WORKDIR /build
RUN mvn -B -e -C -T 1C -DskipTests clean package \
&& rm -rf ~/.m2
FROM openjdk:17-slim
COPY --from=builder /build/target/producer-*.jar /app/application.jar
WORKDIR /app
EXPOSE 8080
ENTRYPOINT ["java", "-jar", "application.jar"]Теперь, когда всё готово мы можем собрать наше приложение с присвоением репозитория, имени и тега.
docker login
docker build -t <your nickname>/<image name>:<tag> .
После успешной сборки образа, отправим его в DockerHub:
docker push <your nickname>/<image name>:<tag>
Теперь, когда образ загружен, необходимо подготовить для него деплоймент
Но перед этим подготовим нашу базу PostgreSQL, ��уда будет писать наш сервис
Подготовка окружения
Первым делом мы запустим PostgreSQL. Для этого нам понадобится создать 3 файла: storage.yaml, pv.yaml, pvc.yaml. Содержимое приведено ниже, подробнее о файлах ещё ниже:
storage.yaml
kind: StorageClass
apiVersion: storage.k8s.io/v1
metadata:
name: local-storage
provisioner: kubernetes.io/no-provisioner
volumeBindingMode: WaitForFirstConsumerpv.yaml
apiVersion: v1
kind: PersistentVolume
metadata:
name: "pv-pg"
labels:
type: local
spec:
capacity:
storage: "4Gi"
volumeMode: Filesystem
accessModes:
- ReadWriteOnce
persistentVolumeReclaimPolicy: Retain
storageClassName: local-storage
local:
path: "/opt/"
nodeAffinity:
required:
nodeSelectorTerms:
- matchExpressions:
- key: kubernetes.io/hostname
operator: In
values:
- minikubepvc.yaml
kind: PersistentVolumeClaim
apiVersion: v1
metadata:
name: "pvc-pg"
spec:
storageClassName: "local-storage"
accessModes:
- ReadWriteOnce
resources:
requests:
storage: "4Gi"StorageClass - хранилище, где мы определяем, что:
provisioner: kubernetes.io/no-provisioner - мы не будем динамически создавать локальные хранилища поскольку в kubernetes нет такой возможности;
volumeBindingMode: WaitForFirstConsumer - предоставлять хранилище будем лишь при необходимости в этом.
PV (Persistent Volume) - постоянные тома, которые предоставляют непосредственно хранилище для данных.
capacity: storage: "4Gi" - определяем размер.
accessModes: - ReadWriteOnce - режим доступа. В нашем случае хранилище доступно только для одного пода. Возможно варианты:
ReadOnlyMany - множество подов, но только на чтение;
ReadWriteMany - чтение и запись для множества подов;
persistentVolumeReclaimPolicy: Retain - определяет политику восстановления тома после того, как он перестанет использоваться. В нашем случае том будет освобождён и останется существовать. Также доступны:
Delete - удаление данных и PV;
Recycle - устаревшая политика. Удаляет данные, но не PV;
storageClassName: local-storage - указываем какой StorageClass используем.
local: path: "/opt/" - определяем путь к локальному хранилищу на узле (путь должен существовать).
PVC (PersistentVolumeClaim) - запрос к PV на предоставление ресурсов определённого размера.
storageClassName: "local-storage" - указываем какой StorageClass используем.
accessModes: - ReadWriteOnce - определяем режим доступа к PVC аналогично PV
resources: requests: storage: "4Gi" - определяем запрашиваемый размер хранилища в 4 гигабайта.
StorageClass определяет тип и параметры хранилища, но не физическое хранилище. Это делает PersistentVolume. PersistentVolumeClaim запрашивает предоставление хранилища и если есть доступный PV с подходящим размером и классом хранилища, то PVC к нему привяжется и поды смогут использовать это хранилище.
Теперь, когда всё готово, необходимо установить PostgreSQL. Для этого используем репозиторий bitnami.
Добавим репозиторий bitnami:
helm repo add bitnami https://charts.bitnami.com/bitnami
Обновим репозиторий:
helm repo update
И теперь выполним команду для установки:
helm install postgresql-dev bitnami/postgresql
--set primary.persistence.existingClaim=pvc-pg,auth.postgresPassword=pgpassВыполним команду kubectl get pods -w с флагом -w , чтобы отслеживать состояние подов в прямом эфире. Проследим за запуском нашей базы и в случае успеха можно переходить дальше.
Отправитель
Теперь мы можем подготовить деплоймент, чтобы развернуть наше приложение, которое уже сможет выполнять свои функции - писать в базу данных. Создадим файл producer-deploy.yaml со следующим содержимым:
producer-deploy.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
name: producer-dep
labels:
app: marmarks-dep
spec:
replicas: 1
selector:
matchLabels:
app: marmarks-dep
template:
metadata:
labels:
app: marmarks-dep
spec:
containers:
- name: producer-dep
image: "marmarks/producer:0.2"
imagePullPolicy: IfNotPresent
envFrom:
- configMapRef:
name: producer-config-map
- secretRef:
name: producer-secrets
ports:
- containerPort: 8080
protocol: TCP
readinessProbe:
httpGet:
path: /actuator/health
port: 8080
initialDelaySeconds: 5
periodSeconds: 3
---
apiVersion: v1
kind: ConfigMap
metadata:
name: producer-config-map
labels:
app: marmarks-dep
data:
SERVER_PORT: "8080"
SPRING_PROFILES_ACTIVE: "dev"
SPRING_DATASOURCE_URL: "jdbc:postgresql://postgresql-dev:5432/postgres"
---
apiVersion: v1
kind: Secret
metadata:
name: producer-secrets
labels:
app: marmarks-dep
type: Opaque
stringData:
SPRING_DATASOURCE_USERNAME: "postgres"
SPRING_DATASOURCE_PASSWORD: "pgpass"Здесь, помимо деплоймнета, мы создадим ConfigMap и хранилище секретов, куда передадим необходимые переменные окружения для подключения. Более подробный разбор конфигураций будет приведён в следующей части статьи, где мы рассмотрим Helm Chart.
Запустим наш деплоймент, используя kubectl apply -f producer-deploy.yaml
Выполним команду kubectl get pods -w.
Когда образ будет загружен и под станет доступным, можем зайти внутрь пода PostgreSQL выполнив команду kubectl exec -i -t -n default postgresql-dev-0 -c postgresql -- sh -c "clear; (bash || ash || sh)"
Находясь внутри подключимся к базе выполнив команду: psql -h postgresql-dev -p 5432 -U postgres -d postgres
Следующим шагом у нас запросят пароль. В моём случае я введу pgpass
Теперь, когда мы внутри, выполним SELECT * FROM personal_data; для получения содержащихся внутри записей. Получим что-то типа такого:
| id | bank_bic | bank_name | last_update |
|------|-------------------|---------------------|--------------------------------------------------------|
| 1 | 1234567 | | 2023-07-27 17:06:44.052581 |
| 2 | 1234568 | | 2023-07-27 17:06:44.266662 |
| 3 | 1234569 | | 2023-07-27 17:06:44.272390 |
| 4 | 1234510 | | 2023-07-27 17:06:44.276271 |
| 5 | 1234511 | | 2023-07-27 17:06:44.339826 |
Kafka
Теперь, когда наш образ и база данных готовы для работы и мы убедились, что они работают, мы поднимим Kafka Cluster. Теперь загрузим Zookeeper и Kafka из репозитория bitnami:
helm install zookeeper bitnami/zookeeper
helm install kafka bitnami/kafka
Так мы используем настройки по умолчанию. Для того, чтобы установить число реплик равным 3, необходимо добавить --set replicaCount=3 в команду для чарта Kafka. Если чарт уже запущен, сделаем helm upgrade kafka bitnami/kafka --set replicaCount=3
Альтернативным путём будет запустить чарты из их репозитория на GitHub.
Kafka Connect
Теперь необходимо подготовить необходимый образ kafka-connect. Для этого напишем Dockerfile, где установим в наш образ jdbc connect для чтения из базы данных, а также avro converter для конвертации данных в формат Avro:
FROM confluentinc/cp-kafka-connect-base:6.2.1
RUN confluent-hub install --no-prompt confluentinc/kafka-connect-avro-converter:5.5.4
RUN confluent-hub install --no-prompt confluentinc/kafka-connect-jdbc:10.1.1После этого соберем наш образ и загрузим в DockerHub, как мы делали это ранее.
Когда наш образ собран и загружен, обратимся к confluentinc для получения helm чарта - GitHub. Клонируем себе репозиторий и выполним команду:
helm dependency update charts/cp-kafka/
После этого используем наш образ для запуска Kafka Connect:
helm install kafka-connect \
--set image="marmarks/cp-kafka-connect-jdbc" \
--set imageTag="6.2.1" \
--set kafka.bootstrapServers="PLAINTEXT://kafka-headless:9092" \
--set prometheus.jmx.enabled=false \
./charts/cp-kafka-connectгде Image, imageTag, необходимо заменить на свои параметры.
kafka.bootstrapServers - путь до брокера. Необходимо заменить на свой путь, если запускали kafka отличным от моего способа и в этом есть необходимость.
prometheus.jmx.enabled=false чтобы отключить сбор метрик.
Если реплик Kafka не три, то следует дополнить команду сменив число реплик топиков, поскольку по умолчанию их 3. Итоговая команда (для 1 реплики) будет выглядеть так:
helm install kafka-connect \
--set image="marmarks/cp-kafka-connect-jdbc" \
--set imageTag="6.2.1" \
--set kafka.bootstrapServers="PLAINTEXT://kafka-headless:9092" \
--set prometheus.jmx.enabled=false \
--set config.storage.replication.factor="1" \
--set offset.storage.replication.factor="1" \
--set status.storage.replication.factor="1" \
./charts/cp-kafka-connectSchema Regestry
Следующим шагом следует запустить Schema Regestry. Для этого используем следующую команду:
helm install schema-registry \
--set kafka.bootstrapServers="PLAINTEXT://kafka-headless:9092" \
--set prometheus.jmx.enabled=false ./charts/cp-schema-registryЗдесь мы снова указываем путь до брокера (при необходимости изменить) и отключаем сбор метрик.
Для работы с Kafka Connect и брокером, мы поднимем под Kafka Client. Создадим файл kafka-client.yaml с содержимым:
kafka-client.yaml
apiVersion: v1
kind: Pod
metadata:
name: kafka-client
namespace: default
spec:
containers:
- name: kafka-client
image: confluentinc/cp-enterprise-kafka:5.4.1
command:
- sh
- -c
- "sleep infinity"Применим с помощью kubectl apply -f kafka-client.yaml
Когда kafka-client запущен, мы для удобства зайдём в него с помощью команды:
kubectl exec -i -t -n default kafka-client -c kafka-client -- sh -c "clear; (bash || ash || sh)"
Проверим наличие коннекторов:
curl -X GET http://kafka-connect-cp-kafka-connect:8083/connectors
На текущем этапы мы получим [], что будет означать, что коннекторов не создано. Создадим наш коннектор:
curl -X POST \
-H "Content-Type: application/json" \
--data '{
"name": "kafka-connector",
"config": {
"connector.class":"io.confluent.connect.jdbc.JdbcSourceConnector",
"key.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"key.converter.schema.registry.url": "http://schema-registry-cp-schema-registry:8081",
"value.converter.schema.registry.url": "http://schema-registry-cp-schema-registry:8081",
"tasks.max": 1,
"connection.url": "jdbc:postgresql://postgresql-dev:5432/postgres?user=postgres&password=pgpass",
"table.whitelist": "personal_data",
"mode": "incrementing",
"incrementing.column.name": "id",
"topic.prefix": "kafka-connect-",
"poll.interval.ms": 1000
}
}' \
http://kafka-connect-cp-kafka-connect:8083/connectors"name": "kafka-connector" - укажем имя для коннектора.
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector" - определяет класс коннектора, позволяющий читать данные из базы данных с использованием JDBC.
"key.converter": "io.confluent.connect.avro.AvroConverter" - указываем конвертер для ключей сообщений, который будет использоваться при записи данных в Kafka. Используем AvroConverter, который позволяет сериализовать ключи сообщений в Avro формат перед отправкой в Kafka.
"value.converter": "io.confluent.connect.avro.AvroConverter" - задаем конвертер для значений сообщений, который будет использоваться при записи данных в Kafka. Аналогично ключам, значения также сериализуются в Avro формат.
"key.converter.schema.registry.url": "http://schema-registry-cp-schema-registry:8081" - указываем URL-адрес Schema Registry, который будет использоваться для регистрации схемы ключей сообщений в Avro формате.
"value.converter.schema.registry.url": "http://schema-registry-cp-schema-registry:8081" - указываем URL-адрес Schema Registry, который будет использоваться для регистрации схемы значений сообщений в Avro формате.
"tasks.max": 1 - указываем количество потоков для данного коннектора, когда у нас больше 1 пода Kafka Connect, имеет смысл ставить больше потоков.
"connection.url": "jdbc:postgresql://postgresql-dev:5432/postgres?user=postgres&password=pgpass" - задаем URL-подключения к базе данных PostgreSQL, которая будет использоваться для чтения данных.
"table.whitelist": "personal_data" - определяем список таблиц, которые будут считываться из базы данных. Если не перечислить здесь таблицы, то данные будут считываться из всех таблиц доступных для пользователя переданного в connection.url
"mode": "incrementing" - указываем режим считывания данных, это инкрементный режим, который использует значение столбца для определения обновлений.
"incrementing.column.name": "id" - указываем имя столбца, используемого для определения обновлений в инкрементном режиме.
"timestamp.column.name": "last_update" - указываем имя столбца, для получения обновлений уже существующих записей
"topic.prefix": "kafka-connect-" - задаем префикс для имен топиков, которые будут создаваться в Kafka при записи данных из базы данных.
"poll.interval.ms": 1000: Задает интервал между опросами базы данных на предмет обновлений данных. В данном случае, это 1000 миллисекунд (1 секунда).
Дополнительно отмечу, что вручную мы никаких схем регистрировать не будем. Kafka Connect сам создаст необходимые схемы и запишет их в хранилище. Помимо приведённых выше конфигураций, в документации можно найти ещё множество различных настроек.
А работает ли?
После отправки выполним curl -X GET http://kafka-connect-cp-kafka-connect:8083/connectors/kafka-connector/status для получения статуса созданного коннектора. В норме статус будет выглядеть следующим образом:
{
"name": "kafka-connector",
"connector": {
"state": "RUNNING",
"worker_id": "10.244.2.104:8083"
},
"tasks": [
{
"id": 0,
"state": "RUNNING",
"worker_id": "10.244.2.104:8083"
}
],
"type": "source"
}
Чтобы получить конфигурацию коннектора необходимо выполнить команду:
curl -X GET http://kafka-connect-cp-kafka-connect:8083/connectors/kafka-connector/config
Теперь получим список доступных топиков в Kafka, используя команду:
kafka-topics --bootstrap-server kafka:9092 --list
Здесь мы увидим топик с нашими записями из таблицы personal data -> kafka-connect-personal_data. Топик с конфигурациями, оффсетом, статусами -> kafka-connect-cp-kafka-connect-config, kafka-connect-cp-kafka-connect-offset, kafka-connect-cp-kafka-connect-status, соответственно.
Теперь проверим наличие записей в нашем топике:
kafka-console-consumer --bootstrap-server kafka:9092 --topic kafka-connect-personal_data --from-beginning
В случае, если всё прошло успешно, это будет выглядеть как-то так:
{"id":1,"bank_bic":"1234568","bank_name":null,"last_update":1690113936653}
{"id":2,"bank_bic":"1234569","bank_name":null,"last_update":1690113936752}
{"id":3,"bank_bic":"1234510","bank_name":null,"last_update":1690113936843}
{"id":4,"bank_bic":"1234511","bank_name":null,"last_update":1690113936858}
Теперь посмотрим на схемы в Schema Regestry. Для получения списка доступных схем выполним команду:
curl -X GET http://schema-registry-cp-schema-registry:8081/subjects
В моём случае ответ выглядит так: ["kafka-connect-personal_data-value"]
Теперь узнаем версию схемы:
curl -X GET http://schema-registry-cp-schema-registry:8081/subjects/kafka-connect-personal_data-value/versions
В моём случае она первая, и теперь посмотрим саму схему:
curl -X GET http://schema-registry-cp-schema-registry:8081/subjects/kafka-connect-personal_data-value/versions/1
Ответ:
{
"subject": "kafka-connect-personal_data-value",
"version": 1,
"id": 1,
"schema": {
"type": "record",
"name": "personal_data",
"fields": [
{
"name": "id",
"type": "long"
},
{
"name": "bank_bic",
"type": ["null", "string"],
"default": null
},
{
"name": "bank_name",
"type": ["null", "string"],
"default": null
},
{
"name": "last_update",
"type": ["null", { "type": "long", "connect.version": 1, "connect.name": "org.apache.kafka.connect.data.Timestamp", "logicalType": "timestamp-millis" }],
"default": null
}
],
"connect.name": "personal_data"
}
}Мы развернули множество сервисов, убедились, что каждый из них общается друг с другом, все данные успешно доставляются от одного сервису к другому, и всё работает в штатном режиме. Повторим наш флоу: данные попадают в базу данных, откуда их считывает опираясь на поля id (для новых записей) и last_update (для уже существующих) Kafka Connect, после этого он отправляет их брокеру Kafka в отдельный топик, а также генерирует схему данных, которую отправляет в Schema Regestry. Теперь осталось подготовить сервис, который будет получать схемы из реестра, считывать данные из топика, модифицировать их по своему усмотрению и записывать в базу данных. Также подготовим Helm Chart для удобного развёртывания нашего отправителя и получателя. Всё это будет проделано в следующей части статьи.
