[JSON-API] Add moar timing metrics (#10045)

* [JSON-API] Add more timing metrics

changelog_begin

- [JSON-API] Timing metrics are now available for party management, package management, command submission and query endpoints.
- [JSON-API] Also added a timing metric for parsing and decoding of incoming json payloads

changelog_end

* Add comments to new metrics

* Split metrics up more & remove obsolete metric

* Split up timers for query endpoints
This commit is contained in:
Victor Peter Rouven Müller 2021-06-21 16:37:36 +02:00 committed by GitHub
parent e12a449c81
commit e4585295c6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 114 additions and 40 deletions

View File

@ -17,6 +17,7 @@ import akka.http.scaladsl.model.headers.{
import akka.stream.Materializer
import akka.stream.scaladsl.{Flow, Source}
import akka.util.ByteString
import com.codahale.metrics.Timer
import com.daml.lf
import com.daml.http.ContractsService.SearchResult
import com.daml.http.EndpointsCompanion._
@ -66,64 +67,105 @@ class Endpoints(
import util.ErrorOps._
import Uri.Path._
// Parenthesis in the case matches below are required because otherwise scalafmt breaks.
//noinspection ScalaUnnecessaryParentheses
def all(implicit
lc: LoggingContextOf[InstanceUUID],
metrics: Metrics,
): PartialFunction[HttpRequest, Http.IncomingConnection => Future[HttpResponse]] = {
val dispatch: PartialFunction[HttpRequest, LoggingContextOf[
InstanceUUID with RequestID
] => Future[HttpResponse]] = {
// Parenthesis are required because otherwise scalafmt breaks.
val apiMetrics = metrics.daml.HttpJsonApi
type DispatchFun =
PartialFunction[HttpRequest, LoggingContextOf[InstanceUUID with RequestID] => Future[
HttpResponse
]]
def mkDispatchFunWithTimer(timer: Timer)(fun: DispatchFun): DispatchFun =
fun andThen (f => lc => Timed.future(timer, f(lc)))
val commandDispatch =
mkDispatchFunWithTimer(apiMetrics.commandSubmissionTimer) {
case req @ HttpRequest(POST, Uri.Path("/v1/create"), _, _, _) =>
(implicit lc => httpResponse(create(req)))
case req @ HttpRequest(POST, Uri.Path("/v1/exercise"), _, _, _) =>
(implicit lc => httpResponse(exercise(req)))
case req @ HttpRequest(POST, Uri.Path("/v1/create-and-exercise"), _, _, _) =>
(implicit lc => httpResponse(createAndExercise(req)))
case req @ HttpRequest(POST, Uri.Path("/v1/fetch"), _, _, _) =>
(implicit lc => httpResponse(fetch(req)))
}
val queryAllDispatch = mkDispatchFunWithTimer(apiMetrics.queryAllTimer) {
case req @ HttpRequest(GET, Uri.Path("/v1/query"), _, _, _) =>
(implicit lc => httpResponse(retrieveAll(req)))
}
val queryMatchingDispatch = mkDispatchFunWithTimer(apiMetrics.queryMatchingTimer) {
case req @ HttpRequest(POST, Uri.Path("/v1/query"), _, _, _) =>
(implicit lc => httpResponse(query(req)))
}
val fetchDispatch: DispatchFun = {
case req @ HttpRequest(POST, Uri.Path("/v1/fetch"), _, _, _) =>
(implicit lc => Timed.future(apiMetrics.fetchTimer, httpResponse(fetch(req))))
}
val getPartyDispatch = mkDispatchFunWithTimer(apiMetrics.getPartyTimer) {
case req @ HttpRequest(GET, Uri.Path("/v1/parties"), _, _, _) =>
(implicit lc => httpResponse(allParties(req)))
case req @ HttpRequest(POST, Uri.Path("/v1/parties"), _, _, _) =>
(implicit lc => httpResponse(parties(req)))
}
val allocatePartyDispatch: DispatchFun = {
case req @ HttpRequest(POST, Uri.Path("/v1/parties/allocate"), _, _, _) =>
(implicit lc => httpResponse(allocateParty(req)))
(
implicit lc =>
Timed.future(apiMetrics.allocatePartyTimer, httpResponse(allocateParty(req)))
)
}
val packageManagementDispatch: DispatchFun = {
case req @ HttpRequest(GET, Uri.Path("/v1/packages"), _, _, _) =>
(implicit lc => httpResponse(listPackages(req)))
case req @ HttpRequest(POST, Uri.Path("/v1/packages"), _, _, _) =>
(
implicit lc =>
Timed.future(
apiMetrics.uploadPackageTimer,
httpResponse(uploadDarFile(req)),
)
)
// format: off
case req @ HttpRequest(GET,
Uri(_, _, Slash(Segment("v1", Slash(Segment("packages", Slash(Segment(packageId, Empty)))))), _, _),
_, _, _) => (implicit lc => downloadPackage(req, packageId))
_, _, _) =>
(implicit lc => Timed.future(apiMetrics.downloadPackageTimer, downloadPackage(req, packageId)))
// format: on
case req @ HttpRequest(POST, Uri.Path("/v1/packages"), _, _, _) =>
(implicit lc => httpResponse(uploadDarFile(req)))
}
val liveOrHealthDispatch: DispatchFun = {
case HttpRequest(GET, Uri.Path("/livez"), _, _, _) =>
_ => Future.successful(HttpResponse(status = StatusCodes.OK))
case HttpRequest(GET, Uri.Path("/readyz"), _, _, _) =>
_ => healthService.ready().map(_.toHttpResponse)
}
import scalaz.std.partialFunction._, scalaz.syntax.arrow._
(dispatch &&& { case r => r }) andThen { case (lcFhr, req) =>
((commandDispatch orElse
queryAllDispatch orElse
queryMatchingDispatch orElse
fetchDispatch orElse
getPartyDispatch orElse
allocatePartyDispatch orElse
packageManagementDispatch orElse
liveOrHealthDispatch) &&& { case r => r }) andThen { case (lcFhr, req) =>
(connection: Http.IncomingConnection) =>
extendWithRequestIdLogCtx(implicit lc => {
val t0 = System.nanoTime
logger.info(s"Incoming request on ${req.uri} from ${connection.remoteAddress}")
metrics.daml.HttpJsonApi.httpRequestThroughput.mark()
Timed
.future(metrics.daml.HttpJsonApi.httpRequestTimer, lcFhr(lc))
.map(res => {
logger.trace(s"Processed request after ${System.nanoTime() - t0}ns")
logger.info(s"Responding to client with HTTP ${res.status}")
res
})
for {
res <- lcFhr(lc)
_ = logger.trace(s"Processed request after ${System.nanoTime() - t0}ns")
_ = logger.info(s"Responding to client with HTTP ${res.status}")
} yield res
})
}
}
def getParseAndDecodeTimerCtx()(implicit
metrics: Metrics
): ET[Timer.Context] =
EitherT.pure(metrics.daml.HttpJsonApi.incomingJsonParsingAndValidationTimer.time())
def withJwtPayloadLoggingContext[A](jwtPayload: JwtPayloadG)(
fn: LoggingContextOf[JwtPayloadTag with InstanceUUID with RequestID] => A
)(implicit lc: LoggingContextOf[InstanceUUID with RequestID]): A =
@ -142,6 +184,7 @@ class Endpoints(
Jwt,
JwtWritePayload,
JsValue,
Timer.Context,
) => LoggingContextOf[JwtPayloadTag with InstanceUUID with RequestID] => ET[
T[ApiValue]
]
@ -152,10 +195,13 @@ class Endpoints(
metrics: Metrics,
): ET[domain.SyncResponse[JsValue]] =
for {
parseAndDecodeTimerCtx <- getParseAndDecodeTimerCtx()
_ <- EitherT.pure(metrics.daml.HttpJsonApi.commandSubmissionThroughput.mark())
t3 <- inputJsValAndJwtPayload(req): ET[(Jwt, JwtWritePayload, JsValue)]
(jwt, jwtPayload, reqBody) = t3
resp <- withJwtPayloadLoggingContext(jwtPayload)(fn(jwt, jwtPayload, reqBody))
resp <- withJwtPayloadLoggingContext(jwtPayload)(
fn(jwt, jwtPayload, reqBody, parseAndDecodeTimerCtx)
)
jsVal <- either(SprayJson.encode1(resp).liftErr(ServerError)): ET[JsValue]
} yield domain.OkResponse(jsVal)
@ -163,11 +209,12 @@ class Endpoints(
lc: LoggingContextOf[InstanceUUID with RequestID],
metrics: Metrics,
): ET[domain.SyncResponse[JsValue]] =
handleCommand(req) { (jwt, jwtPayload, reqBody) => implicit lc =>
handleCommand(req) { (jwt, jwtPayload, reqBody, parseAndDecodeTimerCtx) => implicit lc =>
for {
cmd <- either(
decoder.decodeCreateCommand(reqBody).liftErr(InvalidUserInput)
): ET[domain.CreateCommand[ApiRecord, TemplateId.RequiredPkg]]
_ <- EitherT.pure(parseAndDecodeTimerCtx.close())
ac <- eitherT(
handleFutureEitherFailure(commandService.create(jwt, jwtPayload, cmd))
@ -179,12 +226,12 @@ class Endpoints(
lc: LoggingContextOf[InstanceUUID with RequestID],
metrics: Metrics,
): ET[domain.SyncResponse[JsValue]] =
handleCommand(req) { (jwt, jwtPayload, reqBody) => implicit lc =>
handleCommand(req) { (jwt, jwtPayload, reqBody, parseAndDecodeTimerCtx) => implicit lc =>
for {
cmd <- either(
decoder.decodeExerciseCommand(reqBody).liftErr(InvalidUserInput)
): ET[domain.ExerciseCommand[LfValue, domain.ContractLocator[LfValue]]]
_ <- EitherT.pure(parseAndDecodeTimerCtx.close())
resolvedRef <- eitherT(
resolveReference(jwt, jwtPayload, cmd.reference)
): ET[domain.ResolvedContractRef[ApiValue]]
@ -206,11 +253,12 @@ class Endpoints(
lc: LoggingContextOf[InstanceUUID with RequestID],
metrics: Metrics,
): ET[domain.SyncResponse[JsValue]] =
handleCommand(req) { (jwt, jwtPayload, reqBody) => implicit lc =>
handleCommand(req) { (jwt, jwtPayload, reqBody, parseAndDecodeTimerCtx) => implicit lc =>
for {
cmd <- either(
decoder.decodeCreateAndExerciseCommand(reqBody).liftErr(InvalidUserInput)
): ET[domain.CreateAndExerciseCommand[ApiRecord, ApiValue, TemplateId.RequiredPkg]]
_ <- EitherT.pure(parseAndDecodeTimerCtx.close())
resp <- eitherT(
handleFutureEitherFailure(
@ -221,9 +269,11 @@ class Endpoints(
}
def fetch(req: HttpRequest)(implicit
lc: LoggingContextOf[InstanceUUID with RequestID]
lc: LoggingContextOf[InstanceUUID with RequestID],
metrics: Metrics,
): ET[domain.SyncResponse[JsValue]] =
for {
parseAndDecodeTimerCtx <- getParseAndDecodeTimerCtx()
input <- inputJsValAndJwtPayload(req): ET[(Jwt, JwtPayload, JsValue)]
(jwt, jwtPayload, reqBody) = input
@ -235,7 +285,7 @@ class Endpoints(
cl <- either(
decoder.decodeContractLocator(reqBody).liftErr(InvalidUserInput)
): ET[domain.ContractLocator[LfValue]]
_ <- EitherT.pure(parseAndDecodeTimerCtx.close())
_ = logger.debug(s"/v1/fetch cl: $cl")
ac <- eitherT(
@ -251,10 +301,15 @@ class Endpoints(
} yield domain.OkResponse(jsVal)
def retrieveAll(req: HttpRequest)(implicit
lc: LoggingContextOf[InstanceUUID with RequestID]
): Future[Error \/ SearchResult[Error \/ JsValue]] =
inputAndJwtPayload[JwtPayload](req).map {
lc: LoggingContextOf[InstanceUUID with RequestID],
metrics: Metrics,
): Future[Error \/ SearchResult[Error \/ JsValue]] = for {
parseAndDecodeTimerCtx <- Future(
metrics.daml.HttpJsonApi.incomingJsonParsingAndValidationTimer.time()
)
res <- inputAndJwtPayload[JwtPayload](req).map {
_.map { case (jwt, jwtPayload, _) =>
parseAndDecodeTimerCtx.close()
withJwtPayloadLoggingContext(jwtPayload) { implicit lc =>
val result: SearchResult[ContractsService.Error \/ domain.ActiveContract[LfValue]] =
contractsService.retrieveAll(jwt, jwtPayload)
@ -267,6 +322,7 @@ class Endpoints(
}
}
}
} yield res
def query(req: HttpRequest)(implicit
lc: LoggingContextOf[InstanceUUID with RequestID]
@ -341,9 +397,10 @@ class Endpoints(
metrics: Metrics,
): ET[domain.SyncResponse[Unit]] =
for {
parseAndDecodeTimerCtx <- getParseAndDecodeTimerCtx()
_ <- EitherT.pure(metrics.daml.HttpJsonApi.uploadPackagesThroughput.mark())
t2 <- either(inputSource(req)): ET[(Jwt, Source[ByteString, Any])]
_ <- EitherT.pure(parseAndDecodeTimerCtx.close())
(jwt, source) = t2
_ <- eitherT(

View File

@ -706,8 +706,25 @@ final class Metrics(val registry: MetricRegistry) {
object HttpJsonApi {
private val Prefix: MetricName = daml.Prefix :+ "http_json_api"
// Meters how long processing of a request takes
val httpRequestTimer: Timer = registry.timer(Prefix :+ "http_request_timing")
// Meters how long processing of a command submission request takes
val commandSubmissionTimer: Timer = registry.timer(Prefix :+ "command_submission_timing")
// Meters how long processing of a query GET request takes
val queryAllTimer: Timer = registry.timer(Prefix :+ "query_all_timing")
// Meters how long processing of a query POST request takes
val queryMatchingTimer: Timer = registry.timer(Prefix :+ "query_matching_timing")
// Meters how long processing of a fetch request takes
val fetchTimer: Timer = registry.timer(Prefix :+ "fetch_timing")
// Meters how long processing of a get party/parties request takes
val getPartyTimer: Timer = registry.timer(Prefix :+ "get_party_timing")
// Meters how long processing of a party management request takes
val allocatePartyTimer: Timer = registry.timer(Prefix :+ "allocate_party_timing")
// Meters how long processing of a package management request takes
val downloadPackageTimer: Timer = registry.timer(Prefix :+ "download_package_timing")
// Meters how long processing of a package upload request takes
val uploadPackageTimer: Timer = registry.timer(Prefix :+ "upload_package_timing")
// Meters how long parsing and decoding of an incoming json payload takes
val incomingJsonParsingAndValidationTimer: Timer =
registry.timer(Prefix :+ "incoming_json_parsing_and_validation_timing")
// Meters http requests throughput
val httpRequestThroughput: Meter = registry.meter(Prefix :+ "http_request_throughput")
// Meters how many websocket connections are currently active