Я активно использую на проекте Kafka Connect Framework и в частности Kafka JDBC Sink Connector для быстрого сохранения данных из Kafka Topic в БД PostgresSQL. Для большинства задач достаточно написать простую JSON-конфигурацию и все стабильно и быстро работает из коробки. Нет необходимости в написании собственного кода. Однако в нетиповых ситуациях расширяемость Kafka Connect тоже помогает - можно переопределить и написать один из компонентов.

В конфигурации JDBC Sink Connector Task существует настройка dialect.name, которая отвечает за выбор диалекта для работы с конкретной БД. Как правило, в 99% случаев используется один из уже реализованных для популярных БД диалектов, как например в моем случае PostgresSqlDatabaseDialect для PostgreSQL.

Может показаться, что вряд ли кому-то понадобится реализовывать свой диалект, если только не имеем дело с какой-то специфичной непопулярной БД. Однако на практике оказалось, что реализация своего диалекта даже для PostgreSQL может быть полезна для решения некоторых возникающих прикладных задач.

В данной статье я хочу показать идеи того, как реализация своего DatabaseDialect может помочь при имплементации нестандартных сценариев для вполне себе популярной БД Postgres, для которой существует PostgresSqlDatabaseDialect.

Что такое DatabaseDialect и зачем он нужен

DatabaseDialect - это слой абстракции в Kafka JDBC Sink Connector, который отвечает за генерацию SQL-запросов и взаимодействие с конкретной БД через JDBC. Именно в диалекте определяется, какие SQL конструкции будут использоваться при вставке, обновлении или удалении данных.

Каждая БД имеет свои особенности: различия в синтаксисе расширенного SQL, поддержке upsert, типах данных и т.д. Для того, чтобы учесть все эти особенности существуют различные реализации интерфейса DatabaseDialect.

Выбор диалекта происходит в момент старта коннектора. Для этого коннектор смотрит на настройку dialect.name. Все доступные реализации DatabaseDialectProvider загружаются через Java SPI и выбирается provider, имя которого явно совпадает со значением настройки dialect.name.

Т.е. если возникает необходимость изменить или расширить SQL-запросы, например для использования каких-то дополнительных возможностей или, как будет разобрано далее, нестандартное поведение при конфликте вставки, то для подобных задач необходимо переопределять именно поведение в DatabaseDialect. Например с помощью SMT ( Single Message Transform) это сделать не получится, т.к. они работают на уровне сообщений Kafka и не влияют на генерацию SQL.

Реализация и подключение своего DatabaseDialect

Реализация интерфейса DatabaseDialect

Свой диалект должен реализовывать интерфейс DatabaseDialect. На практике скорей всего понадобиться расширить один из существующих как в моем случае PostgreSqlDatabaseDialect.

Реализация DatabaseDialectProvider

Kafka JDBC Sink Connector не создает диалекты напрямую. Для этого используется DatabaseDialectProvider. Минимальная реализация provider’а в моем случае выглядит так:

class MyProvider : DatabaseDialectProvider.SubprotocolBasedProvider(
    MyPostgresSqlDatabaseDialect::class.java.simpleName,
    "postgresql"
) {
    override fun create(config: AbstractConfig): DatabaseDialect = MyPostgresSqlDatabaseDialect(config)
}

Здесь собственно указывается уникальное имя диалекта, поддерживаемый JDBC subprotocol и фабричный метод для создания инстанса диалекта. При старте коннектор должен будет найти данный provider и с помощью него создать и использовать соответств��ющий диалект.

Регистрация через Java SPI

Для того, чтобы Kafka Connect, используя механизм Java SPI, смог обнаружить новый диалект и подключить его в jar-файл с диалектом необходимо добавить файл META-INF/services/io.confluent.connect.jdbc.dialect.DatabaseDialectProvider с содержимым:

my.kafka.connect.dialect.provider.MyProvider

Подключение jar в Kafka Connector

Собранный jar необходимо положить в classpath Kafka Connect до старта worker'a. Я в качестве базового образа использую confluentinc/cp-kafka-connect-base   и поэтому при сборке образа копирую данный артефакт в /libs/ /usr/share/java/kafka-serde-tools/.

Реальные сценарии с примерами реализации

Далее приведены примеры задач, которые удалось решить с помощью реализации DatabaseDialect. Эти примеры очень близки реальным кейсам в PROD, но упрощены для демонстрации.

Может показаться, что данные задачи вызваны промахами при проектирование системы и это скорей всего отчасти так. Однако приходится работать и с такими кейсами. Хорошо, что Kafka Connect со своей расширяемостью и в частности с механизмом переопределения DatabaseDialect позволяет решать подобные задачи достаточно просто и прозрачно, без срочного перепроектирования моделей и миграций.

В качестве примера далее будет использоваться следующая таблица в PostgresSQL:

CREATE TABLE master_data.operation (
	id int8 NOT NULL GENERATED BY DEFAULT AS IDENTITY,
	operation_id varchar(10) NOT NULL,
	field1 text NULL,
	field2 text NULL,
	field3 text NULL,
	field4 int8 NULL,
	CONSTRAINT operation_operation_id_key null,
	CONSTRAINT operation_pkey null
);

В Kafka Connect тасках будет использоваться JsonConverter:

"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "true",

Json Shema Registry использоваться не будет для простоты. При тестировании в топики отправлялись сообщения со схемой:

{
      "schema": {
        "type": "struct",
        "name": "operation",
        "fields": [
            { "field": "operation_id", "type": "string", "optional": false },
            { "field": "field1", "type": "string", "optional": true },
            { "field": "field2", "type": "string", "optional": true },
            { "field": "field3", "type": "string", "optional": true },
            { "field": "field4", "type": "int64",  "optional": true }
            ]
        },
        "payload": {"operation_id":"2","field3":"3","field4":"22"}
    }

Но далее, чтобы не загромождать статью, в примерах будет использоваться только содержимое payload часть.

Игнорирование дублей при вставке

Есть Kafka Topic, данные из которого сохраняются в таблицу PostgreSQL. Необходимо сделать так, чтобы данные из этого топика сохранялись только в том случае, если записи с таким же ключом еще нет в таблице. В эту же таблицу в режиме upsert пишет Kafka JDBC Sink Connector из другого топика. Важно, чтобы данные из первого не перезаписали данные из 2го, но при этом успешно вставлялись в БД, если записи с таким ключом еще нет.

Данную задачу в принципе можно решить на моей версии kafka connect ( confluentinc/cp-kafka-connect-base:7.8.0)   реализовав простую insert-таску для первого топика:

{
  "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",

  "connection.url": "${db.url}",
  "connection.user": "${db.user}",
  "connection.password": "${db.password}",
  "connection.reWriteBatchedInserts": "${db.reWriteBatchedInserts|false}",
  "connection.attempts": "${db.connection.attempts|100}",
  "connection.backoff.ms": "${db.connection.backoff.ms|10000}",

  "dialect.name": "PostgreSqlDatabaseDialect",
  "consumer.override.group.id": "${operation-intermediate.group|master-data-operation-state-received-group}",
  "key.converter": "org.apache.kafka.connect.storage.StringConverter",
  "value.converter": "org.apache.kafka.connect.json.JsonConverter",
  "value.converter.schemas.enable": "true",
  "table.name.format": "operation",
  "auto.create": "false",
  "insert.mode": "insert",
  "pk.mode": "record_value",
  "pk.fields": "operation_id",
  "tasks.max": "1",
  "topics": "operation_duplicate_error",

  "errors.tolerance": "all",
  "errors.log.enable": true,
  "errors.log.include.messages": true,
  "errors.deadletterqueue.topic.name": "${topic.deadLetter}",
  "errors.deadletterqueue.topic.replication.factor": "${replication-factor}",
  "errors.deadletterqueue.context.headers.enable": true
}

В такой конфигурации если сначала послать сообщение:

{"operation_id":"1","field1":"1","field2":"1"}

Запись с operation_id = 1 успешно сохранится в БД. Затем отправим пачку сообщений:

{"operation_id":"2","field1":"2","field2":"2"}
{"operation_id":"1","field1":"3","field2":"3"}
{"operation_id":"3","field1":"3","field2":"3"}

В результате через некоторое время записи с operation_id = 2 и operation_id = 3 будут успешно сохранены в таблицу, а запись с operation_id = 1 не обновится.

Функционально это именно то поведение, которое я ожидаю. Однако если посмотреть логи, то можно увидеть, что пачка обрабатывалась более 30 секунд (при моих настройках ретрая). В логах видно, что при обработке пачки возникали многочисленные ошибки и ретраи:

2026-02-01T19:43:39.340049010Z 2026-02-01 19:43:39 JdbcSinkTask [task-thread-ignoreDuplicatesWithDelay-0] Write of 3 records failed, remainingRetries=0
2026-02-01T19:43:39.340066013Z java.sql.BatchUpdateException: Batch entry 1 INSERT INTO "operation" ("operation_id","field1","field2","field3","field4") VALUES ('1','3','3',NULL,NULL) was aborted: ERROR: duplicate key value violates unique constraint "operation_operation_id_key"
2026-02-01T19:43:39.340069713Z   Detail: Key (operation_id)=(3) already exists.  Call getNextException to see other errors in the batch.

Связано это с тем, что в режиме insert коннектор формирует batch insert запрос из вычитанных записей. В случае ошибки duplicate key value коннектор начинает повторять вставку поблочно, уменьшая размер батча. В конечном итоге он сохранит все записи до проблемной, а дальнейшее поведение будет зависеть от настройки error.tollerance.

При error.tollerance = all (как в моем примере) коннектор попытается сделать ретрай проблемной записи согласно настройкам и затем отправит в dead letter queue, если она задана.

При error.tollerance=none   обработка завершится на проблемной записи и коннектор остановится.

Почему обработка батча с пролемной записью заняло 30 сек? Потому что сначала происходит ретрай текущего батча, затем его дробление, и после этого повторные ретраи проблемной записи. Сколько это займет времени зависит от конкретных настроек интервалов и количества ретраев.

Т.е. функционально текущее решение подходит под требования, но постоянные 30 секундные залипания при появлении дублей могут вызвать существенную деградацию производительности и лаг. Такая же проблема только для MySql описана здесь: при появлении в топике дублей, коннектор продолжает корректно работать, но средняя производительность существенно падает из-за ретрая проблемных операций.

В данном кейсе как раз нам может помочь реализация своего диалекта с   использованием возможностей Postgres. В Postgres можно немного модифицировать INSERT, чтобы при конфликте по ключу не выбрасывалась ошибка:

INSERT INTO operation (operation_id,field1,field2,field3,field4) VALUES (..)
ON CONFLIC ( operation_id) DO NOTHING

Тогда, реализовав свое диалект, можно добиться желаемого поведения без существенной деградации производительности:

class PostgresSqlDatabaseInsertOnConflictDoNothingDialect(config: AbstractConfig) : PostgreSqlDatabaseDialect(config) {
    class Provider : DatabaseDialectProvider.SubprotocolBasedProvider(
        PostgresSqlDatabaseInsertOnConflictDoNothingDialect::class.java.simpleName,
        "postgresql"
    ) {
        override fun create(config: AbstractConfig): DatabaseDialect = PostgresSqlDatabaseInsertOnConflictDoNothingDialect(config)
    }

    override fun buildInsertStatement(
        table: TableId,
        keyColumns: MutableCollection<ColumnId>,
        nonKeyColumns: MutableCollection<ColumnId>,
        definition: TableDefinition
    ): String =
        expressionBuilder()
            .apply {
                append("INSERT INTO ")
                append(table)
                append(" (")
                appendList().delimitedBy(",").transformedBy(ExpressionBuilder.columnNames()).of(keyColumns, nonKeyColumns)
                append(") VALUES (")
                appendList().delimitedBy(",").transformedBy(columnValueVariables(definition)).of(keyColumns, nonKeyColumns)
                append(") ON CONFLICT (")
                appendList().delimitedBy(",").transformedBy(ExpressionBuilder.columnNames()).of(keyColumns)
                append(") DO NOTHING")
            }
            .toString()
}

Конфигурация таски будет выглядеть так:

{
  "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",

  "connection.url": "${db.url}",
  "connection.user": "${db.user}",
  "connection.password": "${db.password}",
  "connection.reWriteBatchedInserts": "${db.reWriteBatchedInserts|false}",
  "connection.attempts": "${db.connection.attempts|100}",
  "connection.backoff.ms": "${db.connection.backoff.ms|10000}",

  "dialect.name": "PostgresSqlDatabaseInsertOnConflictDoNothingDialect",
  "consumer.override.group.id": "${operation-intermediate.group|master-data-operation-state-received-group}",
  "key.converter": "org.apache.kafka.connect.storage.StringConverter",
  "value.converter": "org.apache.kafka.connect.json.JsonConverter",
  "value.converter.schemas.enable": "true",
  "table.name.format": "operation",
  "auto.create": "false",
  "insert.mode": "insert",
  "pk.mode": "record_value",
  "pk.fields": "operation_id",
  "tasks.max": "1",
  "topics": "operation_duplicate",

  "errors.tolerance": "all",
  "errors.log.enable": true,
  "errors.log.include.messages": true,
  "errors.deadletterqueue.topic.name": "${topic.deadLetter}",
  "errors.deadletterqueue.topic.replication.factor": "${replication-factor}",
  "errors.deadletterqueue.context.headers.enable": true
}

Merge nullable полей

Рассмотрим сценарий, когда данные из нескольких топиков должны сохраняться в одну таблицу БД. Пусть каждый топик содержит свой набор полей. Все сообщения относятся к одной сущности и имеют общий ключ.

Например в терминах таблицы operation в одном топике приходит field1, field2, а в другом field3, field4. Важно, чтобы после обработки сообщений для одного ключа из обоих топиков все поля field1,field2,field3,field4 были заполнены.

Режим upsert в чистом виде здесь не подойдет - второе сообщения перезапишет уже заполненные поля из первого, даже если эти поля отсутствуют или null.

Такую логику можно реализовать с помощью триггеров, но у такой реализации будет ряд существенных недостатков:

  • триггеры сложны в сопровождении

  • важно всегда помнить о них при разборе инцидентов

  • усложняют изменение структуры таблиц

  • логика обновления данных становится неявной

PostgreSQL позволяет решить эту задачу напрямую на уровне SQL:  

INSERT INTO "operation" as t ("operation_id","field1","field2","field3","field4")
 VALUES (....) ON CONFLICT ("operation_id") 
 do update set 
 field1 = COALESCE(t.field1,EXCLUDED.field1),
 field2 = COALESCE(t.field2,EXCLUDED.field2),
 field3 = COALESCE(t.field3,EXCLUDED.field3),
 field4 = COALESCE(t.field4,EXCLUDED.field4) 

Такой запрос обновит только те поля, которые NULL и не затрет уже сохраненные данные. Осталось реализовать и подключить собственный диалект:

class PostgresSqlDatabaseUpsertMergeDialect(config: AbstractConfig) : PostgreSqlDatabaseDialect(config) {

    companion object{
        const val TABLE_ALIAS = "t"
    }

    class Provider : DatabaseDialectProvider.SubprotocolBasedProvider(
        PostgresSqlDatabaseUpsertMergeDialect::class.java.simpleName,
        "postgresql"
    ) {
        override fun create(config: AbstractConfig): DatabaseDialect = PostgresSqlDatabaseUpsertMergeDialect(config)
    }

    override fun buildUpsertQueryStatement(
        table: TableId?,
        keyColumns: MutableCollection<ColumnId>?,
        nonKeyColumns: MutableCollection<ColumnId>?,
        definition: TableDefinition
    ): String =
        expressionBuilder()
            .append("INSERT INTO $table as $TABLE_ALIAS (")
            .appendList().delimitedBy(",").transformedBy(ExpressionBuilder.columnNames()).of(keyColumns, nonKeyColumns)
            .append(") VALUES (")
            .appendList().delimitedBy(",").transformedBy(columnValueVariables(definition)).of(keyColumns, nonKeyColumns)
            .append(") ON CONFLICT (")
            .appendList().delimitedBy(",").transformedBy(ExpressionBuilder.columnNames()).of(keyColumns)
            .append(") do update set ")
            .appendList().delimitedBy(",").of(
                nonKeyColumns?.map { "${it.name()} = COALESCE($TABLE_ALIAS.${it.name()},EXCLUDED.${it.name()})" }
            )
            .toString()
}

Конфигурации таски будет выглядеть так:

{
  "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",

  "connection.url": "${db.url}",
  "connection.user": "${db.user}",
  "connection.password": "${db.password}",
  "connection.attempts": "${db.connection.attempts|100}",
  "connection.backoff.ms": "${db.connection.backoff.ms|10000}",

  "dialect.name": "PostgresSqlDatabaseUpsertMergeDialect",
  "consumer.override.group.id": "${operation-intermediate.group|master-data-operation-state-received-group}",
  "key.converter": "org.apache.kafka.connect.storage.StringConverter",
  "value.converter": "org.apache.kafka.connect.json.JsonConverter",
  "value.converter.schemas.enable": "true",
  "table.name.format": "operation",
  "auto.create": "false",
  "insert.mode": "upsert",
  "pk.mode": "record_value",
  "pk.fields": "operation_id",
  "tasks.max": "1",
  "topics": "operation_merge",

  "errors.tolerance": "all",
  "errors.log.enable": true,
  "errors.log.include.messages": true,
  "errors.deadletterqueue.topic.name": "${topic.deadLetter}",
  "errors.deadletterqueue.topic.replication.factor": "${replication-factor}",
  "errors.deadletterqueue.context.headers.enable": true
}

На мой взгляд данная реализация гораздо прозрачнее варианта с триггером. Вся логика обновления данных выражена явным SQL, запросы можно увидеть в логах коннектора при включенном логировании.