Pull to refresh
129.41
ITSumma
Эксперты в производительности

Картографирование шума с помощью KSQL, Raspberry Pi и радиоприёмника

Reading time 5 min
Views 8.9K
Original author: Simon Aubury


На первый взгляд, в этой истории есть всё, чтобы заслужить статус романтичного поста накануне 8 марта: самолёты, любовь, чуточка шпионажа и, наконец, котик (точнее, кошка). Трудно представить, что всё это имеет самое непосредственное отношение к Kafka, KSQL и эксперименту «как в домашних условиях с помощью информационных технологий найти самый шумный самолёт». Трудно, но придётся: именно такой эксперимент провёл Саймон Обьюри, а мы перевели статью его авторства с описанием всех подробностей процесса.

Наша новая кошка по имени Снежинка просыпается рано. Её будят звуки самолётов, пролетающих над нашим домом. А что если бы я, используя Apache Kafka, KSQL и Raspberry Pi, смог определить, какой именно самолёт не даёт моей кошке спать? Хорошо бы еще создать занятную панель слежения, на которую кошка могла бы переключить свое внимание — и дать мне ещё немножко поспать.

В общих чертах



Переносим самолёты с неба в графики с помощью Kafka и KSQL

Самолёты определяют свое местоположение с помощью GPS приёмников. Бортовой передатчик периодически сообщает локацию, идентификационный номер, высоту и скорость корабля, используя короткие радиопередачи. Эти передачи вещательного автоматического зависимого наблюдения (АЗН-В) являются по сути пакетами данных, открытыми для доступа с наземных станций.

Один микрокомпьютер, такой как Raspberry Pi, и несколько вспомогательных компонентов — это всё, что требуется для получения сообщений от бортовых передатчиков самолётов, снующих над моим домом.

Бортовые сигналы самолётов выглядят, как запутанный клубок сообщений и требуют систематизации. Распознать эти хаотичные потоки данных — это всё равно, что подслушать беседу на шумной вечеринке. Поэтому, чтобы найти самолёт, который тревожит мою кошку, я решил использовать сочетание Kafka и KSQL.


Разбуженная кошка и Raspberry Pi

Сбор показаний АЗН-В с помощью Raspberry Pi


Для сбора бортовых передач я использовал Raspberry Pi и RTL2832U — USB-модем, который изначально продавался как устройство для просмотра цифрового ТВ на компьютере. На Raspberry Pi я установил dump1090 — программу, которая получает данные с АЗН-В через RTL2832U с помощью небольшой антенны.


Мой программный радиоприёмник из Raspberry Pi и RTL2832U

Преобразуем сигналы АЗН-В в темы Kafka


Теперь, когда я получил поток необработанных сигналов АЗН-В, нам следует обратить внимание на трафик. Raspberry Pi не имеет достаточной мощности для серьезных вычислений, поэтому мне пришлось передать обработку данных моему локальному кластеру на Kafka.



Получаемые сообщения делятся либо на сообщения о локации, либо на сообщения об идентификации борта. Локация выглядит как сообщение вида: «борт 7c6db8 летит на высоте 6250 футов в координате -33.8,151.0». Информация об идентификации борта будет выглядит как: «борт 7c451c совершает полет по маршруту QJE1726».

Небольшой Python-скрипт для моей Raspberry Pi разделяет все входящие сообщения АЗН-В. Я использовал прокси-сервер Confluent Rest Proxy для распределения данных с Raspberry Pi в темы location-topic и ident-topic на Kafka. Прокси-сервер предоставляет RESTful интерфейс для кластера Kafka, что позволяет легко создавать сообщения путём простого REST-вызова на Pi.



Я хотел понять, какие самолёты летают над моей крышей и по каким маршрутам. База данных OpenFlights позволяет сопоставить код авиаборта, например 7C6DB8, присвоенный Международной организацией гражданской авиации (ИКАО), с типом самолёта — в нашем случае «Боинг-737». Я загрузил данные моего картографирования в тему icao-to-aircraft.

KSQL предоставляет «SQL-движок», который даёт возможность обрабатывать данные в режиме реального времени по темам Apache Kafka. Например, чтобы найти бортовой код 7C6DB8, мы можем написать следующий запрос:

CREATE TABLE icao_to_aircraft WITH (KAFKA_TOPIC='ICAO_TO_AIRCRAFT_REKEY', VALUE_FORMAT='AVRO', KEY='ICAO'); 

ksql> SELECT manufacturer, aircraft, registration \ 
FROM icao_to_aircraft \ 
WHERE icao = '7C6DB8'; 
Boeing | B738 | VH-VYI

Аналогично, в тему callsign-details я загрузил позывные (т. е. QFA563, это рейс авиакомпании Qantas из Брисбена в Сидней).

CREATE TABLE callsign_details WITH (KAFKA_TOPIC='CALLSIGN_DETAILS_REKEY', VALUE_FORMAT='AVRO', KEY='CALLSIGN'); 

ksql> SELECT operatorname, fromairport, toairport \ 
FROM callsign_details \ 
WHERE callsign = 'QFA563'; 

Qantas | Brisbane | Sydney

Теперь давайте взглянем на поток данных location-topic. Тут мы можем наблюдать постоянный поток входящих сообщений о местоположении пролетающего самолёта.

kafka-avro-console-consumer --bootstrap-server localhost:9092 --property --topic location-topic 

{"ico":"7C6DB8","height":"6250","location":"-33.807724,151.091495"}

Запрос на KSQL будет выглядеть так:
ksql> SELECT TIMESTAMPTOSTRING(rowtime, 'yyyy-MM-dd HH:mm:ss'), \ 
ico, height, location \ 
FROM location_stream \ 
WHERE ico = '7C6DB8'; 

2018-09-19 07:13:33 | 7C6DB8 | 6250.0 | -33.807724,151.091495 

KSQL: гармонизация потоков...


Настоящая ценность KSQL заключается в возможности объединения входящих потоков данных о местоположении с исходными данными тем (см. 03_ksql.sql) — то есть в добавлении полезных сведений к необработанному потоку данных. Это очень похоже на «left join» в традиционной БД. Результатом является еще одна тема Kafka, созданная без единой строчки кода на Java!

source>CREATE STREAM location_and_details_stream AS \
SELECT l.ico, l.height, l.location, t.aircraft \
FROM location_stream l \
LEFT JOIN icao_to_aircraft t ON l.ico = t.icao;
К тому же вы получаете запрос KSQL. Поток данных будет выглядеть так:

ksql> SELECT TIMESTAMPTOSTRING(rowtime, 'yy-MM-dd HH:mm:ss') \ 
, manufacturer \ 
, aircraft \ 
, registration \ 
, height \ 
, location \ 
FROM location_and_details_stream; 
18-09-27 09:53:28 | Boeing | B738 | VH-YIA | 7225 | -33.821,151.052 
18-09-27 09:53:31 | Boeing | B738 | VH-YIA | 7375 | -33.819,151.049 
18-09-27 09:53:32 | Boeing | B738 | VH-YIA | 7425 | -33.818,151.048 

Помимо этого, мы можем объединить входящий поток callsign с фиксированной темой callsign_details:

CREATE STREAM ident_callsign_stream AS \ 
SELECT i.ico \ 
, c.operatorname \ 
, c.callsign \ 
, c.fromairport \ 
, c.toairport \ 
FROM ident_stream i \ 
LEFT JOIN callsign_details c ON i.indentification = c.callsign; 

ksql> SELECT TIMESTAMPTOSTRING(rowtime, 'yy-MM-dd HH:mm:ss') \ 
, operatorname \ 
, callsign \ 
, fromairport \ 
, toairport \ 
FROM ident_callsign_stream ; 
18-09-27 13:33:19 | Qantas | QFA926 | Sydney | Cairns 
18-09-27 13:44:11 | China Eastern | CES777 | Kunming | Sydney 
18-09-27 14:00:54 | Air New Zealand | ANZ110 | Sydney | Auckland 

Теперь у нас есть две информативные темы:

  1. location_and_details_stream, которая обеспечивает поток обновленной информации о местоположении и скорости самолёта;
  2. ident_callsign_stream, которая описывает подробности рейса, в том числе авиакомпанию и пункт назначения.

С этими постоянно обновляемыми темами мы можем создать несколько отличных обзорных панелей. Я использовал Kafka Connect, чтобы выгрузить темы Kafka, заполняемые KSQL, в Elasticsearch (полные скрипты здесь).

Обзорная панель Kibana


Вот пример обзорной панели, демонстрирующей местоположение самолёта на карте. Кроме того, вы можете увидеть диаграмму по авиакомпаниям, график высоты полета и облака слов по основным пунктам назначения. Тепловая карта показывает районы сосредоточения самолётов, то есть области с наивысшим уровнем шума.



Назад, к кошке


Сегодня кошка разбудила меня в районе 6 часов утра. Может ли KSQL помочь мне найти тот самолёт, который пролетал в это время над моим домом на высоте меньше 3500 футов?

select timestamptostring(rowtime, 'yyyy-MM-dd HH:mm:ss') 
, manufacturer 
, aircraft 
, registration 
, height 
from location_and_details_stream 
where height < 3500 and rowtime > stringtotimestamp('18-09-27 06:10', 'yy-MM-dd HH:mm') and rowtime < stringtotimestamp('18-09-27 06:20', 'yy-MM-dd HH:mm'); 

2018-09-27 06:15:39 | Airbus | A388 | A6-EOD | 2100.0 
2018-09-27 06:15:58 | Airbus | A388 | A6-EOD | 3050.0 

Потрясающе! Я могу определить самолёт, оказавшийся над моей крышей в 6:15 утра. Оказывается, Снежинку разбудил Airbus А380 (огромный лайнер, кстати), который летел в Дубай.

Всего пара выходных дней, и у вас есть система потоковой обработки с KSQL. Которая, к тому же, позволяет быстро найти интересные события данных. Хотя Снежинка может отнестись к ним скептически.

Tags:
Hubs:
+42
Comments 9
Comments Comments 9

Articles

Information

Website
www.itsumma.ru
Registered
Founded
Employees
101–200 employees
Location
Россия
Representative
ITSumma