kvutils: Add a Runner for the simplified KVUtils API. (#3930)

* Initial version of simplified transaction reader/writer API.

* Code tidying.

* Ran ./fmt.sh

* Added verification of pre-declared vs. actual state updates to example.

* kvutils: Run `scalafmt`.

* kvutils: Move ParticipantStateIntegrationSpecBase to src/test/lib.

* kvutils: Clean up warnings in ParticipantStateIntegrationSpecBase.

* ledger-on-memory: Move InMemoryLedgerReaderWriter to its own package.

* ledger-on-memory: Add a Main class with a simple configuration.

* reference-v2/ledger-on-memory: Wait for DARs to upload.

* ledger-on-memory: Run conformance tests.

* kvutils/app: Extract the in-memory ledger `Main` class into a Runner.

* reference-v2: Move InMemoryKVParticipantState out of kvutils.

It's not used elsewhere.

* kvutils/app: Fix some of the config help text.

* kvutils/app: Don't register the archive files twice.

CHANGELOG_BEGIN

- [kvutils/app] A simple way to run KVUtils-based participants.
- [ledger-in-memory] Split out the simple in-memory ledger, previously
  part of kvutils, into its own JAR.

CHANGELOG_END

* Revert "kvutils/app: Don't register the archive files twice."

This reverts commit 2489ae6964.

Turns out we need this for preloading, which is a useful optimization.

Co-authored-by: Miklos <57664299+miklos-da@users.noreply.github.com>
This commit is contained in:
Samir Talwar 2020-01-13 10:51:07 +01:00 committed by mergify[bot]
parent 5aa33cf7e9
commit 6bc00da1e2
21 changed files with 542 additions and 42 deletions

View File

@ -4,6 +4,7 @@
load(
"//bazel_tools:scala.bzl",
"da_scala_binary",
"da_scala_test",
)
load(
"//ledger/ledger-api-test-tool:conformance.bzl",
@ -36,6 +37,7 @@ da_scala_binary(
"//daml-lf/archive:daml_lf_dev_archive_java_proto",
"//daml-lf/data",
"//daml-lf/engine",
"//daml-lf/transaction",
"//language-support/scala/bindings",
"//ledger/ledger-api-auth",
"//ledger/ledger-api-common",
@ -51,13 +53,38 @@ da_scala_binary(
],
)
da_scala_test(
name = "reference-v2-tests",
size = "small",
srcs = glob(["src/test/suite/**/*.scala"]),
data = [
"//ledger/test-common:Test-stable.dar",
],
resources = glob(["src/test/resources/*"]),
deps = [
":reference-v2",
"//daml-lf/data",
"//language-support/scala/bindings",
"//ledger-api/rs-grpc-bridge",
"//ledger-api/testing-utils",
"//ledger/ledger-api-common",
"//ledger/ledger-api-health",
"//ledger/participant-state",
"//ledger/participant-state/kvutils",
"//ledger/participant-state/kvutils:kvutils-tests-lib",
"@maven//:com_typesafe_akka_akka_actor_2_12",
"@maven//:com_typesafe_akka_akka_stream_2_12",
"@maven//:org_slf4j_slf4j_api",
],
)
########################################
### Testing the index server
########################################
da_scala_binary(
name = "ephemeral-postgres-reference-server",
srcs = glob(["src/test/scala/**/*.scala"]),
srcs = glob(["src/test/lib/scala/**/*.scala"]),
data = [
"@postgresql_dev_env//:all",
],

View File

@ -1,7 +1,7 @@
// Copyright (c) 2020 The DAML Authors. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
package com.daml.ledger.participant.state.kvutils
package com.daml.ledger.api.server.damlonx.reference.v2
import java.io._
import java.time.Clock
@ -13,7 +13,14 @@ import akka.actor.{Actor, ActorSystem, PoisonPill, Props}
import akka.pattern.gracefulStop
import akka.stream.Materializer
import akka.stream.scaladsl.{Sink, Source}
import com.daml.ledger.participant.state.kvutils.{DamlKvutils => Proto}
import com.daml.ledger.participant.state.kvutils.{
Envelope,
KeyValueCommitting,
KeyValueConsumption,
KeyValueSubmission,
Pretty,
DamlKvutils => Proto
}
import com.daml.ledger.participant.state.v1._
import com.digitalasset.daml.lf.data.Ref
import com.digitalasset.daml.lf.data.Ref.LedgerString

View File

@ -9,7 +9,6 @@ import akka.actor.ActorSystem
import akka.stream.Materializer
import com.codahale.metrics.SharedMetricRegistries
import com.daml.ledger.api.server.damlonx.reference.v2.cli.Cli
import com.daml.ledger.participant.state.kvutils.InMemoryKVParticipantState
import com.daml.ledger.participant.state.v1.{ReadService, SubmissionId, WriteService}
import com.digitalasset.daml.lf.archive.DarReader
import com.digitalasset.daml_lf_dev.DamlLf.Archive
@ -48,13 +47,19 @@ object ReferenceServer extends App {
ledger <- ResourceOwner
.forCloseable(() => new InMemoryKVParticipantState(config.participantId))
.acquire()
_ = config.archiveFiles.foreach { file =>
_ <- Resource.sequenceIgnoringValues(config.archiveFiles.map { file =>
val submissionId = SubmissionId.assertFromString(UUID.randomUUID().toString)
for {
dar <- DarReader { case (_, x) => Try(Archive.parseFrom(x)) }
.readArchiveFromFile(file)
} yield ledger.uploadPackages(submissionId, dar.all, None)
}
dar <- ResourceOwner
.forTry(() =>
DarReader { case (_, x) => Try(Archive.parseFrom(x)) }
.readArchiveFromFile(file))
.acquire()
_ <- ResourceOwner
.forCompletionStage(() => ledger.uploadPackages(submissionId, dar.all, None))
.acquire()
} yield ()
})
_ <- startIndexerServer(config, readService = ledger)
_ <- startApiServer(
config,

View File

@ -67,7 +67,7 @@ object Cli {
.optional()
.unbounded()
.action((f, c) => c.copy(archiveFiles = f :: c.archiveFiles))
.text("DAR files to load. Scenarios are ignored. The servers starts with an empty ledger by default.")
.text("DAR files to load. Scenarios are ignored. The server starts with an empty ledger by default.")
}
def parse(args: Array[String], binaryName: String, description: String): Option[Config] =

View File

@ -1,16 +1,20 @@
// Copyright (c) 2020 The DAML Authors. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
package com.daml.ledger.participant.state.kvutils
// Copyright (c) 2019 The DAML Authors. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
package com.daml.ledger.api.server.damlonx.reference.v2
import java.time.Clock
import com.daml.ledger.participant.state.kvutils.ParticipantStateIntegrationSpecBase
import com.daml.ledger.participant.state.v1._
import com.digitalasset.daml.lf.data.Ref.LedgerString
import com.digitalasset.daml.lf.data.Time.Timestamp
class InMemoryKVParticipantStateIT
extends ParticipantStateIntegrationSpecBase("In-memory participant state implementation") {
extends ParticipantStateIntegrationSpecBase("In-memory participant state for Reference v2") {
override def participantStateFactory(
participantId: ParticipantId,
@ -19,4 +23,9 @@ class InMemoryKVParticipantStateIT
override def currentRecordTime(): Timestamp =
Timestamp.assertFromInstant(Clock.systemUTC().instant())
override protected def afterEach(): Unit = {
ps.asInstanceOf[InMemoryKVParticipantState].close()
super.afterEach()
}
}

View File

@ -0,0 +1,89 @@
# Copyright (c) 2020 The DAML Authors. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
# Copyright (c) 2019 The DAML Authors. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
load(
"//bazel_tools:scala.bzl",
"da_scala_binary",
"da_scala_library",
"da_scala_test",
)
load("//ledger/ledger-api-test-tool:conformance.bzl", "conformance_test")
da_scala_library(
name = "ledger-on-memory",
srcs = glob(["src/main/scala/**/*.scala"]),
resources = glob(["src/main/resources/**/*"]),
tags = ["maven_coordinates=com.daml.ledger:on-memory:__VERSION__"],
visibility = [
"//visibility:public",
],
deps = [
"//daml-lf/data",
"//daml-lf/engine",
"//ledger/ledger-api-common",
"//ledger/ledger-api-health",
"//ledger/participant-state",
"//ledger/participant-state/kvutils",
"//ledger/participant-state/kvutils/app",
"@maven//:com_google_protobuf_protobuf_java",
"@maven//:com_typesafe_akka_akka_actor_2_12",
"@maven//:com_typesafe_akka_akka_stream_2_12",
],
)
da_scala_test(
name = "ledger-on-memory-tests",
size = "small",
srcs = glob(["src/test/suite/**/*.scala"]),
data = [
"//ledger/test-common:Test-stable.dar",
],
resources = glob(["src/test/resources/*"]),
deps = [
":ledger-on-memory",
"//daml-lf/data",
"//ledger-api/rs-grpc-bridge",
"//ledger-api/testing-utils",
"//ledger/ledger-api-common",
"//ledger/ledger-api-health",
"//ledger/participant-state",
"//ledger/participant-state/kvutils",
"//ledger/participant-state/kvutils:kvutils-tests-lib",
"@maven//:com_typesafe_akka_akka_actor_2_12",
"@maven//:com_typesafe_akka_akka_stream_2_12",
"@maven//:org_scalactic_scalactic_2_12",
"@maven//:org_scalatest_scalatest_2_12",
],
)
da_scala_binary(
name = "app",
main_class = "com.daml.ledger.on.memory.Main",
visibility = ["//visibility:public"],
runtime_deps = [
"@maven//:ch_qos_logback_logback_classic",
"@maven//:ch_qos_logback_logback_core",
"@maven//:com_h2database_h2",
],
deps = [
":ledger-on-memory",
],
)
conformance_test(
name = "conformance-test",
ports = [6865],
server = ":app",
server_args = [
"--port=6865",
],
test_tool_args = [
"--all-tests",
"--exclude=ConfigManagementServiceIT",
"--exclude=LotsOfPartiesIT",
"--exclude=TimeIT",
],
)

View File

@ -1,14 +1,16 @@
// Copyright (c) 2020 The DAML Authors. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
package com.daml.ledger.participant.state.kvutils.api.impl
// Copyright (c) 2019 The DAML Authors. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
package com.daml.ledger.on.memory
import java.time.Clock
import java.util.UUID
import akka.NotUsed
import akka.stream.scaladsl.Source
import com.daml.ledger.participant.state.kvutils.{Envelope, KeyValueCommitting}
import com.daml.ledger.participant.state.kvutils.DamlKvutils.{
DamlLogEntryId,
DamlStateKey,
@ -16,6 +18,7 @@ import com.daml.ledger.participant.state.kvutils.DamlKvutils.{
DamlSubmission
}
import com.daml.ledger.participant.state.kvutils.api.{LedgerReader, LedgerRecord, LedgerWriter}
import com.daml.ledger.participant.state.kvutils.{Envelope, KeyValueCommitting}
import com.daml.ledger.participant.state.v1._
import com.digitalasset.daml.lf.data.Ref
import com.digitalasset.daml.lf.data.Time.Timestamp
@ -23,25 +26,25 @@ import com.digitalasset.daml.lf.engine.Engine
import com.digitalasset.ledger.api.health.{HealthStatus, Healthy}
import com.digitalasset.platform.akkastreams.dispatcher.Dispatcher
import com.digitalasset.platform.akkastreams.dispatcher.SubSource.OneAfterAnother
import scala.collection.JavaConverters._
import scala.collection.{breakOut, mutable}
import scala.collection.mutable.ArrayBuffer
import scala.concurrent.{ExecutionContext, Future}
import com.google.protobuf.ByteString
import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer
import scala.collection.{breakOut, mutable}
import scala.concurrent.{ExecutionContext, Future}
import scala.util.Random
private[impl] class LogEntry(val entryId: DamlLogEntryId, val payload: Array[Byte])
private[memory] class LogEntry(val entryId: DamlLogEntryId, val payload: Array[Byte])
private[impl] object LogEntry {
private[memory] object LogEntry {
def apply(entryId: DamlLogEntryId, payload: Array[Byte]): LogEntry =
new LogEntry(entryId, payload)
}
private[impl] class InMemoryState(
private[memory] class InMemoryState(
val log: mutable.Buffer[LogEntry] = ArrayBuffer[LogEntry](),
val state: mutable.Map[ByteString, DamlStateValue] = mutable.Map.empty)
val state: mutable.Map[ByteString, DamlStateValue] = mutable.Map.empty,
)
final class InMemoryLedgerReaderWriter(
ledgerId: LedgerId = Ref.LedgerString.assertFromString(UUID.randomUUID.toString),
@ -115,7 +118,9 @@ final class InMemoryLedgerReaderWriter(
override def currentHealth(): HealthStatus = Healthy
override def close(): Unit = ()
override def close(): Unit = {
dispatcher.close()
}
private val dispatcher: Dispatcher[Int] =
Dispatcher("in-memory-key-value-participant-state", zeroIndex = 0, headAtInitialization = 0)

View File

@ -0,0 +1,18 @@
// Copyright (c) 2020 The DAML Authors. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
// Copyright (c) 2019 The DAML Authors. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
package com.daml.ledger.on.memory
import com.daml.ledger.participant.state.kvutils.app.Runner
import scala.concurrent.ExecutionContext.Implicits.global
object Main extends App {
new Runner(
"In-Memory Ledger",
participantId => new InMemoryLedgerReaderWriter(participantId = participantId),
).run(args)
}

View File

@ -0,0 +1,12 @@
<?xml version="1.0" encoding="UTF-8"?>
<configuration>
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>%level %logger{10}: %msg%n</pattern>
</encoder>
</appender>
<root level="TRACE">
<appender-ref ref="STDOUT"/>
</root>
</configuration>

View File

@ -1,12 +1,12 @@
// Copyright (c) 2020 The DAML Authors. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
package com.daml.ledger.participant.state.kvutils.api
package com.daml.ledger.on.memory
import java.time.Clock
import com.daml.ledger.participant.state.kvutils.ParticipantStateIntegrationSpecBase
import com.daml.ledger.participant.state.kvutils.api.impl.InMemoryLedgerReaderWriter
import com.daml.ledger.participant.state.kvutils.api.KeyValueParticipantState
import com.daml.ledger.participant.state.v1._
import com.digitalasset.daml.lf.data.Ref.LedgerString
import com.digitalasset.daml.lf.data.Time.Timestamp

View File

@ -42,16 +42,6 @@ da_scala_library(
],
)
test_lib_deps = [
":kvutils",
"//bazel_tools/runfiles:scala_runfiles",
"//ledger-api/rs-grpc-bridge",
"//ledger-api/testing-utils",
"@maven//:org_mockito_mockito_core",
"@maven//:org_scalactic_scalactic_2_12",
"@maven//:org_scalatest_scalatest_2_12",
]
da_scala_library(
name = "kvutils-tests-lib",
srcs = glob(["src/test/lib/scala/**/*.scala"]),
@ -93,7 +83,6 @@ da_scala_test(
deps = [
":daml_kvutils_java_proto",
":kvutils",
":kvutils-tests-lib",
"//bazel_tools/runfiles:scala_runfiles",
"//daml-lf/archive:daml_lf_archive_reader",
"//daml-lf/archive:daml_lf_dev_archive_java_proto",

View File

@ -0,0 +1,47 @@
# Copyright (c) 2020 The DAML Authors. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
# Copyright (c) 2019 The DAML Authors. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
load(
"//bazel_tools:scala.bzl",
"da_scala_binary",
"da_scala_library",
"da_scala_test",
)
load("//ledger/ledger-api-test-tool:conformance.bzl", "conformance_test")
da_scala_library(
name = "app",
srcs = glob(["src/main/scala/**/*.scala"]),
resources = glob(["src/main/resources/**/*"]),
tags = ["maven_coordinates=com.daml.ledger:participant-state-kvutils-app:__VERSION__"],
visibility = [
"//visibility:public",
],
runtime_deps = [
"@maven//:ch_qos_logback_logback_classic",
"@maven//:ch_qos_logback_logback_core",
"@maven//:com_h2database_h2",
],
deps = [
"//daml-lf/archive:daml_lf_archive_reader",
"//daml-lf/archive:daml_lf_dev_archive_java_proto",
"//daml-lf/data",
"//daml-lf/engine",
"//language-support/scala/bindings",
"//ledger/ledger-api-auth",
"//ledger/ledger-api-common",
"//ledger/ledger-api-health",
"//ledger/participant-state",
"//ledger/participant-state/kvutils",
"//ledger/sandbox",
"@maven//:com_github_scopt_scopt_2_12",
"@maven//:com_google_protobuf_protobuf_java",
"@maven//:com_typesafe_akka_akka_actor_2_12",
"@maven//:com_typesafe_akka_akka_stream_2_12",
"@maven//:io_dropwizard_metrics_metrics_core",
"@maven//:org_slf4j_slf4j_api",
],
)

View File

@ -0,0 +1,41 @@
<?xml version="1.0" encoding="UTF-8"?>
<configuration>
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>%d{HH:mm:ss.SSS} %-5level %logger{5} - %msg%n</pattern>
</encoder>
</appender>
<appender name="STDERR" class="ch.qos.logback.core.ConsoleAppender">
<target>System.err</target>
<encoder>
<pattern>%d{HH:mm:ss.SSS} %-5level %logger{5} - %msg%n</pattern>
</encoder>
</appender>
<appender name="FILE" class="ch.qos.logback.core.FileAppender">
<file>api-server-damlonx-reference.log</file>
<encoder>
<pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg %mdc%n</pattern>
</encoder>
</appender>
<appender name="ASYNC" class="ch.qos.logback.classic.AsyncAppender">
<appender-ref ref="FILE"/>
</appender>
<logger name="io.netty" level="WARN">
<appender-ref ref="STDERR"/>
</logger>
<logger name="io.grpc.netty" level="WARN">
<appender-ref ref="STDERR"/>
</logger>
<logger name="com.digitalasset.platform.sandbox.services" level="INFO" additivity="false">
<appender-ref ref="STDERR"/>
</logger>
<root level="INFO">
<appender-ref ref="STDOUT"/>
<appender-ref ref="ASYNC"/>
</root>
</configuration>

View File

@ -0,0 +1,70 @@
// Copyright (c) 2020 The DAML Authors. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
// Copyright (c) 2019 The DAML Authors. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
package com.daml.ledger.participant.state.kvutils.app
import java.io.File
import java.nio.file.Path
import com.daml.ledger.participant.state.v1.ParticipantId
import scopt.OptionParser
case class Config(
participantId: ParticipantId,
address: Option[String],
port: Int,
portFile: Option[Path],
archiveFiles: Seq[Path],
)
object Config {
val DefaultMaxInboundMessageSize: Int = 4 * 1024 * 1024
def default: Config =
Config(
participantId = ParticipantId.assertFromString("example"),
address = None,
port = 6865,
portFile = None,
archiveFiles = Vector.empty,
)
def parse(name: String, args: Seq[String]): Option[Config] =
parser(name).parse(args, default)
private def parser(name: String): OptionParser[Config] = new scopt.OptionParser[Config](name) {
head(name)
opt[String](name = "participant-id")
.optional()
.text("The participant ID given to all components of the ledger API server.")
.action((participantId, config) =>
config.copy(participantId = ParticipantId.assertFromString(participantId)))
opt[String]("address")
.optional()
.text("The address on which to run the ledger API server.")
.action((address, config) => config.copy(address = Some(address)))
opt[Int]("port")
.optional()
.text("The port on which to run the ledger API server.")
.action((port, config) => config.copy(port = port))
opt[File]("port-file")
.optional()
.text("File to write the allocated port number to. Used to inform clients in CI about the allocated port.")
.action((file, config) => config.copy(portFile = Some(file.toPath)))
arg[File]("<archive>...")
.optional()
.unbounded()
.text("DAR files to load. Scenarios are ignored. The server starts with an empty ledger by default.")
.action((file, config) => config.copy(archiveFiles = config.archiveFiles :+ file.toPath))
help("help").text(s"$name as a service.")
}
}

View File

@ -0,0 +1,124 @@
// Copyright (c) 2020 The DAML Authors. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
// Copyright (c) 2019 The DAML Authors. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
package com.daml.ledger.participant.state.kvutils.app
import java.util.UUID
import akka.actor.ActorSystem
import akka.stream.Materializer
import com.codahale.metrics.SharedMetricRegistries
import com.daml.ledger.participant.state.kvutils.api.KeyValueParticipantState
import com.daml.ledger.participant.state.v1.{ParticipantId, ReadService, SubmissionId, WriteService}
import com.digitalasset.api.util.TimeProvider
import com.digitalasset.daml.lf.archive.DarReader
import com.digitalasset.daml_lf_dev.DamlLf.Archive
import com.digitalasset.ledger.api.auth.{AuthService, AuthServiceWildcard}
import com.digitalasset.platform.apiserver.{ApiServerConfig, StandaloneApiServer}
import com.digitalasset.platform.common.logging.NamedLoggerFactory
import com.digitalasset.platform.indexer.{
IndexerConfig,
IndexerStartupMode,
StandaloneIndexerServer
}
import com.digitalasset.platform.resources.{Resource, ResourceOwner}
import org.slf4j.LoggerFactory
import scala.concurrent.duration.DurationInt
import scala.concurrent.{Await, ExecutionContext}
import scala.util.Try
class Runner(name: String, construct: ParticipantId => KeyValueLedger) {
def run(args: Seq[String]): Unit = {
val config = Config.parse(name, args).getOrElse(sys.exit(1))
val logger = LoggerFactory.getLogger(getClass)
implicit val system: ActorSystem = ActorSystem(
"[^A-Za-z0-9_\\-]".r.replaceAllIn(name.toLowerCase, "-"))
implicit val materializer: Materializer = Materializer(system)
implicit val executionContext: ExecutionContext = system.dispatcher
val resource = for {
// Take ownership of the actor system and materializer so they're cleaned up properly.
// This is necessary because we can't declare them as implicits within a `for` comprehension.
_ <- ResourceOwner.forActorSystem(() => system).acquire()
_ <- ResourceOwner.forMaterializer(() => materializer).acquire()
readerWriter <- ResourceOwner
.forCloseable(() => construct(config.participantId))
.acquire()
ledger = new KeyValueParticipantState(readerWriter, readerWriter)
_ <- Resource.sequenceIgnoringValues(config.archiveFiles.map { file =>
val submissionId = SubmissionId.assertFromString(UUID.randomUUID().toString)
for {
dar <- ResourceOwner
.forTry(() =>
DarReader { case (_, x) => Try(Archive.parseFrom(x)) }
.readArchiveFromFile(file.toFile))
.acquire()
_ <- ResourceOwner
.forCompletionStage(() => ledger.uploadPackages(submissionId, dar.all, None))
.acquire()
} yield ()
})
_ <- startIndexerServer(config, readService = ledger)
_ <- startApiServer(
config,
readService = ledger,
writeService = ledger,
authService = AuthServiceWildcard,
)
} yield ()
resource.asFuture.failed.foreach { exception =>
logger.error("Shutting down because of an initialization error.", exception)
System.exit(1)
}
Runtime.getRuntime
.addShutdownHook(new Thread(() => Await.result(resource.release(), 10.seconds)))
}
private def startIndexerServer(
config: Config,
readService: ReadService,
)(implicit executionContext: ExecutionContext): Resource[Unit] =
new StandaloneIndexerServer(
readService,
IndexerConfig(
config.participantId,
jdbcUrl = "jdbc:h2:mem:server;db_close_delay=-1;db_close_on_exit=false",
startupMode = IndexerStartupMode.MigrateAndStart,
),
NamedLoggerFactory.forParticipant(config.participantId),
SharedMetricRegistries.getOrCreate(s"indexer-${config.participantId}"),
).acquire()
private def startApiServer(
config: Config,
readService: ReadService,
writeService: WriteService,
authService: AuthService,
)(implicit executionContext: ExecutionContext): Resource[Unit] =
new StandaloneApiServer(
ApiServerConfig(
config.participantId,
config.archiveFiles.map(_.toFile).toList,
config.port,
config.address,
jdbcUrl = "jdbc:h2:mem:server;db_close_delay=-1;db_close_on_exit=false",
tlsConfig = None,
TimeProvider.UTC,
Config.DefaultMaxInboundMessageSize,
config.portFile,
),
readService,
writeService,
authService,
NamedLoggerFactory.forParticipant(config.participantId),
SharedMetricRegistries.getOrCreate(s"ledger-api-server-${config.participantId}"),
).acquire()
}

View File

@ -0,0 +1,13 @@
// Copyright (c) 2020 The DAML Authors. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
// Copyright (c) 2019 The DAML Authors. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
package com.daml.ledger.participant.state.kvutils
import com.daml.ledger.participant.state.kvutils.api.{LedgerReader, LedgerWriter}
package object app {
type KeyValueLedger = LedgerReader with LedgerWriter with AutoCloseable
}

View File

@ -64,7 +64,6 @@ abstract class ParticipantStateIntegrationSpecBase(implementationName: String)
// a KeyValueConsumptionSpec as the heart of the logic is there
implementationName should {
"return initial conditions" in {
for {
conditions <- ps

View File

@ -4,7 +4,6 @@
package com.daml.ledger.participant.state.kvutils.api
import akka.NotUsed
import com.google.protobuf.ByteString
import akka.stream.scaladsl.{Sink, Source}
import com.daml.ledger.participant.state.kvutils.DamlKvutils.{
DamlLogEntry,
@ -16,6 +15,7 @@ import com.daml.ledger.participant.state.kvutils.DamlKvutils.{
import com.daml.ledger.participant.state.kvutils.Envelope
import com.daml.ledger.participant.state.v1.{Offset, Update}
import com.digitalasset.ledger.api.testing.utils.AkkaBeforeAndAfterAll
import com.google.protobuf.ByteString
import org.mockito.ArgumentMatchers._
import org.mockito.Mockito.when
import org.scalatest.mockito.MockitoSugar

View File

@ -4,12 +4,14 @@
package com.digitalasset.platform.resources
import java.util.Timer
import java.util.concurrent.ExecutorService
import java.util.concurrent.{CompletionStage, ExecutorService}
import akka.actor.ActorSystem
import akka.stream.Materializer
import scala.compat.java8.FutureConverters._
import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success, Try}
@FunctionalInterface
trait ResourceOwner[A] {
@ -36,14 +38,20 @@ trait ResourceOwner[A] {
object ResourceOwner {
def successful[T](value: T): ResourceOwner[T] =
new FutureResourceOwner(() => Future.successful(value))
forTry(() => Success(value))
def failed[T](exception: Throwable): ResourceOwner[T] =
new FutureResourceOwner(() => Future.failed(exception))
forTry(() => Failure(exception))
def forTry[T](acquire: () => Try[T]): ResourceOwner[T] =
new FutureResourceOwner[T](() => Future.fromTry(acquire()))
def forFuture[T](acquire: () => Future[T]): ResourceOwner[T] =
new FutureResourceOwner(acquire)
def forCompletionStage[T](acquire: () => CompletionStage[T]): ResourceOwner[T] =
new FutureResourceOwner(() => acquire().toScala)
def forCloseable[T <: AutoCloseable](acquire: () => T): ResourceOwner[T] =
new CloseableResourceOwner(acquire)

View File

@ -3,6 +3,7 @@
package com.digitalasset.platform.resources
import java.util.concurrent.CompletableFuture.completedFuture
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.{Executors, RejectedExecutionException}
import java.util.{Timer, TimerTask}
@ -351,6 +352,23 @@ class ResourceOwnerSpec extends AsyncWordSpec with Matchers {
}
}
"a function returning a Try" should {
"convert to a ResourceOwner" in {
val resource = for {
value <- ResourceOwner.forTry(() => Success(49)).acquire()
} yield {
value should be(49)
}
for {
_ <- resource.asFuture
_ <- resource.release()
} yield {
succeed
}
}
}
"a function returning a Future" should {
"convert to a ResourceOwner" in {
val resource = for {
@ -368,6 +386,25 @@ class ResourceOwnerSpec extends AsyncWordSpec with Matchers {
}
}
"a function returning a CompletionStage" should {
"convert to a ResourceOwner" in {
val resource = for {
value <- ResourceOwner
.forCompletionStage(() => completedFuture(63))
.acquire()
} yield {
value should be(63)
}
for {
_ <- resource.asFuture
_ <- resource.release()
} yield {
succeed
}
}
}
"a function returning an AutoCloseable" should {
"convert to a ResourceOwner" in {
val newCloseable = new MockConstructor(acquired => new TestCloseable(42, acquired))