Move examples to streamly-examples repo.

Closes #684
This commit is contained in:
pranaysashank 2020-09-18 16:37:26 +05:30
parent c54581ee95
commit d65f2b4045
20 changed files with 25 additions and 1849 deletions

View File

@ -115,9 +115,9 @@ matrix:
#- env: BUILD=cabal-v2 GHCVER=8.2.2 GHC_OPTIONS="" DISABLE_BENCH=y
# addons: {apt: {packages: [cabal-install-2.4,ghc-8.2.2], sources: [hvr-ghc]}}
- name: "GHC 8.0.2+examples-sdl"
env: BUILD=cabal-v2 GHCVER=8.0.2 GHC_OPTIONS="" CABAL_BUILD_OPTIONS="--flags examples-sdl"
addons: {apt: {packages: [cabal-install-3.2,ghc-8.0.2,libsdl1.2-dev], sources: [hvr-ghc]}}
- name: "GHC 8.0.2"
env: BUILD=cabal-v2 GHCVER=8.0.2 GHC_OPTIONS=""
addons: {apt: {packages: [cabal-install-3.2,ghc-8.0.2], sources: [hvr-ghc]}}
os: linux
arch: amd64
@ -155,9 +155,9 @@ matrix:
#- env: BUILD=stack RESOLVER=nightly
# addons: {apt: {packages: [cabal-install-1.24], sources: [hvr-ghc]}}
- name: "GHC 8.8+stack lts-16.12+examples-sdl"
env: BUILD=stack RESOLVER=lts-16.12 GHCVER=8.8 STACK_BUILD_OPTIONS="--flag streamly:examples-sdl" SDIST_OPTIONS="--ignore-check"
addons: {apt: {packages: [cabal-install-3.2,libsdl1.2-dev], sources: [hvr-ghc]}}
- name: "GHC 8.8+stack lts-16.12"
env: BUILD=stack RESOLVER=lts-16.12 GHCVER=8.8 SDIST_OPTIONS="--ignore-check"
addons: {apt: {packages: [cabal-install-3.2], sources: [hvr-ghc]}}
os: linux
arch: amd64

View File

