bindings-rxjava: Return the Disposable from Bot.wire. (#6476)

* bindings-rxjava: Return the `Disposable` from `Bot.wire`.

And in `BotTest`, dispose of the connection.

We are seeing some spurious errors in runs of BotTest due to connections
shutting down in the wrong order. By explicitly disposing of the wiring
before shutting down the ledger client, we can hopefully suppress these
errors.

They're not causing test failures, but they do often obscure the real
failure.

CHANGELOG_BEGIN
- [RxJava Bindings] `Bot.wire` and `Bot.wireSimple` now return a
  `Disposable`, which can be used to shut down the flows. You are
  encouraged to call `.dispose()` before terminating the client.
CHANGELOG_END

* bindings-rxjava: Don't run `awaitTermination` in a loop.

When shutting down the ledger client, instead of calling
`channel.awaitTermination` in a loop, we just call it once with an
approximately-infinite timeout.

* bindings-rxjava: Use the loan pattern for disposing in BotTest.

* bindings-rxjava: Simplify BotTest a little further.
This commit is contained in:
Samir Talwar 2020-06-24 17:09:27 +02:00 committed by GitHub
parent c7ea0a8b08
commit c5e1e39d23
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 171 additions and 153 deletions

View File

@ -3,17 +3,16 @@
package com.daml.ledger.rxjava;
import com.daml.ledger.rxjava.grpc.*;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import com.daml.grpc.adapter.SingleThreadExecutionSequencerPool;
import com.daml.ledger.rxjava.grpc.*;
import io.grpc.ManagedChannel;
import io.grpc.netty.GrpcSslContexts;
import io.grpc.netty.NettyChannelBuilder;
import io.netty.handler.ssl.SslContext;
import org.checkerframework.checker.nullness.qual.NonNull;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
/**
* A {@link LedgerClient} implementation that connects to
* an existing Ledger and provides clients to query it. To use the {@link DamlLedgerClient}:
@ -24,13 +23,12 @@ import java.util.concurrent.TimeUnit;
* <li>Call the method {@link DamlLedgerClient#connect()} to initialize the clients for that particular ledger</li>
* <li>Retrieve one of the clients by using a getter, e.g. {@link DamlLedgerClient#getActiveContractSetClient()}</li>
* </ol>
*
* <p>
* Alternatively to {@link DamlLedgerClient#newBuilder(String, int)}, you can use {@link DamlLedgerClient#newBuilder(NettyChannelBuilder)}
* to make sure you can specify additional properties for the channel you're building, such as the maximum inbound message size.
*
* <p>
* For information on how to set up an {@link SslContext} object for mutual authentication please refer to
* the <a href="https://github.com/grpc/grpc-java/blob/master/SECURITY.md">section on security</a> in the grpc-java documentation.
*
*/
public final class DamlLedgerClient implements LedgerClient {
@ -73,7 +71,7 @@ public final class DamlLedgerClient implements LedgerClient {
/**
* Create a new {@link Builder} with the given parameters
*
* <p>
* Useful as a shortcut unless you have to customize the {@link NettyChannelBuilder} beyond the builder's capabilities
*/
public static Builder newBuilder(@NonNull String host, int port) {
@ -82,7 +80,7 @@ public final class DamlLedgerClient implements LedgerClient {
/**
* Create a new {@link Builder} with the given parameters
*
* <p>
* Useful to customize the {@link NettyChannelBuilder} beyond the builder's capabilities,
* otherwise {@link DamlLedgerClient#newBuilder(String, int)} is probably more suited for your use case
*/
@ -94,9 +92,9 @@ public final class DamlLedgerClient implements LedgerClient {
* Creates a {@link DamlLedgerClient} connected to a Ledger
* identified by the ip and port.
*
* @param ledgerId The expected ledger-id
* @param hostIp The ip of the Ledger
* @param hostPort The port of the Ledger
* @param ledgerId The expected ledger-id
* @param hostIp The ip of the Ledger
* @param hostPort The port of the Ledger
* @param sslContext If present, it will be used to establish a TLS connection. If empty, an unsecured plaintext connection will be used.
* Must be an SslContext created for client applications via {@link GrpcSslContexts#forClient()}.
* @deprecated since 0.13.38, please use {@link DamlLedgerClient#DamlLedgerClient(NettyChannelBuilder, Optional, Optional)} or even better either {@link DamlLedgerClient#newBuilder}
@ -111,6 +109,7 @@ public final class DamlLedgerClient implements LedgerClient {
/**
* Like {@link DamlLedgerClient#forLedgerIdAndHost(String, String, int, Optional)} but with the ledger-id
* automatically discovered instead of provided.
*
* @deprecated since 0.13.38, please use {@link DamlLedgerClient#DamlLedgerClient(NettyChannelBuilder, Optional, Optional)} or even better either {@link DamlLedgerClient#newBuilder}
*/
@Deprecated
@ -142,9 +141,10 @@ public final class DamlLedgerClient implements LedgerClient {
/**
* Creates a {@link DamlLedgerClient} with a previously created {@link ManagedChannel}. This is useful in
* case additional settings need to be configured for the connection to the ledger (e.g. keep alive timeout).
*
* @param expectedLedgerId If the value is present, {@link DamlLedgerClient#connect()} throws an exception
* if the provided ledger id does not match the ledger id provided by the ledger.
* @param channel A user provided instance of @{@link ManagedChannel}.
* @param channel A user provided instance of @{@link ManagedChannel}.
* @deprecated since 0.13.38, please use {@link DamlLedgerClient#newBuilder}
*/
@Deprecated
@ -225,13 +225,13 @@ public final class DamlLedgerClient implements LedgerClient {
}
@Override
public TimeClient getTimeClient() { return timeClient; }
public TimeClient getTimeClient() {
return timeClient;
}
public void close() throws Exception {
channel.shutdownNow();
while (!channel.awaitTermination(1, TimeUnit.SECONDS)) {
}
channel.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
pool.close();
}

View File

@ -3,6 +3,10 @@
package com.daml.ledger.rxjava.components;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiConsumer;
import java.util.function.Function;
import com.daml.ledger.javaapi.data.*;
import com.daml.ledger.rxjava.CommandSubmissionClient;
import com.daml.ledger.rxjava.LedgerClient;
@ -13,6 +17,7 @@ import com.daml.ledger.rxjava.components.helpers.Pair;
import com.daml.ledger.rxjava.util.FlowableLogger;
import com.google.rpc.Code;
import io.reactivex.*;
import io.reactivex.disposables.Disposable;
import io.reactivex.schedulers.Schedulers;
import io.reactivex.subjects.ReplaySubject;
import io.reactivex.subjects.Subject;
@ -20,16 +25,10 @@ import org.checkerframework.checker.nullness.qual.NonNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiConsumer;
import java.util.function.Function;
/**
* A Bot is an automation that reacts to changes in the ledger by submitting zero or more
* commands. This class contains helpers to create bots. The main helper is
* {@link #wire(String, LedgerClient, TransactionFilter, Function, Function)}.
*
*/
public class Bot {
@ -45,48 +44,53 @@ public class Bot {
/**
* Wires the Bot logic to an existing {@link LedgerClient} instance.
*
* @param applicationId The application identifier that will be sent to the Ledger
* @param ledgerClient The {@link LedgerClient} instance which will be wired to the
* bot.
* @param <R> The type of the result of transform.
* @param applicationId The application identifier that will be sent to the Ledger
* @param ledgerClient The {@link LedgerClient} instance which will be wired to the
* bot.
* @param transactionFilter A server-side filter of incoming transactions
* @param bot The business logic of the bot.
* @param transform A function from the arguments of a Contract on the Ledger to
* a more refined type R. This can be used by the developer to, for
* instance, discard the fields of a Contract that are not needed
* and save space.
* @param <R> The type of the result of transform.
* @param bot The business logic of the bot.
* @param transform A function from the arguments of a Contract on the Ledger to
* a more refined type R. This can be used by the developer to, for
* instance, discard the fields of a Contract that are not needed
* and save space.
* @return The subscription representing the connection.
*/
public static <R> void wire(String applicationId,
LedgerClient ledgerClient,
TransactionFilter transactionFilter,
Function<LedgerViewFlowable.LedgerView<R>, Flowable<CommandsAndPendingSet>> bot,
Function<CreatedContract, R> transform) {
wire(applicationId, ledgerClient, transactionFilter, bot, transform, Schedulers.io());
public static <R> Disposable wire(
String applicationId,
LedgerClient ledgerClient,
TransactionFilter transactionFilter,
Function<LedgerViewFlowable.LedgerView<R>, Flowable<CommandsAndPendingSet>> bot,
Function<CreatedContract, R> transform
) {
return wire(applicationId, ledgerClient, transactionFilter, bot, transform, Schedulers.io());
}
/**
* Wires the Bot logic to an existing {@link LedgerClient} instance.
*
* @param applicationId The application identifier that will be sent to the Ledger
* @param ledgerClient The {@link LedgerClient} instance which will be wired to the
* bot.
* @param <R> The type of the result of transform.
* @param applicationId The application identifier that will be sent to the Ledger
* @param ledgerClient The {@link LedgerClient} instance which will be wired to the
* bot.
* @param transactionFilter A server-side filter of incoming transactions
* @param bot The business logic of the bot.
* @param transform A function from the arguments of a Contract on the Ledger to
* a more refined type R. This can be used by the developer to, for
* instance, discard the fields of a Contract that are not needed
* and save space.
* @param scheduler The scheduler used to run the flows
* @param <R> The type of the result of transform.
* @param bot The business logic of the bot.
* @param transform A function from the arguments of a Contract on the Ledger to
* a more refined type R. This can be used by the developer to, for
* instance, discard the fields of a Contract that are not needed
* and save space.
* @param scheduler The scheduler used to run the flows
* @return The subscription representing the connection.
*/
public static <R> void wire(String applicationId,
LedgerClient ledgerClient,
TransactionFilter transactionFilter,
Function<LedgerViewFlowable.LedgerView<R>, Flowable<CommandsAndPendingSet>> bot,
Function<CreatedContract, R> transform,
Scheduler scheduler) {
public static <R> Disposable wire(
String applicationId,
LedgerClient ledgerClient,
TransactionFilter transactionFilter,
Function<LedgerViewFlowable.LedgerView<R>, Flowable<CommandsAndPendingSet>> bot,
Function<CreatedContract, R> transform,
Scheduler scheduler
) {
logger.info("Bot wiring started for parties {}", transactionFilter.getParties());
TransactionsClient transactionsClient = ledgerClient.getTransactionsClient();
@ -136,7 +140,7 @@ public class Bot {
transform
);
Flowable<CommandsAndPendingSet> botResult = ledgerViews.concatMap(ledgerView -> {
Flowable<CommandsAndPendingSet> result = null;
Flowable<CommandsAndPendingSet> result;
try {
Flowable<CommandsAndPendingSet> commandsToSend = bot.apply(ledgerView);
result = Flowable.concat(commandsToSend, Flowable.just(CommandsAndPendingSet.empty));
@ -159,24 +163,27 @@ public class Bot {
logger.info("Bot wiring complete for parties {}", transactionFilter.getParties());
});
// Since we have removed the blockingGet call, we now need to make sure that the flow is actually triggered
mainFlow.toFlowable().observeOn(scheduler).publish().connect();
return mainFlow.toFlowable().observeOn(scheduler).publish().connect();
}
/**
* Wires the Bot logic to an existing {@link LedgerClient} instance, storing {@link CreatedContract}
* instances in the {@link com.daml.ledger.rxjava.components.LedgerViewFlowable.LedgerView}.
*
* @param appId The application identifier that will be sent to the Ledger
* @param ledgerClient The {@link LedgerClient} instance which will be wired to the
* bot.
* @param appId The application identifier that will be sent to the Ledger
* @param ledgerClient The {@link LedgerClient} instance which will be wired to the
* bot.
* @param transactionFilter A server-side filter of incoming transactions
* @param bot The business logic of the bot.
* @param bot The business logic of the bot.
* @return
*/
public static void wireSimple(String appId,
LedgerClient ledgerClient,
TransactionFilter transactionFilter,
Function<LedgerViewFlowable.LedgerView<CreatedContract>, Flowable<CommandsAndPendingSet>> bot) {
Bot.<CreatedContract>wire(appId, ledgerClient, transactionFilter, bot, r -> r);
public static Disposable wireSimple(
String appId,
LedgerClient ledgerClient,
TransactionFilter transactionFilter,
Function<LedgerViewFlowable.LedgerView<CreatedContract>, Flowable<CommandsAndPendingSet>> bot
) {
return Bot.<CreatedContract>wire(appId, ledgerClient, transactionFilter, bot, r -> r);
}
static Flowable<WorkflowEvent> activeContractSetAndNewTransactions(LedgerClient ledgerClient, TransactionFilter filter) {
@ -189,9 +196,9 @@ public class Bot {
};
Flowable<GetActiveContractsResponse> activeContracts =
FlowableLogger.log(ledgerClient.getActiveContractSetClient().getActiveContracts(filter, true), "acs")
.doOnNext(r -> {
r.getOffset().ifPresent(off -> setOffset.accept(new LedgerOffset.Absolute(off), "acs.next"));
}).doOnComplete(() -> setOffset.accept(LedgerOffset.LedgerBegin.getInstance(), "acs.complete"));
.doOnNext(r -> {
r.getOffset().ifPresent(off -> setOffset.accept(new LedgerOffset.Absolute(off), "acs.next"));
}).doOnComplete(() -> setOffset.accept(LedgerOffset.LedgerBegin.getInstance(), "acs.complete"));
Flowable<Transaction> transactions = Single.fromFuture(offsetFuture)
.doOnSuccess(o -> logger.debug("offset.success: " + o))
.doOnError(t -> logger.error("offset.error: " + t))
@ -206,9 +213,9 @@ public class Bot {
return FlowableLogger.log(commandSubmissionClient.submit(cs.getWorkflowId(), cs.getApplicationId(),
cs.getCommandId(), cs.getParty(), cs.getMinLedgerTimeAbsolute(), cs.getMinLedgerTimeRelative(),
cs.getDeduplicationTime(), cs.getCommands())
.flatMapMaybe(s -> Maybe.<LedgerViewFlowable.SubmissionFailure> empty())
.doOnError(t -> logger.error("Error submitting commands {} for party {}: {}", cs.getCommandId(), cs.getParty(), t.getMessage()))
.onErrorReturn(t -> new LedgerViewFlowable.SubmissionFailure(cs.getCommandId(), t))
.flatMapMaybe(s -> Maybe.<LedgerViewFlowable.SubmissionFailure>empty())
.doOnError(t -> logger.error("Error submitting commands {} for party {}: {}", cs.getCommandId(), cs.getParty(), t.getMessage()))
.onErrorReturn(t -> new LedgerViewFlowable.SubmissionFailure(cs.getCommandId(), t))
, "commandSubmissions");
};
}

View File

@ -9,13 +9,6 @@ import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger}
import java.util.{Collections, Optional, function}
import com.daml.ledger.javaapi.data.{Unit => DAMLUnit, _}
import com.daml.ledger.rxjava.components.BotTest._
import com.daml.ledger.rxjava.components.LedgerViewFlowable.LedgerView
import com.daml.ledger.rxjava.components.helpers.{CommandsAndPendingSet, CreatedContract}
import com.daml.ledger.rxjava.components.tests.helpers.DummyLedgerClient
import com.daml.ledger.rxjava.grpc.helpers.{LedgerServices, TransactionsServiceImpl}
import com.daml.ledger.rxjava.{CommandSubmissionClient, DamlLedgerClient, untestedEndpoint}
import com.daml.grpc.{GrpcException, GrpcStatus}
import com.daml.ledger.api.auth.AuthServiceWildcard
import com.daml.ledger.api.v1.command_service.{
@ -24,11 +17,19 @@ import com.daml.ledger.api.v1.command_service.{
SubmitAndWaitForTransactionTreeResponse
}
import com.daml.ledger.api.{v1 => scalaAPI}
import com.daml.ledger.javaapi.data.{Unit => DAMLUnit, _}
import com.daml.ledger.rxjava.components.BotTest._
import com.daml.ledger.rxjava.components.LedgerViewFlowable.LedgerView
import com.daml.ledger.rxjava.components.helpers.{CommandsAndPendingSet, CreatedContract}
import com.daml.ledger.rxjava.components.tests.helpers.DummyLedgerClient
import com.daml.ledger.rxjava.grpc.helpers.{LedgerServices, TransactionsServiceImpl}
import com.daml.ledger.rxjava.{CommandSubmissionClient, DamlLedgerClient, untestedEndpoint}
import com.google.protobuf.empty.Empty
import com.google.protobuf.{Empty => JEmpty}
import com.google.rpc.Status
import com.google.rpc.code.Code.OK
import io.grpc.Metadata
import io.reactivex.disposables.Disposable
import io.reactivex.{Flowable, Observable, Single}
import org.pcollections.{HashTreePMap, HashTreePSet}
import org.reactivestreams.{Subscriber, Subscription}
@ -229,24 +230,24 @@ final class BotTest extends FlatSpec with Matchers with Eventually {
}
val counter = new AtomicInteger(0)
Bot.wire(appId, ledgerClient, transactionFilter, bot, _ => counter)
using(Bot.wire(appId, ledgerClient, transactionFilter, bot, _ => counter)) {
// when the bot is wired-up, no command should have been submitted to the server
ledgerClient.submitted should have size 0
counter.get shouldBe 0
// when the bot is wired-up, no command should have been submitted to the server
ledgerClient.submitted should have size 0
counter.get shouldBe 0
// when the bot receives a transaction, a command should be submitted to the server
val createdEvent1 = create(party, templateId, id = 1)
transactions.emit(transaction(createdEvent1))
// when the bot receives a transaction, a command should be submitted to the server
val createdEvent1 = create(party, templateId, id = 1)
transactions.emit(transaction(createdEvent1))
eventually {
finishedWork.get shouldBe true
}
eventually {
finishedWork.get shouldBe true
ledgerClient.submitted should have size 3
counter.get shouldBe 3
transactions.complete()
}
ledgerClient.submitted should have size 3
counter.get shouldBe 3
transactions.complete()
}
it should "wire a bot to the ledger-client" in {
@ -311,61 +312,61 @@ final class BotTest extends FlatSpec with Matchers with Eventually {
)
}
}
Bot.wireSimple(appId, ledgerClient, transactionFilter, bot)
using(Bot.wireSimple(appId, ledgerClient, transactionFilter, bot)) {
// when the bot is wired-up, no command should have been submitted to the server
eventually {
ledgerClient.submitted should have size 0
}
// when the bot is wired-up, no command should have been submitted to the server
eventually {
ledgerClient.submitted should have size 0
}
// when the bot receives a transaction, a command should be submitted to the server
val createdEvent1 = create(party, templateId)
transactions.emit(transaction(createdEvent1))
eventually {
ledgerClient.submitted should have size 1
}
// when the bot receives a transaction, a command should be submitted to the server
val createdEvent1 = create(party, templateId)
transactions.emit(transaction(createdEvent1))
eventually {
ledgerClient.submitted should have size 1
}
val archivedEvent1 = archive(createdEvent1)
val createEvent2 = create(party, templateId)
val createEvent3 = create(party, templateId)
transactions.emit(transaction(archivedEvent1, createEvent2, createEvent3))
eventually {
ledgerClient.submitted should have size 3
}
val archivedEvent1 = archive(createdEvent1)
val createEvent2 = create(party, templateId)
val createEvent3 = create(party, templateId)
transactions.emit(transaction(archivedEvent1, createEvent2, createEvent3))
eventually {
// we complete the first command with success and then check that the client hasn't submitted a new command
commandCompletions.emit(
new CompletionStreamResponse(
Optional.of(new Checkpoint(ZeroTimestamp, new LedgerOffset.Absolute(""))),
List(
scalaAPI.CompletionOuterClass.Completion
.newBuilder()
.setCommandId("commandId_0")
.setStatus(Status.newBuilder().setCode(OK.value).build())
.build()).asJava
))
Thread.sleep(100)
ledgerClient.submitted should have size 3
// WARNING: THE FOLLOWING TEST IS NOT PASSING YET
// // we complete the second command with failure and then check that the client has submitted a new command
// commandCompletions.emit(
// new CompletionStreamResponse(
// Optional.of(new Checkpoint(ZeroTimestamp, new LedgerOffset.Absolute(""))),
// List(
// CompletionOuterClass.Completion
// .newBuilder()
// .setCommandId("commandId_1")
// .setStatus(Status.newBuilder().setCode(INVALID_ARGUMENT.value))
// .build(),
// ).asJava
// ))
// eventually {
// ledgerClient.submitted should have size 4
// }
transactions.complete()
commandCompletions.complete()
}
// we complete the first command with success and then check that the client hasn't submitted a new command
commandCompletions.emit(
new CompletionStreamResponse(
Optional.of(new Checkpoint(ZeroTimestamp, new LedgerOffset.Absolute(""))),
List(
scalaAPI.CompletionOuterClass.Completion
.newBuilder()
.setCommandId("commandId_0")
.setStatus(Status.newBuilder().setCode(OK.value).build())
.build()).asJava
))
Thread.sleep(100)
ledgerClient.submitted should have size 3
// WARNING: THE FOLLOWING TEST IS NOT PASSING YET
// // we complete the second command with failure and then check that the client has submitted a new command
// commandCompletions.emit(
// new CompletionStreamResponse(
// Optional.of(new Checkpoint(ZeroTimestamp, new LedgerOffset.Absolute(""))),
// List(
// CompletionOuterClass.Completion
// .newBuilder()
// .setCommandId("commandId_1")
// .setStatus(Status.newBuilder().setCode(INVALID_ARGUMENT.value))
// .build(),
// ).asJava
// ))
// eventually {
// ledgerClient.submitted should have size 4
// }
transactions.complete()
commandCompletions.complete()
}
it should "query first the ACS and then the LedgerEnd sequentially so that there is not race condition and LedgerEnd >= ACS offset" in {
@ -442,26 +443,28 @@ final class BotTest extends FlatSpec with Matchers with Eventually {
* error to pretty-print it. This try-catch depends on the implementation of `TransactionServiceImpl.getTransactionTree()`
*/
try {
Bot.wireSimple(
"appId",
client,
new FiltersByParty(Collections.emptyMap()),
_ => Flowable.empty())
Bot
.wireSimple(
"appId",
client,
new FiltersByParty(Collections.emptyMap()),
_ => Flowable.empty(),
)
.dispose()
} catch {
case GrpcException(GrpcStatus.INVALID_ARGUMENT(), trailers) =>
/** the tests relies on specific implementation of the [[TransactionsServiceImpl.getTransactions()]] */
fail(trailers.get(Metadata.Key.of("cause", Metadata.ASCII_STRING_MARSHALLER)))
}
// test is passed, we wait a bit to avoid issues with gRPC and then close the client. If there is an exception,
// we just ignore it as there are some problems with gRPC
// If there is an exception, we just ignore it as there are some problems with gRPC.
try {
Thread.sleep(100)
client.close()
} catch {
case NonFatal(e) =>
case NonFatal(exception) =>
logger.warn(
s"Closing DamlLedgerClient caused an error, ignoring it because it can happen and it should not be a problem. Error is $e")
"Closing DamlLedgerClient caused an error, ignoring it because it can happen and it should not be a problem",
exception)
}
}
}
@ -556,4 +559,12 @@ object BotTest {
}
}
}
private def using[A <: Disposable, B](disposable: A)(run: => B): B = {
try {
run
} finally {
disposable.dispose()
}
}
}