Это текстовая версия доклада с Java Rock Star Meetup, с которым выступал Дмитрий Фатов (@FatOFF ) — руководитель разработки Газпромбанка с опытом разработки приложений более 13 лет. Дмитрий работал как backend-, так и fullstack-разработчиком на языках Java, Kotlin, JS, TS, 1С и имеет большой опыт работы с SQL-базами данных. Если вы больше любите смотреть видео, то смотрите запись доклада на YouTube или VK Видео.
Предыстория появления доклада: мы с командой строим платформу G2, которая позволяет нам создавать enterprise-решения. В работе мы используем YDB и Postgres, последнему и посвящён доклад.
На своей платформе мы разрабатываем SaaS-решение (уже в продакшене), в котором есть внешние интеграции через xml. До какого-то момента к нам приходило максимум до 30 тыс. документов в одной выгрузке. Потом мы подключили очень крупного клиента, что увеличило количество документов до 2 млн (= 4 млн записей в БД) в одной выгрузке. На тот момент наша система не справлялась с этой нагрузкой. SLA (Service Level Agreement) составлял до 5 минут на обмен данными со сторонними системами. В этот SLA мы не укладывались.
Мы оптимизировали очень много по бизнес-логике и основной проблемой для нас осталась вставка данных в БД. Поэтому сегодня мы про это поговорим.
Итак, в докладе разберём, как ускорить вставку данных в PostgreSQL:
От обычных insert до кастомных методов PostgreSQL и сколько профита они дают по производительности.
Распараллеливание процесса вставки с сохранением атомарности всей операции.
Как ускорить обновление данных в PostgreSQL и сделать эту вставку атомарной.
Важное примечание. Все примеры в статье будут написаны без использования ORM. Только JDBC, только хардор)
Весь код, который будет использоваться в докладе написан на Kotlin и доступен на GitHub. Итак, дано:
Подготовленная БД размером 32 Гб.
100 млн строк в основной таблице с индексами.
Будем тестировать вставку на 4 млн записей.
Замеры: 3 итерации прогрева, 5 итераций замеров.
Окружение: java 17, PostgreSQL 17.4.
Специально для целей тестирования я арендовал небольшой сервер в Yandex Cloud:


