
Parallel Collection Processing: Leveraging Batching (2/3)

In this article, we’ll see how to increase the performance and cap maximum parallelism by introducing batching to our home-made parallel streams.

This article is a part of the series about parallel collection processing in Java without parallel streams:

Parallel Collection Processing: With Parallel Collectors (3/3)

The Problem

At the end of the previous article, we ended up with fully functional home-made async parallel streams:

CompletableFuture<List<Integer>> results = integers.stream()
  .map(i -> CompletableFuture.supplyAsync(() -> process(i), executor))
  .collect(collectingAndThen(toList(), l -> allOfOrException(l)));

This implementation features some interesting characteristics.

Every element from the source stream is represented as a separate entry in your thread pool’s work queue.

The good thing about it is that threads compete for each item separately which results in a primitive form of work-stealing. If some threads finish early, they continue picking up items from the work queue until it’s empty.

At the same time, the bad thing is that this brings a performance penalty (due to thread contention) in situations where work-stealing doesn’t bring significant value.
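To make the "one work-queue entry per element" point concrete, here is a minimal sketch. The counting wrapper and the class name are hypothetical and only serve the illustration; the sketch simply counts how many tasks CompletableFuture.supplyAsync hands over to the executor.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

final class TaskPerElementDemo {

    // Returns how many tasks reach the executor when every element gets its own future
    static int submittedTasks(int elements) {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        AtomicInteger submitted = new AtomicInteger();
        // Hypothetical counting wrapper: records each task before handing it to the pool
        Executor counting = task -> {
            submitted.incrementAndGet();
            pool.execute(task);
        };
        try {
            List<CompletableFuture<Integer>> futures = IntStream.range(0, elements)
              .boxed()
              .map(i -> CompletableFuture.supplyAsync(() -> i, counting))
              .collect(Collectors.toList());
            futures.forEach(CompletableFuture::join);
            return submitted.get();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(submittedTasks(1000)); // prints 1000
    }
}
```

Every one of those 1000 tasks has to be enqueued and dequeued individually, which is where the contention comes from.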

Is this something to be concerned about? Let’s see the actual numbers.

Benchmarks

In order to measure the performance overhead, a collection with 1000 elements was processed using an identity function with 1, 10, 100, and 1000 threads respectively. Keeping the processing time minimal allows us to focus on the infrastructure overhead instead:

private static final List<Integer> source = IntStream
  .range(0, 1000)
  .boxed()
  .collect(toList());

@Benchmark
public List<Integer> no_batching(BenchmarkState state) {
    return ParallelStreams.inParallel(
      source, i -> i, state.executor).join();
}

Before we have a look at the results, try to run a thought experiment and then compare your intuition with actual numbers.

How much faster will it be to process the collection using 1000 threads instead of one?

Results

Threads    Mode  Cnt  Score      Error  Units
1          avgt  5    0.226 ±    0.002  ms/op
10         avgt  5    0.278 ±    0.005  ms/op
100        avgt  5    0.317 ±    0.005  ms/op
1000       avgt  5    0.905 ±    0.097  ms/op

Intel i7-4980HQ (8) @ 2.80GHz

Surprised? As the number of threads increases, synchronization costs become more and more significant, which makes the case with 1000 threads roughly four times slower than single-threaded processing (0.905 ms/op vs. 0.226 ms/op)!

What if we could decrease the overhead (thread contention) by letting individual threads compete for batches instead of individual items?

Introducing Batching

Luckily, introducing batching is less complicated than it sounds.

It’s a good idea not to burden consumers of your API with implementation details, so the API remains mostly unchanged. However, we need to introduce one extra parameter to control the number of batches to create (which we would need to introduce anyway later on):

static <T, R> CompletableFuture<List<R>> inParallelBatching(
  List<T> source, Function<T, R> mapper, Executor executor, int batches) {
    return ...
}

In order to introduce batching, we need to solve three distinct problems:

  1. How to partition a source collection into a collection of batches
  2. How to write an adapter for mapping functions so that it processes a collection of elements
  3. How to combine partial results into a single collection

In order to partition a list of elements, we could reuse the splitting mechanism provided by existing Spliterators. In our case, however, we know that the data structure we need to partition is an ArrayList, so we can write a dedicated partitioning Spliterator that splits a list into N as-equal-as-possible parts:

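import java.util.Collections;
import java.util.List;
import java.util.Spliterator;
import java.util.function.Consumer;
import java.util.stream.Stream;

import static java.util.stream.Stream.empty;
import static java.util.stream.Stream.of;
import static java.util.stream.StreamSupport.stream;

final class BatchingStream<T> implements Spliterator<List<T>> {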

    private final List<T> source;
    private final int maxChunks;

    private int chunks;
    private int chunkSize;
    private int remaining;
    private int consumed;

    private BatchingStream(List<T> list, int numberOfParts) {
        source = list;
        chunks = numberOfParts;
        maxChunks = numberOfParts;
        chunkSize = (int) Math.ceil(((double) source.size()) / numberOfParts);
        remaining = source.size();
    }

    static <T> Stream<List<T>> partitioned(List<T> list, int numberOfParts) {
        int size = list.size();

        if (size == numberOfParts) {
            return asSingletonListStream(list);
        } else if (size == 0 || numberOfParts == 0) {
            return empty();
        } else if (numberOfParts == 1) {
            return of(list);
        } else {
            return stream(new BatchingStream<>(list, numberOfParts), false);
        }
    }

    private static <T> Stream<List<T>> asSingletonListStream(List<T> list) {
        Stream.Builder<List<T>> acc = Stream.builder();
        for (T t : list) {
            acc.add(Collections.singletonList(t));
        }
        return acc.build();
    }

    @Override
    public boolean tryAdvance(Consumer<? super List<T>> action) {
        if (consumed < source.size() && chunks != 0) {
            List<T> batch = source.subList(consumed, consumed + chunkSize);
            consumed = consumed + chunkSize;
            remaining = remaining - chunkSize;
            chunkSize = (int) Math.ceil(((double) remaining) / --chunks);
            action.accept(batch);
            return true;
        } else {
            return false;
        }
    }

    @Override
    public Spliterator<List<T>> trySplit() {
        return null;
    }

    @Override
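    public long estimateSize() {
        // SIZED promises an exact count: a list shorter than maxChunks yields only source.size() batches
        return Math.min(maxChunks, source.size());
    }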

    @Override
    public int characteristics() {
        return ORDERED | SIZED;
    }
}

Implementation considerations of the above will be covered in a separate article.
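To see the chunk-size arithmetic in isolation, here is a condensed, eager sketch of the same idea. It is not the BatchingStream class itself: the class and method names are made up for this example, and it uses integer ceiling division instead of Math.ceil.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

final class PartitionSketch {

    // Eagerly splits a list into at most `parts` sublists whose sizes differ by at most one
    static <T> List<List<T>> partition(List<T> list, int parts) {
        List<List<T>> batches = new ArrayList<>();
        int consumed = 0;
        int remaining = list.size();
        for (int chunksLeft = parts; chunksLeft > 0 && remaining > 0; chunksLeft--) {
            // Integer ceiling of remaining / chunksLeft, recomputed after every batch
            int chunkSize = (remaining + chunksLeft - 1) / chunksLeft;
            batches.add(list.subList(consumed, consumed + chunkSize));
            consumed += chunkSize;
            remaining -= chunkSize;
        }
        return batches;
    }

    public static void main(String[] args) {
        List<Integer> source = IntStream.range(0, 10).boxed().collect(Collectors.toList());
        System.out.println(partition(source, 3)); // [[0, 1, 2, 3], [4, 5, 6], [7, 8, 9]]
    }
}
```

Recomputing the chunk size from what is left, rather than fixing it up front, is what keeps the parts as equal as possible: ten elements in three parts become 4, 3 and 3 instead of 4, 4 and 2.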

In order to adapt a mapping function, we need to prepare an adapter that converts Function<T, R> to Function<List<T>, List<R>> which is fairly straightforward:

private static <T, R> Function<List<T>, List<R>> batching(
  Function<T, R> mapper) {
    return batch -> {
        List<R> list = new ArrayList<>(batch.size());
        for (T t : batch) {
            list.add(mapper.apply(t));
        }
        return list;
    };
}
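A quick usage sketch of the adapter: the wrapper class and the sample mapper are made up for the example, and the adapter is repeated only so that the snippet compiles on its own.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

final class BatchingAdapterDemo {

    // Same adapter as above, repeated so the example is self-contained
    static <T, R> Function<List<T>, List<R>> batching(Function<T, R> mapper) {
        return batch -> {
            List<R> list = new ArrayList<>(batch.size());
            for (T t : batch) {
                list.add(mapper.apply(t));
            }
            return list;
        };
    }

    public static void main(String[] args) {
        // A per-element mapper becomes a per-batch mapper
        Function<List<Integer>, List<String>> adapted = batching(i -> "#" + i);
        System.out.println(adapted.apply(Arrays.asList(1, 2, 3))); // [#1, #2, #3]
    }
}
```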

If we combine the above, we get something like:

static <T, R> CompletableFuture<List<R>> inParallelBatching(
  List<T> source, Function<T, R> mapper, Executor executor, int batches) {

    return BatchingStream.partitioned(source, batches)
      .map(batch -> supplyAsync(
        () -> batching(mapper).apply(batch), executor))
      .collect(collectingAndThen(toList(), l -> allOfOrException(l)))
      ...
}

And the last thing left to do is to flatten the result (CompletableFuture<List<List<R>>>) into a single list, which can be achieved using Stream#flatMap or a simple loop. In this case, I chose a loop in order to avoid the extra overhead of the Stream API.

So the final result is:

static <T, R> CompletableFuture<List<R>> inParallelBatching(
  List<T> source, Function<T, R> mapper, Executor executor, int batches) {

    return BatchingStream.partitioned(source, batches)
      .map(batch -> supplyAsync(
        () -> batching(mapper).apply(batch), executor))
      .collect(collectingAndThen(toList(), l -> allOfOrException(l)))
      .thenApply(list -> {
          List<R> result = new ArrayList<>(source.size());
          for (List<R> rs : list) {
              result.addAll(rs);
          }
          return result;
      });
}
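If you want to run the idea end to end without the rest of the code base, the following self-contained sketch may help. It rests on two assumptions that are not part of the code above: a simple eager split stands in for BatchingStream.partitioned, and plain CompletableFuture.allOf stands in for allOfOrException, so this version does not fail fast when one of the batches throws.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

final class BatchingEndToEnd {

    static <T, R> CompletableFuture<List<R>> inParallelBatching(
      List<T> source, Function<T, R> mapper, Executor executor, int batches) {
        List<CompletableFuture<List<R>>> futures = new ArrayList<>();
        int consumed = 0;
        int remaining = source.size();
        // Assumption: this eager split stands in for BatchingStream.partitioned
        for (int left = batches; left > 0 && remaining > 0; left--) {
            int size = (remaining + left - 1) / left; // ceiling division
            List<T> batch = source.subList(consumed, consumed + size);
            // One task per batch instead of one task per element
            futures.add(CompletableFuture.supplyAsync(() -> {
                List<R> mapped = new ArrayList<>(batch.size());
                for (T t : batch) {
                    mapped.add(mapper.apply(t));
                }
                return mapped;
            }, executor));
            consumed += size;
            remaining -= size;
        }
        // Assumption: plain allOf stands in for allOfOrException (no fail-fast here)
        return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
          .thenApply(ignored -> {
              List<R> result = new ArrayList<>(source.size());
              for (CompletableFuture<List<R>> future : futures) {
                  result.addAll(future.join()); // already completed, so join() does not block
              }
              return result;
          });
    }

    public static void main(String[] args) {
        ExecutorService executor = Executors.newFixedThreadPool(4);
        try {
            List<Integer> source = IntStream.range(0, 10).boxed().collect(Collectors.toList());
            System.out.println(inParallelBatching(source, i -> i * 2, executor, 3).join());
            // [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
        } finally {
            executor.shutdown();
        }
    }
}
```

Because the batches are flattened in submission order, the result preserves the encounter order of the source list regardless of which batch finishes first.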

Benchmarks

Finally, we can compare results with our starting point! Keep in mind that in our benchmarks the number of batches is equal to the number of threads.

Again, before we have a look at the results, try to run a thought experiment and then compare your intuition with actual numbers.

Benchmark      Threads Mode Cnt Score   Error Units
----
no_batching    1       avgt 5   0.226 ± 0.002 ms/op
no_batching    10      avgt 5   0.278 ± 0.005 ms/op
no_batching    100     avgt 5   0.317 ± 0.005 ms/op
no_batching    1000    avgt 5   0.905 ± 0.097 ms/op
with_batching  1       avgt 5   0.015 ± 0.001 ms/op
with_batching  10      avgt 5   0.020 ± 0.001 ms/op
with_batching  100     avgt 5   0.071 ± 0.028 ms/op
with_batching  1000    avgt 5   0.985 ± 0.054 ms/op

Intel i7-4980HQ (8) @ 2.80GHz

The result is really impressive. As you can see, introducing batching decreased the overhead in all cases but one.

In the best-case scenario (1 thread), the overhead was ~15 times lower (0.226 vs. 0.015 ms/op). In the worst-case scenario (1000 threads), batching produced slightly worse results than no batching at all. That is to be expected: 1000 threads processing 1000 elements results in 1000 single-element batches, so we pay for the batching machinery without reducing the number of tasks.

What a great success, isn’t it?

Let me recall the warning you get each time you run JMH:

REMEMBER: The numbers below are just data.

To gain reusable insights, you need to follow up on why the numbers are the way they are. Use profilers (see -prof, -lprof), design factorial
experiments, perform baseline and negative tests that provide experimental control, make sure
the benchmarking environment is safe on JVM/OS/HW level, ask for reviews from the domain experts.

Do not assume the numbers tell you what you want them to tell.

The benchmarks we created are telling the truth, but not necessarily the one that we care about (remember the Answer to the Ultimate Question of Life, The Universe, and Everything?).

The relative performance improvement is impressive, but what we really care about is if this improvement is significant enough for our use cases, and our benchmark was designed to show the relative difference between overhead, which doesn’t answer our question!

In our case, we care more about absolute values and not relative ones, so let’s have a look at the speedup we gained in each case:

Benchmark      Threads Diff    Units
----
with_batching  1       + ~211  µs/op
with_batching  10      + ~258  µs/op
with_batching  100     + ~246  µs/op
with_batching  1000    - ~80   µs/op

Intel i7-4980HQ (8) @ 2.80GHz

As you can see, in the best-case scenario (10 threads), we saved around 258 µs per operation.
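The absolute differences can be recomputed directly from the ms/op scores in the benchmark table. The small helper below is a hypothetical sketch (class and method names are made up) that converts each difference to microseconds and also prints the relative factor.

```java
import java.util.Locale;

final class BatchingSavings {

    // Absolute difference between two ms/op scores, expressed in whole microseconds
    static long savedMicros(double noBatchingMs, double withBatchingMs) {
        return Math.round((noBatchingMs - withBatchingMs) * 1000);
    }

    // How many times lower the batched score is compared to the non-batched one
    static double ratio(double noBatchingMs, double withBatchingMs) {
        return noBatchingMs / withBatchingMs;
    }

    public static void main(String[] args) {
        int[] threads = {1, 10, 100, 1000};
        // Scores copied from the benchmark table, in ms/op
        double[] noBatching = {0.226, 0.278, 0.317, 0.905};
        double[] withBatching = {0.015, 0.020, 0.071, 0.985};
        for (int i = 0; i < threads.length; i++) {
            System.out.printf(Locale.ROOT, "%4d threads: %+d us/op saved, factor %.1f%n",
              threads[i],
              savedMicros(noBatching[i], withBatching[i]),
              ratio(noBatching[i], withBatching[i]));
        }
    }
}
```

A negative value in the last row means batching made things slower for that configuration.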

Depending on your use case, this difference may or may not be significant. But introducing batching brings other advantages besides offering higher throughput.

Other Considerations

Besides the throughput increase, batching allows you to control the maximum parallelism level.

If you decide to apply the processing in N batches, it also implies that not more than N items will be processed in parallel (assuming that the parallelized action doesn’t orchestrate more parallel processing itself!).
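This cap can be observed with a small experiment. The sketch below is illustrative only: the names are made up, the split is a simple fixed-size one rather than BatchingStream, and the "work" is a short simulated pause. It tracks how many items are in flight at the same time when a few batches run on a much larger pool.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.LockSupport;

final class MaxParallelismDemo {

    // Processes `elements` items in at most `batches` batches (batches > 0) on a pool of
    // `poolSize` threads and returns the highest number of items seen in flight at once
    static int observedMaxParallelism(int elements, int batches, int poolSize) {
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        AtomicInteger inFlight = new AtomicInteger();
        AtomicInteger max = new AtomicInteger();
        try {
            int chunk = (elements + batches - 1) / batches; // ceiling division
            List<CompletableFuture<Void>> futures = new ArrayList<>();
            for (int from = 0; from < elements; from += chunk) {
                int itemsInBatch = Math.min(chunk, elements - from);
                futures.add(CompletableFuture.runAsync(() -> {
                    // Items of one batch are processed sequentially by a single thread
                    for (int i = 0; i < itemsInBatch; i++) {
                        max.accumulateAndGet(inFlight.incrementAndGet(), Math::max);
                        LockSupport.parkNanos(100_000); // simulated work
                        inFlight.decrementAndGet();
                    }
                }, pool));
            }
            futures.forEach(CompletableFuture::join);
            return max.get();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        // 16 threads are available, but the result never exceeds the 3 batches
        System.out.println(observedMaxParallelism(100, 3, 16));
    }
}
```

The exact number printed depends on scheduling, but it cannot exceed the number of batches, no matter how many threads the pool offers.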

Naturally, you can implement parallelism limits for the non-batching example, but it becomes non-trivial if you want to keep it asynchronous, and requires extra resources (most likely an extra dispatcher thread guarded by a semaphore).

Besides the parallelism level control, batching can give you more predictable latencies when reusing a heavily-shared thread pool.

If no batching is involved, each item becomes a separate entry in the thread pool’s work queue. On a shared pool, this might result in situations where 99% of the work is already done, but the remaining items are stuck in the queue behind other tasks that need to be processed first!

With batching, once a batch gets picked up, all the items inside are processed in one go by a single thread.

Another thing to consider is that properly tuned thread pools usually feature a fixed-size work queue, and that its size (among other parameters) needs to be tuned in relation to the expected arrival rate of new tasks.

With batching, a single call generates at most N tasks, where N is the number of batches rather than the number of items in the source.
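The interaction with a bounded work queue can be sketched as follows. The pool configuration (2 threads, a queue of 10, the default AbortPolicy) and all names are assumptions chosen for the illustration; the tasks block on a latch so that the queue fills up deterministically.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

final class BoundedQueueDemo {

    // Submits up to `tasks` blocking tasks to a pool with 2 threads and a queue of 10,
    // and returns how many were accepted before the first rejection
    static int acceptedTasks(int tasks) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
          2, 2, 0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(10));
        CountDownLatch release = new CountDownLatch(1);
        int accepted = 0;
        try {
            for (int i = 0; i < tasks; i++) {
                pool.execute(() -> awaitQuietly(release));
                accepted++;
            }
        } catch (RejectedExecutionException e) {
            // Default AbortPolicy: both threads are busy and the queue is full
        } finally {
            release.countDown(); // let the blocked tasks finish
            pool.shutdown();
        }
        return accepted;
    }

    private static void awaitQuietly(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        System.out.println(acceptedTasks(1000)); // 12: 2 running + 10 queued, the 13th is rejected
        System.out.println(acceptedTasks(4));    // 4: four batches fit without rejection
    }
}
```

With one task per element, the number of submitted tasks grows with the input size and can overflow such a queue; with batching, it is bounded by a number you choose up front.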

Conclusion

In the next article, we’ll see how to achieve the same by using parallel-collectors.

Code samples can be found on GitHub.



