Don't cancel aborted jobs immediately (#11375)

* Don't cancel aborted jobs immediately

Rather than cancelling Futures that capture jobs' logic,
this change introduces a two-level system:

- interrupt all jobs softly via ThreadInterrupted at safepoints
- if the safepoint is not reached within some time period, or it is reached
  but the job is still not cancelled, trigger a hard interrupt
  by cancelling the job explicitly, if possible

Closes #11084.

* Only cancel Future when you mean it

Soft-cancelling a Future and only later cancelling it again with
`mayInterrupt` set to `true` means the latter call has no effect.
Changed the logic so that cancelling a Future really enforces the interrupt.

Occasionally, some commands should not attempt soft cancellation first
- we know they will re-execute the program.

* Replace Thread.sleep with Future.get

No while loops etc.; it's much easier to reason about what soft and
hard interrupts are supposed to do.

* Better comments/logs

* nit

* PR review

* Make test more robust
This commit is contained in:
Hubert Plociniczak 2024-11-05 10:33:02 +01:00 committed by GitHub
parent 988316f910
commit 35e5ed53d2
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
26 changed files with 257 additions and 63 deletions

View File

@ -12,6 +12,7 @@ import org.enso.languageserver.runtime.{
}
import org.enso.languageserver.util.UnhandledLogging
import java.util.UUID
import scala.concurrent.duration.FiniteDuration
/** A request handler for `executionContext/modifyVisualization` commands.
@ -41,16 +42,23 @@ class ModifyVisualizationHandler(
)
val cancellable =
context.system.scheduler.scheduleOnce(timeout, self, RequestTimeout)
context.become(responseStage(id, sender(), cancellable))
context.become(
responseStage(id, params.visualizationId, sender(), cancellable)
)
}
private def responseStage(
id: Id,
visualizationID: UUID,
replyTo: ActorRef,
cancellable: Cancellable
): Receive = {
case RequestTimeout =>
logger.error("Request [{}] timed out.", id)
logger.error(
"Request [{}] timed out for visualization {}.",
id,
visualizationID
)
replyTo ! ResponseError(Some(id), Errors.RequestTimeout)
context.stop(self)

View File

@ -52,7 +52,9 @@ public class SetExecutionEnvironmentCommand extends AsynchronousCommand {
if (!oldEnvironmentName.equals(executionEnvironment.name())) {
ctx.jobControlPlane()
.abortJobs(
contextId, "set execution environment to " + executionEnvironment.name());
contextId,
"set execution environment to " + executionEnvironment.name(),
false);
ctx.locking()
.withWriteCompilationLock(
this.getClass(),

View File

@ -21,11 +21,15 @@ public interface JobControlPlane {
* Aborts jobs that relates to the specified execution context.
*
* @param contextId an identifier of a context
* @param reason reason for aborting job(s)
* @param softAbortFirst true if ongoing jobs should be aborted with safepoints first, even if
* marked as interruptible
* @param classOf abort jobs of a given class only. If empty all jobs for the given context are
* aborted
*/
@SuppressWarnings("unchecked")
void abortJobs(UUID contextId, String reason, Class<? extends Job<?>>... classOf);
void abortJobs(
UUID contextId, String reason, boolean softAbortFirst, Class<? extends Job<?>>... classOf);
/**
* Aborts jobs that relate to the specified execution context.

View File

@ -32,7 +32,7 @@ public class ExecuteExpressionJob extends Job<Executable> implements UniqueJob<E
}
@Override
public Executable run(RuntimeContext ctx) {
public Executable runImpl(RuntimeContext ctx) {
return ctx.locking()
.withContextLock(
ctx.locking().getOrCreateContextLock(contextId),

View File

@ -18,7 +18,7 @@ public final class SerializeModuleJob extends BackgroundJob<Void> {
}
@Override
public Void run(RuntimeContext ctx) {
public Void runImpl(RuntimeContext ctx) {
var ensoContext = ctx.executionService().getContext();
var compiler = ensoContext.getCompiler();
boolean useGlobalCacheLocations = ensoContext.isUseGlobalCache();

View File

@ -34,7 +34,7 @@ class DestroyContextCmd(
}
private def removeContext()(implicit ctx: RuntimeContext): Unit = {
ctx.jobControlPlane.abortJobs(request.contextId, "destroy context")
ctx.jobControlPlane.abortJobs(request.contextId, "destroy context", false)
val contextLock = ctx.locking.getOrCreateContextLock(request.contextId)
try {
ctx.locking.withContextLock(

View File

@ -23,7 +23,11 @@ class InterruptContextCmd(
): Future[Unit] =
if (doesContextExist) {
Future {
ctx.jobControlPlane.abortJobs(request.contextId, "interrupt context")
ctx.jobControlPlane.abortJobs(
request.contextId,
"interrupt context",
false
)
reply(Api.InterruptContextResponse(request.contextId))
}
} else {

View File

@ -43,7 +43,7 @@ class PopContextCmd(
ec: ExecutionContext
): Future[Unit] =
Future {
ctx.jobControlPlane.abortJobs(request.contextId, "pop context")
ctx.jobControlPlane.abortJobs(request.contextId, "pop context", false)
val maybeTopItem = ctx.contextManager.pop(request.contextId)
if (maybeTopItem.isDefined) {
reply(Api.PopContextResponse(request.contextId))

View File

@ -46,7 +46,7 @@ class PushContextCmd(
ec: ExecutionContext
): Future[Boolean] =
Future {
ctx.jobControlPlane.abortJobs(request.contextId, "push context")
ctx.jobControlPlane.abortJobs(request.contextId, "push context", false)
val stack = ctx.contextManager.getStack(request.contextId)
val pushed = request.stackItem match {
case _: Api.StackItem.ExplicitCall if stack.isEmpty =>

View File

@ -42,7 +42,11 @@ class RecomputeContextCmd(
ec: ExecutionContext
): Future[Boolean] = {
Future {
ctx.jobControlPlane.abortJobs(request.contextId, "recompute context")
ctx.jobControlPlane.abortJobs(
request.contextId,
"recompute context",
false
)
val stack = ctx.contextManager.getStack(request.contextId)
if (stack.isEmpty) {
reply(Api.EmptyStackError(request.contextId))

View File

@ -1,6 +1,7 @@
package org.enso.interpreter.instrument.execution
import com.oracle.truffle.api.TruffleLogger
import org.enso.common.Asserts.assertInJvm
import org.enso.interpreter.instrument.InterpreterContext
import org.enso.interpreter.instrument.job.{BackgroundJob, Job, UniqueJob}
import org.enso.text.Sha3_224VersionCalculator
@ -8,10 +9,9 @@ import org.enso.text.Sha3_224VersionCalculator
import java.util
import java.util.{Collections, UUID}
import java.util.concurrent.atomic.AtomicReference
import java.util.concurrent.ExecutorService
import java.util.concurrent.{ExecutorService, TimeUnit}
import java.util.logging.Level
import scala.concurrent.{Future, Promise}
import scala.concurrent.{Future, Promise, TimeoutException}
import scala.util.control.NonFatal
/** This component schedules the execution of jobs. It keeps a queue of
@ -37,6 +37,9 @@ final class JobExecutionEngine(
private val context = interpreterContext.executionService.getContext
private val pendingCancellationsExecutor =
context.newFixedThreadPool(1, "pending-cancellations", false)
private val jobParallelism = context.getJobParallelism
private var isBackgroundJobsStarted = false
@ -78,6 +81,82 @@ final class JobExecutionEngine(
private lazy val logger: TruffleLogger =
runtimeContext.executionService.getLogger
/** Independent `Runnable` that waits for jobs which were asked to stop and
  * force-cancels the ones still running once their grace period is over.
  *
  * Jobs are processed one after another, ordered by the time their
  * cancellation was requested. For each job the remaining part of the grace
  * period is spent blocked on the job's future; if the future does not
  * complete in time a thread dump is logged and the future is cancelled.
  *
  * @param pendingJobs pairs of the `System.currentTimeMillis()` timestamp at
  *                    which cancellation was requested and the affected job
  */
