Pull to refresh

Отработка периодических событий в QNX и RT Linux

Configuring LinuxC++*nixC
Sandbox

Для периодических событий очень важна задержка начала отработки события. Точнее максимальный джиттер. Когда джитер соизмерим с периодом возникновения события, система становится непригодной для отработки периодических событий.

Рассмотрим следующий пример. Допустим на шине PCI мы имеем многоканальную плату аналого-цифрового преобразования (АЦП). Плата конфигурируется таким образом, чтобы осуществлять преобразование всех каналов за определённый период времени и результаты преобразования складывать в заранее предоставленный буфер по технологии DMA (Direct Memory Access). Буфер разделён на 2 части. По окончании цикла преобразования плата генерирует прерывание, свидетельствующее о готовности первой части буфера и уходит на новый цикл преобразования используя вторую часть буфера для фиксации результатов.

Задача системы - отреагировать на прерывание, обработать полученные результаты находящиеся в первой части буфера и выдать управляющий сигнал, в зависимости от рассчитанных значений, до момента готовности второй части буфера. Если не успеть обработать первую часть до готовности второй, плата АЦП вновь переключится на первую часть буфера и начнёт переписывать результаты, которые мы ещё не успели обработать. Типовая схема double buffering.

Обработчик прерываний, по понятным причинам не осуществляет обработку данных. Он лишь отправляет сообщение пользовательскому потоку, который активируется и обрабатывает готовый фрейм. Расчёты связанные с обработкой данных сложны, и требуют процессорного времени, а обработчик прерываний обязан быть прост, чтобы на длительное время не блокировать систему.

Момент начала обработки данных в пользовательском потоке имеет джиттер, обусловленный задержкой переключения задач, а так же возможной генераций прерываний от других периферийных устройств и переключением на обработку других, более приоритетных потоков с последующим возвратом к текущему.

В системах псевдо-реального времени такая задержка вообще особо не регламентирована. К примеру в ядре ОС Linux имеются блокировки типа spin_lock_irqsave, отключающие обработку прерываний на данном ядре процессора, tasklet-ы, отработка которых может начаться в момент обработки данных фрейма и прочее. Длительность периода готовности в таких системах необходимо выбирать с большим запасом.

Теперь представим ситуацию, когда длительность периода начинает уменьшаться и становится соизмеримой с джиттером начала обработки. Помимо длительности самой обработки данных важен и момент начала обработки относительно прерывания готовности данных. В сумме эти величины должны быть меньше длительности периода. Если длительность обработки данных уменьшается, при уменьшении периода, поскольку сам фрейм становится меньшего размера, то джиттер начала обработки не зависит от размера данных.

Таким образом мы приходим к понятию минимальной длительности периода событий в системе, связанной с джиттером начала отработки. В операционных системах реального времени этот джиттер должен быть меньше чем в ОС общего назначения в виду детерминированной длительности системных процессов. Рассмотрим две ОС реального времени QNX и Linux с патчем реального времени PREEMPT-RT.

QNX

Как известно, лидером среди операционных систем реального времени является QNX. ОС предназначена преимущественно для встраиваемых систем и применяется в таких областях, где основным приоритетом является высокая надёжность.

Бесспорно ОС стабильна и эффективна. Основным её достоинством является применение микроядра. Системные службы вынесены из ядра в отдельные задачи и конкурирую за процессорные ресурсы вместе с пользовательскими процессами, подчиняясь общим правилам менеджера планирования потоков системы. Данный подход обеспечивает предсказуемость при отработке периодических событий. Будь то готовность данных, либо срабатывание таймера. Много хороших слов об этой ОС написано вот тут.

С QNX я познакомился недавно. Минусом на мой взгляд является очень слабая поддержка этой ОС производителями всевозможного периферийного оборудования. Мало компаний, предлагают драйверы для своих устройств под QNX, поскольку ОС крайне специализирована. Иногда это становится ключевым фактором. Приходится выбирать оборудование из узкого списка, либо самим заниматься портированием кода драйверов. В последнем случае производитель может снять себя ответственность за неработоспособность оборудования управляемого вашим драйвером. Также приходится самим следить за изменениями характеристик обновлённых устройств. Другим нюансом является то, что эта ОС не бесплатна.

RT Linux

RT Linux, это дистрибутив Linux, в составе которого имеется ядро с наложенным патчем реального времени PREEMPT-RT, и собранное с этой опцией. Патч можно скачать с официального ресурса вот тут для вашей версии ядра Linux. Про RT Linux имеется много информации в интернете. Основная идея заключается в переводе обработчиков прерываний в потоки ядра, которые подчиняются общим правилам планирования потоков системы, а так же переводом блокировок spin_lock_t в мьютексы с приоритизацией. Вот тут можно ознакомится с официальным обзором патча.

Сам имею многолетний опыт разработки под Linux как на пользовательском уровне, так и на уровне ядра. Linux изначально не проектировался для решения задач реального времени. Вышеупомянутый патч на мой взгляд это вынужденная мера. При одних и тех же аппаратных возможностях RT Linux будет проигрывать QNX по задержкам и джитеру. Но мне кажется эта мера может быть оправдана. Linux очень активно развивается. Многие современные IT технологии не мыслимы без ядра Linux. Проще при необходимости купить компьютер чуть по дороже.

Ядро RT Linux монолитное. Все основные службы, хоть и планируются вместе с пользовательскими потоками, но выполняются в контексте ядра. При возникновении ошибки страдает всё ядров целом, а не отдельный модуль, который можно перезапустить. К счастью процесс разработки кода ядра жестко модерируется сообществом. Все коммиты подвержены глубокому анализу и тестированию. На моём опыте работы с Linux проблемы почти всегда были именно с моим кодом.

В качестве минуса я бы выделил ещё одну особенность. Все проприетарные драйверы должны иметься у вас в виде исходных кодов. При внедрении патча драйверы должны быть пересобраны с новым ядром, поскольку бинарный код средств синхронизации и прочих модифицированных патчем объектов изменился. Не всегда производители предлагают исходный код, ограничиваясь списком собранных версий под конкретные платформы.

Тесты

Перейдём к тестированию операционных систем на отработку периодических событий. Тест будет проводится на i7-3770 CPU @ 3.40GH под управлением следующих ОС:

  • QNX Neutrino 6.5.0 SP1 32 bit

  • Ubuntu 18.04 LTS c ядром 5.4.3-rt1 64 bit

В качестве теста предлагается следующий код.

#include <stdbool.h>
#include <stdint.h>
#include <string.h>
#include <stdio.h>
#include <unistd.h>

#include <pthread.h>
#include <sched.h>
#include <sys/time.h>
#include <time.h>

#ifndef  __linux__
#include <sys/neutrino.h>
#else
#include <atomic>
#endif

#include <cassert>
#include <iostream>
#include <fstream>
#include <ios>

#include <stdlib.h>

#define SIZEOF_ARRAY(a) (sizeof(a) / sizeof((a)[0]))

#define RING_BUFF_SIZE 4096

class CRingBuff 
{

#ifdef  __linux__
    std::atomic<size_t> m_tail, m_head; // необходимо учесть модель памяти
#else
    size_t m_tail, m_head;
#endif

    uint8_t m_buff[RING_BUFF_SIZE];

    inline size_t Capacity() const { return SIZEOF_ARRAY(m_buff); } 
    inline size_t Have() const { return m_head >= m_tail ? m_head - m_tail : Capacity() + m_head - m_tail; } 
    inline size_t Left() const { return Capacity() - Have(); } 

public:

     CRingBuff(): m_tail(0), m_head(0) {}

     inline bool Empty() const { return m_head == m_tail; } 
     size_t putData( const uint8_t *data, size_t len );
     size_t getData( uint8_t *data, const size_t max_size );
};

size_t CRingBuff::putData( const uint8_t * data, size_t len )
{
    if (Left() <= len)          // <= защита от переполнения
        return 0;

    size_t capacity = Capacity();

    if (m_head + len > capacity)
    {
        size_t del = capacity - m_head;
        memcpy(m_buff + m_head, data, del);
        memcpy(m_buff, data + del, len - del);
    }
    else 
    {
        memcpy(m_buff + m_head, data, len);
    }

    m_head = (m_head + len) % capacity;

    return len;
}

size_t CRingBuff::getData( uint8_t * data, size_t max_size )
{
    if (Empty())
        return 0;

    size_t have = Have();
    if (have > max_size)
        have = max_size;

    size_t capacity = Capacity();

    if (m_tail + have > capacity)
    {
        size_t del = capacity - m_tail;
        memcpy(data, m_buff + m_tail, del);
        memcpy(data + del, m_buff, have - del);
    }
    else 
    {
        memcpy(data, m_buff + m_tail, have);
    }

    m_tail = (m_tail + have) % capacity;

    return have;
}

void dummy() {}
void do_something()             // -- работа на десятки микросекунд
{
    volatile int i;
    for (i = 0; i < 21000; i++) 
    {
        dummy();
    }
}

struct sData 
{
    uint32_t t1, t2;
    sData(): t1(0), t2(0) {}
};

CRingBuff RingBuff;

int iexit = 0;

const char sFileName[] = "elapsed.csv"; // имя файла для обсчёта в Excel

// функция второстепенного потока сохраняющая результаты измерения
void *flusher(void *arg)
{
    uint32_t step(0);

    while (!iexit)
    {
        if (!RingBuff.Empty()) 
        {
            uint8_t buff[RING_BUFF_SIZE];
            size_t len = RingBuff.getData(buff, sizeof(buff));

            if (!len) 
            {
                printf("ringbuff logic 1 error\n");
                exit(1);
            }

            if (len % sizeof(sData))
            {
                printf("ringbuff logic 2 error\n");
                exit(1);
            }

            size_t sz = len / (sizeof(sData));
            sData *ptr = reinterpret_cast < sData * >(buff);

            for (size_t i = 0; i < sz; i++) 
            {
                double t1 = ptr[i].t1 * 1e-7; // перевод из 100 нс интервалов в секунды
                double t2 = ptr[i].t2 * 1e-7;
	
                printf("%u). duration: %f elapsed: %f\n", step, t1, t2);

                std::ofstream myfile;
                myfile.open(sFileName, std::ios::out | std::ios::app);
                if (myfile.good()) 
                {
                    myfile << step << ';' << t1 << ';' << t2 << ";\n";
                    myfile.close();
                }

                step++;
            }
        }

        usleep(10 * 1000);      // выгребаем результаты каждые 10 мс
    }

    return NULL;
}


#define USECS_PER_SEC (1000 * 1000)

#ifdef  __linux__

inline uint64_t ClockCycles() // В QNX имеется такая функция, а для Linux собственная реализация.
{
    unsigned int low, high;
    asm volatile ("rdtsc\n":"=a" (low), "=d"(high));
    return ((uint64_t) high << 32) | low;
}
#endif

// Класс подсчёта времени в тактах частоты процессора 
class Cycles
{
    uint32_t m_CyclesPerUs;				// Количество тактов за 1 мкс.
    uint32_t m_CyclesPer100Ns;    // Количество тактов за 100 нс.

    CRingBuff m_Values;

 public:

    Cycles( uint32_t N ) : m_CyclesPerUs(1), m_CyclesPer100Ns(1)
    {
        if (N < 2) 
        {
            N = 2;
        }

        uint32_t dc(0);

        for (int i = 0; i < N; i++)  // Замеряем N раз 1 секунду
        {
            uint32_t c = (uint32_t)ClockCycles();
            usleep(USECS_PER_SEC);
            dc = (uint32_t)ClockCycles() - c;
            
            printf("%d). Cycles: %u\n", i, dc);

            dc /= USECS_PER_SEC;

            m_Values.putData(reinterpret_cast<const uint8_t*>(&dc), sizeof(dc));
        }

        for (int i = 0; i < N - 1; i++) // Проверяем результаты
        {
            uint32_t val;
            m_Values.getData(reinterpret_cast <uint8_t*>(&val), sizeof(val));
            if (val != dc) // Все результаты замеров секунды должны совпадать
            {
                printf("CyclesPerUs error %u %u\n", val, dc);
                exit(1);
            }
        }

        m_CyclesPerUs = (uint32_t)dc;
        m_CyclesPer100Ns = m_CyclesPerUs / 10;

        printf("Cycles_per_us:%u\nCycles_per_100ns:%u\n", m_CyclesPerUs, m_CyclesPer100Ns);
    }

    uint32_t getCycPerUs() const { return m_CyclesPerUs; } 
    uint32_t getCycPer100Ns() const { return m_CyclesPer100Ns; } 
   
    // 32 бита, т.о. считывание тактов атомарно для 32-х разрядных сиситем 
    static uint32_t getCycles() { return (uint32_t) ClockCycles(); }
  
    uint32_t calc100Ns( const uint32_t cycles ) const { return cycles / m_CyclesPer100Ns; } 
    uint32_t calcUs( const uint32_t cycles ) const { return cycles / m_CyclesPerUs; }
};

const unsigned long sleep_us = 500;

int elapsed( void )
{
    double clock_res;

    // Для QNX меняем квант системного таймера
#ifndef __linux__
    {
        const unsigned long system_resolution_ns = 10 * 1000;

        {
            struct _clockperiod nres;
            nres.fract = 0;
            nres.nsec = system_resolution_ns;
            if (ClockPeriod(CLOCK_REALTIME, &nres, NULL, 0) < 0) 
            {
                printf("ClockPeriod error\n");
                exit(1);
            }
        }

        // Проверяем установленный системный квант
        struct timespec res;
        if (clock_getres(CLOCK_REALTIME, &res) < 0) {
            printf(" get system resolution error\n");
            exit(1);
        }

        clock_res = res.tv_sec * 1e9;
        clock_res = clock_res + res.tv_nsec * 1e-9;

        printf("clock_getres: %f sec\n", clock_res);
    }
#endif

    // Порождаем второстепенный поток и устанавливаем максимальный приоритет для основного потока
    {
        // удаляем файл резульнатов измерений с прошлого запуска
        remove(sFileName);
        
pthread_t tid;
        pthread_create(&tid, NULL, flusher, NULL);

        struct sched_param sp;
#ifndef __linux__
        sp.sched_priority = 255;        // максимальный приоритет в QNX
#else
        sp.sched_priority = 99;         // максимальный приоритет в Linux
#endif
        int rt = sched_setscheduler(0, SCHED_FIFO, &sp);
        if (rt) {
            printf("set scheduler error\n");
            exit(1);
        }
    }

    const uint32_t N(5);
    Cycles cyc(N);

    // Сохранение условий эксперимента
    {
        std::ofstream myfile;
        myfile.open(sFileName, std::ios::out | std::ios::app);
        if (myfile.good()) 
        {
            myfile << "Resolution (sec)" << ';' << "Cycles per us" << ';' <<"Cycles per 100ns" << ";\n";
            myfile << clock_res << ';' << cyc.getCycPerUs() << ';' << cyc.getCycPer100Ns() << ";\n";
            myfile << "Step" << ';' << "job (sec)" << ';' << "Sleep (sec)" << ";\n";
            myfile.close();
        }
    }

    uint32_t start = cyc.getCycles();

    do 
    {
        do_something();         // задача длительностью десятки микросекунд

        sData data;

        uint32_t mid = cyc.getCycles() - start;

        data.t1 = cyc.calc100Ns(mid);
        
        usleep(sleep_us - cyc.calcUs(mid));

        uint32_t end = cyc.getCycles();
        data.t2 = cyc.calc100Ns(end - start);

        RingBuff.putData(reinterpret_cast < const uint8_t * >(&data), sizeof(data));

        start = end;

    } while (1);

    return 0;
}

int main(int argc, const char **argv)
{
    return elapsed();
}

Будем измерять на обеих ОС отработку временного интервала 500 мкс. Основной пользовательский поток переводится в режим планирования реального времени SCHED_FIFO. Потоку назначается максимально возможный для заданной ОС приоритет. Это обеспечивает выбор планировщиком именно этого потока при его готовности. Мешать работе потока теоретически может только обработка прерываний.

