Removed FileSystem modules

This commit is contained in:
iko 2024-01-26 19:51:04 +03:00
parent 30c5a56ff9
commit c756f7ffe0
Signed by untrusted user: iko
GPG Key ID: 82C257048D1026F2
9 changed files with 0 additions and 4458 deletions

View File

@ -1,245 +0,0 @@
-- |
-- Module : Streamly.FileSystem.Event
-- Copyright : (c) 2020 Composewell Technologies
-- License : BSD-3-Clause
-- Maintainer : streamly@composewell.com
-- Stability : pre-release
-- Portability : GHC
--
-- File system event notification API portable across Linux, macOS and Windows
-- platforms.
--
-- Note that recursive directory tree watch does not work reliably on Linux
-- (see notes in the Linux module), therefore, recursive watch API is not
-- provided in this module. However, you can use it from the platform specific
-- modules.
--
-- For platform specific APIs please see the following modules:
--
-- * "Streamly.Internal.FileSystem.Event.Darwin"
-- * "Streamly.Internal.FileSystem.Event.Linux"
-- * "Streamly.Internal.FileSystem.Event.Windows"
-- XXX Need to ensure that the signatures of the exported APIs are same on all
-- platforms.
module Streamly.Internal.FileSystem.Event
(
-- * Creating a Watch
watch
-- , watchRecursive
-- * Handling Events
, Event
, getAbsPath
-- ** Item CRUD events
, isCreated
, isDeleted
, isMoved
, isModified
-- ** Exception Conditions
, isEventsLost
-- ** Debugging
, showEvent
)
where
import Data.List.NonEmpty (NonEmpty)
import Data.Word (Word8)
import Streamly.Data.Array (Array)
import Streamly.Data.Stream (Stream)
#if defined(CABAL_OS_DARWIN)
import Streamly.Internal.FileSystem.Event.Darwin (Event)
import qualified Streamly.Internal.FileSystem.Event.Darwin as Event
#elif defined(CABAL_OS_LINUX)
import Streamly.Internal.FileSystem.Event.Linux (Event)
import qualified Streamly.Internal.FileSystem.Event.Linux as Event
#elif defined(CABAL_OS_WINDOWS)
import Streamly.Internal.FileSystem.Event.Windows (Event)
import qualified Streamly.Internal.FileSystem.Event.Windows as Event
#else
#error "FS Events not supported on this platform"
#endif
-- XXX Ensure that defaultConfig is the same for all platforms.
--
-- XXX Ensure that equivalent events are generated on all platforms when same
-- operations are done. Document any differences in behavior on different
-- platforms. Any events that are not common to all platforms should be
-- filtered out.
--
-- XXX Only need to set common event types for macOS/Linux/Windows
-- XXX write a test, that generates all types of events that can possibly
-- occur on each platform. Then create a common test that only checks
-- for the common subset of those.
-------------------------------------------------------------------------------
-- Creating a watch
-------------------------------------------------------------------------------
-- macOS and Windows seem to do a path level watch i.e. they watch the path
-- rather than the underlying inode, possibly they can map the inode back to a
-- path?. On Linux we cannot keep watching a watch root if the inode it is
-- pointing to has moved. However, the inode continues to be watched in this
-- case instead of the path, which is not the case for macOS.
--
-- To make the behavior same for all platforms, for the Linux case we will have
-- to stop watching the inode if it moves from under the path
-- (setUnwatchMoved). And on macOS we can stop watching a path if the inode
-- under it changes i.e. just filter out events for that path.
--
-- Also, to make the behavior of Linux same as macOS we need to set
-- setFollowSymLinks on Linux.
-- XXX Verify all the cases mentioned below using test cases.
-- XXX Make it fail if the watch root is a file. It works on macOS and Linux
-- but does not work on Windows.
-- | Start monitoring a list of directories or symbolic links to directories
-- for file system events. Monitoring starts from the current time onwards.
-- The paths are specified as UTF-8 encoded 'Array' of 'Word8'.
--
-- If a watch root is a symbolic link then the target of the link is watched.
-- Fails if the watched path does not exist. If the user does not have
-- permissions (read and execute?) on the watch root then no events are
-- generated. No events are generated if the watch root itself is renamed or
-- deleted.
--
-- This API watches for changes in the watch root directory only, any changes
-- in the subdirectories of the watch root are not watched. However, on macOS
-- the watch is always recursive, but do not rely on that behavior, it may
-- change without notice in future. If you want to use recursive watch please
-- use platform specific modules.
--
-- /Pre-release/
--
watch :: NonEmpty (Array Word8) -> Stream IO Event
#if defined(CABAL_OS_DARWIN)
watch = Event.watchRecursive
#else
watch = Event.watch
#endif
-- | Like 'watch' except that if a watched path is a directory the whole
-- directory tree under it is watched recursively.
--
-- On Linux 'watchRecursive' may be more expensive than 'watch'.
--
-- /Pre-release/
--
_watchRecursive :: NonEmpty (Array Word8) -> Stream IO Event
_watchRecursive = Event.watchRecursive
-------------------------------------------------------------------------------
-- Handling Events
-------------------------------------------------------------------------------
-- XXX Ensure that on all paltforms the path has same conventions. That is do
-- not have a trailing path separator on one platfrom and not on another.
--
-- XXX We should use getRelPath instead so that the behavior when the root is a
-- symlink becomes platform independent.
--
-- | Get the absolute path of the file system object for which the event is
-- generated. The path is a UTF-8 encoded array of bytes.
--
-- When the watch root is a symlink the behavior is different on different
-- platforms:
--
-- * On Linux and Windows, the absolute path returned is via the original
-- symlink.
--
-- * On macOS the absolute path returned is via the real path of the root after
-- resolving the symlink.
--
-- This API is subject to removal in future, to be replaced by a platform
-- independent @getRelPath@.
--
-- /Pre-release/
--
getAbsPath :: Event -> Array Word8
getAbsPath = Event.getAbsPath
-- | Determine whether the event indicates creation of an object within the
-- monitored path. This event is generated when any file system object is
-- created.
--
-- For hard links the behavior is different on different operating systems. On
-- macOS hard linking does not generate a create event, it generates an
-- 'isInodeAttrsChanged' event on the directory instead (see the Darwin
-- module). On Linux and Windows hard linking generates a create event.
--
-- /Pre-release/
--
isCreated :: Event -> Bool
isCreated = Event.isCreated
-- XXX To make the behavior consistent, can we block the event on watch root on
-- macOS?
--
-- | Determine whether the event indicates deletion of an object within the
-- monitored path. On Linux and Windows hard link deletion generates a delete
-- event.
--
-- On Linux and Windows, this event does not occur when the watch root itself
-- is deleted. On macOS it occurs on deleting the watch root when it is not a
-- symbolic link.
--
-- See also 'isRootDeleted' event for Linux.
--
-- /Pre-release/
--
isDeleted :: Event -> Bool
isDeleted = Event.isDeleted
-- | Determine whether the event indicates rename of an object within the
-- monitored path. This event is generated when an object is renamed within the
-- watched directory or if it is moved out of or in the watched directory.
-- Moving hard links is no different than other types of objects.
--
-- /Pre-release/
--
isMoved :: Event -> Bool
isMoved = Event.isMoved
-- XXX Make the Windows behavior consistent with other platforms by removing
-- the event from directories.
--
-- | Determine whether the event indicates modification of an object within the
-- monitored path. This event is generated on file modification on all
-- platforms.
--
-- On Linux and macOS this event is never generated for directories. On
-- Windows (in recursive watch mode) this event is generated for directories as
-- well when an object is created in or deleted from the directory.
--
-- /Pre-release/
--
isModified :: Event -> Bool
isModified = Event.isModified
-- | An event that indicates that some events before this may have been lost,
-- therefore, we need to take some recovery action.
--
-- /Pre-release/
--
isEventsLost :: Event -> Bool
isEventsLost = Event.isEventsLost
-------------------------------------------------------------------------------
-- Debugging
-------------------------------------------------------------------------------
-- | Convert an 'Event' record to a String representation. Note that the output
-- of this function may be different on different platforms because it may
-- contain platform specific details.
--
-- /Internal/
--
showEvent :: Event -> String
showEvent = Event.showEvent

View File

