Make application ID configurable in trigger service (#7974)

* Make application ID configurable in trigger service

fixes #7671

changelog_begin

- [Trigger Service] The application id used by a trigger can now be
  configured by an optional `applicationId` in the start request.

changelog_end

* Update triggers/service/src/main/scala/com/digitalasset/daml/lf/engine/trigger/Server.scala

Co-authored-by: Andreas Herrmann <42969706+aherrmann-da@users.noreply.github.com>

Co-authored-by: Andreas Herrmann <42969706+aherrmann-da@users.noreply.github.com>
This commit is contained in:
Moritz Kiefer 2020-11-17 10:25:30 +01:00 committed by GitHub
parent fdddb56b72
commit 10792ed6fd
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 123 additions and 51 deletions

View File

@ -49,9 +49,21 @@ HTTP Request
{
"triggerName": "312094804c1468e2166bae3c9ba8b5cc0d285e31356304a2e9b0ac549df59d14:TestTrigger:trigger",
"party": "alice"
"party": "alice",
"applicationId": "my-app-id"
}
where
- ``triggerName`` contains the identifier for the trigger in the form
``${packageId}:${moduleName}:${identifierName}``. You can find the
package id using ``daml damlc inspect path/to/trigger.dar``.
- ``party`` is the party the trigger will be running as.
- ``applicationId`` is an optional field to specify the application ID
the trigger will use for command submissions. If omitted, the
trigger will default to using its random UUID identifier returned in
the start request as the application ID.
HTTP Response
=============

View File

@ -0,0 +1,7 @@
-- Copyright (c) 2020 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
-- SPDX-License-Identifier: Apache-2.0
-- Add application_id to running trigger table defaulting to trigger_instance
alter table running_triggers add column application_id text;
update running_triggers set application_id = trigger_instance;
alter table running_triggers alter column application_id set not null;

View File

@ -0,0 +1 @@
f79ce69153f2419fc87c0ff14a58df4a6e6b1b3bd34674a73c9f0d750b845236

View File

@ -3,7 +3,7 @@
package com.daml.lf.engine.trigger
import com.daml.ledger.api.refinements.ApiTypes.Party
import com.daml.ledger.api.refinements.ApiTypes.{ApplicationId, Party}
import com.daml.lf.data.Ref.Identifier
import scalaz.Tag
import spray.json.DefaultJsonProtocol._
@ -22,13 +22,18 @@ object Request {
private[Request] implicit val PartyFormat: JsonFormat[Party] =
Tag.subst(implicitly[JsonFormat[String]])
final case class StartParams(triggerName: Identifier, party: Party)
object StartParams extends ((Identifier, Party) => StartParams) {
implicit val startParamsFormat: RootJsonFormat[StartParams] = jsonFormat2(StartParams)
final case class StartParams(
triggerName: Identifier,
party: Party,
applicationId: Option[ApplicationId])
object StartParams {
implicit val applicationIdFormat: JsonFormat[ApplicationId] =
Tag.subst(implicitly[JsonFormat[String]])
implicit val startParamsFormat: RootJsonFormat[StartParams] = jsonFormat3(StartParams.apply)
}
final case class ListParams(party: Party)
object ListParams extends (Party => ListParams) {
implicit val listParamsFormat: RootJsonFormat[ListParams] = jsonFormat1(ListParams)
object ListParams {
implicit val listParamsFormat: RootJsonFormat[ListParams] = jsonFormat1(ListParams.apply)
}
}

View File

@ -27,7 +27,7 @@ import akka.util.{ByteString, Timeout}
import scala.concurrent.duration._
import com.daml.daml_lf_dev.DamlLf
import com.daml.grpc.adapter.{AkkaExecutionSequencerPool, ExecutionSequencerFactory}
import com.daml.ledger.api.refinements.ApiTypes.Party
import com.daml.ledger.api.refinements.ApiTypes.{ApplicationId, Party}
import com.daml.lf.archive.Reader.ParseError
import com.daml.lf.archive.{Dar, DarReader, Decode}
import com.daml.lf.data.Ref.{Identifier, PackageId}
@ -104,8 +104,10 @@ class Server(
private def restartTriggers(triggers: Vector[RunningTrigger]): Either[String, Unit] = {
import cats.implicits._ // needed for traverse
triggers.traverse_(t =>
startTrigger(t.triggerParty, t.triggerName, t.triggerToken, Some(t.triggerInstance)))
triggers.traverse_(runningTrigger =>
for {
trigger <- Trigger.fromIdentifier(compiledPackages, runningTrigger.triggerName)
} yield startTrigger(trigger, runningTrigger))
}
private def triggerRunnerName(triggerInstance: UUID): String =
@ -116,38 +118,44 @@ class Server(
.child(triggerRunnerName(triggerInstance))
.asInstanceOf[Option[ActorRef[TriggerRunner.Message]]]
private def startTrigger(
// Add a new trigger to the database and return the resulting RunningTrigger.
// Note that this does not yet start the trigger.
private def addNewTrigger(
party: Party,
triggerName: Identifier,
token: Option[AccessToken],
existingInstance: Option[UUID] = None): Either[String, JsValue] = {
optApplicationId: Option[ApplicationId],
token: Option[AccessToken]
): Either[String, (Trigger, RunningTrigger)] = {
val newInstance = UUID.randomUUID()
val applicationId = optApplicationId.getOrElse(Tag(newInstance.toString): ApplicationId)
val runningTrigger = RunningTrigger(newInstance, triggerName, party, applicationId, token)
for {
trigger <- Trigger.fromIdentifier(compiledPackages, triggerName)
triggerInstance <- existingInstance match {
case None =>
val newInstance = UUID.randomUUID
triggerDao
.addRunningTrigger(RunningTrigger(newInstance, triggerName, party, token))
.map(_ => newInstance)
case Some(instance) => Right(instance)
}
_ = ctx.spawn(
// Validate trigger id before persisting to DB
trigger <- Trigger.fromIdentifier(compiledPackages, runningTrigger.triggerName)
_ <- triggerDao.addRunningTrigger(runningTrigger).map(_ => runningTrigger)
} yield (trigger, runningTrigger)
}
private def startTrigger(trigger: Trigger, runningTrigger: RunningTrigger): JsValue = {
discard[ActorRef[Message]](
ctx.spawn(
TriggerRunner(
new TriggerRunner.Config(
ctx.self,
triggerInstance,
party,
AccessToken.unsubst(token),
runningTrigger.triggerInstance,
runningTrigger.triggerParty,
runningTrigger.triggerApplicationId,
AccessToken.unsubst(runningTrigger.triggerToken),
compiledPackages,
trigger,
ledgerConfig,
restartConfig
),
triggerInstance.toString
runningTrigger.triggerInstance.toString
),
triggerRunnerName(triggerInstance)
)
} yield JsObject(("triggerId", triggerInstance.toString.toJson))
triggerRunnerName(runningTrigger.triggerInstance)
))
JsObject(("triggerId", runningTrigger.triggerInstance.toString.toJson))
}
private def stopTrigger(uuid: UUID): Either[String, Option[JsValue]] = {
@ -328,13 +336,19 @@ class Server(
val claims =
AuthRequest.Claims(actAs = List(params.party))
// TODO[AH] Why do we need to pass ec, system explicitly?
authorize(claims)(ec, system) { token =>
startTrigger(params.party, params.triggerName, token) match {
case Left(err) =>
complete(errorResponse(StatusCodes.UnprocessableEntity, err))
case Right(triggerInstance) =>
complete(successResponse(triggerInstance))
}
authorize(claims)(ec, system) {
token =>
val instOrErr =
addNewTrigger(params.party, params.triggerName, params.applicationId, token)
.map {
case (trigger, runningTrigger) => startTrigger(trigger, runningTrigger)
}
instOrErr match {
case Left(err) =>
complete(errorResponse(StatusCodes.UnprocessableEntity, err))
case Right(triggerInstance) =>
complete(successResponse(triggerInstance))
}
}
}
},

View File

@ -32,6 +32,7 @@ object TriggerRunnerImpl {
server: ActorRef[Server.Message],
triggerInstance: UUID,
party: Party,
applicationId: ApplicationId,
token: Option[String],
compiledPackages: CompiledPackages,
trigger: Trigger,
@ -62,9 +63,8 @@ object TriggerRunnerImpl {
// Report to the server that this trigger is starting.
config.server ! Server.TriggerStarting(triggerInstance)
logger.info(s"Trigger $name is starting")
val appId = ApplicationId(name)
val clientConfig = LedgerClientConfiguration(
applicationId = appId.unwrap,
applicationId = config.applicationId.unwrap,
ledgerIdRequirement = LedgerIdRequirement.none,
commandClient = CommandClientConfiguration.default.copy(
defaultDeduplicationTime = config.ledgerConfig.commandTtl),
@ -176,7 +176,7 @@ object TriggerRunnerImpl {
config.trigger,
client,
config.ledgerConfig.timeProvider,
appId,
config.applicationId,
config.party.unwrap)
(acs, offset) <- runner.queryACS()
} yield QueriedACS(runner, acs, offset)

View File

@ -10,7 +10,7 @@ import cats.effect.{Blocker, ContextShift, IO}
import cats.syntax.apply._
import cats.syntax.functor._
import com.daml.daml_lf_dev.DamlLf
import com.daml.ledger.api.refinements.ApiTypes.Party
import com.daml.ledger.api.refinements.ApiTypes.{ApplicationId, Party}
import com.daml.lf.archive.{Dar, Reader}
import com.daml.lf.data.Ref.{Identifier, PackageId}
import com.daml.lf.engine.trigger.{JdbcConfig, RunningTrigger}
@ -74,6 +74,10 @@ final class DbTriggerDao private (dataSource: DataSource with Closeable, xa: Con
implicit val partyGet: Get[Party] = Tag.subst(implicitly[Get[String]])
implicit val appIdPut: Put[ApplicationId] = Tag.subst(implicitly[Put[String]])
implicit val appIdGet: Get[ApplicationId] = Tag.subst(implicitly[Get[String]])
implicit val accessTokenPut: Put[AccessToken] = Tag.subst(implicitly[Put[String]])
implicit val accessTokenGet: Get[AccessToken] = Tag.subst(implicitly[Get[String]])
@ -99,18 +103,21 @@ final class DbTriggerDao private (dataSource: DataSource with Closeable, xa: Con
private def insertRunningTrigger(t: RunningTrigger): ConnectionIO[Unit] = {
val insert: Fragment = sql"""
insert into running_triggers values (${t.triggerInstance}, ${t.triggerParty}, ${t.triggerName}, ${t.triggerToken})
insert into running_triggers
(trigger_instance, trigger_party, full_trigger_name, access_token, application_id)
values
(${t.triggerInstance}, ${t.triggerParty}, ${t.triggerName}, ${t.triggerToken}, ${t.triggerApplicationId})
"""
insert.update.run.void
}
private def queryRunningTrigger(triggerInstance: UUID): ConnectionIO[Option[RunningTrigger]] = {
val select: Fragment = sql"""
select trigger_instance, full_trigger_name, trigger_party, access_token from running_triggers
select trigger_instance, full_trigger_name, trigger_party, application_id, access_token from running_triggers
where trigger_instance = $triggerInstance
"""
select
.query[(UUID, Identifier, Party, Option[AccessToken])]
.query[(UUID, Identifier, Party, ApplicationId, Option[AccessToken])]
.map(RunningTrigger.tupled)
.option
}
@ -162,10 +169,10 @@ final class DbTriggerDao private (dataSource: DataSource with Closeable, xa: Con
private def selectAllTriggers: ConnectionIO[Vector[RunningTrigger]] = {
val select: Fragment = sql"""
select trigger_instance, full_trigger_name, trigger_party, access_token from running_triggers order by trigger_instance
select trigger_instance, full_trigger_name, trigger_party, application_id, access_token from running_triggers order by trigger_instance
"""
select
.query[(UUID, Identifier, Party, Option[AccessToken])]
.query[(UUID, Identifier, Party, ApplicationId, Option[AccessToken])]
.map(RunningTrigger.tupled)
.to[Vector]
}

View File

@ -26,6 +26,7 @@ package trigger {
val AccessToken = Tag.of[AccessTokenTag]
}
import Tagged._
import com.daml.ledger.api.refinements.ApiTypes.ApplicationId
case class LedgerConfig(
host: String,
@ -45,6 +46,7 @@ package trigger {
triggerInstance: UUID,
triggerName: Identifier,
triggerParty: Party,
triggerApplicationId: ApplicationId,
triggerToken: Option[AccessToken],
)
}

View File

@ -3,7 +3,7 @@
package com.daml.lf.engine.trigger
import com.daml.ledger.api.refinements.ApiTypes.Party
import com.daml.ledger.api.refinements.ApiTypes.{ApplicationId, Party}
import com.daml.lf.archive.DarReader
import akka.http.scaladsl.Http
import akka.http.scaladsl.model._
@ -24,9 +24,13 @@ import com.daml.bazeltools.BazelRunfiles.requiredResource
import com.daml.ledger.api.refinements.ApiTypes
import com.daml.ledger.api.v1.commands._
import com.daml.ledger.api.v1.command_service._
import com.daml.ledger.api.v1.ledger_offset.LedgerOffset
import com.daml.ledger.api.v1.ledger_offset.LedgerOffset.LedgerBoundary.LEDGER_BEGIN
import com.daml.ledger.api.v1.ledger_offset.LedgerOffset.Value.Boundary
import com.daml.ledger.api.v1.value.{Identifier, Record, RecordField, Value}
import com.daml.ledger.api.v1.transaction_filter.{Filters, InclusiveFilters, TransactionFilter}
import com.daml.ledger.client.LedgerClient
import com.daml.ledger.client.services.commands.CompletionStreamElement
import com.daml.timer.RetryStrategy
import com.typesafe.scalalogging.StrictLogging
import org.scalatest.time.{Seconds, Span}
@ -78,13 +82,18 @@ trait AbstractTriggerServiceTest
protected val bob: Party = Tag("Bob")
protected val eve: Party = Tag("Eve")
def startTrigger(uri: Uri, triggerName: String, party: Party): Future[HttpResponse] = {
def startTrigger(
uri: Uri,
triggerName: String,
party: Party,
applicationId: Option[ApplicationId] = None): Future[HttpResponse] = {
val req = HttpRequest(
method = HttpMethods.POST,
uri = uri.withPath(Uri.Path("/v1/triggers")),
entity = HttpEntity(
ContentTypes.`application/json`,
s"""{"triggerName": "$triggerName", "party": "$party"}"""
s"""{"triggerName": "$triggerName", "party": "$party", "applicationId": "${applicationId
.getOrElse("null")}"}"""
)
)
httpRequestFollow(req)
@ -255,7 +264,7 @@ trait AbstractTriggerServiceTest
it should "should enable a trigger on http request" in withTriggerService(List(dar)) { uri: Uri =>
for {
client <- sandboxClient(
ApiTypes.ApplicationId(testId),
ApiTypes.ApplicationId("my-app-id"),
actAs = List(ApiTypes.Party(aliceAcs.unwrap)))
filter = TransactionFilter(
List(
@ -270,7 +279,11 @@ trait AbstractTriggerServiceTest
.map(acsPages => acsPages.flatMap(_.activeContracts))
_ = acs shouldBe Vector()
// Start the trigger
resp <- startTrigger(uri, s"$testPkgId:TestTrigger:trigger", aliceAcs)
resp <- startTrigger(
uri,
s"$testPkgId:TestTrigger:trigger",
aliceAcs,
Some(ApplicationId("my-app-id")))
triggerId <- parseTriggerId(resp)
// Trigger is running, create an A contract
@ -296,6 +309,17 @@ trait AbstractTriggerServiceTest
.map(acsPages => acsPages.flatMap(_.activeContracts))
} yield assert(acs.length == 1)
}
// Read completions to make sure we set the right app id.
r <- client.commandClient
.completionSource(List(aliceAcs.unwrap), LedgerOffset(Boundary(LEDGER_BEGIN)))
.collect({
case CompletionStreamElement.CompletionElement(completion)
if !completion.transactionId.isEmpty =>
completion
})
.take(1)
.runWith(Sink.seq)
_ = r.length shouldBe 1
status <- triggerStatus(uri, triggerId)
_ = status.status shouldBe StatusCodes.OK
body <- responseBodyToString(status)