Crash in callback group pointer vector iterator (#814)

Signed-off-by: Guillaume Autran <gautran@clearpath.ai>
This commit is contained in:
Guillaume Autran 2019-08-28 17:11:11 -04:00 committed by ivanpauno
parent 17841d6b7c
commit 4feecc5945
7 changed files with 160 additions and 131 deletions

View file

@ -62,25 +62,40 @@ public:
RCLCPP_PUBLIC RCLCPP_PUBLIC
explicit CallbackGroup(CallbackGroupType group_type); explicit CallbackGroup(CallbackGroupType group_type);
RCLCPP_PUBLIC template<typename Function>
const std::vector<rclcpp::SubscriptionBase::WeakPtr> & rclcpp::SubscriptionBase::SharedPtr
get_subscription_ptrs() const; find_subscription_ptrs_if(Function func) const
{
return _find_ptrs_if_impl<rclcpp::SubscriptionBase, Function>(func, subscription_ptrs_);
}
RCLCPP_PUBLIC template<typename Function>
const std::vector<rclcpp::TimerBase::WeakPtr> & rclcpp::TimerBase::SharedPtr
get_timer_ptrs() const; find_timer_ptrs_if(Function func) const
{
return _find_ptrs_if_impl<rclcpp::TimerBase, Function>(func, timer_ptrs_);
}
RCLCPP_PUBLIC template<typename Function>
const std::vector<rclcpp::ServiceBase::WeakPtr> & rclcpp::ServiceBase::SharedPtr
get_service_ptrs() const; find_service_ptrs_if(Function func) const
{
return _find_ptrs_if_impl<rclcpp::ServiceBase, Function>(func, service_ptrs_);
}
RCLCPP_PUBLIC template<typename Function>
const std::vector<rclcpp::ClientBase::WeakPtr> & rclcpp::ClientBase::SharedPtr
get_client_ptrs() const; find_client_ptrs_if(Function func) const
{
return _find_ptrs_if_impl<rclcpp::ClientBase, Function>(func, client_ptrs_);
}
RCLCPP_PUBLIC template<typename Function>
const std::vector<rclcpp::Waitable::WeakPtr> & rclcpp::Waitable::SharedPtr
get_waitable_ptrs() const; find_waitable_ptrs_if(Function func) const
{
return _find_ptrs_if_impl<rclcpp::Waitable, Function>(func, waitable_ptrs_);
}
RCLCPP_PUBLIC RCLCPP_PUBLIC
std::atomic_bool & std::atomic_bool &
@ -130,6 +145,21 @@ protected:
std::vector<rclcpp::ClientBase::WeakPtr> client_ptrs_; std::vector<rclcpp::ClientBase::WeakPtr> client_ptrs_;
std::vector<rclcpp::Waitable::WeakPtr> waitable_ptrs_; std::vector<rclcpp::Waitable::WeakPtr> waitable_ptrs_;
std::atomic_bool can_be_taken_from_; std::atomic_bool can_be_taken_from_;
private:
template<typename TypeT, typename Function>
typename TypeT::SharedPtr _find_ptrs_if_impl(
Function func, const std::vector<typename TypeT::WeakPtr> & vect_ptrs) const
{
std::lock_guard<std::mutex> lock(mutex_);
for (auto & weak_ptr : vect_ptrs) {
auto ref_ptr = weak_ptr.lock();
if (ref_ptr && func(ref_ptr)) {
return ref_ptr;
}
}
return typename TypeT::SharedPtr();
}
}; };
} // namespace callback_group } // namespace callback_group

View file

@ -15,6 +15,7 @@
#ifndef RCLCPP__EXECUTORS__MULTI_THREADED_EXECUTOR_HPP_ #ifndef RCLCPP__EXECUTORS__MULTI_THREADED_EXECUTOR_HPP_
#define RCLCPP__EXECUTORS__MULTI_THREADED_EXECUTOR_HPP_ #define RCLCPP__EXECUTORS__MULTI_THREADED_EXECUTOR_HPP_
#include <chrono>
#include <memory> #include <memory>
#include <mutex> #include <mutex>
#include <set> #include <set>
@ -53,7 +54,8 @@ public:
MultiThreadedExecutor( MultiThreadedExecutor(
const executor::ExecutorArgs & args = executor::ExecutorArgs(), const executor::ExecutorArgs & args = executor::ExecutorArgs(),
size_t number_of_threads = 0, size_t number_of_threads = 0,
bool yield_before_execute = false); bool yield_before_execute = false,
std::chrono::nanoseconds timeout = std::chrono::nanoseconds(-1));
RCLCPP_PUBLIC RCLCPP_PUBLIC
virtual ~MultiThreadedExecutor(); virtual ~MultiThreadedExecutor();
@ -77,6 +79,7 @@ private:
std::mutex wait_mutex_; std::mutex wait_mutex_;
size_t number_of_threads_; size_t number_of_threads_;
bool yield_before_execute_; bool yield_before_execute_;
std::chrono::nanoseconds next_exec_timeout_;
std::set<TimerBase::SharedPtr> scheduled_timers_; std::set<TimerBase::SharedPtr> scheduled_timers_;
}; };

