8319447: Improve performance of delayed task handling by DougLea · Pull Request #23702 · openjdk/jdk (original) (raw)

Expand Up

@@ -273,6 +273,8 @@ final boolean casNext(Aux c, Aux v) { // used only in cancellation

static final int ABNORMAL = 1 << 16;

static final int THROWN = 1 << 17;

static final int HAVE_EXCEPTION = DONE | ABNORMAL | THROWN;

static final int NUH_BIT = 24; // no external caller helping

static final int NO_USER_HELP = 1 << NUH_BIT;

static final int MARKER = 1 << 30; // utility marker

static final int SMASK = 0xffff; // short bits for tags

static final int UNCOMPENSATE = 1 << 16; // helpJoin sentinel

Expand All

@@ -292,6 +294,12 @@ private int getAndBitwiseOrStatus(int v) {

private boolean casStatus(int c, int v) {

return U.compareAndSetInt(this, STATUS, c, v);

}

final int noUserHelp() { // nonvolatile read; return 0 or 1

return (U.getInt(this, STATUS) & NO_USER_HELP) >>> NUH_BIT;

}

final void setNoUserHelp() { // for use in constructors only

U.putInt(this, STATUS, NO_USER_HELP);

}

// Support for waiting and signalling

Expand Down Expand Up

@@ -330,14 +338,9 @@ private void setDone() {

*/

final int trySetCancelled() {

int s;

for (;;) {

if ((s = status) < 0)

break;

if (casStatus(s, s | (DONE | ABNORMAL))) {

signalWaiters();

break;

}

}

if ((s = status) >= 0 &&

(s = getAndBitwiseOrStatus(DONE | ABNORMAL)) >= 0)

signalWaiters();

return s;

}

Expand Down Expand Up

@@ -481,7 +484,7 @@ else if (casAux(a, next))

*/

private int awaitDone(boolean interruptible, long deadline) {

ForkJoinWorkerThread wt; ForkJoinPool p; ForkJoinPool.WorkQueue q;

Thread t; boolean internal; int s;

Thread t; boolean internal; int s, ss;

if (internal =

(t = Thread.currentThread()) instanceof ForkJoinWorkerThread) {

p = (wt = (ForkJoinWorkerThread)t).pool;

Expand All

@@ -492,7 +495,7 @@ private int awaitDone(boolean interruptible, long deadline) {

return (((s = (p == null) ? 0 :

((this instanceof CountedCompleter) ?

p.helpComplete(this, q, internal) :

(this instanceof InterruptibleTask) && !internal ? status :

!internal && ((ss = status) & NO_USER_HELP) != 0 ? ss :

p.helpJoin(this, q, internal))) < 0)) ? s :

awaitDone(internal ? p : null, s, interruptible, deadline);

}

Expand Down Expand Up

@@ -642,7 +645,7 @@ public final ForkJoinTask fork() {

p = wt.pool;

}

else

q = (p = ForkJoinPool.common).externalSubmissionQueue();

q = (p = ForkJoinPool.common).externalSubmissionQueue(false);

q.push(this, p, internal);

return this;

}

Expand Down Expand Up

@@ -1160,7 +1163,7 @@ public static void helpQuiesce() {

*/

public void reinitialize() {

aux = null;

status = 0;

status &= NO_USER_HELP;

}

/**

Expand Down Expand Up

@@ -1414,7 +1417,8 @@ public final boolean compareAndSetForkJoinTaskTag(short expect, short update) {

* @return the task

*/

public static ForkJoinTask<?> adapt(Runnable runnable) {

return new AdaptedRunnableAction(runnable);

return new AdaptedRunnableAction(

Objects.requireNonNull(runnable));

}

/**

Expand All

@@ -1428,7 +1432,8 @@ public static ForkJoinTask<?> adapt(Runnable runnable) {

* @return the task

*/

public static ForkJoinTask adapt(Runnable runnable, T result) {

return new AdaptedRunnable(runnable, result);

return new AdaptedRunnable(

Objects.requireNonNull(runnable), result);

}

/**

Expand All

@@ -1442,7 +1447,8 @@ public static ForkJoinTask adapt(Runnable runnable, T result) {

* @return the task

*/

public static ForkJoinTask adapt(Callable<? extends T> callable) {

return new AdaptedCallable(callable);

return new AdaptedCallable(

Objects.requireNonNull(callable));

}

/**

Expand All

@@ -1460,7 +1466,8 @@ public static ForkJoinTask adapt(Callable<? extends T> callable) {

* @since 19

*/

public static ForkJoinTask adaptInterruptible(Callable<? extends T> callable) {

return new AdaptedInterruptibleCallable(callable);

return new AdaptedInterruptibleCallable(

Objects.requireNonNull(callable));

}

/**

Expand All

@@ -1479,7 +1486,8 @@ public static ForkJoinTask adaptInterruptible(Callable<? extends T> calla

* @since 22

*/

public static ForkJoinTask adaptInterruptible(Runnable runnable, T result) {

return new AdaptedInterruptibleRunnable(runnable, result);

return new AdaptedInterruptibleRunnable(

Objects.requireNonNull(runnable), result);

}

/**

Expand All

@@ -1497,7 +1505,8 @@ public static ForkJoinTask adaptInterruptible(Runnable runnable, T result

* @since 22

*/

public static ForkJoinTask<?> adaptInterruptible(Runnable runnable) {

return new AdaptedInterruptibleRunnable(runnable, null);

return new AdaptedInterruptibleRunnable(

Objects.requireNonNull(runnable), null);

}

// Serialization support

Expand Down Expand Up

@@ -1556,7 +1565,6 @@ static final class AdaptedRunnable extends ForkJoinTask

@SuppressWarnings("serial") // Conditionally serializable

T result;

AdaptedRunnable(Runnable runnable, T result) {

Objects.requireNonNull(runnable);

this.runnable = runnable;

this.result = result; // OK to set this even before completion

}

Expand All

@@ -1578,7 +1586,6 @@ static final class AdaptedRunnableAction extends ForkJoinTask

@SuppressWarnings("serial") // Conditionally serializable

final Runnable runnable;

AdaptedRunnableAction(Runnable runnable) {

Objects.requireNonNull(runnable);

this.runnable = runnable;

}

public final Void getRawResult() { return null; }

Expand All

@@ -1601,7 +1608,6 @@ static final class AdaptedCallable extends ForkJoinTask

@SuppressWarnings("serial") // Conditionally serializable

T result;

AdaptedCallable(Callable<? extends T> callable) {

Objects.requireNonNull(callable);

this.callable = callable;

}

public final T getRawResult() { return result; }

Expand Down Expand Up

@@ -1636,6 +1642,9 @@ public String toString() {

abstract static class InterruptibleTask extends ForkJoinTask

implements RunnableFuture {

transient volatile Thread runner;

InterruptibleTask() {

setNoUserHelp();

}

abstract T compute() throws Exception;

public final boolean exec() {

Thread.interrupted();

Expand All

@@ -1655,20 +1664,29 @@ public final boolean exec() {

} finally {

runner = null;

}

return postExec();

}

boolean postExec() { // cleanup and return completion status to doExec

return true;

}

final boolean interruptIfRunning(boolean enabled) {

Thread t;

if ((t = runner) == null) // return false if not running

return false;

if (enabled) {

try {

t.interrupt();

} catch (Throwable ignore) {

}

}

return true;

}

public boolean cancel(boolean mayInterruptIfRunning) {

Thread t;

if (trySetCancelled() >= 0) {

if (mayInterruptIfRunning && (t = runner) != null) {

try {

t.interrupt();

} catch (Throwable ignore) {

}

}

return true;

}

return isCancelled();

int s;

if ((s = trySetCancelled()) < 0)

return ((s & (ABNORMAL | THROWN)) == ABNORMAL);

interruptIfRunning(mayInterruptIfRunning);

return true;

}

public final void run() { quietlyInvoke(); }

Object adaptee() { return null; } // for printing and diagnostics

Expand All

@@ -1690,7 +1708,6 @@ static final class AdaptedInterruptibleCallable extends InterruptibleTask

@SuppressWarnings("serial") // Conditionally serializable

T result;

AdaptedInterruptibleCallable(Callable<? extends T> callable) {

Objects.requireNonNull(callable);

this.callable = callable;

}

public final T getRawResult() { return result; }

Expand All

@@ -1709,7 +1726,6 @@ static final class AdaptedInterruptibleRunnable extends InterruptibleTask

@SuppressWarnings("serial") // Conditionally serializable

final T result;

AdaptedInterruptibleRunnable(Runnable runnable, T result) {

Objects.requireNonNull(runnable);

this.runnable = runnable;

this.result = result;

}

Expand All

@@ -1727,7 +1743,6 @@ static final class RunnableExecuteAction extends InterruptibleTask {

@SuppressWarnings("serial") // Conditionally serializable

final Runnable runnable;

RunnableExecuteAction(Runnable runnable) {

Objects.requireNonNull(runnable);

this.runnable = runnable;

}

public final Void getRawResult() { return null; }

Expand Down Expand Up

@@ -1793,9 +1808,11 @@ final T invokeAny(Collection<? extends Callable> tasks,

throw new NullPointerException();

InvokeAnyTask t = null; // list of submitted tasks

try {

for (Callable c : tasks)

for (Callable c : tasks) {

Objects.requireNonNull(c);

pool.execute((ForkJoinTask<?>)

(t = new InvokeAnyTask(c, this, t)));

}

return timed ? get(nanos, TimeUnit.NANOSECONDS) : get();

} finally {

for (; t != null; t = t.pred)

Expand All

@@ -1822,7 +1839,6 @@ static final class InvokeAnyTask extends InterruptibleTask {

final InvokeAnyTask pred; // to traverse on cancellation

InvokeAnyTask(Callable callable, InvokeAnyRoot root,

InvokeAnyTask pred) {

Objects.requireNonNull(callable);

this.callable = callable;

this.root = root;

this.pred = pred;

Expand Down Expand Up

@@ -1857,4 +1873,39 @@ final void onRootCompletion() {

public final void setRawResult(Void v) { }

final Object adaptee() { return callable; }

}

/**

* Adapter for Callable-based interruptible tasks with timeout actions.

*/

@SuppressWarnings("serial") // Conditionally serializable

static final class CallableWithTimeout extends InterruptibleTask {

Callable<? extends T> callable; // nulled out after use

ForkJoinTask<?> timeoutAction;

T result;

CallableWithTimeout(Callable<? extends T> callable,

ForkJoinTask<?> timeoutAction) {

this.callable = callable;

this.timeoutAction = timeoutAction;

}

public final T getRawResult() { return result; }

public final void setRawResult(T v) { result = v; }

final Object adaptee() { return callable; }

final T compute() throws Exception {

Callable<? extends T> c;

return ((c = callable) != null) ? c.call() : null;

}

final boolean postExec() { // cancel timeout action

ForkJoinTask<?> t;

callable = null;

if ((t = timeoutAction) != null) {

timeoutAction = null;

try {

t.cancel(false);

} catch (Error | RuntimeException ex) {

}

}

return true;

}

}

}