mirror of
https://github.com/digital-asset/daml.git
synced 2024-09-20 01:07:18 +03:00
Use ExecutionContext.parasitic instead of DirectExecutionContext. (#11783)
* concurrent: Replace `DirectExecutionContextInternal` with `parasitic`. * concurrent: Rename `DirectExecutionContext` `parasitic`. * Use `ExecutionContext.parasitic` instead of `DirectExecutionContext`. We no longer need the latter. CHANGELOG_BEGIN CHANGELOG_END * Fix formatting.
This commit is contained in:
parent
9a01bb9b86
commit
9f8e640b1a
@ -17,7 +17,6 @@ da_scala_library(
|
||||
deps = [
|
||||
"//ledger-api/rs-grpc-bridge",
|
||||
"//ledger/error",
|
||||
"//libs-scala/concurrent",
|
||||
"//libs-scala/contextualized-logging",
|
||||
"@maven//:io_grpc_grpc_api",
|
||||
"@maven//:io_grpc_grpc_stub",
|
||||
|
@ -12,7 +12,6 @@ import com.daml.grpc.adapter.RunnableSequencingActor.ShutdownRequest
|
||||
import scala.concurrent.duration.FiniteDuration
|
||||
import scala.concurrent.{ExecutionContext, Future}
|
||||
import scala.util.control.NonFatal
|
||||
import com.daml.dec.DirectExecutionContext
|
||||
|
||||
/** Implements serial execution semantics by forwarding the Runnables it receives to an underlying actor.
|
||||
*/
|
||||
@ -23,7 +22,7 @@ class AkkaExecutionSequencer private (private val actorRef: ActorRef)(implicit
|
||||
override def sequence(runnable: Runnable): Unit = actorRef ! runnable
|
||||
|
||||
override def close(): Unit = {
|
||||
closeAsync(DirectExecutionContext)
|
||||
closeAsync(ExecutionContext.parasitic)
|
||||
()
|
||||
}
|
||||
|
||||
|
@ -34,7 +34,6 @@ da_scala_library(
|
||||
"//ledger-api/grpc-definitions:ledger_api_proto_scala",
|
||||
"//ledger-api/rs-grpc-akka",
|
||||
"//ledger-api/rs-grpc-bridge",
|
||||
"//libs-scala/concurrent",
|
||||
"//libs-scala/contextualized-logging",
|
||||
"//libs-scala/grpc-utils",
|
||||
"//libs-scala/resources",
|
||||
|
@ -5,16 +5,15 @@ package com.daml.ledger.api.testing.utils
|
||||
|
||||
import java.util.concurrent.{Executors, ScheduledExecutorService, TimeUnit}
|
||||
|
||||
import com.daml.dec.DirectExecutionContext
|
||||
import com.daml.logging.LoggingContext
|
||||
import org.scalatest._
|
||||
import org.scalatest.concurrent.{AsyncTimeLimitedTests, ScaledTimeSpans}
|
||||
import org.scalatest.exceptions.TestCanceledException
|
||||
import org.scalatest.time.Span
|
||||
import org.scalatest.{Assertion, Assertions, AsyncTestSuite, BeforeAndAfterAll, Succeeded}
|
||||
|
||||
import scala.collection.immutable.Iterable
|
||||
import scala.concurrent.duration.DurationInt
|
||||
import scala.concurrent.{Future, Promise, TimeoutException}
|
||||
import scala.concurrent.{ExecutionContext, Future, Promise, TimeoutException}
|
||||
import scala.util.control.{NoStackTrace, NonFatal}
|
||||
|
||||
trait MultiFixtureBase[FixtureId, TestContext]
|
||||
@ -95,10 +94,12 @@ trait MultiFixtureBase[FixtureId, TestContext]
|
||||
|
||||
try {
|
||||
Future
|
||||
.firstCompletedOf(List(runTest(testFixture), timeoutPromise.future))(DirectExecutionContext)
|
||||
.firstCompletedOf(List(runTest(testFixture), timeoutPromise.future))(
|
||||
ExecutionContext.parasitic
|
||||
)
|
||||
.recover { case NonFatal(throwable) =>
|
||||
failOnFixture(throwable)
|
||||
}(DirectExecutionContext)
|
||||
}(ExecutionContext.parasitic)
|
||||
} catch {
|
||||
case NonFatal(throwable) => failOnFixture(throwable)
|
||||
}
|
||||
|
@ -36,7 +36,6 @@ da_scala_library(
|
||||
"//ledger/metrics",
|
||||
"//ledger/participant-integration-api",
|
||||
"//ledger/participant-state",
|
||||
"//libs-scala/concurrent",
|
||||
"//libs-scala/contextualized-logging",
|
||||
"//libs-scala/postgresql-testing",
|
||||
"//libs-scala/resources",
|
||||
@ -65,7 +64,6 @@ da_scala_library(
|
||||
"//ledger/metrics",
|
||||
"//ledger/participant-state",
|
||||
"//ledger/participant-state/kvutils",
|
||||
"//libs-scala/concurrent",
|
||||
"//libs-scala/contextualized-logging",
|
||||
"@maven//:io_dropwizard_metrics_metrics_core",
|
||||
],
|
||||
|
@ -11,7 +11,6 @@ import akka.actor.ActorSystem
|
||||
import akka.stream.Materializer
|
||||
import akka.stream.scaladsl.{Sink, Source}
|
||||
import com.codahale.metrics.MetricRegistry
|
||||
import com.daml.dec.DirectExecutionContext
|
||||
import com.daml.ledger.api.health.{HealthStatus, Healthy}
|
||||
import com.daml.ledger.configuration.LedgerId
|
||||
import com.daml.ledger.offset.Offset
|
||||
@ -26,7 +25,7 @@ import com.daml.ledger.participant.state.v2.Update
|
||||
import com.daml.logging.LoggingContext.newLoggingContext
|
||||
import com.daml.metrics.Metrics
|
||||
|
||||
import scala.concurrent.Future
|
||||
import scala.concurrent.{ExecutionContext, Future}
|
||||
|
||||
object Main {
|
||||
def main(args: Array[String]): Unit =
|
||||
@ -97,8 +96,8 @@ object Main {
|
||||
data
|
||||
}
|
||||
.runWith(Sink.seq[(Offset, Update)])
|
||||
.map(seq => seq.iterator)(DirectExecutionContext)
|
||||
.andThen { case _ => system.terminate() }(DirectExecutionContext)
|
||||
.map(seq => seq.iterator)(ExecutionContext.parasitic)
|
||||
.andThen { case _ => system.terminate() }(ExecutionContext.parasitic)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -10,7 +10,6 @@ import akka.actor.ActorSystem
|
||||
import akka.stream.Materializer
|
||||
import akka.stream.scaladsl.Source
|
||||
import com.codahale.metrics.{MetricRegistry, Snapshot}
|
||||
import com.daml.dec.DirectExecutionContext
|
||||
import com.daml.ledger.api.health.{HealthStatus, Healthy}
|
||||
import com.daml.ledger.configuration.{Configuration, LedgerInitialConditions, LedgerTimeModel}
|
||||
import com.daml.ledger.offset.Offset
|
||||
@ -44,7 +43,7 @@ class IndexerBenchmark() {
|
||||
.use(db => {
|
||||
println(s"Running the indexer benchmark against the ephemeral Postgres database ${db.url}")
|
||||
run(createUpdates, config.copy(indexerConfig = config.indexerConfig.copy(jdbcUrl = db.url)))
|
||||
})(DirectExecutionContext)
|
||||
})(ExecutionContext.parasitic)
|
||||
}
|
||||
|
||||
def run(
|
||||
|
@ -36,7 +36,6 @@ da_scala_library(
|
||||
"//ledger/ledger-grpc",
|
||||
"//ledger/ledger-resources",
|
||||
"//ledger/metrics",
|
||||
"//libs-scala/concurrent",
|
||||
"//libs-scala/grpc-utils",
|
||||
"//libs-scala/ports",
|
||||
"//libs-scala/resources",
|
||||
|
@ -9,7 +9,6 @@ import java.util.concurrent.TimeUnit
|
||||
import akka.NotUsed
|
||||
import akka.stream.scaladsl.{Sink, Source}
|
||||
import com.daml.api.util.TimeProvider
|
||||
import com.daml.dec.DirectExecutionContext
|
||||
import com.daml.ledger.api.domain
|
||||
import com.daml.ledger.api.testing.utils.{
|
||||
IsStatusException,
|
||||
@ -90,10 +89,12 @@ final class CommandClientIT
|
||||
configuration,
|
||||
)
|
||||
|
||||
private def timeProvider(ledgerId: domain.LedgerId): Future[TimeProvider] = {
|
||||
private def timeProvider(
|
||||
ledgerId: domain.LedgerId
|
||||
): Future[TimeProvider] = {
|
||||
StaticTime
|
||||
.updatedVia(TimeServiceGrpc.stub(channel), ledgerId.unwrap)
|
||||
.recover { case NonFatal(_) => TimeProvider.UTC }(DirectExecutionContext)
|
||||
.recover { case NonFatal(_) => TimeProvider.UTC }
|
||||
}
|
||||
|
||||
private def commandClient(
|
||||
@ -102,9 +103,7 @@ final class CommandClientIT
|
||||
configuration: CommandClientConfiguration = defaultCommandClientConfiguration,
|
||||
): Future[CommandClient] =
|
||||
timeProvider(ledgerId)
|
||||
.map(_ => commandClientWithoutTime(ledgerId, applicationId, configuration))(
|
||||
DirectExecutionContext
|
||||
)
|
||||
.map(_ => commandClientWithoutTime(ledgerId, applicationId, configuration))
|
||||
|
||||
override protected def config: SandboxConfig =
|
||||
super.config.copy(ledgerIdMode = LedgerIdMode.Static(testLedgerId))
|
||||
@ -171,7 +170,7 @@ final class CommandClientIT
|
||||
notOk.grpcStatus.code should be(expectedErrorCode.value)
|
||||
notOk.grpcStatus.message should include(expectedMessageSubString)
|
||||
}
|
||||
}(DirectExecutionContext)
|
||||
}
|
||||
|
||||
/** Reads a set of command IDs expected in the given client after the given checkpoint.
|
||||
* Returns a pair of sets (elements seen, elements not seen).
|
||||
|
@ -5,12 +5,11 @@ package com.daml.ledger.client.services.commands
|
||||
|
||||
import akka.NotUsed
|
||||
import akka.stream.scaladsl.Flow
|
||||
import com.daml.dec.DirectExecutionContext
|
||||
import com.daml.ledger.api.v1.command_submission_service.SubmitRequest
|
||||
import com.daml.util.Ctx
|
||||
import com.google.protobuf.empty.Empty
|
||||
|
||||
import scala.concurrent.Future
|
||||
import scala.concurrent.{ExecutionContext, Future}
|
||||
import scala.util.{Success, Try}
|
||||
|
||||
object CommandSubmissionFlow {
|
||||
@ -33,7 +32,7 @@ object CommandSubmissionFlow {
|
||||
telemetryContext,
|
||||
)
|
||||
)
|
||||
}(DirectExecutionContext)
|
||||
}(ExecutionContext.parasitic)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -3,7 +3,6 @@
|
||||
|
||||
package com.daml.ledger.client.services.identity
|
||||
|
||||
import com.daml.dec.DirectExecutionContext
|
||||
import com.daml.ledger.api.domain.LedgerId
|
||||
import com.daml.ledger.api.v1.ledger_identity_service.GetLedgerIdentityRequest
|
||||
import com.daml.ledger.api.v1.ledger_identity_service.LedgerIdentityServiceGrpc.LedgerIdentityServiceStub
|
||||
@ -16,9 +15,10 @@ final class LedgerIdentityClient(service: LedgerIdentityServiceStub) {
|
||||
|
||||
/** The ledgerId in use, if the check was successful.
|
||||
*/
|
||||
def satisfies(ledgerIdRequirement: LedgerIdRequirement, token: Option[String] = None)(implicit
|
||||
ec: ExecutionContext
|
||||
): Future[LedgerId] =
|
||||
def satisfies(
|
||||
ledgerIdRequirement: LedgerIdRequirement,
|
||||
token: Option[String] = None,
|
||||
)(implicit executionContext: ExecutionContext): Future[LedgerId] =
|
||||
for {
|
||||
ledgerId <- getLedgerId(token)
|
||||
} yield {
|
||||
@ -30,10 +30,12 @@ final class LedgerIdentityClient(service: LedgerIdentityServiceStub) {
|
||||
LedgerId(ledgerId)
|
||||
}
|
||||
|
||||
def getLedgerId(token: Option[String] = None): Future[String] =
|
||||
def getLedgerId(
|
||||
token: Option[String] = None
|
||||
)(implicit executionContext: ExecutionContext): Future[String] =
|
||||
LedgerClient
|
||||
.stub(service, token)
|
||||
.getLedgerIdentity(new GetLedgerIdentityRequest())
|
||||
.map(_.ledgerId)(DirectExecutionContext)
|
||||
.map(_.ledgerId)
|
||||
|
||||
}
|
||||
|
@ -8,13 +8,12 @@ import java.util.concurrent.atomic.AtomicReference
|
||||
|
||||
import akka.stream.scaladsl.{Broadcast, Flow, GraphDSL, RunnableGraph, Sink}
|
||||
import akka.stream.{ClosedShape, KillSwitches, Materializer, UniqueKillSwitch}
|
||||
import com.daml.api.util.{TimeProvider, TimestampConversion}
|
||||
import com.daml.api.util.TimestampConversion._
|
||||
import com.daml.api.util.{TimeProvider, TimestampConversion}
|
||||
import com.daml.grpc.adapter.ExecutionSequencerFactory
|
||||
import com.daml.grpc.adapter.client.akka.ClientAdapter
|
||||
import com.daml.dec.DirectExecutionContext
|
||||
import com.daml.ledger.api.v1.testing.time_service.{GetTimeRequest, SetTimeRequest}
|
||||
import com.daml.ledger.api.v1.testing.time_service.TimeServiceGrpc.{TimeService, TimeServiceStub}
|
||||
import com.daml.ledger.api.v1.testing.time_service.{GetTimeRequest, SetTimeRequest}
|
||||
import com.daml.ledger.client.LedgerClient
|
||||
|
||||
import scala.concurrent.{ExecutionContext, Future}
|
||||
@ -68,7 +67,7 @@ object StaticTime {
|
||||
// We serve this in a future which completes when the first element has passed through.
|
||||
// Thus we make sure that the object we serve already received time data from the ledger.
|
||||
futureOfFirstElem.map(_ => new StaticTime(timeService, clockRef, killSwitch, ledgerId))(
|
||||
DirectExecutionContext
|
||||
ExecutionContext.parasitic
|
||||
)
|
||||
} { implicit b => (killSwitch, sinkHead) =>
|
||||
import GraphDSL.Implicits._
|
||||
|
@ -6,12 +6,14 @@ package com.daml.ledger.client.services.version
|
||||
import com.daml.ledger.api.domain.LedgerId
|
||||
import com.daml.ledger.api.v1.version_service.VersionServiceGrpc.VersionServiceStub
|
||||
|
||||
import scala.concurrent.Future
|
||||
import scala.concurrent.{ExecutionContext, Future}
|
||||
|
||||
final class VersionClient(ledgerId: LedgerId, service: VersionServiceStub) {
|
||||
private val it = new withoutledgerid.VersionClient(service)
|
||||
|
||||
def getApiVersion(token: Option[String] = None): Future[String] =
|
||||
def getApiVersion(
|
||||
token: Option[String] = None
|
||||
)(implicit executionContext: ExecutionContext): Future[String] =
|
||||
it.getApiVersion(ledgerId, token)
|
||||
|
||||
}
|
||||
|
@ -3,24 +3,23 @@
|
||||
|
||||
package com.daml.ledger.client.services.version.withoutledgerid
|
||||
|
||||
import com.daml.dec.DirectExecutionContext
|
||||
import com.daml.ledger.api.domain.LedgerId
|
||||
import com.daml.ledger.api.v1.version_service.GetLedgerApiVersionRequest
|
||||
import com.daml.ledger.api.v1.version_service.VersionServiceGrpc.VersionServiceStub
|
||||
import com.daml.ledger.client.LedgerClient
|
||||
import scalaz.syntax.tag._
|
||||
|
||||
import scala.concurrent.Future
|
||||
import scala.concurrent.{ExecutionContext, Future}
|
||||
|
||||
private[daml] final class VersionClient(service: VersionServiceStub) {
|
||||
def getApiVersion(
|
||||
ledgerIdToUse: LedgerId,
|
||||
token: Option[String] = None,
|
||||
): Future[String] =
|
||||
)(implicit executionContext: ExecutionContext): Future[String] =
|
||||
LedgerClient
|
||||
.stub(service, token)
|
||||
.getLedgerApiVersion(
|
||||
new GetLedgerApiVersionRequest(ledgerIdToUse.unwrap)
|
||||
)
|
||||
.map(_.version)(DirectExecutionContext)
|
||||
.map(_.version)
|
||||
}
|
||||
|
@ -14,7 +14,6 @@ import akka.stream.testkit.{TestPublisher, TestSubscriber}
|
||||
import akka.stream.{OverflowStrategy, QueueOfferResult}
|
||||
import com.daml.api.util.TimestampConversion._
|
||||
import com.daml.concurrent.ExecutionContext
|
||||
import com.daml.dec.DirectExecutionContext
|
||||
import com.daml.ledger.api.testing.utils.AkkaBeforeAndAfterAll
|
||||
import com.daml.ledger.api.v1.command_completion_service.Checkpoint
|
||||
import com.daml.ledger.api.v1.commands.Commands
|
||||
@ -105,7 +104,7 @@ class CommandTrackerFlowTest
|
||||
startOffset: LedgerOffset,
|
||||
)
|
||||
|
||||
private implicit val ec: ExecutionContext[Nothing] = DirectExecutionContext
|
||||
private implicit val ec: ExecutionContext[Nothing] = ExecutionContext.parasitic
|
||||
private val stateRef = new AtomicReference[Promise[State]](Promise[State]())
|
||||
|
||||
def createCompletionsSource(
|
||||
|
@ -43,7 +43,6 @@ da_scala_library(
|
||||
"//ledger/ledger-offset",
|
||||
"//ledger/ledger-resources",
|
||||
"//ledger/metrics",
|
||||
"//libs-scala/concurrent",
|
||||
"//libs-scala/contextualized-logging",
|
||||
"//libs-scala/grpc-utils",
|
||||
"//libs-scala/logging-entries",
|
||||
@ -81,7 +80,6 @@ da_scala_library(
|
||||
"//daml-lf/transaction",
|
||||
"//language-support/scala/bindings",
|
||||
"//ledger/ledger-api-domain",
|
||||
"//libs-scala/concurrent",
|
||||
"//libs-scala/grpc-utils",
|
||||
"@maven//:com_google_api_grpc_proto_google_common_protos",
|
||||
"@maven//:org_scalatest_scalatest_compatible",
|
||||
|
@ -9,9 +9,10 @@ import akka.NotUsed
|
||||
import akka.stream._
|
||||
import akka.stream.scaladsl.{Source, SourceQueueWithComplete}
|
||||
import com.daml.platform.akkastreams.dispatcher.SignalDispatcher.Signal
|
||||
import com.daml.dec.DirectExecutionContext
|
||||
import org.slf4j.LoggerFactory
|
||||
|
||||
import scala.concurrent.ExecutionContext
|
||||
|
||||
/** A fanout signaller that can be subscribed to dynamically.
|
||||
* Signals may be coalesced, but if a signal is sent, we guarantee that all consumers subscribed before
|
||||
* the signal is sent will eventually receive a signal.
|
||||
@ -53,7 +54,7 @@ class SignalDispatcher private () extends AutoCloseable {
|
||||
q.watchCompletion()
|
||||
.onComplete { _ =>
|
||||
runningState.updateAndGet(_.map(s => s - q))
|
||||
}(DirectExecutionContext)
|
||||
}(ExecutionContext.parasitic)
|
||||
NotUsed
|
||||
}
|
||||
|
||||
|
@ -5,10 +5,9 @@ package com.daml.platform.akkastreams.dispatcher
|
||||
|
||||
import akka.NotUsed
|
||||
import akka.stream.scaladsl.Source
|
||||
import com.daml.dec.DirectExecutionContext
|
||||
|
||||
import scala.annotation.nowarn
|
||||
import scala.concurrent.Future
|
||||
import scala.concurrent.{ExecutionContext, Future}
|
||||
|
||||
/** Defines how the progress on the ledger should be mapped to look-up operations */
|
||||
@nowarn("msg=parameter value evidence.* is never used")
|
||||
@ -44,7 +43,7 @@ object SubSource {
|
||||
readElement(index).map { t =>
|
||||
val nextIndex = readSuccessor(index)
|
||||
Some((nextIndex, (index, t)))
|
||||
}(DirectExecutionContext)
|
||||
}(ExecutionContext.parasitic)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -3,12 +3,11 @@
|
||||
|
||||
package com.daml.platform.akkastreams
|
||||
import akka.actor.ActorSystem
|
||||
import com.daml.dec.DirectExecutionContext
|
||||
import org.scalatest.Assertion
|
||||
import org.scalatest.wordspec.AsyncWordSpec
|
||||
|
||||
import scala.concurrent.{Future, Promise, TimeoutException}
|
||||
import scala.concurrent.duration.FiniteDuration
|
||||
import scala.concurrent.{ExecutionContext, Future, Promise, TimeoutException}
|
||||
import scala.util.Try
|
||||
import scala.util.control.NoStackTrace
|
||||
|
||||
@ -28,10 +27,10 @@ trait FutureTimeouts { self: AsyncWordSpec =>
|
||||
runnable,
|
||||
)(system.dispatcher)
|
||||
|
||||
f.onComplete((_: Try[Any]) => cancellable.cancel())(DirectExecutionContext)
|
||||
f.onComplete((_: Try[Any]) => cancellable.cancel())(ExecutionContext.parasitic)
|
||||
|
||||
recoverToSucceededIf[TimeoutException](
|
||||
Future.firstCompletedOf[Any](List[Future[Any]](f, promise.future))(DirectExecutionContext)
|
||||
Future.firstCompletedOf[Any](List[Future[Any]](f, promise.future))(ExecutionContext.parasitic)
|
||||
)
|
||||
}
|
||||
}
|
||||
|
@ -8,7 +8,6 @@ import java.util.concurrent.Executors
|
||||
import javax.sql.DataSource
|
||||
|
||||
import com.daml.concurrent.{ExecutionContext, Future}
|
||||
import com.daml.dec.DirectExecutionContext
|
||||
import com.daml.ledger.on.sql.Database._
|
||||
import com.daml.ledger.on.sql.queries._
|
||||
import com.daml.ledger.participant.state.kvutils.KVOffsetBuilder
|
||||
@ -143,7 +142,7 @@ object Database {
|
||||
implicit val writerConnectionPool: ConnectionPool[Writer] =
|
||||
new ConnectionPool(writerDataSource)
|
||||
implicit val adminConnectionPool: ConnectionPool[Migrator] =
|
||||
new ConnectionPool(adminDataSource)(DirectExecutionContext)
|
||||
new ConnectionPool(adminDataSource)(ExecutionContext.parasitic)
|
||||
new UninitializedDatabase(system, offsetBuilder, metrics)
|
||||
}
|
||||
}
|
||||
@ -169,7 +168,7 @@ object Database {
|
||||
implicit val readerWriterConnectionPool: ConnectionPool[Reader with Writer] =
|
||||
new ConnectionPool(readerWriterDataSource)
|
||||
implicit val adminConnectionPool: ConnectionPool[Migrator] =
|
||||
new ConnectionPool(adminDataSource)(DirectExecutionContext)
|
||||
new ConnectionPool(adminDataSource)(ExecutionContext.parasitic)
|
||||
new UninitializedDatabase(system, offsetBuilder, metrics)
|
||||
}
|
||||
}
|
||||
|
@ -9,9 +9,8 @@ import akka.Done
|
||||
import akka.stream.scaladsl.{Keep, Source}
|
||||
import com.codahale.metrics.{Counter, Meter, Timer}
|
||||
import com.daml.concurrent
|
||||
import com.daml.dec.DirectExecutionContext
|
||||
|
||||
import scala.concurrent.Future
|
||||
import scala.concurrent.{ExecutionContext, Future}
|
||||
|
||||
object Timed {
|
||||
|
||||
@ -56,25 +55,25 @@ object Timed {
|
||||
def future[T](timer: Timer, future: => Future[T]): Future[T] = {
|
||||
val ctx = timer.time()
|
||||
val result = future
|
||||
result.onComplete(_ => ctx.stop())(DirectExecutionContext)
|
||||
result.onComplete(_ => ctx.stop())(ExecutionContext.parasitic)
|
||||
result
|
||||
}
|
||||
|
||||
def future[EC, T](timer: Timer, future: => concurrent.Future[EC, T]): concurrent.Future[EC, T] = {
|
||||
val ctx = timer.time()
|
||||
val result = future
|
||||
result.onComplete(_ => ctx.stop())(DirectExecutionContext)
|
||||
result.onComplete(_ => ctx.stop())(concurrent.ExecutionContext.parasitic)
|
||||
result
|
||||
}
|
||||
|
||||
def trackedFuture[T](counter: Counter, future: => Future[T]): Future[T] = {
|
||||
counter.inc()
|
||||
future.andThen { case _ => counter.dec() }(DirectExecutionContext)
|
||||
future.andThen { case _ => counter.dec() }(ExecutionContext.parasitic)
|
||||
}
|
||||
|
||||
def trackedFuture[T](meter: Meter, future: => Future[T]): Future[T] = {
|
||||
meter.mark(+1)
|
||||
future.andThen { case _ => meter.mark(-1) }(DirectExecutionContext)
|
||||
future.andThen { case _ => meter.mark(-1) }(ExecutionContext.parasitic)
|
||||
}
|
||||
|
||||
def timedAndTrackedFuture[T](timer: Timer, counter: Counter, future: => Future[T]): Future[T] = {
|
||||
@ -90,7 +89,7 @@ object Timed {
|
||||
source
|
||||
.watchTermination()(Keep.both[Mat, Future[Done]])
|
||||
.mapMaterializedValue { case (mat, done) =>
|
||||
done.onComplete(_ => ctx.stop())(DirectExecutionContext)
|
||||
done.onComplete(_ => ctx.stop())(ExecutionContext.parasitic)
|
||||
mat
|
||||
}
|
||||
}
|
||||
|
@ -5,11 +5,10 @@ package com.daml.telemetry
|
||||
|
||||
import java.util.{HashMap => jHashMap, Map => jMap}
|
||||
|
||||
import com.daml.dec.DirectExecutionContext
|
||||
import io.opentelemetry.api.trace.{Span, Tracer}
|
||||
import io.opentelemetry.context.Context
|
||||
|
||||
import scala.concurrent.Future
|
||||
import scala.concurrent.{ExecutionContext, Future}
|
||||
import scala.util.{Failure, Success}
|
||||
|
||||
trait TelemetryContext {
|
||||
@ -112,7 +111,7 @@ protected class DefaultTelemetryContext(protected val tracer: Tracer, protected
|
||||
subSpan.end()
|
||||
case Success(_) =>
|
||||
subSpan.end()
|
||||
}(DirectExecutionContext)
|
||||
}(ExecutionContext.parasitic)
|
||||
}
|
||||
|
||||
override def runInNewSpan[T](
|
||||
|
@ -7,7 +7,6 @@ import akka.stream.scaladsl.{Flow, Keep, Sink}
|
||||
import akka.stream.{BoundedSourceQueue, Materializer, QueueOfferResult}
|
||||
import akka.{Done, NotUsed}
|
||||
import com.codahale.metrics.{Counter, Timer}
|
||||
import com.daml.dec.DirectExecutionContext
|
||||
import com.daml.error.DamlContextualizedErrorLogger
|
||||
import com.daml.ledger.client.services.commands.CommandSubmission
|
||||
import com.daml.ledger.client.services.commands.CommandTrackerFlow.Materialized
|
||||
@ -146,8 +145,8 @@ private[services] object QueueBackedTracker {
|
||||
errorFactories.trackerFailure(msg = promiseCancellationDescription)(errorLogger)
|
||||
)
|
||||
)
|
||||
})(DirectExecutionContext)
|
||||
}(DirectExecutionContext)
|
||||
})(ExecutionContext.parasitic)
|
||||
}(ExecutionContext.parasitic)
|
||||
|
||||
new QueueBackedTracker(queue, done, errorFactories)
|
||||
}
|
||||
|
@ -8,7 +8,6 @@ import java.util.concurrent.TimeoutException
|
||||
import java.util.concurrent.atomic.AtomicReference
|
||||
|
||||
import akka.stream.Materializer
|
||||
import com.daml.dec.DirectExecutionContext
|
||||
import com.daml.ledger.api.v1.commands.Commands
|
||||
import com.daml.ledger.client.services.commands.CommandSubmission
|
||||
import com.daml.ledger.client.services.commands.tracker.CompletionResponse.{
|
||||
@ -27,9 +26,9 @@ import scala.util.{Failure, Success}
|
||||
* A tracker tracker, if you will.
|
||||
*
|
||||
* @param retentionPeriod The minimum duration for which to retain ready-but-idling trackers.
|
||||
* @param getKey A function to compute the tracker key from the commands.
|
||||
* @param newTracker A function to construct a new tracker.
|
||||
* Called when there is no tracker for the given key.
|
||||
* @param getKey A function to compute the tracker key from the commands.
|
||||
* @param newTracker A function to construct a new tracker.
|
||||
* Called when there is no tracker for the given key.
|
||||
*/
|
||||
private[services] final class TrackerMap[Key](
|
||||
retentionPeriod: Duration,
|
||||
@ -149,9 +148,13 @@ private[services] object TrackerMap {
|
||||
}
|
||||
|
||||
private sealed trait AsyncResourceState[+T <: AutoCloseable]
|
||||
|
||||
private final case object Waiting extends AsyncResourceState[Nothing]
|
||||
|
||||
private final case class Ready[T <: AutoCloseable](resource: T) extends AsyncResourceState[T]
|
||||
|
||||
private final case object Closed extends AsyncResourceState[Nothing]
|
||||
|
||||
private final case class Failed(exception: Throwable) extends AsyncResourceState[Nothing]
|
||||
|
||||
/** A holder for an AutoCloseable that can be opened and closed async.
|
||||
@ -170,7 +173,7 @@ private[services] object TrackerMap {
|
||||
state.set(Ready(resource))
|
||||
case Failure(exception) =>
|
||||
state.set(Failed(exception))
|
||||
}(DirectExecutionContext)
|
||||
}(ExecutionContext.parasitic)
|
||||
|
||||
private[TrackerMap] def currentState: AsyncResourceState[T] = state.get()
|
||||
|
||||
@ -188,7 +191,7 @@ private[services] object TrackerMap {
|
||||
case Waiting =>
|
||||
try {
|
||||
Await.result(
|
||||
future.transform(Success(_))(DirectExecutionContext),
|
||||
future.transform(Success(_))(ExecutionContext.parasitic),
|
||||
10.seconds,
|
||||
) match {
|
||||
case Success(resource) => resource.close()
|
||||
|
@ -6,7 +6,6 @@ package com.daml.platform.index
|
||||
import akka.NotUsed
|
||||
import akka.stream.scaladsl.Source
|
||||
import com.daml.daml_lf_dev.DamlLf.Archive
|
||||
import com.daml.dec.{DirectExecutionContext => DEC}
|
||||
import com.daml.error.DamlContextualizedErrorLogger
|
||||
import com.daml.ledger.api.domain
|
||||
import com.daml.ledger.api.domain.ConfigurationEntry.Accepted
|
||||
@ -48,7 +47,7 @@ import com.daml.platform.store.entries.PartyLedgerEntry
|
||||
import com.daml.telemetry.{SpanAttribute, Spans}
|
||||
import scalaz.syntax.tag.ToTagOps
|
||||
|
||||
import scala.concurrent.Future
|
||||
import scala.concurrent.{ExecutionContext, Future}
|
||||
|
||||
private[platform] final class LedgerBackedIndexService(
|
||||
ledger: ReadOnlyLedger,
|
||||
@ -287,7 +286,9 @@ private[platform] final class LedgerBackedIndexService(
|
||||
): Future[Option[(LedgerOffset.Absolute, Configuration)]] =
|
||||
ledger
|
||||
.lookupLedgerConfiguration()
|
||||
.map(_.map { case (offset, config) => (toAbsolute(offset), config) })(DEC)
|
||||
.map(
|
||||
_.map { case (offset, config) => (toAbsolute(offset), config) }
|
||||
)(ExecutionContext.parasitic)
|
||||
|
||||
/** Looks up the current configuration, if set, and continues to stream configuration changes.
|
||||
*/
|
||||
|
@ -6,7 +6,6 @@ package com.daml.platform.store
|
||||
import akka.NotUsed
|
||||
import akka.stream.scaladsl.Source
|
||||
import com.daml.daml_lf_dev.DamlLf
|
||||
import com.daml.dec.DirectExecutionContext
|
||||
import com.daml.ledger.api.domain
|
||||
import com.daml.ledger.api.domain.{ApplicationId, CommandId, LedgerId}
|
||||
import com.daml.ledger.api.health.HealthStatus
|
||||
@ -48,8 +47,6 @@ private[platform] abstract class BaseLedger(
|
||||
dispatcher: Dispatcher[Offset],
|
||||
) extends ReadOnlyLedger {
|
||||
|
||||
implicit private val DEC: ExecutionContext = DirectExecutionContext
|
||||
|
||||
override def currentHealth(): HealthStatus = ledgerDao.currentHealth()
|
||||
|
||||
override def lookupKey(key: GlobalKey, forParties: Set[Ref.Party])(implicit
|
||||
@ -166,9 +163,7 @@ private[platform] abstract class BaseLedger(
|
||||
.getLfArchive(packageId)
|
||||
.flatMap(archiveO =>
|
||||
Future.fromTry(Try(archiveO.map(archive => Decode.assertDecodeArchive(archive)._2)))
|
||||
)(
|
||||
DEC
|
||||
)
|
||||
)(ExecutionContext.parasitic)
|
||||
|
||||
override def packageEntries(startExclusive: Offset)(implicit
|
||||
loggingContext: LoggingContext
|
||||
|
@ -5,9 +5,8 @@ package com.daml.platform.store.appendonlydao
|
||||
|
||||
import akka.NotUsed
|
||||
import akka.stream.scaladsl.Source
|
||||
import com.daml.dec.DirectExecutionContext
|
||||
|
||||
import scala.concurrent.Future
|
||||
import scala.concurrent.{ExecutionContext, Future}
|
||||
|
||||
private[platform] object PaginatingAsyncStream {
|
||||
|
||||
@ -26,7 +25,7 @@ private[platform] object PaginatingAsyncStream {
|
||||
* This is not designed to page through results using the "seek method":
|
||||
* https://use-the-index-luke.com/sql/partial-results/fetch-next-page
|
||||
*
|
||||
* @param pageSize number of items to retrieve per call
|
||||
* @param pageSize number of items to retrieve per call
|
||||
* @param queryPage takes the offset from which to start the next page and returns that page
|
||||
* @tparam T the type of the items returned in each call
|
||||
*/
|
||||
@ -39,7 +38,7 @@ private[platform] object PaginatingAsyncStream {
|
||||
val resultSize = result.size.toLong
|
||||
val newQueryOffset = if (resultSize < pageSize) None else Some(queryOffset + pageSize)
|
||||
Some(newQueryOffset -> result)
|
||||
}(DirectExecutionContext)
|
||||
}(ExecutionContext.parasitic)
|
||||
}
|
||||
.flatMapConcat(Source(_))
|
||||
}
|
||||
@ -57,10 +56,10 @@ private[platform] object PaginatingAsyncStream {
|
||||
* lookup calls.
|
||||
*
|
||||
* @param startFromOffset initial offset
|
||||
* @param getOffset function that returns a position/offset from the element of type [[T]]
|
||||
* @param query a function that fetches results starting from provided offset
|
||||
* @param getOffset function that returns a position/offset from the element of type [[T]]
|
||||
* @param query a function that fetches results starting from provided offset
|
||||
* @tparam Off the type of the offset
|
||||
* @tparam T the type of the items returned in each call
|
||||
* @tparam T the type of the items returned in each call
|
||||
*/
|
||||
def streamFrom[Off, T](startFromOffset: Off, getOffset: T => Off)(
|
||||
query: Off => Future[Vector[T]]
|
||||
@ -73,9 +72,7 @@ private[platform] object PaginatingAsyncStream {
|
||||
query(offset).map { result =>
|
||||
val nextPageOffset: Option[Off] = result.lastOption.map(getOffset)
|
||||
Some((nextPageOffset, result))
|
||||
}(
|
||||
DirectExecutionContext
|
||||
) // run in the same thread as the query, avoid context switch for a cheap operation
|
||||
}(ExecutionContext.parasitic)
|
||||
}
|
||||
.flatMapConcat(Source(_))
|
||||
}
|
||||
|
@ -6,7 +6,6 @@ package com.daml.platform.store.appendonlydao.events
|
||||
import akka.NotUsed
|
||||
import akka.stream.scaladsl.Source
|
||||
import akka.stream.{BoundedSourceQueue, Materializer, QueueOfferResult}
|
||||
import com.daml.dec.DirectExecutionContext
|
||||
import com.daml.error.definitions.LedgerApiErrors
|
||||
import com.daml.error.definitions.LedgerApiErrors.ParticipantBackpressure
|
||||
import com.daml.error.{ContextualizedErrorLogger, DamlContextualizedErrorLogger}
|
||||
@ -20,7 +19,7 @@ import com.daml.platform.store.utils.ConcurrencyLimiter
|
||||
|
||||
import scala.annotation.tailrec
|
||||
import scala.collection.mutable
|
||||
import scala.concurrent.Future
|
||||
import scala.concurrent.{ExecutionContext, Future}
|
||||
|
||||
trait ACSReader {
|
||||
def acsStream(
|
||||
@ -189,7 +188,7 @@ private[events] object FilterTableACSReader {
|
||||
work(task).map { case (result, nextTask) =>
|
||||
queueState.finishTask(nextTask)
|
||||
task -> result
|
||||
}(DirectExecutionContext)
|
||||
}(ExecutionContext.parasitic)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -3,11 +3,10 @@
|
||||
|
||||
package com.daml.platform.store.interning
|
||||
|
||||
import com.daml.dec.DirectExecutionContext
|
||||
import com.daml.lf.data.Ref
|
||||
import com.daml.logging.LoggingContext
|
||||
|
||||
import scala.concurrent.Future
|
||||
import scala.concurrent.{ExecutionContext, Future}
|
||||
|
||||
class DomainStringIterators(
|
||||
val parties: Iterator[String],
|
||||
@ -100,7 +99,7 @@ class StringInterningView(loadPrefixedEntries: LoadStringInterningEntries)
|
||||
Future.unit
|
||||
} else {
|
||||
loadPrefixedEntries(raw.lastId, lastStringInterningId)(loggingContext)
|
||||
.map(updateView)(DirectExecutionContext)
|
||||
.map(updateView)(ExecutionContext.parasitic)
|
||||
}
|
||||
|
||||
private def updateView(newEntries: Iterable[(Int, String)]): Unit = synchronized {
|
||||
|
@ -7,14 +7,13 @@ import java.util.concurrent.atomic.AtomicReference
|
||||
|
||||
import akka.stream.scaladsl.{Sink, Source, SourceQueueWithComplete}
|
||||
import akka.stream.{Materializer, OverflowStrategy, QueueOfferResult}
|
||||
import com.daml.dec.DirectExecutionContext
|
||||
import com.daml.ledger.participant.state.kvutils.wire.DamlSubmissionBatch
|
||||
import com.daml.ledger.participant.state.v2.SubmissionResult
|
||||
import com.google.rpc.code.Code
|
||||
import com.google.rpc.status.Status
|
||||
|
||||
import scala.concurrent.Future
|
||||
import scala.concurrent.duration._
|
||||
import scala.concurrent.{ExecutionContext, Future}
|
||||
|
||||
object BatchingQueue {
|
||||
type CommitBatchFunction =
|
||||
@ -154,7 +153,7 @@ case class DefaultBatchingQueue(
|
||||
RunningBatchingQueueState.Complete,
|
||||
)
|
||||
()
|
||||
}(DirectExecutionContext)
|
||||
}(ExecutionContext.parasitic)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -3,7 +3,6 @@
|
||||
|
||||
package com.daml.ledger.validator
|
||||
|
||||
import com.daml.dec.DirectExecutionContext
|
||||
import com.daml.ledger.participant.state.kvutils.Raw
|
||||
import com.daml.logging.LoggingContext
|
||||
import com.daml.metrics.{Metrics, Timed}
|
||||
@ -103,7 +102,7 @@ abstract class BatchingLedgerStateOperations[LogResult] extends LedgerStateOpera
|
||||
executionContext: ExecutionContext,
|
||||
loggingContext: LoggingContext,
|
||||
): Future[Option[Raw.Envelope]] =
|
||||
readState(Seq(key)).map(_.head)(DirectExecutionContext)
|
||||
readState(Seq(key)).map(_.head)(ExecutionContext.parasitic)
|
||||
|
||||
override final def writeState(
|
||||
key: Raw.StateKey,
|
||||
|
@ -41,7 +41,6 @@ da_scala_library(
|
||||
"//ledger/participant-integration-api",
|
||||
"//ledger/participant-state",
|
||||
"//ledger/participant-state/kvutils",
|
||||
"//libs-scala/concurrent",
|
||||
"//libs-scala/contextualized-logging",
|
||||
"//libs-scala/resources",
|
||||
"//libs-scala/resources-akka",
|
||||
|
@ -5,11 +5,11 @@ package com.daml.ledger.participant.state.kvutils.tools.integritycheck
|
||||
|
||||
import java.io.PrintWriter
|
||||
import java.util.concurrent.{Executors, TimeUnit}
|
||||
|
||||
import akka.actor.ActorSystem
|
||||
import akka.stream.Materializer
|
||||
import akka.stream.scaladsl.{Sink, Source}
|
||||
import com.codahale.metrics.{ConsoleReporter, MetricRegistry}
|
||||
import com.daml.dec.DirectExecutionContext
|
||||
import com.daml.ledger.participant.state.kvutils.export.{
|
||||
LedgerDataImporter,
|
||||
ProtobufBasedLedgerDataImporter,
|
||||
@ -20,13 +20,7 @@ import com.daml.lf.data.Ref
|
||||
import com.daml.logging.LoggingContext
|
||||
import com.daml.logging.LoggingContext.newLoggingContext
|
||||
import com.daml.metrics.Metrics
|
||||
import com.daml.platform.indexer.{
|
||||
Indexer,
|
||||
IndexerConfig,
|
||||
IndexerStartupMode,
|
||||
JdbcIndexer,
|
||||
StandaloneIndexerServer,
|
||||
}
|
||||
import com.daml.platform.indexer._
|
||||
import com.daml.platform.store.LfValueTranslationCache
|
||||
|
||||
import scala.concurrent.duration.Duration
|
||||
@ -327,7 +321,7 @@ object IntegrityChecker {
|
||||
case Failure(exception) =>
|
||||
exception.printStackTrace()
|
||||
sys.exit(1)
|
||||
}(DirectExecutionContext)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -8,7 +8,6 @@ import akka.stream.Materializer
|
||||
import com.codahale.metrics.MetricRegistry
|
||||
import com.daml.api.util.TimeProvider
|
||||
import com.daml.buildinfo.BuildInfo
|
||||
import com.daml.dec.DirectExecutionContext
|
||||
import com.daml.error.ErrorCodesVersionSwitcher
|
||||
import com.daml.ledger.api.auth.interceptor.AuthorizationInterceptor
|
||||
import com.daml.ledger.api.auth.{AuthService, AuthServiceWildcard, Authorizer}
|
||||
@ -199,7 +198,7 @@ final class SandboxServer(
|
||||
|
||||
// Only used in testing; hopefully we can get rid of it soon.
|
||||
def port: Port =
|
||||
Await.result(portF(DirectExecutionContext), AsyncTolerance)
|
||||
Await.result(portF(ExecutionContext.parasitic), AsyncTolerance)
|
||||
|
||||
def portF(implicit executionContext: ExecutionContext): Future[Port] =
|
||||
apiServer.map(_.port)
|
||||
@ -493,8 +492,7 @@ final class SandboxServer(
|
||||
}
|
||||
|
||||
override def close(): Unit = {
|
||||
implicit val executionContext: ExecutionContext = DirectExecutionContext
|
||||
Await.result(sandboxState.flatMap(_.release()), AsyncTolerance)
|
||||
Await.result(sandboxState.flatMap(_.release())(ExecutionContext.parasitic), AsyncTolerance)
|
||||
}
|
||||
|
||||
private def writePortFile(port: Port)(implicit executionContext: ExecutionContext): Future[Unit] =
|
||||
|
@ -9,7 +9,6 @@ import akka.stream.scaladsl.{Keep, Sink, Source, SourceQueueWithComplete}
|
||||
import akka.stream.{Materializer, OverflowStrategy, QueueOfferResult}
|
||||
import com.daml.api.util.TimeProvider
|
||||
import com.daml.daml_lf_dev.DamlLf.Archive
|
||||
import com.daml.dec.{DirectExecutionContext => DEC}
|
||||
import com.daml.error.{ContextualizedErrorLogger, DamlContextualizedErrorLogger}
|
||||
import com.daml.grpc.GrpcStatus
|
||||
import com.daml.ledger.api.domain
|
||||
@ -288,9 +287,10 @@ private[sandbox] object SqlLedger {
|
||||
|
||||
ledgerDao
|
||||
.storePackageEntry(newLedgerEnd, packages, None)
|
||||
.transform(_ => (), e => sys.error("Failed to copy initial packages: " + e.getMessage))(
|
||||
DEC
|
||||
)
|
||||
.transform(
|
||||
_ => (),
|
||||
e => sys.error("Failed to copy initial packages: " + e.getMessage),
|
||||
)(ExecutionContext.parasitic)
|
||||
} else {
|
||||
Future.unit
|
||||
}
|
||||
@ -391,7 +391,7 @@ private[sandbox] object SqlLedger {
|
||||
})(queue => Future.successful(queue.complete()))
|
||||
|
||||
private def persistAll(queue: Queue[Offset => Future[Unit]]): Future[Unit] = {
|
||||
implicit val executionContext: ExecutionContext = DEC
|
||||
implicit val executionContext: ExecutionContext = ExecutionContext.parasitic
|
||||
val startOffset = SandboxOffset.fromOffset(dispatcher.getHead())
|
||||
// This will attempt to run the SQL queries concurrently, but there is no parallelism here,
|
||||
// so they will still run sequentially.
|
||||
@ -416,7 +416,7 @@ private[sandbox] object SqlLedger {
|
||||
.failed
|
||||
.foreach { throwable =>
|
||||
logger.error("Persistence queue has been closed with a failure.", throwable)
|
||||
}(DEC)
|
||||
}(ExecutionContext.parasitic)
|
||||
|
||||
}
|
||||
}
|
||||
@ -512,7 +512,7 @@ private final class SqlLedger(
|
||||
_.map(_ => ()).recover { case NonFatal(t) =>
|
||||
logger.error(s"Failed to persist entry with offset: ${offset.toApiString}", t)
|
||||
}
|
||||
)(DEC)
|
||||
)(ExecutionContext.parasitic)
|
||||
}(errorLogger)
|
||||
}
|
||||
|
||||
@ -548,7 +548,7 @@ private final class SqlLedger(
|
||||
"Failed to enqueue submission"
|
||||
)(f)
|
||||
Failure(protobuf.StatusProto.toStatusRuntimeException(failedStatus))
|
||||
}(DEC)
|
||||
}(ExecutionContext.parasitic)
|
||||
|
||||
override def publishPartyAllocation(
|
||||
submissionId: Ref.SubmissionId,
|
||||
@ -570,7 +570,7 @@ private final class SqlLedger(
|
||||
PartyDetails(party, displayName, isLocal = true),
|
||||
),
|
||||
)
|
||||
.map(_ => ())(DEC)
|
||||
.map(_ => ())(ExecutionContext.parasitic)
|
||||
.recover { case t =>
|
||||
//recovering from the failure so the persistence stream doesn't die
|
||||
logger.error(
|
||||
@ -578,14 +578,14 @@ private final class SqlLedger(
|
||||
t,
|
||||
)
|
||||
()
|
||||
}(DEC)
|
||||
}(ExecutionContext.parasitic)
|
||||
|
||||
case _ =>
|
||||
logger.warn(
|
||||
s"Ignoring duplicate party submission with ID $party for submissionId ${Some(submissionId)}"
|
||||
)
|
||||
Future.unit
|
||||
}(DEC)
|
||||
}(ExecutionContext.parasitic)
|
||||
}(errorLogger)
|
||||
}
|
||||
|
||||
@ -609,12 +609,12 @@ private final class SqlLedger(
|
||||
PackageLedgerEntry.PackageUploadAccepted(submissionId, timeProvider.getCurrentTimestamp)
|
||||
),
|
||||
)
|
||||
.map(_ => ())(DEC)
|
||||
.map(_ => ())(ExecutionContext.parasitic)
|
||||
.recover { case t =>
|
||||
//recovering from the failure so the persistence stream doesn't die
|
||||
logger.error(s"Failed to persist packages with offset: ${offset.toApiString}", t)
|
||||
()
|
||||
}(DEC)
|
||||
}(ExecutionContext.parasitic)
|
||||
}(errorLogger)
|
||||
}
|
||||
|
||||
@ -644,7 +644,7 @@ private final class SqlLedger(
|
||||
// database transaction.
|
||||
// NOTE(RA): Since the new configuration can be rejected inside storeConfigurationEntry,
|
||||
// we look up the current configuration again to see if it was stored successfully.
|
||||
implicit val ec: ExecutionContext = DEC
|
||||
implicit val ec: ExecutionContext = ExecutionContext.parasitic
|
||||
for {
|
||||
response <- ledgerDao.storeConfigurationEntry(
|
||||
offset,
|
||||
@ -661,12 +661,12 @@ private final class SqlLedger(
|
||||
}
|
||||
|
||||
storeF
|
||||
.map(_ => ())(DEC)
|
||||
.map(_ => ())(ExecutionContext.parasitic)
|
||||
.recover { case t =>
|
||||
//recovering from the failure so the persistence stream doesn't die
|
||||
logger.error(s"Failed to persist configuration with offset: $offset", t)
|
||||
()
|
||||
}(DEC)
|
||||
}(ExecutionContext.parasitic)
|
||||
}(errorLogger)
|
||||
}
|
||||
}
|
||||
|
@ -9,11 +9,10 @@ import com.daml.ledger.api.testing.utils.{SuiteResourceManagementAroundEach, Moc
|
||||
import com.daml.ledger.api.v1.active_contracts_service.ActiveContractsServiceGrpc
|
||||
import com.daml.ledger.api.v1.transaction_filter._
|
||||
import com.daml.ledger.client.services.acs.ActiveContractSetClient
|
||||
import com.daml.dec.DirectExecutionContext
|
||||
import com.daml.platform.sandbox.services.{SandboxFixture, TestCommands}
|
||||
import org.scalatest.concurrent.ScalaFutures
|
||||
import org.scalatest.time.{Millis, Span}
|
||||
import org.scalatest.matchers.should.Matchers
|
||||
import org.scalatest.time.{Millis, Span}
|
||||
import org.scalatest.wordspec.AnyWordSpec
|
||||
|
||||
@SuppressWarnings(Array("org.wartremover.warts.StringPlusAny"))
|
||||
@ -40,8 +39,6 @@ class ScenarioLoadingITDivulgence
|
||||
.getActiveContracts(transactionFilter)
|
||||
.runWith(Sink.seq)
|
||||
|
||||
implicit val ec = DirectExecutionContext
|
||||
|
||||
"ScenarioLoading" when {
|
||||
"running a divulgence scenario" should {
|
||||
"not fail" in {
|
||||
|
@ -6,7 +6,6 @@ package com.daml.platform.sandbox.services.command
|
||||
import java.util.concurrent.atomic.AtomicInteger
|
||||
|
||||
import com.daml.api.util.TimeProvider
|
||||
import com.daml.dec.DirectExecutionContext
|
||||
import com.daml.ledger.api.testing.utils.{MockMessages, SuiteResourceManagementAroundAll}
|
||||
import com.daml.ledger.api.v1.command_completion_service.CommandCompletionServiceGrpc
|
||||
import com.daml.ledger.api.v1.command_submission_service.{
|
||||
@ -27,7 +26,7 @@ import org.scalatest.matchers.should.Matchers
|
||||
import org.scalatest.wordspec.AsyncWordSpec
|
||||
import scalaz.syntax.tag._
|
||||
|
||||
import scala.concurrent.Future
|
||||
import scala.concurrent.{ExecutionContext, Future}
|
||||
import scala.util.control.NonFatal
|
||||
|
||||
final class CommandStaticTimeIT
|
||||
@ -46,10 +45,12 @@ final class CommandStaticTimeIT
|
||||
|
||||
private lazy val unwrappedLedgerId = ledgerId().unwrap
|
||||
|
||||
private def createCommandClient(): Future[CommandClient] =
|
||||
private def createCommandClient()(implicit
|
||||
executionContext: ExecutionContext
|
||||
): Future[CommandClient] =
|
||||
StaticTime
|
||||
.updatedVia(TimeServiceGrpc.stub(channel), unwrappedLedgerId)
|
||||
.recover { case NonFatal(_) => TimeProvider.UTC }(DirectExecutionContext)
|
||||
.recover { case NonFatal(_) => TimeProvider.UTC }
|
||||
.map(_ =>
|
||||
new CommandClient(
|
||||
CommandSubmissionServiceGrpc.stub(channel),
|
||||
@ -62,7 +63,7 @@ final class CommandStaticTimeIT
|
||||
defaultDeduplicationTime = java.time.Duration.ofSeconds(30),
|
||||
),
|
||||
)
|
||||
)(DirectExecutionContext)
|
||||
)
|
||||
|
||||
private lazy val submitRequest: SubmitRequest =
|
||||
MockMessages.submitRequest.update(
|
||||
|
@ -42,7 +42,6 @@ da_scala_library(
|
||||
"//ledger/metrics",
|
||||
"//ledger/participant-integration-api",
|
||||
"//libs-scala/build-info",
|
||||
"//libs-scala/concurrent",
|
||||
"//libs-scala/contextualized-logging",
|
||||
"//libs-scala/logging-entries",
|
||||
"//libs-scala/ports",
|
||||
|
@ -4,7 +4,7 @@
|
||||
package com.daml.platform.sandbox.services
|
||||
|
||||
import java.util.concurrent.atomic.AtomicBoolean
|
||||
import com.daml.dec.{DirectExecutionContext => DE}
|
||||
|
||||
import com.daml.error.DamlContextualizedErrorLogger
|
||||
import com.daml.ledger.api.auth.Authorizer
|
||||
import com.daml.ledger.api.domain.LedgerId
|
||||
@ -15,7 +15,7 @@ import com.google.protobuf.empty.Empty
|
||||
import io.grpc.ServerCall.Listener
|
||||
import io.grpc._
|
||||
|
||||
import scala.concurrent.Future
|
||||
import scala.concurrent.{ExecutionContext, Future}
|
||||
|
||||
class SandboxResetService(
|
||||
ledgerId: LedgerId,
|
||||
@ -34,7 +34,7 @@ class SandboxResetService(
|
||||
private val resetInitialized = new AtomicBoolean(false)
|
||||
|
||||
override def bindService(): ServerServiceDefinition =
|
||||
ResetServiceGrpc.bindService(this, DE)
|
||||
ResetServiceGrpc.bindService(this, ExecutionContext.parasitic)
|
||||
|
||||
override def reset(request: ResetRequest): Future[Empty] =
|
||||
authorizer.requireAdminClaims(doReset)(request)
|
||||
@ -65,7 +65,10 @@ class SandboxResetService(
|
||||
request.ledgerId,
|
||||
errorFactories.ledgerIdMismatch(ledgerId, LedgerId(request.ledgerId), None),
|
||||
)
|
||||
.fold(Future.failed[Empty], _ => actuallyReset().map(_ => Empty())(DE))
|
||||
.fold(
|
||||
Future.failed[Empty],
|
||||
_ => actuallyReset().map(_ => Empty())(ExecutionContext.parasitic),
|
||||
)
|
||||
|
||||
private def actuallyReset() = {
|
||||
logger.info("Initiating server reset.")
|
||||
|
@ -41,7 +41,6 @@ da_scala_library(
|
||||
"//ledger/sandbox-common:sandbox-common-scala-tests-lib",
|
||||
"//ledger/test-common",
|
||||
"//ledger/test-common:dar-files-default-lib",
|
||||
"//libs-scala/concurrent",
|
||||
"//libs-scala/ports",
|
||||
"//libs-scala/postgresql-testing",
|
||||
"//libs-scala/resources",
|
||||
|
@ -7,7 +7,6 @@ import java.io.File
|
||||
import java.util.UUID
|
||||
|
||||
import akka.stream.scaladsl.{Sink, Source}
|
||||
import com.daml.lf.data.Ref.PackageId
|
||||
import com.daml.ledger.api.v1.active_contracts_service.GetActiveContractsResponse
|
||||
import com.daml.ledger.api.v1.command_service.SubmitAndWaitRequest
|
||||
import com.daml.ledger.api.v1.commands.{Command, Commands}
|
||||
@ -15,11 +14,11 @@ import com.daml.ledger.api.v1.event.CreatedEvent
|
||||
import com.daml.ledger.api.v1.transaction_filter.{Filters, TransactionFilter}
|
||||
import com.daml.ledger.api.v1.value.{Identifier, Value}
|
||||
import com.daml.ledger.client.services.acs.ActiveContractSetClient
|
||||
import com.daml.dec.DirectExecutionContext
|
||||
import com.daml.lf.data.Ref.PackageId
|
||||
import com.daml.platform.sandbox.perf.util.DarUtil
|
||||
|
||||
import scala.concurrent.{ExecutionContext, Future}
|
||||
import scala.concurrent.duration._
|
||||
import scala.concurrent.{ExecutionContext, Future}
|
||||
|
||||
trait TestHelper {
|
||||
|
||||
@ -31,12 +30,11 @@ trait TestHelper {
|
||||
val applicationId: String = "app1"
|
||||
|
||||
val party = "party"
|
||||
val rangeOfIntsTemplateId =
|
||||
Identifier(
|
||||
packageId = largeTxPackageId,
|
||||
moduleName = "LargeTransaction",
|
||||
entityName = "RangeOfInts",
|
||||
)
|
||||
val rangeOfIntsTemplateId = Identifier(
|
||||
packageId = largeTxPackageId,
|
||||
moduleName = "LargeTransaction",
|
||||
entityName = "RangeOfInts",
|
||||
)
|
||||
|
||||
val listUtilTemplateId = Identifier(
|
||||
packageId = largeTxPackageId,
|
||||
@ -67,7 +65,7 @@ trait TestHelper {
|
||||
applicationId = applicationId,
|
||||
commandId = commandId,
|
||||
party = party,
|
||||
commands = Seq(Command(command)),
|
||||
commands = Seq(Command.of(command)),
|
||||
)
|
||||
SubmitAndWaitRequest(Some(commands))
|
||||
}
|
||||
@ -115,7 +113,7 @@ trait TestHelper {
|
||||
workflowId: String = "",
|
||||
): Future[Unit] = {
|
||||
val request: SubmitAndWaitRequest = submitAndWaitRequest(command, commandId, workflowId)
|
||||
state.ledger.commandService.submitAndWait(request).map(_ => ())(DirectExecutionContext)
|
||||
state.ledger.commandService.submitAndWait(request).map(_ => ())(ExecutionContext.parasitic)
|
||||
}
|
||||
|
||||
def activeContractIds(
|
||||
|
@ -27,9 +27,6 @@ da_scala_library(
|
||||
visibility = [
|
||||
"//visibility:public",
|
||||
],
|
||||
deps = [
|
||||
"@maven//:org_slf4j_slf4j_api",
|
||||
],
|
||||
)
|
||||
|
||||
da_scala_test(
|
||||
|
@ -21,9 +21,9 @@ import scala.{concurrent => sc}
|
||||
* declare which ExecutionContext their operations are in.
|
||||
*
|
||||
* For Scala 2.12, you must pass `-Xsource:2.13` to scalac for methods and
|
||||
* conversions to be automatically found. You must also `import
|
||||
* scalaz.syntax.bind._` or similar for Future methods like `map`, `flatMap`,
|
||||
* and so on.
|
||||
* conversions to be automatically found. You must also
|
||||
* `import scalaz.syntax.bind._` or similar for Future methods like `map`,
|
||||
* `flatMap`, and so on.
|
||||
*
|
||||
* There are no constraints on the `EC` type variable; you need only declare
|
||||
* types you wish to use for it that are sufficient for describing the domains
|
||||
@ -111,5 +111,8 @@ package concurrent {
|
||||
|
||||
def fromExecutorService[EC](e: ExecutorService): ExecutionContext[EC] =
|
||||
apply(sc.ExecutionContext.fromExecutorService(e))
|
||||
|
||||
val parasitic: ExecutionContext[Nothing] =
|
||||
ExecutionContext(sc.ExecutionContext.parasitic)
|
||||
}
|
||||
}
|
||||
|
@ -1,21 +0,0 @@
|
||||
// Copyright (c) 2021 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
package com.daml.dec
|
||||
|
||||
import org.slf4j.LoggerFactory
|
||||
|
||||
import scala.concurrent.ExecutionContext
|
||||
|
||||
// Starting from Scala 2.13 this can deleted and replaced by `parasitic`
|
||||
private[dec] object DirectExecutionContextInternal extends ExecutionContext {
|
||||
|
||||
private[this] val logger = LoggerFactory.getLogger(this.getClass)
|
||||
|
||||
override final def execute(runnable: Runnable): Unit =
|
||||
runnable.run()
|
||||
|
||||
override final def reportFailure(cause: Throwable): Unit =
|
||||
logger.error("Unhandled exception", cause)
|
||||
|
||||
}
|
@ -1,14 +0,0 @@
|
||||
// Copyright (c) 2021 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
package com.daml
|
||||
|
||||
import com.daml.concurrent.ExecutionContext
|
||||
|
||||
package object dec {
|
||||
|
||||
// Starting from Scala 2.13 this can deleted and replaced by `parasitic`
|
||||
val DirectExecutionContext: ExecutionContext[Nothing] =
|
||||
ExecutionContext(DirectExecutionContextInternal)
|
||||
|
||||
}
|
@ -90,8 +90,6 @@ scala_binary_deps = [
|
||||
|
||||
binary_deps = [
|
||||
":trigger-service",
|
||||
"//libs-scala/ports",
|
||||
"//libs-scala/concurrent",
|
||||
"//daml-lf/archive:daml_lf_archive_reader",
|
||||
"//daml-lf/archive:daml_lf_1.dev_archive_proto_java",
|
||||
"//daml-lf/data",
|
||||
@ -100,6 +98,7 @@ binary_deps = [
|
||||
"//ledger/ledger-api-common",
|
||||
"//libs-scala/contextualized-logging",
|
||||
"//libs-scala/db-utils",
|
||||
"//libs-scala/ports",
|
||||
"//libs-scala/scala-utils",
|
||||
"//triggers/service/auth:middleware-api",
|
||||
"@maven//:org_slf4j_slf4j_api",
|
||||
|
@ -5,14 +5,13 @@ package com.daml.lf.engine.trigger
|
||||
|
||||
import java.util.UUID
|
||||
|
||||
import akka.actor.typed.{ActorRef, ActorSystem, Scheduler}
|
||||
import akka.actor.typed.scaladsl.AskPattern._
|
||||
import akka.actor.typed.{ActorRef, ActorSystem, Scheduler}
|
||||
import akka.http.scaladsl.Http.ServerBinding
|
||||
import akka.http.scaladsl.model.Uri
|
||||
import akka.util.Timeout
|
||||
import com.daml.auth.middleware.api.{Client => AuthClient}
|
||||
import com.daml.daml_lf_dev.DamlLf
|
||||
import com.daml.dec.DirectExecutionContext
|
||||
import com.daml.dbutils.JdbcConfig
|
||||
import com.daml.lf.archive.{Dar, DarReader}
|
||||
import com.daml.lf.data.Ref.PackageId
|
||||
@ -20,16 +19,16 @@ import com.daml.lf.engine.trigger.dao.DbTriggerDao
|
||||
import com.daml.lf.speedy.Compiler
|
||||
import com.daml.logging.ContextualizedLogger
|
||||
import com.daml.ports.{Port, PortFiles}
|
||||
import com.daml.scalautil.Statement.discard
|
||||
import com.daml.runtime.JdbcDrivers
|
||||
import com.daml.scalautil.Statement.discard
|
||||
import scalaz.std.either._
|
||||
import scalaz.std.list._
|
||||
import scalaz.syntax.traverse._
|
||||
|
||||
import scala.concurrent.{Await, ExecutionContext, Future}
|
||||
import scala.concurrent.duration._
|
||||
import scala.concurrent.{Await, ExecutionContext, Future}
|
||||
import scala.sys.ShutdownHookThread
|
||||
import scala.util.{Failure, Success, Try}
|
||||
import scalaz.syntax.traverse._
|
||||
import scalaz.std.list._
|
||||
import scalaz.std.either._
|
||||
|
||||
object ServiceMain {
|
||||
|
||||
@ -135,8 +134,8 @@ object ServiceMain {
|
||||
case Some(c) =>
|
||||
Try(
|
||||
Await.result(
|
||||
DbTriggerDao(c)(DirectExecutionContext)
|
||||
.initialize(config.allowExistingSchema)(DirectExecutionContext),
|
||||
DbTriggerDao(c)(ExecutionContext.parasitic)
|
||||
.initialize(config.allowExistingSchema)(ExecutionContext.parasitic),
|
||||
Duration(30, SECONDS),
|
||||
)
|
||||
) match {
|
||||
|
Loading…
Reference in New Issue
Block a user