Временные интервалы замеряются циклами тактовой частоты процессора для большей точности. В QNX для этих целей имеется системная функция ClockCycles(). В Linux мы сами реализовали эту функцию на базе не малоизвестной инструкции rdtsc. Для замеров будем использовать только младшую часть 64 разрядного результата этой функции. Это позволит не заботиться об атомарности результата в 32-х разрядной системе. Поскольку тактовая частота процессора 3.4 ГГц, то переполнения 32-х разрядного значения за секунду быть не должно.

Основной поток выполняет некоторую работу в функции do_something(), длинной десятки микросекунд и уходит в состояние покоя в функции usleep() на время 500 мкс за вычетом времени выполнения работы.

Давайте разберёмся что же, такое usleep. По сути это системный вызов. Т.е. обращение к ядру ОС которое разворачивается в просьбу перевести поток в состояние покоя, завести таймер на 500 мкс и переключиться на выполнение других, менее приоритетных задач. Как только истекает запрошенное время, генерируется прерывание и система переводит поток в активное состояние. Поскольку наш поток имеет наивысший приоритет, то помешать ему начать очередной цикл работы может только обработка прерывания от другого периферийного либо системного устройства.

Т. о. наш тест очень похож на пример с платой АЦП. Обсчёт результатов аналого-цифрового преобразования эмулирует функция do_something(), а прерывание, свидетельствующее о готовности очередного фрейма заменяет прерывание таймера. Период повторения события у нас 500 мкс. Мы будем измерять джиттер начала отработки периодического события, а именно джиттер возобновления работы основного потока.

Рис. 1. (функциональная схема алгоритма измерения временного интервала)
Рис. 1. (функциональная схема алгоритма измерения временного интервала)

Следует отметить, что на длительность выполнения функции do_something(), так же влияет обработка прерываний, возникших во время её выполнения. Поэтому мы замеряем время фактического выполнения функции do_something() и вычитаем его из 500 мкс. Т.о. каждый цикл должен длиться одинаково.

В многопроцессорных системах прерывания могут обрабатываться на разных ядрах процессора. На наш джиттер влияют обработчики прерываний выполняемые на том же ядре, что и наш основной поток.

В QNX подсчёт времени отработки таймера ведётся на системных тиках. По умолчанию тик системного таймера равен 1 мс. Для точного подсчёта 500 мкс мы изменяем длительность системного тика до 10 мкс. Да это повышает нагрузку на систему, но вопросы производительности нас сейчас не очень интересуют. Задача у нас не ресурсоёмкая.

В Linux мы имеем системный тик также 1 мс (HZ=1000). Его нельзя изменить без пересборки ядра. К счастью для точного подсчёта времени в Linux используется отдельный аппаратный высокоточный таймер HPET (High Precision Timer Support) предоставленный платформой. Перепланировка активируется при срабатывании этого таймера.

Код POSIX совместим и поэтому работает как на QNX так и на Linux без особых изменений. Код написан на С++. Мы принципиально не используем менеджер памяти. Вместо него используется кольцевой буфер CRingBuff, который рассчитан на одного читателя и одного писателя и поэтому не требует средств синхронизации.

Для сборки под QNX применялась среда Momentics IDE 4.7 с компилятором gcc версии 4.4.2, который в полном объеме не поддерживает C++11. Поэтому для QNX не удалось учесть модель памяти в классе CRingBuff. Но мне повезло и после компиляции код работал правильно. Под Ubuntu 64 bit код собирался с ключом -m32.

Помимо основного потока порождается второстепенный поток flusher с обычной политикой планирования и базовым приоритетом. Его задача - сохранять в фоне результаты измерений в файл elapsed.csv не мешая самим измерениям. Далее этот файл будет обработан в GNU Octave, с целью поиска максимального и минимального отклонений от 500 мкс (min, max), среднего значения (avg), медианного (med) , СКО (std) и доверительного интервала err = 3*sigma . Также была рассчитана относительная погрешность (r_err), относительный размах (r_d), и максимальный относительный размах (r_max). Формулы расчёта представлены ниже в функции stat_summary().

function stat_summary(name, data)
    m = mean(data);
    s = std(data);
    mi = min(data);
    ma = max(data);
    
    printf("%40s [us]: min=%6.1f; max=%7.1f; avr=%6.1f; med=%6.1f; std=%7.3f; err=%6.3f; r_err=%6.2f %%; r_d=%6.2f %%; r_max=%6.2f %%\n", name, mi, ma, m, median(data), s, 3*s, 3 * s / m * 100.0, (ma - mi) / m * 100.0, (ma - m) / m * 100);
end

Для накопления статистики бралась выборка 106 отсчётов. Помимо расчётов погрешностей были построены гистограммы распределения измеренных временных интервалов, как в обычном, так и в логарифмическом масштабах. Гистограммы построены с разрешением 100 бинов и расстоянием между бинами зависящем от найденных максимального и минимального отклонений.

Далее представлены результаты анализа файлов elapsed.csv по каждой из ОС. Странные цифры в названиях экспериментов, это даты их проведения (2021-06-02 16:21).

210602 1621.qnx hw cycles.sleep [us]:

min= 492.2; max= 598.4; avr= 505.0; med= 506.9; std= 5.353; err=16.058;

r_err= 3.18 %; r_d= 21.03 %; r_max= 18.50 %

Рис. 2 (Гистограммы распределения временного интервала QNX)
Рис. 2 (Гистограммы распределения временного интервала QNX)

210604 1305.linux rt cycles.sleep [us]:

min= 492.9; max= 570.7; avr= 499.6; med= 499.5; std= 3.746; err=11.239;

r_err= 2.25 %; r_d= 15.57 %; r_max= 14.22 %

Рис. 3 (Гистограммы распределения временного интервала RT Linux)
Рис. 3 (Гистограммы распределения временного интервала RT Linux)

В момент проведения экспериментов мною осуществлялось активное копирование файлов по сети. Можно видеть, что дисковые операции не смогли существенно повлиять на джиттер.

Гребенчатый характер распределения в QNX c шагом примерно 10 мкс. обусловлен выставленным квантом системного таймера.

Как видно из результатов, Linux даже немного обыграл QNX. Скорее всего нам просто повезло, поскольку наблюдение необходимо вести на более длительном отрезке времени. Главное, что не на одной из ОС максимальное отклонение не становилось больше самого временного интервала. В этом, на мой взгляд и заключается ценность ОС реального времени.

Для сравнения рассмотрим выполнение нашего кода на той же версии ядра Linux, но без патча PREEMPT-RT. Объём выборки оставим прежним.

210606 0934.linux srt native.sleep [us]:

min= 502.4; max= 4524.4; avr= 510.8; med= 505.4; std=128.778; err=386. 334;

r_err= 75.63 %; r_d=787.35 %; r_max=785.70 %

Рис. 4 (Гистограммы распределения временного интервала Linux)
Рис. 4 (Гистограммы распределения временного интервала Linux)

Как сказал мой коллега "Наглядно!"

Хотя среднее значение временного интервала особо не изменилось (510.8 мкс), но вот максимальное отклонение достигает 4524.4 мкс. Даже единичные случаи такой ошибки могут привести к плачевным результатам скажем в медицине или в оборонной промышленности. Такая система для наших задач уже не пригодна.

Заключение

В качестве заключения хочу повторить мысль озвученную выше. QNX - это профессиональная ОС проверенная временем, но на сегодняшний день её популярность значительно ниже чем у Linux. Низкая популярность является причиной отсутствия драйверов под эту ОС на сайтах производителей. В новых проектах, требующих отклика реального времени я призываю обратить внимание на RT Linux. Возможно вам окажется проще слегка расширить аппаратные возможности вашей системы, чем связываться в необходимостью самостоятельной поддержки кода драйверов обновлённых производителем периферийных устройств.

Ещё хочу отметить тот факт, что сам патч, как и ядро Linux активно развивается. Помимо поддержки новых версий ядра проводятся улучшения в плане realtime. В прошлом, мне доводилось использовать патч для версии ядра 2.6.33.7 Думаю результаты были бы другими, нежели сегодня. Как что пришло время использовать RT Linux.

Tags:qnxpreempt_rtreal-timerealtimekernel
Hubs: Configuring Linux C++ *nix C
Total votes 5: ↑5 and ↓0+5
Views3K

Popular right now

Top of the last 24 hours