Mercurial > hg > openjdk > lambda > jdk
changeset 7519:5c97c99f2d72
Rename FoldOp to ReduceOp
author | briangoetz |
---|---|
date | Fri, 22 Feb 2013 12:59:59 -0500 |
parents | 9df184b39d16 |
children | 418a74c46029 752249e6e1d9 |
files | src/share/classes/java/util/stream/Collectors.java src/share/classes/java/util/stream/DoublePipeline.java src/share/classes/java/util/stream/FoldOp.java src/share/classes/java/util/stream/IntPipeline.java src/share/classes/java/util/stream/LongPipeline.java src/share/classes/java/util/stream/ReduceOp.java src/share/classes/java/util/stream/ReferencePipeline.java |
diffstat | 7 files changed, 716 insertions(+), 716 deletions(-) [+] |
line wrap: on
line diff
--- a/src/share/classes/java/util/stream/Collectors.java Fri Feb 22 12:33:55 2013 -0500 +++ b/src/share/classes/java/util/stream/Collectors.java Fri Feb 22 12:59:59 2013 -0500 @@ -197,7 +197,7 @@ public static<T> Collector<T,Set<T>> toSet() { - // @@@ Declare that the collector is NOT_ORDERED so the fold op can declare NOT_ORDERED in + // @@@ Declare that the collector is NOT_ORDERED so the reduce op can declare NOT_ORDERED in // the terminal op flags return toCollection(HashSet::new); }
--- a/src/share/classes/java/util/stream/DoublePipeline.java Fri Feb 22 12:33:55 2013 -0500 +++ b/src/share/classes/java/util/stream/DoublePipeline.java Fri Feb 22 12:59:59 2013 -0500 @@ -249,17 +249,17 @@ @Override public double reduce(double identity, DoubleBinaryOperator op) { - return pipeline(FoldOp.makeDouble(identity, op)); + return pipeline(ReduceOp.makeDouble(identity, op)); } @Override public OptionalDouble reduce(DoubleBinaryOperator op) { - return pipeline(FoldOp.makeDouble(op)); + return pipeline(ReduceOp.makeDouble(op)); } @Override public <R> R collect(Collector.OfDouble<R> collector) { - return pipeline(FoldOp.makeDouble(collector)); + return pipeline(ReduceOp.makeDouble(collector)); } @Override
--- a/src/share/classes/java/util/stream/FoldOp.java Fri Feb 22 12:33:55 2013 -0500 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,701 +0,0 @@ -/* - * Copyright (c) 2012, Oracle and/or its affiliates. All rights reserved. - * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. - * - * This code is free software; you can redistribute it and/or modify it - * under the terms of the GNU General Public License version 2 only, as - * published by the Free Software Foundation. Oracle designates this - * particular file as subject to the "Classpath" exception as provided - * by Oracle in the LICENSE file that accompanied this code. - * - * This code is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License - * version 2 for more details (a copy is included in the LICENSE file that - * accompanied this code). - * - * You should have received a copy of the GNU General Public License version - * 2 along with this work; if not, write to the Free Software Foundation, - * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. - * - * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA - * or visit www.oracle.com if you need additional information or have any - * questions. - */ -package java.util.stream; - -import java.util.Optional; -import java.util.OptionalDouble; -import java.util.OptionalInt; -import java.util.OptionalLong; -import java.util.function.BiConsumer; -import java.util.function.BiFunction; -import java.util.function.BinaryOperator; -import java.util.function.DoubleBinaryOperator; -import java.util.function.IntBinaryOperator; -import java.util.function.LongBinaryOperator; -import java.util.function.ObjDoubleConsumer; -import java.util.function.ObjIntConsumer; -import java.util.function.ObjLongConsumer; -import java.util.function.Supplier; - -/** - * A {@code TerminalOp} that evaluates a stream pipeline and sends the output into an {@code AccumulatingSink}, - * which performs a reduce operation - * - * @param <T> The output type of the stream pipeline - * @param <R> The result type of the reducing operation - * @param <S> The type of the {@code AccumulatingSink} - * @since 1.8 - */ -class FoldOp<T, R, S extends OpUtils.AccumulatingSink<T, R, S>> implements TerminalOp<T, R> { - private final Supplier<S> sinkSupplier; - private final StreamShape inputShape; - - /** - * Create a {@code FoldOp} of the specified stream shape which uses the specified {@code Supplier} to create - * accumulating sinks - * @param shape The shape of the stream pipeline - * @param supplier A factory for {@code AccumulatingSinks} - */ - public FoldOp(StreamShape shape, Supplier<S> supplier) { - sinkSupplier = supplier; - inputShape = shape; - } - - /** - * Create a {@code FoldOp} for a reference stream which uses the specified {@code Supplier} to create - * accumulating sinks - * @param supplier A factory for {@code AccumulatingSinks} - */ - public FoldOp(Supplier<S> supplier) { - this(StreamShape.REFERENCE, supplier); - } - - @Override - public StreamShape inputShape() { - return inputShape; - } - - public <S> R evaluateSequential(PipelineHelper<S, T> helper) { - return helper.into(sinkSupplier.get(), helper.sourceSpliterator()).getAndClearState(); - } - - @Override - public <S> R evaluateParallel(PipelineHelper<S, T> helper) { - return OpUtils.parallelReduce(helper, sinkSupplier); - } - - - /** - * State box for a single state element, used as a base class for {@code AccumulatingSink} instances - * @param <U> The type of the state element - */ - private static abstract class Box<U> { - protected U state; - - public void clearState() { - state = null; - } - - public U getAndClearState() { - try { return state; } - finally { state = null; } - } - } - - /** - * State box for an optional state element, used as a base class for {@code AccumulatingSink} instances - * @param <U> The type of the state element - */ - private static abstract class OptionalBox<U> { - protected boolean empty; - protected U state; - - public void begin(long size) { - empty = true; - state = null; - } - - public void clearState() { - empty = true; - state = null; - } - - public Optional<U> getAndClearState() { - try { return empty ? Optional.empty() : Optional.of(state); } - finally { clearState(); } - } - } - - /** - * State box for an int state element, used as a base class for {@code AccumulatingSink} instances - */ - private static abstract class IntBox { - protected int state; - - public void begin(long size) { - state = 0; - } - - public void clearState() { - state = 0; - } - - public Integer getAndClearState() { - try { return state; } - finally { state = 0; } - } - } - - /** - * State box for an optional int state element, used as a base class for {@code AccumulatingSink} instances - */ - private static abstract class OptionalIntBox { - protected boolean empty; - protected int state; - - public void begin(long size) { - empty = true; - state = 0; - } - - public void clearState() { - empty = true; - state = 0; - } - - public OptionalInt getAndClearState() { - try { return empty ? OptionalInt.empty() : OptionalInt.of(state); } - finally { state = 0; } - } - } - - /** - * State box for a long state element, used as a base class for {@code AccumulatingSink} instances - */ - private static abstract class LongBox { - protected long state; - - public void begin(long size) { - state = 0; - } - - public void clearState() { - state = 0; - } - - public Long getAndClearState() { - try { return state; } - finally { state = 0; } - } - } - - /** - * State box for an optional long state element, used as a base class for {@code AccumulatingSink} instances - */ - private static abstract class OptionalLongBox { - protected boolean empty; - protected long state; - - public void begin(long size) { - empty = true; - state = 0; - } - - public void clearState() { - empty = true; - state = 0; - } - - public OptionalLong getAndClearState() { - try { return empty ? OptionalLong.empty() : OptionalLong.of(state); } - finally { state = 0; } - } - } - - /** - * State box for a double state element, used as a base class for {@code AccumulatingSink} instances - */ - private static abstract class DoubleBox { - protected double state; - - public void begin(long size) { - state = 0; - } - - public void clearState() { - state = 0; - } - - public Double getAndClearState() { - try { return state; } - finally { state = 0; } - } - } - - /** - * State box for an optional double state element, used as a base class for {@code AccumulatingSink} instances - */ - private static abstract class OptionalDoubleBox { - protected boolean empty; - protected double state; - - public void begin(long size) { - empty = true; - state = 0; - } - - public void clearState() { - empty = true; - state = 0; - } - - public OptionalDouble getAndClearState() { - try { return empty ? OptionalDouble.empty() : OptionalDouble.of(state); } - finally { state = 0; } - } - } - - /** - * Construct a {@code FoldOp} that implements a functional reduce on reference values - * @param seed The identity element for the reduction - * @param reducer The accumulating function that incorporates an additional input element into the result - * @param combiner The combining function that combines two intermediate results - * @param <T> The type of the input elements - * @param <U> The type of the result - * @return A {@code FoldOp} implementing the reduction - */ - public static<T, U> TerminalOp<T, U> - makeRef(U seed, BiFunction<U, ? super T, U> reducer, BinaryOperator<U> combiner) { - class FoldingSink extends Box<U> implements OpUtils.AccumulatingSink<T, U, FoldingSink>, Sink<T> { - @Override - public void begin(long size) { - state = seed; - } - - @Override - public void accept(T t) { - state = reducer.apply(state, t); - } - - @Override - public void combine(FoldingSink other) { - state = combiner.apply(state, other.state); - } - } - // @@@ Replace inner class suppliers with ctor refs or lambdas pending fix of bug jdk-8005043 - return new FoldOp<>(new Supplier<FoldingSink>() { - @Override - public FoldingSink get() {return new FoldingSink();} - }); - } - - /** - * Construct a {@code FoldOp} that implements a functional reduce on reference values - * producing an optional reference result - * @param operator The reducing function - * @param <T> The type of the input elements, and the type of the result - * @return A {@code FoldOp} implementing the reduction - */ - public static<T> TerminalOp<T, Optional<T>> - makeRef(BinaryOperator<T> operator) { - class FoldingSink extends OptionalBox<T> implements OpUtils.AccumulatingSink<T, Optional<T>, FoldingSink> { - - @Override - public void accept(T t) { - if (empty) { - empty = false; - state = t; - } else { - state = operator.apply(state, t); - } - } - - @Override - public void combine(FoldingSink other) { - if (!other.empty) - accept(other.state); - } - } - return new FoldOp<>(new Supplier<FoldingSink>() { - @Override - public FoldingSink get() {return new FoldingSink();} - }); - } - - /** - * Construct a {@code FoldOp} that implements a mutable reduce on reference values - * @param collector A {@code Collector} defining the reduction - * @param <T> The type of the input elements - * @param <R> The type of the result - * @return A {@code FoldOp} implementing the reduction - */ - public static<T,R> TerminalOp<T, R> - makeRef(Collector<? super T,R> collector) { - Supplier<R> supplier = collector.resultSupplier(); - BiConsumer<R, ? super T> accumulator = collector.accumulator(); - BinaryOperator<R> combiner = collector.combiner(); - class FoldingSink extends Box<R> implements OpUtils.AccumulatingSink<T, R, FoldingSink> { - @Override - public void begin(long size) { - state = supplier.get(); - } - - @Override - public void accept(T t) { - accumulator.accept(state, t); - } - - @Override - public void combine(FoldingSink other) { - state = combiner.apply(state, other.state); - } - } - return new FoldOp<>(new Supplier<FoldingSink>() { - @Override - public FoldingSink get() {return new FoldingSink();} - }); - } - - /** - * Construct a {@code FoldOp} that implements a mutable reduce on reference values - * @param seedFactory A factory to produce a new base accumulator - * @param accumulator A function to incorporate an element into an accumulator - * @param reducer A function to combine an accumulator into another - * @param <T> The type of the input elements - * @param <R> The type of the result - * @return A {@code FoldOp} implementing the reduction - */ - public static<T, R> TerminalOp<T, R> - makeRef(Supplier<R> seedFactory, BiConsumer<R, ? super T> accumulator, BiConsumer<R,R> reducer) { - class FoldingSink extends Box<R> implements OpUtils.AccumulatingSink<T, R, FoldingSink> { - @Override - public void begin(long size) { - state = seedFactory.get(); - } - - @Override - public void accept(T t) { - accumulator.accept(state, t); - } - - @Override - public void combine(FoldingSink other) { - reducer.accept(state, other.state); - } - } - return new FoldOp<>(new Supplier<FoldingSink>() { - @Override - public FoldingSink get() {return new FoldingSink();} - }); - } - - /** - * Construct a {@code FoldOp} that implements a functional reduce on integer values - * @param identity The identity for the combining function - * @param operator The combining function - * @return A {@code FoldOp} implementing the reduction - */ - public static TerminalOp<Integer, Integer> - makeInt(int identity, IntBinaryOperator operator) { - class FoldingSink extends IntBox implements OpUtils.AccumulatingSink<Integer, Integer, FoldingSink>, Sink.OfInt { - @Override - public void begin(long size) { - state = identity; - } - - @Override - public void accept(int t) { - state = operator.applyAsInt(state, t); - } - - @Override - public void combine(FoldingSink other) { - accept(other.state); - } - } - return new FoldOp<>(StreamShape.INT_VALUE, new Supplier<FoldingSink>() { - @Override - public FoldingSink get() { - return new FoldingSink(); - } - }); - } - - /** - * Construct a {@code FoldOp} that implements a functional reduce on integer values, - * producing an optional integer result - * @param operator The combining function - * @return A {@code FoldOp} implementing the reduction - */ - public static TerminalOp<Integer, OptionalInt> - makeInt(IntBinaryOperator operator) { - class FoldingSink extends OptionalIntBox implements OpUtils.AccumulatingSink<Integer, OptionalInt, FoldingSink>, Sink.OfInt { - @Override - public void accept(int t) { - if (empty) { - empty = false; - state = t; - } - else { - state = operator.applyAsInt(state, t); - } - } - - @Override - public void combine(FoldingSink other) { - if (!other.empty) - accept(other.state); - } - } - - return new FoldOp<>(StreamShape.INT_VALUE, new Supplier<FoldingSink>() { - @Override - public FoldingSink get() { - return new FoldingSink(); - } - }); - } - - /** - * Construct a {@code FoldOp} that implements a mutable reduce on integer values - * @param collector A {@code Collector} defining the reduction - * @param <R> The type of the result - * @return A {@code FoldOp} implementing the reduction - */ - public static <R> TerminalOp<Integer, R> - makeInt(Collector.OfInt<R> collector) { - Supplier<R> supplier = collector.resultSupplier(); - ObjIntConsumer<R> accumulator = collector.intAccumulator(); - BinaryOperator<R> combiner = collector.combiner(); - class FoldingSink extends Box<R> implements OpUtils.AccumulatingSink<Integer, R, FoldingSink>, Sink.OfInt { - @Override - public void begin(long size) { - state = supplier.get(); - } - - @Override - public void accept(int t) { - accumulator.accept(state, t); - } - - @Override - public void combine(FoldingSink other) { - state = combiner.apply(state, other.state); - } - } - - return new FoldOp<>(StreamShape.INT_VALUE, new Supplier<FoldingSink>() { - @Override - public FoldingSink get() { - return new FoldingSink(); - } - }); - } - - /** - * Construct a {@code FoldOp} that implements a functional reduce on long values - * @param identity The identity for the combining function - * @param operator The combining function - * @return A {@code FoldOp} implementing the reduction - */ - public static TerminalOp<Long, Long> - makeLong(long identity, LongBinaryOperator operator) { - class FoldingSink extends LongBox implements OpUtils.AccumulatingSink<Long, Long, FoldingSink>, Sink.OfLong { - @Override - public void begin(long size) { - state = identity; - } - - @Override - public void accept(long t) { - state = operator.applyAsLong(state, t); - } - - @Override - public void combine(FoldingSink other) { - accept(other.state); - } - } - return new FoldOp<>(StreamShape.LONG_VALUE, new Supplier<FoldingSink>() { - @Override - public FoldingSink get() { - return new FoldingSink(); - } - }); - } - - /** - * Construct a {@code FoldOp} that implements a functional reduce on long values, producing an optional long result - * @param operator The combining function - * @return A {@code FoldOp} implementing the reduction - */ - public static TerminalOp<Long, OptionalLong> - makeLong(LongBinaryOperator operator) { - class FoldingSink extends OptionalLongBox implements OpUtils.AccumulatingSink<Long, OptionalLong, FoldingSink>, Sink.OfLong { - @Override - public void accept(long t) { - if (empty) { - empty = false; - state = t; - } - else { - state = operator.applyAsLong(state, t); - } - } - - @Override - public void combine(FoldingSink other) { - if (!other.empty) - accept(other.state); - } - } - - return new FoldOp<>(StreamShape.LONG_VALUE, new Supplier<FoldingSink>() { - @Override - public FoldingSink get() { - return new FoldingSink(); - } - }); - } - - /** - * Construct a {@code FoldOp} that implements a mutable reduce on long values - * @param collector A {@code Collector} defining the reduction - * @param <R> The type of the result - * @return A {@code FoldOp} implementing the reduction - */ - public static <R> TerminalOp<Long, R> - makeLong(Collector.OfLong<R> collector) { - Supplier<R> supplier = collector.resultSupplier(); - ObjLongConsumer<R> accumulator = collector.longAccumulator(); - BinaryOperator<R> combiner = collector.combiner(); - class FoldingSink extends Box<R> implements OpUtils.AccumulatingSink<Long, R, FoldingSink>, Sink.OfLong { - @Override - public void begin(long size) { - state = supplier.get(); - } - - @Override - public void accept(long t) { - accumulator.accept(state, t); - } - - @Override - public void combine(FoldingSink other) { - state = combiner.apply(state, other.state); - } - } - - return new FoldOp<>(StreamShape.LONG_VALUE, new Supplier<FoldingSink>() { - @Override - public FoldingSink get() { - return new FoldingSink(); - } - }); - } - - /** - * Construct a {@code FoldOp} that implements a functional reduce on double values - * @param identity The identity for the combining function - * @param operator The combining function - * @return A {@code FoldOp} implementing the reduction - */ - public static TerminalOp<Double, Double> - makeDouble(double identity, DoubleBinaryOperator operator) { - class FoldingSink extends DoubleBox implements OpUtils.AccumulatingSink<Double, Double, FoldingSink>, Sink.OfDouble { - @Override - public void begin(long size) { - state = identity; - } - - @Override - public void accept(double t) { - state = operator.applyAsDouble(state, t); - } - - @Override - public void combine(FoldingSink other) { - accept(other.state); - } - } - return new FoldOp<>(StreamShape.DOUBLE_VALUE, new Supplier<FoldingSink>() { - @Override - public FoldingSink get() { - return new FoldingSink(); - } - }); - } - - /** - * Construct a {@code FoldOp} that implements a functional reduce on double values, - * producing an optional double result - * @param operator The combining function - * @return A {@code FoldOp} implementing the reduction - */ - public static TerminalOp<Double, OptionalDouble> - makeDouble(DoubleBinaryOperator operator) { - class FoldingSink extends OptionalDoubleBox implements OpUtils.AccumulatingSink<Double, OptionalDouble, FoldingSink>, Sink.OfDouble { - @Override - public void accept(double t) { - if (empty) { - empty = false; - state = t; - } - else { - state = operator.applyAsDouble(state, t); - } - } - - @Override - public void combine(FoldingSink other) { - if (!other.empty) - accept(other.state); - } - } - - return new FoldOp<>(StreamShape.DOUBLE_VALUE, new Supplier<FoldingSink>() { - @Override - public FoldingSink get() { - return new FoldingSink(); - } - }); - } - - /** - * Construct a {@code FoldOp} that implements a mutable reduce on double values - * @param collector A {@code Collector} defining the reduction - * @param <R> The type of the result - * @return A {@code FoldOp} implementing the reduction - */ - public static <R> TerminalOp<Double, R> makeDouble(Collector.OfDouble<R> collector) { - Supplier<R> supplier = collector.resultSupplier(); - ObjDoubleConsumer<R> accumulator = collector.doubleAccumulator(); - BinaryOperator<R> combiner = collector.combiner(); - - class FoldingSink extends Box<R> implements OpUtils.AccumulatingSink<Double, R, FoldingSink>, Sink.OfDouble { - @Override - public void begin(long size) { - state = supplier.get(); - } - - @Override - public void accept(double t) { - accumulator.accept(state, t); - } - - @Override - public void combine(FoldingSink other) { - state = combiner.apply(state, other.state); - } - } - - return new FoldOp<>(StreamShape.DOUBLE_VALUE, new Supplier<FoldingSink>() { - @Override - public FoldingSink get() { - return new FoldingSink(); - } - }); - } -}
--- a/src/share/classes/java/util/stream/IntPipeline.java Fri Feb 22 12:33:55 2013 -0500 +++ b/src/share/classes/java/util/stream/IntPipeline.java Fri Feb 22 12:59:59 2013 -0500 @@ -270,17 +270,17 @@ @Override public int reduce(int identity, IntBinaryOperator op) { - return pipeline(FoldOp.makeInt(identity, op)); + return pipeline(ReduceOp.makeInt(identity, op)); } @Override public OptionalInt reduce(IntBinaryOperator op) { - return pipeline(FoldOp.makeInt(op)); + return pipeline(ReduceOp.makeInt(op)); } @Override public <R> R collect(Collector.OfInt<R> collector) { - return pipeline(FoldOp.makeInt(collector)); + return pipeline(ReduceOp.makeInt(collector)); } @Override
--- a/src/share/classes/java/util/stream/LongPipeline.java Fri Feb 22 12:33:55 2013 -0500 +++ b/src/share/classes/java/util/stream/LongPipeline.java Fri Feb 22 12:59:59 2013 -0500 @@ -260,17 +260,17 @@ @Override public long reduce(long identity, LongBinaryOperator op) { - return pipeline(FoldOp.makeLong(identity, op)); + return pipeline(ReduceOp.makeLong(identity, op)); } @Override public OptionalLong reduce(LongBinaryOperator op) { - return pipeline(FoldOp.makeLong(op)); + return pipeline(ReduceOp.makeLong(op)); } @Override public <R> R collect(Collector.OfLong<R> collector) { - return pipeline(FoldOp.makeLong(collector)); + return pipeline(ReduceOp.makeLong(collector)); } @Override
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/share/classes/java/util/stream/ReduceOp.java Fri Feb 22 12:59:59 2013 -0500 @@ -0,0 +1,701 @@ +/* + * Copyright (c) 2012, Oracle and/or its affiliates. All rights reserved. + * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. + * + * This code is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License version 2 only, as + * published by the Free Software Foundation. Oracle designates this + * particular file as subject to the "Classpath" exception as provided + * by Oracle in the LICENSE file that accompanied this code. + * + * This code is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + * version 2 for more details (a copy is included in the LICENSE file that + * accompanied this code). + * + * You should have received a copy of the GNU General Public License version + * 2 along with this work; if not, write to the Free Software Foundation, + * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. + * + * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA + * or visit www.oracle.com if you need additional information or have any + * questions. + */ +package java.util.stream; + +import java.util.Optional; +import java.util.OptionalDouble; +import java.util.OptionalInt; +import java.util.OptionalLong; +import java.util.function.BiConsumer; +import java.util.function.BiFunction; +import java.util.function.BinaryOperator; +import java.util.function.DoubleBinaryOperator; +import java.util.function.IntBinaryOperator; +import java.util.function.LongBinaryOperator; +import java.util.function.ObjDoubleConsumer; +import java.util.function.ObjIntConsumer; +import java.util.function.ObjLongConsumer; +import java.util.function.Supplier; + +/** + * A {@code TerminalOp} that evaluates a stream pipeline and sends the output into an {@code AccumulatingSink}, + * which performs a reduce operation + * + * @param <T> The output type of the stream pipeline + * @param <R> The result type of the reducing operation + * @param <S> The type of the {@code AccumulatingSink} + * @since 1.8 + */ +class ReduceOp<T, R, S extends OpUtils.AccumulatingSink<T, R, S>> implements TerminalOp<T, R> { + private final Supplier<S> sinkSupplier; + private final StreamShape inputShape; + + /** + * Create a {@code ReduceOp} of the specified stream shape which uses the specified {@code Supplier} to create + * accumulating sinks + * @param shape The shape of the stream pipeline + * @param supplier A factory for {@code AccumulatingSinks} + */ + public ReduceOp(StreamShape shape, Supplier<S> supplier) { + sinkSupplier = supplier; + inputShape = shape; + } + + /** + * Create a {@code ReduceOp} for a reference stream which uses the specified {@code Supplier} to create + * accumulating sinks + * @param supplier A factory for {@code AccumulatingSinks} + */ + public ReduceOp(Supplier<S> supplier) { + this(StreamShape.REFERENCE, supplier); + } + + @Override + public StreamShape inputShape() { + return inputShape; + } + + public <S> R evaluateSequential(PipelineHelper<S, T> helper) { + return helper.into(sinkSupplier.get(), helper.sourceSpliterator()).getAndClearState(); + } + + @Override + public <S> R evaluateParallel(PipelineHelper<S, T> helper) { + return OpUtils.parallelReduce(helper, sinkSupplier); + } + + + /** + * State box for a single state element, used as a base class for {@code AccumulatingSink} instances + * @param <U> The type of the state element + */ + private static abstract class Box<U> { + protected U state; + + public void clearState() { + state = null; + } + + public U getAndClearState() { + try { return state; } + finally { state = null; } + } + } + + /** + * State box for an optional state element, used as a base class for {@code AccumulatingSink} instances + * @param <U> The type of the state element + */ + private static abstract class OptionalBox<U> { + protected boolean empty; + protected U state; + + public void begin(long size) { + empty = true; + state = null; + } + + public void clearState() { + empty = true; + state = null; + } + + public Optional<U> getAndClearState() { + try { return empty ? Optional.empty() : Optional.of(state); } + finally { clearState(); } + } + } + + /** + * State box for an int state element, used as a base class for {@code AccumulatingSink} instances + */ + private static abstract class IntBox { + protected int state; + + public void begin(long size) { + state = 0; + } + + public void clearState() { + state = 0; + } + + public Integer getAndClearState() { + try { return state; } + finally { state = 0; } + } + } + + /** + * State box for an optional int state element, used as a base class for {@code AccumulatingSink} instances + */ + private static abstract class OptionalIntBox { + protected boolean empty; + protected int state; + + public void begin(long size) { + empty = true; + state = 0; + } + + public void clearState() { + empty = true; + state = 0; + } + + public OptionalInt getAndClearState() { + try { return empty ? OptionalInt.empty() : OptionalInt.of(state); } + finally { state = 0; } + } + } + + /** + * State box for a long state element, used as a base class for {@code AccumulatingSink} instances + */ + private static abstract class LongBox { + protected long state; + + public void begin(long size) { + state = 0; + } + + public void clearState() { + state = 0; + } + + public Long getAndClearState() { + try { return state; } + finally { state = 0; } + } + } + + /** + * State box for an optional long state element, used as a base class for {@code AccumulatingSink} instances + */ + private static abstract class OptionalLongBox { + protected boolean empty; + protected long state; + + public void begin(long size) { + empty = true; + state = 0; + } + + public void clearState() { + empty = true; + state = 0; + } + + public OptionalLong getAndClearState() { + try { return empty ? OptionalLong.empty() : OptionalLong.of(state); } + finally { state = 0; } + } + } + + /** + * State box for a double state element, used as a base class for {@code AccumulatingSink} instances + */ + private static abstract class DoubleBox { + protected double state; + + public void begin(long size) { + state = 0; + } + + public void clearState() { + state = 0; + } + + public Double getAndClearState() { + try { return state; } + finally { state = 0; } + } + } + + /** + * State box for an optional double state element, used as a base class for {@code AccumulatingSink} instances + */ + private static abstract class OptionalDoubleBox { + protected boolean empty; + protected double state; + + public void begin(long size) { + empty = true; + state = 0; + } + + public void clearState() { + empty = true; + state = 0; + } + + public OptionalDouble getAndClearState() { + try { return empty ? OptionalDouble.empty() : OptionalDouble.of(state); } + finally { state = 0; } + } + } + + /** + * Construct a {@code ReduceOp} that implements a functional reduce on reference values + * @param seed The identity element for the reduction + * @param reducer The accumulating function that incorporates an additional input element into the result + * @param combiner The combining function that combines two intermediate results + * @param <T> The type of the input elements + * @param <U> The type of the result + * @return A {@code ReduceOp} implementing the reduction + */ + public static<T, U> TerminalOp<T, U> + makeRef(U seed, BiFunction<U, ? super T, U> reducer, BinaryOperator<U> combiner) { + class ReducingSink extends Box<U> implements OpUtils.AccumulatingSink<T, U, ReducingSink>, Sink<T> { + @Override + public void begin(long size) { + state = seed; + } + + @Override + public void accept(T t) { + state = reducer.apply(state, t); + } + + @Override + public void combine(ReducingSink other) { + state = combiner.apply(state, other.state); + } + } + // @@@ Replace inner class suppliers with ctor refs or lambdas pending fix of bug jdk-8005043 + return new ReduceOp<>(new Supplier<ReducingSink>() { + @Override + public ReducingSink get() {return new ReducingSink();} + }); + } + + /** + * Construct a {@code ReduceOp} that implements a functional reduce on reference values + * producing an optional reference result + * @param operator The reducing function + * @param <T> The type of the input elements, and the type of the result + * @return A {@code ReduceOp} implementing the reduction + */ + public static<T> TerminalOp<T, Optional<T>> + makeRef(BinaryOperator<T> operator) { + class ReducingSink extends OptionalBox<T> implements OpUtils.AccumulatingSink<T, Optional<T>, ReducingSink> { + + @Override + public void accept(T t) { + if (empty) { + empty = false; + state = t; + } else { + state = operator.apply(state, t); + } + } + + @Override + public void combine(ReducingSink other) { + if (!other.empty) + accept(other.state); + } + } + return new ReduceOp<>(new Supplier<ReducingSink>() { + @Override + public ReducingSink get() {return new ReducingSink();} + }); + } + + /** + * Construct a {@code ReduceOp} that implements a mutable reduce on reference values + * @param collector A {@code Collector} defining the reduction + * @param <T> The type of the input elements + * @param <R> The type of the result + * @return A {@code ReduceOp} implementing the reduction + */ + public static<T,R> TerminalOp<T, R> + makeRef(Collector<? super T,R> collector) { + Supplier<R> supplier = collector.resultSupplier(); + BiConsumer<R, ? super T> accumulator = collector.accumulator(); + BinaryOperator<R> combiner = collector.combiner(); + class ReducingSink extends Box<R> implements OpUtils.AccumulatingSink<T, R, ReducingSink> { + @Override + public void begin(long size) { + state = supplier.get(); + } + + @Override + public void accept(T t) { + accumulator.accept(state, t); + } + + @Override + public void combine(ReducingSink other) { + state = combiner.apply(state, other.state); + } + } + return new ReduceOp<>(new Supplier<ReducingSink>() { + @Override + public ReducingSink get() {return new ReducingSink();} + }); + } + + /** + * Construct a {@code ReduceOp} that implements a mutable reduce on reference values + * @param seedFactory A factory to produce a new base accumulator + * @param accumulator A function to incorporate an element into an accumulator + * @param reducer A function to combine an accumulator into another + * @param <T> The type of the input elements + * @param <R> The type of the result + * @return A {@code ReduceOp} implementing the reduction + */ + public static<T, R> TerminalOp<T, R> + makeRef(Supplier<R> seedFactory, BiConsumer<R, ? super T> accumulator, BiConsumer<R,R> reducer) { + class ReducingSink extends Box<R> implements OpUtils.AccumulatingSink<T, R, ReducingSink> { + @Override + public void begin(long size) { + state = seedFactory.get(); + } + + @Override + public void accept(T t) { + accumulator.accept(state, t); + } + + @Override + public void combine(ReducingSink other) { + reducer.accept(state, other.state); + } + } + return new ReduceOp<>(new Supplier<ReducingSink>() { + @Override + public ReducingSink get() {return new ReducingSink();} + }); + } + + /** + * Construct a {@code ReduceOp} that implements a functional reduce on integer values + * @param identity The identity for the combining function + * @param operator The combining function + * @return A {@code ReduceOp} implementing the reduction + */ + public static TerminalOp<Integer, Integer> + makeInt(int identity, IntBinaryOperator operator) { + class ReducingSink extends IntBox implements OpUtils.AccumulatingSink<Integer, Integer, ReducingSink>, Sink.OfInt { + @Override + public void begin(long size) { + state = identity; + } + + @Override + public void accept(int t) { + state = operator.applyAsInt(state, t); + } + + @Override + public void combine(ReducingSink other) { + accept(other.state); + } + } + return new ReduceOp<>(StreamShape.INT_VALUE, new Supplier<ReducingSink>() { + @Override + public ReducingSink get() { + return new ReducingSink(); + } + }); + } + + /** + * Construct a {@code ReduceOp} that implements a functional reduce on integer values, + * producing an optional integer result + * @param operator The combining function + * @return A {@code ReduceOp} implementing the reduction + */ + public static TerminalOp<Integer, OptionalInt> + makeInt(IntBinaryOperator operator) { + class ReducingSink extends OptionalIntBox implements OpUtils.AccumulatingSink<Integer, OptionalInt, ReducingSink>, Sink.OfInt { + @Override + public void accept(int t) { + if (empty) { + empty = false; + state = t; + } + else { + state = operator.applyAsInt(state, t); + } + } + + @Override + public void combine(ReducingSink other) { + if (!other.empty) + accept(other.state); + } + } + + return new ReduceOp<>(StreamShape.INT_VALUE, new Supplier<ReducingSink>() { + @Override + public ReducingSink get() { + return new ReducingSink(); + } + }); + } + + /** + * Construct a {@code ReduceOp} that implements a mutable reduce on integer values + * @param collector A {@code Collector} defining the reduction + * @param <R> The type of the result + * @return A {@code ReduceOp} implementing the reduction + */ + public static <R> TerminalOp<Integer, R> + makeInt(Collector.OfInt<R> collector) { + Supplier<R> supplier = collector.resultSupplier(); + ObjIntConsumer<R> accumulator = collector.intAccumulator(); + BinaryOperator<R> combiner = collector.combiner(); + class ReducingSink extends Box<R> implements OpUtils.AccumulatingSink<Integer, R, ReducingSink>, Sink.OfInt { + @Override + public void begin(long size) { + state = supplier.get(); + } + + @Override + public void accept(int t) { + accumulator.accept(state, t); + } + + @Override + public void combine(ReducingSink other) { + state = combiner.apply(state, other.state); + } + } + + return new ReduceOp<>(StreamShape.INT_VALUE, new Supplier<ReducingSink>() { + @Override + public ReducingSink get() { + return new ReducingSink(); + } + }); + } + + /** + * Construct a {@code ReduceOp} that implements a functional reduce on long values + * @param identity The identity for the combining function + * @param operator The combining function + * @return A {@code ReduceOp} implementing the reduction + */ + public static TerminalOp<Long, Long> + makeLong(long identity, LongBinaryOperator operator) { + class ReducingSink extends LongBox implements OpUtils.AccumulatingSink<Long, Long, ReducingSink>, Sink.OfLong { + @Override + public void begin(long size) { + state = identity; + } + + @Override + public void accept(long t) { + state = operator.applyAsLong(state, t); + } + + @Override + public void combine(ReducingSink other) { + accept(other.state); + } + } + return new ReduceOp<>(StreamShape.LONG_VALUE, new Supplier<ReducingSink>() { + @Override + public ReducingSink get() { + return new ReducingSink(); + } + }); + } + + /** + * Construct a {@code ReduceOp} that implements a functional reduce on long values, producing an optional long result + * @param operator The combining function + * @return A {@code ReduceOp} implementing the reduction + */ + public static TerminalOp<Long, OptionalLong> + makeLong(LongBinaryOperator operator) { + class ReducingSink extends OptionalLongBox implements OpUtils.AccumulatingSink<Long, OptionalLong, ReducingSink>, Sink.OfLong { + @Override + public void accept(long t) { + if (empty) { + empty = false; + state = t; + } + else { + state = operator.applyAsLong(state, t); + } + } + + @Override + public void combine(ReducingSink other) { + if (!other.empty) + accept(other.state); + } + } + + return new ReduceOp<>(StreamShape.LONG_VALUE, new Supplier<ReducingSink>() { + @Override + public ReducingSink get() { + return new ReducingSink(); + } + }); + } + + /** + * Construct a {@code ReduceOp} that implements a mutable reduce on long values + * @param collector A {@code Collector} defining the reduction + * @param <R> The type of the result + * @return A {@code ReduceOp} implementing the reduction + */ + public static <R> TerminalOp<Long, R> + makeLong(Collector.OfLong<R> collector) { + Supplier<R> supplier = collector.resultSupplier(); + ObjLongConsumer<R> accumulator = collector.longAccumulator(); + BinaryOperator<R> combiner = collector.combiner(); + class ReducingSink extends Box<R> implements OpUtils.AccumulatingSink<Long, R, ReducingSink>, Sink.OfLong { + @Override + public void begin(long size) { + state = supplier.get(); + } + + @Override + public void accept(long t) { + accumulator.accept(state, t); + } + + @Override + public void combine(ReducingSink other) { + state = combiner.apply(state, other.state); + } + } + + return new ReduceOp<>(StreamShape.LONG_VALUE, new Supplier<ReducingSink>() { + @Override + public ReducingSink get() { + return new ReducingSink(); + } + }); + } + + /** + * Construct a {@code ReduceOp} that implements a functional reduce on double values + * @param identity The identity for the combining function + * @param operator The combining function + * @return A {@code ReduceOp} implementing the reduction + */ + public static TerminalOp<Double, Double> + makeDouble(double identity, DoubleBinaryOperator operator) { + class ReducingSink extends DoubleBox implements OpUtils.AccumulatingSink<Double, Double, ReducingSink>, Sink.OfDouble { + @Override + public void begin(long size) { + state = identity; + } + + @Override + public void accept(double t) { + state = operator.applyAsDouble(state, t); + } + + @Override + public void combine(ReducingSink other) { + accept(other.state); + } + } + return new ReduceOp<>(StreamShape.DOUBLE_VALUE, new Supplier<ReducingSink>() { + @Override + public ReducingSink get() { + return new ReducingSink(); + } + }); + } + + /** + * Construct a {@code ReduceOp} that implements a functional reduce on double values, + * producing an optional double result + * @param operator The combining function + * @return A {@code ReduceOp} implementing the reduction + */ + public static TerminalOp<Double, OptionalDouble> + makeDouble(DoubleBinaryOperator operator) { + class ReducingSink extends OptionalDoubleBox implements OpUtils.AccumulatingSink<Double, OptionalDouble, ReducingSink>, Sink.OfDouble { + @Override + public void accept(double t) { + if (empty) { + empty = false; + state = t; + } + else { + state = operator.applyAsDouble(state, t); + } + } + + @Override + public void combine(ReducingSink other) { + if (!other.empty) + accept(other.state); + } + } + + return new ReduceOp<>(StreamShape.DOUBLE_VALUE, new Supplier<ReducingSink>() { + @Override + public ReducingSink get() { + return new ReducingSink(); + } + }); + } + + /** + * Construct a {@code ReduceOp} that implements a mutable reduce on double values + * @param collector A {@code Collector} defining the reduction + * @param <R> The type of the result + * @return A {@code ReduceOp} implementing the reduction + */ + public static <R> TerminalOp<Double, R> makeDouble(Collector.OfDouble<R> collector) { + Supplier<R> supplier = collector.resultSupplier(); + ObjDoubleConsumer<R> accumulator = collector.doubleAccumulator(); + BinaryOperator<R> combiner = collector.combiner(); + + class ReducingSink extends Box<R> implements OpUtils.AccumulatingSink<Double, R, ReducingSink>, Sink.OfDouble { + @Override + public void begin(long size) { + state = supplier.get(); + } + + @Override + public void accept(double t) { + accumulator.accept(state, t); + } + + @Override + public void combine(ReducingSink other) { + state = combiner.apply(state, other.state); + } + } + + return new ReduceOp<>(StreamShape.DOUBLE_VALUE, new Supplier<ReducingSink>() { + @Override + public ReducingSink get() { + return new ReducingSink(); + } + }); + } +}
--- a/src/share/classes/java/util/stream/ReferencePipeline.java Fri Feb 22 12:33:55 2013 -0500 +++ b/src/share/classes/java/util/stream/ReferencePipeline.java Fri Feb 22 12:59:59 2013 -0500 @@ -339,27 +339,27 @@ @Override public U reduce(final U identity, final BinaryOperator<U> reducer) { - return pipeline(FoldOp.makeRef(identity, reducer, reducer)); + return pipeline(ReduceOp.makeRef(identity, reducer, reducer)); } @Override public Optional<U> reduce(BinaryOperator<U> reducer) { - return pipeline(FoldOp.makeRef(reducer)); + return pipeline(ReduceOp.makeRef(reducer)); } @Override public <R> R reduce(R identity, BiFunction<R, ? super U, R> reducer, BinaryOperator<R> combiner) { - return pipeline(FoldOp.makeRef(identity, reducer, combiner)); + return pipeline(ReduceOp.makeRef(identity, reducer, combiner)); } @Override public <R> R collect(Collector<? super U, R> collector) { - return pipeline(FoldOp.makeRef(collector)); + return pipeline(ReduceOp.makeRef(collector)); } @Override public <R> R collect(Supplier<R> resultFactory, BiConsumer<R, ? super U> accumulator, BiConsumer<R, R> combiner) { - return pipeline(FoldOp.makeRef(resultFactory, accumulator, combiner)); + return pipeline(ReduceOp.makeRef(resultFactory, accumulator, combiner)); } @Override