Создание пайплайнов для трансфера данных — рутинная задача Data-инженеров. Чтобы ее решить, многие копируют код коннекторов из одного проекта в другой. Из-за копипаста общая структура ломается, и в перспективе может возникнуть трудность с поддержкой проекта.
Источников данных много — Яндекс.Директ, Google Analytics и другие. По отдельности они не дают нужной картины, — данные всё равно приходится собирать в один Data Warehouse. Тут на помощь приходит Meltano: он позволяет стандартизировать написание коннекторов к различным источникам данных и быстро перенести все нужные данные.

Data-пайплайны
Большинство компаний данные централизуют: это помогает упростить управление данными и работать с общей картиной. Data-аналитики получают доступ к централизованным данным через специализированные аналитические базы данных — Data Lakes — и хранилища данных — Data Warehouse.
Чтобы получить данные, инженеры настраивают и поддерживают Data-пайплайны: берут необработанные данные исходной системы или приемника данных и передают их в аналитическую базу данных и в производственные системы — CRM и рекламные платформы. Как правило, Data-пайплайн включает в себя три элемента:
Извлечение (E) данных из исходной системы или приемника данных.
Преобразование (T) данных в определенную схему или модель данных.
Загрузка (L) данных в пункт назначения.
Раньше, когда хранение данных стоило дорого или очень дорого, было важно понимать, какие данные поступают в конечную цель, и преобразовывать их до того, как они туда попадут. Использовали подход ETL — «извлечение, преобразование, загрузка». Сейчас, когда можно сделать трансфер всех данных из всех источников в одно место и уже потом думать, что с этим делать, перешли на ELT — «извлечение, загрузка, преобразование».
Один из самых распространенных Data-пайплайнов — получение да��ных из облачного хранилища в Data Warehouse. Эти технологии хранения часто используют в качестве приемников данных, куда сбрасываются «мгновенные фотоснимки» виртуального диска — снапшоты — из производственных баз данных или экстракты из SaaS-систем для дальнейшей обработки. Когда Data-инженеру нужно проанализировать данные, он копирует их в Data Warehouse и готовит к использованию.
Представьте, что ваши аналитики хотят проанализировать эволюцию медленно меняющихся характеристик своих клиентов: где они живут, где работают, сколько у них детей и так далее. Один из способов провести такой анализ — делать ежедневный снапшот исходной базы данных клиентов, выгружая ее в Storage Bucket. Затем с помощью Data-пайплайна загрузить все данные снапшота из Storage Bucket в Data Warehouse.
Для организаций с миллионами клиентов объем этих данных может быстро вырасти до наборов данных с миллиардами строк. Meltano поможет быстро обработать весь массив данных благодаря инкрементальной записи: когда в источнике появляется что-то новое, Meltano сразу это дописывает в Data Warehouse.
Одно из самых популярных облачных решений для хранения и обработки больших объемов данных — это Google BigQuery. Загрузить туда данные можно множеством способов — от прямой загрузки CSV или Apache Parquet через веб-интерфейс до Python-клиента, который позволяет писать свои скрипты экстракции и загрузки данных в Google BigQuery.
В следующем примере рассмотрим, как загрузить данные из Яндекс.Директа в Google BigQuery, используя Meltano и инструменты Singer, Prefect и DBT.
Как написать тап для REST API, GraphQL, базы данных или другого источника
Meltano предоставляет свой набор для разработки, — Meltano SDK — который позволяет быстро и легко создавать свои собственные тапы под любой источник. Для каждого источника нужно указать его тип, REST API, GraphQL, SQL, указать метод авторизации и описать стримы. В случае с SQL это будут таблицы, с REST API — отдельные методы. Каждый стрим требует описания схемы данных.
Руководство по написанию своего тапа
Для начала нужно установить cookiecutter:
pip3 install pipx pipx ensurepath pipx install cookiecutter pipx install poetry
Далее скопировать шаблон для нового тапа:
cookiecutter https://github.com/meltano/sdk --directory="cookiecutter/tap-template"
Затем ответить на вопросы инсталлятора и завершить инициализацию тапа.
Теперь нам нужно переписать несколько классов:
YandexDirectStream в файле client.py
class YandexDirectStream(RESTStream): """YandexDirect stream class.""" url_base = "https://api.direct.yandex.com/json/v5" records_jsonpath = "$.result[*]" # Or override `parse_response`. next_page_token_jsonpath = "$.next_page" # Or override `get_next_page_token`. def validate_response(self, response): if response.status_code == 400: data = response.json() raise FatalAPIError(f"Error message found: {data['error']['error_string']} {data['error']['error_detail']}") super().validate_response(response) if response.status_code in [201, 202]: raise RetriableAPIError("The report is being generated in offline mode") try: data = response.json() if data.get("error"): raise FatalAPIError(f"Error message found: {data['error']['error_detail']}") except JSONDecodeError: if 200 <= response.status_code < 300: pass def backoff_max_tries(self) -> int: return 8
Обратите внимание на атрибуты url_base, которые нужно заменить в соответствии с URL вашего API, records_jsonpath и next_page_token — в соответствии со структурой возвращаемого ответа, и в случае Яндекс.Директ — метод validate_response.
В файле streams.py нужно создать классы, унаследованные от YandexDirectStream для всех эндпоинтов, с которых вы хотите получать данные. Для примера будем использовать https://api.direct.yandex.com/json/v5/campaigns.
class CampaignsStream(YandexDirectStream): """Define custom stream.""" name = "campaigns" path = "/campaigns" primary_keys = ["Id"] replication_key = None records_jsonpath = "$.result.Campaigns[*]" schema = th.PropertiesList( th.Property("Name", th.StringType), th.Property("Id", th.IntegerType, description="The user's system ID"), ).to_dict() def prepare_request_payload( self, context: Optional[dict], next_page_token: Optional[Any] ) -> Optional[dict]: data = { "method": "get", "params": {"SelectionCriteria": {}, "FieldNames": ["Id", "Name", "Status"]}, } return data
name — название Stream, оно будет отражено в названии таблицы после работы тапа.
path — название эндпоинта.
primary_keys — наб��р полей, которые будут уникальными для всей таблицы.
replication_key — поле, по которому будет производиться инкрементальная репликация.
records_jsonpath — путь к массиву с записями.
schema — описание схемы таблицы.
метод prepare_request_payload — payload, который нужно передать в соответствии с документацией API.
И наконец файл tap.py:
from typing import List from singer_sdk import Tap, Stream from singer_sdk import typing as th # JSON schema typing helpers # TODO: Import your custom stream types here: from tap_yandexdirect.streams import ( YandexDirectStream, CampaignsStream, ) STREAM_TYPES = [ CampaignsStream, ] class TapYandexDirect(Tap): """YandexDirect tap class.""" name = "tap-yandexdirect" # TODO: Update this section with the actual config values you expect: config_jsonschema = th.PropertiesList( th.Property( "access_token", th.StringType, required=True, description="The token to authenticate against the API service" ), th.Property( "start_date", th.DateTimeType, description="The earliest record date to sync" ), th.Property( "end_date", th.DateTimeType, description="The earliest record date to sync" ), ).to_dict() def discover_streams(self) -> List[Stream]: """Return a list of discovered streams.""" return [stream_class(tap=self) for stream_class in STREAM_TYPES]
Здесь нужно импортировать стримы из streams.py и указать, что нужно для конфигурации тапа.
Для запуска выполните команду:
poetry run tap-yandexdirect
Чтобы получить возможность использовать ваш тап в Meltano-проектах, добавьте его в репозиторий.
На выходе получаются необходимые данные из источника, отформатированные в соответствии со спецификацией Singer. Это позволяет загружать их в любой из предложенных источников — Google BigQuery, PostgreSQL, Microsoft Azure или любую другую платформу для хранения данных. Таким образом, единожды описав схему, мы автоматически получаем в��зможность загрузить эти данные куда угодно без дополнительного написания кода.
Создание и настройка проекта Meltano
Запустите проект Meltano с помощью команды init. Это основа вашей централизованной платформы обработки данных, которую мы будем развивать.
meltano init my-project
Затем добавьте необходимые плагины Meltano с помощью команды add. Плагины, которые вам нужны, — это tap-yandexdirect и target-bigquery. Добавьте tap-yandexdirect, используя флаг custom, и предоставьте следующие входные данные при появлении запроса:
Плагины, которые вам нужны, — это tap-yandexdirect и target-bigquery. Добавьте tap-yandexdirect, используя флаг custom, и предоставьте следующие входные данные при появлении запроса:
meltano add --custom extractor tap-yandexdirect # (namespace) [tap_yandexdirect]: <hit enter, no value needed> # (pip_url) [tap-yandexdirect]: git+https://github.com/epoch8/tap-yandexdirect.git # (executable) [pipelinewise-tap-s3-csv]: tap-yandexdirect # (capabilities) [[]]: properties,discover,state Затем добавьте target-bigquery: meltano add loader target-bigquery
Все плагины будут добавлены в файл meltano.yml. Для каждого плагина используйте команду invoke с флажком --help, которая вызовет плагин и напечатает его справочное сообщение. Если вы видите такое сообщение, значит, плагины установлены успешно:
meltano --log-level=debug invoke tap-yandexdirect --help meltano --log-level=debug invoke target-bigquery --help
Создание тапа Яндекс.Директ, Google BigQuery и их конфигурации
Для создания тапа Яндекс.Директа и его конфигураций нужно указать дату начала экстракции, дату конца экстракции и токен для доступа — его можно получить в личном кабинете в Яндекс.Директе.
REST API Яндекс.Директа предполагает использование отчетов с указанием необходимых полей. В нашем примере мы сделали сложные отчеты, но гибкость в описании тапов позволяет создавать конфигурируемые отчеты в случае необходимости.
Для минимальных конфигураций таргета Google BigQuery понадобится указать идентификатор проекта, идентификатор Data-сета, его местонахождение и путь к реквизитам для входа.
Для начала настройте yan tap-yandexdirect. С помощью Meltano CLI вы можете использовать следующие команды для настройки экстрактора и загрузчика:
start_date: Дата начала экстракции данных;
end_date: Дата окончания экстракции данных;
access_token: Ваш токен доступа к API yandex-direct.
meltano config tap-yandexdirect set start_date 2021-11-02 meltano config tap-yandexdirect set end_date 2022-01-01 meltano config tap-yandexdirect set access_token your-yandexdirect-token |
Если у вас возникли какие-либо проблемы при настройке экстрактора, попробуйте почитать «Устранение неполадок» или присоединяйтесь к сообществу Slack, — там можно задавать любые вопросы другим членам сообщества.
Затем выполните следующие команды и настройте загрузчик Bigquery:
meltano config target-bigquery set project_id {your_project_id} meltano config target-bigquery set dataset_id {your_dataset_id} meltano config target-bigquery set location {your_dataset_location} meltano config target-bigquery set credentials_path {your_path_to_service_account_credentials.json} |
Когда выполните шаги настройки с помощью CLI, meltano.yml обновится и будет выглядеть примерно так:
version: 1 default_environment: dev project_id: e564ac62-b9ed-41c5-8941-083c535b50eb plugins: extractors: - name: tap-yandexdirect namespace: tap_yandexdirect pip_url: git+https://github.com/epoch8/tap-yandexdirect.git executable: tap-yandexdirect capabilities: - state - discover - catalog config: access_token: your-access-token start_date: 2021-11-02 end_date: 2022-01-01 loaders: - name: target-bigquery variant: adswerve pip_url: git+https://github.com/adswerve/target-bigquery.git config: project_id: your_project_id dataset_id: your_dataset_id location: your_dataset_locations credentials_path: your_path_to_service_account_credentials environments: - name: dev - name: staging - name: prod
Запуск Data-пайплайна Meltano
Запуск пайплайна загрузки данных стартуется командой Meltano run, которая принимает два аргумента: название источника и название таргета. С помощью одной этой команды мы получаем загруженные данные.
Что дальше?
Если вы хотите изучить больше, попробуйте после загрузки данных в Bigquery выполнить преобразования с помощью плагина dbt, запланировать Data-пайплайн с помощью плагина Airflow orchestrator или объединить Data-пайплайн с другими экстракторами и загрузчиками с помощью команды run Meltano. Как только ваши данные преобразуются, включите в свой проект плагин Superset для анализа и создания информационных панелей.
