From 2d8796114b8e48a02cd71a4c9897e351076a8ddb Mon Sep 17 00:00:00 2001 From: William Woodall Date: Mon, 24 Aug 2015 17:12:20 -0700 Subject: [PATCH] use gid comparison to filter duplicate messages --- rclcpp/include/rclcpp/executor.hpp | 8 +- .../include/rclcpp/intra_process_manager.hpp | 18 +++++ rclcpp/include/rclcpp/node_impl.hpp | 11 ++- rclcpp/include/rclcpp/publisher.hpp | 75 ++++++++++++++++--- rclcpp/include/rclcpp/subscription.hpp | 23 ++++-- rclcpp/test/test_intra_process_manager.cpp | 8 ++ 6 files changed, 123 insertions(+), 20 deletions(-) diff --git a/rclcpp/include/rclcpp/executor.hpp b/rclcpp/include/rclcpp/executor.hpp index de7780b..3911d59 100644 --- a/rclcpp/include/rclcpp/executor.hpp +++ b/rclcpp/include/rclcpp/executor.hpp @@ -185,10 +185,12 @@ protected: { std::shared_ptr message = subscription->create_message(); bool taken = false; - rmw_ret_t status = rmw_take(subscription->subscription_handle_, message.get(), &taken); - if (status == RMW_RET_OK) { + rmw_message_info_t message_info; + auto ret = + rmw_take_with_info(subscription->subscription_handle_, message.get(), &taken, &message_info); + if (ret == RMW_RET_OK) { if (taken) { - subscription->handle_message(message); + subscription->handle_message(message, &message_info.publisher_gid); } } else { fprintf(stderr, diff --git a/rclcpp/include/rclcpp/intra_process_manager.hpp b/rclcpp/include/rclcpp/intra_process_manager.hpp index e689731..baad129 100644 --- a/rclcpp/include/rclcpp/intra_process_manager.hpp +++ b/rclcpp/include/rclcpp/intra_process_manager.hpp @@ -29,6 +29,8 @@ #include #include +#include + namespace rclcpp { namespace intra_process_manager @@ -368,6 +370,22 @@ public: } } + /// Return true if the given rmw_gid_t matches any stored Publishers. + bool + matches_any_publishers(const rmw_gid_t * id) const + { + for (auto & publisher_pair : publishers_) { + auto publisher = publisher_pair.second.publisher.lock(); + if (!publisher) { + continue; + } + if (*publisher.get() == id) { + return true; + } + } + return false; + } + private: static uint64_t get_next_unique_id() { diff --git a/rclcpp/include/rclcpp/node_impl.hpp b/rclcpp/include/rclcpp/node_impl.hpp index 23cdd27..42cf7f2 100644 --- a/rclcpp/include/rclcpp/node_impl.hpp +++ b/rclcpp/include/rclcpp/node_impl.hpp @@ -259,7 +259,16 @@ Node::create_subscription( "intra process take called after destruction of intra process manager"); } ipm->take_intra_process_message(publisher_id, message_sequence, subscription_id, message); - }); + }, + [weak_ipm](const rmw_gid_t * sender_gid) -> bool { + auto ipm = weak_ipm.lock(); + if (!ipm) { + throw std::runtime_error( + "intra process publisher check called after destruction of intra process manager"); + } + return ipm->matches_any_publishers(sender_gid); + } + ); // *INDENT-ON* } // Assign to a group. diff --git a/rclcpp/include/rclcpp/publisher.hpp b/rclcpp/include/rclcpp/publisher.hpp index 6f0ae14..21cf457 100644 --- a/rclcpp/include/rclcpp/publisher.hpp +++ b/rclcpp/include/rclcpp/publisher.hpp @@ -55,7 +55,15 @@ public: intra_process_publisher_handle_(nullptr), topic_(topic), queue_size_(queue_size), intra_process_publisher_id_(0), store_intra_process_message_(nullptr) - {} + { + // Life time of this object is tied to the publisher handle. + if (rmw_get_gid_for_publisher(publisher_handle_, &rmw_gid_) != RMW_RET_OK) { + // *INDENT-OFF* (prevent uncrustify from making unecessary indents here) + throw std::runtime_error( + std::string("failed to get publisher gid: ") + rmw_get_error_string_safe()); + // *INDENT-ON* + } + } virtual ~Publisher() { @@ -82,16 +90,12 @@ public: publish(std::shared_ptr & msg) { rmw_ret_t status; - if (!store_intra_process_message_) { - // TODO(wjwwood): for now, make intra process and inter process mutually exclusive. - // Later we'll have them together, when we have a way to filter more efficiently. - status = rmw_publish(publisher_handle_, msg.get()); - if (status != RMW_RET_OK) { - // *INDENT-OFF* (prevent uncrustify from making unecessary indents here) - throw std::runtime_error( - std::string("failed to publish message: ") + rmw_get_error_string_safe()); - // *INDENT-ON* - } + status = rmw_publish(publisher_handle_, msg.get()); + if (status != RMW_RET_OK) { + // *INDENT-OFF* (prevent uncrustify from making unecessary indents here) + throw std::runtime_error( + std::string("failed to publish message: ") + rmw_get_error_string_safe()); + // *INDENT-ON* } if (store_intra_process_message_) { uint64_t message_seq = store_intra_process_message_(intra_process_publisher_id_, msg); @@ -120,6 +124,43 @@ public: return queue_size_; } + const rmw_gid_t & + get_gid() const + { + return rmw_gid_; + } + + const rmw_gid_t & + get_intra_process_gid() const + { + return intra_process_rmw_gid_; + } + + bool + operator==(const rmw_gid_t & gid) const + { + return *this == &gid; + } + + bool + operator==(const rmw_gid_t * gid) const + { + bool result = false; + auto ret = rmw_compare_gids_equal(gid, &this->get_gid(), &result); + if (ret != RMW_RET_OK) { + throw std::runtime_error( + std::string("failed to compare gids: ") + rmw_get_error_string_safe()); + } + if (!result) { + ret = rmw_compare_gids_equal(gid, &this->get_intra_process_gid(), &result); + if (ret != RMW_RET_OK) { + throw std::runtime_error( + std::string("failed to compare gids: ") + rmw_get_error_string_safe()); + } + } + return result; + } + typedef std::function)> StoreSharedMessageCallbackT; protected: @@ -132,6 +173,15 @@ protected: intra_process_publisher_id_ = intra_process_publisher_id; store_intra_process_message_ = callback; intra_process_publisher_handle_ = intra_process_publisher_handle; + // Life time of this object is tied to the publisher handle. + auto ret = rmw_get_gid_for_publisher(intra_process_publisher_handle_, &intra_process_rmw_gid_); + if (ret != RMW_RET_OK) { + // *INDENT-OFF* (prevent uncrustify from making unecessary indents here) + throw std::runtime_error( + std::string("failed to create intra process publisher gid: ") + + rmw_get_error_string_safe()); + // *INDENT-ON* + } } private: @@ -146,6 +196,9 @@ private: uint64_t intra_process_publisher_id_; StoreSharedMessageCallbackT store_intra_process_message_; + rmw_gid_t rmw_gid_; + rmw_gid_t intra_process_rmw_gid_; + }; } /* namespace publisher */ diff --git a/rclcpp/include/rclcpp/subscription.hpp b/rclcpp/include/rclcpp/subscription.hpp index 33dc9ff..4952087 100644 --- a/rclcpp/include/rclcpp/subscription.hpp +++ b/rclcpp/include/rclcpp/subscription.hpp @@ -94,7 +94,7 @@ public: } virtual std::shared_ptr create_message() = 0; - virtual void handle_message(std::shared_ptr & message) = 0; + virtual void handle_message(std::shared_ptr & message, const rmw_gid_t * sender_id) = 0; virtual void return_message(std::shared_ptr & message) = 0; virtual void handle_intra_process_message(rcl_interfaces::msg::IntraProcessMessage & ipm) = 0; @@ -132,7 +132,9 @@ public: message_memory_strategy::MessageMemoryStrategy::create_default()) : SubscriptionBase(node_handle, subscription_handle, topic_name, ignore_local_publications), callback_(callback), - message_memory_strategy_(memory_strategy) + message_memory_strategy_(memory_strategy), + get_intra_process_message_callback_(nullptr), + matches_any_intra_process_publishers_(nullptr) {} void set_message_memory_strategy( @@ -146,8 +148,15 @@ public: return message_memory_strategy_->borrow_message(); } - void handle_message(std::shared_ptr & message) + void handle_message(std::shared_ptr & message, const rmw_gid_t * sender_id) { + if (matches_any_intra_process_publishers_) { + if (matches_any_intra_process_publishers_(sender_id)) { + // In this case, the message will be delivered via intra process and + // we should ignore this copy of the message. + return; + } + } auto typed_message = std::static_pointer_cast(message); callback_(typed_message); } @@ -188,15 +197,18 @@ private: std::function< void (uint64_t, uint64_t, uint64_t, std::unique_ptr &) > GetMessageCallbackType; + typedef std::function MatchesAnyPublishersCallbackType; void setup_intra_process( uint64_t intra_process_subscription_id, rmw_subscription_t * intra_process_subscription, - GetMessageCallbackType callback) + GetMessageCallbackType get_message_callback, + MatchesAnyPublishersCallbackType matches_any_publisher_callback) { intra_process_subscription_id_ = intra_process_subscription_id; intra_process_subscription_handle_ = intra_process_subscription; - get_intra_process_message_callback_ = callback; + get_intra_process_message_callback_ = get_message_callback; + matches_any_intra_process_publishers_ = matches_any_publisher_callback; } RCLCPP_DISABLE_COPY(Subscription); @@ -206,6 +218,7 @@ private: message_memory_strategy_; GetMessageCallbackType get_intra_process_message_callback_; + MatchesAnyPublishersCallbackType matches_any_intra_process_publishers_; uint64_t intra_process_subscription_id_; }; diff --git a/rclcpp/test/test_intra_process_manager.cpp b/rclcpp/test/test_intra_process_manager.cpp index 8a57705..71fdc8a 100644 --- a/rclcpp/test/test_intra_process_manager.cpp +++ b/rclcpp/test/test_intra_process_manager.cpp @@ -17,6 +17,7 @@ #include #include +#include // Mock up publisher and subscription base to avoid needing an rmw impl. namespace rclcpp @@ -43,6 +44,13 @@ public: return mock_queue_size; } + bool + operator==(const rmw_gid_t * gid) const + { + (void)gid; + return false; + } + std::string mock_topic_name; size_t mock_queue_size; };