
Преамбула
Фирма, в которой я работаю, производит аппаратуру для нейрохирургов и нейрофизиологов, в основе которой лежит технология Deep Brain Stimulation. Если коротко, в живой мозг втыкается электрод, и нейрохирург может считывать из мозга сигнал или стимулировать клетки мозга разрядом тока. У технологии огромное будущее в деле лечения болезней (например, болезни Паркинсона, дистонии, эпилепсии) и в создании нейроинтерфейсов: нейропротезирование, в частности, восстановление зрения; аугментация мозга дополнительными устройствами, расширение возможностей мозга. Скажем, спецслужбы очень интересуются способом считывать и записывать информацию из зрительных и слуховых нервов, а также управлять движением животных, что позволит создать новый класс шпионов.
Для клинического применения в целях лечения тремора при болезни Паркинсона достаточно лишь нескольких внедряемых контактов (а некоторые нейрохирурги обходятся вообще одним). Но для исследователей, изучающих работу мозга, количество контактов имеет значение, и им нужно получать данные от как можно большего числа контактов одновременно и синхронно. Скажем, тысяча, или две тысячи внедрённых в мозг контактных площадок. Понятно, хотелось бы, чтобы и скорость была пристойной, — скажем, сорок тысяч замеров с каждого контакта в секунду. И чтоб разрешение было повыше, чтоб каждый замер был хотя бы в 32 бита, в формате float или Q. Получаем, что система производит порядка 320 мегабайт данных в секунду, так что всё это придётся обрабатывать.
Кроме считываемых непосредственно из мозга «чистых» данных, есть ещё данные отфильтрованные: результат применённых к замерам фильтров верхних и нижних частот. Используются фильтры высоких порядков, хотя бы четвёртого, реализованные в виде полиномов. Они применяются к каждому входящему замеру, увеличивая количество данных, о которых система должна заботиться, вчетверо, и поднимая количество генерируемых данных до 1,3 гигабайта в секунду. Но это уже не моя забота, потому что они генерируются из переданных мной данных после того, как я свою часть работы выполнил.
Результат всего этого счастья нужен в режиме реального времени и крайне важен. Пропускать нельзя ни одного замера, потому что основную работу по анализу данных исследователи выполняют после окончания эксперимента. Поэтому всё это богатст��о данных, помимо показа на экране, придётся записывать на жёсткий диск. Все 1,3 гигабайта данных в секунду. И потом читать в Matlab`е, NeuroExplorer`е или другой программе. Система, которая сохраняла 99,99999% данных, не прошла контроль качества и была забракована, потому что теряла до 13 тысяч замеров каждую секунду.
А вот теперь мы со всем этим попробуем взлететь.
Формулировка задачи
Имеется разработанная фирмой плата с контроллером FPGA, с одной стороны в которую воткнуты провода, идущие от мозга, (ну, на самом деле, от конвертеров, типа такого), а с другой есть выход PCIe. Этим выходом плата будет воткнута в порт PCIe на самом обычном, просто очень мощном, компьютере.
Мне предстояло создать драйвер, который получает данные потенциалов от этой нашей кастомной платы, обрабатывает на подключённой к тому же PCIe видеокарте (результат расчёта фильтров одного контакта не зависит от результатов другого; глупо не использовать для независимых расчётов процессор, который специально заточен на выполнение большого числа однотипных параллельных расчётов одновременно) и передаёт дальше в пользовательский интерфейс. И делать это надо очень-очень быстро, потому что новые пакеты с восемью замерами данных от каждого контакта приходят каждые 200 микросекунд. И, самое главное, сделать это надо под десятой Виндой, потому что нейрохирурги не знают и знать не хотят ничего, кроме Винды и Мака. Причём, судя по внешнему виду некоторых клиентов и адекватности высказываемых ими требований к программе, последнее слово предыдущего предложения можно писать с маленькой буквы.
Люди в теме уже сообразили, что речь идёт о hard realtime: гарантированный ответ на полученные данные в течение фиксированного времени, и неважно, что за дичь творится вокруг, без возможности задержаться или пропустить хотя бы один пакет. Эти же люди в теме уже покачали головой: творящаяся вокруг дичь — это Windows, hard realtime под Windows невозможен, Windows не операционная система реального времени. Более того, Windows не заточен под работу с квантами времени меньше миллисекунды, поэтому работа со скоростью «полный цикл обработки данных за 200 микросекунд» под Windows невозможна вдвойне.
Soft realtime отличается от hard realtime тем, что в soft иногда небольшие задержки всё-таки разрешены, при условии, что система от задержки очухается и успеет наверстать упущенное и разгрести данные, накопившиеся за время задержки, без потери производительности.
Существуют всякие расширения под Windows, которые позволяют частично имплементировать realtime. Например, операционные системы On Time, RTX64 от IntervalZero и прочие. Они сводятся к одной и той же идее: мы отбираем у Винды одно или несколько ядер и кусок памяти, делаем вид, что их в компьютере больше нет, и запускаем на них свою собственную операционку. После того, как этот монстр Франкенштейна раскочегарится и выйдет на рабочий режим, на компьютере будут работать одновременно две операционных системы: realtime и Windows. Между ними можно настроить общение. Это решение будет работать с двумя оговорками: во-первых, из-под Windows практически нет возможностей повлиять на то, что происходит внутри запущенной параллельно realtime ОС, (например, программы для неё надо компилировать при помощи проприетарной SDK; нельзя во время работы передать в неё свою собственную программу для обработки получаемых данных и запустить её), а во-вторых, стоимость этого решения, мягко говоря, неадекватна. Лицензия разработчика RTX64 стоит порядка 10 тысяч долларов, а за каждый экземпляр готового продукта, ушедшего клиенту (тому самому нейрохирургу), придётся заплатить ещё 500 долларов. Вдобавок к 600-долларовой лицензии на Винду, которую клиент тоже получит. Это выводит общую стоимость продукта из зоны конкурентоспособности и делает его финансово непривлекательным для потенциальных покупателей.
За десять тысяч долларов плюс неустановленное количество 500-долларовых лицензионных платежей я сам себе RTOS под Windows напишу, подумал я. И написал.
Применённые технические хитрости
Во-первых, нам нужно, чтобы как можно больше работы выполняла наша плата с FPGA на борту. Скажем, передачу данных лучше перевесить на неё: у неё DMA-контроллер точно ничем не будет занят, нет шансов, что, когда нам потребуется DMA-канал, Винда скажет нам в ответ «в очередь, линуксьи дети, в очередь!»
Как подключать FPGA к PCIe так, чтобы DMA писал данные, куда надо, это совсем-совсем отдельная тема, которая выходит за рамки данной статьи. Скажу только, что FPGA должен быть сконфигурирован как PCIe Endpoint, потому что компьютер остаётся Root Complex, — ему ведь ещё видеоадаптером управлять. При этом, раз DMA запускается платой, то и трансляция адресов должна выполняться на плате. И тут возникает вопрос: а куда плата будет писать? Изнутри Windows я могу работать только с виртуальными адресами. Даже если я выделю реальную память при помощи MmAllocateContiguousMemory, я получу только виртуальный адрес, достучаться до которого плата не сможет.
Так что совсем без решений Франкенштейна обойтись не удалось. Я резервирую кусок физической памяти на компьютере для использования только нашим устройством, выполнив в командной строке от Администратора команду следующего вида:
bcdedit /set removememory Х(Х — сколько мегабайт зарезервировать)Таким образом, последние мегабайты скрыты от Windows, и обращаться к ним Винда не может. Помимо гарантии отсутствия столкновений на memory bus, таким образом решается ещё несколько проблем, в частности, отсутствует нужда в синхронизации доступа, что лишает меня необходимости использовать долгие и медленные семафоры и мьютексы. (Синхронизацию между записью данных в память и чтением можно осуществлять по времени: пусть плата пишет в пять буферов с разницей в 200 микросекунд; зная, что в нулевой буфер она писала в целое число миллисекунд, я буду читать буферы с отставанием на один: в целую миллисекунду — четвёртый, в миллисекунду и 200 микросекунд — нулевой, в миллисекунду и четыреста микросекунд — первый, и так далее. Как синхронизировать время на уровне микросекунд между двумя устройствами — задача, при наличии канала связи между ними, решаемая).
Драйвер, который будет читать данные из зарезервированной памяти, бежит строго на одном ядре. Для этого я меняю его привязку к процессору:
/* * The ID of the PCI driver CPU core. Starting from 0. */ static constexpr USHORT DRIVER_CPU_ID = 3; . . . . // Set the thread to run on specific processor KAFFINITY affinity = 1ULL << (DRIVER_CPU_ID); KeSetSystemAffinityThread(affinity);— и поднимаю его приоритет, но не до высшего, а до того, который чуть ниже. На самом высшем приоритете некоторые системные функции не работают, и критические системные задачи, которые бегут на таком же приоритете, не будут выполняться:
// Set the thread priority to the highest available -1 // Тhe "-1" is because running for a long time in HIGH_PRIORITY // "starves" important system tasks which run in HIGH_PRIORTY KeSetPriorityThread(PsGetCurrentThread(), HIGH_PRIORITY - 1);Но этого недостаточно. Нужно не только чтобы этот процесс бежал на одном ядре, но и чтобы никакой другой процесс на этом ядре не бежал. Для этого я поднимаю приоритет прерываний, которые могут прервать выполнение моего процесса (
KIRQL), до максимального (DISPATCH_LEVEL):KIRQL oldIrql; KeRaiseIrql(DISPATCH_LEVEL, &oldIrql);Однако процесс не может всё время бежать с запретом на любые прерывания, Винда за этим строго следит и может наглеца прибить. Поэтому периодически я понижаю приоритет прерываний, которым разрешаю свой процесс, гхм, прерывать. Чисто формально, но всё-таки:
// It's important that we don't stay at DISPATCH_LEVEL for too long // so we record the last tick we were at passive, and every once in // a while lower the KIRQL static constexpr ULONG64 MS_ALLOWED = 50; LARGE_INTEGER freq{}; LONGLONG lastPassiveTick = 0; . . . . . . KeQueryPerformanceCounter(&freq); timePassed = ((KeQueryPerformanceCounter(nullptr).QuadPart - lastPassiveTick) * 1000ULL) / freq.QuadPart; if (timePassed >= MS_ALLOWED) { yieldProcessor(); lastTickAtPassive = KeQueryPerformanceCounter(nullptr).QuadPart; } /* Yield Processor means lowering to PASSIVE_LEVEL and then raising back * to DISPATCH_LEVEL. It allows other important tasks to run in between, * if they are fast enough. */ void yieldProcessor() { KIRQL oldIrql; KeLowerIrql(PASSIVE_LEVEL); KeRaiseIrql(DISPATCH_LEVEL, &oldIrql); }А теперь самое весёлое.
При инициализации драйвера я прохожу по всем имеющимся в операционной системе процессам и меняю их привязку к процессору:
namespace accelerator {
class IAccelerator {
public:
explicit IAccelerator() = default;
virtual void revert() = 0;
virtual void accelerate() = 0;
virtual ~IAccelerator() = default;
};
}
namespace accelerator {
const std::vector<std::wstring> DEFAULT_BLACKLIST_PROCESSES = {
L"system",
L"system.exe",
L"winlogon.exe"
};
class AffinitySetter : public IAccelerator {
public:
/**
* Sets the processor affinity of all processes.
*
* Affinity is reset upon reseting the computer.
*
* @param activeCpuIdentifiers The cpu identifiers which should NOT be used by any process.
* @param blacklistProcesses A list of processes that should not be altered.
*
*/
explicit AffinitySetter(std::vector<uint8_t> activeCpuIdentifiers,
std::vector<std::wstring> blacklistProcesses = DEFAULT_BLACKLIST_PROCESSES);
virtual void revert();
virtual void accelerate();
virtual ~AffinitySetter() = default;
private:
ULONG_PTR getAffinityMaskWithoutBlackList(ULONG_PTR maskLimit);
std::vector<uint8_t> m_activeCpuIdentifiers;
std::vector<std::wstring> m_blacklistProcesses;
};
}
. . . . . . .
std::vector<std::unique_ptr<accelerator::IAccelerator>> accelerators;
auto affinitySetter = std::make_unique<accelerator::AffinitySetter>(
std::vector<uint8_t>({ DRIVER_CPU_ID }));
accelerators.push_back(std::move(affinitySetter));
for (auto& accelerator : accelerators) {
accelerator->accelerate();
}Но и это ещё не всё. Мало позаботиться о тех процессах, которые уже есть, надо ещё позаботиться о тех, которые пользователь создаcт в будущем. Для этого я регистрирую два системных коллбэка, на создание процесса и на создание потока; они вызываются для каждого нового процесса и потока, и я меняю их привязку к процессору:
/* * We want to keep this core to ourself, so register a callback for each * process and thread created. At this callback we change their affinity * (the core they can run on) to be different from our core */ if (!NT_SUCCESS(PsSetCreateProcessNotifyRoutine(newProcessCreated, FALSE))) { DEBUG_TRACE("PsCreateProcessNotifyRoutine failed"); COMPLETE_IRP(Irp, STATUS_UNSUCCESSFUL); } FINALLY([&guardActivator]() { if (guardActivator) { PsSetCreateProcessNotifyRoutine(newProcessCreated, TRUE); } }); if (!NT_SUCCESS(PsSetCreateThreadNotifyRoutine(newThreadCreated))) { DEBUG_TRACE("PsCreateProcessNotifyRoutine failed"); COMPLETE_IRP(Irp, STATUS_UNSUCCESSFUL); } FINALLY([&guardActivator]() { if (guardActivator) { PsRemoveCreateThreadNotifyRoutine(newThreadCreated); } }); . . . . . . void newProcessCreated( HANDLE ParentId, HANDLE ProcessId, BOOLEAN Create ) { UNREFERENCED_PARAMETER(ParentId); if (Create) { KAFFINITY affinity = ~((1ULL << (DRIVER_CPU_ID))); KAFFINITY maximumAffinity = KeQueryActiveProcessors(); affinity &= maximumAffinity; // Get process handle by id HANDLE processHandle; OBJECT_ATTRIBUTES objectAttributes{ 0 }; InitializeObjectAttributes(&objectAttributes, NULL, OBJ_KERNEL_HANDLE, NULL, NULL); CLIENT_ID clientid{ 0 }; clientid.UniqueProcess = ProcessId; auto status = ZwOpenProcess(&processHandle, GENERIC_ALL, &objectAttributes, &clientid); if (!NT_SUCCESS(status)) { DEBUG_TRACE("ZwOpenProcess failed getting process for pid %d with status %d", ProcessId, status); return; } FINALLY([&processHandle]() { ZwClose(processHandle); }); // Set the process affinity by handle DEBUG_TRACE("Will set process affinity: %d for process: %d", affinity, ProcessId); if (affinity) { status = ZwSetInformationProcess(processHandle, ProcessAffinityMask, &affinity, sizeof(affinity)); if (!NT_SUCCESS(status)) { DEBUG_TRACE("ZwSetInformationProcess failed getting process affinity for pid %d with status %d", ProcessId, status); return; } } } } void newThreadCreated( HANDLE ProcessId, HANDLE ThreadId, BOOLEAN Create ) { if (Create) { // Thread affinity should eventually be all cpus except our own. KAFFINITY affinity = ~((1ULL << (DRIVER_CPU_ID))); KAFFINITY maximumAffinity = KeQueryActiveProcessors(); affinity &= maximumAffinity; // Get process handle by id HANDLE processHandle; OBJECT_ATTRIBUTES objectAttributes{ 0 }; InitializeObjectAttributes(&objectAttributes, NULL, OBJ_KERNEL_HANDLE, NULL, NULL); CLIENT_ID clientid{ 0 }; clientid.UniqueProcess = ProcessId; auto status = ZwOpenProcess(&processHandle, GENERIC_READ, &objectAttributes, &clientid); if (!NT_SUCCESS(status)) { DEBUG_TRACE("ZwOpenProcess failed getting process for pid %d with status %d", ProcessId, status); return; } FINALLY([&processHandle]() { ZwClose(processHandle); }); // Get the process affinity by handle PROCESS_BASIC_INFORMATION processInformation; ULONG returnLength; status = ZwQueryInformationProcess(processHandle, ProcessBasicInformation, &processInformation, sizeof(processInformation), &returnLength); if (!NT_SUCCESS(status)) { DEBUG_TRACE("ZwQueryInformationProcess failed getting process for pid %d with status %d", ProcessId, status); return; } // Reduce affinity to by subset of process affinity &= processInformation.AffinityMask; // Get thread handle by id HANDLE threadHandle; objectAttributes = { 0 }; InitializeObjectAttributes(&objectAttributes, NULL, OBJ_KERNEL_HANDLE, NULL, NULL); clientid = { 0 }; clientid.UniqueThread = ThreadId; status = ZwOpenThread(&threadHandle, GENERIC_ALL, &objectAttributes, &clientid); if (!NT_SUCCESS(status)) { DEBUG_TRACE("ZwOpenThread failed getting thread for tid %d with status %d", ProcessId, status); return; } FINALLY([&threadHandle]() { ZwClose(threadHandle); }); // Set the thread affinity by handle DEBUG_TRACE("Will set thread affinity: %d for thread: %d", affinity, ThreadId); if (affinity) { status = ZwSetInformationThread(threadHandle, ThreadAffinityMask, &affinity, sizeof(affinity)); if (!NT_SUCCESS(status)) { DEBUG_TRACE("ZwSetInformationThread failed getting thread affinity for tid %d with status %d", ProcessId, status); return; } } } }Надо только не забыть убрать эти коллбэки при окончании работы.
Заключение
По факту, я реализовал систему, работающую в реальном времени, внутри Windows. Техника, в общем-то, такая же, как у коммерческих решений типа вышеупомянутой On Time: я забираю под свои цели ядро и часть памяти и не позволяю Windows добираться до них и мешать мне. Но есть и отличие: моё решение работает внутри Windows, в пространстве ядра, и позволяет пользоваться всеми преимуществами операционной системы. Я не ограничен в общении с остальными программами операционной системы и могу использовать весь набор средств для межпроцессного взаимодействия. Более того, я могу вернуть занятое драйвером ядро обратно Windows в любой момент, достаточно только убрать мои коллбэки и пройтись по процессам, исправляя их привязки.
Время обработки одного пакета данных при таких условиях не превышает 155 микросекунд, включая добавление заголовков к каждому пакету данных. Затем данные передаются из зарезервированной памяти в программу обработки, которая уже заботится о передаче данных в GPU, занимается показом всего этого богатства на экране и сохранением на жёсткий диск. Время передачи данных из платы в память компьютера здесь не учитывается, потому что я начинаю работать только после того, как данные окажутся в памяти.
