Added synchronous error reporting (#32)

This commit is contained in:
iko 2021-02-08 13:34:32 +03:00
parent b2c89c8551
commit f68291d5a1

View File

@ -6,10 +6,11 @@ import Control.Applicative
import Control.Concurrent (threadDelay)
import Control.Concurrent.Async (race_)
import qualified Control.Concurrent.Lifted as L
import qualified Control.Exception.Lifted as L
import Control.Concurrent.MVar
import Control.Concurrent.STM
import Control.Exception (Exception, throwIO, try)
import Control.Lens
import Control.Lens hiding (pre)
import Control.Monad
import Control.Monad.Except
import Control.Monad.Reader
@ -359,43 +360,46 @@ createH dep = do
{ errBody = validationError [badNameText] [] }
t1 <- liftIO $ now
st <- ask
let pgPool = pool st
failIfImageNotFound (name dep) (tag dep)
failIfGracefulShutdownActivated
runDeploymentBgWorker Nothing (name dep) $ do
let
q =
"INSERT INTO deployments (name, tag, status) \
\VALUES (?, ?, ?) RETURNING id"
pgPool = pool st
createDep :: PgPool -> Deployment -> IO [Only Int]
createDep p Deployment { name = n, tag = t } =
withResource p $ \conn ->
query conn q (n, t, CreatePending)
res :: Either SqlError [Only Int] <- liftIO . try $ createDep pgPool dep
dId <- case res of
Right ((Only depId) : _) ->
pure . DeploymentId $ depId
Right [] ->
throwError err404
{ errBody = validationError ["Name not found"] [] }
Left (SqlError code _ _ _ _) | code == unique_violation ->
throwError err400
{ errBody = validationError ["Deployment already exists"] [] }
Left (SqlError _ _ _ _ _) ->
throwError err409 { errBody = appError "Some database error" }
liftIO . withResource pgPool $ \conn ->
upsertNewOverrides conn dId (appOverrides dep) (deploymentOverrides dep)
liftBase $ sendReloadEvent st
liftBase $ updateDeploymentInfo (name dep) st
(ec, out, err) <- liftBase $ createDeployment dep st
t2 <- liftBase $ now
let
elTime = elapsedTime t2 t1
liftIO . withResource pgPool $ \conn ->
-- calling it directly now is fine since there is no previous status.
createDeploymentLog conn dep "create" ec elTime out err
liftBase $ sendReloadEvent st
liftBase $ handleExitCode ec
runDeploymentBgWorker Nothing (name dep)
( do
let
q =
"INSERT INTO deployments (name, tag, status) \
\VALUES (?, ?, ?) RETURNING id"
createDep :: PgPool -> Deployment -> IO [Only Int]
createDep p Deployment { name = n, tag = t } =
withResource p $ \conn ->
query conn q (n, t, CreatePending)
res :: Either SqlError [Only Int] <- liftIO . try $ createDep pgPool dep
case res of
Right ((Only depId) : _) ->
pure . DeploymentId $ depId
Right [] ->
throwError err404
{ errBody = validationError ["Name not found"] [] }
Left (SqlError code _ _ _ _) | code == unique_violation ->
throwError err400
{ errBody = validationError ["Deployment already exists"] [] }
Left (SqlError _ _ _ _ _) ->
throwError err409 { errBody = appError "Some database error" }
)
$ \dId -> do
liftIO . withResource pgPool $ \conn ->
upsertNewOverrides conn dId (appOverrides dep) (deploymentOverrides dep)
liftBase $ sendReloadEvent st
liftBase $ updateDeploymentInfo (name dep) st
(ec, out, err) <- liftBase $ createDeployment dep st
t2 <- liftBase $ now
let
elTime = elapsedTime t2 t1
liftIO . withResource pgPool $ \conn ->
-- calling it directly now is fine since there is no previous status.
createDeploymentLog conn dep "create" ec elTime out err
liftBase $ sendReloadEvent st
liftBase $ handleExitCode ec
pure Success
-- | Updates deployment info.
@ -652,7 +656,7 @@ archiveH dName = do
, "--name", coerce dName
]
cmd = coerce $ archiveCommand st
runDeploymentBgWorker (Just ArchivePending) dName $ do
runDeploymentBgWorker (Just ArchivePending) dName (pure ()) $ \() -> do
log $ "call " <> unwords (cmd : args)
(ec, out, err) <- runCommand (unpack cmd) (unpack <$> args)
t2 <- liftBase now
@ -685,7 +689,7 @@ updateH dName dUpdate = do
dId <- selectDeploymentId pgPool dName
failIfImageNotFound dName dTag
failIfGracefulShutdownActivated
runDeploymentBgWorker (Just UpdatePending) dName $ do
runDeploymentBgWorker (Just UpdatePending) dName (pure ()) $ \() -> do
(appOvs, depOvs) <- liftBase . withResource pgPool $ \conn ->
withTransaction conn $ do
deleteOldOverrides conn dId oldAppOvs oldDepOvs
@ -930,7 +934,7 @@ cleanupH :: DeploymentName -> AppM CommandResponse
cleanupH dName = do
failIfGracefulShutdownActivated
st <- ask
runDeploymentBgWorker Nothing dName $ liftBase $ cleanupDeployment dName st
runDeploymentBgWorker Nothing dName (pure ()) $ \ () -> liftBase $ cleanupDeployment dName st
pure Success
-- | Helper to cleanup deployment.
@ -1005,7 +1009,7 @@ cleanArchiveH = do
withResource pgPool $ \conn -> query conn q (In archivedStatuses, archRetention)
runBgWorker . void $
for retrieved $ \(Only dName) ->
runDeploymentBgWorker Nothing dName $ liftBase $ cleanupDeployment dName st
runDeploymentBgWorker Nothing dName (pure ()) $ \() -> liftBase $ cleanupDeployment dName st
pure Success
@ -1018,7 +1022,7 @@ restoreH dName = do
dep <- selectDeployment dName
failIfImageNotFound (name dep) (tag dep)
failIfGracefulShutdownActivated
runDeploymentBgWorker (Just CreatePending) dName $ do
runDeploymentBgWorker (Just CreatePending) dName (pure ()) $ \() -> do
dep' <- selectDeployment dName
liftBase $ updateDeploymentInfo dName st
(ec, out, err) <- liftBase $ createDeployment dep' st
@ -1316,27 +1320,39 @@ runBgWorker act = void $ L.forkFinally act' cleanup
-- | Same as 'runBgWorker' but also locks the specified deployment.
runDeploymentBgWorker
:: (MonadBaseControl IO m, MonadReader AppState m, MonadError ServerError m)
=> Maybe DeploymentStatus -> DeploymentName -> m () -> m ()
runDeploymentBgWorker newS dName m = do
:: forall m a. (MonadBaseControl IO m, MonadReader AppState m, MonadError ServerError m)
=> Maybe DeploymentStatus
-> DeploymentName
-> m a
-- ^ Think of this as running synchronously.
-- The resulting monad state will be forwarded to the calling context.
-> (a -> m ())
-- ^ The computation to run asynchronously.
-> m ()
runDeploymentBgWorker newS dName pre post = do
st <- ask
(err :: MVar (Either ServerError ())) <- L.newEmptyMVar
mainThread <- L.myThreadId
(err :: MVar (Either ServerError (StM m a))) <- L.newEmptyMVar
runBgWorker $ withLockedDeployment
dName
(L.putMVar err $ Left err409 {errBody = "The deployment is currently being processed."})
( do
let
ok = do
L.putMVar err $ Right ()
L.try (liftBaseWith \runInBase -> runInBase pre) >>= \case
Left (e :: L.SomeException) -> L.throwTo mainThread e
Right x -> do
proceed <- flip L.finally (L.tryPutMVar err (Left err500)) $ case newS of
Just newS' -> do
runExceptT (assertDeploymentTransitionPossibleS dName newS') >>= \case
Left e -> L.putMVar err (Left e) $> False
Right () -> L.putMVar err (Right x) $> True
Nothing -> L.putMVar err (Right x) $> True
liftBase (sendReloadEvent st)
m
case newS of
Just newS' ->
runExceptT (assertDeploymentTransitionPossibleS dName newS') >>= \case
Left e -> L.putMVar err $ Left e
Right () -> ok
Nothing -> ok
when proceed $ restoreM x >>= post
)
L.readMVar err >>= \case
Left e -> throwError e
Right () -> return ()
Right (stm :: StM m a) -> do
-- This might be a bad idea in general, but with just ReaderT and ExceptT
-- it should be fine.
_ :: a <- restoreM stm
return ()