Clarify CommandTracker [KVL-1104] (#10943)

CHANGELOG_BEGIN
CHANGELOG_END
This commit is contained in:
Hubert Slojewski 2021-09-20 19:25:26 +02:00 committed by GitHub
parent 5244643d16
commit a33176265a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 40 additions and 37 deletions

View File

@ -62,6 +62,8 @@ private[commands] class CommandTracker[Context](
private val logger = LoggerFactory.getLogger(this.getClass.getName)
type ContextualizedCompletionResponse = Ctx[Context, Either[CompletionFailure, CompletionSuccess]]
val submitRequestIn: Inlet[Ctx[Context, CommandSubmission]] =
Inlet[Ctx[Context, CommandSubmission]]("submitRequestIn")
val submitRequestOut: Outlet[Ctx[(Context, TrackedCommandKey), CommandSubmission]] =
@ -71,8 +73,8 @@ private[commands] class CommandTracker[Context](
Inlet[Either[Ctx[(Context, TrackedCommandKey), Try[Empty]], CompletionStreamElement]](
"commandResultIn"
)
val resultOut: Outlet[Ctx[Context, Either[CompletionFailure, CompletionSuccess]]] =
Outlet[Ctx[Context, Either[CompletionFailure, CompletionSuccess]]]("resultOut")
val resultOut: Outlet[ContextualizedCompletionResponse] =
Outlet[ContextualizedCompletionResponse]("resultOut")
val offsetOut: Outlet[LedgerOffset] =
Outlet[LedgerOffset]("offsetOut")
@ -93,7 +95,7 @@ private[commands] class CommandTracker[Context](
override protected def onTimer(timerKey: Any): Unit = {
timerKey match {
case `timeout_detection` =>
val timeouts = getOutputForTimeout(Instant.now)
val timeouts = getResponsesForTimeouts(Instant.now)
if (timeouts.nonEmpty) emitMultiple(resultOut, timeouts.to(immutable.Iterable))
case _ => // unknown timer, nothing to do
}
@ -164,7 +166,7 @@ private[commands] class CommandTracker[Context](
pushResultOrPullCommandResultIn(handleSubmitResponse(submitResponse))
case Right(CompletionStreamElement.CompletionElement(completion)) =>
pushResultOrPullCommandResultIn(getOutputForCompletion(completion))
pushResultOrPullCommandResultIn(getResponsesForCompletion(completion))
case Right(CompletionStreamElement.CheckpointElement(checkpoint)) =>
if (!hasBeenPulled(commandResultIn)) pull(commandResultIn)
@ -185,7 +187,7 @@ private[commands] class CommandTracker[Context](
)
private def pushResultOrPullCommandResultIn(
compl: Seq[Ctx[Context, Either[CompletionFailure, CompletionSuccess]]]
completionResponses: Seq[ContextualizedCompletionResponse]
): Unit = {
// The command tracker detects timeouts outside the regular pull/push
// mechanism of the input/output ports. Basically the timeout
@ -194,10 +196,10 @@ private[commands] class CommandTracker[Context](
// even though it hasn't been pulled again in the meantime. Using `emit`
// instead of `push` when a completion arrives makes akka take care of
// handling the signaling properly.
if (compl.isEmpty && !hasBeenPulled(commandResultIn)) {
if (completionResponses.isEmpty && !hasBeenPulled(commandResultIn)) {
pull(commandResultIn)
}
emitMultiple(resultOut, compl.to(immutable.Iterable))
emitMultiple(resultOut, completionResponses.to(immutable.Iterable))
}
private def completeStageIfTerminal(): Unit = {
@ -210,12 +212,15 @@ private[commands] class CommandTracker[Context](
private def handleSubmitResponse(
submitResponse: Ctx[(Context, TrackedCommandKey), Try[Empty]]
): Seq[Ctx[Context, Either[CompletionFailure, CompletionSuccess]]] = {
): Seq[ContextualizedCompletionResponse] = {
val Ctx((_, commandKey), value, _) = submitResponse
value match {
case Failure(GrpcException(status @ GrpcStatus(code, _), metadata))
if !nonTerminalCodes(code) =>
getOutputForTerminalStatusCode(commandKey, GrpcStatus.toProto(status, metadata))
getResponseForTerminalStatusCode(
commandKey,
GrpcStatus.toProto(status, metadata),
).toList
case Failure(throwable) =>
logger.warn(
s"Service responded with error for submitting command with context ${submitResponse.context}. Status of command is unknown. watching for completion...",
@ -238,7 +243,7 @@ private[commands] class CommandTracker[Context](
logger.trace(s"Begin tracking of command $commandId for submission $submissionId.")
if (commands.submissionId.isEmpty) {
throw new IllegalArgumentException(
s"The submission id for the command $commandId is empty. This should not happen."
s"The submission ID for the command ID $commandId is empty. This should not happen."
)
}
if (pendingCommands.contains(TrackedCommandKey(submissionId, commandId))) {
@ -274,7 +279,9 @@ private[commands] class CommandTracker[Context](
()
}
private def getOutputForTimeout(instant: Instant) = {
private def getResponsesForTimeouts(
instant: Instant
): Seq[ContextualizedCompletionResponse] = {
logger.trace("Checking timeouts at {}", instant)
pendingCommands.view.flatMap { case (commandKey, trackingData) =>
if (trackingData.commandTimeout.isBefore(instant)) {
@ -298,32 +305,29 @@ private[commands] class CommandTracker[Context](
}.toSeq
}
private def getOutputForCompletion(
private def getResponsesForCompletion(
completion: Completion
): Seq[Ctx[Context, Either[CompletionFailure, CompletionSuccess]]] = {
val completionDescription = completion.status match {
case Some(StatusProto(code, _, _, _)) if code == Status.Code.OK.value =>
"successful completion of command"
case _ => "failed completion of command"
}
): Seq[ContextualizedCompletionResponse] = {
val commandId = completion.commandId
logger.trace(
"Handling {} {} from submission {}.",
completionDescription,
commandId,
completion.submissionId,
)
val maybeSubmissionId = Option(completion.submissionId).collect {
case id if id.nonEmpty => id
}
val value = pendingCommandKeys(maybeSubmissionId, commandId)
val trackedCommands =
value.flatMap(pendingCommands.remove(_).toList)
logger.trace {
val completionDescription = completion.status match {
case Some(StatusProto(code, _, _, _)) if code == Status.Code.OK.value =>
"successful completion of command"
case _ => "failed completion of command"
}
s"Handling $completionDescription $commandId from submission $maybeSubmissionId."
}
if (trackedCommands.size > 1) {
trackedCommands.map { trackingData =>
val trackedCommandKeys = pendingCommandKeys(maybeSubmissionId, commandId)
val trackingDataForSubmissionId =
trackedCommandKeys.flatMap(pendingCommands.remove(_).toList)
if (trackingDataForSubmissionId.size > 1) {
trackingDataForSubmissionId.map { trackingData =>
Ctx(
trackingData.context,
Left(
@ -331,7 +335,7 @@ private[commands] class CommandTracker[Context](
commandId,
StatusProto.of(
Status.Code.INTERNAL.value(),
s"There are multiple pending commands for the id $commandId. This can only happen for the mutating schema.",
s"There are multiple pending commands with ID: $commandId for submission ID: $maybeSubmissionId. This can only happen for the mutating schema that shouldn't be used anymore, as it doesn't fully support command deduplication.",
Seq.empty,
),
)
@ -339,7 +343,7 @@ private[commands] class CommandTracker[Context](
)
}
} else {
trackedCommands.map(trackingData =>
trackingDataForSubmissionId.map(trackingData =>
Ctx(trackingData.context, tracker.CompletionResponse(completion))
)
}
@ -353,10 +357,10 @@ private[commands] class CommandTracker[Context](
pendingCommands.keys.filter(_.commandId == commandId).toList
}
private def getOutputForTerminalStatusCode(
private def getResponseForTerminalStatusCode(
commandKey: TrackedCommandKey,
status: StatusProto,
): Seq[Ctx[Context, Either[CompletionFailure, CompletionSuccess]]] = {
): Option[ContextualizedCompletionResponse] = {
logger.trace(
s"Handling failure of command ${commandKey.commandId} from submission ${commandKey.submissionId}."
)
@ -380,7 +384,6 @@ private[commands] class CommandTracker[Context](
)
None
}
.toList
}
override def postStop(): Unit = {

View File

@ -162,7 +162,7 @@ class CommandTrackerFlowTest
val actualException = results.expectError()
actualException shouldBe an[IllegalArgumentException]
actualException.getMessage shouldBe s"The submission id for the command $commandId is empty. This should not happen."
actualException.getMessage shouldBe s"The submission ID for the command ID $commandId is empty. This should not happen."
}
}
@ -520,7 +520,7 @@ class CommandTrackerFlowTest
commandId = commandId,
grpcStatus = StatusProto.of(
Status.Code.INTERNAL.value(),
s"There are multiple pending commands for the id $commandId. This can only happen for the mutating schema.",
s"There are multiple pending commands with ID: $commandId for submission ID: None. This can only happen for the mutating schema that shouldn't be used anymore, as it doesn't fully support command deduplication.",
Seq.empty,
),
)