Modularize, separate stream impl specific code

This commit is contained in:
Harendra Kumar 2018-06-17 05:11:06 +05:30
parent 10d828ac27
commit f3e1d5948e

View File

@ -465,6 +465,20 @@ enqueueLIFO sv q m = do
atomicModifyIORefCAS_ (needDoorBell sv) (const False)
void $ tryPutMVar (outputDoorBell sv) ()
{-# INLINE runStreamLIFO #-}
runStreamLIFO :: MonadIO m
=> SVar m a -> IORef [Stream m a] -> Stream m a -> m () -> m ()
runStreamLIFO sv q m stop = runStream m (Just sv) stop single yield
where
single a = do
res <- liftIO $ send sv (ChildYield a)
if res then stop else liftIO $ sendStop sv
yield a r = do
res <- liftIO $ send sv (ChildYield a)
if res
then (runStream r) (Just sv) stop single yield
else liftIO $ enqueueLIFO sv q r >> sendStop sv
workLoopLIFO :: MonadIO m => SVar m a -> IORef [Stream m a] -> m ()
workLoopLIFO sv q = run
@ -474,16 +488,7 @@ workLoopLIFO sv q = run
work <- dequeue
case work of
Nothing -> liftIO $ sendStop sv
Just m -> (runStream m) (Just sv) run single yield
single a = do
res <- liftIO $ send sv (ChildYield a)
if res then run else liftIO $ sendStop sv
yield a r = do
res <- liftIO $ send sv (ChildYield a)
if res
then (runStream r) (Just sv) run single yield
else liftIO $ enqueueLIFO sv q r >> sendStop sv
Just m -> runStreamLIFO sv q m run
dequeue = liftIO $ atomicModifyIORefCAS q $ \case
[] -> ([], Nothing)
@ -512,25 +517,29 @@ enqueueFIFO sv q m = do
atomicModifyIORefCAS_ (needDoorBell sv) (const False)
void $ tryPutMVar (outputDoorBell sv) ()
{-# INLINE runStreamFIFO #-}
runStreamFIFO :: MonadIO m
=> SVar m a -> LinkedQueue (Stream m a) -> Stream m a -> m () -> m ()
runStreamFIFO sv q m stop = runStream m (Just sv) stop single yield
where
single a = do
res <- liftIO $ send sv (ChildYield a)
if res then stop else liftIO $ sendStop sv
yield a r = do
res <- liftIO $ send sv (ChildYield a)
liftIO (enqueueFIFO sv q r)
if res then stop else liftIO $ sendStop sv
workLoopFIFO :: MonadIO m => SVar m a -> LinkedQueue (Stream m a) -> m ()
workLoopFIFO sv q = run
where
run = do
work <- dequeue
work <- liftIO $ tryPopR q
case work of
Nothing -> liftIO $ sendStop sv
Just m -> (runStream m) (Just sv) run single yield
dequeue = liftIO $ tryPopR q
single a = do
res <- liftIO $ send sv (ChildYield a)
if res then run else liftIO $ sendStop sv
yield a r = do
res <- liftIO $ send sv (ChildYield a)
liftIO (enqueueFIFO sv q r)
if res then run else liftIO $ sendStop sv
Just m -> runStreamFIFO sv q m run
-------------------------------------------------------------------------------
-- Parallel
@ -644,6 +653,35 @@ enqueueAhead sv q m = do
--
-- XXX review for livelock
--
maxHeap :: Int
maxHeap = 1500
queueEmptyAhead :: MonadIO m => IORef ([Stream m a], Int) -> m Bool
queueEmptyAhead q = liftIO $ do
(xs, _) <- readIORef q
return $ null xs
dequeueAhead :: MonadIO m
=> IORef ([Stream m a], Int) -> m (Maybe (Stream m a, Int))
dequeueAhead q = liftIO $ do
atomicModifyIORefCAS q $ \case
([], n) -> (([], n), Nothing)
(x : [], n) -> (([], n), Just (x, n))
_ -> error "more than one item on queue"
dequeueFromHeap
:: IORef (Heap (Entry Int (AheadHeapEntry m a)), Int)
-> IO (Maybe (Entry Int (AheadHeapEntry m a)))
dequeueFromHeap hpRef = do
atomicModifyIORefCAS hpRef $ \hp@(h, snum) -> do
let r = H.uncons h
case r of
Nothing -> (hp, Nothing)
Just (ent@(Entry seqNo _ev), hp') ->
if (seqNo == snum)
then ((hp', seqNo), Just ent)
else (hp, Nothing)
workLoopAhead :: MonadIO m
=> SVar m a
-> IORef ([Stream m a], Int)
@ -653,8 +691,6 @@ workLoopAhead sv q heap = runHeap
where
maxHeap = 1500
toHeap seqNo ent = do
hp <- liftIO $ atomicModifyIORefCAS heap $ \(h, snum) ->
((H.insert (Entry seqNo ent) h, snum), h)
@ -686,7 +722,7 @@ workLoopAhead sv q heap = runHeap
{-# INLINE runQueueToken #-}
runQueueToken prevSeqNo = do
work <- dequeue
work <- dequeueAhead q
case work of
Nothing -> do
liftIO $ atomicModifyIORefCAS_ heap $ \(h, _) ->
@ -705,7 +741,7 @@ workLoopAhead sv q heap = runHeap
(singleToHeap seqNo)
(yieldToHeap seqNo)
runQueueNoToken = do
work <- dequeue
work <- dequeueAhead q
case work of
Nothing -> runHeap
Just (m, seqNo) -> do
@ -730,7 +766,7 @@ workLoopAhead sv q heap = runHeap
ent <- liftIO $ dequeueFromHeap heap
case ent of
Nothing -> do
done <- queueEmpty q
done <- queueEmptyAhead q
if done
then liftIO $ sendStop sv
else runQueueNoToken
@ -742,29 +778,6 @@ workLoopAhead sv q heap = runHeap
(singleOutput seqNo)
(yieldOutput seqNo)
queueEmpty qu = liftIO $ do
(xs, _) <- readIORef qu
return $ null xs
dequeue = liftIO $ do
atomicModifyIORefCAS q $ \case
([], n) -> (([], n), Nothing)
(x : [], n) -> (([], n), Just (x, n))
_ -> error "more than one item on queue"
dequeueFromHeap
:: IORef (Heap (Entry Int (AheadHeapEntry m a)), Int)
-> IO (Maybe (Entry Int (AheadHeapEntry m a)))
dequeueFromHeap hpRef = do
atomicModifyIORefCAS hpRef $ \hp@(h, snum) -> do
let r = H.uncons h
case r of
Nothing -> (hp, Nothing)
Just (ent@(Entry seqNo _ev), hp') ->
if (seqNo == snum)
then ((hp', seqNo), Just ent)
else (hp, Nothing)
-------------------------------------------------------------------------------
-- WAhead
-------------------------------------------------------------------------------