Давно мучал вопрос передачи показаний давления системы отопления со штатного манометра газового котла. Для этого несколько лет назад была приобретена камера ESP32-CAM и интегрирована в Home Assistant посредством ESPHome.

Камеру я направил прямо на манометр, что позволило мне периодически вручную удаленно контролировать давление в системе отопления, и, при необходимости, открывать кран подачи воды в систему (также удалённо).

Для того, чтобы хоть как-то облагородить этот "аналоговый" процесс в Node-RED была создана автоматизация, завязанная на контроле значений энергопотребления котла и насосов:

Работало это так:
В Телеграм приходит уведомление об ошибке котла;
Я лезу в интерфейс HA и в камере смотрю какое давление;
Если давление низкое - открываю кран подпитки.
Уже неплохо, но руки-то чешутся, поэтому я решил, что изображение надо распознать и вывести давление в HA в нормальном виде.
Выбор способа оцифровки
Помимо Raspberry Pi 4B, на котором у меня крутится HA у меня есть самосборный NAS c Xpenology на борту. Поэтому я решил задействовать в этом деле его.
Установка контейнера Python в Xpenology
Первым делом в Container Manager загружаем образ Python 3-10-slim

Создание контейнера:
Открываем вкладку Образы, выбираем python и нажимаем Запустить;
Как-нибудь называем контейнер;
На этапе Настройка томов:
Нажимаем Добавить папку
Выбираем папку /volume1/docker/gauge
В контейнере указываем путь: /app
Нажимаем Далее, потом Применить

Далее временно включаем доступ к NAS по SSH в панели управления:

И подключаемся к с серверу с помощью Putty

Вводим логин, пароль, переключаемся на root, получаем ID контейнера:
docker ps
Заходим в контейнер:
docker exec -it ID_контейнера bash
Устанавливаем необходимые библиотеки:
apt update && apt install -y python3-pip libgl1 libglib2.0-0 apt-get install -y python3-pip python3-dev apt-get install -y libglib2.0-0 libsm6 libxext6 libxrender-dev apt-get install -y qt5-qmake qtbase5-dev-tools qtchooser apt-get install -y libx11-dev libgl1-mesa-glx libfontconfig1 libxkbcommon0 apt-get install -y libxcb-xinerama0 pip install opencv-python numpy pip3 install opencv-python matplotlib
Создаем скрипт gauge_reader.py и кладем его в папку docker\gauge на NAS. К нему мы вернемся чуть позже.
Настройка Home Assistant
Теперь мне нужно положить в эту же папку, где лежит скрипт само изображение, которое нужно распознать. Для этого делаем следующее:
В HA переходим в Настройки, Хранилище, Добавить сетевое хранилище, указываем путь к папке docker на NAS:

После создаем автоматизацию, которая будет каждый час включать светодиод на ESP32-CAM, делать снимок, выкладывать его в папку gauge, которую мы подключили шагом выше, и выключать светодиод:
alias: ESP Cam snapshot каждый час description: "" triggers: - minutes: 0 trigger: time_pattern actions: - target: entity_id: switch.pressure_boiler_flash_led action: switch.turn_on data: {} - delay: hours: 0 minutes: 0 seconds: 10 milliseconds: 0 - data: entity_id: camera.pressure_boiler_my_camera filename: /media/gauge/sample.jpg action: camera.snapshot - delay: hours: 0 minutes: 0 seconds: 10 milliseconds: 0 - target: entity_id: switch.pressure_boiler_flash_led action: switch.turn_off data: {} mode: single
И сразу создадим сенсор, в который будет присылать NAS результат распознавания давления:
sensor: - name: "Boiler Pressure" state_topic: "sensors/boiler/pressure" unit_of_measurement: "bar" device_class: pressure unique_id: pressure_boiler
Подготовка фото
Теперь надо немного поколдовать с самим объектом - манометром. Спустя пару десятков итераций со скриптом я пришел к тому, что несмотря на все ухищрения, качество изображения не позволяет со 100% результатом распознать стрелку манометра. Поэтому я наклеил вокруг манометра квадрат для возможности правильной обрезки и коррекции перспективы. Получилось так:

Написание скрипта (хвала нейросетям!)
Теперь переходим к скрипту. Задачу по определению давления разбиваем на следующие шаги:
Обнаруживаем на фото четырехугольник;
Корректируем перспективу - превращаем четырехугольник в квадрат и обрезаем изображение;
Определяем центр и стрелку манометра;
Замеряем угол отклонения стрелки и по таблице калибровки переводим угол-градусы в давление-бары;
Отрисовываем некоторые этапы для контроля выполнения кода скрипта;
Отправляем полученные данные в mqtt HA.
import cv2 import numpy as np import math import logging import paho.mqtt.client as mqtt from typing import Optional, Tuple, List # Настройка логгирования logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) # Конфигурация class Config: MQTT_BROKER = "192.168.1.****" MQTT_PORT = **** MQTT_TOPIC = "sensors/boiler/pressure" MQTT_USER = "****" MQTT_PASS = "****" MQTT_TIMEOUT = 5 CALIBRATION = { 215: 0.0, 155: 1.0, 90: 2.0, 29: 3.0, 327: 4.0 } OUTPUT_DIR = '/app/' OUTPUT_SIZE = 600 BLUR_SIZE = (5, 5) ADAPTIVE_THRESH_BLOCK = 11 ADAPTIVE_THRESH_C = 2 HOUGH_CIRCLES_PARAMS = { 'dp': 1, 'minDist': 100, 'param1': 50, 'param2': 30, 'minRadius': 100, 'maxRadius': 200 } HOUGH_LINES_PARAMS = { 'rho': 1, 'theta': np.pi/180, 'threshold': 50, 'minLineLength': 150, 'maxLineGap': 10 } def detect_quadrilateral(image: np.ndarray) -> Tuple[Optional[List[Tuple[int, int]]], np.ndarray]: """Определение углов четырехугольника вокруг манометра""" gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY) _, thresh = cv2.threshold(gray, 50, 255, cv2.THRESH_BINARY_INV) kernel = np.ones((5,5), np.uint8) thresh = cv2.morphologyEx(thresh, cv2.MORPH_CLOSE, kernel) thresh = cv2.morphologyEx(thresh, cv2.MORPH_OPEN, kernel) contours, _ = cv2.findContours(thresh, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE) if not contours: return None, image.copy() largest_contour = max(contours, key=cv2.contourArea) epsilon = 0.03 * cv2.arcLength(largest_contour, True) approx = cv2.approxPolyDP(largest_contour, epsilon, True) if len(approx) != 4: for eps in [0.02, 0.04, 0.05]: epsilon = eps * cv2.arcLength(largest_contour, True) approx = cv2.approxPolyDP(largest_contour, epsilon, True) if len(approx) == 4: break else: return None, image.copy() points = np.array([point[0] for point in approx], dtype=np.float32) center = np.mean(points, axis=0) def sort_key(point): return math.atan2(point[1] - center[1], point[0] - center[0]) sorted_points = sorted(points, key=sort_key, reverse=True) bottom_idx = np.argmax([p[1] for p in sorted_points]) sorted_points = np.roll(sorted_points, -bottom_idx, axis=0) if bottom_idx == 1: sorted_points = np.roll(sorted_points, -1, axis=0) corners = [tuple(map(int, p)) for p in sorted_points] vis = image.copy() for i, corner in enumerate(corners): cv2.circle(vis, corner, 10, (0, 255, 0), -1) cv2.putText(vis, f"{i+1}", (corner[0]+15, corner[1]+15), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 0, 255), 2) for i in range(4): cv2.line(vis, corners[i], corners[(i+1)%4], (255, 0, 0), 2) return corners, vis def correct_perspective(image: np.ndarray, corners: List[Tuple[int, int]]) -> np.ndarray: """Коррекция перспективы на основе найденных углов""" size = Config.OUTPUT_SIZE dst_points = np.array([ [0, size-1], [size-1, size-1], [size-1, 0], [0, 0] ], dtype=np.float32) src_points = np.array(corners, dtype=np.float32) matrix = cv2.getPerspectiveTransform(src_points, dst_points) corrected = cv2.warpPerspective(image, matrix, (size, size)) return corrected def enhance_image(image: np.ndarray, roi: Tuple[int, int, int, int]) -> Tuple[np.ndarray, np.ndarray]: """Улучшение изображения и выделение стрелки""" x, y, w, h = roi cropped = image[y:y+h, x:x+w] gray = cv2.cvtColor(cropped, cv2.COLOR_BGR2GRAY) blurred = cv2.GaussianBlur(gray, Config.BLUR_SIZE, 0) thresh = cv2.adaptiveThreshold( blurred, 255, cv2.ADAPTIVE_THRESH_GAUSSIAN_C, cv2.THRESH_BINARY_INV, Config.ADAPTIVE_THRESH_BLOCK, Config.ADAPTIVE_THRESH_C ) return thresh, cropped def find_dial_center(image: np.ndarray, roi: Tuple[int, int, int, int]) -> Tuple[Tuple[int, int, int], np.ndarray]: """Определение центра циферблата""" x, y, w, h = roi cropped = image[y:y+h, x:x+w] gray = cv2.cvtColor(cropped, cv2.COLOR_BGR2GRAY) vis = cropped.copy() circles = cv2.HoughCircles(gray, cv2.HOUGH_GRADIENT, **Config.HOUGH_CIRCLES_PARAMS) if circles is not None: circles = np.uint16(np.around(circles)) best = circles[0][0] cv2.circle(vis, (best[0], best[1]), best[2], (0, 255, 0), 2) cv2.circle(vis, (best[0], best[1]), 2, (0, 0, 255), 3) center_vis_path = f"{Config.OUTPUT_DIR}center_detection.jpg" cv2.imwrite(center_vis_path, vis) return (x + best[0], y + best[1], best[2]), vis center = (x + w//2, y + h//2, min(w, h)//2) cv2.circle(vis, (w//2, h//2), min(w, h)//2, (0, 255, 0), 2) center_vis_path = f"{Config.OUTPUT_DIR}center_detection.jpg" cv2.imwrite(center_vis_path, vis) return center, vis def find_arrow_angle(thresh: np.ndarray, original: np.ndarray, roi: Tuple[int, int, int, int], center_info: Tuple[int, int, int]) -> Tuple[Optional[float], Optional[Tuple[int, int]], np.ndarray]: """Поиск угла стрелки""" x, y, w, h = roi cX, cY, radius = center_info cropped = original[y:y+h, x:x+w] cX_roi, cY_roi = cX - x, cY - y params = Config.HOUGH_LINES_PARAMS.copy() params['minLineLength'] = radius * 0.5 edges = cv2.Canny(thresh, 50, 150) lines = cv2.HoughLinesP(edges, **params) if lines is not None: filtered = [] for line in lines: x1, y1, x2, y2 = line[0] dist1 = math.hypot(x1 - cX_roi, y1 - cY_roi) dist2 = math.hypot(x2 - cX_roi, y2 - cY_roi) if min(dist1, dist2) < radius * 0.3: filtered.append(line) if filtered: longest = max(filtered, key=lambda l: math.hypot(l[0][0]-l[0][2], l[0][1]-l[0][3])) x1, y1, x2, y2 = longest[0] tip = (x2, y2) if math.hypot(x1-cX_roi, y1-cY_roi) < math.hypot(x2-cX_roi, y2-cY_roi) else (x1, y1) angle = math.degrees(math.atan2(cY_roi - tip[1], tip[0] - cX_roi)) % 360 cv2.line(cropped, (cX_roi, cY_roi), tip, (0, 0, 255), 3) return angle, (cX, cY), cropped return None, None, cropped def setup_mqtt() -> mqtt.Client: """Настройка MQTT клиента""" client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2) client.username_pw_set(Config.MQTT_USER, Config.MQTT_PASS) client.connect(Config.MQTT_BROKER, Config.MQTT_PORT, Config.MQTT_TIMEOUT) return client def calibrate_pressure(angle: float) -> float: """Калибровка давления""" angles = np.array(sorted(Config.CALIBRATION.keys())) pressures = np.array([Config.CALIBRATION[a] for a in angles]) return float(np.interp(angle, angles, pressures)) def send_mqtt(pressure: float) -> bool: """Отправка данных через MQTT""" try: client = setup_mqtt() client.publish(Config.MQTT_TOPIC, f"{pressure:.2f}") client.disconnect() return True except Exception as e: logger.error(f"Ошибка MQTT: {str(e)}") return False def process_image(image_path: str) -> Optional[float]: """Основная функция обработки изображения""" try: image = cv2.imread(image_path) if image is None: raise FileNotFoundError(f"Не удалось загрузить изображение: {image_path}") quad_corners, quad_vis = detect_quadrilateral(image) if quad_corners is None: logger.error("Не удалось обнаружить четырехугольник") return None cv2.imwrite(f"{Config.OUTPUT_DIR}quadrilateral_detection.jpg", quad_vis) corrected_image = correct_perspective(image, quad_corners) cv2.imwrite(f"{Config.OUTPUT_DIR}corrected_perspective.jpg", corrected_image) corrected_roi = (0, 0, Config.OUTPUT_SIZE, Config.OUTPUT_SIZE) thresh, cropped = enhance_image(corrected_image, corrected_roi) center, _ = find_dial_center(corrected_image, corrected_roi) angle, _, processed = find_arrow_angle(thresh, corrected_image, corrected_roi, center) if angle is None: logger.error("Не удалось определить угол стрелки") return None pressure = calibrate_pressure(angle) cv2.putText(processed, f"Angle: {angle:.1f}°", (20, 40), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2) cv2.putText(processed, f"Pressure: {pressure:.2f} bar", (20, 80), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2) cv2.imwrite(f"{Config.OUTPUT_DIR}processed.jpg", processed) return pressure except Exception as e: logger.error(f"Ошибка обработки: {str(e)}") return None def main(): image_path = '/app/sample.jpg' pressure = process_image(image_path) if pressure is not None: send_mqtt(pressure) if __name__ == "__main__": main()
Код скрипта писал трудяга DeepSeek под моим чутким руководством. Сначала пытался это сделать в ChatGPT, но после достижения бесплатного лимита запросов он резко "тупел" и дело шло уже не так хорошо:)




Настройка автозапуска скрипта в Xpenology
Осталось дело за малым - настраиваем на NAS автозапуск скрипта. Переходим в Панель управления, Планировщик задач, Создать, Запланированная задача, Скрипт заданный пользователем.
Во вкладке Общие указываем, что задача должна выполняться от пользователя root. Расписание - ежедневно каждый час. В Настройки задач в поле Выполнить команду прописываем docker exec ID_контейнера python3 /app/gauge_reader.py.

Результат
Теперь раз в час HA делает фотографию манометра и кладет ее в папку NAS. А NAS спустя пять минут распознает это дело шлёт в mqtt фактическое давление в системе отопления.

Да, я в курсе, что сейчас уже есть DIY zigbee манометры. Может быть я соберусь с мыслями, возьму паяльник и врежусь в систему отопления и в систему водоснабжения, чтобы можно было автоматизировать включение крана подпитки системы отопления только в случае наличия холодной воды. Но пока пусть поработает так.
Почему не реализовал распознавание изображения на Raspberry Pi? Просто так захотелось))
Сама идея взаимодействия двух устройств в домашней сети показалась интересной.
