Press "Enter" to skip to content

Implementing Efficient Last Stream Elements Gatherer in Java

Gatherers is the missing piece of the Stream API. So far, if we were missing a terminal method, we could always implement one ourselves by leveraging the Collectors API. This made parallel-collectors happen, which extend the capabilities of parallel streams.

However, what if someone wanted to implement a custom intermediate operation? While most of those you could shoehorn into the Collectors API, it has one major limitation – Collectors evaluate the whole Stream without any short-circuiting. This is precisely what Gatherers are for.

More about them in the upcoming article, but today we’ll go through a case study of implementing an efficient Gatherer that takes N last Stream elements (think limit() but from the other side).

This example is not about short-circuiting; it’s about implementing an intermediate operation efficiently without materialising the whole stream first.

Implementing Gatherers

Implementing Gatherers is similar to implementing Collectors, but with one major difference – the Integrator:

public interface Gatherer<T, A, R> {
    Supplier<A> initializer();
    Integrator<A, T, R> integrator();
    BinaryOperator<A> combiner();
    BiConsumer<A, Downstream<? super R>> finisher();
}

Integrator is what allows signalling up that a Stream should stop pushing new elements by returning a boolean indicator:

boolean integrate(A state, T element, Downstream<? super R> downstream);

However, in our case, we’ll not be leveraging this.

Our goal is to implement a gatherer that returns the last N elements of a given Stream:

Stream.of(1,2,3,4)
  .gather(last(2))
  .forEach(...); 

// 3, 4

Now, we’ll go through a few iterations: start simple, and then see what we can do to make it perform better.

Step 1: Make it Right

Probably the easiest way to end up with something that produces correct results is to use a List to accumulate the last N elements and update it whenever new elements arrive.

Firstly, we need to initialize our accumulator:

@Override
public Supplier<ArrayList<T>> initializer() {
    return ArrayList::new;
}

Then, we need to implement the Integrator with a core logic:

@Override
public Integrator<ArrayList<T>, T, T> integrator() {
    return Integrator.ofGreedy((state, elem, ignored) -> {
        if (state.size() >= n) {
            state.removeFirst();
        }
        state.add(elem);
        return true;
    });
}

And finally, we need to push those elements downstream in the finalizer:

@Override
public BiConsumer<ArrayList<T>, Downstream<? super T>> finisher() {
    return (state, downstream) -> {
        for (T e : state) {
            if (!downstream.push(e)) {
                break;
            }
        }
    };
}

And now together:

public record LastGathererTake1<T>(int n) implements Gatherer<T, ArrayList<T>, T> {

    @Override
    public Supplier<ArrayList<T>> initializer() {
        return ArrayList::new;
    }

    @Override
    public Integrator<ArrayList<T>, T, T> integrator() {
        return Integrator.ofGreedy((state, elem, ignored) -> {
            if (state.size() >= n) {
                state.removeFirst();
            }
            state.add(elem);
            return true;
        });
    }

    @Override
    public BiConsumer<ArrayList<T>, Downstream<? super T>> finisher() {
        return (state, downstream) -> {
            for (T e : state) {
                if (!downstream.push(e)) {
                    break;
                }
            }
        };
    }
}

Profiling and Benchmarking

Let’s try to benchmark and profile our creation. We’ll use JMH and async-profiler (conveniently integrated into JMH nowadays).

Here’s our benchmark (the full setup can be found on GitHub):

@Benchmark
public void take_1(Blackhole bh) {
    Stream.of(data)
      .gather(new LastGathererTake1<>(n))
      .forEach(bh::consume);
}

And here’s our result:

Benchmark             (n)   (size)  Mode Cnt Score   Error   Units
LastBenchmark.take_1 1000 10000000 thrpt   3 1,338 ± 0,239   ops/s

Is this good or bad? Probably good enough for most use cases, but… if you look closely into the flamegraph, you will see that the ArrayList#removeFirst() method takes significant portion of processing time!

In newer JDKs removeFirst() exists via sequenced collections, but for ArrayList it’s effectively remove(0) and still O(n).

Let’s remove the bottleneck!

Step 2: Make it Faster

From the above data, it’s clear that removal takes much more time than addition, so let’s address it. The easiest way is to simply buffer all results in a List, and then simply iterate through desired elements:

public record LastGathererTake2<T>(int n) implements Gatherer<T, ArrayList<T>, T> {

    @Override
    public Supplier<ArrayList<T>> initializer() {
        return ArrayList::new;
    }

    @Override
    public Integrator<ArrayList<T>, T, T> integrator() {
        return Gatherer.Integrator.ofGreedy((state, elem, ignored) -> {
              state.add(elem);
              return true;
          }
        );
    }

    @Override
    public BiConsumer<ArrayList<T>, Downstream<? super T>> finisher() {
        return (state, downstream) -> {
            int start = Math.max(0, state.size() - n);
            for (int i = start; i < state.size(); i++) {
                if (!downstream.push(state.get(i))) {
                    break;
                }
            }
        };
    }
}

Let’s run our benchmarks again!

Benchmark             (n)  (size)   Mode Cnt Score   Error Units
LastBenchmark.take_1 1000 10000000 thrpt  3  1,348 ± 0,201 ops/s
LastBenchmark.take_2 1000 10000000 thrpt  3 19,052 ± 7,252 ops/s

Wow, the new implementation did ~15 times better! It’s something!

Flamegraph has also drastically changed, now it’s the ArrayList.add method that dominates, and we can see that this is mostly due to internal ArrayList.grow method that internally allocates a larger array (roughly 1.5× the previous size) and copies all existing elements into it (arrays can’t be resized).

Similar problem was the core issue of the previous bottleneck – removal of the first item from ArrayList involves copying the whole array content and shifting it by one! (ironically, the internal ArrayList method is called fastRemove()).

We’re an order of magnitude faster, but still inefficient. The original idea wasn’t that bad – we just used wrong data structure for it!

Step 3: Use the Right Data Structure

Maintaining a fixed-size buffer was not a bad idea, but it required a dedicated data structure which allowed efficient insertion/removal on both ends. ArrayDeque (double-ended queue) is one of such data structures.

If we take the code from step 1, and swap ArrayList with ArrayDeque, we get something like this:

public record LastGathererTake3<T>(int n) implements Gatherer<T, ArrayDeque<T>, T> {

    @Override
    public Supplier<ArrayDeque<T>> initializer() {
        return ArrayDeque::new;
    }

    @Override
    public Integrator<ArrayDeque<T>, T, T> integrator() {
        return Integrator.ofGreedy((state, element, ignored) -> {
            if (state.size() == n) {
                state.removeFirst();
            }
            state.addLast(element);
            return true;
        });
    }

    @Override
    public BiConsumer<ArrayDeque<T>, Downstream<? super T>> finisher() {
        return (state, ds) -> {
            for (Iterator<T> it = state.iterator(); it.hasNext() && !ds.isRejecting(); ) {
                if (!ds.push(it.next())) {
                    break;
                }
            }
        };
    }
}

And here are benchmark results:

Benchmark             (n)   (size) Mode Cnt Score    Error  Units
LastBenchmark.take_1 1000 10000000 thrpt 3  1,340  ± 0,032  ops/s
LastBenchmark.take_2 1000 10000000 thrpt 3  17,685 ± 33,776 ops/s
LastBenchmark.take_3 1000 10000000 thrpt 3  35,751 ± 3,629  ops/s

Whoa! We’re doing twice better than the previous attempt!

Could we find even more bottlenecks? 

At first sight, there are no obvious candidates, but… what if we could get rid of that removal altogether?

Step 4: Roll out Specialised Data Structure

What if we could avoiding paying the price of removals by introducing overwriting semantics?

The standard library doesn’t have a data structure like this, so we’ll need to roll out our own!

One of data structures with such properties is circular buffer – it’s a fixed-size data structure which overwrites oldest entries on overflow, and it’s quite easy to implement:

static class AppendOnlyCircularBuffer<T> {
    private final T[] buffer;
    private int endIdx = 0;
    private int size = 0;

    public AppendOnlyCircularBuffer(int size) {
        this.buffer = (T[]) new Object[size];
    }

    public void add(T element) {
        buffer[endIdx++ % buffer.length] = element;
        if (size < buffer.length) {
            size++;
        }
    }

    public void forEach(Consumer<T> consumer) {
        int startIdx = (endIdx - size + buffer.length) % buffer.length;
        for (int i = 0; i < size; i++) {
            consumer.accept(buffer[(startIdx + i) % buffer.length]);
        }
    }
}

In this implementation an array serves as storage and an additional index is used to keep track of the position of the last element. We also need an extra int for keeping track of the actual buffer size.

Let’s see it in action!

Benchmark             (n)  (size)  Mode  Cnt Score  Error    Units
LastBenchmark.take_1 1000 10000000 thrpt 3   1,360  ± 0,216  ops/s
LastBenchmark.take_2 1000 10000000 thrpt 3   19,060 ± 28,949 ops/s
LastBenchmark.take_3 1000 10000000 thrpt 3   38,050 ± 8,296  ops/s
LastBenchmark.take_4 1000 10000000 thrpt 3   84,340 ± 15,607 ops/s

Step 5: Make it Fast

We’ve now improved over 60 times! But could we do even better?

As you can see, AppendOnlyCircularBuffer.add method is the main bottleneck now (not counting Stream internals) and this is going to get tough now:

public void add(T element) {
    buffer[endIdx++ % buffer.length] = element;
    if (size < buffer.length) {
        size++;
    }
}

This is pretty lean already, but let’s go deeper and look up the assembly code produced by JIT:

And the most relevant part:

0x0000000112afe244: sdiv w8, w3, w4
0x0000000112afe248: msub w3, w8, w4, w3 ;*irem {reexecute=0 rethrow=0 return_oop=0}

Yes, it’s the modulo operation, which is implemented by a combination of two operations: sdiv and msub. According to Apple Silicon CPU Optimization Guide: 4.0, SDIV can take anywhere from 7 to 21 cycles and MSUB 1-2 cycles.

This is relatively expensive. Could we avoid paying this extra price?

One of the classic tricks involves replacing modulo operation with bit masking, but there’s a catch – it works only when modulus is a power of two. We can’t guarantee our modulus to be a power of two… or can we?

Nothing is stopping us from aligning our buffer’s size to the next available power of two. We’d just need to be careful to read last N elements properly.

First, we’d need to have an efficient method of finding it:

private static int nextPowerOfTwo(int x) {
    int highest = Integer.highestOneBit(x);
    return (x == highest) ? x : (highest << 1);
}

Now, we’d need to use it to initialize buffer and a mask:

int capacity = nextPowerOfTwo(Math.max(1, this.limit));
this.buffer = new Object[capacity];
this.mask = capacity - 1;

Our add() method becomes:

void add(T e) {
    buffer[writeIdx & mask] = e;
    writeIdx++;
    if (size < limit) {
        size++;
    }
}

And we need a helper method for fetching an element at a given index:

T get(int index, int start) {
    return (T) buffer[(start + index) & mask];
}
void pushAll(Gatherer.Downstream<? super T> ds) {
    int start = (writeIdx - size) & mask;

    for (int i = 0; i < size && !ds.isRejecting(); i++) {
        if (!ds.push(get(i, start))) {
            break;
        }
    }
}

We’re no longer relying on an expensive modulo operation, if we look up assembly produced by JIT, we can see that expensive instructions are gone, and we have a cheap AND(1-2 cycles) in their place:

Here’s the whole thing:

final class AppendOnlyCircularBuffer<T> {
    private final Object[] buffer;
    private final int mask;
    private final int limit;

    private int size;
    private int writeIdx;

    AppendOnlyCircularBuffer(int limit) {
        this.limit = Math.max(0, limit);
        int capacity = nextPowerOfTwo(Math.max(1, this.limit));
        this.buffer = new Object[capacity];
        this.mask = capacity - 1;
    }

    void add(T e) {
        buffer[writeIdx & mask] = e;
        writeIdx++;
        if (size < limit) {
            size++;
        }
    }

    T get(int index, int start) {
        return (T) buffer[(start + index) & mask];
    }

    void pushAll(Gatherer.Downstream<? super T> ds) {
        int start = (writeIdx - size) & mask;

        for (int i = 0; i < size && !ds.isRejecting(); i++) {
            if (!ds.push(get(i, start))) {
                break;
            }
        }
    }

    private static int nextPowerOfTwo(int x) {
        int highest = Integer.highestOneBit(x);
        return (x == highest) ? x : (highest << 1);
    }
}

Let’s finally run it and see the benchmarks!

Benchmark             (n)   (size)  Mode Cnt Score   Error  Units
LastBenchmark.take_1 1000 10000000 thrpt  3  1,352 ± 0,232  ops/s
LastBenchmark.take_2 1000 10000000 thrpt  3 17,885 ± 14,921 ops/s
LastBenchmark.take_3 1000 10000000 thrpt  3 38,229 ± 1,599  ops/s
LastBenchmark.take_4 1000 10000000 thrpt  3 89,591 ± 18,486 ops/s
LastBenchmark.take_5 1000 10000000 thrpt  3 100,454 ± 1,073 ops/s

This is where I genuinely run out of good ideas. I had a couple of them (like avoiding masking on each access), but benchmarks proved those were not as good as I had thought.

This is one of the most important takeaways here – performance optimisation cycle involves measuring, finding a bottleneck, forming a hypothesis and then evaluating it.

Step 6: Embracing Edge Cases

However, there’s still an important corner case to have a look at… a single last element lookup!

If all we care about is just the last element, then we can ditch the circular buffer and use a simple object wrapper:

record SingleElementLastGatherer<T>() implements Gatherer<T, SingleElementLastGatherer.ValueHolder<T>, T> {

    @Override
    public Supplier<ValueHolder<T>> initializer() {
        return ValueHolder::new;
    }

    @Override
    public Integrator<ValueHolder<T>, T, T> integrator() {
        return Integrator.ofGreedy((state, element, _) -> {
            state.value = element;
            state.isSet = true;
            return true;
        });
    }

    @Override
    public BiConsumer<ValueHolder<T>, Downstream<? super T>> finisher() {
        return (state, downstream) -> {
            if (state.isSet && !downstream.isRejecting()) {
                downstream.push(state.value);
            }
        };
    }

    static class ValueHolder<T> {
        private T value;
        private boolean isSet;
    }
}

Benchmark results are now brutal:

Benchmark      (n) (size)  Mode Cnt Score        Error        Units
circular_buffer 1 10000000 thrpt 3  100,963      ± 1,941      ops/s
value_holder    1 10000000 thrpt 3  46678566,613 ± 340533,213 ops/s

So our ultimate solution is going to choose a strategy depending on the number of requested elements:

static <T> Gatherer<T, ?, T> last(int n) {
    return switch (n) {
        case 1 -> new SingleElementLastGatherer<>();
        default -> new CircularBufferLastGatherer<>(n);
    };
}

And this is precisely what I’m doing in more-gatherers – library filling in the Stream API gaps.

Now, let’s benchmark this against a similar utility from gatherers4j:

Benchmark         (n)    (size)   Mode    Cnt   Score      Error         Units
gatherers4j        1      1000    thrpt    4    344953,593 ±  11157,945  ops/s
more_gatherers     1      1000    thrpt    4  46403136,688 ± 919588,746  ops/s

gatherers4j        1     10000    thrpt    4     35013,972 ±    386,535  ops/s
more_gatherers     1     10000    thrpt    4  45112509,001 ± 259866,835  ops/s

gatherers4j        1    100000    thrpt    4      3480,072 ±    295,102  ops/s
more_gatherers     1    100000    thrpt    4  52938383,744 ± 843270,687  ops/s

gatherers4j        1   1000000    thrpt    4       349,321 ±     12,975  ops/s
more_gatherers     1   1000000    thrpt    4  46070928,929 ± 328021,478  ops/s

gatherers4j        1  10000000    thrpt    4        34,992 ±      1,383  ops/s
more_gatherers     1  10000000    thrpt    4  45576656,184 ± 603781,325  ops/s

Benchmark         (n)    (size)   Mode    Cnt   Score      Error         Units
gatherers4j       10      1000    thrpt    4    335775,631 ±  19864,798  ops/s
more_gatherers    10      1000    thrpt    4    638157,379 ±  89987,065  ops/s

gatherers4j       10     10000    thrpt    4     34660,535 ±   3070,990  ops/s
more_gatherers    10     10000    thrpt    4     89377,421 ±    745,692  ops/s

gatherers4j       10    100000    thrpt    4      3514,937 ±     97,364  ops/s
more_gatherers    10    100000    thrpt    4      9980,622 ±     87,814  ops/s

gatherers4j       10   1000000    thrpt    4       346,090 ±      9,348  ops/s
more_gatherers    10   1000000    thrpt    4       630,170 ±     81,201  ops/s

gatherers4j       10  10000000    thrpt    4        33,716 ±      8,170  ops/s
more_gatherers    10  10000000    thrpt    4        99,399 ±      3,150  ops/s

Benchmark         (n)    (size)   Mode    Cnt   Score      Error         Units
gatherers4j      100      1000    thrpt    4    338606,915 ±  30009,425  ops/s
more_gatherers   100      1000    thrpt    4    920026,588 ±   5509,723  ops/s

gatherers4j      100     10000    thrpt    4     34730,669 ±   1584,566  ops/s
more_gatherers   100     10000    thrpt    4     88842,806 ±   1361,405  ops/s

gatherers4j      100    100000    thrpt    4      3370,974 ±    717,176  ops/s
more_gatherers   100    100000    thrpt    4      9979,804 ±    135,956  ops/s

gatherers4j      100   1000000    thrpt    4       308,239 ±      4,847  ops/s
more_gatherers   100   1000000    thrpt    4       989,730 ±      7,982  ops/s

gatherers4j      100  10000000    thrpt    4        32,684 ±      0,397  ops/s
more_gatherers   100  10000000    thrpt    4        98,816 ±      1,606  ops/s

Benchmark         (n)    (size)   Mode    Cnt   Score      Error         Units
gatherers4j     1000      1000    thrpt    4    270321,525 ±   2961,567  ops/s
more_gatherers  1000      1000    thrpt    4    606415,730 ±   2780,339  ops/s

gatherers4j     1000     10000    thrpt    4     31727,683 ±    122,620  ops/s
more_gatherers  1000     10000    thrpt    4    101115,273 ±    370,316  ops/s

gatherers4j     1000    100000    thrpt    4      3241,063 ±      9,743  ops/s
more_gatherers  1000    100000    thrpt    4      8857,406 ±     52,233  ops/s

gatherers4j     1000   1000000    thrpt    4       323,806 ±      3,993  ops/s
more_gatherers  1000   1000000    thrpt    4       998,388 ±     13,033  ops/s

gatherers4j     1000  10000000    thrpt    4        32,250 ±      0,205  ops/s
more_gatherers  1000  10000000    thrpt    4        98,674 ±      0,897  ops/s

Benchmarked on MacBook Pro Nov 2023 with Apple M3 Pro with 36GB RAM and Tahoe 26.2.

The code supporting this article along with benchmarking suite and full results can be found on GitHub. Here’s the more-gatherers library that features this Gatherer implementation.

 




If you enjoyed the content, consider supporting the site: