From aedc494f8fa07b12662360f19a59e90626895803 Mon Sep 17 00:00:00 2001 From: William Woodall Date: Tue, 18 Aug 2015 18:48:13 -0700 Subject: [PATCH] implement intra process comms this involves changes in the executor, node, publisher, and subscription classes I'd like a more decoupled way to integrate this into the executor and node but I was unable to find a good way to do so. --- rclcpp/include/rclcpp/any_executable.hpp | 1 + rclcpp/include/rclcpp/executor.hpp | 68 ++++++++++++---- rclcpp/include/rclcpp/node.hpp | 8 ++ rclcpp/include/rclcpp/node_impl.hpp | 79 ++++++++++++++++++- rclcpp/include/rclcpp/publisher.hpp | 99 ++++++++++++++++++++---- rclcpp/include/rclcpp/subscription.hpp | 68 +++++++++++++++- 6 files changed, 289 insertions(+), 34 deletions(-) diff --git a/rclcpp/include/rclcpp/any_executable.hpp b/rclcpp/include/rclcpp/any_executable.hpp index 3f698f1..1bdddb8 100644 --- a/rclcpp/include/rclcpp/any_executable.hpp +++ b/rclcpp/include/rclcpp/any_executable.hpp @@ -33,6 +33,7 @@ struct AnyExecutable {} // Only one of the following pointers will be set. rclcpp::subscription::SubscriptionBase::SharedPtr subscription; + rclcpp::subscription::SubscriptionBase::SharedPtr subscription_intra_process; rclcpp::timer::TimerBase::SharedPtr timer; rclcpp::service::ServiceBase::SharedPtr service; rclcpp::client::ClientBase::SharedPtr client; diff --git a/rclcpp/include/rclcpp/executor.hpp b/rclcpp/include/rclcpp/executor.hpp index f32daa6..c58370c 100644 --- a/rclcpp/include/rclcpp/executor.hpp +++ b/rclcpp/include/rclcpp/executor.hpp @@ -15,15 +15,16 @@ #ifndef RCLCPP_RCLCPP_EXECUTOR_HPP_ #define RCLCPP_RCLCPP_EXECUTOR_HPP_ -#include - #include #include #include +#include #include #include #include +#include + #include #include #include @@ -159,6 +160,9 @@ protected: if (any_exec->subscription) { execute_subscription(any_exec->subscription); } + if (any_exec->subscription_intra_process) { + execute_intra_process_subscription(any_exec->subscription_intra_process); + } if (any_exec->service) { execute_service(any_exec->service); } @@ -194,6 +198,24 @@ protected: subscription->return_message(message); } + static void + execute_intra_process_subscription( + rclcpp::subscription::SubscriptionBase::SharedPtr & subscription) + { + rcl_interfaces::msg::IntraProcessMessage ipm; + bool taken = false; + rmw_ret_t status = rmw_take(subscription->intra_process_subscription_handle_, &ipm, &taken); + if (status == RMW_RET_OK) { + if (taken) { + subscription->handle_intra_process_message(ipm); + } + } else { + fprintf(stderr, + "[rclcpp::error] take failed for intra process subscription on topic '%s': %s\n", + subscription->get_topic_name().c_str(), rmw_get_error_string_safe()); + } + } + static void execute_timer( rclcpp::timer::TimerBase::SharedPtr & timer) @@ -293,22 +315,24 @@ protected: })); } // Use the number of subscriptions to allocate memory in the handles - size_t number_of_subscriptions = subs.size(); + size_t max_number_of_subscriptions = subs.size() * 2; // Times two for intra-process. rmw_subscriptions_t subscriber_handles; - subscriber_handles.subscriber_count = number_of_subscriptions; + subscriber_handles.subscriber_count = 0; // TODO(wjwwood): Avoid redundant malloc's - subscriber_handles.subscribers = - memory_strategy_->borrow_handles(HandleType::subscription_handle, number_of_subscriptions); + subscriber_handles.subscribers = memory_strategy_->borrow_handles( + HandleType::subscription_handle, max_number_of_subscriptions); if (subscriber_handles.subscribers == NULL) { // TODO(wjwwood): Use a different error here? maybe std::bad_alloc? throw std::runtime_error("Could not malloc for subscriber pointers."); } // Then fill the SubscriberHandles with ready subscriptions - size_t subscriber_handle_index = 0; for (auto & subscription : subs) { - subscriber_handles.subscribers[subscriber_handle_index] = \ + subscriber_handles.subscribers[subscriber_handles.subscriber_count++] = subscription->subscription_handle_->data; - subscriber_handle_index += 1; + if (subscription->intra_process_subscription_handle_) { + subscriber_handles.subscribers[subscriber_handles.subscriber_count++] = + subscription->intra_process_subscription_handle_->data; + } } // Use the number of services to allocate memory in the handles @@ -414,7 +438,7 @@ protected: } // Add the new work to the class's list of things waiting to be executed // Starting with the subscribers - for (size_t i = 0; i < number_of_subscriptions; ++i) { + for (size_t i = 0; i < subscriber_handles.subscriber_count; ++i) { void * handle = subscriber_handles.subscribers[i]; if (handle) { subscriber_handles_.push_back(handle); @@ -463,13 +487,18 @@ protected: } for (auto & weak_subscription : group->subscription_ptrs_) { auto subscription = weak_subscription.lock(); - if (subscription && subscription->subscription_handle_->data == subscriber_handle) { - return subscription; + if (subscription) { + if (subscription->subscription_handle_->data == subscriber_handle) { + return subscription; + } + if (subscription->intra_process_subscription_handle_->data == subscriber_handle) { + return subscription; + } } } } } - return rclcpp::subscription::SubscriptionBase::SharedPtr(); + return nullptr; } rclcpp::service::ServiceBase::SharedPtr @@ -653,6 +682,11 @@ protected: while (it != subscriber_handles_.end()) { auto subscription = get_subscription_by_handle(*it); if (subscription) { + // Figure out if this is for intra-process or not. + bool is_intra_process = false; + if (subscription->intra_process_subscription_handle_) { + is_intra_process = subscription->intra_process_subscription_handle_->data == *it; + } // Find the group for this handle and see if it can be serviced auto group = get_group_by_subscription(subscription); if (!group) { @@ -668,7 +702,11 @@ protected: continue; } // Otherwise it is safe to set and return the any_exec - any_exec->subscription = subscription; + if (is_intra_process) { + any_exec->subscription_intra_process = subscription; + } else { + any_exec->subscription = subscription; + } any_exec->callback_group = group; any_exec->node = get_node_by_group(group); subscriber_handles_.erase(it++); @@ -804,7 +842,7 @@ protected: } // Check the subscriptions to see if there are any that are ready get_next_subscription(any_exec); - if (any_exec->subscription) { + if (any_exec->subscription || any_exec->subscription_intra_process) { return any_exec; } // Check the services to see if there are any that are ready diff --git a/rclcpp/include/rclcpp/node.hpp b/rclcpp/include/rclcpp/node.hpp index 4266a0c..a4d30a8 100644 --- a/rclcpp/include/rclcpp/node.hpp +++ b/rclcpp/include/rclcpp/node.hpp @@ -24,6 +24,7 @@ #include #include #include +#include #include #include @@ -199,6 +200,8 @@ public: private: RCLCPP_DISABLE_COPY(Node); + static const rosidl_message_type_support_t * ipm_ts; + bool group_in_node(callback_group::CallbackGroup::SharedPtr & group); @@ -309,6 +312,11 @@ private: } }; +const rosidl_message_type_support_t * Node::ipm_ts = + rosidl_generator_cpp::get_message_type_support_handle< + rcl_interfaces::msg::IntraProcessMessage + >(); + } /* namespace node */ } /* namespace rclcpp */ diff --git a/rclcpp/include/rclcpp/node_impl.hpp b/rclcpp/include/rclcpp/node_impl.hpp index 2202561..a6a46e5 100644 --- a/rclcpp/include/rclcpp/node_impl.hpp +++ b/rclcpp/include/rclcpp/node_impl.hpp @@ -24,12 +24,14 @@ #include #include +#include #include #include #include #include #include +#include #include #ifndef RCLCPP_RCLCPP_NODE_HPP_ @@ -128,7 +130,45 @@ Node::create_publisher( // *INDENT-ON* } - return publisher::Publisher::make_shared(node_handle_, publisher_handle); + auto publisher = publisher::Publisher::make_shared( + node_handle_, publisher_handle, topic_name, qos_profile.depth); + + if (use_intra_process_comms_) { + rmw_publisher_t * intra_process_publisher_handle = rmw_create_publisher( + node_handle_.get(), ipm_ts, (topic_name + "__intra").c_str(), qos_profile); + if (!intra_process_publisher_handle) { + // *INDENT-OFF* (prevent uncrustify from making unecessary indents here) + throw std::runtime_error( + std::string("could not create intra process publisher: ") + + rmw_get_error_string_safe()); + // *INDENT-ON* + } + + auto intra_process_manager = + context_->get_sub_context(); + uint64_t intra_process_publisher_id = + intra_process_manager->add_publisher(publisher); + rclcpp::intra_process_manager::IntraProcessManager::WeakPtr weak_ipm = intra_process_manager; + auto shared_publish_callback = + [weak_ipm] (uint64_t publisher_id, std::shared_ptr msg) -> uint64_t + { + auto ipm = weak_ipm.lock(); + if (!ipm) { + // TODO(wjwwood): should this just return silently? Or maybe return with a logged warning? + throw std::runtime_error( + "intra process publish called after destruction of intra process manager"); + } + auto typed_msg = std::static_pointer_cast(msg); + std::unique_ptr unique_msg(new MessageT(*typed_msg)); + uint64_t message_seq = ipm->store_intra_process_message(publisher_id, unique_msg); + return message_seq; + }; + publisher->setup_intra_process( + intra_process_publisher_id, + shared_publish_callback, + intra_process_publisher_handle); + } + return publisher; } bool @@ -182,10 +222,45 @@ Node::create_subscription( callback, msg_mem_strat); auto sub_base_ptr = std::dynamic_pointer_cast(sub); + // Setup intra process. + if (use_intra_process_comms_) { + rmw_subscription_t * intra_process_subscriber_handle = rmw_create_subscription( + node_handle_.get(), ipm_ts, + (topic_name + "__intra").c_str(), qos_profile, false); + if (!subscriber_handle) { + // *INDENT-OFF* (prevent uncrustify from making unecessary indents here) + throw std::runtime_error( + std::string("could not create intra process subscription: ") + rmw_get_error_string_safe()); + // *INDENT-ON* + } + auto intra_process_manager = + context_->get_sub_context(); + rclcpp::intra_process_manager::IntraProcessManager::WeakPtr weak_ipm = intra_process_manager; + uint64_t intra_process_subscription_id = + intra_process_manager->add_subscription(sub_base_ptr); + sub->setup_intra_process( + intra_process_subscription_id, + intra_process_subscriber_handle, + [weak_ipm] ( + uint64_t publisher_id, + uint64_t message_sequence, + uint64_t subscription_id, + std::unique_ptr & message) + { + auto ipm = weak_ipm.lock(); + if (!ipm) { + // TODO(wjwwood): should this just return silently? Or maybe return with a logged warning? + throw std::runtime_error( + "intra process take called after destruction of intra process manager"); + } + ipm->take_intra_process_message(publisher_id, message_sequence, subscription_id, message); + }); + } + // Assign to a group. if (group) { if (!group_in_node(group)) { // TODO: use custom exception - throw std::runtime_error("Cannot create timer, group not in node."); + throw std::runtime_error("Cannot create subscription, group not in node."); } group->add_subscription(sub_base_ptr); } else { diff --git a/rclcpp/include/rclcpp/publisher.hpp b/rclcpp/include/rclcpp/publisher.hpp index 3a8cc38..ad9102b 100644 --- a/rclcpp/include/rclcpp/publisher.hpp +++ b/rclcpp/include/rclcpp/publisher.hpp @@ -15,15 +15,18 @@ #ifndef RCLCPP_RCLCPP_PUBLISHER_HPP_ #define RCLCPP_RCLCPP_PUBLISHER_HPP_ +#include + #include #include #include +#include +#include +#include #include #include -#include - namespace rclcpp { @@ -38,23 +41,37 @@ namespace publisher class Publisher { + friend rclcpp::node::Node; public: RCLCPP_SMART_PTR_DEFINITIONS(Publisher); - Publisher(std::shared_ptr node_handle, rmw_publisher_t * publisher_handle) - : node_handle_(node_handle), publisher_handle_(publisher_handle) + Publisher( + std::shared_ptr node_handle, + rmw_publisher_t * publisher_handle, + std::string topic, + size_t queue_size) + : node_handle_(node_handle), publisher_handle_(publisher_handle), + intra_process_publisher_handle_(nullptr), + topic_(topic), queue_size_(queue_size), + intra_process_publisher_id_(0), store_intra_process_message_(nullptr) {} virtual ~Publisher() { + if (intra_process_publisher_handle_) { + if (rmw_destroy_publisher(node_handle_.get(), intra_process_publisher_handle_)) { + fprintf( + stderr, + "Error in destruction of intra process rmw publisher handle: %s\n", + rmw_get_error_string_safe()); + } + } if (publisher_handle_) { if (rmw_destroy_publisher(node_handle_.get(), publisher_handle_) != RMW_RET_OK) { - // *INDENT-OFF* - std::stringstream ss; - ss << "Error in destruction of rmw publisher handle: " - << rmw_get_error_string_safe() << '\n'; - // *INDENT-ON* - (std::cerr << ss.str()).flush(); + fprintf( + stderr, + "Error in destruction of rmw publisher handle: %s\n", + rmw_get_error_string_safe()); } } } @@ -63,19 +80,69 @@ public: void publish(std::shared_ptr & msg) { - rmw_ret_t status = rmw_publish(publisher_handle_, msg.get()); - if (status != RMW_RET_OK) { - // *INDENT-OFF* (prevent uncrustify from making unecessary indents here) - throw std::runtime_error( - std::string("failed to publish message: ") + rmw_get_error_string_safe()); - // *INDENT-ON* + rmw_ret_t status; + if (!store_intra_process_message_) { + // TODO(wjwwood): for now, make intra process and inter process mutually exclusive. + // Later we'll have them together, when we have a way to filter more efficiently. + status = rmw_publish(publisher_handle_, msg.get()); + if (status != RMW_RET_OK) { + // *INDENT-OFF* (prevent uncrustify from making unecessary indents here) + throw std::runtime_error( + std::string("failed to publish message: ") + rmw_get_error_string_safe()); + // *INDENT-ON* + } } + if (store_intra_process_message_) { + uint64_t message_seq = store_intra_process_message_(intra_process_publisher_id_, msg); + rcl_interfaces::msg::IntraProcessMessage ipm; + ipm.publisher_id = intra_process_publisher_id_; + ipm.message_sequence = message_seq; + status = rmw_publish(intra_process_publisher_handle_, &ipm); + if (status != RMW_RET_OK) { + // *INDENT-OFF* (prevent uncrustify from making unecessary indents here) + throw std::runtime_error( + std::string("failed to publish intra process message: ") + rmw_get_error_string_safe()); + // *INDENT-ON* + } + } + } + + std::string + get_topic_name() const + { + return topic_; + } + + size_t + get_queue_size() const + { + return queue_size_; + } + + typedef std::function)> StoreSharedMessageCallbackT; +protected: + void + setup_intra_process( + uint64_t intra_process_publisher_id, + StoreSharedMessageCallbackT callback, + rmw_publisher_t * intra_process_publisher_handle) + { + intra_process_publisher_id_ = intra_process_publisher_id; + store_intra_process_message_ = callback; + intra_process_publisher_handle_ = intra_process_publisher_handle; } private: std::shared_ptr node_handle_; rmw_publisher_t * publisher_handle_; + rmw_publisher_t * intra_process_publisher_handle_; + + std::string topic_; + size_t queue_size_; + + uint64_t intra_process_publisher_id_; + StoreSharedMessageCallbackT store_intra_process_message_; }; diff --git a/rclcpp/include/rclcpp/subscription.hpp b/rclcpp/include/rclcpp/subscription.hpp index 141747c..13235c3 100644 --- a/rclcpp/include/rclcpp/subscription.hpp +++ b/rclcpp/include/rclcpp/subscription.hpp @@ -21,6 +21,7 @@ #include #include +#include #include #include @@ -36,6 +37,11 @@ namespace executor class Executor; } // namespace executor +namespace node +{ +class Node; +} // namespace node + namespace subscription { @@ -51,7 +57,8 @@ public: rmw_subscription_t * subscription_handle, const std::string & topic_name, bool ignore_local_publications) - : node_handle_(node_handle), + : intra_process_subscription_handle_(nullptr), + node_handle_(node_handle), subscription_handle_(subscription_handle), topic_name_(topic_name), ignore_local_publications_(ignore_local_publications) @@ -70,6 +77,15 @@ public: (std::cerr << ss.str()).flush(); } } + if (intra_process_subscription_handle_) { + auto ret = rmw_destroy_subscription(node_handle_.get(), intra_process_subscription_handle_); + if (ret != RMW_RET_OK) { + std::stringstream ss; + ss << "Error in destruction of rmw intra process subscription handle: " << + rmw_get_error_string_safe() << '\n'; + (std::cerr << ss.str()).flush(); + } + } } const std::string & get_topic_name() const @@ -80,6 +96,10 @@ public: virtual std::shared_ptr create_message() = 0; virtual void handle_message(std::shared_ptr & message) = 0; virtual void return_message(std::shared_ptr & message) = 0; + virtual void handle_intra_process_message(rcl_interfaces::msg::IntraProcessMessage & ipm) = 0; + +protected: + rmw_subscription_t * intra_process_subscription_handle_; private: RCLCPP_DISABLE_COPY(SubscriptionBase); @@ -87,6 +107,7 @@ private: std::shared_ptr node_handle_; rmw_subscription_t * subscription_handle_; + std::string topic_name_; bool ignore_local_publications_; @@ -95,6 +116,8 @@ private: template class Subscription : public SubscriptionBase { + friend class rclcpp::node::Node; + public: using CallbackType = std::function &)>; RCLCPP_SMART_PTR_DEFINITIONS(Subscription); @@ -135,13 +158,56 @@ public: message_memory_strategy_->return_message(typed_message); } + void handle_intra_process_message(rcl_interfaces::msg::IntraProcessMessage & ipm) + { + if (!get_intra_process_message_callback_) { + // throw std::runtime_error( + // "handle_intra_process_message called before setup_intra_process"); + // TODO(wjwwood): for now, this could mean that intra process was just not enabled. + // However, this can only really happen if this node has it disabled, but the other doesn't. + return; + } + std::unique_ptr msg; + get_intra_process_message_callback_( + ipm.publisher_id, + ipm.message_sequence, + intra_process_subscription_id_, + msg); + if (!msg) { + // This either occurred because the publisher no longer exists or the + // message requested is no longer being stored. + // TODO(wjwwood): should we notify someone of this? log error, log warning? + return; + } + typename MessageT::SharedPtr shared_msg = std::move(msg); + callback_(shared_msg); + } + private: + typedef + std::function< + void (uint64_t, uint64_t, uint64_t, std::unique_ptr &) + > GetMessageCallbackType; + + void setup_intra_process( + uint64_t intra_process_subscription_id, + rmw_subscription_t * intra_process_subscription, + GetMessageCallbackType callback) + { + intra_process_subscription_id_ = intra_process_subscription_id; + intra_process_subscription_handle_ = intra_process_subscription; + get_intra_process_message_callback_ = callback; + } + RCLCPP_DISABLE_COPY(Subscription); CallbackType callback_; typename message_memory_strategy::MessageMemoryStrategy::SharedPtr message_memory_strategy_; + GetMessageCallbackType get_intra_process_message_callback_; + uint64_t intra_process_subscription_id_; + }; } /* namespace subscription */