DPP-665 Make in-flight commands configurable (#11456)

changelog_begin
changelog_end
This commit is contained in:
Robert Autenrieth 2021-10-28 21:50:09 +02:00 committed by GitHub
parent bf00956143
commit bb4f4c5a40
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 21 additions and 5 deletions

View File

@ -52,6 +52,14 @@ object Cli {
config.copy(contractSetDescriptorFile = Some(descriptorFile))
}
opt[Int]("max-in-flight-commands")
.hidden() // TODO: uncomment when production-ready
.text("Maximum in-flight commands for command submissions.")
.optional()
.action { case (size, config) =>
config.copy(maxInFlightCommands = size)
}
opt[FiniteDuration]("log-interval")
.abbr("r")
.text("Stream metrics log interval.")

View File

@ -18,6 +18,7 @@ case class Config(
streams: List[Config.StreamConfig],
reportingPeriod: FiniteDuration,
contractSetDescriptorFile: Option[File],
maxInFlightCommands: Int,
metricsReporter: MetricsReporter,
)
@ -93,6 +94,7 @@ object Config {
streams = List.empty[Config.StreamConfig],
reportingPeriod = 5.seconds,
contractSetDescriptorFile = None,
maxInFlightCommands = 100,
metricsReporter = MetricsReporter.Console,
)
}

View File

@ -49,7 +49,8 @@ object LedgerApiBenchTool {
Future.successful(
logger.info("No contract set descriptor file provided. Skipping contracts generation.")
)
case Some(descriptorFile) => CommandSubmitter(apiServices).submit(descriptorFile)
case Some(descriptorFile) =>
CommandSubmitter(apiServices).submit(descriptorFile, config.maxInFlightCommands)
}
def benchmarkStep(): Future[Unit] = if (config.streams.isEmpty) {

View File

@ -32,7 +32,8 @@ case class CommandSubmitter(services: LedgerApiServices) {
private def darId(index: Int) = s"submission-dars-$index-$identifierSuffix"
def submit(
descriptorFile: File
descriptorFile: File,
maxInFlightCommands: Int,
)(implicit ec: ExecutionContext): Future[Unit] =
(for {
_ <- Future.successful(logger.info("Generating contracts..."))
@ -41,7 +42,7 @@ case class CommandSubmitter(services: LedgerApiServices) {
signatory <- allocateParty(signatoryName)
observers <- allocateParties(descriptor.numberOfObservers, observerName)
_ <- uploadTestDars()
_ <- submitCommands(descriptor, signatory, observers)
_ <- submitCommands(descriptor, signatory, observers, maxInFlightCommands)
} yield logger.info("Commands submitted successfully."))
.recoverWith { case NonFatal(ex) =>
logger.error(s"Command submission failed. Details: ${ex.getLocalizedMessage}", ex)
@ -108,25 +109,29 @@ case class CommandSubmitter(services: LedgerApiServices) {
descriptor: ContractSetDescriptor,
signatory: Primitive.Party,
observers: List[Primitive.Party],
maxInFlightCommands: Int,
)(implicit
ec: ExecutionContext
): Future[Unit] = {
implicit val resourceContext: ResourceContext = ResourceContext(ec)
val progressMeter = CommandSubmitter.ProgressMeter(descriptor.numberOfInstances)
// Output a log line roughly once per 10% progress, or once every 500 submissions (whichever comes first)
val progressLogInterval = math.min(descriptor.numberOfInstances / 10 + 1, 500)
val progressLoggingSink =
Sink.foreach[Int](index =>
if (index % 500 == 0) {
if (index % progressLogInterval == 0) {
logger.info(progressMeter.getProgress(index))
}
)
val generator = new CommandGenerator(RandomnessProvider.Default, descriptor, observers)
logger.info("Submitting commands...")
materializerOwner()
.use { implicit materializer =>
Source
.fromIterator(() => (1 to descriptor.numberOfInstances).iterator)
.mapAsync(100) { index =>
.mapAsync(maxInFlightCommands) { index =>
generator.next() match {
case Success(command) =>
submitAndWait(