Skip to content

Commit

Permalink
simplifying sync with sender
Browse files Browse the repository at this point in the history
  • Loading branch information
kazu-yamamoto committed Oct 17, 2024
1 parent ed663cc commit 620687b
Show file tree
Hide file tree
Showing 6 changed files with 230 additions and 209 deletions.
97 changes: 56 additions & 41 deletions Network/HTTP2/Client/Run.hs
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,10 @@ run cconf@ClientConfig{..} conf client = do
{ auxPossibleClientStreams = possibleClientStream ctx
}
clientCore ctx req processResponse = do
strm <- sendRequest conf ctx scheme authority req
(strm, moutobj) <- makeStream ctx scheme authority req
case moutobj of
Nothing -> return ()
Just outobj -> sendRequest conf ctx strm outobj
rsp <- getResponse strm
x <- processResponse rsp
adjustRxWindow ctx strm
Expand All @@ -109,7 +112,10 @@ runIO cconf@ClientConfig{..} conf@Config{..} action = do
ctx@Context{..} <- setup cconf conf
let putB bs = enqueueControl controlQ $ CFrames Nothing [bs]
putR req = do
strm <- sendRequest conf ctx scheme authority req
(strm, moutobj) <- makeStream ctx scheme authority req
case moutobj of
Nothing -> return ()
Just outobj -> sendRequest conf ctx strm outobj
return (streamNumber strm, strm)
get = getResponse
create = openOddStreamWait ctx
Expand Down Expand Up @@ -165,23 +171,22 @@ runH2 conf ctx runClient = do
Left () -> do
wait runningClient

