Skip to content

Commit

Permalink
Add support for spin_until_timeout (take 2)
Browse files Browse the repository at this point in the history
Co-authored-by: Hubert Liberacki <[email protected]>
Co-authored-by: Tomoya Fujita <[email protected]>
Signed-off-by: Hubert Liberacki <[email protected]>
Signed-off-by: Tomoya Fujita <[email protected]>
Signed-off-by: Christophe Bedard <[email protected]>
  • Loading branch information
3 people committed Apr 2, 2024
1 parent f9c4894 commit 22223f1
Show file tree
Hide file tree
Showing 29 changed files with 468 additions and 322 deletions.
6 changes: 3 additions & 3 deletions rclcpp/include/rclcpp/client.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -608,11 +608,11 @@ class Client : public ClientBase
/// Send a request to the service server.
/**
* This method returns a `FutureAndRequestId` instance
* that can be passed to Executor::spin_until_future_complete() to
* that can be passed to Executor::spin_until_complete() to
* wait until it has been completed.
*
* If the future never completes,
* e.g. the call to Executor::spin_until_future_complete() times out,
* e.g. the call to Executor::spin_until_complete() times out,
* Client::remove_pending_request() must be called to clean the client internal state.
* Not doing so will make the `Client` instance to use more memory each time a response is not
* received from the service server.
Expand All @@ -621,7 +621,7 @@ class Client : public ClientBase
* auto future = client->async_send_request(my_request);
* if (
* rclcpp::FutureReturnCode::TIMEOUT ==
* executor->spin_until_future_complete(future, timeout))
* executor->spin_until_complete(future, timeout))
* {
* client->remove_pending_request(future);
* // handle timeout
Expand Down
143 changes: 97 additions & 46 deletions rclcpp/include/rclcpp/executor.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
#include <memory>
#include <mutex>
#include <string>
#include <type_traits>
#include <vector>

#include "rcl/guard_condition.h"
Expand Down Expand Up @@ -350,6 +351,51 @@ class Executor
virtual void
spin_once(std::chrono::nanoseconds timeout = std::chrono::nanoseconds(-1));

/// Spin (blocking) until the condition is complete, it times out waiting,
/// or rclcpp is interrupted.
/**
* \param[in] future The condition which can be callable or future type to wait on.
* If this function returns SUCCESS, the future can be
* accessed without blocking (though it may still throw an exception).
* \param[in] timeout Optional timeout parameter, which gets passed to Executor::spin_node_once.
* `-1` is block forever, `0` is non-blocking.
* If the time spent inside the blocking loop exceeds this timeout, return a TIMEOUT return
* code.
* \return The return code, one of `SUCCESS`, `INTERRUPTED`, or `TIMEOUT`.
*/
template<typename ConditionT, typename DurationT = std::chrono::milliseconds>
FutureReturnCode
spin_until_complete(
const ConditionT & condition,
DurationT timeout = DurationT(-1))
{
if constexpr (std::is_invocable_v<ConditionT>) {
using RetT = std::invoke_result_t<ConditionT>;
static_assert(
std::is_same_v<bool, RetT>,
"Conditional callable has to return boolean type");
return spin_until_complete_impl(condition, timeout);
} else {
auto check_future = [&condition]() {
return condition.wait_for(std::chrono::seconds(0)) ==
std::future_status::ready;
};
return spin_until_complete_impl(check_future, timeout);
}
}

/// Spin (blocking) for at least the given amount of duration.
/**
* \param[in] duration gets passed to Executor::spin_node_once,
* spins the executor for given duration.
*/
template<typename DurationT>
void
spin_for(DurationT duration)
{
(void)spin_until_complete([]() {return false;}, duration);
}

/// Spin (blocking) until the future is complete, it times out waiting, or rclcpp is interrupted.
/**
* \param[in] future The future to wait on. If this function returns SUCCESS, the future can be
Expand All @@ -361,57 +407,13 @@ class Executor
* \return The return code, one of `SUCCESS`, `INTERRUPTED`, or `TIMEOUT`.
*/
template<typename FutureT, typename TimeRepT = int64_t, typename TimeT = std::milli>
[[deprecated("use spin_until_complete(const ConditionT & condition, DurationT timeout) instead")]]
FutureReturnCode
spin_until_future_complete(
const FutureT & future,
std::chrono::duration<TimeRepT, TimeT> timeout = std::chrono::duration<TimeRepT, TimeT>(-1))
{
// TODO(wjwwood): does not work recursively; can't call spin_node_until_future_complete
// inside a callback executed by an executor.

// Check the future before entering the while loop.
// If the future is already complete, don't try to spin.
std::future_status status = future.wait_for(std::chrono::seconds(0));
if (status == std::future_status::ready) {
return FutureReturnCode::SUCCESS;
}

auto end_time = std::chrono::steady_clock::now();
std::chrono::nanoseconds timeout_ns = std::chrono::duration_cast<std::chrono::nanoseconds>(
timeout);
if (timeout_ns > std::chrono::nanoseconds::zero()) {
end_time += timeout_ns;
}
std::chrono::nanoseconds timeout_left = timeout_ns;

if (spinning.exchange(true)) {
throw std::runtime_error("spin_until_future_complete() called while already spinning");
}
RCPPUTILS_SCOPE_EXIT(this->spinning.store(false); );
while (rclcpp::ok(this->context_) && spinning.load()) {
// Do one item of work.
spin_once_impl(timeout_left);

// Check if the future is set, return SUCCESS if it is.
status = future.wait_for(std::chrono::seconds(0));
if (status == std::future_status::ready) {
return FutureReturnCode::SUCCESS;
}
// If the original timeout is < 0, then this is blocking, never TIMEOUT.
if (timeout_ns < std::chrono::nanoseconds::zero()) {
continue;
}
// Otherwise check if we still have time to wait, return TIMEOUT if not.
auto now = std::chrono::steady_clock::now();
if (now >= end_time) {
return FutureReturnCode::TIMEOUT;
}
// Subtract the elapsed time from the original timeout.
timeout_left = std::chrono::duration_cast<std::chrono::nanoseconds>(end_time - now);
}

// The future did not complete before ok() returned false, return INTERRUPTED.
return FutureReturnCode::INTERRUPTED;
return spin_until_complete(future, timeout);
}

/// Cancel any running spin* function, causing it to return.
Expand Down Expand Up @@ -570,6 +572,55 @@ class Executor
virtual void
spin_once_impl(std::chrono::nanoseconds timeout);

protected:
// Implementation details, used by spin_until_complete and spin_for.
// Previous implementation of spin_until_future_complete.
template<typename ConditionT, typename DurationT>
FutureReturnCode
spin_until_complete_impl(ConditionT condition, DurationT timeout)
{
auto end_time = std::chrono::steady_clock::now();
std::chrono::nanoseconds timeout_ns = std::chrono::duration_cast<std::chrono::nanoseconds>(
timeout);
if (timeout_ns > std::chrono::nanoseconds::zero()) {
end_time += timeout_ns;
}
std::chrono::nanoseconds timeout_left = timeout_ns;

// Preliminary check, finish if conditon is done already.
if (condition()) {
return FutureReturnCode::SUCCESS;
}

if (spinning.exchange(true)) {
throw std::runtime_error("spin_until_complete() called while already spinning");
}
RCPPUTILS_SCOPE_EXIT(this->spinning.store(false); );
while (rclcpp::ok(this->context_) && spinning.load()) {
// Do one item of work.
spin_once_impl(timeout_left);

if (condition()) {
return FutureReturnCode::SUCCESS;
}
// If the original timeout is < 0, then this is blocking, never TIMEOUT.
if (timeout_ns < std::chrono::nanoseconds::zero()) {
continue;
}
// Otherwise check if we still have time to wait, return TIMEOUT if not.
auto now = std::chrono::steady_clock::now();
if (now >= end_time) {
return FutureReturnCode::TIMEOUT;
}
// Subtract the elapsed time from the original timeout.
timeout_left = std::chrono::duration_cast<std::chrono::nanoseconds>(end_time - now);
}

// The condition did not pass before ok() returned false, return INTERRUPTED.
return FutureReturnCode::INTERRUPTED;
}

public:
/// Waitable containing guard conditions controlling the executor flow.
/**
* This waitable contains the interrupt and shutdown guard condition, as well
Expand Down
95 changes: 86 additions & 9 deletions rclcpp/include/rclcpp/executors.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,50 @@ namespace executors
using rclcpp::executors::MultiThreadedExecutor;
using rclcpp::executors::SingleThreadedExecutor;

/// Spin (blocking) until the conditon is complete, it times out waiting, or rclcpp is interrupted.
/**
* \param[in] executor The executor which will spin the node.
* \param[in] node_ptr The node to spin.
* \param[in] condition The callable or future to wait on. If `SUCCESS`, the condition is safe to
* access after this function
* \param[in] timeout Optional timeout parameter, which gets passed to
* Executor::spin_node_once.
* `-1` is block forever, `0` is non-blocking.
* If the time spent inside the blocking loop exceeds this timeout, return a `TIMEOUT` return code.
* \return The return code, one of `SUCCESS`, `INTERRUPTED`, or `TIMEOUT`.
*/
template<typename ConditionT, typename DurationT = std::chrono::milliseconds>
rclcpp::FutureReturnCode
spin_node_until_complete(
rclcpp::Executor & executor,
rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr,
const ConditionT & condition,
DurationT timeout = DurationT(-1))
{
// TODO(wjwwood): does not work recursively; can't call spin_node_until_complete
// inside a callback executed by an executor.
executor.add_node(node_ptr);
auto retcode = executor.spin_until_complete(condition, timeout);
executor.remove_node(node_ptr);
return retcode;
}

template<typename NodeT = rclcpp::Node, typename ConditionT,
typename DurationT = std::chrono::milliseconds>
rclcpp::FutureReturnCode
spin_node_until_complete(
rclcpp::Executor & executor,
std::shared_ptr<NodeT> node_ptr,
const ConditionT & condition,
DurationT timeout = DurationT(-1))
{
return rclcpp::executors::spin_node_until_complete(
executor,
node_ptr->get_node_base_interface(),
condition,
timeout);
}

/// Spin (blocking) until the future is complete, it times out waiting, or rclcpp is interrupted.
/**
* \param[in] executor The executor which will spin the node.
Expand All @@ -80,31 +124,34 @@ using rclcpp::executors::SingleThreadedExecutor;
* \return The return code, one of `SUCCESS`, `INTERRUPTED`, or `TIMEOUT`.
*/
template<typename FutureT, typename TimeRepT = int64_t, typename TimeT = std::milli>
[[deprecated(
"use spin_node_until_complete(Executor &, node_interfaces::NodeBaseInterface::SharedPtr, "
"const ConditionT &, DurationT) instead"
)]]
rclcpp::FutureReturnCode
spin_node_until_future_complete(
rclcpp::Executor & executor,
rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr,
const FutureT & future,
std::chrono::duration<TimeRepT, TimeT> timeout = std::chrono::duration<TimeRepT, TimeT>(-1))
{
// TODO(wjwwood): does not work recursively; can't call spin_node_until_future_complete
// inside a callback executed by an executor.
executor.add_node(node_ptr);
auto retcode = executor.spin_until_future_complete(future, timeout);
executor.remove_node(node_ptr);
return retcode;
return spin_until_complete(executor, node_ptr, future, timeout);
}

template<typename NodeT = rclcpp::Node, typename FutureT, typename TimeRepT = int64_t,
typename TimeT = std::milli>
[[deprecated(
"use spin_node_until_complete(Executor &, std::shared_ptr<NodeT>, "
"const ConditionT &, DurationT) instead"
)]]
rclcpp::FutureReturnCode
spin_node_until_future_complete(
rclcpp::Executor & executor,
std::shared_ptr<NodeT> node_ptr,
const FutureT & future,
std::chrono::duration<TimeRepT, TimeT> timeout = std::chrono::duration<TimeRepT, TimeT>(-1))
{
return rclcpp::executors::spin_node_until_future_complete(
return rclcpp::executors::spin_node_until_complete(
executor,
node_ptr->get_node_base_interface(),
future,
Expand All @@ -113,26 +160,56 @@ spin_node_until_future_complete(

} // namespace executors

template<typename ConditionT, typename DurationT = std::chrono::milliseconds>
rclcpp::FutureReturnCode
spin_until_complete(
rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr,
const ConditionT & condition,
DurationT timeout = DurationT(-1))
{
rclcpp::executors::SingleThreadedExecutor executor;
return executors::spin_node_until_complete<ConditionT>(executor, node_ptr, condition, timeout);
}

template<typename NodeT = rclcpp::Node, typename ConditionT,
typename DurationT = std::chrono::milliseconds>
rclcpp::FutureReturnCode
spin_until_complete(
std::shared_ptr<NodeT> node_ptr,
const ConditionT & condition,
DurationT timeout = DurationT(-1))
{
return rclcpp::spin_until_complete(node_ptr->get_node_base_interface(), condition, timeout);
}

template<typename FutureT, typename TimeRepT = int64_t, typename TimeT = std::milli>
[[deprecated(
"use spin_until_complete(node_interfaces::NodeBaseInterface::SharedPtr, "
"const ConditionT &,DurationT) instead"
)]]
rclcpp::FutureReturnCode
spin_until_future_complete(
rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr,
const FutureT & future,
std::chrono::duration<TimeRepT, TimeT> timeout = std::chrono::duration<TimeRepT, TimeT>(-1))
{
rclcpp::executors::SingleThreadedExecutor executor;
return executors::spin_node_until_future_complete<FutureT>(executor, node_ptr, future, timeout);
return executors::spin_node_until_complete<FutureT>(executor, node_ptr, future, timeout);
}

template<typename NodeT = rclcpp::Node, typename FutureT, typename TimeRepT = int64_t,
typename TimeT = std::milli>
[[deprecated(
"use spin_until_complete(std::shared_ptr<NodeT>, const ConditionT &, "
"DurationT) instead"
)]]
rclcpp::FutureReturnCode
spin_until_future_complete(
std::shared_ptr<NodeT> node_ptr,
const FutureT & future,
std::chrono::duration<TimeRepT, TimeT> timeout = std::chrono::duration<TimeRepT, TimeT>(-1))
{
return rclcpp::spin_until_future_complete(node_ptr->get_node_base_interface(), future, timeout);
return rclcpp::spin_until_complete(node_ptr->get_node_base_interface(), future, timeout);
}

} // namespace rclcpp
Expand Down
2 changes: 1 addition & 1 deletion rclcpp/include/rclcpp/future_return_code.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
namespace rclcpp
{

/// Return codes to be used with spin_until_future_complete.
/// Return codes to be used with spin_until_complete.
/**
* SUCCESS: The future is complete and can be accessed with "get" without blocking.
* This does not indicate that the operation succeeded; "get" may still throw an exception.
Expand Down
6 changes: 3 additions & 3 deletions rclcpp/include/rclcpp/generic_client.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -100,11 +100,11 @@ class GenericClient : public ClientBase
/// Send a request to the service server.
/**
* This method returns a `FutureAndRequestId` instance
* that can be passed to Executor::spin_until_future_complete() to
* that can be passed to Executor::spin_until_complete() to
* wait until it has been completed.
*
* If the future never completes,
* e.g. the call to Executor::spin_until_future_complete() times out,
* e.g. the call to Executor::spin_until_complete() times out,
* GenericClient::remove_pending_request() must be called to clean the client internal state.
* Not doing so will make the `Client` instance to use more memory each time a response is not
* received from the service server.
Expand All @@ -113,7 +113,7 @@ class GenericClient : public ClientBase
* auto future = client->async_send_request(my_request);
* if (
* rclcpp::FutureReturnCode::TIMEOUT ==
* executor->spin_until_future_complete(future, timeout))
* executor->spin_until_complete(future, timeout))
* {
* client->remove_pending_request(future);
* // handle timeout
Expand Down
2 changes: 1 addition & 1 deletion rclcpp/include/rclcpp/rclcpp.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@
* - Executors (responsible for execution of callbacks through a blocking spin):
* - rclcpp::spin()
* - rclcpp::spin_some()
* - rclcpp::spin_until_future_complete()
* - rclcpp::spin_until_complete()
* - rclcpp::executors::SingleThreadedExecutor
* - rclcpp::executors::SingleThreadedExecutor::add_node()
* - rclcpp::executors::SingleThreadedExecutor::spin()
Expand Down
Loading

0 comments on commit 22223f1

Please sign in to comment.