Wait for Suggestions Database Initialization (#1025)

This commit is contained in:
Dmitry Bushev 2020-07-22 17:12:52 +03:00 committed by GitHub
parent cf79d99f3f
commit 45d75536aa
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
27 changed files with 201 additions and 78 deletions

View File

@ -35,16 +35,29 @@ class LanguageServerComponent(config: LanguageServerConfig)
/** @inheritdoc */
override def start(): Future[ComponentStarted.type] = {
logger.info("Starting Language Server...")
val module = new MainModule(config)
val initMainModule =
for {
_ <- module.init
_ <- Future { logger.debug("Main module initialized") }
} yield ()
val bindJsonServer =
for {
binding <- module.jsonRpcServer.bind(config.interface, config.rpcPort)
_ <- Future { logger.debug("Json RPC server initialized") }
} yield binding
val bindBinaryServer =
for {
binding <- module.binaryServer.bind(config.interface, config.dataPort)
_ <- Future { logger.debug("Binary server initialized") }
} yield binding
for {
module <- Future { new MainModule(config) }
_ <- Future { logger.debug("MainModule created") }
jsonBinding <- module.jsonRpcServer.bind(config.interface, config.rpcPort)
binaryBinding <-
module.binaryServer
.bind(config.interface, config.dataPort)
jsonBinding <- bindJsonServer
binaryBinding <- bindBinaryServer
_ <- Future {
maybeServerCtx = Some(ServerContext(module, jsonBinding, binaryBinding))
}
_ <- initMainModule
_ <- Future {
logger.info(
s"Started server at json:${config.interface}:${config.rpcPort}, " +

View File

@ -8,6 +8,7 @@ import org.enso.jsonrpc.JsonRpcServer
import org.enso.languageserver.capability.CapabilityRouter
import org.enso.languageserver.data._
import org.enso.languageserver.effect.ZioExec
import org.enso.languageserver.event.InitializedEvent
import org.enso.languageserver.filemanager.{
FileManager,
FileSystem,
@ -24,6 +25,7 @@ import org.enso.languageserver.protocol.json.{
JsonRpc
}
import org.enso.languageserver.runtime._
import org.enso.languageserver.search.SuggestionsHandler
import org.enso.languageserver.session.SessionRouter
import org.enso.languageserver.text.BufferRegistry
import org.enso.languageserver.util.binary.BinaryEncoder
@ -32,7 +34,9 @@ import org.enso.searcher.sql.{SqlDatabase, SqlSuggestionsRepo, SqlVersionsRepo}
import org.graalvm.polyglot.Context
import org.graalvm.polyglot.io.MessageEndpoint
import scala.concurrent.Future
import scala.concurrent.duration._
import scala.util.{Failure, Success}
/**
* A main module containing all components of the server.
@ -64,21 +68,10 @@ class MainModule(serverConfig: LanguageServerConfig) {
Some(serverConfig.computeExecutionContext)
)
val sqlDatabase = SqlDatabase(
languageServerConfig.directories.suggestionsDatabaseFile
)
system.log.debug("Sql database created")
val suggestionsRepo = {
val repo = new SqlSuggestionsRepo(sqlDatabase)(system.dispatcher)
repo.init
repo
}
val versionsRepo = {
val repo = new SqlVersionsRepo(sqlDatabase)(system.dispatcher)
repo.init
repo
}
val sqlDatabase =
SqlDatabase(languageServerConfig.directories.suggestionsDatabaseFile)
val suggestionsRepo = new SqlSuggestionsRepo(sqlDatabase)(system.dispatcher)
val versionsRepo = new SqlVersionsRepo(sqlDatabase)(system.dispatcher)
system.log.debug("Sql repos created")
lazy val sessionRouter =
@ -215,6 +208,31 @@ class MainModule(serverConfig: LanguageServerConfig) {
new BinaryConnectionControllerFactory(fileManager)
)
/** Initialize the module. */
def init: Future[Unit] = {
import system.dispatcher
val suggestionsRepoInit = suggestionsRepo.init
suggestionsRepoInit.onComplete {
case Success(()) =>
system.eventStream.publish(InitializedEvent.SuggestionsRepoInitialized)
case Failure(ex) =>
system.log.error(ex, "Failed to initialize SQL suggestions repo")
}
val versionsRepoInit = versionsRepo.init
versionsRepoInit.onComplete {
case Success(()) =>
system.eventStream.publish(InitializedEvent.FileVersionsRepoInitialized)
case Failure(ex) =>
system.log.error(ex, "Failed to initialize SQL versions repo")
}(system.dispatcher)
Future
.sequence(Seq(suggestionsRepoInit, versionsRepoInit))
.map(_ => ())
}
/** Close the main module releasing all resources. */
def close(): Unit = {
suggestionsRepo.close()

View File

@ -0,0 +1,10 @@
package org.enso.languageserver.event
/** Event about the initialization of the language server component. */
sealed trait InitializedEvent extends Event
object InitializedEvent {
case object SuggestionsRepoInitialized extends InitializedEvent
case object FileVersionsRepoInitialized extends InitializedEvent
}

View File

@ -35,13 +35,9 @@ import org.enso.languageserver.requesthandler.visualisation.{
DetachVisualisationHandler,
ModifyVisualisationHandler
}
import org.enso.languageserver.runtime.{
ContextRegistryProtocol,
SearchApi,
SearchProtocol
}
import org.enso.languageserver.runtime.ContextRegistryProtocol
import org.enso.languageserver.runtime.ExecutionApi._
import org.enso.languageserver.runtime.SearchApi.{
import org.enso.languageserver.search.SearchApi.{
Completion,
GetSuggestionsDatabase,
GetSuggestionsDatabaseVersion
@ -51,6 +47,7 @@ import org.enso.languageserver.runtime.VisualisationApi.{
DetachVisualisation,
ModifyVisualisation
}
import org.enso.languageserver.search.{SearchApi, SearchProtocol}
import org.enso.languageserver.session.JsonSession
import org.enso.languageserver.session.SessionApi.{
InitProtocolConnection,

View File

@ -13,7 +13,7 @@ import org.enso.languageserver.io.InputOutputApi._
import org.enso.languageserver.monitoring.MonitoringApi.Ping
import org.enso.languageserver.refactoring.RefactoringApi.RenameProject
import org.enso.languageserver.runtime.ExecutionApi._
import org.enso.languageserver.runtime.SearchApi._
import org.enso.languageserver.search.SearchApi._
import org.enso.languageserver.runtime.VisualisationApi._
import org.enso.languageserver.session.SessionApi.InitProtocolConnection
import org.enso.languageserver.text.TextApi._

View File

@ -4,11 +4,11 @@ import akka.actor.{Actor, ActorLogging, ActorRef, Cancellable, Props, Status}
import org.enso.jsonrpc.Errors.ServiceError
import org.enso.jsonrpc._
import org.enso.languageserver.requesthandler.RequestTimeout
import org.enso.languageserver.runtime.SearchApi.{
import org.enso.languageserver.search.SearchApi.{
Completion,
SuggestionsDatabaseError
}
import org.enso.languageserver.runtime.{SearchFailureMapper, SearchProtocol}
import org.enso.languageserver.search.{SearchFailureMapper, SearchProtocol}
import org.enso.languageserver.util.UnhandledLogging
import scala.concurrent.duration.FiniteDuration

View File

@ -4,11 +4,11 @@ import akka.actor.{Actor, ActorLogging, ActorRef, Cancellable, Props, Status}
import org.enso.jsonrpc.Errors.ServiceError
import org.enso.jsonrpc._
import org.enso.languageserver.requesthandler.RequestTimeout
import org.enso.languageserver.runtime.SearchApi.{
import org.enso.languageserver.search.SearchApi.{
GetSuggestionsDatabase,
SuggestionsDatabaseError
}
import org.enso.languageserver.runtime.{SearchFailureMapper, SearchProtocol}
import org.enso.languageserver.search.{SearchFailureMapper, SearchProtocol}
import org.enso.languageserver.util.UnhandledLogging
import scala.concurrent.duration.FiniteDuration

View File

@ -4,11 +4,11 @@ import akka.actor.{Actor, ActorLogging, ActorRef, Cancellable, Props, Status}
import org.enso.jsonrpc.Errors.ServiceError
import org.enso.jsonrpc._
import org.enso.languageserver.requesthandler.RequestTimeout
import org.enso.languageserver.runtime.SearchApi.{
import org.enso.languageserver.search.SearchApi.{
GetSuggestionsDatabaseVersion,
SuggestionsDatabaseError
}
import org.enso.languageserver.runtime.{SearchFailureMapper, SearchProtocol}
import org.enso.languageserver.search.{SearchFailureMapper, SearchProtocol}
import org.enso.languageserver.util.UnhandledLogging
import scala.concurrent.duration.FiniteDuration

View File

@ -1,4 +1,4 @@
package org.enso.languageserver.runtime
package org.enso.languageserver.search
import java.nio.file.Path

View File

@ -1,8 +1,8 @@
package org.enso.languageserver.runtime
package org.enso.languageserver.search
import org.enso.jsonrpc.{Error, HasParams, HasResult, Method, Unused}
import org.enso.languageserver.filemanager.Path
import org.enso.languageserver.runtime.SearchProtocol.{
import org.enso.languageserver.search.SearchProtocol.{
SuggestionDatabaseEntry,
SuggestionId,
SuggestionKind,

View File

@ -1,8 +1,8 @@
package org.enso.languageserver.runtime
package org.enso.languageserver.search
import org.enso.jsonrpc.Error
import org.enso.languageserver.filemanager.FileSystemFailureMapper
import org.enso.languageserver.runtime.SearchProtocol.{
import org.enso.languageserver.search.SearchProtocol.{
FileSystemError,
ModuleNameNotResolvedError,
ProjectNotFoundError,

View File

@ -1,4 +1,4 @@
package org.enso.languageserver.runtime
package org.enso.languageserver.search
import enumeratum._
import io.circe.generic.auto._

View File

@ -1,6 +1,6 @@
package org.enso.languageserver.runtime
package org.enso.languageserver.search
import akka.actor.{Actor, ActorLogging, ActorRef, Props}
import akka.actor.{Actor, ActorLogging, ActorRef, Props, Stash}
import akka.pattern.pipe
import org.enso.languageserver.capability.CapabilityProtocol.{
AcquireCapability,
@ -14,9 +14,10 @@ import org.enso.languageserver.data.{
Config,
ReceivesSuggestionsDatabaseUpdates
}
import org.enso.languageserver.event.InitializedEvent
import org.enso.languageserver.filemanager.{FileDeletedEvent, Path}
import org.enso.languageserver.refactoring.ProjectNameChangedEvent
import org.enso.languageserver.runtime.SearchProtocol._
import org.enso.languageserver.search.SearchProtocol._
import org.enso.languageserver.session.SessionRouter.DeliverToJsonController
import org.enso.languageserver.util.UnhandledLogging
import org.enso.pkg.PackageManager
@ -70,6 +71,7 @@ final class SuggestionsHandler(
repo: SuggestionsRepo[Future],
sessionRouter: ActorRef
) extends Actor
with Stash
with ActorLogging
with UnhandledLogging {
@ -84,6 +86,8 @@ final class SuggestionsHandler(
.subscribe(self, classOf[Api.SuggestionsDatabaseReIndexNotification])
context.system.eventStream.subscribe(self, classOf[ProjectNameChangedEvent])
context.system.eventStream.subscribe(self, classOf[FileDeletedEvent])
context.system.eventStream
.subscribe(self, InitializedEvent.SuggestionsRepoInitialized.getClass)
config.contentRoots.foreach {
case (_, contentRoot) =>
@ -93,13 +97,19 @@ final class SuggestionsHandler(
}
}
override def receive: Receive = {
case ProjectNameChangedEvent(name) =>
context.become(initialized(name, Set()))
override def receive: Receive =
initializing(SuggestionsHandler.Initialization())
case msg =>
log.warning("Unhandled message: {}", msg)
sender() ! ProjectNotFoundError
def initializing(init: SuggestionsHandler.Initialization): Receive = {
case ProjectNameChangedEvent(name) =>
tryInitialize(init.copy(project = Some(name)))
case InitializedEvent.SuggestionsRepoInitialized =>
tryInitialize(
init.copy(suggestions =
Some(InitializedEvent.SuggestionsRepoInitialized)
)
)
case _ => stash()
}
def initialized(projectName: String, clients: Set[ClientId]): Receive = {
@ -239,6 +249,18 @@ final class SuggestionsHandler(
context.become(initialized(name, clients))
}
/**
* Transition the initialization process.
*
* @param state current initialization state
*/
private def tryInitialize(state: SuggestionsHandler.Initialization): Unit = {
state.initialized.fold(context.become(initializing(state))) { name =>
context.become(initialized(name, Set()))
unstashAll()
}
}
/**
* Handle the suggestions database re-index update.
*
@ -341,6 +363,29 @@ final class SuggestionsHandler(
object SuggestionsHandler {
/**
* The initialization state of the handler.
*
* @param project the project name
* @param suggestions the initialization event of the suggestions repo
*/
private case class Initialization(
project: Option[String] = None,
suggestions: Option[InitializedEvent.SuggestionsRepoInitialized.type] = None
) {
/**
* Check if all the components are initialized.
*
* @return the project name
*/
def initialized: Option[String] =
for {
_ <- suggestions
name <- project
} yield name
}
/**
* Creates a configuration object used to create a [[SuggestionsHandler]].
*

View File

@ -3,3 +3,4 @@ akka.http.server.remote-address-header = on
akka.http.server.websocket.periodic-keep-alive-max-idle = 1 second
akka.loglevel = "ERROR"
akka.test.timefactor = ${?CI_TEST_TIMEFACTOR}
searcher.db.numThreads = 1

View File

@ -1,4 +1,4 @@
package org.enso.languageserver.runtime
package org.enso.languageserver.search
import java.util.UUID

View File

@ -1,4 +1,4 @@
package org.enso.languageserver.runtime
package org.enso.languageserver.search
import java.io.File
import java.nio.file.Files
@ -12,9 +12,10 @@ import org.enso.languageserver.capability.CapabilityProtocol.{
CapabilityAcquired
}
import org.enso.languageserver.data._
import org.enso.languageserver.event.InitializedEvent
import org.enso.languageserver.filemanager.Path
import org.enso.languageserver.refactoring.ProjectNameChangedEvent
import org.enso.languageserver.runtime.SearchProtocol.SuggestionDatabaseEntry
import org.enso.languageserver.search.SearchProtocol.SuggestionDatabaseEntry
import org.enso.languageserver.session.JsonSession
import org.enso.languageserver.session.SessionRouter.DeliverToJsonController
import org.enso.polyglot.runtime.Runtime.Api
@ -28,6 +29,7 @@ import org.scalatest.wordspec.AnyWordSpecLike
import scala.concurrent.duration._
import scala.concurrent.{Await, Future}
import scala.util.{Failure, Success}
class SuggestionsHandlerSpec
extends TestKit(ActorSystem("TestSystem"))
@ -258,7 +260,12 @@ class SuggestionsHandlerSpec
val router = TestProbe("session-router")
val repo = SqlSuggestionsRepo(config.directories.suggestionsDatabaseFile)
val handler = newSuggestionsHandler(config, router, repo)
Await.ready(repo.init, Timeout)
repo.init.onComplete {
case Success(()) =>
system.eventStream.publish(InitializedEvent.SuggestionsRepoInitialized)
case Failure(ex) =>
system.log.error(ex, "Failed to initialize Suggestions repo")
}
try test(config, repo, router, handler)
finally {

View File

@ -11,6 +11,7 @@ import org.enso.jsonrpc.{ClientControllerFactory, Protocol}
import org.enso.languageserver.capability.CapabilityRouter
import org.enso.languageserver.data._
import org.enso.languageserver.effect.ZioExec
import org.enso.languageserver.event.InitializedEvent
import org.enso.languageserver.filemanager.{
FileManager,
FileSystem,
@ -21,17 +22,19 @@ import org.enso.languageserver.protocol.json.{
JsonConnectionControllerFactory,
JsonRpc
}
import org.enso.languageserver.runtime.{ContextRegistry, SuggestionsHandler}
import org.enso.languageserver.runtime.ContextRegistry
import org.enso.languageserver.search.SuggestionsHandler
import org.enso.languageserver.session.SessionRouter
import org.enso.languageserver.text.BufferRegistry
import org.enso.searcher.sql.{SqlDatabase, SqlSuggestionsRepo, SqlVersionsRepo}
import scala.concurrent.Await
import scala.concurrent.duration._
import scala.util.{Failure, Success}
class BaseServerTest extends JsonRpcServerTestKit {
val timeout: FiniteDuration = 3.seconds
val timeout: FiniteDuration = 10.seconds
val testContentRoot = Files.createTempDirectory(null).toRealPath()
val testContentRootId = UUID.randomUUID()
@ -81,8 +84,6 @@ class BaseServerTest extends JsonRpcServerTestKit {
val sqlDatabase = SqlDatabase(config.directories.suggestionsDatabaseFile)
val suggestionsRepo = new SqlSuggestionsRepo(sqlDatabase)(system.dispatcher)
val versionsRepo = new SqlVersionsRepo(sqlDatabase)(system.dispatcher)
Await.ready(suggestionsRepo.init, timeout)
Await.ready(versionsRepo.init, timeout)
val fileManager =
system.actorOf(FileManager.props(config, new FileSystem, zioExec))
@ -120,6 +121,26 @@ class BaseServerTest extends JsonRpcServerTestKit {
)
)
// initialize
val suggestionsRepoInit = suggestionsRepo.init
suggestionsRepoInit.onComplete {
case Success(()) =>
system.eventStream.publish(InitializedEvent.SuggestionsRepoInitialized)
case Failure(ex) =>
system.log.error(ex, "Failed to initialize Suggestions repo")
}(system.dispatcher)
val versionsRepoInit = versionsRepo.init
versionsRepoInit.onComplete {
case Success(()) =>
system.eventStream.publish(InitializedEvent.FileVersionsRepoInitialized)
case Failure(ex) =>
system.log.error(ex, "Failed to initialize FileVersions repo")
}(system.dispatcher)
Await.ready(suggestionsRepoInit, timeout)
Await.ready(versionsRepoInit, timeout)
new JsonConnectionControllerFactory(
bufferRegistry,
capabilityRouter,

View File

@ -2,7 +2,7 @@ package org.enso.languageserver.websocket.json
import io.circe.literal._
import org.enso.languageserver.refactoring.ProjectNameChangedEvent
import org.enso.languageserver.runtime.Suggestions
import org.enso.languageserver.search.Suggestions
import org.enso.languageserver.websocket.json.{SearchJsonMessages => json}
import org.enso.polyglot.runtime.Runtime.Api
import org.enso.testkit.FlakySpec

View File

@ -10,19 +10,11 @@ class SuggestionsHandlerTest extends BaseServerTest with FlakySpec {
"SuggestionsHandler" must {
"reply with error when uninitialized" in {
"stash messages when initializing" in {
val client = getInitialisedWsClient()
client.send(json.getSuggestionsDatabaseVersion(0))
client.expectJson(json"""
{ "jsonrpc" : "2.0",
"id" : 0,
"error" : {
"code" : 7002,
"message" : "Project not found in the root directory"
}
}
""")
client.expectNoMessage()
}
"get initial suggestions database version" in {

View File

@ -1397,7 +1397,7 @@ class RuntimeServerTest
)
context.pkg.rename("Foo")
context.send(Api.Request(requestId, Api.RenameProject("Test", "Foo")))
context.receive(1) should contain theSameElementsAs Seq(
context.receive(2) should contain theSameElementsAs Seq(
Api.Response(requestId, Api.ProjectRenamed("Foo"))
)
@ -1406,7 +1406,7 @@ class RuntimeServerTest
Api.Request(requestId, Api.RecomputeContextRequest(contextId, None))
)
context.receive(1) should contain(
context.receive(2) should contain(
Api.Response(requestId, Api.RecomputeContextResponse(contextId))
)
@ -1420,7 +1420,7 @@ class RuntimeServerTest
)
)
)
context.receive(5) should contain theSameElementsAs Seq(
context.receive(4) should contain theSameElementsAs Seq(
Api.Response(requestId, Api.RecomputeContextResponse(contextId)),
context.Main.Update.mainX(contextId),
context.Main.Update.mainY(contextId),

View File

@ -13,4 +13,4 @@
<root level="INFO">
<appender-ref ref="STDOUT" />
</root>
</configuration>
</configuration>

View File

@ -57,3 +57,4 @@ akka.http.server.remote-address-header = on
akka.http.server.websocket.periodic-keep-alive-max-idle = 1 second
akka.loglevel = "ERROR"
akka.test.timefactor = ${?CI_TEST_TIMEFACTOR}
searcher.db.numThreads = 1

View File

@ -5,6 +5,7 @@ import java.util.UUID
import org.enso.polyglot.Suggestion
import org.enso.searcher.{SuggestionEntry, SuggestionsRepo}
import slick.jdbc.SQLiteProfile
import slick.jdbc.SQLiteProfile.api._
import scala.concurrent.{ExecutionContext, Future}
@ -99,8 +100,14 @@ final class SqlSuggestionsRepo(db: SqlDatabase)(implicit ec: ExecutionContext)
db.run(insertBatchQuery(suggestions))
/** The query to initialize the repo. */
private def initQuery: DBIO[Unit] =
(Suggestions.schema ++ Arguments.schema ++ SuggestionsVersions.schema).createIfNotExists
private def initQuery: DBIO[Unit] = {
// Initialize schema suppressing errors. Workaround for slick/slick#1999.
def initSchema(schema: SQLiteProfile.SchemaDescription) =
schema.createIfNotExists.asTry >> DBIO.successful(())
val schemas =
Seq(Suggestions.schema, Arguments.schema, SuggestionsVersions.schema)
DBIO.sequence(schemas.map(initSchema)) >> DBIO.successful(())
}
/** The query to clean the repo. */
private def cleanQuery: DBIO[Unit] =

View File

@ -38,8 +38,10 @@ final class SqlVersionsRepo(db: SqlDatabase)(implicit ec: ExecutionContext)
db.close()
/** The query to initialize the repo. */
private def initQuery: DBIO[Unit] =
FileVersions.schema.createIfNotExists
private def initQuery: DBIO[Unit] = {
// Initialize schema suppressing errors. Workaround for slick/slick#1999.
FileVersions.schema.createIfNotExists.asTry >> DBIO.successful(())
}
/** The query to clean the repo. */
private def cleanQuery: DBIO[Unit] =

View File

@ -0,0 +1 @@
searcher.db.numThreads = 1

View File

@ -15,7 +15,7 @@ import scala.util.Random
class FileVersionsRepoTest extends AnyWordSpec with Matchers with RetrySpec {
val Timeout: FiniteDuration = 5.seconds
val Timeout: FiniteDuration = 20.seconds
val tmpdir: Path = {
val tmp = Files.createTempDirectory("versions-repo-test")
@ -44,6 +44,10 @@ class FileVersionsRepoTest extends AnyWordSpec with Matchers with RetrySpec {
"FileVersionsRepo" should {
"init idempotent" in withRepo { repo =>
Await.result(repo.init, Timeout)
}
"insert digest" taggedAs Retry in withRepo { repo =>
val file = new File("/foo/bar")
val digest = nextDigest()

View File

@ -14,7 +14,7 @@ import scala.concurrent.duration._
class SuggestionsRepoTest extends AnyWordSpec with Matchers with RetrySpec {
val Timeout: FiniteDuration = 10.seconds
val Timeout: FiniteDuration = 20.seconds
val tmpdir: Path = {
val tmp = Files.createTempDirectory("suggestions-repo-test")
@ -40,6 +40,10 @@ class SuggestionsRepoTest extends AnyWordSpec with Matchers with RetrySpec {
"SuggestionsRepo" should {
"init idempotent" taggedAs Retry in withRepo { repo =>
Await.result(repo.init, Timeout)
}
"get all suggestions" taggedAs Retry in withRepo { repo =>
val action =
for {