diff --git a/rclcpp/include/rclcpp/allocator/allocator_common.hpp b/rclcpp/include/rclcpp/allocator/allocator_common.hpp new file mode 100644 index 0000000..6f980b6 --- /dev/null +++ b/rclcpp/include/rclcpp/allocator/allocator_common.hpp @@ -0,0 +1,33 @@ +// Copyright 2015 Open Source Robotics Foundation, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef RCLCPP_RCLCPP_ALLOCATOR_COMMON_HPP_ +#define RCLCPP_RCLCPP_ALLOCATOR_COMMON_HPP_ + +#include + +#include + +namespace rclcpp +{ +namespace allocator +{ + +template +using AllocRebind = typename std::allocator_traits::template rebind_traits; + +} +} + +#endif diff --git a/rclcpp/include/rclcpp/allocator/allocator_deleter.hpp b/rclcpp/include/rclcpp/allocator/allocator_deleter.hpp index 160c4e5..d725f2b 100644 --- a/rclcpp/include/rclcpp/allocator/allocator_deleter.hpp +++ b/rclcpp/include/rclcpp/allocator/allocator_deleter.hpp @@ -23,9 +23,12 @@ namespace rclcpp namespace allocator { -template +template class AllocatorDeleter { + template + using AllocRebind = typename std::allocator_traits::template rebind_alloc; + public: AllocatorDeleter() : allocator_(NULL) @@ -37,16 +40,17 @@ public: { } - template - AllocatorDeleter(const AllocatorDeleter & a) + template + AllocatorDeleter(const AllocatorDeleter & a) { allocator_ = a.get_allocator(); } - void operator()(T * ptr) + template + void operator()(U * ptr) { - std::allocator_traits::destroy(*allocator_, ptr); - std::allocator_traits::deallocate(*allocator_, ptr, sizeof(T)); + std::allocator_traits>::destroy(*allocator_, ptr); + std::allocator_traits>::deallocate(*allocator_, ptr, sizeof(U)); ptr = NULL; } @@ -64,61 +68,36 @@ private: Allocator * allocator_; }; -/* -template class Alloc, typename T, typename D> -D initialize_deleter(Alloc * alloc) +template +void set_allocator_for_deleter(D * deleter, Alloc * alloc) { (void) alloc; + (void) deleter; throw std::runtime_error("Reached unexpected template specialization"); } -template -std::default_delete initialize_deleter(std::allocator * alloc) -{ - (void) alloc; - return std::default_delete(); -} - -template class Alloc, typename T> -AllocatorDeleter> initialize_deleter(Alloc * alloc) -{ - if (!alloc) { - throw std::invalid_argument("Allocator argument was NULL"); - } - return AllocatorDeleter>(alloc); -} -*/ -template class Alloc, typename T, typename D> -void set_allocator_for_deleter(D * deleter, Alloc * alloc) -{ - (void) alloc; - throw std::runtime_error("Reached unexpected template specialization"); -} - -template -void set_allocator_for_deleter(std::default_delete * deleter, std::allocator * alloc) +template +void set_allocator_for_deleter(std::default_delete * deleter, std::allocator * alloc) { (void) deleter; (void) alloc; } -template class Alloc, typename T> -void set_allocator_for_deleter(AllocatorDeleter> * deleter, Alloc * alloc) +template +void set_allocator_for_deleter(AllocatorDeleter * deleter, Alloc * alloc) { if (!deleter || !alloc) { throw std::invalid_argument("Argument was NULL to set_allocator_for_deleter"); } - //return AllocatorDeleter>(alloc); deleter->set_allocator(alloc); } template using Deleter = typename std::conditional< - std::is_same>::value, + std::is_same>::value, std::default_delete, - AllocatorDeleter + AllocatorDeleter >::type; - } } diff --git a/rclcpp/include/rclcpp/allocator/allocator_factory.hpp b/rclcpp/include/rclcpp/allocator/allocator_factory.hpp deleted file mode 100644 index 2a98b1f..0000000 --- a/rclcpp/include/rclcpp/allocator/allocator_factory.hpp +++ /dev/null @@ -1,52 +0,0 @@ -// Copyright 2015 Open Source Robotics Foundation, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#ifndef RCLCPP_RCLCPP_ALLOCATOR_FACTORY_HPP_ -#define RCLCPP_RCLCPP_ALLOCATOR_FACTORY_HPP_ - -#include -#include -#include - -template class Alloc, typename T, typename D> -D initialize_deleter(Alloc * alloc) -{ - (void) alloc; - throw std::runtime_error("Reached unexpected template specialization"); -} - -template -std::default_delete initialize_deleter(std::allocator * alloc) -{ - (void) alloc; - return std::default_delete(); -} - -template class Alloc, typename T> -AllocatorDeleter> initialize_deleter(Alloc * alloc) -{ - if (!alloc) { - throw std::invalid_argument("Allocator argument was NULL"); - } - return AllocatorDeleter>(alloc); -} - -template class Alloc> -using Deleter = typename std::conditional< - std::is_same, std::allocator>::value, - std::default_delete, - AllocatorDeleter> - >::type; - -#endif diff --git a/rclcpp/include/rclcpp/allocator/allocator_wrapper.hpp b/rclcpp/include/rclcpp/allocator/allocator_wrapper.hpp deleted file mode 100644 index 99fc421..0000000 --- a/rclcpp/include/rclcpp/allocator/allocator_wrapper.hpp +++ /dev/null @@ -1,149 +0,0 @@ -// Copyright 2015 Open Source Robotics Foundation, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#ifndef RCLCPP_RCLCPP_ALLOCATOR_WRAPPER_HPP_ -#define RCLCPP_RCLCPP_ALLOCATOR_WRAPPER_HPP_ - -#include -#include -#include - -namespace rclcpp -{ - -namespace allocator -{ - -// Type-erased interface to AllocatorWrapper -class AllocatorWrapper -{ - virtual void * allocate(size_t size) = 0; - virtual void deallocate(void * pointer, size_t size) = 0; - // Construct will have to be through placement new, since pure virtual function can't be templated - - virtual void destroy(T* pointer) = 0; -} - -template -class TypedAllocatorWrapper : public AllocatorWrapper -{ -public: - /* - using Deleter = typename std::conditional< - std::is_same>::value, - std::default_delete, - AllocatorDeleter - >::type; - */ - - using DeleterT = Deleter; - - TypedAllocatorWrapper(Alloc * allocator) - : allocator_(allocator) - { - if (!allocator_) { - throw std::invalid_argument("Allocator argument was NULL"); - } - initialize_deleter(deleter_, allocator_); - if (deleter_ == NULL) { - throw std::invalid_argument("Failed to initialize deleter"); - } - } - - TypedAllocatorWrapper(Alloc * allocator, DeleterT * deleter) - : allocator_(allocator), deleter_(deleter) - { - if (!allocator_) { - throw std::invalid_argument("Allocator argument was NULL"); - } - if (!deleter_) { - throw std::invalid_argument("Deleter argument was NULL"); - } - } - - TypedAllocatorWrapper(Alloc & allocator) - { - allocator_ = &allocator; - if (!allocator_) { - throw std::invalid_argument("Allocator argument was NULL"); - } - initialize_deleter(deleter_, allocator_); - if (!deleter_) { - throw std::invalid_argument("Failed to initialize deleter"); - } - } - - TypedAllocatorWrapper() - { - allocator_ = new Alloc(); - initialize_deleter(deleter_, allocator_); - if (!deleter_) { - //throw std::invalid_argument("Failed to initialize deleter"); - deleter_ = new DeleterT; - } - } - - void * allocate(size_t size) - { - return std::allocator_traits::allocate(*allocator_, size); - } - - void deallocate(void * pointer, size_t size) - { - deallocate(static_cast(pointer), size); - } - - void deallocate(T * pointer, size_t size) - { - std::allocator_traits::deallocate(*allocator_, pointer, size); - } - - template - void construct(T * pointer, Args && ... args) - { - std::allocator_traits::construct(*allocator_, pointer, std::forward(args)...); - } - - void destroy(void * pointer) - { - destroy(static_cast(pointer)); - } - - template - void destroy(T * pointer) - { - std::allocator_traits::destroy(*allocator_, pointer); - } - - DeleterT * get_deleter() const - { - return deleter_; - } - Alloc * get_underlying_allocator() const - { - return allocator_; - } - -private: - Alloc * allocator_; - DeleterT * deleter_; -}; - -template -using DefaultAllocator = TypedAllocatorWrapper>; - -} -} - -#endif diff --git a/rclcpp/include/rclcpp/any_subscription_callback.hpp b/rclcpp/include/rclcpp/any_subscription_callback.hpp index fdec603..7b420f6 100644 --- a/rclcpp/include/rclcpp/any_subscription_callback.hpp +++ b/rclcpp/include/rclcpp/any_subscription_callback.hpp @@ -15,6 +15,7 @@ #ifndef RCLCPP_RCLCPP_ANY_SUBSCRIPTION_CALLBACK_HPP_ #define RCLCPP_RCLCPP_ANY_SUBSCRIPTION_CALLBACK_HPP_ +#include #include #include @@ -29,18 +30,21 @@ namespace rclcpp namespace any_subscription_callback { -template +template class AnySubscriptionCallback { + using MessageAlloc = allocator::AllocRebind; + using MessageDeleter = allocator::Deleter; + using UniquePtr = std::unique_ptr; + using SharedPtrCallback = std::function)>; using SharedPtrWithInfoCallback = std::function, const rmw_message_info_t &)>; using ConstSharedPtrCallback = std::function)>; using ConstSharedPtrWithInfoCallback = std::function, const rmw_message_info_t &)>; - using UniquePtrCallback = std::function)>; - using UniquePtrWithInfoCallback = - std::function, const rmw_message_info_t &)>; + using UniquePtrCallback = std::function; + using UniquePtrWithInfoCallback = std::function; SharedPtrCallback shared_ptr_callback_; SharedPtrWithInfoCallback shared_ptr_with_info_callback_; @@ -50,11 +54,14 @@ class AnySubscriptionCallback UniquePtrWithInfoCallback unique_ptr_with_info_callback_; public: - AnySubscriptionCallback() + AnySubscriptionCallback(std::shared_ptr allocator) : shared_ptr_callback_(nullptr), shared_ptr_with_info_callback_(nullptr), const_shared_ptr_callback_(nullptr), const_shared_ptr_with_info_callback_(nullptr), unique_ptr_callback_(nullptr), unique_ptr_with_info_callback_(nullptr) - {} + { + message_allocator_ = new typename MessageAlloc::allocator_type(*allocator.get()); + allocator::set_allocator_for_deleter(&message_deleter_, message_allocator_); + } AnySubscriptionCallback(const AnySubscriptionCallback &) = default; @@ -155,17 +162,20 @@ public: } else if (const_shared_ptr_with_info_callback_) { const_shared_ptr_with_info_callback_(message, message_info); } else if (unique_ptr_callback_) { - unique_ptr_callback_(std::unique_ptr(new MessageT(*message))); + auto ptr = MessageAlloc::allocate(*message_allocator_, 1); + MessageAlloc::construct(*message_allocator_, ptr, *message); + unique_ptr_callback_(UniquePtr(ptr, message_deleter_)); } else if (unique_ptr_with_info_callback_) { - unique_ptr_with_info_callback_(std::unique_ptr(new MessageT(* - message)), message_info); + auto ptr = MessageAlloc::allocate(*message_allocator_, 1); + MessageAlloc::construct(*message_allocator_, ptr, *message); + unique_ptr_with_info_callback_(UniquePtr(ptr, message_deleter_), message_info); } else { throw std::runtime_error("unexpected message without any callback set"); } } void dispatch_intra_process( - std::unique_ptr & message, const rmw_message_info_t & message_info) + UniquePtr & message, const rmw_message_info_t & message_info) { (void)message_info; if (shared_ptr_callback_) { @@ -188,6 +198,10 @@ public: throw std::runtime_error("unexpected message without any callback set"); } } + +private: + typename MessageAlloc::allocator_type * message_allocator_; + MessageDeleter message_deleter_; }; } /* namespace any_subscription_callback */ diff --git a/rclcpp/include/rclcpp/intra_process_manager.hpp b/rclcpp/include/rclcpp/intra_process_manager.hpp index 340b527..c66d7b6 100644 --- a/rclcpp/include/rclcpp/intra_process_manager.hpp +++ b/rclcpp/include/rclcpp/intra_process_manager.hpp @@ -188,9 +188,9 @@ public: * \param buffer_size if 0 (default) a size is calculated based on the QoS. * \return an unsigned 64-bit integer which is the publisher's unique id. */ - template + template uint64_t - add_publisher(typename publisher::Publisher::SharedPtr publisher, + add_publisher(typename publisher::Publisher::SharedPtr publisher, size_t buffer_size = 0) { auto id = IntraProcessManager::get_next_unique_id(); @@ -201,7 +201,9 @@ public: throw std::invalid_argument("the calculated buffer size is too large"); } publishers_[id].sequence_number.store(0); - publishers_[id].buffer = mapped_ring_buffer::MappedRingBuffer::make_shared(size); + using Deleter = typename publisher::Publisher::MessageDeleter; + publishers_[id].buffer = mapped_ring_buffer::MappedRingBuffer::make_shared( + size); publishers_[id].target_subscriptions_by_message_sequence.reserve(size); return id; } @@ -247,11 +249,11 @@ public: * \param message the message that is being stored. * \return the message sequence number. */ - template + template> uint64_t store_intra_process_message( uint64_t intra_process_publisher_id, - std::unique_ptr & message) + std::unique_ptr & message) { auto it = publishers_.find(intra_process_publisher_id); if (it == publishers_.end()) { @@ -261,7 +263,7 @@ public: // Calculate the next message sequence number. uint64_t message_seq = info.sequence_number.fetch_add(1, std::memory_order_relaxed); // Insert the message into the ring buffer using the message_seq to identify it. - typedef typename mapped_ring_buffer::MappedRingBuffer TypedMRB; + typedef typename mapped_ring_buffer::MappedRingBuffer TypedMRB; typename TypedMRB::SharedPtr typed_buffer = std::static_pointer_cast(info.buffer); bool did_replace = typed_buffer->push_and_replace(message_seq, message); // TODO(wjwwood): do something when a message was displaced. log debug? @@ -321,13 +323,13 @@ public: * \param requesting_subscriptions_intra_process_id the subscription's id. * \param message the message typed unique_ptr used to return the message. */ - template + template> void take_intra_process_message( uint64_t intra_process_publisher_id, uint64_t message_sequence_number, uint64_t requesting_subscriptions_intra_process_id, - std::unique_ptr & message) + std::unique_ptr & message) { message = nullptr; PublisherInfo * info; @@ -359,7 +361,7 @@ public: } target_subs->erase(it); } - typedef typename mapped_ring_buffer::MappedRingBuffer TypedMRB; + typedef typename mapped_ring_buffer::MappedRingBuffer TypedMRB; typename TypedMRB::SharedPtr typed_buffer = std::static_pointer_cast(info->buffer); // Return a copy or the unique_ptr (ownership) depending on how many subscriptions are left. if (target_subs->size()) { diff --git a/rclcpp/include/rclcpp/mapped_ring_buffer.hpp b/rclcpp/include/rclcpp/mapped_ring_buffer.hpp index 4b5bb47..daf6792 100644 --- a/rclcpp/include/rclcpp/mapped_ring_buffer.hpp +++ b/rclcpp/include/rclcpp/mapped_ring_buffer.hpp @@ -50,11 +50,12 @@ public: * there is no guarantee on which value is returned if a key is used multiple * times. */ -template +template> class MappedRingBuffer : public MappedRingBufferBase { public: - RCLCPP_SMART_PTR_DEFINITIONS(MappedRingBuffer); + RCLCPP_SMART_PTR_DEFINITIONS(MappedRingBuffer); + using ElemUniquePtr = std::unique_ptr; /// Constructor. /* The constructor will allocate memory while reserving space. @@ -82,12 +83,12 @@ public: * \param value if the key is found, the value is stored in this parameter */ void - get_copy_at_key(uint64_t key, std::unique_ptr & value) + get_copy_at_key(uint64_t key, ElemUniquePtr & value) { auto it = get_iterator_of_key(key); value = nullptr; if (it != elements_.end() && it->in_use) { - value = std::unique_ptr(new T(*it->value)); + value = ElemUniquePtr(new T(*it->value)); } } @@ -111,13 +112,13 @@ public: * \param value if the key is found, the value is stored in this parameter */ void - get_ownership_at_key(uint64_t key, std::unique_ptr & value) + get_ownership_at_key(uint64_t key, ElemUniquePtr & value) { auto it = get_iterator_of_key(key); value = nullptr; if (it != elements_.end() && it->in_use) { // Make a copy. - auto copy = std::unique_ptr(new T(*it->value)); + auto copy = ElemUniquePtr(new T(*it->value)); // Return the original. value.swap(it->value); // Store the copy. @@ -136,7 +137,7 @@ public: * \param value if the key is found, the value is stored in this parameter */ void - pop_at_key(uint64_t key, std::unique_ptr & value) + pop_at_key(uint64_t key, ElemUniquePtr & value) { auto it = get_iterator_of_key(key); value = nullptr; @@ -158,7 +159,7 @@ public: * \param value the value to store, and optionally the value displaced */ bool - push_and_replace(uint64_t key, std::unique_ptr & value) + push_and_replace(uint64_t key, ElemUniquePtr & value) { bool did_replace = elements_[head_].in_use; elements_[head_].key = key; @@ -169,9 +170,9 @@ public: } bool - push_and_replace(uint64_t key, std::unique_ptr && value) + push_and_replace(uint64_t key, ElemUniquePtr && value) { - std::unique_ptr temp = std::move(value); + ElemUniquePtr temp = std::move(value); return push_and_replace(key, temp); } @@ -183,12 +184,12 @@ public: } private: - RCLCPP_DISABLE_COPY(MappedRingBuffer); + RCLCPP_DISABLE_COPY(MappedRingBuffer); struct element { uint64_t key; - std::unique_ptr value; + ElemUniquePtr value; bool in_use; }; diff --git a/rclcpp/include/rclcpp/message_memory_strategy.hpp b/rclcpp/include/rclcpp/message_memory_strategy.hpp index 3ef46d8..25c8c2d 100644 --- a/rclcpp/include/rclcpp/message_memory_strategy.hpp +++ b/rclcpp/include/rclcpp/message_memory_strategy.hpp @@ -17,6 +17,7 @@ #include +#include #include namespace rclcpp @@ -26,24 +27,34 @@ namespace message_memory_strategy /// Default allocation strategy for messages received by subscriptions. // A message memory strategy must be templated on the type of the subscription it belongs to. -template +template> class MessageMemoryStrategy { public: RCLCPP_SMART_PTR_DEFINITIONS(MessageMemoryStrategy); + using MessageAlloc = allocator::AllocRebind; + using MessageDeleter = allocator::Deleter; + + MessageMemoryStrategy(std::shared_ptr allocator) + { + message_allocator_ = new typename MessageAlloc::allocator_type(*allocator.get()); + } + /// Default factory method static SharedPtr create_default() { - return SharedPtr(new MessageMemoryStrategy); + return std::make_shared>(std::make_shared()); } /// By default, dynamically allocate a new message. // \return Shared pointer to the new message. virtual std::shared_ptr borrow_message() { - return std::shared_ptr(new MessageT); + auto ptr = MessageAlloc::allocate(*message_allocator_, 1); + MessageAlloc::construct(*message_allocator_, ptr); + return std::shared_ptr(ptr, message_deleter_); } /// Release ownership of the message, which will deallocate it if it has no more owners. @@ -52,6 +63,9 @@ public: { msg.reset(); } + + typename MessageAlloc::allocator_type * message_allocator_; + MessageDeleter message_deleter_; }; } /* message_memory_strategy */ diff --git a/rclcpp/include/rclcpp/node.hpp b/rclcpp/include/rclcpp/node.hpp index e320651..e7d3e28 100644 --- a/rclcpp/include/rclcpp/node.hpp +++ b/rclcpp/include/rclcpp/node.hpp @@ -91,10 +91,11 @@ public: * \param[in] qos_history_depth The depth of the publisher message queue. * \return Shared pointer to the created publisher. */ - template - typename rclcpp::publisher::Publisher::SharedPtr + template> + typename rclcpp::publisher::Publisher::SharedPtr create_publisher( - const std::string & topic_name, size_t qos_history_depth); + const std::string & topic_name, size_t qos_history_depth, + std::shared_ptr allocator = nullptr); /// Create and return a Publisher. /** @@ -102,11 +103,12 @@ public: * \param[in] qos_profile The quality of service profile to pass on to the rmw implementation. * \return Shared pointer to the created publisher. */ - template - typename rclcpp::publisher::Publisher::SharedPtr + template> + typename rclcpp::publisher::Publisher::SharedPtr create_publisher( const std::string & topic_name, - const rmw_qos_profile_t & qos_profile = rmw_qos_profile_default); + const rmw_qos_profile_t & qos_profile = rmw_qos_profile_default, + std::shared_ptr allocator = nullptr); /// Create and return a Subscription. /** @@ -122,16 +124,17 @@ public: Windows build breaks when static member function passed as default argument to msg_mem_strat, nullptr is a workaround. */ - template - typename rclcpp::subscription::Subscription::SharedPtr + template> + typename rclcpp::subscription::Subscription::SharedPtr create_subscription( const std::string & topic_name, CallbackT callback, const rmw_qos_profile_t & qos_profile = rmw_qos_profile_default, rclcpp::callback_group::CallbackGroup::SharedPtr group = nullptr, bool ignore_local_publications = false, - typename rclcpp::message_memory_strategy::MessageMemoryStrategy::SharedPtr - msg_mem_strat = nullptr); + typename rclcpp::message_memory_strategy::MessageMemoryStrategy::SharedPtr + msg_mem_strat = nullptr, + std::shared_ptr allocator = nullptr); /// Create and return a Subscription. /** @@ -147,16 +150,17 @@ public: Windows build breaks when static member function passed as default argument to msg_mem_strat, nullptr is a workaround. */ - template - typename rclcpp::subscription::Subscription::SharedPtr + template> + typename rclcpp::subscription::Subscription::SharedPtr create_subscription( const std::string & topic_name, size_t qos_history_depth, CallbackT callback, rclcpp::callback_group::CallbackGroup::SharedPtr group = nullptr, bool ignore_local_publications = false, - typename rclcpp::message_memory_strategy::MessageMemoryStrategy::SharedPtr - msg_mem_strat = nullptr); + typename rclcpp::message_memory_strategy::MessageMemoryStrategy::SharedPtr + msg_mem_strat = nullptr, + std::shared_ptr allocator = nullptr); /// Create a timer. /** @@ -202,34 +206,25 @@ public: FunctorT callback, rclcpp::callback_group::CallbackGroup::SharedPtr group = nullptr); - template - using StringRebind = typename Alloc::template rebind::other; + std::vector set_parameters( + const std::vector & parameters); - template> - typename std::vector set_parameters( - const typename std::vector & parameters); - - template> rcl_interfaces::msg::SetParametersResult set_parameters_atomically( - const typename std::vector & parameters); + const std::vector & parameters); - template> - typename std::vector get_parameters( - const typename std::vector> & names) const; + std::vector get_parameters( + const std::vector & names) const; - template> - typename std::vector describe_parameters( - const typename std::vector>> & names) const; + std::vector describe_parameters( + const std::vector & names) const; - template> - typename std::vector get_parameter_types( - const typename std::vector>> & names) const; + std::vector get_parameter_types( + const std::vector & names) const; rcl_interfaces::msg::ListParametersResult list_parameters( const std::vector & prefixes, uint64_t depth) const; - template>> - typename std::map get_topic_names_and_types() const; + std::map get_topic_names_and_types() const; size_t count_publishers(const std::string & topic_name) const; diff --git a/rclcpp/include/rclcpp/node_impl.hpp b/rclcpp/include/rclcpp/node_impl.hpp index f568536..9f82a31 100644 --- a/rclcpp/include/rclcpp/node_impl.hpp +++ b/rclcpp/include/rclcpp/node_impl.hpp @@ -113,21 +113,29 @@ Node::create_callback_group( return group; } -template -typename rclcpp::publisher::Publisher::SharedPtr +template +typename rclcpp::publisher::Publisher::SharedPtr Node::create_publisher( - const std::string & topic_name, size_t qos_history_depth) + const std::string & topic_name, size_t qos_history_depth, + std::shared_ptr allocator) { + if (!allocator) { + allocator = std::make_shared(); + } rmw_qos_profile_t qos = rmw_qos_profile_default; qos.depth = qos_history_depth; - return this->create_publisher(topic_name, qos); + return this->create_publisher(topic_name, qos, allocator); } -template -typename publisher::Publisher::SharedPtr +template +typename publisher::Publisher::SharedPtr Node::create_publisher( - const std::string & topic_name, const rmw_qos_profile_t & qos_profile) + const std::string & topic_name, const rmw_qos_profile_t & qos_profile, + std::shared_ptr allocator) { + if (!allocator) { + allocator = std::make_shared(); + } using rosidl_generator_cpp::get_message_type_support_handle; auto type_support_handle = get_message_type_support_handle(); rmw_publisher_t * publisher_handle = rmw_create_publisher( @@ -140,8 +148,8 @@ Node::create_publisher( // *INDENT-ON* } - auto publisher = publisher::Publisher::make_shared( - node_handle_, publisher_handle, topic_name, qos_profile.depth); + auto publisher = publisher::Publisher::make_shared( + node_handle_, publisher_handle, topic_name, qos_profile.depth, allocator); if (use_intra_process_comms_) { rmw_publisher_t * intra_process_publisher_handle = rmw_create_publisher( @@ -157,7 +165,7 @@ Node::create_publisher( auto intra_process_manager = context_->get_sub_context(); uint64_t intra_process_publisher_id = - intra_process_manager->add_publisher(publisher); + intra_process_manager->add_publisher(publisher); rclcpp::intra_process_manager::IntraProcessManager::WeakPtr weak_ipm = intra_process_manager; // *INDENT-OFF* auto shared_publish_callback = @@ -205,25 +213,31 @@ Node::group_in_node(callback_group::CallbackGroup::SharedPtr group) return group_belongs_to_this_node; } -template -typename rclcpp::subscription::Subscription::SharedPtr +template +typename rclcpp::subscription::Subscription::SharedPtr Node::create_subscription( const std::string & topic_name, CallbackT callback, const rmw_qos_profile_t & qos_profile, rclcpp::callback_group::CallbackGroup::SharedPtr group, bool ignore_local_publications, - typename rclcpp::message_memory_strategy::MessageMemoryStrategy::SharedPtr - msg_mem_strat) + typename rclcpp::message_memory_strategy::MessageMemoryStrategy::SharedPtr + msg_mem_strat, + typename std::shared_ptr allocator) { - rclcpp::subscription::AnySubscriptionCallback any_subscription_callback; + if (!allocator) { + allocator = std::make_shared(); + } + + rclcpp::subscription::AnySubscriptionCallback any_subscription_callback(allocator); any_subscription_callback.set(callback); using rosidl_generator_cpp::get_message_type_support_handle; if (!msg_mem_strat) { msg_mem_strat = - rclcpp::message_memory_strategy::MessageMemoryStrategy::create_default(); + rclcpp::message_memory_strategy::MessageMemoryStrategy::create_default(); } auto type_support_handle = get_message_type_support_handle(); @@ -239,7 +253,7 @@ Node::create_subscription( using namespace rclcpp::subscription; - auto sub = Subscription::make_shared( + auto sub = Subscription::make_shared( node_handle_, subscriber_handle, topic_name, @@ -271,7 +285,7 @@ Node::create_subscription( uint64_t publisher_id, uint64_t message_sequence, uint64_t subscription_id, - std::unique_ptr & message) + typename Subscription::MessageUniquePtr & message) { auto ipm = weak_ipm.lock(); if (!ipm) { @@ -306,26 +320,28 @@ Node::create_subscription( return sub; } -template -typename rclcpp::subscription::Subscription::SharedPtr +template +typename rclcpp::subscription::Subscription::SharedPtr Node::create_subscription( const std::string & topic_name, size_t qos_history_depth, CallbackT callback, rclcpp::callback_group::CallbackGroup::SharedPtr group, bool ignore_local_publications, - typename rclcpp::message_memory_strategy::MessageMemoryStrategy::SharedPtr - msg_mem_strat) + typename rclcpp::message_memory_strategy::MessageMemoryStrategy::SharedPtr + msg_mem_strat, + std::shared_ptr allocator) { rmw_qos_profile_t qos = rmw_qos_profile_default; qos.depth = qos_history_depth; - return this->create_subscription( + return this->create_subscription( topic_name, callback, qos, group, ignore_local_publications, - msg_mem_strat); + msg_mem_strat, + allocator); } rclcpp::timer::WallTimer::SharedPtr @@ -443,7 +459,6 @@ Node::create_service( return serv; } -template std::vector Node::set_parameters( const std::vector & parameters) @@ -456,7 +471,6 @@ Node::set_parameters( return results; } -template rcl_interfaces::msg::SetParametersResult Node::set_parameters_atomically( const std::vector & parameters) @@ -491,7 +505,6 @@ Node::set_parameters_atomically( return result; } -template std::vector Node::get_parameters( const std::vector & names) const @@ -511,7 +524,6 @@ Node::get_parameters( return results; } -template std::vector Node::describe_parameters( const std::vector & names) const @@ -532,7 +544,6 @@ Node::describe_parameters( return results; } -template std::vector Node::get_parameter_types( const std::vector & names) const diff --git a/rclcpp/include/rclcpp/publisher.hpp b/rclcpp/include/rclcpp/publisher.hpp index 5670059..0c706e1 100644 --- a/rclcpp/include/rclcpp/publisher.hpp +++ b/rclcpp/include/rclcpp/publisher.hpp @@ -205,23 +205,28 @@ protected: }; /// A publisher publishes messages of any type to a topic. -template class Alloc = std::allocator> +template> class Publisher : public PublisherBase { friend rclcpp::node::Node; public: + using MessageAlloc = allocator::AllocRebind; + using MessageDeleter = allocator::Deleter; + using MessageUniquePtr = std::unique_ptr; + RCLCPP_SMART_PTR_DEFINITIONS(Publisher); Publisher( std::shared_ptr node_handle, rmw_publisher_t * publisher_handle, std::string topic, - size_t queue_size) + size_t queue_size, + std::shared_ptr allocator) : PublisherBase(node_handle, publisher_handle, topic, queue_size) { - // TODO: avoid messy initialization - allocator::set_allocator_for_deleter(&message_deleter_, &message_allocator_); + message_allocator_ = new typename MessageAlloc::allocator_type(*allocator.get()); + allocator::set_allocator_for_deleter(&message_deleter_, message_allocator_); } @@ -231,7 +236,7 @@ public: * \param[in] msg A shared pointer to the message to send. */ void - publish(std::unique_ptr & msg) + publish(MessageUniquePtr & msg) { this->do_inter_process_publish(msg.get()); if (store_intra_process_message_) { @@ -279,9 +284,9 @@ public: // The intra process manager should probably also be able to store // shared_ptr's and do the "smart" thing based on other intra process // subscriptions. For now call the other publish(). - auto ptr = std::allocator_traits>::allocate(message_allocator_, 1); - std::allocator_traits>::construct(message_allocator_, ptr, *msg.get()); - std::unique_ptr, MessageT>> unique_msg(ptr, message_deleter_); + auto ptr = MessageAlloc::allocate(*message_allocator_, 1); + MessageAlloc::construct(*message_allocator_, ptr, *msg.get()); + MessageUniquePtr unique_msg(ptr, message_deleter_); return this->publish(unique_msg); } @@ -298,9 +303,9 @@ public: // The intra process manager should probably also be able to store // shared_ptr's and do the "smart" thing based on other intra process // subscriptions. For now call the other publish(). - auto ptr = std::allocator_traits>::allocate(message_allocator_, 1); - std::allocator_traits>::construct(message_allocator_, ptr, *msg.get()); - std::unique_ptr, MessageT>> unique_msg(ptr, message_deleter_); + auto ptr = MessageAlloc::allocate(*message_allocator_, 1); + MessageAlloc::construct(*message_allocator_, ptr, *msg.get()); + MessageUniquePtr unique_msg(ptr, message_deleter_); return this->publish(unique_msg); } @@ -313,9 +318,9 @@ public: return this->do_inter_process_publish(&msg); } // Otherwise we have to allocate memory in a unique_ptr and pass it along. - auto ptr = std::allocator_traits>::allocate(message_allocator_, 1); - std::allocator_traits>::construct(message_allocator_, ptr, msg); - std::unique_ptr, MessageT>> unique_msg(ptr, message_deleter_); + auto ptr = MessageAlloc::allocate(*message_allocator_, 1); + MessageAlloc::construct(*message_allocator_, ptr, msg); + MessageUniquePtr unique_msg(ptr, message_deleter_); return this->publish(unique_msg); } @@ -332,9 +337,9 @@ protected: } } - Alloc message_allocator_; + typename MessageAlloc::allocator_type * message_allocator_; - allocator::Deleter, MessageT> message_deleter_; + MessageDeleter message_deleter_; }; diff --git a/rclcpp/include/rclcpp/subscription.hpp b/rclcpp/include/rclcpp/subscription.hpp index b7250e6..7f148c3 100644 --- a/rclcpp/include/rclcpp/subscription.hpp +++ b/rclcpp/include/rclcpp/subscription.hpp @@ -144,12 +144,16 @@ private: using namespace any_subscription_callback; /// Subscription implementation, templated on the type of message this subscription receives. -template +template> class Subscription : public SubscriptionBase { friend class rclcpp::node::Node; public: + using MessageAlloc = allocator::AllocRebind; + using MessageDeleter = allocator::Deleter; + using MessageUniquePtr = std::unique_ptr; + RCLCPP_SMART_PTR_DEFINITIONS(Subscription); /// Default constructor. @@ -167,15 +171,17 @@ public: rmw_subscription_t * subscription_handle, const std::string & topic_name, bool ignore_local_publications, - AnySubscriptionCallback callback, - typename message_memory_strategy::MessageMemoryStrategy::SharedPtr memory_strategy = - message_memory_strategy::MessageMemoryStrategy::create_default()) + AnySubscriptionCallback callback, + typename message_memory_strategy::MessageMemoryStrategy::SharedPtr memory_strategy = + message_memory_strategy::MessageMemoryStrategy::create_default()) : SubscriptionBase(node_handle, subscription_handle, topic_name, ignore_local_publications), any_callback_(callback), message_memory_strategy_(memory_strategy), get_intra_process_message_callback_(nullptr), matches_any_intra_process_publishers_(nullptr) - {} + { + } /// Support dynamically setting the message memory strategy. /** @@ -183,7 +189,8 @@ public: * \param[in] message_memory_strategy Shared pointer to the memory strategy to set. */ void set_message_memory_strategy( - typename message_memory_strategy::MessageMemoryStrategy::SharedPtr message_memory_strategy) + typename message_memory_strategy::MessageMemoryStrategy::SharedPtr message_memory_strategy) { message_memory_strategy_ = message_memory_strategy; } @@ -226,7 +233,7 @@ public: // However, this can only really happen if this node has it disabled, but the other doesn't. return; } - std::unique_ptr msg; + MessageUniquePtr msg; get_intra_process_message_callback_( ipm.publisher_id, ipm.message_sequence, @@ -244,7 +251,7 @@ public: private: typedef std::function< - void (uint64_t, uint64_t, uint64_t, std::unique_ptr &) + void (uint64_t, uint64_t, uint64_t, MessageUniquePtr &) > GetMessageCallbackType; typedef std::function MatchesAnyPublishersCallbackType; @@ -262,14 +269,13 @@ private: RCLCPP_DISABLE_COPY(Subscription); - AnySubscriptionCallback any_callback_; - typename message_memory_strategy::MessageMemoryStrategy::SharedPtr + AnySubscriptionCallback any_callback_; + typename message_memory_strategy::MessageMemoryStrategy::SharedPtr message_memory_strategy_; GetMessageCallbackType get_intra_process_message_callback_; MatchesAnyPublishersCallbackType matches_any_intra_process_publishers_; uint64_t intra_process_subscription_id_; - }; } /* namespace subscription */ diff --git a/rclcpp/test/test_intra_process_manager.cpp b/rclcpp/test/test_intra_process_manager.cpp index 4ce7f90..3cae9bf 100644 --- a/rclcpp/test/test_intra_process_manager.cpp +++ b/rclcpp/test/test_intra_process_manager.cpp @@ -55,11 +55,11 @@ public: size_t mock_queue_size; }; -template +template> class Publisher : public PublisherBase { public: - RCLCPP_SMART_PTR_DEFINITIONS(Publisher); + RCLCPP_SMART_PTR_DEFINITIONS(Publisher); }; } @@ -139,8 +139,10 @@ TEST(TestIntraProcessManager, nominal) { s1->mock_topic_name = "nominal1"; s1->mock_queue_size = 10; - auto p1_id = ipm.add_publisher(p1); - auto p2_id = ipm.add_publisher(p2); + auto p1_id = + ipm.add_publisher>(p1); + auto p2_id = + ipm.add_publisher>(p2); auto s1_id = ipm.add_subscription(s1); auto ipm_msg = std::make_shared(); @@ -221,7 +223,8 @@ TEST(TestIntraProcessManager, remove_publisher_before_trying_to_take) { s1->mock_topic_name = "nominal1"; s1->mock_queue_size = 10; - auto p1_id = ipm.add_publisher(p1); + auto p1_id = + ipm.add_publisher>(p1); auto s1_id = ipm.add_subscription(s1); auto ipm_msg = std::make_shared(); @@ -269,7 +272,8 @@ TEST(TestIntraProcessManager, removed_subscription_affects_take) { s3->mock_topic_name = "nominal1"; s3->mock_queue_size = 10; - auto p1_id = ipm.add_publisher(p1); + auto p1_id = + ipm.add_publisher>(p1); auto s1_id = ipm.add_subscription(s1); auto s2_id = ipm.add_subscription(s2); auto s3_id = ipm.add_subscription(s3); @@ -338,7 +342,8 @@ TEST(TestIntraProcessManager, multiple_subscriptions_one_publisher) { s3->mock_topic_name = "nominal1"; s3->mock_queue_size = 10; - auto p1_id = ipm.add_publisher(p1); + auto p1_id = + ipm.add_publisher>(p1); auto s1_id = ipm.add_subscription(s1); auto s2_id = ipm.add_subscription(s2); auto s3_id = ipm.add_subscription(s3); @@ -410,9 +415,12 @@ TEST(TestIntraProcessManager, multiple_publishers_one_subscription) { s1->mock_topic_name = "nominal1"; s1->mock_queue_size = 10; - auto p1_id = ipm.add_publisher(p1); - auto p2_id = ipm.add_publisher(p2); - auto p3_id = ipm.add_publisher(p3); + auto p1_id = + ipm.add_publisher>(p1); + auto p2_id = + ipm.add_publisher>(p2); + auto p3_id = + ipm.add_publisher>(p3); auto s1_id = ipm.add_subscription(s1); auto ipm_msg = std::make_shared(); @@ -512,9 +520,12 @@ TEST(TestIntraProcessManager, multiple_publishers_multiple_subscription) { s3->mock_topic_name = "nominal1"; s3->mock_queue_size = 10; - auto p1_id = ipm.add_publisher(p1); - auto p2_id = ipm.add_publisher(p2); - auto p3_id = ipm.add_publisher(p3); + auto p1_id = + ipm.add_publisher>(p1); + auto p2_id = + ipm.add_publisher>(p2); + auto p3_id = + ipm.add_publisher>(p3); auto s1_id = ipm.add_subscription(s1); auto s2_id = ipm.add_subscription(s2); auto s3_id = ipm.add_subscription(s3); @@ -655,7 +666,8 @@ TEST(TestIntraProcessManager, ring_buffer_displacement) { s1->mock_topic_name = "nominal1"; s1->mock_queue_size = 10; - auto p1_id = ipm.add_publisher(p1); + auto p1_id = + ipm.add_publisher>(p1); auto s1_id = ipm.add_subscription(s1); auto ipm_msg = std::make_shared(); @@ -720,7 +732,8 @@ TEST(TestIntraProcessManager, subscription_creation_race_condition) { p1->mock_topic_name = "nominal1"; p1->mock_queue_size = 2; - auto p1_id = ipm.add_publisher(p1); + auto p1_id = + ipm.add_publisher>(p1); auto ipm_msg = std::make_shared(); ipm_msg->message_sequence = 42; @@ -767,7 +780,7 @@ TEST(TestIntraProcessManager, publisher_out_of_scope_take) { p1->mock_topic_name = "nominal1"; p1->mock_queue_size = 2; - p1_id = ipm.add_publisher(p1); + p1_id = ipm.add_publisher>(p1); auto ipm_msg = std::make_shared(); ipm_msg->message_sequence = 42; @@ -805,7 +818,7 @@ TEST(TestIntraProcessManager, publisher_out_of_scope_store) { p1->mock_topic_name = "nominal1"; p1->mock_queue_size = 2; - p1_id = ipm.add_publisher(p1); + p1_id = ipm.add_publisher>(p1); } auto ipm_msg = std::make_shared();