From ea21d9263a1932dad2cba224d3ab1c58e4311a78 Mon Sep 17 00:00:00 2001 From: Jackie Kay Date: Thu, 22 Oct 2015 15:58:00 -0700 Subject: [PATCH] rebind allocator in mapped_ring_buffer --- .../rclcpp/allocator/allocator_common.hpp | 4 +- .../rclcpp/allocator/allocator_deleter.hpp | 3 +- .../rclcpp/any_subscription_callback.hpp | 31 +++++------ .../include/rclcpp/intra_process_manager.hpp | 25 +++++---- rclcpp/include/rclcpp/mapped_ring_buffer.hpp | 36 +++++++++---- .../rclcpp/message_memory_strategy.hpp | 16 +++--- rclcpp/include/rclcpp/node_impl.hpp | 7 +-- rclcpp/include/rclcpp/publisher.hpp | 29 +++++----- .../strategies/allocator_memory_strategy.hpp | 53 ++++++++++++------- rclcpp/include/rclcpp/subscription.hpp | 5 +- rclcpp/test/test_intra_process_manager.cpp | 15 +++++- 11 files changed, 143 insertions(+), 81 deletions(-) diff --git a/rclcpp/include/rclcpp/allocator/allocator_common.hpp b/rclcpp/include/rclcpp/allocator/allocator_common.hpp index 6f980b6..5d493b3 100644 --- a/rclcpp/include/rclcpp/allocator/allocator_common.hpp +++ b/rclcpp/include/rclcpp/allocator/allocator_common.hpp @@ -12,8 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. -#ifndef RCLCPP_RCLCPP_ALLOCATOR_COMMON_HPP_ -#define RCLCPP_RCLCPP_ALLOCATOR_COMMON_HPP_ +#ifndef RCLCPP_RCLCPP__ALLOCATOR_COMMON__HPP_ +#define RCLCPP_RCLCPP__ALLOCATOR_COMMON__HPP_ #include diff --git a/rclcpp/include/rclcpp/allocator/allocator_deleter.hpp b/rclcpp/include/rclcpp/allocator/allocator_deleter.hpp index 2eb00ed..8e948b6 100644 --- a/rclcpp/include/rclcpp/allocator/allocator_deleter.hpp +++ b/rclcpp/include/rclcpp/allocator/allocator_deleter.hpp @@ -50,7 +50,8 @@ public: void operator()(U * ptr) { std::allocator_traits>::destroy(*allocator_, ptr); - std::allocator_traits>::deallocate(*allocator_, ptr, sizeof(U)); + // TODO: figure out if we're guaranteed to be destroying only 1 item here + std::allocator_traits>::deallocate(*allocator_, ptr, 1); ptr = NULL; } diff --git a/rclcpp/include/rclcpp/any_subscription_callback.hpp b/rclcpp/include/rclcpp/any_subscription_callback.hpp index 7b420f6..cb9a23c 100644 --- a/rclcpp/include/rclcpp/any_subscription_callback.hpp +++ b/rclcpp/include/rclcpp/any_subscription_callback.hpp @@ -33,9 +33,10 @@ namespace any_subscription_callback template class AnySubscriptionCallback { - using MessageAlloc = allocator::AllocRebind; - using MessageDeleter = allocator::Deleter; - using UniquePtr = std::unique_ptr; + using MessageAllocTraits = allocator::AllocRebind; + using MessageAlloc = typename MessageAllocTraits::allocator_type; + using MessageDeleter = allocator::Deleter; + using MessageUniquePtr = std::unique_ptr; using SharedPtrCallback = std::function)>; using SharedPtrWithInfoCallback = @@ -43,8 +44,8 @@ class AnySubscriptionCallback using ConstSharedPtrCallback = std::function)>; using ConstSharedPtrWithInfoCallback = std::function, const rmw_message_info_t &)>; - using UniquePtrCallback = std::function; - using UniquePtrWithInfoCallback = std::function; + using UniquePtrCallback = std::function; + using UniquePtrWithInfoCallback = std::function; SharedPtrCallback shared_ptr_callback_; SharedPtrWithInfoCallback shared_ptr_with_info_callback_; @@ -59,8 +60,8 @@ public: const_shared_ptr_callback_(nullptr), const_shared_ptr_with_info_callback_(nullptr), unique_ptr_callback_(nullptr), unique_ptr_with_info_callback_(nullptr) { - message_allocator_ = new typename MessageAlloc::allocator_type(*allocator.get()); - allocator::set_allocator_for_deleter(&message_deleter_, message_allocator_); + message_allocator_ = std::make_shared(*allocator.get()); + allocator::set_allocator_for_deleter(&message_deleter_, message_allocator_.get()); } AnySubscriptionCallback(const AnySubscriptionCallback &) = default; @@ -162,20 +163,20 @@ public: } else if (const_shared_ptr_with_info_callback_) { const_shared_ptr_with_info_callback_(message, message_info); } else if (unique_ptr_callback_) { - auto ptr = MessageAlloc::allocate(*message_allocator_, 1); - MessageAlloc::construct(*message_allocator_, ptr, *message); - unique_ptr_callback_(UniquePtr(ptr, message_deleter_)); + auto ptr = MessageAllocTraits::allocate(*message_allocator_.get(), 1); + MessageAllocTraits::construct(*message_allocator_.get(), ptr, *message); + unique_ptr_callback_(MessageUniquePtr(ptr, message_deleter_)); } else if (unique_ptr_with_info_callback_) { - auto ptr = MessageAlloc::allocate(*message_allocator_, 1); - MessageAlloc::construct(*message_allocator_, ptr, *message); - unique_ptr_with_info_callback_(UniquePtr(ptr, message_deleter_), message_info); + auto ptr = MessageAllocTraits::allocate(*message_allocator_.get(), 1); + MessageAllocTraits::construct(*message_allocator_.get(), ptr, *message); + unique_ptr_with_info_callback_(MessageUniquePtr(ptr, message_deleter_), message_info); } else { throw std::runtime_error("unexpected message without any callback set"); } } void dispatch_intra_process( - UniquePtr & message, const rmw_message_info_t & message_info) + MessageUniquePtr & message, const rmw_message_info_t & message_info) { (void)message_info; if (shared_ptr_callback_) { @@ -200,7 +201,7 @@ public: } private: - typename MessageAlloc::allocator_type * message_allocator_; + std::shared_ptr message_allocator_; MessageDeleter message_deleter_; }; diff --git a/rclcpp/include/rclcpp/intra_process_manager.hpp b/rclcpp/include/rclcpp/intra_process_manager.hpp index c66d7b6..21739cf 100644 --- a/rclcpp/include/rclcpp/intra_process_manager.hpp +++ b/rclcpp/include/rclcpp/intra_process_manager.hpp @@ -201,9 +201,10 @@ public: throw std::invalid_argument("the calculated buffer size is too large"); } publishers_[id].sequence_number.store(0); - using Deleter = typename publisher::Publisher::MessageDeleter; - publishers_[id].buffer = mapped_ring_buffer::MappedRingBuffer::make_shared( - size); + publishers_[id].buffer = + mapped_ring_buffer::MappedRingBuffer::MessageAlloc>::make_shared( + size, publisher->get_allocator()); publishers_[id].target_subscriptions_by_message_sequence.reserve(size); return id; } @@ -249,11 +250,13 @@ public: * \param message the message that is being stored. * \return the message sequence number. */ - template> + template> uint64_t store_intra_process_message( uint64_t intra_process_publisher_id, - std::unique_ptr & message) + std::unique_ptr::template rebind_alloc, + MessageT>> & message) { auto it = publishers_.find(intra_process_publisher_id); if (it == publishers_.end()) { @@ -263,7 +266,8 @@ public: // Calculate the next message sequence number. uint64_t message_seq = info.sequence_number.fetch_add(1, std::memory_order_relaxed); // Insert the message into the ring buffer using the message_seq to identify it. - typedef typename mapped_ring_buffer::MappedRingBuffer TypedMRB; + using MRBMessageAlloc = typename std::allocator_traits::template rebind_alloc; + typedef typename mapped_ring_buffer::MappedRingBuffer TypedMRB; typename TypedMRB::SharedPtr typed_buffer = std::static_pointer_cast(info.buffer); bool did_replace = typed_buffer->push_and_replace(message_seq, message); // TODO(wjwwood): do something when a message was displaced. log debug? @@ -323,13 +327,15 @@ public: * \param requesting_subscriptions_intra_process_id the subscription's id. * \param message the message typed unique_ptr used to return the message. */ - template> + template> void take_intra_process_message( uint64_t intra_process_publisher_id, uint64_t message_sequence_number, uint64_t requesting_subscriptions_intra_process_id, - std::unique_ptr & message) + std::unique_ptr::template rebind_alloc, + MessageT>> & message) { message = nullptr; PublisherInfo * info; @@ -361,7 +367,8 @@ public: } target_subs->erase(it); } - typedef typename mapped_ring_buffer::MappedRingBuffer TypedMRB; + using MRBMessageAlloc = typename std::allocator_traits::template rebind_alloc; + typedef typename mapped_ring_buffer::MappedRingBuffer TypedMRB; typename TypedMRB::SharedPtr typed_buffer = std::static_pointer_cast(info->buffer); // Return a copy or the unique_ptr (ownership) depending on how many subscriptions are left. if (target_subs->size()) { diff --git a/rclcpp/include/rclcpp/mapped_ring_buffer.hpp b/rclcpp/include/rclcpp/mapped_ring_buffer.hpp index daf6792..9e6d611 100644 --- a/rclcpp/include/rclcpp/mapped_ring_buffer.hpp +++ b/rclcpp/include/rclcpp/mapped_ring_buffer.hpp @@ -15,6 +15,7 @@ #ifndef RCLCPP_RCLCPP_RING_BUFFER_HPP_ #define RCLCPP_RCLCPP_RING_BUFFER_HPP_ +#include #include #include @@ -50,25 +51,35 @@ public: * there is no guarantee on which value is returned if a key is used multiple * times. */ -template> +template> class MappedRingBuffer : public MappedRingBufferBase { public: - RCLCPP_SMART_PTR_DEFINITIONS(MappedRingBuffer); - using ElemUniquePtr = std::unique_ptr; + RCLCPP_SMART_PTR_DEFINITIONS(MappedRingBuffer); + using ElemAllocTraits = allocator::AllocRebind; + using ElemAlloc = typename ElemAllocTraits::allocator_type; + using ElemDeleter = allocator::Deleter; + + using ElemUniquePtr = std::unique_ptr; /// Constructor. /* The constructor will allocate memory while reserving space. * * \param size size of the ring buffer; must be positive and non-zero. */ - MappedRingBuffer(size_t size) + MappedRingBuffer(size_t size, std::shared_ptr allocator = nullptr) : elements_(size), head_(0) { if (size == 0) { throw std::invalid_argument("size must be a positive, non-zero value"); } + if (!allocator) { + allocator_ = std::make_shared(); + } else { + allocator_ = std::make_shared(*allocator.get()); + } } + virtual ~MappedRingBuffer() {} /// Return a copy of the value stored in the ring buffer at the given key. @@ -88,7 +99,9 @@ public: auto it = get_iterator_of_key(key); value = nullptr; if (it != elements_.end() && it->in_use) { - value = ElemUniquePtr(new T(*it->value)); + auto ptr = ElemAllocTraits::allocate(*allocator_.get(), 1); + ElemAllocTraits::construct(*allocator_.get(), ptr, *it->value); + value = ElemUniquePtr(ptr); } } @@ -118,7 +131,9 @@ public: value = nullptr; if (it != elements_.end() && it->in_use) { // Make a copy. - auto copy = ElemUniquePtr(new T(*it->value)); + auto ptr = ElemAllocTraits::allocate(*allocator_.get(), 1); + ElemAllocTraits::construct(*allocator_.get(), ptr, *it->value); + auto copy = ElemUniquePtr(ptr); // Return the original. value.swap(it->value); // Store the copy. @@ -184,7 +199,7 @@ public: } private: - RCLCPP_DISABLE_COPY(MappedRingBuffer); + RCLCPP_DISABLE_COPY(MappedRingBuffer); struct element { @@ -193,7 +208,9 @@ private: bool in_use; }; - typename std::vector::iterator + using VectorAlloc = typename std::allocator_traits::template rebind_alloc; + + typename std::vector::iterator get_iterator_of_key(uint64_t key) { // *INDENT-OFF* (prevent uncrustify from making unecessary indents here) @@ -204,8 +221,9 @@ private: return it; } - std::vector elements_; + std::vector elements_; size_t head_; + std::shared_ptr allocator_; }; diff --git a/rclcpp/include/rclcpp/message_memory_strategy.hpp b/rclcpp/include/rclcpp/message_memory_strategy.hpp index 9076c93..e88c077 100644 --- a/rclcpp/include/rclcpp/message_memory_strategy.hpp +++ b/rclcpp/include/rclcpp/message_memory_strategy.hpp @@ -34,17 +34,18 @@ class MessageMemoryStrategy public: RCLCPP_SMART_PTR_DEFINITIONS(MessageMemoryStrategy); - using MessageAlloc = allocator::AllocRebind; - using MessageDeleter = allocator::Deleter; + using MessageAllocTraits = allocator::AllocRebind; + using MessageAlloc = typename MessageAllocTraits::allocator_type; + using MessageDeleter = allocator::Deleter; MessageMemoryStrategy() { - message_allocator_ = new typename MessageAlloc::allocator_type(); + message_allocator_ = std::make_shared(); } MessageMemoryStrategy(std::shared_ptr allocator) { - message_allocator_ = new typename MessageAlloc::allocator_type(*allocator.get()); + message_allocator_ = std::make_shared(*allocator.get()); } /// Default factory method @@ -57,9 +58,10 @@ public: // \return Shared pointer to the new message. virtual std::shared_ptr borrow_message() { - auto ptr = MessageAlloc::allocate(*message_allocator_, 1); - MessageAlloc::construct(*message_allocator_, ptr); + auto ptr = MessageAllocTraits::allocate(*message_allocator_.get(), 1); + MessageAllocTraits::construct(*message_allocator_.get(), ptr); return std::shared_ptr(ptr, message_deleter_); + //return std::allocate_shared(*message_allocator_); } /// Release ownership of the message, which will deallocate it if it has no more owners. @@ -69,7 +71,7 @@ public: msg.reset(); } - typename MessageAlloc::allocator_type * message_allocator_; + std::shared_ptr message_allocator_; MessageDeleter message_deleter_; }; diff --git a/rclcpp/include/rclcpp/node_impl.hpp b/rclcpp/include/rclcpp/node_impl.hpp index 9f82a31..e82a383 100644 --- a/rclcpp/include/rclcpp/node_impl.hpp +++ b/rclcpp/include/rclcpp/node_impl.hpp @@ -187,8 +187,9 @@ Node::create_publisher( "' is incompatible from the publisher type '" + message_type_info.name() + "'"); } MessageT * typed_message_ptr = static_cast(msg); - std::unique_ptr unique_msg(typed_message_ptr); - uint64_t message_seq = ipm->store_intra_process_message(publisher_id, unique_msg); + using MessageDeleter = typename publisher::Publisher::MessageDeleter; + std::unique_ptr unique_msg(typed_message_ptr); + uint64_t message_seq = ipm->store_intra_process_message(publisher_id, unique_msg); return message_seq; }; // *INDENT-ON* @@ -293,7 +294,7 @@ Node::create_subscription( throw std::runtime_error( "intra process take called after destruction of intra process manager"); } - ipm->take_intra_process_message(publisher_id, message_sequence, subscription_id, message); + ipm->take_intra_process_message(publisher_id, message_sequence, subscription_id, message); }, [weak_ipm](const rmw_gid_t * sender_gid) -> bool { auto ipm = weak_ipm.lock(); diff --git a/rclcpp/include/rclcpp/publisher.hpp b/rclcpp/include/rclcpp/publisher.hpp index 92249d7..f08a4a8 100644 --- a/rclcpp/include/rclcpp/publisher.hpp +++ b/rclcpp/include/rclcpp/publisher.hpp @@ -211,8 +211,9 @@ class Publisher : public PublisherBase friend rclcpp::node::Node; public: - using MessageAlloc = allocator::AllocRebind; - using MessageDeleter = allocator::Deleter; + using MessageAllocTraits = allocator::AllocRebind; + using MessageAlloc = typename MessageAllocTraits::allocator_type; + using MessageDeleter = allocator::Deleter; using MessageUniquePtr = std::unique_ptr; RCLCPP_SMART_PTR_DEFINITIONS(Publisher); @@ -225,8 +226,8 @@ public: std::shared_ptr allocator) : PublisherBase(node_handle, publisher_handle, topic, queue_size) { - message_allocator_ = new typename MessageAlloc::allocator_type(*allocator.get()); - allocator::set_allocator_for_deleter(&message_deleter_, message_allocator_); + message_allocator_ = std::make_shared(*allocator.get()); + allocator::set_allocator_for_deleter(&message_deleter_, message_allocator_.get()); } @@ -237,7 +238,6 @@ public: */ void publish(std::unique_ptr & msg) - //publish(MessageUniquePtr & msg) { this->do_inter_process_publish(msg.get()); if (store_intra_process_message_) { @@ -285,8 +285,8 @@ public: // The intra process manager should probably also be able to store // shared_ptr's and do the "smart" thing based on other intra process // subscriptions. For now call the other publish(). - auto ptr = MessageAlloc::allocate(*message_allocator_, 1); - MessageAlloc::construct(*message_allocator_, ptr, *msg.get()); + auto ptr = MessageAllocTraits::allocate(*message_allocator_.get(), 1); + MessageAllocTraits::construct(*message_allocator_.get(), ptr, *msg.get()); MessageUniquePtr unique_msg(ptr, message_deleter_); return this->publish(unique_msg); } @@ -304,8 +304,8 @@ public: // The intra process manager should probably also be able to store // shared_ptr's and do the "smart" thing based on other intra process // subscriptions. For now call the other publish(). - auto ptr = MessageAlloc::allocate(*message_allocator_, 1); - MessageAlloc::construct(*message_allocator_, ptr, *msg.get()); + auto ptr = MessageAllocTraits::allocate(*message_allocator_.get(), 1); + MessageAllocTraits::construct(*message_allocator_.get(), ptr, *msg.get()); MessageUniquePtr unique_msg(ptr, message_deleter_); return this->publish(unique_msg); } @@ -319,12 +319,17 @@ public: return this->do_inter_process_publish(&msg); } // Otherwise we have to allocate memory in a unique_ptr and pass it along. - auto ptr = MessageAlloc::allocate(*message_allocator_, 1); - MessageAlloc::construct(*message_allocator_, ptr, msg); + auto ptr = MessageAllocTraits::allocate(*message_allocator_.get(), 1); + MessageAllocTraits::construct(*message_allocator_.get(), ptr, msg); MessageUniquePtr unique_msg(ptr, message_deleter_); return this->publish(unique_msg); } + std::shared_ptr get_allocator() const + { + return message_allocator_; + } + protected: void do_inter_process_publish(const MessageT * msg) @@ -338,7 +343,7 @@ protected: } } - typename MessageAlloc::allocator_type * message_allocator_; + std::shared_ptr message_allocator_; MessageDeleter message_deleter_; diff --git a/rclcpp/include/rclcpp/strategies/allocator_memory_strategy.hpp b/rclcpp/include/rclcpp/strategies/allocator_memory_strategy.hpp index daa2e0f..90c6dee 100644 --- a/rclcpp/include/rclcpp/strategies/allocator_memory_strategy.hpp +++ b/rclcpp/include/rclcpp/strategies/allocator_memory_strategy.hpp @@ -35,21 +35,22 @@ namespace allocator_memory_strategy * come through. */ template -class AllocatorMemoryStrategy : public MemoryStrategy +class AllocatorMemoryStrategy : public memory_strategy::MemoryStrategy { public: RCLCPP_SMART_PTR_DEFINITIONS(AllocatorMemoryStrategy); - using ExecAlloc = allocator::AllocRebind; - using ExecDeleter = - allocator::Deleter; - using VoidAlloc = allocator::AllocRebind; + using ExecAllocTraits = allocator::AllocRebind; + using ExecAlloc = typename ExecAllocTraits::allocator_type; + using ExecDeleter = allocator::Deleter; + using VoidAllocTraits = typename allocator::AllocRebind; + using VoidAlloc = typename VoidAllocTraits::allocator_type; AllocatorMemoryStrategy(std::shared_ptr allocator) { - executable_allocator_ = new ExecAlloc(*allocator.get()); - allocator_ = new VoidAlloc(*allocator.get()); + executable_allocator_ = std::make_shared(*allocator.get()); + allocator_ = std::make_shared(*allocator.get()); } /// Borrow memory for storing data for subscriptions, services, clients, or guard conditions. @@ -137,8 +138,8 @@ public: virtual executor::AnyExecutable::SharedPtr instantiate_next_executable() { //return std::make_shared(); - auto ptr = ExecAlloc::allocate(*executable_allocator_, 1); - ExecAlloc::construct(*executable_allocator_); + auto ptr = ExecAllocTraits::allocate(*executable_allocator_.get(), 1); + ExecAllocTraits::construct(*executable_allocator_.get(), ptr); return std::shared_ptr(ptr, executable_deleter_); } @@ -152,7 +153,9 @@ public: if (size == 0) { return NULL; } - return VoidAlloc::allocate(*allocator_, size); + auto ptr = VoidAllocTraits::allocate(*allocator_.get(), size); + alloc_map[ptr] = size; + return ptr; } /// Implementation of a general-purpose free. @@ -161,22 +164,34 @@ public: */ virtual void free(void * ptr) { - VoidAlloc::deallocate(*allocator, ptr); + if (alloc_map.count(ptr) == 0) { + // do nothing, the pointer is not in the alloc'd map + return; + } + VoidAllocTraits::deallocate(*allocator_.get(), &ptr, alloc_map[ptr]); } - std::vector subs; - std::vector services; - std::vector clients; + template + using VectorRebind = typename std::allocator_traits::template rebind_alloc; - std::vector subscription_handles; - std::vector service_handles; - std::vector client_handles; + std::vector> subs; + std::vector> services; + std::vector> clients; + + std::vector subscription_handles; + std::vector service_handles; + std::vector client_handles; std::array guard_cond_handles; + std::unordered_map alloc_map; + private: - typename ExecAlloc::allocator_type * executable_allocator_; + std::shared_ptr executable_allocator_; ExecDeleter executable_deleter_; - typename VoidAlloc::allocator_type * allocator_; + std::shared_ptr allocator_; }; } /* allocator_memory_strategy */ diff --git a/rclcpp/include/rclcpp/subscription.hpp b/rclcpp/include/rclcpp/subscription.hpp index 7f148c3..fbdb100 100644 --- a/rclcpp/include/rclcpp/subscription.hpp +++ b/rclcpp/include/rclcpp/subscription.hpp @@ -150,8 +150,9 @@ class Subscription : public SubscriptionBase friend class rclcpp::node::Node; public: - using MessageAlloc = allocator::AllocRebind; - using MessageDeleter = allocator::Deleter; + using MessageAllocTraits = allocator::AllocRebind; + using MessageAlloc = typename MessageAllocTraits::allocator_type; + using MessageDeleter = allocator::Deleter; using MessageUniquePtr = std::unique_ptr; RCLCPP_SMART_PTR_DEFINITIONS(Subscription); diff --git a/rclcpp/test/test_intra_process_manager.cpp b/rclcpp/test/test_intra_process_manager.cpp index df460c9..24d3f9b 100644 --- a/rclcpp/test/test_intra_process_manager.cpp +++ b/rclcpp/test/test_intra_process_manager.cpp @@ -59,10 +59,21 @@ template> class Publisher : public PublisherBase { public: - using MessageAlloc = allocator::AllocRebind; - using MessageDeleter = allocator::Deleter; + using MessageAllocTraits = allocator::AllocRebind; + using MessageAlloc = typename MessageAllocTraits::allocator_type; + using MessageDeleter = allocator::Deleter; + using MessageUniquePtr = std::unique_ptr; + std::shared_ptr allocator_; RCLCPP_SMART_PTR_DEFINITIONS(Publisher); + + Publisher() { + allocator_ = std::make_shared(); + } + + std::shared_ptr get_allocator() { + return allocator_; + } }; }