Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Client sync threads #89

Merged
merged 2 commits into from
Sep 14, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 25 additions & 9 deletions Network/HTTP2/Arch/Manager.hs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@ module Network.HTTP2.Arch.Manager (
, timeoutKillThread
, timeoutClose
, KilledByHttp2ThreadPoolManager(..)
, incCounter
, decCounter
, waitCounter0
) where

import Control.Exception
Expand All @@ -41,7 +44,7 @@ noAction = return ()
data Command = Stop (Maybe SomeException) | Spawn | Add ThreadId | Delete ThreadId

-- | Manager to manage the thread pool and the timer.
data Manager = Manager (TQueue Command) (IORef Action) T.Manager
data Manager = Manager (TQueue Command) (IORef Action) (TVar Int) T.Manager

-- | Starting a thread pool manager.
-- Its action is initially set to 'return ()' and should be set
Expand All @@ -51,8 +54,9 @@ start :: T.Manager -> IO Manager
start timmgr = do
q <- newTQueueIO
ref <- newIORef noAction
cnt <- newTVarIO 0
void $ forkIO $ go q Set.empty ref
return $ Manager q ref timmgr
return $ Manager q ref cnt timmgr
where
go q tset0 ref = do
x <- atomically $ readTQueue q
Expand All @@ -72,19 +76,19 @@ start timmgr = do

-- | Setting the action to be spawned.
setAction :: Manager -> Action -> IO ()
setAction (Manager _ ref _) action = writeIORef ref action
setAction (Manager _ ref _ _) action = writeIORef ref action

-- | Stopping the manager.
stopAfter :: Manager -> IO a -> (Either SomeException a -> IO b) -> IO b
stopAfter (Manager q _ _) action cleanup = do
stopAfter (Manager q _ _ _) action cleanup = do
mask $ \unmask -> do
ma <- try $ unmask action
atomically $ writeTQueue q $ Stop (either Just (const Nothing) ma)
cleanup ma

-- | Spawning the action.
spawnAction :: Manager -> IO ()
spawnAction (Manager q _ _) = atomically $ writeTQueue q Spawn
spawnAction (Manager q _ _ _) = atomically $ writeTQueue q Spawn

----------------------------------------------------------------

Expand All @@ -110,7 +114,7 @@ forkManagedUnmask mgr io =
--
-- This is not part of the public API; see 'forkManaged' instead.
addMyId :: Manager -> IO ()
addMyId (Manager q _ _) = do
addMyId (Manager q _ _ _) = do
tid <- myThreadId
atomically $ writeTQueue q $ Add tid

Expand All @@ -120,7 +124,7 @@ addMyId (Manager q _ _) = do
-- the manager /before/ the thread terminates (thereby assuming responsibility
-- for thread cleanup yourself).
deleteMyId :: Manager -> IO ()
deleteMyId (Manager q _ _) = do
deleteMyId (Manager q _ _ _) = do
tid <- myThreadId
atomically $ writeTQueue q $ Delete tid

