Как стать автором
Обновить

Немного о многопоточном программировании. Часть 1. Синхронизация зло или все-таки нет

Время на прочтение12 мин
Количество просмотров70K
Мне по работе часто приходится сталкиваться с высоконагруженными многопоточными или многопроцессными сервисами (application-, web-, index-server).
Достаточно интересная, но иногда неблагодарная работа — оптимизировать все это хозяйство.
Растущие потребности клиентов часто упираются в невозможность просто заменить железную составляющую системы на более современную, т.к. производительность компьютеров, скорость чтения-записи жестких дисков и сети растут много медленнее запросов клиентов.
Редко помогает увеличение количества нодов кластера (система как правило распределенная).
Чаще приходится запустив профайлер, искать узкие места, лезть в source code и править ляпы, которые оставили коллеги, а иногда и сам, чего греха таить, много лет назад.
Некоторые из проблем, связаных с синхронизацией, я попытаюсь изложить здесь. Это не будет вводный курс по многопоточному программированию — предпологается, что читатель знаком с понятием thread и context switch, и знает для чего нужны mutex, semaphore и т.д.

Любому разработчику, многопоточно проектирующему что-то большее чем «Hello world», ясно, что создать полностью асинхронный код невероятно сложно — нужно что-то писать в общий channel, изменить структуру в памяти (к примеру повернуть дерево hash-таблицы), забрать что-то из очереди и т.д.
Синхронизируя такой доступ, мы ограничеваем одновременное исполнение некоторых критичных участков кода. Как правило это один, редко несколько потоков (например 1 writer/ N readers).
Необходимость синхронизации неоспорима. Чрезмерная же синхронизация очень вредна — кусок программы более-менее шустро работаюший на 2-х или 3-х потоках, уже для 5-ти потоков может выполняться почти «singlethreaded», а на 20-ти даже на очень неплохом железе практически ложится спать.

Однако практика показывает, что иногда и недостаточная синхронизация исполнения приводит к тому же результату — система залипает. Это происходит, когда исполняемый параллельно код содержит например обращения к HDD (непрерывный seek), или при множественном обращении к различным большим кускам памяти (например постоянный сброс кэша при context switch — CPU cache просто тупо отваливается).

Используйте семафоры (semaphore)

Семафоры изобрели не только для того, что бы на них строить конструкции вида ReadWriteMutex. Семафоры можно и нужно использовать для уменьшения нагрузки на железо на куске кода, исполняемого параллельно.
Как правило таким образом можно вылечить множество «залипаний», которые легко найти профилированием кода — когда при увеличении количества потоков, время исполнения отдельных функций заметно растет, при том, что другие функции отрабатывают с той же или сравнимой скоростью.
Развернуть Profiler-Output
========================================================================================================================
Run # 1 (5 Threads)
  rpcsd (hbgsrv0189, PID:0718, TID:2648)
  # 03-09-2012 | 13:50:45 | Servlet: A::RpcsServlet, URI: /index-search
========================================================================================================================
                             NS | Name                   |  C |  T | Tot(s) | TwR(s) | Avg(s) | AwR(s) | Max(s) | Min(s)
========================================================================================================================
                 ::RPC::Service | service                |  1 |  1 |  1.593 |  1.593 |  1.593 |  1.593 |  1.593 |  1.593
               ::A::RpcsServlet | service                |  1 |  1 |  1.592 |  1.592 |  1.592 |  1.592 |  1.592 |  1.592
                  ::IndexSrvRpc | index-search           |  1 |  1 |  1.584 |  1.584 |  1.584 |  1.584 |  1.584 |  1.584
              ::Indexer::Search | Search                 |  1 |  1 |  1.584 |  1.584 |  1.584 |  1.584 |  1.584 |  1.584
              ::Indexer::Search | ParallelSearch         |  2 |  2 |  1.256 |  1.256 |  0.628 |  0.628 |  0.655 |  0.601
       ::Indexer::Search::Cache | SearchL2Index          | 44 | 44 |  0.686 |  0.686 |  0.016 |  0.016 |  0.016 |  0.015
              ::Indexer::Search | InvalidateCacheIdx     | 20 | 20 |  0.570 |  0.570 |  0.028 |  0.028 |  0.031 |  0.020
       ::Indexer::Search::Cache | InvalidateIdx          | 20 | 20 |  0.276 |  0.276 |  0.014 |  0.014 |  0.016 |  0.002
              ::Indexer::Search | SearchL1Index          |  1 | 14 |  0.203 |  0.203 |  0.203 |  0.016 |  0.203 |  0.016
              ::Indexer::Search | MergeJoin              |  1 |  1 |  0.125 |  0.125 |  0.125 |  0.125 |  0.125 |  0.125