От обычных insert до кастомных методов PostgreSQL
Начнем с базового insert, который есть практически во всех базах данных:
INSERT INTO payment_document (account_id, amount, expense, cur, order_date, order_number, payment_purpose, prop_10, prop_15, prop_20)
VALUES (1000004, '10.23', true, 'RUB', '2023-06-25', '123456', 'some_purpose0', 'some 10', 'some 15', 'some 20');
INSERT INTO payment_document (account_id, amount, expense, cur, order_date, order_number, payment_purpose, prop_10, prop_15, prop_20)
VALUES (1000005, '11.23', true, 'RUB', '2023-06-26', '123457', 'some_purpose1', 'some 10', 'some 15', 'some 20');
INSERT INTO payment_document (account_id, amount, expense, cur, order_date, order_number, payment_purpose, prop_10, prop_15, prop_20)
VALUES (1000006, '12.23', true, 'RUB', '2023-06-27', '123458', 'some_purpose1', 'some 10', 'some 15', 'some 20');
INSERT INTO payment_document (account_id, amount, expense, cur, order_date, order_number, payment_purpose, prop_10, prop_15, prop_20)
В этом INSERT мы передаём имена колонок и сами данные.
Посмотрим, сколько времени займёт выполнение запроса. От этой цифры и будем в дальнейшем отталкиваться.
Как мы это будем делать? В моем приложении реализовано это все на уровне сейверов (saver). Сейвер (saver) — это такой класс, который будет сохранять данные в БД, используя определённые методы. У нас будет абстрактный класс сейверов, у которого будет счётчик и который будет отправлять данные в БД батчами, чтобы мы не ходили с каждым запросом в БД. Первый сейрвер:
private val dataForInsert = mutableLitOf<String>()
override fun addDataForSave(entity: E) {
dataForInsert.add(proessor.getStringForInsert(entity))
super.addDataForSave(entity)
}
override fun saveData() {
processor.insertDataToDataBase(entityClass, dataForInsert, conn)
dataForInsert.clear()
}
В методе addDataForSave() через рефлексию достаём из entity нужные нам значения. В небольшой кэш dataForInsert складываем сформированные строки. В методе saveData() мы их отправляем в базу данных. Сам метод saveData() выглядит тоже достаточно просто. Мы вызываем INSERT INTO, передаём таблицу и сами данные:
conn.createStatement().use { stmt ->
data.map { s ->
stmt.addBatch("INSERT INTO $tablename ($columns) VALUES ($s);")
}
stmt.executeBatch()
}
Как мы будем работать с этими сейверами? Сам тестируемый метод.
fun saveByInsert(count: Int) {
val currencies = currencyRepo.findAll()
val accounts = accountRepo.findAll()
pdBatchByEntitySaverFactory.getSaver(SaverType.INSERT).use { saver ->
for (i in 0 <= until < count) {
saver.addDataForSave(getRandomEntity(null, currencies.random(), account.random()))
}
saver.commit()
}
}
Через фабрику (factory) мы получаем нужный нам сейвер, генерируем какую-то рандомную entity,передаем в метод addDataForSave и коммитим.
Давайте посмотрим на результат:
"name": "Insert method",
"count": 4000000
"time": "8 min, 37 sec, 509 ms"
8 минут и 37 секунд — достаточно много. Эта цифра нам нужна для того, чтобы понимать, от чего вообще отталкиваться. Отталкиваемся мы обычно от базы, от базовых INSERT.
Помимо обычного INSERT, есть INSERT Prepared Statement:
INSERT INTO payment_document (account_id, amount, expense, cur, order_date, order_number, payment_purpose, prop_10, prop_15, prop_20)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?);
INSERT INTO payment_document (account_id, amount, expense, cur, order_date, order_number, payment_purpose, prop_10, prop_15, prop_20)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?);
INSERT INTO payment_document (account_id, amount, expense, cur, order_date, order_number, payment_purpose, prop_10, prop_15, prop_20)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?);
INSERT INTO payment_document (account_id, amount, expense, cur, order_date, order_number, payment_purpose, prop_10, prop_15, prop_20)
Все знают, что обычный INSERT использовать не очень хорошо, потому что они небезопасные, там могут быть всякие SQL injection. Давайте сравним эти два INSERT и оценим насколько по скорости вставки они отличаются друг от друга. Переделаем немного наш сейвер:
private val dataForInsert = mutableLitOf<Any?>()
override fun addDataForSave(entity: E) {
dataForInsert.add(proessor.getStringForInsert(entity))
super.addDataForSave(entity)
}
override fun saveData() {
processor.insertDataToDataBasePreparedStatementBasic(entityClass, dataForInsert, conn)
dataForInsert.clear()
}
Теперь мы будем в нашем кэше хранить не строки, а список параметров, которые мы из нашей entity вытащили. В методе saveData(), соответственно, будем ставить вопросы и через setObject устанавливать нужные параметры.
val params = columns.joinToString(",") {"?"}
conn.prepareStatement(
"INSERT INTO $tablename ($columns.joinToString(",")}) VALUES ($params);"
).use { stmt ->
data.forEach { str ->
str.forEachIndexed { idx, col ->
stmt.setObject(idx + 1, col)
}
stmt.executeBatch()
}
Посмотрим на результат. Все результаты я буду приводить вот в таком графике, чтобы их было легко сравнивать друг с другом.

И мы получаем - 2 минуты и 33 секунды, то есть минус 30%. Обычный INSERT медленнее Prepared Statement приблизительно на 30%.
Давайте посмотрим, почему.
Statement vs Prepared Statement
Включим логи на стороне Postgres: логи по операциям bind, parse и execute.
ALTER DATABASE postgres SET log_statement = 'all';
ALTER DATABASE postgres SET log_duration = 'on';
ALTER DATABASE postgres SET log_min_duration_statement = 0;
Выведем время и посмотрим, сколько каждый этап операции занимает в Postgres:

Полученные цифры я вывел в такую табличку, чтобы было легко сравнивать.

Во-первых, Prepared Statement парсинг выполнился всего лишь один раз. И, если внимательно посмотреть на эти цифры, то можно увидеть, что первые шесть раз binding тоже выполняется немного дольше: сначала сотые доли миллисекунды, а потом уже тысячные доли миллисекунды. У обычного Statement всегда цифры плюс-минус одинаковые.
Давайте разбираться, в чем причина. Всему виной оптимизация JDBC-драйвера, которая называется Server Prepare. Она служит как раз для того, чтобы Postgres мог использовать планы запросов и избежать overhead на парсинге и планировании запросов.
Как это работает под капотом?
Server Prepared Statements
JDBC-драйвер понимает, что текущий запрос, который он будет отправлять в Postgres, не мусорный и назначает ему какой то id, например, S_1. Затем он говорит Postgres, чтобы тот закешировал запрос и рассчитал по этому запросу оптимальный план, чтобы его быстро выполнять. Далее JDBC-драйвер будет передавать в Postgres только этот id.
Чтобы не кешировать все запросы подряд из-за ограничений по памяти, у JDBC-драйвера есть параметр prepareThreshold для определения мусорных запросов. По умолчанию этот параметр равен 5. То есть один и тот же запрос должен повториться 5 раз, чтобы JDBC-драйвер его начал кешировать. Но почему у нас только один раз выполнился parsing и запрос сразу закешировался?
В JDBC-драйвере есть ещё одна недокументированная оптимизация, которая касается именно INSERT'ов. Если мы используем INSERTы и методы addBatch() и executeBatch(), то JDBC-драйвер понимает, что сейчас к нему полетят много INSERT'ов, так почему бы их сразу же все не закешировать?
По этой причине parsing выполнен только один раз у Prepared Statement. Почему binding выполнялся дольше первые шесть раз? PostgreSQL рассчитывает для данного запроса оптимальный план, потом его кеширует для быстрого исполнения.
Помимо обычных INSERT'ов, есть мультистрочные INSERT'ы.
INSERT INTO payment_document (account_id, amount, expense, cur, order_date, order_number, payment_purpose, prop_10, prop_15, prop_20)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?),
(?, ?, ?, ?, ?, ?, ?, ?, ?, ?),
(?, ?, ?, ?, ?, ?, ?, ?, ?, ?);
Вроде как они должны работать быстрее.
Postgres эти INSERT'ы также поддерживает. Мультистрочные INSERT'ы – это когда в оператор INSERT INTO мы передаём значения списком, этаким массивом.
Давайте попробуем добавить ещё один сейвер. Сам сейвер практически не будет ничем отличаться от того, что мы видели выше. Будет отличаться только вызываемый метод.
val params = columns.joinToString(",") {"?"}
conn.prepareStatement(
"INSERT INTO $tablename ($columns.joinToString(",")}) VALUES $ (List(data.size) {index -> (${params})$if (index == data.lastIndex) ";" else ","}.joinToString("\n")}"
).use { stmt ->
var idx = 0
data.forEach { str ->
str.forEach { col ->
idx++
stmt.setObject(idx, col)
}
}
stmt.executeLargeUpdate()
}
Давайте посмотрим на результат.

