Немного о многопоточном программировании. Часть 1. Синхронизация зло или все-таки нет

    Мне по работе часто приходится сталкиваться с высоконагруженными многопоточными или многопроцессными сервисами (application-, web-, index-server).
    Достаточно интересная, но иногда неблагодарная работа — оптимизировать все это хозяйство.
    Растущие потребности клиентов часто упираются в невозможность просто заменить железную составляющую системы на более современную, т.к. производительность компьютеров, скорость чтения-записи жестких дисков и сети растут много медленнее запросов клиентов.
    Редко помогает увеличение количества нодов кластера (система как правило распределенная).
    Чаще приходится запустив профайлер, искать узкие места, лезть в source code и править ляпы, которые оставили коллеги, а иногда и сам, чего греха таить, много лет назад.
    Некоторые из проблем, связаных с синхронизацией, я попытаюсь изложить здесь. Это не будет вводный курс по многопоточному программированию — предпологается, что читатель знаком с понятием thread и context switch, и знает для чего нужны mutex, semaphore и т.д.

    Любому разработчику, многопоточно проектирующему что-то большее чем «Hello world», ясно, что создать полностью асинхронный код невероятно сложно — нужно что-то писать в общий channel, изменить структуру в памяти (к примеру повернуть дерево hash-таблицы), забрать что-то из очереди и т.д.
    Синхронизируя такой доступ, мы ограничеваем одновременное исполнение некоторых критичных участков кода. Как правило это один, редко несколько потоков (например 1 writer/ N readers).
    Необходимость синхронизации неоспорима. Чрезмерная же синхронизация очень вредна — кусок программы более-менее шустро работаюший на 2-х или 3-х потоках, уже для 5-ти потоков может выполняться почти «singlethreaded», а на 20-ти даже на очень неплохом железе практически ложится спать.

    Однако практика показывает, что иногда и недостаточная синхронизация исполнения приводит к тому же результату — система залипает. Это происходит, когда исполняемый параллельно код содержит например обращения к HDD (непрерывный seek), или при множественном обращении к различным большим кускам памяти (например постоянный сброс кэша при context switch — CPU cache просто тупо отваливается).

    Используйте семафоры (semaphore)

    Семафоры изобрели не только для того, что бы на них строить конструкции вида ReadWriteMutex. Семафоры можно и нужно использовать для уменьшения нагрузки на железо на куске кода, исполняемого параллельно.
    Как правило таким образом можно вылечить множество «залипаний», которые легко найти профилированием кода — когда при увеличении количества потоков, время исполнения отдельных функций заметно растет, при том, что другие функции отрабатывают с той же или сравнимой скоростью.
    Развернуть Profiler-Output
    ========================================================================================================================
    Run # 1 (5 Threads)
      rpcsd (hbgsrv0189, PID:0718, TID:2648)
      # 03-09-2012 | 13:50:45 | Servlet: A::RpcsServlet, URI: /index-search
    ========================================================================================================================
                                 NS | Name                   |  C |  T | Tot(s) | TwR(s) | Avg(s) | AwR(s) | Max(s) | Min(s)
    ========================================================================================================================
                     ::RPC::Service | service                |  1 |  1 |  1.593 |  1.593 |  1.593 |  1.593 |  1.593 |  1.593
                   ::A::RpcsServlet | service                |  1 |  1 |  1.592 |  1.592 |  1.592 |  1.592 |  1.592 |  1.592
                      ::IndexSrvRpc | index-search           |  1 |  1 |  1.584 |  1.584 |  1.584 |  1.584 |  1.584 |  1.584
                  ::Indexer::Search | Search                 |  1 |  1 |  1.584 |  1.584 |  1.584 |  1.584 |  1.584 |  1.584
                  ::Indexer::Search | ParallelSearch         |  2 |  2 |  1.256 |  1.256 |  0.628 |  0.628 |  0.655 |  0.601
           ::Indexer::Search::Cache | SearchL2Index          | 44 | 44 |  0.686 |  0.686 |  0.016 |  0.016 |  0.016 |  0.015
                  ::Indexer::Search | InvalidateCacheIdx     | 20 | 20 |  0.570 |  0.570 |  0.028 |  0.028 |  0.031 |  0.020
           ::Indexer::Search::Cache | InvalidateIdx          | 20 | 20 |  0.276 |  0.276 |  0.014 |  0.014 |  0.016 |  0.002
                  ::Indexer::Search | SearchL1Index          |  1 | 14 |  0.203 |  0.203 |  0.203 |  0.016 |  0.203 |  0.016
                  ::Indexer::Search | MergeJoin              |  1 |  1 |  0.125 |  0.125 |  0.125 |  0.125 |  0.125 |  0.125
    
    ========================================================================================================================
    Run # 2 (25 Threads w/o semaphore)
      rpcsd (hbgsrv0189, PID:0718, TID:2648)
      # 03-09-2012 | 13:52:03 | Servlet: A::RpcsServlet, URI: /index-search
    ========================================================================================================================
                                 NS | Name                   |  C |  T | Tot(s) | TwR(s) | Avg(s) | AwR(s) | Max(s) | Min(s)
    ========================================================================================================================
                     ::RPC::Service | service                |  1 |  1 |  4.255 |  4.255 |  4.255 |  4.255 |  4.255 |  4.255
                   ::A::RpcsServlet | service                |  1 |  1 |  4.254 |  4.254 |  4.254 |  4.254 |  4.254 |  4.254
                      ::IndexSrvRpc | index-search           |  1 |  1 |  4.244 |  4.244 |  4.244 |  4.244 |  4.244 |  4.244
                  ::Indexer::Search | Search                 |  1 |  1 |  4.244 |  4.244 |  4.244 |  4.244 |  4.244 |  4.244
                  ::Indexer::Search | ParallelSearch         |  2 |  2 |  3.729 |  3.729 |  1.865 |  1.865 |  1.889 |  1.840
                  ::Indexer::Search | InvalidateCacheIdx     | 20 | 20 |  2.497 |  2.497 |  0.125 |  0.125 |  0.126 |  0.125
           ::Indexer::Search::Cache | InvalidateIdx          | 20 | 20 |  2.188 |  2.188 |  0.109 |  0.109 |  0.113 |  0.109
           ::Indexer::Search::Cache | SearchL2Index          | 44 | 44 |  1.231 |  1.231 |  0.028 |  0.028 |  0.031 |  0.015
                  ::Indexer::Search | SearchL1Index          |  1 | 14 |  0.360 |  0.360 |  0.360 |  0.028 |  0.360 |  0.016
                  ::Indexer::Search | MergeJoin              |  1 |  1 |  0.155 |  0.155 |  0.155 |  0.155 |  0.155 |  0.155
                  
    ========================================================================================================================
    
    Run # 3 (25 Threads with semaphore in InvalidateCacheIdx, before InvalidateIdx)
      rpcsd (hbgsrv0189, PID:0718, TID:2648)
      # 03-09-2012 | 14:02:51 | Servlet: A::RpcsServlet, URI: /index-search
    ========================================================================================================================
                                 NS | Name                   |  C |  T | Tot(s) | TwR(s) | Avg(s) | AwR(s) | Max(s) | Min(s)
    ========================================================================================================================
                     ::RPC::Service | service                |  1 |  1 |  2.213 |  2.213 |  2.213 |  2.213 |  2.213 |  2.213
                   ::A::RpcsServlet | service                |  1 |  1 |  2.213 |  2.213 |  2.213 |  2.213 |  2.213 |  2.213
                      ::IndexSrvRpc | index-search           |  1 |  1 |  2.205 |  2.205 |  2.205 |  2.205 |  2.205 |  2.205
                  ::Indexer::Search | Search                 |  1 |  1 |  2.205 |  2.205 |  2.205 |  2.205 |  2.205 |  2.205
                  ::Indexer::Search | ParallelSearch         |  2 |  2 |  1.690 |  1.690 |  0.845 |  0.845 |  0.889 |  0.801
           ::Indexer::Search::Cache | SearchL2Index          | 44 | 44 |  1.153 |  1.153 |  0.026 |  0.026 |  0.031 |  0.016
                  ::Indexer::Search | InvalidateCacheIdx     | 20 | 20 |  0.537 |  0.537 |  0.027 |  0.027 |  0.031 |  0.007
                  ::Indexer::Search | SearchL1Index          |  1 | 14 |  0.359 |  0.359 |  0.359 |  0.028 |  0.359 |  0.017
           ::Indexer::Search::Cache | InvalidateIdx          | 20 | 20 |  0.278 |  0.278 |  0.014 |  0.014 |  0.016 |  0.004
                  ::Indexer::Search | MergeJoin              |  1 |  1 |  0.156 |  0.156 |  0.156 |  0.156 |  0.156 |  0.156
    


    В третьей выдаче профайлера можно видеть, как изменилось время исполнения метода InvalidateIdx и соответственно метода InvalidateCacheIdx, после окружения семафором invCI_semaphore вызова метода InvalidateIdx
    semaphore invCI_semaphore(config.InvCI_Count/* = 5*/);
    ...
    int InvalidateCacheIdx() {
      ...
      while (...) {
        cache.SearchL2Index();
        invCI_semaphore++;
        while (cache.InvalidateIdx()) {};
        invCI_semaphore--;
      }
      ...
    }
    

    Такой метод использования семафоров довольно прост и не обязательно требует полного понимания процесса, но имеет множество недостатков, в том числе тот факт, что максимальное количество потоков для каждого блока будет скорее всего подбираться в бою (в продакшен, на системе клиента) — что не всегда есть хорошо. Зато огромным преимуществом такого способа оптимизации является возможность быстро увеличить количество потоков всего сервиса, без изменения execution plan, т.е. практически без переделки всего движка — просто проставив несколько семафоров на предыдущее значение в узких местах. Я не сторонник необдуманно использовать семафоры, но в качестве временного решения (для успокоения клиента), я не раз использовал этот метод, что бы впоследствии спокойно переделать «правильно», вникнув в исходный код.

    Расставляйте приоритеты (priority)

    Приоритеты являются очень удобным механизмом, так же позволяющим довольно просто «облегчить» приложение. Например, если в системе логи пишутся отдельным потоком, то уменьшив ему приоритет до минимального, можно сильно «облегчить» процесс, не уменьшая log-level.
    Например конструкцию следующего вида можно использовать если пул со множеством потоков обрабатывает задачи разной приоритетности:
    // before doing ...
    if ( thisThread.pool.count() > 1 
      && !(currentTaskType in (asap, immediately, now)) 
    ) {
      thisThread.priority = 2 * thisThread.pool.priority;
    } else {
      thisThread.priority = 5 * thisThread.pool.priority;
    }
    // do current task ...
    

    При этом нужно понимать, что приоритет потока действует для всего процесса, а не только для пула в котором существует этот поток — используйте его с оторожностью.

    Divide et impera (Разделяй и властвуй)

    Довольно часто не требуется мгновенное исполнение какого-либо участка кода — т.е. некоторое действие или часть задачи можно отложить. Например писать логи, считать посещения, переиндексировать кэш, и т.д.
    Существенно повысить скорость выполнения можно, выделяя куски синхронного кода в отдельные задачи, с последующим выполнением их позже (например фоново — используя т.н. background service). Это может быть отдельный поток, пул потоков или даже другой процесс aka RPC (например асинхронный вызов WebService). Естественно временная стоимость вызова (помещения в очередь и т.д.) этой задачи должна быть меньше стоимости самого исполнения.
    Пример с отдельным LOG-потоком:
    // здесь мы пишем лог напрямую :
    int log(int level, ...) {
      if (level >= level2log) {
        logMutex.lock();
        try {
          file.write(...);
          file.flush();
        } finally {
          logMutex.release();
        }
      }
    }
    


    // здесь - фоново :
    int log(int level, ...) {
      if (level >= level2log) {
        // защитить, добавить и освободить очередь :
        logQueue.mutex.lock();
        logQueue.add(currentThread.id, ...);
        logQueue.mutex.release();
        // разбудить лог-worker'а :
        logQueue.threadEvent.pulse();
      }
    }
    // background-logging thread:
    int logThreadProc() {
      ...
      while (true) {
        // делаем задержку - ожидаем латенц /* 500 ms */ или размер очереди /* 10 */:
        if ( logQueue.count < config.LogMaxCount /* = 10 */
          || (sleepTime = currentTime - lastTime) < config.LogLatency /* = 500 */) 
        {
          logQueue.threadEvent.wait(config.LogLatency - sleepTime);
          continue;
        };
        // пишем в буфер и удаляем из очереди :
        logQueue.mutex.lock();
        try {
          foreach (... in logQueue) {
            file.write(...);
            logQueue.delete(...);
          }
        } finally {
          logQueue.mutex.release();
        }
        // пишем буфер в лог:
        file.flush();
        // спать :
        logQueue.threadEvent.wait();
        lastTime = currentTime;
      }
      ...
    }
    

    Такая простая конструкция позволяет существенно снизить затраты на логирование и уменьшить последствия от context switch, которые практически не будут зависеть от количества потоков, использующих метод log.
    Важно понимать, что теперь, навешивая дополнительную логику на логирование, нагружается только поток непосредственно пишущий в лог. Т.е. можно сколько угодно делать наш лог интелигентней — ввести понятие LogLatency, как на примере, добавить какой-нибудь лог-анализатор (что-нибудь как fail2ban) или же сохранять например все debug сообщения, с целью логировать их только в случае ошибки, группировать по TID, и т.д. — все это практически не будет нагружать остальные потоки.
    Кроме того, при использовании первого метода (сообщение пишется синхронно напрямую в лог-файл), потоки так сказать «разпараллеливаются». Т.е. чем больше объектов синхронизации (mutex, critical section, waiting events) и выше затраты на context switch, тем вероятнее, что все потоки проходящие через эти объекты будут выполнятся последовательно.
    Т.е. скорость многопоточного исполнения задачи приближается или становится даже хуже скорости однопоточного ее исполнения. Уменьшая время между lock() и release(), код улучшается сразу в двух направлениях — становится быстрее в самом потоке и уменьшается вероятность «разпараллеливания» процесса.
    Организовав очередь событий, иногда можно создавать подобные конструкции даже не прибегая к созданию дополнительных потоков. Например, записывать в очередь некоторые действия, чтобы позже, например во время «idle time» выполнить их этим же потоком, один за другим.
    Легко проилюстрировать это можно на TCL:
    ## отдать страницу / документ ...
    ...
    ## показать counter :
    set counter [db onecolumn {select cntr from accesslog where userid = $userid}]
    %>
    Вы видели эту страницу <%= $counter %> раз...
    <%
    ## добавить событие "писать access log" in background, когда будет выполнено "update idle":
    after idle UpdateAccess $userid [clock seconds]
    ## завершить.
    ....
    ## где-то в коде приложения :
    proc UpdateAccess {userid lasttime} {
      db exec {update accesslog set cntr = cntr + 1, lastaccess = $lasttime where userid = $userid}
    }
    


    Очереди, FIFO, LIFO и многопоточность

    Организация очереди, пула данных или последовательного буфера дело не хитрое, однако нужно иметь в виду, что при многопоточности и прочих равных условиях очередь LIFO стоит сделать выбором номер один (конечно если при этом последовательность действий не важна). Иногда можно комбинировать или группировать LIFO и FIFO (элементы LIFO сделать маленькими очередями FIFO или например строить буфер с конца и т.д.). Смысл таких извращений кроется в кеше процессора и отчасти в виртуальной организации памяти. Т.е. вероятность того, что последние элементы из LIFO еще находятся в кеше процессора несравненно выше вероятности того же у FIFO той же длинны.

    Пример из жизни — В нашем собственном memory manager, была организована хеш-таблица из пулов свободных объектов одинакого размера (кто очень часто вызывал malloc / free, знает зачем это делается:). Пулы были организованны по принципу FIFO — функция mymalloc возвращала первый, давным-давно положеный в пул функцией myfree элемент. Причина побудившая тогда разработчика использовать FIFO проста до банальности — если какой-нибудь недобросовестный «программист» некоторое время будет использовать объект после myfree, то программа возможно проработает дольше. После замены на LIFO весь арсенал (application server), активно использующий memory manager заработал порядка 30% быстрее.

    ReadWriteMutex

    Очень часто синхронизировать необходимо только в случае изменения объекта. Например при записи в общий файл, при изменении структуры списков или hash таблиц и т.д. При этом, как правило, это разрешается только одному потоку, при этом часто даже читающие потоки блокируются (что бы исключить dirty read и вылет программы с исключением, поскольку записи до конца изменения не совсем валидны).
    Блокировку таких объектов правильнее делать используя RW-mutex, где читающие потоки не блокируют друг друга, и только при блокировке записи происходит полная синхронизация кода (исполняется одним потоком).
    При использовании read/write-mutex, нужно всегда точно представлять, как происходит чтение объекта, поскольку в некоторых случаях, даже при чтении, объект может изменятся (например при построении внутреннего cache при первичной инициализации или реинициализации после записи). В этом случае идеальный API предоставляет callback для блокировки, либо блокирует самостоятельно в случае многопоточности, либо возможное использование RW-mutex, со всеми исключениями, подробнейше описано в документации к API. В некоторых реализациях RW-mutex нужно заранее знать (сообщать mutex) количество reader-потоков, иногда writer-потоков. Это связано с конкретной реализацией блокировки записи (как правило используются семафоры). Несмотря на эти и другие ограничения, при наличии нескольких reader-потоков, желательно по возможности пытаться выполнить синхронизацию именно на таком mutex.

    Читайте документацию, читайте source code

    Проблема незнания, иногда непонимания того, что скрывается за тем или иным классом или объектом, особенно критично проявляется при использовании их в многопоточном приложении. Особенно это касается и базовых объектов синхронизации. Попробую разъяснить, что я имею в виду, на примере неправильного использования RW-mutex.
    Один мой коллега как-то использовал fair RW-mutex, построеный на семафорах. Он поленился динамически передавать количество reader-потоков в класс RWMutex (задал статически «максимально возможное» значение 500) и написал следующий код для writer-потока:
    ...
    RWMutex mtx(500);
    ...
    mtx.lockWrite();
    hashTab.add(...);
    mtx.releaseWrite();
    ...
    

    И при хорошей нагрузке, сервер уходил в глубокий запой ложился в спячку. Все дело в том, что он сделал две ошибки — взяв статичное значение 500 и не разобрался как будет вести себя такой RW-mutex на этой конкретной платформе. Т.к. RW-mutex был сделан fair — использовался код, подобный следующему:
    void RWMutex::lockWrite() {
      writeMutex.lock();
      for (register int i = 0; i < readersCount /* в нашем случае 500 */; i++)
        readSemaphore++;
    }
    void RWMutex::releaseWrite() {
      if (!f4read)
        writeMutex.release();
      readSemaphore -= readersCount;
      if (f4read)
        writeMutex.release();
    }
    

    Такая конструкция благодаря использованию в теле lockWrite инкремента readSemaphore++ в цикле, вместо readSemaphore += readersCount, дает одинаковые шансы для reader- и writer-потоков. Возможно он не знал, что семафор класс для построения этого RWMutex, использовал одну кросплатформенную библиотеку, которая выдавала для этой конкретной платформы простенький код, выглядевший как-то так:
    int Semaphore::operator ++() {
      mutex.lock();
      if (sema++ > MaxFlowCount) flowMutex.lock();
      mutex.release();
    }
    

    Т.е. при добавлении в хеш-таблицу hashTab 100-а значений, при одновременном чтении несколькими reader-потоками, мы имели 100*500 блокировок (и выпадание в осадок на несколько милисекунд из-за context switch). Самое интересное в этой истории, что это был базовый класс RWSyncHashTable, активно используемый повсеместно в нашем коде.
    Нужно четко запомнить: некоторые конструкции API могут быть уже синхронизированы. Иногда это даже конструктор и деструктор объекта. В этом случае дополнительная синхронизация — часто вред. Это как раз тот случай, когда кашу маслом испортишь.
    Читайте источники, заглядывайте в документацию к API — и такие ляпы, с большей вероятностью, обойдут Вас стороной.

    Синхронный != Ожидание

    Синхронизация исполнения абсолютно не означает, что наш процесс только и делает, что ждет. Блокирующие методы современных систем давольно гибки, и позволяют делать следуюшие конструкции:
    static int mtx_locked = 0;
    // уже заблокировано кем-то - нет, подождем 1 мс?
    while ( mtx_locked || !mtx.lock(config.MaxWaitTime /* пример 1 ms */) ) {
      // не могу блокировать - сделай что-то другое ... например ...
      processNextRequest();
    }
    // за мютексом - блокирован ...
    mtx_locked++;
    // исполняем ...
    processInLock();
    // unlock ...
    mtx_locked--;
    mtx.release();
    

    Использование кода такого вида, позволяет не ждать блокировки mutex'а и ложится спать, а попытаться сделать что-то другое в это время. На подобном принципе, хотя зачастую немного по другому реализованному (callback or event execution, transactional nowait locking, per thread-caching и т.д.), базируется концепция асинхронного программирования. При этом нужно соблюдать очень простое правило — «не ждать».
    В этом примере приведен еще один прием избежать или минимировать context switch: это статичная переменная mtx_locked. Такой прием позволяет не выполнять mtx.lock, если и так заранее известно, что код блокирован (mtx_locked > 0), и нам не обязательно это знать наверняка — мы просто делаем что-то другое.

    Наверное стоит закончить на этом первую часть (а-то много букв). Если где-то написал прописные для кого-то истины, прошу простить покорно — не со зла. Предложения, пожелания, критика приветствуются.

    В следующей части:

    • Deadlock
    • События; реакция во время ожидания;
    • Синхронизация в Банках данных;
    • Systemwide synchronization (crossprocess, crosscluster)
    • Асинхронное программированиие;
    • Shared resources
    • Garbage, Освобождение ресурсов
    • Мониторинг потоков, HeartBit
    • Профайлинг
    • Ваши пожелания
    Поддержать автора
    Поделиться публикацией
    AdBlock похитил этот баннер, но баннеры не зубы — отрастут

    Подробнее
    Реклама

    Комментарии 55

      0
      Хорошая статья. Только пожалуйста, никогда не делайте так:
      static int mtx_locked = 0;
      <source>
      Это не работает:
      www.nwcpp.org/Downloads/2004/DCLP_notes.pdf
      Но в новом стандарте есть atomic int, для которого предоставляются дополнительные гарантии и его можно было бы использовать, если бы мьютексы не были бы futex'ами. Они уже используют атомарные операции и они и так быстро работают, если не возникает коллизии. Системный вызов используется только для ожидания освобождения futex'а.
        0
        Вы не правы — это прекрасно работает (кстати быстрее чем без), ведь синхронизация кода достигается за счет mtx.lock. К тому же вы меня не поняли — я имел ввиду совсем не DCLP. Переменная mtx_locked тут используется чтобы избежать синхронизации, если можно допустить, что при mtx_locked > 0 наш мютекс в локе, и мы делаем что-то другое.
          0
          Ссылку на DCLP я привел потому, что там описываются некоторые проблемы, связанные с оптимизациями компилятора и процессора. В данном случае вы правы.
            +1
            а разве не надо писать static volatile int mtx_locked = 0, намекнув компилятору, что mtx_locked хитрый флаг, изменяемый из разных потоков? прокомментируйте пожалуйста.
              0
              На сколько я знаю, volatile положено здесь в C# и Java.
              В C/С++ декларация volatile указывает, что значение переменной может изменятся вне контекста текущей программы, например, средствами hardware.
              Хотя она не помешает — думаю что без volatile переменная state в следующем примере выпиливается компилятором и выливается просто в while(true)...
              static volatile int state = 0;
              void poll_state() {
                while ( state == 0 ) {...};
              }
              
                +1
                Volatile в c/c++ говорит компилятору, не доверять значению переменной которое хранится в регистрах процессора, а все время лазить за ним в память, в надежде что его там кто-то поменяет. В многопроцессорных системах, на сколько я помню, для «хитрых флагов» стоит его выставлять.
                  0
                  volatile сильно влияет на производительность в худшую сторону.
                  Надо внимательно почитать стандарт. В новом стандарте C++11 прописано, что мьютексы являются синхронизационными примитивами и в момент захвата/освобождения мьютекса остальные потоки должны увидеть изменения (side effects), сделанные в текущем потоке. Могут увидеть и раньше, но не позже.
                  Поэтому компилятор, в данном случае, может закешировать mtx_locked на регистре, но должен будет обновлять состояние в памяти при освобождении мьютекса. Но я сейчас не готов сказать, обязан ли он считывать переменную заново, если не пытается войти в мьютекс. Возможно, чтои не ообязан. И тогда цикл может никогда не увидеть изменений.
                    0
                    Использование кого-нибудь atomic bool точно гарантировало бы корректность предложенной схемы.
                      0
                      с этим никто не спорит, мне интересно, есть ли баг в приведенном в статье коде. На С++ я немного программировал, поэтому я не могу точно сказать… но ИМХО, нет гарантии, что актуальное состояние флага mtx_locked видят все потоки…

                      Например, может ли другой поток закешировать у себя значение mtx_locked большее нуля и вообще перестать пытаться лочить мьютекс (некоторое время или вообще навсегда) и начнет крутиться только в вайле?
                    0
                    volatile сильно влияет на производительность в худшую сторону
                    Посмотрите асм-код ниже — согласно ему (в этом конкретном частном случае) «static volatile» и «static» абсолютно одинаковы, а для «member» все, как ни странно, наоборот: они быстрее чем «member» на 8 тактов для каждого цикла (если не брать во внимание instruction-level parallelism и superscalar architecture).

                    А так да, влияет.
                    +1
                    Во первых, volatile в c++ указывает, что другой процесс (для библиотеки) или hardware изменяет переменную в памяти — прошу заметить не поток.
                    Во вторых, никто не говорил про многопроцессорную систему — речь шла про многопоточную среду.
                    Во третьих, кто мне скажет, как это будет на Delphi? :)
                    Люди… я показывал (пусть и на близких к си примерах) принцип работы, а не готовую реализацию…
                      0
                      Вы проделали отличную работу, спасибо большое за статью, узнал много нового.

                      Но хочется узнать как закодить правильно… в Java без synchronized/volatile флажок mtx_locked бы не работал. А как правильно это сделать в С++, чтобы точно всегда работало? Сейчас почти все процессоры многоядерные и у каждого ядра свой кеш. А volatile очень снижает производительность чтения/записи.
                        0
                        Не думаю что снижение производительности ошутимие mtx.lock… (profiler рассудит нас).
                        Думаю что для этой конкретной реализации — это не важно… Можно даже не в статик оформить, а как переменную класса — возможно что в этом случае блокировка будет заметна не блокирующему потоку еще один цикл (т.е. поток повторит «сделай что-нибудь другое» еще один раз). Часто это не критично.
                        Я сам «volatile» для этого не использовал. Посмотрел у нас в c++ исходники — есть некоторые и с volatile. Что интересно volatile нет ни в одной библиотеке. У нас крутятся несколько одинаковых процессов, используюших эти библиотеки — может поэтому… Попробую на досуге — отпишусь.
                          0
                          Скомпилил с максимальной оптимизацией в трех вариантах на виндах в (devcpp, visual) и на дебиан gcc.
                          Статик при добавлении volatile ожидаемо не показала никаких изменений в машинном коде.
                          При переносе mtx_locked в класс (as non static member) наблюдались незначительные изменения в машинном коде. Что касается блокировки — все три варианта абсолютно рабочие.
                          //==============================
                          //static volatile int mtx_locked = 0;
                          //static int mtx_locked = 0;
                          ...
                          // addr of mtx_locked -> ebx:
                          mov ebx, 0x00403220
                          jmp +0x05; // goto while;
                          // processNextRequest():
                          call processNextRequest()
                          // while ( mtx_locked || !mtx.lock(1) ) 
                          cmp dword ptr [ebx], 0x00
                          jnz -0x0a; // goto processNextRequest;
                          push 0x01; // time = 1
                          lea eax,[esp+0x04]; //mtx;
                          push eax; 
                          call Mtx::lock(int) //mtx.lock(1);
                          add esp,0x08;
                          test eax, eax; //not locked ?
                          jz -0x1d; // goto processNextRequest;
                          // mtx_locked++;
                          inc dword ptr [ebx];
                          // processInLock():
                          call processInLock();
                          // mtx_locked--;
                          dec dword ptr [ebx];
                          // mtx.release():
                          push esp
                          call Mtx::release();
                          pop esp
                          ...
                          

                          //==============================
                          //class member = private: int mtx_locked;
                          ...
                          // addr of this (eax) -> ebx:
                          mov ebx, eax
                          jmp +0x05; // goto while;
                          // processNextRequest():
                          call processNextRequest()
                          // while ( mtx_locked || !mtx.lock(1) ) 
                          cmp dword ptr [ebx+0x000002d0], 0x00
                          jnz -0x0e; // goto processNextRequest;
                          push 0x01; // time = 1
                          lea eax,[esp+0x04]; //mtx;
                          push eax; 
                          call Mtx::lock(int) //mtx.lock(1);
                          add esp,0x08;
                          test eax, eax; //not locked ?
                          jz -0x21; // goto processNextRequest;
                          // mtx_locked++;
                          inc dword ptr [ebx+0x000002d0];
                          // processInLock():
                          call processInLock();
                          // mtx_locked--;
                          dec dword ptr [ebx+0x000002d0];
                          // mtx.release():
                          push esp
                          call Mtx::release();
                          pop ecx
                          ...
                          

                            +1
                            Статик при добавлении volatile ожидаемо не показала никаких изменений в машинном коде.

                            Если volatile нет, то компилятор имеет право кешировать переменную на регистре и не обновлять её значение в памяти. Но регистров мало и, если processNextRequest(); и processInLock(); достаточно сложные, то компилятор оставит регистр для них.
                            А я взял код, максимально удобный для оптимизации:
                            std::mutex mtx;
                            void test_mutex() {
                                static int mtx_locked = 0;
                                while ( mtx_locked || !mtx.try_lock() ) {
                                  // не могу блокировать - сделай что-то другое ... например попробуй снова
                                }
                                // за мютексом - блокирован ...
                                mtx_locked++;
                                // ничего не делаем
                                // unlock ...
                                mtx_locked--;
                                mtx.unlock();
                            }
                            

                            И собирал gcc с разными флагами оптимизации. Во всех случаях компиляатор увидел, что переменная сначала увеличивается на единицу, а потом уменьшается. Это было оптимизировано и mtx_locked не изменяется никогда. При -O2 в коде есть вот такой фрагмент:
                              4006e0:	8b 05 a2 09 20 00    	mov    0x2009a2(%rip),%eax        # 601088 <_ZZ10test_mutexvE10mtx_locked>
                              4006e6:	53                   	push   %rbx
                              4006e7:	bb 00 00 00 00       	mov    $0x0,%ebx
                              4006ec:	0f 1f 40 00          	nopl   0x0(%rax)
                              4006f0:	85 c0                	test   %eax,%eax
                              4006f2:	75 fc                	jne    4006f0 <_Z10test_mutexv+0x10>
                            

                            Обратите внимание на 4006f0 и 4006f2. Это бесконечный цикл. То, о чем я предупреждал.
                            Так что тут обязательно надо использовать atomic bool, если очень хочется оптимизации.
                              0
                              Но согласитесь такое происходит только если "// ничего не делаем".
                              А во вторых в реале mtx_locked++ и mtx_locked-- лежат вообще в разных функциях.
                              Хотя вы отчасти правы — с этим нужно быть осторожным (опять же как и с самой синхронизацией).
                              Может какие нибудь #прагмы использовать, что бы отключить синхронизацию?
                                +1
                                Но согласитесь такое происходит только если "// ничего не делаем".

                                Это абсолютно не важно. Корректность одного куска кода зависит от не связанного с ним другого куска. Это очень сложно для понимания. Это достаточный повод переписать код.
                                Кроме того, корректность вашей программы зависит от возможностей статического анализа компилятора. Он может научиться делать whole program analysis. При этом тесты могут легко пропустить ошибку, а найти и отладить её архисложно. Вам этот геморой точно не нужен.
                                А во вторых в реале mtx_locked++ и mtx_locked-- лежат вообще в разных функциях.

                                Это очень плохо. Также очень плохо, что мьютекс и флаг — это два отдельных объекта, хранящихся в разных местах. Как вы обеспечите, что при любом захвате мьютекса устанавливается флаг, а при его освобождении снимается? Как это поймет человек, читающий ваш код, если захват и отдача мьютекса находятся в разных функциях?

                                Кстати, из-за этого вы словите deadlock, если под мьютексом кинется исключение. Никто его не освободит и все станет плохо. Используйте только scoped lock'и. Если вы используете не scoped lock'и, сначала очень хорошо подумайте. Раз семь. И напишите свой scoped объект, делающий, в том числе, и lock.
                                Может какие нибудь #прагмы использовать, что бы отключить синхронизацию?

                                Ни в коем случае. Прагмы использовать нельзя никогда и ни при каких обстоятельствах. Это недокументированные, vendor specific расширения. Не надо вам этого. Максимум, можно допустить pragma once.
                                Я уже раза три написал про atomic типы, введенные в C++11. Они были введены как раз для таких случаев.
                                  0
                                  Это абсолютно не важно
                                  Нет это очень важно — компилятор никогда не выпилит ++ и — если между ними что-то находится.
                                  Это очень плохо. Также очень плохо, что мьютекс и флаг — это два отдельных объекта
                                  Откуда вы это взяли — в статье это просто пример — в реале у меня такая и подобная логика естественно находится в одном классе…
                                  Я уже раза три написал про atomic типы, введенные в C++11
                                  Вот именно что 11.
                                    0
                                    Нет это очень важно — компилятор никогда не выпилит ++ и — если между ними что-то находится.

                                    Имеет право и выпилит, если будет уверен, что то, что между ними находится, никогда не читает и не пишет в эту переменную. Если компилятор увидит, что адрес локальной переменной никуда не передавался, он может быть в этом вполне уверенным. При -03 вызвананя функция может заинлайниться и компилятор сможет её проанализировать, если она достаточно проста. Вот, например, посчитаем сумму:
                                        mtx_locked++;
                                        for (size_t i = 0; i < 100; ++i) {
                                            sum += tst[i];
                                        }
                                        mtx_locked--;
                                    

                                    при -O1 gcc уже уберет инкремент и декремент. Правда не уберет загрузку на и с регистра. Но это скользкая тема.
                                    Главное, что код имеет неявную зависимость от другого кода. Я правда не хочу помнить о том, что под мьютексом я обязательно должен сделать хоть один системный вызов, чтобы код, который писал кто-то другой, обнаружил отпускание мьютекса. Это ведет к ошибкам, причем в данном случае цена ошибки очень велика.
                                    Откуда вы это взяли — в статье это просто пример — в реале у меня такая и подобная логика естественно находится в одном классе…

                                    Меня волнует, что люди, не знакомые с темой скопируют ваш пример. И сделают их независимыми. Примеры — это как раз идеально выполненные куски кода. Их ставят в пример.
                                    Вот именно что 11.

                                    Уже 2012 год, пользоваться стандартом 2011 года можно и нужно, там много вкусных фишек. В противном случае, намного лучше работающий чуть медленнее код, чем код, который иногда не работает, зато быстро.
                                      0
                                      Если компилятор увидит, что адрес локальной переменной никуда не передавался
                                      Вот именно что локальной — а это к счастью не так.
                                      Меня волнует, что люди, не знакомые с темой… Их ставят в пример.
                                      К сожалению вы правы — часто это так и есть. Хотя синхронизация, возможно, как раз та тема, где нужно вникать в любую мелочь — на одном копипасте здесь не вылезешь.
                          0
                          Так если приложение выполняет два своих потока на двух разных физических процессорах, они разве будут синхронизировать свои кеши? Мне внутреннее чувство подсказывает что нет.
                            0
                            Имею ввиду обычные переменные аля int, а не всякие мьютексы, у которых скорее всего все это предусмотрено. Если кто-то точно знает что я не прав, то буду рад услышать комментарии на эту тему.
                              0
                              Тут такое дело — это сильно зависит от системы, например для x86, я не уверен как в современных моделях, но раньше инструкций для когерентности кэш небыло. Я имеею ввиду инструкций упреждающей выборки, таких как prefetchnta или prefetch0. Кроме того, я совсем не уверен, что компиляторы в большинстве своем используют их для ситуаций типа «volatile». Как раз таки уверен в обратном (даже для последних — современных).
                              Я подозреваю, что x86 процессор автоматически делает недействительным кэш или часть кэша других ядер на том же чипе, когда значение записывается обратно в память. Потому, что эффекта, что кэш на x86 «грязный» я не наблюдал не для intel, не для amd. Про arm ничего не скажу — многоядерные, типа cortex, для этого еще не залапал как следует.

                              Возвращаясь обратно к примеру, в этом частном случае это не сильно и важно, т.к. худшее что может случится — поток отработает «сделай что-нибудь другое» еще один раз, если снятие блокировки не будет заметно (из-за кеша) неблокирующему потоку еще один цикл «while».
                                0
                                На x86 есть механизм поддержания когерентности кэшей всех ядер. Поэтому вы не можете поставить в один сервер очень много процесоров. Но при этом, если объект попал в разные кэшлайны, его сохранение неатомарно и ядра могут видеть наполовину измененный объект.
                                На многоядерных arm есть механизм поддержания когерентности кэшей всех ядер, но он настраиваемый и отключаемый. Устанавливая биты в специальном регистре можно выбирать требуемый уровень синхронизации.
                                Но, вообще говоря, есть архитектуры, где это не гарантируется. Кроме того, компилятор имеет право положить переменную в регистр (и volatile переменную тоже) и работать с регистром. Другие потоки, естественно, этого не увидят.
                                  0
                                  А еще процессор может проигнорировать событие обновления кэша и использовать старое значение: «Я его считал раньше, чем обновление пришло». В результате ядро может видеть память в состоянии, в котором она никогда не была.
                              0
                              Если они будут лазить в область памяти в пределах одной линии кеша, причем хотя бы один будет писать, то постоянно будут синхронизироваться. И тупить :)
                              0
                              Люди… я показывал (пусть и на близких к си примерах) принцип работы, а не готовую реализацию…

                              И за это Вам большое спасибо. Просто тема настолько топкая и требующая учета кучи особенностей, что хочется разобрать её по полочкам и выбрать оптимальные решения, избавленные от ошибок. Ну или с предупреждениями, в каких условиях они работать не будут.
                              Во первых, volatile в c++ указывает, что другой процесс (для библиотеки) или hardware изменяет переменную в памяти — прошу заметить не поток.

                              Даже еще более общо. Любой доступ к volatile переменной является наблюдаемым поведением и компилятор не имеет права выбрасывать такой доступ или менять его местами с другими наблюдаемыми эффектами, созданными текущим тредом. Но при этом гарантия, что другие треды увидят эти эффекты, дается только при использовании барьеров памяти или mutex'ов.
                              Про потоки в новом стандарте как раз теперь есть. В старом вообще ничего про них не было, поэтому, вообще говоря, допускалось весьма странное поведение компилятора.
                      +2
                      Спасибо за программистскую часть, но всё, что касается железа, звучит как-то жалко. Производительность СХД (будь то дисковый массив или сложная конструкция из кучи полок) масштабируется куда проще, чем многопоточный код, так что валить проблемы на «медленные диски» неправильно. Говорите какая нужна производительность массива в условиях полного OLAP (читай — полный рандом на чтение и запись) — и её вполне можно достигнуть.
                        +1
                        Путь наращивания железной составляющей и принебрежения пограмной части, ИМХО, — тупиковый путь. К сожалению очень не многие придерживаются этого мнения и… мы имеем, то что имеем — Ubuntu который умирает на 2ГБ и оперативки и т.д.
                          0
                          Вы считаете, что 2Гб это слишком много? Почему же тогда вы не пишите программы, которые работают в пределах 512кб оперативной памяти?
                            0
                            Ну как бы программа != ось, да и программа программе рознь. Хотя не будем холиварить…
                            А вообще, вы знаете, пишу… Только это сегодня довольно сложно, даже со своими велосипедами — например на debian размер стека по умолчанию 10240кб. Такие дела…
                            0
                            >Путь наращивания железной составляющей и принебрежения пограмной части, ИМХО, — тупиковый путь. К сожалению очень не многие придерживаются этого мнения и…
                            начальство либо не допонимает этого, либо просто нет бюджета — обычная ситуация
                          –2
                          Достаточно интересная, но иногда неблагодарная работа — оптимизировать все это хозяйство.

                          да, относительно сложно все оптимизировать — но, согласись, этим наша работа и интересна.
                          Синхронизируя такой доступ, мы ограничеваем одновременное исполнение некоторых критичных участков кода. Как правило это один, редко несколько потоков (например 1 writer/ N readers).
                          Необходимость синхронизации неоспорима. Чрезмерная же синхронизация очень вредна
                          до боли все знакомо.
                          Очень хорошая заметка про пулы, давно их использую. Вообще спасибо за статью, ждем продолжения.
                            0
                            Просьба в следующей части осветить назначение, нюансы и подводные камни ключе5вого слова volatile.
                              0
                              Давно пользуюсь очередью данных: используется поток, который сам готовит данные и по запросу дает потоку ответ… Изначально он просто готовит данные и хранит их, удаляя из общего списка запросов.
                              Позволяет практически отказываться от синхронизации.
                                0
                                Ваша очередь данных или очередь «запросов» синхронизирована :) и это правильно.
                                  0
                                  хотя нет, забыл ring buffer-очередь, на ней можно и без синхронизации. Хотя тоже не без недостатков: например только 1 reader и 1 writer.
                                  У вас она?
                                    0
                                    Она.
                                    Внутри просто регистрируются потоки и под каждый создается так называемый блок данные, который выбирается из общего потока.
                                    Таким образом достаточно отдать данные из переменной и написать новые, не мешая общему списку и другим потоком.
                                    Но да, функция одна, но вызываться может ассинхронно.
                                      0
                                      В Intel DPDK (Data plane development kit) тема ring buffer'ов с возможностью многих читателей и писателей раскрыта очень хорошо (а также пулов данных, процессорных кэшей и максимального увеличения производительности приложений-обработчиков сетевого трафика). Там работает все максимально изолированно от OS, даже мьютексы считаются долгой операцией, и синхронизировано все через хардварные локеры… платформо-зависимо правда, но работает очень эффективно.
                                      Занимательная статья на тему подходов к fast-path processing с точки зрения Intel (да и не только): download.intel.com/design/intarch/papers/321058.pdf
                                      P.S. Правда это уже ближе к embedded и обработке в реальном времени, с попыткой подвинуть TI процессоры.
                                        0
                                        Что-то подобное хотел раскрыть в следующеей статье — правда насколько помню у intel на Data Plane (или это был Control Plane?) оно там не то чтобы совсем асинхронно (повестка дня: atomic locking).
                                          0
                                          Синхронность / асинхронность зависит от выбранной модели обработки (см. статью). На Control Plane производительность в общем случае никого не волнует, важна 100%-я доставка, т.к. траффик маленький но важный, а вот на Data Plane там все синхронизации делаются через asm атомарные операции процессора, на которую тратится (если я не ошибаюсь) около 1 такта, и нет никаких блокирующих ожиданий — таких как взаимодействие с OS и т.п. — все они конвейером могут быть вынесены на отдельные коры процессора и работать асинхронно разбирая очереди.
                                          К сожалению, я внезапно для себя обнаружил что в открытом доступе про DPDK и их реализации/парадигмы информации нет, так что больше ничего не скажу, чтобы не сболтнуть лишнего..( Скажу лишь что они там действительно ускоряют все что ускоряется.
                                        0
                                        А можно чуточку подробней, как с кольцевым буфером без синхронизации? Прикинул, как стал бы программировать, и в голове сразу слово synchronized возникло. В чем фокус?
                                          +1
                                          Чуточку навряд ли получится. Ну если коротко, то в таком буфере хранятся указатели, данные в которых переписываются одним потоком, затем выставляется флаг «готов для приема» — второй поток бежит по кольцу «готовых» (и удаляет их после работы) пока не встретит не «готовый», тогда делаем что-то другое (можно ложится спать) — в таком часном виде можно обойтись без синхронизации кольца. Только нужно всегда оставлять один указатель «неготовым» — иначе труба. Такое кольцо можно даже без синхронизации расширять…
                                            0
                                            Спасибо, идею уловил. Видел вы внизу собирались про это статью писать, буду ждать с нетерпением.
                                              0
                                              только осторожно расширять, только одним потоком (второй «спит» на «последнем неготовом»)
                                                0
                                                В этом плане можно избежать данной проблемы, когда потоку со списком известно какой поток за какие запросы отвечает и работать со списком самостоятельно.
                                                Тут как бы возникает такая последовательность:
                                                Есть поток данных, в котором регистрируется обрабатывающий поток.
                                                Поток данных готовит для него область памяти и отбирает первый из списка очереди в данную область памяти.
                                                Зарегистрировавшийся поток делает запрос на данные к потоку данных и получает их.
                                                Пока обрабатывающий поток работает с данными, поток данных готовит следующую партию в область данных.
                                                Если данных нет, то при следующем запросе просто отвечаем фолс.

                                                Тут в принципе простор для идей и объяснил я сумбурно, т.к. проще работать с указателями, но, надеюсь, мысль ясна.
                                                  0
                                                  все-таки не совсем то — «поток делает запрос к другому потоку» — тут у вас синхронизация…
                                                  ring buffer работает по другому: оба потока бегают по кругу параллельно (не завися друг от друга) обращаясь к друг другу только через данные, изменения в которых просто пишутся в разные переменные блока и обязательно маркируются флагом — статусом «записано до конца» и/или «готов». Как правило, всегда есть как минимум один «не готовый» блок на котором потоки тормозят. Как правильно написал ниже kibergus:
                                                  Главное барьеры в памяти при этом проставить правильные ...
                                                    0
                                                    Ну, я имел в виду что поток данных один, а регистрируемых — много…
                                                  0
                                                  Главное барьеры в памяти при этом проставить правильные, чтобы все потоки увидели изменения в нужном порядке.
                                            +1
                                            Ну и можно добавить, что существует достаточно большое количество lock-free/non-blocking алгоритмов, особенно на чтение
                                              +2
                                              Хотел упаковать это в «Асинхронное программированиие» в следующую статью. Например про конструкции типа ring buffer и как это огранничено можно использовать для нескольких потоков.
                                              0
                                              Кстати, приведенный способ ускорения мьютексов на самом деле является недоделанной версией futex'а. Объекта, try lock которому можно сделать в userspace т.к. железо позволяет атомарно и синхронно проверить значение и изменить его. А если захват не удался, то есть системный вызов для ожидания, пока он освободится. Освобождение futex'а также не требует системного вызова т.к. использует аппаратные возможности.

                                              Забавно, что gcc использует futex'ы для блокировок при инициализации статических переменных и в библиотеке транзакционной памяти. Но std::mutex, насколько я понимаю, этот системный вызов не использует.
                                                0
                                                Не понял примера про семафоры. Каким образом происходит снижение нагрузки?
                                                  0
                                                  Например профайлером находятся узкие места, где при увеличении количества потоков, начинает «тормозить» — и ограничеваем эти места семафорами на определенное количество потоков (разверните приведенный пример profiler output — run#2, функция InvalidateIdx).
                                                  Такой способ позволяет например быстро увеличить общее число потоков (для параллельного исполнения — если к примеру выросло число посетителей на сайте) без крупной переделки кода, при этом гарантирует, что сервер не ляжет вспячку на HDD seek, транзакциях банка данных или множественном context switch и т.д и т.п.

                                                Только полноправные пользователи могут оставлять комментарии. Войдите, пожалуйста.

                                                Самое читаемое