mirror of
https://github.com/enso-org/enso.git
synced 2024-11-23 08:08:34 +03:00
Add a schema version to the suggestions database (#1703)
This commit is contained in:
parent
3080d8f6f7
commit
ff54c07431
@ -9,10 +9,10 @@ import org.enso.languageserver.boot.resource.{
|
||||
TruffleContextInitialization
|
||||
}
|
||||
import org.enso.languageserver.data.DirectoriesConfig
|
||||
import org.enso.searcher.{FileVersionsRepo, SuggestionsRepo}
|
||||
import org.enso.searcher.sql.{SqlSuggestionsRepo, SqlVersionsRepo}
|
||||
import org.graalvm.polyglot.Context
|
||||
|
||||
import scala.concurrent.{ExecutionContext, Future}
|
||||
import scala.concurrent.ExecutionContext
|
||||
|
||||
/** Helper object for the initialization of the Language Server resources.
|
||||
* Creates the directories, initializes the databases, and the Truffle context.
|
||||
@ -31,13 +31,18 @@ object ResourcesInitialization {
|
||||
def apply(
|
||||
eventStream: EventStream,
|
||||
directoriesConfig: DirectoriesConfig,
|
||||
suggestionsRepo: SuggestionsRepo[Future],
|
||||
versionsRepo: FileVersionsRepo[Future],
|
||||
suggestionsRepo: SqlSuggestionsRepo,
|
||||
versionsRepo: SqlVersionsRepo,
|
||||
truffleContext: Context
|
||||
)(implicit ec: ExecutionContext): InitializationComponent = {
|
||||
val resources = Seq(
|
||||
new DirectoriesInitialization(directoriesConfig),
|
||||
new RepoInitialization(eventStream, suggestionsRepo, versionsRepo),
|
||||
new RepoInitialization(
|
||||
directoriesConfig,
|
||||
eventStream,
|
||||
suggestionsRepo,
|
||||
versionsRepo
|
||||
),
|
||||
new TruffleContextInitialization(eventStream, truffleContext)
|
||||
)
|
||||
new SequentialResourcesInitialization(resources)
|
||||
|
@ -1,23 +1,31 @@
|
||||
package org.enso.languageserver.boot.resource
|
||||
|
||||
import java.io.IOException
|
||||
import java.nio.file.{FileSystemException, Files, NoSuchFileException}
|
||||
|
||||
import akka.event.EventStream
|
||||
import org.apache.commons.io.FileUtils
|
||||
import org.enso.languageserver.data.DirectoriesConfig
|
||||
import org.enso.languageserver.event.InitializedEvent
|
||||
import org.enso.searcher.{FileVersionsRepo, SuggestionsRepo}
|
||||
import org.enso.searcher.sql.{SqlDatabase, SqlSuggestionsRepo, SqlVersionsRepo}
|
||||
import org.slf4j.LoggerFactory
|
||||
|
||||
import scala.concurrent.{ExecutionContext, Future}
|
||||
import scala.util.control.NonFatal
|
||||
import scala.util.{Failure, Success}
|
||||
|
||||
/** Initialization of the Language Server repositories.
|
||||
*
|
||||
* @param directoriesConfig configuration of language server directories
|
||||
* @param eventStream akka events stream
|
||||
* @param suggestionsRepo the suggestions repo
|
||||
* @param versionsRepo the versions repo
|
||||
*/
|
||||
class RepoInitialization(
|
||||
directoriesConfig: DirectoriesConfig,
|
||||
eventStream: EventStream,
|
||||
suggestionsRepo: SuggestionsRepo[Future],
|
||||
versionsRepo: FileVersionsRepo[Future]
|
||||
suggestionsRepo: SqlSuggestionsRepo,
|
||||
versionsRepo: SqlVersionsRepo
|
||||
)(implicit ec: ExecutionContext)
|
||||
extends InitializationComponent {
|
||||
|
||||
@ -26,31 +34,131 @@ class RepoInitialization(
|
||||
/** @inheritdoc */
|
||||
override def init(): Future[InitializationComponent.Initialized.type] =
|
||||
for {
|
||||
_ <- Future.sequence(Seq(suggestionsRepoInit, versionsRepoInit))
|
||||
_ <- suggestionsRepoInit
|
||||
_ <- versionsRepoInit
|
||||
} yield InitializationComponent.Initialized
|
||||
|
||||
private def suggestionsRepoInit: Future[Unit] = {
|
||||
val initAction = suggestionsRepo.init
|
||||
val initAction =
|
||||
for {
|
||||
_ <- Future {
|
||||
log.info(
|
||||
s"Initializing suggestions repo " +
|
||||
s"${directoriesConfig.suggestionsDatabaseFile}."
|
||||
)
|
||||
}
|
||||
_ <- suggestionsRepo.init.recoverWith { case NonFatal(error) =>
|
||||
recoverInitError(error, suggestionsRepo.db)
|
||||
}
|
||||
_ <- Future {
|
||||
log.info(
|
||||
s"Initialized Suggestions repo " +
|
||||
s"${directoriesConfig.suggestionsDatabaseFile}."
|
||||
)
|
||||
}
|
||||
} yield ()
|
||||
initAction.onComplete {
|
||||
case Success(()) =>
|
||||
eventStream.publish(InitializedEvent.SuggestionsRepoInitialized)
|
||||
case Failure(ex) =>
|
||||
log.error("Failed to initialize SQL suggestions repo", ex)
|
||||
log.error(
|
||||
s"Failed to initialize SQL suggestions repo " +
|
||||
s"${directoriesConfig.suggestionsDatabaseFile}. " +
|
||||
s"${ex.getMessage}"
|
||||
)
|
||||
}
|
||||
log.info("Initialized Suggestions repo.")
|
||||
initAction
|
||||
}
|
||||
|
||||
private def versionsRepoInit: Future[Unit] = {
|
||||
val initAction = versionsRepo.init
|
||||
val initAction =
|
||||
for {
|
||||
_ <- Future {
|
||||
log.info(
|
||||
s"Initializing versions repo " +
|
||||
s"${directoriesConfig.suggestionsDatabaseFile}."
|
||||
)
|
||||
}
|
||||
_ <- versionsRepo.init
|
||||
_ <- Future {
|
||||
log.info(
|
||||
s"Initialized Versions repo " +
|
||||
s"${directoriesConfig.suggestionsDatabaseFile}."
|
||||
)
|
||||
}
|
||||
} yield ()
|
||||
initAction.onComplete {
|
||||
case Success(()) =>
|
||||
eventStream.publish(InitializedEvent.FileVersionsRepoInitialized)
|
||||
case Failure(ex) =>
|
||||
log.error("Failed to initialize versions repo", ex)
|
||||
log.error(
|
||||
s"Failed to initialize SQL versions repo " +
|
||||
s"${directoriesConfig.suggestionsDatabaseFile}. " +
|
||||
s"${ex.getMessage}"
|
||||
)
|
||||
}
|
||||
log.info("Initialized Versions repo.")
|
||||
initAction
|
||||
}
|
||||
|
||||
private def recoverInitError(
|
||||
error: Throwable,
|
||||
db: SqlDatabase
|
||||
): Future[Unit] =
|
||||
for {
|
||||
_ <- Future {
|
||||
log.warn(
|
||||
s"Failed to initialize the suggestions database " +
|
||||
s"${directoriesConfig.suggestionsDatabaseFile}. " +
|
||||
s"${error.getMessage}"
|
||||
)
|
||||
}
|
||||
_ <- Future(db.close())
|
||||
_ <- clearDatabaseFile()
|
||||
_ <- Future(db.open())
|
||||
_ <- Future {
|
||||
log.info("Retrying database initialization.")
|
||||
}
|
||||
_ <- suggestionsRepo.init
|
||||
} yield ()
|
||||
|
||||
private def clearDatabaseFile(retries: Int = 0): Future[Unit] = {
|
||||
Future {
|
||||
log.info(s"Clear database file. Attempt #${retries + 1}.")
|
||||
Files.delete(directoriesConfig.suggestionsDatabaseFile.toPath)
|
||||
}.recoverWith {
|
||||
case _: NoSuchFileException =>
|
||||
log.warn(
|
||||
s"Failed to delete the database file. Attempt #${retries + 1}. " +
|
||||
s"File does not exist " +
|
||||
s"${directoriesConfig.suggestionsDatabaseFile}"
|
||||
)
|
||||
Future.successful(())
|
||||
case error: FileSystemException =>
|
||||
log.error(
|
||||
s"Failed to delete the database file. Attempt #${retries + 1}." +
|
||||
s"The file will be removed during the shutdown. ${error.getMessage}."
|
||||
)
|
||||
sys.addShutdownHook(
|
||||
FileUtils.deleteQuietly(directoriesConfig.suggestionsDatabaseFile)
|
||||
)
|
||||
Future.failed(error)
|
||||
case error: IOException =>
|
||||
log.error(
|
||||
s"Failed to delete the database file. Attempt #${retries + 1}. " +
|
||||
s"${error.getMessage}"
|
||||
)
|
||||
if (retries < RepoInitialization.MaxRetries) {
|
||||
Thread.sleep(1000)
|
||||
clearDatabaseFile(retries + 1)
|
||||
} else {
|
||||
Future.failed(error)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
object RepoInitialization {
|
||||
|
||||
val MaxRetries = 3
|
||||
}
|
||||
|
@ -402,6 +402,7 @@ final class SuggestionsHandler(
|
||||
}
|
||||
.pipeTo(self)
|
||||
context.become(verifying(name, graph))
|
||||
unstashAll()
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1,6 +1,6 @@
|
||||
package org.enso.languageserver.text
|
||||
|
||||
import akka.actor.{Actor, ActorLogging, ActorRef, Props, Terminated}
|
||||
import akka.actor.{Actor, ActorLogging, ActorRef, Props, Stash, Terminated}
|
||||
import org.enso.languageserver.capability.CapabilityProtocol.{
|
||||
AcquireCapability,
|
||||
CapabilityAcquisitionBadRequest,
|
||||
@ -8,6 +8,7 @@ import org.enso.languageserver.capability.CapabilityProtocol.{
|
||||
ReleaseCapability
|
||||
}
|
||||
import org.enso.languageserver.data.{CanEdit, CapabilityRegistration}
|
||||
import org.enso.languageserver.event.InitializedEvent
|
||||
import org.enso.languageserver.filemanager.Path
|
||||
import org.enso.languageserver.monitoring.MonitoringProtocol.{Ping, Pong}
|
||||
import org.enso.languageserver.util.UnhandledLogging
|
||||
@ -66,10 +67,27 @@ class BufferRegistry(
|
||||
)(implicit
|
||||
versionCalculator: ContentBasedVersioning
|
||||
) extends Actor
|
||||
with Stash
|
||||
with ActorLogging
|
||||
with UnhandledLogging {
|
||||
|
||||
override def receive: Receive = running(Map.empty)
|
||||
override def preStart(): Unit = {
|
||||
log.info("Starting initialization.")
|
||||
context.system.eventStream
|
||||
.subscribe(self, InitializedEvent.FileVersionsRepoInitialized.getClass)
|
||||
}
|
||||
|
||||
override def receive: Receive = initializing
|
||||
|
||||
private def initializing: Receive = {
|
||||
case InitializedEvent.FileVersionsRepoInitialized =>
|
||||
log.info("Initiaized.")
|
||||
context.become(running(Map.empty))
|
||||
unstashAll()
|
||||
|
||||
case _ =>
|
||||
stash()
|
||||
}
|
||||
|
||||
private def running(registry: Map[Path, ActorRef]): Receive = {
|
||||
case Ping =>
|
||||
|
@ -0,0 +1,254 @@
|
||||
package org.enso.languageserver.boot.resource
|
||||
|
||||
import java.io.File
|
||||
import java.nio.file.{Files, StandardOpenOption}
|
||||
import java.util.UUID
|
||||
|
||||
import akka.actor.ActorSystem
|
||||
import akka.testkit._
|
||||
import org.apache.commons.io.FileUtils
|
||||
import org.enso.languageserver.data._
|
||||
import org.enso.languageserver.event.InitializedEvent
|
||||
import org.enso.searcher.sql.{
|
||||
SchemaVersion,
|
||||
SqlDatabase,
|
||||
SqlSuggestionsRepo,
|
||||
SqlVersionsRepo
|
||||
}
|
||||
import org.enso.testkit.FlakySpec
|
||||
import org.scalatest.BeforeAndAfterAll
|
||||
import org.scalatest.matchers.should.Matchers
|
||||
import org.scalatest.wordspec.AnyWordSpecLike
|
||||
import org.sqlite.SQLiteException
|
||||
|
||||
import scala.concurrent.Await
|
||||
import scala.concurrent.duration._
|
||||
|
||||
class RepoInitializationSpec
|
||||
extends TestKit(ActorSystem("TestSystem"))
|
||||
with ImplicitSender
|
||||
with AnyWordSpecLike
|
||||
with Matchers
|
||||
with BeforeAndAfterAll
|
||||
with FlakySpec {
|
||||
|
||||
import system.dispatcher
|
||||
|
||||
val Timeout: FiniteDuration = 10.seconds.dilated
|
||||
|
||||
override def afterAll(): Unit = {
|
||||
TestKit.shutdownActorSystem(system)
|
||||
}
|
||||
|
||||
"RepoInitialization" should {
|
||||
|
||||
"initialize repositories" in withDb {
|
||||
(config, suggestionsRepo, versionsRepo) =>
|
||||
system.eventStream.subscribe(self, classOf[InitializedEvent])
|
||||
|
||||
val component =
|
||||
new RepoInitialization(
|
||||
config.directories,
|
||||
system.eventStream,
|
||||
suggestionsRepo,
|
||||
versionsRepo
|
||||
)
|
||||
|
||||
val action =
|
||||
for {
|
||||
_ <- component.init()
|
||||
schemaVersion <- suggestionsRepo.getSchemaVersion
|
||||
} yield schemaVersion
|
||||
|
||||
val version = Await.result(action, Timeout)
|
||||
version shouldEqual SchemaVersion.CurrentVersion
|
||||
|
||||
expectMsgAllOf(
|
||||
InitializedEvent.SuggestionsRepoInitialized,
|
||||
InitializedEvent.FileVersionsRepoInitialized
|
||||
)
|
||||
}
|
||||
|
||||
"recreate suggestion database when schema version is incorrect" in withDb {
|
||||
(config, suggestionsRepo, versionsRepo) =>
|
||||
system.eventStream.subscribe(self, classOf[InitializedEvent])
|
||||
|
||||
val testSchemaVersion = Long.MaxValue
|
||||
val component =
|
||||
new RepoInitialization(
|
||||
config.directories,
|
||||
system.eventStream,
|
||||
suggestionsRepo,
|
||||
versionsRepo
|
||||
)
|
||||
|
||||
val action =
|
||||
for {
|
||||
_ <- suggestionsRepo.init
|
||||
_ <- suggestionsRepo.setSchemaVersion(testSchemaVersion)
|
||||
_ <- component.init()
|
||||
version <- suggestionsRepo.getSchemaVersion
|
||||
} yield version
|
||||
|
||||
val version = Await.result(action, Timeout)
|
||||
version shouldEqual SchemaVersion.CurrentVersion
|
||||
|
||||
expectMsgAllOf(
|
||||
InitializedEvent.SuggestionsRepoInitialized,
|
||||
InitializedEvent.FileVersionsRepoInitialized
|
||||
)
|
||||
}
|
||||
|
||||
"recreate suggestion database when schema version is empty" in withDb {
|
||||
(config, suggestionsRepo, versionsRepo) =>
|
||||
system.eventStream.subscribe(self, classOf[InitializedEvent])
|
||||
|
||||
val component =
|
||||
new RepoInitialization(
|
||||
config.directories,
|
||||
system.eventStream,
|
||||
suggestionsRepo,
|
||||
versionsRepo
|
||||
)
|
||||
|
||||
// initialize
|
||||
val init =
|
||||
for {
|
||||
_ <- component.init()
|
||||
version <- suggestionsRepo.getSchemaVersion
|
||||
} yield version
|
||||
|
||||
val version1 = Await.result(init, Timeout)
|
||||
version1 shouldEqual SchemaVersion.CurrentVersion
|
||||
expectMsgAllOf(
|
||||
InitializedEvent.SuggestionsRepoInitialized,
|
||||
InitializedEvent.FileVersionsRepoInitialized
|
||||
)
|
||||
|
||||
// remove schema and re-initialize
|
||||
val action =
|
||||
for {
|
||||
_ <- suggestionsRepo.clearSchemaVersion
|
||||
_ <- component.init()
|
||||
version <- suggestionsRepo.getSchemaVersion
|
||||
} yield version
|
||||
|
||||
val version2 = Await.result(action, Timeout)
|
||||
version2 shouldEqual SchemaVersion.CurrentVersion
|
||||
expectMsgAllOf(
|
||||
InitializedEvent.SuggestionsRepoInitialized,
|
||||
InitializedEvent.FileVersionsRepoInitialized
|
||||
)
|
||||
}
|
||||
|
||||
"recreate corrupted suggestion database file" taggedAs Flaky in withConfig {
|
||||
config =>
|
||||
// initialize
|
||||
withRepos(config) { (suggestionsRepo, versionsRepo) =>
|
||||
val component =
|
||||
new RepoInitialization(
|
||||
config.directories,
|
||||
system.eventStream,
|
||||
suggestionsRepo,
|
||||
versionsRepo
|
||||
)
|
||||
|
||||
val init =
|
||||
for {
|
||||
_ <- component.init()
|
||||
version <- suggestionsRepo.getSchemaVersion
|
||||
} yield version
|
||||
|
||||
val version1 = Await.result(init, Timeout)
|
||||
version1 shouldEqual SchemaVersion.CurrentVersion
|
||||
}
|
||||
|
||||
// corrupt
|
||||
val bytes: Array[Byte] = Array(1, 2, 3)
|
||||
Files.delete(config.directories.suggestionsDatabaseFile.toPath)
|
||||
Files.write(
|
||||
config.directories.suggestionsDatabaseFile.toPath,
|
||||
bytes,
|
||||
StandardOpenOption.CREATE
|
||||
)
|
||||
withRepos(config) { (suggestionsRepo, _) =>
|
||||
an[SQLiteException] should be thrownBy Await.result(
|
||||
suggestionsRepo.getSchemaVersion,
|
||||
Timeout
|
||||
)
|
||||
}
|
||||
|
||||
// re-initialize
|
||||
withRepos(config) { (suggestionsRepo, versionsRepo) =>
|
||||
val component =
|
||||
new RepoInitialization(
|
||||
config.directories,
|
||||
system.eventStream,
|
||||
suggestionsRepo,
|
||||
versionsRepo
|
||||
)
|
||||
|
||||
val action =
|
||||
for {
|
||||
_ <- component.init()
|
||||
version <- suggestionsRepo.getSchemaVersion
|
||||
} yield version
|
||||
|
||||
val version2 = Await.result(action, Timeout)
|
||||
version2 shouldEqual SchemaVersion.CurrentVersion
|
||||
expectMsgAllOf(
|
||||
InitializedEvent.SuggestionsRepoInitialized,
|
||||
InitializedEvent.FileVersionsRepoInitialized
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
def newConfig(root: File): Config = {
|
||||
Config(
|
||||
Map(UUID.randomUUID() -> root),
|
||||
FileManagerConfig(timeout = 3.seconds.dilated),
|
||||
PathWatcherConfig(),
|
||||
ExecutionContextConfig(requestTimeout = 3.seconds.dilated),
|
||||
DirectoriesConfig.initialize(root)
|
||||
)
|
||||
}
|
||||
|
||||
def withConfig(test: Config => Any): Unit = {
|
||||
val testContentRoot = Files.createTempDirectory(null).toRealPath()
|
||||
sys.addShutdownHook(FileUtils.deleteQuietly(testContentRoot.toFile))
|
||||
val config = newConfig(testContentRoot.toFile)
|
||||
|
||||
test(config)
|
||||
}
|
||||
|
||||
def withRepos(
|
||||
config: Config
|
||||
)(test: (SqlSuggestionsRepo, SqlVersionsRepo) => Any): Unit = {
|
||||
val sqlDatabase = SqlDatabase(
|
||||
config.directories.suggestionsDatabaseFile.toString
|
||||
)
|
||||
val suggestionsRepo = new SqlSuggestionsRepo(sqlDatabase)
|
||||
val versionsRepo = new SqlVersionsRepo(sqlDatabase)
|
||||
|
||||
try test(suggestionsRepo, versionsRepo)
|
||||
finally {
|
||||
sqlDatabase.close()
|
||||
}
|
||||
}
|
||||
|
||||
def withDb(
|
||||
test: (
|
||||
Config,
|
||||
SqlSuggestionsRepo,
|
||||
SqlVersionsRepo
|
||||
) => Any
|
||||
): Unit = {
|
||||
withConfig { config =>
|
||||
withRepos(config) { (suggestionsRepo, versionsRepo) =>
|
||||
test(config, suggestionsRepo, versionsRepo)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
@ -408,12 +408,16 @@ class ContextEventsListenerSpec
|
||||
updatesSendRate
|
||||
)
|
||||
)
|
||||
repo.init.onComplete {
|
||||
val repoInit = repo.init
|
||||
repoInit.onComplete {
|
||||
case Success(()) =>
|
||||
system.eventStream.publish(InitializedEvent.SuggestionsRepoInitialized)
|
||||
case Failure(ex) =>
|
||||
system.log.error(ex, "Failed to initialize Suggestions repo")
|
||||
system.log.error(
|
||||
s"ContextEventsListenerSpec failed to initialize Suggestions repo. $ex"
|
||||
)
|
||||
}
|
||||
Await.ready(repoInit, Timeout)
|
||||
|
||||
try test(clientId, contextId, repo, router, listener)
|
||||
finally {
|
||||
|
@ -105,7 +105,12 @@ class BaseServerTest extends JsonRpcServerTestKit {
|
||||
|
||||
val initializationComponent = SequentialResourcesInitialization(
|
||||
new DirectoriesInitialization(config.directories),
|
||||
new RepoInitialization(system.eventStream, suggestionsRepo, versionsRepo)
|
||||
new RepoInitialization(
|
||||
config.directories,
|
||||
system.eventStream,
|
||||
suggestionsRepo,
|
||||
versionsRepo
|
||||
)
|
||||
)
|
||||
|
||||
override def clientControllerFactory: ClientControllerFactory = {
|
||||
|
@ -3,7 +3,7 @@ searcher {
|
||||
url = "jdbc:sqlite::memory:"
|
||||
driver = "org.sqlite.JDBC"
|
||||
connectionPool = disabled
|
||||
properties.journal_mode = "wal"
|
||||
properties.journal_mode = "memory"
|
||||
properties.locking_mode = "EXCLUSIVE"
|
||||
numThreads = 1
|
||||
}
|
||||
|
@ -13,8 +13,8 @@ trait Database[F[_], G[_]] {
|
||||
*/
|
||||
def run[A](query: F[A]): G[A]
|
||||
|
||||
/** Run the database query in one transaction */
|
||||
def transaction[A](query: F[A]): G[A]
|
||||
/** Open the database. */
|
||||
def open(): Unit
|
||||
|
||||
/** Close the database. */
|
||||
def close(): Unit
|
||||
|
@ -0,0 +1,11 @@
|
||||
package org.enso.searcher.sql
|
||||
|
||||
/** An error indicating that the database has invalid schema version.
|
||||
*
|
||||
* @param version the database schema version.
|
||||
*/
|
||||
class InvalidSchemaVersion(val version: Long)
|
||||
extends RuntimeException(
|
||||
s"Database schema version '$version' is different from the application " +
|
||||
s"schema version '${SchemaVersion.CurrentVersion}'."
|
||||
)
|
@ -5,7 +5,6 @@ import org.enso.searcher.Database
|
||||
import org.enso.searcher.sqlite.LockingMode
|
||||
import slick.dbio.DBIO
|
||||
import slick.jdbc.SQLiteProfile
|
||||
import slick.jdbc.SQLiteProfile.api._
|
||||
|
||||
import scala.concurrent.Future
|
||||
|
||||
@ -16,20 +15,26 @@ import scala.concurrent.Future
|
||||
final class SqlDatabase(config: Option[Config] = None)
|
||||
extends Database[DBIO, Future] {
|
||||
|
||||
val db = SQLiteProfile.backend.Database
|
||||
.forConfig(SqlDatabase.configPath, config.orNull)
|
||||
private var db: SQLiteProfile.backend.Database = _
|
||||
|
||||
open()
|
||||
|
||||
/** @inheritdoc */
|
||||
override def run[A](query: DBIO[A]): Future[A] =
|
||||
db.run(query)
|
||||
|
||||
/** @inheritdoc */
|
||||
override def transaction[A](query: DBIO[A]): Future[A] =
|
||||
db.run(query.transactionally)
|
||||
override def open(): Unit =
|
||||
this.synchronized {
|
||||
db = SQLiteProfile.backend.Database
|
||||
.forConfig(SqlDatabase.configPath, config.orNull)
|
||||
}
|
||||
|
||||
/** @inheritdoc */
|
||||
override def close(): Unit =
|
||||
db.close()
|
||||
this.synchronized {
|
||||
db.close()
|
||||
}
|
||||
}
|
||||
|
||||
object SqlDatabase {
|
||||
|
@ -1,5 +1,8 @@
|
||||
package org.enso.searcher.sql
|
||||
|
||||
import java.io.File
|
||||
import java.util.UUID
|
||||
|
||||
import org.enso.polyglot.Suggestion
|
||||
import org.enso.polyglot.data.Tree
|
||||
import org.enso.polyglot.runtime.Runtime.Api.{
|
||||
@ -10,18 +13,18 @@ import org.enso.polyglot.runtime.Runtime.Api.{
|
||||
}
|
||||
import org.enso.searcher.data.QueryResult
|
||||
import org.enso.searcher.{SuggestionEntry, SuggestionsRepo}
|
||||
import slick.jdbc.SQLiteProfile
|
||||
import slick.jdbc.SQLiteProfile.api._
|
||||
import slick.jdbc.meta.MTable
|
||||
import slick.relational.RelationalProfile
|
||||
|
||||
import java.io.File
|
||||
import java.util.UUID
|
||||
import scala.collection.immutable.HashMap
|
||||
import scala.concurrent.{ExecutionContext, Future}
|
||||
import scala.util.{Failure, Success}
|
||||
|
||||
/** The object for accessing the suggestions database. */
|
||||
final class SqlSuggestionsRepo(db: SqlDatabase)(implicit ec: ExecutionContext)
|
||||
extends SuggestionsRepo[Future] {
|
||||
final class SqlSuggestionsRepo(val db: SqlDatabase)(implicit
|
||||
ec: ExecutionContext
|
||||
) extends SuggestionsRepo[Future] {
|
||||
|
||||
/** The query returning the arguments joined with the corresponding
|
||||
* suggestions.
|
||||
@ -155,6 +158,25 @@ final class SqlSuggestionsRepo(db: SqlDatabase)(implicit ec: ExecutionContext)
|
||||
def close(): Unit =
|
||||
db.close()
|
||||
|
||||
/** Get the database schema version.
|
||||
*
|
||||
* @return the schema version of the database
|
||||
*/
|
||||
def getSchemaVersion: Future[Long] =
|
||||
db.run(currentSchemaVersionQuery)
|
||||
|
||||
/** Set the database schema version.
|
||||
*
|
||||
* @param version the database schema version
|
||||
* @return the schema version of the database
|
||||
*/
|
||||
def setSchemaVersion(version: Long): Future[Long] =
|
||||
db.run(setSchemaVersionQuery(version))
|
||||
|
||||
/** Remove the database schema version. */
|
||||
def clearSchemaVersion: Future[Unit] =
|
||||
db.run(clearSchemaVersionQuery)
|
||||
|
||||
/** Insert suggestions in a batch.
|
||||
*
|
||||
* @param suggestions the list of suggestions to insert
|
||||
@ -165,12 +187,33 @@ final class SqlSuggestionsRepo(db: SqlDatabase)(implicit ec: ExecutionContext)
|
||||
|
||||
/** The query to initialize the repo. */
|
||||
private def initQuery: DBIO[Unit] = {
|
||||
// Initialize schema suppressing errors. Workaround for slick/slick#1999.
|
||||
def initSchema(schema: SQLiteProfile.SchemaDescription) =
|
||||
schema.createIfNotExists.asTry >> DBIO.successful(())
|
||||
val schemas =
|
||||
Seq(Suggestions.schema, Arguments.schema, SuggestionsVersions.schema)
|
||||
DBIO.sequence(schemas.map(initSchema)) >> DBIO.successful(())
|
||||
type RelationalTable[A] = RelationalProfile#Table[A]
|
||||
def checkVersion(version: Long) =
|
||||
if (version == SchemaVersion.CurrentVersion) {
|
||||
DBIO.successful(())
|
||||
} else {
|
||||
DBIO.failed(new InvalidSchemaVersion(version))
|
||||
}
|
||||
def createSchema(table: TableQuery[RelationalTable[_]]) =
|
||||
for {
|
||||
tables <- MTable.getTables(table.shaped.value.tableName)
|
||||
_ <- if (tables.isEmpty) table.schema.create else DBIO.successful(())
|
||||
} yield ()
|
||||
|
||||
val tables: Seq[TableQuery[RelationalTable[_]]] =
|
||||
Seq(Suggestions, Arguments, SuggestionsVersions, SchemaVersion)
|
||||
.asInstanceOf[Seq[TableQuery[RelationalTable[_]]]]
|
||||
val initSchemas =
|
||||
for {
|
||||
_ <- DBIO.sequence(tables.map(createSchema))
|
||||
version <- initSchemaVersionQuery
|
||||
} yield version
|
||||
|
||||
for {
|
||||
versionAttempt <- currentSchemaVersionQuery.asTry
|
||||
version <- versionAttempt.fold(_ => initSchemas, DBIO.successful)
|
||||
_ <- checkVersion(version)
|
||||
} yield ()
|
||||
}
|
||||
|
||||
/** The query to clean the repo. */
|
||||
@ -713,6 +756,37 @@ final class SqlSuggestionsRepo(db: SqlDatabase)(implicit ec: ExecutionContext)
|
||||
incrementQuery
|
||||
}
|
||||
|
||||
/** The query to get current version of the repo. */
|
||||
private def currentSchemaVersionQuery: DBIO[Long] = {
|
||||
for {
|
||||
versionOpt <- SchemaVersion.result.headOption
|
||||
} yield versionOpt.flatMap(_.id).getOrElse(0L)
|
||||
}
|
||||
|
||||
/** The query to initialize the [[SchemaVersion]] table. */
|
||||
private def initSchemaVersionQuery: DBIO[Long] = {
|
||||
setSchemaVersionQuery(SchemaVersion.CurrentVersion)
|
||||
}
|
||||
|
||||
/** The query setting the schema version.
|
||||
*
|
||||
* @param version the schema version.
|
||||
* @return the current value of the schema version
|
||||
*/
|
||||
private def setSchemaVersionQuery(version: Long): DBIO[Long] = {
|
||||
val query = for {
|
||||
_ <- SchemaVersion.delete
|
||||
_ <- SchemaVersion += SchemaVersionRow(Some(version))
|
||||
} yield version
|
||||
query
|
||||
}
|
||||
|
||||
/** The query to delete the schema version. */
|
||||
private def clearSchemaVersionQuery: DBIO[Unit] =
|
||||
for {
|
||||
_ <- SchemaVersion.delete
|
||||
} yield ()
|
||||
|
||||
/** The query to insert suggestions in a batch.
|
||||
*
|
||||
* @param suggestions the list of suggestions to insert
|
||||
|
@ -5,6 +5,7 @@ import java.util
|
||||
|
||||
import org.enso.searcher.FileVersionsRepo
|
||||
import slick.jdbc.SQLiteProfile.api._
|
||||
import slick.jdbc.meta.MTable
|
||||
|
||||
import scala.concurrent.{ExecutionContext, Future}
|
||||
|
||||
@ -50,8 +51,11 @@ final class SqlVersionsRepo(db: SqlDatabase)(implicit ec: ExecutionContext)
|
||||
|
||||
/** The query to initialize the repo. */
|
||||
private def initQuery: DBIO[Unit] = {
|
||||
// Initialize schema suppressing errors. Workaround for slick/slick#1999.
|
||||
FileVersions.schema.createIfNotExists.asTry >> DBIO.successful(())
|
||||
val table = FileVersions
|
||||
for {
|
||||
tables <- MTable.getTables(table.shaped.value.tableName)
|
||||
_ <- if (tables.isEmpty) table.schema.create else DBIO.successful(())
|
||||
} yield ()
|
||||
}
|
||||
|
||||
/** The query to clean the repo. */
|
||||
|
@ -65,6 +65,12 @@ case class SuggestionRow(
|
||||
*/
|
||||
case class SuggestionsVersionRow(id: Option[Long])
|
||||
|
||||
/** A row in the schema_version table.
|
||||
*
|
||||
* @param id the row id
|
||||
*/
|
||||
case class SchemaVersionRow(id: Option[Long])
|
||||
|
||||
/** A row in the file_versions table
|
||||
*
|
||||
* @param path the file path
|
||||
@ -206,16 +212,6 @@ final class SuggestionsTable(tag: Tag)
|
||||
)
|
||||
}
|
||||
|
||||
/** The schema of the suggestions_version table. */
|
||||
@nowarn("msg=multiarg infix syntax")
|
||||
final class SuggestionsVersionTable(tag: Tag)
|
||||
extends Table[SuggestionsVersionRow](tag, "suggestions_version") {
|
||||
|
||||
def id = column[Long]("id", O.PrimaryKey, O.AutoInc)
|
||||
|
||||
def * = id.? <> (SuggestionsVersionRow.apply, SuggestionsVersionRow.unapply)
|
||||
}
|
||||
|
||||
/** The schema of the file_versions table. */
|
||||
@nowarn("msg=multiarg infix syntax")
|
||||
final class FileVersionsTable(tag: Tag)
|
||||
@ -227,10 +223,36 @@ final class FileVersionsTable(tag: Tag)
|
||||
def * = (path, digest) <> (FileVersionRow.tupled, FileVersionRow.unapply)
|
||||
}
|
||||
|
||||
/** The schema of the suggestions_version table. */
|
||||
@nowarn("msg=multiarg infix syntax")
|
||||
final class SuggestionsVersionTable(tag: Tag)
|
||||
extends Table[SuggestionsVersionRow](tag, "suggestions_version") {
|
||||
|
||||
def id = column[Long]("id", O.PrimaryKey, O.AutoInc)
|
||||
|
||||
def * = id.? <> (SuggestionsVersionRow.apply, SuggestionsVersionRow.unapply)
|
||||
}
|
||||
|
||||
/** The schema of the schema_version table. */
|
||||
@nowarn("msg=multiarg infix syntax")
|
||||
final class SchemaVersionTable(tag: Tag)
|
||||
extends Table[SchemaVersionRow](tag, "schema_version") {
|
||||
|
||||
def id = column[Long]("id", O.PrimaryKey)
|
||||
|
||||
def * = id.? <> (SchemaVersionRow.apply, SchemaVersionRow.unapply)
|
||||
}
|
||||
|
||||
object Arguments extends TableQuery(new ArgumentsTable(_))
|
||||
|
||||
object Suggestions extends TableQuery(new SuggestionsTable(_))
|
||||
|
||||
object FileVersions extends TableQuery(new FileVersionsTable(_))
|
||||
|
||||
object SuggestionsVersions extends TableQuery(new SuggestionsVersionTable(_))
|
||||
|
||||
object FileVersions extends TableQuery(new FileVersionsTable(_))
|
||||
object SchemaVersion extends TableQuery(new SchemaVersionTable(_)) {
|
||||
|
||||
/** The current schema version. */
|
||||
val CurrentVersion: Long = 1
|
||||
}
|
||||
|
@ -45,6 +45,19 @@ class SuggestionsRepoTest extends AnyWordSpec with Matchers with RetrySpec {
|
||||
Await.result(repo.init, Timeout)
|
||||
}
|
||||
|
||||
"check the schema version when init" taggedAs Retry in withRepo { repo =>
|
||||
val wrongSchemaVersion = Long.MinValue
|
||||
val action =
|
||||
for {
|
||||
version <- repo.setSchemaVersion(wrongSchemaVersion)
|
||||
_ <- repo.init
|
||||
} yield version
|
||||
|
||||
val thrown =
|
||||
the[InvalidSchemaVersion] thrownBy Await.result(action, Timeout)
|
||||
thrown.version shouldEqual wrongSchemaVersion
|
||||
}
|
||||
|
||||
"get all suggestions" taggedAs Retry in withRepo { repo =>
|
||||
val action =
|
||||
for {
|
||||
|
Loading…
Reference in New Issue
Block a user