[rclcpp] adding a "static" single threaded executor (#1034)

* Added static single threaded executor functionality

Signed-off-by: Ishu Goel <ishu.goel@nobleo.nl>

executor enhanced to run clients and waitable

Signed-off-by: Ishu Goel <ishu.goel@nobleo.nl>

tested executor

Signed-off-by: Ishu Goel <ishu.goel@nobleo.nl>

added semi-dynamic feature to the executor

Signed-off-by: Ishu Goel <ishu.goel@nobleo.nl>

Jenkins error fixes

Signed-off-by: Ishu Goel <ishu.goel@nobleo.nl>

Added static single threaded executor functionality

Signed-off-by: Ishu Goel <ishu.goel@nobleo.nl>

* Added semi-dynamic feature and made changes based on review comments

Signed-off-by: Ishu Goel <ishu.goel@nobleo.nl>

* re-added accidentally deleted code in node.hpp, fixed static_single_threaded_executor.cpp w.r.t. intra-process change since last commit

Signed-off-by: MartinCornelis2 <martin.cornelis@nobleo.nl>

* Remove not needed comparison

wait_set_.size_of_* is always different than '0'
if we are inside the for loop

Signed-off-by: Mauro <mpasserino@irobot.com>

* If new entity added to a node: re-collect entities

Now we check ONLY node guard_conditions_

Some possible guard conditions to be triggered HERE are:
1. Ctrl+C guard condition
2. Executor interrupt_guard_condition_
3. Node guard_conditions_
4. Waitables guard conditions
5. ..more

The previous approach was only checking if NOT (1 & 2),
so if a Waitable was triggered, it would re-collect all
entities, even if no new node entity was added. This was the case
of the intra process manager, who relies on waitables.
Every time a subscriber got a message, all the entities
were collected.

Signed-off-by: Mauro <mpasserino@irobot.com>

* Implement static executor entities collector

Signed-off-by: Mauro <mpasserino@irobot.com>

* fixup and style

Signed-off-by: William Woodall <william@osrfoundation.org>

* mark new classes as final, with non-virtual destructors

Signed-off-by: William Woodall <william@osrfoundation.org>

* adding copyright to static executor files

Signed-off-by: Ishu Goel <ishu.goel@nobleo.nl>

* fixup

Signed-off-by: William Woodall <william@osrfoundation.org>

* cpplint fixes

Signed-off-by: William Woodall <william@osrfoundation.org>

Co-authored-by: Ishu Goel <ishu.goel@nobleo.nl>
Co-authored-by: MartinCornelis2 <martin.cornelis@nobleo.nl>
Co-authored-by: Mauro <mpasserino@irobot.com>
This commit is contained in:
William Woodall 2020-03-28 00:37:51 -07:00 committed by GitHub
parent 9017efbca0
commit 99286978f9
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
11 changed files with 986 additions and 2 deletions

View file

@ -40,11 +40,14 @@ set(${PROJECT_NAME}_SRCS
src/rclcpp/duration.cpp
src/rclcpp/event.cpp
src/rclcpp/exceptions.cpp
src/rclcpp/executable_list.cpp
src/rclcpp/executor.cpp
src/rclcpp/executors.cpp
src/rclcpp/expand_topic_or_service_name.cpp
src/rclcpp/executors/multi_threaded_executor.cpp
src/rclcpp/executors/single_threaded_executor.cpp
src/rclcpp/executors/static_executor_entities_collector.cpp
src/rclcpp/executors/static_single_threaded_executor.cpp
src/rclcpp/graph_listener.cpp
src/rclcpp/init_options.cpp
src/rclcpp/intra_process_manager.cpp

View file

@ -0,0 +1,92 @@
// Copyright 2019 Nobleo Technology
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#ifndef RCLCPP__EXECUTABLE_LIST_HPP_
#define RCLCPP__EXECUTABLE_LIST_HPP_
#include <memory>
#include <vector>
#include "rclcpp/client.hpp"
#include "rclcpp/service.hpp"
#include "rclcpp/subscription_base.hpp"
#include "rclcpp/timer.hpp"
#include "rclcpp/visibility_control.hpp"
#include "rclcpp/waitable.hpp"
namespace rclcpp
{
namespace executor
{
/// This class contains subscriptionbase, timerbase, etc. which can be used to run callbacks.
class ExecutableList final
{
public:
RCLCPP_PUBLIC
ExecutableList();
RCLCPP_PUBLIC
~ExecutableList();
RCLCPP_PUBLIC
void
clear();
RCLCPP_PUBLIC
void
add_subscription(rclcpp::SubscriptionBase::SharedPtr subscription);
RCLCPP_PUBLIC
void
add_timer(rclcpp::TimerBase::SharedPtr timer);
RCLCPP_PUBLIC
void
add_service(rclcpp::ServiceBase::SharedPtr service);
RCLCPP_PUBLIC
void
add_client(rclcpp::ClientBase::SharedPtr client);
RCLCPP_PUBLIC
void
add_waitable(rclcpp::Waitable::SharedPtr waitable);
// Vector containing the SubscriptionBase of all the subscriptions added to the executor.
std::vector<rclcpp::SubscriptionBase::SharedPtr> subscription;
// Contains the count of added subscriptions
size_t number_of_subscriptions;
// Vector containing the TimerBase of all the timers added to the executor.
std::vector<rclcpp::TimerBase::SharedPtr> timer;
// Contains the count of added timers
size_t number_of_timers;
// Vector containing the ServiceBase of all the services added to the executor.
std::vector<rclcpp::ServiceBase::SharedPtr> service;
// Contains the count of added services
size_t number_of_services;
// Vector containing the ClientBase of all the clients added to the executor.
std::vector<rclcpp::ClientBase::SharedPtr> client;
// Contains the count of added clients
size_t number_of_clients;
// Vector containing all the waitables added to the executor.
std::vector<rclcpp::Waitable::SharedPtr> waitable;
// Contains the count of added waitables
size_t number_of_waitables;
};
} // namespace executor
} // namespace rclcpp
#endif // RCLCPP__EXECUTABLE_LIST_HPP_

View file

@ -357,7 +357,6 @@ protected:
/// The context associated with this executor.
std::shared_ptr<rclcpp::Context> context_;
private:
RCLCPP_DISABLE_COPY(Executor)
std::list<rclcpp::node_interfaces::NodeBaseInterface::WeakPtr> weak_nodes_;

View file

@ -20,6 +20,7 @@
#include "rclcpp/executors/multi_threaded_executor.hpp"
#include "rclcpp/executors/single_threaded_executor.hpp"
#include "rclcpp/executors/static_single_threaded_executor.hpp"
#include "rclcpp/node.hpp"
#include "rclcpp/utilities.hpp"
#include "rclcpp/visibility_control.hpp"

View file

@ -0,0 +1,167 @@
// Copyright 2020 Open Source Robotics Foundation, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#ifndef RCLCPP__EXECUTORS__STATIC_EXECUTOR_ENTITIES_COLLECTOR_HPP_
#define RCLCPP__EXECUTORS__STATIC_EXECUTOR_ENTITIES_COLLECTOR_HPP_
#include <list>
#include <memory>
#include "rcl/guard_condition.h"
#include "rcl/wait.h"
#include "rclcpp/executable_list.hpp"
#include "rclcpp/macros.hpp"
#include "rclcpp/memory_strategy.hpp"
#include "rclcpp/visibility_control.hpp"
#include "rclcpp/waitable.hpp"
namespace rclcpp
{
namespace executors
{
class StaticExecutorEntitiesCollector final
: public rclcpp::Waitable,
public std::enable_shared_from_this<StaticExecutorEntitiesCollector>
{
public:
RCLCPP_SMART_PTR_DEFINITIONS(StaticExecutorEntitiesCollector)
// Constructor
RCLCPP_PUBLIC
StaticExecutorEntitiesCollector() = default;
// Destructor
~StaticExecutorEntitiesCollector();
RCLCPP_PUBLIC
void
init(
rcl_wait_set_t * p_wait_set,
rclcpp::memory_strategy::MemoryStrategy::SharedPtr & memory_strategy,
rcl_guard_condition_t * executor_guard_condition);
RCLCPP_PUBLIC
void
execute() override;
RCLCPP_PUBLIC
void
fill_memory_strategy();
RCLCPP_PUBLIC
void
fill_executable_list();
/// Function to reallocate space for entities in the wait set.
RCLCPP_PUBLIC
void
prepare_wait_set();
/// Function to add_handles_to_wait_set and wait for work and
// block until the wait set is ready or until the timeout has been exceeded.
RCLCPP_PUBLIC
void
refresh_wait_set(std::chrono::nanoseconds timeout = std::chrono::nanoseconds(-1));
RCLCPP_PUBLIC
bool
add_to_wait_set(rcl_wait_set_t * wait_set) override;
RCLCPP_PUBLIC
size_t
get_number_of_ready_guard_conditions() override;
RCLCPP_PUBLIC
void
add_node(
rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr);
RCLCPP_PUBLIC
bool
remove_node(
rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr);
/// Complete all available queued work without blocking.
/**
* This function checks if after the guard condition was triggered
* (or a spurious wakeup happened) we are really ready to execute
* i.e. re-collect entities
*/
RCLCPP_PUBLIC
bool
is_ready(rcl_wait_set_t * wait_set) override;
RCLCPP_PUBLIC
size_t
get_number_of_timers() {return exec_list_.number_of_timers;}
RCLCPP_PUBLIC
size_t
get_number_of_subscriptions() {return exec_list_.number_of_subscriptions;}
RCLCPP_PUBLIC
size_t
get_number_of_services() {return exec_list_.number_of_services;}
RCLCPP_PUBLIC
size_t
get_number_of_clients() {return exec_list_.number_of_clients;}
RCLCPP_PUBLIC
size_t
get_number_of_waitables() {return exec_list_.number_of_waitables;}
RCLCPP_PUBLIC
rclcpp::SubscriptionBase::SharedPtr
get_subscription(size_t i) {return exec_list_.subscription[i];}
RCLCPP_PUBLIC
rclcpp::TimerBase::SharedPtr
get_timer(size_t i) {return exec_list_.timer[i];}
RCLCPP_PUBLIC
rclcpp::ServiceBase::SharedPtr
get_service(size_t i) {return exec_list_.service[i];}
RCLCPP_PUBLIC
rclcpp::ClientBase::SharedPtr
get_client(size_t i) {return exec_list_.client[i];}
RCLCPP_PUBLIC
rclcpp::Waitable::SharedPtr
get_waitable(size_t i) {return exec_list_.waitable[i];}
private:
/// Nodes guard conditions which trigger this waitable
std::list<const rcl_guard_condition_t *> guard_conditions_;
/// Memory strategy: an interface for handling user-defined memory allocation strategies.
rclcpp::memory_strategy::MemoryStrategy::SharedPtr memory_strategy_;
/// List of weak nodes registered in the static executor
std::list<rclcpp::node_interfaces::NodeBaseInterface::WeakPtr> weak_nodes_;
/// Wait set for managing entities that the rmw layer waits on.
rcl_wait_set_t * p_wait_set_ = nullptr;
/// Executable list: timers, subscribers, clients, services and waitables
rclcpp::executor::ExecutableList exec_list_;
};
} // namespace executors
} // namespace rclcpp
#endif // RCLCPP__EXECUTORS__STATIC_EXECUTOR_ENTITIES_COLLECTOR_HPP_

View file

@ -0,0 +1,200 @@
// Copyright 2019 Nobleo Technology
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#ifndef RCLCPP__EXECUTORS__STATIC_SINGLE_THREADED_EXECUTOR_HPP_
#define RCLCPP__EXECUTORS__STATIC_SINGLE_THREADED_EXECUTOR_HPP_
#include <cassert>
#include <cstdlib>
#include <memory>
#include <vector>
#include <string>
#include "rmw/rmw.h"
#include "rclcpp/executable_list.hpp"
#include "rclcpp/executor.hpp"
#include "rclcpp/executors/static_executor_entities_collector.hpp"
#include "rclcpp/macros.hpp"
#include "rclcpp/memory_strategies.hpp"
#include "rclcpp/node.hpp"
#include "rclcpp/rate.hpp"
#include "rclcpp/utilities.hpp"
#include "rclcpp/visibility_control.hpp"
namespace rclcpp
{
namespace executors
{
/// Static executor implementation
/**
* This executor is a static version of the original single threaded executor.
* It's static because it doesn't reconstruct the executable list for every iteration.
* All nodes, callbackgroups, timers, subscriptions etc. are created before
* spin() is called, and modified only when an entity is added/removed to/from a node.
*
* To run this executor instead of SingleThreadedExecutor replace:
* rclcpp::executors::SingleThreadedExecutor exec;
* by
* rclcpp::executors::StaticSingleThreadedExecutor exec;
* in your source code and spin node(s) in the following way:
* exec.add_node(node);
* exec.spin();
* exec.remove_node(node);
*/
class StaticSingleThreadedExecutor : public executor::Executor
{
public:
RCLCPP_SMART_PTR_DEFINITIONS(StaticSingleThreadedExecutor)
/// Default constructor. See the default constructor for Executor.
RCLCPP_PUBLIC
explicit StaticSingleThreadedExecutor(
const executor::ExecutorArgs & args = executor::ExecutorArgs());
/// Default destrcutor.
RCLCPP_PUBLIC
virtual ~StaticSingleThreadedExecutor();
/// Static executor implementation of spin.
// This function will block until work comes in, execute it, and keep blocking.
// It will only be interrupt by a CTRL-C (managed by the global signal handler).
RCLCPP_PUBLIC
void
spin() override;
/// Add a node to the executor.
/**
* An executor can have zero or more nodes which provide work during `spin` functions.
* \param[in] node_ptr Shared pointer to the node to be added.
* \param[in] notify True to trigger the interrupt guard condition during this function. If
* the executor is blocked at the rmw layer while waiting for work and it is notified that a new
* node was added, it will wake up.
*/
RCLCPP_PUBLIC
void
add_node(
rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr,
bool notify = true) override;
/// Convenience function which takes Node and forwards NodeBaseInterface.
RCLCPP_PUBLIC
void
add_node(std::shared_ptr<rclcpp::Node> node_ptr, bool notify = true) override;
/// Remove a node from the executor.
/**
* \param[in] node_ptr Shared pointer to the node to remove.
* \param[in] notify True to trigger the interrupt guard condition and wake up the executor.
* This is useful if the last node was removed from the executor while the executor was blocked
* waiting for work in another thread, because otherwise the executor would never be notified.
*/
RCLCPP_PUBLIC
void
remove_node(
rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr,
bool notify = true) override;
/// Convenience function which takes Node and forwards NodeBaseInterface.
RCLCPP_PUBLIC
void
remove_node(std::shared_ptr<rclcpp::Node> node_ptr, bool notify = true) override;
/// Spin (blocking) until the future is complete, it times out waiting, or rclcpp is interrupted.
/**
* \param[in] future The future to wait on. If this function returns SUCCESS, the future can be
* accessed without blocking (though it may still throw an exception).
* \param[in] timeout Optional timeout parameter, which gets passed to
* Executor::execute_ready_executables.
* `-1` is block forever, `0` is non-blocking.
* If the time spent inside the blocking loop exceeds this timeout, return a TIMEOUT return
* code.
* \return The return code, one of `SUCCESS`, `INTERRUPTED`, or `TIMEOUT`.
*
* Example usage:
* rclcpp::executors::StaticSingleThreadedExecutor exec;
* // ... other part of code like creating node
* // define future
* exec.add_node(node);
* exec.spin_until_future_complete(future);
*/
template<typename ResponseT, typename TimeRepT = int64_t, typename TimeT = std::milli>
rclcpp::executor::FutureReturnCode
spin_until_future_complete(
std::shared_future<ResponseT> & future,
std::chrono::duration<TimeRepT, TimeT> timeout = std::chrono::duration<TimeRepT, TimeT>(-1))
{
std::future_status status = future.wait_for(std::chrono::seconds(0));
if (status == std::future_status::ready) {
return rclcpp::executor::FutureReturnCode::SUCCESS;
}
auto end_time = std::chrono::steady_clock::now();
std::chrono::nanoseconds timeout_ns = std::chrono::duration_cast<std::chrono::nanoseconds>(
timeout);
if (timeout_ns > std::chrono::nanoseconds::zero()) {
end_time += timeout_ns;
}
std::chrono::nanoseconds timeout_left = timeout_ns;
entities_collector_ = std::make_shared<StaticExecutorEntitiesCollector>();
entities_collector_->init(&wait_set_, memory_strategy_, &interrupt_guard_condition_);
while (rclcpp::ok(this->context_)) {
// Do one set of work.
entities_collector_->refresh_wait_set(timeout_left);
execute_ready_executables();
// Check if the future is set, return SUCCESS if it is.
status = future.wait_for(std::chrono::seconds(0));
if (status == std::future_status::ready) {
return rclcpp::executor::FutureReturnCode::SUCCESS;
}
// If the original timeout is < 0, then this is blocking, never TIMEOUT.
if (timeout_ns < std::chrono::nanoseconds::zero()) {
continue;
}
// Otherwise check if we still have time to wait, return TIMEOUT if not.
auto now = std::chrono::steady_clock::now();
if (now >= end_time) {
return rclcpp::executor::FutureReturnCode::TIMEOUT;
}
// Subtract the elapsed time from the original timeout.
timeout_left = std::chrono::duration_cast<std::chrono::nanoseconds>(end_time - now);
}
// The future did not complete before ok() returned false, return INTERRUPTED.
return rclcpp::executor::FutureReturnCode::INTERRUPTED;
}
protected:
/// Check which executables in ExecutableList struct are ready from wait_set and execute them.
/**
* \param[in] exec_list Structure that can hold subscriptionbases, timerbases, etc
* \param[in] timeout Optional timeout parameter.
*/
RCLCPP_PUBLIC
void
execute_ready_executables();
private:
RCLCPP_DISABLE_COPY(StaticSingleThreadedExecutor)
StaticExecutorEntitiesCollector::SharedPtr entities_collector_;
};
} // namespace executors
} // namespace rclcpp
#endif // RCLCPP__EXECUTORS__STATIC_SINGLE_THREADED_EXECUTOR_HPP_

View file

@ -56,6 +56,7 @@ public:
virtual size_t number_of_guard_conditions() const = 0;
virtual size_t number_of_waitables() const = 0;
virtual void add_waitable_handle(const rclcpp::Waitable::SharedPtr & waitable) = 0;
virtual bool add_handles_to_wait_set(rcl_wait_set_t * wait_set) = 0;
virtual void clear_handles() = 0;
virtual void remove_null_handles(rcl_wait_set_t * wait_set) = 0;

View file

@ -169,7 +169,6 @@ public:
subscription_handles_.push_back(subscription->get_subscription_handle());
return false;
});
group->find_service_ptrs_if(
[this](const rclcpp::ServiceBase::SharedPtr & service) {
service_handles_.push_back(service->get_service_handle());
@ -195,6 +194,14 @@ public:
return has_invalid_weak_nodes;
}
void add_waitable_handle(const rclcpp::Waitable::SharedPtr & waitable)
{
if (nullptr == waitable) {
throw std::runtime_error("waitable object unexpectedly nullptr");
}
waitable_handles_.push_back(waitable);
}
bool add_handles_to_wait_set(rcl_wait_set_t * wait_set) override
{
for (auto subscription : subscription_handles_) {

View file

@ -0,0 +1,84 @@
// Copyright 2019 Nobleo Technology
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include <utility>
#include "rclcpp/executable_list.hpp"
using rclcpp::executor::ExecutableList;
ExecutableList::ExecutableList()
: number_of_subscriptions(0),
number_of_timers(0),
number_of_services(0),
number_of_clients(0),
number_of_waitables(0)
{}
ExecutableList::~ExecutableList()
{}
void
ExecutableList::clear()
{
this->timer.clear();
this->number_of_timers = 0;
this->subscription.clear();
this->number_of_subscriptions = 0;
this->service.clear();
this->number_of_services = 0;
this->client.clear();
this->number_of_clients = 0;
this->waitable.clear();
this->number_of_waitables = 0;
}
void
ExecutableList::add_subscription(rclcpp::SubscriptionBase::SharedPtr subscription)
{
this->subscription.push_back(std::move(subscription));
this->number_of_subscriptions++;
}
void
ExecutableList::add_timer(rclcpp::TimerBase::SharedPtr timer)
{
this->timer.push_back(std::move(timer));
this->number_of_timers++;
}
void
ExecutableList::add_service(rclcpp::ServiceBase::SharedPtr service)
{
this->service.push_back(std::move(service));
this->number_of_services++;
}
void
ExecutableList::add_client(rclcpp::ClientBase::SharedPtr client)
{
this->client.push_back(std::move(client));
this->number_of_clients++;
}
void
ExecutableList::add_waitable(rclcpp::Waitable::SharedPtr waitable)
{
this->waitable.push_back(std::move(waitable));
this->number_of_waitables++;
}

View file

@ -0,0 +1,286 @@
// Copyright 2020 Open Source Robotics Foundation, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "rclcpp/executors/static_executor_entities_collector.hpp"
#include <string>
#include <memory>
#include "rclcpp/executors/static_single_threaded_executor.hpp"
using rclcpp::executors::StaticExecutorEntitiesCollector;
StaticExecutorEntitiesCollector::~StaticExecutorEntitiesCollector()
{
// Disassociate all nodes
for (auto & weak_node : weak_nodes_) {
auto node = weak_node.lock();
if (node) {
std::atomic_bool & has_executor = node->get_associated_with_executor_atomic();
has_executor.store(false);
}
}
exec_list_.clear();
weak_nodes_.clear();
guard_conditions_.clear();
}
void
StaticExecutorEntitiesCollector::init(
rcl_wait_set_t * p_wait_set,
memory_strategy::MemoryStrategy::SharedPtr & memory_strategy,
rcl_guard_condition_t * executor_guard_condition)
{
// Empty initialize executable list
exec_list_ = executor::ExecutableList();
// Get executor's wait_set_ pointer
p_wait_set_ = p_wait_set;
// Get executor's memory strategy ptr
if (memory_strategy == nullptr) {
throw std::runtime_error("Received NULL memory strategy in executor waitable.");
}
memory_strategy_ = memory_strategy;
// Add executor's guard condition
guard_conditions_.push_back(executor_guard_condition);
// Get memory strategy and executable list. Prepare wait_set_
execute();
}
void
StaticExecutorEntitiesCollector::execute()
{
// Fill memory strategy with entities coming from weak_nodes_
fill_memory_strategy();
// Fill exec_list_ with entities coming from weak_nodes_ (same as memory strategy)
fill_executable_list();
// Resize the wait_set_ based on memory_strategy handles (rcl_wait_set_resize)
prepare_wait_set();
}
void
StaticExecutorEntitiesCollector::fill_memory_strategy()
{
memory_strategy_->clear_handles();
bool has_invalid_weak_nodes = memory_strategy_->collect_entities(weak_nodes_);
// Clean up any invalid nodes, if they were detected
if (has_invalid_weak_nodes) {
auto node_it = weak_nodes_.begin();
while (node_it != weak_nodes_.end()) {
if (node_it->expired()) {
node_it = weak_nodes_.erase(node_it);
} else {
++node_it;
}
}
}
// Add the static executor waitable to the memory strategy
memory_strategy_->add_waitable_handle(this->shared_from_this());
}
void
StaticExecutorEntitiesCollector::fill_executable_list()
{
exec_list_.clear();
for (auto & weak_node : weak_nodes_) {
auto node = weak_node.lock();
if (!node) {
continue;
}
// Check in all the callback groups
for (auto & weak_group : node->get_callback_groups()) {
auto group = weak_group.lock();
if (!group || !group->can_be_taken_from().load()) {
continue;
}
group->find_timer_ptrs_if(
[this](const rclcpp::TimerBase::SharedPtr & timer) {
if (timer) {
exec_list_.add_timer(timer);
}
return false;
});
group->find_subscription_ptrs_if(
[this](const rclcpp::SubscriptionBase::SharedPtr & subscription) {
if (subscription) {
exec_list_.add_subscription(subscription);
}
return false;
});
group->find_service_ptrs_if(
[this](const rclcpp::ServiceBase::SharedPtr & service) {
if (service) {
exec_list_.add_service(service);
}
return false;
});
group->find_client_ptrs_if(
[this](const rclcpp::ClientBase::SharedPtr & client) {
if (client) {
exec_list_.add_client(client);
}
return false;
});
group->find_waitable_ptrs_if(
[this](const rclcpp::Waitable::SharedPtr & waitable) {
if (waitable) {
exec_list_.add_waitable(waitable);
}
return false;
});
}
}
// Add the executor's waitable to the executable list
exec_list_.add_waitable(shared_from_this());
}
void
StaticExecutorEntitiesCollector::prepare_wait_set()
{
// clear wait set
if (rcl_wait_set_clear(p_wait_set_) != RCL_RET_OK) {
throw std::runtime_error("Couldn't clear wait set");
}
// The size of waitables are accounted for in size of the other entities
rcl_ret_t ret = rcl_wait_set_resize(
p_wait_set_, memory_strategy_->number_of_ready_subscriptions(),
memory_strategy_->number_of_guard_conditions(), memory_strategy_->number_of_ready_timers(),
memory_strategy_->number_of_ready_clients(), memory_strategy_->number_of_ready_services(),
memory_strategy_->number_of_ready_events());
if (RCL_RET_OK != ret) {
throw std::runtime_error(
std::string("Couldn't resize the wait set : ") + rcl_get_error_string().str);
}
}
void
StaticExecutorEntitiesCollector::refresh_wait_set(std::chrono::nanoseconds timeout)
{
// clear wait set (memeset to '0' all wait_set_ entities
// but keeps the wait_set_ number of entities)
if (rcl_wait_set_clear(p_wait_set_) != RCL_RET_OK) {
throw std::runtime_error("Couldn't clear wait set");
}
if (!memory_strategy_->add_handles_to_wait_set(p_wait_set_)) {
throw std::runtime_error("Couldn't fill wait set");
}
rcl_ret_t status =
rcl_wait(p_wait_set_, std::chrono::duration_cast<std::chrono::nanoseconds>(timeout).count());
if (status == RCL_RET_WAIT_SET_EMPTY) {
RCUTILS_LOG_WARN_NAMED(
"rclcpp",
"empty wait set received in rcl_wait(). This should never happen.");
} else if (status != RCL_RET_OK && status != RCL_RET_TIMEOUT) {
using rclcpp::exceptions::throw_from_rcl_error;
throw_from_rcl_error(status, "rcl_wait() failed");
}
}
bool
StaticExecutorEntitiesCollector::add_to_wait_set(rcl_wait_set_t * wait_set)
{
// Add waitable guard conditions (one for each registered node) into the wait set.
for (const auto & gc : guard_conditions_) {
rcl_ret_t ret = rcl_wait_set_add_guard_condition(wait_set, gc, NULL);
if (ret != RCL_RET_OK) {
throw std::runtime_error("Executor waitable: couldn't add guard condition to wait set");
}
}
return true;
}
size_t StaticExecutorEntitiesCollector::get_number_of_ready_guard_conditions()
{
return guard_conditions_.size();
}
void
StaticExecutorEntitiesCollector::add_node(
rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr)
{
// Check to ensure node not already added
for (auto & weak_node : weak_nodes_) {
auto node = weak_node.lock();
if (node == node_ptr) {
// TODO(jacquelinekay): Use a different error here?
throw std::runtime_error("Cannot add node to executor, node already added.");
}
}
weak_nodes_.push_back(node_ptr);
guard_conditions_.push_back(node_ptr->get_notify_guard_condition());
}
bool
StaticExecutorEntitiesCollector::remove_node(
rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr)
{
auto node_it = weak_nodes_.begin();
while (node_it != weak_nodes_.end()) {
bool matched = (node_it->lock() == node_ptr);
if (matched) {
// Find and remove node and its guard condition
auto gc_it = std::find(
guard_conditions_.begin(),
guard_conditions_.end(),
node_ptr->get_notify_guard_condition());
if (gc_it != guard_conditions_.end()) {
guard_conditions_.erase(gc_it);
weak_nodes_.erase(node_it);
return true;
}
throw std::runtime_error("Didn't find guard condition associated with node.");
} else {
++node_it;
}
}
return false;
}
bool
StaticExecutorEntitiesCollector::is_ready(rcl_wait_set_t * p_wait_set)
{
// Check wait_set guard_conditions for added/removed entities to/from a node
for (size_t i = 0; i < p_wait_set->size_of_guard_conditions; ++i) {
if (p_wait_set->guard_conditions[i] != NULL) {
// Check if the guard condition triggered belongs to a node
auto it = std::find(
guard_conditions_.begin(), guard_conditions_.end(),
p_wait_set->guard_conditions[i]);
// If it does, we are ready to re-collect entities
if (it != guard_conditions_.end()) {
return true;
}
}
}
// None of the guard conditions triggered belong to a registered node
return false;
}

View file

@ -0,0 +1,144 @@
// Copyright 2019 Nobleo Technology
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "rclcpp/executors/static_single_threaded_executor.hpp"
#include <memory>
#include "rclcpp/scope_exit.hpp"
using rclcpp::executors::StaticSingleThreadedExecutor;
using rclcpp::executor::ExecutableList;
StaticSingleThreadedExecutor::StaticSingleThreadedExecutor(
const rclcpp::executor::ExecutorArgs & args)
: executor::Executor(args)
{
entities_collector_ = std::make_shared<StaticExecutorEntitiesCollector>();
}
StaticSingleThreadedExecutor::~StaticSingleThreadedExecutor() {}
void
StaticSingleThreadedExecutor::spin()
{
if (spinning.exchange(true)) {
throw std::runtime_error("spin() called while already spinning");
}
RCLCPP_SCOPE_EXIT(this->spinning.store(false); );
// Set memory_strategy_ and exec_list_ based on weak_nodes_
// Prepare wait_set_ based on memory_strategy_
entities_collector_->init(&wait_set_, memory_strategy_, &interrupt_guard_condition_);
while (rclcpp::ok(this->context_) && spinning.load()) {
// Refresh wait set and wait for work
entities_collector_->refresh_wait_set();
execute_ready_executables();
}
}
void
StaticSingleThreadedExecutor::add_node(
rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr, bool notify)
{
// If the node already has an executor
std::atomic_bool & has_executor = node_ptr->get_associated_with_executor_atomic();
if (has_executor.exchange(true)) {
throw std::runtime_error("Node has already been added to an executor.");
}
if (notify) {
// Interrupt waiting to handle new node
if (rcl_trigger_guard_condition(&interrupt_guard_condition_) != RCL_RET_OK) {
throw std::runtime_error(rcl_get_error_string().str);
}
}
entities_collector_->add_node(node_ptr);
}
void
StaticSingleThreadedExecutor::add_node(std::shared_ptr<rclcpp::Node> node_ptr, bool notify)
{
this->add_node(node_ptr->get_node_base_interface(), notify);
}
void
StaticSingleThreadedExecutor::remove_node(
rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr, bool notify)
{
bool node_removed = entities_collector_->remove_node(node_ptr);
if (notify) {
// If the node was matched and removed, interrupt waiting
if (node_removed) {
if (rcl_trigger_guard_condition(&interrupt_guard_condition_) != RCL_RET_OK) {
throw std::runtime_error(rcl_get_error_string().str);
}
}
}
std::atomic_bool & has_executor = node_ptr->get_associated_with_executor_atomic();
has_executor.store(false);
}
void
StaticSingleThreadedExecutor::remove_node(std::shared_ptr<rclcpp::Node> node_ptr, bool notify)
{
this->remove_node(node_ptr->get_node_base_interface(), notify);
}
void
StaticSingleThreadedExecutor::execute_ready_executables()
{
// Execute all the ready subscriptions
for (size_t i = 0; i < wait_set_.size_of_subscriptions; ++i) {
if (i < entities_collector_->get_number_of_subscriptions()) {
if (wait_set_.subscriptions[i]) {
execute_subscription(entities_collector_->get_subscription(i));
}
}
}
// Execute all the ready timers
for (size_t i = 0; i < wait_set_.size_of_timers; ++i) {
if (i < entities_collector_->get_number_of_timers()) {
if (wait_set_.timers[i] && entities_collector_->get_timer(i)->is_ready()) {
execute_timer(entities_collector_->get_timer(i));
}
}
}
// Execute all the ready services
for (size_t i = 0; i < wait_set_.size_of_services; ++i) {
if (i < entities_collector_->get_number_of_services()) {
if (wait_set_.services[i]) {
execute_service(entities_collector_->get_service(i));
}
}
}
// Execute all the ready clients
for (size_t i = 0; i < wait_set_.size_of_clients; ++i) {
if (i < entities_collector_->get_number_of_clients()) {
if (wait_set_.clients[i]) {
execute_client(entities_collector_->get_client(i));
}
}
}
// Execute all the ready waitables
for (size_t i = 0; i < entities_collector_->get_number_of_waitables(); ++i) {
if (entities_collector_->get_waitable(i)->is_ready(&wait_set_)) {
entities_collector_->get_waitable(i)->execute();
}
}
}