ensure removal of guard conditions of expired nodes from memory strategy (#741)

* change memory strategy API from vector of nodes to list of nodes

Signed-off-by: Dirk Thomas <dirk-thomas@users.noreply.github.com>

* store guard_condition of node in executor and ensure that it is removed from the memory strategy

Signed-off-by: Dirk Thomas <dirk-thomas@users.noreply.github.com>

* add unit test

Signed-off-by: Dirk Thomas <dirk-thomas@users.noreply.github.com>
This commit is contained in:
Dirk Thomas 2019-05-24 15:31:31 -07:00 committed by GitHub
parent 131a11bba5
commit 8553fbea7f
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
7 changed files with 74 additions and 54 deletions

View file

@ -364,7 +364,8 @@ protected:
private: private:
RCLCPP_DISABLE_COPY(Executor) RCLCPP_DISABLE_COPY(Executor)
std::vector<rclcpp::node_interfaces::NodeBaseInterface::WeakPtr> weak_nodes_; std::list<rclcpp::node_interfaces::NodeBaseInterface::WeakPtr> weak_nodes_;
std::list<const rcl_guard_condition_t *> guard_conditions_;
}; };
} // namespace executor } // namespace executor

View file

@ -15,8 +15,8 @@
#ifndef RCLCPP__MEMORY_STRATEGY_HPP_ #ifndef RCLCPP__MEMORY_STRATEGY_HPP_
#define RCLCPP__MEMORY_STRATEGY_HPP_ #define RCLCPP__MEMORY_STRATEGY_HPP_
#include <list>
#include <memory> #include <memory>
#include <vector>
#include "rcl/allocator.h" #include "rcl/allocator.h"
#include "rcl/wait.h" #include "rcl/wait.h"
@ -42,11 +42,11 @@ class RCLCPP_PUBLIC MemoryStrategy
{ {
public: public:
RCLCPP_SMART_PTR_DEFINITIONS_NOT_COPYABLE(MemoryStrategy) RCLCPP_SMART_PTR_DEFINITIONS_NOT_COPYABLE(MemoryStrategy)
using WeakNodeVector = std::vector<rclcpp::node_interfaces::NodeBaseInterface::WeakPtr>; using WeakNodeList = std::list<rclcpp::node_interfaces::NodeBaseInterface::WeakPtr>;
virtual ~MemoryStrategy() = default; virtual ~MemoryStrategy() = default;
virtual bool collect_entities(const WeakNodeVector & weak_nodes) = 0; virtual bool collect_entities(const WeakNodeList & weak_nodes) = 0;
virtual size_t number_of_ready_subscriptions() const = 0; virtual size_t number_of_ready_subscriptions() const = 0;
virtual size_t number_of_ready_services() const = 0; virtual size_t number_of_ready_services() const = 0;
@ -67,22 +67,22 @@ public:
virtual void virtual void
get_next_subscription( get_next_subscription(
rclcpp::executor::AnyExecutable & any_exec, rclcpp::executor::AnyExecutable & any_exec,
const WeakNodeVector & weak_nodes) = 0; const WeakNodeList & weak_nodes) = 0;
virtual void virtual void
get_next_service( get_next_service(
rclcpp::executor::AnyExecutable & any_exec, rclcpp::executor::AnyExecutable & any_exec,
const WeakNodeVector & weak_nodes) = 0; const WeakNodeList & weak_nodes) = 0;
virtual void virtual void
get_next_client( get_next_client(
rclcpp::executor::AnyExecutable & any_exec, rclcpp::executor::AnyExecutable & any_exec,
const WeakNodeVector & weak_nodes) = 0; const WeakNodeList & weak_nodes) = 0;
virtual void virtual void
get_next_waitable( get_next_waitable(
rclcpp::executor::AnyExecutable & any_exec, rclcpp::executor::AnyExecutable & any_exec,
const WeakNodeVector & weak_nodes) = 0; const WeakNodeList & weak_nodes) = 0;
virtual rcl_allocator_t virtual rcl_allocator_t
get_allocator() = 0; get_allocator() = 0;
@ -90,42 +90,42 @@ public:
static rclcpp::SubscriptionBase::SharedPtr static rclcpp::SubscriptionBase::SharedPtr
get_subscription_by_handle( get_subscription_by_handle(
std::shared_ptr<const rcl_subscription_t> subscriber_handle, std::shared_ptr<const rcl_subscription_t> subscriber_handle,
const WeakNodeVector & weak_nodes); const WeakNodeList & weak_nodes);
static rclcpp::ServiceBase::SharedPtr static rclcpp::ServiceBase::SharedPtr
get_service_by_handle( get_service_by_handle(
std::shared_ptr<const rcl_service_t> service_handle, std::shared_ptr<const rcl_service_t> service_handle,
const WeakNodeVector & weak_nodes); const WeakNodeList & weak_nodes);
static rclcpp::ClientBase::SharedPtr static rclcpp::ClientBase::SharedPtr
get_client_by_handle( get_client_by_handle(
std::shared_ptr<const rcl_client_t> client_handle, std::shared_ptr<const rcl_client_t> client_handle,
const WeakNodeVector & weak_nodes); const WeakNodeList & weak_nodes);
static rclcpp::node_interfaces::NodeBaseInterface::SharedPtr static rclcpp::node_interfaces::NodeBaseInterface::SharedPtr
get_node_by_group( get_node_by_group(
rclcpp::callback_group::CallbackGroup::SharedPtr group, rclcpp::callback_group::CallbackGroup::SharedPtr group,
const WeakNodeVector & weak_nodes); const WeakNodeList & weak_nodes);
static rclcpp::callback_group::CallbackGroup::SharedPtr static rclcpp::callback_group::CallbackGroup::SharedPtr
get_group_by_subscription( get_group_by_subscription(
rclcpp::SubscriptionBase::SharedPtr subscription, rclcpp::SubscriptionBase::SharedPtr subscription,
const WeakNodeVector & weak_nodes); const WeakNodeList & weak_nodes);
static rclcpp::callback_group::CallbackGroup::SharedPtr static rclcpp::callback_group::CallbackGroup::SharedPtr
get_group_by_service( get_group_by_service(
rclcpp::ServiceBase::SharedPtr service, rclcpp::ServiceBase::SharedPtr service,
const WeakNodeVector & weak_nodes); const WeakNodeList & weak_nodes);
static rclcpp::callback_group::CallbackGroup::SharedPtr static rclcpp::callback_group::CallbackGroup::SharedPtr
get_group_by_client( get_group_by_client(
rclcpp::ClientBase::SharedPtr client, rclcpp::ClientBase::SharedPtr client,
const WeakNodeVector & weak_nodes); const WeakNodeList & weak_nodes);
static rclcpp::callback_group::CallbackGroup::SharedPtr static rclcpp::callback_group::CallbackGroup::SharedPtr
get_group_by_waitable( get_group_by_waitable(
rclcpp::Waitable::SharedPtr waitable, rclcpp::Waitable::SharedPtr waitable,
const WeakNodeVector & weak_nodes); const WeakNodeList & weak_nodes);
}; };
} // namespace memory_strategy } // namespace memory_strategy

View file

@ -150,7 +150,7 @@ public:
); );
} }
bool collect_entities(const WeakNodeVector & weak_nodes) bool collect_entities(const WeakNodeList & weak_nodes)
{ {
bool has_invalid_weak_nodes = false; bool has_invalid_weak_nodes = false;
for (auto & weak_node : weak_nodes) { for (auto & weak_node : weak_nodes) {
@ -265,7 +265,7 @@ public:
virtual void virtual void
get_next_subscription( get_next_subscription(
executor::AnyExecutable & any_exec, executor::AnyExecutable & any_exec,
const WeakNodeVector & weak_nodes) const WeakNodeList & weak_nodes)
{ {
auto it = subscription_handles_.begin(); auto it = subscription_handles_.begin();
while (it != subscription_handles_.end()) { while (it != subscription_handles_.end()) {
@ -309,7 +309,7 @@ public:
virtual void virtual void
get_next_service( get_next_service(
executor::AnyExecutable & any_exec, executor::AnyExecutable & any_exec,
const WeakNodeVector & weak_nodes) const WeakNodeList & weak_nodes)
{ {
auto it = service_handles_.begin(); auto it = service_handles_.begin();
while (it != service_handles_.end()) { while (it != service_handles_.end()) {
@ -342,7 +342,7 @@ public:
} }
virtual void virtual void
get_next_client(executor::AnyExecutable & any_exec, const WeakNodeVector & weak_nodes) get_next_client(executor::AnyExecutable & any_exec, const WeakNodeList & weak_nodes)
{ {
auto it = client_handles_.begin(); auto it = client_handles_.begin();
while (it != client_handles_.end()) { while (it != client_handles_.end()) {
@ -375,7 +375,7 @@ public:
} }
virtual void virtual void
get_next_waitable(executor::AnyExecutable & any_exec, const WeakNodeVector & weak_nodes) get_next_waitable(executor::AnyExecutable & any_exec, const WeakNodeList & weak_nodes)
{ {
auto it = waitable_handles_.begin(); auto it = waitable_handles_.begin();
while (it != waitable_handles_.end()) { while (it != waitable_handles_.end()) {

View file

@ -91,6 +91,10 @@ Executor::~Executor()
} }
} }
weak_nodes_.clear(); weak_nodes_.clear();
for (auto & guard_condition : guard_conditions_) {
memory_strategy_->remove_guard_condition(guard_condition);
}
guard_conditions_.clear();
// Finalize the wait set. // Finalize the wait set.
if (rcl_wait_set_fini(&wait_set_) != RCL_RET_OK) { if (rcl_wait_set_fini(&wait_set_) != RCL_RET_OK) {
@ -128,6 +132,7 @@ Executor::add_node(rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_pt
} }
} }
weak_nodes_.push_back(node_ptr); weak_nodes_.push_back(node_ptr);
guard_conditions_.push_back(node_ptr->get_notify_guard_condition());
if (notify) { if (notify) {
// Interrupt waiting to handle new node // Interrupt waiting to handle new node
if (rcl_trigger_guard_condition(&interrupt_guard_condition_) != RCL_RET_OK) { if (rcl_trigger_guard_condition(&interrupt_guard_condition_) != RCL_RET_OK) {
@ -148,17 +153,21 @@ void
Executor::remove_node(rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr, bool notify) Executor::remove_node(rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr, bool notify)
{ {
bool node_removed = false; bool node_removed = false;
weak_nodes_.erase( {
std::remove_if( auto node_it = weak_nodes_.begin();
weak_nodes_.begin(), weak_nodes_.end(), auto gc_it = guard_conditions_.begin();
[&](rclcpp::node_interfaces::NodeBaseInterface::WeakPtr & i) while (node_it != weak_nodes_.end()) {
{ bool matched = (node_it->lock() == node_ptr);
bool matched = (i.lock() == node_ptr); if (matched) {
node_removed |= matched; node_it = weak_nodes_.erase(node_it);
return matched; gc_it = guard_conditions_.erase(gc_it);
node_removed = true;
} else {
++node_it;
++gc_it;
} }
) }
); }
std::atomic_bool & has_executor = node_ptr->get_associated_with_executor_atomic(); std::atomic_bool & has_executor = node_ptr->get_associated_with_executor_atomic();
has_executor.store(false); has_executor.store(false);
if (notify) { if (notify) {
@ -420,15 +429,18 @@ Executor::wait_for_work(std::chrono::nanoseconds timeout)
// Clean up any invalid nodes, if they were detected // Clean up any invalid nodes, if they were detected
if (has_invalid_weak_nodes) { if (has_invalid_weak_nodes) {
weak_nodes_.erase( auto node_it = weak_nodes_.begin();
remove_if( auto gc_it = guard_conditions_.begin();
weak_nodes_.begin(), weak_nodes_.end(), while (node_it != weak_nodes_.end()) {
[](rclcpp::node_interfaces::NodeBaseInterface::WeakPtr i) if (node_it->expired()) {
{ node_it = weak_nodes_.erase(node_it);
return i.expired(); memory_strategy_->remove_guard_condition(*gc_it);
} gc_it = guard_conditions_.erase(gc_it);
) } else {
); ++node_it;
++gc_it;
}
}
} }
// clear wait set // clear wait set
if (rcl_wait_set_clear(&wait_set_) != RCL_RET_OK) { if (rcl_wait_set_clear(&wait_set_) != RCL_RET_OK) {

View file

@ -20,7 +20,7 @@ using rclcpp::memory_strategy::MemoryStrategy;
rclcpp::SubscriptionBase::SharedPtr rclcpp::SubscriptionBase::SharedPtr
MemoryStrategy::get_subscription_by_handle( MemoryStrategy::get_subscription_by_handle(
std::shared_ptr<const rcl_subscription_t> subscriber_handle, std::shared_ptr<const rcl_subscription_t> subscriber_handle,
const WeakNodeVector & weak_nodes) const WeakNodeList & weak_nodes)
{ {
for (auto & weak_node : weak_nodes) { for (auto & weak_node : weak_nodes) {
auto node = weak_node.lock(); auto node = weak_node.lock();
@ -51,7 +51,7 @@ MemoryStrategy::get_subscription_by_handle(
rclcpp::ServiceBase::SharedPtr rclcpp::ServiceBase::SharedPtr
MemoryStrategy::get_service_by_handle( MemoryStrategy::get_service_by_handle(
std::shared_ptr<const rcl_service_t> service_handle, std::shared_ptr<const rcl_service_t> service_handle,
const WeakNodeVector & weak_nodes) const WeakNodeList & weak_nodes)
{ {
for (auto & weak_node : weak_nodes) { for (auto & weak_node : weak_nodes) {
auto node = weak_node.lock(); auto node = weak_node.lock();
@ -77,7 +77,7 @@ MemoryStrategy::get_service_by_handle(
rclcpp::ClientBase::SharedPtr rclcpp::ClientBase::SharedPtr
MemoryStrategy::get_client_by_handle( MemoryStrategy::get_client_by_handle(
std::shared_ptr<const rcl_client_t> client_handle, std::shared_ptr<const rcl_client_t> client_handle,
const WeakNodeVector & weak_nodes) const WeakNodeList & weak_nodes)
{ {
for (auto & weak_node : weak_nodes) { for (auto & weak_node : weak_nodes) {
auto node = weak_node.lock(); auto node = weak_node.lock();
@ -103,7 +103,7 @@ MemoryStrategy::get_client_by_handle(
rclcpp::node_interfaces::NodeBaseInterface::SharedPtr rclcpp::node_interfaces::NodeBaseInterface::SharedPtr
MemoryStrategy::get_node_by_group( MemoryStrategy::get_node_by_group(
rclcpp::callback_group::CallbackGroup::SharedPtr group, rclcpp::callback_group::CallbackGroup::SharedPtr group,
const WeakNodeVector & weak_nodes) const WeakNodeList & weak_nodes)
{ {
if (!group) { if (!group) {
return nullptr; return nullptr;
@ -126,7 +126,7 @@ MemoryStrategy::get_node_by_group(
rclcpp::callback_group::CallbackGroup::SharedPtr rclcpp::callback_group::CallbackGroup::SharedPtr
MemoryStrategy::get_group_by_subscription( MemoryStrategy::get_group_by_subscription(
rclcpp::SubscriptionBase::SharedPtr subscription, rclcpp::SubscriptionBase::SharedPtr subscription,
const WeakNodeVector & weak_nodes) const WeakNodeList & weak_nodes)
{ {
for (auto & weak_node : weak_nodes) { for (auto & weak_node : weak_nodes) {
auto node = weak_node.lock(); auto node = weak_node.lock();
@ -152,7 +152,7 @@ MemoryStrategy::get_group_by_subscription(
rclcpp::callback_group::CallbackGroup::SharedPtr rclcpp::callback_group::CallbackGroup::SharedPtr
MemoryStrategy::get_group_by_service( MemoryStrategy::get_group_by_service(
rclcpp::ServiceBase::SharedPtr service, rclcpp::ServiceBase::SharedPtr service,
const WeakNodeVector & weak_nodes) const WeakNodeList & weak_nodes)
{ {
for (auto & weak_node : weak_nodes) { for (auto & weak_node : weak_nodes) {
auto node = weak_node.lock(); auto node = weak_node.lock();
@ -178,7 +178,7 @@ MemoryStrategy::get_group_by_service(
rclcpp::callback_group::CallbackGroup::SharedPtr rclcpp::callback_group::CallbackGroup::SharedPtr
MemoryStrategy::get_group_by_client( MemoryStrategy::get_group_by_client(
rclcpp::ClientBase::SharedPtr client, rclcpp::ClientBase::SharedPtr client,
const WeakNodeVector & weak_nodes) const WeakNodeList & weak_nodes)
{ {
for (auto & weak_node : weak_nodes) { for (auto & weak_node : weak_nodes) {
auto node = weak_node.lock(); auto node = weak_node.lock();
@ -204,7 +204,7 @@ MemoryStrategy::get_group_by_client(
rclcpp::callback_group::CallbackGroup::SharedPtr rclcpp::callback_group::CallbackGroup::SharedPtr
MemoryStrategy::get_group_by_waitable( MemoryStrategy::get_group_by_waitable(
rclcpp::Waitable::SharedPtr waitable, rclcpp::Waitable::SharedPtr waitable,
const WeakNodeVector & weak_nodes) const WeakNodeList & weak_nodes)
{ {
for (auto & weak_node : weak_nodes) { for (auto & weak_node : weak_nodes) {
auto node = weak_node.lock(); auto node = weak_node.lock();

View file

@ -60,3 +60,10 @@ TEST_F(TestExecutors, detachOnDestruction) {
EXPECT_NO_THROW(executor.add_node(node)); EXPECT_NO_THROW(executor.add_node(node));
} }
} }
// Make sure that the executor can automatically remove expired nodes correctly
TEST_F(TestExecutors, addTemporaryNode) {
rclcpp::executors::SingleThreadedExecutor executor;
executor.add_node(std::make_shared<rclcpp::Node>("temporary_node"));
EXPECT_NO_THROW(executor.spin_some());
}

View file

@ -36,15 +36,15 @@ TEST_F(TestFindWeakNodes, allocator_strategy_with_weak_nodes) {
rclcpp::memory_strategies::allocator_memory_strategy::AllocatorMemoryStrategy<>>(); rclcpp::memory_strategies::allocator_memory_strategy::AllocatorMemoryStrategy<>>();
auto existing_node = rclcpp::Node::make_shared("existing_node"); auto existing_node = rclcpp::Node::make_shared("existing_node");
auto dead_node = rclcpp::Node::make_shared("dead_node"); auto dead_node = rclcpp::Node::make_shared("dead_node");
rclcpp::memory_strategy::MemoryStrategy::WeakNodeVector weak_nodes; rclcpp::memory_strategy::MemoryStrategy::WeakNodeList weak_nodes;
weak_nodes.push_back(existing_node->get_node_base_interface()); weak_nodes.push_back(existing_node->get_node_base_interface());
weak_nodes.push_back(dead_node->get_node_base_interface()); weak_nodes.push_back(dead_node->get_node_base_interface());
// AND // AND
// Delete dead_node, creating a dangling pointer in weak_nodes // Delete dead_node, creating a dangling pointer in weak_nodes
dead_node.reset(); dead_node.reset();
ASSERT_FALSE(weak_nodes[0].expired()); ASSERT_FALSE(weak_nodes.front().expired());
ASSERT_TRUE(weak_nodes[1].expired()); ASSERT_TRUE(weak_nodes.back().expired());
// WHEN // WHEN
bool has_invalid_weak_nodes = memory_strategy->collect_entities(weak_nodes); bool has_invalid_weak_nodes = memory_strategy->collect_entities(weak_nodes);
@ -64,11 +64,11 @@ TEST_F(TestFindWeakNodes, allocator_strategy_no_weak_nodes) {
rclcpp::memory_strategies::allocator_memory_strategy::AllocatorMemoryStrategy<>>(); rclcpp::memory_strategies::allocator_memory_strategy::AllocatorMemoryStrategy<>>();
auto existing_node1 = rclcpp::Node::make_shared("existing_node1"); auto existing_node1 = rclcpp::Node::make_shared("existing_node1");
auto existing_node2 = rclcpp::Node::make_shared("existing_node2"); auto existing_node2 = rclcpp::Node::make_shared("existing_node2");
rclcpp::memory_strategy::MemoryStrategy::WeakNodeVector weak_nodes; rclcpp::memory_strategy::MemoryStrategy::WeakNodeList weak_nodes;
weak_nodes.push_back(existing_node1->get_node_base_interface()); weak_nodes.push_back(existing_node1->get_node_base_interface());
weak_nodes.push_back(existing_node2->get_node_base_interface()); weak_nodes.push_back(existing_node2->get_node_base_interface());
ASSERT_FALSE(weak_nodes[0].expired()); ASSERT_FALSE(weak_nodes.front().expired());
ASSERT_FALSE(weak_nodes[1].expired()); ASSERT_FALSE(weak_nodes.back().expired());
// WHEN // WHEN
bool has_invalid_weak_nodes = memory_strategy->collect_entities(weak_nodes); bool has_invalid_weak_nodes = memory_strategy->collect_entities(weak_nodes);