company_banner

Знакомство с Debezium — CDC для Apache Kafka



    В своей работе я часто сталкиваюсь с новыми техническими решениями/программными продуктами, информации о которых в русскоязычном интернете довольно мало. Этой статьей постараюсь восполнить один такой пробел примером из своей недавней практики, когда потребовалось настроить отправку CDC-событий из двух популярных СУБД (PostgreSQL и MongoDB) в кластер Kafka при помощи Debezium. Надеюсь, эта обзорная статья, появившаяся по итогам проделанной работы, окажется полезной и другим.

    Что за Debezium и вообще CDC?


    Debezium — представитель категории программного обеспечения CDC (Capture Data Change), а если точнее — это набор коннекторов для различных СУБД, совместимых с фреймворком Apache Kafka Connect.

    Это Open Source-проект, использующий лицензию Apache License v2.0 и спонсируемый компанией Red Hat. Разработка ведётся с 2016 года и на данный момент в нем представлена официальная поддержка следующих СУБД: MySQL, PostgreSQL, MongoDB, SQL Server. Также существуют коннекторы для Cassandra и Oracle, но на данный момент они находятся в статусе «раннего доступа», а новые релизы не гарантируют обратной совместимости.

    Если сравнивать CDC с традиционным подходом (когда приложение читает данные из СУБД напрямую), то к его главным преимуществам относят реализацию стриминга изменения данных на уровне строк с низкой задержкой, высокой надежностью и доступностью. Последние два пункта достигаются благодаря использованию кластера Kafka в качестве хранилища CDC-событий.

    Также к достоинствам можно отнести тот факт, что для хранения событий используется единая модель, поэтому конечному приложению не придётся беспокоиться о нюансах эксплуатации различных СУБД.

    Наконец, благодаря использованию брокера сообщений открывается простор для горизонтального масштабирования приложений, отслеживающих изменения в данных. При этом влияние на источник данных сводится к минимуму, поскольку получение данных происходит не напрямую из СУБД, а из кластера Kafka.

    Об архитектуре Debezium


    Использование Debezium сводится к такой простой схеме:

    СУБД (как источник данных) → коннектор в Kafka Connect → Apache Kafka → консьюмер

    В качестве иллюстрации приведу схему с сайта проекта:



    Однако эта схема мне не очень нравится, поскольку складывается впечатление, что возможно только использование sink-коннектора.

    В действительности же ситуация отличается: наполнение вашего Data Lake (последнее звено на схеме выше) ­— это не единственный способ применения Debezium. События, отправленные в Apache Kafka, могут быть использоваться вашими приложениями для решения различных ситуаций. Например:

    • удаление неактуальных данных из кэша;
    • отправка уведомлений;
    • обновления поисковых индексов;
    • некое подобие логов аудита;

    В случае, если у вас приложение на Java и нет необходимости/возможности использовать кластер Kafka, существует также возможность работы через embedded-коннектор. Очевидный плюс в том, что с ним можно отказаться от дополнительной инфраструктуры (в виде коннектора и Kafka). Однако это решение объявлено устаревшим (deprecated) с версии 1.1 и больше не рекомендуется к использованию (в будущих релизах его поддержку могут убрать).

    В данной статье будет рассматриваться рекомендуемая разработчиками архитектура, которая обеспечивает отказоустойчивость и возможность масштабирования.

    Конфигурация коннектора


    Для того, чтобы начать отслеживать изменения самой главной ценности — данных, — нам потребуются:

    1. источник данных, которым может являться MySQL начиная с версии 5.7, PostgreSQL 9.6+, MongoDB 3.2+ (полный список);
    2. кластер Apache Kafka;
    3. инстанс Kafka Connect (версии 1.x, 2.x);
    4. сконфигурированный коннектор Debezium.

    Работы по первым двум пунктам, т.е. процесс инсталляции СУБД и Apache Kafka, выходят за рамки статьи. Однако для тех, кто хочет развернуть всё в песочнице, в официальном репозитории с примерами есть готовый docker-compose.yaml.

    Мы же остановимся подробнее на двух последних пунктах.

    0. Kafka Connect


    Здесь и далее в статье все примеры конфигурации рассматриваются в контексте Docker-образа, распространяемого разработчиками Debezium. Он содержит все необходимые файлы плагинов (коннекторы) и предусматривает конфигурацию Kafka Connect при помощи переменных окружения.

    В случае, если предполагается использование Kafka Connect от Confluent, потребуется самостоятельно добавить плагины необходимых коннекторов в директорию, указанную в plugin.path или задаваемую через переменную окружения CLASSPATH. Настройки воркера Kafka Connect и коннекторов определяются через конфигурационные файлы, которые передаются аргументами к команде запуска воркера. Подробнее см. в документации.

    Весь процесс по настройке Debeizum в варианте с коннектором осуществляется в два этапа. Рассмотрим каждый из них:

    1. Настройка фреймворка Kafka Connect


    Для стриминга данных в кластер Apache Kafka во фреймворке Kafka Connect задаются специфичные параметры, такие как:

    • параметры подключения к кластеру,
    • названия топиков, в которых будет храниться непосредственно конфигурация самого коннектора,
    • имя группы, в которой запущен коннектор (в случае использования distributed-режима).

    Официальный Docker-образ проекта поддерживает конфигурацию при помощи переменных окружения — этим и воспользуемся. Итак, скачиваем образ:

    docker pull debezium/connect

    Минимальный набор переменных окружения, необходимый для запуска коннектора, выглядит следующим образом:

    • BOOTSTRAP_SERVERS=kafka-1:9092,kafka-2:9092,kafka-3:9092 — начальный список серверов кластера Kafka для получения полного списка членов кластера;
    • OFFSET_STORAGE_TOPIC=connector-offsets — топик для хранения позиций, на которых на данный момент находится коннектор;
    • CONNECT_STATUS_STORAGE_TOPIC=connector-status — топик для хранения статуса коннектора и его заданий;
    • CONFIG_STORAGE_TOPIC=connector-config — топик для хранения данных конфигурации коннектора и его заданий;
    • GROUP_ID=1 — идентификатор группы воркеров, на которых может выполняться задание коннектора; необходим при использовании распределённого (distributed) режима.

    Запускаем контейнер с этими переменными:

    docker run \
      -e BOOTSTRAP_SERVERS='kafka-1:9092,kafka-2:9092,kafka-3:9092' \
      -e GROUP_ID=1 \
      -e CONFIG_STORAGE_TOPIC=my_connect_configs \
      -e OFFSET_STORAGE_TOPIC=my_connect_offsets \
      -e STATUS_STORAGE_TOPIC=my_connect_statuses  debezium/connect:1.2

    Примечание про Avro


    По умолчанию Debezium пишет данные в формате JSON, что приемлемо для песочниц и небольших объёмов данных, но может стать проблемой в высоконагруженных базах. Альтернативой JSON-конвертеру является сериализация сообщений при помощи Avro в бинарный формат, что позволяет снизить нагрузку на подсистему I/O в Apache Kafka.

    Для использования Avro требуется развернуть отдельный schema-registry (для хранения схем). Переменные для конвертера будут выглядеть следующим образом:

    name: CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL
    value: http://kafka-registry-01:8081/
    name: CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL
    value: http://kafka-registry-01:8081/
    name: VALUE_CONVERTER   
    value: io.confluent.connect.avro.AvroConverter

    Детали по использованию Avro и настройке registry за него выходят за рамки статьи — далее для наглядности мы будет использовать JSON.

    2. Настройка самого коннектора


    Теперь можно перейти непосредственно к конфигурации самого коннектора, который будет читать данные из источника.

    Рассмотрим на примере коннекторов для двух СУБД: PostgreSQL и MongoDB, — по которым у меня есть опыт и по которым имеются отличия (пусть и небольшие, но в некоторых случаях — существенные!).

    Конфигурация описывается в нотации JSON и загружается в Kafka Connect при помощи POST-запроса.

    2.1. PostgreSQL


    Пример конфигурации коннектора для PostgreSQL:

    {
      "name": "pg-connector",
      "config": {
        "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
        "plugin.name": "pgoutput",
        "database.hostname": "127.0.0.1",
        "database.port": "5432",
        "database.user": "debezium",
        "database.password": "definitelynotpassword",
        "database.dbname" : "dbname",
        "database.server.name": "pg-dev",
        "table.include.list": "public.(.*)",
        "heartbeat.interval.ms": "5000",
        "slot.name": "dbname_debezium",
        "publication.name": "dbname_publication",
        "transforms": "AddPrefix",
        "transforms.AddPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
        "transforms.AddPrefix.regex": "pg-dev.public.(.*)",
        "transforms.AddPrefix.replacement": "data.cdc.dbname"
      }
    }

    Принцип работы коннектора после такой настройки довольно прост:

    • При первом запуске он подключается к базе, указанной в конфигурации, и запускается в режиме initial snapshot, отправляя в Kafka начальный набор данных, полученных с помощью условного SELECT * FROM table_name.
    • После того, как инициализация будет завершена, коннектор переходит в режим чтения изменений из WAL-файлов PostgreSQL.

    Об используемых опциях:

    • name — имя коннектора, для которого используется конфигурация, описанная ниже; в дальнейшем это имя используется для работы с коннектором (т.е. просмотра статуса/перезапуска/обновления конфигурации) через REST API Kafka Connect;
    • connector.class — класс коннектора СУБД, который будет использоваться конфигурируемым коннектором;
    • plugin.name — название плагина для логического декодирования данных из WAL-файлов. На выбор доступны wal2json, decoderbuffs и pgoutput. Первые два требуют установки соответствующих расширений в СУБД, а pgoutput для PostgreSQL версии 10 и выше не требует дополнительных манипуляций;
    • database.* — опции для подключения к БД, где database.server.name — имя инстанса PostgreSQL, используемое для формирования имени топика в кластере Kafka;
    • table.include.list — список таблиц, в которых мы хотим отслеживать изменения; задаётся в формате schema.table_name; нельзя использовать вместе с table.exclude.list;
    • heartbeat.interval.ms — интервал (в миллисекундах), с которым коннектор отправляет heartbeat-сообщения в специальный топик;
    • heartbeat.action.query — запрос, который будет выполняться при отправке каждого heartbeat-сообщения (опция появилась с версии 1.1);
    • slot.name — имя слота репликации, который будет использоваться коннектором;
    • publication.name — имя публикации в PostgreSQL, которую использует коннектор. В случае, если её не существует, Debezium попытается её создать. В случае, если у пользователя, под которым происходит подключение, недостаточно прав для этого действия — коннектор завершит работу с ошибкой;
    • transforms определяет, как именно изменять название целевого топика:
      • transforms.AddPrefix.type указывает, что будем использовать регулярные выражения;
      • transforms.AddPrefix.regex — маска, по которой переопределяется название целевого топика;
      • transforms.AddPrefix.replacement — непосредственно то, на что переопределяем.

    Подробнее про heartbeat и transforms


    По умолчанию коннектор отправляет данные в Kafka по каждой коммитнутой транзакции, а её LSN (Log Sequence Number) записывает в служебный топик offset. Но что произойдет, если коннектор настроен на чтение не всей базы целиком, а только части её таблиц (в которых обновление данных происходит не часто)?

    • Коннектор будет читать WAL-файлы и не обнаруживать в них коммита транзакций в те таблицы, за которыми он следит.
    • Поэтому он не будет обновлять свою текущую позицию ни в топике, ни в слоте репликации.
    • Это, в свою очередь, приведёт к «удержанию» WAL-файлов на диске и вероятному исчерпанию всего дискового пространства.

    И тут на помощь приходят опции heartbeat.interval.ms и heartbeat.action.query. Использование этих опций в паре даёт возможность каждый раз при отправке heartbeat-сообщения выполнять запрос на изменение данных в отдельной таблице. Тем самым постоянно актуализируется LSN, на котором сейчас находится коннектор (в слоте репликации). Это позволяет СУБД удалить WAL-файлы, которые более не нужны. Подробнее узнать о работе опций можно в документации.

    Другая опция, достойная более пристального внимания, — это transforms. Хотя она скорее про удобство и красоту…

    По умолчанию Debezium создаёт топики, руководствуясь следующей политикой именования: serverName.schemaName.tableName. Это не всегда может быть удобно. Опциями transforms можно с помощью регулярных выражений определять список таблиц, эвенты из которых нужно маршрутизировать в топик с конкретным названием.

    В нашей конфигурации благодаря transforms происходит следующее: все CDC-события из отслеживаемой БД попадут в топик с именем data.cdc.dbname. В противном случае (без этих настроек) Debezium по умолчанию бы создавал по топику на каждую таблицу вида: pg-dev.public.<table_name>.

    Ограничения коннектора


    В завершении описания конфигурации коннектора для PostgreSQL стоит рассказать о следующих особенностях/ограничениях его работы:

    1. Функционал коннектора для PostgreSQL полагается на концепцию логического декодирования. Поэтому он не отслеживает запросы на изменение структуры БД (DDL) — соответственно, в топиках этих данных не будет.
    2. Так как используются слоты репликации, подключение коннектора возможно только к ведущему экземпляру СУБД.
    3. Если пользователю, под которым коннектор подключается к базе данных, выданы права только на чтение, то перед первым запуском потребуется вручную создать слот репликации и публикацию в БД.

    Применение конфигурации


    Итак, загрузим нашу конфигурацию в коннектор:

    curl -i -X POST -H "Accept:application/json" \
      -H  "Content-Type:application/json" \ http://localhost:8083/connectors/ \
      -d @pg-con.json

    Проверяем, что загрузка прошла успешно и коннектор запустился:

    $ curl -i http://localhost:8083/connectors/pg-connector/status 
    HTTP/1.1 200 OK
    Date: Thu, 17 Sep 2020 20:19:40 GMT
    Content-Type: application/json
    Content-Length: 175
    Server: Jetty(9.4.20.v20190813)
    
    {"name":"pg-connector","connector":{"state":"RUNNING","worker_id":"172.24.0.5:8083"},"tasks":[{"id":0,"state":"RUNNING","worker_id":"172.24.0.5:8083"}],"type":"source"}

    Отлично: он настроен и готов к работе. Теперь прикинемся консьюмером и подключимся к Kafka, после чего добавим и изменим запись в таблице:

    $ kafka/bin/kafka-console-consumer.sh \
      --bootstrap-server kafka:9092 \
      --from-beginning \
      --property print.key=true \
      --topic data.cdc.dbname
    

    postgres=# insert into customers (id, first_name, last_name, email) values (1005, 'foo', 'bar', 'foo@bar.com');
    INSERT 0 1
    postgres=# update customers set first_name = 'egg' where id = 1005;
    UPDATE 1

    В нашем топике это отобразится следующим образом:

    Очень длинный JSON с нашими изменениями
    {
      "schema":{
        "type":"struct",
        "fields":[
          {
            "type":"int32",
            "optional":false,
            "field":"id"
          }
        ],
        "optional":false,
        "name":"data.cdc.dbname.Key"
      },
      "payload":{
        "id":1005
      }
    }{
      "schema":{
        "type":"struct",
        "fields":[
          {
            "type":"struct",
            "fields":[
              {
                "type":"int32",
                "optional":false,
                "field":"id"
              },
              {
                "type":"string",
                "optional":false,
                "field":"first_name"
              },
              {
                "type":"string",
                "optional":false,
                "field":"last_name"
              },
              {
                "type":"string",
                "optional":false,
                "field":"email"
              }
            ],
            "optional":true,
            "name":"data.cdc.dbname.Value",
            "field":"before"
          },
          {
            "type":"struct",
            "fields":[
              {
                "type":"int32",
                "optional":false,
                "field":"id"
              },
              {
                "type":"string",
                "optional":false,
                "field":"first_name"
              },
              {
                "type":"string",
                "optional":false,
                "field":"last_name"
              },
              {
                "type":"string",
                "optional":false,
                "field":"email"
              }
            ],
            "optional":true,
            "name":"data.cdc.dbname.Value",
            "field":"after"
          },
          {
            "type":"struct",
            "fields":[
              {
                "type":"string",
                "optional":false,
                "field":"version"
              },
              {
                "type":"string",
                "optional":false,
                "field":"connector"
              },
              {
                "type":"string",
                "optional":false,
                "field":"name"
              },
              {
                "type":"int64",
                "optional":false,
                "field":"ts_ms"
              },
              {
                "type":"string",
                "optional":true,
                "name":"io.debezium.data.Enum",
                "version":1,
                "parameters":{
                  "allowed":"true,last,false"
                },
                "default":"false",
                "field":"snapshot"
              },
              {
                "type":"string",
                "optional":false,
                "field":"db"
              },
              {
                "type":"string",
                "optional":false,
                "field":"schema"
              },
              {
                "type":"string",
                "optional":false,
                "field":"table"
              },
              {
                "type":"int64",
                "optional":true,
                "field":"txId"
              },
              {
                "type":"int64",
                "optional":true,
                "field":"lsn"
              },
              {
                "type":"int64",
                "optional":true,
                "field":"xmin"
              }
            ],
            "optional":false,
            "name":"io.debezium.connector.postgresql.Source",
            "field":"source"
          },
          {
            "type":"string",
            "optional":false,
            "field":"op"
          },
          {
            "type":"int64",
            "optional":true,
            "field":"ts_ms"
          },
          {
            "type":"struct",
            "fields":[
              {
                "type":"string",
                "optional":false,
                "field":"id"
              },
              {
                "type":"int64",
                "optional":false,
                "field":"total_order"
              },
              {
                "type":"int64",
                "optional":false,
                "field":"data_collection_order"
              }
            ],
            "optional":true,
            "field":"transaction"
          }
        ],
        "optional":false,
        "name":"data.cdc.dbname.Envelope"
      },
      "payload":{
        "before":null,
        "after":{
          "id":1005,
          "first_name":"foo",
          "last_name":"bar",
          "email":"foo@bar.com"
        },
        "source":{
          "version":"1.2.3.Final",
          "connector":"postgresql",
          "name":"dbserver1",
          "ts_ms":1600374991648,
          "snapshot":"false",
          "db":"postgres",
          "schema":"public",
          "table":"customers",
          "txId":602,
          "lsn":34088472,
          "xmin":null
        },
        "op":"c",
        "ts_ms":1600374991762,
        "transaction":null
      }
    }{
      "schema":{
        "type":"struct",
        "fields":[
          {
            "type":"int32",
            "optional":false,
            "field":"id"
          }
        ],
        "optional":false,
        "name":"data.cdc.dbname.Key"
      },
      "payload":{
        "id":1005
      }
    }{
      "schema":{
        "type":"struct",
        "fields":[
          {
            "type":"struct",
            "fields":[
              {
                "type":"int32",
                "optional":false,
                "field":"id"
              },
              {
                "type":"string",
                "optional":false,
                "field":"first_name"
              },
              {
                "type":"string",
                "optional":false,
                "field":"last_name"
              },
              {
                "type":"string",
                "optional":false,
                "field":"email"
              }
            ],
            "optional":true,
            "name":"data.cdc.dbname.Value",
            "field":"before"
          },
          {
            "type":"struct",
            "fields":[
              {
                "type":"int32",
                "optional":false,
                "field":"id"
              },
              {
                "type":"string",
                "optional":false,
                "field":"first_name"
              },
              {
                "type":"string",
                "optional":false,
                "field":"last_name"
              },
              {
                "type":"string",
                "optional":false,
                "field":"email"
              }
            ],
            "optional":true,
            "name":"data.cdc.dbname.Value",
            "field":"after"
          },
          {
            "type":"struct",
            "fields":[
              {
                "type":"string",
                "optional":false,
                "field":"version"
              },
              {
                "type":"string",
                "optional":false,
                "field":"connector"
              },
              {
                "type":"string",
                "optional":false,
                "field":"name"
              },
              {
                "type":"int64",
                "optional":false,
                "field":"ts_ms"
              },
              {
                "type":"string",
                "optional":true,
                "name":"io.debezium.data.Enum",
                "version":1,
                "parameters":{
                  "allowed":"true,last,false"
                },
                "default":"false",
                "field":"snapshot"
              },
              {
                "type":"string",
                "optional":false,
                "field":"db"
              },
              {
                "type":"string",
                "optional":false,
                "field":"schema"
              },
              {
                "type":"string",
                "optional":false,
                "field":"table"
              },
              {
                "type":"int64",
                "optional":true,
                "field":"txId"
              },
              {
                "type":"int64",
                "optional":true,
                "field":"lsn"
              },
              {
                "type":"int64",
                "optional":true,
                "field":"xmin"
              }
            ],
            "optional":false,
            "name":"io.debezium.connector.postgresql.Source",
            "field":"source"
          },
          {
            "type":"string",
            "optional":false,
            "field":"op"
          },
          {
            "type":"int64",
            "optional":true,
            "field":"ts_ms"
          },
          {
            "type":"struct",
            "fields":[
              {
                "type":"string",
                "optional":false,
                "field":"id"
              },
              {
                "type":"int64",
                "optional":false,
                "field":"total_order"
              },
              {
                "type":"int64",
                "optional":false,
                "field":"data_collection_order"
              }
            ],
            "optional":true,
            "field":"transaction"
          }
        ],
        "optional":false,
        "name":"data.cdc.dbname.Envelope"
      },
      "payload":{
        "before":{
          "id":1005,
          "first_name":"foo",
          "last_name":"bar",
          "email":"foo@bar.com"
        },
        "after":{
          "id":1005,
          "first_name":"egg",
          "last_name":"bar",
          "email":"foo@bar.com"
        },
        "source":{
          "version":"1.2.3.Final",
          "connector":"postgresql",
          "name":"dbserver1",
          "ts_ms":1600375609365,
          "snapshot":"false",
          "db":"postgres",
          "schema":"public",
          "table":"customers",
          "txId":603,
          "lsn":34089688,
          "xmin":null
        },
        "op":"u",
        "ts_ms":1600375609778,
        "transaction":null
      }
    }


    В обоих случаях записи состоят из ключа (PK) записи, которая была изменена, и непосредственно самой сути изменений: какой была запись до и какой стала после.

    • В случае с INSERT: значение до (before) равно null, а после — строка, которая была вставлена.
    • В случае с UPDATE: в payload.before отображается предыдущее состояние строки, а в payload.after — новое с сутью изменений.

    2.2 MongoDB


    Этот коннектор использует стандартный механизм репликации MongoDB, считывая информацию из oplog'а primary-узла СУБД.

    Аналогично уже описанному коннектору для PgSQL, здесь тоже при первом запуске снимается первичный снапшот данных, после чего коннектор переключается на режим чтения oplog’а.

    Пример конфигурации:

    {
      "name": "mp-k8s-mongo-connector",
      "config": {
            "connector.class": "io.debezium.connector.mongodb.MongoDbConnector",
            "tasks.max": "1",
            "mongodb.hosts": "MainRepSet/mongo:27017",
            "mongodb.name": "mongo",
            "mongodb.user": "debezium",
            "mongodb.password": "dbname",
            "database.whitelist": "db_1,db_2",
            "transforms": "AddPrefix",
            "transforms.AddPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
            "transforms.AddPrefix.regex": "mongo.([a-zA-Z_0-9]*).([a-zA-Z_0-9]*)",
            "transforms.AddPrefix.replacement": "data.cdc.mongo_$1"
            }
      }

    Как можно заметить, здесь нет новых опций по сравнению с прошлым примером, но сократилось лишь количество опций, отвечающих за подключение к БД и их префиксы.

    Настройки transforms в этот раз делают следующее: превращают имя целевого топика из схемы <server_name>.<db_name>.<collection_name> в data.cdc.mongo_<db_name>.

    Отказоустойчивость


    Вопрос отказоустойчивости и высокой доступности в наше время стоит как никогда остро — особенно когда мы говорим про данные и транзакции, и отслеживание изменений данных не стоит в этом вопросе в стороне. Рассмотрим, что в принципе может пойти не так и что будет происходить с Debezium в каждом из случаев.

    Есть три варианта отказа:

    1. Отказ Kafka Connect. Если Connect настроен на работу в распределенном режиме, для этого необходимо нескольким воркерам задать одинаковый group.id. Тогда при отказе одного из них коннектор будет перезапущен на другом воркере и продолжит чтение с последней коммитнутой позиции в топике в Kafka.
    2. Потеря связности с Kafka-кластером. Коннектор просто остановит чтение на позиции, которую не удалось отправить в Kafka, и будет периодически пытаться повторно отправить её, пока попытка не завершится успехом.
    3. Недоступность источника данных. Коннектор будет производить попытки повторного подключения к источнику в соответствии с конфигурацией. По умолчанию это 16 попыток с использованием exponential backoff. После 16-й неудачной попытки таск будет помечен как failed и потребуется его ручной перезапуск через REST-интерфейс Kafka Connect.
      • В случае с PostgreSQL данные не пропадут, т.к. использование слотов репликации не даст удалить WAL-файлы, не прочитанные коннектором. В этом случае есть и обратная сторона медали: если на продолжительное время будет нарушена сетевая связность между коннектором и СУБД, есть вероятность, что место на диске закончится, а это может может привести к отказу СУБД целиком.
      • В случае с MySQL файлы бинлогов могут быть отротированы самой СУБД раньше, чем восстановится связность. Это приведёт к тому, что коннектор перейдёт в состояние failed, а для восстановления нормального функционирования потребуется повторный запуск в режиме initial snapshot для продолжения чтения из бинлогов.
      • Про MongoDB. Документация гласит: поведение коннектора в случае, если файлы журналов/oplog'а были удалены и коннектор не может продолжить чтение с той позиции, где остановился, одинаково для всех СУБД. Оно заключается в том, что коннектор перейдёт в состояние failed и потребует повторного запуска в режиме initial snapshot.

        Однако бывают исключения. Если коннектор продолжительное время находился в отключенном состоянии (или не мог достучаться до экземпляра MongoDB), а oplog за это время прошёл ротацию, то при восстановлении подключения коннектор невозмутимо продолжит читать данные с первой доступной позиции, из-за чего часть данных в Kafka не попадёт.

    Заключение


    Debezium — мой первый опыт работы с CDC-системами и в целом весьма положительный. Проект подкупил поддержкой основных СУБД, простотой конфигурации, поддержкой кластеризации и активным сообществом. Заинтересовавшимся практикой рекомендую ознакомиться с гайдами для Kafka Connect и Debezium.

    По сравнению с JDBC-коннектором для Kafka Connect основным преимуществом Debezium является то, что изменения считываются из журналов СУБД, что позволяет получать данные с минимальной задержкой. JDBC Connector (из поставки Kafka Connect) делает запросы к отслеживаемой таблице с фиксированным интервалом и (по этой же причине) не генерирует сообщения при удалении данных (как можно запросить данные, которых нет?).

    Для решения схожих задач можно обратить внимание на следующие решения (помимо Debezium):


    P.S.


    Читайте также в нашем блоге:

    Флант
    DevOps-as-a-Service, Kubernetes, обслуживание 24×7

    Комментарии 1

      +2
      В статье указан везде режим «initial snapshot», но я бы еще обратил внимание на настройку snapshot.mode.
      debezium.io/documentation/reference/connectors/postgresql.html#postgresql-property-snapshot-mode
      Например, зачастую не нужно/невозможно переливать все данные, которые уже есть в базе и этого можно избежать, установив snapshot.mode=never

      Только полноправные пользователи могут оставлять комментарии. Войдите, пожалуйста.

      Самое читаемое