Update ExpressionValueUpdate Notification API (#1033)

ExpressionValueUpdate notification contains information about the
executed object. To have the full information about this object, IDE
needs the id of the corresponding suggestion. PR updates the
notification adding the suggestion id of the executed object.

- update: public API for ExpressionValueUpdate notification
- update: ContextEventsListener groups ExpressionValueUpdates and sends
  them in a batch
- update: ContextRegistry listens to the notifications from runtime and
  routes them to the corresponding listener.
- test: add ContextEventsListenerSpec
This commit is contained in:
Dmitry Bushev 2020-07-28 23:24:04 +03:00 committed by GitHub
parent 7dcfef0503
commit 93c4453299
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 491 additions and 208 deletions

View File

@ -212,10 +212,8 @@ interface MethodPointer {
```typescript
interface ExpressionValueUpdate {
id: ExpressionId;
type?: String;
shortValue?: String;
methodCall?: MethodPointer;
/** The updated suggestion id */
suggestionId: number;
}
```
@ -289,7 +287,6 @@ type SuggestionEntry
| SuggestionEntryFunction
// A local value
| SuggestionEntryLocal;
}
interface SuggestionEntryAtom {
name: string;

View File

@ -118,7 +118,12 @@ class MainModule(serverConfig: LanguageServerConfig) {
lazy val contextRegistry =
system.actorOf(
ContextRegistry
.props(languageServerConfig, runtimeConnector, sessionRouter),
.props(
suggestionsRepo,
languageServerConfig,
runtimeConnector,
sessionRouter
),
"context-registry"
)

View File

@ -1,7 +1,7 @@
package org.enso.languageserver.runtime
import akka.actor.{Actor, ActorLogging, ActorRef, Props}
import org.enso.languageserver.data.Config
import akka.pattern.pipe
import org.enso.languageserver.runtime.ContextRegistryProtocol.{
VisualisationContext,
VisualisationUpdate
@ -14,38 +14,53 @@ import org.enso.languageserver.session.SessionRouter.{
}
import org.enso.languageserver.util.UnhandledLogging
import org.enso.polyglot.runtime.Runtime.Api
import org.enso.searcher.SuggestionsRepo
import scala.concurrent.Future
import scala.concurrent.duration._
/**
* EventListener listens event stream for the notifications from the runtime
* and send updates to the client. The listener is created per context, and
* only handles the notifications with the given `contextId`.
*
* @param config configuration
* Expression updates are collected and sent to the user in a batch.
*
* @param repo the suggestions repo
* @param rpcSession reference to the client
* @param contextId exectuion context identifier
* @param sessionRouter the session router
* @param updatesSendRate how often send the updates to the user
*/
final class ContextEventsListener(
config: Config,
repo: SuggestionsRepo[Future],
rpcSession: JsonSession,
contextId: ContextId,
sessionRouter: ActorRef
sessionRouter: ActorRef,
updatesSendRate: FiniteDuration
) extends Actor
with ActorLogging
with UnhandledLogging {
import ContextEventsListener.RunExpressionUpdates
import context.dispatcher
override def preStart(): Unit = {
context.system.eventStream
.subscribe(self, classOf[Api.ExpressionValuesComputed])
context.system.eventStream
.subscribe(self, classOf[Api.VisualisationUpdate])
context.system.eventStream
.subscribe(self, classOf[Api.ExecutionFailed])
context.system.eventStream
.subscribe(self, classOf[Api.VisualisationEvaluationFailed])
if (updatesSendRate.length > 0) {
context.system.scheduler.scheduleWithFixedDelay(
updatesSendRate,
updatesSendRate,
self,
RunExpressionUpdates
)
}
}
override def receive: Receive = {
override def receive: Receive = withState(Vector())
def withState(
expressionUpdates: Vector[Api.ExpressionValueUpdate]
): Receive = {
case Api.VisualisationUpdate(ctx, data) if ctx.contextId == contextId =>
val payload =
VisualisationUpdate(
@ -59,20 +74,7 @@ final class ContextEventsListener(
sessionRouter ! DeliverToBinaryController(rpcSession.clientId, payload)
case Api.ExpressionValuesComputed(`contextId`, apiUpdates) =>
val updates = apiUpdates.flatMap { update =>
toRuntimeUpdate(update) match {
case None =>
log.error(s"Failed to convert $update")
None
case runtimeUpdate =>
runtimeUpdate
}
}
rpcSession.rpcController ! ContextRegistryProtocol
.ExpressionValuesComputedNotification(
contextId,
updates
)
context.become(withState(expressionUpdates :++ apiUpdates))
case Api.ExecutionFailed(`contextId`, msg) =>
val payload =
@ -86,72 +88,60 @@ final class ContextEventsListener(
sessionRouter ! DeliverToBinaryController(rpcSession.clientId, payload)
case _: Api.ExpressionValuesComputed =>
case _: Api.VisualisationUpdate =>
case _: Api.ExecutionFailed =>
case _: Api.VisualisationEvaluationFailed =>
}
private def toRuntimeUpdate(
update: Api.ExpressionValueUpdate
): Option[ExpressionValueUpdate] = {
update.methodCall match {
case None =>
Some(
ExpressionValueUpdate(
update.expressionId,
update.expressionType,
update.shortValue,
None
)
)
case Some(methodCall) =>
toRuntimePointer(methodCall).map { pointer =>
ExpressionValueUpdate(
update.expressionId,
update.expressionType,
update.shortValue,
Some(pointer)
)
case RunExpressionUpdates if expressionUpdates.nonEmpty =>
val updateIds = expressionUpdates.map(_.expressionId)
repo
.getAllByExternalIds(updateIds)
.map { suggestionIds =>
val valueUpdates = updateIds.zip(suggestionIds).flatMap {
case (_, Some(suggestionId)) =>
Some(ExpressionValueUpdate(suggestionId))
case (id, None) =>
log.error("Unable to find suggestion with expression id: {}", id)
None
}
val payload =
ContextRegistryProtocol.ExpressionValuesComputedNotification(
contextId,
valueUpdates
)
DeliverToJsonController(rpcSession.clientId, payload)
}
}
.pipeTo(sessionRouter)
context.become(withState(Vector()))
case RunExpressionUpdates if expressionUpdates.isEmpty =>
}
private def toRuntimePointer(
pointer: Api.MethodPointer
): Option[MethodPointer] =
config.findRelativePath(pointer.file).map { relativePath =>
MethodPointer(
file = relativePath,
definedOnType = pointer.definedOnType,
name = pointer.name
)
}
}
object ContextEventsListener {
/** The action to process the expression updates. */
case object RunExpressionUpdates
/**
* Creates a configuration object used to create a [[ContextEventsListener]].
*
* @param config configuration
* @param repo the suggestions repo
* @param rpcSession reference to the client
* @param contextId exectuion context identifier
* @param sessionRouter the session router
* @param updatesSendRate how often send the updates to the user
*/
def props(
config: Config,
repo: SuggestionsRepo[Future],
rpcSession: JsonSession,
contextId: ContextId,
sessionRouter: ActorRef
sessionRouter: ActorRef,
updatesSendRate: FiniteDuration = 1.second
): Props =
Props(
new ContextEventsListener(
config,
repo,
rpcSession,
contextId,
sessionRouter: ActorRef
sessionRouter: ActorRef,
updatesSendRate
)
)

View File

@ -14,6 +14,9 @@ import org.enso.languageserver.runtime.handler._
import org.enso.languageserver.util.UnhandledLogging
import org.enso.polyglot.runtime.Runtime.Api
import org.enso.polyglot.runtime.Runtime.Api.ContextId
import org.enso.searcher.SuggestionsRepo
import scala.concurrent.Future
/**
* Registry handles execution context requests and communicates with runtime
@ -49,11 +52,13 @@ import org.enso.polyglot.runtime.Runtime.Api.ContextId
*
* }}}
*
* @param repo the suggestions repo
* @param config configuration
* @param runtime reference to the [[RuntimeConnector]]
* @param sessionRouter the session router
*/
final class ContextRegistry(
repo: SuggestionsRepo[Future],
config: Config,
runtime: ActorRef,
sessionRouter: ActorRef
@ -65,6 +70,17 @@ final class ContextRegistry(
private val timeout = config.executionContext.requestTimeout
override def preStart(): Unit = {
context.system.eventStream
.subscribe(self, classOf[Api.ExpressionValuesComputed])
context.system.eventStream
.subscribe(self, classOf[Api.VisualisationUpdate])
context.system.eventStream
.subscribe(self, classOf[Api.ExecutionFailed])
context.system.eventStream
.subscribe(self, classOf[Api.VisualisationEvaluationFailed])
}
override def receive: Receive =
withStore(ContextRegistry.Store())
@ -72,13 +88,27 @@ final class ContextRegistry(
case Ping =>
sender() ! Pong
case update: Api.ExpressionValuesComputed =>
store.getListener(update.contextId).foreach(_ ! update)
case update: Api.VisualisationUpdate =>
store
.getListener(update.visualisationContext.contextId)
.foreach(_ ! update)
case update: Api.ExecutionFailed =>
store.getListener(update.contextId).foreach(_ ! update)
case update: Api.VisualisationEvaluationFailed =>
store.getListener(update.contextId).foreach(_ ! update)
case CreateContextRequest(client) =>
val contextId = UUID.randomUUID()
val handler =
context.actorOf(CreateContextHandler.props(timeout, runtime))
val listener =
context.actorOf(
ContextEventsListener.props(config, client, contextId, sessionRouter)
ContextEventsListener.props(repo, client, contextId, sessionRouter)
)
handler.forward(Api.CreateContextRequest(contextId))
context.become(
@ -152,10 +182,10 @@ final class ContextRegistry(
}
case DetachVisualisation(
clientId,
contextId,
visualisationId,
expressionId
clientId,
contextId,
visualisationId,
expressionId
) =>
if (store.hasContext(clientId, contextId)) {
val handler =
@ -241,7 +271,7 @@ object ContextRegistry {
listener: ActorRef
): Store =
copy(
contexts = contexts + (client -> (getContexts(client) + contextId)),
contexts = contexts + (client -> (getContexts(client) + contextId)),
listeners = listeners + (contextId -> listener)
)
@ -270,10 +300,16 @@ object ContextRegistry {
/**
* Creates a configuration object used to create a [[ContextRegistry]].
*
* @param repo the suggestions repo
* @param config language server configuration
* @param runtime reference to the [[RuntimeConnector]]
* @param sessionRouter the session router
*/
def props(config: Config, runtime: ActorRef, sessionRouter: ActorRef): Props =
Props(new ContextRegistry(config, runtime, sessionRouter))
def props(
repo: SuggestionsRepo[Future],
config: Config,
runtime: ActorRef,
sessionRouter: ActorRef
): Props =
Props(new ContextRegistry(repo, config, runtime, sessionRouter))
}

View File

@ -1,18 +1,8 @@
package org.enso.languageserver.runtime
import java.util.UUID
/**
* An update containing information about expression.
*
* @param id expression id
* @param type optional type of expression
* @param shortValue optional value of expression
* @param methodCall optional pointer to a method definition
* @param suggestionId the updated suggestion id
*/
case class ExpressionValueUpdate(
id: UUID,
`type`: Option[String],
shortValue: Option[String],
methodCall: Option[MethodPointer]
)
case class ExpressionValueUpdate(suggestionId: Long)

View File

@ -0,0 +1,242 @@
package org.enso.languageserver.runtime
import java.nio.file.Files
import java.util.UUID
import akka.actor.{ActorRef, ActorSystem}
import akka.testkit.{ImplicitSender, TestKit, TestProbe}
import org.apache.commons.io.FileUtils
import org.enso.languageserver.data.DirectoriesConfig
import org.enso.languageserver.event.InitializedEvent
import org.enso.languageserver.runtime.ContextRegistryProtocol.{
ExecutionFailedNotification,
ExpressionValuesComputedNotification,
VisualisationContext,
VisualisationEvaluationFailed,
VisualisationUpdate
}
import org.enso.languageserver.search.Suggestions
import org.enso.languageserver.session.JsonSession
import org.enso.languageserver.session.SessionRouter.{
DeliverToBinaryController,
DeliverToJsonController
}
import org.enso.polyglot.runtime.Runtime.Api
import org.enso.searcher.SuggestionsRepo
import org.enso.searcher.sql.SqlSuggestionsRepo
import org.enso.testkit.RetrySpec
import org.scalatest.BeforeAndAfterAll
import org.scalatest.matchers.should.Matchers
import org.scalatest.wordspec.AnyWordSpecLike
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.util.{Failure, Success}
class ContextEventsListenerSpec
extends TestKit(ActorSystem("TestSystem"))
with ImplicitSender
with AnyWordSpecLike
with Matchers
with BeforeAndAfterAll
with RetrySpec {
import system.dispatcher
val Timeout: FiniteDuration = 10.seconds
override def afterAll(): Unit = {
TestKit.shutdownActorSystem(system)
}
"ContextEventsListener" should {
"not send empty updates" taggedAs Retry in withDb { (_, _, _, router, _) =>
router.expectNoMessage()
}
"send expression updates" taggedAs Retry in withDb {
(clientId, contextId, repo, router, listener) =>
val (_, suggestionIds) = Await.result(
repo.insertAll(
Seq(
Suggestions.atom,
Suggestions.method,
Suggestions.function,
Suggestions.local
)
),
Timeout
)
listener ! Api.ExpressionValuesComputed(
contextId,
Vector(
Api.ExpressionValueUpdate(
Suggestions.method.externalId.get,
None,
None,
None
)
)
)
router.expectMsg(
DeliverToJsonController(
clientId,
ContextRegistryProtocol.ExpressionValuesComputedNotification(
contextId,
Vector(ExpressionValueUpdate(suggestionIds(1).get))
)
)
)
}
"send expression updates grouped" taggedAs Retry in withDb(0.seconds) {
(clientId, contextId, repo, router, listener) =>
val (_, suggestionIds) = Await.result(
repo.insertAll(
Seq(
Suggestions.atom,
Suggestions.method,
Suggestions.function,
Suggestions.local
)
),
Timeout
)
listener ! Api.ExpressionValuesComputed(
contextId,
Vector(
Api.ExpressionValueUpdate(
Suggestions.method.externalId.get,
None,
None,
None
)
)
)
listener ! Api.ExpressionValuesComputed(
contextId,
Vector(
Api.ExpressionValueUpdate(
Suggestions.local.externalId.get,
None,
None,
None
)
)
)
listener ! ContextEventsListener.RunExpressionUpdates
router.expectMsg(
DeliverToJsonController(
clientId,
ExpressionValuesComputedNotification(
contextId,
Vector(
ExpressionValueUpdate(suggestionIds(1).get),
ExpressionValueUpdate(suggestionIds(3).get)
)
)
)
)
}
"send visualization updates" taggedAs Retry in withDb {
(clientId, contextId, _, router, listener) =>
val ctx = Api.VisualisationContext(
UUID.randomUUID(),
contextId,
UUID.randomUUID()
)
val data = Array[Byte](1, 2, 3)
listener ! Api.VisualisationUpdate(ctx, data)
router.expectMsg(
DeliverToBinaryController(
clientId,
VisualisationUpdate(
VisualisationContext(
ctx.visualisationId,
ctx.contextId,
ctx.expressionId
),
data
)
)
)
}
"send execution failed notification" taggedAs Retry in withDb {
(clientId, contextId, _, router, listener) =>
val message = "Test execution failed"
listener ! Api.ExecutionFailed(contextId, message)
router.expectMsg(
DeliverToJsonController(
clientId,
ExecutionFailedNotification(contextId, message)
)
)
}
"send visualisation evaluation failed notification" taggedAs Retry in withDb {
(clientId, contextId, _, router, listener) =>
val message = "Test visualisation evaluation failed"
listener ! Api.VisualisationEvaluationFailed(contextId, message)
router.expectMsg(
DeliverToBinaryController(
clientId,
VisualisationEvaluationFailed(contextId, message)
)
)
}
}
def newJsonSession(clientId: UUID): JsonSession =
JsonSession(clientId, TestProbe().ref)
def withDb(
test: (UUID, UUID, SuggestionsRepo[Future], TestProbe, ActorRef) => Any
): Unit =
withDb(100.millis)(test)
def withDb(updatesSendRate: FiniteDuration)(
test: (UUID, UUID, SuggestionsRepo[Future], TestProbe, ActorRef) => Any
): Unit = {
val testContentRoot = Files.createTempDirectory(null).toRealPath()
sys.addShutdownHook(FileUtils.deleteQuietly(testContentRoot.toFile))
val dirsConfig = DirectoriesConfig(testContentRoot.toFile)
val clientId = UUID.randomUUID()
val contextId = UUID.randomUUID()
val router = TestProbe("session-router")
val repo = SqlSuggestionsRepo(dirsConfig.suggestionsDatabaseFile)
val listener = system.actorOf(
ContextEventsListener.props(
repo,
newJsonSession(clientId),
contextId,
router.ref,
updatesSendRate
)
)
repo.init.onComplete {
case Success(()) =>
system.eventStream.publish(InitializedEvent.SuggestionsRepoInitialized)
case Failure(ex) =>
system.log.error(ex, "Failed to initialize Suggestions repo")
}
try test(clientId, contextId, repo, router, listener)
finally {
system.stop(listener)
repo.close()
}
}
}

View File

@ -104,7 +104,12 @@ class BaseServerTest extends JsonRpcServerTestKit {
val contextRegistry =
system.actorOf(
ContextRegistry.props(config, runtimeConnectorProbe.ref, sessionRouter)
ContextRegistry.props(
suggestionsRepo,
config,
runtimeConnectorProbe.ref,
sessionRouter
)
)
val suggestionsHandler =

View File

@ -1,6 +1,5 @@
package org.enso.languageserver.websocket.json
import java.io.File
import java.util.UUID
import io.circe.literal._
@ -142,11 +141,11 @@ class ContextRegistryTest extends BaseServerTest {
val requestId2 =
runtimeConnectorProbe.receiveN(1).head match {
case Api.Request(
requestId,
Api.PushContextRequest(
`contextId`,
Api.StackItem.LocalCall(`expressionId`)
)
requestId,
Api.PushContextRequest(
`contextId`,
Api.StackItem.LocalCall(`expressionId`)
)
) =>
requestId
case msg =>
@ -182,11 +181,11 @@ class ContextRegistryTest extends BaseServerTest {
val requestId2 =
runtimeConnectorProbe.receiveN(1).head match {
case Api.Request(
requestId,
Api.PushContextRequest(
`contextId`,
Api.StackItem.LocalCall(`expressionId`)
)
requestId,
Api.PushContextRequest(
`contextId`,
Api.StackItem.LocalCall(`expressionId`)
)
) =>
requestId
case msg =>
@ -278,11 +277,11 @@ class ContextRegistryTest extends BaseServerTest {
val requestId2 =
runtimeConnectorProbe.receiveN(1).head match {
case Api.Request(
requestId,
Api.PushContextRequest(
`contextId`,
Api.StackItem.LocalCall(`expressionId`)
)
requestId,
Api.PushContextRequest(
`contextId`,
Api.StackItem.LocalCall(`expressionId`)
)
) =>
requestId
case msg =>
@ -327,11 +326,11 @@ class ContextRegistryTest extends BaseServerTest {
val requestId2 =
runtimeConnectorProbe.receiveN(1).head match {
case Api.Request(
requestId,
Api.PushContextRequest(
`contextId`,
Api.StackItem.LocalCall(`expressionId`)
)
requestId,
Api.PushContextRequest(
`contextId`,
Api.StackItem.LocalCall(`expressionId`)
)
) =>
requestId
case msg =>
@ -359,11 +358,11 @@ class ContextRegistryTest extends BaseServerTest {
val requestId3 =
runtimeConnectorProbe.receiveN(1).head match {
case Api.Request(
requestId,
Api.RecomputeContextRequest(
`contextId`,
Some(Api.InvalidatedExpressions.All())
)
requestId,
Api.RecomputeContextRequest(
`contextId`,
Some(Api.InvalidatedExpressions.All())
)
) =>
requestId
case msg =>
@ -400,11 +399,11 @@ class ContextRegistryTest extends BaseServerTest {
val requestId2 =
runtimeConnectorProbe.receiveN(1).head match {
case Api.Request(
requestId,
Api.PushContextRequest(
`contextId`,
Api.StackItem.LocalCall(`expressionId`)
)
requestId,
Api.PushContextRequest(
`contextId`,
Api.StackItem.LocalCall(`expressionId`)
)
) =>
requestId
case msg =>
@ -432,13 +431,15 @@ class ContextRegistryTest extends BaseServerTest {
val requestId3 =
runtimeConnectorProbe.receiveN(1).head match {
case Api.Request(
requestId,
Api.RecomputeContextRequest(
`contextId`,
Some(
Api.InvalidatedExpressions.Expressions(Vector(`expressionId`))
requestId,
Api.RecomputeContextRequest(
`contextId`,
Some(
Api.InvalidatedExpressions.Expressions(
Vector(`expressionId`)
)
)
)
)
) =>
requestId
case msg =>
@ -450,77 +451,6 @@ class ContextRegistryTest extends BaseServerTest {
)
client.expectJson(json.ok(3))
}
"send notifications" in {
val client = getInitialisedWsClient()
// create context
client.send(json.executionContextCreateRequest(1))
val (requestId, contextId) =
runtimeConnectorProbe.receiveN(1).head match {
case Api.Request(requestId, Api.CreateContextRequest(contextId)) =>
(requestId, contextId)
case msg =>
fail(s"Unexpected message: $msg")
}
runtimeConnectorProbe.lastSender ! Api.Response(
requestId,
Api.CreateContextResponse(contextId)
)
client.expectJson(json.executionContextCreateResponse(1, contextId))
// notify
val update = Api.ExpressionValueUpdate(
expressionId = UUID.randomUUID(),
expressionType = Some("ExpressionType"),
shortValue = Some("ShortValue"),
methodCall = Some(
Api.MethodPointer(
file = testContentRoot.toFile,
definedOnType = "DefinedOnType",
name = "Name"
)
)
)
val invalidPathUpdate = Api.ExpressionValueUpdate(
expressionId = UUID.randomUUID(),
expressionType = None,
shortValue = None,
methodCall = Some(
Api.MethodPointer(new File("/invalid"), "Invalid", "Invalid")
)
)
system.eventStream.publish(
Api.ExpressionValuesComputed(
contextId,
Vector(update, invalidPathUpdate)
)
)
client.expectJson(json"""
{ "jsonrpc" : "2.0",
"method" : "executionContext/expressionValuesComputed",
"params" : {
"contextId" : $contextId,
"updates" : [
{
"id" : ${update.expressionId},
"type" : "ExpressionType",
"shortValue" : "ShortValue",
"methodCall" : {
"file" : {
"rootId" : $testContentRootId,
"segments" : [
]
},
"definedOnType" : "DefinedOnType",
"name" : "Name"
}
}
]
}
}
""")
}
}
}

View File

@ -32,6 +32,7 @@ public class SuggestionsRepoBenchmark {
final Path dbfile = Path.of(System.getProperty("java.io.tmpdir"), "bench-suggestions.db");
final Seq<Suggestion.Kind> kinds = SuggestionRandom.nextKinds();
final Seq<scala.Tuple2<UUID, String>> updateInput = SuggestionRandom.nextUpdateAllInput();
final Seq<UUID> getAllByExternalIdsInput = SuggestionRandom.nextGetByExternalIdsInput();
SqlSuggestionsRepo repo;
@ -102,12 +103,16 @@ public class SuggestionsRepoBenchmark {
TIMEOUT);
}
@Benchmark
public Object searchByExternalIds() throws TimeoutException, InterruptedException {
return Await.result(repo.getAllByExternalIds(getAllByExternalIdsInput), TIMEOUT);
}
@Benchmark
public Object updateByExternalId() throws TimeoutException, InterruptedException {
return Await.result(repo.updateAll(updateInput), TIMEOUT);
}
public static void main(String[] args) throws RunnerException {
Options opt =
new OptionsBuilder().include(SuggestionsRepoBenchmark.class.getSimpleName()).build();

View File

@ -11,6 +11,9 @@ object SuggestionRandom {
def nextUpdateAllInput(): Seq[(UUID, String)] =
Seq(UUID.randomUUID() -> nextString())
def nextGetByExternalIdsInput(): Seq[UUID] =
Seq(UUID.randomUUID(), UUID.randomUUID(), UUID.randomUUID())
def nextKinds(): Seq[Suggestion.Kind] =
Set.fill(1)(nextKind()).toSeq

View File

@ -14,6 +14,13 @@ trait SuggestionsRepo[F[_]] {
*/
def getAll: F[(Long, Seq[SuggestionEntry])]
/** Get suggestions by external ids.
*
* @param ids the list of external ids
* @return the list of found suggestion ids
*/
def getAllByExternalIds(ids: Seq[Suggestion.ExternalId]): F[Seq[Option[Long]]]
/** Search suggestion by various parameters.
*
* @param module the module name search parameter

View File

@ -39,6 +39,12 @@ final class SqlSuggestionsRepo(db: SqlDatabase)(implicit ec: ExecutionContext)
override def getAll: Future[(Long, Seq[SuggestionEntry])] =
db.run(getAllQuery)
/** @inheritdoc */
override def getAllByExternalIds(
ids: Seq[Suggestion.ExternalId]
): Future[Seq[Option[Long]]] =
db.run(getAllByExternalIdsQuery(ids))
/** @inheritdoc */
override def search(
module: Option[String],
@ -129,6 +135,37 @@ final class SqlSuggestionsRepo(db: SqlDatabase)(implicit ec: ExecutionContext)
query.transactionally
}
/** The query to get suggestions by external ids.
*
* @param ids the list of external ids
* @return the list of found suggestion ids
*/
private def getAllByExternalIdsQuery(
ids: Seq[Suggestion.ExternalId]
): DBIO[Seq[Option[Long]]] =
if (ids.isEmpty) {
DBIO.successful(Seq())
} else {
val bits =
ids.map(id => (id.getLeastSignificantBits, id.getMostSignificantBits))
val query = Suggestions
.filter { row =>
bits
.map {
case (least, most) =>
row.externalIdLeast === least && row.externalIdMost === most
}
.reduce(_ || _)
}
.map(row => (row.id, row.externalIdLeast, row.externalIdMost))
query.result.map { triples =>
val result = triples.flatMap {
case (id, least, most) => toUUID(least, most).map(_ -> id)
}.toMap
ids.map(result.get)
}
}
/** The query to search suggestion by various parameters.
*
* @param module the module name search parameter

View File

@ -63,6 +63,42 @@ class SuggestionsRepoTest extends AnyWordSpec with Matchers with RetrySpec {
)
}
"get suggestions by external ids" taggedAs Retry in withRepo { repo =>
val action = for {
(_, ids) <- repo.insertAll(
Seq(
suggestion.atom,
suggestion.method,
suggestion.function,
suggestion.local
)
)
results <- repo.getAllByExternalIds(
Seq(suggestion.method.externalId.get, suggestion.local.externalId.get)
)
} yield (ids, results)
val (ids, results) = Await.result(action, Timeout)
results should contain theSameElementsAs Seq(ids(1), ids(3))
}
"get suggestions by empty external ids" taggedAs Retry in withRepo { repo =>
val action = for {
_ <- repo.insertAll(
Seq(
suggestion.atom,
suggestion.method,
suggestion.function,
suggestion.local
)
)
results <- repo.getAllByExternalIds(Seq())
} yield results
val results = Await.result(action, Timeout)
results.isEmpty shouldEqual true
}
"fail to insert duplicate suggestion" taggedAs Retry in withRepo { repo =>
val action =
for {