Q65 — цифровой протокол, разработанный Джо Тейлором (K1JT) и его командой в 2021 году для проведения минимальных связей в условиях сложных трасс прохождения радиосигнала.

В предыдущих частях цикла были рассмотрены структура протокола, алгоритмы формирования сигнала, механизмы компенсации эффекта Доплера, синхронизация и детектирование сигнала в условиях быстрых затуханий сигналов. В этой части статьи рассматривается механизм декодирования данных Q-ary Repeat Accumulation к��дов протокола Q65.

Статья может быть интересна радиолюбителям, людям, интересующимся темой цифровой обработки сигналов и кодами коррекции ошибок.

Общее представление

В предыдущей статье были рассмотрены процессы обработки входящего сигнала, синхронизации и применение модели быстрых затуханий, для подготовки сигнала к последующему декодированию Q-ary Repeat Accumulation (QRA) кодов коррекции ошибок и извлечения данных передаваемого сообщения.

Рисунок 1: Общая схема протокола (абстракция по уровням OSI условная).
Рисунок 1: Общая схема протокола (абстракция по уровням OSI условная).

После применения к спектрам символов модели быстрых затуханий, задача которой заключается в устранении эффекта размытия области частот символов, полученные спектры символов подвергаются процедуре извлечения и восстановления данных Q-ary Repeat Accumulation кодов коррекции ошибок. Полученные в результате декодирования данные подвергаются проверке целостности, и, после успешной проверки, преобразуются в 8-и битное представление, из которого восстанавливается текстовое представление сообщения.

Декодирование Q-ary Repeat Accumulation

Общий пайплайн процесса декодирования представлен на рисунке 2.

Рисунок 2: Схема декодирования Q65.
Рисунок 2: Схема декодирования Q65.

В протоколе Q65 используются алгоритмы коррекции ошибок из класса Q-ary Repeat Accumulation (QRA) кодов, отличающиеся от обычных LDPC кодов такими свойствами, как использование недвоичных алфавитов, коррекция ошибок над конечными полями \mathrm{GF}(q), что позволяет данному классу корректирующих кодов быть ближе к пределу Шеннона, по сравнению с LDPC, и, при этом, иметь достаточно низкую алгоритмическую сложность декодера.

Так как QRA являются подклассом LDPC-кодов, процесс декодирования аналогично основан на алгоритмах распространения доверия — belief propagation.

Параметры декодера Q65 описываются структурой QRACodeParams.

@dataclass(frozen=True)
class QRACodeParams:
    K: int
    N: int
    m: int
    M: int
    a: int
    NC: int
    V: int
    C: int
    N_MSG: int
    MAX_V_DEG: int
    MAX_C_DEG: int
    R: float
...

Общие параметры декодера: 

  • K — задает количество информационных бит;

  • N — количество символов в кодовом слове;

  • m — количество бит на символ;

  • M — размер алфавита (\mathrm{GF}(64));

  • a — фактор группировки;

  • NC — количество контрольных символов.

Параметры графа алгоритма распространения доверия: 

  • V — количество переменных узлов в графе Таннера;

  • C — количество проверочных узлов в графе Таннера;

  • N_MSG — количество сообщений в графе Таннера;

  • MAXVDEG — максимальное количество ребер от переменного узла;

  • MAXCDEG — максимальное количество ребер от проверочного узла;

  • R — кодовая скорость.

    msg_w: npt.NDArray[np.int64]
    v_deg: npt.NDArray[np.int64]
    c_deg: npt.NDArray[np.int64]
    v2cm_idx: npt.NDArray[np.int64]
    c2vm_idx: npt.NDArray[np.int64]
    gf_perm_mat: npt.NDArray[np.int64]

Таблицы, используемые при декодировании:

  • msg_w — таблица логарифмов весов сообщений в конечной группе;

  • v_deg — таблица степеней переменных узлов;

  • c_deg — таблица степеней проверочных узлов;

  • v2cm_idx — таблица индексов сообщений v -> c;

  • c2vm_idx— таблица индексов сообщений c -> v;

  • gf_perm_mat — матрица перестановок.

Значения параметров для протокола Q65:

qra_K = 15
qra_N = 65
qra_m = 6
qra_M = 64
qra_a = 1
qra_NC = qra_N - qra_K
qra_V = 65
qra_C = 116
qra_NMSG = 216
qra_MAXVDEG = 5
qra_MAXCDEG = 3  # maximum factor degree
qra_R = qra_K / qra_N

Значения таблиц для протокола Q65:

qra_msgw = np.array([
   0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
   0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
   0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
   0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
   0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
   0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
   0, 0, 0, 0, 0, 0, 14, 0, 0, 13,
   37, 0, 27, 56, 62, 29, 0, 52, 34, 62,
   4, 3, 22, 25, 0, 22, 0, 20, 10, 0,
   43, 53, 60, 0, 0, 0, 62, 0, 5, 0,
   61, 36, 31, 61, 59, 10, 0, 29, 39, 25,
   18, 0, 14, 11, 50, 17, 0, 0, 0, 0,
   0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
   0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
   0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
   0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
   0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
   0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
   0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
   0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
   0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
   0, 0, 0, 0, 0, 0
])

qra_vdeg = np.array([
   4, 4, 4, 4, 4, 4, 4, 5, 5, 5,
   5, 5, 5, 4, 4, 3, 3, 3, 3, 3,
   3, 3, 3, 3, 3, 3, 3, 3, 3, 3,
   3, 3, 3, 3, 3, 3, 3, 3, 3, 3,
   3, 3, 3, 3, 3, 3, 3, 3, 3, 3,
   3, 3, 3, 3, 3, 3, 3, 3, 3, 3,
   3, 3, 3, 3, 3
])

qra_cdeg = np.array([
   1, 1, 1, 1, 1, 1, 1, 1, 1, 1,
   1, 1, 1, 1, 1, 1, 1, 1, 1, 1,
   1, 1, 1, 1, 1, 1, 1, 1, 1, 1,
   1, 1, 1, 1, 1, 1, 1, 1, 1, 1,
   1, 1, 1, 1, 1, 1, 1, 1, 1, 1,
   1, 1, 1, 1, 1, 1, 1, 1, 1, 1,
   1, 1, 1, 1, 1, 2, 3, 3, 3, 3,
   3, 3, 3, 3, 3, 3, 3, 3, 3, 3,
   3, 3, 3, 3, 3, 3, 3, 3, 3, 3,
   3, 3, 3, 3, 3, 3, 3, 3, 3, 3,
   3, 3, 3, 3, 3, 3, 3, 3, 3, 3,
   3, 3, 3, 3, 3, 2
])

qra_v2cmidx = np.array([
   0, 75, 92, 106, -1,
   1, 66, 77, 93, -1,
   2, 87, 95, 104, -1,
   3, 67, 83, 113, -1,
   4, 68, 90, 108, -1,
   5, 74, 86, 98, -1,
   6, 82, 99, 114, -1,
   7, 76, 85, 101, 109,
   8, 69, 79, 91, 111,
   9, 71, 80, 96, 105,
   10, 73, 84, 107, 115,
   11, 78, 94, 103, 112,
   12, 70, 81, 89, 102,
   13, 65, 88, 100, -1,
   14, 72, 97, 110, -1,
   15, 116, 117, -1, -1,
...
   64, 214, 215, -1, -1
])

qra_c2vmidx = np.array([
   0, -1, -1, 1, -1, -1, 2, -1, -1, 3, -1, -1,
   4, -1, -1, 5, -1, -1, 6, -1, -1, 7, -1, -1,
   8, -1, -1, 9, -1, -1, 10, -1, -1, 11, -1, -1,
...
   64, -1, -1, 65, 116, -1, 66, 117, 118, 67, 119, 120,
   68, 121, 122, 69, 123, 124, 70, 125, 126, 71, 127, 128,
   72, 129, 130, 73, 131, 132, 74, 133, 134, 75, 135, 136,
...
   112, 209, 210, 113, 211, 212, 114, 213, 214, 115, 215, -1
])

qra_perm_mat = np.array([
   0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15,
   16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31,
...
   3, 1, 7, 5, 11, 9, 15, 13, 19, 17, 23, 21, 27, 25, 31, 29,
   35, 33, 39, 37, 43, 41, 47, 45, 51, 49, 55, 53, 59, 57, 63, 61
])

Полные таблицы можно посмотреть по этой ссылке.

Инициализация объекта параметров:

qra15_65_64_irr_e23 = QRACodeParams(
    qra_K,
    qra_N,
    qra_m,
    qra_M,
    qra_a,
    qra_NC,
    qra_V,
    qra_C,
    qra_NMSG,
    qra_MAXVDEG,
    qra_MAXCDEG,
    qra_R,
    qra_msgw,
    qra_vdeg,
    qra_cdeg,
    qra_v2cmidx,
    qra_c2vmidx,
    qra_perm_mat
)

Декодер Q65

Поведение декодера и его контекст определяются датаклассом Q65Codec:

@dataclass
class Q65Codec:
    qra_code: QRACodeParams
    decoderEsNoMetric: float
    x: npt.NDArray[np.uint8]
    y: npt.NDArray[np.uint8]
    qra_v2cmsg: npt.NDArray[np.float64]
    qra_c2vmsg: npt.NDArray[np.float64]
    ix: npt.NDArray[np.float64]
    ex: npt.NDArray[np.float64]
    BinsPerTone: int
    BinsPerSymbol: int
    NoiseVar: float
    EsNoMetric: float
    WeightsCount: int
    FastFadingWeights: npt.NDArray[np.float64]

Контекст кодека содержит поля:

  • qra_code — объект параметров QRA-декодера ;

  • decoderEsNoMetric — параметр оптимизации Es/No dB метрики;

  • x — данные на входе декодера;

  • y — данные на выходе декодера;

  • qra_v2cmsg — матрица состояния сообщений v->c;

  • qra_c2vmsg — матрица состояния сообщений c->v;

  • ix — внутреннее состояние декодера;

  • ex — внешнее состояние декодера;

Поля, используемые для вычисления внутренних параметров в условиях быстрых затуханий:

  • BinsPerTone — количество FFT-бинов на тон;

  • BinsPerSymbol — количество FFT-бинов на один символ;

  • NoiseVar — дисперсия шума;

  • EsNoMetric — отношение энергии символа к спектральной плотности мощности шума, вычисляемое на этапе анализа быстрых затуханий;

  • WeightsCount — количество весов модели быстрых затуханий;

  • FastFadingWeights — значения весов, вычисленных после применения модели быстрых затуханий к сигналу.

Затронутая в предыдущей статье функция q65_init представляет собой заполнение полей датаклассов с параметрами декодера:

def q65_init() -> Q65Codec:
    qra_code = qra15_65_64_irr_e23
    EbNodBMetric = 2.8
    EbNoMetric = 10 ** (EbNodBMetric / 10)

    nm = qra_code.bits_per_symbol
    R = qra_code.code_rate

    return Q65Codec(
        qra_code=qra_code,
        decoderEsNoMetric=1.0 * nm * R * EbNoMetric,
        x=np.zeros(qra_code.K, dtype=np.uint8),
        y=np.zeros(qra_code.N, dtype=np.uint8),
        qra_v2cmsg=np.zeros((qra_code.N_MSG, qra_code.M), dtype=np.float64),
        qra_c2vmsg=np.zeros((qra_code.N_MSG, qra_code.M), dtype=np.float64),
        ix=np.zeros((qra_code.N, qra_code.M), dtype=np.float64),
        ex=np.zeros((qra_code.N, qra_code.M), dtype=np.float64),

        BinsPerTone=0,
        BinsPerSymbol=0,
        NoiseVar=0,
        EsNoMetric=0,
        WeightsCount=0,
        FastFadingWeights=np.zeros(Q65_FASTFADING_MAXWEIGTHS, dtype=np.float64)
    )

Для матриц выделяется память, вычисляемые поля обнуляются.

Значение 2.8 переменной EbNodBMetric, задающей оптимальную метрику декодера, подобрано эмпирически.

Выражение decoderEsNoMetric=1.0*nm*R*EbNoMetric вычисляет соотношение канала с Гауссовским шумом к модели затуханий Рэлея (AWGN/Rayleigh Es/No).

Функция декодирования Q65 принимает на вход матрицу вероятностей информационных символов, сформированную на этапе применения модели быстрых затуханий.

def q65_decode(
       codec: Q65Codec,
       sym_prob: npt.NDArray[np.float64],
       max_iters: int
):
    qra_code = codec.qra_code
    ix = codec.ix
    ex = codec.ex

    K = qra_code.message_length
    N = qra_code.codeword_length
    M = qra_code.M
    bits = qra_code.m

    x = codec.x
    y = codec.y

    ix[:K, :M] = sym_prob[:K, :M]

    uniform = pd_uniform(bits)
    ix[K, :M] = uniform[:M]
    ix[K + 1, :M] = uniform[:M]
    ix[K + 2: K + 2 + N - K, :M] = sym_prob[K: K + N - K, :M]

    rc = qra_extrinsic(qra_code, ex, ix, max_iters, codec.qra_v2cmsg, codec.qra_c2vmsg)
    if rc < 0:
        raise DecodeFailed

    qra_map_decode(qra_code, x, ex, ix)

    crc = crc12(x[:K])
    if (crc & 0x3F) != x[K] or (crc >> 6) != x[K + 1]:
        raise CRCMismatch

    decoded_msg = x[:K]

    y[:] = qra_encode(qra_code, x, concat=True)

    decoded_codeword = np.concat([y[:K], y[K + 2:K + (N - K) + 2]])

    return decoded_codeword, decoded_msg

Параметр max_iters определяет максимальное количество итераций алгоритма декодирования QRA-кодов, после которых переданные данные будут признаны не декодируемым. sym_prob — матрица вероятностей символов. Параметр codec — датакласс с параметрами декодера.

Данные из матриц вероятностей символов копируются внутрь кодека (ix[:K, :M]); так как в Q65 из результирующей последовательности символов исключается значение CRC-12, в декодере на их место подставляются значения, соответствующие равномерному распределению (pd_uniform).

@cache
def pd_uniform(dim: int) -> npt.NDArray[np.float64]:
    return np.full(2 ** dim, 1 / 2 ** dim, dtype=np.float64)

pd_uniform возвращает массив из 2^{dim} элементов, со значениями, сумма которых равняется 1. Для dim=6 результатом является массив из 64 элементов со значениями 0.015625.

QRA belief propagation

Функция qra_extrinsic осуществляет декодирование методом belief propagation на основе матрицы вероятностей.

def qra_extrinsic(
        code: QRACodeParams,
        ex: npt.NDArray[np.float64],
        ix: npt.NDArray[np.float64],
        max_iter: int,
        qra_v2c_msg: npt.NDArray[np.float64],
        qra_c2v_msg: npt.NDArray[np.float64],
):
    M = code.M
    m = code.m
    V = code.V
    MAXVDEG = code.MAX_V_DEG
    vdeg = code.v_deg
    C = code.C
    MAXCDEG = code.MAX_C_DEG
    cdeg = code.c_deg
    v2cmidx = code.v2cm_idx
    c2vmidx = code.c2vm_idx
    pmat = code.gf_perm_mat.reshape((-1, M))
    msgw = code.msg_w

    msg_out = np.zeros(QRACODE_MAX_M, dtype=np.float64)

    rc = -1

    if M > QRACODE_MAX_M:
        raise MExceeded
...

В msg_out выделяется память под результат декодирования.

    qra_c2v_msg[:V, :M] = ix[:V, :M]

    for v in range(V):
        v_deg = vdeg[v]
        msg_base = v * MAXVDEG

        for k in range(1, v_deg):
            idx_msg = v2cmidx[msg_base + k]
            qra_v2c_msg[idx_msg, :M] = ix[v, :M]
...

Осуществляется формирование графа Таннера, заполняются узлы, матрицы qra_c2v_msg и qra_v2c_msg данными из матриц вероятностей.

    for iter in range(max_iter):
        for nc in range(V, C):
            deg = cdeg[nc]

            if deg == 1:
                return -2

            msg_base = nc * MAXCDEG
            for k in range(deg):
                idx_msg = c2vmidx[msg_base + k]
                fwht(m, qra_v2c_msg[idx_msg, :], qra_v2c_msg[idx_msg, :])
            for k in range(deg):
                msg_out[:M] = pd_uniform(m)[:M]
                for kk in range(deg):
                    if kk != k:
                        idx_msg = c2vmidx[msg_base + kk]
                        pd_imul(msg_out, qra_v2c_msg[idx_msg, :], m)
                msg_out[0] += 1E-7

                fwht(m, msg_out, msg_out)

                idx_msg = c2vmidx[msg_base + k]
                w_msg = msgw[idx_msg]

                if w_msg == 0:
                    qra_c2v_msg[idx_msg, :M] = msg_out[:M]
                else:
                   pd_backward_permutation(qra_c2v_msg[idx_msg, :], msg_out, pmat[w_msg, :], M)
        for v in range(V):
            deg = vdeg[v]
            msg_base = v * MAXVDEG

            for k in range(deg):
                msg_out[:M] = pd_uniform(m)[:M]
                for kk in range(deg):
                    if kk != k:
                        idx_msg = v2cmidx[msg_base + kk]
                        pd_imul(msg_out, qra_c2v_msg[idx_msg, :], m)

                pd_norm(msg_out, m)
                idx_msg = v2cmidx[msg_base + k]
                w_msg = msgw[idx_msg]

                if w_msg == 0:
                    qra_v2c_msg[idx_msg, :M] = msg_out[:M]
                else:
                   pd_forward_permutation(qra_v2c_msg[idx_msg, :M], msg_out, pmat[w_msg, :], M)
...

Основной цикл декодирования, осуществляется свертка вероятностей по алфавиту конечного поля \mathrm{GF}(M). Исходя из того, что проверка должна удовлетворять условию x1+x2+x3=0 (прим. x1, x2, x3 элементы поля \mathrm{GF}(2^{m})), при данных Prob(x2) и Prob(x3), получается, что Prob(x1=X1)=Prob((x2+x3)=X1)=\mathrm{sum}((Prob(x2=X2)*Prob(x3=(X1+X2))), следовательно такое выражение может быть записано как Prob(x1) = \mathrm{IWHT}(\mathrm{WHT}(Prob(x2))*\mathrm{WHT}(Prob(x3))), где \mathrm{WHT} и \mathrm{IWHT} — прямое и обратное преобразования Уолша-Адамара. Так как на этом шаге нет необходимости в нормализации выходных данных, выражение может быть упрощено до Prob(x1) = \mathrm{WH}(\mathrm{WH}(Prob(x2))*\mathrm{WH}(Prob(x3))).

        total_ex = np.max(qra_v2c_msg[:V, :M], axis=1).sum()

        if total_ex > (1.0 * (V) - 0.01):
            rc = iter
            break

    ex[:V, :M] = qra_v2c_msg[:V, :M]
    return rc

Осуществляется проверка вероятностей результирующих символов, определяется максимальное значение вероятности символа; в случае, если сумма вероятностей символов близка к длине кодового слова, цикл декодирования прерывается, следовательно, данные были успешно декодированы. В ex содержатся данные декодированных символов; rc определяет количество итераций, за которое было осуществлено QRA-декодирование.

Функции pd_backward_permutation и pd_forward_permutation осуществляют применение матрицы перестановок к декодируемым символам в msg_out.

def pd_backward_permutation(
        dst: npt.NDArray[np.float64], src: npt.NDArray[np.float64],
        perm: npt.NDArray[np.int64], dim: int):
    dst[perm[:dim]] = src[:dim]


def pd_forward_permutation(
        dst: npt.NDArray[np.float64], src: npt.NDArray[np.float64],
        perm: npt.NDArray[np.int64], dim: int):
    dst[:dim] = src[perm[:dim]]

Аргументы dst, src — приемник и источник; perm — матрица перестановок; dim — длина вектора.

Функция pd_imul осуществляет операцию умножения вероятностных распределений над алфавитом \mathrm{GF}(M):

def pd_imul(dst: npt.NDArray[np.float64], src: npt.NDArray[np.float64], dim: int):
    idx = int(2 ** dim)
    dst[:idx] *= src[:idx]

Аргументы dst, src — приемник и источник; dim — размерность конечного поля.

Функции pd_norm и pd_norm_tab осуществляют нормализацию вероятностных распределений таким образом, чтобы достигалось условие, что суммарная вероятность символов была равна 1.

def pd_norm_tab(data: npt.NDArray[np.float64], dim: int) -> float:
    dim = min(dim, qra_m)

    if dim == 0:
        t = data[0]
        data[0] = 1.0
        return t

    c = 2 ** dim
    t = np.sum(data[:c])

    if t <= 0:
        data[:c] = pd_uniform(dim)[:c]
        return t
      
    data *= 1 / t
    return t


def pd_norm(data: npt.NDArray[np.float64], dim: int) -> float:
   	 return pd_norm_tab(data, dim)

Преобразование Уолша-Адамара

Преобразование Уолша-Адамара (WHT — Fast Walsh-Hadamard Transform) — ортогональное преобразование, осуществляющее разложение сигнала на базисные функции Уолша, являющимися кусочно-заданными функциями с нормированным интервалом определения [0..1) или [-0.5..05) и интервалом изменения аргумента, зависящим от порядка системы функций и равен \frac{1}{2^n}, где n \in \mathbb{N}

Преобразование Уолша-Адамара применяется для быстрого анализа спектров и аналогично преобразованию Фурье, с той разницей, что возвращает половину среднего числа переходов через ноль на единицу времени, вместо частот, в преобразовании Фурье.

Алгоритм БПУ осуществляет разделение входного вектора на пары элементов длины N=2^k, новые значения вычисляются как сумма и разность соседних элементов; графически, такой алгоритм образует диаграмму, похожую на бабочку (или на песочные часы), которая так и называется — "бабочка Адамара" (рисунок 3). Действия повторяются \log_2(N) раз, на каждом последующем этапе происходит удвоение шага. Таким образом, прямое преобразование Уолша-Адамара сводится к выполнению только операций сложения и вычитания.

Рисунок 3: Бабочка Адамара.
Рисунок 3: Бабочка Адамара.
import numpy as np
import numpy.typing as npt


def wh_bfy(dst: npt.NDArray[np.float64], src: npt.NDArray[np.float64], base: int, offs: int, dist: int):
    dst[base + offs] = src[base + offs] + src[base + offs + dist]
    dst[base + offs + dist] = src[base + offs] - src[base + offs + dist]

Функция wh_bfy реализует низкоуровневую операцию бабочки Адамара. Аргументы dst, src — приемник и источник данных; base — общее начало блока данных; offs — смещение на количество элементов в блоке данных, относительно base; dist — расстояние между парами элементов.

def fwht_1(dst: npt.NDArray[np.float64], src: npt.NDArray[np.float64]):
    dst[0] = src[0]


def fwht_2(dst: npt.NDArray[np.float64], src: npt.NDArray[np.float64]):
    t = np.zeros(2, dtype=np.float64)

    wh_bfy(t, src, 0, 0, 1)
    dst[0] = t[0]
    dst[1] = t[1]


def fwht_4(dst: npt.NDArray[np.float64], src: npt.NDArray[np.float64]):
    t = np.zeros(4, dtype=np.float64)

    # group 1
    for i in range(2):
        wh_bfy(t, src, 0, i, 2)
    # group 2
    for i in range(2):
        wh_bfy(dst, t, i * 2, 0, 1)


def fwht_8(dst: npt.NDArray[np.float64], src: npt.NDArray[np.float64]):
    t1 = np.zeros(8, dtype=np.float64)
    t2 = np.zeros(8, dtype=np.float64)

    # group 1
    for i in range(4):
        wh_bfy(t1, src, 0, i, 4)
    # group 2
    for i in range(2):
        for j in range(2):
            wh_bfy(t2, t1, i * 4, j, 2)
    # group 3
    for i in range(4):
        wh_bfy(dst, t2, i * 2, 0, 1)


def fwht_16(dst: npt.NDArray[np.float64], src: npt.NDArray[np.float64]):
    t1 = np.zeros(16, dtype=np.float64)
    t2 = np.zeros(16, dtype=np.float64)

    # group 1
    for i in range(8):
        wh_bfy(t1, src, 0, i, 8)
    # group 2
    for i in range(2):
        for j in range(4):
            wh_bfy(t2, t1, i * 8, j, 4)
    # group 3
    for i in range(4):
        for j in range(2):
            wh_bfy(t1, t2, i * 4, j, 2)
    # group 4
    for i in range(8):
        wh_bfy(dst, t1, i * 2, 0, 1)


def fwht_32(dst: npt.NDArray[np.float64], src: npt.NDArray[np.float64]):
    t1 = np.zeros(32, dtype=np.float64)
    t2 = np.zeros(32, dtype=np.float64)

    # group 1
    for i in range(16):
        wh_bfy(t1, src, 0, i, 16)
    # group 2
    for i in range(2):
        for j in range(8):
            wh_bfy(t2, t1, i * 16, j, 8)
    # group 3
    for i in range(4):
        for j in range(4):
            wh_bfy(t1, t2, i * 8, j, 4)
    # group 4
    for i in range(8):
        for j in range(2):
            wh_bfy(t2, t1, i * 4, j, 2)
    # group 5
    for i in range(16):
        wh_bfy(dst, t2, i * 2, 0, 1)


def fwht_64(dst: npt.NDArray[np.float64], src: npt.NDArray[np.float64]):
    t1 = np.zeros(64, dtype=np.float64)
    t2 = np.zeros(64, dtype=np.float64)

    # group 1
    for i in range(32):
        wh_bfy(t1, src, 0, i, 32)
    # group 2
    for i in range(2):
        for j in range(16):
            wh_bfy(t2, t1, i * 32, j, 16)
    # group 3
    for i in range(4):
        for j in range(8):
            wh_bfy(t1, t2, i * 16, j, 8)
    # group 4
    for i in range(8):
        for j in range(4):
            wh_bfy(t2, t1, i * 8, j, 4)
    # group 5
    for i in range(16):
        for j in range(2):
            wh_bfy(t1, t2, i * 4, j, 2)
    # group 6
    for i in range(32):
        wh_bfy(dst, t1, i * 2, 0, 1)

Функции fwht_1, fwht_2, fwht_4, fwht_8, fwht_16, fwht_32, fwht_64 реализуют преобразование Уолша-Адамара для N={1, 2, 4, 8, 16, 32, 64} соответственно (функция fwht_1 является тривиальным случаем, когда матрица Адамара вырождается в скаляр).

Рисунок 4: Бабочка Адамара для трех групп.
Рисунок 4: Бабочка Адамара для трех групп.
fwht_tab = [
   fwht_1,
   fwht_2,
   fwht_4,
   fwht_8,
   fwht_16,
   fwht_32,
   fwht_64
]


# Fast Walsh-Hadamard direct transform
def fwht(N: int, dst: npt.NDArray[np.float64], src: npt.NDArray[np.float64]):
   fwht_tab[N](dst, src)

Функция реализует общий интерфейс для доступа к преобразованиям Адамара N-го порядка (через список fwht_tab, где индексация соответствует порядку N).

При выполнении свертки вероятностных распределений проверочных узлов в конечном поле полным перебором, алгоритмическая сложность требует O(N^2) операций, в то время как быстрое преобразование Уолша-Адамара решает ту же задачу имея сложность O(N log N), таким образом, для \mathrm{GF}(64) преобразование Уолша-Адамара выполнится за 64*6=384 итерации, вместо 64^2=4096 итераций прямым перебором. 

Применимость преобразования Уолша-Адамара также доказывается тем, что любое конечное поле \mathrm{GF}(q) (q=p^n) может рассматриваться как векторное пространство n-ой размерности над своим простым подполем \mathbb{F}_p, и является абелевой группой, с покомпонентным сложением элементов; из чего следует, что аддитивная группа (\mathrm{GF}(q), +) изоморфна (\mathbb{F}_2)^q. Из чего следует, что быстрое преобразование Уолша-Адамара является оптимальным решением задачи аддитивной свертки над \mathrm{GF}(q) (q=p^n).

Maximum a posteriori probability декодер

Финальный этап декодирования заключается в получении информации из символов, на основе внутренней и внешней информации символов, полученных при QRA-декодировании. Значение каждого символа определяется как argmax от произведения внешних и внутренних вероятностных распределений в алфавите.

def qra_map_decode(code: QRACodeParams, x_dec: npt.NDArray[np.uint8],
                   ex: npt.NDArray[np.float64], ix: npt.NDArray[np.float64]):
    M = code.M
    m = code.m
    K = code.K

    for k in range(K):
        pd_imul(ex[k, :], ix[k, :], m)
        x_dec[k] = np.argmax(ex[k, :M])

Функция записывает результат в массив x_dec и не возвращает результат.

Проверка целост��ости пакета CRC-12

После декодирования и извлечения данных символов, осуществляется проверка целостности пакета алгоритмом CRC-12. Так как для экономии места при передаче из пакета данных исключался участок данных с CRC-12, алгоритм QRA позволяет восстановить недостающую информацию в пакете данных.

Участок из ранее рассмотренного кода:

...
crc = crc12(x[:K])
if (crc & 0x3F) != x[K] or (crc >> 6) != x[K + 1]:
    raise CRCMismatch
...

Алгоритм вычисления CRC-12 для данных в 6-и битном представлении:

CRC12_POLYNOMIAL = 0xF01


@njit
def crc_six_bit(x: npt.NDArray[np.uint8], polynomial: int) -> int:
    sr = 0
    for t in x:
        for _ in range(6):
            if (t ^ sr) & 0x01:
                sr = (sr >> 1) ^ polynomial
            else:
                sr >>= 1
            t >>= 1
    return sr


def crc12(x: npt.NDArray[np.uint8]) -> int:
    return crc_six_bit(x, CRC12_POLYNOMIAL)

Оценка сигнал/шум Es/No dB

Оценка качества сигнала в условиях быстрых затуханий осуществляется функцией q65_fastfading_EsNodB.

def q65_fastfading_EsNodB(
       codec: Q65Codec,
       y_dec: npt.NDArray[np.int64],
       input_energies: npt.NDArray[np.float64],
) -> float:
    N = codec.qra_code.codeword_length
    M = codec.qra_code.alphabet_size

    bins_per_tone = codec.BinsPerTone
    bins_per_symbol = codec.BinsPerSymbol
    weights = codec.WeightsCount
    noise_var = codec.NoiseVar
    EsNo_metric = codec.EsNoMetric
    tot_weights = 2 * weights - 1

    EsPlusWNo = 0.0
    for n in range(N):
        cur_tone_idx = M + y_dec[n] * bins_per_tone
        cur_bin_idx = cur_tone_idx - weights + 1

        EsPlusWNo += np.sum(input_energies[n, cur_bin_idx: cur_bin_idx + tot_weights])

    EsPlusWNo = EsPlusWNo / N
...

Вычисление энергии символов, с учетом шума, путем суммирования энергий, относящихся к декодированным символам в кодовом слове:

    u = EsPlusWNo / (noise_var * (1 + EsNo_metric / bins_per_symbol))
    u = max(u, tot_weights + 0.316)
    u = (u - tot_weights) / (1 - u / bins_per_symbol)
...

Вычисление дисперсии шума:

    EsNodB = dB(u)
    return EsNodB

Функция dB переводит значение в децибелы:

def dB(x: float) -> float:
    val = -99.0

    if x > 1.259e-10:
        x = max(x, 0.000001)
        val = 10.0 * np.log10(x)

    return val

Декодирование сообщения

Данные, возвращаемые QRA-декодером представлены в 6-и битной форме, функция q65_6bit_decode осуществляет преобразование в 8-и битное представление, в котором данные могут быть расшифрованы в текстовое сообщение.

def q65_dec(
        codec: Q65Codec,
        sym_spectra: npt.NDArray[np.float64],
        sym_prob: npt.NDArray[np.float64],
        max_iter: int,
) -> typing.Tuple[float, npt.NDArray[np.uint8]]:
    ydec, xdec = q65_decode(codec, sym_prob, max_iter)

    EsNodB = q65_fastfading_EsNodB(codec, ydec, sym_spectra)

    return EsNodB, xdec


def q65_6bit_decode(values: npt.NDArray[np.uint8]) -> npt.NDArray[np.uint8]:
    c77 = np.zeros(13 * 6 + 5, dtype=np.bool)

    bc = 0
    for i in range(13):
        bits = 6
        b = int(values[i])

        izz = bits - 1
        for j in range(bits):
            c77[bc] = (1 & (b >> -(j - izz)))
            bc += 1

    return np.packbits(c77[:bc - 1])
...
EsNo_dB, decoded = q65_dec(self.q65_codec, s3, s3prob, self.max_iters)

if np.sum(decoded) <= 0:
    return None

payload = q65_6bit_decode(decoded)

Переменная payload содержит байты принятого сообщения. Декодер сообщений идентичен декодеру в протоколе FT8, рассматривавшемся в соответствующей статье.

Функция-генератор класса Q65Monitor, включающая в себя весь ранее описанный процесс детекции сигнала и декодирования QRA:

@dataclass
class LogItem:
    snr: float
    dT: float
    dF: float
    payload: typing.ByteString
    crc: int

...
class Q65Monitor(AbstractMonitor):
...
    def decode(self, f0: int, **kwargs) -> typing.Generator[LogItem, None, None]:
        dT, ccf_freq, payload, snr = self.decode_0(f0=f0)

        yield LogItem(snr, dT, ccf_freq - f0, payload.tobytes(), 0)

Полный пример декодера с использованием класса Q65Monitor:

import time
from scipy.io.wavfile import read

from decoders import Q65Monitor
from msg.message import MsgServer


def main():
    sample_rate, data = read("examples/signal.wav")

    msg_svr = MsgServer()

    mon = Q65Monitor(q65_type=2, period=15, eme_delay=False)
    mon.monitor_process(data)

    for it in mon.decode(f0=1000, eme_delay=False):
        msg = msg_svr.decode(it.payload)

        print(
            f"dB: {it.snr:.3f}\t"
            f"T: {it.dT:.3f}\t"
            f"DF: {it.dF:.3f}\t"
            f"CRC: {it.crc}\t"
            f"Message text: {msg}"
        )


if __name__ == '__main__':
    main()

Пример декодированного сообщения Q65:

dB: 6.724 T: -0.487 DF: 0.000 CRC: 0 Message text: CQ R9FEU LO87

Заключение

В заключении цикла статей, посвященных протоколу Q65, в этой статье был рассмотрен алгоритм декодирования Q-ary Repeat Accumulation кодов коррекции ошибок, а также рассмотрен алгоритм определения качества сигнала соотношения Es/No dB. В цикл статей не вошли алгоритмы декодирования сообщений по априорным данным (декодирование сообщений по частично принятому сигналу, опираясь на данные, полученные в процессе радиообмена, такие как радиолюбительский позывной), а также не рассмотрены математические доказательства работы QRA-декодера.

Протокол Q65 может представлять интерес с точки зрения применения модуляций и способов коррекции ошибок в каналах связи с большими потерями, подверженных влиянию эффекта Доплера и быстрым затуханиям. Также представляет большой интерес класс кодов коррекции ошибок QRA, обеспечивающих передачу данных с SNR ниже -20дБ, при этом обладающих меньшей вычислительной сложностью и находящихся ближе, по сравнению с LDPC, к пределу Шеннона.

От автора статьи:

QRA-декодер оказался достаточно простым для переноса на Python и numpy (и был завершен раньше детектора), но при этом достаточно сложным, с точки зрения описания, и потребовал значительное количество усилий для понимания принципов QRA; тем не менее, процесс изучения протокола показал еще больше новых направлений для изучения темы коррекции ошибок и передачи радиосигналов.

В заключении, автор выражает благодарность всем читателям данного цикла статей и лицам, заинтересованных в устройстве протокола Q65.

На обложке: группа людей в лабораторных халатах, задумчиво смотрящих на луну как на инструмент радиосвязи.

Ссылки

  1. Первая часть о протоколе Q65

  2. Вторая часть о протоколе Q65

  3. Общий репозиторий с реализацией FTX, MSKX, Q65 Python

  4. Исходный код демодулятора Q65

  5. Статья из журнала QEX с описание протокола (перевод 1, перевод 2)

  6. Презентация IV3NWV о QRA

  7. Программа WSJT, реализующая протокол Q65

  8. Программа MSHV, реализующая протокол Q65