king: Minor cleanup.

This commit is contained in:
~siprel 2020-06-08 21:22:04 +00:00
parent 1f4c823d92
commit 957f14ee40
2 changed files with 26 additions and 29 deletions

View File

@ -55,24 +55,24 @@ wsConn :: (FromNoun i, ToNoun o, Show i, Show o, HasLogFunc e)
-> WS.Connection -> WS.Connection
-> RIO e () -> RIO e ()
wsConn pre inp out wsc = do wsConn pre inp out wsc = do
logWarn (pre <> "(wcConn) Connected!") logDebug (pre <> "(wcConn) Connected!")
writer <- withRIOThread $ forever $ do writer <- withRIOThread $ forever $ do
logWarn (pre <> "(wsConn) Waiting for data.") logDebug (pre <> "(wsConn) Waiting for data.")
byt <- io $ toStrict <$> WS.receiveData wsc byt <- io $ toStrict <$> WS.receiveData wsc
logWarn (pre <> "Got data") logDebug (pre <> "Got data")
dat <- cueBSExn byt >>= fromNounExn dat <- cueBSExn byt >>= fromNounExn
logWarn (pre <> "(wsConn) Decoded data, writing to chan") logDebug (pre <> "(wsConn) Decoded data, writing to chan")
atomically $ writeTBMChan inp dat atomically $ writeTBMChan inp dat
reader <- withRIOThread $ forever $ do reader <- withRIOThread $ forever $ do
logWarn (pre <> "Waiting for data from chan") logDebug (pre <> "Waiting for data from chan")
atomically (readTBMChan out) >>= \case atomically (readTBMChan out) >>= \case
Nothing -> do Nothing -> do
logWarn (pre <> "(wsConn) Connection closed") logDebug (pre <> "(wsConn) Connection closed")
error "dead-conn" error "dead-conn"
Just msg -> do Just msg -> do
logWarn (pre <> "(wsConn) Got message! " <> displayShow msg) logDebug (pre <> "(wsConn) Got message! " <> displayShow msg)
io $ WS.sendBinaryData wsc $ fromStrict $ jamBS $ toNoun msg io $ WS.sendBinaryData wsc $ fromStrict $ jamBS $ toNoun msg
let cleanup = do let cleanup = do
@ -82,7 +82,7 @@ wsConn pre inp out wsc = do
flip finally cleanup $ do flip finally cleanup $ do
res <- atomically (waitCatchSTM writer <|> waitCatchSTM reader) res <- atomically (waitCatchSTM writer <|> waitCatchSTM reader)
logWarn $ displayShow (res :: Either SomeException ()) logDebug $ displayShow (res :: Either SomeException ())
-------------------------------------------------------------------------------- --------------------------------------------------------------------------------
@ -111,7 +111,7 @@ wsServApp :: (HasLogFunc e, ToNoun o, FromNoun i, Show i, Show o)
-> WS.PendingConnection -> WS.PendingConnection
-> RIO e () -> RIO e ()
wsServApp cb pen = do wsServApp cb pen = do
logError "NOUNSERV (wsServer) Got connection!" logDebug "NOUNSERV (wsServer) Got connection!"
wsc <- io $ WS.acceptRequest pen wsc <- io $ WS.acceptRequest pen
inp <- io $ newTBMChanIO 5 inp <- io $ newTBMChanIO 5
out <- io $ newTBMChanIO 5 out <- io $ newTBMChanIO 5
@ -125,10 +125,10 @@ wsServer = do
tid <- async $ do tid <- async $ do
env <- ask env <- ask
logError "NOUNSERV (wsServer) Starting server" logDebug "NOUNSERV (wsServer) Starting server"
io $ WS.runServer "127.0.0.1" 9999 io $ WS.runServer "127.0.0.1" 9999
$ runRIO env . wsServApp (writeTBMChan con) $ runRIO env . wsServApp (writeTBMChan con)
logError "NOUNSERV (wsServer) Server died" logDebug "NOUNSERV (wsServer) Server died"
atomically $ closeTBMChan con atomically $ closeTBMChan con
pure $ Server (readTBMChan con) tid 9999 pure $ Server (readTBMChan con) tid 9999
@ -147,34 +147,34 @@ example = Just (99, (), 44)
testIt :: HasLogFunc e => RIO e () testIt :: HasLogFunc e => RIO e ()
testIt = do testIt = do
logTrace "(testIt) Starting Server" logDebug "(testIt) Starting Server"
Server{..} <- wsServer @Example @Example Server{..} <- wsServer @Example @Example
logTrace "(testIt) Connecting" logDebug "(testIt) Connecting"
Client{..} <- wsClient @Example @Example "/" sData Client{..} <- wsClient @Example @Example "/" sData
logTrace "(testIt) Accepting connection" logDebug "(testIt) Accepting connection"
sConn <- fromJust "accept" =<< atomically sAccept sConn <- fromJust "accept" =<< atomically sAccept
let let
clientSend = do clientSend = do
logTrace "(testIt) Sending from client" logDebug "(testIt) Sending from client"
atomically (cSend cConn example) atomically (cSend cConn example)
logTrace "(testIt) Waiting for response" logDebug "(testIt) Waiting for response"
res <- atomically (cRecv sConn) res <- atomically (cRecv sConn)
print ("clientSend", res, example) print ("clientSend", res, example)
unless (res == Just example) $ do unless (res == Just example) $ do
error "Bad data" error "Bad data"
logInfo "(testIt) Success" logDebug "(testIt) Success"
serverSend = do serverSend = do
logTrace "(testIt) Sending from server" logDebug "(testIt) Sending from server"
atomically (cSend sConn example) atomically (cSend sConn example)
logTrace "(testIt) Waiting for response" logDebug "(testIt) Waiting for response"
res <- atomically (cRecv cConn) res <- atomically (cRecv cConn)
print ("serverSend", res, example) print ("serverSend", res, example)
unless (res == Just example) $ do unless (res == Just example) $ do
error "Bad data" error "Bad data"
logInfo "(testIt) Success" logDebug "(testIt) Success"
clientSend clientSend
clientSend clientSend

View File

@ -24,7 +24,6 @@ import Urbit.Arvo
import Urbit.King.Config import Urbit.King.Config
import Urbit.Vere.Pier.Types import Urbit.Vere.Pier.Types
import Data.Text (append)
import System.Posix.Files (ownerModes, setFileMode) import System.Posix.Files (ownerModes, setFileMode)
import Urbit.King.App (HasKingEnv, HasPierEnv(..), PierEnv) import Urbit.King.App (HasKingEnv, HasPierEnv(..), PierEnv)
import Urbit.King.App (onKillPierSigL) import Urbit.King.App (onKillPierSigL)
@ -240,7 +239,6 @@ acquireWorker nam act = mkRAcquire (async act) kill
kill tid = do kill tid = do
logTrace ("Killing worker thread: " <> display nam) logTrace ("Killing worker thread: " <> display nam)
cancel tid cancel tid
logTrace ("Killed worker thread: " <> display nam)
acquireWorkerBound :: HasLogFunc e => Text -> RIO e () -> RAcquire e (Async ()) acquireWorkerBound :: HasLogFunc e => Text -> RIO e () -> RAcquire e (Async ())
acquireWorkerBound nam act = mkRAcquire (asyncBound act) kill acquireWorkerBound nam act = mkRAcquire (asyncBound act) kill
@ -248,7 +246,6 @@ acquireWorkerBound nam act = mkRAcquire (asyncBound act) kill
kill tid = do kill tid = do
logTrace ("Killing worker thread: " <> display nam) logTrace ("Killing worker thread: " <> display nam)
cancel tid cancel tid
logTrace ("Killed worker thread: " <> display nam)
-- Run Pier -------------------------------------------------------------------- -- Run Pier --------------------------------------------------------------------
@ -261,9 +258,9 @@ pier
-> RAcquire PierEnv () -> RAcquire PierEnv ()
pier (serf, log) vSlog mStart multi = do pier (serf, log) vSlog mStart multi = do
computeQ <- newTQueueIO @_ @Serf.EvErr computeQ <- newTQueueIO @_ @Serf.EvErr
persistQ <- newTQueueIO persistQ <- newTQueueIO @_ @(Fact, FX)
executeQ <- newTQueueIO executeQ <- newTQueueIO @_ @FX
saveM <- newEmptyTMVarIO saveM <- newEmptyTMVarIO @_ @()
kingApi <- King.kingAPI kingApi <- King.kingAPI
termApiQ <- atomically $ do termApiQ <- atomically $ do
@ -289,14 +286,14 @@ pier (serf, log) vSlog mStart multi = do
atomically $ Term.trace muxed txt atomically $ Term.trace muxed txt
oldSlog txt oldSlog txt
let logId = Log.identity log let logId = Log.identity log :: LogIdentity
let ship = who logId let ship = who logId :: Ship
-- Our call above to set the logging function which echos errors from the -- Our call above to set the logging function which echos errors from the
-- Serf doesn't have the appended \r\n because those \r\n s are added in -- Serf doesn't have the appended \r\n because those \r\n s are added in
-- the c serf code. Logging output from our haskell process must manually -- the c serf code. Logging output from our haskell process must manually
-- add them. -- add them.
let showErr = atomically . Term.trace muxed . flip append "\r\n" let showErr = atomically . Term.trace muxed . (<> "\r\n")
env <- ask env <- ask