MSK144 — цифровой протокол, разработанный Джо Тейлором (K1JT) и его командой в 2016 году для проведения связей через метеорное рассеивание.

В предыдущей части были рассмотрены общие характеристики протокола, а также алгоритмы, формирующие сигнал. В этой части статьи рассматриваются механизмы детектирования и декодирования сигналов в протоколе MSK144.

Статья может быть интересна радиолюбителям и людям, интересующимся темой цифровой обработки сигналов.

Общее представление

Как было рассмотрено в предыдущей статье, формирование сигнала, отправляемого в радиоэфир, состоит из этапов упаковки бит данных, расчета контрольной суммы, вычислению LDPC-кодов, после чего, к данным добавляются маркеры (или якоря) синхронизации, состоящие из последовательности бит 01110010, прежде чем всё это преобразовать в дискретный сигнал звуковой частоты.

Рисунок 1: Общая схема протокола (абстракция по уровням OSI условная).
Рисунок 1: Общая схема протокола (абстракция по уровням OSI условная).

Обратная задача, при приеме сигнала, заключается в поиске маркера синхронизации с последующей фазовой и частотной коррекцией сигнала, после чего, выполняя IQ преобразования, выделить аналоговое представление бит данных, которые далее проходят процесс декодирования, сверки контрольных сумм и так далее, пока на выходе не останутся только данные сообщения.

Рисунок 2: Общее представление процесса декодирования.
Рисунок 2: Общее представление процесса декодирования.

Анализ сигнала

Анализ сигнала состоит из нескольких этапов (рисунок 3):

  • фильтрация сигнала;

  • детекция сигнала;

  • декодирования фрейма.

Рисунок 3: Пайплайн обработки сигнала.
Рисунок 3: Пайплайн обработки сигнала.

Фильтрация сигнала

Перед тем, как начать анализировать сигнал, осуществляется фильтрация сигнала полосовым фильтром, задача которого исключить незадействованные области частот анализируемого сигнала и оставить только ту полосу, в которой присутствуют интересующие сигналы с данными.

В MSK144 применяется полосовой фильтр на основе преобразований Фурье. Отличительными особенностями фильтров такого типа является то, что фильтрация осуществляется в частотной области, что позволяет добиться высокой степени аттенюации, конфигурировать АЧХ фильтра с любой степенью наклона и задание множества полос частот; а также, помимо аттенюации, осуществлять усиление сигнала (эквализация).

На рисунке 4 показана схема фильтра.

Рисунок 4: Фильтр на основе преобразования Фурье.
Рисунок 4: Фильтр на основе преобразования Фурье.

Принцип работы фильтра заключается в том, что исходный сигнал подвергается быстрому преобразованию Фурье, результатом которого является представление сигнала в комплексной форме, в виде амплитуд и фаз в разрезе частот; после умножения амплитудных значений на требуемую характеристику, осуществляется обратное преобразование Фурье, позволяющее обратно получить аналоговый сигнал.

В Python реализовать фильтр на основе преобразования Фурье можно используя функционал библиотеки numpy.

import numpy as np
import numpy.typing as npt


def fourier_bpf(signal: npt.NDArray[np.float64], n_fft: int,
               response: npt.NDArray[np.float64]) -> npt.NDArray[np.complex128]:
    freq_d = np.fft.fft(signal, n=n_fft)


    response_len = len(response)
    freq_d[:response_len] *= response
    freq_d[0] = 0.5 * freq_d[0]
    freq_d[response_len: n_fft] = complex(0, 0)


    time_d = np.fft.ifft(freq_d, n=n_fft)
    return time_d

На вход функции fourier_bpf подается исходный сигнал и АЧХ фильтра, задаваемая аргументом response. Аргумент n_fft определяет размер блока данных для преобразования Фурье; значение задается степенью числа 2.

Частоты сигналов MSK144 в основном располагаются в диапазоне от 1.0 КГц до 2.0 КГц, фильтр должен подавлять все остальные частоты, находящиеся за пределами диапазона (рисунок 6).

На рисунке 5 представлена АЧХ фильтра для фильтрации сигналов MSK144. Так как сигналы подвержены частотному смещению, полоса пропускания фильтра имеет несколько большую ширину захвата.

Рисунок 5: АЧХ БПФ фильтра.
Рисунок 5: АЧХ БПФ фильтра.

За формирование кривой АЧХ фильтра отвечает функция filter_response.

MSK144_CENTER_FREQ = 1500


