Кому можем быть полезна эта статья?
В вашей компании вообще нету эирфлоу или аналога.
У вас есть эирфлоу но тестирование дага можете проводить только на проде, нету стейджа (деф контура). А если на прод еще и апрув нужен то это вообще
сказка.Вы еще не работает в компании, а на интервью уже спрашивают опыт (для джунов).
Узнали себя? Тогда погнали!
*Эта статья написана аналитиком для аналитиков! Если вы разработчик, DevOps или не дай бог DBA - уходите!
Сперва устанавливаем доцкер, делается это одной строкой brew install docker
детально можно прочитать в предыдущей статье.
И переходим собственно к созданию образа эирфлоу. Работать я буду в PyCharm.
Весь необходимый код вы можете склонировать из репо.
Там готовый проект, нужно только поменять пути на ваши.
Давайте пройдемся по шагам, как если бы мы его не клонировали, а писали 'с нуля'.
Создаем новый проект, назовем его airflow.
В нем создаем файл с именем
docker-compose.yml
И прокидываем в него:
version: '3.8'
services:
airflow-webserver:
build: .
command: webserver
image: apache/airflow:2.6.0
environment:
- PYTHONPATH=/opt/airflow
- AIRFLOW__CORE__EXECUTOR=LocalExecutor
- AIRFLOW__DATABASE__SQL_ALCHEMY_CONN=postgresql+psycopg2://airflow:airflow@airflow-postgres:5432/airflow
- AIRFLOW__WEBSERVER__SECRET_KEY=your_secret_key
- AIRFLOW__WEBSERVER__WORKERS=4
- AIRFLOW__WEBSERVER__WEB_SERVER_PORT=8080
ports:
- "8080:8080"
volumes:
- /<Абсолютный путь к проекту>/dags:/opt/airflow/dags
- /<Абсолютный путь к проекту>/logs:/opt/airflow/logs
- /<Абсолютный путь к проекту>/plugins:/opt/airflow/plugins
- /<Абсолютный путь к проекту>/connections:/opt/airflow/connections
depends_on:
- airflow-scheduler
- airflow-postgres
airflow-scheduler:
build: .
command: scheduler
image: apache/airflow:2.6.0
environment:
- PYTHONPATH=/opt/airflow
- AIRFLOW__CORE__EXECUTOR=LocalExecutor
- AIRFLOW__DATABASE__SQL_ALCHEMY_CONN=postgresql+psycopg2://airflow:airflow@airflow-postgres:5432/airflow
volumes:
- /<Абсолютный путь к проекту>/dags:/opt/airflow/dags
- /<Абсолютный путь к проекту>/logs:/opt/airflow/logs
- /<Абсолютный путь к проекту>/plugins:/opt/airflow/plugins
- /<Абсолютный путь к проекту>/connections:/opt/airflow/connections # Добавлено
depends_on:
- airflow-postgres
airflow-init:
build: .
command: >
bash -c "
airflow db init &&
airflow users create --username admin --firstname Admin --lastname User --role Admin --email admin@example.com --password admin
"
image: apache/airflow:2.6.0
environment:
- PYTHONPATH=/opt/airflow
- AIRFLOW__CORE__EXECUTOR=LocalExecutor
- AIRFLOW__DATABASE__SQL_ALCHEMY_CONN=postgresql+psycopg2://airflow:airflow@airflow-postgres:5432/airflow
volumes:
- /<Абсолютный путь к проекту>/dags:/opt/airflow/dags
- /<Абсолютный путь к проекту>/logs:/opt/airflow/logs
- /<Абсолютный путь к проекту>/plugins:/opt/airflow/plugins
- /<Абсолютный путь к проекту>/connections:/opt/airflow/connections # Добавлено
depends_on:
- airflow-scheduler
- airflow-postgres
airflow-postgres:
image: postgres:13
environment:
POSTGRES_USER: airflow
POSTGRES_PASSWORD: airflow
POSTGRES_DB: airflow
ports:
- "5434:5432" # Или измените на "5434"
Да страшно, но таков пусть самурая.
И нужно еще кое-что локализовать.
Берем абсолютный путь к проекту который мы создали и вставляем всюду вместо<Абсолютный путь к проекту>
в файле выше.
К примеру у меня путь выглядит вот так:/Users/i.ivanov/PycharmProjects/airflow
и тогда в volumes мы вставляем:- /Users/i.ivanov/PycharmProjects/airflow/dags:/opt/airflow/dags
Создаем
Dockerfile
с содержимым:
FROM apache/airflow:2.6.0
USER root
#USER root # Переход на пользователя root для установки
# Установка необходимых пакетов для компиляции
RUN apt-get update && \
apt-get install -y gcc g++ python3-dev && \
apt-get clean && \
rm -rf /var/lib/apt/lists/*
# Возвращаемся к пользователю airflow
#USER airflow
USER airflow
# Копируем файл requirements.txt в контейнер
COPY requirements.txt .
# Устанавливаем зависимости
RUN pip install --no-cache-dir -r requirements.txt
Знаю вам это не нравится, но уверяю осталось немного :-)
Создаемrequirements.txt
и записываем в него:
pandas==1.3.5
sqlalchemy==1.4.49
clickhouse-sqlalchemy==0.2.0
Этот файл вам будет полезен если в ваших дагах не будет хватать либ.
Можете добавить по аналогу (после добавления нужно будет рестартнуть эирфлоу).
При добавлении надо следить чтобы не было конфликта версий, но можете на авось добавить, если что конфликт все равно вылезет при билде.
И теперь правда все!
Переходим в докер и через терминал поднимаем приложение:docker-compose -f /<Абсолютный путь к проекту>/docker-compose.yml up --build
Билдемся потихоньку .. Прокидываем хук в бд.
В директории проекта создаем папку connections/hooks/clickhouse_hook
В него прокидываем не очень правильный но рабочий хук:
import sqlalchemy as sa
import pandas as pd
def get_alchemy_engine():
ch_host = ''
ch_port = ''
ch_db = ''
ch_user = ''
ch_pass = ''
engine = sa.create_engine(f'clickhouse+native://{ch_user}:{ch_pass}@{ch_host}:{ch_port}/{ch_db}')
return engine
def get_pandas_df(sql):
engine = get_alchemy_engine()
df = pd.read_sql(
sql=sql,
con=engine
)
return df
Дальше как обычно молимся смотрим на логи чтобы не было ошибок.
Если вы поднялись и все хорошо то веб версия доступна по адресу http://localhost:8080/home
Логин/пароль: admin/admin.
На выходе вы получите настоящий airflow с полным админом.
В проекте будет тестовый даг который читает из кх и пишет в него же таблицу.
Нужно только в хуке вставить свои креды /airflow/connections .
По аналогии можно прокинуть хук к другой бд.
Удобно сделать хуки максимально похожими на те которые у вас в компании (именно с точки зрения как они выглядят и используются в даге, а не само их определение).
Импорт дага из репозитория не требует отдельной команды, это происходит автоматически почти мгновенно.
Если вы нуждаетесь в развертывании в облаке, то у ЯО есть готовое решение :-)