Привет, Хабр! В предыдущей статье был разобран первичный анализ работы приложения, какие инструменты стоит использовать для сбора информации и как с этими инструментами работать. Напомню, что речь шла о двух утилитах: poor man's profile (pt‑pmp), которая позволяет комплексно оценивать работу приложения, отображая off-cpu и on-cpuчасти; и perf, которая обладает высокой точностью и мощной функциональностью в целом. Оба этих инструмента применяются для анализа производительности, так как их комбинация позволяет целиком и со всех сторон осмотреть «пациента».
Однако есть один пункт, который не был раскрыт в прошлой части: использование этих инструментов на настоящих продуктах. Синтетический пример на базе open‑source‑проекта — это хорошо, но будет не лишним показать, какие реальные проблемы были найдены, исправлены и какой прирост производительности удалось в итоге получить. В этой статье мы поговорим о практическом применении pt-pmp и perf, с помощью которых удалось обнаружить места для оптимизации работы п��ограммы. Меня по‑прежнему зовут Александр Слепнев, устраивайтесь поудобнее, начинаем!
Использование pt-pmp
Первичный анализ приложения
Основное преимущество pt-pmp по сравнению с perf — возможность отобразить on-cpuи off-cpu-части. И это может быть полезно при анализе многопоточных приложений: оценить взаимодействие потоков между собой, определить, есть ли длительное ожидание в их работе и прочее. В качестве примера рассмотрим проект syslog module, который предназначен для пассивного сбора событий MaxPatrol SIEM и используется для приема сообщений стандарта syslog и их дальнейшей обработки. Не будем сильно погружаться в логику работы с пакетом, нам интересна структура модуля. Он состоит из одного потока‑производителя (producer) и множества потоков‑потребителей (consumers). Когда producer получает на вход пакет, он перемещает его в специальную очередь, откуда consumers вычитывают сообщение и начинают обработку — таким образом ведется параллельная работа над пакетами. Очередь формируется с помощью std::conditional_variable и std::list. Producer‑поток работает следующим образом:
// Через std::conditional_variable проверяем, что в очереди есть место
std::unique_lock<std::mutex> notFullLock{m_NotFullMutex};
m_NotFull.wait(notFullLock,
[this] { return m_queue.size() < m_queueSize; }
);
notFullLock.unlock();
// Помещаем новое сообщение в очередь
{
std::lock_guard<std::mutex> queueLock{m_QueueMutex};
m_queue.emplace_back(std::move(message));
}
// Пробуждаем один поток-consumer для обработки сообщения
m_NotEmpty.notify_one();Теперь посмотрим на код consumers‑потоков:
while (!m_Stopped)
{
// Через std::conditional_variable проверяем, что в очереди есть сообщение
std::unique_lock<std::mutex> notEmptyLock{m_NotEmptyMutex};
m_NotEmpty.wait(notEmptyLock,
[this] { return !m_queue.empty(); }
);
notEmptyLock.unlock();
// Вычитываем сообщение из очереди
Message message;
{
std::lock_guard<std::mutex> queueLock{m_QueueMutex};
if (m_queue.empty())
continue;
message = m_queue.front();
m_queue.pop_front();
}
// Пробуждаем другие потоки и начинаем обработку
m_NotFull.notify_one();
message.process();
}Видно, что потоки могут блокировать друг друга в двух местах:
проверка
std::conditional_variableна пустую (полную) очередь;добавление (вычитывание) сообщения из очереди.
При обычной работе модуля такой код может не вызвать проблем, однако если его сильно нагрузить и послать на вход много пакетов, producer или consumers могут перейти в режим ожидания (например, при быстрой обработке сообщений producer не успевает наполнить очередь). Чтобы понять, как ведет себя модуль под нагрузкой, были проведены нагрузочные тесты, в которых использовалось восемь потоков‑потребителей. При помощи pt-pmp удалось построить флеймграф:

Здесь отображена только часть флеймграфа, а именно методы потоков‑потребителей (корневой метод — Workers::ExecuteTask). Видно, что consumers тратят примерно 77% времени на обработку пакета, остальные 23% ждут разблокировку мьютекса, тем самым блокируя работу друг друга. Становится понятно, что для оптимизации работы потребителей необходимо уменьшить время, в течение которого они простаивают, и направить эти ресурсы непосредственно на работу с сообщениями.
Проработка решения проблемы
Эффективный способ ускорить работу потоков в приведенном примере — поменять концепцию std::conditional_variable + std::list и полностью убрать работу с мьютексами. Отличным вариантом кажется lock-free queue — это структура данных, предназначенная для хранения элементов без использования механизмов блокировки. Lock-free queue использует атомарные операции и другие методы синхронизации, которые позволяют нескольким потокам одновременно добавлять и извлекать элементы из очереди без блокировки друг друга (больше подробностей про такие очереди можно прочитать в данной статье).
В случае модуля SysLog очередь должна подходить под следующие требования:
соответствовать шаблону
single producer — many consumers;быть ограниченной: нельзя добавить в очередь бесконечное количество сообщений;
иметь возможность блокировать потоки: если очередь пустая, потоки‑потребители должны вставать в ожидание нового сообщения. Если очередь полная, поток‑производитель должен вставать в ожидание нового места.
Под такое описание подходит concurrentqueue — это реализация lock‑free‑очереди на базе последовательных блоков вместо связанных списков. Она состоит из набора подочередей, по одной для каждого производителя. Когда поток‑потребитель хочет вычитать сообщение из очереди, он пр��веряет все подочереди, пока не найдет ту, которая не пуста (в нашем случае используется только одна подочередь, так как производитель один). Более подробное описание с деталями реализации и бенчмарками можно посмотреть в статье от автора библиотеки.
После интеграции очереди код потока‑производителя выглядит так:
// Создаем очередь (используем вариант с блокировкой потоков)
static constexpr std::size_t EXPLICIT_PRODUCERS_NUMBER = 1;
static constexpr std::size_t IMPLICIT_PRODUCERS_NUMBER = 0;
static moodycamel::BlockingConcurrentQueue<Message> m_queue{queueSize, EXPLICIT_PRODUCERS_NUMBER, IMPLICIT_PRODUCERS_NUMBER};
// Создаем токен для потока-производителя
static moodycamel::ProducerToken token(m_queue);
// Помещаем сообщение в очередь
if (!m_queue.try_enqueue(token, std::move(message)))
m_queue.wait_enqueue(token, std::move(message));
Изначально используется метод try_enqueue, который помещает сообщение в очередь, если есть место. В случае, если оно отсутствует, вызываем wait_enqueue, который сделает это, когда место будет свободно. На момент написания статьи BlockingConcurrentQueue не содержит этого метода, поэтому я реализовал его следующим образом:
// Отправляем поток «спать», затем пытаемся добавить сообщение в очередь.
// Время ожидания увеличивается в два раза с каждой последующей итерацией.
inline void wait_enqueue(producer_token_t const& token, T&& item)
{
auto sleepTime = 100;
do
{
std::this_thread::sleep_for(std::chrono::nanoseconds(sleepTime));
sleepTime = std::min(sleepTime * 2, 10000000);
} while (!add_item_busy_wait(token, std::move(item)));
}
// Пробуем добавить сообщение в очередь. Если места нет, используем spin lock для ожидания
// (предполагается, что место в очереди появится быстро и короткой паузы будет достаточно)
bool add_item_busy_wait(producer_token_t const& token, T&& item)
{
for (auto i = 0; i < 1000; i++)
{
if (try_enqueue(token, std::move(item)))
return true;
_mm_pause();
}
return false;
}Ниже представлен код потока‑потребителя:
static constexpr std::int64_t QUEUE_WAITING_PERIOD_USEC = 100000;
// Создаем токен для потока-потребителя
thread_local moodycamel::ConsumerToken token(m_queue);
while (!m_Stopped)
{
Message message;
if (!m_Tasks.wait_dequeue_timed(token, message, QUEUE_WAITING_PERIOD_USEC))
continue;
message.process();
}Здесь логика простая: ожидаем сообщение в очереди, вычитываем его и обрабатываем.
Тестирование оптимизации
Теперь посмотрим, как изменились флеймграфы после добавления патча. pt-pmp показал такие результаты:

Видно, что обработка сообщения теперь занимает 97,5% времени, ожидание заметно уменьшилось, на графе не видны мьютекс‑блокировки, так как используется lock-free queue. По сравнению с предыдущей версией модуль SysLog стал работать примерно на 50% быстрее. Ниже представлены результаты сравнения двух версий через MPS (message per second). Красным обозначена версия до оптимизации, синим — после:

Сравнение с perf-флеймграфами
Остается ответить на вопрос: что в данном случае показывает флеймграф на основе perf? Получилось бы найти описанную оптимизацию, если бы использовался только on-cpuпрофилировщик?
Ответ: флеймграфа на базе perf недостаточно. Вот так выглядит работа потребителя с точки зрения perf:

Здесь полностью отсутствуют блокировки, в которых consumer на самом деле проводит довольно много времени. Создается ощущение, что поток только обрабатывает сообщения и практически не находится в ожидании.
Теперь посмотрим на perf‑флеймграф после внедрения lock-free queue:

Картина практически не изменилась, видим 97% полезной работы по сравнению с 95,5%. Можно сделать вывод, что для анализа приложения c точки зрения off-cpu части больше подходит pt-pmp, нежели perf.
Использование perf
Основное преимущество этого профилировщика по сравнению с pt-pmp — более высокая точность измерений, поэтому рассмотрим пример, в котором эта особенность пригодилась. Будем анализировать нормализатор событий (компонент MaxPatrol SIEM), его основная задача — преобразовать все события в один вид, чтобы другие компоненты могли его корректно обработать после.
Проблема изменения регистра
Один из популярных методов, который используется при нормализации, — преобразование строки из uppercase в lowercase. Код базируется на open‑source‑библиотеке ICU:
inline static std::string toLowerIcu(std::string_view src) {
static UErrorCode err = U_ZERO_ERROR;
static const UCaseMap *csm = ucasemap_open(nullptr, U_FOLD_CASE_DEFAULT, &err);
std::string dst(src.size(), char{});
ucasemap_utf8ToLower(csm, dst.data(), static_cast<std::int32_t>(dst.size()),
src.data(), static_cast<std::int32_t>(src.size()), &err);
return dst;
}На perf‑флеймграфе преобразование выглядит следующим образом:

Видно, что оно состоит целиком из ICU‑функции и занимает 1,5–2% от общего времени работы, поэтому здесь есть, что оптимизировать.
Подготовка решения и тестирование
Одна из особенностей нормализатора заключается в том, что он должен корректно обрабатывать как строки формата ASCII, так и более обширный набор строк формата UTF-8 (именно поэтому и используется ICU). Однако этот способ довольно медленный, особенно для ASCII‑строк. Для того чтобы сконвертировать такую строку из верхнего регистра в нижний, достаточно вызвать стандартную функцию tolower, которая работает быстрее. Но перед этим необходимо убедиться, что входная строка соответствует формату ASCII.
Обновленный код конвертации выглядит так:
// Вспомогательная функция для проверки ASCII-строки
inline static void isAsciiAndLowercase(std::string_view str,
bool &isAscii,
bool &needsConvToLower)
{
const uint8_t *pos;
size_t len;
uint64_t acc1 = 0;
uint64_t acc2 = 0x2020202020202020UL;
// Предполагается, что входные данные этой функции достаточно короткие,
// то есть сканирование всей строки в векторизуемом цикле должно быть
// быстрее, чем проверка условий внутри циклаь чтобы выйти из него раньше.
//
// Используются следующие предположения:
//
// - у всех символов ASCII (и то��ько у них) старший бит
// (бит // #8) их первого и единственного байта установлен в 0;
//
// - ни для одного символа верхнего регистра ASCII не установлен бит #6
// ('A' - 'Z' кодируются в двоичные значения 0100 0001 - 0101 1010).
// Это означает, что, если бит #6 для всех символов установлен, строка
// гарантированно не будет содержать символов верхнего регистра
// и, следовательно, ее не нужно будет преобразовывать в нижний регистр
// (needsConvToLower == false);
//
// - Наоборот, если все символы в строке меньше 64
// (то есть их бит № 7 очищен), преобразования не требуются;
//
// - если оба условия выше ложные, строка может иметь или не иметь буквы
// верхнего регистра, но функция вернет (needsConvToLower == true),
// то есть ложноположительные срабатывания для этой проверки возможны,
// но не ложноотрицательные.
for (pos = (const uint8_t *)str.data(), len = str.length(); len >= 8;
pos += 8, len -= 8) {
uint64_t val = *(const uint64_t *)pos;
acc1 |= val;
acc2 &= val;
}
uint8_t acc3 = 0x20;
for (; len > 0; pos++, len--) {
acc1 |= *pos;
acc3 &= *pos;
}
isAscii = (acc1 & 0x8080808080808080UL) == 0;
needsConvToLower =
!((acc1 & 0x4040404040404040UL) == 0 ||
((acc2 & 0x2020202020202020UL) == 0x2020202020202020UL &&
(acc3 & 0x20) == 0x20));
}
// Функция конвертации ASCII-строки
inline static std::string asciiToLower(std::string_view src) {
std::string dst(src.size(), char{});
for (size_t i = 0; i < src.size(); i++) {
dst[i] = ::tolower(src[i]);
}
return dst;
}
// Функция преобразования с проверкой ASCII-строки
inline static std::string toLowerAscii(std::string_view src) {
bool isAscii;
bool needsConvToLower;
pt::isAsciiAndLowercase(src, isAscii, needsConvToLower);
if (isAscii) {
return needsConvToLower ? asciiToLower(src) : std::string{src};
}
// Вызываем оригинальный метод на базе ICU,
// если строка не подходит под ASCII-формат
return toLowerIcu(src);
}Сравним его производительность на микробенчмарке:
$ lscpu
...
Vendor ID: GenuineIntel
CPU family: 6
Model: 62
Model name: Intel(R) Xeon(R) CPU E5-2695 v2 @ 2.40GHz
...
$ ./bin/benchmark_string_upper_to_lower
2024-07-29T12:31:18+00:00
Running ./bin/benchmark_string_upper_to_lower
Run on (12 X 2400 MHz CPU s)
CPU Caches:
L1 Data 32 KiB (x12)
L1 Instruction 32 KiB (x6)
L2 Unified 256 KiB (x6)
L3 Unified 30720 KiB (x0)
Load Average: 4.60, 4.39, 3.88
------------------------------------------------------------
Benchmark Time CPU Iterations
------------------------------------------------------------
IcuConversion 1815000 ns 1815000 ns 314
ToLowerConversion 837000 ns 836966 ns 835
Видно, что оптимизированный алгоритм работает примерно в два раза быстрее, чем первоначальная версия. Теперь посмотрим, как это отразится на perf‑флеймграфе:

Метод занимает около 0,6% от общего времени, что на процент меньше оригинальной версии. Это также повлияло на общий EPS (events per second) нормализатора, ускорив его примерно на 1%.

Сравнение с pt-pmp-флеймграфами
Если бы в качестве профилировщика использовался только pt-pmp, то проблему конвертации было бы тяжело обнаружить, поскольку он обладает менее высокой точностью по сравнению с perf и не показывает lower‑метод на флеймграфе.
Выглядит это следующим образом:

Для наглядности продублирую perf‑флеймграф до оптимизации, только с таким же масштабом, как у pt-pmp, чтобы точно увидеть разницу:

Можно сделать вывод, что для нахождения локальных оптимизаций лучше использовать perf, нежели pt-pmp.
Заключение
Описанные примеры демонстрируют сильные и слабые стороны pt-pmp и perf и показывают, когда и в каких случаях был применен каждый из них на практике. Как уже было сказано в первой части статьи, нам нет необходимости выбирать что‑то одно, эффективный вариант — применять оба профилировщика сразу, таким образом получится проанализировать приложение с разных сторон: как с точки зрения взаимодействия потоков и различных ожиданий, так и с точки зрения точечных оптимизаций и ускорения специализированных алгоритмов.
Стоит уточнить: хотя производительность в примерах отличается (при помощи pt-pmp удалось найти оптимизацию в 50%, в 1% — с использованием perf), это не значит, что perf слабее и его не надо использовать. Здесь делается акцент на сильных и слабых сторонах данных инструментов, так как при помощи pt-pmp тяжело находить проблемы, закрытие которых даст прирост в 1–2%. Но таких проблем может быть много, и, если постепенно их исправлять, производительность приложения ощутимо ускорится.
Приложение
Код микробенчмарка для конвертации строки:
#include <vector>
#include <string>
#include <random>
#include <algorithm>
#include <benchmark/benchmark.h>
#include <casemap/casemap.h>
static constexpr size_t STR_MAX_SIZE = 100;
static constexpr size_t VECTOR_SIZE = 3000;
static std::string getRandomString(size_t length)
{
static constexpr std::string_view CHARACTERS = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz";
std::default_random_engine generator;
std::uniform_int_distribution<> distribution(0, CHARACTERS.size());
std::string string;
string.reserve(length);
for (std::size_t i = 0; i < length; ++i)
{
string.push_back(CHARACTERS[distribution(generator)]);
}
return string;
}
static std::vector<std::string> getRandomStringVector() {
std::default_random_engine generator;
std::uniform_int_distribution<> distribution(1, STR_MAX_SIZE);
std::vector<std::string> vector{VECTOR_SIZE};
std::generate(vector.begin(), vector.end(), [&generator, &distribution]() {
auto length = distribution(generator);
return getRandomString(length);
});
return vector;
}
static void IcuConversion(benchmark::State &state) {
auto vector = getRandomStringVector();
for( auto _: state ) {
for( auto &str: vector ) {
auto result = toLowerIcu(str);
benchmark::DoNotOptimize(result);
}
}
}
BENCHMARK(IcuConversion);
static void ToLowerConversion(benchmark::State &state) {
auto vector = getRandomStringVector();
for( auto _: state ) {
for( auto &str: vector ) {
auto result = toLowerAscii(str);
benchmark::DoNotOptimize(result);
}
}
}
BENCHMARK(ToLowerConversion);
BENCHMARK_MAIN();