Как стать автором
Обновить

Оцифровка показаний стрелочного манометра в Home Assistant

Уровень сложностиСредний
Время на прочтение9 мин
Количество просмотров6.8K

Давно мучал вопрос передачи показаний давления системы отопления со штатного манометра газового котла. Для этого несколько лет назад была приобретена камера ESP32-CAM и интегрирована в Home Assistant посредством ESPHome.

Камеру я направил прямо на манометр, что позволило мне периодически вручную удаленно контролировать давление в системе отопления, и, при необходимости, открывать кран подачи воды в систему (также удалённо).

Штатный светодиод ESP32-CAM помогает разглядеть манометр в закрытом шкафу
Штатный светодиод ESP32-CAM помогает разглядеть манометр в закрытом шкафу

Для того, чтобы хоть как-то облагородить этот "аналоговый" процесс в Node-RED была создана автоматизация, завязанная на контроле значений энергопотребления котла и насосов:

Работало это так:

  1. В Телеграм приходит уведомление об ошибке котла;

  2. Я лезу в интерфейс HA и в камере смотрю какое давление;

  3. Если давление низкое - открываю кран подпитки.

Уже неплохо, но руки-то чешутся, поэтому я решил, что изображение надо распознать и вывести давление в HA в нормальном виде.

Выбор способа оцифровки

Помимо Raspberry Pi 4B, на котором у меня крутится HA у меня есть самосборный NAS c Xpenology на борту. Поэтому я решил задействовать в этом деле его.

Установка контейнера Python в Xpenology

Первым делом в Container Manager загружаем образ Python 3-10-slim

Создание контейнера:

  1. Открываем вкладку Образы, выбираем python и нажимаем Запустить;

  2. Как-нибудь называем контейнер;

  3. На этапе Настройка томов:

    • Нажимаем Добавить папку

    • Выбираем папку /volume1/docker/gauge

    • В контейнере указываем путь: /app

  4. Нажимаем Далее, потом Применить

Получаем такой результат
Получаем такой результат

Далее временно включаем доступ к NAS по SSH в панели управления:

И подключаемся к с серверу с помощью Putty

Вводим логин, пароль, переключаемся на root, получаем ID контейнера:

docker ps

Заходим в контейнер:

docker exec -it ID_контейнера bash

Устанавливаем необходимые библиотеки:

apt update && apt install -y python3-pip libgl1 libglib2.0-0
apt-get install -y python3-pip python3-dev
apt-get install -y libglib2.0-0 libsm6 libxext6 libxrender-dev
apt-get install -y qt5-qmake qtbase5-dev-tools qtchooser
apt-get install -y libx11-dev libgl1-mesa-glx libfontconfig1 libxkbcommon0
apt-get install -y libxcb-xinerama0
pip install opencv-python numpy
pip3 install opencv-python matplotlib

Создаем скрипт gauge_reader.py и кладем его в папку docker\gauge на NAS. К нему мы вернемся чуть позже.

Настройка Home Assistant

Теперь мне нужно положить в эту же папку, где лежит скрипт само изображение, которое нужно распознать. Для этого делаем следующее:

  1. В HA переходим в Настройки, Хранилище, Добавить сетевое хранилище, указываем путь к папке docker на NAS:

После создаем автоматизацию, которая будет каждый час включать светодиод на ESP32-CAM, делать снимок, выкладывать его в папку gauge, которую мы подключили шагом выше, и выключать светодиод:

alias: ESP Cam snapshot каждый час
description: ""
triggers:
  - minutes: 0
    trigger: time_pattern
actions:
  - target:
      entity_id: switch.pressure_boiler_flash_led
    action: switch.turn_on
    data: {}
  - delay:
      hours: 0
      minutes: 0
      seconds: 10
      milliseconds: 0
  - data:
      entity_id: camera.pressure_boiler_my_camera
      filename: /media/gauge/sample.jpg
    action: camera.snapshot
  - delay:
      hours: 0
      minutes: 0
      seconds: 10
      milliseconds: 0
  - target:
      entity_id: switch.pressure_boiler_flash_led
    action: switch.turn_off
    data: {}
mode: single

И сразу создадим сенсор, в который будет присылать NAS результат распознавания давления:

sensor:
    - name: "Boiler Pressure"
      state_topic: "sensors/boiler/pressure"
      unit_of_measurement: "bar"
      device_class: pressure
      unique_id: pressure_boiler

Подготовка фото

