more progress on NodeContainer, added deleteSeries to BlockStore interface, minor update to cryptography interface

This commit is contained in:
Paul Chiusano 2016-07-17 11:35:10 -04:00
parent f2673d66c2
commit 8fadee7d11
15 changed files with 197 additions and 46 deletions

View File

@ -35,6 +35,11 @@ insertSeriesMap series hashes = do
StoreData hashMap seriesMap <- get
put (StoreData hashMap (Map.insert series hashes seriesMap))
deleteSeriesMap :: BS.Series -> Update StoreData ()
deleteSeriesMap series = do
StoreData hashMap seriesMap <- get
put (StoreData hashMap (Map.delete series seriesMap))
appendSeriesMap :: BS.Series -> Hash -> Update StoreData ()
appendSeriesMap series hash = do
StoreData hashMap seriesMap <- get
@ -46,7 +51,7 @@ readHashMap = ask >>= (pure . hashMap)
readSeriesMap :: Query StoreData (Map.Map BS.Series [Hash])
readSeriesMap = ask >>= (pure . seriesMap)
$(makeAcidic ''StoreData ['insertHashMap, 'insertSeriesMap, 'appendSeriesMap, 'readHashMap, 'readSeriesMap])
$(makeAcidic ''StoreData ['insertHashMap, 'insertSeriesMap, 'deleteSeriesMap, 'appendSeriesMap, 'readHashMap, 'readSeriesMap])
initState :: FilePath -> IO (AcidState StoreData)
initState f = openLocalStateFrom f $ StoreData Map.empty Map.empty
@ -69,6 +74,9 @@ make genHash storeState =
pure hash
Just (h:_) -> pure h
_ -> error "FileBlockStore.declareSeries had empty list of hashes in series"
deleteSeries series = do
seriesHashes <- query storeState ReadSeriesMap
update storeState $ DeleteSeriesMap series
update' series hash v = do
seriesHashes <- Map.lookup series <$> query storeState ReadSeriesMap
case seriesHashes of
@ -89,7 +97,7 @@ make genHash storeState =
_ -> pure Nothing
resolve s = (fmap head . Map.lookup s) <$> query storeState ReadSeriesMap
resolves s = Map.findWithDefault [] s <$> query storeState ReadSeriesMap
in BS.BlockStore insert lookup declareSeries update' append resolve resolves
in BS.BlockStore insert lookup declareSeries deleteSeries update' append resolve resolves
make' :: IO Hash -> FilePath -> IO (BS.BlockStore Hash)
make' gen path = initState path >>= pure . make gen

View File

@ -58,6 +58,8 @@ make genHash mapVar =
(store { seriesMap = Map.insert series (SeriesData hash []) (seriesMap store)}
, hash)
Just (SeriesData h _) -> (store, h)
deleteSeries series = IORef.atomicModifyIORef mapVar $ \store ->
(store { seriesMap = Map.delete series (seriesMap store) }, ())
update series hash v = IORef.atomicModifyIORef mapVar $ \(StoreData hm sm rc uc) ->
case Map.lookup series sm of
Just (SeriesData h _) | h == hash ->
@ -91,7 +93,7 @@ make genHash mapVar =
resolves s = IORef.readIORef mapVar >>=
(\(StoreData _ seriesMap _ _) -> pure . seriesList
$ Map.findWithDefault (SeriesData undefined []) s seriesMap)
in BS.BlockStore insert lookup declareSeries update append resolve resolves
in BS.BlockStore insert lookup declareSeries deleteSeries update append resolve resolves
make' :: IO Hash -> IO (BS.BlockStore Hash)
make' genHash = IORef.newIORef (StoreData Map.empty Map.empty Set.empty 0)

View File

@ -2,21 +2,113 @@
module Unison.NodeContainer where
import qualified Data.Bytes.Serial as S
import Data.IORef
import Control.Exception
import Control.Concurrent.STM (atomically)
import Control.Concurrent.STM.TQueue
import Data.Word
import Data.ByteString (ByteString)
import qualified Data.Bytes.Put as Put
import qualified Data.Bytes.Get as Get
import qualified Data.Bytes.Serial as S
import qualified Data.Map as Map
import qualified Data.Trie as Trie
import qualified System.Process as Process
import qualified Unison.BlockStore as BS
import qualified Unison.Cryptography as C
import qualified Unison.NodeProtocol as P
import qualified Unison.Remote as Remote
import qualified Unison.Runtime.Remote as Remote
import qualified Unison.Runtime.Block as Block
import qualified Unison.Runtime.Journal as J
import qualified Unison.Runtime.JournaledMap as JM
import qualified Unison.Runtime.JournaledTrie as JT
import qualified Unison.Runtime.Multiplex as Mux
import qualified Unison.Runtime.Remote as Remote
make :: (Ord h, S.Serial key)
=> BS.BlockStore h
-> C.Cryptography key symmetricKey signKey signature hash Remote.Cleartext
=> C.Cryptography key symmetricKey signKey signKeyPrivate signature hash Remote.Cleartext
-> BS.BlockStore h
-> P.Protocol term hash h thash
-> String
-> IO ()
make bs crypto = do
knownNodes <- JM.fromEncryptedSeries crypto bs
(BS.Series $ Put.runPutS (S.serialize (C.publicKey crypto)))
(BS.Series $ Put.runPutS (S.serialize (C.publicKey crypto) >> Put.putByteString "updates"))
:: IO (JM.JournaledMap Remote.Node BS.Series)
undefined
make crypto bs p launchNodeCmd = do
-- trie keyed by node public keys, maps nodes to their series
-- root of the trie for a node key is the parameters for that node, it's key pair, etc
knownNodes <- journaledTrie "known-nodes" :: IO (JT.JournaledTrie BS.Series)
freeList <- journaledMap "free-list" :: IO (JM.JournaledMap Word64 BS.Series)
-- trie keyed by node public keys, maps nodes to their parents' public key
nodeParents <- journaledTrie "node-parents" :: IO (JT.JournaledTrie ByteString)
packetQ <- atomically newTQueue :: IO (TQueue Mux.Packet)
-- todo: nodes should be told their keypair over standard input on startup
routing <- newIORef Trie.empty
id $
let
-- just add BlockStore.deleteSeries, have BlockStore impl delete incrementally
-- from this, using the free list
-- basic idea for main loop
-- packets are labeled with their destination
-- if a destination does not exist in existing routes, check the knownNodes map
-- if known, check parents map - if there is a parent, and the parent
-- is not a known node, then the node is linked to its parent which is
-- now destroyed; destroy that node
-- if there is a parent, and the parent is a known node, spin up
-- a process for that node and update routes accordingly
--
go = do
packet <- atomically $ readTQueue packetQ
routes <- readIORef routing
let d = Mux.destination packet
case Trie.lookup d routes of
-- route did not exist; either wake up a node, or drop the packet
-- todo: more efficient J.get in terms of readTVarIO
Nothing -> do
knownNodes <- atomically $ J.get knownNodes
nodeParents <- atomically $ J.get nodeParents
case Trie.member d knownNodes of
False -> putStrLn "dropped packet sent to unknown destination"
True ->
if ok then undefined
else undefined -- GC the node - add its entire subtree to the free list
where
ok = case Trie.lookup d nodeParents of
Nothing -> True
Just parent | Trie.member parent knownNodes -> True
| otherwise -> False
Just dest -> safely (dest (Mux.content packet)) >> go
routing0 :: Trie.Trie (ByteString -> IO ())
routing0 = Trie.fromList
[ (Mux.channelId $ P._spawn p, spawn)
, (Mux.channelId $ P._insert p, insert)
, (Mux.channelId $ P._lookup p, lookup)
, (Mux.channelId $ P._declare p, declare)
, (Mux.channelId $ P._update p, update)
, (Mux.channelId $ P._append p, append)
, (Mux.channelId $ P._resolve p, resolve)
, (Mux.channelId $ P._resolves p, resolves) ]
spawn bytes = undefined
insert bytes = undefined
lookup bytes = undefined
declare bytes = undefined
update bytes = undefined
append bytes = undefined
resolve bytes = undefined
resolves bytes = undefined
parse :: S.Serial a => ByteString -> IO a
parse bytes = either fail pure $ Get.runGetS S.deserialize bytes
safely :: IO () -> IO ()
safely action = catch action handle
handle :: SomeException -> IO ()
handle ex = putStrLn $ "Error: " ++ show ex
in writeIORef routing routing0 >> go
where
qname name = Put.runPutS $ S.serialize (C.publicKey crypto) >> Put.putByteString name
journaledTrie :: S.Serial v => ByteString -> IO (JT.JournaledTrie v)
journaledTrie name = atomically . J.checkpointEvery 10000 =<< JT.fromBlocks bs
(Block.encrypted crypto $ Block.fromSeries (BS.Series (qname name)))
(Block.encrypted crypto $ Block.fromSeries (BS.Series (qname name `mappend` "-updates")))
journaledMap :: (S.Serial k, Ord k, S.Serial v) => ByteString -> IO (JM.JournaledMap k v)
journaledMap name = atomically . J.checkpointEvery 1000 =<< JM.fromEncryptedSeries crypto bs
(BS.Series $ qname name)
(BS.Series $ qname (name `mappend` "-updates"))

View File

@ -7,7 +7,7 @@ module Unison.NodeProcess where
import Control.Concurrent.STM (atomically)
import Control.Concurrent.STM.TSem
import Control.Monad.IO.Class
import Data.Bytes.Serial (Serial, deserialize)
import Data.Bytes.Serial (Serial, serialize, deserialize)
import Data.Serialize.Get (Get)
import GHC.Generics
import System.IO (stdin, hSetBinaryMode)
@ -17,6 +17,7 @@ import Unison.Hash.Extra ()
import qualified Data.ByteArray as BA
import qualified Data.ByteString as B
import qualified Data.Bytes.Get as Get
import qualified Data.Bytes.Put as Put
import qualified Data.Serialize.Get as Get
import qualified Unison.Cryptography as C
import qualified Unison.NodeProtocol as P
@ -37,9 +38,8 @@ make :: ( BA.ByteArrayAccess key
, Serial key
, Serial signKey
, Ord thash)
=> P.Protocol term signature h thash
-> (Keypair key -> Keypair signKey ->
Cryptography key symmetricKey signKey signature hash Remote.Cleartext)
=> P.Protocol term hash h thash
-> (Keypair key -> Cryptography key symmetricKey signKey skp signature hash Remote.Cleartext)
-> Get (BlockStore h -> Mux.Multiplex (Remote.Language term thash))
-> IO ()
make protocol mkCrypto makeSandbox = do
@ -48,11 +48,12 @@ make protocol mkCrypto makeSandbox = do
interrupt <- atomically $ newTSem 0
Mux.runStandardIO (Mux.seconds 5) rem (atomically $ waitTSem interrupt) $ do
blockStore <- P.blockStoreProxy protocol
Just (keypair, signKeypair, universe, node, sandbox) <- -- todo: lifetime, budget, children
Just (keypair, universe, node, sandbox) <- -- todo: lifetime, budget, children
liftIO . Block.get blockStore . Block.serial Nothing . Block.fromSeries . Series $ nodeSeries
makeSandbox <- either fail pure $ Get.runGetS makeSandbox sandbox
sandbox <- makeSandbox blockStore
let crypto = mkCrypto keypair signKeypair
let crypto = mkCrypto keypair
let skHash = Put.runPutS (serialize $ C.hash crypto [Put.runPutS (serialize $ private keypair)])
-- todo: load this from persistent store also
connectionSandbox <- pure $ Remote.ConnectionSandbox (\_ -> pure True) (\_ -> pure True)
env <- liftIO $ Remote.makeEnv universe node blockStore
@ -62,12 +63,12 @@ make protocol mkCrypto makeSandbox = do
Mux.repeatWhile $ do
sig <- destroy
case sig of
Just sig | C.verify crypto (public signKeypair) sig "destroy" -> do
-- todo: constant time equality needed here?
Just sig | skHash == Put.runPutS (serialize sig) -> do
cancel
-- todo: actual cleanup, kill child nodes;
-- possiby modify destroyed message to take the
-- list of garbage nodes and/or references
Mux.send (P._destroyed protocol) (node, sig)
Mux.send (Mux.Channel Mux.Type skHash) ()
-- no other cleanup needed; container will reclaim resources and eventually
-- kill off linked child nodes
liftIO $ atomically (signalTSem interrupt)
pure False
_ -> pure True

View File

@ -1,9 +1,11 @@
{-# Language DeriveGeneric #-}
{-# Language ScopedTypeVariables #-}
{-# Language OverloadedStrings #-}
{-# OPTIONS_GHC -fno-warn-orphans #-}
module Unison.NodeProtocol where
import Data.ByteString (ByteString)
import Control.Monad
import Data.Bytes.Serial (Serial)
import GHC.Generics
@ -19,14 +21,15 @@ instance Serial Series
data Ack = Ack deriving Generic
instance Serial Ack
destroyedMessage :: ByteString
destroyedMessage = "destroyed"
data Protocol term signature hash thash =
Protocol
-- | Shut down and destroy this node; requires proof of knowledge of private key
{ _destroyIn :: Channel signature
-- | Destroy another node
, _destroyOut :: Channel signature
-- | Sent to container to indicate destruction was successful
, _destroyed :: Channel (Remote.Node, signature)
-- | Create a new node (TODO - pass in parameters here)
, _spawn :: Request () Remote.Node
-- | Channel used to initiate handshaking to establish an encrypted pipe of `Maybe (Remote term)`
@ -39,6 +42,7 @@ data Protocol term signature hash thash =
, _insert :: Request B.ByteString hash
, _lookup :: Request hash (Maybe B.ByteString)
, _declare :: Request Series hash
, _delete :: Request Series ()
, _update :: Request (Series,hash,B.ByteString) (Maybe hash)
, _append :: Request (Series,hash,B.ByteString) (Maybe hash)
, _resolve :: Request Series (Maybe hash)
@ -55,8 +59,9 @@ blockStoreProxy p = go <$> Mux.ask
insert bytes = mt (_insert p) bytes
lookup h = mt (_lookup p) h
declare series = mt (_declare p) series
delete series = mt (_delete p) series
update series h bytes = mt (_update p) (series,h,bytes)
append series h bytes = mt (_append p) (series,h,bytes)
resolve series = mt (_resolve p) series
resolves series = mt (_resolves p) series
in BlockStore insert lookup declare update append resolve resolves
in BlockStore insert lookup declare delete update append resolve resolves

View File

@ -96,7 +96,7 @@ fromSeries series = Block series pure pure
or :: Block (Maybe a) -> a -> Block a
or (Block series get set) a = Block series (fmap (fromMaybe a) . get) (set . Just)
encrypted :: C.Cryptography k k' k'' s h ByteString
encrypted :: C.Cryptography t1 t2 t3 t4 t5 t6 ByteString
-> Block (Maybe ByteString)
-> Block (Maybe ByteString)
encrypted crypto b = xmap' decrypt encrypt b where

View File

@ -56,7 +56,7 @@ fromSeries :: (Eq h, Ord k, Serial k, Serial v)
fromSeries bs keyframe diffs = fromBlocks bs (B.fromSeries keyframe) (B.fromSeries diffs)
fromEncryptedSeries :: (Eq h, Ord k, Serial k, Serial v)
=> C.Cryptography k1 k2 k3 s h2 ByteString
=> C.Cryptography t1 t2 t3 t4 t5 t6 ByteString
-> BS.BlockStore h
-> BS.Series
-> BS.Series

View File

@ -0,0 +1,33 @@
{-# Language DeriveGeneric #-}
{-# OPTIONS_GHC -fno-warn-orphans #-}
module Unison.Runtime.JournaledTrie where
import Data.ByteString (ByteString)
import Data.Bytes.Serial (Serial(..))
import GHC.Generics
import qualified Unison.BlockStore as BS
import qualified Unison.Runtime.Block as B
import qualified Unison.Runtime.Journal as J
import qualified Data.Trie as T
type JournaledTrie v = J.Journal (T.Trie v) (Maybe (Update v))
data Update v = Insert ByteString v | Delete ByteString deriving Generic
instance Serial v => Serial (Update v)
instance Serial v => Serial (T.Trie v) where
serialize t = serialize (T.toList t)
deserialize = T.fromList <$> deserialize
fromBlocks :: (Eq h, Serial v)
=> BS.BlockStore h
-> B.Block (Maybe ByteString)
-> B.Block (Maybe ByteString)
-> IO (JournaledTrie v)
fromBlocks bs keyframe diffs = J.fromBlocks bs Nothing apply ks ds where
ks = B.serial T.empty $ keyframe
ds = B.serial Nothing $ diffs
apply Nothing t = t
apply (Just (Insert k b)) t = T.insert k b t
apply (Just (Delete k)) t = T.delete k t

View File

@ -56,8 +56,8 @@ runStandardIO sleepAfter rem interrupt m = do
hSetBinaryMode stdin True
hSetBinaryMode stdout True
fresh <- uniqueChannel
output <- atomically Q.empty
input <- atomically newTQueue
output <- atomically Q.empty :: IO (Q.Queue (Maybe Packet))
input <- atomically newTQueue :: IO (TQueue (Maybe Packet))
cb0@(Callbacks cbm cba) <- Callbacks <$> atomically M.new <*> atomically (newTVar 0)
let env = (Q.enqueue output . (Just <$>), cb0, fresh)
activity <- atomically $ newTVar 0
@ -373,7 +373,7 @@ delayBeforeFailure = seconds 2
pipeInitiate
:: (Serial i, Serial o, Serial key, Serial u, Serial node)
=> C.Cryptography key symmetricKey signKey signature hash Cleartext
=> C.Cryptography key t1 t2 t3 t4 t5 Cleartext
-> EncryptedChannel u o i
-> (node,key)
-> u
@ -416,7 +416,7 @@ pipeInitiate crypto rootChan (recipient,recipientKey) u = do
-- handshake if we know we can't accept messages from that party
pipeRespond
:: (Serial o, Serial i, Serial u, Serial node)
=> C.Cryptography key symmetricKey signKey signature hash Cleartext
=> C.Cryptography key t1 t2 t3 t4 t5 Cleartext
-> (key -> Multiplex Bool)
-> EncryptedChannel u i o
-> (u -> node)

View File

@ -98,11 +98,11 @@ data ConnectionSandbox key =
, allowOut :: key -> Multiplex Bool }
server :: (Ord h, Serial key, Serial t, Serial h)
=> C.Cryptography key symmetricKey signKey signature hash Cleartext
=> C.Cryptography key t1 t2 t3 t4 hash Cleartext
-> ConnectionSandbox key
-> Env t h
-> Language t h
-> P.Protocol t signature h' h
-> P.Protocol t hash h' h
-> Multiplex ()
server crypto allow env lang p = do
(accept,_) <- Mux.subscribeTimed (Mux.seconds 60) (Mux.erase (P._eval p))
@ -137,11 +137,11 @@ server crypto allow env lang p = do
loop (Set.unions stillMissing)
handle :: (Ord h, Serial key, Serial t, Serial h)
=> C.Cryptography key symmetricKey signKey signature hash Cleartext
=> C.Cryptography key t1 t2 t3 t4 hash Cleartext
-> ConnectionSandbox key
-> Env t h
-> Language t h
-> P.Protocol t signature h' h
-> P.Protocol t hash h' h
-> Remote t
-> Multiplex ()
handle crypto allow env lang p r = case r of
@ -177,10 +177,10 @@ handle crypto allow env lang p r = case r of
Right r -> pure r
client :: (Ord h, Serial key, Serial t, Serial h)
=> C.Cryptography key symmetricKey signKey signature hash Cleartext
=> C.Cryptography key t1 t2 t3 t4 hash Cleartext
-> ConnectionSandbox key
-> Env t h
-> P.Protocol t signature h' h
-> P.Protocol t hash h' h
-> Node
-> Remote t
-> Multiplex ()

View File

@ -59,6 +59,7 @@ library
Unison.Runtime.ExpiringMap
Unison.Runtime.Journal
Unison.Runtime.JournaledMap
Unison.Runtime.JournaledTrie
Unison.Runtime.KeyValueStore
Unison.Runtime.Multiplex
Unison.Runtime.Queue
@ -90,6 +91,7 @@ library
blaze-html,
bytes,
bytestring,
bytestring-trie,
cacophony,
cereal,
containers,
@ -108,6 +110,7 @@ library
network,
network-simple,
prelude-extras,
process,
random,
safecopy,
scotty,
@ -118,6 +121,7 @@ library
time,
transformers,
transformers-compat,
unagi-chan,
unison-shared,
vector,
wai-extra,

View File

@ -23,6 +23,8 @@ data BlockStore h = BlockStore {
lookup :: h -> IO (Maybe ByteString),
-- | Will return a random hash if Series not already declared, otherwise returns the result of `resolve`
declareSeries :: Series -> IO h,
-- | Marks the `Series` as garbage, allowing it to be collected
deleteSeries :: Series -> IO (),
-- | Update the value associated with this series. Any previous value(s) for the series
-- are considered garbage after the `update` and may be deleted by the store.
update :: Series -> h -> ByteString -> IO (Maybe h),

View File

@ -2,6 +2,8 @@
module Unison.Cryptography where
import Control.Monad
import System.Random (randomIO)
import Control.Concurrent.STM (STM)
import Data.ByteString (ByteString)
import Data.List
@ -15,13 +17,13 @@ type Ciphertext = ByteString
-- | The noop cryptography object. Does no actual encryption or signing,
-- and hashing function is not cryptographically secure! Useful for testing / debugging.
noop :: Cryptography () () () ByteString ByteString ByteString
noop = Cryptography () () hash sign verify randomBytes encryptAsymmetric decryptAsymmetric encrypt decrypt pipeInitiator pipeResponder where
noop :: Cryptography () () () () ByteString ByteString ByteString
noop = Cryptography () gen hash sign verify randomBytes encryptAsymmetric decryptAsymmetric encrypt decrypt pipeInitiator pipeResponder where
gen = pure ((), ())
hash = finish . foldl' (\acc bs -> Murmur.hash64Add bs acc) (Murmur.hash64 ())
sign _ = "not-a-real-signature" :: ByteString
verify _ _ _ = True
-- todo: this actually needs to be a bit more realistic
randomBytes n = pure $ ByteString.replicate n 4 -- see: https://xkcd.com/221/
randomBytes n = ByteString.pack <$> replicateM n randomIO
encryptAsymmetric _ cleartext = pure cleartext
decryptAsymmetric ciphertext = Right ciphertext
encrypt _ bs = pure $ ByteString.concat bs
@ -30,12 +32,12 @@ noop = Cryptography () () hash sign verify randomBytes encryptAsymmetric decrypt
pipeResponder = pure (pure True, pure (Just ()), pure, pure)
finish h64 = (LB.toStrict . Builder.toLazyByteString . Builder.word64LE . Murmur.asWord64) h64
data Cryptography key symmetricKey signKey signature hash cleartext =
data Cryptography key symmetricKey signKey signKeyPrivate signature hash cleartext =
Cryptography
-- public key
{ publicKey :: key
-- public key, used for signing (may not be the same as `publicKey`)
, publicSigningKey :: signKey
-- generate a keypair used for signing
, generateSignKey :: IO (signKey, signKeyPrivate)
-- hash some bytes
, hash :: [ByteString] -> hash
-- sign some bytes

View File

@ -86,6 +86,7 @@ library
mtl,
murmur-hash,
prelude-extras,
random,
stm,
text,
transformers,

View File

@ -12,3 +12,4 @@ extra-deps:
- sockaddr-0.0.0
- cacophony-0.7.0
- cryptonite-0.17
- unagi-chan-0.4.0.0