Flightradar24 — как это работает? Часть 2, ADS-B протокол

    Привет Хабр. Наверное каждый, кто хоть раз встречал или провожал родственников или друзей на самолет, пользовался бесплатным сервисом Flightradar24. Это весьма удобный способ отслеживания положения самолета в реальном времени.

    image

    В первой части был описан принцип работы такого онлайн-сервиса. Сейчас мы пойдем дальше, и выясним, какие данные передаются и принимаются от воздушного судна к приемной станции, и декодируем их самостоятельно с помощью Python.

    История


    Очевидно, что данные о самолетах передаются не для того, чтобы пользователи видели их на своих смартфонах. Система называется ADS–B (Automatic dependent surveillance—broadcast), и служит для автоматической передачи информации о воздушном судне в диспетчерский центр — передаются его идентификатор, координаты, направление, скорость, высота и прочие данные. Ранее, до появления таких систем, диспетчер мог видеть лишь точку на радаре. Этого стало недостаточно, когда самолетов стало слишком много.

    Технически, ADS-B состоит из передатчика на воздушном судне, который периодически посылает пакеты с информацией на достаточно высокой частоте 1090 МГц (есть и другие режимы, но нам они не так интересены, т.к. координаты передаются только здесь). Разумеется, кроме передатчика, есть и приемник где-то в аэропорту, но для нас, как для пользователей, интересен приемник наш собственный.

    Кстати, для сравнения, первая такая система, Airnav Radarbox, расчитанная на обычных пользователей, появилась в 2007 году, и стоила около 900$, еще около 250$ в год стоила подписка на сетевые сервисы.



    Отзывы тех первых российских владельцев можно почитать на форуме radioscanner. Сейчас, когда массово стали доступны RTL-SDR приемники, аналогичный девайс можно собрать за 30$, подробнее об этом было в первой части. Мы же перейдем собственно, к протоколу — посмотрим как это работает.

    Прием сигналов


    Для начала, сигнал нужно записать. Весь сигнал имеет длительность всего лишь 120 микросекунд, поэтому чтобы комфортно разобрать его компоненты, желателен SDR-приемник с частотой дискретизации не менее 5МГц.

    image

    После записи мы получаем WAV-файл с частотой дискретизации 5000000 семплов/сек, 30 секунд такой записи «весят» около 500Мб. Слушать её медиаплеером разумеется, бесполезно — файл содержит не звук, а непосредственно оцифрованный радиосигнал — именно так работает Software Defined Radio.

    Открывать и обрабатывать файл мы будем с помощью Python. Желающие поэкспериментировать самостоятельно, могут скачать пример записи по ссылке.

    Загрузим файл, и посмотрим что внутри.

    from scipy.io import wavfile
    import matplotlib.pyplot as plt
    import numpy as np
    
    fs, data = wavfile.read("adsb_20190311_191728Z_1090000kHz_RF.wav")
    data = data.astype(float)
    I, Q = data[:, 0], data[:, 1]
    A = np.sqrt(I*I + Q*Q)
    
    plt.plot(A)
    plt.show()
    

    Результат: мы видим явные «импульсы» на фоне шума.



    Каждый «импульс» — это и есть сигнал, структуру которого хорошо видно, если увеличить разрешение на графике.



    Как можно видеть, картинка вполне соответствует тому, что приведено в описании выше. Можно приступать к обработке данных.

    Декодирование


    Для начала, нужно получить битовый поток. Сам сигнал закодирован с помощью manchester encoding:



    Из разницы уровней в полубайтах легко получить реальные «0» и «1».

        bits_str = ""
        for p in range(8):
            pos = start_data + bit_len*p
            p1, p2 = A[pos: pos + bit_len/2], A[pos + bit_len/2: pos + bit_len]
            avg1, avg2 = np.average(p1), np.average(p2)
            if avg1 < avg2:
                bits_str += "0"
            elif avg1 > avg2:
                bits_str += "1"
    

    Структура самого сигнала имеет следующий вид:



    Рассмотрим поля более подробно.

    DF (Downlink Format, 5 бит) — определяет тип сообщения. Их несколько типов:


    (источник таблицы)

    Нас интересует только тип DF17, т.к. именно он содержит координаты воздушного судна.

    ICAO (24 бита) — международный уникальный код воздушного судна. Проверить самолет по его коду можно на сайте (к сожалению, автор перестал обновлять базу, но она еще актуальна). К примеру, для кода 3c5ee2 имеем следующую информацию:



    Правка: в комментарии к статье описание кода ICAO приведено более подробно, интересующимся рекомендую ознакомиться.

    DATA (56 или 112 бит) — собственно данные, которые мы и будем декодировать. Первые 5 бит данных — поле Type Code, содержащее подтип хранящихся данных (не путать с DF). Таких типов довольно много:


    (источник таблицы)

    Разберем несколько примеров пакетов.

    Aircraft identification

    Пример в бинарном виде:

    00100 011 000101 010111 000111 110111 110001 111000

    Поля данных:

    +------+------+------+------+------+------+------+------+------+------+
    | TC,5 | EC,3 | C1,6 | C2,6 | C3,6 | C4,6 | C5,6 | C6,6 | C7,6 | C8,6 |
    +------+------+------+------+------+------+------+------+------+------+
    

    TC = 00100b = 4, каждый символ C1-C8 содержит коды, соответствующие индексам в строке:
    #ABCDEFGHIJKLMNOPQRSTUVWXYZ#####_###############0123456789######

    Раскодировав строку, несложно получить код самолета: EWG7184

    symbols = "#ABCDEFGHIJKLMNOPQRSTUVWXYZ#####_###############0123456789######"
    code_str = ""
    for p in range(8):
         c = int(bits_str[8 + 6*p:8 + 6*(p + 1)], 2)
         code_str += symbols[c]
    print("Aircraft Identification:", code_str.replace('#', ''))
    

    Airborne position

    Если с названием все просто, то с координатами посложнее. Они передаются в виде 2х, четных и нечетных фреймов. Код поля TC = 01011b = 11.



    Пример четного и нечетного пакетов:

    01011 000 000101110110 00 10111000111001000 10000110101111001
    01011 000 000110010000 01 10010011110000110 10000011110001000
    

    Само вычисление координат происходит по достаточно хитрой формуле:


    (источник)

    Я не специалист по ГИС, так что откуда оно выводится, не знаю. Кто в курсе, напишите в комментариях.

    Высота считается проще — в зависимости от определенного бита, она может представляться либо кратной 25, либо 100 футам.

    Airborne Velocity

    Пакет с TC=19. Интересно тут то, что скорость может быть как точная, относительно земли (Ground Speed), так и воздушная, измеряемая датчиком самолета (Airspeed). Еще передается множество разных полей:


    (источник)

    Заключение


    Как можно видеть, технология ADS-B стала интересным симбиозом, когда какой-либо стандарт пригождается не только профессионалам, но и обычным пользователям. Но разумеется, ключевую роль в этом сыграло удешевление технологии цифровых SDR-приемников, позволяющим на девайсе буквально «за копейки» принимать сигналы с частотой выше гигагерца.

    В самом стандарте разумеется, гораздо больше всего. Желающие могут посмотреть PDF на странице ICAO или посетить уже упомянутый выше сайт.

    Вряд ли многим пригодится все вышенаписанное, но по крайней мере общая идея того, как это работает, надеюсь, осталась.

    Кстати, готовый декодер на Python уже существует, его можно изучить здесь. А владельцы SDR-приемников могут собрать и запустить готовый ADS-B декодер со страницы, подробнее об этом рассказывалось в первой части.

    Исходный код парсера, описанный в статье, приведен под катом. Это тестовый пример, не претендующий на production, но кое-что в нем работает, и парсить записанный выше файл, им можно.
    Исходный код (Python)
    from __future__ import print_function
    
    from scipy.io import wavfile
    from scipy import signal
    import matplotlib.pyplot as plt
    import numpy as np
    import math
    import sys
    
    
    def parse_message(data, start, bit_len):
        max_len = bit_len*128
        A = data[start:start + max_len]
        A = signal.resample(A, 10*max_len)
        bits = np.zeros(10*max_len)
        bit_len *= 10
        start_data = bit_len*8
    
        # Parse first 8 bits
        bits_str = ""
        for p in range(8):
            pos = start_data + bit_len*p
            p1, p2 = A[pos: pos + bit_len/2], A[pos + bit_len/2: pos + bit_len]
            avg1, avg2 = np.average(p1), np.average(p2)
            if avg1 < avg2:
                bits_str += "0"
            elif avg1 > avg2:
                bits_str += "1"
    
        df = int(bits_str[0:5], 2)
    
        # Aircraft address (db - https://junzis.com/adb/?q=3b1c5c )
        bits_str = ""
        for p in range(8, 32):
            pos = start_data + bit_len * p
            p1, p2 = A[pos: pos + bit_len / 2], A[pos + bit_len / 2: pos + bit_len]
            avg1, avg2 = np.average(p1), np.average(p2)
            if avg1 < avg2:
                bits_str += "0"
            elif avg1 > avg2:
                bits_str += "1"
        # print "Aircraft address:", bits_str, hex(int(bits_str, 2))
        address = hex(int(bits_str, 2))
    
        # Filter specific aircraft (optional)
        # if address != "0x3c5ee2":
        #    return
    
        if df == 16 or df == 17 or df == 18 or df == 19 or df == 20 or df == 21:
            # print "Pos:", start, "DF:", msg_type
    
            # Data (56bit)
            bits_str = ""
            for p in range(32, 88):
                pos = start_data + bit_len*p
                p1, p2 = A[pos: pos + bit_len/2], A[pos + bit_len/2: pos + bit_len]
                avg1, avg2 = np.average(p1), np.average(p2)
                if avg1 < avg2:
                    bits_str += "0"
                    # bits[pos + bit_len / 2] = 50
                elif avg1 > avg2:
                    bits_str += "1"
    
            # http://www.lll.lu/~edward/edward/adsb/DecodingADSBposition.html
            # print "Data:"
            # print bits_str[:8], bits_str[8:20],  bits_str[20:22], bits_str[22:22+17], bits_str[39:39+17]
            # Type Code:
            tc, ec = int(bits_str[:5], 2), int(bits_str[5:8], 2)
            # print("DF:", df, "TC:", tc)
            
            # 1 - 4  Aircraft identification
            # 5 - 8  Surface position
            # 9 - 18  Airborne position (w/ Baro Altitude)
            # 19  Airborne velocities
    
            if tc >= 1 and tc <= 4: # and (df == 17 or df == 18):
                print("Aircraft address:", address)
                print("Data:")
                print(bits_str[:8], bits_str[8:14],  bits_str[14:20], bits_str[20:26], bits_str[26:32], bits_str[32:38], bits_str[38:44])
    
                symbols = "#ABCDEFGHIJKLMNOPQRSTUVWXYZ#####_###############0123456789######"
                code_str = ""
                for p in range(8):
                    c = int(bits_str[8 + 6*p:8 + 6*(p + 1)], 2)
                    code_str += symbols[c]
                print("Aircraft Identification:", code_str.replace('#', ''))
                print()
            if tc == 11:
                print("Aircraft address:", address)
                print("Data: (11)")
                print(bits_str[:8], bits_str[8:20],  bits_str[20:22], bits_str[22:22+17], bits_str[39:39+17])
    
                # Bit 22 contains the F flag which indicates which CPR format is used (odd or even)
                # First frame has F flag = 0 so is even and the second frame has F flag = 1 so odd
                # f = bits_str[21:22]
                # print("F:", int(f, 2))
    
                # Altitude
                alt1b = bits_str[8:20]
                if alt1b[-5] == '1':
                    bits = alt1b[:-5] + alt1b[-4:]
                    n = int(bits, 2)
                    alt_ft = n*25 - 1000
                    print("Alt (ft)", alt_ft)
    
                # lat_dec = int(bits_str[22:22+17], 2)
                # lon_dec = int(bits_str[39:39+17], 2)
                # print("Lat/Lon:", lat_dec, lon_dec)
    
                # http://airmetar.main.jp/radio/ADS-B%20Decoding%20Guide.pdf
                print()
            if tc == 19:
                print("Aircraft address:", address)
                print("Data:")
                # print(bits_str)
                print(bits_str[:5], bits_str[5:8], bits_str[8:10], bits_str[10:13], bits_str[13] ,bits_str[14:24], bits_str[24], bits_str[25:35], bits_str[35:36], bits_str[36:65])
    
                subtype = int(bits_str[5:8], 2)
                # https://mode-s.org/decode/adsb/airborne-velocity.html
                spd, hdg, rocd = -1, -1, -1
                if subtype == 1 or subtype == 2:
                    print("Velocity Subtype 1: Ground speed")
                
                    v_ew_sign = int(bits_str[13], 2)
                    v_ew = int(bits_str[14:24], 2) - 1       # east-west velocity
                    
                    v_ns_sign = int(bits_str[24], 2)
                    v_ns = int(bits_str[25:35], 2) - 1       # north-south velocity
                    
                    v_we = -1*v_ew if v_ew_sign else v_ew
                    v_sn = -1*v_ns if v_ns_sign else v_ns
                    
                    spd = math.sqrt(v_sn*v_sn + v_we*v_we)  # unit in kts
                    
                    hdg = math.atan2(v_we, v_sn)
                    hdg = math.degrees(hdg)                 # convert to degrees
                    hdg = hdg if hdg >= 0 else hdg + 360    # no negative val
                if subtype == 3:
                    print("Subtype Subtype 3: Airspeed")
                    hdg = int(bits_str[14:24], 2)/1024.0*360.0
                    spd = int(bits_str[25:35], 2)
                
                vr_sign = int(bits_str[36], 2)
                vr = int(bits_str[36:45], 2)
                rocd = -1*vr if vr_sign else vr         # rate of climb/descend
                print("Speed (kts):", spd, "Rate:", rocd, "Heading:", hdg)
                print()
    
            # print()
    
    def calc_coordinates():
        def _cprN(lat, is_odd):
            nl = _cprNL(lat) - is_odd
            return nl if nl > 1 else 1
    
        def _cprNL(lat):
            try:
                nz = 15
                a = 1 - math.cos(math.pi / (2 * nz))
                b = math.cos(math.pi / 180.0 * abs(lat)) ** 2
                nl = 2 * math.pi / (math.acos(1 - a/b))
                return int(math.floor(nl))
            except:
                # happens when latitude is +/-90 degree
                return 1
        
        def floor_(x):
            return int(math.floor(x))
      
        lat1b, lon1b, alt1b = "10111000111010011", "10000110111111000", "000101111001"
        lat2b, lon2b, alt2b = "10010011101011100", "10000011000011011", "000101110111"
        lat1, lon1, alt1 = int(lat1b, 2), int(lon1b, 2), int(alt1b, 2)
        lat2, lon2, alt2 = int(lat2b, 2), int(lon2b, 2), int(alt2b, 2)
        
        # 131072 is 2^17, since CPR lat and lon are 17 bits each
        cprlat_even, cprlon_even = lat1/131072.0, lon1/131072.0
        cprlat_odd, cprlon_odd = lat2/131072.0, lon2/131072.0
        print(cprlat_even, cprlon_even)
    
        j = floor_(59*cprlat_even - 60*cprlat_odd)
        print(j)
    
        air_d_lat_even = 360.0 / 60
        air_d_lat_odd = 360.0 / 59
    
        # Lat
        lat_even = float(air_d_lat_even * (j % 60 + cprlat_even))
        lat_odd = float(air_d_lat_odd * (j % 59 + cprlat_odd))
        if lat_even >= 270:
            lat_even = lat_even - 360
        if lat_odd >= 270:
            lat_odd = lat_odd - 360
    
        # Lon
        ni = _cprN(lat_even, 0)
        m = floor_(cprlon_even * (_cprNL(lat_even)-1) - cprlon_odd * _cprNL(lat_even) + 0.5)
        lon = (360.0 / ni) * (m % ni + cprlon_even)
        print("Lat", lat_even, "Lon", lon)
    
        # Altitude
        # Q-bit (bit 48) indicates whether the altitude is encoded in multiples of 25 or 100 ft (0: 100 ft, 1: 25 ft)
        # The value can represent altitudes from -1000 to +50175 ft.
        if alt1b[-5] == '1':
            bits = alt1b[:-5] + alt1b[-4:]
            n = int(bits, 2)
            alt_ft = n*25 - 1000
            print("Alt (ft)", alt_ft)
    
    
    fs, data = wavfile.read("adsb_20190311_191728Z_1090000kHz_RF.wav")
    T = 1/fs
    
    print("Sample rate %f MS/s" % (fs / 1e6))
    print("Cnt samples %d" % len(data))
    print("Duration: %f s" % (T * len(data)))
    
    data = data.astype(float)
    
    cnt = data.shape[0]
    # Processing only part on file (faster):
    # cnt = 10000000
    # data = data[:cnt]
    print("Processing I/Q...")
    I, Q = data[:, 0], data[:, 1]
    A = np.sqrt(I*I + Q*Q)
    
    bits = np.zeros(cnt)
    
    # To see scope without any processing, uncomment
    # plt.plot(A)
    # plt.show()
    # sys.exit(0)
    
    print("Extracting signals...")
    
    pos = 0
    avg = 200
    msg_start = 0
    # Find beginning of each signal
    while pos < cnt - 16*1024:
        # P1 - message start
        while pos < cnt - 16*1024:
            if A[pos] < avg and A[pos+1] > avg and pos - msg_start > 1000:
                msg_start = pos
                bits[pos] = 100
                pos += 4
                break
            pos += 1
    
        start1, start2, start3, start4 = msg_start, 0, 0, 0
        # P2
        while pos < cnt - 16*1024:
            if A[pos] < avg and A[pos+1] > avg:
                start2 = pos
                bits[pos] = 90
                pos += 1
                break
            pos += 1
        # P3
        while pos < cnt - 16*1024:
            if A[pos] < avg and A[pos+1] > avg:
                start3 = pos
                bits[pos] = 80
                pos += 1
                break
            pos += 1
        # P4
        while pos < cnt - 16*1024:
            if A[pos] < avg and A[pos+1] > avg:
                start4 = pos
                bits[pos] = 70
                pos += 1
                break
            pos += 1
    
    
        sig_diff = start4 - start1
        if 20 < sig_diff < 25:
            bits[msg_start] = 500
            bit_len = int((start4 - start1) / 4.5)
            # print(pos, start1, start4, ' - ', bit_len)
            # start = start1 + 8*bit_len
            parse_message(A, msg_start, bit_len)
    
            pos += 450
    
    # For debugging: check signal start
    # plt.plot(A)
    # plt.plot(bits)
    # plt.show()
    


    Надеюсь, кому-то было интересно, спасибо за внимание.
    Поделиться публикацией

    Комментарии 13

      +1
      чтобы комфортно разобрать его компоненты, желателен SDR-приемник с частотой дискретизации не менее 5МГц. 

      А как тогда rtl-sdr его обрабатывает? У него ведь полоса максимум 3.2 МГц?
        +1
        Полоса и частота дискретизации очень разные понятия. Простыми словами, последнее, можно назвать «разрешением»
          0
          Виноват, правильно, частота дискретизации у ртлсдр максимум 3.2 МГц, а полоса получается 2.56 МГц максимум?
            0
            При 5МГц разрешение на графике получается таким:
            Image


            Как можно видеть, количество отсчетов на бит уже не очень, а если еще в 2 раза уменьшить, картинка получается весьма «смазанной».
              0
              Для полноты картины добавил скриншот с rtl-sdr при 3.2МГц:
              График

            0
            Смотреть на rtl-sdr можно, просто некомфортно, мало отсчетов на символ получается.
            +1
            Было б интересно узнать, как у них бэкенд устроен
              0
              Спасибо большое за столь интересную статью!
                +4
                Проверить самолет по его коду можно на сайте (к сожалению, автор перестал обновлять базу, но она еще актуальна)

                еще есть airframes.org

                по RA номерам есть такой способ
                ADS-B hex-код, это 24-х битное 16-тиричное число, содержащее в случае Российской регистрации следующую информацию: страну, тип/принадлежность судна, регистрационный номер.

                первые 4 бита означают страну согласно распределению кодов ИКАО. {0001} — код РФ. С полным списком можно ознакомиться здесь www.aerohelp.ru/data/432/an10_v3_cons_ru.pdf
                следующие 3 бита — принадлежность судна. В случае местного реестра встречаются случаи: {010} — магистральное судно ГА и {111} — принадлежность к стороннему ведомству (в нашем случае ЗАО «ГСС», то есть для экспериментальных и экспортных для SJI бортов — 97XXX)
                оставшиеся 17 бит содержат бортовой номер воздушного судна или тип радиоизлучаемого оборудования (второе — не наш случай).

                Например, hex код 155BBE (число в шестнадцатеричной системе) в двоичной системе будет выглядеть как {000101010101101110111110}.

                Биты {0001} — РФ
                Биты {010} — ГА
                Последние 17 бит — это номер ВС. В данном случае {10101101110111110} это 89022 (в десятичной системе).

                В итоге получаем RA-89022
                источник
                  0
                  Спасибо, интересно.
                    0
                    Добавил в статью ссылку на ваш комментарий.
                    0
                    Очень познавательная серия. Для меня, как бортпроводника, очень интересно будет пообщаться с пилотами на эту тему.

                  Только полноправные пользователи могут оставлять комментарии. Войдите, пожалуйста.

                  Самое читаемое