mirror of
https://github.com/digital-asset/daml.git
synced 2024-09-20 01:07:18 +03:00
Support Exercise and Create commands in DAML triggers (#2864)
* Support Exercise and Create commands in DAML triggers This is a bit rough but I want something to experiment with before we settle on a design and start making the LF changes required to make it nicer. It does already allow you to use the original template and choice types. * Update triggers/runner/src/main/scala/com/daml/trigger/Runner.scala Co-Authored-By: Martin Huschenbett <martin.huschenbett@posteo.me>
This commit is contained in:
parent
07d1930fdc
commit
77225af6da
@ -1,6 +1,6 @@
|
||||
-- Copyright (c) 2019 The DAML Authors. All rights reserved.
|
||||
-- SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
{-# LANGUAGE AllowAmbiguousTypes #-}
|
||||
daml 1.2
|
||||
module Daml.Trigger
|
||||
( Message(..)
|
||||
@ -11,12 +11,17 @@ module Daml.Trigger
|
||||
, Archived(..)
|
||||
, Trigger(..)
|
||||
, ActiveContracts(..)
|
||||
, Commands(..)
|
||||
, TemplateId(..)
|
||||
, Command
|
||||
, exerciseCmd
|
||||
, createCmd
|
||||
) where
|
||||
|
||||
data Identifier = Identifier
|
||||
{ packageId : Text
|
||||
, moduleName : Text
|
||||
, name : Text
|
||||
, entityName : Text
|
||||
} deriving (Show, Eq)
|
||||
|
||||
data Transaction = Transaction
|
||||
@ -51,6 +56,49 @@ data ActiveContracts = ActiveContracts { activeContracts : [Created] }
|
||||
-- | Trigger is (approximately) a left-fold over `Message` with
|
||||
-- an accumulator of type `s`.
|
||||
data Trigger s = Trigger
|
||||
{ initialState : ActiveContracts -> s
|
||||
, update : Message -> s -> (s, Text)
|
||||
{ initialState : Party -> ActiveContracts -> s
|
||||
, update : Message -> s -> (s, Optional Commands, Text)
|
||||
}
|
||||
|
||||
-- | We implicitly assume that the package id corresponds to the package the trigger is part of.
|
||||
-- This type is temporary until we have a builtin in LF for identifiers.
|
||||
data TemplateId = TemplateId
|
||||
{ moduleName : Text
|
||||
, entityName : Text
|
||||
}
|
||||
|
||||
-- | This is an internal hack until we have this as a builtin in DAML-LF.
|
||||
|
||||
-- You can think of this as an existential `data Dynamic = forall a. Dynamic a`
|
||||
-- that you can’t convert back from.
|
||||
data LedgerValue = LedgerValue {}
|
||||
|
||||
-- | At runtime we turn this into the identity function
|
||||
-- and convert the result to ledger values.
|
||||
toLedgerValue : a -> LedgerValue
|
||||
toLedgerValue = error "toLedgerValue should be removed."
|
||||
|
||||
data Command
|
||||
= CreateCommand
|
||||
{ templateId : TemplateId
|
||||
, templateArg : LedgerValue
|
||||
}
|
||||
| ExerciseCommand
|
||||
{ templateId : TemplateId
|
||||
, contractId : Text
|
||||
, choiceName : Text
|
||||
, choiceArg : LedgerValue
|
||||
}
|
||||
|
||||
createCmd : Template t => TemplateId -> t -> Command
|
||||
createCmd templateId templateArg =
|
||||
CreateCommand templateId (toLedgerValue templateArg)
|
||||
|
||||
exerciseCmd : Choice t c r => TemplateId -> Text -> Text -> c -> Command
|
||||
exerciseCmd templateId contractId choiceName choiceArg =
|
||||
ExerciseCommand templateId contractId choiceName (toLedgerValue choiceArg)
|
||||
|
||||
data Commands = Commands
|
||||
{ commandId : Text
|
||||
, commands : [Command]
|
||||
}
|
||||
|
@ -24,6 +24,7 @@ da_scala_library(
|
||||
"//language-support/scala/bindings",
|
||||
"//language-support/scala/bindings-akka",
|
||||
"//ledger-api/rs-grpc-bridge",
|
||||
"//ledger/ledger-api-common",
|
||||
],
|
||||
)
|
||||
|
||||
|
@ -4,17 +4,25 @@
|
||||
package com.daml.trigger
|
||||
|
||||
import java.io.File
|
||||
import java.time.Instant
|
||||
import java.util
|
||||
|
||||
import akka.actor.ActorSystem
|
||||
import akka.stream._
|
||||
import akka.stream.scaladsl._
|
||||
import com.digitalasset.grpc.adapter.AkkaExecutionSequencerPool
|
||||
import scalaz.syntax.tag._
|
||||
import scalaz.syntax.traverse._
|
||||
import scalaz.std.either._
|
||||
import scala.collection.JavaConverters._
|
||||
import scala.concurrent.{Await, ExecutionContext, Future}
|
||||
import scala.concurrent.duration.Duration
|
||||
|
||||
import com.digitalasset.ledger.api.domain.LedgerId
|
||||
import com.digitalasset.ledger.api.refinements.ApiTypes.{ApplicationId}
|
||||
|
||||
import com.digitalasset.ledger.api.v1.command_submission_service.SubmitRequest
|
||||
import com.digitalasset.api.util.TimestampConversion.fromInstant
|
||||
import com.digitalasset.ledger.api.v1.ledger_offset.LedgerOffset
|
||||
import com.digitalasset.ledger.api.v1.event._
|
||||
import com.digitalasset.ledger.api.v1.event.Event.Event.{Archived, Created}
|
||||
@ -23,17 +31,24 @@ import com.digitalasset.ledger.api.v1.transaction_filter.{Filters, TransactionFi
|
||||
import com.digitalasset.ledger.api.v1.value
|
||||
import com.digitalasset.ledger.api.refinements.ApiTypes.{ApplicationId}
|
||||
import com.digitalasset.ledger.client.LedgerClient
|
||||
import com.digitalasset.ledger.api.v1.commands.{Commands, Command, CreateCommand, ExerciseCommand}
|
||||
import com.digitalasset.ledger.client.configuration.{
|
||||
CommandClientConfiguration,
|
||||
LedgerClientConfiguration,
|
||||
LedgerIdRequirement
|
||||
}
|
||||
import com.digitalasset.platform.participant.util.LfEngineToApi.{
|
||||
toApiIdentifier,
|
||||
lfValueToApiRecord,
|
||||
lfValueToApiValue
|
||||
}
|
||||
import com.digitalasset.daml.lf.PureCompiledPackages
|
||||
import com.digitalasset.daml.lf.archive.Dar
|
||||
import com.digitalasset.daml.lf.archive.Dar._
|
||||
import com.digitalasset.daml.lf.archive.DarReader
|
||||
import com.digitalasset.daml.lf.archive.Decode
|
||||
import com.digitalasset.daml.lf.data.Ref._
|
||||
import com.digitalasset.daml.lf.value.Value.{AbsoluteContractId, RelativeContractId}
|
||||
import com.digitalasset.daml.lf.data.ImmArray
|
||||
import com.digitalasset.daml.lf.data.FrontStack
|
||||
import com.digitalasset.daml_lf.DamlLf
|
||||
@ -94,7 +109,35 @@ object RunnerConfig {
|
||||
}
|
||||
|
||||
// Convert from a Ledger API transaction to an SValue corresponding to a Message from the Daml.Trigger module
|
||||
case class Converter(fromTransaction: Transaction => SValue, fromACS: Seq[CreatedEvent] => SValue)
|
||||
case class Converter(
|
||||
fromTransaction: Transaction => SValue,
|
||||
fromACS: Seq[CreatedEvent] => SValue,
|
||||
toCommands: SValue => Either[String, (String, Seq[Command])]
|
||||
)
|
||||
|
||||
// Helper to create identifiers pointing to the DAML.Trigger module
|
||||
case class TriggerIds(
|
||||
triggerPackageId: PackageId,
|
||||
triggerModuleName: ModuleName,
|
||||
mainPackageId: PackageId) {
|
||||
def getId(n: String): Identifier =
|
||||
Identifier(triggerPackageId, QualifiedName(triggerModuleName, DottedName.assertFromString(n)))
|
||||
}
|
||||
|
||||
object TriggerIds {
|
||||
def fromDar(dar: Dar[(PackageId, Package)]): TriggerIds = {
|
||||
val triggerModuleName = DottedName.assertFromString("Daml.Trigger")
|
||||
// We might want to just fix this at compile time at some point
|
||||
// once we ship the trigger lib with the SDK.
|
||||
val triggerPackageId: PackageId = dar.all
|
||||
.find {
|
||||
case (pkgId, pkg) => pkg.modules.contains(triggerModuleName)
|
||||
}
|
||||
.get
|
||||
._1
|
||||
TriggerIds(triggerPackageId, triggerModuleName, dar.main._1)
|
||||
}
|
||||
}
|
||||
|
||||
object Converter {
|
||||
// Helper to make constructing an SRecord more convenient
|
||||
@ -104,24 +147,25 @@ object Converter {
|
||||
SRecord(ty, fieldNames, args)
|
||||
}
|
||||
|
||||
// Helper to create identifiers pointing to the DAML.Trigger module
|
||||
private case class TriggerIds(triggerPackageId: PackageId, triggerModuleName: ModuleName) {
|
||||
def getId(n: String): Identifier =
|
||||
Identifier(triggerPackageId, QualifiedName(triggerModuleName, DottedName.assertFromString(n)))
|
||||
private def toLedgerRecord(v: SValue) = {
|
||||
lfValueToApiRecord(
|
||||
true,
|
||||
v.toValue.mapContractId {
|
||||
case rcoid: RelativeContractId =>
|
||||
throw new RuntimeException(s"Unexpected contract id $rcoid")
|
||||
case acoid: AbsoluteContractId => acoid
|
||||
}
|
||||
)
|
||||
}
|
||||
private object TriggerIds {
|
||||
def fromDar(dar: Dar[(PackageId, Package)]): TriggerIds = {
|
||||
val triggerModuleName = DottedName.assertFromString("Daml.Trigger")
|
||||
// We might want to just fix this at compile time at some point
|
||||
// once we ship the trigger lib with the SDK.
|
||||
val triggerPackageId: PackageId = dar.all
|
||||
.find {
|
||||
case (pkgId, pkg) => pkg.modules.contains(triggerModuleName)
|
||||
}
|
||||
.get
|
||||
._1
|
||||
TriggerIds(triggerPackageId, triggerModuleName)
|
||||
}
|
||||
private def toLedgerValue(v: SValue) = {
|
||||
lfValueToApiValue(
|
||||
true,
|
||||
v.toValue.mapContractId {
|
||||
case rcoid: RelativeContractId =>
|
||||
throw new RuntimeException(s"Unexpected contract id $rcoid")
|
||||
case acoid: AbsoluteContractId => acoid
|
||||
}
|
||||
)
|
||||
}
|
||||
|
||||
private def fromIdentifier(triggerIds: TriggerIds, id: value.Identifier): SValue = {
|
||||
@ -189,6 +233,91 @@ object Converter {
|
||||
)
|
||||
}
|
||||
|
||||
private def toText(v: SValue): Either[String, String] = {
|
||||
v match {
|
||||
case SText(t) => Right(t)
|
||||
case _ => Left(s"Expected Text but got $v")
|
||||
}
|
||||
}
|
||||
|
||||
private def toTemplateId(triggerIds: TriggerIds, v: SValue): Either[String, Identifier] = {
|
||||
v match {
|
||||
case SRecord(_, _, vals) => {
|
||||
assert(vals.size == 2)
|
||||
for {
|
||||
moduleName <- toText(vals.get(0)).flatMap(DottedName.fromString)
|
||||
entityName <- toText(vals.get(1)).flatMap(DottedName.fromString)
|
||||
} yield Identifier(triggerIds.mainPackageId, QualifiedName(moduleName, entityName))
|
||||
}
|
||||
case _ => Left(s"Expected TemplateId but got $v")
|
||||
}
|
||||
}
|
||||
|
||||
private def toCreate(triggerIds: TriggerIds, v: SValue): Either[String, CreateCommand] = {
|
||||
v match {
|
||||
case SRecord(_, _, vals) => {
|
||||
assert(vals.size == 2)
|
||||
for {
|
||||
templateId <- toTemplateId(triggerIds, vals.get(0))
|
||||
templateArg <- toLedgerRecord(vals.get(1))
|
||||
} yield CreateCommand(Some(toApiIdentifier(templateId)), Some(templateArg))
|
||||
}
|
||||
case _ => Left(s"Expected CreateCommand but got $v")
|
||||
}
|
||||
}
|
||||
|
||||
private def toExercise(triggerIds: TriggerIds, v: SValue): Either[String, ExerciseCommand] = {
|
||||
v match {
|
||||
case SRecord(_, _, vals) => {
|
||||
assert(vals.size == 4)
|
||||
for {
|
||||
templateId <- toTemplateId(triggerIds, vals.get(0))
|
||||
contractId <- toText(vals.get(1))
|
||||
choiceName <- toText(vals.get(2))
|
||||
choiceArg <- toLedgerValue(vals.get(2))
|
||||
} yield
|
||||
ExerciseCommand(
|
||||
Some(toApiIdentifier(templateId)),
|
||||
contractId,
|
||||
choiceName,
|
||||
Some(choiceArg))
|
||||
}
|
||||
case _ => Left(s"Expected ExerciseCommand but got $v")
|
||||
}
|
||||
}
|
||||
|
||||
private def toCommand(triggerIds: TriggerIds, v: SValue): Either[String, Command] = {
|
||||
v match {
|
||||
case SVariant(_, "CreateCommand", createVal) =>
|
||||
for {
|
||||
create <- toCreate(triggerIds, createVal)
|
||||
} yield Command().withCreate(create)
|
||||
case SVariant(_, "ExerciseCommand", exerciseVal) =>
|
||||
for {
|
||||
exercise <- toExercise(triggerIds, exerciseVal)
|
||||
} yield Command().withExercise(exercise)
|
||||
case _ => Left("Expected CreateCommand or ExerciseCommand but got $v")
|
||||
}
|
||||
}
|
||||
|
||||
private def toCommands(
|
||||
triggerIds: TriggerIds,
|
||||
v: SValue): Either[String, (String, Seq[Command])] = {
|
||||
v match {
|
||||
case SRecord(_, _, vals) => {
|
||||
assert(vals.size == 2)
|
||||
for {
|
||||
commandId <- toText(vals.get(0))
|
||||
commands <- vals.get(1) match {
|
||||
case SList(cmdValues) => cmdValues.traverseU(v => toCommand(triggerIds, v))
|
||||
case _ => Left("Expected List but got ${vals.get(1)}")
|
||||
}
|
||||
} yield (commandId, commands.toImmArray.toSeq)
|
||||
}
|
||||
case _ => Left("Expected Commands but got $v")
|
||||
}
|
||||
}
|
||||
|
||||
private def fromACS(triggerIds: TriggerIds, createdEvents: Seq[CreatedEvent]): SValue = {
|
||||
val activeContractsTy = triggerIds.getId("ActiveContracts")
|
||||
record(
|
||||
@ -198,24 +327,49 @@ object Converter {
|
||||
|
||||
def fromDar(dar: Dar[(PackageId, Package)]): Converter = {
|
||||
val triggerIds = TriggerIds.fromDar(dar)
|
||||
Converter(fromTransaction(triggerIds, _), fromACS(triggerIds, _))
|
||||
Converter(
|
||||
fromTransaction(triggerIds, _),
|
||||
fromACS(triggerIds, _),
|
||||
toCommands(triggerIds, _)
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
case class Runner(getTriggerSink: Seq[CreatedEvent] => Sink[Transaction, Future[SExpr]])
|
||||
class Runner(
|
||||
ledgerId: LedgerId,
|
||||
applicationId: ApplicationId,
|
||||
party: String,
|
||||
dar: Dar[(PackageId, Package)],
|
||||
submit: SubmitRequest => Unit) {
|
||||
|
||||
object Runner {
|
||||
def fromDar(dar: Dar[(PackageId, Package)], triggerId: Identifier): Runner = {
|
||||
val converter = Converter.fromDar(dar)
|
||||
val compiledPackages = PureCompiledPackages(dar.all.toMap).right.get
|
||||
val compiler = Compiler(compiledPackages.packages)
|
||||
private val converter = Converter.fromDar(dar)
|
||||
private val triggerIds = TriggerIds.fromDar(dar)
|
||||
private val darMap: Map[PackageId, Package] = dar.all.toMap
|
||||
private val compiler = Compiler(darMap)
|
||||
// We overwrite the definition of toLedgerValue with an identity function.
|
||||
// This is a type error but Speedy doesn’t care about the types and the only thing we do
|
||||
// with the result is convert it to ledger values/record so this is safe.
|
||||
private val definitionMap =
|
||||
compiler.compilePackages(darMap.keys) +
|
||||
(LfDefRef(
|
||||
Identifier(
|
||||
triggerIds.triggerPackageId,
|
||||
QualifiedName(
|
||||
triggerIds.triggerModuleName,
|
||||
DottedName.assertFromString("toLedgerValue")))) ->
|
||||
SEMakeClo(Array(), 1, SEVar(1)))
|
||||
private val compiledPackages = PureCompiledPackages(darMap, definitionMap).right.get
|
||||
|
||||
def getTriggerSink(
|
||||
triggerId: Identifier,
|
||||
acs: Seq[CreatedEvent]): Sink[Transaction, Future[SExpr]] = {
|
||||
val triggerExpr = EVal(triggerId)
|
||||
val (tyCon: TypeConName, stateTy) =
|
||||
dar.main._2.lookupIdentifier(triggerId.qualifiedName).right.get match {
|
||||
case DValue(TApp(TTyCon(tcon), stateTy), _, _, _) => (tcon, stateTy)
|
||||
dar.main._2.lookupIdentifier(triggerId.qualifiedName).toOption match {
|
||||
case Some(DValue(TApp(TTyCon(tcon), stateTy), _, _, _)) => (tcon, stateTy)
|
||||
case _ => {
|
||||
throw new RuntimeException(s"Identifier does not point to trigger")
|
||||
throw new RuntimeException(
|
||||
s"Identifier ${triggerId.qualifiedName} does not point to trigger")
|
||||
}
|
||||
}
|
||||
val triggerTy: TypeConApp = TypeConApp(tyCon, ImmArray(stateTy))
|
||||
@ -223,11 +377,27 @@ object Runner {
|
||||
val getInitialState =
|
||||
compiler.compile(ERecProj(triggerTy, Name.assertFromString("initialState"), triggerExpr))
|
||||
|
||||
def getSink(acs: Seq[CreatedEvent]): Sink[Transaction, Future[SExpr]] = {
|
||||
val machine = Speedy.Machine.fromSExpr(null, false, compiledPackages)
|
||||
val createdExpr: SExpr = SEValue(converter.fromACS(acs))
|
||||
val initialState = SEApp(getInitialState, Array(createdExpr))
|
||||
machine.ctrl = Speedy.CtrlExpr(initialState)
|
||||
val machine = Speedy.Machine.fromSExpr(null, false, compiledPackages)
|
||||
val createdExpr: SExpr = SEValue(converter.fromACS(acs))
|
||||
val initialState =
|
||||
SEApp(getInitialState, Array(SEValue(SParty(Party.assertFromString(party))), createdExpr))
|
||||
machine.ctrl = Speedy.CtrlExpr(initialState)
|
||||
while (!machine.isFinal) {
|
||||
machine.step() match {
|
||||
case SResultContinue => ()
|
||||
case SResultError(err) => {
|
||||
throw new RuntimeException(err)
|
||||
}
|
||||
case res => {
|
||||
throw new RuntimeException(s"Unexpected speedy result $res")
|
||||
}
|
||||
}
|
||||
}
|
||||
val evaluatedInitialState = machine.toSValue
|
||||
println(s"Initial state: $evaluatedInitialState")
|
||||
Sink.fold[SExpr, Transaction](SEValue(evaluatedInitialState))((state, transaction) => {
|
||||
val message = converter.fromTransaction(transaction)
|
||||
machine.ctrl = Speedy.CtrlExpr(SEApp(update, Array(SEValue(message), state)))
|
||||
while (!machine.isFinal) {
|
||||
machine.step() match {
|
||||
case SResultContinue => ()
|
||||
@ -239,37 +409,47 @@ object Runner {
|
||||
}
|
||||
}
|
||||
}
|
||||
val evaluatedInitialState = machine.toSValue
|
||||
println(s"Initial state: $evaluatedInitialState")
|
||||
Sink.fold[SExpr, Transaction](SEValue(evaluatedInitialState))((state, transaction) => {
|
||||
val message = converter.fromTransaction(transaction)
|
||||
machine.ctrl = Speedy.CtrlExpr(SEApp(update, Array(SEValue(message), state)))
|
||||
while (!machine.isFinal) {
|
||||
machine.step() match {
|
||||
case SResultContinue => ()
|
||||
case SResultError(err) => {
|
||||
throw new RuntimeException(err)
|
||||
}
|
||||
case res => {
|
||||
throw new RuntimeException(s"Unexpected speed result $res")
|
||||
}
|
||||
machine.toSValue match {
|
||||
case SRecord(recordId, _, values)
|
||||
if recordId.qualifiedName ==
|
||||
QualifiedName(
|
||||
DottedName.assertFromString("DA.Types"),
|
||||
DottedName.assertFromString("Tuple3")) => {
|
||||
val newState = values.get(0)
|
||||
val commandOpt = values.get(1)
|
||||
val logMessage = values.get(2) match {
|
||||
case SText(t) => t
|
||||
case _ =>
|
||||
throw new RuntimeException(s"Log message should be text but was ${values.get(2)}")
|
||||
}
|
||||
println(s"New state: $newState")
|
||||
println(s"Emitted log message: ${logMessage}")
|
||||
commandOpt match {
|
||||
case SOptional(Some(commandsVal)) =>
|
||||
converter.toCommands(commandsVal) match {
|
||||
case Left(err) => throw new RuntimeException(err)
|
||||
case Right((commandId, commands)) => {
|
||||
val commandsArg = Commands(
|
||||
ledgerId = ledgerId.unwrap,
|
||||
applicationId = applicationId.unwrap,
|
||||
commandId = commandId,
|
||||
party = party,
|
||||
ledgerEffectiveTime = Some(fromInstant(Instant.EPOCH)),
|
||||
maximumRecordTime = Some(fromInstant(Instant.EPOCH.plusSeconds(5))),
|
||||
commands = commands
|
||||
)
|
||||
submit(SubmitRequest(commands = Some(commandsArg)))
|
||||
}
|
||||
}
|
||||
case _ => {}
|
||||
}
|
||||
SEValue(newState)
|
||||
}
|
||||
machine.toSValue match {
|
||||
case SRecord(_, _, values) => {
|
||||
val newState = values.get(0)
|
||||
val command = values.get(1)
|
||||
println(s"Emitted log message: $command")
|
||||
println(s"New state: $newState")
|
||||
SEValue(newState)
|
||||
}
|
||||
case v => {
|
||||
throw new RuntimeException(s"Expected Tuple2 but got $v")
|
||||
}
|
||||
case v => {
|
||||
throw new RuntimeException(s"Expected Tuple3 but got $v")
|
||||
}
|
||||
})
|
||||
}
|
||||
Runner(getSink)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
@ -288,7 +468,6 @@ object RunnerMain {
|
||||
|
||||
val triggerId: Identifier =
|
||||
Identifier(dar.main._1, QualifiedName.assertFromString(config.triggerIdentifier))
|
||||
val runner = Runner.fromDar(dar, triggerId)
|
||||
|
||||
val system: ActorSystem = ActorSystem("TriggerRunner")
|
||||
implicit val materializer: ActorMaterializer = ActorMaterializer()(system)
|
||||
@ -309,6 +488,11 @@ object RunnerMain {
|
||||
client <- LedgerClient.singleHost(config.ledgerHost, config.ledgerPort, clientConfig)(
|
||||
ec,
|
||||
sequencer)
|
||||
runner <- Future {
|
||||
new Runner(client.ledgerId, applicationId, config.ledgerParty, dar, submitRequest => {
|
||||
val _ = client.commandClient.submitSingleCommand(submitRequest)
|
||||
})
|
||||
}
|
||||
acsResponses <- client.activeContractSetClient
|
||||
.getActiveContracts(filter, verbose = true)
|
||||
.runWith(Sink.seq)
|
||||
@ -324,7 +508,7 @@ object RunnerMain {
|
||||
offset,
|
||||
None,
|
||||
TransactionFilter(List((config.ledgerParty, Filters.defaultInstance)).toMap))
|
||||
.runWith(runner.getTriggerSink(acsResponses.flatMap(x => x.activeContracts)))
|
||||
.runWith(runner.getTriggerSink(triggerId, acsResponses.flatMap(x => x.activeContracts)))
|
||||
} yield ()
|
||||
|
||||
flow.onComplete(_ => system.terminate())
|
||||
|
@ -9,27 +9,56 @@ import qualified DA.TextMap as TM
|
||||
|
||||
import Daml.Trigger
|
||||
|
||||
type ACS = TextMap Identifier
|
||||
data TriggerState = TriggerState
|
||||
{ activeAssets : TextMap Identifier
|
||||
, nextCommandId : Int
|
||||
, party : Party
|
||||
}
|
||||
|
||||
test : Trigger (TextMap Identifier)
|
||||
initState : Party -> ActiveContracts -> TriggerState
|
||||
initState party (ActiveContracts events) = TriggerState (foldl updateAcs TM.empty events) 0 party
|
||||
where
|
||||
updateAcs : TextMap Identifier -> Created -> TextMap Identifier
|
||||
updateAcs acs (Created _ cId tId)
|
||||
| tId.entityName == "Asset" = TM.insert cId tId acs
|
||||
| otherwise = acs
|
||||
|
||||
-- | This is a very silly trigger for testing purposes:
|
||||
-- We track the active Asset contracts (we make no attempts to distinguish different things called Asset)
|
||||
-- and we create a new AssetMirror contract whenever an Asset contract is created (but we do not archive them).
|
||||
test : Trigger TriggerState
|
||||
test = Trigger
|
||||
{ initialState = init
|
||||
{ initialState = initState
|
||||
, update = update
|
||||
}
|
||||
where
|
||||
init : ActiveContracts -> TextMap Identifier
|
||||
init (ActiveContracts events) = foldl updateAcs TM.empty events
|
||||
updateAcs : TextMap Identifier -> Created -> TextMap Identifier
|
||||
updateAcs acs (Created _ cId tId) = TM.insert cId tId acs
|
||||
update : Message -> TextMap Identifier -> (TextMap Identifier, Text)
|
||||
update (MTransaction t) acs = (foldl updateEvent acs (events t), transactionId t)
|
||||
updateEvent : TextMap Identifier -> Event -> TextMap Identifier
|
||||
updateEvent acs ev = case ev of
|
||||
CreatedEvent (Created _ cId tId) -> TM.insert cId tId acs
|
||||
ArchivedEvent (Archived _ cId _) -> TM.delete cId acs
|
||||
update : Message -> TriggerState -> (TriggerState, Optional Commands, Text)
|
||||
update (MTransaction t) (TriggerState acs nextId party) = case foldl updateEvent ([], acs) (events t) of
|
||||
([], acs) -> (TriggerState acs nextId party, None, "No command")
|
||||
(cmds, acs) ->
|
||||
( TriggerState acs (nextId + 1) party
|
||||
, Some $ Commands ("command_" <> show nextId) cmds
|
||||
, "Submitted " <> show (length cmds) <> " commands"
|
||||
)
|
||||
where
|
||||
updateEvent : ([Command], TextMap Identifier) -> Event -> ([Command], TextMap Identifier)
|
||||
updateEvent (cmds, acs) ev = case ev of
|
||||
CreatedEvent (Created _ cId tId)
|
||||
| tId.entityName == "Asset" ->
|
||||
let createMirror : Command = createCmd (TemplateId "ACS" "AssetMirror") (AssetMirror { issuer = party })
|
||||
in (createMirror :: cmds, TM.insert cId tId acs)
|
||||
ArchivedEvent (Archived _ cId tId)
|
||||
| tId.entityName == "Asset" -> (cmds, TM.delete cId acs)
|
||||
_ -> (cmds, acs)
|
||||
|
||||
template Asset
|
||||
with
|
||||
issuer : Party
|
||||
where
|
||||
signatory issuer
|
||||
|
||||
template AssetMirror
|
||||
with
|
||||
issuer : Party
|
||||
where
|
||||
signatory issuer
|
||||
|
@ -66,6 +66,9 @@ object AcsMain {
|
||||
|
||||
private val applicationId = ApplicationId("AscMain test")
|
||||
|
||||
case class ActiveAssetMirrors(num: Int)
|
||||
case class NumTransactions(num: Long)
|
||||
|
||||
def main(args: Array[String]): Unit = {
|
||||
configParser.parse(args, Config(0, null)) match {
|
||||
case None =>
|
||||
@ -86,7 +89,6 @@ object AcsMain {
|
||||
|
||||
val triggerId: Identifier =
|
||||
Identifier(dar.main._1, QualifiedName.assertFromString("ACS:test"))
|
||||
val runner = Runner.fromDar(dar, triggerId)
|
||||
|
||||
val system: ActorSystem = ActorSystem("TriggerRunner")
|
||||
implicit val materializer: ActorMaterializer = ActorMaterializer()(system)
|
||||
@ -194,7 +196,9 @@ object AcsMain {
|
||||
s"Alice$partyCount"
|
||||
}
|
||||
|
||||
def test(transactions: Long, commands: (LedgerClient, String) => Future[Set[String]]) = {
|
||||
def test(
|
||||
transactions: NumTransactions,
|
||||
commands: (LedgerClient, String) => Future[(Set[String], ActiveAssetMirrors)]) = {
|
||||
val party = getNewParty()
|
||||
val clientF =
|
||||
LedgerClient.singleHost("localhost", config.ledgerPort, clientConfig)(ec, sequencer)
|
||||
@ -210,19 +214,25 @@ object AcsMain {
|
||||
.fold(LedgerOffset().withBoundary(LedgerOffset.LedgerBoundary.LEDGER_BEGIN))(resp =>
|
||||
LedgerOffset().withAbsolute(resp.offset))
|
||||
}
|
||||
runner <- Future {
|
||||
new Runner(client.ledgerId, applicationId, party, dar, submitRequest => {
|
||||
val _ = client.commandClient.submitSingleCommand(submitRequest)
|
||||
})
|
||||
}
|
||||
finalState <- client.transactionClient
|
||||
.getTransactions(offset, None, filter)
|
||||
.take(transactions)
|
||||
.runWith(runner.getTriggerSink(acsResponses.flatMap(x => x.activeContracts)))
|
||||
.take(transactions.num)
|
||||
.runWith(
|
||||
runner.getTriggerSink(triggerId, acsResponses.flatMap(x => x.activeContracts)))
|
||||
} yield finalState
|
||||
val commandsFlow: Future[Set[String]] = for {
|
||||
val commandsFlow: Future[(Set[String], ActiveAssetMirrors)] = for {
|
||||
client <- clientF
|
||||
activeContracts <- commands(client, party)
|
||||
} yield activeContracts
|
||||
r <- commands(client, party)
|
||||
} yield r
|
||||
|
||||
// We want to error out if either of the futures fails so Future.sequence
|
||||
// does not do the trick and we have to hack around it using a Promise
|
||||
val p = Promise[(SExpr, Set[String])]()
|
||||
val p = Promise[(SExpr, (Set[String], ActiveAssetMirrors))]()
|
||||
triggerFlow.onComplete(r =>
|
||||
r match {
|
||||
case Success(_) => ()
|
||||
@ -253,38 +263,60 @@ object AcsMain {
|
||||
val r = Await.result(p.future, Duration.Inf)
|
||||
|
||||
r._1 match {
|
||||
case SEValue(SMap(v)) =>
|
||||
case SEValue(SRecord(_, _, vals)) => {
|
||||
assert(vals.size == 3, s"Expected record with 3 fields but got ${r._1}")
|
||||
val activeAssets = vals.get(0) match {
|
||||
case SMap(v) => v.keySet
|
||||
case _ => throw new RuntimeException(s"Expected a map but got ${vals.get(0)}")
|
||||
}
|
||||
assert(activeAssets == r._2._1, s"Expected ${r._2._1} but got $activeAssets")
|
||||
val activeMirrorContractsF: Future[Int] = for {
|
||||
client <- clientF
|
||||
acsResponses <- client.activeContractSetClient
|
||||
.getActiveContracts(filter, verbose = true)
|
||||
.runWith(Sink.seq)
|
||||
|
||||
} yield
|
||||
(acsResponses
|
||||
.flatMap(x => x.activeContracts)
|
||||
.filter(x => x.getTemplateId.entityName == "AssetMirror")
|
||||
.size)
|
||||
val activeMirrorContracts = Await.result(activeMirrorContractsF, Duration.Inf)
|
||||
assert(
|
||||
v.keySet == r._2,
|
||||
"Expected " + r._2.toString + " but got " + v.keySet.toString)
|
||||
case _ => assert(false, "Expected a map but got " + r._1.toString)
|
||||
activeMirrorContracts == r._2._2.num,
|
||||
s"Expected ${r._2._2.num} but got $activeMirrorContracts")
|
||||
}
|
||||
case _ => assert(false, "Expected a map but got ${r._1.toString}")
|
||||
}
|
||||
}
|
||||
|
||||
try {
|
||||
|
||||
test(1, (client, party) => {
|
||||
test(NumTransactions(2), (client, party) => {
|
||||
for {
|
||||
contractId <- create(client, party, "1.0")
|
||||
} yield Set(contractId)
|
||||
})
|
||||
|
||||
test(2, (client, party) => {
|
||||
for {
|
||||
contractId1 <- create(client, party, "2.0")
|
||||
contractId2 <- create(client, party, "2.1")
|
||||
} yield Set(contractId1, contractId2)
|
||||
} yield (Set(contractId), ActiveAssetMirrors(1))
|
||||
})
|
||||
|
||||
test(
|
||||
4,
|
||||
NumTransactions(4),
|
||||
(client, party) => {
|
||||
for {
|
||||
contractId1 <- create(client, party, "2.0")
|
||||
contractId2 <- create(client, party, "2.1")
|
||||
} yield (Set(contractId1, contractId2), ActiveAssetMirrors(2))
|
||||
}
|
||||
)
|
||||
|
||||
test(
|
||||
NumTransactions(6),
|
||||
(client, party) => {
|
||||
for {
|
||||
contractId1 <- create(client, party, "3.0")
|
||||
contractId2 <- create(client, party, "3.1")
|
||||
_ <- archive(client, party, "3.2", contractId1)
|
||||
_ <- archive(client, party, "3.3", contractId2)
|
||||
} yield Set()
|
||||
} yield (Set(), ActiveAssetMirrors(2))
|
||||
}
|
||||
)
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user