Strimzi — это практически самый широкий оператор Kubernetes Kafka, который можно использовать для развертывания Apache Kafka, либо других его компонентов, таких как Kafka Connect, Kafka Mirror и т.д. В статье мы пошагово разберем развертывание Kafka Connect в Kubernetes. А еще затронем проблемы, с которыми можно столкнуться во время процедуры развертывания и приведем способы их решения.
Примечание: Учтите, что Strimzi основан на Apache Kafka, а не на платформе Confluent. Вот почему вам, скорее всего, потребуется добавить некоторые артефакты Confluent, например, Confluent Avro Converter, чтобы получить отдачу от этого.
Статья основана на Strimzi v0.29.0
. Это значит, что вы можете установить следующие версии Kafka Connect:
Стримзи: 0.29.0
Apache Kafka и Kafka Connect: до 3.2
Эквивалентная платформа слияния: 7.2.4
Примечание: вы можете преобразовать версию платформы Confluent в версию Apache Kafka и наоборот с помощью приведенной здесь таблицы.
Установка
Графический интерфейс Openshift и CLI Kubernetes
Если вы используете Openshift, перейдите в раздел Операторы > установленные операторы > Strimzi > Kafka Connect.
Шаг за шагом: Теперь вы столкнетесь с формой, содержащей конфигурации Kafka connect. Чтобы получить эквивалентный Yaml‑файл формы, нажмите на Yaml View. Любое обновление представления формы применяется к представлению Yaml «на лету». Только не используйте его для непосредственного создания экземпляра. Он нужен для преобразования желаемой конфигурации в файл Yaml. После получения Yaml‑файл разверните оператор с помощью команды kubectl apply. Итак, подведем итог:
Введите конфигурацию в представлении формы.
Нажмите на просмотр Yaml.
Скопируйте его содержимое в файл Yaml на вашем локальном компьютере (например, kafka-connect.yaml).
Выполнить: kubectl apply -f kafka-connect.yaml .
Вид Kafka-Connect либо развертывается, либо обновляется. Развернутые ресурсы состоят из развертывания и модулей, сервиса, конфигурационных карт и секретов.
Давайте пройдемся по минимальной конфигурации и шаг за шагом сделаем ее более продвинутой.
Минимальная конфигурация
Чтобы развернуть простую минимальную конфигурацию Kafka Connect, вы можете использовать приведенный ниже Yaml:
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
name: my-connect-cluster
namespace: <YOUR_PROJECT_NAME>
spec:
config:
config.storage.replication.factor: -1
config.storage.topic: okd4-connect-cluster-configs
group.id: okd4-connect-cluster
offset.storage.replication.factor: -1
offset.storage.topic: okd4-connect-cluster-offsets
status.storage.replication.factor: -1
status.storage.topic: okd4-connect-cluster-status
bootstrapServers: kafka1, kafka2
version: 3.2.0
replicas: 1
Также для этой цели подойдет Rest API Kafka Connect на порту 8083, открытом в модуле. Предоставьте его в частной или внутренней сети, определив маршрут в OLD.
Аутентификация REST API
С помощью конфигурации, описанной здесь, добавьте аутентификацию к REST-прокси Kafka Connect. Единственное — это не сработает с оператором Strimzi. Поэтому для обеспечения безопасности в Kafka Connect, у вас есть два варианта:
Используйте API-оператор Kafka Connector. Оператор Strimzi поможет определить тип соединителя в файле YAML. Однако в некоторых случаях это может быть непрактично, ведь необходимо обновлять, приостанавливать и останавливать соединители через REST API.
Поместите небезопасный REST API за аутентифицированный API-шлюз, такой, как Apache APISIX. Подойти может любой другой инструмент или приложение собственной разработки.
Показатели JMX Prometheus
Чтобы предоставить метрики JMX Prometheus, полезные для наблюдения за статусами соединителей в Grafana, добавьте приведенную ниже конфигурацию:
metricsConfig:
type: jmxPrometheusExporter
valueFrom:
configMapKeyRef:
key: jmx-prometheus
name: configs
jmxOptions: {}
Он использует предварительно определенную конфигурацию для экспорта Prometheus. Вы можете использовать эту конфигурацию:
startDelaySeconds: 0
ssl: false
lowercaseOutputName: false
lowercaseOutputLabelNames: false
rules:
- pattern : "kafka.connect<type=connect-worker-metrics>([^:]+):"
name: "kafka_connect_connect_worker_metrics_$1"
- pattern : "kafka.connect<type=connect-metrics, client-id=([^:]+)><>([^:]+)"
name: "kafka_connect_connect_metrics_$2"
labels:
client: "$1"
- pattern: "debezium.([^:]+)<type=connector-metrics, context=([^,]+), server=([^,]+), key=([^>]+)><>RowsScanned"
name: "debezium_metrics_RowsScanned"
labels:
plugin: "$1"
name: "$3"
context: "$2"
table: "$4"
- pattern: "debezium.([^:]+)<type=connector-metrics, context=([^,]+), server=([^>]+)>([^:]+)"
name: "debezium_metrics_$4"
labels:
plugin: "$1"
name: "$3"
context: "$2"
Сервис для внешнего Prometheus
Если вы собираетесь развернуть Prometheus в сочетании со Strimzi для сбора показателей, следуйте инструкциям. Но помните, что в случае использования внешнего Prometheus история разворачивается по-другому.
Оператор Strimzi создает отображение портов в сервисе только для этих портов:
8083: Kafka Connect REST API
9999: порт JMX
К сожалению, это не создает сопоставление для порта 9404, HTTP-порта экспортера Prometheus. Итак, мы должны создать сервис самостоятельно:
kind: Service
apiVersion: v1
metadata:
name: kafka-connect-jmx-prometheus
namespace: kafka-connect
labels:
app.kubernetes.io/instance: kafka-connect
app.kubernetes.io/managed-by: strimzi-cluster-operator
app.kubernetes.io/name: kafka-connect
app.kubernetes.io/part-of: strimzi-kafka-connect
strimzi.io/cluster: kafka-connect
strimzi.io/kind: KafkaConnect
spec:
ports:
- name: tcp-prometheus
protocol: TCP
port: 9404
targetPort: 9404
type: ClusterIP
selector:
strimzi.io/cluster: kafka-connect
strimzi.io/kind: KafkaConnect
strimzi.io/name: kafka-connect-connect
status:
loadBalancer: {}
Примечание: этот метод работает только для развертываний с одним модулем. Вы должны определить маршрут для сервиса, и даже в случае headless сервиса маршрут возвращает один IP-модуль за раз. Следовательно, Prometheus не может очистить все показатели pods. Вот почему рекомендуется использовать Podmonitor и Prometheus в облаке.
Плагины и артефакты
Чтобы добавить плагины и артефакты, есть два способа.
Секция сборки оператора
Чтобы добавить плагины, воспользуйтесь разделом сборки оператора. Он получает адреса плагинов или артефактов, загружает их на этапе сборки, так как оператор автоматически создает конфигурацию сборки, и добавляет их в каталог плагинов изображения.
Он поддерживает jar, tgz, zip, and maven
. Но в случае Maven создается многоступенчатый файл Dockerfile, который проблематичен для Openshift, и он сталкивается со сбоем на стадии сборки. Следовательно, вам следует использовать только другие типы, которым не нужна стадия компиляции, например, jar, zip, tgz, и в итоге вы получите одноступенчатый файл Dockerfile.
Например, чтобы добавить плагин Debezium MySQL, вы можете использовать приведенную ниже конфигурацию:
spec:
build:
output:
image: 'kafkaconnect:1.0'
type: imagestream
plugins:
- artifacts:
- type: tgz
url: >-
https://repo1.maven.org/maven2/io/debezium/debezium-connector-mysql/2.1.4.Final/debezium-connector-mysql-2.1.4.Final-plugin.tar.gz
name: debezium-connector-mysql
Важно: оператор Strimzi может загружать только общедоступные артефакты. Поэтому, если вы хотите загрузить конфиденциально защищенный артефакт, недоступный Kubernetes, вам следует отказаться от этого метода и перейти к следующему.
Изменение изображений
Оператор может использовать желаемое вами изображение вместо изображения по умолчанию. Так вы можете добавить выбранные артефакты и плагины, создав образ вручную или с помощью CI/CD. Вы, возможно, захотите использовать именно этот способ, ведь Strimzi использует Apache Kafka image, а не платформу Confluent. То есть, в развертываниях нет совместимых пакетов, таких как Confluent Avro Converter и т.д. Поэтому вам нужно добавить их в свой образ и настроить оператора на использование вашего образа docker.
Например, если вы хотите добавить свой настроенный плагин Debezium MySQL Connector из универсальных пакетов Gitlab и Confluent Avro Converter в базовый образ, сначала используйте этот файл Dockerfile:
ARG CONFLUENT_VERSION=7.2.4
# Install confluent avro converter
FROM confluentinc/cp-kafka-connect:${CONFLUENT_VERSION} as cp
# Reassign version
ARG CONFLUENT_VERSION
RUN confluent-hub install --no-prompt confluentinc/kafka-connect-avro-converter:${CONFLUENT_VERSION}
# Copy privious artifacts to the main strimzi kafka image
FROM quay.io/strimzi/kafka:0.29.0-kafka-3.2.0
ARG GITLAB_TOKEN
ARG CI_API_V4_URL=https://gitlab.snapp.ir/api/v4
ARG CI_PROJECT_ID=3873
ARG DEBEZIUM_CONNECTOR_MYSQL_CUSTOMIZED_VERSION=1.0
USER root:root
# Copy Confluent packages from previous stage
RUN mkdir -p /opt/kafka/plugins/avro/
COPY --from=cp /usr/share/confluent-hub-components/confluentinc-kafka-connect-avro-converter/lib /opt/kafka/plugins/avro/
# Connector plugin debezium-connector-mysql
RUN 'mkdir' '-p' '/opt/kafka/plugins/debezium-connector-mysql' \
&& curl --header "${GITLAB_TOKEN}" -f -L \
--output /opt/kafka/plugins/debezium-connector-mysql.tgz \
${CI_API_V4_URL}/projects/${CI_PROJECT_ID}/packages/generic/debezium-customized/${DEBEZIUM_CONNECTOR_MYSQL_CUSTOMIZED_VERSION}/debezium-connector-mysql-customized.tar.gz \
&& 'tar' 'xvfz' '/opt/kafka/plugins/debezium-connector-mysql.tgz' '-C' '/opt/kafka/plugins/debezium-connector-mysql' \
&& 'rm' '-vf' '/opt/kafka/plugins/debezium-connector-mysql.tgz'
USER 1001
Создайте образ. Отправьте его в поток изображений или любой другой репозиторий docker и настройте оператора, добавив строку ниже:
spec:
image: image-registry.openshift-image-registry.svc:5000/kafka-connect/kafkaconnect-customized:1.0
В зависимости от его типа используйте различные конфигурации для добавления проверки подлинности Kafka. Здесь вы можете увидеть конфигурацию для Kafka с механизмом SASL/Plaintext и scram-sha-512:
spec:
authentication:
passwordSecret:
password: kafka-password
secretName: mysecrets
type: scram-sha-512
username: myuser
Далее вам нужно указать пароль в секретном файле с именем my secret.
Обработка учетных данных файла
Поскольку соединителям нужны учетные данные для доступа к базам данных, вы должны определить их как секреты и получить к ним доступ с помощью переменных среды. Однако, если их слишком много, вы можете поместить все учетные данные в файл и адресовать их в соединителе с помощью модификатора $file modifier
.
Поместите все учетные данные в качестве значения ключа с именем credentials в секретный файл.
Файл учетных данных:
USERNAME_DB_1=user1
PASSWORD_DB_1=pass1
USERNAME_DB_2=user2
PASSWORD_DB_2=pass2
Secret file:
kind: Secret
apiVersion: v1
metadata:
name: mysecrets
namespace: kafka-connect
data:
credentials: <BASE64 YOUR DATA>
Настройте оператора с секретом в качестве тома:
spec:
config:
config.providers: file
config.providers.file.class: org.apache.kafka.common.config.provider.FileConfigProvider
externalConfiguration:
volumes:
- name: database_credentials
secret:
items:
- key: credentials
path: credentials
optional: false
secretName: mysecrets
Теперь в соединителе вы можете получить доступ к PASSWORD_DB_1 с помощью приведенной ниже команды:
"${file:/opt/kafka/external-configuration/database_credentials/credentials:PASSWORD_DB_1}"
Собрать все это воедино
Если мы соберем все конфигурации вместе, у нас будет приведенная ниже конфигурация для Kafka Connect:
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
name: kafka-connect
namespace: kafka-connect
spec:
authentication:
passwordSecret:
password: kafka-password
secretName: mysecrets
type: scram-sha-512
username: myuser
config:
config.providers: file
config.providers.file.class: org.apache.kafka.common.config.provider.FileConfigProvider
config.storage.replication.factor: -1
config.storage.topic: okd4-connect-cluster-configs
group.id: okd4-connect-cluster
offset.storage.replication.factor: -1
offset.storage.topic: okd4-connect-cluster-offsets
status.storage.replication.factor: -1
status.storage.topic: okd4-connect-cluster-status
bootstrapServers: 'kafka1:9092, kafka2:9092'
metricsConfig:
type: jmxPrometheusExporter
valueFrom:
configMapKeyRef:
key: jmx-prometheus
name: configs
resources:
limits:
memory: 1Gi
requests:
memory: 1Gi
readinessProbe:
failureThreshold: 10
initialDelaySeconds: 60
periodSeconds: 20
jmxOptions: {}
livenessProbe:
failureThreshold: 10
initialDelaySeconds: 60
periodSeconds: 20
image: image-registry.openshift-image-registry.svc:5000/kafka-connect/kafkaconnect-customized:1.0
version: 3.2.0
replicas: 2
externalConfiguration:
volumes:
- name: database_credentials
secret:
items:
- key: credentials
path: credentials
optional: false
secretName: mysecrets
Примечание: сервис, маршрут и конфигурация сборки не указаны, это есть в статье выше.
Делаем выводы
Развертывание Kafka Connect с использованием оператора Strimzi может стать мощным и эффективным способом управления интеграцией данных в вашей организации. Используя гибкость и масштабируемость Kafka, а также простоту использования и автоматизацию, предоставляемые оператором Strimzi, вы можете оптимизировать свои каналы передачи данных и улучшить процесс принятия решений на основе данных.
Чтобы разобраться, как развертывать Kafka и использовать этот инструмент в работе, приглашаем вас на курс Apache Kafka для разработчиков. Обучение стартует 12 мая 2023. На нем разберем:
неправильное использование Кафка и отсутствие коммитов в ней;
ваши кейсы о проблемах при работе с Apache Kafka;
опыт создания Data Lake на ~80 ТБ с помощью Apache Kafka;
особенности эксплуатации kafka с retention в 99999999.
В этой статье мы рассмотрели ключевые шаги, связанные с развертыванием Kafka Connect с помощью оператора Strimzi, включая создание минимального пользовательского определения ресурсов (CRD), проблему базовой аутентификации REST API, аутентификацию Kafka, показатели JMX Prometheus, плагины и артефакты, а также обработку учетных данных файла. Выполнив эти действия, вы можете легко настроить развертывание Kafka Connect в соответствии с вашими конкретными потребностями.