CompletableFuture’s thenApply/thenApplyAsync are an unfortunate case of bad naming and accidental interchangeability.
In this article, we’ll look at two methods that can seemingly be used interchangeably, thenApply and thenApplyAsync, and at how drastic a difference they can make.
CompletableFuture#thenApply/thenApplyAsync
Both methods can be used to execute a callback after the source CompletableFuture completes; both return new CompletableFuture instances and seem to run asynchronously… so where does the difference in naming come from?
In action:
```java
ExecutorService e = Executors.newSingleThreadExecutor(r -> new Thread(r, "dead-pool"));

CompletableFuture<Integer> f = CompletableFuture.supplyAsync(() -> {
    System.out.println(Thread.currentThread().getName());
    return 42;
}, e);

// dead-pool
```
Let’s now see what happens if we try to call thenApply():
```java
ExecutorService e = Executors.newSingleThreadExecutor(r -> new Thread(r, "dead-pool"));

CompletableFuture<Integer> f = CompletableFuture.supplyAsync(() -> {
    System.out.println(Thread.currentThread().getName());
    return 42;
}, e);

CompletableFuture<String> f2 = f.thenApply(i -> {
    System.out.println(Thread.currentThread().getName());
    return i.toString();
});

// dead-pool
// main
```
As you can see, despite deriving a new CompletableFuture instance from the previous one, the callback seems to be executed on the client’s thread that called the thenApply method – which is the main thread in this case.
Let’s verify our hypothesis by simulating thread blockage:
```java
CompletableFuture<String> f2 = f.thenApply(i -> {
    Thread.sleep(Integer.MAX_VALUE); // skipped try-catch for clarity
    System.out.println(Thread.currentThread().getName());
    return i.toString();
});

System.out.println("Hello!");

// dead-pool
// (2147483647ms later...)
// main
// Hello!
```
Indeed, the main thread got blocked while processing a seemingly asynchronous callback.
Technically, the thread backing the whole family of thenApply methods is unspecified, which makes sense: if the future was already completed before thenApply() was called, which thread should run the callback, and where would it get scheduled?
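The already-completed case is easy to reproduce deterministically, without any sleeping. The sketch below (the class name CompletedFutureDemo is just for illustration) starts from CompletableFuture.completedFuture(...); on OpenJDK, the callback is then invoked directly by whichever thread calls thenApply(), although the CompletionStage contract itself does not promise a particular thread.

```java
import java.util.concurrent.CompletableFuture;

public class CompletedFutureDemo {

    // Returns the name of the thread that executed the thenApply() callback.
    static String callbackThreadName() {
        // The source future is already complete before thenApply() is called...
        CompletableFuture<Integer> done = CompletableFuture.completedFuture(42);

        // ...so the callback is invoked right away by the calling thread.
        return done
            .thenApply(i -> Thread.currentThread().getName())
            .join();
    }

    public static void main(String[] args) {
        System.out.println("caller:   " + Thread.currentThread().getName());
        System.out.println("callback: " + callbackThreadName());
    }
}
```

Run from main, both lines should report the main thread.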
So, if a future completes before thenApply() is called, the callback will be run by the client thread, but if we manage to register the callback before the task finishes, it will be executed by the same thread that completed the original future (shown here with thenRun(), which follows the same rule):
```java
CompletableFuture.supplyAsync(() -> {
    Thread.sleep(1000); // skipped try-catch for clarity
    return 42;
}).thenRun(() -> {
    System.out.println(Thread.currentThread().getName());
}).join();

// ForkJoinPool.commonPool-worker-3
```
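The second case can also be made deterministic by completing the future by hand instead of relying on a 1-second sleep. In this sketch, CompleterThreadDemo and the "completer" thread name are hypothetical: the callback is registered on a still-incomplete future, and complete() is then called from a separate thread. In OpenJDK, complete() runs the already-registered dependents on the completing thread.

```java
import java.util.concurrent.CompletableFuture;

public class CompleterThreadDemo {

    // Returns the name of the thread that executed the thenApply() callback
    // when the callback is registered *before* the source completes.
    static String callbackThreadName() {
        CompletableFuture<Integer> source = new CompletableFuture<>();

        // Register the callback while the source is still incomplete.
        CompletableFuture<String> derived =
            source.thenApply(i -> Thread.currentThread().getName());

        // Complete the source from a dedicated thread named "completer"
        // (a name chosen only for this demo).
        new Thread(() -> source.complete(42), "completer").start();

        // Wait until the callback has run, then report its thread.
        return derived.join();
    }

    public static void main(String[] args) {
        System.out.println(callbackThreadName()); // prints "completer" on OpenJDK
    }
}
```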
However, we need to be aware of that behaviour and make sure that we don’t end up with unsolicited blocking.
CompletableFuture#thenApplyAsync
Let’s now try the alternative method.
It turns out that it’s enough to replace “thenApply” with “thenApplyAsync” and the example still compiles. How convenient!
```java
ExecutorService e = Executors.newFixedThreadPool(20, r -> new Thread(r, "dead-pool"));

CompletableFuture<Integer> f = CompletableFuture.supplyAsync(() -> {
    System.out.println(Thread.currentThread().getName());
    return 42;
}, e);

CompletableFuture<String> f2 = f.thenApplyAsync(i -> {
    Thread.sleep(Integer.MAX_VALUE); // skipped try-catch for clarity
    System.out.println(Thread.currentThread().getName());
    return i.toString();
});

System.out.println("Hello!");

// dead-pool
// Hello!
// (2147483647ms later...)
// ForkJoinPool.commonPool-worker-3
```
And indeed, this time we managed to execute the whole flow fully asynchronously. But pay attention to the last log: the callback was executed on the common ForkJoinPool, argh!
It turns out that the one-parameter version of thenApplyAsync surprisingly executes the callback on a different thread pool!
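Both observations, the caller not being blocked and the callback landing on some other thread, can be checked without the multi-week sleep. In the sketch below (NonBlockingDemo and its message strings are made up, and an already-completed source stands in for the dead-pool task), the callback parks on a CountDownLatch that the caller only releases after recording its own "Hello!".

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;

public class NonBlockingDemo {

    // Records the order of events seen by the caller and by the callback.
    static List<String> run() {
        List<String> events = new CopyOnWriteArrayList<>();
        CountDownLatch release = new CountDownLatch(1);
        Thread caller = Thread.currentThread();

        CompletableFuture<Integer> f = CompletableFuture.completedFuture(42);

        // One-arg thenApplyAsync: the callback goes to the default async executor.
        CompletableFuture<String> f2 = f.thenApplyAsync(i -> {
            try {
                release.await(); // stands in for the very long sleep
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(ex);
            }
            events.add("callback on caller thread: " + (Thread.currentThread() == caller));
            return i.toString();
        });

        // Reached while the callback is still parked: the caller is not blocked.
        events.add("Hello!");
        release.countDown();
        f2.join();
        return events;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

If the callback ran synchronously on the caller, this code would deadlock instead, since only the caller can release the latch.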
Let’s double-check the documentation:
Returns a new CompletionStage that, when this stage completes normally, is executed using this stage’s default asynchronous execution facility, with this stage’s result as the argument to the supplied function.
As you can see, there’s no mention of the shared ForkJoinPool, only a reference to the “default asynchronous execution facility”, which turns out to be the one returned by CompletableFuture#defaultExecutor. That is either the common ForkJoinPool or, when the common pool’s parallelism is 1 or lower, a mysterious ThreadPerTaskExecutor that simply spins up a new thread for each task, which sounds like a controversial idea:
```java
/**
 * Default executor -- ForkJoinPool.commonPool() unless it cannot
 * support parallelism.
 */
private static final Executor ASYNC_POOL = USE_COMMON_POOL ?
    ForkJoinPool.commonPool() : new ThreadPerTaskExecutor();

static final class ThreadPerTaskExecutor implements Executor {
    public void execute(Runnable r) { new Thread(r).start(); }
}
```
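On Java 9 and later, the executor in question can be inspected directly through CompletableFuture#defaultExecutor(). The sketch below compares it with ForkJoinPool.commonPool(); the commonPoolSupportsParallelism() helper is our own and assumes USE_COMMON_POOL is defined as ForkJoinPool.getCommonPoolParallelism() > 1, as it is in OpenJDK’s source.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ForkJoinPool;

public class DefaultExecutorDemo {

    // The executor that one-arg *Async methods fall back to (Java 9+ API).
    static Executor defaultAsyncExecutor() {
        return new CompletableFuture<Integer>().defaultExecutor();
    }

    // Assumed to mirror OpenJDK's USE_COMMON_POOL condition.
    static boolean commonPoolSupportsParallelism() {
        return ForkJoinPool.getCommonPoolParallelism() > 1;
    }

    public static void main(String[] args) {
        Executor def = defaultAsyncExecutor();
        System.out.println("common pool parallelism: " + ForkJoinPool.getCommonPoolParallelism());
        System.out.println("default is common pool:  " + (def == ForkJoinPool.commonPool()));
        System.out.println("default executor class:  " + def.getClass().getName());
    }
}
```

Running it with -Djava.util.concurrent.ForkJoinPool.common.parallelism=1 should make the last line report the ThreadPerTaskExecutor class instead of ForkJoinPool.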
Luckily, we can supply our own Executor instance to the thenApplyAsync method:
```java
ExecutorService e = Executors.newFixedThreadPool(20, r -> new Thread(r, "dead-pool"));

CompletableFuture<Integer> f = CompletableFuture.supplyAsync(() -> {
    System.out.println(Thread.currentThread().getName());
    return 42;
}, e);

CompletableFuture<String> f2 = f.thenApplyAsync(i -> {
    Thread.sleep(Integer.MAX_VALUE); // skipped try-catch for clarity
    System.out.println(Thread.currentThread().getName());
    return i.toString();
}, e);

System.out.println("Hello!");

// dead-pool
// Hello!
// (2147483647ms later...)
// dead-pool
```
And finally, we managed to regain full control over our asynchronous processing flow and execute it on a thread pool of our choice.
However, we now have no guarantees about when the post-completion callback will actually run, since it has to wait for a free thread in our pool, but that’s the price to pay.
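That price is easy to observe. In the sketch below (QueuedCallbackDemo is a hypothetical name), the only thread of a single-thread "dead-pool" executor is kept busy, so a callback attached with thenApplyAsync(fn, e) stays queued even though its source future is already complete; it runs only once the pool frees up.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class QueuedCallbackDemo {

    // Returns {doneWhilePoolBusy, doneAfterRelease}.
    static boolean[] run() {
        ExecutorService e = Executors.newSingleThreadExecutor(r -> new Thread(r, "dead-pool"));
        CountDownLatch release = new CountDownLatch(1);
        try {
            // Occupy the pool's only thread until we release it.
            e.execute(() -> {
                try {
                    release.await();
                } catch (InterruptedException ex) {
                    Thread.currentThread().interrupt();
                }
            });

            // The source is already complete, yet the callback has to wait in
            // the executor's queue behind the blocking task.
            CompletableFuture<String> derived = CompletableFuture.completedFuture(42)
                .thenApplyAsync(i -> i + " on " + Thread.currentThread().getName(), e);
            boolean doneWhileBusy = derived.isDone();

            release.countDown();
            System.out.println(derived.join()); // 42 on dead-pool
            return new boolean[] { doneWhileBusy, derived.isDone() };
        } finally {
            e.shutdown();
        }
    }

    public static void main(String[] args) {
        boolean[] r = run();
        System.out.println("done while busy: " + r[0] + ", done after release: " + r[1]);
    }
}
```

A bounded or dedicated pool per stage would change how long such callbacks wait, but not the fact that they queue like any other task.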
Disclaimer: I did not wait 2147483647ms for the operation to complete.
Key Takeaways
CompletableFuture’s thenApply/thenApplyAsync are an unfortunate case of bad naming and accidental interchangeability: swapping one for the other, we end up with code that compiles but executes on a different execution facility, potentially leading to unexpected blocking of the calling thread or to callbacks running on a thread pool we never chose.
For our programs to be predictable, we should consider using CompletableFuture’s thenApplyAsync(Function, Executor) as a sensible default for long-running post-completion tasks.