Теперь надо немного поколдовать с самим объектом - манометром. Спустя пару десятков итераций со скриптом я пришел к тому, что несмотря на все ухищрения, качество изображения не позволяет со 100% результатом распознать стрелку манометра. Поэтому я наклеил вокруг манометра квадрат для возможности правильной обрезки и коррекции перспективы. Получилось так:

ESP32-CAM в лего-кронштейне. Над манометром временно прилепил белый лист бумаги, т.к. рельеф корпуса котла иногда вносил небольшую неразбериху в определении угла квадрата.
ESP32-CAM в лего-кронштейне. Над манометром временно прилепил белый лист бумаги, т.к. рельеф корпуса котла иногда вносил небольшую неразбериху в определении угла квадрата.

Написание скрипта (хвала нейросетям!)

Теперь переходим к скрипту. Задачу по определению давления разбиваем на следующие шаги:

  1. Обнаруживаем на фото четырехугольник;

  2. Корректируем перспективу - превращаем четырехугольник в квадрат и обрезаем изображение;

  3. Определяем центр и стрелку манометра;

  4. Замеряем угол отклонения стрелки и по таблице калибровки переводим угол-градусы в давление-бары;

  5. Отрисовываем некоторые этапы для контроля выполнения кода скрипта;

  6. Отправляем полученные данные в mqtt HA.

import cv2
import numpy as np
import math
import logging
import paho.mqtt.client as mqtt
from typing import Optional, Tuple, List

# Настройка логгирования
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Конфигурация
class Config:
    MQTT_BROKER = "192.168.1.****"
    MQTT_PORT = ****
    MQTT_TOPIC = "sensors/boiler/pressure"
    MQTT_USER = "****"
    MQTT_PASS = "****"
    MQTT_TIMEOUT = 5
    
    CALIBRATION = {
        215: 0.0,
        155: 1.0,
        90: 2.0,
        29: 3.0,
        327: 4.0
    }
    
    OUTPUT_DIR = '/app/'
    OUTPUT_SIZE = 600
    BLUR_SIZE = (5, 5)
    ADAPTIVE_THRESH_BLOCK = 11
    ADAPTIVE_THRESH_C = 2
    HOUGH_CIRCLES_PARAMS = {
        'dp': 1,
        'minDist': 100,
        'param1': 50,
        'param2': 30,
        'minRadius': 100,
        'maxRadius': 200
    }
    HOUGH_LINES_PARAMS = {
        'rho': 1,
        'theta': np.pi/180,
        'threshold': 50,
        'minLineLength': 150,
        'maxLineGap': 10
    }

def detect_quadrilateral(image: np.ndarray) -> Tuple[Optional[List[Tuple[int, int]]], np.ndarray]:
    """Определение углов четырехугольника вокруг манометра"""
    gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
    _, thresh = cv2.threshold(gray, 50, 255, cv2.THRESH_BINARY_INV)
    
    kernel = np.ones((5,5), np.uint8)
    thresh = cv2.morphologyEx(thresh, cv2.MORPH_CLOSE, kernel)
    thresh = cv2.morphologyEx(thresh, cv2.MORPH_OPEN, kernel)
    
    contours, _ = cv2.findContours(thresh, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
    
    if not contours:
        return None, image.copy()
    
    largest_contour = max(contours, key=cv2.contourArea)
    epsilon = 0.03 * cv2.arcLength(largest_contour, True)
    approx = cv2.approxPolyDP(largest_contour, epsilon, True)
    
    if len(approx) != 4:
        for eps in [0.02, 0.04, 0.05]:
            epsilon = eps * cv2.arcLength(largest_contour, True)
            approx = cv2.approxPolyDP(largest_contour, epsilon, True)
            if len(approx) == 4:
                break
        else:
            return None, image.copy()
    
    points = np.array([point[0] for point in approx], dtype=np.float32)
    center = np.mean(points, axis=0)
    
    def sort_key(point):
        return math.atan2(point[1] - center[1], point[0] - center[0])
    
    sorted_points = sorted(points, key=sort_key, reverse=True)
    bottom_idx = np.argmax([p[1] for p in sorted_points])
    sorted_points = np.roll(sorted_points, -bottom_idx, axis=0)
    
    if bottom_idx == 1:
        sorted_points = np.roll(sorted_points, -1, axis=0)
    
    corners = [tuple(map(int, p)) for p in sorted_points]
    
    vis = image.copy()
    for i, corner in enumerate(corners):
        cv2.circle(vis, corner, 10, (0, 255, 0), -1)
        cv2.putText(vis, f"{i+1}", (corner[0]+15, corner[1]+15), 
                   cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 0, 255), 2)
    
    for i in range(4):
        cv2.line(vis, corners[i], corners[(i+1)%4], (255, 0, 0), 2)
    
    return corners, vis

