sandbox-classic: Only allow --max-parallel-submissions here. (#10941)

This option is only used by Sandbox Classic and Daml Driver for SQL.
There is no reason for it to be part of the command service
configuration.

CHANGELOG_BEGIN
CHANGELOG_END
This commit is contained in:
Samir Talwar 2021-09-20 17:38:20 +02:00 committed by GitHub
parent ac02dbdeb9
commit 88ef05e557
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 28 additions and 36 deletions

View File

@ -62,6 +62,14 @@ private[sql] final class Cli(
)
.action((mode, config) => config.copy(sqlStartMode = PostgresStartupMode.fromString(mode)))
parser
.opt[Int]("max-parallel-submissions")
.optional()
.action((value, config) => config.copy(maxParallelSubmissions = value))
.text(
s"Maximum number of successfully interpreted commands waiting to be sequenced. The threshold is shared across all parties. Overflowing it will cause back-pressure, signaled by a `RESOURCE_EXHAUSTED` error code. Default is ${defaultConfig.maxParallelSubmissions}."
)
// Ideally we would set the relevant options to `required()`, but it doesn't seem to work.
// Even when the value is provided, it still reports that it's missing. Instead, we check the
// configuration afterwards.

View File

@ -8,15 +8,8 @@ import java.time.Duration
/** Reaching either [[inputBufferSize]] or [[maxCommandsInFlight]] will trigger
* back-pressure by [[com.daml.ledger.client.services.commands.CommandClient]].
*
* Reaching [[maxParallelSubmissions]] will trigger back-pressure
* by [[com.daml.platform.sandbox.stores.ledger.sql.SqlLedger]].
*
* @param inputBufferSize
* Maximum number of commands waiting to be submitted for each party.
* @param maxParallelSubmissions
* Maximum number of commands waiting to be sequenced after being evaluated by the engine.
* This does _not_ apply to on-X ledgers, where sequencing happens after the evaluated
* transaction has been shipped via the WriteService.
* @param maxCommandsInFlight
* Maximum number of submitted commands waiting to be completed for each party.
* @param trackerRetentionPeriod
@ -26,7 +19,6 @@ import java.time.Duration
*/
final case class CommandConfiguration(
inputBufferSize: Int,
maxParallelSubmissions: Int,
maxCommandsInFlight: Int,
trackerRetentionPeriod: Duration,
)
@ -37,7 +29,6 @@ object CommandConfiguration {
lazy val default: CommandConfiguration =
CommandConfiguration(
inputBufferSize = 512,
maxParallelSubmissions = 512,
maxCommandsInFlight = 256,
trackerRetentionPeriod = DefaultTrackerRetentionPeriod,
)

View File

@ -397,15 +397,6 @@ object Config {
"Maximum number of submitted commands waiting for completion for each party (only applied when using the CommandService). Overflowing this threshold will cause back-pressure, signaled by a RESOURCE_EXHAUSTED error code. Default is 256."
)
opt[Int]("max-parallel-submissions")
.optional()
.action((value, config) =>
config.copy(commandConfig = config.commandConfig.copy(maxParallelSubmissions = value))
)
.text(
"Maximum number of successfully interpreted commands waiting to be sequenced (applied only when running sandbox-classic). The threshold is shared across all parties. Overflowing it will cause back-pressure, signaled by a RESOURCE_EXHAUSTED error code. Default is 512."
)
opt[Int]("input-buffer-size")
.optional()
.action((value, config) =>

View File

@ -40,6 +40,13 @@ private[sandbox] object Cli extends SandboxCli {
)
.action((url, config) => config.copy(jdbcUrl = Some(url)))
parser
.opt[Int]("max-parallel-submissions")
.optional()
.action((value, config) => config.copy(maxParallelSubmissions = value))
.text(
s"Maximum number of successfully interpreted commands waiting to be sequenced. The threshold is shared across all parties. Overflowing it will cause back-pressure, signaled by a `RESOURCE_EXHAUSTED` error code. Default is ${defaultConfig.maxParallelSubmissions}."
)
parser
}
}

View File

@ -321,7 +321,7 @@ final class SandboxServer(
timeProvider = timeProvider,
ledgerEntries = ledgerEntries,
startMode = startMode,
queueDepth = config.commandConfig.maxParallelSubmissions,
queueDepth = config.maxParallelSubmissions,
transactionCommitter = transactionCommitter,
templateStore = packageStore,
eventsPageSize = config.eventsPageSize,

View File

@ -111,9 +111,9 @@ sealed trait CommandServiceBackPressureITBase
super.config.copy(
commandConfig = super.config.commandConfig.copy(
inputBufferSize = 1,
maxParallelSubmissions = 2,
maxCommandsInFlight = 2,
)
),
maxParallelSubmissions = 2,
)
}

View File

@ -152,9 +152,9 @@ sealed trait CompletionServiceITBase
super.config.copy(
commandConfig = super.config.commandConfig.copy(
inputBufferSize = 1,
maxParallelSubmissions = 2,
maxCommandsInFlight = 2,
)
),
maxParallelSubmissions = 2,
)
}

View File

@ -46,12 +46,14 @@ class TransactionStreamTerminationIT
jdbcUrl = Some("jdbc:h2:mem:static_time;db_close_delay=-1"),
timeProviderType = Some(TimeProviderType.Static),
)
def commandClientConfig =
private def commandClientConfig =
CommandClientConfiguration(
maxCommandsInFlight = config.commandConfig.maxCommandsInFlight,
maxParallelSubmissions = config.commandConfig.maxParallelSubmissions,
maxParallelSubmissions = config.maxParallelSubmissions,
defaultDeduplicationTime = JDuration.ofSeconds(30),
)
private val applicationId = "transaction-stream-termination-test"
private def newTransactionClient(ledgerId: domain.LedgerId) =
@ -82,7 +84,7 @@ class TransactionStreamTerminationIT
_.commands.ledgerId := actualLedgerId.unwrap,
_.commands.applicationId := applicationId,
_.commands.commands := List(
Command(
Command.of(
Command.Command.Create(
CreateCommand(
Some(templateIds.dummy),

View File

@ -273,15 +273,6 @@ class CommonCliBase(name: LedgerName) {
"Maximum number of submitted commands waiting for completion for each party (only applied when using the CommandService). Overflowing this threshold will cause back-pressure, signaled by a RESOURCE_EXHAUSTED error code. Default is 256."
)
opt[Int]("max-parallel-submissions")
.optional()
.action((value, config) =>
config.copy(commandConfig = config.commandConfig.copy(maxParallelSubmissions = value))
)
.text(
"Maximum number of successfully interpreted commands waiting to be sequenced (applied only when running sandbox-classic). The threshold is shared across all parties. Overflowing it will cause back-pressure, signaled by a RESOURCE_EXHAUSTED error code. Default is 512."
)
opt[Int]("input-buffer-size")
.optional()
.action((value, config) =>

View File

@ -54,6 +54,7 @@ final case class SandboxConfig(
seeding: Option[Seeding],
metricsReporter: Option[MetricsReporter],
metricsReportingInterval: FiniteDuration,
maxParallelSubmissions: Int, // only used by Sandbox Classic
eventsPageSize: Int,
eventsProcessingParallelism: Int,
lfValueTranslationEventCacheConfiguration: SizedCache.Configuration,
@ -115,6 +116,7 @@ object SandboxConfig {
damlPackages = Nil,
timeProviderType = None,
configurationLoadTimeout = Duration.ofSeconds(10),
maxDeduplicationDuration = None,
delayBeforeSubmittingLedgerConfiguration = Duration.ofSeconds(1),
timeModel = LedgerTimeModel.reasonableDefault,
commandConfig = CommandConfiguration.default,
@ -132,6 +134,7 @@ object SandboxConfig {
seeding = Some(Seeding.Strong),
metricsReporter = None,
metricsReportingInterval = 10.seconds,
maxParallelSubmissions = 512,
eventsPageSize = DefaultEventsPageSize,
eventsProcessingParallelism = DefaultEventsProcessingParallelism,
lfValueTranslationEventCacheConfiguration = DefaultLfValueTranslationCacheConfiguration,
@ -143,7 +146,6 @@ object SandboxConfig {
sqlStartMode = Some(DefaultSqlStartupMode),
enableAppendOnlySchema = false,
enableCompression = false,
maxDeduplicationDuration = None,
)
sealed abstract class EngineMode extends Product with Serializable