Put initialization failures through retries too (#6230)

changelog_begin
changelog_end
This commit is contained in:
Shayne Fletcher 2020-06-04 15:24:51 -04:00 committed by GitHub
parent ae463b6bea
commit e23a488596
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 69 additions and 107 deletions

View File

@ -399,11 +399,8 @@ object Server {
case Right(()) => Behaviors.same case Right(()) => Behaviors.same
} }
case TriggerInitializationFailure(runningTrigger, cause) => case TriggerInitializationFailure(runningTrigger, cause) =>
// The trigger has failed to start. Send the runner a stop // The trigger has failed to start.
// message. There's no point in it remaining alive since
// its child actor is stopped and won't be restarted.
server.logTriggerStatus(runningTrigger, "stopped: initialization failure") server.logTriggerStatus(runningTrigger, "stopped: initialization failure")
runningTrigger.runner ! TriggerRunner.Stop
// No need to update the running triggers tables since // No need to update the running triggers tables since
// this trigger never made it there. // this trigger never made it there.
Behaviors.same Behaviors.same

View File

@ -37,19 +37,12 @@ class TriggerRunner(
import TriggerRunner.{Message, Stop} import TriggerRunner.{Message, Stop}
// Spawn a trigger runner impl. Supervise it. If it fails to start // Spawn a trigger runner impl. Supervise it.
// its runner, send it a stop message (the server will later send us
// a stop message in due course in this case so this actor will get
// garbage collected too). If something bad happens when the
// trigger is running, try to restart it up to 3 times.
private val child = private val child =
ctx.spawn( ctx.spawn(
Behaviors Behaviors
.supervise( .supervise(TriggerRunnerImpl(ctx.self, config))
Behaviors .onFailure(
.supervise(TriggerRunnerImpl(ctx.self, config))
.onFailure[InitializationException](stop))
.onFailure[RuntimeException](
restart.withLimit(config.maxFailureNumberOfRetries, config.failureRetryTimeRange)), restart.withLimit(config.maxFailureNumberOfRetries, config.failureRetryTimeRange)),
name name
) )

View File

@ -10,6 +10,7 @@ import com.daml.ledger.api.refinements.ApiTypes.Party
import io.grpc.netty.NettyChannelBuilder import io.grpc.netty.NettyChannelBuilder
import scala.concurrent.{ExecutionContext, Future} import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.duration._
import scala.util.{Failure, Success} import scala.util.{Failure, Success}
import scalaz.syntax.tag._ import scalaz.syntax.tag._
import com.daml.lf.CompiledPackages import com.daml.lf.CompiledPackages
@ -137,9 +138,10 @@ object TriggerRunnerImpl {
// Report the failure to the server. // Report the failure to the server.
config.server ! TriggerInitializationFailure(runningTrigger, cause.toString) config.server ! TriggerInitializationFailure(runningTrigger, cause.toString)
// Tell our monitor there's been a failure. The // Tell our monitor there's been a failure. The
// monitor's supervisor strategy will respond to this by // monitor's supervisor strategy will respond to
// writing the exception to the log and stopping this // this by writing the exception to the log and
// actor. // attempting to restart this actor up to some
// number of times.
throw new InitializationException("Couldn't start: " + cause.toString) throw new InitializationException("Couldn't start: " + cause.toString)
} }
} }

View File

