Идея написать данную статью родилась после моего фейла по разработке данного сервиса. Суть задачи была проста — написать сервер с базовыми методами сохранения и отдачи файлов и сервисными методами по специфичной обработке файлов. Обмен данными (тело запроса, возвращаемые данные) я реализовал через json, про асинхрон идею упустил. По началу всё было хорошо, файлы не превышали размер нескольких мегабайтов, методы испо��ьзовались редко. Но буквально через пару месяцев размеры файлов стали измеряться десятками мегабайт, количество запросов сотни в минуту. Сервис стал тормозить, возникали ошибки совместного доступа к файлам. «Никогда Штирлиц не был так близок к провалу».
В этом кейсе я покажу как я переписал код базовых методов.
В проекте будут использованы библиотеки asyncio, aiohttp для обеспечения асинхронности сервиса.
На этот раз я использовал специальный метод передачи данных порциями по HTTP протоколу — Chunked transfer encoding. Зачем он нужен и как работает подробно описано тут — в англоязычной статье Википедии. Хорошо, что aiohttp поддерживает такой метод из «коробки» — StreamResponse.enable_chunked_encoding.
Ссылка на полный код проекта будет внизу статьи.
Пишем скрипт потокового вызова post-метода сохранения файла
Этот скрипт вспомогательный, его задача — стримить байтовый поток в файловый сервис. Основа скрипта взята из документации aiohttp, в которой показано показано как отправлять большие файлы, не считывая их полностью в память.
Также в заголовок запроса я добавил информацию об имени файла. Для этого создал в заголовке запроса раздел CONTENT‑DISPOSITION.
Итого скрипт выглядит так:
async def file_sender(file_name: str, chunk_size: int) -> Generator[bytes, None, None]:
"""
Генератор считывания файла по частям
Параметры:
file_name (str): имя файла, включая путь
chunk_size (int): размер порции для считывания файла в память
Возвращаемое значение:
chunk (bytes): часть байтового потока файла
"""
async with aiofiles.open(file_name, 'rb') as f:
chunk = await f.read(chunk_size)
while chunk:
yield chunk
chunk = await f.read(chunk_size)
async def main() -> None:
"""Функция генерации post-запроса в адрес файлового сервиса"""
args = get_args() # код этой функции доступен в репозитории для этой статьт
url = urljoin(f'{args.protocol}://{args.host}:{args.port}', args.url)
headers = {
'CONTENT-DISPOSITION': f'attachment;filename={os.path.basename(args.path)}',
}
async with aiohttp.ClientSession() as session:
async with session.post(
url,
headers=headers,
data=file_sender(file_name=args.path, chunk_size=args.chunk_size)
) as resp:
logger.info(await resp.text())Разрабатываем post-метод сохранения файла
В первом приближении код хендлера по обработке post‑метода выглядит следующим образом:
from aiofile import async_open
async def save_archive(request: Request) -> web.Response:
"""Хендлер сохранения байтового потока из запроса в файл"""
async with async_open(file_path, 'bw') as afp:
# https://docs.aiohttp.org/en/stable/streams.html#asynchronous-iteration-support
# Выполняет итерацию по блокам данных в порядке их ввода в поток
async for data in request.content.iter_any():
await afp.write(data)
return web.Response(text='file accepted')Далее я научил хэндлер доставать из заголовка запроса имя файла и сохранять информацию в таблицу базу данных. Таблица имеет два поля — айдишник и имя файла, которое мы получаем из CONTENT‑DISPOSITION.
Итоговый код хендлера:
async def save_file(request: Request) -> web.Response:
"""
Хендлер сохранения байтового потока из http-запроса в файл
Параметры:
request (aiohttp.web_request): объект http-запроса
Возвращаемое значение:
response (aiohttp.Response): объект ответа
"""
_, params = cgi.parse_header(request.headers['CONTENT-DISPOSITION'])
file_name = params['filename']
file_id = str(uuid.uuid4())
file_path = os.path.join(app['folder'], file_id)
async with async_open(file_path, 'bw') as afp:
# https://docs.aiohttp.org/en/stable/streams.html#asynchronous-iteration-support
# Выполняет итерацию по блокам данных в порядке их ввода в поток
async for data in request.content.iter_any():
await afp.write(data)
await logger.debug(f'Файл принят {file_name} и записан на диск')
async with engine.begin() as conn:
response = await conn.execute(files.insert().values(id=file_id, name=file_name))
await logger.debug(f'Файл сохранен под id={response.inserted_primary_key[0]}')
return web.Response(status=201, reason='OK', text=response.inserted_primary_key[0])Обратите внимание, файл мы сохраняем под именем айдишника. В нашем случае это UUID4. Данный прием позволил мне избежать проблем с сохранением файлов с одинаковым именем.
Для работы с БД я использовал sqlalchemy. Код подключения к БД ниже:
metadata = sqlalchemy.MetaData()
engine = create_async_engine(os.environ["FILE_SERVICE_DATABASE_URL"], echo=True)
files = sqlalchemy.Table(
"file",
metadata,
sqlalchemy.Column("id", sqlalchemy.String(38), primary_key=True),
sqlalchemy.Column("name", sqlalchemy.String(255)),
)
async def init_db():
"""
Cоздает таблицу для хранения информации о файлах
Параметры:
Возвращаемое значение:
None
"""
async with engine.begin() as conn:
await conn.run_sync(metadata.create_all)Разрабатываем get-метод получения файла
У пользователей бывает разная скорость сетевого соединения. Поэтому процесс отдачи файла я делил на части. Это основная идея в интерфейсе файлового сервиса. Все будут в позиции «выиграл — выиграл»: клиент начнет скачивать файл сразу, а нам не придется хранить в памяти сервера файл целиком:
async def get_file(request: Request) -> web.StreamResponse:
"""
Хендлер формирования архива и скачивания его в файл
Параметры:
request (aiohttp.web_request): объект http-запроса
Возвращаемое значение:
response (aiohttp.StreamResponse): объект ответа в виде байтового потока
"""
file_id = request.match_info['id']
folder_path = os.path.join(os.getcwd(), app['folder'])
if not (os.path.exists(folder_path) and os.path.isdir(folder_path)):
await logger.warning(f'Запрошена несуществующая папка {folder_path}')
raise web.HTTPNotFound(text='Архив не существует или был удален')
async with engine.connect() as conn:
statement = select(files.c.id, files.c.name).where(files.c.id == file_id)
file_rows = await conn.execute(statement)
file = file_rows.fetchone()
if file is None:
raise web.HTTPNotFound(text='Файла по указанному id не существует')
file_path = os.path.join(app['folder'], file_id)
response = web.StreamResponse(
status=200,
reason='OK',
headers={
'Content-Type': 'multipart/x-mixed-replace',
'CONTENT-DISPOSITION': f'attachment;filename={file.name}'
}
)
# Отправляет клиенту HTTP заголовки
await response.prepare(request)
try:
async with async_open(file_path, 'rb') as f:
chunk = await f.read(app['chunk_size'])
while chunk:
await response.write(chunk)
chunk = await f.read(app['chunk_size'])
except asyncio.CancelledError:
await logger.error("Download was interrupted ")
# отпускаем перехваченный CancelledError
raise
return responseИтого
Я создал асинхронный файловый сервис, который помог разгрузить ОЗУ сервера и при этом был способен работать в HL‑режиме, что важно для приложений, которыми пользуется большое количество клиентов.
Репозиторий файлового сервиса для данной статьи доступен по ссылке. В README.md вы найдете инструкцию по запуску проекта.
Фишка данного сервиса заключается в том, что получить файл вы сможете поместив ссылку в адресной строке браузера. Например http://localhost:8080/files/ff6b51b5–4030–48ed‑aa5a-1b8aba37d1c4/. А это значит, что для получения файла из сервиса вам достаточно сформировать урл в шаблоне web‑страницы.
