Skip to content

Commit

Permalink
Merge pull request #89 from kazu-yamamoto/client-sync-threads
Browse files Browse the repository at this point in the history
Client sync threads
  • Loading branch information
kazu-yamamoto authored Sep 14, 2023
2 parents 560f26d + 287e8f6 commit 6d7636f
Show file tree
Hide file tree
Showing 5 changed files with 49 additions and 17 deletions.
34 changes: 25 additions & 9 deletions Network/HTTP2/Arch/Manager.hs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@ module Network.HTTP2.Arch.Manager (
, timeoutKillThread
, timeoutClose
, KilledByHttp2ThreadPoolManager(..)
, incCounter
, decCounter
, waitCounter0
) where

import Control.Exception
Expand All @@ -41,7 +44,7 @@ noAction = return ()
data Command = Stop (Maybe SomeException) | Spawn | Add ThreadId | Delete ThreadId

-- | Manager to manage the thread pool and the timer.
data Manager = Manager (TQueue Command) (IORef Action) T.Manager
data Manager = Manager (TQueue Command) (IORef Action) (TVar Int) T.Manager

-- | Starting a thread pool manager.
-- Its action is initially set to 'return ()' and should be set
Expand All @@ -51,8 +54,9 @@ start :: T.Manager -> IO Manager
start timmgr = do
q <- newTQueueIO
ref <- newIORef noAction
cnt <- newTVarIO 0
void $ forkIO $ go q Set.empty ref
return $ Manager q ref timmgr
return $ Manager q ref cnt timmgr
where
go q tset0 ref = do
x <- atomically $ readTQueue q
Expand All @@ -72,19 +76,19 @@ start timmgr = do

-- | Setting the action to be spawned.
setAction :: Manager -> Action -> IO ()
setAction (Manager _ ref _) action = writeIORef ref action
setAction (Manager _ ref _ _) action = writeIORef ref action

-- | Stopping the manager.
stopAfter :: Manager -> IO a -> (Either SomeException a -> IO b) -> IO b
stopAfter (Manager q _ _) action cleanup = do
stopAfter (Manager q _ _ _) action cleanup = do
mask $ \unmask -> do
ma <- try $ unmask action
atomically $ writeTQueue q $ Stop (either Just (const Nothing) ma)
cleanup ma

-- | Spawning the action.
spawnAction :: Manager -> IO ()
spawnAction (Manager q _ _) = atomically $ writeTQueue q Spawn
spawnAction (Manager q _ _ _) = atomically $ writeTQueue q Spawn

----------------------------------------------------------------

Expand All @@ -110,7 +114,7 @@ forkManagedUnmask mgr io =
--
-- This is not part of the public API; see 'forkManaged' instead.
addMyId :: Manager -> IO ()
addMyId (Manager q _ _) = do
addMyId (Manager q _ _ _) = do
tid <- myThreadId
atomically $ writeTQueue q $ Add tid

Expand All @@ -120,7 +124,7 @@ addMyId (Manager q _ _) = do
-- the manager /before/ the thread terminates (thereby assuming responsibility
-- for thread cleanup yourself).
deleteMyId :: Manager -> IO ()
deleteMyId (Manager q _ _) = do
deleteMyId (Manager q _ _ _) = do
tid <- myThreadId
atomically $ writeTQueue q $ Delete tid

