Pull to refresh
2282.28
МТС
Про жизнь и развитие в IT

Стриминг Apache Flink из MongoDB в PostgreSQL на Python

Reading time11 min
Views1.6K

Привет, Хабр! Меня зовут Александр Цай, я ведущий аналитик в МТС Web Services, но на деле занимаюсь всеми вопросами, касающимися DA/DE/BI: выявлением потребностей и сбором требований, проектированием дашбордов и витрин для них, построением и развитием внутреннего хранилища, поиском источников данных, созданием сложных ETL-пайплайнов по их доставке, DQ, проведением аналитики и много чем еще.

В этом материале я расскажу про разворачивание пайплайна по стримингу данных из MongoDB в PostgreSQL с помощью Apache Flink (стримить из Kafka банально, а так заодно пощупаем документоориентированную БД). Делать это мы будем в minikube (kubernetes), а языком программирования для заданий выступит Python. Все описанное в посте выполняется на MacBook с процессором i7.

В интернете, тем более русскоязычном, нет информации о стриминге из MongoDB в Postgres с помощью Flink. Почти все материалы по Flink, которые мне попадались, сводятся к пережевыванию примера WordCount из flink-kubernetes-operator, где на запущенном поде из папки с примерами читается файл и в консоль выводится количество слов в нем. Если спускаться до использования PyFlink, то мы натыкаемся на кастомные образы с Harness SDK и Apache Beam и другие страшные слова. Знакомо?

Так вот, это не наш путь! Данное руководство будет полезно тем, кто такой же извращенец хочет пощупать Flink на родном Python и кто не планирует брать примеры, оторванные от реальности. Все материалы можно найти у меня в репозитории GitHub.

Дисклеймер

Я не претендую на истину в последней инстанции и с удовольствием почитаю ваши комментарии, что можно было бы улучшить и как.

Также уточню, что все секреты захардкоржены прямо в манифестах, так как это учебный проект. Я не стал заморачиваться, но в перспективе, конечно же, стоит.

Содержание:

Подготовка

Что мы будем использовать:

  • любое IDE по вашему выбору — у меня VsCode;

  • Minikube (kubernetes) как среду для развертывания;

  • Docker для работы minikube;

  • Helm в качестве менеджера пакетов для установки;

  • Flink-kubernetes-operator для одного из способов развертывания;

  • Minio как внешнее хранилище для checkpoints/savepoints Flink (об этом будет подробнее в соответствующем разделе);

  • mongodb как источник;

  • postgresql в качестве таргетной базы.

Останавливаться на установке docker, minikube и helm я не буду: у них отличная документация, и ее изучение не займет у вас много времени. Перейдем сразу к развертыванию компонентов стриминга.

Итак, вы склонировали себе репозиторий, и теперь после установки вышеперечисленного нам нужно запустить minikube. Для этого в терминале пишем:

minikube start --cpus=5 --memory=8096;

Ресурсы выделяйте по своему усмотрению. Когда мини-куб успешно запустится, мы получим такой вывод:

такой вывод вы получите, когда миникуб успешно запустится

Теперь запускаем в отдельной вкладке терминала Web UI minikube dashboard:

minikube dashboard;

Получаем:

в отдельной вкладке браузера откроется WebUI

Видим, что в браузере открылся WebUI. Отлично. Minikube запущен, интерфейс доступен. Далее мы можем видеть все, что у нас происходит.

Установим сертификаты, которые нужны для работы Flink-оператора:

kubectl apply -f https://github.com/jetstack/cert-manager/releases/download/v1.11.0/cert-manager.yaml; 

Они достаточно долго поднимаются, поэтому при установке Flink-оператора могут возникнуть ошибки с сертификатами. Если вы с этим столкнулись, то просто подождите немного и повторите попытку.

Установка и настройка MongoDB 

Если у вас MacBook на M-чипе, то на этом шаге вы получите ошибку: 

ErrImagePull: no matching manifest for linux/arm64/v8 in the manifest list entries 

На Stackoverflow в этом случае предлагают перезапустить кластер под amd64 архитектуру:

minikube start --cpus=5 --memory=8096 --driver=docker --kubernetes-version=v1.26.3 --base-image=kicbase/ubuntu:v1.26.3 

Работает ли этот способ, сказать не могу, так как ноутбука на M-чипе у меня нет. Если кто пробовал, то дополните в комментариях.

Разворачивать будем через helm-образ от bitnami. Поднять MongoDB нам надо в режиме репликации, а сделать это в kubernetes — задача нетривиальная. Для этого нужно проделать множество ручных манипуляций — с использованием helm chart все сводится к нескольким командам в консоли и yaml-файлу на 12 строчек.

Режим репликации нужен, так как cdc-коннектор подключается к OPLOG-журналу (фиксирует все происходящие операции, чтобы применить их на репликах) и пишется только в случае, если база является частью replica set.

В каталоге bitnami_mongo_replica_set лежат 2 файла: values.yaml и init-mongo.js. Первый служит конфигом для установки образа, а второй требуется, чтобы инициализировать коллекцию и закинуть в нее несколько документов для старта.

Для удобства создаем namespace и configMap, устанавливаем нашу MongoDB в режиме replica set:

kubectl create namespace mongodb; 
kubectl -n mongodb create configmap init-mongo --from-file ./bitnami_mongo_replica_set/init-mongo.js; 
helm install mongodb oci://registry-1.docker.io/bitnamicharts/mongodb -f ./bitnami_mongo_replica_set/values.yaml --namespace mongodb;

В результате мы получим вывод с информацией:

В результате вы получите вывод с информацией

Теперь можно заглянуть в наш WebUI и убедиться, что поды поднялись и все работает:

После этого можно заглянуть в наш WebUI и убедиться что поды поднялись и все работает

Далее можно проверить, что коллекция и документы в ней успешно созданы. Для этого идем внутрь пода: нажимаем на под mongodb-0, затем в правом верхнем углу кликаем на exec into pod. Вводим команды:

mongosh; 
use database; 
db.users.find();

В итоге получаем:

Видим что база есть, коллекция есть и документы в ней тоже есть
Видим базу, коллекцию и документы в ней

Установка и настройка PostgreSQL 

Теперь поднимем Postgres со всем необходимым для доступа извне, а также создадим таблицу-приемник:

kubectl apply -f ./postgresql-deployment.yaml;

В postgresql-deployment.yaml формируются отдельный namespace, сервисы для доступа к базе данных внутри кластера и извне, а также инициализируется база данных и таргетная таблица, куда наша будущая Flink job будет лить данные.

При инициализации на таблицу вешаются 2 триггера для обновления таймстампа записи и создания таймстампа вместо удаления строки (soft delete). Так мы наглядно видим, что задание flink корректно обрабатывает не только insert, но также update и delete.

Поднятая БД
Поднятая БД

Как и в случае с MongoDB, с помощью консоли внутри пода проверяем, что таблица создана:

psql -U user -password password -p 5432 -d mydb; 
SELECT table_name FROM information_schema.tables WHERE table_schema='public'; 

Также можно настроить подключение через тот же DBeaver, но для доступа извне необходимо пробросить порты. Для этого открываем новую вкладку терминала и пишем:

kubectl port-forward service/postgresql-nodeport 30432:5432 --namespace=postgresql;

После чего настраиваем подключение: 

Подключение к нашей таблице

Установка и настройка MinIo 

В нашем случае Minio выступает в качестве внешнего хранилища для checkpoint- и savepoint-заданий Flink. Это необходимо для обеспечения их отказоустойчивости при падениях или ручной остановке с созданием контрольной точки. Иначе перезапущенное задание будет проходить весь oplog-журнал (в нашем случае) с самого начала.

Flink может хранить состояние без внешнего хранилища, используя FsStateBackend/RocksDBStateBackend или MemoryStateBackend. Первый вариант обеспечивает отказоустойчивость, а второй — нет, поэтому предусмотрите достаточное место для их хранения и внесите изменения в манифесты с конфигами для state.backend, а также добавьте создание PVC. В этом посте рассматриваться будет только вариант с MinIo.

Здесь аналогично Postgres в манифесте minio.yaml формируются namespace, сервисы доступа, персистентное хранилище и инициализируется бакет, в котором Flink создает каталоги для заданий.

kubectl apply -f ./minio.yaml;

Для проверки пробрасываем порты в отдельной вкладке терминала:

kubectl port-forward services/minio-service 9090:9090 --namespace=minio;

Теперь видим интерфейс MinIo с созданным бакетом под checkpoint Flink:

логин и пароль можно посмотреть внутри манифеста
Логин и пароль можно посмотреть внутри манифеста. По умолчанию login = minio, password = minio123

Опционально: Fake pipe

Необязательный шаг, в котором мы поднимем под с генератором данных в MongoDB, чтобы имитировать постоянное поступление данных в нашу коллекцию.

В первую очередь нам нужно создать кастомный докер-образ с генератором. Для этого открываем новую вкладку в терминале (да, опять) и пишем:

eval $(minikube docker-env); 
docker build -t myimages/python_mongo_generator:latest ./fake_source_pipe;

Получаем:

Начнет собираться наш образ в окружении minikube и будет доступен для сборки в кластере.

Команды, которые вводили выше, создают образ внутри minikube. Если не сделать eval $(minikube docker-env), то он соберется в локальном окружении и minikube его не увидит.

После его сборки можно применить манифест для поднятия пода с генератором, который яростно начнет слать данные в нашу коллекцию MongoDB:

kubectl apply -f ./fake_source_pipe/python-pod.yaml;

На этом подготовка инфраструктуры завершена. У нас есть источник данных в виде коллекции users в MongoDB, таблица-приемник users_postgres в PostgreSQL, а также есть внешнее хранилище S3 с бакетом для checkpoint’ов/savepoint’ов Flink в виде MinIo. Фейковый генератор уже вовсю старается и наполняет коллекцию.

Работа с Apache Flink

Поднять Apache Flink можно двумя способами: 

  • При application mode мы используем flink-kubernetes-operator. В этом режиме для каждого задания разворачивается микрокластер, состоящий из своих job- и task-менеджеров. Этот подход дает лучшую изоляцию процессов и утилизацию ресурсов. Оператор берет на себя все заботы по менеджменту заданий. Правда, у каждого задания есть свой отдельный WebUI-интерфейс, что неудобно.

  • При session mode мы будем разворачивать Apache Flink на нативном kubernetes. В этом режиме у нас один job manager (не включая реплики для обеспечения высокой доступности), соответственно, единый WebUI для всех заданий. Но менеджмент заданий на ручной коробке передач уже берет на себя разработчик.

В одноименных каталогах flink_cluster_high_availability, flink_cluster_no_high_availability и flink-operator/flink_job лежат манифесты для запуска, докер-файлы для сборки образов и Python-скрипты с кодом задания. Они немного различаются между собой для разных методов, поэтому в каждом каталоге будет свой набор под конкретный метод.

Мы попробуем оба подхода, чтобы лучше понять, как все происходит.

В обоих вариантах перед развертыванием Flink нужно создать наш докер-образ, где настроены все зависимости, установлены библиотеки и коннекторы и так далее. Для этого, как и в случае с Fake pipe, выполняем в отдельной вкладке терминала:

eval $(minikube docker-env); 
docker build -t myimages/pyflink-base:latest ./path/to/folder/with/flink_jobs;

Развертываем Flink c помощью Flink-kubernetes-operator

Сначала установим оператор. Для этого добавим репозиторий с нужной версией в helm:

helm repo add flink-operator-1-10-0-repo https://downloads.apache.org/flink/flink-kubernetes-operator-1.10.0/; 
helm install flink-kubernetes-operator flink-operator-1-10-0-repo/flink-kubernetes-operator --namespace flink-operator --create-namespace; 
в дашборде kubernetes увидим что все поднялось
В дашборде kubernetes видим, что все поднялось

К этому моменту у нас уже есть сертификаты и подготовлен докер-образ, по желанию в MongoDB уже льются данные. Теперь мы можем сделать submit нашего задания во Flink:

kubectl apply -f ./flink-operator/flink_jobs/flink-users-job.yaml; 

В этом манифесте указаны:

  • ресурсы, которые мы хотим выделить для задания;

  • настройки для хранения checkpoint и savepoint;

  • докер-образ и аргументы, что мы запускаем Python-задание;

  • параметры подключения к MinIo, а также используемый для подключения плагин.

Это важный момент, так как их два: presto и hadoop. Я рекомендую взять hadoop, т. к. presto не позволяет восстанавливать состояние задания из checkpoint. Да, это вот так работает. Не спрашивайте почему, сам не знаю.

Отдельно еще расскажу про две настройки: 

  • upgrade mode отвечает за способ обновления задания. Я установил savepoint. Если мы применим манифест с изменениями, то оператор перед остановкой сделает его слепок задания и при перезапуске использует его, чтобы продолжить работу с того места, где был остановлен.

  • state непосредственно указывает, стоит ли запустить или приостановить задание.

Итак, мы применили манифест, задание поднялось:

В нашем случае мы видим job manager и в паре к нему task manager
В нашем случае мы видим job manager и в паре к нему task manager

Для доступа к WebUI этого задания нужно пробросить порты до пода. Открываем новую вкладку терминала и пишем:

kubectl port-forward -n flink-operator <flink-jobmanager-pod-id> 8081:8081;

Теперь мы видим всю информацию:

Видим всю информацию о задании
Видим всю информацию о задании

Сам интерфейс интуитивно понятен, в нем вы можете посмотреть состояние задания, checkpoint’ы, логи job- и task-менеджеров и т. д.

Если все работает как надо, то в таблице PostgreSQL вы увидите записи, перекочевавшие из коллекции MongoDB. А если запускали Fake pipe, то они будут продолжать активно литься.

результаты работы нашего Flink задания
Результаты работы нашего Flink-задания

Чтобы остановить задание, меняем state в манифесте с "running" на "suspended" и применяем его.

Поздравляю, вы только что настроили пайплайн стриминговой обработки данных из MongoDB в PostgreSQL с помощью Apache Flink с использованием kubernetes. А теперь перейдем к более интересному.

Flink Native Kubernetes 

Если вы выполняли все последовательно, то давайте сначала удалим Flink-оператор:

helm uninstall flink-kubernetes-operator -n flink-operator; 
kubectl delete namespace flink-operator; 

А если сразу перешли сюда, то пропустите этот шаг.

Итак, у нас есть 2 каталога: flink_cluster_high_availability и flink_cluster_no_high_availability. Первый разворачивает минимум два job manager для обеспечения высокой доступности и использует возможности kubernetes для восстановления после сбоев, а второй, очевидно из названия, нет. © кэп

Принципиально они отличаются только прописываемой конфигурацией и числом обязательных реплик job manager. Тут выбирайте что вам хочется, в остальном процесс будет одинаковый. Я разверну в режиме high availability.

Внутри папки flink_cluster_high_availability лежат манифесты для формирования и биндинга роли, создания сервисного аккаунта, конфиг-мапа для прокидывания наших настроек во Flink, а также непосредственно деплоймента и разворачивания сопутствующих сервисов для доступа. Flink-configuration-configmap.yaml содержит в себе данные и настройки для подключения MinIo, настройки стратегии хранения checkpoint’ов и логирования.

Для этого подхода открываем новую вкладку терминала и пишем:

eval $(minikube docker-env); 
docker build -t myimages/pyflink-base:latest ./flink_cluster_high_availability/flink_jobs; 

По готовности применяем манифесты в папке: 

kubectl apply -f ./flink_cluster_high_availability;

После поднятия подов пробрасываем в отдельной вкладке терминала (как мы любим) порты до WebUI.

kubectl port-forward services/flink-jobmanager-rest 8081:8081 --namespace=flink; 

Итак, у нас есть развернутый Flink с одним или несколькими job manager, куда мы будем сабмитить задания. В отличие от работы с оператором, делать это нам придется потными руками. Для этого копируем скрипт с заданием на под job manager. Да, это можно было реализовать на уровне образа, но я захотел так. Что вы мне сделаете, я текст).

Для этого смотрим id любого из наших job manager, копируем скрипт с заданием на под и запускаем:

kubectl -n flink cp ./flink_cluster_high_availability/flink-job/users-job.py <flink-job-manager-pod-id>:/tmp/users-job.py; 
kubectl -n flink exec -it <flink-job-manager-pod-id> -- /opt/flink/bin/flink run --python /tmp/users-job.py; 
Получаем вывод что задание зарегистрировано
Видим в выводе, что задание зарегистрировано

Задание появится в WebUI, checkpoint’ы начнут писаться в MinIo, а в таблицу Postgres полетят записи.

как и в случае с оператором

Для остановки задания можно в WebUI в правом верхнем углу нажать Cancel Job (при этом оно просто завершится и savepoint не создастся). Правильнее это делать как оператор: остановить задание созданием savepoint в MinIo.

Провернем мы это, конечно же, вручную, указав id задания и путь для сохранения savepoint:

kubectl -n flink exec -it <flink job-manager-pod-id> -- /opt/flink/bin/flink stop --savepointPath s3a://apache-flink/savepoints/users-job/ <job-id>; 
Для создания savepoint используем id задания
Для создания savepoint используем id задания

Соответственно, чтобы запустить задание с savepoint или checkpoint, сходим в MinIo посмотреть путь до него. Так как в конфиге Flink мы установили execution.checkpointing.externalized-checkpoint-retention: RETAIN_ON_CANCELLATION, checkpoint не удаляются при остановке задания, и запустить мы его можем не только с savepoint, но и с checkpoint. В противном случае они бы удалились, и у нас бы остался только вариант предварительного создания savepoint.

Перезапускаем задание с cheсkpoint:

смотрим нужный нам чекпоинт

Смотрим нужный нам checkpoint: 

kubectl -n flink exec -it <flink-jobmanager-pod-id> -- /opt/flink/bin/flink run --python /tmp/flink-job/users-job.py -s s3a://apache-flink/checkpoints/users-job/aa05d2d7f89335aa898c381dba05a420/chk-58;

Или с savepoint:

смотрим сейвпоинт

Смотрим savepoint: 

kubectl -n flink exec -it <flink-jobmanager-pod-id> -- /opt/flink/bin/flink run --fromSavepoint s3a://apache-flink/savepoints/users-job/savepoint-829251-03bc2e202d0e --python /tmp/users-job.py; 

Тут сделаю ремарку про отличие checkpoint и savepoint. Они похожи, и их часто путают, но при этом:

  • checkpoint’ы нужны для автоматического восстановления после сбоя — они легкие, быстрые и привязаны к текущей версии задания. Flink управляет их жизненным циклом сам;

  • savepoint’ы предназначены для ручной остановки и перезапуска задания. Они поддерживают изменения в заданиях. Их жизненным циклом вы управляете уже сами.

Первые работают как журналы восстановления, а вторые — как резервные копии.

Ну вот, пожалуй, и все 

Мы настроили потоковую обработку с использованием MongoDB, PostgreSQL, Apache Flink, Minio и Python — и все на minikube (kubernetes). Сделали это как с помощью готового оператора, так и поуправляли процессом вручную.

После того, как мы закончили работу с minikube стоит его остановить:

minikube stop;

Или удалить ВМ, если она вам больше не нужна:

minikube delete;

Надеюсь, это сэкономит кому-то часы (а скорее всего, дни или больше) на изучение этого вопроса. А с вами был Александр Цай, успехов, друзья!

Tags:
Hubs:
Total votes 17: ↑17 and ↓0+21
Comments0

Useful links

Интеграция виджета обратного звонка МТС Exolve в документацию на MkDocs

Reading time8 min
Views543
Total votes 6: ↑6 and ↓0+10
Comments0

Путь в AI: от студента до инженера, исследователя или разработчика

Level of difficultyEasy
Reading time8 min
Views2.1K
Total votes 27: ↑26 and ↓1+29
Comments2

Information

Website
www.mts.ru
Registered
Founded
Employees
over 10,000 employees
Location
Россия