From 94948b3c04ec412be39f889b86d778a06bf31c63 Mon Sep 17 00:00:00 2001 From: Kazu Yamamoto Date: Mon, 4 Sep 2023 14:16:41 +0900 Subject: [PATCH 1/2] cleaning up the finish of client with synchronization --- Network/HTTP2/Arch/Manager.hs | 34 +++++++++++++++++++++++++--------- Network/HTTP2/Arch/Sender.hs | 7 +++++++ Network/HTTP2/Arch/Types.hs | 4 +++- Network/HTTP2/Client/Run.hs | 12 +++++++++--- 4 files changed, 44 insertions(+), 13 deletions(-) diff --git a/Network/HTTP2/Arch/Manager.hs b/Network/HTTP2/Arch/Manager.hs index d071e863..86c7ad86 100644 --- a/Network/HTTP2/Arch/Manager.hs +++ b/Network/HTTP2/Arch/Manager.hs @@ -16,6 +16,9 @@ module Network.HTTP2.Arch.Manager ( , timeoutKillThread , timeoutClose , KilledByHttp2ThreadPoolManager(..) + , incCounter + , decCounter + , waitCounter0 ) where import Control.Exception @@ -41,7 +44,7 @@ noAction = return () data Command = Stop (Maybe SomeException) | Spawn | Add ThreadId | Delete ThreadId -- | Manager to manage the thread pool and the timer. -data Manager = Manager (TQueue Command) (IORef Action) T.Manager +data Manager = Manager (TQueue Command) (IORef Action) (TVar Int) T.Manager -- | Starting a thread pool manager. -- Its action is initially set to 'return ()' and should be set @@ -51,8 +54,9 @@ start :: T.Manager -> IO Manager start timmgr = do q <- newTQueueIO ref <- newIORef noAction + cnt <- newTVarIO 0 void $ forkIO $ go q Set.empty ref - return $ Manager q ref timmgr + return $ Manager q ref cnt timmgr where go q tset0 ref = do x <- atomically $ readTQueue q @@ -72,11 +76,11 @@ start timmgr = do -- | Setting the action to be spawned. setAction :: Manager -> Action -> IO () -setAction (Manager _ ref _) action = writeIORef ref action +setAction (Manager _ ref _ _) action = writeIORef ref action -- | Stopping the manager. stopAfter :: Manager -> IO a -> (Either SomeException a -> IO b) -> IO b -stopAfter (Manager q _ _) action cleanup = do +stopAfter (Manager q _ _ _) action cleanup = do mask $ \unmask -> do ma <- try $ unmask action atomically $ writeTQueue q $ Stop (either Just (const Nothing) ma) @@ -84,7 +88,7 @@ stopAfter (Manager q _ _) action cleanup = do -- | Spawning the action. spawnAction :: Manager -> IO () -spawnAction (Manager q _ _) = atomically $ writeTQueue q Spawn +spawnAction (Manager q _ _ _) = atomically $ writeTQueue q Spawn ---------------------------------------------------------------- @@ -110,7 +114,7 @@ forkManagedUnmask mgr io = -- -- This is not part of the public API; see 'forkManaged' instead. addMyId :: Manager -> IO () -addMyId (Manager q _ _) = do +addMyId (Manager q _ _ _) = do tid <- myThreadId atomically $ writeTQueue q $ Add tid @@ -120,7 +124,7 @@ addMyId (Manager q _ _) = do -- the manager /before/ the thread terminates (thereby assuming responsibility -- for thread cleanup yourself). deleteMyId :: Manager -> IO () -deleteMyId (Manager q _ _) = do +deleteMyId (Manager q _ _ _) = do tid <- myThreadId atomically $ writeTQueue q $ Delete tid @@ -141,14 +145,14 @@ kill set err = traverse_ (\tid -> E.throwTo tid $ KilledByHttp2ThreadPoolManager -- | Killing the IO action of the second argument on timeout. timeoutKillThread :: Manager -> (T.Handle -> IO ()) -> IO () -timeoutKillThread (Manager _ _ tmgr) action = E.bracket register T.cancel action +timeoutKillThread (Manager _ _ _ tmgr) action = E.bracket register T.cancel action where register = T.registerKillThread tmgr noAction -- | Registering closer for a resource and -- returning a timer refresher. timeoutClose :: Manager -> IO () -> IO (IO ()) -timeoutClose (Manager _ _ tmgr) closer = do +timeoutClose (Manager _ _ _ tmgr) closer = do th <- T.register tmgr closer return $ T.tickle th @@ -159,3 +163,15 @@ instance Exception KilledByHttp2ThreadPoolManager where toException = asyncExceptionToException fromException = asyncExceptionFromException +---------------------------------------------------------------- + +incCounter :: Manager -> IO () +incCounter (Manager _ _ cnt _) = atomically $ modifyTVar' cnt (+1) + +decCounter :: Manager -> IO () +decCounter (Manager _ _ cnt _) = atomically $ modifyTVar' cnt (subtract 1) + +waitCounter0 :: Manager -> IO () +waitCounter0 (Manager _ _ cnt _) = atomically $ do + n <- readTVar cnt + checkSTM (n < 1) diff --git a/Network/HTTP2/Arch/Sender.hs b/Network/HTTP2/Arch/Sender.hs index 782c2be8..9df9d69f 100644 --- a/Network/HTTP2/Arch/Sender.hs +++ b/Network/HTTP2/Arch/Sender.hs @@ -11,6 +11,7 @@ module Network.HTTP2.Arch.Sender ( , runTrailersMaker ) where +import Control.Concurrent.MVar (putMVar) import qualified Data.ByteString as BS import Data.ByteString.Builder (Builder) import qualified Data.ByteString.Builder.Extra as B @@ -106,6 +107,12 @@ frameSender ctx@Context{outputQ,controlQ,encodeDynamicTable,outputBufferLimit} -- called with off == 0 control :: Control -> IO () control (CFinish e) = E.throwIO e + control (CGoaway bs mvar) = do + buf <- copyAll [bs] confWriteBuffer + let off = buf `minusPtr` confWriteBuffer + flushN off + putMVar mvar () + E.throwIO GoAwayIsSent control (CFrames ms xs) = do buf <- copyAll xs confWriteBuffer let off = buf `minusPtr` confWriteBuffer diff --git a/Network/HTTP2/Arch/Types.hs b/Network/HTTP2/Arch/Types.hs index 430af9f7..a8e34833 100644 --- a/Network/HTTP2/Arch/Types.hs +++ b/Network/HTTP2/Arch/Types.hs @@ -278,8 +278,9 @@ data Next = Next BytesFilled -- payload length ---------------------------------------------------------------- -data Control = CFinish HTTP2Error +data Control = CFinish HTTP2Error | CFrames (Maybe SettingsList) [ByteString] + | CGoaway ByteString (MVar ()) ---------------------------------------------------------------- @@ -304,6 +305,7 @@ data HTTP2Error = | StreamErrorIsReceived ErrorCode StreamId | StreamErrorIsSent ErrorCode StreamId ReasonPhrase | BadThingHappen E.SomeException + | GoAwayIsSent deriving (Show, Typeable) instance E.Exception HTTP2Error diff --git a/Network/HTTP2/Client/Run.hs b/Network/HTTP2/Client/Run.hs index d91b4109..821c2dec 100644 --- a/Network/HTTP2/Client/Run.hs +++ b/Network/HTTP2/Client/Run.hs @@ -34,10 +34,13 @@ run ClientConfig{..} conf@Config{..} client = do runSender = frameSender ctx conf mgr concurrently_ runReceiver runSender exchangeSettings conf ctx + mvar <- newMVar () let runClient = do x <- client $ sendRequest ctx mgr scheme authority + waitCounter0 mgr let frame = goawayFrame 0 NoError "graceful closing" - enqueueControl (controlQ ctx) $ CFrames Nothing [frame] + enqueueControl (controlQ ctx) $ CGoaway frame mvar + takeMVar mvar return x stopAfter mgr (race runBackgroundThreads runClient) $ \res -> do closeAllStreams (streamTable ctx) $ either Just (const Nothing) res @@ -98,8 +101,11 @@ sendRequest ctx@Context{..} mgr scheme auth (Request req) processResponse = do writeTBQueue tbq (StreamingBuilder b) writeTVar tbqNonEmpty True flush = atomically $ writeTBQueue tbq StreamingFlush - strmbdy unmask push flush - atomically $ writeTBQueue tbq StreamingFinished + finished = do + atomically $ writeTBQueue tbq StreamingFinished + decCounter mgr + incCounter mgr + strmbdy unmask push flush `finally` finished atomically $ do sidOK <- readTVar outputQStreamID ready <- readTVar tbqNonEmpty From 287e8f695a6604990cd97f67f5630212e0bdc380 Mon Sep 17 00:00:00 2001 From: Kazu Yamamoto Date: Thu, 14 Sep 2023 17:54:33 +0900 Subject: [PATCH 2/2] call decCounter when StreamingFinished is received --- Network/HTTP2/Arch/Sender.hs | 4 +++- Network/HTTP2/Arch/Types.hs | 2 +- Network/HTTP2/Client/Run.hs | 4 +--- Network/HTTP2/Server/Worker.hs | 5 +++-- 4 files changed, 8 insertions(+), 7 deletions(-) diff --git a/Network/HTTP2/Arch/Sender.hs b/Network/HTTP2/Arch/Sender.hs index 9df9d69f..5fc6792c 100644 --- a/Network/HTTP2/Arch/Sender.hs +++ b/Network/HTTP2/Arch/Sender.hs @@ -438,7 +438,9 @@ runStreamBuilder buf0 room0 takeQ = loop buf0 room0 0 B.More _ writer -> return (True, total', False, LOne writer) B.Chunk bs writer -> return (True, total', False, LTwo bs writer) Just StreamingFlush -> return (True, total, True, LZero) - Just StreamingFinished -> return (False, total, True, LZero) + Just (StreamingFinished dec) -> do + dec + return (False, total, True, LZero) fillBufStream :: Leftover -> IO (Maybe StreamingChunk) -> DynaNext fillBufStream leftover0 takeQ buf0 siz0 lim0 = do diff --git a/Network/HTTP2/Arch/Types.hs b/Network/HTTP2/Arch/Types.hs index a8e34833..06b8635a 100644 --- a/Network/HTTP2/Arch/Types.hs +++ b/Network/HTTP2/Arch/Types.hs @@ -284,7 +284,7 @@ data Control = CFinish HTTP2Error ---------------------------------------------------------------- -data StreamingChunk = StreamingFinished +data StreamingChunk = StreamingFinished (IO ()) | StreamingFlush | StreamingBuilder Builder diff --git a/Network/HTTP2/Client/Run.hs b/Network/HTTP2/Client/Run.hs index 821c2dec..2c4f2c24 100644 --- a/Network/HTTP2/Client/Run.hs +++ b/Network/HTTP2/Client/Run.hs @@ -101,9 +101,7 @@ sendRequest ctx@Context{..} mgr scheme auth (Request req) processResponse = do writeTBQueue tbq (StreamingBuilder b) writeTVar tbqNonEmpty True flush = atomically $ writeTBQueue tbq StreamingFlush - finished = do - atomically $ writeTBQueue tbq StreamingFinished - decCounter mgr + finished = atomically $ writeTBQueue tbq $ StreamingFinished (decCounter mgr) incCounter mgr strmbdy unmask push flush `finally` finished atomically $ do diff --git a/Network/HTTP2/Server/Worker.hs b/Network/HTTP2/Server/Worker.hs index 3d031a3c..7b3b4f09 100644 --- a/Network/HTTP2/Server/Worker.hs +++ b/Network/HTTP2/Server/Worker.hs @@ -141,8 +141,9 @@ response wc@WorkerConf{..} mgr th tconf strm (Request req) (Response rsp) pps = atomically $ writeTBQueue tbq (StreamingBuilder b) T.resume th flush = atomically $ writeTBQueue tbq StreamingFlush - strmbdy push flush - atomically $ writeTBQueue tbq StreamingFinished + finished = atomically $ writeTBQueue tbq $ StreamingFinished (decCounter mgr) + incCounter mgr + strmbdy push flush `E.finally` finished -- Remove the thread's ID from the manager's queue, to ensure the that the -- manager will not terminate it before we are done. (The thread ID was -- added implicitly when the worker was spawned by the manager).