company_banner

Практическое руководство по разработке бэкенд-сервиса на Python

  • Tutorial
Привет, меня зовут Александр Васин, я бэкенд-разработчик в Едадиле. Идея этого материала началась с того, что я хотел разобрать вступительное задание (Я.Диск) в Школу бэкенд-разработки Яндекса. Я начал описывать все тонкости выбора тех или иных технологий, методику тестирования… Получался совсем не разбор, а очень подробный гайд по тому, как писать бэкенды на Python. От первоначальной идеи остались только требования к сервису, на примере которых удобно разбирать инструменты и технологии. В итоге я очнулся на сотне тысяч символов. Ровно столько потребовалось, чтобы рассмотреть всё в мельчайших подробностях. Итак, программа на следующие 100 килобайт: как строить бэкенд сервиса, начиная от выбора инструментов и заканчивая деплоем.



TL;DR: Вот репка на GitHub с приложением, а кто любит (настоящие) лонгриды — прошу под кат.

Мы разработаем и протестируем REST API-сервис на Python, упакуем его в легкий Docker-контейнер и развернем с помощью Ansible.

Реализовать REST API-сервис можно по-разному, с помощью разных инструментов. Описанное решение не единственно верное, реализацию и инструменты я выбирал исходя из своего личного опыта и предпочтений.


Что будем делать?


Представим, что интернет-магазин подарков планирует запустить акцию в разных регионах. Чтобы стратегия продаж была эффективной, необходим анализ рынка. У магазина есть поставщик, регулярно присылающий (например, на почту) выгрузки данных с информацией о жителях.

Давайте разработаем REST API-сервис на Python, который будет анализировать предоставленные данные и выявлять спрос на подарки у жителей разных возрастных групп в разных городах по месяцам.

В сервисе реализуем следующие обработчики:

  • POST /imports
    Добавляет новую выгрузку с данными;
  • GET /imports/$import_id/citizens
    Возвращает жителей указанной выгрузки;
  • PATCH /imports/$import_id/citizens/$citizen_id
    Изменяет информацию о жителе (и его родственниках) в указанной выгрузке;
  • GET /imports/$import_id/citizens/birthdays
    Вычисляет число подарков, которое приобретет каждый житель выгрузки своим родственникам (первого порядка), сгруппированное по месяцам;
  • GET /imports/$import_id/towns/stat/percentile/age
    Вычисляет 50-й, 75-й и 99-й перцентили возрастов (полных лет) жителей по городам в указанной выборке.

Какие инструменты выбрать?


Итак, пишем сервис на Python, используя знакомые фреймворки, библиотеки и СУБД.

В 4 лекции видеокурса рассказывается о различных СУБД и их особенностях. Для моей реализации я выбрал СУБД PostgreSQL, зарекомендовавшую себя как надежное решение c отличной документацией на русском языке, сильным русским сообществом (всегда можно найти ответ на вопрос на русском языке) и даже бесплатными курсами. Реляционная модель достаточно универсальна и хорошо понятна многим разработчикам. Хотя то же самое можно было сделать на любой NoSQL СУБД, в этой статье будем рассматривать именно PostgreSQL.

Основная задача сервиса — передача данных по сети между БД и клиентами — не предполагает большой нагрузки на процессор, но требует возможности обрабатывать несколько запросов в один момент времени. В 10 лекции рассматривается асинхронный подход. Он позволяет эффективно обслуживать нескольких клиентов в рамках одного процесса ОС (в отличие, например, от используемой во Flask/Django pre-fork-модели, которая создает несколько процессов для обработки запросов от пользователей, каждый из них потребляет память, но простаивает большую часть времени). Поэтому в качестве библиотеки для написания сервиса я выбрал асинхронный aiohttp.



В 5 лекции видеокурса рассказывается, что SQLAlchemy позволяет декомпозировать сложные запросы на части, переиспользовать их, генерировать запросы с динамическим набором полей (например, PATCH-обработчик позволяет частичное обновление жителя с произвольными полями) и сосредоточиться непосредственно на бизнес-логике. С выполнением этих запросов и передачей данных быстрее всех справится драйвер asyncpg, а подружить их поможет asyncpgsa.

Мой любимый инструмент для управления состоянием БД и работы с миграциями — Alembic. Кстати, я недавно рассказывал о нем на Moscow Python.

Логику валидации получилось лаконично описать схемами Marshmallow (включая проверки на родственные связи). С помощью модуля aiohttp-spec я связал aiohttp-обработчики и схемы для валидации данных, а бонусом получилось сгенерировать документацию в формате Swagger и отобразить ее в графическом интерфейсе.

Для написания тестов я выбрал pytest, подробнее о нем — в 3 лекции.

Для отладки и профилирования этого проекта я использовал отладчик PyCharm (лекция 9).

В 7 лекции рассказывается, как на любом компьютере с Docker (и даже на разных ОС) можно запускать упакованное приложение без необходимости настраивать окружение для запуска и легко устанавливать/обновлять/удалять приложение на сервере.

Для деплоя я выбрал Ansible. Он позволяет декларативно описывать желаемое состояние сервера и его сервисов, работает по ssh и не требует специального софта.

Разработка


Я решил дать Python-пакету название analyzer и использовать следующую структуру:



В файле analyzer/__init__.py я разместил общую информацию о пакете: описание (docstring), версию, лицензию, контакты разработчиков.

Ее можно посмотреть встроенной командой help
$ python
>>> import analyzer
>>> help(analyzer)

Help on package analyzer:

NAME
    analyzer

DESCRIPTION
    Сервис с REST API, анализирующий рынок для промоакций.

PACKAGE CONTENTS
    api (package)
    db (package)
    utils (package)

