Как стать автором
Обновить

Комментарии 12

Как же вы переусложнили простейший код. И насажали багов с гонками. Гонка будет в некоторых ваших вариантах междуisNextDownloadTaskAvailable и nextTask

У вас есть два бизнес процесса котрые требуют ресурсы. Загрузка даных и обработка того что загрузили. И некая внешняя функция которая дает задания.

Ну так сделайте простейшие очереди. И два пула размеры которых можно покрутить и тонко все настроить.

Есть задача на загрузку? Ставим в очередь.

Есть загруженная задача? Обрабатываем.

Загрузили слишком много и не успеваем обработать? Ждем пока обработается.

Основной код

while(isNextDownloadTaskAvailable()) {
    DownloadTask downloadTask = nextTask();
    downloadExecutorService.submit(() -> {
        DownloadProcessor downloadProcesor = downloadService.createProcessor(downloadTask);
        Data data = downloadProcessor.getData();
        processExecutorService.submit(() -> {
            process(data);
        });
    });
}

Объявление экзекуторов. У меня копипаста с SO. В любом живом проекте такое куда-то в общие библиотеки всегда вытащено. Всем нужны блокирующие экзекуторы.

int DOWNLOAD_TREADS_COUNT = 10;
int PROCESS_TREADS_COUNT = 10;

BlockingQueue<Runnable> downloadQueue = new ArrayBlockingQueue<>(DOWNLOAD_TREADS_COUNT);
ThreadPoolExecutor downloadExecutorService =
        new ThreadPoolExecutor(1, DOWNLOAD_TREADS_COUNT, 30, TimeUnit.SECONDS, downloadQueue);
downloadExecutorService.setRejectedExecutionHandler((r, executor) -> {
    try {
        executor.getQueue().put(r);
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw new RejectedExecutionException("Producer interrupted", e);
    }
});

BlockingQueue<Runnable> processQueue = new ArrayBlockingQueue<>(PROCESS_TREADS_COUNT);
ThreadPoolExecutor processExecutorService =
        new ThreadPoolExecutor(1, PROCESS_TREADS_COUNT, 30, TimeUnit.SECONDS, processQueue);
processExecutorService.setRejectedExecutionHandler((r, executor) -> {
    try {
        executor.getQueue().put(r);
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw new RejectedExecutionException("Producer interrupted", e);
    }
});

Катим в прод или на нагрузочное тестирование и крутим размеры пулов так чтобы результат нас устраивал.

Загруженное, но необработанное надо где-то хранить... А память не резиновая. Поэтому схема именно такая.

Мне кажется минимизация количества потоков уже давно не актуальная задача. С тех пор как остались только 64-битные процессоры и операционные системы, даже на обычном современном ноутбуке легко создается сотни тысяч потоков, и этого достаточно для практических задач.

И вся эта возня с Future, CompletableFuture, пайплайнами callback-ов, тред пулами и перекладыванием данных из очереди в очередь ушла в прошлое.

Сейчас вполне можно просто создать по потоку на загрузку и пусть они выполняют все этапы последовательно

ExecutorService pool = Executors.newCachedThreadPool();
for (Task t : tasks) {
		pool.submit(() -> {
    		Data data = Data.parse(t.download());
        data.process();
    });
}

А если хочется чего-то ограничить, то можно ипользовать старый добрый семафор

Semaphore processors = new Semaphore(Runtime.getAvailableProcessors());
ExecutorService pool = Executors.newCachedThreadPool();
for (Task t : tasks) {
    pool.submit(() -> {
       String raw = t.download();
       processors.acquire();
       try {
       		Data data = Data.parse(raw);
          data.process();
       } finally {
       		processors.release();
       }
    });
}

Перекладывание из очереди в очередь имеет смысл когда загрузка и обработка происходят на разных машинах, у которых нет общего локального стека.

А на одной машине все эти очереди, явные и неявные, через .then конструкции, просто все значительно усложняют без реальной на то причины.

С тех пор как остались только 64-битные процессоры и операционные системы, даже на обычном современном ноутбуке легко создается сотни тысяч потоков, и этого достаточно для практических задач.

Это зависит от того, какие у у потока накладные расходы. Например, .NET (по крайней мере, раньше) аллоцировал на каждый поток 4Mb под стек. Сотни тысяч потоков дают вам сотни гигабайт аллокации — не каждый "современный ноутбук" такое выдержит.

Ну и ладно, пусть будет 4 Мб, это же виртуальная память, от этого только растут записи в таблицах виртуальной памяти OS. Реально память выделяется по мере наполнения стека.

В 32-битных системах просто не хватало адресного пространства, даже виртуальной памяти нельзя было столько аллоцировать.

Но в 64-битных системах с этим нет проблем, адресное пространство большое.

Ну и ладно, пусть будет 4 Мб, это же виртуальная память, от этого только растут записи в таблицах виртуальной памяти OS. Реально память выделяется по мере наполнения стека.

Не в случае .NET. Там память именно "реально выделялась". Поэтому нехватка памяти когда потоков слишком много была реальной опасностью.


В 32-битных системах просто не хватало адресного пространства, даже виртуальной памяти нельзя было столько аллоцировать. [...] Но в 64-битных системах с этим нет проблем, адресное пространство большое.

На 32-битных системах .NET выделял по 1Мб на поток.

Если наш канал 1 Гбит и мы поставим на загрузку одновременно 10к задач в 10к потоков - то сначала мы задушим свой же CPU, RAM и сеть т.к. забьем память бесполезными тредами с высокими IO и будем загружать со средней скоростью в <100 кбит. А если больше 60к потоков будет - то еще и порты на хост машине забьем. Какие сотни тысяч потоков?

Если есть ограничения на количество соединений - нужно их ограничивать с помощью семафора в функции download.

В случае с пайплайнами и тредпулами основной высокоуровневый алгоритм, бизнес логика так сказать

  • скачать данные

  • распарсить

  • что-то обработать

оказывается разбит на мелкие мелкие кусочки, а чтобы связать эти кусочки воедино нужны промежуточные структуры Future, очереди и коллбэки. А из-за того что кусочки мелкие и находятся в разных местах кода - весь алгоритм целиком очень сложно понять, что-то в нем изменить, исправить, и очень легко всовершить ошибку. Как раз при передачи чего-то во вспомогательной структуре Future

В случае же если создавать под каждую задачу отдельный поток, общий алгоритм, бизнес логика, написан в одной функции и выполняется последовательно. Его проще понять и меньше шанс совершить ошибку. То что создается много потоков, это не проблема для современных машин. А количество сокетов и одновременных вычислений надо ограничивать, да.

Сдаётся мне, что первый алгоритм с названием "Конвейер в 100+8 потоков" логически неверный. Там сначала стартуем загрузку всех задач, а потом уже их обработку. Обычно мы не знаем, сколько задач и просто загружаем, пока не какой-то маркер <eof>. То есть, либо загрузку и обработку надо делать в том же цикле, либо через очередь - 100 потоков грузят и пишут задачи в очередь, а другие 8 потоков берут загруженные задачи из очередити обрабатывают их. И как уже писали выше, обычно размер очереди ограничивают.

там сначала наполняется очередь, а потом разгребаются результаты, конечно - это палец, высосанный из примера, и реальные кейсы могут немного отличаться по эндпоинту поступления задач и по характеру агрегации результата. И, конечно, там есть минусы и они сразу после куска кода подписаны :)

Если действительно потребовалось выдавливать предельную многопоточную производительность, то возможно стоит посмотреть в сторону https://openjdk.java.net/projects/loom/ .

Если это кажется слишком сложным для вашей проблемы, то возможно она не стоит и предложенных усилий, и оригинальный вариант вполне неплох.

Для pipe в .net есть прекрасная библиотека TPL, с помощью которой можно довольно гибко регулировать нагрузку на железо.

если всего 8 ядер на тачке, зачем 100 тредов? они же друг у друга будут ресурсы забирать и потеряете на их частые переключения

Зарегистрируйтесь на Хабре, чтобы оставить комментарий

Публикации

Истории