Remove fixed guard conditions, add notify guard conditions

* No more fixed guard conditions.

* Add notify guard condition to nodes
This commit is contained in:
Jackie Kay 2016-04-01 14:07:07 -07:00
parent 2be9568498
commit 6bcd9db4d6
7 changed files with 124 additions and 18 deletions

View file

@ -306,8 +306,6 @@ protected:
/// Spinning state, used to prevent multi threaded calls to spin and to cancel blocking spins.
std::atomic_bool spinning;
rmw_guard_conditions_t fixed_guard_conditions_;
/// Guard condition for signaling the rmw layer to wake up for special events.
rmw_guard_condition_t * interrupt_guard_condition_;
@ -321,7 +319,6 @@ private:
RCLCPP_DISABLE_COPY(Executor);
std::vector<std::weak_ptr<rclcpp::node::Node>> weak_nodes_;
std::array<void *, 2> guard_cond_handles_;
};
} // namespace executor

View file

@ -49,6 +49,9 @@ public:
// return the new number of clients
virtual size_t fill_client_handles(void ** & ptr) = 0;
// return the new number of guard_conditions
virtual size_t fill_guard_condition_handles(void ** & ptr) = 0;
virtual void clear_active_entities() = 0;
virtual void clear_handles() = 0;
@ -59,6 +62,10 @@ public:
// \return Shared pointer to the fresh executable.
virtual rclcpp::executor::AnyExecutable::SharedPtr instantiate_next_executable() = 0;
virtual void add_guard_condition(const rmw_guard_condition_t * guard_condition) = 0;
virtual void remove_guard_condition(const rmw_guard_condition_t * guard_condition) = 0;
virtual void
get_next_subscription(rclcpp::executor::AnyExecutable::SharedPtr any_exec,
const WeakNodeVector & weak_nodes) = 0;

View file

@ -77,6 +77,9 @@ public:
const std::string & node_name, rclcpp::context::Context::SharedPtr context,
bool use_intra_process_comms = false);
RCLCPP_PUBLIC
~Node();
/// Get the name of the node.
// \return The name of the node.
RCLCPP_PUBLIC
@ -252,6 +255,9 @@ public:
const CallbackGroupWeakPtrList &
get_callback_groups() const;
RCLCPP_PUBLIC
rmw_guard_condition_t * get_notify_guard_condition() const;
std::atomic_bool has_executor;
private:
@ -279,6 +285,9 @@ private:
mutable std::mutex mutex_;
/// Guard condition for notifying the Executor of changes to this node.
rmw_guard_condition_t * notify_guard_condition_;
std::map<std::string, rclcpp::parameter::ParameterVariant> parameters_;
publisher::Publisher<rcl_interfaces::msg::ParameterEvent>::SharedPtr events_publisher_;

View file

@ -134,6 +134,11 @@ Node::create_publisher(
shared_publish_callback,
intra_process_publisher_handle);
}
if (rmw_trigger_guard_condition(notify_guard_condition_) != RMW_RET_OK) {
throw std::runtime_error(
std::string(
"Failed to notify waitset on publisher creation: ") + rmw_get_error_string());
}
return publisher;
}
@ -243,6 +248,11 @@ Node::create_subscription(
default_callback_group_->add_subscription(sub_base_ptr);
}
number_of_subscriptions_++;
if (rmw_trigger_guard_condition(notify_guard_condition_) != RMW_RET_OK) {
throw std::runtime_error(
std::string(
"Failed to notify waitset on subscription creation: ") + rmw_get_error_string());
}
return sub;
}
@ -289,6 +299,11 @@ Node::create_wall_timer(
default_callback_group_->add_timer(timer);
}
number_of_timers_++;
if (rmw_trigger_guard_condition(notify_guard_condition_) != RMW_RET_OK) {
throw std::runtime_error(
std::string(
"Failed to notify waitset on timer creation: ") + rmw_get_error_string());
}
return timer;
}
@ -333,6 +348,11 @@ Node::create_client(
}
number_of_clients_++;
if (rmw_trigger_guard_condition(notify_guard_condition_) != RMW_RET_OK) {
throw std::runtime_error(
std::string(
"Failed to notify waitset on client creation: ") + rmw_get_error_string());
}
return cli;
}
@ -374,6 +394,11 @@ Node::create_service(
default_callback_group_->add_service(serv_base_ptr);
}
number_of_services_++;
if (rmw_trigger_guard_condition(notify_guard_condition_) != RMW_RET_OK) {
throw std::runtime_error(
std::string(
"Failed to notify waitset on service creation: ") + rmw_get_error_string());
}
return serv;
}

View file

@ -23,6 +23,8 @@
#include "rclcpp/node.hpp"
#include "rclcpp/visibility_control.hpp"
#include "rmw/types.h"
namespace rclcpp
{
namespace memory_strategies
@ -62,6 +64,26 @@ public:
allocator_ = std::make_shared<VoidAlloc>();
}
void add_guard_condition(const rmw_guard_condition_t * guard_condition)
{
for (const auto & existing_guard_condition : guard_conditions_) {
if (existing_guard_condition == guard_condition) {
return;
}
}
guard_conditions_.push_back(guard_condition);
}
void remove_guard_condition(const rmw_guard_condition_t * guard_condition)
{
for (auto it = guard_conditions_.begin(); it != guard_conditions_.end(); ++it) {
if (*it == guard_condition) {
guard_conditions_.erase(it);
break;
}
}
}
size_t fill_subscriber_handles(void ** & ptr)
{
for (auto & subscription : subscriptions_) {
@ -94,6 +116,17 @@ public:
return client_handles_.size();
}
size_t fill_guard_condition_handles(void ** & ptr)
{
for (const auto & guard_condition : guard_conditions_) {
if (guard_condition) {
guard_condition_handles_.push_back(guard_condition->data);
}
}
ptr = guard_condition_handles_.data();
return guard_condition_handles_.size();
}
void clear_active_entities()
{
subscriptions_.clear();
@ -106,6 +139,7 @@ public:
subscriber_handles_.clear();
service_handles_.clear();
client_handles_.clear();
guard_condition_handles_.clear();
}
void remove_null_handles()
@ -124,6 +158,11 @@ public:
std::remove(client_handles_.begin(), client_handles_.end(), nullptr),
client_handles_.end()
);
guard_condition_handles_.erase(
std::remove(guard_condition_handles_.begin(), guard_condition_handles_.end(), nullptr),
guard_condition_handles_.end()
);
}
bool collect_entities(const WeakNodeVector & weak_nodes)
@ -288,10 +327,12 @@ private:
VectorRebind<rclcpp::subscription::SubscriptionBase::SharedPtr> subscriptions_;
VectorRebind<rclcpp::service::ServiceBase::SharedPtr> services_;
VectorRebind<rclcpp::client::ClientBase::SharedPtr> clients_;
VectorRebind<const rmw_guard_condition_t *> guard_conditions_;
VectorRebind<void *> subscriber_handles_;
VectorRebind<void *> service_handles_;
VectorRebind<void *> client_handles_;
VectorRebind<void *> guard_condition_handles_;
std::shared_ptr<ExecAlloc> executable_allocator_;
std::shared_ptr<VoidAlloc> allocator_;

View file

