
Think Twice Before Using Stream#generate in Java

Think twice before using Stream#generate…improperly.

Stream#generate Overview

Simply put, Stream#generate can be used for generating infinite Stream sequences by providing a custom implementation of Supplier<T>.

Works great for constant Streams:

Stream.generate(() -> 42)
    .limit(3)
    .forEach(System.out::print);

// 424242

Works great for generating random values as well:

Stream.generate(new Random()::nextInt)
  .limit(10)
  .forEach(System.out::println);

// some random values, trust me on this one, you must

That said, the preferred way of achieving the same is Random#ints:

new Random().ints()
  .limit(10)
  .forEach(System.out::println);
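Random#ints also has overloads that take the stream size up front (and optionally a value range), which makes the limit() call unnecessary. A small sketch; the class and method names are made up for illustration:

```java
import java.util.Arrays;
import java.util.Random;

public class RandomIntsDemo {

    // Random#ints(streamSize) yields a finite stream, so no limit() is needed
    static int[] tenRandomInts() {
        return new Random().ints(10).toArray();
    }

    // Random#ints(streamSize, origin, bound) also restricts values to [origin, bound)
    static int[] tenRandomPercentages() {
        return new Random().ints(10, 0, 100).toArray();
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(tenRandomInts()));
        System.out.println(Arrays.toString(tenRandomPercentages()));
    }
}
```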

Stateful Suppliers

However, let’s go for a more interesting use case and use a stateful Supplier as the source:

Stream.generate(new Supplier<Integer>() {

    private int value;

    @Override
    public Integer get() {
        return value++;
    }
})
    .limit(8)
    .forEach(System.out::println);

// 0
// 1
// 2
// 3
// 4
// 5
// 6
// 7

Works just fine. However, it’d be much easier to leverage Stream#iterate to achieve the same:

Stream.iterate(0, i -> i + 1)
  .limit(8)
  .forEach(System.out::println);
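On Java 9 and later, Stream#iterate additionally offers a three-argument overload with a hasNext predicate, so the sequence ends on its own, much like a for loop. A minimal sketch (hypothetical class and method names; requires Java 9+):

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class IterateDemo {

    // iterate(seed, hasNext, next): stops as soon as hasNext returns false
    static List<Integer> firstEight() {
        return Stream.iterate(0, i -> i < 8, i -> i + 1)
            .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(firstEight()); // [0, 1, 2, 3, 4, 5, 6, 7]
    }
}
```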

Stateful Suppliers… in Parallel

All good so far. Let’s try to do the same in parallel:

Stream.generate(new Supplier<Integer>() {

    private int value;

    @Override
    public Integer get() {
        return value++;
    }
}).parallel()
    .limit(20)
    .forEach(System.out::println);

// 0
// 2
// 4
// 6
// 8
// 3
// 1
// 9
// 7
// 5
// ...

Suddenly, it’s all messed up. That’s all because the implementation is not thread-safe, right?

Let’s fix it:

Stream.generate(new Supplier<Integer>() {

    private final AtomicInteger value = new AtomicInteger();

    @Override
    public Integer get() {
        return value.getAndIncrement();
    }
}).parallel()
    .limit(20)
    .forEach(System.out::println);

// 0
// 1
// 2
// 3
// 4
// 5
// 7
// 6
// 9
// 12
// ...
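To check what the AtomicInteger version does and does not guarantee, a sketch can collect the parallel result instead of printing it. It assumes the same limit(20) pipeline but uses a method reference in place of the anonymous class, which should behave the same (class and method names are hypothetical). The list always holds 20 distinct values, yet nothing promises that they are 0 through 19 or that they come out sorted:

```java
import java.util.HashSet;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class AtomicSupplierDemo {

    static List<Integer> parallelTwenty() {
        AtomicInteger value = new AtomicInteger();
        return Stream.generate(value::getAndIncrement)
            .parallel()
            .limit(20)
            .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Integer> result = parallelTwenty();
        // AtomicInteger never hands out the same value twice, so all 20 are distinct...
        System.out.println(result.size() + " elements, "
            + new HashSet<>(result).size() + " distinct");
        // ...but which 20 values end up here, and in what order, is not specified
        System.out.println(result);
    }
}
```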

Still not perfect. Let’s assume that this solution is now 100% thread-safe and keep investigating by adding a Stream#peek() call to see what actually flows through the Stream:

List<Integer> result = Stream.generate(new Supplier<Integer>() {
    private AtomicInteger value = new AtomicInteger();

    @Override
    public Integer get() {
        return value.getAndIncrement();
    }
}).parallel()
    .peek(System.out::println)
    .limit(4)
    .collect(Collectors.toList());

System.out.println("results: " + result);

// 0
// 6
// 7
// 5
// 3
// 4
// 2
// 1
// 10
// 8
// 9
// results: [5, 6, 0, 7]

Wow! Not only does the Stream process more items than asked for, but the results also contain numbers that seem to fall outside the expected range. Why is that?

Explanation

As expected, subtle hints can be found in… the documentation of the method itself:

/**
 * Returns an infinite sequential unordered stream where each element is
 * generated by the provided {@code Supplier}.  This is suitable for
 * generating constant streams, streams of random elements, etc.
 * ...
 */

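The devil lies in a single word here: unordered. It means we should never assume that items emitted by Stream#generate will maintain order, even in single-threaded sequential scenarios! It also means that limit(n) is free to keep any n of the generated elements rather than the first n, which is how values like 5, 6 and 7 can end up in a four-element result.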
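The "unordered" in the Javadoc shows up as a missing Spliterator.ORDERED characteristic, which can be checked directly. A minimal sketch comparing Stream#generate with Stream#iterate, whose Javadoc describes it as an ordered stream (class and method names are hypothetical):

```java
import java.util.Spliterator;
import java.util.stream.Stream;

public class OrderedDemo {

    // Stream#generate: documented as an unordered stream
    static boolean generateIsOrdered() {
        return Stream.generate(() -> 42)
            .spliterator()
            .hasCharacteristics(Spliterator.ORDERED);
    }

    // Stream#iterate: documented as an ordered stream
    static boolean iterateIsOrdered() {
        return Stream.iterate(0, i -> i + 1)
            .spliterator()
            .hasCharacteristics(Spliterator.ORDERED);
    }

    public static void main(String[] args) {
        System.out.println("generate ORDERED: " + generateIsOrdered()); // false
        System.out.println("iterate ORDERED:  " + iterateIsOrdered());  // true
    }
}
```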

But why does the Stream process more elements than the limit in the parallel scenario?

It’s quite straightforward: a parallel stream gets split into substreams that are processed concurrently, and each of them stops pulling from the Supplier only once it notices that the limit has already been reached. Whatever was generated (and peeked at) in the meantime is simply discarded. This could be avoided, but it’s probably not worth paying the price of synchronization.
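The over-generation is easy to measure by counting Supplier invocations. A sketch with a hypothetical helper, supplierCalls: in the sequential case the count should match the limit exactly, while in parallel it is typically higher, and the exact number varies from run to run:

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class OverGenerationDemo {

    // Returns how many times the Supplier was invoked to collect `limit` elements
    static int supplierCalls(boolean parallel, int limit) {
        AtomicInteger calls = new AtomicInteger();
        Stream<Integer> stream = Stream.generate(calls::getAndIncrement);
        if (parallel) {
            stream = stream.parallel();
        }
        List<Integer> result = stream.limit(limit).collect(Collectors.toList());
        System.out.println((parallel ? "parallel" : "sequential")
            + ": kept " + result.size() + ", generated " + calls.get());
        return calls.get();
    }

    public static void main(String[] args) {
        supplierCalls(false, 4); // sequential: generated exactly 4
        supplierCalls(true, 4);  // parallel: usually generated more than 4
    }
}
```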

Summary

Never assume that Stream#generate returns elements in any particular order, even if the implementation of the Supplier seems to produce them in order.
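If the order does matter, a source that is ordered by contract is the safer choice. A sketch (hypothetical names) mirroring the parallel example from earlier: Stream#iterate is documented as ordered, so a parallel limit(20) keeps the first 20 elements, and IntStream#range gives the same result while splitting much more efficiently across threads:

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;

public class OrderedParallelDemo {

    // Ordered source: a parallel limit(20) still keeps the first 20 in encounter order
    static List<Integer> viaIterate() {
        return Stream.iterate(0, i -> i + 1)
            .parallel()
            .limit(20)
            .collect(Collectors.toList());
    }

    // Ordered and sized source: no limit() needed, and it splits evenly
    static List<Integer> viaRange() {
        return IntStream.range(0, 20)
            .parallel()
            .boxed()
            .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(viaIterate());
        System.out.println(viaRange());
    }
}
```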