View file

@ -164,40 +164,31 @@ public:
if (!group || !group->can_be_taken_from().load()) { if (!group || !group->can_be_taken_from().load()) {
continue; continue;
} }
for (auto & weak_subscription : group->get_subscription_ptrs()) { group->find_subscription_ptrs_if(
auto subscription = weak_subscription.lock(); [this](const rclcpp::SubscriptionBase::SharedPtr & subscription) {
if (subscription) {
subscription_handles_.push_back(subscription->get_subscription_handle()); subscription_handles_.push_back(subscription->get_subscription_handle());
if (subscription->get_intra_process_subscription_handle()) { if (subscription->get_intra_process_subscription_handle()) {
subscription_handles_.push_back( subscription_handles_.push_back(
subscription->get_intra_process_subscription_handle()); subscription->get_intra_process_subscription_handle());
} }
} return false;
} });
for (auto & weak_service : group->get_service_ptrs()) { group->find_service_ptrs_if([this](const rclcpp::ServiceBase::SharedPtr & service) {
auto service = weak_service.lock();
if (service) {
service_handles_.push_back(service->get_service_handle()); service_handles_.push_back(service->get_service_handle());
} return false;
} });
for (auto & weak_client : group->get_client_ptrs()) { group->find_client_ptrs_if([this](const rclcpp::ClientBase::SharedPtr & client) {
auto client = weak_client.lock();
if (client) {
client_handles_.push_back(client->get_client_handle()); client_handles_.push_back(client->get_client_handle());
} return false;
} });
for (auto & weak_timer : group->get_timer_ptrs()) { group->find_timer_ptrs_if([this](const rclcpp::TimerBase::SharedPtr & timer) {
auto timer = weak_timer.lock();
if (timer) {
timer_handles_.push_back(timer->get_timer_handle()); timer_handles_.push_back(timer->get_timer_handle());
} return false;
} });
for (auto & weak_waitable : group->get_waitable_ptrs()) { group->find_waitable_ptrs_if([this](const rclcpp::Waitable::SharedPtr & waitable) {
auto waitable = weak_waitable.lock();
if (waitable) {
waitable_handles_.push_back(waitable); waitable_handles_.push_back(waitable);
} return false;
} });
} }
} }
return has_invalid_weak_nodes; return has_invalid_weak_nodes;

View file

