Привет, Хабр!
Подготовительным этапом для видеоаналитики с применением методов машинного обучения - является выгрузка записей из видеорегистраторов Hikvision, что является достаточно длительной задачей, особенно если регистраторов несколько, давайте попытаемся разобраться как выполнить эту работу быстрее и удобнее, используя сервер.

Основанием для разработки «Task-Сервера» - послужила задача по выгрузке видеозаписей с видеорегистраторов Hikvision. В работе была использована библиотека Hikload которая подключалась к регистраторам и загружала видеоролики.
Для начала немного о «Task-Сервере» (Рис. 1), в данном случае это небольшой сервер, хранящий в базе данных или файле заранее подготовленные строки, которые будут переданы в качестве параметров клиентам, подключенным к хосту. Клиент же в свою очередь запускает скрипт с полученными данными и после выполнения запрашивает новую задачу до тех пор, пока задачи не закончатся.

В качестве инструмента для создания такого сервера был выбран микрофреймворк Flask, нам достаточно небольшого веб-приложения для нашей задачи. Из плюсов Flask можно выделить простоту создания веб-приложений, отсутствие сложных настроек и достаточно стабильная и быстрая работа. В качестве хранилища для команд была выбрана таблица Excel. Все задачи были сформированы заранее.
Проблема
Так как количество камер в разных помещениях, разных городов достаточно много, а время выгрузки с одного устройства сильно варьировалось, от нескольких минут до часа (Рис. 2), то было принято решение: на основе параметров для подключения к регистраторам сформировать задачи и распределить их на разных машинах, для чего и потребовался сервер.

Решение:
Начнем работу с создания сервера, для начала создаем новое окружение под приложение следующей командой в терминале:
python –m venv venv
Так как решение было использовать Flask то подключаем следующие модули:
«Flask» - для создания веб-приложения.
«openpyxl» – непосредственно для работы с excel файлом, где находятся команды.
«multiprocessing» - из этого модуля нам потребуются только очереди, так как они идеально подходят для нашей задачи.
Для начала импортируем библиотеку Flask и инициализируем приложение:
from flask import Flask
app = Flask(__name__)
Импортируем остальные библиотеки:
from openpyxl import load_workbook
from multiprocessing import Queue
Создаем и наполняем базу данных задачами, которые сервер будет возвращать клиенту. Задачи сформированы заранее, они состоят из сервера/ ip – адреса к которому подключаемся, user - логина, password - пароля, cameras - номера камеры. По умолчанию если не задавать дату и время - то Hikload выгружает видео за один текущий день. Полный список параметров для выгрузки видео можно узнать, набрав «hikload --help» (рис. 3)

