participant-state - Add an implicit logging context to the write service [kvl-1072] (#11838)

changelog_begin
changelog_end
This commit is contained in:
nicu-da 2021-11-26 03:12:17 -08:00 committed by GitHub
parent 9ff64f7faa
commit a1705d669a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
17 changed files with 143 additions and 52 deletions

View File

@ -336,7 +336,10 @@ object ApiConfigManagementServiceSpec {
maxRecordTime: Time.Timestamp,
submissionId: Ref.SubmissionId,
config: Configuration,
)(implicit telemetryContext: TelemetryContext): CompletionStage[state.SubmissionResult] = {
)(implicit
loggingContext: LoggingContext,
telemetryContext: TelemetryContext,
): CompletionStage[state.SubmissionResult] = {
telemetryContext.setAttribute(
anApplicationIdSpanAttribute._1,
anApplicationIdSpanAttribute._2,
@ -389,7 +392,10 @@ object ApiConfigManagementServiceSpec {
maxRecordTime: Time.Timestamp,
submissionId: SubmissionId,
configuration: Configuration,
)(implicit telemetryContext: TelemetryContext): CompletionStage[SubmissionResult] = {
)(implicit
loggingContext: LoggingContext,
telemetryContext: TelemetryContext,
): CompletionStage[SubmissionResult] = {
configurationQueue.offer((currentOffset.getAndIncrement(), submissionId, configuration))
completedFuture(state.SubmissionResult.Acknowledged)
}

View File

@ -132,7 +132,10 @@ object ApiPackageManagementServiceSpec {
submissionId: Ref.SubmissionId,
archives: List[DamlLf.Archive],
sourceDescription: Option[String],
)(implicit telemetryContext: TelemetryContext): CompletionStage[state.SubmissionResult] = {
)(implicit
loggingContext: LoggingContext,
telemetryContext: TelemetryContext,
): CompletionStage[state.SubmissionResult] = {
telemetryContext.setAttribute(
anApplicationIdSpanAttribute._1,
anApplicationIdSpanAttribute._2,

View File

@ -89,7 +89,10 @@ object ApiPartyManagementServiceSpec {
hint: Option[Ref.Party],
displayName: Option[String],
submissionId: Ref.SubmissionId,
)(implicit telemetryContext: TelemetryContext): CompletionStage[state.SubmissionResult] = {
)(implicit
loggingContext: LoggingContext,
telemetryContext: TelemetryContext,
): CompletionStage[state.SubmissionResult] = {
telemetryContext.setAttribute(
anApplicationIdSpanAttribute._1,
anApplicationIdSpanAttribute._2,

View File

@ -73,7 +73,7 @@ final class LedgerConfigurationProvisionerSpec
any[Timestamp],
any[Ref.SubmissionId],
any[Configuration],
)(any[TelemetryContext])
)(any[LoggingContext], any[TelemetryContext])
scheduler.timePasses(100.millis)
eventually {
@ -81,7 +81,7 @@ final class LedgerConfigurationProvisionerSpec
eqTo(Timestamp.assertFromInstant(timeProvider.getCurrentTime.plusSeconds(60))),
eqTo(submissionId),
eqTo(configurationToSubmit),
)(any[TelemetryContext])
)(any[LoggingContext], any[TelemetryContext])
}
succeed
}
@ -115,7 +115,7 @@ final class LedgerConfigurationProvisionerSpec
any[Timestamp],
any[Ref.SubmissionId],
any[Configuration],
)(any[TelemetryContext])
)(any[LoggingContext], any[TelemetryContext])
succeed
}
}
@ -158,7 +158,7 @@ final class LedgerConfigurationProvisionerSpec
any[Timestamp],
any[Ref.SubmissionId],
any[Configuration],
)(any[TelemetryContext])
)(any[LoggingContext], any[TelemetryContext])
succeed
}
}
@ -192,7 +192,7 @@ final class LedgerConfigurationProvisionerSpec
any[Timestamp],
any[Ref.SubmissionId],
any[Configuration],
)(any[TelemetryContext])
)(any[LoggingContext], any[TelemetryContext])
succeed
}
}

View File