DATA
    __all__ = ('__author__', '__email__', '__license__', '__maintainer__',...
    __email__ = 'alvassin@yandex.ru'
    __license__ = 'MIT'
    __maintainer__ = 'Alexander Vasin'

VERSION
    0.0.1

AUTHOR
    Alexander Vasin

FILE
    /Users/alvassin/Work/backendschool2019/analyzer/__init__.py

Пакет имеет две входных точки — REST API-сервис (analyzer/api/__main__.py) и утилита управления состоянием БД (analyzer/db/__main__.py). Файлы называются __main__.py неспроста — во-первых, такое название привлекает внимание, по нему понятно, что файл является входной точкой.

Во-вторых, благодаря этому подходу к входным точкам можно обращаться с помощью команды python -m:

# REST API
$ python -m analyzer.api --help

# Утилита управления состоянием БД
$ python -m analyzer.db --help

Почему нужно начать с setup.py?


Забегая вперед, подумаем, как можно распространять приложение: оно может быть упаковано в zip- (а также wheel/egg-) архив, rpm-пакет, pkg-файл для macOS и установлено на удаленный компьютер, в виртуальную машину, MacBook или Docker-контейнер.

Главная цель файла setup.py — описать пакет с приложением для distutils/setuptools.

В файле необходимо указать общую информацию о пакете (название, версию, автора и т. д.), но также в нем можно указать требуемые для работы модули, «экстра»-зависимости (например для тестирования), точки входа (например, исполняемые команды) и требования к интерпретатору.

Плагины setuptools позволяют собирать из описанного пакета артефакт. Есть встроенные плагины: zip, egg, rpm, macOS pkg. Остальные плагины распространяются через PyPI: wheel, xar, pex.

В сухом остатке, описав один файл, мы получаем огромные возможности. Именно поэтому разработку нового проекта нужно начинать с setup.py.

В функции setup() зависимые модули указываются списком:

setup(..., install_requires=["aiohttp", "SQLAlchemy"])

Но я описал зависимости в отдельных файлах requirements.txt и requirements.dev.txt, содержимое которых используется в setup.py. Мне это кажется более гибким, плюс тут есть секрет: впоследствии это позволит собирать Docker-образ быстрее. Зависимости будут ставиться отдельным шагом до установки самого приложения, а при пересборке Docker-контейнера попадать в кеш.

Чтобы setup.py смог прочитать зависимости из файлов requirements.txt и requirements.dev.txt, написана функция:

def load_requirements(fname: str) -> list:
    requirements = []
    with open(fname, 'r') as fp:
        for req in parse_requirements(fp.read()):
            extras = '[{}]'.format(','.join(req.extras)) if req.extras else ''
            requirements.append(
                '{}{}{}'.format(req.name, extras, req.specifier)
            )
    return requirements

Стоит отметить, что setuptools при сборке source distribution по умолчанию включает в сборку только файлы .py, .c, .cpp и .h. Чтобы файлы с зависимостями requirements.txt и requirements.dev.txt попали в пакет, их необходимо явно указать в файле MANIFEST.in.

setup.py целиком
import os
from importlib.machinery import SourceFileLoader

from pkg_resources import parse_requirements
from setuptools import find_packages, setup

module_name = 'analyzer'

# Возможно, модуль еще не установлен (или установлена другая версия), поэтому
# необходимо загружать __init__.py с помощью machinery.
module = SourceFileLoader(
    module_name, os.path.join(module_name, '__init__.py')
).load_module()

def load_requirements(fname: str) -> list:
    requirements = []
    with open(fname, 'r') as fp:
        for req in parse_requirements(fp.read()):
            extras = '[{}]'.format(','.join(req.extras)) if req.extras else ''
            requirements.append(
                '{}{}{}'.format(req.name, extras, req.specifier)
            )
    return requirements

setup(
    name=module_name,
    version=module.__version__,
    author=module.__author__,
    author_email=module.__email__,
    license=module.__license__,
    description=module.__doc__,
    long_description=open('README.rst').read(),
    url='https://github.com/alvassin/backendschool2019',
    platforms='all',
    classifiers=[
        'Intended Audience :: Developers',
        'Natural Language :: Russian',
        'Operating System :: MacOS',
        'Operating System :: POSIX',
        'Programming Language :: Python',
        'Programming Language :: Python :: 3',
        'Programming Language :: Python :: 3.8',
        'Programming Language :: Python :: Implementation :: CPython'
    ],
    python_requires='>=3.8',
    packages=find_packages(exclude=['tests']),
    install_requires=load_requirements('requirements.txt'),
    extras_require={'dev': load_requirements('requirements.dev.txt')},
    entry_points={
        'console_scripts': [
            # f-strings в setup.py не используются из-за соображений
            # совместимости.
            # Несмотря на то, что этот пакет требует Python 3.8, технически
            # source distribution для него может собираться с помощью более
            # ранних версий Python. Не стоит лишать пользователей этой
            # возможности.
            '{0}-api = {0}.api.__main__:main'.format(module_name),
            '{0}-db = {0}.db.__main__:main'.format(module_name)
        ]
    },
    include_package_data=True
)

Установить проект в режиме разработки можно следующей командой (в editable-режиме Python не установит пакет целиком в папку site-packages, а только создаст ссылки, поэтому любые изменения, вносимые в файлы пакета, будут видны сразу):

# Установить пакет с обычными и extra-зависимостями "dev"
pip install -e '.[dev]'

# Установить пакет только с обычными зависимостями
pip install -e .

Как указать версии зависимостей?


Здорово, когда разработчики активно занимаются своими пакетами — в них активнее исправляются ошибки, появляется новая функциональность и можно быстрее получить обратную связь. Но иногда изменения в зависимых библиотеках не имеют обратной совместимости и могут привести к ошибкам в вашем приложении, если не подумать об этом заранее.

Для каждого зависимого пакета можно указать определенную версию, например aiohttp==3.6.2. Тогда приложение будет гарантированно собираться именно с теми версиями зависимых библиотек, с которыми оно было протестировано. Но у этого подхода есть и недостаток — если разработчики исправят критичный баг в зависимом пакете, не влияющий на обратную совместимость, в приложение это исправление не попадет.

Существует подход к версионированию Semantic Versioning, который предлагает представлять версию в формате MAJOR.MINOR.PATCH:

  • MAJOR — увеличивается при добавлении обратно несовместимых изменений;
  • MINOR — увеличивается при добавлении новой функциональности с поддержкой обратной совместимости;
  • PATCH — увеличивается при добавлении исправлений багов с поддержкой обратной совместимости.

Если зависимый пакет следует этому подходу (о чем авторы обычно сообщают в файлах README или CHANGELOG), то достаточно зафиксировать значения MAJOR, MINOR и ограничить минимальное значение для PATCH-версии: >= MAJOR.MINOR.PATCH, == MAJOR.MINOR.*.

Такое требование можно реализовать с помощью оператора ~=. Например, aiohttp~=3.6.2 позволит PIP установить для aiohttp версию 3.6.3, но не 3.7.

Если указать интервал версий зависимостей, это даст еще одно преимущество — не будет конфликтов версий между зависимыми библиотеками.

Если вы разрабатываете библиотеку, которая требует другой пакет-зависимость, то разрешите для него не одну определенную версию, а интервал. Тогда потребителям вашей библиотеки будет намного легче ее использовать (вдруг их приложение требует этот же пакет-зависимость, но уже другой версии).

Semantic Versioning — лишь соглашение между авторами и потребителями пакетов. Оно не гарантирует, что авторы пишут код без багов и не могут допустить ошибку в новой версии своего пакета.

База данных


Проектируем схему


В описании обработчика POST /imports приведен пример выгрузки с информацией о жителях:

Пример выгрузки
{
  "citizens": [
    {
      "citizen_id": 1,
      "town": "Москва",
      "street": "Льва Толстого",
      "building": "16к7стр5",
      "apartment": 7,
      "name": "Иванов Иван Иванович",
      "birth_date": "26.12.1986",
      "gender": "male",
      "relatives": [2]
    },
    {
      "citizen_id": 2,
      "town": "Москва",
      "street": "Льва Толстого",
      "building": "16к7стр5",
      "apartment": 7,
      "name": "Иванов Сергей Иванович",
      "birth_date": "01.04.1997",
      "gender": "male",
      "relatives": [1]
    },
    {
      "citizen_id": 3,
      "town": "Керчь",
      "street": "Иосифа Бродского",
      "building": "2",
      "apartment": 11,
      "name": "Романова Мария Леонидовна",
      "birth_date": "23.11.1986",
      "gender": "female",
      "relatives": []
    },
    ...
  ]
}

Первой мыслью было хранить всю информацию о жителе в одной таблице citizens, где родственные связи были бы представлены полем relatives в виде списка целых чисел.

Но у этого способа есть ряд недостатков
  1. В обработчике GET /imports/$import_id/citizens/birthdays для получения месяцев, на которые приходятся дни рождения родственников, потребуется выполнить слияние таблицы citizens с самой собой. Для этого будет необходимо развернуть список с идентификаторами родственников relatives с помощью фунции UNNEST.

    Такой запрос будет выполняться сравнительно медленно, и обработчик не уложится в 10-секундный таймаут:
    SELECT 
        relations.citizen_id, 
        relations.relative_id, 
        date_part('month', relatives.birth_date) as relative_birth_month
    FROM (
    	SELECT
            citizens.import_id, 
            citizens.citizen_id,
            UNNEST(citizens.relatives) as relative_id
    	FROM citizens
        WHERE import_id = 1
    ) as relations
    INNER JOIN citizens as relatives ON
        relations.import_id = relatives.import_id AND
        relations.relative_id = relatives.citizen_id
    

  2. В таком подходе целостность данных в поле relatives не обеспечивается PostgreSQL, а контролируется приложением: технически в список relatives можно добавить любое целое число, в том числе идентификатор несуществующего жителя. Ошибка в коде или человеческий фактор (редактирование записей напрямую в БД администратором) обязательно рано или поздно приведут к несогласованному состоянию данных.

Далее, я решил привести все требуемые для работы данные к третьей нормальной форме, и получилась следующая структура:



  1. Таблица imports состоит из автоматически инкрементируемого столбца import_id. Он нужен для создания проверки по внешнему ключу в таблице citizens.
  2. В таблице citizens хранятся скалярные данные о жителе (все поля за исключением информации о родственных связях).

    В качестве первичного ключа используется пара (import_id, citizen_id), гарантирующая уникальность жителей citizen_id в рамках import_id.

    Внешний ключ citizens.import_id -> imports.import_id гарантирует, что поле citizens.import_id будет содержать только существующие выгрузки.
  3. Таблица relations содержит информацию о родственных связях.

    Одна родственная связь представлена двумя записями (от жителя к родственнику и обратно): эта избыточность позволяет использовать более простое условие при слиянии таблиц citizens и relations и получать информацию более эффективно.
    Первичный ключ состоит из столбцов (import_id, citizen_id, relative_id) и гарантирует, что в рамках одной выгрузки import_id у жителя citizen_id будут родственники c уникальными relative_id.

    Также в таблице используются два составных внешних ключа: (relations.import_id, relations.citizen_id) -> (citizens.import_id, citizens.citizen_id) и (relations.import_id, relations.relative_id) -> (citizens.import_id, citizens.citizen_id), гарантирующие, что в таблице будут указаны существующие житель citizen_id и родственник relative_id из одной выгрузки.

Такая структура обеспечивает целостность данных средствами PostgreSQL, позволяет эффективно получать жителей с родственниками из базы данных, но подвержена состоянию гонки во время обновления информации о жителях конкурентными запросами (подробнее рассмотрим при реализации обработчика PATCH).

Описываем схему в SQLAlchemy


В лекции 5 я рассказывал, что для создания запросов с помощью SQLAlchemy необходимо описать схему базы данных с помощью специальных объектов: таблицы описываются с помощью sqlalchemy.Table и привязываются к реестру sqlalchemy.MetaData, который хранит всю метаинформацию о базе данных. К слову, реестр MetaData способен не только хранить описанную в Python метаинформацию, но и представлять реальное состояние базы данных в виде объектов SQLAlchemy.

Эта возможность в том числе позволяет Alembic сравнивать состояния и генерировать код миграций автоматически.

Кстати, у каждой базы данных своя схема именования constraints по умолчанию. Чтобы вы не тратили время на именование новых constraints или на воспоминания/поиски того, как назван constraint, который вы собираетесь удалить, SQLAlchemy предлагает использовать шаблоны именования naming conventions. Их можно определить в реестре MetaData.

Создаем реестр MetaData и передаем в него шаблоны именования
# analyzer/db/schema.py
from sqlalchemy import MetaData

convention = {
    'all_column_names': lambda constraint, table: '_'.join([
        column.name for column in constraint.columns.values()
    ]),

    # Именование индексов
    'ix': 'ix__%(table_name)s__%(all_column_names)s',

    # Именование уникальных индексов
    'uq': 'uq__%(table_name)s__%(all_column_names)s',

    # Именование CHECK-constraint-ов
    'ck': 'ck__%(table_name)s__%(constraint_name)s',

    # Именование внешних ключей
    'fk': 'fk__%(table_name)s__%(all_column_names)s__%(referred_table_name)s',

    # Именование первичных ключей
    'pk': 'pk__%(table_name)s'
}
metadata = MetaData(naming_convention=convention)

Если указать шаблоны именования, Alembic воспользуется ими во время автоматической генерации миграций и будет называть все constraints в соответствии с ними. В дальнейшем cозданный реестр MetaData потребуется для описания таблиц:

Описываем схему базы данных объектами SQLAlchemy
# analyzer/db/schema.py
from enum import Enum, unique

from sqlalchemy import (
    Column, Date, Enum as PgEnum, ForeignKey, ForeignKeyConstraint, Integer,
    String, Table
)


@unique
class Gender(Enum):
    female = 'female'
    male = 'male'


imports_table = Table(
    'imports',
    metadata,
    Column('import_id', Integer, primary_key=True)
)

citizens_table = Table(
    'citizens',
    metadata,
    Column('import_id', Integer, ForeignKey('imports.import_id'),
           primary_key=True),
    Column('citizen_id', Integer, primary_key=True),
    Column('town', String, nullable=False, index=True),
    Column('street', String, nullable=False),
    Column('building', String, nullable=False),
    Column('apartment', Integer, nullable=False),
    Column('name', String, nullable=False),
    Column('birth_date', Date, nullable=False),
    Column('gender', PgEnum(Gender, name='gender'), nullable=False),
)

relations_table = Table(
    'relations',
    metadata,
    Column('import_id', Integer, primary_key=True),
    Column('citizen_id', Integer, primary_key=True),
    Column('relative_id', Integer, primary_key=True),
    ForeignKeyConstraint(
        ('import_id', 'citizen_id'),
        ('citizens.import_id', 'citizens.citizen_id')
    ),
    ForeignKeyConstraint(
        ('import_id', 'relative_id'),
        ('citizens.import_id', 'citizens.citizen_id')
    ),
)

Настраиваем Alembic


Когда схема базы данных описана, необходимо сгенерировать миграции, но для этого сначала нужно настроить Alembic, об этом тоже рассказывается в лекции 5.

Чтобы воспользоваться командой alembic, необходимо выполнить следующие шаги:

  1. Установить пакет: pip install alembic
  2. Инициализировать Alembic: cd analyzer && alembic init db/alembic.

    Эта команда создаст файл конфигурации analyzer/alembic.ini и папку analyzer/db/alembic со следующим содержимым:
    • env.py — вызывается каждый раз при запуске Alembic. Подключает в Alembic реестр sqlalchemy.MetaData с описанием желаемого состояния БД и содержит инструкции по запуску миграций.
    • script.py.mako — шаблон, на основе которого генерируются миграции.
    • versions — папка, в которой Alembic будет искать (и генерировать) миграции.
  3. Указать адрес базы данных в файле alembic.ini:

    ; analyzer/alembic.ini
    [alembic] 
    sqlalchemy.url = postgresql://user:hackme@localhost/analyzer
  4. Указать описание желаемого состояния базы данных (реестр sqlalchemy.MetaData), чтобы Alembic мог генерировать миграции автоматически:

    # analyzer/db/alembic/env.py
    from analyzer.db import schema
    target_metadata = schema.metadata

Alembic настроен и им уже можно пользоваться, но в нашем случае такая конфигурация имеет ряд недостатков:

  1. Утилита alembic ищет alembic.ini в текущей рабочей директории. Путь к alembic.ini можно указать аргументом командной строки, но это неудобно: хочется иметь возможность вызывать команду из любой папки без дополнительных параметров.
  2. Чтобы настроить Alembic на работу с определенной базой данных, требуется менять файл alembic.ini. Гораздо удобнее было бы указать настройки БД переменной окружения и/или аргументом командной строки, например --pg-url.
  3. Название утилиты alembic не очень хорошо коррелирует с названием нашего сервиса (а пользователь фактически может вообще не владеть Python и ничего не знать об Alembic). Конечному пользователю было бы намного удобнее, если бы все исполняемые команды сервиса имели общий префикс, например analyzer-*.

Эти проблемы решаются с помощью небольшой обертки analyzer/db/__main__.py:

  • Для обработки аргументов командной строки Alembic использует стандартный модуль argparse. Он позволяет добавить необязательный аргумент --pg-url со значением по умолчанию из переменной окружения ANALYZER_PG_URL.

    Код
    import os
    from alembic.config import CommandLine, Config
    from analyzer.utils.pg import DEFAULT_PG_URL
    
    
    def main():
        alembic = CommandLine()
        alembic.parser.add_argument(
            '--pg-url', default=os.getenv('ANALYZER_PG_URL', DEFAULT_PG_URL),
            help='Database URL [env var: ANALYZER_PG_URL]'
        )
        options = alembic.parser.parse_args()
    
        # Создаем объект конфигурации Alembic
        config = Config(file_=options.config, ini_section=options.name,
                        cmd_opts=options)
    
        # Меняем значение sqlalchemy.url из конфига Alembic
        config.set_main_option('sqlalchemy.url', options.pg_url)
    
        # Запускаем команду alembic
        exit(alembic.run_cmd(config, options))
    
    
    if __name__ == '__main__':
        main()
  • Путь до файла alembic.ini можно рассчитывать относительно расположения исполняемого файла, а не текущей рабочей директории пользователя.

    Код
    import os
    from alembic.config import CommandLine, Config
    from pathlib import Path
    
    
    PROJECT_PATH = Path(__file__).parent.parent.resolve()
    
    
    def main():
        alembic = CommandLine()
        options = alembic.parser.parse_args()
    
        # Если указан относительный путь (alembic.ini), добавляем в начало
        # абсолютный путь до приложения
        if not os.path.isabs(options.config):
            options.config = os.path.join(PROJECT_PATH, options.config)
    
        # Создаем объект конфигурации Alembic
        config = Config(file_=options.config, ini_section=options.name,
                        cmd_opts=options)
    
        # Подменяем путь до папки с alembic на абсолютный (требуется, чтобы alembic
        # мог найти env.py, шаблон для генерации миграций и сами миграции)
        alembic_location = config.get_main_option('script_location')
        if not os.path.isabs(alembic_location):
            config.set_main_option('script_location',
                                   os.path.join(PROJECT_PATH, alembic_location))
    
        # Запускаем команду alembic
        exit(alembic.run_cmd(config, options))
    
    
    if __name__ == '__main__':
        main()

Когда утилита для управления состоянием БД готова, ее можно зарегистрировать в setup.py как исполняемую команду с понятным конечному пользователю названием, например analyzer-db:

Регистрация исполняемой команды в setup.py
from setuptools import setup

setup(..., entry_points={
    'console_scripts': [
        'analyzer-db = analyzer.db.__main__:main'
    ]
})

После переустановки модуля будет сгенерирован файл env/bin/analyzer-db и команда analyzer-db станет доступной:

$ pip install -e '.[dev]'

Генерируем миграции


Чтобы сгенерировать миграции, требуется два состояния: желаемое (которое мы описали объектами SQLAlchemy) и реальное (база данных, в нашем случае пустая).

Я решил, что проще всего поднять Postgres с помощью Docker и для удобства добавил команду make postgres, запускающую в фоновом режиме контейнер с PostgreSQL на 5432 порту:

Поднимаем PostgreSQL и генерируем миграцию
$ make postgres
...
$ analyzer-db revision --message="Initial" --autogenerate
INFO  [alembic.runtime.migration] Context impl PostgresqlImpl.
INFO  [alembic.runtime.migration] Will assume transactional DDL.
INFO  [alembic.autogenerate.compare] Detected added table 'imports'
INFO  [alembic.autogenerate.compare] Detected added table 'citizens'
INFO  [alembic.autogenerate.compare] Detected added index 'ix__citizens__town' on '['town']'
INFO  [alembic.autogenerate.compare] Detected added table 'relations'
  Generating /Users/alvassin/Work/backendschool2019/analyzer/db/alembic/versions/d5f704ed4610_initial.py ...  done

Alembic в целом хорошо справляется с рутинной работой генерации миграций, но я хотел бы обратить внимание на следующее:

  • Пользовательские типы данных, указанные в создаваемых таблицах, создаются автоматически (в нашем случае — gender), но код для их удаления в downgrade не генерируется. Если применить, откатить и потом еще раз применить миграцию, это вызовет ошибку, так как указанный тип данных уже существует.

    Удаляем тип данных gender в методе downgrade
    from alembic import op
    from sqlalchemy import Column, Enum
    
    GenderType = Enum('female', 'male', name='gender')
    
    
    def upgrade():
        ...
        # При создании таблицы тип данных GenderType будет создан автоматически
        op.create_table('citizens', ...,
                        Column('gender', GenderType, nullable=False))
        ...
    
    
    def downgrade():
        op.drop_table('citizens')
    
        # После удаления таблицы тип данных необходимо удалить
        GenderType.drop(op.get_bind())
  • В методе downgrade некоторые действия иногда можно убрать (если мы удаляем таблицу целиком, можно не удалять ее индексы отдельно):

    Например
    def downgrade():
    op.drop_table('relations')
    
    # Следующим шагом мы удаляем таблицу citizens, индекс будет удален автоматически
    # эту строчку можно удалить
    op.drop_index(op.f('ix__citizens__town'), table_name='citizens')
    op.drop_table('citizens')
    op.drop_table('imports')

Когда миграция исправлена и готова, применим ее:

$ analyzer-db upgrade head
INFO  [alembic.runtime.migration] Context impl PostgresqlImpl.
INFO  [alembic.runtime.migration] Will assume transactional DDL.
INFO  [alembic.runtime.migration] Running upgrade  -> d5f704ed4610, Initial

Приложение


Прежде чем приступить к созданию обработчиков, необходимо сконфигурировать приложение aiohttp.

Если посмотреть aiohttp quickstart, можно написать приблизительно такой код
import logging

from aiohttp import web


def main():
    # Настраиваем логирование
    logging.basicConfig(level=logging.DEBUG)

    # Создаем приложение
    app = web.Application()

    # Регистрируем обработчики
    app.router.add_route(...)

    # Запускаем приложение
    web.run_app(app)

Этот код вызывает ряд вопросов и имеет ряд недостатков:

  • Как конфигурировать приложение? Как минимум, необходимо указать хост и порт для подключения клиентов, а также информацию для подключения к базе данных.

    Мне очень нравится решать эту задачу с помощью модуля ConfigArgParse: он расширяет стандартный argparse и позволяет использовать для конфигурации аргументы командной строки, переменные окружения (незаменимые для конфигурации Docker-контейнеров) и даже файлы конфигурации (а также совмещать эти способы). C помощью ConfigArgParse также можно валидировать значения параметров конфигурации приложения.

    Пример обработки параметров с помощью ConfigArgParse
    from aiohttp import web
    from configargparse import ArgumentParser, ArgumentDefaultsHelpFormatter
    
    from analyzer.utils.argparse import positive_int
    
    parser = ArgumentParser(
        # Парсер будет искать переменные окружения с префиксом ANALYZER_,
        # например ANALYZER_API_ADDRESS и ANALYZER_API_PORT
        auto_env_var_prefix='ANALYZER_',
    
        # Покажет значения параметров по умолчанию
        formatter_class=ArgumentDefaultsHelpFormatter
    )
    
    parser.add_argument('--api-address', default='0.0.0.0',
                        help='IPv4/IPv6 address API server would listen on')
    
    # Разрешает только целые числа больше нуля
    parser.add_argument('--api-port', type=positive_int, default=8081,
                        help='TCP port API server would listen on')
    
    
    def main():
        # Получаем параметры конфигурации, которые можно передать как аргументами
        # командной строки, так и переменными окружения
        args = parser.parse_args()
    
        # Запускаем приложение на указанном порту и адресе
        app = web.Application()
        web.run_app(app, host=args.api_address, port=args.api_port)
    
    
    if __name__ == '__main__':
        main()

    Кстати, ConfigArgParse, как и argparse, умеет генерировать подсказку по запуску команды с описанием всех аргументов (необходимо позвать команду с аргументом -h или --help). Это невероятно облегчает жизнь пользователям вашего ПО:

    Например
    $ python __main__.py --help
    usage: __main__.py [-h] [--api-address API_ADDRESS] [--api-port API_PORT]
    
    If an arg is specified in more than one place, then commandline values override environment variables which override defaults.
    
    optional arguments:
      -h, --help            show this help message and exit
      --api-address API_ADDRESS
                            IPv4/IPv6 address API server would listen on [env var: ANALYZER_API_ADDRESS] (default: 0.0.0.0)
      --api-port API_PORT   TCP port API server would listen on [env var: ANALYZER_API_PORT] (default: 8081)
  • После получения переменные окружения больше не нужны и даже могут представлять опасность — например, они могут случайно «утечь» с отображением информации об ошибке. Злоумышленники в первую очередь будут пытаться получить информацию об окружении, поэтому очистка переменных окружения считается хорошим тоном.

    Можно было бы воспользоваться os.environ.clear(), но Python позволяет управлять поведением модулей стандартной библиотеки с помощью многочисленных переменных окружения (например, вдруг потребуется включить режим отладки asyncio?), поэтому разумнее очищать переменные окружения по префиксу приложения, указанного в ConfigArgParser.

    Пример
    import os
    from typing import Callable
    from configargparse import ArgumentParser
    from yarl import URL
    
    from analyzer.api.app import create_app
    from analyzer.utils.pg import DEFAULT_PG_URL
    
    ENV_VAR_PREFIX = 'ANALYZER_'
    
    parser = ArgumentParser(auto_env_var_prefix=ENV_VAR_PREFIX)
    parser.add_argument('--pg-url', type=URL, default=URL(DEFAULT_PG_URL),
                       help='URL to use to connect to the database')
    
    
    def clear_environ(rule: Callable):
        """
        Очищает переменные окружения, переменные для очистки определяет переданная
        функция rule
        """
        # Ключи из os.environ копируются в новый tuple, чтобы не менять объект
        # os.environ во время итерации
        for name in filter(rule, tuple(os.environ)):
            os.environ.pop(name)
    
    
    def main():
        # Получаем аргументы
        args = parser.parse_args()
    
        # Очищаем переменные окружения по префиксу ANALYZER_
        clear_environ(lambda i: i.startswith(ENV_VAR_PREFIX))
    
        # Запускаем приложение
        app = create_app(args)
        ...
    
    
    if __name__ == '__main__':
        main()
  • Запись логов в stderr/файл в основном потоке блокирует цикл событий.

    В лекции 9 рассказывается, что по умолчанию logging.basicConfig() настраивает запись логов в stderr.

    Чтобы логирование не мешало эффективной работе асинхронного приложения, необходимо выполнять запись логов в отдельном потоке. Для этого можно воспользоваться готовым методом из модуля aiomisc.

    Настраиваем логирование с помощью aiomisc
    import logging
    
    from aiomisc.log import basic_config
    
    basic_config(logging.DEBUG, buffered=True)    
    
  • Как масштабировать приложение, если одного процесса станет недостаточно для обслуживания входящего трафика? Можно сначала аллоцировать сокет, затем с помощью fork создать несколько новых отдельных процессов, и соединения на сокете будут распределяться между ними механизмами ядра (конечно, под Windows это не работает).

    Пример
    import os
    from sys import argv
    
    import forklib
    from aiohttp.web import Application, run_app
    from aiomisc import bind_socket
    from setproctitle import setproctitle
    
    
    def main():
        sock = bind_socket(address='0.0.0.0', port=8081, proto_name='http')
        setproctitle(f'[Master] {os.path.basename(argv[0])}')
    
        def worker():
            setproctitle(f'[Worker] {os.path.basename(argv[0])}')
            app = Application()
            run_app(app, sock=sock)
    
        forklib.fork(os.cpu_count(), worker, auto_restart=True)
    
    
    if __name__ == '__main__':
        main()
    
  • Требуется ли приложению обращаться или аллоцировать какие-либо ресурсы во время работы? Если нет, по соображениям безопасности все ресурсы (в нашем случае — сокет для подключения клиентов) можно аллоцировать на старте, а затем сменить пользователя на nobody. Он обладает ограниченным набором привиллегий — это здорово усложнит жизнь злоумышленникам.

    Пример
    import os
    import pwd
    
    from aiohttp.web import run_app
    from aiomisc import bind_socket
    
    from analyzer.api.app import create_app
    
    
    def main():
        # Аллоцируем сокет
        sock = bind_socket(address='0.0.0.0', port=8085, proto_name='http')
    
        user = pwd.getpwnam('nobody')
        os.setgid(user.pw_gid)
        os.setuid(user.pw_uid)
    
        app = create_app(...)
        run_app(app, sock=sock)
    
    
    if __name__ == '__main__':
        main()
  • В конце концов я решил вынести создание приложения в отдельную параметризуемую функцию create_app, чтобы можно было легко создавать идентичные приложения для тестирования.

Сериализация данных


Все успешные ответы обработчиков будем возвращать в формате JSON. Информацию об ошибках клиентам тоже было бы удобно получать в сериализованном виде (например, чтобы увидеть, какие поля не прошли валидацию).

Документация aiohttp предлагает метод json_response, который принимает объект, сериализует его в JSON и возвращает новый объект aiohttp.web.Response с заголовком Content-Type: application/json и сериализованными данными внутри.

Как сериализовать данные с помощью json_response
from aiohttp.web import Application, View, run_app
from aiohttp.web_response import json_response


class SomeView(View):
    async def get(self):
        return json_response({'hello': 'world'})


app = Application()
app.router.add_route('*', '/hello', SomeView)
run_app(app)

Но существует и другой способ: aiohttp позволяет зарегистрировать произвольный сериализатор для определенного типа данных ответа в реестре aiohttp.PAYLOAD_REGISTRY. Например, можно указать сериализатор aiohttp.JsonPayload для объектов типа Mapping.

В этом случае обработчику будет достаточно вернуть объект Response с данными ответа в параметре body. aiohttp найдет сериализатор, соответствующий типу данных и сериализует ответ.

Помимо того, что сериализация объектов описана в одном месте, этот подход еще и более гибкий — он позволяет реализовывать очень интересные решения (мы рассмотрим один из вариантов использования в обработчике GET /imports/$import_id/citizens).

Как сериализовать данные с помощью aiohttp.PAYLOAD_REGISTRY
from types import MappingProxyType
from typing import Mapping

from aiohttp import PAYLOAD_REGISTRY, JsonPayload
from aiohttp.web import run_app, Application, Response, View

PAYLOAD_REGISTRY.register(JsonPayload, (Mapping, MappingProxyType))


class SomeView(View):
    async def get(self):
        return Response(body={'hello': 'world'})


app = Application()
app.router.add_route('*', '/hello', SomeView)
run_app(app)

Важно понимать, что метод json_response, как и aiohttp.JsonPayload, используют стандартный json.dumps, который не умеет сериализовать сложные типы данных, например datetime.date или asyncpg.Record (asyncpg возвращает записи из БД в виде экземпляров этого класса). Более того, одни сложные объекты могут содержать другие: в одной записи из БД может быть поле типа datetime.date.

Разработчики Python предусмотрели эту проблему: метод json.dumps позволяет с помощью аргумента default указать функцию, которая вызывается, когда необходимо сериализовать незнакомый объект. Ожидается, что функция приведет незнакомый объект к типу, который умеет сериализовать модуль json.

Как расширить JsonPayload для сериализации произвольных объектов
import json
from datetime import date
from functools import partial, singledispatch
from typing import Any

from aiohttp.payload import JsonPayload as BaseJsonPayload
from aiohttp.typedefs import JSONEncoder

@singledispatch
def convert(value):
    raise NotImplementedError(f'Unserializable value: {value!r}')


@convert.register(Record)
def convert_asyncpg_record(value: Record):
    """
    Позволяет автоматически сериализовать результаты запроса, возвращаемые
    asyncpg
    """
    return dict(value)


@convert.register(date)
def convert_date(value: date):
    """
    В проекте объект date возвращается только в одном случае — если необходимо
    отобразить дату рождения. Для отображения даты рождения должен
    использоваться формат ДД.ММ.ГГГГ
    """
    return value.strftime('%d.%m.%Y')
    
 
dumps = partial(json.dumps, default=convert)


class JsonPayload(BaseJsonPayload):
    def __init__(self,
                 value: Any,
                 encoding: str = 'utf-8',
                 content_type: str = 'application/json',
                 dumps: JSONEncoder = dumps,
                 *args: Any,
                 **kwargs: Any) -> None:
        super().__init__(value, encoding, content_type, dumps, *args, **kwargs)

Обработчики


aiohttp позволяет реализовать обработчики асинхронными функциями и классами. Классы более расширяемы: во-первых, код, относящийся к одному обработчику, можно разместить в одном месте, а во вторых, классы позволяют использовать наследование для избавления от дублирования кода (например, каждому обработчику требуется соединение с базой данных).

Базовый класс обработчика
from aiohttp.web_urldispatcher import View
from asyncpgsa import PG


class BaseView(View):
    URL_PATH: str

    @property
    def pg(self) -> PG:
        return self.request.app['pg']

Так как один большой файл читать сложно, я решил разнести обработчики по файлам. Маленькие файлы поощряют слабую связность, а если, например, есть кольцевые импорты внутри хэндлеров — значит, возможно, что-то не так с композицией сущностей.

POST /imports


На вход обработчик получает json с данными о жителях. Максимально допустимый размер запроса в aiohttp регулируется опцией client_max_size и по умолчанию равен 2 МБ. При превышении лимита aiohttp вернет HTTP-ответ со статусом 413: Request Entity Too Large Error.

В то же время корректный json c максимально длинными строчками и цифрами будет весить ~63 мегабайта, поэтому ограничения на размер запроса необходимо расширить.

Далее, необходимо проверить и десериализовать данные. Если они некорректные, нужно вернуть HTTP-ответ 400: Bad Request.

Мне потребовались две схемы Marhsmallow. Первая, CitizenSchema, проверяет данные каждого отдельного жителя, а также десериализует строку с днем рождения в объект datetime.date:

  • Тип данных, формат и наличие всех обязательных полей;
  • Отсутствие незнакомых полей;
  • Дата рождения должна быть указана в формате DD.MM.YYYY и не может иметь значения из будущего;
  • Список родственников каждого жителя должен содержать уникальные существующие в этой выгрузке идентификаторы жителей.

Вторая схема, ImportSchema, проверяет выгрузку в целом:

  • citizen_id каждого жителя в рамках выгрузки должен быть уникален;
  • Родственные связи должны быть двусторонними (если у жителя #1 в списке родственников указан житель #2, то и у жителя #2 должен быть родственник #1).

Если данные корректные, их необходимо добавить в БД с новым уникальным import_id.
Для добавления данных потребуется выполнить несколько запросов в разные таблицы. Чтобы в БД не осталось частично добавленных данных в случае возникновения ошибки или исключения (например, при отключении клиента, который не получил ответ полностью, aiohttp бросит исколючение CancelledError), необходимо использовать транзакцию.

Добавлять данные в таблицы необходимо частями, так как в одном запросе к PostgreSQL может быть не более 32 767 аргументов. В таблице citizens 9 полей. Соответственно, за 1 запрос в эту таблицу можно вставить только 32 767 / 9 = 3640 строк, а в одной выгрузке может быть до 10 000 жителей.

GET /imports/$import_id/citizens


Обработчик возвращает всех жителей для выгрузки с указанным import_id. Если указанная выгрузка не существует, необходимо вернуть HTTP-ответ 404: Not Found. Это поведение выглядит общим для обработчиков, которым требуется существующая выгрузка, поэтому я вынес код проверки в отдельный класс.

Базовый класс для обработчиков с выгрузками
from aiohttp.web_exceptions import HTTPNotFound
from sqlalchemy import select, exists

from analyzer.db.schema import imports_table


class BaseImportView(BaseView):
    @property
    def import_id(self):
        return int(self.request.match_info.get('import_id'))

    async def check_import_exists(self):
        query = select([
            exists().where(imports_table.c.import_id == self.import_id)
        ])
        if not await self.pg.fetchval(query):
            raise HTTPNotFound()

Чтобы получить список родственников для каждого жителя, потребуется выполнить LEFT JOIN из таблицы citizens в таблицу relations, агрегируя поле relations.relative_id с группировкой по import_id и citizen_id.

Если у жителя нет родственников, то LEFT JOIN вернет для него в поле relations.relative_id значение NULL и в результате агрегации список родственников будет выглядеть как [NULL].

Чтобы исправить это некорректное значение, я воспользовался функцией array_remove.

БД хранит дату в формате YYYY-MM-DD, а нам нужен формат DD.MM.YYYY.

Технически форматировать дату можно либо SQL-запросом, либо на стороне Python в момент сериализации ответа с json.dumps (asyncpg возвращает значение поля birth_date как экземпляр класса datetime.date).

Я выбрал сериализацию на стороне Python, учитывая, что birth_date — единственный объект datetime.date в проекте с единым форматом (см. раздел «Сериализация данных»).

Несмотря на то, что в обработчике выполняется два запроса (проверка на существование выгрузки и запрос на получение списка жителей), использовать транзакцию необязательно. По умолчанию PostgreSQL использует уровень изоляции READ COMMITTED и даже в рамках одной транзакции будут видны все изменения других, успешно завершенных транзакций (добавление новых строк, изменение существующих).

Самая большая выгрузка в текстовом представлении может занимать ~63 мегабайта — это достаточно много, особенно учитывая, что одновременно может прийти несколько запросов на получение данных. Есть достаточно интересный способ получать данные из БД с помощью курсора и отправлять их клиенту по частям.

Для этого нам потребуется реализовать два объекта:

  1. Объект SelectQuery типа AsyncIterable, возвращающий записи из базы данных. При первом обращении подключается к базе, открывает транзакцию и создает курсор, при дальнейшей итерации возвращает записи из БД. Возвращается обработчиком.

    Код SelectQuery
    from collections import AsyncIterable
    from asyncpgsa.transactionmanager import ConnectionTransactionContextManager
    from sqlalchemy.sql import Select
    
    
    class SelectQuery(AsyncIterable):
        """
        Используется, чтобы отправлять данные из PostgreSQL клиенту сразу после
        получения, по частям, без буфферизации всех данных
        """
        PREFETCH = 500
    
        __slots__ = (
            'query', 'transaction_ctx', 'prefetch', 'timeout'
        )
    
        def __init__(self, query: Select,
                     transaction_ctx: ConnectionTransactionContextManager,
                     prefetch: int = None,
                     timeout: float = None):
            self.query = query
            self.transaction_ctx = transaction_ctx
            self.prefetch = prefetch or self.PREFETCH
            self.timeout = timeout
    
        async def __aiter__(self):
            async with self.transaction_ctx as conn:
                cursor = conn.cursor(self.query, prefetch=self.prefetch,
                                     timeout=self.timeout)
                async for row in cursor:
                    yield row
    
  2. Сериализатор AsyncGenJSONListPayload, который умеет итерироваться по асинхронным генераторам, сериализовать данные из асинхронного генератора в JSON и отправлять данные клиентам по частям. Регистрируется в aiohttp.PAYLOAD_REGISTRY как сериализатор объектов AsyncIterable.

    Код AsyncGenJSONListPayload
    import json
    from functools import partial
    
    from aiohttp import Payload
    
    
    # Функция, умеющая сериализовать в JSON объекты asyncpg.Record и datetime.date
    dumps = partial(json.dumps, default=convert, ensure_ascii=False)
    
    
    class AsyncGenJSONListPayload(Payload):
        """
        Итерируется по объектам AsyncIterable, частями сериализует данные из них
        в JSON и отправляет клиенту
        """
        def __init__(self, value, encoding: str = 'utf-8',
                     content_type: str = 'application/json',
                     root_object: str = 'data',
                     *args, **kwargs):
            self.root_object = root_object
            super().__init__(value, content_type=content_type, encoding=encoding,
                             *args, **kwargs)
    
        async def write(self, writer):
            # Начало объекта
            await writer.write(
                ('{"%s":[' % self.root_object).encode(self._encoding)
            )
    
            first = True
            async for row in self._value:
                # Перед первой строчкой запятая не нужнаа
                if not first:
                    await writer.write(b',')
                else:
                    first = False
    
                await writer.write(dumps(row).encode(self._encoding))
    
            # Конец объекта
            await writer.write(b']}')

Далее, в обработчике можно будет создать объект SelectQuery, передать ему SQL запрос и функцию для открытия транзакции и вернуть его в Response body:

Код обработчика
# analyzer/api/handlers/citizens.py
from aiohttp.web_response import Response
from aiohttp_apispec import docs, response_schema

from analyzer.api.schema import CitizensResponseSchema
from analyzer.db.schema import citizens_table as citizens_t
from analyzer.utils.pg import SelectQuery

from .query import CITIZENS_QUERY
from .base import BaseImportView


class CitizensView(BaseImportView):
    URL_PATH = r'/imports/{import_id:\d+}/citizens'

    @docs(summary='Отобразить жителей для указанной выгрузки')
    @response_schema(CitizensResponseSchema())
    async def get(self):
        await self.check_import_exists()

        query = CITIZENS_QUERY.where(
            citizens_t.c.import_id == self.import_id
        )
        body = SelectQuery(query, self.pg.transaction())
        return Response(body=body)

aiohttp обнаружит в реестре aiohttp.PAYLOAD_REGISTRY зарегистрированный сериализатор AsyncGenJSONListPayload для объектов типа AsyncIterable. Затем сериализатор будет итерироваться по объекту SelectQuery и отправлять данные клиенту. При первом обращении объект SelectQuery получает соединение к БД, открывает транзакцию и создает курсор, при дальнейшей итерации будет получать данные из БД курсором и возвращать их построчно.

Этот подход позволяет не выделять память на весь объем данных при каждом запросе, но у него есть особенность: приложение не сможет вернуть клиенту соответствующий HTTP-статус, если возникнет ошибка (ведь клиенту уже был отправлен HTTP-статус, заголовки, и пишутся данные).

При возникновении исключения не остается ничего, кроме как разорвать соединение. Исключение, конечно, можно залогировать, но клиент не сможет понять, какая именно ошибка произошла.

С другой стороны, похожая ситуация может возникнуть, даже если обработчик получит все данные из БД, но при передаче данных клиенту моргнет сеть — от этого никто не застрахован.

PATCH /imports/$import_id/citizens/$citizen_id


Обработчик получает на вход идентификатор выгрузки import_id, жителя citizen_id, а также json с новыми данными о жителе. В случае обращения к несуществующей выгрузке или жителю необходимо вернуть HTTP-ответ 404: Not Found.

Переданные клиентом данные требуется проверить и десериализовать. Если они некорректные — необходимо вернуть HTTP-ответ 400: Bad Request. Я реализовал Marshmallow-схему PatchCitizenSchema, которая проверяет:

  • Тип и формат данных для указанных полей.
  • Дату рождения. Она должна быть указана в формате DD.MM.YYYY и не может иметь значения из будущего.
  • Список родственников каждого жителя. Он должен иметь уникальные идентификаторы жителей

Существование родственников, указанных в поле relatives, можно отдельно не проверять: при добавлении в таблицу relations несуществующего жителя PostgreSQL вернет ошибку ForeignKeyViolationError, которую можно обработать и вернуть HTTP-статус 400: Bad Request.

Какой статус возвращать, если клиент прислал некорректные данные для несуществующего жителя или выгрузки? Семантически правильнее проверять сначала существование выгрузки и жителя (если такого нет — возвращать 404: Not Found) и только потом —корректные ли данные прислал клиент (если нет — возвращать 400: Bad Request). На практике часто бывает дешевле сначала проверить данные, и только если они корректные, обращаться к базе.

Оба варианта приемлемы, но я решил выбрать более дешевый второй вариант, так как в любом случае результат операции — ошибка, которая ни на что не влияет (клиент исправит данные и потом так же узнает, что житель не существует).

Если данные корректные, необходимо обновить информацию о жителе в БД. В обработчике потребуется сделать несколько запросов к разным таблицам. Если возникнет ошибка или исключение, изменения в базе данных должны быть отменены, поэтому запросы необходимо выполнять в транзакции.

Метод PATCH позволяет передавать лишь некоторые поля для изменяемого жителя.

Обработчик необходимо написать таким образом, чтобы он не падал при обращении к данным, которые не указал клиент, а также не выполнял запросы к таблицам, данные в которых не изменились.

Если клиент указал поле relatives, необходимо получить список существующих родственников. Если он изменился — определить, какие записи из таблицы relatives необходимо удалить, а какие добавить, чтобы привести базу данных в соответствие с запросом клиента. По умолчанию в PostgreSQL для изоляции транзакций используется уровень READ COMMITTED. Это означает, что в рамках текущей транзакции будут видны изменения существующих (а также добавления новых) записей других завершенных транзакций. Это может привести к состоянию гонки между конкурентными запросами.

Предположим, существует выгрузка с жителями #1, #2, #3, без родственных связей. Сервис получает два одновременных запроса на изменение жителя #1: {"relatives": [2]} и {"relatives": [3]}. aiohttp создаст два обработчика, которые одновременно получат текущее состояние жителя из PostgreSQL.

Каждый обработчик не обнаружит ни одной родственной связи и примет решение добавить новую связь с указанным родственником. В результате у жителя #1 поле relatives равно [2,3].



Такое поведение нельзя назвать очевидным. Есть два варианта ожидаемо решить исход гонки: выполнить только первый запрос, а для второго вернуть HTTP-ответ
409: Conflict (чтобы клиент повторил запрос), либо выполнить запросы по очереди (второй запрос будет обработан только после завершения первого).

Первый вариант можно реализовать, включив режим изоляции SERIALIZABLE. Если во время обработки запроса кто-то уже успел изменить и закоммитить данные, будет брошено исключение, которое можно обработать и вернуть соответствующий HTTP-статус.

Минус такого решения — большое число блокировок в PostgreSQL, SERIALIZABLE будет вызывать исключение, даже если конкурентные запросы меняют записи жителей из разных выгрузок.

Также можно воспользоваться механизмом рекомендательных блокировок. Если получить такую блокировку по import_id, конкурентные запросы для разных выгрузок смогут выполняться параллельно.

Для обработки конкурентных запросов в одной выгрузке можно реализовать поведение любого из вариантов: функция pg_try_advisory_xact_lock пытается получить блокировку и
возвращает результат boolean немедленно (если блокировку получить не удалось — можно бросить исключение), а pg_advisory_xact_lock ожидает, пока
ресурс не станет доступен для блокировки (в этом случае запросы выполнятся последовательно, я остановился на этом варианте).

В итоге обработчик должен вернуть актуальную информацию об обновленном жителе. Можно было ограничиться возвращением клиенту данных из его же запроса (раз мы возвращаем ответ клиенту, значит, исключений не было и все запросы успешно выполнены). Или — воспользоваться ключевым словом RETURNING в запросах, изменяющих БД, и сформировать ответ из полученных результатов. Но оба этих подхода не позволили бы увидеть и протестировать случай с гонкой состояний.

К сервису не предъявлялись требования по высокой нагрузке, поэтому я решил запрашивать все данные о жителе заново и возвращать клиенту честный результат из БД.

GET /imports/$import_id/citizens/birthdays


Обработчик вычисляет число подарков, которое приобретет каждый житель выгрузки своим родственникам (первого порядка). Число сгруппировано по месяцам для выгрузки с указанным import_id. В случае обращения к несуществующей выгрузке необходимо вернуть HTTP-ответ 404: Not Found.

Есть два варианта реализации:

  1. Получить данные для жителей с родственниками из базы, а на стороне Python агрегировать данные по месяцам и сгенерировать списки для тех месяцев, для которых нет данных в БД.
  2. Cоставить json-запрос в базу и дописать для отсутствующих месяцев заглушки.

Я остановился на первом варианте — визуально он выглядит более понятным и поддерживаемым. Число дней рождений в определенном месяце можно получить, сделав JOIN из таблицы с родственными связями (relations.citizen_id — житель, для которого мы считаем дни рождения родственников) в таблицу citizens (содержит дату рождения, из которой требуется получить месяц).

Значения месяцев не должны содержать ведущих нулей. Месяц, получаемый из поля birth_date c помощью функции date_part, может содержать ведущий ноль. Чтобы убрать его, я выполнил cast к integer в SQL-запросе.

Несмотря на то, что в обработчике требуется выполнить два запроса (проверить существование выгрузки и получить информации о днях рождения и подарках), транзакция не требуется.

По умолчанию PostgreSQL использует режим READ COMMITTED, при котором в текущей транзакции видны все новые (добавляемые другими транзакциями) и существующие (изменяемые другими транзакциями) записи после их успешного завершения.

Например, если в момент получения данных будет добавлена новая выгрузка — она никак не повлияет на существующие. Если в момент получения данных будет выполнен запрос на изменение жителя — то либо данные еще не будут видны (если транзакция, меняющая данные, не завершилась), либо транзакция полностью завершится и станут видны сразу все изменения. Целостность получаемых из базы данных не нарушится.

GET /imports/$import_id/towns/stat/percentile/age


Обработчик вычисляет 50-й, 75-й и 99-й перцентили возрастов (полных лет) жителей по городам в выборке с указанным import_id. В случае обращения к несуществующей выгрузке необходимо вернуть HTTP-ответ 404: Not Found.

Несмотря на то, что в обработчике выполняется два запроса (проверка на существование выгрузки и получение списка жителей), использовать транзакцию необязательно.

Есть два варианта реализации:

  1. Получить из БД возраста жителей, сгруппированные по городам, а затем на стороне Python вычислить перцентили с помощью numpy (который в задании указан как эталонный) и округлить до двух знаков после запятой.
  2. Сделать всю работу на стороне PostgreSQL: функция percentile_cont вычисляет перцентиль с линейной интерполяцией, затем округляем полученные значения до двух знаков после запятой в рамках одного SQL-запроса, а numpy используем для тестирования.

Второй вариант требует передавать меньше данных между приложением и PostgreSQL, но у него есть не очень очевидный подводный камень: в PostgreSQL округление математическое, (SELECT ROUND(2.5) вернет 3), а в Python — бухгалтерское, к ближайшему целому (round(2.5) вернет 2).

Чтобы тестировать обработчик, реализация должна быть одинаковой и в PostgreSQL, и в Python (реализовать функцию с математическим округлением в Python выглядит проще). Стоит отметить, что при вычислении перцентилей numpy и PostgreSQL могут возвращать немного отличающиеся числа, но с учетом округления эта разница будет незаметна.

Тестирование


Что нужно проверить в этом приложении? Во-первых, что обработчики отвечают требованиям и выполняют требуемую работу в окружении, максимально близком к боевому. Во-вторых, что миграции, которые изменяют состояние базы данных, работают без ошибок. В-третьих, есть ряд вспомогательных функций, которые тоже было бы правильно покрыть тестами.

Я решил воспользоваться фреймворком pytest из-за его гибкости и простоты в использовании. Он предлагает мощный механизм подготовки окружения для тестов — фикстуры, то есть функции с декоратором pytest.mark.fixture, названия которых можно указать параметром в тесте. Если pytest обнаружит в аннотации теста параметр с названием фикстуры, он выполнит эту фикстуру и передаст результат в значении этого параметра. А если фикстура является генератором, то параметр теста примет значение, возвращаемое yield, и после окончания теста выполнится вторая часть фикстуры, которая может очистить ресурсы или закрыть соединения.

Для большинства тестов нам потребуется база данных PostgreSQL. Чтобы изолировать тесты друг от друга, можно перед выполнением каждого теста создавать отдельную базу данных, а после выполнения — удалять ее.

Создаем базу данных фикстурой для каждого теста
import os
import uuid

import pytest
from sqlalchemy import create_engine
from sqlalchemy_utils import create_database, drop_database
from yarl import URL

from analyzer.utils.pg import DEFAULT_PG_URL

PG_URL = os.getenv('CI_ANALYZER_PG_URL', DEFAULT_PG_URL)


@pytest.fixture
def postgres():
    tmp_name = '.'.join([uuid.uuid4().hex, 'pytest'])
    tmp_url = str(URL(PG_URL).with_path(tmp_name))
    create_database(tmp_url)

    try:
        # Это значение будет иметь параметр postgres в функции-тесте
        yield tmp_url
    finally:
        drop_database(tmp_url)


def test_db(postgres):
    """
    Пример теста, использующего PostgreSQL
    """
    engine = create_engine(postgres)
    assert engine.execute('SELECT 1').scalar() == 1
    engine.dispose()

C этой задачей здорово справился модуль sqlalchemy_utils, учитывающий особенности разных баз данных и драйверов. Например, PostgreSQL не разрешает выполнение CREATE DATABASE в блоке транзакции. При создании БД sqlalchemy_utils переводит psycopg2 (который обычно выполняет все запросы в транзакции) в режим autocommit.

Другая важная особенность: если к PostgreSQL подключен хотя бы один клиент — базу данных нельзя удалить, а sqlalchemy_utils отключает всех клиентов перед удалением базы. БД будет успешно удалена, даже если зависнет какой-нибудь тест, имеющий активные подключения к ней.

PostgreSQL потребуется нам в разных состояниях: для тестирования миграций необходима чистая база данных, в то время как обработчики требуют, чтобы все миграции были применены. Изменять состояние базы данных можно программно с помощью команд Alembic, для их вызова требуется объект конфигурации Alembic.

Создаем фикстурой объект конфигурации Alembic
from types import SimpleNamespace

import pytest

from analyzer.utils.pg import make_alembic_config


@pytest.fixture()
def alembic_config(postgres):
    cmd_options = SimpleNamespace(config='alembic.ini', name='alembic',
                                  pg_url=postgres, raiseerr=False, x=None)
    return make_alembic_config(cmd_options)

Обратите внимание, что у фикстуры alembic_config есть параметр postgrespytest позволяет не только указывать зависимость теста от фикстур, но и зависимости между фикстурами.

Этот механизм позволяет гибко разделять логику и писать очень краткий и переиспользуемый код.

Обработчики


Для тестирования обработчиков требуется база данных с созданными таблицами и типами данных. Чтобы применить миграции, необходимо программно вызвать команду upgrade Alembic. Для ее вызова потребуется объект с конфигурацией Alembic, который мы уже определили фикстурой alembic_config. База данных с миграциями выглядит как вполне самостоятельная сущность, и ее можно представить в виде фикстуры:

from alembic.command import upgrade

@pytest.fixture
async def migrated_postgres(alembic_config, postgres):
    upgrade(alembic_config, 'head')
    # Возвращаем DSN базы данных, которая была смигрирована 
    return postgres

Когда миграций в проекте становится много, их применение для каждого теста может занимать слишком много времени. Чтобы ускорить процесс, можно один раз создать базу данных с миграциями и затем использовать ее в качестве шаблона.

Помимо базы данных для тестирования обработчиков, потребуется запущенное приложение, а также клиент, настроенный на работу с этим приложением. Чтобы приложение было легко тестировать, я вынес его создание в функцию create_app, которая принимает параметры для запуска: базу данных, порт для REST API и другие.

Аргументы для запуска приложения можно также представить в виде отдельной фикстуры. Для их создания потребуется определить свободный порт для запуска тестируемого приложения и адрес до смигрированной временной базы данных.

Для определения свободного порта я воспользовался фикстурой aiomisc_unused_port из пакета aiomisc.

Стандартная фикстура aiohttp_unused_port тоже вполне бы подошла, но она возвращает функцию для определения свободых портов, в то время как aiomisc_unused_port возвращает сразу номер порта. Для нашего приложения требуется определить только один свободный порт, поэтому я решил не писать лишнюю строчку кода с вызовом aiohttp_unused_port.

@pytest.fixture
def arguments(aiomisc_unused_port, migrated_postgres):
    return parser.parse_args(
        [
            '--log-level=debug',
            '--api-address=127.0.0.1',
            f'--api-port={aiomisc_unused_port}',
            f'--pg-url={migrated_postgres}'
        ]
    )

Все тесты с обработчиками подразумевают запросы к REST API, работа напрямую с приложением aiohttp не требуется. Поэтому я сделал одну фикстуру, которая запускает приложение и с помощью фабрики aiohttp_client создает и возвращает подключенный к приложению стандартный тестовый клиент aiohttp.test_utils.TestClient.

from analyzer.api.app import create_app

@pytest.fixture
async def api_client(aiohttp_client, arguments):
    app = create_app(arguments)
    client = await aiohttp_client(app, server_kwargs={
        'port': arguments.api_port
    })

    try:
        yield client
    finally:
        await client.close()

Теперь, если в параметрах теста указать фикстуру api_client, произойдет следующее:

  1. Фикстура postgres создаст базу данных (зависимость для migrated_postgres).
  2. Фикстура alembic_config создаст объект конфигурации Alembic, подключенный к временной базе данных (зависимость для migrated_postgres).
  3. Фикстура migrated_postgres применит миграции (зависимость для arguments).
  4. Фикстура aiomisc_unused_port обнаружит свободный порт (зависимость для arguments).
  5. Фикстура arguments создаст аргументы для запуска (зависимость для api_client).
  6. Фикстура api_client создаст и запустит приложение и вернет клиента для выполнения запросов.
  7. Выполнится тест.
  8. Фикстура api_client отключит клиента и остановит приложение.
  9. Фикстура postgres удалит базу данных.

Фикстуры позволяют избежать дублирования кода, но помимо подготовки окружения в тестах есть еще одно потенциальное место, в котором будет очень много одинакового кода — запросы к приложению.

Во-первых, сделав запрос, мы ожидаем получить определенный HTTP-статус. Во-вторых, если статус совпадает с ожидаемым, то перед работой с данными необходимо убедиться, что они имеют правильный формат. Здесь легко ошибиться и написать обработчик, который делает правильные вычисления и возвращает правильный результат, но не проходит автоматическую валидацию из-за неправильного формата ответа (например, забыть обернуть ответ в словарь с ключом data). Все эти проверки можно было бы сделать в одном месте.

В модуле analyzer.testing я подготовил для каждого обработчика функцию-помощник, которая проверяет статус HTTP, а также формат ответа с помощью Marshmallow.

GET /imports/$import_id/citizens


Я решил начать с обработчика, возвращающего жителей, потому что он очень полезен для проверки результатов работы других обработчиков, изменяющих состояние базы данных.

Я намеренно не использовал код, добавляющий данные в базу из обработчика POST /imports, хотя вынести его в отдельную функцию несложно. Код обработчиков имеет свойство меняться, а если в коде, добавляющем в базу, будет какая-либо ошибка, есть вероятность, что тест перестанет работать как задумано и неявно для разработчиков перестанет показывать ошибки.

Для этого теста я определил следующие наборы данных для тестирования:

  • Выгрузка с несколькими родственниками. Проверяет, что для каждого жителя будет правильно сформирован список с идентификаторами родственников.
  • Выгрузка с одним жителем без родственников. Проверяет, что поле relatives — пустой список (из-за LEFT JOIN в SQL-запросе список родственников может быть равен [None]).
  • Выгрузка с жителем, который сам себе родственник.
  • Пустая выгрузка. Проверяет, что обработчик разрешает добавить пустую выгрузку и не падает с ошибкой.

Чтобы запустить один и тот же тест отдельно на каждой выгрузке, я воспользовался еще одним очень мощным механизмом pytest — параметризацией. Этот механизм позволяет обернуть функцию-тест в декоратор pytest.mark.parametrize и описать в нем, какие параметры должна принимать функция-тест для каждого отдельного тестируемого случая.

Как параметризовать тест
import pytest

from analyzer.utils.testing import generate_citizen

datasets = [
    # Житель с несколькими родственниками
    [
        generate_citizen(citizen_id=1, relatives=[2, 3]),
        generate_citizen(citizen_id=2, relatives=[1]),
        generate_citizen(citizen_id=3, relatives=[1])
    ],

    # Житель без родственников
    [
        generate_citizen(relatives=[])
    ],

    # Выгрузка с жителем, который сам себе родственник
    [
        generate_citizen(citizen_id=1, name='Джейн', gender='male',
                         birth_date='17.02.2020', relatives=[1])
    ],

    # Пустая выгрузка
    [],
]


@pytest.mark.parametrize('dataset', datasets)
async def test_get_citizens(api_client, dataset):
    """
    Этот тест будет вызван 4 раза, отдельно для каждого датасета
    """

Итак, тест добавит выгрузку в базу данных, затем с помощью запроса к обработчику получит информацию о жителях и сравнит эталонную выгрузку с полученной. Но как сравнить жителей?

Каждый житель состоит из скалярных полей и поля relatives — списка идентификаторов родственников. Список в Python — упорядоченный тип, и при сравнении порядок элементов каждого списка имеет значение, но при сравнении списков с родственниками порядок не должен иметь значение.

Если привести relatives к множеству перед сравнением, то при сравнении не получится обнаружить ситуацию, когда у одного из жителей в поле relatives есть дубли. Если отсортировать список с идентификаторами родственников, это позволит обойти проблему разного порядка идентификаторов родственников, но при этом обнаружить дубли.

При сравнении двух списков с жителями можно столкнуться с похожей проблемой: технически, порядок жителей в выгрузке не важен, но важно обнаружить, если в одной выгрузке будет два жителя с одинаковыми идентификаторами, а в другой нет. Так что помимо упорядочивания списка с родственниками relatives для каждого жителя необходимо упорядочить жителей в каждой выгрузке.

Так как задача сравнения жителей возникнет еще не раз, я реализовал две функции: одну для сравнения двух жителей, а вторую для сравнения двух списков с жителями:

Сравниваем жителей
from typing import Iterable, Mapping

def normalize_citizen(citizen):
    """
    Возвращает жителя с упорядоченным списком родственников
    """
    return {**citizen, 'relatives': sorted(citizen['relatives'])}


def compare_citizens(left: Mapping, right: Mapping) -> bool:
    """
    Сравнивает двух жителей
    """
    return normalize_citizen(left) == normalize_citizen(right)


def compare_citizen_groups(left: Iterable, right: Iterable) -> bool:
    """
    Упорядочивает списки с родственниками для каждого жителя, списки с жителями
    и сравнивает их
    """
    left = [normalize_citizen(citizen) for citizen in left]
    left.sort(key=lambda citizen: citizen['citizen_id'])

    right = [normalize_citizen(citizen) for citizen in right]
    right.sort(key=lambda citizen: citizen['citizen_id'])
    return left == right

Чтобы убедиться, что этот обработчик не возвращает жителей других выгрузок, я решил перед каждым тестом добавлять дополнительную выгрузку с одним жителем.

POST /imports


Я определил следующие наборы данных для тестирования обработчика:

  • Корректные данные, ожидается успешное добавление в БД.

    • Житель без родственников (самый простой).

      Обработчику необходимо добавить данные в две таблицы. Если не обрабатывается ситуация, когда у жителя нет родственников, будет выполнен пустой insert в таблицу родственных связей, что приведет к ошибке.
    • Житель с родственниками (более сложный, обычный).

      Проверяет, что обработчик корректно сохраняет данные и о жителе и его родственных связях.
    • Житель сам себе родственник.

      Про этот случай было много вопросов, поэтому в шутку решил добавить и его. :)
    • Выгрузка с максимального размера

      Проверяет, что aiohttp позволяет загружать такие объемы данных и что при большом количестве данных в PostgreSQL не отправляется больше 32 767 аргументов (обработчик должен выполнить несколько запросов).
    • Пустая выгрузка

      Обработчик должен учитывать такой случай и не падать, пытаясь выполнить пустой insert в таблицу с жителями.

  • Данные с ошибками, ожидаем HTTP-ответ 400: Bad Request.

    • Дата рождения некорректная (будущее время).
    • citizen_id в рамках выгрузки не уникален.
    • Родственная связь указана неверно (есть только от одного жителя к другому, но нет обратной).
    • У жителя указан несуществующий в выгрузке родственник.
    • Родственные связи не уникальны.

Если обработчик отработал успешно и данные были добавлены, необходимо получить добавленных в БД жителей и сравнить их с эталонной выгрузки. Для получения жителей я воспользовался уже протестированным обработчиком GET /imports/$import_id/citizens, а для сравнения — функцией compare_citizen_groups.

PATCH /imports/$import_id/citizens/$citizen_id


Валидация данных во многом похожа на описанную в обработчике POST /imports с небольшими исключениями: есть только один житель и клиент может передать только те поля, которые пожелает.

Я решил использовать следующие наборы с некорректными данными, чтобы проверить, что обработчик вернет HTTP-ответ 400: Bad request:

  • Поле указано, но имеет некорректный тип и/или формат данных
  • Указана некорректная дата рождения (будущее время).
  • Поле relatives содержит несуществующего в выгрузке родственника.

Также необходимо проверить, что обработчик корректно обновляет информацию о жителе и его родственниках.

Для этого создадим выгрузку с тремя жителями, два из которых — родственники, и отправим запрос с новыми значениями всех скалярных полей и новым идентификатором родственника в поле relatives.

Чтобы убедиться, что обработчик различает жителей разных выгрузок перед тестом (и, например, не изменит жителей с одинаковыми идентификаторами из другой выгрузки), я создал дополнительную выгрузку с тремя жителями, которые имеют такие же идентификаторы.

Обработчик должен сохранить новые значения скалярных полей, добавить нового указанного родственника и удалить связь со старым, не указанным родственником. Все изменения родственных связей должны быть двусторонними. Изменений в других выгрузках быть не должно.

Поскольку такой обработчик может быть подвержен состоянию гонки (это рассматривалось в разделе «Разработка»), я добавил два дополнительных теста. Один воспроизводит проблему с состоянием гонки (расширяет класс обработчика и убирает блокировку), второй доказывает, что проблема с состоянием гонки не воспроизводится.

GET /imports/$import_id/citizens/birthdays


Для тестирования этого обработчика я выбрал следующие наборы данных:

  • Выгрузка, в которой у жителя есть один родственник в одном месяце и два родственника в другом.
  • Выгрузка с одним жителем без родственников. Проверяет, что обработчик не учитывает его при расчетах.
  • Пустая выгрузка. Проверяет, что обработчик не упадет с ошибкой и вернет в ответе корректный словарь с 12 месяцами.
  • Выгрузка с жителем, который сам себе родственник. Проверяет, что житель купит себе подарок в месяц своего рождения.

Обработчик должен возвращать в ответе все месяцы, даже если в эти месяцы нет дней рождений. Чтобы избежать дублирования, я сделал функцию, которой можно передать словарь, чтобы она дополнила его значениями для отсутствующих месяцев.

Чтобы убедиться, что обработчик различает жителей разных выгрузок, я добавил дополнительную выгрузку с двумя родственниками. Если обработчик по ошибке использует их при расчетах, то результаты будут некорректными и обработчик упадет с ошибкой.

GET /imports/$import_id/towns/stat/percentile/age


Особенность этого теста в том, что результаты его работы зависят от текущего времени: возраст жителей вычисляется исходя из текущей даты. Чтобы результаты тестирования не менялись с течением времени, текущую дату, даты рождения жителей и ожидаемые результаты необходимо зафиксировать. Это позволит легко воспроизвести любые, даже краевые случаи.

Как лучше зафиксировать дату? В обработчике для вычисления возраста жителей используется PostgreSQL-функция AGE, принимающая первым параметром дату, для которой необходимо рассчитать возраст, а вторым — базовую дату (определена константой TownAgeStatView.CURRENT_DATE).

Подменяем базовую дату в обработчике на время теста
from unittest.mock import patch

import pytz

CURRENT_DATE = datetime(2020, 2, 17, tzinfo=pytz.utc)


@patch('analyzer.api.handlers.TownAgeStatView.CURRENT_DATE', new=CURRENT_DATE)
async def test_get_ages(...):
    ...

Для тестирования обработчика я выбрал следующие наборы данных (для всех жителей указывал один город, потому что обработчик агрегирует результаты по городам):

  • Выгрузка с несколькими жителями, у которых завтра день рождения (возраст — несколько лет и 364 дня). Проверяет, что обработчик использует в расчетах только количество полных лет.
  • Выгрузка с жителем, у которого сегодня день рождения (возраст — ровно несколько лет). Проверяет краевой случай — возраст жителя, у которого сегодня день рождения, не должен рассчитаться как уменьшенный на 1 год.
  • Пустая выгрузка. Обработчик не должен на ней падать.

Эталон для расчета перцентилей — numpy с линейной интерполяцией, и эталонные результаты для тестирования я рассчитал именно им.

Также нужно округлять дробные значения перцентилей до двух знаков после запятой. Если вы использовали в обработчике для округления PostgreSQL, а для расчета эталонных данных — Python, то могли заметить, что округление в Python 3 и PostgreSQL может давать разные результаты.

Например
# Python 3
round(2.5)
> 2

-- PostgreSQL
SELECT ROUND(2.5)
> 3

Дело в том, что Python использует банковское округление до ближайшего четного, а PostgreSQL — математическое (half-up). В случае, если расчеты и округление производятся в PostgreSQL, было бы правильным в тестах также использовать математическое округление.

Сначала я описал наборы данных с датами рождения в текстовом формате, но читать тест в таком формате было неудобно: приходилось каждый раз вычислять в уме возраст каждого жителя, чтобы вспомнить, что проверяет тот или иной набор данных. Конечно, можно было обойтись комментариями в коде, но я решил пойти чуть дальше и написал функцию age2date, которая позволяет описать дату рождения в виде возраста: количества лет и дней.

Например, вот так
import pytz

from analyzer.utils.testing import generate_citizen


CURRENT_DATE = datetime(2020, 2, 17, tzinfo=pytz.utc)

def age2date(years: int, days: int = 0, base_date=CURRENT_DATE) -> str:
    birth_date = copy(base_date).replace(year=base_date.year - years)
    birth_date -= timedelta(days=days)
    return birth_date.strftime(BIRTH_DATE_FORMAT)

# Сколько лет этому жителю? Посчитать несложно, но если их будет много?
generate_citizen(birth_date='17.02.2009')

# Жителю ровно 11 лет и у него сегодня день рождения
generate_citizen(birth_date=age2date(years=11))

Чтобы убедиться, что обработчик различает жителей разных выгрузок, я добавил дополнительную выгрузку с одним жителем из другого города: если обработчик по ошибке использует его, в результатах появится лишний город и тест сломается.

Интересный факт: когда я писал этот тест 29 февраля 2020 года, у меня внезапно перестали генерироваться выгрузки с жителями из-за бага в Faker (2020-й — високосный год, а другие годы, которые выбирал Faker, не всегда были високосными и в них не было 29 февраля). Не забывайте фиксировать даты и тестировать краевые случаи!

Миграции


Код миграций на первый взгляд кажется очевидным и наименее подверженным ошибкам, зачем его тестировать? Это очень опасное заблуждение: самые коварные ошибки миграций могут проявить себя в самый неподходящий момент. Даже если они не испортят данные, то могут стать причиной лишнего даунтайма.

Существующая в проекте initial миграция изменяет структуру базы данных, но не изменяет данные. От каких типовых ошибок можно защититься в подобных миграциях?

  • Метод downgrade не реализован или не удалены все созданные в миграции сущности (особенно это касается пользовательских типов данных, которые создаются автоматически при создании таблицы, я про них уже упоминал).

    Это приведет к тому, что миграцию нельзя будет применить два раза (применить-откатить-применить): при откате не будут удалены все созданные миграцией сущности, при повторном создании миграция пройдет с ошибкой — тип данных уже существует.
  • Cинтаксические ошибки и опечатки.
  • Ошибки в связях миграций (цепочка нарушена).

Большинство этих ошибок обнаружит stairway-тест. Его идея — применять миграции по одной, последовательно выполняя методы upgrade, downgrade, upgrade для каждой миграции. Такой тест достаточно один раз добавить в проект, он не требует поддержки и будет служить верой и правдой.

А вот если миграция, помимо структуры, изменяла бы данные, то потребовалось бы написать хотя бы один отдельный тест, проверяющий, что данные корректно изменяются в методе upgrade и возвращаются к изначальному состоянию в downgrade. На всякий случай: проект с примерами тестирования разных миграций, который я подготовил для доклада про Alembic на Moscow Python.

Сборка


Конечный артефакт, который мы собираемся разворачивать и который хотим получить в результате сборки, — Docker-образ. Для сборки необходимо выбрать базовый образ c Python. Официальный образ python:latest весит ~1 ГБ и, если его использовать в качестве базового, образ с приложением будет огромным. Существуют образы на основе ОС Alpine, размер которых намного меньше. Но с растущим количеством устанавливаемых пакетов размер конечного образа вырастет, и в итоге даже образ, собранный на основе Alpine, будет не таким уж и маленьким. Я выбрал в качестве базового образа snakepacker/python — он весит немного больше Alpine-образов, но основан на Ubuntu, которая предлагает огромный выбор пакетов и библиотек.

Еще один способ уменьшить размер образа с приложением — не включать в итоговый образ компилятор, библиотеки и файлы с заголовками для сборки, которые не потребуются для работы приложения.

Для этого можно воспользоваться многоступенчатой сборкой Docker:

  1. С помощью «тяжелого» образа snakepacker/python:all (~1 ГБ, в сжатом виде ~500 МБ) создаем виртуальное окружение, устанавливаем в него все зависимости и пакет с приложением. Этот образ нужен исключительно для сборки, он может содержать компилятор, все необходимые библиотеки и файлы с заголовками.

    FROM snakepacker/python:all as builder
    
    # Создаем виртуальное окружение
    RUN python3.8 -m venv /usr/share/python3/app
    
    # Копируем source distribution в контейнер и устанавливаем его
    COPY dist/ /mnt/dist/
    RUN /usr/share/python3/app/bin/pip install /mnt/dist/*
  2. Готовое виртуальное окружение копируем в «легкий» образ snakepacker/python:3.8 (~100 МБ, в сжатом виде ~50 МБ), который содержит только интерпретатор требуемой версии Python.

    Важно: в виртуальном окружении используются абсолютные пути, поэтому его необходимо скопировать по тому же адресу, по которому оно было собрано в контейнере-сборщике.

    FROM snakepacker/python:3.8 as api
    
    # Копируем готовое виртуальное окружение из контейнера builder
    COPY --from=builder /usr/share/python3/app /usr/share/python3/app
    
    # Устанавливаем ссылки, чтобы можно было воспользоваться командами
    # приложения
    RUN ln -snf /usr/share/python3/app/bin/analyzer-* /usr/local/bin/
    
    # Устанавливаем выполняемую при запуске контейнера команду по умолчанию
    CMD ["analyzer-api"]

Чтобы сократить время на сборку образа, зависимые модули приложения можно установить до его установки в виртуальное окружение. Тогда Docker закеширует их и не будет устанавливать заново, если они не менялись.

Dockerfile целиком
############### Образ для сборки виртуального окружения ################
# Основа — «тяжелый» (~1 ГБ, в сжатом виде ~500 ГБ) образ со всеми необходимыми
# библиотеками для сборки модулей
FROM snakepacker/python:all as builder

# Создаем виртуальное окружение и обновляем pip
RUN python3.8 -m venv /usr/share/python3/app
RUN /usr/share/python3/app/bin/pip install -U pip

# Устанавливаем зависимости отдельно, чтобы закешировать. При последующей сборке
# Docker пропустит этот шаг, если requirements.txt не изменится
COPY requirements.txt /mnt/
RUN /usr/share/python3/app/bin/pip install -Ur /mnt/requirements.txt

# Копируем source distribution в контейнер и устанавливаем его
COPY dist/ /mnt/dist/
RUN /usr/share/python3/app/bin/pip install /mnt/dist/* \
    && /usr/share/python3/app/bin/pip check

########################### Финальный образ ############################
# За основу берем «легкий» (~100 МБ, в сжатом виде ~50 МБ) образ с Python
FROM snakepacker/python:3.8 as api

# Копируем в него готовое виртуальное окружение из контейнера builder
COPY --from=builder /usr/share/python3/app /usr/share/python3/app

# Устанавливаем ссылки, чтобы можно было воспользоваться командами
# приложения
RUN ln -snf /usr/share/python3/app/bin/analyzer-* /usr/local/bin/

# Устанавливаем выполняемую при запуске контейнера команду по умолчанию
CMD ["analyzer-api"]

Для удобства сборки я добавил команду make upload, которая собирает Docker-образ и загружает его на hub.docker.com.

CI


Теперь, когда код покрыт тестами и мы умеем собирать Docker-образ, самое время автоматизировать эти процессы. Первое, что приходит в голову: запускать тесты на создание пул-реквестов, а при добавлении изменений в master-ветку собирать новый Docker-образ и загружать его на Docker Hub (или GitHub Packages, если вы не собираетесь распространять образ публично).

Я решил эту задачу с помощью GitHub Actions. Для этого потребовалось создать YAML-файл в папке .github/workflows и описать в нем workflow (c двумя задачами: test и publish), которое я назвал CI.

Задача test выполняется при каждом запуске workflow CI, с помощью services поднимает контейнер с PostgreSQL, ожидает, когда он станет доступен, и запускает pytest в контейнере snakepacker/python:all.

Задача publish выполняется, только если изменения были добавлены в ветку master и если задача test была выполнена успешно. Она собирает source distribution контейнером snakepacker/python:all, затем собирает и загружает Docker-образ с помощью docker/build-push-action@v1.

Полное описание workflow
name: CI

# Workflow должен выполняться при добавлении изменений 
# или новом пул-реквесте в master
on:
  push:
    branches: [ master ]
  pull_request:
    branches: [ master ]

jobs:
  # Тесты должны выполняться при каждом запуске workflow
  test:
    runs-on: ubuntu-latest

    services:
      postgres:
        image: docker://postgres
        ports:
          - 5432:5432
        env:
          POSTGRES_USER: user
          POSTGRES_PASSWORD: hackme
          POSTGRES_DB: analyzer

    steps:
      - uses: actions/checkout@v2
      - name: test
        uses: docker://snakepacker/python:all
        env:
          CI_ANALYZER_PG_URL: postgresql://user:hackme@postgres/analyzer
        with:
          args: /bin/bash -c "pip install -U '.[dev]' && pylama && wait-for-port postgres:5432 && pytest -vv --cov=analyzer --cov-report=term-missing tests"

  # Сборка и загрузка Docker-образа с приложением
  publish:
    # Выполняется только если изменения попали в ветку master
    if: github.event_name == 'push' && github.ref == 'refs/heads/master'
    # Требует, чтобы задача test была выполнена успешно
    needs: test
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v2
      - name: sdist
        uses: docker://snakepacker/python:all
        with:
          args: make sdist

      - name: build-push
        uses: docker/build-push-action@v1
        with:
          username: ${{ secrets.REGISTRY_LOGIN }}
          password: ${{ secrets.REGISTRY_TOKEN }}
          repository: alvassin/backendschool2019
          target: api
          tags: 0.0.1, latest

Теперь при добавлении изменений в master во вкладке Actions на GitHub можно увидеть запуск тестов, сборку и загрузку Docker-образа:



А при создании пул-реквеста в master-ветку в нем также будут отображаться результаты выполнения задачи test:



Деплой


Чтобы развернуть приложение на предоставленном сервере, нужно установить Docker, Docker Compose, запустить контейнеры с приложением и PostgreSQL и применить миграции.

Эти шаги можно автоматизировать с помощью системы управления конфигурациями Ansible. Она написана на Python, не требует специальных агентов (подключается прямо по ssh), использует jinja-шаблоны и позволяет декларативно описывать желаемое состояние в YAML-файлах. Декларативный подход позволяет не задумываться о текущем состоянии системы и действиях, необходимых, чтобы привести систему к желаемому состоянию. Вся эта работа ложится на плечи модулей Ansible.

Ansible позволяет сгруппировать логически связанные задачи в роли и затем переиспользовать. Нам потребуются две роли: docker (устанавливает и настраивает Docker) и analyzer (устанавливает и настраивает приложение).

Роль docker добавляет в систему репозиторий с Docker, устанавливает и настраивает пакеты docker-ce и docker-compose.

Опционально можно наладить автоматическое возобновление работы REST API после перезагрузки сервера. Ubuntu позволяет решить эту задачу силами системы инициализации systemd. Она управляет юнитами, представляющими собой различные ресурсы (демоны, сокеты, точки монтирования и другие). Чтобы добавить новый юнит в systemd, необходимо описать его конфигурацию в отдельном файле .service и разместить этот файл в одной из специальных папок, например в /etc/systemd/system. Затем юнит можно запустить, а также включить для него автозагрузку.

Пакет docker-ce при установке автоматически создаст файл с конфигурацией юнита — необходимо только убедиться, что он запущен и включается при запуске системы. Для Docker Compose файл конфигурации docker-compose@.service будет создан силами Ansible. Символ @ в названии указывает systemd, что юнит является шаблоном. Это позволяет запускать сервис docker-compose с параметром — например, с названием нашего сервиса, который будет подставлен вместо %i в файле конфигурации юнита:

[Unit]
Description=%i service with docker compose
Requires=docker.service
After=docker.service

[Service]
Type=oneshot
RemainAfterExit=true
WorkingDirectory=/etc/docker/compose/%i
ExecStart=/usr/local/bin/docker-compose up -d --remove-orphans
ExecStop=/usr/local/bin/docker-compose down

[Install]
WantedBy=multi-user.target

Роль analyzer сгенерирует из шаблона файл docker-compose.yml по адресу /etc/docker/compose/analyzer, зарегистрирует приложение как автоматически запускаемый сервис в systemd и применит миграции. Когда роли готовы, необходимо описать playbook.

---

- name: Gathering facts
  hosts: all
  become: yes
  gather_facts: yes

- name: Install docker
  hosts: docker
  become: yes
  gather_facts: no
  roles:
    - docker

- name: Install analyzer
  hosts: api
  become: yes
  gather_facts: no
  roles:
    - analyzer

Список хостов, а также переменные, использованные в ролях, можно указать в inventory-файле hosts.ini.

[api]
130.193.51.154

[docker:children]
api

[api:vars]
analyzer_image = alvassin/backendschool2019
analyzer_pg_user = user
analyzer_pg_password = hackme
analyzer_pg_dbname = analyzer

После того, как все файлы Ansible будут готовы, запустим его:

$ ansible-playbook -i hosts.ini deploy.yml

Про нагрузочное тестирование
Итак, приложение покрыто тестами, развернуто и готово к эксплуатации. Для полноты картины на минутку вспомним, что поводом для построения сервиса когда-то было техническое задание. В нем были указаны ограничения: на выгрузке с десятью тысячами жителей, из которых тысяча — родственники первого порядка, каждый обработчик должен обрабатывать запрос менее чем за 10 секунд. Безусловно, такое тестирование целесообразно производить именно на конечном сервере (а, скажем, не на CI-сервере): результаты тестирования напрямую зависят от конфигурации сервера и количества доступных ресурсов.

Допустим, мы сгенерировали выгрузку с жителями, вызвали друг за другом все обработчики, каждый из них отработал менее чем за 10 секунд. Достаточно ли этого? Можно предположить, что скорость обработки данных будет деградировать при увеличении количества данных, загружаемых в сервис. Важно понимать, сколько выгрузок сможет обработать сервис, прежде чем обработчики перестанут укладываться в ограничения.

Хоть для тестирования данного сервиса и не требуется генерировать высокий RPS, его нагрузочное тестирование имеет свою особенность: использовать статический набор запросов не получится. Например, чтобы получить список жителей, необходимо иметь идентификатор выгрузки import_id, который возвращается обработчиком POST /imports и может оказаться любым целым числом. Этот подход называется тестированием по сценарию.

Учитывая, что генерация данных уже реализована на Python 3, я решил воспользоваться фреймворком Locust.

Чтобы выполнить нагрузочное тестирование, необходимо описать сценарий в файле locustfile.py и запустить модуль командой locust. Затем результаты тестирования можно наблюдать на графиках в веб-интерфейсе или таблице результатов в консоли.

Графики Locust показывают общую информацию. Мне было интересно узнать, на каком раунде сервис не уложится в таймаут. Я добавил переменную с номером текущей
итерации self.round и логивание каждого запроса с указанием итерации тестирования и времени выполнения.

Описываем сценарий в файле locustfile.py
# locustfile.py
import logging
from http import HTTPStatus

from locust import HttpLocust, constant, task, TaskSet
from locust.exception import RescheduleTask

from analyzer.api.handlers import (
    CitizenBirthdaysView, CitizensView, CitizenView, TownAgeStatView
)
from analyzer.utils.testing import generate_citizen, generate_citizens, url_for


class AnalyzerTaskSet(TaskSet):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.round = 0

    def make_dataset(self):
        citizens = [
            # Первого жителя создаем с родственником. В запросе к
            # PATCH-обработчику список relatives будет содержать только другого
            # жителя, что потребует выполнения максимального кол-ва запросов
            # (как на добавление новой родственной связи, так и на удаление
            # существующей).
            generate_citizen(citizen_id=1, relatives=[2]),
            generate_citizen(citizen_id=2, relatives=[1]),
            *generate_citizens(citizens_num=9998, relations_num=1000,
                               start_citizen_id=3)
        ]
        return {citizen['citizen_id']: citizen for citizen in citizens}

    def request(self, method, path, expected_status, **kwargs):
        with self.client.request(
                method, path, catch_response=True, **kwargs
        ) as resp:
            if resp.status_code != expected_status:
                resp.failure(f'expected status {expected_status}, '
                             f'got {resp.status_code}')
            logging.info(
                'round %r: %s %s, http status %d (expected %d), took %rs',
                self.round, method, path, resp.status_code, expected_status,
                resp.elapsed.total_seconds()
            )
            return resp

    def create_import(self, dataset):
        resp = self.request('POST', '/imports', HTTPStatus.CREATED,
                            json={'citizens': list(dataset.values())})
        if resp.status_code != HTTPStatus.CREATED:
            raise RescheduleTask
        return resp.json()['data']['import_id']

    def get_citizens(self, import_id):
        url = url_for(CitizensView.URL_PATH, import_id=import_id)
        self.request('GET', url, HTTPStatus.OK,
                     name='/imports/{import_id}/citizens')

    def update_citizen(self, import_id):
        url = url_for(CitizenView.URL_PATH, import_id=import_id, citizen_id=1)
        self.request('PATCH', url, HTTPStatus.OK,
                     name='/imports/{import_id}/citizens/{citizen_id}',
                     json={'relatives': [i for i in range(3, 10)]})

    def get_birthdays(self, import_id):
        url = url_for(CitizenBirthdaysView.URL_PATH, import_id=import_id)
        self.request('GET', url, HTTPStatus.OK,
                     name='/imports/{import_id}/citizens/birthdays')

    def get_town_stats(self, import_id):
        url = url_for(TownAgeStatView.URL_PATH, import_id=import_id)
        self.request('GET', url, HTTPStatus.OK,
                     name='/imports/{import_id}/towns/stat/percentile/age')

    @task
    def workflow(self):
        self.round += 1
        dataset = self.make_dataset()

        import_id = self.create_import(dataset)
        self.get_citizens(import_id)
        self.update_citizen(import_id)
        self.get_birthdays(import_id)
        self.get_town_stats(import_id)


class WebsiteUser(HttpLocust):
    task_set = AnalyzerTaskSet
    wait_time = constant(1)

Выполнив 100 итераций c максимальными выгрузками, я убедился, что время работы всех обработчиков укладывается в ограничения:



Как видно на графике распределения времени ответов обработчиков, скорость обработки запросов почти не деградирует с ростом количества данных (желтый — 95 перцентиль, зеленый — медиана). Даже со ста выгрузками сервис будет работать эффективно.



На графиках потребления ресурсов виден всплеск — установка приложения с помощью Ansible и далее ровное потребление ресурсов с ~20.15 до ~20.30 под нагрузкой от Locust.


Что еще можно сделать?


Профилирование приложения показало, что около четверти всего времени выполнения запросов уходит на сериализацию и десериализацию JSON: данных, отправляемых и получаемых из сервиса, достаточно много. Эти процессы можно существенно ускорить с помощью библиотеки orjson, но сервис придется немного подготовить — orjson не является drop-in-заменой для стандартного модуля json

Обычно для продакшена требуется несколько копий сервиса, чтобы обеспечить отказоустойчивость и справиться с нагрузкой. Для управления группой сервисов нужен инструмент, показывающий, «жива» ли копия сервиса. Решить эту задачу можно обработчиком /health, который опрашивает все требуемые для работы ресурсы, в нашем случае — базу данных. Если SELECT 1 выполняется меньше чем за секунду, то сервис жив. Если нет — нужно обратить на него внимание.

Когда приложение очень интенсивно работает с сетью, uvloop может здорово увеличить производительность.

Немаловажным фактором является и читабельность кода. Один мой коллега, Юрий Шиканов, написал объединяющий несколько инструментов модуль gray для автоматической проверки и оформления кода, который легко добавить в pre-commit Git-хук, настроить одним файлом конфигурации или переменными окружения. Gray позволяет сортировать импорты (isort), оптимизирует выражения python в соответствии с новыми версиями языка (pyupgrade), добавляет запятые в конце вызовов функций, импортов, списков и т. д. (add-trailing-comma), а также приводит кавычки к единому виду (unify).

* * *


На этом у меня все: мы разработали, покрыли тестами, собрали и развернули сервис, а также провели нагрузочное тестирование.

Благодарности


Я хотел бы выразить огромную благодарность ребятам, которые нашли время принять участие в написании этой статьи, поревьювить код, внести свои идеи и замечания: Марии Зеленовой zelma, Владимиру Соломатину leenr, Анастасии Семёновой morkov, Юрию Шиканову dizballanze, Михаилу Шушпанову mishush, Павлу Мосеину pavkazzz и особенно Дмитрию Орлову orlovdl.
Яндекс
Как мы делаем Яндекс

Comments 50

    +1
    недавно подсказали либку для генерации данных в тестах
    factoryboy.readthedocs.io
    может как писать в бд, так и сырые данные генерировать
      0
      Мы когда-то mimesis использовали
        0

        разве mimesis это не просто генератор сырых данных? тут речь о том, что фабрика умеет работать с ORM моделями той же алхимии, а что использовать для данных (mimesis или faker) уже на усмотрение разработчика.

          0
          Судя по их примерам там только sqlalchemy.orm, так что для асинхронных библиотек (aiopg/asyncpg) придется самому писать, но благо там не ракеты запускать.
            +2

            так это же тесты, можно и синхронно пописать в бд

              0
              Можно писать синхронно, а можно и свои фабрики сделать, что не очень сложно, зато тесты подготавливать потом гораздо проще.
                0
                Тут нет «правильного» решения, тут нужно от задачи исходить.
          0
          Генерация данных для тестов не всегда хорошее решение
            0
            Там можно необходимые значения прописывать, зато не ломаешь голову над неважными или просто рыбой.
          +1
          Монументальный труд, спасибо :) Кстати, судя по бенчмаркам, обычный синхронный код в языках без проблемы GIL (PHP / Go) работает на подобных задачах не хуже (производительность + латенси), чем завернутый в асинхронные библиотеки Python (или тот же NodeJS).
            +2
            От задачи зависит, если нужно стримить что-то что не кончается, то для php ответ — скорее всего нет. Для Golang чаще всего — да.

            «Асинхронные библиотеки» по сути же просто очень вкусный «синтаксический сахар» над epoll/kqueue/select.
              0

              В Go всё-таки далеко не "обычный синхронный код", скорее наоборот там всё асинхронное.

                0
                Именно код по умолчанию синхронный, а библиотеки (в том числе веб-серверы) чаще асинхронные, да.
                  0
                  Сам код — да, но при этом он весь выполняется файберами. В Go очень любопытная реализация неявной кооперативной многозадачности :)
                    0

                    Подумал, что дискуссия здесь скорее терминологическая, действительно, правильно было бы сказать что весь код на Go не асинхронный, а конкурентный.

                  0

                  Перед python ставите nginx и почти не имеете проблем :))

                    0
                    Мне вот интересно а как поможет nginx напротив prefork модели? Воркеры смогут обрабатывать не один запрос за раз?
                      –1

                      у воркера есть три фазы работы


                      • получение запроса
                      • обработка запроса (полезная работа собственно)
                      • отправка результата

                      Все бы ничего но окружающий мир не идеальный. И первая и последняя фаза на медленных клиентах может быть в несколько раз больше чем обработка запроса. Когда ставим nginx перед воркером, то первая и последняя фаза приближаются к идеалу. В итоге если к примеру получение и отправка запроса требовали 400 миллисекунд, а обработка запроса 200 миллисекунд, можно легко получить увеличение производительности на 100%.

                  +9

                  Большое чтиво, все разжевано, но внимание уделили только самому приложению (и остались вопросы), а про остальные части жизненного цикла сказано мало. Я бы добавил еще несколько тем:


                  1. Идемпотентность. Что будет, если один и тот же файл импорта загрузится несколько раз?
                  2. Что будет с данными из 2 файлов импорта одновременно?
                  3. Стоит ли обрабатывать импорты отдельными воркерами, которые можно горизонтально масштабировать?
                  4. Тротлинг, кеш.
                  5. Что будет делать CI при изменении версий зависимостей? Может лучше делать сборку через werf?
                  6. При человеческой структуре и обвязке проекта (CI, тесты, semver, env-переменные) совсем не сказали про 12 factor app.
                  7. В статье уделено время масштабированию, асинхронной обработке данных, но забыли рассказать о том, что постгрес плодит 1 процесс на 1 соединение. Было бы круто и про pg_bouncer упомянуть.
                  8. Написали про health check и про несколько экземпляров для горизонтального масштабирования, а потом бац — деплой докер композом через анзибл. Как узнать, что деплой прошел успешно? Как откатиться?
                  9. Очень мало написано про инфраструктуру. Сделали сервис, а что с ним дальше? Есть ли какой-то балансировщик перед ним? Что делать с zero-downtime deploy?
                    +3

                    Сейчас более продвинутый стек — fastapi + pydantic. Преимущества перед aiohttp — наличие переиспользуемых middleware, совместимость с ASGI, возможность использовать uvloop без каких-либо изменений в коде.

                      +1
                      Да, но как только понадобится HTTP клиент придется поставить aiohttp ;-)
                        +1
                          +2
                          Можно и httpx, но он над httpcore, который над h11 (http/1.1) и h2 (http/2), оба на питоне написаны, и перформансом не блещут. В aiohttp и для сервера и для клиента используется написанный на Cython парсер HTTP протокола, и только как фолбек, реализация на python.

                          Можно померяться бенчмарками, разумеется, но это все выглядит как вкусовщина.
                      0

                      Сериализация не через Marshmallow сделана? Окей что будет если я положу нативный UUID в PostgreSQL? :)


                      Просто я проверял несколько сериализаторов у всех кроме Marshmallow с этим были проблемы. А размещать uuid строкой в PostgreSQL при отличной нативной поддержке не хочется.

                        +1
                        Marshmallow я использовал исключительно для валидации, сериализацию UUID asyncpg умеет из коробки. Кстати, ему можно указать как сериализовать/десериализовать произвольные типы.

                        Пример
                        import asyncio
                        import logging
                        import uuid
                        
                        import asyncpg
                        
                        
                        async def main():
                            logging.basicConfig(level=logging.INFO)
                            
                            conn = await asyncpg.connect('postgresql://user:hackme@localhost/example')
                            await conn.execute(
                                'CREATE TABLE IF NOT EXISTS examples '
                                '(example_id serial primary key, uuid UUID)'
                            )
                        
                            value = uuid.uuid4()
                            result = await conn.fetchrow(
                                'INSERT INTO examples (uuid) VALUES ($1) RETURNING examples.*', 
                                value
                            )
                            assert isinstance(result['uuid'], uuid.UUID)
                            assert value == result['uuid']
                        
                        
                        asyncio.run(main())
                        

                          0

                          Я про сериализацию. В каком месте драйвер asyncpg делает сериализацию uuid в json?

                            0
                            Ага, думал вы другую задачу решаете. Сериализовать объект с произвольными типами в json можно силами asyncpg методом Connection.set_type_codec. Ему можно указать произвольный сериализатор, например json.dumps с параметром default из stdlib.

                            Пример
                            import asyncio
                            import json
                            import logging
                            import uuid
                            from functools import partial
                            
                            import asyncpg
                            
                            
                            def convert(value):
                                if isinstance(value, uuid.UUID):
                                    return str(value)
                                raise NotImplementedError
                            
                            
                            async def main():
                                logging.basicConfig(level=logging.INFO)
                            
                                conn = await asyncpg.connect('postgresql://user:hackme@localhost/example')
                                await conn.set_type_codec(
                                    'json',
                                    encoder=partial(json.dumps, default=convert),
                                    decoder=json.loads,
                                    schema='pg_catalog'
                                )
                                await conn.execute(
                                    'CREATE TABLE IF NOT EXISTS examples '
                                    '(example_id serial primary key, data JSON)'
                                )
                            
                                uuid_value = uuid.uuid4()
                                result = await conn.fetchrow(
                                    'INSERT INTO examples (data) VALUES ($1) RETURNING examples.*',
                                    {'uuid': uuid_value}
                                )
                            
                                # Не сработает, т.к. uuid десериализуется в виде строки.
                                assert isinstance(result['data']['uuid'], uuid.UUID)
                            


                            Вот десериализовать json-поле обратно в желаемые объекты без дополнительного инструмента уже просто так не получится — тут, безусловно, Marshmallow будет очень кстати.

                            Но тогда возникает вопрос: вы хотите хранить UUID как нативный тип PostgreSQL, или использовать тип данных JSON, который будет хранить UUID в виде строки?
                              0
                              Но тогда возникает вопрос: вы хотите хранить UUID как нативный тип PostgreSQL, или использовать тип данных JSON, который будет хранить UUID в виде строки?

                              У вас backend. Он внезапно в наружу отдает json и отдает его из базы. В PostgreSQL уже давно можно использовать uuid в качестве ключей. Что кстати весьма удобно при общении с внешним миром.


                              json dumps как раз и обламывается в этом случае :) Ну или включаем наши руки не для скуки и дописываем. Из коробки с этой задачей нормально справляется Marshmallow плюс он весьма хорошо интегрируется с SQLAlchemy что позволяет писать весьма компактный код.

                                0
                                Для сериализации данных для клиента Marshmallow полезен, если нужно из одной структуры в базе сделать совсем другую структуру и когда где-то необходимо описать этот маппинг.

                                В противном случае json.dumps вполне справится, параметром default можно сериализовать любой, в том числе произвольный объект и описать эти правила сериализации в одном месте.

                                В PostgreSQL уже давно можно использовать uuid в качестве ключей

                                Если в качестве ключа использовать случайный uuid то индекс будет перестраиваться на каждый запрос на вставку. Это тяжело.
                                  0
                                  Для сериализации данных для клиента Marshmallow полезен, если нужно из одной структуры в базе сделать совсем другую структуру и когда где-то необходимо описать этот маппинг.

                                  Пишутся схемы ожидаемых полей и все. Декларативно и хорошо, вполне прозрачно.


                                  В противном случае json.dumps вполне справится, параметром default можно сериализовать любой, в том числе произвольный объект и описать эти правила сериализации в одном месте.

                                  По мне лучше декларативно схему описать. Это будет еще гарантировать, что в наружу лишнее не уйдет. Лишние данные просто отсекаются. В случае json.dumps это не произойдет.


                                  Если в качестве ключа использовать случайный uuid то индекс будет перестраиваться на каждый запрос на вставку. Это тяжело.

                                  Надо просто использовать функцию uuid_generate_v1mc() она имеет корелляцию к timestamp. Что позволяет это уменьшить.

                                    0
                                    Пишутся схемы ожидаемых полей и все. Декларативно и хорошо, вполне прозрачно.

                                    Пожалуй, сильно зависит от задачи. Если задача — отдать клиенту данные из базы без каких-либо изменений за исключением ряда столбцов — я бы не стал запрашивать их у базы данных — зачем попусту гонять данные по сети?

                                    Гарантировать, что в наружу лишнее не уйдет конечно можно и нужно схемами, но например в тестах. Это можно проверить 1 раз на CI.

                                    Надо просто использовать функцию uuid_generate_v1mc() она имеет корелляцию к timestamp. Что позволяет это уменьшить.

                                    Не понимаю только зачем. Не проще ли использовать первичным ключом int (оно очевидно будет быстрее и проще для постгрес — 4 байта против 16) и иметь дополнительное поле uuid4 с уникальным индексом, которое отдавать наружу клиентам, если вы хотите избежать перебора сущностей по id?
                                      0
                                      Если задача — отдать клиенту данные из базы без каких-либо изменений за исключением ряда столбцов — я бы не стал запрашивать их у базы данных — зачем попусту гонять данные по сети?

                                      Как правило у вас задача стоит как сделайте CRUD по БД. И там вот такие вещи сильно ускоряют разработку и более прозрачны для других.


                                      Не проще ли использовать первичным ключом int (оно очевидно будет быстрее и проще для постгрес — 4 байта против 16)

                                      Для начала стоит использовать bigint. Это конечно не очевидно, но с переполнением int я уже сталкивался.


                                      и иметь дополнительное поле uuid4 с уникальным индексом, которое отдавать наружу клиентам

                                      И в этом случае вместо одного поля в 16 байт и одного индекса у меня будет 2 поля в сумме дающие 20 байт, а если все же брать bigint 24 байта и два индекса. И собственно зачем?

                                        +1
                                        Для начала стоит использовать bigint. Это конечно не очевидно, но с переполнением int я уже сталкивался.
                                        Зависит от задачи. В постоянно растущих таблицах в этом конечно есть смысл.
                                        И в этом случае вместо одного поля в 16 байт и одного индекса у меня будет 2 поля в сумме дающие 20 байт, а если все же брать bigint 24 байта и два индекса. И собственно зачем?
                                        Cкорость выполнение запросов (и как следствие — скорость работы сервиса) очень важна, в то время как диск почти ничего не стоит.

                                        Предлагаемый вами вариант на вставке медленнее на ~20%
                                        Для теста создал две таблицы, в которые вставлял 3 миллиона записей частями по 10 тысяч записей за раз:
                                        CREATE TABLE citizens_int_uuid4 (
                                            citizen_id bigint primary key, 
                                            name text,
                                            external_id uuid DEFAULT uuid_generate_v4() UNIQUE
                                        );
                                        
                                        CREATE TABLE citizens_uuid1 (
                                            citizen_id uuid primary key DEFAULT uuid_generate_v1mc(),
                                            name text
                                        );
                                        

                                        Вариант с uuid1, который вы предлагаете, выполнился за 2 мин 27 сек. Вариант с uuid4 — за 2 мин.

                                        Также попробовал что вариант с uuid4 как с bigint так и с int. Интересно, что разница получилась всего на 2-4 секунды.

                                        Уверен, что другие операции (например, JOIN) тоже будут работать медленее, вопрос только насколько.
                                          0
                                          Зависит от задачи. В постоянно растущих таблицах в этом конечно есть смысл.

                                          Сейчас это стоит делать всегда. Использовать сейчас 32 битный инт странное занятие.


                                          Вариант с uuid1, который вы предлагаете, выполнился за 2 мин 27 сек. Вариант с uuid4 — за 2 мин.

                                          А теперь сходите посмотрите что там с местом на диске и размер индексов. Смысл использования uuid1 был в том чтобы uuid были более равномерные.


                                          Также попробовал что вариант с uuid4 как с bigint так и с int. Интересно, что разница получилась всего на 2-4 секунды.

                                          Что на самом деле говорит нам, что можно использовать все что угодно.


                                          Ну и да я просто напомню основной профиль нагрузки у СУБД это все же не только запись, но и чтение. А вот тут размер индексов и размер записи начинают влиять.

                        0
                        Целое работоспособное приложение и не реализовано ни одного алгоритма? Яндекс, ты ли это?
                          +1
                          Это задание являлось вторым этапом отбора. Первым этапом был контест с алгоритмическими задачами.
                          +1
                          alvassin По тексту не понятно, вы различаете или нет понятие модуля и пакета в python. И немного смутил момент с папочкой utils. Как вы следите чтобы все не писали одни и те-же utils в разных сервисах? Часто в utils пишут что-то, что не ясно куда положить. Как-то боретесь с этим в Яндекс?
                            +3
                            rmsbeetle, согласно документации «модуль» — файл с инструкциями Python, а «пакет» — способ организации пространства имен модулей (синтаксически — с помощью точек). Однако в ней же слово «модуль» иногда трактуется более широко: например, технически asyncio — пакет (неймспейс с несколькими модулями), но иногда называется модулем.
                            Под пакетом я имел в виду единицу дистрибуции, а под модулем разные вещи, в зависимости от контекста. Я исправлю терминологию, чтобы не вводить никого в заблужение. Спасибо за замечание.

                            На второй вопрос я, пожалуй, не смогу ответить за весь Яндекс, так как в компании очень много команд, и у всех немного по-разному.
                            В Едадиле — это прозвучит очень банально — мы стараемся как можно больше делиться опытом друг с другом и общаться с коллегами, смотреть PR друг друга (посмотреть PR другой команды — скорее правило, а не исключение).
                            Если кто-то видит, что он уже решал похожую проблему — скорее всего появится хорошо решающая эту проблему библиотека, покрытая тестами. Например, в свое время так появился aiomisc, aiomisc-dependency, snakepacker, wsrpc и другие.
                              +1
                              ОГРОМНОЕ СПАСИБО! Я В ЖИЗНИ СТАТЬИ ЛУЧШЕ НЕ ВИДЕЛ!
                                0
                                alvassin Спасибо за отличную статью!
                                Скажите, почему бы не использовать poetry? И setup.py не нужен, и зависимости в одном файле. Отказ от него здесь — просто вкусовщина, или есть какие-то противопоказания?
                                  0
                                  Существует два стандарта: зрелый setup.py и новый, описанный в PEP 518 и PEP 517.

                                  PEP 518 описывает возможность указывать требования (зависимости) к системе сборки с помощью файла pyproject.toml и находится в статусе final. Это означает, что он больше не будет изменяться.

                                  PEP 517 описывает интерфейсы для независимых от distutils и/или setuptools систем сборки (которые использует poetry и другие альтернативные инструменты), и находится в статусе provisional. Авторы PEP в данный момент собирают фидбек сообщества и PEP еще может измениться или его вообще могут не принять в существующем виде. Поэтому на мой взгляд рекомендовать его пока рано — не хотелось бы потом переписывать все проекты.

                                  Вообще PEP 517 и poetry выглядят здорово, лично мне они нравятся. К слову, aiohttp уже вместе с setup.py использует pyproject.toml.
                                  0
                                  Большое спасибо за статью!
                                  Принимал участие в конкурсе (не прошел, не успел задеплоиться).

                                  Подскажите, пожалуйста, по следующим вопросам:
                                  1. Почему не прикрутили NGINX? Не будет ли standalone сервер aiohttp медленее, чем с NGINX?
                                  2. Почему citizens не разбиты на отдельные таблицы для каждого импорта? Намек на это также был в вашем FAQ видео. Еще это позволило бы лочиться только по citizens.id, а не по всей выгрузке (import_id).
                                  3. Не рассматривался ли вариант получения (от клиента) выгрузки по частям? Возможно ли это сделать средствами aiohttp/python? Например, через request.content: docs.aiohttp.org/en/stable/streams.html или это не поможет читать данные кусками?
                                  4. Не очень понял используется ли connection pool. В коде, при старте, приложение коннектится к postgres, но не очень понятен размер пула (или коннекшн один?).
                                  5. Я c экосистемой python не очень хорошо знаком (в последние пару лет сижу на java), немного смутило, что нет разделения на слои («луковой» архитектуры).
                                  Транспортный слой (контроллеры, у вас это handlers) перемешан со слоем бизнес логики и со слоем БД. Можно было бы как-нибудь разделить, часть вынести в сервисы. Кмк, это уменьшило бы связанность (если говорить о проекте, который будет со временем расти).
                                  6. Так же не используется DI. Для python есть библиотека injector, но создается впечатление что в экосистеме python'a так не принято :)
                                  Кстати, в Яндексе где-нибудь используется injector или своя DI-библиотека для python'а?

                                  В целом, статья очень поучительная, многие вещи не знал :)
                                  Спасибо!
                                    +1
                                    1. Почему не прикрутили NGINX? Не будет ли standalone сервер aiohttp медленее, чем с NGINX?
                                    Андрей Светлов рекомендует использовать nginx c aiohttp по следующим причинам:

                                    • Nginx может предовратить множество атак на основе некорректных запросов HTTP, отклонять запросы, методы которых не поддерживаются приложением, запросы со слишком большим body.
                                      Это может снять какую-то нагрузку с приложения в production, но выходит за рамки данного задания.
                                    • Nginx здорово раздает статику.
                                      В нашем приложении нет статических файлов.
                                    • Nginx позволяет распределить нагрузку по нескольким экземплярам приложения слушающих разные сокеты (upstream module), чтобы загрузить все ядра одного сервера.
                                      На мой взгляд проще и эффективнее обслуживать запросы в одном приложении на 1 сокете несколькими форками, чем устанавливать и настраивать для этого отдельное приложение. Я рассказывал как это сделать в разделе «Приложение».
                                    • Добавлю от себя: nginx может играть роль балансера, распределяя нагрузку по разным физическим серверам (виртуалкам).
                                      По условиям задания, у нас есть только один сервер, где мы можем развернуть приложение, поэтому это тоже выходит за рамки нашей задачи.

                                    Как вы видите, именно для этого приложения nginx не очень полезен. Но в других случаях (описанных выше), он может очень здорово увеличить эффективность вашего сервиса.

                                    2. Почему citizens не разбиты на отдельные таблицы для каждого импорта? Намек на это также был в вашем FAQ видео. Еще это позволило бы лочиться только по citizens.id, а не по всей выгрузке (import_id).
                                    Для этой задачи большой разницы нет, можно создавать и отдельные таблицы. Минусы этого подхода: если потребуется реализовать обработчик, возвращающий всех жителей всех выгрузок придется делать очень много UNION-ов, а также при создании таблиц с динамическими названиями их придется экранировать вручную, т.к. PostgreSQL не позволяет использовать аргументы в DDL.

                                    3. Не рассматривался ли вариант получения (от клиента) выгрузки по частям? Возможно ли это сделать средствами aiohttp/python? Например, через request.content: docs.aiohttp.org/en/stable/streams.html или это не поможет читать данные кусками?
                                    Получать и обрабатывать json запрос по частям cилами aiohttp можно
                                    from http import HTTPStatus
                                    
                                    import ijson
                                    from aiohttp.web_response import Response
                                    
                                    from .base import BaseView
                                    
                                    
                                    class ImportsView(BaseView):
                                        URL_PATH = '/imports'
                                    
                                        async def post(self):
                                            async for citizen in ijson.items_async(
                                                self.request.content,
                                                'citizens.item',
                                                buf_size=4096
                                            ):
                                                # Обработка жителя
                                                print(citizen)
                                            return Response(status=HTTPStatus.CREATED)

                                    Сейчас валидация входных данных в обработчике POST /imports происходит двумя Marshmallow схемами: CitizenSchema (проверяет конкретного жителя) ImportSchema (проверяет связи между жителями и уникальность citizen_id) и только затем пишется в базу.

                                    В случае обработки жителей по одному, вы можете проверить только текущего жителя. Чтобы проверить уникальность его citizen_id и его родственные связи в рамках выгрузки придется так или иначе накапливать информацию о уже обработанных жителях выгрузки и откатывать транзакцию, если вы встретите некорректные данные. Можно хранить для каждого жителя только поля citizen_id и relatives, это сэкономит память, но код будет сложнее, поэтому в этом приложении от этого варианта я отказался.

                                    4. Не очень понял используется ли connection pool. В коде, при старте, приложение коннектится к postgres, но не очень понятен размер пула (или коннекшн один?).
                                    Используется пул с 10 подключениями (размер по умолчанию). При запуске aiohttp приложения cleanup_ctx вызывает генератор setup_pg, который в свою очередь создает объект asyncpgsa.PG (с пулом соединений) и сохраняет его в app['pg']. Базовый обработчик, для удобства предоставляет объект app['pg'] в виде свойства pg. Кол-во подключений, кстати, можно вынести аргументом ConfigArgParse — настраивать приложение будет очень удобно.

                                    5. Я c экосистемой python не очень хорошо знаком (в последние пару лет сижу на java), немного смутило, что нет разделения на слои («луковой» архитектуры).
                                    Транспортный слой (контроллеры, у вас это handlers) перемешан со слоем бизнес логики и со слоем БД. Можно было бы как-нибудь разделить, часть вынести в сервисы. Кмк, это уменьшило бы связанность (если говорить о проекте, который будет со временем расти).
                                    У нас микросервисы, поэтому на мой взгляд нужно руководствоваться правилом KISS. Если что — можно быстро написать новый и выкинуть старый.

                                    6. Так же не используется DI. Для python есть библиотека injector, но создается впечатление что в экосистеме python'a так не принято :) Кстати, в Яндексе где-нибудь используется injector или своя DI-библиотека для python'а?
                                    Не хотелось переусложнять это приложение. В Едадиле мы часто пользуемся модулем aiomisc-dependency. Он здорово выручает, когда в одной программе живет несколько сервисов (REST API, почтовый сервис, еще что-нибудь) и им требуется раздлеляемый пул ресурсов (например, один connection pool asyncpg), хотя этот модуль вполне можно использовать и для создания любых других объектов. Что касается Яндекса — не могу сказать, разработчиков очень много и у всех есть свои любимые инструменты.
                                      0
                                      Спасибо за подробные ответы!
                                    +1
                                    Для валидации входящих запросов и генерации сваггер доков можно еще можно использовать вот эту библиотеку github.com/hh-h/aiohttp-swagger3
                                      0
                                      С удовольствием прослушал курс бэк-энд разработчика, попал на эту публикацию и решил собрать сервис.
                                      1. make postgres
                                      2. make docker
                                      3. docker run -it \
                                      -e ANALYZER_PG_URL=postgresql://user:hackme@localhost/analyzer \
                                      alvassin/backendschool2019 analyzer-db upgrade head
                                      — и это пока последняя команда:
                                      Потому что результаты:
                                      Ошибка присоединения к Postgres
                                      Traceback (most recent call last):
                                        File "/usr/share/python3/app/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 2285, in _wrap_pool_connect
                                          return fn()
                                        File "/usr/share/python3/app/lib/python3.8/site-packages/sqlalchemy/pool/base.py", line 303, in unique_connection
                                          return _ConnectionFairy._checkout(self)
                                        File "/usr/share/python3/app/lib/python3.8/site-packages/sqlalchemy/pool/base.py", line 773, in _checkout
                                          fairy = _ConnectionRecord.checkout(pool)
                                        File "/usr/share/python3/app/lib/python3.8/site-packages/sqlalchemy/pool/base.py", line 492, in checkout
                                          rec = pool._do_get()
                                        File "/usr/share/python3/app/lib/python3.8/site-packages/sqlalchemy/pool/impl.py", line 238, in _do_get
                                          return self._create_connection()
                                        File "/usr/share/python3/app/lib/python3.8/site-packages/sqlalchemy/pool/base.py", line 308, in _create_connection
                                          return _ConnectionRecord(self)
                                        File "/usr/share/python3/app/lib/python3.8/site-packages/sqlalchemy/pool/base.py", line 437, in __init__
                                          self.__connect(first_connect_check=True)
                                        File "/usr/share/python3/app/lib/python3.8/site-packages/sqlalchemy/pool/base.py", line 657, in __connect
                                          pool.logger.debug("Error on connect(): %s", e)
                                        File "/usr/share/python3/app/lib/python3.8/site-packages/sqlalchemy/util/langhelpers.py", line 68, in __exit__
                                          compat.raise_(
                                        File "/usr/share/python3/app/lib/python3.8/site-packages/sqlalchemy/util/compat.py", line 178, in raise_
                                          raise exception
                                        File "/usr/share/python3/app/lib/python3.8/site-packages/sqlalchemy/pool/base.py", line 652, in __connect
                                          connection = pool._invoke_creator(self)
                                        File "/usr/share/python3/app/lib/python3.8/site-packages/sqlalchemy/engine/strategies.py", line 114, in connect
                                          return dialect.connect(*cargs, **cparams)
                                        File "/usr/share/python3/app/lib/python3.8/site-packages/sqlalchemy/engine/default.py", line 488, in connect
                                          return self.dbapi.connect(*cargs, **cparams)
                                        File "/usr/share/python3/app/lib/python3.8/site-packages/psycopg2/__init__.py", line 126, in connect
                                          conn = _connect(dsn, connection_factory=connection_factory, **kwasync)
                                      psycopg2.OperationalError: could not connect to server: Connection refused
                                              Is the server running on host "localhost" (127.0.0.1) and accepting
                                              TCP/IP connections on port 5432?
                                      could not connect to server: Cannot assign requested address
                                              Is the server running on host "localhost" (::1) and accepting
                                              TCP/IP connections on port 5432?
                                      
                                      
                                      The above exception was the direct cause of the following exception:
                                      
                                      Traceback (most recent call last):
                                        File "/usr/local/bin/analyzer-db", line 11, in <module>
                                          load_entry_point('analyzer==0.0.1', 'console_scripts', 'analyzer-db')()
                                        File "/usr/share/python3/app/lib/python3.8/site-packages/analyzer/db/__main__.py", line 32, in main
                                          exit(alembic.run_cmd(config, options))
                                        File "/usr/share/python3/app/lib/python3.8/site-packages/alembic/config.py", line 546, in run_cmd
                                          fn(
                                        File "/usr/share/python3/app/lib/python3.8/site-packages/alembic/command.py", line 298, in upgrade
                                          script.run_env()
                                        File "/usr/share/python3/app/lib/python3.8/site-packages/alembic/script/base.py", line 489, in run_env
                                          util.load_python_file(self.dir, "env.py")
                                        File "/usr/share/python3/app/lib/python3.8/site-packages/alembic/util/pyfiles.py", line 98, in load_python_file
                                          module = load_module_py(module_id, path)
                                        File "/usr/share/python3/app/lib/python3.8/site-packages/alembic/util/compat.py", line 173, in load_module_py
                                          spec.loader.exec_module(module)
                                        File "<frozen importlib._bootstrap_external>", line 783, in exec_module
                                        File "<frozen importlib._bootstrap>", line 219, in _call_with_frames_removed
                                        File "/usr/share/python3/app/lib/python3.8/site-packages/analyzer/db/alembic/env.py", line 79, in <module>
                                          run_migrations_online()
                                        File "/usr/share/python3/app/lib/python3.8/site-packages/analyzer/db/alembic/env.py", line 67, in run_migrations_online
                                          with connectable.connect() as connection:
                                        File "/usr/share/python3/app/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 2218, in connect
                                          return self._connection_cls(self, **kwargs)
                                        File "/usr/share/python3/app/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 103, in __init__
                                          else engine.raw_connection()
                                        File "/usr/share/python3/app/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 2317, in raw_connection
                                          return self._wrap_pool_connect(
                                        File "/usr/share/python3/app/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 2288, in _wrap_pool_connect
                                          Connection._handle_dbapi_exception_noconnection(
                                        File "/usr/share/python3/app/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 1554, in _handle_dbapi_exception_noconnection
                                          util.raise_(
                                        File "/usr/share/python3/app/lib/python3.8/site-packages/sqlalchemy/util/compat.py", line 178, in raise_
                                          raise exception
                                        File "/usr/share/python3/app/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 2285, in _wrap_pool_connect
                                          return fn()
                                        File "/usr/share/python3/app/lib/python3.8/site-packages/sqlalchemy/pool/base.py", line 303, in unique_connection
                                          return _ConnectionFairy._checkout(self)
                                        File "/usr/share/python3/app/lib/python3.8/site-packages/sqlalchemy/pool/base.py", line 773, in _checkout
                                          fairy = _ConnectionRecord.checkout(pool)
                                        File "/usr/share/python3/app/lib/python3.8/site-packages/sqlalchemy/pool/base.py", line 492, in checkout
                                          rec = pool._do_get()
                                        File "/usr/share/python3/app/lib/python3.8/site-packages/sqlalchemy/pool/impl.py", line 238, in _do_get
                                          return self._create_connection()
                                        File "/usr/share/python3/app/lib/python3.8/site-packages/sqlalchemy/pool/base.py", line 308, in _create_connection
                                          return _ConnectionRecord(self)
                                        File "/usr/share/python3/app/lib/python3.8/site-packages/sqlalchemy/pool/base.py", line 437, in __init__
                                          self.__connect(first_connect_check=True)
                                        File "/usr/share/python3/app/lib/python3.8/site-packages/sqlalchemy/pool/base.py", line 657, in __connect
                                          pool.logger.debug("Error on connect(): %s", e)
                                        File "/usr/share/python3/app/lib/python3.8/site-packages/sqlalchemy/util/langhelpers.py", line 68, in __exit__
                                          compat.raise_(
                                        File "/usr/share/python3/app/lib/python3.8/site-packages/sqlalchemy/util/compat.py", line 178, in raise_
                                          raise exception
                                        File "/usr/share/python3/app/lib/python3.8/site-packages/sqlalchemy/pool/base.py", line 652, in __connect
                                          connection = pool._invoke_creator(self)
                                        File "/usr/share/python3/app/lib/python3.8/site-packages/sqlalchemy/engine/strategies.py", line 114, in connect
                                          return dialect.connect(*cargs, **cparams)
                                        File "/usr/share/python3/app/lib/python3.8/site-packages/sqlalchemy/engine/default.py", line 488, in connect
                                          return self.dbapi.connect(*cargs, **cparams)
                                        File "/usr/share/python3/app/lib/python3.8/site-packages/psycopg2/__init__.py", line 126, in connect
                                          conn = _connect(dsn, connection_factory=connection_factory, **kwasync)
                                      sqlalchemy.exc.OperationalError: (psycopg2.OperationalError) could not connect to server: Connection refused
                                              Is the server running on host "localhost" (127.0.0.1) and accepting
                                              TCP/IP connections on port 5432?
                                      could not connect to server: Cannot assign requested address
                                              Is the server running on host "localhost" (::1) and accepting
                                              TCP/IP connections on port 5432?
                                      
                                      (Background on this error at: http://sqlalche.me/e/e3q8)
                                      
                                      


                                      Посмотрел sudo netstat -tupln
                                       Proto Recv-Q Send-Q Local Address           Foreign Address         State       PID/Program name
                                      tcp        0      0 127.0.0.1:46624         0.0.0.0:*               LISTEN      6645/kited      
                                      tcp        0      0 127.0.0.1:46601         0.0.0.0:*               LISTEN      6929/code       
                                      tcp        0      0 192.168.122.1:53        0.0.0.0:*               LISTEN      5020/dnsmasq    
                                      tcp        0      0 10.75.69.1:53           0.0.0.0:*               LISTEN      3251/dnsmasq    
                                      tcp        0      0 127.0.1.1:53            0.0.0.0:*               LISTEN      2476/dnsmasq    
                                      tcp        0      0 127.0.0.1:631           0.0.0.0:*               LISTEN      16677/cupsd     
                                      tcp6       0      0 :::8081                 :::*                    LISTEN      4938/docker-proxy
                                      tcp6       0      0 fd42:c2ba:2d3b:518e::53 :::*                    LISTEN      3251/dnsmasq    
                                      tcp6       0      0 fe80::b8bb:caff:fe06:53 :::*                    LISTEN      3251/dnsmasq    
                                      tcp6       0      0 ::1:631                 :::*                    LISTEN      16677/cupsd     
                                      tcp6       0      0 :::5432                 :::*                    LISTEN      25651/docker-proxy
                                      ,,,
                                      


                                      Не видно tcp4 — но ведь все сделано, как Васин рассказал ;-)
                                      Оба контейра живы, docker run alvassin/backendschool2019 analyzer-db --help — работает
                                      Postgres-контейнер тоже
                                      Хоть это вне архитектуры приложения — но как-то потерял несколько часов — не могу понять, почему не устанавливается соединение.
                                      Спасибо заранее
                                        +1

                                        Пообщался в переписке с автором alvassin — если производить запуск 3 команды (на одном хосте) и дополнением --network host — все заработает:


                                        3. docker run -it  --network host \
                                        -e ANALYZER_PG_URL=postgresql://user:hackme@localhost/analyzer \
                                        alvassin/backendschool2019 analyzer-db upgrade head

                                        Аналогично для запуска приложения:


                                        4. docker run -it -p 8081:8081 --network host -e ANALYZER_PG_URL=postgresql://user:hackme@localhost/analyzer alvassin/backendschool2019

                                        5. http://0.0.0.0:8081 в браузере

                                        и видна swagger документация сервиса

                                          +1
                                          Что делает команда make postgres
                                          # Останавливает контейнер analyzer-postgres, если он запущен
                                          $ docker stop analyzer-postgres || true
                                          
                                          # Запускает контейнер с именем analyzer-postgres
                                          $ docker run --rm --detach --name=analyzer-postgres \
                                              --env POSTGRES_USER=user \
                                              --env POSTGRES_PASSWORD=hackme \
                                              --env POSTGRES_DB=analyzer \
                                              --publish 5432:5432 postgres

                                          У вас не получалось подключиться, потому что если при запуске контейнера сеть не указана явно, он создается в дефолтной bridge-сети. В такой сети контейнеры могут обращаться друг к другу (и к хосту) только по ip адресу. Кстати, Docker for Mac предоставляет специальное DNS имя host.docker.internal для обращений к хост-машине из контейнеров, что довольно удобно для локальной разработки.

                                          Аргумент --publish 5432:5432 указывает Docker добавить в iptables правило, по которому запросы к хост-машине на порт 5432 отправляются в контейнер с Postgres на порт 5432.

                                          Поэксперементируем с подключением к контейнеру с Postgres
                                          Подключение с хост-машины на published порт:
                                          $ telnet localhost 5432
                                          Trying ::1...
                                          Connected to localhost.
                                          Escape character is '^]'.
                                          

                                          Подключение из контейнера с приложением (по IP хост-машины на published-порт):
                                          # Получаем IP хост-машины
                                          $ docker run -it alvassin/backendschool2019 /bin/bash -c \
                                              'apt update && apt install -y iproute2 && ip route show | grep default'
                                          default via 172.17.0.1 dev eth0
                                          
                                          # Применяем миграции
                                          $ docker run -it \
                                              -e ANALYZER_PG_URL=postgresql://user:hackme@172.17.0.1/analyzer \
                                              alvassin/backendschool2019 \
                                              analyzer-db upgrade head
                                          

                                          Подключение из контейнера с приложением (по host.docker.internal, published порт нужен, потому что подключаемся через хост-машину):
                                          $ export HOST=host.docker.internal
                                          $ docker run -it \
                                              -e ANALYZER_PG_URL=postgresql://user:hackme@${HOST}/analyzer \
                                              alvassin/backendschool2019 \
                                              analyzer-db upgrade head
                                          

                                          Подключение из контейнера с приложением (по IP контейнера Postgres, published порт в этом случае не нужен):
                                          # Получаем IP
                                          $ docker inspect \
                                              -f '{{range .NetworkSettings.Networks}}{{.IPAddress}}{{end}}' \
                                              analyzer-postgres
                                          172.17.0.2
                                          
                                          $ docker run -it \
                                              -e ANALYZER_PG_URL=postgresql://user:hackme@172.17.0.2/analyzer \
                                              alvassin/backendschool2019 \
                                              analyzer-db upgrade head
                                          


                                          Если контейнер с приложением запускается в сети хост-машины (параметр --network host), он не получает отдельного ip адреса и его сетевой стек не изолируется.

                                          Поэтому такой контейнер имеет доступ к портам хост-машины через localhost.
                                          $ docker run -it \
                                              --network host \
                                              -e ANALYZER_PG_URL=postgresql://user:hackme@localhost/analyzer \
                                              alvassin/backendschool2019 \
                                              analyzer-db upgrade head
                                          


                                          Третий вариант подружить контейнеры — создать bridge сеть вручную и запустить в ней оба контейнера — и Postgres и контейнер с приложением. В отличие от дефолтной bridge-сети, в созданных пользователем bridge-сетях можно обращаться по имени контейнера.

                                          Это бы выглядело следующим образом
                                          # Создаем сеть
                                          $ docker network create analyzer-net
                                          
                                          # Запускаем контейнер с PostgreSQL в сети analyzer-net
                                          $ docker run --rm -d \
                                              --name=analyzer-postgres \
                                              --network analyzer-net \
                                              -e POSTGRES_USER=user \
                                              -e POSTGRES_PASSWORD=hackme \
                                              -e POSTGRES_DB=analyzer \
                                              -p 5432:5432 \
                                              postgres
                                          
                                          # Запускаем контейнер с приложением в сети analyzer-net,
                                          # обращаемся по имени к контейнеру с Postgres
                                          $ docker run -it --network analyzer-net \
                                              -e ANALYZER_PG_URL=postgresql://user:hackme@analyzer-postgres/analyzer \
                                              alvassin/backendschool2019 \
                                              analyzer-db upgrade head
                                          

                                        Only users with full accounts can post comments. Log in, please.