From 5efa5d066837f995ef1494fb0ebae742f86698b6 Mon Sep 17 00:00:00 2001 From: Gang Ji Date: Tue, 7 Jan 2025 19:27:42 +0800 Subject: [PATCH] CA-404013: replace Thread.delay with Delay module Reporter.cancel would be blocked for a long time by backoff delay when another thread is waiting for next reading, replace Thread.delay with Delay module so that Reporter.cancel will not be blocked. Signed-off-by: Gang Ji --- ocaml/xcp-rrdd/lib/plugin/reporter.ml | 36 ++++++++++++++++++--- ocaml/xcp-rrdd/lib/plugin/reporter_local.ml | 2 +- 2 files changed, 32 insertions(+), 6 deletions(-) diff --git a/ocaml/xcp-rrdd/lib/plugin/reporter.ml b/ocaml/xcp-rrdd/lib/plugin/reporter.ml index 683af78b243..b7c9c018cbd 100644 --- a/ocaml/xcp-rrdd/lib/plugin/reporter.ml +++ b/ocaml/xcp-rrdd/lib/plugin/reporter.ml @@ -12,6 +12,8 @@ * GNU Lesser General Public License for more details. *) +module Delay = Xapi_stdext_threads.Threadext.Delay + let with_lock = Xapi_stdext_threads.Threadext.Mutex.execute (* This exception is setup to be raised on sigint by Process.initialise, @@ -57,10 +59,20 @@ type state = | Cancelled | Stopped of [`New | `Cancelled | `Failed of exn] -type t = {mutable state: state; lock: Mutex.t; condition: Condition.t} +type t = { + mutable state: state + ; lock: Mutex.t + ; condition: Condition.t + ; delay: Delay.t +} let make () = - {state= Stopped `New; lock= Mutex.create (); condition= Condition.create ()} + { + state= Stopped `New + ; lock= Mutex.create () + ; condition= Condition.create () + ; delay= Delay.make () + } let choose_protocol = function | Rrd_interface.V1 -> @@ -69,13 +81,20 @@ let choose_protocol = function Rrd_protocol_v2.protocol let wait_until_next_reading (module D : Debug.DEBUG) ~neg_shift ~uid ~protocol - ~overdue_count = + ~overdue_count ~reporter = let next_reading = RRDD.Plugin.Local.register uid Rrd.Five_Seconds protocol in let wait_time = next_reading -. neg_shift in let wait_time = if wait_time < 0.1 then wait_time +. 5. else wait_time in (* overdue count - 0 if there is no overdue; +1 if there is overdue *) if wait_time > 0. then ( - Thread.delay wait_time ; 0 + ( match reporter with + | Some reporter -> + let (_ : bool) = Delay.wait reporter.delay wait_time in + () + | None -> + Thread.delay wait_time + ) ; + 0 ) else ( if overdue_count > 1 then ( (* if register returns negative more than once in a succession, @@ -84,7 +103,12 @@ let wait_until_next_reading (module D : Debug.DEBUG) ~neg_shift ~uid ~protocol D.debug "rrdd says next reading is overdue, seems like rrdd is busy;\n\ \t\t\t\tBacking off for %.1f seconds" backoff_time ; - Thread.delay backoff_time + match reporter with + | Some reporter -> + let (_ : bool) = Delay.wait reporter.delay backoff_time in + () + | None -> + Thread.delay backoff_time ) else D.debug "rrdd says next reading is overdue by %.1f seconds; not sleeping" (-.wait_time) ; @@ -147,8 +171,10 @@ let cancel ~reporter = match reporter.state with | Running -> reporter.state <- Cancelled ; + Delay.signal reporter.delay ; Condition.wait reporter.condition reporter.lock | Cancelled -> + Delay.signal reporter.delay ; Condition.wait reporter.condition reporter.lock | Stopped _ -> () diff --git a/ocaml/xcp-rrdd/lib/plugin/reporter_local.ml b/ocaml/xcp-rrdd/lib/plugin/reporter_local.ml index 42955b5ae1f..fec90d1dc8e 100644 --- a/ocaml/xcp-rrdd/lib/plugin/reporter_local.ml +++ b/ocaml/xcp-rrdd/lib/plugin/reporter_local.ml @@ -28,7 +28,7 @@ let start_local (module D : Debug.DEBUG) ~reporter ~uid ~neg_shift ~page_count overdue_count := wait_until_next_reading (module D) - ~neg_shift ~uid ~protocol ~overdue_count:!overdue_count ; + ~neg_shift ~uid ~protocol ~overdue_count:!overdue_count ~reporter ; if page_count > 0 then let payload = Rrd_protocol.{timestamp= Utils.now (); datasources= dss_f ()}