diff options
Diffstat (limited to 'luni/src/main/java/java/util/concurrent/ForkJoinTask.java')
-rw-r--r-- | luni/src/main/java/java/util/concurrent/ForkJoinTask.java | 142 |
1 files changed, 84 insertions, 58 deletions
diff --git a/luni/src/main/java/java/util/concurrent/ForkJoinTask.java b/luni/src/main/java/java/util/concurrent/ForkJoinTask.java index 818788e..c6bc6de 100644 --- a/luni/src/main/java/java/util/concurrent/ForkJoinTask.java +++ b/luni/src/main/java/java/util/concurrent/ForkJoinTask.java @@ -32,8 +32,8 @@ import java.lang.reflect.Constructor; * * <p>A "main" {@code ForkJoinTask} begins execution when it is * explicitly submitted to a {@link ForkJoinPool}, or, if not already - * engaged in a ForkJoin computation, commenced in the {@link - * ForkJoinPool#commonPool()} via {@link #fork}, {@link #invoke}, or + * engaged in a ForkJoin computation, commenced in the {@code + * ForkJoinPool.commonPool()} via {@link #fork}, {@link #invoke}, or * related methods. Once started, it will usually in turn start other * subtasks. As indicated by the name of this class, many programs * using {@code ForkJoinTask} employ only methods {@link #fork} and @@ -74,10 +74,9 @@ import java.lang.reflect.Constructor; * but doing do requires three further considerations: (1) Completion * of few if any <em>other</em> tasks should be dependent on a task * that blocks on external synchronization or I/O. Event-style async - * tasks that are never joined (for example, those subclassing {@link - * CountedCompleter}) often fall into this category. (2) To minimize - * resource impact, tasks should be small; ideally performing only the - * (possibly) blocking action. (3) Unless the {@link + * tasks that are never joined often fall into this category. + * (2) To minimize resource impact, tasks should be small; ideally + * performing only the (possibly) blocking action. (3) Unless the {@link * ForkJoinPool.ManagedBlocker} API is used, or the number of possibly * blocked tasks is known to be less than the pool's {@link * ForkJoinPool#getParallelism} level, the pool cannot guarantee that @@ -120,13 +119,11 @@ import java.lang.reflect.Constructor; * <p>The ForkJoinTask class is not usually directly subclassed. * Instead, you subclass one of the abstract classes that support a * particular style of fork/join processing, typically {@link - * RecursiveAction} for most computations that do not return results, - * {@link RecursiveTask} for those that do, and {@link - * CountedCompleter} for those in which completed actions trigger - * other actions. Normally, a concrete ForkJoinTask subclass declares - * fields comprising its parameters, established in a constructor, and - * then defines a {@code compute} method that somehow uses the control - * methods supplied by this base class. + * RecursiveAction} for most computations that do not return results + * and {@link RecursiveTask} for those that do. Normally, a concrete + * ForkJoinTask subclass declares fields comprising its parameters, + * established in a constructor, and then defines a {@code compute} + * method that somehow uses the control methods supplied by this base class. * * <p>Method {@link #join} and its variants are appropriate for use * only when completion dependencies are acyclic; that is, the @@ -136,11 +133,11 @@ import java.lang.reflect.Constructor; * supports other methods and techniques (for example the use of * {@link Phaser}, {@link #helpQuiesce}, and {@link #complete}) that * may be of use in constructing custom subclasses for problems that - * are not statically structured as DAGs. To support such usages a + * are not statically structured as DAGs. To support such usages, a * ForkJoinTask may be atomically <em>tagged</em> with a {@code short} - * value using {@link #setForkJoinTaskTag} or {@link - * #compareAndSetForkJoinTaskTag} and checked using {@link - * #getForkJoinTaskTag}. The ForkJoinTask implementation does not use + * value using {@code setForkJoinTaskTag} or {@code + * compareAndSetForkJoinTaskTag} and checked using {@code + * getForkJoinTaskTag}. The ForkJoinTask implementation does not use * these {@code protected} methods or tags for any purpose, but they * may be of use in the construction of specialized subclasses. For * example, parallel graph traversals can use the supplied methods to @@ -178,7 +175,6 @@ import java.lang.reflect.Constructor; * execution. Serialization is not relied on during execution itself. * * @since 1.7 - * @hide * @author Doug Lea */ public abstract class ForkJoinTask<V> implements Future<V>, Serializable { @@ -286,25 +282,35 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable { */ private int externalAwaitDone() { int s; - ForkJoinPool.externalHelpJoin(this); - boolean interrupted = false; - while ((s = status) >= 0) { - if (U.compareAndSwapInt(this, STATUS, s, s | SIGNAL)) { - synchronized (this) { - if (status >= 0) { - try { - wait(); - } catch (InterruptedException ie) { - interrupted = true; + ForkJoinPool cp = ForkJoinPool.common; + if ((s = status) >= 0) { + if (cp != null) { + if (this instanceof CountedCompleter) + s = cp.externalHelpComplete((CountedCompleter<?>)this); + else if (cp.tryExternalUnpush(this)) + s = doExec(); + } + if (s >= 0 && (s = status) >= 0) { + boolean interrupted = false; + do { + if (U.compareAndSwapInt(this, STATUS, s, s | SIGNAL)) { + synchronized (this) { + if (status >= 0) { + try { + wait(); + } catch (InterruptedException ie) { + interrupted = true; + } + } + else + notifyAll(); } } - else - notifyAll(); - } + } while ((s = status) >= 0); + if (interrupted) + Thread.currentThread().interrupt(); } } - if (interrupted) - Thread.currentThread().interrupt(); return s; } @@ -313,9 +319,15 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable { */ private int externalInterruptibleAwaitDone() throws InterruptedException { int s; + ForkJoinPool cp = ForkJoinPool.common; if (Thread.interrupted()) throw new InterruptedException(); - ForkJoinPool.externalHelpJoin(this); + if ((s = status) >= 0 && cp != null) { + if (this instanceof CountedCompleter) + cp.externalHelpComplete((CountedCompleter<?>)this); + else if (cp.tryExternalUnpush(this)) + doExec(); + } while ((s = status) >= 0) { if (U.compareAndSwapInt(this, STATUS, s, s | SIGNAL)) { synchronized (this) { @@ -329,7 +341,6 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable { return s; } - /** * Implementation for join, get, quietlyJoin. Directly handles * only cases of already-completed, external wait, and @@ -601,14 +612,9 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable { /** * A version of "sneaky throw" to relay exceptions */ - static void rethrow(final Throwable ex) { - if (ex != null) { - if (ex instanceof Error) - throw (Error)ex; - if (ex instanceof RuntimeException) - throw (RuntimeException)ex; - throw uncheckedThrowable(ex, RuntimeException.class); - } + static void rethrow(Throwable ex) { + if (ex != null) + ForkJoinTask.<RuntimeException>uncheckedThrow(ex); } /** @@ -617,8 +623,8 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable { * unchecked exceptions */ @SuppressWarnings("unchecked") static <T extends Throwable> - T uncheckedThrowable(final Throwable t, final Class<T> c) { - return (T)t; // rely on vacuous cast + void uncheckedThrow(Throwable t) throws T { + throw (T)t; // rely on vacuous cast } /** @@ -635,8 +641,8 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable { /** * Arranges to asynchronously execute this task in the pool the - * current task is running in, if applicable, or using the {@link - * ForkJoinPool#commonPool()} if not {@link #inForkJoinPool}. While + * current task is running in, if applicable, or using the {@code + * ForkJoinPool.commonPool()} if not {@link #inForkJoinPool}. While * it is not necessarily enforced, it is a usage error to fork a * task more than once unless it has completed and been * reinitialized. Subsequent modifications to the state of this @@ -653,7 +659,7 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable { if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ((ForkJoinWorkerThread)t).workQueue.push(this); else - ForkJoinPool.commonPool.externalPush(this); + ForkJoinPool.common.externalPush(this); return this; } @@ -774,8 +780,6 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable { * @param tasks the collection of tasks * @return the tasks argument, to simplify usage * @throws NullPointerException if tasks or any element are null - - * @hide */ public static <T extends ForkJoinTask<?>> Collection<T> invokeAll(Collection<T> tasks) { if (!(tasks instanceof RandomAccess) || !(tasks instanceof List<?>)) { @@ -831,7 +835,7 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable { * <p>This method is designed to be invoked by <em>other</em> * tasks. To terminate the current task, you can just return or * throw an unchecked exception from its computation method, or - * invoke {@link #completeExceptionally}. + * invoke {@link #completeExceptionally(Throwable)}. * * @param mayInterruptIfRunning this value has no effect in the * default implementation because interrupts are not used to @@ -984,6 +988,7 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable { // Messy in part because we measure in nanosecs, but wait in millisecs int s; long ms; long ns = unit.toNanos(timeout); + ForkJoinPool cp; if ((s = status) >= 0 && ns > 0L) { long deadline = System.nanoTime() + ns; ForkJoinPool p = null; @@ -995,8 +1000,12 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable { w = wt.workQueue; p.helpJoinOnce(w, this); // no retries on failure } - else - ForkJoinPool.externalHelpJoin(this); + else if ((cp = ForkJoinPool.common) != null) { + if (this instanceof CountedCompleter) + cp.externalHelpComplete((CountedCompleter<?>)this); + else if (cp.tryExternalUnpush(this)) + doExec(); + } boolean canBlock = false; boolean interrupted = false; try { @@ -1004,7 +1013,7 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable { if (w != null && w.qlock < 0) cancelIgnoringExceptions(this); else if (!canBlock) { - if (p == null || p.tryCompensate()) + if (p == null || p.tryCompensate(p.ctl)) canBlock = true; } else { @@ -1080,7 +1089,7 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable { wt.pool.helpQuiescePool(wt.workQueue); } else - ForkJoinPool.externalHelpQuiescePool(); + ForkJoinPool.quiesceCommonPool(); } /** @@ -1145,7 +1154,7 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable { Thread t; return (((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ? ((ForkJoinWorkerThread)t).workQueue.tryUnpush(this) : - ForkJoinPool.tryExternalUnpush(this)); + ForkJoinPool.common.tryExternalUnpush(this)); } /** @@ -1316,7 +1325,7 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable { * * @param e the expected tag value * @param tag the new tag value - * @return true if successful; i.e., the current value was + * @return {@code true} if successful; i.e., the current value was * equal to e and is now tag. * @since 1.8 * @hide @@ -1370,6 +1379,24 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable { } /** + * Adaptor for Runnables in which failure forces worker exception + */ + static final class RunnableExecuteAction extends ForkJoinTask<Void> { + final Runnable runnable; + RunnableExecuteAction(Runnable runnable) { + if (runnable == null) throw new NullPointerException(); + this.runnable = runnable; + } + public final Void getRawResult() { return null; } + public final void setRawResult(Void v) { } + public final boolean exec() { runnable.run(); return true; } + void internalPropagateException(Throwable ex) { + rethrow(ex); // rethrow outside exec() catches. + } + private static final long serialVersionUID = 5232453952276885070L; + } + + /** * Adaptor for Callables */ static final class AdaptedCallable<T> extends ForkJoinTask<T> @@ -1480,5 +1507,4 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable { throw new Error(e); } } - } |