Add spin_once and spin_until_future_complete to Executor

This commit is contained in:
Jackie Kay 2015-11-05 15:26:36 -08:00
parent f4a094afc8
commit 660d762072
4 changed files with 75 additions and 41 deletions

View file

@ -36,6 +36,14 @@ namespace rclcpp
namespace executor
{
/// Return codes to be used with spin_until_future_complete.
/**
* SUCCESS: The future is complete and can be accessed with "get" without blocking.
* INTERRUPTED: The future is not complete, spinning was interrupted by Ctrl-C or another error.
* TIMEOUT: Spinning timed out.
*/
enum FutureReturnCode {SUCCESS, INTERRUPTED, TIMEOUT};
/// Coordinate the order and timing of available communication tasks.
/**
* Executor provides spin functions (including spin_node_once and spin_some).
@ -126,6 +134,52 @@ public:
virtual void
spin_some();
RCLCPP_PUBLIC
virtual void
spin_once(std::chrono::nanoseconds timeout = std::chrono::nanoseconds(-1));
/// Spin (blocking) until the future is complete, it times out waiting, or rclcpp is interrupted.
/**
* \param[in] executor The executor which will spin the node.
* \param[in] node_ptr The node to spin.
* \param[in] future The future to wait on. If SUCCESS, the future is safe to access after this function
* \param[in] timeout Optional timeout parameter, which gets passed to Executor::spin_node_once.
-1 is block forever, 0 is non-blocking.
If the time spent inside the blocking loop exceeds this timeout, return a TIMEOUT return code.
* \return The return code, one of SUCCESS, INTERRUPTED, or TIMEOUT.
*/
template<typename ResponseT, typename TimeT = std::milli>
FutureReturnCode
spin_until_future_complete(
std::shared_future<ResponseT> & future,
std::chrono::duration<int64_t, TimeT> timeout = std::chrono::duration<int64_t, TimeT>(-1))
{
// TODO(wjwwood): does not work recursively; can't call spin_node_until_future_complete
// inside a callback executed by an executor.
// Check the future before entering the while loop.
// If the future is already complete, don't try to spin.
std::future_status status = future.wait_for(std::chrono::seconds(0));
auto start_time = std::chrono::system_clock::now();
while (status != std::future_status::ready && rclcpp::utilities::ok()) {
spin_once(timeout);
if (timeout.count() >= 0) {
if (start_time + timeout < std::chrono::system_clock::now()) {
return TIMEOUT;
}
}
status = future.wait_for(std::chrono::seconds(0));
}
// If the future completed, and we weren't interrupted by ctrl-C, return the response
if (status == std::future_status::ready) {
return FutureReturnCode::SUCCESS;
}
return FutureReturnCode::INTERRUPTED;
}
/// Support dynamic switching of the memory strategy.
/**
* Switching the memory strategy while the executor is spinning in another threading could have

View file

@ -44,14 +44,6 @@ namespace executors
using rclcpp::executors::multi_threaded_executor::MultiThreadedExecutor;
using rclcpp::executors::single_threaded_executor::SingleThreadedExecutor;
/// Return codes to be used with spin_until_future_complete.
/**
* SUCCESS: The future is complete and can be accessed with "get" without blocking.
* INTERRUPTED: The future is not complete, spinning was interrupted by Ctrl-C or another error.
* TIMEOUT: Spinning timed out.
*/
enum FutureReturnCode {SUCCESS, INTERRUPTED, TIMEOUT};
/// Spin (blocking) until the future is complete, it times out waiting, or rclcpp is interrupted.
/**
* \param[in] executor The executor which will spin the node.
@ -63,7 +55,7 @@ enum FutureReturnCode {SUCCESS, INTERRUPTED, TIMEOUT};
* \return The return code, one of SUCCESS, INTERRUPTED, or TIMEOUT.
*/
template<typename ResponseT, typename TimeT = std::milli>
FutureReturnCode
rclcpp::executor::FutureReturnCode
spin_node_until_future_complete(
rclcpp::executor::Executor & executor, rclcpp::node::Node::SharedPtr node_ptr,
std::shared_future<ResponseT> & future,
@ -71,34 +63,16 @@ spin_node_until_future_complete(
{
// TODO(wjwwood): does not work recursively; can't call spin_node_until_future_complete
// inside a callback executed by an executor.
// Check the future before entering the while loop.
// If the future is already complete, don't try to spin.
std::future_status status = future.wait_for(std::chrono::seconds(0));
auto start_time = std::chrono::system_clock::now();
while (status != std::future_status::ready && rclcpp::utilities::ok()) {
executor.spin_node_once(node_ptr, timeout);
if (timeout.count() >= 0) {
if (start_time + timeout < std::chrono::system_clock::now()) {
return TIMEOUT;
}
}
status = future.wait_for(std::chrono::seconds(0));
}
// If the future completed, and we weren't interrupted by ctrl-C, return the response
if (status == std::future_status::ready) {
return FutureReturnCode::SUCCESS;
}
return FutureReturnCode::INTERRUPTED;
executor.add_node(node_ptr);
auto retcode = executor.spin_until_future_complete(future, timeout);
executor.remove_node(node_ptr);
return retcode;
}
} // namespace executors
template<typename FutureT, typename TimeT = std::milli>
rclcpp::executors::FutureReturnCode
rclcpp::executor::FutureReturnCode
spin_until_future_complete(
node::Node::SharedPtr node_ptr, std::shared_future<FutureT> & future,
std::chrono::duration<int64_t, TimeT> timeout = std::chrono::duration<int64_t, TimeT>(-1))

View file

@ -93,10 +93,7 @@ Executor::spin_node_once_nanoseconds(
{
this->add_node(node, false);
// non-blocking = true
auto any_exec = get_next_executable(timeout);
if (any_exec) {
execute_any_executable(any_exec);
}
spin_once(timeout);
this->remove_node(node, false);
}
@ -118,6 +115,15 @@ Executor::spin_some()
}
}
void
Executor::spin_once(std::chrono::nanoseconds timeout)
{
auto any_exec = get_next_executable(timeout);
if (any_exec) {
execute_any_executable(any_exec);
}
}
void
Executor::set_memory_strategy(rclcpp::memory_strategy::MemoryStrategy::SharedPtr memory_strategy)
{

View file

@ -299,7 +299,7 @@ SyncParametersClient::get_parameters(const std::vector<std::string> & parameter_
{
auto f = async_parameters_client_->get_parameters(parameter_names);
if (rclcpp::executors::spin_node_until_future_complete(*executor_, node_, f) ==
rclcpp::executors::FutureReturnCode::SUCCESS)
rclcpp::executor::FutureReturnCode::SUCCESS)
{
return f.get();
}
@ -313,7 +313,7 @@ SyncParametersClient::get_parameter_types(const std::vector<std::string> & param
auto f = async_parameters_client_->get_parameter_types(parameter_names);
if (rclcpp::executors::spin_node_until_future_complete(*executor_, node_, f) ==
rclcpp::executors::FutureReturnCode::SUCCESS)
rclcpp::executor::FutureReturnCode::SUCCESS)
{
return f.get();
}
@ -327,7 +327,7 @@ SyncParametersClient::set_parameters(
auto f = async_parameters_client_->set_parameters(parameters);
if (rclcpp::executors::spin_node_until_future_complete(*executor_, node_, f) ==
rclcpp::executors::FutureReturnCode::SUCCESS)
rclcpp::executor::FutureReturnCode::SUCCESS)
{
return f.get();
}
@ -341,7 +341,7 @@ SyncParametersClient::set_parameters_atomically(
auto f = async_parameters_client_->set_parameters_atomically(parameters);
if (rclcpp::executors::spin_node_until_future_complete(*executor_, node_, f) ==
rclcpp::executors::FutureReturnCode::SUCCESS)
rclcpp::executor::FutureReturnCode::SUCCESS)
{
return f.get();
}
@ -357,7 +357,7 @@ SyncParametersClient::list_parameters(
auto f = async_parameters_client_->list_parameters(parameter_prefixes, depth);
if (rclcpp::executors::spin_node_until_future_complete(*executor_, node_, f) ==
rclcpp::executors::FutureReturnCode::SUCCESS)
rclcpp::executor::FutureReturnCode::SUCCESS)
{
return f.get();
}