Improve ticker example by grouping messages before feeding them back into the UI

This commit is contained in:
Francisco Vallarino 2021-08-20 22:44:16 -03:00
parent f1704902db
commit 9b6d6a5082
3 changed files with 42 additions and 18 deletions

View File

@ -31,22 +31,20 @@ steps:
From that point on:
- When a new message from the API is received, it is feed into the application
using the provided `sendMsg` function.
- When a message from the application is received, it is formatted and forwarded
to the server.
- When a new message from the server is received, it is sent to a grouping
thread that, every half a second, sends the accumulated messages into the
application using the provided `sendMsg` function. Since updates for each coin
are received as independent messages from the server, feeding each of them
directly into the application would trigger multiple model updates. Grouping
these messages and only updating the model a few times per second provides
better and more predictable performance.
The `TickerIgnore` event is used in Tasks that process errors and don't
currently feed information into the application. In general you will want to
report these errors to the user, but this is useful at prototyping time.
## Possible improvements
Since updates for each coin are received as individual messages, triggering a
model change every time, it would be desirable to process them as a single batch
when several messages are received within milliseconds. Grouping these messages
and only updating the model once or twice per second can provide more
predictable performance.
report these errors to the user, but logging them may be enough at prototyping
time.
One way to do this is to:

View File

@ -13,6 +13,7 @@ import Control.Monad.IO.Class
import Control.Monad.STM
import Data.Aeson
import Data.Default
import Data.Foldable (foldl')
import Data.Maybe
import Data.Scientific
import Data.Text (Text)
@ -139,8 +140,8 @@ handleEvent env wenv node model evt = case evt of
Model $ model & symbolPairs .~ moveBefore (model^.symbolPairs) target pair
]
TickerUpdate ticker -> [
Model $ model & tickers . at (ticker ^. symbolPair) ?~ ticker
TickerUpdate updates -> [
Model (processTickerUpdates model updates)
]
TickerError err -> [Task $ print ("Error", err) >> return TickerIgnore]
@ -149,6 +150,11 @@ handleEvent env wenv node model evt = case evt of
TickerIgnore -> []
processTickerUpdates :: TickerModel -> [Ticker] -> TickerModel
processTickerUpdates model updates = foldl' stepTicker model updates where
stepTicker model ticker = model
& tickers . at (ticker ^. symbolPair) ?~ ticker
handleSubscription :: AppEnv -> [Text] -> Text -> IO TickerEvt
handleSubscription env pairs action = do
liftIO . atomically $ writeTChan (env^.channel) req
@ -176,8 +182,11 @@ moveBefore list target item = result where
startProducer :: AppEnv -> (TickerEvt -> IO ()) -> IO ()
startProducer env sendMsg = do
groupChannel <- newTChanIO
Wuss.runSecureClient url port path $ \connection -> do
receiveWs connection sendMsg
groupTickers groupChannel sendMsg
receiveWs connection groupChannel sendMsg
sendWs (env ^. channel) connection
where
url = "stream.binance.com"
@ -189,15 +198,23 @@ subscribeInitial env initialList = do
threadDelay 500000
subscribe env initialList
receiveWs :: WS.Connection -> (TickerEvt -> IO ()) -> IO ()
receiveWs conn sendMsg = void . forkIO . forever $ do
groupTickers :: TChan Ticker -> (TickerEvt -> IO a) -> IO ()
groupTickers channel sendMsg = void . forkIO . forever $ do
ticker <- liftIO . atomically $ readTChan channel
tickers <- collectJustM . liftIO . atomically $ tryReadTChan channel
sendMsg $ TickerUpdate (ticker : tickers)
threadDelay $ 500 * 1000
receiveWs :: WS.Connection -> TChan Ticker -> (TickerEvt -> IO ()) -> IO ()
receiveWs conn groupChannel sendMsg = void . forkIO . forever $ do
msg <- WS.receiveData conn
let serverMsg = decode msg
forM_ serverMsg $ \case
MsgResponse resp -> sendMsg $ TickerResponse resp
MsgError err -> sendMsg $ TickerError err
MsgTicker ticker -> sendMsg $ TickerUpdate ticker
MsgTicker ticker -> liftIO . atomically $ writeTChan groupChannel ticker
sendWs :: (Show a, ToJSON a) => TChan a -> WS.Connection -> IO ()
sendWs channel connection = forever $ do
@ -232,6 +249,15 @@ customDarkTheme = darkTheme
& L.userColorMap . at "trashBg" ?~ rgbHex "#555555"
& L.userColorMap . at "trashFg" ?~ rgbHex "#909090"
collectJustM :: MonadIO m => m (Maybe a) -> m [a]
collectJustM action = do
x <- action
case x of
Nothing -> return []
Just x -> do
xs <- collectJustM action
return (x : xs)
formatTickerValue :: Scientific -> Text
formatTickerValue = T.pack . formatScientific Fixed (Just 8)

View File

@ -41,7 +41,7 @@ data TickerEvt
| TickerRemovePairBegin Text
| TickerRemovePair Text
| TickerMovePair Text Text
| TickerUpdate Ticker
| TickerUpdate [Ticker]
| TickerError ServerError
| TickerResponse ServerResponse
deriving (Eq, Show)