Как стать автором
Обновить
602.83
OTUS
Цифровые навыки от ведущих экспертов

Трассировка распределенных IoT приложений на EMQX

Уровень сложностиПростой
Время на прочтение7 мин
Количество просмотров1.5K

Автоматизация вокруг интернета вещей, использующая информацию телеметрии для управления процессами, умноженная на распределенную микросервисную архитектуру управляющих систем, усложняет диагностику процессов и их выполнения на различных этапах. Сейчас в целом для повышения прозрачности мониторинга распределенных систем развивается технологическая платформа для Observability, включающая в себя сбор логов микросервисов (Logging), извлечение операционных метрик по использованию ресурсов (Metrics) и выполнение замеров времени обработки и отслеживание цепочки обработки (Tracing). Для трассировки систем, использующих REST или gRPC запросы есть множество решений, которые нередко интегрированы в инструменты API Gateway (например в Tyk API Gateway, Kong, Krakend и другие), и для них даже есть инструменты автоматической генерации (например, tracetest, который был рассмотрен в этой статье). Но IoT-системы основаны на передаче сообщений и, во многих случаях, создаются вокруг брокеров очередей сообщений (часто над протоколом передачи телеметрии MQTT) и трассировка должна быть реализована иначе. В этой статье мы создадим простое приложений на основе MQTT с использованием брокера сообщений EMQX и настроим наблюдение через встроенные механизмы трассировки.

EMQX является специализированным MQTT-брокером, который может подписываться на поток событий из внешних источником и может использоваться для координации запуска отдельных микросервисов для реализации логика процесса реакции на данные телеметрии. EMQX может быть запущен в режиме кластера для отказоустойчивости, а также поддерживает установку расширений (plugins), создаваемых на Erlang. Также могут быть настроены шлюзы для взаимодействия с другими протоколами: CoAP, ExProto, LwM2M, MQTT-SN и STOMP или в произвольный адрес через WebHook.

Начнем с установки брокера, он может быть запущен из пакетов/исполняемых файлов для Ubuntu/CentOS/RHEL/MacOS/Windows, через Kubernetes Operator, или через Docker:

docker run -d --name emqx -p 1883:1883 -p 8083:8083 -p 8084:8084 -p 8883:8883 -p 18083:18083 emqx/emqx:5.0.26

Для тестирования MQTT-брокера можно использовать инструмент командной строки mqttx-cli:

docker run -itd --net host --name subscriber emqx/mqttx-cli sub -t room/temperature
docker run -it --net host emqx/mqttx-cli pub -t room/temperature -m "23.2"
docker logs -f subscriber

Также доступен веб-интерфейс для наблюдения за ресурсами по адресу http://localhost:18083 (при первом входе используется логин admin и пароль public).

Для доступа из JVM-приложения также может использоваться клиентская библиотека (например, от HiveMQ):

implementation("com.hivemq:hivemq-mqtt-client:1.3.0")

Создадим три микросервиса, которые будут взаимодействовать через EMQX в соответствии со следующем схемой информационных потоков:

MQTT(3).png
MQTT(3).png

Запускать все компоненты системы будем на одной машине . Создадим реализацию микросервиса логирования, для этого добавим необходимые зависимости в build.gradle.kts:

repositories {
    mavenCentral()
    maven(url = "https://jitpack.io")
}

dependencies {
    implementation("com.github.hivemq.hivemq-mqtt-client:hivemq-mqtt-client:develop-SNAPSHOT")

    implementation(platform("com.github.hivemq.hivemq-mqtt-client:hivemq-mqtt-client-websocket:develop-SNAPSHOT"))
    implementation(platform("com.github.hivemq.hivemq-mqtt-client:hivemq-mqtt-client-proxy:develop-SNAPSHOT"))
    implementation(platform("com.github.hivemq.hivemq-mqtt-client:hivemq-mqtt-client-epoll:develop-SNAPSHOT"))
    implementation("com.github.hivemq.hivemq-mqtt-client:hivemq-mqtt-client-reactor:develop-SNAPSHOT")

    implementation("org.slf4j:slf4j-simple:2.0.3")
    implementation("io.github.oshai:kotlin-logging-jvm:4.0.0-beta-29")

    testImplementation(kotlin("test"))
}

Реализуем подписку на топик room/events:

fun main(args: Array<String>) {
    val log = KotlinLogging.logger("console")
    val client = Mqtt5Client.builder()
        .serverHost("localhost")
        .addConnectedListener {
            log.info { "Logger microservice connected" }
        }.addDisconnectedListener {
            log.info { "Logger microservice disconnected" }
        }.identifier(UUID.randomUUID().toString()).buildBlocking()
    client.connect()
    client.toAsync().subscribeWith().topicFilter("room/events").qos(MqttQos.EXACTLY_ONCE).callback {
        val value = String(it.payloadAsBytes)
        log.info { "Message: $value" }
    }.send()
}

Микросервис Analyzer будет также использовать значение температуры и отправлять управляющую команду в топики room/events и control.

fun main(args: Array<String>) {
    var lastState = ""
    val log = KotlinLogging.logger("Analyzer")
    val client = Mqtt5Client.builder()
        .serverHost("localhost")
        .addConnectedListener {
            log.info { "Analyzer microservice connected" }
        }.addDisconnectedListener {
            log.info { "Analyzer microservice disconnected" }
        }.identifier(UUID.randomUUID().toString()).buildBlocking()
    client.connect()
    client.toAsync().subscribeWith().topicFilter("room/temperature").qos(MqttQos.EXACTLY_ONCE).callback {
        val temperature = String(it.payloadAsBytes).toDoubleOrNull()
        log.info { "Temperature = $temperature" }
        val event = when {
            temperature!=null && temperature<23 -> "heat"
            temperature!=null && temperature>=26 -> "cool"
            else -> null
        }
        if (event!=lastState) {
            event?.let {
                lastState = event
                log.info { "Send event $event" }
                client.toAsync().publishWith().topic("room/events").qos(MqttQos.AT_LEAST_ONCE).payload(it.toByteArray()).send()
                client.toAsync().publishWith().topic("control").qos(MqttQos.AT_LEAST_ONCE).payload(it.toByteArray()).send()
            }
        }
    }.send()
}

Микросервис Control будет выглядеть аналогично Analyzer, с отправкой всех полученных сообщений в очередь управления для кондиционера. Добавим дополнительную задержку перед пересылкой сообщения, для этого используем библиотеку поддержки корутин для Kotlin-проекта.

fun main(args: Array<String>) {
    val log = KotlinLogging.logger("Control")
    val client = Mqtt5Client.builder()
        .serverHost("localhost")
        .addConnectedListener {
            log.info { "Control microservice connected" }
        }.addDisconnectedListener {
            log.info { "Control microservice disconnected" }
        }.identifier(UUID.randomUUID().toString()).buildBlocking()
    client.connect()
    client.toAsync().subscribeWith().topicFilter("control").qos(MqttQos.EXACTLY_ONCE).callback {
        log.info { "Get command: ${String(it.payloadAsBytes)}"}
        runBlocking {
            delay(2000)
            log.info { "Send to conditioner" }
            client.toAsync().publishWith().topic("control/conditioner").qos(MqttQos.AT_LEAST_ONCE).payload(it.payloadAsBytes).send()
        }
    }.send()}

Теперь перейдем к настройке мониторинга. Прежде всего можно добавить мониторинг на зарегистрированные топики (появятся после запуска наших сервисов) на странице http://localhost:18083/#/subscriptions/topics.

изображение.png
изображение.png

Дальше можно зарегистрировать Trace Log по topic или ClientId. К каждому подключенному приложению присоединяется уникальный идентификатор, их можно посмотреть на странице http://localhost:18083/#/clients. Например, идентификатор клиента, который подписан на topic control (микросервис Control) может быть da03906b-66ef-4a11-a978-7592d97bf676. Создадим трассировку на странице http://localhost:18083/#/log-trace. Это позволит отследить момент времени, когда сообщение было добавлено в topic или извлечено из него (при создании лога по ClientId), например:

2023-06-17T20:37:17.871989+00:00 [MQTT] 6e57f382-888e-4012-bc3b-0f4158bfc4cf@172.17.0.1:52472 msg: mqtt_packet_sent, packet: PUBLISH(Q1, R0, D0, Topic=control, PacketId=2, Payload=heat)
2023-06-17T20:37:17.876491+00:00 [MQTT] 6e57f382-888e-4012-bc3b-0f4158bfc4cf@172.17.0.1:52472 msg: mqtt_packet_received, packet: PUBACK(Q0, R0, D0, PacketId=2, ReasonCode=0)
2023-06-17T20:37:19.883670+00:00 [MQTT] 6e57f382-888e-4012-bc3b-0f4158bfc4cf@172.17.0.1:52472 msg: mqtt_packet_received, packet: PUBLISH(Q1, R0, D0, Topic=, PacketId=1, Payload=heat)
2023-06-17T20:37:19.883863+00:00 [PUBLISH] 6e57f382-888e-4012-bc3b-0f4158bfc4cf@172.17.0.1:52472 msg: publish_to, topic: control/conditioner, payload: heat