Expand All @@ -141,14 +145,14 @@ kill set err = traverse_ (\tid -> E.throwTo tid $ KilledByHttp2ThreadPoolManager

-- | Killing the IO action of the second argument on timeout.
timeoutKillThread :: Manager -> (T.Handle -> IO ()) -> IO ()
timeoutKillThread (Manager _ _ tmgr) action = E.bracket register T.cancel action
timeoutKillThread (Manager _ _ _ tmgr) action = E.bracket register T.cancel action
where
register = T.registerKillThread tmgr noAction

-- | Registering closer for a resource and
-- returning a timer refresher.
timeoutClose :: Manager -> IO () -> IO (IO ())
timeoutClose (Manager _ _ tmgr) closer = do
timeoutClose (Manager _ _ _ tmgr) closer = do
th <- T.register tmgr closer
return $ T.tickle th

Expand All @@ -159,3 +163,15 @@ instance Exception KilledByHttp2ThreadPoolManager where
toException = asyncExceptionToException
fromException = asyncExceptionFromException

----------------------------------------------------------------

incCounter :: Manager -> IO ()
incCounter (Manager _ _ cnt _) = atomically $ modifyTVar' cnt (+1)

decCounter :: Manager -> IO ()
decCounter (Manager _ _ cnt _) = atomically $ modifyTVar' cnt (subtract 1)

waitCounter0 :: Manager -> IO ()
waitCounter0 (Manager _ _ cnt _) = atomically $ do
n <- readTVar cnt
checkSTM (n < 1)
11 changes: 10 additions & 1 deletion Network/HTTP2/Arch/Sender.hs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ module Network.HTTP2.Arch.Sender (
, runTrailersMaker
) where

import Control.Concurrent.MVar (putMVar)
import qualified Data.ByteString as BS
import Data.ByteString.Builder (Builder)
import qualified Data.ByteString.Builder.Extra as B
Expand Down Expand Up @@ -106,6 +107,12 @@ frameSender ctx@Context{outputQ,controlQ,encodeDynamicTable,outputBufferLimit}
-- called with off == 0
control :: Control -> IO ()
control (CFinish e) = E.throwIO e
control (CGoaway bs mvar) = do
buf <- copyAll [bs] confWriteBuffer
let off = buf `minusPtr` confWriteBuffer
flushN off
putMVar mvar ()
E.throwIO GoAwayIsSent
control (CFrames ms xs) = do
buf <- copyAll xs confWriteBuffer
let off = buf `minusPtr` confWriteBuffer
Expand Down Expand Up @@ -431,7 +438,9 @@ runStreamBuilder buf0 room0 takeQ = loop buf0 room0 0
B.More _ writer -> return (True, total', False, LOne writer)
B.Chunk bs writer -> return (True, total', False, LTwo bs writer)
Just StreamingFlush -> return (True, total, True, LZero)
Just StreamingFinished -> return (False, total, True, LZero)
Just (StreamingFinished dec) -> do
dec
return (False, total, True, LZero)

fillBufStream :: Leftover -> IO (Maybe StreamingChunk) -> DynaNext
fillBufStream leftover0 takeQ buf0 siz0 lim0 = do
Expand Down
6 changes: 4 additions & 2 deletions Network/HTTP2/Arch/Types.hs
Original file line number Diff line number Diff line change
Expand Up @@ -278,12 +278,13 @@ data Next = Next BytesFilled -- payload length

----------------------------------------------------------------

data Control = CFinish HTTP2Error
data Control = CFinish HTTP2Error
| CFrames (Maybe SettingsList) [ByteString]
| CGoaway ByteString (MVar ())

----------------------------------------------------------------

data StreamingChunk = StreamingFinished
data StreamingChunk = StreamingFinished (IO ())
| StreamingFlush
| StreamingBuilder Builder

Expand All @@ -304,6 +305,7 @@ data HTTP2Error =
| StreamErrorIsReceived ErrorCode StreamId
| StreamErrorIsSent ErrorCode StreamId ReasonPhrase
| BadThingHappen E.SomeException
| GoAwayIsSent
deriving (Show, Typeable)

instance E.Exception HTTP2Error
Expand Down
10 changes: 7 additions & 3 deletions Network/HTTP2/Client/Run.hs
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,13 @@ run ClientConfig{..} conf@Config{..} client = do
runSender = frameSender ctx conf mgr
concurrently_ runReceiver runSender
exchangeSettings conf ctx
mvar <- newMVar ()
let runClient = do
x <- client $ sendRequest ctx mgr scheme authority
waitCounter0 mgr
let frame = goawayFrame 0 NoError "graceful closing"
enqueueControl (controlQ ctx) $ CFrames Nothing [frame]
enqueueControl (controlQ ctx) $ CGoaway frame mvar
takeMVar mvar
return x
stopAfter mgr (race runBackgroundThreads runClient) $ \res -> do
closeAllStreams (streamTable ctx) $ either Just (const Nothing) res
Expand Down Expand Up @@ -98,8 +101,9 @@ sendRequest ctx@Context{..} mgr scheme auth (Request req) processResponse = do
writeTBQueue tbq (StreamingBuilder b)
writeTVar tbqNonEmpty True
flush = atomically $ writeTBQueue tbq StreamingFlush
strmbdy unmask push flush
atomically $ writeTBQueue tbq StreamingFinished
finished = atomically $ writeTBQueue tbq $ StreamingFinished (decCounter mgr)
incCounter mgr
strmbdy unmask push flush `finally` finished
atomically $ do
sidOK <- readTVar outputQStreamID
ready <- readTVar tbqNonEmpty
Expand Down
5 changes: 3 additions & 2 deletions Network/HTTP2/Server/Worker.hs
Original file line number Diff line number Diff line change
Expand Up @@ -141,8 +141,9 @@ response wc@WorkerConf{..} mgr th tconf strm (Request req) (Response rsp) pps =
atomically $ writeTBQueue tbq (StreamingBuilder b)
T.resume th
flush = atomically $ writeTBQueue tbq StreamingFlush
strmbdy push flush
atomically $ writeTBQueue tbq StreamingFinished
finished = atomically $ writeTBQueue tbq $ StreamingFinished (decCounter mgr)
incCounter mgr
strmbdy push flush `E.finally` finished
-- Remove the thread's ID from the manager's queue, to ensure the that the
-- manager will not terminate it before we are done. (The thread ID was
-- added implicitly when the worker was spawned by the manager).
Expand Down

0 comments on commit 6d7636f

Please sign in to comment.