mirror of
https://github.com/digital-asset/daml.git
synced 2024-09-20 01:07:18 +03:00
Chore slow migration error removal (#10886)
* Avoid slow-progress timeout StoppedProgressing to avoid flakiness

CHANGELOG_BEGIN
CHANGELOG_END

* Make schema migration retries and backoff configurable
* Review feedback - use RetryStrategy.constant instead
* Remove unused tailrec import
* Simplifications and runF by Marton
* Rename config options from retry to attempt and default to 30 attempts
This commit is contained in:
parent
e4cce53957
commit
b3e4975795
@ -33,6 +33,8 @@ case class IndexerConfig(
|
||||
tailingRateLimitPerSecond: Int = DefaultTailingRateLimitPerSecond,
|
||||
batchWithinMillis: Long = DefaultBatchWithinMillis,
|
||||
enableCompression: Boolean = DefaultEnableCompression,
|
||||
schemaMigrationAttempts: Int = DefaultSchemaMigrationAttempts,
|
||||
schemaMigrationAttemptBackoff: FiniteDuration = DefaultSchemaMigrationAttemptBackoff,
|
||||
haConfig: HaConfig = HaConfig(),
|
||||
)
|
||||
|
||||
@ -53,4 +55,7 @@ object IndexerConfig {
|
||||
val DefaultTailingRateLimitPerSecond: Int = 20
|
||||
val DefaultBatchWithinMillis: Long = 50L
|
||||
val DefaultEnableCompression: Boolean = false
|
||||
|
||||
val DefaultSchemaMigrationAttempts: Int = 30
|
||||
val DefaultSchemaMigrationAttemptBackoff: FiniteDuration = 1.second
|
||||
}
|
||||
|
@ -80,7 +80,12 @@ final class StandaloneIndexerServer(
|
||||
|
||||
case IndexerStartupMode.ValidateAndWaitOnly =>
|
||||
Resource
|
||||
.fromFuture(flywayMigrations.validateAndWaitOnly())
|
||||
.fromFuture(
|
||||
flywayMigrations.validateAndWaitOnly(
|
||||
config.schemaMigrationAttempts,
|
||||
config.schemaMigrationAttemptBackoff,
|
||||
)
|
||||
)
|
||||
.map[ReportsHealth] { _ =>
|
||||
logger.debug("Waiting for the indexer to validate the schema migrations.")
|
||||
() => Healthy
|
||||
|
@ -7,12 +7,13 @@ import com.daml.ledger.resources.ResourceContext
|
||||
import com.daml.logging.{ContextualizedLogger, LoggingContext}
|
||||
import com.daml.platform.store.FlywayMigrations._
|
||||
import com.daml.platform.store.backend.VerifiedDataSource
|
||||
import com.daml.timer.RetryStrategy
|
||||
import javax.sql.DataSource
|
||||
import org.flywaydb.core.Flyway
|
||||
import org.flywaydb.core.api.MigrationVersion
|
||||
import org.flywaydb.core.api.configuration.FluentConfiguration
|
||||
|
||||
import scala.annotation.tailrec
|
||||
import scala.concurrent.duration.FiniteDuration
|
||||
import scala.concurrent.{ExecutionContext, Future}
|
||||
|
||||
private[platform] class FlywayMigrations(
|
||||
@ -25,13 +26,11 @@ private[platform] class FlywayMigrations(
|
||||
private val dbType = DbType.jdbcType(jdbcUrl)
|
||||
implicit private val ec: ExecutionContext = resourceContext.executionContext
|
||||
|
||||
/** Runs a synchronous action against the base Flyway configuration.
  *
  * A data source is obtained through `VerifiedDataSource(jdbcUrl)`; once it is
  * available, `t` is evaluated inside a `Future` on the configuration that
  * `configurationBase` builds from that data source.
  *
  * @param t action to apply to the prepared configuration
  * @return a future holding the result of `t`
  */
private def run[T](t: FluentConfiguration => T): Future[T] = {
|
||||
VerifiedDataSource(jdbcUrl).flatMap(dataSource =>
|
||||
Future(
|
||||
t(configurationBase(dataSource))
|
||||
)
|
||||
)
|
||||
}
|
||||
/** Runs an asynchronous action against the base Flyway configuration.
  *
  * A data source is obtained through `VerifiedDataSource(jdbcUrl)`; the
  * configuration that `configurationBase` builds from it is handed to `t`,
  * and the future returned by `t` becomes the overall result.
  *
  * @param t action producing a future from the prepared configuration
  * @return the future produced by `t`, sequenced after the data source lookup
  */
private def runF[T](t: FluentConfiguration => Future[T]): Future[T] =
|
||||
VerifiedDataSource(jdbcUrl).flatMap(dataSource => t(configurationBase(dataSource)))
|
||||
|
||||
/** Synchronous counterpart of `runF`.
  *
  * Wraps the evaluation of `t` in a `Future` and delegates to `runF`, so the
  * data source lookup and configuration setup are shared with the
  * asynchronous variant.
  *
  * @param t action to apply to the prepared configuration
  * @return a future holding the result of `t`
  */
private def run[T](t: FluentConfiguration => T): Future[T] =
|
||||
runF(fc => Future(t(fc)))
|
||||
|
||||
private def configurationBase(dataSource: DataSource): FluentConfiguration =
|
||||
Flyway
|
||||
@ -67,42 +66,25 @@ private[platform] class FlywayMigrations(
|
||||
logger.info("Flyway schema clean finished successfully.")
|
||||
}
|
||||
|
||||
def validateAndWaitOnly(): Future[Unit] = run { configBase =>
|
||||
val flyway = configBase
|
||||
.ignoreFutureMigrations(false)
|
||||
.load()
|
||||
def validateAndWaitOnly(retries: Int, retryBackoff: FiniteDuration): Future[Unit] = runF {
|
||||
configBase =>
|
||||
val flyway = configBase
|
||||
.ignoreFutureMigrations(false)
|
||||
.load()
|
||||
|
||||
logger.info("Running Flyway validation...")
|
||||
logger.info("Running Flyway validation...")
|
||||
|
||||
@tailrec
|
||||
def flywayMigrationDone(
|
||||
retries: Int,
|
||||
pendingMigrationsSoFar: Option[Int],
|
||||
): Unit = {
|
||||
val pendingMigrations = flyway.info().pending().length
|
||||
if (pendingMigrations == 0) {
|
||||
()
|
||||
} else if (retries <= 0) {
|
||||
throw ExhaustedRetries(pendingMigrations)
|
||||
} else if (pendingMigrationsSoFar.exists(pendingMigrations >= _)) {
|
||||
throw StoppedProgressing(pendingMigrations)
|
||||
} else {
|
||||
logger.debug(
|
||||
s"Concurrent migration has reduced the pending migrations set to $pendingMigrations, waiting until pending set is empty.."
|
||||
)
|
||||
Thread.sleep(1000)
|
||||
flywayMigrationDone(retries - 1, Some(pendingMigrations))
|
||||
RetryStrategy.constant(retries, retryBackoff) { (attempt, _) =>
|
||||
val pendingMigrations = flyway.info().pending().length
|
||||
if (pendingMigrations == 0) {
|
||||
Future.unit
|
||||
} else {
|
||||
logger.debug(
|
||||
s"Pending migrations ${pendingMigrations} on attempt ${attempt} of ${retries} attempts"
|
||||
)
|
||||
Future.failed(MigrationIncomplete(pendingMigrations))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
try {
|
||||
flywayMigrationDone(10, None)
|
||||
logger.info("Flyway schema validation finished successfully.")
|
||||
} catch {
|
||||
case ex: RuntimeException =>
|
||||
logger.error(s"Failed to validate and wait only: ${ex.getMessage}", ex)
|
||||
throw ex
|
||||
}
|
||||
}
|
||||
|
||||
def migrateOnEmptySchema(): Future[Unit] = run { configBase =>
|
||||
@ -164,10 +146,8 @@ private[platform] object FlywayMigrations {
|
||||
}
|
||||
}
|
||||
|
||||
/** Failure raised when the retry budget is used up while migrations are still pending.
  *
  * @param pendingMigrations number of migrations still pending at that point
  */
case class ExhaustedRetries(pendingMigrations: Int)
|
||||
extends RuntimeException(s"Ran out of retries with $pendingMigrations migrations remaining")
|
||||
/** Failure raised when the number of pending migrations has not decreased since the previous check.
  *
  * @param pendingMigrations number of migrations still pending at that point
  */
case class StoppedProgressing(pendingMigrations: Int)
|
||||
extends RuntimeException(s"Stopped progressing with $pendingMigrations migrations")
|
||||
/** Failure used to mark a validation attempt during which migrations were still pending.
  *
  * @param pendingMigrations number of migrations still pending on that attempt
  */
case class MigrationIncomplete(pendingMigrations: Int)
|
||||
extends RuntimeException(s"Migration incomplete with $pendingMigrations migrations remaining")
|
||||
case class MigrateOnEmptySchema(appliedMigrations: Int, pendingMigrations: Int)
|
||||
extends RuntimeException(
|
||||
s"Asked to migrate-on-empty-schema, but encountered neither an empty database with $appliedMigrations " +
|
||||
|
Loading…
Reference in New Issue
Block a user