Press "Enter" to skip to content

Parallel Collection Processing: With Parallel Collectors (3/3)

In the previous articles, we’ve seen how to apply parallel processing to our collections using high-level tools like CompletableFuture and Stream API which allows us to bypass problems introduced by Parallel Streams:

Parallel Collection Processing: With Parallel Collectors (3/3)

Parallel Collection Processing: Without Parallel Streams (1/3)

In this article, we’ll dive into parallel-collectors – my library that’s supposed to consolidate ideas from previous articles into a set of public APIs.

Overview

Simply put, parallel-collectors is a zero-dependency toolkit easing parallel collection processing in Java using Stream API and CompletableFuture allowing you to choose your own parallelization strategy, maximum parallelism, executor, and much more.

All of the above and more is provided via custom implementations of the Collector interface:

list.stream()
  .collect(parallel(i -> foo(i), toList(), executor, parallelism))
  .orTimeout(1000, MILLISECONDS)
  .thenAcceptAsync(System.out::println, otherExecutor)
  .thenRun(() -> System.out.println("Finished!"));

In order to be able to start using it, add the following dependency:

<dependency>
    <groupId>com.pivovarit</groupId>
    <artifactId>parallel-collectors</artifactId>
    <version>2.3.0</version>
</dependency>

Double-check the latest version here.

Features

The library leverages the combined power of Stream API and CompletableFuture to provide missing functionality for standard Parallel Streams.

They are:

  • non-invasive – the functionality is provided using custom implementations of public APIs
  • lightweight – if your goal is to apply straightforward parallel processing, it’s the sweet spot between Parallel Streams and heavy-armor technologies like RxJava/Project Reactor
  • configurable – choose your own Executor, parallelism, batching/completion strategies
  • non-blocking – results can be returned as CompletableFutures in order to not block the calling thread
  • short-circuiting – if one of the subtasks throws an exception, remaining subtasks get interrupted and/or skipped

Parallel Collectors are unopinionated by design so it’s up to their users to use them responsibly, which involves things like:

  • proper configuration of a provided Executor and its lifecycle management
  • choosing the appropriate parallelism level
  • making sure that the tool is applied in the right context

Make sure to read API documentation before using these in production.

API Overview

The com.pivovarit.collectors.ParallelCollectors class serves as a single entry point to the whole library mimicking the java.util.stream.Collectors class.

So, a classic straightforward parallel stream usage:

List<Integer> result = list.stream()
  .parallel()
  .map(i -> foo(i))
  .collect(toList());

Ends up being replaced by:

private static ExecutorService e = ...
// ...

CompletableFuture<List<Integer>> result = list.stream()
  .collect(parallel(i -> foo(i), toList(), e, 4));

The library features four types of collectors:

  1. ParallelCollectors.parallel() – process the stream, collect it using a provided Collector, and receive the result as CompletableFuture
  2. ProcessCollectors.parallelToStream() – process the stream and stream the results in the completion order
  3. ParallelCollectors.parallelToOrderedStream() – process the stream and stream the results in the original order
  4. ParallelCollectors.toFuture() – collect a stream of CompletableFutures using a custom Collector

By default, all upstream elements are processed separately, if you want to process them in batches instead, use ParallelCollectors.Batching namespace that contains drop-in alternatives (less stress on the executor in exchange for the lack of work-stealing).

More examples can be found on GitHub in README.

Good Practices

  • Consider providing reasonable timeouts for CompletableFutures in order to not block for unreasonably long in the case when something bad happens (how-to)
  • Name your thread pools – it makes debugging easier (how-to)
  • Limit the size of a working queue of your thread pool (source)
  • Always limit the max parallelism unless you know what you are doing (source)
  • Remember to shut-down no-longer-used ExecutorService instances to allow reclamation of resources
  • CompletableFuture#then(Apply|Combine|Consume|Run|Accept) in some scenarios might be executed by the calling thread. If this is not suitable, use CompletableFuture#then(Apply|Combine|Consume|Run|Accept)Async instead, and provide the desired executor instance.

Limitations

Due to the limitations of the design of the Collector API, upstream is always evaluated eagerly, even if downstream operations are short-circuiting. This means that none of these should be used for working with infinite streams.

Words of Caution

Even if this tool makes it easy to parallelize things, it doesn’t always mean that you should. Parallel processing is cool, but only if applied to the right problems – it comes with a price that can be often higher than not using it at all. Threads are expensive to create, maintain and switch between, and you can only create a limited number of them.

It’s important to follow up on the root cause and double-check if parallelism is the way to go.

It often turns out that the root cause can be addressed by using a simple JOIN statement, batching, reorganizing your data… or even just by choosing a different API method.




If you enjoyed the content, consider supporting the site: