Implementing a Sliding Window Stream/Spliterator in Java

In this article, we’ll see how to implement a custom sliding window Stream/Spliterator in Java.

Does the world need another way of implementing a sliding window operation in Java? Probably not, but you do – for your self-development.

Sliding Window

Simply put, the sliding window algorithm is a method of traversing a data structure by moving a fixed-size window (a sublist) over a sequence in fixed steps.

It gets much more intuitive when shown in an example.

If we wanted to traverse the list [1 2 3 4 5] using a window of size 3, we’d simply be looking at the following groups:

  1. [1 2 3]
  2. [2 3 4]
  3. [3 4 5]

…but if we wanted to traverse the same list using a window that’s larger than the collection itself, we wouldn’t get even a single group.
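
The window positions described above can be reproduced with plain list slicing. The following is only a minimal sketch to make the idea concrete, not the Spliterator built in the rest of this article; the class name NaiveSlidingWindow and the method windows are made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;

class NaiveSlidingWindow {

    // Hypothetical helper: returns every window of the given size,
    // moving one element at a time.
    static <T> List<List<T>> windows(List<T> source, int windowSize) {
        List<List<T>> result = new ArrayList<>();
        if (windowSize < 1) {
            return result;
        }
        // The loop body never runs when the window is larger than the list.
        for (int start = 0; start + windowSize <= source.size(); start++) {
            // Copy the slice so each window is independent of the source list.
            result.add(new ArrayList<>(source.subList(start, start + windowSize)));
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(windows(List.of(1, 2, 3, 4, 5), 3)); // [[1, 2, 3], [2, 3, 4], [3, 4, 5]]
        System.out.println(windows(List.of(1, 2, 3, 4, 5), 6)); // []
    }
}
```

Unlike this eager version, which materializes all windows up front, a Spliterator-based approach can produce each group on demand.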

Implementation

To be able to create a custom Stream, we need to implement a custom Spliterator.

In our case, we need to be able to iterate over groups represented by Stream<T> sequences, so we need to implement the Spliterator interface and specify the generic type parameter:

public class SlidingWindowSpliterator<T> implements Spliterator<Stream<T>> {
   // ...
}

Then, it turns out we have a bunch of methods to implement:

public class SlidingWindowSpliterator<T> implements Spliterator<Stream<T>> {

    @Override
    public boolean tryAdvance(Consumer<? super Stream<T>> action) {
        return false;
    }

    @Override
    public Spliterator<Stream<T>> trySplit() {
        return null;
    }

    @Override
    public long estimateSize() {
        return 0;
    }

    @Override
    public int characteristics() {
        return 0;
    }
}

We’ll also need a few fields for storing buffered elements, the window size parameter, an iterator of the source collection, and a precomputed size estimation (we’ll need that later on):

private final Queue<T> buffer;
private final Iterator<T> sourceIterator;
private final int windowSize;
private final int size;

Before we can start implementing the interface methods, we need a way to instantiate our Spliterator.

In this case, we’ll restrict the visibility of the constructor, and expose a public static factory method instead:

private SlidingWindowSpliterator(Collection<T> source, int windowSize) {
    this.buffer = new ArrayDeque<>(windowSize);
    this.sourceIterator = Objects.requireNonNull(source).iterator();
    this.windowSize = windowSize;
    this.size = calculateSize(source, windowSize);
}

static <T> Stream<Stream<T>> windowed(Collection<T> stream, int windowSize) {
    return StreamSupport.stream(
      new SlidingWindowSpliterator<>(stream, windowSize), false);
}

Let’s now implement the easy part of the Spliterator methods.

In our case, there’s no easy way to split the sequence, so trySplit() simply returns null, which the documentation defines as the signal that a Spliterator cannot be split. Luckily, the size can be calculated quite easily:

private static int calculateSize(Collection<?> source, int windowSize) {
    // No groups at all if the window is invalid or larger than the source
    return windowSize < 1 || source.size() < windowSize
      ? 0
      : source.size() - windowSize + 1;
}

@Override 
public Spliterator<Stream<T>> trySplit() { 
    return null; 
} 
 
@Override 
public long estimateSize() { 
    return size; 
}

In characteristics(), we specify:

  1. ORDERED – because the encounter order matters
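  2. NONNULL – because the emitted groups themselves are never null (in principle a group could hold null elements, but the ArrayDeque buffer used here rejects them with a NullPointerException)
  3. SIZED – because the exact number of groups is known up front

@Override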
public int characteristics() {
    return ORDERED | NONNULL | SIZED;
}
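
Characteristics are plain bit flags, so they can be combined with | and queried with hasCharacteristics(). As a rough illustration, the sketch below inspects the Spliterator of an ordinary list from the JDK rather than our custom one; the class name CharacteristicsDemo and the describe helper are hypothetical.

```java
import java.util.Arrays;
import java.util.Spliterator;

class CharacteristicsDemo {

    // Hypothetical helper: lists which of the three flags discussed here
    // a given spliterator reports.
    static String describe(Spliterator<?> spliterator) {
        StringBuilder sb = new StringBuilder();
        if (spliterator.hasCharacteristics(Spliterator.ORDERED)) sb.append("ORDERED ");
        if (spliterator.hasCharacteristics(Spliterator.NONNULL)) sb.append("NONNULL ");
        if (spliterator.hasCharacteristics(Spliterator.SIZED)) sb.append("SIZED ");
        return sb.toString().trim();
    }

    public static void main(String[] args) {
        Spliterator<Integer> spliterator = Arrays.asList(1, 2, 3).spliterator();
        System.out.println(describe(spliterator));
        // For a SIZED spliterator the exact size is known before traversal.
        System.out.println(spliterator.getExactSizeIfKnown()); // 3
    }
}
```

Reporting SIZED is a promise that estimateSize() returns the exact element count, so the value must match what tryAdvance actually emits.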

Implementing tryAdvance

And here comes the crucial part – the method responsible for the actual grouping and iteration.

Firstly, if the window is smaller than 1, there’s nothing to iterate over, so we can short-circuit immediately:

@Override
public boolean tryAdvance(Consumer<? super Stream<T>> action) {
    if (windowSize < 1) {
        return false;
    }

    // ...
}

And now, to generate the first sublist, we need to start iterating and filling the buffer:

while (sourceIterator.hasNext()) {
    buffer.add(sourceIterator.next());

    // ...
}

Once the buffer is filled, we can dispatch the complete group, and discard the oldest element from the buffer.

Here comes a crucial detail: one might be tempted to pass buffer.stream() to the accept() method, which would be a huge mistake. Streams are lazily bound to the underlying collection, which means that if the buffer changes before a Stream is consumed, the contents of that Stream change as well.

To avoid the problem and decouple our groups from the internal buffer representation, we need to snapshot the current state of the buffer before creating each Stream instance. We’ll back Stream instances with arrays to make them as lightweight as possible.
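
The difference between a live view and a snapshot can be observed in isolation. The sketch below is a standalone illustration with made-up names (LateBindingDemo, liveView, snapshot); it mimics one slide of the window on an ArrayDeque before the Stream is consumed.

```java
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.List;
import java.util.Queue;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class LateBindingDemo {

    // Creates a stream directly over the buffer, then slides the window
    // before the stream is consumed.
    static List<Integer> liveView() {
        Queue<Integer> buffer = new ArrayDeque<>(Arrays.asList(1, 2, 3));
        Stream<Integer> window = buffer.stream();
        buffer.poll();
        buffer.add(4);
        // The stream binds to the buffer only now, so it sees the new state.
        return window.collect(Collectors.toList());
    }

    // Copies the buffer into an array first, so later changes are invisible.
    static List<Integer> snapshot() {
        Queue<Integer> buffer = new ArrayDeque<>(Arrays.asList(1, 2, 3));
        Stream<Integer> window = Arrays.stream(buffer.toArray(new Integer[0]));
        buffer.poll();
        buffer.add(4);
        return window.collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(liveView()); // [2, 3, 4]
        System.out.println(snapshot()); // [1, 2, 3]
    }
}
```

Since the groups emitted by a windowing Spliterator are typically consumed after the buffer has already moved on, only the snapshot variant yields the expected contents.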

Since Java doesn’t allow creating generic arrays directly, we need to do some ugly unchecked casting:

if (buffer.size() == windowSize) {
    action.accept(Arrays.stream((T[]) buffer.toArray(new Object[0])));

    buffer.poll();
    return true;
}

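Finally, once the source iterator is exhausted and no further complete group can be formed, tryAdvance must return false to signal the end of the sequence.

…and voilà, we are ready to use it: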

windowed(List.of(1,2,3,4,5), 3)
  .map(group -> group.collect(toList()))
  .forEach(System.out::println);

// result
[1, 2, 3]
[2, 3, 4]
[3, 4, 5]

An exercise for the reader: add support for a custom step size (currently it’s implicitly set to 1).

A Complete Example

package com.pivovarit.stream;

import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;
import java.util.Objects;
import java.util.Queue;
import java.util.Spliterator;
import java.util.function.Consumer;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

public class SlidingWindowSpliterator<T> implements Spliterator<Stream<T>> {

    static <T> Stream<Stream<T>> windowed(Collection<T> stream, int windowSize) {
        return StreamSupport.stream(
          new SlidingWindowSpliterator<>(stream, windowSize), false);
    }

    private final Queue<T> buffer;
    private final Iterator<T> sourceIterator;
    private final int windowSize;
    private final int size;

    private SlidingWindowSpliterator(Collection<T> source, int windowSize) {
        this.buffer = new ArrayDeque<>(windowSize);
        this.sourceIterator = Objects.requireNonNull(source).iterator();
        this.windowSize = windowSize;
        this.size = calculateSize(source, windowSize);
    }

    @Override
    public boolean tryAdvance(Consumer<? super Stream<T>> action) {
        if (windowSize < 1) {
            return false;
        }

        while (sourceIterator.hasNext()) {
            buffer.add(sourceIterator.next());

            if (buffer.size() == windowSize) {
                action.accept(Arrays.stream((T[]) buffer.toArray(new Object[0])));
                buffer.poll();
                return true;
            }
        }

        // Source exhausted without filling the buffer: no more groups
        return false;
    }

    @Override
    public Spliterator<Stream<T>> trySplit() {
        return null;
    }

    @Override
    public long estimateSize() {
        return size;
    }

    @Override
    public int characteristics() {
        return ORDERED | NONNULL | SIZED;
    }

    private static int calculateSize(Collection<?> source, int windowSize) {
        // No groups at all if the window is invalid or larger than the source
        return windowSize < 1 || source.size() < windowSize
          ? 0
          : source.size() - windowSize + 1;
    }
}

Source

The complete example can also be found on GitHub.

Have a good idea of how to improve it? Feel free to issue a PR!



