changeset 10921:a49d60c55b74

8129120: Terminal operation properties should not be back-propagated to upstream operations Reviewed-by: briangoetz, chegar
author psandoz
date Tue, 23 Jun 2015 09:49:55 +0200
parents 009d3bbe66bd
children bf5f41bd4710
files src/share/classes/java/util/stream/AbstractPipeline.java test/java/util/stream/bootlib/java/util/stream/DoubleStreamTestScenario.java test/java/util/stream/bootlib/java/util/stream/IntStreamTestScenario.java test/java/util/stream/bootlib/java/util/stream/LongStreamTestScenario.java test/java/util/stream/bootlib/java/util/stream/OpTestCase.java test/java/util/stream/bootlib/java/util/stream/StreamTestScenario.java test/java/util/stream/boottest/java/util/stream/FlagOpTest.java test/java/util/stream/boottest/java/util/stream/UnorderedTest.java test/java/util/stream/test/org/openjdk/tests/java/util/SplittableRandomTest.java test/java/util/stream/test/org/openjdk/tests/java/util/stream/DistinctOpTest.java test/java/util/stream/test/org/openjdk/tests/java/util/stream/InfiniteStreamWithLimitOpTest.java
diffstat 11 files changed, 355 insertions(+), 381 deletions(-) [+]
line wrap: on
line diff
--- a/src/share/classes/java/util/stream/AbstractPipeline.java	Thu Jun 25 16:49:08 2015 -0700
+++ b/src/share/classes/java/util/stream/AbstractPipeline.java	Tue Jun 23 09:49:55 2015 +0200
@@ -249,6 +249,11 @@
         // If the last intermediate operation is stateful then
         // evaluate directly to avoid an extra collection step
         if (isParallel() && previousStage != null && opIsStateful()) {
+            // Set the depth of this, last, pipeline stage to zero to slice the
+            // pipeline such that this operation will not be included in the
+            // upstream slice and upstream operations will not be included
+            // in this slice
+            depth = 0;
             return opEvaluateParallel(previousStage, previousStage.sourceSpliterator(0), generator);
         }
         else {
@@ -379,60 +384,6 @@
     }
 
     /**
-     * Prepare the pipeline for a parallel execution.  As the pipeline is built,
-     * the flags and depth indicators are set up for a sequential execution.
-     * If the execution is parallel, and there are any stateful operations, then
-     * some of these need to be adjusted, as well as adjusting for flags from
-     * the terminal operation (such as back-propagating UNORDERED).
-     * Need not be called for a sequential execution.
-     *
-     * @param terminalFlags Operation flags for the terminal operation
-     */
-    private void parallelPrepare(int terminalFlags) {
-        @SuppressWarnings("rawtypes")
-        AbstractPipeline backPropagationHead = sourceStage;
-        if (sourceStage.sourceAnyStateful) {
-            int depth = 1;
-            for (  @SuppressWarnings("rawtypes") AbstractPipeline u = sourceStage, p = sourceStage.nextStage;
-                 p != null;
-                 u = p, p = p.nextStage) {
-                int thisOpFlags = p.sourceOrOpFlags;
-                if (p.opIsStateful()) {
-                    // If the stateful operation is a short-circuit operation
-                    // then move the back propagation head forwards
-                    // NOTE: there are no size-injecting ops
-                    if (StreamOpFlag.SHORT_CIRCUIT.isKnown(thisOpFlags)) {
-                        backPropagationHead = p;
-                        // Clear the short circuit flag for next pipeline stage
-                        // This stage encapsulates short-circuiting, the next
-                        // stage may not have any short-circuit operations, and
-                        // if so spliterator.forEachRemaining should be be used
-                        // for traversal
-                        thisOpFlags = thisOpFlags & ~StreamOpFlag.IS_SHORT_CIRCUIT;
-                    }
-
-                    depth = 0;
-                    // The following injects size, it is equivalent to:
-                    // StreamOpFlag.combineOpFlags(StreamOpFlag.IS_SIZED, p.combinedFlags);
-                    thisOpFlags = (thisOpFlags & ~StreamOpFlag.NOT_SIZED) | StreamOpFlag.IS_SIZED;
-                }
-                p.depth = depth++;
-                p.combinedFlags = StreamOpFlag.combineOpFlags(thisOpFlags, u.combinedFlags);
-            }
-        }
-
-        // Apply the upstream terminal flags
-        if (terminalFlags != 0) {
-            int upstreamTerminalFlags = terminalFlags & StreamOpFlag.UPSTREAM_TERMINAL_OP_MASK;
-            for ( @SuppressWarnings("rawtypes") AbstractPipeline p = backPropagationHead; p.nextStage != null; p = p.nextStage) {
-                p.combinedFlags = StreamOpFlag.combineOpFlags(upstreamTerminalFlags, p.combinedFlags);
-            }
-
-            combinedFlags = StreamOpFlag.combineOpFlags(terminalFlags, combinedFlags);
-        }
-    }
-
-    /**
      * Get the source spliterator for this pipeline stage.  For a sequential or
      * stateless parallel pipeline, this is the source spliterator.  For a
      * stateful parallel pipeline, this is a spliterator describing the results
@@ -455,31 +406,49 @@
             throw new IllegalStateException(MSG_CONSUMED);
         }
 
-        if (isParallel()) {
-            // @@@ Merge parallelPrepare with the loop below and use the
-            //     spliterator characteristics to determine if SIZED
-            //     should be injected
-            parallelPrepare(terminalFlags);
-
+        if (isParallel() && sourceStage.sourceAnyStateful) {
             // Adapt the source spliterator, evaluating each stateful op
-            // in the pipeline up to and including this pipeline stage
-            for ( @SuppressWarnings("rawtypes") AbstractPipeline u = sourceStage, p = sourceStage.nextStage, e = this;
+            // in the pipeline up to and including this pipeline stage.
+            // The depth and flags of each pipeline stage are adjusted accordingly.
+            int depth = 1;
+            for (@SuppressWarnings("rawtypes") AbstractPipeline u = sourceStage, p = sourceStage.nextStage, e = this;
                  u != e;
                  u = p, p = p.nextStage) {
 
+                int thisOpFlags = p.sourceOrOpFlags;
                 if (p.opIsStateful()) {
+                    depth = 0;
+
+                    if (StreamOpFlag.SHORT_CIRCUIT.isKnown(thisOpFlags)) {
+                        // Clear the short circuit flag for next pipeline stage
+                        // This stage encapsulates short-circuiting, the next
+                        // stage may not have any short-circuit operations, and
+                        // if so spliterator.forEachRemaining should be used
+                        // for traversal
+                        thisOpFlags = thisOpFlags & ~StreamOpFlag.IS_SHORT_CIRCUIT;
+                    }
+
                     spliterator = p.opEvaluateParallelLazy(u, spliterator);
+
+                    // Inject or clear SIZED on the source pipeline stage
+                    // based on the stage's spliterator
+                    thisOpFlags = spliterator.hasCharacteristics(Spliterator.SIZED)
+                            ? (thisOpFlags & ~StreamOpFlag.NOT_SIZED) | StreamOpFlag.IS_SIZED
+                            : (thisOpFlags & ~StreamOpFlag.IS_SIZED) | StreamOpFlag.NOT_SIZED;
                 }
+                p.depth = depth++;
+                p.combinedFlags = StreamOpFlag.combineOpFlags(thisOpFlags, u.combinedFlags);
             }
         }
-        else if (terminalFlags != 0)  {
+
+        if (terminalFlags != 0)  {
+            // Apply flags from the terminal operation to last pipeline stage
             combinedFlags = StreamOpFlag.combineOpFlags(terminalFlags, combinedFlags);
         }
 
         return spliterator;
     }
 
-
     // PipelineHelper
 
     @Override
--- a/test/java/util/stream/bootlib/java/util/stream/DoubleStreamTestScenario.java	Thu Jun 25 16:49:08 2015 -0700
+++ b/test/java/util/stream/bootlib/java/util/stream/DoubleStreamTestScenario.java	Tue Jun 23 09:49:55 2015 +0200
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2013, Oracle and/or its affiliates. All rights reserved.
+ * Copyright (c) 2013, 2015, Oracle and/or its affiliates. All rights reserved.
  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
  *
  * This code is free software; you can redistribute it and/or modify it
@@ -22,7 +22,10 @@
  */
 package java.util.stream;
 
+import java.util.Collections;
+import java.util.EnumSet;
 import java.util.PrimitiveIterator;
+import java.util.Set;
 import java.util.Spliterator;
 import java.util.function.Consumer;
 import java.util.function.DoubleConsumer;
@@ -159,12 +162,50 @@
             for (double t : pipe2.toArray())
                 b.accept(t);
         }
-    },;
+    },
+
+    // Wrap as parallel stream + forEach synchronizing
+    PAR_STREAM_FOR_EACH(true, false) {
+        <T, S_IN extends BaseStream<T, S_IN>>
+        void _run(TestData<T, S_IN> data, DoubleConsumer b, Function<S_IN, DoubleStream> m) {
+            m.apply(data.parallelStream()).forEach(e -> {
+                synchronized (data) {
+                    b.accept(e);
+                }
+            });
+        }
+    },
+
+    // Wrap as parallel stream + forEach synchronizing and clear SIZED flag
+    PAR_STREAM_FOR_EACH_CLEAR_SIZED(true, false) {
+        <T, S_IN extends BaseStream<T, S_IN>>
+        void _run(TestData<T, S_IN> data, DoubleConsumer b, Function<S_IN, DoubleStream> m) {
+            S_IN pipe1 = (S_IN) OpTestCase.chain(data.parallelStream(),
+                                                 new FlagDeclaringOp(StreamOpFlag.NOT_SIZED, data.getShape()));
+            m.apply(pipe1).forEach(e -> {
+                synchronized (data) {
+                    b.accept(e);
+                }
+            });
+        }
+    },
+    ;
+
+    // The set of scenarios that clean the SIZED flag
+    public static final Set<DoubleStreamTestScenario> CLEAR_SIZED_SCENARIOS = Collections.unmodifiableSet(
+            EnumSet.of(PAR_STREAM_TO_ARRAY_CLEAR_SIZED, PAR_STREAM_FOR_EACH_CLEAR_SIZED));
 
     private boolean isParallel;
 
+    private final boolean isOrdered;
+
     DoubleStreamTestScenario(boolean isParallel) {
+        this(isParallel, true);
+    }
+
+    DoubleStreamTestScenario(boolean isParallel, boolean isOrdered) {
         this.isParallel = isParallel;
+        this.isOrdered = isOrdered;
     }
 
     public StreamShape getShape() {
@@ -175,6 +216,10 @@
         return isParallel;
     }
 
+    public boolean isOrdered() {
+        return isOrdered;
+    }
+
     public <T, U, S_IN extends BaseStream<T, S_IN>, S_OUT extends BaseStream<U, S_OUT>>
     void run(TestData<T, S_IN> data, Consumer<U> b, Function<S_IN, S_OUT> m) {
         _run(data, (DoubleConsumer) b, (Function<S_IN, DoubleStream>) m);
--- a/test/java/util/stream/bootlib/java/util/stream/IntStreamTestScenario.java	Thu Jun 25 16:49:08 2015 -0700
+++ b/test/java/util/stream/bootlib/java/util/stream/IntStreamTestScenario.java	Tue Jun 23 09:49:55 2015 +0200
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2012, 2013, Oracle and/or its affiliates. All rights reserved.
+ * Copyright (c) 2012, 2015, Oracle and/or its affiliates. All rights reserved.
  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
  *
  * This code is free software; you can redistribute it and/or modify it
@@ -22,7 +22,10 @@
  */
 package java.util.stream;
 
+import java.util.Collections;
+import java.util.EnumSet;
 import java.util.PrimitiveIterator;
+import java.util.Set;
 import java.util.Spliterator;
 import java.util.function.Consumer;
 import java.util.function.Function;
@@ -160,12 +163,50 @@
             for (int t : pipe2.toArray())
                 b.accept(t);
         }
-    },;
+    },
+
+    // Wrap as parallel stream + forEach synchronizing
+    PAR_STREAM_FOR_EACH(true, false) {
+        <T, S_IN extends BaseStream<T, S_IN>>
+        void _run(TestData<T, S_IN> data, IntConsumer b, Function<S_IN, IntStream> m) {
+            m.apply(data.parallelStream()).forEach(e -> {
+                synchronized (data) {
+                    b.accept(e);
+                }
+            });
+        }
+    },
 
-    private boolean isParallel;
+    // Wrap as parallel stream + forEach synchronizing and clear SIZED flag
+    PAR_STREAM_FOR_EACH_CLEAR_SIZED(true, false) {
+        <T, S_IN extends BaseStream<T, S_IN>>
+        void _run(TestData<T, S_IN> data, IntConsumer b, Function<S_IN, IntStream> m) {
+            S_IN pipe1 = (S_IN) OpTestCase.chain(data.parallelStream(),
+                                                 new FlagDeclaringOp(StreamOpFlag.NOT_SIZED, data.getShape()));
+            m.apply(pipe1).forEach(e -> {
+                synchronized (data) {
+                    b.accept(e);
+                }
+            });
+        }
+    },
+    ;
+
+    // The set of scenarios that clean the SIZED flag
+    public static final Set<IntStreamTestScenario> CLEAR_SIZED_SCENARIOS = Collections.unmodifiableSet(
+            EnumSet.of(PAR_STREAM_TO_ARRAY_CLEAR_SIZED, PAR_STREAM_FOR_EACH_CLEAR_SIZED));
+
+    private final boolean isParallel;
+
+    private final boolean isOrdered;
 
     IntStreamTestScenario(boolean isParallel) {
+        this(isParallel, true);
+    }
+
+    IntStreamTestScenario(boolean isParallel, boolean isOrdered) {
         this.isParallel = isParallel;
+        this.isOrdered = isOrdered;
     }
 
     public StreamShape getShape() {
@@ -176,6 +217,10 @@
         return isParallel;
     }
 
+    public boolean isOrdered() {
+        return isOrdered;
+    }
+
     public <T, U, S_IN extends BaseStream<T, S_IN>, S_OUT extends BaseStream<U, S_OUT>>
     void run(TestData<T, S_IN> data, Consumer<U> b, Function<S_IN, S_OUT> m) {
         _run(data, (IntConsumer) b, (Function<S_IN, IntStream>) m);
--- a/test/java/util/stream/bootlib/java/util/stream/LongStreamTestScenario.java	Thu Jun 25 16:49:08 2015 -0700
+++ b/test/java/util/stream/bootlib/java/util/stream/LongStreamTestScenario.java	Tue Jun 23 09:49:55 2015 +0200
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2013, Oracle and/or its affiliates. All rights reserved.
+ * Copyright (c) 2013, 2015, Oracle and/or its affiliates. All rights reserved.
  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
  *
  * This code is free software; you can redistribute it and/or modify it
@@ -22,7 +22,10 @@
  */
 package java.util.stream;
 
+import java.util.Collections;
+import java.util.EnumSet;
 import java.util.PrimitiveIterator;
+import java.util.Set;
 import java.util.Spliterator;
 import java.util.function.Consumer;
 import java.util.function.Function;
@@ -159,12 +162,50 @@
             for (long t : pipe2.toArray())
                 b.accept(t);
         }
-    },;
+    },
+
+    // Wrap as parallel stream + forEach synchronizing
+    PAR_STREAM_FOR_EACH(true, false) {
+        <T, S_IN extends BaseStream<T, S_IN>>
+        void _run(TestData<T, S_IN> data, LongConsumer b, Function<S_IN, LongStream> m) {
+            m.apply(data.parallelStream()).forEach(e -> {
+                synchronized (data) {
+                    b.accept(e);
+                }
+            });
+        }
+    },
+
+    // Wrap as parallel stream + forEach synchronizing and clear SIZED flag
+    PAR_STREAM_FOR_EACH_CLEAR_SIZED(true, false) {
+        <T, S_IN extends BaseStream<T, S_IN>>
+        void _run(TestData<T, S_IN> data, LongConsumer b, Function<S_IN, LongStream> m) {
+            S_IN pipe1 = (S_IN) OpTestCase.chain(data.parallelStream(),
+                                                 new FlagDeclaringOp(StreamOpFlag.NOT_SIZED, data.getShape()));
+            m.apply(pipe1).forEach(e -> {
+                synchronized (data) {
+                    b.accept(e);
+                }
+            });
+        }
+    },
+    ;
+
+    // The set of scenarios that clean the SIZED flag
+    public static final Set<LongStreamTestScenario> CLEAR_SIZED_SCENARIOS = Collections.unmodifiableSet(
+            EnumSet.of(PAR_STREAM_TO_ARRAY_CLEAR_SIZED, PAR_STREAM_FOR_EACH_CLEAR_SIZED));
 
     private boolean isParallel;
 
+    private final boolean isOrdered;
+
     LongStreamTestScenario(boolean isParallel) {
+        this(isParallel, true);
+    }
+
+    LongStreamTestScenario(boolean isParallel, boolean isOrdered) {
         this.isParallel = isParallel;
+        this.isOrdered = isOrdered;
     }
 
     public StreamShape getShape() {
@@ -175,6 +216,10 @@
         return isParallel;
     }
 
+    public boolean isOrdered() {
+        return isOrdered;
+    }
+
     public <T, U, S_IN extends BaseStream<T, S_IN>, S_OUT extends BaseStream<U, S_OUT>>
     void run(TestData<T, S_IN> data, Consumer<U> b, Function<S_IN, S_OUT> m) {
         _run(data, (LongConsumer) b, (Function<S_IN, LongStream>) m);
--- a/test/java/util/stream/bootlib/java/util/stream/OpTestCase.java	Thu Jun 25 16:49:08 2015 -0700
+++ b/test/java/util/stream/bootlib/java/util/stream/OpTestCase.java	Tue Jun 23 09:49:55 2015 +0200
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2012, 2013, Oracle and/or its affiliates. All rights reserved.
+ * Copyright (c) 2012, 2015, Oracle and/or its affiliates. All rights reserved.
  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
  *
  * This code is free software; you can redistribute it and/or modify it
@@ -30,6 +30,7 @@
 import java.util.Collections;
 import java.util.EnumMap;
 import java.util.EnumSet;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -91,11 +92,13 @@
 
         boolean isParallel();
 
-        abstract <T, U, S_IN extends BaseStream<T, S_IN>, S_OUT extends BaseStream<U, S_OUT>>
+        boolean isOrdered();
+
+        <T, U, S_IN extends BaseStream<T, S_IN>, S_OUT extends BaseStream<U, S_OUT>>
         void run(TestData<T, S_IN> data, Consumer<U> b, Function<S_IN, S_OUT> m);
     }
 
-    public <T, U, S_IN extends BaseStream<T, S_IN>, S_OUT extends BaseStream<U, S_OUT>>
+    protected <T, U, S_IN extends BaseStream<T, S_IN>, S_OUT extends BaseStream<U, S_OUT>>
     Collection<U> exerciseOps(TestData<T, S_IN> data, Function<S_IN, S_OUT> m) {
         return withData(data).stream(m).exercise();
     }
@@ -103,7 +106,7 @@
     // Run multiple versions of exercise(), returning the result of the first, and asserting that others return the same result
     // If the first version is s -> s.foo(), can be used with s -> s.mapToInt(i -> i).foo().mapToObj(i -> i) to test all shape variants
     @SafeVarargs
