diff --git a/ledger/participant-integration-api/src/main/scala/platform/indexer/IndexerConfig.scala b/ledger/participant-integration-api/src/main/scala/platform/indexer/IndexerConfig.scala index 36c4400e03..018f48c2e4 100644 --- a/ledger/participant-integration-api/src/main/scala/platform/indexer/IndexerConfig.scala +++ b/ledger/participant-integration-api/src/main/scala/platform/indexer/IndexerConfig.scala @@ -33,6 +33,8 @@ case class IndexerConfig( tailingRateLimitPerSecond: Int = DefaultTailingRateLimitPerSecond, batchWithinMillis: Long = DefaultBatchWithinMillis, enableCompression: Boolean = DefaultEnableCompression, + schemaMigrationAttempts: Int = DefaultSchemaMigrationAttempts, + schemaMigrationAttemptBackoff: FiniteDuration = DefaultSchemaMigrationAttemptBackoff, haConfig: HaConfig = HaConfig(), ) @@ -53,4 +55,7 @@ object IndexerConfig { val DefaultTailingRateLimitPerSecond: Int = 20 val DefaultBatchWithinMillis: Long = 50L val DefaultEnableCompression: Boolean = false + + val DefaultSchemaMigrationAttempts: Int = 30 + val DefaultSchemaMigrationAttemptBackoff: FiniteDuration = 1.second } diff --git a/ledger/participant-integration-api/src/main/scala/platform/indexer/StandaloneIndexerServer.scala b/ledger/participant-integration-api/src/main/scala/platform/indexer/StandaloneIndexerServer.scala index b4297f2793..65bed8d991 100644 --- a/ledger/participant-integration-api/src/main/scala/platform/indexer/StandaloneIndexerServer.scala +++ b/ledger/participant-integration-api/src/main/scala/platform/indexer/StandaloneIndexerServer.scala @@ -80,7 +80,12 @@ final class StandaloneIndexerServer( case IndexerStartupMode.ValidateAndWaitOnly => Resource - .fromFuture(flywayMigrations.validateAndWaitOnly()) + .fromFuture( + flywayMigrations.validateAndWaitOnly( + config.schemaMigrationAttempts, + config.schemaMigrationAttemptBackoff, + ) + ) .map[ReportsHealth] { _ => logger.debug("Waiting for the indexer to validate the schema migrations.") () => Healthy diff --git a/ledger/participant-integration-api/src/main/scala/platform/store/FlywayMigrations.scala b/ledger/participant-integration-api/src/main/scala/platform/store/FlywayMigrations.scala index 3af0fe6dcc..56044441c9 100644 --- a/ledger/participant-integration-api/src/main/scala/platform/store/FlywayMigrations.scala +++ b/ledger/participant-integration-api/src/main/scala/platform/store/FlywayMigrations.scala @@ -7,12 +7,13 @@ import com.daml.ledger.resources.ResourceContext import com.daml.logging.{ContextualizedLogger, LoggingContext} import com.daml.platform.store.FlywayMigrations._ import com.daml.platform.store.backend.VerifiedDataSource +import com.daml.timer.RetryStrategy import javax.sql.DataSource import org.flywaydb.core.Flyway import org.flywaydb.core.api.MigrationVersion import org.flywaydb.core.api.configuration.FluentConfiguration -import scala.annotation.tailrec +import scala.concurrent.duration.FiniteDuration import scala.concurrent.{ExecutionContext, Future} private[platform] class FlywayMigrations( @@ -25,13 +26,11 @@ private[platform] class FlywayMigrations( private val dbType = DbType.jdbcType(jdbcUrl) implicit private val ec: ExecutionContext = resourceContext.executionContext - private def run[T](t: FluentConfiguration => T): Future[T] = { - VerifiedDataSource(jdbcUrl).flatMap(dataSource => - Future( - t(configurationBase(dataSource)) - ) - ) - } + private def runF[T](t: FluentConfiguration => Future[T]): Future[T] = + VerifiedDataSource(jdbcUrl).flatMap(dataSource => t(configurationBase(dataSource))) + + private def run[T](t: FluentConfiguration => T): Future[T] = + runF(fc => Future(t(fc))) private def configurationBase(dataSource: DataSource): FluentConfiguration = Flyway @@ -67,42 +66,25 @@ private[platform] class FlywayMigrations( logger.info("Flyway schema clean finished successfully.") } - def validateAndWaitOnly(): Future[Unit] = run { configBase => - val flyway = configBase - .ignoreFutureMigrations(false) - .load() + def validateAndWaitOnly(retries: Int, retryBackoff: FiniteDuration): Future[Unit] = runF { + configBase => + val flyway = configBase + .ignoreFutureMigrations(false) + .load() - logger.info("Running Flyway validation...") + logger.info("Running Flyway validation...") - @tailrec - def flywayMigrationDone( - retries: Int, - pendingMigrationsSoFar: Option[Int], - ): Unit = { - val pendingMigrations = flyway.info().pending().length - if (pendingMigrations == 0) { - () - } else if (retries <= 0) { - throw ExhaustedRetries(pendingMigrations) - } else if (pendingMigrationsSoFar.exists(pendingMigrations >= _)) { - throw StoppedProgressing(pendingMigrations) - } else { - logger.debug( - s"Concurrent migration has reduced the pending migrations set to $pendingMigrations, waiting until pending set is empty.." - ) - Thread.sleep(1000) - flywayMigrationDone(retries - 1, Some(pendingMigrations)) + RetryStrategy.constant(retries, retryBackoff) { (attempt, _) => + val pendingMigrations = flyway.info().pending().length + if (pendingMigrations == 0) { + Future.unit + } else { + logger.debug( + s"Pending migrations ${pendingMigrations} on attempt ${attempt} of ${retries} attempts" + ) + Future.failed(MigrationIncomplete(pendingMigrations)) + } } - } - - try { - flywayMigrationDone(10, None) - logger.info("Flyway schema validation finished successfully.") - } catch { - case ex: RuntimeException => - logger.error(s"Failed to validate and wait only: ${ex.getMessage}", ex) - throw ex - } } def migrateOnEmptySchema(): Future[Unit] = run { configBase => @@ -164,10 +146,8 @@ private[platform] object FlywayMigrations { } } - case class ExhaustedRetries(pendingMigrations: Int) - extends RuntimeException(s"Ran out of retries with $pendingMigrations migrations remaining") - case class StoppedProgressing(pendingMigrations: Int) - extends RuntimeException(s"Stopped progressing with $pendingMigrations migrations") + case class MigrationIncomplete(pendingMigrations: Int) + extends RuntimeException(s"Migration incomplete with $pendingMigrations migrations remaining") case class MigrateOnEmptySchema(appliedMigrations: Int, pendingMigrations: Int) extends RuntimeException( s"Asked to migrate-on-empty-schema, but encountered neither an empty database with $appliedMigrations " +