
CompletableFuture
, который позволяет удобно писать асинхронный код.При использовании
CompletableFuture
из нескольких потоков я столкнулся с его неочевидным поведением, а именно с тем, что callbacks на нём могут выполнятся совсем не в тех потоках, как ожидалось. Об этом и о том, как мне удалось решить проблему — я и расскажу в этой статье.Мною разрабатывался асинхронный, неблокирующийся однопоточный клиент к серверу, который использовал потоконебезопасные структуры данных. Тесты проходили без проблем, но benchmarks иногда падали c
ConcurrentModificationException
на внутренних структурах однопоточного клиента.Асинхронность в клиенте реализовывалась с использованием
CompletableFuture
, все операции внутри клиента производились в одном потоке (далее в коде — singleThreadExecutor
).Фрагмент кода клиента с методом
get
, который доступен пользователям://ожидающие завершения запросы
private final Set<CompletableFuture> pendingFutures = Collections.newSetFromMap(new IdentityHashMap<>());
public CompletableFuture<String> get(String key) {
CompletableFuture<String> future = new CompletableFuture<>();
//передаём задачу на выполнение в поток клиента
singleThreadExecutor.execute(() -> {
//добавляем future в список ожидающих завершения
pendingFutures.add(future);
future.whenComplete((v, e) -> {
//когда future завершится удаляем его из списка ожидающих
pendingFutures.remove(future);
});
//тут был код передающий запрос на сервер и получающий ответ от сервера
//в конечном итоге код вызвал future.complete(data); в потоке этого singleThreadExecutor
});
return future;
}
Оказалось, что так делать нельзя.
Возможно, я узнал бы об этом раньше, если бы внимательно прочитал javadoc для
CompletableFuture
.Посмотреть javadoc
Actions supplied for dependent completions of non-async methods may be performed by the thread that completes the current CompletableFuture, or by any other caller of a completion method.
При использовании такой архитектуры необходимо, чтобы все callbacks у
CompletableFuture
вызывались в том же потоке, который делает CompletableFuture.complete
.По указанному выше коду, вроде бы, так и происходит. Но benchmarks иногда завершались с
ConcurrentModificationException
в коде, который перебирал pendingFutures
в том же потоке клиента (singleThreadExecutor
).Дело в том, что callback, передаваемый в
future.whenComplete
(который вызывает pendingFutures.remove
), иногда выполняется совсем в другом потоке. А точнее в потоке приложения, которое пользуется моим клиентом:Client client = new Client("127.0.0.1", 8080);
CompletableFuture<String> result = client.get(key);
result.thenAccept(data -> {
System.out.println(data);
});
Вызов
result.thenAccept
в этом приложении приводит иногда к вызову остальных callbacks на future, которые были добавлены внутри самого кода клиента.Рассмотрим проблему на простых примерах
Thread mainThread = Thread.currentThread();
CompletableFuture<Void> future = new CompletableFuture<>();
future.thenRun(() -> {
System.out.println(Thread.currentThread() == mainThread);
});
future.complete(null);
Такой код всегда выводит на экран
true
, так как callback выполняется в том же потоке, что и метод complete.Но если к
CompletableFuture
будет хотя бы одно обращение из другого потока, то поведение может измениться://основной поток
Thread mainThread = Thread.currentThread();
//создаём второй поток
Executor executor = Executors.newSingleThreadExecutor();
CompletableFuture<Void> future = new CompletableFuture<>();
future.thenRun(() -> {
System.out.println(Thread.currentThread() == mainThread)
});
//просто добавляем callback к тому же future в другом потоке
executor.execute(() -> {
future.thenRun(() -> {
//nop
});
});
//завершаем future
future.complete(null);
Такой код может иногда выдавать
false
.Дело в том, что вызов
thenRun
у того же future, но во втором потоке, может привести к срабатыванию callback в первом thenRun
. При этом callback первого thenRun
будет вызван во втором потоке.Это происходит в тот момент, когда
future.complete(null)
начало выполняться, но ещё не успело вызвать callbacks, а во втором потоке вызвался thenRun
, который и выполнит все остальные callbacks на этом future но уже в своём потоке.Проблем решается просто:
//основной поток
Thread mainThread = Thread.currentThread();
//создаём второй поток
Executor executor = Executors.newSingleThreadExecutor();
CompletableFuture<Void> future = new CompletableFuture<>();
CompletableFuture<Void> secondThreadFuture = future.thenRun(() -> {
System.out.println(Thread.currentThread() == mainThread);
});
//просто добавляем callback к тому же future в другом потоке
executor.execute(() -> {
secondThreadFuture.thenRun(() -> {
//nop
});
});
//завершаем future
future.complete(null);
Мы просто добавили secondThreadFuture, которая зависит от результата исходной future. И вызов на ней
thenRun
во втором потоке не приводит к возможному срабатыванию callbacks на исходной future.Для гарантированного вызова callbacks в заданных пользователем потоках у
CompletableFuture
существуют async реализации методов, например — thenRunAsync
, которым нужно передавать Executor. Но async-версии методов могут работать медленней, чем обычные. Поэтому, я не хотел лишний раз их использовать.Вывод
Вывод, который я сделал для себя: не использовать один объект
CompletableFuture
в нескольких потоках, если необходимо быть уверенным, что все callbacks на нём выполняются в заданном потоке. А если использовать несколько потоков с одним CompletableFuture необходимо — то достаточно передавать в другой поток не оригинальный CompletableFuture
, а новый, который будет зависеть от исходного. Например, вот так:CompletableFuture<Void> secondThreadFuture = firstThreadFuture.whenComplete((v, e) -> {
//nop
});