rebind allocator in mapped_ring_buffer

This commit is contained in:
Jackie Kay 2015-10-22 15:58:00 -07:00
parent 0cd13608f7
commit ea21d9263a
11 changed files with 143 additions and 81 deletions

View file

@ -12,8 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.
#ifndef RCLCPP_RCLCPP_ALLOCATOR_COMMON_HPP_
#define RCLCPP_RCLCPP_ALLOCATOR_COMMON_HPP_
#ifndef RCLCPP_RCLCPP__ALLOCATOR_COMMON__HPP_
#define RCLCPP_RCLCPP__ALLOCATOR_COMMON__HPP_
#include <memory>

View file

@ -50,7 +50,8 @@ public:
void operator()(U * ptr)
{
std::allocator_traits<AllocRebind<U>>::destroy(*allocator_, ptr);
std::allocator_traits<AllocRebind<U>>::deallocate(*allocator_, ptr, sizeof(U));
// TODO: figure out if we're guaranteed to be destroying only 1 item here
std::allocator_traits<AllocRebind<U>>::deallocate(*allocator_, ptr, 1);
ptr = NULL;
}

View file

@ -33,9 +33,10 @@ namespace any_subscription_callback
template<typename MessageT, typename Alloc>
class AnySubscriptionCallback
{
using MessageAlloc = allocator::AllocRebind<MessageT, Alloc>;
using MessageDeleter = allocator::Deleter<typename MessageAlloc::allocator_type, MessageT>;
using UniquePtr = std::unique_ptr<MessageT, MessageDeleter>;
using MessageAllocTraits = allocator::AllocRebind<MessageT, Alloc>;
using MessageAlloc = typename MessageAllocTraits::allocator_type;
using MessageDeleter = allocator::Deleter<MessageAlloc, MessageT>;
using MessageUniquePtr = std::unique_ptr<MessageT, MessageDeleter>;
using SharedPtrCallback = std::function<void(const std::shared_ptr<MessageT>)>;
using SharedPtrWithInfoCallback =
@ -43,8 +44,8 @@ class AnySubscriptionCallback
using ConstSharedPtrCallback = std::function<void(const std::shared_ptr<const MessageT>)>;
using ConstSharedPtrWithInfoCallback =
std::function<void(const std::shared_ptr<const MessageT>, const rmw_message_info_t &)>;
using UniquePtrCallback = std::function<void(UniquePtr)>;
using UniquePtrWithInfoCallback = std::function<void(UniquePtr, const rmw_message_info_t &)>;
using UniquePtrCallback = std::function<void(MessageUniquePtr)>;
using UniquePtrWithInfoCallback = std::function<void(MessageUniquePtr, const rmw_message_info_t &)>;
SharedPtrCallback shared_ptr_callback_;
SharedPtrWithInfoCallback shared_ptr_with_info_callback_;
@ -59,8 +60,8 @@ public:
const_shared_ptr_callback_(nullptr), const_shared_ptr_with_info_callback_(nullptr),
unique_ptr_callback_(nullptr), unique_ptr_with_info_callback_(nullptr)
{
message_allocator_ = new typename MessageAlloc::allocator_type(*allocator.get());
allocator::set_allocator_for_deleter(&message_deleter_, message_allocator_);
message_allocator_ = std::make_shared<MessageAlloc>(*allocator.get());
allocator::set_allocator_for_deleter(&message_deleter_, message_allocator_.get());
}
AnySubscriptionCallback(const AnySubscriptionCallback &) = default;
@ -162,20 +163,20 @@ public:
} else if (const_shared_ptr_with_info_callback_) {
const_shared_ptr_with_info_callback_(message, message_info);
} else if (unique_ptr_callback_) {
auto ptr = MessageAlloc::allocate(*message_allocator_, 1);
MessageAlloc::construct(*message_allocator_, ptr, *message);
unique_ptr_callback_(UniquePtr(ptr, message_deleter_));
auto ptr = MessageAllocTraits::allocate(*message_allocator_.get(), 1);
MessageAllocTraits::construct(*message_allocator_.get(), ptr, *message);
unique_ptr_callback_(MessageUniquePtr(ptr, message_deleter_));
} else if (unique_ptr_with_info_callback_) {
auto ptr = MessageAlloc::allocate(*message_allocator_, 1);
MessageAlloc::construct(*message_allocator_, ptr, *message);
unique_ptr_with_info_callback_(UniquePtr(ptr, message_deleter_), message_info);
auto ptr = MessageAllocTraits::allocate(*message_allocator_.get(), 1);
MessageAllocTraits::construct(*message_allocator_.get(), ptr, *message);
unique_ptr_with_info_callback_(MessageUniquePtr(ptr, message_deleter_), message_info);
} else {
throw std::runtime_error("unexpected message without any callback set");
}
}
void dispatch_intra_process(
UniquePtr & message, const rmw_message_info_t & message_info)
MessageUniquePtr & message, const rmw_message_info_t & message_info)
{
(void)message_info;
if (shared_ptr_callback_) {
@ -200,7 +201,7 @@ public:
}
private:
typename MessageAlloc::allocator_type * message_allocator_;
std::shared_ptr<MessageAlloc> message_allocator_;
MessageDeleter message_deleter_;
};

