Merge remote-tracking branch 'origin/bs/king-logging' into eg/uterm

This commit is contained in:
Elliot Glaysher 2019-08-29 10:30:53 -07:00
commit cdcdc6a59e
15 changed files with 736 additions and 470 deletions

View File

@ -103,12 +103,11 @@
- `Trace`: TODO What does this do?
-}
module Main where
module Main (main) where
import ClassyPrelude
import UrbitPrelude
import Options.Applicative
import Options.Applicative.Help.Pretty
import Data.RAcquire
import Arvo
import Control.Exception hiding (evaluate, throwIO)
@ -120,9 +119,10 @@ import Vere.Pier
import Vere.Pier.Types
import Vere.Serf
import Control.Concurrent (runInBoundThread, threadDelay)
import Control.Concurrent (runInBoundThread)
import Control.Lens ((&))
import System.Directory (doesFileExist, removeFile)
import System.Directory (getHomeDirectory, createDirectoryIfMissing)
import System.Environment (getProgName)
import Text.Show.Pretty (pPrint)
import Urbit.Time (Wen)
@ -135,92 +135,151 @@ import qualified Vere.Serf as Serf
--------------------------------------------------------------------------------
class HasAppName env where
appNameL :: Lens' env Utf8Builder
data App = App
{ _appLogFunc :: !LogFunc
, _appName :: !Utf8Builder
}
makeLenses ''App
instance HasLogFunc App where
logFuncL = appLogFunc
instance HasAppName App where
appNameL = appName
runApp :: RIO App a -> IO a
runApp inner = do
home <- getHomeDirectory
let logDir = home <> "/log"
createDirectoryIfMissing True logDir
withTempFile logDir "king-" $ \tmpFile hFile -> do
hSetBuffering hFile LineBuffering
logOptions <- logOptionsHandle hFile True
<&> setLogUseTime True
<&> setLogUseLoc False
withLogFunc logOptions $ \logFunc -> do
let app = App { _appLogFunc = logFunc
, _appName = "Alice"
}
runRIO app inner
--------------------------------------------------------------------------------
example :: IO ()
example = runApp sayHello
sayHello :: RIO App ()
sayHello = do
name <- view appName
logDebug $ "Hello, " <> name
logInfo $ "Hello, " <> name
logWarn $ "Hello, " <> name
logError $ "Hello, " <> name
--------------------------------------------------------------------------------
zod :: Ship
zod = 0
--------------------------------------------------------------------------------
removeFileIfExists :: FilePath -> IO ()
removeFileIfExists :: HasLogFunc env => FilePath -> RIO env ()
removeFileIfExists pax = do
exists <- doesFileExist pax
exists <- io $ doesFileExist pax
when exists $ do
removeFile pax
catchAny :: IO a -> (SomeException -> IO a) -> IO a
catchAny = Control.Exception.catch
io $ removeFile pax
--------------------------------------------------------------------------------
wipeSnapshot :: FilePath -> IO ()
wipeSnapshot :: HasLogFunc env => FilePath -> RIO env ()
wipeSnapshot shipPath = do
putStrLn "wipeSnapshot"
removeFileIfExists (shipPath <> "/.urb/chk/north.bin")
removeFileIfExists (shipPath <> "/.urb/chk/south.bin")
print (shipPath <> "/.urb/chk/north.bin")
print (shipPath <> "/.urb/chk/south.bin")
putStrLn "SNAPSHOT WIPED"
logTrace "wipeSnapshot"
logDebug $ display $ pack @Text ("Wiping " <> north)
logDebug $ display $ pack @Text ("Wiping " <> south)
removeFileIfExists north
removeFileIfExists south
where
north = shipPath <> "/.urb/chk/north.bin"
south = shipPath <> "/.urb/chk/south.bin"
tryBootFromPill :: FilePath -> FilePath -> Ship -> IO ()
--------------------------------------------------------------------------------
tryBootFromPill :: HasLogFunc e => FilePath -> FilePath -> Ship -> RIO e ()
tryBootFromPill pillPath shipPath ship = do
wipeSnapshot shipPath
with (Pier.booted pillPath shipPath [] ship) $ \(serf, log, ss) -> do
print "lul"
print ss
threadDelay 500000
shutdown serf 0 >>= print
putStrLn "[tryBootFromPill] Booted!"
rwith (Pier.booted pillPath shipPath [] ship) $ \(serf, log, ss) -> do
logTrace "Booting"
logTrace $ displayShow ss
io $ threadDelay 500000
ss <- shutdown serf 0
logTrace $ displayShow ss
logTrace "Booted!"
runAcquire :: (MonadUnliftIO m, MonadIO m)
=> Acquire a -> m a
runAcquire act = with act pure
tryPlayShip :: FilePath -> IO ()
runRAcquire :: (MonadUnliftIO (m e), MonadIO (m e), MonadReader e (m e))
=> RAcquire e a -> m e a
runRAcquire act = rwith act pure
tryPlayShip :: HasLogFunc e => FilePath -> RIO e ()
tryPlayShip shipPath = do
runAcquire $ do
putStrLn "RESUMING SHIP"
runRAcquire $ do
rio $ logTrace "RESUMING SHIP"
sls <- Pier.resumed shipPath []
putStrLn "SHIP RESUMED"
rio $ logTrace "SHIP RESUMED"
Pier.pier shipPath Nothing sls
tryResume :: FilePath -> IO ()
tryResume :: HasLogFunc e => FilePath -> RIO e ()
tryResume shipPath = do
with (Pier.resumed shipPath []) $ \(serf, log, ss) -> do
print ss
rwith (Pier.resumed shipPath []) $ \(serf, log, ss) -> do
logTrace (displayShow ss)
threadDelay 500000
shutdown serf 0 >>= print
putStrLn "[tryResume] Resumed!"
ss <- shutdown serf 0
logTrace (displayShow ss)
logTrace "Resumed!"
tryFullReplay :: FilePath -> IO ()
tryFullReplay :: HasLogFunc e => FilePath -> RIO e ()
tryFullReplay shipPath = do
wipeSnapshot shipPath
tryResume shipPath
--------------------------------------------------------------------------------
checkEvs :: FilePath -> Word64 -> Word64 -> IO ()
checkEvs :: forall e. HasLogFunc e => FilePath -> Word64 -> Word64 -> RIO e ()
checkEvs pierPath first last = do
with (Log.existing logPath) $ \log -> do
rwith (Log.existing logPath) $ \log -> do
let ident = Log.identity log
print ident
logTrace (displayShow ident)
runConduit $ Log.streamEvents log first
.| showEvents first (fromIntegral $ lifecycleLen ident)
where
logPath :: FilePath
logPath = pierPath <> "/.urb/log"
showEvents :: EventId -> EventId -> ConduitT ByteString Void IO ()
showEvents :: EventId -> EventId -> ConduitT ByteString Void (RIO e) ()
showEvents eId _ | eId > last = pure ()
showEvents eId cycle =
await >>= \case
Nothing -> print "Everything checks out."
Nothing -> lift $ logTrace "Everything checks out."
Just bs -> do
liftIO $ do
n <- cueBSExn bs
lift $ do
n <- io $ cueBSExn bs
when (eId > cycle) $ do
(mug, wen, evNoun) <- unpackJob n
fromNounErr evNoun & either print pure
fromNounErr evNoun &
either (logError . displayShow) pure
showEvents (succ eId) cycle
unpackJob :: Noun -> IO (Mug, Wen, Noun)
unpackJob n = fromNounExn n
unpackJob :: Noun -> RIO e (Mug, Wen, Noun)
unpackJob = io . fromNounExn
--------------------------------------------------------------------------------
@ -229,49 +288,36 @@ checkEvs pierPath first last = do
so this should never actually be created. We just do this to avoid
letting the serf use an existing snapshot.
-}
collectAllFx :: FilePath -> IO ()
collectAllFx :: e. HasLogFunc e => FilePath -> RIO e ()
collectAllFx top = do
putStrLn (pack top)
with collectedFX $ \() ->
putStrLn "[collectAllFx] Done collecting effects!"
logTrace $ display $ pack @Text top
rwith collectedFX $ \() ->
logTrace "Done collecting effects!"
where
tmpDir :: FilePath
tmpDir = top <> "/.tmpdir"
collectedFX :: Acquire ()
collectedFX :: RAcquire e ()
collectedFX = do
log <- Log.existing (top <> "/.urb/log")
serf <- Serf.run (Serf.Config tmpDir serfFlags)
liftIO (Serf.collectFX serf log)
rio $ Serf.collectFX serf log
serfFlags :: Serf.Flags
serfFlags = [Serf.Hashless, Serf.DryRun]
--------------------------------------------------------------------------------
tryDoStuff :: FilePath -> IO ()
tryDoStuff shipPath = runInBoundThread $ do
let pillPath = "/home/benjamin/r/urbit/bin/solid.pill"
ship = zod
-- tryResume shipPath
tryPlayShip shipPath
-- tryFullReplay shipPath
pure ()
--------------------------------------------------------------------------------
{-
Interesting
-}
testPill :: FilePath -> Bool -> Bool -> IO ()
testPill :: HasLogFunc e => FilePath -> Bool -> Bool -> RIO e ()
testPill pax showPil showSeq = do
putStrLn "Reading pill file."
pillBytes <- readFile pax
putStrLn "Cueing pill file."
pillNoun <- cueBS pillBytes & either throwIO pure
pillNoun <- io $ cueBS pillBytes & either throwIO pure
putStrLn "Parsing pill file."
pill <- fromNounErr pillNoun & either (throwIO . uncurry ParseErr) pure
@ -289,13 +335,14 @@ testPill pax showPil showSeq = do
when showPil $ do
putStrLn "\n\n== Pill ==\n"
pPrint pill
io $ pPrint pill
when showSeq $ do
putStrLn "\n\n== Boot Sequence ==\n"
pPrint bootSeq
io $ pPrint bootSeq
validateNounVal :: (Eq a, ToNoun a, FromNoun a) => a -> IO ByteString
validateNounVal :: (HasLogFunc e, Eq a, ToNoun a, FromNoun a)
=> a -> RIO e ByteString
validateNounVal inpVal = do
putStrLn " jam"
inpByt <- evaluate $ jamBS $ toNoun inpVal
@ -324,17 +371,17 @@ validateNounVal inpVal = do
--------------------------------------------------------------------------------
newShip :: CLI.New -> CLI.Opts -> IO ()
newShip :: HasLogFunc e => CLI.New -> CLI.Opts -> RIO e ()
newShip CLI.New{..} _ = do
tryBootFromPill nPillPath pierPath (Ship 0)
where
pierPath = fromMaybe ("./" <> unpack nShipAddr) nPierPath
runShip :: CLI.Run -> CLI.Opts -> IO ()
runShip :: HasLogFunc e => CLI.Run -> CLI.Opts -> RIO e ()
runShip (CLI.Run pierPath) _ = tryPlayShip pierPath
main :: IO ()
main = CLI.parseArgs >>= \case
main = CLI.parseArgs >>= runApp . \case
CLI.CmdRun r o -> runShip r o
CLI.CmdNew n o -> newShip n o
CLI.CmdBug (CLI.CollectAllFX pax) -> collectAllFx pax
@ -342,23 +389,20 @@ main = CLI.parseArgs >>= \case
CLI.CmdBug (CLI.ValidateEvents pax f l) -> checkEvs pax f l
CLI.CmdBug (CLI.ValidateFX pax f l) -> checkFx pax f l
-- tryParseFX "/home/benjamin/zod-fx" 1 100000000
-- tryParseFX "/home/benjamin/testnet-zod-fx" 1 100000000
validatePill :: FilePath -> IO ()
validatePill = const (pure ())
--------------------------------------------------------------------------------
checkFx :: FilePath -> Word64 -> Word64 -> IO ()
checkFx :: HasLogFunc e
=> FilePath -> Word64 -> Word64 -> RIO e ()
checkFx pierPath first last =
with (Log.existing logPath) $ \log ->
rwith (Log.existing logPath) $ \log ->
runConduit $ streamFX log first last
.| tryParseFXStream
where
logPath = pierPath <> "/.urb/log"
streamFX :: Log.EventLog -> Word64 -> Word64 -> ConduitT () ByteString IO ()
streamFX :: HasLogFunc e
=> Log.EventLog -> Word64 -> Word64
-> ConduitT () ByteString (RIO e) ()
streamFX log first last = do
Log.streamEffectsRows log first .| loop
where
@ -366,34 +410,18 @@ streamFX log first last = do
Just (eId, bs) | eId > last -> pure ()
Just (eId, bs) -> yield bs >> loop
tryParseFXStream :: ConduitT ByteString Void IO ()
tryParseFXStream = loop 0 (mempty :: Set (Text, Noun))
tryParseFXStream :: HasLogFunc e => ConduitT ByteString Void (RIO e) ()
tryParseFXStream = loop
where
loop 1 pax = for_ (setToList pax) print
loop errors pax =
await >>= \case
Nothing -> for_ (setToList pax) $ \(t,n) ->
putStrLn (t <> ": " <> tshow n)
loop = await >>= \case
Nothing -> pure ()
Just bs -> do
n <- liftIO (cueBSExn bs)
fromNounErr n & \case
Left err -> print err >> loop (errors + 1) pax
Right [] -> loop errors pax
Right (fx :: FX) -> do
-- pax <- pure $ Set.union pax
-- $ setFromList
-- $ fx <&> \(Effect p v) -> (getTag v, toNoun p)
loop errors pax
fromNounErr n & either (logError . displayShow) pure
loop
{-
getTag :: Effect -> Text
getTag fx =
let n = toNoun fx
in case n of
A _ -> maybe "ERR" unCord (fromNoun n)
C h _ -> maybe "ERR" unCord (fromNoun h)
tryCopyLog :: IO ()
tryCopyLog = do
let logPath = "/Users/erg/src/urbit/zod/.urb/falselog/"

View File

@ -0,0 +1,136 @@
module Data.RAcquire where
{-
( RAcquire (..)
, Allocated (..)
, with
, mkRAcquire
, ReleaseType (..)
, mkRAcquireType
) where
-}
import Prelude
import qualified Control.Exception as E
import qualified Control.Monad.Catch as C ()
import qualified Data.Acquire.Internal as Act
import Control.Applicative (Applicative(..))
import Control.Monad (ap, liftM)
import Control.Monad.IO.Unlift (MonadIO(..), MonadUnliftIO, withRunInIO)
import Data.Typeable (Typeable)
import Control.Monad.Reader
import RIO (RIO, runRIO)
--------------------------------------------------------------------------------
data ReleaseType
= ReleaseEarly
| ReleaseNormal
| ReleaseException
deriving (Show, Read, Eq, Ord, Enum, Bounded, Typeable)
data Allocated e a
= Allocated !a !(ReleaseType -> RIO e ())
newtype RAcquire e a
= RAcquire ((forall b. RIO e b -> RIO e b) -> RIO e (Allocated e a))
deriving Typeable
--------------------------------------------------------------------------------
class MonadRIO m where
liftRIO :: RIO e a -> m e a
instance MonadRIO RIO where
liftRIO = id
class MonadAcquire m where
liftAcquire :: Act.Acquire a -> m a
--------------------------------------------------------------------------------
instance Functor (RAcquire e) where
fmap = liftM
instance Applicative (RAcquire e) where
pure a = RAcquire (\_ -> return (Allocated a (const $ return ())))
(<*>) = ap
instance Monad (RAcquire e) where
return = pure
RAcquire f >>= g' = RAcquire $ \restore -> do
env <- ask
Allocated x free1 <- f restore
let RAcquire g = g' x
Allocated y free2 <- liftIO $ E.onException
(runRIO env $ g restore)
(runRIO env $ free1 ReleaseException)
return $! Allocated y $ \rt ->
liftIO $ E.finally (runRIO env $ free2 rt)
(runRIO env $ free1 rt)
instance MonadReader e (RAcquire e) where
ask = liftRIO ask
local mod (RAcquire f) = RAcquire $ \restore -> local mod (f restore)
--------------------------------------------------------------------------------
instance MonadRIO RAcquire where
liftRIO f = RAcquire $ \restore -> do
x <- restore f
return $! Allocated x (const $ return ())
instance MonadIO (RAcquire e) where
liftIO = liftRIO . liftIO
unTransRIO :: e -> (RIO e a -> RIO e a) -> IO a -> IO a
unTransRIO env trans act = runRIO env $ trans $ liftIO act
instance MonadAcquire (RAcquire e) where
liftAcquire (Act.Acquire f) = do
env <- liftRIO ask
RAcquire $ \restore -> do
fmap fixAllo $ liftIO $ f $ unTransRIO env restore
where
fixAllo (Act.Allocated x y) = Allocated x $ fmap liftIO (y . fixTy)
fixTy = \case
ReleaseEarly -> Act.ReleaseEarly
ReleaseNormal -> Act.ReleaseNormal
ReleaseException -> Act.ReleaseException
--------------------------------------------------------------------------------
mkRAcquire :: RIO e a
-> (a -> RIO e ())
-> RAcquire e a
mkRAcquire create free = RAcquire $ \restore -> do
x <- restore create
return $! Allocated x (const $ free x)
mkRAcquireType
:: RIO e a -- ^ acquire the resource
-> (a -> ReleaseType -> RIO e ()) -- ^ free the resource
-> RAcquire e a
mkRAcquireType create free = RAcquire $ \restore -> do
x <- restore create
return $! Allocated x (free x)
transRIO :: e -> (IO a -> IO a) -> RIO e a -> RIO e a
transRIO env trans act = liftIO $ trans $ runRIO env act
rwith :: (MonadUnliftIO (m e), MonadReader e (m e))
=> RAcquire e a
-> (a -> m e b)
-> m e b
rwith (RAcquire f) g = do
env <- ask
withRunInIO $ \run -> E.mask $ \restore -> do
Allocated x free <- runRIO env $ f $ transRIO env restore
res <- E.onException (restore $ run $ g x)
(runRIO env $ free ReleaseException)
runRIO env $ free ReleaseNormal
return res

View File

@ -178,7 +178,7 @@ instance Show BadNoun where
instance Exception BadNoun where
fromNounExn :: FromNoun a => Noun -> IO a
fromNounExn :: MonadIO m => FromNoun a => Noun -> m a
fromNounExn n = runParser (parseNoun n) [] onFail onSuccess
where
onFail p m = throwIO (BadNoun p m)

View File

@ -26,7 +26,7 @@ import qualified Data.Vector.Primitive as VP
cueBS :: ByteString -> Either DecodeErr Noun
cueBS = doGet dNoun
cueBSExn :: ByteString -> IO Noun
cueBSExn :: MonadIO m => ByteString -> m Noun
cueBSExn bs =
cueBS bs & \case
Left e -> throwIO e
@ -35,7 +35,7 @@ cueBSExn bs =
cue :: Atom -> Either DecodeErr Noun
cue = cueBS . view atomBytes
cueExn :: Atom -> IO Noun
cueExn :: MonadIO m => Atom -> m Noun
cueExn atm = cueBSExn (atm ^. atomBytes)

View File

@ -33,8 +33,8 @@ start timer@(Timer vSt man) time cb = do
stop timer
now <- Sys.getSystemTime
let sleep = sysTimeGapMicroSecs now time
print (now, time, "->", sleep)
if (sleep <= 0) then (print "ug" >> fire) else do
-- print (now, time, "->", sleep)
if (sleep <= 0) then fire else do
key <- Ev.registerTimeout man sleep fire
atomicWriteIORef vSt $! Just key

View File

@ -3,10 +3,14 @@ module UrbitPrelude
, module Control.Arrow
, module Control.Lens
, module Data.Acquire
, module Data.RAcquire
, module Data.Void
, module Noun
, module Text.Show.Pretty
, module Text.Printf
, module RIO
, io, rio
, logTrace
) where
import ClassyPrelude
@ -16,7 +20,36 @@ import Control.Lens hiding (Index, cons, index, snoc, uncons, unsnoc, (<.>),
(<|))
import Control.Arrow ((<<<), (>>>))
import Data.RAcquire (RAcquire, mkRAcquire, rwith)
import Data.RAcquire (MonadRIO(..), MonadAcquire(..))
import Data.Acquire (Acquire, mkAcquire, with)
import Data.Void (Void, absurd)
import Text.Printf (printf)
import Text.Show.Pretty (pPrint, ppShow)
import RIO (RIO, runRIO)
import RIO (Utf8Builder, display, displayShow)
import RIO (threadDelay)
import RIO ( HasLogFunc
, LogFunc
, logError
, logInfo
, logWarn
, logDebug
, logOther
, logFuncL
, logOptionsHandle
, withLogFunc
, setLogUseTime
, setLogUseLoc
)
io :: MonadIO m => IO a -> m a
io = liftIO
rio :: MonadRIO m => RIO e a -> m e a
rio = liftRIO
logTrace :: HasLogFunc e => Utf8Builder -> RIO e ()
logTrace = logOther "trace"

View File

@ -7,8 +7,6 @@ import Network.Socket hiding (recvFrom, sendTo)
import Network.Socket.ByteString
import Vere.Pier.Types
import Control.Concurrent (threadDelay)
import qualified Urbit.Time as Time
@ -92,17 +90,17 @@ _turfText = intercalate "." . reverse . fmap unCord . unTurf
TODO verify that the KingIds match on effects.
-}
ames :: KingId -> Ship -> Maybe Port -> QueueEv
-> ([Ev], Acquire (EffCb NewtEf))
-> ([Ev], Acquire (EffCb e NewtEf))
ames inst who mPort enqueueEv =
(initialEvents, runAmes)
where
initialEvents :: [Ev]
initialEvents = [barnEv inst]
runAmes :: Acquire (EffCb NewtEf)
runAmes :: Acquire (EffCb e NewtEf)
runAmes = do
drv <- mkAcquire start stop
pure (handleEffect drv)
pure (io . handleEffect drv)
start :: IO AmesDrv
start = do

View File

@ -21,19 +21,19 @@ wakeEv = EvBlip $ BlipEvBehn $ BehnEvWake () ()
sysTime = view Time.systemTime
behn :: KingId -> QueueEv -> ([Ev], Acquire (EffCb BehnEf))
behn :: KingId -> QueueEv -> ([Ev], Acquire (EffCb e BehnEf))
behn king enqueueEv =
(initialEvents, runBehn)
where
initialEvents = [bornEv king]
runBehn :: Acquire (EffCb BehnEf)
runBehn :: Acquire (EffCb e BehnEf)
runBehn = do
tim <- mkAcquire Timer.init Timer.stop
pure (handleEf tim)
handleEf :: Timer -> BehnEf -> IO ()
handleEf b = \case
handleEf :: Timer -> BehnEf -> RIO e ()
handleEf b = io . \case
BehnEfVoid v -> absurd v
BehnEfDoze (i, ()) mWen -> do
when (i == king) (doze b mWen)

View File

@ -119,36 +119,37 @@ reorgHttpEvent = \case
- Keeps the MVar lock until the restart process finishes.
-}
restartService :: forall s
. MVar (Maybe s)
-> IO s
-> (s -> IO ())
-> IO (Either SomeException s)
restartService :: e s. HasLogFunc e
=> MVar (Maybe s)
-> RIO e s
-> (s -> RIO e ())
-> RIO e (Either SomeException s)
restartService vServ sstart kkill = do
putStrLn "restartService"
logDebug "restartService"
modifyMVar vServ $ \case
Nothing -> doStart
Just sv -> doRestart sv
where
doRestart :: s -> IO (Maybe s, Either SomeException s)
doRestart :: s -> RIO e (Maybe s, Either SomeException s)
doRestart serv = do
putStrLn "doStart"
logDebug "doStart"
try (kkill serv) >>= \case
Left exn -> pure (Nothing, Left exn)
Right () -> doStart
doStart :: IO (Maybe s, Either SomeException s)
doStart :: RIO e (Maybe s, Either SomeException s)
doStart = do
putStrLn "doStart"
logDebug "doStart"
try sstart <&> \case
Right s -> (Just s, Right s)
Left exn -> (Nothing, Left exn)
stopService :: MVar (Maybe s)
-> (s -> IO ())
-> IO (Either SomeException ())
stopService :: HasLogFunc e
=> MVar (Maybe s)
-> (s -> RIO e ())
-> RIO e (Either SomeException ())
stopService vServ kkill = do
putStrLn "stopService"
logDebug "stopService"
modifyMVar vServ $ \case
Nothing -> pure (Nothing, Right ())
Just sv -> do res <- try (kkill sv)
@ -186,10 +187,10 @@ newLiveReq var = do
-- Ports File ------------------------------------------------------------------
removePortsFile :: FilePath -> IO ()
removePortsFile :: FilePath -> RIO e ()
removePortsFile pax =
doesFileExist pax >>= \case
True -> removeFile pax
io (doesFileExist pax) >>= \case
True -> io $ removeFile pax
False -> pure ()
portsFileText :: Ports -> Text
@ -200,7 +201,7 @@ portsFileText Ports{..} =
, Just (tshow (unPort pLoop) <> " insecure loopback")
]
writePortsFile :: FilePath -> Ports -> IO ()
writePortsFile :: FilePath -> Ports -> RIO e ()
writePortsFile f = writeFile f . encodeUtf8 . portsFileText
@ -224,12 +225,12 @@ cookMeth = H.parseMethod . W.requestMethod >>> \case
reqIdCord :: ReqId -> Cord
reqIdCord = Cord . tshow
reqBody :: W.Request -> IO (Maybe File)
reqBody :: W.Request -> RIO e (Maybe File)
reqBody req = do
bodyLbs <- W.strictRequestBody req
if length bodyLbs == 0
then pure $ Nothing
else pure $ Just $ File $ Octs (toStrict bodyLbs)
bodyLbs <- io $ W.strictRequestBody req
pure $ if length bodyLbs == 0
then Nothing
else Just $ File $ Octs (toStrict bodyLbs)
reqAddr :: W.Request -> Address
reqAddr = W.remoteHost >>> \case
@ -295,7 +296,7 @@ data Req
- If %bloc before %head, collect it and wait for %head.
- If %done before %head, ignore all chunks and produce Nothing.
-}
getReq :: TQueue RespAction -> IO Req
getReq :: TQueue RespAction -> RIO e Req
getReq tmv = go []
where
go çunks = atomically (readTQueue tmv) >>= \case
@ -309,12 +310,15 @@ getReq tmv = go []
- Yield the data from %bloc action.
- Close the stream when we hit a %done action.
-}
streamBlocks :: [File] -> TQueue RespAction -> ConduitT () (Flush Builder) IO ()
streamBlocks init tmv =
streamBlocks :: HasLogFunc e
=> e -> [File] -> TQueue RespAction
-> ConduitT () (Flush Builder) IO ()
streamBlocks env init tmv =
for_ init yieldÇunk >> go
where
yieldFlush = \x -> yield (Chunk x) >> yield Flush
logDupHead = putStrLn "Multiple %head actions on one request"
logDupHead = runRIO env
$ logError "Multiple %head actions on one request"
yieldÇunk = \case
"" -> pure ()
@ -326,17 +330,19 @@ streamBlocks init tmv =
RABloc c -> yieldÇunk c
RADone -> pure ()
sendResponse :: (W.Response -> IO W.ResponseReceived)
sendResponse :: HasLogFunc e
=> (W.Response -> IO W.ResponseReceived)
-> TQueue RespAction
-> IO W.ResponseReceived
-> RIO e W.ResponseReceived
sendResponse cb tmv = do
env <- ask
getReq tmv >>= \case
RNone -> cb $ W.responseLBS (H.mkStatus 444 "No Response") []
RNone -> io $ cb $ W.responseLBS (H.mkStatus 444 "No Response") []
$ ""
RFull h f -> cb $ W.responseLBS (hdrStatus h) (hdrHeaders h)
RFull h f -> io $ cb $ W.responseLBS (hdrStatus h) (hdrHeaders h)
$ fromStrict $ concat $ unOcts . unFile <$> f
RHead h i -> cb $ W.responseSource (hdrStatus h) (hdrHeaders h)
$ streamBlocks i tmv
RHead h i -> io $ cb $ W.responseSource (hdrStatus h) (hdrHeaders h)
$ streamBlocks env i tmv
where
hdrHeaders :: ResponseHeader -> [H.Header]
hdrHeaders = unconvertHeaders . headers
@ -344,15 +350,18 @@ sendResponse cb tmv = do
hdrStatus :: ResponseHeader -> H.Status
hdrStatus = toEnum . fromIntegral . statusCode
liveReq :: TVar LiveReqs -> Acquire (ReqId, TQueue RespAction)
liveReq vLiv = mkAcquire ins del
liveReq :: TVar LiveReqs -> RAcquire e (ReqId, TQueue RespAction)
liveReq vLiv = mkRAcquire ins del
where
ins = atomically (newLiveReq vLiv)
del = atomically . rmLiveReq vLiv . fst
app :: ServId -> TVar LiveReqs -> (Ev -> STM ()) -> WhichServer -> W.Application
app sId liv plan which req respond = do
with (liveReq liv) $ \(reqId, respVar) -> do
app :: HasLogFunc e
=> e -> ServId -> TVar LiveReqs -> (Ev -> STM ()) -> WhichServer
-> W.Application
app env sId liv plan which req respond =
runRIO env $
rwith (liveReq liv) $ \(reqId, respVar) -> do
body <- reqBody req
meth <- maybe (error "bad method") pure (cookMeth req)
@ -364,8 +373,9 @@ app sId liv plan which req respond = do
try (sendResponse respond respVar) >>= \case
Right rr -> pure rr
Left exn -> do atomically $ plan (cancelEv sId reqId)
putStrLn ("Exception during request" <> tshow exn)
Left exn -> do
io $ atomically $ plan (cancelEv sId reqId)
logError $ display ("Exception during request" <> tshow exn)
throwIO (exn :: SomeException)
@ -374,20 +384,21 @@ app sId liv plan which req respond = do
{-
TODO Need to find an open port.
-}
startServ :: FilePath -> HttpServerConf -> (Ev -> STM ())
-> IO Serv
startServ :: HasLogFunc e
=> FilePath -> HttpServerConf -> (Ev -> STM ())
-> RIO e Serv
startServ pierPath conf plan = do
putStrLn "startServ"
logDebug "startServ"
let tls = hscSecure conf <&> \(PEM key, PEM cert) ->
(W.tlsSettingsMemory (cordBytes cert) (cordBytes key))
sId <- ServId . UV . fromIntegral <$> (randomIO :: IO Word32)
sId <- io $ ServId . UV . fromIntegral <$> (randomIO :: IO Word32)
liv <- newTVarIO emptyLiveReqs
(httpPortInt, httpSock) <- W.openFreePort -- 8080 -- 80 if real ship
(httpsPortInt, httpsSock) <- W.openFreePort -- 8443 -- 443 if real ship
(loopPortInt, loopSock) <- W.openFreePort -- 12321 -- ??? if real ship
(httpPortInt, httpSock) <- io $ W.openFreePort -- 8080 -- 80 if real ship
(httpsPortInt, httpsSock) <- io $ W.openFreePort -- 8443 -- 443 if real ship
(loopPortInt, loopSock) <- io $ W.openFreePort -- 12321 -- ??? if real ship
let httpPort = Port (fromIntegral httpPortInt)
httpsPort = Port (fromIntegral httpsPortInt)
@ -399,29 +410,34 @@ startServ pierPath conf plan = do
httpOpts = W.defaultSettings & W.setPort (fromIntegral httpPort)
httpsOpts = W.defaultSettings & W.setPort (fromIntegral httpsPort)
putStrLn "Starting loopback server"
loopTid <- async $ W.runSettingsSocket loopOpts loopSock
$ app sId liv plan Loopback
env <- ask
putStrLn "Starting HTTP server"
httpTid <- async $ W.runSettingsSocket httpOpts httpSock
$ app sId liv plan Insecure
logDebug "Starting loopback server"
loopTid <- async $ io
$ W.runSettingsSocket loopOpts loopSock
$ app env sId liv plan Loopback
putStrLn "Starting HTTPS server"
logDebug "Starting HTTP server"
httpTid <- async $ io
$ W.runSettingsSocket httpOpts httpSock
$ app env sId liv plan Insecure
logDebug "Starting HTTPS server"
httpsTid <- for tls $ \tlsOpts ->
async $ W.runTLSSocket tlsOpts httpsOpts httpsSock
$ app sId liv plan Secure
async $ io
$ W.runTLSSocket tlsOpts httpsOpts httpsSock
$ app env sId liv plan Secure
let por = Ports (tls <&> const httpsPort) httpPort loopPort
fil = pierPath <> "/.http.ports"
print (sId, por, fil)
logDebug $ displayShow (sId, por, fil)
putStrLn "END startServ"
logDebug "Finished started HTTP Servers"
pure $ Serv sId conf loopTid httpTid httpsTid por fil liv
killServ :: Serv -> IO ()
killServ :: HasLogFunc e => Serv -> RIO e ()
killServ Serv{..} = do
cancel sLoopTid
cancel sHttpTid
@ -431,47 +447,50 @@ killServ Serv{..} = do
(void . waitCatch) sHttpTid
traverse_ (void . waitCatch) sHttpsTid
kill :: Drv -> IO ()
kill :: HasLogFunc e => Drv -> RIO e ()
kill (Drv v) = stopService v killServ >>= fromEither
respond :: Drv -> ReqId -> HttpEvent -> IO ()
respond :: HasLogFunc e
=> Drv -> ReqId -> HttpEvent -> RIO e ()
respond (Drv v) reqId ev = do
readMVar v >>= \case
Nothing -> pure ()
Just sv -> do (print (reorgHttpEvent ev))
Just sv -> do logDebug $ displayShow $ reorgHttpEvent ev
for_ (reorgHttpEvent ev) $
atomically . respondToLiveReq (sLiveReqs sv) reqId
serv :: FilePath -> KingId -> QueueEv -> ([Ev], Acquire (EffCb HttpServerEf))
serv :: e. HasLogFunc e
=> FilePath -> KingId -> QueueEv
-> ([Ev], RAcquire e (EffCb e HttpServerEf))
serv pier king plan =
(initialEvents, runHttpServer)
where
initialEvents :: [Ev]
initialEvents = [bornEv king]
runHttpServer :: Acquire (EffCb HttpServerEf)
runHttpServer = handleEf <$> mkAcquire (Drv <$> newMVar Nothing) kill
runHttpServer :: RAcquire e (EffCb e HttpServerEf)
runHttpServer = handleEf <$> mkRAcquire (Drv <$> newMVar Nothing) kill
restart :: Drv -> HttpServerConf -> IO Serv
restart :: Drv -> HttpServerConf -> RIO e Serv
restart (Drv var) conf = do
putStrLn "Restarting http server"
logDebug "Restarting http server"
res <- fromEither =<< restartService var (startServ pier conf plan) killServ
putStrLn "Done restating http server"
logDebug "Done restating http server"
pure res
handleEf :: Drv -> HttpServerEf -> IO ()
handleEf :: Drv -> HttpServerEf -> RIO e ()
handleEf drv = \case
HSESetConfig (i, ()) conf -> do
-- print (i, king)
-- when (i == fromIntegral king) $ do
putStrLn "restarting"
logDebug "restarting"
Serv{..} <- restart drv conf
putStrLn "Enqueue %live"
logDebug "Enqueue %live"
atomically $ plan (liveEv sServId sPorts)
putStrLn "Write ports file"
logDebug "Write ports file"
writePortsFile sPortsFile sPorts
HSEResponse (i, req, _seq, ()) ev -> do
-- print (i, king)
-- when (i == fromIntegral king) $ do
putStrLn "respond"
logDebug "respond"
respond drv (fromIntegral req) ev

View File

@ -8,13 +8,13 @@ module Vere.Log ( EventLog, identity, nextEv
, streamEffectsRows, writeEffectsRow
) where
import ClassyPrelude hiding (init)
import Data.Acquire
import UrbitPrelude hiding (init)
import Data.RAcquire
import Data.Conduit
import Database.LMDB.Raw
import Foreign.Marshal.Alloc
import Foreign.Ptr
import Noun
import Vere.Pier.Types
import Foreign.Storable (peek, poke, sizeOf)
@ -26,6 +26,7 @@ import qualified Data.Vector as V
-- Types -----------------------------------------------------------------------
type Env = MDB_env
type Val = MDB_val
type Txn = MDB_txn
type Dbi = MDB_dbi
type Cur = MDB_cursor
@ -39,10 +40,10 @@ data EventLog = EventLog
, numEvents :: IORef EventId
}
nextEv :: EventLog -> IO EventId
nextEv :: EventLog -> RIO e EventId
nextEv = fmap succ . readIORef . numEvents
lastEv :: EventLog -> IO EventId
lastEv :: EventLog -> RIO e EventId
lastEv = readIORef . numEvents
data EventLogExn
@ -63,17 +64,18 @@ instance Exception EventLogExn where
-- Open/Close an Event Log -----------------------------------------------------
rawOpen :: FilePath -> IO Env
rawOpen dir = do
putStrLn $ pack ("PAX: " <> dir)
rawOpen :: MonadIO m => FilePath -> m Env
rawOpen dir = io $ do
env <- mdb_env_create
mdb_env_set_maxdbs env 3
mdb_env_set_mapsize env (40 * 1024 * 1024 * 1024)
mdb_env_open env dir []
pure env
create :: FilePath -> LogIdentity -> IO EventLog
create :: HasLogFunc e => FilePath -> LogIdentity -> RIO e EventLog
create dir id = do
logDebug $ display (pack @Text $ "Creating LMDB database: " <> dir)
logDebug $ display (pack @Text $ "Log Identity: " <> show id)
env <- rawOpen dir
(m, e, f) <- createTables env
clearEvents env e
@ -81,28 +83,31 @@ create dir id = do
EventLog env m e f id <$> newIORef 0
where
createTables env =
with (writeTxn env) $ \txn ->
rwith (writeTxn env) $ \txn -> io $
(,,) <$> mdb_dbi_open txn (Just "META") [MDB_CREATE]
<*> mdb_dbi_open txn (Just "EVENTS") [MDB_CREATE, MDB_INTEGERKEY]
<*> mdb_dbi_open txn (Just "EFFECTS") [MDB_CREATE, MDB_INTEGERKEY]
open :: FilePath -> IO EventLog
open :: HasLogFunc e => FilePath -> RIO e EventLog
open dir = do
logDebug $ display (pack @Text $ "Opening LMDB database: " <> dir)
env <- rawOpen dir
(m, e, f) <- openTables env
id <- getIdent env m
logDebug $ display (pack @Text $ "Log Identity: " <> show id)
numEvs <- getNumEvents env e
EventLog env m e f id <$> newIORef numEvs
where
openTables env =
with (writeTxn env) $ \txn ->
rwith (writeTxn env) $ \txn -> io $
(,,) <$> mdb_dbi_open txn (Just "META") []
<*> mdb_dbi_open txn (Just "EVENTS") [MDB_INTEGERKEY]
<*> mdb_dbi_open txn (Just "EFFECTS") [MDB_CREATE, MDB_INTEGERKEY]
close :: EventLog -> IO ()
close (EventLog env meta events effects _ _) = do
mdb_dbi_close env meta
close :: HasLogFunc e => FilePath -> EventLog -> RIO e ()
close dir (EventLog env meta events effects _ _) = do
logDebug $ display (pack @Text $ "Closing LMDB database: " <> dir)
io $ do mdb_dbi_close env meta
mdb_dbi_close env events
mdb_dbi_close env effects
mdb_env_sync_flush env
@ -111,11 +116,11 @@ close (EventLog env meta events effects _ _) = do
-- Create a new event log or open an existing one. -----------------------------
existing :: FilePath -> Acquire EventLog
existing dir = mkAcquire (open dir) close
existing :: HasLogFunc e => FilePath -> RAcquire e EventLog
existing dir = mkRAcquire (open dir) (close dir)
new :: FilePath -> LogIdentity -> Acquire EventLog
new dir id = mkAcquire (create dir id) close
new :: HasLogFunc e => FilePath -> LogIdentity -> RAcquire e EventLog
new dir id = mkRAcquire (create dir id) (close dir)
-- Read/Write Log Identity -----------------------------------------------------
@ -125,22 +130,22 @@ new dir id = mkAcquire (create dir id) close
Use this when opening database handles.
-}
_openTxn :: Env -> Acquire Txn
_openTxn env = mkAcquire begin commit
_openTxn :: Env -> RAcquire e Txn
_openTxn env = mkRAcquire begin commit
where
begin = mdb_txn_begin env Nothing True
commit = mdb_txn_commit
begin = io $ mdb_txn_begin env Nothing True
commit = io . mdb_txn_commit
{-
A read-only transaction that aborts at the end.
Use this when reading data from already-opened databases.
-}
readTxn :: Env -> Acquire Txn
readTxn env = mkAcquire begin abort
readTxn :: Env -> RAcquire e Txn
readTxn env = mkRAcquire begin abort
where
begin = mdb_txn_begin env Nothing True
abort = mdb_txn_abort
begin = io $ mdb_txn_begin env Nothing True
abort = io . mdb_txn_abort
{-
A read-write transaction that commits upon sucessful completion and
@ -148,42 +153,44 @@ readTxn env = mkAcquire begin abort
Use this when reading data from already-opened databases.
-}
writeTxn :: Env -> Acquire Txn
writeTxn env = mkAcquireType begin finalize
writeTxn :: Env -> RAcquire e Txn
writeTxn env = mkRAcquireType begin finalize
where
begin = mdb_txn_begin env Nothing False
finalize txn = \case
begin = io $ mdb_txn_begin env Nothing False
finalize txn = io . \case
ReleaseNormal -> mdb_txn_commit txn
ReleaseEarly -> mdb_txn_commit txn
ReleaseException -> mdb_txn_abort txn
cursor :: Txn -> Dbi -> Acquire Cur
cursor txn dbi = mkAcquire open close
cursor :: Txn -> Dbi -> RAcquire e Cur
cursor txn dbi = mkRAcquire open close
where
open = mdb_cursor_open txn dbi
close = mdb_cursor_close
open = io $ mdb_cursor_open txn dbi
close = io . mdb_cursor_close
getIdent :: Env -> Dbi -> IO LogIdentity
getIdent env dbi =
getIdent :: HasLogFunc e => Env -> Dbi -> RIO e LogIdentity
getIdent env dbi = do
logDebug "Reading log identity"
getTbl env >>= traverse decodeIdent >>= \case
Nothing -> throwIO NoLogIdentity
Just li -> pure li
where
decodeIdent :: (Noun, Noun, Noun) -> IO LogIdentity
decodeIdent :: (Noun, Noun, Noun) -> RIO e LogIdentity
decodeIdent = fromNounExn . toNoun
getTbl :: Env -> IO (Maybe (Noun, Noun, Noun))
getTbl :: Env -> RIO e (Maybe (Noun, Noun, Noun))
getTbl env = do
with (readTxn env) $ \txn -> do
rwith (readTxn env) $ \txn -> do
who <- getMb txn dbi "who"
fake <- getMb txn dbi "is-fake"
life <- getMb txn dbi "life"
pure $ (,,) <$> who <*> fake <*> life
writeIdent :: Env -> Dbi -> LogIdentity -> IO ()
writeIdent :: HasLogFunc e => Env -> Dbi -> LogIdentity -> RIO e ()
writeIdent env metaTbl ident@LogIdentity{..} = do
logDebug "Writing log identity"
let flags = compileWriteFlags []
with (writeTxn env) $ \txn -> do
rwith (writeTxn env) $ \txn -> do
x <- putNoun flags txn metaTbl "who" (toNoun who)
y <- putNoun flags txn metaTbl "is-fake" (toNoun isFake)
z <- putNoun flags txn metaTbl "life" (toNoun lifecycleLen)
@ -193,30 +200,30 @@ writeIdent env metaTbl ident@LogIdentity{..} = do
-- Latest Event Number ---------------------------------------------------------
getNumEvents :: Env -> Dbi -> IO Word64
getNumEvents :: Env -> Dbi -> RIO e Word64
getNumEvents env eventsTbl =
with (readTxn env) $ \txn ->
with (cursor txn eventsTbl) $ \cur ->
withKVPtrs nullVal nullVal $ \pKey pVal ->
mdb_cursor_get MDB_LAST cur pKey pVal >>= \case
rwith (readTxn env) $ \txn ->
rwith (cursor txn eventsTbl) $ \cur ->
withKVPtrs' nullVal nullVal $ \pKey pVal ->
io $ mdb_cursor_get MDB_LAST cur pKey pVal >>= \case
False -> pure 0
True -> peek pKey >>= mdbValToWord64
-- Write Events ----------------------------------------------------------------
clearEvents :: Env -> Dbi -> IO ()
clearEvents :: Env -> Dbi -> RIO e ()
clearEvents env eventsTbl =
with (writeTxn env) $ \txn ->
with (cursor txn eventsTbl) $ \cur ->
withKVPtrs nullVal nullVal $ \pKey pVal -> do
let loop = mdb_cursor_get MDB_LAST cur pKey pVal >>= \case
rwith (writeTxn env) $ \txn ->
rwith (cursor txn eventsTbl) $ \cur ->
withKVPtrs' nullVal nullVal $ \pKey pVal -> do
let loop = io (mdb_cursor_get MDB_LAST cur pKey pVal) >>= \case
False -> pure ()
True -> do mdb_cursor_del (compileWriteFlags []) cur
True -> do io $ mdb_cursor_del (compileWriteFlags []) cur
loop
loop
appendEvents :: EventLog -> Vector ByteString -> IO ()
appendEvents :: EventLog -> Vector ByteString -> RIO e ()
appendEvents log !events = do
numEvs <- readIORef (numEvents log)
next <- pure (numEvs + 1)
@ -225,15 +232,15 @@ appendEvents log !events = do
where
flags = compileWriteFlags [MDB_NOOVERWRITE]
doAppend = \kvs ->
with (writeTxn $ env log) $ \txn ->
rwith (writeTxn $ env log) $ \txn ->
for_ kvs $ \(k,v) -> do
putBytes flags txn (eventsTbl log) k v >>= \case
True -> pure ()
False -> throwIO (BadWriteEvent k)
writeEffectsRow :: EventLog -> EventId -> ByteString -> IO ()
writeEffectsRow :: EventLog -> EventId -> ByteString -> RIO e ()
writeEffectsRow log k v = do
with (writeTxn $ env log) $ \txn ->
rwith (writeTxn $ env log) $ \txn ->
putBytes flags txn (effectsTbl log) k v >>= \case
True -> pure ()
False -> throwIO (BadWriteEffect k)
@ -244,21 +251,24 @@ writeEffectsRow log k v = do
--------------------------------------------------------------------------------
-- Read Events -----------------------------------------------------------------
streamEvents :: EventLog -> Word64
-> ConduitT () ByteString IO ()
streamEvents :: HasLogFunc e
=> EventLog -> Word64
-> ConduitT () ByteString (RIO e) ()
streamEvents log first = do
last <- liftIO $ lastEv log
batch <- liftIO (readBatch log first)
last <- lift $ lastEv log
batch <- lift $ readBatch log first
unless (null batch) $ do
for_ batch yield
streamEvents log (first + word (length batch))
streamEffectsRows :: EventLog -> EventId
-> ConduitT () (Word64, ByteString) IO ()
streamEffectsRows :: e. HasLogFunc e
=> EventLog -> EventId
-> ConduitT () (Word64, ByteString) (RIO e) ()
streamEffectsRows log = go
where
go :: EventId -> ConduitT () (Word64, ByteString) (RIO e) ()
go next = do
batch <- liftIO $ readRowsBatch (env log) (effectsTbl log) next
batch <- lift $ readRowsBatch (env log) (effectsTbl log) next
unless (null batch) $ do
for_ batch yield
go (next + fromIntegral (length batch))
@ -268,7 +278,7 @@ streamEffectsRows log = go
Throws `MissingEvent` if an event was missing from the log.
-}
readBatch :: EventLog -> Word64 -> IO (V.Vector ByteString)
readBatch :: EventLog -> Word64 -> RIO e (V.Vector ByteString)
readBatch log first = start
where
start = do
@ -277,59 +287,66 @@ readBatch log first = start
then pure mempty
else readRows $ fromIntegral $ min 1000 $ ((last+1) - first)
assertFound :: EventId -> Bool -> IO ()
assertFound :: EventId -> Bool -> RIO e ()
assertFound id found = do
unless found $ throwIO $ MissingEvent id
readRows count =
withWordPtr first $ \pIdx ->
withKVPtrs (MDB_val 8 (castPtr pIdx)) nullVal $ \pKey pVal ->
with (readTxn $ env log) $ \txn ->
with (cursor txn $ eventsTbl log) $ \cur -> do
assertFound first =<< mdb_cursor_get MDB_SET_KEY cur pKey pVal
withKVPtrs' (MDB_val 8 (castPtr pIdx)) nullVal $ \pKey pVal ->
rwith (readTxn $ env log) $ \txn ->
rwith (cursor txn $ eventsTbl log) $ \cur -> do
assertFound first =<< io (mdb_cursor_get MDB_SET_KEY cur pKey pVal)
fetchRows count cur pKey pVal
fetchRows count cur pKey pVal = do
V.generateM count $ \i -> do
key <- peek pKey >>= mdbValToWord64
val <- peek pVal >>= mdbValToBytes
env <- ask
V.generateM count $ \i -> runRIO env $ do
key <- io $ peek pKey >>= mdbValToWord64
val <- io $ peek pVal >>= mdbValToBytes
idx <- pure (first + word i)
unless (key == idx) $ throwIO $ MissingEvent idx
when (count /= succ i) $ do
assertFound idx =<< mdb_cursor_get MDB_NEXT cur pKey pVal
assertFound idx =<< io (mdb_cursor_get MDB_NEXT cur pKey pVal)
pure val
{-
Read 1000 rows from the database, starting from key `first`.
-}
readRowsBatch :: Env -> Dbi -> Word64 -> IO (V.Vector (Word64, ByteString))
readRowsBatch :: e. HasLogFunc e
=> Env -> Dbi -> Word64 -> RIO e (V.Vector (Word64, ByteString))
readRowsBatch env dbi first = readRows
where
readRows = do
-- print ("readRows", first)
logDebug $ displayShow ("readRows", first)
withWordPtr first $ \pIdx ->
withKVPtrs (MDB_val 8 (castPtr pIdx)) nullVal $ \pKey pVal ->
with (readTxn env) $ \txn ->
with (cursor txn dbi) $ \cur ->
mdb_cursor_get MDB_SET_RANGE cur pKey pVal >>= \case
withKVPtrs' (MDB_val 8 (castPtr pIdx)) nullVal $ \pKey pVal ->
rwith (readTxn env) $ \txn ->
rwith (cursor txn dbi) $ \cur ->
io (mdb_cursor_get MDB_SET_RANGE cur pKey pVal) >>= \case
False -> pure mempty
True -> V.unfoldrM (fetchRows cur pKey pVal) 1000
fetchRows :: Cur -> Ptr MDB_val -> Ptr MDB_val
-> Word
-> IO (Maybe ((Word64, ByteString), Word))
fetchRows :: Cur -> Ptr Val -> Ptr Val -> Word
-> RIO e (Maybe ((Word64, ByteString), Word))
fetchRows cur pKey pVal 0 = pure Nothing
fetchRows cur pKey pVal n = do
key <- peek pKey >>= mdbValToWord64
val <- peek pVal >>= mdbValToBytes
-- print ("fetchRows", n, key, val)
mdb_cursor_get MDB_NEXT cur pKey pVal >>= \case
key <- io $ peek pKey >>= mdbValToWord64
val <- io $ peek pVal >>= mdbValToBytes
logDebug $ displayShow ("fetchRows", n, key, val)
io $ mdb_cursor_get MDB_NEXT cur pKey pVal >>= \case
False -> pure $ Just ((key, val), 0)
True -> pure $ Just ((key, val), pred n)
-- Utils -----------------------------------------------------------------------
withKVPtrs' :: (MonadIO m, MonadUnliftIO m)
=> Val -> Val -> (Ptr Val -> Ptr Val -> m a) -> m a
withKVPtrs' k v cb =
withRunInIO $ \run ->
withKVPtrs k v $ \x y -> run (cb x y)
nullVal :: MDB_val
nullVal = MDB_val 0 nullPtr
@ -353,20 +370,24 @@ mdbValToWord64 (MDB_val sz ptr) = do
assertExn (sz == 8) BadKeyInEventLog
peek (castPtr ptr)
withWord64AsMDBval :: Word64 -> (MDB_val -> IO a) -> IO a
withWord64AsMDBval :: (MonadIO m, MonadUnliftIO m)
=> Word64 -> (MDB_val -> m a) -> m a
withWord64AsMDBval w cb = do
withWordPtr w $ \p ->
cb (MDB_val (fromIntegral (sizeOf w)) (castPtr p))
withWordPtr :: Word64 -> (Ptr Word64 -> IO a) -> IO a
withWordPtr w cb = do
allocaBytes (sizeOf w) (\p -> poke p w >> cb p)
withWordPtr :: (MonadIO m, MonadUnliftIO m)
=> Word64 -> (Ptr Word64 -> m a) -> m a
withWordPtr w cb =
withRunInIO $ \run ->
allocaBytes (sizeOf w) (\p -> poke p w >> run (cb p))
-- Lower-Level Operations ------------------------------------------------------
getMb :: Txn -> Dbi -> ByteString -> IO (Maybe Noun)
getMb :: MonadIO m => Txn -> Dbi -> ByteString -> m (Maybe Noun)
getMb txn db key =
io $
byteStringAsMdbVal key $ \mKey ->
mdb_get txn db mKey >>= traverse (mdbValToNoun key)
@ -380,14 +401,19 @@ mdbValToNoun key (MDB_val sz ptr) = do
let res = cueBS bs
eitherExn res (\err -> BadNounInLogIdentity key err bs)
putNoun :: MDB_WriteFlags -> Txn -> Dbi -> ByteString -> Noun -> IO Bool
putNoun :: MonadIO m
=> MDB_WriteFlags -> Txn -> Dbi -> ByteString -> Noun -> m Bool
putNoun flags txn db key val =
io $
byteStringAsMdbVal key $ \mKey ->
byteStringAsMdbVal (jamBS val) $ \mVal ->
mdb_put flags txn db mKey mVal
putBytes :: MDB_WriteFlags -> Txn -> Dbi -> Word64 -> ByteString -> IO Bool
putBytes flags txn db id bs = do
withWord64AsMDBval id $ \idVal -> do
byteStringAsMdbVal bs $ \mVal -> do
putBytes :: MonadIO m
=> MDB_WriteFlags -> Txn -> Dbi -> Word64 -> ByteString -> m Bool
putBytes flags txn db id bs =
io $
withWord64AsMDBval id $ \idVal ->
byteStringAsMdbVal bs $ \mVal ->
mdb_put flags txn db idVal mVal

