Discover parties in navigator automatically (#8079)

This turned out to be a bit more messy than I thought it would be
unfortunately but it doesn’t seem too bad. If anyone has a better
suggestion for how to approach this, I’m all ears.

I added an integration test that checks that newly allocated parties
are picked up.

changelog_begin

- [Navigator] If no parties are in the Navigator config or daml.yaml,
  Navigator will now pick up parties from the party management
  service. Those parties are periodically refreshed.

changelog_end

Update navigator/backend/src/main/scala/com/digitalasset/navigator/Session.scala

Co-authored-by: Stephen Compall <stephen.compall@daml.com>

Co-authored-by: Stephen Compall <stephen.compall@daml.com>
This commit is contained in:
Moritz Kiefer 2020-11-30 12:53:40 +01:00 committed by GitHub
parent 74f5ecd082
commit 96f58a3db1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
20 changed files with 417 additions and 162 deletions

View File

@ -150,6 +150,7 @@ da_scala_test_suite(
"src/test/**/*Spec.scala",
"src/test/**/*Test.scala",
]),
data = ["//ledger/test-common:dar-files"],
scalacopts = navigator_scalacopts,
deps = [
":navigator-library",
@ -159,9 +160,22 @@ da_scala_test_suite(
"//daml-lf/interface",
"//daml-lf/transaction",
"//language-support/scala/bindings",
"//ledger-api/rs-grpc-bridge",
"//ledger-api/testing-utils",
"//ledger-service/lf-value-json",
"//ledger/ledger-api-auth",
"//ledger/ledger-api-client",
"//ledger/ledger-api-common",
"//ledger/ledger-api-domain",
"//ledger/ledger-resources",
"//ledger/sandbox-classic",
"//ledger/sandbox-classic:sandbox-classic-scala-tests-lib",
"//ledger/sandbox-common",
"//ledger/sandbox-common:sandbox-common-scala-tests-lib",
"//libs-scala/build-info",
"//libs-scala/ports",
"//libs-scala/resources",
"//libs-scala/timer-utils",
"@maven//:com_typesafe_akka_akka_actor_2_12",
"@maven//:com_typesafe_akka_akka_http_2_12",
"@maven//:com_typesafe_akka_akka_http_core_2_12",

View File

@ -13,7 +13,7 @@ import com.daml.navigator.config.Arguments
import com.daml.navigator.store.Store._
import spray.json._
import DefaultJsonProtocol._
import com.daml.ledger.api.refinements.ApiTypes
import com.daml.navigator.store.Store
import com.daml.navigator.time.TimeProviderType
import scala.concurrent.{ExecutionContext, Future}
@ -56,7 +56,12 @@ case class DefaultInfoHandler(arguments: Arguments, platformStore: ActorRef)(
case _: PartyActorStarting => JsString("Starting")
case _: PartyActorStarted => JsString("Started")
case info: PartyActorFailed => JsString(s"Failed: ${info.error.getMessage}")
case _: PartyActorUnresponsive => JsString("Unresponsive")
}
}
private implicit object actorResponseWriter extends RootJsonWriter[PartyActorResponse] {
override def write(obj: PartyActorResponse): JsValue = obj match {
case PartyActorRunning(info) => info.toJson
case Store.PartyActorUnresponsive => JsString("Unresponsive")
}
}
private implicit object appInfoWriter extends RootJsonWriter[ApplicationStateInfo] {
@ -84,7 +89,7 @@ case class DefaultInfoHandler(arguments: Arguments, platformStore: ActorRef)(
"type" -> TimeProviderType.write(info.ledgerTime.`type`).toJson
),
"partyActors" -> JsObject(
info.partyActors.map(p => ApiTypes.Party.unwrap(p.party) -> p.toJson).toMap
info.partyActors.map { case (p, s) => p -> s.toJson }.toMap
)
)
case info: ApplicationStateFailed =>

View File

@ -21,6 +21,7 @@ final case class User(
sealed trait SignInError
case object InvalidCredentials extends SignInError
case object NotConnected extends SignInError
case object Unresponsive extends SignInError
case object Unknown extends SignInError
sealed abstract class Status
@ -38,8 +39,12 @@ object Session {
def current(sessionId: String): Option[Session] = sessions.get(sessionId)
def open(sessionId: String, userId: String, userConfig: UserConfig): Session = {
val user = Session(User(userId, userConfig.party, userConfig.role))
def open(
sessionId: String,
userId: String,
userConfig: UserConfig,
state: PartyState): Session = {
val user = Session(User(userId, state, userConfig.role))
sessions += sessionId -> user
user
}
@ -185,7 +190,8 @@ object SessionJsonProtocol extends DefaultJsonProtocol {
errorFieldName -> JsString(error match {
case InvalidCredentials => "invalid-credentials"
case NotConnected => "not-connected"
case _ => "unknown-error"
case Unresponsive => "unresponsive"
case Unknown => "unknown-error"
})
))
@ -197,12 +203,15 @@ object SessionJsonProtocol extends DefaultJsonProtocol {
s"${SignIn.getClass.getCanonicalName} because the field $methodFieldName is missing")
case (Some(rawMethod), maybeError) =>
val method = signInMethodFormat.read(rawMethod)
maybeError.fold(SignIn(method))(error =>
SignIn(method, Some(error match {
SignIn(
method,
maybeError.map {
case JsString("invalid-credentials") => InvalidCredentials
case JsString("not-connected") => NotConnected
case JsString("unresponsive") => Unresponsive
case _ => Unknown
})))
}
)
}
}

View File

@ -8,7 +8,7 @@ import java.util.UUID
import java.util.concurrent.TimeUnit
import java.util.stream.Collectors
import akka.actor.ActorSystem
import akka.actor.{ActorSystem, Cancellable}
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.HttpHeader
import akka.http.scaladsl.model.StatusCodes._
@ -25,7 +25,8 @@ import com.daml.navigator.SessionJsonProtocol._
import com.daml.navigator.config._
import com.daml.navigator.graphql.GraphQLContext
import com.daml.navigator.graphqless.GraphQLObject
import com.daml.navigator.model.{Ledger, PackageRegistry}
import com.daml.navigator.model.{Ledger, PackageRegistry, PartyState}
import com.daml.navigator.store.Store
import com.daml.navigator.store.Store._
import com.daml.navigator.store.platform.PlatformStore
import com.typesafe.scalalogging.LazyLogging
@ -61,15 +62,14 @@ abstract class UIBackend extends LazyLogging with ApplicationInfoJsonSupport {
private[navigator] def getRoute(
system: ActorSystem,
arguments: Arguments,
config: Config,
graphQL: GraphQLHandler,
info: InfoHandler,
getAppState: () => Future[ApplicationStateInfo]): Route = {
def openSession(userId: String, userConfig: UserConfig): Route = {
def openSession(userId: String, userConfig: UserConfig, state: PartyState): Route = {
val sessionId = UUID.randomUUID().toString
setCookie(HttpCookie("session-id", sessionId, path = Some("/"))) {
complete(Session.open(sessionId, userId, userConfig))
complete(Session.open(sessionId, userId, userConfig, state))
}
}
@ -84,8 +84,15 @@ abstract class UIBackend extends LazyLogging with ApplicationInfoJsonSupport {
None
}
def signIn(error: Option[SignInError] = None): SignIn =
SignIn(SignInSelect(config.users.keySet), error)
def signIn(): Route =
onSuccess(getAppState()) {
case ApplicationStateConnecting(_, _, _, _) =>
complete(SignIn(SignInSelect(Set.empty), Some(NotConnected)))
case ApplicationStateConnected(_, _, _, _, _, _, partyActors) =>
complete(SignIn(SignInSelect(partyActors.keySet), None))
case ApplicationStateFailed(_, _, _, _, _) =>
complete(SignIn(SignInSelect(Set.empty), Some(Unknown)))
}
// A resource with content that may change.
// Users can quickly switch between Navigator versions, so we don't want to cache this resource for any amount of time.
@ -112,37 +119,42 @@ abstract class UIBackend extends LazyLogging with ApplicationInfoJsonSupport {
} ~
path("session"./) {
get {
complete {
session match {
case Some((_, session)) => session
case None => signIn()
}
session match {
case Some((_, session)) => complete(session)
case None => signIn()
}
} ~
post {
entity(as[LoginRequest]) { request =>
config.users.get(request.userId) match {
case None =>
logger.error(
s"Attempt to signin with non-existent user ${request.userId}")
complete(signIn(Some(InvalidCredentials)))
case Some(userConfig) =>
onSuccess(getAppState()) {
case ApplicationStateFailed(
_,
_,
_,
_,
GrpcException.PERMISSION_DENIED()) =>
logger.warn("Attempt to sign in without valid token")
complete(signIn(Some(InvalidCredentials)))
case _: ApplicationStateFailed =>
complete(signIn(Some(Unknown)))
case _: ApplicationStateConnecting =>
complete(signIn(Some(NotConnected)))
case _ =>
openSession(request.userId, userConfig)
onSuccess(getAppState()) {
case ApplicationStateConnecting(_, _, _, _) =>
complete(SignIn(SignInSelect(Set.empty), Some(NotConnected)))
case ApplicationStateConnected(_, _, _, _, _, _, partyActors) =>
partyActors.get(request.userId) match {
case Some(resp) =>
resp match {
case PartyActorRunning(info) =>
openSession(request.userId, info.state.config, info.state)
case Store.PartyActorUnresponsive =>
complete(
SignIn(SignInSelect(partyActors.keySet), Some(Unresponsive)))
}
case None =>
logger.error(
s"Attempt to signin with non-existent user ${request.userId}")
complete(
SignIn(SignInSelect(partyActors.keySet), Some(InvalidCredentials)))
}
case ApplicationStateFailed(
_,
_,
_,
_,
GrpcException.PERMISSION_DENIED()) =>
logger.warn("Attempt to sign in without valid token")
complete(SignIn(SignInSelect(Set.empty), Some(InvalidCredentials)))
case ApplicationStateFailed(_, _, _, _, _) =>
complete(SignIn(SignInSelect(Set.empty), Some(Unknown)))
}
}
} ~
@ -155,7 +167,7 @@ abstract class UIBackend extends LazyLogging with ApplicationInfoJsonSupport {
case None =>
logger.error("Cannot delete session without session-id, cookie not found")
}
complete(signIn())
signIn()
}
}
} ~
@ -208,14 +220,10 @@ abstract class UIBackend extends LazyLogging with ApplicationInfoJsonSupport {
}
}
private[navigator] def runServer(arguments: Arguments, config: Config): Unit = {
banner.foreach(println)
implicit val system: ActorSystem = ActorSystem("da-ui-backend")
// Factored out for integration tests
private[navigator] def setup(arguments: Arguments, config: Config)(
implicit system: ActorSystem) = {
import system.dispatcher
// Read from the access token file or crash
val token =
arguments.accessTokenFile.map { path =>
@ -237,7 +245,17 @@ abstract class UIBackend extends LazyLogging with ApplicationInfoJsonSupport {
applicationInfo,
arguments.ledgerInboundMessageSizeMax
))
config.parties.foreach(store ! Subscribe(_))
// If no parties are specified, we periodically poll from the party management service.
// If parties are specified, we only use those. This allows users to use custom display names
// if they are non-unique or use only a subset of parties for performance reasons.
// Currently, we subscribe to all available parties. We could change that to do it lazily only on login
// but given that Navigator is only a development tool that might not be worth the complexity.
val partyRefresh: Option[Cancellable] = if (config.users.isEmpty) {
Some(system.scheduler.scheduleWithFixedDelay(Duration.Zero, 1.seconds, store, UpdateParties))
} else {
config.users.foreach { case (displayName, config) => store ! Subscribe(displayName, config) }
None
}
def graphQL: GraphQLHandler = DefaultGraphQLHandler(customEndpoints, Some(store))
def info: InfoHandler = DefaultInfoHandler(arguments, store)
@ -245,12 +263,22 @@ abstract class UIBackend extends LazyLogging with ApplicationInfoJsonSupport {
implicit val actorTimeout: Timeout = Timeout(5, TimeUnit.SECONDS)
(store ? GetApplicationStateInfo).mapTo[ApplicationStateInfo]
}
(graphQL, info, store, getAppState, partyRefresh)
}
private[navigator] def runServer(arguments: Arguments, config: Config): Unit = {
banner.foreach(println)
implicit val system: ActorSystem = ActorSystem("da-ui-backend")
val (graphQL, info, store, getAppState, partyRefresh) = setup(arguments, config)
val stopServer = if (arguments.startWebServer) {
val binding = Http()
.newServerAt("0.0.0.0", arguments.port)
.withSettings(ServerSettings(system).withTransparentHeadRequests(true))
.bind(getRoute(system, arguments, config, graphQL, info, getAppState))
.bind(getRoute(system, arguments, graphQL, info, getAppState))
logger.info(s"DA UI backend server listening on port ${arguments.port}")
println(s"Frontend running at http://localhost:${arguments.port}.")
() =>
@ -265,6 +293,7 @@ abstract class UIBackend extends LazyLogging with ApplicationInfoJsonSupport {
console.Console.run(arguments, config, store, graphQL, applicationInfo)
// Stop the web server, then the Akka system consuming the ledger API
stopServer()
partyRefresh.foreach(_.cancel)
stopAkka()
()
}

View File

@ -12,7 +12,6 @@ import com.daml.assistant.config.{
ConfigParseError => SdkConfigParseError,
ProjectConfig
}
import com.daml.navigator.model.PartyState
import com.daml.ledger.api.refinements.ApiTypes
import com.github.ghik.silencer.silent
import com.typesafe.config.{ConfigFactory, ConfigRenderOptions}
@ -20,7 +19,11 @@ import org.slf4j.LoggerFactory
import pureconfig.{ConfigConvert, ConfigWriter}
import scalaz.Tag
final case class UserConfig(password: Option[String], party: PartyState, role: Option[String])
final case class UserConfig(
password: Option[String],
party: ApiTypes.Party,
role: Option[String],
useDatabase: Boolean)
/* The configuration has an empty map as default list of users because you can login as party too */
final case class Config(users: Map[String, UserConfig] = Map.empty[String, UserConfig]) {
@ -28,7 +31,6 @@ final case class Config(users: Map[String, UserConfig] = Map.empty[String, UserC
def userIds: Set[String] = users.keySet
def roles: Set[String] = users.values.flatMap(_.role.toList)(collection.breakOut)
def parties: Set[PartyState] = users.values.map(_.party)(collection.breakOut)
}
sealed abstract class ConfigReadError extends Product with Serializable {
@ -70,8 +72,8 @@ object Config {
def loadNavigatorConfig(
configFile: Path,
useDatabase: Boolean): Either[ConfigReadError, Config] = {
@silent(" partyConfigConvert .* is never used") // false positive; macro uses aren't seen
implicit val partyConfigConvert: ConfigConvert[PartyState] = mkPartyConfigConvert(
@silent(" userConfigConvert .* is never used") // false positive; macro uses aren't seen
implicit val userConfigConvert: ConfigConvert[UserConfig] = mkUserConfigConvert(
useDatabase = useDatabase)
if (Files.exists(configFile)) {
logger.info(s"Loading Navigator config file from $configFile")
@ -99,11 +101,11 @@ object Config {
Right(
Config(
parties
.map(p => p -> UserConfig(None, new PartyState(ApiTypes.Party(p), useDatabase), None))
.map(p => p -> UserConfig(None, ApiTypes.Party(p), None, useDatabase))
.toMap))
case Right(None) =>
val message = "Found a SDK project config file, but it does not contain any parties."
Left(ConfigInvalid(message))
// Pick up parties from party management service
Right(Config())
case Left(SdkConfigMissing(reason)) =>
Left(ConfigNotFound(reason))
case Left(SdkConfigParseError(reason)) =>
@ -118,15 +120,12 @@ object Config {
def template(useDatabase: Boolean): Config =
Config(
Map(
"OPERATOR" -> UserConfig(
Some("password"),
new PartyState(ApiTypes.Party("party"), useDatabase),
None)
"OPERATOR" -> UserConfig(Some("password"), ApiTypes.Party("party"), None, useDatabase)
))
def writeTemplateToPath(configFile: Path, useDatabase: Boolean): Unit = {
@silent(" partyConfigConvert .* is never used") // false positive; macro uses aren't seen
implicit val partyConfigConvert: ConfigConvert[PartyState] = mkPartyConfigConvert(
@silent(" userConfigConvert .* is never used") // false positive; macro uses aren't seen
implicit val userConfigConvert: ConfigConvert[UserConfig] = mkUserConfigConvert(
useDatabase = useDatabase)
val config = ConfigWriter[Config].to(template(useDatabase))
val cro = ConfigRenderOptions
@ -139,8 +138,11 @@ object Config {
()
}
private[this] def mkPartyConfigConvert(useDatabase: Boolean): ConfigConvert[PartyState] =
ConfigConvert.viaNonEmptyString[PartyState](
str => _ => Right(new PartyState(ApiTypes.Party(str), useDatabase)),
t => Tag.unwrap(t.name))
final case class UserConfigHelper(password: Option[String], party: String, role: Option[String])
private[this] def mkUserConfigConvert(useDatabase: Boolean): ConfigConvert[UserConfig] =
implicitly[ConfigConvert[UserConfigHelper]].xmap(
helper => UserConfig(helper.password, ApiTypes.Party(helper.party), helper.role, useDatabase),
conf => UserConfigHelper(conf.password, Tag.unwrap(conf.party), conf.role)
)
}

View File

@ -79,7 +79,7 @@ object Console {
history = new DefaultHistory(),
quit = false,
rebuildLineReader = false,
party = config.parties.headOption.map(ps => ps.name).getOrElse(ApiTypes.Party("???")),
party = config.users.values.headOption.map(ps => ps.party).getOrElse(ApiTypes.Party("???")),
arguments = arguments,
config = config,
store = store,

View File

@ -54,7 +54,7 @@ final case class ParameterSQL(name: String, description: String) extends Paramet
final case class ParameterParty(name: String, description: String) extends Parameter {
def paramName: String = s"<$name>"
def completer(state: State): Completer =
new StringsCompleter(state.config.parties.toList.map(p => ApiTypes.Party.unwrap(p.name)).asJava)
new StringsCompleter(state.config.users.values.map(p => ApiTypes.Party.unwrap(p.party)).asJava)
}
final case class ParameterTemplateId(name: String, description: String) extends Parameter {

View File

@ -3,15 +3,26 @@
package com.daml.navigator.console
import java.util.concurrent.TimeUnit
import akka.actor.ActorRef
import akka.pattern.ask
import akka.util.Timeout
import com.daml.ledger.api.refinements.ApiTypes
import com.daml.navigator.{ApplicationInfo, GraphQLHandler}
import com.daml.navigator.config.{Arguments, Config}
import com.daml.navigator.model.PartyState
import com.daml.navigator.store.Store
import com.daml.navigator.store.Store.{
ApplicationStateInfo,
GetApplicationStateInfo,
PartyActorRunning
}
import org.jline.reader.{History, LineReader}
import org.jline.terminal.Terminal
import scala.concurrent.ExecutionContext
import scala.concurrent.{Await, ExecutionContext}
import scala.concurrent.duration._
/**
*
@ -39,5 +50,21 @@ final case class State(
graphQL: GraphQLHandler,
applicationInfo: ApplicationInfo
) {
def getPartyState: Option[PartyState] = config.parties.find(p => p.name == party)
def getParties: Option[Map[String, PartyState]] = {
implicit val actorTimeout: Timeout = Timeout(5, TimeUnit.SECONDS)
Await.result((store ? GetApplicationStateInfo).mapTo[ApplicationStateInfo], 10.seconds) match {
case Store.ApplicationStateConnecting(_, _, _, _) =>
None
case Store.ApplicationStateConnected(_, _, _, _, _, _, partyActors) =>
Some(partyActors.collect {
case (str, PartyActorRunning(info)) => (str, info.state)
})
case Store.ApplicationStateFailed(_, _, _, _, _) => None
}
}
def getPartyState: Option[PartyState] = {
getParties.flatMap { parties =>
parties.values.find(s => s.name == party)
}
}
}

View File

@ -5,12 +5,14 @@ package com.daml.navigator.console.commands
import java.util.concurrent.TimeUnit
import com.daml.ledger.api.refinements.ApiTypes
import com.daml.navigator.console._
import com.daml.navigator.store.Store._
import com.daml.navigator.time.TimeProviderType
import akka.pattern.ask
import akka.util.Timeout
import com.daml.ledger.api.refinements.ApiTypes
import com.daml.navigator.model.PartyState
import com.daml.navigator.store.Store
import scala.concurrent.Await
import scala.concurrent.duration._
@ -27,7 +29,11 @@ case object Info extends SimpleCommand {
case _: PartyActorStarting => PrettyPrimitive("Actor starting")
case _: PartyActorStarted => PrettyPrimitive("Actor running")
case info: PartyActorFailed => PrettyPrimitive(s"Actor failed: ${info.error.getMessage}")
case _: PartyActorUnresponsive => PrettyPrimitive(s"Actor unresponsive")
}
def prettyActorResponse(resp: PartyActorResponse): PrettyNode = resp match {
case PartyActorRunning(info) => prettyPartyInfo(info)
case Store.PartyActorUnresponsive => PrettyPrimitive(s"Actor unresponsive")
}
def prettyGeneralInfo(info: ApplicationStateInfo): PrettyNode = PrettyObject(
@ -37,26 +43,27 @@ case object Info extends SimpleCommand {
PrettyField("Application ID", info.applicationId)
)
def prettyLocalDataInfo(state: State): PrettyNode = PrettyObject(
state.config.parties.toList.map(
ps =>
PrettyField(
ApiTypes.Party.unwrap(ps.name),
PrettyObject(
PrettyField("Packages", ps.packageRegistry.packageCount.toString),
PrettyField("Contracts", ps.ledger.allContractsCount.toString),
PrettyField("Active contracts", ps.ledger.activeContractsCount.toString),
PrettyField(
"Last transaction",
ps.ledger
.latestTransaction(ps.packageRegistry)
.map(t => ApiTypes.TransactionId.unwrap(t.id))
.getOrElse("???"))
)
))
)
def prettyLocalDataInfo(pss: Seq[PartyState]): PrettyNode =
PrettyObject(
pss.toList.map(
ps =>
PrettyField(
ApiTypes.Party.unwrap(ps.name),
PrettyObject(
PrettyField("Packages", ps.packageRegistry.packageCount.toString),
PrettyField("Contracts", ps.ledger.allContractsCount.toString),
PrettyField("Active contracts", ps.ledger.activeContractsCount.toString),
PrettyField(
"Last transaction",
ps.ledger
.latestTransaction(ps.packageRegistry)
.map(t => ApiTypes.TransactionId.unwrap(t.id))
.getOrElse("???"))
)
))
)
def prettyInfo(applicationInfo: ApplicationStateInfo, state: State): PrettyObject =
def prettyInfo(applicationInfo: ApplicationStateInfo): PrettyObject =
applicationInfo match {
case info: ApplicationStateConnected =>
PrettyObject(
@ -73,10 +80,11 @@ case object Info extends SimpleCommand {
PrettyField(
"Akka system",
PrettyObject(
info.partyActors
.map(p => PrettyField(ApiTypes.Party.unwrap(p.party), prettyPartyInfo(p)))
info.partyActors.toList.map { case (k, v) => PrettyField(k, prettyActorResponse(v)) }.toList
)),
PrettyField("Local data", prettyLocalDataInfo(state))
PrettyField("Local data", prettyLocalDataInfo(info.partyActors.values.collect {
case PartyActorRunning(info) => info.state
}.toList))
)
case info: ApplicationStateConnecting =>
PrettyObject(
@ -86,7 +94,6 @@ case object Info extends SimpleCommand {
PrettyObject(
PrettyField("Connection status", "Connecting")
)),
PrettyField("Local data", prettyLocalDataInfo(state))
)
case info: ApplicationStateFailed =>
PrettyObject(
@ -97,7 +104,6 @@ case object Info extends SimpleCommand {
PrettyField("Connection status", "Failed"),
PrettyField("Error", info.error.getMessage)
)),
PrettyField("Local data", prettyLocalDataInfo(state))
)
}
@ -109,7 +115,7 @@ case object Info extends SimpleCommand {
for {
future <- Try((state.store ? GetApplicationStateInfo).mapTo[ApplicationStateInfo]) ~> "Failed to get info"
info <- Try(Await.result(future, 10.seconds)) ~> "Failed to get info"
} yield (state, getBanner(state) + "\n" + Pretty.yaml(prettyInfo(info, state)))
} yield (state, getBanner(state) + "\n" + Pretty.yaml(prettyInfo(info)))
}
def getBanner(state: State): String = {

View File

@ -16,7 +16,9 @@ case object Parties extends SimpleCommand {
state: State,
args: List[String],
set: CommandSet): Either[CommandError, (State, String)] = {
Right((state, state.config.parties.map(p => p.name).mkString("\n")))
for {
parties <- state.getParties ~> "Application not connected"
} yield (state, parties.values.map(ps => ps.name).mkString("\n"))
}
}

View File

@ -22,7 +22,8 @@ case object Party extends SimpleCommand {
set: CommandSet): Either[CommandError, (State, String)] = {
for {
arg1 <- args.headOption ~> "Missing <party> argument"
newParty <- state.config.parties
parties <- state.getParties ~> s"Application not connected"
newParty <- parties.values
.find(ps => ApiTypes.Party.unwrap(ps.name) == arg1) ~> s"Unknown party $arg1"
} yield {
(state.copy(party = newParty.name), s"Active party set to $newParty")

View File

@ -7,13 +7,15 @@ import java.util.concurrent.atomic.AtomicReference
import com.daml.lf.{iface => DamlLfIface}
import com.daml.ledger.api.refinements.ApiTypes
import com.daml.navigator.config.UserConfig
import scalaz.Tag
case class State(ledger: Ledger, packageRegistry: PackageRegistry)
/** A DA party and its ledger view(s). */
class PartyState(val name: ApiTypes.Party, val useDatabase: Boolean) {
class PartyState(val config: UserConfig) {
val name = config.party
val useDatabase = config.useDatabase
private val stateRef: AtomicReference[State] = new AtomicReference(
State(Ledger(name, None, useDatabase), new PackageRegistry))

View File

@ -5,8 +5,10 @@ package com.daml.navigator.store
import java.time.Instant
import com.daml.ledger.api.domain.PartyDetails
import com.daml.navigator.model._
import com.daml.ledger.api.refinements.ApiTypes
import com.daml.navigator.config.UserConfig
import com.daml.navigator.time.TimeProviderWithType
trait ActorStatus
@ -28,8 +30,12 @@ object Store {
/** Reinitialize the platform connection and reset all local state `Unit` */
case object ResetConnection
case object UpdateParties
case class UpdatedParties(details: List[PartyDetails])
/** Request to subscribe a party to the store (without response to sender). */
case class Subscribe(party: PartyState)
case class Subscribe(displayName: String, config: UserConfig)
/** Request to create a contract instance for a template and respond with a `scala.util.Try[CommandId]`. */
case class CreateContract(party: PartyState, templateId: TemplateStringId, argument: ApiRecord)
@ -80,7 +86,7 @@ object Store {
applicationId: String,
ledgerId: String,
ledgerTime: TimeProviderWithType,
partyActors: List[PartyActorInfo]
partyActors: Map[String, PartyActorResponse]
) extends ApplicationStateInfo
/** Application failed to start up */
@ -95,23 +101,26 @@ object Store {
/** Request diagnostic information about a party and respond with a [[PartyActorInfo]]. */
case object GetPartyActorInfo
sealed trait PartyActorResponse
final case class PartyActorRunning(info: PartyActorInfo) extends PartyActorResponse
final case object PartyActorUnresponsive extends PartyActorResponse
/** Diagnostic information about a party */
sealed trait PartyActorInfo {
def party: ApiTypes.Party
def party: ApiTypes.Party = state.name
def state: PartyState
}
/** Actor still starting up */
final case class PartyActorStarting(party: ApiTypes.Party) extends PartyActorInfo
final case class PartyActorStarting(state: PartyState) extends PartyActorInfo
/** Actor running and consuming the transaction stream */
final case class PartyActorStarted(party: ApiTypes.Party) extends PartyActorInfo
final case class PartyActorStarted(state: PartyState) extends PartyActorInfo
/** Actor permanently failed */
final case class PartyActorFailed(
party: ApiTypes.Party,
state: PartyState,
error: Throwable
) extends PartyActorInfo
/** Actor did not respond within a reasonable time */
final case class PartyActorUnresponsive(party: ApiTypes.Party) extends PartyActorInfo
}

View File

@ -9,7 +9,7 @@ import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicLong
import akka.actor.{Actor, ActorLogging, ActorRef, Props, Scheduler, Stash}
import akka.pattern.ask
import akka.pattern.{ask, pipe}
import akka.stream.Materializer
import akka.util.Timeout
import com.daml.grpc.GrpcException
@ -26,6 +26,7 @@ import com.daml.ledger.client.configuration.{
import com.daml.ledger.client.services.testing.time.StaticTime
import com.daml.lf.data.Ref
import com.daml.navigator.ApplicationInfo
import com.daml.navigator.config.UserConfig
import com.daml.navigator.model._
import com.daml.navigator.store.Store._
import com.daml.navigator.time._
@ -36,7 +37,7 @@ import io.netty.handler.ssl.SslContext
import org.slf4j.LoggerFactory
import scalaz.syntax.tag._
import scala.concurrent.Future
import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.duration._
import scala.util.{Failure, Random, Success, Try}
@ -70,11 +71,11 @@ object PlatformStore {
case class StateConnected(
ledgerClient: LedgerClient,
parties: List[PartyState],
parties: Map[String, PartyState],
staticTime: Option[StaticTime],
time: TimeProviderWithType
)
case class StateInitial(parties: List[PartyState])
case class StateInitial(parties: Map[String, PartyState])
case class StateFailed(error: Throwable)
}
@ -98,7 +99,7 @@ class PlatformStore(
private val system = context.system
implicit val s: Scheduler = system.scheduler
import scala.concurrent.ExecutionContext.Implicits.global
implicit val ec: ExecutionContext = system.dispatcher
implicit val materializer: Materializer = Materializer(system)
implicit val esf: ExecutionSequencerFactory =
new AkkaExecutionSequencerPool("esf-" + this.getClass.getSimpleName)(system)
@ -126,7 +127,7 @@ class PlatformStore(
// ----------------------------------------------------------------------------------------------
// Messages
// ----------------------------------------------------------------------------------------------
override def receive: Receive = connecting(StateInitial(List.empty[PartyState]))
override def receive: Receive = connecting(StateInitial(Map.empty[String, PartyState]))
def connecting(state: StateInitial): Receive = {
case Connected(Success(value)) =>
@ -134,8 +135,6 @@ class PlatformStore(
connected(StateConnected(value.ledgerClient, state.parties, value.staticTime, value.time)))
unstashAll
state.parties.foreach(party => startPartyActor(value.ledgerClient, party))
case Connected(Failure(e)) =>
// Connection failed even after several retries - not sure how to recover from this
val message = s"Permanently failed to connect to the ledger at $platformHost:$platformPort. " +
@ -155,9 +154,36 @@ class PlatformStore(
}
def connected(state: StateConnected): Receive = {
case Subscribe(party) =>
startPartyActor(state.ledgerClient, party)
context.become(connected(state.copy(parties = party :: state.parties)))
case UpdateParties =>
state.ledgerClient.partyManagementClient
.listKnownParties()
.map(UpdatedParties(_))
.pipeTo(self)
()
case UpdatedParties(details) =>
details.foreach { partyDetails =>
val displayName = partyDetails.displayName match {
case Some(value) => value
case None => partyDetails.party
}
self ! Subscribe(
displayName,
UserConfig(
password = None,
party = ApiTypes.Party(partyDetails.party),
role = None,
useDatabase = false))
}
case Subscribe(displayName, config) =>
if (!state.parties.contains(displayName)) {
log.info(s"Starting actor for $displayName")
val partyState = new PartyState(config)
startPartyActor(state.ledgerClient, partyState)
context.become(connected(state.copy(parties = state.parties + (displayName -> partyState))))
} else {
log.info(s"Actor for $displayName is already running")
}
case CreateContract(party, templateId, value) =>
createContract(state.time.time.getCurrentTime, party, templateId, value, sender)
@ -185,17 +211,22 @@ class PlatformStore(
val snd = sender
Future
.traverse(state.parties)(ps => {
val result = for {
ref <- context.child(childName(ps))
pi <- Try(
(ref ? GetPartyActorInfo)
.mapTo[PartyActorInfo]
.recover { case _ => PartyActorUnresponsive(ps.name) }
).toOption
} yield pi
result.getOrElse(Future.successful(PartyActorUnresponsive(ps.name)))
})
.traverse(state.parties.toList) {
case (p, ps) => {
val result = for {
ref <- context.child(childName(ps.name))
pi <- Try(
(ref ? GetPartyActorInfo)
.mapTo[PartyActorInfo]
.map(PartyActorRunning(_): PartyActorResponse)
.recover { case _ => PartyActorUnresponsive }
).toOption
} yield pi
result
.getOrElse(Future.successful(PartyActorUnresponsive))
.map(actorInfo => (p, actorInfo))
}
}
.andThen {
case Success(actorStatus) =>
snd ! ApplicationStateConnected(
@ -205,7 +236,7 @@ class PlatformStore(
applicationId,
state.ledgerClient.ledgerId.unwrap,
state.time,
actorStatus
actorStatus.toMap
)
case Failure(error) =>
log.error(error.getMessage)
@ -216,7 +247,7 @@ class PlatformStore(
applicationId,
state.ledgerClient.ledgerId.unwrap,
state.time,
List.empty
Map.empty
)
}
()
@ -238,13 +269,13 @@ class PlatformStore(
// ----------------------------------------------------------------------------------------------
// Helpers
// ----------------------------------------------------------------------------------------------
private def childName(party: PartyState): String =
"party-" + URLEncoder.encode(ApiTypes.Party.unwrap(party.name), "UTF-8")
private def childName(party: ApiTypes.Party): String =
"party-" + URLEncoder.encode(ApiTypes.Party.unwrap(party), "UTF-8")
private def startPartyActor(ledgerClient: LedgerClient, party: PartyState): ActorRef = {
private def startPartyActor(ledgerClient: LedgerClient, state: PartyState): ActorRef = {
context.actorOf(
PlatformSubscriber.props(ledgerClient, party, applicationId, token),
childName(party))
PlatformSubscriber.props(ledgerClient, state, applicationId, token),
childName(state.name))
}
private def sslContext: Option[SslContext] =
@ -421,7 +452,7 @@ class PlatformStore(
// Each party has its own command completion stream.
// Forward the request to the party actor, so that it can be tracked.
context
.child(childName(party))
.child(childName(party.name))
.foreach(child => child ! PlatformSubscriber.SubmitCommand(command, sender))
}

View File

@ -110,7 +110,7 @@ class PlatformSubscriber(
unstashAll()
case GetPartyActorInfo =>
sender ! PartyActorStarting(party.name)
sender ! PartyActorStarting(party)
case _ =>
stash
@ -128,13 +128,13 @@ class PlatformSubscriber(
submitCommand(ledgerClient, state.commandTracker, party, command, commandSender)
case GetPartyActorInfo =>
sender ! PartyActorStarted(party.name)
sender ! PartyActorStarted(party)
}
// Permanently failed state
def failed(error: Throwable): Receive = {
case GetApplicationStateInfo =>
sender ! PartyActorFailed(party.name, error)
sender ! PartyActorFailed(party, error)
case _ => ()
}

View File

@ -0,0 +1,11 @@
<configuration>
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>%d{HH:mm:ss} %msg%n</pattern>
</encoder>
</appender>
<root level="INFO">
<appender-ref ref="STDOUT" />
</root>
</configuration>

View File

@ -0,0 +1,96 @@
// Copyright (c) 2020 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
package com.daml.navigator
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.{HttpRequest, HttpResponse, StatusCodes, Uri}
import akka.http.scaladsl.settings.ServerSettings
import akka.util.ByteString
import com.daml.ledger.api.testing.utils.{AkkaBeforeAndAfterAll, SuiteResourceManagementAroundAll}
import com.daml.ledger.client.LedgerClient
import com.daml.ledger.client.configuration.{
CommandClientConfiguration,
LedgerClientConfiguration,
LedgerIdRequirement
}
import com.daml.navigator.config.{Arguments, Config}
import org.scalatest._
import com.daml.platform.sandbox.services.SandboxFixture
import com.daml.timer.RetryStrategy
import scala.concurrent.Future
import scala.concurrent.duration._
class IntegrationTest
extends AsyncFreeSpec
with SandboxFixture
with AkkaBeforeAndAfterAll
with SuiteResourceManagementAroundAll
with Matchers {
self: Suite =>
private def withNavigator[A](testFn: (Uri, LedgerClient) => Future[A]): Future[A] = {
val args = Arguments(port = 0, participantPort = serverPort.value)
val sys = ActorSystem("navigator")
val (graphQL, info, _, getAppState, partyRefresh) = NavigatorBackend.setup(args, Config())(sys)
val bindingF = Http()
.newServerAt("localhost", 0)
.withSettings(ServerSettings(system).withTransparentHeadRequests(true))
.bind(NavigatorBackend.getRoute(system, args, graphQL, info, getAppState))
val clientF = LedgerClient(
channel,
LedgerClientConfiguration(
applicationId = "foobar",
LedgerIdRequirement.none,
commandClient = CommandClientConfiguration.default,
sslContext = None))
val fa = for {
binding <- bindingF
client <- clientF
uri = Uri.from(
scheme = "http",
host = binding.localAddress.getHostName,
port = binding.localAddress.getPort)
a <- testFn(uri, client)
} yield a
fa.transformWith { ta =>
partyRefresh.foreach(_.cancel())
Future
.sequence(
Seq[Future[Unit]](
bindingF.flatMap(_.unbind()).map(_ => ()),
sys.terminate().map(_ => ())))
.transform(_ => ta)
}
}
def getResponseDataBytes(resp: HttpResponse): Future[String] = {
val fb = resp.entity.dataBytes.runFold(ByteString.empty)((b, a) => b ++ a).map(_.utf8String)
fb
}
"Navigator" - {
"picks up newly allocated parties" in withNavigator {
case (uri, client) =>
for {
resp <- Http().singleRequest(HttpRequest(uri = uri.withPath(Uri.Path("/api/session/"))))
respBody <- getResponseDataBytes(resp)
_ = respBody shouldBe """{"method":{"type":"select","users":[]},"type":"sign-in"}"""
_ = resp.status shouldBe StatusCodes.OK
_ <- client.partyManagementClient
.allocateParty(hint = None, displayName = Some("display-name"))
_ <- RetryStrategy.constant(20, 1.second) {
case (run @ _, duration @ _) =>
for {
resp <- Http().singleRequest(
HttpRequest(uri = uri.withPath(Uri.Path("/api/session/"))))
respBody <- getResponseDataBytes(resp)
} yield {
respBody shouldBe """{"method":{"type":"select","users":["display-name"]},"type":"sign-in"}"""
}
}
} yield succeed
}
}
}

View File

@ -12,13 +12,15 @@ import akka.http.scaladsl.unmarshalling.Unmarshal
import com.daml.api.util.TimeProvider.UTC
import com.daml.ledger.api.refinements.ApiTypes
import com.daml.navigator.SessionJsonProtocol._
import com.daml.navigator.config.{Arguments, Config, UserConfig}
import com.daml.navigator.config.{Arguments, UserConfig}
import com.daml.navigator.model.PartyState
import com.daml.navigator.store.Store.{
ApplicationStateConnected,
ApplicationStateConnecting,
ApplicationStateFailed,
ApplicationStateInfo
ApplicationStateInfo,
PartyActorRunning,
PartyActorStarted
}
import com.daml.navigator.time.TimeProviderType.Static
import com.daml.navigator.time.TimeProviderWithType
@ -41,8 +43,9 @@ class ServerTest
val role = "role"
val party = ApiTypes.Party("party")
val password = "password"
val user = User(userId, new PartyState(party, false), Some(role), true)
val userConfig = UserConfig(Some(password), new PartyState(party, false), Some(role))
val userConfig = UserConfig(Some(password), party, Some(role), false)
val partyState = new PartyState(userConfig)
val user = User(userId, partyState, Some(role), true)
val userJson = JsObject(
"id" -> JsString(userId),
@ -63,7 +66,6 @@ class ServerTest
NavigatorBackend.getRoute(
system = ActorSystem("da-ui-backend-test"),
arguments = Arguments.default,
config = new Config(users = Map(userId -> userConfig)),
graphQL = DefaultGraphQLHandler(Set.empty, None),
info = TestInfoHandler,
getAppState = () => Future.successful(state)
@ -78,7 +80,7 @@ class ServerTest
"n/a",
"0",
TimeProviderWithType(UTC, Static),
List.empty))
Map(userId -> PartyActorRunning(PartyActorStarted(partyState)))))
private[this] val unauthorized =
route(
@ -128,7 +130,7 @@ class ServerTest
it should "respond with the Session when already signed-in" in withCleanSessions {
val sessionId = "session-id-value"
Session.open(sessionId, userId, userConfig)
Session.open(sessionId, userId, userConfig, user.party)
Get("/api/session/") ~> Cookie("session-id" -> sessionId) ~> connected ~> check {
Unmarshal(response.entity).to[String].value.map(_.map(_.parseJson)) shouldEqual Some(
Success((sessionJson)))
@ -155,30 +157,28 @@ class ServerTest
it should "forbid to SignIn with when unauthorized and report the error" in withCleanSessions {
Post("/api/session/", LoginRequest(userId, None)) ~> unauthorized ~> check {
responseAs[SignIn] shouldEqual SignIn(
method = SignInSelect(userIds = Set(userId)),
method = SignInSelect(userIds = Set()),
Some(InvalidCredentials))
}
}
it should "forbid to SignIn with when it's impossible to connect to the ledger" in withCleanSessions {
Post("/api/session/", LoginRequest(userId, None)) ~> failed ~> check {
responseAs[SignIn] shouldEqual SignIn(
method = SignInSelect(userIds = Set(userId)),
Some(Unknown))
responseAs[SignIn] shouldEqual SignIn(method = SignInSelect(userIds = Set()), Some(Unknown))
}
}
it should "forbid to SignIn with when still connecting to a ledger" in withCleanSessions {
Post("/api/session/", LoginRequest(userId, None)) ~> connecting ~> check {
responseAs[SignIn] shouldEqual SignIn(
method = SignInSelect(userIds = Set(userId)),
method = SignInSelect(userIds = Set()),
Some(NotConnected))
}
}
"SelectMode DELETE /api/session/" should "delete a given Session when signed-in" in withCleanSessions {
val sessionId = "session-id-value-2"
Session.open(sessionId, userId, userConfig)
Session.open(sessionId, userId, userConfig, user.party)
Delete("/api/session/") ~> Cookie("session-id", sessionId) ~> connected ~> check {
Session.current(sessionId) shouldBe None
}

View File

@ -7,6 +7,7 @@ import com.daml.navigator.model.PartyState
import org.scalatest.{FlatSpec, Matchers}
import SessionJsonProtocol.userWriter
import com.daml.ledger.api.refinements.ApiTypes
import com.daml.navigator.config.UserConfig
import spray.json.{JsBoolean, JsObject, JsString}
class SessionJsonProtocolTest extends FlatSpec with Matchers {
@ -17,7 +18,10 @@ class SessionJsonProtocolTest extends FlatSpec with Matchers {
behavior of s"JsonCodec[$userClassName]"
it should s"encode $userClassName without role" in {
val user = User(id = "id", party = new PartyState(party, false), canAdvanceTime = true)
val user = User(
id = "id",
party = new PartyState(UserConfig(None, party, None, false)),
canAdvanceTime = true)
val userJson = JsObject(
"id" -> JsString("id"),
"party" -> JsString("party"),
@ -28,7 +32,7 @@ class SessionJsonProtocolTest extends FlatSpec with Matchers {
it should s"encode $userClassName with role" in {
val user = User(
id = "id",
party = new PartyState(party, false),
party = new PartyState(UserConfig(None, party, Some("role"), false)),
role = Some("role"),
canAdvanceTime = false)
val userJson = JsObject(

View File

@ -114,6 +114,13 @@ export default class Component<A extends Action>
<div>Verify that the ledger is available and try again</div>
</WarningMessage>
)
} else if (failure === 'unresponsive') {
errorEl = (
<WarningMessage>
<div>Actor for party was unresponsive</div>
<div>Try restarting Navigator</div>
</WarningMessage>
)
} else if (failure === 'unknown-error') {
errorEl = (
<ErrorMessage>