@ -23,40 +23,6 @@ CallbackGroup::CallbackGroup(CallbackGroupType group_type)
: type_(group_type), can_be_taken_from_(true) : type_(group_type), can_be_taken_from_(true)
{} {}
const std::vector<rclcpp::SubscriptionBase::WeakPtr> &
CallbackGroup::get_subscription_ptrs() const
{
std::lock_guard<std::mutex> lock(mutex_);
return subscription_ptrs_;
}
const std::vector<rclcpp::TimerBase::WeakPtr> &
CallbackGroup::get_timer_ptrs() const
{
std::lock_guard<std::mutex> lock(mutex_);
return timer_ptrs_;
}
const std::vector<rclcpp::ServiceBase::WeakPtr> &
CallbackGroup::get_service_ptrs() const
{
std::lock_guard<std::mutex> lock(mutex_);
return service_ptrs_;
}
const std::vector<rclcpp::ClientBase::WeakPtr> &
CallbackGroup::get_client_ptrs() const
{
std::lock_guard<std::mutex> lock(mutex_);
return client_ptrs_;
}
const std::vector<rclcpp::Waitable::WeakPtr> &
CallbackGroup::get_waitable_ptrs() const
{
std::lock_guard<std::mutex> lock(mutex_);
return waitable_ptrs_;
}
std::atomic_bool & std::atomic_bool &
CallbackGroup::can_be_taken_from() CallbackGroup::can_be_taken_from()
@ -76,6 +42,12 @@ CallbackGroup::add_subscription(
{ {
std::lock_guard<std::mutex> lock(mutex_); std::lock_guard<std::mutex> lock(mutex_);
subscription_ptrs_.push_back(subscription_ptr); subscription_ptrs_.push_back(subscription_ptr);
subscription_ptrs_.erase(
std::remove_if(
subscription_ptrs_.begin(),
subscription_ptrs_.end(),
[](rclcpp::SubscriptionBase::WeakPtr x) {return x.expired();}),
subscription_ptrs_.end());
} }
void void
@ -83,6 +55,12 @@ CallbackGroup::add_timer(const rclcpp::TimerBase::SharedPtr timer_ptr)
{ {
std::lock_guard<std::mutex> lock(mutex_); std::lock_guard<std::mutex> lock(mutex_);
timer_ptrs_.push_back(timer_ptr); timer_ptrs_.push_back(timer_ptr);
timer_ptrs_.erase(
std::remove_if(
timer_ptrs_.begin(),
timer_ptrs_.end(),
[](rclcpp::TimerBase::WeakPtr x) {return x.expired();}),
timer_ptrs_.end());
} }
void void
@ -90,6 +68,12 @@ CallbackGroup::add_service(const rclcpp::ServiceBase::SharedPtr service_ptr)
{ {
std::lock_guard<std::mutex> lock(mutex_); std::lock_guard<std::mutex> lock(mutex_);
service_ptrs_.push_back(service_ptr); service_ptrs_.push_back(service_ptr);
service_ptrs_.erase(
std::remove_if(
service_ptrs_.begin(),
service_ptrs_.end(),
[](rclcpp::ServiceBase::WeakPtr x) {return x.expired();}),
service_ptrs_.end());
} }
void void
@ -97,6 +81,12 @@ CallbackGroup::add_client(const rclcpp::ClientBase::SharedPtr client_ptr)
{ {
std::lock_guard<std::mutex> lock(mutex_); std::lock_guard<std::mutex> lock(mutex_);
client_ptrs_.push_back(client_ptr); client_ptrs_.push_back(client_ptr);
client_ptrs_.erase(
std::remove_if(
client_ptrs_.begin(),
client_ptrs_.end(),
[](rclcpp::ClientBase::WeakPtr x) {return x.expired();}),
client_ptrs_.end());
} }
void void
@ -104,6 +94,12 @@ CallbackGroup::add_waitable(const rclcpp::Waitable::SharedPtr waitable_ptr)
{ {
std::lock_guard<std::mutex> lock(mutex_); std::lock_guard<std::mutex> lock(mutex_);
waitable_ptrs_.push_back(waitable_ptr); waitable_ptrs_.push_back(waitable_ptr);
waitable_ptrs_.erase(
std::remove_if(
waitable_ptrs_.begin(),
waitable_ptrs_.end(),
[](rclcpp::Waitable::WeakPtr x) {return x.expired();}),
waitable_ptrs_.end());
} }
void void

View file

@ -511,14 +511,15 @@ Executor::get_group_by_timer(rclcpp::TimerBase::SharedPtr timer)
if (!group) { if (!group) {
continue; continue;
} }
for (auto & weak_timer : group->get_timer_ptrs()) { auto timer_ref = group->find_timer_ptrs_if(
auto t = weak_timer.lock(); [timer](const rclcpp::TimerBase::SharedPtr & timer_ptr) -> bool {
if (t == timer) { return timer_ptr == timer;
});
if (timer_ref) {
return group; return group;
} }
} }
} }
}
return rclcpp::callback_group::CallbackGroup::SharedPtr(); return rclcpp::callback_group::CallbackGroup::SharedPtr();
} }
@ -535,10 +536,12 @@ Executor::get_next_timer(AnyExecutable & any_exec)
if (!group || !group->can_be_taken_from().load()) { if (!group || !group->can_be_taken_from().load()) {
continue; continue;
} }
for (auto & timer_ref : group->get_timer_ptrs()) { auto timer_ref = group->find_timer_ptrs_if(
auto timer = timer_ref.lock(); [](const rclcpp::TimerBase::SharedPtr & timer) -> bool {
if (timer && timer->is_ready()) { return timer->is_ready();
any_exec.timer = timer; });
if (timer_ref) {
any_exec.timer = timer_ref;
any_exec.callback_group = group; any_exec.callback_group = group;
any_exec.node_base = node; any_exec.node_base = node;
return; return;
@ -546,7 +549,6 @@ Executor::get_next_timer(AnyExecutable & any_exec)
} }
} }
} }
}
bool bool
Executor::get_next_ready_executable(AnyExecutable & any_executable) Executor::get_next_ready_executable(AnyExecutable & any_executable)

View file

