Как стать автором
Обновить
70.26
Слёрм
Учебный центр для тех, кто работает в IT

Потоковый проект в режиме реального времени с использованием данных смартфона

Время на прочтение 18 мин
Количество просмотров 2.8K
Автор оригинала: Harrison Hoffman

Девайсы повсюду. Смартфоны, холодильники, дверные звонки, часы, медицинские датчики, системы безопасности и фитнес-трекеры — все это лишь некоторые из них, которые стали обычным явлением. Они постоянно записывают потенциально высокочастотную информацию и образуют сеть, известную как «Интернет вещей», или IoT, представляя обширные источники данных.

Хотя ресурсов по этой теме достаточно, немногие приводят примеры с реальными данными, доступными любому желающему. Переходя от статьи к статье, чтобы узнать о системах, управляемых событиями, и потоковых технологиях, таких как Apache Kafka, Harrison Hoffman наткнулся на приложение для смартфонов Sensor Logger, которое позволяет пользователям передавать данные с датчиков движения на свои телефоны. Такой вариант показался идеальным способом обучения, поэтому родился проект «smartphone_sensor_stream». Этот проект использует FastAPI, Kafka, QuestDB и Docker для визуализации данных датчиков в реальном времени на информационной панели.

В этой статье мы рассмотрим все основные компоненты этого проекта на продвинутом уровне. Все необходимое для локального запуска проекта доступно на GitHub, а краткая демонстрация доступна на YouTube. 

Архитектура проекта

Давайте начнем с рассмотрения архитектуры этого проекта. То есть с того, как именно данные будут поступать со смартфонов на панель мониторинга:

Каждый смартфон отправляет показания датчиков акселерометра, гироскопа и магнитометра через POST-запрос в приложение FastAPI. Производитель FastAPI асинхронно записывает показания датчика в раздел Kafka в виде JSON, данные из тела запроса. Каждый объект JSON обрабатывается процессом python, консумером, а потом сохраняется в таблице Quest DB. Как только данные попадают в базу данных, они становятся доступными для любой зависимой службы или приложения. В первой части этого проекта мы будем выводить показания датчиков на панель мониторинга, используя события, отправляемые сервером (SSE).

Структура каталогов и компоновка Docker Compose

Этот проект представляет собой набор небольших сервисов, которые взаимодействуют друг с другом для получения данных со смартфонов на информационную панель. Вот структура каталогов:

|-producer
 | |-app
 | | |-core
 | | | |-config.py
 | | |-__init__.py
 | | |-schemas
 | | | |-sensors.py
 | | |-main.py
 | |-requirements.txt
 | |-Dockerfile
 | |-entrypoint.sh
|-db_consumer
 | |-app
 | | |-core
 | | | |-config.py
 | | |-models
 | | | |-sensors.py
 | | |-db
 | | | |-ingress.py
 | | |-main.py
 | |-requirements.txt
 | |-Dockerfile
 | |-entrypoint.sh
|-ui_server
 | |-app
 | | |-core
 | | | |-config.py
 | | |-models
 | | | |-sensors.py
 | | |-static
 | | | |-js
 | | | | |-main.js
 | | |-db
 | | | |-data_api.py
 | | |-templates
 | | | |-index.html
 | | |-main.py
 | |-requirements.txt
 | |-Dockerfile
 | |-entrypoint.sh
|-README.md
|-.gitignore
|-.env
|-docker-compose.yml

Мы напишем три сервиса: продюсер, консумер и пользовательский интерфейс. Каждая служба упакована с помощью Dockerfile и организована при помощи docker-compose. Docker-compose позволяет нам запускать сервисы, которые мы пишем, с внешними сервисами, Kafka, Zookeeper и QuestDB, в виде отдельных контейнеров, подключенных через внутреннюю сеть. Все, что нам нужно для организации служб в этом проекте, находится в файле docker-compose:

version: '3.8'

services:
  zookeeper:
    image: bitnami/zookeeper:latest
    ports:
      - 2181:2181
    environment:
      - ALLOW_ANONYMOUS_LOGIN=yes

  kafka:
    image: bitnami/kafka:latest
    ports:
      - 9092:9092
      - 9093:9093
    environment:
      - KAFKA_BROKER_ID=1
      - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092
      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://127.0.0.1:9092
      - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
      - ALLOW_PLAINTEXT_LISTENER=yes
      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CLIENT:PLAINTEXT
      - KAFKA_CFG_LISTENERS=CLIENT://:9092
      - KAFKA_CFG_ADVERTISED_LISTENERS=CLIENT://kafka:9092
      - KAFKA_CFG_INTER_BROKER_LISTENER_NAME=CLIENT
    depends_on:
      - zookeeper

  questdb:
    image: questdb/questdb
    container_name: questdb
    restart: always
    expose:
      - 9000
      - 9009
      - 9003
    ports:
      - 8812:8812
    volumes:
      - ./questdb:/root/.questdb
    environment:
      - QDB_LOG_W_STDOUT_LEVEL=ERROR
      - QDB_LOG_W_FILE_LEVEL=ERROR
      - QDB_LOG_W_HTTP_MIN_LEVEL=ERROR
      - QDB_SHARED_WORKER_COUNT=2
      - QDB_PG_USER=${DB_USER}
      - QDB_PG_PASSWORD=${DB_PASSWORD}
      - QDB_TELEMETRY_ENABLED=false
      - QDB_CAIRO_SQL_COPY_ROOT=./
  producer:
    build:
      context: ./producer
      dockerfile: Dockerfile
    command: uvicorn main:app --workers 1 --host 0.0.0.0 --port 8000
    ports:
      - 8000:8000
    env_file:
      - .env
    depends_on:
      - kafka
      - zookeeper
  db_consumer:
    build:
      context: ./db_consumer
      dockerfile: Dockerfile
    command: python main.py
    env_file:
      - .env
    depends_on:
      - kafka
      - zookeeper
  ui_server:
    build:
      context: ./ui_server
      dockerfile: Dockerfile
    command: uvicorn main:app --workers 1 --host 0.0.0.0 --port 5000
    ports:
      - 5000:5000
    env_file:
      - .env
    depends_on:
      - db_consumer
  kafka-ui:
    image: provectuslabs/kafka-ui
    container_name: kafka-ui
    ports:
      - "18080:8080"
    restart: always
    environment:
      - KAFKA_CLUSTERS_0_NAME=local
      - KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS=kafka:9092
      - KAFKA_CLUSTERS_0_ZOOKEEPER=zookeeper:2181
    depends_on:
      - kafka
      - zookeeper
volumes:
  zookeeper_data:
    driver: local
  kafka_data:
    driver: local

Обратите внимание на четыре сервиса, которые мы не пишем сами (к счастью): Zookeeper, Kafka, QuestDB и Kafka-UI. Эти сервисы работают совместно с производителем, потребителем и пользовательским интерфейсом для создания проекта. Мы рассмотрим каждый сервис в отдельности, но сначала нам нужно разобраться с источником данных.

Регистратор датчиков

Sensor Logger — это приложение для iOS и Android, которое позволяет пользователям регистрировать показания датчиков, связанных с движением, со своих смартфонов. Пользователи могут просматривать показания датчиков в режиме реального времени, экспортировать данные в виде файлов и отправлять оперативные данные на сервер по протоколу HTTP. Этот проект использует функциональность HTTP для извлечения показаний датчиков. Чтобы настроить регистратор, для начала убедитесь, что выбраны все следующие датчики:

Мы будем получать показания акселерометра, гироскопа и магнитометра телефона. Далее нам нужно настроить параметры регистратора датчиков таким образом, чтобы он знал, куда именно отправлять данные:

Наиболее важным компонентом является обеспечение правильности “Push URL” — это конечная точка производителя FastAPI, которая принимает необработанные показания датчиков с помощью POST-запросов. Мы будем использовать наш компьютер в качестве сервера, поэтому нам нужно определить соответствующий IP-адрес. На Mac это находится в разделе Системные настройки -> Сеть.

Обратите внимание, что IP-адрес компьютера обычно уникален для сети WI-FI, что означает, что новый IP-адрес выделяется каждый раз, когда компьютер подключается к новой сети. Поэтому крайне важно, чтобы смартфон и главный компьютер находились в одной сети. Производитель FastAPI принимает показания датчиков на:

http://%7Byour_ip_address%7D:8000/phone-producer

Вставьте приведенный выше URL-адрес в поле «Push URL», и регистратор датчиков должен быть готов к работе!

Кафка и ZooKeeper

