mirror of
https://github.com/digital-asset/daml.git
synced 2024-09-20 01:07:18 +03:00
Don't update running triggers until we know the trigger is running (#6079)
* Don't update running triggers until we know the trigger is running changelog_begin changelog_end * Don't update running triggers until we know the trigger is running
This commit is contained in:
parent
cfec886e71
commit
b95dd39a6e
@ -17,6 +17,10 @@ case class ServiceConfig(
|
||||
ledgerHost: String,
|
||||
ledgerPort: Int,
|
||||
maxInboundMessageSize: Int,
|
||||
// These 2 parameters mean that a failing trigger will be
|
||||
// restarted up to n times within k seconds.
|
||||
maxFailureNumberOfRetries: Int,
|
||||
failureRetryTimeRange: Duration, // in seconds
|
||||
timeProviderType: TimeProviderType,
|
||||
commandTtl: Duration,
|
||||
init: Boolean,
|
||||
@ -71,6 +75,8 @@ object JdbcConfig {
|
||||
object ServiceConfig {
|
||||
val DefaultHttpPort: Int = 8088
|
||||
val DefaultMaxInboundMessageSize: Int = RunnerConfig.DefaultMaxInboundMessageSize
|
||||
val DefaultMaxFailureNumberOfRetries: Int = 3
|
||||
val DefaultFailureRetryTimeRange: Duration = Duration.ofSeconds(60)
|
||||
|
||||
private val parser = new scopt.OptionParser[ServiceConfig]("trigger-service") {
|
||||
head("trigger-service")
|
||||
@ -99,7 +105,20 @@ object ServiceConfig {
|
||||
.action((x, c) => c.copy(maxInboundMessageSize = x))
|
||||
.optional()
|
||||
.text(
|
||||
s"Optional max inbound message size in bytes. Defaults to ${DefaultMaxInboundMessageSize}")
|
||||
s"Optional max inbound message size in bytes. Defaults to ${DefaultMaxInboundMessageSize}.")
|
||||
|
||||
opt[Int]("max-failure-number-of-retries")
|
||||
.action((x, c) => c.copy(maxFailureNumberOfRetries = x))
|
||||
.optional()
|
||||
.text(
|
||||
s"Max number of times to try to restart a failing trigger (within allowed time range). Defaults to ${DefaultMaxFailureNumberOfRetries}.")
|
||||
|
||||
opt[Long]("failure-retry-time-range")
|
||||
.action { (t, c) =>
|
||||
c.copy(failureRetryTimeRange = Duration.ofSeconds(t))
|
||||
}
|
||||
.text(
|
||||
"Allow up to max number of restarts of a failing trigger within this many seconds. Defaults to " + DefaultFailureRetryTimeRange.getSeconds.toString + "s.")
|
||||
|
||||
opt[Unit]('w', "wall-clock-time")
|
||||
.action { (t, c) =>
|
||||
@ -134,6 +153,8 @@ object ServiceConfig {
|
||||
ledgerHost = null,
|
||||
ledgerPort = 0,
|
||||
maxInboundMessageSize = DefaultMaxInboundMessageSize,
|
||||
maxFailureNumberOfRetries = DefaultMaxFailureNumberOfRetries,
|
||||
failureRetryTimeRange = DefaultFailureRetryTimeRange,
|
||||
timeProviderType = TimeProviderType.Static,
|
||||
commandTtl = Duration.ofSeconds(30L),
|
||||
init = false,
|
||||
|
@ -62,21 +62,23 @@ case class LedgerConfig(
|
||||
commandTtl: Duration,
|
||||
)
|
||||
|
||||
object Server {
|
||||
class Server(dar: Option[Dar[(PackageId, Package)]]) {
|
||||
private var triggers: Map[UUID, TriggerRunnerWithToken] = Map.empty;
|
||||
private var triggersByToken: Map[Jwt, Set[UUID]] = Map.empty;
|
||||
val compiledPackages: MutableCompiledPackages = ConcurrentCompiledPackages()
|
||||
dar.foreach(addDar(_))
|
||||
|
||||
sealed trait Message
|
||||
private final case class StartFailed(cause: Throwable) extends Message
|
||||
private final case class Started(binding: ServerBinding) extends Message
|
||||
final case class GetServerBinding(replyTo: ActorRef[ServerBinding]) extends Message
|
||||
final case object Stop extends Message
|
||||
case class TriggerRunnerWithToken(ref: ActorRef[TriggerRunner.Message], token: Jwt)
|
||||
|
||||
private def addDar(compiledPackages: MutableCompiledPackages, dar: Dar[(PackageId, Package)]) = {
|
||||
private def addDar(dar: Dar[(PackageId, Package)]) = {
|
||||
val darMap = dar.all.toMap
|
||||
darMap.foreach {
|
||||
case (pkgId, pkg) =>
|
||||
// If packages are not in topological order, we will get back ResultNeedPackage.
|
||||
// The way the code is structured here we will still call addPackage even if we
|
||||
// already fed the package via the callback but this is harmless and not expensive.
|
||||
// If packages are not in topological order, we will get back
|
||||
// ResultNeedPackage. The way the code is structured here we
|
||||
// will still call addPackage even if we already fed the
|
||||
// package via the callback but this is harmless and not
|
||||
// expensive.
|
||||
def go(r: Result[Unit]): Unit = r match {
|
||||
case ResultDone(()) => ()
|
||||
case ResultNeedPackage(pkgId, resume) =>
|
||||
@ -87,84 +89,127 @@ object Server {
|
||||
}
|
||||
}
|
||||
|
||||
case class TriggerRunnerWithToken(
|
||||
ref: ActorRef[TriggerRunner.Message],
|
||||
token: Jwt,
|
||||
)
|
||||
|
||||
private var triggers: Map[UUID, TriggerRunnerWithToken] = Map.empty
|
||||
private var triggersByToken: Map[Jwt, Set[UUID]] = Map.empty
|
||||
private val compiledPackages: MutableCompiledPackages = ConcurrentCompiledPackages()
|
||||
|
||||
private def startTrigger(
|
||||
ctx: ActorContext[Server.Message],
|
||||
token: (Jwt, JwtPayload),
|
||||
trigger: Trigger,
|
||||
params: StartParams,
|
||||
ledgerConfig: LedgerConfig,
|
||||
maxInboundMessageSize: Int)(
|
||||
implicit esf: ExecutionSequencerFactory,
|
||||
mat: Materializer): JsValue = {
|
||||
val jwt: Jwt = token._1
|
||||
val jwtPayload: JwtPayload = token._2
|
||||
val party: Party = Party(jwtPayload.party);
|
||||
val uuid = UUID.randomUUID
|
||||
val ident = uuid.toString
|
||||
val ref = ctx.spawn(
|
||||
TriggerRunner(
|
||||
new TriggerRunner.Config(
|
||||
compiledPackages,
|
||||
trigger,
|
||||
ledgerConfig,
|
||||
maxInboundMessageSize,
|
||||
party),
|
||||
ident),
|
||||
ident + "-monitor")
|
||||
triggers = triggers + (uuid -> TriggerRunnerWithToken(ref, jwt))
|
||||
val newTriggerSet = triggersByToken.getOrElse(jwt, Set()) + uuid
|
||||
triggersByToken = triggersByToken + (jwt -> newTriggerSet)
|
||||
val triggerIdResult = JsObject(("triggerId", uuid.toString.toJson))
|
||||
triggerIdResult
|
||||
private def actorWithToken(uuid: UUID) = {
|
||||
triggers.get(uuid).get // TODO: Improve as might throw NoSuchElementException.
|
||||
}
|
||||
|
||||
private def stopTrigger(uuid: UUID, token: (Jwt, JwtPayload))(
|
||||
implicit esf: ExecutionSequencerFactory,
|
||||
mat: Materializer): JsValue = {
|
||||
//TODO(SF, 2020-05-20): Check that the provided token
|
||||
//is the same as the one used to start the trigger and
|
||||
//fail with 'Unauthorized' if not.
|
||||
val actorWithToken = triggers.get(uuid).get
|
||||
actorWithToken.ref ! TriggerRunner.Stop
|
||||
private def addRunningTrigger(uuid: UUID, jwt: Jwt, runner: ActorRef[TriggerRunner.Message]) = {
|
||||
triggers = triggers + (uuid -> TriggerRunnerWithToken(runner, jwt))
|
||||
triggersByToken = triggersByToken + (jwt -> (triggersByToken.getOrElse(jwt, Set()) + uuid))
|
||||
}
|
||||
|
||||
private def removeRunningTrigger(
|
||||
uuid: UUID,
|
||||
jwt: Jwt,
|
||||
runner: ActorRef[TriggerRunner.Message]) = {
|
||||
triggers = triggers - uuid
|
||||
val token = actorWithToken.token
|
||||
val newTriggerSet = triggersByToken.get(token).get - uuid
|
||||
triggersByToken = triggersByToken + (token -> newTriggerSet)
|
||||
val stoppedTriggerId = JsObject(("triggerId", uuid.toString.toJson))
|
||||
stoppedTriggerId
|
||||
triggersByToken = triggersByToken + (jwt -> (triggersByToken.get(jwt).get - uuid))
|
||||
}
|
||||
|
||||
private def listTriggers(token: (Jwt, JwtPayload))(
|
||||
implicit esf: ExecutionSequencerFactory,
|
||||
mat: Materializer): JsValue = {
|
||||
val jwt: Jwt = token._1
|
||||
val triggerList = triggersByToken.getOrElse(jwt, Set()).map(_.toString).toList
|
||||
JsObject(("triggerIds", triggerList.toJson))
|
||||
private def listRunningTriggers(jwt: Jwt): List[String] = {
|
||||
triggersByToken.getOrElse(jwt, Set()).map(_.toString).toList
|
||||
}
|
||||
}
|
||||
|
||||
object Server {
|
||||
|
||||
sealed trait Message
|
||||
|
||||
final case class GetServerBinding(replyTo: ActorRef[ServerBinding]) extends Message
|
||||
final case object Stop extends Message
|
||||
|
||||
private final case class StartFailed(cause: Throwable) extends Message
|
||||
private final case class Started(binding: ServerBinding) extends Message
|
||||
|
||||
final case class TriggerStarting(
|
||||
triggerId: UUID,
|
||||
jwt: Jwt,
|
||||
runner: ActorRef[TriggerRunner.Message])
|
||||
extends Message
|
||||
final case class TriggerStarted(
|
||||
triggerId: UUID,
|
||||
jwt: Jwt,
|
||||
runner: ActorRef[TriggerRunner.Message])
|
||||
extends Message
|
||||
final case class TriggerInitializationFailure(
|
||||
triggerId: UUID,
|
||||
jwt: Jwt,
|
||||
runner: ActorRef[TriggerRunner.Message],
|
||||
cause: String
|
||||
) extends Message
|
||||
final case class TriggerRuntimeFailure(
|
||||
triggerId: UUID,
|
||||
jwt: Jwt,
|
||||
runner: ActorRef[TriggerRunner.Message],
|
||||
cause: String
|
||||
) extends Message
|
||||
|
||||
def apply(
|
||||
host: String,
|
||||
port: Int,
|
||||
ledgerConfig: LedgerConfig,
|
||||
maxInboundMessageSize: Int,
|
||||
maxFailureNumberOfRetries: Int,
|
||||
failureRetryTimeRange: Duration,
|
||||
dar: Option[Dar[(PackageId, Package)]],
|
||||
): Behavior[Message] = Behaviors.setup { ctx =>
|
||||
val server = new Server(dar)
|
||||
|
||||
// http doesn't know about akka typed so provide untyped system
|
||||
implicit val untypedSystem: akka.actor.ActorSystem = ctx.system.toClassic
|
||||
implicit val materializer: Materializer = Materializer(untypedSystem)
|
||||
implicit val esf: ExecutionSequencerFactory =
|
||||
new AkkaExecutionSequencerPool("TriggerService")(untypedSystem)
|
||||
|
||||
dar.foreach(addDar(compiledPackages, _))
|
||||
def startTrigger(
|
||||
ctx: ActorContext[Server.Message],
|
||||
token: (Jwt, JwtPayload),
|
||||
trigger: Trigger,
|
||||
params: StartParams,
|
||||
ledgerConfig: LedgerConfig,
|
||||
maxInboundMessageSize: Int,
|
||||
maxFailureNumberOfRetries: Int,
|
||||
failureRetryTimeRange: Duration,
|
||||
): JsValue = {
|
||||
val jwt: Jwt = token._1
|
||||
val jwtPayload: JwtPayload = token._2
|
||||
val party: Party = Party(jwtPayload.party);
|
||||
val uuid = UUID.randomUUID
|
||||
val ident = uuid.toString
|
||||
val ref = ctx.spawn(
|
||||
TriggerRunner(
|
||||
new TriggerRunner.Config(
|
||||
ctx.self,
|
||||
uuid,
|
||||
jwt,
|
||||
server.compiledPackages,
|
||||
trigger,
|
||||
ledgerConfig,
|
||||
maxInboundMessageSize,
|
||||
maxFailureNumberOfRetries,
|
||||
failureRetryTimeRange,
|
||||
party),
|
||||
ident
|
||||
),
|
||||
ident + "-monitor"
|
||||
)
|
||||
JsObject(("triggerId", uuid.toString.toJson))
|
||||
}
|
||||
|
||||
def stopTrigger(uuid: UUID, token: (Jwt, JwtPayload)): JsValue = {
|
||||
//TODO(SF, 2020-05-20): At least check that the provided token
|
||||
//is the same as the one used to start the trigger and fail with
|
||||
//'Unauthorized' if not (expect we'll be able to do better than
|
||||
//this).
|
||||
val actorWithToken = server.actorWithToken(uuid)
|
||||
actorWithToken.ref ! TriggerRunner.Stop
|
||||
server.removeRunningTrigger(uuid, actorWithToken.token, actorWithToken.ref)
|
||||
JsObject(("triggerId", uuid.toString.toJson))
|
||||
}
|
||||
|
||||
def listTriggers(token: (Jwt, JwtPayload)): JsValue = {
|
||||
JsObject(("triggerIds", server.listRunningTriggers(token._1).toJson))
|
||||
}
|
||||
|
||||
// 'fileUpload' triggers this warning we don't have a fix right
|
||||
// now so we disable it.
|
||||
@ -190,7 +235,7 @@ object Server {
|
||||
complete(
|
||||
errorResponse(StatusCodes.UnprocessableEntity, unauthorized.message)),
|
||||
token =>
|
||||
Trigger.fromIdentifier(compiledPackages, params.identifier) match {
|
||||
Trigger.fromIdentifier(server.compiledPackages, params.identifier) match {
|
||||
case Left(err) =>
|
||||
complete(errorResponse(StatusCodes.UnprocessableEntity, err))
|
||||
case Right(trigger) => {
|
||||
@ -200,7 +245,10 @@ object Server {
|
||||
trigger,
|
||||
params,
|
||||
ledgerConfig,
|
||||
maxInboundMessageSize)
|
||||
maxInboundMessageSize,
|
||||
maxFailureNumberOfRetries,
|
||||
failureRetryTimeRange
|
||||
)
|
||||
complete(successResponse(triggerId))
|
||||
}
|
||||
}
|
||||
@ -226,7 +274,7 @@ object Server {
|
||||
val dar = encodedDar.map {
|
||||
case (pkgId, payload) => Decode.readArchivePayload(pkgId, payload)
|
||||
}
|
||||
addDar(compiledPackages, dar)
|
||||
server.addDar(dar)
|
||||
val mainPackageId = JsObject(("mainPackageId", dar.main._1.name.toJson))
|
||||
complete(successResponse(mainPackageId))
|
||||
} catch {
|
||||
@ -293,27 +341,55 @@ object Server {
|
||||
},
|
||||
)
|
||||
|
||||
// The server binding is a future that on completion will be piped
|
||||
// to a message to this actor.
|
||||
val serverBinding = Http().bindAndHandle(Route.handlerFlow(route), host, port)
|
||||
ctx.pipeToSelf(serverBinding) {
|
||||
case Success(binding) => Started(binding)
|
||||
case Failure(ex) => StartFailed(ex)
|
||||
}
|
||||
|
||||
// The server running server.
|
||||
def running(binding: ServerBinding): Behavior[Message] =
|
||||
Behaviors
|
||||
.receiveMessage[Message] {
|
||||
case StartFailed(_) => Behaviors.unhandled
|
||||
case Started(_) => Behaviors.unhandled
|
||||
case TriggerStarting(uuid, jwt, runner) =>
|
||||
// Nothing to do at this time.
|
||||
Behaviors.same
|
||||
case TriggerStarted(uuid, jwt, runner) =>
|
||||
// The trigger has successfully started. Update the
|
||||
// running triggers tables.
|
||||
server.addRunningTrigger(uuid, jwt, runner)
|
||||
Behaviors.same
|
||||
case TriggerInitializationFailure(uuid, jwt, runner, _) =>
|
||||
// The trigger has failed to start. Send the runner a stop
|
||||
// message. There's no point in it remaining alive since
|
||||
// its child actor is stopped and won't be restarted.
|
||||
runner ! TriggerRunner.Stop
|
||||
// No need to update the running triggers tables since
|
||||
// this trigger never made it there.
|
||||
Behaviors.same
|
||||
case TriggerRuntimeFailure(uuid, jwt, runner, _) =>
|
||||
// The trigger has failed. Remove it from the running
|
||||
// triggers tables.
|
||||
server.removeRunningTrigger(uuid, jwt, runner)
|
||||
// Don't send any messages to the runner. Its supervision
|
||||
// strategy will automatically restart the trigger up to
|
||||
// some number of times beyond which it will remain
|
||||
// stopped.
|
||||
Behaviors.same
|
||||
case GetServerBinding(replyTo) =>
|
||||
replyTo ! binding
|
||||
Behaviors.same
|
||||
case StartFailed(_) => Behaviors.unhandled // Will never be received in this server
|
||||
case Started(_) => Behaviors.unhandled // Will never be received in this server
|
||||
case Stop =>
|
||||
ctx.log.info(
|
||||
"Stopping server http://{}:{}/",
|
||||
binding.localAddress.getHostString,
|
||||
binding.localAddress.getPort,
|
||||
)
|
||||
Behaviors.stopped
|
||||
Behaviors.stopped // Automatically stops all actors.
|
||||
}
|
||||
.receiveSignal {
|
||||
case (_, PostStop) =>
|
||||
@ -321,6 +397,7 @@ object Server {
|
||||
Behaviors.same
|
||||
}
|
||||
|
||||
// The server starting server.
|
||||
def starting(
|
||||
wasStopped: Boolean,
|
||||
req: Option[ActorRef[ServerBinding]]): Behaviors.Receive[Message] =
|
||||
@ -340,11 +417,14 @@ object Server {
|
||||
req.foreach(ref => ref ! binding)
|
||||
if (wasStopped) ctx.self ! Stop
|
||||
running(binding)
|
||||
case GetServerBinding(replyTo) => starting(wasStopped, Some(replyTo))
|
||||
case GetServerBinding(replyTo) =>
|
||||
starting(wasStopped, Some(replyTo))
|
||||
case Stop =>
|
||||
// we got a stop message but haven't completed starting yet,
|
||||
// we cannot stop until starting has completed
|
||||
// We got a stop message but haven't completed starting
|
||||
// yet. We cannot stop until starting has completed.
|
||||
starting(wasStopped = true, req = None)
|
||||
case _ =>
|
||||
Behaviors.unhandled
|
||||
}
|
||||
|
||||
starting(wasStopped = false, req = None)
|
||||
@ -358,10 +438,21 @@ object ServiceMain {
|
||||
port: Int,
|
||||
ledgerConfig: LedgerConfig,
|
||||
maxInboundMessageSize: Int,
|
||||
maxFailureNumberOfRetries: Int,
|
||||
failureRetryTimeRange: Duration,
|
||||
dar: Option[Dar[(PackageId, Package)]])
|
||||
: Future[(ServerBinding, ActorSystem[Server.Message])] = {
|
||||
val system: ActorSystem[Server.Message] =
|
||||
ActorSystem(Server(host, port, ledgerConfig, maxInboundMessageSize, dar), "TriggerService")
|
||||
ActorSystem(
|
||||
Server(
|
||||
host,
|
||||
port,
|
||||
ledgerConfig,
|
||||
maxInboundMessageSize,
|
||||
maxFailureNumberOfRetries,
|
||||
failureRetryTimeRange,
|
||||
dar),
|
||||
"TriggerService")
|
||||
// timeout chosen at random, change freely if you see issues
|
||||
implicit val timeout: Timeout = 15.seconds
|
||||
implicit val scheduler: Scheduler = system.scheduler
|
||||
@ -403,8 +494,16 @@ object ServiceMain {
|
||||
)
|
||||
val system: ActorSystem[Server.Message] =
|
||||
ActorSystem(
|
||||
Server("localhost", config.httpPort, ledgerConfig, config.maxInboundMessageSize, dar),
|
||||
"TriggerService")
|
||||
Server(
|
||||
"localhost",
|
||||
config.httpPort,
|
||||
ledgerConfig,
|
||||
config.maxInboundMessageSize,
|
||||
config.maxFailureNumberOfRetries,
|
||||
config.failureRetryTimeRange,
|
||||
dar),
|
||||
"TriggerService"
|
||||
)
|
||||
// Timeout chosen at random, change freely if you see issues.
|
||||
implicit val timeout: Timeout = 15.seconds
|
||||
implicit val scheduler: Scheduler = system.scheduler
|
||||
|
@ -14,6 +14,8 @@ import akka.stream.Materializer
|
||||
import com.typesafe.scalalogging.StrictLogging
|
||||
import com.daml.grpc.adapter.ExecutionSequencerFactory
|
||||
|
||||
class InitializationException(s: String) extends Exception(s) {}
|
||||
|
||||
object TriggerRunner {
|
||||
type Config = TriggerRunnerImpl.Config
|
||||
|
||||
@ -35,14 +37,27 @@ class TriggerRunner(
|
||||
|
||||
import TriggerRunner.{Message, Stop}
|
||||
|
||||
// Spawn a trigger runner impl. Supervise it. If it fails to start
|
||||
// its runner, send it a stop message (the server will later send us
|
||||
// a stop message in due course in this case so this actor will get
|
||||
// garbage collected too). If it something bad happens when the
|
||||
// trigger is running, try to restart it up to 3 times.
|
||||
private val child =
|
||||
ctx.spawn(Behaviors.supervise(TriggerRunnerImpl(config)).onFailure(restart), name)
|
||||
ctx.spawn(
|
||||
Behaviors
|
||||
.supervise(
|
||||
Behaviors
|
||||
.supervise(TriggerRunnerImpl(ctx.self, config))
|
||||
.onFailure[InitializationException](stop))
|
||||
.onFailure[RuntimeException](
|
||||
restart.withLimit(config.maxFailureNumberOfRetries, config.failureRetryTimeRange)),
|
||||
name
|
||||
)
|
||||
|
||||
override def onMessage(msg: Message): Behavior[Message] =
|
||||
Behaviors.receiveMessagePartial[Message] {
|
||||
case Stop =>
|
||||
child ! Stop
|
||||
Behaviors.stopped
|
||||
Behaviors.stopped // Automatically stops the child actor if running.
|
||||
}
|
||||
|
||||
override def onSignal: PartialFunction[Signal, Behavior[Message]] = {
|
||||
|
@ -3,6 +3,7 @@
|
||||
|
||||
package com.daml.lf.engine.trigger
|
||||
|
||||
import akka.actor.typed.{ActorRef}
|
||||
import akka.actor.typed.{Behavior, PostStop}
|
||||
import akka.actor.typed.PostStop
|
||||
import akka.actor.typed.PreRestart
|
||||
@ -24,28 +25,45 @@ import com.daml.ledger.client.configuration.{
|
||||
LedgerClientConfiguration,
|
||||
LedgerIdRequirement
|
||||
}
|
||||
import com.daml.jwt.domain.Jwt
|
||||
|
||||
import java.util.UUID
|
||||
import java.time.Duration
|
||||
|
||||
object TriggerRunnerImpl {
|
||||
case class Config(
|
||||
server: ActorRef[Server.Message],
|
||||
triggerId: UUID,
|
||||
jwt: Jwt,
|
||||
compiledPackages: CompiledPackages,
|
||||
trigger: Trigger,
|
||||
ledgerConfig: LedgerConfig,
|
||||
maxInboundMessageSize: Int,
|
||||
maxFailureNumberOfRetries: Int,
|
||||
failureRetryTimeRange: Duration,
|
||||
party: Party,
|
||||
)
|
||||
|
||||
import TriggerRunner.{Message, Stop}
|
||||
final case class Failed(error: Throwable) extends Message
|
||||
final case class QueryACSFailed(cause: Throwable) extends Message
|
||||
final case class QueriedACS(runner: Runner, acs: Seq[CreatedEvent], offset: LedgerOffset)
|
||||
final private case class Failed(error: Throwable) extends Message
|
||||
final private case class QueryACSFailed(cause: Throwable) extends Message
|
||||
final private case class QueriedACS(runner: Runner, acs: Seq[CreatedEvent], offset: LedgerOffset)
|
||||
extends Message
|
||||
import Server.{
|
||||
TriggerStarting,
|
||||
TriggerStarted,
|
||||
TriggerInitializationFailure,
|
||||
TriggerRuntimeFailure
|
||||
}
|
||||
|
||||
def apply(config: Config)(
|
||||
def apply(parent: ActorRef[Message], config: Config)(
|
||||
implicit esf: ExecutionSequencerFactory,
|
||||
mat: Materializer): Behavior[Message] =
|
||||
Behaviors.setup { ctx =>
|
||||
implicit val ec: ExecutionContext = ctx.executionContext
|
||||
val name = ctx.self.path.name
|
||||
implicit val ec: ExecutionContext = ctx.executionContext
|
||||
// Report to the server that this trigger is starting.
|
||||
config.server ! TriggerStarting(config.triggerId, config.jwt, parent)
|
||||
ctx.log.info(s"Trigger ${name} is starting")
|
||||
val appId = ApplicationId(name)
|
||||
val clientConfig = LedgerClientConfiguration(
|
||||
@ -62,16 +80,46 @@ object TriggerRunnerImpl {
|
||||
Behaviors.receiveMessagePartial[Message] {
|
||||
case QueryACSFailed(cause) =>
|
||||
if (wasStopped) {
|
||||
// Never mind that it failed - we were asked to stop
|
||||
// anyway.
|
||||
Behaviors.stopped;
|
||||
// Report the failure to the server.
|
||||
config.server ! TriggerInitializationFailure(
|
||||
config.triggerId,
|
||||
config.jwt,
|
||||
parent,
|
||||
cause.toString)
|
||||
// Tell our monitor there's been a failure. The
|
||||
// monitor's supervision strategy will respond to this
|
||||
// by writing the exception to the log and stopping this
|
||||
// actor.
|
||||
throw new InitializationException("User stopped")
|
||||
} else {
|
||||
throw new RuntimeException("ACS query failed", cause)
|
||||
// Report the failure to the server.
|
||||
config.server ! TriggerInitializationFailure(
|
||||
config.triggerId,
|
||||
config.jwt,
|
||||
parent,
|
||||
cause.toString)
|
||||
// Tell our monitor there's been a failure. The
|
||||
// monitor's supervisor strategy will respond to this by
|
||||
// writing the exception to the log and stopping this
|
||||
// actor.
|
||||
throw new InitializationException("Couldn't start: " + cause.toString)
|
||||
}
|
||||
case QueriedACS(runner, acs, offset) =>
|
||||
if (wasStopped) {
|
||||
Behaviors.stopped;
|
||||
// Report that we won't be going on to the server.
|
||||
config.server ! TriggerInitializationFailure(
|
||||
config.triggerId,
|
||||
config.jwt,
|
||||
parent,
|
||||
"User stopped")
|
||||
// Tell our monitor there's been a failure. The
|
||||
// monitor's supervisor strategy will respond to this
|
||||
// writing the exception to the log and stopping this
|
||||
// actor.
|
||||
throw new InitializationException("User stopped")
|
||||
} else {
|
||||
// The trigger is a future that we only expect to
|
||||
// complete if something goes wrong.
|
||||
val (killSwitch, trigger) = runner.runWithACS(
|
||||
acs,
|
||||
offset,
|
||||
@ -83,9 +131,14 @@ object TriggerRunnerImpl {
|
||||
// is sent to a now terminated actor. We should fix this
|
||||
// somehow™.
|
||||
ctx.pipeToSelf(trigger) {
|
||||
case Success(_) => Failed(new RuntimeException("Trigger exited unexpectedly"))
|
||||
case Failure(cause) => Failed(cause)
|
||||
case Success(_) =>
|
||||
Failed(new RuntimeException("Trigger exited unexpectedly"))
|
||||
case Failure(cause) =>
|
||||
Failed(cause)
|
||||
}
|
||||
// Report to the server that this trigger is entering
|
||||
// the running state.
|
||||
config.server ! TriggerStarted(config.triggerId, config.jwt, parent)
|
||||
running(killSwitch)
|
||||
}
|
||||
case Stop =>
|
||||
@ -94,24 +147,41 @@ object TriggerRunnerImpl {
|
||||
queryingACS(wasStopped = true)
|
||||
}
|
||||
|
||||
// Trigger loop is started, wait until we should stop.
|
||||
// The trigger loop is running. The only thing to do now is wait
|
||||
// to be told to stop or respond to failures.
|
||||
def running(killSwitch: KillSwitch) =
|
||||
Behaviors
|
||||
.receiveMessagePartial[Message] {
|
||||
case Stop =>
|
||||
// Don't think about trying to send the server a message
|
||||
// here. It won't receive it (I found out the hard way).
|
||||
Behaviors.stopped
|
||||
case Failed(cause) =>
|
||||
// In the event 'runWithACS' completes it's because the
|
||||
// stream is broken. Throw an exception allowing our
|
||||
// supervisor to restart us.
|
||||
// Report the failure to the server.
|
||||
config.server ! TriggerRuntimeFailure(
|
||||
config.triggerId,
|
||||
config.jwt,
|
||||
parent,
|
||||
cause.toString)
|
||||
// Tell our monitor there's been a failure. The
|
||||
// monitor's supervisor strategy will respond to this by
|
||||
// writing the exception to the log and attempting to
|
||||
// restart this actor up to some number of times.
|
||||
throw new RuntimeException(cause)
|
||||
}
|
||||
.receiveSignal {
|
||||
case (_, PostStop) =>
|
||||
// Don't think about trying to send the server a message
|
||||
// here. It won't receive it (many Bothans died to bring
|
||||
// us this information).
|
||||
ctx.log.info(s"Trigger ${name} is stopping")
|
||||
killSwitch.shutdown
|
||||
Behaviors.stopped
|
||||
case (_, PreRestart) =>
|
||||
// No need to send any messages here. The server has
|
||||
// already been informed of the earlier failure and in
|
||||
// the process of being restarted, will be informed of
|
||||
// the start along the way.
|
||||
ctx.log.info(s"Trigger ${name} is being restarted")
|
||||
Behaviors.same
|
||||
}
|
||||
@ -133,11 +203,13 @@ object TriggerRunnerImpl {
|
||||
config.party.unwrap)
|
||||
(acs, offset) <- runner.queryACS()
|
||||
} yield QueriedACS(runner, acs, offset)
|
||||
|
||||
// Arrange for the completion status to be piped into a message
|
||||
// to this actor.
|
||||
ctx.pipeToSelf(acsQuery) {
|
||||
case Success(msg) => msg
|
||||
case Failure(cause) => QueryACSFailed(cause)
|
||||
}
|
||||
|
||||
queryingACS(wasStopped = false)
|
||||
}
|
||||
}
|
||||
|
@ -196,14 +196,17 @@ class ServiceTest extends AsyncFlatSpec with Eventually with Matchers with Postg
|
||||
JsObject(fields) <- parseResult(resp)
|
||||
Some(JsString(mainPackageId)) = fields.get("mainPackageId")
|
||||
_ <- mainPackageId should not be empty
|
||||
|
||||
resp <- startTrigger(uri, s"$testPkgId:TestTrigger:trigger", "Alice")
|
||||
triggerId <- parseTriggerId(resp)
|
||||
|
||||
resp <- listTriggers(uri, "Alice")
|
||||
result <- parseTriggerIds(resp)
|
||||
_ <- result should equal(Vector(triggerId))
|
||||
|
||||
_ <- Future {
|
||||
eventually {
|
||||
val r = Await.result(for {
|
||||
resp <- listTriggers(uri, "Alice")
|
||||
result <- parseTriggerIds(resp)
|
||||
} yield result, Duration.Inf)
|
||||
assert(r == Vector(triggerId))
|
||||
}
|
||||
}
|
||||
resp <- stopTrigger(uri, triggerId, "Alice")
|
||||
stoppedTriggerId <- parseTriggerId(resp)
|
||||
_ <- stoppedTriggerId should equal(triggerId)
|
||||
@ -213,45 +216,77 @@ class ServiceTest extends AsyncFlatSpec with Eventually with Matchers with Postg
|
||||
it should "start multiple triggers and list them by party" in withHttpService(Some(dar)) {
|
||||
(uri: Uri, client) =>
|
||||
for {
|
||||
// no triggers running initially
|
||||
resp <- listTriggers(uri, "Alice")
|
||||
result <- parseTriggerIds(resp)
|
||||
_ <- result should equal(Vector())
|
||||
// start trigger for Alice
|
||||
resp <- startTrigger(uri, s"$testPkgId:TestTrigger:trigger", "Alice")
|
||||
aliceTrigger <- parseTriggerId(resp)
|
||||
resp <- listTriggers(uri, "Alice")
|
||||
result <- parseTriggerIds(resp)
|
||||
_ <- result should equal(Vector(aliceTrigger))
|
||||
// start trigger for Bob
|
||||
_ <- Future {
|
||||
eventually {
|
||||
val r = Await.result(for {
|
||||
resp <- listTriggers(uri, "Alice")
|
||||
result <- parseTriggerIds(resp)
|
||||
} yield result, Duration.Inf)
|
||||
assert(r == Vector(aliceTrigger))
|
||||
}
|
||||
}
|
||||
resp <- startTrigger(uri, s"$testPkgId:TestTrigger:trigger", "Bob")
|
||||
bobTrigger1 <- parseTriggerId(resp)
|
||||
resp <- listTriggers(uri, "Bob")
|
||||
result <- parseTriggerIds(resp)
|
||||
_ <- result should equal(Vector(bobTrigger1))
|
||||
// start another trigger for Bob
|
||||
_ <- Future {
|
||||
eventually {
|
||||
val r = Await.result(for {
|
||||
resp <- listTriggers(uri, "Bob")
|
||||
result <- parseTriggerIds(resp)
|
||||
} yield result, Duration.Inf)
|
||||
assert(r == Vector(bobTrigger1))
|
||||
}
|
||||
}
|
||||
resp <- startTrigger(uri, s"$testPkgId:TestTrigger:trigger", "Bob")
|
||||
bobTrigger2 <- parseTriggerId(resp)
|
||||
resp <- listTriggers(uri, "Bob")
|
||||
result <- parseTriggerIds(resp)
|
||||
_ <- result should equal(Vector(bobTrigger1, bobTrigger2))
|
||||
// stop Alice's trigger
|
||||
_ <- Future {
|
||||
eventually {
|
||||
val r = Await.result(for {
|
||||
resp <- listTriggers(uri, "Bob")
|
||||
result <- parseTriggerIds(resp)
|
||||
} yield result, Duration.Inf)
|
||||
assert(r == Vector(bobTrigger1, bobTrigger2))
|
||||
}
|
||||
}
|
||||
resp <- stopTrigger(uri, aliceTrigger, "Alice")
|
||||
_ <- assert(resp.status.isSuccess)
|
||||
resp <- listTriggers(uri, "Alice")
|
||||
result <- parseTriggerIds(resp)
|
||||
_ <- result should equal(Vector())
|
||||
resp <- listTriggers(uri, "Bob")
|
||||
result <- parseTriggerIds(resp)
|
||||
_ <- result should equal(Vector(bobTrigger1, bobTrigger2))
|
||||
// stop Bob's triggers
|
||||
_ <- Future {
|
||||
eventually {
|
||||
val r = Await.result(for {
|
||||
resp <- listTriggers(uri, "Alice")
|
||||
result <- parseTriggerIds(resp)
|
||||
} yield result, Duration.Inf)
|
||||
assert(r == Vector())
|
||||
}
|
||||
}
|
||||
_ <- Future {
|
||||
eventually {
|
||||
val r = Await.result(for {
|
||||
resp <- listTriggers(uri, "Bob")
|
||||
result <- parseTriggerIds(resp)
|
||||
} yield result, Duration.Inf)
|
||||
assert(r == Vector(bobTrigger1, bobTrigger2))
|
||||
}
|
||||
}
|
||||
// Stop Bob's triggers
|
||||
resp <- stopTrigger(uri, bobTrigger1, "Bob")
|
||||
_ <- assert(resp.status.isSuccess)
|
||||
resp <- stopTrigger(uri, bobTrigger2, "Bob")
|
||||
_ <- assert(resp.status.isSuccess)
|
||||
resp <- listTriggers(uri, "Bob")
|
||||
result <- parseTriggerIds(resp)
|
||||
_ <- result should equal(Vector())
|
||||
_ <- Future {
|
||||
eventually {
|
||||
val r = Await.result(for {
|
||||
resp <- listTriggers(uri, "Bob")
|
||||
result <- parseTriggerIds(resp)
|
||||
} yield result, Duration.Inf)
|
||||
assert(r == Vector())
|
||||
}
|
||||
}
|
||||
} yield succeed
|
||||
}
|
||||
|
||||
|
@ -68,7 +68,10 @@ object TriggerServiceFixture {
|
||||
0,
|
||||
ledgerConfig,
|
||||
ServiceConfig.DefaultMaxInboundMessageSize,
|
||||
dar)
|
||||
ServiceConfig.DefaultMaxFailureNumberOfRetries,
|
||||
ServiceConfig.DefaultFailureRetryTimeRange,
|
||||
dar
|
||||
)
|
||||
} yield service
|
||||
|
||||
val fa: Future[A] = for {
|
||||
|
Loading…
Reference in New Issue
Block a user