Порт завершения (Completion Port)

    Привет. Сейчас я расскажу вам о механизме IO Completion Ports в Windows. Разработчики описывают порт завершения как «средство, повышающее производительность приложений, часто использующих операции ввода/вывода». В общем-то, они и не врут, поэтому IOCP часто используют при написании масштабируемых серверных приложений. Однако же считается, что порт завершения — тема мудреная и тяжелая для понимания.

    Теория.

    Объект «порт», по сути, представляет собой очередь событий ядра, из которой извлекаются и в которую добавляются сообщения об операциях ввода/вывода. Естественно туда добавляются не все текущие операции, а только те, которые мы указали порту. Делается это путем связывания дескриптора (хендла) файла (не обязательно именно файла на диске, это может быть сокет, пайп, мэилслот и т.д.) с дескриптором порта. Когда над файлом инициируется асинхронная операция ввода/вывода, то после ее завершения соответствующая запись добавляется в порт.
    Для обработки результатов используется пул потоков, количество которых выбирается пользователем. Когда поток присоединяют к пулу, он извлекает из очереди один результат операции и обрабатывает его. Если на момент присоединения очередь пуста, то поток засыпает до тех пор, пока не появится сообщение для обработки. К интересной особенности порта завершения можно отнести то, что в очередь можно «руками» положить какое-то сообщение, чтобы потом его извлечь.
    Выглядит запутанно? На практике несколько проще.

    Реализация.

    После описания схемы работы перейдем к более конкретным вещам. А именно реализации приложения, использующего IOCP. В качестве примера я буду использовать сервер, который просто принимает входящие подключения и пакеты от клиентов. Используемый язык — С.

    Итак, для начала было бы неплохо этот самый порт создать. Делается это APIшкой

    HANDLE CreateIOCompletionPort(
    HANDLE FileHandle,
    HANDLE ExistingCompletionPort,
    ULONG_PTR CompletionKey,
    DWORD NumberOfConcurrentThreads);


    Что примечательно, этот же вызов используется для связывания хендла файла с уже существующим портом. Зачем так было делать — неизвестно.
    Строчка
    HANDLE iocp=CreateIoCompletionPort(INVALID_HANDLE_VALUE,0,0,0);
    создаст новый объект порта завершения и вернет нам его хендл. Здесь в качестве первого аргумента передается значение INVALID_HANDLE_VALUE, которое и означает то, что нам нужен новый порт. Следующие два аргумента должны иметь значение 0. При создании можно указать сколько потоков одновременно смогут работать для этого порта с помощью последнего аргумента. Если указать 0, то будет использовано значение, равное количеству процессоров в системе.

    Следующим шагом является создание потоков, которые будут задействованы в пуле. Тут нельзя дать универсального совета. Некоторые говорят, что потоков должно быть в два раза больше процессоров в системе, другие, что их количество должно быть равно, третьи динамически изменяют размеры пула. Как тут поступить зависит от приложения и конфигурации компьютера. У меня старенький пенек с HyperThreading, поэтому система видит мой процессор как два. По этой причине в моем примере будет 2 рабочих потока в пуле.

    for(int i=1;i<=2;i++)
    {
    HANDLE hWorking=CreateThread(0,0,(LPTHREAD_START_ROUTINE)&WorkingThread,iocp,0,0);
    CloseHandle(hWorking);
    }


    Обращаю ваше внимание: мы передаем рабочим потокам хендл порта завершения в качестве параметра. Он понадобится им, когда потоки будут заявлять о своей готовности поработать. Сама функция WorkingThread() будет приведена ниже.

    Теперь, когда потоки созданы, можно начинать принимать клиенты и их сообщения. Код инициализации Winsock я здесь приводить не буду (но он есть в тексте исходников этой статьи), поэтому просто напишу:

    while(1)
    {
    SOCKET clientsock=WSAAccept(listensock,(sockaddr *)&clientaddr,&clientsize,0,0);
    ...
    }


    Вызов accept возвращает нам сокет очередного клиента, в который можно писать и из которого можно читать как из обычного файла. В вашем случае тут может быть файл на диске или любой другой IO-объект.
    Теперь мы должны оповестить порт завершения о том, что хотим, чтобы он наблюдал за этим сокетом. Для этого связываем дескрипторы сокета и порта:

    CreateIoCompletionPort((HANDLE)clientsock,iocp,(ULONG_PTR)key,0);

    Последний аргумент в данном случае игнорируется, поскольку порт уже создан. А вот предпоследний требует особого разбирательства. В прототипе он значится как CompletionKey («ключ завершения»). Фактически ключ является указателем на любые данные, т.е. на структуру или экземпляр класса, определенные вами. Он используется для того, чтобы внутри потока вы могли отличить одну операцию от другой или же для хранения состояния того или иного клиента. Как минимум вам придется хранить там байт, показывающий какая операция завершилась — отправки или приема (чтения или записи).

    После связывания дескрипторов можно инициировать асинхронный ввод/вывод. Для сокетов используются функции Winsock2 — WSASend() и WSARecv() с переданным туда указателем на структуру OVERLAPPED, которая собственно и знаменует собой асинхронную операцию. Для файлов можно использовать функции WriteFile() и ReadFile() соответственно.
    Помимо структуры OVERLAPPED вам потребуется передавать в поток еще кое-какую IO-информацию — например, адрес буфера, его длину или даже сам буфер. Это можно делать либо через ключ завершения, либо создать структуру, содержащую OVERLAPPED первым полем и передавать указатель на нее в WSASend()/WSARecv().

    Теперь рассмотрим функцию API, которая присоединяет вызывающий ее поток к пулу:

    BOOL GetQueuedCompletionStatus(
    HANDLE CompletionPort,
    LPDWORD lpNumberOfBytes,
    PULONG_PTR lpCompletionKey,
    LPOVERLAPPED *lpOverlapped,
    DWORD dwMilliseconds);


    Здесь CompletionPort — хендл порта, к пулу которого следует подключиться; lpNumberOfBytes — указатель на переменнную, в которую запишется количество переданных байт в результате завершения операции (фактически это возвращаемое значение recv() и send() в синхронном режиме); lpCompletionKey — указатель на переменную, в которую запишется указатель на ключ завершения; lpOverlapped — указатель на OVERLAPPED, ассоциированную с этой IO-транзакцией; наконец, dwMilliseconds — время, на которое поток может уснуть в ожидании завершения какого-либо запроса. Если указать INFINITE, то будет ждать вечно.

    Теперь, когда мы познакомились с функцией извлечения из очереди можно взглянуть на функцию, с которой начинают выполнение рабочие потоки.

    void WorkingThread(HANDLE iocp)
    {
    while(1)
    {
    if(!GetQueuedCompletionStatus(iocp,&bytes,&key,&overlapped,INFINITE))
    //ошибка порта
    break;
    if(!bytes)
    //0 означает что дескриптор файла закрыт, т.е. клиент отсоединился
    switch(key->OpType)
    {
    ...
    }
    }
    }


    Внутри switch'а вызываются новые асинхронные операции, которые будут обработаны при следующем прохождении цикла. Если мы не хотим, чтобы определенная операция по завершению не была передана в порт (например, когда нам не важен результат), можно использовать следующий трюк — установить первый бит поля OVERLAPPED.hEvent равным 1. Стоит отметить что, с точки зрения производительности, помещать обработку пришедшей информации в этот же цикл не самое разумное решение, т.к. это замедлит реакцию сервера на входящие пакеты. Чтобы решить проблему можно вынести разбор прочитанной информации в еще один отдельный поток, и тут нам пригодится третья API-функция:

    BOOL PostQueuedCompletionStatus(
    HANDLE CompletionPort,
    DWORD dwNumberOfBytesTransferred,
    ULONG_PTR dwCompletionKey,
    LPOVERLAPPED lpOverlapped);


    Суть ее понятна из названия — она помещает в очередь порта сообщение. Собственно все асинхронные функции по завершении операции незаметно ее и вызывают. Все аргументы перечисленные здесь сразу же передаются одному из потоков. Благодаря PostQueuedCompletionStatus порт завершения можно использовать не только для обработки IO-операций, но и просто для эффективной организации очереди с пулом потоков.
    В нашем примере имеет смысл создать еще один порт и после завершения какой-нибудь операции вызывать PostQueuedCompletionStatus(), передавая в ключе принятый пакет на обработку в другом потоке.

    Внутреннее устройство.
    Порт завершения представляет собой следующую структуру:

    typedef stuct _IO_COMPLETION
    {
    KQUEUE Queue;
    } IO_COMPLETION;


    Как уже отмечалось выше, это просто очередь событий ядра. Вот описание структуры KQUEUE:

    typedef stuct _KQUEUE
    {
    DISPATCHER_HEADER Header;
    LIST_ENTRY EnrtyListHead; //очередь пакетов
    DWORD CurrentCount;
    DWORD MaximumCount;
    LIST_ENTRY ThreadListHead; //очередь ожидающих потоков
    } KQUEUE;


    При создании порта функцией CreateIoCompletionPort вызывается внутренний сервис NtCreateIoCompletion. Затем происходит его инициализация с помощью функции KeInitializeQueue. Когда происходит связывание порта с объектом «файл», Win32-функция CreateIoCompletionPort вызывает NtSetInformationFile.

    NtSetInformationFile(
    HANDLE FileHandle,
    PIO_STATUS_BLOCK IoStatusBlock,
    PVOID FileInformation,
    ULONG Length,
    FILE_INFORMATION_CLASS FileInformationClass);


    Для этой функции FILE_INFORMATION_CLASS устанавливается как FileCompletionInformation, а в качестве параметра FileInformation передается указатель на структуру IO_COMPLETION_CONTEXT или FILE_COMPLETION_INFORMATION.

    typedef struct _IO_COMPLETION_CONTEXT
    {
    PVOID Port;
    PVOID Key;
    } IO_COMPLETION_CONTEXT;

    typedef struct _FILE_COMPLETION_INFORMATION
    {
    HANDLE IoCompletionHandle;
    ULONG CompletionKey;
    } FILE_COMPLETION_INFORMATION, *PFILE_COMPLETION_INFORMATION;


    После завершения асинхронной операции ввода/вывода для ассоциированного файла диспетчер ввода/вывода создает пакет запроса из структуры OVERLAPPED и ключа завершения и помещает его в очередь с помощью вызова KeInsertQueue. Когда поток вызывает функцию GetQueuedCompletionStatus, на самом деле вызывается функция NtRemoveIoCompletion. NtRemoveIoCompletion проверяет параметры и вызывает функцию KeRemoveQueue, которая блокирует поток, если в очереди отсутствуют запросы, или поле CurrentCount структуры KQUEUE больше или равно MaximumCount. Если запросы есть, и число активных потоков меньше максимального, KeRemoveQueue удаляет вызвавший ее поток из очереди ожидающих потоков и увеличивает число активных потоков на 1. При занесении потока в очередь ожидающих потоков поле Queue структуры KTHREAD устанавливается равным адресу порта завершения. Когда запрос помещается в порт завершения функцией PostQueuedCompletionStatus, на самом деле вызывается функция NtSetIoCompletion, которая после проверки параметров и преобразования хендла порта в указатель, вызывает KeInsertQueue.

    На этом у меня все. Ниже ссылка на программу с примером использования.
    iocp.c
    Share post

    Similar posts

    AdBlock has stolen the banner, but banners are not teeth — they will be back

    More
    Ads

    Comments 14

      +1
      Полезная информация, хотя я этим никогда и не воспользуюсь из-за убогости виндового апи.
        0
        зато есть boost.asio который вроде как их использует
          0
          Если boost использует дополнительные возможности ОС, сам при этом являясь кроссплатформенным, то это хорошо.
        +2
        >Помимо структуры OVERLAPPED вам потребуется передавать в поток еще кое-какую IO-информацию — например, адрес буфера, его длину или даже сам буфер.
        Не убогонько ли для масштабируемых серверов? На каждый слабоактивный сокет выделять целый буффер…
        В нашем юниксовом селе обычно дело обстоит примерно так(далее про неблокирующие сокеты) — есть у нас один большой вектор с указателями на 4кб буфферы(обычно равен максимальному размеру буффера сокета, чтобы одним сисколом readv всё утащить в юзерспэйс). Ну и когда приходят какие-то пакеты, если они маленькие, то сразу обрабатываются и наш большой векторо-буффер сразу опять в строю, если не можем сразу обработать, то передаём занятые 4кбайтовые буффера во владение данных связаных с клиентом и аллокэйтим ещё чистеньких страничек для следующего readv ;)
        p.s. не писал серверов под винду, если не прав — поправьте, тк самому интересно узнать.
          +1
          Вообще статья о потоках, а не буферах, так что я не мудрувствовал особо. Впрочем, у меня вообще нету отдельных буферов для каждого клиента — все клиенту пишут в одно большое кольцо памяти, которое кстати статично и мне не приходится каждый раз искать место под нового юзера или новое сообщение.
            +1
            я наверное один из тех кто неправильно понял как работают IOCP :)
            не могли бы вы вкратце описать как прочитать из сокета?

            ну например на линуксе это выглядело бы так:

            создаём сокет
            добавляем IN/OUT события для данного сокета
            loop:
            дожидаемся событий
            читаем из сокета
            обрабатываем полученные данные
            goto loop;

            Мне почему-то казалось, что когда используется модель IOCP, то нужно делать асинхронные вызовы и передавать буффер?
            как-то так:

            создаём сокет
            аллокэйтим буффер
            loop:
            делаем асинхронный запрос на чтение и передаём буффер
            дожидаемся событий
            обрабатываем полученные данные
            goto loop;
              +1
              То, что вы описали kills the purpose. Я еще в прошлом посте хотел отметить что неблокирующие сокеты (или вообще чтение/запись) это еще не асинхронность.

              Суть как раз в том, что после

              >делаем асинхронный запрос на чтение и передаём буффер

              мы ничего не ждем. Каждый цикл мы смотрим на ситуацию — есть ли чего на отправку или на прием. Если что-то есть, то мы инициируем новую операцию или обрабатываем завершившуюся. Если нет вообще ничего, то поток спит не потребляя CPU.

              Вообще касательно линукса: там есть aio. Вот хорошая, на мой взгляд иллюстрация: www.ibm.com/developerworks/linux/library/l-async/figure1.gif
              Aio — это фактически оверлаппед вызовы виндовс. Порта завершения в линуксе нет, т.к. это относительно высокоуровневый объект. Я сейчас разбираюсь с механизмом kqueue+aio в FreeBSD, чтобы реализовать аналог iocp.
                +1
                И да, скоро соберу себя в кучу и переведу статью, откуда эту картинку взял.
                  +2
                  так, хочу разобраться :)
                  у ядра есть свои буфферы для сокетов(которые достаточно хорошо реализованы и кушают память только по потребности), когда они заполняются — нужно оповестить приложение, которое жаждит знать о том что в сокет поступили данные. Так вот зачем нам тут aio? Если неблокирующий сискол на чтение(есть даже какие-то подвижки в сторону zero-copy — vmsplice) сможет сейчас же забрать всё что находится в буффере ядра и продолжить выполнение.
                  aio же подразумевает то что мы ко всему этому на каждый запрос чтения выделяем буффер, о заполнении которого нас оповещают. Но это же во много хуже по сравнению с неблокирующим вариантом, где мне достаточно одного буффера на тред(при условии что мы сразу обработаем данные, в реальности всё конечно же чуточку сложнее).
                  в чём преимущество у aio с сетью(не файловый io, там всё сложнее и поэтому вполне разумно)?! я вижу только недостатки у подобного подхода.
                    +1
                    ну и по количеству сисколов :)
                    у меня вектор буфферов(буффер = странице памяти) на тред имеет размер = SO_RCVBUF
                    и когда меня оповещают о том что у сокета появились какие-то данные в буффере, я почти уверен что мне будет достаточно одного readv :)
                    При использовании aio я просто не могу себе позволить выделять такие большие буффера на каждый сокет. И сколько тогда раз придётся просить ядро заполнить мне буффер, а потом дожидаться ответа?!
            +1
            >у ядра есть свои буфферы для сокетов
            Да, приложение работает не с этими буферами. Zero-copy это вообще отдельная тема.

            >Но это же во много хуже по сравнению с неблокирующим вариантом, где мне достаточно одного буффера на тред

            На тред? У вас client-per-thread? Тогда стоит считать не буферы, а треды. При описанной выше схеме у вас всего лишь несколько потоков.

            И я не совсем понял насчет
            >что мы ко всему этому на каждый запрос чтения выделяем буффер

            Со стороны вызовы recv на неблокирующем сокете и WSASend с OVERLAPPED выглядят одинаково. Разница в том, как мы получаем и обрабатываем результат выполнения.
              +1
              >Да, приложение работает не с этими буферами
              Я просто с этого начал, так как хотел подчеркнуть бессмысленость aio в связке с сокетами ;)

              >У вас client-per-thread?
              нет, у меня много клиентов на тред, но так как я всё равно не смогу одним тредом работать в двух местах :) то зачем мне больше одного буффера(iovec, чтобы было проще отцеплять заполненые и необработаные) на тред…
              Если в данный момент времени нехватает данных(пришло пол пакета), то я отцепляю из вектора полузаполненный буффер и отдаю клиенту, а когда нужно опять делать сискол — цепляю его первым в iovec… Нет смысла городить пустые буффера на каждого клиента :)

              >И я не совсем понял насчет
              >>что мы ко всему этому на каждый запрос чтения выделяем буффер
              когда вы делаете WSARead — вы сразу же получаете результат или ждёте пока IOCP не скажет что он заполнен? Если второй вариант, то да — вы создаёте пустой и бессмысленый буффер ;)
              У неблокирующего варианта я сразу получаю результат и могу по нему работать. А отработав — вернуть буффер, чтобы им попользовались остальные клиенты ;)
              0
              Я понял о чем вы. Тут вы правы. Просто я pre-allocate все буферы при старте сервера, поэтому как то не задумывался об этом.
                0
                И в общем-то не сказать, что pre-allocate обязательно хуже.
                На практике аллоцированные и не запрошенные страницы памяти обычно не вклеиваются в адресное пространство и не участвуют в планировании физического ресурса.
                Может быть проиллюстрировано следующим образом:
                // 596 kb used, try to allocate 256MB...
                char *mem = (char *)malloc(1024*1024*256);
                // 596 kb used!
                mem[4096 * 0] = 123; // 596 kb used after
                mem[4096 * 1] = 123; // 600 kb used after
                mem[4096 * 2] = 123; // 604 kb used after
                mem[4096 * 300] = 123; // 612 kb used after
                mem[4096 * 400] = 123; // 616 kb used after
                mem[4096 * 500] = 123; // 620 kb used after
                

                Ключевое слово в ядре linux на этот счёт «nopage».
                А в защиту IOCP, при всей его одиозности (и WINAPI вцелом): в отличии от большинства syscall-ов, у него есть шансы на отсутствие копирования данных «буфер сокета -> пользователь». Это случается если overlapped чтение вызывается до приёма TCP пакета, а буфер сокета пуст… Впрочем, это с лёгкостью может нивелироваться межпоточными издержками…

              Only users with full accounts can post comments. Log in, please.