
С задачей построения платформы для потоковой обработки данных по тем или иным причинам сталкиваются многие компании. И довольно часто подобная разработка превращается в попытку «создать ракету», не понимая сути происходящего под капотом. При этом для решения подобной задачи достаточно иметь набор инструментов и знать принципы их настройки на каждом из этапов.
Меня зовут Сергей Емельянов. Я руководитель Core-команды VK Tech. В этой статье я пошагово покажу процесс построения синтетической платформы для обработки потоковых данных на Kubernetes.
Предпосылки выбора Kubernetes для построения платформы
Решения для потоковой обработки данных могут быть развернуты на разных платформах, среди которых Apache Hadoop YARN, Bare Metal (без оркестратора контейнеров) и Kubernetes.
Вместе с тем часто для этой задачи выбирают именно Kubernetes. Причин несколько:
Полезен компаниям любого размера. Kubernetes помогает эффективно управлять инфраструктурой независимо от масштаба бизнеса. Для крупных компаний он упрощает управление тысячами микросервисов, обеспечивая отказоустойчивость и масштабирование. Маленьким командам Kubernetes облегчает DevOps-процессы благодаря операторам — специальным инструментам автоматизации развертывания и обслуживания приложений.
Демократизация. Сейчас Kubernetes доступен каждому благодаря облаку. Провайдеры предлагают управляемые сервисы Kubernetes (Managed Kubernetes), позволяющие развернуть кластеры буквально парой кликов. Также появились удобные инструменты установки и управления, что значительно снизило порог входа для новичков.
Вселенная Kubernetes. Kubernetes имеет обширное и активное комьюнити, а также целую экосистему инструментов и решений, поддерживаемых Cloud Native Computing Foundation (CNCF).
Легкость разработки. Работа с Kubernetes напоминает использование веб-фреймворка Django или конструктора лего — вы получаете готовый каркас, куда легко интегрируются дополнительные модули («библиотеки») или компоненты («детали»), создавая нужную инфраструктуру. При этом практически любые компоненты для реализации можно найти в каталоге проектов CNCF Landscape. Такой подход значительно ускоряет разработку и запуск сложных распределенных систем, снижая порог входа и сложность настройки инфраструктуры.
Вводные условия для построения платформы
В рамках синтетического примера будем рассматривать простой бизнес-процесс:
пользователи совершают покупки;
события фиксируются агрегирующей системой;
события попадают в хранилище данных;
далее события из хранилища используются для аналитики.

Технически под капотом процесс реализуется в несколько этапов:
Генератор событий на Go публикует события в Kafka Topic.
Flink обрабатывает поступившие события, агрегируя их каждые 30 секунд.
Агрегированные данные сохраняются в ClickHouse.
Полученные данные можно отслеживать и на их основе строить дашборды.

В статье я не буду описывать развертывание Kubernetes-кластера, а буду исходить из того, что он уже готов и имеет следующую топологию:
один неймспейс (Namespace) для операторов;
отдельный неймспейс для приложений общего вида (Common);
один неймспейс для стейджа (Stage);
один неймспейс для прода (Prod).
kubectl create namespace operators kubectl create namespace common kubectl create namespace stage kubectl create namespace prod
То есть у нас будет один кластер.
Также у нас будет:
одна мастер-нода, где будет крутиться весь Control Plane Kubernetes;
одна нода общего назначения (Common);
одна стейдж нода (Stage);
три ноды прода, растянутые по разным зонам доступности (gz1, me1, ms1).
kubectl get nodes -o custom-columns=NAME:.metadata.name NAME di-common-0 di-master-0 di-prod-gz1-0 di-prod-mel-0 di-prod-ms1-0 di-stage-0
Примечание: Весь упомянутый в статье код есть в публичном доступе на Gitverse — можете ознакомиться с ним и попробовать все, о чем я рассказал, самостоятельно.
Переходим к построению платформы
Этап 1. Подготовка
Поскольку изначально у нас есть только чистый Kubernetes, на первом этапе построения платформы нам нужно выполнить подготовку, то есть установить на K8S все нужные компоненты.
Multi-AZ StorageClass
Первым делом добавляем Multi-AZ StorageClass. Это нужно, поскольку в нашем облаке нет мультизональных сторадж-классов по умолчанию: так мы исключаем проблемы, которые возможны, если под и диск попадут в разные зоны доступности.
Добавить Multi-AZ StorageClass довольно просто:
apiVersion: storage.k8s.io/v1 kind: StorageClass metadata: name: csi-ceph-ssd annotations: storageclass.kubernetes.io/is-default-class: "true" parameters: type: ceph-ssd provisioner: cinder.csi.openstack.org reclaimPolicy: Delete volumeBindingMode: WaitForFirstConsumer allowVolumeExpansion: true
Операторы
Затем устанавливаем набор необходимых операторов. Среди таковых:
strimzi-kafka-operator;
alfinity-clickhouse-operator;
flink-operator-repo.
# Kafka operator helm install \ strimzi-kafka-operator \ oci://quay.io/strimzi-helm/strimzi-kafka-operator \ --version 0.47.0 \ -n operators \ -f files/strimzi-values.yaml # Clickhouse operator helm repo add \ altinity-clickhouse-operator \ https://docs.altinity.com/clickhouse-operator/ helm install \ altinity-clickhouse-operator \ altinity-clickhouse-operator/altinity-clickhouse-operator \ --version 0.25.3 \ -n operators \ -f files/altinity-values.yaml # Flink operator helm repo add \ flink-operator-repo \ https://downloads.apache.org/flink/flink-kubernetes-operator-1.12.1 helm install \ flink-kubernetes-operator \ flink-operator-repo/flink-kubernetes-operator \ --version 1.12.1 \ -n operators \ -f files/flink-values.yaml
Следом устанавливаем приложения общего вида.
Envoy Gateway — решение, предназначенное для управления API-шлюзами и сторонними шлюзами Kubernetes.
# Envoy gateway helm install \ eg \ oci://docker.io/envoyproxy/gateway-helm \ --version v1.5.0 \ -n operators
Grafana Operator — инструмент, предназначенный для автоматизации развертывания, настройки и управления компонентами мониторинга Grafana и Prometheus.
# Grafana operator helm repo add \ grafana \ https://grafana.github.io/helm-charts helm install \ grafana-operator \ grafana/grafana-operator \ --version v5.19.4 \ -n operators
VictoriaMetrics Operator — оператор Kubernetes, созданный для автоматического развертывания, конфигурации и управления системой мониторинга VictoriaMetrics.
# VictoriaMetrics operator helm install \ victoria-metrics \ oci://ghcr.io/victoriametrics/helm-charts/victoria-metrics-operator \ --version 0.53.0 \ -n operators
KEDA Operator — компонент Kubernetes, обеспечивающий установку, настройку и поддержание работоспособности механизма KEDA (Kubernetes-based Event Driven Autoscaler), предназначенного для динамического масштабирования приложений на основе обработки событий.
# Keda operator helm repo add \ kedacore \ https://kedacore.github.io/charts helm install \ keda \ kedacore/keda \ --version 2.17.2 \ -n operators
Chaos Mesh Operator — оператор, предназначенный для запуска экспериментов по методологии Chaos Engineering, позволяющих проверять устойчивость приложений и инфраструктурных компонентов путем моделирования сбоев и стрессовых ситуаций в контролируемой среде.
# Chaos mest operator helm repo add \ chaos-mesh \ https://charts.chaos-mesh.org helm install \ chaos-mesh \ chaos-mesh/chaos-mesh \ --version 2.7.2 \ -n operators \ -f files/chaos-mesh-values.yaml
Gateway
На следующем этапе настраиваем Gateway — стандарт API-интерфейсов и объект Kubernetes, определяющий способ предоставления сервиса внешним пользователям или другим сервисам вне кластера Kubernetes.
Для этого используем два простых инструмента: Helm и Kustomize.
Так, в Kustomize-файл для Gateway добавляем:
gatewayclass;
gateway;
reference-grant (ресурс управления доступом, позволяющий сервисному аккаунту получать права доступа к ресурсам другого Namespace);
config-map;
deployment-index;
service-index.
resources: - gatewayclass.yaml - gateway.yaml - reference-grant-prod.yaml - reference-grant-stage.yaml - reference-grant-operators.yaml - config-map-index.yaml - deployment-index.yaml - service-index.yaml
Сам Gateway также настраивается довольно легко: мы просто указываем ему списком набор портов, которые хотим вывести наружу. Например, web-порт, metabase-stage, metabase-prod и другие.
apiVersion: gateway.networking.k8s.io/v1 kind: Gateway metadata: name: eg namespace: common spec: gatewayClassName: eg listeners: - name: web protocol: HTTP port: 80 - name: metabase-stage protocol: HTTP port: 8000 - name: metabase-prod protocol: HTTP port: 9000 - name: chaos protocol: HTTP port: 10000 - name: grafana protocol: HTTP port: 8080 - name: victoria-metrics protocol: HTTP port: 8428 - name: victoria-metrics-agent protocol: HTTP port: 8429 - name: victoria-metrics-logs protocol: HTTP port: 9428
ReferenceGrant
Настройка ReferenceGrant подразумевает, что мы его явно устанавливаем в конкретный Namespace, где есть нужные сервисы, и разрешаем HTTPRoute из Namespace Common ходить в выбранную группу сервисов.
apiVersion: gateway.networking.k8s.io/v1beta1 kind: ReferenceGrant metadata: name: rg-common namespace: prod spec: from: - group: gateway.networking.k8s.io kind: HTTPRoute namespace: common to: - group: "" kind: Service
Routes Kustomize
При настройке роутов (routes) для начала решаем вопрос с Chaos Mesh. Так, по дефолту у него сложная авторизация с токеном, поэтому мы просто закрываем его с помощью basic-auth.
Далее добавляем роуты, то есть создаем их для каждого объекта:
http-route-metabase-stage;
http-route-metabase-prod;
http-route-grafana;
http-route-victoria-metrics;
http-route-victoria-metrics-agent;
http-route-victoria-metrics-logs;
http-route-index.
namespace: common secretGenerator: - name: basic-auth options: disableNameSuffixHash: true files: - .htpasswd resources: - http-route-metabase-stage.yaml - http-route-metabase-prod.yaml - http-route-grafana.yaml - http-route-victoria-metrics.yaml - http-route-victoria-metrics-agent.yaml - http-route-victoria-metrics-logs.yaml - http-route-chaos.yaml - http-route-index.yaml - security-policy-chaos.yaml
Для Chaos мы отдельно делаем security-policy-chaos и http-route-chaos. Это удобно, поскольку позволяет исключить влияние на http-route.
HTTP Route
Сам HTTP Route имеет типовую конфигурацию. Здесь мы указываем:
Gateway;
секцию (sectionName);
backendRefs.
apiVersion: gateway.networking.k8s.io/v1 kind: HTTPRoute metadata: name: chaos-dashboard spec: parentRefs: - name: eg sectionName: chaos rules: - backendRefs: - group: "" kind: Service name: chaos-dashboard port: 2333 weight: 1 namespace: operators matches: - path: type: PathPrefix value: /
Security Policy
Настройки Security Policy также типовые. Здесь мы указываем:
какой закрываем Route (targetRefs);
имя секрета (basicAuth).
apiVersion: gateway.envoyproxy.io/v1alpha1 kind: SecurityPolicy metadata: name: chaos-basic-auth spec: targetRefs: - group: gateway.networking.k8s.io kind: HTTPRoute name: chaos-dashboard basicAuth: users: name: basic-auth
VictoriaStack
VictoriaStack нужен, чтобы собирать метрики, логи и события из контейнеризированных приложений, визуализируя их в удобной форме. VictoriaStack упрощает мониторинг состояния системы, диагностику проблем и оптимизацию производительности.
Ставим сразу VictoriaMetrics и VictoriaLogs:
# Victoria Metrics helm install \ victoria-metrics \ oci://ghcr.io/victoriametrics/helm-charts/victoria-metrics-k8s-stack \ --version 0.60.1 \ -n common \ -f files/victoria-metrics-values.yaml # Victoria Logs helm install \ victoria-logs \ oci://ghcr.io/victoriametrics/helm-charts/victoria-logs-single \ --version 0.11.8 \ -n common \ -f files/victoria-logs-values.yaml kubectl wait deployments --all --for=condition=Available --timeout=5m -n common helm list -n common
В настройках VictoriaMetrics:
отключаем victoria-metrics-operator — это нужно, поскольку мы его подключили заранее;
добавляем объекты дашборда Grafana, указываем, какая именно должна быть Grafana:
victoria-metrics-operator: enabled: false fullnameOverride: example defaultDashboards: enabled: true grafanaOperator: enabled: true spec: instanceSelector: matchLabels: dashboards: example allowCrossNamespaceImport: true
настраиваем defaultDatasources:
defaultDatasources: enabled: true grafanaOperator: enabled: true spec: instanceSelector: matchLabels: dashboards: example allowCrossNamespaceImport: true
задаем параметры хранения:
vmsingle: spec: retentionPeriod: "24h" storage: accessModes: - ReadWriteOnce storageClassName: "csi-ceph-ssd" resources: requests: storage: 10Gi
отключаем создание собственной Grafana:
grafana: enabled: false forceDeployDatasource: true
Настройки VictoriaLogs во многом схожи: задаем Dashboards, retentionPeriod, Vector (универсальный агент для сбора, трансформации и доставки логов и метрик в Kubernetes).
dashboards: enabled: true grafanaOperator: enabled: true spec: allowCrossNamespaceImport: true instanceSelector: matchLabels: dashboards: example server: retentionPeriod: "24h" persistentVolume: enable: true accessModes: - ReadWriteOnce storageClassName: "csi-ceph-ssd" size: 20Gi vmServiceScrape: enabled: true vector: enabled: true tolerations: - effect: NoSchedule key: env operator: Exists
Scrapes Kustomize
После подготовки Victoria Stack нам нужно добавить Scrapes — процесс периодического сбора метрик с целевых сервисов и узлов инфраструктуры Kubernetes для последующего хранения и анализа в системах мониторинга.
Здесь отдельно добавляем:
service-scrape-clickhouse;
service-scrape-keda;
pod-scrape-flink;
pod-scrape-kafka.
namespace: common resources: - service-scrape-clickhouse.yaml - service-scrape-keda.yaml - pod-scrape-flink.yaml - pod-scrape-kafka.yaml
Pod-scrape для подов выглядят довольно просто. Мы указываем:
под для сбора метрик;
порт;
селектор.
apiVersion: operator.victoriametrics.com/v1beta1 kind: VMPodScrape metadata: name: kafka-brokers spec: namespaceSelector: any: true podMetricsEndpoints: - port: tcp-prometheus scheme: http selector: matchLabels: strimzi.io/kind: Kafka
Аналогично для сервисов:
apiVersion: operator.victoriametrics.com/v1beta1 kind: VMServiceScrape metadata: name: keda spec: namespaceSelector: any: true endpoints: - port: metrics selector: matchLabels: app.kubernetes.io/name: keda-operator-metrics-apiserver
Grafana Kustomize
Для Grafana задаем configMap, где лежат нужные нам дополнительные дашборды. Помимо этого, добавляем необходимые ресурсы:
grafana;
datasource-chaos;
datasource-victoria-logs;
dashboard-clickhouse;
dashboard-flink;
dashboard-kafka-exporter;
dashboard-keda;
dashboard-logs;
dashboard-show.
namespace: common configMapGenerator: - name: grafana-dashboards files: - dashboard-logs.json - dashboard-show.json options: disableNameSuffixHash: true resources: - grafana.yaml - datasource-chaos.yaml - datasource-victoria-logs.yaml - dashboard-clickhouse.yaml - dashboard-flink.yaml - dashboard-kafka-exporter.yaml - dashboard-keda.yaml - dashboard-logs.yaml - dashboard-show.yaml
Grafana
Настройки Grafana у нас также типовые. Здесь указываем Labels, который говорит, какие дашборды и datasource забирать, а также задаем параметры хранения (в рамках примера будем использовать SQLite).
apiVersion: grafana.integreatly.org/v1beta1 kind: Grafana metadata: name: example labels: dashboards: example spec: persistentVolumeClaim: spec: accessModes: - ReadWriteOnce resources: requests: storage: 10Gi storageClassName: csi-ceph-ssd config: security: admin_user: admin admin_password: admin service: spec: ports: - protocol: TCP port: 3000 targetPort: 3000 deployment: spec: template: spec: securityContext: fsGroup: 472 volumes: - name: grafana-data persistentVolumeClaim: claimName: example-pvc
Flink Dashboard
На последнем этапе подготовки настраиваем дашборды Flink. Для этого задаем:
instanceSelector;
allowCrossNamespaceImport;
указатель на Grafana (в нашем случае — id, но могут применяться и другие указатели, например ссылка).
apiVersion: grafana.integreatly.org/v1beta1 kind: GrafanaDashboard metadata: name: flink-dashboard spec: instanceSelector: matchLabels: dashboards: example allowCrossNamespaceImport: true grafanaCom: id: 14911
Результаты первого этапа
Пройдя все упомянутые шаги, мы фактически выполнили полную подготовку. А именно:
добавили StorageClass для мультизонной установки;
установили операторы, которые будут выполнять всю работу;
добавили Gateway, обеспечили сетевой доступ;
установили Grafana для визуализации;
добавили VictoriaMetrics и VictoriaLogs, чтобы больше знать о нашей системе.
Чтобы убедиться в корректности установки и настройки, можем перейти в Getaway. Например, можно посмотреть Data Sources — здесь уже есть:
Alertmanager;
chaosmesh;
VictoriaLogs;
VictoriaMetrics.

