DPP-769 cap internal state (#12135)

* Limit max queue size

changelog_begin
changelog_end

* fix typo

* Make id queue limit configurable
This commit is contained in:
Robert Autenrieth 2021-12-23 21:00:44 +01:00 committed by GitHub
parent e1b4c30132
commit 2141bfbea5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
23 changed files with 86 additions and 4 deletions

View File

@ -32,6 +32,7 @@ case class ApiServerConfig(
acsIdFetchingParallelism: Int = IndexConfiguration.DefaultAcsIdFetchingParallelism,
acsContractFetchingParallelism: Int = IndexConfiguration.DefaultAcsContractFetchingParallelism,
acsGlobalParallelism: Int = IndexConfiguration.DefaultAcsGlobalParallelism,
acsIdQueueLimit: Int = IndexConfiguration.DefaultAcsIdQueueLimit,
portFile: Option[Path],
seeding: Seeding,
managementServiceTimeout: Duration,

View File

@ -83,6 +83,7 @@ object StandaloneIndexService {
acsIdFetchingParallelism = config.acsIdFetchingParallelism,
acsContractFetchingParallelism = config.acsContractFetchingParallelism,
acsGlobalParallelism = config.acsGlobalParallelism,
acsIdQueueLimit = config.acsIdQueueLimit,
servicesExecutionContext = servicesExecutionContext,
metrics = metrics,
lfValueTranslationCache = lfValueTranslationCache,

View File

@ -11,4 +11,5 @@ object IndexConfiguration {
val DefaultAcsIdFetchingParallelism: Int = 2
val DefaultAcsContractFetchingParallelism: Int = 2
val DefaultAcsGlobalParallelism: Int = 10
val DefaultAcsIdQueueLimit: Int = 10000000
}

View File

@ -28,6 +28,7 @@ private[platform] object JdbcIndex {
acsIdFetchingParallelism: Int,
acsContractFetchingParallelism: Int,
acsGlobalParallelism: Int,
acsIdQueueLimit: Int,
servicesExecutionContext: ExecutionContext,
metrics: Metrics,
lfValueTranslationCache: LfValueTranslationCache.Cache,
@ -48,6 +49,7 @@ private[platform] object JdbcIndex {
acsIdFetchingParallelism = acsIdFetchingParallelism,
acsContractFetchingParallelism = acsContractFetchingParallelism,
acsGlobalParallelism = acsGlobalParallelism,
acsIdQueueLimit = acsIdQueueLimit,
servicesExecutionContext = servicesExecutionContext,
metrics = metrics,
lfValueTranslationCache = lfValueTranslationCache,

View File

@ -54,6 +54,7 @@ private[platform] object ReadOnlySqlLedger {
acsIdFetchingParallelism: Int,
acsContractFetchingParallelism: Int,
acsGlobalParallelism: Int,
acsIdQueueLimit: Int,
servicesExecutionContext: ExecutionContext,
metrics: Metrics,
lfValueTranslationCache: LfValueTranslationCache.Cache,
@ -179,6 +180,7 @@ private[platform] object ReadOnlySqlLedger {
acsIdFetchingParallelism = acsIdFetchingParallelism,
acsContractFetchingParallelism = acsContractFetchingParallelism,
acsGlobalParallelism = acsGlobalParallelism,
acsIdQueueLimit = acsIdQueueLimit,
servicesExecutionContext = servicesExecutionContext,
metrics = metrics,
lfValueTranslationCache = lfValueTranslationCache,

View File

@ -68,6 +68,7 @@ object IndexMetadata {
acsIdFetchingParallelism = 2,
acsContractFetchingParallelism = 2,
acsGlobalParallelism = 10,
acsIdQueueLimit = 1000000,
servicesExecutionContext = executionContext,
metrics = metrics,
lfValueTranslationCache = LfValueTranslationCache.Cache.none,

View File

@ -63,6 +63,7 @@ private class JdbcLedgerDao(
acsIdFetchingParallelism: Int,
acsContractFetchingParallelism: Int,
acsGlobalParallelism: Int,
acsIdQueueLimit: Int,
performPostCommitValidation: Boolean,
metrics: Metrics,
lfValueTranslationCache: LfValueTranslationCache.Cache,
@ -677,6 +678,7 @@ private class JdbcLedgerDao(
idPageSize = acsIdPageSize,
idFetchingParallelism = acsIdFetchingParallelism,
acsFetchingparallelism = acsContractFetchingParallelism,
acsIdQueueLimit = acsIdQueueLimit,
metrics = metrics,
materializer = materializer,
querylimiter =
@ -791,6 +793,7 @@ private[platform] object JdbcLedgerDao {
acsIdFetchingParallelism: Int,
acsContractFetchingParallelism: Int,
acsGlobalParallelism: Int,
acsIdQueueLimit: Int,
servicesExecutionContext: ExecutionContext,
metrics: Metrics,
lfValueTranslationCache: LfValueTranslationCache.Cache,
@ -810,7 +813,8 @@ private[platform] object JdbcLedgerDao {
acsIdPageSize,
acsIdFetchingParallelism,
acsContractFetchingParallelism,
acsGlobalParallelism: Int,
acsGlobalParallelism,
acsIdQueueLimit,
false,
metrics,
lfValueTranslationCache,
@ -837,6 +841,7 @@ private[platform] object JdbcLedgerDao {
acsIdFetchingParallelism: Int,
acsContractFetchingParallelism: Int,
acsGlobalParallelism: Int,
acsIdQueueLimit: Int,
servicesExecutionContext: ExecutionContext,
metrics: Metrics,
lfValueTranslationCache: LfValueTranslationCache.Cache,
@ -857,6 +862,7 @@ private[platform] object JdbcLedgerDao {
acsIdFetchingParallelism,
acsContractFetchingParallelism,
acsGlobalParallelism,
acsIdQueueLimit,
false,
metrics,
lfValueTranslationCache,
@ -883,6 +889,7 @@ private[platform] object JdbcLedgerDao {
acsIdFetchingParallelism: Int,
acsContractFetchingParallelism: Int,
acsGlobalParallelism: Int,
acsIdQueueLimit: Int,
servicesExecutionContext: ExecutionContext,
metrics: Metrics,
lfValueTranslationCache: LfValueTranslationCache.Cache,
@ -904,6 +911,7 @@ private[platform] object JdbcLedgerDao {
acsIdFetchingParallelism,
acsContractFetchingParallelism,
acsGlobalParallelism,
acsIdQueueLimit,
true,
metrics,
lfValueTranslationCache,

View File

@ -38,6 +38,7 @@ class FilterTableACSReader(
idPageSize: Int,
idFetchingParallelism: Int,
acsFetchingparallelism: Int,
acsIdQueueLimit: Int,
metrics: Metrics,
materializer: Materializer,
querylimiter: ConcurrencyLimiter,
@ -97,6 +98,7 @@ class FilterTableACSReader(
tasks = tasks.map(_.filter),
outputBatchSize = pageSize,
inputBatchSize = idPageSize,
idQueueLimit = acsIdQueueLimit,
metrics = metrics,
)
)
@ -258,13 +260,15 @@ private[events] object FilterTableACSReader {
tasks: Iterable[TASK],
outputBatchSize: Int,
inputBatchSize: Int,
idQueueLimit: Int,
metrics: Metrics,
)(implicit
loggingContext: LoggingContext
): () => ((TASK, Iterable[Long])) => Vector[Vector[Long]] = () => {
val outputQueue = new BatchedDistinctOutputQueue(outputBatchSize)
val taskQueue = new MergingTaskQueue[TASK](outputQueue.push)
val taskTracker = new TaskTracker[TASK](tasks, inputBatchSize)
val maxTaskQueueSize = idQueueLimit / inputBatchSize
val taskTracker = new TaskTracker[TASK](tasks, inputBatchSize, maxTaskQueueSize)
{ case (task, ids) =>
@tailrec def go(next: (Option[(Iterable[Long], TASK)], Boolean)): Unit = {
@ -375,7 +379,7 @@ private[events] object FilterTableACSReader {
/** Helper class to encapsulate stateful tracking of task streams.
*/
class TaskTracker[TASK](allTasks: Iterable[TASK], inputBatchSize: Int) {
class TaskTracker[TASK](allTasks: Iterable[TASK], inputBatchSize: Int, maxQueueSize: Int) {
assert(inputBatchSize > 0)
private val idle: mutable.Set[TASK] = mutable.Set.empty
@ -392,7 +396,11 @@ private[events] object FilterTableACSReader {
val toEnqueue =
if (idle(task)) queueEntry(task, ids)
else {
queuedRanges += (task -> queuedRanges.getOrElse(task, Vector.empty).:+(ids))
val previousRanges = queuedRanges.getOrElse(task, Vector.empty)
if (previousRanges.length >= maxQueueSize) {
throw new RuntimeException(s"More than $maxQueueSize id pages queued up")
}
queuedRanges += (task -> previousRanges.:+(ids))
None
}
if (

View File

@ -54,6 +54,7 @@ private[dao] trait JdbcLedgerDaoBackend extends AkkaBeforeAndAfterAll {
acsIdFetchingParallelism: Int,
acsContractFetchingParallelism: Int,
acsGlobalParallelism: Int,
acsIdQueueLimit: Int,
errorFactories: ErrorFactories,
)(implicit
loggingContext: LoggingContext
@ -88,6 +89,7 @@ private[dao] trait JdbcLedgerDaoBackend extends AkkaBeforeAndAfterAll {
acsIdFetchingParallelism = acsIdFetchingParallelism,
acsContractFetchingParallelism = acsContractFetchingParallelism,
acsGlobalParallelism = acsGlobalParallelism,
acsIdQueueLimit = acsIdQueueLimit,
servicesExecutionContext = executionContext,
metrics = metrics,
lfValueTranslationCache = LfValueTranslationCache.Cache.none,
@ -126,6 +128,7 @@ private[dao] trait JdbcLedgerDaoBackend extends AkkaBeforeAndAfterAll {
acsIdFetchingParallelism = 2,
acsContractFetchingParallelism = 2,
acsGlobalParallelism = 10,
acsIdQueueLimit = 1000000,
errorFactories,
).acquire()
_ <- Resource.fromFuture(dao.initialize(TestLedgerId, TestParticipantId))

View File

@ -35,6 +35,7 @@ private[dao] trait JdbcLedgerDaoPostCommitValidationSpec extends LoneElement {
acsIdFetchingParallelism: Int,
acsContractFetchingParallelism: Int,
acsGlobalParallelism: Int,
acsIdQueueLimit: Int,
errorFactories: ErrorFactories,
)(implicit
loggingContext: LoggingContext
@ -79,6 +80,7 @@ private[dao] trait JdbcLedgerDaoPostCommitValidationSpec extends LoneElement {
acsIdFetchingParallelism = acsIdFetchingParallelism,
acsContractFetchingParallelism = acsContractFetchingParallelism,
acsGlobalParallelism = acsGlobalParallelism,
acsIdQueueLimit = acsIdQueueLimit,
servicesExecutionContext = executionContext,
metrics = metrics,
lfValueTranslationCache = LfValueTranslationCache.Cache.none,

View File

@ -510,6 +510,7 @@ private[dao] trait JdbcLedgerDaoTransactionsSpec extends OptionValues with Insid
acsIdFetchingParallelism = 2,
acsContractFetchingParallelism = 2,
acsGlobalParallelism = 10,
acsIdQueueLimit = 1000000,
)
response <- ledgerDao.transactionsReader
@ -647,6 +648,7 @@ private[dao] trait JdbcLedgerDaoTransactionsSpec extends OptionValues with Insid
acsIdFetchingParallelism: Int,
acsContractFetchingParallelism: Int,
acsGlobalParallelism: Int,
acsIdQueueLimit: Int,
) =
LoggingContext.newLoggingContext { implicit loggingContext =>
daoOwner(
@ -656,6 +658,7 @@ private[dao] trait JdbcLedgerDaoTransactionsSpec extends OptionValues with Insid
acsIdFetchingParallelism = acsIdFetchingParallelism,
acsContractFetchingParallelism = acsContractFetchingParallelism,
acsGlobalParallelism = acsGlobalParallelism,
acsIdQueueLimit = acsIdQueueLimit,
MockitoSugar.mock[ErrorFactories],
).acquire()(ResourceContext(executionContext))
}.asFuture

View File

@ -291,6 +291,7 @@ class ACSReaderSpec extends AsyncFlatSpec with Matchers with BeforeAndAfterAll {
tasks = List("a", "b", "c"),
outputBatchSize = 3,
inputBatchSize = 2,
idQueueLimit = 100,
metrics = new Metrics(new MetricRegistry),
)(implicitly)()
mutableLogic("a" -> List(1, 3)) shouldBe Nil // a [1 3] b [] c []

View File

@ -15,6 +15,7 @@ class TaskTrackerSpec extends AsyncFlatSpec with Matchers with BeforeAndAfterAll
val tracker = new FilterTableACSReader.TaskTracker[String](
allTasks = List("A"),
inputBatchSize = 2,
maxQueueSize = 100,
)
tracker.add("A", List(1L, 2L)) shouldBe Some(List(1L, 2L), "A") -> true
@ -26,6 +27,7 @@ class TaskTrackerSpec extends AsyncFlatSpec with Matchers with BeforeAndAfterAll
val tracker = new FilterTableACSReader.TaskTracker[String](
allTasks = List("A"),
inputBatchSize = 2,
maxQueueSize = 100,
)
tracker.add("A", List(1L)) shouldBe Some(List(1L), "A") -> true
@ -37,6 +39,7 @@ class TaskTrackerSpec extends AsyncFlatSpec with Matchers with BeforeAndAfterAll
val tracker = new FilterTableACSReader.TaskTracker[String](
allTasks = List("A"),
inputBatchSize = 2,
maxQueueSize = 100,
)
tracker.add("A", List.empty) shouldBe None -> true
@ -48,6 +51,7 @@ class TaskTrackerSpec extends AsyncFlatSpec with Matchers with BeforeAndAfterAll
val tracker = new FilterTableACSReader.TaskTracker[String](
allTasks = List("A"),
inputBatchSize = 2,
maxQueueSize = 100,
)
tracker.add("A", List(1L, 2L)) shouldBe Some(List(1L, 2L), "A") -> true
@ -64,6 +68,7 @@ class TaskTrackerSpec extends AsyncFlatSpec with Matchers with BeforeAndAfterAll
val tracker = new FilterTableACSReader.TaskTracker[String](
allTasks = List("A"),
inputBatchSize = 2,
maxQueueSize = 100,
)
tracker.add("A", List(1L, 2L)) shouldBe Some(List(1L, 2L), "A") -> true
@ -79,6 +84,7 @@ class TaskTrackerSpec extends AsyncFlatSpec with Matchers with BeforeAndAfterAll
val tracker = new FilterTableACSReader.TaskTracker[String](
allTasks = List("A", "B"),
inputBatchSize = 2,
maxQueueSize = 100,
)
tracker.add("A", List(1L, 2L)) shouldBe Some(List(1L, 2L), "A") -> false
@ -88,4 +94,19 @@ class TaskTrackerSpec extends AsyncFlatSpec with Matchers with BeforeAndAfterAll
tracker.finished("B") shouldBe None -> false
}
it should "fail if a queue is full" in {
val tracker = new FilterTableACSReader.TaskTracker[String](
allTasks = List("A", "B"),
inputBatchSize = 2,
maxQueueSize = 2,
)
tracker.add("A", List(1L, 2L)) shouldBe Some(List(1L, 2L), "A") -> false
tracker.add("A", List(3L, 4L)) shouldBe None -> false
tracker.add("A", List(5L, 6L)) shouldBe None -> false
assertThrows[RuntimeException] { tracker.add("A", List(7L, 8L)) }
tracker.add("B", List(7L, 8L)) shouldBe Some(List(7L, 8L), "B") -> true
}
}

View File

@ -46,6 +46,7 @@ final case class Config[Extra](
acsIdFetchingParallelism: Int,
acsContractFetchingParallelism: Int,
acsGlobalParallelism: Int,
acsIdQueueLimit: Int,
stateValueCache: caching.WeightedCache.Configuration,
lfValueTranslationEventCache: caching.SizedCache.Configuration,
lfValueTranslationContractCache: caching.SizedCache.Configuration,
@ -84,6 +85,7 @@ object Config {
acsIdFetchingParallelism = IndexConfiguration.DefaultAcsIdFetchingParallelism,
acsContractFetchingParallelism = IndexConfiguration.DefaultAcsContractFetchingParallelism,
acsGlobalParallelism = IndexConfiguration.DefaultAcsGlobalParallelism,
acsIdQueueLimit = IndexConfiguration.DefaultAcsIdQueueLimit,
stateValueCache = caching.WeightedCache.Configuration.none,
lfValueTranslationEventCache = caching.SizedCache.Configuration.none,
lfValueTranslationContractCache = caching.SizedCache.Configuration.none,
@ -520,6 +522,13 @@ object Config {
config.copy(acsGlobalParallelism = acsGlobalParallelism)
)
opt[Int]("acs-id-queue-limit")
.optional()
.text(
s"Maximum number of contract ids queued for fetching. Default is ${IndexConfiguration.DefaultAcsIdQueueLimit}."
)
.action((acsIdQueueLimit, config) => config.copy(acsIdQueueLimit = acsIdQueueLimit))
opt[Long]("max-state-value-cache-size")
.optional()
.text(

View File

@ -73,6 +73,7 @@ trait ConfigProvider[ExtraConfig] {
acsIdFetchingParallelism = config.acsIdFetchingParallelism,
acsContractFetchingParallelism = config.acsContractFetchingParallelism,
acsGlobalParallelism = config.acsGlobalParallelism,
acsIdQueueLimit = config.acsIdQueueLimit,
portFile = participantConfig.portFile,
seeding = config.seeding,
managementServiceTimeout = participantConfig.managementServiceTimeout,

View File

@ -258,6 +258,7 @@ class RecoveringIndexerIntegrationSpec
acsIdFetchingParallelism = 2,
acsContractFetchingParallelism = 2,
acsGlobalParallelism = 10,
acsIdQueueLimit = 1000000,
servicesExecutionContext = executionContext,
metrics = metrics,
lfValueTranslationCache = LfValueTranslationCache.Cache.none,

View File

@ -354,6 +354,7 @@ final class SandboxServer(
acsIdFetchingParallelism = config.acsIdFetchingParallelism,
acsContractFetchingParallelism = config.acsContractFetchingParallelism,
acsGlobalParallelism = config.acsGlobalParallelism,
acsIdQueueLimit = config.acsIdQueueLimit,
servicesExecutionContext = servicesExecutionContext,
metrics = metrics,
lfValueTranslationCache = lfValueTranslationCache,

View File

@ -62,6 +62,7 @@ private[sandbox] object SandboxIndexAndWriteService {
acsIdFetchingParallelism: Int,
acsContractFetchingParallelism: Int,
acsGlobalParallelism: Int,
acsIdQueueLimit: Int,
servicesExecutionContext: ExecutionContext,
metrics: Metrics,
lfValueTranslationCache: LfValueTranslationCache.Cache,
@ -90,6 +91,7 @@ private[sandbox] object SandboxIndexAndWriteService {
acsIdFetchingParallelism = acsIdFetchingParallelism,
acsContractFetchingParallelism = acsContractFetchingParallelism,
acsGlobalParallelism = acsGlobalParallelism,
acsIdQueueLimit = acsIdQueueLimit,
servicesExecutionContext = servicesExecutionContext,
metrics = metrics,
lfValueTranslationCache = lfValueTranslationCache,

View File

@ -85,6 +85,7 @@ private[sandbox] object SqlLedger {
acsIdFetchingParallelism: Int,
acsContractFetchingParallelism: Int,
acsGlobalParallelism: Int,
acsIdQueueLimit: Int,
servicesExecutionContext: ExecutionContext,
metrics: Metrics,
lfValueTranslationCache: LfValueTranslationCache.Cache,
@ -299,6 +300,7 @@ private[sandbox] object SqlLedger {
acsIdFetchingParallelism = acsIdFetchingParallelism,
acsContractFetchingParallelism = acsContractFetchingParallelism,
acsGlobalParallelism = acsGlobalParallelism,
acsIdQueueLimit = acsIdQueueLimit,
servicesExecutionContext = servicesExecutionContext,
metrics = metrics,
lfValueTranslationCache = lfValueTranslationCache,

View File

@ -104,6 +104,7 @@ private[sandbox] object LedgerResource {
acsIdFetchingParallelism = 2,
acsContractFetchingParallelism = 2,
acsGlobalParallelism = 10,
acsIdQueueLimit = 1000000,
servicesExecutionContext = servicesExecutionContext,
metrics = metrics,
lfValueTranslationCache = LfValueTranslationCache.Cache.none,

View File

@ -456,6 +456,7 @@ final class SqlLedgerSpec
acsIdFetchingParallelism = 2,
acsContractFetchingParallelism = 2,
acsGlobalParallelism = 10,
acsIdQueueLimit = 1000000,
servicesExecutionContext = executionContext,
metrics = metrics,
lfValueTranslationCache = LfValueTranslationCache.Cache.none,

View File

@ -291,6 +291,13 @@ class CommonCliBase(name: LedgerName) {
config.copy(acsContractFetchingParallelism = acsContractFetchingParallelism)
)
opt[Int]("acs-id-queue-limit")
.optional()
.text(
s"Maximum number of contract ids queued for fetching. Default is ${SandboxConfig.DefaultAcsIdQueueLimit}."
)
.action((acsIdQueueLimit, config) => config.copy(acsIdQueueLimit = acsIdQueueLimit))
opt[Int]("max-commands-in-flight")
.optional()
.action((value, config) =>

View File

@ -61,6 +61,7 @@ final case class SandboxConfig(
acsIdFetchingParallelism: Int,
acsContractFetchingParallelism: Int,
acsGlobalParallelism: Int,
acsIdQueueLimit: Int,
lfValueTranslationEventCacheConfiguration: SizedCache.Configuration,
lfValueTranslationContractCacheConfiguration: SizedCache.Configuration,
profileDir: Option[Path],
@ -101,6 +102,7 @@ object SandboxConfig {
val DefaultAcsIdFetchingParallelism: Int = 2
val DefaultAcsContractFetchingParallelism: Int = 2
val DefaultAcsGlobalParallelism: Int = 10
val DefaultAcsIdQueueLimit: Int = 10000000
val DefaultTimeProviderType: TimeProviderType = TimeProviderType.WallClock
@ -156,6 +158,7 @@ object SandboxConfig {
acsIdFetchingParallelism = DefaultAcsIdFetchingParallelism,
acsContractFetchingParallelism = DefaultAcsContractFetchingParallelism,
acsGlobalParallelism = DefaultAcsGlobalParallelism,
acsIdQueueLimit = DefaultAcsIdQueueLimit,
lfValueTranslationEventCacheConfiguration = DefaultLfValueTranslationCacheConfiguration,
lfValueTranslationContractCacheConfiguration = DefaultLfValueTranslationCacheConfiguration,
profileDir = None,