Unique job for visualizations (#3752)

PR adds special kind of jobs for visualizations. It should
- prevent cancelling visualization jobs when the program is re-executed (resulting in the fact that visualization is not showing up)
- omit unnecessary executions of visualization jobs (the case when the user goes through menu items in the component browser)

I skipped the tests because testing of the last scenario involves indeterminism. I.e. when preparing the test, we can't control which of submitted visualization jobs will be actually executed, and which will be cancelled.
This commit is contained in:
Dmitry Bushev 2022-10-03 11:26:54 +03:00 committed by GitHub
parent 146b8a5695
commit 11acad5cff
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 44 additions and 15 deletions

View File

@ -1791,9 +1791,9 @@ class RuntimeVisualizationsTest
context.send( context.send(
Api.Request(requestId, Api.PushContextRequest(contextId, item1)) Api.Request(requestId, Api.PushContextRequest(contextId, item1))
) )
val responses = context.receiveN(n = 5, timeoutSeconds = 60) val responses = context.receiveNIgnoreStdLib(n = 3)
responses should contain allOf ( responses should contain theSameElementsAs Seq(
Api.Response(requestId, Api.PushContextResponse(contextId)), Api.Response(requestId, Api.PushContextResponse(contextId)),
TestMessages.error( TestMessages.error(
contextId, contextId,
@ -1803,12 +1803,6 @@ class RuntimeVisualizationsTest
context.executionComplete(contextId) context.executionComplete(contextId)
) )
val loadedLibraries = responses.collect {
case Api.Response(None, Api.LibraryLoaded(namespace, name, _, _)) =>
(namespace, name)
}
loadedLibraries should contain(("Standard", "Base"))
// attach visualisation // attach visualisation
context.send( context.send(
Api.Request( Api.Request(
@ -1826,7 +1820,7 @@ class RuntimeVisualizationsTest
) )
) )
) )
val attachVisualisationResponses = context.receiveN(5) val attachVisualisationResponses = context.receiveN(4, timeoutSeconds = 60)
attachVisualisationResponses should contain allOf ( attachVisualisationResponses should contain allOf (
Api.Response(requestId, Api.VisualisationAttached()), Api.Response(requestId, Api.VisualisationAttached()),
context.executionComplete(contextId) context.executionComplete(contextId)

View File

@ -1,7 +1,7 @@
package org.enso.interpreter.instrument.execution package org.enso.interpreter.instrument.execution
import org.enso.interpreter.instrument.InterpreterContext import org.enso.interpreter.instrument.InterpreterContext
import org.enso.interpreter.instrument.job.Job import org.enso.interpreter.instrument.job.{Job, UniqueJob}
import org.enso.polyglot.RuntimeServerInfo import org.enso.polyglot.RuntimeServerInfo
import org.enso.text.Sha3_224VersionCalculator import org.enso.text.Sha3_224VersionCalculator
@ -21,7 +21,7 @@ import scala.util.control.NonFatal
* @param executionState a state of the runtime * @param executionState a state of the runtime
* @param locking locking capability for runtime * @param locking locking capability for runtime
*/ */
class JobExecutionEngine( final class JobExecutionEngine(
interpreterContext: InterpreterContext, interpreterContext: InterpreterContext,
executionState: ExecutionState, executionState: ExecutionState,
locking: Locking locking: Locking
@ -70,8 +70,29 @@ class JobExecutionEngine(
runInternal(job, backgroundJobExecutor, backgroundJobsRef) runInternal(job, backgroundJobExecutor, backgroundJobsRef)
/** @inheritdoc */ /** @inheritdoc */
override def run[A](job: Job[A]): Future[A] = override def run[A](job: Job[A]): Future[A] = {
cancelDuplicateJobs(job)
runInternal(job, jobExecutor, runningJobsRef) runInternal(job, jobExecutor, runningJobsRef)
}
private def cancelDuplicateJobs[A](job: Job[A]): Unit = {
job match {
case job: UniqueJob[_] =>
val allJobs =
runningJobsRef.updateAndGet(_.filterNot(_.future.isCancelled))
allJobs.foreach { runningJob =>
runningJob.job match {
case jobRef: UniqueJob[_]
if jobRef.getClass == job.getClass && jobRef.key == job.key =>
runtimeContext.executionService.getLogger
.log(Level.FINEST, s"Cancelling duplicate job [$jobRef].")
runningJob.future.cancel(jobRef.mayInterruptIfRunning)
case _ =>
}
}
case _ =>
}
}
private def runInternal[A]( private def runInternal[A](
job: Job[A], job: Job[A],

View File

@ -25,5 +25,19 @@ abstract class Job[+A](
def run(implicit ctx: RuntimeContext): A def run(implicit ctx: RuntimeContext): A
override def toString: String = this.getClass.getSimpleName override def toString: String = this.getClass.getSimpleName
} }
/** The job queue can contain only one job of this type with the same `key`.
* When a job of this type is added to the job queue, previous duplicate jobs
* are cancelled.
*
* @param key a unique job key
* @param contextIds affected executions contests' ids
* @param mayInterruptIfRunning determines if the job may be interruptd when
* running
*/
abstract class UniqueJob[+A](
val key: UUID,
contextIds: List[UUID],
mayInterruptIfRunning: Boolean
) extends Job[A](contextIds, isCancellable = false, mayInterruptIfRunning)

View File

@ -37,9 +37,9 @@ class UpsertVisualisationJob(
visualisationId: VisualisationId, visualisationId: VisualisationId,
expressionId: ExpressionId, expressionId: ExpressionId,
config: Api.VisualisationConfiguration config: Api.VisualisationConfiguration
) extends Job[Option[Executable]]( ) extends UniqueJob[Option[Executable]](
expressionId,
List(config.executionContextId), List(config.executionContextId),
false,
false false
) { ) {