View file

@ -201,9 +201,10 @@ public:
throw std::invalid_argument("the calculated buffer size is too large");
}
publishers_[id].sequence_number.store(0);
using Deleter = typename publisher::Publisher<MessageT, Alloc>::MessageDeleter;
publishers_[id].buffer = mapped_ring_buffer::MappedRingBuffer<MessageT, Deleter>::make_shared(
size);
publishers_[id].buffer =
mapped_ring_buffer::MappedRingBuffer<MessageT,
typename publisher::Publisher<MessageT, Alloc>::MessageAlloc>::make_shared(
size, publisher->get_allocator());
publishers_[id].target_subscriptions_by_message_sequence.reserve(size);
return id;
}
@ -249,11 +250,13 @@ public:
* \param message the message that is being stored.
* \return the message sequence number.
*/
template<typename MessageT, typename Deleter = std::default_delete<MessageT>>
template<typename MessageT, typename Alloc = std::allocator<void>>
uint64_t
store_intra_process_message(
uint64_t intra_process_publisher_id,
std::unique_ptr<MessageT, Deleter> & message)
std::unique_ptr<MessageT,
typename allocator::Deleter<typename std::allocator_traits<Alloc>::template rebind_alloc<MessageT>,
MessageT>> & message)
{
auto it = publishers_.find(intra_process_publisher_id);
if (it == publishers_.end()) {
@ -263,7 +266,8 @@ public:
// Calculate the next message sequence number.
uint64_t message_seq = info.sequence_number.fetch_add(1, std::memory_order_relaxed);
// Insert the message into the ring buffer using the message_seq to identify it.
typedef typename mapped_ring_buffer::MappedRingBuffer<MessageT, Deleter> TypedMRB;
using MRBMessageAlloc = typename std::allocator_traits<Alloc>::template rebind_alloc<MessageT>;
typedef typename mapped_ring_buffer::MappedRingBuffer<MessageT, MRBMessageAlloc> TypedMRB;
typename TypedMRB::SharedPtr typed_buffer = std::static_pointer_cast<TypedMRB>(info.buffer);
bool did_replace = typed_buffer->push_and_replace(message_seq, message);
// TODO(wjwwood): do something when a message was displaced. log debug?
@ -323,13 +327,15 @@ public:
* \param requesting_subscriptions_intra_process_id the subscription's id.
* \param message the message typed unique_ptr used to return the message.
*/
template<typename MessageT, typename Deleter = std::default_delete<MessageT>>
template<typename MessageT, typename Alloc = std::allocator<void>>
void
take_intra_process_message(
uint64_t intra_process_publisher_id,
uint64_t message_sequence_number,
uint64_t requesting_subscriptions_intra_process_id,
std::unique_ptr<MessageT, Deleter> & message)
std::unique_ptr<MessageT,
typename allocator::Deleter<typename std::allocator_traits<Alloc>::template rebind_alloc<MessageT>,
MessageT>> & message)
{
message = nullptr;
PublisherInfo * info;
@ -361,7 +367,8 @@ public:
}
target_subs->erase(it);
}
typedef typename mapped_ring_buffer::MappedRingBuffer<MessageT, Deleter> TypedMRB;
using MRBMessageAlloc = typename std::allocator_traits<Alloc>::template rebind_alloc<MessageT>;
typedef typename mapped_ring_buffer::MappedRingBuffer<MessageT, MRBMessageAlloc> TypedMRB;
typename TypedMRB::SharedPtr typed_buffer = std::static_pointer_cast<TypedMRB>(info->buffer);
// Return a copy or the unique_ptr (ownership) depending on how many subscriptions are left.
if (target_subs->size()) {

View file

@ -15,6 +15,7 @@
#ifndef RCLCPP_RCLCPP_RING_BUFFER_HPP_
#define RCLCPP_RCLCPP_RING_BUFFER_HPP_
#include <rclcpp/allocator/allocator_common.hpp>
#include <rclcpp/macros.hpp>
#include <algorithm>
@ -50,25 +51,35 @@ public:
* there is no guarantee on which value is returned if a key is used multiple
* times.
*/
template<typename T, typename Deleter = std::default_delete<T>>
template<typename T, typename Alloc = std::allocator<void>>
class MappedRingBuffer : public MappedRingBufferBase
{
public:
RCLCPP_SMART_PTR_DEFINITIONS(MappedRingBuffer<T, Deleter>);
using ElemUniquePtr = std::unique_ptr<T, Deleter>;
RCLCPP_SMART_PTR_DEFINITIONS(MappedRingBuffer<T, Alloc>);
using ElemAllocTraits = allocator::AllocRebind<T, Alloc>;
using ElemAlloc = typename ElemAllocTraits::allocator_type;
using ElemDeleter = allocator::Deleter<ElemAlloc, T>;
using ElemUniquePtr = std::unique_ptr<T, ElemDeleter>;
/// Constructor.
/* The constructor will allocate memory while reserving space.
*
* \param size size of the ring buffer; must be positive and non-zero.
*/
MappedRingBuffer(size_t size)
MappedRingBuffer(size_t size, std::shared_ptr<Alloc> allocator = nullptr)
: elements_(size), head_(0)
{
if (size == 0) {
throw std::invalid_argument("size must be a positive, non-zero value");
}
if (!allocator) {
allocator_ = std::make_shared<ElemAlloc>();
} else {
allocator_ = std::make_shared<ElemAlloc>(*allocator.get());
}
}
virtual ~MappedRingBuffer() {}
/// Return a copy of the value stored in the ring buffer at the given key.
@ -88,7 +99,9 @@ public:
auto it = get_iterator_of_key(key);
value = nullptr;
if (it != elements_.end() && it->in_use) {
value = ElemUniquePtr(new T(*it->value));
auto ptr = ElemAllocTraits::allocate(*allocator_.get(), 1);
ElemAllocTraits::construct(*allocator_.get(), ptr, *it->value);
value = ElemUniquePtr(ptr);
}
}
@ -118,7 +131,9 @@ public:
value = nullptr;
if (it != elements_.end() && it->in_use) {
// Make a copy.
auto copy = ElemUniquePtr(new T(*it->value));
auto ptr = ElemAllocTraits::allocate(*allocator_.get(), 1);
ElemAllocTraits::construct(*allocator_.get(), ptr, *it->value);
auto copy = ElemUniquePtr(ptr);
// Return the original.
value.swap(it->value);
// Store the copy.
@ -184,7 +199,7 @@ public:
}
private:
RCLCPP_DISABLE_COPY(MappedRingBuffer<T, Deleter>);
RCLCPP_DISABLE_COPY(MappedRingBuffer<T, Alloc>);
struct element
{
@ -193,7 +208,9 @@ private:
bool in_use;
};
typename std::vector<element>::iterator
using VectorAlloc = typename std::allocator_traits<Alloc>::template rebind_alloc<element>;
typename std::vector<element, VectorAlloc>::iterator
get_iterator_of_key(uint64_t key)
{
// *INDENT-OFF* (prevent uncrustify from making unecessary indents here)
@ -204,8 +221,9 @@ private:
return it;
}
std::vector<element> elements_;
std::vector<element, VectorAlloc> elements_;
size_t head_;
std::shared_ptr<ElemAlloc> allocator_;
};

