
MSK144 — цифровой протокол, разработанный Джо Тейлором (K1JT) и его командой в 2016 году для проведения связей через метеорное рассеивание.
В предыдущей части были рассмотрены общие характеристики протокола, а также алгоритмы, формирующие сигнал. В этой части статьи рассматриваются механизмы детектирования и декодирования сигналов в протоколе MSK144.
Статья может быть интересна радиолюбителям и людям, интересующимся темой цифровой обработки сигналов.
Общее представление
Как было рассмотрено в предыдущей статье, формирование сигнала, отправляемого в радиоэфир, состоит из этапов упаковки бит данных, расчета контрольной суммы, вычислению LDPC-кодов, после чего, к данным добавляются маркеры (или якоря) синхронизации, состоящие из последовательности бит 01110010, прежде чем всё это преобразовать в дискретный сигнал звуковой частоты.

Обратная задача, при приеме сигнала, заключается в поиске маркера синхронизации с последующей фазовой и частотной коррекцией сигнала, после чего, выполняя IQ преобразования, выделить аналоговое представление бит данных, которые далее проходят процесс декодирования, сверки контрольных сумм и так далее, пока на выходе не останутся только данные сообщения.

Анализ сигнала
Анализ сигнала состоит из нескольких этапов (рисунок 3):
фильтрация сигнала;
детекция сигнала;
декодирования фрейма.

Фильтрация сигнала
Перед тем, как начать анализировать сигнал, осуществляется фильтрация сигнала полосовым фильтром, задача которого исключить незадействованные области частот анализируемого сигнала и оставить только ту полосу, в которой присутствуют интересующие сигналы с данными.
В MSK144 применяется полосовой фильтр на основе преобразований Фурье. Отличительными особенностями фильтров такого типа является то, что фильтрация осуществляется в частотной области, что позволяет добиться высокой степени аттенюации, конфигурировать АЧХ фильтра с любой степенью наклона и задание множества полос частот; а также, помимо аттенюации, осуществлять усиление сигнала (эквализация).
На рисунке 4 показана схема фильтра.

Принцип работы фильтра заключается в том, что исходный сигнал подвергается быстрому преобразованию Фурье, результатом которого является представление сигнала в комплексной форме, в виде амплитуд и фаз в разрезе частот; после умножения амплитудных значений на требуемую характеристику, осуществляется обратное преобразование Фурье, позволяющее обратно получить аналоговый сигнал.
В Python реализовать фильтр на основе преобразования Фурье можно используя функционал библиотеки numpy.
import numpy as np import numpy.typing as npt def fourier_bpf(signal: npt.NDArray[np.float64], n_fft: int, response: npt.NDArray[np.float64]) -> npt.NDArray[np.complex128]: freq_d = np.fft.fft(signal, n=n_fft) response_len = len(response) freq_d[:response_len] *= response freq_d[0] = 0.5 * freq_d[0] freq_d[response_len: n_fft] = complex(0, 0) time_d = np.fft.ifft(freq_d, n=n_fft) return time_d
На вход функции fourier_bpf подается исходный сигнал и АЧХ фильтра, задаваемая аргументом response. Аргумент n_fft определяет размер блока данных для преобразования Фурье; значение задается степенью числа 2.
Частоты сигналов MSK144 в основном располагаются в диапазоне от 1.0 КГц до 2.0 КГц, фильтр должен подавлять все остальные частоты, находящиеся за пределами диапазона (рисунки 5 и 6).
На рисунке 5 представлена АЧХ фильтра для фильтрации сигналов MSK144. Так как сигналы подвержены частотному смещению, полоса пропускания фильтра имеет несколько большую ширину захвата.

