pass AnyExecutable objects as reference to avoid memory allocation (#463)

* pass AnyExecutable objects as reference to avoid memory allocation

* remove style change
This commit is contained in:
William Woodall 2018-04-17 21:54:42 -05:00 committed by GitHub
parent 360f1b9425
commit 1610fc3973
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
7 changed files with 88 additions and 94 deletions

View file

@ -33,8 +33,6 @@ namespace executor
struct AnyExecutable
{
RCLCPP_SMART_PTR_DEFINITIONS(AnyExecutable)
RCLCPP_PUBLIC
AnyExecutable();

View file

@ -287,7 +287,7 @@ protected:
*/
RCLCPP_PUBLIC
void
execute_any_executable(AnyExecutable::SharedPtr any_exec);
execute_any_executable(AnyExecutable & any_exec);
RCLCPP_PUBLIC
static void
@ -325,15 +325,17 @@ protected:
RCLCPP_PUBLIC
void
get_next_timer(AnyExecutable::SharedPtr any_exec);
get_next_timer(AnyExecutable & any_exec);
RCLCPP_PUBLIC
AnyExecutable::SharedPtr
get_next_ready_executable();
bool
get_next_ready_executable(AnyExecutable & any_executable);
RCLCPP_PUBLIC
AnyExecutable::SharedPtr
get_next_executable(std::chrono::nanoseconds timeout = std::chrono::nanoseconds(-1));
bool
get_next_executable(
AnyExecutable & any_executable,
std::chrono::nanoseconds timeout = std::chrono::nanoseconds(-1));
/// Spinning state, used to prevent multi threaded calls to spin and to cancel blocking spins.
std::atomic_bool spinning;

View file

@ -55,27 +55,23 @@ public:
virtual void clear_handles() = 0;
virtual void remove_null_handles(rcl_wait_set_t * wait_set) = 0;
/// Provide a newly initialized AnyExecutable object.
// \return Shared pointer to the fresh executable.
virtual rclcpp::executor::AnyExecutable::SharedPtr instantiate_next_executable() = 0;
virtual void add_guard_condition(const rcl_guard_condition_t * guard_condition) = 0;
virtual void remove_guard_condition(const rcl_guard_condition_t * guard_condition) = 0;
virtual void
get_next_subscription(
rclcpp::executor::AnyExecutable::SharedPtr any_exec,
rclcpp::executor::AnyExecutable & any_exec,
const WeakNodeVector & weak_nodes) = 0;
virtual void
get_next_service(
rclcpp::executor::AnyExecutable::SharedPtr any_exec,
rclcpp::executor::AnyExecutable & any_exec,
const WeakNodeVector & weak_nodes) = 0;
virtual void
get_next_client(
rclcpp::executor::AnyExecutable::SharedPtr any_exec,
rclcpp::executor::AnyExecutable & any_exec,
const WeakNodeVector & weak_nodes) = 0;
virtual rcl_allocator_t

View file

@ -48,21 +48,16 @@ class AllocatorMemoryStrategy : public memory_strategy::MemoryStrategy
public:
RCLCPP_SMART_PTR_DEFINITIONS(AllocatorMemoryStrategy<Alloc>)
using ExecAllocTraits = allocator::AllocRebind<executor::AnyExecutable, Alloc>;
using ExecAlloc = typename ExecAllocTraits::allocator_type;
using ExecDeleter = allocator::Deleter<ExecAlloc, executor::AnyExecutable>;
using VoidAllocTraits = typename allocator::AllocRebind<void *, Alloc>;
using VoidAlloc = typename VoidAllocTraits::allocator_type;
explicit AllocatorMemoryStrategy(std::shared_ptr<Alloc> allocator)
{
executable_allocator_ = std::make_shared<ExecAlloc>(*allocator.get());
allocator_ = std::make_shared<VoidAlloc>(*allocator.get());
}
AllocatorMemoryStrategy()
{
executable_allocator_ = std::make_shared<ExecAlloc>();
allocator_ = std::make_shared<VoidAlloc>();
}
@ -235,16 +230,9 @@ public:
return true;
}
/// Provide a newly initialized AnyExecutable object.
// \return Shared pointer to the fresh executable.
executor::AnyExecutable::SharedPtr instantiate_next_executable()
{
return std::allocate_shared<executor::AnyExecutable>(*executable_allocator_.get());
}
virtual void
get_next_subscription(
executor::AnyExecutable::SharedPtr any_exec,
executor::AnyExecutable & any_exec,
const WeakNodeVector & weak_nodes)
{
auto it = subscription_handles_.begin();
@ -272,12 +260,12 @@ public:
}
// Otherwise it is safe to set and return the any_exec
if (is_intra_process) {
any_exec->subscription_intra_process = subscription;
any_exec.subscription_intra_process = subscription;
} else {
any_exec->subscription = subscription;
any_exec.subscription = subscription;
}
any_exec->callback_group = group;
any_exec->node_base = get_node_by_group(group, weak_nodes);
any_exec.callback_group = group;
any_exec.node_base = get_node_by_group(group, weak_nodes);
subscription_handles_.erase(it);
return;
}
@ -288,7 +276,7 @@ public:
virtual void
get_next_service(
executor::AnyExecutable::SharedPtr any_exec,
executor::AnyExecutable & any_exec,
const WeakNodeVector & weak_nodes)
{
auto it = service_handles_.begin();
@ -310,9 +298,9 @@ public:
continue;
}
// Otherwise it is safe to set and return the any_exec
any_exec->service = service;
any_exec->callback_group = group;
any_exec->node_base = get_node_by_group(group, weak_nodes);
any_exec.service = service;
any_exec.callback_group = group;
any_exec.node_base = get_node_by_group(group, weak_nodes);
service_handles_.erase(it);
return;
}
@ -322,7 +310,7 @@ public:
}
virtual void
get_next_client(executor::AnyExecutable::SharedPtr any_exec, const WeakNodeVector & weak_nodes)
get_next_client(executor::AnyExecutable & any_exec, const WeakNodeVector & weak_nodes)
{
auto it = client_handles_.begin();
while (it != client_handles_.end()) {
@ -343,9 +331,9 @@ public:
continue;
}
// Otherwise it is safe to set and return the any_exec
any_exec->client = client;
any_exec->callback_group = group;
any_exec->node_base = get_node_by_group(group, weak_nodes);
any_exec.client = client;
any_exec.callback_group = group;
any_exec.node_base = get_node_by_group(group, weak_nodes);
client_handles_.erase(it);
return;
}
@ -396,7 +384,6 @@ private:
VectorRebind<std::shared_ptr<const rcl_client_t>> client_handles_;
VectorRebind<std::shared_ptr<const rcl_timer_t>> timer_handles_;
std::shared_ptr<ExecAlloc> executable_allocator_;
std::shared_ptr<VoidAlloc> allocator_;
};

View file

@ -206,9 +206,13 @@ Executor::spin_some()
throw std::runtime_error("spin_some() called while already spinning");
}
RCLCPP_SCOPE_EXIT(this->spinning.store(false); );
AnyExecutable::SharedPtr any_exec;
while ((any_exec = get_next_executable(std::chrono::milliseconds::zero())) && spinning.load()) {
execute_any_executable(any_exec);
while (spinning.load()) {
AnyExecutable any_exec;
if (get_next_executable(any_exec, std::chrono::milliseconds::zero())) {
execute_any_executable(any_exec);
} else {
break;
}
}
}
@ -219,8 +223,8 @@ Executor::spin_once(std::chrono::nanoseconds timeout)
throw std::runtime_error("spin_once() called while already spinning");
}
RCLCPP_SCOPE_EXIT(this->spinning.store(false); );
auto any_exec = get_next_executable(timeout);
if (any_exec) {
AnyExecutable any_exec;
if (get_next_executable(any_exec, timeout)) {
execute_any_executable(any_exec);
}
}
@ -244,28 +248,28 @@ Executor::set_memory_strategy(rclcpp::memory_strategy::MemoryStrategy::SharedPtr
}
void
Executor::execute_any_executable(AnyExecutable::SharedPtr any_exec)
Executor::execute_any_executable(AnyExecutable & any_exec)
{
if (!any_exec || !spinning.load()) {
if (!spinning.load()) {
return;
}
if (any_exec->timer) {
execute_timer(any_exec->timer);
if (any_exec.timer) {
execute_timer(any_exec.timer);
}
if (any_exec->subscription) {
execute_subscription(any_exec->subscription);
if (any_exec.subscription) {
execute_subscription(any_exec.subscription);
}
if (any_exec->subscription_intra_process) {
execute_intra_process_subscription(any_exec->subscription_intra_process);
if (any_exec.subscription_intra_process) {
execute_intra_process_subscription(any_exec.subscription_intra_process);
}
if (any_exec->service) {
execute_service(any_exec->service);
if (any_exec.service) {
execute_service(any_exec.service);
}
if (any_exec->client) {
execute_client(any_exec->client);
if (any_exec.client) {
execute_client(any_exec.client);
}
// Reset the callback_group, regardless of type
any_exec->callback_group->can_be_taken_from().store(true);
any_exec.callback_group->can_be_taken_from().store(true);
// Wake the wait, because it may need to be recalculated or work that
// was previously blocked is now available.
if (rcl_trigger_guard_condition(&interrupt_guard_condition_) != RCL_RET_OK) {
@ -508,7 +512,7 @@ Executor::get_group_by_timer(rclcpp::TimerBase::SharedPtr timer)
}
void
Executor::get_next_timer(AnyExecutable::SharedPtr any_exec)
Executor::get_next_timer(AnyExecutable & any_exec)
{
for (auto & weak_node : weak_nodes_) {
auto node = weak_node.lock();
@ -523,8 +527,8 @@ Executor::get_next_timer(AnyExecutable::SharedPtr any_exec)
for (auto & timer_ref : group->get_timer_ptrs()) {
auto timer = timer_ref.lock();
if (timer && timer->is_ready()) {
any_exec->timer = timer;
any_exec->callback_group = group;
any_exec.timer = timer;
any_exec.callback_group = group;
node = get_node_by_group(group);
return;
}
@ -533,67 +537,69 @@ Executor::get_next_timer(AnyExecutable::SharedPtr any_exec)
}
}
AnyExecutable::SharedPtr
Executor::get_next_ready_executable()
bool
Executor::get_next_ready_executable(AnyExecutable & any_executable)
{
auto any_exec = memory_strategy_->instantiate_next_executable();
// Check the timers to see if there are any that are ready, if so return
get_next_timer(any_exec);
if (any_exec->timer) {
return any_exec;
get_next_timer(any_executable);
if (any_executable.timer) {
return true;
}
// Check the subscriptions to see if there are any that are ready
memory_strategy_->get_next_subscription(any_exec, weak_nodes_);
if (any_exec->subscription || any_exec->subscription_intra_process) {
return any_exec;
memory_strategy_->get_next_subscription(any_executable, weak_nodes_);
if (any_executable.subscription || any_executable.subscription_intra_process) {
return true;
}
// Check the services to see if there are any that are ready
memory_strategy_->get_next_service(any_exec, weak_nodes_);
if (any_exec->service) {
return any_exec;
memory_strategy_->get_next_service(any_executable, weak_nodes_);
if (any_executable.service) {
return true;
}
// Check the clients to see if there are any that are ready
memory_strategy_->get_next_client(any_exec, weak_nodes_);
if (any_exec->client) {
return any_exec;
memory_strategy_->get_next_client(any_executable, weak_nodes_);
if (any_executable.client) {
return true;
}
// If there is no ready executable, return a null ptr
return nullptr;
return false;
}
AnyExecutable::SharedPtr
Executor::get_next_executable(std::chrono::nanoseconds timeout)
bool
Executor::get_next_executable(AnyExecutable & any_executable, std::chrono::nanoseconds timeout)
{
bool success = false;
// Check to see if there are any subscriptions or timers needing service
// TODO(wjwwood): improve run to run efficiency of this function
auto any_exec = get_next_ready_executable();
success = get_next_ready_executable(any_executable);
// If there are none
if (!any_exec) {
if (!success) {
// Wait for subscriptions or timers to work on
wait_for_work(timeout);
if (!spinning.load()) {
return nullptr;
return false;
}
// Try again
any_exec = get_next_ready_executable();
success = get_next_ready_executable(any_executable);
}
// At this point any_exec should be valid with either a valid subscription
// or a valid timer, or it should be a null shared_ptr
if (any_exec) {
if (success) {
// If it is valid, check to see if the group is mutually exclusive or
// not, then mark it accordingly
if (any_exec->callback_group && any_exec->callback_group->type() == \
callback_group::CallbackGroupType::MutuallyExclusive)
using callback_group::CallbackGroupType;
if (
any_executable.callback_group &&
any_executable.callback_group->type() == CallbackGroupType::MutuallyExclusive)
{
// It should not have been taken otherwise
assert(any_exec->callback_group->can_be_taken_from().load());
assert(any_executable.callback_group->can_be_taken_from().load());
// Set to false to indicate something is being run from this group
// This is reset to true either when the any_exec is executed or when the
// any_exec is destructued
any_exec->callback_group->can_be_taken_from().store(false);
any_executable.callback_group->can_be_taken_from().store(false);
}
}
return any_exec;
return success;
}
std::ostream &

View file

@ -69,13 +69,15 @@ void
MultiThreadedExecutor::run(size_t)
{
while (rclcpp::ok() && spinning.load()) {
executor::AnyExecutable::SharedPtr any_exec;
executor::AnyExecutable any_exec;
{
std::lock_guard<std::mutex> wait_lock(wait_mutex_);
if (!rclcpp::ok() || !spinning.load()) {
return;
}
any_exec = get_next_executable();
if (!get_next_executable(any_exec)) {
continue;
}
}
execute_any_executable(any_exec);
}

View file

@ -13,6 +13,7 @@
// limitations under the License.
#include "rclcpp/executors/single_threaded_executor.hpp"
#include "rclcpp/any_executable.hpp"
#include "rclcpp/scope_exit.hpp"
using rclcpp::executors::SingleThreadedExecutor;
@ -30,7 +31,9 @@ SingleThreadedExecutor::spin()
}
RCLCPP_SCOPE_EXIT(this->spinning.store(false); );
while (rclcpp::ok() && spinning.load()) {
auto any_exec = get_next_executable();
execute_any_executable(any_exec);
rclcpp::executor::AnyExecutable any_executable;
if (get_next_executable(any_executable)) {
execute_any_executable(any_executable);
}
}
}