Как стать автором
Обновить

Пишем ETL-процесс на Python

Время на прочтение5 мин
Количество просмотров15K

Привет! Меня зовут Сергей Климов, я работаю Python-разработчиком, специализируюсь на написании бэкенда на фреймворке Django. Идея данной статьи появилась из-за желания написать простой и понятный ETL-процесс без итерации по спискам и прочей “вложенности”, попутно решая вопросы рационального использования ОЗУ. Уверен, что мои наработки будут полезны для питонистов, которые столкнулись с аналогичной задачей.

Зачем это надо

Для начала обозначим предметную область. Нас интересует ETL-процесс (extract, transform, load) реализованный через паттерн “Цепочка обязанностей”. Мы разработаем в качестве примера три обработчика, которые будут передавать данные последовательно из функции в функцию. Каждый последующий обработчик решает, может ли он обработать запрос сам и стоит ли передавать запрос дальше по цепи.

В рассматриваемом примере идёт выборка из БД. Но поскольку у нас Python, то мы можем написать код, который может работать с любым интерфейсом (будь то БД, API и всё, к чему найдем документацию). Обработчики могут парсить данные, анализировать их, разрабатывать билдеры различного формата документов. К услугам нашего etl-проекта весь потенциал Python.

Перед тем, как начать

Нарратив функциональный подхода просит нас написать что-то вроде

records = extract()
transform_records = transform(records)
load(transform_records)

Такой подход абсолютно рабочий и понятный. С тем недостатком, что от шага к шагу нам приходится "таскать" списки записей. Так, в примере выше для трех шагов, ОЗУ нам потребуется в два раза больше, чем занимают сами данные. Для 4-х шагов в 3 раза больше и т.д.

Идея данной статьи в том, чтобы передавать данные из функцию в функцию, не выходя за пределы их кода. Т.е. мы можем взять объект (или небольшую группу объектов) из источника, обработать его и передать в следующую функцию.

Реализацию данной идеи начнем с псевдокода:

extract(
    transform(
       load()
    )
)

Первый обработчик extract(). В качестве параметра он получает обработчик transform(), которому (после получения данных из источника) передаст объект для дальнейшей обработки. В свою очередь transform() после своей обработки должен передать объект в обработчик load(), который мы и указываем в качестве параметра. load()- финальный обработчик, поэтому он уже не получает параметров-функций.

Чтобы реализовать эту логику в коде нам требуется сохранять состояние между вызовами функций. В python такой особенностью обладают генераторы. Также функции должны быть вызывающими (или передающими), т.е. использовать метод send() для передачи данных в генератор.

Технически в вызывающей функции передача данных в следующий обработчик будет через метод send(). А в принимающей функции прием идет через выражение yield.

Также для примера обработки данных мы будем использовать распаковку кортежей, сопоставление match/case

Постановка задачи

Вымышленная задачка, главное “погонять” данные из функции в функцию.

В базе данных есть таблица, содержащая целые числа. ETL-процесс должен пройтись по всем записям таблицы, возвести каждое число в квадрат и отобразить в консоли. Для каждого четного числа вывести информационное сообщение "the square of an even number". Если число из базы данных равно 3, то никаких действий над ним мы не делаем.

Первая функция

Задача первого обработчика из “цепочки обязанностей” - получить и передать значение в генератор путем вызова метода send(). В нашем вымышленном примере мы делаем sql-запрос к таблице, содержащей числовые строки и по одной передаем их в генератор.

from collections.abc import Generator # используется для тайпингов
from typing import Tuple, Dict, Any

import psycopg2

from psycopg2.extras import DictCursor

SQL = """select id, number from etl.source"""

def extract(batch: Generator) -> None:

    dbs: Dict = dict(dbname='demo', user='sergei', password='sergei', host='localhost')
    with psycopg2.connect(**dbs) as connection:
        with connection.cursor(cursor_factory=DictCursor) as cursor:
            cursor.execute(SQL)
            record = cursor.fetchone()  # можно использовать fetchmany, чтобы извлекать данные "пачками"
            while record:
                batch.send(record)  # следим за тем, чтобы аргументом был итерируемый объект
                record = cursor.fetchone()

После выполнения метода batch.send(record) управление возвращается в нашу функцию - идём за следующей записью из нашей БД.

Вторая и все промежуточные функции

У всех промежуточных функций “цепочки обязанностей” три задачи - принять данные, обработать их и передать в следующую функцию. В продолжении нашего вымышленного примера:

def transform(batch: Generator) -> Generator[None, DictRow, None]:
    while record := (yield):  # получаем данные из генератора

        new_number = record["number"] ** 2
        if record["number"] % 2 == 0:
            foo = "an even number"
        elif record["number"] == 3:
            print("skip load stage")
            continue
        else:
            foo = 0

        batch.send((new_number, foo))  # передаем данные в следующий генератор

Обратите внимание, что yield в скобках предполагает, что будет получен итерируемый объект. Иначе вы словите исключение StopItearation. Ветками if/elif/else показано, что можно управлять наборами данных, которые будут направлены на следующий этап. А также через continue можно вообще прервать выполнение “цепочки обязанностей”.

Заключительная функция

Технически это функция из предыдущего раздела только без инструкции send(), т.к. на вход заключительного обработчика мы не получаем параметр-функцию. Завершаем нашу вымышленную цепочку:

def load() -> Generator[None, Tuple, None]:
    while subject := (yield):
        match subject:
            case (int(number), str(bar)):
                print("the square of", bar, number)
            case (int(number), int(bar)):
                print(number)
            case _:
                raise SyntaxError(f"Unknown structure of {subject=}")

Здесь мы управляем набором данных через match/case.

if/elif/else и match/case указаны в качестве примера обработки данных.

Собираем все вместе

Пытаемся запустить наш псевдокод:

extract(
    transform(
       load()
    )
)

В итоге ловим ошибку "не удается отправить значение, отличное от None, только что запущенному генератору":

TypeError: can't send non-None value to a just-started generator

Действительно, мы не позаботились о том, как получить значения из функции генератора. Воспользуемся методом next(). Вызов метода приводит к выполнению, что возвращает результат тому, кто делал вызов. Делаем рефакторинг кода выше:

unloads = load()
next(unloads)  # внутри transform именно unloads делал вызов send
multiplication = transform(unloads)
next(multiplication)  # внутри extract именно multiplication делал вызов send
extract(multiplication)

Такой код несколько сбивает с толку, т.к. кажется, что читать его надо “снизу вверх”.

Декоратор

Давайте уберём копипасту из фрагмента кода выше в виде повторяющегося вызова next(). Для этого напишем декоратор для наших функций генераторов.

def coroutine(func):
    @wraps(func)
    def inner(*args:tuple[Any, ...], **kwargs: dict[str, Any]) -> Generator:
        fn: Generator = func(*args, **kwargs)
        next(fn)
        return fn

    return inner

В декоратор оборачиваем все функции-обработчики, кроме первой:

@coroutine
def transform(batch: Generator) -> Generator[None, DictRow, None]:
  ...

@coroutine
def load() -> Generator[None, Tuple, None]:
  ...

Итоговый вариант кода запуска etl-процесса:

unloads = load()
multiplication = transform(unloads)
extract(multiplication)

Что далее

При первом знакомстве с кодом возникает ощущение его асинхронности (спасибо декоратору с говорящим названием coroutine). Но это не так. Все действия выполняются последовательно.

В следующий раз мы поработаем над расписанием запуска etl-процессов.

Желаю успехов и с удовольствием отвечу на ваши вопросы.

Репозиторий с кодом доступен по ссылке.

Теги:
Хабы:
Всего голосов 6: ↑5 и ↓1+5
Комментарии7

Публикации

Истории

Работа

Data Scientist
77 вакансий
Python разработчик
133 вакансии

Ближайшие события

27 августа – 7 октября
Премия digital-кейсов «Проксима»
МоскваОнлайн
11 сентября
Митап по BigData от Честного ЗНАКа
Санкт-ПетербургОнлайн
14 сентября
Конференция Practical ML Conf
МоскваОнлайн
19 сентября
CDI Conf 2024
Москва
20 – 22 сентября
BCI Hack Moscow
Москва
24 сентября
Конференция Fin.Bot 2024
МоскваОнлайн
25 сентября
Конференция Yandex Scale 2024
МоскваОнлайн
28 – 29 сентября
Конференция E-CODE
МоскваОнлайн
28 сентября – 5 октября
О! Хакатон
Онлайн
30 сентября – 1 октября
Конференция фронтенд-разработчиков FrontendConf 2024
МоскваОнлайн