Fix joinOuter

This commit is contained in:
Ranjeet Kumar Ranjan 2022-03-20 11:14:58 +05:30 committed by Harendra Kumar
parent 4428641646
commit 1b8043f6ef
2 changed files with 29 additions and 18 deletions

View File

@ -54,9 +54,10 @@ import Control.Monad.Catch (MonadCatch)
import Control.Monad.IO.Class (MonadIO(..))
import Control.Monad.Trans.Class (lift)
import Control.Monad.Trans.State.Strict (get, put)
-- import Data.Hashable (Hashable)
import Data.Function ((&))
import Data.IORef (newIORef, readIORef, modifyIORef')
import Data.Kind (Type)
import Data.Maybe (isJust)
#if !(MIN_VERSION_base(4,11,0))
import Data.Semigroup (Semigroup(..))
#endif
@ -70,6 +71,7 @@ import Streamly.Internal.Data.Time.Units (NanoSecond64(..), toRelTime64)
import qualified Data.List as List
import qualified Data.Map.Strict as Map
import qualified Streamly.Internal.Data.Array as Array
import qualified Streamly.Internal.Data.Array.Foreign.Mut.Type as MA
import qualified Streamly.Internal.Data.Fold as Fold
import qualified Streamly.Internal.Data.Parser as Parser
import qualified Streamly.Internal.Data.Stream.IsStream.Lift as Stream
@ -443,17 +445,29 @@ joinOuter :: MonadIO m =>
-> SerialT m (Maybe a, Maybe b)
joinOuter eq s1 s =
Stream.concatM $ do
arr <- Array.fromStream $ fmap (,False) s
return $ go arr <> leftOver arr
inputArr <- Array.fromStream s
let len = length inputArr
foundArr <-
Stream.fold
(MA.writeN len)
(Stream.fromList (Prelude.replicate len False))
return $ go inputArr foundArr <> leftOver inputArr foundArr
where
leftOver =
fmap (\(x, _) -> (Nothing, Just x))
. Stream.filter (not . snd)
. Array.toStream
leftOver inputArr foundArr =
let stream1 = IsStream.fromSerial $ Array.toStream inputArr
stream2 = Stream.unfold MA.read foundArr
in Stream.filter
isJust
( Stream.zipWith (\x y ->
if y
then Nothing
else Just (Nothing, Just x)
) stream1 stream2
) & Stream.catMaybes
go arr = Stream.evalStateT (return False) $ do
go inputArr foundArr = Stream.evalStateT (return False) $ do
a <- Stream.liftInner s1
-- XXX should we use StreamD monad here?
-- XXX Is there a better way to perform some action at the end of a loop
@ -464,16 +478,16 @@ joinOuter eq s1 s =
if r
then Stream.nil
else Stream.fromPure Nothing
(_i, b) <-
let stream = IsStream.fromSerial $ Array.toStream arr
(i, b) <-
let stream = IsStream.fromSerial $ Array.toStream inputArr
in Stream.indexed $ fmap Just (Stream.liftInner stream) <> final
case b of
Just (b1, _used) ->
Just b1 ->
if a `eq` b1
then do
lift $ put True
-- XXX Need to use a mutable array
-- when (not used) $ Array.writeIndex i True
MA.putIndex i True foundArr
return (Just a, Just b1)
else Stream.nil
Nothing -> return (Just a, Nothing)

View File

@ -85,8 +85,6 @@ joinOuterList ls0 ls1 =
v4 = filter (\(_, a2, _) -> isNothing a2) v3
in v2 ++ v4
-- XXX A bug need to be fixed in joinOuter function
{-
joinOuter :: Property
joinOuter =
forAll (listOf (chooseInt (min_value, max_value))) $ \ls0 ->
@ -103,7 +101,7 @@ joinOuter =
(map (\a -> (a, a)) ls1)
v3 = map (\(_, v10, v20) -> (v10, v20)) v2
assert (sort v1 == sort v3)
-}
joinOuterMap :: Property
joinOuterMap =
@ -209,8 +207,7 @@ main = hspec $ do
-- Joins
prop "joinInner" Main.joinInner
prop "joinInnerMap" Main.joinInnerMap
-- XXX currently API is broken https://github.com/composewell/streamly/issues/1032
--prop "joinOuter" Main.joinOuter
prop "joinOuter" Main.joinOuter
prop "joinOuterMap" Main.joinOuterMap
prop "joinLeft" Main.joinLeft
prop "joinLeftMap" Main.joinLeftMap