Пишем свой Config Provider для Kafka Connect с поддержкой значений по умолчанию
Для инжекции значений из различных источников в конфигурации тасок Kafka Connect используются различные реализации интерфейса org.apache.kafka.common.config.provider.ConfigProvider. Это специализированный интерфейс, реализации которого отвечают за то, чтобы подставить значения используя различные источники. Самая используемая и стандартная реализация данного интерфейса, которую использовал в том числе и я, - org.apache.kafka.common.config.provider.FileConfigProvider. Также упомяну используемый мной в некоторых тасках org.ggt.kafka.config.provider.KafkaEnvConfigProvider - реализация Config Provider, которая в качестве источника для инжекции значений использует environment variables.
Как подключить и использовать Config Provider
Для того, чтобы использовать Config Provider необходимо убедиться, что та реализация интерфейса ConfigProvider, которую собираемся использовать, добавлена в classpath. В environment variables необходимо прописать переменные, задающие префикс и class providera. Я собираю свой образ с kafka connector используя базовый cp-kafka-connect-base, где я добавляю в classpath все необходимые расширения и прописываю такого плана переменные окружения. Dockerfile с подключенным FileConfigProvider c префиксом ‘file’ может выглядеть так:
FROM confluentinc/cp-kafka-connect-base:7.3.0
CONNECT_CONFIG_PROVIDERS: “file”
CONNECT_CONFIG_PROVIDERS_FILE_CLASS="org.apache.kafka.common.config.provider.FileConfigProvider"
......................................................
Пример использования подключенного FileConfigProvider в json конфигурациях тасок:
{
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"connection.url": "${file:/configurations/configuration.properties:db-url}",
"connection.user": "${file:/configurations/configuration.properties:db-user}",
"connection.password": "${file:/configurations/configuration.properties:db-password}",
"dialect.name": "PostgreSqlDatabaseDialect",
................................................................
}
Значения для свойств connection.url, connection.user и connection.password берутся из файла /configurations/configuration.properties по ключам db-url, db-user и db-password соответственно. Если один из ключей не задан в файле, то при старте данной таски будет ошибка.
Реализация собственного Config Provider
Захотелось иметь возможность задавать значения по умолчанию, т.е. значение, которое будет проставляться в случае отсутствия ключа в файле configuration.properties. В тоже время, т.к. во всех используемых у нас тасках configuration.properties файл зафиксирован в качестве источника значений для инжекции, а также зафиксировано его расположения, то нет нужды каждый раз прописывать path - этот момент также можно предусмотреть в реализации Config Providera.
Для того, чтобы реализовать свой Config Provider, необходимо собственно написать имплементацию интерфейса ConfigProvider. Реализуем свой Config Provider, с помощью которого можно будет задавать значения по умолчанию, а источником значений будет configuration.properties файл без явного указания полного пути к нему.
class DefaultValueConfigProvider : ConfigProvider {
companion object {
private val LOGGER: Logger = LoggerFactory.getLogger(DefaultValueConfigProvider::class.java)
private const val PATH = "/configurations/configuration.properties"
private const val DEFAULT_DELIMITER = "|"
}
private lateinit var properties: Map<String, String>
override fun configure(configs: Map<String, *>) {
properties = Properties().apply { load(InputStreamReader(FileInputStream(PATH), Charsets.UTF_8)) }.toMap() as Map<String, String>
}
override fun close() {
LOGGER.info("${DefaultValueConfigProvider::class}:close")
}
override fun get(path: String): ConfigData = ConfigData(properties)
override fun get(path: String, keys: MutableSet<String>) =
ConfigData(
properties.filter { p -> p.key in keys.filter { key -> !key.contains(DEFAULT_DELIMITER) } } +
keys.filter { key -> key.contains(DEFAULT_DELIMITER) }
.associateBy(
{ it },
{ key -> key.split(DEFAULT_DELIMITER).let { properties[it[0]] ?: it[1] } }
)
)
}
Реализация достаточно тривиальная, но стоит обговорить некоторые нюансы. В качестве разделителя хотелось использовать “:”, но данный символ зарезервирован в качестве разделителя между prefix, path и property, поэтому пришлось выбрать другой в качестве разделителя - ‘|’. Ключом в Map, передаваемую в ConfigData, будет именно изначальное значение, которое содержит ‘|’ и дефолтное значение, не нужно его обрезать, как было сделано мной ошибочно изначально. Т.е. изначально код был написан так:
override fun get(path: String, keys: MutableSet<String>) =
ConfigData(
properties.filter { p -> p.key in keys.filter { key -> !key.contains(DEFAULT_DELIMITER) } } +
keys.filter { key -> key.contains(DEFAULT_DELIMITER) }
.map{ it.split(DEFAULT_DELIMITER) } // !ошибка
.associateBy(
{ it[0] }, // ! ключ нельзя изменять
{ properties[it[0]] ?: it[1] }
)
Понадобилось некоторое время, чтобы разобраться, что не так.
Подключение и использование
Представленную выше реализацию ConfigProvider необходимо сконфигурировать и добавить в classpath. Также необходимо настроить environment variables должным образом. В моем случае Dockerfile изменится так:
FROM confluentinc/cp-kafka-connect-base:7.3.0
ENV CONNECT_CONFIG_PROVIDERS="file,dflt"
ENV CONNECT_CONFIG_PROVIDERS_FILE_CLASS="org.apache.kafka.common.config.provider.FileConfigProvider"
ENV CONNECT_CONFIG_PROVIDERS_DFLT_CLASS="ru.typik.kafka.connect.config.provider.DefaultValueConfigProvider"
# В libs лежит собранный jar с реализацией DefaultValueConfigProvider
COPY /libs/ /usr/share/java/kafka-serde-tools/
COPY /libs/ /usr/share/java/cp-base-new/
.................................................................
Представленная выше конфигурация таски трансформируется так, используя новый Config Provider и функционал дефолтных значений:
{
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"connection.url": "${dflt:db-url}",
"connection.user": "${dflt:db-user|postgres}",
"connection.password": "${dflt:db-password|postgres}",
"dialect.name": "PostgreSqlDatabaseDialect",
….
В приведенном примере значения для connection.url, connection.user и connection.password все также будут браться из файла /configurations/configuration.properties по ключам db-url,db-user и db-password соответственно, с той лишь разницей, что теперь, если ключи db-user и db-password будут отсутствовать, то будут проставляться значения по умолчанию postgres и postgres. Отсутствие же ключа db-url приведет к ошибке при инициализации таски как и в предыдущем варианте.