В этом разделе не будем вдаваться в подробности о Kafka, поскольку на платформе доступно много ресурсов. Однако можно сказать, что Kafka — это высокопроизводительный фреймворк для хранения и чтения потоковых данных. Фундаментальная структура данных Кафки — журнал. Приложения, которые записывают сообщения в журнал, называются продюсерами. В отличие от очереди, сообщения в журнале сохраняются даже после прочтения — это позволяет нескольким приложениям, известным как консумеры, читать одновременно с разных позиций. 

Для простоты в этом проекте есть только один продюсер — приложение FastAPI, которое записывает необработанные показания датчиков в Kafka, и один консумер, процесс на python, который считывает сообщения из Kafka и форматирует их в базе данных. Zookeeper — это сервис, который помогает управлять различными компонентами Kafka.

Для локального запуска Kafka и Zookeeper необходимы только два образа docker:

zookeeper:
    image: bitnami/zookeeper:latest
    ports:
      - 2181:2181
    environment:
      - ALLOW_ANONYMOUS_LOGIN=yes
      
kafka:
    image: bitnami/kafka:latest
    ports:
      - 9092:9092
      - 9093:9093
    environment:
      - KAFKA_BROKER_ID=1
      - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092
      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://127.0.0.1:9092
      - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
      - ALLOW_PLAINTEXT_LISTENER=yes
      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CLIENT:PLAINTEXT
      - KAFKA_CFG_LISTENERS=CLIENT://:9092
      - KAFKA_CFG_ADVERTISED_LISTENERS=CLIENT://kafka:9092
      - KAFKA_CFG_INTER_BROKER_LISTENER_NAME=CLIENT
    depends_on:
      - zookeeper
kafka-ui:
    image: provectuslabs/kafka-ui
    container_name: kafka-ui
    ports:
      - "18080:8080"
    restart: always
    environment:
      - KAFKA_CLUSTERS_0_NAME=local
      - KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS=kafka:9092
      - KAFKA_CLUSTERS_0_ZOOKEEPER=zookeeper:2181
    depends_on:
      - kafka
      - zookeeper

Мы будем использовать дистрибутив Kafka и Zookeeper от Bitmani. Kafka-UIImage позволяет пользователям взаимодействовать с кластерами Kafka через веб-приложение, но не требуется для этого проекта. Сохраните приведенный выше файл docker-compose как docker-compose.yml, запустите docker-compose, и графический интерфейс, подобный следующему, должен быть доступен по адресу http://localhost:18080/.

Информация о брокерах, темах и консумерах будет добавляться на эту панель мониторинга по мере добавления компонентов в систему.

Продюсер

Пока что у нас есть регистратор датчиков, настроенный для отправки необработанных показаний датчиков на сервер, и экземпляр Kafka готов к приему этих показаний. Следующий шаг — создать мост между исходными данными и Kafka-продюсером. Продюсером в этом проекте является быстрое API-приложение, которое принимает данные, отправляемые со смартфонов, и записывает их в журнал Kafka. Вот макет продюсера:

|-producer
 | |-app
 | | |-core
 | | | |-config.py
 | | |-__init__.py
 | | |-schemas
 | | | |-sensors.py
 | | |-main.py
 | |-requirements.txt
 | |-Dockerfile
 | |-entrypoint.sh

Мы не будем просматривать каждый файл в каталоге продюсера, поскольку все доступно на GitHub. Вместо этого давайте взглянем на main.py управляющий скрипт API производителя:

import json
from fastapi import FastAPI
import asyncio
from aiokafka import AIOKafkaProducer
from schemas.sensors import SensorReading, SensorResponse
from core.config import app_config
from loguru import logger

app = FastAPI(title=app_config.PROJECT_NAME)

loop = asyncio.get_event_loop()

producer = AIOKafkaProducer(
    loop=loop,
    client_id=app_config.PROJECT_NAME,
    bootstrap_servers=app_config.KAFKA_URL
)

@app.on_event("startup")
async def startup_event():
    await producer.start()

@app.on_event("shutdown")
async def shutdown_event():
    await producer.stop()

@app.post("/phone-producer/")
async def kafka_produce(data: SensorReading):

    """
    Produce a message containing readings from a smartphone sensor.
    Parameters
    ----------
    data : SensorReading
        The request body containing sensor readings and metadata.
    Returns
    -------
    response : SensorResponse
        The response body corresponding to the processed sensor readings
        from the request.
    """

    await producer.send(app_config.TOPIC_NAME, json.dumps(data.dict()).encode("ascii"))

    response = SensorResponse(
        messageId=data.messageId,
        sessionId=data.sessionId,
        deviceId=data.deviceId
    )

    logger.info(response)

    return response

