Привет, Хабр. Меня зовут Олег Жуковец. Я руководитель команды «Экосистема» в Tarantool R&D компании VK Tech.

Многие разработчики сталкивались с ситуацией, когда запросы к базе данных выполняются быстро, индексы настроены, оборудование справляется с нагрузкой, но конечное приложение все равно работает медленно. Нередко проблема кроется не в самой базе данных, а в некорректно реализованном клиенте, который может стать «бутылочным горлышком» для всего ИТ-ландшафта. Именно поэтому оптимизация клиентов для работы с БД имеет важное значение.

В этой статье я на примере коннектора к Tarantool расскажу о доступных и простых оптимизациях клиента для БД, которые позволяют минимизировать аллокации и число горутин, чтобы выкрутить скорость обработки запросов (RPS) на максимум.

Немного о Tarantool

Tarantool — Open-Source-база данных, которая также может использоваться в качестве платформы для In-memory-вычислений. 

Примечание: У Tarantool также развиваются коробочные версии продуктов. Подробнее ознакомиться с ними можно здесь.

У Tarantool есть несколько ключевых особенностей.

  • Однопоточность. В Tarantool используется кооперативная многозадачность. У него всего один поток обрабатывает логику (при этом сетевые подключения могут обрабатывать еще несколько потоков). Детальнее о модели потоков — здесь

  • Собственный сетевой протокол. Для Tarantool разработан собственный асинхронный протокол IPROTO, основанный на MessagePack. Подробнее о протоколе можно почитать здесь

Асинхронность протокола имеет важное значение для нас как клиентов Tarantool.

Так, синхронная работа протокола подразумевает, что мы отправляем запрос, ждем ответ и только после можем отправлять новый запрос. Таким образом, обработка последующих запросов зависит от предыдущих. К примеру, так работает HTTP.

В случае асинхронного сетевого протокола клиент имеет возможность отправить сразу несколько запросов, не дожидаясь результата обработки. Хотя и с точки зрения сети данные все еще будут отправляться и читаться последовательно. 

При этом асинхронное взаимодействие с точки зрения пользователя реализовывается таким образом, что, даже отправляя один запрос ранее второго, нельзя гарантировать, что именно он первым поступит в сеть. 

Это связано с тем, что обычно в асинхронных клиентах применяются различные оптимизации, которые в том числе могут «перемешивать» запросы. 

Подобные оптимизации применяются и в нашей OpenSource Go-библиотеке для работы с Tarantool.

Асинхронные запросы: реализация работы с сетью

Чтобы понять, как реализуется асинхронная работа с сетью в нашем клиенте, рассмотрим небольшой фрагмент асинхронного API-коннектора.

Так, в типе соединения есть метод Do, выполняющий определенный запрос (Request), но не возвращающий ответ немедленно. Вместо этого он возвращает фьючер (Future) — объект для работы с асинхронным запросом.

У фьючера есть три интересные для нас ключевые метода:

  • WaitChan — возвращает канал, закрытие которого сигнализирует клиенту о завершении обработки запроса и получении ответа.

  • Get — блокирующий метод, выполняющий полную десериализацию полученного ответа в общую структуру. Такой способ является достаточно ресурсозатратным, особенно при обработке больших объемов данных.

  • GetTyped — также блокирующий метод, но, в отличие от первого варианта, производит десериализацию в заранее заданную целевую структуру, что значительно повышает производительность и снижает накладные расходы на обработку данных.

// Do принимает запрос, сериализует его и
// возвращает *Future, не ожидая ответ.
func (c Connection) Do(req Request) Future

// Future — это объект для работы с
// асинхронным запросом.
type Future struct {
    // Детали реализации.
}

// WaitChan возвращает канал, который закрывается
// при получении ответа.
func (f *Future) WaitChan() <-chan struct{}

// Get — блокирующий вызов, ждет ответ и
// десериализует его в подходящий тип.

func (f *Future) Get() ([]any, error)

// GetTyped — блокирующий вызов, ждет ответ и
// десериализует его в переданный объект.
func (f *Future) GetTyped(result any) error

