Привет. Сейчас я расскажу вам о механизме IO Completion Ports в Windows. Разработчики описывают порт завершения как «средство, повышающее производительность приложений, часто использующих операции ввода/вывода». В общем-то, они и не врут, поэтому IOCP часто используют при написании масштабируемых серверных приложений. Однако же считается, что порт завершения — тема мудреная и тяжелая для понимания.
Теория.
Объект «порт», по сути, представляет собой очередь событий ядра, из которой извлекаются и в которую добавляются сообщения об операциях ввода/вывода. Естественно туда добавляются не все текущие операции, а только те, которые мы указали порту. Делается это путем связывания дескриптора (хендла) файла (не обязательно именно файла на диске, это может быть сокет, пайп, мэилслот и т.д.) с дескриптором порта. Когда над файлом инициируется асинхронная операция ввода/вывода, то после ее завершения соответствующая запись добавляется в порт.
Для обработки результатов используется пул потоков, количество которых выбирается пользователем. Когда поток присоединяют к пулу, он извлекает из очереди один результат операции и обрабатывает его. Если на момент присоединения очередь пуста, то поток засыпает до тех пор, пока не появится сообщение для обработки. К интересной особенности порта завершения можно отнести то, что в очередь можно «руками» положить какое-то сообщение, чтобы потом его извлечь.
Выглядит запутанно? На практике несколько проще.
Реализация.
После описания схемы работы перейдем к более конкретным вещам. А именно реализации приложения, использующего IOCP. В качестве примера я буду использовать сервер, который просто принимает входящие подключения и пакеты от клиентов. Используемый язык — С.
Итак, для начала было бы неплохо этот самый порт создать. Делается это APIшкой
Что примечательно, этот же вызов используется для связывания хендла файла с уже существующим портом. Зачем так было делать — неизвестно.
Строчка
создаст новый объект порта завершения и вернет нам его хендл. Здесь в качестве первого аргумента передается значение INVALID_HANDLE_VALUE, которое и означает то, что нам нужен новый порт. Следующие два аргумента должны иметь значение 0. При создании можно указать сколько потоков одновременно смогут работать для этого порта с помощью последнего аргумента. Если указать 0, то будет использовано значение, равное количеству процессоров в системе.
Следующим шагом является создание потоков, которые будут задействованы в пуле. Тут нельзя дать универсального совета. Некоторые говорят, что потоков должно быть в два раза больше процессоров в системе, другие, что их количество должно быть равно, третьи динамически изменяют размеры пула. Как тут поступить зависит от приложения и конфигурации компьютера. У меня старенький пенек с HyperThreading, поэтому система видит мой процессор как два. По этой причине в моем примере будет 2 рабочих потока в пуле.
Обращаю ваше внимание: мы передаем рабочим потокам хендл порта завершения в качестве параметра. Он понадобится им, когда потоки будут заявлять о своей готовности поработать. Сама функция WorkingThread() будет приведена ниже.
Теперь, когда потоки созданы, можно начинать принимать клиенты и их сообщения. Код инициализации Winsock я здесь приводить не буду (но он есть в тексте исходников этой статьи), поэтому просто напишу:
Вызов accept возвращает нам сокет очередного клиента, в который можно писать и из которого можно читать как из обычного файла. В вашем случае тут может быть файл на диске или любой другой IO-объект.
Теперь мы должны оповестить порт завершения о том, что хотим, чтобы он наблюдал за этим сокетом. Для этого связываем дескрипторы сокета и порта:
Последний аргумент в данном случае игнорируется, поскольку порт уже создан. А вот предпоследний требует особого разбирательства. В прототипе он значится как CompletionKey («ключ завершения»). Фактически ключ является указателем на любые данные, т.е. на структуру или экземпляр класса, определенные вами. Он используется для того, чтобы внутри потока вы могли отличить одну операцию от другой или же для хранения состояния того или иного клиента. Как минимум вам придется хранить там байт, показывающий какая операция завершилась — отправки или приема (чтения или записи).
После связывания дескрипторов можно инициировать асинхронный ввод/вывод. Для сокетов используются функции Winsock2 — WSASend() и WSARecv() с переданным туда указателем на структуру OVERLAPPED, которая собственно и знаменует собой асинхронную операцию. Для файлов можно использовать функции WriteFile() и ReadFile() соответственно.
Помимо структуры OVERLAPPED вам потребуется передавать в поток еще кое-какую IO-информацию — например, адрес буфера, его длину или даже сам буфер. Это можно делать либо через ключ завершения, либо создать структуру, содержащую OVERLAPPED первым полем и передавать указатель на нее в WSASend()/WSARecv().
Теперь рассмотрим функцию API, которая присоединяет вызывающий ее поток к пулу:
Здесь CompletionPort — хендл порта, к пулу которого следует подключиться; lpNumberOfBytes — указатель на переменнную, в которую запишется количество переданных байт в результате завершения операции (фактически это возвращаемое значение recv() и send() в синхронном режиме); lpCompletionKey — указатель на переменную, в которую запишется указатель на ключ завершения; lpOverlapped — указатель на OVERLAPPED, ассоциированную с этой IO-транзакцией; наконец, dwMilliseconds — время, на которое поток может уснуть в ожидании завершения какого-либо запроса. Если указать INFINITE, то будет ждать вечно.
Теперь, когда мы познакомились с функцией извлечения из очереди можно взглянуть на функцию, с которой начинают выполнение рабочие потоки.
Внутри switch'а вызываются новые асинхронные операции, которые будут обработаны при следующем прохождении цикла. Если мы не хотим, чтобы определенная операция по завершению не была передана в порт (например, когда нам не важен результат), можно использовать следующий трюк — установить первый бит поля OVERLAPPED.hEvent равным 1. Стоит отметить что, с точки зрения производительности, помещать обработку пришедшей информации в этот же цикл не самое разумное решение, т.к. это замедлит реакцию сервера на входящие пакеты. Чтобы решить проблему можно вынести разбор прочитанной информации в еще один отдельный поток, и тут нам пригодится третья API-функция:
Суть ее понятна из названия — она помещает в очередь порта сообщение. Собственно все асинхронные функции по завершении операции незаметно ее и вызывают. Все аргументы перечисленные здесь сразу же передаются одному из потоков. Благодаря PostQueuedCompletionStatus порт завершения можно использовать не только для обработки IO-операций, но и просто для эффективной организации очереди с пулом потоков.
В нашем примере имеет смысл создать еще один порт и после завершения какой-нибудь операции вызывать PostQueuedCompletionStatus(), передавая в ключе принятый пакет на обработку в другом потоке.
Внутреннее устройство.
Порт завершения представляет собой следующую структуру:
Как уже отмечалось выше, это просто очередь событий ядра. Вот описание структуры KQUEUE:
При создании порта функцией CreateIoCompletionPort вызывается внутренний сервис NtCreateIoCompletion. Затем происходит его инициализация с помощью функции KeInitializeQueue. Когда происходит связывание порта с объектом «файл», Win32-функция CreateIoCompletionPort вызывает NtSetInformationFile.
Для этой функции FILE_INFORMATION_CLASS устанавливается как FileCompletionInformation, а в качестве параметра FileInformation передается указатель на структуру IO_COMPLETION_CONTEXT или FILE_COMPLETION_INFORMATION.
После завершения асинхронной операции ввода/вывода для ассоциированного файла диспетчер ввода/вывода создает пакет запроса из структуры OVERLAPPED и ключа завершения и помещает его в очередь с помощью вызова KeInsertQueue. Когда поток вызывает функцию GetQueuedCompletionStatus, на самом деле вызывается функция NtRemoveIoCompletion. NtRemoveIoCompletion проверяет параметры и вызывает функцию KeRemoveQueue, которая блокирует поток, если в очереди отсутствуют запросы, или поле CurrentCount структуры KQUEUE больше или равно MaximumCount. Если запросы есть, и число активных потоков меньше максимального, KeRemoveQueue удаляет вызвавший ее поток из очереди ожидающих потоков и увеличивает число активных потоков на 1. При занесении потока в очередь ожидающих потоков поле Queue структуры KTHREAD устанавливается равным адресу порта завершения. Когда запрос помещается в порт завершения функцией PostQueuedCompletionStatus, на самом деле вызывается функция NtSetIoCompletion, которая после проверки параметров и преобразования хендла порта в указатель, вызывает KeInsertQueue.
На этом у меня все. Ниже ссылка на программу с примером использования.
iocp.c
Теория.
Объект «порт», по сути, представляет собой очередь событий ядра, из которой извлекаются и в которую добавляются сообщения об операциях ввода/вывода. Естественно туда добавляются не все текущие операции, а только те, которые мы указали порту. Делается это путем связывания дескриптора (хендла) файла (не обязательно именно файла на диске, это может быть сокет, пайп, мэилслот и т.д.) с дескриптором порта. Когда над файлом инициируется асинхронная операция ввода/вывода, то после ее завершения соответствующая запись добавляется в порт.
Для обработки результатов используется пул потоков, количество которых выбирается пользователем. Когда поток присоединяют к пулу, он извлекает из очереди один результат операции и обрабатывает его. Если на момент присоединения очередь пуста, то поток засыпает до тех пор, пока не появится сообщение для обработки. К интересной особенности порта завершения можно отнести то, что в очередь можно «руками» положить какое-то сообщение, чтобы потом его извлечь.
Выглядит запутанно? На практике несколько проще.
Реализация.
После описания схемы работы перейдем к более конкретным вещам. А именно реализации приложения, использующего IOCP. В качестве примера я буду использовать сервер, который просто принимает входящие подключения и пакеты от клиентов. Используемый язык — С.
Итак, для начала было бы неплохо этот самый порт создать. Делается это APIшкой
HANDLE CreateIOCompletionPort(
HANDLE FileHandle,
HANDLE ExistingCompletionPort,
ULONG_PTR CompletionKey,
DWORD NumberOfConcurrentThreads);
Что примечательно, этот же вызов используется для связывания хендла файла с уже существующим портом. Зачем так было делать — неизвестно.
Строчка
HANDLE iocp=CreateIoCompletionPort(INVALID_HANDLE_VALUE,0,0,0);
создаст новый объект порта завершения и вернет нам его хендл. Здесь в качестве первого аргумента передается значение INVALID_HANDLE_VALUE, которое и означает то, что нам нужен новый порт. Следующие два аргумента должны иметь значение 0. При создании можно указать сколько потоков одновременно смогут работать для этого порта с помощью последнего аргумента. Если указать 0, то будет использовано значение, равное количеству процессоров в системе.
Следующим шагом является создание потоков, которые будут задействованы в пуле. Тут нельзя дать универсального совета. Некоторые говорят, что потоков должно быть в два раза больше процессоров в системе, другие, что их количество должно быть равно, третьи динамически изменяют размеры пула. Как тут поступить зависит от приложения и конфигурации компьютера. У меня старенький пенек с HyperThreading, поэтому система видит мой процессор как два. По этой причине в моем примере будет 2 рабочих потока в пуле.
for(int i=1;i<=2;i++)
{
HANDLE hWorking=CreateThread(0,0,(LPTHREAD_START_ROUTINE)&WorkingThread,iocp,0,0);
CloseHandle(hWorking);
}
Обращаю ваше внимание: мы передаем рабочим потокам хендл порта завершения в качестве параметра. Он понадобится им, когда потоки будут заявлять о своей готовности поработать. Сама функция WorkingThread() будет приведена ниже.
Теперь, когда потоки созданы, можно начинать принимать клиенты и их сообщения. Код инициализации Winsock я здесь приводить не буду (но он есть в тексте исходников этой статьи), поэтому просто напишу:
while(1)
{
SOCKET clientsock=WSAAccept(listensock,(sockaddr *)&clientaddr,&clientsize,0,0);
...
}
Вызов accept возвращает нам сокет очередного клиента, в который можно писать и из которого можно читать как из обычного файла. В вашем случае тут может быть файл на диске или любой другой IO-объект.
Теперь мы должны оповестить порт завершения о том, что хотим, чтобы он наблюдал за этим сокетом. Для этого связываем дескрипторы сокета и порта:
CreateIoCompletionPort((HANDLE)clientsock,iocp,(ULONG_PTR)key,0);
Последний аргумент в данном случае игнорируется, поскольку порт уже создан. А вот предпоследний требует особого разбирательства. В прототипе он значится как CompletionKey («ключ завершения»). Фактически ключ является указателем на любые данные, т.е. на структуру или экземпляр класса, определенные вами. Он используется для того, чтобы внутри потока вы могли отличить одну операцию от другой или же для хранения состояния того или иного клиента. Как минимум вам придется хранить там байт, показывающий какая операция завершилась — отправки или приема (чтения или записи).
После связывания дескрипторов можно инициировать асинхронный ввод/вывод. Для сокетов используются функции Winsock2 — WSASend() и WSARecv() с переданным туда указателем на структуру OVERLAPPED, которая собственно и знаменует собой асинхронную операцию. Для файлов можно использовать функции WriteFile() и ReadFile() соответственно.
Помимо структуры OVERLAPPED вам потребуется передавать в поток еще кое-какую IO-информацию — например, адрес буфера, его длину или даже сам буфер. Это можно делать либо через ключ завершения, либо создать структуру, содержащую OVERLAPPED первым полем и передавать указатель на нее в WSASend()/WSARecv().
Теперь рассмотрим функцию API, которая присоединяет вызывающий ее поток к пулу:
BOOL GetQueuedCompletionStatus(
HANDLE CompletionPort,
LPDWORD lpNumberOfBytes,
PULONG_PTR lpCompletionKey,
LPOVERLAPPED *lpOverlapped,
DWORD dwMilliseconds);
Здесь CompletionPort — хендл порта, к пулу которого следует подключиться; lpNumberOfBytes — указатель на переменнную, в которую запишется количество переданных байт в результате завершения операции (фактически это возвращаемое значение recv() и send() в синхронном режиме); lpCompletionKey — указатель на переменную, в которую запишется указатель на ключ завершения; lpOverlapped — указатель на OVERLAPPED, ассоциированную с этой IO-транзакцией; наконец, dwMilliseconds — время, на которое поток может уснуть в ожидании завершения какого-либо запроса. Если указать INFINITE, то будет ждать вечно.
Теперь, когда мы познакомились с функцией извлечения из очереди можно взглянуть на функцию, с которой начинают выполнение рабочие потоки.
void WorkingThread(HANDLE iocp)
{
while(1)
{
if(!GetQueuedCompletionStatus(iocp,&bytes,&key,&overlapped,INFINITE))
//ошибка порта
break;
if(!bytes)
//0 означает что дескриптор файла закрыт, т.е. клиент отсоединился
switch(key->OpType)
{
...
}
}
}
Внутри switch'а вызываются новые асинхронные операции, которые будут обработаны при следующем прохождении цикла. Если мы не хотим, чтобы определенная операция по завершению не была передана в порт (например, когда нам не важен результат), можно использовать следующий трюк — установить первый бит поля OVERLAPPED.hEvent равным 1. Стоит отметить что, с точки зрения производительности, помещать обработку пришедшей информации в этот же цикл не самое разумное решение, т.к. это замедлит реакцию сервера на входящие пакеты. Чтобы решить проблему можно вынести разбор прочитанной информации в еще один отдельный поток, и тут нам пригодится третья API-функция:
BOOL PostQueuedCompletionStatus(
HANDLE CompletionPort,
DWORD dwNumberOfBytesTransferred,
ULONG_PTR dwCompletionKey,
LPOVERLAPPED lpOverlapped);
Суть ее понятна из названия — она помещает в очередь порта сообщение. Собственно все асинхронные функции по завершении операции незаметно ее и вызывают. Все аргументы перечисленные здесь сразу же передаются одному из потоков. Благодаря PostQueuedCompletionStatus порт завершения можно использовать не только для обработки IO-операций, но и просто для эффективной организации очереди с пулом потоков.
В нашем примере имеет смысл создать еще один порт и после завершения какой-нибудь операции вызывать PostQueuedCompletionStatus(), передавая в ключе принятый пакет на обработку в другом потоке.
Внутреннее устройство.
Порт завершения представляет собой следующую структуру:
typedef stuct _IO_COMPLETION
{
KQUEUE Queue;
} IO_COMPLETION;
Как уже отмечалось выше, это просто очередь событий ядра. Вот описание структуры KQUEUE:
typedef stuct _KQUEUE
{
DISPATCHER_HEADER Header;
LIST_ENTRY EnrtyListHead; //очередь пакетов
DWORD CurrentCount;
DWORD MaximumCount;
LIST_ENTRY ThreadListHead; //очередь ожидающих потоков
} KQUEUE;
При создании порта функцией CreateIoCompletionPort вызывается внутренний сервис NtCreateIoCompletion. Затем происходит его инициализация с помощью функции KeInitializeQueue. Когда происходит связывание порта с объектом «файл», Win32-функция CreateIoCompletionPort вызывает NtSetInformationFile.
NtSetInformationFile(
HANDLE FileHandle,
PIO_STATUS_BLOCK IoStatusBlock,
PVOID FileInformation,
ULONG Length,
FILE_INFORMATION_CLASS FileInformationClass);
Для этой функции FILE_INFORMATION_CLASS устанавливается как FileCompletionInformation, а в качестве параметра FileInformation передается указатель на структуру IO_COMPLETION_CONTEXT или FILE_COMPLETION_INFORMATION.
typedef struct _IO_COMPLETION_CONTEXT
{
PVOID Port;
PVOID Key;
} IO_COMPLETION_CONTEXT;
typedef struct _FILE_COMPLETION_INFORMATION
{
HANDLE IoCompletionHandle;
ULONG CompletionKey;
} FILE_COMPLETION_INFORMATION, *PFILE_COMPLETION_INFORMATION;
После завершения асинхронной операции ввода/вывода для ассоциированного файла диспетчер ввода/вывода создает пакет запроса из структуры OVERLAPPED и ключа завершения и помещает его в очередь с помощью вызова KeInsertQueue. Когда поток вызывает функцию GetQueuedCompletionStatus, на самом деле вызывается функция NtRemoveIoCompletion. NtRemoveIoCompletion проверяет параметры и вызывает функцию KeRemoveQueue, которая блокирует поток, если в очереди отсутствуют запросы, или поле CurrentCount структуры KQUEUE больше или равно MaximumCount. Если запросы есть, и число активных потоков меньше максимального, KeRemoveQueue удаляет вызвавший ее поток из очереди ожидающих потоков и увеличивает число активных потоков на 1. При занесении потока в очередь ожидающих потоков поле Queue структуры KTHREAD устанавливается равным адресу порта завершения. Когда запрос помещается в порт завершения функцией PostQueuedCompletionStatus, на самом деле вызывается функция NtSetIoCompletion, которая после проверки параметров и преобразования хендла порта в указатель, вызывает KeInsertQueue.
На этом у меня все. Ниже ссылка на программу с примером использования.
iocp.c