implement intra process comms

this involves changes in the executor, node,
publisher, and subscription classes

I'd like a more decoupled way to integrate this
into the executor and node but I was unable to
find a good way to do so.
This commit is contained in:
William Woodall 2015-08-18 18:48:13 -07:00
parent fb4e836da0
commit aedc494f8f
6 changed files with 289 additions and 34 deletions

View file

@ -33,6 +33,7 @@ struct AnyExecutable
{}
// Only one of the following pointers will be set.
rclcpp::subscription::SubscriptionBase::SharedPtr subscription;
rclcpp::subscription::SubscriptionBase::SharedPtr subscription_intra_process;
rclcpp::timer::TimerBase::SharedPtr timer;
rclcpp::service::ServiceBase::SharedPtr service;
rclcpp::client::ClientBase::SharedPtr client;

View file

@ -15,15 +15,16 @@
#ifndef RCLCPP_RCLCPP_EXECUTOR_HPP_
#define RCLCPP_RCLCPP_EXECUTOR_HPP_
#include <iostream>
#include <algorithm>
#include <cassert>
#include <cstdlib>
#include <iostream>
#include <list>
#include <memory>
#include <vector>
#include <rcl_interfaces/msg/intra_process_message.hpp>
#include <rclcpp/any_executable.hpp>
#include <rclcpp/macros.hpp>
#include <rclcpp/memory_strategy.hpp>
@ -159,6 +160,9 @@ protected:
if (any_exec->subscription) {
execute_subscription(any_exec->subscription);
}
if (any_exec->subscription_intra_process) {
execute_intra_process_subscription(any_exec->subscription_intra_process);
}
if (any_exec->service) {
execute_service(any_exec->service);
}
@ -194,6 +198,24 @@ protected:
subscription->return_message(message);
}
static void
execute_intra_process_subscription(
rclcpp::subscription::SubscriptionBase::SharedPtr & subscription)
{
rcl_interfaces::msg::IntraProcessMessage ipm;
bool taken = false;
rmw_ret_t status = rmw_take(subscription->intra_process_subscription_handle_, &ipm, &taken);
if (status == RMW_RET_OK) {
if (taken) {
subscription->handle_intra_process_message(ipm);
}
} else {
fprintf(stderr,
"[rclcpp::error] take failed for intra process subscription on topic '%s': %s\n",
subscription->get_topic_name().c_str(), rmw_get_error_string_safe());
}
}
static void
execute_timer(
rclcpp::timer::TimerBase::SharedPtr & timer)
@ -293,22 +315,24 @@ protected:
}));
}
// Use the number of subscriptions to allocate memory in the handles
size_t number_of_subscriptions = subs.size();
size_t max_number_of_subscriptions = subs.size() * 2; // Times two for intra-process.
rmw_subscriptions_t subscriber_handles;
subscriber_handles.subscriber_count = number_of_subscriptions;
subscriber_handles.subscriber_count = 0;
// TODO(wjwwood): Avoid redundant malloc's
subscriber_handles.subscribers =
memory_strategy_->borrow_handles(HandleType::subscription_handle, number_of_subscriptions);
subscriber_handles.subscribers = memory_strategy_->borrow_handles(
HandleType::subscription_handle, max_number_of_subscriptions);
if (subscriber_handles.subscribers == NULL) {
// TODO(wjwwood): Use a different error here? maybe std::bad_alloc?
throw std::runtime_error("Could not malloc for subscriber pointers.");
}
// Then fill the SubscriberHandles with ready subscriptions
size_t subscriber_handle_index = 0;
for (auto & subscription : subs) {
subscriber_handles.subscribers[subscriber_handle_index] = \
subscriber_handles.subscribers[subscriber_handles.subscriber_count++] =
subscription->subscription_handle_->data;
subscriber_handle_index += 1;
if (subscription->intra_process_subscription_handle_) {
subscriber_handles.subscribers[subscriber_handles.subscriber_count++] =
subscription->intra_process_subscription_handle_->data;
}
}
// Use the number of services to allocate memory in the handles
@ -414,7 +438,7 @@ protected:
}
// Add the new work to the class's list of things waiting to be executed
// Starting with the subscribers
for (size_t i = 0; i < number_of_subscriptions; ++i) {
for (size_t i = 0; i < subscriber_handles.subscriber_count; ++i) {
void * handle = subscriber_handles.subscribers[i];
if (handle) {
subscriber_handles_.push_back(handle);
@ -463,13 +487,18 @@ protected:
}
for (auto & weak_subscription : group->subscription_ptrs_) {
auto subscription = weak_subscription.lock();
if (subscription && subscription->subscription_handle_->data == subscriber_handle) {
return subscription;
if (subscription) {
if (subscription->subscription_handle_->data == subscriber_handle) {
return subscription;
}
if (subscription->intra_process_subscription_handle_->data == subscriber_handle) {
return subscription;
}
}
}
}
}
return rclcpp::subscription::SubscriptionBase::SharedPtr();
return nullptr;
}
rclcpp::service::ServiceBase::SharedPtr
@ -653,6 +682,11 @@ protected:
while (it != subscriber_handles_.end()) {
auto subscription = get_subscription_by_handle(*it);
if (subscription) {
// Figure out if this is for intra-process or not.
bool is_intra_process = false;
if (subscription->intra_process_subscription_handle_) {
is_intra_process = subscription->intra_process_subscription_handle_->data == *it;
}
// Find the group for this handle and see if it can be serviced
auto group = get_group_by_subscription(subscription);
if (!group) {
@ -668,7 +702,11 @@ protected:
continue;
}
// Otherwise it is safe to set and return the any_exec
any_exec->subscription = subscription;
if (is_intra_process) {
any_exec->subscription_intra_process = subscription;
} else {
any_exec->subscription = subscription;
}
any_exec->callback_group = group;
any_exec->node = get_node_by_group(group);
subscriber_handles_.erase(it++);
@ -804,7 +842,7 @@ protected:
}
// Check the subscriptions to see if there are any that are ready
get_next_subscription(any_exec);
if (any_exec->subscription) {
if (any_exec->subscription || any_exec->subscription_intra_process) {
return any_exec;
}
// Check the services to see if there are any that are ready

View file

@ -24,6 +24,7 @@
#include <rcl_interfaces/msg/parameter_descriptor.hpp>
#include <rcl_interfaces/msg/parameter_event.hpp>
#include <rcl_interfaces/msg/set_parameters_result.hpp>
#include <rosidl_generator_cpp/message_type_support.hpp>
#include <rclcpp/callback_group.hpp>
#include <rclcpp/client.hpp>
@ -199,6 +200,8 @@ public:
private:
RCLCPP_DISABLE_COPY(Node);
static const rosidl_message_type_support_t * ipm_ts;
bool
group_in_node(callback_group::CallbackGroup::SharedPtr & group);
@ -309,6 +312,11 @@ private:
}
};
const rosidl_message_type_support_t * Node::ipm_ts =
rosidl_generator_cpp::get_message_type_support_handle<
rcl_interfaces::msg::IntraProcessMessage
>();
} /* namespace node */
} /* namespace rclcpp */