Использовать это API можно следующим образом:

  • создали запрос и отправили его на выполнение;

  • получили объект Future;

  • ожидаем завершения обработки запроса и десериализуем результат.

// Создаем запрос.
request := NewSelectRequest("space")
// Получаем объект Future.
future := conn.Do(request)

// Можем передать объект в соседнюю горутину,
// скажем, через канал.
//
// А можем просто подождать ответ.
select {
case <-ctx.Done():
case <-future.WaitChan():
}

// Получаем результат первым методом.
resp, err := future.Get()

// Или же вторым методом.
var resp ResponseType
err := future.GetTyped(&resp)

При этом шаг с future.WaitChan можно опустить. В таком случае получится некая мимикрия под синхронное API. 

resp, err := connection.Do(request).Get()
// Или.
err := connection.Do(request).GetTyped(&resp)

Но если запросов много, так делать не надо. Почему — объясню ниже.

Реализация асинхронного API на стороне библиотеки

Наивная реализация синхронного API подразумевает простейший алгоритм:

  • клиент пишет сокет;

  • ждет сообщение от сокета;

  • декодирует и читает сообщение.

Но такая реализация работает медленно и неэффективно, поскольку клиенты обращаются к сокету последовательно.

В случае асинхронной реализации алгоритм несколько изменяется. Так, дополнительно создаются пишущая и читающая горутины. То есть клиенты передают данные в пишущую горутину и получают их из читающей.

Причем в такой реализации уже может быть несколько клиентов, которые могут обращаться к горутинам параллельно. 

Здесь важно, что наивную асинхронную реализацию можно оптимизировать. 

Так, из пишущей и читающей горутин можно убрать всю логику, которая потребляет время потока на исполнение, оставив только операции записи и чтения. Чтобы горутины были заняты только записью и чтением соответственно.

В результате можно снизить нагрузку на пишущие и читающие горутины, а также повысить эффективность работы с сокетами.

Реализация в библиотеке Go Tarantool

На уровне кода коннектора это реализовано следующим образом.

Так, у нас есть структура connShard, которая содержит Mutex для фьючеров, Mutex для записи в буфер и вспомогательные структуры. 

type connShard struct {
    rmut            sync.Mutex
    requests        [128]futureList
    requestsWithCtx [128]futureList
    bufmut          sync.Mutex
    buf             smallBuf
    enc             *msgpack.Encoder
}

При этом подобных шардов несколько. Это нужно, чтобы снизить конкуренцию за общие ресурсы: поскольку запросы пишутся последовательно в разные шарды, запросы не блокируют друг друга и обрабатываются значительно быстрее.

Реализация со стороны пишущей горутины (Writer Goroutine)

Реализация со стороны Writer Goroutine на уровне кода использует:

  • список фьючеров;

  • Mutex, который блокирует буфер;

  • буфер с данными;

  • Encoder — вспомогательное поле для оптимизации.

type connShard struct {
    // rmut            sync.Mutex
    requests        [128]futureList
    // requestsWithCtx [128]futureList
    bufmut          sync.Mutex
    buf             smallBuf
    enc             *msgpack.Encoder
}

Пошагово запись с точки зрения пишущей горутины будет иметь следующий вид:

  • Находим request id следующего запроса, вычисляем необходимый шард. Шард содержит три элемента: Future, Mutex, Buffer. 

  • Регистрируем Future, блокируем Mutex и увеличиваем буфер. 

  • После этого передаем номер шарда по каналу, чтобы уведомить пишущую горутину. 

  • После получения по каналу номера шарда пишущая горутина блокирует шард.

  • Затем пишущая горутина забирает буфер себе. 

  • При этом буфер на шарде сокращается до минимального размера, а содержимое буфера передается в сокет. 

В коде это выглядит так:

// Находим шард.
shardn := fut.requestId % shardCnt
shard := &conn.shard[shardn]
// Блокируем шард.
shard.bufmut.Lock()
// Записываем данные в шард
firstWritten := shard.buf.Len() == 0
reqid := fut.requestId
pack(&shard.buf, shard.enc, reqid, req, streamId, conn.schemaResolver)
// Разблокируем шард.
shard.bufmut.Unlock()
// Передаем номер шарда во writer при необходимости.
if firstWritten {
    conn.dirtyShard <- shardn
}

Примечание: Подробнее с кодом реализации можно ознакомиться по ссылке.

Со стороны пишущей горутины на уровне кода это реализовано следующим образом:

for atomic.LoadUint32(&conn.state) != connClosed {
    select {
    // Проверяем, есть ли шард для записи.
    case shardn = <-conn.dirtyShard:
    default:
        // Если нет, то говорим scheduler'у, что мы свободны.
        runtime.Gosched()
        if len(conn.dirtyShard) == 0 {
            if err := w.Flush(); err != nil {
                // Обработка ошибки.
            }
        }
        // Ждем шард для записи или завершения работы.
        select {
        case shardn = <-conn.dirtyShard:
        case <-conn.control:
            return
        }
    }
    shard := &conn.shard[shardn]
    shard.bufmut.Lock()
    // Забираем данные из шарда.
    packet, shard.buf = shard.buf, packet
    shard.bufmut.Unlock()
    if packet.Len() == 0 {
        continue
    }
    // Записываем данные.
    if _, err := w.Write(packet.b); err != nil {
        // Обработка ошибки.
    }
    packet.Reset() // Очищаем буфер.
}

Примечание: Подробнее с кодом реализации можно ознакомиться по ссылке.

Реализация со стороны читающей горутины (Reader Goroutine)

Со стороны читающей горутины все несколько проще, поскольку нас интересует только список фьючеров.

type connShard struct {
    // rmut            sync.Mutex
    requests           [128]futureList
    // requestsWithCtx [128]futureList
    // bufmut          sync.Mutex
    // buf             smallBuf
    // enc             *msgpack.Encoder
}

Пошагово схема чтения имеет следующий вид:

  • Из сокета читающая горутина получает данные, но декодирует только заголовок (Header). 

  • Из заголовка забираем requestID, по которому получаем номер шарда, и идем в него. 

  • В шарде находим фьючер, который принадлежит запросу. 

  • Затем записываем запрос в этот фьючер и сообщаем в клиент, что у нас появились данные.

  • После этого клиент может десериализовать тело запросов.

На уровне кода читающей горутины все реализуется следующим образом:

for atomic.LoadUint32(&conn.state) != connClosed {
    //Вычитываем ответ.
    respBytes, err := read(r, conn.lenbuf[:])
    if err != nil {
        // Обработка ошибки.
    }
    // Декодируем заголовок.
    buf := smallBuf{b: respBytes}
    header, code, err := decodeHeader(conn.dec, &buf)
    if err != nil {
        // Обработка ошибки.
    }
    // Находим объект Future для запроса.
    if fut = conn.fetchFuture(header.RequestId); fut != nil {
        // Устанавливаем ответ.
        if err := fut.SetResponse(header, &buf); err != nil {
            // Обработка ошибки.
        }
    }
}

Примечание: Подробнее с кодом реализации можно ознакомиться по ссылке.

Получаем максимум RPS от Tarantool

Теперь перейдем к изучению вариантов получить максимальную скорость обработки запросов на стороне клиента библиотеки Go Tarantool. При этом сразу оговорюсь, что эффективность от оптимизаций я буду оценивать на основе синтетического теста в «идеальных условиях»: на локальной машине, с небольшим размером запросов и ответов, с минимумом логики на нашей стороне и стороне Tarantool. 

Таким образом, задача теста — нагрузить Tarantool только логикой обработки запросов, чтобы получить максимальную пропускную способность коннектора.

При этом будем исходить из того, что у Tarantool будет задействовано всего два потока исполнения (один для обработки логики, второй для работы с сетью), то есть можем получить потребление CPU до 200%. А у Golang потоков исполнения и обработки может быть много — например, в рамках примера под Golang будет выделено 6 CPU (максимальная нагрузка до 600% CPU).

Зная условия, приступаем к написанию бенчмарка. 

Наивная реализация

Наивная реализация использования нашего клиента для Tarantool довольно проста:

b.ResetTimer()
for b.Loop() {
    // Создаем запрос.
    req := NewSelectRequest(spaceNo).
        Index(indexNo).
        Iterator(IterEq).
        Key([]interface{}{uint(1111)})
    // Выполняем его.
    data, err := conn.Do(req).Get()
    if err != nil {
        b.Errorf("request error: %s", err)
    }
    // Проверяем результат.
    tuple := data[0].([]any)
    if tuple[0].(uint16) != uint16(1111) {
        b.Errorf("invalid result")
    }
}

Эта наивная реализация позволяет получить 35 тыс. RPS. При этом происходит 22 аллокации на запрос, потребляется 65% CPU на сам бенчмарк и 50% на Tarantool.

Вместе с тем, поскольку Tarantool может потреблять до 200% CPU, путем наивной калькуляции можно предположить, что полученные результаты можно умножить на 4 (200 / 50% = 4) и получить до 140 тыс. RPS. 

Поскольку реализация наивная, оптимизировать мы ее будем так же наивно. Так, у нас большое количество аллокаций (22), поэтому начнем с них.

Например, мы можем убрать выделение объектов под запрос и под ключ, а также использовать оптимальный метод для десериализации.

b.ResetTimer()

for b.Loop() {
    // Создаем запрос.
    req := NewSelectRequest(spaceNo).
        Index(indexNo).
        Iterator(IterEq).
        Key([]interface{}{uint(1111)})
    // Выполняем его.
    data, err := conn.Do(req).Get()
    if err != nil {
        b.Errorf("request error: %s", err)
    }
    // Проверяем результат.
    tuple := data[0].([]any)
    if tuple[0].(uint16) != uint16(1111) {
        b.Errorf("invalid result")
    }
}

Одновременно с этим мы можем вынести создание запроса. 

// Создаем один запрос.
req := NewSelectRequest(spaceNo).
    Index(indexNo).
    Iterator(IterEq).
    Key(UintKey{I: 1111})

// Создаем один объект для декодирования на стеке.
var tuple benchTuple

b.ResetTimer()

for b.Loop() {
    // Выполняем запрос.
    err := conn.Do(req).GetTyped(&tuple)
    if err != nil {
        b.Errorf("request error: %s", err)
    }
    // Проверяем результат.
    if tuple.id != 1111 {
        b.Errorf("invalid result")
    }
}

Благодаря этому мы уменьшили количество аллокаций с 22 до 6. Вместе с тем это не позволило получить ожидаемый кратный прирост скорости обработки: RPS вырос до 36 тыс. (прирост на 1 тыс. можно принять за погрешность), потребление CPU на бенчмарк — до 62%, потребление CPU на Tarantool — до 52%.

Поэтому можно сделать вывод, что наивные оптимизации не всегда эффективны. 

Тут стоит отметить, что имеющиеся 6 аллокаций на запрос делает сам клиент. 

Поэтому на стороне клиентского кода оптимизировать больше нечего.

Ищем другие варианты.

Многопоточная реализация

Попробуем выполнять запросы в нескольких горутинах.

b.ResetTimer()

b.RunParallel(func(pb *testing.PB) {
    var tuple benchTuple

    for pb.Next() {
        // Выполняем запрос.
        err := conn.Do(req).GetTyped(&tuple)
        if err != nil {
            b.Errorf("request error: %s", err)
        }
        // Проверяем результат.
        if tuple.id != 1111 {
            b.Errorf("invalid result")
        }
    }
})

Так, потребление CPU бенчмарком увеличилось до 191%, Tarantool — до 87%, а скорость обработки запросов достигла 360 тыс. RPS.

Такой прирост обеспечивается за счет того, что RunParallel-бенчмарк в Golang задействует количество горутин, равное количеству ядер CPU (8 в нашем случае). Таким образом, используя в 8 раз больше горутин, мы увеличили скорость обработки в 10 раз.

Но и это не предел.

Еще более многопоточная реализация

Потенциально мы можем увеличить параметр параллелизма, чтобы запускать не 8 горутин, а больше. Для этого в бенчмарке проходимся от 1 до 1 024 в степени 2, устанавливаем SetParallelism и выполняем первоначальный код.

