PM: Нам нужно актуализировать базу знаний для ИИ-ассистента,
там изменилась инструкция по смене пароля.
DevOps: Не проблема, сейчас запущу скрипт, через два часа всё обновится.
Предупреди Заказчика о недоступности сервиса.
Знакомая ситуация? Полная зачистка векторной базы и реиндексация всех имеющихся документов с остановкой сервиса - решение простое и надёжное, но «прощается» только на этапе прототипа.
В продакшене могут быть сотни тысяч документов, живые пользователи и SLA с требованием по доступности сервиса. Обновилась одна, пусть и очень важная инструкция, и сервис недоступен два часа. А если таких обновлений десятки в неделю? Давайте исправим это и напишем ETL-скрипт, который умеет добавлять, обновлять и удалять отдельные документы без остановки сервиса.
Нам потребуется Qdrant. На мой взгляд это отличный движок для векторного поиска с открытым исходным кодом, удобным API и интеграцией. Установить его можно через docker compose или использовать бинарник. У меня версия 1.14.1, не самая новая, но привычка. Если захотите более свежую, не забудьте про версию qdrant-клиента в окружении.
В репозитории туториала есть ноутбук с кодом и тестами, requirements.txt для установки окружения и короткая инструкция в README.
Чтобы упростить код я отказался от сегментации документов, в туториале один документ равен одному чанку. Как добавить сегментацию в код расскажу в конце статьи, в разделе «Замечания по масштабированию».
Для экспериментов я обычно использую тексты в жанре киберпанк. На этот раз тестовые документы будут содержать фрагменты синтетического законопроекта Директива 401. «О лицензировании корпоративного биологического присутствия и коммерческой эксплуатации белковых носителей». Заранее прошу не удивляться формулировкам.
Архитектура и логика
Система инкрементального обновления это ETL-скрипт на python. Qdrant используется через экземпляр класса QdrantVectorStore из LangChain. Ещё есть рекурсивная функция и две таблицы на pandas. И конечно хеширование. В качестве хеша - md5 дайджест полного пути к файлу с именем. Такой вариант позволяет не загружать сам документ в процессе обхода файловой системы.
Логика работы основана на статусах документа: new, update, delete. Статусы хранятся в мастер-таблице. После успешной загрузки документа в хранилище статус меняется на load, а при удалении документа запись о нём удаляется из мастер-таблицы.

Начнём с векторного хранилища. Инициализируем клиент. Векторы в Qdrant хранятся в именованных коллекциях. Клиент нужен чтобы создать коллекцию или удалить. Далее определяем эмбеддинг и создаём экземпляр векторного хранилища.
ETL-процесс включает четыре шага.
Первый шаг - рекурсивный обход директорий с документами. На входе - путь к корневой папке, на выходе таблица с перечнем файлов и их атрибутами: полный путь к файлу, имя файла, хеш-имя и дата модификации, а также поле для хранения статуса, пока пустое. Можно отказаться от рекурсии и сделать обход на базе функции os.walk. Вариант есть в ноутбуке, он закомментирован.
Код для рекурсивного обхода директорий
lst = list() def list_dir(root: str, lst: List[Any]) -> None: """ Recursive traversing over directories """ for node in os.listdir(root): if os.path.isdir(os.path.join(root, node)): node = os.path.join(root, node) list_dir(node, lst) # Recursion else: path = os.path.join(root, node) path = path.replace('\\', '/') full_path = path.replace(f'{ROOT_DOC_PATH}/', '') files_dict = { 'full_path': full_path, 'file_name': node, 'hash_name': md5(full_path.encode('utf-8')).hexdigest(), 'modified': datetime.fromtimestamp(os.path.getmtime(path)), 'status': ' ', } lst.append(files_dict) return None def scan_dir(directory: str) -> bool: """ Recursive scan over directories (file folder) """ print('Files scanning...') lst = list() # os.walk implementation is here list_dir(directory, lst) # comment this if os.walk used if lst: files_data = pd.DataFrame(lst) files_data.to_csv(os.path.join(STORAGE_PATH, 'files_data.csv'), sep=';', index=False) print(f'Recursive traversing Complete, total files: {files_data.shape[0]}') return True else: print(f'Recursive traversing Complete. No documents found') return False
Второй шаг - расчёт инкремента обновления или так называемой дельты. На входе таблица с результатами сканирования и мастер таблица. Если это первый запуск, то мастер таблицы ещё нет, и она создаётся копированием таблицы с результатами сканирования. На выходе — мастер-таблица с проставленными статусами (new, update, delete). Статусы определяются по следующей схеме:
1. Строим два списка с хеш-именами, new_hashes на основе сканирования, old_hashes на базе мастер-таблицы:
new_hashes = list(files_data.loc[:, 'hash_name']) old_hashes = list(master_data.loc[:, 'hash_name'])
2. Помечаем в мастер-таблице статусом delete все документы, хеш-имена которых отсутствуют в списке new_hashes
master_data.loc[~master_data.hash_name.isin(new_hashes), 'status'] = 'delete'
3. Обходим мастер-таблицу в цикле по строкам, пропуская документы со статусом delete, сравниваем даты модификаций для документов с одинаковым хеш-именем из таблицы файлов и мастер-таблицы. Если дата в таблице файлов больше даты в мастер-таблице, устанавливаем статус update и записываем новую дату в мастер-таблицу.
4. Делаем выборку из таблицы файлов для документов, хеш-имена которых отсутствуют в мастер-таблице, устанавливаем для них статус new и добавляем этот срез в мастер-таблицу.
new_data = files_data.loc[~files_data.hash_name.isin(old_hashes), :].copy() new_data.status = 'new' master_data = pd.concat([master_data, new_data], axis=0, ignore_index=True)
Весь код расчёта дельты
def create_delta() -> bool: """ Updating delta calculation""" print('\nIncrement calculation...') try: # Attempt to load the scan data files_data = pd.read_csv(os.path.join(STORAGE_PATH, 'files_data.csv'), sep=';') except FileNotFoundError: print(f'Files data not found, exit') return False try: # Attempt to load master data master_data = pd.read_csv(os.path.join(STORAGE_PATH, 'master_data.csv'), sep=';') except FileNotFoundError: print(f'Master data not found, create new') master_data = files_data.copy() # Copy master data from scan data master_data.status = 'new' master_data.to_csv(f'{STORAGE_PATH}/master_data.csv', sep=';', index=False) new_hashes = list(files_data.loc[:, 'hash_name']) old_hashes = list(master_data.loc[:, 'hash_name']) # Check for deleted files master_data.loc[~master_data.hash_name.isin(new_hashes), 'status'] = 'delete' # Check for updated files for i in master_data.index: if master_data.loc[i, 'status'] == 'delete': continue master_mod_date = datetime.fromisoformat(master_data.loc[i, 'modified']) status = master_data.loc[i, 'status'] hash_name = master_data.loc[i, 'hash_name'] file_mod_date_str = files_data.loc[files_data.hash_name == hash_name, 'modified'].values[0] file_mod_date = datetime.fromisoformat(file_mod_date_str) if file_mod_date > master_mod_date: master_data.loc[i, 'status'] = 'update' master_data.loc[i, 'modified'] = file_mod_date_str # Check for new files new_data = files_data.loc[~files_data.hash_name.isin(old_hashes), :].copy() new_data.status = 'new' master_data = pd.concat([master_data, new_data], axis=0, ignore_index=True) master_data.to_csv(f'{STORAGE_PATH}/master_data.csv', sep=';', index=False) print('Current master data saved') print(f"Deleted files: {master_data[master_data.status == 'delete'].shape[0]}") print(f"Updated files: {master_data[master_data.status == 'update'].shape[0]}") print(f"New files: {master_data[master_data.status == 'new'].shape[0]}") return True
Третий шаг - загрузка новых и обновляемых документов. На входе - мастер-таблица. Если в ней нет записей со статусами new, update, delete, процесс останавливается.
Файлы документов загружаются по полному пути из мастер-таблицы, а затем целиком помещаются в контейнер Document из LangChain с необходимым набором метаданных, в частности с хеш-именем. На выходе - бинарный файл с python-списком контейнеров.
Код загрузки новых и обновленных документов
def download_docs() -> bool: """ Create list of new and updated documents """ print('\nLoad docs...') try: master_data = pd.read_csv(os.path.join(STORAGE_PATH, 'master_data.csv'), sep=';') except FileNotFoundError: print(f'Master data not found, exit') return False # Check delta delta_size = master_data[master_data.status.isin(['new', 'update', 'delete'])].shape[0] if not delta_size: print(f'There is nothing to update, exit') return False docs = [] for i in master_data.index: status = master_data.loc[i, 'status'] full_path = master_data.loc[i, 'full_path'] if status in ['new', 'update']: # Loading document(s) text with open(os.path.join(ROOT_DOC_PATH, full_path), 'r', encoding='utf-8') as handle: doc_text = handle.read() doc_title = '' if re.search(r'# (.*)\n', doc_text): doc_title = re.search(r'# (.*)\n', doc_text).group(1) # Chunking should be here # splits = ... doc = Document( page_content=doc_text, metadata={ 'title': doc_title, 'hash_name': master_data.loc[i, 'hash_name'], 'mod_date': master_data.loc[i, 'modified'], 'size': len(doc_text), 'status': master_data.loc[i, 'status'], } ) docs.append(doc) # Comment this if splits are used # docs.extend(splits) # if chunking is used with open(os.path.join(STORAGE_PATH, 'documens.pkl'), 'wb') as handle: # Save in binary pickle.dump(docs, handle) print(f"New and updated docs: {len(docs)}") return True
Четвертый шаг - обновление коллекции векторного хранилища. На входе - мастер-таблица и бинарный файл с контейнерами документов. Документы удаляются или добавляются в хранилище по их ID, в качестве которого используется хеш-имя из мастер-таблицы. Если документ с таким хеш-именем уже есть в коллекции, он заново векторизируется и сохраняется на место старого вектора. Так работает обновление по документу. Превращение документа в вектор реализовано внутри объекта vector_store.
Код обновления векторного хранилища
def vstore_update(vector_store: QdrantVectorStore) -> bool: """ Update the vectors """ print('\nUpdating the vector store...') try: master_data = pd.read_csv(os.path.join(STORAGE_PATH, 'master_data.csv'), sep=';') except FileNotFoundError: print(f'Master data not found, exit') return False try: with open(os.path.join(STORAGE_PATH, 'documens.pkl'), 'rb') as handle: docs = pickle.load(handle) except FileNotFoundError: print(f'Documents not found, exit') return False # Create a lookup table to find a document by its hash-name. lookup = dict() for i in range(len(docs)): doc = docs[i] lookup[doc.metadata.get('hash_name', 'NF')] = i deleting = 0 loading = 0 for i in master_data.index: status = master_data.loc[i, 'status'] hash_name = master_data.loc[i, 'hash_name'] # Deleting if status == 'delete': # If chunking is used: # Don't forget to remove the splits for the updated document, # otherwise the content will be duplicated. # Here is a cycle through document splits, # or you can use a batch of splits if the documents are small in size. try: vector_store.delete(ids=[hash_name]) master_data.loc[i, 'status'] = 'delete' deleting += 1 except Exception as ex: print(f"Error while deleting: {ex}") # Adding new and updating vectors if status in ['new', 'update']: # Here is a cycle through document splits if chunking is used... try: doc = docs[lookup[hash_name]] vector_store.add_documents(documents=[doc], ids=[hash_name]) master_data.loc[i, 'status'] = 'load' loading += 1 except Exception as ex: print(f"Error while updating: {ex}") # Drop records with "delete" status master_data = master_data.loc[master_data.status != 'delete', :] master_data.to_csv(f'{STORAGE_PATH}/master_data.csv', sep=';', index=False) print('Current master data saved') print(f"Deleted files: {deleting}") print(f"Loaded files: {loading}") return True
Собственно, ETL-скрипт это тоже функция, которая последовательно запускает описанные шаги, если шаг возвращает «ложь», процесс останавливается.
Код ETL функции
def updating_script(vector_store: QdrantVectorStore) -> bool: scan_state = scan_dir(directory=ROOT_DOC_PATH) if not scan_state: return False delta_state = create_delta() if not delta_state: return False download_state = download_docs() if not download_state: return False update_state = vstore_update(vector_store=vector_store) if not download_state: return False return True
Давайте протестируем то, что у нас получилось.
Сценарий I. Первоначальная загрузка документов
Создадим два документа, один из которых сохраним в корневой папке, а второй в папке yaf (yet another folder) внутри корневой. Так сможем проверить обход директорий. Для демонстрации я подготовил функцию, которая возвращает метаданные первых десяти векторов в коллекции:
def collection_scroll(client: QdrantClient, collection_name: str = "Collection_D401") -> None: points, _ = client.scroll( collection_name="Collection_D401", limit=10, # Number of points to retrieve per page offset=None, with_payload=True, with_vectors=False ) for p in points: for k, v in p.payload.get('metadata').items(): print(f"{k}: {v}") print()
Запускаем скрипт, получаем вот такой лог:
Files scanning... Recursive traversing Complete, total files: 2 Increment calculation... Master data not found, create new Current master data saved Deleted files: 0 Updated files: 0 New files: 2 Load docs... New and updated docs: 2 Updating the vector store... Current master data saved Deleted files: 0 Loaded files: 2 CPU times: total: 3.44 s Wall time: 701 ms True
Смотрим на коллекцию:
collection_scroll(client=qdrant_client) title: Статья 15. Запрет на самолечение и неавторизованную регенерацию hash_name: 2121124bad94b5b0b8a6641d72d41292 mod_date: 2026-05-23 13:49:06.028412 size: 537 status: new title: Статья 14. Запрет на несанкционированное саморазвитие и био-взлом hash_name: 62fc20a3e20bb7b23efac3fe3ee05d08 mod_date: 2026-05-23 13:49:06.027412 size: 601 status: new
Этот сценарий можно запустить повторно (без повторной загрузки текстов), процесс должен остановиться с сообщением «Nothing to update, exit».
Лог скрипта
Files scanning... Recursive traversing Complete, total files: 2 Increment calculation... Current master data saved Deleted files: 0 Updated files: 0 New files: 0 Load docs... There is nothing to update, exit False
Как всё вернуть в исходное состояние, если потребуется?
- через клиент удалите коллекцию Collection_D401 в Qdrant
- в проектной папке удалите директории data (корневая папка) и storage (папка с промежуточными результатами ETL)
Сценарий II. Новый документ
Создаем новый документ и сохраняем его в ту же папку yaf. Запускаем скрипт.
Лог скрипта
Files scanning... Recursive traversing Complete, total files: 3 Increment calculation... Current master data saved Deleted files: 0 Updated files: 0 New files: 1 Load docs... New and updated docs: 1 Updating the vector store... Current master data saved Deleted files: 0 Loaded files: 1 CPU times: total: 1.58 s Wall time: 340 ms True
Сценарий III. Удаляем документ
Давайте удалим документ из корневой папки:
os.remove(os.path.join(ROOT_DOC_PATH, "D401A14.md"))
Лог скрипта
Files scanning... Recursive traversing Complete, total files: 2 Increment calculation... Current master data saved Deleted files: 1 Updated files: 0 New files: 0 Load docs... New and updated docs: 0 Updating the vector store... Current master data saved Deleted files: 1 Loaded files: 0 CPU times: total: 31.2 ms Wall time: 80.9 ms True
Сценарий IV. Обновление документа
Добавим текста в D401A16.md и запустим скрипт.
До обновления: title: Статья 16. Регламент фильтрации сенсорного восприятия и управления доступом hash_name: 193226b849538bae28eae4e9e716dce5 mod_date: 2026-05-20 13:02:05.476812 size: 563 status: new
После обновления: title: Статья 16. Регламент фильтрации сенсорного восприятия и управления доступом hash_name: 193226b849538bae28eae4e9e716dce5 mod_date: 2026-05-20 13:14:33.882040 size: 1002 status: update
Видим, что время модификации, размер и статус изменились.
Лог скрипта
Files scanning... Recursive traversing Complete, total files: 2 Increment calculation... Current master data saved Deleted files: 0 Updated files: 1 New files: 0 Load docs... New and updated docs: 1 Updating the vector store... Current master data saved Deleted files: 0 Loaded files: 1 CPU times: total: 2.66 s Wall time: 506 ms True
Замечания по масштабированию
1. Для обхода папок файловой системы я предпочитаю использовать os.walk. Рекурсивная функция поможет, если вместо файловой системы будет облачное хранилище. Главное, чтобы клиент хранилища поддерживал функции list_dir, is_dir или is_file. Тогда изменения в коде рекурсивной функции будут минимальны. Проверено на NextCloud и YAD. Также потребуется скорректировать операции по загрузке документов.
2. Код работает только c текстами в формате txt или markdown. Парсинг и конвертация - отдельная большая тема. Потребуется дополнительный шаг в ETL-скрипте сразу после калькуляции дельты. Я рекомендую конвертировать новые и обновленные документы в markdown, а затем сохранять их в отдельную папку c хеш-именем вместо названия файла (это можно и нужно делать в несколько потоков). И уже из этой папки загружать на следующем шаге.
3. Функции не используют внешние переменные в основном скрипте, всё взаимодействие организовано через файловую систему, то есть первая функция сохраняет файл, следующая его считывает. Это сделано специально, чтобы можно было быстро построить направленный ациклический граф (DAG), и реализовать ETL в AirFlow.
4. Про чанкинг - я пометил комментариями соответствующие места в функции загрузки документов, где должен быть реализован этот механизм. Нужно учитывать, что каждый фрагмент должен иметь свой уникальный id, обычно это uuid4 из библиотеки uuid. Фрагменты связаны с документом через хеш-имя в метаданных. При обновлении фрагментов одного и того же документа у них будут другие id и векторы в итоге задублируются. Поэтому придётся добавить в процедуру обновления полное удаление фрагментов обновляемого документа.
5. Удалять фрагменты удобно через функцию фильтрации метаданных Qdrant (пример фильтрации есть в конце ноутбука). Добавлять новые можно по одному, если эмбеддинг работает на CPU, списком или батчами, если есть поддержка CUDA.
6. В коде ETL-скрипта не предусмотрена проверка дублирования документов. Если один и тот же документ записать в разные папки, обе копии будут загружены. Проблема может быть решена разными способами. Самый простой - проверять на дублирование прямо в ретривере. Вариант посложнее - отслеживать уникальность файлов при сканировании.
7. В реальной системе лучше использовать asyncio и асинхронные функции для работы с векторным хранилищем (aadd_documents, adelete). Класс QdrantVectorStore поддерживает асинхронное взаимодействие.
Инкрементальное обновление векторной базы по отдельным документам занимает около секунды на документ (у меня ~300 мс на чанк с эмбеддингом deepvk/USER-bge-m3 на CPU). Мы работаем только с инкрементом, поэтому экономим часы компьюта на парсинг и векторизацию всего массива документов.
При высокой динамике обновления (например, логи или сообщения в чате) существует ненулевая вероятность, что в момент обновления релевантные векторы будут отсутствовать в коллекции. Но технически ничего не сломается. Просто LLM выдаст некорректный ответ. Так как векторное хранилище «не зачищается», вопрос остановки сервиса переходит в категорию оценки качества ответа. Я знаю несколько проектов, где сервис не останавливают, а обновление проводят в периоды наименьшей нагрузки, запуская ежедневно по планировщику.
А как в ваших проектах устроено обновление RAG-системы? Останавливаете сервис?
Напишите пожалуйста в комментариях.
Полезные ссылки:
LangChain + Qdrant
Фильтрация в Qdrant
Концепция DAG в Airflow
Код туториала
