Speedup visualization and widget responses (#9371)

`ExecuteJob` can now be interrupted.
We now have a separate threadpool for visualization-related jobs.

# Important Notes
In a lock step situation, a job or command could have been interrupted while waiting for one of the locks. As locks ensured only that they were released once all of them have been acquired this could leave engine in a broken state.
Once `ExecuteJob` could be interrupted this became a blocker as it prevented project startup almost in every case.
The change also makes it careful to avoid constant `ExecuteJob` restarts.

Addresses #9278. There will be follow up work.
This commit is contained in:
Hubert Plociniczak 2024-03-21 15:06:48 +01:00 committed by GitHub
parent d488841722
commit c22d7422e6
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
22 changed files with 444 additions and 323 deletions

View File

@ -40,13 +40,23 @@ public final class ExecuteExpressionCommand extends SynchronousCommand {
}
@Override
@SuppressWarnings("unchecked")
public void executeSynchronously(RuntimeContext ctx, ExecutionContext ec) {
if (ctx.contextManager().contains(contextId)) {
reply(new Runtime$Api$VisualizationAttached(), ctx);
ctx.jobControlPlane().abortJobs(contextId);
ctx.jobControlPlane()
.abortJobs(
contextId,
job -> {
if (job instanceof ExecuteJob e) {
return e.visualizationTriggered();
} else {
return job instanceof ExecuteExpressionJob;
}
});
ctx.jobProcessor()
.run(new ExecuteExpressionJob(contextId, visualizationId, expressionId, expression))
.flatMap(executable -> ctx.jobProcessor().run(ExecuteJob.apply(executable)), ec);
.flatMap(executable -> ctx.jobProcessor().run(ExecuteJob.apply(executable, true)), ec);
} else {
reply(new Runtime$Api$ContextNotExistError(contextId), ctx);
}

View File

@ -38,28 +38,31 @@ public class SetExecutionEnvironmentCommand extends AsynchronousCommand {
ec);
}
@SuppressWarnings("unchecked")
private void setExecutionEnvironment(
Runtime$Api$ExecutionEnvironment executionEnvironment, UUID contextId, RuntimeContext ctx) {
var logger = ctx.executionService().getLogger();
var contextLockTimestamp = ctx.locking().acquireContextLock(contextId);
var writeLockTimestamp = ctx.locking().acquireWriteCompilationLock();
try {
Stack<InstrumentFrame> stack = ctx.contextManager().getStack(contextId);
ctx.jobControlPlane().abortJobs(contextId);
ctx.executionService()
.getContext()
.setExecutionEnvironment(ExecutionEnvironment.forName(executionEnvironment.name()));
CacheInvalidation.invalidateAll(stack);
ctx.jobProcessor().run(ExecuteJob.apply(contextId, stack.toList()));
reply(new Runtime$Api$SetExecutionEnvironmentResponse(contextId), ctx);
var writeLockTimestamp = ctx.locking().acquireWriteCompilationLock();
try {
Stack<InstrumentFrame> stack = ctx.contextManager().getStack(contextId);
ctx.jobControlPlane().abortJobs(contextId);
ctx.executionService()
.getContext()
.setExecutionEnvironment(ExecutionEnvironment.forName(executionEnvironment.name()));
CacheInvalidation.invalidateAll(stack);
ctx.jobProcessor().run(ExecuteJob.apply(contextId, stack.toList()));
reply(new Runtime$Api$SetExecutionEnvironmentResponse(contextId), ctx);
} finally {
ctx.locking().releaseWriteCompilationLock();
logger.log(
Level.FINEST,
"Kept write compilation lock [SetExecutionEnvironmentCommand] for "
+ (System.currentTimeMillis() - writeLockTimestamp)
+ " milliseconds");
}
} finally {
ctx.locking().releaseWriteCompilationLock();
logger.log(
Level.FINEST,
"Kept write compilation lock [SetExecutionEnvironmentCommand] for "
+ (System.currentTimeMillis() - writeLockTimestamp)
+ " milliseconds");
ctx.locking().releaseContextLock(contextId);
logger.log(
Level.FINEST,

View File

@ -0,0 +1,63 @@
package org.enso.interpreter.instrument.execution;
import java.util.UUID;
import org.enso.interpreter.instrument.job.Job;
/** Controls running jobs. */
public interface JobControlPlane {
/** Aborts all interruptible jobs. */
void abortAllJobs();
/**
* Abort all jobs except the ignored jobs.
*
* @param ignoredJobs the list of jobs to keep in the execution queue
*/
@SuppressWarnings("unchecked")
void abortAllExcept(Class<? extends Job<?>>... ignoredJobs);
/**
* Aborts jobs that relates to the specified execution context.
*
* @param contextId an identifier of a context
* @param classOf abort jobs of a given class only. If empty all jobs for the given context are
* aborted
*/
@SuppressWarnings("unchecked")
void abortJobs(UUID contextId, Class<? extends Job<?>>... classOf);
/**
* Aborts jobs that relate to the specified execution context.
*
* @param contextId an identifier of a context
* @param accept filter that selects jobs to be aborted
*/
@SuppressWarnings("unchecked")
void abortJobs(UUID contextId, java.util.function.Function<Job<?>, Boolean> accept);
/**
* Abort provided background jobs.
*
* @param toAbort the list of jobs to abort
*/
@SuppressWarnings("unchecked")
void abortBackgroundJobs(Class<? extends Job<?>>... toAbort);
/**
* Starts background jobs processing.
*
* @return `true` if the background jobs were started and `false` if they are already running.
*/
boolean startBackgroundJobs();
/**
* Stops background jobs processing.
*
* @return `true` if the call stopped background job, `false` if they are already stopped.
*/
boolean stopBackgroundJobs();
/** Finds the first in-progress job satisfying the `filter` condition */
<T> scala.Option<T> jobInProgress(scala.PartialFunction<Job<?>, scala.Option<T>> filter);
}

View File

@ -26,7 +26,7 @@ public class ExecuteExpressionJob extends Job<Executable> implements UniqueJob<E
*/
public ExecuteExpressionJob(
UUID contextId, UUID visualizationId, UUID expressionId, String expression) {
super(ScalaConversions.cons(contextId, ScalaConversions.nil()), true, false);
super(ScalaConversions.cons(contextId, ScalaConversions.nil()), true, false, true);
this.contextId = contextId;
this.visualizationId = visualizationId;
this.expressionId = expressionId;
@ -58,6 +58,9 @@ public class ExecuteExpressionJob extends Job<Executable> implements UniqueJob<E
@Override
public boolean equalsTo(UniqueJob<?> that) {
return that instanceof ExecuteExpressionJob;
if (that instanceof ExecuteExpressionJob job) {
return contextId == job.contextId && expressionId == job.expressionId;
}
return false;
}
}

View File

@ -8,7 +8,7 @@ import java.nio.charset.StandardCharsets
object WarningPreview {
private val method = ".to_display_text"
private[this] val METHOD = ".to_display_text"
/** Execute preview of the provided warning value.
*
@ -20,7 +20,7 @@ object WarningPreview {
val visualizationExpression =
ctx.executionService.evaluateExpression(
ctx.executionService.getContext.getBuiltins.getModule,
method
METHOD
)
val visualizationResult = ctx.executionService.callFunction(
visualizationExpression,

View File

@ -17,24 +17,30 @@ class CloseFileCmd(request: Api.CloseFileNotification)
ctx: RuntimeContext,
ec: ExecutionContext
): Unit = {
val logger = ctx.executionService.getLogger
val readLockTimestamp = ctx.locking.acquireReadCompilationLock()
val fileLockTimestamp = ctx.locking.acquireFileLock(request.path)
val pendingEditsLockTimestamp = ctx.locking.acquirePendingEditsLock()
val logger = ctx.executionService.getLogger
val readLockTimestamp = ctx.locking.acquireReadCompilationLock()
try {
ctx.state.pendingEdits.dequeue(request.path)
ctx.executionService.resetModuleSources(request.path)
val fileLockTimestamp = ctx.locking.acquireFileLock(request.path)
try {
val pendingEditsLockTimestamp = ctx.locking.acquirePendingEditsLock()
try {
ctx.state.pendingEdits.dequeue(request.path)
ctx.executionService.resetModuleSources(request.path)
} finally {
ctx.locking.releasePendingEditsLock()
logger.log(
Level.FINEST,
"Kept pending edits lock [CloseFileCmd] for " + (System.currentTimeMillis - pendingEditsLockTimestamp) + " milliseconds"
)
}
} finally {
ctx.locking.releaseFileLock(request.path)
logger.log(
Level.FINEST,
"Kept file lock [CloseFileCmd] for " + (System.currentTimeMillis - fileLockTimestamp) + " milliseconds"
)
}
} finally {
ctx.locking.releasePendingEditsLock()
logger.log(
Level.FINEST,
"Kept pending edits lock [CloseFileCmd] for " + (System.currentTimeMillis - pendingEditsLockTimestamp) + " milliseconds"
)
ctx.locking.releaseFileLock(request.path)
logger.log(
Level.FINEST,
"Kept file lock [CloseFileCmd] for " + (System.currentTimeMillis - fileLockTimestamp) + " milliseconds"
)
ctx.locking.releaseReadCompilationLock()
logger.log(
Level.FINEST,

View File

@ -23,29 +23,34 @@ class EditFileCmd(request: Api.EditFileNotification)
ctx: RuntimeContext,
ec: ExecutionContext
): Unit = {
val logger = ctx.executionService.getLogger
val fileLockTimestamp = ctx.locking.acquireFileLock(request.path)
val pendingEditsLockTimestamp = ctx.locking.acquirePendingEditsLock()
val logger = ctx.executionService.getLogger
val fileLockTimestamp = ctx.locking.acquireFileLock(request.path)
try {
logger.log(
Level.FINE,
"Adding pending file edits: {}",
request.edits.map(e => (e.range, e.text.length))
)
val edits =
request.edits.map(edit => PendingEdit.ApplyEdit(edit, request.execute))
ctx.state.pendingEdits.enqueue(request.path, edits)
if (request.execute) {
ctx.jobControlPlane.abortAllJobs()
ctx.jobProcessor.run(new EnsureCompiledJob(Seq(request.path)))
executeJobs.foreach(ctx.jobProcessor.run)
val pendingEditsLockTimestamp = ctx.locking.acquirePendingEditsLock()
try {
logger.log(
Level.FINE,
"Adding pending file edits: {}",
request.edits.map(e => (e.range, e.text.length))
)
val edits =
request.edits.map(edit =>
PendingEdit.ApplyEdit(edit, request.execute)
)
ctx.state.pendingEdits.enqueue(request.path, edits)
if (request.execute) {
ctx.jobControlPlane.abortAllJobs()
ctx.jobProcessor.run(new EnsureCompiledJob(Seq(request.path)))
executeJobs.foreach(ctx.jobProcessor.run)
}
} finally {
ctx.locking.releasePendingEditsLock()
logger.log(
Level.FINEST,
"Kept pending edits lock [EditFileCmd] for " + (System.currentTimeMillis - pendingEditsLockTimestamp) + " milliseconds"
)
}
} finally {
ctx.locking.releasePendingEditsLock()
logger.log(
Level.FINEST,
"Kept pending edits lock [EditFileCmd] for " + (System.currentTimeMillis - pendingEditsLockTimestamp) + " milliseconds"
)
ctx.locking.releaseFileLock(request.path)
logger.log(
Level.FINEST,

View File

@ -75,9 +75,7 @@ class ModifyVisualizationCmd(
case Some(exec) =>
for {
_ <- Future {
ctx.jobProcessor.run(EnsureCompiledJob(exec.stack))
}
_ <- ctx.jobProcessor.run(EnsureCompiledJob(exec.stack))
_ <- ctx.jobProcessor.run(ExecuteJob(exec))
} yield ()
}

View File

@ -23,27 +23,29 @@ class OpenFileCmd(
): Unit = {
val logger = ctx.executionService.getLogger
val readLockTimestamp = ctx.locking.acquireReadCompilationLock()
val fileLockTimestamp = ctx.locking.acquireFileLock(request.path)
try {
ctx.executionService.setModuleSources(
request.path,
request.contents
)
ctx.endpoint.sendToClient(
Api.Response(maybeRequestId, Api.OpenFileResponse)
)
val fileLockTimestamp = ctx.locking.acquireFileLock(request.path)
try {
ctx.executionService.setModuleSources(
request.path,
request.contents
)
ctx.endpoint.sendToClient(
Api.Response(maybeRequestId, Api.OpenFileResponse)
)
} finally {
ctx.locking.releaseFileLock(request.path)
logger.log(
Level.FINEST,
"Kept file lock [OpenFileCmd] for " + (System.currentTimeMillis - fileLockTimestamp) + " milliseconds"
)
}
} finally {
ctx.locking.releaseFileLock(request.path)
logger.log(
Level.FINEST,
"Kept file lock [OpenFileCmd] for " + (System.currentTimeMillis - fileLockTimestamp) + " milliseconds"
)
ctx.locking.releaseReadCompilationLock()
logger.log(
Level.FINEST,
"Kept read compilation lock [OpenFileCmd] for " + (System.currentTimeMillis - readLockTimestamp) + " milliseconds"
)
}
}
}

View File

@ -23,35 +23,37 @@ class SetExpressionValueCmd(request: Api.SetExpressionValueNotification)
ctx: RuntimeContext,
ec: ExecutionContext
): Future[Unit] = {
val logger = ctx.executionService.getLogger
val fileLockTimestamp = ctx.locking.acquireFileLock(request.path)
val pendingEditsLockTimestamp = ctx.locking.acquirePendingEditsLock()
val logger = ctx.executionService.getLogger
val fileLockTimestamp = ctx.locking.acquireFileLock(request.path)
try {
val pendingApplyEdits =
request.edits.map(
PendingEdit.SetExpressionValue(
_,
request.expressionId,
request.expressionValue
val pendingEditsLockTimestamp = ctx.locking.acquirePendingEditsLock()
try {
val pendingApplyEdits =
request.edits.map(
PendingEdit.SetExpressionValue(
_,
request.expressionId,
request.expressionValue
)
)
ctx.state.pendingEdits.enqueue(request.path, pendingApplyEdits)
ctx.jobControlPlane.abortAllJobs()
ctx.jobProcessor.run(new EnsureCompiledJob(Seq(request.path)))
executeJobs.foreach(ctx.jobProcessor.run)
Future.successful(())
} finally {
ctx.locking.releasePendingEditsLock()
logger.log(
Level.FINEST,
"Kept pending edits lock [SetExpressionValueCmd] for " + (System.currentTimeMillis - pendingEditsLockTimestamp) + " milliseconds"
)
ctx.state.pendingEdits.enqueue(request.path, pendingApplyEdits)
ctx.jobControlPlane.abortAllJobs()
ctx.jobProcessor.run(new EnsureCompiledJob(Seq(request.path)))
executeJobs.foreach(ctx.jobProcessor.run)
Future.successful(())
}
} finally {
ctx.locking.releasePendingEditsLock()
logger.log(
Level.FINEST,
"Kept pending edits lock [SetExpressionValueCmd] for " + (System.currentTimeMillis - pendingEditsLockTimestamp) + " milliseconds"
)
ctx.locking.releaseFileLock(request.path)
logger.log(
Level.FINEST,
"Kept file lock [SetExpressionValueCmd] for " + (System.currentTimeMillis - fileLockTimestamp) + " milliseconds"
)
}
}

View File

@ -5,7 +5,7 @@ import org.enso.polyglot.runtime.Runtime.Api
import scala.collection.mutable
/** Represents executable piece of enso program.
/** Represents executable piece of Enso program.
*
* @param contextId an identifier of a context to execute
* @param stack a call stack that must be executed

View File

@ -1,53 +0,0 @@
package org.enso.interpreter.instrument.execution
import org.enso.interpreter.instrument.job.Job
import java.util.UUID
import scala.annotation.varargs
/** Controls running jobs.
*/
trait JobControlPlane {
/** Aborts all interruptible jobs. */
def abortAllJobs(): Unit
/** Abort all jobs except the ignored jobs.
*
* @param ignoredJobs the list of jobs to keep in the execution queue
*/
@varargs
def abortAllExcept(ignoredJobs: Class[_ <: Job[_]]*): Unit
/** Aborts all jobs that relates to the specified execution context.
*
* @param contextId an identifier of a context
*/
def abortJobs(contextId: UUID): Unit
/** Abort provided background jobs.
*
* @param toAbort the list of jobs to abort
*/
@varargs
def abortBackgroundJobs(toAbort: Class[_ <: Job[_]]*): Unit
/** Starts background jobs processing.
*
* @return `true` if the background jobs were started and `false` if they are
* already running.
*/
def startBackgroundJobs(): Boolean
/** Stops background jobs processing.
*
* @return `true` if the call stopped background job, `false` if they are
* already stopped.
*/
def stopBackgroundJobs(): Boolean
/** Finds the first in-progress job satisfying the `filter` condition
*/
def jobInProgress[T](filter: PartialFunction[Job[_], Option[T]]): Option[T]
}

View File

@ -46,6 +46,14 @@ final class JobExecutionEngine(
val jobExecutor: ExecutorService =
context.newFixedThreadPool(jobParallelism, "job-pool", false)
val highPriorityJobExecutor: ExecutorService =
context.newCachedThreadPool(
"prioritized-job-pool",
2,
Integer.MAX_VALUE,
false
)
private val backgroundJobExecutor: ExecutorService =
context.newFixedThreadPool(1, "background-job-pool", false)
@ -84,7 +92,9 @@ final class JobExecutionEngine(
/** @inheritdoc */
override def run[A](job: Job[A]): Future[A] = {
cancelDuplicateJobs(job, runningJobsRef)
runInternal(job, jobExecutor, runningJobsRef)
val executor =
if (job.highPriority) highPriorityJobExecutor else jobExecutor
runInternal(job, executor, runningJobsRef)
}
private def cancelDuplicateJobs[A](
@ -116,19 +126,21 @@ final class JobExecutionEngine(
val jobId = UUID.randomUUID()
val promise = Promise[A]()
val logger = runtimeContext.executionService.getLogger
logger.log(Level.FINE, s"Submitting job: $job...")
logger.log(Level.FINE, s"Submitting job: {0}...", job)
val future = executorService.submit(() => {
logger.log(Level.FINE, s"Executing job: $job...")
logger.log(Level.FINE, s"Executing job: {0}...", job)
val before = System.currentTimeMillis()
try {
val result = job.run(runtimeContext)
val took = System.currentTimeMillis() - before
logger.log(Level.FINE, s"Job $job finished in $took ms.")
logger.log(Level.FINE, s"Job {0} finished in {1} ms.", Array(job, took))
promise.success(result)
} catch {
case NonFatal(ex) =>
logger.log(Level.SEVERE, s"Error executing $job", ex)
promise.failure(ex)
case err: InterruptedException =>
logger.log(Level.WARNING, s"$job got interrupted", err)
case err: Throwable =>
logger.log(Level.SEVERE, s"Error executing $job", err)
throw err
@ -138,7 +150,8 @@ final class JobExecutionEngine(
})
val runningJob = RunningJob(jobId, job, future)
runningJobsRef.updateAndGet(_ :+ runningJob)
val queue = runningJobsRef.updateAndGet(_ :+ runningJob)
logger.log(Level.FINE, "Number of pending jobs: {}", queue.size)
promise.future
}
@ -163,11 +176,33 @@ final class JobExecutionEngine(
}
/** @inheritdoc */
override def abortJobs(contextId: UUID): Unit = {
override def abortJobs(
contextId: UUID,
toAbort: Class[_ <: Job[_]]*
): Unit = {
val allJobs = runningJobsRef.get()
val contextJobs = allJobs.filter(_.job.contextIds.contains(contextId))
contextJobs.foreach { runningJob =>
if (runningJob.job.isCancellable) {
if (
runningJob.job.isCancellable && (toAbort.isEmpty || toAbort
.contains(runningJob.getClass))
) {
runningJob.future.cancel(runningJob.job.mayInterruptIfRunning)
}
}
runtimeContext.executionService.getContext.getThreadManager
.interruptThreads()
}
/** @inheritdoc */
override def abortJobs(
contextId: UUID,
accept: java.util.function.Function[Job[_], java.lang.Boolean]
): Unit = {
val allJobs = runningJobsRef.get()
val contextJobs = allJobs.filter(_.job.contextIds.contains(contextId))
contextJobs.foreach { runningJob =>
if (runningJob.job.isCancellable && accept.apply(runningJob.job)) {
runningJob.future.cancel(runningJob.job.mayInterruptIfRunning)
}
}

View File

@ -290,35 +290,38 @@ final class EnsureCompiledJob(
ctx: RuntimeContext,
logger: TruffleLogger
): Option[Changeset[Rope]] = {
val fileLockTimestamp = ctx.locking.acquireFileLock(file)
val pendingEditsLockTimestamp = ctx.locking.acquirePendingEditsLock()
val fileLockTimestamp = ctx.locking.acquireFileLock(file)
try {
val pendingEdits = ctx.state.pendingEdits.dequeue(file)
val edits = pendingEdits.map(_.edit)
val shouldExecute =
pendingEdits.isEmpty || pendingEdits.exists(_.execute)
val module = ctx.executionService.getContext
.getModuleForFile(file)
.orElseThrow(() => new ModuleNotFoundForFileException(file))
val changesetBuilder = new ChangesetBuilder(
module.getLiteralSource,
module.getIr
)
val changeset = changesetBuilder.build(pendingEdits)
ctx.executionService.modifyModuleSources(
module,
edits,
changeset.simpleUpdate.orNull,
logger
)
Option.when(shouldExecute)(changeset)
val pendingEditsLockTimestamp = ctx.locking.acquirePendingEditsLock()
try {
val pendingEdits = ctx.state.pendingEdits.dequeue(file)
val edits = pendingEdits.map(_.edit)
val shouldExecute =
pendingEdits.isEmpty || pendingEdits.exists(_.execute)
val module = ctx.executionService.getContext
.getModuleForFile(file)
.orElseThrow(() => new ModuleNotFoundForFileException(file))
val changesetBuilder = new ChangesetBuilder(
module.getLiteralSource,
module.getIr
)
val changeset = changesetBuilder.build(pendingEdits)
ctx.executionService.modifyModuleSources(
module,
edits,
changeset.simpleUpdate.orNull,
logger
)
Option.when(shouldExecute)(changeset)
} finally {
ctx.locking.releasePendingEditsLock()
logger.log(
Level.FINEST,
"Kept pending edits lock [EnsureCompiledJob] for {} milliseconds",
System.currentTimeMillis() - pendingEditsLockTimestamp
)
}
} finally {
ctx.locking.releasePendingEditsLock()
logger.log(
Level.FINEST,
"Kept pending edits lock [EnsureCompiledJob] for {} milliseconds",
System.currentTimeMillis() - pendingEditsLockTimestamp
)
ctx.locking.releaseFileLock(file)
logger.log(
Level.FINEST,

View File

@ -5,7 +5,6 @@ import org.enso.interpreter.instrument.InstrumentFrame
import org.enso.interpreter.instrument.execution.{Executable, RuntimeContext}
import org.enso.interpreter.runtime.state.ExecutionEnvironment
import org.enso.polyglot.runtime.Runtime.Api
import org.enso.polyglot.runtime.Runtime.Api.ExecutionResult
import java.util.logging.Level
@ -18,71 +17,79 @@ import java.util.logging.Level
class ExecuteJob(
contextId: UUID,
stack: List[InstrumentFrame],
val executionEnvironment: Option[Api.ExecutionEnvironment]
val executionEnvironment: Option[Api.ExecutionEnvironment],
val visualizationTriggered: Boolean = false
) extends Job[Unit](
List(contextId),
isCancellable = true,
// TODO[MK]: make this interruptible when https://github.com/oracle/graal/issues/3590
// is resolved
mayInterruptIfRunning = false
// Interruptions may turn out to be problematic in enterprise edition of GraalVM
// until https://github.com/oracle/graal/issues/3590 is resolved
mayInterruptIfRunning = true
) {
/** @inheritdoc */
override def run(implicit ctx: RuntimeContext): Unit = {
val logger = ctx.executionService.getLogger
val acquiredLock = ctx.locking.acquireContextLock(contextId)
val readLockTimestamp = ctx.locking.acquireReadCompilationLock()
val context = ctx.executionService.getContext
val originalExecutionEnvironment =
executionEnvironment.map(_ => context.getExecutionEnvironment)
val logger = ctx.executionService.getLogger
val acquiredLock = ctx.locking.acquireContextLock(contextId)
try {
executionEnvironment.foreach(env =>
context.setExecutionEnvironment(ExecutionEnvironment.forName(env.name))
)
val outcome =
try ProgramExecutionSupport.runProgram(contextId, stack)
finally {
originalExecutionEnvironment.foreach(context.setExecutionEnvironment)
}
outcome match {
case Some(diagnostic: Api.ExecutionResult.Diagnostic) =>
if (diagnostic.isError) {
ctx.endpoint.sendToClient(
Api.Response(Api.ExecutionFailed(contextId, diagnostic))
val readLockTimestamp = ctx.locking.acquireReadCompilationLock()
try {
val context = ctx.executionService.getContext
val originalExecutionEnvironment =
executionEnvironment.map(_ => context.getExecutionEnvironment)
executionEnvironment.foreach(env =>
context.setExecutionEnvironment(
ExecutionEnvironment.forName(env.name)
)
)
val outcome =
try ProgramExecutionSupport.runProgram(contextId, stack)
finally {
originalExecutionEnvironment.foreach(
context.setExecutionEnvironment
)
} else {
}
outcome match {
case Some(diagnostic: Api.ExecutionResult.Diagnostic) =>
if (diagnostic.isError) {
ctx.endpoint.sendToClient(
Api.Response(Api.ExecutionFailed(contextId, diagnostic))
)
} else {
ctx.endpoint.sendToClient(
Api.Response(Api.ExecutionUpdate(contextId, Seq(diagnostic)))
)
ctx.endpoint.sendToClient(
Api.Response(Api.ExecutionComplete(contextId))
)
}
case Some(failure: Api.ExecutionResult.Failure) =>
ctx.endpoint.sendToClient(
Api.Response(Api.ExecutionUpdate(contextId, Seq(diagnostic)))
Api.Response(Api.ExecutionFailed(contextId, failure))
)
case None =>
ctx.endpoint.sendToClient(
Api.Response(Api.ExecutionComplete(contextId))
)
}
case Some(failure: Api.ExecutionResult.Failure) =>
}
} catch {
case t: Throwable =>
ctx.endpoint.sendToClient(
Api.Response(Api.ExecutionFailed(contextId, failure))
)
case None =>
ctx.endpoint.sendToClient(
Api.Response(Api.ExecutionComplete(contextId))
)
}
} catch {
case t: Throwable =>
ctx.endpoint.sendToClient(
Api.Response(
Api.ExecutionFailed(
contextId,
ExecutionResult.Failure(t.getMessage, None)
Api.Response(
Api.ExecutionFailed(
contextId,
Api.ExecutionResult.Failure(t.getMessage, None)
)
)
)
} finally {
ctx.locking.releaseReadCompilationLock()
logger.log(
Level.FINEST,
s"Kept read compilation lock [ExecuteJob] for ${System.currentTimeMillis() - readLockTimestamp} milliseconds"
)
}
} finally {
ctx.locking.releaseReadCompilationLock()
logger.log(
Level.FINEST,
s"Kept read compilation lock [ExecuteJob] for ${System.currentTimeMillis() - readLockTimestamp} milliseconds"
)
ctx.locking.releaseContextLock(contextId)
logger.log(
Level.FINEST,
@ -102,10 +109,19 @@ object ExecuteJob {
/** Create execute job from the executable.
*
* @param executable the executable to run
* @param visualizationTriggered true if execution is triggered by a visualization request, false otherwise
* @return the new execute job
*/
def apply(executable: Executable): ExecuteJob =
new ExecuteJob(executable.contextId, executable.stack.toList, None)
def apply(
executable: Executable,
visualizationTriggered: Boolean = false
): ExecuteJob =
new ExecuteJob(
executable.contextId,
executable.stack.toList,
None,
visualizationTriggered
)
/** Create execute job from the context and stack.
*

View File

@ -15,9 +15,18 @@ import org.enso.interpreter.instrument.execution.RuntimeContext
abstract class Job[+A](
val contextIds: List[UUID],
val isCancellable: Boolean,
val mayInterruptIfRunning: Boolean
val mayInterruptIfRunning: Boolean,
val highPriority: Boolean
) {
def this(
contextIds: List[UUID],
isCancellable: Boolean,
mayInterruptIfRunning: Boolean
) = {
this(contextIds, isCancellable, mayInterruptIfRunning, false)
}
/** Executes a job.
*
* @param ctx contains suppliers of services to perform a request

View File

@ -244,10 +244,10 @@ object ProgramExecutionSupport {
}
val (explicitCallOpt, localCalls) = unwind(stack, Nil, Nil)
val executionResult = for {
val executionResult: Either[Option[Api.ExecutionResult], Unit] = for {
stackItem <- Either.fromOption(
explicitCallOpt,
Api.ExecutionResult.Failure("Execution stack is empty.", None)
Some(Api.ExecutionResult.Failure("Execution stack is empty.", None))
)
_ <-
Either
@ -255,7 +255,7 @@ object ProgramExecutionSupport {
.leftMap(onExecutionError(stackItem.item, _))
} yield ()
logger.log(Level.FINEST, s"Execution finished: $executionResult")
executionResult.fold(Some(_), _ => None)
executionResult.fold(identity, _ => None)
}
/** Execution error handler.
@ -267,24 +267,24 @@ object ProgramExecutionSupport {
private def onExecutionError(
item: ExecutionItem,
error: Throwable
)(implicit ctx: RuntimeContext): Api.ExecutionResult = {
)(implicit ctx: RuntimeContext): Option[Api.ExecutionResult] = {
val itemName = item match {
case ExecutionItem.Method(_, _, function) => function
case ExecutionItem.CallData(_, call) => call.getFunction.getName
}
val executionUpdate = getExecutionOutcome(error)
val reason = VisualizationResult.findExceptionMessage(error)
def onFailure() = error match {
def onFailure(): Option[Api.ExecutionResult] = error match {
case _: ThreadInterruptedException =>
val message = s"Execution of function $itemName interrupted."
ctx.executionService.getLogger.log(Level.FINE, message)
ExecutionResult.Diagnostic.warning(message, None)
None
case _ =>
val message = s"Execution of function $itemName failed ($reason)."
ctx.executionService.getLogger.log(Level.WARNING, message, error)
ExecutionResult.Failure(message, None)
Some(ExecutionResult.Failure(message, None))
}
executionUpdate.getOrElse(onFailure())
executionUpdate.orElse(onFailure())
}
/** Convert the runtime exception to the corresponding API error messages.
@ -564,12 +564,13 @@ object ProgramExecutionSupport {
.getOrElse(expressionValue.getClass)
ctx.executionService.getLogger.log(
Level.WARNING,
"Execution of visualization [{0}] on value [{1}] of [{2}] failed. {3}",
"Execution of visualization [{0}] on value [{1}] of [{2}] failed. {3} | {4}",
Array[Object](
visualizationId,
expressionId,
typeOfNode,
message,
expressionValue,
error
)
)

View File

@ -45,7 +45,8 @@ class UpsertVisualizationJob(
) extends Job[Option[Executable]](
List(config.executionContextId),
false,
false
false,
true
)
with UniqueJob[Option[Executable]] {
@ -62,68 +63,72 @@ class UpsertVisualizationJob(
implicit val logger: TruffleLogger = ctx.executionService.getLogger
val lockTimestamp =
ctx.locking.acquireContextLock(config.executionContextId)
val writeLockTimestamp = ctx.locking.acquireWriteCompilationLock()
try {
val maybeCallable =
UpsertVisualizationJob.evaluateVisualizationExpression(
config.visualizationModule,
config.expression
)
maybeCallable match {
case Left(ModuleNotFound(moduleName)) =>
ctx.endpoint.sendToClient(
Api.Response(Api.ModuleNotFound(moduleName))
val writeLockTimestamp = ctx.locking.acquireWriteCompilationLock()
try {
val maybeCallable =
UpsertVisualizationJob.evaluateVisualizationExpression(
config.visualizationModule,
config.expression
)
None
case Left(EvaluationFailed(message, result)) =>
replyWithExpressionFailedError(
config.executionContextId,
visualizationId,
expressionId,
message,
result
)
None
maybeCallable match {
case Left(ModuleNotFound(moduleName)) =>
ctx.endpoint.sendToClient(
Api.Response(Api.ModuleNotFound(moduleName))
)
None
case Right(EvaluationResult(module, callable, arguments)) =>
val visualization =
UpsertVisualizationJob.updateAttachedVisualization(
case Left(EvaluationFailed(message, result)) =>
replyWithExpressionFailedError(
config.executionContextId,
visualizationId,
expressionId,
module,
config,
callable,
arguments
message,
result
)
val stack = ctx.contextManager.getStack(config.executionContextId)
val cachedValue = stack.headOption
.flatMap(frame => Option(frame.cache.get(expressionId)))
UpsertVisualizationJob.requireVisualizationSynchronization(
stack,
expressionId
)
cachedValue match {
case Some(value) =>
ProgramExecutionSupport.executeAndSendVisualizationUpdate(
config.executionContextId,
stack.headOption.get.syncState,
visualization,
None
case Right(EvaluationResult(module, callable, arguments)) =>
val visualization =
UpsertVisualizationJob.updateAttachedVisualization(
visualizationId,
expressionId,
value
module,
config,
callable,
arguments
)
None
case None =>
Some(Executable(config.executionContextId, stack))
}
val stack = ctx.contextManager.getStack(config.executionContextId)
val cachedValue = stack.headOption
.flatMap(frame => Option(frame.cache.get(expressionId)))
UpsertVisualizationJob.requireVisualizationSynchronization(
stack,
expressionId
)
cachedValue match {
case Some(value) =>
ProgramExecutionSupport.executeAndSendVisualizationUpdate(
config.executionContextId,
stack.headOption.get.syncState,
visualization,
expressionId,
value
)
None
case None =>
Some(Executable(config.executionContextId, stack))
}
}
} finally {
ctx.locking.releaseWriteCompilationLock()
logger.log(
Level.FINEST,
s"Kept write compilation lock [UpsertVisualizationJob] for ${System
.currentTimeMillis() - writeLockTimestamp} milliseconds"
)
}
} finally {
ctx.locking.releaseWriteCompilationLock()
logger.log(
Level.FINEST,
s"Kept write compilation lock [UpsertVisualizationJob] for ${System.currentTimeMillis() - writeLockTimestamp} milliseconds"
)
ctx.locking.releaseContextLock(config.executionContextId)
logger.log(
Level.FINEST,

View File

@ -220,28 +220,24 @@ class RuntimeAsyncCommandsTest
context.send(
Api.Request(requestId, Api.InterruptContextRequest(contextId))
)
context.receiveNIgnoreExpressionUpdates(
3
) should contain theSameElementsAs Seq(
Api.Response(requestId, Api.InterruptContextResponse(contextId)),
Api.Response(
Api.ExecutionUpdate(
contextId,
Seq(
Api.ExecutionResult.Diagnostic(
Api.DiagnosticType.Warning,
Some("Execution of function main interrupted."),
None,
None,
None,
Vector()
)
)
)
),
Api.Response(
Api.ExecutionComplete(contextId)
)
val responses = context.receiveNIgnoreExpressionUpdates(
2
)
responses.length shouldEqual 2
responses should contain(
Api.Response(requestId, Api.InterruptContextResponse(contextId))
)
val failures = responses.filter(_.payload.isInstanceOf[Api.ExecutionFailed])
failures.length shouldEqual 1
val failure = failures.head.payload.asInstanceOf[Api.ExecutionFailed]
failure.contextId shouldEqual contextId
failure.result shouldBe a[Api.ExecutionResult.Diagnostic]
val diagnostic = failure.result.asInstanceOf[Api.ExecutionResult.Diagnostic]
diagnostic.kind shouldEqual Api.DiagnosticType.Error
diagnostic.message shouldEqual Some("sleep interrupted")
diagnostic.stack should not be empty
}
}

View File

@ -730,6 +730,10 @@ public final class EnsoContext {
return threadExecutors.newCachedThreadPool(name, systemThreads);
}
public ExecutorService newCachedThreadPool(String name, int min, int max, boolean systemThreads) {
return threadExecutors.newCachedThreadPool(name, systemThreads, min, max);
}
/**
* @param parallel amount of parallelism for the pool
* @param name human-readable name of the pool

View File

@ -14,8 +14,8 @@ import org.slf4j.LoggerFactory;
/**
* Host class loader that serves as a replacement for {@link
* com.oracle.truffle.host.HostClassLoader}. Add URLs to Jar archives with {@link #add(URL)}. All
* the classes that are loded via this class loader are first searched inside those archives. If not
* found, delegates to parent class loaders.
* the classes that are loaded via this class loader are first searched inside those archives. If
* not found, delegates to parent class loaders.
*/
public class HostClassLoader extends URLClassLoader {

View File

@ -3,9 +3,7 @@ package org.enso.interpreter.runtime;
import java.util.Collections;
import java.util.Map;
import java.util.WeakHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.*;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Level;
@ -20,13 +18,28 @@ final class ThreadExecutors {
this.context = context;
}
final ExecutorService newCachedThreadPool(String name, boolean systemThread) {
ExecutorService newCachedThreadPool(String name, boolean systemThread) {
var s = Executors.newCachedThreadPool(new Factory(name, systemThread));
pools.put(s, name);
return s;
}
final ExecutorService newFixedThreadPool(int cnt, String name, boolean systemThread) {
ExecutorService newCachedThreadPool(String name, boolean systemThread, int min, int max) {
assert min >= 0;
assert max <= Integer.MAX_VALUE;
var s =
new ThreadPoolExecutor(
min,
max,
60L,
TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),
new Factory(name, systemThread));
pools.put(s, name);
return s;
}
ExecutorService newFixedThreadPool(int cnt, String name, boolean systemThread) {
var s = Executors.newFixedThreadPool(cnt, new Factory(name, systemThread));
pools.put(s, name);
return s;