kvutils: Fix a flaky test in BatchingQueueSpec with eventually. (#9268)

This is an async test, and therefore we can't rely on the queue
processing information immediately. We need to wait at least a little.

This change brought to you by failures seen in CI.

CHANGELOG_BEGIN
CHANGELOG_END
This commit is contained in:
Samir Talwar 2021-03-29 14:55:53 +02:00 committed by GitHub
parent 7fefeb2513
commit 45b4d63d16
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -153,7 +153,7 @@ class BatchingQueueSpec
"commit batch after maxBatchSizeBytes exceeded" in {
val correlatedSubmission1 = createCorrelatedSubmission("1")
val correlatedSubmission2 = createCorrelatedSubmission("2")
val batches = mutable.ListBuffer.empty[Seq[CorrelatedSubmission]]
val batches = mutable.Buffer.empty[Seq[CorrelatedSubmission]]
val maxWaitDuration = 500.millis
@ -173,24 +173,23 @@ class BatchingQueueSpec
for {
res1 <- queue.offer(correlatedSubmission1)
// Batch not yet full, hence should not be emitted yet.
_ = res1 should be(SubmissionResult.Acknowledged)
// The batch is not yet full, so should not be emitted yet.
_ = {
batches.size should be(0)
}
res2 <- queue.offer(correlatedSubmission2)
// Batch now full, so it should have been immediately emitted.
_ = {
batches.size should be(1)
}
_ = res2 should be(SubmissionResult.Acknowledged)
} yield {
// Wait for the second batch to be emitted due to wait exceeding.
eventually(Timeout(1.second)) {
batches.size should be(2)
// The batch is now full, so it will be emitted immediately, without the second submission.
eventually(Timeout(maxWaitDuration / 2)) {
batches should be(Seq(Seq(correlatedSubmission1)))
}
// After the wait timeout, the second batch will be emitted with the second submission.
eventually(Timeout(maxWaitDuration * 2)) {
batches should be(Seq(Seq(correlatedSubmission1), Seq(correlatedSubmission2)))
}
res1 should be(SubmissionResult.Acknowledged)
res2 should be(SubmissionResult.Acknowledged)
batches.reverse should contain.only(Seq(correlatedSubmission1), Seq(correlatedSubmission2))
queue.state should be(RunningBatchingQueueState.Alive)
}
}