mirror of
https://github.com/enso-org/enso.git
synced 2024-12-23 18:34:03 +03:00
Delay writing IR caches (#5957)
close #5911 In interactive mode, perform writing IR caches in the background jobs queue. Background jobs execution is delayed until the first execution is complete.
This commit is contained in:
parent
a668e3d434
commit
b2aa58bf20
@ -201,6 +201,10 @@ final class SuggestionsHandler(
|
||||
initialized(projectName, graph, clients - client.clientId, state)
|
||||
)
|
||||
|
||||
case msg: Api.SuggestionsDatabaseSuggestionsLoadedNotification
|
||||
if state.isSuggestionLoadingRunning =>
|
||||
state.suggestionLoadingQueue.enqueue(msg)
|
||||
|
||||
case msg: Api.SuggestionsDatabaseSuggestionsLoadedNotification =>
|
||||
logger.debug(
|
||||
"Starting loading suggestions for library [{}].",
|
||||
@ -218,13 +222,23 @@ final class SuggestionsHandler(
|
||||
sessionRouter ! DeliverToJsonController(clientId, notification)
|
||||
}
|
||||
}
|
||||
self ! SuggestionsHandler.SuggestionLoadingCompleted
|
||||
case Failure(ex) =>
|
||||
logger.error(
|
||||
"Error applying suggestion updates for loaded library [{}].",
|
||||
msg.libraryName,
|
||||
ex
|
||||
)
|
||||
self ! SuggestionsHandler.SuggestionLoadingCompleted
|
||||
}
|
||||
context.become(
|
||||
initialized(
|
||||
projectName,
|
||||
graph,
|
||||
clients,
|
||||
state.copy(isSuggestionLoadingRunning = true)
|
||||
)
|
||||
)
|
||||
|
||||
case msg: Api.SuggestionsDatabaseModuleUpdateNotification
|
||||
if state.isSuggestionUpdatesRunning =>
|
||||
@ -516,6 +530,18 @@ final class SuggestionsHandler(
|
||||
)
|
||||
)
|
||||
|
||||
case SuggestionLoadingCompleted =>
|
||||
if (state.suggestionLoadingQueue.nonEmpty) {
|
||||
self ! state.suggestionLoadingQueue.dequeue()
|
||||
}
|
||||
context.become(
|
||||
initialized(
|
||||
projectName,
|
||||
graph,
|
||||
clients,
|
||||
state.copy(isSuggestionLoadingRunning = false)
|
||||
)
|
||||
)
|
||||
}
|
||||
|
||||
/** Transition the initialization process.
|
||||
@ -775,9 +801,12 @@ object SuggestionsHandler {
|
||||
}
|
||||
|
||||
/** The notification that the suggestion updates are processed. */
|
||||
case object SuggestionUpdatesCompleted
|
||||
private case object SuggestionUpdatesCompleted
|
||||
|
||||
case class SuggestionUpdatesBatch(
|
||||
/** The notification that the suggestions loading is complete. */
|
||||
private case object SuggestionLoadingCompleted
|
||||
|
||||
private case class SuggestionUpdatesBatch(
|
||||
updates: Seq[Api.SuggestionsDatabaseModuleUpdateNotification]
|
||||
)
|
||||
|
||||
@ -809,15 +838,20 @@ object SuggestionsHandler {
|
||||
/** The suggestion updates state.
|
||||
*
|
||||
* @param suggestionUpdatesQueue the queue containing update messages
|
||||
* @param isSuggestionUpdatesRunning a flag for a running update action
|
||||
* @param isSuggestionUpdatesRunning a flag for a running suggestion update action
|
||||
* @param isSuggestionLoadingRunning a flag for a running suggestion loading action
|
||||
* @param shouldStartBackgroundProcessing a flag for starting a background
|
||||
* processing action
|
||||
*/
|
||||
final case class State(
|
||||
suggestionUpdatesQueue: mutable.Queue[
|
||||
Api.SuggestionsDatabaseModuleUpdateNotification
|
||||
] = mutable.Queue.empty,
|
||||
suggestionLoadingQueue: mutable.Queue[
|
||||
Api.SuggestionsDatabaseSuggestionsLoadedNotification
|
||||
] = mutable.Queue.empty,
|
||||
isSuggestionUpdatesRunning: Boolean = false,
|
||||
isSuggestionLoadingRunning: Boolean = false,
|
||||
shouldStartBackgroundProcessing: Boolean = true
|
||||
)
|
||||
|
||||
|
@ -9,7 +9,7 @@ import com.fasterxml.jackson.module.scala.{
|
||||
}
|
||||
import org.enso.editions.LibraryName
|
||||
import org.enso.logger.masking.{MaskedPath, MaskedString, ToLogString}
|
||||
import org.enso.pkg.ComponentGroups
|
||||
import org.enso.pkg.{ComponentGroups, QualifiedName}
|
||||
import org.enso.polyglot.{ModuleExports, Suggestion}
|
||||
import org.enso.polyglot.data.{Tree, TypeGraph}
|
||||
import org.enso.text.ContentVersion
|
||||
@ -276,6 +276,10 @@ object Runtime {
|
||||
new JsonSubTypes.Type(
|
||||
value = classOf[Api.BackgroundJobsStartedNotification],
|
||||
name = "backgroundJobsStartedNotification"
|
||||
),
|
||||
new JsonSubTypes.Type(
|
||||
value = classOf[Api.SerializeModule],
|
||||
name = "serializeModule"
|
||||
)
|
||||
)
|
||||
)
|
||||
@ -1679,6 +1683,12 @@ object Runtime {
|
||||
/** A notification about started background jobs. */
|
||||
final case class BackgroundJobsStartedNotification() extends ApiNotification
|
||||
|
||||
/** A request to serialize the module.
|
||||
*
|
||||
* @param module qualified module name
|
||||
*/
|
||||
final case class SerializeModule(module: QualifiedName) extends ApiRequest
|
||||
|
||||
private lazy val mapper = {
|
||||
val factory = new CBORFactory()
|
||||
val mapper = new ObjectMapper(factory) with ClassTagExtensions
|
||||
|
@ -242,17 +242,13 @@ class RuntimeComponentsTest
|
||||
)
|
||||
val responses =
|
||||
context.receiveAllUntil(
|
||||
Seq(
|
||||
context.executionComplete(contextId),
|
||||
context.analyzeJobFinished
|
||||
),
|
||||
Seq(context.executionComplete(contextId)),
|
||||
timeout = 180
|
||||
)
|
||||
// sanity check
|
||||
responses should contain allOf (
|
||||
Api.Response(requestId, Api.PushContextResponse(contextId)),
|
||||
context.executionComplete(contextId),
|
||||
context.analyzeJobFinished
|
||||
)
|
||||
|
||||
// check LibraryLoaded notifications
|
||||
@ -335,17 +331,13 @@ class RuntimeComponentsTest
|
||||
)
|
||||
val responses =
|
||||
context.receiveAllUntil(
|
||||
Seq(
|
||||
context.executionComplete(contextId),
|
||||
context.analyzeJobFinished
|
||||
),
|
||||
Seq(context.executionComplete(contextId)),
|
||||
timeout = 180
|
||||
)
|
||||
// sanity check
|
||||
responses should contain allOf (
|
||||
Api.Response(requestId, Api.PushContextResponse(contextId)),
|
||||
context.executionComplete(contextId),
|
||||
context.analyzeJobFinished
|
||||
)
|
||||
|
||||
// check the registered component groups
|
||||
|
@ -235,17 +235,13 @@ class RuntimeStdlibTest
|
||||
)
|
||||
val responses =
|
||||
context.receiveAllUntil(
|
||||
Seq(
|
||||
context.executionComplete(contextId),
|
||||
context.analyzeJobFinished
|
||||
),
|
||||
Seq(context.executionComplete(contextId)),
|
||||
timeout = 180
|
||||
)
|
||||
// sanity check
|
||||
responses should contain allOf (
|
||||
Api.Response(requestId, Api.PushContextResponse(contextId)),
|
||||
context.executionComplete(contextId),
|
||||
context.analyzeJobFinished,
|
||||
)
|
||||
|
||||
// check that the suggestion notifications are received
|
||||
|
@ -138,7 +138,7 @@ class RuntimeSuggestionUpdatesTest
|
||||
)
|
||||
)
|
||||
context.receiveNIgnoreExpressionUpdates(
|
||||
5
|
||||
4
|
||||
) should contain theSameElementsAs Seq(
|
||||
Api.Response(Api.BackgroundJobsStartedNotification()),
|
||||
Api.Response(requestId, Api.PushContextResponse(contextId)),
|
||||
@ -180,7 +180,6 @@ class RuntimeSuggestionUpdatesTest
|
||||
)
|
||||
)
|
||||
),
|
||||
Api.Response(Api.AnalyzeModuleInScopeJobFinished()),
|
||||
context.executionComplete(contextId)
|
||||
)
|
||||
context.consumeOut shouldEqual List("Hello World!")
|
||||
@ -201,7 +200,7 @@ class RuntimeSuggestionUpdatesTest
|
||||
)
|
||||
)
|
||||
context.receiveNIgnoreExpressionUpdates(
|
||||
3
|
||||
2
|
||||
) should contain theSameElementsAs Seq(
|
||||
Api.Response(
|
||||
Api.SuggestionsDatabaseModuleUpdateNotification(
|
||||
@ -256,7 +255,6 @@ class RuntimeSuggestionUpdatesTest
|
||||
)
|
||||
)
|
||||
),
|
||||
Api.Response(Api.AnalyzeModuleInScopeJobFinished()),
|
||||
context.executionComplete(contextId)
|
||||
)
|
||||
context.consumeOut shouldEqual List("42")
|
||||
@ -281,7 +279,7 @@ class RuntimeSuggestionUpdatesTest
|
||||
)
|
||||
)
|
||||
context.receiveNIgnoreExpressionUpdates(
|
||||
3
|
||||
2
|
||||
) should contain theSameElementsAs Seq(
|
||||
Api.Response(
|
||||
Api.SuggestionsDatabaseModuleUpdateNotification(
|
||||
@ -361,7 +359,6 @@ class RuntimeSuggestionUpdatesTest
|
||||
)
|
||||
)
|
||||
),
|
||||
Api.Response(Api.AnalyzeModuleInScopeJobFinished()),
|
||||
context.executionComplete(contextId)
|
||||
)
|
||||
context.consumeOut shouldEqual List("51")
|
||||
@ -382,7 +379,7 @@ class RuntimeSuggestionUpdatesTest
|
||||
)
|
||||
)
|
||||
context.receiveNIgnoreExpressionUpdates(
|
||||
3
|
||||
2
|
||||
) should contain theSameElementsAs Seq(
|
||||
Api.Response(
|
||||
Api.SuggestionsDatabaseModuleUpdateNotification(
|
||||
@ -471,7 +468,6 @@ class RuntimeSuggestionUpdatesTest
|
||||
)
|
||||
)
|
||||
),
|
||||
Api.Response(Api.AnalyzeModuleInScopeJobFinished()),
|
||||
context.executionComplete(contextId)
|
||||
)
|
||||
context.consumeOut shouldEqual List("51")
|
||||
@ -492,7 +488,7 @@ class RuntimeSuggestionUpdatesTest
|
||||
)
|
||||
)
|
||||
context.receiveNIgnoreExpressionUpdates(
|
||||
3
|
||||
2
|
||||
) should contain theSameElementsAs Seq(
|
||||
Api.Response(
|
||||
Api.SuggestionsDatabaseModuleUpdateNotification(
|
||||
@ -609,7 +605,6 @@ class RuntimeSuggestionUpdatesTest
|
||||
)
|
||||
)
|
||||
),
|
||||
Api.Response(Api.AnalyzeModuleInScopeJobFinished()),
|
||||
context.executionComplete(contextId)
|
||||
)
|
||||
context.consumeOut shouldEqual List("51")
|
||||
@ -630,7 +625,7 @@ class RuntimeSuggestionUpdatesTest
|
||||
)
|
||||
)
|
||||
context.receiveNIgnoreExpressionUpdates(
|
||||
3
|
||||
2
|
||||
) should contain theSameElementsAs Seq(
|
||||
Api.Response(
|
||||
Api.SuggestionsDatabaseModuleUpdateNotification(
|
||||
@ -698,7 +693,6 @@ class RuntimeSuggestionUpdatesTest
|
||||
)
|
||||
)
|
||||
),
|
||||
Api.Response(Api.AnalyzeModuleInScopeJobFinished()),
|
||||
context.executionComplete(contextId)
|
||||
)
|
||||
context.consumeOut shouldEqual List("51")
|
||||
@ -756,7 +750,7 @@ class RuntimeSuggestionUpdatesTest
|
||||
)
|
||||
)
|
||||
context.receiveNIgnoreExpressionUpdates(
|
||||
5
|
||||
4
|
||||
) should contain theSameElementsAs Seq(
|
||||
Api.Response(Api.BackgroundJobsStartedNotification()),
|
||||
Api.Response(requestId, Api.PushContextResponse(contextId)),
|
||||
@ -868,7 +862,6 @@ class RuntimeSuggestionUpdatesTest
|
||||
)
|
||||
)
|
||||
),
|
||||
Api.Response(Api.AnalyzeModuleInScopeJobFinished()),
|
||||
context.executionComplete(contextId)
|
||||
)
|
||||
}
|
||||
@ -935,7 +928,7 @@ class RuntimeSuggestionUpdatesTest
|
||||
)
|
||||
)
|
||||
context.receiveNIgnoreExpressionUpdates(
|
||||
6
|
||||
5
|
||||
) should contain theSameElementsAs Seq(
|
||||
Api.Response(Api.BackgroundJobsStartedNotification()),
|
||||
Api.Response(requestId, Api.PushContextResponse(contextId)),
|
||||
@ -1113,7 +1106,6 @@ class RuntimeSuggestionUpdatesTest
|
||||
)
|
||||
)
|
||||
),
|
||||
Api.Response(Api.AnalyzeModuleInScopeJobFinished()),
|
||||
context.executionComplete(contextId)
|
||||
)
|
||||
context.consumeOut shouldEqual List("Hello World!")
|
||||
@ -1184,7 +1176,7 @@ class RuntimeSuggestionUpdatesTest
|
||||
)
|
||||
)
|
||||
context.receiveNIgnorePendingExpressionUpdates(
|
||||
3
|
||||
2
|
||||
) should contain theSameElementsAs Seq(
|
||||
Api.Response(
|
||||
Api.SuggestionsDatabaseModuleUpdateNotification(
|
||||
@ -1208,7 +1200,6 @@ class RuntimeSuggestionUpdatesTest
|
||||
updates = Tree.Root(Vector())
|
||||
)
|
||||
),
|
||||
Api.Response(Api.AnalyzeModuleInScopeJobFinished()),
|
||||
context.executionComplete(contextId)
|
||||
)
|
||||
context.consumeOut shouldEqual List("Hello World!")
|
||||
|
@ -0,0 +1,28 @@
|
||||
package org.enso.interpreter.instrument.command;
|
||||
|
||||
import org.enso.interpreter.instrument.execution.RuntimeContext;
|
||||
import org.enso.interpreter.instrument.job.SerializeModuleJob;
|
||||
import org.enso.pkg.QualifiedName;
|
||||
import scala.Option;
|
||||
import scala.concurrent.ExecutionContext;
|
||||
import scala.concurrent.Future;
|
||||
import scala.runtime.BoxedUnit;
|
||||
|
||||
import java.util.UUID;
|
||||
|
||||
/** The command to start the module serialization. */
|
||||
public final class SerializeModuleCommand extends Command {
|
||||
|
||||
private final QualifiedName moduleName;
|
||||
|
||||
public SerializeModuleCommand(Option<UUID> maybeRequestId, QualifiedName moduleName) {
|
||||
super(maybeRequestId);
|
||||
this.moduleName = moduleName;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Future<BoxedUnit> execute(RuntimeContext ctx, ExecutionContext ec) {
|
||||
ctx.jobProcessor().runBackground(new SerializeModuleJob(moduleName));
|
||||
return Future.successful(BoxedUnit.UNIT);
|
||||
}
|
||||
}
|
@ -0,0 +1,29 @@
|
||||
package org.enso.interpreter.instrument.job;
|
||||
|
||||
import scala.collection.immutable.List$;
|
||||
|
||||
/** The job that runs in the background. */
|
||||
public abstract class BackgroundJob<A> extends Job<A> implements Comparable<BackgroundJob<?>> {
|
||||
|
||||
private final int priority;
|
||||
|
||||
/**
|
||||
* Create a background job with priority.
|
||||
*
|
||||
* @param priority the job priority. Lower number indicates higher priority.
|
||||
*/
|
||||
public BackgroundJob(int priority) {
|
||||
super(List$.MODULE$.empty(), false, false);
|
||||
this.priority = priority;
|
||||
}
|
||||
|
||||
/** @return the job priority. */
|
||||
public int getPriority() {
|
||||
return priority;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int compareTo(BackgroundJob<?> that) {
|
||||
return Integer.compare(this.priority, that.getPriority());
|
||||
}
|
||||
}
|
@ -0,0 +1,57 @@
|
||||
package org.enso.interpreter.instrument.job;
|
||||
|
||||
import org.enso.compiler.SerializationManager;
|
||||
import org.enso.interpreter.instrument.execution.RuntimeContext;
|
||||
import org.enso.interpreter.runtime.EnsoContext;
|
||||
import org.enso.interpreter.runtime.Module;
|
||||
import org.enso.pkg.QualifiedName;
|
||||
import org.enso.polyglot.RuntimeOptions;
|
||||
|
||||
import java.util.logging.Level;
|
||||
|
||||
/** The job that serializes module. */
|
||||
public final class SerializeModuleJob extends BackgroundJob<Void> {
|
||||
|
||||
private final QualifiedName moduleName;
|
||||
|
||||
private static final int SERIALIZE_MODULE_JOB_PRIORITY = 1000;
|
||||
|
||||
public SerializeModuleJob(QualifiedName moduleName) {
|
||||
super(SERIALIZE_MODULE_JOB_PRIORITY);
|
||||
this.moduleName = moduleName;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Void run(RuntimeContext ctx) {
|
||||
EnsoContext ensoContext = ctx.executionService().getContext();
|
||||
SerializationManager serializationManager = ensoContext.getCompiler().getSerializationManager();
|
||||
boolean useGlobalCacheLocations =
|
||||
ensoContext
|
||||
.getEnvironment()
|
||||
.getOptions()
|
||||
.get(RuntimeOptions.USE_GLOBAL_IR_CACHE_LOCATION_KEY);
|
||||
ctx.locking().acquireWriteCompilationLock();
|
||||
try {
|
||||
ctx.executionService()
|
||||
.getContext()
|
||||
.findModule(moduleName.toString())
|
||||
.ifPresent(
|
||||
module -> {
|
||||
if (module.getCompilationStage().isBefore(Module.CompilationStage.AFTER_CODEGEN)) {
|
||||
ctx.executionService()
|
||||
.getLogger()
|
||||
.log(
|
||||
Level.WARNING,
|
||||
"Attempt to serialize the module [{}] at stage [{}].",
|
||||
new Object[] {module.getName(), module.getCompilationStage()});
|
||||
return;
|
||||
}
|
||||
|
||||
serializationManager.serializeModule(module, useGlobalCacheLocations, false);
|
||||
});
|
||||
} finally {
|
||||
ctx.locking().releaseWriteCompilationLock();
|
||||
}
|
||||
return null;
|
||||
}
|
||||
}
|
@ -14,7 +14,7 @@ public final class VisualizationResult {
|
||||
/**
|
||||
* Extracts a string representation for a polyglot exception.
|
||||
*
|
||||
* @param exexception the exception
|
||||
* @param ex the exception
|
||||
* @return message representing the exception
|
||||
*/
|
||||
@CompilerDirectives.TruffleBoundary
|
||||
|
@ -524,6 +524,11 @@ public class EnsoContext {
|
||||
return 10;
|
||||
}
|
||||
|
||||
/** @return the notification handler. */
|
||||
public NotificationHandler getNotificationHandler() {
|
||||
return notificationHandler;
|
||||
}
|
||||
|
||||
private <T> T getOption(OptionKey<T> key) {
|
||||
var options = getEnvironment().getOptions();
|
||||
var safely = false;
|
||||
|
@ -5,7 +5,6 @@ import com.oracle.truffle.api.source.Source
|
||||
import org.enso.compiler.codegen.{IrToTruffle, RuntimeStubsGenerator}
|
||||
import org.enso.compiler.context.{FreshNameSupply, InlineContext, ModuleContext}
|
||||
import org.enso.compiler.core.IR
|
||||
|
||||
import org.enso.compiler.data.{BindingsMap, CompilerConfig}
|
||||
import org.enso.compiler.exception.{CompilationAbortedException, CompilerError}
|
||||
import org.enso.compiler.pass.PassManager
|
||||
@ -60,6 +59,8 @@ class Compiler(
|
||||
private val useGlobalCacheLocations = context.getEnvironment.getOptions.get(
|
||||
RuntimeOptions.USE_GLOBAL_IR_CACHE_LOCATION_KEY
|
||||
)
|
||||
private val isInteractiveMode =
|
||||
context.getEnvironment.getOptions.get(RuntimeOptions.INTERACTIVE_MODE_KEY)
|
||||
private val serializationManager: SerializationManager =
|
||||
new SerializationManager(this)
|
||||
private val logger: TruffleLogger = context.getLogger(getClass)
|
||||
@ -396,10 +397,14 @@ class Compiler(
|
||||
val shouldStoreCache =
|
||||
irCachingEnabled && !module.wasLoadedFromCache()
|
||||
if (shouldStoreCache && !hasErrors(module) && !module.isInteractive) {
|
||||
serializationManager.serializeModule(
|
||||
module,
|
||||
useGlobalCacheLocations
|
||||
)
|
||||
if (isInteractiveMode) {
|
||||
context.getNotificationHandler.serializeModule(module.getName)
|
||||
} else {
|
||||
serializationManager.serializeModule(
|
||||
module,
|
||||
useGlobalCacheLocations
|
||||
)
|
||||
}
|
||||
}
|
||||
} else {
|
||||
logger.log(
|
||||
|
@ -94,12 +94,14 @@ final class SerializationManager(
|
||||
*
|
||||
* @param module the module to serialize
|
||||
* @param useGlobalCacheLocations if true, will use global caches location, local one otherwise
|
||||
* @param useThreadPool if true, will perform serialization asynchronously
|
||||
* @return Future referencing the serialization task. On completion Future will return
|
||||
* `true` if `module` has been successfully serialized, `false` otherwise
|
||||
*/
|
||||
def serializeModule(
|
||||
module: Module,
|
||||
useGlobalCacheLocations: Boolean
|
||||
useGlobalCacheLocations: Boolean,
|
||||
useThreadPool: Boolean = true
|
||||
): Future[Boolean] = {
|
||||
logger.log(
|
||||
debugLogLevel,
|
||||
@ -120,7 +122,7 @@ final class SerializationManager(
|
||||
module.getSource,
|
||||
useGlobalCacheLocations
|
||||
)
|
||||
if (compiler.context.getEnvironment.isCreateThreadAllowed) {
|
||||
if (useThreadPool) {
|
||||
isWaitingForSerialization.synchronized {
|
||||
val future = pool.submit(task)
|
||||
isWaitingForSerialization.put(module.getName, future)
|
||||
@ -178,6 +180,10 @@ final class SerializationManager(
|
||||
libraryName: LibraryName,
|
||||
useGlobalCacheLocations: Boolean
|
||||
): Callable[Boolean] = () => {
|
||||
while (isSerializingLibrary(libraryName)) {
|
||||
Thread.sleep(100)
|
||||
}
|
||||
|
||||
logger.log(
|
||||
debugLogLevel,
|
||||
"Running serialization for bindings [{}].",
|
||||
@ -347,7 +353,7 @@ final class SerializationManager(
|
||||
abort(module)
|
||||
None
|
||||
} else {
|
||||
while (isSerializingModule(module)) {
|
||||
while (isSerializingModule(module.getName)) {
|
||||
Thread.sleep(100)
|
||||
}
|
||||
|
||||
@ -401,11 +407,11 @@ final class SerializationManager(
|
||||
* @return `true` if `module` is currently being serialized, `false`
|
||||
* otherwise
|
||||
*/
|
||||
def isSerializingModule(module: Module): Boolean = {
|
||||
isSerializing.contains(module.getName)
|
||||
private def isSerializingModule(module: QualifiedName): Boolean = {
|
||||
isSerializing.contains(module)
|
||||
}
|
||||
|
||||
def isSerializingLibrary(library: LibraryName): Boolean = {
|
||||
private def isSerializingLibrary(library: LibraryName): Boolean = {
|
||||
isSerializing.contains(library.toQualifiedName)
|
||||
}
|
||||
|
||||
@ -420,7 +426,7 @@ final class SerializationManager(
|
||||
* @param module the module to check
|
||||
* @return `true` if `module` is waiting for serialization, `false` otherwise
|
||||
*/
|
||||
def isWaitingForSerialization(module: Module): Boolean = {
|
||||
private def isWaitingForSerialization(module: Module): Boolean = {
|
||||
isWaitingForSerialization(module.getName)
|
||||
}
|
||||
|
||||
@ -429,11 +435,11 @@ final class SerializationManager(
|
||||
* @param library the library to check
|
||||
* @return `true` if `library` is waiting for serialization, `false` otherwise
|
||||
*/
|
||||
def isWaitingForSerialization(library: LibraryName): Boolean = {
|
||||
private def isWaitingForSerialization(library: LibraryName): Boolean = {
|
||||
isWaitingForSerialization(library.toQualifiedName)
|
||||
}
|
||||
|
||||
def abort(name: QualifiedName): Boolean = {
|
||||
private def abort(name: QualifiedName): Boolean = {
|
||||
isWaitingForSerialization.synchronized {
|
||||
if (isWaitingForSerialization(name)) {
|
||||
isWaitingForSerialization
|
||||
@ -453,7 +459,7 @@ final class SerializationManager(
|
||||
* @return `true` if serialization for `module` was aborted, `false`
|
||||
* otherwise
|
||||
*/
|
||||
def abort(module: Module): Boolean = {
|
||||
private def abort(module: Module): Boolean = {
|
||||
abort(module.getName)
|
||||
}
|
||||
|
||||
@ -466,7 +472,7 @@ final class SerializationManager(
|
||||
* @return `true` if serialization for `library` was aborted, `false`
|
||||
* otherwise
|
||||
*/
|
||||
def abort(library: LibraryName): Boolean = {
|
||||
private def abort(library: LibraryName): Boolean = {
|
||||
abort(library.toQualifiedName)
|
||||
}
|
||||
|
||||
@ -542,6 +548,10 @@ final class SerializationManager(
|
||||
source: Source,
|
||||
useGlobalCacheLocations: Boolean
|
||||
): Callable[Boolean] = { () =>
|
||||
while (isSerializingModule(name)) {
|
||||
Thread.sleep(100)
|
||||
}
|
||||
|
||||
logger.log(
|
||||
debugLogLevel,
|
||||
"Running serialization for module [{}].",
|
||||
|
@ -6,6 +6,7 @@ import org.enso.cli.task.{ProgressNotification, ProgressReporter, TaskProgress}
|
||||
import org.enso.distribution.ProgressAndLockNotificationForwarder
|
||||
import org.enso.distribution.locking.{LockUserInterface, Resource}
|
||||
import org.enso.editions.{LibraryName, LibraryVersion}
|
||||
import org.enso.pkg.QualifiedName
|
||||
import org.enso.polyglot.runtime.Runtime.{Api, ApiResponse}
|
||||
|
||||
import java.nio.file.Path
|
||||
@ -26,6 +27,12 @@ trait NotificationHandler extends ProgressReporter with LockUserInterface {
|
||||
libraryVersion: LibraryVersion,
|
||||
location: Path
|
||||
): Unit
|
||||
|
||||
/** A request to serialize the module.
|
||||
*
|
||||
* @param moduleName qualified module name
|
||||
*/
|
||||
def serializeModule(moduleName: QualifiedName): Unit
|
||||
}
|
||||
|
||||
object NotificationHandler {
|
||||
@ -48,6 +55,9 @@ object NotificationHandler {
|
||||
// Library notifications are deliberately ignored in text mode.
|
||||
}
|
||||
|
||||
/** @inheritdoc */
|
||||
override def serializeModule(module: QualifiedName): Unit = ()
|
||||
|
||||
/** @inheritdoc */
|
||||
override def trackProgress(message: String, task: TaskProgress[_]): Unit = {
|
||||
logger.info(message)
|
||||
@ -79,6 +89,10 @@ object NotificationHandler {
|
||||
): Unit = for (listener <- listeners)
|
||||
listener.addedLibrary(libraryName, libraryVersion, location)
|
||||
|
||||
/** @inheritdoc */
|
||||
def serializeModule(module: QualifiedName): Unit =
|
||||
listeners.foreach(_.serializeModule(module))
|
||||
|
||||
/** @inheritdoc */
|
||||
override def trackProgress(message: String, task: TaskProgress[_]): Unit =
|
||||
for (listener <- listeners) listener.trackProgress(message, task)
|
||||
@ -131,6 +145,12 @@ object NotificationHandler {
|
||||
)
|
||||
}
|
||||
|
||||
/** @inheritdoc */
|
||||
override def serializeModule(module: QualifiedName): Unit =
|
||||
endpoint.sendToSelf(
|
||||
Api.Request(Api.SerializeModule(module))
|
||||
)
|
||||
|
||||
/** @inheritdoc */
|
||||
override def trackProgress(message: String, task: TaskProgress[_]): Unit = {
|
||||
logger.info(message)
|
||||
|
@ -65,6 +65,9 @@ object CommandFactory {
|
||||
case _: Api.StartBackgroundProcessing =>
|
||||
new StartBackgroundProcessingCmd(request.requestId)
|
||||
|
||||
case payload: Api.SerializeModule =>
|
||||
new SerializeModuleCommand(request.requestId, payload.module)
|
||||
|
||||
case Api.ShutDownRuntimeServer() =>
|
||||
throw new IllegalArgumentException(
|
||||
"ShutDownRuntimeServer request is not convertible to command object"
|
||||
|
@ -1,15 +1,15 @@
|
||||
package org.enso.interpreter.instrument.execution
|
||||
|
||||
import org.enso.interpreter.instrument.InterpreterContext
|
||||
import org.enso.interpreter.instrument.job.{Job, UniqueJob}
|
||||
import org.enso.interpreter.instrument.job.{BackgroundJob, Job, UniqueJob}
|
||||
import org.enso.text.Sha3_224VersionCalculator
|
||||
|
||||
import java.util.UUID
|
||||
import java.util
|
||||
import java.util.{Collections, UUID}
|
||||
import java.util.concurrent.atomic.AtomicReference
|
||||
import java.util.concurrent.ExecutorService
|
||||
import java.util.logging.Level
|
||||
|
||||
import scala.collection.mutable
|
||||
import scala.concurrent.{Future, Promise}
|
||||
import scala.util.control.NonFatal
|
||||
|
||||
@ -41,12 +41,12 @@ final class JobExecutionEngine(
|
||||
private var isBackgroundJobsStarted = false
|
||||
|
||||
private val delayedBackgroundJobsQueue =
|
||||
mutable.Queue.empty[Job[_]]
|
||||
new util.ArrayList[BackgroundJob[_]](4096)
|
||||
|
||||
val jobExecutor: ExecutorService =
|
||||
context.newFixedThreadPool(jobParallelism, "job-pool", false)
|
||||
|
||||
val backgroundJobExecutor: ExecutorService =
|
||||
private val backgroundJobExecutor: ExecutorService =
|
||||
context.newFixedThreadPool(1, "background-job-pool", false)
|
||||
|
||||
private val runtimeContext =
|
||||
@ -63,12 +63,12 @@ final class JobExecutionEngine(
|
||||
)
|
||||
|
||||
/** @inheritdoc */
|
||||
override def runBackground[A](job: Job[A]): Unit =
|
||||
override def runBackground[A](job: BackgroundJob[A]): Unit =
|
||||
synchronized {
|
||||
if (isBackgroundJobsStarted) {
|
||||
runInternal(job, backgroundJobExecutor, backgroundJobsRef)
|
||||
} else {
|
||||
delayedBackgroundJobsQueue.enqueue(job)
|
||||
delayedBackgroundJobsQueue.add(job)
|
||||
}
|
||||
}
|
||||
|
||||
@ -158,8 +158,7 @@ final class JobExecutionEngine(
|
||||
synchronized {
|
||||
val result = !isBackgroundJobsStarted
|
||||
isBackgroundJobsStarted = true
|
||||
delayedBackgroundJobsQueue.foreach(runBackground)
|
||||
delayedBackgroundJobsQueue.clear()
|
||||
submitBackgroundJobsOrdered()
|
||||
result
|
||||
}
|
||||
|
||||
@ -172,4 +171,10 @@ final class JobExecutionEngine(
|
||||
jobExecutor.shutdownNow()
|
||||
}
|
||||
|
||||
/** Submit background jobs preserving the stable order. */
|
||||
private def submitBackgroundJobsOrdered(): Unit = {
|
||||
Collections.sort(delayedBackgroundJobsQueue)
|
||||
delayedBackgroundJobsQueue.forEach(job => runBackground(job))
|
||||
delayedBackgroundJobsQueue.clear()
|
||||
}
|
||||
}
|
||||
|
@ -1,6 +1,6 @@
|
||||
package org.enso.interpreter.instrument.execution
|
||||
|
||||
import org.enso.interpreter.instrument.job.Job
|
||||
import org.enso.interpreter.instrument.job.{BackgroundJob, Job}
|
||||
|
||||
import scala.concurrent.Future
|
||||
|
||||
@ -20,7 +20,7 @@ trait JobProcessor {
|
||||
* @param job a job to execute
|
||||
* @return the future result of an asynchronous computation
|
||||
*/
|
||||
def runBackground[A](job: Job[A]): Unit
|
||||
def runBackground[A](job: BackgroundJob[A]): Unit
|
||||
|
||||
/** Stops the job processor. */
|
||||
def stop(): Unit
|
||||
|
@ -16,11 +16,7 @@ import java.util.logging.Level
|
||||
|
||||
final class AnalyzeModuleInScopeJob(
|
||||
modules: Iterable[Module]
|
||||
) extends Job[Unit](
|
||||
List(AnalyzeModuleJob.backgroundContextId),
|
||||
false,
|
||||
false
|
||||
) {
|
||||
) extends BackgroundJob[Unit](AnalyzeModuleInScopeJob.Priority) {
|
||||
|
||||
private val exportsBuilder = new ExportsBuilder
|
||||
|
||||
@ -38,6 +34,9 @@ final class AnalyzeModuleInScopeJob(
|
||||
}
|
||||
}
|
||||
|
||||
override def toString: String =
|
||||
s"AnalyzeModuleInScopeJob($modules)"
|
||||
|
||||
private def analyzeModuleInScope(module: Module)(implicit
|
||||
ctx: RuntimeContext
|
||||
): Unit = {
|
||||
@ -101,4 +100,6 @@ object AnalyzeModuleInScopeJob {
|
||||
*/
|
||||
def apply(modules: Iterable[Module]): AnalyzeModuleInScopeJob =
|
||||
new AnalyzeModuleInScopeJob(modules)
|
||||
|
||||
private val Priority = 11
|
||||
}
|
||||
|
@ -12,18 +12,12 @@ import org.enso.interpreter.runtime.Module
|
||||
import org.enso.polyglot.ModuleExports
|
||||
import org.enso.polyglot.data.Tree
|
||||
import org.enso.polyglot.runtime.Runtime.Api
|
||||
import org.enso.polyglot.runtime.Runtime.Api.ContextId
|
||||
import org.enso.text.buffer.Rope
|
||||
|
||||
import java.util.UUID
|
||||
import java.util.logging.Level
|
||||
|
||||
final class AnalyzeModuleJob(module: Module, changeset: Changeset[Rope])
|
||||
extends Job[Unit](
|
||||
List(AnalyzeModuleJob.backgroundContextId),
|
||||
false,
|
||||
false
|
||||
) {
|
||||
extends BackgroundJob[Unit](AnalyzeModuleJob.Priority) {
|
||||
|
||||
/** @inheritdoc */
|
||||
override def run(implicit ctx: RuntimeContext): Unit = {
|
||||
@ -39,7 +33,7 @@ object AnalyzeModuleJob {
|
||||
def apply(module: Module, changeset: Changeset[Rope]): AnalyzeModuleJob =
|
||||
new AnalyzeModuleJob(module, changeset)
|
||||
|
||||
val backgroundContextId: ContextId = UUID.randomUUID()
|
||||
private val Priority = 10
|
||||
|
||||
private val exportsBuilder = new ExportsBuilder
|
||||
|
||||
@ -115,5 +109,4 @@ object AnalyzeModuleJob {
|
||||
) {
|
||||
ctx.endpoint.sendToClient(Api.Response(payload))
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -14,11 +14,7 @@ import scala.jdk.CollectionConverters._
|
||||
*/
|
||||
final class DeserializeLibrarySuggestionsJob(
|
||||
libraryName: LibraryName
|
||||
) extends Job[Unit](
|
||||
List(),
|
||||
isCancellable = false,
|
||||
mayInterruptIfRunning = false
|
||||
) {
|
||||
) extends BackgroundJob[Unit](DeserializeLibrarySuggestionsJob.Priority) {
|
||||
|
||||
/** @inheritdoc */
|
||||
override def run(implicit ctx: RuntimeContext): Unit = {
|
||||
@ -41,4 +37,12 @@ final class DeserializeLibrarySuggestionsJob(
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
override def toString: String =
|
||||
s"DeserializeLibrarySuggestionsJob($libraryName)"
|
||||
}
|
||||
|
||||
object DeserializeLibrarySuggestionsJob {
|
||||
|
||||
private val Priority = 100
|
||||
}
|
||||
|
@ -104,7 +104,7 @@ final class EnsureCompiledJob(protected val files: Iterable[File])
|
||||
ctx: RuntimeContext
|
||||
): Iterable[CompilationStatus] = {
|
||||
val notIndexedModulesInScope = modulesInScope.filter(!_.isIndexed)
|
||||
val (modulesToAnalyze, compilationStatuses) =
|
||||
val (modulesToAnalyzeBuilder, compilationStatusesBuilder) =
|
||||
notIndexedModulesInScope.foldLeft(
|
||||
(Set.newBuilder[Module], Vector.newBuilder[CompilationStatus])
|
||||
) { case ((modules, statuses), module) =>
|
||||
@ -127,10 +127,13 @@ final class EnsureCompiledJob(protected val files: Iterable[File])
|
||||
)
|
||||
}
|
||||
}
|
||||
ctx.jobProcessor.runBackground(
|
||||
AnalyzeModuleInScopeJob(modulesToAnalyze.result())
|
||||
)
|
||||
compilationStatuses.result()
|
||||
val modulesToAnalyze = modulesToAnalyzeBuilder.result()
|
||||
if (modulesToAnalyze.nonEmpty) {
|
||||
ctx.jobProcessor.runBackground(
|
||||
AnalyzeModuleInScopeJob(modulesToAnalyze)
|
||||
)
|
||||
}
|
||||
compilationStatusesBuilder.result()
|
||||
}
|
||||
|
||||
/** Extract compilation diagnostics from the module and send the diagnostic
|
||||
|
@ -23,8 +23,6 @@ abstract class Job[+A](
|
||||
* @param ctx contains suppliers of services to perform a request
|
||||
*/
|
||||
def run(implicit ctx: RuntimeContext): A
|
||||
|
||||
override def toString: String = this.getClass.getSimpleName
|
||||
}
|
||||
|
||||
/** The job queue can contain only one job of this type with the same `key`.
|
||||
|
@ -56,7 +56,8 @@ public class SerializerTest {
|
||||
var result = compiler.run(module);
|
||||
assertEquals(result.compiledModules().exists(m -> m == module), true);
|
||||
var serializationManager = new SerializationManager(ensoContext.getCompiler());
|
||||
var future = serializationManager.serializeModule(module, true);
|
||||
var useThreadPool = compiler.context().getEnvironment().isCreateThreadAllowed();
|
||||
var future = serializationManager.serializeModule(module, true, useThreadPool);
|
||||
var serialized = future.get(5, TimeUnit.SECONDS);
|
||||
assertEquals(serialized, true);
|
||||
var deserialized = serializationManager.deserialize(module);
|
||||
|
@ -12,7 +12,7 @@ import scala.jdk.CollectionConverters._
|
||||
*/
|
||||
case class QualifiedName(path: List[String], item: String) {
|
||||
|
||||
lazy val qualifiedNameString: String =
|
||||
private lazy val qualifiedNameString: String =
|
||||
(path :+ item).mkString(QualifiedName.separator)
|
||||
|
||||
@CompilerDirectives.TruffleBoundary
|
||||
|
Loading…
Reference in New Issue
Block a user