Я активно использую на проекте Kafka Connect Framework и в частности Kafka JDBC Sink Connector для быстрого сохранения данных из Kafka Topic в БД PostgresSQL. Для большинства задач достаточно написать простую JSON-конфигурацию и все стабильно и быстро работает из коробки. Нет необходимости в написании собственного кода. Однако в нетиповых ситуациях расширяемость Kafka Connect тоже помогает - можно переопределить и написать один из компонентов.
В конфигурации JDBC Sink Connector Task существует настройка dialect.name, которая отвечает за выбор диалекта для работы с конкретной БД. Как правило, в 99% случаев используется один из уже реализованных для популярных БД диалектов, как например в моем случае PostgresSqlDatabaseDialect для PostgreSQL.
Может показаться, что вряд ли кому-то понадобится реализовывать свой диалект, если только не имеем дело с какой-то специфичной непопулярной БД. Однако на практике оказалось, что реализация своего диалекта даже для PostgreSQL может быть полезна для решения некоторых возникающих прикладных задач.
В данной статье я хочу показать идеи того, как реализация своего DatabaseDialect может помочь при имплементации нестандартных сценариев для вполне себе популярной БД Postgres, для которой существует PostgresSqlDatabaseDialect.
Что такое DatabaseDialect и зачем он нужен
DatabaseDialect - это слой абстракции в Kafka JDBC Sink Connector, который отвечает за генерацию SQL-запросов и взаимодействие с конкретной БД через JDBC. Именно в диалекте определяется, какие SQL конструкции будут использоваться при вставке, обновлении или удалении данных.
Каждая БД имеет свои особенности: различия в синтаксисе расширенного SQL, поддержке upsert, типах данных и т.д. Для того, чтобы учесть все эти особенности существуют различные реализации интерфейса DatabaseDialect.
Выбор диалекта происходит в момент старта коннектора. Для этого коннектор смотрит на настройку dialect.name. Все доступные реализации DatabaseDialectProvider загружаются через Java SPI и выбирается provider, имя которого явно совпадает со значением настройки dialect.name.
Т.е. если возникает необходимость изменить или расширить SQL-запросы, например для использования каких-то дополнительных возможностей или, как будет разобрано далее, нестандартное поведение при конфликте вставки, то для подобных задач необходимо переопределять именно поведение в DatabaseDialect. Например с помощью SMT ( Single Message Transform) это сделать не получится, т.к. они работают на уровне сообщений Kafka и не влияют на генерацию SQL.
Реализация и подключение своего DatabaseDialect
Реализация интерфейса DatabaseDialect
Свой диалект должен реализовывать интерфейс DatabaseDialect. На практике скорей всего понадобиться расширить один из существующих как в моем случае PostgreSqlDatabaseDialect.
Реализация DatabaseDialectProvider
Kafka JDBC Sink Connector не создает диалекты напрямую. Для этого используется DatabaseDialectProvider. Минимальная реализация provider’а в моем случае выглядит так:
class MyProvider : DatabaseDialectProvider.SubprotocolBasedProvider( MyPostgresSqlDatabaseDialect::class.java.simpleName, "postgresql" ) { override fun create(config: AbstractConfig): DatabaseDialect = MyPostgresSqlDatabaseDialect(config) }
Здесь собственно указывается уникальное имя диалекта, поддерживаемый JDBC subprotocol и фабричный метод для создания инстанса диалекта. При старте коннектор должен будет найти данный provider и с помощью него создать и использовать соответств��ющий диалект.
Регистрация через Java SPI
Для того, чтобы Kafka Connect, используя механизм Java SPI, смог обнаружить новый диалект и подключить его в jar-файл с диалектом необходимо добавить файл META-INF/services/io.confluent.connect.jdbc.dialect.DatabaseDialectProvider с содержимым:
my.kafka.connect.dialect.provider.MyProvider
Подключение jar в Kafka Connector
Собранный jar необходимо положить в classpath Kafka Connect до старта worker'a. Я в качестве базового образа использую confluentinc/cp-kafka-connect-base и поэтому при сборке образа копирую данный артефакт в /libs/ /usr/share/java/kafka-serde-tools/.
Реальные сценарии с примерами реализации
Далее приведены примеры задач, которые удалось решить с помощью реализации DatabaseDialect. Эти примеры очень близки реальным кейсам в PROD, но упрощены для демонстрации.
Может показаться, что данные задачи вызваны промахами при проектирование системы и это скорей всего отчасти так. Однако приходится работать и с такими кейсами. Хорошо, что Kafka Connect со своей расширяемостью и в частности с механизмом переопределения DatabaseDialect позволяет решать подобные задачи достаточно просто и прозрачно, без срочного перепроектирования моделей и миграций.
В качестве примера далее будет использоваться следующая таблица в PostgresSQL:
CREATE TABLE master_data.operation ( id int8 NOT NULL GENERATED BY DEFAULT AS IDENTITY, operation_id varchar(10) NOT NULL, field1 text NULL, field2 text NULL, field3 text NULL, field4 int8 NULL, CONSTRAINT operation_operation_id_key null, CONSTRAINT operation_pkey null );
В Kafka Connect тасках будет использоваться JsonConverter:
"value.converter": "org.apache.kafka.connect.json.JsonConverter", "value.converter.schemas.enable": "true",
Json Shema Registry использоваться не будет для простоты. При тестировании в топики отправлялись сообщения со схемой:
{ "schema": { "type": "struct", "name": "operation", "fields": [ { "field": "operation_id", "type": "string", "optional": false }, { "field": "field1", "type": "string", "optional": true }, { "field": "field2", "type": "string", "optional": true }, { "field": "field3", "type": "string", "optional": true }, { "field": "field4", "type": "int64", "optional": true } ] }, "payload": {"operation_id":"2","field3":"3","field4":"22"} }
Но далее, чтобы не загромождать статью, в примерах будет использоваться только содержимое payload часть.
Игнорирование дублей при вставке
Есть Kafka Topic, данные из которого сохраняются в таблицу PostgreSQL. Необходимо сделать так, чтобы данные из этого топика сохранялись только в том случае, если записи с таким же ключом еще нет в таблице. В эту же таблицу в режиме upsert пишет Kafka JDBC Sink Connector из другого топика. Важно, чтобы данные из первого не перезаписали данные из 2го, но при этом успешно вставлялись в БД, если записи с таким ключом еще нет.
Данную задачу в принципе можно решить на моей версии kafka connect ( confluentinc/cp-kafka-connect-base:7.8.0) реализовав простую insert-таску для первого топика:
{ "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector", "connection.url": "${db.url}", "connection.user": "${db.user}", "connection.password": "${db.password}", "connection.reWriteBatchedInserts": "${db.reWriteBatchedInserts|false}", "connection.attempts": "${db.connection.attempts|100}", "connection.backoff.ms": "${db.connection.backoff.ms|10000}", "dialect.name": "PostgreSqlDatabaseDialect", "consumer.override.group.id": "${operation-intermediate.group|master-data-operation-state-received-group}", "key.converter": "org.apache.kafka.connect.storage.StringConverter", "value.converter": "org.apache.kafka.connect.json.JsonConverter", "value.converter.schemas.enable": "true", "table.name.format": "operation", "auto.create": "false", "insert.mode": "insert", "pk.mode": "record_value", "pk.fields": "operation_id", "tasks.max": "1", "topics": "operation_duplicate_error", "errors.tolerance": "all", "errors.log.enable": true, "errors.log.include.messages": true, "errors.deadletterqueue.topic.name": "${topic.deadLetter}", "errors.deadletterqueue.topic.replication.factor": "${replication-factor}", "errors.deadletterqueue.context.headers.enable": true }
В такой конфигурации если сначала послать сообщение:
{"operation_id":"1","field1":"1","field2":"1"}
Запись с operation_id = 1 успешно сохранится в БД. Затем отправим пачку сообщений:
{"operation_id":"2","field1":"2","field2":"2"} {"operation_id":"1","field1":"3","field2":"3"} {"operation_id":"3","field1":"3","field2":"3"}
В результате через некоторое время записи с operation_id = 2 и operation_id = 3 будут успешно сохранены в таблицу, а запись с operation_id = 1 не обновится.
Функционально это именно то поведение, которое я ожидаю. Однако если посмотреть логи, то можно увидеть, что пачка обрабатывалась более 30 секунд (при моих настройках ретрая). В логах видно, что при обработке пачки возникали многочисленные ошибки и ретраи:
2026-02-01T19:43:39.340049010Z 2026-02-01 19:43:39 JdbcSinkTask [task-thread-ignoreDuplicatesWithDelay-0] Write of 3 records failed, remainingRetries=0 2026-02-01T19:43:39.340066013Z java.sql.BatchUpdateException: Batch entry 1 INSERT INTO "operation" ("operation_id","field1","field2","field3","field4") VALUES ('1','3','3',NULL,NULL) was aborted: ERROR: duplicate key value violates unique constraint "operation_operation_id_key" 2026-02-01T19:43:39.340069713Z Detail: Key (operation_id)=(3) already exists. Call getNextException to see other errors in the batch.
Связано это с тем, что в режиме insert коннектор формирует batch insert запрос из вычитанных записей. В случае ошибки duplicate key value коннектор начинает повторять вставку поблочно, уменьшая размер батча. В конечном итоге он сохранит все записи до проблемной, а дальнейшее поведение будет зависеть от настройки error.tollerance.
При error.tollerance = all (как в моем примере) коннектор попытается сделать ретрай проблемной записи согласно настройкам и затем отправит в dead letter queue, если она задана.
При error.tollerance=none обработка завершится на проблемной записи и коннектор остановится.
Почему обработка батча с пролемной записью заняло 30 сек? Потому что сначала происходит ретрай текущего батча, затем его дробление, и после этого повторные ретраи проблемной записи. Сколько это займет времени зависит от конкретных настроек интервалов и количества ретраев.
Т.е. функционально текущее решение подходит под требования, но постоянные 30 секундные залипания при появлении дублей могут вызвать существенную деградацию производительности и лаг. Такая же проблема только для MySql описана здесь: при появлении в топике дублей, коннектор продолжает корректно работать, но средняя производительность существенно падает из-за ретрая проблемных операций.
В данном кейсе как раз нам может помочь реализация своего диалекта с использованием возможностей Postgres. В Postgres можно немного модифицировать INSERT, чтобы при конфликте по ключу не выбрасывалась ошибка:
INSERT INTO operation (operation_id,field1,field2,field3,field4) VALUES (..) ON CONFLIC ( operation_id) DO NOTHING
Тогда, реализовав свое диалект, можно добиться желаемого поведения без существенной деградации производительности:
class PostgresSqlDatabaseInsertOnConflictDoNothingDialect(config: AbstractConfig) : PostgreSqlDatabaseDialect(config) { class Provider : DatabaseDialectProvider.SubprotocolBasedProvider( PostgresSqlDatabaseInsertOnConflictDoNothingDialect::class.java.simpleName, "postgresql" ) { override fun create(config: AbstractConfig): DatabaseDialect = PostgresSqlDatabaseInsertOnConflictDoNothingDialect(config) } override fun buildInsertStatement( table: TableId, keyColumns: MutableCollection<ColumnId>, nonKeyColumns: MutableCollection<ColumnId>, definition: TableDefinition ): String = expressionBuilder() .apply { append("INSERT INTO ") append(table) append(" (") appendList().delimitedBy(",").transformedBy(ExpressionBuilder.columnNames()).of(keyColumns, nonKeyColumns) append(") VALUES (") appendList().delimitedBy(",").transformedBy(columnValueVariables(definition)).of(keyColumns, nonKeyColumns) append(") ON CONFLICT (") appendList().delimitedBy(",").transformedBy(ExpressionBuilder.columnNames()).of(keyColumns) append(") DO NOTHING") } .toString() }
Конфигурация таски будет выглядеть так:
{ "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector", "connection.url": "${db.url}", "connection.user": "${db.user}", "connection.password": "${db.password}", "connection.reWriteBatchedInserts": "${db.reWriteBatchedInserts|false}", "connection.attempts": "${db.connection.attempts|100}", "connection.backoff.ms": "${db.connection.backoff.ms|10000}", "dialect.name": "PostgresSqlDatabaseInsertOnConflictDoNothingDialect", "consumer.override.group.id": "${operation-intermediate.group|master-data-operation-state-received-group}", "key.converter": "org.apache.kafka.connect.storage.StringConverter", "value.converter": "org.apache.kafka.connect.json.JsonConverter", "value.converter.schemas.enable": "true", "table.name.format": "operation", "auto.create": "false", "insert.mode": "insert", "pk.mode": "record_value", "pk.fields": "operation_id", "tasks.max": "1", "topics": "operation_duplicate", "errors.tolerance": "all", "errors.log.enable": true, "errors.log.include.messages": true, "errors.deadletterqueue.topic.name": "${topic.deadLetter}", "errors.deadletterqueue.topic.replication.factor": "${replication-factor}", "errors.deadletterqueue.context.headers.enable": true }
Merge nullable полей
Рассмотрим сценарий, когда данные из нескольких топиков должны сохраняться в одну таблицу БД. Пусть каждый топик содержит свой набор полей. Все сообщения относятся к одной сущности и имеют общий ключ.
Например в терминах таблицы operation в одном топике приходит field1, field2, а в другом field3, field4. Важно, чтобы после обработки сообщений для одного ключа из обоих топиков все поля field1,field2,field3,field4 были заполнены.
Режим upsert в чистом виде здесь не подойдет - второе сообщения перезапишет уже заполненные поля из первого, даже если эти поля отсутствуют или null.
Такую логику можно реализовать с помощью триггеров, но у такой реализации будет ряд существенных недостатков:
триггеры сложны в сопровождении
важно всегда помнить о них при разборе инцидентов
усложняют изменение структуры таблиц
логика обновления данных становится неявной
PostgreSQL позволяет решить эту задачу напрямую на уровне SQL:
INSERT INTO "operation" as t ("operation_id","field1","field2","field3","field4") VALUES (....) ON CONFLICT ("operation_id") do update set field1 = COALESCE(t.field1,EXCLUDED.field1), field2 = COALESCE(t.field2,EXCLUDED.field2), field3 = COALESCE(t.field3,EXCLUDED.field3), field4 = COALESCE(t.field4,EXCLUDED.field4)
Такой запрос обновит только те поля, которые NULL и не затрет уже сохраненные данные. Осталось реализовать и подключить собственный диалект:
class PostgresSqlDatabaseUpsertMergeDialect(config: AbstractConfig) : PostgreSqlDatabaseDialect(config) { companion object{ const val TABLE_ALIAS = "t" } class Provider : DatabaseDialectProvider.SubprotocolBasedProvider( PostgresSqlDatabaseUpsertMergeDialect::class.java.simpleName, "postgresql" ) { override fun create(config: AbstractConfig): DatabaseDialect = PostgresSqlDatabaseUpsertMergeDialect(config) } override fun buildUpsertQueryStatement( table: TableId?, keyColumns: MutableCollection<ColumnId>?, nonKeyColumns: MutableCollection<ColumnId>?, definition: TableDefinition ): String = expressionBuilder() .append("INSERT INTO $table as $TABLE_ALIAS (") .appendList().delimitedBy(",").transformedBy(ExpressionBuilder.columnNames()).of(keyColumns, nonKeyColumns) .append(") VALUES (") .appendList().delimitedBy(",").transformedBy(columnValueVariables(definition)).of(keyColumns, nonKeyColumns) .append(") ON CONFLICT (") .appendList().delimitedBy(",").transformedBy(ExpressionBuilder.columnNames()).of(keyColumns) .append(") do update set ") .appendList().delimitedBy(",").of( nonKeyColumns?.map { "${it.name()} = COALESCE($TABLE_ALIAS.${it.name()},EXCLUDED.${it.name()})" } ) .toString() }
Конфигурации таски будет выглядеть так:
{ "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector", "connection.url": "${db.url}", "connection.user": "${db.user}", "connection.password": "${db.password}", "connection.attempts": "${db.connection.attempts|100}", "connection.backoff.ms": "${db.connection.backoff.ms|10000}", "dialect.name": "PostgresSqlDatabaseUpsertMergeDialect", "consumer.override.group.id": "${operation-intermediate.group|master-data-operation-state-received-group}", "key.converter": "org.apache.kafka.connect.storage.StringConverter", "value.converter": "org.apache.kafka.connect.json.JsonConverter", "value.converter.schemas.enable": "true", "table.name.format": "operation", "auto.create": "false", "insert.mode": "upsert", "pk.mode": "record_value", "pk.fields": "operation_id", "tasks.max": "1", "topics": "operation_merge", "errors.tolerance": "all", "errors.log.enable": true, "errors.log.include.messages": true, "errors.deadletterqueue.topic.name": "${topic.deadLetter}", "errors.deadletterqueue.topic.replication.factor": "${replication-factor}", "errors.deadletterqueue.context.headers.enable": true }
На мой взгляд данная реализация гораздо прозрачнее варианта с триггером. Вся логика обновления данных выражена явным SQL, запросы можно увидеть в логах коннектора при включенном логировании.