У тех, кто не кодит, бытует мнение, что у разработчиков на любую задачу есть какое-то одно решение — бест-практика. Достаточно почитать SO или, вот теперь, пообщаться с нейронкой и — вуаля — задача сделана. На практике, даже у такой задачи, как вставить данные в БД, есть варианты решения, каждый со своими плюсами и минусами. Делюсь небольшим примером.
Задача
Пилю мок-утилиту для тестирования сервиса на работе. В одной из задач по утилите нужно было воркеру построчно валидировать XLSX-файл и результат записывать в БД. Далее результат подхватывал следующий воркер и отправлял DTO во внешнее API.
Я решал, как делать вставку данных для первого воркера. Код ниже синхронный, так как воркер написан на синхронной либе rq. Синтаксис запросов в БД на SQLAlchemy.
Реализация один: построчное сохранение в БД
Это было моим mvp-решением в виду простоты. Алгоритм:
Читаем листы файла
Валидируем строки файла с помощью Pydantic
Собираем список из строк на вставку
Построчно коммитим в БД.
data_list: list[dict[str, Any]] = ...
try:
with SessionLocal() as db, db.begin(): # Контекстный менеджер сессии и транзакции
for row in data_list: # Итерируем список с данными и каждую строку вставляем
stmt = ...
db.execute(stmt)
except SQLAlchemyError as e:
...Плюсы: простота. Легкость в обработке построчных исключений. Устойчивость к битым данным: аффектятся только битые строки, их сразу можно пометить, как проблемные. Остальное запишем.
Минусы: низкое быстродействие при больших объемах данных. Файлы пока у нас 1000 - 20 000 строк. Загружать файлы при тестировании будет вся команда QA. Дальше нагрузка может быть больше, если утилиту будем использовать в проде как источник данных.
Батчинг
Метод я решил отрефачить на вставку батчами — пачками данных, по 500–1000 строк за транзакцию. Таких транзакций может быть пара десятков Postgres и стенд спокойно это переварят. Наша команда владеет данными, может оперативно отлаживать битые строки.
Наполняем батч через валидацию Pydantic-схемы:
batch: list[dict[str, Any]] = []
for sheet_name, rows in data.items(): # вытаскиваем название листа и строки файла
for row in rows: # итерируемся по каждой строке
insert_obj = FileRowsInsertSchema(
job_id=job_id,
sheet_name=sheet_name,
row_num=row.row_num,
row_data=row.row_data,
status=row.row_status,
errors=row.errors,
)
batch.append(insert_obj.model_dump()) # собираем список отвалидированных словарей для вставкиИз данных XLSX-файла берём листы и строки каждого листа, кастим строки в Pydantic-модель и дампаем их в словари. Получаем список словарей, готовый к вставке в таблицу БД. В какой момент формировать батч на вставку? Основных вариантов два: во время транзакции и до.
Реализация два: собирать батчи во время транзакции
Открываем транзакцию
Наполняем список отвалидированными данными
По мере наполнения чанка, вставляем его в БД
Очищаем батч и повторяем для следующего чанка
После завершения цикла не забываем вставить остаток, размер которого меньше чанка
try:
with SessionLocal() as db, db.begin():
# помечаем джоб как VALIDATING
stmt = select(FileJobsModel).where(FileJobsModel.job_id == job_id)
job = db.execute(stmt).scalar_one_or_none()
job.status = JobStatusEnum.VALIDATING.value
# наполняем батч внутри транзакции
...
# как только батч наполнен, вставляем в БД и чистим батч
if len(batch) >= config.db.batch_size:
db.execute(insert(FileRowsModel).values(batch))
batch.clear()
# когда чанки закончились, заливаем в БД остаток
if batch:
db.execute(insert(FileRowsModel).values(batch))
except SQLAlchemyError as e:
...
Плюсы. У способа низкий риск out-of-memory: мы управляем размером списка, регулярно его очищаем. Подходит для нод с ограниченными ресурсами, например для тестовых стендов.
Минусы — хрупкость. В основном, из-за того, что иксель ненадежный источник данных. В упавшем батче нужно искать проблемные строки и решать, что с ними делать. Плюс, потенциально долгая транзакция: мы делаем валидацию, сборку батча и проверки внутри одной транзакции, держим локи и соединение с БД дольше. А также смешение зон ответственности. Код транзакции отвечает за валидацию данных.
Реализация три: собрать полный список на вставку до транзакции
К этому варианту пришел пообщавшись с нейронкой. Идея в том, чтобы оставить у функций только профильные зоны ответственности и не грузить логику вставки данных вычислительной логикой
Собираем полный список данных заранее
Открываем транзакцию
Внутри неё нарезаем чанки и вставляем в БД
try:
# собираем batch до транзакции (код выше)
# ...
with SessionLocal() as db, db.begin(): # открываем сессию и транзакцию
stmt = select(FileJobsModel).where(FileJobsModel.job_id == job_id)
job = db.execute(stmt).scalar_one_or_none()
job.status = JobStatusEnum.VALIDATING.value
# вставка чанков
chunk_size = config.db.chunk_size # размер чанка из конфига
for i in range(0, len(batch), chunk_size):
chunk = batch[i : i + chunk_size]
db.execute(insert(FileRowsModel).values(chunk))
except SQLAlchemyError as e:
pass
Плюсы — разделение зон ответственности: сбор и валидация данных выполняются до транзакции, код взаимодействия с БД только нарезает и инсертит. Транзакции короче, локи и конкуренция за коннекты ниже.
Минусы — аналогично варианту выше из-за икселя, как источника данных. А также память O(n). Если файл содержит десятки тысяч строк и много ст��лбцов/листов, на слабой ноде может случиться OOM. Нужны надёжные политики ретраев, хелс-чеки и обработка рестартов воркера.
Я пока остановился на третьем варианте для нашей задачи. Поставим эмпирически размер чанка, чтобы сбалансировать скорость и хрупкость, настроим флоу для завалившихся инсертов и в путь. Конфликты уникальных ключей при вставке либо можно скипнуть с помощью `INSERT ... ON CONFLICT DO NOTHING/DO UPDATE`, либо использовать метод `Divide & Conquer` и уменьшить размер батча при повторной попытке инсерта. Это не такой простой механизм, как построчная вставка, но будем посмотреть.
```
def _insert_batch_or_split(
db: Session,
model: type,
rows: list[dict[str, Any]],
*,
min_batch_size: int = 1,
) -> None:
if not rows:
return
try:
with db.begin(): # отдельная транзакция для этого батча
db.execute(insert(model), rows)
logger.info("Успешная вставка батча", size=len(rows))
except IntegrityError as exc:
if len(rows) <= min_batch_size:
logger.error("Пропускаем невалидную строку в батче", row=rows[0], error=str(exc))
return
mid = len(rows) // 2
# Сплитуем и вызываем метод для суб-батчей
_insert_batch_or_split(db, model, rows[:mid], min_batch_size=min_batch_size)
_insert_batch_or_split(db, model, rows[mid:], min_batch_size=min_batch_size)
Итого
На вскидку можно предложить еще несколько вариантов решения теми или иными способами. У любых вариантов есть свои юзкейсы и ограничения. И это делает тему компромиссов и мышления полутонами в проектировании такой интересной. Задачи должны поощрять осознанный выбор следующего шага. А из осознанных шагов должны складываться надежные и полезные системы.