========================================================================================================================
Run # 2 (25 Threads w/o semaphore)
  rpcsd (hbgsrv0189, PID:0718, TID:2648)
  # 03-09-2012 | 13:52:03 | Servlet: A::RpcsServlet, URI: /index-search
========================================================================================================================
                             NS | Name                   |  C |  T | Tot(s) | TwR(s) | Avg(s) | AwR(s) | Max(s) | Min(s)
========================================================================================================================
                 ::RPC::Service | service                |  1 |  1 |  4.255 |  4.255 |  4.255 |  4.255 |  4.255 |  4.255
               ::A::RpcsServlet | service                |  1 |  1 |  4.254 |  4.254 |  4.254 |  4.254 |  4.254 |  4.254
                  ::IndexSrvRpc | index-search           |  1 |  1 |  4.244 |  4.244 |  4.244 |  4.244 |  4.244 |  4.244
              ::Indexer::Search | Search                 |  1 |  1 |  4.244 |  4.244 |  4.244 |  4.244 |  4.244 |  4.244
              ::Indexer::Search | ParallelSearch         |  2 |  2 |  3.729 |  3.729 |  1.865 |  1.865 |  1.889 |  1.840
              ::Indexer::Search | InvalidateCacheIdx     | 20 | 20 |  2.497 |  2.497 |  0.125 |  0.125 |  0.126 |  0.125
       ::Indexer::Search::Cache | InvalidateIdx          | 20 | 20 |  2.188 |  2.188 |  0.109 |  0.109 |  0.113 |  0.109
       ::Indexer::Search::Cache | SearchL2Index          | 44 | 44 |  1.231 |  1.231 |  0.028 |  0.028 |  0.031 |  0.015
              ::Indexer::Search | SearchL1Index          |  1 | 14 |  0.360 |  0.360 |  0.360 |  0.028 |  0.360 |  0.016
              ::Indexer::Search | MergeJoin              |  1 |  1 |  0.155 |  0.155 |  0.155 |  0.155 |  0.155 |  0.155
              
========================================================================================================================

Run # 3 (25 Threads with semaphore in InvalidateCacheIdx, before InvalidateIdx)
  rpcsd (hbgsrv0189, PID:0718, TID:2648)
  # 03-09-2012 | 14:02:51 | Servlet: A::RpcsServlet, URI: /index-search
========================================================================================================================
                             NS | Name                   |  C |  T | Tot(s) | TwR(s) | Avg(s) | AwR(s) | Max(s) | Min(s)
========================================================================================================================
                 ::RPC::Service | service                |  1 |  1 |  2.213 |  2.213 |  2.213 |  2.213 |  2.213 |  2.213
               ::A::RpcsServlet | service                |  1 |  1 |  2.213 |  2.213 |  2.213 |  2.213 |  2.213 |  2.213
                  ::IndexSrvRpc | index-search           |  1 |  1 |  2.205 |  2.205 |  2.205 |  2.205 |  2.205 |  2.205
              ::Indexer::Search | Search                 |  1 |  1 |  2.205 |  2.205 |  2.205 |  2.205 |  2.205 |  2.205
              ::Indexer::Search | ParallelSearch         |  2 |  2 |  1.690 |  1.690 |  0.845 |  0.845 |  0.889 |  0.801
       ::Indexer::Search::Cache | SearchL2Index          | 44 | 44 |  1.153 |  1.153 |  0.026 |  0.026 |  0.031 |  0.016
              ::Indexer::Search | InvalidateCacheIdx     | 20 | 20 |  0.537 |  0.537 |  0.027 |  0.027 |  0.031 |  0.007
              ::Indexer::Search | SearchL1Index          |  1 | 14 |  0.359 |  0.359 |  0.359 |  0.028 |  0.359 |  0.017
       ::Indexer::Search::Cache | InvalidateIdx          | 20 | 20 |  0.278 |  0.278 |  0.014 |  0.014 |  0.016 |  0.004
              ::Indexer::Search | MergeJoin              |  1 |  1 |  0.156 |  0.156 |  0.156 |  0.156 |  0.156 |  0.156


В третьей выдаче профайлера можно видеть, как изменилось время исполнения метода InvalidateIdx и соответственно метода InvalidateCacheIdx, после окружения семафором invCI_semaphore вызова метода InvalidateIdx
semaphore invCI_semaphore(config.InvCI_Count/* = 5*/);
...
int InvalidateCacheIdx() {
  ...
  while (...) {
    cache.SearchL2Index();
    invCI_semaphore++;
    while (cache.InvalidateIdx()) {};
    invCI_semaphore--;
  }
  ...
}

Такой метод использования семафоров довольно прост и не обязательно требует полного понимания процесса, но имеет множество недостатков, в том числе тот факт, что максимальное количество потоков для каждого блока будет скорее всего подбираться в бою (в продакшен, на системе клиента) — что не всегда есть хорошо. Зато огромным преимуществом такого способа оптимизации является возможность быстро увеличить количество потоков всего сервиса, без изменения execution plan, т.е. практически без переделки всего движка — просто проставив несколько семафоров на предыдущее значение в узких местах. Я не сторонник необдуманно использовать семафоры, но в качестве временного решения (для успокоения клиента), я не раз использовал этот метод, что бы впоследствии спокойно переделать «правильно», вникнув в исходный код.

Расставляйте приоритеты (priority)

Приоритеты являются очень удобным механизмом, так же позволяющим довольно просто «облегчить» приложение. Например, если в системе логи пишутся отдельным потоком, то уменьшив ему приоритет до минимального, можно сильно «облегчить» процесс, не уменьшая log-level.
Например конструкцию следующего вида можно использовать если пул со множеством потоков обрабатывает задачи разной приоритетности:
// before doing ...
if ( thisThread.pool.count() > 1 
  && !(currentTaskType in (asap, immediately, now)) 
) {
  thisThread.priority = 2 * thisThread.pool.priority;
} else {
  thisThread.priority = 5 * thisThread.pool.priority;
}
// do current task ...

При этом нужно понимать, что приоритет потока действует для всего процесса, а не только для пула в котором существует этот поток — используйте его с оторожностью.

Divide et impera (Разделяй и властвуй)

Довольно часто не требуется мгновенное исполнение какого-либо участка кода — т.е. некоторое действие или часть задачи можно отложить. Например писать логи, считать посещения, переиндексировать кэш, и т.д.
Существенно повысить скорость выполнения можно, выделяя куски синхронного кода в отдельные задачи, с последующим выполнением их позже (например фоново — используя т.н. background service). Это может быть отдельный поток, пул потоков или даже другой процесс aka RPC (например асинхронный вызов WebService). Естественно временная стоимость вызова (помещения в очередь и т.д.) этой задачи должна быть меньше стоимости самого исполнения.
Пример с отдельным LOG-потоком:
// здесь мы пишем лог напрямую :
int log(int level, ...) {
  if (level >= level2log) {
    logMutex.lock();
    try {
      file.write(...);
      file.flush();
    } finally {
      logMutex.release();
    }
  }
}


// здесь - фоново :
int log(int level, ...) {
  if (level >= level2log) {
    // защитить, добавить и освободить очередь :
    logQueue.mutex.lock();
    logQueue.add(currentThread.id, ...);
    logQueue.mutex.release();
    // разбудить лог-worker'а :
    logQueue.threadEvent.pulse();
  }
}
// background-logging thread:
int logThreadProc() {
  ...
  while (true) {
    // делаем задержку - ожидаем латенц /* 500 ms */ или размер очереди /* 10 */:
    if ( logQueue.count < config.LogMaxCount /* = 10 */
      || (sleepTime = currentTime - lastTime) < config.LogLatency /* = 500 */) 
    {
      logQueue.threadEvent.wait(config.LogLatency - sleepTime);
      continue;
    };
    // пишем в буфер и удаляем из очереди :
    logQueue.mutex.lock();
    try {
      foreach (... in logQueue) {
        file.write(...);
        logQueue.delete(...);
      }
    } finally {
      logQueue.mutex.release();
    }
    // пишем буфер в лог:
    file.flush();
    // спать :
    logQueue.threadEvent.wait();
    lastTime = currentTime;
  }
  ...
}

