Всем привет! Меня зовут Андрей, я инженер по автоматизации тестирования в команде Озон Банка, занимающейся разработкой и поддержанием инструментов тестирования, которыми пользуется весь банк.
Сегодня я хотел бы поделиться опытом сбора технических метрик pytest и их анализа в целях выявления узких мест и ускорения выполнения тестов.

Какую проблему решаем?

Когда речь заходит о выборе инструмента для автоматизации тестирования, чаще всего на сцену выходит pytest, так как имеет обширную систему аддонов и позволяет на своей основе собрать фреймворк для тестирования, заточенный под конкретный проект.
На первых этапах внедрения автоматизации обычно все идет достаточно гладко и безболезненно, но с нарастанием количества тестов процедура их запуска обрастает усложняющейся логикой, которая размещается в хуках pytest, фикстурах и других местах.
Все чаще происходят ситуации, когда при запуске тестов возникают задержки и паузы, причины которых становится отлавливать все сложнее и сложнее.
При этом стандартный отчет pytest о результатах выполнения тестов содержит достаточно мало информации, которая могла бы помочь в вопросе уменьшения времени их выполнения.
Именно для решения этой задачи — поиска узких мест — мы и займемся реализацией сбора технических метрик в pytest.

Сейчас у нас около 20000 тестов и более 1000 запусков наборов тестов в CI ежедневно. На этом фоне даже незначительные оптимизации (например, в 5 секунд) для каждого запуска тестов в итоге дают существенный результат.

Для тех, кто еще не знаком с pytest, может быть полезен доклад Andrew Knight - heisenbug / youtube.

Что и зачем будем измерять?

Для эффективного выявления узких мест нам необходимо разобрать выполнение тестов на основные этапы. Каждый из этих этапов соотносится с одним или несколькими pytest-хуками, в которые мы можем встраивать нашу логику:

  1. Запуск и конфигурация pytest (pytest_configure): Начальный этап, где pytest инициализируется, подключаются плагины и применяются основные настройки. Медленная логика здесь замедляет каждый запуск тестов.

  2. Сбор тестов для запуска (pytest_collection и входящие в него хуки): Тестовая сессия официально начинается с хука pytest_sessionstart, после чего pytest ищет и собирает все тестовые файлы и функции, которые нужно запустить. В случае использования сложной логики процесса сбора или обработки собранных тестов (pytest_collection_modifyitems) этот этап может стать бутылочным горлышком.

  3. Выполнение тестов (pytest_runtest_protocol и входящие в него хуки) - это основной этап процесса. И его стоит разбить на фазы:
    - Setup: подготовка фикстур и окружения.
    - Call: выполнение кода теста.
    - Teardown: Очистка ресурсов после теста.
    Именно на этом этапе часто скрываются проблемы, связанные с медленными фикстурами или логикой перезапуска упавших тестов.

  4. Завершение тестовой сессии (pytest_sessionfinish): Финальный этап, где могут выполняться такие операции, как сборка отчета, отправка результатов во внешние системы (например, Allure) и очистка глобальных ресурсов.

Наша цель — собрать данные о времени выполнения для каждого из этих этапов. Эти метрики мы будем отправлять в базу данных (в нашем примере — ClickHouse). Это позволит нам отслеживать динамику производительности, строить графики и мгновенно замечать, когда и на каком именно этапе наши тесты начали замедляться.

Архитектура сборщика метрик

Чтобы не раздувать conftest.py и отделить логику сбора метрик от остального кода, мы оформим наш сборщик в виде отдельного плагина. Такая структура упростит его поддержку и переиспользование.
Наш плагин будет реализован в виде класса MetricsCollector, который мы подключим в pytest на этапе конфигурации.

.
└── plugins
    └── metrics_collector
        └── collector.py

Если есть желание больше узнать о плагинах для pytest, рекомендую ознакомиться с документацией, докладами Александра Волкова (heisenbug / youtube) и Андрея Донцова(heisenbug / youtube)

Как уже было отмечено выше, большинству этапов выполнения тестов соответствует один или несколько pytest-хуков, в связи с чем, основным инструментом для замера времени выполнения хуков станут hookwrapper-ы. Это специальные методы, которые "оборачивают" ста��дартные хуки pytest, позволяя нам выполнить код до и после их основного вызова — идеальное место для запуска и остановки таймера.

Важно понимать, что код для замера метрик может быть не единственным, кто реализует тот или иной хук. По умолчанию порядок их вызова не гарантирован. Для управления этим порядком pytest предоставляет параметры tryfirst и trylast в декораторе @pytest.hookimpl():

  • tryfirst=True: при использовании такого аргумента ваша реализация хука будет вызвана раньше большинства других.

  • trylast=True: при использовании такого аргумента ваша реализация будет вызвана позже большинства других.

Также следует учитывать, что логика работы hookwrapper'ов напоминает собой луковицу: hookwrapper, который запускается первым, завершит свою работу последним.

Еще один ключевой принцип реализации сбора метрик — надежность. Сбор метрик не должен влиять на результаты тестов. Любые ошибки внутри сборщика мы будем перехватывать и логгировать, но не пробрасывать наружу, чтобы не уронить процесс выполнения тестов. Сразу заведем для этих целей пару методов для обработки исключений.

# ./pugins/metrics_collector/collector.py

import time
import traceback
from pathlib import Path

import pytest

BASE_METRICS_DIR = (
    Path(__file__).resolve().parent.parent.parent / "tmp_files/metrics"
)


class MetricsCollector:
    """Сборщик технических метрик."""

    def __init__(self):
        self.stats = {}
        self.errors = []
        BASE_METRICS_DIR.mkdir(parents=True, exist_ok=True)

    def collect_error(self, error: Exception) -> None:
        """Сохранить трейсбек исключения в список."""
        self.errors.append(
            "\n".join(
                traceback.TracebackException.from_exception(
                    error, capture_locals=True
                ).format(chain=True)
            )
        )

    @contextmanager
    def handle_errors(self):
        """Контекстный менеджер для безопасного выполнения кода."""
        try:
            yield
        except Exception as error:
            self.collect_error(error)

    @pytest.hookimpl(hookwrapper=True)
    def pytest_<hook_name>(...):
        start_time = time.monotonic()
        # чтобы гарантировано собрать метрики, используем try-finally
        try:
            yield
        finally:
            duration = time.monotonic() - start_time
            self.stats["<hook_name>"] = duration

    @staticmethod
    def write_to_file(file_name, content):
        with open(BASE_METRICS_DIR / file_name, "w") as f:
            f.write(content)

    def finalize_metrics(self):
        if self.errors:
            self.write_to_file(
                "errors",
                "\n\n".join(self.errors)
            )

P.S.: Полный код тестового проекта можно посмотреть на Github.

Реализация. Часть 1 - собираем метрики сессии

Теперь, когда мы разобрали основные моменты, перейдем к реализации сборщика.

Начнем с небольшой настройки окружения для корректной работы pytest и создания нескольких тестов, метрики которых мы сможем собрать.
Создадим в корневой директории проекта файл pytest.ini и добавим в нем в Path директорию проекта:

[pytest]
pythonpath = .

А также добавим пару тестов в ./tests/test_example.py:

import time


def test_example_short():
    assert True


def test_example_long():
    time.sleep(1)
    assert True

В итоге получим следующую структуру проекта:

.
├── plugins
│   └── metrics_collector
│       └── collector.py
├── pytest.ini
└── tests
    ├── conftest.py
    └── test_example.py

Перейдем к сбору метрик.

Начнем с простого - соберем данные о длительности сессии, а именно время исполнения хуков pytest_sessionstart и pytest_sessionfinish, а также время от старта до завершения сессии.
Для этого нам понадобится реализовать в нашем сборщике два hookwrapper'а для указанных хуков:

# ./pugins/metrics_collector/collector.py

from datetime import datetime, timezone
...


class MetricsCollector:
    """Сборщик технических метрик."""

    def __init__(self):
        self.stats = {}
        # нам пригодится атрибут для хранения промежуточных данных,
        # которые не попадут в конечную версию метрик
        self.temp_stats = {}
        self.errors = []
        BASE_METRICS_DIR.mkdir(parents=True, exist_ok=True)

    ...

    @pytest.hookimpl(hookwrapper=True)
    def pytest_sessionstart(self, session):
        # зафиксируем дату и время старта сессии
	    self.stats['run_timestamp'] = (
		    datetime.now(timezone.utc).isoformat()
		)
        start_time = time.monotonic()
        self.temp_stats["session_started_at"] = start_time
        try:
            yield
        finally:
            hook_duration = time.monotonic() - start_time
            self.stats["pytest_sessionstart_duration"] = hook_duration

    @pytest.hookimpl(hookwrapper=True)
    def pytest_sessionfinish(self, session, exitstatus):
        start_time = time.monotonic()
        try:
            yield
        finally:
            finish_time = time.monotonic()
            hook_duration = finish_time - start_time
            self.stats["pytest_sessionfinish_duration"] = hook_duration
            session_duration = (
                finish_time - self.temp_stats["session_started_at"]
            )
            self.stats["session_duration"] = session_duration

    ...

Мы добавили сбор первых метрик, но чтобы увидеть результат, нам не хватает двух вещей:

  1. Плагин со сборщиком необходимо подключить.

  2. Необходимо реализовать вывод результата сбора метрик.

Начнем с подключения.
Проще всего импортировать наш класс-сборщик в conftest.py и подключить его в хуке pytest_configure:

# ./tests/conftest.py

from plugins.metrics_collector.collector import MetricsCollector


def pytest_configure(config):
    config.pluginmanager.register(MetricsCollector(), "metrics_collector")

Теперь реализуем вывод результата.
Добавим немного универсальности и реализуем сохранение метрик в JSON-файл.
Для этого доработаем метод write_to_file, чтобы он умел сохранять данные в виде JSON:

class MetricsCollector:
    ...

    @staticmethod
    def write_to_file(file_name, content, as_json=False):
        with open(BASE_METRICS_DIR / file_name, "w") as f:
            if as_json:
                f.write(json.dumps(content, indent=4))
            else:
                f.write(content)

Добавим сохранение метрик в метод finalize_metrics, а также вызов данного метода.
Указанный метод стоит вызывать на последних этапах работы pytest. Он может быть в конце pytest_sessionfinish или в одном из хуков, вызываемых после него, например, в pytest_terminal_summary:

class MetricsCollector:
    ...
    @pytest.hookimpl(hookwrapper=True)
    def pytest_sessionfinish(self, session, exitstatus):
        ...
        try:
            yield
        finally:
            ...
            # перестрахуемся на случай сбоя записи в файл
            with self.handle_errors():
                self.finalize_metrics()

    def finalize_metrics(self):
        """Завершает сбор метрик, записывает ошибки, если есть."""
        self.write_to_file("session_metrics.json", self.stats, as_json=True)
        if self.errors:
            self.write_to_file(
                "errors",
                "\n\n".join(self.errors)
            )

Запустим pytest, чтобы увидеть первые метрики:

{
    "run_timestamp": "2025-10-02T12:46:42.160686+00:00",
    "pytest_sessionstart_duration": 0.0016916579916141927,
    "pytest_sessionfinish_duration": 0.0013223300338722765,
    "session_duration": 1.0096204479923472
}

Мы собрали метрики с длительностью сессии и нескольких основных pytest-хуков. Аналогичным образом мы можем добавить метрики для сбора длительности коллекта тестов в целом и отдельных хуков данного процесса в частности:

    @pytest.hookimpl(hookwrapper=True)
    def pytest_collection(self, session):
        """Хук для замера полного время времени сб��ра тестов."""
        start_time = time.monotonic()
        try:
            yield
        finally:
            hook_duration = time.monotonic() - start_time
            self.stats["pytest_collection_duration"] = hook_duration

    @pytest.hookimpl(hookwrapper=True)
    def pytest_collectstart(self, collector):
        """Хук вызывается для каждого коллектора перед сбором тестов.

        Будем хранить общее время выполнения данного хука для всех коллекторов.
        """
        start_time = time.monotonic()
        try:
            yield
        finally:
            hook_duration = time.monotonic() - start_time
            self.stats["pytest_collectstart_duration"] = (
                self.stats.get("pytest_collectstart_duration",  0) + hook_duration
            )

    @pytest.hookimpl(hookwrapper=True)
    def pytest_collection_modifyitems(self, session, config, items):
        """Замер времени модификации собранных тестов."""
        start_time = time.monotonic()
        try:
            yield
        finally:
            hook_duration = time.monotonic() - start_time
            self.stats["pytest_collection_modifyitems_duration"] = hook_duration

    @pytest.hookimpl(hookwrapper=True)
    def pytest_collection_finish(self, session):
        """Замер времени завершения сбора тестов."""
        start_time = time.monotonic()
        try:
            yield
        finally:
            hook_duration = time.monotonic() - start_time
            self.stats["pytest_collection_finish_duration"] = hook_duration

Реализация. Часть 2 — рефакторинг для подготовки к сбору метрик тестов

Ранее мы разделили процесс прогона тестов на 4 этапа и на данный момент реализовали сбор метрик для двух из них - сбор тестов и завершение тестовой сессии, плюс некоторые общие метрики, такие как общая продолжительность сессии.
Этап запуска и конфигурации pytest реже других этапов подвергается модификации, в связи с чем в рамках статьи сбор метрик данного этапа не будет рассматриваться, но будет реализован в коде проекта, ссылка на который была указана в начале статьи.

Настало время перейти к сбору данных по каждому отдельному тесту. Такие метрики помогут найти "бутылочное горлышко" на уровне конкретных фикстур, то есть в setup или teardown фазах.

Здесь мы сталкиваемся с важной особенностью - в отличие от большинства других хуков pytest, хуки, связанные с выполнением тестов, вызываются многократно - для каждого теста. Если мы добавим всю эту новую, сложную логику в наш существующий MetricsCollector, он станет слишком громоздким.

Чтобы упростить поддержку и разделить ответственность, проведем небольшой рефакторинг, выделив сбор метрик тестов в отдельный класс-сборщик и создав базовый класс для сборщиков, куда уберем общий для них код.

Начнем с MetricsCollector:

  1. Переименуем файл, в котором хранится данный класс (./plugins/metrics_collector/collector.py) в session_metrics_collector.py

  2. Сам класс переименуем в SessionMetricsCollector

Теперь займемся базовым классом:

  1. Создадим файл ./plugins/metrics_collector/base_metrics_collector.py, где создадим базовый класс-сборщик.

  2. Внутри данного файла создадим класс, куда перенесем из SessionMetricsCollector методы, которые пригодятся в обоих планируемых сборщиках:

import json
import traceback
from contextlib import contextmanager
from pathlib import Path

BASE_METRICS_DIR = (
    Path(__file__).resolve().parent.parent.parent / "tmp_files/metrics"
)


class BaseMetricsCollector:
    """Сборщик технических метрик."""

    def __init__(self):
        self.stats = {}
        self.temp_stats = {}
        self.errors = []
        BASE_METRICS_DIR.mkdir(parents=True, exist_ok=True)

    def collect_error(self, error: Exception) -> None:
        """Сохранить трейсбек исключения в список."""
        self.errors.append(
            "\n".join(
                traceback.TracebackException.from_exception(
                    error, capture_locals=True
                ).format(chain=True)
            )
        )

    @contextmanager
    def handle_errors(self):
        """Контекстный менеджер для безопасного выполнения кода."""
        try:
            yield
        except Exception as error:
            self.collect_error(error)

    @staticmethod
    def write_to_file(file_name, content, as_json=False):
        with open(BASE_METRICS_DIR / file_name, "w") as f:
            if as_json:
                f.write(json.dumps(content, indent=4))
            else:
                f.write(content)

После чего унаследуем SessionMetricsCollector от BaseMetricsCollector:

# ./plugins/metrics_collector/session_metrics_collector.py
...

from .base_metrics_collector import BaseMetricsCollector


class SessionMetricsCollector(BaseMetricsCollector):
    """Сборщик технических метрик тестовой сессии."""
    ...

И, наконец, создадим файл ./plugins/metrics_collector/tests_metrics_collector.py, в котором объявим класс-сборщик метрик тестов, унаследовав его от базового класса:

# ./plugins/metrics_collector/tests_metrics_collector.py
from .base_metrics_collector import BaseMetricsCollector


class TestMetricsCollector(BaseMetricsCollector):
    """Сборщик технических метрик тестов."""

Для удобства работы с плагином организуем его в пакет, добавив __init__.py и импортировав в него классы-сборщики:

# ./plugins/metrics_collector/__init__.py
from .session_metrics_collector import SessionMetricsCollector
from .tests_metrics_collector import TestMetricsCollector

__all__ = [
    "SessionMetricsCollector",
    "TestMetricsCollector",
]

После чего доработаем conftest.py, подключив новый сборщик:

import pytest

from plugins.metrics_collector import (
    SessionMetricsCollector,
    TestMetricsCollector,
)


@pytest.hookimpl(tryfirst=True)
def pytest_configure(config):
    session_metric_collector = SessionMetricsCollector()
    config.pluginmanager.register(
        session_metric_collector, "session_metric_collector"
    )
    session_metric_collector.begin_configure()
    config.pluginmanager.register(
        TestMetricsCollector(), "test_metrics_collector"
    )

Реализация. Часть 3 - Сбор метрик тестов

Прежде чем начать собирать метрики тестов, обозначим какие данные мы хотим получить. Возьмем минимальный набор: общее время теста, время отдельных его фаз и статус теста.
Метрики тестов в итоге будут представлять собой список словарей, которые будут содержать название теста, его параметры (если использована параметризация), статус и временные показатели.

Добавим соответствующий список в сборщик и соберем информацию о названии и параметрах теста, а также общее время выполнения каждого теста. В этом нам поможет хук pytest_runtest_protocol, который оборачивает все фазы выполнения каждого теста.

import time
from datetime import datetime, timezone

import pytest

from .base_metrics_collector import BaseMetricsCollector


class TestMetricsCollector(BaseMetricsCollector):
    """Сборщик технических метрик тестов."""

    def __init__(self):
        super().__init__()
        self.tests_stats = []

    @staticmethod
    def get_test_name_and_params(item):
        """Выделяет из item название теста и параметры."""
        test_name = item.nodeid.split("[")[0]
        parameter = item.callspec.id if hasattr(item, "callspec") else ""
        return test_name, parameter

    @pytest.hookimpl(hookwrapper=True)
    def pytest_runtest_protocol(self, item, nextitem):
        """Хук для замера полного время выполнения теста."""
        start_time = time.monotonic()
        test_name, parameter = self.get_test_name_and_params(item)
        self.stats["nodeid"] = item.nodeid
        self.stats["test_name"] = test_name
        self.stats["parameter"] = parameter
        # зафиксируем дату и время старта теста
	    self.stats['run_timestamp'] = (
		    datetime.now(timezone.utc).isoformat()
		)
        try:
            yield
        finally:
            self.stats["total_duration"] = time.monotonic() - start_time
            self.tests_stats.append(self.stats)
            self.stats = {}

    def finalize_metrics(self):
        """Завершает сбор метрик, записывает ошибки, если есть."""
        self.write_to_file(
            "test_metrics.json", self.tests_stats, as_json=True
        )
        if self.errors:
            self.write_to_file("test_errors.log", "\n\n".join(self.errors))

    @pytest.hookimpl(trylast=True)
    def pytest_sessionfinish(self, session, exitstatus):
        """Хук для завершения сессии pytest."""
        with self.handle_errors():
            self.finalize_metrics()

Теперь определим статус теста и время прохождения отдельных его фаз.
Поскольку статус есть у каждой фазы теста, мы присвоим каждому статусу, который может нам встретиться, определенный "вес" и в качестве итогового статуса используем самый "тяжелый".

# ./plugins/metrics_collector/tests_metrics_collector.py
...
from enum import StrEnum
...


class TestStatus(StrEnum):
    UNKNOWN = "UNKNOWN"
    PASSED = "PASSED"
    SKIPPED = "SKIPPED"
    FAILED = "FAILED"
    ERROR = "ERROR"


STATUS_WEIGHTS = {
    TestStatus.UNKNOWN: 0,
    TestStatus.PASSED.value: 1,
    TestStatus.SKIPPED.value: 2,
    TestStatus.FAILED.value: 3,
    TestStatus.ERROR.value: 4,
}


class TestMetricsCollector(BaseMetricsCollector):
    """Сборщик технических метрик тестов."""
    ...

    def set_status(self, report):
        """Определение статуса теста."""
        with self.handle_errors():
            saved_status = self.stats.get("status", TestStatus.UNKNOWN.value)
            current_status = TestStatus(report.outcome.upper())
            if (
                current_status == TestStatus.FAILED
                and report.when in ("setup", "teardown")
            ):
                current_status = TestStatus.ERROR
            if STATUS_WEIGHTS[current_status.value] > STATUS_WEIGHTS[saved_status]:
                self.stats["status"] = current_status.value

    @pytest.hookimpl(trylast=True)
    def pytest_runtest_logreport(self, report):
        """Хук вызывается в конце каждой фазы теста."""
        self.set_status(report)
        match report.when:
            case "setup":
                self.stats["setup_duration"] = report.duration
            case "call":
                self.stats["call_duration"] = report.duration
            case "teardown":
                self.stats["teardown_duration"] = report.duration
    ...

Для определения статуса теста мы использовали хук pytest_runtest_logreport. В нем же получили время выполнения каждой фазы теста из report.duration.

Добавим параметризацию в один из наших тестов для большей наглядности и запустим pytest, чтобы увидеть результат:

# ./tests/test_example.py
...

import pytest


@pytest.mark.parametrize("value", (True, False))
def test_example_short(value):
    assert value
 
...
[
    {
        "nodeid": "tests/test_example.py::test_example_short[True]",
        "test_name": "tests/test_example.py::test_example_short",
        "parameter": "True",
        "run_timestamp": "2025-10-02T14:32:27.449968+00:00",
        "status": "PASSED",
        "setup_duration": 0.0002259399916511029,
        "call_duration": 0.00010595099593047053,
        "teardown_duration": 9.156300802715123e-05,
        "total_duration": 0.0009245050023309886
    },
    {
        "nodeid": "tests/test_example.py::test_example_short[False]",
        "test_name": "tests/test_example.py::test_example_short",
        "parameter": "False",
        "run_timestamp": "2025-10-02T14:32:27.450922+00:00",
        "status": "FAILED",
        "setup_duration": 0.00011118900147266686,
        "call_duration": 0.0002075020020129159,
        "teardown_duration": 0.00010643999848980457,
        "total_duration": 0.009049831991433166
    },
    {
        "nodeid": "tests/test_example.py::test_example_long",
        "test_name": "tests/test_example.py::test_example_long",
        "parameter": "",
        "run_timestamp": "2025-10-02T14:32:27.460005+00:00",
        "status": "PASSED",
        "setup_duration": 7.675700180698186e-05,
        "call_duration": 1.0008193050016416,
        "teardown_duration": 0.00018661899957805872,
        "total_duration": 1.0017387109983247
    }
]

