Mercurial > hg > openjdk > lambda > jdk
changeset 9502:0a778e487a73
Merge
author | mullan |
---|---|
date | Fri, 02 Aug 2013 09:38:13 -0400 |
parents | 7bbc6c2351d7 (current diff) 6ec910ff3ea1 (diff) |
children | 33617583c545 |
files | |
diffstat | 4 files changed, 1708 insertions(+), 1247 deletions(-) [+] |
line wrap: on
line diff
--- a/src/share/classes/java/util/concurrent/CompletableFuture.java Fri Aug 02 08:37:25 2013 -0400 +++ b/src/share/classes/java/util/concurrent/CompletableFuture.java Fri Aug 02 09:38:13 2013 -0400 @@ -48,13 +48,16 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeoutException; import java.util.concurrent.CancellationException; +import java.util.concurrent.CompletionException; +import java.util.concurrent.CompletionStage; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.LockSupport; /** * A {@link Future} that may be explicitly completed (setting its - * value and status), and may include dependent functions and actions - * that trigger upon its completion. + * value and status), and may be used as a {@link CompletionStage}, + * supporting dependent functions and actions that trigger upon its + * completion. * * <p>When two or more threads attempt to * {@link #complete complete}, @@ -62,64 +65,50 @@ * {@link #cancel cancel} * a CompletableFuture, only one of them succeeds. * - * <p>Methods are available for adding dependents based on - * user-provided Functions, Consumers, or Runnables. The appropriate - * form to use depends on whether actions require arguments and/or - * produce results. Completion of a dependent action will trigger the - * completion of another CompletableFuture. Actions may also be - * triggered after either or both the current and another - * CompletableFuture complete. Multiple CompletableFutures may also - * be grouped as one using {@link #anyOf(CompletableFuture...)} and - * {@link #allOf(CompletableFuture...)}. + * <p>In addition to these and related methods for directly + * manipulating status and results, CompletableFuture implements + * interface {@link CompletionStage} with the following policies: <ul> * - * <p>CompletableFutures themselves do not execute asynchronously. - * However, actions supplied for dependent completions of another - * CompletableFuture may do so, depending on whether they are provided - * via one of the <em>async</em> methods (that is, methods with names - * of the form <tt><var>xxx</var>Async</tt>). The <em>async</em> - * methods provide a way to commence asynchronous processing of an - * action using either a given {@link Executor} or by default the - * {@link ForkJoinPool#commonPool()}. To simplify monitoring, + * <li>Actions supplied for dependent completions of + * <em>non-async</em> methods may be performed by the thread that + * completes the current CompletableFuture, or by any other caller of + * a completion method.</li> + * + * <li>All <em>async</em> methods without an explicit Executor + * argument are performed using the {@link ForkJoinPool#commonPool()} + * (unless it does not support a parallelism level of at least two, in + * which case, a new Thread is used). To simplify monitoring, * debugging, and tracking, all generated asynchronous tasks are - * instances of the marker interface {@link AsynchronousCompletionTask}. + * instances of the marker interface {@link + * AsynchronousCompletionTask}. </li> * - * <p>Actions supplied for dependent completions of <em>non-async</em> - * methods may be performed by the thread that completes the current - * CompletableFuture, or by any other caller of these methods. There - * are no guarantees about the order of processing completions unless - * constrained by these methods. + * <li>All CompletionStage methods are implemented independently of + * other public methods, so the behavior of one method is not impacted + * by overrides of others in subclasses. </li> </ul> * - * <p>Since (unlike {@link FutureTask}) this class has no direct - * control over the computation that causes it to be completed, - * cancellation is treated as just another form of exceptional completion. - * Method {@link #cancel cancel} has the same effect as - * {@code completeExceptionally(new CancellationException())}. + * <p>CompletableFuture also implements {@link Future} with the following + * policies: <ul> * - * <p>Upon exceptional completion (including cancellation), or when a - * completion entails an additional computation which terminates - * abruptly with an (unchecked) exception or error, then all of their - * dependent completions (and their dependents in turn) generally act - * as {@code completeExceptionally} with a {@link CompletionException} - * holding that exception as its cause. However, the {@link - * #exceptionally exceptionally} and {@link #handle handle} - * completions <em>are</em> able to handle exceptional completions of - * the CompletableFutures they depend on. + * <li>Since (unlike {@link FutureTask}) this class has no direct + * control over the computation that causes it to be completed, + * cancellation is treated as just another form of exceptional + * completion. Method {@link #cancel cancel} has the same effect as + * {@code completeExceptionally(new CancellationException())}. Method + * {@link #isCompletedExceptionally} can be used to determine if a + * CompletableFuture completed in any exceptional fashion.</li> * - * <p>In case of exceptional completion with a CompletionException, + * <li>In case of exceptional completion with a CompletionException, * methods {@link #get()} and {@link #get(long, TimeUnit)} throw an * {@link ExecutionException} with the same cause as held in the - * corresponding CompletionException. However, in these cases, - * methods {@link #join()} and {@link #getNow} throw the - * CompletionException, which simplifies usage. - * - * <p>Arguments used to pass a completion result (that is, for parameters - * of type {@code T}) may be null, but passing a null value for any other - * parameter will result in a {@link NullPointerException} being thrown. + * corresponding CompletionException. To simplify usage in most + * contexts, this class also defines methods {@link #join()} and + * {@link #getNow} that instead throw the CompletionException directly + * in these cases.</li> </ul> * * @author Doug Lea * @since 1.8 */ -public class CompletableFuture<T> implements Future<T> { +public class CompletableFuture<T> implements Future<T>, CompletionStage<T> { /* * Overview: @@ -438,6 +427,19 @@ public final void run() { exec(); } } + /** + * Starts the given async task using the given executor, unless + * the executor is ForkJoinPool.commonPool and it has been + * disabled, in which case starts a new thread. + */ + static void execAsync(Executor e, Async r) { + if (e == ForkJoinPool.commonPool() && + ForkJoinPool.getCommonPoolParallelism() <= 1) + new Thread(r).start(); + else + e.execute(r); + } + static final class AsyncRun extends Async { final Runnable fn; final CompletableFuture<Void> dst; @@ -538,13 +540,13 @@ static final class AsyncAccept<T> extends Async { final T arg; final Consumer<? super T> fn; - final CompletableFuture<Void> dst; + final CompletableFuture<?> dst; AsyncAccept(T arg, Consumer<? super T> fn, - CompletableFuture<Void> dst) { + CompletableFuture<?> dst) { this.arg = arg; this.fn = fn; this.dst = dst; } public final boolean exec() { - CompletableFuture<Void> d; Throwable ex; + CompletableFuture<?> d; Throwable ex; if ((d = this.dst) != null && d.result == null) { try { fn.accept(arg); @@ -563,14 +565,14 @@ final T arg1; final U arg2; final BiConsumer<? super T,? super U> fn; - final CompletableFuture<Void> dst; + final CompletableFuture<?> dst; AsyncAcceptBoth(T arg1, U arg2, BiConsumer<? super T,? super U> fn, - CompletableFuture<Void> dst) { + CompletableFuture<?> dst) { this.arg1 = arg1; this.arg2 = arg2; this.fn = fn; this.dst = dst; } public final boolean exec() { - CompletableFuture<Void> d; Throwable ex; + CompletableFuture<?> d; Throwable ex; if ((d = this.dst) != null && d.result == null) { try { fn.accept(arg1, arg2); @@ -587,10 +589,10 @@ static final class AsyncCompose<T,U> extends Async { final T arg; - final Function<? super T, CompletableFuture<U>> fn; + final Function<? super T, ? extends CompletionStage<U>> fn; final CompletableFuture<U> dst; AsyncCompose(T arg, - Function<? super T, CompletableFuture<U>> fn, + Function<? super T, ? extends CompletionStage<U>> fn, CompletableFuture<U> dst) { this.arg = arg; this.fn = fn; this.dst = dst; } @@ -598,7 +600,8 @@ CompletableFuture<U> d, fr; U u; Throwable ex; if ((d = this.dst) != null && d.result == null) { try { - fr = fn.apply(arg); + CompletionStage<U> cs = fn.apply(arg); + fr = (cs == null) ? null : cs.toCompletableFuture(); ex = (fr == null) ? new NullPointerException() : null; } catch (Throwable rex) { ex = rex; @@ -626,6 +629,33 @@ private static final long serialVersionUID = 5232453952276885070L; } + static final class AsyncWhenComplete<T> extends Async { + final T arg1; + final Throwable arg2; + final BiConsumer<? super T,? super Throwable> fn; + final CompletableFuture<T> dst; + AsyncWhenComplete(T arg1, Throwable arg2, + BiConsumer<? super T,? super Throwable> fn, + CompletableFuture<T> dst) { + this.arg1 = arg1; this.arg2 = arg2; this.fn = fn; this.dst = dst; + } + public final boolean exec() { + CompletableFuture<T> d; + if ((d = this.dst) != null && d.result == null) { + Throwable ex = arg2; + try { + fn.accept(arg1, ex); + } catch (Throwable rex) { + if (ex == null) + ex = rex; + } + d.internalComplete(arg1, ex); + } + return true; + } + private static final long serialVersionUID = 5232453952276885070L; + } + /* ------------- Completions -------------- */ /** @@ -680,7 +710,7 @@ if (ex == null) { try { if (e != null) - e.execute(new AsyncApply<T,U>(t, fn, dst)); + execAsync(e, new AsyncApply<T,U>(t, fn, dst)); else u = fn.apply(t); } catch (Throwable rex) { @@ -697,11 +727,11 @@ static final class ThenAccept<T> extends Completion { final CompletableFuture<? extends T> src; final Consumer<? super T> fn; - final CompletableFuture<Void> dst; + final CompletableFuture<?> dst; final Executor executor; ThenAccept(CompletableFuture<? extends T> src, Consumer<? super T> fn, - CompletableFuture<Void> dst, + CompletableFuture<?> dst, Executor executor) { this.src = src; this.fn = fn; this.dst = dst; this.executor = executor; @@ -709,7 +739,7 @@ public final void run() { final CompletableFuture<? extends T> a; final Consumer<? super T> fn; - final CompletableFuture<Void> dst; + final CompletableFuture<?> dst; Object r; T t; Throwable ex; if ((dst = this.dst) != null && (fn = this.fn) != null && @@ -729,7 +759,7 @@ if (ex == null) { try { if (e != null) - e.execute(new AsyncAccept<T>(t, fn, dst)); + execAsync(e, new AsyncAccept<T>(t, fn, dst)); else fn.accept(t); } catch (Throwable rex) { @@ -773,7 +803,7 @@ if (ex == null) { try { if (e != null) - e.execute(new AsyncRun(fn, dst)); + execAsync(e, new AsyncRun(fn, dst)); else fn.run(); } catch (Throwable rex) { @@ -839,7 +869,7 @@ if (ex == null) { try { if (e != null) - e.execute(new AsyncCombine<T,U,V>(t, u, fn, dst)); + execAsync(e, new AsyncCombine<T,U,V>(t, u, fn, dst)); else v = fn.apply(t, u); } catch (Throwable rex) { @@ -904,7 +934,7 @@ if (ex == null) { try { if (e != null) - e.execute(new AsyncAcceptBoth<T,U>(t, u, fn, dst)); + execAsync(e, new AsyncAcceptBoth<T,U>(t, u, fn, dst)); else fn.accept(t, u); } catch (Throwable rex) { @@ -956,7 +986,7 @@ if (ex == null) { try { if (e != null) - e.execute(new AsyncRun(fn, dst)); + execAsync(e, new AsyncRun(fn, dst)); else fn.run(); } catch (Throwable rex) { @@ -1042,7 +1072,7 @@ if (ex == null) { try { if (e != null) - e.execute(new AsyncApply<T,U>(t, fn, dst)); + execAsync(e, new AsyncApply<T,U>(t, fn, dst)); else u = fn.apply(t); } catch (Throwable rex) { @@ -1095,7 +1125,7 @@ if (ex == null) { try { if (e != null) - e.execute(new AsyncAccept<T>(t, fn, dst)); + execAsync(e, new AsyncAccept<T>(t, fn, dst)); else fn.accept(t); } catch (Throwable rex) { @@ -1143,7 +1173,7 @@ if (ex == null) { try { if (e != null) - e.execute(new AsyncRun(fn, dst)); + execAsync(e, new AsyncRun(fn, dst)); else fn.run(); } catch (Throwable rex) { @@ -1226,6 +1256,54 @@ private static final long serialVersionUID = 5232453952276885070L; } + static final class WhenCompleteCompletion<T> extends Completion { + final CompletableFuture<? extends T> src; + final BiConsumer<? super T, ? super Throwable> fn; + final CompletableFuture<T> dst; + final Executor executor; + WhenCompleteCompletion(CompletableFuture<? extends T> src, + BiConsumer<? super T, ? super Throwable> fn, + CompletableFuture<T> dst, + Executor executor) { + this.src = src; this.fn = fn; this.dst = dst; + this.executor = executor; + } + public final void run() { + final CompletableFuture<? extends T> a; + final BiConsumer<? super T, ? super Throwable> fn; + final CompletableFuture<T> dst; + Object r; T t; Throwable ex; + if ((dst = this.dst) != null && + (fn = this.fn) != null && + (a = this.src) != null && + (r = a.result) != null && + compareAndSet(0, 1)) { + if (r instanceof AltResult) { + ex = ((AltResult)r).ex; + t = null; + } + else { + ex = null; + @SuppressWarnings("unchecked") T tr = (T) r; + t = tr; + } + Executor e = executor; + Throwable dx = null; + try { + if (e != null) + execAsync(e, new AsyncWhenComplete<T>(t, ex, fn, dst)); + else + fn.accept(t, ex); + } catch (Throwable rex) { + dx = rex; + } + if (e == null || dx != null) + dst.internalComplete(t, ex != null ? ex : dx); + } + } + private static final long serialVersionUID = 5232453952276885070L; + } + static final class ThenCopy<T> extends Completion { final CompletableFuture<?> src; final CompletableFuture<T> dst; @@ -1286,10 +1364,13 @@ final CompletableFuture<? extends T> src; final BiFunction<? super T, Throwable, ? extends U> fn; final CompletableFuture<U> dst; + final Executor executor; HandleCompletion(CompletableFuture<? extends T> src, BiFunction<? super T, Throwable, ? extends U> fn, - CompletableFuture<U> dst) { + CompletableFuture<U> dst, + Executor executor) { this.src = src; this.fn = fn; this.dst = dst; + this.executor = executor; } public final void run() { final CompletableFuture<? extends T> a; @@ -1310,13 +1391,19 @@ @SuppressWarnings("unchecked") T tr = (T) r; t = tr; } - U u = null; Throwable dx = null; + Executor e = executor; + U u = null; + Throwable dx = null; try { - u = fn.apply(t, ex); + if (e != null) + execAsync(e, new AsyncCombine<T,Throwable,U>(t, ex, fn, dst)); + else + u = fn.apply(t, ex); } catch (Throwable rex) { dx = rex; } - dst.internalComplete(u, dx); + if (e == null || dx != null) + dst.internalComplete(u, dx); } } private static final long serialVersionUID = 5232453952276885070L; @@ -1324,11 +1411,11 @@ static final class ThenCompose<T,U> extends Completion { final CompletableFuture<? extends T> src; - final Function<? super T, CompletableFuture<U>> fn; + final Function<? super T, ? extends CompletionStage<U>> fn; final CompletableFuture<U> dst; final Executor executor; ThenCompose(CompletableFuture<? extends T> src, - Function<? super T, CompletableFuture<U>> fn, + Function<? super T, ? extends CompletionStage<U>> fn, CompletableFuture<U> dst, Executor executor) { this.src = src; this.fn = fn; this.dst = dst; @@ -1336,7 +1423,7 @@ } public final void run() { final CompletableFuture<? extends T> a; - final Function<? super T, CompletableFuture<U>> fn; + final Function<? super T, ? extends CompletionStage<U>> fn; final CompletableFuture<U> dst; Object r; T t; Throwable ex; Executor e; if ((dst = this.dst) != null && @@ -1358,10 +1445,12 @@ boolean complete = false; if (ex == null) { if ((e = executor) != null) - e.execute(new AsyncCompose<T,U>(t, fn, dst)); + execAsync(e, new AsyncCompose<T,U>(t, fn, dst)); else { try { - if ((c = fn.apply(t)) == null) + CompletionStage<U> cs = fn.apply(t); + c = (cs == null) ? null : cs.toCompletableFuture(); + if (c == null) ex = new NullPointerException(); } catch (Throwable rex) { ex = rex; @@ -1401,6 +1490,619 @@ private static final long serialVersionUID = 5232453952276885070L; } + // Implementations of stage methods with (plain, async, Executor) forms + + private <U> CompletableFuture<U> doThenApply + (Function<? super T,? extends U> fn, + Executor e) { + if (fn == null) throw new NullPointerException(); + CompletableFuture<U> dst = new CompletableFuture<U>(); + ThenApply<T,U> d = null; + Object r; + if ((r = result) == null) { + CompletionNode p = new CompletionNode + (d = new ThenApply<T,U>(this, fn, dst, e)); + while ((r = result) == null) { + if (UNSAFE.compareAndSwapObject + (this, COMPLETIONS, p.next = completions, p)) + break; + } + } + if (r != null && (d == null || d.compareAndSet(0, 1))) { + T t; Throwable ex; + if (r instanceof AltResult) { + ex = ((AltResult)r).ex; + t = null; + } + else { + ex = null; + @SuppressWarnings("unchecked") T tr = (T) r; + t = tr; + } + U u = null; + if (ex == null) { + try { + if (e != null) + execAsync(e, new AsyncApply<T,U>(t, fn, dst)); + else + u = fn.apply(t); + } catch (Throwable rex) { + ex = rex; + } + } + if (e == null || ex != null) + dst.internalComplete(u, ex); + } + helpPostComplete(); + return dst; + } + + private CompletableFuture<Void> doThenAccept(Consumer<? super T> fn, + Executor e) { + if (fn == null) throw new NullPointerException(); + CompletableFuture<Void> dst = new CompletableFuture<Void>(); + ThenAccept<T> d = null; + Object r; + if ((r = result) == null) { + CompletionNode p = new CompletionNode + (d = new ThenAccept<T>(this, fn, dst, e)); + while ((r = result) == null) { + if (UNSAFE.compareAndSwapObject + (this, COMPLETIONS, p.next = completions, p)) + break; + } + } + if (r != null && (d == null || d.compareAndSet(0, 1))) { + T t; Throwable ex; + if (r instanceof AltResult) { + ex = ((AltResult)r).ex; + t = null; + } + else { + ex = null; + @SuppressWarnings("unchecked") T tr = (T) r; + t = tr; + } + if (ex == null) { + try { + if (e != null) + execAsync(e, new AsyncAccept<T>(t, fn, dst)); + else + fn.accept(t); + } catch (Throwable rex) { + ex = rex; + } + } + if (e == null || ex != null) + dst.internalComplete(null, ex); + } + helpPostComplete(); + return dst; + } + + private CompletableFuture<Void> doThenRun(Runnable action, + Executor e) { + if (action == null) throw new NullPointerException(); + CompletableFuture<Void> dst = new CompletableFuture<Void>(); + ThenRun d = null; + Object r; + if ((r = result) == null) { + CompletionNode p = new CompletionNode + (d = new ThenRun(this, action, dst, e)); + while ((r = result) == null) { + if (UNSAFE.compareAndSwapObject + (this, COMPLETIONS, p.next = completions, p)) + break; + } + } + if (r != null && (d == null || d.compareAndSet(0, 1))) { + Throwable ex; + if (r instanceof AltResult) + ex = ((AltResult)r).ex; + else + ex = null; + if (ex == null) { + try { + if (e != null) + execAsync(e, new AsyncRun(action, dst)); + else + action.run(); + } catch (Throwable rex) { + ex = rex; + } + } + if (e == null || ex != null) + dst.internalComplete(null, ex); + } + helpPostComplete(); + return dst; + } + + private <U,V> CompletableFuture<V> doThenCombine + (CompletableFuture<? extends U> other, + BiFunction<? super T,? super U,? extends V> fn, + Executor e) { + if (other == null || fn == null) throw new NullPointerException(); + CompletableFuture<V> dst = new CompletableFuture<V>(); + ThenCombine<T,U,V> d = null; + Object r, s = null; + if ((r = result) == null || (s = other.result) == null) { + d = new ThenCombine<T,U,V>(this, other, fn, dst, e); + CompletionNode q = null, p = new CompletionNode(d); + while ((r == null && (r = result) == null) || + (s == null && (s = other.result) == null)) { + if (q != null) { + if (s != null || + UNSAFE.compareAndSwapObject + (other, COMPLETIONS, q.next = other.completions, q)) + break; + } + else if (r != null || + UNSAFE.compareAndSwapObject + (this, COMPLETIONS, p.next = completions, p)) { + if (s != null) + break; + q = new CompletionNode(d); + } + } + } + if (r != null && s != null && (d == null || d.compareAndSet(0, 1))) { + T t; U u; Throwable ex; + if (r instanceof AltResult) { + ex = ((AltResult)r).ex; + t = null; + } + else { + ex = null; + @SuppressWarnings("unchecked") T tr = (T) r; + t = tr; + } + if (ex != null) + u = null; + else if (s instanceof AltResult) { + ex = ((AltResult)s).ex; + u = null; + } + else { + @SuppressWarnings("unchecked") U us = (U) s; + u = us; + } + V v = null; + if (ex == null) { + try { + if (e != null) + execAsync(e, new AsyncCombine<T,U,V>(t, u, fn, dst)); + else + v = fn.apply(t, u); + } catch (Throwable rex) { + ex = rex; + } + } + if (e == null || ex != null) + dst.internalComplete(v, ex); + } + helpPostComplete(); + other.helpPostComplete(); + return dst; + } + + private <U> CompletableFuture<Void> doThenAcceptBoth + (CompletableFuture<? extends U> other, + BiConsumer<? super T,? super U> fn, + Executor e) { + if (other == null || fn == null) throw new NullPointerException(); + CompletableFuture<Void> dst = new CompletableFuture<Void>(); + ThenAcceptBoth<T,U> d = null; + Object r, s = null; + if ((r = result) == null || (s = other.result) == null) { + d = new ThenAcceptBoth<T,U>(this, other, fn, dst, e); + CompletionNode q = null, p = new CompletionNode(d); + while ((r == null && (r = result) == null) || + (s == null && (s = other.result) == null)) { + if (q != null) { + if (s != null || + UNSAFE.compareAndSwapObject + (other, COMPLETIONS, q.next = other.completions, q)) + break; + } + else if (r != null || + UNSAFE.compareAndSwapObject + (this, COMPLETIONS, p.next = completions, p)) { + if (s != null) + break; + q = new CompletionNode(d); + } + } + } + if (r != null && s != null && (d == null || d.compareAndSet(0, 1))) { + T t; U u; Throwable ex; + if (r instanceof AltResult) { + ex = ((AltResult)r).ex; + t = null; + } + else { + ex = null; + @SuppressWarnings("unchecked") T tr = (T) r; + t = tr; + } + if (ex != null) + u = null; + else if (s instanceof AltResult) { + ex = ((AltResult)s).ex; + u = null; + } + else { + @SuppressWarnings("unchecked") U us = (U) s; + u = us; + } + if (ex == null) { + try { + if (e != null) + execAsync(e, new AsyncAcceptBoth<T,U>(t, u, fn, dst)); + else + fn.accept(t, u); + } catch (Throwable rex) { + ex = rex; + } + } + if (e == null || ex != null) + dst.internalComplete(null, ex); + } + helpPostComplete(); + other.helpPostComplete(); + return dst; + } + + private CompletableFuture<Void> doRunAfterBoth(CompletableFuture<?> other, + Runnable action, + Executor e) { + if (other == null || action == null) throw new NullPointerException(); + CompletableFuture<Void> dst = new CompletableFuture<Void>(); + RunAfterBoth d = null; + Object r, s = null; + if ((r = result) == null || (s = other.result) == null) { + d = new RunAfterBoth(this, other, action, dst, e); + CompletionNode q = null, p = new CompletionNode(d); + while ((r == null && (r = result) == null) || + (s == null && (s = other.result) == null)) { + if (q != null) { + if (s != null || + UNSAFE.compareAndSwapObject + (other, COMPLETIONS, q.next = other.completions, q)) + break; + } + else if (r != null || + UNSAFE.compareAndSwapObject + (this, COMPLETIONS, p.next = completions, p)) { + if (s != null) + break; + q = new CompletionNode(d); + } + } + } + if (r != null && s != null && (d == null || d.compareAndSet(0, 1))) { + Throwable ex; + if (r instanceof AltResult) + ex = ((AltResult)r).ex; + else + ex = null; + if (ex == null && (s instanceof AltResult)) + ex = ((AltResult)s).ex; + if (ex == null) { + try { + if (e != null) + execAsync(e, new AsyncRun(action, dst)); + else + action.run(); + } catch (Throwable rex) { + ex = rex; + } + } + if (e == null || ex != null) + dst.internalComplete(null, ex); + } + helpPostComplete(); + other.helpPostComplete(); + return dst; + } + + private <U> CompletableFuture<U> doApplyToEither + (CompletableFuture<? extends T> other, + Function<? super T, U> fn, + Executor e) { + if (other == null || fn == null) throw new NullPointerException(); + CompletableFuture<U> dst = new CompletableFuture<U>(); + ApplyToEither<T,U> d = null; + Object r; + if ((r = result) == null && (r = other.result) == null) { + d = new ApplyToEither<T,U>(this, other, fn, dst, e); + CompletionNode q = null, p = new CompletionNode(d); + while ((r = result) == null && (r = other.result) == null) { + if (q != null) { + if (UNSAFE.compareAndSwapObject + (other, COMPLETIONS, q.next = other.completions, q)) + break; + } + else if (UNSAFE.compareAndSwapObject + (this, COMPLETIONS, p.next = completions, p)) + q = new CompletionNode(d); + } + } + if (r != null && (d == null || d.compareAndSet(0, 1))) { + T t; Throwable ex; + if (r instanceof AltResult) { + ex = ((AltResult)r).ex; + t = null; + } + else { + ex = null; + @SuppressWarnings("unchecked") T tr = (T) r; + t = tr; + } + U u = null; + if (ex == null) { + try { + if (e != null) + execAsync(e, new AsyncApply<T,U>(t, fn, dst)); + else + u = fn.apply(t); + } catch (Throwable rex) { + ex = rex; + } + } + if (e == null || ex != null) + dst.internalComplete(u, ex); + } + helpPostComplete(); + other.helpPostComplete(); + return dst; + } + + private CompletableFuture<Void> doAcceptEither + (CompletableFuture<? extends T> other, + Consumer<? super T> fn, + Executor e) { + if (other == null || fn == null) throw new NullPointerException(); + CompletableFuture<Void> dst = new CompletableFuture<Void>(); + AcceptEither<T> d = null; + Object r; + if ((r = result) == null && (r = other.result) == null) { + d = new AcceptEither<T>(this, other, fn, dst, e); + CompletionNode q = null, p = new CompletionNode(d); + while ((r = result) == null && (r = other.result) == null) { + if (q != null) { + if (UNSAFE.compareAndSwapObject + (other, COMPLETIONS, q.next = other.completions, q)) + break; + } + else if (UNSAFE.compareAndSwapObject + (this, COMPLETIONS, p.next = completions, p)) + q = new CompletionNode(d); + } + } + if (r != null && (d == null || d.compareAndSet(0, 1))) { + T t; Throwable ex; + if (r instanceof AltResult) { + ex = ((AltResult)r).ex; + t = null; + } + else { + ex = null; + @SuppressWarnings("unchecked") T tr = (T) r; + t = tr; + } + if (ex == null) { + try { + if (e != null) + execAsync(e, new AsyncAccept<T>(t, fn, dst)); + else + fn.accept(t); + } catch (Throwable rex) { + ex = rex; + } + } + if (e == null || ex != null) + dst.internalComplete(null, ex); + } + helpPostComplete(); + other.helpPostComplete(); + return dst; + } + + private CompletableFuture<Void> doRunAfterEither + (CompletableFuture<?> other, + Runnable action, + Executor e) { + if (other == null || action == null) throw new NullPointerException(); + CompletableFuture<Void> dst = new CompletableFuture<Void>(); + RunAfterEither d = null; + Object r; + if ((r = result) == null && (r = other.result) == null) { + d = new RunAfterEither(this, other, action, dst, e); + CompletionNode q = null, p = new CompletionNode(d); + while ((r = result) == null && (r = other.result) == null) { + if (q != null) { + if (UNSAFE.compareAndSwapObject + (other, COMPLETIONS, q.next = other.completions, q)) + break; + } + else if (UNSAFE.compareAndSwapObject + (this, COMPLETIONS, p.next = completions, p)) + q = new CompletionNode(d); + } + } + if (r != null && (d == null || d.compareAndSet(0, 1))) { + Throwable ex; + if (r instanceof AltResult) + ex = ((AltResult)r).ex; + else + ex = null; + if (ex == null) { + try { + if (e != null) + execAsync(e, new AsyncRun(action, dst)); + else + action.run(); + } catch (Throwable rex) { + ex = rex; + } + } + if (e == null || ex != null) + dst.internalComplete(null, ex); + } + helpPostComplete(); + other.helpPostComplete(); + return dst; + } + + private <U> CompletableFuture<U> doThenCompose + (Function<? super T, ? extends CompletionStage<U>> fn, + Executor e) { + if (fn == null) throw new NullPointerException(); + CompletableFuture<U> dst = null; + ThenCompose<T,U> d = null; + Object r; + if ((r = result) == null) { + dst = new CompletableFuture<U>(); + CompletionNode p = new CompletionNode + (d = new ThenCompose<T,U>(this, fn, dst, e)); + while ((r = result) == null) { + if (UNSAFE.compareAndSwapObject + (this, COMPLETIONS, p.next = completions, p)) + break; + } + } + if (r != null && (d == null || d.compareAndSet(0, 1))) { + T t; Throwable ex; + if (r instanceof AltResult) { + ex = ((AltResult)r).ex; + t = null; + } + else { + ex = null; + @SuppressWarnings("unchecked") T tr = (T) r; + t = tr; + } + if (ex == null) { + if (e != null) { + if (dst == null) + dst = new CompletableFuture<U>(); + execAsync(e, new AsyncCompose<T,U>(t, fn, dst)); + } + else { + try { + CompletionStage<U> cs = fn.apply(t); + if (cs == null || + (dst = cs.toCompletableFuture()) == null) + ex = new NullPointerException(); + } catch (Throwable rex) { + ex = rex; + } + } + } + if (dst == null) + dst = new CompletableFuture<U>(); + if (e == null || ex != null) + dst.internalComplete(null, ex); + } + helpPostComplete(); + dst.helpPostComplete(); + return dst; + } + + private CompletableFuture<T> doWhenComplete + (BiConsumer<? super T, ? super Throwable> fn, + Executor e) { + if (fn == null) throw new NullPointerException(); + CompletableFuture<T> dst = new CompletableFuture<T>(); + WhenCompleteCompletion<T> d = null; + Object r; + if ((r = result) == null) { + CompletionNode p = + new CompletionNode(d = new WhenCompleteCompletion<T> + (this, fn, dst, e)); + while ((r = result) == null) { + if (UNSAFE.compareAndSwapObject(this, COMPLETIONS, + p.next = completions, p)) + break; + } + } + if (r != null && (d == null || d.compareAndSet(0, 1))) { + T t; Throwable ex; + if (r instanceof AltResult) { + ex = ((AltResult)r).ex; + t = null; + } + else { + ex = null; + @SuppressWarnings("unchecked") T tr = (T) r; + t = tr; + } + Throwable dx = null; + try { + if (e != null) + execAsync(e, new AsyncWhenComplete<T>(t, ex, fn, dst)); + else + fn.accept(t, ex); + } catch (Throwable rex) { + dx = rex; + } + if (e == null || dx != null) + dst.internalComplete(t, ex != null ? ex : dx); + } + helpPostComplete(); + return dst; + } + + private <U> CompletableFuture<U> doHandle + (BiFunction<? super T, Throwable, ? extends U> fn, + Executor e) { + if (fn == null) throw new NullPointerException(); + CompletableFuture<U> dst = new CompletableFuture<U>(); + HandleCompletion<T,U> d = null; + Object r; + if ((r = result) == null) { + CompletionNode p = + new CompletionNode(d = new HandleCompletion<T,U> + (this, fn, dst, e)); + while ((r = result) == null) { + if (UNSAFE.compareAndSwapObject(this, COMPLETIONS, + p.next = completions, p)) + break; + } + } + if (r != null && (d == null || d.compareAndSet(0, 1))) { + T t; Throwable ex; + if (r instanceof AltResult) { + ex = ((AltResult)r).ex; + t = null; + } + else { + ex = null; + @SuppressWarnings("unchecked") T tr = (T) r; + t = tr; + } + U u = null; + Throwable dx = null; + try { + if (e != null) + execAsync(e, new AsyncCombine<T,Throwable,U>(t, ex, fn, dst)); + else { + u = fn.apply(t, ex); + dx = null; + } + } catch (Throwable rex) { + dx = rex; + u = null; + } + if (e == null || dx != null) + dst.internalComplete(u, dx); + } + helpPostComplete(); + return dst; + } + + // public methods /** @@ -1416,13 +2118,13 @@ * * @param supplier a function returning the value to be used * to complete the returned CompletableFuture + * @param <U> the function's return type * @return the new CompletableFuture */ public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) { if (supplier == null) throw new NullPointerException(); CompletableFuture<U> f = new CompletableFuture<U>(); - ForkJoinPool.commonPool(). - execute((ForkJoinTask<?>)new AsyncSupply<U>(supplier, f)); + execAsync(ForkJoinPool.commonPool(), new AsyncSupply<U>(supplier, f)); return f; } @@ -1434,6 +2136,7 @@ * @param supplier a function returning the value to be used * to complete the returned CompletableFuture * @param executor the executor to use for asynchronous execution + * @param <U> the function's return type * @return the new CompletableFuture */ public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, @@ -1441,7 +2144,7 @@ if (executor == null || supplier == null) throw new NullPointerException(); CompletableFuture<U> f = new CompletableFuture<U>(); - executor.execute(new AsyncSupply<U>(supplier, f)); + execAsync(executor, new AsyncSupply<U>(supplier, f)); return f; } @@ -1457,8 +2160,7 @@ public static CompletableFuture<Void> runAsync(Runnable runnable) { if (runnable == null) throw new NullPointerException(); CompletableFuture<Void> f = new CompletableFuture<Void>(); - ForkJoinPool.commonPool(). - execute((ForkJoinTask<?>)new AsyncRun(runnable, f)); + execAsync(ForkJoinPool.commonPool(), new AsyncRun(runnable, f)); return f; } @@ -1477,7 +2179,7 @@ if (executor == null || runnable == null) throw new NullPointerException(); CompletableFuture<Void> f = new CompletableFuture<Void>(); - executor.execute(new AsyncRun(runnable, f)); + execAsync(executor, new AsyncRun(runnable, f)); return f; } @@ -1486,6 +2188,7 @@ * the given value. * * @param value the value + * @param <U> the type of the value * @return the completed CompletableFuture */ public static <U> CompletableFuture<U> completedFuture(U value) { @@ -1657,60 +2360,18 @@ return triggered; } - /** - * Returns a new CompletableFuture that is completed - * when this CompletableFuture completes, with the result of the - * given function of this CompletableFuture's result. - * - * <p>If this CompletableFuture completes exceptionally, or the - * supplied function throws an exception, then the returned - * CompletableFuture completes exceptionally with a - * CompletionException holding the exception as its cause. - * - * @param fn the function to use to compute the value of - * the returned CompletableFuture - * @return the new CompletableFuture - */ - public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn) { + // CompletionStage methods + + public <U> CompletableFuture<U> thenApply + (Function<? super T,? extends U> fn) { return doThenApply(fn, null); } - /** - * Returns a new CompletableFuture that is asynchronously completed - * when this CompletableFuture completes, with the result of the - * given function of this CompletableFuture's result from a - * task running in the {@link ForkJoinPool#commonPool()}. - * - * <p>If this CompletableFuture completes exceptionally, or the - * supplied function throws an exception, then the returned - * CompletableFuture completes exceptionally with a - * CompletionException holding the exception as its cause. - * - * @param fn the function to use to compute the value of - * the returned CompletableFuture - * @return the new CompletableFuture - */ public <U> CompletableFuture<U> thenApplyAsync (Function<? super T,? extends U> fn) { return doThenApply(fn, ForkJoinPool.commonPool()); } - /** - * Returns a new CompletableFuture that is asynchronously completed - * when this CompletableFuture completes, with the result of the - * given function of this CompletableFuture's result from a - * task running in the given executor. - * - * <p>If this CompletableFuture completes exceptionally, or the - * supplied function throws an exception, then the returned - * CompletableFuture completes exceptionally with a - * CompletionException holding the exception as its cause. - * - * @param fn the function to use to compute the value of - * the returned CompletableFuture - * @param executor the executor to use for asynchronous execution - * @return the new CompletableFuture - */ public <U> CompletableFuture<U> thenApplyAsync (Function<? super T,? extends U> fn, Executor executor) { @@ -1718,1149 +2379,228 @@ return doThenApply(fn, executor); } - private <U> CompletableFuture<U> doThenApply - (Function<? super T,? extends U> fn, - Executor e) { - if (fn == null) throw new NullPointerException(); - CompletableFuture<U> dst = new CompletableFuture<U>(); - ThenApply<T,U> d = null; - Object r; - if ((r = result) == null) { - CompletionNode p = new CompletionNode - (d = new ThenApply<T,U>(this, fn, dst, e)); - while ((r = result) == null) { - if (UNSAFE.compareAndSwapObject - (this, COMPLETIONS, p.next = completions, p)) - break; - } - } - if (r != null && (d == null || d.compareAndSet(0, 1))) { - T t; Throwable ex; - if (r instanceof AltResult) { - ex = ((AltResult)r).ex; - t = null; - } - else { - ex = null; - @SuppressWarnings("unchecked") T tr = (T) r; - t = tr; - } - U u = null; - if (ex == null) { - try { - if (e != null) - e.execute(new AsyncApply<T,U>(t, fn, dst)); - else - u = fn.apply(t); - } catch (Throwable rex) { - ex = rex; - } - } - if (e == null || ex != null) - dst.internalComplete(u, ex); - } - helpPostComplete(); - return dst; + public CompletableFuture<Void> thenAccept + (Consumer<? super T> action) { + return doThenAccept(action, null); } - /** - * Returns a new CompletableFuture that is completed - * when this CompletableFuture completes, after performing the given - * action with this CompletableFuture's result. - * - * <p>If this CompletableFuture completes exceptionally, or the - * supplied action throws an exception, then the returned - * CompletableFuture completes exceptionally with a - * CompletionException holding the exception as its cause. - * - * @param block the action to perform before completing the - * returned CompletableFuture - * @return the new CompletableFuture - */ - public CompletableFuture<Void> thenAccept(Consumer<? super T> block) { - return doThenAccept(block, null); + public CompletableFuture<Void> thenAcceptAsync + (Consumer<? super T> action) { + return doThenAccept(action, ForkJoinPool.commonPool()); } - /** - * Returns a new CompletableFuture that is asynchronously completed - * when this CompletableFuture completes, after performing the given - * action with this CompletableFuture's result from a task running - * in the {@link ForkJoinPool#commonPool()}. - * - * <p>If this CompletableFuture completes exceptionally, or the - * supplied action throws an exception, then the returned - * CompletableFuture completes exceptionally with a - * CompletionException holding the exception as its cause. - * - * @param block the action to perform before completing the - * returned CompletableFuture - * @return the new CompletableFuture - */ - public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> block) { - return doThenAccept(block, ForkJoinPool.commonPool()); - } - - /** - * Returns a new CompletableFuture that is asynchronously completed - * when this CompletableFuture completes, after performing the given - * action with this CompletableFuture's result from a task running - * in the given executor. - * - * <p>If this CompletableFuture completes exceptionally, or the - * supplied action throws an exception, then the returned - * CompletableFuture completes exceptionally with a - * CompletionException holding the exception as its cause. - * - * @param block the action to perform before completing the - * returned CompletableFuture - * @param executor the executor to use for asynchronous execution - * @return the new CompletableFuture - */ - public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> block, - Executor executor) { + public CompletableFuture<Void> thenAcceptAsync + (Consumer<? super T> action, + Executor executor) { if (executor == null) throw new NullPointerException(); - return doThenAccept(block, executor); + return doThenAccept(action, executor); } - private CompletableFuture<Void> doThenAccept(Consumer<? super T> fn, - Executor e) { - if (fn == null) throw new NullPointerException(); - CompletableFuture<Void> dst = new CompletableFuture<Void>(); - ThenAccept<T> d = null; - Object r; - if ((r = result) == null) { - CompletionNode p = new CompletionNode - (d = new ThenAccept<T>(this, fn, dst, e)); - while ((r = result) == null) { - if (UNSAFE.compareAndSwapObject - (this, COMPLETIONS, p.next = completions, p)) - break; - } - } - if (r != null && (d == null || d.compareAndSet(0, 1))) { - T t; Throwable ex; - if (r instanceof AltResult) { - ex = ((AltResult)r).ex; - t = null; - } - else { - ex = null; - @SuppressWarnings("unchecked") T tr = (T) r; - t = tr; - } - if (ex == null) { - try { - if (e != null) - e.execute(new AsyncAccept<T>(t, fn, dst)); - else - fn.accept(t); - } catch (Throwable rex) { - ex = rex; - } - } - if (e == null || ex != null) - dst.internalComplete(null, ex); - } - helpPostComplete(); - return dst; - } - - /** - * Returns a new CompletableFuture that is completed - * when this CompletableFuture completes, after performing the given - * action. - * - * <p>If this CompletableFuture completes exceptionally, or the - * supplied action throws an exception, then the returned - * CompletableFuture completes exceptionally with a - * CompletionException holding the exception as its cause. - * - * @param action the action to perform before completing the - * returned CompletableFuture - * @return the new CompletableFuture - */ - public CompletableFuture<Void> thenRun(Runnable action) { + public CompletableFuture<Void> thenRun + (Runnable action) { return doThenRun(action, null); } - /** - * Returns a new CompletableFuture that is asynchronously completed - * when this CompletableFuture completes, after performing the given - * action from a task running in the {@link ForkJoinPool#commonPool()}. - * - * <p>If this CompletableFuture completes exceptionally, or the - * supplied action throws an exception, then the returned - * CompletableFuture completes exceptionally with a - * CompletionException holding the exception as its cause. - * - * @param action the action to perform before completing the - * returned CompletableFuture - * @return the new CompletableFuture - */ - public CompletableFuture<Void> thenRunAsync(Runnable action) { + public CompletableFuture<Void> thenRunAsync + (Runnable action) { return doThenRun(action, ForkJoinPool.commonPool()); } - /** - * Returns a new CompletableFuture that is asynchronously completed - * when this CompletableFuture completes, after performing the given - * action from a task running in the given executor. - * - * <p>If this CompletableFuture completes exceptionally, or the - * supplied action throws an exception, then the returned - * CompletableFuture completes exceptionally with a - * CompletionException holding the exception as its cause. - * - * @param action the action to perform before completing the - * returned CompletableFuture - * @param executor the executor to use for asynchronous execution - * @return the new CompletableFuture - */ - public CompletableFuture<Void> thenRunAsync(Runnable action, - Executor executor) { + public CompletableFuture<Void> thenRunAsync + (Runnable action, + Executor executor) { if (executor == null) throw new NullPointerException(); return doThenRun(action, executor); } - private CompletableFuture<Void> doThenRun(Runnable action, - Executor e) { - if (action == null) throw new NullPointerException(); - CompletableFuture<Void> dst = new CompletableFuture<Void>(); - ThenRun d = null; - Object r; - if ((r = result) == null) { - CompletionNode p = new CompletionNode - (d = new ThenRun(this, action, dst, e)); - while ((r = result) == null) { - if (UNSAFE.compareAndSwapObject - (this, COMPLETIONS, p.next = completions, p)) - break; - } - } - if (r != null && (d == null || d.compareAndSet(0, 1))) { - Throwable ex; - if (r instanceof AltResult) - ex = ((AltResult)r).ex; - else - ex = null; - if (ex == null) { - try { - if (e != null) - e.execute(new AsyncRun(action, dst)); - else - action.run(); - } catch (Throwable rex) { - ex = rex; - } - } - if (e == null || ex != null) - dst.internalComplete(null, ex); - } - helpPostComplete(); - return dst; + public <U,V> CompletableFuture<V> thenCombine + (CompletionStage<? extends U> other, + BiFunction<? super T,? super U,? extends V> fn) { + return doThenCombine(other.toCompletableFuture(), fn, null); } - /** - * Returns a new CompletableFuture that is completed - * when both this and the other given CompletableFuture complete, - * with the result of the given function of the results of the two - * CompletableFutures. - * - * <p>If this and/or the other CompletableFuture complete - * exceptionally, or the supplied function throws an exception, - * then the returned CompletableFuture completes exceptionally - * with a CompletionException holding the exception as its cause. - * - * @param other the other CompletableFuture - * @param fn the function to use to compute the value of - * the returned CompletableFuture - * @return the new CompletableFuture - */ - public <U,V> CompletableFuture<V> thenCombine - (CompletableFuture<? extends U> other, + public <U,V> CompletableFuture<V> thenCombineAsync + (CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn) { - return doThenCombine(other, fn, null); + return doThenCombine(other.toCompletableFuture(), fn, + ForkJoinPool.commonPool()); } - /** - * Returns a new CompletableFuture that is asynchronously completed - * when both this and the other given CompletableFuture complete, - * with the result of the given function of the results of the two - * CompletableFutures from a task running in the - * {@link ForkJoinPool#commonPool()}. - * - * <p>If this and/or the other CompletableFuture complete - * exceptionally, or the supplied function throws an exception, - * then the returned CompletableFuture completes exceptionally - * with a CompletionException holding the exception as its cause. - * - * @param other the other CompletableFuture - * @param fn the function to use to compute the value of - * the returned CompletableFuture - * @return the new CompletableFuture - */ public <U,V> CompletableFuture<V> thenCombineAsync - (CompletableFuture<? extends U> other, - BiFunction<? super T,? super U,? extends V> fn) { - return doThenCombine(other, fn, ForkJoinPool.commonPool()); - } - - /** - * Returns a new CompletableFuture that is asynchronously completed - * when both this and the other given CompletableFuture complete, - * with the result of the given function of the results of the two - * CompletableFutures from a task running in the given executor. - * - * <p>If this and/or the other CompletableFuture complete - * exceptionally, or the supplied function throws an exception, - * then the returned CompletableFuture completes exceptionally - * with a CompletionException holding the exception as its cause. - * - * @param other the other CompletableFuture - * @param fn the function to use to compute the value of - * the returned CompletableFuture - * @param executor the executor to use for asynchronous execution - * @return the new CompletableFuture - */ - public <U,V> CompletableFuture<V> thenCombineAsync - (CompletableFuture<? extends U> other, + (CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn, Executor executor) { if (executor == null) throw new NullPointerException(); - return doThenCombine(other, fn, executor); + return doThenCombine(other.toCompletableFuture(), fn, executor); + } + + public <U> CompletableFuture<Void> thenAcceptBoth + (CompletionStage<? extends U> other, + BiConsumer<? super T, ? super U> action) { + return doThenAcceptBoth(other.toCompletableFuture(), action, null); } - private <U,V> CompletableFuture<V> doThenCombine - (CompletableFuture<? extends U> other, - BiFunction<? super T,? super U,? extends V> fn, - Executor e) { - if (other == null || fn == null) throw new NullPointerException(); - CompletableFuture<V> dst = new CompletableFuture<V>(); - ThenCombine<T,U,V> d = null; - Object r, s = null; - if ((r = result) == null || (s = other.result) == null) { - d = new ThenCombine<T,U,V>(this, other, fn, dst, e); - CompletionNode q = null, p = new CompletionNode(d); - while ((r == null && (r = result) == null) || - (s == null && (s = other.result) == null)) { - if (q != null) { - if (s != null || - UNSAFE.compareAndSwapObject - (other, COMPLETIONS, q.next = other.completions, q)) - break; - } - else if (r != null || - UNSAFE.compareAndSwapObject - (this, COMPLETIONS, p.next = completions, p)) { - if (s != null) - break; - q = new CompletionNode(d); - } - } - } - if (r != null && s != null && (d == null || d.compareAndSet(0, 1))) { - T t; U u; Throwable ex; - if (r instanceof AltResult) { - ex = ((AltResult)r).ex; - t = null; - } - else { - ex = null; - @SuppressWarnings("unchecked") T tr = (T) r; - t = tr; - } - if (ex != null) - u = null; - else if (s instanceof AltResult) { - ex = ((AltResult)s).ex; - u = null; - } - else { - @SuppressWarnings("unchecked") U us = (U) s; - u = us; - } - V v = null; - if (ex == null) { - try { - if (e != null) - e.execute(new AsyncCombine<T,U,V>(t, u, fn, dst)); - else - v = fn.apply(t, u); - } catch (Throwable rex) { - ex = rex; - } - } - if (e == null || ex != null) - dst.internalComplete(v, ex); - } - helpPostComplete(); - other.helpPostComplete(); - return dst; + public <U> CompletableFuture<Void> thenAcceptBothAsync + (CompletionStage<? extends U> other, + BiConsumer<? super T, ? super U> action) { + return doThenAcceptBoth(other.toCompletableFuture(), action, + ForkJoinPool.commonPool()); } - /** - * Returns a new CompletableFuture that is completed - * when both this and the other given CompletableFuture complete, - * after performing the given action with the results of the two - * CompletableFutures. - * - * <p>If this and/or the other CompletableFuture complete - * exceptionally, or the supplied action throws an exception, - * then the returned CompletableFuture completes exceptionally - * with a CompletionException holding the exception as its cause. - * - * @param other the other CompletableFuture - * @param block the action to perform before completing the - * returned CompletableFuture - * @return the new CompletableFuture - */ - public <U> CompletableFuture<Void> thenAcceptBoth - (CompletableFuture<? extends U> other, - BiConsumer<? super T, ? super U> block) { - return doThenAcceptBoth(other, block, null); + public <U> CompletableFuture<Void> thenAcceptBothAsync + (CompletionStage<? extends U> other, + BiConsumer<? super T, ? super U> action, + Executor executor) { + if (executor == null) throw new NullPointerException(); + return doThenAcceptBoth(other.toCompletableFuture(), action, executor); } - /** - * Returns a new CompletableFuture that is asynchronously completed - * when both this and the other given CompletableFuture complete, - * after performing the given action with the results of the two - * CompletableFutures from a task running in the {@link - * ForkJoinPool#commonPool()}. - * - * <p>If this and/or the other CompletableFuture complete - * exceptionally, or the supplied action throws an exception, - * then the returned CompletableFuture completes exceptionally - * with a CompletionException holding the exception as its cause. - * - * @param other the other CompletableFuture - * @param block the action to perform before completing the - * returned CompletableFuture - * @return the new CompletableFuture - */ - public <U> CompletableFuture<Void> thenAcceptBothAsync - (CompletableFuture<? extends U> other, - BiConsumer<? super T, ? super U> block) { - return doThenAcceptBoth(other, block, ForkJoinPool.commonPool()); - } - - /** - * Returns a new CompletableFuture that is asynchronously completed - * when both this and the other given CompletableFuture complete, - * after performing the given action with the results of the two - * CompletableFutures from a task running in the given executor. - * - * <p>If this and/or the other CompletableFuture complete - * exceptionally, or the supplied action throws an exception, - * then the returned CompletableFuture completes exceptionally - * with a CompletionException holding the exception as its cause. - * - * @param other the other CompletableFuture - * @param block the action to perform before completing the - * returned CompletableFuture - * @param executor the executor to use for asynchronous execution - * @return the new CompletableFuture - */ - public <U> CompletableFuture<Void> thenAcceptBothAsync - (CompletableFuture<? extends U> other, - BiConsumer<? super T, ? super U> block, - Executor executor) { - if (executor == null) throw new NullPointerException(); - return doThenAcceptBoth(other, block, executor); + public CompletableFuture<Void> runAfterBoth + (CompletionStage<?> other, + Runnable action) { + return doRunAfterBoth(other.toCompletableFuture(), action, null); } - private <U> CompletableFuture<Void> doThenAcceptBoth - (CompletableFuture<? extends U> other, - BiConsumer<? super T,? super U> fn, - Executor e) { - if (other == null || fn == null) throw new NullPointerException(); - CompletableFuture<Void> dst = new CompletableFuture<Void>(); - ThenAcceptBoth<T,U> d = null; - Object r, s = null; - if ((r = result) == null || (s = other.result) == null) { - d = new ThenAcceptBoth<T,U>(this, other, fn, dst, e); - CompletionNode q = null, p = new CompletionNode(d); - while ((r == null && (r = result) == null) || - (s == null && (s = other.result) == null)) { - if (q != null) { - if (s != null || - UNSAFE.compareAndSwapObject - (other, COMPLETIONS, q.next = other.completions, q)) - break; - } - else if (r != null || - UNSAFE.compareAndSwapObject - (this, COMPLETIONS, p.next = completions, p)) { - if (s != null) - break; - q = new CompletionNode(d); - } - } - } - if (r != null && s != null && (d == null || d.compareAndSet(0, 1))) { - T t; U u; Throwable ex; - if (r instanceof AltResult) { - ex = ((AltResult)r).ex; - t = null; - } - else { - ex = null; - @SuppressWarnings("unchecked") T tr = (T) r; - t = tr; - } - if (ex != null) - u = null; - else if (s instanceof AltResult) { - ex = ((AltResult)s).ex; - u = null; - } - else { - @SuppressWarnings("unchecked") U us = (U) s; - u = us; - } - if (ex == null) { - try { - if (e != null) - e.execute(new AsyncAcceptBoth<T,U>(t, u, fn, dst)); - else - fn.accept(t, u); - } catch (Throwable rex) { - ex = rex; - } - } - if (e == null || ex != null) - dst.internalComplete(null, ex); - } - helpPostComplete(); - other.helpPostComplete(); - return dst; + public CompletableFuture<Void> runAfterBothAsync + (CompletionStage<?> other, + Runnable action) { + return doRunAfterBoth(other.toCompletableFuture(), action, + ForkJoinPool.commonPool()); } - /** - * Returns a new CompletableFuture that is completed - * when both this and the other given CompletableFuture complete, - * after performing the given action. - * - * <p>If this and/or the other CompletableFuture complete - * exceptionally, or the supplied action throws an exception, - * then the returned CompletableFuture completes exceptionally - * with a CompletionException holding the exception as its cause. - * - * @param other the other CompletableFuture - * @param action the action to perform before completing the - * returned CompletableFuture - * @return the new CompletableFuture - */ - public CompletableFuture<Void> runAfterBoth(CompletableFuture<?> other, - Runnable action) { - return doRunAfterBoth(other, action, null); - } - - /** - * Returns a new CompletableFuture that is asynchronously completed - * when both this and the other given CompletableFuture complete, - * after performing the given action from a task running in the - * {@link ForkJoinPool#commonPool()}. - * - * <p>If this and/or the other CompletableFuture complete - * exceptionally, or the supplied action throws an exception, - * then the returned CompletableFuture completes exceptionally - * with a CompletionException holding the exception as its cause. - * - * @param other the other CompletableFuture - * @param action the action to perform before completing the - * returned CompletableFuture - * @return the new CompletableFuture - */ - public CompletableFuture<Void> runAfterBothAsync(CompletableFuture<?> other, - Runnable action) { - return doRunAfterBoth(other, action, ForkJoinPool.commonPool()); + public CompletableFuture<Void> runAfterBothAsync + (CompletionStage<?> other, + Runnable action, + Executor executor) { + if (executor == null) throw new NullPointerException(); + return doRunAfterBoth(other.toCompletableFuture(), action, executor); } - /** - * Returns a new CompletableFuture that is asynchronously completed - * when both this and the other given CompletableFuture complete, - * after performing the given action from a task running in the - * given executor. - * - * <p>If this and/or the other CompletableFuture complete - * exceptionally, or the supplied action throws an exception, - * then the returned CompletableFuture completes exceptionally - * with a CompletionException holding the exception as its cause. - * - * @param other the other CompletableFuture - * @param action the action to perform before completing the - * returned CompletableFuture - * @param executor the executor to use for asynchronous execution - * @return the new CompletableFuture - */ - public CompletableFuture<Void> runAfterBothAsync(CompletableFuture<?> other, - Runnable action, - Executor executor) { - if (executor == null) throw new NullPointerException(); - return doRunAfterBoth(other, action, executor); - } - private CompletableFuture<Void> doRunAfterBoth(CompletableFuture<?> other, - Runnable action, - Executor e) { - if (other == null || action == null) throw new NullPointerException(); - CompletableFuture<Void> dst = new CompletableFuture<Void>(); - RunAfterBoth d = null; - Object r, s = null; - if ((r = result) == null || (s = other.result) == null) { - d = new RunAfterBoth(this, other, action, dst, e); - CompletionNode q = null, p = new CompletionNode(d); - while ((r == null && (r = result) == null) || - (s == null && (s = other.result) == null)) { - if (q != null) { - if (s != null || - UNSAFE.compareAndSwapObject - (other, COMPLETIONS, q.next = other.completions, q)) - break; - } - else if (r != null || - UNSAFE.compareAndSwapObject - (this, COMPLETIONS, p.next = completions, p)) { - if (s != null) - break; - q = new CompletionNode(d); - } - } - } - if (r != null && s != null && (d == null || d.compareAndSet(0, 1))) { - Throwable ex; - if (r instanceof AltResult) - ex = ((AltResult)r).ex; - else - ex = null; - if (ex == null && (s instanceof AltResult)) - ex = ((AltResult)s).ex; - if (ex == null) { - try { - if (e != null) - e.execute(new AsyncRun(action, dst)); - else - action.run(); - } catch (Throwable rex) { - ex = rex; - } - } - if (e == null || ex != null) - dst.internalComplete(null, ex); - } - helpPostComplete(); - other.helpPostComplete(); - return dst; + public <U> CompletableFuture<U> applyToEither + (CompletionStage<? extends T> other, + Function<? super T, U> fn) { + return doApplyToEither(other.toCompletableFuture(), fn, null); } - /** - * Returns a new CompletableFuture that is completed - * when either this or the other given CompletableFuture completes, - * with the result of the given function of either this or the other - * CompletableFuture's result. - * - * <p>If this and/or the other CompletableFuture complete - * exceptionally, then the returned CompletableFuture may also do so, - * with a CompletionException holding one of these exceptions as its - * cause. No guarantees are made about which result or exception is - * used in the returned CompletableFuture. If the supplied function - * throws an exception, then the returned CompletableFuture completes - * exceptionally with a CompletionException holding the exception as - * its cause. - * - * @param other the other CompletableFuture - * @param fn the function to use to compute the value of - * the returned CompletableFuture - * @return the new CompletableFuture - */ - public <U> CompletableFuture<U> applyToEither - (CompletableFuture<? extends T> other, + public <U> CompletableFuture<U> applyToEitherAsync + (CompletionStage<? extends T> other, Function<? super T, U> fn) { - return doApplyToEither(other, fn, null); + return doApplyToEither(other.toCompletableFuture(), fn, + ForkJoinPool.commonPool()); } - /** - * Returns a new CompletableFuture that is asynchronously completed - * when either this or the other given CompletableFuture completes, - * with the result of the given function of either this or the other - * CompletableFuture's result from a task running in the - * {@link ForkJoinPool#commonPool()}. - * - * <p>If this and/or the other CompletableFuture complete - * exceptionally, then the returned CompletableFuture may also do so, - * with a CompletionException holding one of these exceptions as its - * cause. No guarantees are made about which result or exception is - * used in the returned CompletableFuture. If the supplied function - * throws an exception, then the returned CompletableFuture completes - * exceptionally with a CompletionException holding the exception as - * its cause. - * - * @param other the other CompletableFuture - * @param fn the function to use to compute the value of - * the returned CompletableFuture - * @return the new CompletableFuture - */ public <U> CompletableFuture<U> applyToEitherAsync - (CompletableFuture<? extends T> other, - Function<? super T, U> fn) { - return doApplyToEither(other, fn, ForkJoinPool.commonPool()); - } - - /** - * Returns a new CompletableFuture that is asynchronously completed - * when either this or the other given CompletableFuture completes, - * with the result of the given function of either this or the other - * CompletableFuture's result from a task running in the - * given executor. - * - * <p>If this and/or the other CompletableFuture complete - * exceptionally, then the returned CompletableFuture may also do so, - * with a CompletionException holding one of these exceptions as its - * cause. No guarantees are made about which result or exception is - * used in the returned CompletableFuture. If the supplied function - * throws an exception, then the returned CompletableFuture completes - * exceptionally with a CompletionException holding the exception as - * its cause. - * - * @param other the other CompletableFuture - * @param fn the function to use to compute the value of - * the returned CompletableFuture - * @param executor the executor to use for asynchronous execution - * @return the new CompletableFuture - */ - public <U> CompletableFuture<U> applyToEitherAsync - (CompletableFuture<? extends T> other, + (CompletionStage<? extends T> other, Function<? super T, U> fn, Executor executor) { if (executor == null) throw new NullPointerException(); - return doApplyToEither(other, fn, executor); + return doApplyToEither(other.toCompletableFuture(), fn, executor); } - private <U> CompletableFuture<U> doApplyToEither - (CompletableFuture<? extends T> other, - Function<? super T, U> fn, - Executor e) { - if (other == null || fn == null) throw new NullPointerException(); - CompletableFuture<U> dst = new CompletableFuture<U>(); - ApplyToEither<T,U> d = null; - Object r; - if ((r = result) == null && (r = other.result) == null) { - d = new ApplyToEither<T,U>(this, other, fn, dst, e); - CompletionNode q = null, p = new CompletionNode(d); - while ((r = result) == null && (r = other.result) == null) { - if (q != null) { - if (UNSAFE.compareAndSwapObject - (other, COMPLETIONS, q.next = other.completions, q)) - break; - } - else if (UNSAFE.compareAndSwapObject - (this, COMPLETIONS, p.next = completions, p)) - q = new CompletionNode(d); - } - } - if (r != null && (d == null || d.compareAndSet(0, 1))) { - T t; Throwable ex; - if (r instanceof AltResult) { - ex = ((AltResult)r).ex; - t = null; - } - else { - ex = null; - @SuppressWarnings("unchecked") T tr = (T) r; - t = tr; - } - U u = null; - if (ex == null) { - try { - if (e != null) - e.execute(new AsyncApply<T,U>(t, fn, dst)); - else - u = fn.apply(t); - } catch (Throwable rex) { - ex = rex; - } - } - if (e == null || ex != null) - dst.internalComplete(u, ex); - } - helpPostComplete(); - other.helpPostComplete(); - return dst; + public CompletableFuture<Void> acceptEither + (CompletionStage<? extends T> other, + Consumer<? super T> action) { + return doAcceptEither(other.toCompletableFuture(), action, null); } - /** - * Returns a new CompletableFuture that is completed - * when either this or the other given CompletableFuture completes, - * after performing the given action with the result of either this - * or the other CompletableFuture's result. - * - * <p>If this and/or the other CompletableFuture complete - * exceptionally, then the returned CompletableFuture may also do so, - * with a CompletionException holding one of these exceptions as its - * cause. No guarantees are made about which result or exception is - * used in the returned CompletableFuture. If the supplied action - * throws an exception, then the returned CompletableFuture completes - * exceptionally with a CompletionException holding the exception as - * its cause. - * - * @param other the other CompletableFuture - * @param block the action to perform before completing the - * returned CompletableFuture - * @return the new CompletableFuture - */ - public CompletableFuture<Void> acceptEither - (CompletableFuture<? extends T> other, - Consumer<? super T> block) { - return doAcceptEither(other, block, null); - } - - /** - * Returns a new CompletableFuture that is asynchronously completed - * when either this or the other given CompletableFuture completes, - * after performing the given action with the result of either this - * or the other CompletableFuture's result from a task running in - * the {@link ForkJoinPool#commonPool()}. - * - * <p>If this and/or the other CompletableFuture complete - * exceptionally, then the returned CompletableFuture may also do so, - * with a CompletionException holding one of these exceptions as its - * cause. No guarantees are made about which result or exception is - * used in the returned CompletableFuture. If the supplied action - * throws an exception, then the returned CompletableFuture completes - * exceptionally with a CompletionException holding the exception as - * its cause. - * - * @param other the other CompletableFuture - * @param block the action to perform before completing the - * returned CompletableFuture - * @return the new CompletableFuture - */ public CompletableFuture<Void> acceptEitherAsync - (CompletableFuture<? extends T> other, - Consumer<? super T> block) { - return doAcceptEither(other, block, ForkJoinPool.commonPool()); + (CompletionStage<? extends T> other, + Consumer<? super T> action) { + return doAcceptEither(other.toCompletableFuture(), action, + ForkJoinPool.commonPool()); } - /** - * Returns a new CompletableFuture that is asynchronously completed - * when either this or the other given CompletableFuture completes, - * after performing the given action with the result of either this - * or the other CompletableFuture's result from a task running in - * the given executor. - * - * <p>If this and/or the other CompletableFuture complete - * exceptionally, then the returned CompletableFuture may also do so, - * with a CompletionException holding one of these exceptions as its - * cause. No guarantees are made about which result or exception is - * used in the returned CompletableFuture. If the supplied action - * throws an exception, then the returned CompletableFuture completes - * exceptionally with a CompletionException holding the exception as - * its cause. - * - * @param other the other CompletableFuture - * @param block the action to perform before completing the - * returned CompletableFuture - * @param executor the executor to use for asynchronous execution - * @return the new CompletableFuture - */ public CompletableFuture<Void> acceptEitherAsync - (CompletableFuture<? extends T> other, - Consumer<? super T> block, + (CompletionStage<? extends T> other, + Consumer<? super T> action, Executor executor) { if (executor == null) throw new NullPointerException(); - return doAcceptEither(other, block, executor); + return doAcceptEither(other.toCompletableFuture(), action, executor); } - private CompletableFuture<Void> doAcceptEither - (CompletableFuture<? extends T> other, - Consumer<? super T> fn, - Executor e) { - if (other == null || fn == null) throw new NullPointerException(); - CompletableFuture<Void> dst = new CompletableFuture<Void>(); - AcceptEither<T> d = null; - Object r; - if ((r = result) == null && (r = other.result) == null) { - d = new AcceptEither<T>(this, other, fn, dst, e); - CompletionNode q = null, p = new CompletionNode(d); - while ((r = result) == null && (r = other.result) == null) { - if (q != null) { - if (UNSAFE.compareAndSwapObject - (other, COMPLETIONS, q.next = other.completions, q)) - break; - } - else if (UNSAFE.compareAndSwapObject - (this, COMPLETIONS, p.next = completions, p)) - q = new CompletionNode(d); - } - } - if (r != null && (d == null || d.compareAndSet(0, 1))) { - T t; Throwable ex; - if (r instanceof AltResult) { - ex = ((AltResult)r).ex; - t = null; - } - else { - ex = null; - @SuppressWarnings("unchecked") T tr = (T) r; - t = tr; - } - if (ex == null) { - try { - if (e != null) - e.execute(new AsyncAccept<T>(t, fn, dst)); - else - fn.accept(t); - } catch (Throwable rex) { - ex = rex; - } - } - if (e == null || ex != null) - dst.internalComplete(null, ex); - } - helpPostComplete(); - other.helpPostComplete(); - return dst; + public CompletableFuture<Void> runAfterEither(CompletionStage<?> other, + Runnable action) { + return doRunAfterEither(other.toCompletableFuture(), action, null); } - /** - * Returns a new CompletableFuture that is completed - * when either this or the other given CompletableFuture completes, - * after performing the given action. - * - * <p>If this and/or the other CompletableFuture complete - * exceptionally, then the returned CompletableFuture may also do so, - * with a CompletionException holding one of these exceptions as its - * cause. No guarantees are made about which result or exception is - * used in the returned CompletableFuture. If the supplied action - * throws an exception, then the returned CompletableFuture completes - * exceptionally with a CompletionException holding the exception as - * its cause. - * - * @param other the other CompletableFuture - * @param action the action to perform before completing the - * returned CompletableFuture - * @return the new CompletableFuture - */ - public CompletableFuture<Void> runAfterEither(CompletableFuture<?> other, - Runnable action) { - return doRunAfterEither(other, action, null); + public CompletableFuture<Void> runAfterEitherAsync + (CompletionStage<?> other, + Runnable action) { + return doRunAfterEither(other.toCompletableFuture(), action, + ForkJoinPool.commonPool()); } - /** - * Returns a new CompletableFuture that is asynchronously completed - * when either this or the other given CompletableFuture completes, - * after performing the given action from a task running in the - * {@link ForkJoinPool#commonPool()}. - * - * <p>If this and/or the other CompletableFuture complete - * exceptionally, then the returned CompletableFuture may also do so, - * with a CompletionException holding one of these exceptions as its - * cause. No guarantees are made about which result or exception is - * used in the returned CompletableFuture. If the supplied action - * throws an exception, then the returned CompletableFuture completes - * exceptionally with a CompletionException holding the exception as - * its cause. - * - * @param other the other CompletableFuture - * @param action the action to perform before completing the - * returned CompletableFuture - * @return the new CompletableFuture - */ public CompletableFuture<Void> runAfterEitherAsync - (CompletableFuture<?> other, - Runnable action) { - return doRunAfterEither(other, action, ForkJoinPool.commonPool()); - } - - /** - * Returns a new CompletableFuture that is asynchronously completed - * when either this or the other given CompletableFuture completes, - * after performing the given action from a task running in the - * given executor. - * - * <p>If this and/or the other CompletableFuture complete - * exceptionally, then the returned CompletableFuture may also do so, - * with a CompletionException holding one of these exceptions as its - * cause. No guarantees are made about which result or exception is - * used in the returned CompletableFuture. If the supplied action - * throws an exception, then the returned CompletableFuture completes - * exceptionally with a CompletionException holding the exception as - * its cause. - * - * @param other the other CompletableFuture - * @param action the action to perform before completing the - * returned CompletableFuture - * @param executor the executor to use for asynchronous execution - * @return the new CompletableFuture - */ - public CompletableFuture<Void> runAfterEitherAsync - (CompletableFuture<?> other, + (CompletionStage<?> other, Runnable action, Executor executor) { if (executor == null) throw new NullPointerException(); - return doRunAfterEither(other, action, executor); + return doRunAfterEither(other.toCompletableFuture(), action, executor); } - private CompletableFuture<Void> doRunAfterEither - (CompletableFuture<?> other, - Runnable action, - Executor e) { - if (other == null || action == null) throw new NullPointerException(); - CompletableFuture<Void> dst = new CompletableFuture<Void>(); - RunAfterEither d = null; - Object r; - if ((r = result) == null && (r = other.result) == null) { - d = new RunAfterEither(this, other, action, dst, e); - CompletionNode q = null, p = new CompletionNode(d); - while ((r = result) == null && (r = other.result) == null) { - if (q != null) { - if (UNSAFE.compareAndSwapObject - (other, COMPLETIONS, q.next = other.completions, q)) - break; - } - else if (UNSAFE.compareAndSwapObject - (this, COMPLETIONS, p.next = completions, p)) - q = new CompletionNode(d); - } - } - if (r != null && (d == null || d.compareAndSet(0, 1))) { - Throwable ex; - if (r instanceof AltResult) - ex = ((AltResult)r).ex; - else - ex = null; - if (ex == null) { - try { - if (e != null) - e.execute(new AsyncRun(action, dst)); - else - action.run(); - } catch (Throwable rex) { - ex = rex; - } - } - if (e == null || ex != null) - dst.internalComplete(null, ex); - } - helpPostComplete(); - other.helpPostComplete(); - return dst; - } - - /** - * Returns a CompletableFuture that upon completion, has the same - * value as produced by the given function of the result of this - * CompletableFuture. - * - * <p>If this CompletableFuture completes exceptionally, then the - * returned CompletableFuture also does so, with a - * CompletionException holding this exception as its cause. - * Similarly, if the computed CompletableFuture completes - * exceptionally, then so does the returned CompletableFuture. - * - * @param fn the function returning a new CompletableFuture - * @return the CompletableFuture - */ public <U> CompletableFuture<U> thenCompose - (Function<? super T, CompletableFuture<U>> fn) { + (Function<? super T, ? extends CompletionStage<U>> fn) { return doThenCompose(fn, null); } - /** - * Returns a CompletableFuture that upon completion, has the same - * value as that produced asynchronously using the {@link - * ForkJoinPool#commonPool()} by the given function of the result - * of this CompletableFuture. - * - * <p>If this CompletableFuture completes exceptionally, then the - * returned CompletableFuture also does so, with a - * CompletionException holding this exception as its cause. - * Similarly, if the computed CompletableFuture completes - * exceptionally, then so does the returned CompletableFuture. - * - * @param fn the function returning a new CompletableFuture - * @return the CompletableFuture - */ public <U> CompletableFuture<U> thenComposeAsync - (Function<? super T, CompletableFuture<U>> fn) { + (Function<? super T, ? extends CompletionStage<U>> fn) { return doThenCompose(fn, ForkJoinPool.commonPool()); } - /** - * Returns a CompletableFuture that upon completion, has the same - * value as that produced asynchronously using the given executor - * by the given function of this CompletableFuture. - * - * <p>If this CompletableFuture completes exceptionally, then the - * returned CompletableFuture also does so, with a - * CompletionException holding this exception as its cause. - * Similarly, if the computed CompletableFuture completes - * exceptionally, then so does the returned CompletableFuture. - * - * @param fn the function returning a new CompletableFuture - * @param executor the executor to use for asynchronous execution - * @return the CompletableFuture - */ public <U> CompletableFuture<U> thenComposeAsync - (Function<? super T, CompletableFuture<U>> fn, + (Function<? super T, ? extends CompletionStage<U>> fn, Executor executor) { if (executor == null) throw new NullPointerException(); return doThenCompose(fn, executor); } - private <U> CompletableFuture<U> doThenCompose - (Function<? super T, CompletableFuture<U>> fn, - Executor e) { - if (fn == null) throw new NullPointerException(); - CompletableFuture<U> dst = null; - ThenCompose<T,U> d = null; - Object r; - if ((r = result) == null) { - dst = new CompletableFuture<U>(); - CompletionNode p = new CompletionNode - (d = new ThenCompose<T,U>(this, fn, dst, e)); - while ((r = result) == null) { - if (UNSAFE.compareAndSwapObject - (this, COMPLETIONS, p.next = completions, p)) - break; - } - } - if (r != null && (d == null || d.compareAndSet(0, 1))) { - T t; Throwable ex; - if (r instanceof AltResult) { - ex = ((AltResult)r).ex; - t = null; - } - else { - ex = null; - @SuppressWarnings("unchecked") T tr = (T) r; - t = tr; - } - if (ex == null) { - if (e != null) { - if (dst == null) - dst = new CompletableFuture<U>(); - e.execute(new AsyncCompose<T,U>(t, fn, dst)); - } - else { - try { - if ((dst = fn.apply(t)) == null) - ex = new NullPointerException(); - } catch (Throwable rex) { - ex = rex; - } - } - } - if (dst == null) - dst = new CompletableFuture<U>(); - if (e == null || ex != null) - dst.internalComplete(null, ex); - } - helpPostComplete(); - dst.helpPostComplete(); - return dst; + public CompletableFuture<T> whenComplete + (BiConsumer<? super T, ? super Throwable> action) { + return doWhenComplete(action, null); + } + + public CompletableFuture<T> whenCompleteAsync + (BiConsumer<? super T, ? super Throwable> action) { + return doWhenComplete(action, ForkJoinPool.commonPool()); + } + + public CompletableFuture<T> whenCompleteAsync + (BiConsumer<? super T, ? super Throwable> action, + Executor executor) { + if (executor == null) throw new NullPointerException(); + return doWhenComplete(action, executor); + } + + public <U> CompletableFuture<U> handle + (BiFunction<? super T, Throwable, ? extends U> fn) { + return doHandle(fn, null); } + public <U> CompletableFuture<U> handleAsync + (BiFunction<? super T, Throwable, ? extends U> fn) { + return doHandle(fn, ForkJoinPool.commonPool()); + } + + public <U> CompletableFuture<U> handleAsync + (BiFunction<? super T, Throwable, ? extends U> fn, + Executor executor) { + if (executor == null) throw new NullPointerException(); + return doHandle(fn, executor); + } + + /** + * Returns this CompletableFuture + * + * @return this CompletableFuture + */ + public CompletableFuture<T> toCompletableFuture() { + return this; + } + + // not in interface CompletionStage + /** * Returns a new CompletableFuture that is completed when this * CompletableFuture completes, with the result of the given @@ -2868,6 +2608,8 @@ * completion when it completes exceptionally; otherwise, if this * CompletableFuture completes normally, then the returned * CompletableFuture also completes normally with the same value. + * Note: More flexible versions of this functionality are + * available using methods {@code whenComplete} and {@code handle}. * * @param fn the function to use to compute the value of the * returned CompletableFuture if this CompletableFuture completed @@ -2882,7 +2624,8 @@ Object r; if ((r = result) == null) { CompletionNode p = - new CompletionNode(d = new ExceptionCompletion<T>(this, fn, dst)); + new CompletionNode(d = new ExceptionCompletion<T> + (this, fn, dst)); while ((r = result) == null) { if (UNSAFE.compareAndSwapObject(this, COMPLETIONS, p.next = completions, p)) @@ -2910,59 +2653,6 @@ return dst; } - /** - * Returns a new CompletableFuture that is completed when this - * CompletableFuture completes, with the result of the given - * function of the result and exception of this CompletableFuture's - * completion. The given function is invoked with the result (or - * {@code null} if none) and the exception (or {@code null} if none) - * of this CompletableFuture when complete. - * - * @param fn the function to use to compute the value of the - * returned CompletableFuture - * @return the new CompletableFuture - */ - public <U> CompletableFuture<U> handle - (BiFunction<? super T, Throwable, ? extends U> fn) { - if (fn == null) throw new NullPointerException(); - CompletableFuture<U> dst = new CompletableFuture<U>(); - HandleCompletion<T,U> d = null; - Object r; - if ((r = result) == null) { - CompletionNode p = - new CompletionNode(d = new HandleCompletion<T,U>(this, fn, dst)); - while ((r = result) == null) { - if (UNSAFE.compareAndSwapObject(this, COMPLETIONS, - p.next = completions, p)) - break; - } - } - if (r != null && (d == null || d.compareAndSet(0, 1))) { - T t; Throwable ex; - if (r instanceof AltResult) { - ex = ((AltResult)r).ex; - t = null; - } - else { - ex = null; - @SuppressWarnings("unchecked") T tr = (T) r; - t = tr; - } - U u; Throwable dx; - try { - u = fn.apply(t, ex); - dx = null; - } catch (Throwable rex) { - dx = rex; - u = null; - } - dst.internalComplete(u, dx); - } - helpPostComplete(); - return dst; - } - - /* ------------- Arbitrary-arity constructions -------------- */ /* @@ -3215,6 +2905,21 @@ } /** + * Returns {@code true} if this CompletableFuture completed + * exceptionally, in any way. Possible causes include + * cancellation, explicit invocation of {@code + * completeExceptionally}, and abrupt termination of a + * CompletionStage action. + * + * @return {@code true} if this CompletableFuture completed + * exceptionally + */ + public boolean isCompletedExceptionally() { + Object r; + return ((r = result) instanceof AltResult) && r != NIL; + } + + /** * Forcibly sets or resets the value subsequently returned by * method {@link #get()} and related methods, whether or not * already completed. This method is designed for use only in
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/share/classes/java/util/concurrent/CompletionStage.java Fri Aug 02 09:38:13 2013 -0400 @@ -0,0 +1,760 @@ +/* + * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. + * + * This code is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License version 2 only, as + * published by the Free Software Foundation. Oracle designates this + * particular file as subject to the "Classpath" exception as provided + * by Oracle in the LICENSE file that accompanied this code. + * + * This code is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + * version 2 for more details (a copy is included in the LICENSE file that + * accompanied this code). + * + * You should have received a copy of the GNU General Public License version + * 2 along with this work; if not, write to the Free Software Foundation, + * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. + * + * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA + * or visit www.oracle.com if you need additional information or have any + * questions. + */ + +/* + * This file is available under and governed by the GNU General Public + * License version 2 only, as published by the Free Software Foundation. + * However, the following notice accompanied the original version of this + * file: + * + * Written by Doug Lea with assistance from members of JCP JSR-166 + * Expert Group and released to the public domain, as explained at + * http://creativecommons.org/publicdomain/zero/1.0/ + */ + +package java.util.concurrent; +import java.util.function.Supplier; +import java.util.function.Consumer; +import java.util.function.BiConsumer; +import java.util.function.Function; +import java.util.function.BiFunction; +import java.util.concurrent.Executor; + +/** + * A stage of a possibly asynchronous computation, that performs an + * action or computes a value when another CompletionStage completes. + * A stage completes upon termination of its computation, but this may + * in turn trigger other dependent stages. The functionality defined + * in this interface takes only a few basic forms, which expand out to + * a larger set of methods to capture a range of usage styles: <ul> + * + * <li>The computation performed by a stage may be expressed as a + * Function, Consumer, or Runnable (using methods with names including + * <em>apply</em>, <em>accept</em>, or <em>run</em>, respectively) + * depending on whether it requires arguments and/or produces results. + * For example, {@code stage.thenApply(x -> square(x)).thenAccept(x -> + * System.out.print(x)).thenRun(() -> System.out.println())}. An + * additional form (<em>compose</em>) applies functions of stages + * themselves, rather than their results. </li> + * + * <li> One stage's execution may be triggered by completion of a + * single stage, or both of two stages, or either of two stages. + * Dependencies on a single stage are arranged using methods with + * prefix <em>then</em>. Those triggered by completion of + * <em>both</em> of two stages may <em>combine</em> their results or + * effects, using correspondingly named methods. Those triggered by + * <em>either</em> of two stages make no guarantees about which of the + * results or effects are used for the dependent stage's + * computation.</li> + * + * <li> Dependencies among stages control the triggering of + * computations, but do not otherwise guarantee any particular + * ordering. Additionally, execution of a new stage's computations may + * be arranged in any of three ways: default execution, default + * asynchronous execution (using methods with suffix <em>async</em> + * that employ the stage's default asynchronous execution facility), + * or custom (via a supplied {@link Executor}). The execution + * properties of default and async modes are specified by + * CompletionStage implementations, not this interface. Methods with + * explicit Executor arguments may have arbitrary execution + * properties, and might not even support concurrent execution, but + * are arranged for processing in a way that accommodates asynchrony. + * + * <li> Two method forms support processing whether the triggering + * stage completed normally or exceptionally: Method {@link + * #whenComplete whenComplete} allows injection of an action + * regardless of outcome, otherwise preserving the outcome in its + * completion. Method {@link #handle handle} additionally allows the + * stage to compute a replacement result that may enable further + * processing by other dependent stages. In all other cases, if a + * stage's computation terminates abruptly with an (unchecked) + * exception or error, then all dependent stages requiring its + * completion complete exceptionally as well, with a {@link + * CompletionException} holding the exception as its cause. If a + * stage is dependent on <em>both</em> of two stages, and both + * complete exceptionally, then the CompletionException may correspond + * to either one of these exceptions. If a stage is dependent on + * <em>either</em> of two others, and only one of them completes + * exceptionally, no guarantees are made about whether the dependent + * stage completes normally or exceptionally. In the case of method + * {@code whenComplete}, when the supplied action itself encounters an + * exception, then the stage exceptionally completes with this + * exception if not already completed exceptionally.</li> + * + * </ul> + * + * <p>All methods adhere to the above triggering, execution, and + * exceptional completion specifications (which are not repeated in + * individual method specifications). Additionally, while arguments + * used to pass a completion result (that is, for parameters of type + * {@code T}) for methods accepting them may be null, passing a null + * value for any other parameter will result in a {@link + * NullPointerException} being thrown. + * + * <p>This interface does not define methods for initially creating, + * forcibly completing normally or exceptionally, probing completion + * status or results, or awaiting completion of a stage. + * Implementations of CompletionStage may provide means of achieving + * such effects, as appropriate. Method {@link #toCompletableFuture} + * enables interoperability among different implementations of this + * interface by providing a common conversion type. + * + * @author Doug Lea + * @since 1.8 + */ +public interface CompletionStage<T> { + + /** + * Returns a new CompletionStage that, when this stage completes + * normally, is executed with this stage's result as the argument + * to the supplied function. + * + * See the {@link CompletionStage} documentation for rules + * covering exceptional completion. + * + * @param fn the function to use to compute the value of + * the returned CompletionStage + * @param <U> the function's return type + * @return the new CompletionStage + */ + public <U> CompletionStage<U> thenApply(Function<? super T,? extends U> fn); + + /** + * Returns a new CompletionStage that, when this stage completes + * normally, is executed using this stage's default asynchronous + * execution facility, with this stage's result as the argument to + * the supplied function. + * + * See the {@link CompletionStage} documentation for rules + * covering exceptional completion. + * + * @param fn the function to use to compute the value of + * the returned CompletionStage + * @param <U> the function's return type + * @return the new CompletionStage + */ + public <U> CompletionStage<U> thenApplyAsync + (Function<? super T,? extends U> fn); + + /** + * Returns a new CompletionStage that, when this stage completes + * normally, is executed using the supplied Executor, with this + * stage's result as the argument to the supplied function. + * + * See the {@link CompletionStage} documentation for rules + * covering exceptional completion. + * + * @param fn the function to use to compute the value of + * the returned CompletionStage + * @param executor the executor to use for asynchronous execution + * @param <U> the function's return type + * @return the new CompletionStage + */ + public <U> CompletionStage<U> thenApplyAsync + (Function<? super T,? extends U> fn, + Executor executor); + + /** + * Returns a new CompletionStage that, when this stage completes + * normally, is executed with this stage's result as the argument + * to the supplied action. + * + * See the {@link CompletionStage} documentation for rules + * covering exceptional completion. + * + * @param action the action to perform before completing the + * returned CompletionStage + * @return the new CompletionStage + */ + public CompletionStage<Void> thenAccept(Consumer<? super T> action); + + /** + * Returns a new CompletionStage that, when this stage completes + * normally, is executed using this stage's default asynchronous + * execution facility, with this stage's result as the argument to + * the supplied action. + * + * See the {@link CompletionStage} documentation for rules + * covering exceptional completion. + * + * @param action the action to perform before completing the + * returned CompletionStage + * @return the new CompletionStage + */ + public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action); + + /** + * Returns a new CompletionStage that, when this stage completes + * normally, is executed using the supplied Executor, with this + * stage's result as the argument to the supplied action. + * + * See the {@link CompletionStage} documentation for rules + * covering exceptional completion. + * + * @param action the action to perform before completing the + * returned CompletionStage + * @param executor the executor to use for asynchronous execution + * @return the new CompletionStage + */ + public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action, + Executor executor); + /** + * Returns a new CompletionStage that, when this stage completes + * normally, executes the given action. + * + * See the {@link CompletionStage} documentation for rules + * covering exceptional completion. + * + * @param action the action to perform before completing the + * returned CompletionStage + * @return the new CompletionStage + */ + public CompletionStage<Void> thenRun(Runnable action); + + /** + * Returns a new CompletionStage that, when this stage completes + * normally, executes the given action using this stage's default + * asynchronous execution facility. + * + * See the {@link CompletionStage} documentation for rules + * covering exceptional completion. + * + * @param action the action to perform before completing the + * returned CompletionStage + * @return the new CompletionStage + */ + public CompletionStage<Void> thenRunAsync(Runnable action); + + /** + * Returns a new CompletionStage that, when this stage completes + * normally, executes the given action using the supplied Executor. + * + * See the {@link CompletionStage} documentation for rules + * covering exceptional completion. + * + * @param action the action to perform before completing the + * returned CompletionStage + * @param executor the executor to use for asynchronous execution + * @return the new CompletionStage + */ + public CompletionStage<Void> thenRunAsync(Runnable action, + Executor executor); + + /** + * Returns a new CompletionStage that, when this and the other + * given stage both complete normally, is executed with the two + * results as arguments to the supplied function. + * + * See the {@link CompletionStage} documentation for rules + * covering exceptional completion. + * + * @param other the other CompletionStage + * @param fn the function to use to compute the value of + * the returned CompletionStage + * @param <U> the type of the other CompletionStage's result + * @param <V> the function's return type + * @return the new CompletionStage + */ + public <U,V> CompletionStage<V> thenCombine + (CompletionStage<? extends U> other, + BiFunction<? super T,? super U,? extends V> fn); + + /** + * Returns a new CompletionStage that, when this and the other + * given stage complete normally, is executed using this stage's + * default asynchronous execution facility, with the two results + * as arguments to the supplied function. + * + * See the {@link CompletionStage} documentation for rules + * covering exceptional completion. + * + * @param other the other CompletionStage + * @param fn the function to use to compute the value of + * the returned CompletionStage + * @param <U> the type of the other CompletionStage's result + * @param <V> the function's return type + * @return the new CompletionStage + */ + public <U,V> CompletionStage<V> thenCombineAsync + (CompletionStage<? extends U> other, + BiFunction<? super T,? super U,? extends V> fn); + + /** + * Returns a new CompletionStage that, when this and the other + * given stage complete normally, is executed using the supplied + * executor, with the two results as arguments to the supplied + * function. + * + * See the {@link CompletionStage} documentation for rules + * covering exceptional completion. + * + * @param other the other CompletionStage + * @param fn the function to use to compute the value of + * the returned CompletionStage + * @param executor the executor to use for asynchronous execution + * @param <U> the type of the other CompletionStage's result + * @param <V> the function's return type + * @return the new CompletionStage + */ + public <U,V> CompletionStage<V> thenCombineAsync + (CompletionStage<? extends U> other, + BiFunction<? super T,? super U,? extends V> fn, + Executor executor); + + /** + * Returns a new CompletionStage that, when this and the other + * given stage both complete normally, is executed with the two + * results as arguments to the supplied action. + * + * See the {@link CompletionStage} documentation for rules + * covering exceptional completion. + * + * @param other the other CompletionStage + * @param action the action to perform before completing the + * returned CompletionStage + * @param <U> the type of the other CompletionStage's result + * @return the new CompletionStage + */ + public <U> CompletionStage<Void> thenAcceptBoth + (CompletionStage<? extends U> other, + BiConsumer<? super T, ? super U> action); + + /** + * Returns a new CompletionStage that, when this and the other + * given stage complete normally, is executed using this stage's + * default asynchronous execution facility, with the two results + * as arguments to the supplied action. + * + * @param other the other CompletionStage + * @param action the action to perform before completing the + * returned CompletionStage + * @param <U> the type of the other CompletionStage's result + * @return the new CompletionStage + */ + public <U> CompletionStage<Void> thenAcceptBothAsync + (CompletionStage<? extends U> other, + BiConsumer<? super T, ? super U> action); + + /** + * Returns a new CompletionStage that, when this and the other + * given stage complete normally, is executed using the supplied + * executor, with the two results as arguments to the supplied + * function. + * + * @param other the other CompletionStage + * @param action the action to perform before completing the + * returned CompletionStage + * @param executor the executor to use for asynchronous execution + * @param <U> the type of the other CompletionStage's result + * @return the new CompletionStage + */ + public <U> CompletionStage<Void> thenAcceptBothAsync + (CompletionStage<? extends U> other, + BiConsumer<? super T, ? super U> action, + Executor executor); + + /** + * Returns a new CompletionStage that, when this and the other + * given stage both complete normally, executes the given action. + * + * See the {@link CompletionStage} documentation for rules + * covering exceptional completion. + * + * @param other the other CompletionStage + * @param action the action to perform before completing the + * returned CompletionStage + * @return the new CompletionStage + */ + public CompletionStage<Void> runAfterBoth(CompletionStage<?> other, + Runnable action); + /** + * Returns a new CompletionStage that, when this and the other + * given stage complete normally, executes the given action using + * this stage's default asynchronous execution facility. + * + * See the {@link CompletionStage} documentation for rules + * covering exceptional completion. + * + * @param other the other CompletionStage + * @param action the action to perform before completing the + * returned CompletionStage + * @return the new CompletionStage + */ + public CompletionStage<Void> runAfterBothAsync(CompletionStage<?> other, + Runnable action); + + /** + * Returns a new CompletionStage that, when this and the other + * given stage complete normally, executes the given action using + * the supplied executor + * + * See the {@link CompletionStage} documentation for rules + * covering exceptional completion. + * + * @param other the other CompletionStage + * @param action the action to perform before completing the + * returned CompletionStage + * @param executor the executor to use for asynchronous execution + * @return the new CompletionStage + */ + public CompletionStage<Void> runAfterBothAsync(CompletionStage<?> other, + Runnable action, + Executor executor); + /** + * Returns a new CompletionStage that, when either this or the + * other given stage complete normally, is executed with the + * corresponding result as argument to the supplied function. + * + * See the {@link CompletionStage} documentation for rules + * covering exceptional completion. + * + * @param other the other CompletionStage + * @param fn the function to use to compute the value of + * the returned CompletionStage + * @param <U> the function's return type + * @return the new CompletionStage + */ + public <U> CompletionStage<U> applyToEither + (CompletionStage<? extends T> other, + Function<? super T, U> fn); + + /** + * Returns a new CompletionStage that, when either this or the + * other given stage complete normally, is executed using this + * stage's default asynchronous execution facility, with the + * corresponding result as argument to the supplied function. + * + * See the {@link CompletionStage} documentation for rules + * covering exceptional completion. + * + * @param other the other CompletionStage + * @param fn the function to use to compute the value of + * the returned CompletionStage + * @param <U> the function's return type + * @return the new CompletionStage + */ + public <U> CompletionStage<U> applyToEitherAsync + (CompletionStage<? extends T> other, + Function<? super T, U> fn); + + /** + * Returns a new CompletionStage that, when either this or the + * other given stage complete normally, is executed using the + * supplied executor, with the corresponding result as argument to + * the supplied function. + * + * See the {@link CompletionStage} documentation for rules + * covering exceptional completion. + * + * @param other the other CompletionStage + * @param fn the function to use to compute the value of + * the returned CompletionStage + * @param executor the executor to use for asynchronous execution + * @param <U> the function's return type + * @return the new CompletionStage + */ + public <U> CompletionStage<U> applyToEitherAsync + (CompletionStage<? extends T> other, + Function<? super T, U> fn, + Executor executor); + + /** + * Returns a new CompletionStage that, when either this or the + * other given stage complete normally, is executed with the + * corresponding result as argument to the supplied action. + * + * See the {@link CompletionStage} documentation for rules + * covering exceptional completion. + * + * @param other the other CompletionStage + * @param action the action to perform before completing the + * returned CompletionStage + * @return the new CompletionStage + */ + public CompletionStage<Void> acceptEither + (CompletionStage<? extends T> other, + Consumer<? super T> action); + + /** + * Returns a new CompletionStage that, when either this or the + * other given stage complete normally, is executed using this + * stage's default asynchronous execution facility, with the + * corresponding result as argument to the supplied action. + * + * See the {@link CompletionStage} documentation for rules + * covering exceptional completion. + * + * @param other the other CompletionStage + * @param action the action to perform before completing the + * returned CompletionStage + * @return the new CompletionStage + */ + public CompletionStage<Void> acceptEitherAsync + (CompletionStage<? extends T> other, + Consumer<? super T> action); + + /** + * Returns a new CompletionStage that, when either this or the + * other given stage complete normally, is executed using the + * supplied executor, with the corresponding result as argument to + * the supplied function. + * + * See the {@link CompletionStage} documentation for rules + * covering exceptional completion. + * + * @param other the other CompletionStage + * @param action the action to perform before completing the + * returned CompletionStage + * @param executor the executor to use for asynchronous execution + * @return the new CompletionStage + */ + public CompletionStage<Void> acceptEitherAsync + (CompletionStage<? extends T> other, + Consumer<? super T> action, + Executor executor); + + /** + * Returns a new CompletionStage that, when either this or the + * other given stage complete normally, executes the given action. + * + * See the {@link CompletionStage} documentation for rules + * covering exceptional completion. + * + * @param other the other CompletionStage + * @param action the action to perform before completing the + * returned CompletionStage + * @return the new CompletionStage + */ + public CompletionStage<Void> runAfterEither(CompletionStage<?> other, + Runnable action); + + /** + * Returns a new CompletionStage that, when either this or the + * other given stage complete normally, executes the given action + * using this stage's default asynchronous execution facility. + * + * See the {@link CompletionStage} documentation for rules + * covering exceptional completion. + * + * @param other the other CompletionStage + * @param action the action to perform before completing the + * returned CompletionStage + * @return the new CompletionStage + */ + public CompletionStage<Void> runAfterEitherAsync + (CompletionStage<?> other, + Runnable action); + + /** + * Returns a new CompletionStage that, when either this or the + * other given stage complete normally, executes the given action + * using supplied executor. + * + * See the {@link CompletionStage} documentation for rules + * covering exceptional completion. + * + * @param other the other CompletionStage + * @param action the action to perform before completing the + * returned CompletionStage + * @param executor the executor to use for asynchronous execution + * @return the new CompletionStage + */ + public CompletionStage<Void> runAfterEitherAsync + (CompletionStage<?> other, + Runnable action, + Executor executor); + + /** + * Returns a new CompletionStage that, when this stage completes + * normally, is executed with this stage as the argument + * to the supplied function. + * + * See the {@link CompletionStage} documentation for rules + * covering exceptional completion. + * + * @param fn the function returning a new CompletionStage + * @param <U> the type of the returned CompletionStage's result + * @return the CompletionStage + */ + public <U> CompletionStage<U> thenCompose + (Function<? super T, ? extends CompletionStage<U>> fn); + + /** + * Returns a new CompletionStage that, when this stage completes + * normally, is executed using this stage's default asynchronous + * execution facility, with this stage as the argument to the + * supplied function. + * + * See the {@link CompletionStage} documentation for rules + * covering exceptional completion. + * + * @param fn the function returning a new CompletionStage + * @param <U> the type of the returned CompletionStage's result + * @return the CompletionStage + */ + public <U> CompletionStage<U> thenComposeAsync + (Function<? super T, ? extends CompletionStage<U>> fn); + + /** + * Returns a new CompletionStage that, when this stage completes + * normally, is executed using the supplied Executor, with this + * stage's result as the argument to the supplied function. + * + * See the {@link CompletionStage} documentation for rules + * covering exceptional completion. + * + * @param fn the function returning a new CompletionStage + * @param executor the executor to use for asynchronous execution + * @param <U> the type of the returned CompletionStage's result + * @return the CompletionStage + */ + public <U> CompletionStage<U> thenComposeAsync + (Function<? super T, ? extends CompletionStage<U>> fn, + Executor executor); + + /** + * Returns a new CompletionStage that, when this stage completes + * exceptionally, is executed with this stage's exception as the + * argument to the supplied function. Otherwise, if this stage + * completes normally, then the returned stage also completes + * normally with the same value. + * + * @param fn the function to use to compute the value of the + * returned CompletionStage if this CompletionStage completed + * exceptionally + * @return the new CompletionStage + */ + public CompletionStage<T> exceptionally + (Function<Throwable, ? extends T> fn); + + /** + * Returns a new CompletionStage with the same result or exception + * as this stage, and when this stage completes, executes the + * given action with the result (or {@code null} if none) and the + * exception (or {@code null} if none) of this stage. + * + * @param action the action to perform + * @return the new CompletionStage + */ + public CompletionStage<T> whenComplete + (BiConsumer<? super T, ? super Throwable> action); + + /** + * Returns a new CompletionStage with the same result or exception + * as this stage, and when this stage completes, executes the + * given action executes the given action using this stage's + * default asynchronous execution facility, with the result (or + * {@code null} if none) and the exception (or {@code null} if + * none) of this stage as arguments. + * + * @param action the action to perform + * @return the new CompletionStage + */ + public CompletionStage<T> whenCompleteAsync + (BiConsumer<? super T, ? super Throwable> action); + + /** + * Returns a new CompletionStage with the same result or exception + * as this stage, and when this stage completes, executes using + * the supplied Executor, the given action with the result (or + * {@code null} if none) and the exception (or {@code null} if + * none) of this stage as arguments. + * + * @param action the action to perform + * @param executor the executor to use for asynchronous execution + * @return the new CompletionStage + */ + public CompletionStage<T> whenCompleteAsync + (BiConsumer<? super T, ? super Throwable> action, + Executor executor); + + /** + * Returns a new CompletionStage that, when this stage completes + * either normally or exceptionally, is executed with this stage's + * result and exception as arguments to the supplied function. + * The given function is invoked with the result (or {@code null} + * if none) and the exception (or {@code null} if none) of this + * stage when complete as arguments. + * + * @param fn the function to use to compute the value of the + * returned CompletionStage + * @param <U> the function's return type + * @return the new CompletionStage + */ + public <U> CompletionStage<U> handle + (BiFunction<? super T, Throwable, ? extends U> fn); + + /** + * Returns a new CompletionStage that, when this stage completes + * either normally or exceptionally, is executed using this stage's + * default asynchronous execution facility, with this stage's + * result and exception as arguments to the supplied function. + * The given function is invoked with the result (or {@code null} + * if none) and the exception (or {@code null} if none) of this + * stage when complete as arguments. + * + * @param fn the function to use to compute the value of the + * returned CompletionStage + * @param <U> the function's return type + * @return the new CompletionStage + */ + public <U> CompletionStage<U> handleAsync + (BiFunction<? super T, Throwable, ? extends U> fn); + + /** + * Returns a new CompletionStage that, when this stage completes + * either normally or exceptionally, is executed using the + * supplied executor, with this stage's result and exception as + * arguments to the supplied function. The given function is + * invoked with the result (or {@code null} if none) and the + * exception (or {@code null} if none) of this stage when complete + * as arguments. + * + * @param fn the function to use to compute the value of the + * returned CompletionStage + * @param executor the executor to use for asynchronous execution + * @param <U> the function's return type + * @return the new CompletionStage + */ + public <U> CompletionStage<U> handleAsync + (BiFunction<? super T, Throwable, ? extends U> fn, + Executor executor); + + /** + * Returns a {@link CompletableFuture} maintaining the same + * completion properties as this stage. If this stage is already a + * CompletableFuture, this method may return this stage itself. + * Otherwise, invocation of this method may be equivalent in + * effect to {@code thenApply(x -> x)}, but returning an instance + * of type {@code CompletableFuture}. A CompletionStage + * implementation that does not choose to interoperate with others + * may throw {@code UnsupportedOperationException}. + * + * @return the CompletableFuture + * @throws UnsupportedOperationException if this implementation + * does not interoperate with CompletableFuture + */ + public CompletableFuture<T> toCompletableFuture(); + +}
--- a/test/ProblemList.txt Fri Aug 02 08:37:25 2013 -0400 +++ b/test/ProblemList.txt Fri Aug 02 09:38:13 2013 -0400 @@ -370,12 +370,6 @@ # Filed 6772009 java/util/concurrent/locks/ReentrantLock/CancelledLockLoops.java generic-all -# 8020435 -java/util/concurrent/CompletableFuture/Basic.java generic-all - -# 8020291 -java/util/Random/RandomStreamTest.java generic-all - # 7041639, Solaris DSA keypair generation bug java/util/TimeZone/TimeZoneDatePermissionCheck.sh solaris-all
--- a/test/java/util/concurrent/CompletableFuture/Basic.java Fri Aug 02 08:37:25 2013 -0400 +++ b/test/java/util/concurrent/CompletableFuture/Basic.java Fri Aug 02 09:38:13 2013 -0400 @@ -34,6 +34,8 @@ /* * @test * @bug 8005696 + * @run main Basic + * @run main/othervm -Djava.util.concurrent.ForkJoinPool.common.parallelism=0 Basic * @summary Basic tests for CompletableFuture * @author Chris Hegarty */