executors: refactor spin_node_some
This allowed me to unify populate_all_handles and populate_all_handles_with_node into a single function wait_for_work. This means that spin_node_some now adds, then removes a node to the executor for calling wait_for_work. That implies I also added the remove_node function and while I was at it I put some more robust checking and wait interruption into add_node.
This commit is contained in:
parent
0f9351ac21
commit
f77e6d7d56
3 changed files with 54 additions and 38 deletions
|
@ -50,7 +50,55 @@ public:
|
||||||
virtual void
|
virtual void
|
||||||
add_node(rclcpp::node::Node::SharedPtr &node_ptr)
|
add_node(rclcpp::node::Node::SharedPtr &node_ptr)
|
||||||
{
|
{
|
||||||
this->weak_nodes_.push_back(node_ptr);
|
// Check to ensure node not already added
|
||||||
|
for (auto &weak_node : weak_nodes_)
|
||||||
|
{
|
||||||
|
auto node = weak_node.lock();
|
||||||
|
if (node == node_ptr)
|
||||||
|
{
|
||||||
|
// TODO: Use a different error here?
|
||||||
|
throw std::runtime_error(
|
||||||
|
"Cannot add node to executor, node already added.");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
weak_nodes_.push_back(node_ptr);
|
||||||
|
// Interrupt waiting to handle new node
|
||||||
|
using ros_middleware_interface::trigger_guard_condition;
|
||||||
|
trigger_guard_condition(interrupt_guard_condition_);
|
||||||
|
}
|
||||||
|
|
||||||
|
virtual void
|
||||||
|
remove_node(rclcpp::node::Node::SharedPtr &node_ptr)
|
||||||
|
{
|
||||||
|
bool node_removed = false;
|
||||||
|
weak_nodes_.erase(
|
||||||
|
std::remove_if(weak_nodes_.begin(), weak_nodes_.end(),
|
||||||
|
[&](std::weak_ptr<rclcpp::node::Node> &i)
|
||||||
|
{
|
||||||
|
bool matched = (i.lock() == node_ptr);
|
||||||
|
node_removed |= matched;
|
||||||
|
return matched;
|
||||||
|
}));
|
||||||
|
// If the node was matched and removed, interrupt waiting
|
||||||
|
if (node_removed)
|
||||||
|
{
|
||||||
|
using ros_middleware_interface::trigger_guard_condition;
|
||||||
|
trigger_guard_condition(interrupt_guard_condition_);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void spin_node_some(rclcpp::node::Node::SharedPtr &node)
|
||||||
|
{
|
||||||
|
reset_subscriber_handles();
|
||||||
|
this->add_node(node);
|
||||||
|
// non-blocking = true
|
||||||
|
std::shared_ptr<AnyExecutable> any_exec;
|
||||||
|
while ((any_exec = get_next_executable(true)))
|
||||||
|
{
|
||||||
|
execute_any_executable(any_exec);
|
||||||
|
}
|
||||||
|
this->remove_node(node);
|
||||||
|
reset_subscriber_handles();
|
||||||
}
|
}
|
||||||
|
|
||||||
protected:
|
protected:
|
||||||
|
@ -138,22 +186,10 @@ protected:
|
||||||
|
|
||||||
/******************************/
|
/******************************/
|
||||||
|
|
||||||
/*** Populating class storage from a single node ***/
|
|
||||||
|
|
||||||
// TODO: pick a better name for this function
|
|
||||||
void
|
|
||||||
populate_all_handles_with_node(rclcpp::node::Node &node)
|
|
||||||
{
|
|
||||||
// TODO: reimplement
|
|
||||||
}
|
|
||||||
|
|
||||||
/******************************/
|
|
||||||
|
|
||||||
/*** Populate class storage from stored weak node pointers and wait. ***/
|
/*** Populate class storage from stored weak node pointers and wait. ***/
|
||||||
|
|
||||||
// TODO: pick a better name for this function
|
|
||||||
void
|
void
|
||||||
populate_all_handles(bool nonblocking)
|
wait_for_work(bool nonblocking)
|
||||||
{
|
{
|
||||||
// Collect the subscriptions and timers to be waited on
|
// Collect the subscriptions and timers to be waited on
|
||||||
bool has_invalid_weak_nodes = false;
|
bool has_invalid_weak_nodes = false;
|
||||||
|
@ -527,8 +563,8 @@ protected:
|
||||||
// If there are none
|
// If there are none
|
||||||
if (!any_exec)
|
if (!any_exec)
|
||||||
{
|
{
|
||||||
// Repopulate the subscriber handles and wait on them
|
// Wait for subscriptions or timers to work on
|
||||||
populate_all_handles(nonblocking);
|
wait_for_work(nonblocking);
|
||||||
// Try again
|
// Try again
|
||||||
any_exec = get_next_ready_executable();
|
any_exec = get_next_ready_executable();
|
||||||
}
|
}
|
||||||
|
|
|
@ -55,21 +55,6 @@ public:
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void spin_node_some(rclcpp::node::Node &node)
|
|
||||||
{
|
|
||||||
reset_subscriber_handles();
|
|
||||||
populate_all_handles_with_node(node);
|
|
||||||
// non-blocking = true
|
|
||||||
auto any_exec = get_next_executable(true);
|
|
||||||
while (any_exec->subscription)
|
|
||||||
{
|
|
||||||
execute_subscription(any_exec->subscription);
|
|
||||||
// non-blocking = true
|
|
||||||
any_exec = get_next_executable(true);
|
|
||||||
}
|
|
||||||
reset_subscriber_handles();
|
|
||||||
}
|
|
||||||
|
|
||||||
private:
|
private:
|
||||||
RCLCPP_DISABLE_COPY(SingleThreadedExecutor);
|
RCLCPP_DISABLE_COPY(SingleThreadedExecutor);
|
||||||
|
|
||||||
|
|
|
@ -57,15 +57,10 @@ using rclcpp::utilities::ok;
|
||||||
using rclcpp::utilities::init;
|
using rclcpp::utilities::init;
|
||||||
using rclcpp::utilities::sleep_for;
|
using rclcpp::utilities::sleep_for;
|
||||||
|
|
||||||
void spin_some(Node &node)
|
|
||||||
{
|
|
||||||
rclcpp::executors::SingleThreadedExecutor executor;
|
|
||||||
executor.spin_node_some(node);
|
|
||||||
}
|
|
||||||
|
|
||||||
void spin_some(Node::SharedPtr &node_ptr)
|
void spin_some(Node::SharedPtr &node_ptr)
|
||||||
{
|
{
|
||||||
spin_some(*node_ptr);
|
rclcpp::executors::SingleThreadedExecutor executor;
|
||||||
|
executor.spin_node_some(node_ptr);
|
||||||
}
|
}
|
||||||
|
|
||||||
void spin(Node::SharedPtr &node_ptr)
|
void spin(Node::SharedPtr &node_ptr)
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue