mirror of
https://github.com/digital-asset/daml.git
synced 2024-09-20 01:07:18 +03:00
[Trigger-Service] Add support for read as in the trigger service (#11370)
* Add support for read as in the trigger service changelog_begin - [Trigger-Service] a list of read_as parties can now be supplied to the playload for the v1/triggers endpoint (i.e. field name is "read_as"). changelog_end * Fix scala 2.12 build * Remove not-null constraint from the new read_as column to fix oracle tests * Fix read queries to respect that the read_as column values can be NULL * Don't set any default values for the read_as column, it being null is fine * Add test that covers the readas feature in the trigger service * Use only alice and public as parties in the new test, don't want to mix up things unnecessarily * Fix failing auth test * Update triggers/service/src/main/scala/com/digitalasset/daml/lf/engine/trigger/dao/DbTriggerDao.scala Co-authored-by: Stephen Compall <stephen.compall@daml.com> * Update triggers/service/src/test/scala/com/digitalasset/daml/lf/engine/trigger/TriggerServiceTest.scala Co-authored-by: Stephen Compall <stephen.compall@daml.com> * Address review comments/Add the not null constraint to the read as column for postgres Co-authored-by: Stephen Compall <stephen.compall@daml.com>
This commit is contained in:
parent
07274d1b52
commit
99c6be5272
@ -300,6 +300,7 @@ genrule(
|
||||
cp -L $(location :test-model/TestTrigger.daml) $$TMP_DIR/daml
|
||||
cp -L $(location :test-model/ErrorTrigger.daml) $$TMP_DIR/daml
|
||||
cp -L $(location :test-model/LowLevelErrorTrigger.daml) $$TMP_DIR/daml
|
||||
cp -L $(location :test-model/ReadAs.daml) $$TMP_DIR/daml
|
||||
cp -L $(location //triggers/daml:daml-trigger.dar) $$TMP_DIR/
|
||||
cp -L $(location //daml-script/daml:daml-script.dar) $$TMP_DIR
|
||||
cat << EOF > $$TMP_DIR/daml.yaml
|
||||
|
@ -0,0 +1,5 @@
|
||||
-- Copyright (c) 2020 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
|
||||
-- SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
-- Add application_id to running_triggers table defaulting to trigger_instance
|
||||
alter table ${table.prefix}running_triggers add (read_as nvarchar2(2000));
|
@ -0,0 +1 @@
|
||||
71ea0dd6abec48973a83720bdd6bca11ed31483d278850cd0062279adf0dab91
|
@ -0,0 +1,7 @@
|
||||
-- Copyright (c) 2020 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
|
||||
-- SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
-- Add application_id to running_triggers table defaulting to trigger_instance
|
||||
alter table ${table.prefix}running_triggers add column read_as text;
|
||||
update ${table.prefix}running_triggers set read_as = '';
|
||||
alter table ${table.prefix}running_triggers alter column read_as set not null;
|
@ -0,0 +1 @@
|
||||
e24e0be965dd8ea349add211be88b82203ffedc8bd85bb7dd77aa44042944205
|
@ -19,18 +19,19 @@ object Request {
|
||||
def write(id: Identifier): JsValue = JsString(id.toString)
|
||||
}
|
||||
|
||||
private[Request] implicit val PartyFormat: JsonFormat[Party] =
|
||||
private[trigger] implicit val PartyFormat: JsonFormat[Party] =
|
||||
Tag.subst(implicitly[JsonFormat[String]])
|
||||
|
||||
final case class StartParams(
|
||||
triggerName: Identifier,
|
||||
party: Party,
|
||||
applicationId: Option[ApplicationId],
|
||||
readAs: Option[List[Party]],
|
||||
)
|
||||
object StartParams {
|
||||
implicit val applicationIdFormat: JsonFormat[ApplicationId] =
|
||||
Tag.subst(implicitly[JsonFormat[String]])
|
||||
implicit val startParamsFormat: RootJsonFormat[StartParams] = jsonFormat3(StartParams.apply)
|
||||
implicit val startParamsFormat: RootJsonFormat[StartParams] = jsonFormat4(StartParams.apply)
|
||||
}
|
||||
|
||||
final case class ListParams(party: Party)
|
||||
|
@ -135,16 +135,18 @@ class Server(
|
||||
name: Identifier,
|
||||
party: Party,
|
||||
applicationId: ApplicationId,
|
||||
readAs: Set[Party],
|
||||
)
|
||||
|
||||
private def newTrigger(
|
||||
party: Party,
|
||||
triggerName: Identifier,
|
||||
optApplicationId: Option[ApplicationId],
|
||||
readAs: Set[Party],
|
||||
): TriggerConfig = {
|
||||
val newInstance = UUID.randomUUID()
|
||||
val applicationId = optApplicationId.getOrElse(Tag(newInstance.toString): ApplicationId)
|
||||
TriggerConfig(newInstance, triggerName, party, applicationId)
|
||||
TriggerConfig(newInstance, triggerName, party, applicationId, readAs)
|
||||
}
|
||||
|
||||
// Add a new trigger to the database and return the resulting Trigger.
|
||||
@ -161,6 +163,7 @@ class Server(
|
||||
config.applicationId,
|
||||
auth.map(_.accessToken),
|
||||
auth.flatMap(_.refreshToken),
|
||||
config.readAs,
|
||||
)
|
||||
// Validate trigger id before persisting to DB
|
||||
Trigger.fromIdentifier(compiledPackages, runningTrigger.triggerName) match {
|
||||
@ -298,11 +301,18 @@ class Server(
|
||||
// started trigger.
|
||||
post {
|
||||
entity(as[StartParams]) { params =>
|
||||
val config = newTrigger(params.party, params.triggerName, params.applicationId)
|
||||
val config =
|
||||
newTrigger(
|
||||
params.party,
|
||||
params.triggerName,
|
||||
params.applicationId,
|
||||
params.readAs.map(_.toSet).getOrElse(Set.empty),
|
||||
)
|
||||
val claims =
|
||||
AuthRequest.Claims(
|
||||
actAs = List(params.party),
|
||||
applicationId = Some(config.applicationId),
|
||||
readAs = config.readAs.toList,
|
||||
)
|
||||
authorize(claims) { auth =>
|
||||
extractExecutionContext { implicit ec =>
|
||||
@ -582,6 +592,7 @@ object Server {
|
||||
trigger,
|
||||
ledgerConfig,
|
||||
restartConfig,
|
||||
runningTrigger.triggerReadAs,
|
||||
),
|
||||
runningTrigger.triggerInstance.toString,
|
||||
),
|
||||
|
@ -40,6 +40,7 @@ object TriggerRunnerImpl {
|
||||
trigger: Trigger,
|
||||
ledgerConfig: LedgerConfig,
|
||||
restartConfig: TriggerRestartConfig,
|
||||
readAs: Set[Party],
|
||||
) {
|
||||
private[trigger] def withLoggingContext[T](f: LoggingContextOf[Config with Trigger] => T): T =
|
||||
trigger.withLoggingContext.labelled[Config]("triggerId" -> triggerInstance.toString)(f)
|
||||
@ -189,8 +190,7 @@ object TriggerRunnerImpl {
|
||||
config.applicationId,
|
||||
TriggerParties(
|
||||
actAs = config.party,
|
||||
// TODO (MK) Support multi-party readAs in the trigger service.
|
||||
readAs = Set.empty,
|
||||
readAs = config.readAs,
|
||||
),
|
||||
)
|
||||
(acs, offset) <- runner.queryACS()
|
||||
|
@ -21,6 +21,7 @@ import java.io.{Closeable, IOException}
|
||||
|
||||
import com.daml.auth.middleware.api.Tagged.{AccessToken, RefreshToken}
|
||||
import com.daml.doobie.logging.Slf4jLogHandler
|
||||
import scalaz.syntax.std.tuple._
|
||||
import javax.sql.DataSource
|
||||
|
||||
import scala.concurrent.{ExecutionContext, Future}
|
||||
@ -38,6 +39,20 @@ abstract class DbTriggerDao protected (
|
||||
protected implicit def uuidPut: Put[UUID]
|
||||
protected implicit def uuidGet: Get[UUID]
|
||||
|
||||
implicit val readAsPut: Put[Set[Party]] = {
|
||||
type F[A] = Put[Set[A]]
|
||||
Party.subst[F, String](implicitly[Put[String]].contramap {
|
||||
_.mkString("%")
|
||||
})
|
||||
}
|
||||
|
||||
implicit val readAsGet: Get[Set[Party]] = {
|
||||
type F[A] = Get[Set[A]]
|
||||
Party.subst[F, String](implicitly[Get[String]].map {
|
||||
_.split("%").toSet.filter(_.nonEmpty)
|
||||
})
|
||||
}
|
||||
|
||||
implicit val partyPut: Put[Party] = Tag.subst(implicitly[Put[String]])
|
||||
|
||||
implicit val partyGet: Get[Party] = Tag.subst(implicitly[Get[String]])
|
||||
@ -77,21 +92,32 @@ abstract class DbTriggerDao protected (
|
||||
private def insertRunningTrigger(t: RunningTrigger): ConnectionIO[Unit] = {
|
||||
val insert: Fragment =
|
||||
sql"""insert into ${Fragment.const(s"${tablePrefix}running_triggers")}
|
||||
(trigger_instance, trigger_party, full_trigger_name, access_token, refresh_token, application_id)
|
||||
(trigger_instance, trigger_party, full_trigger_name, access_token, refresh_token, application_id, read_as)
|
||||
values
|
||||
(${t.triggerInstance}, ${t.triggerParty}, ${t.triggerName}, ${t.triggerAccessToken}, ${t.triggerRefreshToken}, ${t.triggerApplicationId})
|
||||
(${t.triggerInstance}, ${t.triggerParty}, ${t.triggerName}, ${t.triggerAccessToken}, ${t.triggerRefreshToken}, ${t.triggerApplicationId}, ${t.triggerReadAs})
|
||||
"""
|
||||
insert.update.run.void
|
||||
}
|
||||
|
||||
private def queryRunningTrigger(triggerInstance: UUID): ConnectionIO[Option[RunningTrigger]] = {
|
||||
val select: Fragment = sql"""
|
||||
select trigger_instance, full_trigger_name, trigger_party, application_id, access_token, refresh_token
|
||||
select trigger_instance, full_trigger_name, trigger_party, application_id, access_token, refresh_token, read_as
|
||||
from ${Fragment.const(s"${tablePrefix}running_triggers")}
|
||||
where trigger_instance = $triggerInstance
|
||||
"""
|
||||
select
|
||||
.query[(UUID, Identifier, Party, ApplicationId, Option[AccessToken], Option[RefreshToken])]
|
||||
.query[
|
||||
(
|
||||
UUID,
|
||||
Identifier,
|
||||
Party,
|
||||
ApplicationId,
|
||||
Option[AccessToken],
|
||||
Option[RefreshToken],
|
||||
Option[Set[Party]],
|
||||
)
|
||||
]
|
||||
.map(_.mapElements(_7 = it => it.getOrElse(Set.empty)))
|
||||
.map(RunningTrigger.tupled)
|
||||
.option
|
||||
}
|
||||
@ -156,12 +182,23 @@ abstract class DbTriggerDao protected (
|
||||
|
||||
private def selectAllTriggers: ConnectionIO[Vector[RunningTrigger]] = {
|
||||
val select: Fragment = sql"""
|
||||
select trigger_instance, full_trigger_name, trigger_party, application_id, access_token, refresh_token
|
||||
select trigger_instance, full_trigger_name, trigger_party, application_id, access_token, refresh_token, read_as
|
||||
from ${Fragment.const(s"${tablePrefix}running_triggers")}
|
||||
order by trigger_instance
|
||||
"""
|
||||
select
|
||||
.query[(UUID, Identifier, Party, ApplicationId, Option[AccessToken], Option[RefreshToken])]
|
||||
.query[
|
||||
(
|
||||
UUID,
|
||||
Identifier,
|
||||
Party,
|
||||
ApplicationId,
|
||||
Option[AccessToken],
|
||||
Option[RefreshToken],
|
||||
Option[Set[Party]],
|
||||
)
|
||||
]
|
||||
.map(_.mapElements(_7 = it => it.getOrElse(Set.empty)))
|
||||
.map(RunningTrigger.tupled)
|
||||
.to[Vector]
|
||||
}
|
||||
|
@ -43,5 +43,6 @@ package trigger {
|
||||
triggerApplicationId: ApplicationId,
|
||||
triggerAccessToken: Option[AccessToken],
|
||||
triggerRefreshToken: Option[RefreshToken],
|
||||
triggerReadAs: Set[Party],
|
||||
)
|
||||
}
|
||||
|
@ -94,14 +94,22 @@ trait AbstractTriggerServiceTest
|
||||
triggerName: String,
|
||||
party: Party,
|
||||
applicationId: Option[ApplicationId] = None,
|
||||
readAs: Set[Party] = Set(),
|
||||
): Future[HttpResponse] = {
|
||||
import Request.PartyFormat
|
||||
val readAsContent =
|
||||
if (readAs.isEmpty) "null"
|
||||
else {
|
||||
import spray.json.DefaultJsonProtocol._
|
||||
readAs.toJson.compactPrint
|
||||
}
|
||||
val req = HttpRequest(
|
||||
method = HttpMethods.POST,
|
||||
uri = uri.withPath(Uri.Path("/v1/triggers")),
|
||||
entity = HttpEntity(
|
||||
ContentTypes.`application/json`,
|
||||
s"""{"triggerName": "$triggerName", "party": "$party", "applicationId": "${applicationId
|
||||
.getOrElse("null")}"}""",
|
||||
.getOrElse("null")}", "readAs": $readAsContent}""",
|
||||
),
|
||||
)
|
||||
httpRequestFollow(req)
|
||||
@ -254,6 +262,54 @@ trait AbstractTriggerServiceTest
|
||||
} yield succeed
|
||||
}
|
||||
|
||||
it should "successfully start a trigger that uses multi-read-as" in withTriggerService(
|
||||
List(dar)
|
||||
) { uri: Uri =>
|
||||
val visibleToPublicId = Identifier(testPkgId, "ReadAs", "VisibleToPublic")
|
||||
def visibleToPublic(party: String): CreateCommand =
|
||||
CreateCommand(
|
||||
templateId = Some(visibleToPublicId),
|
||||
createArguments = Some(
|
||||
Record(fields = Seq(RecordField("public", Some(Value().withParty(party)))))
|
||||
),
|
||||
)
|
||||
for {
|
||||
(client, public) <- for {
|
||||
client <- sandboxClient(
|
||||
ApiTypes.ApplicationId("exp-app-id"),
|
||||
actAs = List(ApiTypes.Party(alice.unwrap)),
|
||||
admin = true,
|
||||
)
|
||||
|
||||
public <- client.partyManagementClient.allocateParty(Some("public"), Some("public"), None)
|
||||
clientWeWant <- sandboxClient(
|
||||
ApiTypes.ApplicationId("exp-app-id"),
|
||||
actAs = List(ApiTypes.Party(alice.unwrap), ApiTypes.Party(public.party.toString)),
|
||||
)
|
||||
} yield (clientWeWant, Party(public.party.toString))
|
||||
|
||||
_ <- submitCmd(
|
||||
client,
|
||||
public.unwrap,
|
||||
Command().withCreate(visibleToPublic(public.unwrap)),
|
||||
)
|
||||
|
||||
// Start the trigger
|
||||
resp <- startTrigger(
|
||||
uri,
|
||||
s"$testPkgId:ReadAs:test",
|
||||
alice,
|
||||
Some(ApplicationId("exp-app-id")),
|
||||
readAs = Set(public),
|
||||
)
|
||||
|
||||
triggerId <- parseTriggerId(resp)
|
||||
_ <- assertTriggerIds(uri, alice, Vector(triggerId))
|
||||
_ <- assertTriggerStatus(triggerId, _.last == "running")
|
||||
|
||||
} yield succeed
|
||||
}
|
||||
|
||||
it should "start multiple triggers and list them by party" in withTriggerService(List(dar)) {
|
||||
uri: Uri =>
|
||||
for {
|
||||
|
38
triggers/service/test-model/ReadAs.daml
Normal file
38
triggers/service/test-model/ReadAs.daml
Normal file
@ -0,0 +1,38 @@
|
||||
-- Copyright (c) 2021 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
|
||||
-- SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
|
||||
module ReadAs where
|
||||
|
||||
import DA.Action
|
||||
import Daml.Trigger
|
||||
|
||||
-- There are 3 things we need to test:
|
||||
--
|
||||
-- 1. The initial ACS query sets the right readAs claims.
|
||||
-- 2. We submit commands with the right readAs claims.
|
||||
-- 3. We subscribe to transactions for the readAs parties.
|
||||
|
||||
test : Trigger Int
|
||||
test = Trigger
|
||||
{ initialize = do
|
||||
public <- getReadAs >>= \case
|
||||
[public] -> pure public
|
||||
readAs -> error $ "Expected exactly one readAs party but got " <> show readAs
|
||||
visible <- query @VisibleToPublic
|
||||
case visible of
|
||||
[(_, visible)] -> unless (visible == VisibleToPublic public) $
|
||||
error ("Expected " <> show (VisibleToPublic public) <> " but got " <> show visible)
|
||||
_ -> error $ "Expected exactly one contract but got " <> show visible
|
||||
pure 1
|
||||
, updateState = \_ -> pure ()
|
||||
, rule = \_ -> pure ()
|
||||
, registeredTemplates = AllInDar
|
||||
, heartbeat = None
|
||||
}
|
||||
|
||||
template VisibleToPublic
|
||||
with
|
||||
public : Party
|
||||
where
|
||||
signatory public
|
Loading…
Reference in New Issue
Block a user