mirror of
https://github.com/digital-asset/daml.git
synced 2024-09-20 09:17:43 +03:00
Propagate trace context for configuration submission (#9545)
CHANGELOG_BEGIN CHANGELOG_END
This commit is contained in:
parent
fcece8b7e8
commit
bef8af2bb6
@ -4,5 +4,6 @@
|
||||
package com.daml.telemetry
|
||||
|
||||
object SpanName {
|
||||
val RunnerUploadDar: String = "daml.runner.upload-dar"
|
||||
val LedgerConfigProviderInitialConfig = "daml.ledger.config-provider.initial-config"
|
||||
val RunnerUploadDar = "daml.runner.upload-dar"
|
||||
}
|
||||
|
@ -23,6 +23,7 @@ import com.daml.ledger.resources.ResourceOwner
|
||||
import com.daml.lf.data.Time.Timestamp
|
||||
import com.daml.logging.{ContextualizedLogger, LoggingContext}
|
||||
import com.daml.platform.configuration.LedgerConfiguration
|
||||
import com.daml.telemetry.{DefaultTelemetry, SpanKind, SpanName}
|
||||
|
||||
import scala.compat.java8.FutureConverters
|
||||
import scala.concurrent.duration.{DurationInt, DurationLong}
|
||||
@ -145,27 +146,32 @@ private[apiserver] final class LedgerConfigProvider private (
|
||||
// This method therefore does not try to re-submit the initial configuration in case of failure.
|
||||
val submissionId = SubmissionId.assertFromString(UUID.randomUUID.toString)
|
||||
logger.info(s"No ledger configuration found, submitting an initial configuration $submissionId")
|
||||
FutureConverters
|
||||
.toScala(
|
||||
writeService.submitConfiguration(
|
||||
Timestamp.assertFromInstant(timeProvider.getCurrentTime.plusSeconds(60)),
|
||||
submissionId,
|
||||
config.initialConfiguration,
|
||||
)
|
||||
)
|
||||
.map {
|
||||
case SubmissionResult.Acknowledged =>
|
||||
logger.info(s"Initial configuration submission $submissionId was successful")
|
||||
()
|
||||
case SubmissionResult.NotSupported =>
|
||||
logger.info("Setting an initial ledger configuration is not supported")
|
||||
()
|
||||
case result =>
|
||||
logger.warn(
|
||||
s"Initial configuration submission $submissionId failed. Reason: ${result.description}"
|
||||
DefaultTelemetry.runFutureInSpan(
|
||||
SpanName.LedgerConfigProviderInitialConfig,
|
||||
SpanKind.Internal,
|
||||
) { implicit telemetryContext =>
|
||||
FutureConverters
|
||||
.toScala(
|
||||
writeService.submitConfiguration(
|
||||
Timestamp.assertFromInstant(timeProvider.getCurrentTime.plusSeconds(60)),
|
||||
submissionId,
|
||||
config.initialConfiguration,
|
||||
)
|
||||
()
|
||||
}
|
||||
)
|
||||
.map {
|
||||
case SubmissionResult.Acknowledged =>
|
||||
logger.info(s"Initial configuration submission $submissionId was successful")
|
||||
()
|
||||
case SubmissionResult.NotSupported =>
|
||||
logger.info("Setting an initial ledger configuration is not supported")
|
||||
()
|
||||
case result =>
|
||||
logger.warn(
|
||||
s"Initial configuration submission $submissionId failed. Reason: ${result.description}"
|
||||
)
|
||||
()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/** The latest configuration found so far.
|
||||
|
@ -29,7 +29,7 @@ import com.daml.platform.apiserver.services.logging
|
||||
import com.daml.platform.configuration.LedgerConfiguration
|
||||
import com.daml.platform.server.api.validation
|
||||
import com.daml.platform.server.api.validation.ErrorFactories
|
||||
import com.daml.telemetry.{NoOpTelemetryContext, TelemetryContext}
|
||||
import com.daml.telemetry.{DefaultTelemetry, TelemetryContext}
|
||||
import io.grpc.{ServerServiceDefinition, StatusRuntimeException}
|
||||
|
||||
import scala.compat.java8.FutureConverters._
|
||||
@ -86,6 +86,10 @@ private[apiserver] final class ApiConfigManagementService private (
|
||||
withEnrichedLoggingContext(logging.submissionId(request.submissionId)) {
|
||||
implicit loggingContext =>
|
||||
logger.info("Setting time model")
|
||||
|
||||
implicit val telemetryContext: TelemetryContext =
|
||||
DefaultTelemetry.contextFromGrpcThreadLocalContext()
|
||||
|
||||
val response = for {
|
||||
// Validate and convert the request parameters
|
||||
params <- validateParameters(request).fold(Future.failed(_), Future.successful)
|
||||
@ -129,7 +133,7 @@ private[apiserver] final class ApiConfigManagementService private (
|
||||
entry <- synchronousResponse.submitAndWait(
|
||||
submissionId,
|
||||
(params.maximumRecordTime, newConfig),
|
||||
)(NoOpTelemetryContext, executionContext, materializer)
|
||||
)
|
||||
} yield SetTimeModelResponse(entry.configuration.generation)
|
||||
|
||||
response.andThen(logger.logErrorsOnCall[SetTimeModelResponse])
|
||||
|
@ -26,6 +26,7 @@ import com.daml.lf.data.Time.Timestamp
|
||||
import com.daml.logging.LoggingContext
|
||||
import com.daml.platform.apiserver.services.LedgerConfigProviderSpec._
|
||||
import com.daml.platform.configuration.LedgerConfiguration
|
||||
import com.daml.telemetry.TelemetryContext
|
||||
import org.mockito.MockitoSugar
|
||||
import org.scalatest.matchers.should.Matchers
|
||||
import org.scalatest.wordspec.AsyncWordSpec
|
||||
@ -162,7 +163,7 @@ object LedgerConfigProviderSpec {
|
||||
maxRecordTime: Timestamp,
|
||||
submissionId: SubmissionId,
|
||||
config: Configuration,
|
||||
): CompletionStage[SubmissionResult] =
|
||||
)(implicit telemetryContext: TelemetryContext): CompletionStage[SubmissionResult] =
|
||||
CompletableFuture.supplyAsync { () =>
|
||||
Thread.sleep(delay.toMillis)
|
||||
currentOffset += 1
|
||||
|
@ -55,7 +55,7 @@ final class TimedWriteService(delegate: WriteService, metrics: Metrics) extends
|
||||
maxRecordTime: Time.Timestamp,
|
||||
submissionId: SubmissionId,
|
||||
config: Configuration,
|
||||
): CompletionStage[SubmissionResult] =
|
||||
)(implicit telemetryContext: TelemetryContext): CompletionStage[SubmissionResult] =
|
||||
Timed.completionStage(
|
||||
metrics.daml.services.write.submitConfiguration,
|
||||
delegate.submitConfiguration(maxRecordTime, submissionId, config),
|
||||
|
@ -59,7 +59,7 @@ class KeyValueParticipantState(
|
||||
maxRecordTime: Time.Timestamp,
|
||||
submissionId: SubmissionId,
|
||||
config: Configuration,
|
||||
): CompletionStage[SubmissionResult] =
|
||||
)(implicit telemetryContext: TelemetryContext): CompletionStage[SubmissionResult] =
|
||||
writerAdapter.submitConfiguration(maxRecordTime, submissionId, config)
|
||||
|
||||
override def uploadPackages(
|
||||
|
@ -60,7 +60,7 @@ class KeyValueParticipantStateWriter(writer: LedgerWriter, metrics: Metrics) ext
|
||||
maxRecordTime: Time.Timestamp,
|
||||
submissionId: SubmissionId,
|
||||
config: Configuration,
|
||||
): CompletionStage[SubmissionResult] = {
|
||||
)(implicit telemetryContext: TelemetryContext): CompletionStage[SubmissionResult] = {
|
||||
val submission =
|
||||
keyValueSubmission
|
||||
.configurationToSubmission(maxRecordTime, submissionId, writer.participantId, config)
|
||||
|
@ -6,6 +6,7 @@ package com.daml.ledger.participant.state.v1
|
||||
import java.util.concurrent.CompletionStage
|
||||
|
||||
import com.daml.lf.data.Time.Timestamp
|
||||
import com.daml.telemetry.TelemetryContext
|
||||
|
||||
trait WriteConfigService {
|
||||
|
||||
@ -23,11 +24,12 @@ trait WriteConfigService {
|
||||
* @param maxRecordTime: The maximum record time after which the request is rejected.
|
||||
* @param submissionId: Client picked submission identifier for matching the responses with the request.
|
||||
* @param config: The new ledger configuration.
|
||||
* @param telemetryContext: An implicit context for tracing.
|
||||
* @return an async result of a SubmissionResult
|
||||
*/
|
||||
def submitConfiguration(
|
||||
maxRecordTime: Timestamp,
|
||||
submissionId: SubmissionId,
|
||||
config: Configuration,
|
||||
): CompletionStage[SubmissionResult]
|
||||
)(implicit telemetryContext: TelemetryContext): CompletionStage[SubmissionResult]
|
||||
}
|
||||
|
@ -91,7 +91,7 @@ private[stores] final class LedgerBackedWriteService(ledger: Ledger, timeProvide
|
||||
maxRecordTime: Time.Timestamp,
|
||||
submissionId: SubmissionId,
|
||||
config: Configuration,
|
||||
): CompletionStage[SubmissionResult] =
|
||||
)(implicit telemetryContext: TelemetryContext): CompletionStage[SubmissionResult] =
|
||||
withEnrichedLoggingContext(
|
||||
"maxRecordTime" -> maxRecordTime.toInstant.toString,
|
||||
"submissionId" -> submissionId,
|
||||
|
@ -50,7 +50,7 @@ case class ReadWriteServiceBridge(
|
||||
maxRecordTime: Time.Timestamp,
|
||||
submissionId: SubmissionId,
|
||||
config: Configuration,
|
||||
): CompletionStage[SubmissionResult] =
|
||||
)(implicit telemetryContext: TelemetryContext): CompletionStage[SubmissionResult] =
|
||||
submit(
|
||||
Submission.Config(
|
||||
maxRecordTime = maxRecordTime,
|
||||
|
Loading…
Reference in New Issue
Block a user