From d11a10a583f677245faaa0d79c53e5a4d82b2384 Mon Sep 17 00:00:00 2001 From: ivanpauno Date: Wed, 3 Apr 2019 18:03:10 -0300 Subject: [PATCH] Check QoS policy when configuring intraprocess, skip interprocess publish when possible (#674) * Only setup intraprocess if 'durability' qos policy is 'volatile'. Signed-off-by: ivanpauno * Skip interprocess publish when only having intraprocess subscriptions. Signed-off-by: ivanpauno * Add intraprocess configuration option at publisher/subscription level Signed-off-by: ivanpauno * Use get_actual_qos when setting-up intraprocess. Add test. Signed-off-by: ivanpauno --- rclcpp/include/rclcpp/node.hpp | 23 ++++++++++--- rclcpp/include/rclcpp/node_impl.hpp | 53 ++++++++++++++++++++++++----- rclcpp/include/rclcpp/publisher.hpp | 8 +++-- rclcpp/src/rclcpp/publisher.cpp | 14 +++++--- rclcpp/test/test_publisher.cpp | 25 ++++++++++++-- 5 files changed, 102 insertions(+), 21 deletions(-) diff --git a/rclcpp/include/rclcpp/node.hpp b/rclcpp/include/rclcpp/node.hpp index a3fe7eb..2e34104 100644 --- a/rclcpp/include/rclcpp/node.hpp +++ b/rclcpp/include/rclcpp/node.hpp @@ -64,6 +64,17 @@ namespace rclcpp { +/// Used as argument in create_publisher and create_subscriber. +enum class IntraProcessSetting +{ + /// Explicitly enable intraprocess comm at publisher/subscription level. + Enable, + /// Explicitly disable intraprocess comm at publisher/subscription level. + Disable, + /// Take intraprocess configuration from the node. + NodeDefault +}; + /// Node is the single point of entry for creating publishers and subscribers. class Node : public std::enable_shared_from_this { @@ -152,7 +163,8 @@ public: std::shared_ptr create_publisher( const std::string & topic_name, size_t qos_history_depth, - std::shared_ptr allocator = nullptr); + std::shared_ptr allocator = nullptr, + IntraProcessSetting use_intra_process_comm = IntraProcessSetting::NodeDefault); /// Create and return a Publisher. /** @@ -168,7 +180,8 @@ public: create_publisher( const std::string & topic_name, const rmw_qos_profile_t & qos_profile = rmw_qos_profile_default, - std::shared_ptr allocator = nullptr); + std::shared_ptr allocator = nullptr, + IntraProcessSetting use_intra_process_comm = IntraProcessSetting::NodeDefault); /// Create and return a Subscription. /** @@ -201,7 +214,8 @@ public: typename rclcpp::message_memory_strategy::MessageMemoryStrategy< typename rclcpp::subscription_traits::has_message_type::type, Alloc>::SharedPtr msg_mem_strat = nullptr, - std::shared_ptr allocator = nullptr); + std::shared_ptr allocator = nullptr, + IntraProcessSetting use_intra_process_comm = IntraProcessSetting::NodeDefault); /// Create and return a Subscription. /** @@ -234,7 +248,8 @@ public: typename rclcpp::message_memory_strategy::MessageMemoryStrategy< typename rclcpp::subscription_traits::has_message_type::type, Alloc>::SharedPtr msg_mem_strat = nullptr, - std::shared_ptr allocator = nullptr); + std::shared_ptr allocator = nullptr, + IntraProcessSetting use_intra_process_comm = IntraProcessSetting::NodeDefault); /// Create a timer. /** diff --git a/rclcpp/include/rclcpp/node_impl.hpp b/rclcpp/include/rclcpp/node_impl.hpp index a2ef35b..0d35da8 100644 --- a/rclcpp/include/rclcpp/node_impl.hpp +++ b/rclcpp/include/rclcpp/node_impl.hpp @@ -55,14 +55,16 @@ template std::shared_ptr Node::create_publisher( const std::string & topic_name, size_t qos_history_depth, - std::shared_ptr allocator) + std::shared_ptr allocator, + IntraProcessSetting use_intra_process_comm) { if (!allocator) { allocator = std::make_shared(); } rmw_qos_profile_t qos = rmw_qos_profile_default; qos.depth = qos_history_depth; - return this->create_publisher(topic_name, qos, allocator); + return this->create_publisher(topic_name, qos, + allocator, use_intra_process_comm); } RCLCPP_LOCAL @@ -81,17 +83,33 @@ template std::shared_ptr Node::create_publisher( const std::string & topic_name, const rmw_qos_profile_t & qos_profile, - std::shared_ptr allocator) + std::shared_ptr allocator, IntraProcessSetting use_intra_process_comm) { if (!allocator) { allocator = std::make_shared(); } + bool use_intra_process; + switch (use_intra_process_comm) { + case IntraProcessSetting::Enable: + use_intra_process = true; + break; + case IntraProcessSetting::Disable: + use_intra_process = false; + break; + case IntraProcessSetting::NodeDefault: + use_intra_process = this->get_node_options().use_intra_process_comms(); + break; + default: + throw std::runtime_error("Unrecognized IntraProcessSetting value"); + break; + } + return rclcpp::create_publisher( this->node_topics_.get(), extend_name_with_sub_namespace(topic_name, this->get_sub_namespace()), qos_profile, - this->get_node_options().use_intra_process_comms(), + use_intra_process, allocator); } @@ -110,7 +128,8 @@ Node::create_subscription( typename rclcpp::message_memory_strategy::MessageMemoryStrategy< typename rclcpp::subscription_traits::has_message_type::type, Alloc>::SharedPtr msg_mem_strat, - std::shared_ptr allocator) + std::shared_ptr allocator, + IntraProcessSetting use_intra_process_comm) { using CallbackMessageT = typename rclcpp::subscription_traits::has_message_type::type; @@ -123,6 +142,22 @@ Node::create_subscription( msg_mem_strat = MessageMemoryStrategy::create_default(); } + bool use_intra_process; + switch (use_intra_process_comm) { + case IntraProcessSetting::Enable: + use_intra_process = true; + break; + case IntraProcessSetting::Disable: + use_intra_process = false; + break; + case IntraProcessSetting::NodeDefault: + use_intra_process = this->get_node_options().use_intra_process_comms(); + break; + default: + throw std::runtime_error("Unrecognized IntraProcessSetting value"); + break; + } + return rclcpp::create_subscription( this->node_topics_.get(), extend_name_with_sub_namespace(topic_name, this->get_sub_namespace()), @@ -130,7 +165,7 @@ Node::create_subscription( qos_profile, group, ignore_local_publications, - this->get_node_options().use_intra_process_comms(), + use_intra_process, msg_mem_strat, allocator); } @@ -150,7 +185,8 @@ Node::create_subscription( typename rclcpp::message_memory_strategy::MessageMemoryStrategy< typename rclcpp::subscription_traits::has_message_type::type, Alloc>::SharedPtr msg_mem_strat, - std::shared_ptr allocator) + std::shared_ptr allocator, + IntraProcessSetting use_intra_process_comm) { rmw_qos_profile_t qos = rmw_qos_profile_default; qos.depth = qos_history_depth; @@ -162,7 +198,8 @@ Node::create_subscription( group, ignore_local_publications, msg_mem_strat, - allocator); + allocator, + use_intra_process_comm); } template diff --git a/rclcpp/include/rclcpp/publisher.hpp b/rclcpp/include/rclcpp/publisher.hpp index 27a4177..3bc5814 100644 --- a/rclcpp/include/rclcpp/publisher.hpp +++ b/rclcpp/include/rclcpp/publisher.hpp @@ -183,7 +183,7 @@ protected: using IntraProcessManagerWeakPtr = std::weak_ptr; - bool use_intra_process_; + bool intra_process_is_enabled_; IntraProcessManagerWeakPtr weak_ipm_; uint64_t intra_process_publisher_id_; StoreMessageCallbackT store_intra_process_message_; @@ -230,7 +230,11 @@ public: virtual void publish(std::unique_ptr & msg) { - this->do_inter_process_publish(msg.get()); + bool inter_process_subscriptions_exist = + get_subscription_count() > get_intra_process_subscription_count(); + if (!intra_process_is_enabled_ || inter_process_subscriptions_exist) { + this->do_inter_process_publish(msg.get()); + } if (store_intra_process_message_) { // Take the pointer from the unique_msg, release it and pass as a void * // to the ipm. The ipm should then capture it again as a unique_ptr of diff --git a/rclcpp/src/rclcpp/publisher.cpp b/rclcpp/src/rclcpp/publisher.cpp index 275e145..a580bb5 100644 --- a/rclcpp/src/rclcpp/publisher.cpp +++ b/rclcpp/src/rclcpp/publisher.cpp @@ -43,7 +43,7 @@ PublisherBase::PublisherBase( const rosidl_message_type_support_t & type_support, const rcl_publisher_options_t & publisher_options) : rcl_node_handle_(node_base->get_shared_rcl_node_handle()), - use_intra_process_(false), intra_process_publisher_id_(0), + intra_process_is_enabled_(false), intra_process_publisher_id_(0), store_intra_process_message_(nullptr) { rcl_ret_t ret = rcl_publisher_init( @@ -99,7 +99,7 @@ PublisherBase::~PublisherBase() auto ipm = weak_ipm_.lock(); - if (!use_intra_process_) { + if (!intra_process_is_enabled_) { return; } if (!ipm) { @@ -183,13 +183,12 @@ size_t PublisherBase::get_intra_process_subscription_count() const { auto ipm = weak_ipm_.lock(); - if (!use_intra_process_) { + if (!intra_process_is_enabled_) { return 0; } if (!ipm) { // TODO(ivanpauno): should this just return silently? Or maybe return with a warning? // Same as wjwwood comment in publisher_factory create_shared_publish_callback. - // If we don't raise an error here, use_intra_process_ flag is unnecessary. throw std::runtime_error( "intra process subscriber count called after " "destruction of intra process manager"); @@ -243,6 +242,11 @@ PublisherBase::setup_intra_process( IntraProcessManagerSharedPtr ipm, const rcl_publisher_options_t & intra_process_options) { + // Intraprocess configuration is not allowed with "durability" qos policy non "volatile". + if (this->get_actual_qos().durability != RMW_QOS_POLICY_DURABILITY_VOLATILE) { + throw exceptions::InvalidParametersException( + "intraprocess communication is not allowed with durability qos policy non-volatile"); + } const char * topic_name = this->get_topic_name(); if (!topic_name) { throw std::runtime_error("failed to get topic name"); @@ -273,7 +277,7 @@ PublisherBase::setup_intra_process( intra_process_publisher_id_ = intra_process_publisher_id; store_intra_process_message_ = store_callback; weak_ipm_ = ipm; - use_intra_process_ = true; + intra_process_is_enabled_ = true; // Life time of this object is tied to the publisher handle. rmw_publisher_t * publisher_rmw_handle = rcl_publisher_get_rmw_handle( diff --git a/rclcpp/test/test_publisher.cpp b/rclcpp/test/test_publisher.cpp index 24486b7..572daff 100644 --- a/rclcpp/test/test_publisher.cpp +++ b/rclcpp/test/test_publisher.cpp @@ -30,9 +30,9 @@ protected: rclcpp::init(0, nullptr); } - void SetUp() + void initialize(const rclcpp::NodeOptions & node_options = rclcpp::NodeOptions()) { - node = std::make_shared("my_node", "/ns"); + node = std::make_shared("my_node", "/ns", node_options); } void TearDown() @@ -69,6 +69,7 @@ protected: Testing publisher construction and destruction. */ TEST_F(TestPublisher, construction_and_destruction) { + initialize(); using rcl_interfaces::msg::IntraProcessMessage; { auto publisher = node->create_publisher("topic"); @@ -81,6 +82,26 @@ TEST_F(TestPublisher, construction_and_destruction) { } } +/* + Testing publisher with intraprocess enabled and invalid QoS + */ +TEST_F(TestPublisher, intraprocess_with_invalid_qos) { + initialize(rclcpp::NodeOptions().use_intra_process_comms(true)); + rmw_qos_profile_t qos = { + RMW_QOS_POLICY_HISTORY_KEEP_LAST, + 1, + RMW_QOS_POLICY_RELIABILITY_RELIABLE, + RMW_QOS_POLICY_DURABILITY_TRANSIENT_LOCAL, + false + }; + using rcl_interfaces::msg::IntraProcessMessage; + { + ASSERT_THROW( + {auto publisher = node->create_publisher("topic", qos);}, + rclcpp::exceptions::InvalidParametersException); + } +} + /* Testing publisher construction and destruction for subnodes. */