@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
#include <algorithm>
#include <string>
#include <type_traits>
@ -34,23 +35,16 @@ Executor::Executor(const ExecutorArgs & args)
throw std::runtime_error("Failed to create interrupt guard condition in Executor constructor");
}
// The number of guard conditions is fixed at 2: 1 for the ctrl-c guard cond,
// The number of guard conditions is always at least 2: 1 for the ctrl-c guard cond,
// and one for the executor's guard cond (interrupt_guard_condition_)
// These guard conditions are permanently attached to the waitset.
const size_t number_of_guard_conds = 2;
fixed_guard_conditions_.guard_condition_count = number_of_guard_conds;
fixed_guard_conditions_.guard_conditions = static_cast<void **>(guard_cond_handles_.data());
// The size of guard conditions is variable because the number of nodes can vary
// Put the global ctrl-c guard condition in
assert(fixed_guard_conditions_.guard_condition_count > 1);
fixed_guard_conditions_.guard_conditions[0] = \
rclcpp::utilities::get_global_sigint_guard_condition()->data;
memory_strategy_->add_guard_condition(rclcpp::utilities::get_global_sigint_guard_condition());
// Put the executor's guard condition in
fixed_guard_conditions_.guard_conditions[1] = interrupt_guard_condition_->data;
memory_strategy_->add_guard_condition(interrupt_guard_condition_);
// The waitset adds the fixed guard conditions to the middleware waitset on initialization,
// and removes the guard conditions in rmw_destroy_waitset.
waitset_ = rmw_create_waitset(&fixed_guard_conditions_, args.max_conditions);
waitset_ = rmw_create_waitset(args.max_conditions);
if (!waitset_) {
fprintf(stderr,
@ -107,6 +101,8 @@ Executor::add_node(rclcpp::node::Node::SharedPtr node_ptr, bool notify)
throw std::runtime_error(rmw_get_error_string_safe());
}
}
// Add the node's notify condition to the guard condition handles
memory_strategy_->add_guard_condition(node_ptr->get_notify_guard_condition());
}
void
@ -136,6 +132,7 @@ Executor::remove_node(rclcpp::node::Node::SharedPtr node_ptr, bool notify)
}
}
}
memory_strategy_->remove_guard_condition(node_ptr->get_notify_guard_condition());
}
void
@ -356,6 +353,7 @@ Executor::wait_for_work(std::chrono::nanoseconds timeout)
);
}
memory_strategy_->clear_handles();
// Use the number of subscriptions to allocate memory in the handles
rmw_subscriptions_t subscriber_handles;
subscriber_handles.subscriber_count =
@ -369,7 +367,10 @@ Executor::wait_for_work(std::chrono::nanoseconds timeout)
client_handles.client_count =
memory_strategy_->fill_client_handles(client_handles.clients);
// Don't pass guard conditions to rmw_wait; they are permanent fixtures of the waitset
// construct a guard conditions struct
rmw_guard_conditions_t guard_conditions;
guard_conditions.guard_condition_count =
memory_strategy_->fill_guard_condition_handles(guard_conditions.guard_conditions);
rmw_time_t * wait_timeout = NULL;
rmw_time_t rmw_timeout;
@ -397,7 +398,7 @@ Executor::wait_for_work(std::chrono::nanoseconds timeout)
// Now wait on the waitable subscriptions and timers
rmw_ret_t status = rmw_wait(
&subscriber_handles,
nullptr,
&guard_conditions,
&service_handles,
&client_handles,
waitset_,

View file

@ -36,8 +36,13 @@ Node::Node(
bool use_intra_process_comms)
: name_(node_name), context_(context),
number_of_subscriptions_(0), number_of_timers_(0), number_of_services_(0),
use_intra_process_comms_(use_intra_process_comms)
use_intra_process_comms_(use_intra_process_comms),
notify_guard_condition_(rmw_create_guard_condition())
{
if (!notify_guard_condition_) {
throw std::runtime_error("Failed to create guard condition for node: " +
std::string(rmw_get_error_string_safe()));
}
has_executor.store(false);
size_t domain_id = 0;
char * ros_domain_id = nullptr;
@ -51,6 +56,10 @@ Node::Node(
if (ros_domain_id) {
uint32_t number = strtoul(ros_domain_id, NULL, 0);
if (number == (std::numeric_limits<uint32_t>::max)()) {
if (rmw_destroy_guard_condition(notify_guard_condition_) != RMW_RET_OK) {
fprintf(
stderr, "Failed to destroy notify guard condition: %s\n", rmw_get_error_string_safe());
}
throw std::runtime_error("failed to interpret ROS_DOMAIN_ID as integral number");
}
domain_id = static_cast<size_t>(number);
@ -62,6 +71,10 @@ Node::Node(
auto node = rmw_create_node(name_.c_str(), domain_id);
if (!node) {
// *INDENT-OFF*
if (rmw_destroy_guard_condition(notify_guard_condition_) != RMW_RET_OK) {
fprintf(
stderr, "Failed to destroy notify guard condition: %s\n", rmw_get_error_string_safe());
}
throw std::runtime_error(
std::string("could not create node: ") +
rmw_get_error_string_safe());
@ -85,6 +98,14 @@ Node::Node(
"parameter_events", rmw_qos_profile_parameter_events);
}
Node::~Node()
{
if (rmw_destroy_guard_condition(notify_guard_condition_) != RMW_RET_OK) {
fprintf(stderr, "Warning! Failed to destroy guard condition in Node destructor: %s\n",
rmw_get_error_string_safe());
}
}
const std::string &
Node::get_name() const
{
@ -334,3 +355,8 @@ Node::get_callback_groups() const
{
return callback_groups_;
}
rmw_guard_condition_t * Node::get_notify_guard_condition() const
{
return notify_guard_condition_;
}