Последние несколько лет async вообще и asyncio в частности в питоне все больше набирают популярность и их все чаще используют. При этом иногда забывают о принципе KISS (Keep it simple, stupid) и о том, какие вообще проблемы решает асинхронный код и зачем он нужен. В этой статье я бы хотел описать пример, когда задачу можно и, на мой взгляд, нужно решать без использования async. И вообще, практически без всего.

Рассмотрим задачу.

Для начала подключим к компьютеру (ноутбуку) одну вэб-камеру. Далее нам нужно с этой камеры раз в 10 секунд запрашивать кадр и сохранять его. Для простоты примера будем сохранять кадр на диск. Эта задача простая и решение тоже будет простым, даже примитивным я бы сказал.

import time
import cv2


def main():
   source = 0
   frame_file_path = '<path to a frame file>.jpg'
   cap = cv2.VideoCapture(source)

   while True:
       ret, frame = cap.read()

       if not ret:
           print('Failed to get a frame')
           break

       cv2.imwrite(frame_file_path, frame)
       time.sleep(10)

   cap.release()


if __name__ == '__main__':
   main()

Чем хорош данный скрипт? Тем, что он прост, понятен и надежен как лом и не требует никаких внешних инструментов типа cron-а.

Теперь немного масштабируем наш пример: добавим еще одну камеру и скажем, что с нее нам нужны кадры каждые 15 секунд. Если на проекте вы активно пользуетесь asyncio, то можете по инерции поступить следующим образом: сказать, что раз задача по существу I/O Bound, а камер стало больше, то это работа для asyncio. Затем взять какой-нибудь планировщик задач типа APScheduler, который поддерживает asyncio и с его помощью и помощью лома накидать что-то вроде такого.

import asyncio
from functools import partial

import cv2
from apscheduler.schedulers.asyncio import AsyncIOScheduler


async def job(source, target_file_path):
   cap = cv2.VideoCapture(source)

   ret, frame = await asyncio.get_event_loop().run_in_executor(None, partial(cap.read, cap))

   if not ret:
       print('Failed to get a frame')
       return

   await asyncio.get_event_loop().run_in_executor(None, partial(cv2.imwrite, target_file_path, frame))


def main():
   frame_0_file_path = ''
   frame_1_file_path = ''

   scheduler = AsyncIOScheduler()
   scheduler.add_job(job, 'interval', seconds=10, kwargs={'source': 0, 'target_file_path': frame_0_file_path})
   scheduler.add_job(job, 'interval', seconds=15, kwargs={'source': 1, 'target_file_path': frame_1_file_path})
   scheduler.start()

   try:
       asyncio.get_event_loop().run_forever()
   except (KeyboardInterrupt, SystemExit):
       pass


if __name__ == '__main__':
   main()

На первый взгляд все неплохо, но это на первый взгляд. Помимо дополнительной внешней зависимости мы собрали под капотом корутины, asyncio с ивент лупом и потоки для запуска синхронных функций асинхронным образом. И теперь вместо топорной и надежной программки у нас есть целый технический букет, с которым можно повозиться и от этого кайфануть. Но оно нам надо? Мне вот как инженеру не очень :) Если вспомнить, что async, ивент луп и вот это все призваны при написании I/O Bound приложений экономить ресурсы при большой нагрузке, а нагрузки у нас нет, да и камер будет конечное и вполне осмысливаемое число, то могут закрасться смутные сомнения. А нужен ли нам там asyncio тут вообще? И можно прийти к выводу, что не нужен и что вместо запуска асинхронных задач на ивент лупе, можно воспользоваться классом apscheduler.executors.pool.ProcessPoolExecutor, который позволит нам запускать наши джобы в процессах. Есть еще класс, который позволяет нам запускать джобы в потоках. Но сомнения у нас смутные, поэтому мы его тоже отбросим. Получится что-то вроде такого:

import cv2
from apscheduler.schedulers.blocking import BlockingScheduler


def job(source, target_file_path):
   cap = cv2.VideoCapture(source)

   ret, frame = cap.read()

   if not ret:
       print('Failed to get a frame')
       return

   cv2.imwrite(target_file_path, frame)


def main():
   frame_0_file_path = ''
   frame_1_file_path = ''

   scheduler = BlockingScheduler({
       'apscheduler.executors.default': {
           'class': 'apscheduler.executors.pool:ProcessPoolExecutor',
           'max_workers': 2
       }
   })
   scheduler.add_job(job, 'interval', seconds=10, kwargs={'source': 0, 'target_file_path': frame_0_file_path})
   scheduler.add_job(job, 'interval', seconds=15, kwargs={'source': 1, 'target_file_path': frame_1_file_path})

   scheduler.start()

  
if __name__ == '__main__':
   main()

Так намного лучше: нет потоков, нет asyncio с ивент лупом не по делу. Но все еще не идеал, ибо есть APScheduler с какими-то внутренним механизмами. Оно нам надо? Мне - нет. Наше решение для одной камеры было максимально простым, тупым, надежным и имело всего одну зависимость от которой, увы, не убежишь. Почему бы нам тогда не вспомнить про принцип KISS еще разок и не взять первую версию программы, не параметризовать ее и не запускать ее для каждой камеры? Этот подход мне лично нравится больше всего: минимум усилий (если подумать о нем сразу), максимум надежности (я как инженер люблю все надежное) и максимально просто сопровождение, а точнее его отсутствие :)

В итоге получаем следующее:

import argparse
import time
import cv2


def main(source: int, frame_file_path: str, interval: int):
  cap = cv2.VideoCapture(source)

  while True:
      ret, frame = cap.read()

      if not ret:
          print('Failed to get a frame')
          break

      cv2.imwrite(frame_file_path, frame)
      time.sleep(interval)

  cap.release()


if __name__ == '__main__':
   parser = argparse.ArgumentParser()
   parser.add_argument('--source', type=int)
   parser.add_argument('--target-frame-path', type=str, dest='target_frame_path')
   parser.add_argument('--interval', type=int)
   args = parser.parse_args()

   main(args.source, args.target_frame_path, args.interval)

Такое же простое, надежное и понятное решение как лом. А против лома, как известно, приема нет :)