mirror of
https://github.com/unisonweb/unison.git
synced 2024-09-11 10:35:57 +03:00
hacking to get node workers loading base.u and extra.u from file
This commit is contained in:
parent
00e6472404
commit
d00c3f1cfa
2
.gitignore
vendored
2
.gitignore
vendored
@ -10,7 +10,9 @@ cabal-dev
|
||||
**/cache/**
|
||||
**/build/**
|
||||
store
|
||||
codestore
|
||||
tags
|
||||
unison-src/.loaded
|
||||
**cabal.sandbox.config
|
||||
.cabal-sandbox/**
|
||||
|
||||
|
@ -13,7 +13,7 @@ import System.IO (hSetBinaryMode, hFlush, stdin)
|
||||
import System.Process as P
|
||||
import Unison.NodeProtocol.V0 (protocol)
|
||||
import Unison.NodeServer as NS
|
||||
import Unison.Parsers (unsafeParseTermWithPrelude)
|
||||
import Unison.Parsers (unsafeParseTerm)
|
||||
import Unison.Runtime.Lock (Lock(..),Lease(..))
|
||||
import Web.Scotty as S
|
||||
import qualified Data.ByteArray as BA
|
||||
@ -65,7 +65,7 @@ main = Mux.uniqueChannel >>= \rand ->
|
||||
let node = R.Node "localhost" (Put.runPutS . serialize . Base64.decodeLenient $ nodepk)
|
||||
programtxt <- S.body
|
||||
let programstr = Text.unpack (decodeUtf8 (LB.toStrict programtxt))
|
||||
let !prog = unsafeParseTermWithPrelude programstr
|
||||
let !prog = unsafeParseTerm programstr
|
||||
let !prog' = Components.minimize' prog
|
||||
liftIO . putStrLn $ "parsed " ++ show prog
|
||||
liftIO . putStrLn $ "parsed' " ++ show prog'
|
||||
|
@ -57,11 +57,11 @@ make bs = let
|
||||
StoreData trm tym (Map.insert ref met mm)
|
||||
in do
|
||||
journaledStore <- J.fromBlocks bs apply keyframeBlock updateBlock
|
||||
let readTerm h = Note.noted . atomically $ (maybeToEither (Note.note "term not found") . Map.lookup h . termMap)
|
||||
let readTerm h = Note.noted . atomically $ (maybeToEither (Note.note $ "term not found " ++ show h) . Map.lookup h . termMap)
|
||||
<$> J.get journaledStore
|
||||
typeOfTerm r = Note.noted . atomically $ (maybeToEither (Note.note "type not found") . Map.lookup r . annotationMap)
|
||||
typeOfTerm r = Note.noted . atomically $ (maybeToEither (Note.note $ "type not found " ++ show r) . Map.lookup r . annotationMap)
|
||||
<$> J.get journaledStore
|
||||
readMetadata r = Note.noted . atomically $ (maybeToEither (Note.note "metadata not found") . Map.lookup r . metadataMap)
|
||||
readMetadata r = Note.noted . atomically $ (maybeToEither (Note.note $ "metadata not found " ++ show r) . Map.lookup r . metadataMap)
|
||||
<$> J.get journaledStore
|
||||
writeTerm h t = Note.lift $ J.update (WriteTerm h t) journaledStore
|
||||
annotateTerm r t = Note.lift $ J.update (AnnotateTerm r t) journaledStore
|
||||
|
@ -99,13 +99,19 @@ make bs nodeLock p genNode launchNodeCmd = do
|
||||
writer <- Async.async . forever $ do
|
||||
(bytes, force) <- tryReadChan toNodeRead
|
||||
bytes <- tryRead bytes >>= \bytes -> case bytes of
|
||||
Nothing -> hFlush stdin >> force -- flush buffer whenever there's a pause
|
||||
Nothing -> do
|
||||
L.trace logger $ "flushing bytes sent to stdin of node worker"
|
||||
hFlush stdin >> force -- flush buffer whenever there's a pause
|
||||
Just bytes -> pure bytes -- we're saturating the channel, no need to flush manually
|
||||
let nodeBytes = Put.runPutS (S.serialize node)
|
||||
L.trace logger $ "writing bytes " ++ show (B.length bytes)
|
||||
let numbytes = B.length bytes
|
||||
L.trace logger $ "sending " ++ show numbytes ++ " bytes to node " ++ show node
|
||||
safely $
|
||||
B.hPut stdin bytes `onException`
|
||||
writeChan packetWrite (Mux.Packet nodeBytes bytes)
|
||||
do
|
||||
B.hPut stdin bytes
|
||||
L.trace logger $ "done sending " ++ show numbytes ++ " bytes to node " ++ show node
|
||||
`onException`
|
||||
writeChan packetWrite (Mux.Packet nodeBytes bytes)
|
||||
|
||||
-- establish routes for processing packets coming from the node
|
||||
routes <- id $
|
||||
@ -125,7 +131,9 @@ make bs nodeLock p genNode launchNodeCmd = do
|
||||
handleRequest :: (S.Serial a, S.Serial b) => (a -> IO b) -> ByteString -> IO ()
|
||||
handleRequest h bytes = safely $ do
|
||||
(a, replyTo) <- either fail pure (Get.runGetS S.deserialize bytes)
|
||||
L.debug logger $ "got request " ++ show (Base64.encode replyTo)
|
||||
b <- h a
|
||||
L.debug logger $ "got response " ++ show (Base64.encode replyTo)
|
||||
send $ Put.runPutS (S.serialize (Mux.Packet replyTo $ Put.runPutS (S.serialize b)))
|
||||
insert = handleRequest (BS.insert bs)
|
||||
lookup = handleRequest (BS.lookup bs)
|
||||
|
@ -53,17 +53,17 @@ data Protocol term signature hash thash =
|
||||
blockStoreProxy :: (Serial hash) => Protocol term signature hash thash -> Mux.Multiplex (BlockStore hash)
|
||||
blockStoreProxy p = go <$> Mux.ask
|
||||
where
|
||||
timeout = 5000000 :: Mux.Microseconds
|
||||
timeout = Mux.seconds 25
|
||||
go env =
|
||||
let
|
||||
mt :: (Serial a, Serial b) => Request a b -> a -> IO b
|
||||
mt chan a = Mux.run env . join $ Mux.requestTimed timeout chan a
|
||||
insert bytes = mt (_insert p) bytes
|
||||
lookup h = mt (_lookup p) h
|
||||
declare series = mt (_declare p) series
|
||||
delete series = mt (_delete p) series
|
||||
update series h bytes = mt (_update p) (series,h,bytes)
|
||||
append series h bytes = mt (_append p) (series,h,bytes)
|
||||
resolve series = mt (_resolve p) series
|
||||
resolves series = mt (_resolves p) series
|
||||
mt :: (Serial a, Serial b) => String -> Request a b -> a -> IO b
|
||||
mt msg chan a = Mux.run env . join $ Mux.requestTimed msg timeout chan a
|
||||
insert bytes = mt "BlockStore.insert" (_insert p) bytes
|
||||
lookup h = mt "BlockStore.lookup" (_lookup p) h
|
||||
declare series = mt "BlockStore.declare" (_declare p) series
|
||||
delete series = mt "BlockStore.delete" (_delete p) series
|
||||
update series h bytes = mt "BlockStore.update" (_update p) (series,h,bytes)
|
||||
append series h bytes = mt "BlockStore.append" (_append p) (series,h,bytes)
|
||||
resolve series = mt "BlockStore.resolve" (_resolve p) series
|
||||
resolves series = mt "BlockStore.resolves" (_resolves p) series
|
||||
in BlockStore insert lookup declare delete update append resolve resolves
|
||||
|
@ -43,7 +43,7 @@ make :: ( BA.ByteArrayAccess key
|
||||
-> (Keypair key -> Cryptography key symmetricKey signKey skp signature hash Remote.Cleartext)
|
||||
-> Get (Cryptography key symmetricKey signKey skp signature hash Remote.Cleartext
|
||||
-> BlockStore h
|
||||
-> IO (Remote.Language term thash, term -> IO (Either String ())))
|
||||
-> IO (Remote.Language term thash, term -> IO (Either String term), IO ()))
|
||||
-> IO ()
|
||||
make protocol mkCrypto makeSandbox = do
|
||||
logger <- L.scope "worker" <$> Config.loggerStandardError
|
||||
@ -56,21 +56,23 @@ make protocol mkCrypto makeSandbox = do
|
||||
(sandbox, _, rem) <- Mux.deserializeHandle1 stdin (Get.runGetPartial deserialize rem)
|
||||
publicKey <- either die pure $ Get.runGetS deserialize (Remote.publicKey node)
|
||||
let keypair = Keypair publicKey privateKey
|
||||
L.debug logger $ "parsed private key, node id, universe, sandbox description"
|
||||
L.debug logger $ "remaining bytes: " ++ show (B.length rem)
|
||||
interrupt <- atomically $ newTSem 0
|
||||
Mux.runStandardIO logger (Mux.seconds 5) rem (atomically $ waitTSem interrupt) $ do
|
||||
blockStore <- P.blockStoreProxy protocol
|
||||
makeSandbox <- either die pure $ Get.runGetS makeSandbox sandbox
|
||||
let crypto = mkCrypto keypair
|
||||
(sandbox, typecheck) <- liftIO $ makeSandbox crypto blockStore
|
||||
(sandbox, typecheck, initialize) <- liftIO $ makeSandbox crypto blockStore
|
||||
let skHash = Put.runPutS (serialize $ C.hash crypto [Put.runPutS (serialize $ private keypair)])
|
||||
-- todo: load this from persistent store also
|
||||
connectionSandbox <- pure $ Remote.ConnectionSandbox (\_ -> pure True) (\_ -> pure True)
|
||||
env <- liftIO $ Remote.makeEnv universe node blockStore
|
||||
Mux.info $ "... done initializing"
|
||||
_ <- Remote.server crypto connectionSandbox env sandbox protocol
|
||||
_ <- do
|
||||
(prog, cancel) <- Mux.subscribeTimed (Mux.seconds 60) (P._localEval protocol)
|
||||
liftIO $ initialize
|
||||
Mux.info $ "... done initializing"
|
||||
Mux.fork . Mux.scope "_localEval" . Mux.repeatWhile $ do
|
||||
e <- prog
|
||||
case e of
|
||||
@ -83,7 +85,7 @@ make protocol mkCrypto makeSandbox = do
|
||||
Mux.warn $ "typechecking failed on: " ++ show r
|
||||
Mux.warn $ "typechecking error:\n" ++ err
|
||||
pure True
|
||||
Right _ -> do
|
||||
Right r -> do
|
||||
Mux.debug "typechecked"
|
||||
r <- liftIO $ Remote.eval sandbox r
|
||||
Mux.debug $ "evaluated to " ++ show r
|
||||
|
@ -174,6 +174,12 @@ scope :: String -> Multiplex a -> Multiplex a
|
||||
scope msg = local tweak where
|
||||
tweak (a,b,c,logger) = (a,b,c,L.scope msg logger)
|
||||
|
||||
-- | Crash with a message. Include the current logging scope.
|
||||
crash :: String -> Multiplex a
|
||||
crash msg = scope msg $ do
|
||||
l <- logger
|
||||
fail (show $ L.getScope l)
|
||||
|
||||
info, warn, debug :: String -> Multiplex ()
|
||||
info msg = logger >>= \logger -> liftIO $ L.info logger msg
|
||||
warn msg = logger >>= \logger -> liftIO $ L.warn logger msg
|
||||
@ -253,40 +259,41 @@ type Request a b = Channel (a, Channel b)
|
||||
type Microseconds = Int
|
||||
|
||||
requestTimedVia' :: (Serial a, Serial b)
|
||||
=> Microseconds
|
||||
=> String
|
||||
-> Microseconds
|
||||
-> (STM (a, Channel b) -> Multiplex ())
|
||||
-> Channel b
|
||||
-> STM a
|
||||
-> Multiplex (Multiplex b)
|
||||
requestTimedVia' micros send replyTo a = do
|
||||
requestTimedVia' msg micros send replyTo a = do
|
||||
env <- ask
|
||||
(receive, cancel) <- receiveCancellable replyTo
|
||||
send $ (,replyTo) <$> a
|
||||
watchdog <- liftIO . C.forkIO $ do
|
||||
liftIO $ C.threadDelay micros
|
||||
run env cancel
|
||||
run env (cancel $ "requestTimedVia timeout " ++ msg)
|
||||
pure $ receive <* liftIO (C.killThread watchdog)
|
||||
|
||||
requestTimedVia :: (Serial a, Serial b) => Microseconds -> Request a b -> Channel b -> STM a
|
||||
requestTimedVia :: (Serial a, Serial b) => String -> Microseconds -> Request a b -> Channel b -> STM a
|
||||
-> Multiplex (Multiplex b)
|
||||
requestTimedVia micros req replyTo a =
|
||||
requestTimedVia' micros (send' req) replyTo a
|
||||
requestTimedVia msg micros req replyTo a =
|
||||
requestTimedVia' msg micros (send' req) replyTo a
|
||||
|
||||
requestTimed' :: (Serial a, Serial b) => Microseconds -> Request a b -> STM a -> Multiplex (Multiplex b)
|
||||
requestTimed' micros req a = do
|
||||
requestTimed' :: (Serial a, Serial b) => String -> Microseconds -> Request a b -> STM a -> Multiplex (Multiplex b)
|
||||
requestTimed' msg micros req a = do
|
||||
replyTo <- channel
|
||||
requestTimedVia micros req replyTo a
|
||||
requestTimedVia msg micros req replyTo a
|
||||
|
||||
requestTimed :: (Serial a, Serial b) => Microseconds -> Request a b -> a -> Multiplex (Multiplex b)
|
||||
requestTimed micros req a = do
|
||||
requestTimed :: (Serial a, Serial b) => String -> Microseconds -> Request a b -> a -> Multiplex (Multiplex b)
|
||||
requestTimed msg micros req a = do
|
||||
replyTo <- channel
|
||||
env <- ask
|
||||
(receive, cancel) <- receiveCancellable replyTo
|
||||
send req (a, replyTo)
|
||||
watchdog <- liftIO . C.forkIO $ do
|
||||
liftIO $ C.threadDelay micros
|
||||
run env cancel
|
||||
pure $ receive <* liftIO (C.killThread watchdog) <* cancel
|
||||
run env (cancel $ "requestTimed timeout " ++ msg)
|
||||
pure $ receive <* liftIO (C.killThread watchdog) <* cancel ("requestTimed completed")
|
||||
|
||||
type Cleartext = B.ByteString
|
||||
type Ciphertext = B.ByteString
|
||||
@ -294,18 +301,19 @@ type CipherState = (Cleartext -> STM Ciphertext, Ciphertext -> STM Cleartext)
|
||||
|
||||
encryptedRequestTimedVia
|
||||
:: (Serial a, Serial b)
|
||||
=> CipherState
|
||||
=> String
|
||||
-> CipherState
|
||||
-> Microseconds
|
||||
-> ((a,Channel b) -> Multiplex ())
|
||||
-> Channel b
|
||||
-> a
|
||||
-> Multiplex b
|
||||
encryptedRequestTimedVia (_,decrypt) micros send replyTo@(Channel _ bs) a = do
|
||||
responseCiphertext <- receiveTimed micros (Channel Type bs)
|
||||
encryptedRequestTimedVia msg (_,decrypt) micros send replyTo@(Channel _ bs) a = do
|
||||
responseCiphertext <- receiveTimed msg micros (Channel Type bs)
|
||||
send (a, replyTo)
|
||||
responseCiphertext <- responseCiphertext -- force the receive
|
||||
responseCleartext <- liftIO . atomically . decrypt $ responseCiphertext
|
||||
either fail pure $ Get.runGetS deserialize responseCleartext
|
||||
either crash pure $ Get.runGetS deserialize responseCleartext
|
||||
|
||||
encryptAndSendTo
|
||||
:: (Serial a, Serial node)
|
||||
@ -346,29 +354,29 @@ send' (Channel _ key) a = do
|
||||
~(send,_,_,_) <- ask
|
||||
liftIO . atomically $ send (Packet key . Put.runPutS . serialize <$> a)
|
||||
|
||||
receiveCancellable :: Serial a => Channel a -> Multiplex (Multiplex a, Multiplex ())
|
||||
receiveCancellable :: Serial a => Channel a -> Multiplex (Multiplex a, String -> Multiplex ())
|
||||
receiveCancellable (Channel _ key) = do
|
||||
(_,Callbacks cbs cba,_,_) <- ask
|
||||
result <- liftIO newEmptyMVar
|
||||
liftIO . atomically $ M.insert (putMVar result . Right) key cbs
|
||||
liftIO $ bumpActivity' cba
|
||||
cancel <- pure $ do
|
||||
cancel <- pure $ \reason -> do
|
||||
liftIO . atomically $ M.delete key cbs
|
||||
liftIO $ putMVar result (Left "cancelled")
|
||||
force <- pure . liftIO $ do
|
||||
bytes <- takeMVar result
|
||||
bytes <- either fail pure bytes
|
||||
either fail pure $ Get.runGetS deserialize bytes
|
||||
liftIO $ putMVar result (Left $ "Mux.cancelled: " ++ reason)
|
||||
force <- pure . scope "receiveCancellable" $ do
|
||||
bytes <- liftIO $ takeMVar result
|
||||
bytes <- either crash pure bytes
|
||||
either crash pure $ Get.runGetS deserialize bytes
|
||||
pure (force, cancel)
|
||||
|
||||
receiveTimed :: Serial a => Microseconds -> Channel a -> Multiplex (Multiplex a)
|
||||
receiveTimed micros chan = do
|
||||
receiveTimed :: Serial a => String -> Microseconds -> Channel a -> Multiplex (Multiplex a)
|
||||
receiveTimed msg micros chan = do
|
||||
(force, cancel) <- receiveCancellable chan
|
||||
env <- ask
|
||||
watchdog <- liftIO . C.forkIO $ do
|
||||
liftIO $ C.threadDelay micros
|
||||
run env cancel
|
||||
pure $ force <* liftIO (C.killThread watchdog) <* cancel
|
||||
run env (cancel $ "receiveTimed timeout during " ++ msg)
|
||||
pure $ scope "receiveTimed" (force <* liftIO (C.killThread watchdog) <* cancel ("receiveTimed completed" ++ msg))
|
||||
|
||||
timeout' :: Microseconds -> a -> Multiplex a -> Multiplex a
|
||||
timeout' micros onTimeout m = fromMaybe onTimeout <$> timeout micros m
|
||||
@ -413,15 +421,15 @@ subscribeTimed micros chan = do
|
||||
loop logger activity result cancel
|
||||
|
||||
subscribe :: Serial a => Channel a -> Multiplex (Multiplex a, Multiplex ())
|
||||
subscribe (Channel _ key) = do
|
||||
subscribe (Channel _ key) = scope "subscribe" $ do
|
||||
(_, Callbacks cbs cba, _, _) <- ask
|
||||
q <- liftIO . atomically $ newTQueue
|
||||
liftIO . atomically $ M.insert (atomically . writeTQueue q) key cbs
|
||||
liftIO $ bumpActivity' cba
|
||||
unsubscribe <- pure . liftIO . atomically . M.delete key $ cbs
|
||||
force <- pure . liftIO $ do
|
||||
bytes <- atomically $ readTQueue q
|
||||
either fail pure $ Get.runGetS deserialize bytes
|
||||
force <- pure $ do
|
||||
bytes <- liftIO . atomically $ readTQueue q
|
||||
either crash pure $ Get.runGetS deserialize bytes
|
||||
pure (force, unsubscribe)
|
||||
|
||||
seconds :: Microseconds -> Int
|
||||
@ -487,7 +495,7 @@ pipeInitiate crypto rootChan (recipient,recipientKey) u = scope "pipeInitiate" $
|
||||
bytes <- fetchh
|
||||
debug "... handshake round trip completed"
|
||||
case bytes of
|
||||
Nothing -> cancelh >> cancelc >> fail "cancelled handshake"
|
||||
Nothing -> cancelh >> cancelc >> crash "cancelled handshake"
|
||||
Just bytes -> liftIO (atomically $ decrypt bytes) >> go
|
||||
|
||||
-- todo: add access control here, better to bail ASAP (or after 1s delay
|
||||
@ -505,7 +513,7 @@ pipeRespond crypto allow _ extractSender payload = do
|
||||
(doneHandshake, senderKey, encrypt, decrypt) <- liftIO $ C.pipeResponder crypto
|
||||
debug $ "decrypting initial payload"
|
||||
bytes <- (liftLogged "[Mux.pipeRespond] decrypt" . atomically . decrypt) payload
|
||||
(u, chans@(handshakeChan,connectedChan)) <- either fail pure $ Get.runGetS deserialize bytes
|
||||
(u, chans@(handshakeChan,connectedChan)) <- either crash pure $ Get.runGetS deserialize bytes
|
||||
debug $ "handshake channels: " ++ show chans
|
||||
let sender = extractSender u
|
||||
handshakeSub <- subscribeTimed handshakeTimeout handshakeChan
|
||||
@ -531,7 +539,7 @@ pipeRespond crypto allow _ extractSender payload = do
|
||||
Nothing -> pure ()
|
||||
Just senderKey -> allow senderKey >>= \ok ->
|
||||
if ok then pure ()
|
||||
else liftIO (C.threadDelay delayBeforeFailure) >> fail "disallowed key"
|
||||
else liftIO (C.threadDelay delayBeforeFailure) >> crash "disallowed key"
|
||||
go = do
|
||||
ready <- liftIO $ atomically doneHandshake
|
||||
checkSenderKey
|
||||
@ -545,5 +553,5 @@ pipeRespond crypto allow _ extractSender payload = do
|
||||
nest sender $ send' chanh (encrypt B.empty)
|
||||
bytes <- fetchh
|
||||
case bytes of
|
||||
Nothing -> cancelh >> cancelc >> fail "cancelled handshake"
|
||||
Nothing -> cancelh >> cancelc >> crash "cancelled handshake"
|
||||
Just bytes -> liftIO (atomically $ decrypt bytes) >> go
|
||||
|
@ -132,7 +132,8 @@ server crypto allow env lang p = do
|
||||
where
|
||||
fetch hs = do
|
||||
syncChan <- Mux.channel
|
||||
Mux.encryptedRequestTimedVia cipherstate (Mux.seconds 5) (send . Just . Just) syncChan (Set.toList hs)
|
||||
Mux.encryptedRequestTimedVia "fetching hashes"
|
||||
cipherstate (Mux.seconds 5) (send . Just . Just) syncChan (Set.toList hs)
|
||||
loop needs | Set.null needs = pure ()
|
||||
loop needs = fetch needs >>= \hashes -> case hashes of
|
||||
Nothing -> fail "expected hashes, got timeout"
|
||||
@ -185,7 +186,7 @@ handle crypto allow env lang p r = Mux.debug (show r) >> case r of
|
||||
pure $ node lang (currentNode env)
|
||||
runLocal Spawn = do
|
||||
Mux.debug $ "runLocal Spawn"
|
||||
n <- Mux.requestTimed (Mux.seconds 5) (P._spawn p) B.empty
|
||||
n <- Mux.requestTimed "runLocal.spawn" (Mux.seconds 5) (P._spawn p) B.empty
|
||||
n <- n
|
||||
Mux.debug $ "runLocal Spawn completed: " ++ show n
|
||||
pure (node lang n)
|
||||
@ -198,7 +199,8 @@ handle crypto allow env lang p r = Mux.debug (show r) >> case r of
|
||||
pure (unit lang)
|
||||
runLocal (ReceiveAsync chan@(Channel cid) (Seconds seconds)) = do
|
||||
Mux.debug $ "runLocal ReceiveAsync " ++ show (seconds, cid)
|
||||
_ <- Mux.receiveTimed (floor $ seconds * 1000 * 1000) ((Mux.Channel Mux.Type cid) :: Mux.Channel (Maybe B.ByteString))
|
||||
_ <- Mux.receiveTimed ("receiveAsync on " ++ show chan)
|
||||
(floor $ seconds * 1000 * 1000) ((Mux.Channel Mux.Type cid) :: Mux.Channel (Maybe B.ByteString))
|
||||
pure (remote lang (Step (Local (Receive chan))))
|
||||
runLocal (Receive (Channel cid)) = do
|
||||
Mux.debug $ "runLocal Receive " ++ show cid
|
||||
@ -233,7 +235,7 @@ client crypto allow env p recipient r = Mux.scope "Remote.client" $ do
|
||||
Mux.info $ "connected"
|
||||
replyChan <- Mux.channel
|
||||
let send' (a,b) = send (Just (a,b))
|
||||
_ <- Mux.encryptedRequestTimedVia cipherstate (Mux.seconds 5) send' replyChan r
|
||||
_ <- Mux.encryptedRequestTimedVia "client ack" cipherstate (Mux.seconds 5) send' replyChan r
|
||||
Mux.debug $ "got ack on " ++ show replyChan
|
||||
-- todo - might want to retry if ack doesn't come back
|
||||
id $
|
||||
|
@ -2,50 +2,82 @@
|
||||
|
||||
module Main where
|
||||
|
||||
import Control.Concurrent.STM.TVar
|
||||
import Control.Monad
|
||||
import System.Directory (doesFileExist)
|
||||
import System.IO (stderr)
|
||||
import Unison.Hash (Hash)
|
||||
import Unison.NodeProtocol.V0 (protocol)
|
||||
import Unison.NodeWorker as W
|
||||
import Unison.SerializationAndHashing (TermV)
|
||||
import qualified Data.Map as Map
|
||||
import qualified Control.Concurrent.STM as STM
|
||||
import qualified Data.Set as Set
|
||||
import qualified Data.Text as Text
|
||||
import qualified Data.Text.IO as Text.IO
|
||||
import qualified Unison.Config as Config
|
||||
import qualified Unison.Cryptography as C
|
||||
import qualified Unison.Eval as Eval
|
||||
import qualified Unison.Eval.Interpreter as I
|
||||
import qualified Unison.Node as Node
|
||||
import qualified Unison.Node.BasicNode as BasicNode
|
||||
import qualified Unison.Node.Builtin as Builtin
|
||||
import qualified Unison.Node.FileStore as Store
|
||||
import qualified Unison.Note as Note
|
||||
import qualified Unison.Parsers as Parsers
|
||||
import qualified Unison.Reference as Reference
|
||||
import qualified Unison.Remote as RT
|
||||
import qualified Unison.Runtime.ExtraBuiltins as ExtraBuiltins
|
||||
import qualified Unison.Runtime.Remote as R
|
||||
import qualified Unison.SerializationAndHashing as SAH
|
||||
import qualified Unison.Term as Term
|
||||
import qualified Unison.Typechecker as Typechecker
|
||||
import qualified Unison.Util.Logger as L
|
||||
|
||||
main :: IO ()
|
||||
main = W.make protocol crypto (pure lang) where
|
||||
main = do
|
||||
logger <- L.scope "worker-main" <$> Config.loggerTo stderr
|
||||
W.make protocol crypto (pure $ lang logger) where
|
||||
crypto keypair = C.noop (W.public keypair)
|
||||
lang crypto blockstore = do
|
||||
lang logger crypto blockstore = do
|
||||
let b0 = Builtin.makeBuiltins
|
||||
b1 <- ExtraBuiltins.makeAPI blockstore crypto
|
||||
pure $ go b0 b1
|
||||
store <- Store.make "codestore"
|
||||
backend <- BasicNode.make SAH.hash store (\whnf -> b0 whnf ++ b1 whnf)
|
||||
initialized <- STM.atomically $ newTVar False
|
||||
pure $ go backend initialized b0 b1
|
||||
where
|
||||
go b0 b1 = (lang, typecheck) where
|
||||
lang :: R.Language TermV Hash
|
||||
lang = R.Language localDependencies eval apply node unit channel local unRemote remote
|
||||
codestore = R.makeCodestore blockstore :: R.Codestore TermV Hash
|
||||
localDependencies _ = Set.empty -- todo, compute this for real
|
||||
evaluator = I.eval allprimops
|
||||
whnf = Eval.whnf evaluator gethash
|
||||
allbuiltins = b0 whnf ++ b1 whnf
|
||||
allprimops = Map.fromList [ (r, op) | Builtin.Builtin r (Just op) _ _ <- allbuiltins ]
|
||||
gethash h = Note.lift $ do
|
||||
[(h',t)] <- R.getHashes codestore (Set.singleton h)
|
||||
guard $ h == h'
|
||||
pure t
|
||||
typeEnv ref = case lookup ref [ (r, t) | Builtin.Builtin r _ t _ <- allbuiltins ] of
|
||||
Nothing -> fail $ "unknown reference " ++ show ref
|
||||
Just t -> pure t
|
||||
eval t = Note.run (whnf t)
|
||||
typecheck term = Note.attemptRun . void $ Typechecker.synthesize typeEnv term
|
||||
go backend initialized b0 b1 =
|
||||
let
|
||||
lang :: R.Language TermV Hash
|
||||
lang = R.Language localDependencies eval apply node unit channel local unRemote remote
|
||||
codestore = R.makeCodestore blockstore :: R.Codestore TermV Hash
|
||||
localDependencies _ = Set.empty -- todo, compute this for real
|
||||
whnf e = do -- todo: may want to have this use evaluator + codestore directly
|
||||
Note.lift . STM.atomically $ readTVar initialized >>= \ok ->
|
||||
if ok then pure ()
|
||||
else STM.retry
|
||||
[(_,_,e)] <- Node.evaluateTerms backend [([], e)]
|
||||
pure e
|
||||
eval t = Note.run (whnf t)
|
||||
-- evaluator = I.eval allprimops
|
||||
-- allbuiltins = b0 whnf ++ b1 whnf
|
||||
-- allprimops = Map.fromList [ (r, op) | Builtin.Builtin r (Just op) _ _ <- allbuiltins ]
|
||||
typecheck e = do
|
||||
bindings <- Note.run $ Node.allTermsByVarName Term.ref backend
|
||||
let e' = Parsers.bindBuiltins bindings [] e
|
||||
Note.unnote (Node.typeAt backend e' []) >>= \t -> case t of
|
||||
Left note -> pure $ Left (show note)
|
||||
Right _ -> pure (Right e')
|
||||
initialize = do
|
||||
L.info logger "checking if base libraries loaded"
|
||||
alreadyInitialized <- doesDirectoryExist "codestore"
|
||||
when (not alreadyInitialized) $ do
|
||||
L.info logger "codestore/ directory not found, loading base libraries..."
|
||||
loadDeclarations "unison-src/base.u" backend
|
||||
loadDeclarations "unison-src/extra.u" backend
|
||||
hs <- Note.run (Node.allTerms backend)
|
||||
R.saveHashes codestore [ (h,v) | (Reference.Derived h, v) <- hs ]
|
||||
STM.atomically $ writeTVar initialized True
|
||||
in (lang, typecheck, initialize)
|
||||
apply = Term.app
|
||||
node = Term.node
|
||||
unit = Term.builtin "()"
|
||||
@ -54,3 +86,10 @@ main = W.make protocol crypto (pure lang) where
|
||||
unRemote (Term.Distributed' (Term.Remote r)) = Just r
|
||||
unRemote _ = Nothing
|
||||
remote = Term.remote
|
||||
loadDeclarations path node = do
|
||||
txt <- Text.IO.readFile path
|
||||
let str = Text.unpack txt
|
||||
L.info logger $ "loading " ++ path
|
||||
r <- Note.run $ Node.declare' Term.ref str node
|
||||
L.info logger $ "done loading " ++ path
|
||||
pure r
|
||||
|
@ -161,6 +161,7 @@ executable worker
|
||||
ghc-options: -funbox-strict-fields -O2
|
||||
|
||||
build-depends:
|
||||
aeson,
|
||||
async,
|
||||
base,
|
||||
base64-bytestring,
|
||||
@ -171,6 +172,8 @@ executable worker
|
||||
configurator,
|
||||
cryptonite,
|
||||
curl,
|
||||
directory,
|
||||
filepath,
|
||||
free,
|
||||
hashable,
|
||||
list-t,
|
||||
|
@ -250,11 +250,7 @@ node eval hash store =
|
||||
-- existing metadata store of the Node.
|
||||
declare :: (Monad m, Var v) => (h -> Term v) -> [(v, Term v)] -> Node m v h (Type v) (Term v) -> Noted m ()
|
||||
declare ref bindings node = do
|
||||
termBuiltins <- do
|
||||
-- grab all definitions in the node
|
||||
results <- search node Term.blank [] 1000000 (Metadata.Query "") Nothing
|
||||
pure [ (v, ref h) | (h, md) <- references results
|
||||
, v <- toList $ Metadata.firstName (Metadata.names md) ]
|
||||
termBuiltins <- allTermsByVarName ref node
|
||||
let groups = Components.components bindings
|
||||
-- watch msg a = trace (msg ++ show (map (Var.name . fst) a)) a
|
||||
bindings' = groups >>= \c -> case c of
|
||||
@ -276,3 +272,15 @@ declare' ref bindings node = do
|
||||
Parser.Fail err _ -> Noted (pure $ Left (Note err))
|
||||
Parser.Succeed bs _ _ -> pure bs
|
||||
declare ref bs node
|
||||
|
||||
allTermsByVarName :: (Monad m, Var v) => (h -> Term v) -> Node m v h (Type v) (Term v) -> Noted m [(v, Term v)]
|
||||
allTermsByVarName ref node = do
|
||||
-- grab all definitions in the node
|
||||
results <- search node Term.blank [] 1000000 (Metadata.Query "") Nothing
|
||||
pure [ (v, ref h) | (h, md) <- references results
|
||||
, v <- toList $ Metadata.firstName (Metadata.names md) ]
|
||||
|
||||
allTerms :: (Monad m, Var v) => Node m v h (Type v) (Term v) -> Noted m [(h, Term v)]
|
||||
allTerms node = do
|
||||
hs <- map fst . references <$> search node Term.blank [] 100000 (Metadata.Query "") Nothing
|
||||
Map.toList <$> terms node hs
|
||||
|
@ -43,28 +43,7 @@ make hash store getBuiltins =
|
||||
readTerm h = Store.readTerm store h
|
||||
whnf = Eval.whnf eval readTerm
|
||||
node = Node.node eval hash store
|
||||
|
||||
-- stub :: Metadata V R.Reference -> Type V -> N.Noted IO ()
|
||||
-- stub s t = () <$ Node.createTerm node (Term.blank `Term.ann` t) s
|
||||
|
||||
in N.run $ do
|
||||
_ <- Node.createTerm node (unsafeParseTerm "a -> a") (prefix "identity")
|
||||
mapM_ (\(B.Builtin r _ t md) -> Node.updateMetadata node r md *> Store.annotateTerm store r t)
|
||||
builtins
|
||||
compose <- Node.createTerm node (unsafeParseTerm "f g x -> f (g x)") (prefix "compose")
|
||||
-- Node.createTerm node (\f -> bind (compose pure f))
|
||||
let composeH = unsafeHashStringFromReference compose
|
||||
_ <- Node.createTerm node (unsafeParseTerm $ "f -> bind ("++composeH++" pure f)")
|
||||
(prefix "map")
|
||||
pure node
|
||||
where
|
||||
unsafeHashStringFromReference (R.Derived h) = "#" ++ Text.unpack (H.base64 h)
|
||||
unsafeHashStringFromReference _ = error "tried to extract a Derived hash from a Builtin"
|
||||
|
||||
prefix :: Text -> Metadata V h
|
||||
prefix s = prefixes [s]
|
||||
|
||||
prefixes :: [Text] -> Metadata V h
|
||||
prefixes s = Metadata Metadata.Term
|
||||
(Metadata.Names (map Var.named s))
|
||||
Nothing
|
||||
|
@ -153,13 +153,13 @@ makeBuiltins whnf =
|
||||
op [a] = pure $ Term.remote (Remote.Step (Remote.Local (Remote.Pure a)))
|
||||
op _ = fail "unpossible"
|
||||
in (r, Just (I.Primop 1 op), remoteSignatureOf "Remote.pure", prefix "pure")
|
||||
, let r = R.Builtin "Remote.map"
|
||||
op [f, r] = pure $ Term.builtin "Remote.bind" `Term.app`
|
||||
(Term.lam' ["x"] $ Term.remote
|
||||
(Remote.Step . Remote.Local . Remote.Pure $ f `Term.app` Term.var' "x"))
|
||||
`Term.app` r
|
||||
op _ = fail "unpossible"
|
||||
in (r, Just (I.Primop 2 op), remoteSignatureOf "Remote.map", prefix "map")
|
||||
--, let r = R.Builtin "Remote.map"
|
||||
-- op [f, r] = pure $ Term.builtin "Remote.bind" `Term.app`
|
||||
-- (Term.lam' ["x"] $ Term.remote
|
||||
-- (Remote.Step . Remote.Local . Remote.Pure $ f `Term.app` Term.var' "x"))
|
||||
-- `Term.app` r
|
||||
-- op _ = fail "unpossible"
|
||||
-- in (r, Just (I.Primop 2 op), remoteSignatureOf "Remote.map", prefix "map")
|
||||
, let r = R.Builtin "Remote.receiveAsync"
|
||||
op [chan, timeout] = do
|
||||
Term.Number' seconds <- whnf timeout
|
||||
|
@ -91,10 +91,10 @@ termBuiltins = (Var.named *** Term.ref) <$> (
|
||||
, AliasFromModule "Text"
|
||||
["concatenate", "left", "right", "center", "justify"] []
|
||||
, AliasFromModule "Remote"
|
||||
["fork", "receive", "receiveAsync", "pure", "bind", "map", "channel", "send", "here", "at", "spawn"] []
|
||||
["fork", "receive", "receiveAsync", "pure", "bind", "channel", "send", "here", "at", "spawn"] []
|
||||
, AliasFromModule "Color" ["rgba"] []
|
||||
, AliasFromModule "Symbol" ["Symbol"] []
|
||||
, AliasFromModule "Index" ["lookup", "unsafeLookup", "insert", "unsafeInsert", "empty", "unsafeEmpty"] []
|
||||
, AliasFromModule "Index" ["lookup", "unsafeLookup", "insert", "unsafeInsert", "unsafeEmpty"] []
|
||||
, AliasFromModule "Html" ["getLinks", "getHref", "getDescription"] []
|
||||
, AliasFromModule "Http" ["getURL", "unsafeGetURL"] []
|
||||
] >>= unpackAliases)
|
||||
|
@ -39,6 +39,8 @@ node = do
|
||||
base <- Note.run $ do
|
||||
-- grab all definitions in the node
|
||||
results <- Node.search node Term.blank [] 1000000 (Metadata.Query "") Nothing
|
||||
sources <- Node.terms node (map fst $ Node.references results)
|
||||
Note.lift $ putStrLn (show sources)
|
||||
let x = [ (v, Term.ref h) | (h, md) <- Node.references results
|
||||
, v <- toList $ Metadata.firstName (Metadata.names md) ]
|
||||
Note.lift $ putStrLn (show x)
|
||||
|
@ -34,6 +34,7 @@ tests = withResource Common.node (\_ -> pure ()) $ \node ->
|
||||
, t "Either.fold ((+) 1) ((+) 2) (Either.Right 1)" "3"
|
||||
, t "Either.swap (Left 1)" "Either.Right 1"
|
||||
, t "Pair.fold (x y -> x) (1, 2)" "1"
|
||||
, t "const 41 0" "41"
|
||||
, t "1st (1,2,3,4)" "1"
|
||||
, t "2nd (1,2 + 1,3,4)" "3"
|
||||
]
|
||||
|
@ -13,3 +13,8 @@ extra-deps:
|
||||
- cacophony-0.7.0
|
||||
- cryptonite-0.17
|
||||
- unagi-chan-0.4.0.0
|
||||
|
||||
extra-include-dirs:
|
||||
- /usr/local/include
|
||||
extra-lib-dirs:
|
||||
- /usr/local/lib
|
||||
|
@ -1,6 +1,9 @@
|
||||
Remote.transfer : Node -> Remote Unit;
|
||||
Remote.transfer node = Remote.at node unit;
|
||||
|
||||
Remote.map : ∀ a b . (a -> b) -> Remote a -> Remote b;
|
||||
Remote.map f = Remote.bind (f `then` Remote.pure);
|
||||
|
||||
then : ∀ a b c . (a -> b) -> (b -> c) -> a -> c;
|
||||
then f1 f2 x = f2 (f1 x);
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user