У тех, кто не кодит, бытует мнение, что у разработчиков на любую задачу есть какое-то одно решение — бест-практика. Достаточно почитать SO или, вот теперь, пообщаться с нейронкой и — вуаля — задача сделана. На практике, даже у такой задачи, как вставить данные в БД, есть варианты решения, каждый со своими плюсами и минусами. Делюсь небольшим примером.
Задача
Пилю мок-утилиту для тестирования сервиса на работе. В одной из задач по утилите нужно было воркеру построчно валидировать XLSX-файл и результат записывать в БД. Далее результат подхватывал следующий воркер и отправлял DTO во внешнее API.
Я решал, как делать вставку данных для первого воркера. Код ниже синхронный, так как воркер написан на синхронной либе rq. Синтаксис запросов в БД на SQLAlchemy.
Реализация один: построчное сохранение в БД
Это было моим mvp-решением в виду простоты. Алгоритм:
Читаем листы файла
Валидируем строки файла с помощью Pydantic
Собираем список из строк на вставку
Построчно коммитим в БД.
data_list: list[dict[str, Any]] = ... try: with SessionLocal() as db, db.begin(): # Контекстный менеджер сессии и транзакции for row in data_list: # Итерируем список с данными и каждую строку вставляем stmt = ... db.execute(stmt) except SQLAlchemyError as e: ...
Плюсы: простота. Легкость в обработке построчных исключений. Устойчивость к битым данным: аффектятся только битые строки, их сразу можно пометить, как проблемные. Остальное запишем.
Минусы: низкое быстродействие при больших объемах данных. Файлы пока у нас 1000 - 20 000 строк. Загружать файлы при тестировании будет вся команда QA. Дальше нагрузка может быть больше, если утилиту будем использовать в проде как источник данных.
Батчинг
Метод я решил отрефачить на вставку батчами — пачками данных, по 500–1000 строк за транзакцию. Таких транзакций может быть пара десятков Postgres и стенд спокойно это переварят. Наша команда владеет данными, может оперативно отлаживать битые строки.
Наполняем батч через валидацию Pydantic-схемы:
batch: list[dict[str, Any]] = [] for sheet_name, rows in data.items(): # вытаскиваем название листа и строки файла for row in rows: # итерируемся по каждой строке insert_obj = FileRowsInsertSchema( job_id=job_id, sheet_name=sheet_name, row_num=row.row_num, row_data=row.row_data, status=row.row_status, errors=row.errors, ) batch.append(insert_obj.model_dump()) # собираем список отвалидированных словарей для вставки
Из данных XLSX-файла берём листы и строки каждого листа, кастим строки в Pydantic-модель и дампаем их в словари. Получаем список словарей, готовый к вставке в таблицу БД. В какой момент формировать батч на вставку? Основных вариантов два: во время транзакции и до.
Реализация два: собирать батчи во время транзакции
Открываем транзакцию
Наполняем список отвалидированными данными
По мере наполнения чанка, вставляем его в БД
Очищаем батч и повторяем для следующего чанка
После завершения цикла не забываем вставить остаток, размер которого меньше чанка
try: with SessionLocal() as db, db.begin(): # помечаем джоб как VALIDATING stmt = select(FileJobsModel).where(FileJobsModel.job_id == job_id) job = db.execute(stmt).scalar_one_or_none() job.status = JobStatusEnum.VALIDATING.value # наполняем батч внутри транзакции ... # как только батч наполнен, вставляем в БД и чистим батч if len(batch) >= config.db.batch_size: db.execute(insert(FileRowsModel).values(batch)) batch.clear() # когда чанки закончились, заливаем в БД остаток if batch: db.execute(insert(FileRowsModel).values(batch)) except SQLAlchemyError as e: ...
Плюсы. У способа низкий риск out-of-memory: мы управляем размером списка, регулярно его очищаем. Подходит для нод с ограниченными ресурсами, например для тестовых стендов.
Минусы — хрупкость. В основном, из-за того, что иксель ненадежный источник данных. В упавшем батче нужно искать проблемные строки и решать, что с ними делать. Плюс, потенциально долгая транзакция: мы делаем валидацию, сборку батча и проверки внутри одной транзакции, держим локи и соединение с БД дольше. А также смешение зон ответственности. Код транзакции отвечает за валидацию данных.
Реализация три: собрать полный список на вставку до транзакции
К этому варианту пришел пообщавшись с нейронкой. Идея в том, чтобы оставить у функций только профильные зоны ответственности и не грузить логику вставки данных вычислительной логикой
Собираем полный список данных заранее
Открываем транзакцию
Внутри неё нарезаем чанки и вставляем в БД
try: # собираем batch до транзакции (код выше) # ... with SessionLocal() as db, db.begin(): # открываем сессию и транзакцию stmt = select(FileJobsModel).where(FileJobsModel.job_id == job_id) job = db.execute(stmt).scalar_one_or_none() job.status = JobStatusEnum.VALIDATING.value # вставка чанков chunk_size = config.db.chunk_size # размер чанка из конфига for i in range(0, len(batch), chunk_size): chunk = batch[i : i + chunk_size] db.execute(insert(FileRowsModel).values(chunk)) except SQLAlchemyError as e: pass
Плюсы — разделение зон ответственности: сбор и валидация данных выполняются до транзакции, код взаимодействия с БД только нарезает и инсертит. Транзакции короче, локи и конкуренция за коннекты ниже.
Минусы — аналогично варианту выше из-за икселя, как источника данных. А также память O(n). Если файл содержит десятки тысяч строк и много столбцов/листов, на слабой ноде может случиться OOM. Нужны надёжные политики ретраев, хелс-чеки и обработка рестартов воркера.
Я пока остановился на третьем варианте для нашей задачи. Поставим эмпирически размер чанка, чтобы сбалансировать скорость и хрупкость, настроим флоу для завалившихся инсертов и в путь. Конфликты уникальных ключей при вставке либо можно скипнуть с помощью `INSERT ... ON CONFLICT DO NOTHING/DO UPDATE`, либо использовать метод `Divide & Conquer` и уменьшить размер батча при повторной попытке инсерта. Это не такой простой механизм, как построчная вставка, но будем посмотреть.
```
def _insert_batch_or_split( db: Session, model: type, rows: list[dict[str, Any]], *, min_batch_size: int = 1, ) -> None: if not rows: return try: with db.begin(): # отдельная транзакция для этого батча db.execute(insert(model), rows) logger.info("Успешная вставка батча", size=len(rows)) except IntegrityError as exc: if len(rows) <= min_batch_size: logger.error("Пропускаем невалидную строку в батче", row=rows[0], error=str(exc)) return mid = len(rows) // 2 # Сплитуем и вызываем метод для суб-батчей _insert_batch_or_split(db, model, rows[:mid], min_batch_size=min_batch_size) _insert_batch_or_split(db, model, rows[mid:], min_batch_size=min_batch_size)
Итого
На вскидку можно предложить еще несколько вариантов решения теми или иными способами. У любых вариантов есть свои юзкейсы и ограничения. И это делает тему компромиссов и мышления полутонами в проектировании такой интересной. Задачи должны поощрять осознанный выбор следующего шага. А из осознанных шагов должны складываться надежные и полезные системы.
