Представьте, что у вас есть многослойный пайплайн обработки данных.

  • Слой 0 — сырые события: цена тика, действие пользователя, показание датчика.

  • Слой 1 — агрегаты по инструментам: дельта, гамма, скор.

  • Слой 2 — агрегаты по секторам: риск на сектор, общая экспозиция.

  • Слой 3 — портфельные метрики: VaR, ожидаемая прибыль.

  • Слой 4 — enterprise-лимиты и алерты.

Ширина слоя — 5000 узлов. Количество слоёв — 60. Общее число узлов — 300 000.

Каждую секунду приходит 10 новых событий (изменений на входе). Наивный подход — пересчитать всё с нуля — будет перебирать все 300 000 узлов на каждое обновление. При 10 обновлениях в секунду это 3 млн вычислений узлов в секунду. А если ширина слоя 100 000 и слоёв 100? Получаем 10 млн узлов на пересчёт. Компьютер не справляется.

Классические подходы и их ограничения

Подход

Проблема

Полный пересчёт (recompute everything)

Экспоненциальный рост времени при увеличении графа

Триггеры в БД

Не работают для многослойных in-memory графов

Stream-процессоры (Flink, Kafka Streams)

Тяжёлые, не для in-memory иерархий

Кастомный кэш инвалидации

Сложно реализовать корректно, легко ошибиться

Требования к решению

  • Инкрементальный пересчёт: только затронутые узлы, а не все.

  • Минимальные аллокации в горячем пути.

  • Точные индексы для быстрых запросов (Fenwick, гистограммы, суммы).

  • Поддержка двух режимов обновления: SetValue (production) и Mutate (симуляции).

Решение: PhiFlow

PhiFlow — библиотека для .NET 8.0+, реализующая инкрементальные вычисления на слоистых графах фиксированной ширины.

dotnet add package PhiFlow --version 0.1.3

Ключевые идеи

  1. Interval Cone-of-Influence — для каждого обновления вычисляется минимальное множество затронутых узлов в виде непрерывных интервалов на каждом слое.

  2. Фиксированная ширина слоёв — упрощает индексацию и ускоряет доступ.

  3. Точные индексы (Fenwick, гистограммы, суммы) — без приближений и с вероятностными структурами вроде HyperLogLog.

  4. Два режима обновления — SetValue (замена значения) для бизнес-логики и Mutate (дельта) для симуляций.

Быстрый старт

Шаг 1. Создание графа

Определяем параметры:

  • width — количество узлов в каждом слое.

  • layers — количество слоёв.

  • domain — диапазон дискретных значений (0..DomainSize-1).

using PhiFlow;
int width = 5000;
int layers = 60;
int domain = 1024;
var rt = new PhiFlowRuntime(width, layers, domain);
// Рекомендуется: зарезервировать место под дельты
rt.Reserve(maxDeltaCount: 16);

Шаг 2. Подключение индексов

Индексы ускоряют запросы (CountGreater, RangeCount, Sum, TopKSum). Подключаются к нужному слою.

int lastLayer = layers - 1;
// Fenwick-индекс для быстрых CountGreater и RangeCount
rt.AttachIndex(lastLayer, new FenwickCountIndex(domain));
// Индекс для суммы значений
rt.AttachIndex(lastLayer, new SumIndex(width));
// Индекс для Top-K через гистограммы
rt.AttachIndex(lastLayer, new HistogramTopKIndex(domain, width));

Шаг 3. Инициализация входного слоя

Заполняем слой 0 начальными значениями.

var rnd = new Random(1);
int[] input = new int[width];
for (int i = 0; i < width; i++)
{
    input[i] = rnd.Next(domain);
}
rt.SetInput(input);
rt.BuildAll(kWork: 50); // полная сборка графа

Шаг 4. Применение обновлений

Вместо пересчёта всего графа передаём только изменившиеся входы.

var updates = new InputUpdate[]
{
    new InputUpdate(index: 10, value: 512),
    new InputUpdate(index: 123, value: 7),
    new InputUpdate(index: 2048, value: 999)
};
rt.ApplyInputUpdates(updates, kWork: 50);

Библиотека сама определяет, какие узлы затронуты (Interval Cone-of-Influence), и пересчитывает только их.

Шаг 5. Выполнение запросов

Благодаря индексам запросы выполняются мгновенно.

// Количество элементов на последнем слое > 500
long countGt = rt.CountGreater(lastLayer, threshold: 500);
// Количество элементов в диапазоне [100, 200)
long rangeCount = rt.RangeCount(lastLayer, loInclusive: 100, hiExclusive: 200);
// Сумма всех значений на слое
long sum = rt.Sum(lastLayer);
// Среднее значение
long avg = rt.Avg(lastLayer);
// Сумма топ-50 значений
long topKSum = rt.TopKSum(lastLayer, k: 50);

