mirror of
https://github.com/digital-asset/daml.git
synced 2024-09-20 01:07:18 +03:00
converting server errors to proper client errors (#11184)
* a model for trapping client errors in Scala bindings shim and reporting correctly * clean up some nesting with an alias * filter out client-side command service errors * fix flattening error propagation of CommandService errors in endpoints * remove todo * Daml evaluation triggers INVALID_ARGUMENT; handle this for creates/exercises * clean up lookupResult * remove stripLeft utility; it is unused * proper error propagation for /parties endpoint * map grpc status codes to HTTP error codes * add a case to pass-through gRPC errors in Endpoints errors * handle gRPC status in all explicit top-level catches * pass through gRPC errors in CommandService as well * treat a gRPC status anywhere in the causal chain as indicating participant-server error * propagate ContractsService errors without assuming they will always be ServerErrors * filter ServerErrors' contents when rendering errorful streams * log errors from websocket output instead of rendering full messages * hide message in ServerError case * remove Aborted * transfer with bad contract ID now returns 409 * mention new error codes * add changelog CHANGELOG_BEGIN - [JSON API] Several kinds of gRPC server errors are now reported with associated HTTP statuses; for example, a Daml-LF interpreter error now returns a 400 instead of a 500, and an exercise on an archived contract returns a 409 Conflict instead of a 500. Errors internal to JSON API (e.g. internal assertion failures) are no longer detailed in the HTTP response; their details are only logged. See `issue #11184 <https://github.com/digital-asset/daml/pull/11184>`__. CHANGELOG_END * remove unused Show and liftErr utility * adapt daml-script to new error codes * adapt typescript tests to new error codes * adapt json-api failure tests to new error codes
This commit is contained in:
parent
525e4ceb5e
commit
f4df1cc4d0
@ -425,21 +425,28 @@ class JsonLedgerClient(
|
||||
})
|
||||
}
|
||||
|
||||
private[this] val SubmissionFailures: Set[StatusCode] = {
|
||||
import StatusCodes._
|
||||
Set(InternalServerError, BadRequest, Conflict)
|
||||
}
|
||||
|
||||
def commandRequest[In, Out](endpoint: String, argument: In)(implicit
|
||||
argumentWriter: JsonWriter[In],
|
||||
outputReader: RootJsonReader[Out],
|
||||
): Future[Either[StatusRuntimeException, Out]] = {
|
||||
request[In, Out](uri.path./("v1")./(endpoint), argument).flatMap {
|
||||
case ErrorResponse(errors, status) if status == StatusCodes.InternalServerError =>
|
||||
case ErrorResponse(errors, status) if SubmissionFailures(status) =>
|
||||
// TODO (MK) Using a grpc exception here doesn’t make that much sense.
|
||||
// We should refactor this to provide something more general.
|
||||
Future.successful(
|
||||
Left(new StatusRuntimeException(Status.UNKNOWN.withDescription(errors.toString)))
|
||||
)
|
||||
case ErrorResponse(errors, status) =>
|
||||
// A non-500 failure is something like invalid JSON or “cannot resolve template ID”.
|
||||
// We don’t want to treat that failures as ones that can be caught
|
||||
// via `submitMustFail` so fail hard.
|
||||
// XXX SC JSON API doesn't distinguish between
|
||||
// 400s that mean something like invalid JSON or “cannot resolve template ID”
|
||||
// and those that mean a submission error or assertion failure.
|
||||
// Ideally, the former would go through this path rather than be treated
|
||||
// as `submitMustFail` success
|
||||
Future.failed(
|
||||
new FailedJsonApiRequest(
|
||||
uri.path./("v1")./(endpoint),
|
||||
|
@ -209,23 +209,27 @@ HTTP Status Codes
|
||||
The **JSON API** reports errors using standard HTTP status codes. It divides HTTP status codes into 3 groups indicating:
|
||||
|
||||
1. success (200)
|
||||
2. failure due to a client-side problem (400, 401, 404)
|
||||
3. failure due to a server-side problem (500)
|
||||
2. failure due to a client-side problem (400, 401, 403, 404, 409, 429)
|
||||
3. failure due to a server-side problem (500, 503)
|
||||
|
||||
The **JSON API** can return one of the following HTTP status codes:
|
||||
|
||||
- 200 - OK
|
||||
- 400 - Bad Request (Client Error)
|
||||
- 401 - Unauthorized, authentication required
|
||||
- 403 - Forbidden, insufficient permissions
|
||||
- 404 - Not Found
|
||||
- 409 - Conflict, contract ID or key missing or duplicated
|
||||
- 429 - Too Many Requests, ledger server has hit configured limit of in-flight commands
|
||||
- 500 - Internal Server Error
|
||||
- 503 - Service Unavailable, ledger server is not running yet or has been shut down
|
||||
|
||||
If a client's HTTP GET or POST request reaches an API endpoint, the corresponding response will always contain a JSON object with a ``status`` field, either an ``errors`` or ``result`` field and an optional ``warnings``:
|
||||
|
||||
.. code-block:: none
|
||||
|
||||
{
|
||||
"status": <400 | 401 | 404 | 500>,
|
||||
"status": <400 | 401 | 403 | 404 | 409 | 429 | 500 | 503>,
|
||||
"errors": <JSON array of strings>, | "result": <JSON object or array>,
|
||||
["warnings": <JSON object> ]
|
||||
}
|
||||
|
@ -547,7 +547,7 @@ test('package API', async () => {
|
||||
await p;
|
||||
expect(true).toBe(false);
|
||||
} catch (exc) {
|
||||
expect(exc.status).toBe(500);
|
||||
expect([400, 404]).toContain(exc.status);
|
||||
expect(exc.errors.length).toBe(1);
|
||||
}
|
||||
};
|
||||
|
@ -61,7 +61,7 @@ final class FailureTests
|
||||
uri,
|
||||
headersWithParties(List(p.unwrap)),
|
||||
)
|
||||
_ = status shouldBe StatusCodes.InternalServerError
|
||||
_ = status shouldBe StatusCodes.ServiceUnavailable
|
||||
(status, out) <- getRequestEncoded(uri.withPath(Uri.Path("/readyz")))
|
||||
_ = status shouldBe StatusCodes.ServiceUnavailable
|
||||
_ = out shouldBe
|
||||
@ -71,7 +71,7 @@ final class FailureTests
|
||||
|""".stripMargin.replace("\r\n", "\n")
|
||||
_ <- inside(output) { case JsObject(fields) =>
|
||||
inside(fields.get("status")) { case Some(JsNumber(code)) =>
|
||||
code shouldBe 500
|
||||
code shouldBe 503
|
||||
}
|
||||
}
|
||||
_ = proxy.enable()
|
||||
|
@ -1138,8 +1138,8 @@ abstract class AbstractHttpServiceIntegrationTest
|
||||
val exerciseJson: JsValue = encodeExercise(encoder)(iouExerciseTransferCommand(contractId))
|
||||
postJsonRequest(uri.withPath(Uri.Path("/v1/exercise")), exerciseJson)
|
||||
.flatMap { case (status, output) =>
|
||||
status shouldBe StatusCodes.InternalServerError
|
||||
assertStatus(output, StatusCodes.InternalServerError)
|
||||
status shouldBe StatusCodes.Conflict
|
||||
assertStatus(output, StatusCodes.Conflict)
|
||||
expectedOneErrorMessage(output) should include(
|
||||
s"Contract could not be found with id ContractId($contractIdString)"
|
||||
)
|
||||
|
@ -21,6 +21,7 @@ import com.daml.http.util.FutureUtil._
|
||||
import com.daml.http.util.IdentifierConverters.refApiIdentifier
|
||||
import com.daml.http.util.Logging.{InstanceUUID, RequestID}
|
||||
import com.daml.http.util.{Commands, Transactions}
|
||||
import LedgerClientJwt.Grpc
|
||||
import com.daml.jwt.domain.Jwt
|
||||
import com.daml.ledger.api.refinements.{ApiTypes => lar}
|
||||
import com.daml.ledger.api.{v1 => lav1}
|
||||
@ -30,7 +31,7 @@ import scalaz.std.scalaFuture._
|
||||
import scalaz.syntax.show._
|
||||
import scalaz.syntax.std.option._
|
||||
import scalaz.syntax.traverse._
|
||||
import scalaz.{-\/, EitherT, Show, \/, \/-}
|
||||
import scalaz.{-\/, EitherT, \/, \/-}
|
||||
|
||||
import scala.concurrent.{ExecutionContext, Future}
|
||||
import scala.util.{Failure, Success}
|
||||
@ -130,7 +131,10 @@ class CommandService(
|
||||
et.run
|
||||
}
|
||||
|
||||
private def logResult[A](op: Symbol, fa: Future[A])(implicit
|
||||
private def logResult[A](
|
||||
op: Symbol,
|
||||
fa: Grpc.EFuture[Grpc.Category.SubmitError, A],
|
||||
)(implicit
|
||||
lc: LoggingContextOf[InstanceUUID with RequestID]
|
||||
): ET[A] = {
|
||||
val opName = op.name
|
||||
@ -138,8 +142,19 @@ class CommandService(
|
||||
fa.transformWith {
|
||||
case Failure(e) =>
|
||||
logger.error(s"$opName failure", e)
|
||||
Future.successful(-\/(Error(None, e.toString)))
|
||||
case Success(a) =>
|
||||
Future.successful(-\/(e match {
|
||||
case Grpc.StatusEnvelope(status) => GrpcError(status)
|
||||
case _ => InternalError(None, e.toString)
|
||||
}))
|
||||
case Success(-\/(e)) =>
|
||||
logger.error(s"$opName failure: ${e.e}: ${e.message}")
|
||||
import Grpc.Category._
|
||||
val tagged = e.e match {
|
||||
case PermissionDenied => -\/(PermissionDenied)
|
||||
case InvalidArgument => \/-(InvalidArgument)
|
||||
}
|
||||
Future.successful(-\/(ClientError(tagged, e.message)))
|
||||
case Success(\/-(a)) =>
|
||||
logger.debug(s"$opName success: $a")
|
||||
Future.successful(\/-(a))
|
||||
}
|
||||
@ -218,7 +233,7 @@ class CommandService(
|
||||
case Seq(x) => \/-(x)
|
||||
case xs @ _ =>
|
||||
-\/(
|
||||
Error(
|
||||
InternalError(
|
||||
Some(Symbol("exactlyOneActiveContract")),
|
||||
s"Expected exactly one active contract, got: $xs",
|
||||
)
|
||||
@ -230,7 +245,10 @@ class CommandService(
|
||||
): Error \/ ImmArraySeq[ActiveContract[lav1.value.Value]] =
|
||||
response.transaction
|
||||
.toRightDisjunction(
|
||||
Error(Some(Symbol("activeContracts")), s"Received response without transaction: $response")
|
||||
InternalError(
|
||||
Some(Symbol("activeContracts")),
|
||||
s"Received response without transaction: $response",
|
||||
)
|
||||
)
|
||||
.flatMap(activeContracts)
|
||||
|
||||
@ -240,7 +258,7 @@ class CommandService(
|
||||
Transactions
|
||||
.allCreatedEvents(tx)
|
||||
.traverse(ActiveContract.fromLedgerApi(_))
|
||||
.leftMap(e => Error(Some(Symbol("activeContracts")), e.shows))
|
||||
.leftMap(e => InternalError(Some(Symbol("activeContracts")), e.shows))
|
||||
}
|
||||
|
||||
private def contracts(
|
||||
@ -248,7 +266,10 @@ class CommandService(
|
||||
): Error \/ List[Contract[lav1.value.Value]] =
|
||||
response.transaction
|
||||
.toRightDisjunction(
|
||||
Error(Some(Symbol("contracts")), s"Received response without transaction: $response")
|
||||
InternalError(
|
||||
Some(Symbol("contracts")),
|
||||
s"Received response without transaction: $response",
|
||||
)
|
||||
)
|
||||
.flatMap(contracts)
|
||||
|
||||
@ -257,7 +278,7 @@ class CommandService(
|
||||
): Error \/ List[Contract[lav1.value.Value]] =
|
||||
Contract
|
||||
.fromTransactionTree(tx)
|
||||
.leftMap(e => Error(Some(Symbol("contracts")), e.shows))
|
||||
.leftMap(e => InternalError(Some(Symbol("contracts")), e.shows))
|
||||
.map(_.toList)
|
||||
|
||||
private def exerciseResult(
|
||||
@ -270,7 +291,7 @@ class CommandService(
|
||||
} yield exResult
|
||||
|
||||
result.toRightDisjunction(
|
||||
Error(
|
||||
InternalError(
|
||||
Some(Symbol("choiceArgument")),
|
||||
s"Cannot get exerciseResult from the first ExercisedEvent of gRPC response: ${a.toString}",
|
||||
)
|
||||
@ -287,16 +308,13 @@ class CommandService(
|
||||
}
|
||||
|
||||
object CommandService {
|
||||
final case class Error(id: Option[Symbol], message: String)
|
||||
|
||||
object Error {
|
||||
implicit val errorShow: Show[Error] = Show shows {
|
||||
case Error(None, message) =>
|
||||
s"CommandService Error, $message"
|
||||
case Error(Some(id), message) =>
|
||||
s"CommandService Error, $id: $message"
|
||||
}
|
||||
}
|
||||
sealed abstract class Error extends Product with Serializable
|
||||
final case class ClientError(
|
||||
id: Grpc.Category.PermissionDenied \/ Grpc.Category.InvalidArgument,
|
||||
message: String,
|
||||
) extends Error
|
||||
final case class GrpcError(status: io.grpc.Status) extends Error
|
||||
final case class InternalError(id: Option[Symbol], message: String) extends Error
|
||||
|
||||
private type ET[A] = EitherT[Future, Error, A]
|
||||
|
||||
|
@ -222,9 +222,8 @@ class ContractsService(
|
||||
|
||||
private def lookupResult(
|
||||
errorOrAc: Option[Error \/ domain.ActiveContract[LfValue]]
|
||||
): Future[Option[domain.ActiveContract[LfValue]]] = {
|
||||
errorOrAc.cata(x => toFuture(x).map(Some(_)), Future.successful(None))
|
||||
}
|
||||
): Future[Option[domain.ActiveContract[LfValue]]] =
|
||||
errorOrAc traverse (toFuture(_))
|
||||
|
||||
private def isContractId(k: domain.ContractId)(a: domain.ActiveContract[LfValue]): Boolean =
|
||||
(a.contractId: domain.ContractId) == k
|
||||
@ -405,7 +404,7 @@ class ContractsService(
|
||||
queryParams: InMemoryQuery,
|
||||
)(implicit
|
||||
lc: LoggingContextOf[InstanceUUID]
|
||||
): Source[Error \/ domain.ActiveContract[LfValue], NotUsed] = {
|
||||
): Source[InternalError \/ domain.ActiveContract[LfValue], NotUsed] = {
|
||||
|
||||
logger.debug(
|
||||
s"Searching in memory, parties: $parties, templateIds: $templateIds, queryParms: $queryParams"
|
||||
@ -423,7 +422,7 @@ class ContractsService(
|
||||
val (errors, converted) = step.toInsertDelete.partitionMapPreservingIds { apiEvent =>
|
||||
domain.ActiveContract
|
||||
.fromLedgerApi(apiEvent)
|
||||
.leftMap(e => Error(Symbol("searchInMemory"), e.shows))
|
||||
.leftMap(e => InternalError(Symbol("searchInMemory"), e.shows))
|
||||
.flatMap(apiAcToLfAc): Error \/ Ac
|
||||
}
|
||||
val convertedInserts = converted.inserts filter { ac =>
|
||||
@ -529,7 +528,7 @@ class ContractsService(
|
||||
ac: domain.ActiveContract[ApiValue]
|
||||
): Error \/ domain.ActiveContract[LfValue] =
|
||||
ac.traverse(ApiValueToLfValueConverter.apiValueToLfValue)
|
||||
.leftMap(e => Error(Symbol("apiAcToLfAc"), e.shows))
|
||||
.leftMap(e => InternalError(Symbol("apiAcToLfAc"), e.shows))
|
||||
|
||||
private[http] def valuePredicate(
|
||||
templateId: domain.TemplateId.RequiredPkg,
|
||||
@ -539,7 +538,7 @@ class ContractsService(
|
||||
|
||||
private def lfValueToJsValue(a: LfValue): Error \/ JsValue =
|
||||
\/.attempt(LfValueCodec.apiValueToJsValue(a))(e =>
|
||||
Error(Symbol("lfValueToJsValue"), e.description)
|
||||
InternalError(Symbol("lfValueToJsValue"), e.description)
|
||||
)
|
||||
|
||||
private[http] def resolveTemplateIds[Tid <: domain.TemplateId.OptionalPkg](
|
||||
@ -644,7 +643,9 @@ object ContractsService {
|
||||
): Source[Error \/ domain.ActiveContract[LfV], NotUsed]
|
||||
}
|
||||
|
||||
case class Error(id: Symbol, message: String)
|
||||
final case class Error(id: Symbol, message: String)
|
||||
private type InternalError = Error
|
||||
private[http] val InternalError: Error.type = Error
|
||||
|
||||
object Error {
|
||||
implicit val errorShow: Show[Error] = Show shows { e =>
|
||||
|
@ -41,12 +41,10 @@ import com.daml.http.util.{ProtobufByteStrings, toLedgerId}
|
||||
import com.daml.jwt.domain.Jwt
|
||||
import com.daml.ledger.api.{v1 => lav1}
|
||||
import com.daml.logging.LoggingContextOf.withEnrichedLoggingContext
|
||||
import com.daml.scalautil.ExceptionOps._
|
||||
import scalaz.std.scalaFuture._
|
||||
import scalaz.syntax.std.option._
|
||||
import scalaz.syntax.show._
|
||||
import scalaz.syntax.traverse._
|
||||
import scalaz.{-\/, EitherT, NonEmptyList, Show, Traverse, \/, \/-}
|
||||
import scalaz.{-\/, EitherT, NonEmptyList, Traverse, \/, \/-}
|
||||
import spray.json._
|
||||
|
||||
import scala.concurrent.duration.FiniteDuration
|
||||
@ -454,7 +452,8 @@ class Endpoints(
|
||||
def allParties(req: HttpRequest)(implicit
|
||||
lc: LoggingContextOf[InstanceUUID with RequestID]
|
||||
): ET[domain.SyncResponse[List[domain.PartyDetails]]] =
|
||||
proxyWithoutCommand((jwt, _) => partiesService.allParties(jwt))(req).map(domain.OkResponse(_))
|
||||
proxyWithoutCommand((jwt, _) => partiesService.allParties(jwt))(req)
|
||||
.flatMap(pd => either(pd map (domain.OkResponse(_))))
|
||||
|
||||
def parties(req: HttpRequest)(implicit
|
||||
lc: LoggingContextOf[InstanceUUID with RequestID]
|
||||
@ -522,31 +521,24 @@ class Endpoints(
|
||||
} yield domain.OkResponse(())
|
||||
|
||||
private def handleFutureEitherFailure[A, B](fa: Future[A \/ B])(implicit
|
||||
A: IntoServerError[A],
|
||||
A: IntoEndpointsError[A],
|
||||
lc: LoggingContextOf[InstanceUUID with RequestID],
|
||||
): Future[Error \/ B] =
|
||||
fa.map(_ leftMap A.run).recover { case NonFatal(e) =>
|
||||
logger.error("Future failed", e)
|
||||
-\/(ServerError(e.description))
|
||||
}
|
||||
fa.map(_ leftMap A.run)
|
||||
.recover(logException("Future") andThen Error.fromThrowable andThen (-\/(_)))
|
||||
|
||||
private def handleFutureFailure[E >: ServerError, A](fa: Future[A])(implicit
|
||||
private def handleFutureFailure[A](fa: Future[A])(implicit
|
||||
lc: LoggingContextOf[InstanceUUID with RequestID]
|
||||
): Future[E \/ A] =
|
||||
fa.map(a => \/-(a)).recover { case NonFatal(e) =>
|
||||
logger.error("Future failed", e)
|
||||
-\/(ServerError(e.description))
|
||||
}
|
||||
): Future[Error \/ A] =
|
||||
fa.map(a => \/-(a)).recover(logException("Future") andThen Error.fromThrowable andThen (-\/(_)))
|
||||
|
||||
private def handleSourceFailure[E: Show, A](implicit
|
||||
lc: LoggingContextOf[InstanceUUID with RequestID]
|
||||
private def handleSourceFailure[E, A](implicit
|
||||
E: IntoEndpointsError[E],
|
||||
lc: LoggingContextOf[InstanceUUID with RequestID],
|
||||
): Flow[E \/ A, Error \/ A, NotUsed] =
|
||||
Flow
|
||||
.fromFunction((_: E \/ A).liftErr[Error](ServerError))
|
||||
.recover { case NonFatal(e) =>
|
||||
logger.error("Source failed", e)
|
||||
-\/(ServerError(e.description))
|
||||
}
|
||||
.fromFunction((_: E \/ A).leftMap(E.run))
|
||||
.recover(logException("Source") andThen Error.fromThrowable andThen (-\/(_)))
|
||||
|
||||
private def httpResponse(
|
||||
output: Future[Error \/ SearchResult[Error \/ JsValue]]
|
||||
@ -556,9 +548,14 @@ class Endpoints(
|
||||
case -\/(e) => httpResponseError(e)
|
||||
case \/-(searchResult) => httpResponse(searchResult)
|
||||
}
|
||||
.recover { case NonFatal(e) =>
|
||||
httpResponseError(ServerError(e.description))
|
||||
}
|
||||
.recover(Error.fromThrowable andThen (httpResponseError(_)))
|
||||
|
||||
private[this] def logException(fromWhat: String)(implicit
|
||||
lc: LoggingContextOf[InstanceUUID with RequestID]
|
||||
): Throwable PartialFunction Throwable = { case NonFatal(e) =>
|
||||
logger.error(s"$fromWhat failed", e)
|
||||
e
|
||||
}
|
||||
|
||||
private def httpResponse(searchResult: SearchResult[Error \/ JsValue]): HttpResponse = {
|
||||
import json.JsonProtocol._
|
||||
@ -566,7 +563,7 @@ class Endpoints(
|
||||
val response: Source[ByteString, NotUsed] = searchResult match {
|
||||
case domain.OkResponse(result, warnings, _) =>
|
||||
val warningsJsVal: Option[JsValue] = warnings.map(SprayJson.encodeUnsafe(_))
|
||||
ResponseFormats.resultJsObject(result, warningsJsVal)
|
||||
ResponseFormats.resultJsObject(result via filterStreamErrors, warningsJsVal)
|
||||
case error: domain.ErrorResponse =>
|
||||
val jsVal: JsValue = SprayJson.encodeUnsafe(error)
|
||||
Source.single(ByteString(jsVal.compactPrint))
|
||||
@ -579,6 +576,12 @@ class Endpoints(
|
||||
)
|
||||
}
|
||||
|
||||
private[this] def filterStreamErrors[E, A]: Flow[Error \/ A, Error \/ A, NotUsed] =
|
||||
Flow[Error \/ A].map {
|
||||
case -\/(ServerError(_)) => -\/(ServerError("internal server error"))
|
||||
case o => o
|
||||
}
|
||||
|
||||
private def httpResponse[A: JsonWriter](
|
||||
result: ET[domain.SyncResponse[A]]
|
||||
)(implicit metrics: Metrics): Future[HttpResponse] = {
|
||||
@ -598,9 +601,7 @@ class Endpoints(
|
||||
status = status,
|
||||
)
|
||||
}
|
||||
.recover { case NonFatal(e) =>
|
||||
httpResponseError(ServerError(e.description))
|
||||
},
|
||||
.recover(Error.fromThrowable andThen (httpResponseError(_))),
|
||||
)
|
||||
}
|
||||
|
||||
@ -747,12 +748,27 @@ object Endpoints {
|
||||
|
||||
private type LfValue = lf.value.Value
|
||||
|
||||
private final class IntoServerError[-A](val run: A => Error) extends AnyVal
|
||||
private object IntoServerError extends IntoServerErrorLow {
|
||||
implicit val id: IntoServerError[Error] = new IntoServerError(identity)
|
||||
}
|
||||
private sealed abstract class IntoServerErrorLow {
|
||||
implicit def shown[A: Show]: IntoServerError[A] = new IntoServerError(a => ServerError(a.shows))
|
||||
private final class IntoEndpointsError[-A](val run: A => Error) extends AnyVal
|
||||
private object IntoEndpointsError {
|
||||
import LedgerClientJwt.Grpc.Category
|
||||
|
||||
implicit val id: IntoEndpointsError[Error] = new IntoEndpointsError(identity)
|
||||
|
||||
implicit val fromCommands: IntoEndpointsError[CommandService.Error] = new IntoEndpointsError({
|
||||
case CommandService.InternalError(id, message) =>
|
||||
ServerError(s"command service error, ${id.cata(sym => s"${sym.name}: ", "")}$message")
|
||||
case CommandService.GrpcError(status) =>
|
||||
ParticipantServerError(status.getCode, Option(status.getDescription))
|
||||
case CommandService.ClientError(-\/(Category.PermissionDenied), message) =>
|
||||
Unauthorized(message)
|
||||
case CommandService.ClientError(\/-(Category.InvalidArgument), message) =>
|
||||
InvalidUserInput(message)
|
||||
})
|
||||
|
||||
implicit val fromContracts: IntoEndpointsError[ContractsService.Error] =
|
||||
new IntoEndpointsError({ case ContractsService.InternalError(id, msg) =>
|
||||
ServerError(s"contracts service error, ${id.name}: $msg")
|
||||
})
|
||||
}
|
||||
|
||||
private def lfValueToJsValue(a: LfValue): Error \/ JsValue =
|
||||
|
@ -9,14 +9,18 @@ import akka.http.scaladsl.server.{RequestContext, Route}
|
||||
import akka.util.ByteString
|
||||
import com.daml.http.domain.{JwtPayload, JwtPayloadLedgerIdOnly, JwtWritePayload}
|
||||
import com.daml.http.json.SprayJson
|
||||
import util.GrpcHttpErrorCodes._
|
||||
import com.daml.jwt.domain.{DecodedJwt, Jwt}
|
||||
import com.daml.ledger.api.auth.AuthServiceJWTCodec
|
||||
import com.daml.ledger.api.refinements.{ApiTypes => lar}
|
||||
import com.daml.scalautil.ExceptionOps._
|
||||
import io.grpc.Status.{Code => GrpcCode}
|
||||
import scalaz.syntax.std.option._
|
||||
import scalaz.{-\/, NonEmptyList, Show, \/, \/-}
|
||||
import spray.json.JsValue
|
||||
|
||||
import scala.concurrent.Future
|
||||
import scala.util.control.NonFatal
|
||||
|
||||
object EndpointsCompanion {
|
||||
|
||||
@ -30,15 +34,26 @@ object EndpointsCompanion {
|
||||
|
||||
final case class ServerError(message: String) extends Error
|
||||
|
||||
final case class ParticipantServerError(grpcStatus: GrpcCode, description: Option[String])
|
||||
extends Error
|
||||
|
||||
final case class NotFound(message: String) extends Error
|
||||
|
||||
object Error {
|
||||
implicit val ShowInstance: Show[Error] = Show shows {
|
||||
case InvalidUserInput(e) => s"Endpoints.InvalidUserInput: ${e: String}"
|
||||
case ParticipantServerError(s, d) =>
|
||||
s"Endpoints.ParticipantServerError: ${s: GrpcCode}${d.cata((": " + _), "")}"
|
||||
case ServerError(e) => s"Endpoints.ServerError: ${e: String}"
|
||||
case Unauthorized(e) => s"Endpoints.Unauthorized: ${e: String}"
|
||||
case NotFound(e) => s"Endpoints.NotFound: ${e: String}"
|
||||
}
|
||||
|
||||
def fromThrowable: Throwable PartialFunction Error = {
|
||||
case LedgerClientJwt.Grpc.StatusEnvelope(status) =>
|
||||
ParticipantServerError(status.getCode, Option(status.getDescription))
|
||||
case NonFatal(t) => ServerError(t.description)
|
||||
}
|
||||
}
|
||||
|
||||
trait ParsePayload[A] {
|
||||
@ -142,7 +157,9 @@ object EndpointsCompanion {
|
||||
private[http] def errorResponse(error: Error): domain.ErrorResponse = {
|
||||
val (status, errorMsg): (StatusCode, String) = error match {
|
||||
case InvalidUserInput(e) => StatusCodes.BadRequest -> e
|
||||
case ServerError(e) => StatusCodes.InternalServerError -> e
|
||||
case ParticipantServerError(grpcStatus, d) =>
|
||||
grpcStatus.asAkkaHttpForJsonApi -> s"$grpcStatus${d.cata((": " + _), "")}"
|
||||
case ServerError(_) => StatusCodes.InternalServerError -> "HTTP JSON API Server Error"
|
||||
case Unauthorized(e) => StatusCodes.Unauthorized -> e
|
||||
case NotFound(e) => StatusCodes.NotFound -> e
|
||||
}
|
||||
|
@ -5,7 +5,7 @@ package com.daml.http
|
||||
|
||||
import akka.NotUsed
|
||||
import akka.stream.scaladsl.Source
|
||||
import com.daml.http.util.Logging.{InstanceUUID, RequestID}
|
||||
import util.Logging.{InstanceUUID, RequestID}
|
||||
import com.daml.jwt.domain.Jwt
|
||||
import com.daml.ledger.api
|
||||
import com.daml.ledger.api.v1.package_service
|
||||
@ -22,20 +22,33 @@ import com.daml.ledger.client.withoutledgerid.{LedgerClient => DamlLedgerClient}
|
||||
import com.daml.lf.data.Ref
|
||||
import com.daml.logging.{ContextualizedLogger, LoggingContextOf}
|
||||
import com.google.protobuf
|
||||
import scalaz.OneAnd
|
||||
import io.grpc.Status, Status.Code, Code.{values => _, _}
|
||||
import scalaz.{OneAnd, \/, -\/}
|
||||
import scalaz.syntax.std.boolean._
|
||||
|
||||
import scala.concurrent.{ExecutionContext, Future}
|
||||
import scala.concurrent.{ExecutionContext => EC, Future}
|
||||
import scala.util.control.NonFatal
|
||||
import com.daml.ledger.api.{domain => LedgerApiDomain}
|
||||
|
||||
object LedgerClientJwt {
|
||||
import Grpc.EFuture, Grpc.Category._
|
||||
|
||||
private[this] val logger = ContextualizedLogger.get(getClass)
|
||||
|
||||
// there are other error categories of interest if we wish to propagate
|
||||
// different 5xx errors, but PermissionDenied and InvalidArgument are the only
|
||||
// "client" errors here
|
||||
type SubmitAndWaitForTransaction =
|
||||
(Jwt, SubmitAndWaitRequest) => Future[SubmitAndWaitForTransactionResponse]
|
||||
(
|
||||
Jwt,
|
||||
SubmitAndWaitRequest,
|
||||
) => EFuture[SubmitError, SubmitAndWaitForTransactionResponse]
|
||||
|
||||
type SubmitAndWaitForTransactionTree =
|
||||
(Jwt, SubmitAndWaitRequest) => Future[SubmitAndWaitForTransactionTreeResponse]
|
||||
(
|
||||
Jwt,
|
||||
SubmitAndWaitRequest,
|
||||
) => EFuture[SubmitError, SubmitAndWaitForTransactionTreeResponse]
|
||||
|
||||
type GetTermination =
|
||||
(Jwt, LedgerApiDomain.LedgerId) => Future[Option[Terminates.AtAbsolute]]
|
||||
@ -58,10 +71,13 @@ object LedgerClientJwt {
|
||||
) => Source[Transaction, NotUsed]
|
||||
|
||||
type ListKnownParties =
|
||||
Jwt => Future[List[api.domain.PartyDetails]]
|
||||
Jwt => EFuture[PermissionDenied, List[api.domain.PartyDetails]]
|
||||
|
||||
type GetParties =
|
||||
(Jwt, OneAnd[Set, Ref.Party]) => Future[List[api.domain.PartyDetails]]
|
||||
(
|
||||
Jwt,
|
||||
OneAnd[Set, Ref.Party],
|
||||
) => EFuture[PermissionDenied, List[api.domain.PartyDetails]]
|
||||
|
||||
type AllocateParty =
|
||||
(Jwt, Option[Ref.Party], Option[String]) => Future[api.domain.PartyDetails]
|
||||
@ -89,13 +105,23 @@ object LedgerClientJwt {
|
||||
|
||||
private def bearer(jwt: Jwt): Some[String] = Some(jwt.value: String)
|
||||
|
||||
def submitAndWaitForTransaction(client: DamlLedgerClient): SubmitAndWaitForTransaction =
|
||||
(jwt, req) => client.commandServiceClient.submitAndWaitForTransaction(req, bearer(jwt))
|
||||
def submitAndWaitForTransaction(
|
||||
client: DamlLedgerClient
|
||||
)(implicit ec: EC): SubmitAndWaitForTransaction =
|
||||
(jwt, req) =>
|
||||
client.commandServiceClient
|
||||
.submitAndWaitForTransaction(req, bearer(jwt))
|
||||
.requireHandling(submitErrors)
|
||||
|
||||
def submitAndWaitForTransactionTree(client: DamlLedgerClient): SubmitAndWaitForTransactionTree =
|
||||
(jwt, req) => client.commandServiceClient.submitAndWaitForTransactionTree(req, bearer(jwt))
|
||||
def submitAndWaitForTransactionTree(
|
||||
client: DamlLedgerClient
|
||||
)(implicit ec: EC): SubmitAndWaitForTransactionTree =
|
||||
(jwt, req) =>
|
||||
client.commandServiceClient
|
||||
.submitAndWaitForTransactionTree(req, bearer(jwt))
|
||||
.requireHandling(submitErrors)
|
||||
|
||||
def getTermination(client: DamlLedgerClient)(implicit ec: ExecutionContext): GetTermination =
|
||||
def getTermination(client: DamlLedgerClient)(implicit ec: EC): GetTermination =
|
||||
(jwt, ledgerId) =>
|
||||
client.transactionClient.getLedgerEnd(ledgerId, bearer(jwt)).map {
|
||||
_.offset flatMap {
|
||||
@ -161,11 +187,17 @@ object LedgerClientJwt {
|
||||
}
|
||||
}
|
||||
|
||||
def listKnownParties(client: DamlLedgerClient): ListKnownParties =
|
||||
jwt => client.partyManagementClient.listKnownParties(bearer(jwt))
|
||||
def listKnownParties(client: DamlLedgerClient)(implicit ec: EC): ListKnownParties =
|
||||
jwt =>
|
||||
client.partyManagementClient.listKnownParties(bearer(jwt)).requireHandling {
|
||||
case PERMISSION_DENIED => PermissionDenied
|
||||
}
|
||||
|
||||
def getParties(client: DamlLedgerClient): GetParties =
|
||||
(jwt, partyIds) => client.partyManagementClient.getParties(partyIds, bearer(jwt))
|
||||
def getParties(client: DamlLedgerClient)(implicit ec: EC): GetParties =
|
||||
(jwt, partyIds) =>
|
||||
client.partyManagementClient.getParties(partyIds, bearer(jwt)).requireHandling {
|
||||
case PERMISSION_DENIED => PermissionDenied
|
||||
}
|
||||
|
||||
def allocateParty(client: DamlLedgerClient): AllocateParty =
|
||||
(jwt, identifierHint, displayName) =>
|
||||
@ -195,4 +227,53 @@ object LedgerClientJwt {
|
||||
logger.trace("sending upload dar request to ledger")
|
||||
client.packageManagementClient.uploadDarFile(darFile = byteString, token = bearer(jwt))
|
||||
}
|
||||
|
||||
// a shim error model to stand in for https://github.com/digital-asset/daml/issues/9834
|
||||
object Grpc {
|
||||
type EFuture[E, A] = Future[Error[E] \/ A]
|
||||
|
||||
final case class Error[+E](e: E, message: String)
|
||||
|
||||
private[http] object StatusEnvelope {
|
||||
def unapply(t: Throwable): Option[Status] = t match {
|
||||
case NonFatal(t) =>
|
||||
val s = Status fromThrowable t
|
||||
// fromThrowable uses UNKNOWN if it didn't find one
|
||||
(s.getCode != UNKNOWN) option s
|
||||
case _ => None
|
||||
}
|
||||
}
|
||||
|
||||
// like Code but with types
|
||||
// only needs to contain types that may be reported to the json-api user;
|
||||
// if it is an "internal error" there is no need to call it out for handling
|
||||
// e.g. Unauthenticated never needs to be specially handled, because we should
|
||||
// have caught that the jwt token was missing and reported that to client already
|
||||
object Category {
|
||||
sealed trait SubmitError
|
||||
// XXX SC we might be able to assign singleton types to the Codes instead in 2.13+
|
||||
type PermissionDenied = PermissionDenied.type
|
||||
case object PermissionDenied extends SubmitError
|
||||
type InvalidArgument = InvalidArgument.type
|
||||
case object InvalidArgument extends SubmitError
|
||||
// not *every* singleton here should be a subtype of SubmitError;
|
||||
// think of it more like a Venn diagram
|
||||
|
||||
private[LedgerClientJwt] val submitErrors: Code PartialFunction SubmitError = {
|
||||
case PERMISSION_DENIED => PermissionDenied
|
||||
case INVALID_ARGUMENT => InvalidArgument
|
||||
}
|
||||
|
||||
private[LedgerClientJwt] implicit final class `Future Status Category ops`[A](
|
||||
private val fa: Future[A]
|
||||
) extends AnyVal {
|
||||
def requireHandling[E](c: Code PartialFunction E)(implicit ec: EC): EFuture[E, A] =
|
||||
fa map \/.right[Error[E], A] recover Function.unlift {
|
||||
case StatusEnvelope(status) =>
|
||||
c.lift(status.getCode) map (e => -\/(Error(e, status.asRuntimeException.getMessage)))
|
||||
case _ => None
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -4,10 +4,11 @@
|
||||
package com.daml.http
|
||||
|
||||
import com.daml.lf.data.Ref
|
||||
import com.daml.http.EndpointsCompanion.{Error, InvalidUserInput}
|
||||
import com.daml.http.EndpointsCompanion.{Error, InvalidUserInput, Unauthorized}
|
||||
import com.daml.http.util.FutureUtil._
|
||||
import com.daml.jwt.domain.Jwt
|
||||
import com.daml.ledger.api
|
||||
import LedgerClientJwt.Grpc
|
||||
import scalaz.std.option._
|
||||
import scalaz.std.scalaFuture._
|
||||
import scalaz.std.string._
|
||||
@ -44,9 +45,10 @@ class PartiesService(
|
||||
et.run
|
||||
}
|
||||
|
||||
def allParties(jwt: Jwt): Future[List[domain.PartyDetails]] = {
|
||||
listAllParties(jwt).map(ps => ps.map(p => domain.PartyDetails.fromLedgerApi(p)))
|
||||
}
|
||||
def allParties(jwt: Jwt): Future[Error \/ List[domain.PartyDetails]] =
|
||||
listAllParties(jwt).map(
|
||||
_ bimap (handleGrpcError, (_ map domain.PartyDetails.fromLedgerApi))
|
||||
)
|
||||
|
||||
def parties(
|
||||
jwt: Jwt,
|
||||
@ -54,7 +56,8 @@ class PartiesService(
|
||||
): Future[Error \/ (Set[domain.PartyDetails], Set[domain.Party])] = {
|
||||
val et: ET[(Set[domain.PartyDetails], Set[domain.Party])] = for {
|
||||
apiPartyIds <- either(toLedgerApiPartySet(identifiers)): ET[OneAnd[Set, Ref.Party]]
|
||||
apiPartyDetails <- rightT(getParties(jwt, apiPartyIds)): ET[List[api.domain.PartyDetails]]
|
||||
apiPartyDetails <- eitherT(getParties(jwt, apiPartyIds))
|
||||
.leftMap(handleGrpcError): ET[List[api.domain.PartyDetails]]
|
||||
domainPartyDetails = apiPartyDetails.iterator
|
||||
.map(domain.PartyDetails.fromLedgerApi)
|
||||
.toSet: Set[domain.PartyDetails]
|
||||
@ -82,6 +85,9 @@ object PartiesService {
|
||||
|
||||
private type ET[A] = EitherT[Future, Error, A]
|
||||
|
||||
private def handleGrpcError(e: Grpc.Error[Grpc.Category.PermissionDenied]): Error =
|
||||
Unauthorized(e.message)
|
||||
|
||||
def toLedgerApiPartySet(
|
||||
ps: OneAnd[Set, domain.Party]
|
||||
): InvalidUserInput \/ OneAnd[Set, Ref.Party] = {
|
||||
|
@ -58,6 +58,8 @@ import com.daml.ledger.api.{domain => LedgerApiDomain}
|
||||
object WebSocketService {
|
||||
import util.ErrorOps._
|
||||
|
||||
private val logger = ContextualizedLogger.get(getClass)
|
||||
|
||||
private type CompiledQueries =
|
||||
Map[domain.TemplateId.RequiredPkg, (ValuePredicate, LfV => Boolean)]
|
||||
|
||||
@ -97,6 +99,11 @@ object WebSocketService {
|
||||
) {
|
||||
import JsonProtocol._, spray.json._
|
||||
|
||||
def logHiddenErrors()(implicit lc: LoggingContextOf[InstanceUUID]): Unit =
|
||||
errors foreach { case ServerError(message) =>
|
||||
logger.error(s"while rendering contract: ${message: String}")
|
||||
}
|
||||
|
||||
def render(implicit lfv: LfVT <~< JsValue, pos: Pos <~< Map[String, JsValue]): JsObject = {
|
||||
|
||||
def inj[V: JsonWriter](ctor: String, v: V) = JsObject(ctor -> v.toJson)
|
||||
@ -111,7 +118,9 @@ object WebSocketService {
|
||||
++ inserts.map { case (ac, pos) =>
|
||||
val acj = inj("created", ac)
|
||||
acj copy (fields = acj.fields ++ pos)
|
||||
} ++ errors.map(e => inj("error", e.message)))
|
||||
} ++ errors.map(_ => inj("error", "error rendering contract")))
|
||||
// XXX SC ^ all useful information is now hidden;
|
||||
// can replace with an error count in later API version
|
||||
|
||||
val offsetAfter = step.bookmark.map(_.toJson)
|
||||
|
||||
@ -589,8 +598,6 @@ class WebSocketService(
|
||||
wsConfig: Option[WebsocketConfig],
|
||||
)(implicit mat: Materializer, ec: ExecutionContext) {
|
||||
|
||||
private[this] val logger = ContextualizedLogger.get(getClass)
|
||||
|
||||
import WebSocketService._
|
||||
import com.daml.scalautil.Statement.discard
|
||||
import util.ErrorOps._
|
||||
@ -835,7 +842,10 @@ class WebSocketService(
|
||||
)
|
||||
.map(
|
||||
_.via(removePhantomArchives(remove = Q.removePhantomArchives(request)))
|
||||
.map(_.mapPos(Q.renderCreatedMetadata).render)
|
||||
.map { sae =>
|
||||
sae.logHiddenErrors()
|
||||
sae.mapPos(Q.renderCreatedMetadata).render
|
||||
}
|
||||
.prepend(reportUnresolvedTemplateIds(unresolved))
|
||||
.map(jsv => \/-(wsMessage(jsv)))
|
||||
)
|
||||
|
@ -27,8 +27,5 @@ object ErrorOps {
|
||||
extends AnyRef {
|
||||
def liftErr[M](f: String => M)(implicit L: Show[L]): EitherT[F, M, R] =
|
||||
self leftMap (e => f(e.shows))
|
||||
|
||||
def liftErrS[M](msg: String)(f: String => M)(implicit L: Show[L]): EitherT[F, M, R] =
|
||||
liftErr(x => f(msg + " " + x))
|
||||
}
|
||||
}
|
||||
|
@ -4,9 +4,9 @@
|
||||
package com.daml.http.util
|
||||
|
||||
import scalaz.syntax.show._
|
||||
import scalaz.{-\/, Applicative, EitherT, Functor, Show, \/, \/-}
|
||||
import scalaz.{Applicative, EitherT, Functor, Show, \/}
|
||||
|
||||
import scala.concurrent.{ExecutionContext, Future}
|
||||
import scala.concurrent.Future
|
||||
import scala.util.Try
|
||||
|
||||
object FutureUtil {
|
||||
@ -40,12 +40,4 @@ object FutureUtil {
|
||||
|
||||
def leftT[A, B](fa: Future[A])(implicit ev: Functor[Future]): EitherT[Future, A, B] =
|
||||
EitherT.leftT(fa)
|
||||
|
||||
def stripLeft[A: Show, B](fa: Future[A \/ B])(implicit ec: ExecutionContext): Future[B] =
|
||||
fa.flatMap {
|
||||
case -\/(e) =>
|
||||
Future.failed(new IllegalStateException(e.shows))
|
||||
case \/-(a) =>
|
||||
Future.successful(a)
|
||||
}
|
||||
}
|
||||
|
@ -0,0 +1,40 @@
|
||||
// Copyright (c) 2021 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
package com.daml.http
|
||||
package util
|
||||
|
||||
private[http] object GrpcHttpErrorCodes {
|
||||
import io.grpc.Status.{Code => G}
|
||||
import akka.http.scaladsl.model.{StatusCode, StatusCodes => A}
|
||||
|
||||
implicit final class `gRPC status as akka http`(private val self: G) extends AnyVal {
|
||||
// some version of this mapping _should_ already exist somewhere, right? -SC
|
||||
def asAkkaHttp: StatusCode = self match {
|
||||
case G.OK => A.OK
|
||||
case G.INVALID_ARGUMENT | G.FAILED_PRECONDITION | G.OUT_OF_RANGE => A.BadRequest
|
||||
case G.UNAUTHENTICATED => A.Unauthorized
|
||||
case G.PERMISSION_DENIED => A.Forbidden
|
||||
case G.NOT_FOUND => A.NotFound
|
||||
case G.ABORTED | G.ALREADY_EXISTS => A.Conflict
|
||||
case G.RESOURCE_EXHAUSTED => A.TooManyRequests
|
||||
case G.CANCELLED => ClientClosedRequest
|
||||
case G.DATA_LOSS | G.UNKNOWN | G.INTERNAL => A.InternalServerError
|
||||
case G.UNIMPLEMENTED => A.NotImplemented
|
||||
case G.UNAVAILABLE => A.ServiceUnavailable
|
||||
case G.DEADLINE_EXCEEDED => A.GatewayTimeout
|
||||
}
|
||||
|
||||
def asAkkaHttpForJsonApi: StatusCode = self match {
|
||||
case G.UNAUTHENTICATED | G.CANCELLED => A.InternalServerError
|
||||
case _ => self.asAkkaHttp
|
||||
}
|
||||
}
|
||||
|
||||
private[this] val ClientClosedRequest =
|
||||
A.custom(
|
||||
499,
|
||||
"Client Closed Request",
|
||||
"The client closed the request before the server could respond.",
|
||||
)
|
||||
}
|
Loading…
Reference in New Issue
Block a user