Mercurial > hg > openjdk > lambda > jdk
changeset 7530:12dac52b2230
Move forEach functionality from OpUtils to ForEachOp.
author | psandoz |
---|---|
date | Mon, 25 Feb 2013 12:21:08 +0100 |
parents | ccf54b3cd35b |
children | e343f811b2b4 |
files | src/share/classes/java/util/stream/DistinctOp.java src/share/classes/java/util/stream/DoublePipeline.java src/share/classes/java/util/stream/ForEachOp.java src/share/classes/java/util/stream/ForEachUntilOp.java src/share/classes/java/util/stream/IntPipeline.java src/share/classes/java/util/stream/LongPipeline.java src/share/classes/java/util/stream/OpUtils.java src/share/classes/java/util/stream/ReferencePipeline.java |
diffstat | 8 files changed, 80 insertions(+), 100 deletions(-) [+] |
line wrap: on
line diff
--- a/src/share/classes/java/util/stream/DistinctOp.java Mon Feb 25 12:21:06 2013 +0100 +++ b/src/share/classes/java/util/stream/DistinctOp.java Mon Feb 25 12:21:08 2013 +0100 @@ -123,22 +123,19 @@ return Nodes.node(reduceOp.evaluateParallel(helper)); } else { - final AtomicBoolean seenNull = new AtomicBoolean(false); - final ConcurrentHashMap<T, Boolean> map = new ConcurrentHashMap<>(); + // Holder of null state since ConcurrentHashMap does not support null values + AtomicBoolean seenNull = new AtomicBoolean(false); + ConcurrentHashMap<T, Boolean> map = new ConcurrentHashMap<>(); + TerminalOp<T, Void> forEachOp = ForEachOp.makeRef(t -> { + if (t == null) + seenNull.set(true); + else + map.putIfAbsent(t, Boolean.TRUE); + }); + forEachOp.evaluateParallel(helper); - // Cache the sink chain, so it can be reused by all F/J leaf tasks - Sink<S> sinkChain = helper.wrapSink(new Sink<T>() { - @Override - public void accept(T t) { - if (t == null) - seenNull.set(true); - else - map.putIfAbsent(t, Boolean.TRUE); - } - }); - - OpUtils.parallelForEach(helper, sinkChain); - + // If null has been seen then copy the key set into a HashSet that supports null values + // and add null Set<T> keys = map.keySet(); if (seenNull.get()) { keys = new HashSet<>(keys);
--- a/src/share/classes/java/util/stream/DoublePipeline.java Mon Feb 25 12:21:06 2013 +0100 +++ b/src/share/classes/java/util/stream/DoublePipeline.java Mon Feb 25 12:21:08 2013 +0100 @@ -241,7 +241,7 @@ @Override public void forEach(DoubleConsumer consumer) { - pipeline(ForEachOp.make(consumer)); + pipeline(ForEachOp.makeDouble(consumer)); } @Override
--- a/src/share/classes/java/util/stream/ForEachOp.java Mon Feb 25 12:21:06 2013 +0100 +++ b/src/share/classes/java/util/stream/ForEachOp.java Mon Feb 25 12:21:08 2013 +0100 @@ -24,6 +24,8 @@ */ package java.util.stream; +import java.util.Spliterator; +import java.util.concurrent.CountedCompleter; import java.util.function.Consumer; import java.util.function.DoubleConsumer; import java.util.function.IntConsumer; @@ -32,6 +34,8 @@ /** * A {@code TerminalOp} that evaluates a stream pipeline and sends the output into a {@code TerminalSink}. + * Elements will be passed to the {@code TerminalSink} in whatever thread and whatever order they become available, + * independent of the stream's encounter order. * @param <T> The output type of the stream pipeline * @since 1.8 */ @@ -71,7 +75,7 @@ * the provided {@code Consumer} * @param consumer The {@code Consumer} to send stream output to */ - public static <T> ForEachOp<T> make(final Consumer<? super T> consumer) { + public static <T> TerminalOp<T, Void> makeRef(final Consumer<? super T> consumer) { return new ForEachOp<>((VoidTerminalSink<T>) consumer::accept, StreamShape.REFERENCE); } @@ -80,7 +84,7 @@ * the provided {@code IntConsumer} * @param consumer The {@code IntConsumer} to send stream output to */ - public static ForEachOp<Integer> make(final IntConsumer consumer) { + public static TerminalOp<Integer, Void> makeInt(final IntConsumer consumer) { return new ForEachOp<>((VoidTerminalSink.OfInt) consumer::accept, StreamShape.INT_VALUE); } @@ -89,7 +93,7 @@ * the provided {@code LongConsumer} * @param consumer The {@code LongConsumer} to send stream output to */ - public static ForEachOp<Long> make(final LongConsumer consumer) { + public static TerminalOp<Long, Void> makeLong(final LongConsumer consumer) { return new ForEachOp<>((VoidTerminalSink.OfLong) consumer::accept, StreamShape.LONG_VALUE); } @@ -98,7 +102,7 @@ * the provided {@code DoubleConsumer} * @param consumer The {@code DoubleConsumer} to send stream output to */ - public static ForEachOp<Double> make(final DoubleConsumer consumer) { + public static TerminalOp<Double, Void> makeDouble(final DoubleConsumer consumer) { return new ForEachOp<>((VoidTerminalSink.OfDouble) consumer::accept, StreamShape.DOUBLE_VALUE); } @@ -119,7 +123,59 @@ @Override public <S> Void evaluateParallel(PipelineHelper<S, T> helper) { - OpUtils.parallelForEach(helper, helper.wrapSink(sink)); + new ForEachTask<>(helper, helper.wrapSink(sink)).invoke(); return null; } + + /** A {@code ForkJoinTask} for performing a parallel for-each operation */ + private static class ForEachTask<S, T> extends CountedCompleter<Void> { + private Spliterator<S> spliterator; + private final Sink<S> sink; + private final PipelineHelper<S, T> helper; + private final long targetSize; + + private ForEachTask(PipelineHelper<S, T> helper, Sink<S> sink) { + super(null); + this.spliterator = helper.sourceSpliterator(); + this.sink = sink; + this.targetSize = AbstractTask.suggestTargetSize(spliterator.estimateSize()); + this.helper = helper; + } + + private ForEachTask(ForEachTask<S, T> parent, Spliterator<S> spliterator) { + super(parent); + this.spliterator = spliterator; + this.sink = parent.sink; + this.targetSize = parent.targetSize; + this.helper = parent.helper; + } + + public void compute() { + doCompute(this); + } + + private static <S, T> void doCompute(ForEachTask<S, T> task) { + boolean isShortCircuit = StreamOpFlag.SHORT_CIRCUIT.isKnown(task.helper.getStreamAndOpFlags()); + while (true) { + if (isShortCircuit && task.sink.cancellationRequested()) { + task.propagateCompletion(); + task.spliterator = null; + return; + } + + Spliterator<S> split = null; + if (!AbstractTask.suggestSplit(task.helper, task.spliterator, task.targetSize) + || (split = task.spliterator.trySplit()) == null) { + task.helper.intoWrapped(task.sink, task.spliterator); + task.propagateCompletion(); + task.spliterator = null; + return; + } + else { + task.addToPendingCount(1); + new ForEachTask<>(task, split).fork(); + } + } + } + } }
--- a/src/share/classes/java/util/stream/ForEachUntilOp.java Mon Feb 25 12:21:06 2013 +0100 +++ b/src/share/classes/java/util/stream/ForEachUntilOp.java Mon Feb 25 12:21:08 2013 +0100 @@ -49,8 +49,9 @@ * @param <T> The output type of the stream pipeline * @since 1.8 */ -class ForEachUntilOp<T> extends ForEachOp<T> implements TerminalOp<T, Void> { - public ForEachUntilOp(TerminalSink<T, Void> sink, StreamShape shape) { +final class ForEachUntilOp<T> extends ForEachOp<T> implements TerminalOp<T, Void> { + + private ForEachUntilOp(TerminalSink<T, Void> sink, StreamShape shape) { super(sink, shape); }
--- a/src/share/classes/java/util/stream/IntPipeline.java Mon Feb 25 12:21:06 2013 +0100 +++ b/src/share/classes/java/util/stream/IntPipeline.java Mon Feb 25 12:21:08 2013 +0100 @@ -262,7 +262,7 @@ @Override public void forEach(IntConsumer consumer) { - pipeline(ForEachOp.make(consumer)); + pipeline(ForEachOp.makeInt(consumer)); } @Override
--- a/src/share/classes/java/util/stream/LongPipeline.java Mon Feb 25 12:21:06 2013 +0100 +++ b/src/share/classes/java/util/stream/LongPipeline.java Mon Feb 25 12:21:08 2013 +0100 @@ -252,7 +252,7 @@ @Override public void forEach(LongConsumer consumer) { - pipeline(ForEachOp.make(consumer)); + pipeline(ForEachOp.makeLong(consumer)); } @Override
--- a/src/share/classes/java/util/stream/OpUtils.java Mon Feb 25 12:21:06 2013 +0100 +++ b/src/share/classes/java/util/stream/OpUtils.java Mon Feb 25 12:21:08 2013 +0100 @@ -24,10 +24,6 @@ */ package java.util.stream; -import java.util.Spliterator; -import java.util.concurrent.CountedCompleter; -import java.util.function.Supplier; - /** * Utility methods useful in the implementation of stream operations ({@link IntermediateOp}, {@link StatefulOp}, * {@link TerminalOp}). @@ -40,22 +36,6 @@ /** * Given a {@link PipelineHelper} describing a stream source and a sequence of intermediate stream operations, - * compute the contents of the stream, in parallel, and pass the elements to the provided {@code Sink}. Elements - * will be passed to the {@code Sink} in whatever thread and whatever order they become available, independent - * of the stream's encounter order. - * - * @param helper A {@code PipelineHelper} describing the stream source and operations - * @param sink A {@code Sink} into which to deposit resulting elements - * @param <P_IN> The input type of the stream pipeline - * @param <P_OUT> The output type of the stream pipeline - */ - public static<P_IN, P_OUT> void parallelForEach(PipelineHelper<P_IN, P_OUT> helper, - Sink<P_IN> sink) { - new ForEachTask<>(helper, sink).invoke(); - } - - /** - * Given a {@link PipelineHelper} describing a stream source and a sequence of intermediate stream operations, * compute the contents of the stream, sequentially, and collect the results into a {@code Node}. The order * of output elements will respect the encounter order of the source stream, and all computation will happen * in the invoking thread. @@ -81,58 +61,4 @@ helper.intoWrappedWithCancel(helper.wrapSink(opSink), helper.sourceSpliterator()); return nb.build(); } - - - /** A {@code ForkJoinTask} for performing a parallel for-each operation */ - private static class ForEachTask<S, T> extends CountedCompleter<Void> { - private Spliterator<S> spliterator; - private final Sink<S> sink; - private final PipelineHelper<S, T> helper; - private final long targetSize; - - private ForEachTask(PipelineHelper<S, T> helper, Sink<S> sink) { - super(null); - this.spliterator = helper.sourceSpliterator(); - this.sink = sink; - this.targetSize = AbstractTask.suggestTargetSize(spliterator.estimateSize()); - this.helper = helper; - } - - private ForEachTask(ForEachTask<S, T> parent, Spliterator<S> spliterator) { - super(parent); - this.spliterator = spliterator; - this.sink = parent.sink; - this.targetSize = parent.targetSize; - this.helper = parent.helper; - } - - public void compute() { - doCompute(this); - } - - private static <S, T> void doCompute(ForEachTask<S, T> task) { - boolean isShortCircuit = StreamOpFlag.SHORT_CIRCUIT.isKnown(task.helper.getStreamAndOpFlags()); - while (true) { - if (isShortCircuit && task.sink.cancellationRequested()) { - task.propagateCompletion(); - task.spliterator = null; - return; - } - - Spliterator<S> split = null; - if (!AbstractTask.suggestSplit(task.helper, task.spliterator, task.targetSize) - || (split = task.spliterator.trySplit()) == null) { - task.helper.intoWrapped(task.sink, task.spliterator); - task.propagateCompletion(); - task.spliterator = null; - return; - } - else { - task.addToPendingCount(1); - new ForEachTask<>(task, split).fork(); - } - } - } - } - }
--- a/src/share/classes/java/util/stream/ReferencePipeline.java Mon Feb 25 12:21:06 2013 +0100 +++ b/src/share/classes/java/util/stream/ReferencePipeline.java Mon Feb 25 12:21:08 2013 +0100 @@ -291,7 +291,7 @@ @Override public void forEach(Consumer<? super U> consumer) { - pipeline(ForEachOp.make(consumer)); + pipeline(ForEachOp.makeRef(consumer)); } @Override