8319447: Improve performance of delayed task handling by DougLea · Pull Request #23702 · openjdk/jdk (original) (raw)
@@ -273,6 +273,8 @@ final boolean casNext(Aux c, Aux v) { // used only in cancellation
static final int ABNORMAL = 1 << 16;
static final int THROWN = 1 << 17;
static final int HAVE_EXCEPTION = DONE | ABNORMAL | THROWN;
static final int NUH_BIT = 24; // no external caller helping
static final int NO_USER_HELP = 1 << NUH_BIT;
static final int MARKER = 1 << 30; // utility marker
static final int SMASK = 0xffff; // short bits for tags
static final int UNCOMPENSATE = 1 << 16; // helpJoin sentinel
@@ -292,6 +294,12 @@ private int getAndBitwiseOrStatus(int v) {
private boolean casStatus(int c, int v) {
return U.compareAndSetInt(this, STATUS, c, v);
}
final int noUserHelp() { // nonvolatile read; return 0 or 1
return (U.getInt(this, STATUS) & NO_USER_HELP) >>> NUH_BIT;
}
final void setNoUserHelp() { // for use in constructors only
U.putInt(this, STATUS, NO_USER_HELP);
}
// Support for waiting and signalling
@@ -330,14 +338,9 @@ private void setDone() {
*/
final int trySetCancelled() {
int s;
for (;;) {
if ((s = status) < 0)
break;
if (casStatus(s, s | (DONE | ABNORMAL))) {
signalWaiters();
break;
}
}
if ((s = status) >= 0 &&
(s = getAndBitwiseOrStatus(DONE | ABNORMAL)) >= 0)
signalWaiters();
return s;
}
@@ -481,7 +484,7 @@ else if (casAux(a, next))
*/
private int awaitDone(boolean interruptible, long deadline) {
ForkJoinWorkerThread wt; ForkJoinPool p; ForkJoinPool.WorkQueue q;
Thread t; boolean internal; int s;
Thread t; boolean internal; int s, ss;
if (internal =
(t = Thread.currentThread()) instanceof ForkJoinWorkerThread) {
p = (wt = (ForkJoinWorkerThread)t).pool;
@@ -492,7 +495,7 @@ private int awaitDone(boolean interruptible, long deadline) {
return (((s = (p == null) ? 0 :
((this instanceof CountedCompleter) ?
p.helpComplete(this, q, internal) :
(this instanceof InterruptibleTask) && !internal ? status :
!internal && ((ss = status) & NO_USER_HELP) != 0 ? ss :
p.helpJoin(this, q, internal))) < 0)) ? s :
awaitDone(internal ? p : null, s, interruptible, deadline);
}
@@ -642,7 +645,7 @@ public final ForkJoinTask fork() {
p = wt.pool;
}
else
q = (p = ForkJoinPool.common).externalSubmissionQueue();
q = (p = ForkJoinPool.common).externalSubmissionQueue(false);
q.push(this, p, internal);
return this;
}
@@ -1160,7 +1163,7 @@ public static void helpQuiesce() {
*/
public void reinitialize() {
aux = null;
status = 0;
status &= NO_USER_HELP;
}
/**
@@ -1414,7 +1417,8 @@ public final boolean compareAndSetForkJoinTaskTag(short expect, short update) {
* @return the task
*/
public static ForkJoinTask<?> adapt(Runnable runnable) {
return new AdaptedRunnableAction(runnable);
return new AdaptedRunnableAction(
Objects.requireNonNull(runnable));
}
/**
@@ -1428,7 +1432,8 @@ public static ForkJoinTask<?> adapt(Runnable runnable) {
* @return the task
*/
public static ForkJoinTask adapt(Runnable runnable, T result) {
return new AdaptedRunnable(runnable, result);
return new AdaptedRunnable(
Objects.requireNonNull(runnable), result);
}
/**
@@ -1442,7 +1447,8 @@ public static ForkJoinTask adapt(Runnable runnable, T result) {
* @return the task
*/
public static ForkJoinTask adapt(Callable<? extends T> callable) {
return new AdaptedCallable(callable);
return new AdaptedCallable(
Objects.requireNonNull(callable));
}
/**
@@ -1460,7 +1466,8 @@ public static ForkJoinTask adapt(Callable<? extends T> callable) {
* @since 19
*/
public static ForkJoinTask adaptInterruptible(Callable<? extends T> callable) {
return new AdaptedInterruptibleCallable(callable);
return new AdaptedInterruptibleCallable(
Objects.requireNonNull(callable));
}
/**
@@ -1479,7 +1486,8 @@ public static ForkJoinTask adaptInterruptible(Callable<? extends T> calla
* @since 22
*/
public static ForkJoinTask adaptInterruptible(Runnable runnable, T result) {
return new AdaptedInterruptibleRunnable(runnable, result);
return new AdaptedInterruptibleRunnable(
Objects.requireNonNull(runnable), result);
}
/**
@@ -1497,7 +1505,8 @@ public static ForkJoinTask adaptInterruptible(Runnable runnable, T result
* @since 22
*/
public static ForkJoinTask<?> adaptInterruptible(Runnable runnable) {
return new AdaptedInterruptibleRunnable(runnable, null);
return new AdaptedInterruptibleRunnable(
Objects.requireNonNull(runnable), null);
}
// Serialization support
@@ -1556,7 +1565,6 @@ static final class AdaptedRunnable extends ForkJoinTask
@SuppressWarnings("serial") // Conditionally serializable
T result;
AdaptedRunnable(Runnable runnable, T result) {
Objects.requireNonNull(runnable);
this.runnable = runnable;
this.result = result; // OK to set this even before completion
}
@@ -1578,7 +1586,6 @@ static final class AdaptedRunnableAction extends ForkJoinTask
@SuppressWarnings("serial") // Conditionally serializable
final Runnable runnable;
AdaptedRunnableAction(Runnable runnable) {
Objects.requireNonNull(runnable);
this.runnable = runnable;
}
public final Void getRawResult() { return null; }
@@ -1601,7 +1608,6 @@ static final class AdaptedCallable extends ForkJoinTask
@SuppressWarnings("serial") // Conditionally serializable
T result;
AdaptedCallable(Callable<? extends T> callable) {
Objects.requireNonNull(callable);
this.callable = callable;
}
public final T getRawResult() { return result; }
@@ -1636,6 +1642,9 @@ public String toString() {
abstract static class InterruptibleTask extends ForkJoinTask
implements RunnableFuture {
transient volatile Thread runner;
InterruptibleTask() {
setNoUserHelp();
}
abstract T compute() throws Exception;
public final boolean exec() {
Thread.interrupted();
@@ -1655,20 +1664,29 @@ public final boolean exec() {
} finally {
runner = null;
}
return postExec();
}
boolean postExec() { // cleanup and return completion status to doExec
return true;
}
final boolean interruptIfRunning(boolean enabled) {
Thread t;
if ((t = runner) == null) // return false if not running
return false;
if (enabled) {
try {
t.interrupt();
} catch (Throwable ignore) {
}
}
return true;
}
public boolean cancel(boolean mayInterruptIfRunning) {
Thread t;
if (trySetCancelled() >= 0) {
if (mayInterruptIfRunning && (t = runner) != null) {
try {
t.interrupt();
} catch (Throwable ignore) {
}
}
return true;
}
return isCancelled();
int s;
if ((s = trySetCancelled()) < 0)
return ((s & (ABNORMAL | THROWN)) == ABNORMAL);
interruptIfRunning(mayInterruptIfRunning);
return true;
}
public final void run() { quietlyInvoke(); }
Object adaptee() { return null; } // for printing and diagnostics
@@ -1690,7 +1708,6 @@ static final class AdaptedInterruptibleCallable extends InterruptibleTask
@SuppressWarnings("serial") // Conditionally serializable
T result;
AdaptedInterruptibleCallable(Callable<? extends T> callable) {
Objects.requireNonNull(callable);
this.callable = callable;
}
public final T getRawResult() { return result; }
@@ -1709,7 +1726,6 @@ static final class AdaptedInterruptibleRunnable extends InterruptibleTask
@SuppressWarnings("serial") // Conditionally serializable
final T result;
AdaptedInterruptibleRunnable(Runnable runnable, T result) {
Objects.requireNonNull(runnable);
this.runnable = runnable;
this.result = result;
}
@@ -1727,7 +1743,6 @@ static final class RunnableExecuteAction extends InterruptibleTask {
@SuppressWarnings("serial") // Conditionally serializable
final Runnable runnable;
RunnableExecuteAction(Runnable runnable) {
Objects.requireNonNull(runnable);
this.runnable = runnable;
}
public final Void getRawResult() { return null; }
@@ -1793,9 +1808,11 @@ final T invokeAny(Collection<? extends Callable> tasks,
throw new NullPointerException();
InvokeAnyTask t = null; // list of submitted tasks
try {
for (Callable c : tasks)
for (Callable c : tasks) {
Objects.requireNonNull(c);
pool.execute((ForkJoinTask<?>)
(t = new InvokeAnyTask(c, this, t)));
}
return timed ? get(nanos, TimeUnit.NANOSECONDS) : get();
} finally {
for (; t != null; t = t.pred)
@@ -1822,7 +1839,6 @@ static final class InvokeAnyTask extends InterruptibleTask {
final InvokeAnyTask pred; // to traverse on cancellation
InvokeAnyTask(Callable callable, InvokeAnyRoot root,
InvokeAnyTask pred) {
Objects.requireNonNull(callable);
this.callable = callable;
this.root = root;
this.pred = pred;
@@ -1857,4 +1873,39 @@ final void onRootCompletion() {
public final void setRawResult(Void v) { }
final Object adaptee() { return callable; }
}
/**
* Adapter for Callable-based interruptible tasks with timeout actions.
*/
@SuppressWarnings("serial") // Conditionally serializable
static final class CallableWithTimeout extends InterruptibleTask {
Callable<? extends T> callable; // nulled out after use
ForkJoinTask<?> timeoutAction;
T result;
CallableWithTimeout(Callable<? extends T> callable,
ForkJoinTask<?> timeoutAction) {
this.callable = callable;
this.timeoutAction = timeoutAction;
}
public final T getRawResult() { return result; }
public final void setRawResult(T v) { result = v; }
final Object adaptee() { return callable; }
final T compute() throws Exception {
Callable<? extends T> c;
return ((c = callable) != null) ? c.call() : null;
}
final boolean postExec() { // cancel timeout action
ForkJoinTask<?> t;
callable = null;
if ((t = timeoutAction) != null) {
timeoutAction = null;
try {
t.cancel(false);
} catch (Error | RuntimeException ex) {
}
}
return true;
}
}
}