Triggers: Connect Logging context from Triggers to the Speedy. (#13009)

Follow up of #12976.
fixes #12208

CHANGELOG_BEGIN
CHANGELOG_END
This commit is contained in:
Remy 2022-02-22 18:28:49 +01:00 committed by GitHub
parent 42462f4574
commit e834a7b85c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 136 additions and 154 deletions

View File

@ -11,13 +11,13 @@ import java.util.logging.{Level, Logger}
import scalaz.std.option._
import scalaz.std.scalaFuture._
import scalaz.syntax.traverse._
import com.daml.lf.archive
import com.daml.lf.data.ImmArray
import com.daml.lf.data.Ref
import com.daml.lf.data.Ref.ModuleName
import com.daml.lf.language.LanguageVersion
import com.daml.lf.scenario.api.v1.{Map => _, _}
import com.daml.logging.LoggingContext
import io.grpc.stub.StreamObserver
import io.grpc.{Status, StatusRuntimeException}
import io.grpc.netty.NettyServerBuilder
@ -75,36 +75,37 @@ object ScenarioServiceMain extends App {
new AkkaExecutionSequencerPool("ScriptServicePool")(system)
implicit val materializer: Materializer = Materializer(system)
implicit val ec: ExecutionContext = system.dispatcher
LoggingContext.newLoggingContext { implicit lc: LoggingContext =>
val server =
NettyServerBuilder
.forAddress(new InetSocketAddress(InetAddress.getLoopbackAddress, 0)) // any free port
.addService(new ScenarioService(config.enableScenarios))
.maxInboundMessageSize(config.maxInboundMessageSize)
.build
server.start()
// Print the allocated port for the client
println("PORT=" + server.getPort.toString)
val server =
NettyServerBuilder
.forAddress(new InetSocketAddress(InetAddress.getLoopbackAddress, 0)) // any free port
.addService(new ScenarioService(config.enableScenarios))
.maxInboundMessageSize(config.maxInboundMessageSize)
.build
server.start()
// Print the allocated port for the client
println("PORT=" + server.getPort.toString)
// Bump up the log level
Logger.getLogger("io.grpc").setLevel(Level.ALL)
// Bump up the log level
Logger.getLogger("io.grpc").setLevel(Level.ALL)
// Start a thread to watch stdin and terminate
// if it closes. This makes sure we do not leave
// this process running if the parent exits.
new Thread(new Runnable {
def run(): Unit = {
while (System.in.read >= 0) {}
System.err.println("ScenarioService: stdin closed, terminating server.")
server.shutdown()
system.terminate()
()
}
}).start()
// Start a thread to watch stdin and terminate
// if it closes. This makes sure we do not leave
// this process running if the parent exits.
new Thread(new Runnable {
def run(): Unit = {
while (System.in.read >= 0) {}
System.err.println("ScenarioService: stdin closed, terminating server.")
server.shutdown()
system.terminate()
()
}
}).start()
println("Server started.")
server.awaitTermination()
println("Server started.")
server.awaitTermination()
}
}
}
@ -119,6 +120,7 @@ class ScenarioService(
ec: ExecutionContext,
esf: ExecutionSequencerFactory,
mat: Materializer,
lc: LoggingContext,
) extends ScenarioServiceGrpc.ScenarioServiceImplBase {
import ScenarioService._

View File

@ -35,7 +35,7 @@ object Context {
private val contextCounter = new AtomicLong()
def newContext(lfVerion: LanguageVersion): Context =
def newContext(lfVerion: LanguageVersion)(implicit loggingContext: LoggingContext): Context =
new Context(contextCounter.incrementAndGet(), lfVerion)
private val compilerConfig =
@ -47,7 +47,9 @@ object Context {
)
}
class Context(val contextId: Context.ContextId, languageVersion: LanguageVersion) {
class Context(val contextId: Context.ContextId, languageVersion: LanguageVersion)(implicit
loggingContext: LoggingContext
) {
import Context._
@ -143,14 +145,9 @@ class Context(val contextId: Context.ContextId, languageVersion: LanguageVersion
val compiledPackages = PureCompiledPackages(allSignatures, defns, compilerConfig)
for {
defn <- defns.get(LfDefRef(identifier))
} yield
// TODO: https://github.com/digital-asset/daml/issues/12208
// plug the logging context properly in the scenario service
LoggingContext.newLoggingContext(
Speedy.Machine.fromScenarioSExpr(
compiledPackages,
defn.body,
)(_)
} yield Speedy.Machine.fromScenarioSExpr(
compiledPackages,
defn.body,
)
}
@ -160,9 +157,7 @@ class Context(val contextId: Context.ContextId, languageVersion: LanguageVersion
): Option[ScenarioRunner.ScenarioResult] =
buildMachine(
Identifier(PackageId.assertFromString(pkgId), QualifiedName.assertFromString(name))
).map(machine =>
LoggingContext.newLoggingContext(new ScenarioRunner(machine, txSeeding).run(_))
)
).map(machine => new ScenarioRunner(machine, txSeeding).run)
def interpretScript(
pkgId: String,

View File

@ -23,7 +23,6 @@ import com.daml.lf.speedy.{Pretty, SValue, Speedy}
import com.daml.lf.speedy.SExpr.SExpr
import com.daml.lf.value.Value
import com.daml.lf.value.Value.ContractId
import com.daml.logging.LoggingContext
import com.daml.platform.participant.util.LfEngineToApi.toApiIdentifier
import com.daml.script.converter.ConverterException
import io.grpc.StatusRuntimeException
@ -304,12 +303,8 @@ object Converter {
fun: SValue,
): Either[String, (SValue, SValue)] = {
val machine =
// TODO: https://github.com/digital-asset/daml/issues/12208
// plug the logging context properly in Daml-script
LoggingContext.newLoggingContext(
Speedy.Machine.fromPureSExpr(compiledPackages, SEApp(SEValue(fun), Array(extractToTuple)))(
_
)
Speedy.Machine.fromPureSExpr(compiledPackages, SEApp(SEValue(fun), Array(extractToTuple)))(
Script.DummyLoggingContext
)
machine.run() match {
case SResultFinalValue(v) =>

View File

@ -178,6 +178,12 @@ object ParticipantsJsonProtocol extends DefaultJsonProtocol {
// Function that requires an argument.
sealed abstract class Script extends Product with Serializable
object Script {
// For now, we do not care of the logging context for Daml-Script, so we create a
// global dummy context, we can feed the Speedy Machine and the Scenario service with.
private[script] val DummyLoggingContext: LoggingContext =
LoggingContext.newLoggingContext(identity)
final case class Action(expr: SExpr, scriptIds: ScriptIds) extends Script
final case class Function(expr: SExpr, param: Type, scriptIds: ScriptIds) extends Script {
def apply(arg: SExpr): Script.Action = Script.Action(SEApp(expr, Array(arg)), scriptIds)
@ -378,10 +384,8 @@ private[lf] class Runner(
mat: Materializer,
): (Speedy.Machine, Future[SValue]) = {
val machine =
// TODO: https://github.com/digital-asset/daml/issues/12208
// plug the logging context properly in Daml-script
LoggingContext.newLoggingContext(
Speedy.Machine.fromPureSExpr(extendedCompiledPackages, script.expr, traceLog, warningLog)(_)
Speedy.Machine.fromPureSExpr(extendedCompiledPackages, script.expr, traceLog, warningLog)(
Script.DummyLoggingContext
)
def stepToValue(): Either[RuntimeException, SValue] =

View File

@ -24,7 +24,6 @@ import com.daml.lf.transaction.{
import com.daml.lf.value.Value
import com.daml.lf.value.Value.ContractId
import com.daml.script.converter.ConverterException
import com.daml.logging.LoggingContext
import io.grpc.StatusRuntimeException
import scalaz.OneAnd
import scalaz.OneAnd._
@ -157,21 +156,17 @@ class IdeLedgerClient(
val ledgerApi = ScenarioRunner.ScenarioLedgerApi(ledger)
val result =
// TODO: https://github.com/digital-asset/daml/issues/12208
// plug the logging context properly in Daml-script
LoggingContext.newLoggingContext(
ScenarioRunner.submit(
compiledPackages,
ledgerApi,
actAs.toSet,
readAs,
translated,
optLocation,
nextSeed(),
traceLog,
warningLog,
)(_)
)
ScenarioRunner.submit(
compiledPackages,
ledgerApi,
actAs.toSet,
readAs,
translated,
optLocation,
nextSeed(),
traceLog,
warningLog,
)(Script.DummyLoggingContext)
result match {
case err: ScenarioRunner.SubmissionError => err
case commit: ScenarioRunner.Commit[_] =>

View File

@ -32,15 +32,13 @@ import com.daml.lf.speedy.SResult._
import com.daml.lf.speedy.SValue._
import com.daml.lf.speedy.{Compiler, Pretty, SValue, Speedy}
import com.daml.lf.{CompiledPackages, PureCompiledPackages}
import com.daml.logging.LoggingContextOf.{label, newLoggingContext}
import com.daml.logging.entries.{LoggingEntry, LoggingValue}
import com.daml.logging.entries.LoggingEntry
import com.daml.logging.{ContextualizedLogger, LoggingContextOf}
import com.daml.platform.participant.util.LfEngineToApi.toApiIdentifier
import com.daml.platform.services.time.TimeProviderType
import com.daml.script.converter.Converter.Implicits._
import com.daml.script.converter.Converter.{DamlAnyModuleRecord, DamlTuple2, unrollFree}
import com.daml.script.converter.ConverterException
import com.daml.logging.LoggingContext
import com.google.protobuf.empty.Empty
import com.google.rpc.status.Status
import com.typesafe.scalalogging.StrictLogging
@ -52,7 +50,7 @@ import scalaz.syntax.functor._
import scalaz.syntax.std.boolean._
import scalaz.syntax.std.option._
import scalaz.syntax.tag._
import scalaz.{-\/, Functor, \/, \/-}
import scalaz.{-\/, Functor, Tag, \/, \/-}
import java.time.Instant
import java.util.UUID
@ -77,25 +75,7 @@ final case class Trigger(
heartbeat: Option[FiniteDuration],
// Whether the trigger supports readAs claims (SDK 1.18 and newer) or not.
hasReadAs: Boolean,
) {
private[trigger] final class withLoggingContext[P] private (
label: label[Trigger with P],
kvs: Seq[LoggingEntry],
) {
def apply[T](f: LoggingContextOf[Trigger with P] => T): T =
newLoggingContext(
label,
kvs :+ "triggerDefinition" -> LoggingValue.from(triggerDefinition.toString): _*
)(f)
}
private[trigger] object withLoggingContext {
def apply[T](f: LoggingContextOf[Trigger] => T): T =
newLoggingContext(label, "triggerDefinition" -> triggerDefinition.toString)(f)
def labelled[P](kvs: LoggingEntry*) = new withLoggingContext(label[Trigger with P], kvs)
}
}
)
// Utilities for interacting with the speedy machine.
object Machine extends StrictLogging {
@ -118,6 +98,21 @@ object Machine extends StrictLogging {
object Trigger extends StrictLogging {
private[trigger] def newLoggingContext[P, T](
triggerDefinition: Identifier,
actAs: Party,
readAs: Set[Party],
triggerId: Option[UUID] = None,
): (LoggingContextOf[Trigger with P] => T) => T = {
val entries0 = List[LoggingEntry](
"triggerDefinition" -> triggerDefinition.toString,
"triggerActAs" -> Tag.unwrap(actAs),
"triggerReadAs" -> Tag.unsubst(readAs),
)
val entries = triggerId.fold(entries0)(uuid => entries0.+:("triggerId" -> uuid.toString))
LoggingContextOf.newLoggingContext(LoggingContextOf.label[Trigger with P], entries: _*)
}
private def detectHasReadAs(
interface: PackageInterface,
triggerIds: TriggerIds,
@ -140,7 +135,7 @@ object Trigger extends StrictLogging {
def fromIdentifier(
compiledPackages: CompiledPackages,
triggerId: Identifier,
): Either[String, Trigger] = {
)(implicit loggingContext: LoggingContextOf[Trigger]): Either[String, Trigger] = {
// Given an identifier to a high- or lowlevel trigger,
// return an expression that will run the corresponding trigger
@ -192,16 +187,11 @@ object Trigger extends StrictLogging {
compiler: Compiler,
converter: Converter,
expr: TypedExpr,
): Either[String, Option[FiniteDuration]] = {
)(implicit loggingContext: LoggingContextOf[Trigger]): Either[String, Option[FiniteDuration]] = {
val heartbeat = compiler.unsafeCompile(
ERecProj(expr.ty, Name.assertFromString("heartbeat"), expr.expr)
)
val machine =
// TODO: https://github.com/digital-asset/daml/issues/12208
// plug the logging context properly in triggers
LoggingContext.newLoggingContext(
Speedy.Machine.fromPureSExpr(compiledPackages, heartbeat)(_)
)
val machine = Speedy.Machine.fromPureSExpr(compiledPackages, heartbeat)
Machine.stepToValue(machine) match {
case SOptional(None) => Right(None)
case SOptional(Some(relTime)) => converter.toFiniteDuration(relTime).map(Some(_))
@ -215,17 +205,11 @@ object Trigger extends StrictLogging {
compiler: Compiler,
converter: Converter,
expr: TypedExpr,
): Either[String, Filters] = {
val registeredTemplates =
compiler.unsafeCompile(
ERecProj(expr.ty, Name.assertFromString("registeredTemplates"), expr.expr)
)
val machine =
// TODO: https://github.com/digital-asset/daml/issues/12208
// plug the logging context properly in triggers
LoggingContext.newLoggingContext(
Speedy.Machine.fromPureSExpr(compiledPackages, registeredTemplates)(_)
)
)(implicit loggingContext: LoggingContextOf[Trigger]): Either[String, Filters] = {
val registeredTemplates = compiler.unsafeCompile(
ERecProj(expr.ty, Name.assertFromString("registeredTemplates"), expr.expr)
)
val machine = Speedy.Machine.fromPureSExpr(compiledPackages, registeredTemplates)
Machine.stepToValue(machine) match {
case SVariant(_, "AllInDar", _, _) => {
val packages = compiledPackages.packageIds
@ -759,7 +743,7 @@ object Runner extends StrictLogging {
case object Neither extends SeenMsgs
}
// Convience wrapper that creates the runner and runs the trigger.
// Convenience wrapper that creates the runner and runs the trigger.
def run(
dar: Dar[(PackageId, Package)],
triggerId: Identifier,
@ -768,23 +752,22 @@ object Runner extends StrictLogging {
applicationId: ApplicationId,
parties: TriggerParties,
config: Compiler.Config,
)(implicit materializer: Materializer, executionContext: ExecutionContext): Future[SValue] = {
val darMap = dar.all.toMap
val compiledPackages = PureCompiledPackages.build(darMap, config) match {
case Left(err) => throw new RuntimeException(s"Failed to compile packages: $err")
case Right(pkgs) => pkgs
}
val trigger = Trigger.fromIdentifier(compiledPackages, triggerId) match {
case Left(err) => throw new RuntimeException(s"Invalid trigger: $err")
case Right(trigger) => trigger
}
val runner =
trigger.withLoggingContext { implicit lc =>
new Runner(compiledPackages, trigger, client, timeProviderType, applicationId, parties)
)(implicit materializer: Materializer, executionContext: ExecutionContext): Future[SValue] =
Trigger.newLoggingContext(triggerId, parties.actAs, parties.readAs) { implicit lc =>
val darMap = dar.all.toMap
val compiledPackages = PureCompiledPackages.build(darMap, config) match {
case Left(err) => throw new RuntimeException(s"Failed to compile packages: $err")
case Right(pkgs) => pkgs
}
for {
(acs, offset) <- runner.queryACS()
finalState <- runner.runWithACS(acs, offset)._2
} yield finalState
}
val trigger = Trigger.fromIdentifier(compiledPackages, triggerId) match {
case Left(err) => throw new RuntimeException(s"Invalid trigger: $err")
case Right(trigger) => trigger
}
val runner =
new Runner(compiledPackages, trigger, client, timeProviderType, applicationId, parties)
for {
(acs, offset) <- runner.queryACS()
finalState <- runner.runWithACS(acs, offset)._2
} yield finalState
}
}

View File

@ -73,7 +73,6 @@ da_scala_library(
"//libs-scala/contextualized-logging",
"//libs-scala/db-utils",
"//libs-scala/doobie-slf4j",
"//libs-scala/logging-entries",
"//libs-scala/scala-utils",
"//triggers/runner:trigger-runner-lib",
"//triggers/service/auth:middleware-api",

View File

@ -111,9 +111,11 @@ class Server(
)(implicit ec: ExecutionContext, sys: ActorSystem): Future[Either[String, Unit]] = {
import cats.implicits._
triggers.toList.traverse(runningTrigger =>
Trigger
.fromIdentifier(compiledPackages, runningTrigger.triggerName)
.map(trigger => (trigger, runningTrigger))
runningTrigger.withLoggingContext(implicit loggingContext =>
Trigger
.fromIdentifier(compiledPackages, runningTrigger.triggerName)
.map(trigger => (trigger, runningTrigger))
)
) match {
case Left(err) => Future.successful(Left(err))
case Right(triggers) =>
@ -165,12 +167,14 @@ class Server(
auth.flatMap(_.refreshToken),
config.readAs,
)
// Validate trigger id before persisting to DB
Trigger.fromIdentifier(compiledPackages, runningTrigger.triggerName) match {
case Left(value) => Future.successful(Left(value))
case Right(trigger) =>
triggerDao.addRunningTrigger(runningTrigger).map(_ => Right((trigger, runningTrigger)))
}
runningTrigger.withLoggingContext(implicit loggingContext =>
// Validate trigger id before persisting to DB
Trigger.fromIdentifier(compiledPackages, runningTrigger.triggerName) match {
case Left(value) => Future.successful(Left(value))
case Right(trigger) =>
triggerDao.addRunningTrigger(runningTrigger).map(_ => Right((trigger, runningTrigger)))
}
)
}
private def startTrigger(trigger: Trigger, runningTrigger: RunningTrigger)(implicit

View File

@ -43,8 +43,8 @@ object TriggerRunnerImpl {
restartConfig: TriggerRestartConfig,
readAs: Set[Party],
) {
private[trigger] def withLoggingContext[T](f: LoggingContextOf[Config with Trigger] => T): T =
trigger.withLoggingContext.labelled[Config]("triggerId" -> triggerInstance.toString)(f)
private[trigger] def withLoggingContext[T]: (LoggingContextOf[Trigger with Config] => T) => T =
Trigger.newLoggingContext(trigger.triggerDefinition, party, readAs, Some(triggerInstance))
}
sealed trait Message

View File

@ -21,6 +21,7 @@ package trigger {
import com.daml.auth.middleware.api.Tagged.{AccessToken, RefreshToken}
import com.daml.ledger.api.refinements.ApiTypes.ApplicationId
import com.daml.logging.LoggingContextOf
case class LedgerConfig(
host: String,
@ -44,5 +45,8 @@ package trigger {
triggerAccessToken: Option[AccessToken],
triggerRefreshToken: Option[RefreshToken],
triggerReadAs: Set[Party],
)
) {
private[trigger] def withLoggingContext[T]: (LoggingContextOf[Trigger] => T) => T =
Trigger.newLoggingContext(triggerName, triggerParty, triggerReadAs, Some(triggerInstance))
}
}

View File

@ -92,19 +92,20 @@ trait AbstractTriggerTest extends SandboxFixture with SandboxBackend.Postgresql
readAs: Set[String] = Set.empty,
): Runner = {
val triggerId = Identifier(packageId, name)
val trigger = Trigger.fromIdentifier(compiledPackages, triggerId).toOption.get
trigger.withLoggingContext { implicit lc =>
new Runner(
compiledPackages,
trigger,
client,
config.timeProviderType.get,
applicationId,
TriggerParties(
actAs = Party(party),
readAs = Party.subst(readAs),
),
)
Trigger.newLoggingContext(triggerId, Party(party), Party.subst(readAs)) {
implicit loggingContext =>
val trigger = Trigger.fromIdentifier(compiledPackages, triggerId).toOption.get
new Runner(
compiledPackages,
trigger,
client,
config.timeProviderType.get,
applicationId,
TriggerParties(
actAs = Party(party),
readAs = Party.subst(readAs),
),
)
}
}