From d403a204702f0f1cf9f1c25af41d4a451b5dc4c0 Mon Sep 17 00:00:00 2001 From: Nick Hawes Date: Thu, 23 Apr 2015 15:43:30 +0200 Subject: [PATCH] Add timeout on termination child states. This is to handle cases where child states refuse to terminate. Providing a timeout value allows the concurrence to return that long after a termination event (preempt or child completion) has happened. If the argument is ommitted from the init method then the default behaviour (no timeout) is used. --- smach/src/smach/concurrence.py | 44 ++++++++++++++++++++++++++++------ 1 file changed, 37 insertions(+), 7 deletions(-) diff --git a/smach/src/smach/concurrence.py b/smach/src/smach/concurrence.py index 3e4c7b6..a337b01 100644 --- a/smach/src/smach/concurrence.py +++ b/smach/src/smach/concurrence.py @@ -1,6 +1,7 @@ import threading import traceback import copy +import math from contextlib import contextmanager import smach @@ -51,7 +52,8 @@ def __init__(self, output_keys = [], outcome_map = {}, outcome_cb = None, - child_termination_cb = None + child_termination_cb = None, + termination_timeout = 0, ): """Constructor for smach Concurrent Split. @@ -128,6 +130,11 @@ def __init__(self, B{NOTE: This callback should be a function ONLY of the outcomes of the child states. It should not access any other resources.} + @type termination_timeout: number + @param termination_timeout: The number of seconds to wait for a child + state to complete after a preemption request has been received. The + default value of 0 indicates no timeout. + """ smach.container.Container.__init__(self, outcomes, input_keys, output_keys) @@ -177,6 +184,8 @@ def __init__(self, self._done_cond = threading.Condition() self._ready_event = threading.Event() + self._termination_timeout = termination_timeout + ### Construction methods @staticmethod def add(label, state, remapping={}): @@ -251,12 +260,33 @@ def execute(self, parent_ud = smach.UserData()): self._states[label].request_preempt() # Wait for all states to terminate - while not smach.is_shutdown(): - if all([not t.isAlive() for t in self._threads.values()]): - break - self._done_cond.acquire() - self._done_cond.wait(0.1) - self._done_cond.release() + + # + if self._termination_timeout > 0: + wait_secs = 0.1 + loop_count = 0 + loop_count_threshold = math.ceil(self._termination_timeout / wait_secs) + while not smach.is_shutdown() and loop_count < loop_count_threshold: + if all([not t.isAlive() for t in self._threads.values()]): + break + self._done_cond.acquire() + self._done_cond.wait(wait_secs) + self._done_cond.release() + loop_count += 1 + + if loop_count >= loop_count_threshold: + for l,t in self._threads.iteritems(): + if t.isAlive(): + smach.logwarn("State '%s' in concurrence did not terminate within timeout." % l) + + else: + + while not smach.is_shutdown(): + if all([not t.isAlive() for t in self._threads.values()]): + break + self._done_cond.acquire() + self._done_cond.wait(0.1) + self._done_cond.release() # Check for user code exception if self._user_code_exception: