Skip to content

Commit

Permalink
Concurrency and free monads.
Browse files Browse the repository at this point in the history
  • Loading branch information
athas committed Oct 15, 2024
1 parent ca8e077 commit 37c51c3
Show file tree
Hide file tree
Showing 4 changed files with 598 additions and 60 deletions.
97 changes: 38 additions & 59 deletions haskell/Week7/FreeConcurrency.hs
Original file line number Diff line number Diff line change
@@ -1,39 +1,20 @@
{-# LANGUAGE ExistentialQuantification #-}

module Week6.FreeConcurrency where

import Control.Concurrent (Chan, forkIO, newChan, readChan, writeChan)
import Control.Monad (ap, liftM)
import Data.Maybe (fromMaybe)
import Week2.ReaderState (State, get, put, runState)

data Free e a
= Pure a
| Free (e (Free e a))

instance (Functor e) => Functor (Free e) where
fmap = liftM

instance (Functor e) => Applicative (Free e) where
pure = Pure
(<*>) = ap

instance (Functor e) => Monad (Free e) where
Pure x >>= f = f x
Free g >>= f = Free $ h <$> g
where
h x = x >>= f
import Week4.Free (Free (..))

type Msg = String

type CC chan a = Free (CCOp chan) a

data CCOp chan a
= CCFork (CC chan ()) a
| CCNewChan (chan -> a)
| CCSend chan Msg a
| CCReceive chan (Msg -> a)

type CC chan a = Free (CCOp chan) a

instance Functor (CCOp chan) where
fmap f (CCFork m c) = CCFork m (f c)
fmap f (CCNewChan c) = CCNewChan $ f . c
Expand Down Expand Up @@ -71,9 +52,9 @@ interpCCIO (Free (CCReceive chan c)) = do
type ChanId = Int

data CCState = CCState
{ ccCounter :: Int,
{ ccCounter :: ChanId,
ccChans :: [(ChanId, [Msg])],
ccThreads :: [CC Int ()]
ccThreads :: [CC ChanId ()]
}

getChan :: ChanId -> State CCState [Msg]
Expand All @@ -94,58 +75,56 @@ setChan chan_id msgs = do
: filter ((/= chan_id) . fst) (ccChans state)
}

addThread :: CC Int () -> State CCState ()
addThread :: CC ChanId () -> State CCState ()
addThread m = do
state <- get
put $ state {ccThreads = m : ccThreads state}

incCounter :: State CCState Int
incCounter :: State CCState ChanId
incCounter = do
state <- get
put $ state {ccCounter = ccCounter state + 1}
pure $ ccCounter state

interp :: CC Int a -> State CCState (CC Int a)
interp (Pure x) = pure $ Pure x
interp (Free (CCSend chan_id msg c)) = do
step :: CC ChanId a -> State CCState (CC ChanId a)
step (Pure x) = pure $ Pure x
step (Free (CCNewChan c)) = do
chan_id <- incCounter
setChan chan_id []
step $ c chan_id
step (Free (CCFork m c)) = do
addThread m
step c
step (Free (CCSend chan_id msg c)) = do
msgs <- getChan chan_id
setChan chan_id $ msgs ++ [msg]
interp c
interp (Free (CCReceive chan_id c)) = do
step c
step (Free (CCReceive chan_id c)) = do
msgs <- getChan chan_id
case msgs of
[] -> pure $ Free $ CCReceive chan_id c
msg : msgs' -> do
setChan chan_id msgs'
interp $ c msg
interp (Free (CCFork (Pure _) c)) =
interp c
interp (Free (CCFork m c)) = do
addThread m
interp c
interp (Free (CCNewChan c)) = do
chan_id <- incCounter
setChan chan_id []
interp $ c chan_id
step $ c msg

stepThreads :: State CCState ()
stepThreads = do
state <- get
put $ state {ccThreads = []}
threads <- mapM interp $ ccThreads state
threads <- mapM step $ ccThreads state
new_state <- get
put $ new_state {ccThreads = threads ++ ccThreads new_state}

interpFully :: CC Int a -> State CCState a
interpFully (Pure x) = pure x
interpFully (Free op) = do
interp :: CC ChanId a -> State CCState a
interp (Pure x) = pure x
interp (Free op) = do
stepThreads
op' <- interp $ Free op
interpFully op'
op' <- step $ Free op
interp op'

interpCCPure :: CC Int a -> a
interpCCPure :: CC ChanId a -> a
interpCCPure orig =
fst $ runState initial_state $ interpFully orig
fst $ runState initial_state $ interp orig
where
initial_state =
CCState
Expand All @@ -154,25 +133,25 @@ interpCCPure orig =
ccThreads = []
}