@ -400,24 +400,11 @@ class ServiceTest extends AsyncFlatSpec with Eventually with Matchers with Postg
// Confirm that the running trigger ends up stopped and that // Confirm that the running trigger ends up stopped and that
// its history matches our expectations. // its history matches our expectations.
_ <- assertTriggerIds(uri, "Alice", _.isEmpty) _ <- assertTriggerIds(uri, "Alice", _.isEmpty)
_ <- assertTriggerStatus( _ <- assertTriggerStatus(uri, aliceTrigger, _.last == "stopped: initialization failure")
uri,
aliceTrigger,
_ ==
Vector(
"starting",
"running",
"stopped: runtime failure",
"starting",
"stopped: initialization failure"))
} yield succeed } yield succeed
} }
// TODO(SF, 2020-06-05): This test is temporarily disabled as too it should "restart a trigger failing due to a dropped connection" in withHttpService(Some(dar)) {
// fragile. There is a race condition on the trigger restart and the
// network being unavailable (see
// https://dev.azure.com/digitalasset/adadc18a-d7df-446a-aacb-86042c1619c6/_apis/build/builds/45230/logs/124).
ignore should "restart a failing trigger if possible" in withHttpService(Some(dar)) {
(uri: Uri, client: LedgerClient, ledgerProxy: Proxy) => (uri: Uri, client: LedgerClient, ledgerProxy: Proxy) =>
// Simulate the ledger briefly being unavailable due to network // Simulate the ledger briefly being unavailable due to network
// connectivity loss. Our restart strategy means that the running // connectivity loss. Our restart strategy means that the running
@ -428,33 +415,24 @@ class ServiceTest extends AsyncFlatSpec with Eventually with Matchers with Postg
aliceTrigger <- parseTriggerId(resp) aliceTrigger <- parseTriggerId(resp)
// Proceed when it is confirmed to be running. // Proceed when it is confirmed to be running.
_ <- assertTriggerIds(uri, "Alice", _ == Vector(aliceTrigger)) _ <- assertTriggerIds(uri, "Alice", _ == Vector(aliceTrigger))
// Simulate brief network connectivity loss. This will cause the // Simulate brief network connectivity loss.
// running trigger's flow graph to complete with failure. Don't
// wait around to restore the network or the restart strategy
// will in turn lead to the stop strategy killing the trigger
// due to the lack of ability to initialize the restarted
// trigger.
_ <- Future { ledgerProxy.disable() } _ <- Future { ledgerProxy.disable() }
_ <- Future { ledgerProxy.enable() } _ <- Future { ledgerProxy.enable() }
// To conclude, check that the trigger survived the network // Check that the trigger survived the outage and that its
// outage and that its history indicates it went through a // history shows it went through a restart.
// restart to do so.
_ <- assertTriggerIds(uri, "Alice", _ == Vector(aliceTrigger)) _ <- assertTriggerIds(uri, "Alice", _ == Vector(aliceTrigger))
_ <- assertTriggerStatus( _ <- assertTriggerStatus(
uri, uri,
aliceTrigger, aliceTrigger,
_ == triggerStatus => {
Vector( triggerStatus.count(_ == "stopped: runtime failure") == 1 &&
"starting", triggerStatus.last == "running"
"running", }
"stopped: runtime failure", )
"starting",
"running"
))
} yield succeed } yield succeed
} }
it should "stop a trigger when the user script fails init" in withHttpService(Some(dar)) { it should "restart triggers with script init errors" in withHttpService(Some(dar)) {
(uri: Uri, client: LedgerClient, ledgerProxy: Proxy) => (uri: Uri, client: LedgerClient, ledgerProxy: Proxy) =>
for { for {
resp <- startTrigger(uri, s"$testPkgId:ErrorTrigger:trigger", "Alice") resp <- startTrigger(uri, s"$testPkgId:ErrorTrigger:trigger", "Alice")
@ -462,15 +440,17 @@ class ServiceTest extends AsyncFlatSpec with Eventually with Matchers with Postg
_ <- assertTriggerStatus( _ <- assertTriggerStatus(
uri, uri,
aliceTrigger, aliceTrigger,
_ == triggerStatus => {
Vector( triggerStatus.count(_ == "starting") ==
"starting", ServiceConfig.DefaultMaxFailureNumberOfRetries + 1 &&
"stopped: initialization failure", triggerStatus.last == "stopped: initialization failure"
)) }
)
_ <- assertTriggerIds(uri, "Alice", _.isEmpty)
} yield succeed } yield succeed
} }
it should "restart triggers with errors in user script" in withHttpService(Some(dar)) { it should "restart triggers with script update errors" in withHttpService(Some(dar)) {
(uri: Uri, client: LedgerClient, ledgerProxy: Proxy) => (uri: Uri, client: LedgerClient, ledgerProxy: Proxy) =>
for { for {
resp <- startTrigger(uri, s"$testPkgId:LowLevelErrorTrigger:trigger", "Alice") resp <- startTrigger(uri, s"$testPkgId:LowLevelErrorTrigger:trigger", "Alice")
@ -478,68 +458,58 @@ class ServiceTest extends AsyncFlatSpec with Eventually with Matchers with Postg
_ <- assertTriggerStatus( _ <- assertTriggerStatus(
uri, uri,
aliceTrigger, aliceTrigger,
_ == triggerStatus => {
Vector( triggerStatus
"starting", .count(_ == "running") == ServiceConfig.DefaultMaxFailureNumberOfRetries + 1 &&
"running", triggerStatus.last == "stopped: runtime failure"
"stopped: runtime failure", }
"starting",
"running",
"stopped: runtime failure",
"starting",
"running",
"stopped: runtime failure",
"starting",
"running",
"stopped: runtime failure"
)
) )
_ <- assertTriggerIds(uri, "Alice", _.isEmpty) _ <- assertTriggerIds(uri, "Alice", _.isEmpty)
} yield succeed } yield succeed
} }
it should "stopping a trigger without providing a token should be unauthorized" in withHttpService( it should "not act on a stop request without a token" in withHttpService(None) {
None) { (uri: Uri, client: LedgerClient, ledgerProxy: Proxy) =>
val uuid: String = "ffffffff-ffff-ffff-ffff-ffffffffffff"
val req = HttpRequest(
method = HttpMethods.DELETE,
uri = uri.withPath(Uri.Path(s"/v1/stop/$uuid")),
)
for {
resp <- Http().singleRequest(req)
body <- responseBodyToString(resp)
JsObject(fields) = body.parseJson
_ <- fields.get("status") should equal(Some(JsNumber(422)))
_ <- fields.get("errors") should equal(
Some(JsArray(JsString("missing Authorization header with OAuth 2.0 Bearer Token"))))
} yield succeed
}
it should "stopping a trigger that can't parse as a UUID gives a 404 response" in withHttpService(
None) { (uri: Uri, client: LedgerClient, ledgerProxy: Proxy) =>
val uuid: String = "No More Mr Nice Guy"
val req = HttpRequest(
method = HttpMethods.DELETE,
uri = uri.withPath(Uri.Path(s"/v1/stop/$uuid")),
)
for {
resp <- Http().singleRequest(req)
_ <- assert(resp.status.isFailure() && resp.status.intValue() == 404)
} yield succeed
}
it should "stopping an unknown trigger gives an error response" in withHttpService(None) {
(uri: Uri, client: LedgerClient, ledgerProxy: Proxy) => (uri: Uri, client: LedgerClient, ledgerProxy: Proxy) =>
val uuid = UUID.fromString("ffffffff-ffff-ffff-ffff-ffffffffffff") val uuid: String = "ffffffff-ffff-ffff-ffff-ffffffffffff"
val req = HttpRequest(
method = HttpMethods.DELETE,
uri = uri.withPath(Uri.Path(s"/v1/stop/$uuid")),
)
for { for {
resp <- stopTrigger(uri, uuid, "Alice") resp <- Http().singleRequest(req)
_ <- assert(resp.status.isFailure() && resp.status.intValue() == 404)
body <- responseBodyToString(resp) body <- responseBodyToString(resp)
JsObject(fields) = body.parseJson JsObject(fields) = body.parseJson
_ <- fields.get("status") should equal(Some(JsNumber(404))) _ <- fields.get("status") should equal(Some(JsNumber(422)))
_ <- fields.get("errors") should equal( _ <- fields.get("errors") should equal(
Some(JsArray(JsString("Unknown trigger: '" + uuid.toString + "'")))) Some(JsArray(JsString("missing Authorization header with OAuth 2.0 Bearer Token"))))
} yield succeed } yield succeed
} }
it should "give a 404 response for a stop request with unparseable UUID" in withHttpService(None) {
(uri: Uri, client: LedgerClient, ledgerProxy: Proxy) =>
val uuid: String = "No More Mr Nice Guy"
val req = HttpRequest(
method = HttpMethods.DELETE,
uri = uri.withPath(Uri.Path(s"/v1/stop/$uuid")),
)
for {
resp <- Http().singleRequest(req)
_ <- assert(resp.status.isFailure() && resp.status.intValue() == 404)
} yield succeed
}
it should "give a 404 error response for a stop request on an unknown UUID" in withHttpService(
None) { (uri: Uri, client: LedgerClient, ledgerProxy: Proxy) =>
val uuid = UUID.fromString("ffffffff-ffff-ffff-ffff-ffffffffffff")
for {
resp <- stopTrigger(uri, uuid, "Alice")
_ <- assert(resp.status.isFailure() && resp.status.intValue() == 404)
body <- responseBodyToString(resp)
JsObject(fields) = body.parseJson
_ <- fields.get("status") should equal(Some(JsNumber(404)))
_ <- fields.get("errors") should equal(
Some(JsArray(JsString("Unknown trigger: '" + uuid.toString + "'"))))
} yield succeed
}
} }