Нейрохирургическая операционная. Система предыдущего поколения в левой трети снимка.
Нейрохирургическая операционная. Система предыдущего поколения в левой трети снимка.

Преамбула

Фирма, в которой я работаю, производит аппаратуру для нейрохирургов и нейрофизиологов, в основе которой лежит технология Deep Brain Stimulation. Если коротко, в живой мозг втыкается электрод, и нейрохирург может считывать из мозга сигнал или стимулировать клетки мозга разрядом тока. У технологии огромное будущее в деле лечения болезней (например, болезни Паркинсона, дистонии, эпилепсии) и в создании нейроинтерфейсов: нейропротезирование, в частности, восстановление зрения; аугментация мозга дополнительными устройствами, расширение возможностей мозга. Скажем, спецслужбы очень интересуются способом считывать и записывать информацию из зрительных и слуховых нервов, а также управлять движением животных, что позволит создать новый класс шпионов.

Для клинического применения в целях лечения тремора при болезни Паркинсона достаточно лишь нескольких внедряемых контактов (а некоторые нейрохирурги обходятся вообще одним). Но для исследователей, изучающих работу мозга, количество контактов имеет значение, и им нужно получать данные от как можно большего числа контактов одновременно и синхронно. Скажем, тысяча, или две тысячи внедрённых в мозг контактных площадок. Понятно, хотелось бы, чтобы и скорость была пристойной, — скажем, сорок тысяч замеров с каждого контакта в секунду. И чтоб разрешение было повыше, чтоб каждый замер был хотя бы в 32 бита, в формате float или Q. Получаем, что система производит порядка 320 мегабайт данных в секунду, так что всё это придётся обрабатывать.

Кроме считываемых непосредственно из мозга «чистых» данных, есть ещё данные отфильтрованные: результат применённых к замерам фильтров верхних и нижних частот. Используются фильтры высоких порядков, хотя бы четвёртого, реализованные в виде полиномов. Они применяются к каждому входящему замеру, увеличивая количество данных, о которых система должна заботиться, вчетверо, и поднимая количество генерируемых данных до 1,3 гигабайта в секунду. Но это уже не моя забота, потому что они генерируются из переданных мной данных после того, как я свою часть работы выполнил.

Результат всего этого счастья нужен в режиме реального времени и крайне важен. Пропускать нельзя ни одного замера, потому что основную работу по анализу данных исследователи выполняют после окончания эксперимента. Поэтому всё это богатст��о данных, помимо показа на экране, придётся записывать на жёсткий диск. Все 1,3 гигабайта данных в секунду. И потом читать в Matlab`е, NeuroExplorer`е или другой программе. Система, которая сохраняла 99,99999% данных, не прошла контроль качества и была забракована, потому что теряла до 13 тысяч замеров каждую секунду.

А вот теперь мы со всем этим попробуем взлететь.

Формулировка задачи

Имеется разработанная фирмой плата с контроллером FPGA, с одной стороны в которую воткнуты провода, идущие от мозга, (ну, на самом деле, от конвертеров, типа такого), а с другой есть выход PCIe. Этим выходом плата будет воткнута в порт PCIe на самом обычном, просто очень мощном, компьютере.

Мне предстояло создать драйвер, который получает данные потенциалов от этой нашей кастомной платы, обрабатывает на подключённой к тому же PCIe видеокарте (результат расчёта фильтров одного контакта не зависит от результатов другого; глупо не использовать для независимых расчётов процессор, который специально заточен на выполнение большого числа однотипных параллельных расчётов одновременно) и передаёт дальше в пользовательский интерфейс. И делать это надо очень-очень быстро, потому что новые пакеты с восемью замерами данных от каждого контакта приходят каждые 200 микросекунд. И, самое главное, сделать это надо под десятой Виндой, потому что нейрохирурги не знают и знать не хотят ничего, кроме Винды и Мака. Причём, судя по внешнему виду некоторых клиентов и адекватности высказываемых ими требований к программе, последнее слово предыдущего предложения можно писать с маленькой буквы.

Люди в теме уже сообразили, что речь идёт о hard realtime: гарантированный ответ на полученные данные в течение фиксированного времени, и неважно, что за дичь творится вокруг, без возможности задержаться или пропустить хотя бы один пакет. Эти же люди в теме уже покачали головой: творящаяся вокруг дичь — это Windows, hard realtime под Windows невозможен, Windows не операционная система реального времени. Более того, Windows не заточен под работу с квантами времени меньше миллисекунды, поэтому работа со скоростью «полный цикл обработки данных за 200 микросекунд» под Windows невозможна вдвойне.

Soft realtime отличается от hard realtime тем, что в soft иногда небольшие задержки всё-таки разрешены, при условии, что система от задержки очухается и успеет наверстать упущенное и разгрести данные, накопившиеся за время задержки, без потери производительности.

Существуют всякие расширения под Windows, которые позволяют частично имплементировать realtime. Например, операционные системы On Time, RTX64 от IntervalZero и прочие. Они сводятся к одной и той же идее: мы отбираем у Винды одно или несколько ядер и кусок памяти, делаем вид, что их в компьютере больше нет, и запускаем на них свою собственную операционку. После того, как этот монстр Франкенштейна раскочегарится и выйдет на рабочий режим, на компьютере будут работать одновременно две операционных системы: realtime и Windows. Между ними можно настроить общение. Это решение будет работать с двумя оговорками: во-первых, из-под Windows практически нет возможностей повлиять на то, что происходит внутри запущенной параллельно realtime ОС, (например, программы для неё надо компилировать при помощи проприетарной SDK; нельзя во время работы передать в неё свою собственную программу для обработки получаемых данных и запустить её), а во-вторых, стоимость этого решения, мягко говоря, неадекватна. Лицензия разработчика RTX64 стоит порядка 10 тысяч долларов, а за каждый экземпляр готового продукта, ушедшего клиенту (тому самому нейрохирургу), придётся заплатить ещё 500 долларов. Вдобавок к 600-долларовой лицензии на Винду, которую клиент тоже получит. Это выводит общую стоимость продукта из зоны конкурентоспособности и делает его финансово непривлекательным для потенциальных покупателей.

За десять тысяч долларов плюс неустановленное количество 500-долларовых лицензионных платежей я сам себе RTOS под Windows напишу, подумал я. И написал.

Применённые технические хитрости

  1. Во-первых, нам нужно, чтобы как можно больше работы выполняла наша плата с FPGA на борту. Скажем, передачу данных лучше перевесить на неё: у неё DMA-контроллер точно ничем не будет занят, нет шансов, что, когда нам потребуется DMA-канал, Винда скажет нам в ответ «в очередь, линуксьи дети, в очередь!»

    Как подключать FPGA к PCIe так, чтобы DMA писал данные, куда надо, это совсем-совсем отдельная тема, которая выходит за рамки данной статьи. Скажу только, что FPGA должен быть сконфигурирован как PCIe Endpoint, потому что компьютер остаётся Root Complex, — ему ведь ещё видеоадаптером управлять. При этом, раз DMA запускается платой, то и трансляция адресов должна выполняться на плате. И тут возникает вопрос: а куда плата будет писать? Изнутри Windows я могу работать только с виртуальными адресами. Даже если я выделю реальную память при помощи MmAllocateContiguousMemory, я получу только виртуальный адрес, достучаться до которого плата не сможет.

    Так что совсем без решений Франкенштейна обойтись не удалось. Я резервирую кусок физической памяти на компьютере для использования только нашим устройством, выполнив в командной строке от Администратора команду следующего вида:

    bcdedit /set removememory Х(Х — сколько мегабайт зарезервировать)

    Таким образом, последние мегабайты скрыты от Windows, и обращаться к ним Винда не может. Помимо гарантии отсутствия столкновений на memory bus, таким образом решается ещё несколько проблем, в частности, отсутствует нужда в синхронизации доступа, что лишает меня необходимости использовать долгие и медленные семафоры и мьютексы. (Синхронизацию между записью данных в память и чтением можно осуществлять по времени: пусть плата пишет в пять буферов с разницей в 200 микросекунд; зная, что в нулевой буфер она писала в целое число миллисекунд, я буду читать буферы с отставанием на один: в целую миллисекунду — четвёртый, в миллисекунду и 200 микросекунд — нулевой, в миллисекунду и четыреста микросекунд — первый, и так далее. Как синхронизировать время на уровне микросекунд между двумя устройствами — задача, при наличии канала связи между ними, решаемая).

  2. Драйвер, который будет читать данные из зарезервированной памяти, бежит строго на одном ядре. Для этого я меняю его привязку к процессору:

    /*
     * The ID of the PCI driver CPU core. Starting from 0.
     */
    static constexpr USHORT DRIVER_CPU_ID = 3;
    . . . .
    // Set the thread to run on specific processor
    KAFFINITY affinity = 1ULL << (DRIVER_CPU_ID);
    KeSetSystemAffinityThread(affinity);

    — и поднимаю его приоритет, но не до высшего, а до того, который чуть ниже. На самом высшем приоритете некоторые системные функции не работают, и критические системные задачи, которые бегут на таком же приоритете, не будут выполняться:

    // Set the thread priority to the highest available -1
    // Тhe "-1" is because running for a long time in HIGH_PRIORITY
    // "starves" important system tasks which run in HIGH_PRIORTY
    KeSetPriorityThread(PsGetCurrentThread(), HIGH_PRIORITY - 1);
  3. Но этого недостаточно. Нужно не только чтобы этот процесс бежал на одном ядре, но и чтобы никакой другой процесс на этом ядре не бежал. Для этого я поднимаю приоритет прерываний, которые могут прервать выполнение моего процесса (KIRQL), до максимального (DISPATCH_LEVEL):

    KIRQL oldIrql;
    KeRaiseIrql(DISPATCH_LEVEL, &oldIrql);

    Однако процесс не может всё время бежать с запретом на любые прерывания, Винда за этим строго следит и может наглеца прибить. Поэтому периодически я понижаю приоритет прерываний, которым разрешаю свой процесс, гхм, прерывать. Чисто формально, но всё-таки:

    // It's important that we don't stay at DISPATCH_LEVEL for too long
    // so we record the last tick we were at passive, and every once in
    // a while lower the KIRQL
    static constexpr ULONG64 MS_ALLOWED = 50;
    LARGE_INTEGER freq{};
    LONGLONG lastPassiveTick = 0;
    . . . . . .
    KeQueryPerformanceCounter(&freq);
    timePassed = ((KeQueryPerformanceCounter(nullptr).QuadPart - 
                                lastPassiveTick) * 1000ULL) / freq.QuadPart;
    if (timePassed >= MS_ALLOWED) {
        yieldProcessor();
        lastTickAtPassive = KeQueryPerformanceCounter(nullptr).QuadPart;
    }
    
    /* Yield Processor means lowering to PASSIVE_LEVEL and then raising back
     * to DISPATCH_LEVEL. It allows other important tasks to run in between,
     * if they are fast enough.
     */
    void yieldProcessor() {
        KIRQL oldIrql;
        KeLowerIrql(PASSIVE_LEVEL);
        KeRaiseIrql(DISPATCH_LEVEL, &oldIrql);
    }
  4. А теперь самое весёлое.

    При инициализации драйвера я прохожу по всем имеющимся в операционной системе процессам и меняю их привязку к процессору:

namespace accelerator {
	class IAccelerator {
	public:
		explicit IAccelerator() = default;
		virtual void revert() = 0;
		virtual void accelerate() = 0;
		virtual ~IAccelerator() = default;
	};
}


namespace accelerator {

const std::vector<std::wstring> DEFAULT_BLACKLIST_PROCESSES = {
	L"system",
	L"system.exe",
	L"winlogon.exe"
};

class AffinitySetter : public IAccelerator {
public:
	/**
	 * Sets the processor affinity of all processes.
	 *
	 * Affinity is reset upon reseting the computer.
	 *
	 * @param activeCpuIdentifiers The cpu identifiers which should NOT be used by any process.
	 * @param blacklistProcesses A list of processes that should not be altered.
	 *
	 */
	explicit AffinitySetter(std::vector<uint8_t> activeCpuIdentifiers,
							std::vector<std::wstring> blacklistProcesses = DEFAULT_BLACKLIST_PROCESSES);
	virtual void revert();
	virtual void accelerate();
	virtual ~AffinitySetter() = default;
private:
	ULONG_PTR getAffinityMaskWithoutBlackList(ULONG_PTR maskLimit);
	std::vector<uint8_t> m_activeCpuIdentifiers;
	std::vector<std::wstring> m_blacklistProcesses;
  };
}

. . . . . . .
std::vector<std::unique_ptr<accelerator::IAccelerator>> accelerators;
auto affinitySetter = std::make_unique<accelerator::AffinitySetter>(
    std::vector<uint8_t>({ DRIVER_CPU_ID }));
accelerators.push_back(std::move(affinitySetter));
for (auto& accelerator : accelerators) {
	  accelerator->accelerate();
}
  1. Но и это ещё не всё. Мало позаботиться о тех процессах, которые уже есть, надо ещё позаботиться о тех, которые пользователь создаcт в будущем. Для этого я регистрирую два системных коллбэка, на создание процесса и на создание потока; они вызываются для каждого нового процесса и потока, и я меняю их привязку к процессору:

    /*
     * We want to keep this core to ourself, so register a callback for each
     * process and thread created. At this callback we change their affinity
     * (the core they can run on) to be different from our core
     */
    if (!NT_SUCCESS(PsSetCreateProcessNotifyRoutine(newProcessCreated, FALSE))) {
        DEBUG_TRACE("PsCreateProcessNotifyRoutine failed");
        COMPLETE_IRP(Irp, STATUS_UNSUCCESSFUL);
    }
    FINALLY([&guardActivator]() {
        if (guardActivator) {
            PsSetCreateProcessNotifyRoutine(newProcessCreated, TRUE);
        }
    });
    
    if (!NT_SUCCESS(PsSetCreateThreadNotifyRoutine(newThreadCreated))) {
        DEBUG_TRACE("PsCreateProcessNotifyRoutine failed");
        COMPLETE_IRP(Irp, STATUS_UNSUCCESSFUL);
    }
    FINALLY([&guardActivator]() {
        if (guardActivator) {
            PsRemoveCreateThreadNotifyRoutine(newThreadCreated);
        }
    });
    . . . . . .
    
    void newProcessCreated(
        HANDLE ParentId,
        HANDLE ProcessId,
        BOOLEAN Create
    )
    {
        UNREFERENCED_PARAMETER(ParentId);
        if (Create) {
            KAFFINITY affinity = ~((1ULL << (DRIVER_CPU_ID)));
            KAFFINITY maximumAffinity = KeQueryActiveProcessors();
            affinity &= maximumAffinity;
    
            // Get process handle by id
            HANDLE processHandle;
            OBJECT_ATTRIBUTES objectAttributes{ 0 };
            InitializeObjectAttributes(&objectAttributes, NULL, OBJ_KERNEL_HANDLE, NULL, NULL);
            CLIENT_ID clientid{ 0 };
            clientid.UniqueProcess = ProcessId;
            auto status = ZwOpenProcess(&processHandle, GENERIC_ALL, &objectAttributes, &clientid);
            if (!NT_SUCCESS(status)) {
                DEBUG_TRACE("ZwOpenProcess failed getting process for pid %d with status %d", ProcessId, status);
                return;
            }
            FINALLY([&processHandle]() {
                ZwClose(processHandle);
            });
    
            // Set the process affinity by handle
            DEBUG_TRACE("Will set process affinity: %d for process: %d", affinity, ProcessId);
    
            if (affinity) {
                status = ZwSetInformationProcess(processHandle, ProcessAffinityMask, &affinity, sizeof(affinity));
                if (!NT_SUCCESS(status)) {
                    DEBUG_TRACE("ZwSetInformationProcess failed getting process affinity for pid %d with status %d", ProcessId, status);
                    return;
                }
            }
    
        }
    }
    
    void newThreadCreated(
        HANDLE ProcessId,
        HANDLE ThreadId,
        BOOLEAN Create
    )
    {
        if (Create) {
            // Thread affinity should eventually be all cpus except our own.
            KAFFINITY affinity = ~((1ULL << (DRIVER_CPU_ID)));
            KAFFINITY maximumAffinity = KeQueryActiveProcessors();
            affinity &= maximumAffinity;
    
            // Get process handle by id
            HANDLE processHandle;
            OBJECT_ATTRIBUTES objectAttributes{ 0 };
            InitializeObjectAttributes(&objectAttributes, NULL, OBJ_KERNEL_HANDLE, NULL, NULL);
            CLIENT_ID clientid{ 0 };
            clientid.UniqueProcess = ProcessId;
            auto status = ZwOpenProcess(&processHandle, GENERIC_READ, &objectAttributes, &clientid);
            if (!NT_SUCCESS(status)) {
                DEBUG_TRACE("ZwOpenProcess failed getting process for pid %d with status %d", ProcessId, status);
                return;
            }
            FINALLY([&processHandle]() {
                ZwClose(processHandle);
            });
    
            // Get the process affinity by handle
            PROCESS_BASIC_INFORMATION processInformation;
            ULONG returnLength;
            status = ZwQueryInformationProcess(processHandle, ProcessBasicInformation, &processInformation, sizeof(processInformation), &returnLength);
            if (!NT_SUCCESS(status)) {
                DEBUG_TRACE("ZwQueryInformationProcess failed getting process for pid %d with status %d", ProcessId, status);
                return;
            }
    
            // Reduce affinity to by subset of process
            affinity &= processInformation.AffinityMask;
    
            // Get thread handle by id
            HANDLE threadHandle;
            objectAttributes = { 0 };
            InitializeObjectAttributes(&objectAttributes, NULL, OBJ_KERNEL_HANDLE, NULL, NULL);
            clientid = { 0 };
            clientid.UniqueThread = ThreadId;
            status = ZwOpenThread(&threadHandle, GENERIC_ALL, &objectAttributes, &clientid);
            if (!NT_SUCCESS(status)) {
                DEBUG_TRACE("ZwOpenThread failed getting thread for tid %d with status %d", ProcessId, status);
                return;
            }
            FINALLY([&threadHandle]() {
                ZwClose(threadHandle);
            });
    
            // Set the thread affinity by handle
            DEBUG_TRACE("Will set thread affinity: %d for thread: %d", affinity, ThreadId);
    
            if (affinity) {
                status = ZwSetInformationThread(threadHandle, ThreadAffinityMask, &affinity, sizeof(affinity));
                if (!NT_SUCCESS(status)) {
                    DEBUG_TRACE("ZwSetInformationThread failed getting thread affinity for tid %d with status %d", ProcessId, status);
                    return;
                }
            }
        }
    }

    Надо только не забыть убрать эти коллбэки при окончании работы.

Заключение

По факту, я реализовал систему, работающую в реальном времени, внутри Windows. Техника, в общем-то, такая же, как у коммерческих решений типа вышеупомянутой On Time: я забираю под свои цели ядро и часть памяти и не позволяю Windows добираться до них и мешать мне. Но есть и отличие: моё решение работает внутри Windows, в пространстве ядра, и позволяет пользоваться всеми преимуществами операционной системы. Я не ограничен в общении с остальными программами операционной системы и могу использовать весь набор средств для межпроцессного взаимодействия. Более того, я могу вернуть занятое драйвером ядро обратно Windows в любой момент, достаточно только убрать мои коллбэки и пройтись по процессам, исправляя их привязки.

Время обработки одного пакета данных при таких условиях не превышает 155 микросекунд, включая добавление заголовков к каждому пакету данных. Затем данные передаются из зарезервированной памяти в программу обработки, которая уже заботится о передаче данных в GPU, занимается показом всего этого богатства на экране и сохранением на жёсткий диск. Время передачи данных из платы в память компьютера здесь не учитывается, потому что я начинаю работать только после того, как данные окажутся в памяти.