In the previous articles, we’ve seen how to apply parallel processing to our collections using high-level tools like CompletableFuture and the Stream API, which allow us to bypass the problems introduced by Parallel Streams:
- Parallel Collection Processing: Without Parallel Streams (1/3)
- Parallel Collection Processing: With Parallel Collectors (3/3)
In this article, we’ll dive into parallel-collectors, my library that consolidates the ideas from the previous articles into a set of public APIs.
Overview
Simply put, parallel-collectors is a zero-dependency toolkit that eases parallel collection processing in Java using the Stream API and CompletableFuture. It lets you choose your own parallelization strategy, maximum parallelism, executor, and much more.
All of the above and more is provided via custom implementations of the Collector interface:
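```java
import static com.pivovarit.collectors.ParallelCollectors.parallel;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.stream.Collectors.toList;

// ...

list.stream()
  .collect(parallel(i -> foo(i), toList(), executor, parallelism))
  .orTimeout(1000, MILLISECONDS) // orTimeout requires Java 9+
  .thenAcceptAsync(System.out::println, otherExecutor)
  .thenRun(() -> System.out.println("Finished!"));
```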
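To get a feel for how a Collector can hand back a CompletableFuture instead of a plain value, here’s a minimal, JDK-only sketch of the idea. It’s not the library’s implementation. AsyncCollectorSketch and asyncMapping are made-up names, and the sketch deliberately skips the parallelism limit that parallel() accepts.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;
import java.util.stream.Collector;
import java.util.stream.Collectors;

// Hypothetical sketch, NOT the parallel-collectors implementation:
// a Collector that maps each element asynchronously and returns a CompletableFuture.
// Unlike the real parallel(), it has no parallelism limit.
class AsyncCollectorSketch {

    static <T, R, RR> Collector<T, ?, CompletableFuture<RR>> asyncMapping(
            Function<T, R> mapper, Collector<R, ?, RR> downstream, Executor executor) {
        return Collector.<T, List<CompletableFuture<R>>, CompletableFuture<RR>>of(
            // supplier: a container for in-flight tasks
            ArrayList::new,
            // accumulator: submit each element to the executor as soon as it arrives
            (futures, element) -> futures.add(
                CompletableFuture.supplyAsync(() -> mapper.apply(element), executor)),
            // combiner: only used when the source stream itself is parallel
            (left, right) -> {
                left.addAll(right);
                return left;
            },
            // finisher: once all tasks are done, apply the downstream collector to the results
            futures -> CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
                .thenApply(ignored -> futures.stream()
                    .map(CompletableFuture::join)
                    .collect(downstream)));
    }

    public static void main(String[] args) {
        ExecutorService executor = Executors.newFixedThreadPool(4);
        try {
            List<Integer> result = List.of(1, 2, 3, 4).stream()
                .collect(asyncMapping(i -> i * i, Collectors.toList(), executor))
                .join();
            System.out.println(result); // [1, 4, 9, 16]
        } finally {
            executor.shutdown();
        }
    }
}
```

Since tasks are submitted from the accumulator, they start running while the stream is still being consumed. The finisher only assembles the combined future, so nothing blocks until you call join() or chain further stages.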
To start using it, add the following dependency:
```xml
<dependency>
    <groupId>com.pivovarit</groupId>
    <artifactId>parallel-collectors</artifactId>
    <version>2.3.0</version>
</dependency>
```
Double-check the latest version here.
Features
The library leverages the combined power of the Stream API and CompletableFuture to provide functionality that standard Parallel Streams are missing.
Its main features are:
- non-invasive – the functionality is provided using custom implementations of public APIs
- lightweight – if your goal is to apply straightforward parallel processing, it’s the sweet spot between Parallel Streams and heavy-armor technologies like RxJava/Project Reactor
- configurable – choose your own Executor, parallelism, batching/completion strategies
- non-blocking – results can be returned as CompletableFutures so that the calling thread isn’t blocked
- short-circuiting – if one of the subtasks throws an exception, remaining subtasks get interrupted and/or skipped
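Short-circuiting is worth a closer look because plain CompletableFuture.allOf() doesn’t do it. It waits for every future to finish, even after one of them has already failed. Here’s a minimal JDK-only sketch of fail-fast combining. ShortCircuitSketch and allOrFirstFailure are hypothetical names, not the library’s API. One caveat: CompletableFuture.cancel() doesn’t interrupt a task that’s already running. This sketch therefore only marks the remaining futures as cancelled, which is weaker than interrupting them.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;

// Hypothetical sketch of fail-fast combining, NOT the library's implementation.
class ShortCircuitSketch {

    // Completes with all results if every future succeeds,
    // or exceptionally as soon as the FIRST one fails.
    static <T> CompletableFuture<List<T>> allOrFirstFailure(List<CompletableFuture<T>> futures) {
        CompletableFuture<List<T>> result = new CompletableFuture<>();
        AtomicBoolean failed = new AtomicBoolean(false);

        // Happy path: all futures succeeded, so join() below neither blocks nor throws.
        CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
            .thenRun(() -> result.complete(futures.stream()
                .map(CompletableFuture::join)
                .collect(Collectors.toList())));

        // Failure path: react to the first failure without waiting for the others.
        for (CompletableFuture<T> future : futures) {
            future.whenComplete((value, error) -> {
                if (error != null && failed.compareAndSet(false, true)) {
                    // cancel() marks unfinished futures as cancelled; per the JDK docs
                    // it does not interrupt tasks that are already running.
                    futures.forEach(other -> other.cancel(true));
                    result.completeExceptionally(error);
                }
            });
        }
        return result;
    }

    public static void main(String[] args) {
        ExecutorService executor = Executors.newFixedThreadPool(2);
        CompletableFuture<Integer> neverEnding = new CompletableFuture<>(); // would make allOf() wait forever
        CompletableFuture<Integer> failing = CompletableFuture.supplyAsync(() -> {
            throw new IllegalStateException("boom");
        }, executor);
        try {
            allOrFirstFailure(List.of(neverEnding, failing)).join();
        } catch (CompletionException e) {
            System.out.println("failed fast: " + e.getCause().getMessage()); // failed fast: boom
        } finally {
            executor.shutdown();
        }
    }
}
```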
Parallel Collectors are unopinionated by design, so it’s up to their users to use them responsibly, which involves things like:
- proper configuration of the provided Executor and its lifecycle management
- choosing the appropriate parallelism level
- making sure that the tool is applied in the right context
Make sure to read the API documentation before using them in production.
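A common way to cap the number of tasks running at once, independently of the executor’s size, is a Semaphore. The following is a minimal sketch under that assumption. BoundedParallelismSketch and mapWithLimit are hypothetical names, and the library may enforce its limit differently. To keep it short, the sketch blocks the submitting thread whenever the limit is reached. A non-blocking variant would need to hand submission off to a separate thread.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.Semaphore;
import java.util.function.Function;
import java.util.stream.Collectors;

// Hypothetical sketch: caps the number of concurrently running tasks at `parallelism`,
// regardless of how many threads the executor has. NOT the library's implementation.
class BoundedParallelismSketch {

    static <T, R> CompletableFuture<List<R>> mapWithLimit(
            List<T> input, Function<T, R> mapper, Executor executor, int parallelism) {
        if (parallelism < 1) {
            throw new IllegalArgumentException("parallelism must be at least 1");
        }
        Semaphore permits = new Semaphore(parallelism);
        List<CompletableFuture<R>> futures = input.stream()
            .map(element -> {
                // Blocks the submitting thread until one of the running tasks finishes.
                permits.acquireUninterruptibly();
                return CompletableFuture.supplyAsync(() -> {
                    try {
                        return mapper.apply(element);
                    } finally {
                        permits.release(); // free a slot even if mapper throws
                    }
                }, executor);
            })
            .collect(Collectors.toList());

        // Combine the results in input order without blocking the caller.
        return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
            .thenApply(ignored -> futures.stream()
                .map(CompletableFuture::join)
                .collect(Collectors.toList()));
    }
}
```

The executor can be shared and sized generously, while each call keeps its own in-flight budget.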
API Overview
The com.pivovarit.collectors.ParallelCollectors class serves as a single entry point to the whole library, mimicking the java.util.stream.Collectors class.
So, a classic straightforward parallel stream usage:
```java
List<Integer> result = list.stream()
  .parallel()
  .map(i -> foo(i))
  .collect(toList());
```
Ends up being replaced by:
```java
private static ExecutorService e = ...

// ...

CompletableFuture<List<Integer>> result = list.stream()
  .collect(parallel(i -> foo(i), toList(), e, 4));
```
The library features four types of collectors:
- ParallelCollectors.parallel() – process the stream, collect it using a provided Collector, and receive the result as a CompletableFuture
- ParallelCollectors.parallelToStream() – process the stream and stream the results in completion order
- ParallelCollectors.parallelToOrderedStream() – process the stream and stream the results in the original order
- ParallelCollectors.toFuture() – collect a stream of CompletableFutures using a custom Collector
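The difference between parallelToStream() and parallelToOrderedStream() comes down to whether results are emitted as soon as they’re ready or in input order. Here’s a minimal JDK-only sketch of both behaviors, using ExecutorCompletionService for completion order. CompletionOrderSketch and its methods are hypothetical names, and the library doesn’t necessarily work this way. The sketch returns Lists to stay short, whereas the library gives you a Stream.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;
import java.util.stream.Collectors;

// Hypothetical sketch contrasting completion order with input order.
class CompletionOrderSketch {

    // Results in the order the tasks FINISH (the idea behind parallelToStream()).
    static <T, R> List<R> inCompletionOrder(List<T> input, Function<T, R> mapper, ExecutorService executor) {
        CompletionService<R> completion = new ExecutorCompletionService<>(executor);
        input.forEach(element -> completion.submit(() -> mapper.apply(element)));
        List<R> results = new ArrayList<>();
        try {
            for (int i = 0; i < input.size(); i++) {
                results.add(completion.take().get()); // take() waits for the next finished task
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for results", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("task failed", e.getCause());
        }
        return results;
    }

    // Results in input order (the idea behind parallelToOrderedStream()):
    // submit everything first, then join the futures in submission order.
    static <T, R> List<R> inOriginalOrder(List<T> input, Function<T, R> mapper, Executor executor) {
        List<CompletableFuture<R>> futures = input.stream()
            .map(element -> CompletableFuture.supplyAsync(() -> mapper.apply(element), executor))
            .collect(Collectors.toList());
        return futures.stream().map(CompletableFuture::join).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        ExecutorService executor = Executors.newFixedThreadPool(2);
        Function<Integer, Integer> sleepFor = millis -> {
            try {
                Thread.sleep(millis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return millis;
        };
        try {
            System.out.println(inCompletionOrder(List.of(300, 10), sleepFor, executor)); // most likely [10, 300]
            System.out.println(inOriginalOrder(List.of(300, 10), sleepFor, executor));   // [300, 10]
        } finally {
            executor.shutdown();
        }
    }
}
```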
By default, all upstream elements are processed separately. If you want to process them in batches instead, use the ParallelCollectors.Batching namespace, which contains drop-in alternatives (less stress on the executor in exchange for the lack of work-stealing).
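To illustrate what batching changes, here’s a minimal JDK-only sketch. It splits the input into a fixed number of contiguous chunks and submits one task per chunk instead of one per element. BatchingSketch, partition, and mapInBatches are hypothetical names, not the ParallelCollectors.Batching API. Fewer, larger tasks mean less scheduling overhead. However, if one chunk happens to be slow, idle threads can’t pick up its remaining elements, which is the work-stealing trade-off mentioned above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.function.Function;
import java.util.stream.Collectors;

// Hypothetical sketch of batching, NOT the ParallelCollectors.Batching implementation.
class BatchingSketch {

    // Splits the input into at most `batches` contiguous chunks of near-equal size.
    static <T> List<List<T>> partition(List<T> input, int batches) {
        if (batches < 1) {
            throw new IllegalArgumentException("batches must be at least 1");
        }
        List<List<T>> chunks = new ArrayList<>();
        int size = input.size();
        for (int b = 0; b < batches; b++) {
            int from = (int) ((long) b * size / batches);
            int to = (int) ((long) (b + 1) * size / batches);
            if (from < to) { // skip empty chunks when there are fewer elements than batches
                chunks.add(input.subList(from, to));
            }
        }
        return chunks;
    }

    // Submits one task per chunk; each task maps its whole chunk sequentially.
    static <T, R> CompletableFuture<List<R>> mapInBatches(
            List<T> input, Function<T, R> mapper, Executor executor, int batches) {
        List<CompletableFuture<List<R>>> futures = partition(input, batches).stream()
            .map(chunk -> CompletableFuture.supplyAsync(
                () -> chunk.stream().map(mapper).collect(Collectors.toList()), executor))
            .collect(Collectors.toList());

        // Concatenate chunk results in their original order.
        return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
            .thenApply(ignored -> futures.stream()
                .flatMap(future -> future.join().stream())
                .collect(Collectors.toList()));
    }
}
```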
More examples can be found in the project’s README on GitHub.
Good Practices
- Consider providing reasonable timeouts for CompletableFutures so that you don’t block for an unreasonably long time when something goes wrong (how-to)
- Name your thread pools – it makes debugging easier (how-to)
- Limit the size of your thread pool’s work queue (source)
- Always limit the max parallelism unless you know what you are doing (source)
- Remember to shut down no-longer-used ExecutorService instances to allow reclamation of resources
- Non-async CompletableFuture#then(Apply|Combine|Compose|Run|Accept) callbacks might be executed by the thread that completed the previous stage or by the calling thread. If this is not suitable, use CompletableFuture#then(Apply|Combine|Compose|Run|Accept)Async instead, and provide the desired executor instance.
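Several of these practices come together when you create and use an executor. Here’s a minimal JDK-only sketch that assumes a fixed-size pool fits your workload. ExecutorPracticesSketch, newBoundedNamedPool, and the concrete sizes and timeouts are made up for illustration.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper combining named threads, a bounded queue, timeouts, and shutdown.
class ExecutorPracticesSketch {

    // Fixed-size pool with named threads and a bounded work queue.
    static ThreadPoolExecutor newBoundedNamedPool(String name, int threads, int queueCapacity) {
        AtomicInteger counter = new AtomicInteger();
        ThreadFactory namedThreads = runnable ->
            new Thread(runnable, name + "-" + counter.incrementAndGet()); // e.g. "parallel-worker-1"
        return new ThreadPoolExecutor(
            threads, threads,                        // core == max: a fixed number of threads
            0L, TimeUnit.MILLISECONDS,
            new ArrayBlockingQueue<>(queueCapacity), // bounded queue instead of an unbounded one
            namedThreads,
            new ThreadPoolExecutor.AbortPolicy());   // throw when both threads and queue are full
    }

    public static void main(String[] args) throws InterruptedException {
        ThreadPoolExecutor pool = newBoundedNamedPool("parallel-worker", 4, 100);
        try {
            String message = CompletableFuture
                .supplyAsync(() -> Thread.currentThread().getName(), pool)
                .orTimeout(1, TimeUnit.SECONDS)                      // Java 9+: fail instead of hanging
                .thenApplyAsync(name -> "computed on " + name, pool) // continuation runs on the pool
                .join();
            System.out.println(message);
        } finally {
            pool.shutdown(); // stop accepting new tasks and let queued ones finish
            pool.awaitTermination(5, TimeUnit.SECONDS);
        }
    }
}
```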
Limitations
Due to the design of the Collector API, the upstream is always evaluated eagerly, even if downstream operations are short-circuiting. This means that none of these collectors should be used with infinite streams.
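This eager evaluation is easy to observe with plain JDK streams, because it follows from how collect() works rather than from this library. A Collector has no way of telling the stream to stop early, so every upstream element gets pulled. A short-circuiting terminal operation like findFirst(), by contrast, stops right away. Below is a small demonstration. The EagerUpstreamDemo class and its methods are made up for illustration, and peek() is used only to count pulled elements.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Illustrative demo using only JDK streams (not parallel-collectors).
class EagerUpstreamDemo {

    // A Collector-based terminal operation consumes the whole upstream,
    // even though only the first element is used in the end.
    static int pulledByCollector(int size) {
        AtomicInteger pulled = new AtomicInteger();
        Stream.iterate(0, i -> i + 1)
            .limit(size) // without limit(), collect() would keep pulling until memory runs out
            .peek(i -> pulled.incrementAndGet())
            .collect(Collectors.collectingAndThen(Collectors.toList(), list -> list.get(0)));
        return pulled.get();
    }

    // A short-circuiting terminal operation stops after the first element.
    static int pulledByFindFirst(int size) {
        AtomicInteger pulled = new AtomicInteger();
        Stream.iterate(0, i -> i + 1)
            .limit(size)
            .peek(i -> pulled.incrementAndGet())
            .findFirst();
        return pulled.get();
    }

    public static void main(String[] args) {
        System.out.println(pulledByCollector(1_000)); // 1000
        System.out.println(pulledByFindFirst(1_000)); // 1
    }
}
```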
Words of Caution
Even though this tool makes it easy to parallelize things, that doesn’t mean you always should. Parallel processing is cool, but only when applied to the right problems. It comes with a price that can often be higher than not using it at all. Threads are expensive to create, maintain and switch between, and you can only create a limited number of them.
It’s important to follow up on the root cause and double-check if parallelism is the way to go.
It often turns out that the root cause can be addressed by using a simple JOIN statement, batching, reorganizing your data… or even just by choosing a different API method.