Давно мучал вопрос передачи показаний давления системы отопления со штатного манометра газового котла. Для этого несколько лет назад была приобретена камера ESP32-CAM и интегрирована в Home Assistant посредством ESPHome.

Камеру я направил прямо на манометр, что позволило мне периодически вручную удаленно контролировать давление в системе отопления, и, при необходимости, открывать кран подачи воды в систему (также удалённо).

Штатный светодиод ESP32-CAM помогает разглядеть манометр в закрытом шкафу

Для того, чтобы хоть как-то облагородить этот "аналоговый" процесс в Node-RED была создана автоматизация, завязанная на контроле значений энергопотребления котла и насосов:

Работало это так:

  1. В Телеграм приходит уведомление об ошибке котла;

  2. Я лезу в интерфейс HA и в камере смотрю какое давление;

  3. Если давление низкое - открываю кран подпитки.

Уже неплохо, но руки-то чешутся, поэтому я решил, что изображение надо распознать и вывести давление в HA в нормальном виде.

Выбор способа оцифровки

Помимо Raspberry Pi 4B, на котором у меня крутится HA у меня есть самосборный NAS c Xpenology на борту. Поэтому я решил задействовать в этом деле его.

Установка контейнера Python в Xpenology

Первым делом в Container Manager загружаем образ Python 3-10-slim

Создание контейнера:

  1. Открываем вкладку Образы, выбираем python и нажимаем Запустить;

  2. Как-нибудь называем контейнер;

  3. На этапе Настройка томов:

    • Нажимаем Добавить папку

    • Выбираем папку /volume1/docker/gauge

    • В контейнере указываем путь: /app

  4. Нажимаем Далее, потом Применить

Получаем такой результат

Далее временно включаем доступ к NAS по SSH в панели управления:

И подключаемся к с серверу с помощью Putty

Вводим логин, пароль, переключаемся на root, получаем ID контейнера:

docker ps

Заходим в контейнер:

docker exec -it ID_контейнера bash

Устанавливаем необходимые библиотеки:

apt update && apt install -y python3-pip libgl1 libglib2.0-0
apt-get install -y python3-pip python3-dev
apt-get install -y libglib2.0-0 libsm6 libxext6 libxrender-dev
apt-get install -y qt5-qmake qtbase5-dev-tools qtchooser
apt-get install -y libx11-dev libgl1-mesa-glx libfontconfig1 libxkbcommon0
apt-get install -y libxcb-xinerama0
pip install opencv-python numpy
pip3 install opencv-python matplotlib

Создаем скрипт gauge_reader.py и кладем его в папку docker\gauge на NAS. К нему мы вернемся чуть позже.

Настройка Home Assistant

Теперь мне нужно положить в эту же папку, где лежит скрипт само изображение, которое нужно распознать. Для этого делаем следующее:

  1. В HA переходим в Настройки, Хранилище, Добавить сетевое хранилище, указываем путь к папке docker на NAS:

После создаем автоматизацию, которая будет каждый час включать светодиод на ESP32-CAM, делать снимок, выкладывать его в папку gauge, которую мы подключили шагом выше, и выключать светодиод:

alias: ESP Cam snapshot каждый час
description: ""
triggers:
  - minutes: 0
    trigger: time_pattern
actions:
  - target:
      entity_id: switch.pressure_boiler_flash_led
    action: switch.turn_on
    data: {}
  - delay:
      hours: 0
      minutes: 0
      seconds: 10
      milliseconds: 0
  - data:
      entity_id: camera.pressure_boiler_my_camera
      filename: /media/gauge/sample.jpg
    action: camera.snapshot
  - delay:
      hours: 0
      minutes: 0
      seconds: 10
      milliseconds: 0
  - target:
      entity_id: switch.pressure_boiler_flash_led
    action: switch.turn_off
    data: {}
mode: single

И сразу создадим сенсор, в который будет присылать NAS результат распознавания давления:

sensor:
    - name: "Boiler Pressure"
      state_topic: "sensors/boiler/pressure"
      unit_of_measurement: "bar"
      device_class: pressure
      unique_id: pressure_boiler

Подготовка фото

Теперь надо немного поколдовать с самим объектом - манометром. Спустя пару десятков итераций со скриптом я пришел к тому, что несмотря на все ухищрения, качество изображения не позволяет со 100% результатом распознать стрелку манометра. Поэтому я наклеил вокруг манометра квадрат для возможности правильной обрезки и коррекции перспективы. Получилось так:

ESP32-CAM в лего-кронштейне. Над манометром временно прилепил белый лист бумаги, т.к. рельеф корпуса котла иногда вносил небольшую неразбериху в определении угла квадрата.

Написание скрипта (хвала нейросетям!)

Теперь переходим к скрипту. Задачу по определению давления разбиваем на следующие шаги:

  1. Обнаруживаем на фото четырехугольник;

  2. Корректируем перспективу - превращаем четырехугольник в квадрат и обрезаем изображение;

  3. Определяем центр и стрелку манометра;

  4. Замеряем угол отклонения стрелки и по таблице калибровки переводим угол-градусы в давление-бары;

  5. Отрисовываем некоторые этапы для контроля выполнения кода скрипта;

  6. Отправляем полученные данные в mqtt HA.

import cv2
import numpy as np
import math
import logging
import paho.mqtt.client as mqtt
from typing import Optional, Tuple, List

# Настройка логгирования
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Конфигурация
class Config:
    MQTT_BROKER = "192.168.1.****"
    MQTT_PORT = ****
    MQTT_TOPIC = "sensors/boiler/pressure"
    MQTT_USER = "****"
    MQTT_PASS = "****"
    MQTT_TIMEOUT = 5
    
    CALIBRATION = {
        215: 0.0,
        155: 1.0,
        90: 2.0,
        29: 3.0,
        327: 4.0
    }
    
    OUTPUT_DIR = '/app/'
    OUTPUT_SIZE = 600
    BLUR_SIZE = (5, 5)
    ADAPTIVE_THRESH_BLOCK = 11
    ADAPTIVE_THRESH_C = 2
    HOUGH_CIRCLES_PARAMS = {
        'dp': 1,
        'minDist': 100,
        'param1': 50,
        'param2': 30,
        'minRadius': 100,
        'maxRadius': 200
    }
    HOUGH_LINES_PARAMS = {
        'rho': 1,
        'theta': np.pi/180,
        'threshold': 50,
        'minLineLength': 150,
        'maxLineGap': 10
    }

def detect_quadrilateral(image: np.ndarray) -> Tuple[Optional[List[Tuple[int, int]]], np.ndarray]:
    """Определение углов четырехугольника вокруг манометра"""
    gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
    _, thresh = cv2.threshold(gray, 50, 255, cv2.THRESH_BINARY_INV)
    
    kernel = np.ones((5,5), np.uint8)
    thresh = cv2.morphologyEx(thresh, cv2.MORPH_CLOSE, kernel)
    thresh = cv2.morphologyEx(thresh, cv2.MORPH_OPEN, kernel)
    
    contours, _ = cv2.findContours(thresh, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
    
    if not contours:
        return None, image.copy()
    
    largest_contour = max(contours, key=cv2.contourArea)
    epsilon = 0.03 * cv2.arcLength(largest_contour, True)
    approx = cv2.approxPolyDP(largest_contour, epsilon, True)
    
    if len(approx) != 4:
        for eps in [0.02, 0.04, 0.05]:
            epsilon = eps * cv2.arcLength(largest_contour, True)
            approx = cv2.approxPolyDP(largest_contour, epsilon, True)
            if len(approx) == 4:
                break
        else:
            return None, image.copy()
    
    points = np.array([point[0] for point in approx], dtype=np.float32)
    center = np.mean(points, axis=0)
    
    def sort_key(point):
        return math.atan2(point[1] - center[1], point[0] - center[0])
    
    sorted_points = sorted(points, key=sort_key, reverse=True)
    bottom_idx = np.argmax([p[1] for p in sorted_points])
    sorted_points = np.roll(sorted_points, -bottom_idx, axis=0)
    
    if bottom_idx == 1:
        sorted_points = np.roll(sorted_points, -1, axis=0)
    
    corners = [tuple(map(int, p)) for p in sorted_points]
    
    vis = image.copy()
    for i, corner in enumerate(corners):
        cv2.circle(vis, corner, 10, (0, 255, 0), -1)
        cv2.putText(vis, f"{i+1}", (corner[0]+15, corner[1]+15), 
                   cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 0, 255), 2)
    
    for i in range(4):
        cv2.line(vis, corners[i], corners[(i+1)%4], (255, 0, 0), 2)
    
    return corners, vis

