mirror of
https://github.com/digital-asset/daml.git
synced 2024-09-20 01:07:18 +03:00
Prevent scripts from restarting when users trigger other IDE actions, like hover (#16984)
* Stop the semaphore thread cleanly when a result was received * Use ContextId to disamiguate similar script runs * Test opposite side (cancellation does *not* happen after hover) * Longer timeout, drop todo for `scenario service does not interrupt` test * Add better comments, refactor 4-tuple into RunInfo
This commit is contained in:
parent
2216cd70aa
commit
ab5a9d56bb
@ -663,9 +663,8 @@ scriptTests runScripts = testGroup "scripts"
|
||||
|
||||
closeDoc script
|
||||
closeDoc main'
|
||||
-- TODO https://github.com/digital-asset/daml/issues/16772
|
||||
, localOption (mkTimeout 30000000) $ -- 30s timeout
|
||||
testCaseSteps "scenario service interrupts on non-script messages" $ \step -> runScripts $ \_stderr -> do
|
||||
, localOption (mkTimeout 60000000) $ -- 60s timeout
|
||||
testCaseSteps "scenario service does not interrupt on non-script messages" $ \step -> runScripts $ \_stderr -> do
|
||||
-- open document with long-running script
|
||||
main' <- openDoc' "Main.daml" damlId $
|
||||
T.unlines
|
||||
@ -705,14 +704,10 @@ scriptTests runScripts = testGroup "scripts"
|
||||
_ <- sendRequest STextDocumentHover (HoverParams main' (Position 4 3) Nothing)
|
||||
liftIO $ step "Hover sent..."
|
||||
|
||||
-- check that new script is started
|
||||
_ <- liftIO $ hTakeUntil _stderr "SCENARIO SERVICE STDOUT: Script started."
|
||||
liftIO $ step "New script started."
|
||||
|
||||
-- check that previous script is cancelled
|
||||
_ <- liftIO $ hTakeUntil _stderr "SCENARIO SERVICE STDOUT: Script cancelling."
|
||||
_ <- liftIO $ hTakeUntil _stderr "SCENARIO SERVICE STDOUT: Script cancelled."
|
||||
liftIO $ step "Previous script cancelled."
|
||||
-- Check that script did return and that log does not show any cancellations
|
||||
_changeResult <- waitForScriptDidChange
|
||||
_scriptFinishedMessage <- liftIO $ assertUntilWithout _stderr "SCENARIO SERVICE STDOUT: Script finished." "SCENARIO SERVICE STDOUT: Script cancelled."
|
||||
liftIO $ step "Script returned without cancellation."
|
||||
|
||||
closeDoc script
|
||||
closeDoc main'
|
||||
@ -730,9 +725,32 @@ hTakeUntil handle regex = go
|
||||
line <- hGetLine handle
|
||||
if pred line then pure (Just line) else go
|
||||
|
||||
-- Takes lines from the handle until matching the first pattern `until`. If any
|
||||
-- of the lines before the matching line match the second pattern `without`
|
||||
-- then fail the test
|
||||
-- Useful for cases where we want to assert that some message has been emitted
|
||||
-- without a different message being emitted in the interim, e.g. a script
|
||||
-- finished without restarts in between.
|
||||
assertUntilWithout :: Handle -> T.Text -> T.Text -> IO (Maybe String)
|
||||
assertUntilWithout handle until without = go
|
||||
where
|
||||
untilP = matchTest (makeRegex until :: Regex)
|
||||
withoutP = matchTest (makeRegex without :: Regex)
|
||||
go = do
|
||||
closed <- hIsClosed handle
|
||||
if closed
|
||||
then pure Nothing
|
||||
else do
|
||||
line <- hGetLine handle
|
||||
if withoutP line then
|
||||
assertFailure $ "Source line: `" <> line <> "` shouldn't match regular expression `" <> T.unpack without <> "`, but it does."
|
||||
else if untilP line then
|
||||
pure (Just line)
|
||||
else go
|
||||
|
||||
assertRegex :: T.Text -> T.Text -> Assertion
|
||||
assertRegex source regex =
|
||||
let errMsg = "Source text: `" <> T.unpack source <> "` does not match regular expression `" <> T.unpack regex <> "`."
|
||||
let errMsg = "Source text: `" <> T.unpack source <> "` should match regular expression `" <> T.unpack regex <> "`, but it doesn't."
|
||||
in
|
||||
assertBool
|
||||
errMsg
|
||||
|
@ -88,7 +88,26 @@ data Handle = Handle
|
||||
, hContextId :: IORef LowLevel.ContextId
|
||||
-- ^ The root context id, this is mutable so that rather than mutating the context
|
||||
-- we can clone it and update the clone which allows us to safely interrupt a context update.
|
||||
, hRunningHandlers :: MVar (MS.Map RunOptions (ThreadId, MVar Bool))
|
||||
, hRunningHandlers :: MVar (MS.Map RunOptions RunInfo)
|
||||
-- ^ Track running scripts as a map between the RunOptions that triggered
|
||||
-- them and all information required to cancel them or to resume from them
|
||||
-- ContextId for determining ThreadId for cancelling via asynchronous exception
|
||||
}
|
||||
|
||||
-- RunInfo stores information required for cancelling and resuming from script runs
|
||||
data RunInfo = RunInfo
|
||||
{ threadId :: ThreadId
|
||||
, context :: LowLevel.ContextId
|
||||
-- ^ If a new prospective script run has a newer context id, then threads
|
||||
-- with older contexts should be cancelled
|
||||
, stop :: MVar Bool
|
||||
-- ^ To cancel a thread, put True into this semaphore, which triggers
|
||||
-- cancellation in the corresponding lowlevel script run
|
||||
, result :: Barrier (Either LowLevel.Error LowLevel.ScenarioResult)
|
||||
-- ^ To obtain the result of a script run, listen to this barrier, which will
|
||||
-- be filled by the lowlevel script run when the script run terminates
|
||||
-- Must be a barrier so that both this run and future runs can subscribe to
|
||||
-- the same value.
|
||||
}
|
||||
|
||||
withSem :: QSemN -> IO a -> IO a
|
||||
@ -262,36 +281,52 @@ runLiveScript h ctxId logger name statusUpdateHandler = runWithOptions (RunOptio
|
||||
|
||||
runWithOptions :: RunOptions -> Handle -> LowLevel.ContextId -> IDELogger.Logger -> IO (Either LowLevel.Error LowLevel.ScenarioResult)
|
||||
runWithOptions options Handle{..} ctxId logger = do
|
||||
resVar <- newEmptyMVar
|
||||
resBarrier <- newBarrier
|
||||
stopSemaphore <- newEmptyMVar
|
||||
|
||||
-- If the internal or external thread receives a cancellation exception, signal to stop
|
||||
modifyMVar_ hRunningHandlers $ \runningHandlers -> do
|
||||
-- If there was an old thread handling the same scenario in the same
|
||||
-- way, under a different context, send a cancellation to its semaphore
|
||||
let mbOldRunInfo = MS.lookup options runningHandlers
|
||||
case mbOldRunInfo of
|
||||
Just oldRunInfo
|
||||
| context oldRunInfo == ctxId -> pure ()
|
||||
| otherwise -> do
|
||||
_ <- tryPutMVar (stop oldRunInfo) True
|
||||
pure ()
|
||||
Nothing -> pure ()
|
||||
|
||||
-- If there was an old thread handling the same scenario in the same
|
||||
-- way, under a different context, or if there was no old thread, start a
|
||||
-- new thread to replace it.
|
||||
-- Otherwise (when there is an old thread with the same context id) listen
|
||||
-- to that thread's result via its result barrier
|
||||
case mbOldRunInfo of
|
||||
Just oldRunInfo
|
||||
| context oldRunInfo == ctxId -> do
|
||||
_ <- forkIO $ do
|
||||
oldResult <- waitBarrier (result oldRunInfo)
|
||||
signalBarrier resBarrier oldResult
|
||||
pure runningHandlers
|
||||
_ -> do
|
||||
handlerThread <- forkIO $ withSem hConcurrencySem $ do
|
||||
r <- try $ optionsToLowLevel options hLowLevelHandle ctxId logger stopSemaphore
|
||||
putMVar resVar $
|
||||
signalBarrier resBarrier $
|
||||
case r of
|
||||
Left ex -> Left $ LowLevel.ExceptionError ex
|
||||
Right r -> r
|
||||
pure ()
|
||||
|
||||
-- Store the new thread into runningHandlers
|
||||
let insertLookup kx x t = MS.insertLookupWithKey (\_ a _ -> a) kx x t
|
||||
let (mbOldThread, newRunningHandlers) = insertLookup options (handlerThread, stopSemaphore) runningHandlers
|
||||
|
||||
-- If there was an old thread handling the same scenario in the same way,
|
||||
-- send a cancellation to its semaphore
|
||||
case mbOldThread of
|
||||
Just (_, oldThreadSemaphore) -> do
|
||||
_ <- tryPutMVar oldThreadSemaphore True
|
||||
pure ()
|
||||
_ -> pure ()
|
||||
|
||||
-- Return updated runningHandlers
|
||||
let selfInfo =
|
||||
RunInfo
|
||||
{ threadId = handlerThread
|
||||
, context = ctxId
|
||||
, stop = stopSemaphore
|
||||
, result = resBarrier
|
||||
}
|
||||
let newRunningHandlers = MS.insert options selfInfo runningHandlers
|
||||
pure newRunningHandlers
|
||||
res <- takeMVar resVar
|
||||
_ <- tryPutMVar stopSemaphore False
|
||||
pure res
|
||||
waitBarrier resBarrier
|
||||
|
||||
optionsToLowLevel :: RunOptions -> LowLevel.Handle -> LowLevel.ContextId -> IDELogger.Logger -> MVar Bool -> IO (Either LowLevel.Error LowLevel.ScenarioResult)
|
||||
optionsToLowLevel RunOptions{..} h ctxId logger mask =
|
||||
|
@ -483,6 +483,7 @@ runBiDiLive runner Handle{..} (ContextId ctxId) name logger stopSemaphore status
|
||||
NoResultUpdate -> loop
|
||||
_ -> pure ()
|
||||
pure ()
|
||||
_ <- tryPutMVar stopSemaphore False -- once we exit, stop the semaphore checking thread
|
||||
case response of
|
||||
ClientBiDiResponse _ StatusOk _ -> getFinalResponse
|
||||
ClientBiDiResponse _ status _ -> pure (Left (BackendError (BErrorFail status)))
|
||||
|
@ -143,6 +143,7 @@ object ScriptStream {
|
||||
}
|
||||
internal.onNext(message)
|
||||
internal.onCompleted()
|
||||
println(f"Script finished.")
|
||||
}
|
||||
|
||||
override def sendStatus(status: ScenarioStatus): Unit = {}
|
||||
@ -166,6 +167,7 @@ object ScriptStream {
|
||||
}
|
||||
internal.onNext(message)
|
||||
internal.onCompleted()
|
||||
println(f"Script finished.")
|
||||
}
|
||||
|
||||
override def sendStatus(status: ScenarioStatus): Unit = internal.synchronized {
|
||||
@ -268,9 +270,7 @@ class ScenarioService(
|
||||
println(f"Received error $t")
|
||||
}
|
||||
|
||||
override def onCompleted(): Unit = {
|
||||
println("Completed.")
|
||||
}
|
||||
override def onCompleted(): Unit = {}
|
||||
}
|
||||
}
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user