handle stop message received before acs query complete (#5801)

changelog_begin
changelog_end
This commit is contained in:
Shayne Fletcher 2020-05-01 11:24:58 -04:00 committed by GitHub
parent 57c08d4819
commit e7741dd850
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -84,9 +84,9 @@ object TriggerActor {
def apply(
config: Config,
)(implicit esf: ExecutionSequencerFactory, mat: Materializer): Behavior[Message] =
Behaviors.setup { context =>
implicit val ec: ExecutionContext = context.executionContext
val name = context.self.path.name
Behaviors.setup { ctx =>
implicit val ec: ExecutionContext = ctx.executionContext
val name = ctx.self.path.name
val appId = ApplicationId(name)
val clientConfig = LedgerClientConfiguration(
applicationId = appId.unwrap,
@ -96,24 +96,42 @@ object TriggerActor {
sslContext = None,
)
// Waiting for the ACS query to finish so we can build the initial state.
// TODO We should handle being stopped while querying the ACS.
def queryingACS() = Behaviors.receiveMessagePartial[Message] {
case QueryACSFailed(cause) => throw new RuntimeException("ACS query failed", cause)
case QueriedACS(runner, acs, offset) =>
val (killSwitch, trigger) = runner.runWithACS(
acs,
offset,
msgFlow = KillSwitches.single[TriggerMsg],
)
// TODO If we are stopped we will end up causing the future to complete which will trigger
// a message that is sent to a now terminated actor. We should fix this somehow.
context.pipeToSelf(trigger) {
case Success(_) => Failed(new RuntimeException("Trigger exited unexpectedly"))
case Failure(cause) => Failed(cause)
}
running(killSwitch)
}
// Waiting for the ACS query to finish so we can build the
// initial state.
def queryingACS(wasStopped: Boolean): Behaviors.Receive[Message] =
Behaviors.receiveMessagePartial[Message] {
case QueryACSFailed(cause) =>
if (wasStopped) {
// Never mind that it failed - we were asked to stop
// anyway.
Behaviors.stopped;
} else {
throw new RuntimeException("ACS query failed", cause)
}
case QueriedACS(runner, acs, offset) =>
if (wasStopped) {
Behaviors.stopped;
} else {
val (killSwitch, trigger) = runner.runWithACS(
acs,
offset,
msgFlow = KillSwitches.single[TriggerMsg],
)
// TODO If we are stopped we will end up causing the
// future to complete which will trigger a message that
// is sent to a now terminated actor. We should fix this
// somehow.
ctx.pipeToSelf(trigger) {
case Success(_) => Failed(new RuntimeException("Trigger exited unexpectedly"))
case Failure(cause) => Failed(cause)
}
running(killSwitch)
}
case Stop =>
// We got a stop message but the ACS query hasn't
// completed yet.
queryingACS(wasStopped = true)
}
// Trigger loop is started, wait until we should stop.
def running(killSwitch: KillSwitch) =
@ -143,11 +161,11 @@ object TriggerActor {
(acs, offset) <- runner.queryACS()
} yield QueriedACS(runner, acs, offset)
context.pipeToSelf(acsQuery) {
ctx.pipeToSelf(acsQuery) {
case Success(msg) => msg
case Failure(cause) => QueryACSFailed(cause)
}
queryingACS()
queryingACS(wasStopped = false)
}
}
@ -344,7 +362,11 @@ object Server {
req: Option[ActorRef[ServerBinding]]): Behaviors.Receive[Message] =
Behaviors.receiveMessage[Message] {
case StartFailed(cause) =>
throw new RuntimeException("Server failed to start", cause)
if (wasStopped) {
Behaviors.stopped
} else {
throw new RuntimeException("Server failed to start", cause)
}
case Started(binding) =>
ctx.log.info(
"Server online at http://{}:{}/",