-1 минута 10 секунд (примерно 20%).
У этого подхода есть один недостаток — ограничение по количеству параметров. Вот, к примеру, у меня самым эффективным размером получился батч в 5 000 записей. На моей таблице, на моем оборудовании он был самый эффективный. 5 000 записей, 10 колонок, 50 000 параметров я уже использовал. То есть если бы я сделал размер батча чуть побольше или же добавились еще какие-то колонки, то легко можно получить такую ошибку:

То есть моя система перестанет работать. Как быть в этом случае? JDBC-драйвер тоже умеет за нас выполнять эту работу. Есть такой параметр, который называется reWriteBatchedInserts. JDBC-драйвер будет принимать обычные однострочные INSERTы и внутри себя переделывать в мультистрочные INSERTы. Давайте посмотрим на результат:

Получилось +10 секунд (около 3%). Разница, безусловно, есть. Давайте поймем, она заключается.
Для этого откроем сам код и посмотрим. Для того, чтобы сэкономить на кэше, JDBC-драйвер объединяет мультистрочные INSERT'ы по степени двойки. То есть вы увидите там такой размер мультистрочного INSERT'а, как 8, 16, 32, 64 и 128 — это верхний предел. Причём этот предел захардкожен напрямую в JDBC-драйвере, его поменять никак нельзя. Если вы вставляете 200 строк, то эти строки будут объединены в три мультистрочных пакета — по 128, 64 и 8 строк в каждом пакете. Соответственно, наша вставка с reWriteBatchedInsterts, которая, состоит из 5 000 записей, будет объединена в 40 пакетов, а не на один пакет, как мы это сделали руками. Зато не нужно следить за количеством параметров и рисковать попасть на ограничение в 65 636 параметров, за нас всю работу делает JDBC-драйвер. Да, получается чуть медленнее, но в большинстве случаев это не критично.
Insert select from unnest
Итак, какие у нас еще есть способы вставки данных? Если мы вернёмся к ошибке выше и внимательно на неё посмотрим, то заметим следующее:

JDBC-драйвер советует использовать некий arrays, то есть массивы. Как мы это можем сделать? В PostgreSQL можно вставлять данные не только через параметры, но и через выборки из других таблиц.
Один из вариантов это при помощи метода unnest передать в наш SELECT данные по колонкам, то есть данные в массивах по колонкам:

Например, массив значений account_id, массив amount, массив cur и так далее — по одному массиву на каждую колонку.
Давайте посмотрим, сколько будет «весить» этот метод. Таким образом, вместо 50 000 параметров (5 000 строк × 10 колонок) у нас остаётся всего 10 параметров — по количеству колонок. Единственное, что нам потребуется — это «перевернуть» наш массив.

Через createArrayOf() в setArray() мы можем эти массивы устанавливать.
Важно учитывать, что PostgreSQL требует явного указания типа массива. Его можно получить дополнительным SELECT и закешировать.
Посмотрим на результат:

Этот подход дал выигрыш около 14 секунд, то есть примерно 5% по сравнению с предыдущим результатом. Он оказался быстрее мультистрокового INSERT и при этом полностью избавляет нас от риска превышения лимита параметров. Вполне рабочий и удобный вариант.
COPY
Вернёмся к сообщению JDBC-драйвера. Помимо советов использовать массивы, он также рекомендует обратить внимание на метод COPY:

COPY — это метод, предназначенный для копирования данных между файлом и таблицей. Почему бы не попробовать использовать его?
Синтаксис COPY достаточно простой и напоминает обычный INSERT. У драйвера есть замечательный copyAPI и метод copyIn(). В конце указываем источник данных (можно брать данные, например, с диска или из памяти. Мы берём из памяти):
COPY payment_document (account_id, amount, expense, cur, order_date, order_number, payment_purpose, prop_10, prop_15, prop_20) FROM STDON (DELIMITER '|', NULL 'NULL')
fun saveToDataBaseByCopyMethod(
tableName: String,
columns: String,
delimiter: String,
nullValue: String,
from: Reader,
conn: Connection
) {
conn.unwrap(PGConnection::class.java) copyAPI.copyIn(
"COPY $tableName ($columns) FROM STDIN (DELIMITER '$delimiter', NULL '$nullValue')",
from
)
}
Посмотрим на результат:

COPY дал выигрыш около 18 секунд, что составляет примерно 6% относительно предыдущего результата.
COPY binary
Посмотрим, что ещё умеет COPY. Помимо обычного текстового формата есть бинарный формат. В документации написано следующее:

Последнее утверждение ставит под вопрос использование этого формата. Никто не хочет ситуации, когда при обновлении версии Postgres в продакшн сломаются какие-то вставки данных. Но давайте посмотрим, насколько формат binary будет эффективным.
Сделаем свой обработчик. Для того, чтобы использовать бинарный COPY, нужно приблизительно таким образом записать данные в файл:

То есть, первым делом — начало файла, затем идёт количество колонок, потом сами данные и конец файла. Но не со всеми данными так просто.
COPY binary. LocalDate
Вот, к примеру, преобразование LocalDate в бинарный вид:

Код взят из JDBC-драйвера.
COPY binary. BigDecimal
Если взять стандартный для Java тип BigDecimal и попробовать преобразовать его в тот формат, который воспринимает база данных, то там вообще будет 110 строк кода. Это нужно учитывать.

