Disclaimer

Статья предполает, что вы знаете что такое Databricks, что такое ноутбуки, кластера и джобы (воркфлоус) в нём. Поиск по хабру предлагает вот эту статью для ознакомления.

Вступление

Я продолжаю серию статей, где анализирую свой текущий проект в области BigData. Первая статья была посвящена найму питонистов, вторая - проблемам внедрения новых процессов в распределённых командах. Однако помимо чисто управленческих задач, в проекте есть ряд технических сложностей. Одной из таких сложностей является тестирование платформы обработки данных.

Если с тестированием более привычных программных продуктов более-менее ясно, то вот с BigData возникает множество вопросов. Если у вас Java - у вас есть как минимум JUnit, а абсолютное большинство фреймворков заботятся о простоте тестирования. Например Spring посвящает этому очень много документации. Тестирование фронтенда тоже хорошо проработано: от Selenium до JestJs. Тестировать блокчейн и смарт-контракты одно удовольствие (хотя бы на Ethereum сети благодаря Truffle Suite)

На Питоне тоже есть свои фреймворки для тестирования и вопрос этот вполне себе проработан. Даже сам Databricks, на базе которого построена наша платформа обработки данных, предлагает свои пути для тестирования. Например вот неплохой официальный гайд по тестированию: Unit testing for notebooks. Однако нужно иметь репозиторий внутри самого Databricks. А у нас код хранится в корпоративном GitLab, который недоступен из нашего же Databricks. Очень неудобно, но отдел безопасност�� не даёт разрешения настроить доступ из внешнего ресурса во внутреннюю сеть.

Есть возможность по тестированию прямо внутри ноутбуков Databricks: Test Databricks notebooks. Из минусов: сложность в организации кода, урезанный импорт ноутбуков внутри Databricks.

Мы сделали несколько попыток тестировать нашу платформу и так, и сяк, но всё выходило весьма "велосипедно". И тут мы наткнулись и решили попробовать Nutter Framework. То, что он обещал очень подходило для наших потребностей. Респект Microsoft за такой тул в opensource.

Nutter: очень краткое руководство

Главная цель данного фреймворка - дать возможность легко и быстро тестировать ноутбуки в Databricks. Nutter предлагает определённый подход к написанию тестов и несколько подходов к их выполнению.

Самый простой тест

Во-первых, чтобы начать работать с тестами, необходимо поставить библиотеку nutter на тот кластер, где будут выполняться тестовые ноутбуки:

Далее создаём ноутбук, импортируем базовый класс NutterFixture и начинаем писать тестовый класс:

from runtime.nutterfixture import NutterFixture, tag
class FirstTestFixture(NutterFixture):
    def run_test(self):
        dbutils.notebook.run('notebook_to_test', 600)
    def assertion_test(self):
        assert True
    def run_test_secundo(self):
        dbutils.notebook.run('another_notebook_to_test', 600)
    def assertion_test_secundo(self):
        assert True

Чтобы выполнить и увидеть результаты выполнения тестов, запускаем сам тестовый класс:

result = FirstTestFixture().execute_tests()
print(result.to_string())

Тесты можно запускать через коммандную строку (см. ниже как это сделать). И чтобы вернуть результаты выполнения тестов в nutter-cli, нужно выполнить в конце ноутбука:

result.exit(dbutils)

Однако есть неприятное ограничение: внутри result.exit вызывает dbutils.notebook.exit() , что приводит к тому, что Databricks прячет все выводы от команд print . Поэтому если запускать тесты прямо ноутбуках, строчку с exit нужно закомментировать.

Правила именования тестов

Внутри одного тест-класса может быть несколько тестов. Каждый тест состоит из 1 обязательного метода и 3 дополнительных:

  • before_(testname) - выполнятеся перед run Используется для настройки тестов и выполнения подготовительных действий. Необязателен.

  • run_(testname) - выполняется после before (если он имеется) или первым. Тут должны быть действия, которые непосредственно тестируются, например вызов ноутбука. Необязателен.

  • assertion_(testname) - выполняется после run (если он есть). Содержит проверки состояний. Можно использовать assert из любых питоновских тестовых библиотек. Каждый тест-класс должен содержать хотя бы 1 assertion-метод.

  • after_(testname) - выполняется после assertion. Обычно используется, чтобы вернуть состояние тестовых объектов в исходное или "почистить" что-то после выполнения теста.

Пример: методы run_checkpoint_location_generation и assertion_checkpoint_location_generation будут трактоваться как 1 тест-кейс.

Дополнительно есть два метода:

  • before_all

  • after_all

Они выполняются соответственно до и после всех тестов. Если вы хотите использовать несколько о��дельных assertion на 1 тест-кейс, тогда нужно использовать before_all и для подготовки теста и для вызова тестируемых действий.

from runtime.nutterfixture import NutterFixture, tag
class MultiTestFixture(NutterFixture):
  def before_all(self):
    dbutils.notebook.run('notebook_under_test', 600, args) 
    #...
  def assertion_test_case_1(self):
    #...
  def assertion_test_case_2(self):
    #...
  def after_all(self):
    #... 

Nutter гарантирует выполнение тестов в алфавитном порядке на основе имени тест-кейса.

Также Nutter позволяет сохранять состояние и передавать данные между тест-кейсами через параметры конструктора:

 class TestFixture(NutterFixture):
  def __init__(self):
    self.file = '/data/myfile'
    NutterFixture.__init__(self)

Паралелльный запуск тестов

Когда тестов много, то хочется запускать их в несколько потоков и сократить время выполнения. Последняя версия Nutter`а (0.1.35) позволяет это сделать с помощью NutterFixtureParallelRunner :

from runtime.runner import NutterFixtureParallelRunner
#...
parallel_runner = NutterFixtureParallelRunner(num_of_workers=2)
parallel_runner.add_test_fixture(FirstTestFixture())
parallel_runner.add_test_fixture(AnotherTestFixture())
result = parallel_runner.execute()
print(result.to_string())

Резульаты тестов объединяются и показываются в удобном виде:

Notebook: N/A - Lifecycle State: N/A, Result: N/A
Run Page URL: N/A
============================================================
PASSING TESTS
------------------------------------------------------------
test_another_case (40.98681362399998 seconds)
test (40.99089991400001 seconds)
test_secundo (10.680228194999927 seconds)


============================================================

Nutter CLI

Одна из главных фишек Nutter`а - это возможность запускать тесты из командной строки.

Для начала установим Nutter через pip:

$ pip install nutter

Затем зададим 2 переменных окружения, чтобы предоставить доступ Databricks:

Linux

export DATABRICKS_HOST=<HOST>
export DATABRICKS_TOKEN=<TOKEN>

Windows PowerShell

$env:DATABRICKS_HOST="HOST"
$env:DATABRICKS_TOKEN="TOKEN"

Для начала можно показать какие тесты могут быть исполнены:

 $ nutter list /common/test/nutter

Получаем что-то такое:

Nutter Version 0.1.35
++++++++++++++++++++++++++++++++++++++++++++++++++

--> Looking for tests in /common/test/nutter
--> 3 tests found

Tests Found
-------------------------------------------------------
Name:   test_StreamDeltaTransformation_multi
Path:   /common/test/nutter/test_StreamDeltaTransformation_multi

Name:   test_RefineryFromJsonMerge_single
Path:   /common/test/nutter/test_RefineryFromJsonMerge_single

Name:   test_StreamDeltaTransformation_single
Path:   /common/test/nutter/test_StreamDeltaTransformation_single

-------------------------------------------------------

Total: 3

А теперь запустим какой-то конкретный тест:

 $ nutter run /common/test/nutter/test_StreamDeltaTransformation_single --cluster_id '0000-099999-abcdabcd'

Эта команда запускает конкретный тест-класс на определённом кластере (на кластере должна быть установлены библиотека nutter)

Чтобы запустить все тест-классы, можно использовать следующую команду

$ nutter run /common/test/nutter/ --cluster_id 0123-12334-tonedabc --recursive 

Имена тестов должны начинаться с test_

Флаг --recursiveобеспечивает поиск тестовых классов рекурсивно во всех поддиректориях.

Передавать какие-то параметры можно через опцию --notebook_params:

$ nutter run /common/test/nutter/* --cluster_id 0123-12334-tonedabc --notebook_params "{\"example_key_1\": \"example_value_1\", \"example_key_2\": \"example_value_2\"}"

Пример реального тест-класса

Спасибо @raukasky за написание конкретных тест-классов для сервисов платформы.

# команда ниже импортирует длинные yaml-конфиги, которые отправляются на вход нашим сервисам
%run ../configs/stream_delta_transformation_test_cases
#...
from runtime.runner import NutterFixtureParallelRunner
from runtime.nutterfixture import NutterFixture
from cds_utils.platform import ConfigRunner # это класс, который может запускать сервисы платформы
#...
class TestCase_01(NutterFixture):
    def __init__(self):
        self.test_uc_path = 'dataservices_nonprod.test_nonprod'
        self.s3_path = 's3://super_bucket.data'
        NutterFixture.__init__(self)
    def before_all(self):
        # так как у нас много проверок, то мы выполняем код в этом методе
        ConfigRunner(dbutils, spark).run_yaml_config(param_yaml=test_case_01, notebook_path='/../../../cds_platform/stream_delta_transformation')
    def assertion_isExistTable(self):
        # проверяем, что сервис вообще создал таблицу
        assert(spark.catalog.tableExists(f'{self.test_uc_path}.tindata_expected')==True)
    def assertion_table_location(self):
        # проверяем, что таблица созадан в нужном месте (как external)
        location = sql(f"describe detail {self.test_uc_path}.tindata_expected").select('location').collect()[0]['location']
        assert(location == f'{self.s3_path}/test/hive/test_nonprod/tindata_expected')
    def assertion_checkpoint_location(self):
        # проверяем, что после работы сервиса чекпоинт есть, и он в правильном месте
        location = [file.path for file in dbutils.fs.ls(f'{self.s3_path}/test/checkpoint/test_nonprod/tindata_expected')][0]
        assert(location == f'{self.s3_path}/test/checkpoint/test_nonprod/tindata_expected/tindata/')
    def assertion_partitions(self):
        # проверяем, что конфиг прочитался правильно и таблица имеют верные партиции (как указано в yaml-конфиге)
        partitions = sql(f"describe detail dataservices_nonprod.test_nonprod.tindataxref").select('partitionColumns').collect()[0]['partitionColumns']
        assert(partitions == ['data_source_date'])
    def assertion_dedupe_column(self):
        # проверем, что дедупликация входных данных по первичному ключу прошла успешно
        # и что количество записей в целевой таблице ожидаемо меньше, чем в исходной
        num_of_change = spark.sql(f"select * from {self.test_uc_path}.tindataxref where source_id = 27 and data_source_ts = '2022-11-04T00:00:11.129+0000'").count()
        count_source = spark.sql(f"select * from {self.test_uc_path}.tindataxref").count()
        count_target = spark.sql(f"select * from {self.test_uc_path}.tindataxref_expected").count()
        assert(count_target == count_source-(num_of_change-1))  
    def after_all(self):
        # чистим за собой таблицы и директории с данными и чекпоинтами
        spark.sql(f'drop table {self.test_uc_path}.tindata_expected')
        dbutils.fs.rm(f'{self.s3_path}/test/hive/test_nonprod/tindata_expected', True)
        dbutils.fs.rm(f'{self.s3_path}/test/checkpoint/test_nonprod/tindata_expected', True)

Такой тест-класс можно отнести к уровню интеграционного тестирования: сервис stream_delta_transformation проверяется в окружении, максимально приближенном к реальному. Мы не внедряем моки или "шпионские" объекты, проверки идут на реально созданных таблицах и данных. Тем не менее, мы можем проверить некоторые методы внутри сервиса по тем результатам, что они выдают по итогам работы сервиса. Например мы проверяем, что целевые таблицы создаются, что они находятся в правильном месте (потому что все таблицы у нас external), и так далее. Кроме проверок работы конкретных методов, мы можем прочитать данные из целевой таблицы и проверить, что трансформация данных прошла корректно.

Пока мы запускаем это вручную из командной строки, но это уже огромный шаг вперёд для того, чтобы обеспечить интеграционное тестирование. А так как это вполне себе обычный python-код, то это открывает возможности проверки всего, что делают сервисы. Например, вполне реально написать тест на проверку публикации данных из Delta Lake в MySql, потому что в тестовом классе мы можем обратить в MySql напрямую и проверить, что там оказалось.

Выводы

Итак, Nutter Framework позволил нашей команде:

  • проводить интеграционное тестирование сервисов, написанных как ноутбуки в Databricks;

  • запускать тесты как прямо в ноутубках в Databricks, так и из командной строки;

  • написать набор регрессионных тестов на отдельные методы сервисов. Не так удобно как чистые unit-тесты, но значительно лучше, чем ничего. Время проведения регрессионного тестирования значительно снизилось.

В дальнейшем мы планируем внедрить запуск тестов в процесс сборки и деплоя платформы, непосредственно в GitLab CI/CD. Как это делать для Azure есть в официальной документации.

Для нашего проекта Nutter стал отличным выбором и очень удобным инструментом тестирования.

В конце этой статьи хочу порекомендовать вам полезный вебинар от OTUS, где расскажут о профессии автоматизатора тестирования, об актуальных технологиях, пользе использования автотестов, а так же о нужных навыках и особенностях собеседований.