def correct_perspective(image: np.ndarray, corners: List[Tuple[int, int]]) -> np.ndarray:
    """Коррекция перспективы на основе найденных углов"""
    size = Config.OUTPUT_SIZE
    dst_points = np.array([
        [0, size-1],
        [size-1, size-1],
        [size-1, 0],
        [0, 0]
    ], dtype=np.float32)
    
    src_points = np.array(corners, dtype=np.float32)
    matrix = cv2.getPerspectiveTransform(src_points, dst_points)
    corrected = cv2.warpPerspective(image, matrix, (size, size))
    
    return corrected

def enhance_image(image: np.ndarray, roi: Tuple[int, int, int, int]) -> Tuple[np.ndarray, np.ndarray]:
    """Улучшение изображения и выделение стрелки"""
    x, y, w, h = roi
    cropped = image[y:y+h, x:x+w]
    gray = cv2.cvtColor(cropped, cv2.COLOR_BGR2GRAY)
    blurred = cv2.GaussianBlur(gray, Config.BLUR_SIZE, 0)
    thresh = cv2.adaptiveThreshold(
        blurred, 255, cv2.ADAPTIVE_THRESH_GAUSSIAN_C,
        cv2.THRESH_BINARY_INV, Config.ADAPTIVE_THRESH_BLOCK, Config.ADAPTIVE_THRESH_C
    )
    return thresh, cropped

def find_dial_center(image: np.ndarray, roi: Tuple[int, int, int, int]) -> Tuple[Tuple[int, int, int], np.ndarray]:
    """Определение центра циферблата"""
    x, y, w, h = roi
    cropped = image[y:y+h, x:x+w]
    gray = cv2.cvtColor(cropped, cv2.COLOR_BGR2GRAY)
    vis = cropped.copy()
    
    circles = cv2.HoughCircles(gray, cv2.HOUGH_GRADIENT, **Config.HOUGH_CIRCLES_PARAMS)
    
    if circles is not None:
        circles = np.uint16(np.around(circles))
        best = circles[0][0]
        cv2.circle(vis, (best[0], best[1]), best[2], (0, 255, 0), 2)
        cv2.circle(vis, (best[0], best[1]), 2, (0, 0, 255), 3)
        center_vis_path = f"{Config.OUTPUT_DIR}center_detection.jpg"
        cv2.imwrite(center_vis_path, vis)
        return (x + best[0], y + best[1], best[2]), vis
    
    center = (x + w//2, y + h//2, min(w, h)//2)
    cv2.circle(vis, (w//2, h//2), min(w, h)//2, (0, 255, 0), 2)
    center_vis_path = f"{Config.OUTPUT_DIR}center_detection.jpg"
    cv2.imwrite(center_vis_path, vis)
    return center, vis

def find_arrow_angle(thresh: np.ndarray, original: np.ndarray, 
                    roi: Tuple[int, int, int, int], 
                    center_info: Tuple[int, int, int]) -> Tuple[Optional[float], Optional[Tuple[int, int]], np.ndarray]:
    """Поиск угла стрелки"""
    x, y, w, h = roi
    cX, cY, radius = center_info
    cropped = original[y:y+h, x:x+w]
    cX_roi, cY_roi = cX - x, cY - y
    
    params = Config.HOUGH_LINES_PARAMS.copy()
    params['minLineLength'] = radius * 0.5
    
    edges = cv2.Canny(thresh, 50, 150)
    lines = cv2.HoughLinesP(edges, **params)
    
    if lines is not None:
        filtered = []
        for line in lines:
            x1, y1, x2, y2 = line[0]
            dist1 = math.hypot(x1 - cX_roi, y1 - cY_roi)
            dist2 = math.hypot(x2 - cX_roi, y2 - cY_roi)
            if min(dist1, dist2) < radius * 0.3:
                filtered.append(line)
        
        if filtered:
            longest = max(filtered, key=lambda l: math.hypot(l[0][0]-l[0][2], l[0][1]-l[0][3]))
            x1, y1, x2, y2 = longest[0]
            
            tip = (x2, y2) if math.hypot(x1-cX_roi, y1-cY_roi) < math.hypot(x2-cX_roi, y2-cY_roi) else (x1, y1)
            angle = math.degrees(math.atan2(cY_roi - tip[1], tip[0] - cX_roi)) % 360
            
            cv2.line(cropped, (cX_roi, cY_roi), tip, (0, 0, 255), 3)
            return angle, (cX, cY), cropped
    
    return None, None, cropped

def setup_mqtt() -> mqtt.Client:
    """Настройка MQTT клиента"""
    client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2)
    client.username_pw_set(Config.MQTT_USER, Config.MQTT_PASS)
    client.connect(Config.MQTT_BROKER, Config.MQTT_PORT, Config.MQTT_TIMEOUT)
    return client

def calibrate_pressure(angle: float) -> float:
    """Калибровка давления"""
    angles = np.array(sorted(Config.CALIBRATION.keys()))
    pressures = np.array([Config.CALIBRATION[a] for a in angles])
    return float(np.interp(angle, angles, pressures))

def send_mqtt(pressure: float) -> bool:
    """Отправка данных через MQTT"""
    try:
        client = setup_mqtt()
        client.publish(Config.MQTT_TOPIC, f"{pressure:.2f}")
        client.disconnect()
        return True
    except Exception as e:
        logger.error(f"Ошибка MQTT: {str(e)}")
        return False

def process_image(image_path: str) -> Optional[float]:
    """Основная функция обработки изображения"""
    try:
        image = cv2.imread(image_path)
        if image is None:
            raise FileNotFoundError(f"Не удалось загрузить изображение: {image_path}")
        
        quad_corners, quad_vis = detect_quadrilateral(image)
        if quad_corners is None:
            logger.error("Не удалось обнаружить четырехугольник")
            return None
            
        cv2.imwrite(f"{Config.OUTPUT_DIR}quadrilateral_detection.jpg", quad_vis)
        corrected_image = correct_perspective(image, quad_corners)
        cv2.imwrite(f"{Config.OUTPUT_DIR}corrected_perspective.jpg", corrected_image)
        
        corrected_roi = (0, 0, Config.OUTPUT_SIZE, Config.OUTPUT_SIZE)
        thresh, cropped = enhance_image(corrected_image, corrected_roi)
        center, _ = find_dial_center(corrected_image, corrected_roi)
        angle, _, processed = find_arrow_angle(thresh, corrected_image, corrected_roi, center)
        
        if angle is None:
            logger.error("Не удалось определить угол стрелки")
            return None
        
        pressure = calibrate_pressure(angle)
        cv2.putText(processed, f"Angle: {angle:.1f}°", (20, 40), 
                   cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)
        cv2.putText(processed, f"Pressure: {pressure:.2f} bar", (20, 80),
                   cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)
        
        cv2.imwrite(f"{Config.OUTPUT_DIR}processed.jpg", processed)
        return pressure
        
    except Exception as e:
        logger.error(f"Ошибка обработки: {str(e)}")
        return None

def main():
    image_path = '/app/sample.jpg'
    pressure = process_image(image_path)
    if pressure is not None:
        send_mqtt(pressure)

if __name__ == "__main__":
    main()

Код скрипта писал трудяга DeepSeek под моим чутким руководством. Сначала пытался это сделать в ChatGPT, но после достижения бесплатного лимита запросов он резко "тупел" и дело шло уже не так хорошо:)

Определение углов
Определение углов
Коррекция перспективы и обрезка изображения
Коррекция перспективы и обрезка изображения
Определение центра и окружности манометра
Определение центра и окружности манометра
Поиск стрелки и определение давления по углу отклонения стрелки
Поиск стрелки и определение давления по углу отклонения стрелки

Настройка автозапуска скрипта в Xpenology

Осталось дело за малым - настраиваем на NAS автозапуск скрипта. Переходим в Панель управления, Планировщик задач, Создать, Запланированная задача, Скрипт заданный пользователем.

Во вкладке Общие указываем, что задача должна выполняться от пользователя root. Расписание - ежедневно каждый час. В Настройки задач в поле Выполнить команду прописываем docker exec ID_контейнера python3 /app/gauge_reader.py.

Результат

Теперь раз в час HA делает фотографию манометра и кладет ее в папку NAS. А NAS спустя пять минут распознает это дело шлёт в mqtt фактическое давление в системе отопления.

Да, я в курсе, что сейчас уже есть DIY zigbee манометры. Может быть я соберусь с мыслями, возьму паяльник и врежусь в систему отопления и в систему водоснабжения, чтобы можно было автоматизировать включение крана подпитки системы отопления только в случае наличия холодной воды. Но пока пусть поработает так.

Почему не реализовал распознавание изображения на Raspberry Pi? Просто так захотелось))

Сама идея взаимодействия двух устройств в домашней сети показалась интересной.

Теги:
Хабы:
+42
Комментарии37

Публикации

Работа

Data Scientist
43 вакансии

Ближайшие события