@ -1,63 +0,0 @@
#include <config.h>
struct watch;
struct pathName {
const UInt8 *pathBytes;
size_t pathLen;
};
int createWatch
( struct pathName* folders
, int n /* number of entries in "folders" */
, UInt32 createFlags
, UInt64 since
, double latency
, int* fd
, void** wp
);
void destroyWatch(struct watch* w);
/******************************************************************************
* Create Flags
*****************************************************************************/
UInt32 FSEventStreamCreateFlagNoDefer ();
UInt32 FSEventStreamCreateFlagWatchRoot ();
UInt32 FSEventStreamCreateFlagFileEvents ();
UInt32 FSEventStreamCreateFlagIgnoreSelf ();
#if HAVE_DECL_KFSEVENTSTREAMCREATEFLAGFULLHISTORY
UInt32 FSEventStreamCreateFlagFullHistory;
#endif
/******************************************************************************
* Event Flags
*****************************************************************************/
UInt32 FSEventStreamEventFlagEventIdsWrapped ();
UInt32 FSEventStreamEventFlagMustScanSubDirs ();
UInt32 FSEventStreamEventFlagKernelDropped ();
UInt32 FSEventStreamEventFlagUserDropped ();
UInt32 FSEventStreamEventFlagHistoryDone ();
UInt32 FSEventStreamEventFlagRootChanged ();
UInt32 FSEventStreamEventFlagMount ();
UInt32 FSEventStreamEventFlagUnmount ();
UInt32 FSEventStreamEventFlagItemChangeOwner ();
UInt32 FSEventStreamEventFlagItemInodeMetaMod ();
UInt32 FSEventStreamEventFlagItemFinderInfoMod ();
UInt32 FSEventStreamEventFlagItemXattrMod ();
UInt32 FSEventStreamEventFlagItemCreated ();
UInt32 FSEventStreamEventFlagItemRemoved ();
UInt32 FSEventStreamEventFlagItemRenamed ();
UInt32 FSEventStreamEventFlagItemModified ();
#if HAVE_DECL_KFSEVENTSTREAMEVENTFLAGITEMCLONED
UInt32 FSEventStreamEventFlagItemCloned ();
#endif
UInt32 FSEventStreamEventFlagItemIsDir ();
UInt32 FSEventStreamEventFlagItemIsFile ();
UInt32 FSEventStreamEventFlagItemIsSymlink ();
#if HAVE_DECL_KFSEVENTSTREAMEVENTFLAGITEMISHARDLINK
UInt32 FSEventStreamEventFlagItemIsHardlink ();
#endif
UInt32 FSEventStreamEventFlagItemIsLastHardlink ();

File diff suppressed because it is too large Load Diff

View File

@ -1,294 +0,0 @@
/*
* Code adapted from the Haskell "hfsevents" package.
*
* Copyright (c) 2012, Luite Stegeman
*
*/
#include <config.h>
#if HAVE_DECL_KFSEVENTSTREAMCREATEFLAGFILEEVENTS
#include <CoreServices/CoreServices.h>
#include <pthread.h>
#include <unistd.h>
#include <FileSystem/Event/Darwin.h>
/*
* For reference documentaion see:
* https://developer.apple.com/documentation/coreservices/file_system_events?language=objc
* https://developer.apple.com/library/archive/documentation/Darwin/Conceptual/FSEvents_ProgGuide/UsingtheFSEventsFramework/UsingtheFSEventsFramework.html
*
* An OS thread is started which runs the event loop. A pipe is created
* and the events are sent to the pipes. The receiver can read the pipe
* output end to get the events.
*/
/******************************************************************************
* Create Flags
*****************************************************************************/
UInt32 FSEventStreamCreateFlagNoDefer () {
return kFSEventStreamCreateFlagNoDefer;
}
UInt32 FSEventStreamCreateFlagWatchRoot () {
return kFSEventStreamCreateFlagWatchRoot;
}
UInt32 FSEventStreamCreateFlagFileEvents () {
return kFSEventStreamCreateFlagFileEvents;
}
UInt32 FSEventStreamCreateFlagIgnoreSelf () {
return kFSEventStreamCreateFlagIgnoreSelf;
}
#if 0
UInt32 FSEventStreamCreateFlagFullHistory = kFSEventStreamCreateFlagFullHistory;
#endif
/******************************************************************************
* Event Flags
*****************************************************************************/
UInt32 FSEventStreamEventFlagEventIdsWrapped () {
return kFSEventStreamEventFlagEventIdsWrapped;
}
UInt32 FSEventStreamEventFlagMustScanSubDirs () {
return kFSEventStreamEventFlagMustScanSubDirs;
}
UInt32 FSEventStreamEventFlagKernelDropped () {
return kFSEventStreamEventFlagKernelDropped;
}
UInt32 FSEventStreamEventFlagUserDropped () {
return kFSEventStreamEventFlagUserDropped;
}
UInt32 FSEventStreamEventFlagHistoryDone () {
return kFSEventStreamEventFlagHistoryDone;
}
UInt32 FSEventStreamEventFlagRootChanged () {
return kFSEventStreamEventFlagRootChanged;
}
UInt32 FSEventStreamEventFlagMount () {
return kFSEventStreamEventFlagMount;
}
UInt32 FSEventStreamEventFlagUnmount () {
return kFSEventStreamEventFlagUnmount;
}
UInt32 FSEventStreamEventFlagItemChangeOwner () {
return kFSEventStreamEventFlagItemChangeOwner;
}
UInt32 FSEventStreamEventFlagItemInodeMetaMod () {
return kFSEventStreamEventFlagItemInodeMetaMod;
}
UInt32 FSEventStreamEventFlagItemFinderInfoMod () {
return kFSEventStreamEventFlagItemFinderInfoMod;
}
UInt32 FSEventStreamEventFlagItemXattrMod () {
return kFSEventStreamEventFlagItemXattrMod;
}
UInt32 FSEventStreamEventFlagItemCreated () {
return kFSEventStreamEventFlagItemCreated;
}
UInt32 FSEventStreamEventFlagItemRemoved () {
return kFSEventStreamEventFlagItemRemoved;
}
UInt32 FSEventStreamEventFlagItemRenamed () {
return kFSEventStreamEventFlagItemRenamed;
}
UInt32 FSEventStreamEventFlagItemModified () {
return kFSEventStreamEventFlagItemModified;
}
#if __MAC_OS_X_VERSION_MIN_REQUIRED >= 101300
UInt32 FSEventStreamEventFlagItemCloned () {
return kFSEventStreamEventFlagItemCloned;
}
#endif
UInt32 FSEventStreamEventFlagItemIsDir () {
return kFSEventStreamEventFlagItemIsDir;
}
UInt32 FSEventStreamEventFlagItemIsFile () {
return kFSEventStreamEventFlagItemIsFile;
}
UInt32 FSEventStreamEventFlagItemIsSymlink () {
return kFSEventStreamEventFlagItemIsSymlink;
}
#if __MAC_OS_X_VERSION_MIN_REQUIRED >= 101000
UInt32 FSEventStreamEventFlagItemIsHardlink () {
return kFSEventStreamEventFlagItemIsHardlink;
}
#endif
UInt32 FSEventStreamEventFlagItemIsLastHardlink () {
return kFSEventStreamEventFlagItemIsLastHardlink;
}
/******************************************************************************
* Event watch
*****************************************************************************/
/* Write an event to the pipe input fd */
static void writeEvent(int fd, UInt64 eventId, UInt64 eventFlags, char* path)
{
UInt64 buf[3];
buf[0] = eventId;
buf[1] = eventFlags;
/* XXX Is the path string in UTF-8? */
buf[2] = (UInt64)strlen(path);
write(fd, buf, 3 * sizeof(UInt64));
write(fd, path, strlen(path));
}
/* thread state */
struct watch
{
FSEventStreamRef eventStream;
CFRunLoopRef runLoop;
int writefd;
pthread_mutex_t mut;
};
/* Just writes the event to the pipe input fd */
static void watchCallback
( ConstFSEventStreamRef streamRef
, void *clientCallBackInfo
, size_t n
, void *eventPaths
, const FSEventStreamEventFlags eventFlags[]
, const FSEventStreamEventId eventIds[]
)
{
int i;
struct watch *w = clientCallBackInfo;
char **paths = eventPaths;
for (i = 0; i < n; i++) {
writeEvent(w->writefd, eventIds[i], eventFlags[i], paths[i]);
}
}
/******************************************************************************
* Start a watch event loop
*****************************************************************************/
/* Event loop run in a pthread */
static void *watchRunLoop(void *vw)
{
struct watch* w = (struct watch*) vw;
CFRunLoopRef rl = CFRunLoopGetCurrent();
CFRetain(rl);
w->runLoop = rl;
FSEventStreamScheduleWithRunLoop(w->eventStream, rl, kCFRunLoopDefaultMode);
FSEventStreamStart(w->eventStream);
pthread_mutex_unlock(&w->mut);
CFRunLoopRun();
pthread_exit(NULL);
}
#define MAX_WATCH_PATHS 4096
int createWatch
( struct pathName* folders
, int n /* number of entries in folders */
, UInt32 createFlags
, UInt64 since
, double latency
, int* fd
, void** wp
)
{
if (n > MAX_WATCH_PATHS) {
return -1;
}
int pfds[2];
if (pipe (pfds)) {
return -1;
}
/*
* XXX We can possibly use since == 0 to get all events since
* beginning of time
*/
if (!since) {
since = kFSEventStreamEventIdSinceNow;
}
/* Setup paths array */
CFStringRef *cffolders = malloc(n * sizeof(CFStringRef));
int i;
for(i = 0; i < n; i++) {
cffolders[i] = CFStringCreateWithBytes
( NULL
, folders[i].pathBytes
, folders[i].pathLen
, kCFStringEncodingUTF8
, false
);
}
CFArrayRef paths = CFArrayCreate(NULL, (const void **)cffolders, n, NULL);
/* Setup context */
struct watch *w = malloc(sizeof(struct watch));
FSEventStreamContext ctx;
ctx.version = 0;
ctx.info = (void*)w;
ctx.retain = NULL;
ctx.release = NULL;
ctx.copyDescription = NULL;
/* Create watch using paths and context*/
FSEventStreamRef es = FSEventStreamCreate
(NULL, &watchCallback, &ctx, paths, since, latency, createFlags);
/* Run the event loop in a pthread */
int retval;
if(es != NULL) {
/* Success */
w->writefd = pfds[1];
w->eventStream = es;
w->runLoop = NULL;
/* Lock to prevent race against watch destroy */
pthread_mutex_init(&w->mut, NULL);
pthread_mutex_lock(&w->mut);
pthread_t t;
pthread_create(&t, NULL, &watchRunLoop, (void*)w);
/* return the out fd and the watch struct */
*fd = pfds[0];
*wp = w;
retval = 0;
} else {
/* Failure */
close(pfds[0]);
close(pfds[1]);
free(w);
retval = -1;
}
/* Cleanup */
for (i = 0; i < n; i++) {
CFRelease (cffolders[i]);
}
free(cffolders);
CFRelease(paths);
return retval;
}
/******************************************************************************
* Stop a watch event loop
*****************************************************************************/
void destroyWatch(struct watch* w) {
/* Stop the loop so the thread will exit */
pthread_mutex_lock(&w->mut);
FSEventStreamStop(w->eventStream);
FSEventStreamInvalidate(w->eventStream);
CFRunLoopStop(w->runLoop);
CFRelease(w->runLoop);
FSEventStreamRelease(w->eventStream);
close(w->writefd);
pthread_mutex_unlock(&w->mut);
/* Cleanup */
pthread_mutex_destroy(&w->mut);
free(w);
}
#endif

