[kvutils] - Integrate the deduplication offset support into kvutils [kvl-1172] (#11916)

Add support for offset deduplication periods for KV ledgers by converting the offset into a duration using the record time for the completion found of the offset.

changelog_begin
changelog_end
This commit is contained in:
nicu-da 2021-12-06 08:09:59 -08:00 committed by GitHub
parent 9fa3c76b64
commit cd8f2c76fc
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 167 additions and 66 deletions

View File

@ -40,6 +40,7 @@ da_scala_library(
"//ledger/error",
"//ledger/ledger-api-auth",
"//ledger/ledger-api-common",
"//ledger/ledger-api-domain",
"//ledger/ledger-api-health",
"//ledger/ledger-configuration",
"//ledger/ledger-resources",

View File

@ -4,17 +4,25 @@
package com.daml.ledger.participant.state.kvutils.app
import akka.stream.Materializer
import com.daml.ledger.api.domain
import com.daml.ledger.participant.state.index.v2.IndexCompletionsService
import com.daml.ledger.participant.state.kvutils.api.{
KeyValueParticipantStateReader,
KeyValueParticipantStateWriter,
LedgerReader,
LedgerWriter,
WriteServiceWithDeduplicationSupport,
}
import com.daml.ledger.participant.state.kvutils.deduplication.{
CompletionBasedDeduplicationPeriodConverter,
DeduplicationPeriodSupport,
}
import com.daml.ledger.participant.state.v2.{ReadService, WritePackagesService, WriteService}
import com.daml.ledger.resources.ResourceOwner
import com.daml.lf.engine.Engine
import com.daml.logging.LoggingContext
import com.daml.metrics.Metrics
import com.daml.platform.server.api.validation.{DeduplicationPeriodValidator, ErrorFactories}
import scala.concurrent.ExecutionContext
@ -35,7 +43,7 @@ trait ReadWriteServiceFactory {
def readService(): ReadService
def writePackageService(): WritePackagesService
def writePackagesService(): WritePackagesService
def writeService(): WriteService
}
@ -55,8 +63,7 @@ class KeyValueReadWriteFactory(
)
}
override def writePackageService(): WritePackagesService =
writeService()
override def writePackagesService(): WritePackagesService = writeService()
override def writeService(): WriteService = {
new KeyValueParticipantStateWriter(
@ -66,3 +73,30 @@ class KeyValueReadWriteFactory(
}
}
class KeyValueDeduplicationSupportFactory(
delegate: ReadWriteServiceFactory,
config: Config[_],
completionsService: IndexCompletionsService,
)(implicit materializer: Materializer, ec: ExecutionContext)
extends ReadWriteServiceFactory {
override def readService(): ReadService = delegate.readService()
override def writePackagesService(): WritePackagesService = delegate.writePackagesService()
override def writeService(): WriteService = {
val writeServiceDelegate = delegate.writeService()
val errorFactories = ErrorFactories(config.enableSelfServiceErrorCodes)
new WriteServiceWithDeduplicationSupport(
writeServiceDelegate,
new DeduplicationPeriodSupport(
new CompletionBasedDeduplicationPeriodConverter(
domain.LedgerId(config.ledgerId),
completionsService,
),
new DeduplicationPeriodValidator(errorFactories),
errorFactories,
),
)
}
}

View File

@ -141,7 +141,7 @@ final class Runner[T <: ReadWriteService, Extra](
metrics,
)(materializer, servicesExecutionContext, loggingContext)
.acquire()
writePackageService = ledgerFactory.writePackageService()
writePackageService = ledgerFactory.writePackagesService()
_ <- Resource.sequence(
config.archiveFiles.map(path =>
Resource.fromFuture(
@ -173,18 +173,23 @@ final class Runner[T <: ReadWriteService, Extra](
Resource.successful(new HealthChecks())
}
apiServerConfig = configProvider.apiServerConfig(participantConfig, config)
indexService <- StandaloneIndexService(
ledgerId = config.ledgerId,
config = apiServerConfig,
metrics = metrics,
engine = sharedEngine,
servicesExecutionContext = servicesExecutionContext,
lfValueTranslationCache = lfValueTranslationCache,
).acquire()
_ <- participantConfig.mode match {
case ParticipantRunMode.Combined | ParticipantRunMode.LedgerApiServer =>
val writeService = new TimedWriteService(ledgerFactory.writeService(), metrics)
for {
indexService <- StandaloneIndexService(
ledgerId = config.ledgerId,
config = apiServerConfig,
metrics = metrics,
engine = sharedEngine,
servicesExecutionContext = servicesExecutionContext,
lfValueTranslationCache = lfValueTranslationCache,
).acquire()
factory = new KeyValueDeduplicationSupportFactory(
ledgerFactory,
config,
indexService,
)(implicitly, servicesExecutionContext)
writeService = new TimedWriteService(factory.writeService(), metrics)
_ <- StandaloneApiServer(
indexService = indexService,
ledgerId = config.ledgerId,

View File

@ -109,4 +109,7 @@ class WriteServiceWithDeduplicationSupport(
telemetryContext: TelemetryContext,
): CompletionStage[SubmissionResult] =
delegate.submitConfiguration(maxRecordTime, submissionId, config)
override def isApiDeduplicationEnabled: Boolean = delegate.isApiDeduplicationEnabled
}

View File

@ -0,0 +1,65 @@
// Copyright (c) 2021 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
package com.daml.ledger.participant.state.kvutils.deduplication
import java.time.{Duration, Instant}
import akka.stream.Materializer
import akka.stream.scaladsl.Sink
import com.daml.ledger.api.domain.{ApplicationId, LedgerId, LedgerOffset}
import com.daml.ledger.api.messages.command.completion.CompletionRequest
import com.daml.ledger.api.v1.command_completion_service.CompletionStreamResponse
import com.daml.ledger.participant.state.index.v2.IndexCompletionsService
import com.daml.lf.data.Ref
import com.daml.logging.LoggingContext
import scala.concurrent.{ExecutionContext, Future}
class CompletionBasedDeduplicationPeriodConverter(
ledgerId: LedgerId,
completionService: IndexCompletionsService,
) extends DeduplicationPeriodConverter {
override def convertOffsetToDuration(
offset: Ref.LedgerString,
applicationId: ApplicationId,
actAs: Set[Ref.Party],
submittedAt: Instant,
)(implicit
mat: Materializer,
ec: ExecutionContext,
loggingContext: LoggingContext,
): Future[Either[DeduplicationConversionFailure, Duration]] = completionAtOffset(
CompletionRequest(
ledgerId,
applicationId,
actAs,
LedgerOffset.Absolute(offset),
)
).map {
case Some(CompletionStreamResponse(Some(checkpoint), _)) =>
if (checkpoint.offset.flatMap(_.value.absolute).contains(offset)) {
checkpoint.recordTime match {
case Some(recordTime) =>
val duration = Duration.between(recordTime.asJavaInstant, submittedAt)
Right(duration)
case None => Left(DeduplicationConversionFailure.CompletionRecordTimeNotAvailable)
}
} else {
Left(DeduplicationConversionFailure.CompletionOffsetNotMatching)
}
case Some(CompletionStreamResponse(None, _)) =>
Left(DeduplicationConversionFailure.CompletionCheckpointNotAvailable)
case None => Left(DeduplicationConversionFailure.CompletionAtOffsetNotFound)
}
private def completionAtOffset(
request: CompletionRequest
)(implicit
mat: Materializer,
loggingContext: LoggingContext,
): Future[Option[CompletionStreamResponse]] = completionService
.getCompletions(request.offset, request.offset, request.applicationId, request.parties)
.runWith(Sink.headOption)
}

View File

@ -6,20 +6,13 @@ package com.daml.ledger.participant.state.kvutils.deduplication
import java.time.{Duration, Instant}
import akka.stream.Materializer
import akka.stream.scaladsl.Sink
import com.daml.ledger.api.domain.{ApplicationId, LedgerId, LedgerOffset}
import com.daml.ledger.api.messages.command.completion.CompletionRequest
import com.daml.ledger.api.v1.command_completion_service.CompletionStreamResponse
import com.daml.ledger.participant.state.index.v2.IndexCompletionsService
import com.daml.ledger.api.domain.ApplicationId
import com.daml.lf.data.Ref
import com.daml.logging.LoggingContext
import scala.concurrent.{ExecutionContext, Future}
class DeduplicationPeriodConverter(
ledgerId: LedgerId,
completionService: IndexCompletionsService,
) {
trait DeduplicationPeriodConverter {
def convertOffsetToDuration(
offset: Ref.LedgerString,
@ -30,40 +23,6 @@ class DeduplicationPeriodConverter(
mat: Materializer,
ec: ExecutionContext,
loggingContext: LoggingContext,
): Future[Either[DeduplicationConversionFailure, Duration]] = {
completionAtOffset(
CompletionRequest(
ledgerId,
applicationId,
actAs,
LedgerOffset.Absolute(offset),
)
).map {
case Some(CompletionStreamResponse(Some(checkpoint), _)) =>
if (checkpoint.offset.flatMap(_.value.absolute).contains(offset)) {
checkpoint.recordTime match {
case Some(recordTime) =>
val duration = Duration.between(recordTime.asJavaInstant, submittedAt)
Right(duration)
case None => Left(DeduplicationConversionFailure.CompletionRecordTimeNotAvailable)
}
} else {
Left(DeduplicationConversionFailure.CompletionOffsetNotMatching)
}
case Some(CompletionStreamResponse(None, _)) =>
Left(DeduplicationConversionFailure.CompletionCheckpointNotAvailable)
case None => Left(DeduplicationConversionFailure.CompletionAtOffsetNotFound)
}
}
): Future[Either[DeduplicationConversionFailure, Duration]]
private def completionAtOffset(
request: CompletionRequest
)(implicit
mat: Materializer,
loggingContext: LoggingContext,
): Future[Option[CompletionStreamResponse]] = {
completionService
.getCompletions(request.offset, request.offset, request.applicationId, request.parties)
.runWith(Sink.headOption)
}
}

View File

@ -0,0 +1,32 @@
// Copyright (c) 2021 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
package com.daml.ledger.participant.state.kvutils.deduplication
import java.time.{Duration, Instant}
import akka.stream.Materializer
import com.daml.ledger.api.domain.ApplicationId
import com.daml.lf.data.Ref.{LedgerString, Party}
import com.daml.logging.LoggingContext
import scala.concurrent.{ExecutionContext, Future}
object NotSupportedDeduplicationPeriodConverter extends DeduplicationPeriodConverter {
override def convertOffsetToDuration(
offset: LedgerString,
applicationId: ApplicationId,
actAs: Set[Party],
submittedAt: Instant,
)(implicit
mat: Materializer,
ec: ExecutionContext,
loggingContext: LoggingContext,
): Future[Either[DeduplicationConversionFailure, Duration]] = Future.failed(
new IllegalArgumentException(
"Offsets are not supported as deduplication periods"
)
)
}

View File

@ -21,16 +21,18 @@ import org.scalatest.BeforeAndAfterEach
import org.scalatest.matchers.should.Matchers
import org.scalatest.wordspec.AsyncWordSpec
class DeduplicationPeriodConverterSpec
class CompletionBasedDeduplicationPeriodConverterSpec
extends AsyncWordSpec
with Matchers
with MockitoSugar
with BeforeAndAfterEach
with AkkaBeforeAndAfterAll
with TestLoggers {
private val ledgerId = LedgerId("id")
private val indexCompletionsService: IndexCompletionsService = mock[IndexCompletionsService]
private val service = new DeduplicationPeriodConverter(ledgerId, indexCompletionsService)
private val deduplicationPeriodConverter =
new CompletionBasedDeduplicationPeriodConverter(ledgerId, indexCompletionsService)
private val offset = Ref.LedgerString.assertFromString("offset")
private val applicationId = ApplicationId(Ref.ApplicationId.assertFromString("id"))
private val parties = Set.empty[Ref.Party]
@ -51,7 +53,7 @@ class DeduplicationPeriodConverterSpec
)
)
completionServiceReturnsResponse(response)
service
deduplicationPeriodConverter
.convertOffsetToDuration(
offset,
applicationId,
@ -65,7 +67,7 @@ class DeduplicationPeriodConverterSpec
"return failure when there is an empty response" in {
completionServiceReturnsResponse(Source.empty)
service
deduplicationPeriodConverter
.convertOffsetToDuration(
offset,
applicationId,
@ -79,7 +81,7 @@ class DeduplicationPeriodConverterSpec
"return failure when the checkpoint is missing" in {
completionServiceReturnsResponse(Source.single(emptyResponse))
service
deduplicationPeriodConverter
.convertOffsetToDuration(
offset,
applicationId,
@ -101,7 +103,7 @@ class DeduplicationPeriodConverterSpec
)
)
)
service
deduplicationPeriodConverter
.convertOffsetToDuration(
offset,
applicationId,
@ -119,7 +121,7 @@ class DeduplicationPeriodConverterSpec
emptyResponse.update(_.checkpoint := Checkpoint.defaultInstance)
)
)
service
deduplicationPeriodConverter
.convertOffsetToDuration(
offset,
applicationId,
@ -141,7 +143,7 @@ class DeduplicationPeriodConverterSpec
)
)
)
service
deduplicationPeriodConverter
.convertOffsetToDuration(
offset,
applicationId,

View File

@ -47,7 +47,7 @@ object BridgeLedgerFactory extends LedgerFactory[BridgeConfig] {
new ReadWriteServiceFactory {
override def readService(): ReadService = readWriteService
override def writePackageService(): WritePackagesService = readWriteService
override def writePackagesService(): WritePackagesService = readWriteService
override def writeService(): WriteService = readWriteService
}