Как стать автором
Обновить

Асинхронный JSON-Logger для FastAPI

Время на прочтение10 мин
Количество просмотров22K

Всем привет.

Цель данной статьи и моей личной разработки - написать о том, как я придумал свой собственный и удобный формат json-журналирования специально для компании, в которой я работаю, но не нашел готовых решений для реализации, который позволит мне воплощать очень гибко и удобно некоторые вещи с единым наименованием полей, чтобы логгирование протекало быстро и асинхронно, а также: не заставляло бы меня писать много кода и обойтись одной-двумя библиотеками максимум.

Пошел искать решение в лоб: искал готовые решения в виде сторонних библиотек. Отыскал пару-тройку годных решений. Например библиотека json-logging. Всё хорошо: почитал, что делает и поставил не глядя. Данная библиотека имеет массу преимуществ:

  • может напрямую работать с FastAPI, Flask, Aiohttp, Sanic,

  • перехватывает uvicorn access log,

  • производит форматирование запроса в JSON формат.

Данная библиотека генерирует прекрасный лог сообщения с полями запроса, который поступил в сервис:

Код
{
	"type": "request",
	"written_at": "2017-12-23T16:55:37.280Z",
	"written_ts": 1514048137280721000,
	"component_id": "-",
	"component_name": "-",
	"component_instance_idx": 0,
	"correlation_id": "1975a02e-e802-11e7-8971-28b2bd90b19a",
	"remote_user": "user_a",
	"request": "/index.html",
	"referer": "-",
	"x_forwarded_for": "-",
	"protocol": "HTTP/1.1",
	"method": "GET",
	"remote_ip": "127.0.0.1",
	"request_size_b": 1234,
	"remote_host": "127.0.0.1",
	"remote_port": 50160,
	"request_received_at": "2017-12-23T16:55:37.280Z",
	"response_time_ms": 0,
	"response_status": 200,
	"response_size_b": "122",
	"response_content_type": "text/html; charset=utf-8",
	"response_sent_at": "2017-12-23T16:55:37.280Z"
}

Но у этой библиотеки обнаружились серьёзные недостатки:

  • Название полей в логе изменить нельзя без наследования Formatter и кастомизацию.

  • Нельзя никаким образом поставить в логирование тело ответа без написания собственного Middleware для FastAPI, который перехватит еще и тело ответа, что мы добавили в заголовки.

  • Логгирование перехватывало синхронный поток вывода логгера uvicorn.access.

Я не буду писать, как я пытался закостылить исправить первые два минуса, пока не столкнулся с тем, что мне всё-таки нужен асинхронный логер. Я лучше Вам расскажу, как я решил сделать собственное решение.
Для начала я решил отказаться от этой библиотеки сразу:

pip uninstall json-logging

Далее я хотел обозначить структуру журнала. Мне очень помог Pydantic для элегантного описания данного вопроса:

from typing import Union

from pydantic import BaseModel, Field


class BaseJsonLogSchema(BaseModel):
    """
    Схема основного тела лога в формате JSON
    """
    thread: Union[int, str]
    level: int
    level_name: str
    message: str
    source: str
    timestamp: str = Field(..., alias='@timestamp')
    app_name: str
    app_version: str
    app_env: str
    duration: int
    exceptions: Union[list[str], str] = None
    trace_id: str = None
    span_id: str = None
    parent_id: str = None

    class Config:
        allow_population_by_field_name = True


class RequestJsonLogSchema(BaseModel):
    """
    Схема части запросов-ответов лога в формате JSON
    """
    request_uri: str
    request_referer: str
    request_protocol: str
    request_method: str
    request_path: str
    request_host: str
    request_size: int
    request_content_type: str
    request_headers: str
    request_body: str
    request_direction: str
    remote_ip: str
    remote_port: str
    response_status_code: int
    response_size: int
    response_headers: str
    response_body: str
    duration: int

После того, как была сформирована структура тела запроса необходимо определить сам механизм форматирования. Формирование, что логично, производится с помощью создания класса JSONLogFormatter с наследованием от стандартного logging.Formatter.
Код test_project.utils.json_logger.py

import datetime
import json
import logging
import traceback

from test_project.main.settings import APP_NAME, \
    APP_VERSION, ENVIRONMENT
from test_project.utils.utils_schemas import BaseJsonLogSchema


class JSONLogFormatter(logging.Formatter):
    """
    Кастомизированный класс-форматер для логов в формате json
    """

    def format(self, record: logging.LogRecord, *args, **kwargs) -> str:
        """
        Преобразование объект журнала в json

        :param record: объект журнала
        :return: строка журнала в JSON формате
        """
        log_object: dict = self._format_log_object(record)
        return json.dumps(log_object, ensure_ascii=False)

    @staticmethod
    def _format_log_object(record: logging.LogRecord) -> dict:
        """
        Перевод записи объекта журнала
        в json формат с необходимым перечнем полей

        :param record: объект журнала
        :return: Словарь с объектами журнала
        """
        now = datetime \
            .datetime \
            .fromtimestamp(record.created) \
            .astimezone() \
            .replace(microsecond=0) \
            .isoformat()
        message = record.getMessage()
        duration = record.duration \
            if hasattr(record, 'duration') \
            else record.msecs
        # Инициализация тела журнала
        json_log_fields = BaseJsonLogSchema(
            thread=record.process,
            timestamp=now,
            level=record.levelno,
            level_name=LEVEL_TO_NAME[record.levelno],
            message=message,
            source=record.name,
            duration=duration,
            app_name=APP_NAME,
            app_version=APP_VERSION,
            app_env=ENVIRONMENT,
        )

        if hasattr(record, 'props'):
            json_log_fields.props = record.props

        if record.exc_info:
            json_log_fields.exceptions = \
                traceback.format_exception(*record.exc_info)

        elif record.exc_text:
            json_log_fields.exceptions = record.exc_text
        # Преобразование Pydantic объекта в словарь
        json_log_object = json_log_fields.dict(
            exclude_unset=True,
            by_alias=True,
        )
        # Соединение дополнительных полей логирования
        if hasattr(record, 'request_json_fields'):
            json_log_object.update(record.request_json_fields)

        return json_log_object

Нужно заставить FastApi ловить request и response, желательно с наличием body. Я сначала пытался обратиться к dependency подходу FastApi. Но понял, что не хочу описывать в каждом методе метод логирования. Естественно, к нам на помощь приходит Middleware.
Я написал класс, который позволяет внедрить в app-объект журналирование тела запроса и ответа, а также их полей: status code, headers, remote ip, protocol, etc. Класс я назвал LoggingMiddleware и все выполнение происходит методу async def __call__()

Код test_project.utils.middlewares.py

from test_project.main.settings import PORT
from test_project.utils.json_logger import EMPTY_VALUE
from test_project.utils.utils_schemas import RequestJsonLogSchema


class LoggingMiddleware:
    """
    Middleware для обработки запросов и ответов с целью журналирования
    """
    @staticmethod
    async def get_protocol(request: Request) -> str:
        protocol = str(request.scope.get('type', ''))
        http_version = str(request.scope.get('http_version', ''))
        if protocol.lower() == 'http' and http_version:
            return f'{protocol.upper()}/{http_version}'
        return EMPTY_VALUE

    @staticmethod
    async def set_body(request: Request, body: bytes) -> None:
        async def receive() -> Message:
            return {'type': 'http.request', 'body': body}

        request._receive = receive

    async def get_body(self, request: Request) -> bytes:
        body = await request.body()
        await self.set_body(request, body)
        return body

    async def __call__(
            self, request: Request, call_next: RequestResponseEndpoint,
            *args, **kwargs
    ):
        start_time = time.time()
        exception_object = None
        # Request Side
        try:
            raw_request_body = await request.body()
            # Последующие действия нужны, 
            # чтобы не перезатереть тело запроса
						# и не уйти в зависание event-loop'a
            # при последующем получении тела ответа
            await self.set_body(request, raw_request_body)
            raw_request_body = await self.get_body(request)
            request_body = raw_request_body.decode()
        except Exception:
            request_body = EMPTY_VALUE

        server: tuple = request.get('server', ('localhost', PORT))
        request_headers: dict = dict(request.headers.items())
        # Response Side
        try:
            response = await call_next(request)
        except Exception as ex:
            response_body = bytes(
                http.HTTPStatus.INTERNAL_SERVER_ERROR.phrase.encode()
            )
            response = Response(
                content=response_body,
                status_code=http.HTTPStatus.INTERNAL_SERVER_ERROR.real,
            )
            exception_object = ex
            response_headers = {}
        else:
            response_headers = dict(response.headers.items())
            response_body = b''
            async for chunk in response.body_iterator:
                response_body += chunk
            response = Response(
                content=response_body,
                status_code=response.status_code,
                headers=dict(response.headers),
                media_type=response.media_type
            )
        duration: int = math.ceil((time.time() - start_time) * 1000)
				# Инициализация и формирования полей для запроса-ответа
        request_json_fields = RequestJsonLogSchema(
            request_uri=str(request.url),
            request_referer=request_headers.get('referer', EMPTY_VALUE),
            request_protocol=await self.get_protocol(request),
            request_method=request.method,
            request_path=request.url.path,
            request_host=f'{server[0]}:{server[1]}',
            request_size=int(request_headers.get('content-length', 0)),
            request_content_type=request_headers.get(
                'content-type', EMPTY_VALUE),
            request_headers=json.dumps(request_headers),
            request_body=request_body,
            request_direction='in',
            remote_ip=request.client[0],
            remote_port=request.client[1],
            response_status_code=response.status_code,
            response_size=int(response_headers.get('content-length', 0)),
            response_headers=json.dumps(response_headers),
            response_body=response_body.decode(),
            duration=duration
        ).dict()
        # Хочется на каждый запрос читать 
        # и понимать в сообщении самое главное, 
        # поэтому message мы сразу делаем типовым
        message = f'{"Ошибка" if exception_object else "Ответ"} ' \
                  f'с кодом {response.status_code} ' \
                  f'на запрос {request.method} \"{str(request.url)}\", ' \
                  f'за {duration} мс'
        logger.info(
            message,
            extra={
                'request_json_fields': request_json_fields,
                'to_mask': True,
            },
            exc_info=exception_object,
        )
        return response

У меня есть одержимость по поводу асинхронного логирования в FastApi. Методы вывода потока в консоль, запись в файл, logstash - это тот же I/O процесс, как и сгонять по сетке в БД или в другой сервис, согласны? А так как это I/O процесс, то в асинхронном FastApi очень важно не замедлять работу такой простой штукой как логирование. Я нашел очень хорошую библиотеку, которая за одну конфигурацию перестраивает вывод в консоль в асинхронный режим (также можно провернуть и с файлами и отправкой в logstash, и другие ресурсы). Данная библиотека называется: asynclog. Настройка и установка производится очень просто. Не нужно иметь никаких классов и переопределять handler-ы: Нужно просто настроить логирование через словарь в конфигурации проекта и задать handler.

Создаем конфигурацию логера и использовать его везде. Благо, в python есть для этого конфигурация, которую можно воплотить в настройках проекта.

LOG_CONFIG = {
    'version': 1,
    'disable_existing_loggers': False,
    'formatters': {
        'json': {
            '()': 'test_project.utils.json_logger.JSONLogFormatter',
        },
    },
    'handlers': {
# Используем AsyncLogDispatcher для асинхронного вывода потока. 
        'json': {
            'formatter': 'json',
            'class': 'asynclog.AsyncLogDispatcher',
            'func': 'test_project.utils.json_logger.write_log',
        },
    },
    'loggers': {
        'test_project': {
            'handlers': ['json'],
            'level': 'DEBUG' if DEBUG else 'INFO',
            'propagate': False,
        },
        'uvicorn': {
            'handlers': ['json'],
            'level': 'INFO',
            'propagate': False,
        },
      # Не даем стандартному логгеру fastapi работать по пустякам и замедлять работу сервиса
        'uvicorn.access': {
            'handlers': ['json'],
            'level': 'ERROR',
            'propagate': False,
        },
    },
}

Здесь у handler есть особенность в поле func, тут должна быть указана функция вывода потока. Это может быть stdout, print, write file. Что угодно, вплоть до передачи файла по сети в logstash. В нашем случае я решил просто вывести в консоль простейшим способом для примера.
Код test_project.utils.json_logger.py:

def write_log(msg):
    print(msg)

Настраиваем приложение на работу с данным middleware, инициализируем logging и его конфигурацию. Далее производим написание обычного route с логгированием сообщения 'Test log'.

import logging
from logging.config import dictConfig

from fastapi import FastAPI
from test_project.utils.middlewares import LoggingMiddleware
dictConfig(LOG_CONFIG)
logger = logging.getLogger(name)
app = FastAPI()
app.middleware('http')(
    LoggingMiddleware()
)
router = APIRouter(
    tags=['root'],
    responses={404: {'description': 'Not found'}},
)
@router.get('/')
async def root(request: Request):
    logger.info('Test log')
    return {'foo': 'bar'}
app.include_router(router)

Получаем сразу два сообщения в консоли:
Первое: как раз таки тот вывод из Middleware, который гарантирует отображение полей запроса

{
    "thread": 60535,
    "level": 20,
    "level_name": "Information",
    "message": "Ответ с кодом 200 на запрос GET \"http://localhost:8080/\", за 4 мс",
    "source": "test_project.utils.middlewares",
    "@timestamp": "2021-08-28T01:49:23+03:00",
    "app_name": "test_project",
    "app_version": "1.0.0",
    "app_env": "LOCAL",
    "duration": 4,
    "request_uri": "http://localhost:8080/",
    "request_referer": "",
    "request_protocol": "HTTP/1.1",
    "request_method": "GET",
    "request_path": "/",
    "request_host": "127.0.0.1:8080",
    "request_size": 0,
    "request_content_type": "",
    "request_headers": "{\"host\": \"localhost:8080\"}",
    "request_body": "",
    "request_direction": "in",
    "remote_ip": "127.0.0.1",
    "remote_port": "50259",
    "response_status_code": 200,
    "response_size": 13,
    "response_headers": "{\"content-length\": \"13\", \"content-type\": \"application/json\"}",
    "response_body": "{\"foo\":\"baz\"}"
}

Второе: Наше сообщение 'Test Log' вне контекста HTTP запроса, а уже в контексте работы внутри route

{
    "thread": 60535,
    "level": 20,
    "level_name": "Information",
    "message": "Test log",
    "source": "test_project.main.routes",
    "@timestamp": "2021-08-28T01:49:23+03:00",
    "app_name": "test_project",
    "app_version": "1.0.0",
    "app_env": "LOCAL",
    "duration": 385,
    "trace_id": "3bd6f269dbbbd275d46320c7e240bfe5",
    "span_id": "2417303bd1af1ee6",
    "parent_id": null
}

Бонус. Демонстрация производительности асинхронного вывода.

Почему я так сильно хотел асинхронный вывод журнала? Сейчас будет представлено нагрузочное тестирование на обычном примере одного и того же запроса, с одними и теми же конфигурациями сервера.

  • Количество воркеров: 10

  • Количество одновременно-подключенных клиентов: 200

  • Таймаут: 15 секунд.

  • Время выполнения теста: 15 секунд (больше и не надо)

  • Логгирование одного и того же сообщения: test.

  • Formatter не менял свой код.

Команда запуска нагрузочного тестирования:

wrk -c200 -t1 -d15s http://localhost:8080/users/public/ --timeout 15

Результат нагрузочного тестирования синхронного логгирования:

Результат нагрузочного тестирования асинхронного логгирования:

Как можно заметить, что синхронный вариант вывода дал 2294 rps, а асинхронный вариант вывода выдал целых 8364 rps. Получается, разница производительности целого сложного веб-сервиса увеличилась в 3.64 раза благодаря простому переходу и подходу к логгированию.

Вывод

Никогда не стоит соглашаться на сторонние библиотеки, если чувствуете, что они не могут удовлетворить ваших потребностей. Наследоваться от сторонних библиотек может обернуться плохой шуткой и лучше сразу не полениться и попробовать написать что-то своё. Да, это решение вряд-ли подойдет 99.99% пользователям, есть моменты, которые бы улучшил, оптимизировал и сделал бы более гибкими. Как раз про это я бы хотел почитать в комментариях, обсудить, как можно улучшить данный подход.

Пока что данная реализация находится в закрытой разработке одного из моих проектов, но я обещаю, что дополню эту статью ссылкой на гитхаб, либо подготовлю уже PyPi решение для вас, дорогие читатели данной статьи.

Telegram: @Lev_key

Теги:
Хабы:
Всего голосов 10: ↑10 и ↓0+10
Комментарии23

Публикации

Истории

Работа

Python разработчик
137 вакансий
Data Scientist
61 вакансия

Ближайшие события