changeset 7529:ccf54b3cd35b

Move reduce functionality in OpUtils to ReduceOp.
author psandoz
date Mon, 25 Feb 2013 12:21:06 +0100
parents b8d764bb3215
children 12dac52b2230
files src/share/classes/java/util/stream/DistinctOp.java src/share/classes/java/util/stream/OpUtils.java src/share/classes/java/util/stream/ReduceOp.java
diffstat 3 files changed, 80 insertions(+), 94 deletions(-) [+]
line wrap: on
line diff
--- a/src/share/classes/java/util/stream/DistinctOp.java	Mon Feb 25 10:18:53 2013 +0100
+++ b/src/share/classes/java/util/stream/DistinctOp.java	Mon Feb 25 12:21:06 2013 +0100
@@ -115,6 +115,8 @@
             return helper.collectOutput(false);
         }
         else if (StreamOpFlag.ORDERED.isKnown(helper.getStreamAndOpFlags())) {
+            // If the stream is SORTED then it should also be ORDERED so the following will also
+            // preserve the sort order
             TerminalOp<T, LinkedHashSet<T>> reduceOp
                     = ReduceOp.<T, LinkedHashSet<T>>makeRef(LinkedHashSet::new, LinkedHashSet::add,
                                                             LinkedHashSet::addAll);
--- a/src/share/classes/java/util/stream/OpUtils.java	Mon Feb 25 10:18:53 2013 +0100
+++ b/src/share/classes/java/util/stream/OpUtils.java	Mon Feb 25 12:21:06 2013 +0100
@@ -82,38 +82,6 @@
         return nb.build();
     }
 
-    /**
-     * Given a {@link PipelineHelper} describing a stream source and a sequence of intermediate stream operations,
-     * perform a reduction operation on the stream, in parallel, using the supplied {@link AccumulatingSink} type.
-     * The {@code AccumulatingSink} must represent an associative reducing operation.
-     *
-     * @param helper A {@code PipelineHelper} describing the stream source and operations
-     * @param factory A {@code Supplier} for an {@code AccumulatingSink} which performs the associative reducing
-     *                operation
-     * @param <P_IN> The input type of the stream pipeline
-     * @param <P_OUT> The output type of the stream pipeline
-     * @param <R> The result type of the reduction operation
-     * @param <S> The type of the {@code AccumulatingSink}
-     * @return The result of the reduction operation
-     */
-    public static<P_IN, P_OUT, R, S extends AccumulatingSink<P_OUT, R, S>>
-    R parallelReduce(PipelineHelper<P_IN, P_OUT> helper, Supplier<S> factory) {
-        S sink = new ReduceTask<>(helper, factory).invoke();
-        return sink.get();
-    }
-
-    /**
-     * A type of {@code TerminalSink} that implements an associative reducing operation on elements of type
-     * {@code T} and producing a result of type {@code R}.
-     *
-     * @param <T> The type of input element to the combining operation
-     * @param <R> The result type
-     * @param <K> The type of the {@code AccumulatingSink}
-     */
-    public interface AccumulatingSink<T, R, K extends AccumulatingSink<T, R, K>> extends TerminalSink<T, R> {
-        public void combine(K other);
-    }
-
 
     /** A {@code ForkJoinTask} for performing a parallel for-each operation */
     private static class ForEachTask<S, T> extends CountedCompleter<Void> {
@@ -167,45 +135,4 @@
         }
     }
 
-    /** A {@code ForkJoinTask} for performing a parallel reduce operation */
-    private static class ReduceTask<P_IN, P_OUT, R, S extends AccumulatingSink<P_OUT, R, S>>
-            extends AbstractTask<P_IN, P_OUT, S, ReduceTask<P_IN, P_OUT, R, S>> {
-        private final Supplier<S> sinkFactory;
-
-        private ReduceTask(PipelineHelper<P_IN, P_OUT> helper, Supplier<S> sinkFactory) {
-            super(helper);
-            this.sinkFactory = sinkFactory;
-        }
-
-        private ReduceTask(ReduceTask<P_IN, P_OUT, R, S> parent, Spliterator<P_IN> spliterator) {
-            super(parent, spliterator);
-            this.sinkFactory = parent.sinkFactory;
-        }
-
-        @Override
-        protected ReduceTask<P_IN, P_OUT, R, S> makeChild(Spliterator<P_IN> spliterator) {
-            return new ReduceTask<>(this, spliterator);
-        }
-
-        @Override
-        protected S doLeaf() {
-            return helper.into(sinkFactory.get(), spliterator);
-        }
-
-        @Override
-        public void onCompletion(CountedCompleter caller) {
-            if (!isLeaf()) {
-                ReduceTask<P_IN, P_OUT, R, S> child = children;
-                S result = child.getLocalResult();
-                child = child.nextSibling;
-                for (; child != null; child = child.nextSibling) {
-                    S otherResult = child.getLocalResult();
-                    result.combine(otherResult);
-                    child.setLocalResult(null); // GC otherResult
-                }
-                setLocalResult(result);
-            }
-            super.onCompletion(caller);
-        }
-    }
 }
--- a/src/share/classes/java/util/stream/ReduceOp.java	Mon Feb 25 10:18:53 2013 +0100
+++ b/src/share/classes/java/util/stream/ReduceOp.java	Mon Feb 25 12:21:06 2013 +0100
@@ -28,6 +28,8 @@
 import java.util.OptionalDouble;
 import java.util.OptionalInt;
 import java.util.OptionalLong;
+import java.util.Spliterator;
+import java.util.concurrent.CountedCompleter;
 import java.util.function.BiConsumer;
 import java.util.function.BiFunction;
 import java.util.function.BinaryOperator;
@@ -41,24 +43,37 @@
 
 /**
  * A {@code TerminalOp} that evaluates a stream pipeline and sends the output into an {@code AccumulatingSink},
- * which performs a reduce operation
+ * which performs a reduce operation. The {@code AccumulatingSink} must represent an associative reducing operation.
  *
  * @param <T> The output type of the stream pipeline
  * @param <R> The result type of the reducing operation
  * @param <S> The type of the {@code AccumulatingSink}
  * @since 1.8
  */