Сохраняем историю. Загрузка метрик в ClickHouse

Мы собрали основные метрики сессии и отдельных тестов. Чтобы ими пользоваться и отслеживать изменения нам необходимо место, где будут храниться метрики.
Для этих целей в Озон Банке используется ClickHouse, в который метрики отправляются крупными батчами отдельным CI-скриптом после завершения работы pytest.

Давайте реализуем скрипт для записи метрик в нашем тестовом проекте.
Для демонстрации используем ClickHouse в docker контейнере.

Создадим файл ./clickhouse_init/init-db.sql и подготовим в нем скрипт для создания необходимых таблиц в базе данных:

-- таблица для хранения метрик сессии
CREATE TABLE IF NOT EXISTS default.session_metrics
(
    run_id UUID,
    run_timestamp DateTime64(3),
    session_duration Float64,
    pytest_configure_duration Float64,
    pytest_sessionstart_duration Float64,
    pytest_collectstart_duration Float64,
    pytest_collection_modifyitems_duration Float64,
    pytest_collection_finish_duration Float64,
    pytest_collection_duration Float64,
    pytest_sessionfinish_duration Float64
)
ENGINE = MergeTree()
ORDER BY run_timestamp;

-- таблица для хранения метрик тестов
CREATE TABLE IF NOT EXISTS default.test_metrics
(
    run_id UUID,
    run_timestamp DateTime64(3),
    nodeid String,
    test_name String,
    parameter String,
    status Enum('PASSED' = 1, 'FAILED' = 2, 'SKIPPED' = 3, 'ERROR' = 4, 'UNKNOWN' = 0),
    setup_duration Float64,
    call_duration Float64,
    teardown_duration Float64,
    total_duration Float64
)
ENGINE = MergeTree()
ORDER BY (run_timestamp, nodeid);

Создадим docker-compose.yml в корне проекта, в котором пропишем настройки ClickHouse и используем SQL-скрипт, описанный выше при запуске контейнера:

services:
  clickhouse-server:
    image: clickhouse/clickhouse-server
    container_name: example-clickhouse-server
    ports:
      - "8123:8123"
      - "9000:9000"
    environment:
      # в реальных проектах пароль стоит хранить в секретах :)
      - CLICKHOUSE_PASSWORD=password
    volumes:
      - clickhouse-data:/var/lib/clickhouse/
      - ./clickhouse_init:/docker-entrypoint-initdb.d/
    ulimits:
      nproc: 65535
      nofile:
        soft: 262144
        hard: 262144

volumes:
  clickhouse-data:

И запустим контейнер: docker compose up -d.

Добавим python-скрипт для загрузки собранных в JSON-файлы метрик с использованием библиотеки clickhouse_connect (upload_metrics.py):

Скрипт для загрузки метрик в базу данных
import json
import uuid
from pathlib import Path

import clickhouse_connect

# --- Настройки ---
CLICKHOUSE_HOST = "localhost"
CLICKHOUSE_PORT = 8123
METRICS_DIR = Path(__file__).parent / "tmp_files/metrics"
SESSION_METRICS_FILE = METRICS_DIR / "session_metrics.json"
TEST_METRICS_FILE = METRICS_DIR / "test_metrics.json"


def upload_session_metrics(client, run_id):
    """Читает JSON-файл с метриками сессии и загружает их в ClickHouse."""
    if not SESSION_METRICS_FILE.exists():
        print(f"Файл с метриками сессии не найден: {SESSION_METRICS_FILE}")
        return
    print("Загрузка метрик сессии...")
    with open(SESSION_METRICS_FILE, "r") as f:
        session_data = json.load(f)

    session_data["run_id"] = run_id
    column_names = [
        'run_id',
        'run_timestamp',
        'session_duration',
        'pytest_configure_duration',
        'pytest_sessionstart_duration',
        'pytest_collectstart_duration',
        'pytest_collection_modifyitems_duration',
        'pytest_collection_finish_duration',
        'pytest_collection_duration',
        'pytest_sessionfinish_duration',
    ]
    row_data = [session_data.get(col) for col in column_names]

    try:
        # 3. Передаем и данные, и названия колонок в метод insert
        client.insert(
            "default.session_metrics",
            [row_data],
            column_names=column_names
        )
        print("Метрики сессии успешно загружены.")
    except Exception as error:
        print(f"Ошибка при загрузке метрик сессии: {error}")


def upload_test_metrics(client, run_id):
    """Читает JSON-файл с метриками тестов и загружает их в ClickHouse."""
    if not TEST_METRICS_FILE.exists():
        print(f"Файл с метриками тестов не найден: {TEST_METRICS_FILE}")
        return
    print("Загрузка метрик тестов...")
    with open(TEST_METRICS_FILE, "r") as f:
        tests_metrics = json.load(f)
    if not tests_metrics:
        print("Файл с метриками тестов пуст. Пропускаем загрузку.")
        return
    column_names = [
        'run_id',
        'run_timestamp',
        'nodeid',
        'test_name',
        'parameter',
        'status',
        'setup_duration',
        'call_duration',
        'teardown_duration',
        'total_duration',
    ]
    rows_data = []
    for singl_test_metrics in tests_metrics:
        singl_test_metrics['run_id'] = run_id
        row = [singl_test_metrics.get(col) for col in column_names]
        rows_data.append(row)

    try:
        client.insert(
            "default.test_metrics",
            rows_data,
            column_names=column_names
        )
        print(f"Метрики тестов ({len(rows_data)} шт.) успешно загружены.")
    except Exception as error:
        print(f"Ошибка при загрузке метрик тестов: {error}")


def main():
    run_id = str(uuid.uuid4())
    print(f"Запуск загрузки для run_id: {run_id}")
    try:
        client = clickhouse_connect.get_client(
            host=CLICKHOUSE_HOST,
            port=CLICKHOUSE_PORT,
            username="default",
            password="password",
        )
        client.ping()
        print("Успешное подключение к ClickHouse.")
    except Exception as error:
        print(f"Не удалось подключиться к ClickHouse: {error}")
        return
    upload_session_metrics(client, run_id)
    upload_test_metrics(client, run_id)
    print("\nПроцесс загрузки завершен.")


if __name__ == "__main__":
    main()

И небольшой bash-скрипт для проверки того, что данные дошли до БД (query_metrics.sh):

#!/bin/bash

HOST="http://localhost:8123"
USER="default"
PASSWORD="password"

echo "--- Метрики сессии (session_metrics) ---"
curl -sS "$HOST" \
  --user "$USER:$PASSWORD" \
  --data "SELECT run_id, run_timestamp, session_duration, pytest_collection_duration FROM default.session_metrics ORDER BY run_timestamp DESC LIMIT 5 FORMAT TSVWithNames"

echo -e "\n\n--- Последние метрики тестов (test_metrics) ---"
curl -sS "$HOST" \
  --user "$USER:$PASSWORD" \
  --data "SELECT run_id, run_timestamp, test_name, parameter, status, total_duration FROM default.test_metrics ORDER BY run_timestamp DESC LIMIT 10 FORMAT TSVWithNames"

Запустим поочередно наши скрипты:

python upload_metrics.py
bash query_metrics.sh

Видим, что данные успешно загружены:

--- Метрики сессии (session_metrics) ---
run_id                                  run_timestamp           session_duration        pytest_collection_duration
1c8285c5-9e73-49f8-82b1-67b04c23fb9c    2025-10-02 14:32:27.440 1.0301336349948542      0.0072877769998740405


--- Последние метрики тестов (test_metrics) ---
run_id                                  run_timestamp           test_name                                     parameter status  total_duration
1c8285c5-9e73-49f8-82b1-67b04c23fb9c    2025-10-02 14:32:27.460 tests/test_example.py::test_example_long                PASSED  1.0017387109983247
1c8285c5-9e73-49f8-82b1-67b04c23fb9c    2025-10-02 14:32:27.450 tests/test_example.py::test_example_short       False   FAILED  0.009049831991433166
1c8285c5-9e73-49f8-82b1-67b04c23fb9c    2025-10-02 14:32:27.449 tests/test_example.py::test_example_short       True    PASSED  0.0009245050023309886

Сбор метрик при параллельном запуске тестов

Поскольку мы говорим о необходимости уменьшения времени выполнения тестов, то сбор метрик для поиска и оптимизации узких мест в хуках pytest видится следующим этапом, после внедрения параллелизации выполнения тестов.
Наша текущая реализация не рассчитана на параллельное исполнение тестов.
Давайте это исправим.

Рассмотрим данный процесс на примере pytest-xdist, как одной из самых популярных библиотек для параллелизации тестов.
Основная особенность при использовании pytest-xdist заключается в том, что тесты запускаются в воркерах, каждый из которых выполняется в отдельном процессе.
Это повлияет на процесс сбора метрик, в особенности метрик тестов. Нам понадобится синхронизировать данные, собранные в отдельных процессах.

Начнем с того, что добавим в наше окружение xdist: pip install pytest-xdist.

Теперь попробуем запустить pytest с использованием xdist. С учетом нашего количества тестов для наглядности используем только два воркера: pytest -n 2

В результате мы видим, что метрики тестов не собрались вообще - в файле лишь пустой список, а в метриках сессии пропали данные о нескольких хуках.

Все дело в том, что теперь часть хуков pytest выполняются только внутри воркеров xdist, другая часть (например, pytest_sessionstart и pytest_sessionfinish) выполняются как в основном процессе, так и в воркерах, и метрики необходимо собирать с учетом данного обстоятельства.
В частности, выполнение всех тестов теперь происходит внутри воркеров xdist. Каждый воркер пытается записать результаты в файл, но в конечном итоге файл перезаписывается основным процессом, внутри которого тесты не запускались и список с метриками тестов пуст.
Так как основные метрики сессии у нас по-прежнему собираются, мы не будем в тестовом проекте дорабатывать эту часть, но в реальном проекте стоит собирать метрики сессии как в основном процессе, так и в воркерах, поскольку некоторые операции могут достаточно быстро отрабатывать в основном процессе, но замедляться при одновременном запуске в воркерах (особенно CPU-bound код).

Чтобы разобраться со способом синхронизации данных, собранных в воркерах xdist, доработаем сбор метрик тестов.
Для реализации данного механизма в конце работы каждого из воркеров мы будем сохранять метрики во временный файл, уникальный для воркера, а в основном процессе, перед завершением сессии, загрузим данные воркеров, объединим их и запишем в общий файл.
Правки коснутся хука pytest_sessionfinish и добавим дополнительный метод для загрузки данных из файлов воркеров:

import json
import time
from datetime import datetime, timezone
from enum import StrEnum

import pytest
import xdist

from .base_metrics_collector import (
    BASE_METRICS_DIR,
    BaseMetricsCollector
)

WORKER_TEMP_FILE_PREFIX = "worker_"
...

class TestMetricsCollector(BaseMetricsCollector):
    ...
    def load_workers_metrics(self) -> None:
        """Загрузка метрик воркеров из файлов."""
        worker_stats_files = (
            BASE_METRICS_DIR.glob(f"{WORKER_TEMP_FILE_PREFIX}*.json")
        )
        with self.handle_errors():
            for file_path in worker_stats_files:
                with open(file_path, "r") as f:
                    worker_stats = f.read()
                    self.tests_stats.extend(json.loads(worker_stats))

    @pytest.hookimpl(trylast=True)
    def pytest_sessionfinish(self, session, exitstatus):
        """Хук для завершения сессии pytest."""
        # в конце работы воркера сохраняем его метрики в файл
        if xdist.is_xdist_worker(session):
            worker_id = xdist.get_xdist_worker_id(session)
            self.write_to_file(
                f'{WORKER_TEMP_FILE_PREFIX}{worker_id}.json',
                self.tests_stats,
                as_json=True,
            )
        else:
            # в главном процессе загружаем метрики всех воркеров
            self.load_workers_metrics()
            with self.handle_errors():
                self.finalize_metrics()

Еще раз запустим pytest с использованием xdist (pytest -n 2). Теперь все метрики тестов вновь собираются в файл ./tmp_files/metrics/test_metrics.json.

P.S.: Полный код примера сбора метрик с использованием xdist для запуска тестов можно посмотреть в отдельной ветке тестового проекта на Github.

Визуализация и анализ: Превращаем цифры в результат

До сих пор мы занимались технической работой: строили конвейер по сбору данных. Но сами по себе метрики — это сухие цифры в базе данных. Пользу они начинают приносить, когда мы превращаем их в наглядные графики и находим ответы на главный вопрос: "Где именно наши тесты тормозят?"
В этой заключительной части мы перейдем от нашего учебного примера к реальным боевым кейсам из Озон Банка. Я покажу, как с помощью дашбордов в Grafana мы выявляем места для оптимизации и какие неочевидные проблемы производительности нам удалось обнаружить благодаря этой системе.
Да, в нашей внутренней системе мы собираем еще больше данных, но принципы анализа остаются теми же.

Метрики сессии:
## Мета-данные:
ci_job_id: Int32
ci_pipeline_id: Int32
test_project_id: Int32
test_project_branch: String
test_project_commit_sha: String
ci_username: String
upstream_project_id: Int32
upstream_service_name: String
upstream_commit_ref_slug: String
upstream_service_pipeline_id: Int32

## Глобальные временные показатели:
created_at: DateTime   # время запуска ранера, который оборачивает вызове pytest
started_at: DateTime   # время запуска тестовой сессии
finished_at: DateTime
session_duration: Float32
test_runner_duration: Float32

## Временные показатели хуков pytest:
pytest_sessionstart_duration: Float32
pytest_sessionfinish_duration: Float32
pytest_configure_duration: Float32
pytest_collection_duration: Float32
pytest_collection_modifyitems_duration: Float32
pytest_collection_finish_duration: Float32
pytest_runtestloop_duration: Float32 

## Метрики воркеров xdist
pytest_xdist_worker_pytest_collection_max_duration: Float32
pytest_xdist_node_collection_finished_max_duration: Float32
pytest_xdist_worker_collection_modifyitems_max_duration: Float32
pytest_xdist_workers_count: Int32
pytest_xdist_worker_max_runtime: Float32

# Результаты тестов
test_total: Int32
test_ok: Int32
test_fail: Int32
test_error: Int32
test_skip: Int32
test_unknown: Int32
total_retries: Int32

pytest_exit_code: Int32
```

#### Метрики тестов:
```
ci_job_id: Int32
ci_pipeline_id: Int32
test_name: String
param_name: String
test_param: String  # test_name[param_name]
started_at: DateTime
finished_at: DateTime
duration: Float32
status: Enum8('UNKNOWN' = 0, 'PASSED' = 1, 'FAILED' = 2, 'BROKEN' = 3, 'SKIPPED' = 4, 'ERROR' = 5)
pytest_xdist_worker_id: String
pytest_runtest_setup_duration: Float32
pytest_runtest_makereport_duration: Float32
pytest_fixture_setup_duration: Float32
pytest_itemcollected_duration: Float32
is_rerun: bool = Bool

pytest_runtest_teardown_duration: Float32
pytest_runtest_call_duration: Float32
Метрики тестов
ci_job_id: Int32
ci_pipeline_id: Int32
test_name: String
param_name: String
test_param: String  # test_name[param_name]
started_at: DateTime
finished_at: DateTime
duration: Float32
status: Enum8('UNKNOWN' = 0, 'PASSED' = 1, 'FAILED' = 2, 'BROKEN' = 3, 'SKIPPED' = 4, 'ERROR' = 5)
pytest_xdist_worker_id: String
pytest_runtest_setup_duration: Float32
pytest_runtest_makereport_duration: Float32
pytest_fixture_setup_duration: Float32
pytest_itemcollected_duration: Float32
is_rerun: bool = Bool

pytest_runtest_teardown_duration: Float32
pytest_runtest_call_duration: Float32

P.S.: я не стал указывать часть специфических для нашего проекта метрик, смысл которых не будет ясен без контекста.
Отмечу также, что для подключения Grafana к ClickHouse используется плагин Altinity. Способ его подключения и использования можно посмотреть в документации.

Ловим "хвосты"

