Тесты — это хорошо. Медленные тесты — это CI на 40 минут и разработчики, которые забывают их запускать локально. Разберём, как ускорить pytest от простого -n auto до распределённого запуска в Kubernetes.

Уровень 1: pytest-xdist

Базовый инструмент для параллелизации. Устанавливается одной командой:

pip install pytest-xdist
pytest -n auto  # Воркеров = количество CPU ядер
pytest -n 8     # Явно 8 воркеров

Каждый воркер — отдельный процесс Python. Тесты распределяются между ними, результаты собираются в конце.

На 8-ядерной машине можно получить ускорение в 5-7 раз.

Но просто добавить флаг недостаточно. Появятся странные падения тестов, которые раньше проходили.

Стратегии распределения

Флаг --dist определяет, как тесты раздаются воркерам:

load (дефолт) — тесты раздаются по мере освобождения воркера. Простой и быстрый, но есть проблема, фикстуры уровня module/session создаются в каждом воркере заново.

loadscope — группировка по модулям и классам. Все тесты одного модуля идут в один воркер. Фикстура scope="module" создаётся один раз на модуль, как и ожидается.

pytest -n 8 --dist loadscope

loadfile — ещё жёстче: все тесты из одного файла — одному воркеру. Полезно, если тесты в файле сильно связаны.

loadgroup — явная группировка через маркер. Даёт нам обширный контроль:

@pytest.mark.xdist_group("database")
def test_create_user():
    ...

@pytest.mark.xdist_group("database")  
def test_delete_user():
    ...

@pytest.mark.xdist_group("api")
def test_external_api():
    ...

Тесты с одинаковым группой гарантированно попадают в один воркер.

worksteal — воркеры нагло крадут задачи у тех, кто ещё не закончил. Хорошо балансирует нагрузку при тестах разной длительности.

Для большинства проектов loadscope самый адекватный выбор. Разумный компромисс между параллелизмом и переиспользованием фикстур.

Уровень 2: изоляция фикстур

Session-фикстуры в xdist

Каждый воркер xdist — отдельный процесс со своей сессией pytest. Фикстура scope="session" создаётся в каждом воркере независимо.

Это значит, если session-фикстура поднимает тестовую базу или загружает большой файл, это произойдёт N раз (по числу воркеров).

Для тяжёлых shared-ресурсов используем межпроцессную координацию через файловые блокировки:

from filelock import FileLock
import json

@pytest.fixture(scope="session")
def shared_database(tmp_path_factory, worker_id):
    # worker_id: "master" (без xdist), "gw0", "gw1", ... (с xdist)
    
    if worker_id == "master":
        # Запуск без xdist — просто создаём
        return create_test_database()
    
    # С xdist — координация через файлы
    root = tmp_path_factory.getbasetemp().parent  # Общая для всех воркеров
    lock_file = root / "db.lock"
    data_file = root / "db.json"
    
    with FileLock(lock_file):
        if data_file.exists():
            # Кто-то уже создал — читаем
            return json.loads(data_file.read_text())
        
        # Мы первые — создаём и сохраняем
        db_info = create_test_database()
        data_file.write_text(json.dumps({
            "host": db_info.host,
            "port": db_info.port,
            "name": db_info.name,
        }))
        return db_info

Первый воркер, захвативший lock, создаёт ресурс и записывает информацию в файл. Остальные ждут на lock, потом читают готовое.

Изоляция данных между воркерами

Shared база — это хорошо, но воркеры не должны мешать друг другу. Один создал пользователя, другой удалил — оба упали.

Простое решение — отдельная схема (namespace) на каждый воркер:

@pytest.fixture(scope="session")
def db_connection(shared_database, worker_id):
    conn = psycopg2.connect(
        host=shared_database["host"],
        port=shared_database["port"],
        dbname=shared_database["name"],
    )
    
    # Уникальная схема для воркера
    schema = f"test_{worker_id.replace('gw', '')}" if worker_id != "master" else "test_main"
    
    with conn.cursor() as cur:
        cur.execute(f"DROP SCHEMA IF EXISTS {schema} CASCADE")
        cur.execute(f"CREATE SCHEMA {schema}")
        cur.execute(f"SET search_path TO {schema}")
    conn.commit()
    
    yield conn
    
    # Cleanup
    with conn.cursor() as cur:
        cur.execute(f"DROP SCHEMA {schema} CASCADE")
    conn.commit()
    conn.close()

Воркер gw0 работает в схеме test_0, gw1 — в test_1. Полная изоляция, никаких конфликтов.

Для Redis — prefix на ключи: f"{worker_id}:user:123". Для файловой системы отдельные директории.

Уровень 3: балансировка по времени

Проблема: xdist распределяет тесты поровну по количеству, но не по времени. Один воркер получил 50 быстрых юнит-тестов и закончил за минуту. Другой 10 медленных интеграционных и работает полчаса.

Общее время определяется самым медленным воркером. Нужна балансировка по времени выполнения.

pytest-split решает эту задачу:

pip install pytest-split

# Первый прогон — собираем статистику времени
pytest --store-durations --durations-path=.test_durations

# Последующие прогоны — используем для балансировки  
pytest --splits 8 --group 1 --durations-path=.test_durations

Файл .test_durations (JSON) хранит время выполнения каждого теста из предыдущего прогона. При запуске с --splits N --group M pytest-split распределяет тесты так, чтобы суммарное время каждой из N групп было примерно одинаковым.

Тест на 45 секунд не попадёт в одну группу с другим таким же,они разойдутся по разным воркерам.

Файл .test_durations стоит добавить в репозиторий и периодически обновлять (например, ночным CI-прогоном).

Уровень 4: Kubernetes

8 ядер на CI-машине — это 8 воркеров максимум. А тестов всё ещё 3000, и даже с параллелизацией прогон занимает 20 минут.

Время горизонтально масштабироваться — запускать тесты в нескольких подах Kubernetes.

Testkube

Testkube — фреймворк для запуска тестов в K8s, представленный как Custom Resource:

apiVersion: testworkflows.testkube.io/v1
kind: TestWorkflow
metadata:
  name: pytest-parallel
spec:
  content:
    git:
      uri: https://github.com/company/repo
      branch: main
      
  container:
    image: python:3.11
    workingDir: /data/repo
    
  steps:
    - name: Install dependencies
      shell: pip install -r requirements.txt
      
    - name: Run tests
      parallel:
        count: 16  # 16 параллельных подов
        transfer:
          - from: /data/repo
      shell: |
        pytest tests/ \
          --splits 16 \
          --group {{ index }} \
          --junitxml=/data/results/junit-{{ index }}.xml

Testkube создаёт 16 подов, каждый запускает свою 1/16 часть тестов (благодаря pytest-split). После завершения собирает JUnit-отчёты, показывает результаты в дашборде.

Установка через Helm:

helm repo add testkube https://kubeshop.github.io/helm-charts
helm install testkube testkube/testkube

Своё решение на Kubernetes Jobs

Иногда хочется больше контроля, чем даёт Testkube. Минимальная реализация на Python с kubernetes-client:

from kubernetes import client, config

class K8sTestRunner:
    def __init__(self):
        config.load_incluster_config()  # Или load_kube_config() для локальной разработки
        self.batch_v1 = client.BatchV1Api()
        self.core_v1 = client.CoreV1Api()
        self.namespace = "testing"
    
    def run_parallel(self, workers: int, image: str) -> dict:
        job_names = []
        
        for i in range(workers):
            job = self._create_job(i, workers, image)
            self.batch_v1.create_namespaced_job(
                namespace=self.namespace, 
                body=job
            )
            job_names.append(job.metadata.name)
        
        # Ждём завершения всех джобов
        results = self._wait_for_completion(job_names)
        
        # Собираем логи/результаты
        return self._aggregate_results(job_names)
    
    def _create_job(self, index: int, total: int, image: str):
        return client.V1Job(
            metadata=client.V1ObjectMeta(
                name=f"pytest-{index}",
                labels={"app": "pytest-runner"}
            ),
            spec=client.V1JobSpec(
                backoff_limit=2,
                template=client.V1PodTemplateSpec(
                    spec=client.V1PodSpec(
                        restart_policy="Never",
                        containers=[
                            client.V1Container(
                                name="pytest",
                                image=image,
                                command=[
                                    "pytest", "tests/",
                                    f"--splits={total}",
                                    f"--group={index + 1}",
                                    "--junitxml=/results/junit.xml"
                                ],
                                resources=client.V1ResourceRequirements(
                                    requests={"memory": "512Mi", "cpu": "500m"},
                                    limits={"memory": "1Gi", "cpu": "1000m"}
                                )
                            )
                        ]
                    )
                )
            )
        )
    
    def _wait_for_completion(self, job_names: list, timeout: int = 1800):
        # Polling или watch API
        ...

Плюсы своего решения: полный контроль над ресурсами, retry-логикой, сбором артефактов. Минусы: больше кода для поддержки.

Что выбрать

Масштаб проекта

Решение

Ожидаемое ускорение

До 500 тестов

pytest -n auto

3-5x

500-2000 тестов

xdist + loadscope + pytest-split

5-8x

2000+ тестов

Kubernetes (Testkube или своё)

10-20x

Путь оптимизации:

  1. Начните с pytest -n auto — бесплатное ускорение в несколько раз

  2. Разберитесь с падающими тестами — это инвестиция в качество

  3. Добавьте --dist loadscope и pytest-split для лучшей балансировки

  4. Если этого мало — Kubernetes, там потолок практически не ограничен

Главное, не останавливайтесь на первом шаге.

Входной тест курса "Python QA Engineer"

Если ускорение тестов упирается не в флаги, а в культуру и инструменты, стоит прокачать базу. На курсе Python QA Engineer разбирают PyTest, автотесты UI/API (Selenium 4, Appium), запуск в CI и практики DevOps, плюс диагностику проблем в веб-приложениях — чтобы параллелизация работала в проде, а не только в демо. Готовы к серьезному обучению? Пройдите входной тест.

Чтобы узнать больше о формате обучения и познакомиться с преподавателями, приходите на бесплатные демо-уроки:

  • 26 января в 20:00. «API автотестирование облака с tempest». Записаться

  • 10 февраля в 20:00. «Docker Compose для тестировщика: легко о сложном». Записаться

  • 16 февраля в 20:00. «Моки в автотестировании на pythonv». Записаться