Далее добавим функцию, которая возвращает команду из базы данных. Все задачи из таблицы Excel выгружаются в Queue(очередь).
data = load_workbook('test.xlsx', 'wb')
sheet = data.active
que = Queue()
result = []
#выгрузка задач из таблицы Excel и запись в очередь
for row in sheet:
for cell in row:
result.append(cell.value)
que.put(result)
result = []
#выдача команд из очереди
def get_task():
return que.get()
#отправка команд клиентам.
Декоратор «app.route» регистрирует URL-адрес, по которому будет доступно представление, описанное ниже и вызывается экземпляром приложения app(Flask):
@app.route('/')
@app.route('/index')
def index():
return ' '.join(map(str, get_task()))
Весь код сервера представлен ниже:
from flask import Flask
app = Flask(__name__)
from openpyxl import load_workbook
from multiprocessing import Queue
data = load_workbook('test.xlsx', 'wb')
sheet = data.active
que = Queue()
result = []
#выгрузка задач из таблицы Excel и запись в очередь
for row in sheet:
for cell in row:
result.append(cell.value)
que.put(result)
result = []
#выдача команд из очереди
def get_task():
return que.get()
#отправка команд клиентам
@app.route('/')
@app.route('/index')
def index():
if not que.empty():
return ' '.join(map(str, get_task()))
else:
return
if __name__ == '__main__':
app.run(debug=False)
Сервер готов, очередь заполнена задачами, следующим пунктом будет разработка клиента для правильной интерпретации принятых команд и запуска соответствующих скриптов. Рецепт программы-клиента состоит из следующих ингредиентов:
«Requests» для обращения к серверу и получения задачи.
def get_task():
for _ in range(2):
resp = rq.get('localhost')
if resp != “the end”:
tasks.append(list((resp.content.decode('utf-8').split(' '))))
else:
return None
return (tasks)
«Multiprocessing» для выполнения 2-х задач (можно и другое количество, зависящее от характеристик вашего устройства) параллельно.
if __name__ == '__main__':
while True:
tasks = [] #создаем пустой список для задач
response = get_task()
NUM_CORE = 2 #объявляем количество ядер
read = mp.Queue()
[read.put(x) for x in response] #записываем очередь командами
print("read qsize", read.qsize())
for i in range(NUM_CORE):
mp.Process(target=load, args=(read,)).start() #запускаем скрипт
mp.join()
while True:
time.sleep(1)
Модуль «os» непосредственно, для запуска Hikload.
def load(read):
while True:
if not read.empty(): #проверяем заполненность очереди
x = read.get()
name_proc = mp.current_process().name
print(f"{name_proc} stared") #выводим работающие процессы
task = (f'hikload --server={x[2]} '
f'--user={x[0]} '
f'--password={x[1]} '
f'--cameras={x[3]} '
f'--downloads {x[2]+ "_" + name_proc}'
)
print(task)
'''Запуск скрипта с нашими командами'''
os.system(task)
else:
break
Код клиента целиком выглядит следующим образом:
import multiprocessing as mp, os, time, requests as rq
#получаем задачи с сервера
def get_task():
for _ in range(2):
resp = rq.get('http://127.0.0.1:5000/')
if resp != 'the end':
tasks.append(list((resp.content.decode('utf-8').split(' '))))
else:
return None
return (tasks)
#запуск выгрузки видеозаписей
def load(read):
while True:
if not read.empty(): #проверяем заполненность очереди
x = read.get()
name_proc = mp.current_process().name
print(f"{name_proc} stared") #выводим работающие процессы
task = (f'hikload --server={x[2]} '
f'--user={x[0]} '
f'--password={x[1]} '
f'--cameras={x[3] '
f'--downloads {x[2]+ "_" + name_proc}'
)
print(task)
'''Запуск скрипта с нашими командами'''
os.system(task)
else:
break
if __name__ == '__main__':
while True:
tasks = []
response = get_task()
NUM_CORE = 2 #объявляем количество ядер
read = mp.Queue()
get_task() #вызываем функцию загрузки задач
[read.put(x) for x in tasks] #записываем очередь командами
print("read qsize", read.qsize())
for i in range(NUM_CORE):
mp.Process(target=load, args=(read,)).start() #запускаем скрипт
mp.join()
while True:
time.sleep(1)
Для начала запускаем сервер (Рис. 5) на локальном хосте, следующей командой: flask run

Далее запускаем клиент, команда: python main.py:

Мы видим, что очередь автоматически заполнилась 2-мя задачами (Рис. 6) и клиент начал выгрузку, в это время сервер показывает, что передал 2 команды, код 200 в ответе говорит об успешной обработке запроса (Рис. 7)

Под каждую команду создаются отдельные папки, где хранятся выгруженные видеозаписи, названия формируются параметром --download (рис.8). После отработки/выгрузки команды – клиент снова посылает запрос на сервер и получает новую команду.

И, конечно, вывод
В результате мы реализовали автоматизированное выполнение задачи, разделенной на множество подзадач. На каждом клиенте выполняются 2 выгрузки параллельно (рис.9)

Из явных преимуществ такого подхода можно выделить:
Загрузка видеозаписей проходит быстрее с каждым подключенным клиентом.
Все задачи распределяются автоматически.
Сервер прост в разработке, многофункционален и его можно использовать для любых, похожих на показанной в примере задач.