Pull to refresh

Run ForkJoinTask via custom ForkJoinPool in CompletableFuture sauce

Reading time4 min
Views1.6K
And the sauce of CompletableFuture turns out to be "asynchronous".
And the sauce of CompletableFuture turns out to be "asynchronous".

In one of the tasks on the project, I had to wrap the ForkJoinTask collection in CompletableFuture for asynchronous execution and building data processing pipeline chains.

Is it working? Don't touch.


To run ForkJoinTask in custom ForkJoinPool with its own settings, managed to find such a solution:

CompletableFuture<?> cf = CompletableFuture.runAsync(forJoinTask::invoke, customForkJoinPool);

In such a design we get good advantages:

  1. Catching InterruptedException and ExecutionException exceptions when combining several futures with CompletableFuture.allOf(cf1, cf2).get(), which is not available when calling via ForkJoinPool::invokeAll and ForkJoinTask::invokeAll;

  2. The ability to create asynchronous pipeline chains for programming the behaviour of threads using a rich CompletableFuture contract;

  3. Ability to use the settings of your custom ForkJoinPool. In practice, I realised that it is necessary to at least make special names of ForkJoinWorkerThread for each of the services in the project. This is necessary when executing or debugging an application, it is clearly visible on behalf of which of the threads the messages/errors are coming from.

Below is the example of class with custom ThreadPool creation and submitting void CompletableFuture tasks:

@Slf4j
public abstract class CustomAsyncThreadPool {

    private final Set<CompletableFuture<?>> threadTasks = new HashSet<>();
    private final ForkJoinPool threadPool;

    /**
     * Creates a {@link ForkJoinPool} with the given parameters.
     *
     * @param logicTasksCount is the count of threads=tasks to be created and will be loaded by real existing tasks.
     *                        If logicTasksCount>availableProcessors then pool will be created based on {@link java.lang.Runtime#availableProcessors}.
     * @param asyncMode       if true,
     *                        establishes <em>"FIFO"<em/> (first-in-first-out) scheduling mode for forked
     *                        tasks that are never joined. This mode may be more appropriate
     *                        than default locally stack-based mode in applications in which
     *                        worker threads only process EVENT-STYLE asynchronous tasks.
     *                        For default value, use - {@code false} <em>"LIFO"<em/> (last-in-first-out).
     * @see <a href="https://stackoverflow.com/questions/5640046/what-is-forkjoinpool-async-mode">What is ASYNC?</a>
     */
    protected CustomAsyncThreadPool(int logicTasksCount, boolean asyncMode) {
        int availableProcessors = Runtime.getRuntime().availableProcessors();
        ForkJoinPool.ForkJoinWorkerThreadFactory workerFactory = this::initWorkerFactory;
        if (logicTasksCount < availableProcessors) {
            this.threadPool = new ForkJoinPool(logicTasksCount, workerFactory, null, asyncMode);
        } else {
            this.threadPool = new ForkJoinPool(availableProcessors, workerFactory, null, asyncMode);
        }
        log.info("Thread pool parallelism level - {}.", threadPool.getParallelism());
    }

    private ForkJoinWorkerThread initWorkerFactory(ForkJoinPool pool) {
        ForkJoinWorkerThread thread = ForkJoinPool.defaultForkJoinWorkerThreadFactory.newThread(pool);
        return setThreadName(thread);
    }

    protected abstract ForkJoinWorkerThread setThreadName(ForkJoinWorkerThread thread);

    public <V> void runForkJoinTasks(@NonNull Collection<ForkJoinTask<V>> tasks) {
        tasks.forEach(this::submitTask);
        finishTasks();
        threadPool.shutdown();
    }

    private <V> void submitTask(@NonNull ForkJoinTask<V> task) {
        CompletableFuture<?> cf = CompletableFuture.runAsync(task::invoke, threadPool);
        threadTasks.add(cf);
    }

    private void finishTasks() {
        try {
            CompletableFuture.allOf(threadTasks.toArray(new CompletableFuture[0]))
                    .whenComplete((v, ex) -> {
                        if (isToInterruptOnException() && ex != null) {
                            // already submitted tasks might not cancel, 
                            // but not submitted will be cancelled

                            threadTasks.forEach(f -> f.complete(null)); 
                            // complete(null) instead of cancel(true), 
                            // for not to throw CancellationException
                        }
                    })
                    .get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalThreadStateException(e.getMessage());
        } catch (ExecutionException e) {
            rethrow(e.getCause());
        }
    }

    protected boolean isToInterruptOnException() {
        return false;
    }
}

Implementation of the abstract setThreadName method for specifying the worker thread name:

    @Override
    protected ForkJoinWorkerThread setThreadName(ForkJoinWorkerThread thread) {
        thread.setName("My-Best-Service-Thread-" + thread.getPoolIndex());
        return thread;
    }

It is important to note that CompletableFuture chains allow you to write the logic of actions with several stages. But you need to be careful with the methods. Some of them are blocking, and some are not. When blocking, the entire written chain will "materialise" to the place where the blocking method is called.

For example, such a sequence indicates completion with an exception thrown:

...
  .thenRunAsync(superRunnable)
  .completeExceptionally(CustomException);

Here, the thenRunAsync method is not blocking, but completeExceptionally represents a "dead-end branch for the train", which will lead to blocking execution at this point.


Taking the above code example in account, you can use the necessary settings of your custom ForkJoinPool and at the same time use the capabilities of CompletableFuture.

If you have what to say about the topic and want to give negative rating - please feel free to write your comments to understand your opinion. Otherwise it is not serious and it is not clear why you vote negatively.

And of course If you like this article, please vote UP - this will support me to write more such posts with real code examples!

Tags:
Hubs:
Rating0
Comments0

Articles