def filter_response(n_fft: int, sample_rate: int) -> npt.NDArray[np.float64]:
    t = 1 / 2000
    beta = 0.1
    df = sample_rate / n_fft

    nh = (n_fft // 2) + 1

    response = np.full(nh, 1, dtype=np.float64)
    for i in range(nh):
        f = abs(df * i - MSK144_CENTER_FREQ)

        if (1 + beta) / (2 * t) >= f > (1 - beta) / (2 * t):
            response[i] = 0.5 * (1 + np.cos((np.pi * t / beta) * (f - (1 - beta) / (2 * t))))

        elif f > (1 + beta) / (2 * t):
            response[i] = 0

    return response

Функция принимает на вход размер выборки преобразования Фурье n_fft, и частоту дискретизации сигнала sample_rate. Результатом является массив значений от 0 до 1.

Кривая АЧХ описывается уравнением:

f(x) = \frac{1}{2}\left( 1+\cos(\pi\cdot\frac{t}{\beta})\cdot (x-\frac{1-\beta}{2t}) \right)

Точки, соответствующие аттенюации в 3дБ (что соответствует 0.5 от исходного значения амплитуды) соответствуют частотам 0.5 КГц и 2.5 КГц соответственно. Гладкость характеристики в зонах аттенюации определяется функцией косинуса а ширина значением константы beta, чем меньше значение, тем сильнее кривая будет стремиться к прямоугольной форме.

На рисунке 5 изображены спектры сигналов до и после применения фильтра с характеристикой filter_response.

Рисунок 6: Спектры сигналов до и после фильтрации.
Рисунок 6: Спектры сигналов до и после фильтрации.

Недостатками фильтров такого типа являются требования в вычислительным ресурсам, а также эффект Гиббса — то есть появление нежелательных осцилляций на границах сигнала, что требует применения к сигналу сглаживающих оконных функций.

Частотное смещение

Сигналы протокола MSK144 формируются в слышимом человеческим ухом диапазоне частот, с тем расчетом, что их прием и передача в радиоэфире осуществляется с помощью радиолюбительского трансивера, рассчитанного на передачу голоса; так как опорные генераторы в любительских трансиверах имеют некоторую погрешность в точности частоты, то отправляемые и принимаемые сигналы имеют частотный дрейф, в связи с этим, принятый сигнал необходимо подстраивать под частоту, на которой будет происходить декодирование.

Смещение частоты сигнала реализуется функцией shift_freq.

def shift_freq(complex_signal: npt.NDArray[np.complex128],
              freq: float, sample_rate: int) -> npt.NDArray[np.complex128]:
    phi = 2 * np.pi * freq / sample_rate
    step = complex(np.cos(phi), np.sin(phi))

    phasor = complex(1, 1)
    signal = np.zeros(len(complex_signal), dtype=np.complex128)

    for i, cs_val in enumerate(complex_signal):
        phasor *= step
        signal[i] = phasor * cs_val

    return signal

Функция shift_freq принимает на вход отфильтрованный преобразованиями Фурье сигнал в комплексном представлении, аргумент freq определяет на сколько герц необходимо сместить сигнал; sample_rate — частота дискретизации сигнала.

Функция реализует фазовое вращение сигнала в каждой точке на угол phi, соответствующему смещению на заданную частоту. Фазор реализует комплексную экспоненту.

В общем виде смещение может быть записано как: z(t) \mapsto z(t) e^{j 2 \pi f t}.

Детекция сигнала

Задача детекции сигнала заключается в коррекции фазовых и частотных ошибок сигнала и синхронизация по маркерам.

Функция детектора состоит из этапов:

  • поиск сигналов-кандидатов по амплитуде;

  • коррекция фазы и частотной ошибки;

  • синхронизация;

  • выделение фрейма.

Этап фазовой коррекции применяется для снижения межсимвольных интерференций сигнала, связанных с тем, что в большинстве трансиверов используется узкополосные фильтры, способные вносить фазовые изменения сигнала.

Маркер синхронизации

Для выделения маркера с помощью автокорреляционной функции, формируется эталонный, искомый сигнал.

MSK144_SYNC = [0, 1, 1, 1, 0, 0, 1, 0]  # 0x72 in binary

SAMPLES_PER_WORD = 12
SAMPLES_PER_HALF_WORD = SAMPLES_PER_WORD // 2
WORD_SAMPLES = np.sin([i * np.pi / SAMPLES_PER_WORD for i in range(SAMPLES_PER_WORD)])

SYNC_WORDS = np.array([2 * b - 1 for b in MSK144_SYNC])

SYNC_I = np.array([
    sample * SYNC_WORDS[j * 2 + 1]
    for j in range(4)
    for sample in WORD_SAMPLES
])

SYNC_Q = np.array([
    pp * SYNC_WORDS[j * 2]
    for j in range(4)
    for pp in (WORD_SAMPLES[SAMPLES_PER_HALF_WORD:] if j == 0 else WORD_SAMPLES)
])

SYNC_WAVEFORM = np.array([complex(SYNC_I[i], SYNC_Q[i]) for i in range(42)])
SYNC_WAVEFORM_LEN = len(SYNC_WAVEFORM)
SYNC_WAVEFORM_CONJ = np.conj(SYNC_WAVEFORM)

В MSK144 для маркера синхронизации используется последовательность бит 01110010 (0x72), задаваемая в MSK144_SYNC.

WORD_SAMPLES определяет дискретное представление участка сигнала для формирования символа MSK.

В SYNC_WORDS формируется последовательность сдвигов частоты, в соответствии с последовательностью бит в MSK144_SYNC.

Формирование сигнала с MSK манипуляцией осуществляется в комплексной форме, через IQ преобразование, четные биты записываются в SYNC_Q, нечетные в SYNC_I соответственно (рисунок 7).

Рисунок 7: графики I/Q сигналов MSK144_SYNC.
Рисунок 7: графики I/Q сигналов MSK144_SYNC.

Результирующий сигнал SYNC_WAVEFORM записывается в комплексном виде, где синфазная составляющая часть сигнала записывается в вещественную часть значений, а квадратурная в мнимую.

SYNC_WAVEFORM_CONJ хранит комплексно-сопряженное представление SYNC_WAVEFORM, используемое в вычислении автокорреляции.

Поиск кандидатов

При поиске кандидатов происходит анализ спектра сигнала в полосе захвата, оценка силы сигнала, его соотношения сигнал/шум и определение погрешности частоты.

Детектор сигналов реализуется функцией detect_signals, являющаяся методом класса MSK144Monitor.

MSK144_NSPM = 864
MSK144_NPTS = 3 * MSK144_NSPM
MSK144_NFFT = 864

MSK144_FREQ_CENTER = 1500
MSK144_FREQ_HI = MSK144_FREQ_CENTER + 500
MSK144_FREQ_LO = MSK144_FREQ_CENTER - 500

SMOOTH_WINDOW_LEN = SAMPLES_PER_WORD
SMOOTH_WINDOW = (1 - np.cos([i * np.pi / SMOOTH_WINDOW_LEN for i in range(SMOOTH_WINDOW_LEN)])) / 2

def detect_signals(self, signal: npt.NDArray[np.complex128], signal_len: int, start: float,
                  max_cand: int = 16, tolerance: int = 150) -> typing.Generator[LogItem, None, None]:
    ...

Функция detect_signals принимает на вход отфильтрованный сигнал signal в комплексном виде, параметр signal_len определяет общую длину исходного сигнала (так как после фильтра на основе преобразований Фурье результирующие массивы по длине отличаются от исходного сигнала, параметр передается отдельно).

Параметр start определяет временную позицию в сигнале, относительно которой необходимо осуществлять анализ (например для ручного анализа участка сигнала).

Параметр tolerance позволяет задать ширину полосы частот анализируемого сигнала.

    dt = 1 / self.sample_rate
    df = self.sample_rate / MSK144_NFFT

    f_hi = MSK144_FREQ_HI * 2
    f_lo = MSK144_FREQ_LO * 2

    f_hi_id_top = int((f_hi + 2 * tolerance) / df)
    f_hi_id_bot = int((f_hi - 2 * tolerance) / df)

    f_lo_id_top = int((f_lo + 2 * tolerance) / df)
    f_lo_id_bot = int((f_lo - 2 * tolerance) / df)

    f_2Khz = int(f_lo / df)
    f_4Khz = int(f_hi / df)
    ...

Переменные dt и df определяют минимальный временной шаг, относительно частоты дискретизации, и минимальный шаг изменения частоты в спектре после преобразования Фурье.

Значения f_hi и f_lo определяют минимальную и максимальную частоты MSK модуляции.

Группы переменных "f_hi_id_top, f_hi_id_bot" и "f_lo_id_top, f_lo_id_bot" определяют полосу частот, в рамках которых будет производиться анализ.

    step_size = MSK144_NSPM // 4
    steps_count = (signal_len - MSK144_NPTS) // step_size


    det_amp = np.zeros(steps_count)
    det_snr = np.zeros(steps_count)
    det_f_err = np.full(steps_count, -999.99)
    ...

Блок массивов "det_amp, det_snr, det_f_err" аккумулирует параметры амплитуд, SNR и отклонения частот на каждой итерации анализа участка сигнала.

    steps_real = 0
    for step in range(steps_count):
        part_start = step_size * step
        part_end = part_start + MSK144_NSPM

        if part_end > signal_len:
            break

        part = signal[part_start: part_end]
        part = part ** 2
        part[:SMOOTH_WINDOW_LEN] *= SMOOTH_WINDOW
        part[MSK144_NSPM - SMOOTH_WINDOW_LEN:MSK144_NSPM] *= SMOOTH_WINDOW[::-1]

        spec = np.fft.fft(part, MSK144_NFFT)
        amps = np.abs(spec) ** 2
    ...

В цикле до steps_count из общего сигнала выделяется участок part размером MSK144_NSPM, который далее возводится в степень 2 (part ** 2), для выделения несущих тонов в спектре и повышения их амплитуд относительно шума (по этой причине ранее объявленные переменные со значениями частот умножались на 2).

Затем к границам сигнала применяется сглаживающая функция SMOOTH_WINDOW, чья характеристика соответствует оконной функции Ханна, изображенная на рисунке 8.

\sigma(t)=\frac{1}{2}\left( 1 - \cos(\frac{\pi\cdot t}{T}) \right)
Рисунок 8: График сглаживающей функции.
Рисунок 8: График сглаживающей функции.

Быстрое преобразование Фурье позволяет получить спектр (spec) и АЧХ (amps) анализируемого участка сигнала.

        # Hi freq
        peak_hi = f_hi_id_bot + np.argmax(amps[f_hi_id_bot:f_hi_id_top])

        delta_hi = -((spec[peak_hi - 1] - spec[peak_hi + 1]) / (
               2 * spec[peak_hi] - spec[peak_hi - 1] - spec[peak_hi + 1])).real
        amp_hi = amps[peak_hi]

        amp_avg_hi = (np.sum(amps[f_hi_id_bot:f_hi_id_top]) - amp_hi) / (f_hi_id_top - f_hi_id_bot)
        snr_hi = amp_hi / (amp_avg_hi + 0.01)
    ...

На участке частот спектра в окрестностях частоты 4.0 КГц ищется максимальное значение peak_hi. Значение delta_hi определяет смещение частоты пика. Значение переменной amp_avg_hi определяет среднее значение амплитуд шума для расчета значения SNR в переменной snr_hi.

Аналогичным образом осуществляется поиск и оценка шумовой картины для частоты 2.0 КГц.

        # Low freq
        peak_lo = f_lo_id_bot + np.argmax(amps[f_lo_id_bot:f_lo_id_top])

        delta_lo = -((spec[peak_lo - 1] - spec[peak_lo + 1]) / (
               2 * spec[peak_lo] - spec[peak_lo - 1] - spec[peak_lo + 1])).real
        amp_lo = amps[peak_lo]

        amp_avg_lo = (np.sum(amps[f_lo_id_bot:f_lo_id_top]) - amp_lo) / (f_lo_id_top - f_lo_id_bot)
        snr_lo = amp_lo / (amp_avg_lo + 0.01)

        # Errors
        f_err_hi = (peak_hi + delta_hi - f_4Khz) * df / 2
        f_err_lo = (peak_lo + delta_lo - f_2Khz) * df / 2

        f_err = f_err_hi if amp_hi >= amp_lo else f_err_lo
    ...

Значения переменных f_err_hi и f_err_lo определяют разность между искомой и найденной частотами.

Тройка массивов "det_amp, det_snr, det_f_err" для каждого участка проанализированного сигнала содержит амплитуды сигналов, значение SNR и значение частотной погрешности.

Следующий этап заключается в отборе сигналов-кандидатов, на основе ранее результатов детектора частот.

    indices = np.argsort(det_amp[:steps_real])

    amp_median = det_amp[indices[steps_real // 4]]
    if amp_median == 0:
        amp_median = 1

    det_amp[:steps_real] /= amp_median
    ...

Амплитудные значения во временных окнах нормализуются относительно медианного значения.

    time_arr = np.zeros(max_cand, dtype=np.float64)
    freq_arr = np.zeros(max_cand, dtype=np.float64)
    snr_arr = np.zeros(max_cand, dtype=np.float64)

    count_cand = 0
    for ip in range(max_cand):
        il = np.argmax(det_amp[:steps_real])

        if det_amp[il] < 3.5:
            break

        if abs(det_f_err[il]) <= tolerance:
            time_arr[count_cand] = ((il - 0) * step_size + MSK144_NSPM / 2) * dt
            freq_arr[count_cand] = det_f_err[il]
            snr_arr[count_cand] = 12 * np.log10(det_amp[il]) / 2 - 9

            count_cand += 1

        det_amp[il] = 0
    ...

В цикле из общего скопа задетектированных частот отбираются только те, которые не превышают заданную ошибку несущей частоты и имеют достаточное амплитудное значение, превышающее порог в 3.5.

Пороговое значение в 3.5 выбрано как минимально приемлемое значение для решения задач обработки акустических сигналов, так как меньшие значения считаются близкими к уровню шума и плохо пригодными для достоверного извлечения полезного сигнала.

    if count_cand < 3:
        for ip in range(max_cand - count_cand):
            if ip >= max_cand - count_cand:
                break

            il = np.argmax(det_snr[:steps_real])

            if det_snr[il] < 12.0:
                break

            if abs(det_f_err[il]) <= tolerance:
                time_arr[count_cand] = ((il - 0) * step_size + MSK144_NSPM / 2) * dt
                freq_arr[count_cand] = det_f_err[il]
                snr_arr[count_cand] = 12 * np.log10(det_snr[il]) / 2 - 9

                count_cand += 1

            det_snr[il] = 0
    ...

Если отобранных кандидатов по критерию амплитуды сигнала нашлось менее 3, дополнительно осуществляется поиск кандидатов по SNR. Такой механизм позволяет обнаруживать слабые сигналы при тропосферном прохождении.

        if count_cand > 0:
            indices = np.argsort(time_arr[:count_cand])

        hashes = set()
        for cand in range(count_cand):
            with suppress(NextCand):
                cand_idx = indices[cand]
                frame_mid = int(time_arr[cand_idx] * self.sample_rate)

                if frame_mid < MSK144_NPTS / 2:
                    frame_mid = MSK144_NPTS // 2

                if frame_mid > signal_len - MSK144_NPTS / 2:
                    frame_mid = signal_len - MSK144_NPTS // 2

                t0 = time_arr[cand_idx] + dt * (start)

                part = signal[frame_mid - MSK144_NPTS // 2: frame_mid + MSK144_NPTS // 2]

                f_err = freq_arr[cand_idx]
                snr = 2 * int(snr_arr[cand_idx] / 2)
                snr = max(-4.0, min(24.0, snr))

                part = self.shift_freq(part, -(MSK144_FREQ_CENTER + f_err), self.sample_rate)
    ...

Для каждого кандидата выбирается участок сигнала из найденных временных границ, производится коррекция ошибки несущей частоты функцией shift_freq.

Синхронизация

Следующий этап — синхронизация с найденными сигналами.

               sync1 = np.zeros(MSK144_NPTS, dtype=np.complex128)
               sync2 = np.zeros(MSK144_NPTS, dtype=np.complex128)

               for i in range(MSK144_NPTS - (56 * 6 + 42)):
                   sync1[i] = sum(part[i: i + SYNC_WAVEFORM_LEN] * SYNC_WAVEFORM_CONJ)
                   sync2[i] = sum(part[i + 56 * 6: i + 56 * 6 + SYNC_WAVEFORM_LEN] * SYNC_WAVEFORM_CONJ)

               sync_corr = abs(sync1) * abs(sync2)
    ...

В sync1 и sync2 считаются значения АКФ сигнала и ранее рассмотренного маркера синхронизации SYNC_WAVEFORM_CONJ.

Значение sync_corr определяет общее значение АКФ для обоих маркеров синхронизации в сигнале.

               peaks = []
               for i in range(6):
                   peak = np.argmax(sync_corr)
                   sync_corr[max(0, peak - 7): min(MSK144_NPTS - 56 * 6 - 42, peak + 7)] = 0.0
                   peaks.append(peak)
    ...

Формируется список шести пиков АКФ маркеров синхронизации для последующей синхронизации.

              for peak in peaks:
                  peak_corr = np.zeros(6, dtype=np.complex128)
                  for i in range(6):
                      j = peak + i

                      if peak + 11 + MSK144_NSPM < MSK144_NPTS:
                          s1 = slice(j + 6, j + 6 + MSK144_NSPM, 6)
                          s2 = slice(j, j + MSK144_NSPM, 6)
                      else:
                          s1 = slice(j + 6, MSK144_NPTS, 6)
                          s2 = slice(j, MSK144_NPTS - 6, 6)

                      peak_corr[i] = np.sum((part[s1] * np.conj(part[s2])) ** 2)

                  peak_corr_idx = np.argmax(np.abs(peak_corr))

                  if peak_corr_idx <= 2:
                      peak_corr_idx -= 1
                  if peak_corr_idx > 2:
                      peak_corr_idx -= 7
    ...

Для найденных шести кипов вычисляется корреляция для тонкой подстройки джиттера, чтобы попасть в центр глазковой диаграммы.

                  for id in range(3):
                      if id == 1:
                          sign = -1
                      elif id == 2:
                          sign = 1
                      else:
                          sign = 0

                      ic = peak + peak_corr_idx + sign

                      if ic < 0:
                          ic = ic + MSK144_NSPM

                      cca = np.sum(part[ic:ic + SYNC_WAVEFORM_LEN] * SYNC_WAVEFORM_CONJ)
                      if ic + 56 * 6 + 42 < MSK144_NPTS:
                          ccb = np.sum(part[ic + 56 * 6:ic + 56 * 6 + SYNC_WAVEFORM_LEN] * SYNC_WAVEFORM_CONJ)
                          cfac = ccb * np.conj(cca)
                          f_err_2 = np.atan2(cfac.imag, cfac.real) / (2 * np.pi * 56 * 6 * dt)
                      else:
                          ccb = np.sum(part[ic - 88 * 6:ic - 88 * 6 + SYNC_WAVEFORM_LEN] * SYNC_WAVEFORM_CONJ)
                          cfac = ccb * np.conj(cca)
                          f_err_2 = np.atan2(cfac.imag, cfac.real) / (2 * np.pi * 88 * 6 * dt)

                      freq_est = int(MSK144_FREQ_CENTER + f_err + f_err_2)
    ...

В freq_est вычисляется итоговое значение ошибки отклонения несущей частоты.

Выделение фрейма

Финальный этап детекции — выделение фрейма с сигналом для передачи его в декодер.

                      for idf in range(5):
                          if idf == 0:
                              delta_f = 0
                          elif idf % 2 == 0:
                              delta_f = idf
                          else:
                              delta_f = -(idf + 1)

                          subpart = self.shift_freq(part, -(f_err_2 + delta_f), self.sample_rate)
                          subpart = np.roll(subpart, -(ic - MSK144_NSPM))

                          for avg_pattern in range(8):
                              if avg_pattern == 0:
                                  frame = subpart[MSK144_NSPM: MSK144_NSPM + MSK144_NSPM]
                              elif avg_pattern == 1:
                                  frame = subpart[MSK144_NSPM - 432: MSK144_NSPM - 432 + MSK144_NSPM]
                                  frame = np.roll(frame, 432)
                              elif avg_pattern == 2:
                                  frame = subpart[2 * MSK144_NSPM - 432: 2 * MSK144_NSPM - 432 + MSK144_NSPM]
                                  frame = np.roll(frame, 432)
                              elif avg_pattern == 3:
                                  frame = subpart[:MSK144_NSPM]
                              elif avg_pattern == 4:
                                  frame = subpart[2 * MSK144_NSPM: 2 * MSK144_NSPM + MSK144_NSPM]
                              elif avg_pattern == 5:
                                  frame = subpart[:MSK144_NSPM] + subpart[MSK144_NSPM:MSK144_NSPM + MSK144_NSPM]
                              elif avg_pattern == 6:
                                  frame = subpart[MSK144_NSPM: MSK144_NSPM + MSK144_NSPM] + subpart[2 * MSK144_NSPM: 2 * MSK144_NSPM + MSK144_NSPM]
                              elif avg_pattern == 7:
                                  frame = subpart[:MSK144_NSPM] + subpart[MSK144_NSPM:MSK144_NSPM + MSK144_NSPM] + subpart[2 * MSK144_NSPM: 2 * MSK144_NSPM + MSK144_NSPM]
    ...

Перебор значений delta_f реализует частотный джиттеринг, используемый для тонкой коррекции частотной ошибки, которая могла быть недостаточно точно скомпенсирована на предыдущих этапах.

В subpart содержится скорректированная по частоте и выровненная по времени часть анализируемого сигнала.

Значение avg_pattern определяет схему выборки фрейма для декодирования.

Скорректированный по частоте и подогнанный под временные границы фрейм отправляется в функцию декодирования. Цикл джиттеринга и перебора схем прерывается в случае успешного декодирования фрейма и извлечения из него данных.

                              if x := self.decode_fame(frame, self.LDPC_ITERATIONS):
                                  status, payload, eye_opening, bit_errors = x
                                  if status.crc_extracted in hashes:
                                      raise NextCand
                                  hashes.add(status.crc_extracted)
                                  df_hv = freq_est - MSK144_FREQ_CENTER
                                  log_item = LogItem(
                                      snr=snr,
                                      dT=t0,
                                      dF=df_hv,
                                      p_avg=avg_pattern + 1,
                                      freq=freq_est,
                                      eye_open=eye_opening,
                                      bit_err=bit_errors,

                                      payload=payload,
                                      crc=status.crc_extracted
                                  )
                                  yield log_item

                                  raise NextCand
          if len(hashes) >= 3:
              break

Результат декодирования заносится в LogItem, хранящий в себе информацию о принятом сигнале, его качестве, а также декодированное сообщение.

При декодировании трех и более сообщений в сигнале, цикл декодирования прерывается.

Описание датакласса LogItem.

from dataclasses import dataclass


@dataclass
class LogItem:
   snr: float
   dT: float
   dF: float
   p_avg: int
   freq: int
   eye_open: float
   bit_err: int

   payload: typing.ByteString
   crc: int

Описание класса-исключения для управления циклом обработки кандидатов.

class NextCand(Exception):
   pass

Декодирование фрейма

Разлагая участки сигнала, в которых, предположительно, должны содержаться биты данных, на синфазную и квадратурную составляющие и сравнивая значения с эталонным сигналом, получая тем самым коэффициент правдоподобия.

Декодирование осуществляется функцией decode_fame.

Функция принимает на вход данные декодируемого фрейма, извлеченные в детекторе. Аргумент max_iterations определяет сколько раз декодер LDPC будет осуществлять попыток восстановить данные, прежде, чем они будут отбракованы.

def decode_fame(self, frame: npt.NDArray[np.complex128],
               max_iterations: int) -> typing.Optional[typing.Tuple[DecodeStatus, typing.ByteString, float, int]]:
    sync_corr_1 = sum(frame[:SYNC_WAVEFORM_LEN] * SYNC_WAVEFORM_CONJ)
    sync_corr_2 = sum(frame[56 * 6: 56 * 6 + SYNC_WAVEFORM_LEN] * SYNC_WAVEFORM_CONJ)
    sync_corr = sync_corr_1 + sync_corr_2

    phase_0 = np.atan2(sync_corr.imag, sync_corr.real)
    corr_factor = complex(np.cos(phase_0), np.sin(phase_0))

    frame *= corr_factor.conjugate()
    ...

Для фазовой коррекции сигнала, происходит вычисление корреляции по маркерным сигналам (sync_corr, sync_corr_1, sync_corr_2). Фазовая коррекция осуществляется умножением сигнала на комплексное сопряженное значение фазы corr_factor.

    soft_bits = np.zeros(MSK144_BITS_COUNT, dtype=np.float64)
    hard_bits = np.zeros(MSK144_BITS_COUNT, dtype=np.int64)

    for i in range(6):
        soft_bits[0] += frame[i].imag * WORD_SAMPLES[i + 6]
        soft_bits[0] += frame[i + (MSK144_NSPM - 5 - 1)].imag * WORD_SAMPLES[i]

    for i in range(12):
        soft_bits[1] += frame[i].real * WORD_SAMPLES[i]

    for i in range(1, 72):
        soft_sum = 0
        for j in range(12):
            soft_sum += frame[i * 12 - 6 + j].imag * WORD_SAMPLES[j]
        soft_bits[2 * i] = soft_sum

        soft_sum = 0
        for j in range(12):
            soft_sum += frame[i * 12 + j].real * WORD_SAMPLES[j]
        soft_bits[2 * i + 1] = soft_sum

        if soft_bits[2 * i] >= 0:
           hard_bits[2 * i] = 1

        if soft_bits[2 * i + 1] >= 0:
            hard_bits[2 * i + 1] = 1

    if soft_bits[0] >= 0:
        hard_bits[0] = 1

    if soft_bits[1] >= 0:
        hard_bits[1] = 1
    ...

В soft_bits аккумулируются коэффициенты правдоподобия бит данных для синфазной и квадратурной составляющих сигнала (real — I, imag — Q).

    bad_sync_1 = 0
    bad_sync_2 = 0
    for i, sw in enumerate(SYNC_WORDS):
        bad_sync_1 += (2 * hard_bits[i] - 1) * sw
        bad_sync_2 += ((2 * hard_bits[i + 57 - 1] - 1) * sw)

    bad_sync_1 = (MSK144_SYNC_LEN - bad_sync_1) // 2
    bad_sync_2 = (MSK144_SYNC_LEN - bad_sync_2) // 2

    bad_sync = bad_sync_1 + bad_sync_2
    if bad_sync > 4:
        return
    ...

Переменная hard_bits используется для оценки точности синхронизации bad_sync.

    soft_bits_std = np.std(soft_bits)
    soft_bits /= soft_bits_std
    ...

Коэффициенты правдоподобия бит нормализуются через стандартное отклонение.

    sigma = 0.60

    soft_bits_128 = np.concat([soft_bits[8: 9 + 47], soft_bits[64: 65 + 80 - 1]])
    log128 = 2 * soft_bits_128 / sigma ** 2
    ...

В soft_bits_128 конкатенируются информационные биты, тем самым исключаются биты маркеров синхронизации.

Нормализованные логарифмические значения коэффициентов правдоподобия передаются в декодер LDPC. Декодирование LDPC алгоритмом Belief Propagation, возвращает последовательность двоичных бит.

    ldpc_errors, plain128 = bp_decode(log128, max_iterations)

    if ldpc_errors > self.MAX_LDPC_ERRORS:
        return None
    ...

В plain128 содержатся декодированные методом Belief Propagation двоичные значения бит.

Значение ldpc_errors определяет количество ошибок бит остались неразрешенными в процессе декодирования. Если значение превышает порог MAX_LDPC_ERRORS, декодируемый фрейм исключается.

Финальный этап — проверка контрольных сумм и формирование отчета о качестве сигнала.

    if not mskx_check_crc(plain128):
        return None

    payload = self.pack_bits(plain128, MSKX_LDPC_K)
    crc_extracted = mskx_extract_crc(payload)

    tones = list(msk144_encode(payload))
    msg_bits = np.zeros(MSK144_BITS_COUNT)
    for i in range(MSK144_BITS_COUNT - 1):
        j = i + 1
        if tones[i] == 0:
            msg_bits[j] = msg_bits[i] if j % 2 == 1 else (msg_bits[i] + 1) % 2
        else:
            msg_bits[j] = (msg_bits[i] + 1) % 2 if j % 2 == 1 else msg_bits[i]

    eye_top = 1
    eye_bot = -1
    for i in range(MSK144_BITS_COUNT):
        if msg_bits[i] == 1:
            eye_top = min(soft_bits[i], eye_top)
        else:
            eye_bot = max(soft_bits[i], eye_bot)
    eye_opening = eye_top - eye_bot

    bit_errors = np.count_nonzero(hard_bits != msg_bits)

    return DecodeStatus(ldpc_errors, crc_extracted), payload, eye_opening, bit_errors

В payload содержатся данные сообщения в байтовом представлении.

crc_extracted — контрольная сумма сообщения.

Формирование tones и msg_bits используется для оценки BER (bit error rate).

На основе значений soft_bits производится оценка сигнала по глазковой диаграмме (рисунки 9, 10).

Рисунок 9: Глазковая диаграмма для I составляющей фрейма.
Рисунок 9: Глазковая диаграмма для I составляющей фрейма.
Рисунок 10: Глазковая диаграмма для Q составляющей фрейма.
Рисунок 10: Глазковая диаграмма для Q составляющей фрейма.

Проверка CRC-13

При извлечении двоичных бит данных сообщения, осуществляется проверка корректности контрольной суммы принятого пакета.

MSKX_CRC_WIDTH = 13
MSKX_LDPC_K = 90

def mskx_crc(msg_bits: npt.NDArray[np.uint8], msg_len: int) -> npt.NDArray[np.uint8]:
   div = [1, 1, 0, 1, 0, 1, 1, 1, 0, 1, 0, 1, 1, 1]

   msg = np.zeros(msg_len + MSKX_CRC_WIDTH, dtype=np.uint8)
   for i in range(msg_len + MSKX_CRC_WIDTH):
       if i < 77:
           msg[i] = msg_bits[i]

   for i in range(msg_len):
       if msg[i] != 0:
           for j, d in enumerate(div):
               msg[i + j] = msg[i + j] ^ d

   return msg[msg_len:msg_len + MSKX_CRC_WIDTH]


def mskx_check_crc(msg_bits: npt.NDArray[np.uint8]) -> bool:
    crc = mskx_crc(msg_bits, 83)
    return np.array_equal(crc, msg_bits[MSKX_LDPC_K - MSKX_CRC_WIDTH:MSKX_LDPC_K])

Предикат mskx_check_crc на вход принимает биты данных сообщения, вычисляет CRC-13 (mskx_crc), если биты вычисленной контрольной суммы совпадает с битами контрольной суммы пакета данных, тогда пакет считается корректным и может быть обработан дальше.

Вспомогательная функция извлечения контрольной суммы из пакета данных сообщения.

def mskx_extract_crc(msg: typing.ByteString) -> int:
   return ((msg[9] & 0x07) << 10) | (msg[10] << 2) | (msg[11] >> 6)

Монитор

Описанные выше функции объединены в один класс MSK144Monitor, задача которого предоставить интерфейс загрузки данных и инкапсулировать пайплайн и логику декодера.

class MSK144Monitor(AbstractMonitor):
    MAX_LDPC_ERRORS = 18
    LDPC_ITERATIONS = 10

    @staticmethod
    @cache
    @jit(nopython=True)
    def filter_response(n_fft: int, sample_rate: int) -> npt.NDArray[np.float64]:
        ...

    @staticmethod
    def fourier_bpf(signal: npt.NDArray[np.float64], n_fft: int,
                    response: npt.NDArray[np.float64]) -> npt.NDArray[np.complex128]:
        ...

    @staticmethod
    @jit(nopython=True)
    def shift_freq(complex_signal: npt.NDArray[np.complex128],
                   freq: float, sample_rate: int) -> npt.NDArray[np.complex128]:
        ...

    def __init__(self, sample_rate: int):
        self.sample_rate = sample_rate
        self.signal = np.zeros(0, dtype=np.float64)

    def decode_fame(self, frame: npt.NDArray[np.complex128],
                     max_iterations: int) -> typing.Optional[typing.Tuple[DecodeStatus, typing.ByteString, float, int]]:
        ...

    def detect_signals(self, signal: npt.NDArray[np.complex128], signal_len: int, start: float,
                        max_cand: int = 16, tolerance: int = 150) -> typing.Generator[LogItem, None, None]:
        ...

    def decode(self, tm_slot_start: float) -> typing.Generator[LogItem, None, None]:
        signal_len = min(len(self.signal), 30 * self.sample_rate)
        signal_part = self.signal[:signal_len]

        rms = np.sqrt(np.mean(signal_part ** 2))
        if rms == 0 or np.isnan(rms):
            rms = 1

        part_norm = signal_part / (rms / 2)

        n = int(np.log(signal_len) / np.log(2) + 1)
        nfft = int(min(2 ** n, 1024 ** 2))

        filter_response = self.filter_response(nfft, self.sample_rate)
        part_filtered = self.fourier_bpf(part_norm, nfft, filter_response)

        yield from self.detect_signals(part_filtered, signal_len, tm_slot_start)

    def monitor_process(self, frame: npt.NDArray):
        wave = frame * 0.000390625
        avg = np.mean(wave)
        wave -= avg

        self.signal = wave

MAX_LDPC_ERRORS определяет пороговое значение количества ошибок, свыше которого декодированный сигнал игнорируется.

LDPC_ITERATIONS задает максимальное количество итераций декодирования Belief Propagation алгоритма.

В __init__ задается частота семплирования сигнала sample_rate.

Метод monitor_process сохраняет дискретный сигнал для декодирования. Значение 0.000390625 используется для перехода от int16 к float64 значениям.

Генераторная функция decode осуществляет фильтрацию сигнала и передачу его в функцию-детектор. Генератор выдает данные, инкапсулированные в LogItem.

Полный пример декодера с использованием класса MSK144Monitor:

from scipy.io.wavfile import read
from decode_msk144 import MSK144Monitor
from message import message_decode


def main():
    sample_rate, data = read("examples/signal.wav")

    mon = MSK144Monitor(sample_rate=sample_rate)
    mon.monitor_process(data)

    for it in mon.decode(0):
        msg = message_decode(it.payload)
        msg_text = " ".join(it for it in msg if isinstance(it, str))

        print(
            f"dB: {it.snr:.3f}\t"
            f"T: {it.dT:.3f}\t"
            f"DF: {it.dF:.3f}\t"
            f"Averaging pattern: {it.p_avg}\t"
            f"Freq: {it.freq}\t"
            f"Eye opening: {it.eye_open:.3f}\t"
            f"Bit errors: {it.bit_err}\t"
            f"CRC: {it.crc}\t"
            f"Message text: {msg_text}"
        )


if __name__ == '__main__':
   main()

Результат декодирования тестового сигнала:

dB: 2.000	T: 0.234	DF: -1.000	Averaging pattern: 1	Freq: 1499	Eye opening: 0.768	Bit errors: 1	CRC: 1509	Message text: CQ R9FEU LO87

Заключение

В продолжении цикла статей, посвященных протоколу MSK144, в этой статье были рассмотрены принципы демодуляции и декодирования сигналов с минимальным частотным сдвигом. Были рассмотрены механизмы цифровой обработки сигналов, фазовая и частотная коррекции.

В  статью не вошли описание алгоритмов декодирования LDPC, кодирования сообщений в текст, формирование эталонного сигнала и прочее, так как они были описаны в предыдущих статьях, посвященных FT8 и кодированию MSK144.

Декодер MSK144 может представлять интерес с точки зрения цифровой обработки сигналов.

Ссылки

  1. Первая часть о протоколе MSK144

  2. Общий репозиторий с реализацией FTX и MSKX Python

  3. Исходный код демодулятора MSK144

  4. Статья с описанием протокола из журнала QEX

  5. Программа WSJT, реализующая протокол MSK144

  6. Программа MSHV, реализующая протоколы семейства MSKX