Комментарии 12
Как же вы переусложнили простейший код. И насажали багов с гонками. Гонка будет в некоторых ваших вариантах междуisNextDownloadTaskAvailable и nextTask
У вас есть два бизнес процесса котрые требуют ресурсы. Загрузка даных и обработка того что загрузили. И некая внешняя функция которая дает задания.
Ну так сделайте простейшие очереди. И два пула размеры которых можно покрутить и тонко все настроить.
Есть задача на загрузку? Ставим в очередь.
Есть загруженная задача? Обрабатываем.
Загрузили слишком много и не успеваем обработать? Ждем пока обработается.
Основной код
while(isNextDownloadTaskAvailable()) {
DownloadTask downloadTask = nextTask();
downloadExecutorService.submit(() -> {
DownloadProcessor downloadProcesor = downloadService.createProcessor(downloadTask);
Data data = downloadProcessor.getData();
processExecutorService.submit(() -> {
process(data);
});
});
}
Объявление экзекуторов. У меня копипаста с SO. В любом живом проекте такое куда-то в общие библиотеки всегда вытащено. Всем нужны блокирующие экзекуторы.
int DOWNLOAD_TREADS_COUNT = 10;
int PROCESS_TREADS_COUNT = 10;
BlockingQueue<Runnable> downloadQueue = new ArrayBlockingQueue<>(DOWNLOAD_TREADS_COUNT);
ThreadPoolExecutor downloadExecutorService =
new ThreadPoolExecutor(1, DOWNLOAD_TREADS_COUNT, 30, TimeUnit.SECONDS, downloadQueue);
downloadExecutorService.setRejectedExecutionHandler((r, executor) -> {
try {
executor.getQueue().put(r);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RejectedExecutionException("Producer interrupted", e);
}
});
BlockingQueue<Runnable> processQueue = new ArrayBlockingQueue<>(PROCESS_TREADS_COUNT);
ThreadPoolExecutor processExecutorService =
new ThreadPoolExecutor(1, PROCESS_TREADS_COUNT, 30, TimeUnit.SECONDS, processQueue);
processExecutorService.setRejectedExecutionHandler((r, executor) -> {
try {
executor.getQueue().put(r);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RejectedExecutionException("Producer interrupted", e);
}
});
Катим в прод или на нагрузочное тестирование и крутим размеры пулов так чтобы результат нас устраивал.
Загруженное, но необработанное надо где-то хранить... А память не резиновая. Поэтому схема именно такая.
Мне кажется минимизация количества потоков уже давно не актуальная задача. С тех пор как остались только 64-битные процессоры и операционные системы, даже на обычном современном ноутбуке легко создается сотни тысяч потоков, и этого достаточно для практических задач.
И вся эта возня с Future, CompletableFuture, пайплайнами callback-ов, тред пулами и перекладыванием данных из очереди в очередь ушла в прошлое.
Сейчас вполне можно просто создать по потоку на загрузку и пусть они выполняют все этапы последовательно
ExecutorService pool = Executors.newCachedThreadPool();
for (Task t : tasks) {
pool.submit(() -> {
Data data = Data.parse(t.download());
data.process();
});
}
А если хочется чего-то ограничить, то можно ипользовать старый добрый семафор
Semaphore processors = new Semaphore(Runtime.getAvailableProcessors());
ExecutorService pool = Executors.newCachedThreadPool();
for (Task t : tasks) {
pool.submit(() -> {
String raw = t.download();
processors.acquire();
try {
Data data = Data.parse(raw);
data.process();
} finally {
processors.release();
}
});
}
Перекладывание из очереди в очередь имеет смысл когда загрузка и обработка происходят на разных машинах, у которых нет общего локального стека.
А на одной машине все эти очереди, явные и неявные, через .then конструкции, просто все значительно усложняют без реальной на то причины.
С тех пор как остались только 64-битные процессоры и операционные системы, даже на обычном современном ноутбуке легко создается сотни тысяч потоков, и этого достаточно для практических задач.
Это зависит от того, какие у у потока накладные расходы. Например, .NET (по крайней мере, раньше) аллоцировал на каждый поток 4Mb под стек. Сотни тысяч потоков дают вам сотни гигабайт аллокации — не каждый "современный ноутбук" такое выдержит.
Ну и ладно, пусть будет 4 Мб, это же виртуальная память, от этого только растут записи в таблицах виртуальной памяти OS. Реально память выделяется по мере наполнения стека.
В 32-битных системах просто не хватало адресного пространства, даже виртуальной памяти нельзя было столько аллоцировать.
Но в 64-битных системах с этим нет проблем, адресное пространство большое.
Ну и ладно, пусть будет 4 Мб, это же виртуальная память, от этого только растут записи в таблицах виртуальной памяти OS. Реально память выделяется по мере наполнения стека.
Не в случае .NET. Там память именно "реально выделялась". Поэтому нехватка памяти когда потоков слишком много была реальной опасностью.
В 32-битных системах просто не хватало адресного пространства, даже виртуальной памяти нельзя было столько аллоцировать. [...] Но в 64-битных системах с этим нет проблем, адресное пространство большое.
На 32-битных системах .NET выделял по 1Мб на поток.
Если наш канал 1 Гбит и мы поставим на загрузку одновременно 10к задач в 10к потоков - то сначала мы задушим свой же CPU, RAM и сеть т.к. забьем память бесполезными тредами с высокими IO и будем загружать со средней скоростью в <100 кбит. А если больше 60к потоков будет - то еще и порты на хост машине забьем. Какие сотни тысяч потоков?
Если есть ограничения на количество соединений - нужно их ограничивать с помощью семафора в функции download.
В случае с пайплайнами и тредпулами основной высокоуровневый алгоритм, бизнес логика так сказать
скачать данные
распарсить
что-то обработать
оказывается разбит на мелкие мелкие кусочки, а чтобы связать эти кусочки воедино нужны промежуточные структуры Future, очереди и коллбэки. А из-за того что кусочки мелкие и находятся в разных местах кода - весь алгоритм целиком очень сложно понять, что-то в нем изменить, исправить, и очень легко всовершить ошибку. Как раз при передачи чего-то во вспомогательной структуре Future
В случае же если создавать под каждую задачу отдельный поток, общий алгоритм, бизнес логика, написан в одной функции и выполняется последовательно. Его проще понять и меньше шанс совершить ошибку. То что создается много потоков, это не проблема для современных машин. А количество сокетов и одновременных вычислений надо ограничивать, да.
Сдаётся мне, что первый алгоритм с названием "Конвейер в 100+8 потоков" логически неверный. Там сначала стартуем загрузку всех задач, а потом уже их обработку. Обычно мы не знаем, сколько задач и просто загружаем, пока не какой-то маркер <eof>. То есть, либо загрузку и обработку надо делать в том же цикле, либо через очередь - 100 потоков грузят и пишут задачи в очередь, а другие 8 потоков берут загруженные задачи из очередити обрабатывают их. И как уже писали выше, обычно размер очереди ограничивают.
Если действительно потребовалось выдавливать предельную многопоточную производительность, то возможно стоит посмотреть в сторону https://openjdk.java.net/projects/loom/ .
Если это кажется слишком сложным для вашей проблемы, то возможно она не стоит и предложенных усилий, и оригинальный вариант вполне неплох.
если всего 8 ядер на тачке, зачем 100 тредов? они же друг у друга будут ресурсы забирать и потеряете на их частые переключения
Делаем многопоточный конвейер