Properly implement the persist thread.
@ -3,10 +3,13 @@ module UrbitPrelude
, module Control.Lens
, module Noun
, module Data.Void
, module Data.Acquire
) where
import ClassyPrelude
import Control.Lens hiding (Index, cons, index, snoc, uncons, unsnoc, (<.>),
import Noun
import Data.Void
import Data.Acquire (Acquire, mkAcquire, with)
import Data.Void (Void, absurd)
@ -1,76 +0,0 @@
TODO Close the database on uncaught exception.
TODO `Persist` should just be the thread id.
the thread should close the database when it is killed.
module Vere.Persist (start, stop) where
import UrbitPrelude hiding (init)
import Vere.Log (EventLog)
import qualified Vere.Log as Log
import Vere.Pier.Types
-- Types -----------------------------------------------------------------------
data Persist = Persist EventLog (Async ())
-- Start and Stop --------------------------------------------------------------
start :: EventLog
-> TQueue (Writ [Eff])
-> (Writ [Eff] -> STM ())
-> IO Persist
start log inp cb = do
tid <- asyncBound (persistThread log inp cb)
pure (Persist log tid)
-- TODO: properly handle shutdowns during write
stop :: Persist -> IO ()
stop (Persist log tid) = do
void (cancel tid)
void (waitCatch tid)
-- Persist Thread --------------------------------------------------------------
-- TODO: We need to be able to send back an exception to the main thread on an
-- exception on the persistence thread.
persistThread :: EventLog
-> TQueue (Writ [Eff])
-> (Writ [Eff] -> STM ())
-> IO ()
persistThread log inputQueue onPersist =
forever $ do
writs <- atomically $ fmap toNullable $ readQueue inputQueue
events <- validateWrits writs
Log.appendEvents log events
atomically $ traverse_ onPersist writs
validateWrits :: [Writ [Eff]] -> IO (Vector Atom)
validateWrits writs = do
expect <- Log.nextEv log
fmap fromList
$ for (zip [expect..] writs)
$ \(expectedId, Writ{..}) -> do
guard (expectedId == eventId)
pure (unJam event)
-- Get eventhing from the input queue. -----------------------------------------
Read one or more items from a TQueue, only blocking on the first item.
readQueue :: TQueue a -> STM (NonNull [a])
readQueue q =
readTQueue q >>= go . singleton
go acc =
tryReadTQueue q >>= \case
Nothing -> pure (reverse acc)
Just item -> go (item <| acc)
@ -124,7 +124,7 @@ performCommonPierStartup serf computeQ persistQ releaseQ logState = do
driverThreads <- for ioDrivers $ \x -> do
startDriver x (writeTQueue computeQ)
-- TODO: Don't do a bunch of extra work; we send all events to all drivers
-- TODO: Don't do a bunch of extra work; we send all effects to all drivers
portingThread <- async $ do
forever $ do
r <- atomically (readTQueue releaseQ)
@ -136,3 +136,52 @@ performCommonPierStartup serf computeQ persistQ releaseQ logState = do
pure (Pier{..})
-- Persist Thread --------------------------------------------------------------
data PersistExn = BadEventId EventId EventId
deriving Show
instance Exception PersistExn where
displayException (BadEventId expected got) =
unlines [ "Out-of-order event id send to persist thread."
, "\tExpected " <> show expected <> " but got " <> show got
runPersist :: EventLog
-> TQueue (Writ [Eff])
-> (Writ [Eff] -> STM ())
-> Acquire ()
runPersist log inpQ out = do
mkAcquire runThread cancelWait
pure ()
cancelWait :: Async () -> IO ()
cancelWait tid = cancel tid >> wait tid
runThread :: IO (Async ())
runThread = asyncBound $ forever $ do
writs <- atomically (toNullable <$> getBatchFromQueue)
events <- validateWritsAndGetAtom writs
Log.appendEvents log events
atomically $ traverse_ out writs
validateWritsAndGetAtom :: [Writ [Eff]] -> IO (Vector Atom)
validateWritsAndGetAtom writs = do
expect <- Log.nextEv log
fmap fromList
$ for (zip [expect..] writs)
$ \(expectedId, Writ{..}) -> do
unless (expectedId == eventId) $
throwIO (BadEventId expectedId eventId)
pure (unJam event)
getBatchFromQueue :: STM (NonNull [Writ [Eff]])
getBatchFromQueue =
readTQueue inpQ >>= go . singleton
go acc =
tryReadTQueue inpQ >>= \case
Nothing -> pure (reverse acc)
Just item -> go (item <| acc)
@ -14,6 +14,9 @@ import qualified Vere.Http.Server as Server
newtype ShipId = ShipId (Ship, Bool)
deriving newtype (Eq, Ord, Show, ToNoun, FromNoun)
newtype Octs = Octs ByteString
deriving newtype (Eq, Ord, Show, FromNoun, ToNoun)
newtype FileOcts = FileOcts ByteString
deriving newtype (Eq, Ord, ToNoun, FromNoun)
@ -92,9 +95,6 @@ data Order
| OWork Job
deriving (Eq, Ord, Show)
newtype Octs = Octs ByteString
deriving newtype (Eq, Ord, Show, FromNoun, ToNoun)
data ResponseHeader = ResponseHeader
{ rhStatus :: Word
, rhHeaders :: [(Text, Text)]
@ -15,9 +15,8 @@ import Control.Concurrent (threadDelay)
import System.Directory (removeFile, doesFileExist)
import Text.Show.Pretty (pPrint)
import qualified Vere.Log as Log
import qualified Vere.Persist as Persist
import qualified Vere.Pier as Pier
import qualified Vere.Log as Log
import qualified Vere.Pier as Pier
@ -114,25 +113,30 @@ tryCopyLog = do
let logPath = "/Users/erg/src/urbit/zod/.urb/falselog/"
falselogPath = "/Users/erg/src/urbit/zod/.urb/falselog2/"
persistQ <- newTQueueIO
releaseQ <- newTQueueIO
(ident, nextEv, events) <-
with (Log.existing logPath) $ \log -> do
persistQ <- newTQueueIO
releaseQ <- newTQueueIO
persist <- Persist.start log persistQ (writeTQueue releaseQ)
ident <- pure $ Log.identity log
events <- runConduit (Log.streamEvents log 1 .| consume)
nextEv <- Log.nextEv log
with (do { log <- Log.existing logPath
; Pier.runPersist log persistQ (writeTQueue releaseQ)
; pure log
\log -> do
ident <- pure $ Log.identity log
events <- runConduit (Log.streamEvents log 1 .| consume)
nextEv <- Log.nextEv log
pure (ident, nextEv, events)
print ident
print nextEv
print (length events)
with (Log.new falselogPath ident) $ \log2 -> do
persistQ2 <- newTQueueIO
releaseQ2 <- newTQueueIO
persist2 <- Persist.start log2 persistQ2 (writeTQueue releaseQ2)
persistQ2 <- newTQueueIO
releaseQ2 <- newTQueueIO
with (do { log <- Log.new falselogPath ident
; Pier.runPersist log persistQ2 (writeTQueue releaseQ2)
; pure log
$ \log2 -> do
let writs = zip [1..] events <&> \(id, a) ->
Writ id Nothing (Jam a) []
