From 7fbb5bc4a9b441a62910fc3944ff644cd074fedc Mon Sep 17 00:00:00 2001 From: Adithya Kumar Date: Mon, 4 Nov 2024 16:15:24 +0530 Subject: [PATCH] Implement Dir module using DirIO module --- core/src/Streamly/Internal/FileSystem/Dir.hs | 348 ++----------------- core/streamly-core.cabal | 6 - 2 files changed, 29 insertions(+), 325 deletions(-) diff --git a/core/src/Streamly/Internal/FileSystem/Dir.hs b/core/src/Streamly/Internal/FileSystem/Dir.hs index 5080ee0d55..43aaee4743 100644 --- a/core/src/Streamly/Internal/FileSystem/Dir.hs +++ b/core/src/Streamly/Internal/FileSystem/Dir.hs @@ -86,195 +86,42 @@ where import Control.Monad.Catch (MonadCatch) import Control.Monad.IO.Class (MonadIO(..)) import Data.Bifunctor (bimap) -import Data.Either (isRight, isLeft, fromLeft, fromRight) -import Data.Function ((&)) import Streamly.Data.Stream (Stream) -import Streamly.Internal.Data.Unfold (Step(..)) import Streamly.Internal.Data.Unfold.Type (Unfold(..)) import System.FilePath (()) -#if (defined linux_HOST_OS) || (defined darwin_HOST_OS) || (defined freebsd_HOST_OS) -import System.Posix (DirStream, openDirStream, readDirStream, closeDirStream) -#elif defined(mingw32_HOST_OS) -import qualified System.Win32 as Win32 -#else -#error "Unsupported architecture" -#endif -import qualified Streamly.Data.Unfold as UF -import qualified Streamly.Internal.Data.Unfold as UF (mapM2, bracketIO) import qualified Streamly.Data.Stream as S -import qualified System.Directory as Dir -import Prelude hiding (read) - -{- -{-# INLINABLE readArrayUpto #-} -readArrayUpto :: Int -> Handle -> IO (Array Word8) -readArrayUpto size h = do - ptr <- mallocPlainForeignPtrBytes size - -- ptr <- mallocPlainForeignPtrAlignedBytes size (alignment (undefined :: Word8)) - withForeignPtr ptr $ \p -> do - n <- hGetBufSome h p size - let v = Array - { aStart = ptr - , arrEnd = p `plusPtr` n - , arrBound = p `plusPtr` size - } - -- XXX shrink only if the diff is significant - shrinkToFit v - -------------------------------------------------------------------------------- --- Stream of Arrays IO -------------------------------------------------------------------------------- - --- | @toChunksWithBufferOf size h@ reads a stream of arrays from file handle @h@. --- The maximum size of a single array is specified by @size@. The actual size --- read may be less than or equal to @size@. -{-# INLINE _toChunksWithBufferOf #-} -_toChunksWithBufferOf :: MonadIO m => Int -> Handle -> Stream m (Array Word8) -_toChunksWithBufferOf size h = go - where - -- XXX use cons/nil instead - go = mkStream $ \_ yld _ stp -> do - arr <- liftIO $ readArrayUpto size h - if A.length arr == 0 - then stp - else yld arr go - --- | @toChunksWithBufferOf size handle@ reads a stream of arrays from the file --- handle @handle@. The maximum size of a single array is limited to @size@. --- The actual size read may be less than or equal to @size@. --- --- @since 0.7.0 -{-# INLINE_NORMAL toChunksWithBufferOf #-} -toChunksWithBufferOf :: MonadIO m => Int -> Handle -> Stream m (Array Word8) -toChunksWithBufferOf size h = D.fromStreamD (D.Stream step ()) - where - {-# INLINE_LATE step #-} - step _ _ = do - arr <- liftIO $ readArrayUpto size h - return $ - case A.length arr of - 0 -> D.Stop - _ -> D.Yield arr () - --- | Unfold the tuple @(bufsize, handle)@ into a stream of 'Word8' arrays. --- Read requests to the IO device are performed using a buffer of size --- @bufsize@. The size of an array in the resulting stream is always less than --- or equal to @bufsize@. --- --- @since 0.7.0 -{-# INLINE_NORMAL readChunksWithBufferOf #-} -readChunksWithBufferOf :: MonadIO m => Unfold m (Int, Handle) (Array Word8) -readChunksWithBufferOf = Unfold step return - where - {-# INLINE_LATE step #-} - step (size, h) = do - arr <- liftIO $ readArrayUpto size h - return $ - case A.length arr of - 0 -> D.Stop - _ -> D.Yield arr (size, h) - --- XXX read 'Array a' instead of Word8 --- --- | @toChunks handle@ reads a stream of arrays from the specified file --- handle. The maximum size of a single array is limited to --- @defaultChunkSize@. The actual size read may be less than or equal to --- @defaultChunkSize@. --- --- > toChunks = toChunksWithBufferOf defaultChunkSize --- --- @since 0.7.0 -{-# INLINE toChunks #-} -toChunks :: MonadIO m => Handle -> Stream m (Array Word8) -toChunks = toChunksWithBufferOf defaultChunkSize - --- | Unfolds a handle into a stream of 'Word8' arrays. Requests to the IO --- device are performed using a buffer of size --- 'Streamly.Internal.Data.Array.Type.defaultChunkSize'. The --- size of arrays in the resulting stream are therefore less than or equal to --- 'Streamly.Internal.Data.Array.Type.defaultChunkSize'. --- --- @since 0.7.0 -{-# INLINE readChunks #-} -readChunks :: MonadIO m => Unfold m Handle (Array Word8) -readChunks = UF.first readChunksWithBufferOf defaultChunkSize - -------------------------------------------------------------------------------- --- Read a Directory to Stream -------------------------------------------------------------------------------- - --- TODO for concurrent streams implement readahead IO. We can send multiple --- read requests at the same time. For serial case we can use async IO. We can --- also control the read throughput in mbps or IOPS. - --- | Unfolds the tuple @(bufsize, handle)@ into a byte stream, read requests --- to the IO device are performed using buffers of @bufsize@. --- --- @since 0.7.0 -{-# INLINE readWithBufferOf #-} -readWithBufferOf :: MonadIO m => Unfold m (Int, Handle) Word8 -readWithBufferOf = UF.many readChunksWithBufferOf A.read +import Streamly.Internal.FileSystem.Path (Path) --- | @toStreamWithBufferOf bufsize handle@ reads a byte stream from a file --- handle, reads are performed in chunks of up to @bufsize@. --- --- /Pre-release/ -{-# INLINE toStreamWithBufferOf #-} -toStreamWithBufferOf :: MonadIO m => Int -> Handle -> Stream m Word8 -toStreamWithBufferOf chunkSize h = AS.concat $ toChunksWithBufferOf chunkSize h --} +import qualified Streamly.Internal.FileSystem.Path as Path +import qualified Streamly.Internal.FileSystem.DirIO as DirIO +import qualified Streamly.Internal.Data.Unfold as Unfold --- read child node names from a dir filtering out . and .. --- --- . and .. are an implementation artifact, and should probably not be used in --- user level abstractions. --- --- . does not seem to have any useful purpose. If we have the path of the dir --- then we will resolve it to get the inode of the dir so the . entry would be --- redundant. If we have the inode of the dir to read the dir then it is --- redundant. Is this for cross check when doing fsck? --- --- For .. we have the readAncestors API, we should not have this in the --- readChildren API. - --- XXX exception handling - -#if (defined linux_HOST_OS) || (defined darwin_HOST_OS) || (defined freebsd_HOST_OS) -{-# INLINE streamReader #-} -streamReader :: MonadIO m => Unfold m DirStream FilePath -streamReader = Unfold step return - - where - - step strm = do - -- XXX Use readDirStreamMaybe - file <- liftIO $ readDirStream strm - case file of - [] -> return Stop - _ -> return $ Yield file strm +import Prelude hiding (read) -#elif defined(mingw32_HOST_OS) -openDirStream :: String -> IO (Win32.HANDLE, Win32.FindData) -openDirStream = Win32.findFirstFile +-------------------------------------------------------------------------------- +-- Helpers +-------------------------------------------------------------------------------- -closeDirStream :: (Win32.HANDLE, Win32.FindData) -> IO () -closeDirStream (h, _) = Win32.findClose h +{-# INLINE ePathMap #-} +ePathMap :: Either Path Path -> Either FilePath FilePath +ePathMap (Left a) = Left (Path.toString a) +ePathMap (Right a) = Right (Path.toString a) -{-# INLINE streamReader #-} -streamReader :: MonadIO m => Unfold m (Win32.HANDLE, Win32.FindData) FilePath -streamReader = Unfold step return +{-# INLINE pMapUnfold #-} +pMapUnfold :: MonadCatch m => Unfold m Path Path -> Unfold m FilePath FilePath +pMapUnfold = fmap Path.toString . Unfold.lmapM Path.fromString - where +{-# INLINE pMapUnfoldE #-} +pMapUnfoldE + :: MonadCatch m + => Unfold m Path (Either Path Path) + -> Unfold m FilePath (Either FilePath FilePath) +pMapUnfoldE = fmap ePathMap . Unfold.lmapM Path.fromString - step (h, fdat) = do - more <- liftIO $ Win32.findNextFile h fdat - if more - then do - file <- liftIO $ Win32.getFindDataFileName fdat - return $ Yield file (h, fdat) - else return Stop -#endif +-------------------------------------------------------------------------------- +-- Functions +-------------------------------------------------------------------------------- -- | Read a directory emitting a stream with names of the children. Filter out -- "." and ".." entries. @@ -283,18 +130,7 @@ streamReader = Unfold step return -- {-# INLINE reader #-} reader :: (MonadIO m, MonadCatch m) => Unfold m FilePath FilePath -reader = --- XXX Instead of using bracketIO for each iteration of the loop we should --- instead yield a buffer of dir entries in each iteration and then use an --- unfold and concat to flatten those entries. That should improve the --- performance. - UF.bracketIO openDirStream closeDirStream streamReader - & UF.filter (\x -> x /= "." && x /= "..") - --- XXX We can use a more general mechanism to filter the contents of a --- directory. We can just stat each child and pass on the stat information. We --- can then use that info to do a general filtering. "find" like filters can be --- created. +reader = fmap Path.toString $ Unfold.lmapM Path.fromString DirIO.reader -- | Read directories as Left and files as Right. Filter out "." and ".." -- entries. @@ -303,18 +139,12 @@ reader = -- {-# INLINE eitherReader #-} eitherReader :: (MonadIO m, MonadCatch m) => Unfold m FilePath (Either FilePath FilePath) -eitherReader = UF.mapM2 classify reader +eitherReader = pMapUnfoldE DirIO.eitherReader - where - - classify dir x = do - r <- liftIO $ Dir.doesDirectoryExist (dir ++ "/" ++ x) - return $ if r then Left x else Right x {-# INLINE eitherReaderPaths #-} eitherReaderPaths ::(MonadIO m, MonadCatch m) => Unfold m FilePath (Either FilePath FilePath) -eitherReaderPaths = - UF.mapM2 (\dir -> return . bimap (dir ) (dir )) eitherReader +eitherReaderPaths = pMapUnfoldE DirIO.eitherReaderPaths -- -- | Read files only. @@ -323,7 +153,7 @@ eitherReaderPaths = -- {-# INLINE fileReader #-} fileReader :: (MonadIO m, MonadCatch m) => Unfold m FilePath FilePath -fileReader = fmap (fromRight undefined) $ UF.filter isRight eitherReader +fileReader = pMapUnfold DirIO.fileReader -- | Read directories only. Filter out "." and ".." entries. -- @@ -331,7 +161,7 @@ fileReader = fmap (fromRight undefined) $ UF.filter isRight eitherReader -- {-# INLINE dirReader #-} dirReader :: (MonadIO m, MonadCatch m) => Unfold m FilePath FilePath -dirReader = fmap (fromLeft undefined) $ UF.filter isLeft eitherReader +dirReader = pMapUnfold DirIO.dirReader -- | Raw read of a directory. -- @@ -389,123 +219,3 @@ readDirs = S.unfold dirReader {-# INLINE toDirs #-} toDirs :: (MonadIO m, MonadCatch m) => String -> Stream m String toDirs = readDirs - -{- -------------------------------------------------------------------------------- --- Writing -------------------------------------------------------------------------------- - -------------------------------------------------------------------------------- --- Array IO (output) -------------------------------------------------------------------------------- - --- | Write an 'Array' to a file handle. --- --- @since 0.7.0 -{-# INLINABLE writeArray #-} -writeArray :: Storable a => Handle -> Array a -> IO () -writeArray _ arr | A.length arr == 0 = return () -writeArray h Array{..} = withForeignPtr aStart $ \p -> hPutBuf h p aLen - where - aLen = - let p = unsafeForeignPtrToPtr aStart - in arrEnd `minusPtr` p - -------------------------------------------------------------------------------- --- Stream of Arrays IO -------------------------------------------------------------------------------- - -------------------------------------------------------------------------------- --- Writing -------------------------------------------------------------------------------- - --- | Write a stream of arrays to a handle. --- --- @since 0.7.0 -{-# INLINE fromChunks #-} -fromChunks :: (MonadIO m, Storable a) - => Handle -> Stream m (Array a) -> m () -fromChunks h m = S.mapM_ (liftIO . writeArray h) m - --- | @fromChunksWithBufferOf bufsize handle stream@ writes a stream of arrays --- to @handle@ after coalescing the adjacent arrays in chunks of @bufsize@. --- The chunk size is only a maximum and the actual writes could be smaller as --- we do not split the arrays to fit exactly to the specified size. --- --- @since 0.7.0 -{-# INLINE fromChunksWithBufferOf #-} -fromChunksWithBufferOf :: (MonadIO m, Storable a) - => Int -> Handle -> Stream m (Array a) -> m () -fromChunksWithBufferOf n h xs = fromChunks h $ AS.compact n xs - --- | @fromStreamWithBufferOf bufsize handle stream@ writes @stream@ to @handle@ --- in chunks of @bufsize@. A write is performed to the IO device as soon as we --- collect the required input size. --- --- @since 0.7.0 -{-# INLINE fromStreamWithBufferOf #-} -fromStreamWithBufferOf :: MonadIO m => Int -> Handle -> Stream m Word8 -> m () -fromStreamWithBufferOf n h m = fromChunks h $ S.pinnedChunksOf n m --- fromStreamWithBufferOf n h m = fromChunks h $ AS.chunksOf n m - --- > write = 'writeWithBufferOf' A.defaultChunkSize --- --- | Write a byte stream to a file handle. Accumulates the input in chunks of --- up to 'Streamly.Internal.Data.Array.Type.defaultChunkSize' before writing. --- --- NOTE: This may perform better than the 'write' fold, you can try this if you --- need some extra perf boost. --- --- @since 0.7.0 -{-# INLINE fromStream #-} -fromStream :: MonadIO m => Handle -> Stream m Word8 -> m () -fromStream = fromStreamWithBufferOf defaultChunkSize - --- | Write a stream of arrays to a handle. Each array in the stream is written --- to the device as a separate IO request. --- --- @since 0.7.0 -{-# INLINE writeChunks #-} -writeChunks :: (MonadIO m, Storable a) => Handle -> Fold m (Array a) () -writeChunks h = FL.drainBy (liftIO . writeArray h) - --- | @writeChunksWithBufferOf bufsize handle@ writes a stream of arrays --- to @handle@ after coalescing the adjacent arrays in chunks of @bufsize@. --- We never split an array, if a single array is bigger than the specified size --- it emitted as it is. Multiple arrays are coalesed as long as the total size --- remains below the specified size. --- --- @since 0.7.0 -{-# INLINE writeChunksWithBufferOf #-} -writeChunksWithBufferOf :: (MonadIO m, Storable a) - => Int -> Handle -> Fold m (Array a) () -writeChunksWithBufferOf n h = lpackArraysChunksOf n (writeChunks h) - --- GHC buffer size dEFAULT_FD_BUFFER_SIZE=8192 bytes. --- --- XXX test this --- Note that if you use a chunk size less than 8K (GHC's default buffer --- size) then you are advised to use 'NOBuffering' mode on the 'Handle' in case you --- do not want buffering to occur at GHC level as well. Same thing applies to --- writes as well. - --- | @writeWithBufferOf reqSize handle@ writes the input stream to @handle@. --- Bytes in the input stream are collected into a buffer until we have a chunk --- of @reqSize@ and then written to the IO device. --- --- @since 0.7.0 -{-# INLINE writeWithBufferOf #-} -writeWithBufferOf :: MonadIO m => Int -> Handle -> Fold m Word8 () -writeWithBufferOf n h = FL.groupsOf n (pinnedWriteNUnsafe n) (writeChunks h) - --- > write = 'writeWithBufferOf' A.defaultChunkSize --- --- | Write a byte stream to a file handle. Accumulates the input in chunks of --- up to 'Streamly.Internal.Data.Array.Type.defaultChunkSize' before writing --- to the IO device. --- --- @since 0.7.0 -{-# INLINE write #-} -write :: MonadIO m => Handle -> Fold m Word8 () -write = writeWithBufferOf defaultChunkSize --} diff --git a/core/streamly-core.cabal b/core/streamly-core.cabal index ec3a35e8fc..b8df8a60a2 100644 --- a/core/streamly-core.cabal +++ b/core/streamly-core.cabal @@ -544,9 +544,6 @@ library -- streamly-unicode-core , template-haskell >= 2.14 && < 2.23 - -- streamly-filesystem-core - , directory >= 1.3.3 && < 1.4 - -- XXX to be removed , containers >= 0.6.0 && < 0.8 , heaps >= 0.3 && < 0.5 @@ -559,8 +556,5 @@ library if !flag(use-unliftio) build-depends: monad-control >= 1.0 && < 1.1 - if os(linux) || os (darwin) || os(freebsd) - build-depends: unix >= 2.7.0 && < 2.9 - if os(windows) build-depends: Win32 >= 2.6 && < 2.14