Привет, Хабр!
Подготовительным этапом для видеоаналитики с применением методов машинного обучения - является выгрузка записей из видеорегистраторов Hikvision, что является достаточно длительной задачей, особенно если регистраторов несколько, давайте попытаемся разобраться как выполнить эту работу быстрее и удобнее, используя сервер.

Основанием для разработки «Task-Сервера» - послужила задача по выгрузке видеозаписей с видеорегистраторов Hikvision. В работе была использована библиотека Hikload которая подключалась к регистраторам и загружала видеоролики.
Для начала немного о «Task-Сервере» (Рис. 1), в данном случае это небольшой сервер, хранящий в базе данных или файле заранее подготовленные строки, которые будут переданы в качестве параметров клиентам, подключенным к хосту. Клиент же в свою очередь запускает скрипт с полученными данными и после выполнения запрашивает новую задачу до тех пор, пока задачи не закончатся.

В качестве инструмента для создания такого сервера был выбран микрофреймворк Flask, нам достаточно небольшого веб-приложения для нашей задачи. Из плюсов Flask можно выделить простоту создания веб-приложений, отсутствие сложных настроек и достаточно стабильная и быстрая работа. В качестве хранилища для команд была выбрана таблица Excel. Все задачи были сформированы заранее.
Проблема
Так как количество камер в разных помещениях, разных городов достаточно много, а время выгрузки с одного устройства сильно варьировалось, от нескольких минут до часа (Рис. 2), то было принято решение: на основе параметров для подключения к регистраторам сформировать задачи и распределить их на разных машинах, для чего и потребовался сервер.

Решение:
Начнем работу с создания сервера, для начала создаем новое окружение под приложение следующей командой в терминале:
python –m venv venv
Так как решение было использовать Flask то подключаем следующие модули:
«Flask» - для создания веб-приложения.
«openpyxl» – непосредственно для работы с excel файлом, где находятся команды.
«multiprocessing» - из этого модуля нам потребуются только очереди, так как они идеально подходят для нашей задачи.
Для начала импортируем библиотеку Flask и инициализируем приложение:
from flask import Flask app = Flask(__name__)
Импортируем остальные библиотеки:
from openpyxl import load_workbook from multiprocessing import Queue
Создаем и наполняем базу данных задачами, которые сервер будет возвращать клиенту. Задачи сформированы заранее, они состоят из сервера/ ip – адреса к которому подключаемся, user - логина, password - пароля, cameras - номера камеры. По умолчанию если не задавать дату и время - то Hikload выгружает видео за один текущий день. Полный список параметров для выгрузки видео можно узнать, набрав «hikload --help» (рис. 3)

Далее добавим функцию, которая возвращает команду из базы данных. Все задачи из таблицы Excel выгружаются в Queue(очередь).
data = load_workbook('test.xlsx', 'wb') sheet = data.active que = Queue() result = [] #выгрузка задач из таблицы Excel и запись в очередь for row in sheet: for cell in row: result.append(cell.value) que.put(result) result = [] #выдача команд из очереди def get_task(): return que.get() #отправка команд клиентам.
Декоратор «app.route» регистрирует URL-адрес, по которому будет доступно представление, описанное ниже и вызывается экземпляром приложения app(Flask):
@app.route('/') @app.route('/index') def index(): return ' '.join(map(str, get_task()))
Весь код сервера представлен ниже:
from flask import Flask app = Flask(__name__) from openpyxl import load_workbook from multiprocessing import Queue data = load_workbook('test.xlsx', 'wb') sheet = data.active que = Queue() result = [] #выгрузка задач из таблицы Excel и запись в очередь for row in sheet: for cell in row: result.append(cell.value) que.put(result) result = [] #выдача команд из очереди def get_task(): return que.get() #отправка команд клиентам @app.route('/') @app.route('/index') def index(): if not que.empty(): return ' '.join(map(str, get_task())) else: return if __name__ == '__main__': app.run(debug=False)
Сервер готов, очередь заполнена задачами, следующим пунктом будет разработка клиента для правильной интерпретации принятых команд и запуска соответствующих скриптов. Рецепт программы-клиента состоит из следующих ингредиентов:
«Requests» для обращения к серверу и получения задачи.
def get_task(): for _ in range(2): resp = rq.get('localhost') if resp != “the end”: tasks.append(list((resp.content.decode('utf-8').split(' ')))) else: return None return (tasks)
«Multiprocessing» для выполнения 2-х задач (можно и другое количество, зависящее от характеристик вашего устройства) параллельно.
if __name__ == '__main__': while True: tasks = [] #создаем пустой список для задач response = get_task() NUM_CORE = 2 #объявляем количество ядер read = mp.Queue() [read.put(x) for x in response] #записываем очередь командами print("read qsize", read.qsize()) for i in range(NUM_CORE): mp.Process(target=load, args=(read,)).start() #запускаем скрипт mp.join() while True: time.sleep(1)
Модуль «os» непосредственно, для запуска Hikload.
def load(read): while True: if not read.empty(): #проверяем заполненность очереди x = read.get() name_proc = mp.current_process().name print(f"{name_proc} stared") #выводим работающие процессы task = (f'hikload --server={x[2]} ' f'--user={x[0]} ' f'--password={x[1]} ' f'--cameras={x[3]} ' f'--downloads {x[2]+ "_" + name_proc}' ) print(task) '''Запуск скрипта с нашими командами''' os.system(task) else: break
Код клиента целиком выглядит следующим образом:
import multiprocessing as mp, os, time, requests as rq #получаем задачи с сервера def get_task(): for _ in range(2): resp = rq.get('http://127.0.0.1:5000/') if resp != 'the end': tasks.append(list((resp.content.decode('utf-8').split(' ')))) else: return None return (tasks) #запуск выгрузки видеозаписей def load(read): while True: if not read.empty(): #проверяем заполненность очереди x = read.get() name_proc = mp.current_process().name print(f"{name_proc} stared") #выводим работающие процессы task = (f'hikload --server={x[2]} ' f'--user={x[0]} ' f'--password={x[1]} ' f'--cameras={x[3] ' f'--downloads {x[2]+ "_" + name_proc}' ) print(task) '''Запуск скрипта с нашими командами''' os.system(task) else: break if __name__ == '__main__': while True: tasks = [] response = get_task() NUM_CORE = 2 #объявляем количество ядер read = mp.Queue() get_task() #вызываем функцию загрузки задач [read.put(x) for x in tasks] #записываем очередь командами print("read qsize", read.qsize()) for i in range(NUM_CORE): mp.Process(target=load, args=(read,)).start() #запускаем скрипт mp.join() while True: time.sleep(1)
Для начала запускаем сервер (Рис. 5) на локальном хосте, следующей командой: flask run

Далее запускаем клиент, команда: python main.py:

Мы видим, что очередь автоматически заполнилась 2-мя задачами (Рис. 6) и клиент начал выгрузку, в это время сервер показывает, что передал 2 команды, код 200 в ответе говорит об успешной обработке запроса (Рис. 7)

Под каждую команду создаются отдельные папки, где хранятся выгруженные видеозаписи, названия формируются параметром --download (рис.8). После отработки/выгрузки команды – клиент снова посылает запрос на сервер и получает новую команду.

И, конечно, вывод
В результате мы реализовали автоматизированное выполнение задачи, разделенной на множество подзадач. На каждом клиенте выполняются 2 выгрузки параллельно (рис.9)

Из явных преимуществ такого подхода можно выделить:
Загрузка видеозаписей проходит быстрее с каждым подключенным клиентом.
Все задачи распределяются автоматически.
Сервер прост в разработке, многофункционален и его можно использовать для любых, похожих на показанной в примере задач.
