Mercurial > hg > openjdk > aarch64-port > jdk
changeset 10013:03913dedfb12
8056249: Improve CompletableFuture resource usage
Reviewed-by: psandoz, chegar, martin
author | dl |
---|---|
date | Fri, 05 Sep 2014 10:48:11 +0200 |
parents | aa400be54fec |
children | ba77067a033a |
files | src/share/classes/java/util/concurrent/CompletableFuture.java src/share/classes/java/util/concurrent/CompletionStage.java |
diffstat | 2 files changed, 1776 insertions(+), 2391 deletions(-) [+] |
line wrap: on
line diff
--- a/src/share/classes/java/util/concurrent/CompletableFuture.java Thu Sep 04 13:00:55 2014 -0700 +++ b/src/share/classes/java/util/concurrent/CompletableFuture.java Fri Sep 05 10:48:11 2014 +0200 @@ -50,7 +50,6 @@ import java.util.concurrent.CancellationException; import java.util.concurrent.CompletionException; import java.util.concurrent.CompletionStage; -import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.LockSupport; /** @@ -77,9 +76,9 @@ * <li>All <em>async</em> methods without an explicit Executor * argument are performed using the {@link ForkJoinPool#commonPool()} * (unless it does not support a parallelism level of at least two, in - * which case, a new Thread is used). To simplify monitoring, - * debugging, and tracking, all generated asynchronous tasks are - * instances of the marker interface {@link + * which case, a new Thread is created to run each task). To simplify + * monitoring, debugging, and tracking, all generated asynchronous + * tasks are instances of the marker interface {@link * AsynchronousCompletionTask}. </li> * * <li>All CompletionStage methods are implemented independently of @@ -113,141 +112,1556 @@ /* * Overview: * - * 1. Non-nullness of field result (set via CAS) indicates done. - * An AltResult is used to box null as a result, as well as to - * hold exceptions. Using a single field makes completion fast - * and simple to detect and trigger, at the expense of a lot of - * encoding and decoding that infiltrates many methods. One minor - * simplification relies on the (static) NIL (to box null results) - * being the only AltResult with a null exception field, so we - * don't usually need explicit comparisons with NIL. The CF - * exception propagation mechanics surrounding decoding rely on - * unchecked casts of decoded results really being unchecked, - * where user type errors are caught at point of use, as is - * currently the case in Java. These are highlighted by using - * SuppressWarnings-annotated temporaries. + * A CompletableFuture may have dependent completion actions, + * collected in a linked stack. It atomically completes by CASing + * a result field, and then pops off and runs those actions. This + * applies across normal vs exceptional outcomes, sync vs async + * actions, binary triggers, and various forms of completions. + * + * Non-nullness of field result (set via CAS) indicates done. An + * AltResult is used to box null as a result, as well as to hold + * exceptions. Using a single field makes completion simple to + * detect and trigger. Encoding and decoding is straightforward + * but adds to the sprawl of trapping and associating exceptions + * with targets. Minor simplifications rely on (static) NIL (to + * box null results) being the only AltResult with a null + * exception field, so we don't usually need explicit comparisons. + * Even though some of the generics casts are unchecked (see + * SuppressWarnings annotations), they are placed to be + * appropriate even if checked. * - * 2. Waiters are held in a Treiber stack similar to the one used - * in FutureTask, Phaser, and SynchronousQueue. See their - * internal documentation for algorithmic details. + * Dependent actions are represented by Completion objects linked + * as Treiber stacks headed by field "stack". There are Completion + * classes for each kind of action, grouped into single-input + * (UniCompletion), two-input (BiCompletion), projected + * (BiCompletions using either (not both) of two inputs), shared + * (CoCompletion, used by the second of two sources), zero-input + * source actions, and Signallers that unblock waiters. Class + * Completion extends ForkJoinTask to enable async execution + * (adding no space overhead because we exploit its "tag" methods + * to maintain claims). It is also declared as Runnable to allow + * usage with arbitrary executors. + * + * Support for each kind of CompletionStage relies on a separate + * class, along with two CompletableFuture methods: + * + * * A Completion class with name X corresponding to function, + * prefaced with "Uni", "Bi", or "Or". Each class contains + * fields for source(s), actions, and dependent. They are + * boringly similar, differing from others only with respect to + * underlying functional forms. We do this so that users don't + * encounter layers of adaptors in common usages. We also + * include "Relay" classes/methods that don't correspond to user + * methods; they copy results from one stage to another. + * + * * Boolean CompletableFuture method x(...) (for example + * uniApply) takes all of the arguments needed to check that an + * action is triggerable, and then either runs the action or + * arranges its async execution by executing its Completion + * argument, if present. The method returns true if known to be + * complete. * - * 3. Completions are also kept in a list/stack, and pulled off - * and run when completion is triggered. (We could even use the - * same stack as for waiters, but would give up the potential - * parallelism obtained because woken waiters help release/run - * others -- see method postComplete). Because post-processing - * may race with direct calls, class Completion opportunistically - * extends AtomicInteger so callers can claim the action via - * compareAndSet(0, 1). The Completion.run methods are all - * written a boringly similar uniform way (that sometimes includes - * unnecessary-looking checks, kept to maintain uniformity). - * There are enough dimensions upon which they differ that - * attempts to factor commonalities while maintaining efficiency - * require more lines of code than they would save. + * * Completion method tryFire(int mode) invokes the associated x + * method with its held arguments, and on success cleans up. + * The mode argument allows tryFire to be called twice (SYNC, + * then ASYNC); the first to screen and trap exceptions while + * arranging to execute, and the second when called from a + * task. (A few classes are not used async so take slightly + * different forms.) The claim() callback suppresses function + * invocation if already claimed by another thread. + * + * * CompletableFuture method xStage(...) is called from a public + * stage method of CompletableFuture x. It screens user + * arguments and invokes and/or creates the stage object. If + * not async and x is already complete, the action is run + * immediately. Otherwise a Completion c is created, pushed to + * x's stack (unless done), and started or triggered via + * c.tryFire. This also covers races possible if x completes + * while pushing. Classes with two inputs (for example BiApply) + * deal with races across both while pushing actions. The + * second completion is a CoCompletion pointing to the first, + * shared so that at most one performs the action. The + * multiple-arity methods allOf and anyOf do this pairwise to + * form trees of completions. + * + * Note that the generic type parameters of methods vary according + * to whether "this" is a source, dependent, or completion. * - * 4. The exported then/and/or methods do support a bit of - * factoring (see doThenApply etc). They must cope with the - * intrinsic races surrounding addition of a dependent action - * versus performing the action directly because the task is - * already complete. For example, a CF may not be complete upon - * entry, so a dependent completion is added, but by the time it - * is added, the target CF is complete, so must be directly - * executed. This is all done while avoiding unnecessary object - * construction in safe-bypass cases. + * Method postComplete is called upon completion unless the target + * is guaranteed not to be observable (i.e., not yet returned or + * linked). Multiple threads can call postComplete, which + * atomically pops each dependent action, and tries to trigger it + * via method tryFire, in NESTED mode. Triggering can propagate + * recursively, so NESTED mode returns its completed dependent (if + * one exists) for further processing by its caller (see method + * postFire). + * + * Blocking methods get() and join() rely on Signaller Completions + * that wake up waiting threads. The mechanics are similar to + * Treiber stack wait-nodes used in FutureTask, Phaser, and + * SynchronousQueue. See their internal documentation for + * algorithmic details. + * + * Without precautions, CompletableFutures would be prone to + * garbage accumulation as chains of Completions build up, each + * pointing back to its sources. So we null out fields as soon as + * possible (see especially method Completion.detach). The + * screening checks needed anyway harmlessly ignore null arguments + * that may have been obtained during races with threads nulling + * out fields. We also try to unlink fired Completions from + * stacks that might never be popped (see method postFire). + * Completion fields need not be declared as final or volatile + * because they are only visible to other threads upon safe + * publication. */ - // preliminaries + volatile Object result; // Either the result or boxed AltResult + volatile Completion stack; // Top of Treiber stack of dependent actions + + final boolean internalComplete(Object r) { // CAS from null to r + return UNSAFE.compareAndSwapObject(this, RESULT, null, r); + } + + final boolean casStack(Completion cmp, Completion val) { + return UNSAFE.compareAndSwapObject(this, STACK, cmp, val); + } + + /** Returns true if successfully pushed c onto stack. */ + final boolean tryPushStack(Completion c) { + Completion h = stack; + lazySetNext(c, h); + return UNSAFE.compareAndSwapObject(this, STACK, h, c); + } + + /** Unconditionally pushes c onto stack, retrying if necessary. */ + final void pushStack(Completion c) { + do {} while (!tryPushStack(c)); + } + + /* ------------- Encoding and decoding outcomes -------------- */ + + static final class AltResult { // See above + final Throwable ex; // null only for NIL + AltResult(Throwable x) { this.ex = x; } + } - static final class AltResult { - final Throwable ex; // null only for NIL - AltResult(Throwable ex) { this.ex = ex; } + /** The encoding of the null value. */ + static final AltResult NIL = new AltResult(null); + + /** Completes with the null value, unless already completed. */ + final boolean completeNull() { + return UNSAFE.compareAndSwapObject(this, RESULT, null, + NIL); + } + + /** Returns the encoding of the given non-exceptional value. */ + final Object encodeValue(T t) { + return (t == null) ? NIL : t; + } + + /** Completes with a non-exceptional result, unless already completed. */ + final boolean completeValue(T t) { + return UNSAFE.compareAndSwapObject(this, RESULT, null, + (t == null) ? NIL : t); + } + + /** + * Returns the encoding of the given (non-null) exception as a + * wrapped CompletionException unless it is one already. + */ + static AltResult encodeThrowable(Throwable x) { + return new AltResult((x instanceof CompletionException) ? x : + new CompletionException(x)); + } + + /** Completes with an exceptional result, unless already completed. */ + final boolean completeThrowable(Throwable x) { + return UNSAFE.compareAndSwapObject(this, RESULT, null, + encodeThrowable(x)); } - static final AltResult NIL = new AltResult(null); + /** + * Returns the encoding of the given (non-null) exception as a + * wrapped CompletionException unless it is one already. May + * return the given Object r (which must have been the result of a + * source future) if it is equivalent, i.e. if this is a simple + * relay of an existing CompletionException. + */ + static Object encodeThrowable(Throwable x, Object r) { + if (!(x instanceof CompletionException)) + x = new CompletionException(x); + else if (r instanceof AltResult && x == ((AltResult)r).ex) + return r; + return new AltResult(x); + } - // Fields + /** + * Completes with the given (non-null) exceptional result as a + * wrapped CompletionException unless it is one already, unless + * already completed. May complete with the given Object r + * (which must have been the result of a source future) if it is + * equivalent, i.e. if this is a simple propagation of an + * existing CompletionException. + */ + final boolean completeThrowable(Throwable x, Object r) { + return UNSAFE.compareAndSwapObject(this, RESULT, null, + encodeThrowable(x, r)); + } + + /** + * Returns the encoding of the given arguments: if the exception + * is non-null, encodes as AltResult. Otherwise uses the given + * value, boxed as NIL if null. + */ + Object encodeOutcome(T t, Throwable x) { + return (x == null) ? (t == null) ? NIL : t : encodeThrowable(x); + } - volatile Object result; // Either the result or boxed AltResult - volatile WaitNode waiters; // Treiber stack of threads blocked on get() - volatile CompletionNode completions; // list (Treiber stack) of completions + /** + * Returns the encoding of a copied outcome; if exceptional, + * rewraps as a CompletionException, else returns argument. + */ + static Object encodeRelay(Object r) { + Throwable x; + return (((r instanceof AltResult) && + (x = ((AltResult)r).ex) != null && + !(x instanceof CompletionException)) ? + new AltResult(new CompletionException(x)) : r); + } + + /** + * Completes with r or a copy of r, unless already completed. + * If exceptional, r is first coerced to a CompletionException. + */ + final boolean completeRelay(Object r) { + return UNSAFE.compareAndSwapObject(this, RESULT, null, + encodeRelay(r)); + } - // Basic utilities for triggering and processing completions + /** + * Reports result using Future.get conventions. + */ + private static <T> T reportGet(Object r) + throws InterruptedException, ExecutionException { + if (r == null) // by convention below, null means interrupted + throw new InterruptedException(); + if (r instanceof AltResult) { + Throwable x, cause; + if ((x = ((AltResult)r).ex) == null) + return null; + if (x instanceof CancellationException) + throw (CancellationException)x; + if ((x instanceof CompletionException) && + (cause = x.getCause()) != null) + x = cause; + throw new ExecutionException(x); + } + @SuppressWarnings("unchecked") T t = (T) r; + return t; + } /** - * Removes and signals all waiting threads and runs all completions. + * Decodes outcome to return result or throw unchecked exception. + */ + private static <T> T reportJoin(Object r) { + if (r instanceof AltResult) { + Throwable x; + if ((x = ((AltResult)r).ex) == null) + return null; + if (x instanceof CancellationException) + throw (CancellationException)x; + if (x instanceof CompletionException) + throw (CompletionException)x; + throw new CompletionException(x); + } + @SuppressWarnings("unchecked") T t = (T) r; + return t; + } + + /* ------------- Async task preliminaries -------------- */ + + /** + * A marker interface identifying asynchronous tasks produced by + * {@code async} methods. This may be useful for monitoring, + * debugging, and tracking asynchronous activities. + * + * @since 1.8 + */ + public static interface AsynchronousCompletionTask { + } + + private static final boolean useCommonPool = + (ForkJoinPool.getCommonPoolParallelism() > 1); + + /** + * Default executor -- ForkJoinPool.commonPool() unless it cannot + * support parallelism. + */ + private static final Executor asyncPool = useCommonPool ? + ForkJoinPool.commonPool() : new ThreadPerTaskExecutor(); + + /** Fallback if ForkJoinPool.commonPool() cannot support parallelism */ + static final class ThreadPerTaskExecutor implements Executor { + public void execute(Runnable r) { new Thread(r).start(); } + } + + /** + * Null-checks user executor argument, and translates uses of + * commonPool to asyncPool in case parallelism disabled. + */ + static Executor screenExecutor(Executor e) { + if (!useCommonPool && e == ForkJoinPool.commonPool()) + return asyncPool; + if (e == null) throw new NullPointerException(); + return e; + } + + // Modes for Completion.tryFire. Signedness matters. + static final int SYNC = 0; + static final int ASYNC = 1; + static final int NESTED = -1; + + /* ------------- Base Completion classes and operations -------------- */ + + @SuppressWarnings("serial") + abstract static class Completion extends ForkJoinTask<Void> + implements Runnable, AsynchronousCompletionTask { + volatile Completion next; // Treiber stack link + + /** + * Performs completion action if triggered, returning a + * dependent that may need propagation, if one exists. + * + * @param mode SYNC, ASYNC, or NESTED + */ + abstract CompletableFuture<?> tryFire(int mode); + + /** Returns true if possibly still triggerable. Used by cleanStack. */ + abstract boolean isLive(); + + public final void run() { tryFire(ASYNC); } + public final boolean exec() { tryFire(ASYNC); return true; } + public final Void getRawResult() { return null; } + public final void setRawResult(Void v) {} + } + + static void lazySetNext(Completion c, Completion next) { + UNSAFE.putOrderedObject(c, NEXT, next); + } + + /** + * Pops and tries to trigger all reachable dependents. Call only + * when known to be done. */ final void postComplete() { - WaitNode q; Thread t; - while ((q = waiters) != null) { - if (UNSAFE.compareAndSwapObject(this, WAITERS, q, q.next) && - (t = q.thread) != null) { - q.thread = null; - LockSupport.unpark(t); + /* + * On each step, variable f holds current dependents to pop + * and run. It is extended along only one path at a time, + * pushing others to avoid unbounded recursion. + */ + CompletableFuture<?> f = this; Completion h; + while ((h = f.stack) != null || + (f != this && (h = (f = this).stack) != null)) { + CompletableFuture<?> d; Completion t; + if (f.casStack(h, t = h.next)) { + if (t != null) { + if (f != this) { + pushStack(h); + continue; + } + h.next = null; // detach + } + f = (d = h.tryFire(NESTED)) == null ? this : d; } } + } - CompletionNode h; Completion c; - while ((h = completions) != null) { - if (UNSAFE.compareAndSwapObject(this, COMPLETIONS, h, h.next) && - (c = h.completion) != null) - c.run(); + /** Traverses stack and unlinks dead Completions. */ + final void cleanStack() { + for (Completion p = null, q = stack; q != null;) { + Completion s = q.next; + if (q.isLive()) { + p = q; + q = s; + } + else if (p == null) { + casStack(q, s); + q = stack; + } + else { + p.next = s; + if (p.isLive()) + q = s; + else { + p = null; // restart + q = stack; + } + } + } + } + + /* ------------- One-input Completions -------------- */ + + /** A Completion with a source, dependent, and executor. */ + @SuppressWarnings("serial") + abstract static class UniCompletion<T,V> extends Completion { + Executor executor; // executor to use (null if none) + CompletableFuture<V> dep; // the dependent to complete + CompletableFuture<T> src; // source for action + + UniCompletion(Executor executor, CompletableFuture<V> dep, + CompletableFuture<T> src) { + this.executor = executor; this.dep = dep; this.src = src; + } + + /** + * Returns true if action can be run. Call only when known to + * be triggerable. Uses FJ tag bit to ensure that only one + * thread claims ownership. If async, starts as task -- a + * later call to tryFire will run action. + */ + final boolean claim() { + Executor e = executor; + if (compareAndSetForkJoinTaskTag((short)0, (short)1)) { + if (e == null) + return true; + executor = null; // disable + e.execute(this); + } + return false; + } + + final boolean isLive() { return dep != null; } + } + + /** Pushes the given completion (if it exists) unless done. */ + final void push(UniCompletion<?,?> c) { + if (c != null) { + while (result == null && !tryPushStack(c)) + lazySetNext(c, null); // clear on failure } } /** - * Triggers completion with the encoding of the given arguments: - * if the exception is non-null, encodes it as a wrapped - * CompletionException unless it is one already. Otherwise uses - * the given result, boxed as NIL if null. + * Post-processing by dependent after successful UniCompletion + * tryFire. Tries to clean stack of source a, and then either runs + * postComplete or returns this to caller, depending on mode. */ - final void internalComplete(T v, Throwable ex) { - if (result == null) - UNSAFE.compareAndSwapObject - (this, RESULT, null, - (ex == null) ? (v == null) ? NIL : v : - new AltResult((ex instanceof CompletionException) ? ex : - new CompletionException(ex))); - postComplete(); // help out even if not triggered + final CompletableFuture<T> postFire(CompletableFuture<?> a, int mode) { + if (a != null && a.stack != null) { + if (mode < 0 || a.result == null) + a.cleanStack(); + else + a.postComplete(); + } + if (result != null && stack != null) { + if (mode < 0) + return this; + else + postComplete(); + } + return null; + } + + @SuppressWarnings("serial") + static final class UniApply<T,V> extends UniCompletion<T,V> { + Function<? super T,? extends V> fn; + UniApply(Executor executor, CompletableFuture<V> dep, + CompletableFuture<T> src, + Function<? super T,? extends V> fn) { + super(executor, dep, src); this.fn = fn; + } + final CompletableFuture<V> tryFire(int mode) { + CompletableFuture<V> d; CompletableFuture<T> a; + if ((d = dep) == null || + !d.uniApply(a = src, fn, mode > 0 ? null : this)) + return null; + dep = null; src = null; fn = null; + return d.postFire(a, mode); + } + } + + final <S> boolean uniApply(CompletableFuture<S> a, + Function<? super S,? extends T> f, + UniApply<S,T> c) { + Object r; Throwable x; + if (a == null || (r = a.result) == null || f == null) + return false; + tryComplete: if (result == null) { + if (r instanceof AltResult) { + if ((x = ((AltResult)r).ex) != null) { + completeThrowable(x, r); + break tryComplete; + } + r = null; + } + try { + if (c != null && !c.claim()) + return false; + @SuppressWarnings("unchecked") S s = (S) r; + completeValue(f.apply(s)); + } catch (Throwable ex) { + completeThrowable(ex); + } + } + return true; + } + + private <V> CompletableFuture<V> uniApplyStage( + Executor e, Function<? super T,? extends V> f) { + if (f == null) throw new NullPointerException(); + CompletableFuture<V> d = new CompletableFuture<V>(); + if (e != null || !d.uniApply(this, f, null)) { + UniApply<T,V> c = new UniApply<T,V>(e, d, this, f); + push(c); + c.tryFire(SYNC); + } + return d; + } + + @SuppressWarnings("serial") + static final class UniAccept<T> extends UniCompletion<T,Void> { + Consumer<? super T> fn; + UniAccept(Executor executor, CompletableFuture<Void> dep, + CompletableFuture<T> src, Consumer<? super T> fn) { + super(executor, dep, src); this.fn = fn; + } + final CompletableFuture<Void> tryFire(int mode) { + CompletableFuture<Void> d; CompletableFuture<T> a; + if ((d = dep) == null || + !d.uniAccept(a = src, fn, mode > 0 ? null : this)) + return null; + dep = null; src = null; fn = null; + return d.postFire(a, mode); + } + } + + final <S> boolean uniAccept(CompletableFuture<S> a, + Consumer<? super S> f, UniAccept<S> c) { + Object r; Throwable x; + if (a == null || (r = a.result) == null || f == null) + return false; + tryComplete: if (result == null) { + if (r instanceof AltResult) { + if ((x = ((AltResult)r).ex) != null) { + completeThrowable(x, r); + break tryComplete; + } + r = null; + } + try { + if (c != null && !c.claim()) + return false; + @SuppressWarnings("unchecked") S s = (S) r; + f.accept(s); + completeNull(); + } catch (Throwable ex) { + completeThrowable(ex); + } + } + return true; + } + + private CompletableFuture<Void> uniAcceptStage(Executor e, + Consumer<? super T> f) { + if (f == null) throw new NullPointerException(); + CompletableFuture<Void> d = new CompletableFuture<Void>(); + if (e != null || !d.uniAccept(this, f, null)) { + UniAccept<T> c = new UniAccept<T>(e, d, this, f); + push(c); + c.tryFire(SYNC); + } + return d; + } + + @SuppressWarnings("serial") + static final class UniRun<T> extends UniCompletion<T,Void> { + Runnable fn; + UniRun(Executor executor, CompletableFuture<Void> dep, + CompletableFuture<T> src, Runnable fn) { + super(executor, dep, src); this.fn = fn; + } + final CompletableFuture<Void> tryFire(int mode) { + CompletableFuture<Void> d; CompletableFuture<T> a; + if ((d = dep) == null || + !d.uniRun(a = src, fn, mode > 0 ? null : this)) + return null; + dep = null; src = null; fn = null; + return d.postFire(a, mode); + } + } + + final boolean uniRun(CompletableFuture<?> a, Runnable f, UniRun<?> c) { + Object r; Throwable x; + if (a == null || (r = a.result) == null || f == null) + return false; + if (result == null) { + if (r instanceof AltResult && (x = ((AltResult)r).ex) != null) + completeThrowable(x, r); + else + try { + if (c != null && !c.claim()) + return false; + f.run(); + completeNull(); + } catch (Throwable ex) { + completeThrowable(ex); + } + } + return true; + } + + private CompletableFuture<Void> uniRunStage(Executor e, Runnable f) { + if (f == null) throw new NullPointerException(); + CompletableFuture<Void> d = new CompletableFuture<Void>(); + if (e != null || !d.uniRun(this, f, null)) { + UniRun<T> c = new UniRun<T>(e, d, this, f); + push(c); + c.tryFire(SYNC); + } + return d; + } + + @SuppressWarnings("serial") + static final class UniWhenComplete<T> extends UniCompletion<T,T> { + BiConsumer<? super T, ? super Throwable> fn; + UniWhenComplete(Executor executor, CompletableFuture<T> dep, + CompletableFuture<T> src, + BiConsumer<? super T, ? super Throwable> fn) { + super(executor, dep, src); this.fn = fn; + } + final CompletableFuture<T> tryFire(int mode) { + CompletableFuture<T> d; CompletableFuture<T> a; + if ((d = dep) == null || + !d.uniWhenComplete(a = src, fn, mode > 0 ? null : this)) + return null; + dep = null; src = null; fn = null; + return d.postFire(a, mode); + } + } + + final boolean uniWhenComplete(CompletableFuture<T> a, + BiConsumer<? super T,? super Throwable> f, + UniWhenComplete<T> c) { + Object r; T t; Throwable x = null; + if (a == null || (r = a.result) == null || f == null) + return false; + if (result == null) { + try { + if (c != null && !c.claim()) + return false; + if (r instanceof AltResult) { + x = ((AltResult)r).ex; + t = null; + } else { + @SuppressWarnings("unchecked") T tr = (T) r; + t = tr; + } + f.accept(t, x); + if (x == null) { + internalComplete(r); + return true; + } + } catch (Throwable ex) { + if (x == null) + x = ex; + } + completeThrowable(x, r); + } + return true; + } + + private CompletableFuture<T> uniWhenCompleteStage( + Executor e, BiConsumer<? super T, ? super Throwable> f) { + if (f == null) throw new NullPointerException(); + CompletableFuture<T> d = new CompletableFuture<T>(); + if (e != null || !d.uniWhenComplete(this, f, null)) { + UniWhenComplete<T> c = new UniWhenComplete<T>(e, d, this, f); + push(c); + c.tryFire(SYNC); + } + return d; + } + + @SuppressWarnings("serial") + static final class UniHandle<T,V> extends UniCompletion<T,V> { + BiFunction<? super T, Throwable, ? extends V> fn; + UniHandle(Executor executor, CompletableFuture<V> dep, + CompletableFuture<T> src, + BiFunction<? super T, Throwable, ? extends V> fn) { + super(executor, dep, src); this.fn = fn; + } + final CompletableFuture<V> tryFire(int mode) { + CompletableFuture<V> d; CompletableFuture<T> a; + if ((d = dep) == null || + !d.uniHandle(a = src, fn, mode > 0 ? null : this)) + return null; + dep = null; src = null; fn = null; + return d.postFire(a, mode); + } } - /** - * If triggered, helps release and/or process completions. - */ - final void helpPostComplete() { - if (result != null) - postComplete(); + final <S> boolean uniHandle(CompletableFuture<S> a, + BiFunction<? super S, Throwable, ? extends T> f, + UniHandle<S,T> c) { + Object r; S s; Throwable x; + if (a == null || (r = a.result) == null || f == null) + return false; + if (result == null) { + try { + if (c != null && !c.claim()) + return false; + if (r instanceof AltResult) { + x = ((AltResult)r).ex; + s = null; + } else { + x = null; + @SuppressWarnings("unchecked") S ss = (S) r; + s = ss; + } + completeValue(f.apply(s, x)); + } catch (Throwable ex) { + completeThrowable(ex); + } + } + return true; + } + + private <V> CompletableFuture<V> uniHandleStage( + Executor e, BiFunction<? super T, Throwable, ? extends V> f) { + if (f == null) throw new NullPointerException(); + CompletableFuture<V> d = new CompletableFuture<V>(); + if (e != null || !d.uniHandle(this, f, null)) { + UniHandle<T,V> c = new UniHandle<T,V>(e, d, this, f); + push(c); + c.tryFire(SYNC); + } + return d; + } + + @SuppressWarnings("serial") + static final class UniExceptionally<T> extends UniCompletion<T,T> { + Function<? super Throwable, ? extends T> fn; + UniExceptionally(CompletableFuture<T> dep, CompletableFuture<T> src, + Function<? super Throwable, ? extends T> fn) { + super(null, dep, src); this.fn = fn; + } + final CompletableFuture<T> tryFire(int mode) { // never ASYNC + // assert mode != ASYNC; + CompletableFuture<T> d; CompletableFuture<T> a; + if ((d = dep) == null || !d.uniExceptionally(a = src, fn, this)) + return null; + dep = null; src = null; fn = null; + return d.postFire(a, mode); + } + } + + final boolean uniExceptionally(CompletableFuture<T> a, + Function<? super Throwable, ? extends T> f, + UniExceptionally<T> c) { + Object r; Throwable x; + if (a == null || (r = a.result) == null || f == null) + return false; + if (result == null) { + try { + if (r instanceof AltResult && (x = ((AltResult)r).ex) != null) { + if (c != null && !c.claim()) + return false; + completeValue(f.apply(x)); + } else + internalComplete(r); + } catch (Throwable ex) { + completeThrowable(ex); + } + } + return true; + } + + private CompletableFuture<T> uniExceptionallyStage( + Function<Throwable, ? extends T> f) { + if (f == null) throw new NullPointerException(); + CompletableFuture<T> d = new CompletableFuture<T>(); + if (!d.uniExceptionally(this, f, null)) { + UniExceptionally<T> c = new UniExceptionally<T>(d, this, f); + push(c); + c.tryFire(SYNC); + } + return d; + } + + @SuppressWarnings("serial") + static final class UniRelay<T> extends UniCompletion<T,T> { // for Compose + UniRelay(CompletableFuture<T> dep, CompletableFuture<T> src) { + super(null, dep, src); + } + final CompletableFuture<T> tryFire(int mode) { + CompletableFuture<T> d; CompletableFuture<T> a; + if ((d = dep) == null || !d.uniRelay(a = src)) + return null; + src = null; dep = null; + return d.postFire(a, mode); + } + } + + final boolean uniRelay(CompletableFuture<T> a) { + Object r; + if (a == null || (r = a.result) == null) + return false; + if (result == null) // no need to claim + completeRelay(r); + return true; + } + + @SuppressWarnings("serial") + static final class UniCompose<T,V> extends UniCompletion<T,V> { + Function<? super T, ? extends CompletionStage<V>> fn; + UniCompose(Executor executor, CompletableFuture<V> dep, + CompletableFuture<T> src, + Function<? super T, ? extends CompletionStage<V>> fn) { + super(executor, dep, src); this.fn = fn; + } + final CompletableFuture<V> tryFire(int mode) { + CompletableFuture<V> d; CompletableFuture<T> a; + if ((d = dep) == null || + !d.uniCompose(a = src, fn, mode > 0 ? null : this)) + return null; + dep = null; src = null; fn = null; + return d.postFire(a, mode); + } + } + + final <S> boolean uniCompose( + CompletableFuture<S> a, + Function<? super S, ? extends CompletionStage<T>> f, + UniCompose<S,T> c) { + Object r; Throwable x; + if (a == null || (r = a.result) == null || f == null) + return false; + tryComplete: if (result == null) { + if (r instanceof AltResult) { + if ((x = ((AltResult)r).ex) != null) { + completeThrowable(x, r); + break tryComplete; + } + r = null; + } + try { + if (c != null && !c.claim()) + return false; + @SuppressWarnings("unchecked") S s = (S) r; + CompletableFuture<T> g = f.apply(s).toCompletableFuture(); + if (g.result == null || !uniRelay(g)) { + UniRelay<T> copy = new UniRelay<T>(this, g); + g.push(copy); + copy.tryFire(SYNC); + if (result == null) + return false; + } + } catch (Throwable ex) { + completeThrowable(ex); + } + } + return true; + } + + private <V> CompletableFuture<V> uniComposeStage( + Executor e, Function<? super T, ? extends CompletionStage<V>> f) { + if (f == null) throw new NullPointerException(); + Object r; Throwable x; + if (e == null && (r = result) != null) { + // try to return function result directly + if (r instanceof AltResult) { + if ((x = ((AltResult)r).ex) != null) { + return new CompletableFuture<V>(encodeThrowable(x, r)); + } + r = null; + } + try { + @SuppressWarnings("unchecked") T t = (T) r; + return f.apply(t).toCompletableFuture(); + } catch (Throwable ex) { + return new CompletableFuture<V>(encodeThrowable(ex)); + } + } + CompletableFuture<V> d = new CompletableFuture<V>(); + UniCompose<T,V> c = new UniCompose<T,V>(e, d, this, f); + push(c); + c.tryFire(SYNC); + return d; + } + + /* ------------- Two-input Completions -------------- */ + + /** A Completion for an action with two sources */ + @SuppressWarnings("serial") + abstract static class BiCompletion<T,U,V> extends UniCompletion<T,V> { + CompletableFuture<U> snd; // second source for action + BiCompletion(Executor executor, CompletableFuture<V> dep, + CompletableFuture<T> src, CompletableFuture<U> snd) { + super(executor, dep, src); this.snd = snd; + } + } + + /** A Completion delegating to a BiCompletion */ + @SuppressWarnings("serial") + static final class CoCompletion extends Completion { + BiCompletion<?,?,?> base; + CoCompletion(BiCompletion<?,?,?> base) { this.base = base; } + final CompletableFuture<?> tryFire(int mode) { + BiCompletion<?,?,?> c; CompletableFuture<?> d; + if ((c = base) == null || (d = c.tryFire(mode)) == null) + return null; + base = null; // detach + return d; + } + final boolean isLive() { + BiCompletion<?,?,?> c; + return (c = base) != null && c.dep != null; + } + } + + /** Pushes completion to this and b unless both done. */ + final void bipush(CompletableFuture<?> b, BiCompletion<?,?,?> c) { + if (c != null) { + Object r; + while ((r = result) == null && !tryPushStack(c)) + lazySetNext(c, null); // clear on failure + if (b != null && b != this && b.result == null) { + Completion q = (r != null) ? c : new CoCompletion(c); + while (b.result == null && !b.tryPushStack(q)) + lazySetNext(q, null); // clear on failure + } + } + } + + /** Post-processing after successful BiCompletion tryFire. */ + final CompletableFuture<T> postFire(CompletableFuture<?> a, + CompletableFuture<?> b, int mode) { + if (b != null && b.stack != null) { // clean second source + if (mode < 0 || b.result == null) + b.cleanStack(); + else + b.postComplete(); + } + return postFire(a, mode); + } + + @SuppressWarnings("serial") + static final class BiApply<T,U,V> extends BiCompletion<T,U,V> { + BiFunction<? super T,? super U,? extends V> fn; + BiApply(Executor executor, CompletableFuture<V> dep, + CompletableFuture<T> src, CompletableFuture<U> snd, + BiFunction<? super T,? super U,? extends V> fn) { + super(executor, dep, src, snd); this.fn = fn; + } + final CompletableFuture<V> tryFire(int mode) { + CompletableFuture<V> d; + CompletableFuture<T> a; + CompletableFuture<U> b; + if ((d = dep) == null || + !d.biApply(a = src, b = snd, fn, mode > 0 ? null : this)) + return null; + dep = null; src = null; snd = null; fn = null; + return d.postFire(a, b, mode); + } } - /* ------------- waiting for completions -------------- */ + final <R,S> boolean biApply(CompletableFuture<R> a, + CompletableFuture<S> b, + BiFunction<? super R,? super S,? extends T> f, + BiApply<R,S,T> c) { + Object r, s; Throwable x; + if (a == null || (r = a.result) == null || + b == null || (s = b.result) == null || f == null) + return false; + tryComplete: if (result == null) { + if (r instanceof AltResult) { + if ((x = ((AltResult)r).ex) != null) { + completeThrowable(x, r); + break tryComplete; + } + r = null; + } + if (s instanceof AltResult) { + if ((x = ((AltResult)s).ex) != null) { + completeThrowable(x, s); + break tryComplete; + } + s = null; + } + try { + if (c != null && !c.claim()) + return false; + @SuppressWarnings("unchecked") R rr = (R) r; + @SuppressWarnings("unchecked") S ss = (S) s; + completeValue(f.apply(rr, ss)); + } catch (Throwable ex) { + completeThrowable(ex); + } + } + return true; + } + + private <U,V> CompletableFuture<V> biApplyStage( + Executor e, CompletionStage<U> o, + BiFunction<? super T,? super U,? extends V> f) { + CompletableFuture<U> b; + if (f == null || (b = o.toCompletableFuture()) == null) + throw new NullPointerException(); + CompletableFuture<V> d = new CompletableFuture<V>(); + if (e != null || !d.biApply(this, b, f, null)) { + BiApply<T,U,V> c = new BiApply<T,U,V>(e, d, this, b, f); + bipush(b, c); + c.tryFire(SYNC); + } + return d; + } + + @SuppressWarnings("serial") + static final class BiAccept<T,U> extends BiCompletion<T,U,Void> { + BiConsumer<? super T,? super U> fn; + BiAccept(Executor executor, CompletableFuture<Void> dep, + CompletableFuture<T> src, CompletableFuture<U> snd, + BiConsumer<? super T,? super U> fn) { + super(executor, dep, src, snd); this.fn = fn; + } + final CompletableFuture<Void> tryFire(int mode) { + CompletableFuture<Void> d; + CompletableFuture<T> a; + CompletableFuture<U> b; + if ((d = dep) == null || + !d.biAccept(a = src, b = snd, fn, mode > 0 ? null : this)) + return null; + dep = null; src = null; snd = null; fn = null; + return d.postFire(a, b, mode); + } + } + + final <R,S> boolean biAccept(CompletableFuture<R> a, + CompletableFuture<S> b, + BiConsumer<? super R,? super S> f, + BiAccept<R,S> c) { + Object r, s; Throwable x; + if (a == null || (r = a.result) == null || + b == null || (s = b.result) == null || f == null) + return false; + tryComplete: if (result == null) { + if (r instanceof AltResult) { + if ((x = ((AltResult)r).ex) != null) { + completeThrowable(x, r); + break tryComplete; + } + r = null; + } + if (s instanceof AltResult) { + if ((x = ((AltResult)s).ex) != null) { + completeThrowable(x, s); + break tryComplete; + } + s = null; + } + try { + if (c != null && !c.claim()) + return false; + @SuppressWarnings("unchecked") R rr = (R) r; + @SuppressWarnings("unchecked") S ss = (S) s; + f.accept(rr, ss); + completeNull(); + } catch (Throwable ex) { + completeThrowable(ex); + } + } + return true; + } + + private <U> CompletableFuture<Void> biAcceptStage( + Executor e, CompletionStage<U> o, + BiConsumer<? super T,? super U> f) { + CompletableFuture<U> b; + if (f == null || (b = o.toCompletableFuture()) == null) + throw new NullPointerException(); + CompletableFuture<Void> d = new CompletableFuture<Void>(); + if (e != null || !d.biAccept(this, b, f, null)) { + BiAccept<T,U> c = new BiAccept<T,U>(e, d, this, b, f); + bipush(b, c); + c.tryFire(SYNC); + } + return d; + } - /** Number of processors, for spin control */ - static final int NCPU = Runtime.getRuntime().availableProcessors(); + @SuppressWarnings("serial") + static final class BiRun<T,U> extends BiCompletion<T,U,Void> { + Runnable fn; + BiRun(Executor executor, CompletableFuture<Void> dep, + CompletableFuture<T> src, + CompletableFuture<U> snd, + Runnable fn) { + super(executor, dep, src, snd); this.fn = fn; + } + final CompletableFuture<Void> tryFire(int mode) { + CompletableFuture<Void> d; + CompletableFuture<T> a; + CompletableFuture<U> b; + if ((d = dep) == null || + !d.biRun(a = src, b = snd, fn, mode > 0 ? null : this)) + return null; + dep = null; src = null; snd = null; fn = null; + return d.postFire(a, b, mode); + } + } + + final boolean biRun(CompletableFuture<?> a, CompletableFuture<?> b, + Runnable f, BiRun<?,?> c) { + Object r, s; Throwable x; + if (a == null || (r = a.result) == null || + b == null || (s = b.result) == null || f == null) + return false; + if (result == null) { + if (r instanceof AltResult && (x = ((AltResult)r).ex) != null) + completeThrowable(x, r); + else if (s instanceof AltResult && (x = ((AltResult)s).ex) != null) + completeThrowable(x, s); + else + try { + if (c != null && !c.claim()) + return false; + f.run(); + completeNull(); + } catch (Throwable ex) { + completeThrowable(ex); + } + } + return true; + } + + private CompletableFuture<Void> biRunStage(Executor e, CompletionStage<?> o, + Runnable f) { + CompletableFuture<?> b; + if (f == null || (b = o.toCompletableFuture()) == null) + throw new NullPointerException(); + CompletableFuture<Void> d = new CompletableFuture<Void>(); + if (e != null || !d.biRun(this, b, f, null)) { + BiRun<T,?> c = new BiRun<>(e, d, this, b, f); + bipush(b, c); + c.tryFire(SYNC); + } + return d; + } + + @SuppressWarnings("serial") + static final class BiRelay<T,U> extends BiCompletion<T,U,Void> { // for And + BiRelay(CompletableFuture<Void> dep, + CompletableFuture<T> src, + CompletableFuture<U> snd) { + super(null, dep, src, snd); + } + final CompletableFuture<Void> tryFire(int mode) { + CompletableFuture<Void> d; + CompletableFuture<T> a; + CompletableFuture<U> b; + if ((d = dep) == null || !d.biRelay(a = src, b = snd)) + return null; + src = null; snd = null; dep = null; + return d.postFire(a, b, mode); + } + } + + boolean biRelay(CompletableFuture<?> a, CompletableFuture<?> b) { + Object r, s; Throwable x; + if (a == null || (r = a.result) == null || + b == null || (s = b.result) == null) + return false; + if (result == null) { + if (r instanceof AltResult && (x = ((AltResult)r).ex) != null) + completeThrowable(x, r); + else if (s instanceof AltResult && (x = ((AltResult)s).ex) != null) + completeThrowable(x, s); + else + completeNull(); + } + return true; + } + + /** Recursively constructs a tree of completions. */ + static CompletableFuture<Void> andTree(CompletableFuture<?>[] cfs, + int lo, int hi) { + CompletableFuture<Void> d = new CompletableFuture<Void>(); + if (lo > hi) // empty + d.result = NIL; + else { + CompletableFuture<?> a, b; + int mid = (lo + hi) >>> 1; + if ((a = (lo == mid ? cfs[lo] : + andTree(cfs, lo, mid))) == null || + (b = (lo == hi ? a : (hi == mid+1) ? cfs[hi] : + andTree(cfs, mid+1, hi))) == null) + throw new NullPointerException(); + if (!d.biRelay(a, b)) { + BiRelay<?,?> c = new BiRelay<>(d, a, b); + a.bipush(b, c); + c.tryFire(SYNC); + } + } + return d; + } + + /* ------------- Projected (Ored) BiCompletions -------------- */ + + /** Pushes completion to this and b unless either done. */ + final void orpush(CompletableFuture<?> b, BiCompletion<?,?,?> c) { + if (c != null) { + while ((b == null || b.result == null) && result == null) { + if (tryPushStack(c)) { + if (b != null && b != this && b.result == null) { + Completion q = new CoCompletion(c); + while (result == null && b.result == null && + !b.tryPushStack(q)) + lazySetNext(q, null); // clear on failure + } + break; + } + lazySetNext(c, null); // clear on failure + } + } + } + + @SuppressWarnings("serial") + static final class OrApply<T,U extends T,V> extends BiCompletion<T,U,V> { + Function<? super T,? extends V> fn; + OrApply(Executor executor, CompletableFuture<V> dep, + CompletableFuture<T> src, + CompletableFuture<U> snd, + Function<? super T,? extends V> fn) { + super(executor, dep, src, snd); this.fn = fn; + } + final CompletableFuture<V> tryFire(int mode) { + CompletableFuture<V> d; + CompletableFuture<T> a; + CompletableFuture<U> b; + if ((d = dep) == null || + !d.orApply(a = src, b = snd, fn, mode > 0 ? null : this)) + return null; + dep = null; src = null; snd = null; fn = null; + return d.postFire(a, b, mode); + } + } - /** - * Heuristic spin value for waitingGet() before blocking on - * multiprocessors - */ - static final int SPINS = (NCPU > 1) ? 1 << 8 : 0; + final <R,S extends R> boolean orApply(CompletableFuture<R> a, + CompletableFuture<S> b, + Function<? super R, ? extends T> f, + OrApply<R,S,T> c) { + Object r; Throwable x; + if (a == null || b == null || + ((r = a.result) == null && (r = b.result) == null) || f == null) + return false; + tryComplete: if (result == null) { + try { + if (c != null && !c.claim()) + return false; + if (r instanceof AltResult) { + if ((x = ((AltResult)r).ex) != null) { + completeThrowable(x, r); + break tryComplete; + } + r = null; + } + @SuppressWarnings("unchecked") R rr = (R) r; + completeValue(f.apply(rr)); + } catch (Throwable ex) { + completeThrowable(ex); + } + } + return true; + } + + private <U extends T,V> CompletableFuture<V> orApplyStage( + Executor e, CompletionStage<U> o, + Function<? super T, ? extends V> f) { + CompletableFuture<U> b; + if (f == null || (b = o.toCompletableFuture()) == null) + throw new NullPointerException(); + CompletableFuture<V> d = new CompletableFuture<V>(); + if (e != null || !d.orApply(this, b, f, null)) { + OrApply<T,U,V> c = new OrApply<T,U,V>(e, d, this, b, f); + orpush(b, c); + c.tryFire(SYNC); + } + return d; + } + + @SuppressWarnings("serial") + static final class OrAccept<T,U extends T> extends BiCompletion<T,U,Void> { + Consumer<? super T> fn; + OrAccept(Executor executor, CompletableFuture<Void> dep, + CompletableFuture<T> src, + CompletableFuture<U> snd, + Consumer<? super T> fn) { + super(executor, dep, src, snd); this.fn = fn; + } + final CompletableFuture<Void> tryFire(int mode) { + CompletableFuture<Void> d; + CompletableFuture<T> a; + CompletableFuture<U> b; + if ((d = dep) == null || + !d.orAccept(a = src, b = snd, fn, mode > 0 ? null : this)) + return null; + dep = null; src = null; snd = null; fn = null; + return d.postFire(a, b, mode); + } + } + + final <R,S extends R> boolean orAccept(CompletableFuture<R> a, + CompletableFuture<S> b, + Consumer<? super R> f, + OrAccept<R,S> c) { + Object r; Throwable x; + if (a == null || b == null || + ((r = a.result) == null && (r = b.result) == null) || f == null) + return false; + tryComplete: if (result == null) { + try { + if (c != null && !c.claim()) + return false; + if (r instanceof AltResult) { + if ((x = ((AltResult)r).ex) != null) { + completeThrowable(x, r); + break tryComplete; + } + r = null; + } + @SuppressWarnings("unchecked") R rr = (R) r; + f.accept(rr); + completeNull(); + } catch (Throwable ex) { + completeThrowable(ex); + } + } + return true; + } + + private <U extends T> CompletableFuture<Void> orAcceptStage( + Executor e, CompletionStage<U> o, Consumer<? super T> f) { + CompletableFuture<U> b; + if (f == null || (b = o.toCompletableFuture()) == null) + throw new NullPointerException(); + CompletableFuture<Void> d = new CompletableFuture<Void>(); + if (e != null || !d.orAccept(this, b, f, null)) { + OrAccept<T,U> c = new OrAccept<T,U>(e, d, this, b, f); + orpush(b, c); + c.tryFire(SYNC); + } + return d; + } + + @SuppressWarnings("serial") + static final class OrRun<T,U> extends BiCompletion<T,U,Void> { + Runnable fn; + OrRun(Executor executor, CompletableFuture<Void> dep, + CompletableFuture<T> src, + CompletableFuture<U> snd, + Runnable fn) { + super(executor, dep, src, snd); this.fn = fn; + } + final CompletableFuture<Void> tryFire(int mode) { + CompletableFuture<Void> d; + CompletableFuture<T> a; + CompletableFuture<U> b; + if ((d = dep) == null || + !d.orRun(a = src, b = snd, fn, mode > 0 ? null : this)) + return null; + dep = null; src = null; snd = null; fn = null; + return d.postFire(a, b, mode); + } + } + + final boolean orRun(CompletableFuture<?> a, CompletableFuture<?> b, + Runnable f, OrRun<?,?> c) { + Object r; Throwable x; + if (a == null || b == null || + ((r = a.result) == null && (r = b.result) == null) || f == null) + return false; + if (result == null) { + try { + if (c != null && !c.claim()) + return false; + if (r instanceof AltResult && (x = ((AltResult)r).ex) != null) + completeThrowable(x, r); + else { + f.run(); + completeNull(); + } + } catch (Throwable ex) { + completeThrowable(ex); + } + } + return true; + } + + private CompletableFuture<Void> orRunStage(Executor e, CompletionStage<?> o, + Runnable f) { + CompletableFuture<?> b; + if (f == null || (b = o.toCompletableFuture()) == null) + throw new NullPointerException(); + CompletableFuture<Void> d = new CompletableFuture<Void>(); + if (e != null || !d.orRun(this, b, f, null)) { + OrRun<T,?> c = new OrRun<>(e, d, this, b, f); + orpush(b, c); + c.tryFire(SYNC); + } + return d; + } + + @SuppressWarnings("serial") + static final class OrRelay<T,U> extends BiCompletion<T,U,Object> { // for Or + OrRelay(CompletableFuture<Object> dep, CompletableFuture<T> src, + CompletableFuture<U> snd) { + super(null, dep, src, snd); + } + final CompletableFuture<Object> tryFire(int mode) { + CompletableFuture<Object> d; + CompletableFuture<T> a; + CompletableFuture<U> b; + if ((d = dep) == null || !d.orRelay(a = src, b = snd)) + return null; + src = null; snd = null; dep = null; + return d.postFire(a, b, mode); + } + } + + final boolean orRelay(CompletableFuture<?> a, CompletableFuture<?> b) { + Object r; + if (a == null || b == null || + ((r = a.result) == null && (r = b.result) == null)) + return false; + if (result == null) + completeRelay(r); + return true; + } + + /** Recursively constructs a tree of completions. */ + static CompletableFuture<Object> orTree(CompletableFuture<?>[] cfs, + int lo, int hi) { + CompletableFuture<Object> d = new CompletableFuture<Object>(); + if (lo <= hi) { + CompletableFuture<?> a, b; + int mid = (lo + hi) >>> 1; + if ((a = (lo == mid ? cfs[lo] : + orTree(cfs, lo, mid))) == null || + (b = (lo == hi ? a : (hi == mid+1) ? cfs[hi] : + orTree(cfs, mid+1, hi))) == null) + throw new NullPointerException(); + if (!d.orRelay(a, b)) { + OrRelay<?,?> c = new OrRelay<>(d, a, b); + a.orpush(b, c); + c.tryFire(SYNC); + } + } + return d; + } + + /* ------------- Zero-input Async forms -------------- */ + + @SuppressWarnings("serial") + static final class AsyncSupply<T> extends ForkJoinTask<Void> + implements Runnable, AsynchronousCompletionTask { + CompletableFuture<T> dep; Supplier<T> fn; + AsyncSupply(CompletableFuture<T> dep, Supplier<T> fn) { + this.dep = dep; this.fn = fn; + } + + public final Void getRawResult() { return null; } + public final void setRawResult(Void v) {} + public final boolean exec() { run(); return true; } + + public void run() { + CompletableFuture<T> d; Supplier<T> f; + if ((d = dep) != null && (f = fn) != null) { + dep = null; fn = null; + if (d.result == null) { + try { + d.completeValue(f.get()); + } catch (Throwable ex) { + d.completeThrowable(ex); + } + } + d.postComplete(); + } + } + } + + static <U> CompletableFuture<U> asyncSupplyStage(Executor e, + Supplier<U> f) { + if (f == null) throw new NullPointerException(); + CompletableFuture<U> d = new CompletableFuture<U>(); + e.execute(new AsyncSupply<U>(d, f)); + return d; + } + + @SuppressWarnings("serial") + static final class AsyncRun extends ForkJoinTask<Void> + implements Runnable, AsynchronousCompletionTask { + CompletableFuture<Void> dep; Runnable fn; + AsyncRun(CompletableFuture<Void> dep, Runnable fn) { + this.dep = dep; this.fn = fn; + } + + public final Void getRawResult() { return null; } + public final void setRawResult(Void v) {} + public final boolean exec() { run(); return true; } + + public void run() { + CompletableFuture<Void> d; Runnable f; + if ((d = dep) != null && (f = fn) != null) { + dep = null; fn = null; + if (d.result == null) { + try { + f.run(); + d.completeNull(); + } catch (Throwable ex) { + d.completeThrowable(ex); + } + } + d.postComplete(); + } + } + } + + static CompletableFuture<Void> asyncRunStage(Executor e, Runnable f) { + if (f == null) throw new NullPointerException(); + CompletableFuture<Void> d = new CompletableFuture<Void>(); + e.execute(new AsyncRun(d, f)); + return d; + } + + /* ------------- Signallers -------------- */ /** - * Linked nodes to record waiting threads in a Treiber stack. See - * other classes such as Phaser and SynchronousQueue for more - * detailed explanation. This class implements ManagedBlocker to - * avoid starvation when blocking actions pile up in - * ForkJoinPools. + * Completion for recording and releasing a waiting thread. This + * class implements ManagedBlocker to avoid starvation when + * blocking actions pile up in ForkJoinPools. */ - static final class WaitNode implements ForkJoinPool.ManagedBlocker { - long nanos; // wait time if timed - final long deadline; // non-zero if timed + @SuppressWarnings("serial") + static final class Signaller extends Completion + implements ForkJoinPool.ManagedBlocker { + long nanos; // wait time if timed + final long deadline; // non-zero if timed volatile int interruptControl; // > 0: interruptible, < 0: interrupted volatile Thread thread; - volatile WaitNode next; - WaitNode(boolean interruptible, long nanos, long deadline) { + + Signaller(boolean interruptible, long nanos, long deadline) { this.thread = Thread.currentThread(); this.interruptControl = interruptible ? 1 : 0; this.nanos = nanos; this.deadline = deadline; } + final CompletableFuture<?> tryFire(int ignore) { + Thread w; // no need to atomically claim + if ((w = thread) != null) { + thread = null; + LockSupport.unpark(w); + } + return null; + } public boolean isReleasable() { if (thread == null) return true; @@ -273,6 +1687,7 @@ LockSupport.parkNanos(this, nanos); return isReleasable(); } + final boolean isLive() { return thread != null; } } /** @@ -280,1832 +1695,89 @@ * interrupted. */ private Object waitingGet(boolean interruptible) { - WaitNode q = null; + Signaller q = null; boolean queued = false; - int spins = SPINS; - for (Object r;;) { - if ((r = result) != null) { - if (q != null) { // suppress unpark - q.thread = null; - if (q.interruptControl < 0) { - if (interruptible) { - removeWaiter(q); - return null; - } - Thread.currentThread().interrupt(); - } - } - postComplete(); // help release others - return r; - } + int spins = -1; + Object r; + while ((r = result) == null) { + if (spins < 0) + spins = (Runtime.getRuntime().availableProcessors() > 1) ? + 1 << 8 : 0; // Use brief spin-wait on multiprocessors else if (spins > 0) { - int rnd = ThreadLocalRandom.nextSecondarySeed(); - if (rnd == 0) - rnd = ThreadLocalRandom.current().nextInt(); - if (rnd >= 0) + if (ThreadLocalRandom.nextSecondarySeed() >= 0) --spins; } else if (q == null) - q = new WaitNode(interruptible, 0L, 0L); + q = new Signaller(interruptible, 0L, 0L); else if (!queued) - queued = UNSAFE.compareAndSwapObject(this, WAITERS, - q.next = waiters, q); + queued = tryPushStack(q); else if (interruptible && q.interruptControl < 0) { - removeWaiter(q); + q.thread = null; + cleanStack(); return null; } else if (q.thread != null && result == null) { try { ForkJoinPool.managedBlock(q); - } catch (InterruptedException ex) { - q.interruptControl = -1; - } - } - } - } - - /** - * Awaits completion or aborts on interrupt or timeout. - * - * @param nanos time to wait - * @return raw result - */ - private Object timedAwaitDone(long nanos) - throws InterruptedException, TimeoutException { - WaitNode q = null; - boolean queued = false; - for (Object r;;) { - if ((r = result) != null) { - if (q != null) { - q.thread = null; - if (q.interruptControl < 0) { - removeWaiter(q); - throw new InterruptedException(); - } - } - postComplete(); - return r; - } - else if (q == null) { - if (nanos <= 0L) - throw new TimeoutException(); - long d = System.nanoTime() + nanos; - q = new WaitNode(true, nanos, d == 0L ? 1L : d); // avoid 0 - } - else if (!queued) - queued = UNSAFE.compareAndSwapObject(this, WAITERS, - q.next = waiters, q); - else if (q.interruptControl < 0) { - removeWaiter(q); - throw new InterruptedException(); - } - else if (q.nanos <= 0L) { - if (result == null) { - removeWaiter(q); - throw new TimeoutException(); - } - } - else if (q.thread != null && result == null) { - try { - ForkJoinPool.managedBlock(q); - } catch (InterruptedException ex) { + } catch (InterruptedException ie) { q.interruptControl = -1; } } } - } - - /** - * Tries to unlink a timed-out or interrupted wait node to avoid - * accumulating garbage. Internal nodes are simply unspliced - * without CAS since it is harmless if they are traversed anyway - * by releasers. To avoid effects of unsplicing from already - * removed nodes, the list is retraversed in case of an apparent - * race. This is slow when there are a lot of nodes, but we don't - * expect lists to be long enough to outweigh higher-overhead - * schemes. - */ - private void removeWaiter(WaitNode node) { - if (node != null) { - node.thread = null; - retry: - for (;;) { // restart on removeWaiter race - for (WaitNode pred = null, q = waiters, s; q != null; q = s) { - s = q.next; - if (q.thread != null) - pred = q; - else if (pred != null) { - pred.next = s; - if (pred.thread == null) // check for race - continue retry; - } - else if (!UNSAFE.compareAndSwapObject(this, WAITERS, q, s)) - continue retry; - } - break; + if (q != null) { + q.thread = null; + if (q.interruptControl < 0) { + if (interruptible) + r = null; // report interruption + else + Thread.currentThread().interrupt(); } } - } - - /* ------------- Async tasks -------------- */ - - /** - * A marker interface identifying asynchronous tasks produced by - * {@code async} methods. This may be useful for monitoring, - * debugging, and tracking asynchronous activities. - * - * @since 1.8 - */ - public static interface AsynchronousCompletionTask { - } - - /** Base class can act as either FJ or plain Runnable */ - @SuppressWarnings("serial") - abstract static class Async extends ForkJoinTask<Void> - implements Runnable, AsynchronousCompletionTask { - public final Void getRawResult() { return null; } - public final void setRawResult(Void v) { } - public final void run() { exec(); } + postComplete(); + return r; } /** - * Starts the given async task using the given executor, unless - * the executor is ForkJoinPool.commonPool and it has been - * disabled, in which case starts a new thread. - */ - static void execAsync(Executor e, Async r) { - if (e == ForkJoinPool.commonPool() && - ForkJoinPool.getCommonPoolParallelism() <= 1) - new Thread(r).start(); - else - e.execute(r); - } - - static final class AsyncRun extends Async { - final Runnable fn; - final CompletableFuture<Void> dst; - AsyncRun(Runnable fn, CompletableFuture<Void> dst) { - this.fn = fn; this.dst = dst; - } - public final boolean exec() { - CompletableFuture<Void> d; Throwable ex; - if ((d = this.dst) != null && d.result == null) { - try { - fn.run(); - ex = null; - } catch (Throwable rex) { - ex = rex; - } - d.internalComplete(null, ex); - } - return true; - } - private static final long serialVersionUID = 5232453952276885070L; - } - - static final class AsyncSupply<U> extends Async { - final Supplier<U> fn; - final CompletableFuture<U> dst; - AsyncSupply(Supplier<U> fn, CompletableFuture<U> dst) { - this.fn = fn; this.dst = dst; - } - public final boolean exec() { - CompletableFuture<U> d; U u; Throwable ex; - if ((d = this.dst) != null && d.result == null) { - try { - u = fn.get(); - ex = null; - } catch (Throwable rex) { - ex = rex; - u = null; - } - d.internalComplete(u, ex); - } - return true; - } - private static final long serialVersionUID = 5232453952276885070L; - } - - static final class AsyncApply<T,U> extends Async { - final T arg; - final Function<? super T,? extends U> fn; - final CompletableFuture<U> dst; - AsyncApply(T arg, Function<? super T,? extends U> fn, - CompletableFuture<U> dst) { - this.arg = arg; this.fn = fn; this.dst = dst; - } - public final boolean exec() { - CompletableFuture<U> d; U u; Throwable ex; - if ((d = this.dst) != null && d.result == null) { - try { - u = fn.apply(arg); - ex = null; - } catch (Throwable rex) { - ex = rex; - u = null; - } - d.internalComplete(u, ex); - } - return true; - } - private static final long serialVersionUID = 5232453952276885070L; - } - - static final class AsyncCombine<T,U,V> extends Async { - final T arg1; - final U arg2; - final BiFunction<? super T,? super U,? extends V> fn; - final CompletableFuture<V> dst; - AsyncCombine(T arg1, U arg2, - BiFunction<? super T,? super U,? extends V> fn, - CompletableFuture<V> dst) { - this.arg1 = arg1; this.arg2 = arg2; this.fn = fn; this.dst = dst; - } - public final boolean exec() { - CompletableFuture<V> d; V v; Throwable ex; - if ((d = this.dst) != null && d.result == null) { - try { - v = fn.apply(arg1, arg2); - ex = null; - } catch (Throwable rex) { - ex = rex; - v = null; - } - d.internalComplete(v, ex); - } - return true; - } - private static final long serialVersionUID = 5232453952276885070L; - } - - static final class AsyncAccept<T> extends Async { - final T arg; - final Consumer<? super T> fn; - final CompletableFuture<?> dst; - AsyncAccept(T arg, Consumer<? super T> fn, - CompletableFuture<?> dst) { - this.arg = arg; this.fn = fn; this.dst = dst; - } - public final boolean exec() { - CompletableFuture<?> d; Throwable ex; - if ((d = this.dst) != null && d.result == null) { - try { - fn.accept(arg); - ex = null; - } catch (Throwable rex) { - ex = rex; - } - d.internalComplete(null, ex); - } - return true; - } - private static final long serialVersionUID = 5232453952276885070L; - } - - static final class AsyncAcceptBoth<T,U> extends Async { - final T arg1; - final U arg2; - final BiConsumer<? super T,? super U> fn; - final CompletableFuture<?> dst; - AsyncAcceptBoth(T arg1, U arg2, - BiConsumer<? super T,? super U> fn, - CompletableFuture<?> dst) { - this.arg1 = arg1; this.arg2 = arg2; this.fn = fn; this.dst = dst; - } - public final boolean exec() { - CompletableFuture<?> d; Throwable ex; - if ((d = this.dst) != null && d.result == null) { - try { - fn.accept(arg1, arg2); - ex = null; - } catch (Throwable rex) { - ex = rex; - } - d.internalComplete(null, ex); - } - return true; - } - private static final long serialVersionUID = 5232453952276885070L; - } - - static final class AsyncCompose<T,U> extends Async { - final T arg; - final Function<? super T, ? extends CompletionStage<U>> fn; - final CompletableFuture<U> dst; - AsyncCompose(T arg, - Function<? super T, ? extends CompletionStage<U>> fn, - CompletableFuture<U> dst) { - this.arg = arg; this.fn = fn; this.dst = dst; - } - public final boolean exec() { - CompletableFuture<U> d, fr; U u; Throwable ex; - if ((d = this.dst) != null && d.result == null) { - try { - CompletionStage<U> cs = fn.apply(arg); - fr = (cs == null) ? null : cs.toCompletableFuture(); - ex = (fr == null) ? new NullPointerException() : null; - } catch (Throwable rex) { - ex = rex; - fr = null; - } - if (ex != null) - u = null; - else { - Object r = fr.result; - if (r == null) - r = fr.waitingGet(false); - if (r instanceof AltResult) { - ex = ((AltResult)r).ex; - u = null; - } - else { - @SuppressWarnings("unchecked") U ur = (U) r; - u = ur; - } - } - d.internalComplete(u, ex); - } - return true; - } - private static final long serialVersionUID = 5232453952276885070L; - } - - static final class AsyncWhenComplete<T> extends Async { - final T arg1; - final Throwable arg2; - final BiConsumer<? super T,? super Throwable> fn; - final CompletableFuture<T> dst; - AsyncWhenComplete(T arg1, Throwable arg2, - BiConsumer<? super T,? super Throwable> fn, - CompletableFuture<T> dst) { - this.arg1 = arg1; this.arg2 = arg2; this.fn = fn; this.dst = dst; - } - public final boolean exec() { - CompletableFuture<T> d; - if ((d = this.dst) != null && d.result == null) { - Throwable ex = arg2; - try { - fn.accept(arg1, ex); - } catch (Throwable rex) { - if (ex == null) - ex = rex; - } - d.internalComplete(arg1, ex); - } - return true; - } - private static final long serialVersionUID = 5232453952276885070L; - } - - /* ------------- Completions -------------- */ - - /** - * Simple linked list nodes to record completions, used in - * basically the same way as WaitNodes. (We separate nodes from - * the Completions themselves mainly because for the And and Or - * methods, the same Completion object resides in two lists.) + * Returns raw result after waiting, or null if interrupted, or + * throws TimeoutException on timeout. */ - static final class CompletionNode { - final Completion completion; - volatile CompletionNode next; - CompletionNode(Completion completion) { this.completion = completion; } - } - - // Opportunistically subclass AtomicInteger to use compareAndSet to claim. - @SuppressWarnings("serial") - abstract static class Completion extends AtomicInteger implements Runnable { - } - - static final class ThenApply<T,U> extends Completion { - final CompletableFuture<? extends T> src; - final Function<? super T,? extends U> fn; - final CompletableFuture<U> dst; - final Executor executor; - ThenApply(CompletableFuture<? extends T> src, - Function<? super T,? extends U> fn, - CompletableFuture<U> dst, - Executor executor) { - this.src = src; this.fn = fn; this.dst = dst; - this.executor = executor; - } - public final void run() { - final CompletableFuture<? extends T> a; - final Function<? super T,? extends U> fn; - final CompletableFuture<U> dst; - Object r; T t; Throwable ex; - if ((dst = this.dst) != null && - (fn = this.fn) != null && - (a = this.src) != null && - (r = a.result) != null && - compareAndSet(0, 1)) { - if (r instanceof AltResult) { - ex = ((AltResult)r).ex; - t = null; - } - else { - ex = null; - @SuppressWarnings("unchecked") T tr = (T) r; - t = tr; - } - Executor e = executor; - U u = null; - if (ex == null) { - try { - if (e != null) - execAsync(e, new AsyncApply<T,U>(t, fn, dst)); - else - u = fn.apply(t); - } catch (Throwable rex) { - ex = rex; - } - } - if (e == null || ex != null) - dst.internalComplete(u, ex); - } - } - private static final long serialVersionUID = 5232453952276885070L; - } - - static final class ThenAccept<T> extends Completion { - final CompletableFuture<? extends T> src; - final Consumer<? super T> fn; - final CompletableFuture<?> dst; - final Executor executor; - ThenAccept(CompletableFuture<? extends T> src, - Consumer<? super T> fn, - CompletableFuture<?> dst, - Executor executor) { - this.src = src; this.fn = fn; this.dst = dst; - this.executor = executor; - } - public final void run() { - final CompletableFuture<? extends T> a; - final Consumer<? super T> fn; - final CompletableFuture<?> dst; - Object r; T t; Throwable ex; - if ((dst = this.dst) != null && - (fn = this.fn) != null && - (a = this.src) != null && - (r = a.result) != null && - compareAndSet(0, 1)) { - if (r instanceof AltResult) { - ex = ((AltResult)r).ex; - t = null; - } - else { - ex = null; - @SuppressWarnings("unchecked") T tr = (T) r; - t = tr; - } - Executor e = executor; - if (ex == null) { - try { - if (e != null) - execAsync(e, new AsyncAccept<T>(t, fn, dst)); - else - fn.accept(t); - } catch (Throwable rex) { - ex = rex; - } - } - if (e == null || ex != null) - dst.internalComplete(null, ex); - } - } - private static final long serialVersionUID = 5232453952276885070L; - } - - static final class ThenRun extends Completion { - final CompletableFuture<?> src; - final Runnable fn; - final CompletableFuture<Void> dst; - final Executor executor; - ThenRun(CompletableFuture<?> src, - Runnable fn, - CompletableFuture<Void> dst, - Executor executor) { - this.src = src; this.fn = fn; this.dst = dst; - this.executor = executor; - } - public final void run() { - final CompletableFuture<?> a; - final Runnable fn; - final CompletableFuture<Void> dst; - Object r; Throwable ex; - if ((dst = this.dst) != null && - (fn = this.fn) != null && - (a = this.src) != null && - (r = a.result) != null && - compareAndSet(0, 1)) { - if (r instanceof AltResult) - ex = ((AltResult)r).ex; - else - ex = null; - Executor e = executor; - if (ex == null) { - try { - if (e != null) - execAsync(e, new AsyncRun(fn, dst)); - else - fn.run(); - } catch (Throwable rex) { - ex = rex; - } - } - if (e == null || ex != null) - dst.internalComplete(null, ex); - } - } - private static final long serialVersionUID = 5232453952276885070L; - } - - static final class ThenCombine<T,U,V> extends Completion { - final CompletableFuture<? extends T> src; - final CompletableFuture<? extends U> snd; - final BiFunction<? super T,? super U,? extends V> fn; - final CompletableFuture<V> dst; - final Executor executor; - ThenCombine(CompletableFuture<? extends T> src, - CompletableFuture<? extends U> snd, - BiFunction<? super T,? super U,? extends V> fn, - CompletableFuture<V> dst, - Executor executor) { - this.src = src; this.snd = snd; - this.fn = fn; this.dst = dst; - this.executor = executor; - } - public final void run() { - final CompletableFuture<? extends T> a; - final CompletableFuture<? extends U> b; - final BiFunction<? super T,? super U,? extends V> fn; - final CompletableFuture<V> dst; - Object r, s; T t; U u; Throwable ex; - if ((dst = this.dst) != null && - (fn = this.fn) != null && - (a = this.src) != null && - (r = a.result) != null && - (b = this.snd) != null && - (s = b.result) != null && - compareAndSet(0, 1)) { - if (r instanceof AltResult) { - ex = ((AltResult)r).ex; - t = null; - } - else { - ex = null; - @SuppressWarnings("unchecked") T tr = (T) r; - t = tr; - } - if (ex != null) - u = null; - else if (s instanceof AltResult) { - ex = ((AltResult)s).ex; - u = null; - } - else { - @SuppressWarnings("unchecked") U us = (U) s; - u = us; - } - Executor e = executor; - V v = null; - if (ex == null) { - try { - if (e != null) - execAsync(e, new AsyncCombine<T,U,V>(t, u, fn, dst)); - else - v = fn.apply(t, u); - } catch (Throwable rex) { - ex = rex; - } - } - if (e == null || ex != null) - dst.internalComplete(v, ex); - } - } - private static final long serialVersionUID = 5232453952276885070L; - } - - static final class ThenAcceptBoth<T,U> extends Completion { - final CompletableFuture<? extends T> src; - final CompletableFuture<? extends U> snd; - final BiConsumer<? super T,? super U> fn; - final CompletableFuture<Void> dst; - final Executor executor; - ThenAcceptBoth(CompletableFuture<? extends T> src, - CompletableFuture<? extends U> snd, - BiConsumer<? super T,? super U> fn, - CompletableFuture<Void> dst, - Executor executor) { - this.src = src; this.snd = snd; - this.fn = fn; this.dst = dst; - this.executor = executor; - } - public final void run() { - final CompletableFuture<? extends T> a; - final CompletableFuture<? extends U> b; - final BiConsumer<? super T,? super U> fn; - final CompletableFuture<Void> dst; - Object r, s; T t; U u; Throwable ex; - if ((dst = this.dst) != null && - (fn = this.fn) != null && - (a = this.src) != null && - (r = a.result) != null && - (b = this.snd) != null && - (s = b.result) != null && - compareAndSet(0, 1)) { - if (r instanceof AltResult) { - ex = ((AltResult)r).ex; - t = null; - } - else { - ex = null; - @SuppressWarnings("unchecked") T tr = (T) r; - t = tr; - } - if (ex != null) - u = null; - else if (s instanceof AltResult) { - ex = ((AltResult)s).ex; - u = null; - } - else { - @SuppressWarnings("unchecked") U us = (U) s; - u = us; - } - Executor e = executor; - if (ex == null) { - try { - if (e != null) - execAsync(e, new AsyncAcceptBoth<T,U>(t, u, fn, dst)); - else - fn.accept(t, u); - } catch (Throwable rex) { - ex = rex; - } - } - if (e == null || ex != null) - dst.internalComplete(null, ex); - } - } - private static final long serialVersionUID = 5232453952276885070L; - } - - static final class RunAfterBoth extends Completion { - final CompletableFuture<?> src; - final CompletableFuture<?> snd; - final Runnable fn; - final CompletableFuture<Void> dst; - final Executor executor; - RunAfterBoth(CompletableFuture<?> src, - CompletableFuture<?> snd, - Runnable fn, - CompletableFuture<Void> dst, - Executor executor) { - this.src = src; this.snd = snd; - this.fn = fn; this.dst = dst; - this.executor = executor; - } - public final void run() { - final CompletableFuture<?> a; - final CompletableFuture<?> b; - final Runnable fn; - final CompletableFuture<Void> dst; - Object r, s; Throwable ex; - if ((dst = this.dst) != null && - (fn = this.fn) != null && - (a = this.src) != null && - (r = a.result) != null && - (b = this.snd) != null && - (s = b.result) != null && - compareAndSet(0, 1)) { - if (r instanceof AltResult) - ex = ((AltResult)r).ex; - else - ex = null; - if (ex == null && (s instanceof AltResult)) - ex = ((AltResult)s).ex; - Executor e = executor; - if (ex == null) { - try { - if (e != null) - execAsync(e, new AsyncRun(fn, dst)); - else - fn.run(); - } catch (Throwable rex) { - ex = rex; - } - } - if (e == null || ex != null) - dst.internalComplete(null, ex); - } - } - private static final long serialVersionUID = 5232453952276885070L; - } - - static final class AndCompletion extends Completion { - final CompletableFuture<?> src; - final CompletableFuture<?> snd; - final CompletableFuture<Void> dst; - AndCompletion(CompletableFuture<?> src, - CompletableFuture<?> snd, - CompletableFuture<Void> dst) { - this.src = src; this.snd = snd; this.dst = dst; - } - public final void run() { - final CompletableFuture<?> a; - final CompletableFuture<?> b; - final CompletableFuture<Void> dst; - Object r, s; Throwable ex; - if ((dst = this.dst) != null && - (a = this.src) != null && - (r = a.result) != null && - (b = this.snd) != null && - (s = b.result) != null && - compareAndSet(0, 1)) { - if (r instanceof AltResult) - ex = ((AltResult)r).ex; - else - ex = null; - if (ex == null && (s instanceof AltResult)) - ex = ((AltResult)s).ex; - dst.internalComplete(null, ex); + private Object timedGet(long nanos) throws TimeoutException { + if (Thread.interrupted()) + return null; + if (nanos <= 0L) + throw new TimeoutException(); + long d = System.nanoTime() + nanos; + Signaller q = new Signaller(true, nanos, d == 0L ? 1L : d); // avoid 0 + boolean queued = false; + Object r; + // We intentionally don't spin here (as waitingGet does) because + // the call to nanoTime() above acts much like a spin. + while ((r = result) == null) { + if (!queued) + queued = tryPushStack(q); + else if (q.interruptControl < 0 || q.nanos <= 0L) { + q.thread = null; + cleanStack(); + if (q.interruptControl < 0) + return null; + throw new TimeoutException(); } - } - private static final long serialVersionUID = 5232453952276885070L; - } - - static final class ApplyToEither<T,U> extends Completion { - final CompletableFuture<? extends T> src; - final CompletableFuture<? extends T> snd; - final Function<? super T,? extends U> fn; - final CompletableFuture<U> dst; - final Executor executor; - ApplyToEither(CompletableFuture<? extends T> src, - CompletableFuture<? extends T> snd, - Function<? super T,? extends U> fn, - CompletableFuture<U> dst, - Executor executor) { - this.src = src; this.snd = snd; - this.fn = fn; this.dst = dst; - this.executor = executor; - } - public final void run() { - final CompletableFuture<? extends T> a; - final CompletableFuture<? extends T> b; - final Function<? super T,? extends U> fn; - final CompletableFuture<U> dst; - Object r; T t; Throwable ex; - if ((dst = this.dst) != null && - (fn = this.fn) != null && - (((a = this.src) != null && (r = a.result) != null) || - ((b = this.snd) != null && (r = b.result) != null)) && - compareAndSet(0, 1)) { - if (r instanceof AltResult) { - ex = ((AltResult)r).ex; - t = null; - } - else { - ex = null; - @SuppressWarnings("unchecked") T tr = (T) r; - t = tr; - } - Executor e = executor; - U u = null; - if (ex == null) { - try { - if (e != null) - execAsync(e, new AsyncApply<T,U>(t, fn, dst)); - else - u = fn.apply(t); - } catch (Throwable rex) { - ex = rex; - } - } - if (e == null || ex != null) - dst.internalComplete(u, ex); - } - } - private static final long serialVersionUID = 5232453952276885070L; - } - - static final class AcceptEither<T> extends Completion { - final CompletableFuture<? extends T> src; - final CompletableFuture<? extends T> snd; - final Consumer<? super T> fn; - final CompletableFuture<Void> dst; - final Executor executor; - AcceptEither(CompletableFuture<? extends T> src, - CompletableFuture<? extends T> snd, - Consumer<? super T> fn, - CompletableFuture<Void> dst, - Executor executor) { - this.src = src; this.snd = snd; - this.fn = fn; this.dst = dst; - this.executor = executor; - } - public final void run() { - final CompletableFuture<? extends T> a; - final CompletableFuture<? extends T> b; - final Consumer<? super T> fn; - final CompletableFuture<Void> dst; - Object r; T t; Throwable ex; - if ((dst = this.dst) != null && - (fn = this.fn) != null && - (((a = this.src) != null && (r = a.result) != null) || - ((b = this.snd) != null && (r = b.result) != null)) && - compareAndSet(0, 1)) { - if (r instanceof AltResult) { - ex = ((AltResult)r).ex; - t = null; - } - else { - ex = null; - @SuppressWarnings("unchecked") T tr = (T) r; - t = tr; - } - Executor e = executor; - if (ex == null) { - try { - if (e != null) - execAsync(e, new AsyncAccept<T>(t, fn, dst)); - else - fn.accept(t); - } catch (Throwable rex) { - ex = rex; - } - } - if (e == null || ex != null) - dst.internalComplete(null, ex); - } - } - private static final long serialVersionUID = 5232453952276885070L; - } - - static final class RunAfterEither extends Completion { - final CompletableFuture<?> src; - final CompletableFuture<?> snd; - final Runnable fn; - final CompletableFuture<Void> dst; - final Executor executor; - RunAfterEither(CompletableFuture<?> src, - CompletableFuture<?> snd, - Runnable fn, - CompletableFuture<Void> dst, - Executor executor) { - this.src = src; this.snd = snd; - this.fn = fn; this.dst = dst; - this.executor = executor; - } - public final void run() { - final CompletableFuture<?> a; - final CompletableFuture<?> b; - final Runnable fn; - final CompletableFuture<Void> dst; - Object r; Throwable ex; - if ((dst = this.dst) != null && - (fn = this.fn) != null && - (((a = this.src) != null && (r = a.result) != null) || - ((b = this.snd) != null && (r = b.result) != null)) && - compareAndSet(0, 1)) { - if (r instanceof AltResult) - ex = ((AltResult)r).ex; - else - ex = null; - Executor e = executor; - if (ex == null) { - try { - if (e != null) - execAsync(e, new AsyncRun(fn, dst)); - else - fn.run(); - } catch (Throwable rex) { - ex = rex; - } - } - if (e == null || ex != null) - dst.internalComplete(null, ex); - } - } - private static final long serialVersionUID = 5232453952276885070L; - } - - static final class OrCompletion extends Completion { - final CompletableFuture<?> src; - final CompletableFuture<?> snd; - final CompletableFuture<Object> dst; - OrCompletion(CompletableFuture<?> src, - CompletableFuture<?> snd, - CompletableFuture<Object> dst) { - this.src = src; this.snd = snd; this.dst = dst; - } - public final void run() { - final CompletableFuture<?> a; - final CompletableFuture<?> b; - final CompletableFuture<Object> dst; - Object r, t; Throwable ex; - if ((dst = this.dst) != null && - (((a = this.src) != null && (r = a.result) != null) || - ((b = this.snd) != null && (r = b.result) != null)) && - compareAndSet(0, 1)) { - if (r instanceof AltResult) { - ex = ((AltResult)r).ex; - t = null; - } - else { - ex = null; - t = r; - } - dst.internalComplete(t, ex); - } - } - private static final long serialVersionUID = 5232453952276885070L; - } - - static final class ExceptionCompletion<T> extends Completion { - final CompletableFuture<? extends T> src; - final Function<? super Throwable, ? extends T> fn; - final CompletableFuture<T> dst; - ExceptionCompletion(CompletableFuture<? extends T> src, - Function<? super Throwable, ? extends T> fn, - CompletableFuture<T> dst) { - this.src = src; this.fn = fn; this.dst = dst; - } - public final void run() { - final CompletableFuture<? extends T> a; - final Function<? super Throwable, ? extends T> fn; - final CompletableFuture<T> dst; - Object r; T t = null; Throwable ex, dx = null; - if ((dst = this.dst) != null && - (fn = this.fn) != null && - (a = this.src) != null && - (r = a.result) != null && - compareAndSet(0, 1)) { - if ((r instanceof AltResult) && - (ex = ((AltResult)r).ex) != null) { - try { - t = fn.apply(ex); - } catch (Throwable rex) { - dx = rex; - } - } - else { - @SuppressWarnings("unchecked") T tr = (T) r; - t = tr; - } - dst.internalComplete(t, dx); - } - } - private static final long serialVersionUID = 5232453952276885070L; - } - - static final class WhenCompleteCompletion<T> extends Completion { - final CompletableFuture<? extends T> src; - final BiConsumer<? super T, ? super Throwable> fn; - final CompletableFuture<T> dst; - final Executor executor; - WhenCompleteCompletion(CompletableFuture<? extends T> src, - BiConsumer<? super T, ? super Throwable> fn, - CompletableFuture<T> dst, - Executor executor) { - this.src = src; this.fn = fn; this.dst = dst; - this.executor = executor; - } - public final void run() { - final CompletableFuture<? extends T> a; - final BiConsumer<? super T, ? super Throwable> fn; - final CompletableFuture<T> dst; - Object r; T t; Throwable ex; - if ((dst = this.dst) != null && - (fn = this.fn) != null && - (a = this.src) != null && - (r = a.result) != null && - compareAndSet(0, 1)) { - if (r instanceof AltResult) { - ex = ((AltResult)r).ex; - t = null; - } - else { - ex = null; - @SuppressWarnings("unchecked") T tr = (T) r; - t = tr; - } - Executor e = executor; - Throwable dx = null; + else if (q.thread != null && result == null) { try { - if (e != null) - execAsync(e, new AsyncWhenComplete<T>(t, ex, fn, dst)); - else - fn.accept(t, ex); - } catch (Throwable rex) { - dx = rex; - } - if (e == null || dx != null) - dst.internalComplete(t, ex != null ? ex : dx); - } - } - private static final long serialVersionUID = 5232453952276885070L; - } - - static final class ThenCopy<T> extends Completion { - final CompletableFuture<?> src; - final CompletableFuture<T> dst; - ThenCopy(CompletableFuture<?> src, - CompletableFuture<T> dst) { - this.src = src; this.dst = dst; - } - public final void run() { - final CompletableFuture<?> a; - final CompletableFuture<T> dst; - Object r; T t; Throwable ex; - if ((dst = this.dst) != null && - (a = this.src) != null && - (r = a.result) != null && - compareAndSet(0, 1)) { - if (r instanceof AltResult) { - ex = ((AltResult)r).ex; - t = null; - } - else { - ex = null; - @SuppressWarnings("unchecked") T tr = (T) r; - t = tr; - } - dst.internalComplete(t, ex); - } - } - private static final long serialVersionUID = 5232453952276885070L; - } - - // version of ThenCopy for CompletableFuture<Void> dst - static final class ThenPropagate extends Completion { - final CompletableFuture<?> src; - final CompletableFuture<Void> dst; - ThenPropagate(CompletableFuture<?> src, - CompletableFuture<Void> dst) { - this.src = src; this.dst = dst; - } - public final void run() { - final CompletableFuture<?> a; - final CompletableFuture<Void> dst; - Object r; Throwable ex; - if ((dst = this.dst) != null && - (a = this.src) != null && - (r = a.result) != null && - compareAndSet(0, 1)) { - if (r instanceof AltResult) - ex = ((AltResult)r).ex; - else - ex = null; - dst.internalComplete(null, ex); - } - } - private static final long serialVersionUID = 5232453952276885070L; - } - - static final class HandleCompletion<T,U> extends Completion { - final CompletableFuture<? extends T> src; - final BiFunction<? super T, Throwable, ? extends U> fn; - final CompletableFuture<U> dst; - final Executor executor; - HandleCompletion(CompletableFuture<? extends T> src, - BiFunction<? super T, Throwable, ? extends U> fn, - CompletableFuture<U> dst, - Executor executor) { - this.src = src; this.fn = fn; this.dst = dst; - this.executor = executor; - } - public final void run() { - final CompletableFuture<? extends T> a; - final BiFunction<? super T, Throwable, ? extends U> fn; - final CompletableFuture<U> dst; - Object r; T t; Throwable ex; - if ((dst = this.dst) != null && - (fn = this.fn) != null && - (a = this.src) != null && - (r = a.result) != null && - compareAndSet(0, 1)) { - if (r instanceof AltResult) { - ex = ((AltResult)r).ex; - t = null; - } - else { - ex = null; - @SuppressWarnings("unchecked") T tr = (T) r; - t = tr; - } - Executor e = executor; - U u = null; - Throwable dx = null; - try { - if (e != null) - execAsync(e, new AsyncCombine<T,Throwable,U>(t, ex, fn, dst)); - else - u = fn.apply(t, ex); - } catch (Throwable rex) { - dx = rex; - } - if (e == null || dx != null) - dst.internalComplete(u, dx); - } - } - private static final long serialVersionUID = 5232453952276885070L; - } - - static final class ThenCompose<T,U> extends Completion { - final CompletableFuture<? extends T> src; - final Function<? super T, ? extends CompletionStage<U>> fn; - final CompletableFuture<U> dst; - final Executor executor; - ThenCompose(CompletableFuture<? extends T> src, - Function<? super T, ? extends CompletionStage<U>> fn, - CompletableFuture<U> dst, - Executor executor) { - this.src = src; this.fn = fn; this.dst = dst; - this.executor = executor; - } - public final void run() { - final CompletableFuture<? extends T> a; - final Function<? super T, ? extends CompletionStage<U>> fn; - final CompletableFuture<U> dst; - Object r; T t; Throwable ex; Executor e; - if ((dst = this.dst) != null && - (fn = this.fn) != null && - (a = this.src) != null && - (r = a.result) != null && - compareAndSet(0, 1)) { - if (r instanceof AltResult) { - ex = ((AltResult)r).ex; - t = null; - } - else { - ex = null; - @SuppressWarnings("unchecked") T tr = (T) r; - t = tr; - } - CompletableFuture<U> c = null; - U u = null; - boolean complete = false; - if (ex == null) { - if ((e = executor) != null) - execAsync(e, new AsyncCompose<T,U>(t, fn, dst)); - else { - try { - CompletionStage<U> cs = fn.apply(t); - c = (cs == null) ? null : cs.toCompletableFuture(); - if (c == null) - ex = new NullPointerException(); - } catch (Throwable rex) { - ex = rex; - } - } - } - if (c != null) { - ThenCopy<U> d = null; - Object s; - if ((s = c.result) == null) { - CompletionNode p = new CompletionNode - (d = new ThenCopy<U>(c, dst)); - while ((s = c.result) == null) { - if (UNSAFE.compareAndSwapObject - (c, COMPLETIONS, p.next = c.completions, p)) - break; - } - } - if (s != null && (d == null || d.compareAndSet(0, 1))) { - complete = true; - if (s instanceof AltResult) { - ex = ((AltResult)s).ex; // no rewrap - u = null; - } - else { - @SuppressWarnings("unchecked") U us = (U) s; - u = us; - } - } - } - if (complete || ex != null) - dst.internalComplete(u, ex); - if (c != null) - c.helpPostComplete(); - } - } - private static final long serialVersionUID = 5232453952276885070L; - } - - // Implementations of stage methods with (plain, async, Executor) forms - - private <U> CompletableFuture<U> doThenApply - (Function<? super T,? extends U> fn, - Executor e) { - if (fn == null) throw new NullPointerException(); - CompletableFuture<U> dst = new CompletableFuture<U>(); - ThenApply<T,U> d = null; - Object r; - if ((r = result) == null) { - CompletionNode p = new CompletionNode - (d = new ThenApply<T,U>(this, fn, dst, e)); - while ((r = result) == null) { - if (UNSAFE.compareAndSwapObject - (this, COMPLETIONS, p.next = completions, p)) - break; - } - } - if (r != null && (d == null || d.compareAndSet(0, 1))) { - T t; Throwable ex; - if (r instanceof AltResult) { - ex = ((AltResult)r).ex; - t = null; - } - else { - ex = null; - @SuppressWarnings("unchecked") T tr = (T) r; - t = tr; - } - U u = null; - if (ex == null) { - try { - if (e != null) - execAsync(e, new AsyncApply<T,U>(t, fn, dst)); - else - u = fn.apply(t); - } catch (Throwable rex) { - ex = rex; - } - } - if (e == null || ex != null) - dst.internalComplete(u, ex); - } - helpPostComplete(); - return dst; - } - - private CompletableFuture<Void> doThenAccept(Consumer<? super T> fn, - Executor e) { - if (fn == null) throw new NullPointerException(); - CompletableFuture<Void> dst = new CompletableFuture<Void>(); - ThenAccept<T> d = null; - Object r; - if ((r = result) == null) { - CompletionNode p = new CompletionNode - (d = new ThenAccept<T>(this, fn, dst, e)); - while ((r = result) == null) { - if (UNSAFE.compareAndSwapObject - (this, COMPLETIONS, p.next = completions, p)) - break; - } - } - if (r != null && (d == null || d.compareAndSet(0, 1))) { - T t; Throwable ex; - if (r instanceof AltResult) { - ex = ((AltResult)r).ex; - t = null; - } - else { - ex = null; - @SuppressWarnings("unchecked") T tr = (T) r; - t = tr; - } - if (ex == null) { - try { - if (e != null) - execAsync(e, new AsyncAccept<T>(t, fn, dst)); - else - fn.accept(t); - } catch (Throwable rex) { - ex = rex; - } - } - if (e == null || ex != null) - dst.internalComplete(null, ex); - } - helpPostComplete(); - return dst; - } - - private CompletableFuture<Void> doThenRun(Runnable action, - Executor e) { - if (action == null) throw new NullPointerException(); - CompletableFuture<Void> dst = new CompletableFuture<Void>(); - ThenRun d = null; - Object r; - if ((r = result) == null) { - CompletionNode p = new CompletionNode - (d = new ThenRun(this, action, dst, e)); - while ((r = result) == null) { - if (UNSAFE.compareAndSwapObject - (this, COMPLETIONS, p.next = completions, p)) - break; - } - } - if (r != null && (d == null || d.compareAndSet(0, 1))) { - Throwable ex; - if (r instanceof AltResult) - ex = ((AltResult)r).ex; - else - ex = null; - if (ex == null) { - try { - if (e != null) - execAsync(e, new AsyncRun(action, dst)); - else - action.run(); - } catch (Throwable rex) { - ex = rex; - } - } - if (e == null || ex != null) - dst.internalComplete(null, ex); - } - helpPostComplete(); - return dst; - } - - private <U,V> CompletableFuture<V> doThenCombine - (CompletableFuture<? extends U> other, - BiFunction<? super T,? super U,? extends V> fn, - Executor e) { - if (other == null || fn == null) throw new NullPointerException(); - CompletableFuture<V> dst = new CompletableFuture<V>(); - ThenCombine<T,U,V> d = null; - Object r, s = null; - if ((r = result) == null || (s = other.result) == null) { - d = new ThenCombine<T,U,V>(this, other, fn, dst, e); - CompletionNode q = null, p = new CompletionNode(d); - while ((r == null && (r = result) == null) || - (s == null && (s = other.result) == null)) { - if (q != null) { - if (s != null || - UNSAFE.compareAndSwapObject - (other, COMPLETIONS, q.next = other.completions, q)) - break; - } - else if (r != null || - UNSAFE.compareAndSwapObject - (this, COMPLETIONS, p.next = completions, p)) { - if (s != null) - break; - q = new CompletionNode(d); + ForkJoinPool.managedBlock(q); + } catch (InterruptedException ie) { + q.interruptControl = -1; } } } - if (r != null && s != null && (d == null || d.compareAndSet(0, 1))) { - T t; U u; Throwable ex; - if (r instanceof AltResult) { - ex = ((AltResult)r).ex; - t = null; - } - else { - ex = null; - @SuppressWarnings("unchecked") T tr = (T) r; - t = tr; - } - if (ex != null) - u = null; - else if (s instanceof AltResult) { - ex = ((AltResult)s).ex; - u = null; - } - else { - @SuppressWarnings("unchecked") U us = (U) s; - u = us; - } - V v = null; - if (ex == null) { - try { - if (e != null) - execAsync(e, new AsyncCombine<T,U,V>(t, u, fn, dst)); - else - v = fn.apply(t, u); - } catch (Throwable rex) { - ex = rex; - } - } - if (e == null || ex != null) - dst.internalComplete(v, ex); - } - helpPostComplete(); - other.helpPostComplete(); - return dst; - } - - private <U> CompletableFuture<Void> doThenAcceptBoth - (CompletableFuture<? extends U> other, - BiConsumer<? super T,? super U> fn, - Executor e) { - if (other == null || fn == null) throw new NullPointerException(); - CompletableFuture<Void> dst = new CompletableFuture<Void>(); - ThenAcceptBoth<T,U> d = null; - Object r, s = null; - if ((r = result) == null || (s = other.result) == null) { - d = new ThenAcceptBoth<T,U>(this, other, fn, dst, e); - CompletionNode q = null, p = new CompletionNode(d); - while ((r == null && (r = result) == null) || - (s == null && (s = other.result) == null)) { - if (q != null) { - if (s != null || - UNSAFE.compareAndSwapObject - (other, COMPLETIONS, q.next = other.completions, q)) - break; - } - else if (r != null || - UNSAFE.compareAndSwapObject - (this, COMPLETIONS, p.next = completions, p)) { - if (s != null) - break; - q = new CompletionNode(d); - } - } - } - if (r != null && s != null && (d == null || d.compareAndSet(0, 1))) { - T t; U u; Throwable ex; - if (r instanceof AltResult) { - ex = ((AltResult)r).ex; - t = null; - } - else { - ex = null; - @SuppressWarnings("unchecked") T tr = (T) r; - t = tr; - } - if (ex != null) - u = null; - else if (s instanceof AltResult) { - ex = ((AltResult)s).ex; - u = null; - } - else { - @SuppressWarnings("unchecked") U us = (U) s; - u = us; - } - if (ex == null) { - try { - if (e != null) - execAsync(e, new AsyncAcceptBoth<T,U>(t, u, fn, dst)); - else - fn.accept(t, u); - } catch (Throwable rex) { - ex = rex; - } - } - if (e == null || ex != null) - dst.internalComplete(null, ex); - } - helpPostComplete(); - other.helpPostComplete(); - return dst; - } - - private CompletableFuture<Void> doRunAfterBoth(CompletableFuture<?> other, - Runnable action, - Executor e) { - if (other == null || action == null) throw new NullPointerException(); - CompletableFuture<Void> dst = new CompletableFuture<Void>(); - RunAfterBoth d = null; - Object r, s = null; - if ((r = result) == null || (s = other.result) == null) { - d = new RunAfterBoth(this, other, action, dst, e); - CompletionNode q = null, p = new CompletionNode(d); - while ((r == null && (r = result) == null) || - (s == null && (s = other.result) == null)) { - if (q != null) { - if (s != null || - UNSAFE.compareAndSwapObject - (other, COMPLETIONS, q.next = other.completions, q)) - break; - } - else if (r != null || - UNSAFE.compareAndSwapObject - (this, COMPLETIONS, p.next = completions, p)) { - if (s != null) - break; - q = new CompletionNode(d); - } - } - } - if (r != null && s != null && (d == null || d.compareAndSet(0, 1))) { - Throwable ex; - if (r instanceof AltResult) - ex = ((AltResult)r).ex; - else - ex = null; - if (ex == null && (s instanceof AltResult)) - ex = ((AltResult)s).ex; - if (ex == null) { - try { - if (e != null) - execAsync(e, new AsyncRun(action, dst)); - else - action.run(); - } catch (Throwable rex) { - ex = rex; - } - } - if (e == null || ex != null) - dst.internalComplete(null, ex); - } - helpPostComplete(); - other.helpPostComplete(); - return dst; - } - - private <U> CompletableFuture<U> doApplyToEither - (CompletableFuture<? extends T> other, - Function<? super T, U> fn, - Executor e) { - if (other == null || fn == null) throw new NullPointerException(); - CompletableFuture<U> dst = new CompletableFuture<U>(); - ApplyToEither<T,U> d = null; - Object r; - if ((r = result) == null && (r = other.result) == null) { - d = new ApplyToEither<T,U>(this, other, fn, dst, e); - CompletionNode q = null, p = new CompletionNode(d); - while ((r = result) == null && (r = other.result) == null) { - if (q != null) { - if (UNSAFE.compareAndSwapObject - (other, COMPLETIONS, q.next = other.completions, q)) - break; - } - else if (UNSAFE.compareAndSwapObject - (this, COMPLETIONS, p.next = completions, p)) - q = new CompletionNode(d); - } - } - if (r != null && (d == null || d.compareAndSet(0, 1))) { - T t; Throwable ex; - if (r instanceof AltResult) { - ex = ((AltResult)r).ex; - t = null; - } - else { - ex = null; - @SuppressWarnings("unchecked") T tr = (T) r; - t = tr; - } - U u = null; - if (ex == null) { - try { - if (e != null) - execAsync(e, new AsyncApply<T,U>(t, fn, dst)); - else - u = fn.apply(t); - } catch (Throwable rex) { - ex = rex; - } - } - if (e == null || ex != null) - dst.internalComplete(u, ex); - } - helpPostComplete(); - other.helpPostComplete(); - return dst; + if (q.interruptControl < 0) + r = null; + q.thread = null; + postComplete(); + return r; } - private CompletableFuture<Void> doAcceptEither - (CompletableFuture<? extends T> other, - Consumer<? super T> fn, - Executor e) { - if (other == null || fn == null) throw new NullPointerException(); - CompletableFuture<Void> dst = new CompletableFuture<Void>(); - AcceptEither<T> d = null; - Object r; - if ((r = result) == null && (r = other.result) == null) { - d = new AcceptEither<T>(this, other, fn, dst, e); - CompletionNode q = null, p = new CompletionNode(d); - while ((r = result) == null && (r = other.result) == null) { - if (q != null) { - if (UNSAFE.compareAndSwapObject - (other, COMPLETIONS, q.next = other.completions, q)) - break; - } - else if (UNSAFE.compareAndSwapObject - (this, COMPLETIONS, p.next = completions, p)) - q = new CompletionNode(d); - } - } - if (r != null && (d == null || d.compareAndSet(0, 1))) { - T t; Throwable ex; - if (r instanceof AltResult) { - ex = ((AltResult)r).ex; - t = null; - } - else { - ex = null; - @SuppressWarnings("unchecked") T tr = (T) r; - t = tr; - } - if (ex == null) { - try { - if (e != null) - execAsync(e, new AsyncAccept<T>(t, fn, dst)); - else - fn.accept(t); - } catch (Throwable rex) { - ex = rex; - } - } - if (e == null || ex != null) - dst.internalComplete(null, ex); - } - helpPostComplete(); - other.helpPostComplete(); - return dst; - } - - private CompletableFuture<Void> doRunAfterEither - (CompletableFuture<?> other, - Runnable action, - Executor e) { - if (other == null || action == null) throw new NullPointerException(); - CompletableFuture<Void> dst = new CompletableFuture<Void>(); - RunAfterEither d = null; - Object r; - if ((r = result) == null && (r = other.result) == null) { - d = new RunAfterEither(this, other, action, dst, e); - CompletionNode q = null, p = new CompletionNode(d); - while ((r = result) == null && (r = other.result) == null) { - if (q != null) { - if (UNSAFE.compareAndSwapObject - (other, COMPLETIONS, q.next = other.completions, q)) - break; - } - else if (UNSAFE.compareAndSwapObject - (this, COMPLETIONS, p.next = completions, p)) - q = new CompletionNode(d); - } - } - if (r != null && (d == null || d.compareAndSet(0, 1))) { - Throwable ex; - if (r instanceof AltResult) - ex = ((AltResult)r).ex; - else - ex = null; - if (ex == null) { - try { - if (e != null) - execAsync(e, new AsyncRun(action, dst)); - else - action.run(); - } catch (Throwable rex) { - ex = rex; - } - } - if (e == null || ex != null) - dst.internalComplete(null, ex); - } - helpPostComplete(); - other.helpPostComplete(); - return dst; - } - - private <U> CompletableFuture<U> doThenCompose - (Function<? super T, ? extends CompletionStage<U>> fn, - Executor e) { - if (fn == null) throw new NullPointerException(); - CompletableFuture<U> dst = null; - ThenCompose<T,U> d = null; - Object r; - if ((r = result) == null) { - dst = new CompletableFuture<U>(); - CompletionNode p = new CompletionNode - (d = new ThenCompose<T,U>(this, fn, dst, e)); - while ((r = result) == null) { - if (UNSAFE.compareAndSwapObject - (this, COMPLETIONS, p.next = completions, p)) - break; - } - } - if (r != null && (d == null || d.compareAndSet(0, 1))) { - T t; Throwable ex; - if (r instanceof AltResult) { - ex = ((AltResult)r).ex; - t = null; - } - else { - ex = null; - @SuppressWarnings("unchecked") T tr = (T) r; - t = tr; - } - if (ex == null) { - if (e != null) { - if (dst == null) - dst = new CompletableFuture<U>(); - execAsync(e, new AsyncCompose<T,U>(t, fn, dst)); - } - else { - try { - CompletionStage<U> cs = fn.apply(t); - if (cs == null || - (dst = cs.toCompletableFuture()) == null) - ex = new NullPointerException(); - } catch (Throwable rex) { - ex = rex; - } - } - } - if (dst == null) - dst = new CompletableFuture<U>(); - if (ex != null) - dst.internalComplete(null, ex); - } - helpPostComplete(); - dst.helpPostComplete(); - return dst; - } - - private CompletableFuture<T> doWhenComplete - (BiConsumer<? super T, ? super Throwable> fn, - Executor e) { - if (fn == null) throw new NullPointerException(); - CompletableFuture<T> dst = new CompletableFuture<T>(); - WhenCompleteCompletion<T> d = null; - Object r; - if ((r = result) == null) { - CompletionNode p = - new CompletionNode(d = new WhenCompleteCompletion<T> - (this, fn, dst, e)); - while ((r = result) == null) { - if (UNSAFE.compareAndSwapObject(this, COMPLETIONS, - p.next = completions, p)) - break; - } - } - if (r != null && (d == null || d.compareAndSet(0, 1))) { - T t; Throwable ex; - if (r instanceof AltResult) { - ex = ((AltResult)r).ex; - t = null; - } - else { - ex = null; - @SuppressWarnings("unchecked") T tr = (T) r; - t = tr; - } - Throwable dx = null; - try { - if (e != null) - execAsync(e, new AsyncWhenComplete<T>(t, ex, fn, dst)); - else - fn.accept(t, ex); - } catch (Throwable rex) { - dx = rex; - } - if (e == null || dx != null) - dst.internalComplete(t, ex != null ? ex : dx); - } - helpPostComplete(); - return dst; - } - - private <U> CompletableFuture<U> doHandle - (BiFunction<? super T, Throwable, ? extends U> fn, - Executor e) { - if (fn == null) throw new NullPointerException(); - CompletableFuture<U> dst = new CompletableFuture<U>(); - HandleCompletion<T,U> d = null; - Object r; - if ((r = result) == null) { - CompletionNode p = - new CompletionNode(d = new HandleCompletion<T,U> - (this, fn, dst, e)); - while ((r = result) == null) { - if (UNSAFE.compareAndSwapObject(this, COMPLETIONS, - p.next = completions, p)) - break; - } - } - if (r != null && (d == null || d.compareAndSet(0, 1))) { - T t; Throwable ex; - if (r instanceof AltResult) { - ex = ((AltResult)r).ex; - t = null; - } - else { - ex = null; - @SuppressWarnings("unchecked") T tr = (T) r; - t = tr; - } - U u = null; - Throwable dx = null; - try { - if (e != null) - execAsync(e, new AsyncCombine<T,Throwable,U>(t, ex, fn, dst)); - else { - u = fn.apply(t, ex); - dx = null; - } - } catch (Throwable rex) { - dx = rex; - u = null; - } - if (e == null || dx != null) - dst.internalComplete(u, dx); - } - helpPostComplete(); - return dst; - } - - - // public methods + /* ------------- public methods -------------- */ /** * Creates a new incomplete CompletableFuture. @@ -2114,6 +1786,13 @@ } /** + * Creates a new complete CompletableFuture with given encoded result. + */ + private CompletableFuture(Object r) { + this.result = r; + } + + /** * Returns a new CompletableFuture that is asynchronously completed * by a task running in the {@link ForkJoinPool#commonPool()} with * the value obtained by calling the given Supplier. @@ -2124,10 +1803,7 @@ * @return the new CompletableFuture */ public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) { - if (supplier == null) throw new NullPointerException(); - CompletableFuture<U> f = new CompletableFuture<U>(); - execAsync(ForkJoinPool.commonPool(), new AsyncSupply<U>(supplier, f)); - return f; + return asyncSupplyStage(asyncPool, supplier); } /** @@ -2143,11 +1819,7 @@ */ public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor) { - if (executor == null || supplier == null) - throw new NullPointerException(); - CompletableFuture<U> f = new CompletableFuture<U>(); - execAsync(executor, new AsyncSupply<U>(supplier, f)); - return f; + return asyncSupplyStage(screenExecutor(executor), supplier); } /** @@ -2160,10 +1832,7 @@ * @return the new CompletableFuture */ public static CompletableFuture<Void> runAsync(Runnable runnable) { - if (runnable == null) throw new NullPointerException(); - CompletableFuture<Void> f = new CompletableFuture<Void>(); - execAsync(ForkJoinPool.commonPool(), new AsyncRun(runnable, f)); - return f; + return asyncRunStage(asyncPool, runnable); } /** @@ -2178,11 +1847,7 @@ */ public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor) { - if (executor == null || runnable == null) - throw new NullPointerException(); - CompletableFuture<Void> f = new CompletableFuture<Void>(); - execAsync(executor, new AsyncRun(runnable, f)); - return f; + return asyncRunStage(screenExecutor(executor), runnable); } /** @@ -2194,9 +1859,7 @@ * @return the completed CompletableFuture */ public static <U> CompletableFuture<U> completedFuture(U value) { - CompletableFuture<U> f = new CompletableFuture<U>(); - f.result = (value == null) ? NIL : value; - return f; + return new CompletableFuture<U>((value == null) ? NIL : value); } /** @@ -2220,21 +1883,8 @@ * while waiting */ public T get() throws InterruptedException, ExecutionException { - Object r; Throwable ex, cause; - if ((r = result) == null && (r = waitingGet(true)) == null) - throw new InterruptedException(); - if (!(r instanceof AltResult)) { - @SuppressWarnings("unchecked") T tr = (T) r; - return tr; - } - if ((ex = ((AltResult)r).ex) == null) - return null; - if (ex instanceof CancellationException) - throw (CancellationException)ex; - if ((ex instanceof CompletionException) && - (cause = ex.getCause()) != null) - ex = cause; - throw new ExecutionException(ex); + Object r; + return reportGet((r = result) == null ? waitingGet(true) : r); } /** @@ -2252,24 +1902,9 @@ */ public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { - Object r; Throwable ex, cause; + Object r; long nanos = unit.toNanos(timeout); - if (Thread.interrupted()) - throw new InterruptedException(); - if ((r = result) == null) - r = timedAwaitDone(nanos); - if (!(r instanceof AltResult)) { - @SuppressWarnings("unchecked") T tr = (T) r; - return tr; - } - if ((ex = ((AltResult)r).ex) == null) - return null; - if (ex instanceof CancellationException) - throw (CancellationException)ex; - if ((ex instanceof CompletionException) && - (cause = ex.getCause()) != null) - ex = cause; - throw new ExecutionException(ex); + return reportGet((r = result) == null ? timedGet(nanos) : r); } /** @@ -2287,20 +1922,8 @@ * exceptionally or a completion computation threw an exception */ public T join() { - Object r; Throwable ex; - if ((r = result) == null) - r = waitingGet(false); - if (!(r instanceof AltResult)) { - @SuppressWarnings("unchecked") T tr = (T) r; - return tr; - } - if ((ex = ((AltResult)r).ex) == null) - return null; - if (ex instanceof CancellationException) - throw (CancellationException)ex; - if (ex instanceof CompletionException) - throw (CompletionException)ex; - throw new CompletionException(ex); + Object r; + return reportJoin((r = result) == null ? waitingGet(false) : r); } /** @@ -2314,20 +1937,8 @@ * exceptionally or a completion computation threw an exception */ public T getNow(T valueIfAbsent) { - Object r; Throwable ex; - if ((r = result) == null) - return valueIfAbsent; - if (!(r instanceof AltResult)) { - @SuppressWarnings("unchecked") T tr = (T) r; - return tr; - } - if ((ex = ((AltResult)r).ex) == null) - return null; - if (ex instanceof CancellationException) - throw (CancellationException)ex; - if (ex instanceof CompletionException) - throw (CompletionException)ex; - throw new CompletionException(ex); + Object r; + return ((r = result) == null) ? valueIfAbsent : reportJoin(r); } /** @@ -2339,9 +1950,7 @@ * to transition to a completed state, else {@code false} */ public boolean complete(T value) { - boolean triggered = result == null && - UNSAFE.compareAndSwapObject(this, RESULT, null, - value == null ? NIL : value); + boolean triggered = completeValue(value); postComplete(); return triggered; } @@ -2356,244 +1965,200 @@ */ public boolean completeExceptionally(Throwable ex) { if (ex == null) throw new NullPointerException(); - boolean triggered = result == null && - UNSAFE.compareAndSwapObject(this, RESULT, null, new AltResult(ex)); + boolean triggered = internalComplete(new AltResult(ex)); postComplete(); return triggered; } - // CompletionStage methods - - public <U> CompletableFuture<U> thenApply - (Function<? super T,? extends U> fn) { - return doThenApply(fn, null); + public <U> CompletableFuture<U> thenApply( + Function<? super T,? extends U> fn) { + return uniApplyStage(null, fn); } - public <U> CompletableFuture<U> thenApplyAsync - (Function<? super T,? extends U> fn) { - return doThenApply(fn, ForkJoinPool.commonPool()); + public <U> CompletableFuture<U> thenApplyAsync( + Function<? super T,? extends U> fn) { + return uniApplyStage(asyncPool, fn); } - public <U> CompletableFuture<U> thenApplyAsync - (Function<? super T,? extends U> fn, - Executor executor) { - if (executor == null) throw new NullPointerException(); - return doThenApply(fn, executor); + public <U> CompletableFuture<U> thenApplyAsync( + Function<? super T,? extends U> fn, Executor executor) { + return uniApplyStage(screenExecutor(executor), fn); } - public CompletableFuture<Void> thenAccept - (Consumer<? super T> action) { - return doThenAccept(action, null); + public CompletableFuture<Void> thenAccept(Consumer<? super T> action) { + return uniAcceptStage(null, action); } - public CompletableFuture<Void> thenAcceptAsync - (Consumer<? super T> action) { - return doThenAccept(action, ForkJoinPool.commonPool()); + public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action) { + return uniAcceptStage(asyncPool, action); } - public CompletableFuture<Void> thenAcceptAsync - (Consumer<? super T> action, - Executor executor) { - if (executor == null) throw new NullPointerException(); - return doThenAccept(action, executor); + public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action, + Executor executor) { + return uniAcceptStage(screenExecutor(executor), action); } - public CompletableFuture<Void> thenRun - (Runnable action) { - return doThenRun(action, null); + public CompletableFuture<Void> thenRun(Runnable action) { + return uniRunStage(null, action); } - public CompletableFuture<Void> thenRunAsync - (Runnable action) { - return doThenRun(action, ForkJoinPool.commonPool()); + public CompletableFuture<Void> thenRunAsync(Runnable action) { + return uniRunStage(asyncPool, action); } - public CompletableFuture<Void> thenRunAsync - (Runnable action, - Executor executor) { - if (executor == null) throw new NullPointerException(); - return doThenRun(action, executor); + public CompletableFuture<Void> thenRunAsync(Runnable action, + Executor executor) { + return uniRunStage(screenExecutor(executor), action); } - public <U,V> CompletableFuture<V> thenCombine - (CompletionStage<? extends U> other, - BiFunction<? super T,? super U,? extends V> fn) { - return doThenCombine(other.toCompletableFuture(), fn, null); + public <U,V> CompletableFuture<V> thenCombine( + CompletionStage<? extends U> other, + BiFunction<? super T,? super U,? extends V> fn) { + return biApplyStage(null, other, fn); } - public <U,V> CompletableFuture<V> thenCombineAsync - (CompletionStage<? extends U> other, - BiFunction<? super T,? super U,? extends V> fn) { - return doThenCombine(other.toCompletableFuture(), fn, - ForkJoinPool.commonPool()); + public <U,V> CompletableFuture<V> thenCombineAsync( + CompletionStage<? extends U> other, + BiFunction<? super T,? super U,? extends V> fn) { + return biApplyStage(asyncPool, other, fn); } - public <U,V> CompletableFuture<V> thenCombineAsync - (CompletionStage<? extends U> other, - BiFunction<? super T,? super U,? extends V> fn, - Executor executor) { - if (executor == null) throw new NullPointerException(); - return doThenCombine(other.toCompletableFuture(), fn, executor); + public <U,V> CompletableFuture<V> thenCombineAsync( + CompletionStage<? extends U> other, + BiFunction<? super T,? super U,? extends V> fn, Executor executor) { + return biApplyStage(screenExecutor(executor), other, fn); } - public <U> CompletableFuture<Void> thenAcceptBoth - (CompletionStage<? extends U> other, - BiConsumer<? super T, ? super U> action) { - return doThenAcceptBoth(other.toCompletableFuture(), action, null); + public <U> CompletableFuture<Void> thenAcceptBoth( + CompletionStage<? extends U> other, + BiConsumer<? super T, ? super U> action) { + return biAcceptStage(null, other, action); } - public <U> CompletableFuture<Void> thenAcceptBothAsync - (CompletionStage<? extends U> other, - BiConsumer<? super T, ? super U> action) { - return doThenAcceptBoth(other.toCompletableFuture(), action, - ForkJoinPool.commonPool()); + public <U> CompletableFuture<Void> thenAcceptBothAsync( + CompletionStage<? extends U> other, + BiConsumer<? super T, ? super U> action) { + return biAcceptStage(asyncPool, other, action); } - public <U> CompletableFuture<Void> thenAcceptBothAsync - (CompletionStage<? extends U> other, - BiConsumer<? super T, ? super U> action, - Executor executor) { - if (executor == null) throw new NullPointerException(); - return doThenAcceptBoth(other.toCompletableFuture(), action, executor); - } - - public CompletableFuture<Void> runAfterBoth - (CompletionStage<?> other, - Runnable action) { - return doRunAfterBoth(other.toCompletableFuture(), action, null); + public <U> CompletableFuture<Void> thenAcceptBothAsync( + CompletionStage<? extends U> other, + BiConsumer<? super T, ? super U> action, Executor executor) { + return biAcceptStage(screenExecutor(executor), other, action); } - public CompletableFuture<Void> runAfterBothAsync - (CompletionStage<?> other, - Runnable action) { - return doRunAfterBoth(other.toCompletableFuture(), action, - ForkJoinPool.commonPool()); + public CompletableFuture<Void> runAfterBoth(CompletionStage<?> other, + Runnable action) { + return biRunStage(null, other, action); } - public CompletableFuture<Void> runAfterBothAsync - (CompletionStage<?> other, - Runnable action, - Executor executor) { - if (executor == null) throw new NullPointerException(); - return doRunAfterBoth(other.toCompletableFuture(), action, executor); + public CompletableFuture<Void> runAfterBothAsync(CompletionStage<?> other, + Runnable action) { + return biRunStage(asyncPool, other, action); } - - public <U> CompletableFuture<U> applyToEither - (CompletionStage<? extends T> other, - Function<? super T, U> fn) { - return doApplyToEither(other.toCompletableFuture(), fn, null); + public CompletableFuture<Void> runAfterBothAsync(CompletionStage<?> other, + Runnable action, + Executor executor) { + return biRunStage(screenExecutor(executor), other, action); } - public <U> CompletableFuture<U> applyToEitherAsync - (CompletionStage<? extends T> other, - Function<? super T, U> fn) { - return doApplyToEither(other.toCompletableFuture(), fn, - ForkJoinPool.commonPool()); + public <U> CompletableFuture<U> applyToEither( + CompletionStage<? extends T> other, Function<? super T, U> fn) { + return orApplyStage(null, other, fn); + } + + public <U> CompletableFuture<U> applyToEitherAsync( + CompletionStage<? extends T> other, Function<? super T, U> fn) { + return orApplyStage(asyncPool, other, fn); } - public <U> CompletableFuture<U> applyToEitherAsync - (CompletionStage<? extends T> other, - Function<? super T, U> fn, - Executor executor) { - if (executor == null) throw new NullPointerException(); - return doApplyToEither(other.toCompletableFuture(), fn, executor); - } - - public CompletableFuture<Void> acceptEither - (CompletionStage<? extends T> other, - Consumer<? super T> action) { - return doAcceptEither(other.toCompletableFuture(), action, null); + public <U> CompletableFuture<U> applyToEitherAsync( + CompletionStage<? extends T> other, Function<? super T, U> fn, + Executor executor) { + return orApplyStage(screenExecutor(executor), other, fn); } - public CompletableFuture<Void> acceptEitherAsync - (CompletionStage<? extends T> other, - Consumer<? super T> action) { - return doAcceptEither(other.toCompletableFuture(), action, - ForkJoinPool.commonPool()); + public CompletableFuture<Void> acceptEither( + CompletionStage<? extends T> other, Consumer<? super T> action) { + return orAcceptStage(null, other, action); } - public CompletableFuture<Void> acceptEitherAsync - (CompletionStage<? extends T> other, - Consumer<? super T> action, - Executor executor) { - if (executor == null) throw new NullPointerException(); - return doAcceptEither(other.toCompletableFuture(), action, executor); + public CompletableFuture<Void> acceptEitherAsync( + CompletionStage<? extends T> other, Consumer<? super T> action) { + return orAcceptStage(asyncPool, other, action); + } + + public CompletableFuture<Void> acceptEitherAsync( + CompletionStage<? extends T> other, Consumer<? super T> action, + Executor executor) { + return orAcceptStage(screenExecutor(executor), other, action); } public CompletableFuture<Void> runAfterEither(CompletionStage<?> other, Runnable action) { - return doRunAfterEither(other.toCompletableFuture(), action, null); + return orRunStage(null, other, action); } - public CompletableFuture<Void> runAfterEitherAsync - (CompletionStage<?> other, - Runnable action) { - return doRunAfterEither(other.toCompletableFuture(), action, - ForkJoinPool.commonPool()); + public CompletableFuture<Void> runAfterEitherAsync(CompletionStage<?> other, + Runnable action) { + return orRunStage(asyncPool, other, action); } - public CompletableFuture<Void> runAfterEitherAsync - (CompletionStage<?> other, - Runnable action, - Executor executor) { - if (executor == null) throw new NullPointerException(); - return doRunAfterEither(other.toCompletableFuture(), action, executor); + public CompletableFuture<Void> runAfterEitherAsync(CompletionStage<?> other, + Runnable action, + Executor executor) { + return orRunStage(screenExecutor(executor), other, action); } - public <U> CompletableFuture<U> thenCompose - (Function<? super T, ? extends CompletionStage<U>> fn) { - return doThenCompose(fn, null); + public <U> CompletableFuture<U> thenCompose( + Function<? super T, ? extends CompletionStage<U>> fn) { + return uniComposeStage(null, fn); } - public <U> CompletableFuture<U> thenComposeAsync - (Function<? super T, ? extends CompletionStage<U>> fn) { - return doThenCompose(fn, ForkJoinPool.commonPool()); + public <U> CompletableFuture<U> thenComposeAsync( + Function<? super T, ? extends CompletionStage<U>> fn) { + return uniComposeStage(asyncPool, fn); } - public <U> CompletableFuture<U> thenComposeAsync - (Function<? super T, ? extends CompletionStage<U>> fn, - Executor executor) { - if (executor == null) throw new NullPointerException(); - return doThenCompose(fn, executor); + public <U> CompletableFuture<U> thenComposeAsync( + Function<? super T, ? extends CompletionStage<U>> fn, + Executor executor) { + return uniComposeStage(screenExecutor(executor), fn); } - public CompletableFuture<T> whenComplete - (BiConsumer<? super T, ? super Throwable> action) { - return doWhenComplete(action, null); + public CompletableFuture<T> whenComplete( + BiConsumer<? super T, ? super Throwable> action) { + return uniWhenCompleteStage(null, action); } - public CompletableFuture<T> whenCompleteAsync - (BiConsumer<? super T, ? super Throwable> action) { - return doWhenComplete(action, ForkJoinPool.commonPool()); + public CompletableFuture<T> whenCompleteAsync( + BiConsumer<? super T, ? super Throwable> action) { + return uniWhenCompleteStage(asyncPool, action); } - public CompletableFuture<T> whenCompleteAsync - (BiConsumer<? super T, ? super Throwable> action, - Executor executor) { - if (executor == null) throw new NullPointerException(); - return doWhenComplete(action, executor); + public CompletableFuture<T> whenCompleteAsync( + BiConsumer<? super T, ? super Throwable> action, Executor executor) { + return uniWhenCompleteStage(screenExecutor(executor), action); } - public <U> CompletableFuture<U> handle - (BiFunction<? super T, Throwable, ? extends U> fn) { - return doHandle(fn, null); + public <U> CompletableFuture<U> handle( + BiFunction<? super T, Throwable, ? extends U> fn) { + return uniHandleStage(null, fn); } - public <U> CompletableFuture<U> handleAsync - (BiFunction<? super T, Throwable, ? extends U> fn) { - return doHandle(fn, ForkJoinPool.commonPool()); + public <U> CompletableFuture<U> handleAsync( + BiFunction<? super T, Throwable, ? extends U> fn) { + return uniHandleStage(asyncPool, fn); } - public <U> CompletableFuture<U> handleAsync - (BiFunction<? super T, Throwable, ? extends U> fn, - Executor executor) { - if (executor == null) throw new NullPointerException(); - return doHandle(fn, executor); + public <U> CompletableFuture<U> handleAsync( + BiFunction<? super T, Throwable, ? extends U> fn, Executor executor) { + return uniHandleStage(screenExecutor(executor), fn); } /** - * Returns this CompletableFuture + * Returns this CompletableFuture. * * @return this CompletableFuture */ @@ -2618,52 +2183,13 @@ * exceptionally * @return the new CompletableFuture */ - public CompletableFuture<T> exceptionally - (Function<Throwable, ? extends T> fn) { - if (fn == null) throw new NullPointerException(); - CompletableFuture<T> dst = new CompletableFuture<T>(); - ExceptionCompletion<T> d = null; - Object r; - if ((r = result) == null) { - CompletionNode p = - new CompletionNode(d = new ExceptionCompletion<T> - (this, fn, dst)); - while ((r = result) == null) { - if (UNSAFE.compareAndSwapObject(this, COMPLETIONS, - p.next = completions, p)) - break; - } - } - if (r != null && (d == null || d.compareAndSet(0, 1))) { - T t = null; Throwable ex, dx = null; - if (r instanceof AltResult) { - if ((ex = ((AltResult)r).ex) != null) { - try { - t = fn.apply(ex); - } catch (Throwable rex) { - dx = rex; - } - } - } - else { - @SuppressWarnings("unchecked") T tr = (T) r; - t = tr; - } - dst.internalComplete(t, dx); - } - helpPostComplete(); - return dst; + public CompletableFuture<T> exceptionally( + Function<Throwable, ? extends T> fn) { + return uniExceptionallyStage(fn); } /* ------------- Arbitrary-arity constructions -------------- */ - /* - * The basic plan of attack is to recursively form binary - * completion trees of elements. This can be overkill for small - * sets, but scales nicely. The And/All vs Or/Any forms use the - * same idea, but details differ. - */ - /** * Returns a new CompletableFuture that is completed when all of * the given CompletableFutures complete. If any of the given @@ -2688,82 +2214,7 @@ * {@code null} */ public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs) { - int len = cfs.length; // Directly handle empty and singleton cases - if (len > 1) - return allTree(cfs, 0, len - 1); - else { - CompletableFuture<Void> dst = new CompletableFuture<Void>(); - CompletableFuture<?> f; - if (len == 0) - dst.result = NIL; - else if ((f = cfs[0]) == null) - throw new NullPointerException(); - else { - ThenPropagate d = null; - CompletionNode p = null; - Object r; - while ((r = f.result) == null) { - if (d == null) - d = new ThenPropagate(f, dst); - else if (p == null) - p = new CompletionNode(d); - else if (UNSAFE.compareAndSwapObject - (f, COMPLETIONS, p.next = f.completions, p)) - break; - } - if (r != null && (d == null || d.compareAndSet(0, 1))) - dst.internalComplete(null, (r instanceof AltResult) ? - ((AltResult)r).ex : null); - f.helpPostComplete(); - } - return dst; - } - } - - /** - * Recursively constructs an And'ed tree of CompletableFutures. - * Called only when array known to have at least two elements. - */ - private static CompletableFuture<Void> allTree(CompletableFuture<?>[] cfs, - int lo, int hi) { - CompletableFuture<?> fst, snd; - int mid = (lo + hi) >>> 1; - if ((fst = (lo == mid ? cfs[lo] : allTree(cfs, lo, mid))) == null || - (snd = (hi == mid+1 ? cfs[hi] : allTree(cfs, mid+1, hi))) == null) - throw new NullPointerException(); - CompletableFuture<Void> dst = new CompletableFuture<Void>(); - AndCompletion d = null; - CompletionNode p = null, q = null; - Object r = null, s = null; - while ((r = fst.result) == null || (s = snd.result) == null) { - if (d == null) - d = new AndCompletion(fst, snd, dst); - else if (p == null) - p = new CompletionNode(d); - else if (q == null) { - if (UNSAFE.compareAndSwapObject - (fst, COMPLETIONS, p.next = fst.completions, p)) - q = new CompletionNode(d); - } - else if (UNSAFE.compareAndSwapObject - (snd, COMPLETIONS, q.next = snd.completions, q)) - break; - } - if ((r != null || (r = fst.result) != null) && - (s != null || (s = snd.result) != null) && - (d == null || d.compareAndSet(0, 1))) { - Throwable ex; - if (r instanceof AltResult) - ex = ((AltResult)r).ex; - else - ex = null; - if (ex == null && (s instanceof AltResult)) - ex = ((AltResult)s).ex; - dst.internalComplete(null, ex); - } - fst.helpPostComplete(); - snd.helpPostComplete(); - return dst; + return andTree(cfs, 0, cfs.length - 1); } /** @@ -2782,92 +2233,7 @@ * {@code null} */ public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs) { - int len = cfs.length; // Same idea as allOf - if (len > 1) - return anyTree(cfs, 0, len - 1); - else { - CompletableFuture<Object> dst = new CompletableFuture<Object>(); - CompletableFuture<?> f; - if (len == 0) - ; // skip - else if ((f = cfs[0]) == null) - throw new NullPointerException(); - else { - ThenCopy<Object> d = null; - CompletionNode p = null; - Object r; - while ((r = f.result) == null) { - if (d == null) - d = new ThenCopy<Object>(f, dst); - else if (p == null) - p = new CompletionNode(d); - else if (UNSAFE.compareAndSwapObject - (f, COMPLETIONS, p.next = f.completions, p)) - break; - } - if (r != null && (d == null || d.compareAndSet(0, 1))) { - Throwable ex; Object t; - if (r instanceof AltResult) { - ex = ((AltResult)r).ex; - t = null; - } - else { - ex = null; - t = r; - } - dst.internalComplete(t, ex); - } - f.helpPostComplete(); - } - return dst; - } - } - - /** - * Recursively constructs an Or'ed tree of CompletableFutures. - */ - private static CompletableFuture<Object> anyTree(CompletableFuture<?>[] cfs, - int lo, int hi) { - CompletableFuture<?> fst, snd; - int mid = (lo + hi) >>> 1; - if ((fst = (lo == mid ? cfs[lo] : anyTree(cfs, lo, mid))) == null || - (snd = (hi == mid+1 ? cfs[hi] : anyTree(cfs, mid+1, hi))) == null) - throw new NullPointerException(); - CompletableFuture<Object> dst = new CompletableFuture<Object>(); - OrCompletion d = null; - CompletionNode p = null, q = null; - Object r; - while ((r = fst.result) == null && (r = snd.result) == null) { - if (d == null) - d = new OrCompletion(fst, snd, dst); - else if (p == null) - p = new CompletionNode(d); - else if (q == null) { - if (UNSAFE.compareAndSwapObject - (fst, COMPLETIONS, p.next = fst.completions, p)) - q = new CompletionNode(d); - } - else if (UNSAFE.compareAndSwapObject - (snd, COMPLETIONS, q.next = snd.completions, q)) - break; - } - if ((r != null || (r = fst.result) != null || - (r = snd.result) != null) && - (d == null || d.compareAndSet(0, 1))) { - Throwable ex; Object t; - if (r instanceof AltResult) { - ex = ((AltResult)r).ex; - t = null; - } - else { - ex = null; - t = r; - } - dst.internalComplete(t, ex); - } - fst.helpPostComplete(); - snd.helpPostComplete(); - return dst; + return orTree(cfs, 0, cfs.length - 1); } /* ------------- Control and status methods -------------- */ @@ -2887,8 +2253,7 @@ */ public boolean cancel(boolean mayInterruptIfRunning) { boolean cancelled = (result == null) && - UNSAFE.compareAndSwapObject - (this, RESULT, null, new AltResult(new CancellationException())); + internalComplete(new AltResult(new CancellationException())); postComplete(); return cancelled || isCancelled(); } @@ -2940,11 +2305,12 @@ * Forcibly causes subsequent invocations of method {@link #get()} * and related methods to throw the given exception, whether or * not already completed. This method is designed for use only in - * recovery actions, and even in such situations may result in - * ongoing dependent completions using established versus + * error recovery actions, and even in such situations may result + * in ongoing dependent completions using established versus * overwritten outcomes. * * @param ex the exception + * @throws NullPointerException if the exception is null */ public void obtrudeException(Throwable ex) { if (ex == null) throw new NullPointerException(); @@ -2962,7 +2328,7 @@ */ public int getNumberOfDependents() { int count = 0; - for (CompletionNode p = completions; p != null; p = p.next) + for (Completion p = stack; p != null; p = p.next) ++count; return count; } @@ -2993,20 +2359,19 @@ // Unsafe mechanics private static final sun.misc.Unsafe UNSAFE; private static final long RESULT; - private static final long WAITERS; - private static final long COMPLETIONS; + private static final long STACK; + private static final long NEXT; static { try { - UNSAFE = sun.misc.Unsafe.getUnsafe(); + final sun.misc.Unsafe u; + UNSAFE = u = sun.misc.Unsafe.getUnsafe(); Class<?> k = CompletableFuture.class; - RESULT = UNSAFE.objectFieldOffset - (k.getDeclaredField("result")); - WAITERS = UNSAFE.objectFieldOffset - (k.getDeclaredField("waiters")); - COMPLETIONS = UNSAFE.objectFieldOffset - (k.getDeclaredField("completions")); - } catch (Exception e) { - throw new Error(e); + RESULT = u.objectFieldOffset(k.getDeclaredField("result")); + STACK = u.objectFieldOffset(k.getDeclaredField("stack")); + NEXT = u.objectFieldOffset + (Completion.class.getDeclaredField("next")); + } catch (Exception x) { + throw new Error(x); } } }
--- a/src/share/classes/java/util/concurrent/CompletionStage.java Thu Sep 04 13:00:55 2014 -0700 +++ b/src/share/classes/java/util/concurrent/CompletionStage.java Fri Sep 05 10:48:11 2014 +0200 @@ -407,7 +407,7 @@ /** * Returns a new CompletionStage that, when this and the other * given stage complete normally, executes the given action using - * the supplied executor + * the supplied executor. * * See the {@link CompletionStage} documentation for rules * covering exceptional completion. @@ -569,7 +569,7 @@ /** * Returns a new CompletionStage that, when either this or the * other given stage complete normally, executes the given action - * using supplied executor. + * using the supplied executor. * * See the {@link CompletionStage} documentation for rules * covering exceptional completion. @@ -649,10 +649,15 @@ (Function<Throwable, ? extends T> fn); /** - * Returns a new CompletionStage with the same result or exception - * as this stage, and when this stage completes, executes the - * given action with the result (or {@code null} if none) and the - * exception (or {@code null} if none) of this stage. + * Returns a new CompletionStage with the same result or exception as + * this stage, that executes the given action when this stage completes. + * + * <p>When this stage is complete, the given action is invoked with the + * result (or {@code null} if none) and the exception (or {@code null} + * if none) of this stage as arguments. The returned stage is completed + * when the action returns. If the supplied action itself encounters an + * exception, then the returned stage exceptionally completes with this + * exception unless this stage also completed exceptionally. * * @param action the action to perform * @return the new CompletionStage @@ -661,12 +666,16 @@ (BiConsumer<? super T, ? super Throwable> action); /** - * Returns a new CompletionStage with the same result or exception - * as this stage, and when this stage completes, executes the - * given action executes the given action using this stage's - * default asynchronous execution facility, with the result (or - * {@code null} if none) and the exception (or {@code null} if - * none) of this stage as arguments. + * Returns a new CompletionStage with the same result or exception as + * this stage, that executes the given action using this stage's + * default asynchronous execution facility when this stage completes. + * + * <p>When this stage is complete, the given action is invoked with the + * result (or {@code null} if none) and the exception (or {@code null} + * if none) of this stage as arguments. The returned stage is completed + * when the action returns. If the supplied action itself encounters an + * exception, then the returned stage exceptionally completes with this + * exception unless this stage also completed exceptionally. * * @param action the action to perform * @return the new CompletionStage @@ -675,11 +684,16 @@ (BiConsumer<? super T, ? super Throwable> action); /** - * Returns a new CompletionStage with the same result or exception - * as this stage, and when this stage completes, executes using - * the supplied Executor, the given action with the result (or - * {@code null} if none) and the exception (or {@code null} if - * none) of this stage as arguments. + * Returns a new CompletionStage with the same result or exception as + * this stage, that executes the given action using the supplied + * Executor when this stage completes. + * + * <p>When this stage is complete, the given action is invoked with the + * result (or {@code null} if none) and the exception (or {@code null} + * if none) of this stage as arguments. The returned stage is completed + * when the action returns. If the supplied action itself encounters an + * exception, then the returned stage exceptionally completes with this + * exception unless this stage also completed exceptionally. * * @param action the action to perform * @param executor the executor to use for asynchronous execution @@ -693,9 +707,11 @@ * Returns a new CompletionStage that, when this stage completes * either normally or exceptionally, is executed with this stage's * result and exception as arguments to the supplied function. - * The given function is invoked with the result (or {@code null} - * if none) and the exception (or {@code null} if none) of this - * stage when complete as arguments. + * + * <p>When this stage is complete, the given function is invoked + * with the result (or {@code null} if none) and the exception (or + * {@code null} if none) of this stage as arguments, and the + * function's result is used to complete the returned stage. * * @param fn the function to use to compute the value of the * returned CompletionStage @@ -710,9 +726,11 @@ * either normally or exceptionally, is executed using this stage's * default asynchronous execution facility, with this stage's * result and exception as arguments to the supplied function. - * The given function is invoked with the result (or {@code null} - * if none) and the exception (or {@code null} if none) of this - * stage when complete as arguments. + * + * <p>When this stage is complete, the given function is invoked + * with the result (or {@code null} if none) and the exception (or + * {@code null} if none) of this stage as arguments, and the + * function's result is used to complete the returned stage. * * @param fn the function to use to compute the value of the * returned CompletionStage @@ -726,10 +744,12 @@ * Returns a new CompletionStage that, when this stage completes * either normally or exceptionally, is executed using the * supplied executor, with this stage's result and exception as - * arguments to the supplied function. The given function is - * invoked with the result (or {@code null} if none) and the - * exception (or {@code null} if none) of this stage when complete - * as arguments. + * arguments to the supplied function. + * + * <p>When this stage is complete, the given function is invoked + * with the result (or {@code null} if none) and the exception (or + * {@code null} if none) of this stage as arguments, and the + * function's result is used to complete the returned stage. * * @param fn the function to use to compute the value of the * returned CompletionStage