Skip to content

Commit

Permalink
Use ConsumeTerminate
Browse files Browse the repository at this point in the history
  • Loading branch information
tomjaguarpaw committed Dec 27, 2024
1 parent 9fc4306 commit 8145233
Showing 1 changed file with 21 additions and 22 deletions.
43 changes: 21 additions & 22 deletions bluefin-examples/src/Bluefin/Examples/Stream/Many.hs
Original file line number Diff line number Diff line change
Expand Up @@ -10,16 +10,22 @@ import Bluefin.Compound
useImplUnder,
useImplWithin,
)
import Bluefin.Consume (Consume, await)
import Bluefin.ConsumeTerminate
( ConsumeTerminate,
awaitOrTerminate,
consumeStreamOrTerminate,
)
import Bluefin.Eff (Eff, bracket, runEff, (:&), (:>))
import Bluefin.Exception (try)
import Bluefin.IO (IOE, effIO)
import Bluefin.Jump (jumpTo, withJump)
import Bluefin.State (evalState, get, modify)
import Bluefin.Stream (Stream, consumeStream, forEach, yield)
import Bluefin.Stream (Stream, forEach, yield)
import Bluefin.System.IO (hGetLine, hIsEOF, withFile)
import Control.Monad (forever, replicateM_, when)
import Data.Function (fix)
import Data.Maybe (Maybe (..), isNothing)
import Data.Traversable (for)
import System.IO (IOMode (ReadMode))
import Prelude hiding
( break,
Expand Down Expand Up @@ -81,13 +87,16 @@ mix ::
Stream [Data.Maybe.Maybe String] e1 ->
Eff es ()
mix timings io y = do
let itersStreams :: [Wrap (Stream (Maybe String)) () es]
itersStreams = map (\x -> MkWrap (nothingOnEnd (pad io x))) timings
let itersStreams :: [Wrap (Stream String) () es]
itersStreams = map (\x -> MkWrap (pad io x)) timings

connectMany itersStreams $ \itersStart -> do
flip fix itersStart $ \again iters -> do
when (not (null iters)) $ do
outs <- traverse await iters
outs <- for iters $ \y' -> do
try (awaitOrTerminate y') >>= \case
Left () -> pure Nothing
Right r -> pure (Just r)
yield y outs
let iters' =
[ iter
Expand All @@ -96,16 +105,6 @@ mix timings io y = do
]
again iters'

-- | When the stream is finished, yield Nothing for evermore.
nothingOnEnd ::
(e1 :> es) =>
(forall e. Stream a e -> Eff (e :& es) r) ->
Stream (Data.Maybe.Maybe a) e1 ->
Eff es r
nothingOnEnd s y = do
_ <- forEach s $ \a -> yield y (Data.Maybe.Just a)
forever (yield y Data.Maybe.Nothing)

pad ::
(e1 :> es, e2 :> es) =>
IOE e1 ->
Expand Down Expand Up @@ -192,25 +191,25 @@ take n k y =
-- it's just how the current implementation happens to work.)
connectMany ::
-- | n effectful operations that yield `a`s
[Wrap (Stream a) r es] ->
[Wrap (Stream a) r1 es] ->
-- | Will be called with a list of n Consumes,
-- to which the streams above will yield their
-- `a`s
(forall e. [Consume a e] -> Eff (e :& es) r) ->
Eff es r
(forall e. [ConsumeTerminate a r1 e] -> Eff (e :& es) r2) ->
Eff es r2
connectMany ss k =
makeOp (connectMany' ss k [])

connectMany' ::
[Wrap (Stream a) r es] ->
(forall e. [Consume a e] -> Eff (e :& es) r) ->
(forall e. [Consume a e] -> Eff (e :& es) r)
[Wrap (Stream a) r1 es] ->
(forall e. [ConsumeTerminate a r1 e] -> Eff (e :& es) r2) ->
(forall e. [ConsumeTerminate a r1 e] -> Eff (e :& es) r2)
connectMany' [] k = k
connectMany' (MkWrap s : ss) k =
connectMany'
ss
( \cs ->
consumeStream
consumeStreamOrTerminate
(\c -> useImplIn k (mapHandle c : map mapHandle cs))
(useImplWithin s)
)

0 comments on commit 8145233

Please sign in to comment.