Привет, Хабр! В наше время при постоянном росте объемов данных и необходимостью более быстрой и надежной обработки информации, мы сталкиваемся с требованием к эффективному обмену и синхронизации данных между различными системами. Отслеживание и обработка данных в реальном времени стало жизненно необходимым для современных приложений.

В этой статье мы рассмотрим, как Kafka Connect – мощный инструмент из экосистемы Apache Kafka – приходит на помощь при решении сложной задачи синхронизации данных между базами данных. Мы рассмотрим, как используя Kafka Connect, мы можем эффективно следить за изменениями в одной базе данных, обрабатывать их в нашем Java приложении и мгновенно записывать их в другую базу данных, обеспечивая надежность и безопасность данных.

Построим гибкую и масштабируемую архитектуру, которая позволит нам забыть о проблемах, связанных с несогласованными данными, и наслаждаться мгновенным доступом к актуальной информации для наших бизнес-процессов.

Что мы будем использовать?

  • Docker

  • Kubectl

  • Minikube

  • Helm (v3)

  • Java

Предполагается, что всё уже установлено и работает в штатном режиме. В связи с использованием minikube мы будем жить с некоторыми ограничениями.

Комментарии к коду приведены непосредственно в коде

Какой флоу мы построим?

Путь данных от одного сервиса к другому
Путь данных от одного сервиса к другому

Мы будем асинхронно передавать сообщения. Kafka - шина событий, в топик которой попадают ивенты, происходящие в другом сервисе. Мы сможем получить:

  • Независимую обработку данных

  • Масштабируемость (Использование нескольких брокеров)

  • Отказоустойчивость (В случае недоступности сервиса, данные в топики будут ждать, пока сервис не восстановится)

  • Гибкость архитектуры (Возможность обрабатывать ивенты в нескольких местах для разных целей)

За передачу ивентов будет отвечать Kafka Connect , предназначенный для интеграции между источниками данных и Kafka. Он имеет:

  • Коннекторы для подключения к базам данных, файловой системе, облачным сервисам, а также пользовательские коннекторы

  • Горизонтальное масштабирование

  • Обработку данных в реальном времени

Разработка первого Java приложения

Для начала мы напишем Java приложение, которое будет писать в нашу базу данных:
Код отправителя типовой. Мы лишь вставляем несколько записей в базу. Полный код доступен на GitHub. Единственное, что могу отметить, это сущность, которую мы будем записывать:

public class PersonalData {

 @Id
 @GeneratedValue(strategy = GenerationType.IDENTITY)
 private Long id;
 private String bankBic;
 private String bankName;
 @Column(name = "last_update")
 private LocalDateTime lastUpdate = LocalDateTime.now();

 public PersonalData(String bankBic) {
   this.bankBic = bankBic;
 }

}

bankBic будем получать, используя рандомайзер, bankName всегда будет null, в качестве даты последнего обновления берём текущую.

Деплой в DockerHub

После окончания написания кода, необходимо загрузить наш сервис в DockerHub.

Для этого напишем Dockerfile для подготовки образа:

FROM maven:3.8.4-openjdk-17-slim AS builder

COPY pom.xml /build/
COPY src /build/src/
WORKDIR /build
RUN mvn -B -e -C -T 1C -DskipTests clean package \
    && rm -rf ~/.m2
FROM openjdk:17-slim
COPY --from=builder /build/target/producer-*.jar /app/application.jar
WORKDIR /app
EXPOSE 8080

ENTRYPOINT ["java", "-jar", "application.jar"]

Теперь, когда всё готово мы можем собрать наше приложение с присвоением репозитория, имени и тега.

docker login

docker build -t <your nickname>/<image name>:<tag> .

После успешной сборки образа, отправим его в DockerHub:

docker push <your nickname>/<image name>:<tag>

Теперь, когда образ загружен, необходимо подготовить для него деплоймент

Но перед этим подготовим нашу базу PostgreSQL, ��уда будет писать наш сервис

Подготовка окружения

Первым делом мы запустим PostgreSQL. Для этого нам понадобится создать 3 файла: storage.yaml, pv.yaml, pvc.yaml. Содержимое приведено ниже, подробнее о файлах ещё ниже:

storage.yaml
kind: StorageClass
apiVersion: storage.k8s.io/v1
metadata:
  name: local-storage
provisioner: kubernetes.io/no-provisioner
volumeBindingMode: WaitForFirstConsumer

pv.yaml
apiVersion: v1
kind: PersistentVolume
metadata:
  name: "pv-pg"
  labels:
    type: local
spec:
  capacity:
    storage: "4Gi"
  volumeMode: Filesystem
  accessModes:
    - ReadWriteOnce
  persistentVolumeReclaimPolicy: Retain
  storageClassName: local-storage
  local:
    path: "/opt/"
  nodeAffinity:
    required:
      nodeSelectorTerms:
        - matchExpressions:
            - key: kubernetes.io/hostname
              operator: In
              values:
                - minikube

pvc.yaml
kind: PersistentVolumeClaim
apiVersion: v1
metadata:
  name: "pvc-pg"
spec:
  storageClassName: "local-storage"
  accessModes:
    - ReadWriteOnce
  resources:
    requests:
      storage: "4Gi"

StorageClass - хранилище, где мы определяем, что:

provisioner: kubernetes.io/no-provisioner - мы не будем динамически создавать локальные хранилища поскольку в kubernetes нет такой возможности; 

volumeBindingMode: WaitForFirstConsumer - предоставлять хранилище будем лишь при необходимости в этом.

PV (Persistent Volume) - постоянные тома, которые предоставляют непосредственно хранилище для данных.

capacity: storage: "4Gi" - определяем размер.

accessModes: - ReadWriteOnce - режим доступа. В нашем случае хранилище доступно только для одного пода. Возможно варианты: 

  • ReadOnlyMany - множество подов, но только на чтение;

  • ReadWriteMany - чтение и запись для множества подов;

persistentVolumeReclaimPolicy: Retain - определяет политику восстановления тома после того, как он перестанет использоваться. В нашем случае том будет освобождён и останется существовать. Также доступны:

  • Delete - удаление данных и PV;

  • Recycle - устаревшая политика. Удаляет данные, но не PV;

storageClassName: local-storage - указываем какой StorageClass используем.
local: path: "/opt/" - определяем путь к локальному хранилищу на узле (путь должен существовать).

PVC (PersistentVolumeClaim) - запрос к PV на предоставление ресурсов определённого размера.

storageClassName: "local-storage" - указываем какой StorageClass используем.

accessModes: - ReadWriteOnce - определяем режим доступа к PVC аналогично PV
resources: requests: storage: "4Gi" - определяем запрашиваемый размер хранилища в 4 гигабайта.

StorageClass определяет тип и параметры хранилища, но не физическое хранилище. Это делает PersistentVolume. PersistentVolumeClaim  запрашивает предоставление хранилища и если есть доступный PV с подходящим размером и классом хранилища, то PVC к нему привяжется и поды смогут использовать это хранилище.

Теперь, когда всё готово, необходимо установить PostgreSQL. Для этого используем репозиторий bitnami.

Добавим репозиторий bitnami:

helm repo add bitnami https://charts.bitnami.com/bitnami

Обновим репозиторий:

helm repo update

И теперь выполним команду для установки:

helm install postgresql-dev bitnami/postgresql 
--set primary.persistence.existingClaim=pvc-pg,auth.postgresPassword=pgpass

Выполним команду kubectl get pods -w с флагом -w , чтобы отслеживать состояние подов в прямом эфире. Проследим за запуском нашей базы и в случае успеха можно переходить дальше.

Отправитель

Теперь мы можем подготовить деплоймент, чтобы развернуть наше приложение, которое уже сможет выполнять свои функции - писать в базу данных. Создадим файл producer-deploy.yaml со следующим содержимым:

producer-deploy.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: producer-dep
  labels:
    app: marmarks-dep
spec:
  replicas: 1
  selector:
    matchLabels:
      app: marmarks-dep
  template:
    metadata:
      labels:
        app: marmarks-dep
    spec:
      containers:
        - name: producer-dep
          image: "marmarks/producer:0.2"
          imagePullPolicy: IfNotPresent
          envFrom:
            - configMapRef:
                name: producer-config-map
            - secretRef:
                name: producer-secrets
          ports:
            - containerPort: 8080
              protocol: TCP
          readinessProbe:
            httpGet:
              path: /actuator/health
              port: 8080
            initialDelaySeconds: 5
            periodSeconds: 3

---

apiVersion: v1
kind: ConfigMap
metadata:
  name: producer-config-map
  labels:
    app: marmarks-dep
data:
  SERVER_PORT: "8080"
  SPRING_PROFILES_ACTIVE: "dev"
  SPRING_DATASOURCE_URL: "jdbc:postgresql://postgresql-dev:5432/postgres"

---

apiVersion: v1
kind: Secret
metadata:
  name: producer-secrets
  labels:
    app: marmarks-dep
type: Opaque
stringData:
  SPRING_DATASOURCE_USERNAME: "postgres"
  SPRING_DATASOURCE_PASSWORD: "pgpass"

Здесь, помимо деплоймнета, мы создадим ConfigMap и хранилище секретов, куда передадим необходимые переменные окружения для подключения. Более подробный разбор конфигураций будет приведён в следующей части статьи, где мы рассмотрим Helm Chart.

Запустим наш деплоймент, используя kubectl apply -f producer-deploy.yaml

Выполним команду kubectl get pods -w.

Когда образ будет загружен и под станет доступным, можем зайти внутрь пода PostgreSQL выполнив команду kubectl exec -i -t -n default postgresql-dev-0 -c postgresql -- sh -c "clear; (bash || ash || sh)"

Находясь внутри подключимся к базе выполнив команду: psql -h postgresql-dev -p 5432 -U postgres -d postgres

Следующим шагом у нас запросят пароль. В моём случае я введу pgpass

Теперь, когда мы внутри, выполним SELECT * FROM personal_data; для получения содержащихся внутри записей. Получим что-то типа такого:

|  id  |  bank_bic   | bank_name |                  last_update                       |
|------|-------------------|---------------------|--------------------------------------------------------|
|  1  |   1234567   |                        | 2023-07-27 17:06:44.052581         |
|  2  |   1234568   |                        | 2023-07-27 17:06:44.266662         |
|  3   |   1234569   |                        | 2023-07-27 17:06:44.272390         |
|  4  |   1234510   |                        | 2023-07-27 17:06:44.276271       |
|  5  |   1234511 |                        | 2023-07-27 17:06:44.339826         |

Kafka

Теперь, когда наш образ и база данных готовы для работы и мы убедились, что они работают, мы поднимим Kafka Cluster. Теперь загрузим Zookeeper и Kafka из репозитория bitnami:

helm install zookeeper bitnami/zookeeper
helm install kafka bitnami/kafka

Так мы используем настройки по умолчанию. Для того, чтобы установить число реплик равным 3, необходимо добавить  --set replicaCount=3 в команду для чарта Kafka. Если чарт уже запущен, сделаем helm upgrade kafka bitnami/kafka --set replicaCount=3

Альтернативным путём будет запустить чарты из их репозитория на GitHub.

Kafka Connect

Теперь необходимо подготовить необходимый образ kafka-connect. Для этого напишем Dockerfile, где установим в наш образ jdbc connect для чтения из базы данных, а также avro converter для конвертации данных в формат Avro:

FROM confluentinc/cp-kafka-connect-base:6.2.1

RUN confluent-hub install --no-prompt confluentinc/kafka-connect-avro-converter:5.5.4

RUN confluent-hub install --no-prompt confluentinc/kafka-connect-jdbc:10.1.1

После этого соберем наш образ и загрузим в DockerHub, как мы делали это ранее. 

Когда наш образ собран и загружен, обратимся к confluentinc для получения helm чарта - GitHub. Клонируем себе репозиторий и выполним команду:

helm dependency update charts/cp-kafka/

После этого используем наш образ для запуска Kafka Connect:

helm install kafka-connect \
 --set image="marmarks/cp-kafka-connect-jdbc" \
 --set imageTag="6.2.1" \
 --set kafka.bootstrapServers="PLAINTEXT://kafka-headless:9092" \
 --set prometheus.jmx.enabled=false \
 ./charts/cp-kafka-connect

где Image, imageTag, необходимо заменить на свои параметры.

kafka.bootstrapServers - путь до брокера. Необходимо заменить на свой путь, если запускали kafka отличным от моего способа и в этом есть необходимость.

prometheus.jmx.enabled=false чтобы отключить сбор метрик.

Если реплик Kafka не три, то следует дополнить команду сменив число реплик топиков, поскольку по умолчанию их 3. Итоговая команда (для 1 реплики) будет выглядеть так:

helm install kafka-connect \
 --set image="marmarks/cp-kafka-connect-jdbc" \
 --set imageTag="6.2.1" \
 --set kafka.bootstrapServers="PLAINTEXT://kafka-headless:9092" \
 --set prometheus.jmx.enabled=false \
 --set config.storage.replication.factor="1" \ 
 --set offset.storage.replication.factor="1" \ 
 --set  status.storage.replication.factor="1" \
 ./charts/cp-kafka-connect

Schema Regestry

Следующим шагом следует запустить Schema Regestry. Для этого используем следующую команду:

helm install schema-registry \
  --set kafka.bootstrapServers="PLAINTEXT://kafka-headless:9092" \
  --set prometheus.jmx.enabled=false   ./charts/cp-schema-registry

Здесь мы снова указываем путь до брокера (при необходимости изменить) и отключаем сбор метрик.

Для работы с Kafka Connect и брокером, мы поднимем под Kafka Client. Создадим файл kafka-client.yaml с содержимым:

kafka-client.yaml
apiVersion: v1
kind: Pod
metadata:
  name: kafka-client
  namespace: default
spec:
  containers:
  - name: kafka-client
    image: confluentinc/cp-enterprise-kafka:5.4.1
    command:
    - sh
    - -c
    - "sleep infinity"

Применим с помощью kubectl apply -f kafka-client.yaml

Когда kafka-client запущен, мы для удобства зайдём в него с помощью команды: 

kubectl exec -i -t -n default kafka-client -c kafka-client -- sh -c "clear; (bash || ash || sh)"

Проверим наличие коннекторов: 

curl -X GET http://kafka-connect-cp-kafka-connect:8083/connectors

На текущем этапы мы получим [], что будет означать, что коннекторов не создано. Создадим наш коннектор:

curl -X POST \
  -H "Content-Type: application/json" \
  --data '{
    "name": "kafka-connector",
    "config": {
     "connector.class":"io.confluent.connect.jdbc.JdbcSourceConnector", 
       "key.converter": "io.confluent.connect.avro.AvroConverter", 
       "value.converter": "io.confluent.connect.avro.AvroConverter", 
       "key.converter.schema.registry.url": "http://schema-registry-cp-schema-registry:8081", 
       "value.converter.schema.registry.url": "http://schema-registry-cp-schema-registry:8081",
      "tasks.max": 1,
      "connection.url": "jdbc:postgresql://postgresql-dev:5432/postgres?user=postgres&password=pgpass",
      "table.whitelist": "personal_data",
      "mode": "incrementing",
      "incrementing.column.name": "id",
      "topic.prefix": "kafka-connect-",
      "poll.interval.ms": 1000
    }
  }' \
  http://kafka-connect-cp-kafka-connect:8083/connectors
  • "name": "kafka-connector" - укажем имя для коннектора.

  • "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector" - определяет класс коннектора, позволяющий читать данные из базы данных с использованием JDBC.

  • "key.converter": "io.confluent.connect.avro.AvroConverter" - указываем конвертер для ключей сообщений, который будет использоваться при записи данных в Kafka. Используем AvroConverter, который позволяет сериализовать ключи сообщений в Avro формат перед отправкой в Kafka.

  • "value.converter": "io.confluent.connect.avro.AvroConverter" - задаем конвертер для значений сообщений, который будет использоваться при записи данных в Kafka. Аналогично ключам, значения также сериализуются в Avro формат.

  • "key.converter.schema.registry.url": "http://schema-registry-cp-schema-registry:8081" - указываем URL-адрес Schema Registry, который будет использоваться для регистрации схемы ключей сообщений в Avro формате.

  • "value.converter.schema.registry.url": "http://schema-registry-cp-schema-registry:8081" - указываем URL-адрес Schema Registry, который будет использоваться для регистрации схемы значений сообщений в Avro формате.

  • "tasks.max": 1 - указываем количество потоков для данного коннектора, когда у нас больше 1 пода Kafka Connect, имеет смысл ставить больше потоков.

  • "connection.url": "jdbc:postgresql://postgresql-dev:5432/postgres?user=postgres&password=pgpass" - задаем URL-подключения к базе данных PostgreSQL, которая будет использоваться для чтения данных.

  • "table.whitelist": "personal_data" - определяем список таблиц, которые будут считываться из базы данных. Если не перечислить здесь таблицы, то данные будут считываться из всех таблиц доступных для пользователя переданного в connection.url

  • "mode": "incrementing" - указываем режим считывания данных, это инкрементный режим, который использует значение столбца для определения обновлений.

  • "incrementing.column.name": "id" - указываем имя столбца, используемого для определения обновлений в инкрементном режиме.

  •  "timestamp.column.name": "last_update" - указываем имя столбца, для получения обновлений уже существующих записей

  • "topic.prefix": "kafka-connect-" - задаем префикс для имен топиков, которые будут создаваться в Kafka при записи данных из базы данных.

  • "poll.interval.ms": 1000: Задает интервал между опросами базы данных на предмет обновлений данных. В данном случае, это 1000 миллисекунд (1 секунда).

Дополнительно отмечу, что вручную мы никаких схем регистрировать не будем. Kafka Connect сам создаст необходимые схемы и запишет их в хранилище. Помимо приведённых выше конфигураций, в документации можно найти ещё множество различных настроек.

А работает ли?

После отправки выполним curl -X GET http://kafka-connect-cp-kafka-connect:8083/connectors/kafka-connector/status для получения статуса созданного коннектора. В норме статус будет выглядеть следующим образом:

{
  "name": "kafka-connector",
  "connector": {
    "state": "RUNNING",
    "worker_id": "10.244.2.104:8083"
  },
  "tasks": [
    {
      "id": 0,
      "state": "RUNNING",
      "worker_id": "10.244.2.104:8083"
    }
  ],
  "type": "source"
}

Чтобы получить конфигурацию коннектора необходимо выполнить команду:

curl -X GET http://kafka-connect-cp-kafka-connect:8083/connectors/kafka-connector/config

Теперь получим список доступных топиков в Kafka, используя команду: 

kafka-topics --bootstrap-server kafka:9092 --list

Здесь мы увидим топик с нашими записями из таблицы personal data -> kafka-connect-personal_data. Топик с конфигурациями, оффсетом, статусами -> kafka-connect-cp-kafka-connect-config, kafka-connect-cp-kafka-connect-offset, kafka-connect-cp-kafka-connect-status, соответственно.

Теперь проверим наличие записей в нашем топике: 

kafka-console-consumer --bootstrap-server kafka:9092 --topic kafka-connect-personal_data --from-beginning

В случае, если всё прошло успешно, это будет выглядеть как-то так:

{"id":1,"bank_bic":"1234568","bank_name":null,"last_update":1690113936653}
{"id":2,"bank_bic":"1234569","bank_name":null,"last_update":1690113936752}
{"id":3,"bank_bic":"1234510","bank_name":null,"last_update":1690113936843}
{"id":4,"bank_bic":"1234511","bank_name":null,"last_update":1690113936858}

Теперь посмотрим на схемы в Schema Regestry. Для получения списка доступных схем выполним команду:

curl -X GET http://schema-registry-cp-schema-registry:8081/subjects

В моём случае ответ выглядит так: ["kafka-connect-personal_data-value"]

Теперь узнаем версию схемы: 

curl -X GET http://schema-registry-cp-schema-registry:8081/subjects/kafka-connect-personal_data-value/versions

В моём случае она первая, и теперь посмотрим саму схему:

curl -X GET http://schema-registry-cp-schema-registry:8081/subjects/kafka-connect-personal_data-value/versions/1

Ответ:

{
  "subject": "kafka-connect-personal_data-value",
  "version": 1,
  "id": 1,
  "schema": {
    "type": "record",
    "name": "personal_data",
    "fields": [
      {
        "name": "id",
        "type": "long"
      },
      {
        "name": "bank_bic",
        "type": ["null", "string"],
        "default": null
      },
      {
        "name": "bank_name",
        "type": ["null", "string"],
        "default": null
      },
      {
        "name": "last_update",
        "type": ["null", { "type": "long", "connect.version": 1, "connect.name": "org.apache.kafka.connect.data.Timestamp", "logicalType": "timestamp-millis" }],
        "default": null
      }
    ],
    "connect.name": "personal_data"
  }
}

Мы развернули множество сервисов, убедились, что каждый из них общается друг с другом, все данные успешно доставляются от одного сервису к другому, и всё работает в штатном режиме. Повторим наш флоу: данные попадают в базу данных, откуда их считывает опираясь на поля id (для новых записей) и last_update (для уже существующих) Kafka Connect, после этого он отправляет их брокеру Kafka в отдельный топик, а также генерирует схему данных, которую отправляет в Schema Regestry. Теперь осталось подготовить сервис, который будет получать схемы из реестра, считывать данные из топика, модифицировать их по своему усмотрению и записывать в базу данных. Также подготовим Helm Chart для удобного развёртывания нашего отправителя и получателя. Всё это будет проделано в следующей части статьи.