test-common: Make FiniteStreamObserver more lenient. (#7996)

* test-common: Make FiniteStreamObserver more lenient.

We can't protect against all possible race conditions; we just have to
accept that sometimes `onNext` will be called after `onCompleted`, or
`onCompleted` will be called twice, or…

CHANGELOG_BEGIN
CHANGELOG_END

* test-common: Avoid calling `onCompleted` twice in `TimeBoundObserver`.
This commit is contained in:
Samir Talwar 2020-11-18 11:45:40 +01:00 committed by GitHub
parent 43b1f9c96f
commit 1ac3160a46
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 24 additions and 10 deletions

View File

@ -19,14 +19,16 @@ private[testing] final class FiniteStreamObserver[A] extends StreamObserver[A] {
val result: Future[Vector[A]] = promise.future
override def onNext(value: A): Unit = {
val _ = items.synchronized(items += value)
override def onNext(value: A): Unit = items.synchronized {
val _ = items += value
}
override def onError(t: Throwable): Unit = promise.failure(t)
override def onError(t: Throwable): Unit = {
val _ = promise.tryFailure(t)
}
override def onCompleted(): Unit = {
val _ = items.synchronized(promise.success(items.result()))
override def onCompleted(): Unit = items.synchronized {
val _ = promise.trySuccess(items.result())
}
}

View File

@ -15,20 +15,32 @@ final class TimeBoundObserver[A](
)(delegate: StreamObserver[A])(implicit executionContext: ExecutionContext)
extends StreamObserver[A] {
private var done = false
Delayed.by(duration)(synchronized {
onCompleted()
Context.current().withCancellation().cancel(null)
if (!done) {
onCompleted()
val _ = Context.current().withCancellation().cancel(null)
}
})
override def onNext(value: A): Unit = synchronized {
delegate.onNext(value)
if (!done) {
delegate.onNext(value)
}
}
override def onError(t: Throwable): Unit = synchronized {
delegate.onError(t)
if (!done) {
delegate.onError(t)
done = true
}
}
override def onCompleted(): Unit = synchronized {
delegate.onCompleted()
if (!done) {
delegate.onCompleted()
done = true
}
}
}