Mercurial > hg > openjdk > lambda > jdk
changeset 7518:9df184b39d16
Fixes for out-of-memory; aggressively null task and spliterator references on task completion, don't retain child tasks at all in ForEach
author | briangoetz |
---|---|
date | Fri, 22 Feb 2013 12:33:55 -0500 |
parents | 1dd294588d0c |
children | 5c97c99f2d72 c08707396c40 |
files | src/share/classes/java/util/stream/AbstractTask.java src/share/classes/java/util/stream/FindOp.java src/share/classes/java/util/stream/NodeUtils.java src/share/classes/java/util/stream/OpUtils.java src/share/classes/java/util/stream/SliceOp.java |
diffstat | 5 files changed, 56 insertions(+), 22 deletions(-) [+] |
line wrap: on
line diff
--- a/src/share/classes/java/util/stream/AbstractTask.java Fri Feb 22 16:45:27 2013 +0000 +++ b/src/share/classes/java/util/stream/AbstractTask.java Fri Feb 22 12:33:55 2013 -0500 @@ -81,7 +81,7 @@ protected final PipelineHelper<P_IN, P_OUT> helper; /** The spliterator for the portion of the input associated with the subtree rooted at this task */ - protected final Spliterator<P_IN> spliterator; + protected Spliterator<P_IN> spliterator; /** Target leaf size */ protected final long targetSize; @@ -259,6 +259,17 @@ } /** + * @inheritDoc + * Clears spliterator and children fields. + * Overriders MUST call {@code super.onCompletion} as the last thing they do if they want these cleared + */ + @Override + public void onCompletion(CountedCompleter<?> caller) { + spliterator = null; + children = null; + } + + /** * Determine if the task can be computed. * * @return true if this task can be computed to either calculate the leaf
--- a/src/share/classes/java/util/stream/FindOp.java Fri Feb 22 16:45:27 2013 +0000 +++ b/src/share/classes/java/util/stream/FindOp.java Fri Feb 22 12:33:55 2013 -0500 @@ -265,6 +265,7 @@ } } } + super.onCompletion(caller); } } }
--- a/src/share/classes/java/util/stream/NodeUtils.java Fri Feb 22 16:45:27 2013 +0000 +++ b/src/share/classes/java/util/stream/NodeUtils.java Fri Feb 22 12:33:55 2013 -0500 @@ -335,6 +335,7 @@ nodes[idx++] = cur.getLocalResult(); setLocalResult(Nodes.node(nodes)); } + super.onCompletion(caller); } } @@ -499,6 +500,7 @@ setLocalResult(Nodes.intNode(nodes)); } + super.onCompletion(caller); } } @@ -660,6 +662,7 @@ setLocalResult(Nodes.longNode(nodes)); } + super.onCompletion(caller); } } @@ -821,6 +824,7 @@ setLocalResult(Nodes.doubleNode(nodes)); } + super.onCompletion(caller); } }
--- a/src/share/classes/java/util/stream/OpUtils.java Fri Feb 22 16:45:27 2013 +0000 +++ b/src/share/classes/java/util/stream/OpUtils.java Fri Feb 22 12:33:55 2013 -0500 @@ -117,38 +117,54 @@ /** A {@code ForkJoinTask} for performing a parallel for-each operation */ - private static class ForEachTask<S, T> extends AbstractTask<S, T, Void, ForEachTask<S, T>> { - // @@@ Extending AbstractTask here is potentially inefficient, since we don't really need to - // keep track of the structure of the computation tree + private static class ForEachTask<S, T> extends CountedCompleter<Void> { + private Spliterator<S> spliterator; private final Sink<S> sink; + private final PipelineHelper<S, T> helper; + private final long targetSize; private ForEachTask(PipelineHelper<S, T> helper, Sink<S> sink) { - super(helper); + super(null); + this.spliterator = helper.sourceSpliterator(); this.sink = sink; + this.targetSize = AbstractTask.suggestTargetSize(spliterator.estimateSize()); + this.helper = helper; } - private ForEachTask(ForEachTask<S, T> parent, Spliterator<S> spliterator, Sink<S> sink) { - super(parent, spliterator); - this.sink = sink; + private ForEachTask(ForEachTask<S, T> parent, Spliterator<S> spliterator) { + super(parent); + this.spliterator = spliterator; + this.sink = parent.sink; + this.targetSize = parent.targetSize; + this.helper = parent.helper; } - @Override - public boolean suggestSplit() { - boolean suggest = super.suggestSplit(); - if (StreamOpFlag.SHORT_CIRCUIT.isKnown(helper.getStreamAndOpFlags())) - suggest = suggest && !sink.cancellationRequested(); - return suggest; + public void compute() { + doCompute(this); } - @Override - protected ForEachTask<S, T> makeChild(Spliterator<S> spliterator) { - return new ForEachTask<>(this, spliterator, sink); - } + private static <S, T> void doCompute(ForEachTask<S, T> task) { + boolean isShortCircuit = StreamOpFlag.SHORT_CIRCUIT.isKnown(task.helper.getStreamAndOpFlags()); + while (true) { + if (isShortCircuit && task.sink.cancellationRequested()) { + task.propagateCompletion(); + task.spliterator = null; + return; + } - @Override - protected Void doLeaf() { - helper.intoWrapped(sink, spliterator); - return null; + Spliterator<S> split = null; + if (!AbstractTask.suggestSplit(task.helper, task.spliterator, task.targetSize) + || (split = task.spliterator.trySplit()) == null) { + task.helper.intoWrapped(task.sink, task.spliterator); + task.propagateCompletion(); + task.spliterator = null; + return; + } + else { + task.addToPendingCount(1); + new ForEachTask<>(task, split).fork(); + } + } } } @@ -190,6 +206,7 @@ } setLocalResult(result); } + super.onCompletion(caller); } } }
--- a/src/share/classes/java/util/stream/SliceOp.java Fri Feb 22 16:45:27 2013 +0000 +++ b/src/share/classes/java/util/stream/SliceOp.java Fri Feb 22 12:33:55 2013 -0500 @@ -312,6 +312,7 @@ if (((SliceTask<S,T>) getRoot()).leftSize() >= targetOffset + targetSize) cancelLaterNodes(); } + // Don't call super.onCompletion(), we don't look at the child nodes until farther up the tree } private long leftSize() {