PM: Нам нужно актуализировать базу знаний для ИИ-ассистента,
там изменилась инструкция по смене пароля.
DevOps: Не проблема, сейчас запущу скрипт, через два часа всё обновится.
Предупреди Заказчика о недоступности сервиса.

Знакомая ситуация? Полная зачистка векторной базы и реиндексация всех имеющихся документов с остановкой сервиса - решение простое и надёжное, но «прощается» только на этапе прототипа.

В продакшене могут быть сотни тысяч документов, живые пользователи и SLA с требованием по доступности сервиса. Обновилась одна, пусть и очень важная инструкция, и сервис недоступен два часа. А если таких обновлений десятки в неделю? Давайте исправим это и напишем ETL-скрипт, который умеет добавлять, обновлять и удалять отдельные документы без остановки сервиса.

Нам потребуется Qdrant. На мой взгляд это отличный движок для векторного поиска с открытым исходным кодом, удобным API и интеграцией. Установить его можно через docker compose или использовать бинарник. У меня версия 1.14.1, не самая новая, но привычка. Если захотите более свежую, не забудьте про версию qdrant-клиента в окружении.

В репозитории туториала есть ноутбук с кодом и тестами, requirements.txt для установки окружения и короткая инструкция в README.

Чтобы упростить код я отказался от сегментации документов, в туториале один документ равен одному чанку. Как добавить сегментацию в код расскажу в конце статьи, в разделе «Замечания по масштабированию».

Для экспериментов я обычно использую тексты в жанре киберпанк. На этот раз тестовые документы будут содержать фрагменты синтетического законопроекта Директива 401. «О лицензировании корпоративного биологического присутствия и коммерческой эксплуатации белковых носителей». Заранее прошу не удивляться формулировкам.

Архитектура и логика

Система инкрементального обновления это ETL-скрипт на python. Qdrant используется через экземпляр класса QdrantVectorStore из LangChain. Ещё есть рекурсивная функция и две таблицы на pandas. И конечно хеширование. В качестве хеша - md5 дайджест полного пути к файлу с именем. Такой вариант позволяет не загружать сам документ в процессе обхода файловой системы.

Логика работы основана на статусах документа: new, update, delete. Статусы хранятся в мастер-таблице. После успешной загрузки документа в хранилище статус меняется на load, а при удалении документа запись о нём удаляется из мастер-таблицы.

логика работы ETL-скрипта
логика работы ETL-скрипта

Начнём с векторного хранилища. Инициализируем клиент. Векторы в Qdrant хранятся в именованных коллекциях. Клиент нужен чтобы создать коллекцию или удалить. Далее определяем эмбеддинг и создаём экземпляр векторного хранилища.

ETL-процесс включает четыре шага.

Первый шаг - рекурсивный обход директорий с документами. На входе - путь к корневой папке, на выходе таблица с перечнем файлов и их атрибутами: полный путь к файлу, имя файла, хеш-имя и дата модификации, а также поле для хранения статуса, пока пустое. Можно отказаться от рекурсии и сделать обход на базе функции os.walk. Вариант есть в ноутбуке, он закомментирован.

Код для рекурсивного обхода директорий
lst = list()
def list_dir(root: str, lst: List[Any]) -> None:
    """ Recursive traversing over directories """
    for node in os.listdir(root):
        if os.path.isdir(os.path.join(root, node)):
            node = os.path.join(root, node)
            list_dir(node, lst)  # Recursion
        else:
            path = os.path.join(root, node)
            path = path.replace('\\', '/')
            full_path = path.replace(f'{ROOT_DOC_PATH}/', '')
            files_dict = {
                'full_path': full_path,
                'file_name': node,
                'hash_name': md5(full_path.encode('utf-8')).hexdigest(),
                'modified': datetime.fromtimestamp(os.path.getmtime(path)),
                'status': ' ',
            }
            lst.append(files_dict)
    return None

  def scan_dir(directory: str) -> bool:
    """ Recursive scan over directories (file folder) """
    print('Files scanning...')
    lst = list()
    # os.walk implementation is here
    list_dir(directory, lst)  # comment this if os.walk used
    if lst:
        files_data = pd.DataFrame(lst)
        files_data.to_csv(os.path.join(STORAGE_PATH,  'files_data.csv'), sep=';', index=False)
        print(f'Recursive traversing Complete, total files: {files_data.shape[0]}')       
        return True
    else:
        print(f'Recursive traversing Complete. No documents found')       
        return False

Второй шаг - расчёт инкремента обновления или так называемой дельты. На входе таблица с результатами сканирования и мастер таблица. Если это первый запуск, то мастер таблицы ещё нет, и она создаётся копированием таблицы с результатами сканирования. На выходе — мастер-таблица с проставленными статусами (new, update, delete). Статусы определяются по следующей схеме:

1. Строим два списка с хеш-именами, new_hashes на основе сканирования, old_hashes на базе мастер-таблицы:

new_hashes = list(files_data.loc[:, 'hash_name'])
old_hashes = list(master_data.loc[:, 'hash_name'])

2. Помечаем в мастер-таблице статусом delete все документы, хеш-имена которых отсутствуют в списке new_hashes

master_data.loc[~master_data.hash_name.isin(new_hashes), 'status'] = 'delete'

3. Обходим мастер-таблицу в цикле по строкам, пропуская документы со статусом delete, сравниваем даты модификаций для документов с одинаковым хеш-именем из таблицы файлов и мастер-таблицы. Если дата в таблице файлов больше даты в мастер-таблице, устанавливаем статус update и записываем новую дату в мастер-таблицу.

4. Делаем выборку из таблицы файлов для документов, хеш-имена которых отсутствуют в мастер-таблице, устанавливаем для них статус new и добавляем этот срез в мастер-таблицу.

new_data = files_data.loc[~files_data.hash_name.isin(old_hashes), :].copy()
new_data.status = 'new'
master_data = pd.concat([master_data, new_data], axis=0, ignore_index=True)

Весь код расчёта дельты
def create_delta() -> bool:
    """ Updating delta calculation"""
    print('\nIncrement calculation...')
    
    try:  # Attempt to load the scan data
        files_data = pd.read_csv(os.path.join(STORAGE_PATH, 'files_data.csv'), sep=';')
    except FileNotFoundError:
        print(f'Files data not found, exit')
        return False
    
    try:  # Attempt to load master data
        master_data = pd.read_csv(os.path.join(STORAGE_PATH, 'master_data.csv'), sep=';')
    except FileNotFoundError:
        print(f'Master data not found, create new')
        master_data = files_data.copy()  # Copy master data from scan data
        master_data.status = 'new'
        master_data.to_csv(f'{STORAGE_PATH}/master_data.csv', sep=';', index=False)
    
    new_hashes = list(files_data.loc[:, 'hash_name'])
    old_hashes = list(master_data.loc[:, 'hash_name'])

    # Check for deleted files
    master_data.loc[~master_data.hash_name.isin(new_hashes), 'status'] = 'delete'
        
    # Check for updated files
    for i in master_data.index:
        if master_data.loc[i, 'status'] == 'delete':
            continue
    
        master_mod_date = datetime.fromisoformat(master_data.loc[i, 'modified'])
        status = master_data.loc[i, 'status']
        hash_name = master_data.loc[i, 'hash_name']
        file_mod_date_str = files_data.loc[files_data.hash_name == hash_name, 'modified'].values[0]
        file_mod_date = datetime.fromisoformat(file_mod_date_str)
    
        if file_mod_date > master_mod_date:
            master_data.loc[i, 'status'] = 'update'
            master_data.loc[i, 'modified'] = file_mod_date_str

    # Check for new files
    new_data = files_data.loc[~files_data.hash_name.isin(old_hashes), :].copy()
    new_data.status = 'new'
    master_data = pd.concat([master_data, new_data], axis=0, ignore_index=True)

    master_data.to_csv(f'{STORAGE_PATH}/master_data.csv', sep=';', index=False)
    print('Current master data saved')

    print(f"Deleted files: {master_data[master_data.status == 'delete'].shape[0]}")
    print(f"Updated files: {master_data[master_data.status == 'update'].shape[0]}")
    print(f"New files: {master_data[master_data.status == 'new'].shape[0]}")
    return True

Третий шаг - загрузка новых и обновляемых документов. На входе - мастер-таблица. Если в ней нет записей со статусами new, update, delete, процесс останавливается.