def correct_perspective(image: np.ndarray, corners: List[Tuple[int, int]]) -> np.ndarray:
    """Коррекция перспективы на основе найденных углов"""
    size = Config.OUTPUT_SIZE
    dst_points = np.array([
        [0, size-1],
        [size-1, size-1],
        [size-1, 0],
        [0, 0]
    ], dtype=np.float32)
    
    src_points = np.array(corners, dtype=np.float32)
    matrix = cv2.getPerspectiveTransform(src_points, dst_points)
    corrected = cv2.warpPerspective(image, matrix, (size, size))
    
    return corrected

def enhance_image(image: np.ndarray, roi: Tuple[int, int, int, int]) -> Tuple[np.ndarray, np.ndarray]:
    """Улучшение изображения и выделение стрелки"""
    x, y, w, h = roi
    cropped = image[y:y+h, x:x+w]
    gray = cv2.cvtColor(cropped, cv2.COLOR_BGR2GRAY)
    blurred = cv2.GaussianBlur(gray, Config.BLUR_SIZE, 0)
    thresh = cv2.adaptiveThreshold(
        blurred, 255, cv2.ADAPTIVE_THRESH_GAUSSIAN_C,
        cv2.THRESH_BINARY_INV, Config.ADAPTIVE_THRESH_BLOCK, Config.ADAPTIVE_THRESH_C
    )
    return thresh, cropped