View file

@ -34,17 +34,18 @@ class MessageMemoryStrategy
public:
RCLCPP_SMART_PTR_DEFINITIONS(MessageMemoryStrategy);
using MessageAlloc = allocator::AllocRebind<MessageT, Alloc>;
using MessageDeleter = allocator::Deleter<typename MessageAlloc::allocator_type, MessageT>;
using MessageAllocTraits = allocator::AllocRebind<MessageT, Alloc>;
using MessageAlloc = typename MessageAllocTraits::allocator_type;
using MessageDeleter = allocator::Deleter<MessageAlloc, MessageT>;
MessageMemoryStrategy()
{
message_allocator_ = new typename MessageAlloc::allocator_type();
message_allocator_ = std::make_shared<MessageAlloc>();
}
MessageMemoryStrategy(std::shared_ptr<Alloc> allocator)
{
message_allocator_ = new typename MessageAlloc::allocator_type(*allocator.get());
message_allocator_ = std::make_shared<MessageAlloc>(*allocator.get());
}
/// Default factory method
@ -57,9 +58,10 @@ public:
// \return Shared pointer to the new message.
virtual std::shared_ptr<MessageT> borrow_message()
{
auto ptr = MessageAlloc::allocate(*message_allocator_, 1);
MessageAlloc::construct(*message_allocator_, ptr);
auto ptr = MessageAllocTraits::allocate(*message_allocator_.get(), 1);
MessageAllocTraits::construct(*message_allocator_.get(), ptr);
return std::shared_ptr<MessageT>(ptr, message_deleter_);
//return std::allocate_shared<MessageT, typename MessageAlloc::allocator_type>(*message_allocator_);
}
/// Release ownership of the message, which will deallocate it if it has no more owners.
@ -69,7 +71,7 @@ public:
msg.reset();
}
typename MessageAlloc::allocator_type * message_allocator_;
std::shared_ptr<MessageAlloc> message_allocator_;
MessageDeleter message_deleter_;
};

View file

@ -187,8 +187,9 @@ Node::create_publisher(
"' is incompatible from the publisher type '" + message_type_info.name() + "'");
}
MessageT * typed_message_ptr = static_cast<MessageT *>(msg);
std::unique_ptr<MessageT> unique_msg(typed_message_ptr);
uint64_t message_seq = ipm->store_intra_process_message(publisher_id, unique_msg);
using MessageDeleter = typename publisher::Publisher<MessageT, Alloc>::MessageDeleter;
std::unique_ptr<MessageT, MessageDeleter> unique_msg(typed_message_ptr);
uint64_t message_seq = ipm->store_intra_process_message<MessageT, Alloc>(publisher_id, unique_msg);
return message_seq;
};
// *INDENT-ON*
@ -293,7 +294,7 @@ Node::create_subscription(
throw std::runtime_error(
"intra process take called after destruction of intra process manager");
}
ipm->take_intra_process_message(publisher_id, message_sequence, subscription_id, message);
ipm->take_intra_process_message<MessageT, Alloc>(publisher_id, message_sequence, subscription_id, message);
},
[weak_ipm](const rmw_gid_t * sender_gid) -> bool {
auto ipm = weak_ipm.lock();

View file

@ -211,8 +211,9 @@ class Publisher : public PublisherBase
friend rclcpp::node::Node;
public:
using MessageAlloc = allocator::AllocRebind<MessageT, Alloc>;
using MessageDeleter = allocator::Deleter<typename MessageAlloc::allocator_type, MessageT>;
using MessageAllocTraits = allocator::AllocRebind<MessageT, Alloc>;
using MessageAlloc = typename MessageAllocTraits::allocator_type;
using MessageDeleter = allocator::Deleter<MessageAlloc, MessageT>;
using MessageUniquePtr = std::unique_ptr<MessageT, MessageDeleter>;
RCLCPP_SMART_PTR_DEFINITIONS(Publisher<MessageT, Alloc>);
@ -225,8 +226,8 @@ public:
std::shared_ptr<Alloc> allocator)
: PublisherBase(node_handle, publisher_handle, topic, queue_size)
{
message_allocator_ = new typename MessageAlloc::allocator_type(*allocator.get());
allocator::set_allocator_for_deleter(&message_deleter_, message_allocator_);
message_allocator_ = std::make_shared<MessageAlloc>(*allocator.get());
allocator::set_allocator_for_deleter(&message_deleter_, message_allocator_.get());
}
@ -237,7 +238,6 @@ public:
*/
void
publish(std::unique_ptr<MessageT, MessageDeleter> & msg)
//publish(MessageUniquePtr & msg)
{
this->do_inter_process_publish(msg.get());
if (store_intra_process_message_) {
@ -285,8 +285,8 @@ public:
// The intra process manager should probably also be able to store
// shared_ptr's and do the "smart" thing based on other intra process
// subscriptions. For now call the other publish().
auto ptr = MessageAlloc::allocate(*message_allocator_, 1);
MessageAlloc::construct(*message_allocator_, ptr, *msg.get());
auto ptr = MessageAllocTraits::allocate(*message_allocator_.get(), 1);
MessageAllocTraits::construct(*message_allocator_.get(), ptr, *msg.get());
MessageUniquePtr unique_msg(ptr, message_deleter_);
return this->publish(unique_msg);
}
@ -304,8 +304,8 @@ public:
// The intra process manager should probably also be able to store
// shared_ptr's and do the "smart" thing based on other intra process
// subscriptions. For now call the other publish().
auto ptr = MessageAlloc::allocate(*message_allocator_, 1);
MessageAlloc::construct(*message_allocator_, ptr, *msg.get());
auto ptr = MessageAllocTraits::allocate(*message_allocator_.get(), 1);
MessageAllocTraits::construct(*message_allocator_.get(), ptr, *msg.get());
MessageUniquePtr unique_msg(ptr, message_deleter_);
return this->publish(unique_msg);
}
@ -319,12 +319,17 @@ public:
return this->do_inter_process_publish(&msg);
}
// Otherwise we have to allocate memory in a unique_ptr and pass it along.
auto ptr = MessageAlloc::allocate(*message_allocator_, 1);
MessageAlloc::construct(*message_allocator_, ptr, msg);
auto ptr = MessageAllocTraits::allocate(*message_allocator_.get(), 1);
MessageAllocTraits::construct(*message_allocator_.get(), ptr, msg);
MessageUniquePtr unique_msg(ptr, message_deleter_);
return this->publish(unique_msg);
}
std::shared_ptr<MessageAlloc> get_allocator() const
{
return message_allocator_;
}
protected:
void
do_inter_process_publish(const MessageT * msg)
@ -338,7 +343,7 @@ protected:
}
}
typename MessageAlloc::allocator_type * message_allocator_;
std::shared_ptr<MessageAlloc> message_allocator_;
MessageDeleter message_deleter_;

