Как стать автором
Обновить

Пишем свой Config Provider для Kafka Connect с поддержкой значений по умолчанию

Уровень сложностиПростой
Время на прочтение4 мин
Количество просмотров679

Для инжекции значений из различных источников в конфигурации тасок Kafka Connect используются различные реализации интерфейса org.apache.kafka.common.config.provider.ConfigProvider. Это специализированный интерфейс, реализации которого отвечают за то, чтобы подставить значения используя различные источники. Самая используемая и стандартная реализация данного интерфейса, которую использовал в том числе и я, - org.apache.kafka.common.config.provider.FileConfigProvider. Также упомяну используемый мной в некоторых тасках org.ggt.kafka.config.provider.KafkaEnvConfigProvider - реализация Config Provider, которая в качестве источника для инжекции значений использует environment variables.

Как подключить и использовать Config Provider

Для того, чтобы использовать Config Provider необходимо убедиться, что та реализация интерфейса ConfigProvider, которую собираемся использовать, добавлена в classpath. В environment variables необходимо прописать переменные, задающие префикс и class providera. Я собираю свой образ с kafka connector используя базовый cp-kafka-connect-base, где я добавляю в classpath все необходимые расширения и прописываю такого плана переменные окружения. Dockerfile с подключенным FileConfigProvider c префиксом ‘file’ может выглядеть так:

FROM confluentinc/cp-kafka-connect-base:7.3.0

CONNECT_CONFIG_PROVIDERS: “file”
CONNECT_CONFIG_PROVIDERS_FILE_CLASS="org.apache.kafka.common.config.provider.FileConfigProvider"

......................................................

Пример использования подключенного FileConfigProvider в json конфигурациях тасок:

{
 "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
 "connection.url": "${file:/configurations/configuration.properties:db-url}",
 "connection.user": "${file:/configurations/configuration.properties:db-user}",
 "connection.password": "${file:/configurations/configuration.properties:db-password}",
 "dialect.name": "PostgreSqlDatabaseDialect",
  ................................................................
}

Значения для свойств connection.url, connection.user и connection.password берутся из файла /configurations/configuration.properties по ключам db-url, db-user и db-password соответственно. Если один из ключей не задан в файле, то при старте данной таски будет ошибка.

Реализация собственного Config Provider

Захотелось иметь возможность задавать значения по умолчанию, т.е. значение, которое будет проставляться в случае отсутствия ключа в файле configuration.properties. В тоже время, т.к. во всех используемых у нас тасках configuration.properties файл зафиксирован в качестве источника значений для инжекции, а также зафиксировано его расположения, то нет нужды каждый раз прописывать path - этот момент также можно предусмотреть в реализации Config Providera.

Для того, чтобы реализовать свой Config Provider, необходимо собственно написать имплементацию интерфейса ConfigProvider. Реализуем свой Config Provider, с помощью которого можно будет задавать значения по умолчанию, а источником значений будет configuration.properties файл без явного указания полного пути к нему.

class DefaultValueConfigProvider : ConfigProvider {

   companion object {
       private val LOGGER: Logger = LoggerFactory.getLogger(DefaultValueConfigProvider::class.java)
       private const val PATH = "/configurations/configuration.properties"
       private const val DEFAULT_DELIMITER = "|"
   }

   private lateinit var properties: Map<String, String>

   override fun configure(configs: Map<String, *>) {
       properties = Properties().apply { load(InputStreamReader(FileInputStream(PATH), Charsets.UTF_8)) }.toMap() as Map<String, String>
   }

   override fun close() {
       LOGGER.info("${DefaultValueConfigProvider::class}:close")
   }

   override fun get(path: String): ConfigData = ConfigData(properties)

   override fun get(path: String, keys: MutableSet<String>) =
       ConfigData(
           properties.filter { p -> p.key in keys.filter { key -> !key.contains(DEFAULT_DELIMITER) } } +
               keys.filter { key -> key.contains(DEFAULT_DELIMITER) }
                   .associateBy(
                       { it },
                       { key -> key.split(DEFAULT_DELIMITER).let { properties[it[0]] ?: it[1] } }
                   )
       )
}

Реализация достаточно тривиальная, но стоит обговорить некоторые нюансы. В качестве разделителя хотелось использовать “:”, но данный символ зарезервирован в качестве разделителя между prefix, path и property, поэтому пришлось выбрать другой в качестве разделителя - ‘|’. Ключом в Map, передаваемую в ConfigData, будет именно изначальное значение, которое содержит ‘|’ и дефолтное значение, не нужно его обрезать, как было сделано мной ошибочно изначально. Т.е. изначально код был написан так:

override fun get(path: String, keys: MutableSet<String>) =
       ConfigData(
           properties.filter { p -> p.key in keys.filter { key -> !key.contains(DEFAULT_DELIMITER) } } +
               keys.filter { key -> key.contains(DEFAULT_DELIMITER) }
			       .map{ it.split(DEFAULT_DELIMITER) } // !ошибка
                   .associateBy(
                       { it[0] }, // ! ключ нельзя изменять
                       { properties[it[0]] ?: it[1] }
                   )

Понадобилось некоторое время, чтобы разобраться, что не так.

Подключение и использование

Представленную выше реализацию ConfigProvider необходимо сконфигурировать и добавить в classpath. Также необходимо настроить environment variables должным образом. В моем случае Dockerfile изменится так:

FROM confluentinc/cp-kafka-connect-base:7.3.0

ENV CONNECT_CONFIG_PROVIDERS="file,dflt"
ENV CONNECT_CONFIG_PROVIDERS_FILE_CLASS="org.apache.kafka.common.config.provider.FileConfigProvider"
ENV CONNECT_CONFIG_PROVIDERS_DFLT_CLASS="ru.typik.kafka.connect.config.provider.DefaultValueConfigProvider"


# В libs лежит собранный jar с реализацией DefaultValueConfigProvider 
COPY /libs/ /usr/share/java/kafka-serde-tools/
COPY /libs/ /usr/share/java/cp-base-new/

.................................................................

Представленная выше конфигурация таски трансформируется так, используя новый Config Provider и функционал дефолтных значений:

{
 "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
 "connection.url": "${dflt:db-url}",
 "connection.user": "${dflt:db-user|postgres}",
 "connection.password": "${dflt:db-password|postgres}",
 "dialect.name": "PostgreSqlDatabaseDialect",
….

В приведенном примере значения для connection.url, connection.user и connection.password все также будут браться из файла /configurations/configuration.properties по ключам db-url,db-user и db-password соответственно, с той лишь разницей, что теперь, если ключи db-user и db-password будут отсутствовать, то будут проставляться значения по умолчанию postgres и postgres. Отсутствие же ключа db-url приведет к ошибке при инициализации таски как и в предыдущем варианте.

Теги:
Хабы:
Рейтинг0
Комментарии0

Публикации

Работа

Java разработчик
201 вакансия

Ближайшие события