Оцифровка показаний стрелочного манометра в Home Assistant
Давно мучал вопрос передачи показаний давления системы отопления со штатного манометра газового котла. Для этого несколько лет назад была приобретена камера ESP32-CAM и интегрирована в Home Assistant посредством ESPHome.
Камеру я направил прямо на манометр, что позволило мне периодически вручную удаленно контролировать давление в системе отопления, и, при необходимости, открывать кран подачи воды в систему (также удалённо).
Для того, чтобы хоть как-то облагородить этот "аналоговый" процесс в Node-RED была создана автоматизация, завязанная на контроле значений энергопотребления котла и насосов:
Работало это так:
В Телеграм приходит уведомление об ошибке котла;
Я лезу в интерфейс HA и в камере смотрю какое давление;
Если давление низкое - открываю кран подпитки.
Уже неплохо, но руки-то чешутся, поэтому я решил, что изображение надо распознать и вывести давление в HA в нормальном виде.
Выбор способа оцифровки
Помимо Raspberry Pi 4B, на котором у меня крутится HA у меня есть самосборный NAS c Xpenology на борту. Поэтому я решил задействовать в этом деле его.
Установка контейнера Python в Xpenology
Первым делом в Container Manager загружаем образ Python 3-10-slim
Создание контейнера:
Открываем вкладку Образы, выбираем python и нажимаем Запустить;
Как-нибудь называем контейнер;
На этапе Настройка томов:
Нажимаем Добавить папку
Выбираем папку /volume1/docker/gauge
В контейнере указываем путь: /app
Нажимаем Далее, потом Применить
Далее временно включаем доступ к NAS по SSH в панели управления:
И подключаемся к с серверу с помощью Putty
Вводим логин, пароль, переключаемся на root, получаем ID контейнера:
docker ps
Заходим в контейнер:
docker exec -it ID_контейнера bash
Устанавливаем необходимые библиотеки:
apt update && apt install -y python3-pip libgl1 libglib2.0-0
apt-get install -y python3-pip python3-dev
apt-get install -y libglib2.0-0 libsm6 libxext6 libxrender-dev
apt-get install -y qt5-qmake qtbase5-dev-tools qtchooser
apt-get install -y libx11-dev libgl1-mesa-glx libfontconfig1 libxkbcommon0
apt-get install -y libxcb-xinerama0
pip install opencv-python numpy
pip3 install opencv-python matplotlib
Создаем скрипт gauge_reader.py и кладем его в папку docker\gauge на NAS. К нему мы вернемся чуть позже.
Настройка Home Assistant
Теперь мне нужно положить в эту же папку, где лежит скрипт само изображение, которое нужно распознать. Для этого делаем следующее:
В HA переходим в Настройки, Хранилище, Добавить сетевое хранилище, указываем путь к папке docker на NAS:
После создаем автоматизацию, которая будет каждый час включать светодиод на ESP32-CAM, делать снимок, выкладывать его в папку gauge, которую мы подключили шагом выше, и выключать светодиод:
alias: ESP Cam snapshot каждый час
description: ""
triggers:
- minutes: 0
trigger: time_pattern
actions:
- target:
entity_id: switch.pressure_boiler_flash_led
action: switch.turn_on
data: {}
- delay:
hours: 0
minutes: 0
seconds: 10
milliseconds: 0
- data:
entity_id: camera.pressure_boiler_my_camera
filename: /media/gauge/sample.jpg
action: camera.snapshot
- delay:
hours: 0
minutes: 0
seconds: 10
milliseconds: 0
- target:
entity_id: switch.pressure_boiler_flash_led
action: switch.turn_off
data: {}
mode: single
И сразу создадим сенсор, в который будет присылать NAS результат распознавания давления:
sensor:
- name: "Boiler Pressure"
state_topic: "sensors/boiler/pressure"
unit_of_measurement: "bar"
device_class: pressure
unique_id: pressure_boiler
Подготовка фото
Теперь надо немного поколдовать с самим объектом - манометром. Спустя пару десятков итераций со скриптом я пришел к тому, что несмотря на все ухищрения, качество изображения не позволяет со 100% результатом распознать стрелку манометра. Поэтому я наклеил вокруг манометра квадрат для возможности правильной обрезки и коррекции перспективы. Получилось так:
Написание скрипта (хвала нейросетям!)
Теперь переходим к скрипту. Задачу по определению давления разбиваем на следующие шаги:
Обнаруживаем на фото четырехугольник;
Корректируем перспективу - превращаем четырехугольник в квадрат и обрезаем изображение;
Определяем центр и стрелку манометра;
Замеряем угол отклонения стрелки и по таблице калибровки переводим угол-градусы в давление-бары;
Отрисовываем некоторые этапы для контроля выполнения кода скрипта;
Отправляем полученные данные в mqtt HA.
import cv2
import numpy as np
import math
import logging
import paho.mqtt.client as mqtt
from typing import Optional, Tuple, List
# Настройка логгирования
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Конфигурация
class Config:
MQTT_BROKER = "192.168.1.****"
MQTT_PORT = ****
MQTT_TOPIC = "sensors/boiler/pressure"
MQTT_USER = "****"
MQTT_PASS = "****"
MQTT_TIMEOUT = 5
CALIBRATION = {
215: 0.0,
155: 1.0,
90: 2.0,
29: 3.0,
327: 4.0
}
OUTPUT_DIR = '/app/'
OUTPUT_SIZE = 600
BLUR_SIZE = (5, 5)
ADAPTIVE_THRESH_BLOCK = 11
ADAPTIVE_THRESH_C = 2
HOUGH_CIRCLES_PARAMS = {
'dp': 1,
'minDist': 100,
'param1': 50,
'param2': 30,
'minRadius': 100,
'maxRadius': 200
}
HOUGH_LINES_PARAMS = {
'rho': 1,
'theta': np.pi/180,
'threshold': 50,
'minLineLength': 150,
'maxLineGap': 10
}
def detect_quadrilateral(image: np.ndarray) -> Tuple[Optional[List[Tuple[int, int]]], np.ndarray]:
"""Определение углов четырехугольника вокруг манометра"""
gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
_, thresh = cv2.threshold(gray, 50, 255, cv2.THRESH_BINARY_INV)
kernel = np.ones((5,5), np.uint8)
thresh = cv2.morphologyEx(thresh, cv2.MORPH_CLOSE, kernel)
thresh = cv2.morphologyEx(thresh, cv2.MORPH_OPEN, kernel)
contours, _ = cv2.findContours(thresh, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
if not contours:
return None, image.copy()
largest_contour = max(contours, key=cv2.contourArea)
epsilon = 0.03 * cv2.arcLength(largest_contour, True)
approx = cv2.approxPolyDP(largest_contour, epsilon, True)
if len(approx) != 4:
for eps in [0.02, 0.04, 0.05]:
epsilon = eps * cv2.arcLength(largest_contour, True)
approx = cv2.approxPolyDP(largest_contour, epsilon, True)
if len(approx) == 4:
break
else:
return None, image.copy()
points = np.array([point[0] for point in approx], dtype=np.float32)
center = np.mean(points, axis=0)
def sort_key(point):
return math.atan2(point[1] - center[1], point[0] - center[0])
sorted_points = sorted(points, key=sort_key, reverse=True)
bottom_idx = np.argmax([p[1] for p in sorted_points])
sorted_points = np.roll(sorted_points, -bottom_idx, axis=0)
if bottom_idx == 1:
sorted_points = np.roll(sorted_points, -1, axis=0)
corners = [tuple(map(int, p)) for p in sorted_points]
vis = image.copy()
for i, corner in enumerate(corners):
cv2.circle(vis, corner, 10, (0, 255, 0), -1)
cv2.putText(vis, f"{i+1}", (corner[0]+15, corner[1]+15),
cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 0, 255), 2)
for i in range(4):
cv2.line(vis, corners[i], corners[(i+1)%4], (255, 0, 0), 2)
return corners, vis
def correct_perspective(image: np.ndarray, corners: List[Tuple[int, int]]) -> np.ndarray:
"""Коррекция перспективы на основе найденных углов"""
size = Config.OUTPUT_SIZE
dst_points = np.array([
[0, size-1],
[size-1, size-1],
[size-1, 0],
[0, 0]
], dtype=np.float32)
src_points = np.array(corners, dtype=np.float32)
matrix = cv2.getPerspectiveTransform(src_points, dst_points)
corrected = cv2.warpPerspective(image, matrix, (size, size))
return corrected
def enhance_image(image: np.ndarray, roi: Tuple[int, int, int, int]) -> Tuple[np.ndarray, np.ndarray]:
"""Улучшение изображения и выделение стрелки"""
x, y, w, h = roi
cropped = image[y:y+h, x:x+w]
gray = cv2.cvtColor(cropped, cv2.COLOR_BGR2GRAY)
blurred = cv2.GaussianBlur(gray, Config.BLUR_SIZE, 0)
thresh = cv2.adaptiveThreshold(
blurred, 255, cv2.ADAPTIVE_THRESH_GAUSSIAN_C,
cv2.THRESH_BINARY_INV, Config.ADAPTIVE_THRESH_BLOCK, Config.ADAPTIVE_THRESH_C
)
return thresh, cropped
def find_dial_center(image: np.ndarray, roi: Tuple[int, int, int, int]) -> Tuple[Tuple[int, int, int], np.ndarray]:
"""Определение центра циферблата"""
x, y, w, h = roi
cropped = image[y:y+h, x:x+w]
gray = cv2.cvtColor(cropped, cv2.COLOR_BGR2GRAY)
vis = cropped.copy()
circles = cv2.HoughCircles(gray, cv2.HOUGH_GRADIENT, **Config.HOUGH_CIRCLES_PARAMS)
if circles is not None:
circles = np.uint16(np.around(circles))
best = circles[0][0]
cv2.circle(vis, (best[0], best[1]), best[2], (0, 255, 0), 2)
cv2.circle(vis, (best[0], best[1]), 2, (0, 0, 255), 3)
center_vis_path = f"{Config.OUTPUT_DIR}center_detection.jpg"
cv2.imwrite(center_vis_path, vis)
return (x + best[0], y + best[1], best[2]), vis
center = (x + w//2, y + h//2, min(w, h)//2)
cv2.circle(vis, (w//2, h//2), min(w, h)//2, (0, 255, 0), 2)
center_vis_path = f"{Config.OUTPUT_DIR}center_detection.jpg"
cv2.imwrite(center_vis_path, vis)
return center, vis
def find_arrow_angle(thresh: np.ndarray, original: np.ndarray,
roi: Tuple[int, int, int, int],
center_info: Tuple[int, int, int]) -> Tuple[Optional[float], Optional[Tuple[int, int]], np.ndarray]:
"""Поиск угла стрелки"""
x, y, w, h = roi
cX, cY, radius = center_info
cropped = original[y:y+h, x:x+w]
cX_roi, cY_roi = cX - x, cY - y
params = Config.HOUGH_LINES_PARAMS.copy()
params['minLineLength'] = radius * 0.5
edges = cv2.Canny(thresh, 50, 150)
lines = cv2.HoughLinesP(edges, **params)
if lines is not None:
filtered = []
for line in lines:
x1, y1, x2, y2 = line[0]
dist1 = math.hypot(x1 - cX_roi, y1 - cY_roi)
dist2 = math.hypot(x2 - cX_roi, y2 - cY_roi)
if min(dist1, dist2) < radius * 0.3:
filtered.append(line)
if filtered:
longest = max(filtered, key=lambda l: math.hypot(l[0][0]-l[0][2], l[0][1]-l[0][3]))
x1, y1, x2, y2 = longest[0]
tip = (x2, y2) if math.hypot(x1-cX_roi, y1-cY_roi) < math.hypot(x2-cX_roi, y2-cY_roi) else (x1, y1)
angle = math.degrees(math.atan2(cY_roi - tip[1], tip[0] - cX_roi)) % 360
cv2.line(cropped, (cX_roi, cY_roi), tip, (0, 0, 255), 3)
return angle, (cX, cY), cropped
return None, None, cropped
def setup_mqtt() -> mqtt.Client:
"""Настройка MQTT клиента"""
client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2)
client.username_pw_set(Config.MQTT_USER, Config.MQTT_PASS)
client.connect(Config.MQTT_BROKER, Config.MQTT_PORT, Config.MQTT_TIMEOUT)
return client
def calibrate_pressure(angle: float) -> float:
"""Калибровка давления"""
angles = np.array(sorted(Config.CALIBRATION.keys()))
pressures = np.array([Config.CALIBRATION[a] for a in angles])
return float(np.interp(angle, angles, pressures))
def send_mqtt(pressure: float) -> bool:
"""Отправка данных через MQTT"""
try:
client = setup_mqtt()
client.publish(Config.MQTT_TOPIC, f"{pressure:.2f}")
client.disconnect()
return True
except Exception as e:
logger.error(f"Ошибка MQTT: {str(e)}")
return False
def process_image(image_path: str) -> Optional[float]:
"""Основная функция обработки изображения"""
try:
image = cv2.imread(image_path)
if image is None:
raise FileNotFoundError(f"Не удалось загрузить изображение: {image_path}")
quad_corners, quad_vis = detect_quadrilateral(image)
if quad_corners is None:
logger.error("Не удалось обнаружить четырехугольник")
return None
cv2.imwrite(f"{Config.OUTPUT_DIR}quadrilateral_detection.jpg", quad_vis)
corrected_image = correct_perspective(image, quad_corners)
cv2.imwrite(f"{Config.OUTPUT_DIR}corrected_perspective.jpg", corrected_image)
corrected_roi = (0, 0, Config.OUTPUT_SIZE, Config.OUTPUT_SIZE)
thresh, cropped = enhance_image(corrected_image, corrected_roi)
center, _ = find_dial_center(corrected_image, corrected_roi)
angle, _, processed = find_arrow_angle(thresh, corrected_image, corrected_roi, center)
if angle is None:
logger.error("Не удалось определить угол стрелки")
return None
pressure = calibrate_pressure(angle)
cv2.putText(processed, f"Angle: {angle:.1f}°", (20, 40),
cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)
cv2.putText(processed, f"Pressure: {pressure:.2f} bar", (20, 80),
cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)
cv2.imwrite(f"{Config.OUTPUT_DIR}processed.jpg", processed)
return pressure
except Exception as e:
logger.error(f"Ошибка обработки: {str(e)}")
return None
def main():
image_path = '/app/sample.jpg'
pressure = process_image(image_path)
if pressure is not None:
send_mqtt(pressure)
if __name__ == "__main__":
main()
Код скрипта писал трудяга DeepSeek под моим чутким руководством. Сначала пытался это сделать в ChatGPT, но после достижения бесплатного лимита запросов он резко "тупел" и дело шло уже не так хорошо:)
Настройка автозапуска скрипта в Xpenology
Осталось дело за малым - настраиваем на NAS автозапуск скрипта. Переходим в Панель управления, Планировщик задач, Создать, Запланированная задача, Скрипт заданный пользователем.
Во вкладке Общие указываем, что задача должна выполняться от пользователя root. Расписание - ежедневно каждый час. В Настройки задач в поле Выполнить команду прописываем docker exec ID_контейнера python3 /app/gauge_reader.py.
Результат
Теперь раз в час HA делает фотографию манометра и кладет ее в папку NAS. А NAS спустя пять минут распознает это дело шлёт в mqtt фактическое давление в системе отопления.
Да, я в курсе, что сейчас уже есть DIY zigbee манометры. Может быть я соберусь с мыслями, возьму паяльник и врежусь в систему отопления и в систему водоснабжения, чтобы можно было автоматизировать включение крана подпитки системы отопления только в случае наличия холодной воды. Но пока пусть поработает так.
Почему не реализовал распознавание изображения на Raspberry Pi? Просто так захотелось))
Сама идея взаимодействия двух устройств в домашней сети показалась интересной.