Pass submission id as correlation id to error codes in Ledger API (#11381)

CHANGELOG_BEGIN
CHANGELOG_END
This commit is contained in:
tudor-da 2021-10-27 21:26:53 +02:00 committed by GitHub
parent ba106873b9
commit 6c88e55509
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 111 additions and 77 deletions

View File

@ -22,6 +22,14 @@ trait ContextualizedErrorLogger {
def error(message: String, throwable: Throwable): Unit
}
/** Implementation of [[ContextualizedErrorLogger]] leveraging the //libs-scala/contextualized-logging
* as the logging stack.
*
* @param logger The logger.
* @param loggingContext The logging context.
* @param correlationId The correlation id, if present. The choice of the correlation id depends on the
* ledger integration. By default it should be the command submission id.
*/
class DamlContextualizedErrorLogger(
logger: ContextualizedLogger,
loggingContext: LoggingContext,

View File

@ -3,11 +3,7 @@
package com.daml.platform.server.api.services.grpc
import com.daml.error.{
ContextualizedErrorLogger,
DamlContextualizedErrorLogger,
ErrorCodesVersionSwitcher,
}
import com.daml.error.{DamlContextualizedErrorLogger, ErrorCodesVersionSwitcher}
import com.daml.ledger.api.SubmissionIdGenerator
import com.daml.ledger.api.domain.LedgerId
import com.daml.ledger.api.v1.command_service.CommandServiceGrpc.CommandService
@ -37,8 +33,6 @@ class GrpcCommandService(
with ProxyCloseable {
protected implicit val logger: ContextualizedLogger = ContextualizedLogger.get(getClass)
private implicit val contextualizedErrorLogger: ContextualizedErrorLogger =
new DamlContextualizedErrorLogger(logger, loggingContext, None)
private[this] val validator = new SubmitAndWaitRequestValidator(
new CommandsValidator(ledgerId, errorCodesVersionSwitcher),
@ -53,7 +47,7 @@ class GrpcCommandService(
currentLedgerTime(),
currentUtcTime(),
maxDeduplicationTime(),
)
)(contextualizedErrorLogger(requestWithSubmissionId))
.fold(
t => Future.failed(ValidationLogger.logFailure(requestWithSubmissionId, t)),
_ => service.submitAndWait(requestWithSubmissionId),
@ -70,7 +64,7 @@ class GrpcCommandService(
currentLedgerTime(),
currentUtcTime(),
maxDeduplicationTime(),
)
)(contextualizedErrorLogger(requestWithSubmissionId))
.fold(
t => Future.failed(ValidationLogger.logFailure(requestWithSubmissionId, t)),
_ => service.submitAndWaitForTransactionId(requestWithSubmissionId),
@ -87,7 +81,7 @@ class GrpcCommandService(
currentLedgerTime(),
currentUtcTime(),
maxDeduplicationTime(),
)
)(contextualizedErrorLogger(requestWithSubmissionId))
.fold(
t => Future.failed(ValidationLogger.logFailure(requestWithSubmissionId, t)),
_ => service.submitAndWaitForTransaction(requestWithSubmissionId),
@ -104,7 +98,7 @@ class GrpcCommandService(
currentLedgerTime(),
currentUtcTime(),
maxDeduplicationTime(),
)
)(contextualizedErrorLogger(requestWithSubmissionId))
.fold(
t => Future.failed(ValidationLogger.logFailure(requestWithSubmissionId, t)),
_ => service.submitAndWaitForTransactionTree(requestWithSubmissionId),
@ -123,4 +117,9 @@ class GrpcCommandService(
request
}
}
private def contextualizedErrorLogger(request: SubmitAndWaitRequest)(implicit
loggingContext: LoggingContext
) =
new DamlContextualizedErrorLogger(logger, loggingContext, request.commands.map(_.submissionId))
}

View File

@ -3,11 +3,7 @@
package com.daml.platform.server.api.services.grpc
import com.daml.error.{
ContextualizedErrorLogger,
DamlContextualizedErrorLogger,
ErrorCodesVersionSwitcher,
}
import com.daml.error.{DamlContextualizedErrorLogger, ErrorCodesVersionSwitcher}
import com.daml.ledger.api.domain.LedgerId
import com.daml.ledger.api.v1.command_submission_service.CommandSubmissionServiceGrpc.{
CommandSubmissionService => ApiCommandSubmissionService
@ -44,12 +40,6 @@ class GrpcCommandSubmissionService(
with GrpcApiService {
protected implicit val logger: ContextualizedLogger = ContextualizedLogger.get(getClass)
private implicit val contextualizedErrorLogger: ContextualizedErrorLogger =
new DamlContextualizedErrorLogger(
logger,
loggingContext,
None,
)
private val validator = new SubmitRequestValidator(
new CommandsValidator(ledgerId, errorCodesVersionSwitcher),
FieldValidations(ErrorFactories(errorCodesVersionSwitcher)),
@ -64,6 +54,11 @@ class GrpcCommandSubmissionService(
telemetryContext.setAttribute(SpanAttribute.Submitter, commands.party)
telemetryContext.setAttribute(SpanAttribute.WorkflowId, commands.workflowId)
}
val errorLogger = new DamlContextualizedErrorLogger(
logger = logger,
loggingContext = loggingContext,
correlationId = request.commands.map(_.submissionId),
)
Timed.timedAndTrackedFuture(
metrics.daml.commands.submissions,
metrics.daml.commands.submissionsRunning,
@ -75,7 +70,7 @@ class GrpcCommandSubmissionService(
currentLedgerTime(),
currentUtcTime(),
maxDeduplicationTime(),
),
)(errorLogger),
)
.fold(
t => Future.failed(ValidationLogger.logFailure(request, t)),

View File

@ -69,7 +69,6 @@ private[apiserver] final class ApiCommandService private[services] (
private val logger = ContextualizedLogger.get(this.getClass)
private val errorFactories = ErrorFactories(errorCodesVersionSwitcher)
import errorFactories.serviceNotRunning
@volatile private var running = true
@ -82,7 +81,6 @@ private[apiserver] final class ApiCommandService private[services] (
private def submitAndWaitInternal(request: SubmitAndWaitRequest)(implicit
loggingContext: LoggingContext
): Future[Either[TrackedCompletionFailure, CompletionSuccess]] = {
val contextualizedErrorLogger = new DamlContextualizedErrorLogger(logger, loggingContext, None)
val commands = request.getCommands
withEnrichedLoggingContext(
logging.submissionId(commands.submissionId),
@ -96,10 +94,8 @@ private[apiserver] final class ApiCommandService private[services] (
.map(deadline => Duration.ofNanos(deadline.timeRemaining(TimeUnit.NANOSECONDS)))
submissionTracker.track(CommandSubmission(commands, timeout))
} else {
Future.failed(
serviceNotRunning(definiteAnswer = Some(false))(contextualizedErrorLogger)
)
}.andThen(logger.logErrorsOnCall[Completion])
handleFailure(request, loggingContext)
}
}
}
@ -160,6 +156,22 @@ private[apiserver] final class ApiCommandService private[services] (
},
)
}
private def handleFailure(
request: SubmitAndWaitRequest,
loggingContext: LoggingContext,
): Future[Either[TrackedCompletionFailure, CompletionSuccess]] =
Future
.failed(
errorFactories.serviceNotRunning(definiteAnswer = Some(false))(
new DamlContextualizedErrorLogger(
logger,
loggingContext,
request.commands.map(_.submissionId),
)
)
)
.andThen(logger.logErrorsOnCall[Completion](loggingContext))
}
private[apiserver] object ApiCommandService {

View File

@ -12,7 +12,7 @@ import com.daml.error.{
}
import com.daml.error.definitions.{ErrorCauseExport, RejectionGenerators}
import com.daml.ledger.api.DeduplicationPeriod
import com.daml.ledger.api.domain.{LedgerId, Commands => ApiCommands}
import com.daml.ledger.api.domain.{LedgerId, SubmissionId, Commands => ApiCommands}
import com.daml.ledger.api.messages.command.submission.SubmitRequest
import com.daml.ledger.configuration.Configuration
import com.daml.ledger.participant.state.index.v2._
@ -118,6 +118,14 @@ private[apiserver] final class ApiSubmissionService private[services] (
withEnrichedLoggingContext(logging.commands(request.commands)) { implicit loggingContext =>
logger.info("Submitting transaction")
logger.trace(s"Commands: ${request.commands.commands.commands}")
implicit val contextualizedErrorLogger: ContextualizedErrorLogger =
new DamlContextualizedErrorLogger(
logger,
loggingContext,
request.commands.submissionId.map(SubmissionId.unwrap),
)
val evaluatedCommand = ledgerConfigurationSubscription
.latestConfiguration() match {
case Some(ledgerConfiguration) =>
@ -133,9 +141,7 @@ private[apiserver] final class ApiSubmissionService private[services] (
}
case None =>
Future.failed(
errorFactories.missingLedgerConfig(definiteAnswer = Some(false))(
new DamlContextualizedErrorLogger(logger, loggingContext, None)
)
errorFactories.missingLedgerConfig(definiteAnswer = Some(false))
)
}
evaluatedCommand.andThen(logger.logErrorsOnCall[Unit])
@ -145,7 +151,11 @@ private[apiserver] final class ApiSubmissionService private[services] (
seed: crypto.Hash,
commands: ApiCommands,
ledgerConfig: Configuration,
)(implicit loggingContext: LoggingContext, telemetryContext: TelemetryContext): Future[Unit] =
)(implicit
loggingContext: LoggingContext,
telemetryContext: TelemetryContext,
contextualizedErrorLogger: ContextualizedErrorLogger,
): Future[Unit] =
submissionService
.deduplicateCommand(
commands.commandId,
@ -168,9 +178,7 @@ private[apiserver] final class ApiSubmissionService private[services] (
case _: CommandDeduplicationDuplicate =>
metrics.daml.commands.deduplicatedCommands.mark()
Future.failed(
errorFactories.duplicateCommandException(
new DamlContextualizedErrorLogger(logger, loggingContext, None)
)
errorFactories.duplicateCommandException
)
}
@ -195,7 +203,7 @@ private[apiserver] final class ApiSubmissionService private[services] (
private def handleCommandExecutionResult(
result: Either[ErrorCause, CommandExecutionResult]
): Future[CommandExecutionResult] =
)(implicit contextualizedErrorLogger: ContextualizedErrorLogger): Future[CommandExecutionResult] =
result.fold(
error => {
metrics.daml.commands.failedCommandInterpretations.mark()
@ -211,6 +219,7 @@ private[apiserver] final class ApiSubmissionService private[services] (
)(implicit
loggingContext: LoggingContext,
telemetryContext: TelemetryContext,
contextualizedErrorLogger: ContextualizedErrorLogger,
): Future[state.SubmissionResult] =
for {
result <- commandExecutor.execute(commands, submissionSeed, ledgerConfig)
@ -326,16 +335,12 @@ private[apiserver] final class ApiSubmissionService private[services] (
private def failedOnCommandExecution(
error: ErrorCause
)(implicit loggingContext: LoggingContext): Future[CommandExecutionResult] = {
implicit val contextualizedErrorLogger: ContextualizedErrorLogger =
new DamlContextualizedErrorLogger(logger, loggingContext, None)
)(implicit contextualizedErrorLogger: ContextualizedErrorLogger): Future[CommandExecutionResult] =
errorCodesVersionSwitcher.chooseAsFailedFuture(
v1 = toStatusExceptionV1(error),
v2 = RejectionGenerators
.commandExecutorError(cause = ErrorCauseExport.fromErrorCause(error)),
)
}
override def close(): Unit = ()

View File

@ -48,9 +48,6 @@ private[apiserver] final class ApiConfigManagementService private (
) extends ConfigManagementService
with GrpcApiService {
private implicit val logger: ContextualizedLogger = ContextualizedLogger.get(this.getClass)
private implicit val contextualizedErrorLogger: ContextualizedErrorLogger =
new DamlContextualizedErrorLogger(logger, loggingContext, None)
private val errorFactories = ErrorFactories(errorCodesVersionSwitcher)
private val fieldValidations = FieldValidations(errorFactories)
@ -70,7 +67,11 @@ private[apiserver] final class ApiConfigManagementService private (
Future.successful(configurationToResponse(configuration))
case None =>
// TODO error codes: Duplicate of missingLedgerConfig
Future.failed(missingLedgerConfigUponRequest)
Future.failed(
missingLedgerConfigUponRequest(
new DamlContextualizedErrorLogger(logger, loggingContext, None)
)
)
}
.andThen(logger.logErrorsOnCall[GetTimeModelResponse])
}
@ -97,7 +98,7 @@ private[apiserver] final class ApiConfigManagementService private (
implicit val telemetryContext: TelemetryContext =
DefaultTelemetry.contextFromGrpcThreadLocalContext()
implicit val contextualizedErrorLogger: ContextualizedErrorLogger =
new DamlContextualizedErrorLogger(logger, loggingContext, None)
new DamlContextualizedErrorLogger(logger, loggingContext, Some(request.submissionId))
val response = for {
// Validate and convert the request parameters
@ -232,13 +233,15 @@ private[apiserver] object ApiConfigManagementService {
configManagementService: IndexConfigManagementService,
ledgerEnd: LedgerOffset.Absolute,
errorFactories: ErrorFactories,
)(implicit loggingContext: LoggingContext, contextualizedErrorLogger: ContextualizedErrorLogger)
)(implicit loggingContext: LoggingContext)
extends SynchronousResponse.Strategy[
(Time.Timestamp, Configuration),
ConfigurationEntry,
ConfigurationEntry.Accepted,
] {
private val logger = ContextualizedLogger.get(getClass)
override def currentLedgerEnd(): Future[Option[LedgerOffset.Absolute]] =
Future.successful(Some(ledgerEnd))
@ -266,7 +269,9 @@ private[apiserver] object ApiConfigManagementService {
submissionId: Ref.SubmissionId
): PartialFunction[ConfigurationEntry, StatusRuntimeException] = {
case domain.ConfigurationEntry.Rejected(`submissionId`, reason, _) =>
errorFactories.configurationEntryRejected(reason, None)
errorFactories.configurationEntryRejected(reason, None)(
new DamlContextualizedErrorLogger(logger, loggingContext, Some(submissionId))
)
}
}

View File

@ -9,11 +9,8 @@ import akka.stream.Materializer
import akka.stream.scaladsl.Source
import com.daml.api.util.TimestampConversion
import com.daml.daml_lf_dev.DamlLf.Archive
import com.daml.error.{
ContextualizedErrorLogger,
DamlContextualizedErrorLogger,
ErrorCodesVersionSwitcher,
}
import com.daml.error.ErrorCodesVersionSwitcher
import com.daml.error.DamlContextualizedErrorLogger
import com.daml.ledger.api.domain.{LedgerOffset, PackageEntry}
import com.daml.ledger.api.v1.admin.package_management_service.PackageManagementServiceGrpc.PackageManagementService
import com.daml.ledger.api.v1.admin.package_management_service._
@ -60,8 +57,6 @@ private[apiserver] final class ApiPackageManagementService private (
with GrpcApiService {
private implicit val logger: ContextualizedLogger = ContextualizedLogger.get(this.getClass)
private implicit val contextualizedErrorLogger: ContextualizedErrorLogger =
new DamlContextualizedErrorLogger(logger, loggingContext, None)
private val errorFactories = ErrorFactories(errorCodesVersionSwitcher)
@ -128,7 +123,13 @@ private[apiserver] final class ApiPackageManagementService private (
ValidationLogger
.logFailureWithContext(
request,
errorFactories.invalidArgument(None)(err.getMessage),
errorFactories.invalidArgument(None)(err.getMessage)(
new DamlContextualizedErrorLogger(
logger,
loggingContext,
Some(submissionId),
)
),
)
),
Future.successful,
@ -143,7 +144,6 @@ private[apiserver] final class ApiPackageManagementService private (
response.andThen(logger.logErrorsOnCall[UploadDarFileResponse])
}
}
private[apiserver] object ApiPackageManagementService {
@ -185,8 +185,6 @@ private[apiserver] object ApiPackageManagementService {
PackageEntry.PackageUploadAccepted,
] {
private implicit val logger: ContextualizedLogger = ContextualizedLogger.get(this.getClass)
private implicit val contextualizedErrorLogger: ContextualizedErrorLogger =
new DamlContextualizedErrorLogger(logger, loggingContext, None)
override def currentLedgerEnd(): Future[Option[LedgerOffset.Absolute]] =
ledgerEndService.currentLedgerEnd().map(Some(_))
@ -209,7 +207,9 @@ private[apiserver] object ApiPackageManagementService {
submissionId: Ref.SubmissionId
): PartialFunction[PackageEntry, StatusRuntimeException] = {
case PackageEntry.PackageUploadRejected(`submissionId`, _, reason) =>
errorFactories.packageUploadRejected(reason, definiteAnswer = None)
errorFactories.packageUploadRejected(reason, definiteAnswer = None)(
new DamlContextualizedErrorLogger(logger, loggingContext, Some(submissionId))
)
}
}

View File

@ -55,7 +55,11 @@ final class ApiParticipantPruningService private (
if (request.submissionId.nonEmpty) request.submissionId else UUID.randomUUID().toString
)
.left
.map(err => invalidArgument(None)(s"submission_id $err")(contextualizedErrorLogger))
.map(err =>
invalidArgument(None)(s"submission_id $err")(
contextualizedErrorLogger(request.submissionId)
)
)
submissionIdOrErr.fold(
t => Future.failed(ValidationLogger.logFailure(request, t)),
@ -65,7 +69,10 @@ final class ApiParticipantPruningService private (
logger.info(s"Pruning up to ${request.pruneUpTo}")
(for {
pruneUpTo <- validateRequest(request)
pruneUpTo <- validateRequest(request)(
loggingContext,
contextualizedErrorLogger(submissionId),
)
// If write service pruning succeeds but ledger api server index pruning fails, the user can bring the
// systems back in sync by reissuing the prune request at the currently specified or later offset.
@ -83,7 +90,10 @@ final class ApiParticipantPruningService private (
private def validateRequest(
request: PruneRequest
)(implicit loggingContext: LoggingContext): Future[Offset] = {
)(implicit
loggingContext: LoggingContext,
errorLogger: ContextualizedErrorLogger,
): Future[Offset] = {
(for {
pruneUpToString <- checkOffsetIsSpecified(request.pruneUpTo)
pruneUpTo <- checkOffsetIsHexadecimal(pruneUpToString)
@ -98,9 +108,7 @@ final class ApiParticipantPruningService private (
pruneUpTo: Offset,
submissionId: Ref.SubmissionId,
pruneAllDivulgedContracts: Boolean,
)(implicit
loggingContext: LoggingContext
): Future[Unit] = {
)(implicit loggingContext: LoggingContext): Future[Unit] = {
import state.PruningResult._
logger.info(
s"About to prune participant ledger up to ${pruneUpTo.toApiString} inclusively starting with the write service"
@ -130,16 +138,16 @@ final class ApiParticipantPruningService private (
private def checkOffsetIsSpecified(
offset: String
)(implicit loggingContext: LoggingContext): Either[StatusRuntimeException, String] =
)(implicit errorLogger: ContextualizedErrorLogger): Either[StatusRuntimeException, String] =
Either.cond(
offset.nonEmpty,
offset,
invalidArgument(None)("prune_up_to not specified")(contextualizedErrorLogger),
invalidArgument(None)("prune_up_to not specified"),
)
private def checkOffsetIsHexadecimal(
pruneUpToString: String
)(implicit loggingContext: LoggingContext): Either[StatusRuntimeException, Offset] =
)(implicit errorLogger: ContextualizedErrorLogger): Either[StatusRuntimeException, Offset] =
ApiOffset
.fromString(pruneUpToString)
.toEither
@ -150,13 +158,16 @@ final class ApiParticipantPruningService private (
offsetValue = pruneUpToString,
message =
s"prune_up_to needs to be a hexadecimal string and not $pruneUpToString: ${t.getMessage}",
)(contextualizedErrorLogger)
)
)
private def checkOffsetIsBeforeLedgerEnd(
pruneUpToProto: Offset,
pruneUpToString: String,
)(implicit loggingContext: LoggingContext): Future[Offset] =
)(implicit
loggingContext: LoggingContext,
errorLogger: ContextualizedErrorLogger,
): Future[Offset] =
for {
ledgerEnd <- readBackend.currentLedgerEnd()
_ <-
@ -165,15 +176,14 @@ final class ApiParticipantPruningService private (
Future.failed(
readingOffsetAfterLedgerEnd_was_invalidArgument(None)(
s"prune_up_to needs to be before ledger end ${ledgerEnd.value}"
)(contextualizedErrorLogger)
)
)
} yield pruneUpToProto
private def contextualizedErrorLogger(implicit
private def contextualizedErrorLogger(submissionId: String)(implicit
loggingContext: LoggingContext
): ContextualizedErrorLogger =
new DamlContextualizedErrorLogger(logger, loggingContext, None)
new DamlContextualizedErrorLogger(logger, loggingContext, Some(submissionId))
}
object ApiParticipantPruningService {