Полный рабочий пример

using PhiFlow;
public class RealTimeAnalyticsPipeline
{
    private readonly PhiFlowRuntime _runtime;
    private readonly int _lastLayer;
    
    public RealTimeAnalyticsPipeline(int width, int layers, int domain)
    {
        _runtime = new PhiFlowRuntime(width, layers, domain);
        _runtime.Reserve(maxDeltaCount: 32);
        _lastLayer = layers - 1;
        
        // Подключаем индексы для аналитики
        _runtime.AttachIndex(_lastLayer, new FenwickCountIndex(domain));
        _runtime.AttachIndex(_lastLayer, new SumIndex(width));
        _runtime.AttachIndex(_lastLayer, new HistogramTopKIndex(domain, width));
    }
    
    public void Initialize(int[] initialData)
    {
        _runtime.SetInput(initialData);
        _runtime.BuildAll(kWork: 50);
    }
    
    public void ProcessEvents(IEnumerable<InputUpdate> events)
    {
        var updates = events.ToArray();
        _runtime.ApplyInputUpdates(updates, kWork: 50);
    }
    
    public AnalyticsSnapshot GetSnapshot()
    {
        return new AnalyticsSnapshot
        {
            TotalCount = _runtime.Sum(_lastLayer),
            HighThresholdCount = _runtime.CountGreater(_lastLayer, 800),
            MidRangeCount = _runtime.RangeCount(_lastLayer, 200, 600),
            Top10Sum = _runtime.TopKSum(_lastLayer, 10)
        };
    }
}
public class AnalyticsSnapshot
{
    public long TotalCount { get; set; }
    public long HighThresholdCount { get; set; }
    public long MidRangeCount { get; set; }
    public long Top10Sum { get; set; }
}

Сценарии использования

1. FinTech: управление рисками

4 слоя: инструменты → сектора → портфель → enterprise-лимиты.

int width = 10000;  // 10k инструментов
int layers = 4;
int domain = 100000; // дискретные уровни экспозиции
var riskRuntime = new PhiFlowRuntime(width, layers, domain);
riskRuntime.AttachIndex(3, new FenwickCountIndex(domain)); // лимиты
// Пришло обновление цены на инструмент 42
var tickUpdate = new InputUpdate(42, newExposureValue);
riskRuntime.ApplyInputUpdates(new[] { tickUpdate }, kWork: 50);
// Мгновенный запрос: сколько секторов превысили лимит?
long breachedCount = riskRuntime.CountGreater(2, threshold: 10000);

2. GameDev: симуляция распространения влияния (эпидемия / социальные настроения)

6 слоёв: заражение в городе → риск для соседних городов → плотность заболевших → нагрузка на больницы → дефицит ресурсов → уровень паники.

int cities = 5000;
int layers = 6;
int domain = 100; // 0-100: процент заражённых / уровень паники
var epidemicRuntime = new PhiFlowRuntime(cities, layers, domain);
epidemicRuntime.AttachIndex(5, new SumIndex(cities));              // общий уровень паники
epidemicRuntime.AttachIndex(5, new HistogramTopKIndex(domain, cities)); // самые паникующие города
// Вирус мутировал в городе 123, заражённость выросла до 75%
epidemicRuntime.ApplyInputUpdates(new[] { new InputUpdate(123, 75) }, kWork: 50);
// Как изменился общий уровень паники по стране?
long totalPanic = epidemicRuntime.Sum(5); // сумма процентов паники по всем городам

3. IIoT: предиктивная аналитика

Датчики → станки → линии → заводы → регион.

int sensors = 20000;
int layers = 5;
int domain = 4096; // показания датчиков 0..4095
var iotRuntime = new PhiFlowRuntime(sensors, layers, domain);
iotRuntime.AttachIndex(4, new FenwickCountIndex(domain)); // алерты по регионам
// Датчик 5001 показал аномалию
var anomalyUpdate = new InputUpdate(5001, 3800);
iotRuntime.ApplyInputUpdates(new[] { anomalyUpdate }, kWork: 50);
// Сколько заводов в аномальной зоне?
long anomalousPlants = iotRuntime.CountGreater(3, threshold: 3500);

4. AdTech: real-time bidding

Импрессия → пользователь → сегмент → кампания → бюджет.

