Got bulshit scry working and hooked up replay progress callback.

This commit is contained in:
~siprel 2020-06-04 23:49:56 +00:00
parent 3838cf8abb
commit 54acebb0c5
5 changed files with 57 additions and 6 deletions

View File

@ -15,12 +15,13 @@ Stubbed out:
- [x] Snapshots should block until that event is commited to disk.
- [x] Hook up error callbacks to IO Drivers.
- [x] Do something useful with error callbacks from IO Drivers.
- [ ] Make sure replay progress bars go to stderr.
King-Haskell specific features:
- [x] Re-implement `collectFX` flow in Serf/Pier.
- [x] Hook up `collectFX` to CLI.
- [ ] Test new `collectFX` flow
- [ ] Get `collect-all-fx` flow working again.
Performance:

View File

@ -713,6 +713,7 @@ instance (FromNoun a, FromNoun b) => FromNoun (Each a b) where
1 -> named "|" (EachNo <$> parseNoun v)
n -> fail ("Each has invalid head-atom: " <> show n)
-- Tuple Conversions -----------------------------------------------------------
instance ToNoun () where

View File

@ -24,9 +24,14 @@ import Urbit.Arvo
import Urbit.King.Config
import Urbit.Vere.Pier.Types
import Data.Bits (shiftR)
import Data.Text (append)
import Data.Time.Clock (DiffTime)
import Data.Time.Clock.System (systemToUTCTime)
import Data.Time.LocalTime (TimeOfDay(..), timeToTimeOfDay)
import System.Posix.Files (ownerModes, setFileMode)
import Urbit.King.App (HasKingEnv, HasPierEnv(..), PierEnv)
import Urbit.Time (Wen)
import Urbit.Vere.Ames (ames)
import Urbit.Vere.Behn (behn)
import Urbit.Vere.Clay (clay)
@ -37,6 +42,7 @@ import Urbit.Vere.Log (EventLog)
import Urbit.Vere.Serf (Serf)
import qualified System.Entropy as Ent
import qualified Urbit.Atom.Fast as Atom
import qualified Urbit.King.API as King
import qualified Urbit.Time as Time
import qualified Urbit.Vere.Log as Log
@ -302,10 +308,13 @@ pier (serf, log) vSlog mStart vKilled multi = do
io $ atomically $ for_ bootEvents (writeTQueue computeQ)
scryM <- newEmptyTMVarIO
let computeConfig = ComputeConfig
{ ccOnWork = readTQueue computeQ
, ccOnKill = readTMVar vKilled
, ccOnSave = takeTMVar saveM
, ccOnScry = takeTMVar scryM
, ccPutResult = writeTQueue persistQ
, ccShowSpinner = Term.spin muxed
, ccHideSpinner = Term.stopSpin muxed
@ -321,6 +330,14 @@ pier (serf, log) vSlog mStart vKilled multi = do
tSaveSignal <- saveSignalThread saveM
-- bullshit scry tester
void $ acquireWorker "bullshit scry tester" $ forever $ do
threadDelay 1_000_000
wen <- io Time.now
let cb mTermNoun = print ("scry result: ", mTermNoun)
let pax = Path ["j", "~zod", "life", MkKnot $ pack $ showDate wen, "~zod"]
atomically $ putTMVar scryM (wen, Nothing, pax, cb)
putMVar mStart ()
-- Wait for something to die.
@ -438,6 +455,7 @@ data ComputeConfig = ComputeConfig
{ ccOnWork :: STM Serf.EvErr
, ccOnKill :: STM ()
, ccOnSave :: STM ()
, ccOnScry :: STM (Wen, Gang, Path, Maybe (Term, Noun) -> IO ())
, ccPutResult :: (Fact, FX) -> STM ()
, ccShowSpinner :: Maybe Text -> STM ()
, ccHideSpinner :: STM ()
@ -448,9 +466,10 @@ runCompute :: forall e . HasKingEnv e => Serf.Serf -> ComputeConfig -> RIO e ()
runCompute serf ComputeConfig {..} = do
logTrace "runCompute"
let onCR = asum [ Serf.RRKill <$> ccOnKill
, Serf.RRSave <$> ccOnSave
, Serf.RRWork <$> ccOnWork
let onCR = asum [ ccOnKill <&> Serf.RRKill
, ccOnSave <&> Serf.RRSave
, ccOnWork <&> Serf.RRWork
, ccOnScry <&> \(w,g,p,k) -> Serf.RRScry w g p k
]
vEvProcessing :: TMVar Ev <- newEmptyTMVarIO
@ -512,3 +531,29 @@ runPersist log inpQ out = do
go acc = tryReadTQueue inpQ >>= \case
Nothing -> pure (reverse acc)
Just item -> go (item <| acc)
-- "~YYYY.MM.DD..HH.MM.SS..FRACTO"
showDate :: Wen -> String
showDate w = do
if fs == 0
then printf "~%i.%u.%u..%02u.%02u.%02u" y m d h min s
else printf "~%i.%u.%u..%02u.%02u.%02u..%s" y m d h min s (showGap fs)
where
(y, m, d) = toGregorian (utctDay utc)
(h, min, s) = diffTimeSplit (utctDayTime utc)
fs = fromIntegral (Time._fractoSecs (Time._sinceUrbitEpoch w)) :: Word
utc = w ^. Time.systemTime . to systemToUTCTime
showGap :: Word -> String
showGap gap = intercalate "." (printf "%04x" <$> bs)
where
bs = reverse $ dropWhile (== 0) [b4, b3, b2, b1]
b4 = Atom.takeBitsWord 16 gap
b3 = Atom.takeBitsWord 16 (shiftR gap 16)
b2 = Atom.takeBitsWord 16 (shiftR gap 32)
b1 = Atom.takeBitsWord 16 (shiftR gap 48)
diffTimeSplit :: DiffTime -> (Int, Int, Int)
diffTimeSplit dt = (hours, mins, floor secs)
where
TimeOfDay hours mins secs = timeToTimeOfDay dt

View File

@ -110,12 +110,14 @@ execReplay serf log last = do
logTrace $ display $ "Replaying up to event #" <> tshow replayUpTo
logTrace $ display $ "Will replay " <> tshow numEvs <> " in total."
let onProgress n = print ("Serf is at event# " <> tshow n)
runResourceT
$ runConduit
$ Log.streamEvents log (lastEventInSnap + 1)
.| CC.take (fromIntegral numEvs)
.| CC.mapM (fmap snd . parseLogRow)
.| replay 10 serf
.| replay 10 onProgress serf
-- Collect FX ------------------------------------------------------------------

View File

@ -481,15 +481,17 @@ replay
:: forall m
. (MonadResource m, MonadUnliftIO m, MonadIO m)
=> Int
-> (EventId -> IO ())
-> Serf
-> ConduitT Noun Void m (Maybe PlayBail)
replay batchSize serf = do
replay batchSize cb serf = do
withSerfLock serf $ \ss -> do
(r, ss') <- loop ss
pure (ss', r)
where
loop :: SerfState -> ConduitT Noun Void m (Maybe PlayBail, SerfState)
loop ss@(SerfState lastEve lastMug) = do
io (cb lastEve)
awaitBatch batchSize >>= \case
[] -> pure (Nothing, SerfState lastEve lastMug)
evs -> do