-    public final<T, U, S_IN extends BaseStream<T, S_IN>, S_OUT extends BaseStream<U, S_OUT>>
+    protected final<T, U, S_IN extends BaseStream<T, S_IN>, S_OUT extends BaseStream<U, S_OUT>>
     Collection<U> exerciseOpsMulti(TestData<T, S_IN> data,
                                    Function<S_IN, S_OUT>... ms) {
         Collection<U> result = null;
@@ -121,7 +124,7 @@
     // Run multiple versions of exercise() for an Integer stream, returning the result of the first, and asserting that others return the same result
     // Automates the conversion between Stream<Integer> and {Int,Long,Double}Stream and back, so client sites look like you are passing the same
     // lambda four times, but in fact they are four different lambdas since they are transforming four different kinds of streams
-    public final
+    protected final
     Collection<Integer> exerciseOpsInt(TestData.OfRef<Integer> data,
                                        Function<Stream<Integer>, Stream<Integer>> mRef,
                                        Function<IntStream, IntStream> mInt,
@@ -136,30 +139,73 @@
         return exerciseOpsMulti(data, ms);
     }
 
-    public <T, U, S_OUT extends BaseStream<U, S_OUT>>
+    // Run multiple versions of exercise() with multiple terminal operations for all kinds of stream, , and asserting against the expected result
+    // If the first version is s -> s.foo(), can be used with s -> s.mapToInt(i -> i).foo().mapToObj(i -> i) to test all shape variants
+    protected final<T, U, R, S_IN extends BaseStream<T, S_IN>, S_OUT extends BaseStream<U, S_OUT>>
+    void exerciseTerminalOpsMulti(TestData<T, S_IN> data,
+                                  R expected,
+                                  Map<String, Function<S_IN, S_OUT>> streams,
+                                  Map<String, Function<S_OUT, R>> terminals) {
+        for (Map.Entry<String, Function<S_IN, S_OUT>> se : streams.entrySet()) {
+            setContext("Intermediate stream", se.getKey());
+            for (Map.Entry<String, Function<S_OUT, R>> te : terminals.entrySet()) {
+                setContext("Terminal stream", te.getKey());
+                withData(data)
+                        .terminal(se.getValue(), te.getValue())
+                        .expectedResult(expected)
+                        .exercise();
+
+            }
+        }
+    }
+
+    // Run multiple versions of exercise() with multiple terminal operation for all kinds of stream, and asserting against the expected result
+    // Automates the conversion between Stream<Integer> and {Int,Long,Double}Stream and back, so client sites look like you are passing the same
+    // lambda four times, but in fact they are four different lambdas since they are transforming four different kinds of streams
+    protected final
+    void exerciseTerminalOpsInt(TestData<Integer, Stream<Integer>> data,
+                                Collection<Integer> expected,
+                                String desc,
+                                Function<Stream<Integer>, Stream<Integer>> mRef,
+                                Function<IntStream, IntStream> mInt,
+                                Function<LongStream, LongStream> mLong,
+                                Function<DoubleStream, DoubleStream> mDouble,
+                                Map<String, Function<Stream<Integer>, Collection<Integer>>> terminals) {
+
+        Map<String, Function<Stream<Integer>, Stream<Integer>>> m = new HashMap<>();
+        m.put("Ref " + desc, mRef);
+        m.put("Int " + desc, s -> mInt.apply(s.mapToInt(e -> e)).mapToObj(e -> e));
+        m.put("Long " + desc, s -> mLong.apply(s.mapToLong(e -> e)).mapToObj(e -> (int) e));
+        m.put("Double " + desc, s -> mDouble.apply(s.mapToDouble(e -> e)).mapToObj(e -> (int) e));
+
+        exerciseTerminalOpsMulti(data, expected, m, terminals);
+    }
+
+
+    protected <T, U, S_OUT extends BaseStream<U, S_OUT>>
     Collection<U> exerciseOps(Collection<T> data, Function<Stream<T>, S_OUT> m) {
         TestData.OfRef<T> data1 = TestData.Factory.ofCollection("Collection of type " + data.getClass().getName(), data);
         return withData(data1).stream(m).exercise();
     }
 
-    public <T, U, S_OUT extends BaseStream<U, S_OUT>, I extends Iterable<U>>
+    protected <T, U, S_OUT extends BaseStream<U, S_OUT>, I extends Iterable<U>>
     Collection<U> exerciseOps(Collection<T> data, Function<Stream<T>, S_OUT> m, I expected) {
         TestData.OfRef<T> data1 = TestData.Factory.ofCollection("Collection of type " + data.getClass().getName(), data);
         return withData(data1).stream(m).expectedResult(expected).exercise();
     }
 
     @SuppressWarnings("unchecked")
-    public <U, S_OUT extends BaseStream<U, S_OUT>>
+    protected <U, S_OUT extends BaseStream<U, S_OUT>>
     Collection<U> exerciseOps(int[] data, Function<IntStream, S_OUT> m) {
         return withData(TestData.Factory.ofArray("int array", data)).stream(m).exercise();
     }
 
-    public Collection<Integer> exerciseOps(int[] data, Function<IntStream, IntStream> m, int[] expected) {
+    protected Collection<Integer> exerciseOps(int[] data, Function<IntStream, IntStream> m, int[] expected) {
         TestData.OfInt data1 = TestData.Factory.ofArray("int array", data);
         return withData(data1).stream(m).expectedResult(expected).exercise();
     }
 
-    public <T, S_IN extends BaseStream<T, S_IN>> DataStreamBuilder<T, S_IN> withData(TestData<T, S_IN> data) {
+    protected <T, S_IN extends BaseStream<T, S_IN>> DataStreamBuilder<T, S_IN> withData(TestData<T, S_IN> data) {
         Objects.requireNonNull(data);
         return new DataStreamBuilder<>(data);
     }
@@ -325,19 +371,19 @@
         // Build method
 
         public Collection<U> exercise() {
-            final boolean isOrdered;
+            final boolean isStreamOrdered;
             if (refResult == null) {
                 // Induce the reference result
                 before.accept(data);
                 S_OUT sOut = m.apply(data.stream());
-                isOrdered = StreamOpFlag.ORDERED.isKnown(((AbstractPipeline) sOut).getStreamFlags());
+                isStreamOrdered = StreamOpFlag.ORDERED.isKnown(((AbstractPipeline) sOut).getStreamFlags());
                 Node<U> refNodeResult = ((AbstractPipeline<?, U, ?>) sOut).evaluateToArrayNode(size -> (U[]) new Object[size]);
                 refResult = LambdaTestHelpers.toBoxedList(refNodeResult.spliterator());
                 after.accept(data);
             }
             else {
                 S_OUT sOut = m.apply(data.stream());
-                isOrdered = StreamOpFlag.ORDERED.isKnown(((AbstractPipeline) sOut).getStreamFlags());
+                isStreamOrdered = StreamOpFlag.ORDERED.isKnown(((AbstractPipeline) sOut).getStreamFlags());
             }
 
             List<Error> errors = new ArrayList<>();
@@ -348,7 +394,7 @@
                     List<U> result = new ArrayList<>();
                     test.run(data, LambdaTestHelpers.<U>toBoxingConsumer(result::add), m);
 
-                    Runnable asserter = () -> resultAsserter.assertResult(result, refResult, isOrdered, test.isParallel());
+                    Runnable asserter = () -> resultAsserter.assertResult(result, refResult, isStreamOrdered && test.isOrdered(), test.isParallel());
 
                     if (refResult.size() > 1000) {
                         LambdaTestHelpers.launderAssertion(
@@ -406,7 +452,7 @@
     }
 
     @SuppressWarnings({"rawtypes", "unchecked"})
-    static enum TerminalTestScenario implements BaseTerminalTestScenario {
+    enum TerminalTestScenario implements BaseTerminalTestScenario {
         SINGLE_SEQUENTIAL(true, false),
 
         SINGLE_SEQUENTIAL_SHORT_CIRCUIT(true, false) {
@@ -546,19 +592,19 @@
         }
     }
 
-    public <T, R> R exerciseTerminalOps(Collection<T> data, Function<Stream<T>, R> m, R expected) {
+    protected <T, R> R exerciseTerminalOps(Collection<T> data, Function<Stream<T>, R> m, R expected) {
         TestData.OfRef<T> data1
                 = TestData.Factory.ofCollection("Collection of type " + data.getClass().getName(), data);
         return withData(data1).terminal(m).expectedResult(expected).exercise();
     }
 
-    public <T, R, S_IN extends BaseStream<T, S_IN>> R
+    protected <T, R, S_IN extends BaseStream<T, S_IN>> R
     exerciseTerminalOps(TestData<T, S_IN> data,
                         Function<S_IN, R> terminalF) {
         return withData(data).terminal(terminalF).exercise();
     }
 
-    public <T, U, R, S_IN extends BaseStream<T, S_IN>, S_OUT extends BaseStream<U, S_OUT>> R
+    protected <T, U, R, S_IN extends BaseStream<T, S_IN>, S_OUT extends BaseStream<U, S_OUT>> R
     exerciseTerminalOps(TestData<T, S_IN> data,
                         Function<S_IN, S_OUT> streamF,
                         Function<S_OUT, R> terminalF) {
--- a/test/java/util/stream/bootlib/java/util/stream/StreamTestScenario.java	Thu Jun 25 16:49:08 2015 -0700
+++ b/test/java/util/stream/bootlib/java/util/stream/StreamTestScenario.java	Tue Jun 23 09:49:55 2015 +0200
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2012, 2013, Oracle and/or its affiliates. All rights reserved.
+ * Copyright (c) 2012, 2015, Oracle and/or its affiliates. All rights reserved.
  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
  *
  * This code is free software; you can redistribute it and/or modify it
@@ -22,7 +22,10 @@
  */
 package java.util.stream;
 
+import java.util.Collections;
+import java.util.EnumSet;
 import java.util.Iterator;
+import java.util.Set;
 import java.util.Spliterator;
 import java.util.function.Consumer;
 import java.util.function.Function;
@@ -173,8 +176,8 @@
         }
     },
 
-    // Wrap as parallel + collect
-    PAR_STREAM_COLLECT(true) {
+    // Wrap as parallel + collect to list
+    PAR_STREAM_COLLECT_TO_LIST(true) {
         <T, U, S_IN extends BaseStream<T, S_IN>>
         void _run(TestData<T, S_IN> data, Consumer<U> b, Function<S_IN, Stream<U>> m) {
             for (U u : m.apply(data.parallelStream()).collect(Collectors.toList()))
@@ -182,8 +185,8 @@
         }
     },
 
-    // Wrap sequential as parallel, + collect
-    STREAM_TO_PAR_STREAM_COLLECT(true) {
+    // Wrap sequential as parallel, + collect to list
+    STREAM_TO_PAR_STREAM_COLLECT_TO_LIST(true) {
         <T, U, S_IN extends BaseStream<T, S_IN>>
         void _run(TestData<T, S_IN> data, Consumer<U> b, Function<S_IN, Stream<U>> m) {
             for (U u : m.apply(data.stream().parallel()).collect(Collectors.toList()))
@@ -192,19 +195,56 @@
     },
 
     // Wrap parallel as sequential,, + collect
-    PAR_STREAM_TO_STREAM_COLLECT(true) {
+    PAR_STREAM_TO_STREAM_COLLECT_TO_LIST(true) {
         <T, U, S_IN extends BaseStream<T, S_IN>>
         void _run(TestData<T, S_IN> data, Consumer<U> b, Function<S_IN, Stream<U>> m) {
             for (U u : m.apply(data.parallelStream().sequential()).collect(Collectors.toList()))
                 b.accept(u);
         }
     },
+
+    // Wrap as parallel stream + forEach synchronizing
+    PAR_STREAM_FOR_EACH(true, false) {
+        <T, U, S_IN extends BaseStream<T, S_IN>>
+        void _run(TestData<T, S_IN> data, Consumer<U> b, Function<S_IN, Stream<U>> m) {
+            m.apply(data.parallelStream()).forEach(e -> {
+                synchronized (data) {
+                    b.accept(e);
+                }
+            });
+        }
+    },
+
+    // Wrap as parallel stream + forEach synchronizing and clear SIZED flag
+    PAR_STREAM_FOR_EACH_CLEAR_SIZED(true, false) {
+        <T, U, S_IN extends BaseStream<T, S_IN>>
+        void _run(TestData<T, S_IN> data, Consumer<U> b, Function<S_IN, Stream<U>> m) {
+            S_IN pipe1 = (S_IN) OpTestCase.chain(data.parallelStream(),
+                                                 new FlagDeclaringOp(StreamOpFlag.NOT_SIZED, data.getShape()));
+            m.apply(pipe1).forEach(e -> {
+                synchronized (data) {
+                    b.accept(e);
+                }
+            });
+        }
+    },
     ;
 
-    private boolean isParallel;
+    // The set of scenarios that clean the SIZED flag
+    public static final Set<StreamTestScenario> CLEAR_SIZED_SCENARIOS = Collections.unmodifiableSet(
+            EnumSet.of(PAR_STREAM_TO_ARRAY_CLEAR_SIZED, PAR_STREAM_FOR_EACH_CLEAR_SIZED));
+
+    private final boolean isParallel;
+
+    private final boolean isOrdered;
 
     StreamTestScenario(boolean isParallel) {
+        this(isParallel, true);
+    }
+
+    StreamTestScenario(boolean isParallel, boolean isOrdered) {
         this.isParallel = isParallel;
+        this.isOrdered = isOrdered;
     }
 
     public StreamShape getShape() {
@@ -215,6 +255,10 @@
         return isParallel;
     }
 
+    public boolean isOrdered() {
+        return isOrdered;
+    }
+
     public <T, U, S_IN extends BaseStream<T, S_IN>, S_OUT extends BaseStream<U, S_OUT>>
     void run(TestData<T, S_IN> data, Consumer<U> b, Function<S_IN, S_OUT> m) {
         _run(data, b, (Function<S_IN, Stream<U>>) m);
--- a/test/java/util/stream/boottest/java/util/stream/FlagOpTest.java	Thu Jun 25 16:49:08 2015 -0700
+++ b/test/java/util/stream/boottest/java/util/stream/FlagOpTest.java	Tue Jun 23 09:49:55 2015 +0200
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2012, 2013, Oracle and/or its affiliates. All rights reserved.
+ * Copyright (c) 2012, 2015, Oracle and/or its affiliates. All rights reserved.
  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
  *
  * This code is free software; you can redistribute it and/or modify it
@@ -112,7 +112,7 @@
         FlagDeclaringOp[] opsArray = ops.toArray(new FlagDeclaringOp[ops.size()]);
 
         withData(data).ops(opsArray).
-                without(StreamTestScenario.PAR_STREAM_TO_ARRAY_CLEAR_SIZED).
+                without(StreamTestScenario.CLEAR_SIZED_SCENARIOS).
                 exercise();
     }
 
@@ -152,7 +152,7 @@
 
 
         withData(data).ops(opsArray).
-                without(StreamTestScenario.PAR_STREAM_TO_ARRAY_CLEAR_SIZED).
+                without(StreamTestScenario.CLEAR_SIZED_SCENARIOS).
                 exercise();
     }
 
@@ -185,7 +185,7 @@
         IntermediateTestOp[] opsArray = ops.toArray(new IntermediateTestOp[ops.size()]);
 
         withData(data).ops(opsArray).
-                without(StreamTestScenario.PAR_STREAM_TO_ARRAY_CLEAR_SIZED).
+                without(StreamTestScenario.CLEAR_SIZED_SCENARIOS).
                 exercise();
     }
 
@@ -221,7 +221,7 @@
         IntermediateTestOp[] opsArray = ops.toArray(new IntermediateTestOp[ops.size()]);
 
         withData(data).ops(opsArray).
-                without(StreamTestScenario.PAR_STREAM_TO_ARRAY_CLEAR_SIZED).
+                without(StreamTestScenario.CLEAR_SIZED_SCENARIOS).
                 exercise();
     }
 
--- a/test/java/util/stream/boottest/java/util/stream/UnorderedTest.java	Thu Jun 25 16:49:08 2015 -0700
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,265 +0,0 @@
-/*
- * Copyright (c) 2013, Oracle and/or its affiliates. All rights reserved.
- * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
- *
- * This code is free software; you can redistribute it and/or modify it
- * under the terms of the GNU General Public License version 2 only, as
- * published by the Free Software Foundation.
- *
- * This code is distributed in the hope that it will be useful, but WITHOUT
- * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
- * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
- * version 2 for more details (a copy is included in the LICENSE file that
- * accompanied this code).
- *
- * You should have received a copy of the GNU General Public License version
- * 2 along with this work; if not, write to the Free Software Foundation,
- * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
- *
- * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
- * or visit www.oracle.com if you need additional information or have any
- * questions.
- */
-package java.util.stream;
-
-import org.testng.annotations.Test;
-
-import java.util.Arrays;
-import java.util.List;
-import java.util.function.BiConsumer;
-import java.util.function.Function;
-import java.util.function.UnaryOperator;
-
-@Test
-public class UnorderedTest extends OpTestCase {
-
-    @Test(dataProvider = "StreamTestData<Integer>", dataProviderClass = StreamTestDataProvider.class)
-    public void testTerminalOps(String name, TestData<Integer, Stream<Integer>> data) {
-        testTerminal(data, s -> { s.forEach(x -> { }); return 0; });
-
-        testTerminal(data, s -> s.findAny(), (a, b) -> assertEquals(a.isPresent(), b.isPresent()));
-
-        testTerminal(data, s -> s.anyMatch(e -> true));
-    }
-
-
-    private <T, R> void testTerminal(TestData<T, Stream<T>> data, Function<Stream<T>, R> terminalF) {
-        testTerminal(data, terminalF, LambdaTestHelpers::assertContentsEqual);
-    }
-
-    static class WrappingUnaryOperator<S> implements UnaryOperator<S> {
-
-        final boolean isLimit;
-        final UnaryOperator<S> uo;
-
-        WrappingUnaryOperator(UnaryOperator<S> uo) {
-            this(uo, false);
-        }
-
-        WrappingUnaryOperator(UnaryOperator<S> uo, boolean isLimit) {
-            this.uo = uo;
-            this.isLimit = isLimit;
-        }
-
-        @Override
-        public S apply(S s) {
-            return uo.apply(s);
-        }
-    }
-
-    static <S> WrappingUnaryOperator<S> wrap(UnaryOperator<S> uo) {
-        return new WrappingUnaryOperator<>(uo);
-    }
-
-    static <S> WrappingUnaryOperator<S> wrap(UnaryOperator<S> uo, boolean isLimit) {
-        return new WrappingUnaryOperator<>(uo, isLimit);
-    }
-
-    @SuppressWarnings("rawtypes")
-    private List permutationOfFunctions =
-            LambdaTestHelpers.perm(Arrays.<WrappingUnaryOperator<Stream<Object>>>asList(
-                    wrap(s -> s.sorted()),
-                    wrap(s -> s.distinct()),
-                    wrap(s -> s.limit(5), true)
-            ));
-
-    @SuppressWarnings("unchecked")
-    private <T, R> void testTerminal(TestData<T, Stream<T>> data,
-                                     Function<Stream<T>, R> terminalF,
-                                     BiConsumer<R, R> equalityAsserter) {
-        testTerminal(data, terminalF, equalityAsserter, permutationOfFunctions, StreamShape.REFERENCE);
-    }
-
-    //
-
-    @Test(dataProvider = "IntStreamTestData", dataProviderClass = IntStreamTestDataProvider.class)
-    public void testIntTerminalOps(String name, TestData.OfInt data) {
-        testIntTerminal(data, s -> { s.forEach(x -> { }); return 0; });
-        testIntTerminal(data, s -> s.findAny(), (a, b) -> assertEquals(a.isPresent(), b.isPresent()));
-        testIntTerminal(data, s -> s.anyMatch(e -> true));
-    }
-
-
-    private <T, R> void testIntTerminal(TestData.OfInt data, Function<IntStream, R> terminalF) {
-        testIntTerminal(data, terminalF, LambdaTestHelpers::assertContentsEqual);
-    }
-
-    private List<List<WrappingUnaryOperator<IntStream>>> intPermutationOfFunctions =
-            LambdaTestHelpers.perm(Arrays.asList(
-                    wrap(s -> s.sorted()),
-                    wrap(s -> s.distinct()),
-                    wrap(s -> s.limit(5), true)
-            ));
-
-    private <R> void testIntTerminal(TestData.OfInt data,
-                                     Function<IntStream, R> terminalF,
-                                     BiConsumer<R, R> equalityAsserter) {
-        testTerminal(data, terminalF, equalityAsserter, intPermutationOfFunctions, StreamShape.INT_VALUE);
-    }
-
-    //
-
-    @Test(dataProvider = "LongStreamTestData", dataProviderClass = LongStreamTestDataProvider.class)
-    public void testLongTerminalOps(String name, TestData.OfLong data) {
-        testLongTerminal(data, s -> { s.forEach(x -> { }); return 0; });
-        testLongTerminal(data, s -> s.findAny(), (a, b) -> assertEquals(a.isPresent(), b.isPresent()));
-        testLongTerminal(data, s -> s.anyMatch(e -> true));
-    }
-
-
-    private <T, R> void testLongTerminal(TestData.OfLong data, Function<LongStream, R> terminalF) {
-        testLongTerminal(data, terminalF, LambdaTestHelpers::assertContentsEqual);
-    }
-
-    private List<List<WrappingUnaryOperator<LongStream>>> longPermutationOfFunctions =
-            LambdaTestHelpers.perm(Arrays.asList(
-                    wrap(s -> s.sorted()),
-                    wrap(s -> s.distinct()),
-                    wrap(s -> s.limit(5), true)
-            ));
-
-    private <R> void testLongTerminal(TestData.OfLong data,
-                                      Function<LongStream, R> terminalF,
-                                      BiConsumer<R, R> equalityAsserter) {
-        testTerminal(data, terminalF, equalityAsserter, longPermutationOfFunctions, StreamShape.LONG_VALUE);
-    }
-
-    //
-
-    @Test(dataProvider = "DoubleStreamTestData", dataProviderClass = DoubleStreamTestDataProvider.class)
-    public void testDoubleTerminalOps(String name, TestData.OfDouble data) {
-        testDoubleTerminal(data, s -> { s.forEach(x -> { }); return 0; });
-        testDoubleTerminal(data, s -> s.findAny(), (a, b) -> assertEquals(a.isPresent(), b.isPresent()));
-        testDoubleTerminal(data, s -> s.anyMatch(e -> true));
-    }
-
-
-    private <T, R> void testDoubleTerminal(TestData.OfDouble data, Function<DoubleStream, R> terminalF) {
-        testDoubleTerminal(data, terminalF, LambdaTestHelpers::assertContentsEqual);
-    }
-
-    private List<List<WrappingUnaryOperator<DoubleStream>>> doublePermutationOfFunctions =
-            LambdaTestHelpers.perm(Arrays.asList(
-                    wrap(s -> s.sorted()),
-                    wrap(s -> s.distinct()),
-                    wrap(s -> s.limit(5), true)
-            ));
-
-    private <R> void testDoubleTerminal(TestData.OfDouble data,
-                                        Function<DoubleStream, R> terminalF,
-                                        BiConsumer<R, R> equalityAsserter) {
-        testTerminal(data, terminalF, equalityAsserter, doublePermutationOfFunctions, StreamShape.DOUBLE_VALUE);
-    }
-
-    //
-
-    private <T, S extends BaseStream<T, S>, R> void testTerminal(TestData<T, S> data,
-                                                                 Function<S, R> terminalF,
-                                                                 BiConsumer<R, R> equalityAsserter,
-                                                                 List<List<WrappingUnaryOperator<S>>> pFunctions,
-                                                                 StreamShape shape) {
-        CheckClearOrderedOp<T> checkClearOrderedOp = new CheckClearOrderedOp<>(shape);
-        for (List<WrappingUnaryOperator<S>> f : pFunctions) {
-            @SuppressWarnings("unchecked")
-            UnaryOperator<S> fi = interpose(f, (S s) -> (S) chain(s, checkClearOrderedOp));
-            withData(data).
-                    terminal(fi, terminalF).
-                    equalator(equalityAsserter).
-                    exercise();
-        }
-
-        CheckSetOrderedOp<T> checkSetOrderedOp = new CheckSetOrderedOp<>(shape);
-        for (List<WrappingUnaryOperator<S>> f : pFunctions) {
-            @SuppressWarnings("unchecked")
-            UnaryOperator<S> fi = interpose(f, (S s) -> (S) chain(s, checkSetOrderedOp));
-            withData(data).
-                    terminal(fi, s -> terminalF.apply(s.sequential())).
-                    equalator(equalityAsserter).
-                    exercise();
-        }
-    }
-
-    static class CheckClearOrderedOp<T> implements StatelessTestOp<T, T> {
-        private final StreamShape shape;
-
-        CheckClearOrderedOp(StreamShape shape) {
-            this.shape = shape;
-        }
-
-        @Override
-        public StreamShape outputShape() {
-            return shape;
-        }
-
-        @Override
-        public StreamShape inputShape() {
-            return shape;
-        }
-
-        @Override
-        public Sink<T> opWrapSink(int flags, boolean parallel, Sink<T> sink) {
-            if (parallel) {
-                assertTrue(StreamOpFlag.ORDERED.isCleared(flags));
-            }
-
-            return sink;
-        }
-    }
-
-    static class CheckSetOrderedOp<T> extends CheckClearOrderedOp<T> {
-
-        CheckSetOrderedOp(StreamShape shape) {
-            super(shape);
-        }
-
-        @Override
-        public Sink<T> opWrapSink(int flags, boolean parallel, Sink<T> sink) {
-            assertTrue(StreamOpFlag.ORDERED.isKnown(flags) || StreamOpFlag.ORDERED.isPreserved(flags));
-
-            return sink;
-        }
-    }
-
-    private <T, S extends BaseStream<T, S>>
-    UnaryOperator<S> interpose(List<WrappingUnaryOperator<S>> fs, UnaryOperator<S> fi) {
-        int l = -1;
-        for (int i = 0; i < fs.size(); i++) {
-            if (fs.get(i).isLimit) {
-                l = i;
-            }
-        }
-
-        final int lastLimitIndex = l;
-        return s -> {
-            if (lastLimitIndex == -1)
-                s = fi.apply(s);
-            for (int i = 0; i < fs.size(); i++) {
-                s = fs.get(i).apply(s);
-                if (i >= lastLimitIndex) {
-                    s = fi.apply(s);
-                }
-            }
-            return s;
-        };
-    }
-}
--- a/test/java/util/stream/test/org/openjdk/tests/java/util/SplittableRandomTest.java	Thu Jun 25 16:49:08 2015 -0700
+++ b/test/java/util/stream/test/org/openjdk/tests/java/util/SplittableRandomTest.java	Tue Jun 23 09:49:55 2015 +0200
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2013, Oracle and/or its affiliates. All rights reserved.
+ * Copyright (c) 2013, 2015, Oracle and/or its affiliates. All rights reserved.
  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
  *
  * This code is free software; you can redistribute it and/or modify it
@@ -192,7 +192,7 @@
     public void testInts(TestData.OfInt data, ResultAsserter<Iterable<Integer>> ra) {
         withData(data).
                 stream(s -> s).
-                without(IntStreamTestScenario.PAR_STREAM_TO_ARRAY_CLEAR_SIZED).
+                without(IntStreamTestScenario.CLEAR_SIZED_SCENARIOS).
                 resultAsserter(ra).
                 exercise();
     }
@@ -276,7 +276,7 @@
     public void testLongs(TestData.OfLong data, ResultAsserter<Iterable<Long>> ra) {
         withData(data).
                 stream(s -> s).
-                without(LongStreamTestScenario.PAR_STREAM_TO_ARRAY_CLEAR_SIZED).
+                without(LongStreamTestScenario.CLEAR_SIZED_SCENARIOS).
                 resultAsserter(ra).
                 exercise();
     }
@@ -360,7 +360,7 @@
     public void testDoubles(TestData.OfDouble data, ResultAsserter<Iterable<Double>> ra) {
         withData(data).
                 stream(s -> s).
-                without(DoubleStreamTestScenario.PAR_STREAM_TO_ARRAY_CLEAR_SIZED).
+                without(DoubleStreamTestScenario.CLEAR_SIZED_SCENARIOS).
                 resultAsserter(ra).
                 exercise();
     }
--- a/test/java/util/stream/test/org/openjdk/tests/java/util/stream/DistinctOpTest.java	Thu Jun 25 16:49:08 2015 -0700
+++ b/test/java/util/stream/test/org/openjdk/tests/java/util/stream/DistinctOpTest.java	Tue Jun 23 09:49:55 2015 +0200
@@ -32,7 +32,16 @@
 import java.util.Spliterator;
 import java.util.Spliterators;
 import java.util.concurrent.ThreadLocalRandom;
-import java.util.stream.*;
+import java.util.stream.CollectorOps;
+import java.util.stream.Collectors;
+import java.util.stream.DoubleStream;
+import java.util.stream.IntStream;
+import java.util.stream.LongStream;
+import java.util.stream.OpTestCase;
+import java.util.stream.Stream;
+import java.util.stream.StreamSupport;
+import java.util.stream.StreamTestDataProvider;
+import java.util.stream.TestData;
 
 import static java.util.stream.LambdaTestHelpers.*;
 
@@ -67,7 +76,12 @@
 
     @Test(dataProvider = "StreamTestData<Integer>", dataProviderClass = StreamTestDataProvider.class)
     public void testOp(String name, TestData.OfRef<Integer> data) {
-        Collection<Integer> result = exerciseOpsInt(data, Stream::distinct, IntStream::distinct, LongStream::distinct, DoubleStream::distinct);
+        Collection<Integer> result = exerciseOpsInt(
+                data,
+                Stream::distinct,
+                IntStream::distinct,
+                LongStream::distinct,
+                DoubleStream::distinct);
 
         assertUnique(result);
         assertTrue((data.size() > 0) ? result.size() > 0 : result.size() == 0);
@@ -127,10 +141,14 @@
 
     @Test(dataProvider = "StreamTestData<Integer>", dataProviderClass = StreamTestDataProvider.class)
     public void testDistinctDistinct(String name, TestData.OfRef<Integer> data) {
-        Collection<Integer> result = withData(data)
-                .stream(s -> s.distinct().distinct(), new CollectorOps.TestParallelSizedOp<>())
-                .exercise();
-        assertUnique(result);
+        Collection<Integer> result = exerciseOpsInt(
+                data,
+                s -> s.distinct().distinct(),
+                s -> s.distinct().distinct(),
+                s -> s.distinct().distinct(),
+                s -> s.distinct().distinct());
+
+         assertUnique(result);
     }
 
     @Test(dataProvider = "StreamTestData<Integer>", dataProviderClass = StreamTestDataProvider.class)
@@ -152,4 +170,31 @@
         assertUnique(result);
         assertSorted(result);
     }
+
+    @Test
+    public void testStable() {
+        // Create N instances of Integer all with the same value
+        List<Integer> input = IntStream.rangeClosed(0, 1000)
+                .mapToObj(i -> new Integer(1000)) // explicit construction
+                .collect(Collectors.toList());
+        Integer expectedElement = input.get(0);
+        TestData<Integer, Stream<Integer>> data = TestData.Factory.ofCollection(
+                "1000 instances of Integer with the same value", input);
+
+        withData(data)
+                .stream(Stream::distinct)
+                .resultAsserter((actual, expected, isOrdered, isParallel) -> {
+                    List<Integer> l = new ArrayList<>();
+                    actual.forEach(l::add);
+
+                    // Assert stability
+                    // The single result element should be equal in identity to
+                    // the first input element
+                    assertEquals(l.size(), 1);
+                    assertEquals(System.identityHashCode(l.get(0)),
+                                 System.identityHashCode(expectedElement));
+
+                })
+                .exercise();
+    }
 }
--- a/test/java/util/stream/test/org/openjdk/tests/java/util/stream/InfiniteStreamWithLimitOpTest.java	Thu Jun 25 16:49:08 2015 -0700
+++ b/test/java/util/stream/test/org/openjdk/tests/java/util/stream/InfiniteStreamWithLimitOpTest.java	Tue Jun 23 09:49:55 2015 +0200
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2012, 2013, Oracle and/or its affiliates. All rights reserved.
+ * Copyright (c) 2012, 2015, Oracle and/or its affiliates. All rights reserved.
  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
  *
  * This code is free software; you can redistribute it and/or modify it
@@ -181,7 +181,7 @@
         // slice implementations
         withData(refLongs()).
                 stream(s -> fs.apply(s)).
-                without(StreamTestScenario.PAR_STREAM_TO_ARRAY_CLEAR_SIZED).
+                without(StreamTestScenario.CLEAR_SIZED_SCENARIOS).
                 exercise();
     }
 
@@ -192,7 +192,7 @@
         // slice implementations
         withData(ints()).
                 stream(s -> fs.apply(s)).
-                without(IntStreamTestScenario.PAR_STREAM_TO_ARRAY_CLEAR_SIZED).
+                without(IntStreamTestScenario.CLEAR_SIZED_SCENARIOS).
                 exercise();
     }
 
@@ -203,7 +203,7 @@
         // slice implementations
         withData(longs()).
                 stream(s -> fs.apply(s)).
-                without(LongStreamTestScenario.PAR_STREAM_TO_ARRAY_CLEAR_SIZED).
+                without(LongStreamTestScenario.CLEAR_SIZED_SCENARIOS).
                 exercise();
     }
 
@@ -214,7 +214,7 @@
         // slice implementations
         withData(doubles()).
                 stream(s -> fs.apply(s)).
-                without(DoubleStreamTestScenario.PAR_STREAM_TO_ARRAY_CLEAR_SIZED).
+                without(DoubleStreamTestScenario.CLEAR_SIZED_SCENARIOS).
                 exercise();
     }