У тех, кто не кодит, бытует мнение, что у разработчиков на любую задачу есть какое-то одно решение — бест-практика. Достаточно почитать SO или, вот теперь, пообщаться с нейронкой и — вуаля — задача сделана. На практике, даже у такой задачи, как вставить данные в БД, есть варианты решения, каждый со своими плюсами и минусами. Делюсь небольшим примером.

Задача

Пилю мок-утилиту для тестирования сервиса на работе. В одной из задач по утилите нужно было воркеру построчно валидировать XLSX-файл и результат записывать в БД. Далее результат подхватывал следующий воркер и отправлял DTO во внешнее API.

Я решал, как делать вставку данных для первого воркера. Код ниже синхронный, так как воркер написан на синхронной либе rq. Синтаксис запросов в БД на SQLAlchemy.

Реализация один: построчное сохранение в БД

Это было моим mvp-решением в виду простоты. Алгоритм:

  1. Читаем листы файла

  2. Валидируем строки файла с помощью Pydantic

  3. Собираем список из строк на вставку

  4. Построчно коммитим в БД.

data_list: list[dict[str, Any]] = ...

try:
    with SessionLocal() as db, db.begin(): # Контекстный менеджер сессии и транзакции
      for row in data_list: # Итерируем список с данными и каждую строку вставляем
        stmt = ...
        db.execute(stmt)
        
except SQLAlchemyError as e:
    ...
  • Плюсы: простота. Легкость в обработке построчных исключений. Устойчивость к битым данным: аффектятся только битые строки, их сразу можно пометить, как проблемные. Остальное запишем.

  • Минусы: низкое быстродействие при больших объемах данных. Файлы пока у нас 1000 - 20 000 строк. Загружать файлы при тестировании будет вся команда QA. Дальше нагрузка может быть больше, если утилиту будем использовать в проде как источник данных.

Батчинг

Метод я решил отрефачить на вставку батчами — пачками данных, по 500–1000 строк за транзакцию. Таких транзакций может быть пара десятков Postgres и стенд спокойно это переварят. Наша команда владеет данными, может оперативно отлаживать битые строки.

Наполняем батч через валидацию Pydantic-схемы:

batch: list[dict[str, Any]] = []
for sheet_name, rows in data.items():  # вытаскиваем название листа и строки файла
    for row in rows:  # итерируемся по каждой строке
        insert_obj = FileRowsInsertSchema(
            job_id=job_id,
            sheet_name=sheet_name,
            row_num=row.row_num,
            row_data=row.row_data,
            status=row.row_status,
            errors=row.errors,
        )
        batch.append(insert_obj.model_dump())  # собираем список отвалидированных словарей для вставки

Из данных XLSX-файла берём листы и строки каждого листа, кастим строки в Pydantic-модель и дампаем их в словари. Получаем список словарей, готовый к вставке в таблицу БД. В какой момент формировать батч на вставку? Основных вариантов два: во время транзакции и до.

Реализация два: собирать батчи во время транзакции

  1. Открываем транзакцию

  2. Наполняем список отвалидированными данными

  3. По мере наполнения чанка, вставляем его в БД

  4. Очищаем батч и повторяем для следующего чанка

  5. После завершения цикла не забываем вставить остаток, размер которого меньше чанка

try:
    with SessionLocal() as db, db.begin():
        # помечаем джоб как VALIDATING
        stmt = select(FileJobsModel).where(FileJobsModel.job_id == job_id)
        job = db.execute(stmt).scalar_one_or_none()
        job.status = JobStatusEnum.VALIDATING.value

        # наполняем батч внутри транзакции
        ...

                # как только батч наполнен, вставляем в БД и чистим батч
                if len(batch) >= config.db.batch_size:
                    db.execute(insert(FileRowsModel).values(batch))
                    batch.clear()
        # когда чанки закончились, заливаем в БД остаток
        if batch:
            db.execute(insert(FileRowsModel).values(batch))

except SQLAlchemyError as e:
    ...
  • Плюсы. У способа низкий риск out-of-memory: мы управляем размером списка, регулярно его очищаем. Подходит для нод с ограниченными ресурсами, например для тестовых стендов.

  • Минусы — хрупкость. В основном, из-за того, что иксель ненадежный источник данных. В упавшем батче нужно искать проблемные строки и решать, что с ними делать. Плюс, потенциально долгая транзакция: мы делаем валидацию, сборку батча и проверки внутри одной транзакции, держим локи и соединение с БД дольше. А также смешение зон ответственности. Код транзакции отвечает за валидацию данных.

Реализация три: собрать полный список на вставку до транзакции

К этому варианту пришел пообщавшись с нейронкой. Идея в том, чтобы оставить у функций только профильные зоны ответственности и не грузить логику вставки данных вычислительной логикой

  1. Собираем полный список данных заранее

  2. Открываем транзакцию

  3. Внутри неё нарезаем чанки и вставляем в БД

try:
    # собираем batch до транзакции (код выше)
    # ...

    with SessionLocal() as db, db.begin():  # открываем сессию и транзакцию
        stmt = select(FileJobsModel).where(FileJobsModel.job_id == job_id)
        job = db.execute(stmt).scalar_one_or_none()
        job.status = JobStatusEnum.VALIDATING.value

        # вставка чанков
        chunk_size = config.db.chunk_size  # размер чанка из конфига
        for i in range(0, len(batch), chunk_size):
            chunk = batch[i : i + chunk_size]
            db.execute(insert(FileRowsModel).values(chunk))

except SQLAlchemyError as e:
    pass
  • Плюсы — разделение зон ответственности: сбор и валидация данных выполняются до транзакции, код взаимодействия с БД только нарезает и инсертит. Транзакции короче, локи и конкуренция за коннекты ниже.

  • Минусы — аналогично варианту выше из-за икселя, как источника данных. А также память O(n). Если файл содержит десятки тысяч строк и много ст��лбцов/листов, на слабой ноде может случиться OOM. Нужны надёжные политики ретраев, хелс-чеки и обработка рестартов воркера.

Я пока остановился на третьем варианте для нашей задачи. Поставим эмпирически размер чанка, чтобы сбалансировать скорость и хрупкость, настроим флоу для завалившихся инсертов и в путь. Конфликты уникальных ключей при вставке либо можно скипнуть с помощью `INSERT ... ON CONFLICT DO NOTHING/DO UPDATE`, либо использовать метод `Divide & Conquer` и уменьшить размер батча при повторной попытке инсерта. Это не такой простой механизм, как построчная вставка, но будем посмотреть.

```

def _insert_batch_or_split(
    db: Session,
    model: type,
    rows: list[dict[str, Any]],
    *,
    min_batch_size: int = 1,
) -> None:
    if not rows:
        return
    try:
        with db.begin():  # отдельная транзакция для этого батча
            db.execute(insert(model), rows)
            logger.info("Успешная вставка батча", size=len(rows))
    except IntegrityError as exc:
        if len(rows) <= min_batch_size:
            logger.error("Пропускаем невалидную строку в батче", row=rows[0], error=str(exc))
            return
        mid = len(rows) // 2
        # Сплитуем и вызываем метод для суб-батчей
        _insert_batch_or_split(db, model, rows[:mid], min_batch_size=min_batch_size)
        _insert_batch_or_split(db, model, rows[mid:], min_batch_size=min_batch_size)

Итого

На вскидку можно предложить еще несколько вариантов решения теми или иными способами. У любых вариантов есть свои юзкейсы и ограничения. И это делает тему компромиссов и мышления полутонами в проектировании такой интересной. Задачи должны поощрять осознанный выбор следующего шага. А из осознанных шагов должны складываться надежные и полезные системы.