-
Notifications
You must be signed in to change notification settings - Fork 0
/
CBCAST.hs
167 lines (154 loc) · 6.01 KB
/
CBCAST.hs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
{-# OPTIONS_GHC "-Wno-unused-imports" #-} -- LH needs bodies of reflected definitions
-- | External CBCAST client functions which do not require Liquid Haskell for
-- correctness.
module CBCAST
( -- * Initialization
newProcess
-- * State transitions
, receive
, deliver
, broadcast
-- * Types
, CB.PID
, CB.Process(), CB.pVC, CB.pID, CB.pDQ
, CB.Message(), CB.mVC, CB.mSender, CB.mRaw
) where
import Text.Printf (printf)
import Control.Arrow (first)
import qualified Redefined
import qualified CBCAST.Transitions
import qualified CBCAST.Core as CB
import qualified CBCAST.Step as CB
import CBCAST.Generic -- FIXME
-- $setup
-- >>> import Control.Concurrent.STM
-- >>> import CBCAST.Core (Process)
-- >>> let m1 = CB.Message [0,1,0] 1 "hello!"
-- >>> let m2 = CB.Message [0,2,0] 1 "world!"
-- >>> let mLongVC = CB.Message [0,1,0,0] 1 "hello!"
-- >>> let mSamePID = CB.Message [0,0,1] 2 "hello!"
-- >>> sendToCluster _ = return ()
isNat :: Int -> Bool
isNat n = 0 <= n
{-@ inline isNat @-}
isFin :: Redefined.Fin -> Int -> Bool
isFin x n = x < n
{-@ inline isFin @-}
expectedNat :: String -> String
expectedNat = printf "`%s` must be a `Nat`"
expectedFin :: String -> Int -> String
expectedFin = printf "`%s` must be a `Fin %d`"
-- | @newProcess n pid :: Process r@ creates a new CBCAST process with
-- identifier @pid@, for a cluster with @n@ participants, exchanging messages
-- of type @r@.
--
-- >>> newProcess 3 2
-- Right (Process {pVC = [0,0,0], pID = 2, pDQ = [], pHist = []})
--
-- Both @n@ and @pid@ must be /Nat/s. Additionally @pid@ must be a /Fin @n@/.
--
-- >>> newProcess (-3) 2
-- Left "<newProcess n:-3 pid:2>: `n` must be a `Nat`"
-- >>> newProcess 3 (-2)
-- Left "<newProcess n:3 pid:-2>: `pid` must be a `Nat`"
-- >>> newProcess 3 8
-- Left "<newProcess n:3 pid:8>: `pid` must be a `Fin 3`"
--
newProcess :: Int -> CB.PID -> Either String (CB.Process r)
newProcess n pid
| not $ isNat n = Left $ prefix ++ expectedNat "n"
| not $ isNat pid = Left $ prefix ++ expectedNat "pid"
| not $ isFin pid n = Left $ prefix ++ expectedFin "pid" n
| otherwise = Right $ CB.pEmpty n pid
where
prefix = printf "<newProcess n:%d pid:%d>: " n pid
-- | Receive state transition. Call this for messages that arrive from the
-- network, to insert them in the delay queue for later delivery.
--
-- >>> let Right p = newProcess 3 2
-- >>> receive m1 p
-- Right (Process {pVC = [0,0,0], pID = 2, pDQ = [Message {mVC = [0,1,0], mSender = 1, mRaw = "hello!"}], pHist = []})
--
-- The vector clock size of the message must match the vector clock size of the
-- process.
--
-- >>> receive mLongVC p
-- Left "<receive m p>: `messageSize m:4` must equal `processSize p:3`"
--
-- Messages with the sender ID of the current process are ignored (see
-- 'broadcast' to learn how to handle messages from the current process).
--
-- >>> receive mSamePID p
-- Left "<receive m p>: `mSender m:2` must be distinct from `pID p:2`"
--
receive :: CB.Message r -> CB.Process r -> Either String (CB.Process r)
receive m p
| not $ CB.messageSize m == CB.processSize p = Left $ printf "<receive m p>: `messageSize m:%d` must equal `processSize p:%d`" (CB.messageSize m) (CB.processSize p)
| CB.mSender m == CB.pID p = Left $ printf "<receive m p>: `mSender m:%d` must be distinct from `pID p:%d`" (CB.mSender m) (CB.pID p)
| otherwise = let CB.ResultReceive _n ret = CB.step (CB.OpReceive n m) p in Right ret
where
n = CB.messageSize m
-- | Deliver state-transition. Call this to check for and return a deliverable
-- message from the delay queue (updating the internal vector clock and history
-- as appropriate).
--
-- >>> let Right p = receive m1 =<< newProcess 3 2
-- >>> deliver p
-- (Just (Message {mVC = [0,1,0], mSender = 1, mRaw = "hello!"}),Process {pVC = [0,1,0], pID = 2, pDQ = [], pHist = [...]})
--
-- When a message is returned, it should be immediately processed by the user
-- application. This should be followed by repeated delivery attempts until
-- nothing is deliverable.
--
-- >>> let Right p = receive m1 =<< newProcess 3 2
-- >>> procVar <- newTVarIO p
-- >>> appVar <- newTVarIO ([] :: [String]) -- Processed commands
-- >>> :{
-- atomically $ do
-- message <- stateTVar procVar deliver
-- maybe retry (\m -> modifyTVar appVar (CB.mRaw m :)) message
-- :}
--
-- >>> readTVarIO appVar
-- ["hello!"]
--
-- When no message in the delay queue is deliverable, @deliver p@ will return
-- @p@ unchanged.
--
-- >>> let Right p = receive m2 =<< newProcess 3 2
-- >>> deliver p
-- (Nothing,Process {pVC = [0,0,0], pID = 2, pDQ = [Message {mVC = [0,2,0], mSender = 1, mRaw = "world!"}], pHist = []})
--
deliver :: CB.Process r -> (Maybe (CB.Message r), CB.Process r)
deliver p = maybe (Nothing, p) (first Just) ret
where
n = CB.processSize p `const` ("Proof that processSize returns a Nat", Redefined.listLength $ CB.pVC p)
CB.ResultDeliver _n ret = CB.step (CB.OpDeliver n) p
-- | Broadcast state transition. Call this to prepare a message for broadcast.
--
-- >>> let Right p = newProcess 3 2
-- >>> broadcast "hooray!" p
-- (Message {mVC = [0,0,1], mSender = 2, mRaw = "hooray!"},Process {pVC = [0,0,1], pID = 2, pDQ = [], pHist = [...]})
--
-- The returned message must be immediately processed by the user application,
-- and then sent on the network to all members of the cluster.
--
-- >>> let Right p = newProcess 3 2
-- >>> procVar <- newTVarIO (p :: Process String)
-- >>> appVar <- newTVarIO ([] :: [String]) -- Processed commands
-- >>> :{
-- do m <- atomically $ do
-- message <- stateTVar procVar $ broadcast "hooray!"
-- modifyTVar appVar (CB.mRaw message :)
-- return message
-- sendToCluster m
-- :}
--
-- >>> readTVarIO appVar
-- ["hooray!"]
--
broadcast :: r -> CB.Process r -> (CB.Message r, CB.Process r)
broadcast raw p = ret
where
n = CB.processSize p `const` ("Proof that processSize returns a Nat", Redefined.listLength $ CB.pVC p)
CB.ResultBroadcast _n ret = CB.step (CB.OpBroadcast n raw) p