Разработанная Яндексом СУБД ClickHouse — одно из самых распространенных решений для аналитической обработки в режиме онлайн, позволяющее обрабатывать и анализировать большие объемы данных с минимальной задержкой.
Используемый в СУБД формат хранения в виде столбцов и параллельное выполнение запросов оптимизируют поиск и агрегирование данных, обеспечивая молниеносные аналитические возможности. ClickHouse славится своей масштабируемостью, позволяя организациям справляться с постоянно растущим объемом данных, генерируемых устройствами IoT, сохраняя при этом исключительную производительность.
Интернет вещей, в свою очередь тоже в последние годы получил бурное развитие. Увеличилось и само количество устройств, и множество тех сообщений, которые эти устройства создают. Соответственно, возникла потребность в высокопроизводительной СУБД, способной обрабатывать и сохранять большой поток данных.
В этой статье мы рассмотрим использование ClickHouse для хранения данных, поступающих по протоколу MQTT. Данный протокол был специально разработан для приложений IoT и предназначен для обеспечения эффективной и надежной связи между устройствами.
В MQTT используется облегченная модель публикации‑подписки, обеспечивающая бесперебойную передачу данных даже в условиях ограниченных ресурсов. Низкие накладные расходы и поддержка потоковой передачи данных в реальном времени делают протокол MQTT идеальным выбором для сбора и передачи данных IoT с различных конечных устройств на платформы обработки данных.
О преимуществах интеграции
Используя MQTT в качестве коммуникационного уровня, данные с IoT‑устройств можно легко передавать в ClickHouse, эффективно хранить и обрабатывать с помощью высокопроизводительных аналитических возможностей. При этом можно использовать EMQX — самый популярный MQTT‑брокер. Его возможности интеграции данных из коробки могут легко обеспечить работу решения MQTT‑ClickHouse и принести ряд преимуществ.
Например, мы можем обеспечить потоковую передачу данных в реальном времени с высокой производительностью и масштабируемостью. Также большое значение здесь имеет гибкость в преобразовании данных: EMQX предоставляет мощный механизм правил на базе SQL, позволяющий предварительно обрабатывать данные перед их передачей на хранение в ClickHouse.
На рисунке ниже представлена общая архитектура интеграции ClickHouse, EMQX и MQTT. В нашем случае кластер EMQX будет состоять из одной ноды, но для демонстрации работы этого вполне достаточно.

Теперь перейдем к практике и начнем с установки ClickHouse. На самом деле для примера с IoT, который мы будем разбирать далее, установка отдельного экземпляра необязательна, так как все уже есть в контейнерах, но я решил в образовательных целях описать сам процесс установки.
Установка ClickHouse
Процесс установки ClickHouse достаточно прост. Для начала устанавливаем необходимые пакеты и проводим манипуляции с ключами.
sudo apt-get install -y apt-transport-https ca-certificates dirmngr
sudo apt-key adv --keyserver hkp://keyserver.ubuntu.com:80 --recv 8919F6BD2B48D754
Подключаем необходимый репозиторий и выполняем обновление:
echo "deb
https://packages.clickhouse.com/deb
stable main" | sudo tee \
/etc/apt/sources.list.d/clickhouse.list
sudo apt-get update
Затем устанавливаем сервер и клиент и запускаем сервер:
sudo apt-get install -y clickhouse-server clickhouse-client
sudo service clickhouse-server start
Чтобы убедиться в работоспособности ClickHouse посмотрим статус сервиса:

Тестовый проект
Чтобы убедиться в работоспособности нашей интеграции мы развернем тестовый проект. Это будет демонстрационный образец автомобильной зарядной станции на базе IoT. Она использует EMQX для подключения к зарядным станциям и MQTT для передачи их статуса и данных. Данные по мониторингу и анализу количества заказов, продолжительности зарядки и состояния зарядки в режиме реального времени будут храниться в ClickHouse.
Здесь у нас все будет помещено в контейнеры. Клонируем файлы проекта:
git clone
https://github.com/emqx/mqtt-to-clickhouse
cd mqtt-to-clickhouse
Далее просто прибегнем к помощи docker compose:
docker-compose up –d
У нас должно развернуться пять компонентов: СУБД, EMQX, MQTTX и симулятор активности, и Grafana для визуализации.

Далее открываем в браузере веб интерфейс EMQX:
Учетные данные: admin/public. Далее нас попросят сменить пароль и предупредят об ограничении в 25 управляемых устройств. Всё, стенд готов.
В веб‑интерфейсе мы можем наблюдать имитацию подключения 5 зарядных пистолетов, основанных на протоколе OCPP, к EMQX. Они начали моделировать нагрузку с 48 часов назад, публикуя сообщения о начале и окончании зарядки (заказы на зарядку) через MQTT, и периодически сообщая такие данные, как мощность, напряжение, показания счетчика и продолжительность зарядки для каждого заказа в процессе зарядки.

Если мы выберем третий сверху белый значок, а затем опцию Flow Designer, то мы увидим два правила, которые созданы в EMQX для интеграции зарядной станции с ClickHouse. Далее мы посмотрим структуру этих сообщений и то, как EMQX записывает их в ClickHouse с помощью механизма правил и функции интеграции данных.

Вот пример сообщения начала зарядки StartTransaction:
{"messageType":"Call","action":"StartTransaction","payload":{"connectorId":"f788a12a-1b7d-4205-9d8e-37307aae366a","transactionId":"6b738341-b9f3-4d42-adb8-9696e0b8aba6","idTag":"No. 2","timestamp":1710801744456,"reservationId":null,"stackLevel":0,"meterStart":20184.628600000484}}
ClickHouse не нужно хранить это событие, оно сохраняется после завершения процесса зарядки. Поэтому на этом этапе нет необходимости создавать соответствующие правила.
В процессе зарядки зарядное оборудование периодически отправляет данные в MeterValues, предоставляя данные учета, такие как количество заряда, напряжение, ток и т. д. EMQX создаст правило для обработки данных MeterValues и запишет их в ClickHouse для анализа моделей использования электроэнергии.
Давайте создадим правило в EMQX, которое будет запрашивать из MeterValues значения payload.payload и record.meterValue. Для этого выбираем Rules, и в открывшемся списке правил нажимаем Create. Здесь нам надо указать имя правила и запрос следующего вида:
SELECT
payload.payload as record,
record.meterValue as meterValue
FROM
"mqttx/simulate/charge/+/MeterValues"
Далее снова Create. Сразу после создания правила можно нажать на него и убедиться в его работоспособности.

Здесь мы видим количество срабатываний правила и различные статистические характеристики.
После обработки данных с помощью правила EMQX использует действие, указанное в свойствах для записи данных о значениях счетчиков в режиме реального времени в ClickHouse.
EMQX поддерживает интеграцию данных с ClickHouse с помощью шаблонов SQL для вставки данных, которые могут хорошо адаптироваться к сложным структурам данных, что позволяет гибко записывать данные.
EMQX будет использовать следующий SQL‑шаблон для сохранения в ClickHouse каждой части данных о тарификации на основе данных от зарядной системы:
INSERT INTO charging_record (
connectorId,
transactionId,
timestamp,
voltage,
currentInput,
power,
meter,
currentTemperature
) VALUES (
'${record.connectorId}',
'${record.transactionId}',
toDateTime(${record.timestamp}/1000),
${meterValue.voltage},
${meterValue.currentInput},
${meterValue.power},
${meterValue.meter},
${meterValue.currentTemperature}
)
Аналогичные правила есть и для других симулируемых событий. Кстати сами события, которые мы в большом количестве сохраняем можно увидеть с помощью логов docker, выполнив:
$ docker logs -f mqttx

Однако количество событий будет мягко говоря большим, так что это не лучший способ для их просмотра. Удобнее использовать:
mqttx sub -t mqttx/simulate/charge/+/+
Также, просмотреть собранные данные о зарядных станциях можно на панели Grafana. Для этого откройте в браузере сайт http://IP_адрес:3000/ и войдите в систему, используя имя пользователя admin и пароль public.

После успешного входа в систему перейдите на страницу «Приборные панели» в навигационной панели главной страницы. Затем выберите приборную панель «ClickHouse — зарядная станция». На этой панели будут отображаться такие ключевые показатели, как текущие заказы на зарядной станции, состояние выручки, потребление электроэнергии, статистика отключенных зарядных пистолетов и почасовое потребление электроэнергии.
Заключение
Универсальность и возможности MQTT в режиме реального времени в сочетании с возможностями ClickHouse по хранению и анализу данных позволяют использовать данные IoT для повышения эффективности работы, снижения затрат и принятия решений на основе собранных данных.
Благодаря высокой скорости работы БД CkickHouse мы можем использовать данное решение в различных отраслях, требующих большой скорости обработки данных. Помимо IoT это решение также может использоваться в системах мониторинга, сбора логов, трейсов и других.
В завершение рекомендую открытые уроки по CkickHouse, которые проведут мои коллеги в Otus:
11 марта: «Миграция OLTP данных в ClickHouse и денормализация на практике». Узнать подробнее
18 марта: «Практикум по дедупликации данных в ClickHouse».
Узнать подробнее