Synchronize suggestions loading after the reconnect 2 (#9142)

related #8689, #9072

Fixes a race between the language server SQL updating logic and the engine `DeserializeLibrarySuggestionsJob`s when the library suggestions may start loading before the database is properly cleaned up after the reconnect.

# Important Notes
As a side effect, arguments are showing slightly (~1 second) faster due to the lower contention between the engine jobs.

#### Before
https://github.com/enso-org/enso/assets/357683/cbda2da4-9080-4b9b-b836-81e54694d468

#### After
https://github.com/enso-org/enso/assets/357683/bf442284-47be-456d-b1dd-2413b6ad8244
This commit is contained in:
Dmitry Bushev 2024-02-23 11:18:01 +00:00 committed by GitHub
parent 0d635626aa
commit 7ff90aa3f7
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 107 additions and 50 deletions

View File

@ -493,6 +493,9 @@ object SearchProtocol {
updates: Seq[SuggestionsDatabaseUpdate]
)
/** A request to clear the suggestions database. */
case object ClearSuggestionsDatabase
/** The request to receive contents of the suggestions database. */
case object GetSuggestionsDatabase

View File

@ -110,6 +110,8 @@ final class SuggestionsHandler(
)
context.system.eventStream
.subscribe(self, classOf[Api.LibraryLoaded])
context.system.eventStream
.subscribe(self, classOf[Api.BackgroundJobsStartedNotification])
context.system.eventStream.subscribe(self, classOf[FileDeletedEvent])
context.system.eventStream
.subscribe(self, InitializedEvent.SuggestionsRepoInitialized.getClass)
@ -186,13 +188,29 @@ final class SuggestionsHandler(
case msg: Api.SuggestionsDatabaseSuggestionsLoadedNotification
if state.isSuggestionLoadingRunning =>
state.suggestionLoadingQueue.enqueue(msg)
logger.trace(
"SuggestionsDatabaseSuggestionsLoadedNotification [shouldStartBackgroundProcessing={}].",
state.shouldStartBackgroundProcessing
)
if (state.shouldStartBackgroundProcessing) {
state.suggestionLoadingQueue.clear()
} else {
state.suggestionLoadingQueue.enqueue(msg)
}
case msg: Api.SuggestionsDatabaseSuggestionsLoadedNotification =>
logger.debug(
"Starting loading suggestions for library [{}].",
msg.libraryName
)
context.become(
initialized(
projectName,
graph,
clients,
state.suggestionLoadingRunning()
)
)
applyLoadedSuggestions(msg.suggestions)
.onComplete {
case Success(notification) =>
@ -214,14 +232,6 @@ final class SuggestionsHandler(
)
self ! SuggestionsHandler.SuggestionLoadingCompleted
}
context.become(
initialized(
projectName,
graph,
clients,
state.suggestionLoadingRunning()
)
)
case msg: Api.SuggestionsDatabaseModuleUpdateNotification
if state.isSuggestionUpdatesRunning =>
@ -304,33 +314,8 @@ final class SuggestionsHandler(
)
)
case GetSuggestionsDatabaseVersion =>
suggestionsRepo.currentVersion
.map(GetSuggestionsDatabaseVersionResult)
.pipeTo(sender())
case GetSuggestionsDatabase =>
val responseAction = for {
_ <- suggestionsRepo.clean
version <- suggestionsRepo.currentVersion
} yield GetSuggestionsDatabaseResult(version, Seq())
responseAction.pipeTo(sender())
val handlerAction = for {
_ <- responseAction
} yield SearchProtocol.InvalidateModulesIndex
val handler = context.system.actorOf(
InvalidateModulesIndexHandler.props(
RuntimeFailureMapper(contentRootManager),
timeout,
runtimeConnector
)
)
handlerAction.pipeTo(handler)
case Api.BackgroundJobsStartedNotification() =>
self ! SuggestionLoadingCompleted
context.become(
initialized(
projectName,
@ -340,6 +325,57 @@ final class SuggestionsHandler(
)
)
case GetSuggestionsDatabaseVersion =>
suggestionsRepo.currentVersion
.map(GetSuggestionsDatabaseVersionResult)
.pipeTo(sender())
case ClearSuggestionsDatabase =>
if (state.isSuggestionLoadingRunning) stash()
else {
context.become(
initialized(
projectName,
graph,
clients,
state.suggestionLoadingRunning()
)
)
for {
_ <- suggestionsRepo.clean
} yield {
logger.trace(
"ClearSuggestionsDatabase [{}].",
state.suggestionLoadingQueue
)
state.suggestionLoadingQueue.clear()
runtimeConnector ! Api.Request(Api.StartBackgroundProcessing())
}
}
case GetSuggestionsDatabase =>
val handler = context.system.actorOf(
InvalidateModulesIndexHandler.props(
RuntimeFailureMapper(contentRootManager),
timeout,
runtimeConnector,
self
)
)
handler ! SearchProtocol.InvalidateModulesIndex
sender() ! GetSuggestionsDatabaseResult(0, Seq())
context.become(
initialized(
projectName,
graph,
clients,
state.backgroundProcessingStopped()
)
)
case Completion(path, pos, selfType, returnType, tags, isStatic) =>
val selfTypes = selfType.toList.flatMap(ty => ty :: graph.getParents(ty))
getModuleName(projectName, path)
@ -419,7 +455,8 @@ final class SuggestionsHandler(
InvalidateModulesIndexHandler.props(
runtimeFailureMapper,
timeout,
runtimeConnector
runtimeConnector,
self
)
)
action.pipeTo(handler)(sender())
@ -450,6 +487,7 @@ final class SuggestionsHandler(
)
case SuggestionLoadingCompleted =>
unstashAll()
if (state.suggestionLoadingQueue.nonEmpty) {
self ! state.suggestionLoadingQueue.dequeue()
}

View File

@ -17,11 +17,13 @@ import scala.concurrent.duration.FiniteDuration
* @param runtimeFailureMapper mapper for runtime failures
* @param timeout request timeout
* @param runtime reference to the runtime connector
* @param suggestionsHandler reference to the suggestions handler
*/
final class InvalidateModulesIndexHandler(
runtimeFailureMapper: RuntimeFailureMapper,
timeout: FiniteDuration,
runtime: ActorRef
runtime: ActorRef,
suggestionsHandler: ActorRef
) extends Actor
with LazyLogging
with UnhandledLogging {
@ -50,6 +52,7 @@ final class InvalidateModulesIndexHandler(
context.stop(self)
case Api.Response(_, Api.InvalidateModulesIndexResponse()) =>
suggestionsHandler ! SearchProtocol.ClearSuggestionsDatabase
replyTo ! SearchProtocol.InvalidateSuggestionsDatabaseResult
cancellable.cancel()
context.stop(self)
@ -67,14 +70,21 @@ object InvalidateModulesIndexHandler {
*
* @param runtimeFailureMapper mapper for runtime failures
* @param timeout request timeout
* @param runtime reference to the runtime conector
* @param runtime reference to the runtime connector
* @param suggestionsHandler reference to the suggestions handler
*/
def props(
runtimeFailureMapper: RuntimeFailureMapper,
timeout: FiniteDuration,
runtime: ActorRef
runtime: ActorRef,
suggestionsHandler: ActorRef
): Props =
Props(
new InvalidateModulesIndexHandler(runtimeFailureMapper, timeout, runtime)
new InvalidateModulesIndexHandler(
runtimeFailureMapper,
timeout,
runtime,
suggestionsHandler
)
)
}

View File

@ -5,7 +5,6 @@ import java.util.UUID;
import java.util.logging.Level;
import org.enso.interpreter.instrument.execution.RuntimeContext;
import org.enso.interpreter.instrument.job.DeserializeLibrarySuggestionsJob;
import org.enso.interpreter.instrument.job.StartBackgroundProcessingJob;
import org.enso.interpreter.runtime.EnsoContext;
import org.enso.polyglot.runtime.Runtime$Api$InvalidateModulesIndexResponse;
import scala.Option;
@ -33,6 +32,7 @@ public final class InvalidateModulesIndexCommand extends AsynchronousCommand {
TruffleLogger logger = ctx.executionService().getLogger();
long writeCompilationLockTimestamp = ctx.locking().acquireWriteCompilationLock();
try {
ctx.jobControlPlane().stopBackgroundJobs();
ctx.jobControlPlane().abortBackgroundJobs(DeserializeLibrarySuggestionsJob.class);
EnsoContext context = ctx.executionService().getContext();
@ -48,7 +48,6 @@ public final class InvalidateModulesIndexCommand extends AsynchronousCommand {
return BoxedUnit.UNIT;
});
StartBackgroundProcessingJob.startBackgroundJobs(ctx);
reply(new Runtime$Api$InvalidateModulesIndexResponse(), ctx);
} finally {
ctx.locking().releaseWriteCompilationLock();

View File

@ -22,6 +22,17 @@ public abstract class BackgroundJob<A> extends Job<A> {
this.priority = priority;
}
/**
* Create a background job with priority.
*
* @param priority the job priority. Lower number indicates higher priority.
* @param mayInterruptIfRunning the flag indicating if the running job may be interrupted.
*/
public BackgroundJob(int priority, boolean mayInterruptIfRunning) {
super(List$.MODULE$.empty(), true, mayInterruptIfRunning);
this.priority = priority;
}
/**
* @return the job priority.
*/

View File

@ -19,8 +19,7 @@ final class StartBackgroundProcessingCmd(
ctx: RuntimeContext,
ec: ExecutionContext
): Future[Unit] = {
StartBackgroundProcessingJob.startBackgroundJobs()
Future.successful(())
Future(StartBackgroundProcessingJob.startBackgroundJobs())
}
}

View File

@ -14,7 +14,7 @@ import scala.jdk.CollectionConverters._
*/
final class DeserializeLibrarySuggestionsJob(
val libraryName: LibraryName
) extends BackgroundJob[Unit](DeserializeLibrarySuggestionsJob.Priority)
) extends BackgroundJob[Unit](DeserializeLibrarySuggestionsJob.Priority, true)
with UniqueJob[Unit] {
/** @inheritdoc */

View File

@ -175,10 +175,7 @@ final class SqlSuggestionsRepo(val db: SqlDatabase)(implicit
/** The query to clean the repo. */
private def cleanQuery: DBIO[Unit] = {
for {
_ <- Suggestions.delete
_ <- SuggestionsVersion.delete
} yield ()
DBIO.seq(Suggestions.delete, SuggestionsVersion.delete)
}
/** The query to get all suggestions.