Вы не задумываетесь над процессами, а фокусируетесь на решении задач в сжатые сроки, только когда вы аналитик в стартапе или маленькой команде. Но после первых успехов хочется оглянуться назад и наладить процессы, почистить библиотеку артефактов и подтянуть качество. Особенно когда команда стремительно растет. Непрозрачная структура тяжело поддается управлению и не позволяет быстро обучать сотрудников.
Меня зовут Елдос, я Big Data аналитик, и сегодня я расскажу о том, как команда Fintech из службы Big Data аналитики и машинного обучения Beeline Казахстан создала среду для совместной работы, связала используемые инструменты одним ключом, обеспечила централизованное хранение кода в Git и подсчетов в HDFS, и тем самым обеспечила воспроизводимость экспериментов.
Зачем нам это
Предположим, в команде из пяти аналитиков каждый пишет код, делает анализ и строит модели обособленно. Остальные члены команды не имеют доступа к этой работе, поэтому не проверяют качество результата и не воспроизводят эксперимент. Если вы захотите оптимизировать витрину или модель вашего коллеги, то потратите больше времени на ознакомление и передачу кода, чем на саму задачу. Это особенно сложно, если нужная модель не попала в продуктив или сотрудник в отпуске или уволился.
Что такое воспроизводимость экспериментов?
В разработке программного обеспечения и анализе больших данных эксперимент проводится, чтобы подтвердить или опровергнуть гипотезы. В него входит исходный код, Jupyter Notebooks (Jupyter-блокноты или ноутбуки), таблицы, выгрузки и другие артефакты.
Воспроизводимость эксперимента – это его способность воспроизводить одинаковые или близкие результаты при повторном проведении эксперимента в аналогичных условиях.
Чтобы обеспечить воспроизводимость экспериментов/гипотезы, наши команды Big Data аналитики и машинного обучения следуют трем правилам:
код, анализ, данные и артефакты эксперимента хранятся централизованно;
код/Jupyter-блокноты последовательны и воспроизводимы;
завершенные эксперименты не изменяются. Для переиспользования данных обращаемся к ним из другого эксперимента.
К централизации экспериментов нас привела необходимость в коллаборативной среде, где команда работает совместно и ежедневно обменивается знаниями и результатами.
Для этого при проверке гипотезы создается задача в Jira, которая агрегирует проделанную работу и связывает все компоненты: исходный код, анализ которого находится в Git-репозитории, а входные данные и результаты вычислений – в распределенной файловой системе HDFS.
Централизация исходного кода
Каждая команда аналитиков разных проектов работает в своем репозитории Git со структурой каталогов, которая соответствует основным направлениям и их подпроектам.
Стабильный протестированный анализ/код эксперимента хранится в основной master-ветке.
Поэтому всегда в начале работы над экспериментом делаем копирование изменений Pull Request из master-ветки в локальный master, и из него уже создаем новую ветку эксперимента. Название ветки эксперимента соответствует номеру задачи в Jira и написано в стиле kebab-case, что автоматически связывает ветку и задачу в Jira. Затем в папке соответствующего проекта создаем каталог эксперимента с названием ветки.
После завершения проверки мини-гипотез заливаем изменения с помощью commit в локальную ветку эксперимента, а в конце рабочего дня отправляем все коммиты в удаленный репозиторий командой “push”.
После того как задача завершена и протестирована локально, запрашиваем Merge Request, в котором настроены правила для проверки качества кода (code review) другими участниками команды. Изменения попадают в master-ветку только после проверки и одобрения кода двумя проверяющими.
Merge Request должны быть понятными и развернутыми, важно подсветить следующее:
Какая была цель?
Какая гипотеза?
Что было протестировано / сделано?
Какие результаты и метрики качества?
Какие сложности были и какие есть возможные доработки?
На что стоит обратить внимание?
Работа с Jupyter Notebook
Ноутбуки эксперимента вместе с историей изменений также хранятся в Git-репозитории и доступны в html-формате для просмотра и изучения аналитики проекта.
Наша команда использует следующие рекомендации при работе с Jupyter-блокнотами:
Блокноты эксперимента пронумерованы, и, если запускать их один за другим, результат выполнения будет соответствовать исходному запуску полного эксперимента.
Такое же требование применяется к ячейкам кода ноутбука. Если запустить все ячейки последовательно, результат будет тем же, что и при запуске всего блокнота.
Помимо воспроизводимости результатов, код сопровождается описательными ячейками Markdown, чтобы коллеги быстрее ориентировались в блокноте.
Применяя эти правила как часть совместной работы, мы обеспечиваем воспроизводимость экспериментов.
Централизация входных данных и результатов
Для хранения и управления витринами данных создаем отдельную папку в командном каталоге системы хранения данных HDFS. Название папки содержит номер задачи (Jira).
Для простоты управления мы связали все компоненты эксперимента (код, данные, задача) единым ключом (номера задачи в Jira). В роли коннектора между HDFS и Python-кодом выступает самописный класс Task, который создан для облегчения работы с данными.
Как связать все компоненты эксперимента единым ключом
Все, что нужно для такой связи, – инициализировать класс Task с основным атрибутом (номером задачи в Jira) и создать экземпляр этого класса. Все дальнейшие действия производятся через созданный экземпляр – с необходимыми для комфортной работы методами:
ls
write
read
get_path
Метод “ls” универсальный. С ним можно работать и со стандартным HDFS dfs -ls, или, к примеру, вывести список разделов той или иной директории/таблицы.
Метод “write” записывает просчеты в виде датамартов и имеет следующие аргументы:
df – объект датафрейм (Spark DataFrame);
name – название сущности/датамарта;
Partition By – поле, по которому ведется партиционирование набора данных;
repartition – перераспределение данных;
mode – ‘overwrite’ или ‘append’;
CSV – по умолчанию ‘False’, но можно включить запись CSV-файлов.
Метод “read” предназначен для считывания сущности/датамарта в папке эксперимента и имеет следующие аргументы:
instance – название датамарта в Task или неполный путь до референса в формате 'fin-000/name'.
date – дата чтения. Если даты нет, датамарт считается целиком (все партиции).
fmt – маска даты. Принимает формат Python-масок. Если его нет, то берутся 5 рандомных партиций из источника и автоматически идет подбор формата дат.
key – ключ партиционирования. Если его нет, то берутся 5 рандомных партиций из источника и автоматически идет подбор ключа партиционирования.
dates – даты чтения диапазона. Если какие-то даты из диапазона не были найдены, то прочитается только то, что есть. И отобразится warning о том, что какие-то партиции не загрузились. Список незагрузившихся дат будет в warning.
CSV – индикатор считывания CSV.
Ниже приведены примеры использования метода “read”:
>>>fin_000 = Task("fin-000", spark)
Чтение всего пути
>>>fin_000.read("geo_features")
#'/fin_tech/tasks/fin_000/geo_features'
Чтение определенного формата во всем пути
>>>fin_000.read("geo_features", "*", "*.parq")
#'/fin_tech/tasks/fin_000/geo_features/*/*.parq/'
Чтение определённой патриции
>>>fin_000.read("geo_features", TIME_KEY='2020-01-01')
#'/fin_tech/tasks/fin_000/geo_features/TIME_KEY=2020-01-01'
Чтение определённой даты с заданным типом даты
```
fin_000.read(
"geo_features",
date = datetime(2020, 1, 1),
fmt = "%Y-%m"
)
```
#'/fin_tech/tasks/fin_000/geo_features/2020-01'
Чтение определённой даты с заданным типом даты и партиции
>>>fin_000.read("geo_features", date=datetime(2020, 1, 1), fmt="%Y%m", key="MONTH_KEY")
#'/fin_tech/tasks/fin_000/geo_features/MONTH_KEY=202001'
Чтение списка дат
>>>fin_000.read("geo_features", dates=pd.date_range("2020-01-01", "2020-09-01", freq="MS"))
#'/fin_tech/tasks/fin_000/geo_features/{01-2020,02-2020,03-2020,04-2020,05-2020,06-2020,07-2020,08-2020,09-2020}'
Чтение списка дат с определённым типом дат
>>>fin_000.read("geo_features", dates=pd.date_range("2020-01-01", "2020-10-01", freq="MS"), fmt="%m-%Y"))
#'/fin_tech/tasks/fin_000/geo_features/{01-2020,02-2020,03-2020,04-2020,05-2020,06-2020,07-2020,08-2020,09-2020,10-2020}'
Как переиспользовать данные
В ситуациях, когда проверяем несколько гипотез на одних и тех же данных, удобнее переиспользовать витрины данных или модели завершенных экспериментов без их пересчета. Мы создаем референс на результаты закрытых задач, чтобы не забивать HDFS дубликатами.
Предположим, fin-000 – это старый эксперимент. В его HDFS-каталоге хранится несколько таблиц, включая suspended_inactive_abons, которая содержит 4 партиции. В новом эксперименте fin-999 мы переиспользуем партиции таблицы, но при этом для завершения эксперимента не хватает разделов partition_5 и partition_6.
По нашим правилам, нельзя дописывать новые партиции в закрытую задачу fin-000, чтобы не нарушить ее воспроизводимость. Поэтому в папке задачи fin-999 создается каталог fin-000 с пустой подпапкой suspended_inactive_abons, а затем в рамках нового эксперимента запускается код fin-000 для создания новых разделов partition_5 и partition_6.
Для примера мы допустили ошибку и снова просчитали уже существующий partition_4. При считывании в рамках коннектора fin-999 используется метод “read” со ссылкой на старую задачу fin-000.
Ридер считывает все партиции из старой и новой задач, а затем объединяет (union) результаты считывания. Дубликат не принимается во внимание при считывании. Необходимо вручную удалить раздел из нового эксперимента.
Так мы автоматизировали коннектор между завершенными и новыми экспериментами, а также создание необходимых каталогов и подкаталогов для переиспользования данных.
Ближайшие планы
Первая задача, которая стоит перед командой, это разработка пайплайнов, которые будут работать с пайплайнами DVC, что позволит:
автоматизировать и упорядочить процессы предварительной обработки данных;
сохранить время и ментальное здоровье аналитика;
обеспечить совпадение просчетов в продуктивной и рабочей средах за счет сериализации в среде разработки и десериализации в продуктивной среде;
сократить появление ошибок при переносе модели и исключить дополнительную поддержку кода в продуктивной среде.
Следующая задача – реализация автоматических smoke-тестов в последнем стейдже пайплайна в CI/CD. Проверка будет состоять в сравнении скоров модели, полученных при прогоне модели в локальных и продуктовых средах на одной и той же валидационной выборке. Близкие к идентичным результаты подтверждают, что модель, прошедшая дополнительную обработку при установке в продуктив, идентична тестовой модели. Примером обработок служит замена нулов, замена функций агрегаций, конвертация из PySpark датафрейма в Pandas и наоборот, в процессе которой модель может «поехать» из-за применения специфических методов и функций каждого инструмента. К примеру, типы данных PySpark могут не совпадать с типами данных Pandas или LGBM.
Воспроизводимость пайплайна, который состоит из исходного кода, обеспечивает инструмент DVC. Он отлично справляется с версионированием пайплайнов и данных, позволяет запускать код в правильном порядке, логировать изменения в данных/коде. И, благодаря механизму хэширования и логирования мета-данных, он дает нам полную уверенность в том, что эксперимент полностью воспроизводим. При повторном прогоне пайплайна без изменений, DVC сигнализирует, что никаких правок в коде и в данных не было и что надобности запускать пайплайн снова нет. Остается добавить этот тест-кейс в CI/CD.
В наши цели также входит автоматическая проверка блокнотов на воспроизводимость в CI/CD. Мы уже рассказали о правиле последовательности блокнотов при последовательном запуске ячеек одного блокнота и запуске нескольких блокнотов. Но мы пойдем дальше и сделаем надстройку над CI/CD, которая будет несколько раз последовательно запускать ноутбуки в некотором обособленном окружении, а затем сравнивать созданные логи, CSV-файлы и другие артефакты. Совпадение артефактов будет свидетельствовать о воспроизводимости блокнотов.
Будет отлично, если в комментариях вы расскажете, как выстроен workflow в вашей компании. А еще круче – поделитесь своими лучшими практиками того, как вы достигаете полной воспроизводимости экспериментов.