13355 removed proxy* methods from RouteSetup (#15458)

* WIP

* do it on getuesr first

* remove old getUesr

* createUser

* createUser

* user post paths

* user post paths

* WIP

* removed routesetup from UserManagement

* refactor parties and allocateParty

* remove proxyWithCommand

* refactor PackagesAndDars

* remove proxyWithoutCommand

* refactor  PackagesAndDars

* merge from main

* move UploadDarFile implementation back to package and dars
This commit is contained in:
Chun Lok Ling 2022-12-12 14:38:24 +00:00 committed by GitHub
parent 9875fd0ef9
commit f23f5d02af
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 310 additions and 289 deletions

View File

@ -16,7 +16,8 @@ import akka.util.ByteString
import ContractsService.SearchResult
import EndpointsCompanion._
import json._
import util.FutureUtil.either
import util.toLedgerId
import util.FutureUtil.{either, rightT}
import util.Logging.{InstanceUUID, RequestID, extendWithRequestIdLogCtx}
import com.daml.logging.LoggingContextOf.withEnrichedLoggingContext
import scalaz.std.scalaFuture._
@ -31,10 +32,13 @@ import com.daml.http.metrics.HttpJsonApiMetrics
import com.daml.logging.{ContextualizedLogger, LoggingContextOf}
import com.daml.metrics.Timed
import akka.http.scaladsl.server.Directives._
import com.daml.http.endpoints.MeteringReportEndpoint
import com.daml.http.endpoints.{MeteringReportEndpoint, RouteSetup}
import com.daml.jwt.domain.Jwt
import com.daml.ledger.api.{domain => LedgerApiDomain}
import com.daml.ledger.client.services.admin.UserManagementClient
import com.daml.ledger.client.services.identity.LedgerIdentityClient
import com.daml.metrics.api.MetricHandle.Timer
import scalaz.EitherT.eitherT
import scala.util.control.NonFatal
@ -69,26 +73,23 @@ class Endpoints(
import commandsHelper._
private[this] val userManagement: endpoints.UserManagement = new endpoints.UserManagement(
routeSetup = routeSetup,
decodeJwt = decodeJwt,
userManagementClient,
)
import userManagement._
private[this] val packagesDars: endpoints.PackagesAndDars = new endpoints.PackagesAndDars(
routeSetup = routeSetup,
packageManagementService,
)
private[this] val packagesDars: endpoints.PackagesAndDars =
new endpoints.PackagesAndDars(routeSetup, packageManagementService)
import packagesDars._
private[this] val meteringReportEndpoint =
new MeteringReportEndpoint(routeSetup, meteringReportService)
new MeteringReportEndpoint(meteringReportService)
private[this] val contractList: endpoints.ContractList =
new endpoints.ContractList(routeSetup, decoder, contractsService)
import contractList._
private[this] val partiesEP: endpoints.Parties = new endpoints.Parties(routeSetup, partiesService)
private[this] val partiesEP: endpoints.Parties = new endpoints.Parties(partiesService)
import partiesEP._
private[this] val logger = ContextualizedLogger.get(getClass)
@ -109,6 +110,86 @@ class Endpoints(
): Route =
responseToRoute(httpResponse(res))
private def toPostRoute[Req: JsonReader, Res: JsonWriter](
httpRequest: HttpRequest,
fn: (Jwt, Req) => ET[domain.SyncResponse[Res]],
)(implicit
lc: LoggingContextOf[InstanceUUID with RequestID],
metrics: HttpJsonApiMetrics,
): Route = {
val res = for {
t <- routeSetup.inputJsVal(httpRequest): ET[(Jwt, JsValue)]
(jwt, reqBody) = t
req <- either(SprayJson.decode[Req](reqBody).liftErr(InvalidUserInput)): ET[Req]
res <- eitherT(RouteSetup.handleFutureEitherFailure(fn(jwt, req).run)): ET[
domain.SyncResponse[Res]
]
} yield res
responseToRoute(httpResponse(res))
}
private def toGetRoute[Res](
httpRequest: HttpRequest,
fn: (Jwt) => ET[domain.SyncResponse[Res]],
)(implicit
lc: LoggingContextOf[InstanceUUID with RequestID],
mkHttpResponse: MkHttpResponse[ET[domain.SyncResponse[Res]]],
): Route = {
val res = for {
t <- eitherT(routeSetup.input(httpRequest)): ET[(Jwt, String)]
(jwt, _) = t
res <- eitherT(RouteSetup.handleFutureEitherFailure(fn(jwt).run)): ET[
domain.SyncResponse[Res]
]
} yield res
responseToRoute(httpResponse(res))
}
private def toGetRouteLedgerId[Res](
httpRequest: HttpRequest,
fn: (Jwt, LedgerApiDomain.LedgerId) => ET[domain.SyncResponse[Res]],
)(implicit
lc: LoggingContextOf[InstanceUUID with RequestID],
mkHttpResponse: MkHttpResponse[ET[domain.SyncResponse[Res]]],
): Route = {
val res = for {
t <- extractJwtAndLedgerId(httpRequest)
(jwt, ledgerId) = t
res <- eitherT(
RouteSetup.handleFutureEitherFailure(fn(jwt, ledgerId).run)
): ET[domain.SyncResponse[Res]]
} yield res
responseToRoute(httpResponse(res))
}
private def toDownloadPackageRoute[Res](
httpRequest: HttpRequest,
packageId: String,
fn: (Jwt, LedgerApiDomain.LedgerId, String) => Future[HttpResponse],
)(implicit
lc: LoggingContextOf[InstanceUUID with RequestID]
): Route = {
responseToRoute(
httpResponse(
extractJwtAndLedgerId(httpRequest).flatMap { case (jwt, ledgerId) =>
rightT(fn(jwt, ledgerId, packageId))
}
)
)
}
private def extractJwtAndLedgerId(
httpRequest: HttpRequest
)(implicit
lc: LoggingContextOf[InstanceUUID with RequestID]
): ET[(Jwt, LedgerApiDomain.LedgerId)] = for {
t <- routeSetup
.inputAndJwtPayload[domain.JwtPayloadLedgerIdOnly](httpRequest): ET[
(Jwt, domain.JwtPayloadLedgerIdOnly, String)
]
(jwt, jwtBody, _) = t
} yield (jwt, toLedgerId(jwtBody.ledgerId))
private def mkRequestLogMsg(request: HttpRequest, remoteAddress: RemoteAddress) =
s"Incoming ${request.method.value} request on ${request.uri} from $remoteAddress"
@ -265,33 +346,31 @@ class Endpoints(
),
path("query") & withTimer(queryMatchingTimer) apply toRoute(query(req)),
path("fetch") & withFetchTimer apply toRoute(fetch(req)),
path("user") apply toRoute(getUser(req)),
path("user" / "create") apply toRoute(createUser(req)),
path("user" / "delete") apply toRoute(deleteUser(req)),
path("user" / "rights") apply toRoute(listUserRights(req)),
path("user" / "rights" / "grant") apply toRoute(grantUserRights(req)),
path("user" / "rights" / "revoke") apply toRoute(revokeUserRights(req)),
path("parties") & withFetchTimer apply toRoute(parties(req)),
path("parties" / "allocate") & withTimer(
allocatePartyTimer
) apply toRoute(allocateParty(req)),
path("user") apply toPostRoute(req, getUser),
path("user" / "create") apply toPostRoute(req, createUser),
path("user" / "delete") apply toPostRoute(req, deleteUser),
path("user" / "rights") apply toPostRoute(req, listUserRights),
path("user" / "rights" / "grant") apply toPostRoute(req, grantUserRights),
path("user" / "rights" / "revoke") apply toPostRoute(req, revokeUserRights),
path("parties") & withFetchTimer apply toPostRoute(req, parties),
path("parties" / "allocate") & withTimer(allocatePartyTimer) apply toPostRoute(
req,
allocateParty,
),
path("packages") apply toRoute(uploadDarFile(req)),
path("metering-report") apply toRoute(meteringReportEndpoint.generateReportResponse(req)),
path("metering-report") apply toPostRoute(req, meteringReportEndpoint.generateReport),
),
get apply concat(
path("query") & withTimer(queryAllTimer) apply
toRoute(retrieveAll(req)),
path("user") apply toRoute(getAuthenticatedUser(req)),
path("user" / "rights") apply toRoute(
listAuthenticatedUserRights(req)
),
path("users") apply toRoute(listUsers(req)),
path("parties") & withTimer(getPartyTimer) apply
toRoute(allParties(req)),
path("packages") apply toRoute(listPackages(req)),
path("user") apply toGetRoute(req, getAuthenticatedUser),
path("user" / "rights") apply toGetRoute(req, listAuthenticatedUserRights),
path("users") apply toGetRoute(req, listUsers),
path("parties") & withTimer(getPartyTimer) apply toGetRoute(req, allParties),
path("packages") apply toGetRouteLedgerId(req, listPackages),
path("packages" / ".+".r)(packageId =>
withTimer(downloadPackageTimer) & extractRequest apply (req =>
responseToRoute(downloadPackage(req, packageId))
toDownloadPackageRoute(req, packageId, downloadPackage)
)
),
),
@ -323,6 +402,20 @@ class Endpoints(
output.map(_.fold(httpResponseError, searchHttpResponse))
}
private implicit def mkHttpResponseEitherT(implicit
lc: LoggingContextOf[InstanceUUID with RequestID]
): MkHttpResponse[ET[HttpResponse]] =
MkHttpResponse { output =>
implicitly[MkHttpResponse[Future[Error \/ HttpResponse]]].run(output.run)
}
private implicit def mkHttpResponse(implicit
lc: LoggingContextOf[InstanceUUID with RequestID]
): MkHttpResponse[Future[Error \/ HttpResponse]] =
MkHttpResponse { output =>
output.map(_.fold(httpResponseError, identity))
}
private def searchHttpResponse(searchResult: SearchResult[Error \/ JsValue]): HttpResponse = {
import json.JsonProtocol._

View File

@ -80,7 +80,7 @@ object EndpointsCompanion {
case NotFound(e) => s"Endpoints.NotFound: ${e: String}"
}
def fromThrowable: Throwable PartialFunction Error = {
def fromThrowable: PartialFunction[Throwable, Error] = {
case LedgerClientJwt.Grpc.StatusEnvelope(status) => ParticipantServerError(status)
case NonFatal(t) => ServerError(t)
}

View File

@ -4,7 +4,6 @@
package com.daml.http
package endpoints
import akka.http.scaladsl.model.HttpRequest
import com.daml.http.Endpoints.ET
import com.daml.http.EndpointsCompanion.{Error, ServerError}
import com.daml.http.endpoints.MeteringReportEndpoint.{MeteringReportDateRequest, toPbRequest}
@ -16,12 +15,13 @@ import com.daml.lf.data.Time.Timestamp
import com.daml.logging.LoggingContextOf
import com.google.protobuf
import com.google.protobuf.struct.Struct
import scalaz.EitherT.eitherT
import scalaz.\/
import scalaz.std.scalaFuture._
import spray.json._
import java.time.{Instant, LocalDate, LocalTime, ZoneOffset}
import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.ExecutionContext
import scala.util.Try
private[http] object MeteringReportEndpoint {
@ -83,25 +83,18 @@ private[http] object MeteringReportEndpoint {
}
class MeteringReportEndpoint(routeSetup: RouteSetup, service: MeteringReportService)(implicit
class MeteringReportEndpoint(service: MeteringReportService)(implicit
ec: ExecutionContext
) {
import routeSetup._
def generateReportResponse(req: HttpRequest)(implicit
lc: LoggingContextOf[InstanceUUID with RequestID]
): ET[domain.SyncResponse[Struct]] = {
proxyWithCommand(generateReport)(req)
.map[domain.SyncResponse[Struct]](domain.OkResponse(_))
}
def generateReport(jwt: Jwt, dateRequest: MeteringReportDateRequest)(implicit
lc: LoggingContextOf[InstanceUUID with RequestID]
): Future[Error \/ Struct] = {
service
.getMeteringReport(jwt, toPbRequest(dateRequest))
.map(MeteringReportEndpoint.toJsonMeteringReport)
}
): ET[domain.SyncResponse[Struct]] = for {
s <- eitherT(
service
.getMeteringReport(jwt, toPbRequest(dateRequest))
.map(MeteringReportEndpoint.toJsonMeteringReport)
)
} yield (domain.OkResponse(s))
}

View File

@ -4,41 +4,33 @@
package com.daml.http
package endpoints
import akka.NotUsed
import akka.http.scaladsl.model._
import akka.stream.Materializer
import akka.stream.scaladsl.Source
import akka.util.ByteString
import EndpointsCompanion._
import akka.NotUsed
import com.daml.http.metrics.HttpJsonApiMetrics
import Endpoints.ET
import com.daml.scalautil.Statement.discard
import domain.JwtPayloadLedgerIdOnly
import util.FutureUtil.{either, eitherT}
import util.FutureUtil.{eitherT, rightT}
import util.Logging.{InstanceUUID, RequestID}
import util.{ProtobufByteStrings, toLedgerId}
import com.daml.jwt.domain.Jwt
import scalaz.EitherT
import scalaz.std.scalaFuture._
import scalaz.{-\/, EitherT, \/, \/-}
import scala.concurrent.{ExecutionContext, Future}
import com.daml.http.metrics.HttpJsonApiMetrics
import com.daml.ledger.api.{domain => LedgerApiDomain}
import com.daml.logging.LoggingContextOf
class PackagesAndDars(routeSetup: RouteSetup, packageManagementService: PackageManagementService)(
implicit
ec: ExecutionContext,
mat: Materializer,
implicit ec: ExecutionContext
) {
import routeSetup._, RouteSetup._
def uploadDarFile(req: HttpRequest)(implicit
def uploadDarFile(httpRequest: HttpRequest)(implicit
lc: LoggingContextOf[InstanceUUID with RequestID],
metrics: HttpJsonApiMetrics,
): ET[domain.SyncResponse[Unit]] =
): ET[domain.SyncResponse[Unit]] = {
for {
parseAndDecodeTimer <- getParseAndDecodeTimerCtx()
_ <- EitherT.pure(metrics.uploadPackagesThroughput.mark())
t2 <- inputSource(req)
t2 <- eitherT(routeSetup.inputSource(httpRequest))
(jwt, payload, source) = t2
_ <- EitherT.pure(parseAndDecodeTimer.stop())
@ -51,46 +43,26 @@ class PackagesAndDars(routeSetup: RouteSetup, packageManagementService: PackageM
)
)
): ET[Unit]
} yield domain.OkResponse(())
private[this] def inputSource(req: HttpRequest)(implicit
lc: LoggingContextOf[InstanceUUID with RequestID]
): ET[(Jwt, JwtPayloadLedgerIdOnly, Source[ByteString, Any])] =
either(findJwt(req))
.leftMap { e =>
discard { req.entity.discardBytes(mat) }
e: Error
}
.flatMap(j =>
withJwtPayload[Source[ByteString, Any], JwtPayloadLedgerIdOnly]((j, req.entity.dataBytes))
.leftMap(it => it: Error)
)
def listPackages(req: HttpRequest)(implicit
lc: LoggingContextOf[InstanceUUID with RequestID]
): ET[domain.SyncResponse[Seq[String]]] =
proxyWithoutCommand(packageManagementService.listPackages)(req).map(domain.OkResponse(_))
def downloadPackage(req: HttpRequest, packageId: String)(implicit
lc: LoggingContextOf[InstanceUUID with RequestID]
): Future[HttpResponse] = {
val et: ET[admin.GetPackageResponse] =
proxyWithoutCommand((jwt, ledgerId) =>
packageManagementService.getPackage(jwt, ledgerId, packageId)
)(req)
val fa: Future[Error \/ admin.GetPackageResponse] = et.run
fa.map {
case -\/(e) =>
httpResponseError(e)
case \/-(x) =>
HttpResponse(
entity = HttpEntity.apply(
ContentTypes.`application/octet-stream`,
ProtobufByteStrings.toSource(x.archivePayload),
)
)
}
}
def listPackages(jwt: Jwt, ledgerId: LedgerApiDomain.LedgerId)(implicit
lc: LoggingContextOf[InstanceUUID with RequestID]
): ET[domain.SyncResponse[Seq[String]]] =
rightT(packageManagementService.listPackages(jwt, ledgerId)).map(domain.OkResponse(_))
def downloadPackage(jwt: Jwt, ledgerId: LedgerApiDomain.LedgerId, packageId: String)(implicit
lc: LoggingContextOf[InstanceUUID with RequestID]
): Future[HttpResponse] = {
val pkgResp: Future[admin.GetPackageResponse] =
packageManagementService.getPackage(jwt, ledgerId, packageId)
pkgResp.map { x =>
HttpResponse(
entity = HttpEntity.apply(
ContentTypes.`application/octet-stream`,
ProtobufByteStrings.toSource(x.archivePayload),
)
)
}
}
}

View File

@ -4,10 +4,10 @@
package com.daml.http
package endpoints
import akka.http.scaladsl.model._
import Endpoints.ET
import com.daml.jwt.domain.Jwt
import util.Collections.toNonEmptySet
import util.FutureUtil.either
import util.FutureUtil.eitherT
import util.Logging.{InstanceUUID, RequestID}
import scalaz.std.scalaFuture._
import scalaz.{EitherT, NonEmptyList}
@ -16,35 +16,30 @@ import scala.concurrent.ExecutionContext
import com.daml.http.metrics.HttpJsonApiMetrics
import com.daml.logging.LoggingContextOf
private[http] final class Parties(
routeSetup: RouteSetup,
partiesService: PartiesService,
)(implicit ec: ExecutionContext) {
private[http] final class Parties(partiesService: PartiesService)(implicit ec: ExecutionContext) {
import Parties._
import routeSetup._
import json.JsonProtocol._
def allParties(req: HttpRequest)(implicit
def allParties(jwt: Jwt)(implicit
lc: LoggingContextOf[InstanceUUID with RequestID]
): ET[domain.SyncResponse[List[domain.PartyDetails]]] = for {
res <- eitherT(partiesService.allParties(jwt))
} yield domain.OkResponse(res)
def parties(jwt: Jwt, parties: NonEmptyList[domain.Party])(implicit
lc: LoggingContextOf[InstanceUUID with RequestID]
): ET[domain.SyncResponse[List[domain.PartyDetails]]] =
proxyWithoutCommand((jwt, _) => partiesService.allParties(jwt))(req)
.flatMap(pd => either(pd map (domain.OkResponse(_))))
for {
ps <- eitherT(partiesService.parties(jwt, toNonEmptySet(parties)))
} yield partiesResponse(parties = ps._1.toList, unknownParties = ps._2.toList)
def parties(req: HttpRequest)(implicit
lc: LoggingContextOf[InstanceUUID with RequestID]
): ET[domain.SyncResponse[List[domain.PartyDetails]]] =
proxyWithCommand[NonEmptyList[domain.Party], (Set[domain.PartyDetails], Set[domain.Party])](
(jwt, cmd) => partiesService.parties(jwt, toNonEmptySet(cmd))
)(req)
.map(ps => partiesResponse(parties = ps._1.toList, unknownParties = ps._2.toList))
def allocateParty(req: HttpRequest)(implicit
def allocateParty(jwt: Jwt, request: domain.AllocatePartyRequest)(implicit
lc: LoggingContextOf[InstanceUUID with RequestID],
metrics: HttpJsonApiMetrics,
): ET[domain.SyncResponse[domain.PartyDetails]] =
EitherT
.pure(metrics.allocatePartyThroughput.mark())
.flatMap(_ => proxyWithCommand(partiesService.allocate)(req).map(domain.OkResponse(_)))
for {
_ <- EitherT.pure(metrics.allocatePartyThroughput.mark())
res <- eitherT(partiesService.allocate(jwt, request))
} yield domain.OkResponse(res)
}
private[endpoints] object Parties {

View File

@ -15,6 +15,8 @@ import akka.http.scaladsl.model.headers.{
import akka.stream.Materializer
import Endpoints.ET
import EndpointsCompanion._
import akka.stream.scaladsl.Source
import akka.util.ByteString
import com.daml.http.metrics.HttpJsonApiMetrics
import com.daml.logging.LoggingContextOf.withEnrichedLoggingContext
import com.daml.scalautil.Statement.discard
@ -22,7 +24,6 @@ import domain.{JwtPayloadG, JwtPayloadLedgerIdOnly, JwtPayloadTag, JwtWritePaylo
import json._
import util.FutureUtil.{either, eitherT}
import util.Logging.{InstanceUUID, RequestID}
import util.toLedgerId
import com.daml.jwt.domain.Jwt
import com.daml.ledger.api.{v1 => lav1}
import lav1.value.{Value => ApiValue}
@ -34,7 +35,6 @@ import spray.json._
import scala.concurrent.duration.FiniteDuration
import scala.concurrent.{ExecutionContext, Future}
import scala.util.Try
import com.daml.ledger.api.{domain => LedgerApiDomain}
import com.daml.ledger.client.services.admin.UserManagementClient
import com.daml.ledger.client.services.identity.LedgerIdentityClient
import com.daml.logging.{ContextualizedLogger, LoggingContextOf}
@ -52,34 +52,6 @@ private[http] final class RouteSetup(
import encoder.implicits._
import util.ErrorOps._
def proxyWithCommand[A: JsonReader, R](
fn: (Jwt, A) => Future[Error \/ R]
)(req: HttpRequest)(implicit
lc: LoggingContextOf[InstanceUUID with RequestID]
): ET[R] =
for {
t2 <- inputJsVal(req): ET[(Jwt, JsValue)]
(jwt, reqBody) = t2
a <- either(SprayJson.decode[A](reqBody).liftErr(InvalidUserInput)): ET[A]
b <- eitherT(handleFutureEitherFailure(fn(jwt, a))): ET[R]
} yield b
private[endpoints] def proxyWithCommandET[A: JsonReader, R](
fn: (Jwt, A) => ET[R]
)(req: HttpRequest)(implicit
lc: LoggingContextOf[InstanceUUID with RequestID]
): ET[R] = proxyWithCommand((jwt, a: A) => fn(jwt, a).run)(req)
def proxyWithoutCommand[A](
fn: (Jwt, LedgerApiDomain.LedgerId) => Future[A]
)(req: HttpRequest)(implicit
lc: LoggingContextOf[InstanceUUID with RequestID]
): ET[A] =
for {
t3 <- inputAndJwtPayload[JwtPayloadLedgerIdOnly](req).leftMap(it => it: Error)
a <- eitherT(handleFutureFailure(fn(t3._1, toLedgerId(t3._2.ledgerId)))): ET[A]
} yield a
private[endpoints] def handleCommand[T[_]](req: HttpRequest)(
fn: (
Jwt,
@ -135,7 +107,7 @@ private[http] final class RouteSetup(
): ET[TimerHandle] =
EitherT.pure(metrics.incomingJsonParsingAndValidationTimer.startAsync())
private[endpoints] def input(req: HttpRequest)(implicit
private[http] def input(req: HttpRequest)(implicit
lc: LoggingContextOf[InstanceUUID with RequestID]
): Future[Error \/ (Jwt, String)] = {
findJwt(req) match {
@ -147,10 +119,23 @@ private[http] final class RouteSetup(
}
}
private[http] def inputSource(req: HttpRequest)(implicit
lc: LoggingContextOf[InstanceUUID with RequestID]
): Future[Error \/ (Jwt, JwtPayloadLedgerIdOnly, Source[ByteString, Any])] =
findJwt(req) match {
case e @ -\/(_) =>
discard { req.entity.discardBytes(mat) }
Future.successful(e)
case \/-(j) =>
withJwtPayload[Source[ByteString, Any], JwtPayloadLedgerIdOnly](
(j, req.entity.dataBytes)
).run
}
private[this] def data(entity: RequestEntity): Future[String] =
entity.toStrict(maxTimeToCollectRequest).map(_.data.utf8String)
private[this] def inputJsVal(req: HttpRequest)(implicit
private[http] def inputJsVal(req: HttpRequest)(implicit
lc: LoggingContextOf[InstanceUUID with RequestID]
): ET[(Jwt, JsValue)] =
for {
@ -181,7 +166,7 @@ private[http] final class RouteSetup(
}
}
private[endpoints] object RouteSetup {
private[http] object RouteSetup {
import Endpoints.IntoEndpointsError
private val logger = ContextualizedLogger.get(getClass)
@ -200,12 +185,12 @@ private[endpoints] object RouteSetup {
"read_as" -> jwtPayload.readAs.toString,
).run(fn)
private[endpoints] def handleFutureFailure[A](fa: Future[A])(implicit
private[http] def handleFutureFailure[A](fa: Future[A])(implicit
ec: ExecutionContext
): Future[Error \/ A] =
fa.map(a => \/-(a)).recover(Error.fromThrowable andThen (-\/(_)))
private[endpoints] def handleFutureEitherFailure[A, B](fa: Future[A \/ B])(implicit
private[http] def handleFutureEitherFailure[A, B](fa: Future[A \/ B])(implicit
ec: ExecutionContext,
A: IntoEndpointsError[A],
): Future[Error \/ B] =

View File

@ -5,13 +5,11 @@ package com.daml.http
package endpoints
import akka.NotUsed
import akka.http.scaladsl.model._
import akka.stream.scaladsl.Source
import EndpointsCompanion._
import Endpoints.ET
import com.daml.http.EndpointsCompanion.CreateFromUserToken.userIdFromToken
import util.FutureUtil.{either, eitherT}
import util.Logging.{InstanceUUID, RequestID}
import util.FutureUtil.either
import com.daml.jwt.domain.Jwt
import com.daml.ledger.api.auth.StandardJWTPayload
import scalaz.std.scalaFuture._
@ -19,142 +17,126 @@ import scalaz.syntax.traverse._
import scalaz.{-\/, EitherT, Monad, \/, \/-}
import scala.concurrent.{ExecutionContext, Future}
import com.daml.logging.LoggingContextOf
import com.daml.ledger.api.domain.{User, UserRight}
import com.daml.ledger.client.services.admin.UserManagementClient
import com.daml.lf.data.Ref.UserId
private[http] final class UserManagement(
routeSetup: RouteSetup,
decodeJwt: EndpointsCompanion.ValidateJwt,
userManagementClient: UserManagementClient,
)(implicit
ec: ExecutionContext
) {
import UserManagement._
import routeSetup._
import json.JsonProtocol._
def getUser(req: HttpRequest)(implicit
lc: LoggingContextOf[InstanceUUID with RequestID]
): ET[domain.SyncResponse[domain.UserDetails]] =
proxyWithCommandET { (jwt, getUserRequest: domain.GetUserRequest) =>
for {
userId <- parseUserId(getUserRequest.userId)
user <- EitherT.rightT(userManagementClient.getUser(userId, Some(jwt.value)))
} yield domain.OkResponse(
domain.UserDetails(user.id, user.primaryParty)
): domain.SyncResponse[domain.UserDetails]
}(req)
def createUser(req: HttpRequest)(implicit
lc: LoggingContextOf[InstanceUUID with RequestID]
): ET[domain.SyncResponse[spray.json.JsObject]] =
proxyWithCommand { (jwt, createUserRequest: domain.CreateUserRequest) =>
{
import scalaz.std.option._
import scalaz.syntax.traverse._
import scalaz.syntax.std.either._
import com.daml.lf.data.Ref
val input =
for {
username <- UserId.fromString(createUserRequest.userId).disjunction
primaryParty <- createUserRequest.primaryParty.traverse(it =>
Ref.Party.fromString(it).disjunction
)
rights <- domain.UserRights.toLedgerUserRights(
createUserRequest.rights.getOrElse(List.empty)
)
} yield (username, primaryParty, rights)
for {
info <- EitherT.either(input.leftMap(InvalidUserInput)): ET[
(UserId, Option[Ref.Party], List[UserRight])
]
(username, primaryParty, initialRights) = info
_ <- EitherT.rightT(
userManagementClient.createUser(
User(username, primaryParty),
initialRights,
Some(jwt.value),
)
)
} yield emptyObjectResponse
}.run
}(req)
def deleteUser(req: HttpRequest)(implicit
lc: LoggingContextOf[InstanceUUID with RequestID]
): ET[domain.SyncResponse[spray.json.JsObject]] =
proxyWithCommandET { (jwt, deleteUserRequest: domain.DeleteUserRequest) =>
for {
userId <- parseUserId(deleteUserRequest.userId)
_ <- EitherT.rightT(userManagementClient.deleteUser(userId, Some(jwt.value)))
} yield emptyObjectResponse
}(req)
def listUserRights(req: HttpRequest)(implicit
lc: LoggingContextOf[InstanceUUID with RequestID]
): ET[domain.SyncResponse[List[domain.UserRight]]] =
proxyWithCommandET { (jwt, listUserRightsRequest: domain.ListUserRightsRequest) =>
for {
userId <- parseUserId(listUserRightsRequest.userId)
rights <- EitherT.rightT(
userManagementClient.listUserRights(userId, Some(jwt.value))
)
} yield domain
.OkResponse(domain.UserRights.fromLedgerUserRights(rights)): domain.SyncResponse[
List[domain.UserRight]
]
}(req)
def grantUserRights(req: HttpRequest)(implicit
lc: LoggingContextOf[InstanceUUID with RequestID]
): ET[domain.SyncResponse[List[domain.UserRight]]] =
proxyWithCommandET { (jwt, grantUserRightsRequest: domain.GrantUserRightsRequest) =>
for {
userId <- parseUserId(grantUserRightsRequest.userId)
rights <- either(
domain.UserRights.toLedgerUserRights(grantUserRightsRequest.rights)
).leftMap(InvalidUserInput): ET[List[UserRight]]
grantedUserRights <- EitherT.rightT(
userManagementClient.grantUserRights(userId, rights, Some(jwt.value))
)
} yield domain.OkResponse(
domain.UserRights.fromLedgerUserRights(grantedUserRights)
): domain.SyncResponse[List[domain.UserRight]]
}(req)
def revokeUserRights(req: HttpRequest)(implicit
lc: LoggingContextOf[InstanceUUID with RequestID]
): ET[domain.SyncResponse[List[domain.UserRight]]] =
proxyWithCommandET { (jwt, revokeUserRightsRequest: domain.RevokeUserRightsRequest) =>
for {
userId <- parseUserId(revokeUserRightsRequest.userId)
rights <- either(
domain.UserRights.toLedgerUserRights(revokeUserRightsRequest.rights)
).leftMap(InvalidUserInput): ET[List[UserRight]]
revokedUserRights <- EitherT.rightT(
userManagementClient.revokeUserRights(userId, rights, Some(jwt.value))
)
} yield domain.OkResponse(
domain.UserRights.fromLedgerUserRights(revokedUserRights)
): domain.SyncResponse[List[domain.UserRight]]
}(req)
def getAuthenticatedUser(req: HttpRequest)(implicit
lc: LoggingContextOf[InstanceUUID with RequestID]
): ET[domain.SyncResponse[domain.UserDetails]] =
def getUser(jwt: Jwt, req: domain.GetUserRequest): ET[domain.SyncResponse[domain.UserDetails]] =
for {
userId <- parseUserId(req.userId)
user <- EitherT.rightT(userManagementClient.getUser(userId, Some(jwt.value)))
} yield domain.OkResponse(
domain.UserDetails(user.id, user.primaryParty)
): domain.SyncResponse[domain.UserDetails]
def createUser(
jwt: Jwt,
createUserRequest: domain.CreateUserRequest,
): ET[domain.SyncResponse[spray.json.JsObject]] = {
import scalaz.std.option._
import scalaz.syntax.traverse._
import scalaz.syntax.std.either._
import com.daml.lf.data.Ref
val input =
for {
username <- UserId.fromString(createUserRequest.userId).disjunction
primaryParty <- createUserRequest.primaryParty.traverse(it =>
Ref.Party.fromString(it).disjunction
)
rights <- domain.UserRights.toLedgerUserRights(
createUserRequest.rights.getOrElse(List.empty)
)
} yield (username, primaryParty, rights)
for {
info <- EitherT.either(input.leftMap(InvalidUserInput)): ET[
(UserId, Option[Ref.Party], List[UserRight])
]
(username, primaryParty, initialRights) = info
_ <- EitherT.rightT(
userManagementClient.createUser(
User(username, primaryParty),
initialRights,
Some(jwt.value),
)
)
} yield emptyObjectResponse
}
def deleteUser(
jwt: Jwt,
deleteUserRequest: domain.DeleteUserRequest,
): ET[domain.SyncResponse[spray.json.JsObject]] = {
for {
userId <- parseUserId(deleteUserRequest.userId)
_ <- EitherT.rightT(userManagementClient.deleteUser(userId, Some(jwt.value)))
} yield emptyObjectResponse
}
def listUserRights(
jwt: Jwt,
listUserRightsRequest: domain.ListUserRightsRequest,
): ET[domain.SyncResponse[List[domain.UserRight]]] = {
for {
userId <- parseUserId(listUserRightsRequest.userId)
rights <- EitherT.rightT(
userManagementClient.listUserRights(userId, Some(jwt.value))
)
} yield domain
.OkResponse(domain.UserRights.fromLedgerUserRights(rights)): domain.SyncResponse[List[
domain.UserRight
]]
}
def grantUserRights(
jwt: Jwt,
grantUserRightsRequest: domain.GrantUserRightsRequest,
): ET[domain.SyncResponse[List[domain.UserRight]]] = {
for {
userId <- parseUserId(grantUserRightsRequest.userId)
rights <- either(
domain.UserRights.toLedgerUserRights(grantUserRightsRequest.rights)
).leftMap(InvalidUserInput): ET[List[UserRight]]
grantedUserRights <- EitherT.rightT(
userManagementClient.grantUserRights(userId, rights, Some(jwt.value))
)
} yield domain.OkResponse(
domain.UserRights.fromLedgerUserRights(grantedUserRights)
): domain.SyncResponse[List[domain.UserRight]]
}
def revokeUserRights(
jwt: Jwt,
revokeUserRightsRequest: domain.RevokeUserRightsRequest,
): ET[domain.SyncResponse[List[domain.UserRight]]] = {
for {
userId <- parseUserId(revokeUserRightsRequest.userId)
rights <- either(
domain.UserRights.toLedgerUserRights(revokeUserRightsRequest.rights)
).leftMap(InvalidUserInput): ET[List[UserRight]]
revokedUserRights <- EitherT.rightT(
userManagementClient.revokeUserRights(userId, rights, Some(jwt.value))
)
} yield domain.OkResponse(
domain.UserRights.fromLedgerUserRights(revokedUserRights)
): domain.SyncResponse[List[domain.UserRight]]
}
def getAuthenticatedUser(jwt: Jwt): ET[domain.SyncResponse[domain.UserDetails]] =
for {
jwt <- eitherT(input(req)).bimap(identity[Error], _._1)
userId <- getUserIdFromToken(jwt)
user <- EitherT.rightT(userManagementClient.getUser(userId, Some(jwt.value)))
} yield domain.OkResponse(domain.UserDetails(user.id, user.primaryParty))
def listAuthenticatedUserRights(req: HttpRequest)(implicit
lc: LoggingContextOf[InstanceUUID with RequestID]
): ET[domain.SyncResponse[List[domain.UserRight]]] =
def listAuthenticatedUserRights(jwt: Jwt): ET[domain.SyncResponse[List[domain.UserRight]]] = {
for {
jwt <- eitherT(input(req)).bimap(identity[Error], _._1)
userId <- getUserIdFromToken(jwt)
rights <- EitherT.rightT(
userManagementClient.listUserRights(userId, Some(jwt.value))
@ -163,14 +145,15 @@ private[http] final class UserManagement(
.OkResponse(domain.UserRights.fromLedgerUserRights(rights)): domain.SyncResponse[List[
domain.UserRight
]]
}
def listUsers(req: HttpRequest)(implicit
lc: LoggingContextOf[InstanceUUID with RequestID]
): ET[domain.SyncResponse[Source[Error \/ domain.UserDetails, NotUsed]]] =
for {
jwt <- eitherT(input(req)).bimap(identity[Error], _._1)
users = aggregateListUserPages(Some(jwt.value))
} yield domain.OkResponse(users.map(_ map domain.UserDetails.fromUser))
def listUsers(
jwt: Jwt
): ET[domain.SyncResponse[Source[Error \/ domain.UserDetails, NotUsed]]] = {
val users = aggregateListUserPages(Some(jwt.value))
val userDetails = users.map(_ map domain.UserDetails.fromUser)
EitherT.rightT(Future.successful(domain.OkResponse(userDetails)))
}
private def aggregateListUserPages(
token: Option[String],