-class ReduceOp<T, R, S extends OpUtils.AccumulatingSink<T, R, S>> implements TerminalOp<T, R> {
+// @@@ Can ReduceOp.AccumulatingSink be removed from the class type signature?
+final class ReduceOp<T, R, S extends ReduceOp.AccumulatingSink<T, R, S>> implements TerminalOp<T, R> {
     private final Supplier<S> sinkSupplier;
     private final StreamShape inputShape;
 
     /**
+     * A type of {@code TerminalSink} that implements an associative reducing operation on elements of type
+     * {@code T} and producing a result of type {@code R}.
+     *
+     * @param <T> The type of input element to the combining operation
+     * @param <R> The result type
+     * @param <K> The type of the {@code AccumulatingSink}.
+     */
+    static interface AccumulatingSink<T, R, K extends AccumulatingSink<T, R, K>> extends TerminalSink<T, R> {
+        public void combine(K other);
+    }
+
+    /**
      * Create a {@code ReduceOp} of the specified stream shape which uses the specified {@code Supplier} to create
      * accumulating sinks
      * @param shape The shape of the stream pipeline
      * @param supplier A factory for {@code AccumulatingSinks}
      */
-    public ReduceOp(StreamShape shape, Supplier<S> supplier) {
+    private ReduceOp(StreamShape shape, Supplier<S> supplier) {
         sinkSupplier = supplier;
         inputShape = shape;
     }
@@ -68,7 +83,7 @@
      * accumulating sinks
      * @param supplier A factory for {@code AccumulatingSinks}
      */
-    public ReduceOp(Supplier<S> supplier) {
+    private ReduceOp(Supplier<S> supplier) {
         this(StreamShape.REFERENCE, supplier);
     }
 
@@ -77,16 +92,16 @@
         return inputShape;
     }
 
-    public <S> R evaluateSequential(PipelineHelper<S, T> helper) {
+    @Override
+    public <P_IN> R evaluateSequential(PipelineHelper<P_IN, T> helper) {
         return helper.into(sinkSupplier.get(), helper.sourceSpliterator()).get();
     }
 
     @Override
-    public <S> R evaluateParallel(PipelineHelper<S, T> helper) {
-        return OpUtils.parallelReduce(helper, sinkSupplier);
+    public <P_IN> R evaluateParallel(PipelineHelper<P_IN, T> helper) {
+        return new ReduceTask<>(helper, sinkSupplier).invoke().get();
     }
 
-
     /**
      * State box for a single state element, used as a base class for {@code AccumulatingSink} instances
      * @param <U> The type of the state element
@@ -110,7 +125,7 @@
      */
     public static<T, U> TerminalOp<T, U>
     makeRef(U seed, BiFunction<U, ? super T, U> reducer, BinaryOperator<U> combiner) {
-        class ReducingSink extends Box<U> implements OpUtils.AccumulatingSink<T, U, ReducingSink>, Sink<T> {
+        class ReducingSink extends Box<U> implements AccumulatingSink<T, U, ReducingSink> {
             @Override
             public void begin(long size) {
                 state = seed;
@@ -142,7 +157,7 @@
      */
     public static<T> TerminalOp<T, Optional<T>>
     makeRef(BinaryOperator<T> operator) {
-        class ReducingSink implements OpUtils.AccumulatingSink<T, Optional<T>, ReducingSink> {
+        class ReducingSink implements AccumulatingSink<T, Optional<T>, ReducingSink> {
             private boolean empty;
             private T state;
 
@@ -190,7 +205,7 @@
         Supplier<R> supplier = collector.resultSupplier();
         BiConsumer<R, ? super T> accumulator = collector.accumulator();
         BinaryOperator<R> combiner = collector.combiner();
-        class ReducingSink extends Box<R> implements OpUtils.AccumulatingSink<T, R, ReducingSink> {
+        class ReducingSink extends Box<R> implements AccumulatingSink<T, R, ReducingSink> {
             @Override
             public void begin(long size) {
                 state = supplier.get();
@@ -223,7 +238,7 @@
      */
     public static<T, R> TerminalOp<T, R>
     makeRef(Supplier<R> seedFactory, BiConsumer<R, ? super T> accumulator, BiConsumer<R,R> reducer) {
-        class ReducingSink extends Box<R> implements OpUtils.AccumulatingSink<T, R, ReducingSink> {
+        class ReducingSink extends Box<R> implements AccumulatingSink<T, R, ReducingSink> {
             @Override
             public void begin(long size) {
                 state = seedFactory.get();
@@ -253,7 +268,7 @@
      */
     public static TerminalOp<Integer, Integer>
     makeInt(int identity, IntBinaryOperator operator) {
-        class ReducingSink implements OpUtils.AccumulatingSink<Integer, Integer, ReducingSink>, Sink.OfInt {
+        class ReducingSink implements AccumulatingSink<Integer, Integer, ReducingSink>, Sink.OfInt {
             private int state;
 
             @Override
@@ -292,7 +307,7 @@
      */
     public static TerminalOp<Integer, OptionalInt>
     makeInt(IntBinaryOperator operator) {
-        class ReducingSink implements OpUtils.AccumulatingSink<Integer, OptionalInt, ReducingSink>, Sink.OfInt {
+        class ReducingSink implements AccumulatingSink<Integer, OptionalInt, ReducingSink>, Sink.OfInt {
             private boolean empty;
             private int state;
 
@@ -343,7 +358,7 @@
         Supplier<R> supplier = collector.resultSupplier();
         ObjIntConsumer<R> accumulator = collector.intAccumulator();
         BinaryOperator<R> combiner = collector.combiner();
-        class ReducingSink extends Box<R> implements OpUtils.AccumulatingSink<Integer, R, ReducingSink>, Sink.OfInt {
+        class ReducingSink extends Box<R> implements AccumulatingSink<Integer, R, ReducingSink>, Sink.OfInt {
             @Override
             public void begin(long size) {
                 state = supplier.get();
@@ -376,7 +391,7 @@
      */
     public static TerminalOp<Long, Long>
     makeLong(long identity, LongBinaryOperator operator) {
-        class ReducingSink implements OpUtils.AccumulatingSink<Long, Long, ReducingSink>, Sink.OfLong {
+        class ReducingSink implements AccumulatingSink<Long, Long, ReducingSink>, Sink.OfLong {
             private long state;
 
             @Override
@@ -414,7 +429,7 @@
      */
     public static TerminalOp<Long, OptionalLong>
     makeLong(LongBinaryOperator operator) {
-        class ReducingSink implements OpUtils.AccumulatingSink<Long, OptionalLong, ReducingSink>, Sink.OfLong {
+        class ReducingSink implements AccumulatingSink<Long, OptionalLong, ReducingSink>, Sink.OfLong {
             private boolean empty;
             private long state;
 
@@ -465,7 +480,7 @@
         Supplier<R> supplier = collector.resultSupplier();
         ObjLongConsumer<R> accumulator = collector.longAccumulator();
         BinaryOperator<R> combiner = collector.combiner();
-        class ReducingSink extends Box<R> implements OpUtils.AccumulatingSink<Long, R, ReducingSink>, Sink.OfLong {
+        class ReducingSink extends Box<R> implements AccumulatingSink<Long, R, ReducingSink>, Sink.OfLong {
             @Override
             public void begin(long size) {
                 state = supplier.get();
@@ -498,7 +513,7 @@
      */
     public static TerminalOp<Double, Double>
     makeDouble(double identity, DoubleBinaryOperator operator) {
-        class ReducingSink implements OpUtils.AccumulatingSink<Double, Double, ReducingSink>, Sink.OfDouble {
+        class ReducingSink implements AccumulatingSink<Double, Double, ReducingSink>, Sink.OfDouble {
             private double state;
 
             @Override
@@ -537,7 +552,7 @@
      */
     public static TerminalOp<Double, OptionalDouble>
     makeDouble(DoubleBinaryOperator operator) {
-        class ReducingSink implements OpUtils.AccumulatingSink<Double, OptionalDouble, ReducingSink>, Sink.OfDouble {
+        class ReducingSink implements AccumulatingSink<Double, OptionalDouble, ReducingSink>, Sink.OfDouble {
             private boolean empty;
             private double state;
 
@@ -588,7 +603,7 @@
         ObjDoubleConsumer<R> accumulator = collector.doubleAccumulator();
         BinaryOperator<R> combiner = collector.combiner();
 
-        class ReducingSink extends Box<R> implements OpUtils.AccumulatingSink<Double, R, ReducingSink>, Sink.OfDouble {
+        class ReducingSink extends Box<R> implements AccumulatingSink<Double, R, ReducingSink>, Sink.OfDouble {
             @Override
             public void begin(long size) {
                 state = supplier.get();
@@ -612,4 +627,46 @@
             }
         });
     }
+
+    /** A {@code ForkJoinTask} for performing a parallel reduce operation */
+    private static class ReduceTask<P_IN, P_OUT, R, S extends AccumulatingSink<P_OUT, R, S>>
+            extends AbstractTask<P_IN, P_OUT, S, ReduceTask<P_IN, P_OUT, R, S>> {
+        private final Supplier<S> sinkFactory;
+
+        private ReduceTask(PipelineHelper<P_IN, P_OUT> helper, Supplier<S> sinkFactory) {
+            super(helper);
+            this.sinkFactory = sinkFactory;
+        }
+
+        private ReduceTask(ReduceTask<P_IN, P_OUT, R, S> parent, Spliterator<P_IN> spliterator) {
+            super(parent, spliterator);
+            this.sinkFactory = parent.sinkFactory;
+        }
+
+        @Override
+        protected ReduceTask<P_IN, P_OUT, R, S> makeChild(Spliterator<P_IN> spliterator) {
+            return new ReduceTask<>(this, spliterator);
+        }
+
+        @Override
+        protected S doLeaf() {
+            return helper.into(sinkFactory.get(), spliterator);
+        }
+
+        @Override
+        public void onCompletion(CountedCompleter caller) {
+            if (!isLeaf()) {
+                ReduceTask<P_IN, P_OUT, R, S> child = children;
+                S result = child.getLocalResult();
+                child = child.nextSibling;
+                for (; child != null; child = child.nextSibling) {
+                    S otherResult = child.getLocalResult();
+                    result.combine(otherResult);
+                    child.setLocalResult(null); // GC otherResult
+                }
+                setLocalResult(result);
+            }
+            super.onCompletion(caller);
+        }
+    }
 }