Skip to content

Commit

Permalink
CA-404013: replace Thread.delay with Delay module
Browse files Browse the repository at this point in the history
Reporter.cancel would be blocked for a long time by the backoff delay when
another thread is waiting for the next reading. Replace Thread.delay with the
Delay module so that Reporter.cancel will not be blocked.

Signed-off-by: Gang Ji <[email protected]>
  • Loading branch information
gangj committed Jan 8, 2025
1 parent dc565fd commit 5efa5d0
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 6 deletions.
36 changes: 31 additions & 5 deletions ocaml/xcp-rrdd/lib/plugin/reporter.ml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
* GNU Lesser General Public License for more details.
*)

module Delay = Xapi_stdext_threads.Threadext.Delay

let with_lock = Xapi_stdext_threads.Threadext.Mutex.execute

(* This exception is setup to be raised on sigint by Process.initialise,
Expand Down Expand Up @@ -57,10 +59,20 @@ type state =
| Cancelled
| Stopped of [`New | `Cancelled | `Failed of exn]

type t = {mutable state: state; lock: Mutex.t; condition: Condition.t}
(** Reporter handle.
    - [state]: current lifecycle state; [cancel] sets it to [Cancelled] when
      it was [Running].
    - [lock], [condition]: [cancel] waits on [condition] with [lock] after
      requesting cancellation.
    - [delay]: the sleep used by [wait_until_next_reading] (via [Delay.wait])
      when a reporter is supplied; [cancel] calls [Delay.signal] on it before
      waiting on [condition]. *)
type t = {
mutable state: state
; lock: Mutex.t
; condition: Condition.t
; delay: Delay.t
}

let make () =
{state= Stopped `New; lock= Mutex.create (); condition= Condition.create ()}
{
state= Stopped `New
; lock= Mutex.create ()
; condition= Condition.create ()
; delay= Delay.make ()
}

let choose_protocol = function
| Rrd_interface.V1 ->
Expand All @@ -69,13 +81,20 @@ let choose_protocol = function
Rrd_protocol_v2.protocol

let wait_until_next_reading (module D : Debug.DEBUG) ~neg_shift ~uid ~protocol
~overdue_count =
~overdue_count ~reporter =
let next_reading = RRDD.Plugin.Local.register uid Rrd.Five_Seconds protocol in
let wait_time = next_reading -. neg_shift in
let wait_time = if wait_time < 0.1 then wait_time +. 5. else wait_time in
(* overdue count - 0 if there is no overdue; +1 if there is overdue *)
if wait_time > 0. then (
Thread.delay wait_time ; 0
( match reporter with
| Some reporter ->
let (_ : bool) = Delay.wait reporter.delay wait_time in
()
| None ->
Thread.delay wait_time
) ;
0
) else (
if overdue_count > 1 then (
(* if register returns negative more than once in a succession,
Expand All @@ -84,7 +103,12 @@ let wait_until_next_reading (module D : Debug.DEBUG) ~neg_shift ~uid ~protocol
D.debug
"rrdd says next reading is overdue, seems like rrdd is busy;\n\
\t\t\t\tBacking off for %.1f seconds" backoff_time ;
Thread.delay backoff_time
match reporter with
| Some reporter ->
let (_ : bool) = Delay.wait reporter.delay backoff_time in
()
| None ->
Thread.delay backoff_time
) else
D.debug "rrdd says next reading is overdue by %.1f seconds; not sleeping"
(-.wait_time) ;
Expand Down Expand Up @@ -147,8 +171,10 @@ let cancel ~reporter =
match reporter.state with
| Running ->
reporter.state <- Cancelled ;
Delay.signal reporter.delay ;
Condition.wait reporter.condition reporter.lock
| Cancelled ->
Delay.signal reporter.delay ;
Condition.wait reporter.condition reporter.lock
| Stopped _ ->
()
Expand Down
2 changes: 1 addition & 1 deletion ocaml/xcp-rrdd/lib/plugin/reporter_local.ml
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ let start_local (module D : Debug.DEBUG) ~reporter ~uid ~neg_shift ~page_count
overdue_count :=
wait_until_next_reading
(module D)
~neg_shift ~uid ~protocol ~overdue_count:!overdue_count ;
~neg_shift ~uid ~protocol ~overdue_count:!overdue_count ~reporter ;
if page_count > 0 then
let payload =
Rrd_protocol.{timestamp= Utils.now (); datasources= dss_f ()}
Expand Down

0 comments on commit 5efa5d0

Please sign in to comment.