Как стать автором
Обновить

Делаем многопоточный конвейер

Время на прочтение5 мин
Количество просмотров6.4K

В последнее время я довольно часто сталкиваюсь с оптимизацией процессов загрузки и обработки информации. Периодически вижу не самые оптимальные, а то и вредные для производительности решения причем от разработчиков middle и senior уровня. Поэтому хотелось бы подробнее описать общий подход к написанию правильных и быстрых многопоточных архитектур с максимальной утилизацией ресурсов, но при этом с минимально возможным в каждом отдельном случае количеством потоков.

Что вообще можно ускорять с помощью многопоточной работы?

В первую очередь нужно вообще понять, возможно ли ускорить с помощью Thread Pool конкретно тот кусок кода или системы, с которым вы работаете.

Ответ “нет” если:

  • Процесс уже работает микросекунды/миллисекунды и выполняется быстрее, чем необходимо по требованиям/техзаданию

  • Утилизация основного ресурса, на который опирается алгоритм, уже достигает постоянных 100%

Теперь подробнее по каждому пункту “нет”.

Алгоритм уже укладывается в требования

Само по себе использование Thread Pools ведет к увеличению расхода памяти и ресурсов, поэтому если ваш алгоритм уже укладывается в SLA, то нет смысла все усложнять. Само по себе добавление потоков усложняет понимание кода, а также добавляет сразу пачку неочевидных потенциальных проблем и багов. Тут как никогда отлично применяются принципы KISS (Keep It Simple, Stupid) и YAGNI (You aren’t Gonna Need It).

Утилизация одного из ресурсов уже составляет постоянные 100%

О каких ресурсах идет речь? В первую очередь – CPU, Disk IO, Network. Это именно те ресурсы, которые можно забивать, увеличивая количество потоков вашего процесса. Таким образом, если один из ресурсов уже утилизирован по максимуму, то ничего ускорить мы не сможем. Другое дело, что современное железо сложно забить на 100% в один поток. Если это произошло, то стоит сперва обратить внимание на то, как вы их используете: возможно стоит рассмотреть, например, возможность замены случайного доступа к диску на последовательный. Далее будем говорить о системе в которой стоит CPU с 8 ядрами, для примера. Итак, вопрос. Почему мы говорим о каких-то еще ресурсах, если сами потоки работают на ядрах? Дело в том, что при работе со всеми другими ресурсами, кроме CPU, сами потоки очень часто находятся в состоянии ожидания новых данных, будь то чтение или сохранение данных на диск или загрузка чего-то по сети. Таким образом диспетчер задач операционной системы имеет достаточно времени CPU, чтобы запускать на наших 8 ядрах сотни потоков. 

Теперь возвращаемся к ресурсам и посмотрим пример. Предположим, наш канал 1 Гбит/сек, и мы парсим данные с удаленных серверов, средняя скорость которых 10 мбит.

Решение с одним потоком

//single thread
while (isNextDownloadTaskAvailable()) {
  DownloadProcessor downloadProcesor = downloadService.createProcessor(nextTask());
  Data data = downloadProcessor.getData();
  process(data);
}

Чтобы забить такой канал на максимум понадобится запустить 100 потоков. 100 * 10 Мбит/сек = 1 Гбит/сек.

Решение в 100 потоков

//100 threads
private static final ExecutorService executorService = Executors.fixedThreadPool(100);
while (isNextDownloadTaskAvailable()) {
  DownloadProcessor downloadProcesor = downloadService.createProcessor(nextTask());
  executorService.submit(() -> {
    Data data = downloadProcessor.getData();
    process(data);
  });
}

Забили канал. Mission Complete. Теперь посмотрим внимательнее. Решение имеет недостатки. 

В общем случае любое задаче требуются разные ресурсы в разных стадиях. Сначала нужно что-то откуда-то загрузить(нагружаем сеть или диск), потом это обработать (что обычно делается на CPU), а потом сохранить (опять сеть или диск).

Например, парсинг результатов загрузки занимает 1 секунду процессорного времени. В нашем случае произойдет следующее: в первой части программы весь executorService забьется каким-то количеством задач на загрузку. Если все они требуют примерно одинакового времени для работы, то через некоторый промежуток времени мы получим 100 потоков, в каждом из которых алгоритм дойдет до выполнения метода process. Итого: 100 потоков, которые хотят получить 1 секунду процессорного времени, а это значит 100 задач будут выполняться на 100 потоках уже после выполненной загрузки данных,100 / 8 ~= 13 секунд. И все эти 13 секунд сеть будет простаивать, так как все работающие потоки будут заниматься обработкой загруженных данных. В более сложном случае сами задачи будут требовать разного количества ресурсов. Часть задач могут загружаться 100 секунд и обрабатываться 10 мсек, другая часть задач может загружаться 10 мсек и требовать 100 сек работы ядра.

Конечно, в любом случае выигрыш в скорости работы перед обычным однопоточным приложением будет существенный, но ресурсы будут забиваться неравномерно и, в среднем, не на 100%. 

Что можно сделать? Разобьем нашу задачу на 2 подзадачи, которые утилизируют разные ресурсы, и добавим еще один пул потоков, который отдельно будет утилизировать CPU, не мешая при этом потокам занимающимся загрузкой данных. На 8 ядер нам понадобится ровно 8 CPU-heavy потоков чтоб утилизировать их на 100%.

Конвейер в 100+8 потоков

//100+8 threads
private static final ExecutorService downloadExecutorService = Executors.fixedThreadPool(100);
private static final ExecutorService processExecutorService = Executors.fixedThreadPool(8);
List<Future<Data>> futureDataResults = new ArrayList<>();
while (isNextDownloadTaskAvailable()) {
  DownloadProcessor downloadProcesor = downloadService.createProcessor(nextTask());
  futureDataResults.add(downloadExecutorService.submit(() -> downloadProcessor.getData()));
}

for (Future<Data> futureData : futureDataResults) {
  processExecutorService.submit(() -> process(futureData.get()));
}

Выглядит это лучше, хотя все еще не идеально. У такого решения есть один главный недостаток (хотя иногда может и преимущество): мы не можем запускать в обработку результат третьей загрузки, пока не запустим результат второй, и если вторая загрузка будет загружаться час, то второй пул будет простаивать очень долго без работы. Из плюсов: простой, почти последовательный алгоритм, который не требует особых знаний о том, как организовывать многопоточную работу и читается очень легко(опять же почти последовательный). И он еще лучше будет читаться если, например, вам нужно загружать данные из разных мест, а потом обрабатывать их вместе. Тогда по двум массивам можно будет пробежаться и взять элементы по одному и тому же индексу. Здесь есть несколько вариантов сделать это хотя бы так, чтоб кровь из глаз не пошла. С использованием очередей, CompletableFuture или с использованием паттерна Observer, когда одни задачи будут уведомлять другие о том, что результаты можно брать в обработку. В любом случае, все это подвиды колбэков, которые мы можем отдавать в нашу задачу, чтоб она выполняла переданный ей кусок кода, которому передаст результат выполнения. Этот кусок кода просто должен добавлять следующий этап задачи в следующий Thread Pool.

Конвейер в 100+8 потоков с коллбэками

while (isNextDownloadTaskAvailable()) {
   DownloadProcessor downloadProcessor = downloadService.createProcessor();
   DownloadTask downloadTask = new DownloadTask(downloadTask, data -> {
       CalculateProcessor calculateProcessor = calculateService.createProcessor(data);
       CalculateTask calculateTask = new CalculateTask(calculateProcessor, results::add);
       calculateExecutorService.submit(new CalculateTaskCallbackProxy(calculateTask));
   });
   downloadExecutorService.submit(downloadTaskCallbackProxy);
}

Конвейер в 100+8 потоков на CompletableFuture

while (isNextDownloadTaskAvailable()) {
   DownloadProcessor downloadProcessor = downloadService.createProcessor();
   DownloadTask downloadTask = new DownloadTask(downloadProcessor);
   CompletableFuture<DownloadData> downloadResultFuture = CompletableFuture
           .supplyAsync(downloadTask::call, downloadExecutorService);
   CompletableFuture<Long> calcResultFuture = downloadResultFuture
           .thenApplyAsync(data -> {
               CalculateProcessor calculateProcessor = calculateService.createProcessor(data);
               CalculateTask calculateTask = new CalculateTask(calculateProcessor);
               return calculateTask.call();
           }, calculateExecutorService);
}

Все запускаемые примеры, с которыми можно поиграться, доступны в репозитории.

Всякие подобные заметки я периодически записываю у себя в телеграм канале.

Теги:
Хабы:
Всего голосов 6: ↑1 и ↓5-4
Комментарии12

Публикации

Истории

Работа

Ближайшие события