king: Misc Small Cleanup.

This commit is contained in:
~siprel 2020-05-28 18:21:43 +00:00
parent ca13d3f79b
commit d8f90ead07
3 changed files with 59 additions and 109 deletions

View File

@ -31,7 +31,7 @@ import Data.Conduit
import Data.Text (append) import Data.Text (append)
import System.Posix.Files (ownerModes, setFileMode) import System.Posix.Files (ownerModes, setFileMode)
import Urbit.King.App (HasConfigDir(..), HasStderrLogFunc(..)) import Urbit.King.App (HasConfigDir(..), HasStderrLogFunc(..))
import Urbit.Time (Wen) -- ort Urbit.Time (Wen)
import Urbit.Vere.Ames (ames) import Urbit.Vere.Ames (ames)
import Urbit.Vere.Behn (behn) import Urbit.Vere.Behn (behn)
import Urbit.Vere.Clay (clay) import Urbit.Vere.Clay (clay)
@ -49,7 +49,7 @@ import qualified Urbit.Vere.Term as Term
import qualified Urbit.Vere.Term.API as Term import qualified Urbit.Vere.Term.API as Term
import qualified Urbit.Vere.Term.Demux as Term import qualified Urbit.Vere.Term.Demux as Term
import qualified Urbit.Vere.Term.Render as Term import qualified Urbit.Vere.Term.Render as Term
import qualified Data.Conduit.Combinators as CC -- ort qualified Data.Conduit.Combinators as CC
-------------------------------------------------------------------------------- --------------------------------------------------------------------------------
@ -275,15 +275,18 @@ pier (serf, log) vStderr mStart = do
let stubErrCallback = \_ -> pure () let stubErrCallback = \_ -> pure ()
tExe <- startDrivers >>= router (readTQueue executeQ) let computeConfig = ComputeConfig
tDisk <- runPersist log persistQ (writeTQueue executeQ) { ccOnWork = (,stubErrCallback) <$> readTQueue computeQ
tCpu <- runCompute serf , ccOnKill = takeTMVar shutdownM
((,stubErrCallback) <$> readTQueue computeQ) , ccOnSave = takeTMVar saveM
(takeTMVar saveM) , ccPutResult = writeTQueue persistQ
(takeTMVar shutdownM) , ccShowSpinner = Term.spin muxed
(Term.spin muxed) , ccHideSpinner = Term.stopSpin muxed
(Term.stopSpin muxed) }
(writeTQueue persistQ)
tExe <- startDrivers >>= acquireWorker . router (readTQueue executeQ)
tDisk <- acquireWorkerBound (runPersist log persistQ (writeTQueue executeQ))
tCpu <- acquireWorker (runCompute serf computeConfig)
tSaveSignal <- saveSignalThread saveM tSaveSignal <- saveSignalThread saveM
@ -359,29 +362,24 @@ drivers inst who isFake plan shutdownSTM termSys stderr =
-- Route Effects to Drivers ---------------------------------------------------- -- Route Effects to Drivers ----------------------------------------------------
router :: HasLogFunc e => STM FX -> Drivers e -> RAcquire e (Async ()) router :: HasLogFunc e => STM FX -> Drivers e -> RIO e ()
router waitFx Drivers{..} = router waitFx Drivers {..} = forever $ do
mkRAcquire start cancel fx <- atomically waitFx
where for_ fx $ \ef -> do
start = async $ forever $ do logEffect ef
fx <- atomically waitFx case ef of
for_ fx $ \ef -> do GoodParse (EfVega _ _ ) -> error "TODO"
logEffect ef GoodParse (EfExit _ _ ) -> error "TODO"
case ef of GoodParse (EfVane (VEAmes ef)) -> dAmes ef
GoodParse (EfVega _ _) -> error "TODO" GoodParse (EfVane (VEBehn ef)) -> dBehn ef
GoodParse (EfExit _ _) -> error "TODO" GoodParse (EfVane (VEBoat ef)) -> dSync ef
GoodParse (EfVane (VEAmes ef)) -> dAmes ef GoodParse (EfVane (VEClay ef)) -> dSync ef
GoodParse (EfVane (VEBehn ef)) -> dBehn ef GoodParse (EfVane (VEHttpClient ef)) -> dHttpClient ef
GoodParse (EfVane (VEBoat ef)) -> dSync ef GoodParse (EfVane (VEHttpServer ef)) -> dHttpServer ef
GoodParse (EfVane (VEClay ef)) -> dSync ef GoodParse (EfVane (VENewt ef)) -> dNewt ef
GoodParse (EfVane (VEHttpClient ef)) -> dHttpClient ef GoodParse (EfVane (VESync ef)) -> dSync ef
GoodParse (EfVane (VEHttpServer ef)) -> dHttpServer ef GoodParse (EfVane (VETerm ef)) -> dTerm ef
GoodParse (EfVane (VENewt ef)) -> dNewt ef FailParse n -> logError $ display $ pack @Text (ppShow n)
GoodParse (EfVane (VESync ef)) -> dSync ef
GoodParse (EfVane (VETerm ef)) -> dTerm ef
FailParse n -> logError
$ display
$ pack @Text (ppShow n)
-- Compute Thread -------------------------------------------------------------- -- Compute Thread --------------------------------------------------------------
@ -407,29 +405,6 @@ data ComputeRequest
| CRSave () | CRSave ()
| CRShutdown () | CRShutdown ()
runCompute
:: forall e
. HasLogFunc e
=> Serf
-> STM (Ev, Serf.RunError -> IO ())
-> STM ()
-> STM ()
-> (Maybe Text -> STM ())
-> STM ()
-> ((Fact, FX) -> STM ())
-> RAcquire e (Async ())
runCompute serf getEvent getSaveSignal getShutdownSignal showSpinner hideSpinner putResult = do
acquireWorker (newRunCompute serf config)
where
config = ComputeConfig
{ ccOnWork = getEvent
, ccOnKill = getShutdownSignal
, ccOnSave = getSaveSignal
, ccPutResult = putResult
, ccShowSpinner = showSpinner
, ccHideSpinner = hideSpinner
}
{- {-
TODO Pack and Peek TODO Pack and Peek
-} -}
@ -460,17 +435,6 @@ ipcSource onEvent onSave onKill = loop
yield (Serf.RunWork ev cb) yield (Serf.RunWork ev cb)
loop loop
fromRightErr :: Either a b -> IO b
fromRightErr (Left l) = error "unexpected Left value"
fromRightErr (Right r) = pure r
data Fact = Fact
{ factEve :: EventId
, factMug :: Mug
, factWen :: Wen
, factNon :: Noun
}
data ComputeConfig = ComputeConfig data ComputeConfig = ComputeConfig
{ ccOnWork :: STM (Ev, Serf.RunError -> IO ()) { ccOnWork :: STM (Ev, Serf.RunError -> IO ())
, ccOnKill :: STM () , ccOnKill :: STM ()
@ -480,10 +444,10 @@ data ComputeConfig = ComputeConfig
, ccHideSpinner :: STM () , ccHideSpinner :: STM ()
} }
newRunCompute runCompute
:: forall e . HasLogFunc e => Serf.Serf -> ComputeConfig -> RIO e () :: forall e . HasLogFunc e => Serf.Serf -> ComputeConfig -> RIO e ()
newRunCompute serf ComputeConfig {..} = do runCompute serf ComputeConfig {..} = do
logTrace "newRunCompute" logTrace "runCompute"
runConduit runConduit
$ ipcSource ccOnWork ccOnSave ccOnKill $ ipcSource ccOnWork ccOnSave ccOnKill
.| Serf.running serf (atomically . onStatusChange) .| Serf.running serf (atomically . onStatusChange)
@ -493,7 +457,7 @@ newRunCompute serf ComputeConfig {..} = do
sendResults = await >>= \case sendResults = await >>= \case
Nothing -> pure () Nothing -> pure ()
Just (Serf.RunOutput e m w nounEv fx) -> do Just (Serf.RunOutput e m w nounEv fx) -> do
lift $ logTrace "newRunCompute: Got play result" lift $ logTrace "runCompute: Got play result"
atomically $ ccPutResult (Fact e m w nounEv, GoodParse <$> fx) -- TODO GoodParse atomically $ ccPutResult (Fact e m w nounEv, GoodParse <$> fx) -- TODO GoodParse
sendResults sendResults
@ -521,19 +485,17 @@ runPersist
=> EventLog => EventLog
-> TQueue (Fact, FX) -> TQueue (Fact, FX)
-> (FX -> STM ()) -> (FX -> STM ())
-> RAcquire e (Async ()) -> RIO e ()
runPersist log inpQ out = mkRAcquire runThread cancel runPersist log inpQ out = do
where dryRun <- view dryRunL
runThread :: RIO e (Async ()) forever $ do
runThread = asyncBound $ do writs <- atomically getBatchFromQueue
dryRun <- view dryRunL events <- validateFactsAndGetBytes (fst <$> toNullable writs)
forever $ do unless dryRun (Log.appendEvents log events)
writs <- atomically getBatchFromQueue atomically $ for_ writs $ \(_, fx) -> do
events <- validateFactsAndGetBytes (fst <$> toNullable writs) out fx
unless dryRun (Log.appendEvents log events)
atomically $ for_ writs $ \(_, fx) -> do
out fx
where
validateFactsAndGetBytes :: [Fact] -> RIO e (Vector ByteString) validateFactsAndGetBytes :: [Fact] -> RIO e (Vector ByteString)
validateFactsAndGetBytes facts = do validateFactsAndGetBytes facts = do
expect <- Log.nextEv log expect <- Log.nextEv log

View File

@ -95,6 +95,13 @@ data IODriver = IODriver
, startDriver :: (Ev -> STM ()) -> IO (Async (), Perform) , startDriver :: (Ev -> STM ()) -> IO (Async (), Perform)
} }
data Fact = Fact
{ factEve :: EventId
, factMug :: Mug
, factWen :: Wen
, factNon :: Noun
}
-- Instances ------------------------------------------------------------------- -- Instances -------------------------------------------------------------------

View File

@ -1,46 +1,27 @@
{-# OPTIONS_GHC -Wwarn #-}
{-| {-|
Serf Interface High-Level Serf Interface
TODO: `recvLen` is not big-endian safe.
-} -}
module Urbit.Vere.Serf module Urbit.Vere.Serf
( module Urbit.Vere.Serf.IPC ( module Urbit.Vere.Serf.IPC
, withSerf , withSerf
, execReplay , execReplay
, shutdown
, snapshot
) )
where where
import Urbit.Prelude import Urbit.Prelude
import Data.Conduit import Data.Conduit
import System.Process -- ort System.ProgressBar
import System.ProgressBar -- ort Urbit.Arvo
import Urbit.Arvo
import Urbit.Vere.Pier.Types import Urbit.Vere.Pier.Types
import Urbit.Vere.Serf.IPC import Urbit.Vere.Serf.IPC
import System.Posix.Signals
import Data.Bits (setBit) -- ort Urbit.King.App (HasStderrLogFunc(..))
import Data.ByteString (hGet)
import Data.ByteString.Unsafe (unsafeUseAsCString)
import Foreign.Marshal.Alloc (alloca)
import Foreign.Ptr (castPtr)
import Foreign.Storable (peek, poke)
import System.Exit (ExitCode)
import Urbit.King.App (HasStderrLogFunc(..))
import qualified Data.ByteString.Unsafe as BS
import qualified Data.Conduit.Combinators as CC import qualified Data.Conduit.Combinators as CC
import qualified Data.Text as T -- ort qualified Urbit.Ob as Ob
import qualified System.IO as IO -- ort qualified Urbit.Time as Time
import qualified System.IO.Error as IO
import qualified Urbit.Ob as Ob
import qualified Urbit.Time as Time
import qualified Urbit.Vere.Log as Log import qualified Urbit.Vere.Log as Log