TL;DR

Volga — open-source движок обработки данных, созданный как альтернатива Apache Spark и Apache Flink и ориентированный на требования real-time AI/ML систем: консистентное вычисление фичей между online и offline режимами, point-in-time корректные агрегации, длинные скользящие окна, а также ML-ориентированные функции, такие как top- и категориальные агрегации.

В статье рассматриваются мотивация и история разработки, архитектура системы и её ключевые компоненты, а также проводится сравнение с ML-ориентированными решениями (Chronon, OpenMLDB) и универсальными стриминговыми движками (Apache Flink, Apache Spark, Arroyo).

Содержание

  • Вступление

  • История проекта

  • Volga и Rust

  • Архитектура системы

  • Streaming и Batch режимы: Dataflow-модель, инкрементальные вычисления, Data Shuffle

  • Request режим: разделение Read/Write вычислений и Queryable State

  • Оконные агрегации: Tiling и кастомные функции (top, cate, cate_where)

  • Удалённое хранение стейта: разделение compute и storage

  • Сравнение с существующими системами

  • Текущее состояние и дальнейшее развитие

Вступление

Привет, Хабр!

Меня зовут Андрей Новицкий, я занимаюсь созданием Volga — движка обработки данных для real-time AI/ML задач.

Я веду блог (и дублирую в ТГ - https://t.me/realtimeaiml), где подробно пишу про процесс разработки системы, архитектурные решения и эксперименты с производительностью. Эта статья — собранный в одном месте материал с дополнительным контекстом для читателей Хабра, где мы познакомимся с проектом, проблемами которые он решает и техническими деталями.


Современные real-time AI/ML-системы (рекомендательные системы, antifraud, персонализированный поиск, RAG-пайплайны) опираются на фичи (признаки), вычисляемые из потоков событий — действия пользователей, транзакции, логи и т.д. Эти фичи обычно задаются как агрегаты по недавним событиям — «число покупок за последний час» или «средний чек за последние 7 дней». На практике это означает непрерывные скользящие оконные агрегации, которые являются базовым строительным блоком многих ML пайплайнов.

Такие системы создают ряд сложных инфраструктурных задач: необходимо поддерживать консистентные определения фичей и семантику их вычисления между online и offline пайплайнами, правильно делать backfill, обеспечивать point-in-time корректные агрегации как для обучения, так и для инференса, а также эффективно считать длинные окна при высоких скоростях поступления данных.

В open-source экосистеме решений пока немного — универсальные движки, такие как Apache Flink или Apache Spark, позволяют реализовать подобные пайплайны, но требуют много дополнительной инфраструктуры (промежуточного хранилища, serving-слоёв, DSL для описания фичей, одновременной оркестрации и поддержки нескольких движков), а также нетривиальной настройки производительности. Специализированные системы вроде Chronon и OpenMLDB только начинают появляться, и каждая из них так же имеет свои архитектурные ограничения (которые мы рассмотрим ниже). Как результат, уже формируется отдельный рынок специализированных managed-решений: Tecton.ai (приобретена Databricks), Fennel.ai (также приобретена Databricks), а также Chalk.ai и Zipline.ai.

История проекта

Изначально Volga начиналась как Python-система, построенная поверх Ray — распределённого Python-рантайма с экосистемой для AI/ML-задач. Основной акцент был на быстром cоздании прототипа для валидации идеи и сбора обратной связи от потенциальных пользователей, а так же на стратегическом «закрытии» недостающей части в экосистеме Ray — потоковых вычислений (streaming).

DataFrame, граф вычислений и операторы реализовывались на Python, Ray предоставлял рантайм для их исполнения и оркестрации (через Ray Actors и KubeRay), а также брал на себя инфраструктурные задачи: peer-to-peer транспорт, RPC-вызовы, управление зависимостями, интеграция с Kubernetes и другие низкоуровневые детали.

Это позволило достаточно быстро собрать рабочую систему, проверить ключевые идеи и также сфокусироваться на удобном Python-центрированном API для описания вычислений (один из ключевых value proposition системы) - про исходную мотивацию проекта можно почитать здесь.

По мере развития проекта начали появляться более сложные компоненты:

В результате система демонстрировала хорошую горизонтальную масштабируемость и убедительные экспериментальные результаты:

Однако со временем стало очевидно, что даже при оптимизации отдельных компонентов построение полноценного аналога современных data processing систем — особенно для batch-вычислений — требует отказа от Python в пользу более производительного языка с подходящей экосистемой. В итоге таким языком стал Rust.

Volga и Rust

За последние несколько лет Rust стал одним из ключевых языков для построения систем обработки данных — это во многом связано с его сочетанием высокой производительности, строгой модели владения памятью и безопасной конкурентности без GC, что позволяет строить высокопроизводительные системы с предсказуемым поведением и хорошей утилизацией ресурсов.

На этом фоне сформировалась целая волна data-систем на Rust:

Текущая версия Volga реализована на Rust с использованием асинхронного runtime на базе Tokio, а также ключевых строительных блоков: Apache DataFusion, Apache Arrow и SlateDB

DataFusion

Apache DataFusion — это движок выполнения SQL-запросов на Rust, предоставляющий парсинг SQL, построение планов выполнения и векторизованный execution engine для построения систем обработки данных.

Одним из важных аспектов DataFusion является его роль в формировании подхода к composable data systems — когда различные компоненты (парсинг, планирование, execution, storage) могут комбинироваться и переиспользоваться при построении новых систем. Вокруг DataFusion уже сформировалась экосистема (например, Ballista, Delta-rs и другие проекты), которая позволяет использовать его как базовый строительный блок.

В Volga это позволило перейти от кастомного Python DataFrame API к чистому SQL (с возможностью расширения через собственный DataFrame API DataFusion), используя SQL Query Planner для построения единого dataflow-графа, который применяется во всех режимах выполнения.

Использование SQL даёт привычный способ описания как общих пайплайнов обработки данных, так и преобразований фичей в ML-задачах. Кроме того, Volga использует DataFusion для работы со схемами, вычисления выражений и вспомогательных операций — таких как хеширование, оконные выражения и абстракции агрегаций, — расширяя его при этом кастомными оконными функциями и аккумуляторами.

Arrow

Apache Arrow — это стандарт колонночного представления данных в памяти, используемый во всей системе. Volga обрабатывает все данные в виде Arrow RecordBatch, и Apache DataFusion использует Arrow как основной формат выполнения.

Ключевая идея Arrow — унифицированный, language-agnostic формат данных в памяти, который может эффективно использоваться разными системами без сериализации и копирования. По сути, это своего рода «ABI для данных» между компонентами системы.

Обработка данных в виде columnar-батчей позволяет операторам работать значительно эффективнее, сохраняя при этом стриминговую модель исполнения. В отличие от построчной (row-based) обработки, колонночный формат лучше использует CPU-кеши и позволяет выполнять операции сразу над массивами значений используя SIMD.

Многие операторы используют compute kernels — оптимизированные векторизованные функции, работающие с колонночными массивами. Это снижает накладные расходы на обработку каждой записи и даёт существенный прирост производительности.

SlateDB

Одной из ключевых целей Volga является разделение вычислений и хранения (compute–storage separation): состояние (стейт) операторов хранится вне execution-воркеров, что позволяет независимо масштабировать вычисления и хранение, а также более эффективно перемещать данные, существенно снижая общее потребление ресурсов.

В качестве слоя для хранения состояния используется SlateDB — встраиваемое key–value хранилище на базе LSM-дерева, рассчитанное на работу поверх облачного объектного хранилища (например, S3). Это делает его хорошо подходящим для потоковых нагрузок, где преобладают операции записи.

Такой подход отличается от классических решений в стриминговых движках. Например, в Apache Flink состояние операторов обычно хранится локально на воркерах (часто с использованием RocksDB). Это даёт хорошую производительность, но усложняет масштабирование и восстановление: стейт привязан к конкретным узлам и требует перемещения при рескейлинге или failover.

В Volga стейт хранилище изначально проектируется как удалённое, что позволяет независимо масштабировать compute и storage и избегать перемещения больших объёмов данных между воркерами. При этом SlateDB остаётся embedded: он работает непосредственно внутри процесса воркера Volga, а не как отдельный сервис, что позволяет избежать лишних сетевых задержек на пути доступа к хранилищу.

Архитектура

Volga представляет собой распределённую Dataflow-систему, состоящую из Control Plane и runtime-части, включающей Master-узлы, Worker-узлы и операторы, выполняемые в виде параллельных задач (Tasks).

Control plane

Control plane предоставляет API для создания, обновления и удаления пайплайнов (jobs), а также отвечает за взаимодействие с окружением выполнения кластера (например, Kubernetes или Ray) и управление деплоем и распределением ресурсов для пайплайнов.

Метаданные пайплайнов и состояние их жизненного цикла хранятся в SQL-совместимом хранилище (например, PostgreSQL), что позволяет сохранять их состояние между перезапусками.

Master и Workers

Каждый пайплайн запускается как кластер в модели Master–Workers.

Master управляет жизненным циклом пайплайна: компилирует execution graph, распределяет задачи по воркерам, собирает телеметрию и координирует checkpointing.

Workers исполняют dataflow-граф: запускают операторы, хранят стейт (если это требуется) и обмениваются данными, используя gRPC streaming для межузлового взаимодействия и каналы Tokio для локальной коммуникации.

Operators, Tasks, Parallelism и Partitioning

Пайплайн компилируется в dataflow-граф операторов (operators), каждый из которых представляет преобразование одного или нескольких потоков данных: map/filter, join, оконные агрегации, а также источники (source) и приёмники (sink) данных.

Операторы могут быть:

  • stateless (без стейта)

  • stateful (со стейтом, сохраняемым в хранилище)

Task — это физическая единица выполнения оператора. Каждый таск запускается в собственном Tokio runtime (обеспечивая изоляцию ресурсов) и исполняется асинхронным актором (с использованием Kameo - https://github.com/tqwewe/kameo), вместе с соответствующими ему I/O-задачами Tokio.

Операторы могут выполняться с настраиваемым уровнем параллелизма (parallelism) — это означает, что несколько тасок могут исполнять один и тот же оператор одновременно.

Данные распределяются между тасками с использованием стратегий partitioning:

  • Forward — передача один-к-одному, когда параллелизм upstream и downstream совпадает

  • Hash — гарантирует, что записи с одинаковым ключом попадают в одну таску (используется для окон и join’ов)

  • RoundRobin — равномерно распределяет записи между тасками

Во время выполнения Control Plane поднимает рантайм пайплайна, Master компилирует запрос в execution graph и распределяет по воркерам которые начинают обработку данных, а Master координирует телеметрию и checkpointing.

Volga поддерживает три режима выполнения пайплайнов: Streaming, Batch и Request

Streaming + Batch режимы

Как универсальный движок обработки данных, Volga поддерживает как streaming-, так и batch-режимы выполнения. В AI/ML-пайплайнах streaming-режим используется для вычисления и непрерывного обновления фичей, выступая основой для request-режима (описан ниже), тогда как batch-режим применяется для расчёта обучающих датасетов.

Volga использует dataflow-модель, в которой вычисления представлены в виде ориентированного графа операторов, обрабатывающих данные. Операторы поддерживают свой стейт и обмениваются записями внутри графа, в то время как watermarks распространяют логическое время (event-time) по пайплайну - это позволяет поддерживать как непрерывные streaming-вычисления, так и обработку ограниченных (bounded) batch-данных переиспользуя логику операторов, что гарантирует консистентность результатов.

Streaming режим: watermarks, out-of-orderness и stateful processing

В streaming-режиме операторы обрабатывают события в соответствии с их логическим временем event-time. Одной из ключевых проблем является логическая неупорядоченность: события могут приходить не в порядке своего event-time. Это может происходить как на стороне источников, так и внутри системы — например, из-за сетевых задержек или перераспределения данных между воркерами.

Для решения этой проблемы Volga использует watermarks — оценки прогресса event-time внутри системы. Когда watermark достигает оператора, это означает, что более ранние события уже не ожидаются, и оператор может безопасно генерировать результаты или продвигать оконные вычисления.

Volga генерирует watermarks на основе предположения об ограниченной неупорядоченности (bounded out-of-orderness), аналогично подходу в Apache Flink. События допускаются с задержкой относительно их event-time в пределах настраиваемого окна; всё, что выходит за этот предел, считается «поздними» событиями и на текущий момент отбрасывается.

Операторы, такие как окна и join’ы, являются stateful — они сохраняют промежуточные данные для корректного вычисления результата. Например, оконные агрегации хранят записи, относящиеся к текущему окну, а join’ы удерживают данные с обеих сторон до тех пор, пока не появятся соответствующие совпадения.

Fault tolerance, чекпоинты и exactly-once семантика

Volga обеспечивает отказоустойчивость с помощью распределённого checkpointing, основанного на алгоритме Chandy–Lamport.

Master периодически посылает запрос источникам вставлять специальные сообщения-метки (checkpoint barriers) в dataflow-граф — по мере их распространения, stateful-операторы сохраняют свой стейт, формируя консистентный снимок (checkpoint) всего графа. В случае сбоя воркера пайплайн восстанавливается из последнего checkpoint.

Если источники поддерживают точное воспроизведение данных (например, с использованием offset’ов в Apache Kafka), Volga гарантирует семантику exactly-once: даже если события повторно проигрываются при восстановлении, итоговый стейт и выходные данные будут идентичны выполнению без сбоев.

Инкрементальные вычисления, поддержка обновлений и Differential Dataflow

Текущая модель вычислений в Volga ориентирована на append-only данные — поток событий, в котором записи только добавляются, что хорошо подходит для многих сценариев (логи, транзакции и т.д.). Однако при работе с Change Data Capture (CDC) источниками это предположение нарушается: источник начинает генерировать сообщения об изменении данных (обновления и удаления), агрегаты перестают быть монотонными, ранее вычисленные результаты могут требовать пересчёта, а watermark больше не гарантирует, что «прошлое закрыто».

Changelog-потоки и retract (подход в духе Flink)

Один из путей — расширить текущую dataflow-модель до работы с changelog-потоками, как это реализовано в Apache Flink. В этом случае записи между операторами представляются не просто как события, а как изменения с типом (INSERT, UPDATE, DELETE или пары UPDATE_BEFORE / UPDATE_AFTER).

Этот подход требует чтобы каждый оператор поддерживал инкрементальное обновление стейта: умел применять изменения и при необходимости откатывать ранее учтённые значения (retract), тогда и результаты становятся changelog-потоком, а не append-only выводом. Такой подход позволяет сохранить текущую архитектуру с dataflow и watermarks, но усложняет реализацию: операторы должны хранить дополнительный стейт, а некоторые агрегации (например, top-k или min/max) требуют более сложной логики.

Дифференциальные вычисления (Differential Dataflow)

Альтернативный подход — использовать модель дифференциальных вычислений Differential Dataflow. Здесь данные представляются как поток изменений (diff’ов), где каждая запись имеет вес (например, +1 или -1), а обновления выражаются как комбинация удаления старого значения и добавления нового.

Вычисления формулируются через алгебру над весами: изменения складываются и компенсируются, а в операциях вроде join веса комбинируются (например, через умножение), что позволяет изменениям естественно распространяются по всему графу без отдельной логики retract в операторах.

Вместо event-time и watermarks используется более общая модель времени: каждая запись имеет логическую метку, а прогресс отслеживается через фронтиры (frontiers) — множество моментов времени, для которых ещё возможны обновления. В отличие от линейного watermark, это позволяет работать с частично упорядоченным временем.

В итоге стейт определяется накоплением изменений до заданного момента, а update/delete становятся естественной частью модели - такой подход более выразительный, но требует более глубокой инженерной доработки.

Batch режим

В batch-режиме тот же dataflow-граф выполняется над ограниченным набором данных. Пайплайн исполняется как последовательность стадий (stages), где каждая стадия обновляет стейт одного оператора, используя все данные, полученные на предыдущем этапе. Например:

sources (a+b) → join → window → sink

Стадии выполнения:

  • чтение всех данных источников и построение стейта join

  • эмиссия результатов join и обновление стейта окна

  • эмиссия результатов окна в sink

Каждая стадия сначала потребляет все входные данные, а затем отправляет специальный watermark конца входа (MAX_WATERMARK), который сигнализирует оператору завершить обработку стейта и выдать все оставшиеся результаты.

Использование одних и тех же операторов для streaming и batch обеспечивает консистентную семантику между режимами выполнения. Поддержка batch-режима в настоящий момент находится в стадии разработки.

Data Shuffle

Data Shuffle — это этап перераспределения по сети данных между задачами на разных физических воркерах (обычно по ключу), необходимый, например, перед агрегациями или join-операциями.

  • Streaming: сейчас data shuffle реализован через shared gRPC streaming каналы между воркерами (worker-to-worker), что близко к pipelined shuffle в Apache Flink. Проблема в том, что несколько downstream-задач могут делить один канал, и один медленный consumer может создавать backpressure для всех (head-of-line blocking). Возможные решения — добавить credit-based flow control (как во Flink) или перейти на отдельные gRPC-соединения на каждую пару задач (task-to-task), чтобы изолировать потребителей.

  • Batch: для batch-режима более подходящим выглядит отдельный shuffle-сервис с дисагрегацией compute и storage. Один из вариантов — S3-based (diskless) shuffle, где промежуточные данные материализуются в объектном хранилище и затем читаются downstream-задачами: похожие идеи используются в remote shuffle в Apache Spark (например, Apache Uniffle и Apache Celeborn), а также обсуждаются в экосистеме Apache Flink.

Request режим: разделение Read/Write вычислений и Queryable State

Request режим (режим выполнения входящих запросов) позволяет Volga напрямую отдавать результаты внешним системам, асинхронно читая стейт streaming-операторов и выполняя финальную агрегацию в момент запроса (например, для выдачи фичей при инференсе модели).

Дизайн основан на разделении вычислений на write-path и read-path: streaming-пайплайн непрерывно обновляет промежуточное состояние, а входящие запросы читают это состояние и вычисляют финальный результат - больше в блоге zipline.ai.

Типичная реализация предполагает материализацию промежуточного состояния (например, предагрегированных тайлов фиксированного размера) во внешнем hot storage (например, Redis или MongoDB). Поверх этого работает отдельный stateless serving-слой, который обрабатывает запросы, читая состояние и выполняя финальную агрегацию. Такой подход используется в системах вроде Chronon (на базе Apache Flink или Apache Spark с MongoDB) и в ранних версиях Volga.

Volga развивает эту идею, встраивая request-time вычисления прямо в dataflow-граф и работая напрямую со стейтом операторов. Входящие запросы рассматриваются как дополнительный поток данных и обрабатываются отдельным оператором, который читает стейт соседних streaming-операторов и выполняет финальную агрегацию. Обновления стрима и запросы обрабатываются асинхронно, что сохраняет разделение read/write, при этом позволяя избежать материализации во внешние системы и снизить задержку за счёт прямого доступа к стейту.

Для запроса

В обычном streaming или batch режиме планировщик строит стандартный dataflow-граф

В request mode этот граф дополняется несколькими компонентами

  • RequestSource — точка входа для внешних запросов (сервисный интерфейс, очередь запросов, ожидание ответа, использует axum)

  • WindowRequestOperator — обрабатывает запросы, читая стейт WindowOperator по ключу и выполняя агрегацию используя ту же логику, что и в streaming/batch режиме; при этом сам WindowOperator в Request режиме только обновляет свой стейт и не эмитит результаты

  • RequestSink — возвращает результат обратно в RequestSource, замыкая request–reply цикл

Такой дизайн позволяет переиспользовать одно и то же состояние и логику агрегаций во всех режимах (streaming, batch и request), сохраняя serving-путь полностью внутри движка.

Открытым остаётся вопрос обеспечения высокой доступности (high availability). В частности, при падении stateful-оператора — как система может продолжать обслуживать запросы (пусть и с немного устаревшими данными), пока streaming-пайплайн восстанавливается из чекпоинта и догоняет актуальное состояние, перечитывая данные из источника.

Оконные агрегации

Оконные агрегации в Volga выполняются специализированным оператором (WindowOperator), который поддерживает состояние окна отдельно для каждого ключа. Входной поток партиционируется по ключу (hash partitioning), и каждая партиция обрабатывается независимо, при этом окно непрерывно сдвигается по мере поступления новых данных.

По мере попадания строк в окно оператор обновляет агрегирующие аккумуляторы. Когда окно сдвигается и строки выходят за его границы, их вклад удаляется — если агрегат поддерживает retraction. В связи с этим Volga разделяет агрегаты на два типа: retractable (например avg, count, sum), которые поддерживают добавление и удаление значений и хранят только текущее состояние, и plain агрегаты (например min, max), которые не поддерживают retraction и поэтому хранят все элементы внутри окна, чтобы при необходимости пересчитать результат.

Эмиссия результатов управляется watermark’ами, которые отражают прогресс event-time и позволяют корректно обрабатывать out-of-order данные. По мере продвижения watermark оператор эмитит обновлённые результаты для тех частей окна, которые считаются завершёнными.

Кастомные функции для ML задач

Вдохновляясь OpenMLDB, помимо стандартных агрегатов (min, max, avg, sum, count), Volga предоставляет набор SQL-нативных оконных функций, ориентированных на типичные паттерны feature engineering в ML. Внутри эти функции реализованы поверх интерфейса User Defined Window Function (UDWF) из Apache DataFusion: каждая функция хранит своё состояние в виде window-аккумулятора, который инкрементально обновляется при добавлении и удалении строк из окна.

Top-подобные функции

Частый паттерн в feature engineering — определение доминирующих или наиболее частых значений в окне. Для этого Volga предоставляет встроенные функции:

  • top(col, k) — возвращает top-k значений

  • topn_frequency(col, k) — возвращает k самых частых значений вместе с их частотами

  • top1_ratio(col) — возвращает долю самого частого значения

Внутри аккумулятор хранит компактное состояние частот. Для topn_frequency это map частот + небольшой heap для извлечения top-k. Для top1_ratio хранится максимальная частота и общее число элементов в окне. Благодаря инкрементальному обновлению при входе/выходе строк из окна система избегает полного пересчёта статистик.

Аналогичный расчёт в Apache Flink SQL обычно требует многошагового запроса (оконная агрегация + ranking через ROW_NUMBER() и фильтрация). В Volga это выражается одной функцией, что упрощает описание фичей.

Агрегации по категориям

Ещё один распространённый паттерн — агрегации по категориям внутри окна. Volga поддерживает categorical-варианты стандартных агрегатов: sum_cate, count_cate, avg_cate, min_cate, max_cate, а также условные версии с where и cate_where. Это позволяет напрямую выражать запросы вроде «сумма по категориям» или «количество событий по категориям при выполнении условия» внутри оконной агрегации.

Реализация использует compute kernels из Apache Arrow для векторизованной фильтрации и группировки, а само состояние хранится в аккумуляторе DataFusion. Внутри используется hash map по значению категории, которая инкрементально обновляется при движении окна.

Оптимизация длинных окон и Tiling

Для AI/ML нагрузок критична поддержка длинных окон — от дней до месяцев или даже лет. В сочетании с непрерывным сдвигом окна и высокой частотой записей это приводит к высокой вычислительной нагрузке как на write-, так и на read- пути. Традиционные стриминговые системы вроде Apache Flink и Apache Spark не имеют встроенных оптимизаций для такого сценария.

Системы вроде Tecton и Chronon решают это через Tiling: на этапе записи вычисляются предагрегированные значения для фиксированных интервалов времени (tiles), а при запросе нужные тайлы комбинируются для получения результата. Это позволяет избежать сканирования большого объёма сырых данных.

Из документации Chronon
Из документации Chronon

Volga реализует этот подход нативно внутри window-оператора. Тайлы являются частью состояния окна и обновляются во время стриминга. И streaming, и request-запросы используют их для вычисления финального результата, что позволяет эффективно поддерживать длинные окна без внешнего промежуточного хранения.

Удаленное хранение стейта

Проектирование хранилища стейта операторов требует учёта ряда ограничений, чтобы система эффективно работала во всех режимах: streaming, batch и request.

Ключевые требования

  • Разделение Compute и Storage — стейт хранится удалённо, что позволяет независимо масштабировать compute и storage и не перемещать данные при checkpoint/restore. В системах вроде Apache Flink или Apache Spark, стейт часто физически хранится на машинах воркеров, что может приводить к неэффективному использованию ресурсов при больших размерах.

  • Разные паттерны доступа на чтение и запись

    • Batch mode — крупные одноразовые записи и последующее полное чтение стейта, взаимодействие с shuffle service.

    • Streaming mode — частые мелкие записи и последовательные синхронные чтения (пишет и читает один оператор).

    • Request mode — асинхронные чтения и записи из разных операторов.

  • Time-aware доступ к данным — данные приходят почти упорядоченно по event-time с возможными отклонениями; чтения должны всегда возвращать корректно отсортированное по времени представление для заданного окна.

  • Чтение по частям — стейт окна может включать сами ивенты, индексы и тайлы; для больших окон необходимо загружать только нужные данные (например, тайлы и минимальный набор ивентов).

  • Изоляция ресурсов — на одном воркере могут крутится множество логически отдельных тасок, поэтому буферы и кэши должны иметь лимиты и справедливо делить ресурсы между ними (чтобы одна жадная таска все не выела).

Ключевые компоненты

Входящие события сначала попадают в in-memory буфер WriteBuffer, который батчит обновления и асинхронно записывает их в удалённое персистентное KV-хранилище. Размер буфера настраивается и может быть отключён в batch-режиме.

Для эффективного доступа по времени, данные сортируются по event-time и разбиваются на фиксированные временные бакеты. Для каждого ключа поддерживается структура WindowState, содержащая индекс (BucketIndex), сопоставляющий бакеты с идентификаторами батчей в KV-хранилище.

Также в WindowState хранятся тайлы (tiles) — предагрегированные значения для фиксированных интервалов времени, используемые для эффективного вычисления длинных окон.

При чтении Volga сначала загружает WindowState, затем с помощью BucketIndex и тайлов определяет необходимые батчи для заданного временного диапазона и загружает только их. Поскольку батчи уже отсортированы, итоговое представление формируется через их слияне (merge), что вычислительно очень дёшево. Так же есть опция включить фоновую таску которая будет периодически объединять батчи внутри бакетов, уменьшая фрагментацию.

Компонент MemoryGovernor отслеживает использование памяти (буферы и кэши) на уровне задач и при необходимости инициирует backpressure или сброс данных из памяти в хранилище.

Персистентное хранилище

Персистентное хранилище реализовано с помощью SlateDB — embedded KV-хранилища, используещее LSM-дерево поверх объектного хранилища (например, S3) как durable-слой. Так же есть опция использования библиотеки Foyer для локального in-memory+disk кэширования - такой подход обеспечивает высокую пропускную способность записи и низкую задержку чтения.

Поскольку все данные в итоге делегируются объектному хранилищу, replication и durability обеспечиваются на уровне S3, что упрощает систему и избавляет от необходимости реализовывать собственный механизм репликации.

SlateDB работает с версионированными данными, что делает checkpoint дешёвым: вместо копирования состояния можно просто зафиксировать версию. Это также упрощает восстановление пайплайна по сравнению с Apache Flink, где часто требуется перенос по сети больших объёмов данных.

В результате получается storage-слой на базе LSM-дерева с merge-on-read, time-bucketed организацией данных поверх объектного хранилища, обеспечивающий масштабируемость, высокую пропускную способность и низкие задержки.

Открытые вопросы

Слой хранения всё ещё находится в разработке, и остаётся ряд открытых вопросов:

  • можно ли упростить двухшаговое чтение (state+index → batches), используя range scan’ы LSM-дерева?

  • как организовать concurrency control в request-mode, когда чтения пересекаются с фрагментированной записью и удалением данных из окна разными операторами

  • как перераспределять стейт при рескейлинге пайплайна, учитывая что каждый воркер имеет свой экземпляр SlateDB (single-writer модель)

  • где выполнять compaction — на уровне воркера, пайплайна или централизованно?

  • оптимально ли LSM-дерво для data shuffle сервиса в batch-нагрузке, где данные записываются один раз и читаются целиком?

Сравнение с существующими системами

Перечисленные ниже системы значительно более зрелые production-ready решения, в то время как Volga находится в разработке, поэтому сравнение фокусируется на архитектурных решениях, а не на полноте функциональности.

vs Flink / Arroyo

Все три системы — это streaming-движки на основе dataflow-модели с поддержкой event-time и watermark’ов.

Apache Flink и Arroyo — это general-purpose движки с широкой поддержкой SQL. Volga же в первую очередь фокусируется на непрерывных оконных агрегациях, которые доминируют в real-time ML пайплайнах (на данный момент у нас нет поддержки общего GroupBy, так же мы подразумеваем наличие времени как измерения в данных). Основноые отличия:

  • В отличие от event-by-event обработки в Apache Flink, Volga использует векторизованные вычисления на базе Apache Arrow, что позволяет существенно снизить накладные расходы и повысить пропускную способность, особенно в batch режиме.

  • Ещё одно ключевое отличие — архитектура хранилища стейта. Volga изначально проектируется с удалённым state (remote state storage), что позволяет реализовать разделение compute и storage.

  • Volga включает ML-ориентированные функции (top, cate, where), которые сложнее и менее естественно выразимы во Flink-подобном SQL диалекте (через ROW_NUMBER / RANK).

  • Volga также предоставляет нативный Request-режим, в котором внешние запросы напрямую обращаются к стейту операторов внутри dataflow графа. В архитектурах на базе Flink или Arroyo такой serving-слой обычно приходится реализовывать отдельно.

С архитектурной точки зрения Volga ближе всего к Arroyo: обе системы написаны на Rust и используют Apache DataFusion для SQL-планирования. Однако Arroyo опирается на встроенную реализацию окон в DataFusion (WindowAggExec), тогда как Volga реализует собственные window-операторы, оптимизированные под удаленной хранилище, а не под in-memory batch-модель DataFusion.

vs Chronon

Обе системы ориентированы на вычисление ML-фичей в реальном времени и используют модель разделения вычислений на write-path и read-path: streaming-пайплайн поддерживает промежуточное состояние, а запросы читают его для получения финального результата.

Основное отличие — композиция системы. Chronon строится как набор независимых под-систем (Flink, Spark, внешнее KV-хранилище вроде MongoDB или Redis и отдельный serving-слой). Volga, напротив, стремится быть единым standalone рантаймом на Rust, объединяя стриминг, хранение и serving внутри одной системы. При этом Chronon наследует ограничения управления streaming стейтом, характерные для Flink-подобных систем.

Ещё одно отличие — язык описания фичей: Volga использует SQL, тогда как Chronon опирается на собственный DSL.

vs OpenMLDB

OpenMLDB — это real-time база данных, оптимизированная под ML-запросы, где основная часть вычислений происходит в read-path. Volga, напротив, разделяет вычисления между write-path (streaming ingestion) и read-path (обработка запросов).

Ключевые архитектурные различия:

  • Рантайм

    • Volga: streaming ingestion (через нативные источники) + асинхронные request-запросы

    • OpenMLDB: bulk ingest (внешний процесс записи) + вычисления в момент запроса

  • Хранилище

    • Volga: удалённое state-хранилище на базе object storage (например, S3)

    • OpenMLDB: локальное хранение (in-memory + диск) рядом с compute

На практике обе системы могут достигать сопоставимой задержки для запросов, но делают разные архитектурные компромиссы.

Так как Volga использует watermark-based streaming, свежесть данных зависит от настройки watermark’ов: более агрессивные настройки уменьшают задержку, но увеличивают вероятность late events, которые в текущей реализации просто отбрасываются. Улучшение работы с late events — одно из направлений развития.

Durability также реализована по-разному: в Volga она обеспечивается через S3-backed стейт без необходимости собственного механизма репликации.

Дополнительно, системы отличаются языками реализации и поддержкой batch-режима:

  • Язык реализации: Volga (Rust) vs OpenMLDB (C++)

  • Batch execution: Volga имеет нативный batch-режим, OpenMLDB использует Apache Spark

Текущее состояние и дальнейшее развитие

Volga всё ещё находится в разработке, однако основные компоненты распределённого рантайма уже реализованы: control plane, master/workers, исполнение тасок, partitioning, планирование SQL-запросов, watermark’и и checkpointing. Поддерживаются streaming и request режимы, включая отдельный execution path для запросов.

В системе уже есть базовые операторы (Kafka/Parquet sources and sinks, projection/filtering, оконные агрегации), оптимизации для окон (tiling, ML-ориентированные агрегаты) и observability (Prometheus + API для програмного чтения метрик). Реализована базовая работа со стейтом (in-memory с заготовкой под remote storage), ведётся интеграция с Kubernetes.

Планы на ближайшее время:

  • завершить Remote Storage

  • сделать Batch Shuffle Service

  • завершить Batch execution mode

  • завершить Kubernetes integration

  • реализовать Join Operator (Hash и Broadcast)

  • реализовать Python Client

  • реализовать Backfills (через оркестрацию коннекторов)

  • добавить Dashboard и UI

Идей и задач больше — см. issues. Если вам интересно поучаствовать или обсудить проект, буду рад обратной связи - больше информации в блоге и ТГ (https://t.me/realtimeaiml).