@ -1,61 +0,0 @@
{-# LANGUAGE CPP #-}
{-# LANGUAGE FlexibleContexts #-}
-- Copyright : (c) 2017 Composewell Technologies
-- (c) 2013, 2014 Gabriel Gonzalez
--
-- This example is adapted from Gabriel Gonzalez's pipes-concurrency package.
-- https://hackage.haskell.org/package/pipes-concurrency-2.0.8/docs/Pipes-Concurrent-Tutorial.html
#if !(MIN_VERSION_base(4,11,0))
import Data.Semigroup ((<>))
#endif
import Streamly.Prelude as S
import Control.Monad (void)
import Control.Monad.IO.Class (MonadIO(liftIO))
import Control.Monad.State (MonadState, get, modify, runStateT)
-- | Game events: quit the game, take damage, or heal by the given amount.
data Event = Quit | Harm Int | Heal Int deriving (Show)

-- | An infinite stream of events read interactively from stdin, one
-- command per line. Unrecognized input re-prompts (askUser recurses)
-- instead of failing.
userAction :: MonadAsync m => SerialT m Event
userAction = S.repeatM $ liftIO askUser

    where

    askUser = do
        command <- getLine
        case command of
            "potion" -> return (Heal 10)
            "harm" -> return (Harm 10)
            "quit" -> return Quit
            _ -> putStrLn "Type potion or harm or quit" >> askUser
-- | A concurrent stream producing one 'Harm' event per second, clocked by
-- 'constRate'.
acidRain :: MonadAsync m => SerialT m Event
acidRain = S.asyncly $ S.constRate 1 $ S.repeatM $ liftIO $ return $ Harm 1

-- | Tells the game loop whether to check health or stop.
data Result = Check | Done

-- | Merge user commands and acid-rain damage concurrently, applying each
-- event to the health value kept in the state monad.
runEvents :: (MonadAsync m, MonadState Int m) => SerialT m Result
runEvents = do
    event <- userAction `S.parallel` acidRain
    case event of
        Harm n -> modify (\h -> h - n) >> return Check
        Heal n -> modify (\h -> h + n) >> return Check
        Quit -> return Done
-- | Whether the player is still in the game.
data Status = Alive | GameOver deriving Eq

-- | Report the outcome of one game step: quitting ends the game; otherwise
-- read the current health, print it, and declare death at zero or below.
getStatus :: (MonadAsync m, MonadState Int m) => Result -> m Status
getStatus Done = liftIO $ putStrLn "You quit!" >> return GameOver
getStatus Check = do
    health <- get
    liftIO $
        if health <= 0
        then putStrLn "You die!" >> return GameOver
        else putStrLn ("Health = " <> show health) >> return Alive
-- | Run the game with an initial health of 60, stopping when the player
-- dies or quits.
main :: IO ()
main = do
    -- FIX: the original ended the first literal line with "\\" before the
    -- line break; a Haskell multi-line string gap needs a single backslash
    -- on each side, so that text did not compile as shown. A single-line
    -- literal with the same content is used instead.
    putStrLn "Your health is deteriorating due to acid rain, type \"potion\" or \"quit\""
    let runGame = S.drainWhile (== Alive) $ S.mapM getStatus runEvents
    void $ runStateT runGame 60

View File

@ -1,33 +0,0 @@
-- ghc -O2 -fspec-constr-recursive=10 -fmax-worker-args=16
-- Convert the input file to camel case and write to stdout
import Data.Maybe (fromJust, isJust)
import System.Environment (getArgs)
import System.IO (Handle, IOMode(..), openFile, stdout)
import qualified Streamly.Prelude as S
import qualified Streamly.Internal.FileSystem.Handle as FH
-- | Stream the bytes of @src@ to @dst@ camel-cased: separators are dropped
-- and the first letter after a separator is upper-cased. Works directly on
-- Word8 values using ASCII arithmetic.
camelCase :: Handle -> Handle -> IO ()
camelCase src dst =
    FH.fromBytes dst
        $ S.map fromJust
        $ S.filter isJust
        $ S.map snd
        -- Scan state is (wasSpace, maybe-output-byte); wasSpace remembers
        -- whether the previous byte was a word separator.
        $ S.scanl' step (True, Nothing)
        $ FH.toBytes src

    where

    step (wasSpace, _) x =
        -- 0x0a = '\n', 0x41..0x5a = 'A'..'Z': emit unchanged
        if x == 0x0a || x >= 0x41 && x <= 0x5a
        then (False, Just x)
        -- 0x61..0x7a = 'a'..'z': subtract 32 to upcase after a separator
        else if x >= 0x61 && x <= 0x7a
        then (False, Just $ if wasSpace then x - 32 else x)
        -- anything else acts as a separator and is dropped
        else (True, Nothing)
-- | Camel-case the file named by the first command line argument to stdout.
main :: IO ()
main = do
    args <- getArgs
    case args of
        -- Avoid the partial 'head': report a usage error on missing input
        -- instead of crashing with "Prelude.head: empty list".
        (name:_) -> do
            src <- openFile name ReadMode
            camelCase src stdout
        [] -> error "Usage: CamelCase <input-file>"

View File

@ -1,82 +0,0 @@
-- Adapted from the Yampa package.
-- Displays a square moving in a circle. To move the position drag it with the
-- mouse.
--
-- Requires the SDL package, assuming streamly has already been built, you can
-- compile it like this:
-- stack ghc --package SDL CirclingSquare.hs
import Data.IORef
import Graphics.UI.SDL as SDL
import Streamly.Prelude as S
------------------------------------------------------------------------------
-- SDL Graphics Init
------------------------------------------------------------------------------
-- | Initialize SDL and create a 640x480, 16-bit software video surface.
sdlInit :: IO ()
sdlInit = do
    SDL.init [InitVideo]
    let width = 640
        height = 480
    _ <- SDL.setVideoMode width height 16 [SWSurface]
    SDL.setCaption "Test" ""

------------------------------------------------------------------------------
-- Display a box at a given coordinates
------------------------------------------------------------------------------

-- | Clear the screen to the background color and draw a small colored
-- square at the given (x, y) position.
display :: (Double, Double) -> IO ()
display (playerX, playerY) = do
    screen <- getVideoSurface
    -- Paint the background
    let format = surfaceGetPixelFormat screen
    bgColor <- mapRGB format 55 60 64
    _ <- fillRect screen Nothing bgColor
    -- Paint a small square at the requested position
    foreC <- mapRGB format 212 108 73
    let side = 20
        x = round playerX
        y = round playerY
    _ <- fillRect screen (Just (Rect x y side side)) foreC
    -- Double buffering
    SDL.flip screen

------------------------------------------------------------------------------
-- Wait and update Controller Position if it changes
------------------------------------------------------------------------------

-- | Poll SDL events; on mouse motion store the pointer position in the ref.
updateController :: IORef (Double, Double) -> IO ()
updateController ref = do
    e <- pollEvent
    case e of
        MouseMotion x y _ _ -> writeIORef ref (fromIntegral x, fromIntegral y)
        _ -> return ()

------------------------------------------------------------------------------
-- Periodically refresh the output display
------------------------------------------------------------------------------

-- | Draw the square at an angle derived from the SDL tick count, so that it
-- circles around the position stored in the ref.
updateDisplay :: IORef (Double, Double) -> IO ()
updateDisplay cref = do
    time <- SDL.getTicks
    (x, y) <- readIORef cref
    let t = fromIntegral time * speed / 1000
     in display (x + cos t * radius, y + sin t * radius)

    where

    speed = 6
    radius = 60

-- | Run the controller polling and the display refresh as two parallel
-- streams, rate-limited to 40 iterations per second.
main :: IO ()
main = do
    sdlInit
    cref <- newIORef (0,0)
    S.drain $ S.asyncly $ S.constRate 40
        $ S.repeatM (updateController cref)
            `S.parallel` S.repeatM (updateDisplay cref)

View File

@ -1,308 +0,0 @@
{-# LANGUAGE CPP #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE ScopedTypeVariables #-}
-------------------------------------------------------------------------------
-- Combining control flow manipulating monad transformers (MaybeT, ExceptT,
-- ContT) with Streamly
-------------------------------------------------------------------------------
--
-- Streamly streams are non-determinism (nested looping) monads. We can use a
-- control flow monad on top or streamly on top depending on whether we want to
-- superimpose control flow manipulation on top of non-deterministic
-- composition or vice-versa.
--
-- This file provides an example where we enter a sequence of characters "x",
-- and "y" on separate lines, on the command line. When any other sequence is
-- entered the control flow short circuits at the first non-matching char and
-- exits.
#if !(MIN_VERSION_base(4,11,0))
import Data.Semigroup (Semigroup(..))
#endif
import Control.Concurrent (threadDelay)
import Control.Exception (catch, SomeException)
import Control.Monad
import Control.Monad.Catch (MonadThrow, throwM, Exception)
import Control.Monad.IO.Class
import Control.Monad.Trans.Class
import Control.Monad.Trans.Maybe
import Control.Monad.Trans.Except
import Control.Monad.Trans.Cont
import Streamly.Prelude
import qualified Streamly.Prelude as S
-------------------------------------------------------------------------------
-- Using MaybeT below streamly
-------------------------------------------------------------------------------
--
-- When streamly is on top MaybeT would terminate all iterations of
-- non-determinism.
--
-- | Read two lines per stream iteration; a mismatch fails via 'mzero'
-- lifted from the underlying MaybeT, aborting ALL iterations.
getSequenceMaybeBelow
    :: ( IsStream t
       , Monad m
       , MonadTrans t
       , MonadIO (t (MaybeT m))
       )
    => t (MaybeT m) ()
getSequenceMaybeBelow = do
    liftIO $ putStrLn "MaybeT below streamly: Enter one char per line: "
    -- two iterations of the stream's nested-loop (non-determinism) monad
    i <- S.fromFoldable [1..2 :: Int]
    liftIO $ putStrLn $ "iteration = " <> show i
    r1 <- liftIO getLine
    when (r1 /= "x") $ lift mzero
    r2 <- liftIO getLine
    when (r2 /= "y") $ lift mzero

-- | Drain the stream and report whether the whole sequence matched.
mainMaybeBelow :: IO ()
mainMaybeBelow = do
    r <- runMaybeT (S.drain getSequenceMaybeBelow)
    case r of
        Just _ -> putStrLn "Bingo"
        Nothing -> putStrLn "Wrong"

-------------------------------------------------------------------------------
-- Using MaybeT above streamly
-------------------------------------------------------------------------------
--
-- When MaybeT is on top a Nothing would terminate only the current iteration
-- of non-determinism below.
--
-- Note that this is redundant configuration as the same behavior can be
-- achieved with just streamly, using mzero.
--
getSequenceMaybeAbove :: (IsStream t, MonadIO (t m)) => MaybeT (t m) ()
getSequenceMaybeAbove = do
    liftIO $ putStrLn "MaybeT above streamly: Enter one char per line: "
    -- lift the stream's non-determinism under the MaybeT layer
    i <- lift $ S.fromFoldable [1..2 :: Int]
    liftIO $ putStrLn $ "iteration = " <> show i
    r1 <- liftIO getLine
    -- here mzero fails only the current stream iteration
    when (r1 /= "x") mzero
    r2 <- liftIO getLine
    when (r2 /= "y") mzero

mainMaybeAbove :: (IsStream t, MonadIO (t m)) => MaybeT (t m) ()
mainMaybeAbove = do
    getSequenceMaybeAbove
    liftIO $ putStrLn "Bingo"
-------------------------------------------------------------------------------
-- Using ExceptT below streamly
-------------------------------------------------------------------------------
--
-- XXX need to have a specialized liftCatch to lift catchE
--
-- Note that throwE would terminate all iterations of non-determinism
-- altogether.
-- | Read two lines per stream iteration; a mismatch throws via 'throwE'
-- lifted from the underlying ExceptT, terminating ALL iterations.
getSequenceEitherBelow
    :: ( IsStream t
       , MonadTrans t
       , Monad m
       , MonadIO (t (ExceptT String m))
       )
    => t (ExceptT String m) ()
getSequenceEitherBelow = do
    liftIO $ putStrLn "ExceptT below streamly: Enter one char per line: "
    i <- S.fromFoldable [1..2 :: Int]
    liftIO $ putStrLn $ "iteration = " <> show i
    r1 <- liftIO getLine
    when (r1 /= "x") $ lift $ throwE $ "Expecting x got: " <> r1
    r2 <- liftIO getLine
    when (r2 /= "y") $ lift $ throwE $ "Expecting y got: " <> r2

mainEitherBelow :: IO ()
mainEitherBelow = do
    -- XXX Cannot lift catchE
    r <- runExceptT (S.drain getSequenceEitherBelow)
    case r of
        Right _ -> liftIO $ putStrLn "Bingo"
        Left s -> liftIO $ putStrLn s

-------------------------------------------------------------------------------
-- Using ExceptT below concurrent streamly
-------------------------------------------------------------------------------
--
-- XXX does not work correctly yet
--
getSequenceEitherAsyncBelow
    :: ( IsStream t
       , MonadTrans t
       , MonadIO m
       , MonadIO (t (ExceptT String m))
       , Semigroup (t (ExceptT String m) Integer)
       )
    => t (ExceptT String m) ()
getSequenceEitherAsyncBelow = do
    liftIO $ putStrLn "ExceptT below concurrent streamly: "
    -- three concurrent branches; the first two throw in the base monad
    i <- (liftIO (threadDelay 1000)
            >> lift (throwE "First task")
            >> return 1)
        <> (lift (throwE "Second task") >> return 2)
        <> S.yield (3 :: Integer)
    liftIO $ putStrLn $ "iteration = " <> show i

mainEitherAsyncBelow :: IO ()
mainEitherAsyncBelow = do
    r <- runExceptT (S.drain $ asyncly getSequenceEitherAsyncBelow)
    case r of
        Right _ -> liftIO $ putStrLn "Bingo"
        Left s -> liftIO $ putStrLn s
-------------------------------------------------------------------------------
-- Using ExceptT above streamly
-------------------------------------------------------------------------------
--
-- When ExceptT is on top, we can lift the non-determinism of stream from
-- below.
--
-- Note that throwE would terminate/break only current iteration of
-- non-determinism and not all of them altogether.
--
-- Here we can use catchE directly but will have to use monad-control to lift
-- stream operations with stream arguments.
-- | With ExceptT on top of the stream, 'throwE' breaks only the CURRENT
-- iteration of the non-determinism below.
getSequenceEitherAbove :: (IsStream t, MonadIO (t m))
    => ExceptT String (t m) ()
getSequenceEitherAbove = do
    liftIO $ putStrLn "ExceptT above streamly: Enter one char per line: "
    i <- lift $ S.fromFoldable [1..2 :: Int]
    liftIO $ putStrLn $ "iteration = " <> show i
    r1 <- liftIO getLine
    when (r1 /= "x") $ throwE $ "Expecting x got: " <> r1
    r2 <- liftIO getLine
    when (r2 /= "y") $ throwE $ "Expecting y got: " <> r2

-- | ExceptT on top lets us use 'catchE' directly on the composed action.
mainEitherAbove :: (IsStream t, MonadIO (t m)) => ExceptT String (t m) ()
mainEitherAbove =
    catchE (getSequenceEitherAbove >> liftIO (putStrLn "Bingo"))
        (liftIO . putStrLn)

-------------------------------------------------------------------------------
-- Using MonadThrow to throw exceptions in streamly
-------------------------------------------------------------------------------
--
-- | Exception carrying the unexpected input line.
newtype Unexpected = Unexpected String deriving Show

instance Exception Unexpected

-- Note that unlike when ExceptT is used on top, MonadThrow terminates all
-- iterations of non-determinism rather than just the current iteration.
--
getSequenceMonadThrow :: (IsStream t, MonadIO (t m), MonadThrow (t m))
    => t m ()
getSequenceMonadThrow = do
    liftIO $ putStrLn "MonadThrow in streamly: Enter one char per line: "
    i <- S.fromFoldable [1..2 :: Int]
    liftIO $ putStrLn $ "iteration = " <> show i
    r1 <- liftIO getLine
    when (r1 /= "x") $ throwM $ Unexpected $ "Expecting x got: " <> r1
    r2 <- liftIO getLine
    when (r2 /= "y") $ throwM $ Unexpected $ "Expecting y got: " <> r2

mainMonadThrow :: IO ()
mainMonadThrow =
    catch (S.drain getSequenceMonadThrow >> liftIO (putStrLn "Bingo"))
        (\(e :: SomeException) -> liftIO $ print e)
-------------------------------------------------------------------------------
-- Using ContT below streamly
-------------------------------------------------------------------------------
--
-- CallCC is the goto/setjmp/longjmp equivalent
-- Allows us to manipulate the control flow in arbitrary ways
--
-- XXX need to have a specialized liftCallCC to actually lift callCC
--
-- | Use callCC in the underlying ContT to short-circuit one check; 'exit'
-- aborts the rest of the callCC block with a Left describing the mismatch.
getSequenceContBelow
    :: (IsStream t, MonadTrans t, MonadIO m, MonadIO (t (ContT r m)))
    => t (ContT r m) (Either String ())
getSequenceContBelow = do
    liftIO $ putStrLn "ContT below streamly: Enter one char per line: "
    i <- S.fromFoldable [1..2 :: Int]
    liftIO $ putStrLn $ "iteration = " <> show i
    r <- lift $ callCC $ \exit -> do
        r1 <- liftIO getLine
        _ <- if r1 /= "x"
             then exit $ Left $ "Expecting x got: " <> r1
             else return $ Right ()
        r2 <- liftIO getLine
        if r2 /= "y"
        then exit $ Left $ "Expecting y got: " <> r2
        else return $ Right ()
    liftIO $ putStrLn $ "done iteration = " <> show i
    return r

mainContBelow
    :: (IsStream t, MonadIO m, MonadTrans t, MonadIO (t (ContT r m)))
    => t (ContT r m) ()
mainContBelow = do
    r <- getSequenceContBelow
    case r of
        Right _ -> liftIO $ putStrLn "Bingo"
        Left s -> liftIO $ putStrLn s

-------------------------------------------------------------------------------
-- Using ContT above streamly
-------------------------------------------------------------------------------
--
getSequenceContAbove :: (IsStream t, MonadIO (t m))
    => ContT r (t m) (Either String ())
getSequenceContAbove = do
    liftIO $ putStrLn "ContT above streamly: Enter one char per line: "
    i <- lift $ S.fromFoldable [1..2 :: Int]
    liftIO $ putStrLn $ "iteration = " <> show i
    callCC $ \exit -> do
        r1 <- liftIO getLine
        _ <- if r1 /= "x"
             then exit $ Left $ "Expecting x got: " <> r1
             else return $ Right ()
        r2 <- liftIO getLine
        if r2 /= "y"
        then exit $ Left $ "Expecting y got: " <> r2
        else return $ Right ()

mainContAbove :: (IsStream t, MonadIO (t m)) => ContT r (t m) ()
mainContAbove = do
    r <- getSequenceContAbove
    case r of
        Right _ -> liftIO $ putStrLn "Bingo"
        Left s -> liftIO $ putStrLn s

-------------------------------------------------------------------------------
-- Combining control flow manipulating monad transformers (MaybeT, ExceptT,
-- ContT) with Streamly
-------------------------------------------------------------------------------

-- | Run every demonstration in turn.
main :: IO ()
main = do
    mainMaybeBelow
    S.drain $ runMaybeT mainMaybeAbove
    runContT (S.drain mainContBelow) return
    S.drain (runContT mainContAbove return)
    mainEitherBelow
    S.drain (runExceptT mainEitherAbove)
    mainMonadThrow
    mainEitherAsyncBelow

View File

@ -1,21 +0,0 @@
-- A concurrent TCP server that echoes everything that it receives.
import Data.Function ((&))
import Streamly.Internal.Network.Socket (handleWithM)
import Streamly.Network.Socket
import qualified Streamly.Network.Inet.TCP as TCP
import qualified Streamly.Prelude as S
-- | Accept connections on port 8091 and run one concurrent echo loop per
-- connection; the socket is closed by 'handleWithM' when echo finishes.
main :: IO ()
main =
    S.serially (S.unfold TCP.acceptOnPort 8091)
        & S.parallely . S.mapM (handleWithM echo)
        & S.drain

    where

    -- Stream 32KB chunks off the socket and write them straight back.
    echo sk =
        S.unfold readChunksWithBufferOf (32768, sk) -- stream of byte chunks
            & S.fold (writeChunks sk) -- IO ()

View File

@ -1,65 +0,0 @@
import qualified Streamly.Prelude as S
import qualified Streamly.Data.Fold as FL
import qualified Streamly.Data.Array.Storable.Foreign as A
import qualified Streamly.Internal.Data.Fold as FL
import qualified Streamly.Internal.Data.Stream.IsStream as IP
import qualified Streamly.Internal.FileSystem.File as File
import Data.Char (ord)
import System.Environment (getArgs)
-- | Copy a file to stdout, reading and writing 256KB chunks.
cat :: FilePath -> IO ()
cat src =
    File.fromChunks "/dev/stdout"
        $ File.toChunksWithBufferOf (256*1024) src

-- | Copy @src@ to @dst@ in 256KB chunks.
cp :: FilePath -> FilePath -> IO ()
cp src dst =
    File.fromChunks dst
        $ File.toChunksWithBufferOf (256*1024) src

-- | Append the contents of @src@ to @dst@ in 256KB chunks.
append :: FilePath -> FilePath -> IO ()
append src dst =
    File.appendChunks dst
        $ File.toChunksWithBufferOf (256*1024) src
-- | The numeric code point of a character, at any 'Num' type.
ord' :: Num a => Char -> a
ord' c = fromIntegral (ord c)
-- | Count lines (wc -l): split the byte stream on '\n' and count segments.
wcl :: FilePath -> IO ()
wcl src = print =<< (S.length
    $ S.splitOnSuffix (== ord' '\n') FL.drain
    $ File.toBytes src)

-- | Count occurrences of a pattern (grep -c). Splitting on the pattern
-- yields one more segment than there are matches, hence 'subtract 1'.
grepc :: String -> FilePath -> IO ()
grepc pat src = print . (subtract 1) =<< (S.length
    $ IP.splitOnSeq (A.fromList (map ord' pat)) FL.drain
    $ File.toBytes src)

-- | Average line length: total bytes divided by line count, computed by an
-- applicative composition of two folds over the per-line lengths.
avgll :: FilePath -> IO ()
avgll src = print =<< (S.fold avg
    $ S.splitOnSuffix (== ord' '\n') FL.length
    $ File.toBytes src)

    where avg = (/) <$> toDouble FL.sum <*> toDouble FL.length
          toDouble = fmap (fromIntegral :: Int -> Double)
-- | Print a histogram of line lengths, bucketed in steps of 10, with every
-- length of 90 or more collapsed into the last bucket.
llhisto :: FilePath -> IO ()
llhisto src = print =<< (S.fold (FL.classify FL.length)
    $ S.map bucket
    $ S.splitOnSuffix (== ord' '\n') FL.length
    $ File.toBytes src)

    where

    -- FIX: the original used `mod`, which always yields 0..9 and made the
    -- `i > 9` cap dead code; bucketing lengths by tens requires `div`.
    bucket n = let i = n `div` 10 in if i > 9 then (9,n) else (i,n)
-- | Run each file utility on the file named by the first command line
-- argument; cp/append write to "dst-xyz.txt".
main :: IO ()
main = do
    src <- fmap head getArgs

    putStrLn "cat" >> cat src -- Unix cat program
    putStr "wcl " >> wcl src -- Unix wc -l program
    putStr "grepc " >> grepc "aaaa" src -- Unix grep -c program
    putStr "avgll " >> avgll src -- get average line length
    putStr "llhisto " >> llhisto src -- get line length histogram
    putStr "cp " >> cp src "dst-xyz.txt" -- Unix cp program
    putStr "append " >> append src "dst-xyz.txt" -- Appending to file

View File

@ -1,37 +0,0 @@
-- A concurrent TCP server that:
--
-- * receives connections from clients
-- * splits the incoming data into lines
-- * lines from concurrent connections are merged into a single stream
-- * writes the line stream to an output file
import Control.Monad.IO.Class (liftIO)
import Network.Socket (close)
import System.Environment (getArgs)
import Streamly.Unicode.Stream
import qualified Streamly.FileSystem.Handle as FH
import qualified Streamly.Data.Array.Storable.Foreign as A
import qualified Streamly.Network.Socket as NS
import qualified Streamly.Network.Inet.TCP as TCP
import qualified Streamly.Prelude as S
import System.IO (withFile, IOMode(..))
-- | Accept connections on port 8090, read each connection as Latin-1 text,
-- split it into newline-terminated chunks concurrently, merge all chunks
-- into one stream and append it to the file named on the command line.
main :: IO ()
main = do
    file <- fmap head getArgs
    withFile file AppendMode
        (\src -> S.fold (FH.write src)
            $ encodeLatin1
            $ S.concatUnfold A.read
            $ S.concatMapWith S.parallel use
            $ S.unfold TCP.acceptOnPort 8090)

    where

    -- Process one client socket, closing it when the stream ends or fails.
    use sk = S.finally (liftIO $ close sk) (recv sk)

    -- Decode the socket bytes and group them into line arrays.
    recv =
        S.splitWithSuffix (== '\n') A.write
            . decodeLatin1
            . S.unfold NS.read

View File

@ -1,20 +0,0 @@
-- A TCP client that does the following:
-- * Reads multiple filenames passed on the command line
-- * Opens as many concurrent connections to the server
-- * Sends all the files concurrently to the server
import System.Environment (getArgs)
import qualified Streamly.Prelude as S
import qualified Streamly.Internal.FileSystem.Handle as IFH
import qualified Streamly.Internal.Network.Inet.TCP as TCP
import System.IO (withFile, IOMode(..))
-- | Send every file named on the command line to 127.0.0.1:8090, each over
-- its own concurrent TCP connection.
main :: IO ()
main =
    let sendFile file =
            withFile file ReadMode $ \src ->
                S.fold (TCP.writeChunks (127, 0, 0, 1) 8090)
                    $ IFH.toChunks src
     in getArgs >>= S.drain . S.parallely . S.mapM sendFile . S.fromList

View File

@ -1,107 +0,0 @@
import Data.Char (ord)
import System.Environment (getArgs)
import System.IO (IOMode(..), hSeek, SeekMode(..))
import qualified Streamly.Data.Fold as FL
import qualified Streamly.FileSystem.Handle as FH
import qualified System.IO as FH
import qualified Streamly.Data.Array.Storable.Foreign as A
import qualified Streamly.Prelude as S
-- import qualified Streamly.FileSystem.FD as FH
import qualified Streamly.Internal.Data.Fold as FL
import qualified Streamly.Internal.Unicode.Stream as US
import qualified Streamly.Internal.Memory.ArrayStream as AS
import qualified Streamly.Internal.Data.Stream.IsStream as S
-- Read the contents of a file to stdout.
--
-- FH.read reads the file in 32KB chunks and converts the chunks into a byte
-- stream. FH.write takes the byte stream as input, converts it into chunks of
-- 32KB and writes those chunks to stdout.
--
-- | Byte-stream cat: 32KB reads converted to a byte stream, re-chunked on
-- write.
_cat :: FH.Handle -> IO ()
_cat src = S.fold (FH.write FH.stdout) $ S.unfold FH.read src

-- Chunked version, more efficient than the byte stream version above. Reads
-- the file in 256KB chunks and writes those chunks to stdout.
cat :: FH.Handle -> IO ()
cat src =
    S.fold (FH.writeChunks FH.stdout)
        $ S.unfold FH.readChunksWithBufferOf ((256*1024), src)

-- Copy a source file to a destination file.
--
-- FH.read reads the file in 32KB chunks and converts the chunks into a byte
-- stream. FH.write takes the byte stream as input, converts it into chunks of
-- 32KB and writes those chunks to the destination file.
_cp :: FH.Handle -> FH.Handle -> IO ()
_cp src dst = S.fold (FH.write dst) $ S.unfold FH.read src

-- Chunked version, more efficient than the byte stream version above. Reads
-- the file in 256KB chunks and writes those chunks to the destination.
cp :: FH.Handle -> FH.Handle -> IO ()
cp src dst =
    S.fold (FH.writeChunks dst)
        $ S.unfold FH.readChunksWithBufferOf ((256*1024), src)
-- | The numeric code point of a character, at any 'Num' type.
ord' :: Num a => Char -> a
ord' c = fromIntegral (ord c)
-- Count lines like wc -l.
--
-- Char stream version. Reads the input as a byte stream, splits it into lines
-- and counts the lines..
-- | Line count via the byte stream: decode Latin-1, split on lines, count.
_wcl :: FH.Handle -> IO ()
_wcl src = print =<< (S.length
    $ US.lines FL.drain
    $ US.decodeLatin1
    $ S.unfold FH.read src)

-- More efficient chunked version. Reads chunks from the input handles and
-- splits the chunks directly instead of converting them into byte stream
-- first. 10 is the byte value of '\n'.
wcl :: FH.Handle -> IO ()
wcl src = print =<< (S.length
    $ AS.splitOn 10
    $ S.unfold FH.readChunks src)

-- grep -c
--
-- count the occurrences of a pattern in a file. Splitting on the pattern
-- yields one more segment than there are matches, hence 'subtract 1'.
grepc :: String -> FH.Handle -> IO ()
grepc pat src = print . (subtract 1) =<< (S.length
    $ S.splitOnSeq (A.fromList (map ord' pat)) FL.drain
    $ S.unfold FH.read src)

-- Compute the average line length in a file: total bytes divided by line
-- count, computed by an applicative composition of two folds.
avgll :: FH.Handle -> IO ()
avgll src = print =<< (S.fold avg
    $ S.splitWithSuffix (== ord' '\n') FL.length
    $ S.unfold FH.read src)

    where avg = (/) <$> toDouble FL.sum <*> toDouble FL.length
          toDouble = fmap (fromIntegral :: Int -> Double)
-- histogram of line lengths in a file: lengths bucketed in steps of 10,
-- with every length of 90 or more collapsed into the last bucket.
llhisto :: FH.Handle -> IO ()
llhisto src = print =<< (S.fold (FL.classify FL.length)
    $ S.map bucket
    $ S.splitWithSuffix (== ord' '\n') FL.length
    $ S.unfold FH.read src)

    where

    -- FIX: the original used `mod`, which always yields 0..9 and made the
    -- `i > 9` cap dead code; bucketing lengths by tens requires `div`.
    bucket n = let i = n `div` 10 in if i > 9 then (9,n) else (i,n)
-- | Run each utility on the file named by the first command line argument,
-- rewinding the shared read handle before every pass.
main :: IO ()
main = do
    name <- fmap head getArgs
    src <- FH.openFile name ReadMode
    let rewind = hSeek src AbsoluteSeek 0

    rewind >> putStrLn "cat" >> cat src -- Unix cat program
    rewind >> putStr "wcl " >> wcl src -- Unix wc -l program
    rewind >> putStr "grepc " >> grepc "aaaa" src -- Unix grep -c program
    rewind >> putStr "avgll " >> avgll src -- get average line length
    rewind >> putStr "llhisto " >> llhisto src -- get line length histogram

    dst <- FH.openFile "dst-xyz.txt" WriteMode
    rewind >> putStr "cp " >> cp src dst -- Unix cp program

View File

@ -1,23 +0,0 @@
module Main (main) where
import Data.Bifunctor (bimap)
import Data.Function ((&))
import System.IO (stdout, hSetBuffering, BufferMode(LineBuffering))
import qualified Streamly.Prelude as S
import qualified Streamly.Internal.Data.Stream.IsStream as S
import qualified Streamly.Internal.FileSystem.Dir as Dir
-- | List the current directory recursively using concurrent processing
--
-- | Recursively list the current directory with concurrent (ahead-ordered)
-- iteration: Left values are directories fed back for further listing,
-- Right values are files; both are printed as they arrive.
main :: IO ()
main = do
    hSetBuffering stdout LineBuffering
    S.mapM_ print $ S.iterateMapLeftsWith S.ahead listDir (S.yield $ (Left "."))

    where

    listDir dir =
        Dir.toEither dir -- SerialT IO (Either String String)
            & S.map (bimap prefix prefix) -- SerialT IO (Either String String)

        where prefix x = dir ++ "/" ++ x

View File

@ -1,23 +0,0 @@
{-# LANGUAGE FlexibleContexts #-}
-- | This example generates two streams sorted in ascending order and merges
-- them in ascending order, concurrently.
--
-- Compile with '-threaded -with-rtsopts "-N"' GHC options to use the
-- parallelism.
import Data.Word
import System.Random (getStdGen, randoms)
import Data.List (sort)
import Streamly.Prelude (Serial)
import qualified Streamly.Prelude as S
-- | A stream of 100,000 random Word16 values in ascending order: the list
-- is sorted in memory and then replayed as a stream via foldMap/return.
getSorted :: Serial Word16
getSorted = do
    g <- S.yieldM getStdGen
    let ls = take 100000 (randoms g) :: [Word16]
    foldMap return (sort ls)

-- | Merge the two sorted streams concurrently and print the last (largest)
-- element of the merged result.
main :: IO ()
main = S.last (S.mergeAsyncBy compare getSorted getSorted) >>= print

View File

@ -1,44 +1,3 @@
# Running The Examples
# Examples
## Running directly using stack
You can run these examples using `stack` like this:
```
$ stack build
$ stack AcidRain.hs
```
Note: This method may not work for `CirclingSquare.hs` SDL animation example.
## Build and run
Build the library with the `examples` flag on e.g.
```
stack build --flag streamly:examples
cabal new-build --flags examples
```
Then run the executables, for example:
```
stack exec AcidRain
```
The executable name are the same as the filenames.
## Running the SDL animation example
To include the SDL examples as well build with `examples-sdl` flag:
```
stack build --flag streamly:examples-sdl
cabal new-build --flags examples-sdl
```
Make sure that you have the SDL OS package installed on your system and the
headers are visible to Haskell build tool.
```
stack exec CirclingSquare
```
Examples are available in the [streamly-examples](https://github.com/composewell/streamly-examples) repo.

View File

@ -1,10 +0,0 @@
import qualified Streamly.Prelude as S
import qualified Streamly.Internal.Data.Stream.IsStream as Internal
-- | Print a timestamped "tick" roughly once per second, paced by the
-- stream's average-rate control.
main :: IO ()
main =
    S.mapM_ print
        $ S.asyncly
        $ S.avgRate 1
        $ Internal.timestamped
        $ S.repeatM (pure "tick")

View File

@ -1,35 +0,0 @@
{-# LANGUAGE CPP #-}
#if !(MIN_VERSION_base(4,11,0))
import Data.Semigroup ((<>))
#endif
import Streamly.Prelude (drain, nil, yieldM, (|:))
import Network.HTTP.Simple
import qualified Streamly.Prelude as S
-- | Runs three search engine queries in parallel and prints the search engine
-- names in the fastest first order.
--
-- Does it twice using two different ways.
--
-- | Fire the three search queries concurrently in three different styles;
-- the print order of the first two runs shows which engine answered first.
main :: IO ()
main = do
    putStrLn "Using parallel stream construction"
    drain . S.parallely $ google |: bing |: duckduckgo |: nil

    putStrLn "\nUsing parallel semigroup composition"
    drain . S.parallely $ yieldM google <> yieldM bing <> yieldM duckduckgo

    putStrLn "\nUsing parallel applicative zip"
    drain . S.zipAsyncly $
        (,,) <$> yieldM google <*> yieldM bing <*> yieldM duckduckgo

    where

    -- Issue the request and print the query URL once the response arrives.
    get :: String -> IO ()
    get s = httpNoBody (parseRequest_ s) >> print s

    google, bing, duckduckgo :: IO ()
    google = get "https://www.google.com/search?q=haskell"
    bing = get "https://www.bing.com/search?q=haskell"
    duckduckgo = get "https://www.duckduckgo.com/?q=haskell"

View File

@ -1,38 +0,0 @@
import qualified Streamly.Internal.Data.Stream.IsStream as S
import qualified Streamly.Internal.FileSystem.Handle as IFH
import qualified Streamly.FileSystem.Handle as FH
import qualified System.IO as FH
import Control.Monad.IO.Class (liftIO)
import Control.Monad.State.Strict (StateT(..), get, put)
import System.Environment (getArgs)
import System.IO (IOMode(..))
import Data.Function ((&))
-- | Close the previous output handle (if any) and open the next numbered
-- output file ("dst-xyz-<n>.txt"), threading (handle, index) via StateT.
newHandle :: StateT (Maybe (FH.Handle, Int)) IO FH.Handle
newHandle = do
    old <- get
    idx <- case old of
        Nothing -> return 0
        Just (h, i) -> liftIO (FH.hClose h) >> return (i + 1)
    h <- liftIO $ FH.openFile ("dst-xyz-" ++ show idx ++ ".txt") WriteMode
    put (Just (h, idx))
    return h

-- XXX reduce the input stream to a stream of file names
-- The fold can return the file name/handle after it is done.
-- similarly the files can written to directories and we can generate a stream
-- of directory names.

-- | Split the input handle's bytes into 180MB pieces, writing each piece to
-- a freshly created numbered file.
splitFile :: FH.Handle -> IO ()
splitFile inHandle =
    S.unfold FH.read inHandle
        & S.liftInner
        & S.chunksOf2 (180 * 1024 * 1024) newHandle IFH.write2
        & S.evalStateT (return Nothing) -- generate new handle for each iteration
        & S.drain

-- | Split the file named by the first command line argument.
main :: IO ()
main = do
    name <- fmap head getArgs
    src <- FH.openFile name ReadMode
    splitFile src

View File

@ -1,75 +0,0 @@
{-# LANGUAGE CPP #-}
{-# LANGUAGE FlexibleInstances #-}
{-# OPTIONS_GHC -Wno-orphans #-}
-- compile with:
-- ghc -O2 -fspec-constr-recursive=10 -fmax-worker-args=16 word-classifier.hs
--
import qualified Data.Char as Char
import Data.Foldable
import Data.Function ((&))
import Data.Functor.Identity (Identity(..))
import qualified Data.HashMap.Strict as Map
import Data.Hashable
import Data.IORef
import qualified Data.List as List
import qualified Data.Ord as Ord
import Foreign.Storable (Storable(..))
import qualified Streamly.Unicode.Stream as S
import qualified Streamly.Internal.Unicode.Stream as S
import qualified Streamly.Data.Fold as FL
import qualified Streamly.Internal.Data.Fold as IFL
import qualified Streamly.Internal.Data.Unfold as IUF
import qualified Streamly.Internal.FileSystem.File as File
import qualified Streamly.Data.Array.Storable.Foreign as A
import qualified Streamly.Prelude as S
import System.Environment (getArgs)
-- Orphan instance (see -Wno-orphans above): hash an immutable array by
-- folding a rolling hash over its elements without copying.
instance (Enum a, Storable a) => Hashable (A.Array a) where
    hash arr = fromIntegral $ runIdentity $ IUF.fold A.read IFL.rollingHash arr
    hashWithSalt salt arr = fromIntegral $ runIdentity $
        IUF.fold A.read (IFL.rollingHashWithSalt $ fromIntegral salt) arr
-- | Lower-case a character, taking a fast path for ASCII characters that
-- are already lower-case letters (0x61..0x7a).
{-# INLINE toLower #-}
toLower :: Char -> Char
toLower c =
    let code = fromIntegral (Char.ord c) :: Word
    in if code >= 0x61 && code <= 0x7a then c else Char.toLower c
-- | Test whether a character is alphabetic, taking a fast path for ASCII
-- lower-case letters (0x61..0x7a).
{-# INLINE isAlpha #-}
isAlpha :: Char -> Bool
isAlpha c =
    let code = fromIntegral (Char.ord c) :: Word
    in (code >= 0x61 && code <= 0x7a) || Char.isAlpha c
-- | Count word frequencies in the input file (lower-cased, alphabetic words
-- only) and print the 25 most frequent.
main :: IO ()
main = do
    inFile <- fmap head getArgs

    -- Write the stream to a hashmap consisting of word counts; each map
    -- entry holds a mutable IORef counter bumped in place.
    mp <-
        let
            alter Nothing = fmap Just $ newIORef (1 :: Int)
            alter (Just ref) = modifyIORef' ref (+ 1) >> return (Just ref)
        in File.toBytes inFile -- SerialT IO Word8
            & S.decodeLatin1 -- SerialT IO Char
            & S.map toLower -- SerialT IO Char
            & S.words FL.toList -- SerialT IO String
            & S.filter (all isAlpha) -- SerialT IO String
            & S.foldlM' (flip (Map.alterF alter)) (return Map.empty) -- IO (Map String (IORef Int))

    -- Print the top hashmap entries, sorted by descending count
    counts <-
        let readRef (w, ref) = do
                cnt <- readIORef ref
                return (w, cnt)
         in Map.toList mp
            & mapM readRef

    traverse_ print $ List.sortOn (Ord.Down . snd) counts
        & List.take 25

View File

@ -1,628 +0,0 @@
-------------------------------------------------------------------------------
-- Fast, streaming and parallel word counting (wc) program.
-------------------------------------------------------------------------------
-- 1) On utf8 inputs the serial version is around 3x faster than MacOS wc
-- 2) It can run in parallel on multiple cores providing further speedup
-- 3) Parallel version works efficiently on stdin/streaming input as well
-- 4) Parallel version handles utf8 input correctly (including multi-byte space
-- chars) and gives the same output as the serial version on all inputs.
-- 5) There may be differences in word/char counts when there are invalid utf8
-- byte sequences present in the input because of different styles of error
-- handling.
-------------------------------------------------------------------------------
-- Build with the following options:
-------------------------------------------------------------------------------
-- streamly optimization plugin is required for best performance
-- ghc -O2 -fplugin Plugin -fspec-constr-recursive=10 -fmax-worker-args=16
-- For concurrent version add: -threaded -with-rtsopts "-N"
-------------------------------------------------------------------------------
-- Comparing with "wc -mwl" command:
-------------------------------------------------------------------------------
--
-- 1) To enable UTF8 with wc: export LANG=en_US.UTF-8; export LC_ALL=$LANG
-- 2) To test whether it is actually using utf8, copy and paste this string
-- "U+1680U+2000 U+2001U+2002" and run "wc -mwl" on this. Without proper UTF8
-- handling word count would be 1, with proper UTF8 handling word count would
-- be 4. Note that the spaces in this string are not regular space chars they
-- are different unicode space chars.
{-# LANGUAGE CPP #-}
import Control.Monad (when)
import Data.Char (isSpace)
import Data.Word (Word8)
import GHC.Conc (numCapabilities)
import System.Environment (getArgs)
import System.IO (Handle, openFile, IOMode(..))
import Streamly.Internal.Unicode.Stream
(DecodeState, DecodeError(..), CodePoint, decodeUtf8Either,
resumeDecodeUtf8Either)
import qualified Streamly.Internal.Data.Stream.IsStream as S
import qualified Streamly.Unicode.Stream as S
import qualified Streamly.FileSystem.Handle as FH
import qualified Streamly.Data.Array.Storable.Foreign as A
import qualified Data.Vector.Storable.Mutable as V
-------------------------------------------------------------------------------
-- Parallel char, line and word counting
-------------------------------------------------------------------------------
-- We process the individual chunks of the stream independently and in
-- parallel, and then merge the per-chunk counts.
--
-------------------------------------------------------------------------------
-- Char counting
-------------------------------------------------------------------------------
-- To count chars each block needs the following:
--
-- -- | header | char counts | trailer |
--
-- header and trailer are incomplete utf8 byte sequences that may be combined
-- with the previous or the next block to complete them later.
--
-- The trailer may have one or more bytes in a valid utf8 sequence and is
-- expecting more bytes to complete the sequence. The header stores any
-- possible continuation from the previous block. It contains a maximum of 3
-- bytes which all must be non-starter bytes.
--
-- When two blocks are combined, the trailer of the first block is combined
-- with the header of the next block and then utf8 decoded. The combined
-- header+trailer may yield:
--
-- * Nothing - when there is no trailer and header
-- * All errors - when there is no trailer in the previous block, and there is
-- a header in the next block. In this case there is no starting char which
-- means all header bytes are errors.
-- * It can yield at most one valid character followed by 0, 1 or 2 errors.
--
-- We count an incomplete utf8 sequence of 2 or more bytes starting with a
-- valid starter byte as a single codepoint. Bytes not following a valid
-- starter byte are treated as individual codepoints for counting.
--
-------------------------------------------------------------------------------
-- Word counting
-------------------------------------------------------------------------------
-- For word counting we need the following in each block:
--
-- -- | header | startsWithSpace | word counts | endsWithSpace | trailer |
--
-- The word counts in individual blocks are performed assuming that the
-- previous char before the block is a space.
-- When combining two blocks, after combining the trailer of previous blocks
-- with the header of the next we determine if the resulting char is a space or
-- not.
--
-- 1) If there is no new char joining the two blocks then we use endsWithSpace
-- of the previous block and startsWithSpace of the next block to determine if
-- the word counts are to be adjusted. If the previous block ends with
-- non-space and the next block starts with non-space we need to decrement the
-- word count by one if it is non-zero in the next block.
--
-- 2) If the new joining char is a space then we combine it with
-- startsWithSpace and endsWithSpace to determine the
-- startsWithSpace/endsWithSpace of the combined block and adjust the word
-- counts appropriately.
--
-------------------------------------------------------------------------------
-- Line counting
-------------------------------------------------------------------------------
-- Line counting is performed by counting "\n" in the stream. No new "\n" can
-- result from patching the trailer and header as it is always a single byte.
-------------------------------------------------------------------------------
-- Counting state
-------------------------------------------------------------------------------
-- We use a mutable vector for the counting state. A fold using an immutable
-- structure for such a large state does not perform well. However, mutability
-- is confined to just the accumulator.
--
-- XXX we need convenient mutable records (like C structs) to handle things
-- like this. It may be possible to achieve the same performance with an
-- immutable accumulator, but that will require more research. Since we are
-- always discarding the previous state, we can perhaps make use of that memory
-- using safe in-place modifications, without having to allocate new memory.
-- XXX we can also count the number of decoding errors separately
-- | Slots of the per-block counting state. Each constructor names one
-- cell of the mutable counts vector; 'fromEnum' of the constructor is
-- the cell index (see 'readField'\/'writeField').
--
-- WARNING: the derived 'Enum' makes the constructor order part of the
-- storage layout — do not reorder constructors.
data Field =
    -- The number of "\n" characters found in the block.
      LineCount
    -- Number of full words found in the block, words are counted on a
    -- transition from space char to a non-space char. We always assume the
    -- char before the first starter char in a block is a space. If this is
    -- found to be incorrect when joining two blocks then we fix the counts
    -- accordingly.
    | WordCount
    -- The number of successfully decoded characters plus the number of
    -- decoding failures in the block. Each byte or sequence of bytes on which
    -- decoding fails is also counted as one char. The header and trailer bytes
    -- are not accounted in this, they are accounted only when we join two
    -- blocks.
    | CharCount
    -- whether the last counted char in this block was a space char
    | WasSpace
    -- whether the first successfully decoded char in this block is a space. A
    -- decoding failure, after the trailing bytes from the previous block are
    -- accounted, is also considered as space.
    | FirstIsSpace
    -- If no starter byte is found in the first three bytes in the block then
    -- store those bytes to possibly combine them with the trailing incomplete
    -- byte sequence in the previous block. We mark it done when either we have
    -- stored three bytes or we have found a starter byte.
    --
    -- XXX This is ugly to manipulate, we can implement a statically max sized
    -- mutable ring structure within this record.
    | HeaderDone
    | HeaderWordCount
    | HeaderWord1
    | HeaderWord2
    | HeaderWord3
    -- If a byte sequence at the end of the block is not complete then store
    -- the current state of the utf8 decoder to continue it later using the
    -- incomplete leading byte sequence in the next block.
    | TrailerPresent
    | TrailerState
    | TrailerCodePoint
    deriving (Show, Enum, Bounded)
-------------------------------------------------------------------------------
-- Default/initial state of the block
-------------------------------------------------------------------------------
-- | Read the counter stored in the slot of the given 'Field'.
readField :: V.IOVector Int -> Field -> IO Int
readField v = V.read v . fromEnum

-- | Overwrite the counter stored in the slot of the given 'Field'.
writeField :: V.IOVector Int -> Field -> Int -> IO ()
writeField v = V.write v . fromEnum

-- | Apply a pure function to the counter in the slot of the given 'Field'.
modifyField :: V.IOVector Int -> Field -> (Int -> Int) -> IO ()
modifyField v fld f = V.modify v f (fromEnum fld)
-- | Allocate and initialize the counting state for one block: all
-- counters start at zero except 'WasSpace', which starts at 1 because
-- we pretend the char preceding the block is a space (trailer/header
-- word cells are left for their guards 'TrailerPresent'/'HeaderDone').
newCounts :: IO (V.IOVector Int)
newCounts = do
    v <- V.new (fromEnum (maxBound :: Field) + 1)
    mapM_ (uncurry (writeField v))
        [ (LineCount, 0)
        , (WordCount, 0)
        , (CharCount, 0)
        , (WasSpace, 1)
        , (FirstIsSpace, 0)
        , (HeaderDone, 0)
        , (HeaderWordCount, 0)
        , (TrailerPresent, 0)
        ]
    return v
-------------------------------------------------------------------------------
-- Counting chars
-------------------------------------------------------------------------------
-- | Account one decoded character: bump 'CharCount', remember whether
-- it was a space in 'WasSpace', and latch 'FirstIsSpace' if this is the
-- first char of the block.
accountChar :: V.IOVector Int -> Bool -> IO ()
accountChar counts isSp = do
    seen <- readField counts CharCount
    let spaceFlag = if isSp then 1 else 0
    when (seen == 0) $ writeField counts FirstIsSpace spaceFlag
    writeField counts CharCount (seen + 1)
    writeField counts WasSpace spaceFlag
-------------------------------------------------------------------------------
-- Manipulating the header bytes
-------------------------------------------------------------------------------
-- | Stash one undecodable leading byte into the next free header slot
-- (at most three). After the third byte the header is marked done.
-- Returns 'False' when all three slots are already occupied.
addToHeader :: V.IOVector Int -> Int -> IO Bool
addToHeader counts cp = do
    cnt <- readField counts HeaderWordCount
    let slot 0 = Just HeaderWord1
        slot 1 = Just HeaderWord2
        slot 2 = Just HeaderWord3
        slot _ = Nothing
    case slot cnt of
        Nothing -> return False
        Just fld -> do
            writeField counts fld cp
            writeField counts HeaderWordCount (cnt + 1)
            -- third byte fills the header: no starter found, header done
            when (cnt == 2) $ writeField counts HeaderDone 1
            return True
-- | Seeing a successfully decoded char means no more header bytes can
-- arrive; mark the header done if it was still open.
resetHeaderOnNewChar :: V.IOVector Int -> IO ()
resetHeaderOnNewChar counts = do
    done <- readField counts HeaderDone
    when (done == 0) $ writeField counts HeaderDone 1
-------------------------------------------------------------------------------
-- Manipulating the trailer
-------------------------------------------------------------------------------
-- | Record an incomplete utf8 sequence (decoder state plus partially
-- decoded code point) as this block's trailer, to be resumed when the
-- next block is joined.
setTrailer :: V.IOVector Int -> DecodeState -> CodePoint -> IO ()
setTrailer counts decState partialCp = do
    writeField counts TrailerState (fromIntegral decState)
    writeField counts TrailerCodePoint partialCp
    writeField counts TrailerPresent 1
-- | A new decoded char means the pending trailer can never be
-- completed; clear it and count it as a single whitespace char.
resetTrailerOnNewChar :: V.IOVector Int -> IO ()
resetTrailerOnNewChar counts = do
    pending <- readField counts TrailerPresent
    when (pending /= 0) $ do
        writeField counts TrailerPresent 0
        accountChar counts True
-------------------------------------------------------------------------------
-- Counting the stream
-------------------------------------------------------------------------------
-- | Fold step: fold one decoded item (a valid 'Char' or a
-- 'DecodeError') into the mutable per-block counts.
--
-- A valid char finalizes any pending header/trailer bookkeeping, bumps
-- 'LineCount' on newline, and bumps 'WordCount' on a space-to-non-space
-- transition. A decode error is either stashed in the header (possible
-- continuation of the previous block), recorded as an incomplete
-- trailer, or counted as a whitespace char.
{-# INLINE countChar #-}
countChar :: V.IOVector Int -> Either DecodeError Char -> IO ()
countChar counts inp =
    case inp of
        Right ch -> do
            resetHeaderOnNewChar counts
            -- account the last stored error as whitespace and clear it
            resetTrailerOnNewChar counts
            when (ch == '\n') $ modifyField counts LineCount (+ 1)
            if isSpace ch
            then accountChar counts True
            else do
                -- space -> non-space transition starts a new word
                wasSpace <- readField counts WasSpace
                when (wasSpace /= 0) $ modifyField counts WordCount (+ 1)
                accountChar counts False
        Left (DecodeError st cp) -> do
            hdone <- readField counts HeaderDone
            if hdone == 0
            then do
                if st == 0
                then do
                    -- We got a non-starter in initial decoder state, there may
                    -- be something that comes before this to complete it.
                    r <- addToHeader counts cp
                    when (not r) $ error "countChar: Bug addToHeader failed"
                else do
                    -- We got an error in a non-initial decoder state, it may
                    -- be an input underflow error, keep it as incomplete in
                    -- the trailer.
                    writeField counts HeaderDone 1
                    setTrailer counts st cp
            else do
                resetTrailerOnNewChar counts
                if st == 0
                then accountChar counts True
                else setTrailer counts st cp
-- | Print "lines words chars", mirroring the output of @wc -mwl@.
printCounts :: V.IOVector Int -> IO ()
printCounts v = do
    stats <- mapM (readField v) [LineCount, WordCount, CharCount]
    putStrLn $ unwords $ map show stats
-------------------------------------------------------------------------------
-- Serial counting using parallel version of countChar
-------------------------------------------------------------------------------
-- | Serial counting driver that reuses the parallel-capable 'countChar'
-- step: decode the handle's bytes with error reporting and fold every
-- item into a fresh counts vector.
_wc_mwl_parserial :: Handle -> IO (V.IOVector Int)
_wc_mwl_parserial src = do
    counts <- newCounts
    let decoded = decodeUtf8Either $ S.unfold FH.read src
    S.mapM_ (countChar counts) decoded
    return counts
-------------------------------------------------------------------------------
-- Serial word counting with UTF-8 handling
-------------------------------------------------------------------------------
-- | Plain immutable counting state: lines, words, chars, and whether
-- the previously seen char was a space.
data Counts = Counts !Int !Int !Int !Bool deriving Show

-- | Pure fold step for the simple serial counter: bump the line count
-- on newline, bump the word count on a space-to-non-space transition,
-- and always bump the char count.
{-# INLINE countCharSerial #-}
countCharSerial :: Counts -> Char -> Counts
countCharSerial (Counts l w c wasSpace) ch = Counts l' w' (c + 1) sp'
  where
    l' | ch == '\n' = l + 1
       | otherwise = l
    (w', sp')
        | isSpace ch = (w, True)
        | wasSpace = (w + 1, False)
        | otherwise = (w, False)
-- Note: This counts invalid byte sequences as non-space chars
-- | Simple serial counter: utf8-decode the handle's bytes, fold them
-- with the pure 'countCharSerial' step and print the final 'Counts'.
_wc_mwl_serial :: Handle -> IO ()
_wc_mwl_serial src = do
    counts <- S.foldl' countCharSerial (Counts 0 0 0 True)
        $ S.decodeUtf8
        $ S.unfold FH.read src
    print counts
-------------------------------------------------------------------------------
-- Parallel counting
-------------------------------------------------------------------------------
-- XXX we need a better data structure to store the header bytes to make these
-- routines simpler.
--
-- combine trailing bytes in preceding block with leading bytes in the next
-- block and decode them into a codepoint
-- | Join the incomplete utf8 trailer of block @v1@ with the first
-- @hdrCnt@ stashed header bytes of block @v2@ and resume decoding: the
-- header bytes (oldest first) are prepended to a byte stream which is
-- decoded starting from @v1@'s saved decoder state and partial code
-- point. Yields at most one valid char possibly followed by errors.
reconstructChar :: Int
    -> V.IOVector Int
    -> V.IOVector Int
    -> IO (S.SerialT IO (Either DecodeError Char))
reconstructChar hdrCnt v1 v2 = do
    -- NOTE(review): the message only mentions > 3 but the guard also
    -- rejects negative counts
    when (hdrCnt > 3 || hdrCnt < 0) $ error "reconstructChar: hdrCnt > 3"
    -- Build the byte stream back to front: word3, then word2, then
    -- word1 consed on, so the final stream is word1, word2, word3.
    stream1 <-
        if (hdrCnt > 2)
        then do
            x <- readField v2 HeaderWord3
            return $ (fromIntegral x :: Word8) `S.cons` S.nil
        else return S.nil
    stream2 <-
        if (hdrCnt > 1)
        then do
            x <- readField v2 HeaderWord2
            return $ fromIntegral x `S.cons` stream1
        else return stream1
    stream3 <-
        if (hdrCnt > 0)
        then do
            x <- readField v2 HeaderWord1
            return $ fromIntegral x `S.cons` stream2
        else return stream2
    state <- readField v1 TrailerState
    cp <- readField v1 TrailerCodePoint
    return $ resumeDecodeUtf8Either (fromIntegral state) cp stream3
-- | Dequeue the oldest stashed header byte of the block, shifting any
-- remaining bytes one slot down ('HeaderWord2'/'HeaderWord3' move
-- towards 'HeaderWord1'). Returns 'Nothing' when the header is empty.
getHdrChar :: V.IOVector Int -> IO (Maybe Int)
getHdrChar v = do
    hdrCnt <- readField v HeaderWordCount
    case hdrCnt of
        0 -> return Nothing
        1 -> do
            writeField v HeaderWordCount 0
            fmap Just $ readField v HeaderWord1
        2 -> do
            -- shift word2 into word1
            x1 <- readField v HeaderWord1
            x2 <- readField v HeaderWord2
            writeField v HeaderWord1 x2
            writeField v HeaderWordCount 1
            return $ Just x1
        3 -> do
            -- shift word2/word3 into word1/word2
            x1 <- readField v HeaderWord1
            x2 <- readField v HeaderWord2
            x3 <- readField v HeaderWord3
            writeField v HeaderWord1 x2
            writeField v HeaderWord2 x3
            writeField v HeaderWordCount 2
            return $ Just x1
        _ -> error "getHdrChar: Bug, hdrCnt not in range 0-3"
-- If the header of the first block is not done then combine the header
-- with the header of the next block.
-- | While the header of block @v1@ is still open, move one stashed
-- header byte from block @v2@ into it (a no-op when @v1@'s header is
-- already done or @v2@ has no header bytes).
combineHeaders :: V.IOVector Int -> V.IOVector Int -> IO ()
combineHeaders v1 v2 = do
    hdone1 <- readField v1 HeaderDone
    when (hdone1 == 0) $ do
        res <- getHdrChar v2
        case res of
            Nothing -> return ()
            Just byte -> do
                ok <- addToHeader v1 byte
                when (not ok) $ error "combineHeaders: Bug, addToHeader failed"
-- We combine the contents of the second vector into the first vector, mutating
-- the first vector and returning it.
-- XXX This is a quick hack and can be refactored to reduce the size
-- and understandability considerably.
-- | Merge the counts of two adjacent blocks into the first vector,
-- mutating and returning @v1@. This is the associative combining step
-- of the parallel fold: it patches the trailer (incomplete utf8
-- sequence) of @v1@ with the header bytes of @v2@, re-decodes the
-- joined bytes, and adjusts line/word/char counts and the
-- space-transition flags accordingly.
--
-- XXX This is a quick hack and can be refactored to reduce the size
-- and understandability considerably.
addCounts :: V.IOVector Int -> V.IOVector Int -> IO (V.IOVector Int)
addCounts v1 v2 = do
    hdone1 <- readField v1 HeaderDone
    hdone2 <- readField v2 HeaderDone
    hdrCnt2_0 <- readField v2 HeaderWordCount
    -- v1's header is still open: absorb v2's header bytes into it first,
    -- then recurse with v1's header marked done.
    if hdone1 == 0 && (hdrCnt2_0 /= 0 || hdone2 /= 0)
    then do
        combineHeaders v1 v2
        if hdone2 == 0
        then return v1
        else do
            writeField v1 HeaderDone 1
            addCounts v1 v2
    else do
        trailerPresent2 <- readField v2 TrailerPresent
        trailerState2 <- readField v2 TrailerState
        trailerCodePoint2 <- readField v2 TrailerCodePoint
        if hdone1 == 0
        then error "addCounts: Bug, trying to add completely empty second block"
        else do
            l1 <- readField v1 LineCount
            w1 <- readField v1 WordCount
            c1 <- readField v1 CharCount
            wasSpace1 <- readField v1 WasSpace
            l2 <- readField v2 LineCount
            w2 <- readField v2 WordCount
            c2 <- readField v2 CharCount
            wasSpace2 <- readField v2 WasSpace
            firstIsSpace2 <- readField v2 FirstIsSpace
            hdrCnt2 <- readField v2 HeaderWordCount
            trailer1 <- readField v1 TrailerPresent
            if trailer1 == 0 -- no trailer in the first block
            then do
                -- header2, if any, complete or incomplete, is just invalid
                -- bytes, count them as whitespace
                let firstIsSpace2' = firstIsSpace2 /= 0 || hdrCnt2 /= 0
                -- two words joining across the boundary merge into one
                w <-
                    if w2 /= 0 && wasSpace1 == 0 && not firstIsSpace2'
                    then return $ w1 + w2 - 1
                    else return $ w1 + w2
                writeField v1 LineCount (l1 + l2)
                writeField v1 WordCount w
                writeField v1 CharCount (c1 + c2 + hdrCnt2)
                when (c1 == 0) $ do
                    if c2 == 0 && hdrCnt2 /= 0
                    then writeField v1 FirstIsSpace 1
                    else writeField v1 FirstIsSpace firstIsSpace2
                if c2 == 0 && hdrCnt2 /= 0
                then writeField v1 WasSpace 1
                else writeField v1 WasSpace wasSpace2
                writeField v1 TrailerPresent trailerPresent2
                writeField v1 TrailerState trailerState2
                writeField v1 TrailerCodePoint trailerCodePoint2
                return v1
            else do
                if hdrCnt2 == 0
                then do
                    when (hdone2 /= 0) $ do -- empty and Done header
                        -- count trailer as whitespace, its counted as one char
                        -- Note: hdrCnt2 == 0 means either header is not done
                        -- or c2 /= 0
                        writeField v1 LineCount (l1 + l2)
                        writeField v1 WordCount (w1 + w2)
                        writeField v1 CharCount (c1 + c2 + 1)
                        when (c1 == 0) $ writeField v1 FirstIsSpace 1
                        if (c2 == 0)
                        then writeField v1 WasSpace 1
                        else writeField v1 WasSpace wasSpace2
                        writeField v1 TrailerPresent trailerPresent2
                        writeField v1 TrailerState trailerState2
                        writeField v1 TrailerCodePoint trailerCodePoint2
                    -- If header of the next block is not done we continue the
                    -- trailer from the previous block instead of treating it
                    -- as whitespace
                    return v1
                else do
                    -- join the trailing part of the first block with the
                    -- header of the next
                    decoded <- reconstructChar hdrCnt2 v1 v2
                    res <- S.uncons decoded
                    case res of
                        Nothing -> error "addCounts: Bug. empty reconstructed char"
                        Just (h, t) -> do
                            tlength <- S.length t
                            case h of
                                Right ch -> do
                                    -- If we have an error case after this
                                    -- char then that would be treated
                                    -- as whitespace
                                    let lcount = l1 + l2
                                        lcount1 = if (ch == '\n') then lcount + 1 else lcount
                                        wcount = w1 + w2
                                        firstSpace = isSpace ch
                                        wasSpace = firstSpace || tlength /= 0
                                        wcount1 =
                                            if wasSpace
                                            then wcount
                                            else if w2 == 0 || firstIsSpace2 /= 0
                                                 then wcount
                                                 else wcount - 1
                                    writeField v1 LineCount lcount1
                                    writeField v1 WordCount wcount1
                                    writeField v1 CharCount (c1 + c2 + 1 + tlength)
                                    when (c1 == 0) $
                                        writeField v1 FirstIsSpace
                                            (if firstSpace then 1 else 0)
                                    if c2 == 0
                                    then do
                                        if wasSpace
                                        then writeField v1 WasSpace 1
                                        else writeField v1 WasSpace 0
                                    else writeField v1 WasSpace wasSpace2
                                    writeField v1 TrailerPresent trailerPresent2
                                    writeField v1 TrailerState trailerState2
                                    writeField v1 TrailerCodePoint trailerCodePoint2
                                    return v1
                                Left (DecodeError st cp) -> do
                                    -- if header was incomplete it may result
                                    -- in partially decoded char to be written
                                    -- as trailer. Check if the last error is
                                    -- an incomplete decode.
                                    r <- S.last t
                                    let (st', cp') =
                                            case r of
                                                Nothing -> (st, cp)
                                                Just lst -> case lst of
                                                    Right _ -> error "addCounts: Bug"
                                                    Left (DecodeError st1 cp1) -> (st1, cp1)
                                    if hdone2 == 0 && st' /= 12
                                    then do
                                        -- all elements before the last one must be errors
                                        writeField v1 CharCount (c1 + tlength)
                                        when (c1 == 0) $
                                            writeField v1 FirstIsSpace 1
                                        writeField v1 WasSpace 1
                                        writeField v1 TrailerState (fromIntegral st')
                                        writeField v1 TrailerCodePoint cp'
                                    else do
                                        -- all elements must be errors
                                        -- treat them as whitespace
                                        writeField v1 LineCount (l1 + l2)
                                        writeField v1 WordCount (w1 + w2)
                                        writeField v1 CharCount (c1 + c2 + tlength + 1)
                                        when (c1 == 0) $
                                            writeField v1 FirstIsSpace 1
                                        if c2 == 0
                                        then writeField v1 WasSpace 1
                                        else writeField v1 WasSpace wasSpace2
                                        writeField v1 TrailerPresent trailerPresent2
                                        writeField v1 TrailerState trailerState2
                                        writeField v1 TrailerCodePoint trailerCodePoint2
                                    return v1
-- Individual array processing is an isolated loop, fusing it with the bigger
-- loop may be counter productive.
-- Individual array processing is an isolated loop, fusing it with the
-- bigger loop may be counter productive.
{-# NOINLINE countArray #-}
countArray :: A.Array Word8 -> IO (V.IOVector Int)
countArray src = do
    counts <- newCounts
    let decoded = decodeUtf8Either $ S.unfold A.read src
    S.mapM_ (countChar counts) decoded
    return counts
-- | Count lines, words and chars of the handle concurrently: split the
-- input into chunks of @n@ bytes, count each chunk independently
-- ('countArray') on up to 'numCapabilities' threads, and merge the
-- per-chunk counts in order with 'addCounts'.
{-# INLINE wc_mwl_parallel #-}
wc_mwl_parallel :: Handle -> Int -> IO (V.IOVector Int)
wc_mwl_parallel src n =
    S.foldlM' addCounts newCounts
        $ S.aheadly
        $ S.maxThreads numCapabilities
        $ S.mapM countArray
        $ S.unfold FH.readChunksWithBufferOf (n, src)
-------------------------------------------------------------------------------
-- Main
-------------------------------------------------------------------------------
-- | Run the parallel word counter on the file named by the first
-- command line argument and print "lines words chars".
--
-- Fix: the original used the partial @fmap head getArgs@, which dies
-- with \"Prelude.head: empty list\" when no argument is given; we now
-- emit a usage message instead.
main :: IO ()
main = do
    args <- getArgs
    name <- case args of
        (f:_) -> return f
        [] -> error "usage: WordCount <input-file>"
    src <- openFile name ReadMode
    -- _wc_mwl_serial src -- Unix wc -l program
    -- printCounts =<< _wc_mwl_parserial src -- Unix wc -l program

    -- Using different sizes of chunks (1,2,3,4,5,10,128,256) is a good testing
    -- mechanism for parallel counting code.
    {-
    let chunkSize = read $ args !! 1
    -}
    let chunkSize = 32 * 1024
    printCounts =<< wc_mwl_parallel src chunkSize -- Unix wc -l program

View File

@ -24,7 +24,8 @@
-- used to fine tune the concurrency control.
--
-- Streaming and concurrency together enable expressing reactive applications
-- conveniently. See the @CirclingSquare@ example in the examples directory for
-- conveniently. See the @CirclingSquare@ example in
-- <https://github.com/composewell/streamly-examples Streamly Examples> for
-- a simple SDL based FRP example. To summarize, streamly provides a unified
-- computing framework for streaming, non-determinism and functional reactive
-- programming in an elegant and simple API that is a natural extension of pure
@ -912,7 +913,7 @@ import Control.Monad.Trans.Class (MonadTrans (lift))
-- time. Take an example of a merge sort of two sorted streams. We need to
-- keep consuming items from the stream which has the lowest item in the sort
-- order. This can be achieved using async references to streams. See
-- "MergeSort.hs" in the examples directory.
-- "MergeSort.hs" in <https://github.com/composewell/streamly-examples Streamly Examples>.
-- $monoid
--
@ -1436,9 +1437,9 @@ import Control.Monad.Trans.Class (MonadTrans (lift))
-- and operators instead of the ugly pragmas.
--
-- For more concurrent programming examples see,
-- <examples/ListDir.hs ListDir.hs>,
-- <examples/MergeSort.hs MergeSort.hs> and
-- <examples/SearchQuery.hs SearchQuery.hs> in the examples directory.
-- <https://github.com/composewell/streamly-examples/tree/master/ListDir.hs ListDir.hs>,
-- <https://github.com/composewell/streamly-examples/tree/master/MergeSort.hs MergeSort.hs> and
-- <https://github.com/composewell/streamly-examples/tree/master/SearchQuery.hs SearchQuery.hs> in the streamly-examples repo.
-- $reactive
--
@ -1512,14 +1513,15 @@ import Control.Monad.Trans.Class (MonadTrans (lift))
-- void $ runStateT runGame 60
-- @
--
-- You can also find the source of this example in the examples directory as
-- <examples/AcidRain.hs AcidRain.hs>. It has been adapted from Gabriel's
-- You can also find the source of this example in the streamly-examples repo
-- as <https://github.com/composewell/streamly-examples/tree/master/AcidRain.hs AcidRain.hs>.
-- It has been adapted from Gabriel's
-- <https://hackage.haskell.org/package/pipes-concurrency-2.0.8/docs/Pipes-Concurrent-Tutorial.html pipes-concurrency>
-- package.
-- This is much simpler compared to the pipes version because of the builtin
-- concurrency in streamly. You can also find a SDL based reactive programming
-- example adapted from Yampa in
-- <examples/CirclingSquare.hs CirclingSquare.hs>.
-- <https://github.com/composewell/streamly-examples/tree/master/CirclingSquare.hs CirclingSquare.hs>.
-- $performance
--
@ -1714,15 +1716,15 @@ import Control.Monad.Trans.Class (MonadTrans (lift))
-- strong reactive programming library as well. Reactive programming is
-- fundamentally stream of events that can be processed concurrently. The
-- example in this tutorial as well as the
-- <examples/CirclingSquare.hs CirclingSquare> example from Yampa demonstrate
-- the basic reactive capability of streamly. In core concepts streamly is
-- strikingly similar to @dunai@. dunai was designed from a FRP perspective
-- and streamly was originally designed from a concurrency perspective.
-- However, both have similarity at the core.
-- <https://github.com/composewell/streamly-examples/tree/master/CirclingSquare.hs CirclingSquare>
-- example from Yampa demonstrate the basic reactive capability of streamly.
-- In core concepts streamly is strikingly similar to @dunai@. dunai was
-- designed from a FRP perspective and streamly was originally designed from a
-- concurrency perspective. However, both have similarity at the core.
-- $furtherReading
--
-- * Read the documentation of "Streamly" module
-- * Read the documentation of "Streamly.Prelude" module
-- * See the examples in the "examples" directory of the package
-- * See the examples in <https://github.com/composewell/streamly-examples streamly-examples> repo.
-- * See the tests in the "test" directory of the package

View File

@ -68,7 +68,7 @@ description:
* /Interoperation/: "Streamly.Tutorial" module for interop with other
streaming libraries
* /Reference Documentation/: Haddock documentation for the respective modules
* /Examples/: <src/examples examples directory> in the package
* /Examples/: <https://github.com/composewell/streamly-examples Streamly Examples>
* /Guides/: <src/docs docs directory> in the package, for documentation on
advanced topics, limitations, semantics of the library or on specific use
cases.
@ -197,16 +197,6 @@ flag streamk
manual: True
default: False
-- Flags gating the bundled example executables below; both default to
-- off so plain builds skip the examples and their extra dependencies.
flag examples
  description: Build including examples
  manual: True
  default: False

flag examples-sdl
  description: Build including SDL examples
  manual: True
  default: False
flag use-c-malloc
description: Use C malloc instead of GHC malloc
manual: True
@ -741,212 +731,3 @@ executable FileSystem.Event
main-is: Streamly/Test/FileSystem/Event.hs
if !flag(manual-tests)
buildable: False
-------------------------------------------------------------------------------
-- Examples
-------------------------------------------------------------------------------
-- Example executables, built only when the examples / examples-sdl
-- flags are enabled. Network and heavier examples are additionally
-- disabled under ghcjs.
-- Fix: field name casing normalized to "build-depends" throughout
-- (the first stanza already used it; the rest used "build-Depends").
executable SearchQuery
  import: exe-options
  main-is: SearchQuery.hs
  hs-source-dirs: examples
  if (flag(examples) || flag(examples-sdl)) && !impl(ghcjs)
    buildable: True
    build-depends:
        streamly
      , base >= 4.9 && < 5
      , http-conduit >= 2.2.2 && < 2.4
  else
    buildable: False

executable ListDir
  import: exe-options
  main-is: ListDir.hs
  hs-source-dirs: examples
  if flag(examples) || flag(examples-sdl)
    buildable: True
    build-depends:
        streamly
      , base >= 4.9 && < 5
  else
    buildable: False

executable MergeSort
  import: exe-options
  main-is: MergeSort.hs
  hs-source-dirs: examples
  if flag(examples) || flag(examples-sdl)
    buildable: True
    build-depends:
        streamly
      , base >= 4.9 && < 5
      , random >= 1.0.0 && < 2
  else
    buildable: False

executable AcidRain
  import: exe-options
  main-is: AcidRain.hs
  hs-source-dirs: examples
  if flag(examples) || flag(examples-sdl)
    buildable: True
    build-depends:
        streamly
      , base >= 4.9 && < 5
      , mtl >= 2.2 && < 3
  else
    buildable: False

executable CirclingSquare
  import: exe-options
  main-is: CirclingSquare.hs
  hs-source-dirs: examples
  if flag(examples-sdl)
    buildable: True
    build-depends:
        streamly
      , base >= 4.9 && < 5
      , SDL >= 0.6.5 && < 0.7
  else
    buildable: False

executable ControlFlow
  import: exe-options
  main-is: ControlFlow.hs
  hs-source-dirs: examples
  if flag(examples) || flag(examples-sdl)
    buildable: True
    build-depends:
        streamly
      , base >= 4.9 && < 5
      , exceptions >= 0.8 && < 0.11
      , transformers >= 0.4 && < 0.6
      , transformers-base >= 0.4 && < 0.5
  else
    buildable: False

executable HandleIO
  import: exe-options
  main-is: HandleIO.hs
  hs-source-dirs: examples
  if flag(examples) || flag(examples-sdl)
    buildable: True
    build-depends:
        streamly
      , base >= 4.9 && < 5
  else
    buildable: False

executable FileIOExamples
  import: exe-options
  main-is: FileIOExamples.hs
  hs-source-dirs: examples
  if flag(examples) || flag(examples-sdl)
    buildable: True
    build-depends:
        streamly
      , base >= 4.9 && < 5
  else
    buildable: False

executable EchoServer
  import: exe-options
  main-is: EchoServer.hs
  hs-source-dirs: examples
  if (flag(examples) || flag(examples-sdl)) && !impl(ghcjs)
    buildable: True
    build-depends:
        streamly
      , base >= 4.9 && < 5
      , network >= 2.6 && < 4
  else
    buildable: False

executable FileSinkServer
  import: exe-options
  main-is: FileSinkServer.hs
  hs-source-dirs: examples
  if (flag(examples) || flag(examples-sdl)) && !impl(ghcjs)
    buildable: True
    build-depends:
        streamly
      , base >= 4.9 && < 5
      , network >= 2.6 && < 4
  else
    buildable: False

executable FromFileClient
  import: exe-options
  main-is: FromFileClient.hs
  hs-source-dirs: examples
  if (flag(examples) || flag(examples-sdl)) && !impl(ghcjs)
    buildable: True
    build-depends:
        streamly
      , base >= 4.9 && < 5
  else
    buildable: False

executable WordClassifier
  import: exe-options
  main-is: WordClassifier.hs
  hs-source-dirs: examples
  if (flag(examples) || flag(examples-sdl)) && !impl(ghcjs)
    buildable: True
    build-depends:
        streamly
      , base >= 4.9 && < 5
      , hashable >= 1.2 && < 1.4
      , unordered-containers >= 0.2 && < 0.3
  else
    buildable: False

executable WordCount
  import: exe-options
  main-is: WordCount.hs
  hs-source-dirs: examples
  if (flag(examples) || flag(examples-sdl)) && !impl(ghcjs)
    buildable: True
    build-depends:
        streamly
      , base >= 4.9 && < 5
      , vector >= 0.12 && < 0.13
  else
    buildable: False

executable CamelCase
  import: exe-options
  main-is: CamelCase.hs
  hs-source-dirs: examples
  if (flag(examples) || flag(examples-sdl)) && !impl(ghcjs)
    buildable: True
    build-depends:
        streamly
      , base >= 4.9 && < 5
  else
    buildable: False

executable Rate
  import: exe-options
  main-is: Rate.hs
  hs-source-dirs: examples
  if (flag(examples) || flag(examples-sdl)) && !impl(ghcjs)
    buildable: True
    build-depends:
        streamly
      , base >= 4.9 && < 5
  else
    buildable: False

executable Split
  import: exe-options
  main-is: Split.hs
  hs-source-dirs: examples
  if (flag(examples) || flag(examples-sdl)) && !impl(ghcjs)
    buildable: True
    build-depends:
        streamly
      , base >= 4.9 && < 5
      , mtl >= 2.2 && < 3
  else
    buildable: False