Допустим, вы написали код для решения задачи, которая хорошо распараллеливается. Каждый поток занимается своим участком работы и не зависит от других, поэтому потоки почти не должны координироваться друг с другом, за исключением самого последнего этапа, когда требуется объединить результаты. Естественно, в данном случае логично предположить, что чем больше ядер задействуется для такого кода, тем быстрее он выполнится. Вы ставите бенчмарки и сначала прогоняете этот код на ноутбуке. Действительно, оказывается, что он практически идеально масштабируется на всех четырёх доступных ядрах. Затем вы прогоняете его на большой и пафосной многопроцессорной машине, рассчитывая, что производительность будет ещё выше — но убеждаетесь, что на практике этот код работает даже медленнее, чем на ноутбуке, сколько бы ядер под него не выделить. Да, именно с таким случаем мне однажды довелось столкнуться.

I’veНедавно я работал над Latte — инструментом для бенчмаркинга Cassandra. Пожалуй, он самый эффективный в своём роде, как по потреблению ресурсов ЦП, так и по расходу памяти. Идея в основе этого инструмента очень проста: небольшой фрагмент кода генерирует данные, после чего выполняет ряд асинхронных инструкций на языке CQL внутри Cassandra. Latte циклически вызывает этот код и записывает, сколько времени заняла каждая из итераций. Наконец, он выполняет статистический анализ и отображает его результаты в различных формах.

Мне представляется, что бенчмаркинг — как раз из тех задач, распараллеливать которые — одно удовольствие. Если проверяемый при помощи бенчмарка код не сохраняет состояние, то его можно без труда вызывать из множества потоков. Здесь и здесь рассказано, как это сделать на Rust.

Правда, на момент подготовки двух вышеупомянутых постов в Latte ещё были очень плохо разработаны возможности определения рабочих нагрузок. Инструмент мог работать только с двумя предопределёнными жёстко закодированными рабочими нагрузками, одна была рассчитана на чтение, а другая — на запись. Параметризовать также можно было лишь немногие, например, сколько в таблице столбцов и какого они размера — словом, ничего особенно интересного. Никаких вторичных индексов. Никаких пользовательских условий для фильтрации. Невозможно управлять текстом CQL. В самом деле, ничего. На тот момент Latte в целом был рабочим прототипом, а не универсальным инструментом для выполнения полезной. Разумеется, от него можно было сделать форк, написать новую рабочую нагрузку на Rust, а затем скомпилировать всё из исходников. Но кому захочется тратить время на изучение внутреннего устройства нишевого инструмента для бенчмаркинга?

Скрипты Rune

Пришло время, когда мне потребовалась возможность измерять производительность внутренних индексов хранилища (storage attached indexes) в Cassandra. Я решил интегрировать Latte со скриптовым движком, который позволил бы мне с лёгкостью определять рабочие нагрузки без перекомпиляции всей программы. Поупражнявшись немного со встраиванием инструкций CQL в конфигурационные файлы TOML (оказалось, что это дело одновременно путаное и малопрактичное), повеселившись с попытками встраивать Lua (пожалуй, этот язык отлично смотрится в мире C, но с Rust взаимодействует гораздо хуже, чем я ожидал, хотя, вроде бы работает), я в итоге решил спроектировать что-то наподобие sysbench, но встроить в него не Lua, а интерпретатор Rune.

Основные достоинства Rune, благодаря которым он меня и подкупил — это безболезненная интеграция с Rust и поддержка асинхронного кода. Благодаря последней, пользователи могут выполнять инструкции на CQL непосредственно в скриптах рабочей нагрузки, опираясь на асинхронное устройство драйвера Cassandra. Кроме того, команда поддержки Rune с готовностью стала мне помогать и практически сразу устранила всё, что меня сковывало.

Вот полный пример рабочей нагрузки, измеряющий, какова производительность при выборе строк по случайным ключам:

const ROW_COUNT = latte::param!("rows", 100000);

const KEYSPACE = "latte";
const TABLE = "basic";

pub async fn schema(ctx) {
    ctx.execute(`CREATE KEYSPACE IF NOT EXISTS ${KEYSPACE} \
                    WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : 1 }`).await?;
    ctx.execute(`CREATE TABLE IF NOT EXISTS ${KEYSPACE}.${TABLE}(id bigint PRIMARY KEY)`).await?;
}

pub async fn erase(ctx) {
    ctx.execute(`TRUNCATE TABLE ${KEYSPACE}.${TABLE}`).await?;
}

pub async fn prepare(ctx) {
    ctx.load_cycle_count = ROW_COUNT;
    ctx.prepare("insert", `INSERT INTO ${KEYSPACE}.${TABLE}(id) VALUES (:id)`).await?;
    ctx.prepare("select", `SELECT * FROM ${KEYSPACE}.${TABLE} WHERE id = :id`).await?;
}

pub async fn load(ctx, i) {
    ctx.execute_prepared("insert", [i]).await?;
}

pub async fn run(ctx, i) {
    ctx.execute_prepared("select", [latte::hash(i) % ROW_COUNT]).await?;
}

Подробнее о том, как писать такие скрипты, рассказано в README.

Обвешиваем бенчмарками бенчмаркинговую программу

Пусть пока и не поддерживается динамическая компиляция этих скриптов прямо в нативный код, работают они достаточно быстро, а благодаря тому, что обычно они содержат не так много кода, их не выносит наверх выдачи профилировщика. Я опытным путём установил, что издержки от использования интерфейса между функциями (FFI) Rust-Rune ниже, чем от аналогичного интерфейса Rust-Lua, предоставляемого mlua. Пожалуй, всё из-за проверок безопасности, применяемых в mlua.

Первым делом, чтобы оценить производительность бенчмаркингового цикла, я создал пустой скрипт:

pub async fn run(ctx, i) {
}

Хотя здесь и не тела функции, бенчмаркинговая программа всё‑таки должна кое‑что проделать, чтобы выполнить этот код:

  • назначить N параллельных асинхронных вызовов с применением buffer_unordered

  • установить свежее локальное состояние (напр., стек) для виртуальной машины Rune

  • вызвать функцию Rune, передающую параметры со стороны Rust

  • измерить, сколько времени ушло на возврат каждой возвращённой футуры

  • собрать логи, обновить гистограммы HDR и вычислить другую статистику

  • выполнить всё это на M потоков, воспользовавшись многопоточным планировщиком Tokio

На старом 4-ядерном ноутбуке с процессором Intel Xeon E3-1505M v6 с фиксированной частотой 3 ГГц результаты выглядели очень многообещающе:

Поскольку здесь 4 ядра, пропускная способность возрастает линейно, пока работа не займёт 4 потока. Затем наблюдается ещё некоторый небольшой рост вплоть до 8 потоков, благодаря технологии гиперпоточности, позволяющей выжать ещё немного из каждого ядра. Далее, сверх 8 потоков, производительность совершенно не растёт, так как к этому моменту все ресурсы ЦП полностью насыщены.

Меня вполне устроили и абсолютные числа, которые я получил. Когда на ноутбуке фиксируется несколько миллионов пустых вызовов в секунду, представляется, что выбранный в качестве бенчмарка цикл достаточно легковесен и не обременяет реальные изменения серьёзными издержками. Локальный сервер Cassandra, запущенный на том же ноутбуке, при полной загрузке может обслуживать лишь около 200k запросов в секунду, да и то, если эти запросы донельзя просты, а все данные умещаются в памяти.

Кстати, если добавить в тело цикла некоторое количество реального кода, который действительно генерирует данные, но не делает вызовов к базе данных — работа ожидаемо пропорционально замедляется, но не более, чем в два раза. Поэтому производительность всё равно остаётся в диапазоне «миллионы операций в секунду».

Это было просто. Можно было на этом остановиться и объявить себя победителем. Но мне было любопытно, какой максимальной скорости можно было бы добиться на более крупной машине, где больше ядер.

Прогоняем пустой цикл на 24 ядрах

Сервер с двумя процессорами Intel Xeon CPU E5-2650L v3, по 12 ядер на каждом, с частотой 1,8 ГГц, очевидно, должен работать гораздо быстрее старого 4-ядерного ноутбука, верно? Да, пожалуй, с 1 потоком работа должна идти медленнее, с учётом частоты ЦП (3 ГГц против 1,8 ГГц), но эта разница должна быть быстро скомпенсирована благодаря тому, что ядер на сервере гораздо больше.

Давайте посмотрим на числа:

Сразу видно, что тут что-то не так. Два потока – лучше одного, и это аксиома. Мне не удавалось поднять производительность сверх примерно 2 миллионов вызовов в секунду, и это было примерно вчетверо хуже того показателя, которого я смог добиться на ноутбуке. Тут либо сервер совсем не фурычит, либо у меня в программе — серьёзная проблема с масштабируемостью.

Разбираемся

Когда возникает проблема с производительностью, чаще всего она решается путём выполнения кода с привлечением профилировщика. В Rust не составляет труда сгенерировать флеймграфы при помощи cargo flamegraph. Сравним флеймграфы, которые получаются при прогоне бенчмарка с применением 1 потока и 12 потоков:

Я ожидал, что найдётся ровно одно узкое место — например, оспариваемый мьютекс или что‑то подобное. Но, к моему удивлению, на первый взгляд ничего такого не нашлось. Нм единого узкого места! Казалось, что около 1/3 времени уходило на выполнение кода VM::run из Rune, а остальное время уходило просто на опрос футур. По‑видимому, тормозящий код просто встраивался в функцию и поэтому ускользал от профилировщика.

В любом случае, поскольку VM::run и путь rune::shared::assert_send::AssertSend также вели к Rune, я решил отключить весь код, отвечающий за вызов функции Rune, после чего повторил эксперимент так, чтобы в цикле прогонялась просто пустая футура, но код для замера времени и сбора статистики оставался активен:

// Выполняет одну итерацию с рабочей нагрузкой
// Это действие должно быть идемпотентным –
// сгенерированное действие должно быть функцией от количества итераций.
// Возвращает информацию о том, когда завершился запрос.
pub async fn run(&self, iteration: i64) -> Result<Instant, LatteError> {
    let start_time = Instant::now();
    let session = SessionRef::new(&self.session);
    // let result = self
    //     .program
    //     .async_call(self.function, (session, iteration))
    //     .await
    //     .map(|_| ()); // стираем Value, поскольку Value — это !Send
    let end_time = Instant::now();
    let mut state = self.state.try_lock().unwrap();
    state.fn_stats.operation_completed(end_time - start_time);
    // ... 
    Ok(end_time)   
}

Такой код отлично масштабировался до 100M вызовов в секунду на 48 потоках! Так что проблема должна находиться где-то ниже функции Program::async_call:

// Скомпилированная программа рабочей нагрузки
pub struct Program {
    sources: Sources,
    context: Arc<RuntimeContext>, 
    unit: Arc<Unit>,
}

// Выполняет полученную асинхронную функцию с её аргументами.
// Если выполнение пройдёт неудачно, то выдаёт диагностические сообщения, напр., стектрейс к стандартному потоку ошибок.
// Также сигнализирует об ошибке, если функция выполнится успешно, но функция вернёт 
// значение-ошибку.    
pub async fn async_call(
    &self,
    fun: FnRef,
    args: impl Args + Send,
) -> Result<Value, LatteError> {
    let handle_err = |e: VmError| {
        let mut out = StandardStream::stderr(ColorChoice::Auto);
        let _ = e.emit(&mut out, &self.sources);
        LatteError::ScriptExecError(fun.name, e)
    };
    let execution = self.vm().send_execute(fun.hash, args).map_err(handle_err)?;
    let result = execution.async_complete().await.map_err(handle_err)?;
    self.convert_error(fun.name, result)
}

// Инициализирует свежую виртуальную машину, необходимую для выполнения этой программы.
// Получается исключительно легковесно
fn vm(&self) -> Vm {
    Vm::new(self.context.clone(), self.unit.clone())
}

Функция async_call делает сразу несколько вещей:

  • готовит свежую виртуальную машину Rune — предполагается, что эта операция должна быть очень легковесной и, в принципе, она сводится к подготовке свежего стека. Виртуальные машины не поступают в совместное использование ни вызовам, ни потокам, поэтому могут эксплуатироваться совершенно независимо друг от друга

  • вызывает функцию, передавая её идентификатор и параметры

  • наконец, получает результат и преобразует некоторые ошибки; можем смело исходить из того, что для пустого бенчмарка таким результатом будет no‑op

Далее я решил просто избавиться от вызовов send_execute и async_complete и оставить лишь подготовку VM. Собственно, я хотел проконтролировать выполнение этой строки:

Vm::new(self.context.clone(), self.unit.clone())

Код выглядит совершенно невинно. Без всяких блокировок, мьютексов, системных вызовов, нет и разделяемых изменяемых данных. Есть предназначенные только для чтения структуры context и unit, совместно используемые Arc, но совместное использование только для чтения не должно вызывать проблем.

Часть VM::new также тривиальна:

impl Vm {

    // Конструируем новую виртуальную машину
    pub const fn new(context: Arc<RuntimeContext>, unit: Arc<Unit>) -> Self {
        Self::with_stack(context, unit, Stack::new())
    }

    // Собираем новую виртуальную машину с пользовательским стеком
    pub const fn with_stack(context: Arc<RuntimeContext>, unit: Arc<Unit>, stack: Stack) -> Self {
        Self {
            context,
            unit,
            ip: 0,
            stack,
            call_frames: vec::Vec::new(),
        }
    }

Правда, как бы безобидно ни выглядел код, я предпочитаю дважды проверить собственные допущения. Я попробовал прогонять его на разном количестве потоков и, пусть он теперь и работал быстрее, чем раньше, он опять совсем перестал масштабироваться – утыкаясь в потолок производительности, составлявший около 4 миллионов вызовов в секунду!

Проблема

Хотя, на первый взгляд кажется, что в вышеприведённом коде нет никакого совместного использования изменяемых данных, на самом деле здесь скрыто что-то такое, что разделяется и изменяется: это сами счётчики ссылок Arc. Эти счётчики разделяются в ходе всех вызовов и вызываются из множества потоков — именно в этом и есть причина возникающего затора.

Можно утверждать, что при атомарном увеличении и уменьшении разделяемого атомарного счётчика — в сущности, не проблема, поскольку это «неблокирующие» операции. Они даже преобразуются в одиночные инструкции на ассемблере (напр., lock xadd)! Если какая-то операция укладывается в одну инструкцию на ассемблере, то она не может быть медленной, так? К сожалению, такая линия рассуждения ошибочна.

Истинный корень проблемы — не в вычислениях, а в затратах на поддержание разделяемого состояния.

Сколько времени требуется на чтение или запись данных, обычно зависит от того,  насколько далеко ядру ЦП приходится «тянуться» за данными. Вот типичные значения задержек, характерные для процессора Intel Haswell Xeon (данные взяты с  этого сайта):

  • Кэш L1: 4 цикла

  • Кэш L2: 12 циклов

  • Кэш L3: 43 цикла

  • ОЗУ: 62 цикла + 100 нс

Кэши L1 и L2 обычно являются локальными в пределах одного ядра (кэш L2 может совместно использоваться двумя ядрами). Кэш L3 используется совместно всеми ядрами процессора. Кроме того, существуют прямые связи между кэшами L3 различных процессоров на одной материнской плате — эти связи нужны для управления когерентностью кэша L3. Таким образом, кэш L3 логически разделяется между всеми процессорами.

Пока вы не обновляете кэш-линию, а только читаете её из множества потоков, эта линия будет принимать нагрузку от множества ядер и считаться разделяемой. Вполне вероятно, что частые обращения к таким данным будут обслуживаться из кэша L1 — очень быстрого. Соответственно, совершенно нормально совместно использо��ать данные, предназначенные только для чтения, и эта операция хорошо масштабируется. Даже при использовании атомиков только для чтения описываемая задача будет выполняться довольно быстро.

Правда, как только мы привнесём в эту картину обновление разделяемой кэш-линии, всё начинает усложняться. В архитектуре x86-amd64 — когерентные кэши данных. В принципе, это означает, что, если вы записываете данные через одно ядро, то можете прочитать их через другое. Невозможно хранить сразу на нескольких ядрах кэш-линию с конфликтующими данными. Как только поток решает обновить разделяемую кэш-линию, то она инвалидируется и на всех других ядрах. Поэтому при последующих загрузках на этих ядрах придётся заново вытянуть данные, как минимум, из кэша L3. Конечно же, это гораздо медленнее, и тем более медленно, если на конкретной материнской плате расположен не один процессор, а несколько.

Атомарность счётчиков ссылок в данном случае превращается в дополнительную проблему, тем более усложняющую ситуацию для процессора. Хотя, использование атомарных инструкций зачастую характеризуется как «неблокирующее программирование», это не совсем так. На самом деле, для выполнения атомарных операций всё-таки требуются некоторые блокировки, и они происходят на аппаратном уровне. Такая блокировка остаётся очень тонкой и дешёвой, пока нет затора. Но, как обычно бывает при блокировках, производительность может сильно ухудшаться, если сразу многие сущности начнут одновременно соперничать за одну и ту же блокировку. Разумеется, всё гораздо хуже, если в роли таких «сущностей» будут целые процессоры, а не отдельные ядра, расположенные поблизости друг к другу.

Как это исправить

Очевидный выход — исключить совместное использование счётчиков ссылок. Жизненный цикл Latte структурирован очень просто, иерархически, поэтому все эти обновления Arc казались мне чрезмерными. Пожалуй, их можно было бы заменить более простыми ссылками, а ещё написать на Rust времена жизни. Но это проще сказать, чем сделать. К сожалению, Rune требует, чтобы ссылки на Unit и RuntimeContext передавались, будучи обёрнутыми в Arc. Это нужно для управления временем жизни (в потенциально более сложных сценариях). Также инструмент использует некоторые обёрнутые в Arc значения внутрисистемно, в рамках своих структур. Вариант переписать Rune лишь для стоявшей передо мной крошечной задачи не рассматривался.

Следовательно, Arc трогать нельзя. Вместо того, чтобы использовать значение Arc в единственном экземпляре, можно было задействовать одно Arc на каждый поток. Для этого также требуется разделить значения Unit и RuntimeContext, так, чтобы каждый поток получил своё собственное. В качестве побочного эффекта тем самым гарантируется, что никакой разделяемости вообще не будет. Так что, даже если Rune клонирует Arc, хранящееся внутрисистемно как часть этих значений, то эта проблема тоже решается. Недостаток этого решения — в повышенном расходе памяти. К счастью, рабочие скрипты Latte обычно крошечные, поэтому такой перерасход памяти не представляет особой проблемы.

Чтобы можно было по отдельности использ��вать Unit и RuntimeContext, я отправил в Rune  патч, чтобы их можно было клонировать. Затем, уже на стороне Latte, благодаря этому нововведению сделали целую новую функцию для «глубокого» клонирования структуры Program. Далее можно убедиться, что каждый поток обзавёлся собственной копией:

    // Получение глубокой копии context и unit.
    // Если вызывать этот метод вместо `clone`, то гарантируется, что структуры времени исполнения Rune
    // существуют отдельно, их можно эффективно переносить на разные ядра ЦП, не допуская  
    // случайного совместного использования ссылок на Arc.
    fn unshare(&self) -> Program {
        Program {
            sources: self.sources.clone(),
            context: Arc::new(self.context.as_ref().clone()),   // клонирует значение под Arc и оборачивает его в новый счётчик
            unit: Arc::new(self.unit.as_ref().clone()),         // клонирует значение под Arc и оборачивает его в новый счётчик
        }
    }

Кстати: поле sources в ходе выполнения не используется ни для чего кроме как для выдачи диагностики, поэтому его можно оставить разделяемым.

Обратите внимание: в той строке, где я исходно нашёл замедление, никаких изменений вносить не потребовалось!

Vm::new(self.context.clone(), self.unit.clone())

Дело в том, что self.context и self.unit больше не разделяются между потоками. К счастью, атомарные изменения, вносимые в неразделяемые счётчики, получаются очень быстрыми.

Окончательные результаты

Теперь, как и ожидалось, пропускная способность линейно масштабируется до 24 потоков:

Выводы

  • Цена разделяемого Arc в некоторых аппаратных конфигурациях бывает абсурдно высока, если часто поступают обновления сразу от множества потоков.

  • Не рассчитывайте, что всего одна ассемблерная инструкция никак не может вызвать проблем с производительностью.

  • Не исходите из того, что, если нечто нормально масштабируется на однопроцессорном компьютере, оно будет хорошо масштабироваться и на многопроцессорном компьютере.