
A New JDK12 Stream API Collector – Collectors#teeing

When following major Java releases, we usually focus on the bold and controversial JEPs, while humble, tiny changes slip in unnoticed.

One such change is the addition of a new Stream API Collector in JDK12.

Collectors#teeing

First things first: the name is as puzzling to me as it is to you.

Simply put, it lets you collect a Stream using two independent collectors and then merge their results with the supplied BiFunction: every element is passed to both collectors, and the merger is applied once to their two final results.

For example, you could use it to calculate the expected value of a random variable with a discrete uniform distribution, such as a fair die roll:

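// import java.util.stream.Stream;
// import static java.util.stream.Collectors.*;

Double ev = Stream.of(1, 2, 3, 4, 5, 6) // faces of a fair die
  .collect(teeing(
    summingDouble(i -> i),   // first collector: sum of the values
    counting(),              // second collector: number of values
    (sum, n) -> sum / n));   // merger: sum / count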

System.out.println(ev); // 3.5
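Because every element is handed to both downstream collectors, teeing() can compute any two aggregations in a single pass. As an illustrative sketch (TeeingMinMax and range are made-up names for this example), here is the smallest and largest value of a list, merged into a "min..max" string:

```java
import java.util.Comparator;
import java.util.List;

import static java.util.stream.Collectors.maxBy;
import static java.util.stream.Collectors.minBy;
import static java.util.stream.Collectors.teeing;

class TeeingMinMax {

    // Single pass: each element goes to both minBy and maxBy,
    // then the merger combines the two Optional results.
    // Throws NoSuchElementException for an empty list (orElseThrow).
    static String range(List<Integer> values) {
        return values.stream().collect(teeing(
            minBy(Comparator.<Integer>naturalOrder()),
            maxBy(Comparator.<Integer>naturalOrder()),
            (min, max) -> min.orElseThrow() + ".." + max.orElseThrow()));
    }

    public static void main(String[] args) {
        System.out.println(range(List.of(4, 1, 6, 2, 5, 3))); // 1..6
    }
}
```

For an empty list both Optionals are empty, so real code would probably choose a fallback other than throwing.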

Or we could derive a reusable Collector that does the same:

private static Collector<Integer, ?, Double> derivingExpectedValue() {
    return teeing(
      summingDouble(i -> i),
      counting(),
      (sum, n) -> sum / n);
}

Double ev = Stream.of(1, 2, 3, 4, 5, 6)
  .collect(derivingExpectedValue());
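Since the derived Collector carries the combiners of both downstream collectors, it should also work unchanged on a parallel stream, where partial results from different chunks get merged. A quick sketch to check that (ParallelExpectedValue and parallelMean are hypothetical names):

```java
import java.util.stream.Collector;
import java.util.stream.IntStream;

import static java.util.stream.Collectors.counting;
import static java.util.stream.Collectors.summingDouble;
import static java.util.stream.Collectors.teeing;

class ParallelExpectedValue {

    // Same derived collector as in the article
    private static Collector<Integer, ?, Double> derivingExpectedValue() {
        return teeing(
            summingDouble(i -> i),
            counting(),
            (sum, n) -> sum / n);
    }

    // A parallel stream accumulates chunks separately and relies on
    // the collector's combiner to merge the partial (sum, count) pairs.
    static Double parallelMean(int upTo) {
        return IntStream.rangeClosed(1, upTo)
            .boxed()
            .parallel()
            .collect(derivingExpectedValue());
    }

    public static void main(String[] args) {
        System.out.println(parallelMean(100)); // 50.5
    }
}
```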

teeing() also works as a downstream collector, so it plays along with Collectors#groupingBy.
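For instance, the expected-value collector can serve as the downstream collector of groupingBy. A sketch that splits the die faces by parity (GroupedExpectedValue and byParity are illustrative names; the TreeMap is only there to get a stable key order):

```java
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collector;
import java.util.stream.Stream;

import static java.util.stream.Collectors.counting;
import static java.util.stream.Collectors.groupingBy;
import static java.util.stream.Collectors.summingDouble;
import static java.util.stream.Collectors.teeing;

class GroupedExpectedValue {

    private static Collector<Integer, ?, Double> derivingExpectedValue() {
        return teeing(summingDouble(i -> i), counting(), (sum, n) -> sum / n);
    }

    // Expected value computed separately for odd and even faces;
    // teeing() runs as the downstream collector of each group.
    static Map<String, Double> byParity() {
        return Stream.of(1, 2, 3, 4, 5, 6)
            .collect(groupingBy(
                i -> i % 2 == 0 ? "even" : "odd",
                TreeMap::new,                 // sorted keys for stable output
                derivingExpectedValue()));
    }

    public static void main(String[] args) {
        System.out.println(byParity()); // {even=4.0, odd=3.0}
    }
}
```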

Before JDK12

Without that Collector, we'd have to work around it with something like:

Integer[] stream = Stream.of(1, 2, 3, 4, 5, 6).toArray(Integer[]::new);
Double ev = IntStream.range(0, stream.length).boxed()
  .reduce(0d, 
    (acc, i) -> acc + (((double) stream[i]) / stream.length), 
    (acc1, acc2) -> acc1 + acc2);

…or just with a plain loop:

Integer[] stream = Stream.of(1, 2, 3, 4, 5, 6).toArray(Integer[]::new);
double ev = 0d;
for (Integer integer : stream) {
    ev = ev + (((double) integer) / stream.length);
}
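If you're stuck on JDK 8 through 11, the core idea can be approximated with Collector.of. The sketch below is a simplified stand-in, not the JDK code: the tee helper is hypothetical, it uses stricter generic bounds than teeing(), and it declares no characteristics at all, which is always safe but may forgo some optimizations:

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.BinaryOperator;
import java.util.function.Function;
import java.util.stream.Collector;
import java.util.stream.Collectors;

class ManualTee {

    // Hypothetical pre-JDK12 helper, not a JDK API
    static <T, A1, A2, R1, R2, R> Collector<T, ?, R> tee(
            Collector<T, A1, R1> first,
            Collector<T, A2, R2> second,
            BiFunction<R1, R2, R> merger) {
        BiConsumer<A1, T> acc1 = first.accumulator();
        BiConsumer<A2, T> acc2 = second.accumulator();
        BinaryOperator<A1> comb1 = first.combiner();
        BinaryOperator<A2> comb2 = second.combiner();
        Function<A1, R1> fin1 = first.finisher();
        Function<A2, R2> fin2 = second.finisher();

        // One accumulation container per downstream collector
        class Pair {
            A1 left = first.supplier().get();
            A2 right = second.supplier().get();
        }

        return Collector.of(
            Pair::new,
            (pair, t) -> {              // every element goes to both collectors
                acc1.accept(pair.left, t);
                acc2.accept(pair.right, t);
            },
            (p1, p2) -> {               // merges partial results (parallel streams)
                p1.left = comb1.apply(p1.left, p2.left);
                p1.right = comb2.apply(p1.right, p2.right);
                return p1;
            },
            pair -> merger.apply(fin1.apply(pair.left), fin2.apply(pair.right)));
    }

    static Double expectedValue(List<Integer> values) {
        Collector<Integer, ?, Double> sum = Collectors.summingDouble(i -> i);
        Collector<Integer, ?, Long> count = Collectors.counting();
        return values.stream().collect(tee(sum, count, (s, n) -> s / n));
    }

    public static void main(String[] args) {
        System.out.println(expectedValue(Arrays.asList(1, 2, 3, 4, 5, 6))); // 3.5
    }
}
```

The local Pair class plays the same role as PairBox in the JDK implementation shown below.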

Implementation

The Collectors mechanism is very extensible, but implementing this particular one turns out to be quite painstaking (especially deriving the characteristics of the combined Collector):

public static <T, R1, R2, R> Collector<T, ?, R> teeing(
  Collector<? super T, ?, R1> downstream1,
  Collector<? super T, ?, R2> downstream2,
  BiFunction<? super R1, ? super R2, R> merger) {
    return teeing0(downstream1, downstream2, merger);
}

private static <T, A1, A2, R1, R2, R> Collector<T, ?, R> teeing0(
  Collector<? super T, A1, R1> downstream1,
  Collector<? super T, A2, R2> downstream2,
  BiFunction<? super R1, ? super R2, R> merger) {
    Objects.requireNonNull(downstream1, "downstream1");
    Objects.requireNonNull(downstream2, "downstream2");
    Objects.requireNonNull(merger, "merger");

    Supplier<A1> c1Supplier = Objects.requireNonNull(
      downstream1.supplier(), "downstream1 supplier");
    Supplier<A2> c2Supplier = Objects.requireNonNull(
      downstream2.supplier(), "downstream2 supplier");
    BiConsumer<A1, ? super T> c1Accumulator = Objects.requireNonNull(
      downstream1.accumulator(), "downstream1 accumulator");
    BiConsumer<A2, ? super T> c2Accumulator = Objects.requireNonNull(
      downstream2.accumulator(), "downstream2 accumulator");
    BinaryOperator<A1> c1Combiner = Objects.requireNonNull(
      downstream1.combiner(), "downstream1 combiner");
    BinaryOperator<A2> c2Combiner = Objects.requireNonNull(
      downstream2.combiner(), "downstream2 combiner");
    Function<A1, R1> c1Finisher = Objects.requireNonNull(
      downstream1.finisher(), "downstream1 finisher");
    Function<A2, R2> c2Finisher = Objects.requireNonNull(
      downstream2.finisher(), "downstream2 finisher");

    Set<Collector.Characteristics> characteristics;
    Set<Collector.Characteristics> c1Characteristics = downstream1
      .characteristics();
    Set<Collector.Characteristics> c2Characteristics = downstream2
      .characteristics();
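    // CH_ID = {IDENTITY_FINISH} and CH_NOID = {} are internal constants of Collectors
    if (CH_ID.containsAll(c1Characteristics)
      || CH_ID.containsAll(c2Characteristics)) {
        characteristics = CH_NOID;
    } else {
        // keep only the characteristics shared by both downstream collectors
        EnumSet<Collector.Characteristics> c = EnumSet.noneOf(
          Collector.Characteristics.class);
        c.addAll(c1Characteristics);
        c.retainAll(c2Characteristics);
        // the merger always runs, so the finisher is never the identity
        c.remove(Collector.Characteristics.IDENTITY_FINISH);
        characteristics = Collections.unmodifiableSet(c);
    }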

    // holds one accumulation container per downstream collector
    class PairBox {
        A1 left = c1Supplier.get();
        A2 right = c2Supplier.get();

        void add(T t) {
            c1Accumulator.accept(left, t);
            c2Accumulator.accept(right, t);
        }

        PairBox combine(PairBox other) {
            left = c1Combiner.apply(left, other.left);
            right = c2Combiner.apply(right, other.right);
            return this;
        }

        R get() {
            R1 r1 = c1Finisher.apply(left);
            R2 r2 = c2Finisher.apply(right);
            return merger.apply(r1, r2);
        }
    }

    // CollectorImpl is the simple Collector implementation nested in Collectors
    return new CollectorImpl<>(PairBox::new, PairBox::add,
      PairBox::combine, PairBox::get, characteristics);
}

The full implementation can be found here.
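The CH_ID check looks like a special case, but it only short-circuits the general rule: if one side's characteristics are a subset of {IDENTITY_FINISH}, the intersection can contain at most IDENTITY_FINISH, which is removed anyway, so the result is empty either way (presumably the shortcut exists so the shared empty constant can be reused). The sketch below restates the rule outside the JDK and compares both paths on a few sample inputs; combined, withShortcut and the sample sets are hypothetical names:

```java
import java.util.Collections;
import java.util.EnumSet;
import java.util.Set;
import java.util.stream.Collector.Characteristics;

class TeeingCharacteristics {

    // Sample inputs, e.g. what Collectors.toSet() / toList() typically report
    static final Set<Characteristics> UNORDERED_ID = Collections.unmodifiableSet(
        EnumSet.of(Characteristics.UNORDERED, Characteristics.IDENTITY_FINISH));
    static final Set<Characteristics> ID_ONLY = Collections.unmodifiableSet(
        EnumSet.of(Characteristics.IDENTITY_FINISH));
    static final Set<Characteristics> NONE = Collections.emptySet();

    // General rule: shared characteristics, minus IDENTITY_FINISH
    static Set<Characteristics> combined(Set<Characteristics> c1,
                                         Set<Characteristics> c2) {
        EnumSet<Characteristics> c = EnumSet.noneOf(Characteristics.class);
        c.addAll(c1);
        c.retainAll(c2);
        c.remove(Characteristics.IDENTITY_FINISH);
        return Collections.unmodifiableSet(c);
    }

    // Same rule with the shortcut used in teeing0
    static Set<Characteristics> withShortcut(Set<Characteristics> c1,
                                             Set<Characteristics> c2) {
        if (ID_ONLY.containsAll(c1) || ID_ONLY.containsAll(c2)) {
            return Collections.emptySet();
        }
        return combined(c1, c2);
    }

    public static void main(String[] args) {
        System.out.println(combined(UNORDERED_ID, UNORDERED_ID));     // [UNORDERED]
        System.out.println(combined(ID_ONLY, UNORDERED_ID));          // []
        System.out.println(withShortcut(ID_ONLY, UNORDERED_ID));      // []
        System.out.println(withShortcut(NONE, UNORDERED_ID));         // []
    }
}
```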

Summary

JDK12 brings a new Collector to the Stream API; you can read the whole backstory here.

All code snippets can be found on GitHub.