View File

@ -29,20 +29,20 @@ import qualified Vere.Serf as Serf
_ioDrivers = [] :: [IODriver]
setupPierDirectory :: FilePath -> IO ()
setupPierDirectory :: FilePath -> RIO e ()
setupPierDirectory shipPath = do
for_ ["put", "get", "log", "chk"] $ \seg -> do
let pax = shipPath <> "/.urb/" <> seg
createDirectoryIfMissing True pax
setFileMode pax ownerModes
io $ createDirectoryIfMissing True pax
io $ setFileMode pax ownerModes
-- Load pill into boot sequence. -----------------------------------------------
genEntropy :: IO Word512
genEntropy = fromIntegral . view (from atomBytes) <$> Ent.getEntropy 64
genEntropy :: RIO e Word512
genEntropy = fromIntegral . view (from atomBytes) <$> io (Ent.getEntropy 64)
generateBootSeq :: Ship -> Pill -> IO BootSeq
generateBootSeq :: Ship -> Pill -> RIO e BootSeq
generateBootSeq ship Pill{..} = do
ent <- genEntropy
let ovums = preKern ent <> pKernelOvums <> pUserspaceOvums
@ -57,15 +57,16 @@ generateBootSeq ship Pill{..} = do
-- Write a batch of jobs into the event log ------------------------------------
writeJobs :: EventLog -> Vector Job -> IO ()
writeJobs :: EventLog -> Vector Job -> RIO e ()
writeJobs log !jobs = do
expect <- Log.nextEv log
events <- fmap fromList $ traverse fromJob (zip [expect..] $ toList jobs)
Log.appendEvents log events
where
fromJob :: (EventId, Job) -> IO ByteString
fromJob :: (EventId, Job) -> RIO e ByteString
fromJob (expectedId, job) = do
guard (expectedId == jobId job)
unless (expectedId == jobId job) $
error $ show ("bad job id!", expectedId, jobId job)
pure $ jamBS $ jobPayload job
jobPayload :: Job -> Noun
@ -75,76 +76,79 @@ writeJobs log !jobs = do
-- Boot a new ship. ------------------------------------------------------------
booted :: FilePath -> FilePath -> Serf.Flags -> Ship
-> Acquire (Serf, EventLog, SerfState)
booted :: HasLogFunc e
=> FilePath -> FilePath -> Serf.Flags -> Ship
-> RAcquire e (Serf, EventLog, SerfState)
booted pillPath pierPath flags ship = do
putStrLn "LOADING PILL"
rio $ logTrace "LOADING PILL"
pill <- liftIO (loadFile pillPath >>= either throwIO pure)
pill <- io (loadFile pillPath >>= either throwIO pure)
putStrLn "PILL LOADED"
rio $ logTrace "PILL LOADED"
seq@(BootSeq ident x y) <- liftIO $ generateBootSeq ship pill
seq@(BootSeq ident x y) <- rio $ generateBootSeq ship pill
putStrLn "BootSeq Computed"
rio $ logTrace "BootSeq Computed"
liftIO (setupPierDirectory pierPath)
liftRIO (setupPierDirectory pierPath)
putStrLn "Directory Setup"
rio $ logTrace "Directory Setup"
log <- Log.new (pierPath <> "/.urb/log") ident
putStrLn "Event Log Initialized"
rio $ logTrace "Event Log Initialized"
serf <- Serf.run (Serf.Config pierPath flags)
putStrLn "Serf Started"
rio $ logTrace "Serf Started"
liftIO $ do
rio $ do
(events, serfSt) <- Serf.bootFromSeq serf seq
putStrLn "Boot Sequence completed"
logTrace "Boot Sequence completed"
Serf.snapshot serf serfSt
putStrLn "Snapshot taken"
logTrace "Snapshot taken"
writeJobs log (fromList events)
putStrLn "Events written"
logTrace "Events written"
pure (serf, log, serfSt)
-- Resume an existing ship. ----------------------------------------------------
resumed :: FilePath -> Serf.Flags
-> Acquire (Serf, EventLog, SerfState)
resumed :: HasLogFunc e
=> FilePath -> Serf.Flags
-> RAcquire e (Serf, EventLog, SerfState)
resumed top flags = do
log <- Log.existing (top <> "/.urb/log")
serf <- Serf.run (Serf.Config top flags)
serfSt <- liftIO (Serf.replay serf log)
serfSt <- rio $ Serf.replay serf log
liftIO (Serf.snapshot serf serfSt)
rio $ Serf.snapshot serf serfSt
pure (serf, log, serfSt)
-- Run Pier --------------------------------------------------------------------
pier :: FilePath
pier :: e. HasLogFunc e
=> FilePath
-> Maybe Port
-> (Serf, EventLog, SerfState)
-> Acquire ()
-> RAcquire e ()
pier pierPath mPort (serf, log, ss) = do
computeQ <- newTQueueIO :: Acquire (TQueue Ev)
persistQ <- newTQueueIO :: Acquire (TQueue (Job, FX))
executeQ <- newTQueueIO :: Acquire (TQueue FX)
computeQ <- newTQueueIO :: RAcquire e (TQueue Ev)
persistQ <- newTQueueIO :: RAcquire e (TQueue (Job, FX))
executeQ <- newTQueueIO :: RAcquire e (TQueue FX)
inst <- liftIO (KingId . UV . fromIntegral <$> randomIO @Word16)
inst <- io (KingId . UV . fromIntegral <$> randomIO @Word16)
vereTerminal <- initializeTerminal
vereTerminal <- liftAcquire $ initializeTerminal
let ship = who (Log.identity log)
let (bootEvents, startDrivers) =
drivers pierPath inst ship mPort (writeTQueue computeQ) vereTerminal
liftIO $ atomically $ for_ bootEvents (writeTQueue computeQ)
io $ atomically $ for_ bootEvents (writeTQueue computeQ)
tExe <- startDrivers >>= router (readTQueue executeQ)
tDisk <- runPersist log persistQ (writeTQueue executeQ)
@ -158,8 +162,8 @@ pier pierPath mPort (serf, log, ss) = do
]
atomically ded >>= \case
Left (txt, exn) -> print ("Somthing died", txt, exn)
Right tag -> print ("something simply exited", tag)
Left (txt, exn) -> logError $ displayShow ("Somthing died", txt, exn)
Right tag -> logError $ displayShow ("something simply exited", tag)
death :: Text -> Async () -> STM (Either (Text, SomeException) Text)
death tag tid = do
@ -169,23 +173,20 @@ death tag tid = do
-- Start All Drivers -----------------------------------------------------------
data Drivers = Drivers
{ dAmes :: EffCb AmesEf
, dBehn :: EffCb BehnEf
, dHttpClient :: EffCb HttpClientEf
, dHttpServer :: EffCb HttpServerEf
, dNewt :: EffCb NewtEf
, dSync :: EffCb SyncEf
, dTerm :: EffCb TermEf
data Drivers e = Drivers
{ dAmes :: EffCb e AmesEf
, dBehn :: EffCb e BehnEf
, dHttpClient :: EffCb e HttpClientEf
, dHttpServer :: EffCb e HttpServerEf
, dNewt :: EffCb e NewtEf
, dSync :: EffCb e SyncEf
, dTerm :: EffCb e TermEf
}
drivers :: FilePath
-> KingId
-> Ship
-> Maybe Port
-> (Ev -> STM ())
drivers :: HasLogFunc e
=> FilePath -> KingId -> Ship -> Maybe Port -> (Ev -> STM ())
-> VereTerminal
-> ([Ev], Acquire Drivers)
-> ([Ev], RAcquire e (Drivers e))
drivers pierPath inst who mPort plan vereTerm =
(initialEvents, runDrivers)
where
@ -195,25 +196,26 @@ drivers pierPath inst who mPort plan vereTerm =
(termBorn, runTerm) = term vereTerm inst plan
initialEvents = mconcat [behnBorn, amesBorn, httpBorn, termBorn]
runDrivers = do
dNewt <- runAmes
dBehn <- runBehn
dNewt <- liftAcquire $ runAmes
dBehn <- liftAcquire $ runBehn
dAmes <- pure $ const $ pure ()
dHttpClient <- pure $ const $ pure ()
dHttpServer <- runHttp
dSync <- pure $ const $ pure ()
dTerm <- runTerm
dTerm <- liftAcquire $ runTerm
pure (Drivers{..})
-- Route Effects to Drivers ----------------------------------------------------
router :: STM FX -> Drivers -> Acquire (Async ())
router waitFx Drivers{..} = mkAcquire start cancel
router :: HasLogFunc e => STM FX -> Drivers e -> RAcquire e (Async ())
router waitFx Drivers{..} =
mkRAcquire start cancel
where
start = async $ forever $ do
fx <- atomically waitFx
for_ fx $ \ef -> do
putStrLn ("[EFFECT]\n" <> pack (ppShow ef) <> "\n\n")
logEffect ef
case ef of
GoodParse (EfVega _ _) -> error "TODO"
GoodParse (EfExit _ _) -> error "TODO"
@ -226,25 +228,44 @@ router waitFx Drivers{..} = mkAcquire start cancel
GoodParse (EfVane (VENewt ef)) -> dNewt ef
GoodParse (EfVane (VESync ef)) -> dSync ef
GoodParse (EfVane (VETerm ef)) -> dTerm ef
FailParse n -> pPrint n
FailParse n -> logError
$ display
$ pack @Text (ppShow n)
-- Compute Thread --------------------------------------------------------------
runCompute :: Serf -> SerfState -> STM Ev -> ((Job, FX) -> STM ())
-> Acquire (Async ())
runCompute serf ss getEvent putResult =
mkAcquire (async (go ss)) cancel
logEvent :: HasLogFunc e => Ev -> RIO e ()
logEvent ev =
logDebug $ display $ "[EVENT]\n" <> pretty
where
go :: SerfState -> IO ()
pretty :: Text
pretty = pack $ unlines $ fmap ("\t" <>) $ lines $ ppShow ev
logEffect :: HasLogFunc e => Lenient Ef -> RIO e ()
logEffect ef =
logDebug $ display $ "[EFFECT]\n" <> pretty ef
where
pretty :: Lenient Ef -> Text
pretty = \case
GoodParse e -> pack $ unlines $ fmap ("\t" <>) $ lines $ ppShow e
FailParse n -> pack $ unlines $ fmap ("\t" <>) $ lines $ ppShow n
runCompute :: e. HasLogFunc e
=> Serf -> SerfState -> STM Ev -> ((Job, FX) -> STM ())
-> RAcquire e (Async ())
runCompute serf ss getEvent putResult =
mkRAcquire (async (go ss)) cancel
where
go :: SerfState -> RIO e ()
go ss = do
ev <- atomically getEvent
putStrLn ("[EVENT]\n" <> pack (ppShow ev) <> "\n\n")
wen <- Time.now
logEvent ev
wen <- io Time.now
eId <- pure (ssNextEv ss)
mug <- pure (ssLastMug ss)
(job', ss', fx) <- doJob serf (DoWork (Work eId mug wen ev))
(job', ss', fx) <- doJob serf $ DoWork $ Work eId mug wen ev
atomically (putResult (job', fx))
go ss'
@ -263,21 +284,21 @@ instance Exception PersistExn where
runPersist :: EventLog
-> TQueue (Job, FX)
-> (FX -> STM ())
-> Acquire (Async ())
-> RAcquire e (Async ())
runPersist log inpQ out =
mkAcquire runThread cancelWait
mkRAcquire runThread cancelWait
where
cancelWait :: Async () -> IO ()
cancelWait :: Async () -> RIO e ()
cancelWait tid = cancel tid >> wait tid
runThread :: IO (Async ())
runThread :: RIO e (Async ())
runThread = asyncBound $ forever $ do
writs <- atomically getBatchFromQueue
events <- validateJobsAndGetBytes (toNullable writs)
Log.appendEvents log events
atomically $ for_ writs $ \(_,fx) -> out fx
validateJobsAndGetBytes :: [(Job, FX)] -> IO (Vector ByteString)
validateJobsAndGetBytes :: [(Job, FX)] -> RIO e (Vector ByteString)
validateJobsAndGetBytes writs = do
expect <- Log.nextEv log
fmap fromList

View File

@ -84,7 +84,7 @@ deriveToNoun ''Order
type QueueEv = Ev -> STM ()
type EffCb a = a -> IO ()
type EffCb e a = a -> RIO e ()
type Perform = Ef -> IO ()

View File

@ -22,7 +22,6 @@ import System.Process
import Vere.Pier.Types
import Data.Bits (setBit)
import Control.Concurrent (threadDelay)
import Data.ByteString (hGet)
import Data.ByteString.Unsafe (unsafeUseAsCString)
import Foreign.Marshal.Alloc (alloca)
@ -61,9 +60,8 @@ compileFlags = foldl' (\acc flag -> setBit acc (fromEnum flag)) 0
data Config = Config FilePath [Flag]
deriving (Show)
debug _msg = pure () -- putStrLn ("[DEBUG]\t" <> msg)
serf msg = putStrLn ("[SERF]\t" <> msg)
serf :: HasLogFunc e => Text -> RIO e ()
serf msg = logInfo $ display ("SERF: " <> msg)
-- Types -----------------------------------------------------------------------
@ -98,8 +96,6 @@ data Plea
| PSlog EventId Word32 Tank
deriving (Eq, Show)
type GetJobs = EventId -> Word64 -> IO (Vector Job)
type ReplacementEv = Job
type WorkResult = (SerfState, FX)
type SerfResp = Either ReplacementEv WorkResult
@ -129,28 +125,32 @@ deriveNoun ''Plea
-- Utils -----------------------------------------------------------------------
printTank :: Word32 -> Tank -> IO ()
printTank :: HasLogFunc e => Word32 -> Tank -> RIO e ()
printTank _pri tank =
(serf . unlines . fmap unTape . wash (WashCfg 0 80)) tank
guardExn :: Exception e => Bool -> e -> IO ()
guardExn ok = unless ok . throwIO
guardExn :: (Exception e, MonadIO m) => Bool -> e -> m ()
guardExn ok = io . unless ok . throwIO
fromRightExn :: Exception e => Either a b -> (a -> e) -> IO b
fromRightExn :: (Exception e, MonadIO m) => Either a b -> (a -> e) -> m b
fromRightExn (Left m) exn = throwIO (exn m)
fromRightExn (Right x) _ = pure x
-- Process Management ----------------------------------------------------------
run :: Config -> Acquire Serf
run config = mkAcquire (startUp config) tearDown
run :: HasLogFunc e => Config -> RAcquire e Serf
run config = mkRAcquire (startUp config) tearDown
startUp :: Config -> IO Serf
startUp :: HasLogFunc e => Config -> RIO e Serf
startUp conf@(Config pierPath flags) = do
debug "STARTING SERF"
debug (tshow conf)
logTrace "STARTING SERF"
logTrace (displayShow conf)
(i, o, e, p) <- io $ do
(Just i, Just o, Just e, p) <- createProcess pSpec
pure (i, o, e, p)
ss <- newEmptyMVar
et <- async (readStdErr e)
pure (Serf i o et p ss)
@ -164,28 +164,30 @@ startUp conf@(Config pierPath flags) = do
, std_err = CreatePipe
}
readStdErr :: Handle -> IO ()
readStdErr :: e. HasLogFunc e => Handle -> RIO e ()
readStdErr h =
untilEOFExn $ do
ln <- IO.hGetLine h
ln <- io $ IO.hGetLine h
serf ("[stderr] " <> T.strip (pack ln))
where
eofMsg = "[Serf.readStdErr] serf stderr closed"
untilEOFExn :: IO () -> IO ()
untilEOFExn :: RIO e () -> RIO e ()
untilEOFExn act = loop
where
loop :: RIO e ()
loop = do
IO.tryIOError act >>= \case
Left exn | IO.isEOFError exn -> do debug eofMsg
pure ()
Left exn -> IO.ioError exn
env <- ask
res <- io $ IO.tryIOError $ runRIO env act
case res of
Left exn | IO.isEOFError exn -> logDebug eofMsg
Left exn -> io (IO.ioError exn)
Right () -> loop
tearDown :: Serf -> IO ()
tearDown :: Serf -> RIO e ()
tearDown serf = do
terminateProcess (process serf)
void (waitForExit serf)
io $ terminateProcess (process serf)
void $ waitForExit serf
-- race_ waitThenKill (shutdownAndWait serf 0)
where
@ -197,50 +199,49 @@ tearDown serf = do
-- debug killedMsg
-- terminateProcess (process serf)
waitForExit :: Serf -> IO ExitCode
waitForExit serf = waitForProcess (process serf)
waitForExit :: Serf -> RIO e ExitCode
waitForExit = io . waitForProcess . process
kill :: Serf -> IO ExitCode
kill serf = terminateProcess (process serf) >> waitForExit serf
kill :: Serf -> RIO e ExitCode
kill serf = io (terminateProcess $ process serf) >> waitForExit serf
{-
shutdownAndWait :: Serf -> Word8 -> IO ExitCode
shutdownAndWait serf code = do
_shutdownAndWait :: HasLogFunc e => Serf -> Word8 -> RIO e ExitCode
_shutdownAndWait serf code = do
shutdown serf code
waitForExit serf
-}
-- Basic Send and Receive Operations -------------------------------------------
withWord64AsByteString :: Word64 -> (ByteString -> IO a) -> IO a
withWord64AsByteString :: Word64 -> (ByteString -> RIO e a) -> RIO e a
withWord64AsByteString w k = do
alloca $ \wp -> do
env <- ask
io $ alloca $ \wp -> do
poke wp w
bs <- BS.unsafePackCStringLen (castPtr wp, 8)
k bs
runRIO env (k bs)
sendLen :: Serf -> Int -> IO ()
sendLen :: Serf -> Int -> RIO e ()
sendLen s i = do
w <- evaluate (fromIntegral i :: Word64)
withWord64AsByteString (fromIntegral i) (hPut (sendHandle s))
sendOrder :: Serf -> Order -> IO ()
sendOrder :: HasLogFunc e => Serf -> Order -> RIO e ()
sendOrder w o = do
debug ("[Serf.sendOrder.toNoun] " <> tshow o)
logDebug $ display ("[Serf.sendOrder.toNoun] " <> tshow o)
n <- evaluate (toNoun o)
case o of
OWork (DoWork (Work _ _ _ e)) -> do print (toNoun (e :: Ev))
OWork (DoWork (Work _ _ _ e)) -> do logTrace $ displayShow $ toNoun (e::Ev)
_ -> do pure ()
debug ("[Serf.sendOrder.jam]")
logDebug "[Serf.sendOrder.jam]"
bs <- evaluate (jamBS n)
debug ("[Serf.sendOrder.send]: " <> tshow (length bs))
logDebug $ display ("[Serf.sendOrder.send]: " <> tshow (length bs))
sendBytes w bs
debug ("[Serf.sendOrder.sent]")
logDebug "[Serf.sendOrder.sent]"
sendBytes :: Serf -> ByteString -> IO ()
sendBytes :: Serf -> ByteString -> RIO e ()
sendBytes s bs = handle ioErr $ do
sendLen s (length bs)
hFlush (sendHandle s)
@ -253,24 +254,24 @@ sendBytes s bs = handle ioErr $ do
hack
where
ioErr :: IOError -> IO ()
ioErr :: IOError -> RIO e ()
ioErr _ = throwIO SerfConnectionClosed
-- TODO WHY DOES THIS MATTER?????
hack = threadDelay 10000
recvLen :: Serf -> IO Word64
recvLen w = do
recvLen :: MonadIO m => Serf -> m Word64
recvLen w = io $ do
bs <- hGet (recvHandle w) 8
case length bs of
8 -> unsafeUseAsCString bs (peek . castPtr)
_ -> throwIO SerfConnectionClosed
recvBytes :: Serf -> Word64 -> IO ByteString
recvBytes w = do
hGet (recvHandle w) . fromIntegral
recvBytes :: Serf -> Word64 -> RIO e ByteString
recvBytes serf =
io . hGet (recvHandle serf) . fromIntegral
recvAtom :: Serf -> IO Atom
recvAtom :: Serf -> RIO e Atom
recvAtom w = do
len <- recvLen w
bs <- recvBytes w len
@ -285,20 +286,20 @@ cordText = T.strip . unCord
--------------------------------------------------------------------------------
snapshot :: Serf -> SerfState -> IO ()
snapshot :: HasLogFunc e => Serf -> SerfState -> RIO e ()
snapshot serf ss = sendOrder serf $ OSave $ ssLastEv ss
shutdown :: Serf -> Word8 -> IO ()
shutdown :: HasLogFunc e => Serf -> Word8 -> RIO e ()
shutdown serf code = sendOrder serf (OExit code)
{-
TODO Find a cleaner way to handle `PStdr` Pleas.
-}
recvPlea :: Serf -> IO Plea
recvPlea :: HasLogFunc e => Serf -> RIO e Plea
recvPlea w = do
debug ("[Vere.Serf.recvPlea] waiting")
logDebug "[Vere.Serf.recvPlea] waiting"
a <- recvAtom w
debug ("[Vere.Serf.recvPlea] got atom")
logDebug "[Vere.Serf.recvPlea] got atom"
n <- fromRightExn (cue a) (const $ BadPleaAtom a)
p <- fromRightExn (fromNounErr n) (\(p,m) -> BadPleaNoun (traceShowId n) p m)
@ -306,13 +307,13 @@ recvPlea w = do
recvPlea w
PSlog _ pri t -> do printTank pri t
recvPlea w
_ -> do debug ("[Serf.recvPlea] Got " <> tshow p)
_ -> do logTrace $ display ("recvPlea got: " <> tshow p)
pure p
{-
Waits for initial plea, and then sends boot IPC if necessary.
-}
handshake :: Serf -> LogIdentity -> IO SerfState
handshake :: HasLogFunc e => Serf -> LogIdentity -> RIO e SerfState
handshake serf ident = do
ss@SerfState{..} <- recvPlea serf >>= \case
PPlay Nothing -> pure $ SerfState 1 (Mug 0)
@ -324,27 +325,27 @@ handshake serf ident = do
pure ss
sendWork :: Serf -> Job -> IO SerfResp
sendWork :: e. HasLogFunc e => Serf -> Job -> RIO e SerfResp
sendWork w job =
do
sendOrder w (OWork job)
res <- loop
debug ("[Vere.Serf.sendWork] Got response")
logTrace ("[sendWork] Got response")
pure res
where
eId = jobId job
produce :: WorkResult -> IO SerfResp
produce :: WorkResult -> RIO e SerfResp
produce (ss@SerfState{..}, o) = do
guardExn (ssNextEv == (1+eId)) (BadComputeId eId (ss, o))
pure $ Right (ss, o)
replace :: ReplacementEv -> IO SerfResp
replace :: ReplacementEv -> RIO e SerfResp
replace job = do
guardExn (jobId job == eId) (BadReplacementId eId job)
pure (Left job)
loop :: IO SerfResp
loop :: RIO e SerfResp
loop = recvPlea w >>= \case
PPlay p -> throwIO (UnexpectedPlay eId p)
PDone i m o -> produce (SerfState (i+1) m, o)
@ -355,19 +356,19 @@ sendWork w job =
--------------------------------------------------------------------------------
doJob :: Serf -> Job -> IO (Job, SerfState, FX)
doJob :: HasLogFunc e => Serf -> Job -> RIO e (Job, SerfState, FX)
doJob serf job = do
sendWork serf job >>= \case
Left replaced -> doJob serf replaced
Right (ss, fx) -> pure (job, ss, fx)
bootJob :: Serf -> Job -> IO (Job, SerfState)
bootJob :: HasLogFunc e => Serf -> Job -> RIO e (Job, SerfState)
bootJob serf job = do
doJob serf job >>= \case
(job, ss, []) -> pure (job, ss)
(job, ss, fx) -> throwIO (EffectsDuringBoot (jobId job) fx)
replayJob :: Serf -> Job -> IO SerfState
replayJob :: HasLogFunc e => Serf -> Job -> RIO e SerfState
replayJob serf job = do
sendWork serf job >>= \case
Left replace -> throwIO (ReplacedEventDuringReplay (jobId job) replace)
@ -382,17 +383,17 @@ data BootExn = ShipAlreadyBooted
deriving stock (Eq, Ord, Show)
deriving anyclass (Exception)
bootFromSeq :: Serf -> BootSeq -> IO ([Job], SerfState)
bootFromSeq :: e. HasLogFunc e => Serf -> BootSeq -> RIO e ([Job], SerfState)
bootFromSeq serf (BootSeq ident nocks ovums) = do
handshake serf ident >>= \case
ss@(SerfState 1 (Mug 0)) -> loop [] ss bootSeqFns
_ -> throwIO ShipAlreadyBooted
where
loop :: [Job] -> SerfState -> [BootSeqFn] -> IO ([Job], SerfState)
loop :: [Job] -> SerfState -> [BootSeqFn] -> RIO e ([Job], SerfState)
loop acc ss = \case
[] -> pure (reverse acc, ss)
x:xs -> do wen <- Time.now
x:xs -> do wen <- io Time.now
job <- pure $ x (ssNextEv ss) (ssLastMug ss) wen
(job, ss) <- bootJob serf job
loop (job:acc) ss xs
@ -406,12 +407,13 @@ bootFromSeq serf (BootSeq ident nocks ovums) = do
The ship is booted, but it is behind. shove events to the worker
until it is caught up.
-}
replayJobs :: Serf -> SerfState -> ConduitT Job Void IO SerfState
replayJobs :: HasLogFunc e
=> Serf -> SerfState -> ConduitT Job Void (RIO e) SerfState
replayJobs serf = go
where
go ss = await >>= maybe (pure ss) (liftIO . replayJob serf >=> go)
go ss = await >>= maybe (pure ss) (lift . replayJob serf >=> go)
replay :: Serf -> Log.EventLog -> IO SerfState
replay :: HasLogFunc e => Serf -> Log.EventLog -> RIO e SerfState
replay serf log = do
ss <- handshake serf (Log.identity log)
@ -419,18 +421,18 @@ replay serf log = do
.| toJobs (Log.identity log) (ssNextEv ss)
.| replayJobs serf ss
toJobs :: LogIdentity -> EventId -> ConduitT ByteString Job IO ()
toJobs :: HasLogFunc e
=> LogIdentity -> EventId -> ConduitT ByteString Job (RIO e) ()
toJobs ident eId =
await >>= \case
Nothing -> putStrLn "[toJobs] no more jobs" >> pure ()
Just at -> do yield =<< liftIO (fromAtom at)
putStrLn ("[toJobs] " <> tshow eId)
Nothing -> lift $ logTrace "[toJobs] no more jobs"
Just at -> do yield =<< lift (fromAtom at)
lift $ logTrace $ display ("[toJobs] " <> tshow eId)
toJobs ident (eId+1)
where
isNock = trace ("[toJobs] " <> show (eId, lifecycleLen ident))
$ eId <= fromIntegral (lifecycleLen ident)
isNock = eId <= fromIntegral (lifecycleLen ident)
fromAtom :: ByteString -> IO Job
fromAtom :: ByteString -> RIO e Job
fromAtom bs | isNock = do
noun <- cueBSExn bs
(mug, nok) <- fromNounExn noun
@ -443,7 +445,7 @@ toJobs ident eId =
-- Collect Effects for Parsing -------------------------------------------------
collectFX :: Serf -> Log.EventLog -> IO ()
collectFX :: HasLogFunc e => Serf -> Log.EventLog -> RIO e ()
collectFX serf log = do
ss <- handshake serf (Log.identity log)
@ -452,26 +454,26 @@ collectFX serf log = do
.| doCollectFX serf ss
.| persistFX log
persistFX :: Log.EventLog -> ConduitT (EventId, FX) Void IO ()
persistFX :: Log.EventLog -> ConduitT (EventId, FX) Void (RIO e) ()
persistFX log = loop
where
loop = await >>= \case
Nothing -> pure ()
Just (eId, fx) -> do
liftIO $ Log.writeEffectsRow log eId (jamBS $ toNoun fx)
putStr "."
lift $ Log.writeEffectsRow log eId (jamBS $ toNoun fx)
loop
doCollectFX :: Serf -> SerfState -> ConduitT Job (EventId, FX) IO ()
doCollectFX :: e. HasLogFunc e
=> Serf -> SerfState -> ConduitT Job (EventId, FX) (RIO e) ()
doCollectFX serf = go
where
go :: SerfState -> ConduitT Job (EventId, FX) IO ()
go :: SerfState -> ConduitT Job (EventId, FX) (RIO e) ()
go ss = await >>= \case
Nothing -> pure ()
Just jb -> do
-- jb <- pure $ replaceMug jb (ssLastMug ss)
(_, ss, fx) <- liftIO (doJob serf jb)
liftIO $ print (jobId jb)
(_, ss, fx) <- lift $ doJob serf jb
lift $ logTrace $ displayShow (jobId jb)
yield (jobId jb, fx)
go ss

View File

@ -179,16 +179,16 @@ initializeTerminal = mkAcquire start stop
termShowCursor t newLs pos
term :: VereTerminal -> KingId -> QueueEv -> ([Ev], Acquire (EffCb TermEf))
term :: VereTerminal -> KingId -> QueueEv -> ([Ev], Acquire (EffCb e TermEf))
term VereTerminal{..} king enqueueEv =
(initialEvents, runTerm)
where
initialEvents = [(initialBlew vtWidth vtHeight), initialHail]
runTerm :: Acquire (EffCb TermEf)
runTerm :: Acquire (EffCb e TermEf)
runTerm = do
tim <- mkAcquire start stop
pure (handleEffect vtWriteQueue tim)
pure (io . handleEffect vtWriteQueue tim)
start :: IO TermDrv
start = do

View File

@ -34,6 +34,7 @@ dependencies:
- data-fix
- directory
- entropy
- exceptions
- extra
- fixed-vector
- flat
@ -51,6 +52,7 @@ dependencies:
- mtl
- multimap
- network
- optparse-applicative
- para
- pretty-show
- primitive
@ -77,13 +79,14 @@ dependencies:
- time
- transformers
- unix
- unliftio
- unliftio-core
- unordered-containers
- vector
- wai
- wai-conduit
- warp
- warp-tls
- optparse-applicative
default-extensions:
- ApplicativeDo