Mercurial > hg > openjdk > lambda > jdk
changeset 10544:18c111c17231
8027316: Distinct operation on an unordered stream should not be a barrier
Reviewed-by: henryjen, mduigou, briangoetz
author | psandoz |
---|---|
date | Thu, 31 Oct 2013 11:59:04 +0100 |
parents | 1ea1b24c1a04 |
children | bb4b1e1e390d |
files | src/share/classes/java/util/stream/DistinctOps.java src/share/classes/java/util/stream/StreamSpliterators.java test/java/util/stream/test/org/openjdk/tests/java/util/stream/DistinctOpTest.java |
diffstat | 3 files changed, 123 insertions(+), 6 deletions(-) [+] |
line wrap: on
line diff
--- a/src/share/classes/java/util/stream/DistinctOps.java Wed Oct 30 18:39:09 2013 -0700 +++ b/src/share/classes/java/util/stream/DistinctOps.java Thu Oct 31 11:59:04 2013 +0100 @@ -54,6 +54,16 @@ static <T> ReferencePipeline<T, T> makeRef(AbstractPipeline<?, T, ?> upstream) { return new ReferencePipeline.StatefulOp<T, T>(upstream, StreamShape.REFERENCE, StreamOpFlag.IS_DISTINCT | StreamOpFlag.NOT_SIZED) { + + <P_IN> Node<T> reduce(PipelineHelper<T> helper, Spliterator<P_IN> spliterator) { + // If the stream is SORTED then it should also be ORDERED so the following will also + // preserve the sort order + TerminalOp<T, LinkedHashSet<T>> reduceOp + = ReduceOps.<T, LinkedHashSet<T>>makeRef(LinkedHashSet::new, LinkedHashSet::add, + LinkedHashSet::addAll); + return Nodes.node(reduceOp.evaluateParallel(helper, spliterator)); + } + @Override <P_IN> Node<T> opEvaluateParallel(PipelineHelper<T> helper, Spliterator<P_IN> spliterator, @@ -63,12 +73,7 @@ return helper.evaluate(spliterator, false, generator); } else if (StreamOpFlag.ORDERED.isKnown(helper.getStreamAndOpFlags())) { - // If the stream is SORTED then it should also be ORDERED so the following will also - // preserve the sort order - TerminalOp<T, LinkedHashSet<T>> reduceOp - = ReduceOps.<T, LinkedHashSet<T>>makeRef(LinkedHashSet::new, LinkedHashSet::add, - LinkedHashSet::addAll); - return Nodes.node(reduceOp.evaluateParallel(helper, spliterator)); + return reduce(helper, spliterator); } else { // Holder of null state since ConcurrentHashMap does not support null values @@ -95,6 +100,22 @@ } @Override + <P_IN> Spliterator<T> opEvaluateParallelLazy(PipelineHelper<T> helper, Spliterator<P_IN> spliterator) { + if (StreamOpFlag.DISTINCT.isKnown(helper.getStreamAndOpFlags())) { + // No-op + return helper.wrapSpliterator(spliterator); + } + else if (StreamOpFlag.ORDERED.isKnown(helper.getStreamAndOpFlags())) { + // Not lazy, barrier required to preserve order + return reduce(helper, spliterator).spliterator(); + } + else { + // Lazy + return new StreamSpliterators.DistinctSpliterator<>(helper.wrapSpliterator(spliterator)); + } + } + + @Override Sink<T> opWrapSink(int flags, Sink<T> sink) { Objects.requireNonNull(sink);
--- a/src/share/classes/java/util/stream/StreamSpliterators.java Wed Oct 30 18:39:09 2013 -0700 +++ b/src/share/classes/java/util/stream/StreamSpliterators.java Thu Oct 31 11:59:04 2013 +0100 @@ -27,6 +27,7 @@ import java.util.Comparator; import java.util.Objects; import java.util.Spliterator; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicLong; import java.util.function.BooleanSupplier; import java.util.function.Consumer; @@ -1227,6 +1228,88 @@ } /** + * A wrapping spliterator that only reports distinct elements of the + * underlying spliterator. Does not preserve size and encounter order. + */ + static final class DistinctSpliterator<T> implements Spliterator<T>, Consumer<T> { + + // The value to represent null in the ConcurrentHashMap + private static final Object NULL_VALUE = new Object(); + + // The underlying spliterator + private final Spliterator<T> s; + + // ConcurrentHashMap holding distinct elements as keys + private final ConcurrentHashMap<T, Boolean> seen; + + // Temporary element, only used with tryAdvance + private T tmpSlot; + + DistinctSpliterator(Spliterator<T> s) { + this(s, new ConcurrentHashMap<>()); + } + + private DistinctSpliterator(Spliterator<T> s, ConcurrentHashMap<T, Boolean> seen) { + this.s = s; + this.seen = seen; + } + + @Override + public void accept(T t) { + this.tmpSlot = t; + } + + @SuppressWarnings("unchecked") + private T mapNull(T t) { + return t != null ? t : (T) NULL_VALUE; + } + + @Override + public boolean tryAdvance(Consumer<? super T> action) { + while (s.tryAdvance(this)) { + if (seen.putIfAbsent(mapNull(tmpSlot), Boolean.TRUE) == null) { + action.accept(tmpSlot); + tmpSlot = null; + return true; + } + } + return false; + } + + @Override + public void forEachRemaining(Consumer<? super T> action) { + s.forEachRemaining(t -> { + if (seen.putIfAbsent(mapNull(t), Boolean.TRUE) == null) { + action.accept(t); + } + }); + } + + @Override + public Spliterator<T> trySplit() { + Spliterator<T> split = s.trySplit(); + return (split != null) ? new DistinctSpliterator<>(split, seen) : null; + } + + @Override + public long estimateSize() { + return s.estimateSize(); + } + + @Override + public int characteristics() { + return (s.characteristics() & ~(Spliterator.SIZED | Spliterator.SUBSIZED | + Spliterator.SORTED | Spliterator.ORDERED)) + | Spliterator.DISTINCT; + } + + @Override + public Comparator<? super T> getComparator() { + return s.getComparator(); + } + } + + /** * A Spliterator that infinitely supplies elements in no particular order. * * <p>Splitting divides the estimated size in two and stops when the
--- a/test/java/util/stream/test/org/openjdk/tests/java/util/stream/DistinctOpTest.java Wed Oct 30 18:39:09 2013 -0700 +++ b/test/java/util/stream/test/org/openjdk/tests/java/util/stream/DistinctOpTest.java Thu Oct 31 11:59:04 2013 +0100 @@ -28,8 +28,10 @@ import java.util.Collection; import java.util.Comparator; import java.util.List; +import java.util.Optional; import java.util.Spliterator; import java.util.Spliterators; +import java.util.concurrent.ThreadLocalRandom; import java.util.stream.*; import static java.util.stream.LambdaTestHelpers.*; @@ -48,6 +50,17 @@ assertCountSum(countTo(10).stream().distinct(), 10, 55); } + public void testWithUnorderedInfiniteStream() { + // These tests should short-circuit, otherwise will fail with a time-out + // or an OOME + + Integer one = Stream.iterate(1, i -> i + 1).unordered().parallel().distinct().findAny().get(); + assertEquals(one.intValue(), 1); + + Optional<Integer> oi = ThreadLocalRandom.current().ints().boxed().parallel().distinct().findAny(); + assertTrue(oi.isPresent()); + } + @Test(dataProvider = "StreamTestData<Integer>", dataProviderClass = StreamTestDataProvider.class) public void testOp(String name, TestData.OfRef<Integer> data) { Collection<Integer> result = exerciseOpsInt(data, Stream::distinct, IntStream::distinct, LongStream::distinct, DoubleStream::distinct);