From c22d7422e69e6b1864c1baf36c2dc81180c958f0 Mon Sep 17 00:00:00 2001 From: Hubert Plociniczak Date: Thu, 21 Mar 2024 15:06:48 +0100 Subject: [PATCH] Speedup visualization and widget responses (#9371) `ExecuteJob` can now be interrupted. We now have a separate threadpool for visualization-related jobs. # Important Notes In a lock step situation, a job or command could have been interrupted while waiting for one of the locks. As locks ensured only that they were released once all of them have been acquired this could leave engine in a broken state. Once `ExecuteJob` could be interrupted this became a blocker as it prevented project startup almost in every case. The change also makes it careful to avoid constant `ExecuteJob` restarts. Addresses #9278. There will be follow up work. --- .../command/ExecuteExpressionCommand.java | 14 ++- .../SetExecutionEnvironmentCommand.java | 35 +++--- .../instrument/execution/JobControlPlane.java | 63 ++++++++++ .../instrument/job/ExecuteExpressionJob.java | 7 +- .../instrument/WarningPreview.scala | 4 +- .../instrument/command/CloseFileCmd.scala | 38 +++--- .../instrument/command/EditFileCmd.scala | 45 ++++--- .../command/ModifyVisualizationCmd.scala | 4 +- .../instrument/command/OpenFileCmd.scala | 30 ++--- .../command/SetExpressionValueCmd.scala | 42 ++++--- .../instrument/execution/Executable.scala | 2 +- .../execution/JobControlPlane.scala | 53 -------- .../execution/JobExecutionEngine.scala | 49 ++++++-- .../instrument/job/EnsureCompiledJob.scala | 57 ++++----- .../instrument/job/ExecuteJob.scala | 114 ++++++++++-------- .../enso/interpreter/instrument/job/Job.scala | 11 +- .../job/ProgramExecutionSupport.scala | 19 +-- .../job/UpsertVisualizationJob.scala | 109 +++++++++-------- .../instrument/RuntimeAsyncCommandsTest.scala | 40 +++--- .../enso/interpreter/runtime/EnsoContext.java | 4 + .../interpreter/runtime/HostClassLoader.java | 4 +- .../interpreter/runtime/ThreadExecutors.java | 23 +++- 22 files changed, 444 insertions(+), 323 deletions(-) create mode 100644 engine/runtime-instrument-common/src/main/java/org/enso/interpreter/instrument/execution/JobControlPlane.java delete mode 100644 engine/runtime-instrument-common/src/main/scala/org/enso/interpreter/instrument/execution/JobControlPlane.scala diff --git a/engine/runtime-instrument-common/src/main/java/org/enso/interpreter/instrument/command/ExecuteExpressionCommand.java b/engine/runtime-instrument-common/src/main/java/org/enso/interpreter/instrument/command/ExecuteExpressionCommand.java index 632b48c37b..107bbc6ea3 100644 --- a/engine/runtime-instrument-common/src/main/java/org/enso/interpreter/instrument/command/ExecuteExpressionCommand.java +++ b/engine/runtime-instrument-common/src/main/java/org/enso/interpreter/instrument/command/ExecuteExpressionCommand.java @@ -40,13 +40,23 @@ public final class ExecuteExpressionCommand extends SynchronousCommand { } @Override + @SuppressWarnings("unchecked") public void executeSynchronously(RuntimeContext ctx, ExecutionContext ec) { if (ctx.contextManager().contains(contextId)) { reply(new Runtime$Api$VisualizationAttached(), ctx); - ctx.jobControlPlane().abortJobs(contextId); + ctx.jobControlPlane() + .abortJobs( + contextId, + job -> { + if (job instanceof ExecuteJob e) { + return e.visualizationTriggered(); + } else { + return job instanceof ExecuteExpressionJob; + } + }); ctx.jobProcessor() .run(new ExecuteExpressionJob(contextId, visualizationId, expressionId, expression)) - .flatMap(executable -> ctx.jobProcessor().run(ExecuteJob.apply(executable)), ec); + .flatMap(executable -> ctx.jobProcessor().run(ExecuteJob.apply(executable, true)), ec); } else { reply(new Runtime$Api$ContextNotExistError(contextId), ctx); } diff --git a/engine/runtime-instrument-common/src/main/java/org/enso/interpreter/instrument/command/SetExecutionEnvironmentCommand.java b/engine/runtime-instrument-common/src/main/java/org/enso/interpreter/instrument/command/SetExecutionEnvironmentCommand.java index 04877fc530..73412ff20a 100644 --- a/engine/runtime-instrument-common/src/main/java/org/enso/interpreter/instrument/command/SetExecutionEnvironmentCommand.java +++ b/engine/runtime-instrument-common/src/main/java/org/enso/interpreter/instrument/command/SetExecutionEnvironmentCommand.java @@ -38,28 +38,31 @@ public class SetExecutionEnvironmentCommand extends AsynchronousCommand { ec); } + @SuppressWarnings("unchecked") private void setExecutionEnvironment( Runtime$Api$ExecutionEnvironment executionEnvironment, UUID contextId, RuntimeContext ctx) { var logger = ctx.executionService().getLogger(); var contextLockTimestamp = ctx.locking().acquireContextLock(contextId); - var writeLockTimestamp = ctx.locking().acquireWriteCompilationLock(); - try { - Stack stack = ctx.contextManager().getStack(contextId); - ctx.jobControlPlane().abortJobs(contextId); - ctx.executionService() - .getContext() - .setExecutionEnvironment(ExecutionEnvironment.forName(executionEnvironment.name())); - CacheInvalidation.invalidateAll(stack); - ctx.jobProcessor().run(ExecuteJob.apply(contextId, stack.toList())); - reply(new Runtime$Api$SetExecutionEnvironmentResponse(contextId), ctx); + var writeLockTimestamp = ctx.locking().acquireWriteCompilationLock(); + try { + Stack stack = ctx.contextManager().getStack(contextId); + ctx.jobControlPlane().abortJobs(contextId); + ctx.executionService() + .getContext() + .setExecutionEnvironment(ExecutionEnvironment.forName(executionEnvironment.name())); + CacheInvalidation.invalidateAll(stack); + ctx.jobProcessor().run(ExecuteJob.apply(contextId, stack.toList())); + reply(new Runtime$Api$SetExecutionEnvironmentResponse(contextId), ctx); + } finally { + ctx.locking().releaseWriteCompilationLock(); + logger.log( + Level.FINEST, + "Kept write compilation lock [SetExecutionEnvironmentCommand] for " + + (System.currentTimeMillis() - writeLockTimestamp) + + " milliseconds"); + } } finally { - ctx.locking().releaseWriteCompilationLock(); - logger.log( - Level.FINEST, - "Kept write compilation lock [SetExecutionEnvironmentCommand] for " - + (System.currentTimeMillis() - writeLockTimestamp) - + " milliseconds"); ctx.locking().releaseContextLock(contextId); logger.log( Level.FINEST, diff --git a/engine/runtime-instrument-common/src/main/java/org/enso/interpreter/instrument/execution/JobControlPlane.java b/engine/runtime-instrument-common/src/main/java/org/enso/interpreter/instrument/execution/JobControlPlane.java new file mode 100644 index 0000000000..d28e68d869 --- /dev/null +++ b/engine/runtime-instrument-common/src/main/java/org/enso/interpreter/instrument/execution/JobControlPlane.java @@ -0,0 +1,63 @@ +package org.enso.interpreter.instrument.execution; + +import java.util.UUID; +import org.enso.interpreter.instrument.job.Job; + +/** Controls running jobs. */ +public interface JobControlPlane { + + /** Aborts all interruptible jobs. */ + void abortAllJobs(); + + /** + * Abort all jobs except the ignored jobs. + * + * @param ignoredJobs the list of jobs to keep in the execution queue + */ + @SuppressWarnings("unchecked") + void abortAllExcept(Class>... ignoredJobs); + + /** + * Aborts jobs that relates to the specified execution context. + * + * @param contextId an identifier of a context + * @param classOf abort jobs of a given class only. If empty all jobs for the given context are + * aborted + */ + @SuppressWarnings("unchecked") + void abortJobs(UUID contextId, Class>... classOf); + + /** + * Aborts jobs that relate to the specified execution context. + * + * @param contextId an identifier of a context + * @param accept filter that selects jobs to be aborted + */ + @SuppressWarnings("unchecked") + void abortJobs(UUID contextId, java.util.function.Function, Boolean> accept); + + /** + * Abort provided background jobs. + * + * @param toAbort the list of jobs to abort + */ + @SuppressWarnings("unchecked") + void abortBackgroundJobs(Class>... toAbort); + + /** + * Starts background jobs processing. + * + * @return `true` if the background jobs were started and `false` if they are already running. + */ + boolean startBackgroundJobs(); + + /** + * Stops background jobs processing. + * + * @return `true` if the call stopped background job, `false` if they are already stopped. + */ + boolean stopBackgroundJobs(); + + /** Finds the first in-progress job satisfying the `filter` condition */ + scala.Option jobInProgress(scala.PartialFunction, scala.Option> filter); +} diff --git a/engine/runtime-instrument-common/src/main/java/org/enso/interpreter/instrument/job/ExecuteExpressionJob.java b/engine/runtime-instrument-common/src/main/java/org/enso/interpreter/instrument/job/ExecuteExpressionJob.java index c86b581b77..b92d571a03 100644 --- a/engine/runtime-instrument-common/src/main/java/org/enso/interpreter/instrument/job/ExecuteExpressionJob.java +++ b/engine/runtime-instrument-common/src/main/java/org/enso/interpreter/instrument/job/ExecuteExpressionJob.java @@ -26,7 +26,7 @@ public class ExecuteExpressionJob extends Job implements UniqueJob implements UniqueJob that) { - return that instanceof ExecuteExpressionJob; + if (that instanceof ExecuteExpressionJob job) { + return contextId == job.contextId && expressionId == job.expressionId; + } + return false; } } diff --git a/engine/runtime-instrument-common/src/main/scala/org/enso/interpreter/instrument/WarningPreview.scala b/engine/runtime-instrument-common/src/main/scala/org/enso/interpreter/instrument/WarningPreview.scala index e35d2bdce0..7e28390732 100644 --- a/engine/runtime-instrument-common/src/main/scala/org/enso/interpreter/instrument/WarningPreview.scala +++ b/engine/runtime-instrument-common/src/main/scala/org/enso/interpreter/instrument/WarningPreview.scala @@ -8,7 +8,7 @@ import java.nio.charset.StandardCharsets object WarningPreview { - private val method = ".to_display_text" + private[this] val METHOD = ".to_display_text" /** Execute preview of the provided warning value. * @@ -20,7 +20,7 @@ object WarningPreview { val visualizationExpression = ctx.executionService.evaluateExpression( ctx.executionService.getContext.getBuiltins.getModule, - method + METHOD ) val visualizationResult = ctx.executionService.callFunction( visualizationExpression, diff --git a/engine/runtime-instrument-common/src/main/scala/org/enso/interpreter/instrument/command/CloseFileCmd.scala b/engine/runtime-instrument-common/src/main/scala/org/enso/interpreter/instrument/command/CloseFileCmd.scala index 01efd47738..973b320feb 100644 --- a/engine/runtime-instrument-common/src/main/scala/org/enso/interpreter/instrument/command/CloseFileCmd.scala +++ b/engine/runtime-instrument-common/src/main/scala/org/enso/interpreter/instrument/command/CloseFileCmd.scala @@ -17,24 +17,30 @@ class CloseFileCmd(request: Api.CloseFileNotification) ctx: RuntimeContext, ec: ExecutionContext ): Unit = { - val logger = ctx.executionService.getLogger - val readLockTimestamp = ctx.locking.acquireReadCompilationLock() - val fileLockTimestamp = ctx.locking.acquireFileLock(request.path) - val pendingEditsLockTimestamp = ctx.locking.acquirePendingEditsLock() + val logger = ctx.executionService.getLogger + val readLockTimestamp = ctx.locking.acquireReadCompilationLock() try { - ctx.state.pendingEdits.dequeue(request.path) - ctx.executionService.resetModuleSources(request.path) + val fileLockTimestamp = ctx.locking.acquireFileLock(request.path) + try { + val pendingEditsLockTimestamp = ctx.locking.acquirePendingEditsLock() + try { + ctx.state.pendingEdits.dequeue(request.path) + ctx.executionService.resetModuleSources(request.path) + } finally { + ctx.locking.releasePendingEditsLock() + logger.log( + Level.FINEST, + "Kept pending edits lock [CloseFileCmd] for " + (System.currentTimeMillis - pendingEditsLockTimestamp) + " milliseconds" + ) + } + } finally { + ctx.locking.releaseFileLock(request.path) + logger.log( + Level.FINEST, + "Kept file lock [CloseFileCmd] for " + (System.currentTimeMillis - fileLockTimestamp) + " milliseconds" + ) + } } finally { - ctx.locking.releasePendingEditsLock() - logger.log( - Level.FINEST, - "Kept pending edits lock [CloseFileCmd] for " + (System.currentTimeMillis - pendingEditsLockTimestamp) + " milliseconds" - ) - ctx.locking.releaseFileLock(request.path) - logger.log( - Level.FINEST, - "Kept file lock [CloseFileCmd] for " + (System.currentTimeMillis - fileLockTimestamp) + " milliseconds" - ) ctx.locking.releaseReadCompilationLock() logger.log( Level.FINEST, diff --git a/engine/runtime-instrument-common/src/main/scala/org/enso/interpreter/instrument/command/EditFileCmd.scala b/engine/runtime-instrument-common/src/main/scala/org/enso/interpreter/instrument/command/EditFileCmd.scala index 553ac7efe0..90a6f7acce 100644 --- a/engine/runtime-instrument-common/src/main/scala/org/enso/interpreter/instrument/command/EditFileCmd.scala +++ b/engine/runtime-instrument-common/src/main/scala/org/enso/interpreter/instrument/command/EditFileCmd.scala @@ -23,29 +23,34 @@ class EditFileCmd(request: Api.EditFileNotification) ctx: RuntimeContext, ec: ExecutionContext ): Unit = { - val logger = ctx.executionService.getLogger - val fileLockTimestamp = ctx.locking.acquireFileLock(request.path) - val pendingEditsLockTimestamp = ctx.locking.acquirePendingEditsLock() + val logger = ctx.executionService.getLogger + val fileLockTimestamp = ctx.locking.acquireFileLock(request.path) try { - logger.log( - Level.FINE, - "Adding pending file edits: {}", - request.edits.map(e => (e.range, e.text.length)) - ) - val edits = - request.edits.map(edit => PendingEdit.ApplyEdit(edit, request.execute)) - ctx.state.pendingEdits.enqueue(request.path, edits) - if (request.execute) { - ctx.jobControlPlane.abortAllJobs() - ctx.jobProcessor.run(new EnsureCompiledJob(Seq(request.path))) - executeJobs.foreach(ctx.jobProcessor.run) + val pendingEditsLockTimestamp = ctx.locking.acquirePendingEditsLock() + try { + logger.log( + Level.FINE, + "Adding pending file edits: {}", + request.edits.map(e => (e.range, e.text.length)) + ) + val edits = + request.edits.map(edit => + PendingEdit.ApplyEdit(edit, request.execute) + ) + ctx.state.pendingEdits.enqueue(request.path, edits) + if (request.execute) { + ctx.jobControlPlane.abortAllJobs() + ctx.jobProcessor.run(new EnsureCompiledJob(Seq(request.path))) + executeJobs.foreach(ctx.jobProcessor.run) + } + } finally { + ctx.locking.releasePendingEditsLock() + logger.log( + Level.FINEST, + "Kept pending edits lock [EditFileCmd] for " + (System.currentTimeMillis - pendingEditsLockTimestamp) + " milliseconds" + ) } } finally { - ctx.locking.releasePendingEditsLock() - logger.log( - Level.FINEST, - "Kept pending edits lock [EditFileCmd] for " + (System.currentTimeMillis - pendingEditsLockTimestamp) + " milliseconds" - ) ctx.locking.releaseFileLock(request.path) logger.log( Level.FINEST, diff --git a/engine/runtime-instrument-common/src/main/scala/org/enso/interpreter/instrument/command/ModifyVisualizationCmd.scala b/engine/runtime-instrument-common/src/main/scala/org/enso/interpreter/instrument/command/ModifyVisualizationCmd.scala index 8dc048b3ac..78502216d0 100644 --- a/engine/runtime-instrument-common/src/main/scala/org/enso/interpreter/instrument/command/ModifyVisualizationCmd.scala +++ b/engine/runtime-instrument-common/src/main/scala/org/enso/interpreter/instrument/command/ModifyVisualizationCmd.scala @@ -75,9 +75,7 @@ class ModifyVisualizationCmd( case Some(exec) => for { - _ <- Future { - ctx.jobProcessor.run(EnsureCompiledJob(exec.stack)) - } + _ <- ctx.jobProcessor.run(EnsureCompiledJob(exec.stack)) _ <- ctx.jobProcessor.run(ExecuteJob(exec)) } yield () } diff --git a/engine/runtime-instrument-common/src/main/scala/org/enso/interpreter/instrument/command/OpenFileCmd.scala b/engine/runtime-instrument-common/src/main/scala/org/enso/interpreter/instrument/command/OpenFileCmd.scala index 6d238af0b1..5348eadfb5 100644 --- a/engine/runtime-instrument-common/src/main/scala/org/enso/interpreter/instrument/command/OpenFileCmd.scala +++ b/engine/runtime-instrument-common/src/main/scala/org/enso/interpreter/instrument/command/OpenFileCmd.scala @@ -23,27 +23,29 @@ class OpenFileCmd( ): Unit = { val logger = ctx.executionService.getLogger val readLockTimestamp = ctx.locking.acquireReadCompilationLock() - val fileLockTimestamp = ctx.locking.acquireFileLock(request.path) try { - ctx.executionService.setModuleSources( - request.path, - request.contents - ) - ctx.endpoint.sendToClient( - Api.Response(maybeRequestId, Api.OpenFileResponse) - ) + val fileLockTimestamp = ctx.locking.acquireFileLock(request.path) + try { + ctx.executionService.setModuleSources( + request.path, + request.contents + ) + ctx.endpoint.sendToClient( + Api.Response(maybeRequestId, Api.OpenFileResponse) + ) + } finally { + ctx.locking.releaseFileLock(request.path) + logger.log( + Level.FINEST, + "Kept file lock [OpenFileCmd] for " + (System.currentTimeMillis - fileLockTimestamp) + " milliseconds" + ) + } } finally { - ctx.locking.releaseFileLock(request.path) - logger.log( - Level.FINEST, - "Kept file lock [OpenFileCmd] for " + (System.currentTimeMillis - fileLockTimestamp) + " milliseconds" - ) ctx.locking.releaseReadCompilationLock() logger.log( Level.FINEST, "Kept read compilation lock [OpenFileCmd] for " + (System.currentTimeMillis - readLockTimestamp) + " milliseconds" ) - } } } diff --git a/engine/runtime-instrument-common/src/main/scala/org/enso/interpreter/instrument/command/SetExpressionValueCmd.scala b/engine/runtime-instrument-common/src/main/scala/org/enso/interpreter/instrument/command/SetExpressionValueCmd.scala index 62ee7c8e7b..3df15f756f 100644 --- a/engine/runtime-instrument-common/src/main/scala/org/enso/interpreter/instrument/command/SetExpressionValueCmd.scala +++ b/engine/runtime-instrument-common/src/main/scala/org/enso/interpreter/instrument/command/SetExpressionValueCmd.scala @@ -23,35 +23,37 @@ class SetExpressionValueCmd(request: Api.SetExpressionValueNotification) ctx: RuntimeContext, ec: ExecutionContext ): Future[Unit] = { - val logger = ctx.executionService.getLogger - val fileLockTimestamp = ctx.locking.acquireFileLock(request.path) - val pendingEditsLockTimestamp = ctx.locking.acquirePendingEditsLock() + val logger = ctx.executionService.getLogger + val fileLockTimestamp = ctx.locking.acquireFileLock(request.path) try { - val pendingApplyEdits = - request.edits.map( - PendingEdit.SetExpressionValue( - _, - request.expressionId, - request.expressionValue + val pendingEditsLockTimestamp = ctx.locking.acquirePendingEditsLock() + try { + val pendingApplyEdits = + request.edits.map( + PendingEdit.SetExpressionValue( + _, + request.expressionId, + request.expressionValue + ) ) + ctx.state.pendingEdits.enqueue(request.path, pendingApplyEdits) + ctx.jobControlPlane.abortAllJobs() + ctx.jobProcessor.run(new EnsureCompiledJob(Seq(request.path))) + executeJobs.foreach(ctx.jobProcessor.run) + Future.successful(()) + } finally { + ctx.locking.releasePendingEditsLock() + logger.log( + Level.FINEST, + "Kept pending edits lock [SetExpressionValueCmd] for " + (System.currentTimeMillis - pendingEditsLockTimestamp) + " milliseconds" ) - ctx.state.pendingEdits.enqueue(request.path, pendingApplyEdits) - ctx.jobControlPlane.abortAllJobs() - ctx.jobProcessor.run(new EnsureCompiledJob(Seq(request.path))) - executeJobs.foreach(ctx.jobProcessor.run) - Future.successful(()) + } } finally { - ctx.locking.releasePendingEditsLock() - logger.log( - Level.FINEST, - "Kept pending edits lock [SetExpressionValueCmd] for " + (System.currentTimeMillis - pendingEditsLockTimestamp) + " milliseconds" - ) ctx.locking.releaseFileLock(request.path) logger.log( Level.FINEST, "Kept file lock [SetExpressionValueCmd] for " + (System.currentTimeMillis - fileLockTimestamp) + " milliseconds" ) - } } diff --git a/engine/runtime-instrument-common/src/main/scala/org/enso/interpreter/instrument/execution/Executable.scala b/engine/runtime-instrument-common/src/main/scala/org/enso/interpreter/instrument/execution/Executable.scala index f53461e2a3..7f3db8485f 100644 --- a/engine/runtime-instrument-common/src/main/scala/org/enso/interpreter/instrument/execution/Executable.scala +++ b/engine/runtime-instrument-common/src/main/scala/org/enso/interpreter/instrument/execution/Executable.scala @@ -5,7 +5,7 @@ import org.enso.polyglot.runtime.Runtime.Api import scala.collection.mutable -/** Represents executable piece of enso program. +/** Represents executable piece of Enso program. * * @param contextId an identifier of a context to execute * @param stack a call stack that must be executed diff --git a/engine/runtime-instrument-common/src/main/scala/org/enso/interpreter/instrument/execution/JobControlPlane.scala b/engine/runtime-instrument-common/src/main/scala/org/enso/interpreter/instrument/execution/JobControlPlane.scala deleted file mode 100644 index 73ecb9458b..0000000000 --- a/engine/runtime-instrument-common/src/main/scala/org/enso/interpreter/instrument/execution/JobControlPlane.scala +++ /dev/null @@ -1,53 +0,0 @@ -package org.enso.interpreter.instrument.execution - -import org.enso.interpreter.instrument.job.Job - -import java.util.UUID - -import scala.annotation.varargs - -/** Controls running jobs. - */ -trait JobControlPlane { - - /** Aborts all interruptible jobs. */ - def abortAllJobs(): Unit - - /** Abort all jobs except the ignored jobs. - * - * @param ignoredJobs the list of jobs to keep in the execution queue - */ - @varargs - def abortAllExcept(ignoredJobs: Class[_ <: Job[_]]*): Unit - - /** Aborts all jobs that relates to the specified execution context. - * - * @param contextId an identifier of a context - */ - def abortJobs(contextId: UUID): Unit - - /** Abort provided background jobs. - * - * @param toAbort the list of jobs to abort - */ - @varargs - def abortBackgroundJobs(toAbort: Class[_ <: Job[_]]*): Unit - - /** Starts background jobs processing. - * - * @return `true` if the background jobs were started and `false` if they are - * already running. - */ - def startBackgroundJobs(): Boolean - - /** Stops background jobs processing. - * - * @return `true` if the call stopped background job, `false` if they are - * already stopped. - */ - def stopBackgroundJobs(): Boolean - - /** Finds the first in-progress job satisfying the `filter` condition - */ - def jobInProgress[T](filter: PartialFunction[Job[_], Option[T]]): Option[T] -} diff --git a/engine/runtime-instrument-common/src/main/scala/org/enso/interpreter/instrument/execution/JobExecutionEngine.scala b/engine/runtime-instrument-common/src/main/scala/org/enso/interpreter/instrument/execution/JobExecutionEngine.scala index b1cc10c3b1..d979b47495 100644 --- a/engine/runtime-instrument-common/src/main/scala/org/enso/interpreter/instrument/execution/JobExecutionEngine.scala +++ b/engine/runtime-instrument-common/src/main/scala/org/enso/interpreter/instrument/execution/JobExecutionEngine.scala @@ -46,6 +46,14 @@ final class JobExecutionEngine( val jobExecutor: ExecutorService = context.newFixedThreadPool(jobParallelism, "job-pool", false) + val highPriorityJobExecutor: ExecutorService = + context.newCachedThreadPool( + "prioritized-job-pool", + 2, + Integer.MAX_VALUE, + false + ) + private val backgroundJobExecutor: ExecutorService = context.newFixedThreadPool(1, "background-job-pool", false) @@ -84,7 +92,9 @@ final class JobExecutionEngine( /** @inheritdoc */ override def run[A](job: Job[A]): Future[A] = { cancelDuplicateJobs(job, runningJobsRef) - runInternal(job, jobExecutor, runningJobsRef) + val executor = + if (job.highPriority) highPriorityJobExecutor else jobExecutor + runInternal(job, executor, runningJobsRef) } private def cancelDuplicateJobs[A]( @@ -116,19 +126,21 @@ final class JobExecutionEngine( val jobId = UUID.randomUUID() val promise = Promise[A]() val logger = runtimeContext.executionService.getLogger - logger.log(Level.FINE, s"Submitting job: $job...") + logger.log(Level.FINE, s"Submitting job: {0}...", job) val future = executorService.submit(() => { - logger.log(Level.FINE, s"Executing job: $job...") + logger.log(Level.FINE, s"Executing job: {0}...", job) val before = System.currentTimeMillis() try { val result = job.run(runtimeContext) val took = System.currentTimeMillis() - before - logger.log(Level.FINE, s"Job $job finished in $took ms.") + logger.log(Level.FINE, s"Job {0} finished in {1} ms.", Array(job, took)) promise.success(result) } catch { case NonFatal(ex) => logger.log(Level.SEVERE, s"Error executing $job", ex) promise.failure(ex) + case err: InterruptedException => + logger.log(Level.WARNING, s"$job got interrupted", err) case err: Throwable => logger.log(Level.SEVERE, s"Error executing $job", err) throw err @@ -138,7 +150,8 @@ final class JobExecutionEngine( }) val runningJob = RunningJob(jobId, job, future) - runningJobsRef.updateAndGet(_ :+ runningJob) + val queue = runningJobsRef.updateAndGet(_ :+ runningJob) + logger.log(Level.FINE, "Number of pending jobs: {}", queue.size) promise.future } @@ -163,11 +176,33 @@ final class JobExecutionEngine( } /** @inheritdoc */ - override def abortJobs(contextId: UUID): Unit = { + override def abortJobs( + contextId: UUID, + toAbort: Class[_ <: Job[_]]* + ): Unit = { val allJobs = runningJobsRef.get() val contextJobs = allJobs.filter(_.job.contextIds.contains(contextId)) contextJobs.foreach { runningJob => - if (runningJob.job.isCancellable) { + if ( + runningJob.job.isCancellable && (toAbort.isEmpty || toAbort + .contains(runningJob.getClass)) + ) { + runningJob.future.cancel(runningJob.job.mayInterruptIfRunning) + } + } + runtimeContext.executionService.getContext.getThreadManager + .interruptThreads() + } + + /** @inheritdoc */ + override def abortJobs( + contextId: UUID, + accept: java.util.function.Function[Job[_], java.lang.Boolean] + ): Unit = { + val allJobs = runningJobsRef.get() + val contextJobs = allJobs.filter(_.job.contextIds.contains(contextId)) + contextJobs.foreach { runningJob => + if (runningJob.job.isCancellable && accept.apply(runningJob.job)) { runningJob.future.cancel(runningJob.job.mayInterruptIfRunning) } } diff --git a/engine/runtime-instrument-common/src/main/scala/org/enso/interpreter/instrument/job/EnsureCompiledJob.scala b/engine/runtime-instrument-common/src/main/scala/org/enso/interpreter/instrument/job/EnsureCompiledJob.scala index 4613e58be5..e2d22ff32a 100644 --- a/engine/runtime-instrument-common/src/main/scala/org/enso/interpreter/instrument/job/EnsureCompiledJob.scala +++ b/engine/runtime-instrument-common/src/main/scala/org/enso/interpreter/instrument/job/EnsureCompiledJob.scala @@ -290,35 +290,38 @@ final class EnsureCompiledJob( ctx: RuntimeContext, logger: TruffleLogger ): Option[Changeset[Rope]] = { - val fileLockTimestamp = ctx.locking.acquireFileLock(file) - val pendingEditsLockTimestamp = ctx.locking.acquirePendingEditsLock() + val fileLockTimestamp = ctx.locking.acquireFileLock(file) try { - val pendingEdits = ctx.state.pendingEdits.dequeue(file) - val edits = pendingEdits.map(_.edit) - val shouldExecute = - pendingEdits.isEmpty || pendingEdits.exists(_.execute) - val module = ctx.executionService.getContext - .getModuleForFile(file) - .orElseThrow(() => new ModuleNotFoundForFileException(file)) - val changesetBuilder = new ChangesetBuilder( - module.getLiteralSource, - module.getIr - ) - val changeset = changesetBuilder.build(pendingEdits) - ctx.executionService.modifyModuleSources( - module, - edits, - changeset.simpleUpdate.orNull, - logger - ) - Option.when(shouldExecute)(changeset) + val pendingEditsLockTimestamp = ctx.locking.acquirePendingEditsLock() + try { + val pendingEdits = ctx.state.pendingEdits.dequeue(file) + val edits = pendingEdits.map(_.edit) + val shouldExecute = + pendingEdits.isEmpty || pendingEdits.exists(_.execute) + val module = ctx.executionService.getContext + .getModuleForFile(file) + .orElseThrow(() => new ModuleNotFoundForFileException(file)) + val changesetBuilder = new ChangesetBuilder( + module.getLiteralSource, + module.getIr + ) + val changeset = changesetBuilder.build(pendingEdits) + ctx.executionService.modifyModuleSources( + module, + edits, + changeset.simpleUpdate.orNull, + logger + ) + Option.when(shouldExecute)(changeset) + } finally { + ctx.locking.releasePendingEditsLock() + logger.log( + Level.FINEST, + "Kept pending edits lock [EnsureCompiledJob] for {} milliseconds", + System.currentTimeMillis() - pendingEditsLockTimestamp + ) + } } finally { - ctx.locking.releasePendingEditsLock() - logger.log( - Level.FINEST, - "Kept pending edits lock [EnsureCompiledJob] for {} milliseconds", - System.currentTimeMillis() - pendingEditsLockTimestamp - ) ctx.locking.releaseFileLock(file) logger.log( Level.FINEST, diff --git a/engine/runtime-instrument-common/src/main/scala/org/enso/interpreter/instrument/job/ExecuteJob.scala b/engine/runtime-instrument-common/src/main/scala/org/enso/interpreter/instrument/job/ExecuteJob.scala index 86bb4e7849..d80e1c3c56 100644 --- a/engine/runtime-instrument-common/src/main/scala/org/enso/interpreter/instrument/job/ExecuteJob.scala +++ b/engine/runtime-instrument-common/src/main/scala/org/enso/interpreter/instrument/job/ExecuteJob.scala @@ -5,7 +5,6 @@ import org.enso.interpreter.instrument.InstrumentFrame import org.enso.interpreter.instrument.execution.{Executable, RuntimeContext} import org.enso.interpreter.runtime.state.ExecutionEnvironment import org.enso.polyglot.runtime.Runtime.Api -import org.enso.polyglot.runtime.Runtime.Api.ExecutionResult import java.util.logging.Level @@ -18,71 +17,79 @@ import java.util.logging.Level class ExecuteJob( contextId: UUID, stack: List[InstrumentFrame], - val executionEnvironment: Option[Api.ExecutionEnvironment] + val executionEnvironment: Option[Api.ExecutionEnvironment], + val visualizationTriggered: Boolean = false ) extends Job[Unit]( List(contextId), isCancellable = true, - // TODO[MK]: make this interruptible when https://github.com/oracle/graal/issues/3590 - // is resolved - mayInterruptIfRunning = false + // Interruptions may turn out to be problematic in enterprise edition of GraalVM + // until https://github.com/oracle/graal/issues/3590 is resolved + mayInterruptIfRunning = true ) { /** @inheritdoc */ override def run(implicit ctx: RuntimeContext): Unit = { - val logger = ctx.executionService.getLogger - val acquiredLock = ctx.locking.acquireContextLock(contextId) - val readLockTimestamp = ctx.locking.acquireReadCompilationLock() - val context = ctx.executionService.getContext - val originalExecutionEnvironment = - executionEnvironment.map(_ => context.getExecutionEnvironment) + val logger = ctx.executionService.getLogger + val acquiredLock = ctx.locking.acquireContextLock(contextId) try { - executionEnvironment.foreach(env => - context.setExecutionEnvironment(ExecutionEnvironment.forName(env.name)) - ) - val outcome = - try ProgramExecutionSupport.runProgram(contextId, stack) - finally { - originalExecutionEnvironment.foreach(context.setExecutionEnvironment) - } - outcome match { - case Some(diagnostic: Api.ExecutionResult.Diagnostic) => - if (diagnostic.isError) { - ctx.endpoint.sendToClient( - Api.Response(Api.ExecutionFailed(contextId, diagnostic)) + val readLockTimestamp = ctx.locking.acquireReadCompilationLock() + try { + val context = ctx.executionService.getContext + val originalExecutionEnvironment = + executionEnvironment.map(_ => context.getExecutionEnvironment) + executionEnvironment.foreach(env => + context.setExecutionEnvironment( + ExecutionEnvironment.forName(env.name) + ) + ) + val outcome = + try ProgramExecutionSupport.runProgram(contextId, stack) + finally { + originalExecutionEnvironment.foreach( + context.setExecutionEnvironment ) - } else { + } + outcome match { + case Some(diagnostic: Api.ExecutionResult.Diagnostic) => + if (diagnostic.isError) { + ctx.endpoint.sendToClient( + Api.Response(Api.ExecutionFailed(contextId, diagnostic)) + ) + } else { + ctx.endpoint.sendToClient( + Api.Response(Api.ExecutionUpdate(contextId, Seq(diagnostic))) + ) + ctx.endpoint.sendToClient( + Api.Response(Api.ExecutionComplete(contextId)) + ) + } + case Some(failure: Api.ExecutionResult.Failure) => ctx.endpoint.sendToClient( - Api.Response(Api.ExecutionUpdate(contextId, Seq(diagnostic))) + Api.Response(Api.ExecutionFailed(contextId, failure)) ) + case None => ctx.endpoint.sendToClient( Api.Response(Api.ExecutionComplete(contextId)) ) - } - case Some(failure: Api.ExecutionResult.Failure) => + } + } catch { + case t: Throwable => ctx.endpoint.sendToClient( - Api.Response(Api.ExecutionFailed(contextId, failure)) - ) - case None => - ctx.endpoint.sendToClient( - Api.Response(Api.ExecutionComplete(contextId)) - ) - } - } catch { - case t: Throwable => - ctx.endpoint.sendToClient( - Api.Response( - Api.ExecutionFailed( - contextId, - ExecutionResult.Failure(t.getMessage, None) + Api.Response( + Api.ExecutionFailed( + contextId, + Api.ExecutionResult.Failure(t.getMessage, None) + ) ) ) + } finally { + ctx.locking.releaseReadCompilationLock() + logger.log( + Level.FINEST, + s"Kept read compilation lock [ExecuteJob] for ${System.currentTimeMillis() - readLockTimestamp} milliseconds" ) + } } finally { - ctx.locking.releaseReadCompilationLock() - logger.log( - Level.FINEST, - s"Kept read compilation lock [ExecuteJob] for ${System.currentTimeMillis() - readLockTimestamp} milliseconds" - ) ctx.locking.releaseContextLock(contextId) logger.log( Level.FINEST, @@ -102,10 +109,19 @@ object ExecuteJob { /** Create execute job from the executable. * * @param executable the executable to run + * @param visualizationTriggered true if execution is triggered by a visualization request, false otherwise * @return the new execute job */ - def apply(executable: Executable): ExecuteJob = - new ExecuteJob(executable.contextId, executable.stack.toList, None) + def apply( + executable: Executable, + visualizationTriggered: Boolean = false + ): ExecuteJob = + new ExecuteJob( + executable.contextId, + executable.stack.toList, + None, + visualizationTriggered + ) /** Create execute job from the context and stack. * diff --git a/engine/runtime-instrument-common/src/main/scala/org/enso/interpreter/instrument/job/Job.scala b/engine/runtime-instrument-common/src/main/scala/org/enso/interpreter/instrument/job/Job.scala index 6a154daef5..9ab0dd2ba5 100644 --- a/engine/runtime-instrument-common/src/main/scala/org/enso/interpreter/instrument/job/Job.scala +++ b/engine/runtime-instrument-common/src/main/scala/org/enso/interpreter/instrument/job/Job.scala @@ -15,9 +15,18 @@ import org.enso.interpreter.instrument.execution.RuntimeContext abstract class Job[+A]( val contextIds: List[UUID], val isCancellable: Boolean, - val mayInterruptIfRunning: Boolean + val mayInterruptIfRunning: Boolean, + val highPriority: Boolean ) { + def this( + contextIds: List[UUID], + isCancellable: Boolean, + mayInterruptIfRunning: Boolean + ) = { + this(contextIds, isCancellable, mayInterruptIfRunning, false) + } + /** Executes a job. * * @param ctx contains suppliers of services to perform a request diff --git a/engine/runtime-instrument-common/src/main/scala/org/enso/interpreter/instrument/job/ProgramExecutionSupport.scala b/engine/runtime-instrument-common/src/main/scala/org/enso/interpreter/instrument/job/ProgramExecutionSupport.scala index 070f65b93e..74899d2942 100644 --- a/engine/runtime-instrument-common/src/main/scala/org/enso/interpreter/instrument/job/ProgramExecutionSupport.scala +++ b/engine/runtime-instrument-common/src/main/scala/org/enso/interpreter/instrument/job/ProgramExecutionSupport.scala @@ -244,10 +244,10 @@ object ProgramExecutionSupport { } val (explicitCallOpt, localCalls) = unwind(stack, Nil, Nil) - val executionResult = for { + val executionResult: Either[Option[Api.ExecutionResult], Unit] = for { stackItem <- Either.fromOption( explicitCallOpt, - Api.ExecutionResult.Failure("Execution stack is empty.", None) + Some(Api.ExecutionResult.Failure("Execution stack is empty.", None)) ) _ <- Either @@ -255,7 +255,7 @@ object ProgramExecutionSupport { .leftMap(onExecutionError(stackItem.item, _)) } yield () logger.log(Level.FINEST, s"Execution finished: $executionResult") - executionResult.fold(Some(_), _ => None) + executionResult.fold(identity, _ => None) } /** Execution error handler. @@ -267,24 +267,24 @@ object ProgramExecutionSupport { private def onExecutionError( item: ExecutionItem, error: Throwable - )(implicit ctx: RuntimeContext): Api.ExecutionResult = { + )(implicit ctx: RuntimeContext): Option[Api.ExecutionResult] = { val itemName = item match { case ExecutionItem.Method(_, _, function) => function case ExecutionItem.CallData(_, call) => call.getFunction.getName } val executionUpdate = getExecutionOutcome(error) val reason = VisualizationResult.findExceptionMessage(error) - def onFailure() = error match { + def onFailure(): Option[Api.ExecutionResult] = error match { case _: ThreadInterruptedException => val message = s"Execution of function $itemName interrupted." ctx.executionService.getLogger.log(Level.FINE, message) - ExecutionResult.Diagnostic.warning(message, None) + None case _ => val message = s"Execution of function $itemName failed ($reason)." ctx.executionService.getLogger.log(Level.WARNING, message, error) - ExecutionResult.Failure(message, None) + Some(ExecutionResult.Failure(message, None)) } - executionUpdate.getOrElse(onFailure()) + executionUpdate.orElse(onFailure()) } /** Convert the runtime exception to the corresponding API error messages. @@ -564,12 +564,13 @@ object ProgramExecutionSupport { .getOrElse(expressionValue.getClass) ctx.executionService.getLogger.log( Level.WARNING, - "Execution of visualization [{0}] on value [{1}] of [{2}] failed. {3}", + "Execution of visualization [{0}] on value [{1}] of [{2}] failed. {3} | {4}", Array[Object]( visualizationId, expressionId, typeOfNode, message, + expressionValue, error ) ) diff --git a/engine/runtime-instrument-common/src/main/scala/org/enso/interpreter/instrument/job/UpsertVisualizationJob.scala b/engine/runtime-instrument-common/src/main/scala/org/enso/interpreter/instrument/job/UpsertVisualizationJob.scala index 78ac32396e..cbdbad4afc 100644 --- a/engine/runtime-instrument-common/src/main/scala/org/enso/interpreter/instrument/job/UpsertVisualizationJob.scala +++ b/engine/runtime-instrument-common/src/main/scala/org/enso/interpreter/instrument/job/UpsertVisualizationJob.scala @@ -45,7 +45,8 @@ class UpsertVisualizationJob( ) extends Job[Option[Executable]]( List(config.executionContextId), false, - false + false, + true ) with UniqueJob[Option[Executable]] { @@ -62,68 +63,72 @@ class UpsertVisualizationJob( implicit val logger: TruffleLogger = ctx.executionService.getLogger val lockTimestamp = ctx.locking.acquireContextLock(config.executionContextId) - val writeLockTimestamp = ctx.locking.acquireWriteCompilationLock() try { - val maybeCallable = - UpsertVisualizationJob.evaluateVisualizationExpression( - config.visualizationModule, - config.expression - ) - - maybeCallable match { - case Left(ModuleNotFound(moduleName)) => - ctx.endpoint.sendToClient( - Api.Response(Api.ModuleNotFound(moduleName)) + val writeLockTimestamp = ctx.locking.acquireWriteCompilationLock() + try { + val maybeCallable = + UpsertVisualizationJob.evaluateVisualizationExpression( + config.visualizationModule, + config.expression ) - None - case Left(EvaluationFailed(message, result)) => - replyWithExpressionFailedError( - config.executionContextId, - visualizationId, - expressionId, - message, - result - ) - None + maybeCallable match { + case Left(ModuleNotFound(moduleName)) => + ctx.endpoint.sendToClient( + Api.Response(Api.ModuleNotFound(moduleName)) + ) + None - case Right(EvaluationResult(module, callable, arguments)) => - val visualization = - UpsertVisualizationJob.updateAttachedVisualization( + case Left(EvaluationFailed(message, result)) => + replyWithExpressionFailedError( + config.executionContextId, visualizationId, expressionId, - module, - config, - callable, - arguments + message, + result ) - val stack = ctx.contextManager.getStack(config.executionContextId) - val cachedValue = stack.headOption - .flatMap(frame => Option(frame.cache.get(expressionId))) - UpsertVisualizationJob.requireVisualizationSynchronization( - stack, - expressionId - ) - cachedValue match { - case Some(value) => - ProgramExecutionSupport.executeAndSendVisualizationUpdate( - config.executionContextId, - stack.headOption.get.syncState, - visualization, + None + + case Right(EvaluationResult(module, callable, arguments)) => + val visualization = + UpsertVisualizationJob.updateAttachedVisualization( + visualizationId, expressionId, - value + module, + config, + callable, + arguments ) - None - case None => - Some(Executable(config.executionContextId, stack)) - } + val stack = ctx.contextManager.getStack(config.executionContextId) + val cachedValue = stack.headOption + .flatMap(frame => Option(frame.cache.get(expressionId))) + UpsertVisualizationJob.requireVisualizationSynchronization( + stack, + expressionId + ) + cachedValue match { + case Some(value) => + ProgramExecutionSupport.executeAndSendVisualizationUpdate( + config.executionContextId, + stack.headOption.get.syncState, + visualization, + expressionId, + value + ) + None + case None => + Some(Executable(config.executionContextId, stack)) + } + } + } finally { + ctx.locking.releaseWriteCompilationLock() + logger.log( + Level.FINEST, + s"Kept write compilation lock [UpsertVisualizationJob] for ${System + .currentTimeMillis() - writeLockTimestamp} milliseconds" + ) } } finally { - ctx.locking.releaseWriteCompilationLock() - logger.log( - Level.FINEST, - s"Kept write compilation lock [UpsertVisualizationJob] for ${System.currentTimeMillis() - writeLockTimestamp} milliseconds" - ) ctx.locking.releaseContextLock(config.executionContextId) logger.log( Level.FINEST, diff --git a/engine/runtime-integration-tests/src/test/scala/org/enso/interpreter/test/instrument/RuntimeAsyncCommandsTest.scala b/engine/runtime-integration-tests/src/test/scala/org/enso/interpreter/test/instrument/RuntimeAsyncCommandsTest.scala index 2fa8519021..6f1634382b 100644 --- a/engine/runtime-integration-tests/src/test/scala/org/enso/interpreter/test/instrument/RuntimeAsyncCommandsTest.scala +++ b/engine/runtime-integration-tests/src/test/scala/org/enso/interpreter/test/instrument/RuntimeAsyncCommandsTest.scala @@ -220,28 +220,24 @@ class RuntimeAsyncCommandsTest context.send( Api.Request(requestId, Api.InterruptContextRequest(contextId)) ) - context.receiveNIgnoreExpressionUpdates( - 3 - ) should contain theSameElementsAs Seq( - Api.Response(requestId, Api.InterruptContextResponse(contextId)), - Api.Response( - Api.ExecutionUpdate( - contextId, - Seq( - Api.ExecutionResult.Diagnostic( - Api.DiagnosticType.Warning, - Some("Execution of function main interrupted."), - None, - None, - None, - Vector() - ) - ) - ) - ), - Api.Response( - Api.ExecutionComplete(contextId) - ) + val responses = context.receiveNIgnoreExpressionUpdates( + 2 ) + responses.length shouldEqual 2 + responses should contain( + Api.Response(requestId, Api.InterruptContextResponse(contextId)) + ) + + val failures = responses.filter(_.payload.isInstanceOf[Api.ExecutionFailed]) + failures.length shouldEqual 1 + + val failure = failures.head.payload.asInstanceOf[Api.ExecutionFailed] + failure.contextId shouldEqual contextId + failure.result shouldBe a[Api.ExecutionResult.Diagnostic] + + val diagnostic = failure.result.asInstanceOf[Api.ExecutionResult.Diagnostic] + diagnostic.kind shouldEqual Api.DiagnosticType.Error + diagnostic.message shouldEqual Some("sleep interrupted") + diagnostic.stack should not be empty } } diff --git a/engine/runtime/src/main/java/org/enso/interpreter/runtime/EnsoContext.java b/engine/runtime/src/main/java/org/enso/interpreter/runtime/EnsoContext.java index 7067b0b96d..bff4bbc326 100644 --- a/engine/runtime/src/main/java/org/enso/interpreter/runtime/EnsoContext.java +++ b/engine/runtime/src/main/java/org/enso/interpreter/runtime/EnsoContext.java @@ -730,6 +730,10 @@ public final class EnsoContext { return threadExecutors.newCachedThreadPool(name, systemThreads); } + public ExecutorService newCachedThreadPool(String name, int min, int max, boolean systemThreads) { + return threadExecutors.newCachedThreadPool(name, systemThreads, min, max); + } + /** * @param parallel amount of parallelism for the pool * @param name human-readable name of the pool diff --git a/engine/runtime/src/main/java/org/enso/interpreter/runtime/HostClassLoader.java b/engine/runtime/src/main/java/org/enso/interpreter/runtime/HostClassLoader.java index a4139a1f9f..ee949e6837 100644 --- a/engine/runtime/src/main/java/org/enso/interpreter/runtime/HostClassLoader.java +++ b/engine/runtime/src/main/java/org/enso/interpreter/runtime/HostClassLoader.java @@ -14,8 +14,8 @@ import org.slf4j.LoggerFactory; /** * Host class loader that serves as a replacement for {@link * com.oracle.truffle.host.HostClassLoader}. Add URLs to Jar archives with {@link #add(URL)}. All - * the classes that are loded via this class loader are first searched inside those archives. If not - * found, delegates to parent class loaders. + * the classes that are loaded via this class loader are first searched inside those archives. If + * not found, delegates to parent class loaders. */ public class HostClassLoader extends URLClassLoader { diff --git a/engine/runtime/src/main/java/org/enso/interpreter/runtime/ThreadExecutors.java b/engine/runtime/src/main/java/org/enso/interpreter/runtime/ThreadExecutors.java index 8153776e9d..b2ab84b9a8 100644 --- a/engine/runtime/src/main/java/org/enso/interpreter/runtime/ThreadExecutors.java +++ b/engine/runtime/src/main/java/org/enso/interpreter/runtime/ThreadExecutors.java @@ -3,9 +3,7 @@ package org.enso.interpreter.runtime; import java.util.Collections; import java.util.Map; import java.util.WeakHashMap; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.ThreadFactory; +import java.util.concurrent.*; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.logging.Level; @@ -20,13 +18,28 @@ final class ThreadExecutors { this.context = context; } - final ExecutorService newCachedThreadPool(String name, boolean systemThread) { + ExecutorService newCachedThreadPool(String name, boolean systemThread) { var s = Executors.newCachedThreadPool(new Factory(name, systemThread)); pools.put(s, name); return s; } - final ExecutorService newFixedThreadPool(int cnt, String name, boolean systemThread) { + ExecutorService newCachedThreadPool(String name, boolean systemThread, int min, int max) { + assert min >= 0; + assert max <= Integer.MAX_VALUE; + var s = + new ThreadPoolExecutor( + min, + max, + 60L, + TimeUnit.SECONDS, + new SynchronousQueue(), + new Factory(name, systemThread)); + pools.put(s, name); + return s; + } + + ExecutorService newFixedThreadPool(int cnt, String name, boolean systemThread) { var s = Executors.newFixedThreadPool(cnt, new Factory(name, systemThread)); pools.put(s, name); return s;