Upload dars sequentially for every participant defined [kvl-1369] (#13834)

This commit is contained in:
Nicu Reut 2022-05-10 10:18:51 +02:00 committed by GitHub
parent 38f424155a
commit be275bb5ec
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 40 additions and 23 deletions

View File

@ -11,6 +11,7 @@ import akka.stream.Materializer
import akka.stream.scaladsl.{Sink, Source}
import com.daml.ledger.api.testtool.infrastructure.LedgerTestCasesRunner._
import com.daml.ledger.api.testtool.infrastructure.PartyAllocationConfiguration.ClosedWorldWaitingForAllParticipants
import com.daml.ledger.api.testtool.infrastructure.future.FutureUtil
import com.daml.ledger.api.testtool.infrastructure.participant.{
ParticipantSession,
ParticipantTestContext,
@ -143,6 +144,28 @@ final class LedgerTestCasesRunner(
)(implicit executionContext: ExecutionContext): Future[Either[Result.Failure, Result.Success]] =
result(createTestContextAndStart(test, session))
private def uploadDarsIfRequired(
sessions: Vector[ParticipantSession]
)(implicit executionContext: ExecutionContext): Future[Unit] =
if (uploadDars) {
FutureUtil
.sequential(sessions) { session =>
logger.info(s"Uploading DAR files for session $session")
for {
context <- session.createInitContext(
applicationId = "upload-dars",
identifierSuffix = identifierSuffix,
features = session.features,
)
// upload the dars sequentially to avoid conflicts
_ <- FutureUtil.sequential(Dars.resources)(uploadDar(context, _))
} yield ()
}
.map(_ => ())
} else {
Future.successful(logger.info("DAR files upload skipped."))
}
private def uploadDar(
context: ParticipantTestContext,
name: String,
@ -158,29 +181,6 @@ final class LedgerTestCasesRunner(
}
}
private def uploadDarsIfRequired(
sessions: Vector[ParticipantSession]
)(implicit executionContext: ExecutionContext): Future[Unit] =
if (uploadDars) {
Future
.sequence(sessions.map { session =>
for {
context <- session.createInitContext(
applicationId = "upload-dars",
identifierSuffix = identifierSuffix,
features = session.features,
)
// upload the dars sequentially to avoid conflicts
_ <- Dars.resources.foldLeft(Future.unit) { case (result, newResource) =>
result.flatMap(_ => uploadDar(context, newResource))
}
} yield ()
})
.map(_ => ())
} else {
Future.successful(logger.info("DAR files upload skipped."))
}
private def createActorSystem(): ActorSystem =
ActorSystem(classOf[LedgerTestCasesRunner].getSimpleName)

View File

@ -0,0 +1,17 @@
// Copyright (c) 2022 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
package com.daml.ledger.api.testtool.infrastructure.future
import scala.concurrent.{ExecutionContext, Future}
object FutureUtil {
def sequential[T, R](elements: Seq[T])(f: T => Future[R])(implicit
ec: ExecutionContext
): Future[Seq[R]] =
elements.foldLeft(Future.successful(Seq.empty[R])) { case (results, newElement) =>
results.flatMap(resultsSoFar => f(newElement).map(resultsSoFar :+ _))
}
}