Также можем посмотреть доступные для Namespace дашборды.

В нашем случае здесь будет доступен целый набор дашбордов.

Например, при открытии дашборда VictoriaLogs можем увидеть, что за время настройки у нас уже успело заехать 34 тысячи логов.

Этап 2. Stage
Завершив подготовку, можем переходить к созданию тестовой среды, то есть Stage. Для этого также последовательно добавляем несколько компонентов и задаем их параметры.
Kafka Kustomize
Первым делом добавляем Kafka. Она состоит из трех ресурсов:
cluster;
node-pool;
topic.
resources: - cluster.yaml - node-pool.yaml - topic.yaml
Kafka Cluster
Далее настраиваем Kafka Cluster. Здесь все просто:
задаем имя кластеру;
подключаем использование node-pools и kraft:
apiVersion: kafka.strimzi.io/v1beta2 kind: Kafka metadata: name: example annotations: strimzi.io/node-pools: enabled strimzi.io/kraft: enabled spec: kafka: version: 4.0.0 metadataVersion: 4.0-IV3
указываем версию Kafka:
настраиваем Listeners:
задаем конфиг самой Kafka:
config: offsets.topic.replication.factor: 1 transaction.state.log.replication.factor: 1 transaction.state.log.min.isr: 1 default.replication.factor: 1 min.insync.replicas: 1
указываем entityOperator:
entityOperator: topicOperator: {} userOperator: {}
говорим, чтобы у нас здесь были метрики.
kafkaExporter: topicRegex: ".*" groupRegex: ".*"
Kafka Node Pool
Kafka Node Pool отвечает за то, какие ресурсы нам будут доступны в кластере. Здесь через селектор указываем:
название кластера;
количество реплик;
параметры хранилища;
перечень и объем доступных вычислительных ресурсов.
apiVersion: kafka.strimzi.io/v1beta2 kind: KafkaNodePool metadata: name: dual-role labels: strimzi.io/cluster: example spec: template: pod: {} replicas: 1 roles: - controller - broker storage: type: jbod volumes: - id: 0 type: persistent-claim size: 10Gi deleteClaim: true kraftMetadata: shared class: csi-ceph-ssd resources: limits: cpu: "2" memory: "2Gi" requests: memory: "512Mi" cpu: "250m"
Kafka Topic
Для Kafka Topic задаем:
имя;
селектор;
конфигурации партиций, количество реплик, Retention Period.
apiVersion: kafka.strimzi.io/v1beta2 kind: KafkaTopic metadata: name: events labels: strimzi.io/cluster: example spec: partitions: 5 replicas: 1 config: retention.ms: 86400000
ClickHouse Kustomize
Далее переходим к настройке ClickHouse.
ClickHouse в нашем случае состоит из двух компонентов:
cluster;
keeper (альтернатива ZooKeeper для ClickHouse).
Также в конфиге сразу прописываем migrate-configmap и migrate-job.
resources: - cluster.yaml - keeper.yaml - migrate-configmap.yaml - migrate-job.yaml
ClickHouse
Настройка самого ClickHouse сводится к указанию базовых параметров:
берем с сайта ClickHouse конфигурацию для инсталляции меньше 16 Гб;
разрешаем дефолтному пользователю доступ по IP;
указываем в качестве сервиса Keeper;
задаем количество реплик;
прописываем параметры хранилища;
указываем необходимое количество ресурсов.
apiVersion: clickhouse.altinity.com/v1 kind: ClickHouseInstallation metadata: name: example spec: defaults: templates: volumeClaimTemplate: default podTemplate: default configuration: settings: mark_cache_size: 524288000 max_server_memory_usage: 1500000000 profiles: default: max_threads: 1 max_block_size: 8192 max_download_threads: 1 input_format_parallel_parsing: 0 output_format_parallel_formatting: 0 users: default/networks/ip: - 0.0.0.0/0 zookeeper: nodes: - host: keeper-example port: 2181 clusters: - name: replicated layout: replicasCount: 1 templates: volumeClaimTemplates: - name: default spec: storageClassName: csi-ceph-ssd accessModes: - ReadWriteOnce resources: requests: storage: 10Gi podTemplates: - name: default spec: containers: - name: clickhouse-pod image: clickhouse/clickhouse-server:24.8 resources: limits: memory: "2048Mi" cpu: "2" requests: memory: "512Mi" cpu: "250m"
ClickHouse Keeper
ClickHouse Keeper устанавливается и настраивается практически идентично.
apiVersion: clickhouse-keeper.altinity.com/v1 kind: ClickHouseKeeperInstallation metadata: name: example spec: defaults: templates: volumeClaimTemplate: default podTemplate: default configuration: clusters: - name: replicated layout: replicasCount: 1 templates: volumeClaimTemplates: - name: default spec: storageClassName: csi-ceph-ssd accessModes: - ReadWriteOnce resources: requests: storage: 500Mi podTemplates: - name: default spec: containers: - name: clickhouse-pod image: clickhouse/clickhouse-server:24.8 resources: limits: memory: "2048Mi" cpu: "2" requests: memory: "512Mi" cpu: "250m"
Migrate Configmap
Для настройки Migrate Configmap полностью прописываем SQL-файл.
apiVersion: v1 kind: ConfigMap metadata: name: clickhouse-migration data: events_table.sql: | -- Создание реплицированной таблицы для агрегированных данных пользователей CREATE TABLE IF NOT EXISTS user_aggregations ON CLUSTER '{cluster}' ( user_id String, total_amount Float64, purchase_count UInt64, window_start Int64, window_end Int64, last_update_time Int64, created_at DateTime DEFAULT now() ) ENGINE = ReplicatedMergeTree('/clickhouse/tables/{shard}/user_aggregations', '{replica}') ORDER BY (user_id, window_start) PARTITION BY toYYYYMM(toDateTime(window_start / 1000)); -- Создание распределенной таблицы для запросов CREATE TABLE IF NOT EXISTS user_aggregations_distributed ON CLUSTER '{cluster}' AS user_aggregations ENGINE = Distributed('{cluster}', default, user_aggregations, rand());
Следом запускаем Migrate job. Здесь подключаем initContainer — специальный контейнер K8s, который запускается перед основными контейнерами пода и служит для подготовки среды исполнения основного приложения. Его цель заключается в выполнении предварительных операций, необходимых для нормального функционирования основного приложения.
apiVersion: batch/v1 kind: Job metadata: name: clickhouse-migrate spec: template: spec: initContainers: - name: wait-clickhouse image: jwilder/dockerize:v0.9.5 command: ["dockerize", "-wait", "http://clickhouse-example:8123", "--wait", "tcp://keeper-example:2181", "-timeout", "300s"] containers: - name: migrate image: clickhouse/clickhouse-server:24.8 command: ["clickhouse-client", "--host", "clickhouse-example", "--queries-file", "/events_table.sql"] volumeMounts: - mountPath: /events_table.sql subPath: events_table.sql name: create-table volumes: - name: create-table configMap: name: clickhouse-migration restartPolicy: Never backoffLimit: 4
Одновременно настраиваем монтирование добавленного SQL через ConfigMap.
Metabase Kustomize
Также для Stage подготавливаем UI для Metabase. Для этого задаем требуемые ресурсы:
deployment;
service;
pvc;
setup-job.
resources: - deployment.yaml - pvc.yaml - service.yaml - setup-job.yaml
Metabase Deployment
В Metabase Deployment настраиваем MB_SETUP_TOKEN и монтируем PVC для базы.
apiVersion: apps/v1 kind: Deployment metadata: name: metabase labels: app: metabase spec: replicas: 1 selector: matchLabels: app: metabase template: metadata: labels: app: metabase spec: containers: - name: metabase image: metabase/metabase:latest ports: - containerPort: 3000 name: http env: - name: MB_DB_TYPE value: "h2" - name: MB_DB_FILE value: "/metabase-data/metabase.db" - name: JAVA_TIMEZONE value: "UTC" - name: MB_ENCRYPTION_SECRET_KEY value: "your-secret-key-here-change-in-production" - name: MB_SETUP_TOKEN value: "0198bcd4-1492-78dd-98f0-261832a0378f" resources: requests: memory: "512Mi" cpu: "250m" limits: memory: "2Gi" cpu: "2" volumeMounts: - name: metabase-data mountPath: /metabase-data livenessProbe: httpGet: path: /api/health port: 3000 initialDelaySeconds: 60 periodSeconds: 30 readinessProbe: httpGet: path: /api/health port: 3000 initialDelaySeconds: 30 periodSeconds: 10 volumes: - name: metabase-data persistentVolumeClaim: claimName: metabase-pvc
Metabase setup job
Настройки Metabase setup job во многом схожи с параметрами для предыдущих компонентов:
настраиваем ожидание установки Metabase;
вкатываем с помощью Curl скрипт.
apiVersion: batch/v1 kind: Job metadata: name: metabase-setup spec: template: spec: initContainers: - name: metabase-init-clickhouse image: jwilder/dockerize:v0.9.5 command: [ "dockerize", "-wait", "http://metabase-service:3000", "-timeout", "300s" ] containers: - name: metabase-setup image: curlimages/curl:latest command: - /bin/sh - -c - | echo "Setting up Metabase admin user..." curl -X POST http://metabase-service:3000/api/setup \ -H "Content-Type: application/json" \ -d '{ "prefs": { "site_name": "Data internals 2025", "site_locale": "ru" }, "token": "0198bcd4-1492-78dd-98f0-261832a0378f", "user": { "first_name": "Admin", "last_name": "Admin", "email": "admin@example.com", "password": "admin123z!" } }' echo "Setup completed!" restartPolicy: OnFailure
Generator
Переходим к настройке генератора. Его конфиг стандартный:
прописываем ожидание Kafka, чтобы в нее писать;
указываем генерацию бачами по одному;
задаем частоту генерации (Generator Rate).
apiVersion: apps/v1 kind: Deployment metadata: name: generator labels: app: generator spec: replicas: 1 selector: matchLabels: app: generator template: metadata: labels: app: generator spec: initContainers: - name: wait-kafka image: jwilder/dockerize:v0.9.5 command: ["dockerize", "-wait", "tcp://example-kafka-bootstrap:9092", "-timeout", "300s"] containers: - name: generator image: totaki/generator:0.0.2 env: - name: KAFKA_BROKERS value: "example-kafka-bootstrap:9092" - name: GENERATOR_RATE value: "1" - name: BATCH_SIZE value: "1" - name: BURST_EVERY_MS value: ""
Flink Kustomize
Для Flink настраиваем основные ресурсы:
unt.deployment;
role-binding (чтобы Flink мог сам создавать поды и управлять ими);
service-account.
Также через .env добавляем в папку с секретами новый секрет. Это нужно, поскольку Flink умеет записывать на S3 чек-пойнты, чтобы знать, откуда стартовать, если произойдет сбой.
resources: - deployment.yaml - role-binding.yaml - service-account.yaml secretGenerator: - name: aws-params envs: - .env options: disableNameSuffixHash: true
Flink
В настройках самого Flink также все стандартно:
указываем метрики, которые отдаем (metrics.reporter);
задаем параметры высокой доступности;
назначаем стратегию перезапуска для обработки сбоев;
настраиваем джобу, в которой определяем необходимость ожидания доступности Kafka и ClickHouse, а также включаем встроенные плагины и Prometheus.
apiVersion: flink.apache.org/v1beta1 kind: FlinkDeployment metadata: name: basic-example spec: image: flink:1.20 flinkVersion: v1_20 flinkConfiguration: taskmanager.numberOfTaskSlots: "2" # Метрики metrics.reporter.prom.factory.class: org.apache.flink.metrics.prometheus.PrometheusReporterFactory metrics.reporter.prom.port: 9249-9250 # Параметры высокой доступности execution.checkpointing.interval: 10s execution.checkpointing.mode: AT_LEAST_ONCE execution.checkpointing.externalized-checkpoint-retention: RETAIN_ON_CANCELLATION high-availability.type: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory s3.endpoint: https://hb.ru-msk.vkcloud-storage.ru s3.path.style.access: "true" s3.endpoint.region: ru-msk # Стратегия перезапуска для обработки сбоев restart-strategy: failure-rate restart-strategy.failure-rate.max-failures-per-interval: "5" restart-strategy.failure-rate.failure-rate-interval: 5min restart-strategy.failure-rate.delay: 10s serviceAccount: flink jobManager: resource: memory: "2048m" cpu: 1 taskManager: replicas: 1 resource: memory: "2048m" cpu: 1 job: jarURI: https://data-internals-2025.hb.ru-msk.vkcloud-storage.ru/job/flink-user-aggregation-1.0.5.jar parallelism: 2 upgradeMode: stateless podTemplate: spec: initContainers: - name: wait-kafka-clickhouse image: jwilder/dockerize:v0.9.5 command: ["dockerize", "-wait", "tcp://example-kafka-bootstrap:9092", "-wait", "http://clickhouse-example:8123", "-timeout", "300s"] containers: - name: flink-main-container envFrom: - secretRef: name: aws-params env: - name: "KAFKA_BROKERS" value: "example-kafka-bootstrap:9092" - name: "CLICKHOUSE_URL" value: "jdbc:clickhouse://clickhouse-example:8123/default" - name: ENABLE_BUILT_IN_PLUGINS value: flink-s3-fs-presto-1.20.2.jar - name: "PROCESSING_DELAY_MS" value: "0" - name: "CLICKHOUSE_BATCH_SIZE" value: "100" - name: "CLICKHOUSE_BATCH_INTERVAL_MS" value: "50" ports: - name: prometheus containerPort: 9249 protocol: TCP
Результаты второго этапа
Таким образом, на этапе подготовки Stage мы:
установили Kafka и добавили топик;
установили ClickHouse;
накатили миграции ClickHouse;
установили Metabase;
инициализировали Metabase;
запустили Flink job;
запустили генератор тестовых данных.
Теперь, если мы перейдем на Gateway, сможем попасть на наш Stage.

