Всем привет.
Цель данной статьи и моей личной разработки - написать о том, как я придумал свой собственный и удобный формат json-журналирования специально для компании, в которой я работаю, но не нашел готовых решений для реализации, который позволит мне воплощать очень гибко и удобно некоторые вещи с единым наименованием полей, чтобы логгирование протекало быстро и асинхронно, а также: не заставляло бы меня писать много кода и обойтись одной-двумя библиотеками максимум.
Пошел искать решение в лоб: искал готовые решения в виде сторонних библиотек. Отыскал пару-тройку годных решений. Например библиотека json-logging. Всё хорошо: почитал, что делает и поставил не глядя. Данная библиотека имеет массу преимуществ:
может напрямую работать с FastAPI, Flask, Aiohttp, Sanic,
перехватывает uvicorn access log,
производит форматирование запроса в JSON формат.
Данная библиотека генерирует прекрасный лог сообщения с полями запроса, который поступил в сервис:
Код
{
"type": "request",
"written_at": "2017-12-23T16:55:37.280Z",
"written_ts": 1514048137280721000,
"component_id": "-",
"component_name": "-",
"component_instance_idx": 0,
"correlation_id": "1975a02e-e802-11e7-8971-28b2bd90b19a",
"remote_user": "user_a",
"request": "/index.html",
"referer": "-",
"x_forwarded_for": "-",
"protocol": "HTTP/1.1",
"method": "GET",
"remote_ip": "127.0.0.1",
"request_size_b": 1234,
"remote_host": "127.0.0.1",
"remote_port": 50160,
"request_received_at": "2017-12-23T16:55:37.280Z",
"response_time_ms": 0,
"response_status": 200,
"response_size_b": "122",
"response_content_type": "text/html; charset=utf-8",
"response_sent_at": "2017-12-23T16:55:37.280Z"
}
Но у этой библиотеки обнаружились серьёзные недостатки:
Название полей в логе изменить нельзя без наследования Formatter и кастомизацию.
Нельзя никаким образом поставить в логирование тело ответа без написания собственного Middleware для FastAPI, который перехватит еще и тело ответа, что мы добавили в заголовки.
Логгирование перехватывало синхронный поток вывода логгера
uvicorn.access
.
Я не буду писать, как я пытался закостылить исправить первые два минуса, пока не столкнулся с тем, что мне всё-таки нужен асинхронный логер. Я лучше Вам расскажу, как я решил сделать собственное решение.
Для начала я решил отказаться от этой библиотеки сразу:
pip uninstall json-logging
Далее я хотел обозначить структуру журнала. Мне очень помог Pydantic для элегантного описания данного вопроса:
from typing import Union
from pydantic import BaseModel, Field
class BaseJsonLogSchema(BaseModel):
"""
Схема основного тела лога в формате JSON
"""
thread: Union[int, str]
level: int
level_name: str
message: str
source: str
timestamp: str = Field(..., alias='@timestamp')
app_name: str
app_version: str
app_env: str
duration: int
exceptions: Union[list[str], str] = None
trace_id: str = None
span_id: str = None
parent_id: str = None
class Config:
allow_population_by_field_name = True
class RequestJsonLogSchema(BaseModel):
"""
Схема части запросов-ответов лога в формате JSON
"""
request_uri: str
request_referer: str
request_protocol: str
request_method: str
request_path: str
request_host: str
request_size: int
request_content_type: str
request_headers: str
request_body: str
request_direction: str
remote_ip: str
remote_port: str
response_status_code: int
response_size: int
response_headers: str
response_body: str
duration: int
После того, как была сформирована структура тела запроса необходимо определить сам механизм форматирования. Формирование, что логично, производится с помощью создания класса JSONLogFormatter с наследованием от стандартного logging.Formatter.
Код test_project.utils.json_logger.py
import datetime
import json
import logging
import traceback
from test_project.main.settings import APP_NAME, \
APP_VERSION, ENVIRONMENT
from test_project.utils.utils_schemas import BaseJsonLogSchema
class JSONLogFormatter(logging.Formatter):
"""
Кастомизированный класс-форматер для логов в формате json
"""
def format(self, record: logging.LogRecord, *args, **kwargs) -> str:
"""
Преобразование объект журнала в json
:param record: объект журнала
:return: строка журнала в JSON формате
"""
log_object: dict = self._format_log_object(record)
return json.dumps(log_object, ensure_ascii=False)
@staticmethod
def _format_log_object(record: logging.LogRecord) -> dict:
"""
Перевод записи объекта журнала
в json формат с необходимым перечнем полей
:param record: объект журнала
:return: Словарь с объектами журнала
"""
now = datetime \
.datetime \
.fromtimestamp(record.created) \
.astimezone() \
.replace(microsecond=0) \
.isoformat()
message = record.getMessage()
duration = record.duration \
if hasattr(record, 'duration') \
else record.msecs
# Инициализация тела журнала
json_log_fields = BaseJsonLogSchema(
thread=record.process,
timestamp=now,
level=record.levelno,
level_name=LEVEL_TO_NAME[record.levelno],
message=message,
source=record.name,
duration=duration,
app_name=APP_NAME,
app_version=APP_VERSION,
app_env=ENVIRONMENT,
)
if hasattr(record, 'props'):
json_log_fields.props = record.props
if record.exc_info:
json_log_fields.exceptions = \
traceback.format_exception(*record.exc_info)
elif record.exc_text:
json_log_fields.exceptions = record.exc_text
# Преобразование Pydantic объекта в словарь
json_log_object = json_log_fields.dict(
exclude_unset=True,
by_alias=True,
)
# Соединение дополнительных полей логирования
if hasattr(record, 'request_json_fields'):
json_log_object.update(record.request_json_fields)
return json_log_object
Нужно заставить FastApi ловить request и response, желательно с наличием body. Я сначала пытался обратиться к dependency подходу FastApi. Но понял, что не хочу описывать в каждом методе метод логирования. Естественно, к нам на помощь приходит Middleware.
Я написал класс, который позволяет внедрить в app-объект журналирование тела запроса и ответа, а также их полей: status code, headers, remote ip, protocol, etc. Класс я назвал LoggingMiddleware и все выполнение происходит методу async def __call__()
Код test_project.utils.middlewares.py
from test_project.main.settings import PORT
from test_project.utils.json_logger import EMPTY_VALUE
from test_project.utils.utils_schemas import RequestJsonLogSchema
class LoggingMiddleware:
"""
Middleware для обработки запросов и ответов с целью журналирования
"""
@staticmethod
async def get_protocol(request: Request) -> str:
protocol = str(request.scope.get('type', ''))
http_version = str(request.scope.get('http_version', ''))
if protocol.lower() == 'http' and http_version:
return f'{protocol.upper()}/{http_version}'
return EMPTY_VALUE
@staticmethod
async def set_body(request: Request, body: bytes) -> None:
async def receive() -> Message:
return {'type': 'http.request', 'body': body}
request._receive = receive
async def get_body(self, request: Request) -> bytes:
body = await request.body()
await self.set_body(request, body)
return body
async def __call__(
self, request: Request, call_next: RequestResponseEndpoint,
*args, **kwargs
):
start_time = time.time()
exception_object = None
# Request Side
try:
raw_request_body = await request.body()
# Последующие действия нужны,
# чтобы не перезатереть тело запроса
# и не уйти в зависание event-loop'a
# при последующем получении тела ответа
await self.set_body(request, raw_request_body)
raw_request_body = await self.get_body(request)
request_body = raw_request_body.decode()
except Exception:
request_body = EMPTY_VALUE
server: tuple = request.get('server', ('localhost', PORT))
request_headers: dict = dict(request.headers.items())
# Response Side
try:
response = await call_next(request)
except Exception as ex:
response_body = bytes(
http.HTTPStatus.INTERNAL_SERVER_ERROR.phrase.encode()
)
response = Response(
content=response_body,
status_code=http.HTTPStatus.INTERNAL_SERVER_ERROR.real,
)
exception_object = ex
response_headers = {}
else:
response_headers = dict(response.headers.items())
response_body = b''
async for chunk in response.body_iterator:
response_body += chunk
response = Response(
content=response_body,
status_code=response.status_code,
headers=dict(response.headers),
media_type=response.media_type
)
duration: int = math.ceil((time.time() - start_time) * 1000)
# Инициализация и формирования полей для запроса-ответа
request_json_fields = RequestJsonLogSchema(
request_uri=str(request.url),
request_referer=request_headers.get('referer', EMPTY_VALUE),
request_protocol=await self.get_protocol(request),
request_method=request.method,
request_path=request.url.path,
request_host=f'{server[0]}:{server[1]}',
request_size=int(request_headers.get('content-length', 0)),
request_content_type=request_headers.get(
'content-type', EMPTY_VALUE),
request_headers=json.dumps(request_headers),
request_body=request_body,
request_direction='in',
remote_ip=request.client[0],
remote_port=request.client[1],
response_status_code=response.status_code,
response_size=int(response_headers.get('content-length', 0)),
response_headers=json.dumps(response_headers),
response_body=response_body.decode(),
duration=duration
).dict()
# Хочется на каждый запрос читать
# и понимать в сообщении самое главное,
# поэтому message мы сразу делаем типовым
message = f'{"Ошибка" if exception_object else "Ответ"} ' \
f'с кодом {response.status_code} ' \
f'на запрос {request.method} \"{str(request.url)}\", ' \
f'за {duration} мс'
logger.info(
message,
extra={
'request_json_fields': request_json_fields,
'to_mask': True,
},
exc_info=exception_object,
)
return response
У меня есть одержимость по поводу асинхронного логирования в FastApi. Методы вывода потока в консоль, запись в файл, logstash - это тот же I/O процесс, как и сгонять по сетке в БД или в другой сервис, согласны? А так как это I/O процесс, то в асинхронном FastApi очень важно не замедлять работу такой простой штукой как логирование. Я нашел очень хорошую библиотеку, которая за одну конфигурацию перестраивает вывод в консоль в асинхронный режим (также можно провернуть и с файлами и отправкой в logstash, и другие ресурсы). Данная библиотека называется: asynclog. Настройка и установка производится очень просто. Не нужно иметь никаких классов и переопределять handler-ы: Нужно просто настроить логирование через словарь в конфигурации проекта и задать handler.
Создаем конфигурацию логера и использовать его везде. Благо, в python есть для этого конфигурация, которую можно воплотить в настройках проекта.
LOG_CONFIG = {
'version': 1,
'disable_existing_loggers': False,
'formatters': {
'json': {
'()': 'test_project.utils.json_logger.JSONLogFormatter',
},
},
'handlers': {
# Используем AsyncLogDispatcher для асинхронного вывода потока.
'json': {
'formatter': 'json',
'class': 'asynclog.AsyncLogDispatcher',
'func': 'test_project.utils.json_logger.write_log',
},
},
'loggers': {
'test_project': {
'handlers': ['json'],
'level': 'DEBUG' if DEBUG else 'INFO',
'propagate': False,
},
'uvicorn': {
'handlers': ['json'],
'level': 'INFO',
'propagate': False,
},
# Не даем стандартному логгеру fastapi работать по пустякам и замедлять работу сервиса
'uvicorn.access': {
'handlers': ['json'],
'level': 'ERROR',
'propagate': False,
},
},
}
Здесь у handler есть особенность в поле func, тут должна быть указана функция вывода потока. Это может быть stdout, print, write file. Что угодно, вплоть до передачи файла по сети в logstash. В нашем случае я решил просто вывести в консоль простейшим способом для примера.
Код test_project.utils.json_logger.py:
def write_log(msg):
print(msg)
Настраиваем приложение на работу с данным middleware, инициализируем logging и его конфигурацию. Далее производим написание обычного route с логгированием сообщения 'Test log'.
import logging
from logging.config import dictConfig
from fastapi import FastAPI
from test_project.utils.middlewares import LoggingMiddleware
dictConfig(LOG_CONFIG)
logger = logging.getLogger(name)
app = FastAPI()
app.middleware('http')(
LoggingMiddleware()
)
router = APIRouter(
tags=['root'],
responses={404: {'description': 'Not found'}},
)
@router.get('/')
async def root(request: Request):
logger.info('Test log')
return {'foo': 'bar'}
app.include_router(router)
Получаем сразу два сообщения в консоли:
Первое: как раз таки тот вывод из Middleware
, который гарантирует отображение полей запроса
{
"thread": 60535,
"level": 20,
"level_name": "Information",
"message": "Ответ с кодом 200 на запрос GET \"http://localhost:8080/\", за 4 мс",
"source": "test_project.utils.middlewares",
"@timestamp": "2021-08-28T01:49:23+03:00",
"app_name": "test_project",
"app_version": "1.0.0",
"app_env": "LOCAL",
"duration": 4,
"request_uri": "http://localhost:8080/",
"request_referer": "",
"request_protocol": "HTTP/1.1",
"request_method": "GET",
"request_path": "/",
"request_host": "127.0.0.1:8080",
"request_size": 0,
"request_content_type": "",
"request_headers": "{\"host\": \"localhost:8080\"}",
"request_body": "",
"request_direction": "in",
"remote_ip": "127.0.0.1",
"remote_port": "50259",
"response_status_code": 200,
"response_size": 13,
"response_headers": "{\"content-length\": \"13\", \"content-type\": \"application/json\"}",
"response_body": "{\"foo\":\"baz\"}"
}
Второе: Наше сообщение 'Test Log'
вне контекста HTTP запроса, а уже в контексте работы внутри route
{
"thread": 60535,
"level": 20,
"level_name": "Information",
"message": "Test log",
"source": "test_project.main.routes",
"@timestamp": "2021-08-28T01:49:23+03:00",
"app_name": "test_project",
"app_version": "1.0.0",
"app_env": "LOCAL",
"duration": 385,
"trace_id": "3bd6f269dbbbd275d46320c7e240bfe5",
"span_id": "2417303bd1af1ee6",
"parent_id": null
}
Бонус. Демонстрация производительности асинхронного вывода.
Почему я так сильно хотел асинхронный вывод журнала? Сейчас будет представлено нагрузочное тестирование на обычном примере одного и того же запроса, с одними и теми же конфигурациями сервера.
Количество воркеров: 10
Количество одновременно-подключенных клиентов: 200
Таймаут: 15 секунд.
Время выполнения теста: 15 секунд (больше и не надо)
Логгирование одного и того же сообщения: test.
Formatter не менял свой код.
Команда запуска нагрузочного тестирования:
wrk -c200 -t1 -d15s http://localhost:8080/users/public/ --timeout 15
Результат нагрузочного тестирования синхронного логгирования:

Результат нагрузочного тестирования асинхронного логгирования:

Как можно заметить, что синхронный вариант вывода дал 2294 rps, а асинхронный вариант вывода выдал целых 8364 rps. Получается, разница производительности целого сложного веб-сервиса увеличилась в 3.64 раза благодаря простому переходу и подходу к логгированию.
Вывод
Никогда не стоит соглашаться на сторонние библиотеки, если чувствуете, что они не могут удовлетворить ваших потребностей. Наследоваться от сторонних библиотек может обернуться плохой шуткой и лучше сразу не полениться и попробовать написать что-то своё. Да, это решение вряд-ли подойдет 99.99% пользователям, есть моменты, которые бы улучшил, оптимизировал и сделал бы более гибкими. Как раз про это я бы хотел почитать в комментариях, обсудить, как можно улучшить данный подход.
Пока что данная реализация находится в закрытой разработке одного из моих проектов, но я обещаю, что дополню эту статью ссылкой на гитхаб, либо подготовлю уже PyPi решение для вас, дорогие читатели данной статьи.
Telegram: @Lev_key