Как стать автором
Обновить

Асинхронный файловый api-сервис

Уровень сложностиПростой
Время на прочтение5 мин
Количество просмотров6.6K

Идея написать данную статью родилась после моего фейла по разработке данного сервиса. Суть задачи была проста — написать сервер с базовыми методами сохранения и отдачи файлов и сервисными методами по специфичной обработке файлов. Обмен данными (тело запроса, возвращаемые данные) я реализовал через json, про асинхрон идею упустил. По началу всё было хорошо, файлы не превышали размер нескольких мегабайтов, методы использовались редко. Но буквально через пару месяцев размеры файлов стали измеряться десятками мегабайт, количество запросов сотни в минуту. Сервис стал тормозить, возникали ошибки совместного доступа к файлам. «Никогда Штирлиц не был так близок к провалу».

В этом кейсе я покажу как я переписал код базовых методов.

В проекте будут использованы библиотеки asyncio, aiohttp для обеспечения асинхронности сервиса.

На этот раз я использовал специальный метод передачи данных порциями по HTTP протоколу — Chunked transfer encoding. Зачем он нужен и как работает подробно описано тут — в англоязычной статье Википедии. Хорошо, что aiohttp поддерживает такой метод из «коробки» — StreamResponse.enable_chunked_encoding.

Ссылка на полный код проекта будет внизу статьи.

Пишем скрипт потокового вызова post-метода сохранения файла

Этот скрипт вспомогательный, его задача — стримить байтовый поток в файловый сервис. Основа скрипта взята из документации aiohttp, в которой показано показано как отправлять большие файлы, не считывая их полностью в память.

Также в заголовок запроса я добавил информацию об имени файла. Для этого создал в заголовке запроса раздел CONTENT‑DISPOSITION.

Итого скрипт выглядит так:

async def file_sender(file_name: str, chunk_size: int) -> Generator[bytes, None, None]:
    """
    Генератор считывания файла по частям
            Параметры:
                    file_name (str): имя файла, включая путь
                    chunk_size (int): размер порции для считывания файла в память
            Возвращаемое значение:
                    chunk (bytes): часть байтового потока файла
    """

    async with aiofiles.open(file_name, 'rb') as f:
        chunk = await f.read(chunk_size)

        while chunk:
            yield chunk
            chunk = await f.read(chunk_size)


async def main() -> None:
    """Функция генерации post-запроса в адрес файлового сервиса"""

    args = get_args()  # код этой функции доступен в репозитории для этой статьт

    url = urljoin(f'{args.protocol}://{args.host}:{args.port}', args.url)
    headers = {
        'CONTENT-DISPOSITION': f'attachment;filename={os.path.basename(args.path)}',
    }

    async with aiohttp.ClientSession() as session:
        async with session.post(
                url,
                headers=headers,
                data=file_sender(file_name=args.path, chunk_size=args.chunk_size)
        ) as resp:
            logger.info(await resp.text())

Разрабатываем post-метод сохранения файла

В первом приближении код хендлера по обработке post‑метода выглядит следующим образом:

from aiofile import async_open

async def save_archive(request: Request) -> web.Response:
    """Хендлер сохранения байтового потока из запроса в файл"""

    async with async_open(file_path, 'bw') as afp:
        # https://docs.aiohttp.org/en/stable/streams.html#asynchronous-iteration-support
        # Выполняет итерацию по блокам данных в порядке их ввода в поток
        async for data in request.content.iter_any():
            await afp.write(data)

    return web.Response(text='file accepted')

Далее я научил хэндлер доставать из заголовка запроса имя файла и сохранять информацию в таблицу базу данных. Таблица имеет два поля — айдишник и имя файла, которое мы получаем из CONTENT‑DISPOSITION.

Итоговый код хендлера:

async def save_file(request: Request) -> web.Response:
    """
    Хендлер сохранения байтового потока из http-запроса в файл
            Параметры:
                    request (aiohttp.web_request): объект http-запроса
            Возвращаемое значение:
                    response (aiohttp.Response): объект ответа
    """

    _, params = cgi.parse_header(request.headers['CONTENT-DISPOSITION'])
    file_name = params['filename']
    file_id = str(uuid.uuid4())
    file_path = os.path.join(app['folder'], file_id)

    async with async_open(file_path, 'bw') as afp:
        # https://docs.aiohttp.org/en/stable/streams.html#asynchronous-iteration-support
        # Выполняет итерацию по блокам данных в порядке их ввода в поток
        async for data in request.content.iter_any():
            await afp.write(data)

    await logger.debug(f'Файл принят {file_name} и записан на диск')

    async with engine.begin() as conn:
        response = await conn.execute(files.insert().values(id=file_id, name=file_name))
        await logger.debug(f'Файл сохранен под id={response.inserted_primary_key[0]}')

    return web.Response(status=201, reason='OK', text=response.inserted_primary_key[0])

Обратите внимание, файл мы сохраняем под именем айдишника. В нашем случае это UUID4. Данный прием позволил мне избежать проблем с сохранением файлов с одинаковым именем.

Для работы с БД я использовал sqlalchemy. Код подключения к БД ниже:

metadata = sqlalchemy.MetaData()
engine = create_async_engine(os.environ["FILE_SERVICE_DATABASE_URL"], echo=True)
files = sqlalchemy.Table(
    "file",
    metadata,
    sqlalchemy.Column("id", sqlalchemy.String(38), primary_key=True),
    sqlalchemy.Column("name", sqlalchemy.String(255)),
)


async def init_db():
    """
    Cоздает таблицу для хранения информации о файлах
            Параметры:
            Возвращаемое значение:
                    None
    """

    async with engine.begin() as conn:
        await conn.run_sync(metadata.create_all)

Разрабатываем get-метод получения файла

У пользователей бывает разная скорость сетевого соединения. Поэтому процесс отдачи файла я делил на части. Это основная идея в интерфейсе файлового сервиса. Все будут в позиции «выиграл — выиграл»: клиент начнет скачивать файл сразу, а нам не придется хранить в памяти сервера файл целиком:

async def get_file(request: Request) -> web.StreamResponse:
    """
    Хендлер формирования архива и скачивания его в файл
            Параметры:
                    request (aiohttp.web_request): объект http-запроса
            Возвращаемое значение:
                    response (aiohttp.StreamResponse): объект ответа в виде байтового потока
    """

    file_id = request.match_info['id']
    folder_path = os.path.join(os.getcwd(), app['folder'])

    if not (os.path.exists(folder_path) and os.path.isdir(folder_path)):
        await logger.warning(f'Запрошена несуществующая папка {folder_path}')
        raise web.HTTPNotFound(text='Архив не существует или был удален')

    async with engine.connect() as conn:
        statement = select(files.c.id, files.c.name).where(files.c.id == file_id)

        file_rows = await conn.execute(statement)
        file = file_rows.fetchone()

        if file is None:
            raise web.HTTPNotFound(text='Файла по указанному id не существует')
        file_path = os.path.join(app['folder'], file_id)

    response = web.StreamResponse(
        status=200,
        reason='OK',
        headers={
            'Content-Type': 'multipart/x-mixed-replace',
            'CONTENT-DISPOSITION': f'attachment;filename={file.name}'
        }
    )

    # Отправляет клиенту HTTP заголовки
    await response.prepare(request)

    try:
        async with async_open(file_path, 'rb') as f:
            chunk = await f.read(app['chunk_size'])

            while chunk:
                await response.write(chunk)
                chunk = await f.read(app['chunk_size'])

    except asyncio.CancelledError:
        await logger.error("Download was interrupted ")

        # отпускаем перехваченный CancelledError
        raise

    return response

Итого

Я создал асинхронный файловый сервис, который помог разгрузить ОЗУ сервера и при этом был способен работать в HL‑режиме, что важно для приложений, которыми пользуется большое количество клиентов.

Репозиторий файлового сервиса для данной статьи доступен по ссылке. В README.md вы найдете инструкцию по запуску проекта.

Фишка данного сервиса заключается в том, что получить файл вы сможете поместив ссылку в адресной строке браузера. Например http://localhost:8080/files/ff6b51b5–4030–48ed‑aa5a-1b8aba37d1c4/. А это значит, что для получения файла из сервиса вам достаточно сформировать урл в шаблоне web‑страницы.

Теги:
Хабы:
Всего голосов 5: ↑5 и ↓0+5
Комментарии16

Публикации

Истории

Работа

Data Scientist
61 вакансия
Python разработчик
138 вакансий

Ближайшие события