
Меня зовут Артем Шнайдер, и я занимаюсь DataScience в Бланке. Сегодня я хочу рассказать вам о том, как можно интегрировать два мощных инструмента – Dagster и Great Expectations.
Great Expectations позволяет определить так называемые ожидания от ваших данных, то есть задать правила и условия, которым данные должны соответствовать.
Dagster, с другой стороны, это платформа с открытым исходным кодом для управления данными, которая позволяет создавать, тестировать и развертывать пайплайны данных. Написан на python, что позволяет пользователям гибко настраивать и расширять его функциональность.
Исходный код к этой статье на GitHub.
Давайте начнем?)
Подготовка
Нам понадобятся какие-нибудь данные, которые мы загрузим в базу данных. Я решил взять набор данных транзакций с Kaggle.
Собрал для вас готовый репозиторий, который вы можете клонировать по этой ссылке.
Перейдем в директорию /simple‑postgres‑container и запустим нашу базу данных:
docker compose -f "docker-compose.yaml" up -d --build
Теперь у нас есть локальный экземпляр базы данных Postgres с табличкой transactions.
Идем дальше)
1. Great Expectations шаг за шагом
В этой части мы установим пакет great-expectations, подключимся к нашей базе данных и создадим простой набор ожиданий.
Выполним установку great-expectationsи инициализируем проект:
pip install great-expectations great_expectations init
Команда создаст директорию со следующей структурой:
great_expectations/
├── checkpoints
├── expectations
├── great_expectations.yml
├── plugins
│ └── custom_data_docs
├── profilers
└── uncommitted
├── config_variables.yml
├── data_docs
└── validations
Дальнейшие команды будем выполнять из директории /great_expectations:
cd great_expectations
Great Expectations предлагает использовать интерфейс командной строки для автоматического создания предварительно настроенных блокнотов Jupyter, которые проведут через конкретные этапы работы с Great Expectations.
1.1 Настроим источник данных
Выполним команду:
great_expectations datasource new
GE предложит варианты подключения к данным, выбираем Relational database (SQL):

P.S. Может потребоваться установка дополнительных пакетов - их GE установит сам в процессе.
Далее выбираем нашу базу данных:

GE сгенерирует Jupyter ноутбук, в котором мы должны заполнить учетные данные для подключения:

Больше ничего менять не потребуется, выполняем ноутбук до конца, закрываем и прерываем команду в терминале. Конфигурацию нашего источника данных можно увидеть в файле great_expectations.yaml.
great_expectations.yaml
# Welcome to Great Expectations! Always know what to expect from your data. # # Here you can define datasources, batch kwargs generators, integrations and # more. This file is intended to be committed to your repo. For help with # configuration please: # - Read our docs: https://docs.greatexpectations.io/docs/guides/connecting_to_your_data/connect_to_data_overview/#2-configure-your-datasource # - Join our slack channel: http://greatexpectations.io/slack # config_version refers to the syntactic version of this config file, and is used in maintaining backwards compatibility # It is auto-generated and usually does not need to be changed. config_version: 3.0 # Datasources tell Great Expectations where your data lives and how to get it. # You can use the CLI command `great_expectations datasource new` to help you # add a new datasource. Read more at https://docs.greatexpectations.io/docs/guides/connecting_to_your_data/connect_to_data_overview datasources: my_datasource: class_name: Datasource module_name: great_expectations.datasource data_connectors: default_runtime_data_connector_name: batch_identifiers: - default_identifier_name class_name: RuntimeDataConnector module_name: great_expectations.datasource.data_connector default_inferred_data_connector_name: include_schema_name: true class_name: InferredAssetSqlDataConnector module_name: great_expectations.datasource.data_connector introspection_directives: schema_name: public default_configured_data_connector_name: class_name: ConfiguredAssetSqlDataConnector assets: transactions: class_name: Asset schema_name: public module_name: great_expectations.datasource.data_connector.asset module_name: great_expectations.datasource.data_connector execution_engine: class_name: SqlAlchemyExecutionEngine credentials: host: localhost port: '5432' username: postgres password: postgres database: postgres drivername: postgresql module_name: great_expectations.execution_engine # This config file supports variable substitution which enables: 1) keeping # secrets out of source control & 2) environment-based configuration changes # such as staging vs prod. # # When GX encounters substitution syntax (like `my_key: ${my_value}` or # `my_key: $my_value`) in the great_expectations.yml file, it will attempt # to replace the value of `my_key` with the value from an environment # variable `my_value` or a corresponding key read from this config file, # which is defined through the `config_variables_file_path`. # Environment variables take precedence over variables defined here. # # Substitution values defined here can be a simple (non-nested) value, # nested value such as a dictionary, or an environment variable (i.e. ${ENV_VAR}) # # # https://docs.greatexpectations.io/docs/guides/setup/configuring_data_contexts/how_to_configure_credentials config_variables_file_path: uncommitted/config_variables.yml # The plugins_directory will be added to your python path for custom modules # used to override and extend Great Expectations. plugins_directory: plugins/ stores: # Stores are configurable places to store things like Expectations, Validations # Data Docs, and more. These are for advanced users only - most users can simply # leave this section alone. # # Three stores are required: expectations, validations, and # evaluation_parameters, and must exist with a valid store entry. Additional # stores can be configured for uses such as data_docs, etc. expectations_store: class_name: ExpectationsStore store_backend: class_name: TupleFilesystemStoreBackend base_directory: expectations/ validations_store: class_name: ValidationsStore store_backend: class_name: TupleFilesystemStoreBackend base_directory: uncommitted/validations/ evaluation_parameter_store: class_name: EvaluationParameterStore checkpoint_store: class_name: CheckpointStore store_backend: class_name: TupleFilesystemStoreBackend suppress_store_backend_id: true base_directory: checkpoints/ profiler_store: class_name: ProfilerStore store_backend: class_name: TupleFilesystemStoreBackend suppress_store_backend_id: true base_directory: profilers/ expectations_store_name: expectations_store validations_store_name: validations_store evaluation_parameter_store_name: evaluation_parameter_store checkpoint_store_name: checkpoint_store data_docs_sites: # Data Docs make it simple to visualize data quality in your project. These # include Expectations, Validations & Profiles. The are built for all # Datasources from JSON artifacts in the local repo including validations & # profiles from the uncommitted directory. Read more at https://docs.greatexpectations.io/docs/terms/data_docs local_site: class_name: SiteBuilder show_how_to_buttons: true store_backend: class_name: TupleFilesystemStoreBackend base_directory: uncommitted/data_docs/local_site/ site_index_builder: class_name: DefaultSiteIndexBuilder anonymous_usage_statistics: enabled: true data_context_id: ffd790a7-4454-4b53-b9b1-428bfcfb4e64 notebooks: include_rendered_content: globally: false expectation_validation_result: false expectation_suite: false
1.2 Создадим набор ожиданий
Возвращаемся в командую строку и выполняем:
great_expectations suite new
GE на выбор предоставит несколько способов создания наборов проверок. В документации рекомендуется автоматическое создание с использованием Data Assistant — предварительно настроенной утилиты, упрощающей создание ожиданий.

Также GE предлагает три типа DataConnector классов. Про выбор коннекторов можно почитать здесь, а нам нужен default_configured_data_connector_name:

Выбираем ассет данных и задаем имя нашему набору проверок:

Наборы ожиданий, созданные с помощью ноутбука, не предназначены для использования в производственной среде. Мы можем отредактировать сгенерированный файл/expectations/transactions/suite.json и добавить туда еще пару ожиданий: проверим, что все транзакции имеют ID, а количество купленных товаров больше 0.
Итоговый suite.json
{ "data_asset_type": null, "expectation_suite_name": "transactions.suite", "expectations": [ { "expectation_type": "expect_table_row_count_to_be_between", "kwargs": { "max_value": 2000000, "min_value": 0 }, "meta": { "profiler_details": { "metric_configuration": { "domain_kwargs": {}, "metric_name": "table.row_count", "metric_value_kwargs": null }, "num_batches": 1 } } }, { "expectation_type": "expect_table_columns_to_match_set", "kwargs": { "column_set": [ "ItemCode", "UserId", "NumberOfItemsPurchased", "Country", "TransactionTime", "ItemDescription", "TransactionId", "CostPerItem" ], "exact_match": null }, "meta": { "profiler_details": { "success_ratio": 1.0 } } }, { "expectation_type": "expect_column_values_to_not_be_null", "kwargs": { "column": "TransactionId" } }, { "expectation_type": "expect_column_values_to_be_between", "kwargs": { "column": "NumberOfItemsPurchased", "min_value": 1 } } ], "ge_cloud_id": null, "meta": { "citations": [ { "citation_date": "2023-07-06T00:08:01.886955Z", "comment": "Created by effective Rule-Based Profiler of OnboardingDataAssistant with the configuration included.\n" } ], "great_expectations_version": "0.17.2" } }
С множеством доступных ожиданий можно ознакомиться в Expectations Store.
Таким образом, у нас есть две проверки на уровне таблицы, и еще две — на уровне колонок. Идем дальше.
1.3 Создание Checkpoint
Checkpoint (контрольная точка) — это основное средство проверки данных при развертывании Great Expectations в производственной среде. Выполним команду:
great_expectations checkpoint new my_checkpoint
После выполнения всех шагов в ноутбуке в директории /checkpoints появится файл my_checkpoint.yaml:
my_checkpoint.yaml
name: my_checkpoint config_version: 1.0 template_name: module_name: great_expectations.checkpoint class_name: Checkpoint run_name_template: '%Y%m%d-%H%M%S-my-run-name-template' expectation_suite_name: batch_request: {} action_list: - name: store_validation_result action: class_name: StoreValidationResultAction - name: store_evaluation_params action: class_name: StoreEvaluationParametersAction - name: update_data_docs action: class_name: UpdateDataDocsAction site_names: [] evaluation_parameters: {} runtime_configuration: {} validations: - batch_request: datasource_name: my_datasource data_connector_name: default_inferred_data_connector_name data_asset_name: public.transactions data_connector_query: index: -1 expectation_suite_name: transactions.suite profilers: [] ge_cloud_id: expectation_suite_ge_cloud_id:
Если в ноутбуке вы не проверяли работоспособность чекпоинта, это можно сделать в терминале:
great_expectations checkpoint run my_checkpoint
Мы должны увидеть результаты проверки:

В терминале не будет отображаться информация о том, какая именно проверка не прошла, однако имеется возможность автоматически сгенерировать документацию в виде файлов HTML, что позволит посмотреть результаты в любом ��раузере. Подробнее об использовании Data Docs можно почитать здесь.
great_expectations docs build
Команда автоматически откроет окно в браузере, чтобы мы смогли увидеть результаты валидации:

