Привет, Хабр! Меня зовут Александр Цай, я ведущий аналитик в МТС Web Services, но на деле занимаюсь всеми вопросами, касающимися DA/DE/BI: выявлением потребностей и сбором требований, проектированием дашбордов и витрин для них, построением и развитием внутреннего хранилища, поиском источников данных, созданием сложных ETL-пайплайнов по их доставке, DQ, проведением аналитики и много чем еще.
В этом материале я расскажу про разворачивание пайплайна по стримингу данных из MongoDB в PostgreSQL с помощью Apache Flink (стримить из Kafka банально, а так заодно пощупаем документоориентированную БД). Делать это мы будем в minikube (kubernetes), а языком программирования для заданий выступит Python. Все описанное в посте выполняется на MacBook с процессором i7.
В интернете, тем более русскоязычном, нет информации о стриминге из MongoDB в Postgres с помощью Flink. Почти все материалы по Flink, которые мне попадались, сводятся к пережевыванию примера WordCount из flink-kubernetes-operator, где на запущенном поде из папки с примерами читается файл и в консоль выводится количество слов в нем. Если спускаться до использования PyFlink, то мы натыкаемся на кастомные образы с Harness SDK и Apache Beam и другие страшные слова. Знакомо?
Так вот, это не наш путь! Данное руководство будет полезно тем, кто такой же извращенец хочет пощупать Flink на родном Python и кто не планирует брать примеры, оторванные от реальности. Все материалы можно найти у меня в репозитории GitHub.

Дисклеймер
Я не претендую на истину в последней инстанции и с удовольствием почитаю ваши комментарии, что можно было бы улучшить и как.
Также уточню, что все секреты захардкоржены прямо в манифестах, так как это учебный проект. Я не стал заморачиваться, но в перспективе, конечно же, стоит.
Содержание:
Подготовка
Что мы будем использовать:
любое IDE по вашему выбору — у меня VsCode;
Minikube (kubernetes) как среду для развертывания;
Docker для работы minikube;
Helm в качестве менеджера пакетов для установки;
Flink-kubernetes-operator для одного из способов развертывания;
Minio как внешнее хранилище для checkpoints/savepoints Flink (об этом будет подробнее в соответствующем разделе);
mongodb как источник;
postgresql в качестве таргетной базы.
Останавливаться на установке docker, minikube и helm я не буду: у них отличная документация, и ее изучение не займет у вас много времени. Перейдем сразу к развертыванию компонентов стриминга.
Итак, вы склонировали себе репозиторий, и теперь после установки вышеперечисленного нам нужно запустить minikube. Для этого в терминале пишем:
minikube start --cpus=5 --memory=8096;
Ресурсы выделяйте по своему усмотрению. Когда мини-куб успешно запустится, мы получим такой вывод:

Теперь запускаем в отдельной вкладке терминала Web UI minikube dashboard:
minikube dashboard;
Получаем:

Видим, что в браузере открылся WebUI. Отлично. Minikube запущен, интерфейс доступен. Далее мы можем видеть все, что у нас происходит.
Установим сертификаты, которые нужны для работы Flink-оператора:
kubectl apply -f https://github.com/jetstack/cert-manager/releases/download/v1.11.0/cert-manager.yaml;
Они достаточно долго поднимаются, поэтому при установке Flink-оператора могут возникнуть ошибки с сертификатами. Если вы с этим столкнулись, то просто подождите немного и повторите попытку.
Установка и настройка MongoDB
Если у вас MacBook на M-чипе, то на этом шаге вы получите ошибку:
ErrImagePull: no matching manifest for linux/arm64/v8 in the manifest list entries
На Stackoverflow в этом случае предлагают перезапустить кластер под amd64 архитектуру:
minikube start --cpus=5 --memory=8096 --driver=docker --kubernetes-version=v1.26.3 --base-image=kicbase/ubuntu:v1.26.3
Работает ли этот способ, сказать не могу, так как ноутбука на M-чипе у меня нет. Если кто пробовал, то дополните в комментариях.
Разворачивать будем через helm-образ от bitnami. Поднять MongoDB нам надо в режиме репликации, а сделать это в kubernetes — задача нетривиальная. Для этого нужно проделать множество ручных манипуляций — с использованием helm chart все сводится к нескольким командам в консоли и yaml-файлу на 12 строчек.
Режим репликации нужен, так как cdc-коннектор подключается к OPLOG-журналу (фиксирует все происходящие операции, чтобы применить их на репликах) и пишется только в случае, если база является частью replica set.
В каталоге bitnami_mongo_replica_set лежат 2 файла: values.yaml и init-mongo.js. Первый служит конфигом для установки образа, а второй требуется, чтобы инициализировать коллекцию и закинуть в нее несколько документов для старта.
Для удобства создаем namespace и configMap, устанавливаем нашу MongoDB в режиме replica set:
kubectl create namespace mongodb;
kubectl -n mongodb create configmap init-mongo --from-file ./bitnami_mongo_replica_set/init-mongo.js;
helm install mongodb oci://registry-1.docker.io/bitnamicharts/mongodb -f ./bitnami_mongo_replica_set/values.yaml --namespace mongodb;
В результате мы получим вывод с информацией:

Теперь можно заглянуть в наш WebUI и убедиться, что поды поднялись и все работает:

Далее можно проверить, что коллекция и документы в ней успешно созданы. Для этого идем внутрь пода: нажимаем на под mongodb-0, затем в правом верхнем углу кликаем на exec into pod. Вводим команды:
mongosh;
use database;
db.users.find();
В итоге получаем:

Установка и настройка PostgreSQL
Теперь поднимем Postgres со всем необходимым для доступа извне, а также создадим таблицу-приемник:
kubectl apply -f ./postgresql-deployment.yaml;
В postgresql-deployment.yaml формируются отдельный namespace, сервисы для доступа к базе данных внутри кластера и извне, а также инициализируется база данных и таргетная таблица, куда наша будущая Flink job будет лить данные.
При инициализации на таблицу вешаются 2 триггера для обновления таймстампа записи и создания таймстампа вместо удаления строки (soft delete). Так мы наглядно видим, что задание flink корректно обрабатывает не только insert, но также update и delete.

Как и в случае с MongoDB, с помощью консоли внутри пода проверяем, что таблица создана:
psql -U user -password password -p 5432 -d mydb;
SELECT table_name FROM information_schema.tables WHERE table_schema='public';
Также можно настроить подключение через тот же DBeaver, но для доступа извне необходимо пробросить порты. Для этого открываем новую вкладку терминала и пишем:
kubectl port-forward service/postgresql-nodeport 30432:5432 --namespace=postgresql;
После чего настраиваем подключение:

Установка и настройка MinIo
В нашем случае Minio выступает в качестве внешнего хранилища для checkpoint- и savepoint-заданий Flink. Это необходимо для обеспечения их отказоустойчивости при падениях или ручной остановке с созданием контрольной точки. Иначе перезапущенное задание будет проходить весь oplog-журнал (в нашем случае) с самого начала.
Flink может хранить состояние без внешнего хранилища, используя FsStateBackend/RocksDBStateBackend или MemoryStateBackend. Первый вариант обеспечивает отказоустойчивость, а второй — нет, поэтому предусмотрите достаточное место для их хранения и внесите изменения в манифесты с конфигами для state.backend, а также добавьте создание PVC. В этом посте рассматриваться будет только вариант с MinIo.
Здесь аналогично Postgres в манифесте minio.yaml формируются namespace, сервисы доступа, персистентное хранилище и инициализируется бакет, в котором Flink создает каталоги для заданий.
kubectl apply -f ./minio.yaml;
Для проверки пробрасываем порты в отдельной вкладке терминала:
kubectl port-forward services/minio-service 9090:9090 --namespace=minio;
Теперь видим интерфейс MinIo с созданным бакетом под checkpoint Flink:

Опционально: Fake pipe
Необязательный шаг, в котором мы поднимем под с генератором данных в MongoDB, чтобы имитировать постоянное поступление данных в нашу коллекцию.
В первую очередь нам нужно создать кастомный докер-образ с генератором. Для этого открываем новую вкладку в терминале (да, опять) и пишем:
eval $(minikube docker-env);
docker build -t myimages/python_mongo_generator:latest ./fake_source_pipe;
Получаем:

Команды, которые вводили выше, создают образ внутри minikube. Если не сделать eval $(minikube docker-env), то он соберется в локальном окружении и minikube его не увидит.
После его сборки можно применить манифест для поднятия пода с генератором, который яростно начнет слать данные в нашу коллекцию MongoDB:
kubectl apply -f ./fake_source_pipe/python-pod.yaml;
На этом подготовка инфраструктуры завершена. У нас есть источник данных в виде коллекции users в MongoDB, таблица-приемник users_postgres в PostgreSQL, а также есть внешнее хранилище S3 с бакетом для checkpoint’ов/savepoint’ов Flink в виде MinIo. Фейковый генератор уже вовсю старается и наполняет коллекцию.
Работа с Apache Flink
Поднять Apache Flink можно двумя способами:
При application mode мы используем flink-kubernetes-operator. В этом режиме для каждого задания разворачивается микрокластер, состоящий из своих job- и task-менеджеров. Этот подход дает лучшую изоляцию процессов и утилизацию ресурсов. Оператор берет на себя все заботы по менеджменту заданий. Правда, у каждого задания есть свой отдельный WebUI-интерфейс, что неудобно.
При session mode мы будем разворачивать Apache Flink на нативном kubernetes. В этом режиме у нас один job manager (не включая реплики для обеспечения высокой доступности), соответственно, единый WebUI для всех заданий. Но менеджмент заданий
на ручной коробке передачуже берет на себя разработчик.
В одноименных каталогах flink_cluster_high_availability, flink_cluster_no_high_availability и flink-operator/flink_job лежат манифесты для запуска, докер-файлы для сборки образов и Python-скрипты с кодом задания. Они немного различаются между собой для разных методов, поэтому в каждом каталоге будет свой набор под конкретный метод.
Мы попробуем оба подхода, чтобы лучше понять, как все происходит.
В обоих вариантах перед развертыванием Flink нужно создать наш докер-образ, где настроены все зависимости, установлены библиотеки и коннекторы и так далее. Для этого, как и в случае с Fake pipe, выполняем в отдельной вкладке терминала:
eval $(minikube docker-env);
docker build -t myimages/pyflink-base:latest ./path/to/folder/with/flink_jobs;
Развертываем Flink c помощью Flink-kubernetes-operator
Сначала установим оператор. Для этого добавим репозиторий с нужной версией в helm:
helm repo add flink-operator-1-10-0-repo https://downloads.apache.org/flink/flink-kubernetes-operator-1.10.0/;
helm install flink-kubernetes-operator flink-operator-1-10-0-repo/flink-kubernetes-operator --namespace flink-operator --create-namespace;

К этому моменту у нас уже есть сертификаты и подготовлен докер-образ, по желанию в MongoDB уже льются данные. Теперь мы можем сделать submit нашего задания во Flink:
kubectl apply -f ./flink-operator/flink_jobs/flink-users-job.yaml;
В этом манифесте указаны:
ресурсы, которые мы хотим выделить для задания;
настройки для хранения checkpoint и savepoint;
докер-образ и аргументы, что мы запускаем Python-задание;
параметры подключения к MinIo, а также используемый для подключения плагин.
Это важный момент, так как их два: presto и hadoop. Я рекомендую взять hadoop, т. к. presto не позволяет восстанавливать состояние задания из checkpoint. Да, это вот так работает. Не спрашивайте почему, сам не знаю.
Отдельно еще расскажу про две настройки:
upgrade mode отвечает за способ обновления задания. Я установил savepoint. Если мы применим манифест с изменениями, то оператор перед остановкой сделает его слепок задания и при перезапуске использует его, чтобы продолжить работу с того места, где был остановлен.
state непосредственно указывает, стоит ли запустить или приостановить задание.
Итак, мы применили манифест, задание поднялось:

Для доступа к WebUI этого задания нужно пробросить порты до пода. Открываем новую вкладку терминала и пишем:
kubectl port-forward -n flink-operator <flink-jobmanager-pod-id> 8081:8081;
Теперь мы видим всю информацию:

Сам интерфейс интуитивно понятен, в нем вы можете посмотреть состояние задания, checkpoint’ы, логи job- и task-менеджеров и т. д.
Если все работает как надо, то в таблице PostgreSQL вы увидите записи, перекочевавшие из коллекции MongoDB. А если запускали Fake pipe, то они будут продолжать активно литься.

Чтобы остановить задание, меняем state в манифесте с "running" на "suspended" и применяем его.
Поздравляю, вы только что настроили пайплайн стриминговой обработки данных из MongoDB в PostgreSQL с помощью Apache Flink с использованием kubernetes. А теперь перейдем к более интересному.
Flink Native Kubernetes
Если вы выполняли все последовательно, то давайте сначала удалим Flink-оператор:
helm uninstall flink-kubernetes-operator -n flink-operator;
kubectl delete namespace flink-operator;
А если сразу перешли сюда, то пропустите этот шаг.
Итак, у нас есть 2 каталога: flink_cluster_high_availability и flink_cluster_no_high_availability. Первый разворачивает минимум два job manager для обеспечения высокой доступности и использует возможности kubernetes для восстановления после сбоев, а второй, очевидно из названия, нет. © кэп
Принципиально они отличаются только прописываемой конфигурацией и числом обязательных реплик job manager. Тут выбирайте что вам хочется, в остальном процесс будет одинаковый. Я разверну в режиме high availability.
Внутри папки flink_cluster_high_availability лежат манифесты для формирования и биндинга роли, создания сервисного аккаунта, конфиг-мапа для прокидывания наших настроек во Flink, а также непосредственно деплоймента и разворачивания сопутствующих сервисов для доступа. Flink-configuration-configmap.yaml содержит в себе данные и настройки для подключения MinIo, настройки стратегии хранения checkpoint’ов и логирования.
Для этого подхода открываем новую вкладку терминала и пишем:
eval $(minikube docker-env);
docker build -t myimages/pyflink-base:latest ./flink_cluster_high_availability/flink_jobs;
По готовности применяем манифесты в папке:
kubectl apply -f ./flink_cluster_high_availability;

