Mirror of https://github.com/digital-asset/daml.git, synced 2024-11-10 00:35:25 +03:00
ACS ID fetching enhancements [DPP-1058] (#14027)
* Adds exponential ID page retrieval to aid low-latency stream initialization
* Adds a Working-Memory concept to calculate parameters, giving a guaranteed memory footprint
* Switches to dynamic BatchN batching instead of static grouping to aid stream initialization

changelog_begin
changelog_end
Parent: ee394702b8
Commit: ec4f9850ba
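The core of the change: ACS ID pages are now fetched starting from a small page size that grows geometrically up to a memory-derived cap, so streams produce their first results quickly while still converging to full pages. A minimal self-contained Scala sketch of that progression (illustration only; the real logic is in FilterTableACSReader.idSource in the diff below):

    // Sketch: page sizes quadruple from minPageSize until capped at maxPageSize,
    // mirroring the IdQuery continuation built by idSource.
    object PageSizeProgression extends App {
      def progression(minPageSize: Int, maxPageSize: Int): LazyList[Int] =
        LazyList.iterate(minPageSize)(size => math.min(size * 4, maxPageSize))

      // With the defaults computed for a single filter (see ACSReaderSpec below),
      // this prints: List(200, 800, 3200, 10000, 10000)
      println(progression(200, 10000).take(5).toList)
    }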
@@ -11,6 +11,7 @@ final case class IndexServiceConfig(
     bufferedStreamsPageSize: Int = IndexServiceConfig.DefaultBufferedStreamsPageSize,
     acsIdPageSize: Int = IndexServiceConfig.DefaultAcsIdPageSize,
     acsIdPageBufferSize: Int = IndexServiceConfig.DefaultAcsIdPageBufferSize,
+    acsIdPageWorkingMemoryBytes: Int = IndexServiceConfig.DefaultAcsIdPageWorkingMemoryBytes,
     acsIdFetchingParallelism: Int = IndexServiceConfig.DefaultAcsIdFetchingParallelism,
     acsContractFetchingParallelism: Int = IndexServiceConfig.DefaultAcsContractFetchingParallelism,
     acsGlobalParallelism: Int = IndexServiceConfig.DefaultAcsGlobalParallelism,
@@ -29,6 +30,7 @@ object IndexServiceConfig {
   val DefaultBufferedStreamsPageSize: Int = 100
   val DefaultAcsIdPageSize: Int = 20000
   val DefaultAcsIdPageBufferSize: Int = 1
+  val DefaultAcsIdPageWorkingMemoryBytes: Int = 100 * 1024 * 1024
   val DefaultAcsIdFetchingParallelism: Int = 2
   val DefaultAcsContractFetchingParallelism: Int = 2
   val DefaultAcsGlobalParallelism: Int = 10
@@ -307,6 +307,7 @@ private[platform] case class IndexServiceBuilder(
       eventsProcessingParallelism = config.eventsProcessingParallelism,
       acsIdPageSize = config.acsIdPageSize,
       acsIdPageBufferSize = config.acsIdPageBufferSize,
+      acsIdPageWorkingMemoryBytes = config.acsIdPageWorkingMemoryBytes,
       acsIdFetchingParallelism = config.acsIdFetchingParallelism,
       acsContractFetchingParallelism = config.acsContractFetchingParallelism,
       acsGlobalParallelism = config.acsGlobalParallelism,
@@ -68,6 +68,7 @@ object IndexMetadata {
       eventsProcessingParallelism = 8,
       acsIdPageSize = 20000,
       acsIdPageBufferSize = 1,
+      acsIdPageWorkingMemoryBytes = 100 * 1024 * 1024,
       acsIdFetchingParallelism = 2,
       acsContractFetchingParallelism = 2,
       acsGlobalParallelism = 10,
@@ -43,6 +43,7 @@ private class JdbcLedgerDao(
     eventsProcessingParallelism: Int,
     acsIdPageSize: Int,
     acsIdPageBufferSize: Int,
+    acsIdPageWorkingMemoryBytes: Int,
     acsIdFetchingParallelism: Int,
     acsContractFetchingParallelism: Int,
     acsGlobalParallelism: Int,
@@ -472,6 +473,7 @@ private class JdbcLedgerDao(
         pageSize = eventsPageSize,
         idPageSize = acsIdPageSize,
         idPageBufferSize = acsIdPageBufferSize,
+        idPageWorkingMemoryBytes = acsIdPageWorkingMemoryBytes,
         idFetchingParallelism = acsIdFetchingParallelism,
         acsFetchingparallelism = acsContractFetchingParallelism,
         metrics = metrics,
@@ -568,6 +570,7 @@ private[platform] object JdbcLedgerDao {
       eventsProcessingParallelism: Int,
       acsIdPageSize: Int,
       acsIdPageBufferSize: Int,
+      acsIdPageWorkingMemoryBytes: Int,
       acsIdFetchingParallelism: Int,
       acsContractFetchingParallelism: Int,
       acsGlobalParallelism: Int,
@@ -587,6 +590,7 @@ private[platform] object JdbcLedgerDao {
       eventsProcessingParallelism,
       acsIdPageSize,
       acsIdPageBufferSize,
+      acsIdPageWorkingMemoryBytes,
       acsIdFetchingParallelism,
       acsContractFetchingParallelism,
       acsGlobalParallelism,
@@ -608,6 +612,7 @@ private[platform] object JdbcLedgerDao {
       eventsProcessingParallelism: Int,
       acsIdPageSize: Int,
       acsIdPageBufferSize: Int,
+      acsIdPageWorkingMemoryBytes: Int,
       acsIdFetchingParallelism: Int,
       acsContractFetchingParallelism: Int,
       acsGlobalParallelism: Int,
@@ -627,6 +632,7 @@ private[platform] object JdbcLedgerDao {
       eventsProcessingParallelism,
       acsIdPageSize,
       acsIdPageBufferSize,
+      acsIdPageWorkingMemoryBytes,
       acsIdFetchingParallelism,
       acsContractFetchingParallelism,
       acsGlobalParallelism,
@@ -5,10 +5,11 @@ package com.daml.platform.store.dao.events

 import akka.NotUsed
 import akka.stream.scaladsl.Source
-import akka.stream.OverflowStrategy
+import akka.stream.{Attributes, OverflowStrategy}
 import com.daml.ledger.offset.Offset
 import com.daml.logging.{ContextualizedLogger, LoggingContext}
 import com.daml.metrics.Metrics
+import com.daml.platform.indexer.parallel.BatchN
 import com.daml.platform.{FilterRelation, Identifier, Party}
 import com.daml.platform.store.dao.DbDispatcher
 import com.daml.platform.store.backend.EventStorageBackend
@@ -34,6 +35,7 @@ class FilterTableACSReader(
     pageSize: Int,
     idPageSize: Int,
     idPageBufferSize: Int,
+    idPageWorkingMemoryBytes: Int,
     idFetchingParallelism: Int,
     acsFetchingparallelism: Int,
     metrics: Metrics,
@@ -60,28 +62,38 @@ class FilterTableACSReader(
     val idQueryLimiter =
       new QueueBasedConcurrencyLimiter(idFetchingParallelism, executionContext)

+    val idQueryConfiguration = IdQueryConfiguration(
+      maxIdPageSize = idPageSize,
+      idPageWorkingMemoryBytes = idPageWorkingMemoryBytes,
+      filterSize = filters.size,
+      idPageBufferSize = idPageBufferSize,
+    )
+
     def toIdSource(filter: Filter): Source[Long, NotUsed] =
-      idSource(idPageBufferSize)(fromExclusive =>
-        idQueryLimiter.execute(
+      idSource(
+        idQueryConfiguration = idQueryConfiguration,
+        pageBufferSize = idPageBufferSize,
+      )(idQuery =>
+        idQueryLimiter.execute {
           dispatcher.executeSql(metrics.daml.index.db.getActiveContractIds) { connection =>
             val result = eventStorageBackend.activeContractEventIds(
               partyFilter = filter.party,
               templateIdFilter = filter.templateId,
-              startExclusive = fromExclusive,
+              startExclusive = idQuery.fromExclusiveEventSeqId,
               endInclusive = activeAt._2,
-              limit = idPageSize,
+              limit = idQuery.pageSize,
             )(connection)
             logger.debug(
               s"getActiveContractIds $filter returned #${result.size} ${result.lastOption
                 .map(last => s"until $last")
                 .getOrElse("")}"
             )
-            result
+            result.toArray
           }
-        )
+        }
       )

-    def fetchAcs(ids: Vector[Long]): Future[Vector[EventStorageBackend.Entry[Raw.FlatEvent]]] =
+    def fetchAcs(ids: Iterable[Long]): Future[Vector[EventStorageBackend.Entry[Raw.FlatEvent]]] =
       querylimiter.execute(
         dispatcher.executeSql(metrics.daml.index.db.getActiveContractBatch) { connection =>
           val result = queryNonPruned.executeSql(
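The next hunk replaces the static `.grouped(pageSize)` batching with the adaptive BatchN stage: under backpressure it still forms full pages of up to pageSize IDs, but it no longer stalls waiting for a full group when fewer elements are available, which helps stream initialization. Akka Streams' built-in batch combinator illustrates the same demand-driven idea (an analogy only, not the BatchN implementation):

    import akka.NotUsed
    import akka.stream.scaladsl.Source

    // Analogy: while the downstream is busy, elements aggregate into batches of
    // up to `max`; when it keeps up, they pass through immediately as small batches.
    val batched: Source[Vector[Int], NotUsed] =
      Source(1 to 100000)
        .batch(max = 1000L, seed = el => Vector(el))((acc, el) => acc :+ el)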
@@ -107,32 +119,102 @@ class FilterTableACSReader(
       .map(toIdSource)
       .pipe(mergeSort[Long])
       .statefulMapConcat(statefulDeduplicate)
-      .grouped(pageSize)
-      .map(_.toVector)
+      .via(
+        BatchN(
+          maxBatchSize = pageSize,
+          maxBatchCount = acsFetchingparallelism + 1,
+        )
+      )
+      .async
+      .addAttributes(
+        Attributes.inputBuffer(initial = acsFetchingparallelism, max = acsFetchingparallelism)
+      )
       .mapAsync(acsFetchingparallelism)(fetchAcs)
   }
 }

 private[events] object FilterTableACSReader {
   private val logger = ContextualizedLogger.get(this.getClass)

   case class Filter(party: Party, templateId: Option[Identifier])

+  case class IdQuery(fromExclusiveEventSeqId: Long, pageSize: Int)
+
+  case class IdQueryConfiguration(
+      minPageSize: Int,
+      maxPageSize: Int,
+  ) {
+    assert(minPageSize > 0)
+    assert(maxPageSize >= minPageSize)
+  }
+
+  object IdQueryConfiguration {
+    def apply(
+        maxIdPageSize: Int,
+        idPageWorkingMemoryBytes: Int,
+        filterSize: Int,
+        idPageBufferSize: Int,
+    )(implicit loggingContext: LoggingContext): IdQueryConfiguration = {
+      val LowestIdPageSize =
+        Math.min(10, maxIdPageSize) // maxIdPageSize can override this if it is smaller
+      // Approximation of how many index entries fit in one btree index leaf page
+      // (fetching fewer than this only adds round-trip overhead without reducing disk reads).
+      // Experiments show the (party_id, template_id) index has 244 tuples per page and the
+      // wildcard party_id index has 254 per page (with the default fill ratio for BTREE indexes).
+      // A lower number is picked to accommodate pruning, deletions and index bloat, all of
+      // which lower the tuples-per-page ratio.
+      val RecommendedMinIdPageSize =
+        Math.min(200, maxIdPageSize) // maxIdPageSize can override this if it is smaller
+      val calculatedMaxIdPageSize =
+        idPageWorkingMemoryBytes
+          ./(8) // IDs are stored as 8 bytes each
+          ./(
+            idPageBufferSize + 1
+          ) // for each filter, one page is being merge-sorted and further pages may reside in the buffer
+          ./(filterSize)
+      if (calculatedMaxIdPageSize < LowestIdPageSize) {
+        logger.warn(
+          s"Calculated maximum ID page size supporting API stream memory limits [$calculatedMaxIdPageSize] is too low: $LowestIdPageSize is used instead. Warning: API stream memory limits not respected. Warning: Dangerously low maximum ID page size can cause poor streaming performance. Filter size [$filterSize] too large?"
+        )
+        IdQueryConfiguration(LowestIdPageSize, LowestIdPageSize)
+      } else if (calculatedMaxIdPageSize < RecommendedMinIdPageSize) {
+        logger.warn(
+          s"Calculated maximum ID page size supporting API stream memory limits [$calculatedMaxIdPageSize] is very low. Warning: Low maximum ID page size can cause poor streaming performance. Filter size [$filterSize] too large?"
+        )
+        IdQueryConfiguration(calculatedMaxIdPageSize, calculatedMaxIdPageSize)
+      } else if (calculatedMaxIdPageSize < maxIdPageSize) {
+        logger.info(
+          s"Calculated maximum ID page size supporting API stream memory limits [$calculatedMaxIdPageSize] is low. Warning: Low maximum ID page size can cause poor streaming performance. Filter size [$filterSize] too large?"
+        )
+        IdQueryConfiguration(RecommendedMinIdPageSize, calculatedMaxIdPageSize)
+      } else {
+        logger.debug(
+          s"Calculated maximum ID page size supporting API stream memory limits [$calculatedMaxIdPageSize] is sufficiently high, using [$maxIdPageSize] instead."
+        )
+        IdQueryConfiguration(RecommendedMinIdPageSize, maxIdPageSize)
+      }
+    }
+  }
+
   def idSource(
-      pageBufferSize: Int
-  )(getPage: Long => Future[Vector[Long]]): Source[Long, NotUsed] = {
+      idQueryConfiguration: IdQueryConfiguration,
+      pageBufferSize: Int,
+  )(getPage: IdQuery => Future[Array[Long]]): Source[Long, NotUsed] = {
     assert(pageBufferSize > 0)
     Source
-      .unfoldAsync(0L) { fromExclusive =>
-        getPage(fromExclusive).map {
+      .unfoldAsync(
+        IdQuery(0L, idQueryConfiguration.minPageSize)
+      ) { query =>
+        getPage(query).map {
           case empty if empty.isEmpty => None
           case nonEmpty =>
             Some(
-              nonEmpty.last -> nonEmpty
+              IdQuery(
+                nonEmpty.last,
+                Math.min(query.pageSize * 4, idQueryConfiguration.maxPageSize),
+              ) -> nonEmpty
             )
         }(scala.concurrent.ExecutionContext.parasitic)
       }
       .buffer(pageBufferSize, OverflowStrategy.backpressure)
-      .mapConcat(identity)
+      .mapConcat(identity(_))
   }

   @tailrec
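To make the working-memory calculation above concrete, here is the arithmetic behind the "realistic case" expectations in the ACSReaderSpec changes further down, assuming the default 100 MiB budget, ID page buffer size 1, and 1000 filters:

    // Every buffered ID costs 8 bytes, and each filter holds up to
    // idPageBufferSize + 1 pages (one being merge-sorted, the rest buffered):
    val calculatedMaxIdPageSize =
      100 * 1024 * 1024 / 8 / (1 + 1) / 1000 // = 6553
    // hence realisticConfigForFilterSize(1000) == IdQueryConfiguration(200, 6553)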
@@ -87,6 +87,7 @@ private[dao] trait JdbcLedgerDaoBackend extends AkkaBeforeAndAfterAll {
       eventsProcessingParallelism = eventsProcessingParallelism,
       acsIdPageSize = acsIdPageSize,
       acsIdPageBufferSize = 1,
+      acsIdPageWorkingMemoryBytes = 100 * 1024 * 1024,
       acsIdFetchingParallelism = acsIdFetchingParallelism,
       acsContractFetchingParallelism = acsContractFetchingParallelism,
       acsGlobalParallelism = acsGlobalParallelism,
@@ -12,40 +12,144 @@ import org.scalatest.matchers.should.Matchers

 import scala.concurrent.duration.Duration
 import scala.concurrent.{Await, ExecutionContext, Future}
+import FilterTableACSReader._
+import com.daml.logging.LoggingContext

 class ACSReaderSpec extends AsyncFlatSpec with Matchers with BeforeAndAfterAll {
   private val actorSystem = ActorSystem()
   private implicit val materializer: Materializer = Materializer(actorSystem)
   private implicit val ec: ExecutionContext = actorSystem.dispatcher
+  private implicit val lc: LoggingContext = LoggingContext.empty

   override def afterAll(): Unit = {
     Await.result(actorSystem.terminate(), Duration(10, "seconds"))
     ()
   }

-  behavior of "idSource"
+  behavior of "IdQueryConfiguration"

-  it should "populate expected continuation" in {
-    FilterTableACSReader
-      .idSource(pageBufferSize = 1) { from =>
-        Future.successful(
-          if (from == 8) Vector.empty
-          else Vector(from + 1, from + 1, from + 2)
-        )
-      }
-      .runWith(Sink.seq)
-      .map(_ shouldBe Vector(1, 1, 2, 3, 3, 4, 5, 5, 6, 7, 7, 8))
+  it should "compute correct parameters for a realistic case" in {
+    def realisticConfigForFilterSize(filterSize: Int) = IdQueryConfiguration(
+      maxIdPageSize = 10000,
+      idPageWorkingMemoryBytes = 100 * 1024 * 1024,
+      filterSize = filterSize,
+      idPageBufferSize = 1,
+    )
+    // progression: 200 800 3200 10000 10000...
+    realisticConfigForFilterSize(1) shouldBe IdQueryConfiguration(200, 10000)
+    realisticConfigForFilterSize(10) shouldBe IdQueryConfiguration(200, 10000)
+    realisticConfigForFilterSize(100) shouldBe IdQueryConfiguration(200, 10000)
+    // 200 800 3200 6553 6553...
+    realisticConfigForFilterSize(1000) shouldBe IdQueryConfiguration(200, 6553)
+    // 200 655 655...
+    realisticConfigForFilterSize(10000) shouldBe IdQueryConfiguration(200, 655)
+    realisticConfigForFilterSize(100000) shouldBe IdQueryConfiguration(65, 65)
+    realisticConfigForFilterSize(1000000) shouldBe IdQueryConfiguration(10, 10)
+    realisticConfigForFilterSize(10000000) shouldBe IdQueryConfiguration(10, 10)
   }

-  it should "populate empty stream correctly" in {
-    FilterTableACSReader
-      .idSource(pageBufferSize = 1) { _ =>
-        Future.successful(
-          Vector.empty
-        )
-      }
-      .runWith(Sink.seq)
-      .map(_ shouldBe Vector.empty)
+  it should "compute correct parameters, if maxIdPageSize is lower than recommended (200), then maxIdPageSize is preferred" in {
+    def configWith(filterSize: Int) = IdQueryConfiguration(
+      maxIdPageSize = 150,
+      idPageWorkingMemoryBytes = 100 * 1024 * 1024,
+      filterSize = filterSize,
+      idPageBufferSize = 1,
+    )
+    configWith(1) shouldBe IdQueryConfiguration(150, 150)
+    configWith(10) shouldBe IdQueryConfiguration(150, 150)
+    configWith(100) shouldBe IdQueryConfiguration(150, 150)
+    configWith(1000) shouldBe IdQueryConfiguration(150, 150)
+    configWith(10000) shouldBe IdQueryConfiguration(150, 150)
+    configWith(100000) shouldBe IdQueryConfiguration(65, 65)
+    configWith(1000000) shouldBe IdQueryConfiguration(10, 10)
+    configWith(10000000) shouldBe IdQueryConfiguration(10, 10)
   }

+  it should "compute correct parameters, if maxIdPageSize is lower than minimum (10), then maxIdPageSize is preferred" in {
+    def configWith(filterSize: Int) = IdQueryConfiguration(
+      maxIdPageSize = 4,
+      idPageWorkingMemoryBytes = 100 * 1024 * 1024,
+      filterSize = filterSize,
+      idPageBufferSize = 1,
+    )
+    configWith(1) shouldBe IdQueryConfiguration(4, 4)
+    configWith(10) shouldBe IdQueryConfiguration(4, 4)
+    configWith(100) shouldBe IdQueryConfiguration(4, 4)
+    configWith(1000) shouldBe IdQueryConfiguration(4, 4)
+    configWith(10000) shouldBe IdQueryConfiguration(4, 4)
+    configWith(100000) shouldBe IdQueryConfiguration(4, 4)
+    configWith(1000000) shouldBe IdQueryConfiguration(4, 4)
+    configWith(10000000) shouldBe IdQueryConfiguration(4, 4)
+  }
+
+  behavior of "idSource"
+
+  it should "stream data exponentially" in {
+    testIdSource(
+      IdQueryConfiguration(
+        minPageSize = 1,
+        maxPageSize = 20,
+      ),
+      Range(1, 70).map(_.toLong).toVector,
+    ).map(
+      _ shouldBe Vector(
+        IdQuery(0, 1),
+        IdQuery(1, 4),
+        IdQuery(5, 16),
+        IdQuery(21, 20),
+        IdQuery(41, 20),
+        IdQuery(61, 20),
+        IdQuery(69, 20),
+      )
+    )
+  }
+
+  it should "stream data constantly" in {
+    testIdSource(
+      IdQueryConfiguration(
+        minPageSize = 20,
+        maxPageSize = 20,
+      ),
+      Range(1, 70).map(_.toLong).toVector,
+    ).map(
+      _ shouldBe Vector(
+        IdQuery(0, 20),
+        IdQuery(20, 20),
+        IdQuery(40, 20),
+        IdQuery(60, 20),
+        IdQuery(69, 20),
+      )
+    )
+  }
+
+  it should "stream data exponentially, if maxPageSize never reached" in {
+    testIdSource(
+      IdQueryConfiguration(
+        minPageSize = 1,
+        maxPageSize = 20,
+      ),
+      Range(1, 6).map(_.toLong).toVector,
+    ).map(
+      _ shouldBe Vector(
+        IdQuery(0, 1),
+        IdQuery(1, 4),
+        IdQuery(5, 16),
+      )
+    )
+  }
+
+  it should "stream empty data" in {
+    testIdSource(
+      IdQueryConfiguration(
+        minPageSize = 1,
+        maxPageSize = 20,
+      ),
+      Vector.empty,
+    ).map(
+      _ shouldBe Vector(
+        IdQuery(0, 1)
+      )
+    )
+  }
+
   behavior of "mergeSort"
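The next hunk only drops the FilterTableACSReader qualifier from the existing deduplication tests (covered by the new wildcard import). For context, statefulDeduplicate removes consecutive duplicates from the merge-sorted ID stream, since one event ID can match several filters; a hedged sketch of such a stage (assumed shape, not the actual implementation):

    // statefulMapConcat takes a factory of per-materialization state; here the
    // state is the last element seen, and consecutive repeats are dropped.
    def statefulDeduplicateSketch: () => Long => List[Long] = () => {
      var last = Option.empty[Long]
      elem =>
        if (last.contains(elem)) Nil
        else {
          last = Some(elem)
          List(elem)
        }
    }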
@@ -105,35 +209,35 @@ class ACSReaderSpec extends AsyncFlatSpec with Matchers with BeforeAndAfterAll {

   it should "deduplicate a stream correctly" in {
     Source(Vector(1, 1, 2, 2, 2, 3, 4, 4, 5, 6, 7, 0, 0, 0))
-      .statefulMapConcat(FilterTableACSReader.statefulDeduplicate)
+      .statefulMapConcat(statefulDeduplicate)
       .runWith(Sink.seq)
       .map(_ shouldBe Vector(1, 2, 3, 4, 5, 6, 7, 0))
   }

   it should "preserve a stream of unique numbers" in {
     Source(Vector(1, 2, 3, 4, 5, 6, 7, 0))
-      .statefulMapConcat(FilterTableACSReader.statefulDeduplicate)
+      .statefulMapConcat(statefulDeduplicate)
       .runWith(Sink.seq)
       .map(_ shouldBe Vector(1, 2, 3, 4, 5, 6, 7, 0))
   }

   it should "work for empty stream" in {
     Source(Vector.empty)
-      .statefulMapConcat(FilterTableACSReader.statefulDeduplicate)
+      .statefulMapConcat(statefulDeduplicate)
       .runWith(Sink.seq)
       .map(_ shouldBe Vector.empty)
   }

   it should "work for one sized stream" in {
     Source(Vector(1))
-      .statefulMapConcat(FilterTableACSReader.statefulDeduplicate)
+      .statefulMapConcat(statefulDeduplicate)
       .runWith(Sink.seq)
       .map(_ shouldBe Vector(1))
   }

   it should "work if only duplications present" in {
     Source(Vector(1, 1, 1, 1))
-      .statefulMapConcat(FilterTableACSReader.statefulDeduplicate)
+      .statefulMapConcat(statefulDeduplicate)
       .runWith(Sink.seq)
       .map(_ shouldBe Vector(1))
   }
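The testIdSource helper added below records each IdQuery issued by idSource while asserting that the concatenated pages reproduce the input IDs. Tracing the "stream data exponentially" case by hand (minPageSize 1, maxPageSize 20, IDs 1..69) shows where its expected values come from:

    // IdQuery(0, 1)   -> returns id 1;      next size = min(1 * 4, 20)  = 4
    // IdQuery(1, 4)   -> returns ids 2..5;  next size = min(4 * 4, 20)  = 16
    // IdQuery(5, 16)  -> returns ids 6..21; next size = min(16 * 4, 20) = 20
    // IdQuery(21, 20) -> ids 22..41
    // IdQuery(41, 20) -> ids 42..61
    // IdQuery(61, 20) -> ids 62..69 (only 8 remain)
    // IdQuery(69, 20) -> empty page, so the source completes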
@@ -155,4 +259,23 @@ class ACSReaderSpec extends AsyncFlatSpec with Matchers with BeforeAndAfterAll {
     }
   }

+  private def testIdSource(
+      idQueryConfiguration: IdQueryConfiguration,
+      ids: Vector[Long],
+  ): Future[Vector[IdQuery]] = {
+    val queries = Vector.newBuilder[IdQuery]
+    idSource(idQueryConfiguration, 1) { idQuery =>
+      queries.addOne(idQuery)
+      Future.successful(
+        ids
+          .dropWhile(_ <= idQuery.fromExclusiveEventSeqId)
+          .take(idQuery.pageSize)
+          .toArray
+      )
+    }.runWith(Sink.seq[Long]).map { result =>
+      result shouldBe ids
+      queries.result()
+    }
+  }
+
 }
@@ -259,6 +259,7 @@ class RecoveringIndexerIntegrationSpec
       eventsProcessingParallelism = 8,
       acsIdPageSize = 20000,
       acsIdPageBufferSize = 1,
+      acsIdPageWorkingMemoryBytes = 100 * 1024 * 1024,
       acsIdFetchingParallelism = 2,
       acsContractFetchingParallelism = 2,
       acsGlobalParallelism = 10,