Think twice before using Stream#generate…improperly.
Stream#generate Overview
Simply put, Stream#generate can be used for generating infinite Stream sequences by providing a custom implementation of Supplier<T>.
Works great for constant Streams:
Stream.generate(() -> 42)
  .limit(3)
  .forEach(System.out::print);

// 424242
Works great for generating random values as well:
Stream.generate(new Random()::nextInt)
  .limit(10)
  .forEach(System.out::println);

// some random values, trust me on this one, you must
That said, the preferred way of achieving the same is to leverage Random#ints:
new Random().ints()
  .limit(10)
  .forEach(System.out::println);
Stateful Suppliers
However, let’s go for a more interesting use case and use a stateful Supplier as the base:
Stream.generate(new Supplier<Integer>() {
    private int value;

    @Override
    public Integer get() {
        return value++;
    }
})
  .limit(8)
  .forEach(System.out::println);

// 0
// 1
// 2
// 3
// 4
// 5
// 6
// 7
Works just fine. However, it’d be much easier to leverage Stream#iterate to achieve the same:
Stream.iterate(0, i -> i + 1)
  .limit(10)
  ...
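For the record, a complete version of that pipeline could look like this (just a minimal sketch; the stream is sequential, so the values come out in order):

Stream.iterate(0, i -> i + 1)
  .limit(10)
  .forEach(System.out::println);

// 0 through 9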
Stateful Suppliers… in Parallel
All good so far, let’s try to do the same in parallel:
Stream.generate(new Supplier<Integer>() {
    private int value;

    @Override
    public Integer get() {
        return value++;
    }
})
  .parallel()
  .limit(20)
  .forEach(System.out::println);

// 0
// 2
// 4
// 6
// 8
// 3
// 1
// 9
// 7
// 5
// ...
Suddenly, it’s all messed up. That’s all because the implementation is not thread-safe, right?
Let’s fix it:
Stream.generate(new Supplier<Integer>() {
    private AtomicInteger value = new AtomicInteger();

    @Override
    public Integer get() {
        return value.getAndIncrement();
    }
})
  .parallel()
  .limit(20)
  .forEach(System.out::println);

// 0
// 1
// 2
// 3
// 4
// 5
// 7
// 6
// 9
// 12
// ...
Still not perfect. Let’s assume this solution is 100% thread-safe and keep investigating by adding a Stream#peek() call to figure out what actually goes through the Stream:
List<Integer> result = Stream.generate(new Supplier<Integer>() {
    private AtomicInteger value = new AtomicInteger();

    @Override
    public Integer get() {
        return value.getAndIncrement();
    }
})
  .parallel()
  .peek(System.out::println)
  .limit(4)
  .collect(Collectors.toList());

System.out.println("results: " + result);

// 0
// 6
// 7
// 5
// 3
// 4
// 2
// 1
// 10
// 8
// 9
// results: [5, 6, 0, 7]
Wow, not only does the Stream process more items than asked for, but the results contain numbers that fall outside the expected range. Why is that?
Explanation
As expected, subtle hints can be found in… the documentation of the method itself:
/**
 * Returns an infinite sequential unordered stream where each element is
 * generated by the provided {@code Supplier}. This is suitable for
 * generating constant streams, streams of random elements, etc.
 * ...
 */
The devil lies in a single word here – unordered – which means we should never assume that items emitted by Stream#generate will maintain order – even in single-threaded sequential scenarios!
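The unordered nature can also be observed programmatically: the spliterator backing a Stream#generate stream does not report the ORDERED characteristic, while the one backing Stream#iterate does. A minimal sketch (assuming java.util.Spliterator is imported):

// Stream#generate: unordered by contract
System.out.println(Stream.generate(() -> 42)
  .spliterator()
  .hasCharacteristics(Spliterator.ORDERED)); // false

// Stream#iterate: ordered
System.out.println(Stream.iterate(0, i -> i + 1)
  .spliterator()
  .hasCharacteristics(Spliterator.ORDERED)); // true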
But why does the Stream process more elements than the limit in the parallel scenario?
It’s quite straightforward – a parallel stream instance gets split into substreams that are processed concurrently, and each of them keeps pulling elements from the Supplier until the pipeline notices that the limit has been exceeded and short-circuits. That extra work could be avoided, but probably not without paying the price of additional synchronization.
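A quick way to see that over-processing in isolation is to count the Supplier invocations directly – a rough sketch, with the exact numbers varying from run to run:

AtomicInteger invocations = new AtomicInteger();

List<Integer> collected = Stream.generate(invocations::incrementAndGet)
  .parallel()
  .limit(4)
  .collect(Collectors.toList());

// only 4 elements end up in the result...
System.out.println("collected:   " + collected.size());
// ...but the Supplier is typically called more than 4 times
System.out.println("invocations: " + invocations.get());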
Summary
Never assume that Stream#generate returns elements in any particular order – even if the implementation of the Supplier seems to guarantee one.
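And if encounter order does matter, especially when going parallel, it’s safer to start from an inherently ordered source – for example (just a sketch):

// IntStream.range is ordered and sized, so encounter order survives parallel processing
List<Integer> ordered = IntStream.range(0, 20)
  .parallel()
  .boxed()
  .collect(Collectors.toList());

System.out.println(ordered); // 0 through 19, in order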