При параллельном запуске тестов в идеальных условиях тесты должны завершаться во всех воркерах (будь это xdist или иная система параллелизации) примерно одновременно. Это позволяет наиболее эффективно утилизировать ресурсы и снизить общее время выполнения тестов.

Однако, в реальных условиях зачастую возникает проблема длинных "хвостов" - ситуация, в которой тесты в одном из воркеров выполняются существенно дольше, чем в других.
Для выявления таких случаев мы используем метрики - вычисляем продолжительность выполнения тестов в каждом воркере, а затем сравниваем их, чтобы выявить хвосты.
Анализ таких ситуаций помогает улучшать алгоритм распределения тестов по воркерам.

Визуализация выполнена в виде таблицы с возможностью быстрого перехода в пайплайн и CI джобу для анализа причин:

Таблица формируется следующим запросом:
-- 1. Выбираем только первые запуски джоб в каждом пайплайне, отбрасывая перезапуски.
WITH FirstJobInPiplenes AS (
    SELECT
        ti.ci_pipeline_id AS ci_pipeline_id,
        ti.pytest_xdist_worker_id AS pytest_xdist_worker_id,
        toTimeZone(ti.started_at, 'Europe/Moscow') as started_at,
        ti.duration AS duration,
        ti.ci_job_id AS ci_job_id,
        argMin(ti.ci_job_id, started_at) AS first_ci_job_id
    FROM
        $table AS ti
        LEFT JOIN svalqa.testsession_indicators AS tsi
            ON ti.ci_pipeline_id = tsi.ci_pipeline_id
            AND ti.ci_job_id = tsi.ci_job_id
    WHERE
        $timeFilter
    GROUP BY
        ti.ci_pipeline_id,
        ti.ci_job_id,
        ti.pytest_xdist_worker_id,
        ti.duration,
        started_at
    ORDER BY
        ci_pipeline_id
),
-- 2. Вычисляем продолжительность работы воркеров
WorkerDurations AS (
    SELECT
        ci_pipeline_id,
        ci_job_id,
        pytest_xdist_worker_id,
        min(started_at) as start_time,
        count(*) AS test_count_on_worker,
        round(sum(duration), 3) AS total_duration_on_worker_sec
    FROM
        FirstJobInPiplenes
    GROUP BY
        ci_pipeline_id,
        ci_job_id,
        pytest_xdist_worker_id
    ORDER BY
        ci_pipeline_id DESC,
        ci_job_id DESC,
        pytest_xdist_worker_id
),
-- 3. Расчитываем медиану и длину хвоста
TestTails AS (
    SELECT
        ci_pipeline_id,
        ci_job_id,
        start_time,
        round(quantile(0.5)(total_duration_on_worker_sec), 3) AS median_worker_duration,
        round(max(total_duration_on_worker_sec), 3) AS total_duration,  -- будем считать максимальное время общим временем прогона тестов, т.к. запускаются воркеры примерно одновременно
        total_duration - median_worker_duration AS tail_duration,
        round(((total_duration - median_worker_duration) / median_worker_duration) * 100, 1) AS percent_over_median
    FROM
        WorkerDurations
    GROUP BY
        ci_pipeline_id,
        ci_job_id,
        start_time
    ORDER BY
        ci_pipeline_id DESC,
        ci_job_id DESC
)
-- 4. Выводим данные запусков, длинна хвоста в которых превышает установленное предельное значение
SELECT
    start_time,
    ci_pipeline_id,
    ci_job_id,
    median_worker_duration,
    total_duration,
    tail_duration,
    percent_over_median
FROM
    TestTails
WHERE
    percent_over_median >= 20
ORDER BY
    start_time DESC,
    percent_over_median DESC

Анализ данной таблицы помог выявить основного виновника появления "хвостов" - им оказался pytest-rerunfailures.
Особенности реализации механизма реранов упавших тестов внутри одного запуска, реализованного в данной библиотеке, заключается в том, что упавший тест выполняется повторно на том же воркере внутри pytest_runtest_protocol. Из-за этого, в случае если какой-то из тестов стабильно падает, он последовательно запускается в одном и том же процессе без учета того, что в данный момент могут быть уже другие свободные воркеры, что приводит к образованию "хвоста".
Кроме того, важна сортировки тестов, от которой зависит порядок их выполнения. Если длинные тесты запускаются в конце сессии, то они могут вызывать образование "хвостов".

Так же отмечу, что "хвосты" видны и в таймлайне Allure-отчетов, но в таком виде их сложно анализировать и отслеживать статистику, так как обработка каждого случая вручную неэффективна.

Анализ длительности хуков pytest

Одним из первых мест для оптимизации, которое нам подсветили метрики, стал этап сбора тестов. При анализе времени выполнения хуков мы заметили, что pytest_collection отрабатывал аномально долго по сравнению с другими.
Это послужило сигналом к рефакторингу, в ходе которого мы оптимизировали внутреннюю логику обработки тестов. Как видно на графике, это позволило сократить время выполнения хука примерно на 60%, что стало большим выигрышем для всех запусков.

На графике данные за две недели: неделя до оптимизации и неделя после
На графике данные за две недели: неделя до оптимизации и неделя после

Как показывают эти примеры, система сбора метрик — это не просто накопление цифр, а удобный диагностический инструмент. Проблемы, которые раньше были невидимы — будь то несбалансированные "хвосты" в xdist или аномально долгий хук — становятся очевидными на графиках. Визуализация превращает догадки и предположения в конкретные измеримые задачи по оптимизации, позволяя принимать решения на основе данных и видеть результаты оптимизаций.

Если вам интересны такие подходы и вы хотите обсудить их вживую, приходите знакомиться! 19 и 20 октября наша команда будет на конференции Heisenbug. Ищите стенд Ozon Банка, чтобы пообщаться и поучаствовать в наших активностях.