Skip to content

Commit

Permalink
Add timeout on termination child states.
Browse files Browse the repository at this point in the history
This is to handle cases where child states refuse to terminate. Providing a timeout value allows the concurrence to return that long after a termination event (preempt or child completion) has happened. If the argument is ommitted from the init method then the default behaviour (no timeout) is used.
  • Loading branch information
hawesie committed Apr 23, 2015
1 parent 4feb518 commit d403a20
Showing 1 changed file with 37 additions and 7 deletions.
44 changes: 37 additions & 7 deletions smach/src/smach/concurrence.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import threading
import traceback
import copy
import math
from contextlib import contextmanager

import smach
Expand Down Expand Up @@ -51,7 +52,8 @@ def __init__(self,
output_keys = [],
outcome_map = {},
outcome_cb = None,
child_termination_cb = None
child_termination_cb = None,
termination_timeout = 0,
):
"""Constructor for smach Concurrent Split.
Expand Down Expand Up @@ -128,6 +130,11 @@ def __init__(self,
B{NOTE: This callback should be a function ONLY of the outcomes of
the child states. It should not access any other resources.}
@type termination_timeout: number
@param termination_timeout: The number of seconds to wait for a child
state to complete after a preemption request has been received. The
default value of 0 indicates no timeout.
"""
smach.container.Container.__init__(self, outcomes, input_keys, output_keys)

Expand Down Expand Up @@ -177,6 +184,8 @@ def __init__(self,
self._done_cond = threading.Condition()
self._ready_event = threading.Event()

self._termination_timeout = termination_timeout

### Construction methods
@staticmethod
def add(label, state, remapping={}):
Expand Down Expand Up @@ -251,12 +260,33 @@ def execute(self, parent_ud = smach.UserData()):
self._states[label].request_preempt()

# Wait for all states to terminate
while not smach.is_shutdown():
if all([not t.isAlive() for t in self._threads.values()]):
break
self._done_cond.acquire()
self._done_cond.wait(0.1)
self._done_cond.release()

#
if self._termination_timeout > 0:
wait_secs = 0.1
loop_count = 0
loop_count_threshold = math.ceil(self._termination_timeout / wait_secs)
while not smach.is_shutdown() and loop_count < loop_count_threshold:
if all([not t.isAlive() for t in self._threads.values()]):
break
self._done_cond.acquire()
self._done_cond.wait(wait_secs)
self._done_cond.release()
loop_count += 1

if loop_count >= loop_count_threshold:
for l,t in self._threads.iteritems():
if t.isAlive():
smach.logwarn("State '%s' in concurrence did not terminate within timeout." % l)

else:

while not smach.is_shutdown():
if all([not t.isAlive() for t in self._threads.values()]):
break
self._done_cond.acquire()
self._done_cond.wait(0.1)
self._done_cond.release()

# Check for user code exception
if self._user_code_exception:
Expand Down

0 comments on commit d403a20

Please sign in to comment.