Строка 9 создает экземпляр объекта Fast API. Строки 11-17 создают экземпляр объекта Kafka-продюсер с помощью Aiokafka. Aiokafka позволяет нам записывать сообщения в Kafka асинхронно. Это значит, что нам не нужно ждать, пока Kafka получит и обработает сообщение, в строке 45, прежде чем мы перейдем к следующей строке кода. Вместо этого Aiokafka отправляет текущее сообщение в Kafka и почти мгновенно готова выдать другое сообщение. Строки 27-55 определяют маршрут, по которому будут приниматься необработанные показания датчиков. Чтобы лучше понять это, давайте взглянем на формат тела запроса, который ожидает этот маршрут — аргумент data):

{"messageId": 20,
 "sessionId": "4bf3b3b9-a241-4aaa-b1d3-c05100df9976",
 "deviceId": "86a5b0e3-6e06-40e2-b226-5a72bd39b65b",
 "payload": [{"name": "accelerometeruncalibrated",
              "time": "1671406719721160400",
              "values": {"z": -0.9372100830078125,
                         "y": -0.3241424560546875, 
                         "x": 0.0323486328125}},
             {"name": "magnetometeruncalibrated",
              "time": "1671406719726579500",
              "values": {"z": -5061.64599609375,
                         "y": 591.083251953125,
                         "x": 3500.541015625}},
             {"name": "gyroscopeuncalibrated",
              "time": "1671406719726173400",
              "values": {"z": -0.004710599314421415,
                         "y": -0.013125921599566936,
                         "x": 0.009486978873610497}}, 
...
]}

Каждое тело запроса представляет собой объект JSON с записями «MessageId», «SessionID», «DeviceID» и «payload». Смартфоны однозначно идентифицируются по их «идентификатору устройства». Каждый раз, когда телефон начинает новый поток, для него создается новый «sessionId». Запись «MessageId» указывает порядок расположения сообщений в последовательности из текущего сеанса. Запись «payload»  представляет собой массив объектов JSON, которые содержат показания для каждого датчика, настроенного в Sensor Logger. Каждая запись «payload»  содержит имя датчика, время записи показаний по времени unix и само считывание. Мы работаем исключительно с трехосными датчиками, поэтому каждый датчик должен иметь показания «x», «y» и «z», соответствующие трем пространственным измерениям.

Маршрут FastAPI записывает необработанное тело запроса непосредственно в раздел Kafka, в строке 45, а метаданные регистрируются и возвращаются в строках 47-55. Этот маршрут доступен по адресу http://{your_ip_address}:8000/phone-producer, как описано в разделе «Регистратор датчиков» (Sensor Logger) . Все запросы проверяются объектом Pydantic SensorReading. То есть, любой запрос, который не соответствует формату регистратора датчиков, не будет обработан маршрутом:

from pydantic import BaseModel, validator
from datetime import datetime
from typing import List, Dict, Union


class SensorReading(BaseModel):

    """
    Base model class for incoming requests from smartphone sensors
    Attributes
    ----------
    messageId : int
        The identifier of a message in the current session
    sessionId : int
        The identifier of a session
    deviceId : int
        The identifier of the device sending the data
    payload : List[Dict[str:Union[str, int, Dict]]]
        The payload of the request containing sensor readings
        and metadata about the readings
    """

    messageId: int
    sessionId: str
    deviceId: str
    payload: List[Dict[str, Union[str, int, Dict]]]
    
class SensorResponse(BaseModel):

    """
    Base model class for the response of the sensor request endpoint
    Attributes
    ----------
    messageId : int
        The identifier of a message in the current session
    sessionId : int
        The identifier of a session
    deviceId : int
        The identifier of the device sending the data
    timestamp : str
        The timestamp when a sensor request was processed
    """
    messageId: str
    sessionId: str
    deviceId: str
    timestamp: str = ""

    @validator("timestamp", pre=True, always=True)
    def set_datetime_utcnow(cls, v):
        return str(datetime.utcnow())

Конфигурация для производителя обрабатывается с помощью переменных окружения, которые считываются объектом Pedantic Base Settings:

from pydantic import BaseSettings, validator