Набор ожиданий готов, а значит, мы готовы запускать наши чекпоинты в Dagster!
2. Работа с Dagster
Прежде, чем мы создадим наш проект, поговорим об основных концепциях Dagster.
Asset (активы) — это объект, который фиксирует результат выполнения некоторого этапа в пайплайне dagster. Ассеты могут быть любого типа, например, файлами, таблицами базы данных или моделью машинного обучения.
Ops (операции) — являются основной единицей вычисления в Dagster, и должны выполнять относительно простые задачи, например, какие-нибудь вычисления, обращения к базе данных, вызовы API и т.д.
Таким образом, операции — это действия, которые выполняются в пайплайне, а активы — это данные, которые обрабатываются в процессе выполнения этих операций.
Jobs (задания) - основная единица исполнения и мониторинга в Dagster. Задание может материализовать набор активов, или выполнить граф операций.
Resources (ресурсы) - это объекты, которые являются общими для нескольких активов, операций, расписаний или датчиков. Например, ресурсом может быть подключение к базе данных или сервису.
2.1 Создание проекта
Самый простой способ начать работу с Dagster — использовать шаблон проекта по умолчанию с помощью интерфейса командной строки:
pip install dagster dagster project scaffold --name great-expectations-dagster
Команда dagster project создаст необходимую структуру папок:
great-expectations-dagster/
├── great_expectations_dagster
│ ├── assets.py
│ └── init.py
├── great_expectations_dagster_tests/
├── pyproject.toml
├── README.md
├── setup.cfg
└── setup.py
Файлы, с которыми мы будем работать:
__init__.py— содержит все определения, созданные в нашем проекте. Это могут активы, задания, расписания, датчики и ресурсы;assets.py— модуль Python, содержащий программно-определяемые активы;setup.py— содержит сценарий сборки, используем его для указания зависимостей для нашего проекта.
Подробнее о структуре проекта Dagster можно прочитать здесь.
2.2 Создание программно-определяемого актива
Мы создадим ассет, который провалидилирует наши данные и запишет результаты проверки в постоянное хранилище.
P.S. В Dagster есть возможность использовать фабрику ge_validation_op_factory из пакета dagster-ge для создания операций Dagster, которые интегрируются с Great Expectations. Однако это потребует создания операции, которая выгрузит нам наш набор данных в виде датафрейма и передаст его в качестве входных данных операции, созданной с помощью ge_validation_op_factory.
Отредактируем файл assets.py:
from dagster import MetadataValue, Output, asset from dagster_ge.factory import GEContextResource from great_expectations.render.renderer import ValidationResultsPageRenderer from great_expectations.render.view import DefaultMarkdownPageView @asset def validate_data(data_context: GEContextResource): # предоставим контекст данных GE context = data_context.get_data_context() # запустим проверку на соответствие набору ожиданий results = context.run_checkpoint('my_checkpoint') # визуазлиция результатов проверки в Dagster требует вывода метаданных validation_results_page_renderer = ValidationResultsPageRenderer(run_info_at_end=True) rendered_document_content_list = ( validation_results_page_renderer.render_validation_operator_result(results) ) md_str = " ".join(DefaultMarkdownPageView().render(rendered_document_content_list)) return Output( value=results['success'], metadata={ "Expectation Results": MetadataValue.md(md_str) })
Далее идем в __init__.py и редактируем наше определение:
from dagster import (Definitions, define_asset_job, file_relative_path, load_assets_from_modules) from dagster_ge.factory import GEContextResource from great_expectations_dagster.assets import validate_data defs = Definitions( assets=[ validate_data ], jobs=[ # задание, которое запустит материализацию нашего ассета define_asset_job( name='validate_data_job', selection=[validate_data] ) ], resources={ # для настройки контекста GE необходимо указать корень каталога GE # (путь к файлу great_exepctations.yaml) 'data_context': GEContextResource( ge_root_dir=file_relative_path(__file__, '../../great_expectations') ) } )
Теперь нам останется поправить файл setup.py и добавить туда пакет dagster-ge:
from setuptools import find_packages, setup setup( name="great_expectations_dagster", packages=find_packages(exclude=["great_expectations_dagster_tests"]), install_requires=[ "dagster", "dagster-ge" ], extras_require={"dev": ["dagit", "pytest"]}, )
Теперь все готово, чтобы мы могли запустить наш маленький паплайн!
В корне директории great-expectations-dagster/ выполним следующие команды:
pip install -e “.[dev]” dagit
Команда dagit распечатает URL-адрес, по которому мы сможем получить доступ к интерфейсу в браузере, обычно через порт 3000. Мы должны увидеть наше задание validate_data_job:

Теперь нам достаточно нажать на кнопку Materialize, подождать, пока задание материализует актив, и мы сможем увидеть метаданные нашей материализации:

На вкладке мы видим поле Expectation Results и ссылку [Show Markdown], нам сюда)

Вот и всё :-)
Интеграция между Dagster и Great Expectations представляет собой мощный инструмент для обеспечения качества данных в конвейерах данных, с возможностью проверки и визуализации ожиданий на каждом этапе контейнеров данных.
Надеюсь, этот туториал будет полезен :-)
