Создание пайплайнов для трансфера данных — рутинная задача Data-инженеров. Чтобы ее решить, многие копируют код коннекторов из одного проекта в другой. Из-за копипаста общая структура ломается, и в перспективе может возникнуть трудность с поддержкой проекта.

Источников данных много — Яндекс.Директ, Google Analytics и другие. По отдельности они не дают нужной картины, — данные всё равно приходится собирать в один Data Warehouse. Тут на помощь приходит Meltano: он позволяет стандартизировать написание коннекторов к различным источникам данных и быстро перенести все нужные данные.

Data-пайплайны

Большинство компаний данные централизуют: это помогает упростить управление данными и работать с общей картиной. Data-аналитики получают доступ к централизованным данным через специализированные аналитические базы данных — Data Lakes — и хранилища данных — Data Warehouse.

Чтобы получить данные, инженеры настраивают и поддерживают Data-пайплайны: берут необработанные данные исходной системы или приемника данных и передают их в аналитическую базу данных и в производственные системы — CRM и рекламные платформы. Как правило, Data-пайплайн включает в себя три элемента:

  1. Извлечение (E) данных из исходной системы или приемника данных.

  2. Преобразование (T) данных в определенную схему или модель данных.

  3. Загрузка (L) данных в пункт назначения.

Раньше, когда хранение данных стоило дорого или очень дорого, было важно понимать, какие данные поступают в конечную цель, и преобразовывать их до того, как они туда попадут. Использовали подход ETL — «извлечение, преобразование, загрузка». Сейчас, когда можно сделать трансфер всех данных из всех источников в одно место и уже потом думать, что с этим делать, перешли на ELT — «извлечение, загрузка, преобразование».

Один из самых распространенных Data-пайплайнов — получение да��ных из облачного хранилища в Data Warehouse. Эти технологии хранения часто используют в качестве приемников данных, куда сбрасываются «мгновенные фотоснимки» виртуального диска — снапшоты — из производственных баз данных или экстракты из SaaS-систем для дальнейшей обработки. Когда Data-инженеру нужно проанализировать данные, он копирует их в Data Warehouse и готовит к использованию. 

Представьте, что ваши аналитики хотят проанализировать эволюцию медленно меняющихся характеристик своих клиентов: где они живут, где работают, сколько у них детей и так далее. Один из способов провести такой анализ — делать ежедневный снапшот исходной базы данных клиентов, выгружая ее в Storage Bucket. Затем с помощью Data-пайплайна загрузить все данные снапшота из Storage Bucket в Data Warehouse. 


Для организаций с миллионами клиентов объем этих данных может быстро вырасти до наборов данных с миллиардами строк. Meltano поможет быстро обработать весь массив данных благодаря инкрементальной записи: когда в источнике появляется что-то новое, Meltano сразу это дописывает в Data Warehouse.

Одно из самых популярных облачных решений для хранения и обработки больших объемов данных — это Google BigQuery. Загрузить туда данные можно множеством способов — от прямой загрузки CSV или Apache Parquet через веб-интерфейс до Python-клиента, который позволяет писать свои скрипты экстракции и загрузки данных в Google BigQuery.

В следующем примере рассмотрим, как загрузить данные из Яндекс.Директа в Google BigQuery, используя Meltano и инструменты Singer, Prefect и DBT.

Как написать тап для REST API, GraphQL, базы данных или другого источника

Meltano предоставляет свой набор для разработки, — Meltano SDK — который позволяет быстро и легко создавать свои собственные тапы под любой источник. Для каждого источника нужно указать его тип, REST API, GraphQL, SQL, указать метод авторизации и описать стримы. В случае с SQL это будут таблицы, с REST API — отдельные методы. Каждый стрим требует описания схемы данных.

Руководство по написанию своего тапа

Для начала нужно установить cookiecutter:

pip3 install pipx
pipx ensurepath
pipx install cookiecutter
pipx install poetry

Далее скопировать шаблон для нового тапа:

cookiecutter https://github.com/meltano/sdk --directory="cookiecutter/tap-template"

Затем ответить на вопросы инсталлятора и завершить инициализацию тапа.


Теперь нам нужно переписать несколько классов:

YandexDirectStream в файле client.py

class YandexDirectStream(RESTStream):
   """YandexDirect stream class."""

   url_base = "https://api.direct.yandex.com/json/v5"

   records_jsonpath = "$.result[*]"  # Or override `parse_response`.
   next_page_token_jsonpath = "$.next_page"  # Or override `get_next_page_token`.
   def validate_response(self, response):
       if response.status_code == 400:
           data = response.json()
           raise FatalAPIError(f"Error message found: {data['error']['error_string']} {data['error']['error_detail']}")
       super().validate_response(response)

       if response.status_code in [201, 202]:
           raise RetriableAPIError("The report is being generated in offline mode")
       try:
           data = response.json()
           if data.get("error"):
                   raise FatalAPIError(f"Error message found: {data['error']['error_detail']}")
       except JSONDecodeError:
           if 200 <= response.status_code < 300:
               pass
  
   def backoff_max_tries(self) -> int:
       return 8

Обратите внимание на атрибуты url_base, которые нужно заменить в соответствии с URL вашего API, records_jsonpath и next_page_token — в соответствии со структурой возвращаемого ответа, и в случае Яндекс.Директ — метод validate_response.

В файле streams.py нужно создать классы, унаследованные от YandexDirectStream для всех эндпоинтов, с которых вы хотите получать данные. Для примера будем использовать https://api.direct.yandex.com/json/v5/campaigns.

class CampaignsStream(YandexDirectStream):
   """Define custom stream."""

   name = "campaigns"
   path = "/campaigns"
   primary_keys = ["Id"]
   replication_key = None
   records_jsonpath = "$.result.Campaigns[*]"
   schema = th.PropertiesList(
       th.Property("Name", th.StringType),
       th.Property("Id", th.IntegerType, description="The user's system ID"),
   ).to_dict()

   def prepare_request_payload(
       self, context: Optional[dict], next_page_token: Optional[Any]
   ) -> Optional[dict]:
       data = {
           "method": "get",
           "params": {"SelectionCriteria": {}, "FieldNames": ["Id", "Name", "Status"]},
       }
       return data

name — название Stream, оно будет отражено в названии таблицы после работы тапа.

path — название эндпоинта.

primary_keys — наб��р полей, которые будут уникальными для всей таблицы.

replication_key — поле, по которому будет производиться инкрементальная репликация.

records_jsonpath — путь к массиву с записями.

schema — описание схемы таблицы.

метод prepare_request_payload — payload, который нужно передать в соответствии с документацией API.

И наконец файл tap.py:

from typing import List

from singer_sdk import Tap, Stream
from singer_sdk import typing as th  # JSON schema typing helpers
# TODO: Import your custom stream types here:
from tap_yandexdirect.streams import (
   YandexDirectStream,
   CampaignsStream,
)

STREAM_TYPES = [
   CampaignsStream,
]


class TapYandexDirect(Tap):
   """YandexDirect tap class."""
   name = "tap-yandexdirect"

   # TODO: Update this section with the actual config values you expect:
   config_jsonschema = th.PropertiesList(
       th.Property(
           "access_token",
           th.StringType,
           required=True,
           description="The token to authenticate against the API service"
       ),
       th.Property(
           "start_date",
           th.DateTimeType,
           description="The earliest record date to sync"
       ),
       th.Property(
           "end_date",
           th.DateTimeType,
           description="The earliest record date to sync"
       ),
   ).to_dict()

   def discover_streams(self) -> List[Stream]:
       """Return a list of discovered streams."""
       return [stream_class(tap=self) for stream_class in STREAM_TYPES]

Здесь нужно импортировать стримы из streams.py и указать, что нужно для конфигурации тапа.

Для запуска выполните команду:

poetry run tap-yandexdirect

Чтобы получить возможность использовать ваш тап в Meltano-проектах, добавьте его в репозиторий.

На выходе получаются необходимые данные из источника, отформатированные в соответствии со спецификацией Singer. Это позволяет загружать их в любой из предложенных источников — Google BigQuery, PostgreSQL, Microsoft Azure или любую другую платформу для хранения данных. Таким образом, единожды описав схему, мы автоматически получаем в��зможность загрузить эти данные куда угодно без дополнительного написания кода.

Создание и настройка проекта Meltano

Запустите проект Meltano с помощью команды init. Это основа вашей централизованной платформы обработки данных, которую мы будем развивать.

meltano init my-project

Затем добавьте необходимые плагины Meltano с помощью команды add. Плагины, которые вам нужны, — это tap-yandexdirect и target-bigquery. Добавьте tap-yandexdirect, используя флаг custom, и предоставьте следующие входные данные при появлении запроса:

Плагины, которые вам нужны, — это tap-yandexdirect и target-bigquery. Добавьте tap-yandexdirect, используя флаг custom, и предоставьте следующие входные данные при появлении запроса:

meltano add --custom extractor tap-yandexdirect 
# (namespace) [tap_yandexdirect]: <hit enter, no value needed>
# (pip_url) [tap-yandexdirect]: git+https://github.com/epoch8/tap-yandexdirect.git 
# (executable) [pipelinewise-tap-s3-csv]: tap-yandexdirect
# (capabilities) [[]]: properties,discover,state
Затем добавьте target-bigquery:
meltano add loader target-bigquery


Все плагины будут добавлены в файл meltano.yml. Для каждого плагина используйте команду invoke с флажком --help, которая вызовет плагин и напечатает его справочное сообщение. Если вы видите такое сообщение, значит, плагины установлены успешно:

meltano --log-level=debug invoke tap-yandexdirect --help 
meltano --log-level=debug invoke target-bigquery --help

Создание тапа Яндекс.Директ, Google BigQuery и их конфигурации

Для создания тапа Яндекс.Директа и его конфигураций нужно указать дату начала экстракции, дату конца экстракции и токен для доступа — его можно получить в личном кабинете в Яндекс.Директе.

REST API Яндекс.Директа предполагает использование отчетов с указанием необходимых полей. В нашем примере мы сделали сложные отчеты, но гибкость в описании тапов позволяет создавать конфигурируемые отчеты в случае необходимости.

Для минимальных конфигураций таргета Google BigQuery понадобится указать идентификатор проекта, идентификатор Data-сета, его местонахождение и путь к реквизитам для входа.

Для начала настройте yan tap-yandexdirect. С помощью Meltano CLI вы можете использовать следующие команды для настройки экстрактора и загрузчика:

  • start_date: Дата начала экстракции данных;

  • end_date: Дата окончания экстракции данных;

  • access_token: Ваш токен доступа к API yandex-direct.

meltano config tap-yandexdirect set start_date 2021-11-02 

meltano config  tap-yandexdirect set end_date 2022-01-01

meltano config  tap-yandexdirect set access_token your-yandexdirect-token

Если у вас возникли какие-либо проблемы при настройке экстрактора, попробуйте почитать «Устранение неполадок» или присоединяйтесь к сообществу Slack, — там можно задавать любые вопросы другим членам сообщества.

Затем выполните следующие команды и настройте загрузчик Bigquery:

meltano config target-bigquery set project_id {your_project_id}

meltano config target-bigquery set dataset_id {your_dataset_id}

meltano config target-bigquery set location {your_dataset_location}

meltano config target-bigquery set credentials_path {your_path_to_service_account_credentials.json}

Когда выполните шаги настройки с помощью CLI, meltano.yml обновится и будет выглядеть примерно так:

version: 1
default_environment: dev
project_id: e564ac62-b9ed-41c5-8941-083c535b50eb
plugins:
 extractors:
 - name: tap-yandexdirect
   namespace: tap_yandexdirect
   pip_url: git+https://github.com/epoch8/tap-yandexdirect.git
   executable: tap-yandexdirect
   capabilities:
   - state
   - discover
   - catalog
   config:
     access_token: your-access-token
     start_date: 2021-11-02
     end_date: 2022-01-01
 loaders:
 - name: target-bigquery
   variant: adswerve
   pip_url: git+https://github.com/adswerve/target-bigquery.git
   config:
     project_id: your_project_id
     dataset_id: your_dataset_id
     location: your_dataset_locations
     credentials_path: your_path_to_service_account_credentials

environments:
- name: dev
- name: staging
- name: prod

Запуск Data-пайплайна Meltano

Запуск пайплайна загрузки данных стартуется командой Meltano run, которая принимает два аргумента: название источника и название таргета. С помощью одной этой команды мы получаем загруженные данные.

Что дальше?

Если вы хотите изучить больше, попробуйте после загрузки данных в Bigquery выполнить преобразования с помощью плагина dbt, запланировать Data-пайплайн с помощью плагина Airflow orchestrator или объединить Data-пайплайн с другими экстракторами и загрузчиками с помощью команды run Meltano. Как только ваши данные преобразуются, включите в свой проект плагин Superset для анализа и создания информационных панелей.