for p := 1; p <= 1024; p *= 2 {
    b.Run(fmt.Sprintf("%d", p), func(b *testing.B) {
        // Устанавливаем множитель параллелизма.
        b.SetParallelism(p)
        b.ResetTimer()

        b.RunParallel(func(pb *testing.PB) {
            var tuple benchTuple

            for pb.Next() {
                // Выполняем запрос.
                err := conn.Do(req).GetTyped(&tuple)
                if err != nil {
                    b.Errorf("request error: %s", err)
                }
                // Проверяем результат.
                if tuple.id != 1111 {
                    b.Errorf("invalid result")
                }
            }
        })
    })
}

В такой реализации мы можем получить 1,8 млн RPS. 

При этом бенчмарк начинает потреблять 770% CPU, а Tarantool — 85% CPU. То есть бенчмарк вытесняет Tarantool. 

Проведем анализ, при каком значении параметра параллелизма получился максимальный RPS.

В нашем случае точка насыщения примерно при значении параллелизма равным 64, что соответс��вует 512 горутинам (64 * 8 = 512). При этом в точке 1 024 параллелизма или 8 192 горутинах показатели намного хуже, что ожидаемо. 

Для более детального анализа запускаем CPU-профайлер, чтобы сравнить показатели при разных параметрах параллелизма.

64 * 8 горутин:

1 024 * 8 горутин:

Здесь можно заметить, что на 1 024 горутинах у нас создается слишком много объектов NewFuture, из-за чего рантайм Golang перестает справляться с их количеством: фактически вся работа сводится к аллокации объектов и избавлению от них. Поэтому RPS не только не растет, а даже начинает деградировать. 

Асинхронная реализация

Все предыдущие бенчмарки использовали синхронный API, из-за чего для увеличения количества параллельно выполняемых запросов нам приходилось увеличивать количество горутин. 

Но у нас есть и асинхронный API, поэтому мы можем написать асинхронный бенчмарк. Идея очень простая:

  • у нас есть канал, который передает Future из одной горутины в другую; 

  • первая горутина просто отправляет запрос и передает Future в канал; 

  • вторая горутина ждет ответ и декодирует его.

