diff --git a/engine/runtime-instrument-common/src/main/java/org/enso/interpreter/instrument/command/ExecuteExpressionCommand.java b/engine/runtime-instrument-common/src/main/java/org/enso/interpreter/instrument/command/ExecuteExpressionCommand.java index 632b48c37b..107bbc6ea3 100644 --- a/engine/runtime-instrument-common/src/main/java/org/enso/interpreter/instrument/command/ExecuteExpressionCommand.java +++ b/engine/runtime-instrument-common/src/main/java/org/enso/interpreter/instrument/command/ExecuteExpressionCommand.java @@ -40,13 +40,23 @@ public final class ExecuteExpressionCommand extends SynchronousCommand { } @Override + @SuppressWarnings("unchecked") public void executeSynchronously(RuntimeContext ctx, ExecutionContext ec) { if (ctx.contextManager().contains(contextId)) { reply(new Runtime$Api$VisualizationAttached(), ctx); - ctx.jobControlPlane().abortJobs(contextId); + ctx.jobControlPlane() + .abortJobs( + contextId, + job -> { + if (job instanceof ExecuteJob e) { + return e.visualizationTriggered(); + } else { + return job instanceof ExecuteExpressionJob; + } + }); ctx.jobProcessor() .run(new ExecuteExpressionJob(contextId, visualizationId, expressionId, expression)) - .flatMap(executable -> ctx.jobProcessor().run(ExecuteJob.apply(executable)), ec); + .flatMap(executable -> ctx.jobProcessor().run(ExecuteJob.apply(executable, true)), ec); } else { reply(new Runtime$Api$ContextNotExistError(contextId), ctx); } diff --git a/engine/runtime-instrument-common/src/main/java/org/enso/interpreter/instrument/command/SetExecutionEnvironmentCommand.java b/engine/runtime-instrument-common/src/main/java/org/enso/interpreter/instrument/command/SetExecutionEnvironmentCommand.java index 04877fc530..73412ff20a 100644 --- a/engine/runtime-instrument-common/src/main/java/org/enso/interpreter/instrument/command/SetExecutionEnvironmentCommand.java +++ b/engine/runtime-instrument-common/src/main/java/org/enso/interpreter/instrument/command/SetExecutionEnvironmentCommand.java @@ -38,28 +38,31 @@ public class SetExecutionEnvironmentCommand extends AsynchronousCommand { ec); } + @SuppressWarnings("unchecked") private void setExecutionEnvironment( Runtime$Api$ExecutionEnvironment executionEnvironment, UUID contextId, RuntimeContext ctx) { var logger = ctx.executionService().getLogger(); var contextLockTimestamp = ctx.locking().acquireContextLock(contextId); - var writeLockTimestamp = ctx.locking().acquireWriteCompilationLock(); - try { - Stack stack = ctx.contextManager().getStack(contextId); - ctx.jobControlPlane().abortJobs(contextId); - ctx.executionService() - .getContext() - .setExecutionEnvironment(ExecutionEnvironment.forName(executionEnvironment.name())); - CacheInvalidation.invalidateAll(stack); - ctx.jobProcessor().run(ExecuteJob.apply(contextId, stack.toList())); - reply(new Runtime$Api$SetExecutionEnvironmentResponse(contextId), ctx); + var writeLockTimestamp = ctx.locking().acquireWriteCompilationLock(); + try { + Stack stack = ctx.contextManager().getStack(contextId); + ctx.jobControlPlane().abortJobs(contextId); + ctx.executionService() + .getContext() + .setExecutionEnvironment(ExecutionEnvironment.forName(executionEnvironment.name())); + CacheInvalidation.invalidateAll(stack); + ctx.jobProcessor().run(ExecuteJob.apply(contextId, stack.toList())); + reply(new Runtime$Api$SetExecutionEnvironmentResponse(contextId), ctx); + } finally { + ctx.locking().releaseWriteCompilationLock(); + logger.log( + Level.FINEST, + "Kept write compilation lock [SetExecutionEnvironmentCommand] for " + + (System.currentTimeMillis() - writeLockTimestamp) + + " milliseconds"); + } } finally { - ctx.locking().releaseWriteCompilationLock(); - logger.log( - Level.FINEST, - "Kept write compilation lock [SetExecutionEnvironmentCommand] for " - + (System.currentTimeMillis() - writeLockTimestamp) - + " milliseconds"); ctx.locking().releaseContextLock(contextId); logger.log( Level.FINEST, diff --git a/engine/runtime-instrument-common/src/main/java/org/enso/interpreter/instrument/execution/JobControlPlane.java b/engine/runtime-instrument-common/src/main/java/org/enso/interpreter/instrument/execution/JobControlPlane.java new file mode 100644 index 0000000000..d28e68d869 --- /dev/null +++ b/engine/runtime-instrument-common/src/main/java/org/enso/interpreter/instrument/execution/JobControlPlane.java @@ -0,0 +1,63 @@ +package org.enso.interpreter.instrument.execution; + +import java.util.UUID; +import org.enso.interpreter.instrument.job.Job; + +/** Controls running jobs. */ +public interface JobControlPlane { + + /** Aborts all interruptible jobs. */ + void abortAllJobs(); + + /** + * Abort all jobs except the ignored jobs. + * + * @param ignoredJobs the list of jobs to keep in the execution queue + */ + @SuppressWarnings("unchecked") + void abortAllExcept(Class>... ignoredJobs); + + /** + * Aborts jobs that relates to the specified execution context. + * + * @param contextId an identifier of a context + * @param classOf abort jobs of a given class only. If empty all jobs for the given context are + * aborted + */ + @SuppressWarnings("unchecked") + void abortJobs(UUID contextId, Class>... classOf); + + /** + * Aborts jobs that relate to the specified execution context. + * + * @param contextId an identifier of a context + * @param accept filter that selects jobs to be aborted + */ + @SuppressWarnings("unchecked") + void abortJobs(UUID contextId, java.util.function.Function, Boolean> accept); + + /** + * Abort provided background jobs. + * + * @param toAbort the list of jobs to abort + */ + @SuppressWarnings("unchecked") + void abortBackgroundJobs(Class>... toAbort); + + /** + * Starts background jobs processing. + * + * @return `true` if the background jobs were started and `false` if they are already running. + */ + boolean startBackgroundJobs(); + + /** + * Stops background jobs processing. + * + * @return `true` if the call stopped background job, `false` if they are already stopped. + */ + boolean stopBackgroundJobs(); + + /** Finds the first in-progress job satisfying the `filter` condition */ + scala.Option jobInProgress(scala.PartialFunction, scala.Option> filter); +} diff --git a/engine/runtime-instrument-common/src/main/java/org/enso/interpreter/instrument/job/ExecuteExpressionJob.java b/engine/runtime-instrument-common/src/main/java/org/enso/interpreter/instrument/job/ExecuteExpressionJob.java index c86b581b77..b92d571a03 100644 --- a/engine/runtime-instrument-common/src/main/java/org/enso/interpreter/instrument/job/ExecuteExpressionJob.java +++ b/engine/runtime-instrument-common/src/main/java/org/enso/interpreter/instrument/job/ExecuteExpressionJob.java @@ -26,7 +26,7 @@ public class ExecuteExpressionJob extends Job implements UniqueJob implements UniqueJob that) { - return that instanceof ExecuteExpressionJob; + if (that instanceof ExecuteExpressionJob job) { + return contextId == job.contextId && expressionId == job.expressionId; + } + return false; } } diff --git a/engine/runtime-instrument-common/src/main/scala/org/enso/interpreter/instrument/WarningPreview.scala b/engine/runtime-instrument-common/src/main/scala/org/enso/interpreter/instrument/WarningPreview.scala index e35d2bdce0..7e28390732 100644 --- a/engine/runtime-instrument-common/src/main/scala/org/enso/interpreter/instrument/WarningPreview.scala +++ b/engine/runtime-instrument-common/src/main/scala/org/enso/interpreter/instrument/WarningPreview.scala @@ -8,7 +8,7 @@ import java.nio.charset.StandardCharsets object WarningPreview { - private val method = ".to_display_text" + private[this] val METHOD = ".to_display_text" /** Execute preview of the provided warning value. * @@ -20,7 +20,7 @@ object WarningPreview { val visualizationExpression = ctx.executionService.evaluateExpression( ctx.executionService.getContext.getBuiltins.getModule, - method + METHOD ) val visualizationResult = ctx.executionService.callFunction( visualizationExpression, diff --git a/engine/runtime-instrument-common/src/main/scala/org/enso/interpreter/instrument/command/CloseFileCmd.scala b/engine/runtime-instrument-common/src/main/scala/org/enso/interpreter/instrument/command/CloseFileCmd.scala index 01efd47738..973b320feb 100644 --- a/engine/runtime-instrument-common/src/main/scala/org/enso/interpreter/instrument/command/CloseFileCmd.scala +++ b/engine/runtime-instrument-common/src/main/scala/org/enso/interpreter/instrument/command/CloseFileCmd.scala @@ -17,24 +17,30 @@ class CloseFileCmd(request: Api.CloseFileNotification) ctx: RuntimeContext, ec: ExecutionContext ): Unit = { - val logger = ctx.executionService.getLogger - val readLockTimestamp = ctx.locking.acquireReadCompilationLock() - val fileLockTimestamp = ctx.locking.acquireFileLock(request.path) - val pendingEditsLockTimestamp = ctx.locking.acquirePendingEditsLock() + val logger = ctx.executionService.getLogger + val readLockTimestamp = ctx.locking.acquireReadCompilationLock() try { - ctx.state.pendingEdits.dequeue(request.path) - ctx.executionService.resetModuleSources(request.path) + val fileLockTimestamp = ctx.locking.acquireFileLock(request.path) + try { + val pendingEditsLockTimestamp = ctx.locking.acquirePendingEditsLock() + try { + ctx.state.pendingEdits.dequeue(request.path) + ctx.executionService.resetModuleSources(request.path) + } finally { + ctx.locking.releasePendingEditsLock() + logger.log( + Level.FINEST, + "Kept pending edits lock [CloseFileCmd] for " + (System.currentTimeMillis - pendingEditsLockTimestamp) + " milliseconds" + ) + } + } finally { + ctx.locking.releaseFileLock(request.path) + logger.log( + Level.FINEST, + "Kept file lock [CloseFileCmd] for " + (System.currentTimeMillis - fileLockTimestamp) + " milliseconds" + ) + } } finally { - ctx.locking.releasePendingEditsLock() - logger.log( - Level.FINEST, - "Kept pending edits lock [CloseFileCmd] for " + (System.currentTimeMillis - pendingEditsLockTimestamp) + " milliseconds" - ) - ctx.locking.releaseFileLock(request.path) - logger.log( - Level.FINEST, - "Kept file lock [CloseFileCmd] for " + (System.currentTimeMillis - fileLockTimestamp) + " milliseconds" - ) ctx.locking.releaseReadCompilationLock() logger.log( Level.FINEST, diff --git a/engine/runtime-instrument-common/src/main/scala/org/enso/interpreter/instrument/command/EditFileCmd.scala b/engine/runtime-instrument-common/src/main/scala/org/enso/interpreter/instrument/command/EditFileCmd.scala index 553ac7efe0..90a6f7acce 100644 --- a/engine/runtime-instrument-common/src/main/scala/org/enso/interpreter/instrument/command/EditFileCmd.scala +++ b/engine/runtime-instrument-common/src/main/scala/org/enso/interpreter/instrument/command/EditFileCmd.scala @@ -23,29 +23,34 @@ class EditFileCmd(request: Api.EditFileNotification) ctx: RuntimeContext, ec: ExecutionContext ): Unit = { - val logger = ctx.executionService.getLogger - val fileLockTimestamp = ctx.locking.acquireFileLock(request.path) - val pendingEditsLockTimestamp = ctx.locking.acquirePendingEditsLock() + val logger = ctx.executionService.getLogger + val fileLockTimestamp = ctx.locking.acquireFileLock(request.path) try { - logger.log( - Level.FINE, - "Adding pending file edits: {}", - request.edits.map(e => (e.range, e.text.length)) - ) - val edits = - request.edits.map(edit => PendingEdit.ApplyEdit(edit, request.execute)) - ctx.state.pendingEdits.enqueue(request.path, edits) - if (request.execute) { - ctx.jobControlPlane.abortAllJobs() - ctx.jobProcessor.run(new EnsureCompiledJob(Seq(request.path))) - executeJobs.foreach(ctx.jobProcessor.run) + val pendingEditsLockTimestamp = ctx.locking.acquirePendingEditsLock() + try { + logger.log( + Level.FINE, + "Adding pending file edits: {}", + request.edits.map(e => (e.range, e.text.length)) + ) + val edits = + request.edits.map(edit => + PendingEdit.ApplyEdit(edit, request.execute) + ) + ctx.state.pendingEdits.enqueue(request.path, edits) + if (request.execute) { + ctx.jobControlPlane.abortAllJobs() + ctx.jobProcessor.run(new EnsureCompiledJob(Seq(request.path))) + executeJobs.foreach(ctx.jobProcessor.run) + } + } finally { + ctx.locking.releasePendingEditsLock() + logger.log( + Level.FINEST, + "Kept pending edits lock [EditFileCmd] for " + (System.currentTimeMillis - pendingEditsLockTimestamp) + " milliseconds" + ) } } finally { - ctx.locking.releasePendingEditsLock() - logger.log( - Level.FINEST, - "Kept pending edits lock [EditFileCmd] for " + (System.currentTimeMillis - pendingEditsLockTimestamp) + " milliseconds" - ) ctx.locking.releaseFileLock(request.path) logger.log( Level.FINEST, diff --git a/engine/runtime-instrument-common/src/main/scala/org/enso/interpreter/instrument/command/ModifyVisualizationCmd.scala b/engine/runtime-instrument-common/src/main/scala/org/enso/interpreter/instrument/command/ModifyVisualizationCmd.scala index 8dc048b3ac..78502216d0 100644 --- a/engine/runtime-instrument-common/src/main/scala/org/enso/interpreter/instrument/command/ModifyVisualizationCmd.scala +++ b/engine/runtime-instrument-common/src/main/scala/org/enso/interpreter/instrument/command/ModifyVisualizationCmd.scala @@ -75,9 +75,7 @@ class ModifyVisualizationCmd( case Some(exec) => for { - _ <- Future { - ctx.jobProcessor.run(EnsureCompiledJob(exec.stack)) - } + _ <- ctx.jobProcessor.run(EnsureCompiledJob(exec.stack)) _ <- ctx.jobProcessor.run(ExecuteJob(exec)) } yield () } diff --git a/engine/runtime-instrument-common/src/main/scala/org/enso/interpreter/instrument/command/OpenFileCmd.scala b/engine/runtime-instrument-common/src/main/scala/org/enso/interpreter/instrument/command/OpenFileCmd.scala index 6d238af0b1..5348eadfb5 100644 --- a/engine/runtime-instrument-common/src/main/scala/org/enso/interpreter/instrument/command/OpenFileCmd.scala +++ b/engine/runtime-instrument-common/src/main/scala/org/enso/interpreter/instrument/command/OpenFileCmd.scala @@ -23,27 +23,29 @@ class OpenFileCmd( ): Unit = { val logger = ctx.executionService.getLogger val readLockTimestamp = ctx.locking.acquireReadCompilationLock() - val fileLockTimestamp = ctx.locking.acquireFileLock(request.path) try { - ctx.executionService.setModuleSources( - request.path, - request.contents - ) - ctx.endpoint.sendToClient( - Api.Response(maybeRequestId, Api.OpenFileResponse) - ) + val fileLockTimestamp = ctx.locking.acquireFileLock(request.path) + try { + ctx.executionService.setModuleSources( + request.path, + request.contents + ) + ctx.endpoint.sendToClient( + Api.Response(maybeRequestId, Api.OpenFileResponse) + ) + } finally { + ctx.locking.releaseFileLock(request.path) + logger.log( + Level.FINEST, + "Kept file lock [OpenFileCmd] for " + (System.currentTimeMillis - fileLockTimestamp) + " milliseconds" + ) + } } finally { - ctx.locking.releaseFileLock(request.path) - logger.log( - Level.FINEST, - "Kept file lock [OpenFileCmd] for " + (System.currentTimeMillis - fileLockTimestamp) + " milliseconds" - ) ctx.locking.releaseReadCompilationLock() logger.log( Level.FINEST, "Kept read compilation lock [OpenFileCmd] for " + (System.currentTimeMillis - readLockTimestamp) + " milliseconds" ) - } } } diff --git a/engine/runtime-instrument-common/src/main/scala/org/enso/interpreter/instrument/command/SetExpressionValueCmd.scala b/engine/runtime-instrument-common/src/main/scala/org/enso/interpreter/instrument/command/SetExpressionValueCmd.scala index 62ee7c8e7b..3df15f756f 100644 --- a/engine/runtime-instrument-common/src/main/scala/org/enso/interpreter/instrument/command/SetExpressionValueCmd.scala +++ b/engine/runtime-instrument-common/src/main/scala/org/enso/interpreter/instrument/command/SetExpressionValueCmd.scala @@ -23,35 +23,37 @@ class SetExpressionValueCmd(request: Api.SetExpressionValueNotification) ctx: RuntimeContext, ec: ExecutionContext ): Future[Unit] = { - val logger = ctx.executionService.getLogger - val fileLockTimestamp = ctx.locking.acquireFileLock(request.path) - val pendingEditsLockTimestamp = ctx.locking.acquirePendingEditsLock() + val logger = ctx.executionService.getLogger + val fileLockTimestamp = ctx.locking.acquireFileLock(request.path) try { - val pendingApplyEdits = - request.edits.map( - PendingEdit.SetExpressionValue( - _, - request.expressionId, - request.expressionValue + val pendingEditsLockTimestamp = ctx.locking.acquirePendingEditsLock() + try { + val pendingApplyEdits = + request.edits.map( + PendingEdit.SetExpressionValue( + _, + request.expressionId, + request.expressionValue + ) ) + ctx.state.pendingEdits.enqueue(request.path, pendingApplyEdits) + ctx.jobControlPlane.abortAllJobs() + ctx.jobProcessor.run(new EnsureCompiledJob(Seq(request.path))) + executeJobs.foreach(ctx.jobProcessor.run) + Future.successful(()) + } finally { + ctx.locking.releasePendingEditsLock() + logger.log( + Level.FINEST, + "Kept pending edits lock [SetExpressionValueCmd] for " + (System.currentTimeMillis - pendingEditsLockTimestamp) + " milliseconds" ) - ctx.state.pendingEdits.enqueue(request.path, pendingApplyEdits) - ctx.jobControlPlane.abortAllJobs() - ctx.jobProcessor.run(new EnsureCompiledJob(Seq(request.path))) - executeJobs.foreach(ctx.jobProcessor.run) - Future.successful(()) + } } finally { - ctx.locking.releasePendingEditsLock() - logger.log( - Level.FINEST, - "Kept pending edits lock [SetExpressionValueCmd] for " + (System.currentTimeMillis - pendingEditsLockTimestamp) + " milliseconds" - ) ctx.locking.releaseFileLock(request.path) logger.log( Level.FINEST, "Kept file lock [SetExpressionValueCmd] for " + (System.currentTimeMillis - fileLockTimestamp) + " milliseconds" ) - } } diff --git a/engine/runtime-instrument-common/src/main/scala/org/enso/interpreter/instrument/execution/Executable.scala b/engine/runtime-instrument-common/src/main/scala/org/enso/interpreter/instrument/execution/Executable.scala index f53461e2a3..7f3db8485f 100644 --- a/engine/runtime-instrument-common/src/main/scala/org/enso/interpreter/instrument/execution/Executable.scala +++ b/engine/runtime-instrument-common/src/main/scala/org/enso/interpreter/instrument/execution/Executable.scala @@ -5,7 +5,7 @@ import org.enso.polyglot.runtime.Runtime.Api import scala.collection.mutable -/** Represents executable piece of enso program. +/** Represents executable piece of Enso program. * * @param contextId an identifier of a context to execute * @param stack a call stack that must be executed diff --git a/engine/runtime-instrument-common/src/main/scala/org/enso/interpreter/instrument/execution/JobControlPlane.scala b/engine/runtime-instrument-common/src/main/scala/org/enso/interpreter/instrument/execution/JobControlPlane.scala deleted file mode 100644 index 73ecb9458b..0000000000 --- a/engine/runtime-instrument-common/src/main/scala/org/enso/interpreter/instrument/execution/JobControlPlane.scala +++ /dev/null @@ -1,53 +0,0 @@ -package org.enso.interpreter.instrument.execution - -import org.enso.interpreter.instrument.job.Job - -import java.util.UUID - -import scala.annotation.varargs - -/** Controls running jobs. - */ -trait JobControlPlane { - - /** Aborts all interruptible jobs. */ - def abortAllJobs(): Unit - - /** Abort all jobs except the ignored jobs. - * - * @param ignoredJobs the list of jobs to keep in the execution queue - */ - @varargs - def abortAllExcept(ignoredJobs: Class[_ <: Job[_]]*): Unit - - /** Aborts all jobs that relates to the specified execution context. - * - * @param contextId an identifier of a context - */ - def abortJobs(contextId: UUID): Unit - - /** Abort provided background jobs. - * - * @param toAbort the list of jobs to abort - */ - @varargs - def abortBackgroundJobs(toAbort: Class[_ <: Job[_]]*): Unit - - /** Starts background jobs processing. - * - * @return `true` if the background jobs were started and `false` if they are - * already running. - */ - def startBackgroundJobs(): Boolean - - /** Stops background jobs processing. - * - * @return `true` if the call stopped background job, `false` if they are - * already stopped. - */ - def stopBackgroundJobs(): Boolean - - /** Finds the first in-progress job satisfying the `filter` condition - */ - def jobInProgress[T](filter: PartialFunction[Job[_], Option[T]]): Option[T] -} diff --git a/engine/runtime-instrument-common/src/main/scala/org/enso/interpreter/instrument/execution/JobExecutionEngine.scala b/engine/runtime-instrument-common/src/main/scala/org/enso/interpreter/instrument/execution/JobExecutionEngine.scala index b1cc10c3b1..d979b47495 100644 --- a/engine/runtime-instrument-common/src/main/scala/org/enso/interpreter/instrument/execution/JobExecutionEngine.scala +++ b/engine/runtime-instrument-common/src/main/scala/org/enso/interpreter/instrument/execution/JobExecutionEngine.scala @@ -46,6 +46,14 @@ final class JobExecutionEngine( val jobExecutor: ExecutorService = context.newFixedThreadPool(jobParallelism, "job-pool", false) + val highPriorityJobExecutor: ExecutorService = + context.newCachedThreadPool( + "prioritized-job-pool", + 2, + Integer.MAX_VALUE, + false + ) + private val backgroundJobExecutor: ExecutorService = context.newFixedThreadPool(1, "background-job-pool", false) @@ -84,7 +92,9 @@ final class JobExecutionEngine( /** @inheritdoc */ override def run[A](job: Job[A]): Future[A] = { cancelDuplicateJobs(job, runningJobsRef) - runInternal(job, jobExecutor, runningJobsRef) + val executor = + if (job.highPriority) highPriorityJobExecutor else jobExecutor + runInternal(job, executor, runningJobsRef) } private def cancelDuplicateJobs[A]( @@ -116,19 +126,21 @@ final class JobExecutionEngine( val jobId = UUID.randomUUID() val promise = Promise[A]() val logger = runtimeContext.executionService.getLogger - logger.log(Level.FINE, s"Submitting job: $job...") + logger.log(Level.FINE, s"Submitting job: {0}...", job) val future = executorService.submit(() => { - logger.log(Level.FINE, s"Executing job: $job...") + logger.log(Level.FINE, s"Executing job: {0}...", job) val before = System.currentTimeMillis() try { val result = job.run(runtimeContext) val took = System.currentTimeMillis() - before - logger.log(Level.FINE, s"Job $job finished in $took ms.") + logger.log(Level.FINE, s"Job {0} finished in {1} ms.", Array(job, took)) promise.success(result) } catch { case NonFatal(ex) => logger.log(Level.SEVERE, s"Error executing $job", ex) promise.failure(ex) + case err: InterruptedException => + logger.log(Level.WARNING, s"$job got interrupted", err) case err: Throwable => logger.log(Level.SEVERE, s"Error executing $job", err) throw err @@ -138,7 +150,8 @@ final class JobExecutionEngine( }) val runningJob = RunningJob(jobId, job, future) - runningJobsRef.updateAndGet(_ :+ runningJob) + val queue = runningJobsRef.updateAndGet(_ :+ runningJob) + logger.log(Level.FINE, "Number of pending jobs: {}", queue.size) promise.future } @@ -163,11 +176,33 @@ final class JobExecutionEngine( } /** @inheritdoc */ - override def abortJobs(contextId: UUID): Unit = { + override def abortJobs( + contextId: UUID, + toAbort: Class[_ <: Job[_]]* + ): Unit = { val allJobs = runningJobsRef.get() val contextJobs = allJobs.filter(_.job.contextIds.contains(contextId)) contextJobs.foreach { runningJob => - if (runningJob.job.isCancellable) { + if ( + runningJob.job.isCancellable && (toAbort.isEmpty || toAbort + .contains(runningJob.getClass)) + ) { + runningJob.future.cancel(runningJob.job.mayInterruptIfRunning) + } + } + runtimeContext.executionService.getContext.getThreadManager + .interruptThreads() + } + + /** @inheritdoc */ + override def abortJobs( + contextId: UUID, + accept: java.util.function.Function[Job[_], java.lang.Boolean] + ): Unit = { + val allJobs = runningJobsRef.get() + val contextJobs = allJobs.filter(_.job.contextIds.contains(contextId)) + contextJobs.foreach { runningJob => + if (runningJob.job.isCancellable && accept.apply(runningJob.job)) { runningJob.future.cancel(runningJob.job.mayInterruptIfRunning) } } diff --git a/engine/runtime-instrument-common/src/main/scala/org/enso/interpreter/instrument/job/EnsureCompiledJob.scala b/engine/runtime-instrument-common/src/main/scala/org/enso/interpreter/instrument/job/EnsureCompiledJob.scala index 4613e58be5..e2d22ff32a 100644 --- a/engine/runtime-instrument-common/src/main/scala/org/enso/interpreter/instrument/job/EnsureCompiledJob.scala +++ b/engine/runtime-instrument-common/src/main/scala/org/enso/interpreter/instrument/job/EnsureCompiledJob.scala @@ -290,35 +290,38 @@ final class EnsureCompiledJob( ctx: RuntimeContext, logger: TruffleLogger ): Option[Changeset[Rope]] = { - val fileLockTimestamp = ctx.locking.acquireFileLock(file) - val pendingEditsLockTimestamp = ctx.locking.acquirePendingEditsLock() + val fileLockTimestamp = ctx.locking.acquireFileLock(file) try { - val pendingEdits = ctx.state.pendingEdits.dequeue(file) - val edits = pendingEdits.map(_.edit) - val shouldExecute = - pendingEdits.isEmpty || pendingEdits.exists(_.execute) - val module = ctx.executionService.getContext - .getModuleForFile(file) - .orElseThrow(() => new ModuleNotFoundForFileException(file)) - val changesetBuilder = new ChangesetBuilder( - module.getLiteralSource, - module.getIr - ) - val changeset = changesetBuilder.build(pendingEdits) - ctx.executionService.modifyModuleSources( - module, - edits, - changeset.simpleUpdate.orNull, - logger - ) - Option.when(shouldExecute)(changeset) + val pendingEditsLockTimestamp = ctx.locking.acquirePendingEditsLock() + try { + val pendingEdits = ctx.state.pendingEdits.dequeue(file) + val edits = pendingEdits.map(_.edit) + val shouldExecute = + pendingEdits.isEmpty || pendingEdits.exists(_.execute) + val module = ctx.executionService.getContext + .getModuleForFile(file) + .orElseThrow(() => new ModuleNotFoundForFileException(file)) + val changesetBuilder = new ChangesetBuilder( + module.getLiteralSource, + module.getIr + ) + val changeset = changesetBuilder.build(pendingEdits) + ctx.executionService.modifyModuleSources( + module, + edits, + changeset.simpleUpdate.orNull, + logger + ) + Option.when(shouldExecute)(changeset) + } finally { + ctx.locking.releasePendingEditsLock() + logger.log( + Level.FINEST, + "Kept pending edits lock [EnsureCompiledJob] for {} milliseconds", + System.currentTimeMillis() - pendingEditsLockTimestamp + ) + } } finally { - ctx.locking.releasePendingEditsLock() - logger.log( - Level.FINEST, - "Kept pending edits lock [EnsureCompiledJob] for {} milliseconds", - System.currentTimeMillis() - pendingEditsLockTimestamp - ) ctx.locking.releaseFileLock(file) logger.log( Level.FINEST, diff --git a/engine/runtime-instrument-common/src/main/scala/org/enso/interpreter/instrument/job/ExecuteJob.scala b/engine/runtime-instrument-common/src/main/scala/org/enso/interpreter/instrument/job/ExecuteJob.scala index 86bb4e7849..d80e1c3c56 100644 --- a/engine/runtime-instrument-common/src/main/scala/org/enso/interpreter/instrument/job/ExecuteJob.scala +++ b/engine/runtime-instrument-common/src/main/scala/org/enso/interpreter/instrument/job/ExecuteJob.scala @@ -5,7 +5,6 @@ import org.enso.interpreter.instrument.InstrumentFrame import org.enso.interpreter.instrument.execution.{Executable, RuntimeContext} import org.enso.interpreter.runtime.state.ExecutionEnvironment import org.enso.polyglot.runtime.Runtime.Api -import org.enso.polyglot.runtime.Runtime.Api.ExecutionResult import java.util.logging.Level @@ -18,71 +17,79 @@ import java.util.logging.Level class ExecuteJob( contextId: UUID, stack: List[InstrumentFrame], - val executionEnvironment: Option[Api.ExecutionEnvironment] + val executionEnvironment: Option[Api.ExecutionEnvironment], + val visualizationTriggered: Boolean = false ) extends Job[Unit]( List(contextId), isCancellable = true, - // TODO[MK]: make this interruptible when https://github.com/oracle/graal/issues/3590 - // is resolved - mayInterruptIfRunning = false + // Interruptions may turn out to be problematic in enterprise edition of GraalVM + // until https://github.com/oracle/graal/issues/3590 is resolved + mayInterruptIfRunning = true ) { /** @inheritdoc */ override def run(implicit ctx: RuntimeContext): Unit = { - val logger = ctx.executionService.getLogger - val acquiredLock = ctx.locking.acquireContextLock(contextId) - val readLockTimestamp = ctx.locking.acquireReadCompilationLock() - val context = ctx.executionService.getContext - val originalExecutionEnvironment = - executionEnvironment.map(_ => context.getExecutionEnvironment) + val logger = ctx.executionService.getLogger + val acquiredLock = ctx.locking.acquireContextLock(contextId) try { - executionEnvironment.foreach(env => - context.setExecutionEnvironment(ExecutionEnvironment.forName(env.name)) - ) - val outcome = - try ProgramExecutionSupport.runProgram(contextId, stack) - finally { - originalExecutionEnvironment.foreach(context.setExecutionEnvironment) - } - outcome match { - case Some(diagnostic: Api.ExecutionResult.Diagnostic) => - if (diagnostic.isError) { - ctx.endpoint.sendToClient( - Api.Response(Api.ExecutionFailed(contextId, diagnostic)) + val readLockTimestamp = ctx.locking.acquireReadCompilationLock() + try { + val context = ctx.executionService.getContext + val originalExecutionEnvironment = + executionEnvironment.map(_ => context.getExecutionEnvironment) + executionEnvironment.foreach(env => + context.setExecutionEnvironment( + ExecutionEnvironment.forName(env.name) + ) + ) + val outcome = + try ProgramExecutionSupport.runProgram(contextId, stack) + finally { + originalExecutionEnvironment.foreach( + context.setExecutionEnvironment ) - } else { + } + outcome match { + case Some(diagnostic: Api.ExecutionResult.Diagnostic) => + if (diagnostic.isError) { + ctx.endpoint.sendToClient( + Api.Response(Api.ExecutionFailed(contextId, diagnostic)) + ) + } else { + ctx.endpoint.sendToClient( + Api.Response(Api.ExecutionUpdate(contextId, Seq(diagnostic))) + ) + ctx.endpoint.sendToClient( + Api.Response(Api.ExecutionComplete(contextId)) + ) + } + case Some(failure: Api.ExecutionResult.Failure) => ctx.endpoint.sendToClient( - Api.Response(Api.ExecutionUpdate(contextId, Seq(diagnostic))) + Api.Response(Api.ExecutionFailed(contextId, failure)) ) + case None => ctx.endpoint.sendToClient( Api.Response(Api.ExecutionComplete(contextId)) ) - } - case Some(failure: Api.ExecutionResult.Failure) => + } + } catch { + case t: Throwable => ctx.endpoint.sendToClient( - Api.Response(Api.ExecutionFailed(contextId, failure)) - ) - case None => - ctx.endpoint.sendToClient( - Api.Response(Api.ExecutionComplete(contextId)) - ) - } - } catch { - case t: Throwable => - ctx.endpoint.sendToClient( - Api.Response( - Api.ExecutionFailed( - contextId, - ExecutionResult.Failure(t.getMessage, None) + Api.Response( + Api.ExecutionFailed( + contextId, + Api.ExecutionResult.Failure(t.getMessage, None) + ) ) ) + } finally { + ctx.locking.releaseReadCompilationLock() + logger.log( + Level.FINEST, + s"Kept read compilation lock [ExecuteJob] for ${System.currentTimeMillis() - readLockTimestamp} milliseconds" ) + } } finally { - ctx.locking.releaseReadCompilationLock() - logger.log( - Level.FINEST, - s"Kept read compilation lock [ExecuteJob] for ${System.currentTimeMillis() - readLockTimestamp} milliseconds" - ) ctx.locking.releaseContextLock(contextId) logger.log( Level.FINEST, @@ -102,10 +109,19 @@ object ExecuteJob { /** Create execute job from the executable. * * @param executable the executable to run + * @param visualizationTriggered true if execution is triggered by a visualization request, false otherwise * @return the new execute job */ - def apply(executable: Executable): ExecuteJob = - new ExecuteJob(executable.contextId, executable.stack.toList, None) + def apply( + executable: Executable, + visualizationTriggered: Boolean = false + ): ExecuteJob = + new ExecuteJob( + executable.contextId, + executable.stack.toList, + None, + visualizationTriggered + ) /** Create execute job from the context and stack. * diff --git a/engine/runtime-instrument-common/src/main/scala/org/enso/interpreter/instrument/job/Job.scala b/engine/runtime-instrument-common/src/main/scala/org/enso/interpreter/instrument/job/Job.scala index 6a154daef5..9ab0dd2ba5 100644 --- a/engine/runtime-instrument-common/src/main/scala/org/enso/interpreter/instrument/job/Job.scala +++ b/engine/runtime-instrument-common/src/main/scala/org/enso/interpreter/instrument/job/Job.scala @@ -15,9 +15,18 @@ import org.enso.interpreter.instrument.execution.RuntimeContext abstract class Job[+A]( val contextIds: List[UUID], val isCancellable: Boolean, - val mayInterruptIfRunning: Boolean + val mayInterruptIfRunning: Boolean, + val highPriority: Boolean ) { + def this( + contextIds: List[UUID], + isCancellable: Boolean, + mayInterruptIfRunning: Boolean + ) = { + this(contextIds, isCancellable, mayInterruptIfRunning, false) + } + /** Executes a job. * * @param ctx contains suppliers of services to perform a request diff --git a/engine/runtime-instrument-common/src/main/scala/org/enso/interpreter/instrument/job/ProgramExecutionSupport.scala b/engine/runtime-instrument-common/src/main/scala/org/enso/interpreter/instrument/job/ProgramExecutionSupport.scala index 070f65b93e..74899d2942 100644 --- a/engine/runtime-instrument-common/src/main/scala/org/enso/interpreter/instrument/job/ProgramExecutionSupport.scala +++ b/engine/runtime-instrument-common/src/main/scala/org/enso/interpreter/instrument/job/ProgramExecutionSupport.scala @@ -244,10 +244,10 @@ object ProgramExecutionSupport { } val (explicitCallOpt, localCalls) = unwind(stack, Nil, Nil) - val executionResult = for { + val executionResult: Either[Option[Api.ExecutionResult], Unit] = for { stackItem <- Either.fromOption( explicitCallOpt, - Api.ExecutionResult.Failure("Execution stack is empty.", None) + Some(Api.ExecutionResult.Failure("Execution stack is empty.", None)) ) _ <- Either @@ -255,7 +255,7 @@ object ProgramExecutionSupport { .leftMap(onExecutionError(stackItem.item, _)) } yield () logger.log(Level.FINEST, s"Execution finished: $executionResult") - executionResult.fold(Some(_), _ => None) + executionResult.fold(identity, _ => None) } /** Execution error handler. @@ -267,24 +267,24 @@ object ProgramExecutionSupport { private def onExecutionError( item: ExecutionItem, error: Throwable - )(implicit ctx: RuntimeContext): Api.ExecutionResult = { + )(implicit ctx: RuntimeContext): Option[Api.ExecutionResult] = { val itemName = item match { case ExecutionItem.Method(_, _, function) => function case ExecutionItem.CallData(_, call) => call.getFunction.getName } val executionUpdate = getExecutionOutcome(error) val reason = VisualizationResult.findExceptionMessage(error) - def onFailure() = error match { + def onFailure(): Option[Api.ExecutionResult] = error match { case _: ThreadInterruptedException => val message = s"Execution of function $itemName interrupted." ctx.executionService.getLogger.log(Level.FINE, message) - ExecutionResult.Diagnostic.warning(message, None) + None case _ => val message = s"Execution of function $itemName failed ($reason)." ctx.executionService.getLogger.log(Level.WARNING, message, error) - ExecutionResult.Failure(message, None) + Some(ExecutionResult.Failure(message, None)) } - executionUpdate.getOrElse(onFailure()) + executionUpdate.orElse(onFailure()) } /** Convert the runtime exception to the corresponding API error messages. @@ -564,12 +564,13 @@ object ProgramExecutionSupport { .getOrElse(expressionValue.getClass) ctx.executionService.getLogger.log( Level.WARNING, - "Execution of visualization [{0}] on value [{1}] of [{2}] failed. {3}", + "Execution of visualization [{0}] on value [{1}] of [{2}] failed. {3} | {4}", Array[Object]( visualizationId, expressionId, typeOfNode, message, + expressionValue, error ) ) diff --git a/engine/runtime-instrument-common/src/main/scala/org/enso/interpreter/instrument/job/UpsertVisualizationJob.scala b/engine/runtime-instrument-common/src/main/scala/org/enso/interpreter/instrument/job/UpsertVisualizationJob.scala index 78ac32396e..cbdbad4afc 100644 --- a/engine/runtime-instrument-common/src/main/scala/org/enso/interpreter/instrument/job/UpsertVisualizationJob.scala +++ b/engine/runtime-instrument-common/src/main/scala/org/enso/interpreter/instrument/job/UpsertVisualizationJob.scala @@ -45,7 +45,8 @@ class UpsertVisualizationJob( ) extends Job[Option[Executable]]( List(config.executionContextId), false, - false + false, + true ) with UniqueJob[Option[Executable]] { @@ -62,68 +63,72 @@ class UpsertVisualizationJob( implicit val logger: TruffleLogger = ctx.executionService.getLogger val lockTimestamp = ctx.locking.acquireContextLock(config.executionContextId) - val writeLockTimestamp = ctx.locking.acquireWriteCompilationLock() try { - val maybeCallable = - UpsertVisualizationJob.evaluateVisualizationExpression( - config.visualizationModule, - config.expression - ) - - maybeCallable match { - case Left(ModuleNotFound(moduleName)) => - ctx.endpoint.sendToClient( - Api.Response(Api.ModuleNotFound(moduleName)) + val writeLockTimestamp = ctx.locking.acquireWriteCompilationLock() + try { + val maybeCallable = + UpsertVisualizationJob.evaluateVisualizationExpression( + config.visualizationModule, + config.expression ) - None - case Left(EvaluationFailed(message, result)) => - replyWithExpressionFailedError( - config.executionContextId, - visualizationId, - expressionId, - message, - result - ) - None + maybeCallable match { + case Left(ModuleNotFound(moduleName)) => + ctx.endpoint.sendToClient( + Api.Response(Api.ModuleNotFound(moduleName)) + ) + None - case Right(EvaluationResult(module, callable, arguments)) => - val visualization = - UpsertVisualizationJob.updateAttachedVisualization( + case Left(EvaluationFailed(message, result)) => + replyWithExpressionFailedError( + config.executionContextId, visualizationId, expressionId, - module, - config, - callable, - arguments + message, + result ) - val stack = ctx.contextManager.getStack(config.executionContextId) - val cachedValue = stack.headOption - .flatMap(frame => Option(frame.cache.get(expressionId))) - UpsertVisualizationJob.requireVisualizationSynchronization( - stack, - expressionId - ) - cachedValue match { - case Some(value) => - ProgramExecutionSupport.executeAndSendVisualizationUpdate( - config.executionContextId, - stack.headOption.get.syncState, - visualization, + None + + case Right(EvaluationResult(module, callable, arguments)) => + val visualization = + UpsertVisualizationJob.updateAttachedVisualization( + visualizationId, expressionId, - value + module, + config, + callable, + arguments ) - None - case None => - Some(Executable(config.executionContextId, stack)) - } + val stack = ctx.contextManager.getStack(config.executionContextId) + val cachedValue = stack.headOption + .flatMap(frame => Option(frame.cache.get(expressionId))) + UpsertVisualizationJob.requireVisualizationSynchronization( + stack, + expressionId + ) + cachedValue match { + case Some(value) => + ProgramExecutionSupport.executeAndSendVisualizationUpdate( + config.executionContextId, + stack.headOption.get.syncState, + visualization, + expressionId, + value + ) + None + case None => + Some(Executable(config.executionContextId, stack)) + } + } + } finally { + ctx.locking.releaseWriteCompilationLock() + logger.log( + Level.FINEST, + s"Kept write compilation lock [UpsertVisualizationJob] for ${System + .currentTimeMillis() - writeLockTimestamp} milliseconds" + ) } } finally { - ctx.locking.releaseWriteCompilationLock() - logger.log( - Level.FINEST, - s"Kept write compilation lock [UpsertVisualizationJob] for ${System.currentTimeMillis() - writeLockTimestamp} milliseconds" - ) ctx.locking.releaseContextLock(config.executionContextId) logger.log( Level.FINEST, diff --git a/engine/runtime-integration-tests/src/test/scala/org/enso/interpreter/test/instrument/RuntimeAsyncCommandsTest.scala b/engine/runtime-integration-tests/src/test/scala/org/enso/interpreter/test/instrument/RuntimeAsyncCommandsTest.scala index 2fa8519021..6f1634382b 100644 --- a/engine/runtime-integration-tests/src/test/scala/org/enso/interpreter/test/instrument/RuntimeAsyncCommandsTest.scala +++ b/engine/runtime-integration-tests/src/test/scala/org/enso/interpreter/test/instrument/RuntimeAsyncCommandsTest.scala @@ -220,28 +220,24 @@ class RuntimeAsyncCommandsTest context.send( Api.Request(requestId, Api.InterruptContextRequest(contextId)) ) - context.receiveNIgnoreExpressionUpdates( - 3 - ) should contain theSameElementsAs Seq( - Api.Response(requestId, Api.InterruptContextResponse(contextId)), - Api.Response( - Api.ExecutionUpdate( - contextId, - Seq( - Api.ExecutionResult.Diagnostic( - Api.DiagnosticType.Warning, - Some("Execution of function main interrupted."), - None, - None, - None, - Vector() - ) - ) - ) - ), - Api.Response( - Api.ExecutionComplete(contextId) - ) + val responses = context.receiveNIgnoreExpressionUpdates( + 2 ) + responses.length shouldEqual 2 + responses should contain( + Api.Response(requestId, Api.InterruptContextResponse(contextId)) + ) + + val failures = responses.filter(_.payload.isInstanceOf[Api.ExecutionFailed]) + failures.length shouldEqual 1 + + val failure = failures.head.payload.asInstanceOf[Api.ExecutionFailed] + failure.contextId shouldEqual contextId + failure.result shouldBe a[Api.ExecutionResult.Diagnostic] + + val diagnostic = failure.result.asInstanceOf[Api.ExecutionResult.Diagnostic] + diagnostic.kind shouldEqual Api.DiagnosticType.Error + diagnostic.message shouldEqual Some("sleep interrupted") + diagnostic.stack should not be empty } } diff --git a/engine/runtime/src/main/java/org/enso/interpreter/runtime/EnsoContext.java b/engine/runtime/src/main/java/org/enso/interpreter/runtime/EnsoContext.java index 7067b0b96d..bff4bbc326 100644 --- a/engine/runtime/src/main/java/org/enso/interpreter/runtime/EnsoContext.java +++ b/engine/runtime/src/main/java/org/enso/interpreter/runtime/EnsoContext.java @@ -730,6 +730,10 @@ public final class EnsoContext { return threadExecutors.newCachedThreadPool(name, systemThreads); } + public ExecutorService newCachedThreadPool(String name, int min, int max, boolean systemThreads) { + return threadExecutors.newCachedThreadPool(name, systemThreads, min, max); + } + /** * @param parallel amount of parallelism for the pool * @param name human-readable name of the pool diff --git a/engine/runtime/src/main/java/org/enso/interpreter/runtime/HostClassLoader.java b/engine/runtime/src/main/java/org/enso/interpreter/runtime/HostClassLoader.java index a4139a1f9f..ee949e6837 100644 --- a/engine/runtime/src/main/java/org/enso/interpreter/runtime/HostClassLoader.java +++ b/engine/runtime/src/main/java/org/enso/interpreter/runtime/HostClassLoader.java @@ -14,8 +14,8 @@ import org.slf4j.LoggerFactory; /** * Host class loader that serves as a replacement for {@link * com.oracle.truffle.host.HostClassLoader}. Add URLs to Jar archives with {@link #add(URL)}. All - * the classes that are loded via this class loader are first searched inside those archives. If not - * found, delegates to parent class loaders. + * the classes that are loaded via this class loader are first searched inside those archives. If + * not found, delegates to parent class loaders. */ public class HostClassLoader extends URLClassLoader { diff --git a/engine/runtime/src/main/java/org/enso/interpreter/runtime/ThreadExecutors.java b/engine/runtime/src/main/java/org/enso/interpreter/runtime/ThreadExecutors.java index 8153776e9d..b2ab84b9a8 100644 --- a/engine/runtime/src/main/java/org/enso/interpreter/runtime/ThreadExecutors.java +++ b/engine/runtime/src/main/java/org/enso/interpreter/runtime/ThreadExecutors.java @@ -3,9 +3,7 @@ package org.enso.interpreter.runtime; import java.util.Collections; import java.util.Map; import java.util.WeakHashMap; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.ThreadFactory; +import java.util.concurrent.*; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.logging.Level; @@ -20,13 +18,28 @@ final class ThreadExecutors { this.context = context; } - final ExecutorService newCachedThreadPool(String name, boolean systemThread) { + ExecutorService newCachedThreadPool(String name, boolean systemThread) { var s = Executors.newCachedThreadPool(new Factory(name, systemThread)); pools.put(s, name); return s; } - final ExecutorService newFixedThreadPool(int cnt, String name, boolean systemThread) { + ExecutorService newCachedThreadPool(String name, boolean systemThread, int min, int max) { + assert min >= 0; + assert max <= Integer.MAX_VALUE; + var s = + new ThreadPoolExecutor( + min, + max, + 60L, + TimeUnit.SECONDS, + new SynchronousQueue(), + new Factory(name, systemThread)); + pools.put(s, name); + return s; + } + + ExecutorService newFixedThreadPool(int cnt, String name, boolean systemThread) { var s = Executors.newFixedThreadPool(cnt, new Factory(name, systemThread)); pools.put(s, name); return s;