Тесты — это хорошо. Медленные тесты — это CI на 40 минут и разработчики, которые забывают их запускать локально. Разберём, как ускорить pytest от простого -n auto до распределённого запуска в Kubernetes.
Уровень 1: pytest-xdist
Базовый инструмент для параллелизации. Устанавливается одной командой:
pip install pytest-xdist
pytest -n auto # Воркеров = количество CPU ядер
pytest -n 8 # Явно 8 воркеровКаждый воркер — отдельный процесс Python. Тесты распределяются между ними, результаты собираются в конце.
На 8-ядерной машине можно получить ускорение в 5-7 раз.
Но просто добавить флаг недостаточно. Появятся странные падения тестов, которые раньше проходили.
Стратегии распределения
Флаг --dist определяет, как тесты раздаются воркерам:
load (дефолт) — тесты раздаются по мере освобождения воркера. Простой и быстрый, но есть проблема, фикстуры уровня module/session создаются в каждом воркере заново.
loadscope — группировка по модулям и классам. Все тесты одного модуля идут в один воркер. Фикстура scope="module" создаётся один раз на модуль, как и ожидается.
pytest -n 8 --dist loadscopeloadfile — ещё жёстче: все тесты из одного файла — одному воркеру. Полезно, если тесты в файле сильно связаны.
loadgroup — явная группировка через маркер. Даёт нам обширный контроль:
@pytest.mark.xdist_group("database")
def test_create_user():
...
@pytest.mark.xdist_group("database")
def test_delete_user():
...
@pytest.mark.xdist_group("api")
def test_external_api():
...Тесты с одинаковым группой гарантированно попадают в один воркер.
worksteal — воркеры нагло крадут задачи у тех, кто ещё не закончил. Хорошо балансирует нагрузку при тестах разной длительности.
Для большинства проектов loadscope самый адекватный выбор. Разумный компромисс между параллелизмом и переиспользованием фикстур.
Уровень 2: изоляция фикстур
Session-фикстуры в xdist
Каждый воркер xdist — отдельный процесс со своей сессией pytest. Фикстура scope="session" создаётся в каждом воркере независимо.
Это значит, если session-фикстура поднимает тестовую базу или загружает большой файл, это произойдёт N раз (по числу воркеров).
Для тяжёлых shared-ресурсов используем межпроцессную координацию через файловые блокировки:
from filelock import FileLock
import json
@pytest.fixture(scope="session")
def shared_database(tmp_path_factory, worker_id):
# worker_id: "master" (без xdist), "gw0", "gw1", ... (с xdist)
if worker_id == "master":
# Запуск без xdist — просто создаём
return create_test_database()
# С xdist — координация через файлы
root = tmp_path_factory.getbasetemp().parent # Общая для всех воркеров
lock_file = root / "db.lock"
data_file = root / "db.json"
with FileLock(lock_file):
if data_file.exists():
# Кто-то уже создал — читаем
return json.loads(data_file.read_text())
# Мы первые — создаём и сохраняем
db_info = create_test_database()
data_file.write_text(json.dumps({
"host": db_info.host,
"port": db_info.port,
"name": db_info.name,
}))
return db_infoПервый воркер, захвативший lock, создаёт ресурс и записывает информацию в файл. Остальные ждут на lock, потом читают готовое.
Изоляция данных между воркерами
Shared база — это хорошо, но воркеры не должны мешать друг другу. Один создал пользователя, другой удалил — оба упали.
Простое решение — отдельная схема (namespace) на каждый воркер:
@pytest.fixture(scope="session")
def db_connection(shared_database, worker_id):
conn = psycopg2.connect(
host=shared_database["host"],
port=shared_database["port"],
dbname=shared_database["name"],
)
# Уникальная схема для воркера
schema = f"test_{worker_id.replace('gw', '')}" if worker_id != "master" else "test_main"
with conn.cursor() as cur:
cur.execute(f"DROP SCHEMA IF EXISTS {schema} CASCADE")
cur.execute(f"CREATE SCHEMA {schema}")
cur.execute(f"SET search_path TO {schema}")
conn.commit()
yield conn
# Cleanup
with conn.cursor() as cur:
cur.execute(f"DROP SCHEMA {schema} CASCADE")
conn.commit()
conn.close()Воркер gw0 работает в схеме test_0, gw1 — в test_1. Полная изоляция, никаких конфликтов.
Для Redis — prefix на ключи: f"{worker_id}:user:123". Для файловой системы отдельные директории.
Уровень 3: балансировка по времени
Проблема: xdist распределяет тесты поровну по количеству, но не по времени. Один воркер получил 50 быстрых юнит-тестов и закончил за минуту. Другой 10 медленных интеграционных и работает полчаса.
Общее время определяется самым медленным воркером. Нужна балансировка по времени выполнения.
pytest-split решает эту задачу:
pip install pytest-split
# Первый прогон — собираем статистику времени
pytest --store-durations --durations-path=.test_durations
# Последующие прогоны — используем для балансировки
pytest --splits 8 --group 1 --durations-path=.test_durationsФайл .test_durations (JSON) хранит время выполнения каждого теста из предыдущего прогона. При запуске с --splits N --group M pytest-split распределяет тесты так, чтобы суммарное время каждой из N групп было примерно одинаковым.
Тест на 45 секунд не попадёт в одну группу с другим таким же,они разойдутся по разным воркерам.
Файл .test_durations стоит добавить в репозиторий и периодически обновлять (например, ночным CI-прогоном).
Уровень 4: Kubernetes
8 ядер на CI-машине — это 8 воркеров максимум. А тестов всё ещё 3000, и даже с параллелизацией прогон занимает 20 минут.
Время горизонтально масштабироваться — запускать тесты в нескольких подах Kubernetes.
Testkube
Testkube — фреймворк для запуска тестов в K8s, представленный как Custom Resource:
apiVersion: testworkflows.testkube.io/v1
kind: TestWorkflow
metadata:
name: pytest-parallel
spec:
content:
git:
uri: https://github.com/company/repo
branch: main
container:
image: python:3.11
workingDir: /data/repo
steps:
- name: Install dependencies
shell: pip install -r requirements.txt
- name: Run tests
parallel:
count: 16 # 16 параллельных подов
transfer:
- from: /data/repo
shell: |
pytest tests/ \
--splits 16 \
--group {{ index }} \
--junitxml=/data/results/junit-{{ index }}.xmlTestkube создаёт 16 подов, каждый запускает свою 1/16 часть тестов (благодаря pytest-split). После завершения собирает JUnit-отчёты, показывает результаты в дашборде.
Установка через Helm:
helm repo add testkube https://kubeshop.github.io/helm-charts
helm install testkube testkube/testkubeСвоё решение на Kubernetes Jobs
Иногда хочется больше контроля, чем даёт Testkube. Минимальная реализация на Python с kubernetes-client:
from kubernetes import client, config
class K8sTestRunner:
def __init__(self):
config.load_incluster_config() # Или load_kube_config() для локальной разработки
self.batch_v1 = client.BatchV1Api()
self.core_v1 = client.CoreV1Api()
self.namespace = "testing"
def run_parallel(self, workers: int, image: str) -> dict:
job_names = []
for i in range(workers):
job = self._create_job(i, workers, image)
self.batch_v1.create_namespaced_job(
namespace=self.namespace,
body=job
)
job_names.append(job.metadata.name)
# Ждём завершения всех джобов
results = self._wait_for_completion(job_names)
# Собираем логи/результаты
return self._aggregate_results(job_names)
def _create_job(self, index: int, total: int, image: str):
return client.V1Job(
metadata=client.V1ObjectMeta(
name=f"pytest-{index}",
labels={"app": "pytest-runner"}
),
spec=client.V1JobSpec(
backoff_limit=2,
template=client.V1PodTemplateSpec(
spec=client.V1PodSpec(
restart_policy="Never",
containers=[
client.V1Container(
name="pytest",
image=image,
command=[
"pytest", "tests/",
f"--splits={total}",
f"--group={index + 1}",
"--junitxml=/results/junit.xml"
],
resources=client.V1ResourceRequirements(
requests={"memory": "512Mi", "cpu": "500m"},
limits={"memory": "1Gi", "cpu": "1000m"}
)
)
]
)
)
)
)
def _wait_for_completion(self, job_names: list, timeout: int = 1800):
# Polling или watch API
...Плюсы своего решения: полный контроль над ресурсами, retry-логикой, сбором артефактов. Минусы: больше кода для поддержки.
Что выбрать
Масштаб проекта | Решение | Ожидаемое ускорение |
|---|---|---|
До 500 тестов |
| 3-5x |
500-2000 тестов | xdist + loadscope + pytest-split | 5-8x |
2000+ тестов | Kubernetes (Testkube или своё) | 10-20x |
Путь оптимизации:
Начните с
pytest -n auto— бесплатное ускорение в несколько разРазберитесь с падающими тестами — это инвестиция в качество
Добавьте
--dist loadscopeи pytest-split для лучшей балансировкиЕсли этого мало — Kubernetes, там потолок практически не ограничен
Главное, не останавливайтесь на первом шаге.

Если ускорение тестов упирается не в флаги, а в культуру и инструменты, стоит прокачать базу. На курсе Python QA Engineer разбирают PyTest, автотесты UI/API (Selenium 4, Appium), запуск в CI и практики DevOps, плюс диагностику проблем в веб-приложениях — чтобы параллелизация работала в проде, а не только в демо. Готовы к серьезному обучению? Пройдите входной тест.
Чтобы узнать больше о формате обучения и познакомиться с преподавателями, приходите на бесплатные демо-уроки:
26 января в 20:00. «API автотестирование облака с tempest». Записаться
10 февраля в 20:00. «Docker Compose для тестировщика: легко о сложном». Записаться
16 февраля в 20:00. «Моки в автотестировании на pythonv». Записаться
