Немного фактов о python asyncio

Всем привет! Хотелось бы поделиться опытом использования python asyncio. За полтора года использования в продакшене накопился некоторый опыт, общие приемы, облегчающие жизнь. Естественно, были и грабли, о которых также стоит упомянуть, ибо это поможет сэкономить кучу времени тем, кто только начинает использовать в своих приложениях asyncio. Кому интересно — прошу под кат.

Немного истории


Asyncio появился в Python версии 3.4, в 3.5 был добавлен более приятный глазу async/await синтаксис. Asyncio предоставляет из коробки Event loop, Future, Task, Coroutine, I/O multiplexing, Synchronization primitives. Это, конечно, не мало, но для полноценной разработки недостаточно. Для этого есть сторонние библиотеки. Отличная подборка есть вот тут. У себя в компании мы используем asyncio вместе с набором сторонних библиотек для написания микросервисов. По своей природе наши сервисы больше ориентированы на I/O нежели на CPU, так что для нас asyncio отлично подходит.

Собственно факты


Это не учебник по asyncio. Я не буду объяснять, почему асинхронный ввод/вывод это хорошо, или почему бы не использовать потоки. Не будет рассказов о корутинах, генераторах, event loop'ах и т.д. Также тут не будет никаких бенчмарков и сравнений с другими языками. Поехали!

Debug

Во-первых, PYTHONASYNCIODEBUG. Это переменная окружения, которая включает дебаг режим. Например, можно увидеть сообщения о том, что вы объявили функцию как корутину, но вызываете как обычную функцию(актуально для python3.4). Также необходимо настроить asyncio logger на уровень дебаг и еще разрешить вывод ResourseWarning. Можно увидеть много интересного: сообщения о том, что вы забыли закрыть транспорт или сам event loop(читай — забыли освободить ресурсы). Сравните запуск следующего кода с параметром интерпретатора -Wdefault и переменной окружения PYTHONASYNCIODEBUG=1 и без них (здесь и далее в примерах кода я буду опускать некоторые несущественные части такие как import или обработка исключений):

@asyncio.coroutine
def test():
    pass

loop = asyncio.get_event_loop()
test()

Правильное завершение

Кстати об освобождении ресурсов. Event loop надо уметь правильно остановить, дождавшись корректного заверешения всех тасок, закрытия соединений и т.д. И если с использованием run_until_complete() особых проблем нет, то с run_forever() все немного сложнее. Метод close() у event loop'а можно вызвать, только если он уже остановлен — т.е. после метода stop(). Лучше всего это сделать с помощью сигналов:

def handler(loop):
    loop.remove_signal_handler(signal.SIGTERM)
    loop.stop()

loop = asyncio.get_event_loop()
loop.add_signal_handler(signal.SIGTERM, handler, loop)

try:
    loop.run_forever()
finally:
    loop.close()

Далее в примерах кода я все же буду концентрироваться на самой сути, а не на правильном завершении программы.

Запуск блокирующего кода

Естественно, не для всего есть асинхронные библиотеки. Некоторый код так и остается блокирующим, и его надо как-то запускать, чтобы он не блокировал наш event loop. Для этого есть хороший метод run_in_executor(), который запускает то, что вы ему передали в одном из потоков встроенного пула, не блокируя основной поток с event loop'ом. Все бы хорошо, но с этим есть 2 проблемы. Во-первых, размер стандартного пула всего 5. Во-вторых, в asyncio синхронный dns resolver, который запускается именно таким образом во встроенном пуле. Значит, за пул всего в 5 потоков будут конкурировать ваши синхронные операции, плюс все кому надо сделать getaddrinfo(). Выход — использовать свой пул. Всегда:

def blocking_function():
    time.sleep(42)

pool = ThreadPoolExecutor(max_workers=multiprocessing.cpu_count())
loop = asyncio.get_event_loop()
loop.run_in_executor(pool, blocking_function)
loop.close()

Коварные Future

У Future есть одна очень интересная особенность: если в ней произойдет исключение — вы об этом ничего не узнаете, если только явно не спросите об этом у самой future. В документации есть хороший пример на эту тему. Вы увидите, что было исключение, только когда gc будет удалять объект future. Отсюда следует простое правило — всегда проверяете результат вашей future. Даже если по вашей задумке код внутри future должен просто крутиться в бесконечном цикле, и, казалось бы, негде проверять результат — все равно надо обработать исключения, например так:

async def handle_exception():
    try:
        await bug()
    except Exception:
        print('TADA!')

async def bug():
    raise Exception()

loop = asyncio.get_event_loop()
loop.create_task(handle_exception())
loop.run_forever()
loop.close()

await и __init__()

Невозможно. Магический метод __init__() не может содержать асинхронный код. Есть два пути. Или сделать у класса еще один метод, например, initialize(), который уже будет корутиной. Он будет содержать весь асинхронный код для инициализации, и его надо будет вызывать после создания объекта. Выглядит ужасно. Поэтому принято использовать функции-фабрики. Поясню на примере:

class Foo:
    def __init__(self, reader, writer, loop, *args, **kwargs):
        self._reader = reader
        self._writer = writer
        self._loop = loop

async def create_foo(loop):
    reader, writer = await asyncio.open_connection('127.0.0.1', 8888, loop=loop)
    return Foo(reader, writer, loop)

loop = asyncio.get_event_loop()
foo = loop.run_until_complete(create_foo(loop))
print(foo)
loop.close()

Wake up, Neo

Скажем, у вас есть таска, которая крутится в event loop'е и периодически сбрасывает какой-нибудь буфер. Можно написать такой код:

async def flush_task():
    while True:
        # flushing...
        await asyncio.sleep(FLUSH_TIMEOUT)

Сделать create_task() — и все вроде бы хорошо, кроме одного: что делать, если по завершении вам необходимо принудительно сбросить содержимое буфера? Как заставить таску «проснусться»? Тут на помощь приходят примитивы синхронизации:

class Foo:

    def __init__(self, loop, *args, **kwargs):
        self._loop = loop
        self._waiter = asyncio.Event()
        self._flush_future = self._loop.create_task(self.flush_task())

    async def flush_task(self):
        while True:
            try:
                await asyncio.wait_for(self._waiter.wait(), timeout=FLUSH_TIMEOUT, loop=self._loop)
            except asyncio.TimeoutError:
                pass
            # flushing ...
            self._waiter.clear()

    def force_flush():
        self._waiter.set()

loop = asyncio.get_event_loop()
foo = Foo(loop)
loop.run_forever()
loop.close()

Тестирования

Тестировать асинхронный код можно и нужно. И делать это так же просто, как и в случае синхронного кода:

class TestCase(unittest.TestCase):

    def setUp(self):
        self.loop = asyncio.new_event_loop()
        asyncio.set_event_loop(None)

    def tearDown(self):
        self.loop.close()

    def test_001(self):
        async def func():
            self.assertEqual(42, 42)
        self.loop.run_until_complete(func())

Тесты отлично изолированы, т.к. в каждом новом тесте используется свой event loop. А можно пойти дальше и использовать pytest, где есть удобные декораторы.

Источники вдохновения


Прежде всего — личный опыт. Многое из перечисленного было осознано в результате «ловли граблей», а затем изучения документации и исходников asyncio. Также отличными примерами послужили исходники популярных библиотек, таких как aiohttp, aioredis, aiopg.

Спасибо всем, кто дочитал статью до конца. Удачи с asyncio!
Поделиться публикацией

Комментарии 18

    +1
    полтора года использования в продакшене

    Расскажите подробнее, немного информации об асинхронном питоне в продакшене.
      +1
      Облачный сервис с бэкэндом на Django. И сбоку около 10 микросервисов так или иначе использующий asyncio. Некоторые взаимодействуют с другими сервисами комнании по http, также много работы с s3, Redis, Kafka. Проект пока не назовешь очень уж большим, но мы растем. К сожалению, больше рассказать про внутренее устройство системы не могу, сами понимаете.
        +1
        Спасибо, этого достаточно.
          0
          А в сторону django channels не смотрели? Имеет ли смысл использовать?
        0

        Не понял ваш пример с "примитивами синхрнизации", не вижу места где force_flash вызывается.

          0
          Это просто пример реализации. Можно использовать где угодно — зависит от логики вашего приложения.
          0
          Asyncio появился в Pyhton версии 3.4
          Но можно было использовать с python 3.3 (как внешнюю либу)

          за пул всего в 5 потоков будут конкурировать ваши синхронные операции, плюс все кому надо сделать getaddrinfo(). Выход — использовать свой пул. Всегда:
          Так же можно использовать асинхронный http клиент + json dns api.
            +1
            Если бы dns был нужен только для интернета… И кстати: кто будет резолвить адрес dns.google.com, для того чтобы отправить туда запрос?
              0
              Лучше уж https://github.com/saghul/aiodns
                0
                Вопрос спорный. У aiodns свои проблемы, может не для всех они критичны, но все же. Раз Два Три
            • НЛО прилетело и опубликовало эту надпись здесь
                0
                Про микросервисы писал выше. Компания занимается видеоаналитикой.
                0

                В примере по пул потоков при запуске скобки после blocking_function не лишние?
                И, вроде бы, число потоков по умолчанию не 5, а по 5 на каждой процессор.


                Поправьте, если ошибаюсь.

                  +1
                  Спасибо, поправил. По поводу пула: в python 3.5.2 в asyncio.base_events еще есть константа _MAX_WORKERS=5. В upstrem как я виже это уже пофикшено, но свой пул актуален для всего что < 3.5.2
                  +4

                  Функции фабрики удобно делать @classmethod'ами


                  async def create_foo(loop):
                      reader, writer = await asyncio.open_connection('127.0.0.1', 8888, loop=loop)
                      return Foo(reader, writer, loop)

                  превращается в


                  @classmethod
                  async def create(cls, loop):
                      reader, writer = await asyncio.open_connection('127.0.0.1', 8888, loop=loop)
                      return cls(reader, writer, loop)

                  красивее и работает с наследованием

                    0
                    Да, такой метод есть. Тут уже дело вкуса: мне больше нравятся функции-фабрики. К тому же такой подход более распространен в сообществе.
                    0
                    Немного воскрешу тему вопросом: а можно ли сделать такую конструкцию:
                    async def __getattr__(self, name):
                        pass

                    И как ее вообще использовать можно?
                    Обсуждать целесообразность здесь не стоит, а вот необходимость может быть, например, когда мы хотим внутри __getattr__ выполнить вызов функции, объявленной как async.
                      0
                      С asyncio можно await'ить не async функции, и наоборот — внутри не async ф-ии вызывать с «ожиданием» async функцию. Так что даже если такая конструкция не компиллируется, то способ сделать задуманое есть.

                    Только полноправные пользователи могут оставлять комментарии. Войдите, пожалуйста.

                    Самое читаемое