def find_dial_center(image: np.ndarray, roi: Tuple[int, int, int, int]) -> Tuple[Tuple[int, int, int], np.ndarray]:
    """Определение центра циферблата"""
    x, y, w, h = roi
    cropped = image[y:y+h, x:x+w]
    gray = cv2.cvtColor(cropped, cv2.COLOR_BGR2GRAY)
    vis = cropped.copy()
    
    circles = cv2.HoughCircles(gray, cv2.HOUGH_GRADIENT, **Config.HOUGH_CIRCLES_PARAMS)
    
    if circles is not None:
        circles = np.uint16(np.around(circles))
        best = circles[0][0]
        cv2.circle(vis, (best[0], best[1]), best[2], (0, 255, 0), 2)
        cv2.circle(vis, (best[0], best[1]), 2, (0, 0, 255), 3)
        center_vis_path = f"{Config.OUTPUT_DIR}center_detection.jpg"
        cv2.imwrite(center_vis_path, vis)
        return (x + best[0], y + best[1], best[2]), vis
    
    center = (x + w//2, y + h//2, min(w, h)//2)
    cv2.circle(vis, (w//2, h//2), min(w, h)//2, (0, 255, 0), 2)
    center_vis_path = f"{Config.OUTPUT_DIR}center_detection.jpg"
    cv2.imwrite(center_vis_path, vis)
    return center, vis

def find_arrow_angle(thresh: np.ndarray, original: np.ndarray, 
                    roi: Tuple[int, int, int, int], 
                    center_info: Tuple[int, int, int]) -> Tuple[Optional[float], Optional[Tuple[int, int]], np.ndarray]:
    """Поиск угла стрелки"""
    x, y, w, h = roi
    cX, cY, radius = center_info
    cropped = original[y:y+h, x:x+w]
    cX_roi, cY_roi = cX - x, cY - y
    
    params = Config.HOUGH_LINES_PARAMS.copy()
    params['minLineLength'] = radius * 0.5
    
    edges = cv2.Canny(thresh, 50, 150)
    lines = cv2.HoughLinesP(edges, **params)
    
    if lines is not None:
        filtered = []
        for line in lines:
            x1, y1, x2, y2 = line[0]
            dist1 = math.hypot(x1 - cX_roi, y1 - cY_roi)
            dist2 = math.hypot(x2 - cX_roi, y2 - cY_roi)
            if min(dist1, dist2) < radius * 0.3:
                filtered.append(line)
        
        if filtered:
            longest = max(filtered, key=lambda l: math.hypot(l[0][0]-l[0][2], l[0][1]-l[0][3]))
            x1, y1, x2, y2 = longest[0]
            
            tip = (x2, y2) if math.hypot(x1-cX_roi, y1-cY_roi) < math.hypot(x2-cX_roi, y2-cY_roi) else (x1, y1)
            angle = math.degrees(math.atan2(cY_roi - tip[1], tip[0] - cX_roi)) % 360
            
            cv2.line(cropped, (cX_roi, cY_roi), tip, (0, 0, 255), 3)
            return angle, (cX, cY), cropped
    
    return None, None, cropped

def setup_mqtt() -> mqtt.Client:
    """Настройка MQTT клиента"""
    client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2)
    client.username_pw_set(Config.MQTT_USER, Config.MQTT_PASS)
    client.connect(Config.MQTT_BROKER, Config.MQTT_PORT, Config.MQTT_TIMEOUT)
    return client

def calibrate_pressure(angle: float) -> float:
    """Калибровка давления"""
    angles = np.array(sorted(Config.CALIBRATION.keys()))
    pressures = np.array([Config.CALIBRATION[a] for a in angles])
    return float(np.interp(angle, angles, pressures))

def send_mqtt(pressure: float) -> bool:
    """Отправка данных через MQTT"""
    try:
        client = setup_mqtt()
        client.publish(Config.MQTT_TOPIC, f"{pressure:.2f}")
        client.disconnect()
        return True
    except Exception as e:
        logger.error(f"Ошибка MQTT: {str(e)}")
        return False

def process_image(image_path: str) -> Optional[float]:
    """Основная функция обработки изображения"""
    try:
        image = cv2.imread(image_path)
        if image is None:
            raise FileNotFoundError(f"Не удалось загрузить изображение: {image_path}")
        
        quad_corners, quad_vis = detect_quadrilateral(image)
        if quad_corners is None:
            logger.error("Не удалось обнаружить четырехугольник")
            return None
            
        cv2.imwrite(f"{Config.OUTPUT_DIR}quadrilateral_detection.jpg", quad_vis)
        corrected_image = correct_perspective(image, quad_corners)
        cv2.imwrite(f"{Config.OUTPUT_DIR}corrected_perspective.jpg", corrected_image)
        
        corrected_roi = (0, 0, Config.OUTPUT_SIZE, Config.OUTPUT_SIZE)
        thresh, cropped = enhance_image(corrected_image, corrected_roi)
        center, _ = find_dial_center(corrected_image, corrected_roi)
        angle, _, processed = find_arrow_angle(thresh, corrected_image, corrected_roi, center)
        
        if angle is None:
            logger.error("Не удалось определить угол стрелки")
            return None
        
        pressure = calibrate_pressure(angle)
        cv2.putText(processed, f"Angle: {angle:.1f}°", (20, 40), 
                   cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)
        cv2.putText(processed, f"Pressure: {pressure:.2f} bar", (20, 80),
                   cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)
        
        cv2.imwrite(f"{Config.OUTPUT_DIR}processed.jpg", processed)
        return pressure
        
    except Exception as e:
        logger.error(f"Ошибка обработки: {str(e)}")
        return None

def main():
    image_path = '/app/sample.jpg'
    pressure = process_image(image_path)
    if pressure is not None:
        send_mqtt(pressure)

if __name__ == "__main__":
    main()

Код скрипта писал трудяга DeepSeek под моим чутким руководством. Сначала пытался это сделать в ChatGPT, но после достижения бесплатного лимита запросов он резко "тупел" и дело шло уже не так хорошо:)

Определение углов
Коррекция перспективы и обрезка изображения
Определение центра и окружности манометра
Поиск стрелки и определение давления по углу отклонения стрелки

Настройка автозапуска скрипта в Xpenology

Осталось дело за малым - настраиваем на NAS автозапуск скрипта. Переходим в Панель управления, Планировщик задач, Создать, Запланированная задача, Скрипт заданный пользователем.

Во вкладке Общие указываем, что задача должна выполняться от пользователя root. Расписание - ежедневно каждый час. В Настройки задач в поле Выполнить команду прописываем docker exec ID_контейнера python3 /app/gauge_reader.py.

Результат

Теперь раз в час HA делает фотографию манометра и кладет ее в папку NAS. А NAS спустя пять минут распознает это дело шлёт в mqtt фактическое давление в системе отопления.

Да, я в курсе, что сейчас уже есть DIY zigbee манометры. Может быть я соберусь с мыслями, возьму паяльник и врежусь в систему отопления и в систему водоснабжения, чтобы можно было автоматизировать включение крана подпитки системы отопления только в случае наличия холодной воды. Но пока пусть поработает так.

Почему не реализовал распознавание изображения на Raspberry Pi? Просто так захотелось))

Сама идея взаимодействия двух устройств в домашней сети показалась интересной.