Mercurial > hg > openjdk > lambda > jdk
changeset 7529:ccf54b3cd35b
Move reduce functionality in OpUtils to ReduceOp.
author | psandoz |
---|---|
date | Mon, 25 Feb 2013 12:21:06 +0100 |
parents | b8d764bb3215 |
children | 12dac52b2230 |
files | src/share/classes/java/util/stream/DistinctOp.java src/share/classes/java/util/stream/OpUtils.java src/share/classes/java/util/stream/ReduceOp.java |
diffstat | 3 files changed, 80 insertions(+), 94 deletions(-) [+] |
line wrap: on
line diff
--- a/src/share/classes/java/util/stream/DistinctOp.java Mon Feb 25 10:18:53 2013 +0100 +++ b/src/share/classes/java/util/stream/DistinctOp.java Mon Feb 25 12:21:06 2013 +0100 @@ -115,6 +115,8 @@ return helper.collectOutput(false); } else if (StreamOpFlag.ORDERED.isKnown(helper.getStreamAndOpFlags())) { + // If the stream is SORTED then it should also be ORDERED so the following will also + // preserve the sort order TerminalOp<T, LinkedHashSet<T>> reduceOp = ReduceOp.<T, LinkedHashSet<T>>makeRef(LinkedHashSet::new, LinkedHashSet::add, LinkedHashSet::addAll);
--- a/src/share/classes/java/util/stream/OpUtils.java Mon Feb 25 10:18:53 2013 +0100 +++ b/src/share/classes/java/util/stream/OpUtils.java Mon Feb 25 12:21:06 2013 +0100 @@ -82,38 +82,6 @@ return nb.build(); } - /** - * Given a {@link PipelineHelper} describing a stream source and a sequence of intermediate stream operations, - * perform a reduction operation on the stream, in parallel, using the supplied {@link AccumulatingSink} type. - * The {@code AccumulatingSink} must represent an associative reducing operation. - * - * @param helper A {@code PipelineHelper} describing the stream source and operations - * @param factory A {@code Supplier} for an {@code AccumulatingSink} which performs the associative reducing - * operation - * @param <P_IN> The input type of the stream pipeline - * @param <P_OUT> The output type of the stream pipeline - * @param <R> The result type of the reduction operation - * @param <S> The type of the {@code AccumulatingSink} - * @return The result of the reduction operation - */ - public static<P_IN, P_OUT, R, S extends AccumulatingSink<P_OUT, R, S>> - R parallelReduce(PipelineHelper<P_IN, P_OUT> helper, Supplier<S> factory) { - S sink = new ReduceTask<>(helper, factory).invoke(); - return sink.get(); - } - - /** - * A type of {@code TerminalSink} that implements an associative reducing operation on elements of type - * {@code T} and producing a result of type {@code R}. - * - * @param <T> The type of input element to the combining operation - * @param <R> The result type - * @param <K> The type of the {@code AccumulatingSink} - */ - public interface AccumulatingSink<T, R, K extends AccumulatingSink<T, R, K>> extends TerminalSink<T, R> { - public void combine(K other); - } - /** A {@code ForkJoinTask} for performing a parallel for-each operation */ private static class ForEachTask<S, T> extends CountedCompleter<Void> { @@ -167,45 +135,4 @@ } } - /** A {@code ForkJoinTask} for performing a parallel reduce operation */ - private static class ReduceTask<P_IN, P_OUT, R, S extends AccumulatingSink<P_OUT, R, S>> - extends AbstractTask<P_IN, P_OUT, S, ReduceTask<P_IN, P_OUT, R, S>> { - private final Supplier<S> sinkFactory; - - private ReduceTask(PipelineHelper<P_IN, P_OUT> helper, Supplier<S> sinkFactory) { - super(helper); - this.sinkFactory = sinkFactory; - } - - private ReduceTask(ReduceTask<P_IN, P_OUT, R, S> parent, Spliterator<P_IN> spliterator) { - super(parent, spliterator); - this.sinkFactory = parent.sinkFactory; - } - - @Override - protected ReduceTask<P_IN, P_OUT, R, S> makeChild(Spliterator<P_IN> spliterator) { - return new ReduceTask<>(this, spliterator); - } - - @Override - protected S doLeaf() { - return helper.into(sinkFactory.get(), spliterator); - } - - @Override - public void onCompletion(CountedCompleter caller) { - if (!isLeaf()) { - ReduceTask<P_IN, P_OUT, R, S> child = children; - S result = child.getLocalResult(); - child = child.nextSibling; - for (; child != null; child = child.nextSibling) { - S otherResult = child.getLocalResult(); - result.combine(otherResult); - child.setLocalResult(null); // GC otherResult - } - setLocalResult(result); - } - super.onCompletion(caller); - } - } }
--- a/src/share/classes/java/util/stream/ReduceOp.java Mon Feb 25 10:18:53 2013 +0100 +++ b/src/share/classes/java/util/stream/ReduceOp.java Mon Feb 25 12:21:06 2013 +0100 @@ -28,6 +28,8 @@ import java.util.OptionalDouble; import java.util.OptionalInt; import java.util.OptionalLong; +import java.util.Spliterator; +import java.util.concurrent.CountedCompleter; import java.util.function.BiConsumer; import java.util.function.BiFunction; import java.util.function.BinaryOperator; @@ -41,24 +43,37 @@ /** * A {@code TerminalOp} that evaluates a stream pipeline and sends the output into an {@code AccumulatingSink}, - * which performs a reduce operation + * which performs a reduce operation. The {@code AccumulatingSink} must represent an associative reducing operation. * * @param <T> The output type of the stream pipeline * @param <R> The result type of the reducing operation * @param <S> The type of the {@code AccumulatingSink} * @since 1.8 */ -class ReduceOp<T, R, S extends OpUtils.AccumulatingSink<T, R, S>> implements TerminalOp<T, R> { +// @@@ Can ReduceOp.AccumulatingSink be removed from the class type signature? +final class ReduceOp<T, R, S extends ReduceOp.AccumulatingSink<T, R, S>> implements TerminalOp<T, R> { private final Supplier<S> sinkSupplier; private final StreamShape inputShape; /** + * A type of {@code TerminalSink} that implements an associative reducing operation on elements of type + * {@code T} and producing a result of type {@code R}. + * + * @param <T> The type of input element to the combining operation + * @param <R> The result type + * @param <K> The type of the {@code AccumulatingSink}. + */ + static interface AccumulatingSink<T, R, K extends AccumulatingSink<T, R, K>> extends TerminalSink<T, R> { + public void combine(K other); + } + + /** * Create a {@code ReduceOp} of the specified stream shape which uses the specified {@code Supplier} to create * accumulating sinks * @param shape The shape of the stream pipeline * @param supplier A factory for {@code AccumulatingSinks} */ - public ReduceOp(StreamShape shape, Supplier<S> supplier) { + private ReduceOp(StreamShape shape, Supplier<S> supplier) { sinkSupplier = supplier; inputShape = shape; } @@ -68,7 +83,7 @@ * accumulating sinks * @param supplier A factory for {@code AccumulatingSinks} */ - public ReduceOp(Supplier<S> supplier) { + private ReduceOp(Supplier<S> supplier) { this(StreamShape.REFERENCE, supplier); } @@ -77,16 +92,16 @@ return inputShape; } - public <S> R evaluateSequential(PipelineHelper<S, T> helper) { + @Override + public <P_IN> R evaluateSequential(PipelineHelper<P_IN, T> helper) { return helper.into(sinkSupplier.get(), helper.sourceSpliterator()).get(); } @Override - public <S> R evaluateParallel(PipelineHelper<S, T> helper) { - return OpUtils.parallelReduce(helper, sinkSupplier); + public <P_IN> R evaluateParallel(PipelineHelper<P_IN, T> helper) { + return new ReduceTask<>(helper, sinkSupplier).invoke().get(); } - /** * State box for a single state element, used as a base class for {@code AccumulatingSink} instances * @param <U> The type of the state element @@ -110,7 +125,7 @@ */ public static<T, U> TerminalOp<T, U> makeRef(U seed, BiFunction<U, ? super T, U> reducer, BinaryOperator<U> combiner) { - class ReducingSink extends Box<U> implements OpUtils.AccumulatingSink<T, U, ReducingSink>, Sink<T> { + class ReducingSink extends Box<U> implements AccumulatingSink<T, U, ReducingSink> { @Override public void begin(long size) { state = seed; @@ -142,7 +157,7 @@ */ public static<T> TerminalOp<T, Optional<T>> makeRef(BinaryOperator<T> operator) { - class ReducingSink implements OpUtils.AccumulatingSink<T, Optional<T>, ReducingSink> { + class ReducingSink implements AccumulatingSink<T, Optional<T>, ReducingSink> { private boolean empty; private T state; @@ -190,7 +205,7 @@ Supplier<R> supplier = collector.resultSupplier(); BiConsumer<R, ? super T> accumulator = collector.accumulator(); BinaryOperator<R> combiner = collector.combiner(); - class ReducingSink extends Box<R> implements OpUtils.AccumulatingSink<T, R, ReducingSink> { + class ReducingSink extends Box<R> implements AccumulatingSink<T, R, ReducingSink> { @Override public void begin(long size) { state = supplier.get(); @@ -223,7 +238,7 @@ */ public static<T, R> TerminalOp<T, R> makeRef(Supplier<R> seedFactory, BiConsumer<R, ? super T> accumulator, BiConsumer<R,R> reducer) { - class ReducingSink extends Box<R> implements OpUtils.AccumulatingSink<T, R, ReducingSink> { + class ReducingSink extends Box<R> implements AccumulatingSink<T, R, ReducingSink> { @Override public void begin(long size) { state = seedFactory.get(); @@ -253,7 +268,7 @@ */ public static TerminalOp<Integer, Integer> makeInt(int identity, IntBinaryOperator operator) { - class ReducingSink implements OpUtils.AccumulatingSink<Integer, Integer, ReducingSink>, Sink.OfInt { + class ReducingSink implements AccumulatingSink<Integer, Integer, ReducingSink>, Sink.OfInt { private int state; @Override @@ -292,7 +307,7 @@ */ public static TerminalOp<Integer, OptionalInt> makeInt(IntBinaryOperator operator) { - class ReducingSink implements OpUtils.AccumulatingSink<Integer, OptionalInt, ReducingSink>, Sink.OfInt { + class ReducingSink implements AccumulatingSink<Integer, OptionalInt, ReducingSink>, Sink.OfInt { private boolean empty; private int state; @@ -343,7 +358,7 @@ Supplier<R> supplier = collector.resultSupplier(); ObjIntConsumer<R> accumulator = collector.intAccumulator(); BinaryOperator<R> combiner = collector.combiner(); - class ReducingSink extends Box<R> implements OpUtils.AccumulatingSink<Integer, R, ReducingSink>, Sink.OfInt { + class ReducingSink extends Box<R> implements AccumulatingSink<Integer, R, ReducingSink>, Sink.OfInt { @Override public void begin(long size) { state = supplier.get(); @@ -376,7 +391,7 @@ */ public static TerminalOp<Long, Long> makeLong(long identity, LongBinaryOperator operator) { - class ReducingSink implements OpUtils.AccumulatingSink<Long, Long, ReducingSink>, Sink.OfLong { + class ReducingSink implements AccumulatingSink<Long, Long, ReducingSink>, Sink.OfLong { private long state; @Override @@ -414,7 +429,7 @@ */ public static TerminalOp<Long, OptionalLong> makeLong(LongBinaryOperator operator) { - class ReducingSink implements OpUtils.AccumulatingSink<Long, OptionalLong, ReducingSink>, Sink.OfLong { + class ReducingSink implements AccumulatingSink<Long, OptionalLong, ReducingSink>, Sink.OfLong { private boolean empty; private long state; @@ -465,7 +480,7 @@ Supplier<R> supplier = collector.resultSupplier(); ObjLongConsumer<R> accumulator = collector.longAccumulator(); BinaryOperator<R> combiner = collector.combiner(); - class ReducingSink extends Box<R> implements OpUtils.AccumulatingSink<Long, R, ReducingSink>, Sink.OfLong { + class ReducingSink extends Box<R> implements AccumulatingSink<Long, R, ReducingSink>, Sink.OfLong { @Override public void begin(long size) { state = supplier.get(); @@ -498,7 +513,7 @@ */ public static TerminalOp<Double, Double> makeDouble(double identity, DoubleBinaryOperator operator) { - class ReducingSink implements OpUtils.AccumulatingSink<Double, Double, ReducingSink>, Sink.OfDouble { + class ReducingSink implements AccumulatingSink<Double, Double, ReducingSink>, Sink.OfDouble { private double state; @Override @@ -537,7 +552,7 @@ */ public static TerminalOp<Double, OptionalDouble> makeDouble(DoubleBinaryOperator operator) { - class ReducingSink implements OpUtils.AccumulatingSink<Double, OptionalDouble, ReducingSink>, Sink.OfDouble { + class ReducingSink implements AccumulatingSink<Double, OptionalDouble, ReducingSink>, Sink.OfDouble { private boolean empty; private double state; @@ -588,7 +603,7 @@ ObjDoubleConsumer<R> accumulator = collector.doubleAccumulator(); BinaryOperator<R> combiner = collector.combiner(); - class ReducingSink extends Box<R> implements OpUtils.AccumulatingSink<Double, R, ReducingSink>, Sink.OfDouble { + class ReducingSink extends Box<R> implements AccumulatingSink<Double, R, ReducingSink>, Sink.OfDouble { @Override public void begin(long size) { state = supplier.get(); @@ -612,4 +627,46 @@ } }); } + + /** A {@code ForkJoinTask} for performing a parallel reduce operation */ + private static class ReduceTask<P_IN, P_OUT, R, S extends AccumulatingSink<P_OUT, R, S>> + extends AbstractTask<P_IN, P_OUT, S, ReduceTask<P_IN, P_OUT, R, S>> { + private final Supplier<S> sinkFactory; + + private ReduceTask(PipelineHelper<P_IN, P_OUT> helper, Supplier<S> sinkFactory) { + super(helper); + this.sinkFactory = sinkFactory; + } + + private ReduceTask(ReduceTask<P_IN, P_OUT, R, S> parent, Spliterator<P_IN> spliterator) { + super(parent, spliterator); + this.sinkFactory = parent.sinkFactory; + } + + @Override + protected ReduceTask<P_IN, P_OUT, R, S> makeChild(Spliterator<P_IN> spliterator) { + return new ReduceTask<>(this, spliterator); + } + + @Override + protected S doLeaf() { + return helper.into(sinkFactory.get(), spliterator); + } + + @Override + public void onCompletion(CountedCompleter caller) { + if (!isLeaf()) { + ReduceTask<P_IN, P_OUT, R, S> child = children; + S result = child.getLocalResult(); + child = child.nextSibling; + for (; child != null; child = child.nextSibling) { + S otherResult = child.getLocalResult(); + result.combine(otherResult); + child.setLocalResult(null); // GC otherResult + } + setLocalResult(result); + } + super.onCompletion(caller); + } + } }