mirror of
https://github.com/digital-asset/daml.git
synced 2024-09-19 08:48:21 +03:00
[User management] Use pagination for listing users [DPP-840] (#12610)
CHANGELOG_BEGIN Ledger API Specification: UserManagementService.ListUsers is now using pagination CHANGELOG_END
This commit is contained in:
parent
ca976fe3d7
commit
7137b46543
@ -376,8 +376,12 @@ class GrpcLedgerClient(val grpcClient: LedgerClient, val applicationId: Applicat
|
||||
ec: ExecutionContext,
|
||||
esf: ExecutionSequencerFactory,
|
||||
mat: Materializer,
|
||||
): Future[List[User]] =
|
||||
grpcClient.userManagementClient.listUsers().map(_.toList)
|
||||
): Future[List[User]] = {
|
||||
// TODO https://github.com/digital-asset/daml/issues/12663 participant user management: Emulating no-pagination
|
||||
grpcClient.userManagementClient.listUsers(pageToken = "", pageSize = 10000).map {
|
||||
case (users, _) => users.toList
|
||||
}
|
||||
}
|
||||
|
||||
override def grantUserRights(
|
||||
id: UserId,
|
||||
|
@ -604,7 +604,8 @@ test('user API', async () => {
|
||||
expect(await ledger.grantUserRights(niceUser, [ UserRightHelper.participantAdmin ])).toEqual([ UserRightHelper.participantAdmin ])
|
||||
expect(await ledger.revokeUserRights(niceUser, [ UserRightHelper.participantAdmin, UserRightHelper.canActAs(ALICE_PARTY) ])).toEqual([ UserRightHelper.participantAdmin, UserRightHelper.canActAs(ALICE_PARTY) ])
|
||||
|
||||
expect((await ledger.listUsers()).map(it => it.userId)).toEqual([ "participant_admin", niceUser ])
|
||||
const allUserIds = (await ledger.listUsers()).map(it => it.userId)
|
||||
expect(_.sortBy(allUserIds)).toEqual([niceUser, "participant_admin"])
|
||||
await ledger.deleteUser(niceUser)
|
||||
expect((await ledger.listUsers()).map(it => it.userId)).toEqual([ "participant_admin" ])
|
||||
|
||||
|
@ -123,11 +123,24 @@ message DeleteUserResponse {
|
||||
|
||||
// Required authorization: ``HasRight(ParticipantAdmin)``
|
||||
message ListUsersRequest {
|
||||
|
||||
// Pagination token to determine the specific page to fetch.
|
||||
// Leave empty to fetch the first page.
|
||||
string page_token = 2;
|
||||
|
||||
// Maximum number of results to be returned by the server. The server will return no more than that many results, but it might return fewer.
|
||||
// If 0, the server will decide the number of results to be returned.
|
||||
int32 page_size = 3;
|
||||
|
||||
}
|
||||
|
||||
message ListUsersResponse {
|
||||
// All users of the participant node.
|
||||
// A subset of users of the participant node that fit into this page.
|
||||
repeated User users = 1;
|
||||
|
||||
// Pagination token to retrieve the next page.
|
||||
// Empty, if there are no further results.
|
||||
string next_page_token = 2;
|
||||
}
|
||||
|
||||
// Add the rights to the set of rights granted to the user.
|
||||
|
@ -61,6 +61,9 @@ message UserManagementFeature {
|
||||
// Whether the Ledger API server provides the user management service.
|
||||
bool supported = 1;
|
||||
// The maximum number of rights that can be assigned to a single user.
|
||||
// Value of 0 means that no limit is being enforced.
|
||||
// Value of 0 means that no rights per user limit is enforced.
|
||||
uint32 max_rights_per_user = 2;
|
||||
// The maximum number of users the server can return in a single response (page).
|
||||
// Value of 0 means that no page size limit is enforced.
|
||||
int32 max_users_page_size = 3;
|
||||
}
|
||||
|
@ -498,7 +498,10 @@ class Endpoints(
|
||||
for {
|
||||
jwt <- eitherT(input(req)).bimap(identity[Error], _._1)
|
||||
users <- EitherT.rightT(
|
||||
userManagementClient.listUsers(Some(jwt.value))
|
||||
// TODO participant user management: Emulating no-pagination
|
||||
userManagementClient.listUsers(Some(jwt.value), pageToken = "", pageSize = 10000).map {
|
||||
case (users, _) => users
|
||||
}
|
||||
)
|
||||
} yield domain.OkResponse(users.map(domain.UserDetails.fromUser).toList)
|
||||
|
||||
|
@ -22,6 +22,15 @@ trait ContextualizedErrorLogger {
|
||||
def error(message: String, throwable: Throwable): Unit
|
||||
}
|
||||
|
||||
object DamlContextualizedErrorLogger {
|
||||
|
||||
def forTesting(clazz: Class[_]) = new DamlContextualizedErrorLogger(
|
||||
ContextualizedLogger.get(clazz),
|
||||
LoggingContext.ForTesting,
|
||||
None,
|
||||
)
|
||||
}
|
||||
|
||||
/** Implementation of [[ContextualizedErrorLogger]] leveraging the //libs-scala/contextualized-logging
|
||||
* as the logging stack.
|
||||
*
|
||||
@ -35,6 +44,7 @@ class DamlContextualizedErrorLogger(
|
||||
loggingContext: LoggingContext,
|
||||
val correlationId: Option[String],
|
||||
) extends ContextualizedErrorLogger {
|
||||
|
||||
override def properties: Map[String, String] =
|
||||
loggingContext.entries.contents.view.map { case (key, value) =>
|
||||
key -> loggingValueToString(value)
|
||||
|
@ -52,11 +52,15 @@ final class UserManagementClient(service: UserManagementServiceStub)(implicit
|
||||
.deleteUser(proto.DeleteUserRequest(userId.toString))
|
||||
.map(_ => ())
|
||||
|
||||
def listUsers(token: Option[String] = None): Future[Seq[User]] =
|
||||
def listUsers(
|
||||
token: Option[String] = None,
|
||||
pageToken: String,
|
||||
pageSize: Int,
|
||||
): Future[(Seq[User], String)] =
|
||||
LedgerClient
|
||||
.stub(service, token)
|
||||
.listUsers(proto.ListUsersRequest())
|
||||
.map(_.users.view.map(fromProtoUser).toSeq)
|
||||
.listUsers(proto.ListUsersRequest(pageToken = pageToken, pageSize = pageSize))
|
||||
.map(res => res.users.view.map(fromProtoUser).toSeq -> res.nextPageToken)
|
||||
|
||||
def grantUserRights(
|
||||
userId: UserId,
|
||||
|
@ -277,13 +277,14 @@ final class UserManagementServiceIT extends LedgerTestSuite {
|
||||
_ <- ledger.userManagement.createUser(
|
||||
CreateUserRequest(Some(user1), Nil)
|
||||
)
|
||||
res1 <- ledger.userManagement.listUsers(ListUsersRequest())
|
||||
request = ListUsersRequest(pageSize = 100, pageToken = "")
|
||||
res1 <- ledger.userManagement.listUsers(request)
|
||||
res2 <- ledger.userManagement.createUser(
|
||||
CreateUserRequest(Some(user2), Nil)
|
||||
)
|
||||
res3 <- ledger.userManagement.listUsers(ListUsersRequest())
|
||||
res3 <- ledger.userManagement.listUsers(request)
|
||||
res4 <- ledger.userManagement.deleteUser(DeleteUserRequest(userId2))
|
||||
res5 <- ledger.userManagement.listUsers(ListUsersRequest())
|
||||
res5 <- ledger.userManagement.listUsers(request)
|
||||
} yield {
|
||||
def filterUsers(users: Iterable[User]) = users.filter(u => u.id == userId1 || u.id == userId2)
|
||||
|
||||
@ -298,6 +299,129 @@ final class UserManagementServiceIT extends LedgerTestSuite {
|
||||
}
|
||||
})
|
||||
|
||||
userManagementTest(
|
||||
"TestPagedListUsers",
|
||||
"Exercise paging behavior of ListUsers rpc",
|
||||
)(implicit ec => { implicit ledger =>
|
||||
val userId1 = ledger.nextUserId()
|
||||
val userId2 = ledger.nextUserId()
|
||||
val userId3 = ledger.nextUserId()
|
||||
val userId4 = ledger.nextUserId()
|
||||
val userId5 = ledger.nextUserId()
|
||||
val userId6 = ledger.nextUserId()
|
||||
val user1 = User(userId1, "")
|
||||
val user2 = User(userId2, "")
|
||||
val user3 = User(userId3, "")
|
||||
val user4 = User(userId4, "")
|
||||
val user5 = User(userId5, "")
|
||||
val user6 = User(userId6, "")
|
||||
for {
|
||||
// Ensure we have at least 6 users:
|
||||
_ <- ledger.userManagement.createUser(CreateUserRequest(Some(user1), Nil))
|
||||
_ <- ledger.userManagement.createUser(CreateUserRequest(Some(user2), Nil))
|
||||
_ <- ledger.userManagement.createUser(CreateUserRequest(Some(user3), Nil))
|
||||
_ <- ledger.userManagement.createUser(CreateUserRequest(Some(user4), Nil))
|
||||
_ <- ledger.userManagement.createUser(CreateUserRequest(Some(user5), Nil))
|
||||
_ <- ledger.userManagement.createUser(CreateUserRequest(Some(user6), Nil))
|
||||
// Requesting first page:
|
||||
res1 <- ledger.userManagement.listUsers(ListUsersRequest(pageSize = 2, pageToken = ""))
|
||||
// Requesting second page:
|
||||
res2 <- ledger.userManagement.listUsers(
|
||||
ListUsersRequest(pageSize = 3, pageToken = res1.nextPageToken)
|
||||
)
|
||||
// Requesting last non-empty page of users
|
||||
res3 <- ledger.userManagement.listUsers(
|
||||
ListUsersRequest(pageSize = 1000, pageToken = res2.nextPageToken)
|
||||
)
|
||||
// Requesting last page that is empty
|
||||
res4 <- ledger.userManagement.listUsers(
|
||||
ListUsersRequest(pageSize = 100, pageToken = res3.nextPageToken)
|
||||
)
|
||||
// Using not base64 encoded string as a page token
|
||||
onBadTokenError <- ledger.userManagement
|
||||
.listUsers(
|
||||
ListUsersRequest(pageSize = 100, pageToken = UUID.randomUUID().toString)
|
||||
)
|
||||
.mustFail("using not base64 encoded string")
|
||||
// Using negative pageSize
|
||||
onNegativePageSizeError <- ledger.userManagement
|
||||
.listUsers(
|
||||
ListUsersRequest(pageSize = -100, pageToken = "")
|
||||
)
|
||||
.mustFail("using negative page size")
|
||||
// 0 pageSize
|
||||
responseZeroPageSize <- ledger.userManagement.listUsers(
|
||||
ListUsersRequest(pageSize = 0, pageToken = "")
|
||||
)
|
||||
} yield {
|
||||
assert(res1.nextPageToken.nonEmpty, s"First next page token should be non-empty")
|
||||
assertLength("first page", 2, res1.users)
|
||||
|
||||
assert(res2.nextPageToken.nonEmpty, s"Second next page token should be non-empty")
|
||||
assertLength("second page", 3, res2.users)
|
||||
|
||||
assert(res3.nextPageToken.nonEmpty, s"Third next page token should be non-empty")
|
||||
assert(res2.users.nonEmpty, s"Third page should be non-empty")
|
||||
|
||||
assertEquals(
|
||||
s"Last next page token should be empty but was: ${res4.nextPageToken}",
|
||||
res4.nextPageToken,
|
||||
"",
|
||||
)
|
||||
assert(res4.users.isEmpty, s"Last page should be empty but was: ${res4.users}")
|
||||
assertGrpcError(
|
||||
participant = ledger,
|
||||
t = onBadTokenError,
|
||||
expectedCode = Status.Code.INVALID_ARGUMENT,
|
||||
selfServiceErrorCode = LedgerApiErrors.RequestValidation.InvalidArgument,
|
||||
exceptionMessageSubstring = None,
|
||||
)
|
||||
assertGrpcError(
|
||||
participant = ledger,
|
||||
t = onNegativePageSizeError,
|
||||
expectedCode = Status.Code.INVALID_ARGUMENT,
|
||||
selfServiceErrorCode = LedgerApiErrors.RequestValidation.InvalidArgument,
|
||||
exceptionMessageSubstring = None,
|
||||
)
|
||||
assert(
|
||||
responseZeroPageSize.nextPageToken.nonEmpty,
|
||||
"Non-empty page token when pageSize is 0 (and there are some users)",
|
||||
)
|
||||
}
|
||||
})
|
||||
|
||||
test(
|
||||
"TestMaxUsersPageSize",
|
||||
"Exercise max users page size behavior of ListUsers rpc",
|
||||
allocate(NoParties),
|
||||
enabled = _.userManagement.maxUsersPageSize > 0,
|
||||
disabledReason = "requires user management feature with users page size limit",
|
||||
)(implicit ec => { case Participants(Participant(ledger)) =>
|
||||
val maxUsersPageSize = ledger.features.userManagement.maxUsersPageSize
|
||||
val users = 1.to(maxUsersPageSize + 1).map(_ => User(ledger.nextUserId(), ""))
|
||||
for {
|
||||
// create users
|
||||
_ <- Future.sequence(
|
||||
users.map(u => ledger.userManagement.createUser(CreateUserRequest(Some(u), Nil)))
|
||||
)
|
||||
// request page size greater than the server's limit
|
||||
page <- ledger.userManagement
|
||||
.listUsers(
|
||||
ListUsersRequest(pageSize = maxUsersPageSize + 1, pageToken = "")
|
||||
)
|
||||
// cleanup
|
||||
_ <- Future.sequence(
|
||||
users.map(u => ledger.userManagement.deleteUser(DeleteUserRequest(u.id)))
|
||||
)
|
||||
|
||||
} yield {
|
||||
assert(
|
||||
page.users.size <= maxUsersPageSize,
|
||||
s"page size must be within limit. actual size: ${page.users.size}, server's limit: $maxUsersPageSize",
|
||||
)
|
||||
}
|
||||
})
|
||||
|
||||
userManagementTest(
|
||||
"TestGrantUserRights",
|
||||
"Exercise GrantUserRights rpc",
|
||||
|
@ -147,6 +147,7 @@ da_scala_library(
|
||||
"@maven//:com_typesafe_akka_akka_stream",
|
||||
"@maven//:org_scalacheck_scalacheck",
|
||||
"@maven//:org_scalactic_scalactic",
|
||||
"@maven//:org_scalatest_scalatest_freespec",
|
||||
"@maven//:org_scalatest_scalatest_core",
|
||||
"@maven//:org_scalatest_scalatest_flatspec",
|
||||
"@maven//:org_scalatest_scalatest_matchers_core",
|
||||
@ -183,6 +184,7 @@ da_scala_library(
|
||||
"//ledger/ledger-configuration",
|
||||
"//ledger/ledger-offset",
|
||||
"//ledger/ledger-resources",
|
||||
"//ledger/ledger-resources:ledger-resources-test-lib",
|
||||
"//ledger/metrics",
|
||||
"//ledger/participant-state",
|
||||
"//ledger/participant-state-index",
|
||||
|
@ -8,10 +8,12 @@ import com.daml.lf.data.Ref
|
||||
import com.daml.platform.apiserver.SeedService.Seeding
|
||||
import com.daml.platform.configuration.{IndexConfiguration, InitialLedgerConfiguration}
|
||||
import com.daml.ports.Port
|
||||
|
||||
import java.io.File
|
||||
import java.nio.file.Path
|
||||
import java.time.Duration
|
||||
|
||||
import com.daml.platform.usermanagement.UserManagementConfig
|
||||
|
||||
import scala.concurrent.duration.FiniteDuration
|
||||
|
||||
case class ApiServerConfig(
|
||||
@ -41,5 +43,5 @@ case class ApiServerConfig(
|
||||
maxTransactionsInMemoryFanOutBufferSize: Long,
|
||||
enableInMemoryFanOutForLedgerApi: Boolean,
|
||||
enableSelfServiceErrorCodes: Boolean,
|
||||
enableUserManagement: Boolean,
|
||||
userManagementConfig: UserManagementConfig,
|
||||
)
|
||||
|
@ -42,8 +42,10 @@ import com.daml.platform.services.time.TimeProviderType
|
||||
import com.daml.telemetry.TelemetryContext
|
||||
import io.grpc.BindableService
|
||||
import io.grpc.protobuf.services.ProtoReflectionService
|
||||
|
||||
import java.time.Duration
|
||||
|
||||
import com.daml.platform.usermanagement.UserManagementConfig
|
||||
|
||||
import scala.collection.immutable
|
||||
import scala.concurrent.duration.{Duration => ScalaDuration}
|
||||
import scala.concurrent.{ExecutionContext, Future}
|
||||
@ -87,7 +89,7 @@ private[daml] object ApiServices {
|
||||
enableSelfServiceErrorCodes: Boolean,
|
||||
checkOverloaded: TelemetryContext => Option[state.SubmissionResult],
|
||||
ledgerFeatures: LedgerFeatures,
|
||||
enableUserManagement: Boolean,
|
||||
userManagementConfig: UserManagementConfig,
|
||||
)(implicit
|
||||
materializer: Materializer,
|
||||
esf: ExecutionSequencerFactory,
|
||||
@ -155,7 +157,7 @@ private[daml] object ApiServices {
|
||||
ApiVersionService.create(
|
||||
enableSelfServiceErrorCodes,
|
||||
ledgerFeatures,
|
||||
enableUserManagement = enableUserManagement,
|
||||
userManagementConfig = userManagementConfig,
|
||||
)
|
||||
|
||||
val apiPackageService =
|
||||
@ -201,9 +203,13 @@ private[daml] object ApiServices {
|
||||
val apiHealthService = new GrpcHealthService(healthChecks, errorsVersionsSwitcher)
|
||||
|
||||
val maybeApiUserManagementService: Option[UserManagementServiceAuthorization] =
|
||||
if (enableUserManagement) {
|
||||
if (userManagementConfig.enabled) {
|
||||
val apiUserManagementService =
|
||||
new ApiUserManagementService(userManagementStore, errorsVersionsSwitcher)
|
||||
new ApiUserManagementService(
|
||||
userManagementStore,
|
||||
errorsVersionsSwitcher,
|
||||
maxUsersPageSize = userManagementConfig.maxUsersPageSize,
|
||||
)
|
||||
val authorized =
|
||||
new UserManagementServiceAuthorization(apiUserManagementService, authorizer)
|
||||
Some(authorized)
|
||||
|
@ -117,7 +117,7 @@ object StandaloneApiServer {
|
||||
checkOverloaded = checkOverloaded,
|
||||
userManagementStore = userManagementStore,
|
||||
ledgerFeatures = ledgerFeatures,
|
||||
enableUserManagement = config.enableUserManagement,
|
||||
userManagementConfig = config.userManagementConfig,
|
||||
)(materializer, executionSequencerFactory, loggingContext)
|
||||
.map(_.withServices(otherServices))
|
||||
apiServer <- new LedgerApiServer(
|
||||
@ -128,7 +128,7 @@ object StandaloneApiServer {
|
||||
config.tlsConfig,
|
||||
AuthorizationInterceptor(
|
||||
authService,
|
||||
Option.when(config.enableUserManagement)(userManagementStore),
|
||||
Option.when(config.userManagementConfig.enabled)(userManagementStore),
|
||||
servicesExecutionContext,
|
||||
errorCodesVersionSwitcher,
|
||||
) :: otherInterceptors,
|
||||
|
@ -37,7 +37,7 @@ import scala.util.control.NonFatal
|
||||
private[apiserver] final class ApiVersionService private (
|
||||
enableSelfServiceErrorCodes: Boolean,
|
||||
ledgerFeatures: LedgerFeatures,
|
||||
enableUserManagement: Boolean,
|
||||
userManagementConfig: UserManagementConfig,
|
||||
)(implicit
|
||||
loggingContext: LoggingContext,
|
||||
executionContext: ExecutionContext,
|
||||
@ -56,10 +56,19 @@ private[apiserver] final class ApiVersionService private (
|
||||
private val featuresDescriptor =
|
||||
FeaturesDescriptor.of(
|
||||
userManagement = Some(
|
||||
if (userManagementConfig.enabled) {
|
||||
UserManagementFeature(
|
||||
supported = enableUserManagement,
|
||||
maxRightsPerUser = if (enableUserManagement) UserManagementConfig.MaxRightsPerUser else 0,
|
||||
supported = true,
|
||||
maxRightsPerUser = UserManagementConfig.MaxRightsPerUser,
|
||||
maxUsersPageSize = userManagementConfig.maxUsersPageSize,
|
||||
)
|
||||
} else {
|
||||
UserManagementFeature(
|
||||
supported = false,
|
||||
maxRightsPerUser = 0,
|
||||
maxUsersPageSize = 0,
|
||||
)
|
||||
}
|
||||
),
|
||||
experimental = Some(
|
||||
ExperimentalFeatures.of(
|
||||
@ -112,11 +121,11 @@ private[apiserver] object ApiVersionService {
|
||||
def create(
|
||||
enableSelfServiceErrorCodes: Boolean,
|
||||
ledgerFeatures: LedgerFeatures,
|
||||
enableUserManagement: Boolean,
|
||||
userManagementConfig: UserManagementConfig,
|
||||
)(implicit loggingContext: LoggingContext, ec: ExecutionContext): ApiVersionService =
|
||||
new ApiVersionService(
|
||||
enableSelfServiceErrorCodes,
|
||||
ledgerFeatures,
|
||||
enableUserManagement = enableUserManagement,
|
||||
userManagementConfig = userManagementConfig,
|
||||
)
|
||||
}
|
||||
|
@ -3,6 +3,9 @@
|
||||
|
||||
package com.daml.platform.apiserver.services.admin
|
||||
|
||||
import java.nio.charset.StandardCharsets
|
||||
import java.util.Base64
|
||||
|
||||
import com.daml.error.definitions.LedgerApiErrors
|
||||
import com.daml.error.{
|
||||
ContextualizedErrorLogger,
|
||||
@ -12,6 +15,8 @@ import com.daml.error.{
|
||||
import com.daml.ledger.api.domain._
|
||||
import com.daml.ledger.api.v1.admin.{user_management_service => proto}
|
||||
import com.daml.ledger.participant.state.index.v2.UserManagementStore
|
||||
import com.daml.ledger.participant.state.index.v2.UserManagementStore.UsersPage
|
||||
import com.daml.lf.data.Ref
|
||||
import com.daml.logging.{ContextualizedLogger, LoggingContext}
|
||||
import com.daml.platform.api.grpc.GrpcApiService
|
||||
import com.daml.platform.server.api.validation.{ErrorFactories, FieldValidations}
|
||||
@ -21,15 +26,18 @@ import scalaz.syntax.traverse._
|
||||
import scalaz.std.list._
|
||||
|
||||
import scala.concurrent.{ExecutionContext, Future}
|
||||
import scala.util.Try
|
||||
|
||||
private[apiserver] final class ApiUserManagementService(
|
||||
userManagementStore: UserManagementStore,
|
||||
errorCodesVersionSwitcher: ErrorCodesVersionSwitcher,
|
||||
maxUsersPageSize: Int,
|
||||
)(implicit
|
||||
executionContext: ExecutionContext,
|
||||
loggingContext: LoggingContext,
|
||||
) extends proto.UserManagementServiceGrpc.UserManagementService
|
||||
with GrpcApiService {
|
||||
|
||||
import ApiUserManagementService._
|
||||
|
||||
private implicit val logger: ContextualizedLogger = ContextualizedLogger.get(this.getClass)
|
||||
@ -37,6 +45,7 @@ private[apiserver] final class ApiUserManagementService(
|
||||
private implicit val contextualizedErrorLogger: ContextualizedErrorLogger =
|
||||
new DamlContextualizedErrorLogger(logger, loggingContext, None)
|
||||
private val fieldValidations = FieldValidations(errorFactories)
|
||||
|
||||
import fieldValidations._
|
||||
|
||||
override def close(): Unit = ()
|
||||
@ -82,14 +91,33 @@ private[apiserver] final class ApiUserManagementService(
|
||||
.map(_ => proto.DeleteUserResponse())
|
||||
)
|
||||
|
||||
override def listUsers(request: proto.ListUsersRequest): Future[proto.ListUsersResponse] =
|
||||
userManagementStore
|
||||
.listUsers()
|
||||
.flatMap(handleResult("listing users"))
|
||||
.map(
|
||||
_.map(toProtoUser)
|
||||
override def listUsers(request: proto.ListUsersRequest): Future[proto.ListUsersResponse] = {
|
||||
withValidation(
|
||||
for {
|
||||
fromExcl <- decodePageToken(request.pageToken)
|
||||
rawPageSize <- Either.cond(
|
||||
request.pageSize >= 0,
|
||||
request.pageSize,
|
||||
LedgerApiErrors.RequestValidation.InvalidArgument
|
||||
.Reject("Max page size must be non-negative")
|
||||
.asGrpcError,
|
||||
)
|
||||
.map(proto.ListUsersResponse(_))
|
||||
pageSize =
|
||||
if (rawPageSize == 0) maxUsersPageSize
|
||||
else Math.min(request.pageSize, maxUsersPageSize)
|
||||
} yield {
|
||||
(fromExcl, pageSize)
|
||||
}
|
||||
) { case (fromExcl, pageSize) =>
|
||||
userManagementStore
|
||||
.listUsers(fromExcl, pageSize)
|
||||
.flatMap(handleResult("listing users"))
|
||||
.map { page: UserManagementStore.UsersPage =>
|
||||
val protoUsers = page.users.map(toProtoUser)
|
||||
proto.ListUsersResponse(protoUsers, encodeNextPageToken(page))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
override def grantUserRights(
|
||||
request: proto.GrantUserRightsRequest
|
||||
@ -210,4 +238,45 @@ object ApiUserManagementService {
|
||||
proto.Right(proto.Right.Kind.CanReadAs(proto.Right.CanReadAs(party)))
|
||||
}
|
||||
|
||||
def encodeNextPageToken(page: UsersPage): String =
|
||||
page.lastUserIdOption
|
||||
.map { id =>
|
||||
val bytes = Base64.getUrlEncoder.encode(id.getBytes(StandardCharsets.UTF_8))
|
||||
new String(bytes, StandardCharsets.UTF_8)
|
||||
}
|
||||
.getOrElse("")
|
||||
|
||||
def decodePageToken(pageToken: String)(implicit
|
||||
loggingContext: ContextualizedErrorLogger
|
||||
): Either[StatusRuntimeException, Option[Ref.UserId]] = {
|
||||
if (pageToken.isEmpty) {
|
||||
Right(None)
|
||||
} else {
|
||||
val bytes = pageToken.getBytes(StandardCharsets.UTF_8)
|
||||
for {
|
||||
decodedBytes <- Try[Array[Byte]](Base64.getUrlDecoder.decode(bytes))
|
||||
.map(Right(_))
|
||||
.recover { case _: IllegalArgumentException =>
|
||||
Left(
|
||||
LedgerApiErrors.RequestValidation.InvalidArgument
|
||||
.Reject("Invalid page token")
|
||||
.asGrpcError
|
||||
)
|
||||
}
|
||||
.get
|
||||
decodedStr = new String(decodedBytes, StandardCharsets.UTF_8)
|
||||
userId <- Ref.UserId
|
||||
.fromString(decodedStr)
|
||||
.map(Some(_))
|
||||
.left
|
||||
.map(_ =>
|
||||
LedgerApiErrors.RequestValidation.InvalidArgument
|
||||
.Reject("Invalid page token")
|
||||
.asGrpcError
|
||||
)
|
||||
} yield {
|
||||
userId
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -397,7 +397,9 @@ trait UserManagementStorageBackend {
|
||||
|
||||
def getUser(id: UserId)(connection: Connection): Option[UserManagementStorageBackend.DbUser]
|
||||
|
||||
def getUsers()(connection: Connection): Vector[User]
|
||||
def getUsersOrderedById(fromExcl: Option[UserId] = None, maxResults: Int)(
|
||||
connection: Connection
|
||||
): Vector[User]
|
||||
|
||||
/** @return true if the right didn't exist and we have just added it.
|
||||
*/
|
||||
|
@ -379,7 +379,7 @@ abstract class EventStorageBackendTemplate(
|
||||
$additionalAndClause
|
||||
$witnessesWhereClause
|
||||
ORDER BY event_sequential_id
|
||||
${queryStrategy.limitClause(limit)}"""
|
||||
${QueryStrategy.limitClause(limit)}"""
|
||||
.withFetchSize(fetchSizeHint)
|
||||
.asVectorOf(rowParser(internedAllParties))(connection)
|
||||
}
|
||||
@ -442,7 +442,7 @@ abstract class EventStorageBackendTemplate(
|
||||
filters.party_id,
|
||||
$templateIdOrderingClause
|
||||
filters.event_sequential_id -- deliver in index order
|
||||
${queryStrategy.limitClause(Some(limit))}
|
||||
${QueryStrategy.limitClause(Some(limit))}
|
||||
"""
|
||||
.asVectorOf(long("event_sequential_id"))(connection)
|
||||
}
|
||||
|
@ -5,14 +5,7 @@ package com.daml.platform.store.backend.common
|
||||
|
||||
import com.daml.platform.store.backend.common.ComposableQuery.{CompositeSql, SqlStringInterpolation}
|
||||
|
||||
trait QueryStrategy {
|
||||
|
||||
/** An expression resulting to a boolean, to check equality between two SQL expressions
|
||||
*
|
||||
* @return plain SQL which fits the query template
|
||||
*/
|
||||
def columnEqualityBoolean(column: String, value: String): String =
|
||||
s"""$column = $value"""
|
||||
object QueryStrategy {
|
||||
|
||||
/** This populates the following part of the query:
|
||||
* SELECT ... WHERE ... ORDER BY ... [THIS PART]
|
||||
@ -25,6 +18,17 @@ trait QueryStrategy {
|
||||
.map(to => cSQL"fetch next $to rows only")
|
||||
.getOrElse(cSQL"")
|
||||
|
||||
}
|
||||
|
||||
trait QueryStrategy {
|
||||
|
||||
/** An expression resulting to a boolean, to check equality between two SQL expressions
|
||||
*
|
||||
* @return plain SQL which fits the query template
|
||||
*/
|
||||
def columnEqualityBoolean(column: String, value: String): String =
|
||||
s"""$column = $value"""
|
||||
|
||||
/** An expression resulting to a boolean to check whether:
|
||||
* - the party set defined by columnName and
|
||||
* - the party set defined by parties
|
||||
|
@ -70,18 +70,22 @@ object UserManagementStorageBackendTemplate extends UserManagementStorageBackend
|
||||
}
|
||||
}
|
||||
|
||||
override def getUsers()(connection: Connection): Vector[domain.User] = {
|
||||
def domainUser(userId: String, primaryParty: Option[String]): domain.User = {
|
||||
domain.User(
|
||||
Ref.UserId.assertFromString(userId),
|
||||
primaryParty.map(Ref.Party.assertFromString),
|
||||
)
|
||||
override def getUsersOrderedById(fromExcl: Option[UserId], maxResults: Int)(
|
||||
connection: Connection
|
||||
): Vector[domain.User] = {
|
||||
import com.daml.platform.store.backend.common.ComposableQuery.SqlStringInterpolation
|
||||
val whereClause = fromExcl match {
|
||||
case None => cSQL""
|
||||
case Some(id: String) => cSQL"WHERE user_id > ${id}"
|
||||
}
|
||||
SQL"""SELECT internal_id, user_id, primary_party
|
||||
FROM participant_users"""
|
||||
SQL"""SELECT user_id, primary_party
|
||||
FROM participant_users
|
||||
$whereClause
|
||||
ORDER BY user_id
|
||||
${QueryStrategy.limitClause(Some(maxResults))}"""
|
||||
.asVectorOf(ParticipantUserParser2)(connection)
|
||||
.map { case (userId, primaryPartyRaw) =>
|
||||
domainUser(userId, dbStringToPartyString(primaryPartyRaw))
|
||||
toDomainUser(userId, dbStringToPartyString(primaryPartyRaw))
|
||||
}
|
||||
}
|
||||
|
||||
@ -199,4 +203,11 @@ object UserManagementStorageBackendTemplate extends UserManagementStorageBackend
|
||||
}
|
||||
}
|
||||
|
||||
private def toDomainUser(userId: String, primaryParty: Option[String]): domain.User = {
|
||||
domain.User(
|
||||
Ref.UserId.assertFromString(userId),
|
||||
primaryParty.map(Ref.Party.assertFromString),
|
||||
)
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -57,4 +57,5 @@ object OracleStorageBackendFactory extends StorageBackendFactory with CommonStor
|
||||
|
||||
override val createResetStorageBackend: ResetStorageBackend =
|
||||
OracleResetStorageBackend
|
||||
|
||||
}
|
||||
|
@ -10,7 +10,7 @@ import com.daml.caching.CaffeineCache
|
||||
import com.daml.ledger.api.domain
|
||||
import com.daml.ledger.api.domain.User
|
||||
import com.daml.ledger.participant.state.index.v2.UserManagementStore
|
||||
import com.daml.ledger.participant.state.index.v2.UserManagementStore.{Result, UserInfo, Users}
|
||||
import com.daml.ledger.participant.state.index.v2.UserManagementStore.{Result, UserInfo}
|
||||
import com.daml.lf.data.Ref
|
||||
import com.daml.lf.data.Ref.UserId
|
||||
import com.daml.metrics.Metrics
|
||||
@ -85,9 +85,11 @@ class CachedUserManagementStore(
|
||||
.andThen(invalidateOnSuccess(id))
|
||||
}
|
||||
|
||||
override def listUsers(): Future[Result[Users]] = {
|
||||
delegate.listUsers()
|
||||
}
|
||||
override def listUsers(
|
||||
fromExcl: Option[Ref.UserId],
|
||||
maxResults: Int,
|
||||
): Future[Result[UserManagementStore.UsersPage]] =
|
||||
delegate.listUsers(fromExcl, maxResults)
|
||||
|
||||
private def invalidateOnSuccess(id: UserId): PartialFunction[Try[Result[Any]], Unit] = {
|
||||
case Success(Right(_)) => cache.invalidate(id)
|
||||
|
@ -13,7 +13,7 @@ import com.daml.ledger.participant.state.index.v2.UserManagementStore.{
|
||||
UserExists,
|
||||
UserInfo,
|
||||
UserNotFound,
|
||||
Users,
|
||||
UsersPage,
|
||||
}
|
||||
import com.daml.lf.data.Ref
|
||||
import com.daml.lf.data.Ref.UserId
|
||||
@ -29,18 +29,22 @@ object UserManagementConfig {
|
||||
|
||||
val DefaultMaxCacheSize = 100
|
||||
val DefaultCacheExpiryAfterWriteInSeconds = 5
|
||||
val DefaultMaxUsersPageSize = 1000
|
||||
|
||||
val MaxRightsPerUser = 1000
|
||||
|
||||
def default(enabled: Boolean): UserManagementConfig = UserManagementConfig(
|
||||
enabled = enabled,
|
||||
maxCacheSize = DefaultMaxCacheSize,
|
||||
cacheExpiryAfterWriteInSeconds = DefaultCacheExpiryAfterWriteInSeconds,
|
||||
maxUsersPageSize = DefaultMaxUsersPageSize,
|
||||
)
|
||||
}
|
||||
final case class UserManagementConfig(
|
||||
enabled: Boolean,
|
||||
maxCacheSize: Int,
|
||||
cacheExpiryAfterWriteInSeconds: Int,
|
||||
maxUsersPageSize: Int,
|
||||
)
|
||||
|
||||
object PersistentUserManagementStore {
|
||||
@ -182,9 +186,16 @@ class PersistentUserManagementStore(
|
||||
|
||||
}
|
||||
|
||||
override def listUsers(): Future[Result[Users]] = {
|
||||
override def listUsers(
|
||||
fromExcl: Option[Ref.UserId],
|
||||
maxResults: Int,
|
||||
): Future[Result[UsersPage]] = {
|
||||
inTransaction(_.listUsers) { connection =>
|
||||
Right(backend.getUsers()(connection))
|
||||
val users: Seq[domain.User] = fromExcl match {
|
||||
case None => backend.getUsersOrderedById(None, maxResults)(connection)
|
||||
case Some(fromExcl) => backend.getUsersOrderedById(Some(fromExcl), maxResults)(connection)
|
||||
}
|
||||
Right(UsersPage(users = users))
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -96,22 +96,86 @@ private[backend] trait StorageBackendTestsUserManagement
|
||||
getNonexistent shouldBe None
|
||||
}
|
||||
|
||||
it should "get users (getUsers)" in {
|
||||
val user1 = newUniqueUser()
|
||||
val user2 = newUniqueUser()
|
||||
val emptyUsers = executeSql(tested.getUsers())
|
||||
it should "get all users (getUsers) ordered by id" in {
|
||||
val user1 = newUniqueUser(userId = "user_id_1")
|
||||
val user2 = newUniqueUser(userId = "user_id_2")
|
||||
val user3 = newUniqueUser(userId = "user_id_3")
|
||||
executeSql(tested.getUsersOrderedById(fromExcl = None, maxResults = 10)) shouldBe empty
|
||||
val _ = executeSql(tested.createUser(user3))
|
||||
val _ = executeSql(tested.createUser(user1))
|
||||
executeSql(tested.getUsersOrderedById(fromExcl = None, maxResults = 10)) shouldBe Seq(
|
||||
user1,
|
||||
user3,
|
||||
)
|
||||
val _ = executeSql(tested.createUser(user2))
|
||||
val allUsers = executeSql(tested.getUsers())
|
||||
emptyUsers shouldBe empty
|
||||
allUsers should contain theSameElementsAs Seq(user1, user2)
|
||||
executeSql(tested.getUsersOrderedById(fromExcl = None, maxResults = 10)) shouldBe Seq(
|
||||
user1,
|
||||
user2,
|
||||
user3,
|
||||
)
|
||||
}
|
||||
|
||||
it should "get a page of users (getUsers) ordered by id" in {
|
||||
val user1 = newUniqueUser(userId = "user_id_1")
|
||||
val user2 = newUniqueUser(userId = "user_id_2")
|
||||
val user3 = newUniqueUser(userId = "user_id_3")
|
||||
// Note: user4 doesn't exist and won't be created
|
||||
val user5 = newUniqueUser(userId = "user_id_5")
|
||||
val user6 = newUniqueUser(userId = "user_id_6")
|
||||
val user7 = newUniqueUser(userId = "user_id_7")
|
||||
executeSql(tested.getUsersOrderedById(fromExcl = None, maxResults = 10)) shouldBe empty
|
||||
// Creating users in a random order
|
||||
val _ = executeSql(tested.createUser(user5))
|
||||
val _ = executeSql(tested.createUser(user1))
|
||||
val _ = executeSql(tested.createUser(user7))
|
||||
val _ = executeSql(tested.createUser(user3))
|
||||
val _ = executeSql(tested.createUser(user6))
|
||||
val _ = executeSql(tested.createUser(user2))
|
||||
// Get first 2 elements
|
||||
executeSql(tested.getUsersOrderedById(fromExcl = None, maxResults = 2)) shouldBe Seq(
|
||||
user1,
|
||||
user2,
|
||||
)
|
||||
// Get 3 users after user1
|
||||
executeSql(tested.getUsersOrderedById(maxResults = 3, fromExcl = Some(user1.id))) shouldBe Seq(
|
||||
user2,
|
||||
user3,
|
||||
user5,
|
||||
)
|
||||
// Get up to 10000 users after user1
|
||||
executeSql(
|
||||
tested.getUsersOrderedById(maxResults = 10000, fromExcl = Some(user1.id))
|
||||
) shouldBe Seq(
|
||||
user2,
|
||||
user3,
|
||||
user5,
|
||||
user6,
|
||||
user7,
|
||||
)
|
||||
// Get some users after a non-existing user id
|
||||
executeSql(
|
||||
tested.getUsersOrderedById(
|
||||
maxResults = 2,
|
||||
fromExcl = Some(Ref.UserId.assertFromString("user_id_4")),
|
||||
)
|
||||
) shouldBe Seq(user5, user6)
|
||||
// Get no users when requesting with after set the last existing user
|
||||
executeSql(tested.getUsersOrderedById(maxResults = 2, fromExcl = Some(user7.id))) shouldBe empty
|
||||
// Get no users when requesting with after set beyond the last existing user
|
||||
executeSql(
|
||||
tested.getUsersOrderedById(
|
||||
maxResults = 2,
|
||||
fromExcl = Some(Ref.UserId.assertFromString("user_id_8")),
|
||||
)
|
||||
) shouldBe empty
|
||||
}
|
||||
|
||||
it should "handle adding rights to non-existent user" in {
|
||||
val nonExistentUserInternalId = 123
|
||||
val allUsers = executeSql(tested.getUsers())
|
||||
val _ = executeSql(tested.userRightExists(nonExistentUserInternalId, right2))
|
||||
val allUsers = executeSql(tested.getUsersOrderedById(maxResults = 10, fromExcl = None))
|
||||
val rightExists = executeSql(tested.userRightExists(nonExistentUserInternalId, right2))
|
||||
allUsers shouldBe empty
|
||||
rightExists shouldBe false
|
||||
}
|
||||
|
||||
it should "handle adding duplicate rights" in {
|
||||
@ -195,15 +259,19 @@ private[backend] trait StorageBackendTestsUserManagement
|
||||
rights2 shouldBe empty
|
||||
}
|
||||
|
||||
private def newUniqueUser(emptyPrimaryParty: Boolean = false): User = {
|
||||
private def newUniqueUser(
|
||||
emptyPrimaryParty: Boolean = false,
|
||||
userId: String = "",
|
||||
): User = {
|
||||
val uuid = UUID.randomUUID.toString
|
||||
val primaryParty =
|
||||
if (emptyPrimaryParty)
|
||||
None
|
||||
else
|
||||
Some(Ref.Party.assertFromString(s"primary_party_${uuid}"))
|
||||
val userIdStr = if (userId != "") userId else s"user_id_${uuid}"
|
||||
User(
|
||||
id = Ref.UserId.assertFromString(s"user_id_${uuid}"),
|
||||
id = Ref.UserId.assertFromString(userIdStr),
|
||||
primaryParty = primaryParty,
|
||||
)
|
||||
}
|
||||
|
@ -0,0 +1,270 @@
|
||||
// Copyright (c) 2022 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
package com.daml.platform.store.platform.usermanagement
|
||||
|
||||
import com.daml.ledger.api.domain.{User, UserRight}
|
||||
import com.daml.ledger.participant.state.index.v2.UserManagementStore
|
||||
import com.daml.ledger.participant.state.index.v2.UserManagementStore.{
|
||||
UserExists,
|
||||
UserNotFound,
|
||||
UsersPage,
|
||||
}
|
||||
import com.daml.ledger.resources.TestResourceContext
|
||||
import com.daml.lf.data.Ref
|
||||
import com.daml.lf.data.Ref.{Party, UserId}
|
||||
import com.daml.logging.LoggingContext
|
||||
import org.scalatest.freespec.AsyncFreeSpec
|
||||
import org.scalatest.matchers.should.Matchers
|
||||
import org.scalatest.{Assertion, EitherValues}
|
||||
import scala.language.implicitConversions
|
||||
|
||||
import scala.concurrent.Future
|
||||
|
||||
/** Common tests for implementations of [[UserManagementStore]]
|
||||
*/
|
||||
trait UserManagementStoreSpecBase extends TestResourceContext with Matchers with EitherValues {
|
||||
self: AsyncFreeSpec =>
|
||||
|
||||
implicit val lc: LoggingContext = LoggingContext.ForTesting
|
||||
|
||||
private implicit def toParty(s: String): Party =
|
||||
Party.assertFromString(s)
|
||||
|
||||
private implicit def toUserId(s: String): UserId =
|
||||
UserId.assertFromString(s)
|
||||
|
||||
def testIt(f: UserManagementStore => Future[Assertion]): Future[Assertion]
|
||||
|
||||
"user management" - {
|
||||
"allow creating a fresh user" in {
|
||||
testIt { tested =>
|
||||
for {
|
||||
res1 <- tested.createUser(User(s"user1", None), Set.empty)
|
||||
res2 <- tested.createUser(User("user2", None), Set.empty)
|
||||
} yield {
|
||||
res1 shouldBe Right(())
|
||||
res2 shouldBe Right(())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
"disallow re-creating an existing user" in {
|
||||
testIt { tested =>
|
||||
val user = User("user1", None)
|
||||
for {
|
||||
res1 <- tested.createUser(user, Set.empty)
|
||||
res2 <- tested.createUser(user, Set.empty)
|
||||
} yield {
|
||||
res1 shouldBe Right(())
|
||||
res2 shouldBe Left(UserExists(user.id))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
"find a freshly created user" in {
|
||||
testIt { tested =>
|
||||
val user = User("user1", None)
|
||||
for {
|
||||
res1 <- tested.createUser(user, Set.empty)
|
||||
user1 <- tested.getUser(user.id)
|
||||
} yield {
|
||||
res1 shouldBe Right(())
|
||||
user1 shouldBe Right(user)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
"not find a non-existent user" in {
|
||||
testIt { tested =>
|
||||
val userId: Ref.UserId = "user1"
|
||||
for {
|
||||
user1 <- tested.getUser(userId)
|
||||
} yield {
|
||||
user1 shouldBe Left(UserNotFound(userId))
|
||||
}
|
||||
}
|
||||
}
|
||||
"not find a deleted user" in {
|
||||
testIt { tested =>
|
||||
val user = User("user1", None)
|
||||
for {
|
||||
res1 <- tested.createUser(user, Set.empty)
|
||||
user1 <- tested.getUser("user1")
|
||||
res2 <- tested.deleteUser("user1")
|
||||
user2 <- tested.getUser("user1")
|
||||
} yield {
|
||||
res1 shouldBe Right(())
|
||||
user1 shouldBe Right(user)
|
||||
res2 shouldBe Right(())
|
||||
user2 shouldBe Left(UserNotFound("user1"))
|
||||
}
|
||||
}
|
||||
}
|
||||
"allow recreating a deleted user" in {
|
||||
testIt { tested =>
|
||||
val user = User("user1", None)
|
||||
for {
|
||||
res1 <- tested.createUser(user, Set.empty)
|
||||
res2 <- tested.deleteUser(user.id)
|
||||
res3 <- tested.createUser(user, Set.empty)
|
||||
} yield {
|
||||
res1 shouldBe Right(())
|
||||
res2 shouldBe Right(())
|
||||
res3 shouldBe Right(())
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
"fail to delete a non-existent user" in {
|
||||
testIt { tested =>
|
||||
for {
|
||||
res1 <- tested.deleteUser("user1")
|
||||
} yield {
|
||||
res1 shouldBe Left(UserNotFound("user1"))
|
||||
}
|
||||
}
|
||||
}
|
||||
"list created users" in {
|
||||
testIt { tested =>
|
||||
for {
|
||||
_ <- tested.createUser(User("user1", None), Set.empty)
|
||||
_ <- tested.createUser(User("user2", None), Set.empty)
|
||||
_ <- tested.createUser(User("user3", None), Set.empty)
|
||||
_ <- tested.createUser(User("user4", None), Set.empty)
|
||||
list1 <- tested.listUsers(fromExcl = None, maxResults = 3)
|
||||
_ = list1 shouldBe Right(
|
||||
UsersPage(Seq(User("user1", None), User("user2", None), User("user3", None)))
|
||||
)
|
||||
list2 <- tested.listUsers(
|
||||
fromExcl = list1.getOrElse(fail("Expecting a Right()")).lastUserIdOption,
|
||||
maxResults = 4,
|
||||
)
|
||||
_ = list2 shouldBe Right(UsersPage(Seq(User("user4", None))))
|
||||
} yield {
|
||||
succeed
|
||||
}
|
||||
}
|
||||
}
|
||||
"not list deleted users" in {
|
||||
testIt { tested =>
|
||||
for {
|
||||
res1 <- tested.createUser(User("user1", None), Set.empty)
|
||||
res2 <- tested.createUser(User("user2", None), Set.empty)
|
||||
users1 <- tested.listUsers(fromExcl = None, maxResults = 10000)
|
||||
res3 <- tested.deleteUser("user1")
|
||||
users2 <- tested.listUsers(fromExcl = None, maxResults = 10000)
|
||||
} yield {
|
||||
res1 shouldBe Right(())
|
||||
res2 shouldBe Right(())
|
||||
users1 shouldBe Right(UsersPage(Seq(User("user1", None), User("user2", None))))
|
||||
res3 shouldBe Right(())
|
||||
users2 shouldBe Right(UsersPage(Seq(User("user2", None))))
|
||||
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
"user rights management" - {
|
||||
import UserRight._
|
||||
"listUserRights should find the rights of a freshly created user" in {
|
||||
testIt { tested =>
|
||||
for {
|
||||
res1 <- tested.createUser(User("user1", None), Set.empty)
|
||||
rights1 <- tested.listUserRights("user1")
|
||||
user2 <- tested.createUser(
|
||||
User("user2", None),
|
||||
Set(ParticipantAdmin, CanActAs("party1"), CanReadAs("party2")),
|
||||
)
|
||||
rights2 <- tested.listUserRights("user2")
|
||||
} yield {
|
||||
res1 shouldBe Right(())
|
||||
rights1 shouldBe Right(Set.empty)
|
||||
user2 shouldBe Right(())
|
||||
rights2 shouldBe Right(
|
||||
Set(ParticipantAdmin, CanActAs("party1"), CanReadAs("party2"))
|
||||
)
|
||||
}
|
||||
}
|
||||
}
|
||||
"listUserRights should fail on non-existent user" in {
|
||||
testIt { tested =>
|
||||
for {
|
||||
rights1 <- tested.listUserRights("user1")
|
||||
} yield {
|
||||
rights1 shouldBe Left(UserNotFound("user1"))
|
||||
}
|
||||
}
|
||||
}
|
||||
"grantUserRights should add new rights" in {
|
||||
testIt { tested =>
|
||||
for {
|
||||
res1 <- tested.createUser(User("user1", None), Set.empty)
|
||||
rights1 <- tested.grantRights("user1", Set(ParticipantAdmin))
|
||||
rights2 <- tested.grantRights("user1", Set(ParticipantAdmin))
|
||||
rights3 <- tested.grantRights("user1", Set(CanActAs("party1"), CanReadAs("party2")))
|
||||
rights4 <- tested.listUserRights("user1")
|
||||
} yield {
|
||||
res1 shouldBe Right(())
|
||||
rights1 shouldBe Right(Set(ParticipantAdmin))
|
||||
rights2 shouldBe Right(Set.empty)
|
||||
rights3 shouldBe Right(
|
||||
Set(CanActAs("party1"), CanReadAs("party2"))
|
||||
)
|
||||
rights4 shouldBe Right(
|
||||
Set(ParticipantAdmin, CanActAs("party1"), CanReadAs("party2"))
|
||||
)
|
||||
}
|
||||
}
|
||||
}
|
||||
"grantRights should fail on non-existent user" in {
|
||||
testIt { tested =>
|
||||
for {
|
||||
rights1 <- tested.grantRights("user1", Set.empty)
|
||||
} yield {
|
||||
rights1 shouldBe Left(UserNotFound("user1"))
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
"revokeRights should revoke rights" in {
|
||||
testIt { tested =>
|
||||
for {
|
||||
res1 <- tested.createUser(
|
||||
User("user1", None),
|
||||
Set(ParticipantAdmin, CanActAs("party1"), CanReadAs("party2")),
|
||||
)
|
||||
rights1 <- tested.listUserRights("user1")
|
||||
rights2 <- tested.revokeRights("user1", Set(ParticipantAdmin))
|
||||
rights3 <- tested.revokeRights("user1", Set(ParticipantAdmin))
|
||||
rights4 <- tested.listUserRights("user1")
|
||||
rights5 <- tested.revokeRights("user1", Set(CanActAs("party1"), CanReadAs("party2")))
|
||||
rights6 <- tested.listUserRights("user1")
|
||||
} yield {
|
||||
res1 shouldBe Right(())
|
||||
rights1 shouldBe Right(
|
||||
Set(ParticipantAdmin, CanActAs("party1"), CanReadAs("party2"))
|
||||
)
|
||||
rights2 shouldBe Right(Set(ParticipantAdmin))
|
||||
rights3 shouldBe Right(Set.empty)
|
||||
rights4 shouldBe Right(Set(CanActAs("party1"), CanReadAs("party2")))
|
||||
rights5 shouldBe Right(
|
||||
Set(CanActAs("party1"), CanReadAs("party2"))
|
||||
)
|
||||
rights6 shouldBe Right(Set.empty)
|
||||
}
|
||||
}
|
||||
}
|
||||
"revokeRights should fail on non-existent user" in {
|
||||
testIt { tested =>
|
||||
for {
|
||||
rights1 <- tested.revokeRights("user1", Set.empty)
|
||||
} yield {
|
||||
rights1 shouldBe Left(UserNotFound("user1"))
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
}
|
@ -0,0 +1,74 @@
|
||||
// Copyright (c) 2022 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
package com.daml.platform.apiserver.services.admin
|
||||
|
||||
import java.nio.charset.StandardCharsets
|
||||
import java.util.Base64
|
||||
|
||||
import com.daml.error.definitions.LedgerApiErrors
|
||||
import com.daml.error.{DamlContextualizedErrorLogger, ErrorsAssertions}
|
||||
import com.daml.ledger.api.domain.User
|
||||
import com.daml.ledger.participant.state.index.v2.UserManagementStore.UsersPage
|
||||
import com.daml.lf.data.Ref
|
||||
import org.scalatest.EitherValues
|
||||
import org.scalatest.flatspec.AnyFlatSpec
|
||||
import org.scalatest.matchers.should.Matchers
|
||||
|
||||
class ApiUserManagementServiceSpec
|
||||
extends AnyFlatSpec
|
||||
with Matchers
|
||||
with EitherValues
|
||||
with ErrorsAssertions {
|
||||
|
||||
private val errorLogger = DamlContextualizedErrorLogger.forTesting(getClass)
|
||||
|
||||
it should "test users page token encoding and decoding" in {
|
||||
val id2 = Ref.UserId.assertFromString("user2")
|
||||
val page = UsersPage(
|
||||
users = Seq(User(Ref.UserId.assertFromString("user1"), None), User(id2, None))
|
||||
)
|
||||
val actualNextPageToken = ApiUserManagementService.encodeNextPageToken(page)
|
||||
actualNextPageToken shouldBe "dXNlcjI="
|
||||
ApiUserManagementService.decodePageToken(actualNextPageToken)(errorLogger) shouldBe Right(
|
||||
Some(id2)
|
||||
)
|
||||
}
|
||||
|
||||
it should "test users page token encoding and decoding for an empty page" in {
|
||||
val page = UsersPage(users = Seq.empty)
|
||||
val actualNextPageToken = ApiUserManagementService.encodeNextPageToken(page)
|
||||
actualNextPageToken shouldBe ("")
|
||||
ApiUserManagementService.decodePageToken(actualNextPageToken)(errorLogger) shouldBe Right(None)
|
||||
}
|
||||
|
||||
it should "return invalid argument error when token is not a base64" in {
|
||||
val actualNextPageToken =
|
||||
ApiUserManagementService.decodePageToken("not-a-base64-string!!")(errorLogger)
|
||||
val error = actualNextPageToken.left.value
|
||||
assertError(
|
||||
actual = error,
|
||||
expectedF = LedgerApiErrors.RequestValidation.InvalidArgument
|
||||
.Reject("Invalid page token")(_)
|
||||
.asGrpcError,
|
||||
)
|
||||
}
|
||||
|
||||
it should "return invalid argument error when token is base64 but not a valid user id string" in {
|
||||
val notValidUserId = "not a valid user id"
|
||||
Ref.UserId.fromString(notValidUserId).isLeft shouldBe true
|
||||
val badPageToken = new String(
|
||||
Base64.getEncoder.encode(notValidUserId.getBytes(StandardCharsets.UTF_8)),
|
||||
StandardCharsets.UTF_8,
|
||||
)
|
||||
|
||||
val actualNextPageToken = ApiUserManagementService.decodePageToken(badPageToken)(errorLogger)
|
||||
val error = actualNextPageToken.left.value
|
||||
assertError(
|
||||
actual = error,
|
||||
expectedF = LedgerApiErrors.RequestValidation.InvalidArgument
|
||||
.Reject("Invalid page token")(_)
|
||||
.asGrpcError,
|
||||
)
|
||||
}
|
||||
}
|
@ -6,16 +6,26 @@ package com.daml.platform.usermanagement
|
||||
import com.codahale.metrics.MetricRegistry
|
||||
import com.daml.ledger.api.domain.{User, UserRight}
|
||||
import com.daml.ledger.participant.state.index.impl.inmemory.InMemoryUserManagementStore
|
||||
import com.daml.ledger.participant.state.index.v2.UserManagementStore.{UserInfo, UserNotFound}
|
||||
import com.daml.ledger.participant.state.index.v2.UserManagementStore
|
||||
import com.daml.ledger.participant.state.index.v2.UserManagementStore.{
|
||||
UserInfo,
|
||||
UserNotFound,
|
||||
UsersPage,
|
||||
}
|
||||
import com.daml.ledger.resources.TestResourceContext
|
||||
import com.daml.lf.data.Ref
|
||||
import com.daml.metrics.Metrics
|
||||
import com.daml.platform.store.platform.usermanagement.UserManagementStoreSpecBase
|
||||
import org.mockito.{ArgumentMatchersSugar, MockitoSugar}
|
||||
import org.scalatest.Assertion
|
||||
import org.scalatest.freespec.AsyncFreeSpec
|
||||
import org.scalatest.matchers.should.Matchers
|
||||
|
||||
import scala.concurrent.Future
|
||||
|
||||
class CachedUserManagementStoreSpec
|
||||
extends AsyncFreeSpec
|
||||
with UserManagementStoreSpecBase
|
||||
with TestResourceContext
|
||||
with Matchers
|
||||
with MockitoSugar
|
||||
@ -104,16 +114,16 @@ class CachedUserManagementStoreSpec
|
||||
|
||||
for {
|
||||
res0 <- tested.createUser(user, rights)
|
||||
res1 <- tested.listUsers()
|
||||
res2 <- tested.listUsers()
|
||||
res1 <- tested.listUsers(fromExcl = None, maxResults = 100)
|
||||
res2 <- tested.listUsers(fromExcl = None, maxResults = 100)
|
||||
} yield {
|
||||
val order = inOrder(delegate)
|
||||
order.verify(delegate, times(1)).createUser(user, rights)
|
||||
order.verify(delegate, times(2)).listUsers()
|
||||
order.verify(delegate, times(2)).listUsers(fromExcl = None, maxResults = 100)
|
||||
order.verifyNoMoreInteractions()
|
||||
res0 shouldBe Right(())
|
||||
res1 shouldBe Right(Seq(user))
|
||||
res2 shouldBe Right(Seq(user))
|
||||
res1 shouldBe Right(UsersPage(Seq(user)))
|
||||
res2 shouldBe Right(UsersPage(Seq(user)))
|
||||
}
|
||||
}
|
||||
|
||||
@ -149,4 +159,7 @@ class CachedUserManagementStoreSpec
|
||||
)
|
||||
}
|
||||
|
||||
override def testIt(f: UserManagementStore => Future[Assertion]): Future[Assertion] = {
|
||||
f(createTested(new InMemoryUserManagementStore(createAdmin = false)))
|
||||
}
|
||||
}
|
||||
|
@ -0,0 +1,24 @@
|
||||
// Copyright (c) 2022 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
package com.daml.platform.usermanagement
|
||||
|
||||
import com.daml.ledger.participant.state.index.impl.inmemory.InMemoryUserManagementStore
|
||||
import com.daml.ledger.participant.state.index.v2.UserManagementStore
|
||||
import com.daml.platform.store.platform.usermanagement.UserManagementStoreSpecBase
|
||||
import org.scalatest.Assertion
|
||||
import org.scalatest.freespec.AsyncFreeSpec
|
||||
|
||||
import scala.concurrent.Future
|
||||
|
||||
class InMemoryUserManagementStoreSpec extends AsyncFreeSpec with UserManagementStoreSpecBase {
|
||||
|
||||
override def testIt(f: UserManagementStore => Future[Assertion]): Future[Assertion] = {
|
||||
f(
|
||||
new InMemoryUserManagementStore(
|
||||
createAdmin = false
|
||||
)
|
||||
)
|
||||
}
|
||||
|
||||
}
|
@ -19,7 +19,7 @@ class InMemoryUserManagementStore(createAdmin: Boolean = true) extends UserManag
|
||||
// Structured so we can use a ConcurrentHashMap (to more closely mimic a real implementation, where performance is key).
|
||||
// We synchronize on a private object (the mutable map), not the service (which could cause deadlocks).
|
||||
// (No need to mark state as volatile -- rely on synchronized to establish the JMM's happens-before relation.)
|
||||
private val state: mutable.Map[Ref.UserId, UserInfo] = mutable.Map()
|
||||
private val state: mutable.TreeMap[Ref.UserId, UserInfo] = mutable.TreeMap()
|
||||
if (createAdmin) {
|
||||
state.put(AdminUser.user.id, AdminUser)
|
||||
}
|
||||
@ -64,9 +64,21 @@ class InMemoryUserManagementStore(createAdmin: Boolean = true) extends UserManag
|
||||
effectivelyRevoked
|
||||
}
|
||||
|
||||
def listUsers(): Future[Result[Users]] =
|
||||
override def listUsers(
|
||||
fromExcl: Option[Ref.UserId],
|
||||
maxResults: Int,
|
||||
): Future[Result[UsersPage]] = {
|
||||
withState {
|
||||
Right(state.values.map(_.user).toSeq)
|
||||
val iter: Iterator[UserInfo] = fromExcl match {
|
||||
case None => state.valuesIterator
|
||||
case Some(after) => state.valuesIteratorFrom(start = after).dropWhile(_.user.id == after)
|
||||
}
|
||||
val users: Seq[User] = iter
|
||||
.take(maxResults)
|
||||
.map(_.user)
|
||||
.toSeq
|
||||
Right(UsersPage(users = users))
|
||||
}
|
||||
}
|
||||
|
||||
private def withState[T](t: => T): Future[T] =
|
||||
@ -100,6 +112,7 @@ class InMemoryUserManagementStore(createAdmin: Boolean = true) extends UserManag
|
||||
case _ => false
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
object InMemoryUserManagementStore {
|
||||
|
@ -9,13 +9,14 @@ import com.daml.lf.data.Ref
|
||||
import scala.concurrent.{ExecutionContext, Future}
|
||||
|
||||
trait UserManagementStore {
|
||||
|
||||
import UserManagementStore._
|
||||
|
||||
// read access
|
||||
|
||||
def getUserInfo(id: Ref.UserId): Future[Result[UserInfo]]
|
||||
|
||||
def listUsers(): Future[Result[Users]]
|
||||
def listUsers(fromExcl: Option[Ref.UserId], maxResults: Int): Future[Result[UsersPage]]
|
||||
|
||||
// write access
|
||||
|
||||
@ -43,6 +44,10 @@ object UserManagementStore {
|
||||
type Result[T] = Either[Error, T]
|
||||
type Users = Seq[User]
|
||||
|
||||
case class UsersPage(users: Seq[User]) {
|
||||
def lastUserIdOption: Option[Ref.UserId] = users.lastOption.map(_.id)
|
||||
}
|
||||
|
||||
case class UserInfo(user: User, rights: Set[UserRight])
|
||||
|
||||
sealed trait Error extends RuntimeException
|
||||
|
@ -653,6 +653,16 @@ object Config {
|
||||
.action((value, config: Config[Extra]) =>
|
||||
config.withUserManagementConfig(_.copy(maxCacheSize = value))
|
||||
)
|
||||
|
||||
opt[Int]("max-users-page-size")
|
||||
.optional()
|
||||
.text(
|
||||
s"Maximum number of users that the server can return in a single response. " +
|
||||
s"Defaults to ${UserManagementConfig.DefaultMaxUsersPageSize} entries."
|
||||
)
|
||||
.action((value, config: Config[Extra]) =>
|
||||
config.withUserManagementConfig(_.copy(maxUsersPageSize = value))
|
||||
)
|
||||
}
|
||||
extraOptions(parser)
|
||||
parser
|
||||
|
@ -83,7 +83,7 @@ trait ConfigProvider[ExtraConfig] {
|
||||
participantConfig.maxTransactionsInMemoryFanOutBufferSize,
|
||||
enableInMemoryFanOutForLedgerApi = config.enableInMemoryFanOutForLedgerApi,
|
||||
enableSelfServiceErrorCodes = config.enableSelfServiceErrorCodes,
|
||||
enableUserManagement = config.userManagementConfig.enabled,
|
||||
userManagementConfig = config.userManagementConfig,
|
||||
)
|
||||
|
||||
def partyConfig(@unused config: Config[ExtraConfig]): PartyConfiguration =
|
||||
|
@ -334,5 +334,21 @@ final class ConfigSpec
|
||||
).value.userManagementConfig.cacheExpiryAfterWriteInSeconds shouldBe 123
|
||||
}
|
||||
|
||||
it should "handle '--max-users-page-size' flag correctly" in {
|
||||
// missing value
|
||||
configParserSimple(
|
||||
Seq("--max-users-page-size")
|
||||
) shouldBe None
|
||||
// default
|
||||
configParserSimple().value.userManagementConfig.maxUsersPageSize shouldBe 1000
|
||||
// custom value
|
||||
configParserSimple(
|
||||
Seq(
|
||||
"--max-users-page-size",
|
||||
"123",
|
||||
)
|
||||
).value.userManagementConfig.maxUsersPageSize shouldBe 123
|
||||
}
|
||||
|
||||
private def parsingFailure(): Nothing = fail("Config parsing failed.")
|
||||
}
|
||||
|
@ -423,6 +423,16 @@ class CommonCliBase(name: LedgerName) {
|
||||
config.withUserManagementConfig(_.copy(maxCacheSize = value))
|
||||
)
|
||||
|
||||
opt[Int]("max-users-page-size")
|
||||
.optional()
|
||||
.text(
|
||||
s"Maximum number of users that the server can return in a single response. " +
|
||||
s"Defaults to ${UserManagementConfig.DefaultMaxUsersPageSize} entries."
|
||||
)
|
||||
.action((value, config: SandboxConfig) =>
|
||||
config.withUserManagementConfig(_.copy(maxUsersPageSize = value))
|
||||
)
|
||||
|
||||
com.daml.cliopts.Metrics.metricsReporterParse(this)(
|
||||
(setter, config) => config.copy(metricsReporter = setter(config.metricsReporter)),
|
||||
(setter, config) =>
|
||||
|
@ -422,6 +422,26 @@ abstract class CommonCliSpecBase(
|
||||
)
|
||||
}
|
||||
|
||||
"handle '--max-users-page-size' flag correctly" in {
|
||||
// missing value
|
||||
checkOptionFail(
|
||||
Array("--max-users-page-size")
|
||||
)
|
||||
// default
|
||||
checkOption(
|
||||
Array.empty,
|
||||
_.withUserManagementConfig(_.copy(maxUsersPageSize = 1000)),
|
||||
)
|
||||
// custom value
|
||||
checkOption(
|
||||
Array(
|
||||
"--max-users-page-size",
|
||||
"123",
|
||||
),
|
||||
_.withUserManagementConfig(_.copy(maxUsersPageSize = 123)),
|
||||
)
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
protected def checkOption(
|
||||
|
@ -225,7 +225,7 @@ class Runner(config: SandboxConfig) extends ResourceOwner[Port] {
|
||||
maxTransactionsInMemoryFanOutBufferSize = 0L,
|
||||
enableInMemoryFanOutForLedgerApi = false,
|
||||
enableSelfServiceErrorCodes = config.enableSelfServiceErrorCodes,
|
||||
enableUserManagement = config.userManagementConfig.enabled,
|
||||
userManagementConfig = config.userManagementConfig,
|
||||
)
|
||||
dbSupport <- DbSupport.owner(
|
||||
jdbcUrl = apiServerConfig.jdbcUrl,
|
||||
|
Loading…
Reference in New Issue
Block a user