private class ForceJobCancellations(val pendingJobs: Seq[(Long, RunningJob)])
    extends Runnable {

  /** Grace period, in milliseconds, counted from the cancellation request. */
  private val forceInterruptTimeout: Long = 50 * 1000

  override def run(): Unit = {
    val remaining = pendingJobs.sortBy(_._1).iterator
    var keepGoing = true
    // Stop early once this thread has been interrupted: blocking on the
    // remaining futures would only delay the shutdown of the executor.
    while (keepGoing && remaining.hasNext) {
      val (timeRequestedToCancel, runningJob) = remaining.next()
      keepGoing = awaitOrForceCancel(timeRequestedToCancel, runningJob)
    }
  }

  /** Waits for a single job to finish within what is left of its grace
    * period and cancels its future when the wait times out.
    *
    * @param timeRequestedToCancel timestamp of the cancellation request
    * @param runningJob the job to wait for
    * @return `false` when the waiting thread was interrupted and no further
    *         jobs should be processed, `true` otherwise
    */
  private def awaitOrForceCancel(
    timeRequestedToCancel: Long,
    runningJob: RunningJob
  ): Boolean =
    try {
      val now                        = System.currentTimeMillis()
      val timeSinceRequestedToCancel = now - timeRequestedToCancel
      // The request and this check may fall into the same millisecond, so
      // zero elapsed time is legitimate.
      assertInJvm(timeSinceRequestedToCancel >= 0)
      // Earlier jobs in the batch may have used up the whole grace period;
      // a non-positive timeout makes `get` time out right away anyway.
      val timeToCancel =
        math.max(0L, forceInterruptTimeout - timeSinceRequestedToCancel)
      // NOTE(review): this file mixes `{}` and `{0}` placeholders - confirm
      // which style the logging backend substitutes.
      logger.log(
        Level.FINEST,
        "About to wait {}ms to cancel job {}",
        Array[Any](
          timeToCancel,
          runningJob.id
        )
      )
      runningJob.future.get(timeToCancel, TimeUnit.MILLISECONDS)
      logger.log(
        Level.FINEST,
        "Job {} finished within the allocated soft-cancel time",
        runningJob.id
      )
      true
    } catch {
      case _: TimeoutException =>
        logger.log(Level.WARNING, threadDump(runningJob))
        runningJob.future.cancel(runningJob.job.mayInterruptIfRunning)
        true
      case _: java.util.concurrent.CancellationException =>
        // The future was cancelled in the meantime; nothing left to enforce.
        logger.log(
          Level.FINEST,
          "Job {} was cancelled before the soft-cancel time elapsed",
          runningJob.id
        )
        true
      case _: InterruptedException =>
        // Preserve the interrupt status for the executor and stop waiting.
        Thread.currentThread().interrupt()
        false
      case NonFatal(e) =>
        logger.log(
          Level.WARNING,
          "Encountered exception while waiting on status of pending jobs",
          e
        )
        true
    }

  /** Renders the stack traces of all live threads, prefixed with a header
    * naming the job that failed to stop in time.
    */
  private def threadDump(runningJob: RunningJob): String = {
    val sb = new StringBuilder(
      "Threaddump when timeout is reached while waiting for the job " + runningJob.id + " running in thread " + runningJob.job
        .threadNameExecutingJob() + " to cancel:\n"
    )
    Thread.getAllStackTraces.entrySet.forEach { entry =>
      sb.append(entry.getKey.getName).append("\n")
      entry.getValue.foreach { e =>
        sb.append(" ")
          .append(e.getClassName)
          .append(".")
          .append(e.getMethodName)
          .append("(")
          .append(e.getFileName)
          .append(":")
          .append(e.getLineNumber)
          .append(")\n")
      }
    }
    sb.toString()
  }
}
/** Either cancels the job's future right away or returns the job so that
  * its cancellation can be enforced later.
  *
  * Cancellation is deferred when the job may be interrupted while running
  * and a soft abort was requested, or when the job has not started yet.
  *
  * @param runningJob the job to cancel
  * @param softAbortFirst whether a soft abort should be attempted first
  * @return the job when its cancellation is deferred, `None` when its
  *         future was cancelled immediately
  */
private def maybeForceCancelRunningJob(
  runningJob: RunningJob,
  softAbortFirst: Boolean
): Option[RunningJob] = {
  val job                = runningJob.job
  val softAbortRequested = job.mayInterruptIfRunning && softAbortFirst
  if (softAbortRequested || !job.hasStarted()) {
    Some(runningJob)
  } else {
    runningJob.future.cancel(job.mayInterruptIfRunning)
    None
  }
}
/** @inheritdoc */
override def runBackground[A](job: BackgroundJob[A]): Unit =
synchronized {
@ -118,7 +197,12 @@ final class JobExecutionEngine(
case jobRef: UniqueJob[_] if jobRef.equalsTo(job) =>
logger
.log(Level.FINEST, s"Cancelling duplicate job [$jobRef].")
runningJob.future.cancel(jobRef.mayInterruptIfRunning)
updatePendingCancellations(
maybeForceCancelRunningJob(
runningJob,
softAbortFirst = true
).toSeq
)
case _ =>
}
}
@ -126,6 +210,23 @@ final class JobExecutionEngine(
}
}
/** Hands the given jobs over to the pending-cancellations executor, stamped
  * with the current time. Does nothing for an empty sequence.
  *
  * @param jobsToCancel jobs whose cancellation should be enforced later
  */
private def updatePendingCancellations(
  jobsToCancel: Seq[RunningJob]
): Unit = {
  val at = System.currentTimeMillis()
  if (jobsToCancel.nonEmpty) {
    val described = jobsToCancel.map(j => (j.job.getClass, j.id))
    logger.log(
      Level.FINEST,
      "Submitting {0} job(s) for future cancellation",
      described
    )
    // All jobs of one batch share the same request timestamp.
    val stamped = jobsToCancel.map(job => (at, job))
    pendingCancellationsExecutor.submit(new ForceJobCancellations(stamped))
  }
}
private def runInternal[A](
job: Job[A],
executorService: ExecutorService,
@ -164,6 +265,7 @@ final class JobExecutionEngine(
)
}
})
job.setJobId(jobId)
val runningJob = RunningJob(jobId, job, future)
val queue = runningJobsRef.updateAndGet(_ :+ runningJob)
@ -192,9 +294,11 @@ final class JobExecutionEngine(
"Aborting {0} jobs because {1}: {2}",
Array[Any](cancellableJobs.length, reason, cancellableJobs.map(_.id))
)
cancellableJobs.foreach { runningJob =>
runningJob.future.cancel(runningJob.job.mayInterruptIfRunning)
}
val pending = cancellableJobs.flatMap(
maybeForceCancelRunningJob(_, softAbortFirst = true)
)
updatePendingCancellations(pending)
runtimeContext.executionService.getContext.getThreadManager
.interruptThreads()
}
@ -203,23 +307,27 @@ final class JobExecutionEngine(
override def abortJobs(
  contextId: UUID,
  reason: String,
  softAbortFirst: Boolean,
  toAbort: Class[_ <: Job[_]]*
): Unit = {
  val allJobs     = runningJobsRef.get()
  val contextJobs = allJobs.filter(_.job.contextIds.contains(contextId))
  val pending = contextJobs
    .flatMap { runningJob =>
      // The class filter has to look at the wrapped job: `runningJob` itself
      // is always a `RunningJob`, which never appears in `toAbort`.
      if (
        runningJob.job.isCancellable && (toAbort.isEmpty || toAbort
          .contains(runningJob.job.getClass))
      ) {
        logger.log(
          Level.FINE,
          "Aborting job {0} because {1}",
          Array[Any](runningJob.id, reason)
        )
        Some(runningJob)
      } else None
    }
    // Jobs that are not cancelled on the spot are returned here and handed
    // over for a delayed, forced cancellation.
    .flatMap(maybeForceCancelRunningJob(_, softAbortFirst))
  updatePendingCancellations(pending)
  runtimeContext.executionService.getContext.getThreadManager
    .interruptThreads()
}
@ -232,16 +340,19 @@ final class JobExecutionEngine(
): Unit = {
val allJobs = runningJobsRef.get()
val contextJobs = allJobs.filter(_.job.contextIds.contains(contextId))
contextJobs.foreach { runningJob =>
if (runningJob.job.isCancellable && accept.apply(runningJob.job)) {
logger.log(
Level.FINE,
"Aborting job {0} because {1}",
Array[Any](runningJob.id, reason)
)
runningJob.future.cancel(runningJob.job.mayInterruptIfRunning)
val pending = contextJobs
.flatMap { runningJob =>
if (runningJob.job.isCancellable && accept.apply(runningJob.job)) {
logger.log(
Level.FINE,
"Aborting job {0} because {1}",
Array[Any](runningJob.id, reason)
)
Some(runningJob)
} else None
}
}
.flatMap(maybeForceCancelRunningJob(_, softAbortFirst = true))
updatePendingCancellations(pending)
runtimeContext.executionService.getContext.getThreadManager
.interruptThreads()
}
@ -262,9 +373,10 @@ final class JobExecutionEngine(
"Aborting {0} background jobs because {1}: {2}",
Array[Any](cancellableJobs.length, reason, cancellableJobs.map(_.id))
)
cancellableJobs.foreach { runningJob =>
runningJob.future.cancel(runningJob.job.mayInterruptIfRunning)
}
val pending = cancellableJobs.flatMap(
maybeForceCancelRunningJob(_, softAbortFirst = true)
)
updatePendingCancellations(pending)
}
/** @inheritdoc */
@ -292,6 +404,7 @@ final class JobExecutionEngine(
.interruptThreads()
jobExecutor.shutdownNow()
backgroundJobExecutor.shutdownNow()
pendingCancellationsExecutor.shutdownNow()
}
/** Submit background jobs preserving the stable order. */

View File

@ -310,7 +310,7 @@ class ReentrantLocking(logger: TruffleLogger) extends Locking {
val now2 = System.currentTimeMillis()
logger.log(
Level.FINEST,
"Waited [{0}] {1}ms for the {2} lock",
"Waited [{0}] {1}ms for the {2}",
Array[Any](where.getSimpleName, now2 - now, msg)
)
now2

View File

@ -22,7 +22,7 @@ final class AnalyzeModuleInScopeJob(
private val exportsBuilder = new ExportsBuilder
/** @inheritdoc */
override def run(implicit ctx: RuntimeContext): Unit = {
override def runImpl(implicit ctx: RuntimeContext): Unit = {
// There are two runtime flags that can disable suggestions for project
// and global modules (libraries). They are used primarily in tests to
// disable the suggestion updates and reduce the number of messages that

View File

@ -26,7 +26,7 @@ final class AnalyzeModuleJob(
) extends BackgroundJob[Unit](AnalyzeModuleJob.Priority) {
/** @inheritdoc */
override def run(implicit ctx: RuntimeContext): Unit = {
override def runImpl(implicit ctx: RuntimeContext): Unit = {
AnalyzeModuleJob.analyzeModule(module, state, ir, changeset)
}

View File

@ -27,7 +27,7 @@ final class DeserializeLibrarySuggestionsJob(
}
/** @inheritdoc */
override def run(implicit ctx: RuntimeContext): Unit = {
override def runImpl(implicit ctx: RuntimeContext): Unit = {
ctx.executionService.getLogger.log(
Level.FINE,
"Deserializing suggestions for library [{}].",

View File

@ -29,7 +29,7 @@ class DetachVisualizationJob(
}
/** @inheritdoc */
override def run(implicit ctx: RuntimeContext): Unit = {
override def runImpl(implicit ctx: RuntimeContext): Unit = {
ctx.locking.withContextLock(
ctx.locking.getOrCreateContextLock(contextId),
this.getClass,

View File

@ -58,7 +58,7 @@ class EnsureCompiledJob(
import EnsureCompiledJob._
/** @inheritdoc */
override def run(implicit ctx: RuntimeContext): CompilationStatus = {
override def runImpl(implicit ctx: RuntimeContext): CompilationStatus = {
ctx.locking.withWriteCompilationLock(
this.getClass,
() => {

View File

@ -27,10 +27,29 @@ class ExecuteJob(
mayInterruptIfRunning = true
) {
private var _threadName: String = "<unknown>"
@volatile private var _hasStarted: Boolean = false
private var _jobId: UUID = _
override def threadNameExecutingJob(): String = _threadName
override def hasStarted(): Boolean = _hasStarted
override def setJobId(id: UUID): Unit = {
_jobId = id
}
/** @inheritdoc */
override def run(implicit ctx: RuntimeContext): Unit = {
override def runImpl(implicit ctx: RuntimeContext): Unit = {
_hasStarted = true
_threadName = Thread.currentThread().getName
try {
runImpl
ctx.executionService.getLogger.log(
Level.INFO,
"Starting ExecuteJob[{}]",
_jobId
)
execute
} catch {
case t: Throwable =>
ctx.executionService.getLogger.log(Level.SEVERE, "Failed to execute", t)
@ -55,10 +74,16 @@ class ExecuteJob(
)
)
)
} finally {
ctx.executionService.getLogger.log(
Level.FINEST,
"Finished ExecuteJob[{0}]",
_jobId
)
}
}
private def runImpl(implicit ctx: RuntimeContext): Unit = {
private def execute(implicit ctx: RuntimeContext): Unit = {
ctx.state.executionHooks.run()
ctx.locking.withContextLock(
@ -116,7 +141,7 @@ class ExecuteJob(
}
override def toString(): String = {
s"ExecuteJob(contextId=$contextId)"
s"ExecuteJob(contextId=$contextId, jobId=${_jobId})"
}
}

View File

@ -19,6 +19,8 @@ abstract class Job[+A](
val highPriority: Boolean
) {
@volatile private var _hasStarted = false
def this(
contextIds: List[UUID],
isCancellable: Boolean,
@ -27,11 +29,32 @@ abstract class Job[+A](
this(contextIds, isCancellable, mayInterruptIfRunning, false)
}
/** Executes a job. Will mark the job as "started".
*
* @param ctx contains suppliers of services to perform a request
*/
final def run(implicit ctx: RuntimeContext): A = {
_hasStarted = true
runImpl(ctx)
}
/** Executes a job.
*
* @param ctx contains suppliers of services to perform a request
*/
def run(implicit ctx: RuntimeContext): A
def runImpl(implicit ctx: RuntimeContext): A
/** Returns the name of the thread which executes the job, if any.
* @return a name of the thread or null, if information is unsupported
*/
def threadNameExecutingJob(): String = null
/** Indicates whether the job has started executing. */
def hasStarted(): Boolean = {
_hasStarted
}
private[instrument] def setJobId(id: UUID): Unit = ()
}

View File

@ -34,7 +34,7 @@ final class RefactoringRenameJob(
) {
/** @inheritdoc */
override def run(implicit ctx: RuntimeContext): Seq[File] = {
override def runImpl(implicit ctx: RuntimeContext): Seq[File] = {
val logger = ctx.executionService.getLogger
ctx.locking.withReadCompilationLock(
this.getClass,

View File

@ -14,7 +14,7 @@ final class StartBackgroundProcessingJob()
) {
/** @inheritdoc */
override def run(implicit ctx: RuntimeContext): Unit =
override def runImpl(implicit ctx: RuntimeContext): Unit =
StartBackgroundProcessingJob.startBackgroundJobs()
}

View File

@ -61,7 +61,7 @@ class UpsertVisualizationJob(
}
/** @inheritdoc */
override def run(implicit ctx: RuntimeContext): Option[Executable] = {
override def runImpl(implicit ctx: RuntimeContext): Option[Executable] = {
ctx.locking.withContextLock(
ctx.locking.getOrCreateContextLock(config.executionContextId),
this.getClass,

View File

@ -10,7 +10,7 @@ class SlowEnsureCompiledJob(
isCancellable: Boolean = true
) extends EnsureCompiledJob(files, isCancellable) {
override def run(implicit ctx: RuntimeContext): CompilationStatus = {
/** Sleeps for one second and then runs the regular compilation logic. */
override def runImpl(implicit ctx: RuntimeContext): CompilationStatus = {
  Thread.sleep(1000)
  // `run` is the final entry point on `Job` that dispatches to `runImpl`;
  // calling it from here would re-enter this method without end.
  super.runImpl(ctx)
}

View File

@ -21,7 +21,7 @@ class SlowUpsertVisualizationJob(
override val isCancellable: Boolean = true
override val mayInterruptIfRunning: Boolean = true
override def run(implicit ctx: RuntimeContext): Option[Executable] = {
override def runImpl(implicit ctx: RuntimeContext): Option[Executable] = {
if (
ctx.executionService.getContext.isRandomDelayedCommandExecution && delay
) {

View File

@ -230,7 +230,7 @@ class RuntimeAsyncCommandsTest
var iteration = 0
while (!isProgramStarted && iteration < 100) {
val out = context.consumeOut
Thread.sleep(200)
Thread.sleep(100)
isProgramStarted = out == List("started")
iteration += 1
}
@ -361,7 +361,7 @@ class RuntimeAsyncCommandsTest
var iteration = 0
while (!isProgramStarted && iteration < 100) {
val out = context.consumeOut
Thread.sleep(200)
Thread.sleep(100)
isProgramStarted = out == List("started")
iteration += 1
}
@ -386,7 +386,7 @@ class RuntimeAsyncCommandsTest
)
)
)
val responses1 = context.receiveNIgnorePendingExpressionUpdates(3)
val responses1 = context.receiveNIgnorePendingExpressionUpdates(2)
responses1 should contain allOf (
TestMessages.update(
contextId,
@ -407,7 +407,18 @@ class RuntimeAsyncCommandsTest
),
context.executionComplete(contextId)
)
context.consumeOut should contain("True")
// It's possible that ExecutionComplete is from RecomputeContext not from EditFileNotification.
// If that's the case, then there might be a race in the output produced by the program.
var reallyFinished = false
iteration = 0
while (!reallyFinished && iteration < 50) {
val out = context.consumeOut
Thread.sleep(100)
reallyFinished = out.contains("True")
iteration += 1
}
reallyFinished shouldBe true
}
it should "interrupt running execution context without sending Panic in expression updates" in {

View File

@ -26,7 +26,7 @@ class ActorLoggingReceive(
case _ => ""
}
val template =
s"received ${if (handled) "handled" else "unhandled"} {} from {}$labelText"
s"receive ${if (handled) "handled" else "unhandled"} {} from {}$labelText"
logger.trace(template, o, context.sender())
handled
}