
Improving CompletableFuture#allOf/anyOf API Java Methods

CompletableFuture is one of my favourite Java 8 additions to the language.

However, there are two methods whose design keeps boggling my mind:

  • CompletableFuture#allOf
  • CompletableFuture#anyOf

In this article, we’ll see what’s wrong with them and how to reimplement them to make them more convenient for common day-to-day CompletableFuture manipulations.

CompletableFuture#allOf

Let’s start by inspecting the method signature:

public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs) {
    // ...
}

There are at least two controversial design choices to be spotted here:

  1. The method accepts multiple CompletableFuture objects returning objects of different types
  2. The method returns a CompletableFuture returning an object of type Void

Additionally, some people might complain about the presence of varargs in the parameter list so let’s tackle this part as well.

CompletableFuture<Void> is often used as a signaling device. However, with a small change to the API, this method could serve both as a signaling device and as a carrier of the results of all completed futures, so let’s try to achieve that.
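To make the "signaling device" point concrete, here is a minimal sketch of how the original method tends to be used today. The class name `AllOfSignalDemo` and the `combine` helper are hypothetical, introduced only for illustration: the returned future carries no values, so each result has to be read back from the original futures.

```java
import java.util.concurrent.CompletableFuture;

class AllOfSignalDemo {

    // allOf only signals that both futures are done; the values themselves
    // must be fetched from the original futures afterwards.
    static String combine(CompletableFuture<Integer> count,
                          CompletableFuture<String> name) {
        CompletableFuture<Void> signal = CompletableFuture.allOf(count, name);
        return signal
            .thenApply(ignored -> name.join() + ":" + count.join())
            .join();
    }

    public static void main(String[] args) {
        String combined = combine(
            CompletableFuture.completedFuture(42),
            CompletableFuture.completedFuture("answer"));
        System.out.println(combined); // prints "answer:42"
    }
}
```

This works, but every caller has to keep references to the individual futures around, which is exactly the inconvenience the rest of this section tries to remove.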

Asynchronous CompletableFuture#allOf

Firstly, let’s come up with the desired signature.

It’s fair to assume that most cases would involve processing a list of homogeneous futures and returning a future containing a list of results, hence:

public static <T> CompletableFuture<List<T>> allOf(
  Collection<CompletableFuture<T>> futures) {
    // ...
}

Internally, the original method is most likely more complex than you’d expect:

static CompletableFuture<Void> andTree(
  CompletableFuture<?>[] cfs, int lo, int hi) {
    CompletableFuture<Void> d = new CompletableFuture<Void>();
    if (lo > hi) // empty
        d.result = NIL;
    else {
        CompletableFuture<?> a, b;
        int mid = (lo + hi) >>> 1;
        if ((a = (lo == mid ? cfs[lo] :
                  andTree(cfs, lo, mid))) == null ||
            (b = (lo == hi ? a : (hi == mid+1) ? cfs[hi] :
                  andTree(cfs, mid+1, hi)))  == null)
            throw new NullPointerException();
        if (!d.biRelay(a, b)) {
            BiRelay<?,?> c = new BiRelay<>(d, a, b);
            a.bipush(b, c);
            c.tryFire(SYNC);
        }
    }
    return d;
}

So, instead of recreating it, let’s try to reuse what we have already by using the original method just like it was designed to be used – as completion signaller… and then simply swapping the void result with the list of futures:

CompletableFuture<List<CompletableFuture<T>>> intermediate = futures.stream()
    .collect(collectingAndThen(
      toList(), 
      l -> CompletableFuture.allOf(l.toArray(new CompletableFuture[0]))
        .thenApply(__ -> l)));

So far so good: we now receive a CompletableFuture<List<CompletableFuture<T>>> instead of a CompletableFuture<Void>, which is already an improvement. However, what we really need is a list of results, not a list of futures holding results.

Now, we can simply process the list and strip it of the unwanted futures. It’s perfectly fine to call CompletableFuture#join here because we know the calls will never block (all futures are already completed at that point):

CompletableFuture<List<T>> result = intermediate
    .thenApply(list -> list.stream()
        .map(CompletableFuture::join)
        .collect(toList()));

And now, let’s combine it into a final solution:

public static <T> CompletableFuture<List<T>> allOf(
  Collection<CompletableFuture<T>> futures) {
    return futures.stream()
        .collect(collectingAndThen(
          toList(),
          l -> CompletableFuture.allOf(l.toArray(new CompletableFuture[0]))
            .thenApply(__ -> l.stream()
              .map(CompletableFuture::join)
              .collect(toList()))));
}
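As a quick usage sketch, the method can be exercised as follows. The class name `AllOfListDemo` is hypothetical, and the method body is a slightly simplified variant of the idea above (it snapshots the input into a list instead of using a collector), so treat it as an illustration rather than the definitive implementation.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

class AllOfListDemo {

    // Uses the original allOf as a completion signal, then collects the
    // results of the (by then completed) futures in their original order.
    static <T> CompletableFuture<List<T>> allOf(
            Collection<CompletableFuture<T>> futures) {
        // Snapshot the input so later changes to the collection have no effect.
        List<CompletableFuture<T>> copy = new ArrayList<>(futures);
        return CompletableFuture.allOf(copy.toArray(new CompletableFuture<?>[0]))
            .thenApply(ignored -> copy.stream()
                .map(CompletableFuture::join) // never blocks: all are done
                .collect(Collectors.toList()));
    }

    public static void main(String[] args) {
        List<CompletableFuture<String>> futures = Arrays.asList(
            CompletableFuture.supplyAsync(() -> "a"),
            CompletableFuture.supplyAsync(() -> "b"),
            CompletableFuture.supplyAsync(() -> "c"));

        List<String> results = allOf(futures).join();
        System.out.println(results); // prints "[a, b, c]"
    }
}
```

The results come back in the order of the input collection, regardless of the order in which the futures actually complete.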

Asynchronous and Short-circuiting CompletableFuture#allOf

In the presence of exceptions, the original CompletableFuture#allOf waits for all remaining operations to complete.
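This behaviour can be observed with a small sketch that completes the futures by hand, so no timing is involved. The class name `AllOfWaitsDemo` and the `observe` helper are hypothetical names used only for this illustration.

```java
import java.util.concurrent.CompletableFuture;

class AllOfWaitsDemo {

    // Returns {done right after the failure, done after the last future completes}.
    static boolean[] observe() {
        CompletableFuture<String> failing = new CompletableFuture<>();
        CompletableFuture<String> pending = new CompletableFuture<>();
        CompletableFuture<Void> all = CompletableFuture.allOf(failing, pending);

        failing.completeExceptionally(new IllegalStateException("boom"));
        boolean doneAfterFailure = all.isDone(); // still waiting for 'pending'

        pending.complete("late");
        boolean doneAfterAll = all.isDone();     // completed only now

        return new boolean[] {doneAfterFailure, doneAfterAll};
    }

    public static void main(String[] args) {
        boolean[] observed = observe();
        System.out.println(observed[0] + " " + observed[1]); // prints "false true"
    }
}
```

Even though one of the inputs has already failed, the combined future does not complete until the remaining one does.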

Instead, if we wanted to signal completion as soon as one of the operations completes exceptionally, we would need to change the implementation.

In order to do that, we need to create a new CompletableFuture instance and complete it manually as soon as one of the operations raises an exception:

CompletableFuture<List<T>> result = new CompletableFuture<>();
futures.forEach(f -> f
  .handle((__, ex) -> ex == null || result.completeExceptionally(ex)));

…but then we’d need to handle the scenario where all the futures complete successfully. This can be done easily by reusing the method written in the section above:

allOf(futures).handle((r, ex) -> ex != null 
  ? result.completeExceptionally(ex) 
  : result.complete(r));

And then, we can combine the above snippets to form the final solution:

public static <T> CompletableFuture<List<T>> 
  allOfShortcircuiting(Collection<CompletableFuture<T>> futures) {
    
    CompletableFuture<List<T>> result = new CompletableFuture<>();
    futures.forEach(f -> f
      .handle((__, ex) -> ex == null || result.completeExceptionally(ex)));
    
    allOf(futures).handle((r, ex) -> ex != null 
      ? result.completeExceptionally(ex) 
      : result.complete(r));
    
    return result;
}
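To check the short-circuiting behaviour, the following sketch again completes futures manually. The class name `ShortCircuitDemo` and the `failsFast` helper are hypothetical, and the list-returning `allOf` is a simplified stand-in for the method from the previous section.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

class ShortCircuitDemo {

    // Simplified stand-in for the list-returning allOf from the previous section.
    static <T> CompletableFuture<List<T>> allOf(
            Collection<CompletableFuture<T>> futures) {
        List<CompletableFuture<T>> copy = new ArrayList<>(futures);
        return CompletableFuture.allOf(copy.toArray(new CompletableFuture<?>[0]))
            .thenApply(ignored -> copy.stream()
                .map(CompletableFuture::join)
                .collect(Collectors.toList()));
    }

    static <T> CompletableFuture<List<T>> allOfShortcircuiting(
            Collection<CompletableFuture<T>> futures) {
        CompletableFuture<List<T>> result = new CompletableFuture<>();
        // Fail the result as soon as any single future fails.
        futures.forEach(f -> f
            .handle((ignored, ex) -> ex == null || result.completeExceptionally(ex)));
        // Otherwise complete it with all results once every future is done.
        allOf(futures).handle((r, ex) -> ex != null
            ? result.completeExceptionally(ex)
            : result.complete(r));
        return result;
    }

    // True if the combined future fails while another input is still pending.
    static boolean failsFast() {
        CompletableFuture<String> failing = new CompletableFuture<>();
        CompletableFuture<String> pending = new CompletableFuture<>(); // never completed
        CompletableFuture<List<String>> result =
            allOfShortcircuiting(Arrays.asList(failing, pending));

        failing.completeExceptionally(new IllegalStateException("boom"));
        return result.isCompletedExceptionally();
    }

    public static void main(String[] args) {
        System.out.println(failsFast()); // prints "true"
    }
}
```

Note that short-circuiting only affects the returned future; the remaining operations are not cancelled and keep running in the background.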

CompletableFuture#anyOf

Once again, let’s start by inspecting the method signature:

public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs) {
    // ...
}

We can immediately spot issues similar to those of the method discussed above:

  1. The method accepts multiple CompletableFuture objects holding objects of different types
  2. The method returns a CompletableFuture holding an object of type Object

As much as I understand the decision to design CompletableFuture#allOf as a signaling device, CompletableFuture#anyOf doesn’t follow that philosophy: it returns CompletableFuture<Object>, which is even more confusing.

Imagine the following example where I’m trying to process futures holding data of arbitrary types:

CompletableFuture<Integer> f1 = CompletableFuture.completedFuture(1);
CompletableFuture<String> f2 = CompletableFuture.completedFuture("2");

Integer result = CompletableFuture.anyOf(f1, f2)
  .thenApply(r -> {
      if (r instanceof Integer) {
          return (Integer) r;
      } else if (r instanceof String) {
          return Integer.valueOf((String) r);
      }
      throw new IllegalStateException("unexpected object type!");
  }).join();

Quite inconvenient, isn’t it?

Luckily, this one is fairly easy to tailor for a more common scenario (waiting for one future out of many futures holding values of the same type) by changing the signature and introducing a straightforward cast in a strategic spot.

So, with our improved signatures, we can reuse the existing method and safely cast the result:

// The cast is safe because every input future holds a value of type T.
@SuppressWarnings("unchecked")
public static <T> CompletableFuture<T> anyOf(List<CompletableFuture<T>> cfs) {
    return CompletableFuture.anyOf(cfs.toArray(new CompletableFuture[0]))
      .thenApply(o -> (T) o);
}

@SafeVarargs
@SuppressWarnings("unchecked")
public static <T> CompletableFuture<T> anyOf(CompletableFuture<T>... cfs) {
    return CompletableFuture.anyOf(cfs).thenApply(o -> (T) o);
}
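With the typed variant, the instanceof chain from the earlier example disappears. Below is a minimal usage sketch; the class name `AnyOfTypedDemo` is hypothetical, and only the list-based overload is reproduced to keep the example short.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;

class AnyOfTypedDemo {

    // The cast is safe as long as every input future holds a value of type T.
    @SuppressWarnings("unchecked")
    static <T> CompletableFuture<T> anyOf(List<CompletableFuture<T>> cfs) {
        return CompletableFuture.anyOf(cfs.toArray(new CompletableFuture<?>[0]))
            .thenApply(o -> (T) o);
    }

    public static void main(String[] args) {
        CompletableFuture<Integer> never = new CompletableFuture<>(); // never completed
        CompletableFuture<Integer> ready = CompletableFuture.completedFuture(7);

        // No instanceof checks needed: the result is already an Integer.
        Integer first = anyOf(Arrays.asList(never, ready)).join();
        System.out.println(first + 1); // prints "8"
    }
}
```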

Sources

The copy-pasteable utility class can be found on GitHub.



