fix MAXIMUM_NUMBER_OF_STREAMS errors when doing many websocket queries (#15733)

* log more termination

From the timeout loop:

+ fmm-outer
+ fmm-inner
x ACS-before-tx
x tx-after-ACS

* spam eagerCancel=true and see what happens

From the timeout loop:

+ after-split
+ IDSS-outer
+ fmm-outer
+ contractsAndBoundary
+ tx-after-ACS
+ fmm-inner
+ GTSFP-outer
x ACS-before-tx

* passing acs-and-tx tests

* trying combinations of reverting eagerCancel settings

- setting eagerCancel = false in acsAndBoundary causes the ACS
  cancellation to fail (first test), but the tx cancellation still
  succeeds

- setting eagerCancel = false in project2 causes both the ACS and tx
  stream cancellation tests (first and third tests) to fail

- the offset broadcast in acsFollowingAndBoundary appears to be
  redundant with respect to cancellation, so we revert it in the
  interest of conservatism

* make test size small

* current measurement

Still fine after the refactoring of logTermination and removal of fmm-*.

+ GTSFP-outer
+ contractsAndBoundary
x IDSS-outer-2
+ after-split
+ tx-after-ACS
+ IDSS-outer-1
x ACS-before-tx

* set level of the logTermination messages to trace
This commit is contained in:
Stephen Compall 2022-12-16 16:41:08 -05:00 committed by GitHub
parent 33bc838871
commit 2893e26872
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 185 additions and 32 deletions

View File

@ -36,6 +36,7 @@ da_scala_library(
"//daml-lf/transaction",
"//language-support/scala/bindings-akka",
"//ledger-service/db-backend",
"//libs-scala/contextualized-logging",
"//libs-scala/nonempty",
"//libs-scala/scala-utils",
],
@ -43,12 +44,15 @@ da_scala_library(
da_scala_test(
name = "tests",
size = "medium",
size = "small",
srcs = glob(["src/test/scala/**/*.scala"]),
plugins = [
kind_projector_plugin,
],
scala_deps = [
"@maven//:com_typesafe_akka_akka_actor",
"@maven//:com_typesafe_akka_akka_stream",
"@maven//:com_typesafe_akka_akka_stream_testkit",
"@maven//:org_scalacheck_scalacheck",
"@maven//:org_scalatest_scalatest_core",
"@maven//:org_scalatest_scalatest_matchers_core",
@ -60,7 +64,13 @@ da_scala_test(
scalacopts = hj_scalacopts,
deps = [
":fetch-contracts",
"//ledger-api/grpc-definitions:ledger_api_proto_scala",
"//ledger-api/rs-grpc-bridge",
"//ledger-api/testing-utils",
"//libs-scala/contextualized-logging",
"//libs-scala/logging-entries",
"//libs-scala/scalatest-utils",
"@maven//:org_reactivestreams_reactive_streams",
"@maven//:org_scalatest_scalatest_compatible",
],
)

View File

@ -9,6 +9,7 @@ import akka.stream.{FanOutShape2, Graph}
import com.daml.scalautil.Statement.discard
import domain.ContractTypeId
import util.{AbsoluteBookmark, BeginBookmark, ContractStreamStep, InsertDeleteStep, LedgerBegin}
import util.GraphExtensions._
import util.IdentifierConverters.apiIdentifier
import com.daml.ledger.api.v1.transaction.Transaction
import com.daml.ledger.api.{v1 => lav1}
@ -39,6 +40,9 @@ private[daml] object AcsTxStreams {
*/
private[daml] def acsFollowingAndBoundary(
transactionsSince: lav1.ledger_offset.LedgerOffset => Source[Transaction, NotUsed]
)(implicit
ec: concurrent.ExecutionContext,
lc: com.daml.logging.LoggingContextOf[Any],
): Graph[FanOutShape2[
lav1.active_contracts_service.GetActiveContractsResponse,
ContractStreamStep.LAV1,
@ -49,7 +53,7 @@ private[daml] object AcsTxStreams {
import ContractStreamStep.{LiveBegin, Acs}
type Off = BeginBookmark[domain.Offset]
val acs = b add acsAndBoundary
val dupOff = b add Broadcast[Off](2)
val dupOff = b add Broadcast[Off](2, eagerCancel = false)
val liveStart = Flow fromFunction { off: Off =>
LiveBegin(off)
}
@ -75,7 +79,7 @@ private[daml] object AcsTxStreams {
GraphDSL.create() { implicit b =>
import GraphDSL.Implicits._
import lav1.active_contracts_service.{GetActiveContractsResponse => GACR}
val dup = b add Broadcast[GACR](2)
val dup = b add Broadcast[GACR](2, eagerCancel = true)
val acs = b add (Flow fromFunction ((_: GACR).activeContracts))
val off = b add Flow[GACR]
.collect {
@ -93,6 +97,9 @@ private[daml] object AcsTxStreams {
*/
private[daml] def transactionsFollowingBoundary(
transactionsSince: lav1.ledger_offset.LedgerOffset => Source[Transaction, NotUsed]
)(implicit
ec: concurrent.ExecutionContext,
lc: com.daml.logging.LoggingContextOf[Any],
): Graph[FanOutShape2[
BeginBookmark[domain.Offset],
ContractStreamStep.Txn.LAV1,
@ -110,12 +117,15 @@ private[daml] object AcsTxStreams {
import domain.Offset.`Offset ordering`
val lastTxOff = b add last(LedgerBegin: Off)
val maxOff = b add max(LedgerBegin: Off)
val logTxnOut =
b add logTermination[ContractStreamStep.Txn.LAV1]("first branch of tx stream split")
// format: off
discard { txnSplit.in <~ txns <~ dupOff }
discard { dupOff ~> mergeOff ~> maxOff }
discard { txnSplit.out1.map(off => AbsoluteBookmark(off)) ~> lastTxOff ~> mergeOff }
discard { txnSplit.out0 ~> logTxnOut }
// format: on
new FanOutShape2(dupOff.in, txnSplit.out0, maxOff.out)
new FanOutShape2(dupOff.in, logTxnOut.out, maxOff.out)
}
private[this] def transactionToInsertsAndDeletes(

View File

@ -39,7 +39,7 @@ private[daml] object AkkaStreamsDoobie {
private[fetchcontracts] def project2[A, B]: Graph[FanOutShape2[(A, B), A, B], NotUsed] =
GraphDSL.create() { implicit b =>
import GraphDSL.Implicits._
val split = b add Broadcast[(A, B)](2)
val split = b add Broadcast[(A, B)](2, eagerCancel = true)
val left = b add Flow.fromFunction((_: (A, B))._1)
val right = b add Flow.fromFunction((_: (A, B))._2)
discard { split ~> left }

View File

@ -6,9 +6,10 @@ package com.daml.fetchcontracts.util
import akka.NotUsed
import akka.stream.scaladsl.{Flow, GraphDSL, Keep, Sink}
import akka.stream.{FanOutShape2, FlowShape, Graph}
import com.daml.logging.{ContextualizedLogger, LoggingContextOf}
import scalaz.Liskov.<~<
import scala.concurrent.Future
import scala.concurrent.{ExecutionContext, Future}
object GraphExtensions {
implicit final class `Graph FOS2 funs`[A, Y, Z, M](
@ -29,4 +30,26 @@ object GraphExtensions {
divertToMat(Sink.head)(noM.subst[CK](Keep.right[NotUsed, Future[Z]]))
}
}
private[daml] def logTermination[A](
extraMessage: String
)(implicit ec: ExecutionContext, lc: LoggingContextOf[Any]): Flow[A, A, NotUsed] =
if (logger.trace.isEnabled)
Flow[A].watchTermination() { (mat, fd) =>
fd.onComplete(
_.fold(
{ t =>
logger.trace(s"stream-abort [$extraMessage] trying to abort ${t.getMessage}")
},
{ _ =>
logger.trace(s"stream-stop [$extraMessage] trying to shutdown")
},
)
)
mat
}
else
Flow[A]
private val logger = ContextualizedLogger.get(getClass)
}

View File

@ -0,0 +1,108 @@
// Copyright (c) 2022 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
package com.daml.fetchcontracts
import com.daml.ledger.api.testing.utils.AkkaBeforeAndAfterAll
import org.scalatest.wordspec.AsyncWordSpec
import org.scalatest.matchers.should.Matchers
import scala.concurrent.Future
@SuppressWarnings(Array("org.wartremover.warts.NonUnitStatements"))
class AcsTxStreamsTest extends AsyncWordSpec with Matchers with AkkaBeforeAndAfterAll {
import AcsTxStreamsTest._
"acsFollowingAndBoundary" when {
"ACS is active" should {
"cancel the ACS on output cancel" in {
val (acs, futx, out, _) = probeAcsFollowingAndBoundary()
out.cancel()
acs.expectCancellation()
futx.isCompleted should ===(false)
}
}
"ACS is past liveBegin" should {
"not start tx until ACS is complete" in {
val (acs, futx, _, _) = probeAcsFollowingAndBoundary()
acs.sendNext(liveBegin)
futx.isCompleted should ===(false)
}
"propagate cancellation of tx stream" in {
val (_, _) = (liveBegin, txEnd)
val (acs, futx, out, off) = probeAcsFollowingAndBoundary()
acs.sendNext(liveBegin).sendComplete()
off.expectSubscription()
out.cancel()
futx.map { tx =>
tx.expectCancellation()
succeed
}
}
}
}
}
object AcsTxStreamsTest {
import akka.NotUsed
import akka.actor.ActorSystem
import akka.{stream => aks}
import aks.scaladsl.{GraphDSL, RunnableGraph, Source}
import aks.{testkit => tk}
import tk.TestPublisher.{Probe => InProbe}
import tk.TestSubscriber.{Probe => OutProbe}
import tk.scaladsl.{TestSource, TestSink}
import com.daml.ledger.api.{v1 => lav1}
import com.daml.logging.LoggingContextOf
private val liveBegin = lav1.active_contracts_service.GetActiveContractsResponse(offset = "42")
private val txEnd = lav1.transaction.Transaction(offset = "84")
private implicit val `log ctx`: LoggingContextOf[Any] =
LoggingContextOf.newLoggingContext(LoggingContextOf.label[Any])(identity)
private def probeAcsFollowingAndBoundary()(implicit
ec: concurrent.ExecutionContext,
as: ActorSystem,
) =
probeFOS2PlusContinuation(AcsTxStreams.acsFollowingAndBoundary).run()
@SuppressWarnings(Array("org.wartremover.warts.NonUnitStatements"))
private def probeFOS2PlusContinuation[K, I0, I1, O0, O1](
part: (Any => Source[I1, NotUsed]) => aks.Graph[aks.FanOutShape2[I0, O0, O1], NotUsed]
)(implicit
as: ActorSystem
): RunnableGraph[(InProbe[I0], Future[InProbe[I1]], OutProbe[O0], OutProbe[O1])] = {
val i1 = concurrent.Promise[InProbe[I1]]()
// filling in i1 like this is terribly hacky but is well enough for a test
probeAll(
part(_ =>
TestSource.probe[I1].mapMaterializedValue { i1p =>
i1.success(i1p)
NotUsed
}
)
)
.mapMaterializedValue { case (i0, o0, o1) => (i0, i1.future, o0, o1) }
}
private def probeAll[I, O0, O1](
part: aks.Graph[aks.FanOutShape2[I, O0, O1], NotUsed]
)(implicit as: ActorSystem): RunnableGraph[(InProbe[I], OutProbe[O0], OutProbe[O1])] =
RunnableGraph fromGraph GraphDSL.createGraph(
TestSource.probe[I],
TestSink.probe[O0],
TestSink.probe[O1],
)((_, _, _)) { implicit b => (i, o0, o1) =>
import GraphDSL.Implicits._
val here = b add part
// format: off
i ~> here.in
o0 <~ here.out0
o1 <~ here.out1
// format: on
aks.ClosedShape
}
}

View File

@ -24,6 +24,7 @@ import com.daml.fetchcontracts.util.{
import util.{ApiValueToLfValueConverter, toLedgerId}
import com.daml.fetchcontracts.AcsTxStreams.transactionFilter
import com.daml.fetchcontracts.util.ContractStreamStep.{Acs, LiveBegin}
import com.daml.fetchcontracts.util.GraphExtensions._
import com.daml.http.metrics.HttpJsonApiMetrics
import com.daml.http.util.FutureUtil.toFuture
import com.daml.http.util.Logging.{InstanceUUID, RequestID}
@ -596,7 +597,9 @@ class ContractsService(
): Source[ContractStreamStep.LAV1, NotUsed] = {
val txnFilter = transactionFilter(parties, templateIds)
def source = getActiveContracts(jwt, ledgerId, txnFilter, true)(lc)
def source =
(getActiveContracts(jwt, ledgerId, txnFilter, true)(lc)
via logTermination("ACS upstream"))
val transactionsSince
: api.ledger_offset.LedgerOffset => Source[api.transaction.Transaction, NotUsed] =
@ -606,19 +609,21 @@ class ContractsService(
txnFilter,
_: api.ledger_offset.LedgerOffset,
terminates,
)(lc)
)(lc) via logTermination("transactions upstream")
import com.daml.fetchcontracts.AcsTxStreams.{
acsFollowingAndBoundary,
transactionsFollowingBoundary,
}, com.daml.fetchcontracts.util.GraphExtensions._
val contractsAndBoundary = startOffset.cata(
so =>
Source
.single(AbsoluteBookmark(so.offset))
.viaMat(transactionsFollowingBoundary(transactionsSince).divertToHead)(Keep.right),
source.viaMat(acsFollowingAndBoundary(transactionsSince).divertToHead)(Keep.right),
)
val contractsAndBoundary = startOffset
.cata(
so =>
Source
.single(AbsoluteBookmark(so.offset))
.viaMat(transactionsFollowingBoundary(transactionsSince).divertToHead)(Keep.right),
source.viaMat(acsFollowingAndBoundary(transactionsSince).divertToHead)(Keep.right),
)
.via(logTermination("ACS+tx or tx stream"))
contractsAndBoundary mapMaterializedValue { fob =>
fob.foreach(a => logger.debug(s"contracts fetch completed at: ${a.toString}"))
NotUsed

View File

@ -14,6 +14,7 @@ import com.daml.fetchcontracts.util.{
InsertDeleteStep,
LedgerBegin,
}
import com.daml.fetchcontracts.util.GraphExtensions._
import com.daml.http.EndpointsCompanion._
import com.daml.http.domain.{
ContractKeyStreamRequest,
@ -56,7 +57,6 @@ import spray.json.{JsArray, JsObject, JsValue, JsonReader, JsonWriter, enrichAny
import scala.collection.mutable.HashSet
import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success}
import scalaz.EitherT.{either, eitherT, rightT}
import com.daml.ledger.api.{domain => LedgerApiDomain}
import com.daml.nonempty.NonEmpty
@ -775,24 +775,19 @@ class WebSocketService(
): Flow[A, A, NotUsed] =
Flow[A]
.watchTermination() { (_, future) =>
discard { numConns.incrementAndGet }
val afterInc = numConns.incrementAndGet()
metrics.websocketRequestCounter.inc()
logger.info(
s"New websocket client has connected, current number of clients:${numConns.get()}"
s"New websocket client has connected, current number of clients:$afterInc"
)
future onComplete {
case Success(_) =>
discard { numConns.decrementAndGet }
metrics.websocketRequestCounter.dec()
logger.info(
s"Websocket client has disconnected. Current number of clients: ${numConns.get()}"
)
case Failure(ex) =>
discard { numConns.decrementAndGet }
metrics.websocketRequestCounter.dec()
logger.info(
s"Websocket client interrupted on Failure: ${ex.getMessage}. remaining number of clients: ${numConns.get()}"
)
future onComplete { td =>
def msg = td.fold(
ex => s"interrupted on Failure: ${ex.getMessage}. remaining",
_ => "has disconnected. Current",
)
val afterDec = numConns.decrementAndGet()
metrics.websocketRequestCounter.dec()
logger.info(s"Websocket client $msg number of clients: $afterDec")
}
NotUsed
}
@ -853,7 +848,7 @@ class WebSocketService(
jwtPayload.parties,
offPrefix,
rq.q: q,
)
) via logTermination("getTransactionSourceForParty")
}.valueOr(e => Source.single(-\/(e))): Source[Error \/ Message, NotUsed],
)
.takeWhile(_.isRight, inclusive = true) // stop after emitting 1st error
@ -986,6 +981,7 @@ class WebSocketService(
liveStartingOffset,
Terminates.Never,
)
.via(logTermination("insertDeleteStepSource with ACS"))
.via(
convertFilterContracts(
resolvedQuery,
@ -1035,6 +1031,7 @@ class WebSocketService(
liveStartingOffset,
Terminates.Never,
)
.via(logTermination("insertDeleteStepSource without ACS"))
.via(
convertFilterContracts(
resolvedQuery,