# Load environment variables into a pydantic BaseSetting object
class AppConfig(BaseSettings):

    PROJECT_NAME : str 
    KAFKA_HOST : str 
    KAFKA_PORT : str
    TOPIC_NAME : str 
    KAFKA_URL : str = ""

    class Config:

        case_sensitive = True

    @validator("KAFKA_URL", pre=True, always=True)
    def set_kafka_url(cls, v, values, **kwargs):
        return values['KAFKA_HOST'] + ":" + values['KAFKA_PORT'] 

app_config = AppConfig()

Переменные окружения хранятся в файле .env:

# Kafka config
PROJECT_NAME=phone_stream_producer
TOPIC_NAME=raw-phone-stream
KAFKA_HOST=kafka
KAFKA_PORT=9092

и передаются проидюсеру в файле docker-compose, строка 9 ниже:

producer:
    build:
      context: ./producer
      dockerfile: Dockerfile
    command: uvicorn main:app --workers 1 --host 0.0.0.0 --port 8000
    ports:
      - 8000:8000
    env_file:
      - .env
    depends_on:
      - kafka
      - zookeeper

Обратите внимание, что значение host в команде запуска равно 0.0.0.0. Это позволяет получить доступ к продюсеру по его IP-адресу с любого устройства в локальной сети.

Консумер

Теперь у нас есть инфраструктура для потоковой передачи данных датчиков со смартфонов в Fast API producer и Kafka. Следующим шагом является создание процесса — консумера, который считывает данные из Kafka и что-то делает с данными. Консумеры могут нести ответственность за все, что связано с чтением данных, хранящихся в журнале, и манипулированием ими. Для этого проекта они будут использоваться для преобразования необработанных показаний датчиков и сохранения их в базе данных временных рядов, известной как QuestDB. Вот структура каталогов для консумера:

|-db_consumer
 | |-app
 | | |-core
 | | | |-config.py
 | | |-models
 | | | |-sensors.py
 | | |-db
 | | | |-ingress.py
 | | |-main.py
 | |-requirements.txt
 | |-Dockerfile
 | |-entrypoint.sh

Перед созданием консумера нам нужно создать экземпляр Quest DB. Quest DB — это высокопроизводительная база данных временных рядов с открытым исходным кодом и API, совместимым с Postgres. Это означает, что мы можем запрашивать таблицы Quest DB так, как если бы они были таблицами Postgres, ориентированными на строки. При этом  мы пользуемся преимуществами таблиц, ориентированных на столбцы. Мы можем запустить QuestDB с помощью docker:

questdb:
    image: questdb/questdb
    container_name: questdb
    restart: always
    expose:
      - 9000
      - 9009
      - 9003
    ports:
      - 8812:8812
    volumes:
      - ./questdb:/root/.questdb
    environment:
      - QDB_LOG_W_STDOUT_LEVEL=ERROR
      - QDB_LOG_W_FILE_LEVEL=ERROR
      - QDB_LOG_W_HTTP_MIN_LEVEL=ERROR
      - QDB_SHARED_WORKER_COUNT=2
      - QDB_PG_USER=${DB_USER}
      - QDB_PG_PASSWORD=${DB_PASSWORD}
      - QDB_TELEMETRY_ENABLED=false
      - QDB_CAIRO_SQL_COPY_ROOT=./

Обратите внимание в строках 5-8, что мы открываем порты 9000, 9009 и 9003. Эти порты, в частности порт 9000, используются для записи данных в таблицы QuestDB. Включив эти порты в раздел expose, а не в раздел ports, мы гарантируем, что только контейнеры, работающие в одной сети Docker, могут записывать данные. Порт 8812 доступен за пределами сети Docker и используется для запроса данных. Переменные окружения QDB_PG_USER и QDB_PG_PASSWORD, наряду с другими переменными, связанными с QuestDB, задаются в файле .env:

# Questdb config 
DB_USER =admin 
DB_PASSWORD =quest 
DB_HOST =questdb 
DB_PORT = 8812 
DB_IMP_PORT = 9000 
DB_NAME =qdb 
DB_TRIAXIAL_OFFLOAD_TABLE_NAME =device_ off load

Управляющий код потребителя находится в main.py :

import asyncio
import json
from aiokafka import AIOKafkaConsumer
from core.config import app_config
from db.ingress import (create_connection,
                        create_triaxial_table,
                        write_sensor_payloads)


async def consume_messages() -> None:

    """
    Coroutine to consume smart phone sensor messages from a kafka topic
    """

    # Create a QuestDB connection
    connection = create_connection(host=app_config.DB_HOST,
                                   port=app_config.DB_PORT,
                                   user_name=app_config.DB_USER,
                                   password=app_config.DB_PASSWORD,
                                   database=app_config.DB_NAME)
    
    # Instantiate the event loop and consumer
    loop = asyncio.get_event_loop()
    consumer = AIOKafkaConsumer(
        app_config.TOPIC_NAME,
        loop=loop,
        client_id='all',
        bootstrap_servers=app_config.KAFKA_URL,
        enable_auto_commit=False,
    )

    await consumer.start()
    try:
        async for msg in consumer:
            print(msg.value)
            print('################')
            # Format each message in the log and write to QuestDB
            write_sensor_payloads(json.loads(msg.value), app_config.DB_IMP_URL, app_config.DB_TRIAXIAL_OFFLOAD_TABLE_NAME)
    finally:
        await consumer.stop()
        connection.close()

async def main():

    await consume_messages()

if __name__ == "__main__":

    # Create the table to store triaxial sensor data if it doesn't exist
    connection = create_connection(host=app_config.DB_HOST,
                                    port=app_config.DB_PORT,
                                    user_name=app_config.DB_USER,
                                    password=app_config.DB_PASSWORD,
                                    database=app_config.DB_NAME)

    create_triaxial_table(app_config.DB_TRIAXIAL_OFFLOAD_TABLE_NAME, connection)

    asyncio.run(main())

Здесь многое нужно распаковать, но основная логика изложена в строках 35-39. Консумер асинхронно перебирает сообщения в указанной теме Kafka. Этот цикл будет непрерывно обрабатывать сообщения до тех пор, пока тема обновляется. Сообщения форматируются и записываются в таблицу Quest DB с помощью следующей функции:

def write_triaxial_sensor_data(data:dict, server_url:str, table_name:str):
    
    """
    Write triaxial phone sensor data to database tables
    Parameters
    ----------
    data : dict
        The raw request data sent by the phone
    server_url : str
        The URL where sensor data will be written to
    table_name : str
        The name of the table to write to 
    """

    session_id = data['sessionId']
    device_id = data['deviceId']

    # Create an empty dict to store structured sensor from the payload
    structured_payload = {'device_id':[],
                            'session_id':[],
                            'device_timestamp':[],
                            'recorded_timestamp':[],
                            'sensor_name':[],
                            'x':[],
                            'y':[],
                            'z':[]
                            }
    
    for d in data['payload']:

        # Triaxial sensors
        if d.get("name") in DEVICE_TO_DB_SENSOR_NAME.keys():

            structured_payload['device_id'].append(device_id)
            structured_payload['session_id'].append(session_id)
            structured_payload['device_timestamp'].append(str(datetime.fromtimestamp(int(d["time"]) / 1000000000)))
            structured_payload['recorded_timestamp'].append(str(datetime.utcnow()))
            structured_payload['sensor_name'].append(DEVICE_TO_DB_SENSOR_NAME.get(d.get("name")))
            structured_payload['x'].append(d["values"]["x"])
            structured_payload['y'].append(d["values"]["y"])
            structured_payload['z'].append(d["values"]["z"])  

    output = StringIO()
    pd.DataFrame(structured_payload).to_csv(output, sep=',', header=True, index=False)
    output.seek(0)
    contents = output.getvalue()
    csv = {'data': (table_name, contents)}
    response = requests.post(server_url, files=csv)

Вся полезная нагрузка форматируется и сохраняется в виде CSV-файла в памяти с помощью StringIO. Оттуда CSV отправляется через POST-запрос на порт записи Quest DB. Это облегчает быструю запись всей полезной нагрузки в QuestDB с использованием одного подключения и запроса.

Таблица, в которой хранятся данные датчиков, предназначена для обеспечения баланса между быстрой записью и быстрым чтением. Вот запрос для создания таблицы в QuestDB:

СОЗДАЙТЕ  ТАБЛИЦУ , ЕСЛИ НЕ  СУЩЕСТВУЕТ device_offload ( 
    device_id TEXT, 
    session_id TEXT, 
    device_timestamp TEXT, 
    record_timestamp TEXT, 
    sensor_name TEXT, 
    x REAL , 
    y REAL , 
    z REAL
 )

