template subscriber, using rebind semantics
This commit is contained in:
parent
444e4fdae3
commit
788be0009d
13 changed files with 253 additions and 381 deletions
33
rclcpp/include/rclcpp/allocator/allocator_common.hpp
Normal file
33
rclcpp/include/rclcpp/allocator/allocator_common.hpp
Normal file
|
@ -0,0 +1,33 @@
|
|||
// Copyright 2015 Open Source Robotics Foundation, Inc.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
#ifndef RCLCPP_RCLCPP_ALLOCATOR_COMMON_HPP_
|
||||
#define RCLCPP_RCLCPP_ALLOCATOR_COMMON_HPP_
|
||||
|
||||
#include <memory>
|
||||
|
||||
#include <rclcpp/allocator/allocator_deleter.hpp>
|
||||
|
||||
namespace rclcpp
|
||||
{
|
||||
namespace allocator
|
||||
{
|
||||
|
||||
template<typename U, typename Alloc>
|
||||
using AllocRebind = typename std::allocator_traits<Alloc>::template rebind_traits<U>;
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
#endif
|
|
@ -23,9 +23,12 @@ namespace rclcpp
|
|||
namespace allocator
|
||||
{
|
||||
|
||||
template<typename T, typename Allocator>
|
||||
template<typename Allocator>
|
||||
class AllocatorDeleter
|
||||
{
|
||||
template<typename U>
|
||||
using AllocRebind = typename std::allocator_traits<Allocator>::template rebind_alloc<U>;
|
||||
|
||||
public:
|
||||
AllocatorDeleter()
|
||||
: allocator_(NULL)
|
||||
|
@ -37,16 +40,17 @@ public:
|
|||
{
|
||||
}
|
||||
|
||||
template<typename U, typename B>
|
||||
AllocatorDeleter(const AllocatorDeleter<U, B> & a)
|
||||
template<typename U>
|
||||
AllocatorDeleter(const AllocatorDeleter<U> & a)
|
||||
{
|
||||
allocator_ = a.get_allocator();
|
||||
}
|
||||
|
||||
void operator()(T * ptr)
|
||||
template<typename U>
|
||||
void operator()(U * ptr)
|
||||
{
|
||||
std::allocator_traits<Allocator>::destroy(*allocator_, ptr);
|
||||
std::allocator_traits<Allocator>::deallocate(*allocator_, ptr, sizeof(T));
|
||||
std::allocator_traits<AllocRebind<U>>::destroy(*allocator_, ptr);
|
||||
std::allocator_traits<AllocRebind<U>>::deallocate(*allocator_, ptr, sizeof(U));
|
||||
ptr = NULL;
|
||||
}
|
||||
|
||||
|
@ -64,61 +68,36 @@ private:
|
|||
Allocator * allocator_;
|
||||
};
|
||||
|
||||
/*
|
||||
template<template<typename> class Alloc, typename T, typename D>
|
||||
D initialize_deleter(Alloc<T> * alloc)
|
||||
template<typename Alloc, typename T, typename D>
|
||||
void set_allocator_for_deleter(D * deleter, Alloc * alloc)
|
||||
{
|
||||
(void) alloc;
|
||||
(void) deleter;
|
||||
throw std::runtime_error("Reached unexpected template specialization");
|
||||
}
|
||||
|
||||
template<typename T>
|
||||
std::default_delete<T> initialize_deleter(std::allocator<T> * alloc)
|
||||
{
|
||||
(void) alloc;
|
||||
return std::default_delete<T>();
|
||||
}
|
||||
|
||||
template<template<typename> class Alloc, typename T>
|
||||
AllocatorDeleter<T, Alloc<T>> initialize_deleter(Alloc<T> * alloc)
|
||||
{
|
||||
if (!alloc) {
|
||||
throw std::invalid_argument("Allocator argument was NULL");
|
||||
}
|
||||
return AllocatorDeleter<T, Alloc<T>>(alloc);
|
||||
}
|
||||
*/
|
||||
template<template<typename> class Alloc, typename T, typename D>
|
||||
void set_allocator_for_deleter(D * deleter, Alloc<T> * alloc)
|
||||
{
|
||||
(void) alloc;
|
||||
throw std::runtime_error("Reached unexpected template specialization");
|
||||
}
|
||||
|
||||
template<typename T>
|
||||
void set_allocator_for_deleter(std::default_delete<T> * deleter, std::allocator<T> * alloc)
|
||||
template<typename T, typename U>
|
||||
void set_allocator_for_deleter(std::default_delete<T> * deleter, std::allocator<U> * alloc)
|
||||
{
|
||||
(void) deleter;
|
||||
(void) alloc;
|
||||
}
|
||||
|
||||
template<template<typename> class Alloc, typename T>
|
||||
void set_allocator_for_deleter(AllocatorDeleter<T, Alloc<T>> * deleter, Alloc<T> * alloc)
|
||||
template<typename Alloc, typename T>
|
||||
void set_allocator_for_deleter(AllocatorDeleter<T> * deleter, Alloc * alloc)
|
||||
{
|
||||
if (!deleter || !alloc) {
|
||||
throw std::invalid_argument("Argument was NULL to set_allocator_for_deleter");
|
||||
}
|
||||
//return AllocatorDeleter<T, Alloc<T>>(alloc);
|
||||
deleter->set_allocator(alloc);
|
||||
}
|
||||
|
||||
template<typename Alloc, typename T>
|
||||
using Deleter = typename std::conditional<
|
||||
std::is_same<Alloc, std::allocator<T>>::value,
|
||||
std::is_same<Alloc, std::allocator<void>>::value,
|
||||
std::default_delete<T>,
|
||||
AllocatorDeleter<T, Alloc>
|
||||
AllocatorDeleter<Alloc>
|
||||
>::type;
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -1,52 +0,0 @@
|
|||
// Copyright 2015 Open Source Robotics Foundation, Inc.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
#ifndef RCLCPP_RCLCPP_ALLOCATOR_FACTORY_HPP_
|
||||
#define RCLCPP_RCLCPP_ALLOCATOR_FACTORY_HPP_
|
||||
|
||||
#include <memory>
|
||||
#include <iostream>
|
||||
#include <rclcpp/allocator/allocator_deleter.hpp>
|
||||
|
||||
template<template<typename> class Alloc, typename T, typename D>
|
||||
D initialize_deleter(Alloc<T> * alloc)
|
||||
{
|
||||
(void) alloc;
|
||||
throw std::runtime_error("Reached unexpected template specialization");
|
||||
}
|
||||
|
||||
template<typename T>
|
||||
std::default_delete<T> initialize_deleter(std::allocator<T> * alloc)
|
||||
{
|
||||
(void) alloc;
|
||||
return std::default_delete<T>();
|
||||
}
|
||||
|
||||
template<template<typename> class Alloc, typename T>
|
||||
AllocatorDeleter<T, Alloc<T>> initialize_deleter(Alloc<T> * alloc)
|
||||
{
|
||||
if (!alloc) {
|
||||
throw std::invalid_argument("Allocator argument was NULL");
|
||||
}
|
||||
return AllocatorDeleter<T, Alloc<T>>(alloc);
|
||||
}
|
||||
|
||||
template<typename T, template<typename> class Alloc>
|
||||
using Deleter = typename std::conditional<
|
||||
std::is_same<Alloc<T>, std::allocator<T>>::value,
|
||||
std::default_delete<T>,
|
||||
AllocatorDeleter<T, Alloc<T>>
|
||||
>::type;
|
||||
|
||||
#endif
|
|
@ -1,149 +0,0 @@
|
|||
// Copyright 2015 Open Source Robotics Foundation, Inc.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
#ifndef RCLCPP_RCLCPP_ALLOCATOR_WRAPPER_HPP_
|
||||
#define RCLCPP_RCLCPP_ALLOCATOR_WRAPPER_HPP_
|
||||
|
||||
#include <memory>
|
||||
#include <iostream>
|
||||
#include <rclcpp/allocator/allocator_deleter.hpp>
|
||||
|
||||
namespace rclcpp
|
||||
{
|
||||
|
||||
namespace allocator
|
||||
{
|
||||
|
||||
// Type-erased interface to AllocatorWrapper
|
||||
class AllocatorWrapper
|
||||
{
|
||||
virtual void * allocate(size_t size) = 0;
|
||||
virtual void deallocate(void * pointer, size_t size) = 0;
|
||||
// Construct will have to be through placement new, since pure virtual function can't be templated
|
||||
|
||||
virtual void destroy(T* pointer) = 0;
|
||||
}
|
||||
|
||||
template<typename T, typename Alloc>
|
||||
class TypedAllocatorWrapper : public AllocatorWrapper
|
||||
{
|
||||
public:
|
||||
/*
|
||||
using Deleter = typename std::conditional<
|
||||
std::is_same<Alloc, std::allocator<T>>::value,
|
||||
std::default_delete<T>,
|
||||
AllocatorDeleter<T, Alloc>
|
||||
>::type;
|
||||
*/
|
||||
|
||||
using DeleterT = Deleter<Alloc, T>;
|
||||
|
||||
TypedAllocatorWrapper(Alloc * allocator)
|
||||
: allocator_(allocator)
|
||||
{
|
||||
if (!allocator_) {
|
||||
throw std::invalid_argument("Allocator argument was NULL");
|
||||
}
|
||||
initialize_deleter(deleter_, allocator_);
|
||||
if (deleter_ == NULL) {
|
||||
throw std::invalid_argument("Failed to initialize deleter");
|
||||
}
|
||||
}
|
||||
|
||||
TypedAllocatorWrapper(Alloc * allocator, DeleterT * deleter)
|
||||
: allocator_(allocator), deleter_(deleter)
|
||||
{
|
||||
if (!allocator_) {
|
||||
throw std::invalid_argument("Allocator argument was NULL");
|
||||
}
|
||||
if (!deleter_) {
|
||||
throw std::invalid_argument("Deleter argument was NULL");
|
||||
}
|
||||
}
|
||||
|
||||
TypedAllocatorWrapper(Alloc & allocator)
|
||||
{
|
||||
allocator_ = &allocator;
|
||||
if (!allocator_) {
|
||||
throw std::invalid_argument("Allocator argument was NULL");
|
||||
}
|
||||
initialize_deleter(deleter_, allocator_);
|
||||
if (!deleter_) {
|
||||
throw std::invalid_argument("Failed to initialize deleter");
|
||||
}
|
||||
}
|
||||
|
||||
TypedAllocatorWrapper()
|
||||
{
|
||||
allocator_ = new Alloc();
|
||||
initialize_deleter(deleter_, allocator_);
|
||||
if (!deleter_) {
|
||||
//throw std::invalid_argument("Failed to initialize deleter");
|
||||
deleter_ = new DeleterT;
|
||||
}
|
||||
}
|
||||
|
||||
void * allocate(size_t size)
|
||||
{
|
||||
return std::allocator_traits<Alloc>::allocate(*allocator_, size);
|
||||
}
|
||||
|
||||
void deallocate(void * pointer, size_t size)
|
||||
{
|
||||
deallocate(static_cast<T*>(pointer), size);
|
||||
}
|
||||
|
||||
void deallocate(T * pointer, size_t size)
|
||||
{
|
||||
std::allocator_traits<Alloc>::deallocate(*allocator_, pointer, size);
|
||||
}
|
||||
|
||||
template<class ... Args>
|
||||
void construct(T * pointer, Args && ... args)
|
||||
{
|
||||
std::allocator_traits<Alloc>::construct(*allocator_, pointer, std::forward<Args>(args)...);
|
||||
}
|
||||
|
||||
void destroy(void * pointer)
|
||||
{
|
||||
destroy(static_cast<T*>(pointer));
|
||||
}
|
||||
|
||||
template<class T>
|
||||
void destroy(T * pointer)
|
||||
{
|
||||
std::allocator_traits<Alloc>::destroy(*allocator_, pointer);
|
||||
}
|
||||
|
||||
DeleterT * get_deleter() const
|
||||
{
|
||||
return deleter_;
|
||||
}
|
||||
Alloc * get_underlying_allocator() const
|
||||
{
|
||||
return allocator_;
|
||||
}
|
||||
|
||||
private:
|
||||
Alloc * allocator_;
|
||||
DeleterT * deleter_;
|
||||
};
|
||||
|
||||
template<typename T>
|
||||
using DefaultAllocator = TypedAllocatorWrapper<T, std::allocator<T>>;
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
#endif
|
|
@ -15,6 +15,7 @@
|
|||
#ifndef RCLCPP_RCLCPP_ANY_SUBSCRIPTION_CALLBACK_HPP_
|
||||
#define RCLCPP_RCLCPP_ANY_SUBSCRIPTION_CALLBACK_HPP_
|
||||
|
||||
#include <rclcpp/allocator/allocator_common.hpp>
|
||||
#include <rclcpp/function_traits.hpp>
|
||||
|
||||
#include <functional>
|
||||
|
@ -29,18 +30,21 @@ namespace rclcpp
|
|||
namespace any_subscription_callback
|
||||
{
|
||||
|
||||
template<typename MessageT>
|
||||
template<typename MessageT, typename Alloc>
|
||||
class AnySubscriptionCallback
|
||||
{
|
||||
using MessageAlloc = allocator::AllocRebind<MessageT, Alloc>;
|
||||
using MessageDeleter = allocator::Deleter<typename MessageAlloc::allocator_type, MessageT>;
|
||||
using UniquePtr = std::unique_ptr<MessageT, MessageDeleter>;
|
||||
|
||||
using SharedPtrCallback = std::function<void(const std::shared_ptr<MessageT>)>;
|
||||
using SharedPtrWithInfoCallback =
|
||||
std::function<void(const std::shared_ptr<MessageT>, const rmw_message_info_t &)>;
|
||||
using ConstSharedPtrCallback = std::function<void(const std::shared_ptr<const MessageT>)>;
|
||||
using ConstSharedPtrWithInfoCallback =
|
||||
std::function<void(const std::shared_ptr<const MessageT>, const rmw_message_info_t &)>;
|
||||
using UniquePtrCallback = std::function<void(std::unique_ptr<MessageT>)>;
|
||||
using UniquePtrWithInfoCallback =
|
||||
std::function<void(std::unique_ptr<MessageT>, const rmw_message_info_t &)>;
|
||||
using UniquePtrCallback = std::function<void(UniquePtr)>;
|
||||
using UniquePtrWithInfoCallback = std::function<void(UniquePtr, const rmw_message_info_t &)>;
|
||||
|
||||
SharedPtrCallback shared_ptr_callback_;
|
||||
SharedPtrWithInfoCallback shared_ptr_with_info_callback_;
|
||||
|
@ -50,11 +54,14 @@ class AnySubscriptionCallback
|
|||
UniquePtrWithInfoCallback unique_ptr_with_info_callback_;
|
||||
|
||||
public:
|
||||
AnySubscriptionCallback()
|
||||
AnySubscriptionCallback(std::shared_ptr<Alloc> allocator)
|
||||
: shared_ptr_callback_(nullptr), shared_ptr_with_info_callback_(nullptr),
|
||||
const_shared_ptr_callback_(nullptr), const_shared_ptr_with_info_callback_(nullptr),
|
||||
unique_ptr_callback_(nullptr), unique_ptr_with_info_callback_(nullptr)
|
||||
{}
|
||||
{
|
||||
message_allocator_ = new typename MessageAlloc::allocator_type(*allocator.get());
|
||||
allocator::set_allocator_for_deleter(&message_deleter_, message_allocator_);
|
||||
}
|
||||
|
||||
AnySubscriptionCallback(const AnySubscriptionCallback &) = default;
|
||||
|
||||
|
@ -155,17 +162,20 @@ public:
|
|||
} else if (const_shared_ptr_with_info_callback_) {
|
||||
const_shared_ptr_with_info_callback_(message, message_info);
|
||||
} else if (unique_ptr_callback_) {
|
||||
unique_ptr_callback_(std::unique_ptr<MessageT>(new MessageT(*message)));
|
||||
auto ptr = MessageAlloc::allocate(*message_allocator_, 1);
|
||||
MessageAlloc::construct(*message_allocator_, ptr, *message);
|
||||
unique_ptr_callback_(UniquePtr(ptr, message_deleter_));
|
||||
} else if (unique_ptr_with_info_callback_) {
|
||||
unique_ptr_with_info_callback_(std::unique_ptr<MessageT>(new MessageT(*
|
||||
message)), message_info);
|
||||
auto ptr = MessageAlloc::allocate(*message_allocator_, 1);
|
||||
MessageAlloc::construct(*message_allocator_, ptr, *message);
|
||||
unique_ptr_with_info_callback_(UniquePtr(ptr, message_deleter_), message_info);
|
||||
} else {
|
||||
throw std::runtime_error("unexpected message without any callback set");
|
||||
}
|
||||
}
|
||||
|
||||
void dispatch_intra_process(
|
||||
std::unique_ptr<MessageT> & message, const rmw_message_info_t & message_info)
|
||||
UniquePtr & message, const rmw_message_info_t & message_info)
|
||||
{
|
||||
(void)message_info;
|
||||
if (shared_ptr_callback_) {
|
||||
|
@ -188,6 +198,10 @@ public:
|
|||
throw std::runtime_error("unexpected message without any callback set");
|
||||
}
|
||||
}
|
||||
|
||||
private:
|
||||
typename MessageAlloc::allocator_type * message_allocator_;
|
||||
MessageDeleter message_deleter_;
|
||||
};
|
||||
|
||||
} /* namespace any_subscription_callback */
|
||||
|
|
|
@ -188,9 +188,9 @@ public:
|
|||
* \param buffer_size if 0 (default) a size is calculated based on the QoS.
|
||||
* \return an unsigned 64-bit integer which is the publisher's unique id.
|
||||
*/
|
||||
template<typename MessageT>
|
||||
template<typename MessageT, typename Alloc>
|
||||
uint64_t
|
||||
add_publisher(typename publisher::Publisher<MessageT>::SharedPtr publisher,
|
||||
add_publisher(typename publisher::Publisher<MessageT, Alloc>::SharedPtr publisher,
|
||||
size_t buffer_size = 0)
|
||||
{
|
||||
auto id = IntraProcessManager::get_next_unique_id();
|
||||
|
@ -201,7 +201,9 @@ public:
|
|||
throw std::invalid_argument("the calculated buffer size is too large");
|
||||
}
|
||||
publishers_[id].sequence_number.store(0);
|
||||
publishers_[id].buffer = mapped_ring_buffer::MappedRingBuffer<MessageT>::make_shared(size);
|
||||
using Deleter = typename publisher::Publisher<MessageT, Alloc>::MessageDeleter;
|
||||
publishers_[id].buffer = mapped_ring_buffer::MappedRingBuffer<MessageT, Deleter>::make_shared(
|
||||
size);
|
||||
publishers_[id].target_subscriptions_by_message_sequence.reserve(size);
|
||||
return id;
|
||||
}
|
||||
|
@ -247,11 +249,11 @@ public:
|
|||
* \param message the message that is being stored.
|
||||
* \return the message sequence number.
|
||||
*/
|
||||
template<typename MessageT>
|
||||
template<typename MessageT, typename Deleter = std::default_delete<MessageT>>
|
||||
uint64_t
|
||||
store_intra_process_message(
|
||||
uint64_t intra_process_publisher_id,
|
||||
std::unique_ptr<MessageT> & message)
|
||||
std::unique_ptr<MessageT, Deleter> & message)
|
||||
{
|
||||
auto it = publishers_.find(intra_process_publisher_id);
|
||||
if (it == publishers_.end()) {
|
||||
|
@ -261,7 +263,7 @@ public:
|
|||
// Calculate the next message sequence number.
|
||||
uint64_t message_seq = info.sequence_number.fetch_add(1, std::memory_order_relaxed);
|
||||
// Insert the message into the ring buffer using the message_seq to identify it.
|
||||
typedef typename mapped_ring_buffer::MappedRingBuffer<MessageT> TypedMRB;
|
||||
typedef typename mapped_ring_buffer::MappedRingBuffer<MessageT, Deleter> TypedMRB;
|
||||
typename TypedMRB::SharedPtr typed_buffer = std::static_pointer_cast<TypedMRB>(info.buffer);
|
||||
bool did_replace = typed_buffer->push_and_replace(message_seq, message);
|
||||
// TODO(wjwwood): do something when a message was displaced. log debug?
|
||||
|
@ -321,13 +323,13 @@ public:
|
|||
* \param requesting_subscriptions_intra_process_id the subscription's id.
|
||||
* \param message the message typed unique_ptr used to return the message.
|
||||
*/
|
||||
template<typename MessageT>
|
||||
template<typename MessageT, typename Deleter = std::default_delete<MessageT>>
|
||||
void
|
||||
take_intra_process_message(
|
||||
uint64_t intra_process_publisher_id,
|
||||
uint64_t message_sequence_number,
|
||||
uint64_t requesting_subscriptions_intra_process_id,
|
||||
std::unique_ptr<MessageT> & message)
|
||||
std::unique_ptr<MessageT, Deleter> & message)
|
||||
{
|
||||
message = nullptr;
|
||||
PublisherInfo * info;
|
||||
|
@ -359,7 +361,7 @@ public:
|
|||
}
|
||||
target_subs->erase(it);
|
||||
}
|
||||
typedef typename mapped_ring_buffer::MappedRingBuffer<MessageT> TypedMRB;
|
||||
typedef typename mapped_ring_buffer::MappedRingBuffer<MessageT, Deleter> TypedMRB;
|
||||
typename TypedMRB::SharedPtr typed_buffer = std::static_pointer_cast<TypedMRB>(info->buffer);
|
||||
// Return a copy or the unique_ptr (ownership) depending on how many subscriptions are left.
|
||||
if (target_subs->size()) {
|
||||
|
|
|
@ -50,11 +50,12 @@ public:
|
|||
* there is no guarantee on which value is returned if a key is used multiple
|
||||
* times.
|
||||
*/
|
||||
template<typename T>
|
||||
template<typename T, typename Deleter = std::default_delete<T>>
|
||||
class MappedRingBuffer : public MappedRingBufferBase
|
||||
{
|
||||
public:
|
||||
RCLCPP_SMART_PTR_DEFINITIONS(MappedRingBuffer<T>);
|
||||
RCLCPP_SMART_PTR_DEFINITIONS(MappedRingBuffer<T, Deleter>);
|
||||
using ElemUniquePtr = std::unique_ptr<T, Deleter>;
|
||||
|
||||
/// Constructor.
|
||||
/* The constructor will allocate memory while reserving space.
|
||||
|
@ -82,12 +83,12 @@ public:
|
|||
* \param value if the key is found, the value is stored in this parameter
|
||||
*/
|
||||
void
|
||||
get_copy_at_key(uint64_t key, std::unique_ptr<T> & value)
|
||||
get_copy_at_key(uint64_t key, ElemUniquePtr & value)
|
||||
{
|
||||
auto it = get_iterator_of_key(key);
|
||||
value = nullptr;
|
||||
if (it != elements_.end() && it->in_use) {
|
||||
value = std::unique_ptr<T>(new T(*it->value));
|
||||
value = ElemUniquePtr(new T(*it->value));
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -111,13 +112,13 @@ public:
|
|||
* \param value if the key is found, the value is stored in this parameter
|
||||
*/
|
||||
void
|
||||
get_ownership_at_key(uint64_t key, std::unique_ptr<T> & value)
|
||||
get_ownership_at_key(uint64_t key, ElemUniquePtr & value)
|
||||
{
|
||||
auto it = get_iterator_of_key(key);
|
||||
value = nullptr;
|
||||
if (it != elements_.end() && it->in_use) {
|
||||
// Make a copy.
|
||||
auto copy = std::unique_ptr<T>(new T(*it->value));
|
||||
auto copy = ElemUniquePtr(new T(*it->value));
|
||||
// Return the original.
|
||||
value.swap(it->value);
|
||||
// Store the copy.
|
||||
|
@ -136,7 +137,7 @@ public:
|
|||
* \param value if the key is found, the value is stored in this parameter
|
||||
*/
|
||||
void
|
||||
pop_at_key(uint64_t key, std::unique_ptr<T> & value)
|
||||
pop_at_key(uint64_t key, ElemUniquePtr & value)
|
||||
{
|
||||
auto it = get_iterator_of_key(key);
|
||||
value = nullptr;
|
||||
|
@ -158,7 +159,7 @@ public:
|
|||
* \param value the value to store, and optionally the value displaced
|
||||
*/
|
||||
bool
|
||||
push_and_replace(uint64_t key, std::unique_ptr<T> & value)
|
||||
push_and_replace(uint64_t key, ElemUniquePtr & value)
|
||||
{
|
||||
bool did_replace = elements_[head_].in_use;
|
||||
elements_[head_].key = key;
|
||||
|
@ -169,9 +170,9 @@ public:
|
|||
}
|
||||
|
||||
bool
|
||||
push_and_replace(uint64_t key, std::unique_ptr<T> && value)
|
||||
push_and_replace(uint64_t key, ElemUniquePtr && value)
|
||||
{
|
||||
std::unique_ptr<T> temp = std::move(value);
|
||||
ElemUniquePtr temp = std::move(value);
|
||||
return push_and_replace(key, temp);
|
||||
}
|
||||
|
||||
|
@ -183,12 +184,12 @@ public:
|
|||
}
|
||||
|
||||
private:
|
||||
RCLCPP_DISABLE_COPY(MappedRingBuffer<T>);
|
||||
RCLCPP_DISABLE_COPY(MappedRingBuffer<T, Deleter>);
|
||||
|
||||
struct element
|
||||
{
|
||||
uint64_t key;
|
||||
std::unique_ptr<T> value;
|
||||
ElemUniquePtr value;
|
||||
bool in_use;
|
||||
};
|
||||
|
||||
|
|
|
@ -17,6 +17,7 @@
|
|||
|
||||
#include <memory>
|
||||
|
||||
#include <rclcpp/allocator/allocator_common.hpp>
|
||||
#include <rclcpp/macros.hpp>
|
||||
|
||||
namespace rclcpp
|
||||
|
@ -26,24 +27,34 @@ namespace message_memory_strategy
|
|||
|
||||
/// Default allocation strategy for messages received by subscriptions.
|
||||
// A message memory strategy must be templated on the type of the subscription it belongs to.
|
||||
template<typename MessageT>
|
||||
template<typename MessageT, typename Alloc = std::allocator<void>>
|
||||
class MessageMemoryStrategy
|
||||
{
|
||||
|
||||
public:
|
||||
RCLCPP_SMART_PTR_DEFINITIONS(MessageMemoryStrategy);
|
||||
|
||||
using MessageAlloc = allocator::AllocRebind<MessageT, Alloc>;
|
||||
using MessageDeleter = allocator::Deleter<typename MessageAlloc::allocator_type, MessageT>;
|
||||
|
||||
MessageMemoryStrategy(std::shared_ptr<Alloc> allocator)
|
||||
{
|
||||
message_allocator_ = new typename MessageAlloc::allocator_type(*allocator.get());
|
||||
}
|
||||
|
||||
/// Default factory method
|
||||
static SharedPtr create_default()
|
||||
{
|
||||
return SharedPtr(new MessageMemoryStrategy<MessageT>);
|
||||
return std::make_shared<MessageMemoryStrategy<MessageT, Alloc>>(std::make_shared<Alloc>());
|
||||
}
|
||||
|
||||
/// By default, dynamically allocate a new message.
|
||||
// \return Shared pointer to the new message.
|
||||
virtual std::shared_ptr<MessageT> borrow_message()
|
||||
{
|
||||
return std::shared_ptr<MessageT>(new MessageT);
|
||||
auto ptr = MessageAlloc::allocate(*message_allocator_, 1);
|
||||
MessageAlloc::construct(*message_allocator_, ptr);
|
||||
return std::shared_ptr<MessageT>(ptr, message_deleter_);
|
||||
}
|
||||
|
||||
/// Release ownership of the message, which will deallocate it if it has no more owners.
|
||||
|
@ -52,6 +63,9 @@ public:
|
|||
{
|
||||
msg.reset();
|
||||
}
|
||||
|
||||
typename MessageAlloc::allocator_type * message_allocator_;
|
||||
MessageDeleter message_deleter_;
|
||||
};
|
||||
|
||||
} /* message_memory_strategy */
|
||||
|
|
|
@ -91,10 +91,11 @@ public:
|
|||
* \param[in] qos_history_depth The depth of the publisher message queue.
|
||||
* \return Shared pointer to the created publisher.
|
||||
*/
|
||||
template<typename MessageT>
|
||||
typename rclcpp::publisher::Publisher<MessageT>::SharedPtr
|
||||
template<typename MessageT, typename Alloc = std::allocator<void>>
|
||||
typename rclcpp::publisher::Publisher<MessageT, Alloc>::SharedPtr
|
||||
create_publisher(
|
||||
const std::string & topic_name, size_t qos_history_depth);
|
||||
const std::string & topic_name, size_t qos_history_depth,
|
||||
std::shared_ptr<Alloc> allocator = nullptr);
|
||||
|
||||
/// Create and return a Publisher.
|
||||
/**
|
||||
|
@ -102,11 +103,12 @@ public:
|
|||
* \param[in] qos_profile The quality of service profile to pass on to the rmw implementation.
|
||||
* \return Shared pointer to the created publisher.
|
||||
*/
|
||||
template<typename MessageT>
|
||||
typename rclcpp::publisher::Publisher<MessageT>::SharedPtr
|
||||
template<typename MessageT, typename Alloc = std::allocator<void>>
|
||||
typename rclcpp::publisher::Publisher<MessageT, Alloc>::SharedPtr
|
||||
create_publisher(
|
||||
const std::string & topic_name,
|
||||
const rmw_qos_profile_t & qos_profile = rmw_qos_profile_default);
|
||||
const rmw_qos_profile_t & qos_profile = rmw_qos_profile_default,
|
||||
std::shared_ptr<Alloc> allocator = nullptr);
|
||||
|
||||
/// Create and return a Subscription.
|
||||
/**
|
||||
|
@ -122,16 +124,17 @@ public:
|
|||
Windows build breaks when static member function passed as default
|
||||
argument to msg_mem_strat, nullptr is a workaround.
|
||||
*/
|
||||
template<typename MessageT, typename CallbackT>
|
||||
typename rclcpp::subscription::Subscription<MessageT>::SharedPtr
|
||||
template<typename MessageT, typename CallbackT, typename Alloc = std::allocator<void>>
|
||||
typename rclcpp::subscription::Subscription<MessageT, Alloc>::SharedPtr
|
||||
create_subscription(
|
||||
const std::string & topic_name,
|
||||
CallbackT callback,
|
||||
const rmw_qos_profile_t & qos_profile = rmw_qos_profile_default,
|
||||
rclcpp::callback_group::CallbackGroup::SharedPtr group = nullptr,
|
||||
bool ignore_local_publications = false,
|
||||
typename rclcpp::message_memory_strategy::MessageMemoryStrategy<MessageT>::SharedPtr
|
||||
msg_mem_strat = nullptr);
|
||||
typename rclcpp::message_memory_strategy::MessageMemoryStrategy<MessageT, Alloc>::SharedPtr
|
||||
msg_mem_strat = nullptr,
|
||||
std::shared_ptr<Alloc> allocator = nullptr);
|
||||
|
||||
/// Create and return a Subscription.
|
||||
/**
|
||||
|
@ -147,16 +150,17 @@ public:
|
|||
Windows build breaks when static member function passed as default
|
||||
argument to msg_mem_strat, nullptr is a workaround.
|
||||
*/
|
||||
template<typename MessageT, typename CallbackT>
|
||||
typename rclcpp::subscription::Subscription<MessageT>::SharedPtr
|
||||
template<typename MessageT, typename CallbackT, typename Alloc = std::allocator<void>>
|
||||
typename rclcpp::subscription::Subscription<MessageT, Alloc>::SharedPtr
|
||||
create_subscription(
|
||||
const std::string & topic_name,
|
||||
size_t qos_history_depth,
|
||||
CallbackT callback,
|
||||
rclcpp::callback_group::CallbackGroup::SharedPtr group = nullptr,
|
||||
bool ignore_local_publications = false,
|
||||
typename rclcpp::message_memory_strategy::MessageMemoryStrategy<MessageT>::SharedPtr
|
||||
msg_mem_strat = nullptr);
|
||||
typename rclcpp::message_memory_strategy::MessageMemoryStrategy<MessageT, Alloc>::SharedPtr
|
||||
msg_mem_strat = nullptr,
|
||||
std::shared_ptr<Alloc> allocator = nullptr);
|
||||
|
||||
/// Create a timer.
|
||||
/**
|
||||
|
@ -202,34 +206,25 @@ public:
|
|||
FunctorT callback,
|
||||
rclcpp::callback_group::CallbackGroup::SharedPtr group = nullptr);
|
||||
|
||||
template<typename Alloc>
|
||||
using StringRebind = typename Alloc::template rebind<std::string>::other;
|
||||
std::vector<rcl_interfaces::msg::SetParametersResult> set_parameters(
|
||||
const std::vector<rclcpp::parameter::ParameterVariant> & parameters);
|
||||
|
||||
template<typename Alloc = std::allocator<rclcpp::parameter::ParameterVariant>>
|
||||
typename std::vector<rcl_interfaces::msg::SetParametersResult, Alloc> set_parameters(
|
||||
const typename std::vector<rclcpp::parameter::ParameterVariant, Alloc> & parameters);
|
||||
|
||||
template<typename Alloc = std::allocator<rclcpp::parameter::ParameterVariant>>
|
||||
rcl_interfaces::msg::SetParametersResult set_parameters_atomically(
|
||||
const typename std::vector<rclcpp::parameter::ParameterVariant, Alloc> & parameters);
|
||||
const std::vector<rclcpp::parameter::ParameterVariant> & parameters);
|
||||
|
||||
template<typename Alloc = std::allocator<rclcpp::parameter::ParameterVariant>>
|
||||
typename std::vector<rclcpp::parameter::ParameterVariant, Alloc> get_parameters(
|
||||
const typename std::vector<std::string, StringRebind<Alloc>> & names) const;
|
||||
std::vector<rclcpp::parameter::ParameterVariant> get_parameters(
|
||||
const std::vector<std::string> & names) const;
|
||||
|
||||
template<typename Alloc = std::allocator<rcl_interfaces::msg::ParameterDescriptor>>
|
||||
typename std::vector<rcl_interfaces::msg::ParameterDescriptor, Alloc> describe_parameters(
|
||||
const typename std::vector<std::string, StringRebind<Alloc>>> & names) const;
|
||||
std::vector<rcl_interfaces::msg::ParameterDescriptor> describe_parameters(
|
||||
const std::vector<std::string> & names) const;
|
||||
|
||||
template<typename Alloc = std::allocator<uint8_t>>
|
||||
typename std::vector<uint8_t, Alloc> get_parameter_types(
|
||||
const typename std::vector<std::string, StringRebind<Alloc>>> & names) const;
|
||||
std::vector<uint8_t> get_parameter_types(
|
||||
const std::vector<std::string> & names) const;
|
||||
|
||||
rcl_interfaces::msg::ListParametersResult list_parameters(
|
||||
const std::vector<std::string> & prefixes, uint64_t depth) const;
|
||||
|
||||
template<typename Alloc = std::allocator<std::pair<const std::string, std::string>>>
|
||||
typename std::map<std::string, std::string, Alloc> get_topic_names_and_types() const;
|
||||
std::map<std::string, std::string> get_topic_names_and_types() const;
|
||||
|
||||
size_t count_publishers(const std::string & topic_name) const;
|
||||
|
||||
|
|
|
@ -113,21 +113,29 @@ Node::create_callback_group(
|
|||
return group;
|
||||
}
|
||||
|
||||
template<typename MessageT>
|
||||
typename rclcpp::publisher::Publisher<MessageT>::SharedPtr
|
||||
template<typename MessageT, typename Alloc>
|
||||
typename rclcpp::publisher::Publisher<MessageT, Alloc>::SharedPtr
|
||||
Node::create_publisher(
|
||||
const std::string & topic_name, size_t qos_history_depth)
|
||||
const std::string & topic_name, size_t qos_history_depth,
|
||||
std::shared_ptr<Alloc> allocator)
|
||||
{
|
||||
if (!allocator) {
|
||||
allocator = std::make_shared<Alloc>();
|
||||
}
|
||||
rmw_qos_profile_t qos = rmw_qos_profile_default;
|
||||
qos.depth = qos_history_depth;
|
||||
return this->create_publisher<MessageT>(topic_name, qos);
|
||||
return this->create_publisher<MessageT, Alloc>(topic_name, qos, allocator);
|
||||
}
|
||||
|
||||
template<typename MessageT>
|
||||
typename publisher::Publisher<MessageT>::SharedPtr
|
||||
template<typename MessageT, typename Alloc>
|
||||
typename publisher::Publisher<MessageT, Alloc>::SharedPtr
|
||||
Node::create_publisher(
|
||||
const std::string & topic_name, const rmw_qos_profile_t & qos_profile)
|
||||
const std::string & topic_name, const rmw_qos_profile_t & qos_profile,
|
||||
std::shared_ptr<Alloc> allocator)
|
||||
{
|
||||
if (!allocator) {
|
||||
allocator = std::make_shared<Alloc>();
|
||||
}
|
||||
using rosidl_generator_cpp::get_message_type_support_handle;
|
||||
auto type_support_handle = get_message_type_support_handle<MessageT>();
|
||||
rmw_publisher_t * publisher_handle = rmw_create_publisher(
|
||||
|
@ -140,8 +148,8 @@ Node::create_publisher(
|
|||
// *INDENT-ON*
|
||||
}
|
||||
|
||||
auto publisher = publisher::Publisher<MessageT>::make_shared(
|
||||
node_handle_, publisher_handle, topic_name, qos_profile.depth);
|
||||
auto publisher = publisher::Publisher<MessageT, Alloc>::make_shared(
|
||||
node_handle_, publisher_handle, topic_name, qos_profile.depth, allocator);
|
||||
|
||||
if (use_intra_process_comms_) {
|
||||
rmw_publisher_t * intra_process_publisher_handle = rmw_create_publisher(
|
||||
|
@ -157,7 +165,7 @@ Node::create_publisher(
|
|||
auto intra_process_manager =
|
||||
context_->get_sub_context<rclcpp::intra_process_manager::IntraProcessManager>();
|
||||
uint64_t intra_process_publisher_id =
|
||||
intra_process_manager->add_publisher<MessageT>(publisher);
|
||||
intra_process_manager->add_publisher<MessageT, Alloc>(publisher);
|
||||
rclcpp::intra_process_manager::IntraProcessManager::WeakPtr weak_ipm = intra_process_manager;
|
||||
// *INDENT-OFF*
|
||||
auto shared_publish_callback =
|
||||
|
@ -205,25 +213,31 @@ Node::group_in_node(callback_group::CallbackGroup::SharedPtr group)
|
|||
return group_belongs_to_this_node;
|
||||
}
|
||||
|
||||
template<typename MessageT, typename CallbackT>
|
||||
typename rclcpp::subscription::Subscription<MessageT>::SharedPtr
|
||||
template<typename MessageT, typename CallbackT, typename Alloc>
|
||||
typename rclcpp::subscription::Subscription<MessageT, Alloc>::SharedPtr
|
||||
Node::create_subscription(
|
||||
const std::string & topic_name,
|
||||
CallbackT callback,
|
||||
const rmw_qos_profile_t & qos_profile,
|
||||
rclcpp::callback_group::CallbackGroup::SharedPtr group,
|
||||
bool ignore_local_publications,
|
||||
typename rclcpp::message_memory_strategy::MessageMemoryStrategy<MessageT>::SharedPtr
|
||||
msg_mem_strat)
|
||||
typename rclcpp::message_memory_strategy::MessageMemoryStrategy<MessageT, Alloc>::SharedPtr
|
||||
msg_mem_strat,
|
||||
typename std::shared_ptr<Alloc> allocator)
|
||||
{
|
||||
rclcpp::subscription::AnySubscriptionCallback<MessageT> any_subscription_callback;
|
||||
if (!allocator) {
|
||||
allocator = std::make_shared<Alloc>();
|
||||
}
|
||||
|
||||
rclcpp::subscription::AnySubscriptionCallback<MessageT,
|
||||
Alloc> any_subscription_callback(allocator);
|
||||
any_subscription_callback.set(callback);
|
||||
|
||||
using rosidl_generator_cpp::get_message_type_support_handle;
|
||||
|
||||
if (!msg_mem_strat) {
|
||||
msg_mem_strat =
|
||||
rclcpp::message_memory_strategy::MessageMemoryStrategy<MessageT>::create_default();
|
||||
rclcpp::message_memory_strategy::MessageMemoryStrategy<MessageT, Alloc>::create_default();
|
||||
}
|
||||
|
||||
auto type_support_handle = get_message_type_support_handle<MessageT>();
|
||||
|
@ -239,7 +253,7 @@ Node::create_subscription(
|
|||
|
||||
using namespace rclcpp::subscription;
|
||||
|
||||
auto sub = Subscription<MessageT>::make_shared(
|
||||
auto sub = Subscription<MessageT, Alloc>::make_shared(
|
||||
node_handle_,
|
||||
subscriber_handle,
|
||||
topic_name,
|
||||
|
@ -271,7 +285,7 @@ Node::create_subscription(
|
|||
uint64_t publisher_id,
|
||||
uint64_t message_sequence,
|
||||
uint64_t subscription_id,
|
||||
std::unique_ptr<MessageT> & message)
|
||||
typename Subscription<MessageT, Alloc>::MessageUniquePtr & message)
|
||||
{
|
||||
auto ipm = weak_ipm.lock();
|
||||
if (!ipm) {
|
||||
|
@ -306,26 +320,28 @@ Node::create_subscription(
|
|||
return sub;
|
||||
}
|
||||
|
||||
template<typename MessageT, typename CallbackT>
|
||||
typename rclcpp::subscription::Subscription<MessageT>::SharedPtr
|
||||
template<typename MessageT, typename CallbackT, typename Alloc>
|
||||
typename rclcpp::subscription::Subscription<MessageT, Alloc>::SharedPtr
|
||||
Node::create_subscription(
|
||||
const std::string & topic_name,
|
||||
size_t qos_history_depth,
|
||||
CallbackT callback,
|
||||
rclcpp::callback_group::CallbackGroup::SharedPtr group,
|
||||
bool ignore_local_publications,
|
||||
typename rclcpp::message_memory_strategy::MessageMemoryStrategy<MessageT>::SharedPtr
|
||||
msg_mem_strat)
|
||||
typename rclcpp::message_memory_strategy::MessageMemoryStrategy<MessageT, Alloc>::SharedPtr
|
||||
msg_mem_strat,
|
||||
std::shared_ptr<Alloc> allocator)
|
||||
{
|
||||
rmw_qos_profile_t qos = rmw_qos_profile_default;
|
||||
qos.depth = qos_history_depth;
|
||||
return this->create_subscription<MessageT, CallbackT>(
|
||||
return this->create_subscription<MessageT, CallbackT, Alloc>(
|
||||
topic_name,
|
||||
callback,
|
||||
qos,
|
||||
group,
|
||||
ignore_local_publications,
|
||||
msg_mem_strat);
|
||||
msg_mem_strat,
|
||||
allocator);
|
||||
}
|
||||
|
||||
rclcpp::timer::WallTimer::SharedPtr
|
||||
|
@ -443,7 +459,6 @@ Node::create_service(
|
|||
return serv;
|
||||
}
|
||||
|
||||
template<typename Alloc>
|
||||
std::vector<rcl_interfaces::msg::SetParametersResult>
|
||||
Node::set_parameters(
|
||||
const std::vector<rclcpp::parameter::ParameterVariant> & parameters)
|
||||
|
@ -456,7 +471,6 @@ Node::set_parameters(
|
|||
return results;
|
||||
}
|
||||
|
||||
template<typename Alloc>
|
||||
rcl_interfaces::msg::SetParametersResult
|
||||
Node::set_parameters_atomically(
|
||||
const std::vector<rclcpp::parameter::ParameterVariant> & parameters)
|
||||
|
@ -491,7 +505,6 @@ Node::set_parameters_atomically(
|
|||
return result;
|
||||
}
|
||||
|
||||
template<typename Alloc>
|
||||
std::vector<rclcpp::parameter::ParameterVariant>
|
||||
Node::get_parameters(
|
||||
const std::vector<std::string> & names) const
|
||||
|
@ -511,7 +524,6 @@ Node::get_parameters(
|
|||
return results;
|
||||
}
|
||||
|
||||
template<typename Alloc>
|
||||
std::vector<rcl_interfaces::msg::ParameterDescriptor>
|
||||
Node::describe_parameters(
|
||||
const std::vector<std::string> & names) const
|
||||
|
@ -532,7 +544,6 @@ Node::describe_parameters(
|
|||
return results;
|
||||
}
|
||||
|
||||
template<typename Alloc>
|
||||
std::vector<uint8_t>
|
||||
Node::get_parameter_types(
|
||||
const std::vector<std::string> & names) const
|
||||
|
|
|
@ -205,23 +205,28 @@ protected:
|
|||
};
|
||||
|
||||
/// A publisher publishes messages of any type to a topic.
|
||||
template<typename MessageT, template<typename> class Alloc = std::allocator>
|
||||
template<typename MessageT, typename Alloc = std::allocator<void>>
|
||||
class Publisher : public PublisherBase
|
||||
{
|
||||
friend rclcpp::node::Node;
|
||||
|
||||
public:
|
||||
using MessageAlloc = allocator::AllocRebind<MessageT, Alloc>;
|
||||
using MessageDeleter = allocator::Deleter<typename MessageAlloc::allocator_type, MessageT>;
|
||||
using MessageUniquePtr = std::unique_ptr<MessageT, MessageDeleter>;
|
||||
|
||||
RCLCPP_SMART_PTR_DEFINITIONS(Publisher<MessageT, Alloc>);
|
||||
|
||||
Publisher(
|
||||
std::shared_ptr<rmw_node_t> node_handle,
|
||||
rmw_publisher_t * publisher_handle,
|
||||
std::string topic,
|
||||
size_t queue_size)
|
||||
size_t queue_size,
|
||||
std::shared_ptr<Alloc> allocator)
|
||||
: PublisherBase(node_handle, publisher_handle, topic, queue_size)
|
||||
{
|
||||
// TODO: avoid messy initialization
|
||||
allocator::set_allocator_for_deleter(&message_deleter_, &message_allocator_);
|
||||
message_allocator_ = new typename MessageAlloc::allocator_type(*allocator.get());
|
||||
allocator::set_allocator_for_deleter(&message_deleter_, message_allocator_);
|
||||
}
|
||||
|
||||
|
||||
|
@ -231,7 +236,7 @@ public:
|
|||
* \param[in] msg A shared pointer to the message to send.
|
||||
*/
|
||||
void
|
||||
publish(std::unique_ptr<MessageT> & msg)
|
||||
publish(MessageUniquePtr & msg)
|
||||
{
|
||||
this->do_inter_process_publish(msg.get());
|
||||
if (store_intra_process_message_) {
|
||||
|
@ -279,9 +284,9 @@ public:
|
|||
// The intra process manager should probably also be able to store
|
||||
// shared_ptr's and do the "smart" thing based on other intra process
|
||||
// subscriptions. For now call the other publish().
|
||||
auto ptr = std::allocator_traits<Alloc<MessageT>>::allocate(message_allocator_, 1);
|
||||
std::allocator_traits<Alloc<MessageT>>::construct(message_allocator_, ptr, *msg.get());
|
||||
std::unique_ptr<MessageT, allocator::Deleter<Alloc<MessageT>, MessageT>> unique_msg(ptr, message_deleter_);
|
||||
auto ptr = MessageAlloc::allocate(*message_allocator_, 1);
|
||||
MessageAlloc::construct(*message_allocator_, ptr, *msg.get());
|
||||
MessageUniquePtr unique_msg(ptr, message_deleter_);
|
||||
return this->publish(unique_msg);
|
||||
}
|
||||
|
||||
|
@ -298,9 +303,9 @@ public:
|
|||
// The intra process manager should probably also be able to store
|
||||
// shared_ptr's and do the "smart" thing based on other intra process
|
||||
// subscriptions. For now call the other publish().
|
||||
auto ptr = std::allocator_traits<Alloc<MessageT>>::allocate(message_allocator_, 1);
|
||||
std::allocator_traits<Alloc<MessageT>>::construct(message_allocator_, ptr, *msg.get());
|
||||
std::unique_ptr<MessageT, allocator::Deleter<Alloc<MessageT>, MessageT>> unique_msg(ptr, message_deleter_);
|
||||
auto ptr = MessageAlloc::allocate(*message_allocator_, 1);
|
||||
MessageAlloc::construct(*message_allocator_, ptr, *msg.get());
|
||||
MessageUniquePtr unique_msg(ptr, message_deleter_);
|
||||
return this->publish(unique_msg);
|
||||
}
|
||||
|
||||
|
@ -313,9 +318,9 @@ public:
|
|||
return this->do_inter_process_publish(&msg);
|
||||
}
|
||||
// Otherwise we have to allocate memory in a unique_ptr and pass it along.
|
||||
auto ptr = std::allocator_traits<Alloc<MessageT>>::allocate(message_allocator_, 1);
|
||||
std::allocator_traits<Alloc<MessageT>>::construct(message_allocator_, ptr, msg);
|
||||
std::unique_ptr<MessageT, allocator::Deleter<Alloc<MessageT>, MessageT>> unique_msg(ptr, message_deleter_);
|
||||
auto ptr = MessageAlloc::allocate(*message_allocator_, 1);
|
||||
MessageAlloc::construct(*message_allocator_, ptr, msg);
|
||||
MessageUniquePtr unique_msg(ptr, message_deleter_);
|
||||
return this->publish(unique_msg);
|
||||
}
|
||||
|
||||
|
@ -332,9 +337,9 @@ protected:
|
|||
}
|
||||
}
|
||||
|
||||
Alloc<MessageT> message_allocator_;
|
||||
typename MessageAlloc::allocator_type * message_allocator_;
|
||||
|
||||
allocator::Deleter<Alloc<MessageT>, MessageT> message_deleter_;
|
||||
MessageDeleter message_deleter_;
|
||||
|
||||
};
|
||||
|
||||
|
|
|
@ -144,12 +144,16 @@ private:
|
|||
using namespace any_subscription_callback;
|
||||
|
||||
/// Subscription implementation, templated on the type of message this subscription receives.
|
||||
template<typename MessageT>
|
||||
template<typename MessageT, typename Alloc = std::allocator<void>>
|
||||
class Subscription : public SubscriptionBase
|
||||
{
|
||||
friend class rclcpp::node::Node;
|
||||
|
||||
public:
|
||||
using MessageAlloc = allocator::AllocRebind<MessageT, Alloc>;
|
||||
using MessageDeleter = allocator::Deleter<typename MessageAlloc::allocator_type, MessageT>;
|
||||
using MessageUniquePtr = std::unique_ptr<MessageT, MessageDeleter>;
|
||||
|
||||
RCLCPP_SMART_PTR_DEFINITIONS(Subscription);
|
||||
|
||||
/// Default constructor.
|
||||
|
@ -167,15 +171,17 @@ public:
|
|||
rmw_subscription_t * subscription_handle,
|
||||
const std::string & topic_name,
|
||||
bool ignore_local_publications,
|
||||
AnySubscriptionCallback<MessageT> callback,
|
||||
typename message_memory_strategy::MessageMemoryStrategy<MessageT>::SharedPtr memory_strategy =
|
||||
message_memory_strategy::MessageMemoryStrategy<MessageT>::create_default())
|
||||
AnySubscriptionCallback<MessageT, Alloc> callback,
|
||||
typename message_memory_strategy::MessageMemoryStrategy<MessageT, Alloc>::SharedPtr memory_strategy =
|
||||
message_memory_strategy::MessageMemoryStrategy<MessageT,
|
||||
Alloc>::create_default())
|
||||
: SubscriptionBase(node_handle, subscription_handle, topic_name, ignore_local_publications),
|
||||
any_callback_(callback),
|
||||
message_memory_strategy_(memory_strategy),
|
||||
get_intra_process_message_callback_(nullptr),
|
||||
matches_any_intra_process_publishers_(nullptr)
|
||||
{}
|
||||
{
|
||||
}
|
||||
|
||||
/// Support dynamically setting the message memory strategy.
|
||||
/**
|
||||
|
@ -183,7 +189,8 @@ public:
|
|||
* \param[in] message_memory_strategy Shared pointer to the memory strategy to set.
|
||||
*/
|
||||
void set_message_memory_strategy(
|
||||
typename message_memory_strategy::MessageMemoryStrategy<MessageT>::SharedPtr message_memory_strategy)
|
||||
typename message_memory_strategy::MessageMemoryStrategy<MessageT,
|
||||
Alloc>::SharedPtr message_memory_strategy)
|
||||
{
|
||||
message_memory_strategy_ = message_memory_strategy;
|
||||
}
|
||||
|
@ -226,7 +233,7 @@ public:
|
|||
// However, this can only really happen if this node has it disabled, but the other doesn't.
|
||||
return;
|
||||
}
|
||||
std::unique_ptr<MessageT> msg;
|
||||
MessageUniquePtr msg;
|
||||
get_intra_process_message_callback_(
|
||||
ipm.publisher_id,
|
||||
ipm.message_sequence,
|
||||
|
@ -244,7 +251,7 @@ public:
|
|||
private:
|
||||
typedef
|
||||
std::function<
|
||||
void (uint64_t, uint64_t, uint64_t, std::unique_ptr<MessageT> &)
|
||||
void (uint64_t, uint64_t, uint64_t, MessageUniquePtr &)
|
||||
> GetMessageCallbackType;
|
||||
typedef std::function<bool (const rmw_gid_t *)> MatchesAnyPublishersCallbackType;
|
||||
|
||||
|
@ -262,14 +269,13 @@ private:
|
|||
|
||||
RCLCPP_DISABLE_COPY(Subscription);
|
||||
|
||||
AnySubscriptionCallback<MessageT> any_callback_;
|
||||
typename message_memory_strategy::MessageMemoryStrategy<MessageT>::SharedPtr
|
||||
AnySubscriptionCallback<MessageT, Alloc> any_callback_;
|
||||
typename message_memory_strategy::MessageMemoryStrategy<MessageT, Alloc>::SharedPtr
|
||||
message_memory_strategy_;
|
||||
|
||||
GetMessageCallbackType get_intra_process_message_callback_;
|
||||
MatchesAnyPublishersCallbackType matches_any_intra_process_publishers_;
|
||||
uint64_t intra_process_subscription_id_;
|
||||
|
||||
};
|
||||
|
||||
} /* namespace subscription */
|
||||
|
|
|
@ -55,11 +55,11 @@ public:
|
|||
size_t mock_queue_size;
|
||||
};
|
||||
|
||||
template<typename T>
|
||||
template<typename T, typename Alloc = std::allocator<void>>
|
||||
class Publisher : public PublisherBase
|
||||
{
|
||||
public:
|
||||
RCLCPP_SMART_PTR_DEFINITIONS(Publisher<T>);
|
||||
RCLCPP_SMART_PTR_DEFINITIONS(Publisher<T, Alloc>);
|
||||
};
|
||||
|
||||
}
|
||||
|
@ -139,8 +139,10 @@ TEST(TestIntraProcessManager, nominal) {
|
|||
s1->mock_topic_name = "nominal1";
|
||||
s1->mock_queue_size = 10;
|
||||
|
||||
auto p1_id = ipm.add_publisher<rcl_interfaces::msg::IntraProcessMessage>(p1);
|
||||
auto p2_id = ipm.add_publisher<rcl_interfaces::msg::IntraProcessMessage>(p2);
|
||||
auto p1_id =
|
||||
ipm.add_publisher<rcl_interfaces::msg::IntraProcessMessage, std::allocator<void>>(p1);
|
||||
auto p2_id =
|
||||
ipm.add_publisher<rcl_interfaces::msg::IntraProcessMessage, std::allocator<void>>(p2);
|
||||
auto s1_id = ipm.add_subscription(s1);
|
||||
|
||||
auto ipm_msg = std::make_shared<rcl_interfaces::msg::IntraProcessMessage>();
|
||||
|
@ -221,7 +223,8 @@ TEST(TestIntraProcessManager, remove_publisher_before_trying_to_take) {
|
|||
s1->mock_topic_name = "nominal1";
|
||||
s1->mock_queue_size = 10;
|
||||
|
||||
auto p1_id = ipm.add_publisher<rcl_interfaces::msg::IntraProcessMessage>(p1);
|
||||
auto p1_id =
|
||||
ipm.add_publisher<rcl_interfaces::msg::IntraProcessMessage, std::allocator<void>>(p1);
|
||||
auto s1_id = ipm.add_subscription(s1);
|
||||
|
||||
auto ipm_msg = std::make_shared<rcl_interfaces::msg::IntraProcessMessage>();
|
||||
|
@ -269,7 +272,8 @@ TEST(TestIntraProcessManager, removed_subscription_affects_take) {
|
|||
s3->mock_topic_name = "nominal1";
|
||||
s3->mock_queue_size = 10;
|
||||
|
||||
auto p1_id = ipm.add_publisher<rcl_interfaces::msg::IntraProcessMessage>(p1);
|
||||
auto p1_id =
|
||||
ipm.add_publisher<rcl_interfaces::msg::IntraProcessMessage, std::allocator<void>>(p1);
|
||||
auto s1_id = ipm.add_subscription(s1);
|
||||
auto s2_id = ipm.add_subscription(s2);
|
||||
auto s3_id = ipm.add_subscription(s3);
|
||||
|
@ -338,7 +342,8 @@ TEST(TestIntraProcessManager, multiple_subscriptions_one_publisher) {
|
|||
s3->mock_topic_name = "nominal1";
|
||||
s3->mock_queue_size = 10;
|
||||
|
||||
auto p1_id = ipm.add_publisher<rcl_interfaces::msg::IntraProcessMessage>(p1);
|
||||
auto p1_id =
|
||||
ipm.add_publisher<rcl_interfaces::msg::IntraProcessMessage, std::allocator<void>>(p1);
|
||||
auto s1_id = ipm.add_subscription(s1);
|
||||
auto s2_id = ipm.add_subscription(s2);
|
||||
auto s3_id = ipm.add_subscription(s3);
|
||||
|
@ -410,9 +415,12 @@ TEST(TestIntraProcessManager, multiple_publishers_one_subscription) {
|
|||
s1->mock_topic_name = "nominal1";
|
||||
s1->mock_queue_size = 10;
|
||||
|
||||
auto p1_id = ipm.add_publisher<rcl_interfaces::msg::IntraProcessMessage>(p1);
|
||||
auto p2_id = ipm.add_publisher<rcl_interfaces::msg::IntraProcessMessage>(p2);
|
||||
auto p3_id = ipm.add_publisher<rcl_interfaces::msg::IntraProcessMessage>(p3);
|
||||
auto p1_id =
|
||||
ipm.add_publisher<rcl_interfaces::msg::IntraProcessMessage, std::allocator<void>>(p1);
|
||||
auto p2_id =
|
||||
ipm.add_publisher<rcl_interfaces::msg::IntraProcessMessage, std::allocator<void>>(p2);
|
||||
auto p3_id =
|
||||
ipm.add_publisher<rcl_interfaces::msg::IntraProcessMessage, std::allocator<void>>(p3);
|
||||
auto s1_id = ipm.add_subscription(s1);
|
||||
|
||||
auto ipm_msg = std::make_shared<rcl_interfaces::msg::IntraProcessMessage>();
|
||||
|
@ -512,9 +520,12 @@ TEST(TestIntraProcessManager, multiple_publishers_multiple_subscription) {
|
|||
s3->mock_topic_name = "nominal1";
|
||||
s3->mock_queue_size = 10;
|
||||
|
||||
auto p1_id = ipm.add_publisher<rcl_interfaces::msg::IntraProcessMessage>(p1);
|
||||
auto p2_id = ipm.add_publisher<rcl_interfaces::msg::IntraProcessMessage>(p2);
|
||||
auto p3_id = ipm.add_publisher<rcl_interfaces::msg::IntraProcessMessage>(p3);
|
||||
auto p1_id =
|
||||
ipm.add_publisher<rcl_interfaces::msg::IntraProcessMessage, std::allocator<void>>(p1);
|
||||
auto p2_id =
|
||||
ipm.add_publisher<rcl_interfaces::msg::IntraProcessMessage, std::allocator<void>>(p2);
|
||||
auto p3_id =
|
||||
ipm.add_publisher<rcl_interfaces::msg::IntraProcessMessage, std::allocator<void>>(p3);
|
||||
auto s1_id = ipm.add_subscription(s1);
|
||||
auto s2_id = ipm.add_subscription(s2);
|
||||
auto s3_id = ipm.add_subscription(s3);
|
||||
|
@ -655,7 +666,8 @@ TEST(TestIntraProcessManager, ring_buffer_displacement) {
|
|||
s1->mock_topic_name = "nominal1";
|
||||
s1->mock_queue_size = 10;
|
||||
|
||||
auto p1_id = ipm.add_publisher<rcl_interfaces::msg::IntraProcessMessage>(p1);
|
||||
auto p1_id =
|
||||
ipm.add_publisher<rcl_interfaces::msg::IntraProcessMessage, std::allocator<void>>(p1);
|
||||
auto s1_id = ipm.add_subscription(s1);
|
||||
|
||||
auto ipm_msg = std::make_shared<rcl_interfaces::msg::IntraProcessMessage>();
|
||||
|
@ -720,7 +732,8 @@ TEST(TestIntraProcessManager, subscription_creation_race_condition) {
|
|||
p1->mock_topic_name = "nominal1";
|
||||
p1->mock_queue_size = 2;
|
||||
|
||||
auto p1_id = ipm.add_publisher<rcl_interfaces::msg::IntraProcessMessage>(p1);
|
||||
auto p1_id =
|
||||
ipm.add_publisher<rcl_interfaces::msg::IntraProcessMessage, std::allocator<void>>(p1);
|
||||
|
||||
auto ipm_msg = std::make_shared<rcl_interfaces::msg::IntraProcessMessage>();
|
||||
ipm_msg->message_sequence = 42;
|
||||
|
@ -767,7 +780,7 @@ TEST(TestIntraProcessManager, publisher_out_of_scope_take) {
|
|||
p1->mock_topic_name = "nominal1";
|
||||
p1->mock_queue_size = 2;
|
||||
|
||||
p1_id = ipm.add_publisher<rcl_interfaces::msg::IntraProcessMessage>(p1);
|
||||
p1_id = ipm.add_publisher<rcl_interfaces::msg::IntraProcessMessage, std::allocator<void>>(p1);
|
||||
|
||||
auto ipm_msg = std::make_shared<rcl_interfaces::msg::IntraProcessMessage>();
|
||||
ipm_msg->message_sequence = 42;
|
||||
|
@ -805,7 +818,7 @@ TEST(TestIntraProcessManager, publisher_out_of_scope_store) {
|
|||
p1->mock_topic_name = "nominal1";
|
||||
p1->mock_queue_size = 2;
|
||||
|
||||
p1_id = ipm.add_publisher<rcl_interfaces::msg::IntraProcessMessage>(p1);
|
||||
p1_id = ipm.add_publisher<rcl_interfaces::msg::IntraProcessMessage, std::allocator<void>>(p1);
|
||||
}
|
||||
|
||||
auto ipm_msg = std::make_shared<rcl_interfaces::msg::IntraProcessMessage>();
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue