diff --git a/rclcpp/CMakeLists.txt b/rclcpp/CMakeLists.txt index d4e3ad4..1501ef8 100644 --- a/rclcpp/CMakeLists.txt +++ b/rclcpp/CMakeLists.txt @@ -21,6 +21,8 @@ if(AMENT_ENABLE_TESTING) include_directories(include) ament_add_gtest(test_mapped_ring_buffer test/test_mapped_ring_buffer.cpp) + ament_add_gtest(test_intra_process_manager test/test_intra_process_manager.cpp) + target_include_directories(test_intra_process_manager PUBLIC "${rcl_interfaces_INCLUDE_DIRS}") endif() ament_package( diff --git a/rclcpp/include/rclcpp/intra_process_manager.hpp b/rclcpp/include/rclcpp/intra_process_manager.hpp new file mode 100644 index 0000000..38eee41 --- /dev/null +++ b/rclcpp/include/rclcpp/intra_process_manager.hpp @@ -0,0 +1,413 @@ +// Copyright 2015 Open Source Robotics Foundation, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef RCLCPP_RCLCPP_INTRA_PROCESS_MANAGER_HPP_ +#define RCLCPP_RCLCPP_INTRA_PROCESS_MANAGER_HPP_ + +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include + +namespace rclcpp +{ +namespace intra_process_manager +{ + +/// This class facilitates intra process communication between nodes. +/* This class is used in the creation of publishers and subscriptions. + * A singleton instance of this class is owned by a rclcpp::Context and a + * rclcpp::Node can use an associated Context to get an instance of this class. + * Nodes which do not have a common Context will not exchange intra process + * messages because they will not share access to an instance of this class. + * + * When a Node creates a publisher or subscription, it will register them + * with this class. + * The node will also hook into the publisher's publish call + * in order to do intra process related work. + * + * When a publisher is created, it advertises on the topic the user provided, + * as well as a "shadowing" topic of type rcl_interfaces/IntraProcessMessage. + * For instance, if the user specified the topic '/namespace/chatter', then the + * corresponding intra process topic might be '/namespace/chatter__intra'. + * The publisher is also allocated an id which is unique among all publishers + * and subscriptions in this process. + * Additionally, when registered with this class a ring buffer is created and + * owned by this class as a temporary place to hold messages destined for intra + * process subscriptions. + * + * When a subscription is created, it subscribes to the topic provided by the + * user as well as to the corresponding intra process topic. + * It is also gets a unique id from the singleton instance of this class which + * is unique among publishers and subscriptions. + * + * When the user publishes a message, the message is stored by calling + * store_intra_process_message on this class. + * The instance of that message is uniquely identified by a publisher id and a + * message sequence number. + * The publisher id, message sequence pair is unique with in the process. + * At that point a list of the id's of intra process subscriptions which have + * been registered with the singleton instance of this class are stored with + * the message instance so that delivery is only made to those subscriptions. + * Then an instance of rcl_interfaces/IntraProcessMessage is published to the + * intra process topic which is specific to the topic specified by the user. + * + * When an instance of rcl_interfaces/IntraProcessMessage is received by a + * subscription, then it is handled by calling take_intra_process_message + * on a singleton of this class. + * The subscription passes a publisher id, message sequence pair which + * uniquely identifies the message instance it was suppose to receive as well + * as the subscriptions unique id. + * If the message is still being held by this class and the subscription's id + * is in the list of intended subscriptions then the message is returned. + * If either of those predicates are not satisfied then the message is not + * returned and the subscription does not call the users callback. + * + * Since the publisher builds a list of destined subscriptions on publish, and + * other requests are ignored, this class knows how many times a message + * instance should be requested. + * The final time a message is requested, the ownership is passed out of this + * class and passed to the final subscription, effectively freeing space in + * this class's internal storage. + * + * Since a topic is being used to ferry notifications about new intra process + * messages between publishers and subscriptions, it is possible for that + * notification to be lost. + * It is also possible that a subscription which was available when publish was + * called will no longer exist once the notification gets posted. + * In both cases this might result in a message instance getting requested + * fewer times than expected. + * This is why the internal storage of this class is a ring buffer. + * That way if a message is orphaned it will eventually be dropped from storage + * when a new message instance is stored and will not result in a memory leak. + * + * However, since the storage system is finite, this also means that a message + * instance might get displaced by an incoming message instance before all + * interested parties have called take_intra_process_message. + * Because of this the size of the internal storage should be carefully + * considered. + * + * /TODO(wjwwood): update to include information about handling latching. + * /TODO(wjwwood): consider thread safety of the class. + * + * This class is neither CopyConstructable nor CopyAssignable. + */ +class IntraProcessManager +{ +private: + RCLCPP_DISABLE_COPY(IntraProcessManager); +public: + RCLCPP_SMART_PTR_DEFINITIONS(IntraProcessManager); + + IntraProcessManager() = default; + + /// Register a subscription with the manager, returns subscriptions unique id. + /* In addition to generating a unique intra process id for the subscription, + * this method also stores the topic name of the subscription. + * + * This method is normally called during the creation of a subscription, + * but after it creates the internal intra process rmw_subscription_t. + * + * This method will allocate memory. + * + * /param subscription the Subscription to register. + * /return an unsigned 64-bit integer which is the subscription's unique id. + */ + uint64_t + add_subscription(subscription::SubscriptionBase::SharedPtr subscription) + { + auto id = IntraProcessManager::get_next_unique_id(); + subscriptions_[id] = subscription; + subscription_ids_by_topic_[subscription->get_topic_name()].insert(id); + return id; + } + + /// Unregister a subscription using the subscription's unique id. + /* This method does not allocate memory. + * + * /param intra_process_subscription_id id of the subscription to remove. + */ + void + remove_subscription(uint64_t intra_process_subscription_id) + { + subscriptions_.erase(intra_process_subscription_id); + for (auto & pair : subscription_ids_by_topic_) { + pair.second.erase(intra_process_subscription_id); + } + // Iterate over all publisher infos and all stored subscription id's and + // remove references to this subscription's id. + for (auto & publisher_pair : publishers_) { + for (auto & sub_pair : publisher_pair.second.target_subscriptions_by_message_sequence) { + sub_pair.second.erase(intra_process_subscription_id); + } + } + } + + /// Register a publisher with the manager, returns the publisher unique id. + /* In addition to generating and returning a unique id for the publisher, + * this method creates internal ring buffer storage for "in-flight" intra + * process messages which are stored when store_intra_process_message is + * called with this publisher's unique id. + * + * The buffer_size must be less than or equal to the max uint64_t value. + * If the buffer_size is 0 then a buffer size is calculated using the + * publisher's QoS settings. + * The default is to use the depth field of the publisher's QoS. + * TODO(wjwwood): Consider doing depth *= 1.2, round up, or similar. + * TODO(wjwwood): Consider what to do for keep all. + * + * This method is templated on the publisher's message type so that internal + * storage of the same type can be allocated. + * + * This method will allocate memory. + * + * /param publisher publisher to be registered with the manager. + * /param buffer_size if 0 (default) a size is calculated based on the QoS. + * /return an unsigned 64-bit integer which is the publisher's unique id. + */ + template + uint64_t + add_publisher(publisher::Publisher::SharedPtr publisher, size_t buffer_size=0) + { + auto id = IntraProcessManager::get_next_unique_id(); + publishers_[id].publisher = publisher; + size_t size = buffer_size > 0 ? buffer_size : publisher->get_queue_size(); + // As long as the size of the ring buffer is less than the max sequence number, we're safe. + assert(size <= std::numeric_limits::max()); + publishers_[id].sequence_number.store(0); + publishers_[id].buffer = mapped_ring_buffer::MappedRingBuffer::make_shared(size); + publishers_[id].target_subscriptions_by_message_sequence.reserve(size); + return id; + } + + /// Unregister a publisher using the publisher's unique id. + /* This method does not allocate memory. + * + * /param intra_process_publisher_id id of the publisher to remove. + */ + void + remove_publisher(uint64_t intra_process_publisher_id) + { + publishers_.erase(intra_process_publisher_id); + } + + /// Store a message in the manager, and return the message sequence number. + /* The given message is stored in internal storage using the given publisher + * id and the newly generated message sequence, which is also returned. + * The combination of publisher id and message sequence number can later + * be used with a subscription id to retrieve the message by calling + * take_intra_process_message. + * The number of times take_intra_process_message can be called with this + * unique pair of id's is determined by the number of subscriptions currently + * subscribed to the same topic and which share the same Context, i.e. once + * for each subscription which should receive the intra process message. + * + * The ownership of the incoming message is transfered to the internal + * storage in order to avoid copying the message data. + * Therefore, the message parameter will no longer contain the original + * message after calling this method. + * Instead it will either be a nullptr or it will contain the ownership of + * the message instance which was displaced. + * If the message parameter is not equal to nullptr after calling this method + * then a message was prematurely displaced, i.e. take_intra_process_message + * had not been called on it as many times as was expected. + * + * This method can throw an exception if the publisher id is not found or + * if the publisher shared_ptr given to add_publisher has gone out of scope. + * + * This method does allocate memory. + * + * /param intra_process_publisher_id the id of the publisher of this message. + * /param message the message that is being stored. + * /return the message sequence number. + */ + template + uint64_t + store_intra_process_message( + uint64_t intra_process_publisher_id, + std::unique_ptr & message) + { + auto it = publishers_.find(intra_process_publisher_id); + if (it == publishers_.end()) { + throw std::runtime_error("store_intra_process_message called with invalid publisher id"); + } + publisher_info & info = it->second; + // Calculate the next message sequence number. + uint64_t message_seq = info.sequence_number.fetch_add(1, std::memory_order_relaxed); + // Insert the message into the ring buffer using the message_seq to identify it. + typedef typename mapped_ring_buffer::MappedRingBuffer TypedMRB; + typename TypedMRB::SharedPtr typed_buffer = std::static_pointer_cast(info.buffer); + bool did_replace = typed_buffer->push_and_replace(message_seq, message); + // TODO(wjwwood): do something when a message was displaced. log debug? + (void)did_replace; // Avoid unused variable warning. + // Figure out what subscriptions should receive the message. + auto publisher = info.publisher.lock(); + if (!publisher) { + throw std::runtime_error("publisher has unexpectedly gone out of scope"); + } + auto & destined_subscriptions = subscription_ids_by_topic_[publisher->get_topic_name()]; + // Store the list for later comparison. + info.target_subscriptions_by_message_sequence[message_seq].clear(); + std::copy( + destined_subscriptions.begin(), destined_subscriptions.end(), + // Memory allocation occurs in info.target_subscriptions_by_message_sequence[message_seq] + std::inserter( + info.target_subscriptions_by_message_sequence[message_seq], + // This ends up only being a hint to std::set, could also be .begin(). + info.target_subscriptions_by_message_sequence[message_seq].end() + ) + ); + // Return the message sequence which is sent to the subscription. + return message_seq; + } + + /// Take an intra process message. + /* The intra_process_publisher_id and message_sequence_number parameters + * uniquely identify a message instance, which should be taken. + * + * The requesting_subscriptions_intra_process_id parameter is used to make + * sure the requesting subscription was intended to receive this message + * instance. + * This check is made because it could happen that the requester + * comes up after the publish event, so it still receives the notification of + * a new intra process message, but it wasn't registered with the manager at + * the time of publishing, causing it to take when it wasn't intended. + * This should be avioded unless latching-like behavior is involved. + * + * The message parameter is used to store the taken message. + * On the last expected call to this method, the ownership is transfered out + * of internal storage and into the message parameter. + * On all previous calls a copy of the internally stored message is made and + * the ownership of the copy is transfered to the message parameter. + * TODO(wjwwood): update this documentation when latching is supported. + * + * The message parameter can be set to nullptr if: + * + * - The publisher id is not found. + * - The message sequence is not found for the given publisher id. + * - The requesting subscription's id is not in the list of intended takers. + * - The requesting subscription's id has been used before with this message. + * + * This method may allocate memory to copy the stored message. + * + * /param intra_process_publisher_id the id of the message's publisher. + * /param message_sequence_number the sequence number of the message. + * /param requesting_subscriptions_intra_process_id the subscription's id. + * /param message the message typed unique_ptr used to return the message. + */ + template + void + take_intra_process_message( + uint64_t intra_process_publisher_id, + uint64_t message_sequence_number, + uint64_t requesting_subscriptions_intra_process_id, + std::unique_ptr & message) + { + message = nullptr; + publisher_info * info; + { + auto it = publishers_.find(intra_process_publisher_id); + if (it == publishers_.end()) { + // Publisher is either invalid or no longer exists. + return; + } + info = &it->second; + } + // Figure out how many subscriptions are left. + std::set * target_subs; + { + auto it = info->target_subscriptions_by_message_sequence.find(message_sequence_number); + if (it == info->target_subscriptions_by_message_sequence.end()) { + // Message is no longer being stored by this publisher. + return; + } + target_subs = &it->second; + } + { + auto it = std::find( + target_subs->begin(), target_subs->end(), + requesting_subscriptions_intra_process_id); + if (it == target_subs->end()) { + // This publisher id/message seq pair was not intended for this subscription. + return; + } + target_subs->erase(it); + } + typedef typename mapped_ring_buffer::MappedRingBuffer TypedMRB; + typename TypedMRB::SharedPtr typed_buffer = std::static_pointer_cast(info->buffer); + // Return a copy or the unique_ptr (ownership) depending on how many subscriptions are left. + if (target_subs->size()) { + // There are more subscriptions to serve, return a copy. + typed_buffer->get_copy_at_key(message_sequence_number, message); + } else { + // This is the last one to be returned, transfer ownership. + typed_buffer->pop_at_key(message_sequence_number, message); + } + } + +private: + static uint64_t get_next_unique_id() + { + auto next_id = next_unique_id_.fetch_add(1, std::memory_order_relaxed); + // Check for rollover (we started at 1). + if (0 == next_id) { + // This puts a technical limit on the number of times you can add a publisher or subscriber. + // But even if you could add (and remove) them at 1 kHz (very optimistic rate) + // it would still be a very long time before you could exhaust the pool of id's: + // 2^64 / 1000 times per sec / 60 sec / 60 min / 24 hours / 365 days = 584,942,417 years + // So around 585 million years. Even at 1 GHz, it would take 585 years. + // I think it's safe to avoid trying to handle overflow. + // If we roll over then it's most likely a bug. + throw std::overflow_error( + "exhausted the unique id's for publishers and subscribers in this process " + "(congratulations your computer is either extremely fast or extremely old)"); + } + return next_id; + } + + static std::atomic next_unique_id_; + + std::unordered_map subscriptions_; + std::map> subscription_ids_by_topic_; + + struct publisher_info + { + RCLCPP_DISABLE_COPY(publisher_info); + + publisher_info() = default; + + publisher::Publisher::WeakPtr publisher; + std::atomic sequence_number; + mapped_ring_buffer::MappedRingBufferBase::SharedPtr buffer; + std::unordered_map> target_subscriptions_by_message_sequence; + }; + + std::unordered_map publishers_; + +}; + +std::atomic IntraProcessManager::next_unique_id_{1}; + +} /* namespace intra_process_manager */ +} /* namespace rclcpp */ + +#endif /* RCLCPP_RCLCPP_INTRA_PROCESS_MANAGER_HPP_ */ diff --git a/rclcpp/test/test_intra_process_manager.cpp b/rclcpp/test/test_intra_process_manager.cpp new file mode 100644 index 0000000..29bc717 --- /dev/null +++ b/rclcpp/test/test_intra_process_manager.cpp @@ -0,0 +1,748 @@ +// Copyright 2015 Open Source Robotics Foundation, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include + +#include + +// Mock up publisher and subscription base to avoid needing an rmw impl. +namespace rclcpp { namespace publisher { namespace mock { + +class Publisher +{ +public: + RCLCPP_SMART_PTR_DEFINITIONS(Publisher); + + Publisher() : mock_topic_name(""), mock_queue_size(0) {} + + std::string get_topic_name() const {return mock_topic_name;} + size_t get_queue_size() const {return mock_queue_size;} + + std::string mock_topic_name; + size_t mock_queue_size; +}; + +}}} + +namespace rclcpp { namespace subscription { namespace mock { + +class SubscriptionBase +{ +public: + RCLCPP_SMART_PTR_DEFINITIONS(SubscriptionBase); + + SubscriptionBase() : mock_topic_name(""), mock_queue_size(0) {} + + std::string get_topic_name() const {return mock_topic_name;} + size_t get_queue_size() const {return mock_queue_size;} + + std::string mock_topic_name; + size_t mock_queue_size; +}; + +}}} + +// Prevent rclcpp/publisher.hpp and rclcpp/subscription.hpp from being imported. +#define RCLCPP_RCLCPP_PUBLISHER_HPP_ +#define RCLCPP_RCLCPP_SUBSCRIPTION_HPP_ +// Force ipm to use our mock publisher class. +#define Publisher mock::Publisher +#define SubscriptionBase mock::SubscriptionBase +#include +#undef SubscriptionBase +#undef Publisher + +#include + +/* +This tests the "normal" usage of the class: + - Creates two publishers on different topics. + - Creates a subscription which matches one of them. + - Publishes on each publisher with different message content. + - Try's to take the message from the non-matching publish, should fail. + - Try's to take the message from the matching publish, should work. + - Asserts the message it got back was the one that went in (since there's only one subscription). + - Try's to take the message again, should fail. +*/ +TEST(test_intra_process_manager, nominal) { + rclcpp::intra_process_manager::IntraProcessManager ipm; + + auto p1 = std::make_shared(); + p1->mock_topic_name = "nominal1"; + p1->mock_queue_size = 2; + + auto p2 = std::make_shared(); + p2->mock_topic_name = "nominal2"; + p2->mock_queue_size = 10; + + auto s1 = std::make_shared(); + s1->mock_topic_name = "nominal1"; + s1->mock_queue_size = 10; + + auto p1_id = ipm.add_publisher(p1); + auto p2_id = ipm.add_publisher(p2); + auto s1_id = ipm.add_subscription(s1); + + auto ipm_msg = std::make_shared(); + ipm_msg->message_sequence = 42; + ipm_msg->publisher_id = 42; + rcl_interfaces::msg::IntraProcessMessage::UniquePtr unique_msg; + unique_msg.reset(new rcl_interfaces::msg::IntraProcessMessage(*ipm_msg)); + + auto p1_m1_original_address = unique_msg.get(); + auto p1_m1_id = ipm.store_intra_process_message(p1_id, unique_msg); + ASSERT_EQ(nullptr, unique_msg); + + ipm_msg->message_sequence = 43; + ipm_msg->publisher_id = 43; + unique_msg.reset(new rcl_interfaces::msg::IntraProcessMessage(*ipm_msg)); + + auto p2_m1_id = ipm.store_intra_process_message(p2_id, unique_msg); + ASSERT_EQ(nullptr, unique_msg); + + ipm.take_intra_process_message(p2_id, p2_m1_id, s1_id, unique_msg); + EXPECT_EQ(nullptr, unique_msg); // Should fail since p2 and s1 don't have the same topic. + unique_msg.reset(); + + ipm.take_intra_process_message(p1_id, p1_m1_id, s1_id, unique_msg); + EXPECT_NE(nullptr, unique_msg); + if (unique_msg) { + EXPECT_EQ(42ul, unique_msg->message_sequence); + EXPECT_EQ(42ul, unique_msg->publisher_id); + EXPECT_EQ(p1_m1_original_address, unique_msg.get()); + } + + ipm.take_intra_process_message(p1_id, p1_m1_id, s1_id, unique_msg); + EXPECT_EQ(nullptr, unique_msg); // Should fail, since the message was already taken. + + ipm_msg->message_sequence = 44; + ipm_msg->publisher_id = 44; + unique_msg.reset(new rcl_interfaces::msg::IntraProcessMessage(*ipm_msg)); + + ipm.store_intra_process_message(p1_id, unique_msg); + ASSERT_EQ(nullptr, unique_msg); + + ipm_msg->message_sequence = 45; + ipm_msg->publisher_id = 45; + unique_msg.reset(new rcl_interfaces::msg::IntraProcessMessage(*ipm_msg)); + + ipm.store_intra_process_message(p1_id, unique_msg); + ASSERT_EQ(nullptr, unique_msg); + + ipm_msg->message_sequence = 46; + ipm_msg->publisher_id = 46; + unique_msg.reset(new rcl_interfaces::msg::IntraProcessMessage(*ipm_msg)); + + ipm.store_intra_process_message(p1_id, unique_msg); + ASSERT_NE(nullptr, unique_msg); + if (unique_msg) { + EXPECT_EQ(44ul, unique_msg->message_sequence); + EXPECT_EQ(44ul, unique_msg->publisher_id); + } +} + +/* +Simulates the case where a publisher is removed between publishing and the matching take. + - Creates a publisher and subscription on the same topic. + - Publishes a message. + - Remove the publisher. + - Try's to take the message, should fail since the publisher (and its storage) is gone. +*/ +TEST(test_intra_process_manager, remove_publisher_before_trying_to_take) { + rclcpp::intra_process_manager::IntraProcessManager ipm; + + auto p1 = std::make_shared(); + p1->mock_topic_name = "nominal1"; + p1->mock_queue_size = 10; + + auto s1 = std::make_shared(); + s1->mock_topic_name = "nominal1"; + s1->mock_queue_size = 10; + + auto p1_id = ipm.add_publisher(p1); + auto s1_id = ipm.add_subscription(s1); + + auto ipm_msg = std::make_shared(); + ipm_msg->message_sequence = 42; + ipm_msg->publisher_id = 42; + rcl_interfaces::msg::IntraProcessMessage::UniquePtr unique_msg; + unique_msg.reset(new rcl_interfaces::msg::IntraProcessMessage(*ipm_msg)); + + auto p1_m1_id = ipm.store_intra_process_message(p1_id, unique_msg); + ASSERT_EQ(nullptr, unique_msg); + + ipm.remove_publisher(p1_id); + + ipm.take_intra_process_message(p1_id, p1_m1_id, s1_id, unique_msg); + EXPECT_EQ(nullptr, unique_msg); // Should fail, since the publisher is gone. +} + +/* +Tests whether or not removed subscriptions affect take behavior. + - Creates a publisher and three subscriptions on the same topic. + - Publish a message, keep the original point for later comparison. + - Take with one subscription, should work. + - Remove a different subscription. + - Take with the final subscription, should work. + - Assert the previous take returned ownership of the original object published. +*/ +TEST(test_intra_process_manager, removed_subscription_affects_take) { + rclcpp::intra_process_manager::IntraProcessManager ipm; + + auto p1 = std::make_shared(); + p1->mock_topic_name = "nominal1"; + p1->mock_queue_size = 10; + + auto s1 = std::make_shared(); + s1->mock_topic_name = "nominal1"; + s1->mock_queue_size = 10; + + auto s2 = std::make_shared(); + s2->mock_topic_name = "nominal1"; + s2->mock_queue_size = 10; + + auto s3 = std::make_shared(); + s3->mock_topic_name = "nominal1"; + s3->mock_queue_size = 10; + + auto p1_id = ipm.add_publisher(p1); + auto s1_id = ipm.add_subscription(s1); + auto s2_id = ipm.add_subscription(s2); + auto s3_id = ipm.add_subscription(s3); + + auto ipm_msg = std::make_shared(); + ipm_msg->message_sequence = 42; + ipm_msg->publisher_id = 42; + rcl_interfaces::msg::IntraProcessMessage::UniquePtr unique_msg; + unique_msg.reset(new rcl_interfaces::msg::IntraProcessMessage(*ipm_msg)); + + auto original_message_pointer = unique_msg.get(); + auto p1_m1_id = ipm.store_intra_process_message(p1_id, unique_msg); + ASSERT_EQ(nullptr, unique_msg); + + ipm.take_intra_process_message(p1_id, p1_m1_id, s1_id, unique_msg); + EXPECT_NE(nullptr, unique_msg); + if (unique_msg) { + EXPECT_EQ(42ul, unique_msg->message_sequence); + EXPECT_EQ(42ul, unique_msg->publisher_id); + EXPECT_NE(original_message_pointer, unique_msg.get()); + } + unique_msg.reset(); + + ipm.remove_subscription(s2_id); + + // Take using s3, the remaining subscription. + ipm.take_intra_process_message(p1_id, p1_m1_id, s3_id, unique_msg); + EXPECT_NE(nullptr, unique_msg); + if (unique_msg) { + EXPECT_EQ(42ul, unique_msg->message_sequence); + EXPECT_EQ(42ul, unique_msg->publisher_id); + // Should match the original pointer since s2 was removed first. + EXPECT_EQ(original_message_pointer, unique_msg.get()); + } + + // Take using s2, should fail since s2 was removed. + unique_msg.reset(); + ipm.take_intra_process_message(p1_id, p1_m1_id, s2_id, unique_msg); + EXPECT_EQ(nullptr, unique_msg); +} + +/* +This tests normal operation with multiple subscriptions and one publisher. + - Creates a publisher and three subscriptions on the same topic. + - Publish a message. + - Take with each subscription, checking that the last takes the original back. +*/ +TEST(test_intra_process_manager, multiple_subscriptions_one_publisher) { + rclcpp::intra_process_manager::IntraProcessManager ipm; + + auto p1 = std::make_shared(); + p1->mock_topic_name = "nominal1"; + p1->mock_queue_size = 10; + + auto s1 = std::make_shared(); + s1->mock_topic_name = "nominal1"; + s1->mock_queue_size = 10; + + auto s2 = std::make_shared(); + s2->mock_topic_name = "nominal1"; + s2->mock_queue_size = 10; + + auto s3 = std::make_shared(); + s3->mock_topic_name = "nominal1"; + s3->mock_queue_size = 10; + + auto p1_id = ipm.add_publisher(p1); + auto s1_id = ipm.add_subscription(s1); + auto s2_id = ipm.add_subscription(s2); + auto s3_id = ipm.add_subscription(s3); + + auto ipm_msg = std::make_shared(); + ipm_msg->message_sequence = 42; + ipm_msg->publisher_id = 42; + rcl_interfaces::msg::IntraProcessMessage::UniquePtr unique_msg; + unique_msg.reset(new rcl_interfaces::msg::IntraProcessMessage(*ipm_msg)); + + auto original_message_pointer = unique_msg.get(); + auto p1_m1_id = ipm.store_intra_process_message(p1_id, unique_msg); + ASSERT_EQ(nullptr, unique_msg); + + ipm.take_intra_process_message(p1_id, p1_m1_id, s1_id, unique_msg); + EXPECT_NE(nullptr, unique_msg); + if (unique_msg) { + EXPECT_EQ(42ul, unique_msg->message_sequence); + EXPECT_EQ(42ul, unique_msg->publisher_id); + EXPECT_NE(original_message_pointer, unique_msg.get()); + } + unique_msg.reset(); + + ipm.take_intra_process_message(p1_id, p1_m1_id, s2_id, unique_msg); + EXPECT_NE(nullptr, unique_msg); + if (unique_msg) { + EXPECT_EQ(42ul, unique_msg->message_sequence); + EXPECT_EQ(42ul, unique_msg->publisher_id); + EXPECT_NE(original_message_pointer, unique_msg.get()); + } + unique_msg.reset(); + + ipm.take_intra_process_message(p1_id, p1_m1_id, s3_id, unique_msg); + EXPECT_NE(nullptr, unique_msg); + if (unique_msg) { + EXPECT_EQ(42ul, unique_msg->message_sequence); + EXPECT_EQ(42ul, unique_msg->publisher_id); + // Should match the original pointer. + EXPECT_EQ(original_message_pointer, unique_msg.get()); + } +} + +/* +This tests normal operation with multiple publishers and one subscription. + - Creates a publisher and three subscriptions on the same topic. + - Publish a message. + - Take with each subscription, checking that the last takes the original back. +*/ +TEST(test_intra_process_manager, multiple_publishers_one_subscription) { + rclcpp::intra_process_manager::IntraProcessManager ipm; + + auto p1 = std::make_shared(); + p1->mock_topic_name = "nominal1"; + p1->mock_queue_size = 10; + + auto p2 = std::make_shared(); + p2->mock_topic_name = "nominal1"; + p2->mock_queue_size = 10; + + auto p3 = std::make_shared(); + p3->mock_topic_name = "nominal1"; + p3->mock_queue_size = 10; + + auto s1 = std::make_shared(); + s1->mock_topic_name = "nominal1"; + s1->mock_queue_size = 10; + + auto p1_id = ipm.add_publisher(p1); + auto p2_id = ipm.add_publisher(p2); + auto p3_id = ipm.add_publisher(p3); + auto s1_id = ipm.add_subscription(s1); + + auto ipm_msg = std::make_shared(); + // First publish + ipm_msg->message_sequence = 42; + ipm_msg->publisher_id = 42; + rcl_interfaces::msg::IntraProcessMessage::UniquePtr unique_msg; + unique_msg.reset(new rcl_interfaces::msg::IntraProcessMessage(*ipm_msg)); + + auto original_message_pointer1 = unique_msg.get(); + auto p1_m1_id = ipm.store_intra_process_message(p1_id, unique_msg); + ASSERT_EQ(nullptr, unique_msg); + + // Second publish + ipm_msg->message_sequence = 43; + ipm_msg->publisher_id = 43; + unique_msg.reset(new rcl_interfaces::msg::IntraProcessMessage(*ipm_msg)); + + auto original_message_pointer2 = unique_msg.get(); + auto p2_m1_id = ipm.store_intra_process_message(p2_id, unique_msg); + ASSERT_EQ(nullptr, unique_msg); + + // Third publish + ipm_msg->message_sequence = 44; + ipm_msg->publisher_id = 44; + unique_msg.reset(new rcl_interfaces::msg::IntraProcessMessage(*ipm_msg)); + + auto original_message_pointer3 = unique_msg.get(); + auto p3_m1_id = ipm.store_intra_process_message(p3_id, unique_msg); + ASSERT_EQ(nullptr, unique_msg); + + // First take + ipm.take_intra_process_message(p1_id, p1_m1_id, s1_id, unique_msg); + EXPECT_NE(nullptr, unique_msg); + if (unique_msg) { + EXPECT_EQ(42ul, unique_msg->message_sequence); + EXPECT_EQ(42ul, unique_msg->publisher_id); + EXPECT_EQ(original_message_pointer1, unique_msg.get()); + } + unique_msg.reset(); + + // Second take + ipm.take_intra_process_message(p2_id, p2_m1_id, s1_id, unique_msg); + EXPECT_NE(nullptr, unique_msg); + if (unique_msg) { + EXPECT_EQ(43ul, unique_msg->message_sequence); + EXPECT_EQ(43ul, unique_msg->publisher_id); + EXPECT_EQ(original_message_pointer2, unique_msg.get()); + } + unique_msg.reset(); + + // Third take + ipm.take_intra_process_message(p3_id, p3_m1_id, s1_id, unique_msg); + EXPECT_NE(nullptr, unique_msg); + if (unique_msg) { + EXPECT_EQ(44ul, unique_msg->message_sequence); + EXPECT_EQ(44ul, unique_msg->publisher_id); + EXPECT_EQ(original_message_pointer3, unique_msg.get()); + } + unique_msg.reset(); +} + +/* +This tests normal operation with multiple publishers and subscriptions. + - Creates three publishers and three subscriptions on the same topic. + - Publish a message on each publisher. + - Take from each publisher with each subscription, checking the pointer. +*/ +TEST(test_intra_process_manager, multiple_publishers_multiple_subscription) { + rclcpp::intra_process_manager::IntraProcessManager ipm; + + auto p1 = std::make_shared(); + p1->mock_topic_name = "nominal1"; + p1->mock_queue_size = 10; + + auto p2 = std::make_shared(); + p2->mock_topic_name = "nominal1"; + p2->mock_queue_size = 10; + + auto p3 = std::make_shared(); + p3->mock_topic_name = "nominal1"; + p3->mock_queue_size = 10; + + auto s1 = std::make_shared(); + s1->mock_topic_name = "nominal1"; + s1->mock_queue_size = 10; + + auto s2 = std::make_shared(); + s2->mock_topic_name = "nominal1"; + s2->mock_queue_size = 10; + + auto s3 = std::make_shared(); + s3->mock_topic_name = "nominal1"; + s3->mock_queue_size = 10; + + auto p1_id = ipm.add_publisher(p1); + auto p2_id = ipm.add_publisher(p2); + auto p3_id = ipm.add_publisher(p3); + auto s1_id = ipm.add_subscription(s1); + auto s2_id = ipm.add_subscription(s2); + auto s3_id = ipm.add_subscription(s3); + + auto ipm_msg = std::make_shared(); + // First publish + ipm_msg->message_sequence = 42; + ipm_msg->publisher_id = 42; + rcl_interfaces::msg::IntraProcessMessage::UniquePtr unique_msg; + unique_msg.reset(new rcl_interfaces::msg::IntraProcessMessage(*ipm_msg)); + + auto original_message_pointer1 = unique_msg.get(); + auto p1_m1_id = ipm.store_intra_process_message(p1_id, unique_msg); + ASSERT_EQ(nullptr, unique_msg); + + // Second publish + ipm_msg->message_sequence = 43; + ipm_msg->publisher_id = 43; + unique_msg.reset(new rcl_interfaces::msg::IntraProcessMessage(*ipm_msg)); + + auto original_message_pointer2 = unique_msg.get(); + auto p2_m1_id = ipm.store_intra_process_message(p2_id, unique_msg); + ASSERT_EQ(nullptr, unique_msg); + + // Third publish + ipm_msg->message_sequence = 44; + ipm_msg->publisher_id = 44; + unique_msg.reset(new rcl_interfaces::msg::IntraProcessMessage(*ipm_msg)); + + auto original_message_pointer3 = unique_msg.get(); + auto p3_m1_id = ipm.store_intra_process_message(p3_id, unique_msg); + ASSERT_EQ(nullptr, unique_msg); + + // First take + ipm.take_intra_process_message(p1_id, p1_m1_id, s1_id, unique_msg); + EXPECT_NE(nullptr, unique_msg); + if (unique_msg) { + EXPECT_EQ(42ul, unique_msg->message_sequence); + EXPECT_EQ(42ul, unique_msg->publisher_id); + EXPECT_NE(original_message_pointer1, unique_msg.get()); + } + unique_msg.reset(); + + ipm.take_intra_process_message(p1_id, p1_m1_id, s2_id, unique_msg); + EXPECT_NE(nullptr, unique_msg); + if (unique_msg) { + EXPECT_EQ(42ul, unique_msg->message_sequence); + EXPECT_EQ(42ul, unique_msg->publisher_id); + EXPECT_NE(original_message_pointer1, unique_msg.get()); + } + unique_msg.reset(); + + ipm.take_intra_process_message(p1_id, p1_m1_id, s3_id, unique_msg); + EXPECT_NE(nullptr, unique_msg); + if (unique_msg) { + EXPECT_EQ(42ul, unique_msg->message_sequence); + EXPECT_EQ(42ul, unique_msg->publisher_id); + EXPECT_EQ(original_message_pointer1, unique_msg.get()); // Final take. + } + unique_msg.reset(); + + // Second take + ipm.take_intra_process_message(p2_id, p2_m1_id, s1_id, unique_msg); + EXPECT_NE(nullptr, unique_msg); + if (unique_msg) { + EXPECT_EQ(43ul, unique_msg->message_sequence); + EXPECT_EQ(43ul, unique_msg->publisher_id); + EXPECT_NE(original_message_pointer2, unique_msg.get()); + } + unique_msg.reset(); + + ipm.take_intra_process_message(p2_id, p2_m1_id, s2_id, unique_msg); + EXPECT_NE(nullptr, unique_msg); + if (unique_msg) { + EXPECT_EQ(43ul, unique_msg->message_sequence); + EXPECT_EQ(43ul, unique_msg->publisher_id); + EXPECT_NE(original_message_pointer2, unique_msg.get()); + } + unique_msg.reset(); + + ipm.take_intra_process_message(p2_id, p2_m1_id, s3_id, unique_msg); + EXPECT_NE(nullptr, unique_msg); + if (unique_msg) { + EXPECT_EQ(43ul, unique_msg->message_sequence); + EXPECT_EQ(43ul, unique_msg->publisher_id); + EXPECT_EQ(original_message_pointer2, unique_msg.get()); + } + unique_msg.reset(); + + // Third take + ipm.take_intra_process_message(p3_id, p3_m1_id, s1_id, unique_msg); + EXPECT_NE(nullptr, unique_msg); + if (unique_msg) { + EXPECT_EQ(44ul, unique_msg->message_sequence); + EXPECT_EQ(44ul, unique_msg->publisher_id); + EXPECT_NE(original_message_pointer3, unique_msg.get()); + } + unique_msg.reset(); + + ipm.take_intra_process_message(p3_id, p3_m1_id, s2_id, unique_msg); + EXPECT_NE(nullptr, unique_msg); + if (unique_msg) { + EXPECT_EQ(44ul, unique_msg->message_sequence); + EXPECT_EQ(44ul, unique_msg->publisher_id); + EXPECT_NE(original_message_pointer3, unique_msg.get()); + } + unique_msg.reset(); + + ipm.take_intra_process_message(p3_id, p3_m1_id, s3_id, unique_msg); + EXPECT_NE(nullptr, unique_msg); + if (unique_msg) { + EXPECT_EQ(44ul, unique_msg->message_sequence); + EXPECT_EQ(44ul, unique_msg->publisher_id); + EXPECT_EQ(original_message_pointer3, unique_msg.get()); + } + unique_msg.reset(); +} + +/* +Tests displacing a message from the ring buffer before take is called. + - Creates a publisher (buffer_size = 2) and a subscription on the same topic. + - Publish a message on the publisher. + - Publish another message. + - Take the second message. + - Publish a message. + - Try to take the first message, should fail. +*/ +TEST(test_intra_process_manager, ring_buffer_displacement) { + rclcpp::intra_process_manager::IntraProcessManager ipm; + + auto p1 = std::make_shared(); + p1->mock_topic_name = "nominal1"; + p1->mock_queue_size = 2; + + auto s1 = std::make_shared(); + s1->mock_topic_name = "nominal1"; + s1->mock_queue_size = 10; + + auto p1_id = ipm.add_publisher(p1); + auto s1_id = ipm.add_subscription(s1); + + auto ipm_msg = std::make_shared(); + ipm_msg->message_sequence = 42; + ipm_msg->publisher_id = 42; + rcl_interfaces::msg::IntraProcessMessage::UniquePtr unique_msg; + unique_msg.reset(new rcl_interfaces::msg::IntraProcessMessage(*ipm_msg)); + + auto original_message_pointer1 = unique_msg.get(); + auto p1_m1_id = ipm.store_intra_process_message(p1_id, unique_msg); + ASSERT_EQ(nullptr, unique_msg); + + ipm_msg->message_sequence = 43; + ipm_msg->publisher_id = 43; + unique_msg.reset(new rcl_interfaces::msg::IntraProcessMessage(*ipm_msg)); + + auto original_message_pointer2 = unique_msg.get(); + auto p1_m2_id = ipm.store_intra_process_message(p1_id, unique_msg); + ASSERT_EQ(nullptr, unique_msg); + + ipm.take_intra_process_message(p1_id, p1_m2_id, s1_id, unique_msg); + EXPECT_NE(nullptr, unique_msg); + if (unique_msg) { + EXPECT_EQ(43ul, unique_msg->message_sequence); + EXPECT_EQ(43ul, unique_msg->publisher_id); + EXPECT_EQ(original_message_pointer2, unique_msg.get()); + } + unique_msg.reset(); + + ipm_msg->message_sequence = 44; + ipm_msg->publisher_id = 44; + unique_msg.reset(new rcl_interfaces::msg::IntraProcessMessage(*ipm_msg)); + + ipm.store_intra_process_message(p1_id, unique_msg); + EXPECT_NE(nullptr, unique_msg); // Should return the thing in the ring buffer it displaced. + if (unique_msg) { + // This should have been the first published message. + EXPECT_EQ(42ul, unique_msg->message_sequence); + EXPECT_EQ(42ul, unique_msg->publisher_id); + EXPECT_EQ(original_message_pointer1, unique_msg.get()); + } + unique_msg.reset(); + + // Since it just got displaced it should no longer be there to take. + ipm.take_intra_process_message(p1_id, p1_m1_id, s1_id, unique_msg); + EXPECT_EQ(nullptr, unique_msg); +} + +/* +Simulates race condition where a subscription is created after publish. + - Creates a publisher. + - Publish a message on the publisher. + - Create a subscription on the same topic. + - Try to take the message with the newly created subscription, should fail. +*/ +TEST(test_intra_process_manager, subscription_creation_race_condition) { + rclcpp::intra_process_manager::IntraProcessManager ipm; + + auto p1 = std::make_shared(); + p1->mock_topic_name = "nominal1"; + p1->mock_queue_size = 2; + + auto p1_id = ipm.add_publisher(p1); + + auto ipm_msg = std::make_shared(); + ipm_msg->message_sequence = 42; + ipm_msg->publisher_id = 42; + rcl_interfaces::msg::IntraProcessMessage::UniquePtr unique_msg; + unique_msg.reset(new rcl_interfaces::msg::IntraProcessMessage(*ipm_msg)); + + auto p1_m1_id = ipm.store_intra_process_message(p1_id, unique_msg); + ASSERT_EQ(nullptr, unique_msg); + + auto s1 = std::make_shared(); + s1->mock_topic_name = "nominal1"; + s1->mock_queue_size = 10; + + auto s1_id = ipm.add_subscription(s1); + + ipm.take_intra_process_message(p1_id, p1_m1_id, s1_id, unique_msg); + EXPECT_EQ(nullptr, unique_msg); +} + +/* +Simulates race condition where a publisher goes out of scope before take. + - Create a subscription. + - Creates a publisher on the same topic in a scope. + - Publish a message on the publisher in a scope. + - Let the scope expire. + - Try to take the message with the subscription, should fail. +*/ +TEST(test_intra_process_manager, publisher_out_of_scope_take) { + rclcpp::intra_process_manager::IntraProcessManager ipm; + + auto s1 = std::make_shared(); + s1->mock_topic_name = "nominal1"; + s1->mock_queue_size = 10; + + auto s1_id = ipm.add_subscription(s1); + + uint64_t p1_id; + uint64_t p1_m1_id; + { + auto p1 = std::make_shared(); + p1->mock_topic_name = "nominal1"; + p1->mock_queue_size = 2; + + p1_id = ipm.add_publisher(p1); + + auto ipm_msg = std::make_shared(); + ipm_msg->message_sequence = 42; + ipm_msg->publisher_id = 42; + rcl_interfaces::msg::IntraProcessMessage::UniquePtr unique_msg; + unique_msg.reset(new rcl_interfaces::msg::IntraProcessMessage(*ipm_msg)); + + p1_m1_id = ipm.store_intra_process_message(p1_id, unique_msg); + ASSERT_EQ(nullptr, unique_msg); + + // Explicitly remove publisher from ipm (emulate's publisher's destructor). + ipm.remove_publisher(p1_id); + } + + rcl_interfaces::msg::IntraProcessMessage::UniquePtr unique_msg(nullptr); + // Should fail because the publisher is out of scope. + ipm.take_intra_process_message(p1_id, p1_m1_id, s1_id, unique_msg); + EXPECT_EQ(nullptr, unique_msg); +} + +/* +Simulates race condition where a publisher goes out of scope before store. + - Creates a publisher in a scope. + - Let the scope expire. + - Publish a message on the publisher in a scope, should throw. +*/ +TEST(test_intra_process_manager, publisher_out_of_scope_store) { + rclcpp::intra_process_manager::IntraProcessManager ipm; + + uint64_t p1_id; + { + auto p1 = std::make_shared(); + p1->mock_topic_name = "nominal1"; + p1->mock_queue_size = 2; + + p1_id = ipm.add_publisher(p1); + } + + auto ipm_msg = std::make_shared(); + ipm_msg->message_sequence = 42; + ipm_msg->publisher_id = 42; + rcl_interfaces::msg::IntraProcessMessage::UniquePtr unique_msg; + unique_msg.reset(new rcl_interfaces::msg::IntraProcessMessage(*ipm_msg)); + + EXPECT_THROW(ipm.store_intra_process_message(p1_id, unique_msg), std::runtime_error); + ASSERT_EQ(nullptr, unique_msg); +}