Идея написать данную статью родилась после моего фейла по разработке данного сервиса. Суть задачи была проста — написать сервер с базовыми методами сохранения и отдачи файлов и сервисными методами по специфичной обработке файлов. Обмен данными (тело запроса, возвращаемые данные) я реализовал через json, про асинхрон идею упустил. По началу всё было хорошо, файлы не превышали размер нескольких мегабайтов, методы испо��ьзовались редко. Но буквально через пару месяцев размеры файлов стали измеряться десятками мегабайт, количество запросов сотни в минуту. Сервис стал тормозить, возникали ошибки совместного доступа к файлам. «Никогда Штирлиц не был так близок к провалу».

В этом кейсе я покажу как я переписал код базовых методов.

В проекте будут использованы библиотеки asyncio, aiohttp для обеспечения асинхронности сервиса.

На этот раз я использовал специальный метод передачи данных порциями по HTTP протоколу — Chunked transfer encoding. Зачем он нужен и как работает подробно описано тут — в англоязычной статье Википедии. Хорошо, что aiohttp поддерживает такой метод из «коробки» — StreamResponse.enable_chunked_encoding.

Ссылка на полный код проекта будет внизу статьи.

Пишем скрипт потокового вызова post-метода сохранения файла

Этот скрипт вспомогательный, его задача — стримить байтовый поток в файловый сервис. Основа скрипта взята из документации aiohttp, в которой показано показано как отправлять большие файлы, не считывая их полностью в память.

Также в заголовок запроса я добавил информацию об имени файла. Для этого создал в заголовке запроса раздел CONTENT‑DISPOSITION.

Итого скрипт выглядит так:

async def file_sender(file_name: str, chunk_size: int) -> Generator[bytes, None, None]:
    """
    Генератор считывания файла по частям
            Параметры:
                    file_name (str): имя файла, включая путь
                    chunk_size (int): размер порции для считывания файла в память
            Возвращаемое значение:
                    chunk (bytes): часть байтового потока файла
    """

    async with aiofiles.open(file_name, 'rb') as f:
        chunk = await f.read(chunk_size)

        while chunk:
            yield chunk
            chunk = await f.read(chunk_size)


async def main() -> None:
    """Функция генерации post-запроса в адрес файлового сервиса"""

    args = get_args()  # код этой функции доступен в репозитории для этой статьт

    url = urljoin(f'{args.protocol}://{args.host}:{args.port}', args.url)
    headers = {
        'CONTENT-DISPOSITION': f'attachment;filename={os.path.basename(args.path)}',
    }

    async with aiohttp.ClientSession() as session:
        async with session.post(
                url,
                headers=headers,
                data=file_sender(file_name=args.path, chunk_size=args.chunk_size)
        ) as resp:
            logger.info(await resp.text())

Разрабатываем post-метод сохранения файла

В первом приближении код хендлера по обработке post‑метода выглядит следующим образом:

from aiofile import async_open

async def save_archive(request: Request) -> web.Response:
    """Хендлер сохранения байтового потока из запроса в файл"""

    async with async_open(file_path, 'bw') as afp:
        # https://docs.aiohttp.org/en/stable/streams.html#asynchronous-iteration-support
        # Выполняет итерацию по блокам данных в порядке их ввода в поток
        async for data in request.content.iter_any():
            await afp.write(data)

    return web.Response(text='file accepted')

Далее я научил хэндлер доставать из заголовка запроса имя файла и сохранять информацию в таблицу базу данных. Таблица имеет два поля — айдишник и имя файла, которое мы получаем из CONTENT‑DISPOSITION.

Итоговый код хендлера:

async def save_file(request: Request) -> web.Response:
    """
    Хендлер сохранения байтового потока из http-запроса в файл
            Параметры:
                    request (aiohttp.web_request): объект http-запроса
            Возвращаемое значение:
                    response (aiohttp.Response): объект ответа
    """

    _, params = cgi.parse_header(request.headers['CONTENT-DISPOSITION'])
    file_name = params['filename']
    file_id = str(uuid.uuid4())
    file_path = os.path.join(app['folder'], file_id)

    async with async_open(file_path, 'bw') as afp:
        # https://docs.aiohttp.org/en/stable/streams.html#asynchronous-iteration-support
        # Выполняет итерацию по блокам данных в порядке их ввода в поток
        async for data in request.content.iter_any():
            await afp.write(data)

    await logger.debug(f'Файл принят {file_name} и записан на диск')

    async with engine.begin() as conn:
        response = await conn.execute(files.insert().values(id=file_id, name=file_name))
        await logger.debug(f'Файл сохранен под id={response.inserted_primary_key[0]}')

    return web.Response(status=201, reason='OK', text=response.inserted_primary_key[0])

Обратите внимание, файл мы сохраняем под именем айдишника. В нашем случае это UUID4. Данный прием позволил мне избежать проблем с сохранением файлов с одинаковым именем.

Для работы с БД я использовал sqlalchemy. Код подключения к БД ниже:

metadata = sqlalchemy.MetaData()
engine = create_async_engine(os.environ["FILE_SERVICE_DATABASE_URL"], echo=True)
files = sqlalchemy.Table(
    "file",
    metadata,
    sqlalchemy.Column("id", sqlalchemy.String(38), primary_key=True),
    sqlalchemy.Column("name", sqlalchemy.String(255)),
)


async def init_db():
    """
    Cоздает таблицу для хранения информации о файлах
            Параметры:
            Возвращаемое значение:
                    None
    """

    async with engine.begin() as conn:
        await conn.run_sync(metadata.create_all)

Разрабатываем get-метод получения файла

У пользователей бывает разная скорость сетевого соединения. Поэтому процесс отдачи файла я делил на части. Это основная идея в интерфейсе файлового сервиса. Все будут в позиции «выиграл — выиграл»: клиент начнет скачивать файл сразу, а нам не придется хранить в памяти сервера файл целиком:

async def get_file(request: Request) -> web.StreamResponse:
    """
    Хендлер формирования архива и скачивания его в файл
            Параметры:
                    request (aiohttp.web_request): объект http-запроса
            Возвращаемое значение:
                    response (aiohttp.StreamResponse): объект ответа в виде байтового потока
    """

    file_id = request.match_info['id']
    folder_path = os.path.join(os.getcwd(), app['folder'])

    if not (os.path.exists(folder_path) and os.path.isdir(folder_path)):
        await logger.warning(f'Запрошена несуществующая папка {folder_path}')
        raise web.HTTPNotFound(text='Архив не существует или был удален')

    async with engine.connect() as conn:
        statement = select(files.c.id, files.c.name).where(files.c.id == file_id)

        file_rows = await conn.execute(statement)
        file = file_rows.fetchone()

        if file is None:
            raise web.HTTPNotFound(text='Файла по указанному id не существует')
        file_path = os.path.join(app['folder'], file_id)

    response = web.StreamResponse(
        status=200,
        reason='OK',
        headers={
            'Content-Type': 'multipart/x-mixed-replace',
            'CONTENT-DISPOSITION': f'attachment;filename={file.name}'
        }
    )

    # Отправляет клиенту HTTP заголовки
    await response.prepare(request)

    try:
        async with async_open(file_path, 'rb') as f:
            chunk = await f.read(app['chunk_size'])

            while chunk:
                await response.write(chunk)
                chunk = await f.read(app['chunk_size'])

    except asyncio.CancelledError:
        await logger.error("Download was interrupted ")

        # отпускаем перехваченный CancelledError
        raise

    return response

Итого

Я создал асинхронный файловый сервис, который помог разгрузить ОЗУ сервера и при этом был способен работать в HL‑режиме, что важно для приложений, которыми пользуется большое количество клиентов.

Репозиторий файлового сервиса для данной статьи доступен по ссылке. В README.md вы найдете инструкцию по запуску проекта.

Фишка данного сервиса заключается в том, что получить файл вы сможете поместив ссылку в адресной строке браузера. Например http://localhost:8080/files/ff6b51b5–4030–48ed‑aa5a-1b8aba37d1c4/. А это значит, что для получения файла из сервиса вам достаточно сформировать урл в шаблоне web‑страницы.