changeset 7518:9df184b39d16

Fixes for out-of-memory; aggressively null task and spliterator references on task completion, don't retain child tasks at all in ForEach
author briangoetz
date Fri, 22 Feb 2013 12:33:55 -0500
parents 1dd294588d0c
children 5c97c99f2d72 c08707396c40
files src/share/classes/java/util/stream/AbstractTask.java src/share/classes/java/util/stream/FindOp.java src/share/classes/java/util/stream/NodeUtils.java src/share/classes/java/util/stream/OpUtils.java src/share/classes/java/util/stream/SliceOp.java
diffstat 5 files changed, 56 insertions(+), 22 deletions(-) [+]
line wrap: on
line diff
--- a/src/share/classes/java/util/stream/AbstractTask.java	Fri Feb 22 16:45:27 2013 +0000
+++ b/src/share/classes/java/util/stream/AbstractTask.java	Fri Feb 22 12:33:55 2013 -0500
@@ -81,7 +81,7 @@
     protected final PipelineHelper<P_IN, P_OUT> helper;
 
     /** The spliterator for the portion of the input associated with the subtree rooted at this task */
-    protected final Spliterator<P_IN> spliterator;
+    protected Spliterator<P_IN> spliterator;
 
     /** Target leaf size */
     protected final long targetSize;
@@ -259,6 +259,17 @@
     }
 
     /**
+     * @inheritDoc
+     * Clears spliterator and children fields.
+     * Overriders MUST call {@code super.onCompletion} as the last thing they do if they want these cleared
+     */
+    @Override
+    public void onCompletion(CountedCompleter<?> caller) {
+        spliterator = null;
+        children = null;
+    }
+
+    /**
      * Determine if the task can be computed.
      *
      * @return true if this task can be computed to either calculate the leaf
--- a/src/share/classes/java/util/stream/FindOp.java	Fri Feb 22 16:45:27 2013 +0000
+++ b/src/share/classes/java/util/stream/FindOp.java	Fri Feb 22 12:33:55 2013 -0500
@@ -265,6 +265,7 @@
                     }
                 }
             }
+            super.onCompletion(caller);
         }
     }
 }
--- a/src/share/classes/java/util/stream/NodeUtils.java	Fri Feb 22 16:45:27 2013 +0000
+++ b/src/share/classes/java/util/stream/NodeUtils.java	Fri Feb 22 12:33:55 2013 -0500
@@ -335,6 +335,7 @@
                     nodes[idx++] = cur.getLocalResult();
                 setLocalResult(Nodes.node(nodes));
             }
+            super.onCompletion(caller);
         }
     }
 
@@ -499,6 +500,7 @@
 
                 setLocalResult(Nodes.intNode(nodes));
             }
+            super.onCompletion(caller);
         }
     }
 
@@ -660,6 +662,7 @@
 
                 setLocalResult(Nodes.longNode(nodes));
             }
+            super.onCompletion(caller);
         }
     }
 
@@ -821,6 +824,7 @@
 
                 setLocalResult(Nodes.doubleNode(nodes));
             }
+            super.onCompletion(caller);
         }
     }
 
--- a/src/share/classes/java/util/stream/OpUtils.java	Fri Feb 22 16:45:27 2013 +0000
+++ b/src/share/classes/java/util/stream/OpUtils.java	Fri Feb 22 12:33:55 2013 -0500
@@ -117,38 +117,54 @@
 
 
     /** A {@code ForkJoinTask} for performing a parallel for-each operation */
-    private static class ForEachTask<S, T> extends AbstractTask<S, T, Void, ForEachTask<S, T>> {
-        // @@@ Extending AbstractTask here is potentially inefficient, since we don't really need to
-        // keep track of the structure of the computation tree
+    private static class ForEachTask<S, T> extends CountedCompleter<Void> {
+        private Spliterator<S> spliterator;
         private final Sink<S> sink;
+        private final PipelineHelper<S, T> helper;
+        private final long targetSize;
 
         private ForEachTask(PipelineHelper<S, T> helper, Sink<S> sink) {
-            super(helper);
+            super(null);
+            this.spliterator = helper.sourceSpliterator();
             this.sink = sink;
+            this.targetSize = AbstractTask.suggestTargetSize(spliterator.estimateSize());
+            this.helper = helper;
         }
 
-        private ForEachTask(ForEachTask<S, T> parent, Spliterator<S> spliterator, Sink<S> sink) {
-            super(parent, spliterator);
-            this.sink = sink;
+        private ForEachTask(ForEachTask<S, T> parent, Spliterator<S> spliterator) {
+            super(parent);
+            this.spliterator = spliterator;
+            this.sink = parent.sink;
+            this.targetSize = parent.targetSize;
+            this.helper = parent.helper;
         }
 
-        @Override
-        public boolean suggestSplit() {
-            boolean suggest = super.suggestSplit();
-            if (StreamOpFlag.SHORT_CIRCUIT.isKnown(helper.getStreamAndOpFlags()))
-                suggest = suggest && !sink.cancellationRequested();
-            return suggest;
+        public void compute() {
+            doCompute(this);
         }
 
-        @Override
-        protected ForEachTask<S, T> makeChild(Spliterator<S> spliterator) {
-            return new ForEachTask<>(this, spliterator, sink);
-        }
+        private static <S, T> void doCompute(ForEachTask<S, T> task) {
+            boolean isShortCircuit = StreamOpFlag.SHORT_CIRCUIT.isKnown(task.helper.getStreamAndOpFlags());
+            while (true) {
+                if (isShortCircuit && task.sink.cancellationRequested()) {
+                    task.propagateCompletion();
+                    task.spliterator = null;
+                    return;
+                }
 
-        @Override
-        protected Void doLeaf() {
-            helper.intoWrapped(sink, spliterator);
-            return null;
+                Spliterator<S> split = null;
+                if (!AbstractTask.suggestSplit(task.helper, task.spliterator, task.targetSize)
+                    || (split = task.spliterator.trySplit()) == null) {
+                    task.helper.intoWrapped(task.sink, task.spliterator);
+                    task.propagateCompletion();
+                    task.spliterator = null;
+                    return;
+                }
+                else {
+                    task.addToPendingCount(1);
+                    new ForEachTask<>(task, split).fork();
+                }
+            }
         }
     }
 
@@ -190,6 +206,7 @@
                 }
                 setLocalResult(result);
             }
+            super.onCompletion(caller);
         }
     }
 }
--- a/src/share/classes/java/util/stream/SliceOp.java	Fri Feb 22 16:45:27 2013 +0000
+++ b/src/share/classes/java/util/stream/SliceOp.java	Fri Feb 22 12:33:55 2013 -0500
@@ -312,6 +312,7 @@
                 if (((SliceTask<S,T>) getRoot()).leftSize() >= targetOffset + targetSize)
                     cancelLaterNodes();
             }
+            // Don't call super.onCompletion(), we don't look at the child nodes until farther up the tree
         }
 
         private long leftSize() {