Файлы документов загружаются по полному пути из мастер-таблицы, а затем целиком помещаются в контейнер Document из LangChain с необходимым набором метаданных, в частности с хеш-именем. На выходе - бинарный файл с python-списком контейнеров.

Код загрузки новых и обновленных документов
def download_docs() -> bool:
    """ Create list of new and updated documents """
    print('\nLoad docs...')
    try:
        master_data = pd.read_csv(os.path.join(STORAGE_PATH, 'master_data.csv'), sep=';')
    except FileNotFoundError:
        print(f'Master data not found, exit')
        return False

    # Check delta
    delta_size = master_data[master_data.status.isin(['new', 'update', 'delete'])].shape[0]
    if not delta_size:
        print(f'There is nothing to update, exit')
        return False
    
    docs = []
    for i in master_data.index:
        status = master_data.loc[i, 'status']
        full_path = master_data.loc[i, 'full_path']
        if status in ['new', 'update']:
            
            # Loading document(s) text
            with open(os.path.join(ROOT_DOC_PATH, full_path), 'r', encoding='utf-8') as handle:
                doc_text = handle.read()
                doc_title = ''
                if re.search(r'# (.*)\n', doc_text):
                    doc_title = re.search(r'# (.*)\n', doc_text).group(1)
            
            # Chunking should be here
            # splits = ...
            doc = Document(
                page_content=doc_text,
                metadata={
                    'title': doc_title,
                    'hash_name': master_data.loc[i, 'hash_name'],
                    'mod_date': master_data.loc[i, 'modified'],
                    'size': len(doc_text),
                    'status': master_data.loc[i, 'status'],
                }
            )
            docs.append(doc)  # Comment this if splits are used
            # docs.extend(splits)  # if chunking is used
    with open(os.path.join(STORAGE_PATH, 'documens.pkl'), 'wb') as handle:  # Save in binary
        pickle.dump(docs, handle)
    print(f"New and updated docs: {len(docs)}")
    return True

Четвертый шаг - обновление коллекции векторного хранилища. На входе - мастер-таблица и бинарный файл с контейнерами документов. Документы удаляются или добавляются в хранилище по их ID, в качестве которого используется хеш-имя из мастер-таблицы. Если документ с таким хеш-именем уже есть в коллекции, он заново векторизируется и сохраняется на место старого вектора. Так работает обновление по документу. Превращение документа в вектор реализовано внутри объекта vector_store.

Код обновления векторного хранилища
def vstore_update(vector_store: QdrantVectorStore) -> bool:
    """ Update the vectors """
    print('\nUpdating the vector store...')
    try:
        master_data = pd.read_csv(os.path.join(STORAGE_PATH, 'master_data.csv'), sep=';')
    except FileNotFoundError:
        print(f'Master data not found, exit')
        return False
    try:
        with open(os.path.join(STORAGE_PATH, 'documens.pkl'), 'rb') as handle:
            docs = pickle.load(handle)
    except FileNotFoundError:
        print(f'Documents not found, exit')
        return False

    # Create a lookup table to find a document by its hash-name.
    lookup = dict()
    for i in range(len(docs)):
        doc = docs[i]
        lookup[doc.metadata.get('hash_name', 'NF')] = i
    
    deleting = 0
    loading = 0
    for i in master_data.index:
        status = master_data.loc[i, 'status']
        hash_name = master_data.loc[i, 'hash_name']

        # Deleting
        if status == 'delete':
            # If chunking is used:
            # Don't forget to remove the splits for the updated document, 
            # otherwise the content will be duplicated.
            # Here is a cycle through document splits, 
            # or you can use a batch of splits if the documents are small in size.
            try:
                vector_store.delete(ids=[hash_name])
                master_data.loc[i, 'status'] = 'delete'
                deleting += 1
            except Exception as ex:
                print(f"Error while deleting: {ex}")
            
        # Adding new and updating vectors
        if status in ['new', 'update']:
            # Here is a cycle through document splits if chunking is used...
            try:
                doc = docs[lookup[hash_name]]
                vector_store.add_documents(documents=[doc], ids=[hash_name])
                master_data.loc[i, 'status'] = 'load'
                loading += 1
            except Exception as ex:
                print(f"Error while updating: {ex}")

    # Drop records with "delete" status
    master_data = master_data.loc[master_data.status != 'delete', :]
    master_data.to_csv(f'{STORAGE_PATH}/master_data.csv', sep=';', index=False)
    print('Current master data saved')
    print(f"Deleted files: {deleting}")
    print(f"Loaded files: {loading}")
    return True

Собственно, ETL-скрипт это тоже функция, которая последовательно запускает описанные шаги, если шаг возвращает «ложь», процесс останавливается.

Код ETL функции
def updating_script(vector_store: QdrantVectorStore) -> bool:
    scan_state = scan_dir(directory=ROOT_DOC_PATH)
    if not scan_state:
        return False

    delta_state = create_delta()
    if not delta_state:
        return False

    download_state = download_docs()
    if not download_state:
        return False

    update_state = vstore_update(vector_store=vector_store)
    if not download_state:
        return False
    return True

Давайте протестируем то, что у нас получилось.

Сценарий I. Первоначальная загрузка документов

Создадим два документа, один из которых сохраним в корневой папке, а второй в папке yaf (yet another folder) внутри корневой. Так сможем проверить обход директорий. Для демонстрации я подготовил функцию, которая возвращает метаданные первых десяти векторов в коллекции:

def collection_scroll(client: QdrantClient, collection_name: str = "Collection_D401") -> None:
    points, _ = client.scroll(
        collection_name="Collection_D401",
        limit=10, # Number of points to retrieve per page
        offset=None,
        with_payload=True,
        with_vectors=False
    )
    for p in points:
        for k, v in p.payload.get('metadata').items():
            print(f"{k}: {v}")
        print()

Запускаем скрипт, получаем вот такой лог:

Files scanning...
Recursive traversing Complete, total files: 2

Increment calculation...
Master data not found, create new
Current master data saved
Deleted files: 0
Updated files: 0
New files: 2

Load docs...
New and updated docs: 2

Updating the vector store...
Current master data saved
Deleted files: 0
Loaded files: 2
CPU times: total: 3.44 s
Wall time: 701 ms
True

Смотрим на коллекцию:

collection_scroll(client=qdrant_client)

title: Статья 15. Запрет на самолечение и неавторизованную регенерацию
hash_name: 2121124bad94b5b0b8a6641d72d41292
mod_date: 2026-05-23 13:49:06.028412
size: 537
status: new

title: Статья 14. Запрет на несанкционированное саморазвитие и био-взлом
hash_name: 62fc20a3e20bb7b23efac3fe3ee05d08
mod_date: 2026-05-23 13:49:06.027412
size: 601
status: new

Этот сценарий можно запустить повторно (без повторной загрузки текстов), процесс должен остановиться с сообщением «Nothing to update, exit».

Лог скрипта
Files scanning...
Recursive traversing Complete, total files: 2

Increment calculation...
Current master data saved
Deleted files: 0
Updated files: 0
New files: 0

Load docs...
There is nothing to update, exit

False

Как всё вернуть в исходное состояние, если потребуется?

- через клиент удалите коллекцию Collection_D401 в Qdrant
- в проектной папке удалите директории data (корневая папка) и storage (папка с промежуточными результатами ETL)

Сценарий II. Новый документ

Создаем новый документ и сохраняем его в ту же папку yaf. Запускаем скрипт.

Лог скрипта
Files scanning...
Recursive traversing Complete, total files: 3

Increment calculation...
Current master data saved
Deleted files: 0
Updated files: 0
New files: 1

Load docs...
New and updated docs: 1

Updating the vector store...
Current master data saved
Deleted files: 0
Loaded files: 1
CPU times: total: 1.58 s
Wall time: 340 ms

True

Сценарий III. Удаляем документ

Давайте удалим документ из корневой папки:

os.remove(os.path.join(ROOT_DOC_PATH, "D401A14.md"))
Лог скрипта
Files scanning...
Recursive traversing Complete, total files: 2

Increment calculation...
Current master data saved
Deleted files: 1
Updated files: 0
New files: 0

Load docs...
New and updated docs: 0

Updating the vector store...
Current master data saved
Deleted files: 1
Loaded files: 0
CPU times: total: 31.2 ms
Wall time: 80.9 ms

True

Сценарий IV. Обновление документа

Добавим текста в D401A16.md и запустим скрипт.

До обновления:
title: Статья 16. Регламент фильтрации сенсорного восприятия и управления доступом
hash_name: 193226b849538bae28eae4e9e716dce5
mod_date: 2026-05-20 13:02:05.476812
size: 563
status: new
После обновления:
title: Статья 16. Регламент фильтрации сенсорного восприятия и управления доступом
hash_name: 193226b849538bae28eae4e9e716dce5
mod_date: 2026-05-20 13:14:33.882040
size: 1002
status: update

Видим, что время модификации, размер и статус изменились.

Лог скрипта
Files scanning...
Recursive traversing Complete, total files: 2

Increment calculation...
Current master data saved
Deleted files: 0
Updated files: 1
New files: 0

Load docs...
New and updated docs: 1

Updating the vector store...
Current master data saved
Deleted files: 0
Loaded files: 1
CPU times: total: 2.66 s
Wall time: 506 ms

True

Замечания по масштабированию

1. Для обхода папок файловой системы я предпочитаю использовать os.walk. Рекурсивная функция поможет, если вместо файловой системы будет облачное хранилище. Главное, чтобы клиент хранилища поддерживал функции list_dir, is_dir или is_file. Тогда изменения в коде рекурсивной функции будут минимальны. Проверено на NextCloud и YAD. Также потребуется скорректировать операции по загрузке документов.

2. Код работает только c текстами в формате txt или markdown. Парсинг и конвертация - отдельная большая тема. Потребуется дополнительный шаг в ETL-скрипте сразу после калькуляции дельты. Я рекомендую конвертировать новые и обновленные документы в markdown, а затем сохранять их в отдельную папку c хеш-именем вместо названия файла (это можно и нужно делать в несколько потоков). И уже из этой папки загружать на следующем шаге.

3. Функции не используют внешние переменные в основном скрипте, всё взаимодействие организовано через файловую систему, то есть первая функция сохраняет файл, следующая его считывает. Это сделано специально, чтобы можно было быстро построить направленный ациклический граф (DAG), и реализовать ETL в AirFlow.

4. Про чанкинг - я пометил комментариями соответствующие места в функции загрузки документов, где должен быть реализован этот механизм. Нужно учитывать, что каждый фрагмент должен иметь свой уникальный id, обычно это uuid4 из библиотеки uuid. Фрагменты связаны с документом через хеш-имя в метаданных. При обновлении фрагментов одного и того же документа у них будут другие id и векторы в итоге задублируются. Поэтому придётся добавить в процедуру обновления полное удаление фрагментов обновляемого документа.

5. Удалять фрагменты удобно через функцию фильтрации метаданных Qdrant (пример фильтрации есть в конце ноутбука). Добавлять новые можно по одному, если эмбеддинг работает на CPU, списком или батчами, если есть поддержка CUDA.

6. В коде ETL-скрипта не предусмотрена проверка дублирования документов. Если один и тот же документ записать в разные папки, обе копии будут загружены. Проблема может быть решена разными способами. Самый простой - проверять на дублирование прямо в ретривере. Вариант посложнее - отслеживать уникальность файлов при сканировании.

7. В реальной системе лучше использовать asyncio и асинхронные функции для работы с векторным хранилищем (aadd_documents, adelete). Класс QdrantVectorStore поддерживает асинхронное взаимодействие.


Инкрементальное обновление векторной базы по отдельным документам занимает около секунды на документ (у меня ~300 мс на чанк с эмбеддингом deepvk/USER-bge-m3 на CPU). Мы работаем только с инкрементом, поэтому экономим часы компьюта на парсинг и векторизацию всего массива документов.

При высокой динамике обновления (например, логи или сообщения в чате) существует ненулевая вероятность, что в момент обновления релевантные векторы будут отсутствовать в коллекции. Но технически ничего не сломается. Просто LLM выдаст некорректный ответ. Так как векторное хранилище «не зачищается», вопрос остановки сервиса переходит в категорию оценки качества ответа. Я знаю несколько проектов, где сервис не останавливают, а обновление проводят в периоды наименьшей нагрузки, запуская ежедневно по планировщику.

А как в ваших проектах устроено обновление RAG-системы? Останавливаете сервис?
Напишите пожалуйста в комментариях.

Полезные ссылки:
LangChain + Qdrant
Фильтрация в Qdrant
Концепция DAG в Airflow
Код туториала