Первые два сообщения связаны с извлечением сообщения из топика, следующие два - отправка пакета и публикация сообщения в topic (control/conditioner). Если добавить такие обработчики к каждому Client и сделать дополнительную обработку сохраненных логов трассировки можно было отсле:дить время обработки на каждом шаге обработки. Но здесь нет никакого идентификатора, по которому можно было бы отследить порядок обработки конкретного сообщения. Самое простое решение - можно кодировать сообщение в JSON и добавить в него дополнительный токен на первом этапе обработки телеметрии и передается между последующими сообщениями. Для этого добавим поддержку kotlinx.serialization (плагин для gradle и implementation) и будем кодирование сообщение с токеном трассировки (в Analyzer):

@Serializable
data class TraceableMessage(val traceId:String, val message:String)

//
  val token = UUID.randomUUID().toString()
  val message = Json.encodeToString(TraceableMessage(traceId=token, message=String(it.toByteArray())))
  client.toAsync().publishWith().topic("room/events").qos(MqttQos.AT_LEAST_ONCE).payload(message.toString().toByteArray()).send()

Теперь во всей цепочке сообщений будет доступен уникальный токен для отслеживания обработки одного замера температуры.

2023-06-17T21:13:31.874551+00:00 [MQTT] 3c635788-77bf-4439-a8f1-01eaf6b9e74f@172.17.0.1:37276 msg: mqtt_packet_sent, packet: PUBLISH(Q1, R0, D0, Topic=control, PacketId=2, Payload={"traceId":"0276260a-2371-46c6-8c4e-cdc509971ba4","message":"cool"})
2023-06-17T21:13:31.881634+00:00 [MQTT] 3c635788-77bf-4439-a8f1-01eaf6b9e74f@172.17.0.1:37276 msg: mqtt_packet_received, packet: PUBACK(Q0, R0, D0, PacketId=2, ReasonCode=0)
2023-06-17T21:13:33.887715+00:00 [MQTT] 3c635788-77bf-4439-a8f1-01eaf6b9e74f@172.17.0.1:37276 msg: mqtt_packet_received, packet: PUBLISH(Q1, R0, D0, Topic=, PacketId=1, Payload={"traceId":"0276260a-2371-46c6-8c4e-cdc509971ba4","message":"cool"})
2023-06-17T21:13:33.887924+00:00 [PUBLISH] 3c635788-77bf-4439-a8f1-01eaf6b9e74f@172.17.0.1:37276 msg: publish_to, topic: control/conditioner, payload: {"traceId":"0276260a-2371-46c6-8c4e-cdc509971ba4","message":"cool"}

Файлы логов могут извлекаться и анализироваться автоматически и создаваться trace-последовательности в формате OpenTracing (сейчас входит в OpenTelemetry). Например можно использовать Java-библиотеку, которые объединяют все запросы одной последовательности в span и создает subspans для каждого этапа обработки на основе анализа Trace Log. Пример такой реализации может быть найден в github-репозитории (там же размещены исходные тексты рассмотренных выше микросервисов). Также отслеживание может быть реализовано в виде плагина (за основу может быть взять шаблон) и установлено в EMQX.

Таким образом, использование встроенных возможностей трассировки в брокере сообщений с добавлением токена отслеживания помогает в диагностике потенциальных проблем в распределенных системах.

В заключение рекомендую открытый урок, посвященный декомпозиции системы на микросервисы по бизнес-аспектам и Event Storming. О чем пойдет речь на этом занятии:

  • Декомпозиция по бизнес-аспектам;

  • Определение системных операций;

  • Разбиение операций на сервисы;

  • Разбиение по системным аспектам и DDD подход;

  • Методология PARL и Event Storming;

Записаться на урок можно по ссылке.

Теги:
Хабы:
Если эта публикация вас вдохновила и вы хотите поддержать автора — не стесняйтесь нажать на кнопку
Всего голосов 8: ↑8 и ↓0+8
Комментарии0

Публикации

Информация

Сайт
otus.ru
Дата регистрации
Дата основания
Численность
101–200 человек
Местоположение
Россия
Представитель
OTUS