diff --git a/rclcpp/include/rclcpp/intra_process_manager.hpp b/rclcpp/include/rclcpp/intra_process_manager.hpp index baad129..340b527 100644 --- a/rclcpp/include/rclcpp/intra_process_manager.hpp +++ b/rclcpp/include/rclcpp/intra_process_manager.hpp @@ -190,7 +190,8 @@ public: */ template uint64_t - add_publisher(publisher::Publisher::SharedPtr publisher, size_t buffer_size = 0) + add_publisher(typename publisher::Publisher::SharedPtr publisher, + size_t buffer_size = 0) { auto id = IntraProcessManager::get_next_unique_id(); publishers_[id].publisher = publisher; @@ -419,7 +420,7 @@ private: PublisherInfo() = default; - publisher::Publisher::WeakPtr publisher; + publisher::PublisherBase::WeakPtr publisher; std::atomic sequence_number; mapped_ring_buffer::MappedRingBufferBase::SharedPtr buffer; std::unordered_map> target_subscriptions_by_message_sequence; diff --git a/rclcpp/include/rclcpp/node.hpp b/rclcpp/include/rclcpp/node.hpp index b90631f..e995060 100644 --- a/rclcpp/include/rclcpp/node.hpp +++ b/rclcpp/include/rclcpp/node.hpp @@ -99,7 +99,7 @@ public: * \return Shared pointer to the created publisher. */ template - rclcpp::publisher::Publisher::SharedPtr + typename rclcpp::publisher::Publisher::SharedPtr create_publisher( const std::string & topic_name, const rmw_qos_profile_t & qos_profile); @@ -235,7 +235,7 @@ private: std::map parameters_; - publisher::Publisher::SharedPtr events_publisher_; + publisher::Publisher::SharedPtr events_publisher_; template typename subscription::Subscription::SharedPtr diff --git a/rclcpp/include/rclcpp/node_impl.hpp b/rclcpp/include/rclcpp/node_impl.hpp index 4412caf..6e6ae76 100644 --- a/rclcpp/include/rclcpp/node_impl.hpp +++ b/rclcpp/include/rclcpp/node_impl.hpp @@ -114,7 +114,7 @@ Node::create_callback_group( } template -publisher::Publisher::SharedPtr +typename publisher::Publisher::SharedPtr Node::create_publisher( const std::string & topic_name, const rmw_qos_profile_t & qos_profile) { @@ -130,7 +130,7 @@ Node::create_publisher( // *INDENT-ON* } - auto publisher = publisher::Publisher::make_shared( + auto publisher = publisher::Publisher::make_shared( node_handle_, publisher_handle, topic_name, qos_profile.depth); if (use_intra_process_comms_) { diff --git a/rclcpp/include/rclcpp/publisher.hpp b/rclcpp/include/rclcpp/publisher.hpp index 4dbe03f..89d688a 100644 --- a/rclcpp/include/rclcpp/publisher.hpp +++ b/rclcpp/include/rclcpp/publisher.hpp @@ -40,14 +40,10 @@ class Node; namespace publisher { -/// A publisher publishes messages of any type to a topic. -class Publisher +class PublisherBase { - friend rclcpp::node::Node; - public: - RCLCPP_SMART_PTR_DEFINITIONS(Publisher); - + RCLCPP_SMART_PTR_DEFINITIONS(PublisherBase); /// Default constructor. /** * Typically, a publisher is not created through this method, but instead is created through a @@ -57,7 +53,7 @@ public: * \param[in] topic The topic that this publisher publishes on. * \param[in] queue_size The maximum number of unpublished messages to queue. */ - Publisher( + PublisherBase( std::shared_ptr node_handle, rmw_publisher_t * publisher_handle, std::string topic, @@ -77,7 +73,7 @@ public: } /// Default destructor. - virtual ~Publisher() + virtual ~PublisherBase() { if (intra_process_publisher_handle_) { if (rmw_destroy_publisher(node_handle_.get(), intra_process_publisher_handle_)) { @@ -97,98 +93,6 @@ public: } } - /// Send a message to the topic for this publisher. - /** - * This function is templated on the input message type, MessageT. - * \param[in] msg A shared pointer to the message to send. - */ - template - void - publish(std::unique_ptr & msg) - { - this->do_inter_process_publish(msg.get()); - if (store_intra_process_message_) { - // Take the pointer from the unique_msg, release it and pass as a void * - // to the ipm. The ipm should then capture it again as a unique_ptr of - // the correct type. - // TODO(wjwwood): - // investigate how to transfer the custom deleter (if there is one) - // from the incoming unique_ptr through to the ipm's unique_ptr. - // See: http://stackoverflow.com/questions/11002641/dynamic-casting-for-unique-ptr - MessageT * msg_ptr = msg.get(); - msg.release(); - uint64_t message_seq; - { - std::lock_guard lock(intra_process_publish_mutex_); - message_seq = - store_intra_process_message_(intra_process_publisher_id_, msg_ptr, typeid(MessageT)); - } - rcl_interfaces::msg::IntraProcessMessage ipm; - ipm.publisher_id = intra_process_publisher_id_; - ipm.message_sequence = message_seq; - auto status = rmw_publish(intra_process_publisher_handle_, &ipm); - if (status != RMW_RET_OK) { - // *INDENT-OFF* (prevent uncrustify from making unecessary indents here) - throw std::runtime_error( - std::string("failed to publish intra process message: ") + rmw_get_error_string_safe()); - // *INDENT-ON* - } - } else { - // Always destroy the message, even if we don't consume it, for consistency. - msg.reset(); - } - } - - template - void - publish(const std::shared_ptr & msg) - { - // Avoid allocating when not using intra process. - if (!store_intra_process_message_) { - // In this case we're not using intra process. - return this->do_inter_process_publish(msg.get()); - } - // Otherwise we have to allocate memory in a unique_ptr and pass it along. - // TODO(wjwwood): - // The intra process manager should probably also be able to store - // shared_ptr's and do the "smart" thing based on other intra process - // subscriptions. For now call the other publish(). - std::unique_ptr unique_msg(new MessageT(*msg.get())); - return this->publish(unique_msg); - } - - template - void - publish(std::shared_ptr msg) - { - // Avoid allocating when not using intra process. - if (!store_intra_process_message_) { - // In this case we're not using intra process. - return this->do_inter_process_publish(msg.get()); - } - // Otherwise we have to allocate memory in a unique_ptr and pass it along. - // TODO(wjwwood): - // The intra process manager should probably also be able to store - // shared_ptr's and do the "smart" thing based on other intra process - // subscriptions. For now call the other publish(). - std::unique_ptr unique_msg(new MessageT(*msg.get())); - return this->publish(unique_msg); - } - - template - void - publish(const MessageT & msg) - { - // Avoid allocating when not using intra process. - if (!store_intra_process_message_) { - // In this case we're not using intra process. - return this->do_inter_process_publish(msg.get()); - } - // Otherwise we have to allocate memory in a unique_ptr and pass it along. - std::unique_ptr unique_msg(new MessageT(msg)); - return this->publish(unique_msg); - } - /// Get the topic that this publisher publishes on. // \return The topic name. const std::string & @@ -261,20 +165,6 @@ public: typedef std::function StoreMessageCallbackT; protected: - template - void - do_inter_process_publish(MessageT * msg) - { - auto status = rmw_publish(publisher_handle_, msg); - if (status != RMW_RET_OK) { - // *INDENT-OFF* (prevent uncrustify from making unecessary indents here) - throw std::runtime_error( - std::string("failed to publish message: ") + rmw_get_error_string_safe()); - // *INDENT-ON* - } - } - - void setup_intra_process( uint64_t intra_process_publisher_id, @@ -295,7 +185,6 @@ protected: } } -private: std::shared_ptr node_handle_; rmw_publisher_t * publisher_handle_; @@ -311,6 +200,127 @@ private: rmw_gid_t intra_process_rmw_gid_; std::mutex intra_process_publish_mutex_; +}; + +/// A publisher publishes messages of any type to a topic. +template +class Publisher : public PublisherBase +{ + friend rclcpp::node::Node; + +public: + RCLCPP_SMART_PTR_DEFINITIONS(Publisher); + + Publisher( + std::shared_ptr node_handle, + rmw_publisher_t * publisher_handle, + std::string topic, + size_t queue_size) + : PublisherBase(node_handle, publisher_handle, topic, queue_size) + {} + + + /// Send a message to the topic for this publisher. + /** + * This function is templated on the input message type, MessageT. + * \param[in] msg A shared pointer to the message to send. + */ + void + publish(std::unique_ptr & msg) + { + this->do_inter_process_publish(msg.get()); + if (store_intra_process_message_) { + // Take the pointer from the unique_msg, release it and pass as a void * + // to the ipm. The ipm should then capture it again as a unique_ptr of + // the correct type. + // TODO(wjwwood): + // investigate how to transfer the custom deleter (if there is one) + // from the incoming unique_ptr through to the ipm's unique_ptr. + // See: http://stackoverflow.com/questions/11002641/dynamic-casting-for-unique-ptr + MessageT * msg_ptr = msg.get(); + msg.release(); + uint64_t message_seq; + { + std::lock_guard lock(intra_process_publish_mutex_); + message_seq = + store_intra_process_message_(intra_process_publisher_id_, msg_ptr, typeid(MessageT)); + } + rcl_interfaces::msg::IntraProcessMessage ipm; + ipm.publisher_id = intra_process_publisher_id_; + ipm.message_sequence = message_seq; + auto status = rmw_publish(intra_process_publisher_handle_, &ipm); + if (status != RMW_RET_OK) { + // *INDENT-OFF* (prevent uncrustify from making unecessary indents here) + throw std::runtime_error( + std::string("failed to publish intra process message: ") + rmw_get_error_string_safe()); + // *INDENT-ON* + } + } else { + // Always destroy the message, even if we don't consume it, for consistency. + msg.reset(); + } + } + + void + publish(const std::shared_ptr & msg) + { + // Avoid allocating when not using intra process. + if (!store_intra_process_message_) { + // In this case we're not using intra process. + return this->do_inter_process_publish(msg.get()); + } + // Otherwise we have to allocate memory in a unique_ptr and pass it along. + // TODO(wjwwood): + // The intra process manager should probably also be able to store + // shared_ptr's and do the "smart" thing based on other intra process + // subscriptions. For now call the other publish(). + std::unique_ptr unique_msg(new MessageT(*msg.get())); + return this->publish(unique_msg); + } + + void + publish(std::shared_ptr msg) + { + // Avoid allocating when not using intra process. + if (!store_intra_process_message_) { + // In this case we're not using intra process. + return this->do_inter_process_publish(msg.get()); + } + // Otherwise we have to allocate memory in a unique_ptr and pass it along. + // TODO(wjwwood): + // The intra process manager should probably also be able to store + // shared_ptr's and do the "smart" thing based on other intra process + // subscriptions. For now call the other publish(). + std::unique_ptr unique_msg(new MessageT(*msg.get())); + return this->publish(unique_msg); + } + + + void + publish(const MessageT & msg) + { + // Avoid allocating when not using intra process. + if (!store_intra_process_message_) { + // In this case we're not using intra process. + return this->do_inter_process_publish(msg.get()); + } + // Otherwise we have to allocate memory in a unique_ptr and pass it along. + std::unique_ptr unique_msg(new MessageT(msg)); + return this->publish(unique_msg); + } + +protected: + void + do_inter_process_publish(MessageT * msg) + { + auto status = rmw_publish(publisher_handle_, msg); + if (status != RMW_RET_OK) { + // *INDENT-OFF* (prevent uncrustify from making unecessary indents here) + throw std::runtime_error( + std::string("failed to publish message: ") + rmw_get_error_string_safe()); + // *INDENT-ON* + } + } }; diff --git a/rclcpp/test/test_intra_process_manager.cpp b/rclcpp/test/test_intra_process_manager.cpp index 71fdc8a..4ce7f90 100644 --- a/rclcpp/test/test_intra_process_manager.cpp +++ b/rclcpp/test/test_intra_process_manager.cpp @@ -27,12 +27,12 @@ namespace publisher namespace mock { -class Publisher +class PublisherBase { public: - RCLCPP_SMART_PTR_DEFINITIONS(Publisher); + RCLCPP_SMART_PTR_DEFINITIONS(PublisherBase); - Publisher() + PublisherBase() : mock_topic_name(""), mock_queue_size(0) {} const std::string & get_topic_name() const @@ -55,6 +55,13 @@ public: size_t mock_queue_size; }; +template +class Publisher : public PublisherBase +{ +public: + RCLCPP_SMART_PTR_DEFINITIONS(Publisher); +}; + } } } @@ -96,10 +103,12 @@ public: #define RCLCPP_RCLCPP_SUBSCRIPTION_HPP_ // Force ipm to use our mock publisher class. #define Publisher mock::Publisher +#define PublisherBase mock::PublisherBase #define SubscriptionBase mock::SubscriptionBase #include #undef SubscriptionBase #undef Publisher +#undef PublisherBase #include @@ -116,11 +125,13 @@ public: TEST(TestIntraProcessManager, nominal) { rclcpp::intra_process_manager::IntraProcessManager ipm; - auto p1 = std::make_shared(); + auto p1 = + std::make_shared>(); p1->mock_topic_name = "nominal1"; p1->mock_queue_size = 2; - auto p2 = std::make_shared(); + auto p2 = + std::make_shared>(); p2->mock_topic_name = "nominal2"; p2->mock_queue_size = 10; @@ -201,7 +212,8 @@ TEST(TestIntraProcessManager, nominal) { TEST(TestIntraProcessManager, remove_publisher_before_trying_to_take) { rclcpp::intra_process_manager::IntraProcessManager ipm; - auto p1 = std::make_shared(); + auto p1 = + std::make_shared>(); p1->mock_topic_name = "nominal1"; p1->mock_queue_size = 10; @@ -240,7 +252,8 @@ TEST(TestIntraProcessManager, remove_publisher_before_trying_to_take) { TEST(TestIntraProcessManager, removed_subscription_affects_take) { rclcpp::intra_process_manager::IntraProcessManager ipm; - auto p1 = std::make_shared(); + auto p1 = + std::make_shared>(); p1->mock_topic_name = "nominal1"; p1->mock_queue_size = 10; @@ -308,7 +321,8 @@ TEST(TestIntraProcessManager, removed_subscription_affects_take) { TEST(TestIntraProcessManager, multiple_subscriptions_one_publisher) { rclcpp::intra_process_manager::IntraProcessManager ipm; - auto p1 = std::make_shared(); + auto p1 = + std::make_shared>(); p1->mock_topic_name = "nominal1"; p1->mock_queue_size = 10; @@ -377,15 +391,18 @@ TEST(TestIntraProcessManager, multiple_subscriptions_one_publisher) { TEST(TestIntraProcessManager, multiple_publishers_one_subscription) { rclcpp::intra_process_manager::IntraProcessManager ipm; - auto p1 = std::make_shared(); + auto p1 = + std::make_shared>(); p1->mock_topic_name = "nominal1"; p1->mock_queue_size = 10; - auto p2 = std::make_shared(); + auto p2 = + std::make_shared>(); p2->mock_topic_name = "nominal1"; p2->mock_queue_size = 10; - auto p3 = std::make_shared(); + auto p3 = + std::make_shared>(); p3->mock_topic_name = "nominal1"; p3->mock_queue_size = 10; @@ -468,15 +485,18 @@ TEST(TestIntraProcessManager, multiple_publishers_one_subscription) { TEST(TestIntraProcessManager, multiple_publishers_multiple_subscription) { rclcpp::intra_process_manager::IntraProcessManager ipm; - auto p1 = std::make_shared(); + auto p1 = + std::make_shared>(); p1->mock_topic_name = "nominal1"; p1->mock_queue_size = 10; - auto p2 = std::make_shared(); + auto p2 = + std::make_shared>(); p2->mock_topic_name = "nominal1"; p2->mock_queue_size = 10; - auto p3 = std::make_shared(); + auto p3 = + std::make_shared>(); p3->mock_topic_name = "nominal1"; p3->mock_queue_size = 10; @@ -626,7 +646,8 @@ TEST(TestIntraProcessManager, multiple_publishers_multiple_subscription) { TEST(TestIntraProcessManager, ring_buffer_displacement) { rclcpp::intra_process_manager::IntraProcessManager ipm; - auto p1 = std::make_shared(); + auto p1 = + std::make_shared>(); p1->mock_topic_name = "nominal1"; p1->mock_queue_size = 2; @@ -694,7 +715,8 @@ TEST(TestIntraProcessManager, ring_buffer_displacement) { TEST(TestIntraProcessManager, subscription_creation_race_condition) { rclcpp::intra_process_manager::IntraProcessManager ipm; - auto p1 = std::make_shared(); + auto p1 = + std::make_shared>(); p1->mock_topic_name = "nominal1"; p1->mock_queue_size = 2; @@ -740,7 +762,8 @@ TEST(TestIntraProcessManager, publisher_out_of_scope_take) { uint64_t p1_id; uint64_t p1_m1_id; { - auto p1 = std::make_shared(); + auto p1 = + std::make_shared>(); p1->mock_topic_name = "nominal1"; p1->mock_queue_size = 2; @@ -777,7 +800,8 @@ TEST(TestIntraProcessManager, publisher_out_of_scope_store) { uint64_t p1_id; { - auto p1 = std::make_shared(); + auto p1 = + std::make_shared>(); p1->mock_topic_name = "nominal1"; p1->mock_queue_size = 2;