File diff suppressed because it is too large Load Diff

View File

@ -1,640 +0,0 @@
-- Some code snippets are adapted from the fsnotify package.
-- http://hackage.haskell.org/package/fsnotify-0.3.0.1/
--
-- |
-- Module : Streamly.Internal.FileSystem.Event.Windows
-- Copyright : (c) 2020 Composewell Technologies
-- (c) 2012, Mark Dittmer
-- License : BSD-3-Clause
-- Maintainer : streamly@composewell.com
-- Stability : pre-release
-- Portability : GHC
--
-- =Overview
--
-- Use 'watchRecursive'or 'watch' with a list of file system dir paths you
-- want to watch as argument. It returns a stream of 'Event' representing the
-- file system events occurring under the watched paths.
--
-- @
-- {-\# LANGUAGE MagicHash #-}
-- Stream.mapM_ (putStrLn . 'showEvent') $ 'watchRecursive' [Array.fromList "path"]
-- @
--
-- 'Event' is an opaque type. Accessor functions (e.g. 'showEvent' above)
-- provided in this module are used to determine the attributes of the event.
--
-- =Design notes
--
-- For Windows reference documentation see:
--
-- * <https://docs.microsoft.com/en-us/windows/win32/api/winnt/ns-winnt-file_notify_information file notify information>
-- * <https://docs.microsoft.com/en-us/windows/win32/api/winbase/nf-winbase-readdirectorychangesw read directory changes>
--
-- We try to keep the macOS\/Linux/Windows event handling APIs and defaults
-- semantically and syntactically as close as possible.
--
-- =Availability
--
-- As per the Windows reference docs, the fs event notification API is
-- available in:
--
-- * Minimum supported client: Windows XP [desktop apps | UWP apps]
-- * Minimum supported server: Windows Server 2003 [desktop apps | UWP apps
module Streamly.Internal.FileSystem.Event.Windows
(
-- * Subscribing to events
-- ** Default configuration
Config
, defaultConfig
-- ** Watch Behavior
, setRecursiveMode
-- ** Events of Interest
, setFileNameEvents
, setDirNameEvents
, setAttrsModified
, setSecurityModified
, setSizeModified
, setLastWriteTimeModified
, setAllEvents
-- ** Watch APIs
, watch
, watchRecursive
, watchWith
-- * Handling Events
, Event(..)
, getRoot
, getRelPath
, getAbsPath
-- ** Item CRUD events
, isCreated
, isDeleted
, isModified
, isMoved
, isMovedFrom
, isMovedTo
-- ** Exception Conditions
, isEventsLost
-- * Debugging
, showEvent
)
where
import Data.Bits ((.|.), (.&.), complement)
import Data.Char (ord)
import Data.Functor.Identity (runIdentity)
import Data.List.NonEmpty (NonEmpty)
import Data.Word (Word8)
import Foreign.C.String (peekCWStringLen)
import Foreign.Marshal.Alloc (alloca, allocaBytes)
import Foreign.Storable (peekByteOff)
import Foreign.Ptr (Ptr, FunPtr, castPtr, nullPtr, nullFunPtr, plusPtr)
import System.Win32.File
( FileNotificationFlag
, LPOVERLAPPED
, closeHandle
, createFile
, fILE_FLAG_BACKUP_SEMANTICS
, fILE_LIST_DIRECTORY
, fILE_NOTIFY_CHANGE_FILE_NAME
, fILE_NOTIFY_CHANGE_DIR_NAME
, fILE_NOTIFY_CHANGE_ATTRIBUTES
, fILE_NOTIFY_CHANGE_SIZE
, fILE_NOTIFY_CHANGE_LAST_WRITE
, fILE_NOTIFY_CHANGE_SECURITY
, fILE_SHARE_READ
, fILE_SHARE_WRITE
, oPEN_EXISTING
)
import System.Win32.Types (BOOL, DWORD, HANDLE, LPVOID, LPDWORD, failIfFalse_)
import Streamly.Data.Array (Array)
import Streamly.Data.Stream (Stream)
import Streamly.Data.Stream.Prelude (eager)
import qualified Data.List.NonEmpty as NonEmpty
import qualified Streamly.Data.Array as A (fromList)
import qualified Streamly.Data.Fold as Fold
import qualified Streamly.Data.Stream as S
import qualified Streamly.Data.Stream.Prelude as S
import qualified Streamly.Unicode.Stream as U
import qualified Streamly.Internal.Unicode.Utf8 as UTF8 (pack, toArray)
import qualified Streamly.Internal.Data.Array as A (read)
-- | Watch configuration, used to specify the events of interest and the
-- behavior of the watch.
--
-- /Pre-release/
--
data Config = Config
{ watchRec :: BOOL
, createFlags :: DWORD
}
-------------------------------------------------------------------------------
-- Boolean settings
-------------------------------------------------------------------------------
setFlag :: DWORD -> Bool -> Config -> Config
setFlag mask status cfg@Config{..} =
let flags =
if status
then createFlags .|. mask
else createFlags .&. complement mask
in cfg {createFlags = flags}
-- | Set watch event on directory recursively.
--
-- /default: False/
--
-- /Pre-release/
--
setRecursiveMode :: Bool -> Config -> Config
setRecursiveMode recursive cfg@Config{} = cfg {watchRec = recursive}
-- | Generate notify events on file create, rename or delete.
--
-- From Windows API documentation: Any file name change in the watched
-- directory or subtree causes a change notification wait operation to return.
-- Changes include renaming, creating, or deleting a file.
--
-- /default: True/
--
-- /Pre-release/
--
setFileNameEvents :: Bool -> Config -> Config
setFileNameEvents = setFlag fILE_NOTIFY_CHANGE_FILE_NAME
-- | Generate notify events on directory create, rename or delete.
--
-- From Windows API documentaiton: Any directory-name change in the watched
-- directory or subtree causes a change notification wait operation to return.
-- Changes include creating or deleting a directory.
--
-- /default: True/
--
-- /Pre-release/
--
setDirNameEvents :: Bool -> Config -> Config
setDirNameEvents = setFlag fILE_NOTIFY_CHANGE_DIR_NAME
-- | Generate an 'isModified' event on any attribute change in the watched
-- directory or subtree.
--
-- /default: False/
--
-- /Pre-release/
--
setAttrsModified :: Bool -> Config -> Config
setAttrsModified = setFlag fILE_NOTIFY_CHANGE_ATTRIBUTES
-- | Generate an 'isModified' event when the file size is changed.
--
-- From Windows API documentation: Any file-size change in the watched
-- directory or subtree causes a change notification wait operation to return.
-- The operating system detects a change in file size only when the file is
-- written to the disk. For operating systems that use extensive caching,
-- detection occurs only when the cache is sufficiently flushed.
--
-- /default: False/
--
-- /Pre-release/
--
setSizeModified :: Bool -> Config -> Config
setSizeModified = setFlag fILE_NOTIFY_CHANGE_SIZE
-- | Generate an 'isModified' event when the last write timestamp of the file
-- inode is changed.
--
-- From Windows API documentation: Any change to the last write-time of files
-- in the watched directory or subtree causes a change notification wait
-- operation to return. The operating system detects a change to the last
-- write-time only when the file is written to the disk. For operating systems
-- that use extensive caching, detection occurs only when the cache is
-- sufficiently flushed.
--
-- /default: False/
--
-- /Pre-release/
--
setLastWriteTimeModified :: Bool -> Config -> Config
setLastWriteTimeModified = setFlag fILE_NOTIFY_CHANGE_LAST_WRITE
-- | Generate an 'isModified' event when any security-descriptor change occurs
-- in the watched directory or subtree.
--
-- /default: False/
--
-- /Pre-release/
--
setSecurityModified :: Bool -> Config -> Config
setSecurityModified = setFlag fILE_NOTIFY_CHANGE_SECURITY
-- | Set all tunable events 'True' or 'False'. Equivalent to setting:
--
-- * setFileNameEvents
-- * setDirNameEvents
-- * setAttrsModified
-- * setSizeModified
-- * setLastWriteTimeModified
-- * setSecurityModified
--
-- /Pre-release/
--
setAllEvents :: Bool -> Config -> Config
setAllEvents s =
setFileNameEvents s
. setDirNameEvents s
. setAttrsModified s
. setSizeModified s
. setLastWriteTimeModified s
. setSecurityModified s
-- | The tunable events that are enabled by default are:
--
-- * setFileNameEvents True
-- * setDirNameEvents True
-- * setSizeModified True
-- * setLastWriteTimeModified True
--
-- /Pre-release/
--
defaultConfig :: Config
defaultConfig =
setFileNameEvents True
$ setDirNameEvents True
$ setSizeModified True
$ setLastWriteTimeModified True
$ Config {watchRec = False, createFlags = 0}
getConfigFlag :: Config -> DWORD
getConfigFlag Config{..} = createFlags
getConfigRecMode :: Config -> BOOL
getConfigRecMode Config{..} = watchRec
data Event = Event
{ eventFlags :: DWORD
, eventRelPath :: String
, eventRootPath :: String
, totalBytes :: DWORD
} deriving (Show, Ord, Eq)
-- For reference documentation see:
--
-- See https://docs.microsoft.com/en-us/windows/win32/api/winnt/ns-winnt-file_notify_information
data FILE_NOTIFY_INFORMATION = FILE_NOTIFY_INFORMATION
{ fniNextEntryOffset :: DWORD
, fniAction :: DWORD
, fniFileName :: String
} deriving Show
type LPOVERLAPPED_COMPLETION_ROUTINE =
FunPtr ((DWORD, DWORD, LPOVERLAPPED) -> IO ())
-- | A handle for a watch.
getWatchHandle :: FilePath -> IO (HANDLE, FilePath)
getWatchHandle dir = do
h <- createFile dir
-- Access mode
fILE_LIST_DIRECTORY
-- Share mode
(fILE_SHARE_READ .|. fILE_SHARE_WRITE)
-- Security attributes
Nothing
-- Create mode, we want to look at an existing directory
oPEN_EXISTING
-- File attribute, NOT using OVERLAPPED since we work synchronously
fILE_FLAG_BACKUP_SEMANTICS
-- No template file
Nothing
return (h, dir)
-- For reference documentation see:
--
-- See https://docs.microsoft.com/en-us/windows/win32/api/winbase/nf-winbase-readdirectorychangesw
-- Note that this API uses UTF-16 for file system paths:
-- 1. https://docs.microsoft.com/en-us/windows/win32/intl/unicode-in-the-windows-api
-- 2. https://docs.microsoft.com/en-us/windows/win32/intl/unicode
foreign import ccall safe
"windows.h ReadDirectoryChangesW" c_ReadDirectoryChangesW ::
HANDLE
-> LPVOID
-> DWORD
-> BOOL
-> DWORD
-> LPDWORD
-> LPOVERLAPPED
-> LPOVERLAPPED_COMPLETION_ROUTINE
-> IO BOOL
readDirectoryChangesW ::
HANDLE
-> Ptr FILE_NOTIFY_INFORMATION
-> DWORD
-> BOOL
-> FileNotificationFlag
-> LPDWORD
-> IO ()
readDirectoryChangesW h buf bufSize wst f br =
let res = c_ReadDirectoryChangesW
h (castPtr buf) bufSize wst f br nullPtr nullFunPtr
in failIfFalse_ "ReadDirectoryChangesW" res
peekFNI :: Ptr FILE_NOTIFY_INFORMATION -> IO FILE_NOTIFY_INFORMATION
peekFNI buf = do
neof <- peekByteOff buf 0
acti <- peekByteOff buf 4
fnle <- peekByteOff buf 8
-- Note: The path is UTF-16 encoded C WChars, peekCWStringLen converts
-- UTF-16 to UTF-32 Char String
fnam <- peekCWStringLen
-- start of array
(buf `plusPtr` 12,
-- fnle is the length in *bytes*, and a WCHAR is 2 bytes
fromEnum (fnle :: DWORD) `div` 2)
return $ FILE_NOTIFY_INFORMATION neof acti fnam
readChangeEvents ::
Ptr FILE_NOTIFY_INFORMATION -> String -> DWORD -> IO [Event]
readChangeEvents pfni root bytesRet = do
fni <- peekFNI pfni
let entry = Event
{ eventFlags = fniAction fni
, eventRelPath = fniFileName fni
, eventRootPath = root
, totalBytes = bytesRet
}
nioff = fromEnum $ fniNextEntryOffset fni
entries <-
if nioff == 0
then return []
else readChangeEvents (pfni `plusPtr` nioff) root bytesRet
return $ entry : entries
readDirectoryChanges ::
String -> HANDLE -> Bool -> FileNotificationFlag -> IO [Event]
readDirectoryChanges root h wst mask = do
let maxBuf = 63 * 1024
allocaBytes maxBuf $ \buffer -> do
alloca $ \bret -> do
readDirectoryChangesW h buffer (toEnum maxBuf) wst mask bret
bytesRet <- peekByteOff bret 0
readChangeEvents buffer root bytesRet
-- XXX Try to get these from windows header files
type FileAction = DWORD
fILE_ACTION_ADDED :: FileAction
fILE_ACTION_ADDED = 1
fILE_ACTION_REMOVED :: FileAction
fILE_ACTION_REMOVED = 2
fILE_ACTION_MODIFIED :: FileAction
fILE_ACTION_MODIFIED = 3
fILE_ACTION_RENAMED_OLD_NAME :: FileAction
fILE_ACTION_RENAMED_OLD_NAME = 4
fILE_ACTION_RENAMED_NEW_NAME :: FileAction
fILE_ACTION_RENAMED_NEW_NAME = 5
eventStreamAggr :: (HANDLE, FilePath, Config) -> Stream IO Event
eventStreamAggr (handle, rootPath, cfg) = do
let recMode = getConfigRecMode cfg
flagMasks = getConfigFlag cfg
repeatM = S.sequence . S.repeat
S.concatMap S.fromList $ repeatM
$ readDirectoryChanges rootPath handle recMode flagMasks
pathsToHandles ::
NonEmpty FilePath -> Config -> Stream IO (HANDLE, FilePath, Config)
pathsToHandles paths cfg = do
let pathStream = S.fromList (NonEmpty.toList paths)
st2 = S.mapM getWatchHandle pathStream
fmap (\(h, f) -> (h, f, cfg)) st2
-------------------------------------------------------------------------------
-- Utilities
-------------------------------------------------------------------------------
utf8ToString :: Array Word8 -> FilePath
utf8ToString = runIdentity . S.fold Fold.toList . U.decodeUtf8 . A.read
utf8ToStringList :: NonEmpty (Array Word8) -> NonEmpty FilePath
utf8ToStringList = NonEmpty.map utf8ToString
-- | Close a Directory handle.
--
closePathHandleStream :: Stream IO (HANDLE, FilePath, Config) -> IO ()
closePathHandleStream =
let f (h, _, _) = closeHandle h
in S.fold (Fold.drainMapM f)
-- XXX
-- Document the path treatment for Linux/Windows/macOS modules.
-- Remove the utf-8 encoding requirement of paths? It can be encoding agnostic
-- "\" separated bytes, the application can decide what encoding to use.
-- Instead of always using widechar (-W) APIs can we call the underlying APIs
-- based on the configured code page?
-- https://docs.microsoft.com/en-us/windows/uwp/design/globalizing/use-utf8-code-page
--
-- | Start monitoring a list of directory paths for file system events with the
-- supplied configuration modifier operation over the' defaultConfig'. The
-- paths could be directories or symbolic links to directories.
--
-- When recursive mode is True, the whole directory tree under the path is
-- watched recursively. When recursive mode is False, only the files and
-- directories directly under the watched directory are monitored, contents of
-- subdirectories are not monitored. Monitoring starts from the current time
-- onwards. The paths are specified as UTF-8 encoded 'Array' of 'Word8'.
--
-- /Symbolic Links:/ If the pathname to be watched is a symbolic link then
-- watch the target of the symbolic link instead of the symbolic link itself.
-- Note that the path location in the events is through the original symbolic
-- link path rather than the resolved path.
--
-- @
-- watchWith
-- ('setAttrsModified' True . 'setLastWriteTimeModified' False)
-- [Array.fromList "dir"]
-- @
--
-- /Pre-release/
--
watchWith :: (Config -> Config) -> NonEmpty (Array Word8) -> Stream IO Event
watchWith f paths =
S.bracketIO before after (S.parConcatMap (eager True) eventStreamAggr)
where
before = return $ pathsToHandles (utf8ToStringList paths) $ f defaultConfig
after = closePathHandleStream
-- | Same as 'watchWith' using 'defaultConfig' and recursive mode.
--
-- >>> watchRecursive = watchWith (setRecursiveMode True)
--
-- /Pre-release/
--
watchRecursive :: NonEmpty (Array Word8) -> Stream IO Event
watchRecursive = watchWith (setRecursiveMode True)
-- | Same as 'watchWith' using defaultConfig and non-recursive mode.
--
-- >>> watch = watchWith id
--
-- /Pre-release/
--
watch :: NonEmpty (Array Word8) -> Stream IO Event
watch = watchWith id
getFlag :: DWORD -> Event -> Bool
getFlag mask Event{..} = eventFlags == mask
-- XXX Change the type to Array Word8 to make it compatible with other APIs.
--
-- | Get the file system object path for which the event is generated, relative
-- to the watched root. The path is a UTF-8 encoded array of bytes.
--
-- /Pre-release/
--
getRelPath :: Event -> Array Word8
getRelPath Event{..} = (UTF8.toArray . UTF8.pack) eventRelPath
-- XXX Change the type to Array Word8 to make it compatible with other APIs.
--
-- | Get the watch root directory to which this event belongs.
--
-- /Pre-release/
--
getRoot :: Event -> Array Word8
getRoot Event{..} = (UTF8.toArray . UTF8.pack) eventRootPath
-- | Get the absolute file system object path for which the event is generated.
--
-- When the watch root is a symlink, the absolute path returned is via the
-- original symlink and not through the resolved path.
--
-- /Pre-release/
--
getAbsPath :: Event -> Array Word8
getAbsPath ev = getRoot ev <> backSlash <> getRelPath ev
where backSlash = A.fromList [ fromIntegral (ord '\\') ]
-- XXX need to document the exact semantics of these.
--
-- | This event is generated when a file or directory is created in a watched
-- directory or directory tree when in recursive watch mode. Creating a hard
-- link also generates this event.
--
-- /Occurs when either 'setFileNameEvents' or 'setDirNameEvents' is enabled/
--
-- /Pre-release/
--
isCreated :: Event -> Bool
isCreated = getFlag fILE_ACTION_ADDED
-- | This event is generated when a file or directory is deleted from the
-- watched directory or directory tree when in recursive mode. This event is
-- generated even when a hard link is deleted.
--
-- /Occurs when either 'setFileNameEvents' or 'setDirNameEvents' is enabled/
--
-- /Pre-release/
--
isDeleted :: Event -> Bool
isDeleted = getFlag fILE_ACTION_REMOVED
-- | Generated for the original path when an object is moved from under a
-- monitored directory.
--
-- /Occurs when either 'setFileNameEvents' or 'setDirNameEvents' is enabled/
--
-- /Pre-release/
--
isMovedFrom :: Event -> Bool
isMovedFrom = getFlag fILE_ACTION_RENAMED_OLD_NAME
-- | Generated for the new path when an object is moved under a monitored
-- directory.
--
-- /Occurs when either 'setFileNameEvents' or 'setDirNameEvents' is enabled/
--
-- /Pre-release/
--
isMovedTo :: Event -> Bool
isMovedTo = getFlag fILE_ACTION_RENAMED_NEW_NAME
-- | Generated for a path that is moved from or moved to the monitored
-- directory.
--
-- >>> isMoved ev = isMovedFrom ev || isMovedTo ev
--
-- /Occurs when either 'setFileNameEvents' or 'setDirNameEvents' is enabled/
-- /Occurs only for an object inside the watched directory/
--
-- /Pre-release/
--
isMoved :: Event -> Bool
isMoved ev = isMovedFrom ev || isMovedTo ev
-- XXX This event is generated only for files and not directories?
--
-- | This event occurs when a file or directory contents, timestamps or
-- attributes are modified. Since it can occur on multiple changes, you may
-- have to check the attributes to know what exactly changed when multiple type
-- of modified events are enabled.
--
-- In non-recursive mode this event does not occur for directories. In
-- recursive mode this event occurs for the parent directory if a file or
-- directory inside it is created or renamed.
--
-- /Occurs when one of the @set*Modified@ events is enabled/
--
-- /Pre-release/
--
isModified :: Event -> Bool
isModified = getFlag fILE_ACTION_MODIFIED
-- | If the kernel event buffer overflows, entire contents of the buffer are
-- discarded, therefore, events are lost. The user application must scan
-- everything under the watched paths to know the current state of the file
-- system tree.
--
-- /Pre-release/
--
isEventsLost :: Event -> Bool
isEventsLost Event{..} = totalBytes == 0
-------------------------------------------------------------------------------
-- Debugging
-------------------------------------------------------------------------------
-- | Convert an 'Event' record to a String representation.
showEvent :: Event -> String
showEvent ev@Event{..} =
"--------------------------"
++ "\nRoot = " ++ utf8ToString (getRoot ev)
++ "\nPath = " ++ utf8ToString (getRelPath ev)
++ "\ngetAbsPath = " ++ utf8ToString (getAbsPath ev)
++ "\nFlags " ++ show eventFlags
++ showev isEventsLost "Overflow"
++ showev isCreated "Created"
++ showev isDeleted "Deleted"
++ showev isModified "Modified"
++ showev isMovedFrom "MovedFrom"
++ showev isMovedTo "MovedTo"
++ "\n"
where showev f str = if f ev then "\n" ++ str else ""

View File

@ -1,585 +0,0 @@
#include "inline.hs"
-- |
-- Module : Streamly.Internal.FileSystem.FD
-- Copyright : (c) 2019 Composewell Technologies
--
-- License : BSD3
-- Maintainer : streamly@composewell.com
-- Stability : experimental
-- Portability : GHC
--
-- This module is a an experimental replacement for
-- "Streamly.FileSystem.Handle". The former module provides IO facilities based
-- on the GHC Handle type. The APIs in this module avoid the GHC handle layer
-- and provide more explicit control over buffering.
--
-- Read and write data as streams and arrays to and from files.
--
-- This module provides read and write APIs based on handles. Before reading or
-- writing, a file must be opened first using 'openFile'. The 'Handle' returned
-- by 'openFile' is then used to access the file. A 'Handle' is backed by an
-- operating system file descriptor. When the 'Handle' is garbage collected the
-- underlying file descriptor is automatically closed. A handle can be
-- explicitly closed using 'closeFile'.
--
-- Reading and writing APIs are divided into two categories, sequential
-- streaming APIs and random or seekable access APIs. File IO APIs are quite
-- similar to "Streamly.Data.Array.Foreign" read write APIs. In that regard, arrays can
-- be considered as in-memory files or files can be considered as on-disk
-- arrays.
--
-- > import qualified Streamly.Internal.FileSystem.FD as FD
--
module Streamly.Internal.FileSystem.FD
(
-- * File Handles
Handle
, stdin
, stdout
, stderr
, openFile
-- TODO file path based APIs
-- , readFile
-- , writeFile
-- * Streaming IO
-- | Streaming APIs read or write data to or from a file or device
-- sequentially, they never perform a seek to a random location. When
-- reading, the stream is lazy and generated on-demand as the consumer
-- consumes it. Read IO requests to the IO device are performed in chunks
-- of 32KiB, this is referred to as @defaultChunkSize@ in the
-- documentation. One IO request may or may not read the full chunk. If the
-- whole stream is not consumed, it is possible that we may read slightly
-- more from the IO device than what the consumer needed. Unless specified
-- otherwise in the API, writes are collected into chunks of
-- @defaultChunkSize@ before they are written to the IO device.
-- Streaming APIs work for all kind of devices, seekable or non-seekable;
-- including disks, files, memory devices, terminals, pipes, sockets and
-- fifos. While random access APIs work only for files or devices that have
-- random access or seek capability for example disks, memory devices.
-- Devices like terminals, pipes, sockets and fifos do not have random
-- access capability.
-- ** Read File to Stream
, read
-- , readUtf8
-- , readLines
-- , readFrames
, readInChunksOf
-- -- * Array Read
-- , readArrayUpto
-- , readArrayOf
, readArrays
, readArraysOfUpto
-- , readArraysOf
-- ** Write File from Stream
, write
-- , writeUtf8
-- , writeUtf8Lines
-- , writeFrames
, writeInChunksOf
-- -- * Array Write
-- , writeArray
, writeArrays
, writeArraysPackedUpto
-- XXX these are incomplete
-- , writev
-- , writevArraysPackedUpto
-- -- * Random Access (Seek)
-- -- | Unlike the streaming APIs listed above, these APIs apply to devices or
-- files that have random access or seek capability. This type of devices
-- include disks, files, memory devices and exclude terminals, pipes,
-- sockets and fifos.
--
-- , readIndex
-- , readSlice
-- , readSliceRev
-- , readAt -- read from a given position to th end of file
-- , readSliceArrayUpto
-- , readSliceArrayOf
-- , writeIndex
-- , writeSlice
-- , writeSliceRev
-- , writeAt -- start writing at the given position
-- , writeSliceArray
)
where
import Control.Monad.IO.Class (MonadIO(..))
import Data.Word (Word8)
-- import Foreign.ForeignPtr.Unsafe (unsafeForeignPtrToPtr)
import Foreign.Ptr (castPtr)
-- import System.IO (Handle, hGetBufSome, hPutBuf)
import System.IO (IOMode)
import Prelude hiding (read)
import qualified GHC.IO.FD as FD
import qualified GHC.IO.Device as RawIO
import Streamly.Data.Array (Array, Unbox)
import Streamly.Data.Stream (Stream)
import Streamly.Internal.Data.Array (byteLength, unsafeFreeze, unsafePinnedAsPtr)
import Streamly.Internal.System.IO (defaultChunkSize)
#if !defined(mingw32_HOST_OS)
{-
import Streamly.Internal.Data.Stream.IsStream.Type (toStreamD)
import Streamly.Internal.System.IOVec (groupIOVecsOf)
import qualified Streamly.Internal.FileSystem.FDIO as RawIO hiding (write)
import qualified Streamly.Internal.System.IOVec.Type as RawIO
-}
#endif
-- import Streamly.Data.Fold (Fold)
-- import Streamly.String (encodeUtf8, decodeUtf8, foldLines)
import qualified Streamly.Data.Array as A
import qualified Streamly.Data.Fold as FL
import qualified Streamly.Internal.Data.MutArray as MArray
(MutArray(..), unsafePinnedAsPtr, pinnedNewBytes)
import qualified Streamly.Internal.Data.Array.Stream as AS
import qualified Streamly.Internal.Data.Stream as S
import qualified Streamly.Internal.Data.Stream as D
(Stream(..), Step(..))
import qualified Streamly.Internal.Data.StreamK as K (mkStream)
-------------------------------------------------------------------------------
-- References
-------------------------------------------------------------------------------
--
-- The following references may be useful to build an understanding about the
-- file API design:
--
-- http://www.linux-mag.com/id/308/ for blocking/non-blocking IO on linux.
-- https://lwn.net/Articles/612483/ Non-blocking buffered file read operations
-- https://en.wikipedia.org/wiki/C_file_input/output for C APIs.
-- https://docs.oracle.com/javase/tutorial/essential/io/file.html for Java API.
-- https://www.w3.org/TR/FileAPI/ for http file API.
-------------------------------------------------------------------------------
-- Handles
-------------------------------------------------------------------------------
-- XXX attach a finalizer
-- | A 'Handle' is returned by 'openFile' and is subsequently used to perform
-- read and write operations on a file.
--
newtype Handle = Handle FD.FD
-- | File handle for standard input
stdin :: Handle
stdin = Handle FD.stdin
-- | File handle for standard output
stdout :: Handle
stdout = Handle FD.stdout
-- | File handle for standard error
stderr :: Handle
stderr = Handle FD.stderr
-- XXX we can support all the flags that the "open" system call supports.
-- Instead of using RTS locking mechanism can we use system provided locking
-- instead?
--
-- | Open a file that is not a directory and return a file handle.
-- 'openFile' enforces a multiple-reader single-writer locking on files. That
-- is, there may either be many handles on the same file which manage input, or
-- just one handle on the file which manages output. If any open handle is
-- managing a file for output, no new handle can be allocated for that file. If
-- any open handle is managing a file for input, new handles can only be
-- allocated if they do not manage output. Whether two files are the same is
-- implementation-dependent, but they should normally be the same if they have
-- the same absolute path name and neither has been renamed, for example.
--
openFile :: FilePath -> IOMode -> IO Handle
openFile path mode = Handle . fst <$> FD.openFile path mode True
-------------------------------------------------------------------------------
-- Array IO (Input)
-------------------------------------------------------------------------------
-- | Read a 'ByteArray' from a file handle. If no data is available on the
-- handle it blocks until some data becomes available. If data is available
-- then it immediately returns that data without blocking. It reads a maximum
-- of up to the size requested.
{-# INLINABLE readArrayUpto #-}
readArrayUpto :: Int -> Handle -> IO (Array Word8)
readArrayUpto size (Handle fd) = do
arr <- MArray.pinnedNewBytes size
-- ptr <- mallocPlainForeignPtrAlignedBytes size (alignment (undefined :: Word8))
MArray.unsafePinnedAsPtr arr $ \p -> do
-- n <- hGetBufSome h p size
#if MIN_VERSION_base(4,15,0)
n <- RawIO.read fd p 0 size
#else
n <- RawIO.read fd p size
#endif
-- XXX shrink only if the diff is significant
-- Use unsafeFreezeWithShrink
return
$ unsafeFreeze
$ arr { MArray.arrEnd = n, MArray.arrBound = size }
-------------------------------------------------------------------------------
-- Array IO (output)
-------------------------------------------------------------------------------
-- | Write an 'Array' to a file handle.
--
-- @since 0.7.0
{-# INLINABLE writeArray #-}
writeArray :: Unbox a => Handle -> Array a -> IO ()
writeArray _ arr | A.length arr == 0 = return ()
writeArray (Handle fd) arr =
unsafePinnedAsPtr arr $ \p ->
-- RawIO.writeAll fd (castPtr p) aLen
#if MIN_VERSION_base(4,15,0)
RawIO.write fd (castPtr p) 0 aLen
#else
RawIO.write fd (castPtr p) aLen
#endif
{-
-- Experiment to compare "writev" based IO with "write" based IO.
iov <- A.newArray 1
let iov' = iov {arrEnd = arrBound iov}
A.writeIndex iov' 0 (RawIO.IOVec (castPtr p) (fromIntegral aLen))
RawIO.writevAll fd (unsafeForeignPtrToPtr (aStart iov')) 1
-}
where
aLen = byteLength arr
#if !defined(mingw32_HOST_OS)
{-
-- | Write an array of 'IOVec' to a file handle.
--
-- @since 0.7.0
{-# INLINABLE writeIOVec #-}
writeIOVec :: Handle -> Array RawIO.IOVec -> IO ()
writeIOVec _ iov | A.length iov == 0 = return ()
writeIOVec (Handle fd) iov =
unsafePinnedAsPtr iov $ \p ->
RawIO.writevAll fd p (A.length iov)
-}
#endif
-------------------------------------------------------------------------------
-- Stream of Arrays IO
-------------------------------------------------------------------------------
-- | @readArraysOfUpto size h@ reads a stream of arrays from file handle @h@.
-- The maximum size of a single array is specified by @size@. The actual size
-- read may be less than or equal to @size@.
{-# INLINE _readArraysOfUpto #-}
_readArraysOfUpto :: (MonadIO m)
=> Int -> Handle -> Stream m (Array Word8)
_readArraysOfUpto size h = S.fromStreamK go
where
-- XXX use cons/nil instead
go = K.mkStream $ \_ yld _ stp -> do
arr <- liftIO $ readArrayUpto size h
if A.length arr == 0
then stp
else yld arr go
{-# INLINE_NORMAL readArraysOfUpto #-}
readArraysOfUpto :: (MonadIO m)
=> Int -> Handle -> Stream m (Array Word8)
readArraysOfUpto size h = D.Stream step ()
where
{-# INLINE_LATE step #-}
step _ _ = do
arr <- liftIO $ readArrayUpto size h
return $
case A.length arr of
0 -> D.Stop
_ -> D.Yield arr ()
-- XXX read 'Array a' instead of Word8
--
-- | @readArrays h@ reads a stream of arrays from file handle @h@.
-- The maximum size of a single array is limited to @defaultChunkSize@.
-- 'readArrays' ignores the prevailing 'TextEncoding' and 'NewlineMode'
-- on the 'Handle'.
--
-- > readArrays = readArraysOfUpto defaultChunkSize
--
-- @since 0.7.0
{-# INLINE readArrays #-}
readArrays :: (MonadIO m) => Handle -> Stream m (Array Word8)
readArrays = readArraysOfUpto defaultChunkSize
-------------------------------------------------------------------------------
-- Read File to Stream
-------------------------------------------------------------------------------
-- TODO for concurrent streams implement readahead IO. We can send multiple
-- read requests at the same time. For serial case we can use async IO. We can
-- also control the read throughput in mbps or IOPS.
-- | @readInChunksOf chunkSize handle@ reads a byte stream from a file handle,
-- reads are performed in chunks of up to @chunkSize@. The stream ends as soon
-- as EOF is encountered.
--
{-# INLINE readInChunksOf #-}
readInChunksOf :: (MonadIO m) => Int -> Handle -> Stream m Word8
readInChunksOf chunkSize h = AS.concat $ readArraysOfUpto chunkSize h
-- TODO
-- read :: (MonadIO m, Unbox a) => Handle -> Stream m a
--
-- > read = 'readByChunks' A.defaultChunkSize
-- | Generate a stream of elements of the given type from a file 'Handle'. The
-- stream ends when EOF is encountered.
--
-- @since 0.7.0
{-# INLINE read #-}
read :: (MonadIO m) => Handle -> Stream m Word8
read = AS.concat . readArrays
-------------------------------------------------------------------------------
-- Writing
-------------------------------------------------------------------------------
-- | Write a stream of arrays to a handle.
--
-- @since 0.7.0
{-# INLINE writeArrays #-}
writeArrays :: (MonadIO m, Unbox a) => Handle -> Stream m (Array a) -> m ()
writeArrays h = S.fold (FL.drainMapM (liftIO . writeArray h))
-- | Write a stream of arrays to a handle after coalescing them in chunks of
-- specified size. The chunk size is only a maximum and the actual writes could
-- be smaller than that as we do not split the arrays to fit them to the
-- specified size.
--
-- @since 0.7.0
{-# INLINE writeArraysPackedUpto #-}
writeArraysPackedUpto :: (MonadIO m, Unbox a)
=> Int -> Handle -> Stream m (Array a) -> m ()
writeArraysPackedUpto n h xs = writeArrays h $ AS.compact n xs
#if !defined(mingw32_HOST_OS)
{-
-- XXX this is incomplete
-- | Write a stream of 'IOVec' arrays to a handle.
--
-- @since 0.7.0
{-# INLINE writev #-}
writev :: MonadIO m => Handle -> Stream m (Array RawIO.IOVec) -> m ()
writev h = S.mapM_ (liftIO . writeIOVec h)
-- XXX this is incomplete
-- | Write a stream of arrays to a handle after grouping them in 'IOVec' arrays
-- of up to a maximum total size. Writes are performed using gather IO via
-- @writev@ system call. The maximum number of entries in each 'IOVec' group
-- limited to 512.
--
-- @since 0.7.0
{-# INLINE _writevArraysPackedUpto #-}
_writevArraysPackedUpto :: MonadIO m
=> Int -> Handle -> Stream m (Array a) -> m ()
_writevArraysPackedUpto n h xs =
writev h $ fromStreamD $ groupIOVecsOf n 512 (toStreamD xs)
-}
#endif
-- GHC buffer size dEFAULT_FD_BUFFER_SIZE=8192 bytes.
--
-- XXX test this
-- Note that if you use a chunk size less than 8K (GHC's default buffer
-- size) then you are advised to use 'NOBuffering' mode on the 'Handle' in case you
-- do not want buffering to occur at GHC level as well. Same thing applies to
-- writes as well.
-- | Like 'write' but provides control over the write buffer. Output will
-- be written to the IO device as soon as we collect the specified number of
-- input elements.
--
-- @since 0.7.0
{-# INLINE writeInChunksOf #-}
writeInChunksOf :: MonadIO m => Int -> Handle -> Stream m Word8 -> m ()
writeInChunksOf n h m = writeArrays h $ AS.chunksOf n m
-- > write = 'writeInChunksOf' A.defaultChunkSize
--
-- | Write a byte stream to a file handle. Combines the bytes in chunks of size
-- up to 'A.defaultChunkSize' before writing. Note that the write behavior
-- depends on the 'IOMode' and the current seek position of the handle.
--
-- @since 0.7.0
{-# INLINE write #-}
write :: MonadIO m => Handle -> Stream m Word8 -> m ()
write = writeInChunksOf defaultChunkSize
{-
{-# INLINE write #-}
write :: (MonadIO m, Unboxed a) => Handle -> Stream m a -> m ()
write = toHandleWith A.defaultChunkSize
-}
-------------------------------------------------------------------------------
-- IO with encoding/decoding Unicode characters
-------------------------------------------------------------------------------
{-
-- |
-- > readUtf8 = decodeUtf8 . read
--
-- Read a UTF8 encoded stream of unicode characters from a file handle.
--
-- @since 0.7.0
{-# INLINE readUtf8 #-}
readUtf8 :: (MonadIO m) => Handle -> Stream m Char
readUtf8 = decodeUtf8 . read
-- |
-- > writeUtf8 h s = write h $ encodeUtf8 s
--
-- Encode a stream of unicode characters to UTF8 and write it to the given file
-- handle. Default block buffering applies to the writes.
--
-- @since 0.7.0
{-# INLINE writeUtf8 #-}
writeUtf8 :: MonadIO m => Handle -> Stream m Char -> m ()
writeUtf8 h s = write h $ encodeUtf8 s
-- | Write a stream of unicode characters after encoding to UTF-8 in chunks
-- separated by a linefeed character @'\n'@. If the size of the buffer exceeds
-- @defaultChunkSize@ and a linefeed is not yet found, the buffer is written
-- anyway. This is similar to writing to a 'Handle' with the 'LineBuffering'
-- option.
--
-- @since 0.7.0
{-# INLINE writeUtf8ByLines #-}
writeUtf8ByLines :: (MonadIO m) => Handle -> Stream m Char -> m ()
writeUtf8ByLines = undefined
-- | Read UTF-8 lines from a file handle and apply the specified fold to each
-- line. This is similar to reading a 'Handle' with the 'LineBuffering' option.
--
-- @since 0.7.0
{-# INLINE readLines #-}
readLines :: (MonadIO m) => Handle -> Fold m Char b -> Stream m b
readLines h f = foldLines (readUtf8 h) f
-------------------------------------------------------------------------------
-- Framing on a sequence
-------------------------------------------------------------------------------
-- | Read a stream from a file handle and split it into frames delimited by
-- the specified sequence of elements. The supplied fold is applied on each
-- frame.
--
-- @since 0.7.0
{-# INLINE readFrames #-}
readFrames :: (MonadIO m, Unboxed a)
=> Array a -> Handle -> Fold m a b -> Stream m b
readFrames = undefined -- foldFrames . read
-- | Write a stream to the given file handle buffering up to frames separated
-- by the given sequence or up to a maximum of @defaultChunkSize@.
--
-- @since 0.7.0
{-# INLINE writeByFrames #-}
writeByFrames :: (MonadIO m, Unboxed a)
=> Array a -> Handle -> Stream m a -> m ()
writeByFrames = undefined
-------------------------------------------------------------------------------
-- Random Access IO (Seek)
-------------------------------------------------------------------------------
-- XXX handles could be shared, so we may not want to use the handle state at
-- all for these APIs. we can use pread and pwrite instead. On windows we will
-- need to use readFile/writeFile with an offset argument.
-------------------------------------------------------------------------------
-- | Read the element at the given index treating the file as an array.
--
-- @since 0.7.0
{-# INLINE readIndex #-}
readIndex :: Unboxed a => Handle -> Int -> Maybe a
readIndex arr i = undefined
-- NOTE: To represent a range to read we have chosen (start, size) instead of
-- (start, end). This helps in removing the ambiguity of whether "end" is
-- included in the range or not.
--
-- We could avoid specifying the range to be read and instead use "take size"
-- on the stream, but it may end up reading more and then consume it partially.
-- | @readSliceWith chunkSize handle pos len@ reads up to @len@ bytes
-- from @handle@ starting at the offset @pos@ from the beginning of the file.
--
-- Reads are performed in chunks of size @chunkSize@. For block devices, to
-- avoid reading partial blocks @chunkSize@ must align with the block size of
-- the underlying device. If the underlying block size is unknown, it is a good
-- idea to keep it a multiple 4KiB. This API ensures that the start of each
-- chunk is aligned with @chunkSize@ from second chunk onwards.
--
{-# INLINE readSliceWith #-}
readSliceWith :: (MonadIO m, Unboxed a)
=> Int -> Handle -> Int -> Int -> Stream m a
readSliceWith chunkSize h pos len = undefined
-- | @readSlice h i count@ streams a slice from the file handle @h@ starting
-- at index @i@ and reading up to @count@ elements in the forward direction
-- ending at the index @i + count - 1@.
--
-- @since 0.7.0
{-# INLINE readSlice #-}
readSlice :: (MonadIO m, Unboxed a)
=> Handle -> Int -> Int -> Stream m a
readSlice = readSliceWith defaultChunkSize
-- | @readSliceRev h i count@ streams a slice from the file handle @h@ starting
-- at index @i@ and reading up to @count@ elements in the reverse direction
-- ending at the index @i - count + 1@.
--
-- @since 0.7.0
{-# INLINE readSliceRev #-}
readSliceRev :: (MonadIO m, Unboxed a)
=> Handle -> Int -> Int -> Stream m a
readSliceRev h i count = undefined
-- | Write the given element at the given index in the file.
--
-- @since 0.7.0
{-# INLINE writeIndex #-}
writeIndex :: (MonadIO m, Unboxed a) => Handle -> Int -> a -> m ()
writeIndex h i a = undefined
-- | @writeSlice h i count stream@ writes a stream to the file handle @h@
-- starting at index @i@ and writing up to @count@ elements in the forward
-- direction ending at the index @i + count - 1@.
--
-- @since 0.7.0
{-# INLINE writeSlice #-}
writeSlice :: (Monad m, Unboxed a)
=> Handle -> Int -> Int -> Stream m a -> m ()
writeSlice h i len s = undefined
-- | @writeSliceRev h i count stream@ writes a stream to the file handle @h@
-- starting at index @i@ and writing up to @count@ elements in the reverse
-- direction ending at the index @i - count + 1@.
--
-- @since 0.7.0
{-# INLINE writeSliceRev #-}
writeSliceRev :: (Monad m, Unboxed a)
=> Handle -> Int -> Int -> Stream m a -> m ()
writeSliceRev arr i len s = undefined
-}

View File

@ -1,216 +0,0 @@
#include "inline.hs"
-- |
-- Module : Streamly.Internal.FileSystem.FDIO
-- Copyright : (c) 2019 Composewell Technologies
-- Copyright : (c) 1994-2008 The University of Glasgow
--
-- License : BSD3
-- Maintainer : streamly@composewell.com
-- Stability : experimental
-- Portability : GHC
--
-- Low level IO routines interfacing the operating system.
--
module Streamly.Internal.FileSystem.FDIO
( write
, writeAll
, writev
, writevAll
)
where
import Control.Monad (when)
import Streamly.Internal.System.IOVec.Type (IOVec)
#if !defined(mingw32_HOST_OS)
import Control.Concurrent (threadWaitWrite)
import Data.Int (Int64)
import Foreign.C.Error (throwErrnoIfMinus1RetryMayBlock)
import Foreign.C.Types (CBool(..))
import System.Posix.Internals (c_write, c_safe_write)
import Streamly.Internal.System.IOVec.Type (c_writev, c_safe_writev)
#endif
import Foreign.C.Types (CSize(..), CInt(..))
import Data.Word (Word8)
import Foreign.Ptr (plusPtr, Ptr)
import GHC.IO.FD (FD(..))
-------------------------------------------------------------------------------
-- IO Routines
-------------------------------------------------------------------------------
-- See System.POSIX.Internals in GHC base package
-------------------------------------------------------------------------------
-- Write without blocking the underlying OS thread
-------------------------------------------------------------------------------
#if !defined(mingw32_HOST_OS)
foreign import ccall unsafe "rtsSupportsBoundThreads" threaded :: Bool
isNonBlocking :: FD -> Bool
isNonBlocking fd = fdIsNonBlocking fd /= 0
-- "poll"s the fd for data to become available or timeout
-- See cbits/inputReady.c in base package
foreign import ccall unsafe "fdReady"
unsafe_fdReady :: CInt -> CBool -> Int64 -> CBool -> IO CInt
writeNonBlocking :: String -> FD -> Ptr Word8 -> Int -> CSize -> IO CInt
writeNonBlocking loc !fd !buf !off !len
| isNonBlocking fd = unsafe_write -- unsafe is ok, it can't block
| otherwise = do
let isWrite = 1
isSocket = 0
msecs = 0
r <- unsafe_fdReady (fdFD fd) isWrite msecs isSocket
when (r == 0) $ threadWaitWrite (fromIntegral (fdFD fd))
if threaded then safe_write else unsafe_write
where
do_write call = fromIntegral `fmap`
throwErrnoIfMinus1RetryMayBlock loc call
(threadWaitWrite (fromIntegral (fdFD fd)))
unsafe_write = do_write (c_write (fdFD fd) (buf `plusPtr` off) len)
safe_write = do_write (c_safe_write (fdFD fd) (buf `plusPtr` off) len)
writevNonBlocking :: String -> FD -> Ptr IOVec -> Int -> IO CInt
writevNonBlocking loc !fd !iov !cnt
| isNonBlocking fd = unsafe_write -- unsafe is ok, it can't block
| otherwise = do
let isWrite = 1
isSocket = 0
msecs = 0
r <- unsafe_fdReady (fdFD fd) isWrite msecs isSocket
when (r == 0) $ threadWaitWrite (fromIntegral (fdFD fd))
if threaded then safe_write else unsafe_write
where
do_write call = fromIntegral `fmap`
throwErrnoIfMinus1RetryMayBlock loc call
(threadWaitWrite (fromIntegral (fdFD fd)))
unsafe_write = do_write (c_writev (fdFD fd) iov (fromIntegral cnt))
safe_write = do_write (c_safe_writev (fdFD fd) iov (fromIntegral cnt))
#else
writeNonBlocking :: String -> FD -> Ptr Word8 -> Int -> CSize -> IO CInt
writeNonBlocking = undefined
writevNonBlocking :: String -> FD -> Ptr IOVec -> Int -> IO CInt
writevNonBlocking = undefined
#endif
-- Windows code is disabled for now
#if 0
#if defined(mingw32_HOST_OS)
# if defined(i386_HOST_ARCH)
# define WINDOWS_CCONV stdcall
# elif defined(x86_64_HOST_ARCH)
# define WINDOWS_CCONV ccall
# else
# error Unknown mingw32 arch
# endif
#endif
foreign import WINDOWS_CCONV safe "recv"
c_safe_recv :: CInt -> Ptr Word8 -> CInt -> CInt{-flags-} -> IO CInt
foreign import WINDOWS_CCONV safe "send"
c_safe_send :: CInt -> Ptr Word8 -> CInt -> CInt{-flags-} -> IO CInt
blockingWriteRawBufferPtr :: String -> FD -> Ptr Word8-> Int -> CSize -> IO CInt
blockingWriteRawBufferPtr loc !fd !buf !off !len
= throwErrnoIfMinus1Retry loc $ do
let start_ptr = buf `plusPtr` off
send_ret = c_safe_send (fdFD fd) start_ptr (fromIntegral len) 0
write_ret = c_safe_write (fdFD fd) start_ptr (fromIntegral len)
r <- bool write_ret send_ret (fdIsSocket fd)
when (r == -1) c_maperrno
return r
-- We don't trust write() to give us the correct errno, and
-- instead do the errno conversion from GetLastError()
-- ourselves. The main reason is that we treat ERROR_NO_DATA
-- (pipe is closing) as EPIPE, whereas write() returns EINVAL
-- for this case. We need to detect EPIPE correctly, because it
-- shouldn't be reported as an error when it happens on stdout.
-- As for send()'s case, Winsock functions don't do errno
-- conversion in any case so we have to do it ourselves.
-- That means we're doing the errno conversion no matter if the
-- fd is from a socket or not.
-- NOTE: "safe" versions of the read/write calls for use by the threaded RTS.
-- These calls may block, but that's ok.
asyncWriteRawBufferPtr :: String -> FD -> Ptr Word8 -> Int -> CSize -> IO CInt
asyncWriteRawBufferPtr loc !fd !buf !off !len = do
(l, rc) <- asyncWrite (fromIntegral (fdFD fd)) (fdIsSocket_ fd)
(fromIntegral len) (buf `plusPtr` off)
if l == (-1)
then let sock_errno = c_maperrno_func (fromIntegral rc)
non_sock_errno = Errno (fromIntegral rc)
errno = bool non_sock_errno sock_errno (fdIsSocket fd)
in ioError (errnoToIOError loc errno Nothing Nothing)
else return (fromIntegral l)
writeNonBlocking :: String -> FD -> Ptr Word8 -> Int -> CSize -> IO CInt
writeNonBlocking loc !fd !buf !off !len
| threaded = blockingWriteRawBufferPtr loc fd buf off len
| otherwise = asyncWriteRawBufferPtr loc fd buf off len
#endif
-- | @write FD buffer offset length@ tries to write data on the given
-- filesystem fd (cannot be a socket) up to sepcified length starting from the
-- given offset in the buffer. The write will not block the OS thread, it may
-- suspend the Haskell thread until write can proceed. Returns the actual
-- amount of data written.
write :: FD -> Ptr Word8 -> Int -> CSize -> IO CInt
write = writeNonBlocking "Streamly.Internal.FileSystem.FDIO"
-- XXX sendAll for sockets has a similar code, we can deduplicate the two.
-- XXX we need to check the errno to determine if the loop should continue. For
-- example, write may return without writing all data if the process file-size
-- limit has reached, in that case keep writing in a loop is fruitless.
--
-- | Keep writing in a loop until all data in the buffer has been written.
writeAll :: FD -> Ptr Word8 -> Int -> IO ()
writeAll fd ptr bytes = do
res <- write fd ptr 0 (fromIntegral bytes)
let res' = fromIntegral res
when (res' < bytes) $
writeAll fd (ptr `plusPtr` res') (bytes - res')
-------------------------------------------------------------------------------
-- Vector IO
-------------------------------------------------------------------------------
-- | @write FD iovec count@ tries to write data on the given filesystem fd
-- (cannot be a socket) from an iovec with specified number of entries. The
-- write will not block the OS thread, it may suspend the Haskell thread until
-- write can proceed. Returns the actual amount of data written.
writev :: FD -> Ptr IOVec -> Int -> IO CInt
writev = writevNonBlocking "Streamly.Internal.FileSystem.FDIO"
-- XXX incomplete
-- | Keep writing an iovec in a loop until all the iovec entries are written.
writevAll :: FD -> Ptr IOVec -> Int -> IO ()
writevAll fd iovec count = do
_res <- writev fd iovec count
{-
let res' = fromIntegral res
totalBytes = countIOVecBytes
if res' < totalBytes
then do
let iovec' = createModifiedIOVec
count' = ...
writeAll fd iovec' count'
else return ()
-}
return ()

View File

@ -83,8 +83,6 @@ extra-source-files:
benchmark/Streamly/Benchmark/Data/Array/Stream.hs
benchmark/Streamly/Benchmark/Data/Fold/Window.hs
benchmark/Streamly/Benchmark/Data/Stream/*.hs
benchmark/Streamly/Benchmark/FileSystem/*.hs
benchmark/Streamly/Benchmark/FileSystem/Handle/*.hs
benchmark/Streamly/Benchmark/Prelude/*.hs
benchmark/Streamly/Benchmark/Unicode/*.hs
benchmark/lib/Streamly/Benchmark/*.hs
@ -104,7 +102,6 @@ extra-source-files:
-- This is duplicated
src/Streamly/Internal/Data/Stream/Instances.hs
src/Streamly/Internal/FileSystem/Event/Darwin.h
src/assert.hs
src/config.h.in
src/inline.hs
@ -421,13 +418,6 @@ library
, Streamly.Internal.Data.Stream.IsStream
if !impl(ghcjs) && flag(dev)
other-modules:
Streamly.Internal.System.IOVec.Type
, Streamly.Internal.System.IOVec
, Streamly.Internal.FileSystem.FDIO
, Streamly.Internal.FileSystem.FD
if flag(dev)
exposed-modules: Streamly.Internal.Data.SmallArray
-- Exposed modules show up on hackage irrespective of the flag, so keep
@ -435,20 +425,6 @@ library
other-modules: Streamly.Data.SmallArray
, Streamly.Internal.Data.SmallArray.Type
if os(windows)
exposed-modules: Streamly.Internal.FileSystem.Event.Windows
if os(darwin)
include-dirs: src/Streamly/Internal
c-sources: src/Streamly/Internal/FileSystem/Event/Darwin.m
exposed-modules: Streamly.Internal.FileSystem.Event.Darwin
if os(linux)
exposed-modules: Streamly.Internal.FileSystem.Event.Linux
if os(linux) || os(darwin) || os(windows)
exposed-modules: Streamly.Internal.FileSystem.Event
other-modules:
Streamly.Internal.Data.Channel.Types
, Streamly.Internal.Data.Channel.Dispatcher
@ -539,17 +515,6 @@ library
if flag(dev)
build-depends: primitive >= 0.5.4 && < 0.9
-- For FileSystem.Event module
if os(linux)
build-depends: directory >= 1.2.2 && < 1.4
if os(windows)
build-depends: Win32 >= 2.6 && < 2.14
if os(darwin)
build-depends: directory >= 1.2.2 && < 1.4
frameworks: Cocoa
if flag(inspection)
build-depends: inspection-testing >= 0.4 && < 0.6