Expand All @@ -141,14 +145,14 @@ kill set err = traverse_ (\tid -> E.throwTo tid $ KilledByHttp2ThreadPoolManager

-- | Killing the IO action of the second argument on timeout.
timeoutKillThread :: Manager -> (T.Handle -> IO ()) -> IO ()
timeoutKillThread (Manager _ _ tmgr) action = E.bracket register T.cancel action
timeoutKillThread (Manager _ _ _ tmgr) action = E.bracket register T.cancel action
where
register = T.registerKillThread tmgr noAction

-- | Registering closer for a resource and
-- returning a timer refresher.
timeoutClose :: Manager -> IO () -> IO (IO ())
timeoutClose (Manager _ _ tmgr) closer = do
timeoutClose (Manager _ _ _ tmgr) closer = do
th <- T.register tmgr closer
return $ T.tickle th

Expand All @@ -159,3 +163,15 @@ instance Exception KilledByHttp2ThreadPoolManager where
toException = asyncExceptionToException
fromException = asyncExceptionFromException

----------------------------------------------------------------

incCounter :: Manager -> IO ()
incCounter (Manager _ _ cnt _) = atomically $ modifyTVar' cnt (+1)

decCounter :: Manager -> IO ()
decCounter (Manager _ _ cnt _) = atomically $ modifyTVar' cnt (subtract 1)

waitCounter0 :: Manager -> IO ()
waitCounter0 (Manager _ _ cnt _) = atomically $ do
n <- readTVar cnt
checkSTM (n < 1)
11 changes: 10 additions & 1 deletion Network/HTTP2/Arch/Sender.hs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ module Network.HTTP2.Arch.Sender (
, runTrailersMaker
) where

import Control.Concurrent.MVar (putMVar)
import qualified Data.ByteString as BS
import Data.ByteString.Builder (Builder)
import qualified Data.ByteString.Builder.Extra as B
Expand Down Expand Up @@ -106,6 +107,12 @@ frameSender ctx@Context{outputQ,controlQ,encodeDynamicTable,outputBufferLimit}
-- called with off == 0
control :: Control -> IO ()
control (CFinish e) = E.throwIO e
control (CGoaway bs mvar) = do
buf <- copyAll [bs] confWriteBuffer
let off = buf `minusPtr` confWriteBuffer
flushN off
putMVar mvar ()
E.throwIO GoAwayIsSent
control (CFrames ms xs) = do
buf <- copyAll xs confWriteBuffer
let off = buf `minusPtr` confWriteBuffer
Expand Down Expand Up @@ -431,7 +438,9 @@ runStreamBuilder buf0 room0 takeQ = loop buf0 room0 0
B.More _ writer -> return (True, total', False, LOne writer)
B.Chunk bs writer -> return (True, total', False, LTwo bs writer)
Just StreamingFlush -> return (True, total, True, LZero)
Just StreamingFinished -> return (False, total, True, LZero)
Just (StreamingFinished dec) -> do
dec
return (False, total, True, LZero)

fillBufStream :: Leftover -> IO (Maybe StreamingChunk) -> DynaNext
fillBufStream leftover0 takeQ buf0 siz0 lim0 = do
Expand Down
6 changes: 4 additions & 2 deletions Network/HTTP2/Arch/Types.hs
Original file line number Diff line number Diff line change
Expand Up @@ -278,12 +278,13 @@ data Next = Next BytesFilled -- payload length

----------------------------------------------------------------

data Control = CFinish HTTP2Error
data Control = CFinish HTTP2Error
| CFrames (Maybe SettingsList) [ByteString]
| CGoaway ByteString (MVar ())

----------------------------------------------------------------

data StreamingChunk = StreamingFinished
data StreamingChunk = StreamingFinished (IO ())
| StreamingFlush
| StreamingBuilder Builder

Expand All @@ -304,6 +305,7 @@ data HTTP2Error =
| StreamErrorIsReceived ErrorCode StreamId
| StreamErrorIsSent ErrorCode StreamId ReasonPhrase
| BadThingHappen E.SomeException
| GoAwayIsSent
deriving (Show, Typeable)

instance E.Exception HTTP2Error
Expand Down
10 changes: 7 additions & 3 deletions Network/HTTP2/Client/Run.hs
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,13 @@ run ClientConfig{..} conf@Config{..} client = do
runSender = frameSender ctx conf mgr
concurrently_ runReceiver runSender
exchangeSettings conf ctx
mvar <- newMVar ()
let runClient = do
x <- client $ sendRequest ctx mgr scheme authority
waitCounter0 mgr
let frame = goawayFrame 0 NoError "graceful closing"
enqueueControl (controlQ ctx) $ CFrames Nothing [frame]
enqueueControl (controlQ ctx) $ CGoaway frame mvar
takeMVar mvar
return x
stopAfter mgr (race runBackgroundThreads runClient) $ \res -> do
closeAllStreams (streamTable ctx) $ either Just (const Nothing) res
Expand Down Expand Up @@ -98,8 +101,9 @@ sendRequest ctx@Context{..} mgr scheme auth (Request req) processResponse = do
writeTBQueue tbq (StreamingBuilder b)
writeTVar tbqNonEmpty True
flush = atomically $ writeTBQueue tbq StreamingFlush
strmbdy unmask push flush
atomically $ writeTBQueue tbq StreamingFinished
finished = atomically $ writeTBQueue tbq $ StreamingFinished (decCounter mgr)
incCounter mgr
strmbdy unmask push flush `finally` finished
atomically $ do
sidOK <- readTVar outputQStreamID
ready <- readTVar tbqNonEmpty
Expand Down
5 changes: 3 additions & 2 deletions Network/HTTP2/Server/Worker.hs
Original file line number Diff line number Diff line change
Expand Up @@ -141,8 +141,9 @@ response wc@WorkerConf{..} mgr th tconf strm (Request req) (Response rsp) pps =
atomically $ writeTBQueue tbq (StreamingBuilder b)
T.resume th
flush = atomically $ writeTBQueue tbq StreamingFlush
strmbdy push flush
atomically $ writeTBQueue tbq StreamingFinished
finished = atomically $ writeTBQueue tbq $ StreamingFinished (decCounter mgr)
incCounter mgr
strmbdy push flush `E.finally` finished
-- Remove the thread's ID from the manager's queue, to ensure the that the
-- manager will not terminate it before we are done. (The thread ID was
-- added implicitly when the worker was spawned by the manager).
Expand Down
Loading