
MSK144 — цифровой протокол, разработанный Джо Тейлором (K1JT) и его командой в 2016 году для проведения связей через метеорное рассеивание.
В предыдущей части были рассмотрены общие характеристики протокола, а также алгоритмы, формирующие сигнал. В этой части статьи рассматриваются механизмы детектирования и декодирования сигналов в протоколе MSK144.
Статья может быть интересна радиолюбителям и людям, интересующимся темой цифровой обработки сигналов.
Общее представление
Как было рассмотрено в предыдущей статье, формирование сигнала, отправляемого в радиоэфир, состоит из этапов упаковки бит данных, расчета контрольной суммы, вычислению LDPC-кодов, после чего, к данным добавляются маркеры (или якоря) синхронизации, состоящие из последовательности бит 01110010
, прежде чем всё это преобразовать в дискретный сигнал звуковой частоты.

Обратная задача, при приеме сигнала, заключается в поиске маркера синхронизации с последующей фазовой и частотной коррекцией сигнала, после чего, выполняя IQ преобразования, выделить аналоговое представление бит данных, которые далее проходят процесс декодирования, сверки контрольных сумм и так далее, пока на выходе не останутся только данные сообщения.

Анализ сигнала
Анализ сигнала состоит из нескольких этапов (рисунок 3):
фильтрация сигнала;
детекция сигнала;
декодирования фрейма.

Фильтрация сигнала
Перед тем, как начать анализировать сигнал, осуществляется фильтрация сигнала полосовым фильтром, задача которого исключить незадействованные области частот анализируемого сигнала и оставить только ту полосу, в которой присутствуют интересующие сигналы с данными.
В MSK144 применяется полосовой фильтр на основе преобразований Фурье. Отличительными особенностями фильтров такого типа является то, что фильтрация осуществляется в частотной области, что позволяет добиться высокой степени аттенюации, конфигурировать АЧХ фильтра с любой степенью наклона и задание множества полос частот; а также, помимо аттенюации, осуществлять усиление сигнала (эквализация).
На рисунке 4 показана схема фильтра.

Принцип работы фильтра заключается в том, что исходный сигнал подвергается быстрому преобразованию Фурье, результатом которого является представление сигнала в комплексной форме, в виде амплитуд и фаз в разрезе частот; после умножения амплитудных значений на требуемую характеристику, осуществляется обратное преобразование Фурье, позволяющее обратно получить аналоговый сигнал.
В Python реализовать фильтр на основе преобразования Фурье можно используя функционал библиотеки numpy.
import numpy as np
import numpy.typing as npt
def fourier_bpf(signal: npt.NDArray[np.float64], n_fft: int,
response: npt.NDArray[np.float64]) -> npt.NDArray[np.complex128]:
freq_d = np.fft.fft(signal, n=n_fft)
response_len = len(response)
freq_d[:response_len] *= response
freq_d[0] = 0.5 * freq_d[0]
freq_d[response_len: n_fft] = complex(0, 0)
time_d = np.fft.ifft(freq_d, n=n_fft)
return time_d
На вход функции fourier_bpf
подается исходный сигнал и АЧХ фильтра, задаваемая аргументом response
. Аргумент n_fft
определяет размер блока данных для преобразования Фурье; значение задается степенью числа 2.
Частоты сигналов MSK144 в основном располагаются в диапазоне от 1.0 КГц до 2.0 КГц, фильтр должен подавлять все остальные частоты, находящиеся за пределами диапазона (рисунок 6).
На рисунке 5 представлена АЧХ фильтра для фильтрации сигналов MSK144. Так как сигналы подвержены частотному смещению, полоса пропускания фильтра имеет несколько большую ширину захвата.

За формирование кривой АЧХ фильтра отвечает функция filter_response
.
MSK144_CENTER_FREQ = 1500
def filter_response(n_fft: int, sample_rate: int) -> npt.NDArray[np.float64]:
t = 1 / 2000
beta = 0.1
df = sample_rate / n_fft
nh = (n_fft // 2) + 1
response = np.full(nh, 1, dtype=np.float64)
for i in range(nh):
f = abs(df * i - MSK144_CENTER_FREQ)
if (1 + beta) / (2 * t) >= f > (1 - beta) / (2 * t):
response[i] = 0.5 * (1 + np.cos((np.pi * t / beta) * (f - (1 - beta) / (2 * t))))
elif f > (1 + beta) / (2 * t):
response[i] = 0
return response
Функция принимает на вход размер выборки преобразования Фурье n_fft
, и частоту дискретизации сигнала sample_rate
. Результатом является массив значений от 0 до 1.
Кривая АЧХ описывается уравнением:
Точки, соответствующие аттенюации в 3дБ (что соответствует 0.5 от исходного значения амплитуды) соответствуют частотам 0.5 КГц и 2.5 КГц соответственно. Гладкость характеристики в зонах аттенюации определяется функцией косинуса а ширина значением константы beta
, чем меньше значение, тем сильнее кривая будет стремиться к прямоугольной форме.
На рисунке 5 изображены спектры сигналов до и после применения фильтра с характеристикой filter_response
.

Недостатками фильтров такого типа являются требования в вычислительным ресурсам, а также эффект Гиббса — то есть появление нежелательных осцилляций на границах сигнала, что требует применения к сигналу сглаживающих оконных функций.
Частотное смещение
Сигналы протокола MSK144 формируются в слышимом человеческим ухом диапазоне частот, с тем расчетом, что их прием и передача в радиоэфире осуществляется с помощью радиолюбительского трансивера, рассчитанного на передачу голоса; так как опорные генераторы в любительских трансиверах имеют некоторую погрешность в точности частоты, то отправляемые и принимаемые сигналы имеют частотный дрейф, в связи с этим, принятый сигнал необходимо подстраивать под частоту, на которой будет происходить декодирование.
Смещение частоты сигнала реализуется функцией shift_freq
.
def shift_freq(complex_signal: npt.NDArray[np.complex128],
freq: float, sample_rate: int) -> npt.NDArray[np.complex128]:
phi = 2 * np.pi * freq / sample_rate
step = complex(np.cos(phi), np.sin(phi))
phasor = complex(1, 1)
signal = np.zeros(len(complex_signal), dtype=np.complex128)
for i, cs_val in enumerate(complex_signal):
phasor *= step
signal[i] = phasor * cs_val
return signal
Функция shift_freq
принимает на вход отфильтрованный преобразованиями Фурье сигнал в комплексном представлении, аргумент freq определяет на сколько герц необходимо сместить сигнал; sample_rate
— частота дискретизации сигнала.
Функция реализует фазовое вращение сигнала в каждой точке на угол phi
, соответствующему смещению на заданную частоту. Фазор реализует комплексную экспоненту.
В общем виде смещение может быть записано как: .
Детекция сигнала
Задача детекции сигнала заключается в коррекции фазовых и частотных ошибок сигнала и синхронизация по маркерам.
Функция детектора состоит из этапов:
поиск сигналов-кандидатов по амплитуде;
коррекция фазы и частотной ошибки;
синхронизация;
выделение фрейма.
Этап фазовой коррекции применяется для снижения межсимвольных интерференций сигнала, связанных с тем, что в большинстве трансиверов используется узкополосные фильтры, способные вносить фазовые изменения сигнала.
Маркер синхронизации
Для выделения маркера с помощью автокорреляционной функции, формируется эталонный, искомый сигнал.
MSK144_SYNC = [0, 1, 1, 1, 0, 0, 1, 0] # 0x72 in binary
SAMPLES_PER_WORD = 12
SAMPLES_PER_HALF_WORD = SAMPLES_PER_WORD // 2
WORD_SAMPLES = np.sin([i * np.pi / SAMPLES_PER_WORD for i in range(SAMPLES_PER_WORD)])
SYNC_WORDS = np.array([2 * b - 1 for b in MSK144_SYNC])
SYNC_I = np.array([
sample * SYNC_WORDS[j * 2 + 1]
for j in range(4)
for sample in WORD_SAMPLES
])
SYNC_Q = np.array([
pp * SYNC_WORDS[j * 2]
for j in range(4)
for pp in (WORD_SAMPLES[SAMPLES_PER_HALF_WORD:] if j == 0 else WORD_SAMPLES)
])
SYNC_WAVEFORM = np.array([complex(SYNC_I[i], SYNC_Q[i]) for i in range(42)])
SYNC_WAVEFORM_LEN = len(SYNC_WAVEFORM)
SYNC_WAVEFORM_CONJ = np.conj(SYNC_WAVEFORM)
В MSK144 для маркера синхронизации используется последовательность бит 01110010
(0x72
), задаваемая в MSK144_SYNC
.
WORD_SAMPLES
определяет дискретное представление участка сигнала для формирования символа MSK.
В SYNC_WORDS
формируется последовательность сдвигов частоты, в соответствии с последовательностью бит в MSK144_SYNC
.
Формирование сигнала с MSK манипуляцией осуществляется в комплексной форме, через IQ преобразование, четные биты записываются в SYNC_Q
, нечетные в SYNC_I
соответственно (рисунок 7).

Результирующий сигнал SYNC_WAVEFORM
записывается в комплексном виде, где синфазная составляющая часть сигнала записывается в вещественную часть значений, а квадратурная в мнимую.
SYNC_WAVEFORM_CONJ
хранит комплексно-сопряженное представление SYNC_WAVEFORM
, используемое в вычислении автокорреляции.
Поиск кандидатов
При поиске кандидатов происходит анализ спектра сигнала в полосе захвата, оценка силы сигнала, его соотношения сигнал/шум и определение погрешности частоты.
Детектор сигналов реализуется функцией detect_signals
, являющаяся методом класса MSK144Monitor
.
MSK144_NSPM = 864
MSK144_NPTS = 3 * MSK144_NSPM
MSK144_NFFT = 864
MSK144_FREQ_CENTER = 1500
MSK144_FREQ_HI = MSK144_FREQ_CENTER + 500
MSK144_FREQ_LO = MSK144_FREQ_CENTER - 500
SMOOTH_WINDOW_LEN = SAMPLES_PER_WORD
SMOOTH_WINDOW = (1 - np.cos([i * np.pi / SMOOTH_WINDOW_LEN for i in range(SMOOTH_WINDOW_LEN)])) / 2
def detect_signals(self, signal: npt.NDArray[np.complex128], signal_len: int, start: float,
max_cand: int = 16, tolerance: int = 150) -> typing.Generator[LogItem, None, None]:
...
Функция detect_signals
принимает на вход отфильтрованный сигнал signal
в комплексном виде, параметр signal_len
определяет общую длину исходного сигнала (так как после фильтра на основе преобразований Фурье результирующие массивы по длине отличаются от исходного сигнала, параметр передается отдельно).
Параметр start
определяет временную позицию в сигнале, относительно которой необходимо осуществлять анализ (например для ручного анализа участка сигнала).
Параметр tolerance
позволяет задать ширину полосы частот анализируемого сигнала.
dt = 1 / self.sample_rate
df = self.sample_rate / MSK144_NFFT
f_hi = MSK144_FREQ_HI * 2
f_lo = MSK144_FREQ_LO * 2
f_hi_id_top = int((f_hi + 2 * tolerance) / df)
f_hi_id_bot = int((f_hi - 2 * tolerance) / df)
f_lo_id_top = int((f_lo + 2 * tolerance) / df)
f_lo_id_bot = int((f_lo - 2 * tolerance) / df)
f_2Khz = int(f_lo / df)
f_4Khz = int(f_hi / df)
...
Переменные dt
и df
определяют минимальный временной шаг, относительно частоты дискретизации, и минимальный шаг изменения частоты в спектре после преобразования Фурье.
Значения f_hi
и f_lo
определяют минимальную и максимальную частоты MSK модуляции.
Группы переменных "f_hi_id_top
, f_hi_id_bot
" и "f_lo_id_top
, f_lo_id_bot
" определяют полосу частот, в рамках которых будет производиться анализ.
step_size = MSK144_NSPM // 4
steps_count = (signal_len - MSK144_NPTS) // step_size
det_amp = np.zeros(steps_count)
det_snr = np.zeros(steps_count)
det_f_err = np.full(steps_count, -999.99)
...
Блок массивов "det_amp
, det_snr
, det_f_err
" аккумулирует параметры амплитуд, SNR и отклонения частот на каждой итерации анализа участка сигнала.
steps_real = 0
for step in range(steps_count):
part_start = step_size * step
part_end = part_start + MSK144_NSPM
if part_end > signal_len:
break
part = signal[part_start: part_end]
part = part ** 2
part[:SMOOTH_WINDOW_LEN] *= SMOOTH_WINDOW
part[MSK144_NSPM - SMOOTH_WINDOW_LEN:MSK144_NSPM] *= SMOOTH_WINDOW[::-1]
spec = np.fft.fft(part, MSK144_NFFT)
amps = np.abs(spec) ** 2
...
В цикле до steps_count
из общего сигнала выделяется участок part размером MSK144_NSPM
, который далее возводится в степень 2 (part ** 2
), для выделения несущих тонов в спектре и повышения их амплитуд относительно шума (по этой причине ранее объявленные переменные со значениями частот умножались на 2).
Затем к границам сигнала применяется сглаживающая функция SMOOTH_WINDOW
, чья характеристика соответствует оконной функции Ханна, изображенная на рисунке 8.

Быстрое преобразование Фурье позволяет получить спектр (spec
) и АЧХ (amps
) анализируемого участка сигнала.
# Hi freq
peak_hi = f_hi_id_bot + np.argmax(amps[f_hi_id_bot:f_hi_id_top])
delta_hi = -((spec[peak_hi - 1] - spec[peak_hi + 1]) / (
2 * spec[peak_hi] - spec[peak_hi - 1] - spec[peak_hi + 1])).real
amp_hi = amps[peak_hi]
amp_avg_hi = (np.sum(amps[f_hi_id_bot:f_hi_id_top]) - amp_hi) / (f_hi_id_top - f_hi_id_bot)
snr_hi = amp_hi / (amp_avg_hi + 0.01)
...
На участке частот спектра в окрестностях частоты 4.0 КГц ищется максимальное значение peak_hi
. Значение delta_hi
определяет смещение частоты пика. Значение переменной amp_avg_hi
определяет среднее значение амплитуд шума для расчета значения SNR в переменной snr_hi
.
Аналогичным образом осуществляется поиск и оценка шумовой картины для частоты 2.0 КГц.
# Low freq
peak_lo = f_lo_id_bot + np.argmax(amps[f_lo_id_bot:f_lo_id_top])
delta_lo = -((spec[peak_lo - 1] - spec[peak_lo + 1]) / (
2 * spec[peak_lo] - spec[peak_lo - 1] - spec[peak_lo + 1])).real
amp_lo = amps[peak_lo]
amp_avg_lo = (np.sum(amps[f_lo_id_bot:f_lo_id_top]) - amp_lo) / (f_lo_id_top - f_lo_id_bot)
snr_lo = amp_lo / (amp_avg_lo + 0.01)
# Errors
f_err_hi = (peak_hi + delta_hi - f_4Khz) * df / 2
f_err_lo = (peak_lo + delta_lo - f_2Khz) * df / 2
f_err = f_err_hi if amp_hi >= amp_lo else f_err_lo
...
Значения переменных f_err_hi
и f_err_lo
определяют разность между искомой и найденной частотами.
Тройка массивов "det_amp
, det_snr
, det_f_err
" для каждого участка проанализированного сигнала содержит амплитуды сигналов, значение SNR и значение частотной погрешности.
Следующий этап заключается в отборе сигналов-кандидатов, на основе ранее результатов детектора частот.
indices = np.argsort(det_amp[:steps_real])
amp_median = det_amp[indices[steps_real // 4]]
if amp_median == 0:
amp_median = 1
det_amp[:steps_real] /= amp_median
...
Амплитудные значения во временных окнах нормализуются относительно медианного значения.
time_arr = np.zeros(max_cand, dtype=np.float64)
freq_arr = np.zeros(max_cand, dtype=np.float64)
snr_arr = np.zeros(max_cand, dtype=np.float64)
count_cand = 0
for ip in range(max_cand):
il = np.argmax(det_amp[:steps_real])
if det_amp[il] < 3.5:
break
if abs(det_f_err[il]) <= tolerance:
time_arr[count_cand] = ((il - 0) * step_size + MSK144_NSPM / 2) * dt
freq_arr[count_cand] = det_f_err[il]
snr_arr[count_cand] = 12 * np.log10(det_amp[il]) / 2 - 9
count_cand += 1
det_amp[il] = 0
...
В цикле из общего скопа задетектированных частот отбираются только те, которые не превышают заданную ошибку несущей частоты и имеют достаточное амплитудное значение, превышающее порог в 3.5.
Пороговое значение в 3.5 выбрано как минимально приемлемое значение для решения задач обработки акустических сигналов, так как меньшие значения считаются близкими к уровню шума и плохо пригодными для достоверного извлечения полезного сигнала.
if count_cand < 3:
for ip in range(max_cand - count_cand):
if ip >= max_cand - count_cand:
break
il = np.argmax(det_snr[:steps_real])
if det_snr[il] < 12.0:
break
if abs(det_f_err[il]) <= tolerance:
time_arr[count_cand] = ((il - 0) * step_size + MSK144_NSPM / 2) * dt
freq_arr[count_cand] = det_f_err[il]
snr_arr[count_cand] = 12 * np.log10(det_snr[il]) / 2 - 9
count_cand += 1
det_snr[il] = 0
...
Если отобранных кандидатов по критерию амплитуды сигнала нашлось менее 3, дополнительно осуществляется поиск кандидатов по SNR. Такой механизм позволяет обнаруживать слабые сигналы при тропосферном прохождении.
if count_cand > 0:
indices = np.argsort(time_arr[:count_cand])
hashes = set()
for cand in range(count_cand):
with suppress(NextCand):
cand_idx = indices[cand]
frame_mid = int(time_arr[cand_idx] * self.sample_rate)
if frame_mid < MSK144_NPTS / 2:
frame_mid = MSK144_NPTS // 2
if frame_mid > signal_len - MSK144_NPTS / 2:
frame_mid = signal_len - MSK144_NPTS // 2
t0 = time_arr[cand_idx] + dt * (start)
part = signal[frame_mid - MSK144_NPTS // 2: frame_mid + MSK144_NPTS // 2]
f_err = freq_arr[cand_idx]
snr = 2 * int(snr_arr[cand_idx] / 2)
snr = max(-4.0, min(24.0, snr))
part = self.shift_freq(part, -(MSK144_FREQ_CENTER + f_err), self.sample_rate)
...
Для каждого кандидата выбирается участок сигнала из найденных временных границ, производится коррекция ошибки несущей частоты функцией shift_freq
.
Синхронизация
Следующий этап — синхронизация с найденными сигналами.
sync1 = np.zeros(MSK144_NPTS, dtype=np.complex128)
sync2 = np.zeros(MSK144_NPTS, dtype=np.complex128)
for i in range(MSK144_NPTS - (56 * 6 + 42)):
sync1[i] = sum(part[i: i + SYNC_WAVEFORM_LEN] * SYNC_WAVEFORM_CONJ)
sync2[i] = sum(part[i + 56 * 6: i + 56 * 6 + SYNC_WAVEFORM_LEN] * SYNC_WAVEFORM_CONJ)
sync_corr = abs(sync1) * abs(sync2)
...
В sync1
и sync2
считаются значения АКФ сигнала и ранее рассмотренного маркера синхронизации SYNC_WAVEFORM_CONJ
.
Значение sync_corr
определяет общее значение АКФ для обоих маркеров синхронизации в сигнале.
peaks = []
for i in range(6):
peak = np.argmax(sync_corr)
sync_corr[max(0, peak - 7): min(MSK144_NPTS - 56 * 6 - 42, peak + 7)] = 0.0
peaks.append(peak)
...
Формируется список шести пиков АКФ маркеров синхронизации для последующей синхронизации.
for peak in peaks:
peak_corr = np.zeros(6, dtype=np.complex128)
for i in range(6):
j = peak + i
if peak + 11 + MSK144_NSPM < MSK144_NPTS:
s1 = slice(j + 6, j + 6 + MSK144_NSPM, 6)
s2 = slice(j, j + MSK144_NSPM, 6)
else:
s1 = slice(j + 6, MSK144_NPTS, 6)
s2 = slice(j, MSK144_NPTS - 6, 6)
peak_corr[i] = np.sum((part[s1] * np.conj(part[s2])) ** 2)
peak_corr_idx = np.argmax(np.abs(peak_corr))
if peak_corr_idx <= 2:
peak_corr_idx -= 1
if peak_corr_idx > 2:
peak_corr_idx -= 7
...
Для найденных шести кипов вычисляется корреляция для тонкой подстройки джиттера, чтобы попасть в центр глазковой диаграммы.
for id in range(3):
if id == 1:
sign = -1
elif id == 2:
sign = 1
else:
sign = 0
ic = peak + peak_corr_idx + sign
if ic < 0:
ic = ic + MSK144_NSPM
cca = np.sum(part[ic:ic + SYNC_WAVEFORM_LEN] * SYNC_WAVEFORM_CONJ)
if ic + 56 * 6 + 42 < MSK144_NPTS:
ccb = np.sum(part[ic + 56 * 6:ic + 56 * 6 + SYNC_WAVEFORM_LEN] * SYNC_WAVEFORM_CONJ)
cfac = ccb * np.conj(cca)
f_err_2 = np.atan2(cfac.imag, cfac.real) / (2 * np.pi * 56 * 6 * dt)
else:
ccb = np.sum(part[ic - 88 * 6:ic - 88 * 6 + SYNC_WAVEFORM_LEN] * SYNC_WAVEFORM_CONJ)
cfac = ccb * np.conj(cca)
f_err_2 = np.atan2(cfac.imag, cfac.real) / (2 * np.pi * 88 * 6 * dt)
freq_est = int(MSK144_FREQ_CENTER + f_err + f_err_2)
...
В freq_est
вычисляется итоговое значение ошибки отклонения несущей частоты.
Выделение фрейма
Финальный этап детекции — выделение фрейма с сигналом для передачи его в декодер.
for idf in range(5):
if idf == 0:
delta_f = 0
elif idf % 2 == 0:
delta_f = idf
else:
delta_f = -(idf + 1)
subpart = self.shift_freq(part, -(f_err_2 + delta_f), self.sample_rate)
subpart = np.roll(subpart, -(ic - MSK144_NSPM))
for avg_pattern in range(8):
if avg_pattern == 0:
frame = subpart[MSK144_NSPM: MSK144_NSPM + MSK144_NSPM]
elif avg_pattern == 1:
frame = subpart[MSK144_NSPM - 432: MSK144_NSPM - 432 + MSK144_NSPM]
frame = np.roll(frame, 432)
elif avg_pattern == 2:
frame = subpart[2 * MSK144_NSPM - 432: 2 * MSK144_NSPM - 432 + MSK144_NSPM]
frame = np.roll(frame, 432)
elif avg_pattern == 3:
frame = subpart[:MSK144_NSPM]
elif avg_pattern == 4:
frame = subpart[2 * MSK144_NSPM: 2 * MSK144_NSPM + MSK144_NSPM]
elif avg_pattern == 5:
frame = subpart[:MSK144_NSPM] + subpart[MSK144_NSPM:MSK144_NSPM + MSK144_NSPM]
elif avg_pattern == 6:
frame = subpart[MSK144_NSPM: MSK144_NSPM + MSK144_NSPM] + subpart[2 * MSK144_NSPM: 2 * MSK144_NSPM + MSK144_NSPM]
elif avg_pattern == 7:
frame = subpart[:MSK144_NSPM] + subpart[MSK144_NSPM:MSK144_NSPM + MSK144_NSPM] + subpart[2 * MSK144_NSPM: 2 * MSK144_NSPM + MSK144_NSPM]
...
Перебор значений delta_f
реализует частотный джиттеринг, используемый для тонкой коррекции частотной ошибки, которая могла быть недостаточно точно скомпенсирована на предыдущих этапах.
В subpart
содержится скорректированная по частоте и выровненная по времени часть анализируемого сигнала.
Значение avg_pattern
определяет схему выборки фрейма для декодирования.
Скорректированный по частоте и подогнанный под временные границы фрейм отправляется в функцию декодирования. Цикл джиттеринга и перебора схем прерывается в случае успешного декодирования фрейма и извлечения из него данных.
if x := self.decode_fame(frame, self.LDPC_ITERATIONS):
status, payload, eye_opening, bit_errors = x
if status.crc_extracted in hashes:
raise NextCand
hashes.add(status.crc_extracted)
df_hv = freq_est - MSK144_FREQ_CENTER
log_item = LogItem(
snr=snr,
dT=t0,
dF=df_hv,
p_avg=avg_pattern + 1,
freq=freq_est,
eye_open=eye_opening,
bit_err=bit_errors,
payload=payload,
crc=status.crc_extracted
)
yield log_item
raise NextCand
if len(hashes) >= 3:
break
Результат декодирования заносится в LogItem
, хранящий в себе информацию о принятом сигнале, его качестве, а также декодированное сообщение.
При декодировании трех и более сообщений в сигнале, цикл декодирования прерывается.
Описание датакласса LogItem
.
from dataclasses import dataclass
@dataclass
class LogItem:
snr: float
dT: float
dF: float
p_avg: int
freq: int
eye_open: float
bit_err: int
payload: typing.ByteString
crc: int
Описание класса-исключения для управления циклом обработки кандидатов.
class NextCand(Exception):
pass
Декодирование фрейма
Разлагая участки сигнала, в которых, предположительно, должны содержаться биты данных, на синфазную и квадратурную составляющие и сравнивая значения с эталонным сигналом, получая тем самым коэффициент правдоподобия.
Декодирование осуществляется функцией decode_fame
.
Функция принимает на вход данные декодируемого фрейма, извлеченные в детекторе. Аргумент max_iterations
определяет сколько раз декодер LDPC будет осуществлять попыток восстановить данные, прежде, чем они будут отбракованы.
def decode_fame(self, frame: npt.NDArray[np.complex128],
max_iterations: int) -> typing.Optional[typing.Tuple[DecodeStatus, typing.ByteString, float, int]]:
sync_corr_1 = sum(frame[:SYNC_WAVEFORM_LEN] * SYNC_WAVEFORM_CONJ)
sync_corr_2 = sum(frame[56 * 6: 56 * 6 + SYNC_WAVEFORM_LEN] * SYNC_WAVEFORM_CONJ)
sync_corr = sync_corr_1 + sync_corr_2
phase_0 = np.atan2(sync_corr.imag, sync_corr.real)
corr_factor = complex(np.cos(phase_0), np.sin(phase_0))
frame *= corr_factor.conjugate()
...
Для фазовой коррекции сигнала, происходит вычисление корреляции по маркерным сигналам (sync_corr
, sync_corr_1
, sync_corr_2
). Фазовая коррекция осуществляется умножением сигнала на комплексное сопряженное значение фазы corr_factor
.
soft_bits = np.zeros(MSK144_BITS_COUNT, dtype=np.float64)
hard_bits = np.zeros(MSK144_BITS_COUNT, dtype=np.int64)
for i in range(6):
soft_bits[0] += frame[i].imag * WORD_SAMPLES[i + 6]
soft_bits[0] += frame[i + (MSK144_NSPM - 5 - 1)].imag * WORD_SAMPLES[i]
for i in range(12):
soft_bits[1] += frame[i].real * WORD_SAMPLES[i]
for i in range(1, 72):
soft_sum = 0
for j in range(12):
soft_sum += frame[i * 12 - 6 + j].imag * WORD_SAMPLES[j]
soft_bits[2 * i] = soft_sum
soft_sum = 0
for j in range(12):
soft_sum += frame[i * 12 + j].real * WORD_SAMPLES[j]
soft_bits[2 * i + 1] = soft_sum
if soft_bits[2 * i] >= 0:
hard_bits[2 * i] = 1
if soft_bits[2 * i + 1] >= 0:
hard_bits[2 * i + 1] = 1
if soft_bits[0] >= 0:
hard_bits[0] = 1
if soft_bits[1] >= 0:
hard_bits[1] = 1
...
В soft_bits
аккумулируются коэффициенты правдоподобия бит данных для синфазной и квадратурной составляющих сигнала (real
— I, imag
— Q).
bad_sync_1 = 0
bad_sync_2 = 0
for i, sw in enumerate(SYNC_WORDS):
bad_sync_1 += (2 * hard_bits[i] - 1) * sw
bad_sync_2 += ((2 * hard_bits[i + 57 - 1] - 1) * sw)
bad_sync_1 = (MSK144_SYNC_LEN - bad_sync_1) // 2
bad_sync_2 = (MSK144_SYNC_LEN - bad_sync_2) // 2
bad_sync = bad_sync_1 + bad_sync_2
if bad_sync > 4:
return
...
Переменная hard_bits
используется для оценки точности синхронизации bad_sync
.
soft_bits_std = np.std(soft_bits)
soft_bits /= soft_bits_std
...
Коэффициенты правдоподобия бит нормализуются через стандартное отклонение.
sigma = 0.60
soft_bits_128 = np.concat([soft_bits[8: 9 + 47], soft_bits[64: 65 + 80 - 1]])
log128 = 2 * soft_bits_128 / sigma ** 2
...
В soft_bits_128
конкатенируются информационные биты, тем самым исключаются биты маркеров синхронизации.
Нормализованные логарифмические значения коэффициентов правдоподобия передаются в декодер LDPC. Декодирование LDPC алгоритмом Belief Propagation, возвращает последовательность двоичных бит.
ldpc_errors, plain128 = bp_decode(log128, max_iterations)
if ldpc_errors > self.MAX_LDPC_ERRORS:
return None
...
В plain128
содержатся декодированные методом Belief Propagation двоичные значения бит.
Значение ldpc_errors
определяет количество ошибок бит остались неразрешенными в процессе декодирования. Если значение превышает порог MAX_LDPC_ERRORS
, декодируемый фрейм исключается.
Финальный этап — проверка контрольных сумм и формирование отчета о качестве сигнала.
if not mskx_check_crc(plain128):
return None
payload = self.pack_bits(plain128, MSKX_LDPC_K)
crc_extracted = mskx_extract_crc(payload)
tones = list(msk144_encode(payload))
msg_bits = np.zeros(MSK144_BITS_COUNT)
for i in range(MSK144_BITS_COUNT - 1):
j = i + 1
if tones[i] == 0:
msg_bits[j] = msg_bits[i] if j % 2 == 1 else (msg_bits[i] + 1) % 2
else:
msg_bits[j] = (msg_bits[i] + 1) % 2 if j % 2 == 1 else msg_bits[i]
eye_top = 1
eye_bot = -1
for i in range(MSK144_BITS_COUNT):
if msg_bits[i] == 1:
eye_top = min(soft_bits[i], eye_top)
else:
eye_bot = max(soft_bits[i], eye_bot)
eye_opening = eye_top - eye_bot
bit_errors = np.count_nonzero(hard_bits != msg_bits)
return DecodeStatus(ldpc_errors, crc_extracted), payload, eye_opening, bit_errors
В payload содержатся данные сообщения в байтовом представлении.
crc_extracted
— контрольная сумма сообщения.
Формирование tones и msg_bits
используется для оценки BER (bit error rate).
На основе значений soft_bits
производится оценка сигнала по глазковой диаграмме (рисунки 9, 10).


Проверка CRC-13
При извлечении двоичных бит данных сообщения, осуществляется проверка корректности контрольной суммы принятого пакета.
MSKX_CRC_WIDTH = 13
MSKX_LDPC_K = 90
def mskx_crc(msg_bits: npt.NDArray[np.uint8], msg_len: int) -> npt.NDArray[np.uint8]:
div = [1, 1, 0, 1, 0, 1, 1, 1, 0, 1, 0, 1, 1, 1]
msg = np.zeros(msg_len + MSKX_CRC_WIDTH, dtype=np.uint8)
for i in range(msg_len + MSKX_CRC_WIDTH):
if i < 77:
msg[i] = msg_bits[i]
for i in range(msg_len):
if msg[i] != 0:
for j, d in enumerate(div):
msg[i + j] = msg[i + j] ^ d
return msg[msg_len:msg_len + MSKX_CRC_WIDTH]
def mskx_check_crc(msg_bits: npt.NDArray[np.uint8]) -> bool:
crc = mskx_crc(msg_bits, 83)
return np.array_equal(crc, msg_bits[MSKX_LDPC_K - MSKX_CRC_WIDTH:MSKX_LDPC_K])
Предикат mskx_check_crc
на вход принимает биты данных сообщения, вычисляет CRC-13 (mskx_crc
), если биты вычисленной контрольной суммы совпадает с битами контрольной суммы пакета данных, тогда пакет считается корректным и может быть обработан дальше.
Вспомогательная функция извлечения контрольной суммы из пакета данных сообщения.
def mskx_extract_crc(msg: typing.ByteString) -> int:
return ((msg[9] & 0x07) << 10) | (msg[10] << 2) | (msg[11] >> 6)
Монитор
Описанные выше функции объединены в один класс MSK144Monitor
, задача которого предоставить интерфейс загрузки данных и инкапсулировать пайплайн и логику декодера.
class MSK144Monitor(AbstractMonitor):
MAX_LDPC_ERRORS = 18
LDPC_ITERATIONS = 10
@staticmethod
@cache
@jit(nopython=True)
def filter_response(n_fft: int, sample_rate: int) -> npt.NDArray[np.float64]:
...
@staticmethod
def fourier_bpf(signal: npt.NDArray[np.float64], n_fft: int,
response: npt.NDArray[np.float64]) -> npt.NDArray[np.complex128]:
...
@staticmethod
@jit(nopython=True)
def shift_freq(complex_signal: npt.NDArray[np.complex128],
freq: float, sample_rate: int) -> npt.NDArray[np.complex128]:
...
def __init__(self, sample_rate: int):
self.sample_rate = sample_rate
self.signal = np.zeros(0, dtype=np.float64)
def decode_fame(self, frame: npt.NDArray[np.complex128],
max_iterations: int) -> typing.Optional[typing.Tuple[DecodeStatus, typing.ByteString, float, int]]:
...
def detect_signals(self, signal: npt.NDArray[np.complex128], signal_len: int, start: float,
max_cand: int = 16, tolerance: int = 150) -> typing.Generator[LogItem, None, None]:
...
def decode(self, tm_slot_start: float) -> typing.Generator[LogItem, None, None]:
signal_len = min(len(self.signal), 30 * self.sample_rate)
signal_part = self.signal[:signal_len]
rms = np.sqrt(np.mean(signal_part ** 2))
if rms == 0 or np.isnan(rms):
rms = 1
part_norm = signal_part / (rms / 2)
n = int(np.log(signal_len) / np.log(2) + 1)
nfft = int(min(2 ** n, 1024 ** 2))
filter_response = self.filter_response(nfft, self.sample_rate)
part_filtered = self.fourier_bpf(part_norm, nfft, filter_response)
yield from self.detect_signals(part_filtered, signal_len, tm_slot_start)
def monitor_process(self, frame: npt.NDArray):
wave = frame * 0.000390625
avg = np.mean(wave)
wave -= avg
self.signal = wave
MAX_LDPC_ERRORS
определяет пороговое значение количества ошибок, свыше которого декодированный сигнал игнорируется.
LDPC_ITERATIONS
задает максимальное количество итераций декодирования Belief Propagation алгоритма.
В __init__
задается частота семплирования сигнала sample_rate
.
Метод monitor_process сохраняет дискретный сигнал для декодирования. Значение 0.000390625
используется для перехода от int16
к float64
значениям.
Генераторная функция decode
осуществляет фильтрацию сигнала и передачу его в функцию-детектор. Генератор выдает данные, инкапсулированные в LogItem
.
Полный пример декодера с использованием класса MSK144Monitor
:
from scipy.io.wavfile import read
from decode_msk144 import MSK144Monitor
from message import message_decode
def main():
sample_rate, data = read("examples/signal.wav")
mon = MSK144Monitor(sample_rate=sample_rate)
mon.monitor_process(data)
for it in mon.decode(0):
msg = message_decode(it.payload)
msg_text = " ".join(it for it in msg if isinstance(it, str))
print(
f"dB: {it.snr:.3f}\t"
f"T: {it.dT:.3f}\t"
f"DF: {it.dF:.3f}\t"
f"Averaging pattern: {it.p_avg}\t"
f"Freq: {it.freq}\t"
f"Eye opening: {it.eye_open:.3f}\t"
f"Bit errors: {it.bit_err}\t"
f"CRC: {it.crc}\t"
f"Message text: {msg_text}"
)
if __name__ == '__main__':
main()
Результат декодирования тестового сигнала:
dB: 2.000 T: 0.234 DF: -1.000 Averaging pattern: 1 Freq: 1499 Eye opening: 0.768 Bit errors: 1 CRC: 1509 Message text: CQ R9FEU LO87
Заключение
В продолжении цикла статей, посвященных протоколу MSK144, в этой статье были рассмотрены принципы демодуляции и декодирования сигналов с минимальным частотным сдвигом. Были рассмотрены механизмы цифровой обработки сигналов, фазовая и частотная коррекции.
В статью не вошли описание алгоритмов декодирования LDPC, кодирования сообщений в текст, формирование эталонного сигнала и прочее, так как они были описаны в предыдущих статьях, посвященных FT8 и кодированию MSK144.
Декодер MSK144 может представлять интерес с точки зрения цифровой обработки сигналов.