sendRequest
:: Config
-> Context
makeStream
:: Context
-> Scheme
-> Authority
-> Request
-> IO Stream
sendRequest conf ctx@Context{..} scheme auth (Request req) = do
-> IO (Stream, Maybe OutObj)
makeStream ctx@Context{..} scheme auth (Request req) = do
-- Checking push promises
let hdr0 = outObjHeaders req
method = fromMaybe (error "sendRequest:method") $ lookup ":method" hdr0
path = fromMaybe (error "sendRequest:path") $ lookup ":path" hdr0
method = fromMaybe (error "makeStream:method") $ lookup ":method" hdr0
path = fromMaybe (error "makeStream:path") $ lookup ":path" hdr0
mstrm0 <- lookupEvenCache evenStreamTable method path
case mstrm0 of
Just strm0 -> do
deleteEvenCache evenStreamTable method path
return strm0
return (strm0, Nothing)
Nothing -> do
-- Arch/Sender is originally implemented for servers where
-- the ordering of responses can be out-of-order.
Expand All @@ -200,38 +205,48 @@ sendRequest conf ctx@Context{..} scheme auth (Request req) = do
| otherwise = hdr1
req' = req{outObjHeaders = hdr2}
-- FLOW CONTROL: SETTINGS_MAX_CONCURRENT_STREAMS: send: respecting peer's limit
(sid, newstrm) <- openOddStreamWait ctx
sendHeaderBody conf ctx sid newstrm req'
return newstrm
(_sid, newstrm) <- openOddStreamWait ctx
return (newstrm, Just req')

sendHeaderBody :: Config -> Context -> StreamId -> Stream -> OutObj -> IO ()
sendHeaderBody Config{..} ctx@Context{..} sid newstrm OutObj{..} = do
(mnext, mtbq) <- case outObjBody of
OutBodyNone -> return (Nothing, Nothing)
OutBodyFile (FileSpec path fileoff bytecount) -> do
(pread, sentinel) <- confPositionReadMaker path
let next = fillFileBodyGetNext pread fileoff bytecount sentinel
return (Just next, Nothing)
OutBodyBuilder builder -> do
let next = fillBuilderBodyGetNext builder
return (Just next, Nothing)
OutBodyStreaming strmbdy -> do
q <- sendStreaming ctx newstrm $ \iface ->
outBodyUnmask iface $ strmbdy (outBodyPush iface) (outBodyFlush iface)
let next = nextForStreaming q
return (Just next, Just q)
OutBodyStreamingIface strmbdy -> do
q <- sendStreaming ctx newstrm strmbdy
let next = nextForStreaming q
return (Just next, Just q)
((var, sync), out) <-
prepareSync newstrm (OHeader outObjHeaders mnext outObjTrailers) mtbq
atomically $ do
sidOK <- readTVar outputQStreamID
check (sidOK == sid)
enqueueOutputSTM outputQ out
writeTVar outputQStreamID (sid + 2)
forkManaged threadManager "H2 worker" $ syncWithSender ctx newstrm var sync
sendRequest :: Config -> Context -> Stream -> OutObj -> IO ()
sendRequest Config{..} ctx@Context{..} strm OutObj{..} =
forkManaged threadManager label $ do
let sid = streamNumber strm
(mnext, mtbq) <- case outObjBody of
OutBodyNone -> return (Nothing, Nothing)
OutBodyFile (FileSpec path fileoff bytecount) -> do
(pread, sentinel) <- confPositionReadMaker path
let next = fillFileBodyGetNext pread fileoff bytecount sentinel
return (Just next, Nothing)
OutBodyBuilder builder -> do
let next = fillBuilderBodyGetNext builder
return (Just next, Nothing)
OutBodyStreaming strmbdy -> do
q <- sendStreaming ctx strm $ \iface ->
outBodyUnmask iface $ strmbdy (outBodyPush iface) (outBodyFlush iface)
let next = nextForStreaming q
return (Just next, Just q)
OutBodyStreamingIface strmbdy -> do
q <- sendStreaming ctx strm strmbdy
let next = nextForStreaming q
return (Just next, Just q)
let ot = (OHeader outObjHeaders mnext outObjTrailers)
(var, out) <- makeOutput strm ot
atomically $ do
sidOK <- readTVar outputQStreamID
check (sidOK == sid)
writeTVar outputQStreamID (sid + 2)
enqueueOutputSTM outputQ out
tovar <- newTVarIO False
let lc =
LoopCheck
{ lcTBQ = mtbq
, lcTimeout = tovar
, lcWindow = streamTxFlow strm
}
syncWithSender' ctx var lc
where
label = "H2 request sender for stream " ++ show (streamNumber strm)

sendStreaming
:: Context
Expand Down
97 changes: 45 additions & 52 deletions Network/HTTP2/H2/Sender.hs
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ frameSender
x <- atomically $ dequeue off
case x of
C ctl -> flushN off >> control ctl >> loop 0
O out -> outputOrEnqueueAgain out off >>= flushIfNecessary >>= loop
O out -> outputAndSync out off >>= flushIfNecessary >>= loop
Flush -> flushN off >> loop 0

-- Flush the connection buffer to the socket, where the first 'n' bytes of
Expand Down Expand Up @@ -139,29 +139,31 @@ frameSender
Just siz -> setLimitForEncoding siz encodeDynamicTable

----------------------------------------------------------------
outputOrEnqueueAgain :: Output -> Offset -> IO Offset
outputOrEnqueueAgain out@(Output strm otyp sync) off = E.handle (\e -> resetStream strm InternalError e >> return off) $ do
-- INVARIANT
--
-- Both the stream window and the connection window are open.
----------------------------------------------------------------
outputAndSync :: Output -> Offset -> IO Offset
outputAndSync out@(Output strm otyp sync) off = E.handle (\e -> resetStream strm InternalError e >> return off) $ do
state <- readStreamState strm
if isHalfClosedLocal state
then return off
else case otyp of
OHeader hdr mnext tlrmkr ->
-- Send headers immediately, without waiting for data
-- No need to check the streaming window (applies to DATA frames only)
outputHeader strm hdr mnext tlrmkr sync off
OHeader hdr mnext tlrmkr -> do
(off', mout') <- outputHeader strm hdr mnext tlrmkr sync off
case mout' of
Nothing -> sync Done
Just out' -> sync $ Cont out'
return off'
_ -> do
-- The 'sync' function usage constraints hold here: We
-- just popped off the only 'Output' for this stream,
-- and we only enqueue a new output (in 'output') if
-- 'sync' returns 'True'
ok <- sync $ Just otyp
if ok
then do
sws <- getStreamWindowSize strm
cws <- getConnectionWindowSize ctx -- not 0
let lim = min cws sws
output out off lim
else return off
sws <- getStreamWindowSize strm
cws <- getConnectionWindowSize ctx -- not 0
let lim = min cws sws
(off', mout') <- output out off lim
case mout' of
Nothing -> sync Done
Just out' -> sync $ Cont out'
return off'

resetStream :: Stream -> ErrorCode -> E.SomeException -> IO ()
resetStream strm err e = do
Expand All @@ -175,9 +177,9 @@ frameSender
-> [Header]
-> Maybe DynaNext
-> TrailersMaker
-> (Maybe OutputType -> IO Bool)
-> (Sync -> IO ())
-> Offset
-> IO Offset
-> IO (Offset, Maybe Output)
outputHeader strm hdr mnext tlrmkr sync off0 = do
-- Header frame and Continuation frame
let sid = streamNumber strm
Expand All @@ -186,19 +188,19 @@ frameSender
off' <- headerContinue sid ths endOfStream off0
-- halfClosedLocal calls closed which removes
-- the stream from stream table.
when endOfStream $ do
halfClosedLocal ctx strm Finished
void $ sync Nothing
off <- flushIfNecessary off'
case mnext of
Nothing -> return off
Nothing -> do
-- endOfStream
halfClosedLocal ctx strm Finished
return (off, Nothing)
Just next -> do
let out' = Output strm (ONext next tlrmkr) sync
outputOrEnqueueAgain out' off
return (off, Just out')

----------------------------------------------------------------
output :: Output -> Offset -> WindowSize -> IO Offset
output out@(Output strm (ONext curr tlrmkr) sync) off0 lim = do
output :: Output -> Offset -> WindowSize -> IO (Offset, Maybe Output)
output out@(Output strm (ONext curr tlrmkr) _) off0 lim = do
-- Data frame payload
buflim <- readIORef outputBufferLimit
let payloadOff = off0 + frameHeaderLength
Expand All @@ -208,13 +210,12 @@ frameSender
case next of
Next datPayloadLen reqflush mnext -> do
NextTrailersMaker tlrmkr' <- runTrailersMaker tlrmkr datBuf datPayloadLen
fillDataHeaderEnqueueNext
fillDataHeader
strm
off0
datPayloadLen
mnext
tlrmkr'
sync
out
reqflush
CancelNext mErr -> do
Expand All @@ -233,15 +234,14 @@ frameSender
resetStream strm InternalError err
Nothing ->
resetStream strm Cancel (E.toException CancelledStream)
return off0
output (Output strm (OPush ths pid) sync) off0 _lim = do
return (off0, Nothing)
output (Output strm (OPush ths pid) _) off0 _lim = do
-- Creating a push promise header
-- Frame id should be associated stream id from the client.
let sid = streamNumber strm
len <- pushPromise pid sid ths off0
off <- flushIfNecessary $ off0 + frameHeaderLength + len
_ <- sync Nothing
return off
return (off, Nothing)
output _ _ _ = undefined -- never reached

----------------------------------------------------------------
Expand Down Expand Up @@ -285,23 +285,21 @@ frameSender
continue off' ths' FrameContinuation

----------------------------------------------------------------
fillDataHeaderEnqueueNext
fillDataHeader
:: Stream
-> Offset
-> Int
-> Maybe DynaNext
-> (Maybe ByteString -> IO NextTrailersMaker)
-> (Maybe OutputType -> IO Bool)
-> Output
-> Bool
-> IO Offset
fillDataHeaderEnqueueNext
-> IO (Offset, Maybe Output)
fillDataHeader
strm@Stream{streamNumber}
off
datPayloadLen
Nothing
tlrmkr
sync
_
reqflush = do
let buf = confWriteBuffer `plusPtr` off
Expand All @@ -321,41 +319,37 @@ frameSender
else
return off
off'' <- handleTrailers mtrailers off'
_ <- sync Nothing
halfClosedLocal ctx strm Finished
if reqflush
then do
flushN off''
return 0
else return off''
return (0, Nothing)
else return (off'', Nothing)
where
handleTrailers Nothing off0 = return off0
handleTrailers (Just trailers) off0 = do
(ths, _) <- toTokenHeaderTable trailers
headerContinue streamNumber ths True {- endOfStream -} off0
fillDataHeaderEnqueueNext
fillDataHeader
_
off
0
(Just next)
tlrmkr
_
out
reqflush = do
let out' = out{outputType = ONext next tlrmkr}
enqueueOutput outputQ out'
if reqflush
then do
flushN off
return 0
else return off
fillDataHeaderEnqueueNext
return (0, Just out')
else return (off, Just out')
fillDataHeader
strm@Stream{streamNumber}
off
datPayloadLen
(Just next)
tlrmkr
_
out
reqflush = do
let buf = confWriteBuffer `plusPtr` off
Expand All @@ -364,12 +358,11 @@ frameSender
fillFrameHeader FrameData datPayloadLen streamNumber flag buf
decreaseWindowSize ctx strm datPayloadLen
let out' = out{outputType = ONext next tlrmkr}
enqueueOutput outputQ out'
if reqflush
then do
flushN off'
return 0
else return off'
return (0, Just out')
else return (off', Just out')

----------------------------------------------------------------
pushPromise :: StreamId -> StreamId -> TokenHeaderList -> Offset -> IO Int
Expand Down
Loading

0 comments on commit 620687b

Please sign in to comment.