Как стать автором
Обновить

Как справиться с Debezium + MySql + Spring Could Streams, Part 1

Время на прочтение5 мин
Количество просмотров6K

Добрый день, Хаброжители! Это моя первая статья на Хабре, любые замечания, предложения, пожелания приветствуются!

Предистория

На предыдущем месте работы (в одной Британской компанни) существовала распиленная MySql DB под три микросевиса, назовем их условно Cusomer, Person, Account (у каждого микросервиса своя DB соответственно). К каждой DB был подвязан свой Address (адрес у Customer, у Person, и под Account тоже мог быть подвязан Address). Раньше когда это была одна большая база данных, поиск по Address не составлял труда, но когда базу распилили, это сделать стало сложнее. Было принято решение написать микросервис, который бы подключался по CDC к этим трем DB (технологии: MySql, Debezium, Spring Cloud Streams) и складывал бы только нужные данные для поиска (по Address, по имени, и т.д.) в одну базу данных (MongoDB). В ходе решения этой задачи, я получил интересный опыт, которым и хотел бы поделиться.

Debezium + MySql

Debezium - написанная на Java платформа, которая подключается к бинарному логу MySql (куда он записывает изменения схемы, данных и т.д.), считывает непрочитанные /новые логи и отправляет их в Kafka topic. Дальше мы можем подключиться к этому Kafka топику и обрабатывать сообщения.

Чтобы разобраться с Debezium мне очень помогло видео Виктора Гамова и официальная документация.

Для того, что бы запустить все локально, начнем с docker-compose.yml файла:

version: '2'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:5.5.1
    container_name: zookeeper
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  broker:
    image: confluentinc/cp-server:5.5.1
    hostname: broker
    container_name: broker
    depends_on:
      - zookeeper
    ports:
      - "29092:29092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:9092,PLAINTEXT_HOST://localhost:29092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      CONFLUENT_METRICS_ENABLE: 'false'
      KAFKA_CONFLUENT_LICENSE_TOPIC_REPLICATION_FACTOR: 1

  connect-debezium:
    image: debezium/connect:1.2
    container_name: connect-debezium
    depends_on:
      - broker
      - person-database
    ports:
      - 8083:8083
    environment:
      BOOTSTRAP_SERVERS: broker:9092
      GROUP_ID: connect-debezium
      CONFIG_STORAGE_TOPIC: docker-connect-debezium-configs
      OFFSET_STORAGE_TOPIC: docker-connect-debezium-offsets
      STATUS_STORAGE_TOPIC: docker-connect-debezium-status
      KEY_CONVERTER: org.apache.kafka.connect.storage.StringConverter
      VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
    volumes:
      - ${PWD}/scripts:/scripts
  person-database:
    container_name: person-database
    image: mysql
    hostname: person-database
    ports:
      - 3306:3306
    environment:
      MYSQL_ROOT_PASSWORD: admin
      MYSQL_USER: dev
      MYSQL_PASSWORD: password
      MYSQL_DATABASE: person
#    volumes:
#      - ./mysqlconf/mysqlconf.cnf:/etc/mysql/conf.d/mysql.cnf
#      - ./dump:/docker-entrypoint-initdb.d
  mongodb:
    container_name: search-database
    image: mongo
    hostname: search-database
    ports:
      - 27017-27019:27017-27019
    environment:
      MONGO_INITDB_DATABASE: search
      MONGO_INITDB_ROOT_USERNAME: dev
      MONGO_INITDB_ROOT_PASSWORD: password

Здесь мы поднимаем 4 главных контейнера:

  • zookeeper — необходим для менеджмента kafka брокера.

  • broker — это наша kafka с конфигами поключения zookeeper, брокера, и т. д.

  • connect-debezium — standalong debezium, готовый к подключению к kafka.
    Стоит отметить что в официальной документации можно встретить этот docker-compose файл с конфигурациями key/value конверторов в Avro формате ( и еще один контейнер schem-registry для поддержки этого формата). Но в моей задаче обычного string/json конвертера было больше чем достаточно.
    KEY_CONVERTER: org.apache.kafka.connect.storage.StringConverter
    VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter

  • MySql контейнер.

  • MongoDB контейнер (для части 2 туториала).

Запускаем докер: docker-compose -up

После того как все контейнеры запустились, нужно включить в Msyql бин-лог и дать права debezium подключаться к нему.

Для этого идем в mysql контейнер
docker exec -it <mysql-container-id> mysql -u root -padmin
и выполняем nano /etc/mysql/my.cnf . Т.к. в первоначально текстового редактора в контейнере нет, его можно установить apt-get update && apt-get install nano.

Добавляем после [mysql] следующее:

server_id=42

