From 10792ed6fd1207bfebc6a8661b5c3439bd70b59e Mon Sep 17 00:00:00 2001 From: Moritz Kiefer Date: Tue, 17 Nov 2020 10:25:30 +0100 Subject: [PATCH] Make application ID configurable in trigger service (#7974) * Make application ID configurable in trigger service fixes #7671 changelog_begin - [Trigger Service] The application id used by a trigger can now be configured by an optional `applicationId` in the start request. changelog_end * Update triggers/service/src/main/scala/com/digitalasset/daml/lf/engine/trigger/Server.scala Co-authored-by: Andreas Herrmann <42969706+aherrmann-da@users.noreply.github.com> Co-authored-by: Andreas Herrmann <42969706+aherrmann-da@users.noreply.github.com> --- docs/source/tools/trigger-service.rst | 14 +++- .../postgres/V3__Add_application_id.sql | 7 ++ .../V3__Add_application_id.sql.sha256 | 1 + .../daml/lf/engine/trigger/Request.scala | 17 +++-- .../daml/lf/engine/trigger/Server.scala | 74 +++++++++++-------- .../lf/engine/trigger/TriggerRunnerImpl.scala | 6 +- .../lf/engine/trigger/dao/DbTriggerDao.scala | 19 +++-- .../daml/lf/engine/trigger/package.scala | 2 + .../engine/trigger/TriggerServiceTest.scala | 34 +++++++-- 9 files changed, 123 insertions(+), 51 deletions(-) create mode 100644 triggers/service/src/main/resources/com/daml/lf/engine/trigger/db/migration/postgres/V3__Add_application_id.sql create mode 100644 triggers/service/src/main/resources/com/daml/lf/engine/trigger/db/migration/postgres/V3__Add_application_id.sql.sha256 diff --git a/docs/source/tools/trigger-service.rst b/docs/source/tools/trigger-service.rst index b4ec202e87..7d87000ee2 100644 --- a/docs/source/tools/trigger-service.rst +++ b/docs/source/tools/trigger-service.rst @@ -49,9 +49,21 @@ HTTP Request { "triggerName": "312094804c1468e2166bae3c9ba8b5cc0d285e31356304a2e9b0ac549df59d14:TestTrigger:trigger", - "party": "alice" + "party": "alice", + "applicationId": "my-app-id" } +where + +- ``triggerName`` contains the identifier for the trigger in the form + ``${packageId}:${moduleName}:${identifierName}``. You can find the + package id using ``daml damlc inspect path/to/trigger.dar``. +- ``party`` is the party the trigger will be running as. +- ``applicationId`` is an optional field to specify the application ID + the trigger will use for command submissions. If omitted, the + trigger will default to using its random UUID identifier returned in + the start request as the application ID. + HTTP Response ============= diff --git a/triggers/service/src/main/resources/com/daml/lf/engine/trigger/db/migration/postgres/V3__Add_application_id.sql b/triggers/service/src/main/resources/com/daml/lf/engine/trigger/db/migration/postgres/V3__Add_application_id.sql new file mode 100644 index 0000000000..de7bf48510 --- /dev/null +++ b/triggers/service/src/main/resources/com/daml/lf/engine/trigger/db/migration/postgres/V3__Add_application_id.sql @@ -0,0 +1,7 @@ +-- Copyright (c) 2020 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved. +-- SPDX-License-Identifier: Apache-2.0 + +-- Add application_id to running trigger table defaulting to trigger_instance +alter table running_triggers add column application_id text; +update running_triggers set application_id = trigger_instance; +alter table running_triggers alter column application_id set not null; diff --git a/triggers/service/src/main/resources/com/daml/lf/engine/trigger/db/migration/postgres/V3__Add_application_id.sql.sha256 b/triggers/service/src/main/resources/com/daml/lf/engine/trigger/db/migration/postgres/V3__Add_application_id.sql.sha256 new file mode 100644 index 0000000000..ff11d83b79 --- /dev/null +++ b/triggers/service/src/main/resources/com/daml/lf/engine/trigger/db/migration/postgres/V3__Add_application_id.sql.sha256 @@ -0,0 +1 @@ +f79ce69153f2419fc87c0ff14a58df4a6e6b1b3bd34674a73c9f0d750b845236 diff --git a/triggers/service/src/main/scala/com/digitalasset/daml/lf/engine/trigger/Request.scala b/triggers/service/src/main/scala/com/digitalasset/daml/lf/engine/trigger/Request.scala index 313f644e44..6672e8f5c7 100644 --- a/triggers/service/src/main/scala/com/digitalasset/daml/lf/engine/trigger/Request.scala +++ b/triggers/service/src/main/scala/com/digitalasset/daml/lf/engine/trigger/Request.scala @@ -3,7 +3,7 @@ package com.daml.lf.engine.trigger -import com.daml.ledger.api.refinements.ApiTypes.Party +import com.daml.ledger.api.refinements.ApiTypes.{ApplicationId, Party} import com.daml.lf.data.Ref.Identifier import scalaz.Tag import spray.json.DefaultJsonProtocol._ @@ -22,13 +22,18 @@ object Request { private[Request] implicit val PartyFormat: JsonFormat[Party] = Tag.subst(implicitly[JsonFormat[String]]) - final case class StartParams(triggerName: Identifier, party: Party) - object StartParams extends ((Identifier, Party) => StartParams) { - implicit val startParamsFormat: RootJsonFormat[StartParams] = jsonFormat2(StartParams) + final case class StartParams( + triggerName: Identifier, + party: Party, + applicationId: Option[ApplicationId]) + object StartParams { + implicit val applicationIdFormat: JsonFormat[ApplicationId] = + Tag.subst(implicitly[JsonFormat[String]]) + implicit val startParamsFormat: RootJsonFormat[StartParams] = jsonFormat3(StartParams.apply) } final case class ListParams(party: Party) - object ListParams extends (Party => ListParams) { - implicit val listParamsFormat: RootJsonFormat[ListParams] = jsonFormat1(ListParams) + object ListParams { + implicit val listParamsFormat: RootJsonFormat[ListParams] = jsonFormat1(ListParams.apply) } } diff --git a/triggers/service/src/main/scala/com/digitalasset/daml/lf/engine/trigger/Server.scala b/triggers/service/src/main/scala/com/digitalasset/daml/lf/engine/trigger/Server.scala index 4547e5dbe2..4bdf766cef 100644 --- a/triggers/service/src/main/scala/com/digitalasset/daml/lf/engine/trigger/Server.scala +++ b/triggers/service/src/main/scala/com/digitalasset/daml/lf/engine/trigger/Server.scala @@ -27,7 +27,7 @@ import akka.util.{ByteString, Timeout} import scala.concurrent.duration._ import com.daml.daml_lf_dev.DamlLf import com.daml.grpc.adapter.{AkkaExecutionSequencerPool, ExecutionSequencerFactory} -import com.daml.ledger.api.refinements.ApiTypes.Party +import com.daml.ledger.api.refinements.ApiTypes.{ApplicationId, Party} import com.daml.lf.archive.Reader.ParseError import com.daml.lf.archive.{Dar, DarReader, Decode} import com.daml.lf.data.Ref.{Identifier, PackageId} @@ -104,8 +104,10 @@ class Server( private def restartTriggers(triggers: Vector[RunningTrigger]): Either[String, Unit] = { import cats.implicits._ // needed for traverse - triggers.traverse_(t => - startTrigger(t.triggerParty, t.triggerName, t.triggerToken, Some(t.triggerInstance))) + triggers.traverse_(runningTrigger => + for { + trigger <- Trigger.fromIdentifier(compiledPackages, runningTrigger.triggerName) + } yield startTrigger(trigger, runningTrigger)) } private def triggerRunnerName(triggerInstance: UUID): String = @@ -116,38 +118,44 @@ class Server( .child(triggerRunnerName(triggerInstance)) .asInstanceOf[Option[ActorRef[TriggerRunner.Message]]] - private def startTrigger( + // Add a new trigger to the database and return the resulting RunningTrigger. + // Note that this does not yet start the trigger. + private def addNewTrigger( party: Party, triggerName: Identifier, - token: Option[AccessToken], - existingInstance: Option[UUID] = None): Either[String, JsValue] = { + optApplicationId: Option[ApplicationId], + token: Option[AccessToken] + ): Either[String, (Trigger, RunningTrigger)] = { + val newInstance = UUID.randomUUID() + val applicationId = optApplicationId.getOrElse(Tag(newInstance.toString): ApplicationId) + val runningTrigger = RunningTrigger(newInstance, triggerName, party, applicationId, token) for { - trigger <- Trigger.fromIdentifier(compiledPackages, triggerName) - triggerInstance <- existingInstance match { - case None => - val newInstance = UUID.randomUUID - triggerDao - .addRunningTrigger(RunningTrigger(newInstance, triggerName, party, token)) - .map(_ => newInstance) - case Some(instance) => Right(instance) - } - _ = ctx.spawn( + // Validate trigger id before persisting to DB + trigger <- Trigger.fromIdentifier(compiledPackages, runningTrigger.triggerName) + _ <- triggerDao.addRunningTrigger(runningTrigger).map(_ => runningTrigger) + } yield (trigger, runningTrigger) + } + + private def startTrigger(trigger: Trigger, runningTrigger: RunningTrigger): JsValue = { + discard[ActorRef[Message]]( + ctx.spawn( TriggerRunner( new TriggerRunner.Config( ctx.self, - triggerInstance, - party, - AccessToken.unsubst(token), + runningTrigger.triggerInstance, + runningTrigger.triggerParty, + runningTrigger.triggerApplicationId, + AccessToken.unsubst(runningTrigger.triggerToken), compiledPackages, trigger, ledgerConfig, restartConfig ), - triggerInstance.toString + runningTrigger.triggerInstance.toString ), - triggerRunnerName(triggerInstance) - ) - } yield JsObject(("triggerId", triggerInstance.toString.toJson)) + triggerRunnerName(runningTrigger.triggerInstance) + )) + JsObject(("triggerId", runningTrigger.triggerInstance.toString.toJson)) } private def stopTrigger(uuid: UUID): Either[String, Option[JsValue]] = { @@ -328,13 +336,19 @@ class Server( val claims = AuthRequest.Claims(actAs = List(params.party)) // TODO[AH] Why do we need to pass ec, system explicitly? - authorize(claims)(ec, system) { token => - startTrigger(params.party, params.triggerName, token) match { - case Left(err) => - complete(errorResponse(StatusCodes.UnprocessableEntity, err)) - case Right(triggerInstance) => - complete(successResponse(triggerInstance)) - } + authorize(claims)(ec, system) { + token => + val instOrErr = + addNewTrigger(params.party, params.triggerName, params.applicationId, token) + .map { + case (trigger, runningTrigger) => startTrigger(trigger, runningTrigger) + } + instOrErr match { + case Left(err) => + complete(errorResponse(StatusCodes.UnprocessableEntity, err)) + case Right(triggerInstance) => + complete(successResponse(triggerInstance)) + } } } }, diff --git a/triggers/service/src/main/scala/com/digitalasset/daml/lf/engine/trigger/TriggerRunnerImpl.scala b/triggers/service/src/main/scala/com/digitalasset/daml/lf/engine/trigger/TriggerRunnerImpl.scala index d99022945b..b2efe9c176 100644 --- a/triggers/service/src/main/scala/com/digitalasset/daml/lf/engine/trigger/TriggerRunnerImpl.scala +++ b/triggers/service/src/main/scala/com/digitalasset/daml/lf/engine/trigger/TriggerRunnerImpl.scala @@ -32,6 +32,7 @@ object TriggerRunnerImpl { server: ActorRef[Server.Message], triggerInstance: UUID, party: Party, + applicationId: ApplicationId, token: Option[String], compiledPackages: CompiledPackages, trigger: Trigger, @@ -62,9 +63,8 @@ object TriggerRunnerImpl { // Report to the server that this trigger is starting. config.server ! Server.TriggerStarting(triggerInstance) logger.info(s"Trigger $name is starting") - val appId = ApplicationId(name) val clientConfig = LedgerClientConfiguration( - applicationId = appId.unwrap, + applicationId = config.applicationId.unwrap, ledgerIdRequirement = LedgerIdRequirement.none, commandClient = CommandClientConfiguration.default.copy( defaultDeduplicationTime = config.ledgerConfig.commandTtl), @@ -176,7 +176,7 @@ object TriggerRunnerImpl { config.trigger, client, config.ledgerConfig.timeProvider, - appId, + config.applicationId, config.party.unwrap) (acs, offset) <- runner.queryACS() } yield QueriedACS(runner, acs, offset) diff --git a/triggers/service/src/main/scala/com/digitalasset/daml/lf/engine/trigger/dao/DbTriggerDao.scala b/triggers/service/src/main/scala/com/digitalasset/daml/lf/engine/trigger/dao/DbTriggerDao.scala index 73dadfbe4b..f73da01d57 100644 --- a/triggers/service/src/main/scala/com/digitalasset/daml/lf/engine/trigger/dao/DbTriggerDao.scala +++ b/triggers/service/src/main/scala/com/digitalasset/daml/lf/engine/trigger/dao/DbTriggerDao.scala @@ -10,7 +10,7 @@ import cats.effect.{Blocker, ContextShift, IO} import cats.syntax.apply._ import cats.syntax.functor._ import com.daml.daml_lf_dev.DamlLf -import com.daml.ledger.api.refinements.ApiTypes.Party +import com.daml.ledger.api.refinements.ApiTypes.{ApplicationId, Party} import com.daml.lf.archive.{Dar, Reader} import com.daml.lf.data.Ref.{Identifier, PackageId} import com.daml.lf.engine.trigger.{JdbcConfig, RunningTrigger} @@ -74,6 +74,10 @@ final class DbTriggerDao private (dataSource: DataSource with Closeable, xa: Con implicit val partyGet: Get[Party] = Tag.subst(implicitly[Get[String]]) + implicit val appIdPut: Put[ApplicationId] = Tag.subst(implicitly[Put[String]]) + + implicit val appIdGet: Get[ApplicationId] = Tag.subst(implicitly[Get[String]]) + implicit val accessTokenPut: Put[AccessToken] = Tag.subst(implicitly[Put[String]]) implicit val accessTokenGet: Get[AccessToken] = Tag.subst(implicitly[Get[String]]) @@ -99,18 +103,21 @@ final class DbTriggerDao private (dataSource: DataSource with Closeable, xa: Con private def insertRunningTrigger(t: RunningTrigger): ConnectionIO[Unit] = { val insert: Fragment = sql""" - insert into running_triggers values (${t.triggerInstance}, ${t.triggerParty}, ${t.triggerName}, ${t.triggerToken}) + insert into running_triggers + (trigger_instance, trigger_party, full_trigger_name, access_token, application_id) + values + (${t.triggerInstance}, ${t.triggerParty}, ${t.triggerName}, ${t.triggerToken}, ${t.triggerApplicationId}) """ insert.update.run.void } private def queryRunningTrigger(triggerInstance: UUID): ConnectionIO[Option[RunningTrigger]] = { val select: Fragment = sql""" - select trigger_instance, full_trigger_name, trigger_party, access_token from running_triggers + select trigger_instance, full_trigger_name, trigger_party, application_id, access_token from running_triggers where trigger_instance = $triggerInstance """ select - .query[(UUID, Identifier, Party, Option[AccessToken])] + .query[(UUID, Identifier, Party, ApplicationId, Option[AccessToken])] .map(RunningTrigger.tupled) .option } @@ -162,10 +169,10 @@ final class DbTriggerDao private (dataSource: DataSource with Closeable, xa: Con private def selectAllTriggers: ConnectionIO[Vector[RunningTrigger]] = { val select: Fragment = sql""" - select trigger_instance, full_trigger_name, trigger_party, access_token from running_triggers order by trigger_instance + select trigger_instance, full_trigger_name, trigger_party, application_id, access_token from running_triggers order by trigger_instance """ select - .query[(UUID, Identifier, Party, Option[AccessToken])] + .query[(UUID, Identifier, Party, ApplicationId, Option[AccessToken])] .map(RunningTrigger.tupled) .to[Vector] } diff --git a/triggers/service/src/main/scala/com/digitalasset/daml/lf/engine/trigger/package.scala b/triggers/service/src/main/scala/com/digitalasset/daml/lf/engine/trigger/package.scala index b85470f527..94825f512f 100644 --- a/triggers/service/src/main/scala/com/digitalasset/daml/lf/engine/trigger/package.scala +++ b/triggers/service/src/main/scala/com/digitalasset/daml/lf/engine/trigger/package.scala @@ -26,6 +26,7 @@ package trigger { val AccessToken = Tag.of[AccessTokenTag] } import Tagged._ + import com.daml.ledger.api.refinements.ApiTypes.ApplicationId case class LedgerConfig( host: String, @@ -45,6 +46,7 @@ package trigger { triggerInstance: UUID, triggerName: Identifier, triggerParty: Party, + triggerApplicationId: ApplicationId, triggerToken: Option[AccessToken], ) } diff --git a/triggers/service/src/test/scala/com/digitalasset/daml/lf/engine/trigger/TriggerServiceTest.scala b/triggers/service/src/test/scala/com/digitalasset/daml/lf/engine/trigger/TriggerServiceTest.scala index a5bcab9dac..a9a8593aa9 100644 --- a/triggers/service/src/test/scala/com/digitalasset/daml/lf/engine/trigger/TriggerServiceTest.scala +++ b/triggers/service/src/test/scala/com/digitalasset/daml/lf/engine/trigger/TriggerServiceTest.scala @@ -3,7 +3,7 @@ package com.daml.lf.engine.trigger -import com.daml.ledger.api.refinements.ApiTypes.Party +import com.daml.ledger.api.refinements.ApiTypes.{ApplicationId, Party} import com.daml.lf.archive.DarReader import akka.http.scaladsl.Http import akka.http.scaladsl.model._ @@ -24,9 +24,13 @@ import com.daml.bazeltools.BazelRunfiles.requiredResource import com.daml.ledger.api.refinements.ApiTypes import com.daml.ledger.api.v1.commands._ import com.daml.ledger.api.v1.command_service._ +import com.daml.ledger.api.v1.ledger_offset.LedgerOffset +import com.daml.ledger.api.v1.ledger_offset.LedgerOffset.LedgerBoundary.LEDGER_BEGIN +import com.daml.ledger.api.v1.ledger_offset.LedgerOffset.Value.Boundary import com.daml.ledger.api.v1.value.{Identifier, Record, RecordField, Value} import com.daml.ledger.api.v1.transaction_filter.{Filters, InclusiveFilters, TransactionFilter} import com.daml.ledger.client.LedgerClient +import com.daml.ledger.client.services.commands.CompletionStreamElement import com.daml.timer.RetryStrategy import com.typesafe.scalalogging.StrictLogging import org.scalatest.time.{Seconds, Span} @@ -78,13 +82,18 @@ trait AbstractTriggerServiceTest protected val bob: Party = Tag("Bob") protected val eve: Party = Tag("Eve") - def startTrigger(uri: Uri, triggerName: String, party: Party): Future[HttpResponse] = { + def startTrigger( + uri: Uri, + triggerName: String, + party: Party, + applicationId: Option[ApplicationId] = None): Future[HttpResponse] = { val req = HttpRequest( method = HttpMethods.POST, uri = uri.withPath(Uri.Path("/v1/triggers")), entity = HttpEntity( ContentTypes.`application/json`, - s"""{"triggerName": "$triggerName", "party": "$party"}""" + s"""{"triggerName": "$triggerName", "party": "$party", "applicationId": "${applicationId + .getOrElse("null")}"}""" ) ) httpRequestFollow(req) @@ -255,7 +264,7 @@ trait AbstractTriggerServiceTest it should "should enable a trigger on http request" in withTriggerService(List(dar)) { uri: Uri => for { client <- sandboxClient( - ApiTypes.ApplicationId(testId), + ApiTypes.ApplicationId("my-app-id"), actAs = List(ApiTypes.Party(aliceAcs.unwrap))) filter = TransactionFilter( List( @@ -270,7 +279,11 @@ trait AbstractTriggerServiceTest .map(acsPages => acsPages.flatMap(_.activeContracts)) _ = acs shouldBe Vector() // Start the trigger - resp <- startTrigger(uri, s"$testPkgId:TestTrigger:trigger", aliceAcs) + resp <- startTrigger( + uri, + s"$testPkgId:TestTrigger:trigger", + aliceAcs, + Some(ApplicationId("my-app-id"))) triggerId <- parseTriggerId(resp) // Trigger is running, create an A contract @@ -296,6 +309,17 @@ trait AbstractTriggerServiceTest .map(acsPages => acsPages.flatMap(_.activeContracts)) } yield assert(acs.length == 1) } + // Read completions to make sure we set the right app id. + r <- client.commandClient + .completionSource(List(aliceAcs.unwrap), LedgerOffset(Boundary(LEDGER_BEGIN))) + .collect({ + case CompletionStreamElement.CompletionElement(completion) + if !completion.transactionId.isEmpty => + completion + }) + .take(1) + .runWith(Sink.seq) + _ = r.length shouldBe 1 status <- triggerStatus(uri, triggerId) _ = status.status shouldBe StatusCodes.OK body <- responseBodyToString(status)