Здесь можем добавить новую базу ClickHouse.

В рамках примера все параметры можем оставить по умолчанию.

После этого можно перейти в User Aggregation и посмотреть инсерты вновь созданной базы ClickHouse.

К этому моменту данные уже начинают агрегироваться.

Поэтому при переходе через Metabase в только что созданную базу ClickHouse можно будет увидеть первые графики с детализацией инсертов.

Этап 3. Production
На следующем этапе переходим к созданию высокодоступного прода на несколько реплик. Для этого обновляем и настраиваем несколько компонентов.
Kafka
Для Kafka добавляем автобалансировку (autoRebalance) при добавлении и удалении брокеров (add-brokers и remove-brokers соответственно).
apiVersion: kafka.strimzi.io/v1beta2 kind: Kafka metadata: name: example spec: cruiseControl: autoRebalance: - mode: add-brokers template: name: example-add-brokers-rebalancing-template - mode: remove-brokers template: name: example-remove-brokers-rebalancing-template
Kafka Node Pool
Для Kafka Node Pool указываем, что:
у нас две реплики (replicas:2);
можем ехать на ноды прода (tolerations - value “prod”);
должны растянуться по нескольким зонам доступности (задаем через matchExpressions);
поды должны ехать на ноды прода.
apiVersion: kafka.strimzi.io/v1beta2 kind: KafkaNodePool metadata: name: dual-role spec: replicas: 2 template: pod: tolerations: - key: "env" operator: "Equal" value: "prod" effect: "NoSchedule" affinity: podAntiAffinity: requiredDuringSchedulingIgnoredDuringExecution: - labelSelector: matchExpressions: - key: strimzi.io/name operator: In values: - example-kafka topologyKey: topology.kubernetes.io/zone nodeAffinity: requiredDuringSchedulingIgnoredDuringExecution: nodeSelectorTerms: - matchExpressions: - key: env operator: In values: - prod
Kafka Topic
В Kafka Topic увеличиваем количество партиций и реплик — остальные настройки останутся без изменений.
apiVersion: kafka.strimzi.io/v1beta2 kind: KafkaTopic metadata: name: events spec: partitions: 10 replicas: 3
ClickHouse
Для ClickHouse делаем то же самое:
меняем количество реплик;
позволяем работать с продом;
указываем, что нужно растянуться по нескольким зонам доступности и что поды должны ехать на ноды прода.
- op: replace path: /spec/configuration/clusters/0/layout/replicasCount value: 2 - op: replace path: /spec/templates/podTemplates/0/spec/tolerations value: - key: "env" operator: "Equal" value: "prod" effect: "NoSchedule" - op: replace path: /spec/templates/podTemplates/0/spec/affinity value: podAntiAffinity: requiredDuringSchedulingIgnoredDuringExecution: - labelSelector: matchExpressions: - key: clickhouse.altinity.com/chi operator: In values: - example topologyKey: topology.kubernetes.io/zone nodeAffinity: requiredDuringSchedulingIgnoredDuringExecution: nodeSelectorTerms: - matchExpressions: - key: env operator: In values: - prod
ClickHouse Keeper
Для ClickHouse Keeper все аналогично.
- op: replace path: /spec/configuration/clusters/0/layout/replicasCount value: 2 - op: replace path: /spec/templates/podTemplates/0/spec/tolerations value: - key: "env" operator: "Equal" value: "prod" effect: "NoSchedule" - op: replace path: /spec/templates/podTemplates/0/spec/affinity value: podAntiAffinity: requiredDuringSchedulingIgnoredDuringExecution: - labelSelector: matchExpressions: - key: clickhouse.altinity.com/chk operator: In values: - example topologyKey: topology.kubernetes.io/zone nodeAffinity: requiredDuringSchedulingIgnoredDuringExecution: nodeSelectorTerms: - matchExpressions: - key: env operator: In values: - prod
Metabase
Для Metabase также настраиваем работу только на нодах прода.
apiVersion: apps/v1 kind: Deployment metadata: name: metabase spec: template: spec: tolerations: - key: "env" operator: "Equal" value: "prod" effect: "NoSchedule" affinity: nodeAffinity: requiredDuringSchedulingIgnoredDuringExecution: nodeSelectorTerms: - matchExpressions: - key: env operator: In values: - prod
Flink
Для Flink мы используем S3-бакет для хранения чек-пойнтов. Поэтому здесь также изменяем директорию. Все остальные параметры меняем так же, как и для предыдущих ресурсов.
apiVersion: flink.apache.org/v1beta1 kind: FlinkDeployment metadata: name: basic-example spec: flinkConfiguration: execution.checkpointing.savepoint-dir: s3://data-internals-2025/prod/savepoints state.checkpoints.dir: s3://data-internals-2025/prod/checkpoints high-availability.storageDir: s3://data-internals-2025/prod/ha podTemplate: spec: tolerations: - key: "env" operator: "Equal" value: "prod" effect: "NoSchedule" affinity: nodeAffinity: requiredDuringSchedulingIgnoredDuringExecution: nodeSelectorTerms: - matchExpressions: - key: env operator: In values: - prod
Role Binding
Для Role Binding нам надо, чтобы сервисный аккаунт работал с продом. Для этого определяем prod в качестве основного Namespace.
- op: replace path: /subjects/0/namespace value: prod
Результаты третьего этапа
Подготовка прода на основе уже имеющихся конфигов — довольно простая задача. Так, изменив всего несколько параметров, мы:
добавили реплики для Kafka и ClickHouse;
добавили автобалансировку для Kafka;
растянули компоненты на несколько зон доступности.
Стоит отметить, что минимальный патчинг ресурсов не просто упрощает подготовку прода. Помимо этого он также позволяет гарантировать, что различия между Stage- и Prod-окружениями будут минимальны.
Доступные ресурсы в проде можем проверить через вывод.

