In the previous articles, we’ve seen how to apply parallel processing to our collections using high-level tools like CompletableFuture and the Stream API, which allow us to bypass the problems introduced by Parallel Streams.
In this article, we’ll dive into parallel-collectors, my library that’s supposed to consolidate the ideas from the previous articles into a set of public APIs.
Simply put, parallel-collectors is a zero-dependency toolkit that eases parallel collection processing in Java using the Stream API and CompletableFuture, letting you choose your own parallelization strategy, maximum parallelism, executor, and much more.
All of the above and more is provided via custom implementations of the Collector interface:
```java
list.stream()
  .collect(parallel(i -> foo(i), toList(), executor, parallelism))
  .orTimeout(1000, MILLISECONDS)
  .thenAcceptAsync(System.out::println, otherExecutor)
  .thenRun(() -> System.out.println("Finished!"));
```
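To make "custom implementations of the Collector interface" more concrete, here is a deliberately naive, JDK-only sketch of a collector whose result is a CompletableFuture. The naiveParallel helper and the NaiveParallelCollector class are hypothetical and are not how parallel-collectors is implemented: this version submits every element straight to the executor, with no parallelism cap and no short-circuiting on failure.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;
import java.util.stream.Collector;
import java.util.stream.Collectors;

final class NaiveParallelCollector {

    // Hypothetical helper (not the library's API): submits each element to the executor
    // while the stream is being accumulated, and returns one CompletableFuture holding
    // all results in encounter order. No parallelism cap, no fail-fast behavior.
    static <T, R> Collector<T, ?, CompletableFuture<List<R>>> naiveParallel(
            Function<T, R> mapper, ExecutorService executor) {
        return Collector.of(
                ArrayList<CompletableFuture<R>>::new,
                (futures, element) -> futures.add(
                        CompletableFuture.supplyAsync(() -> mapper.apply(element), executor)),
                (left, right) -> { left.addAll(right); return left; },
                futures -> CompletableFuture
                        .allOf(futures.toArray(new CompletableFuture<?>[0]))
                        .thenApply(ignored -> futures.stream()
                                .map(CompletableFuture::join)
                                .collect(Collectors.toList())));
    }

    static List<Integer> squareAll(List<Integer> input) {
        ExecutorService executor = Executors.newFixedThreadPool(4);
        try {
            return input.stream()
                    .collect(naiveParallel(i -> i * i, executor))
                    .join();
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(squareAll(List.of(1, 2, 3, 4))); // [1, 4, 9, 16]
    }
}
```

Because the finisher returns a CompletableFuture, callers can chain orTimeout, thenAcceptAsync and similar methods, as in the example above, instead of blocking on the result.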
To start using it, add the following Maven dependency:
```xml
<dependency>
    <groupId>com.pivovarit</groupId>
    <artifactId>parallel-collectors</artifactId>
    <version>2.3.0</version>
</dependency>
```
The library leverages the combined power of the Stream API and CompletableFuture to provide functionality missing from standard Parallel Streams. It is:
- non-invasive – the functionality is provided using custom implementations of public APIs
- lightweight – if your goal is to apply straightforward parallel processing, it’s the sweet spot between Parallel Streams and heavy-armor technologies like RxJava/Project Reactor
- configurable – choose your own Executor, parallelism, batching/completion strategies
- non-blocking – results can be returned as CompletableFutures so that the calling thread isn’t blocked
- short-circuiting – if one of the subtasks throws an exception, remaining subtasks get interrupted and/or skipped
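The short-circuiting behavior can be approximated with plain CompletableFuture APIs. The sketch below is one possible approach under stated assumptions, not the library's code, and mapAllFailFast and FailFastSketch are hypothetical names. It fails the aggregate future on the first error and cancels the remaining futures; note that cancelling a CompletableFuture does not interrupt a task that is already running.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;
import java.util.stream.Collectors;

final class FailFastSketch {

    // Hypothetical helper: maps every element asynchronously, completes the aggregate
    // future exceptionally on the first failure, and cancels the other futures.
    // CompletableFuture.cancel(true) does NOT interrupt a task that is already running.
    static <T, R> CompletableFuture<List<R>> mapAllFailFast(
            List<T> input, Function<T, R> mapper, ExecutorService executor) {
        List<CompletableFuture<R>> futures = input.stream()
                .map(t -> CompletableFuture.supplyAsync(() -> mapper.apply(t), executor))
                .collect(Collectors.toList());

        // On its own, allOf(...) waits for every subtask, even after one has failed.
        CompletableFuture<List<R>> result = CompletableFuture
                .allOf(futures.toArray(new CompletableFuture<?>[0]))
                .thenApply(ignored -> futures.stream()
                        .map(CompletableFuture::join)
                        .collect(Collectors.toList()));

        for (CompletableFuture<R> future : futures) {
            future.whenComplete((value, error) -> {
                // completeExceptionally returns true only for the first failure,
                // so the cancellation fan-out happens exactly once.
                if (error != null && result.completeExceptionally(error)) {
                    futures.forEach(other -> other.cancel(true));
                }
            });
        }
        return result;
    }

    // Multiplies each element by 10, failing on the element equal to `failOn`.
    static List<Integer> run(List<Integer> input, int failOn) {
        ExecutorService executor = Executors.newFixedThreadPool(4);
        try {
            return mapAllFailFast(input, i -> {
                if (i == failOn) {
                    throw new IllegalArgumentException("cannot process " + i);
                }
                return i * 10;
            }, executor).join();
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(run(List.of(1, 2, 3), -1)); // [10, 20, 30]
    }
}
```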
Parallel Collectors are unopinionated by design, so it’s up to their users to use them responsibly, which involves things like:
- proper configuration of the provided Executor and its lifecycle management
- choosing the appropriate parallelism level
- making sure that the tool is applied in the right context
Make sure to read the API documentation before using these in production.
The com.pivovarit.collectors.ParallelCollectors class serves as a single entry point to the whole library, mimicking the …
So, a classic, straightforward Parallel Stream pipeline:
```java
List<Integer> result = list.stream()
  .parallel()
  .map(i -> foo(i))
  .collect(toList());
```
Ends up being replaced by:
```java
private static ExecutorService e = ...

// ...

CompletableFuture<List<Integer>> result = list.stream()
  .collect(parallel(i -> foo(i), toList(), e, 4));
```
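The parallelism argument (4 in the example above) caps how many tasks run at once, independently of the executor's size. One way to get that effect is a semaphore-guarded dispatcher placed in front of any Executor. The BoundedExecutor class below is a hypothetical, JDK-only sketch of that idea and not how parallel-collectors implements it; excess tasks wait in a queue rather than blocking the submitting thread.

```java
import java.util.List;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

// Hypothetical wrapper: lets at most `maxParallelism` tasks run on `delegate` at once.
// Excess tasks wait in a queue instead of blocking the submitting thread.
final class BoundedExecutor implements Executor {
    private final Executor delegate;
    private final Semaphore permits;
    private final Queue<Runnable> pending = new ConcurrentLinkedQueue<>();

    BoundedExecutor(Executor delegate, int maxParallelism) {
        this.delegate = delegate;
        this.permits = new Semaphore(maxParallelism);
    }

    @Override
    public void execute(Runnable task) {
        pending.add(task);
        drain();
    }

    // Hand queued tasks to the delegate while permits are available.
    private void drain() {
        while (!pending.isEmpty() && permits.tryAcquire()) {
            Runnable next = pending.poll();
            if (next == null) {        // another thread dequeued it first
                permits.release();
                continue;
            }
            delegate.execute(() -> {
                try {
                    next.run();
                } finally {
                    permits.release();
                    drain();           // pick up tasks queued while this one was running
                }
            });
        }
    }

    // Runs `tasks` short sleeping tasks through a 16-thread pool capped at `limit`
    // and reports the highest number of tasks observed running at the same time.
    static int maxObservedConcurrency(int tasks, int limit) {
        ExecutorService pool = Executors.newFixedThreadPool(16);
        BoundedExecutor bounded = new BoundedExecutor(pool, limit);
        AtomicInteger running = new AtomicInteger();
        AtomicInteger max = new AtomicInteger();
        try {
            List<CompletableFuture<Void>> futures = IntStream.range(0, tasks)
                    .mapToObj(i -> CompletableFuture.runAsync(() -> {
                        max.accumulateAndGet(running.incrementAndGet(), Math::max);
                        try {
                            Thread.sleep(20);
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                        } finally {
                            running.decrementAndGet();
                        }
                    }, bounded))
                    .collect(Collectors.toList());
            CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[0])).join();
            return max.get();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(maxObservedConcurrency(20, 4)); // at most 4
    }
}
```

The design choice here is to throttle at submission time: the underlying pool can be larger and shared, while this particular workload never occupies more than the requested number of threads.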
The library features four types of collectors:
- ParallelCollectors.parallel() – process the stream, collect it using a provided Collector, and receive the result as a CompletableFuture
- ParallelCollectors.parallelToStream() – process the stream and stream the results in completion order
- ParallelCollectors.parallelToOrderedStream() – process the stream and stream the results in the original order
- ParallelCollectors.toFuture() – collect a stream of CompletableFutures using a custom Collector
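The difference between streaming results in completion order and in the original order comes down to when each result becomes visible to the consumer. Here is a JDK-only sketch of both orderings; inOriginalOrder, inCompletionOrder and slowIdentity are hypothetical names, the code materializes lists instead of returning lazy streams, and it assumes the mapper never throws or returns null.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;

final class OrderingSketch {

    // Original order: submit everything, then join the futures in submission order.
    static <T, R> List<R> inOriginalOrder(List<T> input, Function<T, R> mapper, ExecutorService executor) {
        List<CompletableFuture<R>> futures = input.stream()
                .map(t -> CompletableFuture.supplyAsync(() -> mapper.apply(t), executor))
                .collect(Collectors.toList());
        return futures.stream().map(CompletableFuture::join).collect(Collectors.toList());
    }

    // Completion order: every future pushes its result into a queue when it finishes,
    // and the consumer takes results as they arrive. Assumes the mapper never throws
    // or returns null (a failed task would leave take() waiting forever).
    static <T, R> List<R> inCompletionOrder(List<T> input, Function<T, R> mapper, ExecutorService executor) {
        BlockingQueue<R> finished = new LinkedBlockingQueue<>();
        input.forEach(t -> CompletableFuture
                .supplyAsync(() -> mapper.apply(t), executor)
                .thenAccept(finished::add));
        return Stream.generate(() -> take(finished))
                .limit(input.size())
                .collect(Collectors.toList());
    }

    private static <R> R take(BlockingQueue<R> queue) {
        try {
            return queue.take();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    // Hypothetical slow task: larger values take longer to finish.
    static Integer slowIdentity(Integer i) {
        try {
            Thread.sleep(i * 100L);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return i;
    }

    static List<Integer> demo(boolean completionOrder) {
        ExecutorService executor = Executors.newFixedThreadPool(3);
        try {
            List<Integer> input = List.of(3, 1, 2);
            return completionOrder
                    ? inCompletionOrder(input, OrderingSketch::slowIdentity, executor)
                    : inOriginalOrder(input, OrderingSketch::slowIdentity, executor);
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(demo(false)); // [3, 1, 2]
        System.out.println(demo(true));  // typically [1, 2, 3]
    }
}
```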
By default, all upstream elements are processed separately. If you want to process them in batches instead, use the ParallelCollectors.Batching namespace, which contains drop-in alternatives (less stress on the executor in exchange for the lack of work-stealing).
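The trade-off behind batching is easier to see in code. Below is a hypothetical JDK-only sketch (partition, mapInBatches and BatchingSketch are illustrative names, not the library's API, and the library may split work differently) that divides a list into at most parallelism contiguous batches and submits one task per batch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;
import java.util.stream.Collectors;

final class BatchingSketch {

    // Split `input` into at most `parallelism` contiguous batches of near-equal size.
    static <T> List<List<T>> partition(List<T> input, int parallelism) {
        int size = input.size();
        int batchCount = Math.min(parallelism, size);
        List<List<T>> batches = new ArrayList<>();
        for (int b = 0; b < batchCount; b++) {
            int from = (int) ((long) size * b / batchCount);
            int to = (int) ((long) size * (b + 1) / batchCount);
            batches.add(input.subList(from, to));
        }
        return batches;
    }

    // One executor task per batch instead of one per element: fewer submissions,
    // but if one batch is slow, idle threads cannot pick up its remaining elements.
    static <T, R> List<R> mapInBatches(
            List<T> input, Function<T, R> mapper, ExecutorService executor, int parallelism) {
        List<CompletableFuture<List<R>>> futures = partition(input, parallelism).stream()
                .map(batch -> CompletableFuture.supplyAsync(
                        () -> batch.stream().map(mapper).collect(Collectors.toList()), executor))
                .collect(Collectors.toList());
        // Joining batches in submission order keeps the original element order.
        return futures.stream()
                .flatMap(future -> future.join().stream())
                .collect(Collectors.toList());
    }

    static List<Integer> doubleAll(List<Integer> input) {
        ExecutorService executor = Executors.newFixedThreadPool(4);
        try {
            return mapInBatches(input, i -> i * 2, executor, 4);
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(partition(List.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10), 4)); // [[1, 2], [3, 4, 5], [6, 7], [8, 9, 10]]
        System.out.println(doubleAll(List.of(1, 2, 3, 4, 5)));                   // [2, 4, 6, 8, 10]
    }
}
```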
A few practical tips:
- Consider providing reasonable timeouts for CompletableFutures so that you don’t block for unreasonably long when something goes wrong
- Name your thread pools – it makes debugging easier
- Limit the size of your thread pool’s work queue
- Always limit the max parallelism unless you know what you are doing
- Remember to shut down no-longer-used ExecutorService instances to allow reclamation of resources
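Several of these tips fit in a few lines of plain JDK code (orTimeout requires Java 9+). In this sketch, the helper names, the pool name, the sizes, and the choice of CallerRunsPolicy for a full queue are all assumptions: CallerRunsPolicy applies backpressure by running overflow tasks on the submitting thread, whereas the default AbortPolicy would reject them instead.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

final class ExecutorSetupSketch {

    // Fixed-size pool with named threads and a bounded work queue. When the queue is
    // full, CallerRunsPolicy runs the task on the submitting thread (backpressure).
    static ThreadPoolExecutor newBoundedPool(String name, int threads, int queueCapacity) {
        AtomicInteger counter = new AtomicInteger();
        ThreadFactory namedThreads = runnable -> new Thread(runnable, name + "-" + counter.incrementAndGet());
        return new ThreadPoolExecutor(
                threads, threads, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(queueCapacity),
                namedThreads,
                new ThreadPoolExecutor.CallerRunsPolicy());
    }

    // Two-phase shutdown: stop accepting work, wait a bit, then interrupt stragglers.
    static void shutdownGracefully(ExecutorService pool) {
        pool.shutdown();
        try {
            if (!pool.awaitTermination(5, TimeUnit.SECONDS)) {
                pool.shutdownNow();
            }
        } catch (InterruptedException e) {
            pool.shutdownNow();
            Thread.currentThread().interrupt();
        }
    }

    // Runs one task with a timeout and reports which thread executed it.
    static String threadNameUsed() {
        ThreadPoolExecutor pool = newBoundedPool("parallel-worker", 4, 100);
        try {
            return CompletableFuture
                    .supplyAsync(() -> Thread.currentThread().getName(), pool)
                    .orTimeout(1, TimeUnit.SECONDS) // complete exceptionally with a TimeoutException instead of hanging
                    .join();
        } finally {
            shutdownGracefully(pool);
        }
    }

    public static void main(String[] args) {
        System.out.println(threadNameUsed()); // parallel-worker-1
    }
}
```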
Callbacks registered via CompletableFuture#then(Apply|Combine|Compose|Run|Accept) might, in some scenarios, be executed by the calling thread. If this is not suitable, use CompletableFuture#then(Apply|Combine|Compose|Run|Accept)Async instead, and provide the desired executor instance.
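The difference is easy to observe with an already-completed future. In OpenJDK, a non-async callback registered on a completed CompletableFuture runs immediately on the registering thread, while the Async variant with an explicit executor hands the callback to that executor. CallbackThreadSketch and the "callback-pool" thread name are made up for this illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class CallbackThreadSketch {

    // thenApply on an already-completed future runs the callback right away,
    // on the thread that registers it (here, the caller).
    static String syncCallbackThread() {
        return CompletableFuture.completedFuture("done")
                .thenApply(ignored -> Thread.currentThread().getName())
                .join();
    }

    // thenApplyAsync with an explicit executor runs the callback on that executor.
    static String asyncCallbackThread() {
        ExecutorService callbacks = Executors.newSingleThreadExecutor(r -> new Thread(r, "callback-pool"));
        try {
            return CompletableFuture.completedFuture("done")
                    .thenApplyAsync(ignored -> Thread.currentThread().getName(), callbacks)
                    .join();
        } finally {
            callbacks.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(syncCallbackThread());  // the calling thread, e.g. "main"
        System.out.println(asyncCallbackThread()); // callback-pool
    }
}
```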
Due to the limitations of the Collector API’s design, upstream is always evaluated eagerly, even if downstream operations are short-circuiting. This means that none of these collectors should be used with infinite streams.
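The eager behavior is a property of collect() itself rather than something specific to parallel-collectors, so it can be demonstrated with a plain sequential JDK stream. The sketch counts how many elements each terminal operation pulls from upstream; the class and method names are hypothetical.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

final class EagerUpstreamSketch {

    // findFirst() is short-circuiting: a sequential stream stops pulling after the first element.
    static int pulledByFindFirst() {
        AtomicInteger pulled = new AtomicInteger();
        IntStream.range(0, 1_000)
                .peek(i -> pulled.incrementAndGet())
                .findFirst();
        return pulled.get();
    }

    // collect(...) has to accumulate every element before it can return anything,
    // so using only the first element afterwards saves no upstream work. With an
    // infinite source such as IntStream.iterate(0, i -> i + 1), it would never return normally.
    static int pulledByCollect() {
        AtomicInteger pulled = new AtomicInteger();
        IntStream.range(0, 1_000)
                .boxed()
                .peek(i -> pulled.incrementAndGet())
                .collect(Collectors.toList())
                .get(0);
        return pulled.get();
    }

    public static void main(String[] args) {
        System.out.println(pulledByFindFirst()); // 1
        System.out.println(pulledByCollect());   // 1000
    }
}
```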
Words of Caution
Even if this tool makes it easy to parallelize things, that doesn’t mean you always should. Parallel processing is cool, but only when applied to the right problems: it comes with a price that can often be higher than not using it at all. Threads are expensive to create, maintain, and switch between, and you can only create a limited number of them.
It’s important to follow up on the root cause and double-check if parallelism is the way to go.
It often turns out that the root cause can be addressed by using a simple JOIN statement, batching, reorganizing your data… or even just by choosing a different API method.