После поднятия подов пробрасываем в отдельной вкладке терминала (как мы любим) порты до WebUI.
kubectl port-forward services/flink-jobmanager-rest 8081:8081 --namespace=flink;
Итак, у нас есть развернутый Flink с одним или несколькими job manager, куда мы будем сабмитить задания. В отличие от работы с оператором, делать это нам придется потными руками. Для этого копируем скрипт с заданием на под job manager. Да, это можно было реализовать на уровне образа, но я захотел так. Что вы мне сделаете, я текст).
Для этого смотрим id любого из наших job manager, копируем скрипт с заданием на под и запускаем:
kubectl -n flink cp ./flink_cluster_high_availability/flink-job/users-job.py <flink-job-manager-pod-id>:/tmp/users-job.py;
kubectl -n flink exec -it <flink-job-manager-pod-id> -- /opt/flink/bin/flink run --python /tmp/users-job.py;

Задание появится в WebUI, checkpoint’ы начнут писаться в MinIo, а в таблицу Postgres полетят записи.

Для остановки задания можно в WebUI в правом верхнем углу нажать Cancel Job (при этом оно просто завершится и savepoint не создастся). Правильнее это делать как оператор: остановить задание созданием savepoint в MinIo.
Провернем мы это, конечно же, вручную, указав id задания и путь для сохранения savepoint:
kubectl -n flink exec -it <flink job-manager-pod-id> -- /opt/flink/bin/flink stop --savepointPath s3a://apache-flink/savepoints/users-job/ <job-id>;

Соответственно, чтобы запустить задание с savepoint или checkpoint, сходим в MinIo посмотреть путь до него. Так как в конфиге Flink мы установили execution.checkpointing.externalized-checkpoint-retention: RETAIN_ON_CANCELLATION, checkpoint не удаляются при остановке задания, и запустить мы его можем не только с savepoint, но и с checkpoint. В противном случае они бы удалились, и у нас бы остался только вариант предварительного создания savepoint.
Перезапускаем задание с cheсkpoint:

Смотрим нужный нам checkpoint:
kubectl -n flink exec -it <flink-jobmanager-pod-id> -- /opt/flink/bin/flink run --python /tmp/flink-job/users-job.py -s s3a://apache-flink/checkpoints/users-job/aa05d2d7f89335aa898c381dba05a420/chk-58;
Или с savepoint:

Смотрим savepoint:
kubectl -n flink exec -it <flink-jobmanager-pod-id> -- /opt/flink/bin/flink run --fromSavepoint s3a://apache-flink/savepoints/users-job/savepoint-829251-03bc2e202d0e --python /tmp/users-job.py;
Тут сделаю ремарку про отличие checkpoint и savepoint. Они похожи, и их часто путают, но при этом:
checkpoint’ы нужны для автоматического восстановления после сбоя — они легкие, быстрые и привязаны к текущей версии задания. Flink управляет их жизненным циклом сам;
savepoint’ы предназначены для ручной остановки и перезапуска задания. Они поддерживают изменения в заданиях. Их жизненным циклом вы управляете уже сами.
Первые работают как журналы восстановления, а вторые — как резервные копии.
Ну вот, пожалуй, и все
Мы настроили потоковую обработку с использованием MongoDB, PostgreSQL, Apache Flink, Minio и Python — и все на minikube (kubernetes). Сделали это как с помощью готового оператора, так и поуправляли процессом вручную.
После того, как мы закончили работу с minikube стоит его остановить:
minikube stop;
Или удалить ВМ, если она вам больше не нужна:
minikube delete;
Надеюсь, это сэкономит кому-то часы (а скорее всего, дни или больше) на изучение этого вопроса. А с вами был Александр Цай, успехов, друзья!