diff --git a/rclcpp/include/rclcpp/intra_process_manager.hpp b/rclcpp/include/rclcpp/intra_process_manager.hpp index 21739cf..6ef4f18 100644 --- a/rclcpp/include/rclcpp/intra_process_manager.hpp +++ b/rclcpp/include/rclcpp/intra_process_manager.hpp @@ -15,6 +15,7 @@ #ifndef RCLCPP_RCLCPP_INTRA_PROCESS_MANAGER_HPP_ #define RCLCPP_RCLCPP_INTRA_PROCESS_MANAGER_HPP_ +#include #include #include #include @@ -24,7 +25,6 @@ #include #include #include -#include #include #include #include @@ -122,7 +122,10 @@ private: public: RCLCPP_SMART_PTR_DEFINITIONS(IntraProcessManager); - IntraProcessManager() = default; + //IntraProcessManager() = default; + IntraProcessManager(IntraProcessManagerStateBase::SharedPtr state = create_default_state()) + : state_(state) { + } /// Register a subscription with the manager, returns subscriptions unique id. /* In addition to generating a unique intra process id for the subscription, @@ -140,8 +143,7 @@ public: add_subscription(subscription::SubscriptionBase::SharedPtr subscription) { auto id = IntraProcessManager::get_next_unique_id(); - subscriptions_[id] = subscription; - subscription_ids_by_topic_[subscription->get_topic_name()].insert(id); + state_->add_subscription(id, subscription); return id; } @@ -153,17 +155,7 @@ public: void remove_subscription(uint64_t intra_process_subscription_id) { - subscriptions_.erase(intra_process_subscription_id); - for (auto & pair : subscription_ids_by_topic_) { - pair.second.erase(intra_process_subscription_id); - } - // Iterate over all publisher infos and all stored subscription id's and - // remove references to this subscription's id. - for (auto & publisher_pair : publishers_) { - for (auto & sub_pair : publisher_pair.second.target_subscriptions_by_message_sequence) { - sub_pair.second.erase(intra_process_subscription_id); - } - } + state_->remove_subscription(intra_process_subscription_id); } /// Register a publisher with the manager, returns the publisher unique id. @@ -194,18 +186,11 @@ public: size_t buffer_size = 0) { auto id = IntraProcessManager::get_next_unique_id(); - publishers_[id].publisher = publisher; size_t size = buffer_size > 0 ? buffer_size : publisher->get_queue_size(); - // As long as the size of the ring buffer is less than the max sequence number, we're safe. - if (size > std::numeric_limits::max()) { - throw std::invalid_argument("the calculated buffer size is too large"); - } - publishers_[id].sequence_number.store(0); - publishers_[id].buffer = - mapped_ring_buffer::MappedRingBuffer::MessageAlloc>::make_shared( size, publisher->get_allocator()); - publishers_[id].target_subscriptions_by_message_sequence.reserve(size); + state_->add_publisher(id, publisher, mrb, size); return id; } @@ -217,7 +202,7 @@ public: void remove_publisher(uint64_t intra_process_publisher_id) { - publishers_.erase(intra_process_publisher_id); + state_->remove_publisher(intra_process_publisher_id); } /// Store a message in the manager, and return the message sequence number. @@ -258,37 +243,23 @@ public: typename allocator::Deleter::template rebind_alloc, MessageT>> & message) { - auto it = publishers_.find(intra_process_publisher_id); - if (it == publishers_.end()) { - throw std::runtime_error("store_intra_process_message called with invalid publisher id"); - } - PublisherInfo & info = it->second; - // Calculate the next message sequence number. - uint64_t message_seq = info.sequence_number.fetch_add(1, std::memory_order_relaxed); - // Insert the message into the ring buffer using the message_seq to identify it. using MRBMessageAlloc = typename std::allocator_traits::template rebind_alloc; typedef typename mapped_ring_buffer::MappedRingBuffer TypedMRB; - typename TypedMRB::SharedPtr typed_buffer = std::static_pointer_cast(info.buffer); + + mapped_ring_buffer::MappedRingBufferBase::SharedPtr buffer; + auto message_seq = state_->get_publisher_info_for_id(intra_process_publisher_id, buffer); + typename TypedMRB::SharedPtr typed_buffer = std::static_pointer_cast(buffer); + if (!typed_buffer) { + throw std::runtime_error("Typecast failed due to incorrect message type"); + } + + // Insert the message into the ring buffer using the message_seq to identify it. bool did_replace = typed_buffer->push_and_replace(message_seq, message); // TODO(wjwwood): do something when a message was displaced. log debug? (void)did_replace; // Avoid unused variable warning. - // Figure out what subscriptions should receive the message. - auto publisher = info.publisher.lock(); - if (!publisher) { - throw std::runtime_error("publisher has unexpectedly gone out of scope"); - } - auto & destined_subscriptions = subscription_ids_by_topic_[publisher->get_topic_name()]; - // Store the list for later comparison. - info.target_subscriptions_by_message_sequence[message_seq].clear(); - std::copy( - destined_subscriptions.begin(), destined_subscriptions.end(), - // Memory allocation occurs in info.target_subscriptions_by_message_sequence[message_seq] - std::inserter( - info.target_subscriptions_by_message_sequence[message_seq], - // This ends up only being a hint to std::set, could also be .begin(). - info.target_subscriptions_by_message_sequence[message_seq].end() - ) - ); + + state_->store_intra_process_message(intra_process_publisher_id, message_seq); + // Return the message sequence which is sent to the subscription. return message_seq; } @@ -337,41 +308,22 @@ public: typename allocator::Deleter::template rebind_alloc, MessageT>> & message) { - message = nullptr; - PublisherInfo * info; - { - auto it = publishers_.find(intra_process_publisher_id); - if (it == publishers_.end()) { - // Publisher is either invalid or no longer exists. - return; - } - info = &it->second; - } - // Figure out how many subscriptions are left. - std::set * target_subs; - { - auto it = info->target_subscriptions_by_message_sequence.find(message_sequence_number); - if (it == info->target_subscriptions_by_message_sequence.end()) { - // Message is no longer being stored by this publisher. - return; - } - target_subs = &it->second; - } - { - auto it = std::find( - target_subs->begin(), target_subs->end(), - requesting_subscriptions_intra_process_id); - if (it == target_subs->end()) { - // This publisher id/message seq pair was not intended for this subscription. - return; - } - target_subs->erase(it); - } using MRBMessageAlloc = typename std::allocator_traits::template rebind_alloc; typedef typename mapped_ring_buffer::MappedRingBuffer TypedMRB; - typename TypedMRB::SharedPtr typed_buffer = std::static_pointer_cast(info->buffer); + + mapped_ring_buffer::MappedRingBufferBase::SharedPtr buffer; + size_t target_subs_size = state_->take_intra_process_message( + intra_process_publisher_id, + message_sequence_number, + requesting_subscriptions_intra_process_id, + buffer + ); + typename TypedMRB::SharedPtr typed_buffer = std::static_pointer_cast(buffer); + if (!typed_buffer) { + return; + } // Return a copy or the unique_ptr (ownership) depending on how many subscriptions are left. - if (target_subs->size()) { + if (target_subs_size) { // There are more subscriptions to serve, return a copy. typed_buffer->get_copy_at_key(message_sequence_number, message); } else { @@ -384,16 +336,7 @@ public: bool matches_any_publishers(const rmw_gid_t * id) const { - for (auto & publisher_pair : publishers_) { - auto publisher = publisher_pair.second.publisher.lock(); - if (!publisher) { - continue; - } - if (*publisher.get() == id) { - return true; - } - } - return false; + return state_->matches_any_publishers(id); } private: @@ -420,22 +363,7 @@ private: static std::atomic next_unique_id_; - std::unordered_map subscriptions_; - std::map> subscription_ids_by_topic_; - - struct PublisherInfo - { - RCLCPP_DISABLE_COPY(PublisherInfo); - - PublisherInfo() = default; - - publisher::PublisherBase::WeakPtr publisher; - std::atomic sequence_number; - mapped_ring_buffer::MappedRingBufferBase::SharedPtr buffer; - std::unordered_map> target_subscriptions_by_message_sequence; - }; - - std::unordered_map publishers_; + IntraProcessManagerStateBase::SharedPtr state_; }; diff --git a/rclcpp/include/rclcpp/intra_process_manager_state.hpp b/rclcpp/include/rclcpp/intra_process_manager_state.hpp new file mode 100644 index 0000000..906d9a2 --- /dev/null +++ b/rclcpp/include/rclcpp/intra_process_manager_state.hpp @@ -0,0 +1,271 @@ +// Copyright 2015 Open Source Robotics Foundation, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef RCLCPP__RCLCPP__INTRA_PROCESS_MANAGER_STATE_HPP_ +#define RCLCPP__RCLCPP__INTRA_PROCESS_MANAGER_STATE_HPP_ + +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include + +namespace rclcpp +{ +namespace intra_process_manager +{ + +class IntraProcessManagerStateBase { +public: + //RCLCPP_SMART_PTR_DEFINITIONS(IntraProcessManagerStateBase); + using SharedPtr = std::shared_ptr; + + virtual void + add_subscription(uint64_t id, subscription::SubscriptionBase::SharedPtr subscription) = 0; + + virtual void + remove_subscription(uint64_t intra_process_subscription_id) = 0; + + virtual void add_publisher(uint64_t id, + publisher::PublisherBase::WeakPtr publisher, + mapped_ring_buffer::MappedRingBufferBase::SharedPtr mrb, + size_t size) = 0; + + virtual void + remove_publisher(uint64_t intra_process_publisher_id) = 0; + + virtual uint64_t + get_publisher_info_for_id( + uint64_t intra_process_publisher_id, + mapped_ring_buffer::MappedRingBufferBase::SharedPtr & mrb) = 0; + + virtual void + store_intra_process_message(uint64_t intra_process_publisher_id, uint64_t message_seq) = 0; + + virtual size_t + take_intra_process_message(uint64_t intra_process_publisher_id, + uint64_t message_sequence_number, + uint64_t requesting_subscriptions_intra_process_id, + mapped_ring_buffer::MappedRingBufferBase::SharedPtr & mrb) = 0; + + virtual bool + matches_any_publishers(const rmw_gid_t * id) const = 0; +}; + +template> +class IntraProcessManagerState : public IntraProcessManagerStateBase { +public: + + void + add_subscription(uint64_t id, subscription::SubscriptionBase::SharedPtr subscription) { + subscriptions_[id] = subscription; + subscription_ids_by_topic_[subscription->get_topic_name()].insert(id); + } + + void + remove_subscription(uint64_t intra_process_subscription_id) { + subscriptions_.erase(intra_process_subscription_id); + for (auto & pair : subscription_ids_by_topic_) { + pair.second.erase(intra_process_subscription_id); + } + // Iterate over all publisher infos and all stored subscription id's and + // remove references to this subscription's id. + for (auto & publisher_pair : publishers_) { + for (auto & sub_pair : publisher_pair.second.target_subscriptions_by_message_sequence) { + sub_pair.second.erase(intra_process_subscription_id); + } + } + + } + + void add_publisher(uint64_t id, + publisher::PublisherBase::WeakPtr publisher, + mapped_ring_buffer::MappedRingBufferBase::SharedPtr mrb, + size_t size) { + + publishers_[id].publisher = publisher; + // As long as the size of the ring buffer is less than the max sequence number, we're safe. + if (size > std::numeric_limits::max()) { + throw std::invalid_argument("the calculated buffer size is too large"); + } + publishers_[id].sequence_number.store(0); + + publishers_[id].buffer = mrb; + publishers_[id].target_subscriptions_by_message_sequence.reserve(size); + } + + void + remove_publisher(uint64_t intra_process_publisher_id) { + publishers_.erase(intra_process_publisher_id); + } + + // TODO + // return message_seq and mrb + uint64_t + get_publisher_info_for_id( + uint64_t intra_process_publisher_id, + mapped_ring_buffer::MappedRingBufferBase::SharedPtr & mrb) + { + auto it = publishers_.find(intra_process_publisher_id); + if (it == publishers_.end()) { + throw std::runtime_error("store_intra_process_message called with invalid publisher id"); + } + PublisherInfo & info = it->second; + mrb = info.buffer; + // Calculate the next message sequence number. + uint64_t message_seq = info.sequence_number.fetch_add(1, std::memory_order_relaxed); + + return message_seq; + } + + void + store_intra_process_message(uint64_t intra_process_publisher_id, uint64_t message_seq) + { + auto it = publishers_.find(intra_process_publisher_id); + if (it == publishers_.end()) { + throw std::runtime_error("store_intra_process_message called with invalid publisher id"); + } + PublisherInfo & info = it->second; + auto publisher = info.publisher.lock(); + if (!publisher) { + throw std::runtime_error("publisher has unexpectedly gone out of scope"); + } + + // Figure out what subscriptions should receive the message. + auto & destined_subscriptions = subscription_ids_by_topic_[publisher->get_topic_name()]; + // Store the list for later comparison. + info.target_subscriptions_by_message_sequence[message_seq].clear(); + std::copy( + destined_subscriptions.begin(), destined_subscriptions.end(), + // Memory allocation occurs in info.target_subscriptions_by_message_sequence[message_seq] + std::inserter( + info.target_subscriptions_by_message_sequence[message_seq], + // This ends up only being a hint to std::set, could also be .begin(). + info.target_subscriptions_by_message_sequence[message_seq].end() + ) + ); + + } + + size_t + take_intra_process_message(uint64_t intra_process_publisher_id, + uint64_t message_sequence_number, + uint64_t requesting_subscriptions_intra_process_id, + mapped_ring_buffer::MappedRingBufferBase::SharedPtr & mrb + ) + { + mrb = nullptr; + PublisherInfo * info; + { + auto it = publishers_.find(intra_process_publisher_id); + if (it == publishers_.end()) { + // Publisher is either invalid or no longer exists. + return 0; + } + info = &it->second; + } + // Figure out how many subscriptions are left. + AllocSet * target_subs; + { + auto it = info->target_subscriptions_by_message_sequence.find(message_sequence_number); + if (it == info->target_subscriptions_by_message_sequence.end()) { + // Message is no longer being stored by this publisher. + return 0; + } + target_subs = &it->second; + } + { + auto it = std::find( + target_subs->begin(), target_subs->end(), + requesting_subscriptions_intra_process_id); + if (it == target_subs->end()) { + // This publisher id/message seq pair was not intended for this subscription. + return 0; + } + target_subs->erase(it); + } + mrb = info->buffer; + return target_subs->size(); + } + + bool + matches_any_publishers(const rmw_gid_t * id) const + { + for (auto & publisher_pair : publishers_) { + auto publisher = publisher_pair.second.publisher.lock(); + if (!publisher) { + continue; + } + if (*publisher.get() == id) { + return true; + } + } + return false; + } + + +private: + template + using RebindAlloc = typename std::allocator_traits::template rebind_alloc; + + //using AllocString = std::basic_string, RebindAlloc>; + using AllocString = std::string; + using AllocSet = std::set, RebindAlloc>; + using SubscriptionMap = std::unordered_map, std::equal_to, + RebindAlloc>>; + using IDTopicMap = std::map, RebindAlloc>>; + + SubscriptionMap subscriptions_; + + IDTopicMap subscription_ids_by_topic_; + + struct PublisherInfo + { + RCLCPP_DISABLE_COPY(PublisherInfo); + + PublisherInfo() = default; + + publisher::PublisherBase::WeakPtr publisher; + std::atomic sequence_number; + mapped_ring_buffer::MappedRingBufferBase::SharedPtr buffer; + + using TargetSubscriptionsMap = std::unordered_map, std::equal_to, RebindAlloc>>; + TargetSubscriptionsMap target_subscriptions_by_message_sequence; + }; + + using PublisherMap = std::unordered_map, std::equal_to, + RebindAlloc>>; + + PublisherMap publishers_; + +}; + +static IntraProcessManagerStateBase::SharedPtr create_default_state() { + return std::make_shared>(); +} + +} +} + +#endif