add mutex in add/remove_node and wait_for_work to protect concurrent use/change of memory_strategy_ (#837)

Signed-off-by: Dirk Thomas <dirk-thomas@users.noreply.github.com>
This commit is contained in:
Dirk Thomas 2019-08-29 15:09:39 -07:00 committed by GitHub
parent 4feecc5945
commit d5301c1c7c
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 42 additions and 32 deletions

View file

@ -22,6 +22,7 @@
#include <iostream> #include <iostream>
#include <list> #include <list>
#include <memory> #include <memory>
#include <mutex>
#include <string> #include <string>
#include <vector> #include <vector>
@ -356,6 +357,9 @@ protected:
/// Wait set for managing entities that the rmw layer waits on. /// Wait set for managing entities that the rmw layer waits on.
rcl_wait_set_t wait_set_ = rcl_get_zero_initialized_wait_set(); rcl_wait_set_t wait_set_ = rcl_get_zero_initialized_wait_set();
// Mutex to protect the subsequent memory_strategy_.
std::mutex memory_strategy_mutex_;
/// The memory strategy: an interface for handling user-defined memory allocation strategies. /// The memory strategy: an interface for handling user-defined memory allocation strategies.
memory_strategy::MemoryStrategy::SharedPtr memory_strategy_; memory_strategy::MemoryStrategy::SharedPtr memory_strategy_;

View file

@ -140,6 +140,7 @@ Executor::add_node(rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_pt
} }
} }
// Add the node's notify condition to the guard condition handles // Add the node's notify condition to the guard condition handles
std::unique_lock<std::mutex> lock(memory_strategy_mutex_);
memory_strategy_->add_guard_condition(node_ptr->get_notify_guard_condition()); memory_strategy_->add_guard_condition(node_ptr->get_notify_guard_condition());
} }
@ -178,6 +179,7 @@ Executor::remove_node(rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node
} }
} }
} }
std::unique_lock<std::mutex> lock(memory_strategy_mutex_);
memory_strategy_->remove_guard_condition(node_ptr->get_notify_guard_condition()); memory_strategy_->remove_guard_condition(node_ptr->get_notify_guard_condition());
} }
@ -423,43 +425,47 @@ Executor::execute_client(
void void
Executor::wait_for_work(std::chrono::nanoseconds timeout) Executor::wait_for_work(std::chrono::nanoseconds timeout)
{ {
// Collect the subscriptions and timers to be waited on {
memory_strategy_->clear_handles(); std::unique_lock<std::mutex> lock(memory_strategy_mutex_);
bool has_invalid_weak_nodes = memory_strategy_->collect_entities(weak_nodes_);
// Clean up any invalid nodes, if they were detected // Collect the subscriptions and timers to be waited on
if (has_invalid_weak_nodes) { memory_strategy_->clear_handles();
auto node_it = weak_nodes_.begin(); bool has_invalid_weak_nodes = memory_strategy_->collect_entities(weak_nodes_);
auto gc_it = guard_conditions_.begin();
while (node_it != weak_nodes_.end()) { // Clean up any invalid nodes, if they were detected
if (node_it->expired()) { if (has_invalid_weak_nodes) {
node_it = weak_nodes_.erase(node_it); auto node_it = weak_nodes_.begin();
memory_strategy_->remove_guard_condition(*gc_it); auto gc_it = guard_conditions_.begin();
gc_it = guard_conditions_.erase(gc_it); while (node_it != weak_nodes_.end()) {
} else { if (node_it->expired()) {
++node_it; node_it = weak_nodes_.erase(node_it);
++gc_it; memory_strategy_->remove_guard_condition(*gc_it);
gc_it = guard_conditions_.erase(gc_it);
} else {
++node_it;
++gc_it;
}
} }
} }
} // clear wait set
// clear wait set if (rcl_wait_set_clear(&wait_set_) != RCL_RET_OK) {
if (rcl_wait_set_clear(&wait_set_) != RCL_RET_OK) { throw std::runtime_error("Couldn't clear wait set");
throw std::runtime_error("Couldn't clear wait set"); }
}
// The size of waitables are accounted for in size of the other entities // The size of waitables are accounted for in size of the other entities
rcl_ret_t ret = rcl_wait_set_resize( rcl_ret_t ret = rcl_wait_set_resize(
&wait_set_, memory_strategy_->number_of_ready_subscriptions(), &wait_set_, memory_strategy_->number_of_ready_subscriptions(),
memory_strategy_->number_of_guard_conditions(), memory_strategy_->number_of_ready_timers(), memory_strategy_->number_of_guard_conditions(), memory_strategy_->number_of_ready_timers(),
memory_strategy_->number_of_ready_clients(), memory_strategy_->number_of_ready_services(), memory_strategy_->number_of_ready_clients(), memory_strategy_->number_of_ready_services(),
memory_strategy_->number_of_ready_events()); memory_strategy_->number_of_ready_events());
if (RCL_RET_OK != ret) { if (RCL_RET_OK != ret) {
throw std::runtime_error( throw std::runtime_error(
std::string("Couldn't resize the wait set : ") + rcl_get_error_string().str); std::string("Couldn't resize the wait set : ") + rcl_get_error_string().str);
} }
if (!memory_strategy_->add_handles_to_wait_set(&wait_set_)) { if (!memory_strategy_->add_handles_to_wait_set(&wait_set_)) {
throw std::runtime_error("Couldn't fill wait set"); throw std::runtime_error("Couldn't fill wait set");
}
} }
rcl_ret_t status = rcl_ret_t status =
rcl_wait(&wait_set_, std::chrono::duration_cast<std::chrono::nanoseconds>(timeout).count()); rcl_wait(&wait_set_, std::chrono::duration_cast<std::chrono::nanoseconds>(timeout).count());