From 7ff90aa3f7c63c9809101d05ec3595a3970f0063 Mon Sep 17 00:00:00 2001 From: Dmitry Bushev Date: Fri, 23 Feb 2024 11:18:01 +0000 Subject: [PATCH] Synchronize suggestions loading after the reconnect 2 (#9142) related #8689, #9072 Fixes a race between the language server SQL updating logic and the engine `DeserializeLibrarySuggestionsJob`s when the library suggestions may start loading before the database is properly cleaned up after the reconnect. # Important Notes As a side effect, arguments are showing slightly (~1 second) faster due to the lower contention between the engine jobs. #### Before https://github.com/enso-org/enso/assets/357683/cbda2da4-9080-4b9b-b836-81e54694d468 #### After https://github.com/enso-org/enso/assets/357683/bf442284-47be-456d-b1dd-2413b6ad8244 --- .../search/SearchProtocol.scala | 3 + .../search/SuggestionsHandler.scala | 112 ++++++++++++------ .../InvalidateModulesIndexHandler.scala | 18 ++- .../InvalidateModulesIndexCommand.java | 3 +- .../instrument/job/BackgroundJob.java | 11 ++ .../StartBackgroundProcessingCmd.scala | 3 +- .../DeserializeLibrarySuggestionsJob.scala | 2 +- .../searcher/sql/SqlSuggestionsRepo.scala | 5 +- 8 files changed, 107 insertions(+), 50 deletions(-) diff --git a/engine/language-server/src/main/scala/org/enso/languageserver/search/SearchProtocol.scala b/engine/language-server/src/main/scala/org/enso/languageserver/search/SearchProtocol.scala index a4c4a737ab..5d636a34c7 100644 --- a/engine/language-server/src/main/scala/org/enso/languageserver/search/SearchProtocol.scala +++ b/engine/language-server/src/main/scala/org/enso/languageserver/search/SearchProtocol.scala @@ -493,6 +493,9 @@ object SearchProtocol { updates: Seq[SuggestionsDatabaseUpdate] ) + /** A request to clear the suggestions database. */ + case object ClearSuggestionsDatabase + /** The request to receive contents of the suggestions database. */ case object GetSuggestionsDatabase diff --git a/engine/language-server/src/main/scala/org/enso/languageserver/search/SuggestionsHandler.scala b/engine/language-server/src/main/scala/org/enso/languageserver/search/SuggestionsHandler.scala index 86e58510f5..ba5c85eca2 100644 --- a/engine/language-server/src/main/scala/org/enso/languageserver/search/SuggestionsHandler.scala +++ b/engine/language-server/src/main/scala/org/enso/languageserver/search/SuggestionsHandler.scala @@ -110,6 +110,8 @@ final class SuggestionsHandler( ) context.system.eventStream .subscribe(self, classOf[Api.LibraryLoaded]) + context.system.eventStream + .subscribe(self, classOf[Api.BackgroundJobsStartedNotification]) context.system.eventStream.subscribe(self, classOf[FileDeletedEvent]) context.system.eventStream .subscribe(self, InitializedEvent.SuggestionsRepoInitialized.getClass) @@ -186,13 +188,29 @@ final class SuggestionsHandler( case msg: Api.SuggestionsDatabaseSuggestionsLoadedNotification if state.isSuggestionLoadingRunning => - state.suggestionLoadingQueue.enqueue(msg) + logger.trace( + "SuggestionsDatabaseSuggestionsLoadedNotification [shouldStartBackgroundProcessing={}].", + state.shouldStartBackgroundProcessing + ) + if (state.shouldStartBackgroundProcessing) { + state.suggestionLoadingQueue.clear() + } else { + state.suggestionLoadingQueue.enqueue(msg) + } case msg: Api.SuggestionsDatabaseSuggestionsLoadedNotification => logger.debug( "Starting loading suggestions for library [{}].", msg.libraryName ) + context.become( + initialized( + projectName, + graph, + clients, + state.suggestionLoadingRunning() + ) + ) applyLoadedSuggestions(msg.suggestions) .onComplete { case Success(notification) => @@ -214,14 +232,6 @@ final class SuggestionsHandler( ) self ! SuggestionsHandler.SuggestionLoadingCompleted } - context.become( - initialized( - projectName, - graph, - clients, - state.suggestionLoadingRunning() - ) - ) case msg: Api.SuggestionsDatabaseModuleUpdateNotification if state.isSuggestionUpdatesRunning => @@ -304,33 +314,8 @@ final class SuggestionsHandler( ) ) - case GetSuggestionsDatabaseVersion => - suggestionsRepo.currentVersion - .map(GetSuggestionsDatabaseVersionResult) - .pipeTo(sender()) - - case GetSuggestionsDatabase => - val responseAction = for { - _ <- suggestionsRepo.clean - version <- suggestionsRepo.currentVersion - } yield GetSuggestionsDatabaseResult(version, Seq()) - - responseAction.pipeTo(sender()) - - val handlerAction = for { - _ <- responseAction - } yield SearchProtocol.InvalidateModulesIndex - - val handler = context.system.actorOf( - InvalidateModulesIndexHandler.props( - RuntimeFailureMapper(contentRootManager), - timeout, - runtimeConnector - ) - ) - - handlerAction.pipeTo(handler) - + case Api.BackgroundJobsStartedNotification() => + self ! SuggestionLoadingCompleted context.become( initialized( projectName, @@ -340,6 +325,57 @@ final class SuggestionsHandler( ) ) + case GetSuggestionsDatabaseVersion => + suggestionsRepo.currentVersion + .map(GetSuggestionsDatabaseVersionResult) + .pipeTo(sender()) + + case ClearSuggestionsDatabase => + if (state.isSuggestionLoadingRunning) stash() + else { + context.become( + initialized( + projectName, + graph, + clients, + state.suggestionLoadingRunning() + ) + ) + for { + _ <- suggestionsRepo.clean + } yield { + logger.trace( + "ClearSuggestionsDatabase [{}].", + state.suggestionLoadingQueue + ) + state.suggestionLoadingQueue.clear() + runtimeConnector ! Api.Request(Api.StartBackgroundProcessing()) + } + } + + case GetSuggestionsDatabase => + val handler = context.system.actorOf( + InvalidateModulesIndexHandler.props( + RuntimeFailureMapper(contentRootManager), + timeout, + runtimeConnector, + self + ) + ) + + handler ! SearchProtocol.InvalidateModulesIndex + + sender() ! GetSuggestionsDatabaseResult(0, Seq()) + + context.become( + initialized( + projectName, + graph, + clients, + state.backgroundProcessingStopped() + ) + ) + case Completion(path, pos, selfType, returnType, tags, isStatic) => val selfTypes = selfType.toList.flatMap(ty => ty :: graph.getParents(ty)) getModuleName(projectName, path) @@ -419,7 +455,8 @@ final class SuggestionsHandler( InvalidateModulesIndexHandler.props( runtimeFailureMapper, timeout, - runtimeConnector + runtimeConnector, + self ) ) action.pipeTo(handler)(sender()) @@ -450,6 +487,7 @@ final class SuggestionsHandler( ) case SuggestionLoadingCompleted => + unstashAll() if (state.suggestionLoadingQueue.nonEmpty) { self ! state.suggestionLoadingQueue.dequeue() } diff --git a/engine/language-server/src/main/scala/org/enso/languageserver/search/handler/InvalidateModulesIndexHandler.scala b/engine/language-server/src/main/scala/org/enso/languageserver/search/handler/InvalidateModulesIndexHandler.scala index 899eb958de..69e9dd2dac 100644 --- a/engine/language-server/src/main/scala/org/enso/languageserver/search/handler/InvalidateModulesIndexHandler.scala +++ b/engine/language-server/src/main/scala/org/enso/languageserver/search/handler/InvalidateModulesIndexHandler.scala @@ -17,11 +17,13 @@ import scala.concurrent.duration.FiniteDuration * @param runtimeFailureMapper mapper for runtime failures * @param timeout request timeout * @param runtime reference to the runtime connector + * @param suggestionsHandler reference to the suggestions handler */ final class InvalidateModulesIndexHandler( runtimeFailureMapper: RuntimeFailureMapper, timeout: FiniteDuration, - runtime: ActorRef + runtime: ActorRef, + suggestionsHandler: ActorRef ) extends Actor with LazyLogging with UnhandledLogging { @@ -50,6 +52,7 @@ final class InvalidateModulesIndexHandler( context.stop(self) case Api.Response(_, Api.InvalidateModulesIndexResponse()) => + suggestionsHandler ! SearchProtocol.ClearSuggestionsDatabase replyTo ! SearchProtocol.InvalidateSuggestionsDatabaseResult cancellable.cancel() context.stop(self) @@ -67,14 +70,21 @@ object InvalidateModulesIndexHandler { * * @param runtimeFailureMapper mapper for runtime failures * @param timeout request timeout - * @param runtime reference to the runtime conector + * @param runtime reference to the runtime connector + * @param suggestionsHandler reference to the suggestions handler */ def props( runtimeFailureMapper: RuntimeFailureMapper, timeout: FiniteDuration, - runtime: ActorRef + runtime: ActorRef, + suggestionsHandler: ActorRef ): Props = Props( - new InvalidateModulesIndexHandler(runtimeFailureMapper, timeout, runtime) + new InvalidateModulesIndexHandler( + runtimeFailureMapper, + timeout, + runtime, + suggestionsHandler + ) ) } diff --git a/engine/runtime-instrument-common/src/main/java/org/enso/interpreter/instrument/command/InvalidateModulesIndexCommand.java b/engine/runtime-instrument-common/src/main/java/org/enso/interpreter/instrument/command/InvalidateModulesIndexCommand.java index 7edaa27108..3e61838cac 100644 --- a/engine/runtime-instrument-common/src/main/java/org/enso/interpreter/instrument/command/InvalidateModulesIndexCommand.java +++ b/engine/runtime-instrument-common/src/main/java/org/enso/interpreter/instrument/command/InvalidateModulesIndexCommand.java @@ -5,7 +5,6 @@ import java.util.UUID; import java.util.logging.Level; import org.enso.interpreter.instrument.execution.RuntimeContext; import org.enso.interpreter.instrument.job.DeserializeLibrarySuggestionsJob; -import org.enso.interpreter.instrument.job.StartBackgroundProcessingJob; import org.enso.interpreter.runtime.EnsoContext; import org.enso.polyglot.runtime.Runtime$Api$InvalidateModulesIndexResponse; import scala.Option; @@ -33,6 +32,7 @@ public final class InvalidateModulesIndexCommand extends AsynchronousCommand { TruffleLogger logger = ctx.executionService().getLogger(); long writeCompilationLockTimestamp = ctx.locking().acquireWriteCompilationLock(); try { + ctx.jobControlPlane().stopBackgroundJobs(); ctx.jobControlPlane().abortBackgroundJobs(DeserializeLibrarySuggestionsJob.class); EnsoContext context = ctx.executionService().getContext(); @@ -48,7 +48,6 @@ public final class InvalidateModulesIndexCommand extends AsynchronousCommand { return BoxedUnit.UNIT; }); - StartBackgroundProcessingJob.startBackgroundJobs(ctx); reply(new Runtime$Api$InvalidateModulesIndexResponse(), ctx); } finally { ctx.locking().releaseWriteCompilationLock(); diff --git a/engine/runtime-instrument-common/src/main/java/org/enso/interpreter/instrument/job/BackgroundJob.java b/engine/runtime-instrument-common/src/main/java/org/enso/interpreter/instrument/job/BackgroundJob.java index 3ecac0f036..23bf01a84d 100644 --- a/engine/runtime-instrument-common/src/main/java/org/enso/interpreter/instrument/job/BackgroundJob.java +++ b/engine/runtime-instrument-common/src/main/java/org/enso/interpreter/instrument/job/BackgroundJob.java @@ -22,6 +22,17 @@ public abstract class BackgroundJob extends Job { this.priority = priority; } + /** + * Create a background job with priority. + * + * @param priority the job priority. Lower number indicates higher priority. + * @param mayInterruptIfRunning the flag indicating if the running job may be interrupted. + */ + public BackgroundJob(int priority, boolean mayInterruptIfRunning) { + super(List$.MODULE$.empty(), true, mayInterruptIfRunning); + this.priority = priority; + } + /** * @return the job priority. */ diff --git a/engine/runtime-instrument-common/src/main/scala/org/enso/interpreter/instrument/command/StartBackgroundProcessingCmd.scala b/engine/runtime-instrument-common/src/main/scala/org/enso/interpreter/instrument/command/StartBackgroundProcessingCmd.scala index 5194accc48..bde3651c8e 100644 --- a/engine/runtime-instrument-common/src/main/scala/org/enso/interpreter/instrument/command/StartBackgroundProcessingCmd.scala +++ b/engine/runtime-instrument-common/src/main/scala/org/enso/interpreter/instrument/command/StartBackgroundProcessingCmd.scala @@ -19,8 +19,7 @@ final class StartBackgroundProcessingCmd( ctx: RuntimeContext, ec: ExecutionContext ): Future[Unit] = { - StartBackgroundProcessingJob.startBackgroundJobs() - Future.successful(()) + Future(StartBackgroundProcessingJob.startBackgroundJobs()) } } diff --git a/engine/runtime-instrument-common/src/main/scala/org/enso/interpreter/instrument/job/DeserializeLibrarySuggestionsJob.scala b/engine/runtime-instrument-common/src/main/scala/org/enso/interpreter/instrument/job/DeserializeLibrarySuggestionsJob.scala index 7f9277f19b..2e1bbe7176 100644 --- a/engine/runtime-instrument-common/src/main/scala/org/enso/interpreter/instrument/job/DeserializeLibrarySuggestionsJob.scala +++ b/engine/runtime-instrument-common/src/main/scala/org/enso/interpreter/instrument/job/DeserializeLibrarySuggestionsJob.scala @@ -14,7 +14,7 @@ import scala.jdk.CollectionConverters._ */ final class DeserializeLibrarySuggestionsJob( val libraryName: LibraryName -) extends BackgroundJob[Unit](DeserializeLibrarySuggestionsJob.Priority) +) extends BackgroundJob[Unit](DeserializeLibrarySuggestionsJob.Priority, true) with UniqueJob[Unit] { /** @inheritdoc */ diff --git a/lib/scala/searcher/src/main/scala/org/enso/searcher/sql/SqlSuggestionsRepo.scala b/lib/scala/searcher/src/main/scala/org/enso/searcher/sql/SqlSuggestionsRepo.scala index 62d9faae18..457e8b7ad2 100644 --- a/lib/scala/searcher/src/main/scala/org/enso/searcher/sql/SqlSuggestionsRepo.scala +++ b/lib/scala/searcher/src/main/scala/org/enso/searcher/sql/SqlSuggestionsRepo.scala @@ -175,10 +175,7 @@ final class SqlSuggestionsRepo(val db: SqlDatabase)(implicit /** The query to clean the repo. */ private def cleanQuery: DBIO[Unit] = { - for { - _ <- Suggestions.delete - _ <- SuggestionsVersion.delete - } yield () + DBIO.seq(Suggestions.delete, SuggestionsVersion.delete) } /** The query to get all suggestions.