View file

@ -35,21 +35,22 @@ namespace allocator_memory_strategy
* come through.
*/
template<typename Alloc>
class AllocatorMemoryStrategy : public MemoryStrategy
class AllocatorMemoryStrategy : public memory_strategy::MemoryStrategy
{
public:
RCLCPP_SMART_PTR_DEFINITIONS(AllocatorMemoryStrategy<Alloc>);
using ExecAlloc = allocator::AllocRebind<executor::AnyExecutable, Alloc>;
using ExecDeleter =
allocator::Deleter<typename ExecAlloc::allocator_type, executor::AnyExecutable>;
using VoidAlloc = allocator::AllocRebind<void *, Alloc>;
using ExecAllocTraits = allocator::AllocRebind<executor::AnyExecutable, Alloc>;
using ExecAlloc = typename ExecAllocTraits::allocator_type;
using ExecDeleter = allocator::Deleter<ExecAlloc, executor::AnyExecutable>;
using VoidAllocTraits = typename allocator::AllocRebind<void *, Alloc>;
using VoidAlloc = typename VoidAllocTraits::allocator_type;
AllocatorMemoryStrategy(std::shared_ptr<Alloc> allocator)
{
executable_allocator_ = new ExecAlloc(*allocator.get());
allocator_ = new VoidAlloc(*allocator.get());
executable_allocator_ = std::make_shared<ExecAlloc>(*allocator.get());
allocator_ = std::make_shared<VoidAlloc>(*allocator.get());
}
/// Borrow memory for storing data for subscriptions, services, clients, or guard conditions.
@ -137,8 +138,8 @@ public:
virtual executor::AnyExecutable::SharedPtr instantiate_next_executable()
{
//return std::make_shared<executor::AnyExecutable>();
auto ptr = ExecAlloc::allocate(*executable_allocator_, 1);
ExecAlloc::construct(*executable_allocator_);
auto ptr = ExecAllocTraits::allocate(*executable_allocator_.get(), 1);
ExecAllocTraits::construct(*executable_allocator_.get(), ptr);
return std::shared_ptr<executor::AnyExecutable>(ptr, executable_deleter_);
}
@ -152,7 +153,9 @@ public:
if (size == 0) {
return NULL;
}
return VoidAlloc::allocate(*allocator_, size);
auto ptr = VoidAllocTraits::allocate(*allocator_.get(), size);
alloc_map[ptr] = size;
return ptr;
}
/// Implementation of a general-purpose free.
@ -161,22 +164,34 @@ public:
*/
virtual void free(void * ptr)
{
VoidAlloc::deallocate(*allocator, ptr);
if (alloc_map.count(ptr) == 0) {
// do nothing, the pointer is not in the alloc'd map
return;
}
VoidAllocTraits::deallocate(*allocator_.get(), &ptr, alloc_map[ptr]);
}
std::vector<rclcpp::subscription::SubscriptionBase::SharedPtr, Alloc> subs;
std::vector<rclcpp::service::ServiceBase::SharedPtr, Alloc> services;
std::vector<rclcpp::client::ClientBase::SharedPtr, Alloc> clients;
template<typename U>
using VectorRebind = typename std::allocator_traits<Alloc>::template rebind_alloc<U>;
std::vector<void *, Alloc> subscription_handles;
std::vector<void *, Alloc> service_handles;
std::vector<void *, Alloc> client_handles;
std::vector<rclcpp::subscription::SubscriptionBase::SharedPtr,
VectorRebind<rclcpp::subscription::SubscriptionBase::SharedPtr>> subs;
std::vector<rclcpp::service::ServiceBase::SharedPtr,
VectorRebind<rclcpp::service::ServiceBase::SharedPtr>> services;
std::vector<rclcpp::client::ClientBase::SharedPtr,
VectorRebind<rclcpp::client::ClientBase::SharedPtr>> clients;
std::vector<void *, VoidAlloc> subscription_handles;
std::vector<void *, VoidAlloc> service_handles;
std::vector<void *, VoidAlloc> client_handles;
std::array<void *, 2> guard_cond_handles;
std::unordered_map<void *, size_t> alloc_map;
private:
typename ExecAlloc::allocator_type * executable_allocator_;
std::shared_ptr<ExecAlloc> executable_allocator_;
ExecDeleter executable_deleter_;
typename VoidAlloc::allocator_type * allocator_;
std::shared_ptr<VoidAlloc> allocator_;
};
} /* allocator_memory_strategy */

View file

@ -150,8 +150,9 @@ class Subscription : public SubscriptionBase
friend class rclcpp::node::Node;
public:
using MessageAlloc = allocator::AllocRebind<MessageT, Alloc>;
using MessageDeleter = allocator::Deleter<typename MessageAlloc::allocator_type, MessageT>;
using MessageAllocTraits = allocator::AllocRebind<MessageT, Alloc>;
using MessageAlloc = typename MessageAllocTraits::allocator_type;
using MessageDeleter = allocator::Deleter<MessageAlloc, MessageT>;
using MessageUniquePtr = std::unique_ptr<MessageT, MessageDeleter>;
RCLCPP_SMART_PTR_DEFINITIONS(Subscription);

View file

@ -59,10 +59,21 @@ template<typename T, typename Alloc = std::allocator<void>>
class Publisher : public PublisherBase
{
public:
using MessageAlloc = allocator::AllocRebind<T, Alloc>;
using MessageDeleter = allocator::Deleter<typename MessageAlloc::allocator_type, T>;
using MessageAllocTraits = allocator::AllocRebind<T, Alloc>;
using MessageAlloc = typename MessageAllocTraits::allocator_type;
using MessageDeleter = allocator::Deleter<MessageAlloc, T>;
using MessageUniquePtr = std::unique_ptr<T, MessageDeleter>;
std::shared_ptr<MessageAlloc> allocator_;
RCLCPP_SMART_PTR_DEFINITIONS(Publisher<T, Alloc>);
Publisher() {
allocator_ = std::make_shared<MessageAlloc>();
}
std::shared_ptr<MessageAlloc> get_allocator() {
return allocator_;
}
};
}