Как стать автором
Обновить
СИГМА
Разработка и внедрение ИТ-решений в энергетике/ЖКХ

Как одна приоритетная очередь спасла наш биллинг от кэш-хаоса

Уровень сложностиСредний
Время на прочтение6 мин
Количество просмотров1.3K

Привет! Меня зовут Дмитрий Бандурин, я заместитель директора департамента биллинговых решений в компании «СИГМА». Моя команда регулярно выполняет нетривиальные задачи для стабильной работы высоконагруженных систем. Сегодня расскажу, как мы переработали логику обработки пакетных процессов в нашей системе массовых операций, на примере расчета дебиторской задолженности. Нам было необходимо, чтобы она справлялась с возрастающим объемом данных — и всё это в жестких временных рамках и в условиях многопоточности.

Как устроен механизм массовых операций

В основе всех процессов лежит наша система массовых операций. Она состоит из двух ключевых компонентов:

  • Генератор — создаёт задачи на основе заданных условий.

  • Исполнитель — берёт задачи из очереди и выполняет их.

Из связок «генератор — исполнитель» можно строить структуры исполнения, представляющие собой не просто цепочки, а направленные ациклические графы (DAG). Такие графы позволяют описывать зависимости между задачами, в том числе иерархические и вложенные отношения.

Например, в расчётах возможно использование объектов, включающих в себя другие объекты: один энергообъект может содержать вложенные энергообъекты, каждый из которых, в свою очередь, имеет собственные энергообъекты. Путем рекурсивного раскрытия таких вложений можно получить все зависимости.

Интерфейс настройки массовых операций
Интерфейс настройки массовых операций

Типичный сценарий массовой операции расчёта дебиторской задолженности:

  1. Обновление балансов.

  2. Активация событий (например, перевод просроченной задолженности в другую категорию).

  3. Итоговый расчёт задолженности.

> Важно: переход к следующему шагу возможен только после завершения всех зависимостей.

В многопоточном режиме исполнительные потоки берут задачи из очереди и обрабатывают их параллельно. При этом если число задач превышает количество потоков, избыточные задачи помещаются в очередь и ожидают выполнения.


Первый подход: масштабирование и рост потоков

С увеличением числа договоров и регионов, включённых в расчёты, появились проблемы со временем выполнения. Рабочее окно, отведённое на выполнение массовых операций, сократилось с 8 часов до 4. Система перестала укладываться в эти рамки.

Первое решение, которое мы попробовали — горизонтальное масштабирование:

  • Увеличили количество потоков.

  • Увеличили количество вычислительных узлов.

Важно: расширить инфраструктуру базы данных мы не могли. Это потребовало бы значительных инвестиций со стороны заказчика и в лучшем случае мощности можно было бы докупить только через год. Поэтому всё внимание пришлось сосредоточить на оптимизации самого ПО.

Что в итоге?

  • Нагрузка на CPU на расчётных нодах выросла.

  • База данных работала под постоянной нагрузкой, равной практически 100%.

  • Расчёт стал проходить быстрее, но не кардинально — мы по-прежнему не укладывались в отведённые 4 часа.

Нагрузка на БД
Нагрузка на БД
Нагрузка на БД
Нагрузка на БД

Начинаем копать глубже

Так как основное узкое место наблюдалось на уровне базы данных, первым делом мы сосредоточились на оптимизации SQL-запросов. Провели детальное профилирование: запускали расчёты по каждому договору, анализировали планы выполнения и нагрузку на ресурсы. Результаты показали: запросы уже написаны максимально эффективно. Локально — всё работает отлично.

Пришлось абстрагироваться от конкретных задач и посмотреть на весь процесс целиком. Когда мы начали анализировать системные метрики, стало ясно: основная проблема не в самих операциях, а в контексте их исполнения. В логах БД фиксировались массовые кэш-миссы — данные, которые недавно использовались, к моменту повторного обращения уже вытеснялись из кэша, и СУБД была вынуждена повторно загружать их с диска.

Чтобы понять, в чём именно была проблема, рассмотрим упрощённый пример

Допустим, у нас в обработке находится 500 000 договоров, а система располагает 1000 потоков исполнения. Генератор создаёт 500 000 задач первого уровня — например, расчёт балансов по каждому договору. Потоки начинают выполнять по одной задаче: каждый берёт по договору и приступает к расчёту.

Оставшиеся 499 000 задач встают в очередь и ждут, пока какой-нибудь поток освободится.

Как только поток завершает расчёт баланса по одному договору, он вызывает генератор второго уровня, который создаёт задачи на активацию событий по этому договору. Эти задачи попадают в конец общей очереди.

Что происходит дальше? Прежде чем мы доберёмся хотя бы до первой задачи активации, система должна обработать все оставшиеся 499 000 задач по расчёту баланса. Причём договоры между собой никак не связаны.

Именно в этом месте мы сталкиваемся с проблемой вытеснения из кэша. Информация, полученная на первом этапе, может быть довольно объёмной — особенно если требуется, допустим, загрузить историю оплат за 10 лет. Пока очередь дойдёт до задачи активации событий, данные по договору уже вытесняются из кэша, и БД вынуждена будет запрашивать их заново, теперь уже из холодного хранилища. Это создаёт кэш-миссы и значительно увеличивает общее время обработки информации.


Переход к обходу вглубь

Разбираясь с проблемой кэш-миссов(Вытеснение нужной информации из кеша), мы пришли к пониманию, что это связано с обходом графа в ширину. Это заставило нас пересмотреть архитектуру самого обхода графа задач. Мы внедрили стратегию обхода вглубь: теперь, как только поток завершал расчёт первого уровня по договору, он не переходил к новому договору, а сразу запускал задачи следующего уровня — и так до конца всей цепочки. Только после полной обработки одной ветки исполнитель переходил к следующей.

Такой подход дал существенные преимущества:

  • Снизил число обращений к дисковому хранилищу за счёт удержания связанных данных в кэше.

  • Существенно уменьшил общее количество кэш-миссов.

  • Повысил предсказуемость и стабильность выполнения отдельных цепочек операций.


Когда обхода вглубь недостаточно

В системе есть крупные договоры, полный расчёт которых может занимать до 4 часов. Эти задачи сложно распараллелить, потому что они выполняются в рамках транзакции и включают сложную цепочку зависимостей, требующих строгого порядка исполнения. Чтобы такой договор успел пройти весь расчёт в пределах доступного окна, с ним нужно начинать работать одним из первых.

В первом приближении мы решили попробовать наивно реализовать обход в глубину. Вместо стандартной очереди FIFO (first-in, first-out) мы использовали LIFO (last-in, first-out) и сортировали договоры по ожидаемому времени исполнения, которое собрали по результатам предыдущих запусков. Задачи, связанные с самыми долгими договорами, помещались в очередь первыми.

Казалось, всё просто. Но на практике возникли неочевидные сложности.


Приоритетная очередь: управляем хаосом

Мы реализовали приоритетную очередь с учётом вложенности, веса и глубины задач.

Используем PriorityBlockingQueue из пакета java.util.concurrent:

  • Потокобезопасность.

  • Управление порядком исполнения.

this.order = TaskUtils
  .calculateOrder(parentTask != null ? parentTask.getOrder() 
                                     : jobInstance.getOrder(),
                  taskIndex, parentTask == null);

Построение порядка (order)

public static int[][] calculateOrder(final int[][] currentOrder, final int position, final boolean isNewLevel) {
    final int[][] calculatedOrder = new int[(isNewLevel || currentOrder.length == 0) ? currentOrder.length + 1 : currentOrder.length][];
    final int lastIndex = calculatedOrder.length - 1;
    for (int i = 0; i < lastIndex; i++) {
        calculatedOrder[i] = new int[currentOrder[i].length];
        System.arraycopy(currentOrder[i], 0, calculatedOrder[i], 0, currentOrder[i].length);
    }
    if (calculatedOrder.length == currentOrder.length) {
        calculatedOrder[lastIndex] = new int[currentOrder[lastIndex].length + 1];
        System.arraycopy(currentOrder[lastIndex], 0, calculatedOrder[lastIndex], 0, currentOrder[lastIndex].length);
    } else {
        calculatedOrder[lastIndex] = new int[1];
    }
    calculatedOrder[lastIndex][calculatedOrder[lastIndex].length - 1] = position;
    return calculatedOrder;
}

Компаратор приоритетов

public static int compare(@NotNull final int[][] o1, @NotNull final int[][] o2) {
    final int compareIndex = Math.min(o1.length, o2.length);
    for (int i = 0; i < compareIndex; i++) {
        final int compareSubIndex = Math.min(o1[i].length, o2[i].length);
        for (int subIndex = 0; subIndex < compareSubIndex; subIndex++) {
            final int compare = Integer.compare(o1[i][subIndex], o2[i][subIndex]);
            if (compare != 0) {
                return compare;
            }
        }
    }
    final int compare = Integer.compare(o1.length, o2.length);
    if (compare != 0) {
        return compare;
    } else {
        for (int i = 0; i < o1.length; i++) {
            final int compareInternal = Integer.compare(o1[i].length, o2[i].length);
            if (compareInternal != 0) {
                return compareInternal;
            }
        }
    }
    return compare;
}

Подведем итоги

После внедрения приоритетной очереди:

  • Тяжелые задачи запускаются первыми.

  • Лёгкие не перехватывают потоки.

  • Система укладывается в 3 часа вместо 8.

> Мораль: не всегда нужно больше железа или оптимизация отдельных элементов исполнения. Иногда достаточно правильно выстроить порядок задач.

Нагрузка на БД после модификаций
Нагрузка на БД после модификаций
Нагрузка на БД после модификаций
Нагрузка на БД после модификаций
Теги:
Хабы:
+13
Комментарии2

Публикации

Информация

Сайт
sigma-it.ru
Дата регистрации
Дата основания
Численность
1 001–5 000 человек
Местоположение
Россия