View file

@ -24,12 +24,14 @@
#include <stdexcept>
#include <string>
#include <rcl_interfaces/msg/intra_process_message.hpp>
#include <rmw/error_handling.h>
#include <rmw/rmw.h>
#include <rosidl_generator_cpp/message_type_support.hpp>
#include <rosidl_generator_cpp/service_type_support.hpp>
#include <rclcpp/contexts/default_context.hpp>
#include <rclcpp/intra_process_manager.hpp>
#include <rclcpp/parameter.hpp>
#ifndef RCLCPP_RCLCPP_NODE_HPP_
@ -128,7 +130,45 @@ Node::create_publisher(
// *INDENT-ON*
}
return publisher::Publisher::make_shared(node_handle_, publisher_handle);
auto publisher = publisher::Publisher::make_shared(
node_handle_, publisher_handle, topic_name, qos_profile.depth);
if (use_intra_process_comms_) {
rmw_publisher_t * intra_process_publisher_handle = rmw_create_publisher(
node_handle_.get(), ipm_ts, (topic_name + "__intra").c_str(), qos_profile);
if (!intra_process_publisher_handle) {
// *INDENT-OFF* (prevent uncrustify from making unecessary indents here)
throw std::runtime_error(
std::string("could not create intra process publisher: ") +
rmw_get_error_string_safe());
// *INDENT-ON*
}
auto intra_process_manager =
context_->get_sub_context<rclcpp::intra_process_manager::IntraProcessManager>();
uint64_t intra_process_publisher_id =
intra_process_manager->add_publisher<MessageT>(publisher);
rclcpp::intra_process_manager::IntraProcessManager::WeakPtr weak_ipm = intra_process_manager;
auto shared_publish_callback =
[weak_ipm] (uint64_t publisher_id, std::shared_ptr<void> msg) -> uint64_t
{
auto ipm = weak_ipm.lock();
if (!ipm) {
// TODO(wjwwood): should this just return silently? Or maybe return with a logged warning?
throw std::runtime_error(
"intra process publish called after destruction of intra process manager");
}
auto typed_msg = std::static_pointer_cast<const MessageT>(msg);
std::unique_ptr<MessageT> unique_msg(new MessageT(*typed_msg));
uint64_t message_seq = ipm->store_intra_process_message(publisher_id, unique_msg);
return message_seq;
};
publisher->setup_intra_process(
intra_process_publisher_id,
shared_publish_callback,
intra_process_publisher_handle);
}
return publisher;
}
bool
@ -182,10 +222,45 @@ Node::create_subscription(
callback,
msg_mem_strat);
auto sub_base_ptr = std::dynamic_pointer_cast<SubscriptionBase>(sub);
// Setup intra process.
if (use_intra_process_comms_) {
rmw_subscription_t * intra_process_subscriber_handle = rmw_create_subscription(
node_handle_.get(), ipm_ts,
(topic_name + "__intra").c_str(), qos_profile, false);
if (!subscriber_handle) {
// *INDENT-OFF* (prevent uncrustify from making unecessary indents here)
throw std::runtime_error(
std::string("could not create intra process subscription: ") + rmw_get_error_string_safe());
// *INDENT-ON*
}
auto intra_process_manager =
context_->get_sub_context<rclcpp::intra_process_manager::IntraProcessManager>();
rclcpp::intra_process_manager::IntraProcessManager::WeakPtr weak_ipm = intra_process_manager;
uint64_t intra_process_subscription_id =
intra_process_manager->add_subscription(sub_base_ptr);
sub->setup_intra_process(
intra_process_subscription_id,
intra_process_subscriber_handle,
[weak_ipm] (
uint64_t publisher_id,
uint64_t message_sequence,
uint64_t subscription_id,
std::unique_ptr<MessageT> & message)
{
auto ipm = weak_ipm.lock();
if (!ipm) {
// TODO(wjwwood): should this just return silently? Or maybe return with a logged warning?
throw std::runtime_error(
"intra process take called after destruction of intra process manager");
}
ipm->take_intra_process_message(publisher_id, message_sequence, subscription_id, message);
});
}
// Assign to a group.
if (group) {
if (!group_in_node(group)) {
// TODO: use custom exception
throw std::runtime_error("Cannot create timer, group not in node.");
throw std::runtime_error("Cannot create subscription, group not in node.");
}
group->add_subscription(sub_base_ptr);
} else {

View file

@ -15,15 +15,18 @@
#ifndef RCLCPP_RCLCPP_PUBLISHER_HPP_
#define RCLCPP_RCLCPP_PUBLISHER_HPP_
#include <rclcpp/macros.hpp>
#include <iostream>
#include <memory>
#include <sstream>
#include <string>
#include <rcl_interfaces/msg/intra_process_message.hpp>
#include <rmw/impl/cpp/demangle.hpp>
#include <rmw/error_handling.h>
#include <rmw/rmw.h>
#include <rclcpp/macros.hpp>
namespace rclcpp
{
@ -38,23 +41,37 @@ namespace publisher
class Publisher
{
friend rclcpp::node::Node;
public:
RCLCPP_SMART_PTR_DEFINITIONS(Publisher);
Publisher(std::shared_ptr<rmw_node_t> node_handle, rmw_publisher_t * publisher_handle)
: node_handle_(node_handle), publisher_handle_(publisher_handle)
Publisher(
std::shared_ptr<rmw_node_t> node_handle,
rmw_publisher_t * publisher_handle,
std::string topic,
size_t queue_size)
: node_handle_(node_handle), publisher_handle_(publisher_handle),
intra_process_publisher_handle_(nullptr),
topic_(topic), queue_size_(queue_size),
intra_process_publisher_id_(0), store_intra_process_message_(nullptr)
{}
virtual ~Publisher()
{
if (intra_process_publisher_handle_) {
if (rmw_destroy_publisher(node_handle_.get(), intra_process_publisher_handle_)) {
fprintf(
stderr,
"Error in destruction of intra process rmw publisher handle: %s\n",
rmw_get_error_string_safe());
}
}
if (publisher_handle_) {
if (rmw_destroy_publisher(node_handle_.get(), publisher_handle_) != RMW_RET_OK) {
// *INDENT-OFF*
std::stringstream ss;
ss << "Error in destruction of rmw publisher handle: "
<< rmw_get_error_string_safe() << '\n';
// *INDENT-ON*
(std::cerr << ss.str()).flush();
fprintf(
stderr,
"Error in destruction of rmw publisher handle: %s\n",
rmw_get_error_string_safe());
}
}
}
@ -63,19 +80,69 @@ public:
void
publish(std::shared_ptr<MessageT> & msg)
{
rmw_ret_t status = rmw_publish(publisher_handle_, msg.get());
if (status != RMW_RET_OK) {
// *INDENT-OFF* (prevent uncrustify from making unecessary indents here)
throw std::runtime_error(
std::string("failed to publish message: ") + rmw_get_error_string_safe());
// *INDENT-ON*
rmw_ret_t status;
if (!store_intra_process_message_) {
// TODO(wjwwood): for now, make intra process and inter process mutually exclusive.
// Later we'll have them together, when we have a way to filter more efficiently.
status = rmw_publish(publisher_handle_, msg.get());
if (status != RMW_RET_OK) {
// *INDENT-OFF* (prevent uncrustify from making unecessary indents here)
throw std::runtime_error(
std::string("failed to publish message: ") + rmw_get_error_string_safe());
// *INDENT-ON*
}
}
if (store_intra_process_message_) {
uint64_t message_seq = store_intra_process_message_(intra_process_publisher_id_, msg);
rcl_interfaces::msg::IntraProcessMessage ipm;
ipm.publisher_id = intra_process_publisher_id_;
ipm.message_sequence = message_seq;
status = rmw_publish(intra_process_publisher_handle_, &ipm);
if (status != RMW_RET_OK) {
// *INDENT-OFF* (prevent uncrustify from making unecessary indents here)
throw std::runtime_error(
std::string("failed to publish intra process message: ") + rmw_get_error_string_safe());
// *INDENT-ON*
}
}
}
std::string
get_topic_name() const
{
return topic_;
}
size_t
get_queue_size() const
{
return queue_size_;
}
typedef std::function<uint64_t (uint64_t, std::shared_ptr<void>)> StoreSharedMessageCallbackT;
protected:
void
setup_intra_process(
uint64_t intra_process_publisher_id,
StoreSharedMessageCallbackT callback,
rmw_publisher_t * intra_process_publisher_handle)
{
intra_process_publisher_id_ = intra_process_publisher_id;
store_intra_process_message_ = callback;
intra_process_publisher_handle_ = intra_process_publisher_handle;
}
private:
std::shared_ptr<rmw_node_t> node_handle_;
rmw_publisher_t * publisher_handle_;
rmw_publisher_t * intra_process_publisher_handle_;
std::string topic_;
size_t queue_size_;
uint64_t intra_process_publisher_id_;
StoreSharedMessageCallbackT store_intra_process_message_;
};

View file

@ -21,6 +21,7 @@
#include <sstream>
#include <string>
#include <rcl_interfaces/msg/intra_process_message.hpp>
#include <rmw/error_handling.h>
#include <rmw/rmw.h>
@ -36,6 +37,11 @@ namespace executor
class Executor;
} // namespace executor
namespace node
{
class Node;
} // namespace node
namespace subscription
{
@ -51,7 +57,8 @@ public:
rmw_subscription_t * subscription_handle,
const std::string & topic_name,
bool ignore_local_publications)
: node_handle_(node_handle),
: intra_process_subscription_handle_(nullptr),
node_handle_(node_handle),
subscription_handle_(subscription_handle),
topic_name_(topic_name),
ignore_local_publications_(ignore_local_publications)
@ -70,6 +77,15 @@ public:
(std::cerr << ss.str()).flush();
}
}
if (intra_process_subscription_handle_) {
auto ret = rmw_destroy_subscription(node_handle_.get(), intra_process_subscription_handle_);
if (ret != RMW_RET_OK) {
std::stringstream ss;
ss << "Error in destruction of rmw intra process subscription handle: " <<
rmw_get_error_string_safe() << '\n';
(std::cerr << ss.str()).flush();
}
}
}
const std::string & get_topic_name() const
@ -80,6 +96,10 @@ public:
virtual std::shared_ptr<void> create_message() = 0;
virtual void handle_message(std::shared_ptr<void> & message) = 0;
virtual void return_message(std::shared_ptr<void> & message) = 0;
virtual void handle_intra_process_message(rcl_interfaces::msg::IntraProcessMessage & ipm) = 0;
protected:
rmw_subscription_t * intra_process_subscription_handle_;
private:
RCLCPP_DISABLE_COPY(SubscriptionBase);
@ -87,6 +107,7 @@ private:
std::shared_ptr<rmw_node_t> node_handle_;
rmw_subscription_t * subscription_handle_;
std::string topic_name_;
bool ignore_local_publications_;
@ -95,6 +116,8 @@ private:
template<typename MessageT>
class Subscription : public SubscriptionBase
{
friend class rclcpp::node::Node;
public:
using CallbackType = std::function<void(const std::shared_ptr<MessageT> &)>;
RCLCPP_SMART_PTR_DEFINITIONS(Subscription);
@ -135,13 +158,56 @@ public:
message_memory_strategy_->return_message(typed_message);
}
void handle_intra_process_message(rcl_interfaces::msg::IntraProcessMessage & ipm)
{
if (!get_intra_process_message_callback_) {
// throw std::runtime_error(
// "handle_intra_process_message called before setup_intra_process");
// TODO(wjwwood): for now, this could mean that intra process was just not enabled.
// However, this can only really happen if this node has it disabled, but the other doesn't.
return;
}
std::unique_ptr<MessageT> msg;
get_intra_process_message_callback_(
ipm.publisher_id,
ipm.message_sequence,
intra_process_subscription_id_,
msg);
if (!msg) {
// This either occurred because the publisher no longer exists or the
// message requested is no longer being stored.
// TODO(wjwwood): should we notify someone of this? log error, log warning?
return;
}
typename MessageT::SharedPtr shared_msg = std::move(msg);
callback_(shared_msg);
}
private:
typedef
std::function<
void (uint64_t, uint64_t, uint64_t, std::unique_ptr<MessageT> &)
> GetMessageCallbackType;
void setup_intra_process(
uint64_t intra_process_subscription_id,
rmw_subscription_t * intra_process_subscription,
GetMessageCallbackType callback)
{
intra_process_subscription_id_ = intra_process_subscription_id;
intra_process_subscription_handle_ = intra_process_subscription;
get_intra_process_message_callback_ = callback;
}
RCLCPP_DISABLE_COPY(Subscription);
CallbackType callback_;
typename message_memory_strategy::MessageMemoryStrategy<MessageT>::SharedPtr
message_memory_strategy_;
GetMessageCallbackType get_intra_process_message_callback_;
uint64_t intra_process_subscription_id_;
};
} /* namespace subscription */