changeset 7530:12dac52b2230

Move forEach functionality from OpUtils to ForEachOp.
author psandoz
date Mon, 25 Feb 2013 12:21:08 +0100
parents ccf54b3cd35b
children e343f811b2b4
files src/share/classes/java/util/stream/DistinctOp.java src/share/classes/java/util/stream/DoublePipeline.java src/share/classes/java/util/stream/ForEachOp.java src/share/classes/java/util/stream/ForEachUntilOp.java src/share/classes/java/util/stream/IntPipeline.java src/share/classes/java/util/stream/LongPipeline.java src/share/classes/java/util/stream/OpUtils.java src/share/classes/java/util/stream/ReferencePipeline.java
diffstat 8 files changed, 80 insertions(+), 100 deletions(-) [+]
line wrap: on
line diff
--- a/src/share/classes/java/util/stream/DistinctOp.java	Mon Feb 25 12:21:06 2013 +0100
+++ b/src/share/classes/java/util/stream/DistinctOp.java	Mon Feb 25 12:21:08 2013 +0100
@@ -123,22 +123,19 @@
             return Nodes.node(reduceOp.evaluateParallel(helper));
         }
         else {
-            final AtomicBoolean seenNull = new AtomicBoolean(false);
-            final ConcurrentHashMap<T, Boolean> map = new ConcurrentHashMap<>();
+            // Holder of null state since ConcurrentHashMap does not support null values
+            AtomicBoolean seenNull = new AtomicBoolean(false);
+            ConcurrentHashMap<T, Boolean> map = new ConcurrentHashMap<>();
+            TerminalOp<T, Void> forEachOp = ForEachOp.makeRef(t -> {
+                if (t == null)
+                    seenNull.set(true);
+                else
+                    map.putIfAbsent(t, Boolean.TRUE);
+            });
+            forEachOp.evaluateParallel(helper);
 
-            // Cache the sink chain, so it can be reused by all F/J leaf tasks
-            Sink<S> sinkChain = helper.wrapSink(new Sink<T>() {
-                @Override
-                public void accept(T t) {
-                    if (t == null)
-                        seenNull.set(true);
-                    else
-                        map.putIfAbsent(t, Boolean.TRUE);
-                }
-            });
-
-            OpUtils.parallelForEach(helper, sinkChain);
-
+            // If null has been seen then copy the key set into a HashSet that supports null values
+            // and add null
             Set<T> keys = map.keySet();
             if (seenNull.get()) {
                 keys = new HashSet<>(keys);
--- a/src/share/classes/java/util/stream/DoublePipeline.java	Mon Feb 25 12:21:06 2013 +0100
+++ b/src/share/classes/java/util/stream/DoublePipeline.java	Mon Feb 25 12:21:08 2013 +0100
@@ -241,7 +241,7 @@
 
     @Override
     public void forEach(DoubleConsumer consumer) {
-        pipeline(ForEachOp.make(consumer));
+        pipeline(ForEachOp.makeDouble(consumer));
     }
 
     @Override
--- a/src/share/classes/java/util/stream/ForEachOp.java	Mon Feb 25 12:21:06 2013 +0100
+++ b/src/share/classes/java/util/stream/ForEachOp.java	Mon Feb 25 12:21:08 2013 +0100
@@ -24,6 +24,8 @@
  */
 package java.util.stream;
 
+import java.util.Spliterator;
+import java.util.concurrent.CountedCompleter;
 import java.util.function.Consumer;
 import java.util.function.DoubleConsumer;
 import java.util.function.IntConsumer;
@@ -32,6 +34,8 @@
 
 /**
  * A {@code TerminalOp} that evaluates a stream pipeline and sends the output into a {@code TerminalSink}.
+ * Elements will be passed to the {@code TerminalSink} in whatever thread and whatever order they become available,
+ * independent of the stream's encounter order.
  * @param <T> The output type of the stream pipeline
  * @since 1.8
  */
@@ -71,7 +75,7 @@
      * the provided {@code Consumer}
      * @param consumer The {@code Consumer} to send stream output to
      */
-    public static <T> ForEachOp<T> make(final Consumer<? super T> consumer) {
+    public static <T> TerminalOp<T, Void> makeRef(final Consumer<? super T> consumer) {
         return new ForEachOp<>((VoidTerminalSink<T>) consumer::accept, StreamShape.REFERENCE);
     }
 
@@ -80,7 +84,7 @@
      * the provided {@code IntConsumer}
      * @param consumer The {@code IntConsumer} to send stream output to
      */
-    public static ForEachOp<Integer> make(final IntConsumer consumer) {
+    public static TerminalOp<Integer, Void> makeInt(final IntConsumer consumer) {
         return new ForEachOp<>((VoidTerminalSink.OfInt) consumer::accept, StreamShape.INT_VALUE);
     }
 
@@ -89,7 +93,7 @@
      * the provided {@code LongConsumer}
      * @param consumer The {@code LongConsumer} to send stream output to
      */
-    public static ForEachOp<Long> make(final LongConsumer consumer) {
+    public static TerminalOp<Long, Void> makeLong(final LongConsumer consumer) {
         return new ForEachOp<>((VoidTerminalSink.OfLong) consumer::accept, StreamShape.LONG_VALUE);
     }
 
@@ -98,7 +102,7 @@
      * the provided {@code DoubleConsumer}
      * @param consumer The {@code DoubleConsumer} to send stream output to
      */
-    public static ForEachOp<Double> make(final DoubleConsumer consumer) {
+    public static TerminalOp<Double, Void> makeDouble(final DoubleConsumer consumer) {
         return new ForEachOp<>((VoidTerminalSink.OfDouble) consumer::accept, StreamShape.DOUBLE_VALUE);
     }
 
@@ -119,7 +123,59 @@
 
     @Override
     public <S> Void evaluateParallel(PipelineHelper<S, T> helper) {
-        OpUtils.parallelForEach(helper, helper.wrapSink(sink));
+        new ForEachTask<>(helper, helper.wrapSink(sink)).invoke();
         return null;
     }
+
+    /** A {@code ForkJoinTask} for performing a parallel for-each operation */
+    private static class ForEachTask<S, T> extends CountedCompleter<Void> {
+        private Spliterator<S> spliterator;
+        private final Sink<S> sink;
+        private final PipelineHelper<S, T> helper;
+        private final long targetSize;
+
+        private ForEachTask(PipelineHelper<S, T> helper, Sink<S> sink) {
+            super(null);
+            this.spliterator = helper.sourceSpliterator();
+            this.sink = sink;
+            this.targetSize = AbstractTask.suggestTargetSize(spliterator.estimateSize());
+            this.helper = helper;
+        }
+
+        private ForEachTask(ForEachTask<S, T> parent, Spliterator<S> spliterator) {
+            super(parent);
+            this.spliterator = spliterator;
+            this.sink = parent.sink;
+            this.targetSize = parent.targetSize;
+            this.helper = parent.helper;
+        }
+
+        public void compute() {
+            doCompute(this);
+        }
+
+        private static <S, T> void doCompute(ForEachTask<S, T> task) {
+            boolean isShortCircuit = StreamOpFlag.SHORT_CIRCUIT.isKnown(task.helper.getStreamAndOpFlags());
+            while (true) {
+                if (isShortCircuit && task.sink.cancellationRequested()) {
+                    task.propagateCompletion();
+                    task.spliterator = null;
+                    return;
+                }
+
+                Spliterator<S> split = null;
+                if (!AbstractTask.suggestSplit(task.helper, task.spliterator, task.targetSize)
+                    || (split = task.spliterator.trySplit()) == null) {
+                    task.helper.intoWrapped(task.sink, task.spliterator);
+                    task.propagateCompletion();
+                    task.spliterator = null;
+                    return;
+                }
+                else {
+                    task.addToPendingCount(1);
+                    new ForEachTask<>(task, split).fork();
+                }
+            }
+        }
+    }
 }
--- a/src/share/classes/java/util/stream/ForEachUntilOp.java	Mon Feb 25 12:21:06 2013 +0100
+++ b/src/share/classes/java/util/stream/ForEachUntilOp.java	Mon Feb 25 12:21:08 2013 +0100
@@ -49,8 +49,9 @@
  * @param <T> The output type of the stream pipeline
  * @since 1.8
  */
-class ForEachUntilOp<T> extends ForEachOp<T> implements TerminalOp<T, Void> {
-    public ForEachUntilOp(TerminalSink<T, Void> sink, StreamShape shape) {
+final class ForEachUntilOp<T> extends ForEachOp<T> implements TerminalOp<T, Void> {
+
+    private ForEachUntilOp(TerminalSink<T, Void> sink, StreamShape shape) {
         super(sink, shape);
     }
 
--- a/src/share/classes/java/util/stream/IntPipeline.java	Mon Feb 25 12:21:06 2013 +0100
+++ b/src/share/classes/java/util/stream/IntPipeline.java	Mon Feb 25 12:21:08 2013 +0100
@@ -262,7 +262,7 @@
 
     @Override
     public void forEach(IntConsumer consumer) {
-        pipeline(ForEachOp.make(consumer));
+        pipeline(ForEachOp.makeInt(consumer));
     }
 
     @Override
--- a/src/share/classes/java/util/stream/LongPipeline.java	Mon Feb 25 12:21:06 2013 +0100
+++ b/src/share/classes/java/util/stream/LongPipeline.java	Mon Feb 25 12:21:08 2013 +0100
@@ -252,7 +252,7 @@
 
     @Override
     public void forEach(LongConsumer consumer) {
-        pipeline(ForEachOp.make(consumer));
+        pipeline(ForEachOp.makeLong(consumer));
     }
 
     @Override
--- a/src/share/classes/java/util/stream/OpUtils.java	Mon Feb 25 12:21:06 2013 +0100
+++ b/src/share/classes/java/util/stream/OpUtils.java	Mon Feb 25 12:21:08 2013 +0100
@@ -24,10 +24,6 @@
  */
 package java.util.stream;
 
-import java.util.Spliterator;
-import java.util.concurrent.CountedCompleter;
-import java.util.function.Supplier;
-
 /**
  * Utility methods useful in the implementation of stream operations ({@link IntermediateOp}, {@link StatefulOp},
  * {@link TerminalOp}).
@@ -40,22 +36,6 @@
 
     /**
      * Given a {@link PipelineHelper} describing a stream source and a sequence of intermediate stream operations,
-     * compute the contents of the stream, in parallel, and pass the elements to the provided {@code Sink}.  Elements
-     * will be passed to the {@code Sink} in whatever thread and whatever order they become available, independent
-     * of the stream's encounter order.
-     *
-     * @param helper A {@code PipelineHelper} describing the stream source and operations
-     * @param sink A {@code Sink} into which to deposit resulting elements
-     * @param <P_IN> The input type of the stream pipeline
-     * @param <P_OUT> The output type of the stream pipeline
-     */
-    public static<P_IN, P_OUT> void parallelForEach(PipelineHelper<P_IN, P_OUT> helper,
-                                                    Sink<P_IN> sink) {
-        new ForEachTask<>(helper, sink).invoke();
-    }
-
-    /**
-     * Given a {@link PipelineHelper} describing a stream source and a sequence of intermediate stream operations,
      * compute the contents of the stream, sequentially, and collect the results into a {@code Node}.  The order
      * of output elements will respect the encounter order of the source stream, and all computation will happen
      * in the invoking thread.
@@ -81,58 +61,4 @@
             helper.intoWrappedWithCancel(helper.wrapSink(opSink), helper.sourceSpliterator());
         return nb.build();
     }
-
-
-    /** A {@code ForkJoinTask} for performing a parallel for-each operation */
-    private static class ForEachTask<S, T> extends CountedCompleter<Void> {
-        private Spliterator<S> spliterator;
-        private final Sink<S> sink;
-        private final PipelineHelper<S, T> helper;
-        private final long targetSize;
-
-        private ForEachTask(PipelineHelper<S, T> helper, Sink<S> sink) {
-            super(null);
-            this.spliterator = helper.sourceSpliterator();
-            this.sink = sink;
-            this.targetSize = AbstractTask.suggestTargetSize(spliterator.estimateSize());
-            this.helper = helper;
-        }
-
-        private ForEachTask(ForEachTask<S, T> parent, Spliterator<S> spliterator) {
-            super(parent);
-            this.spliterator = spliterator;
-            this.sink = parent.sink;
-            this.targetSize = parent.targetSize;
-            this.helper = parent.helper;
-        }
-
-        public void compute() {
-            doCompute(this);
-        }
-
-        private static <S, T> void doCompute(ForEachTask<S, T> task) {
-            boolean isShortCircuit = StreamOpFlag.SHORT_CIRCUIT.isKnown(task.helper.getStreamAndOpFlags());
-            while (true) {
-                if (isShortCircuit && task.sink.cancellationRequested()) {
-                    task.propagateCompletion();
-                    task.spliterator = null;
-                    return;
-                }
-
-                Spliterator<S> split = null;
-                if (!AbstractTask.suggestSplit(task.helper, task.spliterator, task.targetSize)
-                    || (split = task.spliterator.trySplit()) == null) {
-                    task.helper.intoWrapped(task.sink, task.spliterator);
-                    task.propagateCompletion();
-                    task.spliterator = null;
-                    return;
-                }
-                else {
-                    task.addToPendingCount(1);
-                    new ForEachTask<>(task, split).fork();
-                }
-            }
-        }
-    }
-
 }
--- a/src/share/classes/java/util/stream/ReferencePipeline.java	Mon Feb 25 12:21:06 2013 +0100
+++ b/src/share/classes/java/util/stream/ReferencePipeline.java	Mon Feb 25 12:21:08 2013 +0100
@@ -291,7 +291,7 @@
 
     @Override
     public void forEach(Consumer<? super U> consumer) {
-        pipeline(ForEachOp.make(consumer));
+        pipeline(ForEachOp.makeRef(consumer));
     }
 
     @Override