From 81452333388f3e367dd8379ffab64e1904502ed2 Mon Sep 17 00:00:00 2001 From: Tom Ellis Date: Fri, 27 Dec 2024 15:49:45 +0000 Subject: [PATCH] Use ConsumeTerminate --- .../src/Bluefin/Examples/Stream/Many.hs | 43 +++++++++---------- 1 file changed, 21 insertions(+), 22 deletions(-) diff --git a/bluefin-examples/src/Bluefin/Examples/Stream/Many.hs b/bluefin-examples/src/Bluefin/Examples/Stream/Many.hs index 64ca108..2e53893 100644 --- a/bluefin-examples/src/Bluefin/Examples/Stream/Many.hs +++ b/bluefin-examples/src/Bluefin/Examples/Stream/Many.hs @@ -10,16 +10,22 @@ import Bluefin.Compound useImplUnder, useImplWithin, ) -import Bluefin.Consume (Consume, await) +import Bluefin.ConsumeTerminate + ( ConsumeTerminate, + awaitOrTerminate, + consumeStreamOrTerminate, + ) import Bluefin.Eff (Eff, bracket, runEff, (:&), (:>)) +import Bluefin.Exception (try) import Bluefin.IO (IOE, effIO) import Bluefin.Jump (jumpTo, withJump) import Bluefin.State (evalState, get, modify) -import Bluefin.Stream (Stream, consumeStream, forEach, yield) +import Bluefin.Stream (Stream, forEach, yield) import Bluefin.System.IO (hGetLine, hIsEOF, withFile) import Control.Monad (forever, replicateM_, when) import Data.Function (fix) import Data.Maybe (Maybe (..), isNothing) +import Data.Traversable (for) import System.IO (IOMode (ReadMode)) import Prelude hiding ( break, @@ -81,13 +87,16 @@ mix :: Stream [Data.Maybe.Maybe String] e1 -> Eff es () mix timings io y = do - let itersStreams :: [Wrap (Stream (Maybe String)) () es] - itersStreams = map (\x -> MkWrap (nothingOnEnd (pad io x))) timings + let itersStreams :: [Wrap (Stream String) () es] + itersStreams = map (\x -> MkWrap (pad io x)) timings connectMany itersStreams $ \itersStart -> do flip fix itersStart $ \again iters -> do when (not (null iters)) $ do - outs <- traverse await iters + outs <- for iters $ \y' -> do + try (awaitOrTerminate y') >>= \case + Left () -> pure Nothing + Right r -> pure (Just r) yield y outs let iters' = [ iter @@ -96,16 +105,6 @@ mix timings io y = do ] again iters' --- | When the stream is finished, yield Nothing for evermore. -nothingOnEnd :: - (e1 :> es) => - (forall e. Stream a e -> Eff (e :& es) r) -> - Stream (Data.Maybe.Maybe a) e1 -> - Eff es r -nothingOnEnd s y = do - _ <- forEach s $ \a -> yield y (Data.Maybe.Just a) - forever (yield y Data.Maybe.Nothing) - pad :: (e1 :> es, e2 :> es) => IOE e1 -> @@ -192,25 +191,25 @@ take n k y = -- it's just how the current implementation happens to work.) connectMany :: -- | n effectful operations that yield `a`s - [Wrap (Stream a) r es] -> + [Wrap (Stream a) r1 es] -> -- | Will be called with a list of n Consumes, -- to which the streams above will yield their -- `a`s - (forall e. [Consume a e] -> Eff (e :& es) r) -> - Eff es r + (forall e. [ConsumeTerminate a r1 e] -> Eff (e :& es) r2) -> + Eff es r2 connectMany ss k = makeOp (connectMany' ss k []) connectMany' :: - [Wrap (Stream a) r es] -> - (forall e. [Consume a e] -> Eff (e :& es) r) -> - (forall e. [Consume a e] -> Eff (e :& es) r) + [Wrap (Stream a) r1 es] -> + (forall e. [ConsumeTerminate a r1 e] -> Eff (e :& es) r2) -> + (forall e. [ConsumeTerminate a r1 e] -> Eff (e :& es) r2) connectMany' [] k = k connectMany' (MkWrap s : ss) k = connectMany' ss ( \cs -> - consumeStream + consumeStreamOrTerminate (\c -> useImplIn k (mapHandle c : map mapHandle cs)) (useImplWithin s) )