mirror of
https://github.com/hasura/graphql-engine.git
synced 2024-12-15 17:31:56 +03:00
342391f39d
This upgrades the version of Ormolu required by the HGE repository to v0.5.0.1, and reformats all code accordingly. Ormolu v0.5 reformats code that uses infix operators. This is mostly useful, adding newlines and indentation to make it clear which operators are applied first, but in some cases, it's unpleasant. To make this easier on the eyes, I had to do the following: * Add a few fixity declarations (search for `infix`) * Add parentheses to make precedence clear, allowing Ormolu to keep everything on one line * Rename `relevantEq` to `(==~)` in #6651 and set it to `infix 4` * Add a few _.ormolu_ files (thanks to @hallettj for helping me get started), mostly for Autodocodec operators that don't have explicit fixity declarations In general, I think these changes are quite reasonable. They mostly affect indentation. PR-URL: https://github.com/hasura/graphql-engine-mono/pull/6675 GitOrigin-RevId: cd47d87f1d089fb0bc9dcbbe7798dbceedcd7d83
278 lines
12 KiB
Haskell
278 lines
12 KiB
Haskell
{-# OPTIONS_GHC -Wno-unrecognised-pragmas #-}
|
|
|
|
module Control.Concurrent.Extended
|
|
( module Control.Concurrent,
|
|
sleep,
|
|
ForkableMonadIO,
|
|
|
|
-- * Robust forking
|
|
forkImmortal,
|
|
forkManagedT,
|
|
forkManagedTWithGracefulShutdown,
|
|
|
|
-- * Concurrency in MonadError
|
|
forConcurrentlyEIO,
|
|
concurrentlyEIO,
|
|
|
|
-- * Deprecated
|
|
ImmortalThreadLog (..),
|
|
ThreadState (..),
|
|
ThreadShutdown (..),
|
|
Forever (..),
|
|
)
|
|
where
|
|
|
|
import Control.Concurrent hiding (threadDelay)
|
|
import Control.Concurrent qualified as Base
|
|
import Control.Concurrent.Async as A
|
|
import Control.Concurrent.Async.Lifted.Safe qualified as LA
|
|
import Control.Concurrent.STM qualified as STM
|
|
import Control.Exception
|
|
import Control.Immortal qualified as Immortal
|
|
import Control.Monad.Except
|
|
import Control.Monad.Loops (iterateM_)
|
|
import Control.Monad.Trans.Control qualified as MC
|
|
import Control.Monad.Trans.Managed (ManagedT (..), allocate)
|
|
import Data.Aeson
|
|
import Data.List.Split
|
|
import Data.Traversable
|
|
import Data.Void
|
|
-- For forkImmortal. We could also have it take a cumbersome continuation if we
|
|
-- want to break this dependency. Probably best to move Hasura.Logging into a
|
|
-- separate lib with this if we do the override thing.
|
|
import Hasura.Logging
|
|
import Hasura.Prelude
|
|
|
|
{-# HLINT ignore sleep #-}
|
|
|
|
-- | Like 'Base.threadDelay', but takes a 'DiffTime' instead of an 'Int' microseconds.
|
|
--
|
|
-- NOTE: you cannot simply replace e.g. @threadDelay 1000@ with @sleep 1000@ since those literals
|
|
-- have different meanings!
|
|
sleep :: DiffTime -> IO ()
|
|
sleep = Base.threadDelay . round . Microseconds
|
|
|
|
-- | Note: Please consider using 'forkManagedT' instead to ensure reliable
|
|
-- resource cleanup.
|
|
forkImmortal ::
|
|
ForkableMonadIO m =>
|
|
-- | A label describing this thread's function (see 'labelThread').
|
|
String ->
|
|
Logger Hasura ->
|
|
-- | An IO action we expect never to return normally. This will have the type
|
|
-- signature ':: m a' (see e.g. the type of 'forever').
|
|
m Void ->
|
|
-- | A handle for the forked thread. See "Control.Immortal".
|
|
m Immortal.Thread
|
|
forkImmortal label logger m =
|
|
Immortal.createWithLabel label $ \this -> do
|
|
-- Log that the thread has started
|
|
liftIO $ unLogger logger (ImmortalThreadRestarted label)
|
|
-- In this case, we are handling unexpected exceptions.
|
|
-- i.e This does not catch the asynchronous exception which stops the thread.
|
|
Immortal.onUnexpectedFinish this logAndPause (void m)
|
|
where
|
|
logAndPause = \case
|
|
Right _void -> pure () -- absurd _void (i.e. unreachable)
|
|
Left e -> liftIO $ do
|
|
liftIO $ unLogger logger (ImmortalThreadUnexpectedException label e)
|
|
-- pause before restarting some arbitrary amount of time. The idea is not to flood
|
|
-- logs or cause other cascading failures.
|
|
sleep (seconds 1)
|
|
|
|
data ThreadState = ThreadForked | ThreadBlocking | ThreadShutdownInitiated
|
|
deriving (Show, Eq)
|
|
|
|
-- | @ThreadShutdown@ is a newtype wrapper over an action which is intended
|
|
-- to execute when a thread's shutdown is initiated before killing the thread
|
|
newtype ThreadShutdown m = ThreadShutdown {tsThreadShutdown :: m ()}
|
|
|
|
-- | This function pairs a call to 'forkImmortal' with a finalizer which stops
|
|
-- the immortal thread.
|
|
|
|
-- Note, the thread object can leave its scope if this function is incorrectly
|
|
-- used. Generally, the result should only be used later in the same ManagedT
|
|
-- scope.
|
|
forkManagedT ::
|
|
ForkableMonadIO m =>
|
|
String ->
|
|
Logger Hasura ->
|
|
m Void ->
|
|
ManagedT m Immortal.Thread
|
|
forkManagedT label logger m =
|
|
allocate
|
|
(forkImmortal label logger m)
|
|
( \thread -> do
|
|
unLogger logger (ImmortalThreadStopping label)
|
|
liftIO $ Immortal.stop thread
|
|
)
|
|
|
|
-- | The @Forever@ type defines an infinite looping monadic action (like @m void@), but allows the
|
|
-- caller to control the recursion or insert code before each iteration. The @a@ is the initial argument,
|
|
-- and subsequent iterations will be fed the argument returned by the previous one. See
|
|
-- @forkManagedTWithGracefulShutdown@ to see how it's used
|
|
data Forever m = forall a. Forever a (a -> m a)
|
|
|
|
-- | @forkManagedTWithGracefulShutdown@ is an extension of the @forkManagedT@
|
|
-- function this function also attempts to gracefully shutdown the thread. This function
|
|
-- accepts a `m (Forever m)` argument. The @Forever@ type contains a function and an argument
|
|
-- to the function. The function supplied will be run repeatedly until shutdown is initiated. The
|
|
-- response of the function will be the argument to the next iteration.
|
|
--
|
|
-- For reference, this function is used to run the async actions processor. Check
|
|
-- `asyncActionsProcessor`
|
|
forkManagedTWithGracefulShutdown ::
|
|
ForkableMonadIO m =>
|
|
String ->
|
|
Logger Hasura ->
|
|
ThreadShutdown m ->
|
|
m (Forever m) ->
|
|
ManagedT m Immortal.Thread
|
|
forkManagedTWithGracefulShutdown label logger (ThreadShutdown threadShutdownHandler) loopIteration = do
|
|
threadStateTVar <- liftIO $ STM.newTVarIO ThreadForked
|
|
allocate
|
|
( Immortal.createWithLabel label $ \this -> do
|
|
-- Log that the thread has started
|
|
liftIO $ unLogger logger (ImmortalThreadRestarted label)
|
|
-- In this case, we are handling unexpected exceptions.
|
|
-- i.e This does not catch the asynchronous exception which stops the thread.
|
|
Immortal.onUnexpectedFinish this logAndPause $
|
|
( do
|
|
let mLoop (Forever loopFunctionInitArg loopFunction) =
|
|
flip iterateM_ loopFunctionInitArg $ \args -> do
|
|
liftIO $
|
|
STM.atomically $ do
|
|
STM.readTVar threadStateTVar >>= \case
|
|
ThreadShutdownInitiated -> do
|
|
-- signal to the finalizer that we are now blocking
|
|
-- and blocking forever since this
|
|
-- var moves monotonically from forked -> shutdown -> blocking
|
|
STM.writeTVar threadStateTVar ThreadBlocking
|
|
ThreadBlocking -> STM.retry
|
|
ThreadForked -> pure ()
|
|
loopFunction args
|
|
t <- LA.async $ mLoop =<< loopIteration
|
|
LA.link t
|
|
void $ LA.wait t
|
|
)
|
|
)
|
|
( \thread -> do
|
|
liftIO $
|
|
STM.atomically $
|
|
STM.modifyTVar' threadStateTVar (const ThreadShutdownInitiated)
|
|
-- the threadShutdownHandler here will wait for any in-flight events
|
|
-- to finish processing
|
|
{-
|
|
There is a conundrum here about whether the @threadShutdownHandler@
|
|
should be before or after the @ThreadBlocking@ check call, this is because
|
|
there are problems with both the cases:
|
|
|
|
1. @threadShutdownHandler@ before the @ThreadBlocking@ check
|
|
------------------------------------------------------------
|
|
|
|
Let's say we're just about to start processing a new iteration of the
|
|
loop function and before the processing actually starts the shutdown is
|
|
initiated, there will be no in-flight events (because the batch hasn't started processing yet) so
|
|
@threadShutdownHandler@ will return immediately and the new batch will start processing
|
|
which were fetched earlier. This is a race condition and may kill the thread with some
|
|
of the events still processing.
|
|
|
|
2. @threadShutdownHandler@ after the @ThreadBlocking@ check
|
|
-----------------------------------------------------------
|
|
|
|
This will solve the above race condition but will cause a new problem. The
|
|
graphql-engine accepts a config called `--graceful-shutdown-timeout` which is a timeout
|
|
for any in-flight processing events that are running in the graphql-engine to complete
|
|
processing within this time.
|
|
|
|
Let's say we are going to start iterating over the next iteration of `processEventQueue`
|
|
and without loss of generality let's say this batch takes 100 seconds to finish processing
|
|
and the graceful shutdown timeout is 10 seconds and shutdown is initiated in the midst of processing
|
|
this batch, this will have no effect and the thread will be shutdown after the batch completes (after
|
|
100 seconds) which is wrong because it doesn't respect the graceful shutdown timeout
|
|
|
|
TODO: figure out a way which solves both the problems
|
|
|
|
At the time of writing this PR, we decided to go with 1 because the worst thing
|
|
that will happen is that some events might get processed more than once but this
|
|
is a better solution than what we had earlier where we were shutting down all the in-flight
|
|
processing events without the graceful shutdown timeout.
|
|
-}
|
|
threadShutdownHandler
|
|
liftIO $
|
|
STM.atomically $ do
|
|
STM.readTVar threadStateTVar >>= STM.check . (== ThreadBlocking)
|
|
unLogger logger (ImmortalThreadStopping label)
|
|
liftIO $ Immortal.stop thread
|
|
)
|
|
where
|
|
logAndPause = \case
|
|
Right () -> pure ()
|
|
Left e -> liftIO $ do
|
|
liftIO $ unLogger logger (ImmortalThreadUnexpectedException label e)
|
|
-- pause before restarting some arbitrary amount of time. The idea is not to flood
|
|
-- logs or cause other cascading failures.
|
|
sleep (seconds 1)
|
|
|
|
data ImmortalThreadLog
|
|
= -- | Synchronous Exception
|
|
ImmortalThreadUnexpectedException String SomeException
|
|
| -- | Asynchronous Exception about to be sent
|
|
ImmortalThreadStopping String
|
|
| ImmortalThreadRestarted String
|
|
|
|
instance ToEngineLog ImmortalThreadLog Hasura where
|
|
toEngineLog (ImmortalThreadStopping label) =
|
|
(LevelInfo, ELTInternal ILTUnstructured, toJSON msg)
|
|
where
|
|
msg = "Stopping immortal " <> label <> " thread"
|
|
toEngineLog (ImmortalThreadUnexpectedException label e) =
|
|
(LevelError, ELTInternal ILTUnstructured, toJSON msg)
|
|
where
|
|
msg =
|
|
"Unexpected exception in immortal thread "
|
|
<> label
|
|
<> " (it will be restarted):\n"
|
|
<> show e
|
|
toEngineLog (ImmortalThreadRestarted label) =
|
|
(LevelInfo, ELTInternal ILTUnstructured, toJSON msg)
|
|
where
|
|
msg = "Thread " <> label <> " (re)started"
|
|
|
|
-- TODO
|
|
-- - maybe use this everywhere, but also:
|
|
-- - consider unifying with: src-lib/Control/Monad/Stateless.hs ?
|
|
-- - nice TypeError: https://kodimensional.dev/type-errors
|
|
--
|
|
|
|
-- | Like 'MonadIO' but constrained to stacks in which forking a new thread is reasonable/safe.
|
|
-- In particular 'StateT' causes problems.
|
|
--
|
|
-- This is the constraint you can use for functions that call 'LA.async', or 'immortal'.
|
|
type ForkableMonadIO m = (MonadIO m, MC.MonadBaseControl IO m, LA.Forall (LA.Pure m))
|
|
|
|
-- TODO consider deprecating async.
|
|
-- export something with polymorphic return type, which makes "fork and forget" difficult
|
|
-- this could automatically link in one variant
|
|
-- another variant might return ThreadId that self destructs w/ finalizer (mkWeakThreadId)
|
|
-- and note: "Holding a normal ThreadId reference will prevent the delivery of BlockedIndefinitely exceptions because the reference could be used as the target of throwTo at any time, "
|
|
|
|
-- | A somewhat wonky function for parallelizing @for xs f@ where @f@ is
|
|
-- @(MonadIO m, MonadError e m)@. This is equivalent to @for xs f@ modulo the
|
|
-- IO effects (i.e. when the IO has no real side effects we care about).
|
|
--
|
|
-- This also takes a @chunkSize@ argument so you can manipulate the amount of
|
|
-- work given to each thread.
|
|
forConcurrentlyEIO :: (MonadIO m, MonadError e m) => Int -> [a] -> (a -> ExceptT e IO b) -> m [b]
|
|
forConcurrentlyEIO chunkSize xs f = do
|
|
let fIO a = runExceptT (f a) >>= evaluate
|
|
xs' <- liftIO $ fmap concat $ A.forConcurrently (chunksOf chunkSize xs) $ traverse fIO
|
|
for xs' (either throwError pure)
|
|
|
|
concurrentlyEIO :: (MonadIO m, MonadError e m) => ExceptT e IO a -> ExceptT e IO b -> m (a, b)
|
|
concurrentlyEIO left right = do
|
|
(leftE, rightE) <- liftIO $ A.concurrently (runExceptT left >>= evaluate) (runExceptT right >>= evaluate)
|
|
x <- leftE `onLeft` throwError
|
|
y <- rightE `onLeft` throwError
|
|
pure (x, y)
|