За формирование кривой АЧХ фильтра отвечает функция filter_response.
MSK144_CENTER_FREQ = 1500 def filter_response(n_fft: int, sample_rate: int) -> npt.NDArray[np.float64]: t = 1 / 2000 beta = 0.1 df = sample_rate / n_fft nh = (n_fft // 2) + 1 response = np.full(nh, 1, dtype=np.float64) for i in range(nh): f = abs(df * i - MSK144_CENTER_FREQ) if (1 + beta) / (2 * t) >= f > (1 - beta) / (2 * t): response[i] = 0.5 * (1 + np.cos((np.pi * t / beta) * (f - (1 - beta) / (2 * t)))) elif f > (1 + beta) / (2 * t): response[i] = 0 return response
Функция принимает на вход размер выборки преобразования Фурье n_fft, и частоту дискретизации сигнала sample_rate. Результатом является массив значений от 0 до 1.
Кривая АЧХ описывается уравнением:
Точки, соответствующие аттенюации в 3дБ (что соответствует 0.5 от исходного значения амплитуды) соответствуют частотам 0.5 КГц и 2.5 КГц соответственно. Гладкость характеристики в зонах аттенюации определяется функцией косинуса а ширина значением константы beta, чем меньше значение, тем сильнее кривая будет стремиться к прямоугольной форме.
На рисунке 6 изображены спектры сигналов до и после применения фильтра с характеристикой filter_response.

Недостатками фильтров такого типа являются требования в вычислительным ресурсам, а также эффект Гиббса — то есть появление нежелательных осцилляций на границах сигнала, что требует применения к сигналу сглаживающих оконных функций.
Частотное смещение
Сигналы протокола MSK144 формируются в слышимом человеческим ухом диапазоне частот, с тем расчетом, что их прием и передача в радиоэфире осуществляется с помощью радиолюбительского трансивера, рассчитанного на передачу голоса; так как опорные генераторы в любительских трансиверах имеют некоторую погрешность в точности частоты, то отправляемые и принимаемые сигналы имеют частотный дрейф, в связи с этим, принятый сигнал необходимо подстраивать под частоту, на которой будет происходить декодирование.
Смещение частоты сигнала реализуется функцией shift_freq.
def shift_freq(complex_signal: npt.NDArray[np.complex128], freq: float, sample_rate: int) -> npt.NDArray[np.complex128]: phi = 2 * np.pi * freq / sample_rate step = complex(np.cos(phi), np.sin(phi)) phasor = complex(1, 1) signal = np.zeros(len(complex_signal), dtype=np.complex128) for i, cs_val in enumerate(complex_signal): phasor *= step signal[i] = phasor * cs_val return signal
Функция shift_freq принимает на вход отфильтрованный преобразованиями Фурье сигнал в комплексном представлении, аргумент freq определяет на сколько герц необходимо сместить сигнал; sample_rate — частота дискретизации сигнала.
Функция реализует фазовое вращение сигнала в каждой точке на угол phi, соответствующему смещению на заданную частоту. Фазор реализует комплексную экспоненту.
В общем виде смещение может быть записано как: .
Детекция сигнала
Задача детекции сигнала заключается в коррекции фазовых и частотных ошибок сигнала и синхронизация по маркерам.
Функция детектора состоит из этапов:
поиск сигналов-кандидатов по амплитуде;
коррекция фазы и частотной ошибки;
синхронизация;
выделение фрейма.
Этап фазовой коррекции применяется для снижения межсимвольных интерференций сигнала, связанных с тем, что в большинстве трансиверов используется узкополосные фильтры, способные вносить фазовые изменения сигнала.
Маркер синхронизации
Для выделения маркера с помощью автокорреляционной функции, формируется эталонный, искомый сигнал.
MSK144_SYNC = [0, 1, 1, 1, 0, 0, 1, 0] # 0x72 in binary SAMPLES_PER_WORD = 12 SAMPLES_PER_HALF_WORD = SAMPLES_PER_WORD // 2 WORD_SAMPLES = np.sin([i * np.pi / SAMPLES_PER_WORD for i in range(SAMPLES_PER_WORD)]) SYNC_WORDS = np.array([2 * b - 1 for b in MSK144_SYNC]) SYNC_I = np.array([ sample * SYNC_WORDS[j * 2 + 1] for j in range(4) for sample in WORD_SAMPLES ]) SYNC_Q = np.array([ pp * SYNC_WORDS[j * 2] for j in range(4) for pp in (WORD_SAMPLES[SAMPLES_PER_HALF_WORD:] if j == 0 else WORD_SAMPLES) ]) SYNC_WAVEFORM = np.array([complex(SYNC_I[i], SYNC_Q[i]) for i in range(42)]) SYNC_WAVEFORM_LEN = len(SYNC_WAVEFORM) SYNC_WAVEFORM_CONJ = np.conj(SYNC_WAVEFORM)
В MSK144 для маркера синхронизации используется последовательность бит 01110010 (0x72), задаваемая в MSK144_SYNC.
WORD_SAMPLES определяет дискретное представление участка сигнала для формирования символа MSK.
В SYNC_WORDS формируется последовательность сдвигов частоты, в соответствии с последовательностью бит в MSK144_SYNC.
Формирование сигнала с MSK манипуляцией осуществляется в комплексной форме, через IQ преобразование, четные биты записываются в SYNC_Q, нечетные в SYNC_I соответственно (рисунок 7).

Результирующий сигнал SYNC_WAVEFORM записывается в комплексном виде, где синфазная составляющая часть сигнала записывается в вещественную часть значений, а квадратурная в мнимую.
SYNC_WAVEFORM_CONJ хранит комплексно-сопряженное представление SYNC_WAVEFORM, используемое в вычислении корреляции с анализируемым сигналом.
Поиск кандидатов
При поиске кандидатов происходит анализ спектра сигнала в полосе захвата, оценка силы сигнала, его соотношения сигнал/шум и определение погрешности частоты.
Детектор сигналов реализуется функцией detect_signals, являющаяся методом класса MSK144Monitor.
MSK144_NSPM = 864 MSK144_NPTS = 3 * MSK144_NSPM MSK144_NFFT = 864 MSK144_FREQ_CENTER = 1500 MSK144_FREQ_HI = MSK144_FREQ_CENTER + 500 MSK144_FREQ_LO = MSK144_FREQ_CENTER - 500 SMOOTH_WINDOW_LEN = SAMPLES_PER_WORD SMOOTH_WINDOW = (1 - np.cos([i * np.pi / SMOOTH_WINDOW_LEN for i in range(SMOOTH_WINDOW_LEN)])) / 2 def detect_signals(self, signal: npt.NDArray[np.complex128], signal_len: int, start: float, max_cand: int = 16, tolerance: int = 150) -> typing.Generator[LogItem, None, None]: ...
Функция detect_signals принимает на вход отфильтрованный сигнал signal в комплексном виде, параметр signal_len определяет общую длину исходного сигнала (так как после фильтра на основе преобразований Фурье результирующие массивы по длине отличаются от исходного сигнала, параметр передается отдельно).
Параметр start определяет временную позицию в сигнале, относительно которой необходимо осуществлять анализ (например для ручного анализа участка сигнала).
Параметр tolerance позволяет задать ширину полосы частот анализируемого сигнала.
dt = 1 / self.sample_rate df = self.sample_rate / MSK144_NFFT f_hi = MSK144_FREQ_HI * 2 f_lo = MSK144_FREQ_LO * 2 f_hi_id_top = int((f_hi + 2 * tolerance) / df) f_hi_id_bot = int((f_hi - 2 * tolerance) / df) f_lo_id_top = int((f_lo + 2 * tolerance) / df) f_lo_id_bot = int((f_lo - 2 * tolerance) / df) f_2Khz = int(f_lo / df) f_4Khz = int(f_hi / df) ...
Переменные dt и df определяют минимальный временной шаг, относительно частоты дискретизации, и минимальный шаг изменения частоты в спектре после преобразования Фурье.
Значения f_hi и f_lo определяют минимальную и максимальную частоты MSK модуляции.
Группы переменных "f_hi_id_top, f_hi_id_bot" и "f_lo_id_top, f_lo_id_bot" определяют полосу частот, в рамках которых будет производиться анализ.
step_size = MSK144_NSPM // 4 steps_count = (signal_len - MSK144_NPTS) // step_size det_amp = np.zeros(steps_count) det_snr = np.zeros(steps_count) det_f_err = np.full(steps_count, -999.99) ...
Блок массивов "det_amp, det_snr, det_f_err" аккумулирует параметры амплитуд, SNR и отклонения частот на каждой итерации анализа участка сигнала.
steps_real = 0 for step in range(steps_count): part_start = step_size * step part_end = part_start + MSK144_NSPM if part_end > signal_len: break part = signal[part_start: part_end] part = part ** 2 part[:SMOOTH_WINDOW_LEN] *= SMOOTH_WINDOW part[MSK144_NSPM - SMOOTH_WINDOW_LEN:MSK144_NSPM] *= SMOOTH_WINDOW[::-1] spec = np.fft.fft(part, MSK144_NFFT) amps = np.abs(spec) ** 2 ...
В цикле до steps_count из общего сигнала выделяется участок part размером MSK144_NSPM, который далее возводится в степень 2 (part ** 2), для выделения несущих тонов в спектре и повышения их амплитуд относительно шума (по этой причине ранее объявленные переменные со значениями частот умножались на 2).
Затем к границам сигнала применяется сглаживающая функция SMOOTH_WINDOW, чья характеристика соответствует оконной функции Ханна, изображенная на рисунке 8.

Быстрое преобразование Фурье позволяет получить спектр (spec) и АЧХ (amps) анализируемого участка сигнала.
# Hi freq peak_hi = f_hi_id_bot + np.argmax(amps[f_hi_id_bot:f_hi_id_top]) delta_hi = -((spec[peak_hi - 1] - spec[peak_hi + 1]) / ( 2 * spec[peak_hi] - spec[peak_hi - 1] - spec[peak_hi + 1])).real amp_hi = amps[peak_hi] amp_avg_hi = (np.sum(amps[f_hi_id_bot:f_hi_id_top]) - amp_hi) / (f_hi_id_top - f_hi_id_bot) snr_hi = amp_hi / (amp_avg_hi + 0.01) ...
На участке частот спектра в окрестностях частоты 4.0 КГц ищется максимальное значение peak_hi. Значение delta_hi определяет смещение частоты пика. Значение переменной amp_avg_hi определяет среднее значение амплитуд шума для расчета значения SNR в переменной snr_hi.
Аналогичным образом осуществляется поиск и оценка шумовой картины для частоты 2.0 КГц.
# Low freq peak_lo = f_lo_id_bot + np.argmax(amps[f_lo_id_bot:f_lo_id_top]) delta_lo = -((spec[peak_lo - 1] - spec[peak_lo + 1]) / ( 2 * spec[peak_lo] - spec[peak_lo - 1] - spec[peak_lo + 1])).real amp_lo = amps[peak_lo] amp_avg_lo = (np.sum(amps[f_lo_id_bot:f_lo_id_top]) - amp_lo) / (f_lo_id_top - f_lo_id_bot) snr_lo = amp_lo / (amp_avg_lo + 0.01) # Errors f_err_hi = (peak_hi + delta_hi - f_4Khz) * df / 2 f_err_lo = (peak_lo + delta_lo - f_2Khz) * df / 2 f_err = f_err_hi if amp_hi >= amp_lo else f_err_lo ...
Значения переменных f_err_hi и f_err_lo определяют разность между искомой и найденной частотами.
Тройка массивов "det_amp, det_snr, det_f_err" для каждого участка проанализированного сигнала содержит амплитуды сигналов, значение SNR и значение частотной погрешности.
Следующий этап заключается в отборе сигналов-кандидатов, на основе ранее результатов детектора частот.
indices = np.argsort(det_amp[:steps_real]) amp_median = det_amp[indices[steps_real // 4]] if amp_median == 0: amp_median = 1 det_amp[:steps_real] /= amp_median ...
Амплитудные значения во временных окнах нормализуются относительно медианного значения.
time_arr = np.zeros(max_cand, dtype=np.float64) freq_arr = np.zeros(max_cand, dtype=np.float64) snr_arr = np.zeros(max_cand, dtype=np.float64) count_cand = 0 for ip in range(max_cand): il = np.argmax(det_amp[:steps_real]) if det_amp[il] < 3.5: break if abs(det_f_err[il]) <= tolerance: time_arr[count_cand] = ((il - 0) * step_size + MSK144_NSPM / 2) * dt freq_arr[count_cand] = det_f_err[il] snr_arr[count_cand] = 12 * np.log10(det_amp[il]) / 2 - 9 count_cand += 1 det_amp[il] = 0 ...
В цикле из общего скопа задетектированных частот отбираются только те, которые не превышают заданную ошибку несущей частоты и имеют достаточное амплитудное значение, превышающее порог в 3.5.
Пороговое значение в 3.5 выбрано как минимально приемлемое значение для решения задач обработки акустических сигналов, так как меньшие значения считаются близкими к уровню шума и плохо пригодными для достоверного извлечения полезного сигнала.
if count_cand < 3: for ip in range(max_cand - count_cand): if ip >= max_cand - count_cand: break il = np.argmax(det_snr[:steps_real]) if det_snr[il] < 12.0: break if abs(det_f_err[il]) <= tolerance: time_arr[count_cand] = ((il - 0) * step_size + MSK144_NSPM / 2) * dt freq_arr[count_cand] = det_f_err[il] snr_arr[count_cand] = 12 * np.log10(det_snr[il]) / 2 - 9 count_cand += 1 det_snr[il] = 0 ...
Если отобранных кандидатов по критерию амплитуды сигнала нашлось менее 3, дополнительно осуществляется поиск кандидатов по SNR. Такой механизм позволяет обнаруживать слабые сигналы при тропосферном прохождении.
if count_cand > 0: indices = np.argsort(time_arr[:count_cand]) hashes = set() for cand in range(count_cand): with suppress(NextCand): cand_idx = indices[cand] frame_mid = int(time_arr[cand_idx] * self.sample_rate) if frame_mid < MSK144_NPTS / 2: frame_mid = MSK144_NPTS // 2 if frame_mid > signal_len - MSK144_NPTS / 2: frame_mid = signal_len - MSK144_NPTS // 2 t0 = time_arr[cand_idx] + dt * (start) part = signal[frame_mid - MSK144_NPTS // 2: frame_mid + MSK144_NPTS // 2] f_err = freq_arr[cand_idx] snr = 2 * int(snr_arr[cand_idx] / 2) snr = max(-4.0, min(24.0, snr)) part = self.shift_freq(part, -(MSK144_FREQ_CENTER + f_err), self.sample_rate) ...
Для каждого кандидата выбирается участок сигнала из найденных временных границ, производится коррекция ошибки несущей частоты функцией shift_freq.
Синхронизация
Следующий этап — синхронизация с найденными сигналами.
sync1 = np.zeros(MSK144_NPTS, dtype=np.complex128) sync2 = np.zeros(MSK144_NPTS, dtype=np.complex128) for i in range(MSK144_NPTS - (56 * 6 + 42)): sync1[i] = sum(part[i: i + SYNC_WAVEFORM_LEN] * SYNC_WAVEFORM_CONJ) sync2[i] = sum(part[i + 56 * 6: i + 56 * 6 + SYNC_WAVEFORM_LEN] * SYNC_WAVEFORM_CONJ) sync_corr = abs(sync1) * abs(sync2) ...
В sync1 и sync2 считаются значения АКФ сигнала и ранее рассмотренного маркера синхронизации SYNC_WAVEFORM_CONJ.
Значение sync_corr определяет общее значение АКФ для обоих маркеров синхронизации в сигнале.
peaks = [] for i in range(6): peak = np.argmax(sync_corr) sync_corr[max(0, peak - 7): min(MSK144_NPTS - 56 * 6 - 42, peak + 7)] = 0.0 peaks.append(peak) ...
Формируется список шести пиков АКФ маркеров синхронизации для последующей синхронизации.
for peak in peaks: peak_corr = np.zeros(6, dtype=np.complex128) for i in range(6): j = peak + i if peak + 11 + MSK144_NSPM < MSK144_NPTS: s1 = slice(j + 6, j + 6 + MSK144_NSPM, 6) s2 = slice(j, j + MSK144_NSPM, 6) else: s1 = slice(j + 6, MSK144_NPTS, 6) s2 = slice(j, MSK144_NPTS - 6, 6) peak_corr[i] = np.sum((part[s1] * np.conj(part[s2])) ** 2) peak_corr_idx = np.argmax(np.abs(peak_corr)) if peak_corr_idx <= 2: peak_corr_idx -= 1 if peak_corr_idx > 2: peak_corr_idx -= 7 ...
Для найденных шести кипов вычисляется корреляция для тонкой подстройки джиттера, чтобы попасть в центр глазковой диаграммы.
for id in range(3): if id == 1: sign = -1 elif id == 2: sign = 1 else: sign = 0 ic = peak + peak_corr_idx + sign if ic < 0: ic = ic + MSK144_NSPM cca = np.sum(part[ic:ic + SYNC_WAVEFORM_LEN] * SYNC_WAVEFORM_CONJ) if ic + 56 * 6 + 42 < MSK144_NPTS: ccb = np.sum(part[ic + 56 * 6:ic + 56 * 6 + SYNC_WAVEFORM_LEN] * SYNC_WAVEFORM_CONJ) cfac = ccb * np.conj(cca) f_err_2 = np.atan2(cfac.imag, cfac.real) / (2 * np.pi * 56 * 6 * dt) else: ccb = np.sum(part[ic - 88 * 6:ic - 88 * 6 + SYNC_WAVEFORM_LEN] * SYNC_WAVEFORM_CONJ) cfac = ccb * np.conj(cca) f_err_2 = np.atan2(cfac.imag, cfac.real) / (2 * np.pi * 88 * 6 * dt) freq_est = int(MSK144_FREQ_CENTER + f_err + f_err_2) ...
В freq_est вычисляется итоговое значение ошибки отклонения несущей частоты.
Выделение фрейма
Финальный этап детекции — выделение фрейма с сигналом для передачи его в декодер.
for idf in range(5): if idf == 0: delta_f = 0 elif idf % 2 == 0: delta_f = idf else: delta_f = -(idf + 1) subpart = self.shift_freq(part, -(f_err_2 + delta_f), self.sample_rate) subpart = np.roll(subpart, -(ic - MSK144_NSPM)) for avg_pattern in range(8): if avg_pattern == 0: frame = subpart[MSK144_NSPM: MSK144_NSPM + MSK144_NSPM] elif avg_pattern == 1: frame = subpart[MSK144_NSPM - 432: MSK144_NSPM - 432 + MSK144_NSPM] frame = np.roll(frame, 432) elif avg_pattern == 2: frame = subpart[2 * MSK144_NSPM - 432: 2 * MSK144_NSPM - 432 + MSK144_NSPM] frame = np.roll(frame, 432) elif avg_pattern == 3: frame = subpart[:MSK144_NSPM] elif avg_pattern == 4: frame = subpart[2 * MSK144_NSPM: 2 * MSK144_NSPM + MSK144_NSPM] elif avg_pattern == 5: frame = subpart[:MSK144_NSPM] + subpart[MSK144_NSPM:MSK144_NSPM + MSK144_NSPM] elif avg_pattern == 6: frame = subpart[MSK144_NSPM: MSK144_NSPM + MSK144_NSPM] + subpart[2 * MSK144_NSPM: 2 * MSK144_NSPM + MSK144_NSPM] elif avg_pattern == 7: frame = subpart[:MSK144_NSPM] + subpart[MSK144_NSPM:MSK144_NSPM + MSK144_NSPM] + subpart[2 * MSK144_NSPM: 2 * MSK144_NSPM + MSK144_NSPM] ...
Перебор значений delta_f реализует частотный джиттеринг, используемый для тонкой коррекции частотной ошибки, которая могла быть недостаточно точно скомпенсирована на предыдущих этапах.
В subpart содержится скорректированная по частоте и выровненная по времени часть анализируемого сигнала.
Значение avg_pattern определяет схему выборки фрейма для декодирования.
Скорректированный по частоте и подогнанный под временные границы фрейм отправляется в функцию декодирования. Цикл джиттеринга и перебора схем прерывается в случае успешного декодирования фрейма и извлечения из него данных.
if x := self.decode_fame(frame, self.LDPC_ITERATIONS): status, payload, eye_opening, bit_errors = x if status.crc_extracted in hashes: raise NextCand hashes.add(status.crc_extracted) df_hv = freq_est - MSK144_FREQ_CENTER log_item = LogItem( snr=snr, dT=t0, dF=df_hv, p_avg=avg_pattern + 1, freq=freq_est, eye_open=eye_opening, bit_err=bit_errors, payload=payload, crc=status.crc_extracted ) yield log_item raise NextCand if len(hashes) >= 3: break
Результат декодирования заносится в LogItem, хранящий в себе информацию о принятом сигнале, его качестве, а также декодированное сообщение.
При декодировании трех и более сообщений в сигнале, цикл декодирования прерывается.
Описание датакласса LogItem.
from dataclasses import dataclass @dataclass class LogItem: snr: float dT: float dF: float p_avg: int freq: int eye_open: float bit_err: int payload: typing.ByteString crc: int
Описание класса-исключения для управления циклом обработки кандидатов.
class NextCand(Exception): pass
Декодирование фрейма
Разлагая участки сигнала, в которых, предположительно, должны содержаться биты данных, на синфазную и квадратурную составляющие и сравнивая значения с эталонным сигналом, получая тем самым коэффициент правдоподобия.
Декодирование осуществляется функцией decode_fame.
Функция принимает на вход данные декодируемого фрейма, извлеченные в детекторе. Аргумент max_iterations определяет сколько раз декодер LDPC будет осуществлять попыток восстановить данные, прежде, чем они будут отбракованы.
def decode_fame(self, frame: npt.NDArray[np.complex128], max_iterations: int) -> typing.Optional[typing.Tuple[DecodeStatus, typing.ByteString, float, int]]: sync_corr_1 = sum(frame[:SYNC_WAVEFORM_LEN] * SYNC_WAVEFORM_CONJ) sync_corr_2 = sum(frame[56 * 6: 56 * 6 + SYNC_WAVEFORM_LEN] * SYNC_WAVEFORM_CONJ) sync_corr = sync_corr_1 + sync_corr_2 phase_0 = np.atan2(sync_corr.imag, sync_corr.real) corr_factor = complex(np.cos(phase_0), np.sin(phase_0)) frame *= corr_factor.conjugate() ...
Для фазовой коррекции сигнала, происходит вычисление корреляции по маркерным сигналам (sync_corr, sync_corr_1, sync_corr_2). Фазовая коррекция осуществляется умножением сигнала на комплексное сопряженное значение фазы corr_factor.
soft_bits = np.zeros(MSK144_BITS_COUNT, dtype=np.float64) hard_bits = np.zeros(MSK144_BITS_COUNT, dtype=np.int64) for i in range(6): soft_bits[0] += frame[i].imag * WORD_SAMPLES[i + 6] soft_bits[0] += frame[i + (MSK144_NSPM - 5 - 1)].imag * WORD_SAMPLES[i] for i in range(12): soft_bits[1] += frame[i].real * WORD_SAMPLES[i] for i in range(1, 72): soft_sum = 0 for j in range(12): soft_sum += frame[i * 12 - 6 + j].imag * WORD_SAMPLES[j] soft_bits[2 * i] = soft_sum soft_sum = 0 for j in range(12): soft_sum += frame[i * 12 + j].real * WORD_SAMPLES[j] soft_bits[2 * i + 1] = soft_sum if soft_bits[2 * i] >= 0: hard_bits[2 * i] = 1 if soft_bits[2 * i + 1] >= 0: hard_bits[2 * i + 1] = 1 if soft_bits[0] >= 0: hard_bits[0] = 1 if soft_bits[1] >= 0: hard_bits[1] = 1 ...
В soft_bits аккумулируются коэффициенты правдоподобия бит данных для синфазной и квадратурной составляющих сигнала (real — I, imag — Q).
bad_sync_1 = 0 bad_sync_2 = 0 for i, sw in enumerate(SYNC_WORDS): bad_sync_1 += (2 * hard_bits[i] - 1) * sw bad_sync_2 += ((2 * hard_bits[i + 57 - 1] - 1) * sw) bad_sync_1 = (MSK144_SYNC_LEN - bad_sync_1) // 2 bad_sync_2 = (MSK144_SYNC_LEN - bad_sync_2) // 2 bad_sync = bad_sync_1 + bad_sync_2 if bad_sync > 4: return ...
Переменная hard_bits используется для оценки точности синхронизации bad_sync.
soft_bits_std = np.std(soft_bits) soft_bits /= soft_bits_std ...
Коэффициенты правдоподобия бит нормализуются через стандартное отклонение.
sigma = 0.60 soft_bits_128 = np.concat([soft_bits[8: 9 + 47], soft_bits[64: 65 + 80 - 1]]) log128 = 2 * soft_bits_128 / sigma ** 2 ...
В soft_bits_128 конкатенируются информационные биты, тем самым исключаются биты маркеров синхронизации.
Нормализованные логарифмические значения коэффициентов правдоподобия передаются в декодер LDPC. Декодирование LDPC алгоритмом Belief Propagation, возвращает последовательность двоичных бит.
ldpc_errors, plain128 = bp_decode(log128, max_iterations) if ldpc_errors > self.MAX_LDPC_ERRORS: return None ...
В plain128 содержатся декодированные методом Belief Propagation двоичные значения бит.
Значение ldpc_errors определяет количество ошибок бит остались неразрешенными в процессе декодирования. Если значение превышает порог MAX_LDPC_ERRORS, декодируемый фрейм исключается.
Финальный этап — проверка контрольных сумм и формирование отчета о качестве сигнала.
if not mskx_check_crc(plain128): return None payload = self.pack_bits(plain128, MSKX_LDPC_K) crc_extracted = mskx_extract_crc(payload) tones = list(msk144_encode(payload)) msg_bits = np.zeros(MSK144_BITS_COUNT) for i in range(MSK144_BITS_COUNT - 1): j = i + 1 if tones[i] == 0: msg_bits[j] = msg_bits[i] if j % 2 == 1 else (msg_bits[i] + 1) % 2 else: msg_bits[j] = (msg_bits[i] + 1) % 2 if j % 2 == 1 else msg_bits[i] eye_top = 1 eye_bot = -1 for i in range(MSK144_BITS_COUNT): if msg_bits[i] == 1: eye_top = min(soft_bits[i], eye_top) else: eye_bot = max(soft_bits[i], eye_bot) eye_opening = eye_top - eye_bot bit_errors = np.count_nonzero(hard_bits != msg_bits) return DecodeStatus(ldpc_errors, crc_extracted), payload, eye_opening, bit_errors
В payload содержатся данные сообщения в байтовом представлении.
crc_extracted — контрольная сумма сообщения.
Формирование tones и msg_bits используется для оценки BER (bit error rate).
На основе значений soft_bits производится оценка сигнала по глазковой диаграмме (рисунки 9, 10).


Проверка CRC-13
При извлечении двоичных бит данных сообщения, осуществляется проверка корректности контрольной суммы принятого пакета.
MSKX_CRC_WIDTH = 13 MSKX_LDPC_K = 90 def mskx_crc(msg_bits: npt.NDArray[np.uint8], msg_len: int) -> npt.NDArray[np.uint8]: div = [1, 1, 0, 1, 0, 1, 1, 1, 0, 1, 0, 1, 1, 1] msg = np.zeros(msg_len + MSKX_CRC_WIDTH, dtype=np.uint8) for i in range(msg_len + MSKX_CRC_WIDTH): if i < 77: msg[i] = msg_bits[i] for i in range(msg_len): if msg[i] != 0: for j, d in enumerate(div): msg[i + j] = msg[i + j] ^ d return msg[msg_len:msg_len + MSKX_CRC_WIDTH] def mskx_check_crc(msg_bits: npt.NDArray[np.uint8]) -> bool: crc = mskx_crc(msg_bits, 83) return np.array_equal(crc, msg_bits[MSKX_LDPC_K - MSKX_CRC_WIDTH:MSKX_LDPC_K])
Предикат mskx_check_crc на вход принимает биты данных сообщения, вычисляет CRC-13 (mskx_crc), если биты вычисленной контрольной суммы совпадает с битами контрольной суммы пакета данных, тогда пакет считается корректным и может быть обработан дальше.
Вспомогательная функция извлечения контрольной суммы из пакета данных сообщения.
def mskx_extract_crc(msg: typing.ByteString) -> int: return ((msg[9] & 0x07) << 10) | (msg[10] << 2) | (msg[11] >> 6)
Монитор
Описанные выше функции объединены в один класс MSK144Monitor, задача которого предоставить интерфейс загрузки данных и инкапсулировать пайплайн и логику декодера.
class MSK144Monitor(AbstractMonitor): MAX_LDPC_ERRORS = 18 LDPC_ITERATIONS = 10 @staticmethod @cache @jit(nopython=True) def filter_response(n_fft: int, sample_rate: int) -> npt.NDArray[np.float64]: ... @staticmethod def fourier_bpf(signal: npt.NDArray[np.float64], n_fft: int, response: npt.NDArray[np.float64]) -> npt.NDArray[np.complex128]: ... @staticmethod @jit(nopython=True) def shift_freq(complex_signal: npt.NDArray[np.complex128], freq: float, sample_rate: int) -> npt.NDArray[np.complex128]: ... def __init__(self, sample_rate: int): self.sample_rate = sample_rate self.signal = np.zeros(0, dtype=np.float64) def decode_fame(self, frame: npt.NDArray[np.complex128], max_iterations: int) -> typing.Optional[typing.Tuple[DecodeStatus, typing.ByteString, float, int]]: ... def detect_signals(self, signal: npt.NDArray[np.complex128], signal_len: int, start: float, max_cand: int = 16, tolerance: int = 150) -> typing.Generator[LogItem, None, None]: ... def decode(self, tm_slot_start: float) -> typing.Generator[LogItem, None, None]: signal_len = min(len(self.signal), 30 * self.sample_rate) signal_part = self.signal[:signal_len] rms = np.sqrt(np.mean(signal_part ** 2)) if rms == 0 or np.isnan(rms): rms = 1 part_norm = signal_part / (rms / 2) n = int(np.log(signal_len) / np.log(2) + 1) nfft = int(min(2 ** n, 1024 ** 2)) filter_response = self.filter_response(nfft, self.sample_rate) part_filtered = self.fourier_bpf(part_norm, nfft, filter_response) yield from self.detect_signals(part_filtered, signal_len, tm_slot_start) def monitor_process(self, frame: npt.NDArray): wave = frame * 0.000390625 avg = np.mean(wave) wave -= avg self.signal = wave
MAX_LDPC_ERRORS определяет пороговое значение количества ошибок, свыше которого декодированный сигнал игнорируется.
LDPC_ITERATIONS задает максимальное количество итераций декодирования Belief Propagation алгоритма.
В __init__ задается частота семплирования сигнала sample_rate.
Метод monitor_process сохраняет дискретный сигнал для декодирования. Значение 0.000390625 используется для перехода от int16 к float64 значениям.
Генераторная функция decode осуществляет фильтрацию сигнала и передачу его в функцию-детектор. Генератор выдает данные, инкапсулированные в LogItem.
Полный пример декодера с использованием класса MSK144Monitor:
from scipy.io.wavfile import read from decode_msk144 import MSK144Monitor from message import message_decode def main(): sample_rate, data = read("examples/signal.wav") mon = MSK144Monitor(sample_rate=sample_rate) mon.monitor_process(data) for it in mon.decode(0): msg = message_decode(it.payload) msg_text = " ".join(it for it in msg if isinstance(it, str)) print( f"dB: {it.snr:.3f}\t" f"T: {it.dT:.3f}\t" f"DF: {it.dF:.3f}\t" f"Averaging pattern: {it.p_avg}\t" f"Freq: {it.freq}\t" f"Eye opening: {it.eye_open:.3f}\t" f"Bit errors: {it.bit_err}\t" f"CRC: {it.crc}\t" f"Message text: {msg_text}" ) if __name__ == '__main__': main()
Результат декодирования тестового сигнала:
dB: 2.000 T: 0.234 DF: -1.000 Averaging pattern: 1 Freq: 1499 Eye opening: 0.768 Bit errors: 1 CRC: 1509 Message text: CQ R9FEU LO87
Заключение
В продолжении цикла статей, посвященных протоколу MSK144, в этой статье были рассмотрены принципы демодуляции и декодирования сигналов с минимальным частотным сдвигом. Были рассмотрены механизмы цифровой обработки сигналов, фазовая и частотная коррекции.
В статью не вошли описание алгоритмов декодирования LDPC, кодирования сообщений в текст, формирование эталонного сигнала и прочее, так как они были описаны в предыдущих статьях, посвященных FT8 и кодированию MSK144.
Декодер MSK144 может представлять интерес с точки зрения цифровой обработки сигналов.
