Trigger service: Change to restart strategy with backoff (#6552)

* Add min/maxRestartInterval to configs and use in trigger restart strategy

* Adapt tests where triggers fail due to lack of network connectivity

changelog_begin
changelog_end

* Adapt tests for triggers with errors

* Remove comment about number of restarts

* Use a small initial restart interval for testing

* Remove old restart params

* Move maxInboundMessageSize to LedgerConfig

* Rename TriggerRunnerConfig to TriggerRestartConfig
Authored by Rohan Jacob-Rao on 2020-06-30 20:46:23 -04:00; committed by GitHub
parent 14ca4e5e79
commit ea16ff350d
8 changed files with 101 additions and 144 deletions
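
For orientation: instead of giving up after a fixed number of retries, a persistently failing trigger is now restarted on an exponentially growing schedule. With the defaults introduced below (5-second minimum, 60-second maximum, random factor 0.2) the nominal delays run 5s, 10s, 20s, 40s, then 60s thereafter, each with up to 20% random jitter on top; the test fixture shortens the minimum to 1 second so that several restarts can be observed quickly. A small standalone sketch of that schedule (a hypothetical illustration, not code from this commit), assuming the doubling-and-cap behaviour the test comments below describe:

    import scala.concurrent.duration._

    object BackoffScheduleSketch {
      // Nominal delay before the n-th restart (n starting at 0): the minimum
      // interval doubles after each failure and is capped at the maximum.
      // The supervisor then adds up to `randomFactor` (0.2 here) extra jitter.
      def nominalDelay(n: Int, min: FiniteDuration, max: FiniteDuration): FiniteDuration = {
        val doubledMillis = min.toMillis.toDouble * math.pow(2.0, n.toDouble)
        FiniteDuration(math.min(doubledMillis, max.toMillis.toDouble).toLong, MILLISECONDS)
      }

      def main(args: Array[String]): Unit =
        // Service defaults (5s min, 60s max): 5s, 10s, 20s, 40s, 60s, 60s, ...
        // Test fixture (1s min, 60s max):     1s, 2s, 4s, 8s, 16s, 32s, ...
        (0 until 6).foreach { n =>
          println(s"restart #${n + 1} after ~${nominalDelay(n, 5.seconds, 60.seconds).toSeconds}s")
        }
    }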

View File

@@ -45,7 +45,7 @@ import com.daml.lf.engine.trigger.dao._
class Server(
ledgerConfig: LedgerConfig,
runnerConfig: TriggerRunnerConfig,
restartConfig: TriggerRestartConfig,
secretKey: SecretKey,
triggerDao: RunningTriggerDao)(
implicit ctx: ActorContext[Message],
@@ -114,7 +114,7 @@ class Server(
compiledPackages,
trigger,
ledgerConfig,
runnerConfig,
restartConfig,
party
),
triggerInstance.toString
@@ -282,7 +282,7 @@ object Server {
host: String,
port: Int,
ledgerConfig: LedgerConfig,
runnerConfig: TriggerRunnerConfig,
restartConfig: TriggerRestartConfig,
initialDar: Option[Dar[(PackageId, DamlLf.ArchivePayload)]],
jdbcConfig: Option[JdbcConfig],
initDb: Boolean,
@@ -329,7 +329,7 @@
val (triggerDao: RunningTriggerDao, server: Server) = jdbcConfig match {
case None =>
val dao = InMemoryTriggerDao()
val server = new Server(ledgerConfig, runnerConfig, secretKey, dao)
val server = new Server(ledgerConfig, restartConfig, secretKey, dao)
(dao, server)
case Some(c) =>
val dao = DbTriggerDao(c)
@@ -338,7 +338,7 @@
ctx.log.error(err)
sys.exit(1)
case Right(pkgs) =>
val server = new Server(ledgerConfig, runnerConfig, secretKey, dao)
val server = new Server(ledgerConfig, restartConfig, secretKey, dao)
server.addPackagesInMemory(pkgs)
ctx.log.info("Successfully recovered packages from database.")
(dao, server)

View File

@@ -9,6 +9,9 @@ import java.time.Duration
import com.daml.platform.services.time.TimeProviderType
import scalaz.Show
import scala.concurrent.duration
import scala.concurrent.duration.FiniteDuration
case class ServiceConfig(
// For convenience, we allow passing in a DAR on startup
// as opposed to uploading it dynamically.
@@ -17,10 +20,8 @@ case class ServiceConfig(
ledgerHost: String,
ledgerPort: Int,
maxInboundMessageSize: Int,
// These 2 parameters mean that a failing trigger will be
// restarted up to n times within k seconds.
maxFailureNumberOfRetries: Int,
failureRetryTimeRange: Duration, // in seconds
minRestartInterval: FiniteDuration,
maxRestartInterval: FiniteDuration,
timeProviderType: TimeProviderType,
commandTtl: Duration,
init: Boolean,
@@ -76,8 +77,8 @@ object JdbcConfig {
object ServiceConfig {
val DefaultHttpPort: Int = 8088
val DefaultMaxInboundMessageSize: Int = RunnerConfig.DefaultMaxInboundMessageSize
val DefaultMaxFailureNumberOfRetries: Int = 3
val DefaultFailureRetryTimeRange: Duration = Duration.ofSeconds(60)
val DefaultMinRestartInterval: FiniteDuration = FiniteDuration(5, duration.SECONDS)
val DefaultMaxRestartInterval: FiniteDuration = FiniteDuration(60, duration.SECONDS)
private val parser = new scopt.OptionParser[ServiceConfig]("trigger-service") {
head("trigger-service")
@@ -108,18 +109,17 @@ object ServiceConfig {
.text(
s"Optional max inbound message size in bytes. Defaults to ${DefaultMaxInboundMessageSize}.")
opt[Int]("max-failure-number-of-retries")
.action((x, c) => c.copy(maxFailureNumberOfRetries = x))
opt[Long]("min-restart-interval")
.action((x, c) => c.copy(minRestartInterval = FiniteDuration(x, duration.SECONDS)))
.optional()
.text(
s"Max number of times to try to restart a failing trigger (within allowed time range). Defaults to ${DefaultMaxFailureNumberOfRetries}.")
s"Minimum time interval before restarting a failed trigger. Defaults to ${DefaultMinRestartInterval.toSeconds} seconds.")
opt[Long]("failure-retry-time-range")
.action { (t, c) =>
c.copy(failureRetryTimeRange = Duration.ofSeconds(t))
}
opt[Long]("max-restart-interval")
.action((x, c) => c.copy(maxRestartInterval = FiniteDuration(x, duration.SECONDS)))
.optional()
.text(
"Allow up to max number of restarts of a failing trigger within this many seconds. Defaults to " + DefaultFailureRetryTimeRange.getSeconds.toString + "s.")
s"Maximum time interval between restarting a failed trigger. Defaults to ${DefaultMaxRestartInterval.toSeconds} seconds.")
opt[Unit]('w', "wall-clock-time")
.action { (t, c) =>
@@ -159,8 +159,8 @@ object ServiceConfig {
ledgerHost = null,
ledgerPort = 0,
maxInboundMessageSize = DefaultMaxInboundMessageSize,
maxFailureNumberOfRetries = DefaultMaxFailureNumberOfRetries,
failureRetryTimeRange = DefaultFailureRetryTimeRange,
minRestartInterval = DefaultMinRestartInterval,
maxRestartInterval = DefaultMaxRestartInterval,
timeProviderType = TimeProviderType.Static,
commandTtl = Duration.ofSeconds(30L),
init = false,
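
The two new flags follow the same scopt pattern as the surrounding options: parse a plain seconds value and wrap it in a FiniteDuration. A self-contained sketch of that pattern, using a hypothetical Conf case class and program name rather than the real ServiceConfig parser above:

    import scala.concurrent.duration.{FiniteDuration, SECONDS}

    object RestartFlagsSketch {
      // Hypothetical stand-in for ServiceConfig, just to show the flag wiring.
      case class Conf(
          minRestartInterval: FiniteDuration = FiniteDuration(5, SECONDS),
          maxRestartInterval: FiniteDuration = FiniteDuration(60, SECONDS))

      private val parser = new scopt.OptionParser[Conf]("restart-flags-sketch") {
        opt[Long]("min-restart-interval")
          .action((secs, c) => c.copy(minRestartInterval = FiniteDuration(secs, SECONDS)))
          .optional()
          .text("Minimum time interval before restarting a failed trigger, in seconds.")
        opt[Long]("max-restart-interval")
          .action((secs, c) => c.copy(maxRestartInterval = FiniteDuration(secs, SECONDS)))
          .optional()
          .text("Maximum time interval between restarts of a failed trigger, in seconds.")
      }

      def main(args: Array[String]): Unit =
        // e.g. args = Array("--min-restart-interval", "1", "--max-restart-interval", "30")
        parser.parse(args, Conf()).foreach(conf => println(conf))
    }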

View File

@@ -27,7 +27,7 @@ object ServiceMain {
host: String,
port: Int,
ledgerConfig: LedgerConfig,
runnerConfig: TriggerRunnerConfig,
restartConfig: TriggerRestartConfig,
encodedDar: Option[Dar[(PackageId, DamlLf.ArchivePayload)]],
jdbcConfig: Option[JdbcConfig],
noSecretKey: Boolean,
@@ -39,7 +39,7 @@ object ServiceMain {
host,
port,
ledgerConfig,
runnerConfig,
restartConfig,
encodedDar,
jdbcConfig,
initDb = false, // for tests we initialize the database in beforeEach clause
@@ -73,11 +73,11 @@ object ServiceMain {
config.ledgerPort,
config.timeProviderType,
config.commandTtl,
config.maxInboundMessageSize,
)
val runnerConfig = TriggerRunnerConfig(
config.maxInboundMessageSize,
config.maxFailureNumberOfRetries,
config.failureRetryTimeRange,
val restartConfig = TriggerRestartConfig(
config.minRestartInterval,
config.maxRestartInterval,
)
val system: ActorSystem[Message] =
ActorSystem(
@@ -85,7 +85,7 @@
"localhost",
config.httpPort,
ledgerConfig,
runnerConfig,
restartConfig,
encodedDar,
config.jdbcConfig,
config.init,

View File

@@ -49,9 +49,10 @@ class TriggerRunner(
.onFailure[InitializationHalted](stop)
)
.onFailure(
restart.withLimit(
config.runnerConfig.maxFailureNumberOfRetries,
config.runnerConfig.failureRetryTimeRange)),
restartWithBackoff(
config.restartConfig.minRestartInterval,
config.restartConfig.maxRestartInterval,
config.restartConfig.restartIntervalRandomFactor)),
name
)
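
For context, this is the Akka Typed pattern that replaces restart.withLimit: the runner behaviour is wrapped in a supervisor whose strategy is an exponential-backoff restart (the separate stop-on-InitializationHalted rule above is kept as-is). A minimal self-contained sketch, assuming Akka 2.6's typed API and a hypothetical flakyWorker behaviour standing in for the real TriggerRunnerImpl:

    import scala.concurrent.duration._
    import akka.actor.typed.{ActorSystem, Behavior, SupervisorStrategy}
    import akka.actor.typed.scaladsl.Behaviors

    object BackoffSupervisionSketch {
      // A hypothetical child behaviour that fails on every message it receives,
      // standing in for a trigger runner that cannot reach the ledger.
      val flakyWorker: Behavior[String] = Behaviors.receiveMessage[String] { msg =>
        throw new RuntimeException(s"boom while handling: $msg")
      }

      // Supervise it with an exponential-backoff restart strategy; each failure
      // doubles the delay (plus jitter) up to the maximum before respawning.
      val supervised: Behavior[String] =
        Behaviors
          .supervise(flakyWorker)
          .onFailure[Throwable](
            SupervisorStrategy.restartWithBackoff(
              minBackoff = 5.seconds,
              maxBackoff = 60.seconds,
              randomFactor = 0.2))

      def main(args: Array[String]): Unit = {
        val system = ActorSystem(supervised, "backoff-sketch")
        system ! "hello" // fails; the supervisor schedules a restart after the backoff delay
      }
    }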

View File

@@ -37,7 +37,7 @@ object TriggerRunnerImpl {
compiledPackages: CompiledPackages,
trigger: Trigger,
ledgerConfig: LedgerConfig,
runnerConfig: TriggerRunnerConfig,
restartConfig: TriggerRestartConfig,
party: Party,
)
@@ -136,8 +136,7 @@ object TriggerRunnerImpl {
// Tell our monitor there's been a failure. The
// monitor's supervisor strategy will respond to
// this by writing the exception to the log and
// attempting to restart this actor up to some
// number of times.
// attempting to restart this actor.
throw new InitializationException("Couldn't start: " + cause.toString)
}
}
@@ -162,7 +161,7 @@ object TriggerRunnerImpl {
// Tell our monitor there's been a failure. The
// monitor's supervisor strategy will respond to this by
// writing the exception to the log and attempting to
// restart this actor up to some number of times.
// restart this actor.
throw new RuntimeException(cause)
}
.receiveSignal {
@@ -187,7 +186,7 @@ object TriggerRunnerImpl {
.fromBuilder(
NettyChannelBuilder
.forAddress(config.ledgerConfig.host, config.ledgerConfig.port)
.maxInboundMessageSize(config.runnerConfig.maxInboundMessageSize),
.maxInboundMessageSize(config.ledgerConfig.maxInboundMessageSize),
clientConfig,
)
runner = new Runner(
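
The behavioural point of this hunk is simply that the channel's inbound message limit is now read from LedgerConfig rather than the old runner config. For reference, a minimal grpc-netty sketch of the builder call involved (hypothetical host, port and limit; the service itself hands the builder on to its ledger-client setup as above):

    import io.grpc.ManagedChannel
    import io.grpc.netty.NettyChannelBuilder

    object ChannelSketch {
      def main(args: Array[String]): Unit = {
        val maxInboundMessageSize = 4194304 // hypothetical 4 MiB limit
        val channel: ManagedChannel =
          NettyChannelBuilder
            .forAddress("localhost", 6865) // hypothetical ledger host and port
            .maxInboundMessageSize(maxInboundMessageSize)
            .usePlaintext()
            .build()
        // ... use the channel via a generated gRPC stub, then release it.
        channel.shutdownNow()
      }
    }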

View File

@@ -9,6 +9,8 @@ import java.util.UUID
import com.daml.lf.data.Ref.Identifier
import com.daml.platform.services.time.TimeProviderType
import scala.concurrent.duration.FiniteDuration
package object trigger {
case class LedgerConfig(
@@ -16,12 +18,13 @@ package object trigger {
port: Int,
timeProvider: TimeProviderType,
commandTtl: Duration,
maxInboundMessageSize: Int,
)
case class TriggerRunnerConfig(
maxInboundMessageSize: Int,
maxFailureNumberOfRetries: Int,
failureRetryTimeRange: Duration
case class TriggerRestartConfig(
minRestartInterval: FiniteDuration,
maxRestartInterval: FiniteDuration,
restartIntervalRandomFactor: Double = 0.2,
)
final case class SecretKey(value: String)

View File

@@ -52,6 +52,9 @@ object TriggerServiceFixture {
}
}
// Use a small initial interval so we can test restart behaviour more easily.
private val minRestartInterval = FiniteDuration(1, duration.SECONDS)
def withTriggerService[A](
testName: String,
dars: List[File],
@@ -140,17 +143,18 @@
host.getHostName,
ledgerProxyPort.value,
TimeProviderType.Static,
Duration.ofSeconds(30))
runnerConfig = TriggerRunnerConfig(
Duration.ofSeconds(30),
ServiceConfig.DefaultMaxInboundMessageSize,
ServiceConfig.DefaultMaxFailureNumberOfRetries,
ServiceConfig.DefaultFailureRetryTimeRange,
)
restartConfig = TriggerRestartConfig(
minRestartInterval,
ServiceConfig.DefaultMaxRestartInterval,
)
service <- ServiceMain.startServer(
host.getHostName,
Port(0).value,
ledgerConfig,
runnerConfig,
restartConfig,
encodedDar,
jdbcConfig,
noSecretKey = true // That's ok, use the default.

View File

@@ -18,12 +18,10 @@ import org.scalatest._
import org.scalatest.concurrent.Eventually
import org.scalatest.time.{Seconds, Span}
import scala.concurrent.{Await, TimeoutException}
import scala.concurrent.duration._
import scala.concurrent.Await
import scala.concurrent.duration.Duration
import scala.concurrent.{ExecutionContext, Future}
import scalaz.syntax.tag._
import scalaz.syntax.traverse._
import spray.json._
import com.daml.bazeltools.BazelRunfiles.requiredResource
import com.daml.daml_lf_dev.DamlLf
@@ -323,122 +321,74 @@ abstract class AbstractTriggerServiceTest extends AsyncFlatSpec with Eventually
} yield succeed
}
// FIXME(RJR): This doesn't make sense with our convention of managing the running trigger store
ignore should "fail to start a trigger if a ledger client can't be obtained" in withTriggerService(
it should "restart trigger on initialization failure due to failed connection" in withTriggerService(
Some(dar)) { (uri: Uri, client: LedgerClient, ledgerProxy: Proxy) =>
// Disable the proxy. This means that the service won't be able to
// get a ledger connection.
ledgerProxy.disable()
try {
// Request a trigger be started and setup an assertion that
// completes with success when the running trigger table becomes
// non-empty.
val runningTriggersNotEmpty = for {
resp <- startTrigger(uri, s"$testPkgId:TestTrigger:trigger", alice)
aliceTrigger <- parseTriggerId(resp)
triggerId <- parseTriggerId(resp)
} yield (assertTriggerIds(uri, alice, _.nonEmpty))
// Wait a good while (10s) on the assertion to become true. If it
// does, indicate the test has failed (because, since the trigger
// can't be initialized for the lack of a viable ledger client
// connection, it should not make its way to the running triggers
// table).
Await.ready(awaitable = runningTriggersNotEmpty, atMost = 10.seconds)
fail("Timeout expected")
} catch {
// If the assertion times-out the test has succeeded (look to
// the log; you'll see messages indicating that the trigger
// failed to initialize and was stopped).
case _: TimeoutException => succeed
} finally {
// This isn't strictly necessary here (since each test gets its
// own fixture) but it's jolly decent of us to do it anyway.
ledgerProxy.enable()
}
for {
// Simulate a failed ledger connection which will prevent triggers from initializing.
_ <- Future(ledgerProxy.disable())
resp <- startTrigger(uri, s"$testPkgId:TestTrigger:trigger", alice)
// The start request should succeed and an entry should be added to the running trigger store,
// even though the trigger will not be able to start.
aliceTrigger <- parseTriggerId(resp)
_ <- assertTriggerIds(uri, alice, _ == Vector(aliceTrigger))
// Check the log for an initialization failure.
_ <- assertTriggerStatus(uri, aliceTrigger, _.contains("stopped: initialization failure"))
// Finally establish the connection and check that the trigger eventually starts.
_ <- Future(ledgerProxy.enable())
_ <- assertTriggerStatus(uri, aliceTrigger, _.last == "running")
} yield succeed
}
it should "stop a failing trigger on network failure" in withTriggerService(Some(dar)) {
(uri: Uri, client: LedgerClient, ledgerProxy: Proxy) =>
// Simulate the ledger becoming unavailable indefinitely due to network connectivity loss.
// Our restart strategy means that a previously running trigger will be stopped after a number
// of restarts. However the running trigger store is not impacted.
for {
// Request a trigger be started for Alice.
resp <- startTrigger(uri, s"$testPkgId:TestTrigger:trigger", alice)
aliceTrigger <- parseTriggerId(resp)
// Proceed when it's confirmed to be running.
_ <- assertTriggerIds(uri, alice, _ == Vector(aliceTrigger))
// Simulate network failure.
_ <- Future { ledgerProxy.disable() }
// The running trigger store should be unchanged.
_ <- assertTriggerIds(uri, alice, _ == Vector(aliceTrigger))
// Confirm that the trigger was eventually stopped.
_ <- assertTriggerStatus(uri, aliceTrigger, _.last == "stopped: initialization failure")
} yield succeed
it should "restart trigger on run-time failure due to dropped connection" in withTriggerService(
Some(dar)) { (uri: Uri, client: LedgerClient, ledgerProxy: Proxy) =>
// Simulate the ledger being briefly unavailable due to network connectivity loss.
// We continually restart the trigger until the connection returns.
for {
// Request a trigger be started for Alice.
resp <- startTrigger(uri, s"$testPkgId:TestTrigger:trigger", alice)
aliceTrigger <- parseTriggerId(resp)
_ <- assertTriggerIds(uri, alice, _ == Vector(aliceTrigger))
// Proceed when it's confirmed to be running.
_ <- assertTriggerStatus(uri, aliceTrigger, _.last == "running")
// Simulate brief network connectivity loss and observe the trigger fail.
_ <- Future(ledgerProxy.disable())
_ <- assertTriggerStatus(uri, aliceTrigger, _.contains("stopped: runtime failure"))
// Finally check the trigger is restarted after the connection returns.
_ <- Future(ledgerProxy.enable())
_ <- assertTriggerStatus(uri, aliceTrigger, _.last == "running")
} yield succeed
}
it should "restart a trigger failing due to a dropped connection" in withTriggerService(Some(dar)) {
(uri: Uri, client: LedgerClient, ledgerProxy: Proxy) =>
// Simulate the ledger briefly being unavailable due to network
// connectivity loss. Our restart strategy means that the running
// trigger gets restarted.
for {
// Request a trigger be started for Alice.
resp <- startTrigger(uri, s"$testPkgId:TestTrigger:trigger", alice)
aliceTrigger <- parseTriggerId(resp)
_ <- assertTriggerIds(uri, alice, _ == Vector(aliceTrigger))
// Proceed when it is confirmed to be running.
_ <- assertTriggerStatus(uri, aliceTrigger, _.last == "running")
// Simulate brief network connectivity loss.
_ <- Future { ledgerProxy.disable() }
// Observe the trigger fail as a result.
_ <- assertTriggerStatus(
uri,
aliceTrigger,
log => log.count(_ == "stopped: runtime failure") > 0)
_ <- Future { ledgerProxy.enable() }
// Check the trigger is restarted after the outage.
_ <- assertTriggerStatus(uri, aliceTrigger, _.last == "running")
} yield succeed
}
it should "restart triggers with script init errors" in withTriggerService(Some(dar)) {
it should "restart triggers with initialization errors" in withTriggerService(Some(dar)) {
(uri: Uri, client: LedgerClient, ledgerProxy: Proxy) =>
for {
resp <- startTrigger(uri, s"$testPkgId:ErrorTrigger:trigger", alice)
aliceTrigger <- parseTriggerId(resp)
_ <- assertTriggerIds(uri, alice, _ == Vector(aliceTrigger))
// We will attempt to restart the trigger indefinitely.
// Just check that we see a few failures and restart attempts.
// This relies on a small minimum restart interval as the interval doubles after each
// failure.
_ <- assertTriggerStatus(uri, aliceTrigger, _.count(_ == "starting") > 2)
_ <- assertTriggerStatus(
uri,
aliceTrigger,
triggerStatus => {
triggerStatus.count(_ == "starting") ==
ServiceConfig.DefaultMaxFailureNumberOfRetries + 1 &&
triggerStatus.last == "stopped: initialization failure"
}
)
// Although the trigger won't be restarted again it's entry in the running trigger store
// isn't affected.
_ <- assertTriggerIds(uri, alice, _ == Vector(aliceTrigger))
_.count(_ == "stopped: initialization failure") > 2)
} yield succeed
}
it should "restart triggers with script update errors" in withTriggerService(Some(dar)) {
it should "restart triggers with update errors" in withTriggerService(Some(dar)) {
(uri: Uri, client: LedgerClient, ledgerProxy: Proxy) =>
for {
resp <- startTrigger(uri, s"$testPkgId:LowLevelErrorTrigger:trigger", alice)
aliceTrigger <- parseTriggerId(resp)
_ <- assertTriggerStatus(
uri,
aliceTrigger,
triggerStatus => {
triggerStatus
.count(_ == "running") == ServiceConfig.DefaultMaxFailureNumberOfRetries + 1 &&
triggerStatus.last == "stopped: runtime failure"
}
)
// Although the trigger won't be restarted again it's entry in the running trigger store
// isn't affected.
_ <- assertTriggerIds(uri, alice, _ == Vector(aliceTrigger))
// We will attempt to restart the trigger indefinitely.
// Just check that we see a few failures and restart attempts.
// This relies on a small minimum restart interval as the interval doubles after each
// failure.
_ <- assertTriggerStatus(uri, aliceTrigger, _.count(_ == "starting") > 2)
_ <- assertTriggerStatus(uri, aliceTrigger, _.count(_ == "stopped: runtime failure") > 2)
} yield succeed
}
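
The status assertions above check predicates over a trigger's status log and effectively poll until they hold; the file's Eventually, Seconds and Span imports point at the usual ScalaTest mechanism for that kind of check. A minimal sketch of the pattern, assuming ScalaTest 3.1+ and a hypothetical fetchStatusLog stand-in (the real assertTriggerStatus helper is not part of this diff):

    import org.scalatest.concurrent.Eventually
    import org.scalatest.flatspec.AnyFlatSpec
    import org.scalatest.matchers.should.Matchers
    import org.scalatest.time.{Seconds, Span}

    class StatusPollingSketch extends AnyFlatSpec with Matchers with Eventually {
      // Hypothetical stand-in for querying a trigger's status log from the service.
      private def fetchStatusLog(): Vector[String] =
        Vector("starting", "stopped: runtime failure", "starting")

      "a status assertion" should "retry until the predicate holds" in {
        // Re-run the block until it stops throwing, for at most 30 seconds.
        eventually(timeout(Span(30, Seconds))) {
          fetchStatusLog().count(_ == "starting") should be > 1
        }
      }
    }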