carousel :: CC chan String
carousel = do
pipeline :: CC chan String
pipeline = do
chan_0 <- ccNewChan
chan_1 <- ccNewChan
chan_2 <- ccNewChan
chan_3 <- ccNewChan
chan_4 <- ccNewChan
let passOn from to = do
let passOn tok from to = do
x <- ccReceive from
ccSend to $ x ++ "x"
ccFork $ passOn chan_0 chan_1
ccFork $ passOn chan_1 chan_2
ccFork $ passOn chan_2 chan_3
ccFork $ passOn chan_3 chan_4
ccSend to $ x ++ tok
ccFork $ passOn "a" chan_0 chan_1
ccFork $ passOn "b" chan_1 chan_2
ccFork $ passOn "c" chan_2 chan_3
ccFork $ passOn "d" chan_3 chan_4
ccSend chan_0 ""
ccReceive chan_4

demoIO :: IO ()
demoIO = print =<< interpCCIO carousel
demoIO = print =<< interpCCIO pipeline

demoPure :: IO ()
demoPure = print $ interpCCPure carousel
demoPure = print $ interpCCPure pipeline
68 changes: 68 additions & 0 deletions haskell/Week7/InitialAttempt.hs
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
module Week6.InitialAttempt where

import Control.Concurrent (Chan, forkIO, newChan, readChan, writeChan)
import Week4.Free (Free (..))

type Msg = String

data CCOp a
= CCFork (CC ()) a
| CCNewChan (Chan Msg -> a)
| CCSend (Chan Msg) Msg a
| CCReceive (Chan Msg) (Msg -> a)

type CC a = Free CCOp a

instance Functor CCOp where
fmap f (CCFork m c) = CCFork m (f c)
fmap f (CCNewChan c) = CCNewChan $ f . c
fmap f (CCSend chan msg c) = CCSend chan msg $ f c
fmap f (CCReceive chan c) = CCReceive chan $ f . c

ccNewChan :: CC (Chan Msg)
ccNewChan = Free $ CCNewChan pure

ccFork :: CC () -> CC ()
ccFork m = Free $ CCFork m $ pure ()

ccSend :: Chan Msg -> Msg -> CC ()
ccSend chan msg = Free $ CCSend chan msg $ pure ()

ccReceive :: Chan Msg -> CC Msg
ccReceive chan = Free $ CCReceive chan pure

interpCCIO :: CC a -> IO a
interpCCIO (Pure x) =
pure x
interpCCIO (Free (CCFork m c)) = do
_ <- forkIO $ interpCCIO m
interpCCIO c
interpCCIO (Free (CCNewChan c)) = do
chan <- newChan
interpCCIO $ c chan
interpCCIO (Free (CCSend chan msg c)) = do
writeChan chan msg
interpCCIO c
interpCCIO (Free (CCReceive chan c)) = do
msg <- readChan chan
interpCCIO $ c msg

pipeline :: CC String
pipeline = do
chan_0 <- ccNewChan
chan_1 <- ccNewChan
chan_2 <- ccNewChan
chan_3 <- ccNewChan
chan_4 <- ccNewChan
let passOn tok from to = do
x <- ccReceive from
ccSend to $ x ++ tok
ccFork $ passOn "a" chan_0 chan_1
ccFork $ passOn "b" chan_1 chan_2
ccFork $ passOn "c" chan_2 chan_3
ccFork $ passOn "d" chan_3 chan_4
ccSend chan_0 ""
ccReceive chan_4

demoIO :: IO ()
demoIO = print =<< interpCCIO pipeline
2 changes: 1 addition & 1 deletion src/chapter_4.md
Original file line number Diff line number Diff line change
Expand Up @@ -892,7 +892,7 @@ The `FibMemo` constructor has three components:
The instance definition and the accessor functions are fairly
straightforward; strongly resembling those we have seen before.

```
```Haskell
instance Functor FibOp where
fmap f (FibLog s x) = FibLog s $ f x
fmap f (FibMemo n m c) = FibMemo n m $ \y -> f (c y)
Expand Down
Loading

0 comments on commit 37c51c3

Please sign in to comment.