log_bin=mysql_bin

binlog_format=row

binlog_row_image=full

expire_logs_days=10

!Важно, после сохранения нужно ре-стартануть контейнер!

Также необходимо дать права debezium:

CREATE USER 'debezium'@'%' IDENTIFIED BY 'dbz';
GRANT ALL PRIVILEGES ON *.* TO 'debezium'@'%';
ALTER USER 'debezium'@'%' IDENTIFIED WITH mysql_native_password BY 'dbz';

Когда наша БД настроена, настало время подружить ее с коннектором. Для этого нужно отправить метод POST с конфигами коннектора в теле запроса.

curl -i -X POST -H "Content-Type:application/json" \
http://localhost:8083/connectors
-d '{

  "name": "data-connector",  
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "tasks.max": "1",  
    "database.hostname": "person-database",  
    "database.port": "3306",
    "database.user": "debezium",
    "database.allowPublicKeyRetrieval":"true",
    "database.password": "dbz",
    "database.server.id": "42",  
    "database.server.name": "mysql_server",  
    "database.include.list": "person",  
    "database.history.kafka.bootstrap.servers": "broker:9092", 
    "database.history.kafka.topic": "schema-changes.person",
    "include.schema.changes":"false",
    "transforms": "unwrap",
    "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
    "transforms.unwrap.add.headers": "db",
    "transforms.unwrap.add.fields": "op,table",
    "transforms.unwrap.drop.tombstones": "true",
    "transforms.unwrap.delete.handling.mode": "rewrite"
  }
}'

Для этого мне удобнее всего использовать Postman.

Здесь используется Kafka-Сonnect, возможно трансформировать сообщение на этапе генерации. Для этого используется «transform.*». Об этом можно детально почитать здесь и здесь. Например важно удалять tomb-stone сообщения.

Т.к. мне не нужно было читать логи об изменениях схемы, я их выключил:
"include.schema.changes":"false"
(по дефолту «true»)

И так, оправляем Post на http://localhost:8083/connectors.

Дополнительно проверяем наш коннектор:
curl -i -X GET http://localhost:8083/connectors/data-connector
curl -i -X GET http://localhost:8083/connectors/data-connector/status
Ошибок нет, видим статус RUNNING, значит все норм.

Если все-таки есть ошибки, из практики, скорее всего неправильно указан пароль\права debezium или просто не рестартанули контейнер mysql. Вообще прилетает сразу весь список ошибок, можно разобраться.

Итак наш CDC готова, давайте посмотрим топики в kafka:
docker exec -i broker /usr/bin/kafka-topics --list --bootstrap-server broker:29092

Коннектор создал служебные топики в kafka, но это еще не топики для сообщений об изменениях.

Зайдем в наш mysql контейнер и попробуем создать простую таблицу:

CREATE TABLE IF NOT EXISTS tasks (     
  task_id INT AUTO_INCREMENT PRIMARY KEY,     
  title VARCHAR(255) NOT NULL,     
  start_date DATE,     
  status TINYINT NOT NULL,     
  priority TINYINT NOT NULL,     
  description TEXT,     
  created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP 
)  ENGINE=INNODB;

Опять смотрим топики в kafka:

docker exec -i broker /usr/bin/kafka-topics --list --bootstrap-server broker:29092

Ничего не поменялось
Ничего не поменялось

Дело в том, что топики создаются в kafka с «lazy approach» и т.к. мы отключили логи изменения схемы БД, нам ничего не прилетело, топик не создался.

Теперь давайте добавим данные в таблицу:

INSERT INTO tasks (title, start_date, status, priority, description) 
VALUES ("title1", "2021-05-28", "1", "1", "description1");

Опять смотрим топики в kafka:

О чудо, у нас появился новый топик mysql_server.person.tasks!
Тут важно понимать, что коннектор создает в Kafka отдельный топик под каждую таблицу - one topic per table.

Подключаем консольного консьюмера
docker exec -i broker /usr/bin/kafka-console-consumer --bootstrap-server broker:29092 --topic mysql_server.person.tasks --from-beginning

и видим наш лог:

Если эта статья окажется полезной и будет одобрена модератором, в следующей статье я расскажу о новой модной штуке - как создать Spring Cloud Streams приложение (с Spring Function Consumer, без deprecated @StreamListener).

Теги:
Хабы:
+7
Комментарии9

Публикации

Изменить настройки темы

Истории

Работа

Java разработчик
352 вакансии

Ближайшие события

Weekend Offer в AliExpress
Дата20 – 21 апреля
Время10:00 – 20:00
Место
Онлайн
Конференция «Я.Железо»
Дата18 мая
Время14:00 – 23:59
Место
МоскваОнлайн