Тесты — это хорошо. Медленные тесты — это CI на 40 минут и разработчики, которые забывают их запускать локально. Разберём, как ускорить pytest от простого -n auto до распределённого запуска в Kubernetes.
Уровень 1: pytest-xdist
Базовый инструмент для параллелизации. Устанавливается одной командой:
pip install pytest-xdist pytest -n auto # Воркеров = количество CPU ядер pytest -n 8 # Явно 8 воркеров
Каждый воркер — отдельный процесс Python. Тесты распределяются между ними, результаты собираются в конце.
На 8-ядерной машине можно получить ускорение в 5-7 раз.
Но просто добавить флаг недостаточно. Появятся странные падения тестов, которые раньше проходили.
Стратегии распределения
Флаг --dist определяет, как тесты раздаются воркерам:
load (дефолт) — тесты раздаются по мере освобождения воркера. Простой и быстрый, но есть проблема, фикстуры уровня module/session создаются в каждом воркере заново.
loadscope — группировка по модулям и классам. Все тесты одного модуля идут в один воркер. Фикстура scope="module" создаётся один раз на модуль, как и ожидается.
pytest -n 8 --dist loadscope
loadfile — ещё жёстче: все тесты из одного файла — одному воркеру. Полезно, если тесты в файле сильно связаны.
loadgroup — явная группировка через маркер. Даёт нам обширный контроль:
@pytest.mark.xdist_group("database") def test_create_user(): ... @pytest.mark.xdist_group("database") def test_delete_user(): ... @pytest.mark.xdist_group("api") def test_external_api(): ...
Тесты с одинаковым группой гарантированно попадают в один воркер.
worksteal — воркеры нагло крадут задачи у тех, кто ещё не закончил. Хорошо балансирует нагрузку при тестах разной длительности.
Для большинства проектов loadscope самый адекватный выбор. Разумный компромисс между параллелизмом и переиспользованием фикстур.
Уровень 2: изоляция фикстур
Session-фикстуры в xdist
Каждый воркер xdist — отдельный процесс со своей сессией pytest. Фикстура scope="session" создаётся в каждом воркере независимо.
Это значит, если session-фикстура поднимает тестовую базу или загружает большой файл, это произойдёт N раз (по числу воркеров).
Для тяжёлых shared-ресурсов используем межпроцессную координацию через файловые блокировки:
from filelock import FileLock import json @pytest.fixture(scope="session") def shared_database(tmp_path_factory, worker_id): # worker_id: "master" (без xdist), "gw0", "gw1", ... (с xdist) if worker_id == "master": # Запуск без xdist — просто создаём return create_test_database() # С xdist — координация через файлы root = tmp_path_factory.getbasetemp().parent # Общая для всех воркеров lock_file = root / "db.lock" data_file = root / "db.json" with FileLock(lock_file): if data_file.exists(): # Кто-то уже создал — читаем return json.loads(data_file.read_text()) # Мы первые — создаём и сохраняем db_info = create_test_database() data_file.write_text(json.dumps({ "host": db_info.host, "port": db_info.port, "name": db_info.name, })) return db_info
Первый воркер, захвативший lock, создаёт ресурс и записывает информацию в файл. Остальные ждут на lock, потом читают готовое.
Изоляция данных между воркерами
Shared база — это хорошо, но воркеры не должны мешать друг другу. Один создал пользователя, другой удалил — оба упали.
Простое решение — отдельная схема (namespace) на каждый воркер:
@pytest.fixture(scope="session") def db_connection(shared_database, worker_id): conn = psycopg2.connect( host=shared_database["host"], port=shared_database["port"], dbname=shared_database["name"], ) # Уникальная схема для воркера schema = f"test_{worker_id.replace('gw', '')}" if worker_id != "master" else "test_main" with conn.cursor() as cur: cur.execute(f"DROP SCHEMA IF EXISTS {schema} CASCADE") cur.execute(f"CREATE SCHEMA {schema}") cur.execute(f"SET search_path TO {schema}") conn.commit() yield conn # Cleanup with conn.cursor() as cur: cur.execute(f"DROP SCHEMA {schema} CASCADE") conn.commit() conn.close()
Воркер gw0 работает в схеме test_0, gw1 — в test_1. Полная изоляция, никаких конфликтов.
Для Redis — prefix на ключи: f"{worker_id}:user:123". Для файловой системы отдельные директории.
Уровень 3: балансировка по времени
Проблема: xdist распределяет тесты поровну по количеству, но не по времени. Один воркер получил 50 быстрых юнит-тестов и закончил за минуту. Другой 10 медленных интеграционных и работает полчаса.
Общее время определяется самым медленным воркером. Нужна балансировка по времени выполнения.
pytest-split решает эту задачу:
pip install pytest-split # Первый прогон — собираем статистику времени pytest --store-durations --durations-path=.test_durations # Последующие прогоны — используем для балансировки pytest --splits 8 --group 1 --durations-path=.test_durations
Файл .test_durations (JSON) хранит время выполнения каждого теста из предыдущего прогона. При запуске с --splits N --group M pytest-split распределяет тесты так, чтобы суммарное время каждой из N групп было примерно одинаковым.
Тест на 45 секунд не попадёт в одну группу с другим таким же,они разойдутся по разным воркерам.
Файл .test_durations стоит добавить в репозиторий и периодически обновлять (например, ночным CI-прогоном).
Уровень 4: Kubernetes
8 ядер на CI-машине — это 8 воркеров максимум. А тестов всё ещё 3000, и даже с параллелизацией прогон занимает 20 минут.
Время горизонтально масштабироваться — запускать тесты в нескольких подах Kubernetes.
Testkube
Testkube — фреймворк для запуска тестов в K8s, представленный как Custom Resource:
apiVersion: testworkflows.testkube.io/v1 kind: TestWorkflow metadata: name: pytest-parallel spec: content: git: uri: https://github.com/company/repo branch: main container: image: python:3.11 workingDir: /data/repo steps: - name: Install dependencies shell: pip install -r requirements.txt - name: Run tests parallel: count: 16 # 16 параллельных подов transfer: - from: /data/repo shell: | pytest tests/ \ --splits 16 \ --group {{ index }} \ --junitxml=/data/results/junit-{{ index }}.xml
Testkube создаёт 16 подов, каждый запускает свою 1/16 часть тестов (благодаря pytest-split). После завершения собирает JUnit-отчёты, показывает результаты в дашборде.
Установка через Helm:
helm repo add testkube https://kubeshop.github.io/helm-charts helm install testkube testkube/testkube
Своё решение на Kubernetes Jobs
Иногда хочется больше контроля, чем даёт Testkube. Минимальная реализация на Python с kubernetes-client:
from kubernetes import client, config class K8sTestRunner: def __init__(self): config.load_incluster_config() # Или load_kube_config() для локальной разработки self.batch_v1 = client.BatchV1Api() self.core_v1 = client.CoreV1Api() self.namespace = "testing" def run_parallel(self, workers: int, image: str) -> dict: job_names = [] for i in range(workers): job = self._create_job(i, workers, image) self.batch_v1.create_namespaced_job( namespace=self.namespace, body=job ) job_names.append(job.metadata.name) # Ждём завершения всех джобов results = self._wait_for_completion(job_names) # Собираем логи/результаты return self._aggregate_results(job_names) def _create_job(self, index: int, total: int, image: str): return client.V1Job( metadata=client.V1ObjectMeta( name=f"pytest-{index}", labels={"app": "pytest-runner"} ), spec=client.V1JobSpec( backoff_limit=2, template=client.V1PodTemplateSpec( spec=client.V1PodSpec( restart_policy="Never", containers=[ client.V1Container( name="pytest", image=image, command=[ "pytest", "tests/", f"--splits={total}", f"--group={index + 1}", "--junitxml=/results/junit.xml" ], resources=client.V1ResourceRequirements( requests={"memory": "512Mi", "cpu": "500m"}, limits={"memory": "1Gi", "cpu": "1000m"} ) ) ] ) ) ) ) def _wait_for_completion(self, job_names: list, timeout: int = 1800): # Polling или watch API ...
Плюсы своего решения: полный контроль над ресурсами, retry-логикой, сбором артефактов. Минусы: больше кода для поддержки.
Что выбрать
Масштаб проекта | Решение | Ожидаемое ускорение |
|---|---|---|
До 500 тестов |
| 3-5x |
500-2000 тестов | xdist + loadscope + pytest-split | 5-8x |
2000+ тестов | Kubernetes (Testkube или своё) | 10-20x |
Путь оптимизации:
Начните с
pytest -n auto— бесплатное ускорение в несколько разРазберитесь с падающими тестами — это инвестиция в качество
Добавьте
--dist loadscopeи pytest-split для лучшей балансировкиЕсли этого мало — Kubernetes, там потолок практически не ограничен
Главное, не останавливайтесь на первом шаге.

Если ускорение тестов упирается не в флаги, а в культуру и инструменты, стоит прокачать базу. На курсе Python QA Engineer разбирают PyTest, автотесты UI/API (Selenium 4, Appium), запуск в CI и практики DevOps, плюс диагностику проблем в веб-приложениях — чтобы параллелизация работала в проде, а не только в демо. Готовы к серьезному обучению? Пройдите входной тест.
Чтобы узнать больше о формате обучения и познакомиться с преподавателями, приходите на бесплатные демо-уроки:
26 января в 20:00. «API автотестирование облака с tempest». Записаться
10 февраля в 20:00. «Docker Compose для тестировщика: легко о сложном». Записаться
16 февраля в 20:00. «Моки в автотестировании на pythonv». Записаться