Такая простая конструкция позволяет существенно снизить затраты на логирование и уменьшить последствия от context switch, которые практически не будут зависеть от количества потоков, использующих метод log.
Важно понимать, что теперь, навешивая дополнительную логику на логирование, нагружается только поток непосредственно пишущий в лог. Т.е. можно сколько угодно делать наш лог интелигентней — ввести понятие LogLatency, как на примере, добавить какой-нибудь лог-анализатор (что-нибудь как fail2ban) или же сохранять например все debug сообщения, с целью логировать их только в случае ошибки, группировать по TID, и т.д. — все это практически не будет нагружать остальные потоки.
Кроме того, при использовании первого метода (сообщение пишется синхронно напрямую в лог-файл), потоки так сказать «разпараллеливаются». Т.е. чем больше объектов синхронизации (mutex, critical section, waiting events) и выше затраты на context switch, тем вероятнее, что все потоки проходящие через эти объекты будут выполнятся последовательно.
Т.е. скорость многопоточного исполнения задачи приближается или становится даже хуже скорости однопоточного ее исполнения. Уменьшая время между lock() и release(), код улучшается сразу в двух направлениях — становится быстрее в самом потоке и уменьшается вероятность «разпараллеливания» процесса.
Организовав очередь событий, иногда можно создавать подобные конструкции даже не прибегая к созданию дополнительных потоков. Например, записывать в очередь некоторые действия, чтобы позже, например во время «idle time» выполнить их этим же потоком, один за другим.
Легко проилюстрировать это можно на TCL:
## отдать страницу / документ ...
...
## показать counter :
set counter [db onecolumn {select cntr from accesslog where userid = $userid}]
%>
Вы видели эту страницу <%= $counter %> раз...
<%
## добавить событие "писать access log" in background, когда будет выполнено "update idle":
after idle UpdateAccess $userid [clock seconds]
## завершить.
....
## где-то в коде приложения :
proc UpdateAccess {userid lasttime} {
  db exec {update accesslog set cntr = cntr + 1, lastaccess = $lasttime where userid = $userid}
}


Очереди, FIFO, LIFO и многопоточность

Организация очереди, пула данных или последовательного буфера дело не хитрое, однако нужно иметь в виду, что при многопоточности и прочих равных условиях очередь LIFO стоит сделать выбором номер один (конечно если при этом последовательность действий не важна). Иногда можно комбинировать или группировать LIFO и FIFO (элементы LIFO сделать маленькими очередями FIFO или например строить буфер с конца и т.д.). Смысл таких извращений кроется в кеше процессора и отчасти в виртуальной организации памяти. Т.е. вероятность того, что последние элементы из LIFO еще находятся в кеше процессора несравненно выше вероятности того же у FIFO той же длинны.

Пример из жизни — В нашем собственном memory manager, была организована хеш-таблица из пулов свободных объектов одинакого размера (кто очень часто вызывал malloc / free, знает зачем это делается:). Пулы были организованны по принципу FIFO — функция mymalloc возвращала первый, давным-давно положеный в пул функцией myfree элемент. Причина побудившая тогда разработчика использовать FIFO проста до банальности — если какой-нибудь недобросовестный «программист» некоторое время будет использовать объект после myfree, то программа возможно проработает дольше. После замены на LIFO весь арсенал (application server), активно использующий memory manager заработал порядка 30% быстрее.

ReadWriteMutex

Очень часто синхронизировать необходимо только в случае изменения объекта. Например при записи в общий файл, при изменении структуры списков или hash таблиц и т.д. При этом, как правило, это разрешается только одному потоку, при этом часто даже читающие потоки блокируются (что бы исключить dirty read и вылет программы с исключением, поскольку записи до конца изменения не совсем валидны).
Блокировку таких объектов правильнее делать используя RW-mutex, где читающие потоки не блокируют друг друга, и только при блокировке записи происходит полная синхронизация кода (исполняется одним потоком).
При использовании read/write-mutex, нужно всегда точно представлять, как происходит чтение объекта, поскольку в некоторых случаях, даже при чтении, объект может изменятся (например при построении внутреннего cache при первичной инициализации или реинициализации после записи). В этом случае идеальный API предоставляет callback для блокировки, либо блокирует самостоятельно в случае многопоточности, либо возможное использование RW-mutex, со всеми исключениями, подробнейше описано в документации к API. В некоторых реализациях RW-mutex нужно заранее знать (сообщать mutex) количество reader-потоков, иногда writer-потоков. Это связано с конкретной реализацией блокировки записи (как правило используются семафоры). Несмотря на эти и другие ограничения, при наличии нескольких reader-потоков, желательно по возможности пытаться выполнить синхронизацию именно на таком mutex.

Читайте документацию, читайте source code

