refactor syncInternal to use transaction tech

This commit is contained in:
Mitchell Rosen 2022-04-07 22:19:10 -04:00
parent af9be6743c
commit 186b76530a
6 changed files with 109 additions and 126 deletions

View File

@@ -10,8 +10,6 @@
module U.Codebase.Sqlite.Sync22 where
import Control.Monad.Except (MonadError (throwError))
import Control.Monad.RWS (MonadReader)
import qualified Control.Monad.Reader as Reader
import Control.Monad.Validate (ValidateT, runValidateT)
import qualified Control.Monad.Validate as Validate
import Data.Bifunctor (bimap)
@@ -39,7 +37,7 @@ import qualified U.Codebase.WatchKind as WK
import U.Util.Cache (Cache)
import qualified U.Util.Cache as Cache
import Unison.Prelude
import Unison.Sqlite (Connection, Transaction, unsafeUnTransaction)
import Unison.Sqlite (Transaction)
data Entity
= O ObjectId
@@ -47,8 +45,6 @@ data Entity
| W WK.WatchKind Sqlite.Reference.IdH
deriving (Eq, Ord, Show)
data DbTag = SrcDb | DestDb
data DecodeError
= ErrTermComponent
| ErrDeclComponent
@@ -67,41 +63,50 @@ data Error
| SourceDbNotExist
deriving (Show)
data Env = Env
{ srcDB :: Connection,
destDB :: Connection,
data Env m = Env
{ runSrc :: forall a. Transaction a -> m a,
runDest :: forall a. Transaction a -> m a,
-- | there are three caches of this size
idCacheSize :: Word
}
mapEnv :: (forall x. m x -> n x) -> Env m -> Env n
mapEnv f Env {runSrc, runDest, idCacheSize} =
Env
{ runSrc = f . runSrc,
runDest = f . runDest,
idCacheSize
}
debug :: Bool
debug = False
-- data Mappings
sync22 ::
( MonadIO m,
MonadError Error m,
MonadReader Env m
MonadError Error m
) =>
Env m ->
m (Sync m Entity)
sync22 = do
size <- Reader.reader idCacheSize
sync22 Env {runSrc, runDest, idCacheSize = size} = do
tCache <- Cache.semispaceCache size
hCache <- Cache.semispaceCache size
oCache <- Cache.semispaceCache size
cCache <- Cache.semispaceCache size
pure $ Sync (trySync tCache hCache oCache cCache)
pure $ Sync (trySync runSrc runDest tCache hCache oCache cCache)
trySync ::
forall m.
(MonadIO m, MonadError Error m, MonadReader Env m) =>
(MonadIO m, MonadError Error m) =>
(forall a. Transaction a -> m a) ->
(forall a. Transaction a -> m a) ->
Cache TextId TextId ->
Cache HashId HashId ->
Cache ObjectId ObjectId ->
Cache CausalHashId CausalHashId ->
Entity ->
m (TrySyncResult Entity)
trySync tCache hCache oCache cCache = \case
trySync runSrc runDest tCache hCache oCache cCache = \case
-- for causals, we need to get the value_hash_id of the thingo
-- - maybe enqueue their parents
-- - enqueue the self_ and value_ hashes
@@ -111,14 +116,14 @@ trySync tCache hCache oCache cCache = \case
Just {} -> pure Sync.PreviouslyDone
Nothing -> do
result <- runValidateT @(Set Entity) @m @() do
bhId <- runSrc $ Q.expectCausalValueHashId chId
mayBoId <- runSrc . Q.loadObjectIdForAnyHashId $ unBranchHashId bhId
bhId <- lift . runSrc $ Q.expectCausalValueHashId chId
mayBoId <- lift . runSrc . Q.loadObjectIdForAnyHashId $ unBranchHashId bhId
traverse_ syncLocalObjectId mayBoId
parents' :: [CausalHashId] <- findParents' chId
bhId' <- lift $ syncBranchHashId bhId
chId' <- lift $ syncCausalHashId chId
runDest do
(lift . runDest) do
Q.saveCausal chId' bhId'
Q.saveCausalParents chId' parents'
@@ -157,7 +162,7 @@ trySync tCache hCache oCache cCache = \case
let bytes' =
runPutS $
putWord8 fmt >> S.recomposeComponent (zip localIds' bytes)
oId' <- runDest $ Q.saveObject hId' objType bytes'
oId' <- lift . runDest $ Q.saveObject hId' objType bytes'
lift do
-- copy reference-specific stuff
for_ [0 .. length localIds - 1] \(fromIntegral -> idx) -> do
@@ -188,7 +193,7 @@ trySync tCache hCache oCache cCache = \case
runPutS $
putWord8 fmt
>> S.recomposeComponent (zip localIds' declBytes)
oId' <- runDest $ Q.saveObject hId' objType bytes'
oId' <- lift . runDest $ Q.saveObject hId' objType bytes'
lift do
-- copy per-element-of-the-component stuff
for_ [0 .. length localIds - 1] \(fromIntegral -> idx) -> do
@@ -202,26 +207,26 @@ trySync tCache hCache oCache cCache = \case
Right (BL.SyncFull ids body) -> do
ids' <- syncBranchLocalIds ids
let bytes' = runPutS $ S.recomposeBranchFormat (BL.SyncFull ids' body)
oId' <- runDest $ Q.saveObject hId' objType bytes'
oId' <- lift . runDest $ Q.saveObject hId' objType bytes'
pure oId'
Right (BL.SyncDiff boId ids body) -> do
boId' <- syncBranchObjectId boId
ids' <- syncBranchLocalIds ids
let bytes' = runPutS $ S.recomposeBranchFormat (BL.SyncDiff boId' ids' body)
oId' <- runDest $ Q.saveObject hId' objType bytes'
oId' <- lift . runDest $ Q.saveObject hId' objType bytes'
pure oId'
Left s -> throwError $ DecodeError ErrBranchFormat bytes s
OT.Patch -> case flip runGetS bytes S.decomposePatchFormat of
Right (PL.SyncFull ids body) -> do
ids' <- syncPatchLocalIds ids
let bytes' = runPutS $ S.recomposePatchFormat (PL.SyncFull ids' body)
oId' <- runDest $ Q.saveObject hId' objType bytes'
oId' <- lift . runDest $ Q.saveObject hId' objType bytes'
pure oId'
Right (PL.SyncDiff poId ids body) -> do
poId' <- syncPatchObjectId poId
ids' <- syncPatchLocalIds ids
let bytes' = runPutS $ S.recomposePatchFormat (PL.SyncDiff poId' ids' body)
oId' <- runDest $ Q.saveObject hId' objType bytes'
oId' <- lift . runDest $ Q.saveObject hId' objType bytes'
pure oId'
Left s -> throwError $ DecodeError ErrPatchFormat bytes s
case result of
@@ -273,7 +278,7 @@ trySync tCache hCache oCache cCache = \case
-- workaround for requiring components to compute component lengths for references.
-- this line requires objects in the destination for any hashes referenced in the source,
-- (making those objects dependencies of this patch). See Sync21.filter{Term,Type}Edit
traverse_ syncLocalObjectId =<< traverse (runSrc . Q.expectObjectIdForAnyHashId) hIds
traverse_ syncLocalObjectId =<< traverse (lift . runSrc . Q.expectObjectIdForAnyHashId) hIds
pure $ PL.LocalIds tIds' hIds' oIds'
@@ -355,7 +360,7 @@ trySync tCache hCache oCache cCache = \case
findParents' :: CausalHashId -> ValidateT (Set Entity) m [CausalHashId]
findParents' chId = do
srcParents <- runSrc $ Q.loadCausalParents chId
srcParents <- lift . runSrc $ Q.loadCausalParents chId
traverse syncCausal srcParents
-- Sync any watches of the given kinds to the dest if and only if watches of those kinds
@@ -409,11 +414,3 @@ trySync tCache hCache oCache cCache = \case
(runDest $ Q.isCausalHash hId')
(pure . Just $ CausalHashId hId')
(pure Nothing)
runSrc,
runDest ::
(MonadIO m, MonadReader Env m) =>
Transaction a ->
m a
runSrc ma = Reader.reader srcDB >>= liftIO . unsafeUnTransaction ma
runDest ma = Reader.reader destDB >>= liftIO . unsafeUnTransaction ma

View File

@@ -19,6 +19,10 @@ module Unison.Sqlite
Transaction,
runTransaction,
runTransactionWithAbort,
runReadOnlyTransaction,
runReadOnlyTransactionIO,
runWriteTransaction,
runWriteTransactionIO,
unsafeUnTransaction,
savepoint,
idempotentIO,

View File

@@ -163,6 +163,7 @@ default-extensions:
- ApplicativeDo
- BangPatterns
- BlockArguments
- DeriveAnyClass
- DeriveFunctor
- DeriveGeneric
- DeriveTraversable

View File

@@ -529,95 +529,75 @@ syncInternal ::
syncInternal progress srcConn destConn b = time "syncInternal" do
UnliftIO runInIO <- askUnliftIO
-- We start a savepoint on the src connection because it seemed to speed things up.
-- Mitchell says: that doesn't sound right... why would that be the case?
-- TODO: look into this; this connection should be used only for reads.
liftIO (Sqlite.Connection.savepoint srcConn "sync")
liftIO (Sqlite.Connection.savepoint destConn "sync")
-- FIXME don't savepoint above, instead BEGIN
result <- runExceptT do
let syncEnv = Sync22.Env srcConn destConn (16 * 1024 * 1024)
-- we want to use sync22 wherever possible
-- so for each source branch, we'll check if it exists in the destination codebase
-- or if it exists in the source codebase, then we can sync22 it
-- if it doesn't exist in the dest or source branch,
-- then just use putBranch to the dest
let se :: forall m a. Functor m => (ExceptT Sync22.Error m a -> ExceptT SyncEphemeral.Error m a)
se = Except.withExceptT SyncEphemeral.Sync22Error
let r :: forall m a. (ReaderT Sync22.Env m a -> m a)
r = flip runReaderT syncEnv
processBranches ::
Sync.Sync (ReaderT Sync22.Env (ExceptT Sync22.Error m)) Sync22.Entity ->
Sync.Progress (ReaderT Sync22.Env (ExceptT Sync22.Error m)) Sync22.Entity ->
[Entity m] ->
ExceptT Sync22.Error m ()
processBranches _ _ [] = pure ()
processBranches sync progress (b0@(B h mb) : rest) = do
when debugProcessBranches do
traceM $ "processBranches " ++ show b0
traceM $ " queue: " ++ show rest
ifM @(ExceptT Sync22.Error m)
(liftIO (Sqlite.unsafeUnTransaction (Ops2.isCausalHash h) destConn))
do
when debugProcessBranches $ traceM $ " " ++ show b0 ++ " already exists in dest db"
processBranches sync progress rest
do
when debugProcessBranches $ traceM $ " " ++ show b0 ++ " doesn't exist in dest db"
let h2 = CausalHash . Cv.hash1to2 $ Causal.unRawHash h
liftIO (Sqlite.unsafeUnTransaction (Q.loadCausalHashIdByCausalHash h2) srcConn) >>= \case
Just chId -> do
when debugProcessBranches $ traceM $ " " ++ show b0 ++ " exists in source db, so delegating to direct sync"
r $ Sync.sync' sync progress [Sync22.C chId]
Sqlite.runReadOnlyTransactionIO srcConn \runSrc -> do
Sqlite.runWriteTransactionIO destConn \runDest -> do
throwExceptT do
let syncEnv = Sync22.Env runSrc runDest (16 * 1024 * 1024)
-- we want to use sync22 wherever possible
-- so for each source branch, we'll check if it exists in the destination codebase
-- or if it exists in the source codebase, then we can sync22 it
-- if it doesn't exist in the dest or source branch,
-- then just use putBranch to the dest
let se :: forall m a. Functor m => (ExceptT Sync22.Error m a -> ExceptT SyncEphemeral.Error m a)
se = Except.withExceptT SyncEphemeral.Sync22Error
let r :: forall a. (ReaderT (Sync22.Env m) m a -> m a)
r = flip runReaderT syncEnv
processBranches ::
Sync.Sync (ExceptT Sync22.Error m) Sync22.Entity ->
Sync.Progress (ExceptT Sync22.Error m) Sync22.Entity ->
[Entity m] ->
ExceptT Sync22.Error m ()
processBranches _ _ [] = pure ()
processBranches sync progress (b0@(B h mb) : rest) = do
when debugProcessBranches do
traceM $ "processBranches " ++ show b0
traceM $ " queue: " ++ show rest
ifM @(ExceptT Sync22.Error m)
(lift (runDest (Ops2.isCausalHash h)))
do
when debugProcessBranches $ traceM $ " " ++ show b0 ++ " already exists in dest db"
processBranches sync progress rest
Nothing ->
lift mb >>= \b -> do
when debugProcessBranches $ traceM $ " " ++ show b0 ++ " doesn't exist in either db, so delegating to Codebase.putBranch"
let (branchDeps, BD.to' -> BD.Dependencies' es ts ds) = BD.fromBranch b
when debugProcessBranches do
traceM $ " branchDeps: " ++ show (fst <$> branchDeps)
traceM $ " terms: " ++ show ts
traceM $ " decls: " ++ show ds
traceM $ " edits: " ++ show es
(cs, es, ts, ds) <- liftIO $ flip Sqlite.unsafeUnTransaction destConn do
cs <- filterM (fmap not . Ops2.isCausalHash . fst) branchDeps
es <- filterM (fmap not . Ops2.patchExists) es
ts <- filterM (fmap not . Ops2.termExists) ts
ds <- filterM (fmap not . Ops2.declExists) ds
pure (cs, es, ts, ds)
if null cs && null es && null ts && null ds
then do
liftIO (Sqlite.unsafeUnTransaction (Ops2.putBranch (Branch.transform (Sqlite.idempotentIO . runInIO) b)) destConn)
processBranches sync progress rest
else do
let bs = map (uncurry B) cs
os = map O (es <> ts <> ds)
processBranches sync progress (os ++ bs ++ b0 : rest)
processBranches sync progress (O h : rest) = do
when debugProcessBranches $ traceM $ "processBranches O " ++ take 10 (show h)
oId <-
liftIO do
Sqlite.unsafeUnTransaction (Q.expectHashIdByHash (Cv.hash1to2 h) >>= Q.expectObjectIdForAnyHashId) srcConn
r $ Sync.sync' sync progress [Sync22.O oId]
processBranches sync progress rest
sync <- se . r $ Sync22.sync22
let progress' = Sync.transformProgress (lift . lift) progress
bHash = Branch.headHash b
se $ time "SyncInternal.processBranches" $ processBranches sync progress' [B bHash (pure b)]
-- FIXME COMMIT/ROLLBACK here, no savepoint so no release
let onSuccess a = do
liftIO (Sqlite.Connection.release destConn "sync")
pure a
onFailure e = liftIO do
if debugCommitFailedTransaction
then Sqlite.Connection.release destConn "sync"
else do
Sqlite.Connection.rollbackTo destConn "sync"
Sqlite.Connection.release destConn "sync"
error (show e)
-- (we don't write to the src anyway)
liftIO (Sqlite.Connection.rollbackTo srcConn "sync")
liftIO (Sqlite.Connection.release srcConn "sync")
either onFailure onSuccess result
do
when debugProcessBranches $ traceM $ " " ++ show b0 ++ " doesn't exist in dest db"
let h2 = CausalHash . Cv.hash1to2 $ Causal.unRawHash h
lift (runSrc (Q.loadCausalHashIdByCausalHash h2)) >>= \case
Just chId -> do
when debugProcessBranches $ traceM $ " " ++ show b0 ++ " exists in source db, so delegating to direct sync"
Sync.sync' sync progress [Sync22.C chId]
processBranches sync progress rest
Nothing ->
lift mb >>= \b -> do
when debugProcessBranches $ traceM $ " " ++ show b0 ++ " doesn't exist in either db, so delegating to Codebase.putBranch"
let (branchDeps, BD.to' -> BD.Dependencies' es ts ds) = BD.fromBranch b
when debugProcessBranches do
traceM $ " branchDeps: " ++ show (fst <$> branchDeps)
traceM $ " terms: " ++ show ts
traceM $ " decls: " ++ show ds
traceM $ " edits: " ++ show es
(cs, es, ts, ds) <-
(lift . runDest) do
cs <- filterM (fmap not . Ops2.isCausalHash . fst) branchDeps
es <- filterM (fmap not . Ops2.patchExists) es
ts <- filterM (fmap not . Ops2.termExists) ts
ds <- filterM (fmap not . Ops2.declExists) ds
pure (cs, es, ts, ds)
if null cs && null es && null ts && null ds
then do
lift (runDest (Ops2.putBranch (Branch.transform (Sqlite.idempotentIO . runInIO) b)))
processBranches sync progress rest
else do
let bs = map (uncurry B) cs
os = map O (es <> ts <> ds)
processBranches sync progress (os ++ bs ++ b0 : rest)
processBranches sync progress (O h : rest) = do
when debugProcessBranches $ traceM $ "processBranches O " ++ take 10 (show h)
oId <- lift (runSrc (Q.expectHashIdByHash (Cv.hash1to2 h) >>= Q.expectObjectIdForAnyHashId))
Sync.sync' sync progress [Sync22.O oId]
processBranches sync progress rest
sync <- se (Sync22.sync22 (Sync22.mapEnv lift syncEnv))
let progress' = Sync.transformProgress lift progress
bHash = Branch.headHash b
se $ time "SyncInternal.processBranches" $ processBranches sync progress' [B bHash (pure b)]
data Entity m
= B Branch.Hash (m (Branch m))

View File

@@ -1,12 +1,10 @@
{-# LANGUAGE ScopedTypeVariables #-}
module Unison.Codebase.SqliteCodebase.SyncEphemeral where
import Data.Set (Set)
import U.Codebase.HashTags (CausalHash)
import U.Codebase.Sqlite.DbId (SchemaVersion)
import qualified U.Codebase.Sqlite.Sync22 as Sync22
import Unison.Hash (Hash)
import Unison.Prelude
data Dependencies = Dependencies
{ definitions :: Set Hash,
@@ -18,4 +16,5 @@ data Error
| SrcWrongSchema SchemaVersion
| DestWrongSchema SchemaVersion
| DisappearingBranch CausalHash
deriving (Show)
deriving stock (Show)
deriving anyclass (Exception)

View File

@@ -187,6 +187,7 @@ library
ApplicativeDo
BangPatterns
BlockArguments
DeriveAnyClass
DeriveFunctor
DeriveGeneric
DeriveTraversable
@@ -353,6 +354,7 @@ executable tests
ApplicativeDo
BangPatterns
BlockArguments
DeriveAnyClass
DeriveFunctor
DeriveGeneric
DeriveTraversable