Те части кода из JBDC-драйвера, которые нельзя было переиспользовать я просто скопировал в свой проект.
Посмотрим на результаты:

Итого: -4 секунды, то есть получилось сэкономить 2% при помощи COPY binary.
Я сравнивал данные подходы в PostgreSQL 14 и 17 версиях. В 14 версии COPY был стабильно медленнее, чем формат CSV. В PostgreSQL 17 версии COPY оказался немного быстрее, чем формат CSV. Важно: перед тем, как использовать какие-либо методы в своем проекте, проверьте их на своих таблицах, данных и оборудовании.
К примеру, в документации к reWriteBatchedInserts написано, что производительность от его использования увеличивается в 2-3 раза. По факту у меня получилось около 20%. Когда я общался с Владимиром Ситниковым по этому поводу, то он мне показал бенчмарк на котором получили такие результаты. В качестве источника данных была одна маленькая табличка, без индексов. Там метод действительно даёт прирост в 2-3 раза. Как только мы добавим индексов, добавим колонки, то прирост получится меньше. Перед тем, как использовать методы, обязательно тестируйте их!
Выводы. Однопоточная вставка
Запросы через prepared statement работают быстрее из-за кэширования планов на клиенте и на сервере.
Многострочный insert быстрее на 20%. Можно использовать параметр
reWriteBatchedInserts. Не надо ничего переписывать в своем коде.
COPY binary и текстовый форматы имеют приблизительно одинаковые показатели по скорости. Максимум, который я мог выжать из обычного COPY — это 10% относительно мультистрочных insert.
Многопоточная вставка
Как мы пришли к многопоточной вставке?
Я замерил время, которое мой основной поток (который генерирует мои entity) тратит на то, чтобы ждать, пока сохранятся batches в БД.

У меня получилось 77% времени. Получается, что 77% времени мой основной поток спит и ждёт, пока данные запишутся в БД. Давайте заставим наш поток продолжать работу, а сохранение данных отдадим другим потокам.
Что мы хотим сделать?
Мы заведем отдельный ThreadPoolExecutor для управления задачами.
Сделаем ConcurrentSaverHandler, который будет распределять нагрузку между несколькими Saver.
Расширим функциональность CopyBinaryByEntitySaver для неблокирующей отправки данных в БД.
Copy binary concurrent
Вот как это будет выглядеть на схеме:

У нас есть Handler. У Handler'а есть несколько Saver'ов. Каждый Saver работает в своей транзакции и сессии. Handler сначала генерирует данные в один Saver. Как только тот Saver пошёл отправлять данные в БД, мы переключаемся на другой и так по кругу. Каждый Saver в это время работает и записывает данные.
Посмотрим на результат:

Нам удалось ускорить нашу вставку данных более, чем в 2 раза. Однако у этого подхода есть один большой недостаток — все Saver'ы работают в своих сессиях. Конечно же, мы будем стараться выполнить commit данных в наших saver одномоментно. Да и у Postgres коммит — это достаточно быстрая операция, которая занимает примерно 10-ые доли миллисекунды. Фактически, это внесение изменения в Commit Log и ещё одну табличку по активным транзакциям. Тем не менее, всегда что-то может пойти не так: какой-то Saver успеет сохранить данные, а другой Saver не успеет этого сделать. Это нарушает атомарность, которую в случае однопоточной вставки гарантирует нам сама БД. Давайте попробуем реализовать атомарность на уровне бизнес-логики.
Атомарность через флаг
Первое что приходит в голову, это воткнуть какой-нибудь флаг прямо в саму таблицу. Пускай этот флаг называется ready_to_read.
ALTER TABLE PAYMENT DOCUMENT ADD COLUMN ready_to_read bool NOT NULL DEFAULT true;
ALTER TABLE PAYMENT_DOCUMENT ADD COLUMN transaction_id uuid DEFAULT null;
CREATE INDEX IX PAYMENT DOCUMENT transaction_id on PAYMENT DOCUMENT (transaction_id) where payment_document.transaction_id is not null;
SELECT *
FROM payment document paymentdoc
WHERE paymentdoc.ready_to_read = true
В своих SELECT'ах мы будем выбирать актуальные данные с условием, где у нас ready_to_read = true. Дополнительно создадим одну индексную колонку transaction_id в нашей основной таблице с исключением значений типа null. По этой колонке мы будем искать те строки в текущей транзакции, которые мы вставили, для того, чтобы их обновить и поставить ready_to_read = true.
И здесь стоит задуматься. Насколько обновление — это тяжелая операция?
Set read to read by transaction id
Для этого мы изолированно измерим сколько будет весить установка флага ready_to_read, у нас за это будет отвечать метод SetReadyToReadByTransactionId:
val transactionId = Generators.timeBasedEpochGenerator().generate()
service.saveByCopyConcurrentForUpdate(count, transactionId)
val time = measureTimeMillis{
service.setReadyToReadByTransactionId(transactionId)
}
Сам метод будет выглядеть следующим образом:
fun setReadyToReadByTransactionId(transactionId: UUID): Int {
val conn = dataSource.connection
return conn
.prepareStatement("update payment_document set ready_ro_read = true where transaction_id = ?")
.use { ps ->
ps.setObject(transactionId)
ps.executeUpdate()
}
}
Давайте посмотрим на результат:

Время совершенно неудовлетворительное для того, чтобы взять и использовать метод для сохранения атомарности.
Как работает PostgreSQL?
Почему для PostgreSQL обновление – это тяжелая операция? Давайте разберемся как работает PostgreSQL под капотом:
MVCC (Multiversion Concurrency Control) многоверсионная управление конкурентным доступом. Это значит, что у каждой строки есть своя версия.
Заголовки строк (24 байта) содержат информацию о версиях.
Посмотреть данные станицы индексов и данных можно при помощи расширения pageInspect:
CREATE EXTENSION pageInspect;
select * from heap_page_items(get_raw_page('payment_document', 0));
select * from bt_page_items('pk_payment_document', 1);
Обновление индексного поля
На примере обновления индексного поля я хочу показать как работает MVCC.

Мы один раз создали запись, 2 раза её обновили. На странице с данными мы получили 3 версии. В PostgreSQL счётчик транзакций — это некий long, который просто постоянно увеличивается на единичку. Здесь нам интересно посмотреть на страницу с первичным ключом. Мы видим, что у нас на каждую версию строки создалась своя версия на странице с индексом. У PostgreSQL есть одна очень хорошая оптимизация, которая называется HOT (Heap-Only Tuple).
HOT (Heap-Only Tuple)-обновления
HOT позволяет не создавать дополнительные ссылки индексов на строки данных. Это применимо, когда:
На странице есть место для вставки новой строки.
На обновляемых столбцах нет индексов.
Обновляемые столбцы есть в индексах, но их значения не меняются
Посмотрим на пример обновления не индексного поля, чтобы понять, как работает оптимизация HOT-обновлений.
Попробуем изменить наш флаг ready_to_read и посмотрим на результат.

Во-первых, на странице с индексами у нас осталась одна запись, которая указывает на первую строку на странице с данными. Дополнительно сюда я вывел ссылку на следующую строку в цепочке. PostgreSQL по этой ссылке найдёт наши актуальные данные. Так как PostgreSQL читает все данные в память постранично, то эта операция должна быть достаточно быстрой.
Теперь посмотрим, были ли наши обновления HOT. Мы меняли не индексное поле, это флаг ready_to_read. Осталось посмотреть, было ли место на странице для обновлений.
По умолчанию PostgreSQL максимально уплотняет свои страницы, чтобы экономить наши ресурсы, память. Оставить место на странице для HOT-обновлений можно при помощи параметра Fillfactor.
Fillfactor для HOT-обновлений
Fillfactor — процент наполнения страниц при вставке данных.
При достижении указанного порога в Fillfactor вставка будет осуществляться на новую страницу, а незаполненное место будет оставаться под HOT-обновления. Установить его можно следующим образом:
alter table payment_document set (fillfactor = 80);
В таблице ниже показано влияние fillfactor на время обновления и на размер таблицы (обновление 4 млн строк):

Мы получили хороший результат по ускорению производительности. И это мы уже можем использовать для сохранения атомарности. Это уже укладывается в наш SLA.
Почему именно при fillfactor 50% мы получили такое кратное ускорение? Потому что как только мы оставили ровно половину, чтобы у нас каждое обновление прошло через HOT, мы получили такую производительность.
Давайте попробуем использовать эту идею в своем коде:

Создадим некий transactionId. Этот transactionId передадим в нашу генерируемую entity. Через метод apply ставим флаг readyToRead в false. Затем по этому же transactionId мы этот флаг взводим в true.
Давайте посмотрим на результат:

Мы потеряли 30 секунд от предыдущего результата. Тем не менее это все равно хороший результат, который нас в целом устраивает. Получается, что мы в 2 раза ускорили вставку данных относительно однопоточной, при этом сохранив её атомарность.
Transaction id в двух таблицах
А что если вынести transactionId в отдельную таблицу?
Оставим в нашей таблице transaction_id, но сделаем его не индексным полем, чтобы он меньше влиял на вставки данных. Также добавим таблицу, где у нас будут храниться наши активные транзакции, и назовём её ACTIVE TRANSACTION. В этой таблице поле transaction_id будет индексным:
ALTER TABLE PAYMENT_DOCUMENT ADD COLUMN transaction_id uuid DEFAULT null;
CREATE TABLE ACTIVE_TRANSACTION
(
transaction_id uuid NOT NULL
);
CREATE INDEX IX_ACTIVE_TRANSACTION_transaction_id on ACTIVE_TRANSACTION (transaction_id);
Как будет выглядеть сама вставка?

Сначала мы добавляем запись в таблицу ACTIVE TRANSACTION. После этого делаем свои многопоточные вставки, затем удаляем запись из ACTIVE TRANSACTION.
Посмотрим на результ:

Итог: лучше на 29 секунд относительно предыдущего результата. В этом случае мы ничего не делали с PostgreSQL. Мы быстро сделали вставки, при этом сохранили атомарность данных, если их правильно читать.
Чтение актуальных данных
У нас есть два подхода как мы можем их читать:
Подход с использованием LEFT JOIN. Мы присоединяем к нашей основной таблице ACTIVE TRANSACTION, и в условии WHERE отбросим те записи, которые имеют активную транзакцию в таблице.
from payment_document pd
left outer join active_transaction at on pd.transaction_id = at.transaction_id
where at.transaction_id is null
Анти-JOIN. Он есть в PostgreSQL и реализуется через NOT EXISTS. Соединение напрямую с таблицей ACTIVE TRANSACTION в условии WHERE:
from payment_document pd
where (NOT EXISTS (SELECT * FROM active_transaction at
WHERE at.transaction_id = pd.transaction_id))
В данном случае два запроса будут иметь практически одинаковые планы выполнения. PostgreSQL приведёт их к анти-JOIN. Здесь есть Sequence Scan, потому что сама таблица очень маленькая. Если таблица будет больше, PostgreSQL будет использовать индексы.

Теперь давайте посмотрим, насколько наши дополнительные условия (флаг ready_to_read или соединение с другими таблицами) будут влиять на итоговое количество TPS на выборку данных.
Влияние дополнительных условий на TPS
В своём бенчмарке я сделал несколько разрезов по незафиксированным строкам:
Первый разрез: 4 млн незафиксированных строк.
Второй разрез: 32 млн незафиксированных строк.
Выборка 1 записи по id
В синем квадрате указано целевое значение.

С выборкой одной записи по id отклонения незначительные. Да, это условие есть, оно немного отбирает количество TPS, но это некритично. То есть на выборку по первичному ключу это никак не влияет.
Выборка 10к записей по order_dt
Этот случай уже интереснее. Здесь мы выбираем 10 000 записей по индексному полю order_dt.

Здесь видно, что количество незафиксированных строк сильно влияет на то количество TPS, которое мы в итоге получаем.
Итог. Реализация атомарности
Нужно следить за количеством незафиксированных данных.
Реализация атомарности в смежных таблицах – не нужно тюнить Postgres, на 22% работает быстрее.
Все запросы должны учитывать реализацию атомарности на уровне бизнес-логики.
Реализация атомарности параллельных вставок на стороне PostgreSQL возможна?
В PostgreSQL есть метод PREPARE TRANSACTION, разра��отанный для двухфазной фиксации:

По умолчанию в PostgreSQL все транзакции прибиты к сессиям. То есть я могу закрыть сессию и у меня откатится транзакция. Этот метод позволяет отвязать текущую транзакцию от сессии. Получается, в одной сессии мы данные вставим, а в другой — закоммитим.
Однако у этого подхода есть свои недостатки. PREPARE TRANSACTION работает так же, как и обычный TRANSACTION, поэтому он:
удерживает блокировки,
мешает работать VACUUM.
В документации написано, что если у вас нету отдельного менеджера транзакции, который следит за PREPARE TRANSACTION, то его не рекомендуют использовать. PostgreSQL, как и любая другая БД, не любит долгих транзакций.

PREPARE TRANSACTION
Давайте посмотрим, как бы мы могли бы его использовать. Вывести информацию об активных подготовленных транзакциях можно следующим образом:
select * from pg_prepared_xacts;

Тут можно посмотреть информацию о текущих транзакциях (служебный ИД транзакции, ИД транзакции, который назначили мы) и время, когда данная транзакция была открыта. На это время можно опираться при разработке внешнего менеджера транзакций. Если вдруг какая-то транзакция “висит” больше некоторого времени, то значит, что-то пошло не так и можно эту транзакцию просто откатить (rollback).
Как бы мы это могли использовать? Например, в одной транзакции мы вставили данные и назначили некоторый transaction id, затем открыли второй connection и вставили другие данные, назначив другой transaction id. В конце, в отдельной транзакции выполнили по этим двум ИД commit prepared.

Но не все так просто как кажется. Пример который описан выше выдаст ошибку:

Оказывается, нельзя использовать commit prepared в одном транзакционном блоке. Эта ошибка в корне рубит нашу идею использовать его для атомарности, потому что мы также будем коммитить эти данные не атомарно. Также мы не можем использовать один и тот же transaction id для нескольких connection:

Почему? Потому что так тоже нельзя:

С этим вопросом я подходил к коллегам из Postgres PRO и предложил им реализовать решение, которое позволило бы нам использовать prepare transaction для сохранения атомарности. Коллеги откликнулись и прислали мне патч на тестирование.
Посмотрим, что из себя представляет этот патч.

Мы также задаем каждый transaction id для каждой сессии, но в commit prepared мы можем через запятую перечислить те транзакции, которые мы будем коммитить в БД.
Доработаем немного наш код, чтобы наш Saver выполнял prepared transaction перед коммитом:

Наш handler будет управлять этими транзакциями. Перед тем, как закоммитить все данные, он соберёт все transaction id в список через запятую и закоммитит их.

Посмотрим на результат:

Результат примерно такой же, как мы получили с атомарностью в другой таблице, за исключением того, что теперь нам не нужно писать дополнительных условий к запросу (где мы писали ready_to_read = true или же соединяться с другими таблицами).
Что насчёт TPS?
Мы отдали эту проблему на сторону PostgreSQL. Может быть, там у нас будут какие то улучшения по производительности.
Патч был выпущен к PostgreSQL 18, а тестирование я проводил на версии 17.4, поэтому результаты TPS будут немного отличаться друг от друга.
Выборка одной записи по id

Разницы с выборкой по первичному ключу нет. Она по-прежнему быстро работает.
Выборка 10к записей по order_dt

А вот здесь изменений в лучшую сторону не произошло.
То есть вне зависимости от того, сами мы фильтруем незафиксированные данные, или PostgreSQL фильтрует их за нас, они одинаково влияют на показатель TPS. Соответственно, чуда не произошло.
Итоги Prepared Transaction
Незафиксированные строки также влияют на TPS, как и в предыдущих вариантах.
За незафиксированными транзакциями нужно следить, откатывать поломанные.
Облегчает написание кода, не нужно делать дополнительные условия к select.
Нет гарантии попадания в релиз.
Итоги
Многострочный insert быстрее обычного. Можно использовать параметр
reWriteBatchedInserts.Самый быстрый метод для вставки данных – COPY, но не стоит ждать чуда.
Распараллеливание процесса вставки даёт наибольший прирост по производительности.
Сохранение атомарности лучше реализовывать через смежные таблицы. Нужно следить за незафиксированными данными.
С патчем и prepared transaction работать удобнее, но нет гарантии, что он войдет в релиз.