Еще раз о KISS и трендах
Последние несколько лет async вообще и asyncio в частности в питоне все больше набирают популярность и их все чаще используют. При этом иногда забывают о принципе KISS (Keep it simple, stupid) и о том, какие вообще проблемы решает асинхронный код и зачем он нужен. В этой статье я бы хотел описать пример, когда задачу можно и, на мой взгляд, нужно решать без использования async. И вообще, практически без всего.
Рассмотрим задачу.
Для начала подключим к компьютеру (ноутбуку) одну вэб-камеру. Далее нам нужно с этой камеры раз в 10 секунд запрашивать кадр и сохранять его. Для простоты примера будем сохранять кадр на диск. Эта задача простая и решение тоже будет простым, даже примитивным я бы сказал.
import time
import cv2
def main():
source = 0
frame_file_path = '<path to a frame file>.jpg'
cap = cv2.VideoCapture(source)
while True:
ret, frame = cap.read()
if not ret:
print('Failed to get a frame')
break
cv2.imwrite(frame_file_path, frame)
time.sleep(10)
cap.release()
if __name__ == '__main__':
main()
Чем хорош данный скрипт? Тем, что он прост, понятен и надежен как лом и не требует никаких внешних инструментов типа cron-а.
Теперь немного масштабируем наш пример: добавим еще одну камеру и скажем, что с нее нам нужны кадры каждые 15 секунд. Если на проекте вы активно пользуетесь asyncio, то можете по инерции поступить следующим образом: сказать, что раз задача по существу I/O Bound, а камер стало больше, то это работа для asyncio. Затем взять какой-нибудь планировщик задач типа APScheduler, который поддерживает asyncio и с его помощью и помощью лома накидать что-то вроде такого.
import asyncio
from functools import partial
import cv2
from apscheduler.schedulers.asyncio import AsyncIOScheduler
async def job(source, target_file_path):
cap = cv2.VideoCapture(source)
ret, frame = await asyncio.get_event_loop().run_in_executor(None, partial(cap.read, cap))
if not ret:
print('Failed to get a frame')
return
await asyncio.get_event_loop().run_in_executor(None, partial(cv2.imwrite, target_file_path, frame))
def main():
frame_0_file_path = ''
frame_1_file_path = ''
scheduler = AsyncIOScheduler()
scheduler.add_job(job, 'interval', seconds=10, kwargs={'source': 0, 'target_file_path': frame_0_file_path})
scheduler.add_job(job, 'interval', seconds=15, kwargs={'source': 1, 'target_file_path': frame_1_file_path})
scheduler.start()
try:
asyncio.get_event_loop().run_forever()
except (KeyboardInterrupt, SystemExit):
pass
if __name__ == '__main__':
main()
На первый взгляд все неплохо, но это на первый взгляд. Помимо дополнительной внешней зависимости мы собрали под капотом корутины, asyncio с ивент лупом и потоки для запуска синхронных функций асинхронным образом. И теперь вместо топорной и надежной программки у нас есть целый технический букет, с которым можно повозиться и от этого кайфануть. Но оно нам надо? Мне вот как инженеру не очень :) Если вспомнить, что async, ивент луп и вот это все призваны при написании I/O Bound приложений экономить ресурсы при большой нагрузке, а нагрузки у нас нет, да и камер будет конечное и вполне осмысливаемое число, то могут закрасться смутные сомнения. А нужен ли нам там asyncio тут вообще? И можно прийти к выводу, что не нужен и что вместо запуска асинхронных задач на ивент лупе, можно воспользоваться классом apscheduler.executors.pool.ProcessPoolExecutor, который позволит нам запускать наши джобы в процессах. Есть еще класс, который позволяет нам запускать джобы в потоках. Но сомнения у нас смутные, поэтому мы его тоже отбросим. Получится что-то вроде такого:
import cv2
from apscheduler.schedulers.blocking import BlockingScheduler
def job(source, target_file_path):
cap = cv2.VideoCapture(source)
ret, frame = cap.read()
if not ret:
print('Failed to get a frame')
return
cv2.imwrite(target_file_path, frame)
def main():
frame_0_file_path = ''
frame_1_file_path = ''
scheduler = BlockingScheduler({
'apscheduler.executors.default': {
'class': 'apscheduler.executors.pool:ProcessPoolExecutor',
'max_workers': 2
}
})
scheduler.add_job(job, 'interval', seconds=10, kwargs={'source': 0, 'target_file_path': frame_0_file_path})
scheduler.add_job(job, 'interval', seconds=15, kwargs={'source': 1, 'target_file_path': frame_1_file_path})
scheduler.start()
if __name__ == '__main__':
main()
Так намного лучше: нет потоков, нет asyncio с ивент лупом не по делу. Но все еще не идеал, ибо есть APScheduler с какими-то внутренним механизмами. Оно нам надо? Мне - нет. Наше решение для одной камеры было максимально простым, тупым, надежным и имело всего одну зависимость от которой, увы, не убежишь. Почему бы нам тогда не вспомнить про принцип KISS еще разок и не взять первую версию программы, не параметризовать ее и не запускать ее для каждой камеры? Этот подход мне лично нравится больше всего: минимум усилий (если подумать о нем сразу), максимум надежности (я как инженер люблю все надежное) и максимально просто сопровождение, а точнее его отсутствие :)
В итоге получаем следующее:
import argparse
import time
import cv2
def main(source: int, frame_file_path: str, interval: int):
cap = cv2.VideoCapture(source)
while True:
ret, frame = cap.read()
if not ret:
print('Failed to get a frame')
break
cv2.imwrite(frame_file_path, frame)
time.sleep(interval)
cap.release()
if __name__ == '__main__':
parser = argparse.ArgumentParser()
parser.add_argument('--source', type=int)
parser.add_argument('--target-frame-path', type=str, dest='target_frame_path')
parser.add_argument('--interval', type=int)
args = parser.parse_args()
main(args.source, args.target_frame_path, args.interval)
Такое же простое, надежное и понятное решение как лом. А против лома, как известно, приема нет :)