for cn := 1; cn <= connections; cn *= 2 { // Можно изменять количество соединений.
    for cc := 1; cc <= 512; cc *= 2 { // Можно изменять количество горутин.
        var wg sync.WaitGroup

        curRequests := requests // Количество запросов на цикл.

        start := time.Now()

        // Первая горутина отправляет запрос.
        for i := range concurrency {
            wg.Add(1)

            ch := make(chan *Future, 1024)

            go func(i int) {
                defer close(ch) // Закрываем канал по завершении.
                // Выполняем только curRequests запросов.
                for atomic.AddInt64(&curRequests, -1) >= 0 {
                    // Выполняем запрос.
                    ch <- conns[i%cn].Do(req)
                }
        }(i)

        // Вторая горутина ждет ответ и декодирует его.
        go func() {
            defer wg.Done()

            var tuple benchTuple
            // Пока горутина 1 не завершилась.
            for fut := range ch {
                // Ждем ответ и декодируем его.
                err := fut.GetTyped(&tuple)
                if err != nil {
                    t.Errorf("request error: %s", err)
                }
                // Проверяем результат.
                if tuple.id != 1111 {
                    t.Errorf("invalid result")
                }
            }
        }()

        wg.Wait()

        duration := time.Since(start)

        // Выводим результаты.
}

То есть то, что выполнялось последовательно, у нас становится асинхронным: можно выполнять много запросов и ожидать много ответов одновременно. 

В такой реализации с одним соединением и двумя горутинами нам удалось получить 2 млн RPS. При этом бенчмарк потреблял 323% CPU, а Tarantool — 130%. 

Это уже ближе к истине, но точки роста по-прежнему остаются.

Например, можно увеличить количество соединений к экземпляру Tarantool. 

В результате при 16 соединениях к одному экземпляру мы действительно можем получить ощутимый прирост производительности обработки — 4,8 млн RPS при 580% потребления CPU бенчмарком и 170% потребления CPU Tarantool.

Таким образом мы смогли получить более чем удовлетворительный результат. 

Примечание: Все бенчмарки есть в открытом доступе по ссылке — вы можете ознакомиться и запустить их самостоятельно. 

Итого, применение асинхронного API дает ряд преимуществ:

  • позволяет создавать меньше горутин, то есть уменьшать сопутствующие затраты;

  • увеличивая количество пишущих/читающих горутин относительно друг друга, можно лучше утилизировать CPU;

  • можно уменьшить Latency и увеличить утилизацию CPU, делая цепочки обработки.

Примечание: Идея цепочек обработки для улучшения утилизации CPU заключается в том, чтобы разделять сложную задачу на отдельные шаги и назначать их выполнение на разные горутины. Такая организация позволяет лучше утилизировать CPU.

Внутренние оптимизации

Теперь перейдем к обзору внутренних оптимизаций, позволяющих улучшить производительность обработки запросов. 

Context в API 

У нас в API есть Context. И для каждого отдельного запроса его можно установить, чтобы ограничить время выполнения.

request := NewSelectRequest("space").
    Index("index").
    Context(ctx)
// ...
connection.Do(request)

Но при работе с Context есть некоторые ограничения. Например, мы не можем отменить запрос, который уже ушел по сети. То есть Golang будет думать, что запрос отменился, но фактически он уйдет в сеть и вернется результат.

На практике это значит, что за каждым запросом нужно следить отдельно, чтобы контролировать его время жизни. Именно поэтому в нашем случае добавление Context к запросу приводит к снижению производительности:

  • было 2,2 млн RPS при затратах 525% (коннектор), 130% (Tarantool) CPU с одного соединения;

  • стало 1,2 млн RPS при затратах 730%/100% CPU с одного соединения.

Поэтому вместо этого у нас внутри коннектора используется глобальный тайм-аут для всех запросов. 

Так, при создании соединения у нас запускается простая горутина, которая проходится по всем шардам, очищает истекшие запросы, корректирует таймер до ближайшего запроса, после чего ждет следующий ближайший по времени запрос. По такому алгоритму она работает до конца жизни соединения.

timeout := conn.opts.Timeout
t := time.NewTimer(timeout)
for {
    <-t.C // Ждем следующий ближайший запрос.
    // Находим следующий ближайший по времени завершения запрос.
    var nowepoch time.Duration
    minNext := time.Since(epoch) + timeout
    for i := range conn.shard {
        nowepoch = time.Since(epoch)
        shard := &conn.shard[i]
        for pos := range shard.requests {
            shard.rmut.Lock()
            pair := &shard.requests[pos]
            for pair.first != nil && pair.first.timeout < nowepoch {
                // Очистка истекших запросов.
            }
            if pair.first != nil && pair.first.timeout < minNext {
                // Этот запрос ближе и еще не должен завершиться.
                minNext = pair.first.timeout
            }
            shard.rmut.Unlock()
        }
    }
    // Корректируем таймер до ближайшего запроса.
    nowepoch = time.Since(epoch)
    t.Reset(minNext - nowepoch)
}

Преимущество такого подхода в том, что мы можем сразу следить за всеми запросами, не тратя ресурсы на отслеживание времени жизни каждого отдельного запроса.

На практике этот подход очень эффективен, так как при нормальной работе коннектора и сети горутина просыпается примемрно раз в Timeout времени.

Важность учета аллокаций

По ходу тестов мы снижали количество аллокаций с 22 до 6, но значительного влияния на метрики от этого не получили, так как CPU не был утилизирован полностью и было много свободных ресурсов для работы GC. Поэтому можно наивно предположить, что об аллокациях можно не задумываться. 

Но на практике это не так: при полной утилизации CPU самое время начать экономить на спичках.

Так, изначально у нас в коннекторе было 6 аллокаций. И при поверхностном изучении CPU-профиля видно, что они потребляют не более 20% CPU. 

Поэтому может показаться, что целесообразнее оптимизировать другие места, например сериализацию и десериализацию.

Вместе с тем попробуем сократить количество аллокаций.

Три из шести располагаются в конструкторе NewFuture. Попробуем убрать некоторые из них.

Для начала удаляем аллокации, которые вызваны Deprecated-методами, — от них можно избавиться безболезненно.

Количество аллокаций уменьшается с 6 до 5. Но даже это дает эффект. Так, практически при аналогичных затратах CPU метрика RPS увеличивается:

  • было 2,2 млн RPS при затратах 525%/130% CPU с одного соединения;

  • стало 2,4 млн RPS при затратах 530/130% CPU с одного соединения.

После этого можем применить другой вариант оптимизации, а именно — заменить канал на что-то другое.

Так, канал о готовности создается вместе с Future, и его закрытие сигнализирует о том, что ответ готов и его можно декодировать. Его можно заменить на условную переменную sync.Cond.

type Future struct {
    // Неинтересные нам поля.
    mutex     sync.Mutex
    done      chan struct{}
}

func NewFuture(req Request) (fut *Future) {
    fut = &Future{}
    // Заполняем неинтересные нам поля.
    fut.done = make(chan struct{})
    return fut
}

func (fut *Future) SetResponse(header Header, body io.Reader) error {
    fut.mutex.Lock()
    defer fut.mutex.Unlock()

    // Немного кода.

    // Закрываем канал при завершении.
    close(f.done)
    return nil
}

func (f *Future) wait() {
    // Ждем закрытия канала.
    <-done
}

func (fut *Future) GetTyped(result any) error {
    // Ожидаем ответ.
    fut.wait()
    // Декодируем его.
    return fut.resp.DecodeTyped(result)
}

Для реализации с использованием sync.Cond потребуется только сама переменная, Mutex и булевый флаг, оповещающий, что все завершено.

type Future struct {
    // Неинтересные нам поля.
    mutex     sync.Mutex
    cond      sync.Cond
    finished  bool
}

func NewFuture(req Request) (fut *Future) {
    fut = &Future{}
    // Заполняем неинтересные нам поля.
    fut.finished = false
    fut.cond.L = &fut.mutex
    return fut
}

func (fut *Future) SetResponse(header Header, body io.Reader) error {
    fut.mutex.Lock()
    defer fut.mutex.Unlock()

    // Немного кода.

    // Сигнализируем о получении ответа.
    fut.finished = true
    fut.cond.Broadcast()
    close(f.done)
    return nil
}

func (f *Future) wait() {
    fut.mutex.Lock()
    defer fut.mutex.Unlock()

    // Ждем получения ответа.
    for !fut.finished {
        fut.cond.Wait()
    }
}

func (fut *Future) GetTyped(result any) error {
    // Ожидаем ответ.
    fut.wait()
    // Декодируем его.
    return fut.resp.DecodeTyped(result)
}

Причем Mutex у нас уже был, поэтому достаточно добавить только флаг о завершении.

Теперь:

  • при создании Future флаг находится в положении False, создается условная переменная;

  • при получении ответа значение флажка меняется на True, а условной переменной шлется уведомление об его изменении.  

Таким образом, мы уменьшили количество аллокаций на запрос еще на одну, или с 6 до 4 в сумме. 

При этом такая оптимизация тоже позволяет получить прирост производительности:

  • было 2,2 млн RPS при затратах 525 / 130% CPU с одного соединения;

  • стало 2,6 млн RPS при затратах 535 / 135% CPU с одного соединения.

То есть две микрооптимизации обеспечивают прирост RPS на 18% при незначительном повышении потребления CPU. 

В результате в наших бенчмарках с этими правками можно получить до 5,2 млн RPS от одного экземпляра Tarantool. 

Упомянутые правки уже вносятся в библиотеку и будут выпущены со следующим мажорным релизом.

Описанный в статье кейс показывает, что порой очевидные и глобальные варианты доработки могут оказаться менее эффективными, чем неочевидные, на первый взгляд, микрооптимизации. Помните, что, если вам нужно получить максимальную производительность, сначала нужно изучить свои инструменты, библиотеки и писать код, исходя из понимания их работы, пробуя разные подходы. Только тогда результат будет именно тем, что вам необходим.