@ -27,8 +27,11 @@ using rclcpp::executors::MultiThreadedExecutor;
MultiThreadedExecutor::MultiThreadedExecutor( MultiThreadedExecutor::MultiThreadedExecutor(
const rclcpp::executor::ExecutorArgs & args, const rclcpp::executor::ExecutorArgs & args,
size_t number_of_threads, size_t number_of_threads,
bool yield_before_execute) bool yield_before_execute,
: executor::Executor(args), yield_before_execute_(yield_before_execute) std::chrono::nanoseconds next_exec_timeout)
: executor::Executor(args),
yield_before_execute_(yield_before_execute),
next_exec_timeout_(next_exec_timeout)
{ {
number_of_threads_ = number_of_threads ? number_of_threads : std::thread::hardware_concurrency(); number_of_threads_ = number_of_threads ? number_of_threads : std::thread::hardware_concurrency();
if (number_of_threads_ == 0) { if (number_of_threads_ == 0) {
@ -77,7 +80,7 @@ MultiThreadedExecutor::run(size_t)
if (!rclcpp::ok(this->context_) || !spinning.load()) { if (!rclcpp::ok(this->context_) || !spinning.load()) {
return; return;
} }
if (!get_next_executable(any_exec)) { if (!get_next_executable(any_exec, next_exec_timeout_)) {
continue; continue;
} }
if (any_exec.timer) { if (any_exec.timer) {

View file

@ -32,16 +32,14 @@ MemoryStrategy::get_subscription_by_handle(
if (!group) { if (!group) {
continue; continue;
} }
for (auto & weak_subscription : group->get_subscription_ptrs()) { auto match_subscription = group->find_subscription_ptrs_if(
auto subscription = weak_subscription.lock(); [&subscriber_handle](const rclcpp::SubscriptionBase::SharedPtr & subscription) -> bool {
if (subscription) { return
if (subscription->get_subscription_handle() == subscriber_handle) { (subscription->get_subscription_handle() == subscriber_handle) ||
return subscription; (subscription->get_intra_process_subscription_handle() == subscriber_handle);
} });
if (subscription->get_intra_process_subscription_handle() == subscriber_handle) { if (match_subscription) {
return subscription; return match_subscription;
}
}
} }
} }
} }
@ -63,11 +61,12 @@ MemoryStrategy::get_service_by_handle(
if (!group) { if (!group) {
continue; continue;
} }
for (auto & weak_service : group->get_service_ptrs()) { auto service_ref = group->find_service_ptrs_if(
auto service = weak_service.lock(); [&service_handle](const rclcpp::ServiceBase::SharedPtr & service) -> bool {
if (service && service->get_service_handle() == service_handle) { return service->get_service_handle() == service_handle;
return service; });
} if (service_ref) {
return service_ref;
} }
} }
} }
@ -89,11 +88,12 @@ MemoryStrategy::get_client_by_handle(
if (!group) { if (!group) {
continue; continue;
} }
for (auto & weak_client : group->get_client_ptrs()) { auto client_ref = group->find_client_ptrs_if(
auto client = weak_client.lock(); [&client_handle](const rclcpp::ClientBase::SharedPtr & client) -> bool {
if (client && client->get_client_handle() == client_handle) { return client->get_client_handle() == client_handle;
return client; });
} if (client_ref) {
return client_ref;
} }
} }
} }
@ -138,14 +138,15 @@ MemoryStrategy::get_group_by_subscription(
if (!group) { if (!group) {
continue; continue;
} }
for (auto & weak_sub : group->get_subscription_ptrs()) { auto match_sub = group->find_subscription_ptrs_if(
auto sub = weak_sub.lock(); [&subscription](const rclcpp::SubscriptionBase::SharedPtr & sub) -> bool {
if (sub == subscription) { return sub == subscription;
});
if (match_sub) {
return group; return group;
} }
} }
} }
}
return nullptr; return nullptr;
} }
@ -164,14 +165,15 @@ MemoryStrategy::get_group_by_service(
if (!group) { if (!group) {
continue; continue;
} }
for (auto & weak_serv : group->get_service_ptrs()) { auto service_ref = group->find_service_ptrs_if(
auto serv = weak_serv.lock(); [&service](const rclcpp::ServiceBase::SharedPtr & serv) -> bool {
if (serv && serv == service) { return serv == service;
});
if (service_ref) {
return group; return group;
} }
} }
} }
}
return nullptr; return nullptr;
} }
@ -190,14 +192,15 @@ MemoryStrategy::get_group_by_client(
if (!group) { if (!group) {
continue; continue;
} }
for (auto & weak_client : group->get_client_ptrs()) { auto client_ref = group->find_client_ptrs_if(
auto cli = weak_client.lock(); [&client](const rclcpp::ClientBase::SharedPtr & cli) -> bool {
if (cli && cli == client) { return cli == client;
});
if (client_ref) {
return group; return group;
} }
} }
} }
}
return nullptr; return nullptr;
} }
@ -216,13 +219,14 @@ MemoryStrategy::get_group_by_waitable(
if (!group) { if (!group) {
continue; continue;
} }
for (auto & weak_waitable : group->get_waitable_ptrs()) { auto waitable_ref = group->find_waitable_ptrs_if(
auto group_waitable = weak_waitable.lock(); [&waitable](const rclcpp::Waitable::SharedPtr & group_waitable) -> bool {
if (group_waitable && group_waitable == waitable) { return group_waitable == waitable;
});
if (waitable_ref) {
return group; return group;
} }
} }
} }
}
return nullptr; return nullptr;
} }