Проблема незнания, иногда непонимания того, что скрывается за тем или иным классом или объектом, особенно критично проявляется при использовании их в многопоточном приложении. Особенно это касается и базовых объектов синхронизации. Попробую разъяснить, что я имею в виду, на примере неправильного использования RW-mutex.
Один мой коллега как-то использовал fair RW-mutex, построеный на семафорах. Он поленился динамически передавать количество reader-потоков в класс RWMutex (задал статически «максимально возможное» значение 500) и написал следующий код для writer-потока:
...
RWMutex mtx(500);
...
mtx.lockWrite();
hashTab.add(...);
mtx.releaseWrite();
...

И при хорошей нагрузке, сервер уходил в глубокий запой ложился в спячку. Все дело в том, что он сделал две ошибки — взяв статичное значение 500 и не разобрался как будет вести себя такой RW-mutex на этой конкретной платформе. Т.к. RW-mutex был сделан fair — использовался код, подобный следующему:
void RWMutex::lockWrite() {
  writeMutex.lock();
  for (register int i = 0; i < readersCount /* в нашем случае 500 */; i++)
    readSemaphore++;
}
void RWMutex::releaseWrite() {
  if (!f4read)
    writeMutex.release();
  readSemaphore -= readersCount;
  if (f4read)
    writeMutex.release();
}

Такая конструкция благодаря использованию в теле lockWrite инкремента readSemaphore++ в цикле, вместо readSemaphore += readersCount, дает одинаковые шансы для reader- и writer-потоков. Возможно он не знал, что семафор класс для построения этого RWMutex, использовал одну кросплатформенную библиотеку, которая выдавала для этой конкретной платформы простенький код, выглядевший как-то так:
int Semaphore::operator ++() {
  mutex.lock();
  if (sema++ > MaxFlowCount) flowMutex.lock();
  mutex.release();
}

Т.е. при добавлении в хеш-таблицу hashTab 100-а значений, при одновременном чтении несколькими reader-потоками, мы имели 100*500 блокировок (и выпадание в осадок на несколько милисекунд из-за context switch). Самое интересное в этой истории, что это был базовый класс RWSyncHashTable, активно используемый повсеместно в нашем коде.
Нужно четко запомнить: некоторые конструкции API могут быть уже синхронизированы. Иногда это даже конструктор и деструктор объекта. В этом случае дополнительная синхронизация — часто вред. Это как раз тот случай, когда кашу маслом испортишь.
Читайте источники, заглядывайте в документацию к API — и такие ляпы, с большей вероятностью, обойдут Вас стороной.

Синхронный != Ожидание

Синхронизация исполнения абсолютно не означает, что наш процесс только и делает, что ждет. Блокирующие методы современных систем давольно гибки, и позволяют делать следуюшие конструкции:
static int mtx_locked = 0;
// уже заблокировано кем-то - нет, подождем 1 мс?
while ( mtx_locked || !mtx.lock(config.MaxWaitTime /* пример 1 ms */) ) {
  // не могу блокировать - сделай что-то другое ... например ...
  processNextRequest();
}
// за мютексом - блокирован ...
mtx_locked++;
// исполняем ...
processInLock();
// unlock ...
mtx_locked--;
mtx.release();

Использование кода такого вида, позволяет не ждать блокировки mutex'а и ложится спать, а попытаться сделать что-то другое в это время. На подобном принципе, хотя зачастую немного по другому реализованному (callback or event execution, transactional nowait locking, per thread-caching и т.д.), базируется концепция асинхронного программирования. При этом нужно соблюдать очень простое правило — «не ждать».
В этом примере приведен еще один прием избежать или минимировать context switch: это статичная переменная mtx_locked. Такой прием позволяет не выполнять mtx.lock, если и так заранее известно, что код блокирован (mtx_locked > 0), и нам не обязательно это знать наверняка — мы просто делаем что-то другое.

Наверное стоит закончить на этом первую часть (а-то много букв). Если где-то написал прописные для кого-то истины, прошу простить покорно — не со зла. Предложения, пожелания, критика приветствуются.

В следующей части:

  • Deadlock
  • События; реакция во время ожидания;
  • Синхронизация в Банках данных;
  • Systemwide synchronization (crossprocess, crosscluster)
  • Асинхронное программированиие;
  • Shared resources
  • Garbage, Освобождение ресурсов
  • Мониторинг потоков, HeartBit
  • Профайлинг
  • Ваши пожелания
Теги:
Хабы:
Если эта публикация вас вдохновила и вы хотите поддержать автора — не стесняйтесь нажать на кнопку
Всего голосов 69: ↑59 и ↓10+49
Комментарии55

Публикации

Истории

Ближайшие события