{-| Serf Interface TODO: `recvLen` is not big-endian safe. -} module Urbit.Vere.Serf ( Serf, sStderr, SerfState(..), doJob , run, shutdown, kill , replay, bootFromSeq, snapshot , collectFX , Config(..), Flags, Flag(..) ) where import Urbit.Prelude import Data.Conduit import System.Process import System.ProgressBar import Urbit.Arvo import Urbit.Vere.Pier.Types import Data.Bits (setBit) import Data.ByteString (hGet) import Data.ByteString.Unsafe (unsafeUseAsCString) import Foreign.Marshal.Alloc (alloca) import Foreign.Ptr (castPtr) import Foreign.Storable (peek, poke) import System.Exit (ExitCode) import Urbit.King.App (HasStderrLogFunc(..)) import qualified Data.ByteString.Unsafe as BS import qualified Data.Conduit.Combinators as CC import qualified Data.Text as T import qualified System.IO as IO import qualified System.IO.Error as IO import qualified Urbit.Ob as Ob import qualified Urbit.Time as Time import qualified Urbit.Vere.Log as Log -- Serf Config ----------------------------------------------------------------- type Flags = [Flag] data Flag = DebugRam | DebugCpu | CheckCorrupt | CheckFatal | Verbose | DryRun | Quiet | Hashless | Trace deriving (Eq, Ord, Show, Enum, Bounded) compileFlags :: [Flag] -> Word compileFlags = foldl' (\acc flag -> setBit acc (fromEnum flag)) 0 data Config = Config FilePath [Flag] deriving (Show) serf :: HasLogFunc e => Text -> RIO e () serf msg = logInfo $ display ("SERF: " <> msg) -- Types ----------------------------------------------------------------------- data SerfState = SerfState { ssNextEv :: EventId , ssLastMug :: Mug } deriving (Eq, Ord, Show) ssLastEv :: SerfState -> EventId ssLastEv = pred . ssNextEv data Serf e = Serf { sendHandle :: Handle , recvHandle :: Handle , process :: ProcessHandle , sStderr :: MVar (Text -> RIO e ()) } data ShipId = ShipId Ship Bool deriving (Eq, Ord, Show) data Plea = PPlay EventId Mug | PWork Work | PDone EventId Mug FX | PStdr EventId Cord | PSlog EventId Word32 Tank deriving (Eq, Show) type ReplacementEv = Job type WorkResult = (SerfState, FX) type SerfResp = Either ReplacementEv WorkResult data SerfExn = BadComputeId EventId WorkResult | BadReplacementId EventId ReplacementEv | UnexpectedPlay EventId (EventId, Mug) | BadPleaAtom Atom | BadPleaNoun Noun [Text] Text | ReplacedEventDuringReplay EventId ReplacementEv | ReplacedEventDuringBoot EventId ReplacementEv | EffectsDuringBoot EventId FX | SerfConnectionClosed | UnexpectedPleaOnNewShip Plea | InvalidInitialPlea Plea deriving (Show) -- Instances ------------------------------------------------------------------- instance Exception SerfExn deriveNoun ''ShipId deriveNoun ''Plea -- Utils ----------------------------------------------------------------------- printTank :: HasLogFunc e => MVar (Text -> RIO e ()) -> Word32 -> Tank -> RIO e () printTank log _pri = printErr log . unlines . fmap unTape . wash (WashCfg 0 80) guardExn :: (Exception e, MonadIO m) => Bool -> e -> m () guardExn ok = io . unless ok . throwIO fromRightExn :: (Exception e, MonadIO m) => Either a b -> (a -> e) -> m b fromRightExn (Left m) exn = throwIO (exn m) fromRightExn (Right x) _ = pure x printErr :: MVar (Text -> RIO e ()) -> Text -> RIO e () printErr m txt = do f <- readMVar m f txt -- Process Management ---------------------------------------------------------- run :: HasLogFunc e => Config -> RAcquire e (Serf e) run config = mkRAcquire (startUp config) tearDown startUp :: HasLogFunc e => Config -> RIO e (Serf e) startUp conf@(Config pierPath flags) = do logTrace "STARTING SERF" logTrace (displayShow conf) (i, o, e, p) <- io $ do (Just i, Just o, Just e, p) <- createProcess pSpec pure (i, o, e, p) stderr <- newMVar serf async (readStdErr e stderr) pure (Serf i o p stderr) where diskKey = "" config = show (compileFlags flags) args = [pierPath, diskKey, config] pSpec = (proc "urbit-worker" args) { std_in = CreatePipe , std_out = CreatePipe , std_err = CreatePipe } readStdErr :: ∀e. HasLogFunc e => Handle -> MVar (Text -> RIO e ()) -> RIO e () readStdErr h print = untilEOFExn $ do raw <- io $ IO.hGetLine h let ln = T.strip (pack raw) printErr print ln serf ("[stderr] " <> ln) where eofMsg = "[Serf.readStdErr] serf stderr closed" untilEOFExn :: RIO e () -> RIO e () untilEOFExn act = loop where loop :: RIO e () loop = do env <- ask res <- io $ IO.tryIOError $ runRIO env act case res of Left exn | IO.isEOFError exn -> logDebug eofMsg Left exn -> io (IO.ioError exn) Right () -> loop tearDown :: HasLogFunc e => Serf e -> RIO e () tearDown serf = do io $ terminateProcess (process serf) void $ waitForExit serf -- race_ waitThenKill (shutdownAndWait serf 0) where -- killedMsg = -- "[Serf.tearDown]: Serf didn't die when asked, killing it" -- waitThenKill = do -- threadDelay 1000000 -- debug killedMsg -- terminateProcess (process serf) waitForExit :: HasLogFunc e => Serf e -> RIO e ExitCode waitForExit = io . waitForProcess . process kill :: HasLogFunc e => Serf e -> RIO e ExitCode kill serf = io (terminateProcess $ process serf) >> waitForExit serf _shutdownAndWait :: HasLogFunc e => Serf e -> Word8 -> RIO e ExitCode _shutdownAndWait serf code = do shutdown serf code waitForExit serf -- Basic Send and Receive Operations ------------------------------------------- withWord64AsByteString :: Word64 -> (ByteString -> RIO e a) -> RIO e a withWord64AsByteString w k = do env <- ask io $ alloca $ \wp -> do poke wp w bs <- BS.unsafePackCStringLen (castPtr wp, 8) runRIO env (k bs) sendLen :: HasLogFunc e => Serf e -> Int -> RIO e () sendLen s i = do w <- evaluate (fromIntegral i :: Word64) withWord64AsByteString (fromIntegral i) (hPut (sendHandle s)) sendOrder :: HasLogFunc e => Serf e -> Order -> RIO e () sendOrder w o = do -- logDebug $ display ("(sendOrder) " <> tshow o) sendBytes w $ jamBS $ toNoun o -- logDebug "(sendOrder) Done" sendBytes :: HasLogFunc e => Serf e -> ByteString -> RIO e () sendBytes s bs = handle ioErr $ do sendLen s (length bs) hPut (sendHandle s) bs hFlush (sendHandle s) where ioErr :: IOError -> RIO e () ioErr _ = throwIO SerfConnectionClosed recvLen :: (MonadIO m, HasLogFunc e) => Serf e -> m Word64 recvLen w = io $ do bs <- hGet (recvHandle w) 8 case length bs of 8 -> unsafeUseAsCString bs (peek . castPtr) _ -> throwIO SerfConnectionClosed recvBytes :: HasLogFunc e => Serf e -> Word64 -> RIO e ByteString recvBytes serf = io . hGet (recvHandle serf) . fromIntegral recvAtom :: HasLogFunc e => Serf e -> RIO e Atom recvAtom w = do len <- recvLen w bytesAtom <$> recvBytes w len cordText :: Cord -> Text cordText = T.strip . unCord -------------------------------------------------------------------------------- snapshot :: HasLogFunc e => Serf e -> SerfState -> RIO e () snapshot serf ss = do logTrace $ display ("Taking snapshot at event " <> tshow (ssLastEv ss)) sendOrder serf $ OSave $ ssLastEv ss shutdown :: HasLogFunc e => Serf e -> Word8 -> RIO e () shutdown serf code = sendOrder serf (OExit code) {-| TODO Find a cleaner way to handle `PStdr` Pleas. -} recvPlea :: HasLogFunc e => Serf e -> RIO e Plea recvPlea w = do logDebug "(recvPlea) Waiting" a <- recvAtom w logDebug "(recvPlea) Got atom" n <- fromRightExn (cue a) (const $ BadPleaAtom a) p <- fromRightExn (fromNounErr n) (\(p,m) -> BadPleaNoun n p m) case p of PStdr e msg -> do printErr (sStderr w) (cordText msg) recvPlea w PSlog _ pri t -> do printTank (sStderr w) pri t recvPlea w _ -> do logTrace "recvPlea got something else" pure p {-| Waits for initial plea, and then sends boot IPC if necessary. -} handshake :: HasLogFunc e => Serf e -> LogIdentity -> RIO e SerfState handshake serf ident = do logTrace "Serf Handshake" ss@SerfState{..} <- recvPlea serf >>= \case PPlay e m -> pure $ SerfState e m x -> throwIO (InvalidInitialPlea x) logTrace $ display ("Handshake result: " <> tshow ss) when (ssNextEv == 1) $ do let ev = OBoot (lifecycleLen ident) logTrace $ display ("No snapshot. Sending boot event: " <> tshow ev) sendOrder serf ev logTrace "Finished handshake" pure ss sendWork :: ∀e. HasLogFunc e => Serf e -> Job -> RIO e SerfResp sendWork w job = do sendOrder w (OWork job) res <- loop logTrace ("[sendWork] Got response") pure res where eId = jobId job produce :: WorkResult -> RIO e SerfResp produce (ss@SerfState{..}, o) = do guardExn (ssNextEv == (1+eId)) (BadComputeId eId (ss, o)) pure $ Right (ss, o) replace :: ReplacementEv -> RIO e SerfResp replace job = do guardExn (jobId job == eId) (BadReplacementId eId job) pure (Left job) loop :: RIO e SerfResp loop = recvPlea w >>= \case PPlay e m -> throwIO (UnexpectedPlay eId (e, m)) PDone i m o -> produce (SerfState (i+1) m, o) PWork work -> replace (DoWork work) PStdr _ cord -> printErr (sStderr w) (cordText cord) >> loop PSlog _ pri t -> printTank (sStderr w) pri t >> loop -------------------------------------------------------------------------------- doJob :: HasLogFunc e => Serf e -> Job -> RIO e (Job, SerfState, FX) doJob serf job = do sendWork serf job >>= \case Left replaced -> doJob serf replaced Right (ss, fx) -> pure (job, ss, fx) bootJob :: HasLogFunc e => Serf e -> Job -> RIO e (Job, SerfState) bootJob serf job = do doJob serf job >>= \case (job, ss, _) -> pure (job, ss) -- (job, ss, fx) -> throwIO (EffectsDuringBoot (jobId job) fx) replayJob :: HasLogFunc e => Serf e -> Job -> RIO e SerfState replayJob serf job = do sendWork serf job >>= \case Left replace -> throwIO (ReplacedEventDuringReplay (jobId job) replace) Right (ss, _) -> pure ss -------------------------------------------------------------------------------- updateProgressBar :: HasLogFunc e => Int -> Text -> Maybe (ProgressBar ()) -> RIO e (Maybe (ProgressBar ())) updateProgressBar count startMsg = \case Nothing -> do -- We only construct the progress bar on the first time that we -- process an event so that we don't display an empty progress -- bar when the snapshot is caught up to the log. let style = defStyle { stylePrefix = msg (fromStrict startMsg) } pb <- newProgressBar style 10 (Progress 0 count ()) pure (Just pb) Just pb -> do incProgress pb 1 pure (Just pb) -------------------------------------------------------------------------------- type BootSeqFn = EventId -> Mug -> Time.Wen -> Job data BootExn = ShipAlreadyBooted deriving stock (Eq, Ord, Show) deriving anyclass (Exception) logStderr :: HasStderrLogFunc e => RIO LogFunc a -> RIO e a logStderr action = do logFunc <- view stderrLogFuncL runRIO logFunc action bootFromSeq :: ∀e. (HasStderrLogFunc e, HasLogFunc e) => Serf e -> BootSeq -> RIO e ([Job], SerfState) bootFromSeq serf (BootSeq ident nocks ovums) = do handshake serf ident >>= \case ss@(SerfState 1 (Mug 0)) -> loop [] ss Nothing bootSeqFns _ -> throwIO ShipAlreadyBooted where loop :: [Job] -> SerfState -> Maybe (ProgressBar ()) -> [BootSeqFn] -> RIO e ([Job], SerfState) loop acc ss pb = \case [] -> do pb <- logStderr (updateProgressBar 0 bootMsg pb) pure (reverse acc, ss) x:xs -> do wen <- io Time.now job <- pure $ x (ssNextEv ss) (ssLastMug ss) wen pb <- logStderr (updateProgressBar (1 + length xs) bootMsg pb) (job, ss) <- bootJob serf job loop (job:acc) ss pb xs bootSeqFns :: [BootSeqFn] bootSeqFns = fmap muckNock nocks <> fmap muckOvum ovums where muckNock nok eId mug _ = RunNok $ LifeCyc eId mug nok muckOvum ov eId mug wen = DoWork $ Work eId mug wen ov bootMsg = "Booting " ++ (fakeStr (isFake ident)) ++ (Ob.renderPatp (Ob.patp (fromIntegral (who ident)))) fakeStr True = "fake " fakeStr False = "" {-| The ship is booted, but it is behind. shove events to the worker until it is caught up. -} replayJobs :: (HasStderrLogFunc e, HasLogFunc e) => Serf e -> Int -> SerfState -> ConduitT Job Void (RIO e) SerfState replayJobs serf lastEv = go Nothing where go pb ss = do await >>= \case Nothing -> pure ss Just job -> do pb <- lift $ logStderr (updatePb ss pb) played <- lift $ replayJob serf job go pb played updatePb ss = do let start = lastEv - fromIntegral (ssNextEv ss) let msg = pack ( "Replaying events #" ++ (show (ssNextEv ss)) <> " to #" ++ (show lastEv) ) updateProgressBar start msg replay :: (HasStderrLogFunc e, HasLogFunc e) => Serf e -> Log.EventLog -> Maybe Word64 -> RIO e SerfState replay serf log last = do logTrace "Beginning event log replay" last & \case Nothing -> pure () Just lt -> logTrace $ display $ "User requested to replay up to event #" <> tshow lt ss <- handshake serf (Log.identity log) logLastEv :: Word64 <- fromIntegral <$> Log.lastEv log let serfNextEv = ssNextEv ss lastEventInSnap = serfNextEv - 1 logTrace $ display $ "Last event in event log is #" <> tshow logLastEv let replayUpTo = fromMaybe logLastEv last let numEvs :: Int = fromIntegral replayUpTo - fromIntegral lastEventInSnap logTrace $ display $ "Replaying up to event #" <> tshow replayUpTo logTrace $ display $ "Will replay " <> tshow numEvs <> " in total." runConduit $ Log.streamEvents log serfNextEv .| CC.take (fromIntegral numEvs) .| toJobs (Log.identity log) serfNextEv .| replayJobs serf (fromIntegral replayUpTo) ss toJobs :: HasLogFunc e => LogIdentity -> EventId -> ConduitT ByteString Job (RIO e) () toJobs ident eId = await >>= \case Nothing -> lift $ logTrace "[toJobs] no more jobs" Just at -> do yield =<< lift (fromAtom at) lift $ logTrace $ display ("[toJobs] " <> tshow eId) toJobs ident (eId+1) where isNock = eId <= fromIntegral (lifecycleLen ident) fromAtom :: ByteString -> RIO e Job fromAtom bs | isNock = do noun <- cueBSExn bs (mug, nok) <- fromNounExn noun pure $ RunNok (LifeCyc eId mug nok) fromAtom bs = do noun <- cueBSExn bs (mug, wen, ovm) <- fromNounExn noun pure $ DoWork (Work eId mug wen ovm) -- Collect Effects for Parsing ------------------------------------------------- collectFX :: HasLogFunc e => Serf e -> Log.EventLog -> RIO e () collectFX serf log = do ss <- handshake serf (Log.identity log) runConduit $ Log.streamEvents log (ssNextEv ss) .| toJobs (Log.identity log) (ssNextEv ss) .| doCollectFX serf ss .| persistFX log persistFX :: Log.EventLog -> ConduitT (EventId, FX) Void (RIO e) () persistFX log = loop where loop = await >>= \case Nothing -> pure () Just (eId, fx) -> do lift $ Log.writeEffectsRow log eId (jamBS $ toNoun fx) loop doCollectFX :: ∀e. HasLogFunc e => Serf e -> SerfState -> ConduitT Job (EventId, FX) (RIO e) () doCollectFX serf = go where go :: SerfState -> ConduitT Job (EventId, FX) (RIO e) () go ss = await >>= \case Nothing -> pure () Just jb -> do -- jb <- pure $ replaceMug jb (ssLastMug ss) (_, ss, fx) <- lift $ doJob serf jb when (0 == (jobId jb `mod` 10_000)) $ do lift $ logTrace $ displayShow (jobId jb) yield (jobId jb, fx) go ss _replaceMug :: Job -> Mug -> Job _replaceMug jb mug = case jb of DoWork (Work eId _ w o) -> DoWork (Work eId mug w o) RunNok (LifeCyc eId _ n) -> RunNok (LifeCyc eId mug n)