-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathMain.hs
226 lines (200 loc) · 7.87 KB
/
Main.hs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
import Control.Applicative (pure)
import Control.Concurrent (forkIO, threadDelay)
import Control.Concurrent.STM (atomically)
import Control.Concurrent.STM.TChan (TChan, newTChan, readTChan, writeTChan,
isEmptyTChan)
import Control.Exception (try, SomeException)
import Control.Monad (forever, forM_, liftM)
import qualified Data.ByteString as BS
import Data.Function (on)
import qualified Data.Map as M
import Data.Maybe (catMaybes)
import qualified Network.Socket as S
import qualified Network.Socket.ByteString as SB
import System.Environment (getArgs)
import System.IO (openFile, hSetBuffering, hGetLine, hPutStrLn,
IOMode (..), BufferMode(..), Handle)
import System.Timeout (timeout)
import System.Time (getClockTime, ClockTime(TOD))
import Text.Printf (printf)
data Message = NewRxConnection S.SockAddr
| LostRxConnection
| NewTxConnection
| LostTxConnection
| DataReceived Int
| DataSent Int
| Tick
deriving (Show)
data TxConnMessage = TxEstablished
| TxLost
| TxFailed
data LogMessage = LogString String | LogStats BinStats deriving (Show)
data Stats = Stats
{ sRxBytes :: !Integer
, sTxBytes :: !Integer
} deriving Show
data Bin = Bin
{ bTicks :: !Int
, bMaxTicks :: !Int
, bPrevStats :: !Stats
} deriving Show
data BinStats = BinStats
{ bsTicks :: !Int
, bsRxBytes :: !Integer
, bsTxBytes :: !Integer
} deriving Show
blockSize = 2^16
defaultTickPeriodUs = 1000000 :: Int
defaultTicksPerBin = [1, 20, 60, 600, 3600]
main :: IO ()
main = S.withSocketsDo $ do
args <- getArgs
if length args /= 4 then
putStrLn "Usage: Main [tx_threads] [local_port] [remote_ip] [remote_port]"
else do
let threads = read $ args !! 0
localPort = read $ args !! 1
peerPort = read $ args !! 3
peerAddr <- S.inet_addr $ args !! 2
dataChan <- atomically newTChan
logChan <- atomically newTChan
forkIO $ doRx dataChan localPort
forkIO $ doTx dataChan (S.SockAddrInet (fromIntegral peerPort) peerAddr)
threads
forkIO $ gatherStats dataChan logChan
writeLogs logChan
return ()
writeLogs :: TChan LogMessage -> IO ()
writeLogs chan = do
mainLog <- open "main.log"
loop mainLog M.empty
where
open name = do
h <- openFile name AppendMode
hSetBuffering h NoBuffering
return h
loop mainLog tickLogs = do
m <- atomically $ readTChan chan
ut <- liftM fmtClock getClockTime
case m of
LogString s -> do
putStrLn s
hPutStrLn mainLog $ ut ++ " " ++ s
loop mainLog tickLogs
LogStats bw@(BinStats ticks rx tx) -> do
let rx' = 8 * rx `div` (fromIntegral ticks)
tx' = 8 * tx `div` (fromIntegral ticks)
(tickLogs', h) <- getOrOpen tickLogs ticks
putStrLn $ fmtBw bw
hPutStrLn h (ut ++ " " ++ show rx' ++ " " ++ show tx')
loop mainLog tickLogs'
getOrOpen tickLogs ticks =
case M.lookup ticks tickLogs of
Just h -> return (tickLogs, h)
Nothing -> do h <- open $ "bw" ++ show ticks ++ ".log"
return (M.insert ticks h tickLogs, h)
fmtClock (TOD s ps) = printf "%d.%06d" s (ps `div` 10^6)
fmtBw s = let ticks = bsTicks s
fmt x = formatBw (8 * x `div` fromIntegral ticks)
in (fmt $ bsRxBytes s) ++ " " ++ (fmt $ bsTxBytes s)
gatherStats :: TChan Message -> TChan LogMessage -> IO ()
gatherStats chan logChan = do
forkIO $ tick
let stats = Stats 0 0
let bins = map (\n -> Bin 0 n stats) defaultTicksPerBin
processMessages stats bins
where
tick = forever $ do
threadDelay defaultTickPeriodUs
atomically $ writeTChan chan Tick
processMessages stats bins = do
m <- atomically $ readTChan chan
case m of
DataReceived n -> let stats' = stats {sRxBytes = sRxBytes stats +
fromIntegral n}
in processMessages stats' bins
DataSent n -> let stats' = stats {sTxBytes = sTxBytes stats +
fromIntegral n}
in processMessages stats' bins
Tick -> let (bins', diffs) = unzip $ map (updateBin stats) bins
diffs' = catMaybes diffs
in printStats diffs' >> processMessages stats bins'
m -> do atomically $ writeTChan logChan (LogString $ show m)
processMessages stats bins
updateBin stats bin =
let ticks' = (bTicks bin + 1) `rem` bMaxTicks bin
bin' = bin {bTicks = ticks'}
bin'' = bin' {bPrevStats = stats}
bs = BinStats { bsTicks = bMaxTicks bin
, bsRxBytes = ((-) `on` (sRxBytes . bPrevStats)) bin'' bin
, bsTxBytes = ((-) `on` (sTxBytes . bPrevStats)) bin'' bin
}
in case bTicks bin' of
0 -> (bin'', Just bs)
_ -> (bin', Nothing)
printStats stats = forM_ stats $ \s ->
atomically $ writeTChan logChan $ LogStats s
doRx :: TChan Message -> Int -> IO ()
doRx chan portno = do
sock <- setup portno
process sock
where
setup portno = do
sock <- S.socket S.AF_INET S.Stream S.defaultProtocol
S.setSocketOption sock S.ReuseAddr 1
S.bindSocket sock $ S.SockAddrInet (fromIntegral portno) 0
S.listen sock 10
return sock
process sock = forever $ do
(s, addr) <- S.accept sock
sendMsg $ NewRxConnection addr
forkIO $ rx s
rx sock = do
d <- SB.recv sock blockSize
case BS.length d of
0 -> sendMsg LostRxConnection >> S.sClose sock
n -> (sendMsg $ DataReceived n) >> rx sock
sendMsg m = atomically $ writeTChan chan $ m
doTx :: TChan Message -> S.SockAddr -> Int -> IO ()
doTx chan addr num = do
chan' <- atomically newTChan
loop 0 chan'
where
loop nRunning chan' = do
let delay = if nRunning < num then 50000 else -1
m <- timeout delay $ atomically $ readTChan chan'
case m of
Just TxLost -> do
atomically $ writeTChan chan LostTxConnection
loop (nRunning - 1) chan'
Just TxFailed -> do
loop (nRunning - 1) chan'
Just TxEstablished -> do
atomically $ writeTChan chan NewTxConnection
Nothing -> do
forkIO $ txConnection chan'
loop (nRunning + 1) chan'
txConnection chan' = do
sock <- S.socket S.AF_INET S.Stream S.defaultProtocol
r <- try (S.connect sock addr) :: IO (Either SomeException ())
case r of
Right _ -> floodSocket chan' sock
Left _ -> atomically $ writeTChan chan' TxFailed
S.sClose sock
floodSocket chan' sock = do
let bs = BS.replicate blockSize 0
r <- try (SB.send sock bs) :: IO (Either SomeException Int)
case r of
Right n -> do
atomically $ writeTChan chan $ DataSent n
floodSocket chan' sock
Left _ -> atomically $ writeTChan chan' $ TxLost
suffixNames :: [(Integer, String)]
suffixNames = zip (map (1000^) [0..]) (map pure "BKMGT")
formatBw :: Integer -> String
formatBw i = let (mult, name) = findSuffix
val = fromIntegral i / fromIntegral mult :: Double
in printf "%6.1f" val ++ name
where
findSuffix = last $ head suffixNames :
takeWhile ((i > ) .(*10) . fst) suffixNames