
Disclaimer
Статья предполает, что вы знаете что такое Databricks, что такое ноутбуки, кластера и джобы (воркфлоус) в нём. Поиск по хабру предлагает вот эту статью для ознакомления.
Вступление
Я продолжаю серию статей, где анализирую свой текущий проект в области BigData. Первая статья была посвящена найму питонистов, вторая - проблемам внедрения новых процессов в распределённых командах. Однако помимо чисто управленческих задач, в проекте есть ряд технических сложностей. Одной из таких сложностей является тестирование платформы обработки данных.
Если с тестированием более привычных программных продуктов более-менее ясно, то вот с BigData возникает множество вопросов. Если у вас Java - у вас есть как минимум JUnit, а абсолютное большинство фреймворков заботятся о простоте тестирования. Например Spring посвящает этому очень много документации. Тестирование фронтенда тоже хорошо проработано: от Selenium до JestJs. Тестировать блокчейн и смарт-контракты одно удовольствие (хотя бы на Ethereum сети благодаря Truffle Suite)
На Питоне тоже есть свои фреймворки для тестирования и вопрос этот вполне себе проработан. Даже сам Databricks, на базе которого построена наша платформа обработки данных, предлагает свои пути для тестирования. Например вот неплохой официальный гайд по тестированию: Unit testing for notebooks. Однако нужно иметь репозиторий внутри самого Databricks. А у нас код хранится в корпоративном GitLab, который недоступен из нашего же Databricks. Очень неудобно, но отдел безопасност�� не даёт разрешения настроить доступ из внешнего ресурса во внутреннюю сеть.
Есть возможность по тестированию прямо внутри ноутбуков Databricks: Test Databricks notebooks. Из минусов: сложность в организации кода, урезанный импорт ноутбуков внутри Databricks.
Мы сделали несколько попыток тестировать нашу платформу и так, и сяк, но всё выходило весьма "велосипедно". И тут мы наткнулись и решили попробовать Nutter Framework. То, что он обещал очень подходило для наших потребностей. Респект Microsoft за такой тул в opensource.
Nutter: очень краткое руководство
Главная цель данного фреймворка - дать возможность легко и быстро тестировать ноутбуки в Databricks. Nutter предлагает определённый подход к написанию тестов и несколько подходов к их выполнению.
Самый простой тест
Во-первых, чтобы начать работать с тестами, необходимо поставить библиотеку nutter на тот кластер, где будут выполняться тестовые ноутбуки:

Далее создаём ноутбук, импортируем базовый класс NutterFixture и начинаем писать тестовый класс:
from runtime.nutterfixture import NutterFixture, tag
class FirstTestFixture(NutterFixture):
def run_test(self):
dbutils.notebook.run('notebook_to_test', 600)
def assertion_test(self):
assert True
def run_test_secundo(self):
dbutils.notebook.run('another_notebook_to_test', 600)
def assertion_test_secundo(self):
assert TrueЧтобы выполнить и увидеть результаты выполнения тестов, запускаем сам тестовый класс:
result = FirstTestFixture().execute_tests()
print(result.to_string())Тесты можно запускать через коммандную строку (см. ниже как это сделать). И чтобы вернуть результаты выполнения тестов в nutter-cli, нужно выполнить в конце ноутбука:
result.exit(dbutils)Однако есть неприятное ограничение: внутри result.exit вызывает dbutils.notebook.exit() , что приводит к тому, что Databricks прячет все выводы от команд print . Поэтому если запускать тесты прямо ноутбуках, строчку с exit нужно закомментировать.
Правила именования тестов
Внутри одного тест-класса может быть несколько тестов. Каждый тест состоит из 1 обязательного метода и 3 дополнительных:
before_(testname) - выполнятеся перед
runИспользуется для настройки тестов и выполнения подготовительных действий. Необязателен.run_(testname) - выполняется после
before(если он имеется) или первым. Тут должны быть действия, которые непосредственно тестируются, например вызов ноутбука. Необязателен.assertion_(testname) - выполняется после
run(если он есть). Содержит проверки состояний. Можно использоватьassertиз любых питоновских тестовых библиотек. Каждый тест-класс должен содержать хотя бы 1 assertion-метод.after_(testname) - выполняется после
assertion. Обычно используется, чтобы вернуть состояние тестовых объектов в исходное или "почистить" что-то после выполнения теста.
Пример: методы run_checkpoint_location_generation и assertion_checkpoint_location_generation будут трактоваться как 1 тест-кейс.
Дополнительно есть два метода:
before_all
after_all
Они выполняются соответственно до и после всех тестов. Если вы хотите использовать несколько о��дельных assertion на 1 тест-кейс, тогда нужно использовать before_all и для подготовки теста и для вызова тестируемых действий.
from runtime.nutterfixture import NutterFixture, tag
class MultiTestFixture(NutterFixture):
def before_all(self):
dbutils.notebook.run('notebook_under_test', 600, args)
#...
def assertion_test_case_1(self):
#...
def assertion_test_case_2(self):
#...
def after_all(self):
#... Nutter гарантирует выполнение тестов в алфавитном порядке на основе имени тест-кейса.
Также Nutter позволяет сохранять состояние и передавать данные между тест-кейсами через параметры конструктора:
class TestFixture(NutterFixture):
def __init__(self):
self.file = '/data/myfile'
NutterFixture.__init__(self)Паралелльный запуск тестов
Когда тестов много, то хочется запускать их в несколько потоков и сократить время выполнения. Последняя версия Nutter`а (0.1.35) позволяет это сделать с помощью NutterFixtureParallelRunner :
from runtime.runner import NutterFixtureParallelRunner
#...
parallel_runner = NutterFixtureParallelRunner(num_of_workers=2)
parallel_runner.add_test_fixture(FirstTestFixture())
parallel_runner.add_test_fixture(AnotherTestFixture())
result = parallel_runner.execute()
print(result.to_string())Резульаты тестов объединяются и показываются в удобном виде:
Notebook: N/A - Lifecycle State: N/A, Result: N/A
Run Page URL: N/A
============================================================
PASSING TESTS
------------------------------------------------------------
test_another_case (40.98681362399998 seconds)
test (40.99089991400001 seconds)
test_secundo (10.680228194999927 seconds)
============================================================Nutter CLI
Одна из главных фишек Nutter`а - это возможность запускать тесты из командной строки.
Для начала установим Nutter через pip:
$ pip install nutterЗатем зададим 2 переменных окружения, чтобы предоставить доступ Databricks:
Linux
export DATABRICKS_HOST=<HOST>
export DATABRICKS_TOKEN=<TOKEN>Windows PowerShell
$env:DATABRICKS_HOST="HOST"
$env:DATABRICKS_TOKEN="TOKEN"Для начала можно показать какие тесты могут быть исполнены:
$ nutter list /common/test/nutterПолучаем что-то такое:
Nutter Version 0.1.35
++++++++++++++++++++++++++++++++++++++++++++++++++
--> Looking for tests in /common/test/nutter
--> 3 tests found
Tests Found
-------------------------------------------------------
Name: test_StreamDeltaTransformation_multi
Path: /common/test/nutter/test_StreamDeltaTransformation_multi
Name: test_RefineryFromJsonMerge_single
Path: /common/test/nutter/test_RefineryFromJsonMerge_single
Name: test_StreamDeltaTransformation_single
Path: /common/test/nutter/test_StreamDeltaTransformation_single
-------------------------------------------------------
Total: 3А теперь запустим какой-то конкретный тест:
$ nutter run /common/test/nutter/test_StreamDeltaTransformation_single --cluster_id '0000-099999-abcdabcd'Эта команда запускает конкретный тест-класс на определённом кластере (на кластере должна быть установлены библиотека nutter)
Чтобы запустить все тест-классы, можно использовать следующую команду
$ nutter run /common/test/nutter/ --cluster_id 0123-12334-tonedabc --recursive Имена тестов должны начинаться с test_
Флаг --recursiveобеспечивает поиск тестовых классов рекурсивно во всех поддиректориях.
Передавать какие-то параметры можно через опцию --notebook_params:
$ nutter run /common/test/nutter/* --cluster_id 0123-12334-tonedabc --notebook_params "{\"example_key_1\": \"example_value_1\", \"example_key_2\": \"example_value_2\"}"Пример реального тест-класса
Спасибо @raukasky за написание конкретных тест-классов для сервисов платформы.
# команда ниже импортирует длинные yaml-конфиги, которые отправляются на вход нашим сервисам
%run ../configs/stream_delta_transformation_test_cases
#...
from runtime.runner import NutterFixtureParallelRunner
from runtime.nutterfixture import NutterFixture
from cds_utils.platform import ConfigRunner # это класс, который может запускать сервисы платформы
#...
class TestCase_01(NutterFixture):
def __init__(self):
self.test_uc_path = 'dataservices_nonprod.test_nonprod'
self.s3_path = 's3://super_bucket.data'
NutterFixture.__init__(self)
def before_all(self):
# так как у нас много проверок, то мы выполняем код в этом методе
ConfigRunner(dbutils, spark).run_yaml_config(param_yaml=test_case_01, notebook_path='/../../../cds_platform/stream_delta_transformation')
def assertion_isExistTable(self):
# проверяем, что сервис вообще создал таблицу
assert(spark.catalog.tableExists(f'{self.test_uc_path}.tindata_expected')==True)
def assertion_table_location(self):
# проверяем, что таблица созадан в нужном месте (как external)
location = sql(f"describe detail {self.test_uc_path}.tindata_expected").select('location').collect()[0]['location']
assert(location == f'{self.s3_path}/test/hive/test_nonprod/tindata_expected')
def assertion_checkpoint_location(self):
# проверяем, что после работы сервиса чекпоинт есть, и он в правильном месте
location = [file.path for file in dbutils.fs.ls(f'{self.s3_path}/test/checkpoint/test_nonprod/tindata_expected')][0]
assert(location == f'{self.s3_path}/test/checkpoint/test_nonprod/tindata_expected/tindata/')
def assertion_partitions(self):
# проверяем, что конфиг прочитался правильно и таблица имеют верные партиции (как указано в yaml-конфиге)
partitions = sql(f"describe detail dataservices_nonprod.test_nonprod.tindataxref").select('partitionColumns').collect()[0]['partitionColumns']
assert(partitions == ['data_source_date'])
def assertion_dedupe_column(self):
# проверем, что дедупликация входных данных по первичному ключу прошла успешно
# и что количество записей в целевой таблице ожидаемо меньше, чем в исходной
num_of_change = spark.sql(f"select * from {self.test_uc_path}.tindataxref where source_id = 27 and data_source_ts = '2022-11-04T00:00:11.129+0000'").count()
count_source = spark.sql(f"select * from {self.test_uc_path}.tindataxref").count()
count_target = spark.sql(f"select * from {self.test_uc_path}.tindataxref_expected").count()
assert(count_target == count_source-(num_of_change-1))
def after_all(self):
# чистим за собой таблицы и директории с данными и чекпоинтами
spark.sql(f'drop table {self.test_uc_path}.tindata_expected')
dbutils.fs.rm(f'{self.s3_path}/test/hive/test_nonprod/tindata_expected', True)
dbutils.fs.rm(f'{self.s3_path}/test/checkpoint/test_nonprod/tindata_expected', True)Такой тест-класс можно отнести к уровню интеграционного тестирования: сервис stream_delta_transformation проверяется в окружении, максимально приближенном к реальному. Мы не внедряем моки или "шпионские" объекты, проверки идут на реально созданных таблицах и данных. Тем не менее, мы можем проверить некоторые методы внутри сервиса по тем результатам, что они выдают по итогам работы сервиса. Например мы проверяем, что целевые таблицы создаются, что они находятся в правильном месте (потому что все таблицы у нас external), и так далее. Кроме проверок работы конкретных методов, мы можем прочитать данные из целевой таблицы и проверить, что трансформация данных прошла корректно.
Пока мы запускаем это вручную из командной строки, но это уже огромный шаг вперёд для того, чтобы обеспечить интеграционное тестирование. А так как это вполне себе обычный python-код, то это открывает возможности проверки всего, что делают сервисы. Например, вполне реально написать тест на проверку публикации данных из Delta Lake в MySql, потому что в тестовом классе мы можем обратить в MySql напрямую и проверить, что там оказалось.
Выводы
Итак, Nutter Framework позволил нашей команде:
проводить интеграционное тестирование сервисов, написанных как ноутбуки в Databricks;
запускать тесты как прямо в ноутубках в Databricks, так и из командной строки;
написать набор регрессионных тестов на отдельные методы сервисов. Не так удобно как чистые unit-тесты, но значительно лучше, чем ничего. Время проведения регрессионного тестирования значительно снизилось.
В дальнейшем мы планируем внедрить запуск тестов в процесс сборки и деплоя платформы, непосредственно в GitLab CI/CD. Как это делать для Azure есть в официальной документации.
Для нашего проекта Nutter стал отличным выбором и очень удобным инструментом тестирования.
В конце этой статьи хочу порекомендовать вам полезный вебинар от OTUS, где расскажут о профессии автоматизатора тестирования, об актуальных технологиях, пользе использования автотестов, а так же о нужных навыках и особенностях собеседований.
Регистрация на вебинар доступна по ссылке
