Mercurial > hg > openjdk > aarch64-port > jdk
changeset 10921:a49d60c55b74
8129120: Terminal operation properties should not be back-propagated to upstream operations
Reviewed-by: briangoetz, chegar
author | psandoz |
---|---|
date | Tue, 23 Jun 2015 09:49:55 +0200 |
parents | 009d3bbe66bd |
children | bf5f41bd4710 |
files | src/share/classes/java/util/stream/AbstractPipeline.java test/java/util/stream/bootlib/java/util/stream/DoubleStreamTestScenario.java test/java/util/stream/bootlib/java/util/stream/IntStreamTestScenario.java test/java/util/stream/bootlib/java/util/stream/LongStreamTestScenario.java test/java/util/stream/bootlib/java/util/stream/OpTestCase.java test/java/util/stream/bootlib/java/util/stream/StreamTestScenario.java test/java/util/stream/boottest/java/util/stream/FlagOpTest.java test/java/util/stream/boottest/java/util/stream/UnorderedTest.java test/java/util/stream/test/org/openjdk/tests/java/util/SplittableRandomTest.java test/java/util/stream/test/org/openjdk/tests/java/util/stream/DistinctOpTest.java test/java/util/stream/test/org/openjdk/tests/java/util/stream/InfiniteStreamWithLimitOpTest.java |
diffstat | 11 files changed, 355 insertions(+), 381 deletions(-) [+] |
line wrap: on
line diff
--- a/src/share/classes/java/util/stream/AbstractPipeline.java Thu Jun 25 16:49:08 2015 -0700 +++ b/src/share/classes/java/util/stream/AbstractPipeline.java Tue Jun 23 09:49:55 2015 +0200 @@ -249,6 +249,11 @@ // If the last intermediate operation is stateful then // evaluate directly to avoid an extra collection step if (isParallel() && previousStage != null && opIsStateful()) { + // Set the depth of this, last, pipeline stage to zero to slice the + // pipeline such that this operation will not be included in the + // upstream slice and upstream operations will not be included + // in this slice + depth = 0; return opEvaluateParallel(previousStage, previousStage.sourceSpliterator(0), generator); } else { @@ -379,60 +384,6 @@ } /** - * Prepare the pipeline for a parallel execution. As the pipeline is built, - * the flags and depth indicators are set up for a sequential execution. - * If the execution is parallel, and there are any stateful operations, then - * some of these need to be adjusted, as well as adjusting for flags from - * the terminal operation (such as back-propagating UNORDERED). - * Need not be called for a sequential execution. - * - * @param terminalFlags Operation flags for the terminal operation - */ - private void parallelPrepare(int terminalFlags) { - @SuppressWarnings("rawtypes") - AbstractPipeline backPropagationHead = sourceStage; - if (sourceStage.sourceAnyStateful) { - int depth = 1; - for ( @SuppressWarnings("rawtypes") AbstractPipeline u = sourceStage, p = sourceStage.nextStage; - p != null; - u = p, p = p.nextStage) { - int thisOpFlags = p.sourceOrOpFlags; - if (p.opIsStateful()) { - // If the stateful operation is a short-circuit operation - // then move the back propagation head forwards - // NOTE: there are no size-injecting ops - if (StreamOpFlag.SHORT_CIRCUIT.isKnown(thisOpFlags)) { - backPropagationHead = p; - // Clear the short circuit flag for next pipeline stage - // This stage encapsulates short-circuiting, the next - // stage may not have any short-circuit operations, and - // if so spliterator.forEachRemaining should be be used - // for traversal - thisOpFlags = thisOpFlags & ~StreamOpFlag.IS_SHORT_CIRCUIT; - } - - depth = 0; - // The following injects size, it is equivalent to: - // StreamOpFlag.combineOpFlags(StreamOpFlag.IS_SIZED, p.combinedFlags); - thisOpFlags = (thisOpFlags & ~StreamOpFlag.NOT_SIZED) | StreamOpFlag.IS_SIZED; - } - p.depth = depth++; - p.combinedFlags = StreamOpFlag.combineOpFlags(thisOpFlags, u.combinedFlags); - } - } - - // Apply the upstream terminal flags - if (terminalFlags != 0) { - int upstreamTerminalFlags = terminalFlags & StreamOpFlag.UPSTREAM_TERMINAL_OP_MASK; - for ( @SuppressWarnings("rawtypes") AbstractPipeline p = backPropagationHead; p.nextStage != null; p = p.nextStage) { - p.combinedFlags = StreamOpFlag.combineOpFlags(upstreamTerminalFlags, p.combinedFlags); - } - - combinedFlags = StreamOpFlag.combineOpFlags(terminalFlags, combinedFlags); - } - } - - /** * Get the source spliterator for this pipeline stage. For a sequential or * stateless parallel pipeline, this is the source spliterator. For a * stateful parallel pipeline, this is a spliterator describing the results @@ -455,31 +406,49 @@ throw new IllegalStateException(MSG_CONSUMED); } - if (isParallel()) { - // @@@ Merge parallelPrepare with the loop below and use the - // spliterator characteristics to determine if SIZED - // should be injected - parallelPrepare(terminalFlags); - + if (isParallel() && sourceStage.sourceAnyStateful) { // Adapt the source spliterator, evaluating each stateful op - // in the pipeline up to and including this pipeline stage - for ( @SuppressWarnings("rawtypes") AbstractPipeline u = sourceStage, p = sourceStage.nextStage, e = this; + // in the pipeline up to and including this pipeline stage. + // The depth and flags of each pipeline stage are adjusted accordingly. + int depth = 1; + for (@SuppressWarnings("rawtypes") AbstractPipeline u = sourceStage, p = sourceStage.nextStage, e = this; u != e; u = p, p = p.nextStage) { + int thisOpFlags = p.sourceOrOpFlags; if (p.opIsStateful()) { + depth = 0; + + if (StreamOpFlag.SHORT_CIRCUIT.isKnown(thisOpFlags)) { + // Clear the short circuit flag for next pipeline stage + // This stage encapsulates short-circuiting, the next + // stage may not have any short-circuit operations, and + // if so spliterator.forEachRemaining should be used + // for traversal + thisOpFlags = thisOpFlags & ~StreamOpFlag.IS_SHORT_CIRCUIT; + } + spliterator = p.opEvaluateParallelLazy(u, spliterator); + + // Inject or clear SIZED on the source pipeline stage + // based on the stage's spliterator + thisOpFlags = spliterator.hasCharacteristics(Spliterator.SIZED) + ? (thisOpFlags & ~StreamOpFlag.NOT_SIZED) | StreamOpFlag.IS_SIZED + : (thisOpFlags & ~StreamOpFlag.IS_SIZED) | StreamOpFlag.NOT_SIZED; } + p.depth = depth++; + p.combinedFlags = StreamOpFlag.combineOpFlags(thisOpFlags, u.combinedFlags); } } - else if (terminalFlags != 0) { + + if (terminalFlags != 0) { + // Apply flags from the terminal operation to last pipeline stage combinedFlags = StreamOpFlag.combineOpFlags(terminalFlags, combinedFlags); } return spliterator; } - // PipelineHelper @Override
--- a/test/java/util/stream/bootlib/java/util/stream/DoubleStreamTestScenario.java Thu Jun 25 16:49:08 2015 -0700 +++ b/test/java/util/stream/bootlib/java/util/stream/DoubleStreamTestScenario.java Tue Jun 23 09:49:55 2015 +0200 @@ -1,5 +1,5 @@ /* - * Copyright (c) 2013, Oracle and/or its affiliates. All rights reserved. + * Copyright (c) 2013, 2015, Oracle and/or its affiliates. All rights reserved. * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. * * This code is free software; you can redistribute it and/or modify it @@ -22,7 +22,10 @@ */ package java.util.stream; +import java.util.Collections; +import java.util.EnumSet; import java.util.PrimitiveIterator; +import java.util.Set; import java.util.Spliterator; import java.util.function.Consumer; import java.util.function.DoubleConsumer; @@ -159,12 +162,50 @@ for (double t : pipe2.toArray()) b.accept(t); } - },; + }, + + // Wrap as parallel stream + forEach synchronizing + PAR_STREAM_FOR_EACH(true, false) { + <T, S_IN extends BaseStream<T, S_IN>> + void _run(TestData<T, S_IN> data, DoubleConsumer b, Function<S_IN, DoubleStream> m) { + m.apply(data.parallelStream()).forEach(e -> { + synchronized (data) { + b.accept(e); + } + }); + } + }, + + // Wrap as parallel stream + forEach synchronizing and clear SIZED flag + PAR_STREAM_FOR_EACH_CLEAR_SIZED(true, false) { + <T, S_IN extends BaseStream<T, S_IN>> + void _run(TestData<T, S_IN> data, DoubleConsumer b, Function<S_IN, DoubleStream> m) { + S_IN pipe1 = (S_IN) OpTestCase.chain(data.parallelStream(), + new FlagDeclaringOp(StreamOpFlag.NOT_SIZED, data.getShape())); + m.apply(pipe1).forEach(e -> { + synchronized (data) { + b.accept(e); + } + }); + } + }, + ; + + // The set of scenarios that clean the SIZED flag + public static final Set<DoubleStreamTestScenario> CLEAR_SIZED_SCENARIOS = Collections.unmodifiableSet( + EnumSet.of(PAR_STREAM_TO_ARRAY_CLEAR_SIZED, PAR_STREAM_FOR_EACH_CLEAR_SIZED)); private boolean isParallel; + private final boolean isOrdered; + DoubleStreamTestScenario(boolean isParallel) { + this(isParallel, true); + } + + DoubleStreamTestScenario(boolean isParallel, boolean isOrdered) { this.isParallel = isParallel; + this.isOrdered = isOrdered; } public StreamShape getShape() { @@ -175,6 +216,10 @@ return isParallel; } + public boolean isOrdered() { + return isOrdered; + } + public <T, U, S_IN extends BaseStream<T, S_IN>, S_OUT extends BaseStream<U, S_OUT>> void run(TestData<T, S_IN> data, Consumer<U> b, Function<S_IN, S_OUT> m) { _run(data, (DoubleConsumer) b, (Function<S_IN, DoubleStream>) m);
--- a/test/java/util/stream/bootlib/java/util/stream/IntStreamTestScenario.java Thu Jun 25 16:49:08 2015 -0700 +++ b/test/java/util/stream/bootlib/java/util/stream/IntStreamTestScenario.java Tue Jun 23 09:49:55 2015 +0200 @@ -1,5 +1,5 @@ /* - * Copyright (c) 2012, 2013, Oracle and/or its affiliates. All rights reserved. + * Copyright (c) 2012, 2015, Oracle and/or its affiliates. All rights reserved. * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. * * This code is free software; you can redistribute it and/or modify it @@ -22,7 +22,10 @@ */ package java.util.stream; +import java.util.Collections; +import java.util.EnumSet; import java.util.PrimitiveIterator; +import java.util.Set; import java.util.Spliterator; import java.util.function.Consumer; import java.util.function.Function; @@ -160,12 +163,50 @@ for (int t : pipe2.toArray()) b.accept(t); } - },; + }, + + // Wrap as parallel stream + forEach synchronizing + PAR_STREAM_FOR_EACH(true, false) { + <T, S_IN extends BaseStream<T, S_IN>> + void _run(TestData<T, S_IN> data, IntConsumer b, Function<S_IN, IntStream> m) { + m.apply(data.parallelStream()).forEach(e -> { + synchronized (data) { + b.accept(e); + } + }); + } + }, - private boolean isParallel; + // Wrap as parallel stream + forEach synchronizing and clear SIZED flag + PAR_STREAM_FOR_EACH_CLEAR_SIZED(true, false) { + <T, S_IN extends BaseStream<T, S_IN>> + void _run(TestData<T, S_IN> data, IntConsumer b, Function<S_IN, IntStream> m) { + S_IN pipe1 = (S_IN) OpTestCase.chain(data.parallelStream(), + new FlagDeclaringOp(StreamOpFlag.NOT_SIZED, data.getShape())); + m.apply(pipe1).forEach(e -> { + synchronized (data) { + b.accept(e); + } + }); + } + }, + ; + + // The set of scenarios that clean the SIZED flag + public static final Set<IntStreamTestScenario> CLEAR_SIZED_SCENARIOS = Collections.unmodifiableSet( + EnumSet.of(PAR_STREAM_TO_ARRAY_CLEAR_SIZED, PAR_STREAM_FOR_EACH_CLEAR_SIZED)); + + private final boolean isParallel; + + private final boolean isOrdered; IntStreamTestScenario(boolean isParallel) { + this(isParallel, true); + } + + IntStreamTestScenario(boolean isParallel, boolean isOrdered) { this.isParallel = isParallel; + this.isOrdered = isOrdered; } public StreamShape getShape() { @@ -176,6 +217,10 @@ return isParallel; } + public boolean isOrdered() { + return isOrdered; + } + public <T, U, S_IN extends BaseStream<T, S_IN>, S_OUT extends BaseStream<U, S_OUT>> void run(TestData<T, S_IN> data, Consumer<U> b, Function<S_IN, S_OUT> m) { _run(data, (IntConsumer) b, (Function<S_IN, IntStream>) m);
--- a/test/java/util/stream/bootlib/java/util/stream/LongStreamTestScenario.java Thu Jun 25 16:49:08 2015 -0700 +++ b/test/java/util/stream/bootlib/java/util/stream/LongStreamTestScenario.java Tue Jun 23 09:49:55 2015 +0200 @@ -1,5 +1,5 @@ /* - * Copyright (c) 2013, Oracle and/or its affiliates. All rights reserved. + * Copyright (c) 2013, 2015, Oracle and/or its affiliates. All rights reserved. * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. * * This code is free software; you can redistribute it and/or modify it @@ -22,7 +22,10 @@ */ package java.util.stream; +import java.util.Collections; +import java.util.EnumSet; import java.util.PrimitiveIterator; +import java.util.Set; import java.util.Spliterator; import java.util.function.Consumer; import java.util.function.Function; @@ -159,12 +162,50 @@ for (long t : pipe2.toArray()) b.accept(t); } - },; + }, + + // Wrap as parallel stream + forEach synchronizing + PAR_STREAM_FOR_EACH(true, false) { + <T, S_IN extends BaseStream<T, S_IN>> + void _run(TestData<T, S_IN> data, LongConsumer b, Function<S_IN, LongStream> m) { + m.apply(data.parallelStream()).forEach(e -> { + synchronized (data) { + b.accept(e); + } + }); + } + }, + + // Wrap as parallel stream + forEach synchronizing and clear SIZED flag + PAR_STREAM_FOR_EACH_CLEAR_SIZED(true, false) { + <T, S_IN extends BaseStream<T, S_IN>> + void _run(TestData<T, S_IN> data, LongConsumer b, Function<S_IN, LongStream> m) { + S_IN pipe1 = (S_IN) OpTestCase.chain(data.parallelStream(), + new FlagDeclaringOp(StreamOpFlag.NOT_SIZED, data.getShape())); + m.apply(pipe1).forEach(e -> { + synchronized (data) { + b.accept(e); + } + }); + } + }, + ; + + // The set of scenarios that clean the SIZED flag + public static final Set<LongStreamTestScenario> CLEAR_SIZED_SCENARIOS = Collections.unmodifiableSet( + EnumSet.of(PAR_STREAM_TO_ARRAY_CLEAR_SIZED, PAR_STREAM_FOR_EACH_CLEAR_SIZED)); private boolean isParallel; + private final boolean isOrdered; + LongStreamTestScenario(boolean isParallel) { + this(isParallel, true); + } + + LongStreamTestScenario(boolean isParallel, boolean isOrdered) { this.isParallel = isParallel; + this.isOrdered = isOrdered; } public StreamShape getShape() { @@ -175,6 +216,10 @@ return isParallel; } + public boolean isOrdered() { + return isOrdered; + } + public <T, U, S_IN extends BaseStream<T, S_IN>, S_OUT extends BaseStream<U, S_OUT>> void run(TestData<T, S_IN> data, Consumer<U> b, Function<S_IN, S_OUT> m) { _run(data, (LongConsumer) b, (Function<S_IN, LongStream>) m);
--- a/test/java/util/stream/bootlib/java/util/stream/OpTestCase.java Thu Jun 25 16:49:08 2015 -0700 +++ b/test/java/util/stream/bootlib/java/util/stream/OpTestCase.java Tue Jun 23 09:49:55 2015 +0200 @@ -1,5 +1,5 @@ /* - * Copyright (c) 2012, 2013, Oracle and/or its affiliates. All rights reserved. + * Copyright (c) 2012, 2015, Oracle and/or its affiliates. All rights reserved. * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. * * This code is free software; you can redistribute it and/or modify it @@ -30,6 +30,7 @@ import java.util.Collections; import java.util.EnumMap; import java.util.EnumSet; +import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -91,11 +92,13 @@ boolean isParallel(); - abstract <T, U, S_IN extends BaseStream<T, S_IN>, S_OUT extends BaseStream<U, S_OUT>> + boolean isOrdered(); + + <T, U, S_IN extends BaseStream<T, S_IN>, S_OUT extends BaseStream<U, S_OUT>> void run(TestData<T, S_IN> data, Consumer<U> b, Function<S_IN, S_OUT> m); } - public <T, U, S_IN extends BaseStream<T, S_IN>, S_OUT extends BaseStream<U, S_OUT>> + protected <T, U, S_IN extends BaseStream<T, S_IN>, S_OUT extends BaseStream<U, S_OUT>> Collection<U> exerciseOps(TestData<T, S_IN> data, Function<S_IN, S_OUT> m) { return withData(data).stream(m).exercise(); } @@ -103,7 +106,7 @@ // Run multiple versions of exercise(), returning the result of the first, and asserting that others return the same result // If the first version is s -> s.foo(), can be used with s -> s.mapToInt(i -> i).foo().mapToObj(i -> i) to test all shape variants @SafeVarargs - public final<T, U, S_IN extends BaseStream<T, S_IN>, S_OUT extends BaseStream<U, S_OUT>> + protected final<T, U, S_IN extends BaseStream<T, S_IN>, S_OUT extends BaseStream<U, S_OUT>> Collection<U> exerciseOpsMulti(TestData<T, S_IN> data, Function<S_IN, S_OUT>... ms) { Collection<U> result = null; @@ -121,7 +124,7 @@ // Run multiple versions of exercise() for an Integer stream, returning the result of the first, and asserting that others return the same result // Automates the conversion between Stream<Integer> and {Int,Long,Double}Stream and back, so client sites look like you are passing the same // lambda four times, but in fact they are four different lambdas since they are transforming four different kinds of streams - public final + protected final Collection<Integer> exerciseOpsInt(TestData.OfRef<Integer> data, Function<Stream<Integer>, Stream<Integer>> mRef, Function<IntStream, IntStream> mInt, @@ -136,30 +139,73 @@ return exerciseOpsMulti(data, ms); } - public <T, U, S_OUT extends BaseStream<U, S_OUT>> + // Run multiple versions of exercise() with multiple terminal operations for all kinds of stream, , and asserting against the expected result + // If the first version is s -> s.foo(), can be used with s -> s.mapToInt(i -> i).foo().mapToObj(i -> i) to test all shape variants + protected final<T, U, R, S_IN extends BaseStream<T, S_IN>, S_OUT extends BaseStream<U, S_OUT>> + void exerciseTerminalOpsMulti(TestData<T, S_IN> data, + R expected, + Map<String, Function<S_IN, S_OUT>> streams, + Map<String, Function<S_OUT, R>> terminals) { + for (Map.Entry<String, Function<S_IN, S_OUT>> se : streams.entrySet()) { + setContext("Intermediate stream", se.getKey()); + for (Map.Entry<String, Function<S_OUT, R>> te : terminals.entrySet()) { + setContext("Terminal stream", te.getKey()); + withData(data) + .terminal(se.getValue(), te.getValue()) + .expectedResult(expected) + .exercise(); + + } + } + } + + // Run multiple versions of exercise() with multiple terminal operation for all kinds of stream, and asserting against the expected result + // Automates the conversion between Stream<Integer> and {Int,Long,Double}Stream and back, so client sites look like you are passing the same + // lambda four times, but in fact they are four different lambdas since they are transforming four different kinds of streams + protected final + void exerciseTerminalOpsInt(TestData<Integer, Stream<Integer>> data, + Collection<Integer> expected, + String desc, + Function<Stream<Integer>, Stream<Integer>> mRef, + Function<IntStream, IntStream> mInt, + Function<LongStream, LongStream> mLong, + Function<DoubleStream, DoubleStream> mDouble, + Map<String, Function<Stream<Integer>, Collection<Integer>>> terminals) { + + Map<String, Function<Stream<Integer>, Stream<Integer>>> m = new HashMap<>(); + m.put("Ref " + desc, mRef); + m.put("Int " + desc, s -> mInt.apply(s.mapToInt(e -> e)).mapToObj(e -> e)); + m.put("Long " + desc, s -> mLong.apply(s.mapToLong(e -> e)).mapToObj(e -> (int) e)); + m.put("Double " + desc, s -> mDouble.apply(s.mapToDouble(e -> e)).mapToObj(e -> (int) e)); + + exerciseTerminalOpsMulti(data, expected, m, terminals); + } + + + protected <T, U, S_OUT extends BaseStream<U, S_OUT>> Collection<U> exerciseOps(Collection<T> data, Function<Stream<T>, S_OUT> m) { TestData.OfRef<T> data1 = TestData.Factory.ofCollection("Collection of type " + data.getClass().getName(), data); return withData(data1).stream(m).exercise(); } - public <T, U, S_OUT extends BaseStream<U, S_OUT>, I extends Iterable<U>> + protected <T, U, S_OUT extends BaseStream<U, S_OUT>, I extends Iterable<U>> Collection<U> exerciseOps(Collection<T> data, Function<Stream<T>, S_OUT> m, I expected) { TestData.OfRef<T> data1 = TestData.Factory.ofCollection("Collection of type " + data.getClass().getName(), data); return withData(data1).stream(m).expectedResult(expected).exercise(); } @SuppressWarnings("unchecked") - public <U, S_OUT extends BaseStream<U, S_OUT>> + protected <U, S_OUT extends BaseStream<U, S_OUT>> Collection<U> exerciseOps(int[] data, Function<IntStream, S_OUT> m) { return withData(TestData.Factory.ofArray("int array", data)).stream(m).exercise(); } - public Collection<Integer> exerciseOps(int[] data, Function<IntStream, IntStream> m, int[] expected) { + protected Collection<Integer> exerciseOps(int[] data, Function<IntStream, IntStream> m, int[] expected) { TestData.OfInt data1 = TestData.Factory.ofArray("int array", data); return withData(data1).stream(m).expectedResult(expected).exercise(); } - public <T, S_IN extends BaseStream<T, S_IN>> DataStreamBuilder<T, S_IN> withData(TestData<T, S_IN> data) { + protected <T, S_IN extends BaseStream<T, S_IN>> DataStreamBuilder<T, S_IN> withData(TestData<T, S_IN> data) { Objects.requireNonNull(data); return new DataStreamBuilder<>(data); } @@ -325,19 +371,19 @@ // Build method public Collection<U> exercise() { - final boolean isOrdered; + final boolean isStreamOrdered; if (refResult == null) { // Induce the reference result before.accept(data); S_OUT sOut = m.apply(data.stream()); - isOrdered = StreamOpFlag.ORDERED.isKnown(((AbstractPipeline) sOut).getStreamFlags()); + isStreamOrdered = StreamOpFlag.ORDERED.isKnown(((AbstractPipeline) sOut).getStreamFlags()); Node<U> refNodeResult = ((AbstractPipeline<?, U, ?>) sOut).evaluateToArrayNode(size -> (U[]) new Object[size]); refResult = LambdaTestHelpers.toBoxedList(refNodeResult.spliterator()); after.accept(data); } else { S_OUT sOut = m.apply(data.stream()); - isOrdered = StreamOpFlag.ORDERED.isKnown(((AbstractPipeline) sOut).getStreamFlags()); + isStreamOrdered = StreamOpFlag.ORDERED.isKnown(((AbstractPipeline) sOut).getStreamFlags()); } List<Error> errors = new ArrayList<>(); @@ -348,7 +394,7 @@ List<U> result = new ArrayList<>(); test.run(data, LambdaTestHelpers.<U>toBoxingConsumer(result::add), m); - Runnable asserter = () -> resultAsserter.assertResult(result, refResult, isOrdered, test.isParallel()); + Runnable asserter = () -> resultAsserter.assertResult(result, refResult, isStreamOrdered && test.isOrdered(), test.isParallel()); if (refResult.size() > 1000) { LambdaTestHelpers.launderAssertion( @@ -406,7 +452,7 @@ } @SuppressWarnings({"rawtypes", "unchecked"}) - static enum TerminalTestScenario implements BaseTerminalTestScenario { + enum TerminalTestScenario implements BaseTerminalTestScenario { SINGLE_SEQUENTIAL(true, false), SINGLE_SEQUENTIAL_SHORT_CIRCUIT(true, false) { @@ -546,19 +592,19 @@ } } - public <T, R> R exerciseTerminalOps(Collection<T> data, Function<Stream<T>, R> m, R expected) { + protected <T, R> R exerciseTerminalOps(Collection<T> data, Function<Stream<T>, R> m, R expected) { TestData.OfRef<T> data1 = TestData.Factory.ofCollection("Collection of type " + data.getClass().getName(), data); return withData(data1).terminal(m).expectedResult(expected).exercise(); } - public <T, R, S_IN extends BaseStream<T, S_IN>> R + protected <T, R, S_IN extends BaseStream<T, S_IN>> R exerciseTerminalOps(TestData<T, S_IN> data, Function<S_IN, R> terminalF) { return withData(data).terminal(terminalF).exercise(); } - public <T, U, R, S_IN extends BaseStream<T, S_IN>, S_OUT extends BaseStream<U, S_OUT>> R + protected <T, U, R, S_IN extends BaseStream<T, S_IN>, S_OUT extends BaseStream<U, S_OUT>> R exerciseTerminalOps(TestData<T, S_IN> data, Function<S_IN, S_OUT> streamF, Function<S_OUT, R> terminalF) {
--- a/test/java/util/stream/bootlib/java/util/stream/StreamTestScenario.java Thu Jun 25 16:49:08 2015 -0700 +++ b/test/java/util/stream/bootlib/java/util/stream/StreamTestScenario.java Tue Jun 23 09:49:55 2015 +0200 @@ -1,5 +1,5 @@ /* - * Copyright (c) 2012, 2013, Oracle and/or its affiliates. All rights reserved. + * Copyright (c) 2012, 2015, Oracle and/or its affiliates. All rights reserved. * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. * * This code is free software; you can redistribute it and/or modify it @@ -22,7 +22,10 @@ */ package java.util.stream; +import java.util.Collections; +import java.util.EnumSet; import java.util.Iterator; +import java.util.Set; import java.util.Spliterator; import java.util.function.Consumer; import java.util.function.Function; @@ -173,8 +176,8 @@ } }, - // Wrap as parallel + collect - PAR_STREAM_COLLECT(true) { + // Wrap as parallel + collect to list + PAR_STREAM_COLLECT_TO_LIST(true) { <T, U, S_IN extends BaseStream<T, S_IN>> void _run(TestData<T, S_IN> data, Consumer<U> b, Function<S_IN, Stream<U>> m) { for (U u : m.apply(data.parallelStream()).collect(Collectors.toList())) @@ -182,8 +185,8 @@ } }, - // Wrap sequential as parallel, + collect - STREAM_TO_PAR_STREAM_COLLECT(true) { + // Wrap sequential as parallel, + collect to list + STREAM_TO_PAR_STREAM_COLLECT_TO_LIST(true) { <T, U, S_IN extends BaseStream<T, S_IN>> void _run(TestData<T, S_IN> data, Consumer<U> b, Function<S_IN, Stream<U>> m) { for (U u : m.apply(data.stream().parallel()).collect(Collectors.toList())) @@ -192,19 +195,56 @@ }, // Wrap parallel as sequential,, + collect - PAR_STREAM_TO_STREAM_COLLECT(true) { + PAR_STREAM_TO_STREAM_COLLECT_TO_LIST(true) { <T, U, S_IN extends BaseStream<T, S_IN>> void _run(TestData<T, S_IN> data, Consumer<U> b, Function<S_IN, Stream<U>> m) { for (U u : m.apply(data.parallelStream().sequential()).collect(Collectors.toList())) b.accept(u); } }, + + // Wrap as parallel stream + forEach synchronizing + PAR_STREAM_FOR_EACH(true, false) { + <T, U, S_IN extends BaseStream<T, S_IN>> + void _run(TestData<T, S_IN> data, Consumer<U> b, Function<S_IN, Stream<U>> m) { + m.apply(data.parallelStream()).forEach(e -> { + synchronized (data) { + b.accept(e); + } + }); + } + }, + + // Wrap as parallel stream + forEach synchronizing and clear SIZED flag + PAR_STREAM_FOR_EACH_CLEAR_SIZED(true, false) { + <T, U, S_IN extends BaseStream<T, S_IN>> + void _run(TestData<T, S_IN> data, Consumer<U> b, Function<S_IN, Stream<U>> m) { + S_IN pipe1 = (S_IN) OpTestCase.chain(data.parallelStream(), + new FlagDeclaringOp(StreamOpFlag.NOT_SIZED, data.getShape())); + m.apply(pipe1).forEach(e -> { + synchronized (data) { + b.accept(e); + } + }); + } + }, ; - private boolean isParallel; + // The set of scenarios that clean the SIZED flag + public static final Set<StreamTestScenario> CLEAR_SIZED_SCENARIOS = Collections.unmodifiableSet( + EnumSet.of(PAR_STREAM_TO_ARRAY_CLEAR_SIZED, PAR_STREAM_FOR_EACH_CLEAR_SIZED)); + + private final boolean isParallel; + + private final boolean isOrdered; StreamTestScenario(boolean isParallel) { + this(isParallel, true); + } + + StreamTestScenario(boolean isParallel, boolean isOrdered) { this.isParallel = isParallel; + this.isOrdered = isOrdered; } public StreamShape getShape() { @@ -215,6 +255,10 @@ return isParallel; } + public boolean isOrdered() { + return isOrdered; + } + public <T, U, S_IN extends BaseStream<T, S_IN>, S_OUT extends BaseStream<U, S_OUT>> void run(TestData<T, S_IN> data, Consumer<U> b, Function<S_IN, S_OUT> m) { _run(data, b, (Function<S_IN, Stream<U>>) m);
--- a/test/java/util/stream/boottest/java/util/stream/FlagOpTest.java Thu Jun 25 16:49:08 2015 -0700 +++ b/test/java/util/stream/boottest/java/util/stream/FlagOpTest.java Tue Jun 23 09:49:55 2015 +0200 @@ -1,5 +1,5 @@ /* - * Copyright (c) 2012, 2013, Oracle and/or its affiliates. All rights reserved. + * Copyright (c) 2012, 2015, Oracle and/or its affiliates. All rights reserved. * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. * * This code is free software; you can redistribute it and/or modify it @@ -112,7 +112,7 @@ FlagDeclaringOp[] opsArray = ops.toArray(new FlagDeclaringOp[ops.size()]); withData(data).ops(opsArray). - without(StreamTestScenario.PAR_STREAM_TO_ARRAY_CLEAR_SIZED). + without(StreamTestScenario.CLEAR_SIZED_SCENARIOS). exercise(); } @@ -152,7 +152,7 @@ withData(data).ops(opsArray). - without(StreamTestScenario.PAR_STREAM_TO_ARRAY_CLEAR_SIZED). + without(StreamTestScenario.CLEAR_SIZED_SCENARIOS). exercise(); } @@ -185,7 +185,7 @@ IntermediateTestOp[] opsArray = ops.toArray(new IntermediateTestOp[ops.size()]); withData(data).ops(opsArray). - without(StreamTestScenario.PAR_STREAM_TO_ARRAY_CLEAR_SIZED). + without(StreamTestScenario.CLEAR_SIZED_SCENARIOS). exercise(); } @@ -221,7 +221,7 @@ IntermediateTestOp[] opsArray = ops.toArray(new IntermediateTestOp[ops.size()]); withData(data).ops(opsArray). - without(StreamTestScenario.PAR_STREAM_TO_ARRAY_CLEAR_SIZED). + without(StreamTestScenario.CLEAR_SIZED_SCENARIOS). exercise(); }
--- a/test/java/util/stream/boottest/java/util/stream/UnorderedTest.java Thu Jun 25 16:49:08 2015 -0700 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,265 +0,0 @@ -/* - * Copyright (c) 2013, Oracle and/or its affiliates. All rights reserved. - * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. - * - * This code is free software; you can redistribute it and/or modify it - * under the terms of the GNU General Public License version 2 only, as - * published by the Free Software Foundation. - * - * This code is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License - * version 2 for more details (a copy is included in the LICENSE file that - * accompanied this code). - * - * You should have received a copy of the GNU General Public License version - * 2 along with this work; if not, write to the Free Software Foundation, - * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. - * - * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA - * or visit www.oracle.com if you need additional information or have any - * questions. - */ -package java.util.stream; - -import org.testng.annotations.Test; - -import java.util.Arrays; -import java.util.List; -import java.util.function.BiConsumer; -import java.util.function.Function; -import java.util.function.UnaryOperator; - -@Test -public class UnorderedTest extends OpTestCase { - - @Test(dataProvider = "StreamTestData<Integer>", dataProviderClass = StreamTestDataProvider.class) - public void testTerminalOps(String name, TestData<Integer, Stream<Integer>> data) { - testTerminal(data, s -> { s.forEach(x -> { }); return 0; }); - - testTerminal(data, s -> s.findAny(), (a, b) -> assertEquals(a.isPresent(), b.isPresent())); - - testTerminal(data, s -> s.anyMatch(e -> true)); - } - - - private <T, R> void testTerminal(TestData<T, Stream<T>> data, Function<Stream<T>, R> terminalF) { - testTerminal(data, terminalF, LambdaTestHelpers::assertContentsEqual); - } - - static class WrappingUnaryOperator<S> implements UnaryOperator<S> { - - final boolean isLimit; - final UnaryOperator<S> uo; - - WrappingUnaryOperator(UnaryOperator<S> uo) { - this(uo, false); - } - - WrappingUnaryOperator(UnaryOperator<S> uo, boolean isLimit) { - this.uo = uo; - this.isLimit = isLimit; - } - - @Override - public S apply(S s) { - return uo.apply(s); - } - } - - static <S> WrappingUnaryOperator<S> wrap(UnaryOperator<S> uo) { - return new WrappingUnaryOperator<>(uo); - } - - static <S> WrappingUnaryOperator<S> wrap(UnaryOperator<S> uo, boolean isLimit) { - return new WrappingUnaryOperator<>(uo, isLimit); - } - - @SuppressWarnings("rawtypes") - private List permutationOfFunctions = - LambdaTestHelpers.perm(Arrays.<WrappingUnaryOperator<Stream<Object>>>asList( - wrap(s -> s.sorted()), - wrap(s -> s.distinct()), - wrap(s -> s.limit(5), true) - )); - - @SuppressWarnings("unchecked") - private <T, R> void testTerminal(TestData<T, Stream<T>> data, - Function<Stream<T>, R> terminalF, - BiConsumer<R, R> equalityAsserter) { - testTerminal(data, terminalF, equalityAsserter, permutationOfFunctions, StreamShape.REFERENCE); - } - - // - - @Test(dataProvider = "IntStreamTestData", dataProviderClass = IntStreamTestDataProvider.class) - public void testIntTerminalOps(String name, TestData.OfInt data) { - testIntTerminal(data, s -> { s.forEach(x -> { }); return 0; }); - testIntTerminal(data, s -> s.findAny(), (a, b) -> assertEquals(a.isPresent(), b.isPresent())); - testIntTerminal(data, s -> s.anyMatch(e -> true)); - } - - - private <T, R> void testIntTerminal(TestData.OfInt data, Function<IntStream, R> terminalF) { - testIntTerminal(data, terminalF, LambdaTestHelpers::assertContentsEqual); - } - - private List<List<WrappingUnaryOperator<IntStream>>> intPermutationOfFunctions = - LambdaTestHelpers.perm(Arrays.asList( - wrap(s -> s.sorted()), - wrap(s -> s.distinct()), - wrap(s -> s.limit(5), true) - )); - - private <R> void testIntTerminal(TestData.OfInt data, - Function<IntStream, R> terminalF, - BiConsumer<R, R> equalityAsserter) { - testTerminal(data, terminalF, equalityAsserter, intPermutationOfFunctions, StreamShape.INT_VALUE); - } - - // - - @Test(dataProvider = "LongStreamTestData", dataProviderClass = LongStreamTestDataProvider.class) - public void testLongTerminalOps(String name, TestData.OfLong data) { - testLongTerminal(data, s -> { s.forEach(x -> { }); return 0; }); - testLongTerminal(data, s -> s.findAny(), (a, b) -> assertEquals(a.isPresent(), b.isPresent())); - testLongTerminal(data, s -> s.anyMatch(e -> true)); - } - - - private <T, R> void testLongTerminal(TestData.OfLong data, Function<LongStream, R> terminalF) { - testLongTerminal(data, terminalF, LambdaTestHelpers::assertContentsEqual); - } - - private List<List<WrappingUnaryOperator<LongStream>>> longPermutationOfFunctions = - LambdaTestHelpers.perm(Arrays.asList( - wrap(s -> s.sorted()), - wrap(s -> s.distinct()), - wrap(s -> s.limit(5), true) - )); - - private <R> void testLongTerminal(TestData.OfLong data, - Function<LongStream, R> terminalF, - BiConsumer<R, R> equalityAsserter) { - testTerminal(data, terminalF, equalityAsserter, longPermutationOfFunctions, StreamShape.LONG_VALUE); - } - - // - - @Test(dataProvider = "DoubleStreamTestData", dataProviderClass = DoubleStreamTestDataProvider.class) - public void testDoubleTerminalOps(String name, TestData.OfDouble data) { - testDoubleTerminal(data, s -> { s.forEach(x -> { }); return 0; }); - testDoubleTerminal(data, s -> s.findAny(), (a, b) -> assertEquals(a.isPresent(), b.isPresent())); - testDoubleTerminal(data, s -> s.anyMatch(e -> true)); - } - - - private <T, R> void testDoubleTerminal(TestData.OfDouble data, Function<DoubleStream, R> terminalF) { - testDoubleTerminal(data, terminalF, LambdaTestHelpers::assertContentsEqual); - } - - private List<List<WrappingUnaryOperator<DoubleStream>>> doublePermutationOfFunctions = - LambdaTestHelpers.perm(Arrays.asList( - wrap(s -> s.sorted()), - wrap(s -> s.distinct()), - wrap(s -> s.limit(5), true) - )); - - private <R> void testDoubleTerminal(TestData.OfDouble data, - Function<DoubleStream, R> terminalF, - BiConsumer<R, R> equalityAsserter) { - testTerminal(data, terminalF, equalityAsserter, doublePermutationOfFunctions, StreamShape.DOUBLE_VALUE); - } - - // - - private <T, S extends BaseStream<T, S>, R> void testTerminal(TestData<T, S> data, - Function<S, R> terminalF, - BiConsumer<R, R> equalityAsserter, - List<List<WrappingUnaryOperator<S>>> pFunctions, - StreamShape shape) { - CheckClearOrderedOp<T> checkClearOrderedOp = new CheckClearOrderedOp<>(shape); - for (List<WrappingUnaryOperator<S>> f : pFunctions) { - @SuppressWarnings("unchecked") - UnaryOperator<S> fi = interpose(f, (S s) -> (S) chain(s, checkClearOrderedOp)); - withData(data). - terminal(fi, terminalF). - equalator(equalityAsserter). - exercise(); - } - - CheckSetOrderedOp<T> checkSetOrderedOp = new CheckSetOrderedOp<>(shape); - for (List<WrappingUnaryOperator<S>> f : pFunctions) { - @SuppressWarnings("unchecked") - UnaryOperator<S> fi = interpose(f, (S s) -> (S) chain(s, checkSetOrderedOp)); - withData(data). - terminal(fi, s -> terminalF.apply(s.sequential())). - equalator(equalityAsserter). - exercise(); - } - } - - static class CheckClearOrderedOp<T> implements StatelessTestOp<T, T> { - private final StreamShape shape; - - CheckClearOrderedOp(StreamShape shape) { - this.shape = shape; - } - - @Override - public StreamShape outputShape() { - return shape; - } - - @Override - public StreamShape inputShape() { - return shape; - } - - @Override - public Sink<T> opWrapSink(int flags, boolean parallel, Sink<T> sink) { - if (parallel) { - assertTrue(StreamOpFlag.ORDERED.isCleared(flags)); - } - - return sink; - } - } - - static class CheckSetOrderedOp<T> extends CheckClearOrderedOp<T> { - - CheckSetOrderedOp(StreamShape shape) { - super(shape); - } - - @Override - public Sink<T> opWrapSink(int flags, boolean parallel, Sink<T> sink) { - assertTrue(StreamOpFlag.ORDERED.isKnown(flags) || StreamOpFlag.ORDERED.isPreserved(flags)); - - return sink; - } - } - - private <T, S extends BaseStream<T, S>> - UnaryOperator<S> interpose(List<WrappingUnaryOperator<S>> fs, UnaryOperator<S> fi) { - int l = -1; - for (int i = 0; i < fs.size(); i++) { - if (fs.get(i).isLimit) { - l = i; - } - } - - final int lastLimitIndex = l; - return s -> { - if (lastLimitIndex == -1) - s = fi.apply(s); - for (int i = 0; i < fs.size(); i++) { - s = fs.get(i).apply(s); - if (i >= lastLimitIndex) { - s = fi.apply(s); - } - } - return s; - }; - } -}
--- a/test/java/util/stream/test/org/openjdk/tests/java/util/SplittableRandomTest.java Thu Jun 25 16:49:08 2015 -0700 +++ b/test/java/util/stream/test/org/openjdk/tests/java/util/SplittableRandomTest.java Tue Jun 23 09:49:55 2015 +0200 @@ -1,5 +1,5 @@ /* - * Copyright (c) 2013, Oracle and/or its affiliates. All rights reserved. + * Copyright (c) 2013, 2015, Oracle and/or its affiliates. All rights reserved. * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. * * This code is free software; you can redistribute it and/or modify it @@ -192,7 +192,7 @@ public void testInts(TestData.OfInt data, ResultAsserter<Iterable<Integer>> ra) { withData(data). stream(s -> s). - without(IntStreamTestScenario.PAR_STREAM_TO_ARRAY_CLEAR_SIZED). + without(IntStreamTestScenario.CLEAR_SIZED_SCENARIOS). resultAsserter(ra). exercise(); } @@ -276,7 +276,7 @@ public void testLongs(TestData.OfLong data, ResultAsserter<Iterable<Long>> ra) { withData(data). stream(s -> s). - without(LongStreamTestScenario.PAR_STREAM_TO_ARRAY_CLEAR_SIZED). + without(LongStreamTestScenario.CLEAR_SIZED_SCENARIOS). resultAsserter(ra). exercise(); } @@ -360,7 +360,7 @@ public void testDoubles(TestData.OfDouble data, ResultAsserter<Iterable<Double>> ra) { withData(data). stream(s -> s). - without(DoubleStreamTestScenario.PAR_STREAM_TO_ARRAY_CLEAR_SIZED). + without(DoubleStreamTestScenario.CLEAR_SIZED_SCENARIOS). resultAsserter(ra). exercise(); }
--- a/test/java/util/stream/test/org/openjdk/tests/java/util/stream/DistinctOpTest.java Thu Jun 25 16:49:08 2015 -0700 +++ b/test/java/util/stream/test/org/openjdk/tests/java/util/stream/DistinctOpTest.java Tue Jun 23 09:49:55 2015 +0200 @@ -32,7 +32,16 @@ import java.util.Spliterator; import java.util.Spliterators; import java.util.concurrent.ThreadLocalRandom; -import java.util.stream.*; +import java.util.stream.CollectorOps; +import java.util.stream.Collectors; +import java.util.stream.DoubleStream; +import java.util.stream.IntStream; +import java.util.stream.LongStream; +import java.util.stream.OpTestCase; +import java.util.stream.Stream; +import java.util.stream.StreamSupport; +import java.util.stream.StreamTestDataProvider; +import java.util.stream.TestData; import static java.util.stream.LambdaTestHelpers.*; @@ -67,7 +76,12 @@ @Test(dataProvider = "StreamTestData<Integer>", dataProviderClass = StreamTestDataProvider.class) public void testOp(String name, TestData.OfRef<Integer> data) { - Collection<Integer> result = exerciseOpsInt(data, Stream::distinct, IntStream::distinct, LongStream::distinct, DoubleStream::distinct); + Collection<Integer> result = exerciseOpsInt( + data, + Stream::distinct, + IntStream::distinct, + LongStream::distinct, + DoubleStream::distinct); assertUnique(result); assertTrue((data.size() > 0) ? result.size() > 0 : result.size() == 0); @@ -127,10 +141,14 @@ @Test(dataProvider = "StreamTestData<Integer>", dataProviderClass = StreamTestDataProvider.class) public void testDistinctDistinct(String name, TestData.OfRef<Integer> data) { - Collection<Integer> result = withData(data) - .stream(s -> s.distinct().distinct(), new CollectorOps.TestParallelSizedOp<>()) - .exercise(); - assertUnique(result); + Collection<Integer> result = exerciseOpsInt( + data, + s -> s.distinct().distinct(), + s -> s.distinct().distinct(), + s -> s.distinct().distinct(), + s -> s.distinct().distinct()); + + assertUnique(result); } @Test(dataProvider = "StreamTestData<Integer>", dataProviderClass = StreamTestDataProvider.class) @@ -152,4 +170,31 @@ assertUnique(result); assertSorted(result); } + + @Test + public void testStable() { + // Create N instances of Integer all with the same value + List<Integer> input = IntStream.rangeClosed(0, 1000) + .mapToObj(i -> new Integer(1000)) // explicit construction + .collect(Collectors.toList()); + Integer expectedElement = input.get(0); + TestData<Integer, Stream<Integer>> data = TestData.Factory.ofCollection( + "1000 instances of Integer with the same value", input); + + withData(data) + .stream(Stream::distinct) + .resultAsserter((actual, expected, isOrdered, isParallel) -> { + List<Integer> l = new ArrayList<>(); + actual.forEach(l::add); + + // Assert stability + // The single result element should be equal in identity to + // the first input element + assertEquals(l.size(), 1); + assertEquals(System.identityHashCode(l.get(0)), + System.identityHashCode(expectedElement)); + + }) + .exercise(); + } }
--- a/test/java/util/stream/test/org/openjdk/tests/java/util/stream/InfiniteStreamWithLimitOpTest.java Thu Jun 25 16:49:08 2015 -0700 +++ b/test/java/util/stream/test/org/openjdk/tests/java/util/stream/InfiniteStreamWithLimitOpTest.java Tue Jun 23 09:49:55 2015 +0200 @@ -1,5 +1,5 @@ /* - * Copyright (c) 2012, 2013, Oracle and/or its affiliates. All rights reserved. + * Copyright (c) 2012, 2015, Oracle and/or its affiliates. All rights reserved. * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. * * This code is free software; you can redistribute it and/or modify it @@ -181,7 +181,7 @@ // slice implementations withData(refLongs()). stream(s -> fs.apply(s)). - without(StreamTestScenario.PAR_STREAM_TO_ARRAY_CLEAR_SIZED). + without(StreamTestScenario.CLEAR_SIZED_SCENARIOS). exercise(); } @@ -192,7 +192,7 @@ // slice implementations withData(ints()). stream(s -> fs.apply(s)). - without(IntStreamTestScenario.PAR_STREAM_TO_ARRAY_CLEAR_SIZED). + without(IntStreamTestScenario.CLEAR_SIZED_SCENARIOS). exercise(); } @@ -203,7 +203,7 @@ // slice implementations withData(longs()). stream(s -> fs.apply(s)). - without(LongStreamTestScenario.PAR_STREAM_TO_ARRAY_CLEAR_SIZED). + without(LongStreamTestScenario.CLEAR_SIZED_SCENARIOS). exercise(); } @@ -214,7 +214,7 @@ // slice implementations withData(doubles()). stream(s -> fs.apply(s)). - without(DoubleStreamTestScenario.PAR_STREAM_TO_ARRAY_CLEAR_SIZED). + without(DoubleStreamTestScenario.CLEAR_SIZED_SCENARIOS). exercise(); }