Здесь, например, видим:
две реплики ClickHouse;
две реплики Kafka;
entity-operator;
kafka-exporter и другие ресурсы.
Можно также посмотреть на графики.
Так, у нас уже появилась Kafka для прода.

Flink также работает — получает и обрабатывает сообщения.
Также можем посмотреть на ClickHouse — он тоже заработал.

Этап 4. Масштабирование
После подготовки Stage- и Prod-окружения нужно позаботиться о масштабировании, чтобы адаптировать платформу под возможный рост нагрузки.
Возможны два варианта:
ручное масштабирование;
автоматическое масштабирование.
Ручное масштабирование
В случае ручного масштабирования для Kafka и ClickHouse все сводится к замене количества реплик — то есть мы просто указываем для ClickHouse, ClickHouse Keeper и Kafka Node Pool, что вместо двух реплик нужно три.
namespace: prod resources: - ../base patches: - target: kind: ClickHouseInstallation name: example patch: |- - op: replace path: /spec/configuration/clusters/0/layout/replicasCount value: 3 - target: kind: ClickHouseKeeperInstallation name: example patch: |- - op: replace path: /spec/configuration/clusters/0/layout/replicasCount value: 3 - target: kind: KafkaNodePool name: dual-role patch: |- - op: replace path: /spec/replicas value: 3
В данном случае ручная настройка оптимальна потому, что мы работаем со Stateful-приложениями, которые хранят данные, и их не всегда возможно корректно масштабировать автоматически.
При этом масштабирование выполняется быстро. Если посмотреть на поды, то можно увидеть, что новые реплики начинают создаваться сразу.

Автоматическое масштабирование
В Kubernetes есть HorizontalPodAutoscaler (HPA) — встроенный механизм, предназначенный для автоматического масштабирования количества реплик приложения в зависимости от нагрузки.
HPA отслеживает метрики ресурсов (например, CPU Usage, Memory Consumption) или кастомные метрики (например, количество запросов HTTP, очереди сообщений). Если нагрузка превышает заданные пороговые значения, HPA автоматически увеличивает число подов, обеспечивая лучшую производительность и устойчивость системы. Когда нагрузка снижается, HPA уменьшает количество подов, оптимизируя использование ресурсов кластера.
Для определения необходимого количества реплик используется следующая формула:
y = x * (zc/zt)
x — текущее количество реплик;
zc — текущее значение метрики;
zt — пороговое значение метрики.
Рассмотрим, как автомасштабирование настраивается для отдельных компонентов.
KEDA Generator
Для KEDA Generator определяем:
скейлинг деплоймента;
ollingInterval — интервал времени, через который HPA проверяет состояние и собирает новые метрики производительности (такие как использование CPU, памяти или любые другие настроенные метрики);
cooldownPeriod — временной промежуток, в течение которого HPA воздерживается от дальнейшего увеличения или уменьшения количества реплик после последнего масштабирования;
минимальное и максимальное количество реплик (minReplicaCount, maxReplicaCount);
scaleUp и scaleDown для самого horizontalPodAutoscaler;
сбор метрик с Prometheus.
apiVersion: keda.sh/v1alpha1 kind: ScaledObject metadata: name: generator-scaler spec: scaleTargetRef: name: generator kind: Deployment pollingInterval: 10 cooldownPeriod: 120 minReplicaCount: 1 maxReplicaCount: 3 advanced: horizontalPodAutoscalerConfig: behavior: scaleUp: stabilizationWindowSeconds: 120 scaleDown: stabilizationWindowSeconds: 120 triggers: - type: prometheus metricType: Value metadata: serverAddress: http://vmsingle-example.common.svc.cluster.local:8428/prometheus query: 1 + max(0, (1 - (count(kube_pod_info{namespace="prod", pod=~"generator.*"}) - count(kube_pod_info{namespace="prod", created_by_name="basic-example"}))) * 0.2) threshold: '1'
Примечательно, что формулу для автомасштабирования мы составляем таким образом, чтобы у генератора всегда было на одну реплику больше, чем у Flink.
KEDA Flink
Для Flink делаем то же самое, но дополнительно настраиваем:
scaleTargetRef — ссылку на объект, который подлежит масштабированию в Kubernetes (в нашем случае это FlinkDeployment);
формулу расчета, согласно которой Flink должен будет масштабироваться вслед за генератором.
apiVersion: keda.sh/v1alpha1 kind: ScaledObject metadata: name: flink-kafka-scaler spec: scaleTargetRef: name: basic-example kind: FlinkDeployment apiVersion: flink.apache.org/v1beta1 pollingInterval: 10 cooldownPeriod: 120 minReplicaCount: 1 maxReplicaCount: 3 advanced: horizontalPodAutoscalerConfig: behavior: scaleDown: stabilizationWindowSeconds: 120 triggers: - type: prometheus metricType: Value metadata: serverAddress: http://vmsingle-example.common.svc.cluster.local:8428/prometheus query: max(1, 1.2 - max(0, (1 - (count(kube_pod_info{namespace="prod", pod=~"generator.*"}) - count(kube_pod_info{namespace="prod", created_by_name="basic-example"}))) * 0.2)) threshold: '1'
Если все настроено верно, то подобная «гонка» автомасштабирования между Flink и генератором будет иметь следующий вид:

Здесь видно, что генератор скалируется первым, а Flink пытается его догнать. При этом можно сразу отслеживать, какое количество реплик ожидается.
KEDA Stop Flink и Generator
Также можно настроить остановку автоматического масштабирования Flink. Для этого добавляем аннотацию paused-replicas: “1” — то есть просим остановить масштабирование и оставить одну реплику.
- op: add path: /metadata/annotations value: autoscaling.keda.sh/paused-replicas: "1"
Для генератора схожие настройки.
- op: replace path: /spec/triggers/0/metadata/query value: 'max(0, 1 - (count(kube_pod_info{namespace="prod", pod=~"generator.*"}) - count(kube_pod_info{namespace="prod", created_by_name="basic-example"})) * 0.2)'
Результаты четвертого этапа
По итогам четвертого этапа мы настроили масштабирование Kafka и ClickHouse, а также протестировали автомасштабирование.
При этом стоит отметить, что автоматическое масштабирование не всегда удобно и оправдано:
во-первых, масштабирование хоть и настраивается, но не всегда можно контролировать в реальном времени;
во-вторых, надо четко понимать, на какие метрики ориентироваться для определения необходимости масштабирования.
Этап 5. Хаос
Последний этап создания платформы для потоковой обработки данных на K8s — генерация хаоса, то есть моделирование отказов для проверки отказоустойчивости приложений и инфраструктуры, а также выявления слабых мест для последующего повышения надежности системы.
В рамках примера будем использовать Chaos Mesh — Open-Source-платформу для внедрения Chaos Engineering в Kubernetes.
Реализуется это просто:
запускаем эксперимент;
указываем, что нам надо убить 100% подов;
определяем продолжительность отказа на уровне 120 секунд;
задаем зону доступности, в которой надо отключить поды.
apiVersion: chaos-mesh.org/v1alpha1 kind: PodChaos metadata: name: pod-failure-example spec: action: pod-failure mode: fixed-percent value: "100" duration: '120s' selector: nodeSelectors: mcs.mail.ru/mcs-nodepool: prod-ms1
После запуска эксперимента можем сразу отслеживать его ход через Chaos Mesh. Например, можем сразу увидеть, что именно вышло из строя.

Также можем посмотреть состояние компонентов на уровне подов.

Аналогично ход эксперимента и изменение метрик можно отслеживать и на графиках.

Таким образом, с помощью Chaos Mesh мы можем провести эксперимент с отказом одной зоны доступности и протестировать отказоустойчивость построенной платформы для потоковой обработки данных.
Вместо выводов
Представленный мной алгоритм создания платформы для потоковой обработки данных не универсальная инструкция, а лишь демонстрация того, что построить подобное решение на Kubernetes можно, причем весь процесс во многом напоминает сборку конструктора.
При этом совершенно не обязательно ограничиваться работой с Kubernetes в облаках:
вместо облаков можно попробовать дистрибутивы (например, k3s, k0s);
если нужно больше автоматизации, подойдет GitOps (например, ArgoCD, Flux);
если не хочется работать с kubectl, можно попробовать другие инструменты (например, tanka, terraform, pullumi).
А как вы строите платформы по обработке потоковых данных и сталкивались ли с решениями на Kubernetes? Делитесь опытом и впечатлениями.