Don't use thread interrupts to cancel executions (#1574)

This commit is contained in:
Marcin Kostrzewa 2021-03-16 10:43:36 +01:00 committed by GitHub
parent b0680d0384
commit ca90d9824a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 21 additions and 10 deletions

View File

@ -6,6 +6,7 @@ import com.oracle.truffle.api.Truffle;
import com.oracle.truffle.api.nodes.InvalidAssumptionException; import com.oracle.truffle.api.nodes.InvalidAssumptionException;
import org.enso.interpreter.runtime.control.ThreadInterruptedException; import org.enso.interpreter.runtime.control.ThreadInterruptedException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Phaser; import java.util.concurrent.Phaser;
import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.locks.ReentrantLock;
@ -23,6 +24,7 @@ public class ThreadManager {
private final ReentrantLock lock = new ReentrantLock(); private final ReentrantLock lock = new ReentrantLock();
private volatile boolean safepoint = false; private volatile boolean safepoint = false;
private final ConcurrentHashMap<Thread, Boolean> interruptFlags = new ConcurrentHashMap<>();
/** /**
* Registers the current thread as running guest code. * Registers the current thread as running guest code.
@ -35,6 +37,7 @@ public class ThreadManager {
*/ */
public void enter() { public void enter() {
safepointPhaser.register(); safepointPhaser.register();
interruptFlags.put(Thread.currentThread(), false);
} }
/** /**
@ -44,6 +47,7 @@ public class ThreadManager {
*/ */
public void leave() { public void leave() {
safepointPhaser.arriveAndDeregister(); safepointPhaser.arriveAndDeregister();
interruptFlags.remove(Thread.currentThread());
} }
/** Called from the interpreter to periodically perform a safepoint check. */ /** Called from the interpreter to periodically perform a safepoint check. */
@ -51,7 +55,8 @@ public class ThreadManager {
if (safepoint) { if (safepoint) {
CompilerDirectives.transferToInterpreter(); CompilerDirectives.transferToInterpreter();
safepointPhaser.arriveAndAwaitAdvance(); safepointPhaser.arriveAndAwaitAdvance();
if (Thread.interrupted()) { if (interruptFlags.get(Thread.currentThread())) {
interruptFlags.put(Thread.currentThread(), false);
throw new ThreadInterruptedException(); throw new ThreadInterruptedException();
} }
} }
@ -59,7 +64,7 @@ public class ThreadManager {
/** /**
* Forces all threads managed by this system to halt at the next safepoint (i.e. a {@link #poll()} * Forces all threads managed by this system to halt at the next safepoint (i.e. a {@link #poll()}
* call) and throw an exception if they were interrupted. * call) and throw a {@link ThreadInterruptedException}.
* *
* <p>This method is blocking, does not return until the last managed thread reports at a * <p>This method is blocking, does not return until the last managed thread reports at a
* safepoint. * safepoint.
@ -67,9 +72,10 @@ public class ThreadManager {
* <p>This method may not be called from a thread that is itself managed by this system, as doing * <p>This method may not be called from a thread that is itself managed by this system, as doing
* so may result in a deadlock. * so may result in a deadlock.
*/ */
public void checkInterrupts() { public void interruptThreads() {
lock.lock(); lock.lock();
try { try {
interruptFlags.replaceAll((t, b) -> true);
enter(); enter();
try { try {
safepoint = true; safepoint = true;

View File

@ -91,7 +91,7 @@ class JobExecutionEngine(
runningJob.future.cancel(runningJob.job.mayInterruptIfRunning) runningJob.future.cancel(runningJob.job.mayInterruptIfRunning)
} }
runtimeContext.executionService.getContext.getThreadManager runtimeContext.executionService.getContext.getThreadManager
.checkInterrupts() .interruptThreads()
} }
/** @inheritdoc */ /** @inheritdoc */
@ -104,7 +104,7 @@ class JobExecutionEngine(
} }
} }
runtimeContext.executionService.getContext.getThreadManager runtimeContext.executionService.getContext.getThreadManager
.checkInterrupts() .interruptThreads()
} }
/** @inheritdoc */ /** @inheritdoc */
@ -112,7 +112,7 @@ class JobExecutionEngine(
val allJobs = runningJobsRef.get() val allJobs = runningJobsRef.get()
allJobs.foreach(_.future.cancel(true)) allJobs.foreach(_.future.cancel(true))
runtimeContext.executionService.getContext.getThreadManager runtimeContext.executionService.getContext.getThreadManager
.checkInterrupts() .interruptThreads()
jobExecutor.shutdownNow() jobExecutor.shutdownNow()
} }

View File

@ -19,7 +19,13 @@ class ExecuteJob(
stack: List[InstrumentFrame], stack: List[InstrumentFrame],
updatedVisualisations: Seq[UUID], updatedVisualisations: Seq[UUID],
sendMethodCallUpdates: Boolean sendMethodCallUpdates: Boolean
) extends Job[Unit](List(contextId), true, true) ) extends Job[Unit](
List(contextId),
isCancellable = true,
// TODO[MK]: make this interruptible when https://github.com/oracle/graal/issues/3273
// is resolved
mayInterruptIfRunning = false
)
with ProgramExecutionSupport { with ProgramExecutionSupport {
def this(exe: Executable) = def this(exe: Executable) =

View File

@ -51,8 +51,7 @@ class RuntimeManagementTest extends InterpreterTest {
reportedCount += consumeOut.length reportedCount += consumeOut.length
} }
val expectedOut = List.fill(n)("Interrupted.") val expectedOut = List.fill(n)("Interrupted.")
threads.foreach(_.interrupt()) langCtx.getThreadManager.interruptThreads()
langCtx.getThreadManager.checkInterrupts()
threads.foreach(_.join()) threads.foreach(_.join())
consumeOut shouldEqual expectedOut consumeOut shouldEqual expectedOut
threads.forall(!_.isAlive) shouldBe true threads.forall(!_.isAlive) shouldBe true

View File

@ -44,7 +44,7 @@ public class HashIndex extends Index {
@Override @Override
public String ilocString(int loc) { public String ilocString(int loc) {
return iloc(loc).toString(); return String.valueOf(iloc(loc));
} }
@Override @Override