
JDK9’s ForkJoinPool Upgrades

While everyone’s busy with modularity, local-variable type inference, and other Next Big Things of recent JDK releases, there’s a fairly small but important update to ForkJoinPool that deserves some attention.

ForkJoinPool was an experiment brought to life in JDK 7 and attracted a lot of attention at the time – its main selling point was the implementation of work-stealing: simply put, free threads were able to steal tasks from the worker queues of other, busier threads within the same pool.
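To make the work-stealing idea concrete, here’s a minimal sketch using a RecursiveTask (class and variable names are mine, chosen for illustration):

```java
import java.util.Arrays;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

public class WorkStealingDemo {

    // A classic divide-and-conquer sum: each task splits its range in half and
    // forks one half. Idle workers can steal those forked subtasks from the
    // deques of busy workers – the work-stealing behaviour described above.
    static class SumTask extends RecursiveTask<Long> {
        private static final int THRESHOLD = 1_000;
        private final long[] values;
        private final int from, to;

        SumTask(long[] values, int from, int to) {
            this.values = values;
            this.from = from;
            this.to = to;
        }

        @Override
        protected Long compute() {
            if (to - from <= THRESHOLD) {
                long sum = 0;
                for (int i = from; i < to; i++) {
                    sum += values[i];
                }
                return sum;
            }
            int mid = (from + to) >>> 1;
            SumTask left = new SumTask(values, from, mid);
            left.fork(); // pushed onto this worker's deque – stealable by idle workers
            SumTask right = new SumTask(values, mid, to);
            return right.compute() + left.join();
        }
    }

    public static void main(String[] args) {
        long[] values = new long[100_000];
        Arrays.fill(values, 1L);
        long sum = ForkJoinPool.commonPool().invoke(new SumTask(values, 0, values.length));
        System.out.println(sum); // 100000
    }
}
```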

ForkJoinPool Configuration

Since the beginning, ForkJoinPool has suffered from a lack of reasonable configuration options. The most generous constructor offered only the following parameters:

  1. Parallelism level
  2. A custom ForkJoinWorkerThreadFactory
  3. A custom UncaughtExceptionHandler
  4. asyncMode
public ForkJoinPool(
    int parallelism,
    ForkJoinWorkerThreadFactory factory,
    UncaughtExceptionHandler handler,
    boolean asyncMode)

Those of you who were a bit more inquisitive might have discovered that there’s one more private constructor, available since JDK 8, which offers an additional, very useful parameter: the worker name prefix.

I must admit it was very disappointing to see this one being private and not accessible through any legal means, but luckily there are other ways of achieving the same result.
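One such legal workaround is to wrap the default ForkJoinWorkerThreadFactory and rename workers as they’re created – a sketch (the class names are mine):

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinPool.ForkJoinWorkerThreadFactory;
import java.util.concurrent.ForkJoinWorkerThread;
import java.util.concurrent.atomic.AtomicInteger;

public class NamedWorkersDemo {

    // Wraps the default factory and renames each worker – the same effect as
    // the private workerNamePrefix parameter, achieved with public API only.
    static class NamedThreadFactory implements ForkJoinWorkerThreadFactory {
        private final String prefix;
        private final AtomicInteger counter = new AtomicInteger();

        NamedThreadFactory(String prefix) {
            this.prefix = prefix;
        }

        @Override
        public ForkJoinWorkerThread newThread(ForkJoinPool pool) {
            ForkJoinWorkerThread thread =
                ForkJoinPool.defaultForkJoinWorkerThreadFactory.newThread(pool);
            thread.setName(prefix + "-" + counter.incrementAndGet());
            return thread;
        }
    }

    public static void main(String[] args) {
        ForkJoinPool pool = new ForkJoinPool(4, new NamedThreadFactory("my-pool"), null, false);
        // Worker threads will be named my-pool-1, my-pool-2, ...
        System.out.println(pool.submit(() -> Thread.currentThread().getName()).join());
        pool.shutdown();
    }
}
```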

JDK9

JDK9, however, brought a huge improvement. Firstly, the implementation was rewritten using VarHandles, and secondly, we got a new, very generous constructor exposing additional configuration parameters:

public ForkJoinPool(
    // ...
    int corePoolSize,
    int maximumPoolSize,
    int minimumRunnable,
    Predicate<? super ForkJoinPool> saturate,
    long keepAliveTime, TimeUnit unit)

Let’s see what those give us.

int corePoolSize

This one is pretty self-explanatory:

The number of threads to keep in the pool.

Normally (and by default) this is the same value as the parallelism level, but may be set to a larger value to reduce dynamic overhead if tasks regularly block.

Using a smaller value (for example 0) has the same effect as the default.

However, it’s worth adding that the maximum possible value is 32767.

int maximumPoolSize

Pretty self-explanatory as well. By default, 256 spare threads are allowed.

int minimumRunnable

This is the first huge improvement: it gives us a way to ensure that there are at least N usable threads in the pool – usable threads are those that aren’t blocked by a join() or a ManagedBlocker instance. When the number of free, unblocked threads drops below the provided value, new threads get spawned if maximumPoolSize allows it.

Setting minimumRunnable to a larger value might ensure better throughput in the presence of blocking tasks, at the cost of increased overhead (remember to make sure that the gains outweigh the costs).

If we know that our tasks won’t need any additional threads, we can go for 0.
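As a side note, the ManagedBlocker mentioned above is a public API for telling the pool that a worker is about to block, so the pool can compensate to keep enough usable threads around. The sketch below mirrors the ManagedLocker example from the ForkJoinPool.ManagedBlocker javadoc:

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.locks.ReentrantLock;

// Wrapping a lock acquisition in ForkJoinPool.managedBlock() informs the pool
// that the current worker may block, so it can activate or spawn a spare
// thread instead of letting the number of runnable workers collapse.
public class ManagedLocker implements ForkJoinPool.ManagedBlocker {
    private final ReentrantLock lock;
    private boolean hasLock = false;

    ManagedLocker(ReentrantLock lock) {
        this.lock = lock;
    }

    @Override
    public boolean block() {
        if (!hasLock) {
            lock.lock();
        }
        return true; // lock acquired, no further blocking needed
    }

    @Override
    public boolean isReleasable() {
        return hasLock || (hasLock = lock.tryLock());
    }

    public static void main(String[] args) throws InterruptedException {
        ReentrantLock lock = new ReentrantLock();
        ForkJoinPool.managedBlock(new ManagedLocker(lock)); // blocks "politely"
        System.out.println(lock.isHeldByCurrentThread());
        lock.unlock();
    }
}
```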

Predicate<? super ForkJoinPool> saturate

If an attempt is made to spawn more threads in order to satisfy the minimumRunnable constraint, but it gets blocked by maximumPoolSize, then by default a RejectedExecutionException("Thread limit exceeded replacing blocked worker") is thrown.

But now, we can provide a Predicate that gets evaluated once such a situation occurs – if it returns true, the pool is allowed to saturate, ignoring the minimumRunnable value.

It’s good to see that we have a choice now.

long keepAliveTime, TimeUnit unit

Just like with the classic ExecutorService, we can now specify how long unused threads should be kept alive before getting terminated.

Keep in mind that this applies only to threads spawned above the corePoolSize value.
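Putting the new parameters together, constructing a fully configured pool might look like this (the concrete values below are illustrative, not recommendations):

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;

public class ConfiguredPoolDemo {
    public static void main(String[] args) {
        ForkJoinPool pool = new ForkJoinPool(
            4,                                               // parallelism
            ForkJoinPool.defaultForkJoinWorkerThreadFactory, // factory
            null,                                            // handler
            false,                                           // asyncMode
            4,                                               // corePoolSize
            64,                                              // maximumPoolSize
            1,                                               // minimumRunnable
            p -> true,                                       // saturate: allow saturation instead of throwing
            60, TimeUnit.SECONDS);                           // keepAliveTime, unit
        System.out.println(pool.getParallelism()); // 4
        pool.shutdown();
    }
}
```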

Conclusion

JDK9 brought huge improvements for ForkJoinPool.

Unfortunately, we still can’t easily provide a custom worker name prefix, nor cap the size of the worker queue, which is fixed at 1 << 24 – far larger than any reasonable value.

If you’re interested in seeing the raw diff, you can find it here.

Kotlin: Beware of Java Stream API Habits

Kotlin’s Collections API is extremely versatile and expressive, but there’s one important thing to be aware of, especially when migrating from Java.

Aching Design of Java Collections

In Java, although we could easily implement our own immutable data structures or simply use the provided immutable views, using them on an everyday basis was challenging because of the Collections API design.

For example, imagine yourself implementing a new immutable data structure for Java and trying to implement the following methods:

boolean add(E e);
boolean remove(Object o);
void clear();
boolean removeAll(Collection<?> c);
// ...

The only reasonable way is to simply forbid using them:

public E set(int index, E element) {
    throw new UnsupportedOperationException();
}
public void add(int index, E element) {
    throw new UnsupportedOperationException();
}
public E remove(int index) {
    throw new UnsupportedOperationException();
}
// and so on ...

And it shall remain like this for as long as Java embraces backward compatibility (although recent releases haven’t been very good at this).

This is one of the reasons why the only reasonable way to enrich the API was to introduce a new abstraction – the Stream API. Streams aren’t collections, so the API could be designed from scratch.

Stream API Laziness

Besides the fancy new functional API, the major selling point of java.util.stream.Stream was laziness, which made streams nearly as performant as standard imperative solutions and made it possible to work with potentially infinite sequences.

Consider the following example:

Optional<Integer> resultStream = list.stream()
    .map(i -> i * 2)
    .filter(i -> i > 1)
    .findAny();

Initially, it might be tempting to think that the optimistic time complexity is O(2*N), while the imperative alternative provides O(1) at the cost of being more error-prone and less readable:

Optional<Integer> resultLoop = Optional.empty();
for (Integer i : list) {
    Integer integer = i * 2;
    if (integer > 1) {
        resultLoop = Optional.of(integer);
        break;
    }
}

Even the huge improvement in code maintainability/readability wouldn’t be enough to justify the performance impact.

Luckily, thanks to lazy evaluation, both examples provide the optimistic time complexity of O(1) (the first encountered element matches) and pessimistic of O(N) (no elements match) – Streams simply take elements one by one and push them through the whole operation chain allowing short-circuiting as soon as the result is obtained.
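We can see this short-circuiting in action by counting how many elements actually flow through map() – a small sketch (class and variable names are mine):

```java
import java.util.List;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;

public class LazinessDemo {
    public static void main(String[] args) {
        List<Integer> list = List.of(1, 2, 3, 4, 5);
        AtomicInteger mapCalls = new AtomicInteger();

        Optional<Integer> result = list.stream()
            .map(i -> {
                mapCalls.incrementAndGet(); // count how many elements are processed
                return i * 2;
            })
            .filter(i -> i > 1)
            .findAny();

        // Only the first element was pulled through the whole pipeline:
        System.out.println(result.get());   // 2
        System.out.println(mapCalls.get()); // 1
    }
}
```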

A Trap of Kotlin’s Expressive Collections API

Kotlin’s Collections API is extremely versatile, expressive, and well-suited for working with immutable data structures – sounds great, right? But it can surely backfire if not used properly!

The great thing about collections in Kotlin is that they’re immutable by default and feature a very rich functional API, so we no longer need to turn to the Stream API – our collections provide all this functionality already.

So, if we tried to recreate the example above, we could simply write:

val list = listOf(1, 2, 3, 4, 5)
list
    .map { expensiveOperation(it) }
    .map { anotherExpensiveOperation(it) }
    .first()

Nice and clean but there’s a catch – let’s measure the execution time.

What do you think the result will be, assuming that both expensiveOperation() and anotherExpensiveOperation() take exactly one second each?

Kotlin’s collections are not lazy, hence the operation takes around 10 seconds, which matches the optimistic time complexity of O(2*N), where N = 5.

With great power comes great responsibility: every additional map/flatMap/filter/… call might negatively impact the time complexity of your solution, because each of them creates a new collection instance by iterating over the previous one.

Bringing Laziness Back

Good news, everyone!

Naturally, Kotlin provides a native way of achieving laziness in situations like the above: lazy sequences. We can convert any Iterable into a lazy Sequence – Kotlin’s alternative to Stream – using the asSequence() method:

val list = listOf(1, 2, 3, 4, 5).asSequence()
list
    .map { expensiveOperation(it) }
    .map { anotherExpensiveOperation(it) }
    .first()

If we measure the execution time now, we can breathe a sigh of relief: the result is around 2 seconds regardless of the initial sequence size, which matches the time complexity of O(1) (2 seconds because of the two map() calls).

The above code snippets can be found on GitHub.

Key Takeaways

  • Kotlin Collections are not lazy in nature like Java Stream API is
  • If you want laziness, use asSequence()