diff --git a/rclcpp/include/rclcpp/callback_group.hpp b/rclcpp/include/rclcpp/callback_group.hpp index a1b7a12..ef87a2f 100644 --- a/rclcpp/include/rclcpp/callback_group.hpp +++ b/rclcpp/include/rclcpp/callback_group.hpp @@ -21,9 +21,9 @@ #include #include "rclcpp/client.hpp" -#include "rclcpp/publisher.hpp" +#include "rclcpp/publisher_base.hpp" #include "rclcpp/service.hpp" -#include "rclcpp/subscription.hpp" +#include "rclcpp/subscription_base.hpp" #include "rclcpp/timer.hpp" #include "rclcpp/visibility_control.hpp" #include "rclcpp/waitable.hpp" diff --git a/rclcpp/include/rclcpp/create_publisher.hpp b/rclcpp/include/rclcpp/create_publisher.hpp index 0e99c37..811c18b 100644 --- a/rclcpp/include/rclcpp/create_publisher.hpp +++ b/rclcpp/include/rclcpp/create_publisher.hpp @@ -37,7 +37,7 @@ namespace rclcpp template< typename MessageT, typename AllocatorT = std::allocator, - typename PublisherT = ::rclcpp::Publisher, + typename PublisherT = rclcpp::Publisher, typename NodeT> std::shared_ptr create_publisher( @@ -46,43 +46,23 @@ create_publisher( const rclcpp::QoS & qos, const rclcpp::PublisherOptionsWithAllocator & options = ( rclcpp::PublisherOptionsWithAllocator() -)) + ) +) { + // Extract the NodeTopicsInterface from the NodeT. using rclcpp::node_interfaces::get_node_topics_interface; auto node_topics = get_node_topics_interface(node); - std::shared_ptr allocator = options.allocator; - if (!allocator) { - allocator = std::make_shared(); - } - - bool use_intra_process; - switch (options.use_intra_process_comm) { - case IntraProcessSetting::Enable: - use_intra_process = true; - break; - case IntraProcessSetting::Disable: - use_intra_process = false; - break; - case IntraProcessSetting::NodeDefault: - use_intra_process = node_topics->get_node_base_interface()->get_use_intra_process_default(); - break; - default: - throw std::runtime_error("Unrecognized IntraProcessSetting value"); - break; - } - - // TODO(wjwwood): convert all of the interfaces to use QoS and PublisherOptionsBase + // Create the publisher. auto pub = node_topics->create_publisher( topic_name, - rclcpp::create_publisher_factory( - options.event_callbacks, - allocator - ), - options.template to_rcl_publisher_options(qos), - use_intra_process + rclcpp::create_publisher_factory(options), + qos ); + + // Add the publisher to the node topics interface. node_topics->add_publisher(pub, options.callback_group); + return std::dynamic_pointer_cast(pub); } diff --git a/rclcpp/include/rclcpp/create_subscription.hpp b/rclcpp/include/rclcpp/create_subscription.hpp index 7c13d6f..0f267cb 100644 --- a/rclcpp/include/rclcpp/create_subscription.hpp +++ b/rclcpp/include/rclcpp/create_subscription.hpp @@ -42,6 +42,10 @@ template< typename CallbackMessageT = typename rclcpp::subscription_traits::has_message_type::type, typename SubscriptionT = rclcpp::Subscription, + typename MessageMemoryStrategyT = rclcpp::message_memory_strategy::MessageMemoryStrategy< + CallbackMessageT, + AllocatorT + >, typename NodeT> typename std::shared_ptr create_subscription( @@ -52,48 +56,21 @@ create_subscription( const rclcpp::SubscriptionOptionsWithAllocator & options = ( rclcpp::SubscriptionOptionsWithAllocator() ), - typename rclcpp::message_memory_strategy::MessageMemoryStrategy< - CallbackMessageT, AllocatorT>::SharedPtr - msg_mem_strat = nullptr) + typename MessageMemoryStrategyT::SharedPtr msg_mem_strat = ( + MessageMemoryStrategyT::create_default() + ) +) { using rclcpp::node_interfaces::get_node_topics_interface; auto node_topics = get_node_topics_interface(std::forward(node)); - if (!msg_mem_strat) { - using rclcpp::message_memory_strategy::MessageMemoryStrategy; - msg_mem_strat = MessageMemoryStrategy::create_default(); - } + auto factory = rclcpp::create_subscription_factory( + std::forward(callback), + options, + msg_mem_strat + ); - std::shared_ptr allocator = options.allocator; - if (!allocator) { - allocator = std::make_shared(); - } - auto factory = rclcpp::create_subscription_factory - ( - std::forward(callback), options.event_callbacks, msg_mem_strat, allocator); - - bool use_intra_process; - switch (options.use_intra_process_comm) { - case IntraProcessSetting::Enable: - use_intra_process = true; - break; - case IntraProcessSetting::Disable: - use_intra_process = false; - break; - case IntraProcessSetting::NodeDefault: - use_intra_process = node_topics->get_node_base_interface()->get_use_intra_process_default(); - break; - default: - throw std::runtime_error("Unrecognized IntraProcessSetting value"); - break; - } - - // TODO(wjwwood): convert all of the interfaces to use QoS and SubscriptionOptionsBase - auto sub = node_topics->create_subscription( - topic_name, - factory, - options.template to_rcl_subscription_options(qos), - use_intra_process); + auto sub = node_topics->create_subscription(topic_name, factory, qos); node_topics->add_subscription(sub, options.callback_group); return std::dynamic_pointer_cast(sub); } diff --git a/rclcpp/include/rclcpp/detail/README.md b/rclcpp/include/rclcpp/detail/README.md new file mode 100644 index 0000000..a2712d6 --- /dev/null +++ b/rclcpp/include/rclcpp/detail/README.md @@ -0,0 +1,3 @@ +Notice that headers in this folder should only provide symbols in the rclcpp::detail namespace. + +Also that these headers are not considered part of the public API and are subject to change without notice. diff --git a/rclcpp/include/rclcpp/detail/resolve_use_intra_process.hpp b/rclcpp/include/rclcpp/detail/resolve_use_intra_process.hpp new file mode 100644 index 0000000..cab3542 --- /dev/null +++ b/rclcpp/include/rclcpp/detail/resolve_use_intra_process.hpp @@ -0,0 +1,55 @@ +// Copyright 2019 Open Source Robotics Foundation, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef RCLCPP__DETAIL__RESOLVE_USE_INTRA_PROCESS_HPP_ +#define RCLCPP__DETAIL__RESOLVE_USE_INTRA_PROCESS_HPP_ + +#include + +#include "rclcpp/intra_process_setting.hpp" + +namespace rclcpp +{ + +namespace detail +{ + +template +bool +resolve_use_intra_process(const OptionsT & options, const NodeBaseT & node_base) +{ + bool use_intra_process; + switch (options.use_intra_process_comm) { + case IntraProcessSetting::Enable: + use_intra_process = true; + break; + case IntraProcessSetting::Disable: + use_intra_process = false; + break; + case IntraProcessSetting::NodeDefault: + use_intra_process = node_base.get_use_intra_process_default(); + break; + default: + throw std::runtime_error("Unrecognized IntraProcessSetting value"); + break; + } + + return use_intra_process; +} + +} // namespace detail + +} // namespace rclcpp + +#endif // RCLCPP__DETAIL__RESOLVE_USE_INTRA_PROCESS_HPP_ diff --git a/rclcpp/include/rclcpp/node.hpp b/rclcpp/include/rclcpp/node.hpp index e46d262..bc8b93e 100644 --- a/rclcpp/include/rclcpp/node.hpp +++ b/rclcpp/include/rclcpp/node.hpp @@ -192,16 +192,18 @@ public: * \param[in] msg_mem_strat The message memory strategy to use for allocating messages. * \return Shared pointer to the created subscription. */ - /* TODO(jacquelinekay): - Windows build breaks when static member function passed as default - argument to msg_mem_strat, nullptr is a workaround. - */ template< typename MessageT, typename CallbackT, typename AllocatorT = std::allocator, - typename SubscriptionT = rclcpp::Subscription< - typename rclcpp::subscription_traits::has_message_type::type, AllocatorT>> + typename CallbackMessageT = + typename rclcpp::subscription_traits::has_message_type::type, + typename SubscriptionT = rclcpp::Subscription, + typename MessageMemoryStrategyT = rclcpp::message_memory_strategy::MessageMemoryStrategy< + CallbackMessageT, + AllocatorT + > + > std::shared_ptr create_subscription( const std::string & topic_name, @@ -209,10 +211,10 @@ public: CallbackT && callback, const SubscriptionOptionsWithAllocator & options = SubscriptionOptionsWithAllocator(), - typename rclcpp::message_memory_strategy::MessageMemoryStrategy< - typename rclcpp::subscription_traits::has_message_type::type, AllocatorT - >::SharedPtr - msg_mem_strat = nullptr); + typename MessageMemoryStrategyT::SharedPtr msg_mem_strat = ( + MessageMemoryStrategyT::create_default() + ) + ); /// Create a timer. /** diff --git a/rclcpp/include/rclcpp/node_impl.hpp b/rclcpp/include/rclcpp/node_impl.hpp index e7c30d5..56bca44 100644 --- a/rclcpp/include/rclcpp/node_impl.hpp +++ b/rclcpp/include/rclcpp/node_impl.hpp @@ -83,16 +83,16 @@ template< typename MessageT, typename CallbackT, typename AllocatorT, - typename SubscriptionT> + typename CallbackMessageT, + typename SubscriptionT, + typename MessageMemoryStrategyT> std::shared_ptr Node::create_subscription( const std::string & topic_name, const rclcpp::QoS & qos, CallbackT && callback, const SubscriptionOptionsWithAllocator & options, - typename rclcpp::message_memory_strategy::MessageMemoryStrategy< - typename rclcpp::subscription_traits::has_message_type::type, AllocatorT>::SharedPtr - msg_mem_strat) + typename MessageMemoryStrategyT::SharedPtr msg_mem_strat) { return rclcpp::create_subscription( *this, diff --git a/rclcpp/include/rclcpp/node_interfaces/node_topics.hpp b/rclcpp/include/rclcpp/node_interfaces/node_topics.hpp index a36c8e6..8209984 100644 --- a/rclcpp/include/rclcpp/node_interfaces/node_topics.hpp +++ b/rclcpp/include/rclcpp/node_interfaces/node_topics.hpp @@ -49,8 +49,7 @@ public: create_publisher( const std::string & topic_name, const rclcpp::PublisherFactory & publisher_factory, - const rcl_publisher_options_t & publisher_options, - bool use_intra_process) override; + const rclcpp::QoS & qos) override; RCLCPP_PUBLIC void @@ -63,8 +62,7 @@ public: create_subscription( const std::string & topic_name, const rclcpp::SubscriptionFactory & subscription_factory, - const rcl_subscription_options_t & subscription_options, - bool use_intra_process) override; + const rclcpp::QoS & qos) override; RCLCPP_PUBLIC void diff --git a/rclcpp/include/rclcpp/node_interfaces/node_topics_interface.hpp b/rclcpp/include/rclcpp/node_interfaces/node_topics_interface.hpp index 7e70dc0..2beb7f0 100644 --- a/rclcpp/include/rclcpp/node_interfaces/node_topics_interface.hpp +++ b/rclcpp/include/rclcpp/node_interfaces/node_topics_interface.hpp @@ -50,8 +50,7 @@ public: create_publisher( const std::string & topic_name, const rclcpp::PublisherFactory & publisher_factory, - const rcl_publisher_options_t & publisher_options, - bool use_intra_process) = 0; + const rclcpp::QoS & qos) = 0; RCLCPP_PUBLIC virtual @@ -66,8 +65,7 @@ public: create_subscription( const std::string & topic_name, const rclcpp::SubscriptionFactory & subscription_factory, - const rcl_subscription_options_t & subscription_options, - bool use_intra_process) = 0; + const rclcpp::QoS & qos) = 0; RCLCPP_PUBLIC virtual diff --git a/rclcpp/include/rclcpp/publisher.hpp b/rclcpp/include/rclcpp/publisher.hpp index 3357d54..2bb3101 100644 --- a/rclcpp/include/rclcpp/publisher.hpp +++ b/rclcpp/include/rclcpp/publisher.hpp @@ -32,9 +32,12 @@ #include "rclcpp/allocator/allocator_common.hpp" #include "rclcpp/allocator/allocator_deleter.hpp" +#include "rclcpp/detail/resolve_use_intra_process.hpp" #include "rclcpp/intra_process_manager.hpp" #include "rclcpp/macros.hpp" +#include "rclcpp/node_interfaces/node_base_interface.hpp" #include "rclcpp/publisher_base.hpp" +#include "rclcpp/publisher_options.hpp" #include "rclcpp/type_support_decl.hpp" #include "rclcpp/visibility_control.hpp" @@ -42,40 +45,78 @@ namespace rclcpp { /// A publisher publishes messages of any type to a topic. -template> +template> class Publisher : public PublisherBase { public: - using MessageAllocTraits = allocator::AllocRebind; - using MessageAlloc = typename MessageAllocTraits::allocator_type; - using MessageDeleter = allocator::Deleter; + using MessageAllocatorTraits = allocator::AllocRebind; + using MessageAllocator = typename MessageAllocatorTraits::allocator_type; + using MessageDeleter = allocator::Deleter; using MessageUniquePtr = std::unique_ptr; using MessageSharedPtr = std::shared_ptr; - RCLCPP_SMART_PTR_DEFINITIONS(Publisher) + RCLCPP_SMART_PTR_DEFINITIONS(Publisher) Publisher( rclcpp::node_interfaces::NodeBaseInterface * node_base, const std::string & topic, - const rcl_publisher_options_t & publisher_options, - const PublisherEventCallbacks & event_callbacks, - const std::shared_ptr & allocator) + const rclcpp::QoS & qos, + const rclcpp::PublisherOptionsWithAllocator & options) : PublisherBase( node_base, topic, *rosidl_typesupport_cpp::get_message_type_support_handle(), - publisher_options), - message_allocator_(allocator) + options.template to_rcl_publisher_options(qos)), + options_(options), + message_allocator_(new MessageAllocator(*options.get_allocator().get())) { allocator::set_allocator_for_deleter(&message_deleter_, message_allocator_.get()); - if (event_callbacks.deadline_callback) { + if (options_.event_callbacks.deadline_callback) { this->add_event_handler( - event_callbacks.deadline_callback, + options_.event_callbacks.deadline_callback, RCL_PUBLISHER_OFFERED_DEADLINE_MISSED); } - if (event_callbacks.liveliness_callback) { - this->add_event_handler(event_callbacks.liveliness_callback, RCL_PUBLISHER_LIVELINESS_LOST); + if (options_.event_callbacks.liveliness_callback) { + this->add_event_handler( + options_.event_callbacks.liveliness_callback, + RCL_PUBLISHER_LIVELINESS_LOST); + } + + // Setup continues in the post construction method, post_init_setup(). + } + + /// Called post construction, so that construction may continue after shared_from_this() works. + virtual + void + post_init_setup( + rclcpp::node_interfaces::NodeBaseInterface * node_base, + const std::string & topic, + const rclcpp::QoS & qos, + const rclcpp::PublisherOptionsWithAllocator & options) + { + // Topic is unused for now. + (void)topic; + + // If needed, setup intra process communication. + if (rclcpp::detail::resolve_use_intra_process(options_, *node_base)) { + auto context = node_base->get_context(); + // Get the intra process manager instance for this context. + auto ipm = context->get_sub_context(); + // Register the publisher with the intra process manager. + if (qos.get_rmw_qos_profile().history == RMW_QOS_POLICY_HISTORY_KEEP_ALL) { + throw std::invalid_argument( + "intraprocess communication is not allowed with keep all history qos policy"); + } + if (qos.get_rmw_qos_profile().depth == 0) { + throw std::invalid_argument( + "intraprocess communication is not allowed with a zero qos history depth value"); + } + uint64_t intra_process_publisher_id = ipm->add_publisher(this->shared_from_this()); + this->setup_intra_process( + intra_process_publisher_id, + ipm, + options.template to_rcl_publisher_options(qos)); } } @@ -87,7 +128,7 @@ public: { return mapped_ring_buffer::MappedRingBuffer< MessageT, - typename Publisher::MessageAlloc + typename Publisher::MessageAllocator >::make_shared(size, this->get_allocator()); } @@ -138,8 +179,8 @@ public: // Otherwise we have to allocate memory in a unique_ptr and pass it along. // As the message is not const, a copy should be made. // A shared_ptr could also be constructed here. - auto ptr = MessageAllocTraits::allocate(*message_allocator_.get(), 1); - MessageAllocTraits::construct(*message_allocator_.get(), ptr, msg); + auto ptr = MessageAllocatorTraits::allocate(*message_allocator_.get(), 1); + MessageAllocatorTraits::construct(*message_allocator_.get(), ptr, msg); MessageUniquePtr unique_msg(ptr, message_deleter_); this->publish(std::move(unique_msg)); } @@ -150,7 +191,8 @@ public: return this->do_serialized_publish(&serialized_msg); } - std::shared_ptr get_allocator() const + std::shared_ptr + get_allocator() const { return message_allocator_; } @@ -224,7 +266,7 @@ protected: throw std::runtime_error("cannot publisher msg which is a null pointer"); } uint64_t message_seq = - ipm->template store_intra_process_message(publisher_id, msg); + ipm->template store_intra_process_message(publisher_id, msg); return message_seq; } @@ -242,11 +284,13 @@ protected: throw std::runtime_error("cannot publisher msg which is a null pointer"); } uint64_t message_seq = - ipm->template store_intra_process_message(publisher_id, std::move(msg)); + ipm->template store_intra_process_message(publisher_id, std::move(msg)); return message_seq; } - std::shared_ptr message_allocator_; + const rclcpp::PublisherOptionsWithAllocator options_; + + std::shared_ptr message_allocator_; MessageDeleter message_deleter_; }; diff --git a/rclcpp/include/rclcpp/publisher_base.hpp b/rclcpp/include/rclcpp/publisher_base.hpp index afb6830..1901a27 100644 --- a/rclcpp/include/rclcpp/publisher_base.hpp +++ b/rclcpp/include/rclcpp/publisher_base.hpp @@ -52,7 +52,7 @@ namespace intra_process_manager class IntraProcessManager; } -class PublisherBase +class PublisherBase : public std::enable_shared_from_this { friend ::rclcpp::node_interfaces::NodeTopicsInterface; @@ -184,8 +184,9 @@ public: /// Implementation utility function that creates a typed mapped ring buffer. RCLCPP_PUBLIC + virtual mapped_ring_buffer::MappedRingBufferBase::SharedPtr - virtual make_mapped_ring_buffer(size_t size) const; + make_mapped_ring_buffer(size_t size) const; /// Implementation utility function used to setup intra process publishing after creation. RCLCPP_PUBLIC diff --git a/rclcpp/include/rclcpp/publisher_factory.hpp b/rclcpp/include/rclcpp/publisher_factory.hpp index 8e6e8e2..65e3c1f 100644 --- a/rclcpp/include/rclcpp/publisher_factory.hpp +++ b/rclcpp/include/rclcpp/publisher_factory.hpp @@ -24,6 +24,7 @@ #include "rosidl_typesupport_cpp/message_type_support.hpp" #include "rclcpp/publisher.hpp" +#include "rclcpp/publisher_options.hpp" #include "rclcpp/intra_process_manager.hpp" #include "rclcpp/node_interfaces/node_base_interface.hpp" #include "rclcpp/visibility_control.hpp" @@ -41,6 +42,9 @@ namespace rclcpp * called from a templated "create_publisher" method on the Node class, and * is passed to the non-templated "create_publisher" method on the NodeTopics * class where it is used to create and setup the Publisher. + * + * It also handles the two step construction of Publishers, first calling + * the constructor and then the post_init_setup() method. */ struct PublisherFactory { @@ -49,39 +53,33 @@ struct PublisherFactory rclcpp::PublisherBase::SharedPtr( rclcpp::node_interfaces::NodeBaseInterface * node_base, const std::string & topic_name, - const rcl_publisher_options_t & publisher_options)>; + const rclcpp::QoS & qos + )>; - PublisherFactoryFunction create_typed_publisher; + const PublisherFactoryFunction create_typed_publisher; }; -/// Return a PublisherFactory with functions setup for creating a PublisherT. -template +/// Return a PublisherFactory with functions setup for creating a PublisherT. +template PublisherFactory -create_publisher_factory( - const PublisherEventCallbacks & event_callbacks, - std::shared_ptr allocator) +create_publisher_factory(const rclcpp::PublisherOptionsWithAllocator & options) { - PublisherFactory factory; - - // factory function that creates a MessageT specific PublisherT - factory.create_typed_publisher = - [event_callbacks, allocator]( - rclcpp::node_interfaces::NodeBaseInterface * node_base, - const std::string & topic_name, - const rcl_publisher_options_t & publisher_options + PublisherFactory factory { + // factory function that creates a MessageT specific PublisherT + [options]( + rclcpp::node_interfaces::NodeBaseInterface * node_base, + const std::string & topic_name, + const rclcpp::QoS & qos ) -> std::shared_ptr { - auto options_copy = publisher_options; - auto message_alloc = std::make_shared(*allocator.get()); - options_copy.allocator = allocator::get_rcl_allocator(*message_alloc.get()); - - return std::make_shared( - node_base, - topic_name, - options_copy, - event_callbacks, - message_alloc); - }; + auto publisher = std::make_shared(node_base, topic_name, qos, options); + // This is used for setting up things like intra process comms which + // require this->shared_from_this() which cannot be called from + // the constructor. + publisher->post_init_setup(node_base, topic_name, qos, options); + return publisher; + } + }; // return the factory now that it is populated return factory; diff --git a/rclcpp/include/rclcpp/publisher_options.hpp b/rclcpp/include/rclcpp/publisher_options.hpp index d1a7f64..405d339 100644 --- a/rclcpp/include/rclcpp/publisher_options.hpp +++ b/rclcpp/include/rclcpp/publisher_options.hpp @@ -19,16 +19,21 @@ #include #include -#include "rclcpp/callback_group.hpp" +#include "rcl/publisher.h" + +#include "rclcpp/allocator/allocator_common.hpp" #include "rclcpp/intra_process_setting.hpp" #include "rclcpp/qos.hpp" #include "rclcpp/qos_event.hpp" -#include "rclcpp/visibility_control.hpp" -#include "rcl/publisher.h" namespace rclcpp { +namespace callback_group +{ +class CallbackGroup; +} // namespace callback_group + /// Non-templated part of PublisherOptionsWithAllocator. struct PublisherOptionsBase { @@ -39,7 +44,7 @@ struct PublisherOptionsBase PublisherEventCallbacks event_callbacks; /// Callback group in which the waitable items from the publisher should be placed. - rclcpp::callback_group::CallbackGroup::SharedPtr callback_group; + std::shared_ptr callback_group; }; /// Structure containing optional configuration for Publishers. @@ -64,11 +69,26 @@ struct PublisherOptionsWithAllocator : public PublisherOptionsBase rcl_publisher_options_t result; using AllocatorTraits = std::allocator_traits; using MessageAllocatorT = typename AllocatorTraits::template rebind_alloc; - auto message_alloc = std::make_shared(*allocator.get()); - result.allocator = allocator::get_rcl_allocator(*message_alloc); + auto message_alloc = std::make_shared(*this->get_allocator().get()); + result.allocator = rclcpp::allocator::get_rcl_allocator(*message_alloc); result.qos = qos.get_rmw_qos_profile(); return result; } + + /// Get the allocator, creating one if needed. + std::shared_ptr + get_allocator() const + { + if (!this->allocator) { + // TODO(wjwwood): I would like to use the commented line instead, but + // cppcheck 1.89 fails with: + // Syntax Error: AST broken, binary operator '>' doesn't have two operands. + // return std::make_shared(); + std::shared_ptr tmp(new Allocator()); + return tmp; + } + return this->allocator; + } }; using PublisherOptions = PublisherOptionsWithAllocator>; diff --git a/rclcpp/include/rclcpp/subscription.hpp b/rclcpp/include/rclcpp/subscription.hpp index e963e85..ae76835 100644 --- a/rclcpp/include/rclcpp/subscription.hpp +++ b/rclcpp/include/rclcpp/subscription.hpp @@ -32,13 +32,16 @@ #include "rcl_interfaces/msg/intra_process_message.hpp" #include "rclcpp/any_subscription_callback.hpp" +#include "rclcpp/detail/resolve_use_intra_process.hpp" #include "rclcpp/exceptions.hpp" #include "rclcpp/expand_topic_or_service_name.hpp" #include "rclcpp/intra_process_manager.hpp" #include "rclcpp/logging.hpp" #include "rclcpp/macros.hpp" #include "rclcpp/message_memory_strategy.hpp" +#include "rclcpp/node_interfaces/node_base_interface.hpp" #include "rclcpp/subscription_base.hpp" +#include "rclcpp/subscription_options.hpp" #include "rclcpp/subscription_traits.hpp" #include "rclcpp/type_support_decl.hpp" #include "rclcpp/visibility_control.hpp" @@ -55,15 +58,19 @@ class NodeTopicsInterface; /// Subscription implementation, templated on the type of message this subscription receives. template< typename CallbackMessageT, - typename Alloc = std::allocator> + typename AllocatorT = std::allocator, + typename MessageMemoryStrategyT = rclcpp::message_memory_strategy::MessageMemoryStrategy< + CallbackMessageT, + AllocatorT + >> class Subscription : public SubscriptionBase { friend class rclcpp::node_interfaces::NodeTopicsInterface; public: - using MessageAllocTraits = allocator::AllocRebind; - using MessageAlloc = typename MessageAllocTraits::allocator_type; - using MessageDeleter = allocator::Deleter; + using MessageAllocatorTraits = allocator::AllocRebind; + using MessageAllocator = typename MessageAllocatorTraits::allocator_type; + using MessageDeleter = allocator::Deleter; using ConstMessageSharedPtr = std::shared_ptr; using MessageUniquePtr = std::unique_ptr; @@ -71,46 +78,66 @@ public: /// Default constructor. /** - * The constructor for a subscription is almost never called directly. Instead, subscriptions - * should be instantiated through Node::create_subscription. - * \param[in] node_handle rcl representation of the node that owns this subscription. + * The constructor for a subscription is almost never called directly. + * Instead, subscriptions should be instantiated through the function + * rclcpp::create_subscription(). + * + * \param[in] node_base NodeBaseInterface pointer that is used in part of the setup. * \param[in] type_support_handle rosidl type support struct, for the Message type of the topic. * \param[in] topic_name Name of the topic to subscribe to. - * \param[in] subscription_options options for the subscription. * \param[in] callback User defined callback to call when a message is received. - * \param[in] memory_strategy The memory strategy to be used for managing message memory. + * \param[in] options options for the subscription. + * \param[in] message_memory_strategy The memory strategy to be used for managing message memory. */ Subscription( - std::shared_ptr node_handle, + rclcpp::node_interfaces::NodeBaseInterface * node_base, const rosidl_message_type_support_t & type_support_handle, const std::string & topic_name, - const rcl_subscription_options_t & subscription_options, - AnySubscriptionCallback callback, - const SubscriptionEventCallbacks & event_callbacks, - typename message_memory_strategy::MessageMemoryStrategy::SharedPtr - memory_strategy = message_memory_strategy::MessageMemoryStrategy::create_default()) + const rclcpp::QoS & qos, + AnySubscriptionCallback callback, + const rclcpp::SubscriptionOptionsWithAllocator & options, + typename MessageMemoryStrategyT::SharedPtr message_memory_strategy) : SubscriptionBase( - node_handle, + node_base, type_support_handle, topic_name, - subscription_options, + options.template to_rcl_subscription_options(qos), rclcpp::subscription_traits::is_serialized_subscription_argument::value), any_callback_(callback), - message_memory_strategy_(memory_strategy) + message_memory_strategy_(message_memory_strategy) { - if (event_callbacks.deadline_callback) { + if (options.event_callbacks.deadline_callback) { this->add_event_handler( - event_callbacks.deadline_callback, + options.event_callbacks.deadline_callback, RCL_SUBSCRIPTION_REQUESTED_DEADLINE_MISSED); } - if (event_callbacks.liveliness_callback) { + if (options.event_callbacks.liveliness_callback) { this->add_event_handler( - event_callbacks.liveliness_callback, + options.event_callbacks.liveliness_callback, RCL_SUBSCRIPTION_LIVELINESS_CHANGED); } } + /// Called after construction to continue setup that requires shared_from_this(). + void + post_init_setup( + rclcpp::node_interfaces::NodeBaseInterface * node_base, + const rclcpp::QoS & qos, + const rclcpp::SubscriptionOptionsWithAllocator & options) + { + // Setup intra process publishing if requested. + if (rclcpp::detail::resolve_use_intra_process(options, *node_base)) { + auto context = node_base->get_context(); + using rclcpp::intra_process_manager::IntraProcessManager; + auto ipm = context->get_sub_context(); + uint64_t intra_process_subscription_id = ipm->add_subscription(this->shared_from_this()); + this->setup_intra_process( + intra_process_subscription_id, + ipm, + options.template to_rcl_subscription_options(qos)); + } + } + /// Support dynamically setting the message memory strategy. /** * Behavior may be undefined if called while the subscription could be executing. @@ -118,7 +145,7 @@ public: */ void set_message_memory_strategy( typename message_memory_strategy::MessageMemoryStrategy::SharedPtr message_memory_strategy) + AllocatorT>::SharedPtr message_memory_strategy) { message_memory_strategy_ = message_memory_strategy; } @@ -237,7 +264,7 @@ private: throw std::runtime_error( "intra process take called after destruction of intra process manager"); } - ipm->template take_intra_process_message( + ipm->template take_intra_process_message( publisher_id, message_sequence, subscription_id, message); } @@ -253,7 +280,7 @@ private: throw std::runtime_error( "intra process take called after destruction of intra process manager"); } - ipm->template take_intra_process_message( + ipm->template take_intra_process_message( publisher_id, message_sequence, subscription_id, message); } @@ -274,8 +301,8 @@ private: RCLCPP_DISABLE_COPY(Subscription) - AnySubscriptionCallback any_callback_; - typename message_memory_strategy::MessageMemoryStrategy::SharedPtr + AnySubscriptionCallback any_callback_; + typename message_memory_strategy::MessageMemoryStrategy::SharedPtr message_memory_strategy_; }; diff --git a/rclcpp/include/rclcpp/subscription_base.hpp b/rclcpp/include/rclcpp/subscription_base.hpp index 35255da..25b99b8 100644 --- a/rclcpp/include/rclcpp/subscription_base.hpp +++ b/rclcpp/include/rclcpp/subscription_base.hpp @@ -36,7 +36,7 @@ namespace rclcpp namespace node_interfaces { -class NodeTopicsInterface; +class NodeBaseInterface; } // namespace node_interfaces namespace intra_process_manager @@ -50,14 +50,14 @@ class IntraProcessManager; /// Virtual base class for subscriptions. This pattern allows us to iterate over different template /// specializations of Subscription, among other things. -class SubscriptionBase +class SubscriptionBase : public std::enable_shared_from_this { public: RCLCPP_SMART_PTR_DEFINITIONS_NOT_COPYABLE(SubscriptionBase) /// Default constructor. /** - * \param[in] node_handle The rcl representation of the node that owns this subscription. + * \param[in] node_base NodeBaseInterface pointer used in parts of the setup. * \param[in] type_support_handle rosidl type support struct, for the Message type of the topic. * \param[in] topic_name Name of the topic to subscribe to. * \param[in] subscription_options options for the subscription. @@ -65,7 +65,7 @@ public: */ RCLCPP_PUBLIC SubscriptionBase( - std::shared_ptr node_handle, + rclcpp::node_interfaces::NodeBaseInterface * node_base, const rosidl_message_type_support_t & type_support_handle, const std::string & topic_name, const rcl_subscription_options_t & subscription_options, @@ -114,12 +114,16 @@ public: /// Borrow a new message. /** \return Shared pointer to the fresh message. */ - virtual std::shared_ptr + RCLCPP_PUBLIC + virtual + std::shared_ptr create_message() = 0; /// Borrow a new serialized message /** \return Shared pointer to a rcl_message_serialized_t. */ - virtual std::shared_ptr + RCLCPP_PUBLIC + virtual + std::shared_ptr create_serialized_message() = 0; /// Check if we need to handle the message, and execute the callback if we do. @@ -127,27 +131,37 @@ public: * \param[in] message Shared pointer to the message to handle. * \param[in] message_info Metadata associated with this message. */ - virtual void + RCLCPP_PUBLIC + virtual + void handle_message(std::shared_ptr & message, const rmw_message_info_t & message_info) = 0; /// Return the message borrowed in create_message. /** \param[in] message Shared pointer to the returned message. */ - virtual void + RCLCPP_PUBLIC + virtual + void return_message(std::shared_ptr & message) = 0; /// Return the message borrowed in create_serialized_message. /** \param[in] message Shared pointer to the returned message. */ - virtual void + RCLCPP_PUBLIC + virtual + void return_serialized_message(std::shared_ptr & message) = 0; - virtual void + RCLCPP_PUBLIC + virtual + void handle_intra_process_message( rcl_interfaces::msg::IntraProcessMessage & ipm, const rmw_message_info_t & message_info) = 0; + RCLCPP_PUBLIC const rosidl_message_type_support_t & get_message_type_support_handle() const; + RCLCPP_PUBLIC bool is_serialized() const; @@ -161,7 +175,9 @@ public: std::weak_ptr; /// Implemenation detail. - void setup_intra_process( + RCLCPP_PUBLIC + void + setup_intra_process( uint64_t intra_process_subscription_id, IntraProcessManagerWeakPtr weak_ipm, const rcl_subscription_options_t & intra_process_options); diff --git a/rclcpp/include/rclcpp/subscription_factory.hpp b/rclcpp/include/rclcpp/subscription_factory.hpp index 9b46fe3..a5175c8 100644 --- a/rclcpp/include/rclcpp/subscription_factory.hpp +++ b/rclcpp/include/rclcpp/subscription_factory.hpp @@ -43,6 +43,9 @@ namespace rclcpp * usually called from a templated "create_subscription" method of the Node * class, and is passed to the non-templated "create_subscription" method of * the NodeTopics class where it is used to create and setup the Subscription. + * + * It also handles the two step construction of Subscriptions, first calling + * the constructor and then the post_init_setup() method. */ struct SubscriptionFactory { @@ -51,71 +54,62 @@ struct SubscriptionFactory rclcpp::SubscriptionBase::SharedPtr( rclcpp::node_interfaces::NodeBaseInterface * node_base, const std::string & topic_name, - const rcl_subscription_options_t & subscription_options)>; + const rclcpp::QoS & qos)>; - SubscriptionFactoryFunction create_typed_subscription; - - // Function that takes a MessageT from the intra process manager - using SetupIntraProcessFunction = std::function< - void ( - rclcpp::intra_process_manager::IntraProcessManager::SharedPtr ipm, - rclcpp::SubscriptionBase::SharedPtr subscription, - const rcl_subscription_options_t & subscription_options)>; - - SetupIntraProcessFunction setup_intra_process; + const SubscriptionFactoryFunction create_typed_subscription; }; /// Return a SubscriptionFactory with functions for creating a SubscriptionT. template< typename MessageT, typename CallbackT, - typename Alloc, - typename CallbackMessageT, - typename SubscriptionT> + typename AllocatorT, + typename CallbackMessageT = + typename rclcpp::subscription_traits::has_message_type::type, + typename SubscriptionT = rclcpp::Subscription, + typename MessageMemoryStrategyT = rclcpp::message_memory_strategy::MessageMemoryStrategy< + CallbackMessageT, + AllocatorT + >> SubscriptionFactory create_subscription_factory( CallbackT && callback, - const SubscriptionEventCallbacks & event_callbacks, - typename rclcpp::message_memory_strategy::MessageMemoryStrategy< - CallbackMessageT, Alloc>::SharedPtr - msg_mem_strat, - std::shared_ptr allocator) + const rclcpp::SubscriptionOptionsWithAllocator & options, + typename MessageMemoryStrategyT::SharedPtr msg_mem_strat) { - SubscriptionFactory factory; + auto allocator = options.get_allocator(); using rclcpp::AnySubscriptionCallback; - AnySubscriptionCallback any_subscription_callback(allocator); + AnySubscriptionCallback any_subscription_callback(allocator); any_subscription_callback.set(std::forward(callback)); - auto message_alloc = - std::make_shared::MessageAlloc>(); - - // factory function that creates a MessageT specific SubscriptionT - factory.create_typed_subscription = - [allocator, msg_mem_strat, any_subscription_callback, event_callbacks, message_alloc]( - rclcpp::node_interfaces::NodeBaseInterface * node_base, - const std::string & topic_name, - const rcl_subscription_options_t & subscription_options + SubscriptionFactory factory { + // factory function that creates a MessageT specific SubscriptionT + [options, msg_mem_strat, any_subscription_callback]( + rclcpp::node_interfaces::NodeBaseInterface * node_base, + const std::string & topic_name, + const rclcpp::QoS & qos ) -> rclcpp::SubscriptionBase::SharedPtr { - auto options_copy = subscription_options; - options_copy.allocator = - rclcpp::allocator::get_rcl_allocator(*message_alloc.get()); - using rclcpp::Subscription; using rclcpp::SubscriptionBase; - auto sub = Subscription::make_shared( - node_base->get_shared_rcl_node_handle(), + auto sub = Subscription::make_shared( + node_base, *rosidl_typesupport_cpp::get_message_type_support_handle(), topic_name, - options_copy, + qos, any_subscription_callback, - event_callbacks, + options, msg_mem_strat); + // This is used for setting up things like intra process comms which + // require this->shared_from_this() which cannot be called from + // the constructor. + sub->post_init_setup(node_base, qos, options); auto sub_base_ptr = std::dynamic_pointer_cast(sub); return sub_base_ptr; - }; + } + }; // return the factory now that it is populated return factory; diff --git a/rclcpp/include/rclcpp/subscription_options.hpp b/rclcpp/include/rclcpp/subscription_options.hpp index d76be04..710d66a 100644 --- a/rclcpp/include/rclcpp/subscription_options.hpp +++ b/rclcpp/include/rclcpp/subscription_options.hpp @@ -70,6 +70,16 @@ struct SubscriptionOptionsWithAllocator : public SubscriptionOptionsBase result.qos = qos.get_rmw_qos_profile(); return result; } + + /// Get the allocator, creating one if needed. + std::shared_ptr + get_allocator() const + { + if (!this->allocator) { + return std::make_shared(); + } + return this->allocator; + } }; using SubscriptionOptions = SubscriptionOptionsWithAllocator>; diff --git a/rclcpp/src/rclcpp/node_interfaces/node_topics.cpp b/rclcpp/src/rclcpp/node_interfaces/node_topics.cpp index 8bed672..74e034e 100644 --- a/rclcpp/src/rclcpp/node_interfaces/node_topics.cpp +++ b/rclcpp/src/rclcpp/node_interfaces/node_topics.cpp @@ -34,36 +34,10 @@ rclcpp::PublisherBase::SharedPtr NodeTopics::create_publisher( const std::string & topic_name, const rclcpp::PublisherFactory & publisher_factory, - const rcl_publisher_options_t & publisher_options, - bool use_intra_process) + const rclcpp::QoS & qos) { - // Create the MessageT specific Publisher using the factory, but store it as PublisherBase. - auto publisher = publisher_factory.create_typed_publisher( - node_base_, topic_name, publisher_options); - - // Setup intra process publishing if requested. - if (use_intra_process) { - auto context = node_base_->get_context(); - // Get the intra process manager instance for this context. - auto ipm = context->get_sub_context(); - // Register the publisher with the intra process manager. - if (publisher_options.qos.history == RMW_QOS_POLICY_HISTORY_KEEP_ALL) { - throw std::invalid_argument( - "intraprocess communication is not allowed with keep all history qos policy"); - } - if (publisher_options.qos.depth == 0) { - throw std::invalid_argument( - "intraprocess communication is not allowed with a zero qos history depth value"); - } - uint64_t intra_process_publisher_id = ipm->add_publisher(publisher); - publisher->setup_intra_process( - intra_process_publisher_id, - ipm, - publisher_options); - } - - // Return the completed publisher. - return publisher; + // Create the MessageT specific Publisher using the factory, but return it as PublisherBase. + return publisher_factory.create_typed_publisher(node_base_, topic_name, qos); } void @@ -99,25 +73,10 @@ rclcpp::SubscriptionBase::SharedPtr NodeTopics::create_subscription( const std::string & topic_name, const rclcpp::SubscriptionFactory & subscription_factory, - const rcl_subscription_options_t & subscription_options, - bool use_intra_process) + const rclcpp::QoS & qos) { - auto subscription = subscription_factory.create_typed_subscription( - node_base_, topic_name, subscription_options); - - // Setup intra process publishing if requested. - if (use_intra_process) { - auto context = node_base_->get_context(); - auto ipm = - context->get_sub_context(); - uint64_t intra_process_subscription_id = ipm->add_subscription(subscription); - auto options_copy = subscription_options; - options_copy.ignore_local_publications = false; - subscription->setup_intra_process(intra_process_subscription_id, ipm, options_copy); - } - - // Return the completed subscription. - return subscription; + // Create the MessageT specific Subscription using the factory, but return a SubscriptionBase. + return subscription_factory.create_typed_subscription(node_base_, topic_name, qos); } void diff --git a/rclcpp/src/rclcpp/subscription_base.cpp b/rclcpp/src/rclcpp/subscription_base.cpp index 5189150..8d9b205 100644 --- a/rclcpp/src/rclcpp/subscription_base.cpp +++ b/rclcpp/src/rclcpp/subscription_base.cpp @@ -23,6 +23,7 @@ #include "rclcpp/expand_topic_or_service_name.hpp" #include "rclcpp/intra_process_manager.hpp" #include "rclcpp/logging.hpp" +#include "rclcpp/node_interfaces/node_base_interface.hpp" #include "rmw/error_handling.h" #include "rmw/rmw.h" @@ -30,18 +31,18 @@ using rclcpp::SubscriptionBase; SubscriptionBase::SubscriptionBase( - std::shared_ptr node_handle, + rclcpp::node_interfaces::NodeBaseInterface * node_base, const rosidl_message_type_support_t & type_support_handle, const std::string & topic_name, const rcl_subscription_options_t & subscription_options, bool is_serialized) -: node_handle_(node_handle), +: node_handle_(node_base->get_shared_rcl_node_handle()), use_intra_process_(false), intra_process_subscription_id_(0), type_support_(type_support_handle), is_serialized_(is_serialized) { - auto custom_deletor = [node_handle](rcl_subscription_t * rcl_subs) + auto custom_deletor = [node_handle = this->node_handle_](rcl_subscription_t * rcl_subs) { if (rcl_subscription_fini(rcl_subs, node_handle.get()) != RCL_RET_OK) { RCLCPP_ERROR( @@ -167,7 +168,8 @@ SubscriptionBase::get_publisher_count() const return inter_process_publisher_count; } -void SubscriptionBase::setup_intra_process( +void +SubscriptionBase::setup_intra_process( uint64_t intra_process_subscription_id, IntraProcessManagerWeakPtr weak_ipm, const rcl_subscription_options_t & intra_process_options) diff --git a/rclcpp_lifecycle/include/rclcpp_lifecycle/lifecycle_node.hpp b/rclcpp_lifecycle/include/rclcpp_lifecycle/lifecycle_node.hpp index fec67c7..9d9fa7e 100644 --- a/rclcpp_lifecycle/include/rclcpp_lifecycle/lifecycle_node.hpp +++ b/rclcpp_lifecycle/include/rclcpp_lifecycle/lifecycle_node.hpp @@ -166,8 +166,9 @@ public: create_publisher( const std::string & topic_name, const rclcpp::QoS & qos, - const PublisherOptionsWithAllocator & options = - create_default_publisher_options() + const PublisherOptionsWithAllocator & options = ( + create_default_publisher_options() + ) ); /// Create and return a Subscription. @@ -179,15 +180,17 @@ public: * \param[in] msg_mem_strat The message memory strategy to use for allocating messages. * \return Shared pointer to the created subscription. */ - /* TODO(jacquelinekay): - Windows build breaks when static member function passed as default - argument to msg_mem_strat, nullptr is a workaround. - */ template< typename MessageT, typename CallbackT, typename AllocatorT = std::allocator, - typename SubscriptionT = rclcpp::Subscription> + typename CallbackMessageT = + typename rclcpp::subscription_traits::has_message_type::type, + typename SubscriptionT = rclcpp::Subscription, + typename MessageMemoryStrategyT = rclcpp::message_memory_strategy::MessageMemoryStrategy< + CallbackMessageT, + AllocatorT + >> std::shared_ptr create_subscription( const std::string & topic_name, @@ -195,10 +198,10 @@ public: CallbackT && callback, const SubscriptionOptionsWithAllocator & options = create_default_subscription_options(), - typename rclcpp::message_memory_strategy::MessageMemoryStrategy< - typename rclcpp::subscription_traits::has_message_type::type, AllocatorT - >::SharedPtr - msg_mem_strat = nullptr); + typename MessageMemoryStrategyT::SharedPtr msg_mem_strat = ( + MessageMemoryStrategyT::create_default() + ) + ); /// Create a timer. /** diff --git a/rclcpp_lifecycle/include/rclcpp_lifecycle/lifecycle_node_impl.hpp b/rclcpp_lifecycle/include/rclcpp_lifecycle/lifecycle_node_impl.hpp index 68f573b..6fe3532 100644 --- a/rclcpp_lifecycle/include/rclcpp_lifecycle/lifecycle_node_impl.hpp +++ b/rclcpp_lifecycle/include/rclcpp_lifecycle/lifecycle_node_impl.hpp @@ -59,16 +59,16 @@ template< typename MessageT, typename CallbackT, typename AllocatorT, - typename SubscriptionT> + typename CallbackMessageT, + typename SubscriptionT, + typename MessageMemoryStrategyT> std::shared_ptr LifecycleNode::create_subscription( const std::string & topic_name, const rclcpp::QoS & qos, CallbackT && callback, const rclcpp::SubscriptionOptionsWithAllocator & options, - typename rclcpp::message_memory_strategy::MessageMemoryStrategy< - typename rclcpp::subscription_traits::has_message_type::type, AllocatorT>::SharedPtr - msg_mem_strat) + typename MessageMemoryStrategyT::SharedPtr msg_mem_strat) { return rclcpp::create_subscription( *this, diff --git a/rclcpp_lifecycle/include/rclcpp_lifecycle/lifecycle_publisher.hpp b/rclcpp_lifecycle/include/rclcpp_lifecycle/lifecycle_publisher.hpp index e5d2ada..2f94dc7 100644 --- a/rclcpp_lifecycle/include/rclcpp_lifecycle/lifecycle_publisher.hpp +++ b/rclcpp_lifecycle/include/rclcpp_lifecycle/lifecycle_publisher.hpp @@ -43,8 +43,7 @@ public: /// brief child class of rclcpp Publisher class. /** - * Overrides all publisher functions to check for - * enabled/disabled state. + * Overrides all publisher functions to check for enabled/disabled state. */ template> class LifecyclePublisher : public LifecyclePublisherInterface, @@ -61,11 +60,9 @@ public: LifecyclePublisher( rclcpp::node_interfaces::NodeBaseInterface * node_base, const std::string & topic, - const rcl_publisher_options_t & publisher_options, - const rclcpp::PublisherEventCallbacks event_callbacks, - std::shared_ptr allocator) - : rclcpp::Publisher( - node_base, topic, publisher_options, event_callbacks, allocator), + const rclcpp::QoS & qos, + const rclcpp::PublisherOptionsWithAllocator & options) + : rclcpp::Publisher(node_base, topic, qos, options), enabled_(false), logger_(rclcpp::get_logger("LifecyclePublisher")) {