int users = 100000;
int layers = 5;
int domain = 100; // скор пользователя 0..99
var adRuntime = new PhiFlowRuntime(users, layers, domain);
adRuntime.AttachIndex(4, new FenwickCountIndex(domain)); // бюджетные лимиты
// Пользователь 42 совершил конверсию
var conversionUpdate = new InputUpdate(42, 95);
adRuntime.ApplyInputUpdates(new[] { conversionUpdate }, kWork: 50);
// Сколько сегментов превысили бюджетный порог?
long budgetBreached = adRuntime.CountGreater(3, threshold: 80);

Производительность

Бенчмарки на Intel Core i5-11400F, Windows 11, .NET 8.0, BenchmarkDotNet 0.15.8.

Параметры графа: ширина 5000, слоёв 60, домен 1024.

Сценарий

Без PhiFlow (полный пересчёт)

С PhiFlow

Ускорение

1 дельта, KWork=50

10.7 с

0.195 с

~55x

4 дельты, KWork=50

10.7 с

0.75 с

~14x

1 дельта, KWork=10

1.8 с

0.034 с

~53x

Что означают эти цифры:

  • При одном изменении на входе библиотека пересчитывает не все 300 000 узлов, а только интервал затронутых.

  • Чем меньше дельт относительно общего объёма графа, тем больше выигрыш.

  • Индексы добавляют ускорение для запросов: CountGreater, RangeCount, TopKSum выполняются за O(log domain) или O(1).

Сравнение с альтернативами

Характеристика

PhiFlow

Полный пересчёт

Stream processor (Flink)

ClickHouse

Инкрементальный пересчёт

✅ (interval cone)

Точные индексы

✅ (но не для per-event)

In-memory

❌ (Java/JVM)

❌ (диск)

Многослойные графы

✅ (родной)

Латентность на запрос

микросекунды

зависит

миллисекунды+

миллисекунды

Сложность внедрения

низкая

высокая

очень высокая

средняя

Пошаговая интеграция в проект

Шаг 1. Моделирование пайплайна

Определите, сколько у вас слоёв и какова ширина каждого. PhiFlow требует фиксированной ширины для всех слоёв — это упрощает индексацию.

Шаг 2. Выбор домена

Домен — это диапазон дискретных значений (0..DomainSize-1). Чем меньше домен, тем компактнее индексы Fenwick и гистограммы.

Шаг 3. Инициализация runtime

var runtime = new PhiFlowRuntime(width, layers, domain);
runtime.Reserve(maxDeltaCount: expectedUpdatesPerBatch);

Шаг 4. Подключение индексов к слоям, которые часто запрашиваются

if (needThresholdQueries)
    runtime.AttachIndex(layer, new FenwickCountIndex(domain));
if (needSumQueries)
    runtime.AttachIndex(layer, new SumIndex(width));
if (needTopKQueries)
    runtime.AttachIndex(layer, new HistogramTopKIndex(domain, width));

Шаг 5. Загрузка начальных данных

runtime.SetInput(initialData);
runtime.BuildAll(kWork: 50); // 50 — эвристика, подбирается под вашу топологию

Шаг 6. Приём обновлений

void OnInputChanged(int index, int newValue)
{
    var update = new InputUpdate(index, newValue);
    runtime.ApplyInputUpdates(new[] { update }, kWork: 50);
}

Шаг 7. Маршрутизация запросов через индексы

public long GetHighRiskCount(int threshold) =>
    runtime.CountGreater(riskLayer, threshold);

Два режима обновлений

SetValue (рекомендуется для production)

Заменяет значение узла на новое.

var update = new InputUpdate(index: 42, value: 512);
runtime.ApplyInputUpdates(new[] { update }, kWork: 50);

Mutation (для симуляций и тестирования)

Детерминированная мутация значения. Полезно, когда нужно воспроизвести последовательность изменений.

var mutation = new InputMutation(index: 42, delta: +5);
runtime.ApplyInputMutations(new[] { mutation }, kWork: 50);

Бесплатное тестирование — в рамках Community Edition. Коммерческое использование требует лицензии.

Где взять

NuGet: dotnet add package PhiFlow

GitHub (бенчмарки): https://github.com/likeslines-maker/PhiFlow

PhiFlow — это библиотека для инкрементальных вычислений на слоистых графах фиксированной ширины.

Она решает конкретную задачу: когда в многослойный пайплайн приходит небольшое количество обновлений, а вам нужно мгновенно получать точные агрегаты (CountGreater, RangeCount, Sum, TopK) на любом слое.

Библиотека не пытается заменить полноценные stream-процессоры или OLAP-базы. Она занимает свою нишу: in-memory, микросекундные латентности, точные индексы, минимальные аллокации.

Если ваш пайплайн из 60 слоёв пересчитывается за 10 секунд вместо 0.2 — возможно, вы просто считали не тем способом.