@ -102,7 +102,7 @@ class ApiSubmissionServiceSpec
any[Option[Ref.Party]],
any[Option[Ref.Party]],
any[Ref.SubmissionId],
)(any[TelemetryContext])
)(any[LoggingContext], any[TelemetryContext])
).thenReturn(completedFuture(state.SubmissionResult.Acknowledged))
val service =
@ -118,7 +118,7 @@ class ApiSubmissionServiceSpec
eqTo(Some(Ref.Party.assertFromString(party))),
eqTo(Some(party)),
any[Ref.SubmissionId],
)(any[TelemetryContext])
)(any[LoggingContext], any[TelemetryContext])
}
verifyNoMoreInteractions(writeService)
succeed
@ -137,7 +137,7 @@ class ApiSubmissionServiceSpec
any[Option[Ref.Party]],
any[Option[Ref.Party]],
any[Ref.SubmissionId],
)(any[TelemetryContext])
)(any[LoggingContext], any[TelemetryContext])
).thenReturn(completedFuture(state.SubmissionResult.Acknowledged))
val service =
@ -151,7 +151,7 @@ class ApiSubmissionServiceSpec
any[Option[Ref.Party]],
any[Option[String]],
any[Ref.SubmissionId],
)(any[TelemetryContext])
)(any[LoggingContext], any[TelemetryContext])
succeed
}
}
@ -173,7 +173,7 @@ class ApiSubmissionServiceSpec
any[Option[Ref.Party]],
any[Option[String]],
any[Ref.SubmissionId],
)(any[TelemetryContext])
)(any[LoggingContext], any[TelemetryContext])
succeed
}
}
@ -192,7 +192,7 @@ class ApiSubmissionServiceSpec
eqTo(Some(typedParty)),
eqTo(Some(party)),
any[Ref.SubmissionId],
)(any[TelemetryContext])
)(any[LoggingContext], any[TelemetryContext])
).thenReturn(completedFuture(submissionFailure))
when(partyManagementService.getParties(Seq(typedParty)))
.thenReturn(Future(List.empty[PartyDetails]))

View File

@ -18,6 +18,7 @@ import com.daml.ledger.participant.state.v2.{
}
import com.daml.lf.data.{Ref, Time}
import com.daml.lf.transaction.SubmittedTransaction
import com.daml.logging.LoggingContext
import com.daml.metrics.{Metrics, Timed}
import com.daml.telemetry.TelemetryContext
@ -28,7 +29,10 @@ final class TimedWriteService(delegate: WriteService, metrics: Metrics) extends
transactionMeta: TransactionMeta,
transaction: SubmittedTransaction,
estimatedInterpretationCost: Long,
)(implicit telemetryContext: TelemetryContext): CompletionStage[SubmissionResult] =
)(implicit
loggingContext: LoggingContext,
telemetryContext: TelemetryContext,
): CompletionStage[SubmissionResult] =
Timed.timedAndTrackedCompletionStage(
metrics.daml.services.write.submitTransaction,
metrics.daml.services.write.submitTransactionRunning,
@ -44,7 +48,10 @@ final class TimedWriteService(delegate: WriteService, metrics: Metrics) extends
submissionId: Ref.SubmissionId,
archives: List[DamlLf.Archive],
sourceDescription: Option[String],
)(implicit telemetryContext: TelemetryContext): CompletionStage[SubmissionResult] =
)(implicit
loggingContext: LoggingContext,
telemetryContext: TelemetryContext,
): CompletionStage[SubmissionResult] =
Timed.completionStage(
metrics.daml.services.write.uploadPackages,
delegate.uploadPackages(submissionId, archives, sourceDescription),
@ -54,7 +61,10 @@ final class TimedWriteService(delegate: WriteService, metrics: Metrics) extends
hint: Option[Ref.Party],
displayName: Option[String],
submissionId: Ref.SubmissionId,
)(implicit telemetryContext: TelemetryContext): CompletionStage[SubmissionResult] =
)(implicit
loggingContext: LoggingContext,
telemetryContext: TelemetryContext,
): CompletionStage[SubmissionResult] =
Timed.completionStage(
metrics.daml.services.write.allocateParty,
delegate.allocateParty(hint, displayName, submissionId),
@ -64,7 +74,10 @@ final class TimedWriteService(delegate: WriteService, metrics: Metrics) extends
maxRecordTime: Time.Timestamp,
submissionId: Ref.SubmissionId,
config: Configuration,
)(implicit telemetryContext: TelemetryContext): CompletionStage[SubmissionResult] =
)(implicit
loggingContext: LoggingContext,
telemetryContext: TelemetryContext,
): CompletionStage[SubmissionResult] =
Timed.completionStage(
metrics.daml.services.write.submitConfiguration,
delegate.submitConfiguration(maxRecordTime, submissionId, config),

View File

@ -134,7 +134,10 @@ final class Runner[T <: ReadWriteService, Extra](
_ <- Resource.sequence(
config.archiveFiles.map(path =>
Resource.fromFuture(
uploadDar(path, writeService)(resourceContext.executionContext)
uploadDar(path, writeService)(
loggingContext,
resourceContext.executionContext,
)
)
)
)
@ -198,7 +201,8 @@ final class Runner[T <: ReadWriteService, Extra](
}
private def uploadDar(from: Path, to: WritePackagesService)(implicit
executionContext: ExecutionContext
loggingContext: LoggingContext,
executionContext: ExecutionContext,
): Future[Unit] = DefaultTelemetry.runFutureInSpan(SpanName.RunnerUploadDar, SpanKind.Internal) {
implicit telemetryContext =>
val submissionId = Ref.SubmissionId.assertFromString(UUID.randomUUID().toString)

View File

@ -67,7 +67,10 @@ class KeyValueParticipantState(
transactionMeta: TransactionMeta,
transaction: SubmittedTransaction,
estimatedInterpretationCost: Long,
)(implicit telemetryContext: TelemetryContext): CompletionStage[SubmissionResult] =
)(implicit
loggingContext: LoggingContext,
telemetryContext: TelemetryContext,
): CompletionStage[SubmissionResult] =
writerAdapter.submitTransaction(
submitterInfo,
transactionMeta,
@ -79,21 +82,30 @@ class KeyValueParticipantState(
maxRecordTime: Time.Timestamp,
submissionId: Ref.SubmissionId,
config: Configuration,
)(implicit telemetryContext: TelemetryContext): CompletionStage[SubmissionResult] =
)(implicit
loggingContext: LoggingContext,
telemetryContext: TelemetryContext,
): CompletionStage[SubmissionResult] =
writerAdapter.submitConfiguration(maxRecordTime, submissionId, config)
override def uploadPackages(
submissionId: Ref.SubmissionId,
archives: List[DamlLf.Archive],
sourceDescription: Option[String],
)(implicit telemetryContext: TelemetryContext): CompletionStage[SubmissionResult] =
)(implicit
loggingContext: LoggingContext,
telemetryContext: TelemetryContext,
): CompletionStage[SubmissionResult] =
writerAdapter.uploadPackages(submissionId, archives, sourceDescription)
override def allocateParty(
hint: Option[Ref.Party],
displayName: Option[String],
submissionId: Ref.SubmissionId,
)(implicit telemetryContext: TelemetryContext): CompletionStage[SubmissionResult] =
)(implicit
loggingContext: LoggingContext,
telemetryContext: TelemetryContext,
): CompletionStage[SubmissionResult] =
writerAdapter.allocateParty(hint, displayName, submissionId)
override def prune(

View File

@ -5,6 +5,7 @@ package com.daml.ledger.participant.state.kvutils.api
import java.util.UUID
import java.util.concurrent.{CompletableFuture, CompletionStage}
import com.daml.daml_lf_dev.DamlLf
import com.daml.ledger.api.health.HealthStatus
import com.daml.ledger.configuration.Configuration
@ -14,8 +15,8 @@ import com.daml.ledger.participant.state.kvutils.{Envelope, KeyValueSubmission}
import com.daml.ledger.participant.state.v2._
import com.daml.lf.data.{Ref, Time}
import com.daml.lf.transaction.SubmittedTransaction
import com.daml.logging.ContextualizedLogger
import com.daml.logging.LoggingContext.newLoggingContextWith
import com.daml.logging.LoggingContext.withEnrichedLoggingContext
import com.daml.logging.{ContextualizedLogger, LoggingContext}
import com.daml.metrics.Metrics
import com.daml.telemetry.TelemetryContext
@ -36,7 +37,10 @@ class KeyValueParticipantStateWriter(
transactionMeta: TransactionMeta,
transaction: SubmittedTransaction,
estimatedInterpretationCost: Long,
)(implicit telemetryContext: TelemetryContext): CompletionStage[SubmissionResult] = {
)(implicit
loggingContext: LoggingContext,
telemetryContext: TelemetryContext,
): CompletionStage[SubmissionResult] = {
val submission =
keyValueSubmission.transactionToSubmission(
submitterInfo,
@ -45,7 +49,7 @@ class KeyValueParticipantStateWriter(
)
val metadata = CommitMetadata(submission, Some(estimatedInterpretationCost))
val submissionId = submitterInfo.submissionId.getOrElse {
newLoggingContextWith(
withEnrichedLoggingContext(
"commandId" -> submitterInfo.commandId,
"applicationId" -> submitterInfo.applicationId,
) { implicit loggingContext =>
@ -65,7 +69,10 @@ class KeyValueParticipantStateWriter(
submissionId: Ref.SubmissionId,
archives: List[DamlLf.Archive],
sourceDescription: Option[String],
)(implicit telemetryContext: TelemetryContext): CompletionStage[SubmissionResult] = {
)(implicit
loggingContext: LoggingContext,
telemetryContext: TelemetryContext,
): CompletionStage[SubmissionResult] = {
val submission = keyValueSubmission
.archivesToSubmission(
submissionId,
@ -80,7 +87,10 @@ class KeyValueParticipantStateWriter(
maxRecordTime: Time.Timestamp,
submissionId: Ref.SubmissionId,
config: Configuration,
)(implicit telemetryContext: TelemetryContext): CompletionStage[SubmissionResult] = {
)(implicit
loggingContext: LoggingContext,
telemetryContext: TelemetryContext,
): CompletionStage[SubmissionResult] = {
val submission =
keyValueSubmission
.configurationToSubmission(maxRecordTime, submissionId, writer.participantId, config)
@ -91,7 +101,10 @@ class KeyValueParticipantStateWriter(
hint: Option[Ref.Party],
displayName: Option[String],
submissionId: Ref.SubmissionId,
)(implicit telemetryContext: TelemetryContext): CompletionStage[SubmissionResult] = {
)(implicit
loggingContext: LoggingContext,
telemetryContext: TelemetryContext,
): CompletionStage[SubmissionResult] = {
val party = hint.getOrElse(generateRandomParty())
val submission =
keyValueSubmission.partyToSubmission(

View File

@ -37,6 +37,7 @@ class KeyValueParticipantStateWriterSpec
with ArgumentMatchersSugar {
import KeyValueParticipantStateWriterSpec._
private implicit val loggingContext = com.daml.logging.LoggingContext.ForTesting
"participant state writer" should {
"submit a transaction" in {

View File

@ -8,6 +8,7 @@ import java.util.concurrent.CompletionStage
import com.daml.ledger.configuration.Configuration
import com.daml.lf.data.Ref
import com.daml.lf.data.Time.Timestamp
import com.daml.logging.LoggingContext
import com.daml.telemetry.TelemetryContext
trait WriteConfigService {
@ -26,12 +27,14 @@ trait WriteConfigService {
* @param maxRecordTime: The maximum record time after which the request is rejected.
* @param submissionId: Client picked submission identifier for matching the responses with the request.
* @param config: The new ledger configuration.
* @param telemetryContext: An implicit context for tracing.
* @return an async result of a SubmissionResult
*/
def submitConfiguration(
maxRecordTime: Timestamp,
submissionId: Ref.SubmissionId,
config: Configuration,
)(implicit telemetryContext: TelemetryContext): CompletionStage[SubmissionResult]
)(implicit
loggingContext: LoggingContext,
telemetryContext: TelemetryContext,
): CompletionStage[SubmissionResult]
}

View File

@ -7,6 +7,7 @@ import java.util.concurrent.CompletionStage
import com.daml.daml_lf_dev.DamlLf.Archive
import com.daml.lf.data.Ref
import com.daml.logging.LoggingContext
import com.daml.telemetry.TelemetryContext
/** An interface for uploading packages via a participant. */
@ -37,7 +38,6 @@ trait WritePackagesService {
* @param archives Daml-LF archives to be uploaded to the ledger.
* All archives must be valid, i.e., they must successfully decode and pass
* Daml engine validation.
* @param telemetryContext An implicit context for tracing.
*
* @return an async result of a [[SubmissionResult]]
*/
@ -45,5 +45,8 @@ trait WritePackagesService {
submissionId: Ref.SubmissionId,
archives: List[Archive],
sourceDescription: Option[String],
)(implicit telemetryContext: TelemetryContext): CompletionStage[SubmissionResult]
)(implicit
loggingContext: LoggingContext,
telemetryContext: TelemetryContext,
): CompletionStage[SubmissionResult]
}

View File

@ -6,6 +6,7 @@ package com.daml.ledger.participant.state.v2
import java.util.concurrent.CompletionStage
import com.daml.lf.data.Ref
import com.daml.logging.LoggingContext
import com.daml.telemetry.TelemetryContext
/** An interface for on-boarding parties via a participant. */
@ -28,7 +29,6 @@ trait WritePartyService {
* @param hint A party identifier suggestion
* @param displayName A human readable name of the new party
* @param submissionId Client picked submission identifier for matching the responses with the request.
* @param telemetryContext An implicit context for tracing.
*
* @return an async result of a SubmissionResult
*/
@ -36,5 +36,8 @@ trait WritePartyService {
hint: Option[Ref.Party],
displayName: Option[String],
submissionId: Ref.SubmissionId,
)(implicit telemetryContext: TelemetryContext): CompletionStage[SubmissionResult]
)(implicit
loggingContext: LoggingContext,
telemetryContext: TelemetryContext,
): CompletionStage[SubmissionResult]
}

View File

@ -7,6 +7,7 @@ import java.util.concurrent.CompletionStage
import com.daml.ledger.api.health.ReportsHealth
import com.daml.lf.transaction.SubmittedTransaction
import com.daml.logging.LoggingContext
import com.daml.telemetry.TelemetryContext
/** An interface to change a ledger via a participant.
@ -94,14 +95,16 @@ trait WriteService
* daml-lf/spec/contract-id.rst.
* @param estimatedInterpretationCost Estimated cost of interpretation that may be used for
* handling submitted transactions differently.
* @param telemetryContext Implicit context for tracing.
*/
def submitTransaction(
submitterInfo: SubmitterInfo,
transactionMeta: TransactionMeta,
transaction: SubmittedTransaction,
estimatedInterpretationCost: Long,
)(implicit telemetryContext: TelemetryContext): CompletionStage[SubmissionResult]
)(implicit
loggingContext: LoggingContext,
telemetryContext: TelemetryContext,
): CompletionStage[SubmissionResult]
/** Indicates whether command deduplication should be enabled when using this [[WriteService]]
* This is temporary until we fully transition from [[com.daml.ledger.participant.state.v1.WriteService]] to [[WriteService]]

View File

@ -25,8 +25,6 @@ private[stores] final class LedgerBackedWriteService(
ledger: Ledger,
timeProvider: TimeProvider,
enablePruning: Boolean,
)(implicit
loggingContext: LoggingContext
) extends state.WriteService {
override def currentHealth(): HealthStatus = ledger.currentHealth()
@ -36,7 +34,10 @@ private[stores] final class LedgerBackedWriteService(
transactionMeta: state.TransactionMeta,
transaction: SubmittedTransaction,
estimatedInterpretationCost: Long,
)(implicit telemetryContext: TelemetryContext): CompletionStage[state.SubmissionResult] =
)(implicit
loggingContext: LoggingContext,
telemetryContext: TelemetryContext,
): CompletionStage[state.SubmissionResult] =
withEnrichedLoggingContext(
"actAs" -> submitterInfo.actAs,
"applicationId" -> submitterInfo.applicationId,
@ -55,7 +56,10 @@ private[stores] final class LedgerBackedWriteService(
hint: Option[Ref.Party],
displayName: Option[String],
submissionId: Ref.SubmissionId,
)(implicit telemetryContext: TelemetryContext): CompletionStage[state.SubmissionResult] = {
)(implicit
loggingContext: LoggingContext,
telemetryContext: TelemetryContext,
): CompletionStage[state.SubmissionResult] = {
val party = hint.getOrElse(PartyIdGenerator.generateRandomId())
withEnrichedLoggingContext(
"party" -> party,
@ -70,7 +74,10 @@ private[stores] final class LedgerBackedWriteService(
submissionId: Ref.SubmissionId,
payload: List[DamlLf.Archive],
sourceDescription: Option[String],
)(implicit telemetryContext: TelemetryContext): CompletionStage[state.SubmissionResult] =
)(implicit
loggingContext: LoggingContext,
telemetryContext: TelemetryContext,
): CompletionStage[state.SubmissionResult] =
withEnrichedLoggingContext(
"submissionId" -> submissionId,
"description" -> sourceDescription,
@ -91,7 +98,10 @@ private[stores] final class LedgerBackedWriteService(
maxRecordTime: Time.Timestamp,
submissionId: Ref.SubmissionId,
config: Configuration,
)(implicit telemetryContext: TelemetryContext): CompletionStage[state.SubmissionResult] =
)(implicit
loggingContext: LoggingContext,
telemetryContext: TelemetryContext,
): CompletionStage[state.SubmissionResult] =
withEnrichedLoggingContext(
"maxRecordTime" -> maxRecordTime.toInstant,
"submissionId" -> submissionId,

View File

@ -56,7 +56,10 @@ case class ReadWriteServiceBridge(
transactionMeta: TransactionMeta,
transaction: SubmittedTransaction,
estimatedInterpretationCost: Long,
)(implicit telemetryContext: TelemetryContext): CompletionStage[SubmissionResult] =
)(implicit
loggingContext: LoggingContext,
telemetryContext: TelemetryContext,
): CompletionStage[SubmissionResult] =
submit(
Submission.Transaction(
submitterInfo = submitterInfo,
@ -70,7 +73,10 @@ case class ReadWriteServiceBridge(
maxRecordTime: Time.Timestamp,
submissionId: Ref.SubmissionId,
config: Configuration,
)(implicit telemetryContext: TelemetryContext): CompletionStage[SubmissionResult] =
)(implicit
loggingContext: LoggingContext,
telemetryContext: TelemetryContext,
): CompletionStage[SubmissionResult] =
submit(
Submission.Config(
maxRecordTime = maxRecordTime,
@ -85,7 +91,10 @@ case class ReadWriteServiceBridge(
hint: Option[Ref.Party],
displayName: Option[String],
submissionId: Ref.SubmissionId,
)(implicit telemetryContext: TelemetryContext): CompletionStage[SubmissionResult] =
)(implicit
loggingContext: LoggingContext,
telemetryContext: TelemetryContext,
): CompletionStage[SubmissionResult] =
submit(
Submission.AllocateParty(
hint = hint,
@ -98,7 +107,10 @@ case class ReadWriteServiceBridge(
submissionId: Ref.SubmissionId,
archives: List[Archive],
sourceDescription: Option[String],
)(implicit telemetryContext: TelemetryContext): CompletionStage[SubmissionResult] =
)(implicit
loggingContext: LoggingContext,
telemetryContext: TelemetryContext,
): CompletionStage[SubmissionResult] =
submit(
Submission.UploadPackages(
submissionId = submissionId,

View File

@ -7,6 +7,7 @@ import java.io.File
import java.time.{Clock, Instant}
import java.util.UUID
import java.util.concurrent.Executors
import akka.actor.ActorSystem
import akka.stream.Materializer
import com.daml.api.util.TimeProvider
@ -32,7 +33,7 @@ import com.daml.lf.archive.DarParser
import com.daml.lf.data.Ref
import com.daml.lf.engine.{Engine, EngineConfig}
import com.daml.lf.language.LanguageVersion
import com.daml.logging.ContextualizedLogger
import com.daml.logging.{ContextualizedLogger, LoggingContext}
import com.daml.logging.LoggingContext.newLoggingContext
import com.daml.metrics.MetricsReporting
import com.daml.platform.apiserver._
@ -325,7 +326,8 @@ class Runner(config: SandboxConfig) extends ResourceOwner[Port] {
}
private def uploadDar(from: File, to: WritePackagesService)(implicit
executionContext: ExecutionContext
executionContext: ExecutionContext,
loggingContext: LoggingContext,
): Future[Unit] = DefaultTelemetry.runFutureInSpan(SpanName.RunnerUploadDar, SpanKind.Internal) {
implicit telemetryContext =>
val submissionId = Ref.SubmissionId.assertFromString(UUID.randomUUID().toString)