Поля device_id и session_id берутся непосредственно из первых двух записей необработанной полезной нагрузки, как обсуждалось ранее. device_timestamp — это время, когда на устройстве была собрана отдельная выборка данных датчика, в то время как recorded_timestamp — это время, когда выборка попала в базу данных. Благодаря этому мы можем измерить, сколько времени требуется, чтобы выборка данных попала с устройства в базу данных.

Поскольку мы работаем только с трехосными датчиками, мы можем сохранить их значения в полях x, y и z и указать, какому датчику принадлежит каждый образец, в поле sensor_name. Эта схема позволяет нам записывать данные с каждого датчика в полезной нагрузке в одну и ту же таблицу за одну запись, в отличие от записи в несколько таблиц, требующих многократной записи.

Важно отметить, что в реальных условиях эта таблица QuestDB, скорее всего, не будет конечным местом хранения данных. Вместо этого таблица будет действовать как буфер, позволяя приложениям легко получать доступ к данным в структурированном формате. Данные высокочастотных датчиков, в нашем случае 50 Гц, быстро растут, и их становится трудно поддерживать. Скорее всего, мы бы внедрили еще один конвеер Kafka, отвечающий за перемещение старых данных из QuestDB в архив.

Последним шагом для этого консумера является добавление соответствующих команд docker-compose:

db_consumer:
    build:
      context: ./db_consumer
      dockerfile: Dockerfile
    command: python main.py
    env_file:
      - .env
    depends_on:
      - kafka
      - zookeeper

Дэшборд

У нас есть все для визуализации данных датчика в том виде, в каком они записаны в QuestDB. Чтобы сделать это, нам нужно создать другое приложение Fast API, которое опрашивает базу данных и использует события, отправляемые сервером (SSE), для обновления HTML-страницы. Вот последняя структура каталогов, которую нужно изучить:

|-ui_server
 | |-app
 | | |-core
 | | | |-config.py
 | | |-models
 | | | |-sensors.py
 | | |-static
 | | | |-js
 | | | | |-main.js
 | | |-db
 | | | |-data_api.py
 | | |-templates
 | | | |-index.html
 | | |-main.py
 | |-requirements.txt
 | |-Dockerfile
 | |-entrypoint.sh

Как и прежде, main.py — драйвер для этого приложения:

import asyncio
import json
import logging
import sys
from fastapi import FastAPI
from fastapi.requests import Request
from fastapi.middleware.cors import CORSMiddleware
from fastapi.staticfiles import StaticFiles
from fastapi.responses import HTMLResponse
from fastapi.templating import Jinja2Templates
from sse_starlette.sse import EventSourceResponse
from core.config import app_config
from db.data_api import create_connection, get_recent_triaxial_data, DEVICE_TO_DB_SENSOR_NAME
from models.sensors import SensorName


CONNECTION = create_connection(app_config.DB_HOST,
                               app_config.DB_PORT,
                               app_config.DB_USER,
                               app_config.DB_PASSWORD,
                               app_config.DB_NAME )

logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")
logger = logging.getLogger(__name__)

app = FastAPI()
origins = [
   f"http://localhost:{app_config.UI_PORT}",
   f"http://127.0.0.1:{app_config.UI_PORT}",
   f"http://0.0.0.0:{app_config.UI_PORT}"
]

app.add_middleware(
    CORSMiddleware,
    allow_origins=origins,
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

app.mount("/static", StaticFiles(directory="static"), name="static")
templates = Jinja2Templates(directory="templates")
templates = Jinja2Templates(directory="templates")

@app.get("/", response_class=HTMLResponse)
async def index(request: Request) -> templates.TemplateResponse:
    return templates.TemplateResponse("index.html", {"request": request})


@app.get('/chart-data')
async def message_stream(request: Request):
    def new_messages():
        yield True
    async def event_generator():
        while True:
           
            if await request.is_disconnected():
                break
                
            if new_messages():

                data = get_recent_triaxial_data(connection=CONNECTION, 
                                                table_name=app_config.DB_TRIAXIAL_OFFLOAD_TABLE_NAME,
                                                sensor_name=DEVICE_TO_DB_SENSOR_NAME[SensorName.ACC.value],
                                                sample_rate=app_config.PHONE_SAMPLE_RATE,
                                                num_seconds=1,
                                                max_lookback_seconds=60)                

                message_data = {}

                for device_id in data['device_id'].unique():

                    data_device = data[data['device_id']==device_id]

                    message_data[device_id] = {
                                    'time':[t[11:] for t in list(data_device['recorded_timestamp'].astype(str).values)],
                                    'x':list(data_device['x'].astype(float).values),
                                    'y':list(data_device['y'].astype(float).values),
                                    'z':list(data_device['z'].astype(float).values)
                                }

                message = json.dumps(message_data)
                yield {
                        "event": "new_message",
                        "id": "message_id",
                        "retry":1500000,
                        "data": message
                }

            await asyncio.sleep(0.1)

    return EventSourceResponse(event_generator())

Каждые 0,1 секунды, строка 90, функция message_stream запрашивает у базы данных самую последнюю секунду показаний датчика, строка 62. В этой итерации панели мониторинга запрашиваются и отображаются только данные акселерометра. Аргументу max_lookback_seconds присвоено значение 60 — это означает, что все телефоны, которые не отправляли данные за последние 60 секунд, будут отфильтрованы в запросе. Следовательно, на этой панели мониторинга будут отображаться данные акселерометра за последнюю секунду для всех телефонов, отправивших данные за последнюю минуту. Вот логика запроса:

def get_recent_triaxial_data(connection:pg.connect,
                             table_name:str,
                             sensor_name:str,
                             sample_rate:int,
                             num_seconds:float,
                             max_lookback_seconds:float):

    """
    Query the most recent data from a triaxial smartphone sensor.
    Parameters
    ----------
    connection:pg.connect
        A postgres connection object
    table_name:str
        The table where the sensor data is stored
    sensor_name:str
        The name of the sensor to query
    sample_rate:int
        The sampling rate of the sensor (in hz)
    num_seconds:float
        The number of seconds of data to pull
    max_lookback_seconds:float
        The maximum amount seconds to look for data from.
        For instance, if a device stopped producing data
        10 seconds ago, and max_lookback_seconds = 10,
        then data for this device will be ignored.
    Returns
    -------
    A DataFrame with the requested sensor data
    """

    # The number of samples to get
    num_samples:int = int(sample_rate*num_seconds)

    query:str = f"""with tmp as (select device_id,
                                       recorded_timestamp,
                                       x,
                                       y,
                                       z,
                                       row_number() over(partition by device_id order by
                                                        recorded_timestamp desc) as rn
                                        from {table_name}
                                        where sensor_name = '{sensor_name}'
                                        and recorded_timestamp::timestamp >= dateadd('s', -{int(max_lookback_seconds)}, now())
                               )
                              select * from tmp 
                              where rn <= {num_samples}
                              """

    return pd.read_sql(query, connection)

Добавьте необходимые строки в файл docker-compose:

ui_server:
    build:
      context: ./ui_server
      dockerfile: Dockerfile
    command: uvicorn main:app --workers 1 --host 0.0.0.0 --port 5000
    ports:
      - 5000:5000
    env_file:
      - .env
    depends_on:
      - db_consumer

И панель мониторинга должна быть доступна по адресу http://localhost:5000:

В этой статье представлен общий обзор проекта потоковой передачи в реальном времени с источником данных, к которому имеет доступ большинство людей (смартфоны). Хотя здесь много движущихся частей, мы просто заглянули в мир потоковой передачи данных.

https://slurm.club/3HmEXDx
Но о Kafka можно узнать больше.

Углублённый курс с практикой на Java или Golang и платформой Spring+Docker+Postgres переведёт вас на новый уровень владения инструментом.

На курсе «Apache Kafka для разработчиков» мы обсудим:

  • неправильное использование Кафка и отсутствие коммитов в ней;

  • ваши кейсы о проблемах при работе с Apache Kafka;

  • опыт создания Data Lake на ~80 ТБ с помощью Apache Kafka;

  • особенности эксплуатации kafka с retention в 99999999.

Старт потока — 12 мая 2023. Присоединяйтесь кобучению сейчас 👉 https://slurm.club/3HmEXDx

Полезные ссылки для изучения:

  1. Apache Kafka

  2. Event-Driven Architectures — The Queue vs The Log

  3. Lucidchart

  4. Kafka Poc using FastApi

  5. geo-stream-kafka

  6. 18 Most Popular IoT Devices in 2022

  7. FastAPI

  8. QuestDB

  9. Row vs Column Oriented Databases

Теги:
Хабы:
+8
Комментарии 1
Комментарии Комментарии 1

Публикации

Информация

Сайт
slurm.io
Дата регистрации
Дата основания
Численность
51–100 человек
Местоположение
Россия
Представитель
Антон Скобин