Fix Timer resource teardown (#16289)

After this PR the TimerResourceOwner will make sure that none of
the scheduled task are running, after the Resource finished releasing.
Also adds asynchronous test as evidence.

[CHANGELOG_BEGIN]
[CHANGELOG_END]
This commit is contained in:
Marton Nagy 2023-02-13 13:48:47 +01:00 committed by GitHub
parent c74d950912
commit f5d18dafa4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 73 additions and 7 deletions

View File

@ -3,12 +3,29 @@
package com.daml.resources
import java.util.Timer
import scala.concurrent.Future
import java.util.{Timer, TimerTask}
import scala.concurrent.{Future, Promise}
import scala.util.Try
class TimerResourceOwner[Context: HasExecutionContext](acquireTimer: () => Timer)
extends AbstractResourceOwner[Context, Timer] {
override def acquire()(implicit context: Context): Resource[Context, Timer] =
ReleasableResource(Future(acquireTimer()))(timer => Future(timer.cancel()))
ReleasableResource(Future(acquireTimer())) { timer =>
val timerCancelledPromise = Promise[Unit]()
Future(
// We are cancel()-ing the timer in a scheduled task to make sure no scheduled tasks are running
// as the Timer Resource is released. See Timer.cancel() method's Java Documentation for more information.
timer.schedule(
new TimerTask {
override def run(): Unit =
timerCancelledPromise.complete(
Try(timer.cancel())
)
},
0L,
)
)
// if timer.schedule fails, we do not want to wait for the timerCancelledPromise
.flatMap(_ => timerCancelledPromise.future)
}
}

View File

@ -11,7 +11,6 @@ import java.util.concurrent.{
RejectedExecutionException,
}
import java.util.{Timer, TimerTask}
import com.daml.resources.FailingResourceOwner.{
FailingResourceFailedToOpen,
TriedToReleaseAFailedResource,
@ -22,8 +21,8 @@ import org.scalatest.matchers.should.Matchers
import org.scalatest.wordspec.AsyncWordSpec
import scala.annotation.nowarn
import scala.concurrent.duration.DurationInt
import scala.concurrent.{ExecutionContext, ExecutionContextExecutorService, Future, Promise}
import scala.concurrent.duration.{Duration, DurationInt}
import scala.concurrent.{Await, ExecutionContext, ExecutionContextExecutorService, Future, Promise}
import scala.jdk.CollectionConverters._
import scala.util.{Failure, Success}
@ -734,6 +733,56 @@ final class ResourceOwnerSpec extends AsyncWordSpec with Matchers {
)
}
}
"convert to a ResourceOwner, which ensures, that no TimerTask is running after the resource is released" in {
val finishLongTakingTask = Promise[Unit]()
val timerResource = Factories.forTimer(() => new Timer("test timer")).acquire()
for {
timer <- timerResource.asFuture
(timerReleased, extraTimerTaskStarted) = {
timer.schedule(
new TimerTask {
override def run(): Unit = {
Await.result(finishLongTakingTask.future, Duration(10, "seconds"))
}
},
0L,
)
info("As scheduled a long taking task")
val released = timerResource.release()
info("And as triggered release of the timer resource")
Thread.sleep(100)
info("And as waiting 100 millis")
released.isCompleted shouldBe false
info("The release should not be completed yet")
val extraTimerTaskStarted = Promise[Unit]()
timer.schedule(
new TimerTask {
override def run(): Unit = extraTimerTaskStarted.success(())
},
0L,
)
info("And scheduling a further task is still possible")
finishLongTakingTask.success(())
info("As completing the currently running timer task")
(released, extraTimerTaskStarted.future)
}
_ <- timerReleased
} yield {
info("Timer released")
an[IllegalStateException] should be thrownBy timer.schedule(
new TimerTask {
override def run(): Unit = ()
},
0,
)
info("And scheduling new task on the released timer, is not possible anymore")
Thread.sleep(100)
extraTimerTaskStarted.isCompleted shouldBe false
info("And after waiting 100 millis, the additional task is still not started")
succeed
}
}
}
"many resources in a sequence" should {