New IntraProcessManager capable of storing shared_ptr<const T> (#690)

* Changed mapped_ring_buffer class to store both shared_ptr or unique_ptr

Signed-off-by: ivanpauno <ivanpauno@ekumenlabs.com>

* Changed the IPM store and take methods

Signed-off-by: ivanpauno <ivanpauno@ekumenlabs.com>

* Changed publish methods to take advantage of the new IPM

Signed-off-by: ivanpauno <ivanpauno@ekumenlabs.com>

* Change how subscriptions handle intraprocess messages

Signed-off-by: ivanpauno <ivanpauno@ekumenlabs.com>

* Modified publish method signatures

Signed-off-by: ivanpauno <ivanpauno@ekumenlabs.com>

* Renamed 'publisher.cpp' and 'subscription.cpp' to 'publisher_base.cpp' and 'subscription_base.cpp'

Signed-off-by: ivanpauno <ivanpauno@ekumenlabs.com>

* Updated lifecycle_publisher publish methods

Signed-off-by: ivanpauno <ivanpauno@ekumenlabs.com>
This commit is contained in:
ivanpauno 2019-04-30 16:05:53 -03:00 committed by GitHub
parent d34fa607a2
commit 98f610c114
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
18 changed files with 1115 additions and 787 deletions

View file

@ -64,10 +64,10 @@ set(${PROJECT_NAME}_SRCS
src/rclcpp/parameter_events_filter.cpp src/rclcpp/parameter_events_filter.cpp
src/rclcpp/parameter_map.cpp src/rclcpp/parameter_map.cpp
src/rclcpp/parameter_service.cpp src/rclcpp/parameter_service.cpp
src/rclcpp/publisher.cpp src/rclcpp/publisher_base.cpp
src/rclcpp/service.cpp src/rclcpp/service.cpp
src/rclcpp/signal_handler.cpp src/rclcpp/signal_handler.cpp
src/rclcpp/subscription.cpp src/rclcpp/subscription_base.cpp
src/rclcpp/time.cpp src/rclcpp/time.cpp
src/rclcpp/time_source.cpp src/rclcpp/time_source.cpp
src/rclcpp/timer.cpp src/rclcpp/timer.cpp

View file

@ -36,6 +36,7 @@ class AnySubscriptionCallback
using MessageAllocTraits = allocator::AllocRebind<MessageT, Alloc>; using MessageAllocTraits = allocator::AllocRebind<MessageT, Alloc>;
using MessageAlloc = typename MessageAllocTraits::allocator_type; using MessageAlloc = typename MessageAllocTraits::allocator_type;
using MessageDeleter = allocator::Deleter<MessageAlloc, MessageT>; using MessageDeleter = allocator::Deleter<MessageAlloc, MessageT>;
using ConstMessageSharedPtr = std::shared_ptr<const MessageT>;
using MessageUniquePtr = std::unique_ptr<MessageT, MessageDeleter>; using MessageUniquePtr = std::unique_ptr<MessageT, MessageDeleter>;
using SharedPtrCallback = std::function<void (const std::shared_ptr<MessageT>)>; using SharedPtrCallback = std::function<void (const std::shared_ptr<MessageT>)>;
@ -154,7 +155,6 @@ public:
void dispatch( void dispatch(
std::shared_ptr<MessageT> message, const rmw_message_info_t & message_info) std::shared_ptr<MessageT> message, const rmw_message_info_t & message_info)
{ {
(void)message_info;
if (shared_ptr_callback_) { if (shared_ptr_callback_) {
shared_ptr_callback_(message); shared_ptr_callback_(message);
} else if (shared_ptr_with_info_callback_) { } else if (shared_ptr_with_info_callback_) {
@ -177,30 +177,50 @@ public:
} }
void dispatch_intra_process( void dispatch_intra_process(
MessageUniquePtr & message, const rmw_message_info_t & message_info) ConstMessageSharedPtr message, const rmw_message_info_t & message_info)
{
if (const_shared_ptr_callback_) {
const_shared_ptr_callback_(message);
} else if (const_shared_ptr_with_info_callback_) {
const_shared_ptr_with_info_callback_(message, message_info);
} else {
if (unique_ptr_callback_ || unique_ptr_with_info_callback_ ||
shared_ptr_callback_ || shared_ptr_with_info_callback_)
{
throw std::runtime_error("unexpected dispatch_intra_process const shared "
"message call with no const shared_ptr callback");
} else {
throw std::runtime_error("unexpected message without any callback set");
}
}
}
void dispatch_intra_process(
MessageUniquePtr message, const rmw_message_info_t & message_info)
{ {
(void)message_info;
if (shared_ptr_callback_) { if (shared_ptr_callback_) {
typename std::shared_ptr<MessageT> shared_message = std::move(message); typename std::shared_ptr<MessageT> shared_message = std::move(message);
shared_ptr_callback_(shared_message); shared_ptr_callback_(shared_message);
} else if (shared_ptr_with_info_callback_) { } else if (shared_ptr_with_info_callback_) {
typename std::shared_ptr<MessageT> shared_message = std::move(message); typename std::shared_ptr<MessageT> shared_message = std::move(message);
shared_ptr_with_info_callback_(shared_message, message_info); shared_ptr_with_info_callback_(shared_message, message_info);
} else if (const_shared_ptr_callback_) {
typename std::shared_ptr<MessageT const> const_shared_message = std::move(message);
const_shared_ptr_callback_(const_shared_message);
} else if (const_shared_ptr_with_info_callback_) {
typename std::shared_ptr<MessageT const> const_shared_message = std::move(message);
const_shared_ptr_with_info_callback_(const_shared_message, message_info);
} else if (unique_ptr_callback_) { } else if (unique_ptr_callback_) {
unique_ptr_callback_(std::move(message)); unique_ptr_callback_(std::move(message));
} else if (unique_ptr_with_info_callback_) { } else if (unique_ptr_with_info_callback_) {
unique_ptr_with_info_callback_(std::move(message), message_info); unique_ptr_with_info_callback_(std::move(message), message_info);
} else if (const_shared_ptr_callback_ || const_shared_ptr_with_info_callback_) {
throw std::runtime_error("unexpected dispatch_intra_process unique message call"
" with const shared_ptr callback");
} else { } else {
throw std::runtime_error("unexpected message without any callback set"); throw std::runtime_error("unexpected message without any callback set");
} }
} }
bool use_take_shared_method()
{
return const_shared_ptr_callback_ || const_shared_ptr_with_info_callback_;
}
private: private:
std::shared_ptr<MessageAlloc> message_allocator_; std::shared_ptr<MessageAlloc> message_allocator_;
MessageDeleter message_deleter_; MessageDeleter message_deleter_;

View file

@ -25,14 +25,15 @@
#include <memory> #include <memory>
#include <mutex> #include <mutex>
#include <unordered_map> #include <unordered_map>
#include <utility>
#include <set> #include <set>
#include "rclcpp/allocator/allocator_deleter.hpp" #include "rclcpp/allocator/allocator_deleter.hpp"
#include "rclcpp/intra_process_manager_impl.hpp" #include "rclcpp/intra_process_manager_impl.hpp"
#include "rclcpp/mapped_ring_buffer.hpp" #include "rclcpp/mapped_ring_buffer.hpp"
#include "rclcpp/macros.hpp" #include "rclcpp/macros.hpp"
#include "rclcpp/publisher.hpp" #include "rclcpp/publisher_base.hpp"
#include "rclcpp/subscription.hpp" #include "rclcpp/subscription_base.hpp"
#include "rclcpp/visibility_control.hpp" #include "rclcpp/visibility_control.hpp"
namespace rclcpp namespace rclcpp
@ -184,21 +185,11 @@ public:
* \param buffer_size if 0 (default) a size is calculated based on the QoS. * \param buffer_size if 0 (default) a size is calculated based on the QoS.
* \return an unsigned 64-bit integer which is the publisher's unique id. * \return an unsigned 64-bit integer which is the publisher's unique id.
*/ */
template<typename MessageT, typename Alloc> RCLCPP_PUBLIC
uint64_t uint64_t
add_publisher( add_publisher(
typename Publisher<MessageT, Alloc>::SharedPtr publisher, rclcpp::PublisherBase::SharedPtr publisher,
size_t buffer_size = 0) size_t buffer_size = 0);
{
auto id = IntraProcessManager::get_next_unique_id();
size_t size = buffer_size > 0 ? buffer_size : publisher->get_queue_size();
auto mrb = mapped_ring_buffer::MappedRingBuffer<
MessageT,
typename Publisher<MessageT, Alloc>::MessageAlloc
>::make_shared(size, publisher->get_allocator());
impl_->add_publisher(id, publisher, mrb, size);
return id;
}
/// Unregister a publisher using the publisher's unique id. /// Unregister a publisher using the publisher's unique id.
/** /**
@ -242,12 +233,11 @@ public:
* \return the message sequence number. * \return the message sequence number.
*/ */
template< template<
typename MessageT, typename Alloc = std::allocator<void>, typename MessageT, typename Alloc = std::allocator<void>>
typename Deleter = std::default_delete<MessageT>>
uint64_t uint64_t
store_intra_process_message( store_intra_process_message(
uint64_t intra_process_publisher_id, uint64_t intra_process_publisher_id,
std::unique_ptr<MessageT, Deleter> & message) std::shared_ptr<const MessageT> message)
{ {
using MRBMessageAlloc = typename std::allocator_traits<Alloc>::template rebind_alloc<MessageT>; using MRBMessageAlloc = typename std::allocator_traits<Alloc>::template rebind_alloc<MessageT>;
using TypedMRB = typename mapped_ring_buffer::MappedRingBuffer<MessageT, MRBMessageAlloc>; using TypedMRB = typename mapped_ring_buffer::MappedRingBuffer<MessageT, MRBMessageAlloc>;
@ -270,6 +260,35 @@ public:
return message_seq; return message_seq;
} }
template<
typename MessageT, typename Alloc = std::allocator<void>,
typename Deleter = std::default_delete<MessageT>>
uint64_t
store_intra_process_message(
uint64_t intra_process_publisher_id,
std::unique_ptr<MessageT, Deleter> message)
{
using MRBMessageAlloc = typename std::allocator_traits<Alloc>::template rebind_alloc<MessageT>;
using TypedMRB = typename mapped_ring_buffer::MappedRingBuffer<MessageT, MRBMessageAlloc>;
uint64_t message_seq = 0;
mapped_ring_buffer::MappedRingBufferBase::SharedPtr buffer = impl_->get_publisher_info_for_id(
intra_process_publisher_id, message_seq);
typename TypedMRB::SharedPtr typed_buffer = std::static_pointer_cast<TypedMRB>(buffer);
if (!typed_buffer) {
throw std::runtime_error("Typecast failed due to incorrect message type");
}
// Insert the message into the ring buffer using the message_seq to identify it.
bool did_replace = typed_buffer->push_and_replace(message_seq, std::move(message));
// TODO(wjwwood): do something when a message was displaced. log debug?
(void)did_replace; // Avoid unused variable warning.
impl_->store_intra_process_message(intra_process_publisher_id, message_seq);
// Return the message sequence which is sent to the subscription.
return message_seq;
}
/// Take an intra process message. /// Take an intra process message.
/** /**
* The intra_process_publisher_id and message_sequence_number parameters * The intra_process_publisher_id and message_sequence_number parameters
@ -334,10 +353,45 @@ public:
// Return a copy or the unique_ptr (ownership) depending on how many subscriptions are left. // Return a copy or the unique_ptr (ownership) depending on how many subscriptions are left.
if (target_subs_size) { if (target_subs_size) {
// There are more subscriptions to serve, return a copy. // There are more subscriptions to serve, return a copy.
typed_buffer->get_copy_at_key(message_sequence_number, message); typed_buffer->get(message_sequence_number, message);
} else { } else {
// This is the last one to be returned, transfer ownership. // This is the last one to be returned, transfer ownership.
typed_buffer->pop_at_key(message_sequence_number, message); typed_buffer->pop(message_sequence_number, message);
}
}
template<
typename MessageT, typename Alloc = std::allocator<void>>
void
take_intra_process_message(
uint64_t intra_process_publisher_id,
uint64_t message_sequence_number,
uint64_t requesting_subscriptions_intra_process_id,
std::shared_ptr<const MessageT> & message)
{
using MRBMessageAlloc = typename std::allocator_traits<Alloc>::template rebind_alloc<MessageT>;
using TypedMRB = mapped_ring_buffer::MappedRingBuffer<MessageT, MRBMessageAlloc>;
message = nullptr;
size_t target_subs_size = 0;
std::lock_guard<std::mutex> lock(take_mutex_);
mapped_ring_buffer::MappedRingBufferBase::SharedPtr buffer = impl_->take_intra_process_message(
intra_process_publisher_id,
message_sequence_number,
requesting_subscriptions_intra_process_id,
target_subs_size
);
typename TypedMRB::SharedPtr typed_buffer = std::static_pointer_cast<TypedMRB>(buffer);
if (!typed_buffer) {
return;
}
// Return a copy or the unique_ptr (ownership) depending on how many subscriptions are left.
if (target_subs_size) {
// There are more subscriptions to serve, return a copy.
typed_buffer->get(message_sequence_number, message);
} else {
// This is the last one to be returned, transfer ownership.
typed_buffer->pop(message_sequence_number, message);
} }
} }

View file

@ -34,8 +34,8 @@
#include "rclcpp/macros.hpp" #include "rclcpp/macros.hpp"
#include "rclcpp/mapped_ring_buffer.hpp" #include "rclcpp/mapped_ring_buffer.hpp"
#include "rclcpp/publisher.hpp" #include "rclcpp/publisher_base.hpp"
#include "rclcpp/subscription.hpp" #include "rclcpp/subscription_base.hpp"
#include "rclcpp/visibility_control.hpp" #include "rclcpp/visibility_control.hpp"
namespace rclcpp namespace rclcpp

View file

@ -20,6 +20,7 @@
#include <cstdint> #include <cstdint>
#include <memory> #include <memory>
#include <mutex> #include <mutex>
#include <stdexcept>
#include <utility> #include <utility>
#include <vector> #include <vector>
@ -38,7 +39,7 @@ public:
RCLCPP_SMART_PTR_DEFINITIONS(MappedRingBufferBase) RCLCPP_SMART_PTR_DEFINITIONS(MappedRingBufferBase)
}; };
/// Ring buffer container of unique_ptr's of T, which can be accessed by a key. /// Ring buffer container of shared_ptr's or unique_ptr's of T, which can be accessed by a key.
/** /**
* T must be a CopyConstructable and CopyAssignable. * T must be a CopyConstructable and CopyAssignable.
* This class can be used in a container by using the base class MappedRingBufferBase. * This class can be used in a container by using the base class MappedRingBufferBase.
@ -64,6 +65,7 @@ public:
using ElemAlloc = typename ElemAllocTraits::allocator_type; using ElemAlloc = typename ElemAllocTraits::allocator_type;
using ElemDeleter = allocator::Deleter<ElemAlloc, T>; using ElemDeleter = allocator::Deleter<ElemAlloc, T>;
using ConstElemSharedPtr = std::shared_ptr<const T>;
using ElemUniquePtr = std::unique_ptr<T, ElemDeleter>; using ElemUniquePtr = std::unique_ptr<T, ElemDeleter>;
/// Constructor. /// Constructor.
@ -101,57 +103,33 @@ public:
* \param value if the key is found, the value is stored in this parameter * \param value if the key is found, the value is stored in this parameter
*/ */
void void
get_copy_at_key(uint64_t key, ElemUniquePtr & value) get(uint64_t key, ElemUniquePtr & value)
{ {
std::lock_guard<std::mutex> lock(data_mutex_); std::lock_guard<std::mutex> lock(data_mutex_);
auto it = get_iterator_of_key(key); auto it = get_iterator_of_key(key);
value = nullptr; value = nullptr;
if (it != elements_.end() && it->in_use) { if (it != elements_.end() && it->in_use) {
auto ptr = ElemAllocTraits::allocate(*allocator_.get(), 1); if (it->unique_value) {
ElemAllocTraits::construct(*allocator_.get(), ptr, *it->value); ElemDeleter deleter = it->unique_value.get_deleter();
value = ElemUniquePtr(ptr); auto ptr = ElemAllocTraits::allocate(*allocator_.get(), 1);
ElemAllocTraits::construct(*allocator_.get(), ptr, *it->unique_value);
value = ElemUniquePtr(ptr, deleter);
} else if (it->shared_value) {
ElemDeleter * deleter = std::get_deleter<ElemDeleter, const T>(it->shared_value);
auto ptr = ElemAllocTraits::allocate(*allocator_.get(), 1);
ElemAllocTraits::construct(*allocator_.get(), ptr, *it->shared_value);
if (deleter) {
value = ElemUniquePtr(ptr, *deleter);
} else {
value = ElemUniquePtr(ptr);
}
} else {
throw std::runtime_error("Unexpected empty MappedRingBuffer element.");
}
} }
} }
/// Return ownership of the value stored in the ring buffer, leaving a copy. /// Share ownership of the value stored in the ring buffer at the given key.
/**
* The key is matched if an element in the ring bufer has a matching key.
* This method will allocate in order to store a copy.
*
* The key is not guaranteed to be unique, see the class docs for more.
*
* The ownership of the currently stored object is returned, but a copy is
* made and stored in its place.
* This means that multiple calls to this function for a particular element
* will result in returning the copied and stored object not the original.
* This also means that later calls to pop_at_key will not return the
* originally stored object, since it was returned by the first call to this
* method.
*
* The contents of value before the method is called are discarded.
*
* \param key the key associated with the stored value
* \param value if the key is found, the value is stored in this parameter
*/
void
get_ownership_at_key(uint64_t key, ElemUniquePtr & value)
{
std::lock_guard<std::mutex> lock(data_mutex_);
auto it = get_iterator_of_key(key);
value = nullptr;
if (it != elements_.end() && it->in_use) {
// Make a copy.
auto ptr = ElemAllocTraits::allocate(*allocator_.get(), 1);
ElemAllocTraits::construct(*allocator_.get(), ptr, *it->value);
auto copy = ElemUniquePtr(ptr);
// Return the original.
value.swap(it->value);
// Store the copy.
it->value.swap(copy);
}
}
/// Return ownership of the value stored in the ring buffer at the given key.
/** /**
* The key is matched if an element in the ring buffer has a matching key. * The key is matched if an element in the ring buffer has a matching key.
* *
@ -163,13 +141,90 @@ public:
* \param value if the key is found, the value is stored in this parameter * \param value if the key is found, the value is stored in this parameter
*/ */
void void
pop_at_key(uint64_t key, ElemUniquePtr & value) get(uint64_t key, ConstElemSharedPtr & value)
{
std::lock_guard<std::mutex> lock(data_mutex_);
auto it = get_iterator_of_key(key);
value.reset();
if (it != elements_.end() && it->in_use) {
if (!it->shared_value) {
// The stored unique_ptr is upgraded to a shared_ptr here.
// All the remaining get and pop calls done with unique_ptr
// signature will receive a copy.
if (!it->unique_value) {
throw std::runtime_error("Unexpected empty MappedRingBuffer element.");
}
it->shared_value = std::move(it->unique_value);
}
value = it->shared_value;
}
}
/// Give the ownership of the stored value to the caller if possible, or copy and release.
/**
* The key is matched if an element in the ring buffer has a matching key.
* This method may allocate in order to return a copy.
*
* If the stored value is a shared_ptr, it is not possible to downgrade it to a unique_ptr.
* In that case, a copy is returned and the stored value is released.
*
* The key is not guaranteed to be unique, see the class docs for more.
*
* The contents of value before the method is called are discarded.
*
* \param key the key associated with the stored value
* \param value if the key is found, the value is stored in this parameter
*/
void
pop(uint64_t key, ElemUniquePtr & value)
{ {
std::lock_guard<std::mutex> lock(data_mutex_); std::lock_guard<std::mutex> lock(data_mutex_);
auto it = get_iterator_of_key(key); auto it = get_iterator_of_key(key);
value = nullptr; value = nullptr;
if (it != elements_.end() && it->in_use) { if (it != elements_.end() && it->in_use) {
value.swap(it->value); if (it->unique_value) {
value = std::move(it->unique_value);
} else if (it->shared_value) {
auto ptr = ElemAllocTraits::allocate(*allocator_.get(), 1);
ElemAllocTraits::construct(*allocator_.get(), ptr, *it->shared_value);
auto deleter = std::get_deleter<ElemDeleter, const T>(it->shared_value);
if (deleter) {
value = ElemUniquePtr(ptr, *deleter);
} else {
value = ElemUniquePtr(ptr);
}
it->shared_value.reset();
} else {
throw std::runtime_error("Unexpected empty MappedRingBuffer element.");
}
it->in_use = false;
}
}
/// Give the ownership of the stored value to the caller, at the given key.
/**
* The key is matched if an element in the ring buffer has a matching key.
*
* The key is not guaranteed to be unique, see the class docs for more.
*
* The contents of value before the method is called are discarded.
*
* \param key the key associated with the stored value
* \param value if the key is found, the value is stored in this parameter
*/
void
pop(uint64_t key, ConstElemSharedPtr & value)
{
std::lock_guard<std::mutex> lock(data_mutex_);
auto it = get_iterator_of_key(key);
if (it != elements_.end() && it->in_use) {
if (it->shared_value) {
value = std::move(it->shared_value);
} else if (it->unique_value) {
value = std::move(it->unique_value);
} else {
throw std::runtime_error("Unexpected empty MappedRingBuffer element.");
}
it->in_use = false; it->in_use = false;
} }
} }
@ -180,29 +235,44 @@ public:
* It is up to the user to ensure the key is unique. * It is up to the user to ensure the key is unique.
* This method should not allocate memory. * This method should not allocate memory.
* *
* After insertion, if a pair was replaced, then value will contain ownership * After insertion the value will be a nullptr.
* of that displaced value. Otherwise it will be a nullptr. * If a pair were replaced, its smart pointer is reset.
* *
* \param key the key associated with the value to be stored * \param key the key associated with the value to be stored
* \param value the value to store, and optionally the value displaced * \param value the value to store, and optionally the value displaced
*/ */
bool bool
push_and_replace(uint64_t key, ElemUniquePtr & value) push_and_replace(uint64_t key, ConstElemSharedPtr value)
{ {
std::lock_guard<std::mutex> lock(data_mutex_); std::lock_guard<std::mutex> lock(data_mutex_);
bool did_replace = elements_[head_].in_use; bool did_replace = elements_[head_].in_use;
elements_[head_].key = key; Element & element = elements_[head_];
elements_[head_].value.swap(value); element.key = key;
elements_[head_].in_use = true; element.unique_value.reset();
element.shared_value.reset();
element.shared_value = value;
element.in_use = true;
head_ = (head_ + 1) % elements_.size(); head_ = (head_ + 1) % elements_.size();
return did_replace; return did_replace;
} }
/// Insert a key-value pair, displacing an existing pair if necessary.
/**
* See `bool push_and_replace(uint64_t key, const ConstElemSharedPtr & value)`.
*/
bool bool
push_and_replace(uint64_t key, ElemUniquePtr && value) push_and_replace(uint64_t key, ElemUniquePtr value)
{ {
ElemUniquePtr temp = std::move(value); std::lock_guard<std::mutex> lock(data_mutex_);
return push_and_replace(key, temp); bool did_replace = elements_[head_].in_use;
Element & element = elements_[head_];
element.key = key;
element.unique_value.reset();
element.shared_value.reset();
element.unique_value = std::move(value);
element.in_use = true;
head_ = (head_ + 1) % elements_.size();
return did_replace;
} }
/// Return true if the key is found in the ring buffer, otherwise false. /// Return true if the key is found in the ring buffer, otherwise false.
@ -216,27 +286,28 @@ public:
private: private:
RCLCPP_DISABLE_COPY(MappedRingBuffer<T, Alloc>) RCLCPP_DISABLE_COPY(MappedRingBuffer<T, Alloc>)
struct element struct Element
{ {
uint64_t key; uint64_t key;
ElemUniquePtr value; ElemUniquePtr unique_value;
ConstElemSharedPtr shared_value;
bool in_use; bool in_use;
}; };
using VectorAlloc = typename std::allocator_traits<Alloc>::template rebind_alloc<element>; using VectorAlloc = typename std::allocator_traits<Alloc>::template rebind_alloc<Element>;
typename std::vector<element, VectorAlloc>::iterator typename std::vector<Element, VectorAlloc>::iterator
get_iterator_of_key(uint64_t key) get_iterator_of_key(uint64_t key)
{ {
auto it = std::find_if( auto it = std::find_if(
elements_.begin(), elements_.end(), elements_.begin(), elements_.end(),
[key](element & e) -> bool { [key](Element & e) -> bool {
return e.key == key && e.in_use; return e.key == key && e.in_use;
}); });
return it; return it;
} }
std::vector<element, VectorAlloc> elements_; std::vector<Element, VectorAlloc> elements_;
size_t head_; size_t head_;
std::shared_ptr<ElemAlloc> allocator_; std::shared_ptr<ElemAlloc> allocator_;
std::mutex data_mutex_; std::mutex data_mutex_;

View file

@ -23,6 +23,7 @@
#include <memory> #include <memory>
#include <sstream> #include <sstream>
#include <string> #include <string>
#include <utility>
#include "rcl/error_handling.h" #include "rcl/error_handling.h"
#include "rcl/publisher.h" #include "rcl/publisher.h"
@ -31,167 +32,16 @@
#include "rclcpp/allocator/allocator_common.hpp" #include "rclcpp/allocator/allocator_common.hpp"
#include "rclcpp/allocator/allocator_deleter.hpp" #include "rclcpp/allocator/allocator_deleter.hpp"
#include "rclcpp/intra_process_manager.hpp"
#include "rclcpp/macros.hpp" #include "rclcpp/macros.hpp"
#include "rclcpp/node_interfaces/node_base_interface.hpp" #include "rclcpp/node_interfaces/node_base_interface.hpp"
#include "rclcpp/publisher_base.hpp"
#include "rclcpp/type_support_decl.hpp" #include "rclcpp/type_support_decl.hpp"
#include "rclcpp/visibility_control.hpp" #include "rclcpp/visibility_control.hpp"
namespace rclcpp namespace rclcpp
{ {
// Forward declaration is used for friend statement.
namespace node_interfaces
{
class NodeTopicsInterface;
}
namespace intra_process_manager
{
/**
* NOTE(ivanpauno): IntraProcessManager is forward declared here, avoiding a circular inclusion between intra_process_manager.hpp and publisher.hpp.
* SharedPtr and WeakPtr of the IntraProcessManager are defined again here, to avoid a warning for accessing a member of a forward declared class.
*/
class IntraProcessManager;
}
class PublisherBase
{
friend ::rclcpp::node_interfaces::NodeTopicsInterface;
public:
RCLCPP_SMART_PTR_DEFINITIONS(PublisherBase)
/// Default constructor.
/**
* Typically, a publisher is not created through this method, but instead is created through a
* call to `Node::create_publisher`.
* \param[in] node_base A pointer to the NodeBaseInterface for the parent node.
* \param[in] topic The topic that this publisher publishes on.
* \param[in] type_support The type support structure for the type to be published.
* \param[in] publisher_options QoS settings for this publisher.
*/
RCLCPP_PUBLIC
PublisherBase(
rclcpp::node_interfaces::NodeBaseInterface * node_base,
const std::string & topic,
const rosidl_message_type_support_t & type_support,
const rcl_publisher_options_t & publisher_options);
RCLCPP_PUBLIC
virtual ~PublisherBase();
/// Get the topic that this publisher publishes on.
/** \return The topic name. */
RCLCPP_PUBLIC
const char *
get_topic_name() const;
/// Get the queue size for this publisher.
/** \return The queue size. */
RCLCPP_PUBLIC
size_t
get_queue_size() const;
/// Get the global identifier for this publisher (used in rmw and by DDS).
/** \return The gid. */
RCLCPP_PUBLIC
const rmw_gid_t &
get_gid() const;
/// Get the global identifier for this publisher used by intra-process communication.
/** \return The intra-process gid. */
RCLCPP_PUBLIC
const rmw_gid_t &
get_intra_process_gid() const;
/// Get the rcl publisher handle.
/** \return The rcl publisher handle. */
RCLCPP_PUBLIC
rcl_publisher_t *
get_publisher_handle();
/// Get the rcl publisher handle.
/** \return The rcl publisher handle. */
RCLCPP_PUBLIC
const rcl_publisher_t *
get_publisher_handle() const;
/// Get subscription count
/** \return The number of subscriptions. */
RCLCPP_PUBLIC
size_t
get_subscription_count() const;
/// Get intraprocess subscription count
/** \return The number of intraprocess subscriptions. */
RCLCPP_PUBLIC
size_t
get_intra_process_subscription_count() const;
/// Get the actual QoS settings, after the defaults have been determined.
/**
* The actual configuration applied when using RMW_QOS_POLICY_*_SYSTEM_DEFAULT
* can only be resolved after the creation of the publisher, and it
* depends on the underlying rmw implementation.
* If the underlying setting in use can't be represented in ROS terms,
* it will be set to RMW_QOS_POLICY_*_UNKNOWN.
* May throw runtime_error when an unexpected error occurs.
* \return The actual qos settings.
*/
RCLCPP_PUBLIC
rmw_qos_profile_t
get_actual_qos() const;
/// Compare this publisher to a gid.
/**
* Note that this function calls the next function.
* \param[in] gid Reference to a gid.
* \return True if the publisher's gid matches the input.
*/
RCLCPP_PUBLIC
bool
operator==(const rmw_gid_t & gid) const;
/// Compare this publisher to a pointer gid.
/**
* A wrapper for comparing this publisher's gid to the input using rmw_compare_gids_equal.
* \param[in] gid A pointer to a gid.
* \return True if this publisher's gid matches the input.
*/
RCLCPP_PUBLIC
bool
operator==(const rmw_gid_t * gid) const;
using StoreMessageCallbackT = std::function<uint64_t(uint64_t, void *, const std::type_info &)>;
using IntraProcessManagerSharedPtr =
std::shared_ptr<rclcpp::intra_process_manager::IntraProcessManager>;
/// Implementation utility function used to setup intra process publishing after creation.
RCLCPP_PUBLIC
void
setup_intra_process(
uint64_t intra_process_publisher_id,
StoreMessageCallbackT store_callback,
IntraProcessManagerSharedPtr ipm,
const rcl_publisher_options_t & intra_process_options);
protected:
std::shared_ptr<rcl_node_t> rcl_node_handle_;
rcl_publisher_t publisher_handle_ = rcl_get_zero_initialized_publisher();
rcl_publisher_t intra_process_publisher_handle_ = rcl_get_zero_initialized_publisher();
using IntraProcessManagerWeakPtr =
std::weak_ptr<rclcpp::intra_process_manager::IntraProcessManager>;
bool intra_process_is_enabled_;
IntraProcessManagerWeakPtr weak_ipm_;
uint64_t intra_process_publisher_id_;
StoreMessageCallbackT store_intra_process_message_;
rmw_gid_t rmw_gid_;
rmw_gid_t intra_process_rmw_gid_;
};
/// A publisher publishes messages of any type to a topic. /// A publisher publishes messages of any type to a topic.
template<typename MessageT, typename Alloc = std::allocator<void>> template<typename MessageT, typename Alloc = std::allocator<void>>
class Publisher : public PublisherBase class Publisher : public PublisherBase
@ -201,6 +51,7 @@ public:
using MessageAlloc = typename MessageAllocTraits::allocator_type; using MessageAlloc = typename MessageAllocTraits::allocator_type;
using MessageDeleter = allocator::Deleter<MessageAlloc, MessageT>; using MessageDeleter = allocator::Deleter<MessageAlloc, MessageT>;
using MessageUniquePtr = std::unique_ptr<MessageT, MessageDeleter>; using MessageUniquePtr = std::unique_ptr<MessageT, MessageDeleter>;
using MessageSharedPtr = std::shared_ptr<const MessageT>;
RCLCPP_SMART_PTR_DEFINITIONS(Publisher<MessageT, Alloc>) RCLCPP_SMART_PTR_DEFINITIONS(Publisher<MessageT, Alloc>)
@ -222,6 +73,15 @@ public:
virtual ~Publisher() virtual ~Publisher()
{} {}
mapped_ring_buffer::MappedRingBufferBase::SharedPtr
make_mapped_ring_buffer(size_t size) const override
{
return mapped_ring_buffer::MappedRingBuffer<
MessageT,
typename Publisher<MessageT, Alloc>::MessageAlloc
>::make_shared(size, this->get_allocator());
}
/// Send a message to the topic for this publisher. /// Send a message to the topic for this publisher.
/** /**
* This function is templated on the input message type, MessageT. * This function is templated on the input message type, MessageT.
@ -230,97 +90,56 @@ public:
virtual void virtual void
publish(std::unique_ptr<MessageT, MessageDeleter> & msg) publish(std::unique_ptr<MessageT, MessageDeleter> & msg)
{ {
bool inter_process_subscriptions_exist = if (!intra_process_is_enabled_) {
get_subscription_count() > get_intra_process_subscription_count();
if (!intra_process_is_enabled_ || inter_process_subscriptions_exist) {
this->do_inter_process_publish(msg.get()); this->do_inter_process_publish(msg.get());
}
if (store_intra_process_message_) {
// Take the pointer from the unique_msg, release it and pass as a void *
// to the ipm. The ipm should then capture it again as a unique_ptr of
// the correct type.
// TODO(wjwwood):
// investigate how to transfer the custom deleter (if there is one)
// from the incoming unique_ptr through to the ipm's unique_ptr.
// See: http://stackoverflow.com/questions/11002641/dynamic-casting-for-unique-ptr
MessageT * msg_ptr = msg.get();
msg.release();
uint64_t message_seq =
store_intra_process_message_(intra_process_publisher_id_, msg_ptr, typeid(MessageT));
rcl_interfaces::msg::IntraProcessMessage ipm;
ipm.publisher_id = intra_process_publisher_id_;
ipm.message_sequence = message_seq;
auto status = rcl_publish(&intra_process_publisher_handle_, &ipm);
if (RCL_RET_PUBLISHER_INVALID == status) {
rcl_reset_error(); // next call will reset error message if not context
if (rcl_publisher_is_valid_except_context(&intra_process_publisher_handle_)) {
rcl_context_t * context = rcl_publisher_get_context(&intra_process_publisher_handle_);
if (nullptr != context && !rcl_context_is_valid(context)) {
// publisher is invalid due to context being shutdown
return;
}
}
}
if (RCL_RET_OK != status) {
rclcpp::exceptions::throw_from_rcl_error(status, "failed to publish intra process message");
}
} else {
// Always destroy the message, even if we don't consume it, for consistency.
msg.reset(); msg.reset();
return;
}
// If an interprocess subscription exist, then the unique_ptr is promoted
// to a shared_ptr and published.
// This allows doing the intraprocess publish first and then doing the
// interprocess publish, resulting in lower publish-to-subscribe latency.
// It's not possible to do that with an unique_ptr,
// as do_intra_process_publish takes the ownership of the message.
uint64_t message_seq;
bool inter_process_publish_needed =
get_subscription_count() > get_intra_process_subscription_count();
MessageSharedPtr shared_msg;
if (inter_process_publish_needed) {
shared_msg = std::move(msg);
message_seq =
store_intra_process_message(intra_process_publisher_id_, shared_msg);
} else {
message_seq =
store_intra_process_message(intra_process_publisher_id_, std::move(msg));
}
this->do_intra_process_publish(message_seq);
if (inter_process_publish_needed) {
this->do_inter_process_publish(shared_msg.get());
} }
} }
virtual void virtual void
publish(const std::shared_ptr<MessageT> & msg) publish(const std::shared_ptr<const MessageT> & msg)
{ {
// Avoid allocating when not using intra process. publish(*msg);
if (!store_intra_process_message_) {
// In this case we're not using intra process.
return this->do_inter_process_publish(msg.get());
}
// Otherwise we have to allocate memory in a unique_ptr and pass it along.
// TODO(wjwwood):
// The intra process manager should probably also be able to store
// shared_ptr's and do the "smart" thing based on other intra process
// subscriptions. For now call the other publish().
auto ptr = MessageAllocTraits::allocate(*message_allocator_.get(), 1);
MessageAllocTraits::construct(*message_allocator_.get(), ptr, *msg.get());
MessageUniquePtr unique_msg(ptr, message_deleter_);
return this->publish(unique_msg);
}
virtual void
publish(std::shared_ptr<const MessageT> msg)
{
// Avoid allocating when not using intra process.
if (!store_intra_process_message_) {
// In this case we're not using intra process.
return this->do_inter_process_publish(msg.get());
}
// Otherwise we have to allocate memory in a unique_ptr and pass it along.
// TODO(wjwwood):
// The intra process manager should probably also be able to store
// shared_ptr's and do the "smart" thing based on other intra process
// subscriptions. For now call the other publish().
auto ptr = MessageAllocTraits::allocate(*message_allocator_.get(), 1);
MessageAllocTraits::construct(*message_allocator_.get(), ptr, *msg.get());
MessageUniquePtr unique_msg(ptr, message_deleter_);
return this->publish(unique_msg);
} }
virtual void virtual void
publish(const MessageT & msg) publish(const MessageT & msg)
{ {
// Avoid allocating when not using intra process. // Avoid allocating when not using intra process.
if (!store_intra_process_message_) { if (!intra_process_is_enabled_) {
// In this case we're not using intra process. // In this case we're not using intra process.
return this->do_inter_process_publish(&msg); return this->do_inter_process_publish(&msg);
} }
// Otherwise we have to allocate memory in a unique_ptr and pass it along. // Otherwise we have to allocate memory in a unique_ptr and pass it along.
// As the message is not const, a copy should be made.
// A shared_ptr<const MessageT> could also be constructed here.
auto ptr = MessageAllocTraits::allocate(*message_allocator_.get(), 1); auto ptr = MessageAllocTraits::allocate(*message_allocator_.get(), 1);
MessageAllocTraits::construct(*message_allocator_.get(), ptr, msg); MessageAllocTraits::construct(*message_allocator_.get(), ptr, msg);
MessageUniquePtr unique_msg(ptr, message_deleter_); MessageUniquePtr unique_msg(ptr, message_deleter_);
return this->publish(unique_msg); this->publish(unique_msg);
} }
virtual void virtual void
@ -335,7 +154,7 @@ public:
void void
publish(const rcl_serialized_message_t * serialized_msg) publish(const rcl_serialized_message_t * serialized_msg)
{ {
if (store_intra_process_message_) { if (intra_process_is_enabled_) {
// TODO(Karsten1987): support serialized message passed by intraprocess // TODO(Karsten1987): support serialized message passed by intraprocess
throw std::runtime_error("storing serialized messages in intra process is not supported yet"); throw std::runtime_error("storing serialized messages in intra process is not supported yet");
} }
@ -376,6 +195,64 @@ protected:
} }
} }
void
do_intra_process_publish(uint64_t message_seq)
{
rcl_interfaces::msg::IntraProcessMessage ipm;
ipm.publisher_id = intra_process_publisher_id_;
ipm.message_sequence = message_seq;
auto status = rcl_publish(&intra_process_publisher_handle_, &ipm);
if (RCL_RET_PUBLISHER_INVALID == status) {
rcl_reset_error(); // next call will reset error message if not context
if (rcl_publisher_is_valid_except_context(&intra_process_publisher_handle_)) {
rcl_context_t * context = rcl_publisher_get_context(&intra_process_publisher_handle_);
if (nullptr != context && !rcl_context_is_valid(context)) {
// publisher is invalid due to context being shutdown
return;
}
}
}
if (RCL_RET_OK != status) {
rclcpp::exceptions::throw_from_rcl_error(status, "failed to publish intra process message");
}
}
uint64_t
store_intra_process_message(
uint64_t publisher_id,
std::shared_ptr<const MessageT> msg)
{
auto ipm = weak_ipm_.lock();
if (!ipm) {
throw std::runtime_error(
"intra process publish called after destruction of intra process manager");
}
if (!msg) {
throw std::runtime_error("cannot publisher msg which is a null pointer");
}
uint64_t message_seq =
ipm->template store_intra_process_message<MessageT, Alloc>(publisher_id, msg);
return message_seq;
}
uint64_t
store_intra_process_message(
uint64_t publisher_id,
std::unique_ptr<MessageT, MessageDeleter> msg)
{
auto ipm = weak_ipm_.lock();
if (!ipm) {
throw std::runtime_error(
"intra process publish called after destruction of intra process manager");
}
if (!msg) {
throw std::runtime_error("cannot publisher msg which is a null pointer");
}
uint64_t message_seq =
ipm->template store_intra_process_message<MessageT, Alloc>(publisher_id, std::move(msg));
return message_seq;
}
std::shared_ptr<MessageAlloc> message_allocator_; std::shared_ptr<MessageAlloc> message_allocator_;
MessageDeleter message_deleter_; MessageDeleter message_deleter_;

View file

@ -0,0 +1,195 @@
// Copyright 2014 Open Source Robotics Foundation, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#ifndef RCLCPP__PUBLISHER_BASE_HPP_
#define RCLCPP__PUBLISHER_BASE_HPP_
#include <rmw/error_handling.h>
#include <rmw/rmw.h>
#include <functional>
#include <iostream>
#include <memory>
#include <sstream>
#include <string>
#include "rcl/publisher.h"
#include "rclcpp/macros.hpp"
#include "rclcpp/mapped_ring_buffer.hpp"
#include "rclcpp/type_support_decl.hpp"
#include "rclcpp/visibility_control.hpp"
namespace rclcpp
{
// Forward declaration is used for friend statement.
namespace node_interfaces
{
class NodeBaseInterface;
class NodeTopicsInterface;
}
namespace intra_process_manager
{
/**
* IntraProcessManager is forward declared here, avoiding a circular inclusion between
* `intra_process_manager.hpp` and `publisher_base.hpp`.
*/
class IntraProcessManager;
}
class PublisherBase
{
friend ::rclcpp::node_interfaces::NodeTopicsInterface;
public:
RCLCPP_SMART_PTR_DEFINITIONS(PublisherBase)
/// Default constructor.
/**
* Typically, a publisher is not created through this method, but instead is created through a
* call to `Node::create_publisher`.
* \param[in] node_base A pointer to the NodeBaseInterface for the parent node.
* \param[in] topic The topic that this publisher publishes on.
* \param[in] type_support The type support structure for the type to be published.
* \param[in] publisher_options QoS settings for this publisher.
*/
RCLCPP_PUBLIC
PublisherBase(
rclcpp::node_interfaces::NodeBaseInterface * node_base,
const std::string & topic,
const rosidl_message_type_support_t & type_support,
const rcl_publisher_options_t & publisher_options);
RCLCPP_PUBLIC
virtual ~PublisherBase();
/// Get the topic that this publisher publishes on.
/** \return The topic name. */
RCLCPP_PUBLIC
const char *
get_topic_name() const;
/// Get the queue size for this publisher.
/** \return The queue size. */
RCLCPP_PUBLIC
size_t
get_queue_size() const;
/// Get the global identifier for this publisher (used in rmw and by DDS).
/** \return The gid. */
RCLCPP_PUBLIC
const rmw_gid_t &
get_gid() const;
/// Get the global identifier for this publisher used by intra-process communication.
/** \return The intra-process gid. */
RCLCPP_PUBLIC
const rmw_gid_t &
get_intra_process_gid() const;
/// Get the rcl publisher handle.
/** \return The rcl publisher handle. */
RCLCPP_PUBLIC
rcl_publisher_t *
get_publisher_handle();
/// Get the rcl publisher handle.
/** \return The rcl publisher handle. */
RCLCPP_PUBLIC
const rcl_publisher_t *
get_publisher_handle() const;
/// Get subscription count
/** \return The number of subscriptions. */
RCLCPP_PUBLIC
size_t
get_subscription_count() const;
/// Get intraprocess subscription count
/** \return The number of intraprocess subscriptions. */
RCLCPP_PUBLIC
size_t
get_intra_process_subscription_count() const;
/// Get the actual QoS settings, after the defaults have been determined.
/**
* The actual configuration applied when using RMW_QOS_POLICY_*_SYSTEM_DEFAULT
* can only be resolved after the creation of the publisher, and it
* depends on the underlying rmw implementation.
* If the underlying setting in use can't be represented in ROS terms,
* it will be set to RMW_QOS_POLICY_*_UNKNOWN.
* May throw runtime_error when an unexpected error occurs.
* \return The actual qos settings.
*/
RCLCPP_PUBLIC
rmw_qos_profile_t
get_actual_qos() const;
/// Compare this publisher to a gid.
/**
* Note that this function calls the next function.
* \param[in] gid Reference to a gid.
* \return True if the publisher's gid matches the input.
*/
RCLCPP_PUBLIC
bool
operator==(const rmw_gid_t & gid) const;
/// Compare this publisher to a pointer gid.
/**
* A wrapper for comparing this publisher's gid to the input using rmw_compare_gids_equal.
* \param[in] gid A pointer to a gid.
* \return True if this publisher's gid matches the input.
*/
RCLCPP_PUBLIC
bool
operator==(const rmw_gid_t * gid) const;
using IntraProcessManagerSharedPtr =
std::shared_ptr<rclcpp::intra_process_manager::IntraProcessManager>;
/// Implementation utility function that creates a typed mapped ring buffer.
RCLCPP_PUBLIC
mapped_ring_buffer::MappedRingBufferBase::SharedPtr
virtual make_mapped_ring_buffer(size_t size) const;
/// Implementation utility function used to setup intra process publishing after creation.
RCLCPP_PUBLIC
void
setup_intra_process(
uint64_t intra_process_publisher_id,
IntraProcessManagerSharedPtr ipm,
const rcl_publisher_options_t & intra_process_options);
protected:
std::shared_ptr<rcl_node_t> rcl_node_handle_;
rcl_publisher_t publisher_handle_ = rcl_get_zero_initialized_publisher();
rcl_publisher_t intra_process_publisher_handle_ = rcl_get_zero_initialized_publisher();
using IntraProcessManagerWeakPtr =
std::weak_ptr<rclcpp::intra_process_manager::IntraProcessManager>;
bool intra_process_is_enabled_;
IntraProcessManagerWeakPtr weak_ipm_;
uint64_t intra_process_publisher_id_;
rmw_gid_t rmw_gid_;
rmw_gid_t intra_process_rmw_gid_;
};
} // namespace rclcpp
#endif // RCLCPP__PUBLISHER_BASE_HPP_

View file

@ -52,24 +52,6 @@ struct PublisherFactory
rcl_publisher_options_t & publisher_options)>; rcl_publisher_options_t & publisher_options)>;
PublisherFactoryFunction create_typed_publisher; PublisherFactoryFunction create_typed_publisher;
// Adds the PublisherBase to the intraprocess manager with the correctly
// templated call to IntraProcessManager::store_intra_process_message.
using AddPublisherToIntraProcessManagerFunction = std::function<
uint64_t(
rclcpp::intra_process_manager::IntraProcessManager * ipm,
rclcpp::PublisherBase::SharedPtr publisher)>;
AddPublisherToIntraProcessManagerFunction add_publisher_to_intra_process_manager;
// Creates the callback function which is called on each
// PublisherT::publish() and which handles the intra process transmission of
// the message being published.
using SharedPublishCallbackFactoryFunction = std::function<
rclcpp::PublisherBase::StoreMessageCallbackT(
rclcpp::intra_process_manager::IntraProcessManager::SharedPtr ipm)>;
SharedPublishCallbackFactoryFunction create_shared_publish_callback;
}; };
/// Return a PublisherFactory with functions setup for creating a PublisherT<MessageT, Alloc>. /// Return a PublisherFactory with functions setup for creating a PublisherT<MessageT, Alloc>.
@ -92,53 +74,6 @@ create_publisher_factory(std::shared_ptr<Alloc> allocator)
return std::make_shared<PublisherT>(node_base, topic_name, publisher_options, message_alloc); return std::make_shared<PublisherT>(node_base, topic_name, publisher_options, message_alloc);
}; };
// function to add a publisher to the intra process manager
factory.add_publisher_to_intra_process_manager =
[](
rclcpp::intra_process_manager::IntraProcessManager * ipm,
rclcpp::PublisherBase::SharedPtr publisher) -> uint64_t
{
return ipm->add_publisher<MessageT, Alloc>(std::dynamic_pointer_cast<PublisherT>(publisher));
};
// function to create a shared publish callback std::function
using StoreMessageCallbackT = rclcpp::PublisherBase::StoreMessageCallbackT;
factory.create_shared_publish_callback =
[](rclcpp::intra_process_manager::IntraProcessManager::SharedPtr ipm) -> StoreMessageCallbackT
{
rclcpp::intra_process_manager::IntraProcessManager::WeakPtr weak_ipm = ipm;
// this function is called on each call to publish() and handles storing
// of the published message in the intra process manager
auto shared_publish_callback =
[weak_ipm](uint64_t publisher_id, void * msg, const std::type_info & type_info) -> uint64_t
{
auto ipm = weak_ipm.lock();
if (!ipm) {
// TODO(wjwwood): should this just return silently? Or maybe return with a warning?
throw std::runtime_error(
"intra process publish called after destruction of intra process manager");
}
if (!msg) {
throw std::runtime_error("cannot publisher msg which is a null pointer");
}
auto & message_type_info = typeid(MessageT);
if (message_type_info != type_info) {
throw std::runtime_error(
std::string("published type '") + type_info.name() +
"' is incompatible from the publisher type '" + message_type_info.name() + "'");
}
MessageT * typed_message_ptr = static_cast<MessageT *>(msg);
using MessageDeleter = typename Publisher<MessageT, Alloc>::MessageDeleter;
std::unique_ptr<MessageT, MessageDeleter> unique_msg(typed_message_ptr);
uint64_t message_seq =
ipm->store_intra_process_message<MessageT, Alloc>(publisher_id, unique_msg);
return message_seq;
};
return shared_publish_callback;
};
// return the factory now that it is populated // return the factory now that it is populated
return factory; return factory;
} }

View file

@ -23,6 +23,8 @@
#include <memory> #include <memory>
#include <sstream> #include <sstream>
#include <string> #include <string>
#include <utility>
#include "rcl/error_handling.h" #include "rcl/error_handling.h"
#include "rcl/subscription.h" #include "rcl/subscription.h"
@ -32,8 +34,11 @@
#include "rclcpp/any_subscription_callback.hpp" #include "rclcpp/any_subscription_callback.hpp"
#include "rclcpp/exceptions.hpp" #include "rclcpp/exceptions.hpp"
#include "rclcpp/expand_topic_or_service_name.hpp" #include "rclcpp/expand_topic_or_service_name.hpp"
#include "rclcpp/intra_process_manager.hpp"
#include "rclcpp/logging.hpp"
#include "rclcpp/macros.hpp" #include "rclcpp/macros.hpp"
#include "rclcpp/message_memory_strategy.hpp" #include "rclcpp/message_memory_strategy.hpp"
#include "rclcpp/subscription_base.hpp"
#include "rclcpp/subscription_traits.hpp" #include "rclcpp/subscription_traits.hpp"
#include "rclcpp/type_support_decl.hpp" #include "rclcpp/type_support_decl.hpp"
#include "rclcpp/visibility_control.hpp" #include "rclcpp/visibility_control.hpp"
@ -46,122 +51,6 @@ namespace node_interfaces
class NodeTopicsInterface; class NodeTopicsInterface;
} // namespace node_interfaces } // namespace node_interfaces
namespace intra_process_manager
{
/**
* NOTE(ivanpauno): IntraProcessManager is forward declared here, avoiding a circular inclusion between intra_process_manager.hpp and publisher.hpp.
* SharedPtr and WeakPtr of the IntraProcessManager are defined again here, to avoid a warning for accessing a member of a forward declared class.
*/
class IntraProcessManager;
}
/// Virtual base class for subscriptions. This pattern allows us to iterate over different template
/// specializations of Subscription, among other things.
class SubscriptionBase
{
public:
RCLCPP_SMART_PTR_DEFINITIONS_NOT_COPYABLE(SubscriptionBase)
/// Default constructor.
/**
* \param[in] node_handle The rcl representation of the node that owns this subscription.
* \param[in] type_support_handle rosidl type support struct, for the Message type of the topic.
* \param[in] topic_name Name of the topic to subscribe to.
* \param[in] subscription_options options for the subscription.
* \param[in] is_serialized is true if the message will be delivered still serialized
*/
RCLCPP_PUBLIC
SubscriptionBase(
std::shared_ptr<rcl_node_t> node_handle,
const rosidl_message_type_support_t & type_support_handle,
const std::string & topic_name,
const rcl_subscription_options_t & subscription_options,
bool is_serialized = false);
/// Default destructor.
RCLCPP_PUBLIC
virtual ~SubscriptionBase();
/// Get the topic that this subscription is subscribed on.
RCLCPP_PUBLIC
const char *
get_topic_name() const;
RCLCPP_PUBLIC
std::shared_ptr<rcl_subscription_t>
get_subscription_handle();
RCLCPP_PUBLIC
const std::shared_ptr<rcl_subscription_t>
get_subscription_handle() const;
RCLCPP_PUBLIC
virtual const std::shared_ptr<rcl_subscription_t>
get_intra_process_subscription_handle() const;
/// Borrow a new message.
/** \return Shared pointer to the fresh message. */
virtual std::shared_ptr<void>
create_message() = 0;
/// Borrow a new serialized message
/** \return Shared pointer to a rcl_message_serialized_t. */
virtual std::shared_ptr<rcl_serialized_message_t>
create_serialized_message() = 0;
/// Check if we need to handle the message, and execute the callback if we do.
/**
* \param[in] message Shared pointer to the message to handle.
* \param[in] message_info Metadata associated with this message.
*/
virtual void
handle_message(std::shared_ptr<void> & message, const rmw_message_info_t & message_info) = 0;
/// Return the message borrowed in create_message.
/** \param[in] message Shared pointer to the returned message. */
virtual void
return_message(std::shared_ptr<void> & message) = 0;
/// Return the message borrowed in create_serialized_message.
/** \param[in] message Shared pointer to the returned message. */
virtual void
return_serialized_message(std::shared_ptr<rcl_serialized_message_t> & message) = 0;
virtual void
handle_intra_process_message(
rcl_interfaces::msg::IntraProcessMessage & ipm,
const rmw_message_info_t & message_info) = 0;
const rosidl_message_type_support_t &
get_message_type_support_handle() const;
bool
is_serialized() const;
/// Get matching publisher count
/** \return The number of publishers on this topic. */
RCLCPP_PUBLIC
size_t
get_publisher_count() const;
protected:
std::shared_ptr<rcl_subscription_t> intra_process_subscription_handle_;
std::shared_ptr<rcl_subscription_t> subscription_handle_;
std::shared_ptr<rcl_node_t> node_handle_;
using IntraProcessManagerWeakPtr =
std::weak_ptr<rclcpp::intra_process_manager::IntraProcessManager>;
bool use_intra_process_;
IntraProcessManagerWeakPtr weak_ipm_;
uint64_t intra_process_subscription_id_;
private:
RCLCPP_DISABLE_COPY(SubscriptionBase)
rosidl_message_type_support_t type_support_;
bool is_serialized_;
};
/// Subscription implementation, templated on the type of message this subscription receives. /// Subscription implementation, templated on the type of message this subscription receives.
template< template<
typename CallbackMessageT, typename CallbackMessageT,
@ -174,6 +63,7 @@ public:
using MessageAllocTraits = allocator::AllocRebind<CallbackMessageT, Alloc>; using MessageAllocTraits = allocator::AllocRebind<CallbackMessageT, Alloc>;
using MessageAlloc = typename MessageAllocTraits::allocator_type; using MessageAlloc = typename MessageAllocTraits::allocator_type;
using MessageDeleter = allocator::Deleter<MessageAlloc, CallbackMessageT>; using MessageDeleter = allocator::Deleter<MessageAlloc, CallbackMessageT>;
using ConstMessageSharedPtr = std::shared_ptr<const CallbackMessageT>;
using MessageUniquePtr = std::unique_ptr<CallbackMessageT, MessageDeleter>; using MessageUniquePtr = std::unique_ptr<CallbackMessageT, MessageDeleter>;
RCLCPP_SMART_PTR_DEFINITIONS(Subscription) RCLCPP_SMART_PTR_DEFINITIONS(Subscription)
@ -205,9 +95,7 @@ public:
subscription_options, subscription_options,
rclcpp::subscription_traits::is_serialized_subscription_argument<CallbackMessageT>::value), rclcpp::subscription_traits::is_serialized_subscription_argument<CallbackMessageT>::value),
any_callback_(callback), any_callback_(callback),
message_memory_strategy_(memory_strategy), message_memory_strategy_(memory_strategy)
get_intra_process_message_callback_(nullptr),
matches_any_intra_process_publishers_(nullptr)
{} {}
/// Support dynamically setting the message memory strategy. /// Support dynamically setting the message memory strategy.
@ -238,12 +126,10 @@ public:
void handle_message(std::shared_ptr<void> & message, const rmw_message_info_t & message_info) void handle_message(std::shared_ptr<void> & message, const rmw_message_info_t & message_info)
{ {
if (matches_any_intra_process_publishers_) { if (matches_any_intra_process_publishers(&message_info.publisher_gid)) {
if (matches_any_intra_process_publishers_(&message_info.publisher_gid)) { // In this case, the message will be delivered via intra process and
// In this case, the message will be delivered via intra process and // we should ignore this copy of the message.
// we should ignore this copy of the message. return;
return;
}
} }
auto typed_message = std::static_pointer_cast<CallbackMessageT>(message); auto typed_message = std::static_pointer_cast<CallbackMessageT>(message);
any_callback_.dispatch(typed_message, message_info); any_callback_.dispatch(typed_message, message_info);
@ -266,89 +152,109 @@ public:
rcl_interfaces::msg::IntraProcessMessage & ipm, rcl_interfaces::msg::IntraProcessMessage & ipm,
const rmw_message_info_t & message_info) const rmw_message_info_t & message_info)
{ {
if (!get_intra_process_message_callback_) { if (!use_intra_process_) {
// throw std::runtime_error( // throw std::runtime_error(
// "handle_intra_process_message called before setup_intra_process"); // "handle_intra_process_message called before setup_intra_process");
// TODO(wjwwood): for now, this could mean that intra process was just not enabled. // TODO(wjwwood): for now, this could mean that intra process was just not enabled.
// However, this can only really happen if this node has it disabled, but the other doesn't. // However, this can only really happen if this node has it disabled, but the other doesn't.
return; return;
} }
MessageUniquePtr msg; if (any_callback_.use_take_shared_method()) {
get_intra_process_message_callback_( ConstMessageSharedPtr msg;
ipm.publisher_id, take_intra_process_message(
ipm.message_sequence, ipm.publisher_id,
intra_process_subscription_id_, ipm.message_sequence,
msg); intra_process_subscription_id_,
if (!msg) { msg);
// This either occurred because the publisher no longer exists or the if (!msg) {
// message requested is no longer being stored. // This either occurred because the publisher no longer exists or the
// TODO(wjwwood): should we notify someone of this? log error, log warning? // message requested is no longer being stored.
return; RCLCPP_WARN(get_logger("rclcpp"),
} "Intra process message not longer being stored when trying to handle it");
any_callback_.dispatch_intra_process(msg, message_info); return;
}
using GetMessageCallbackType =
std::function<void (uint64_t, uint64_t, uint64_t, MessageUniquePtr &)>;
using MatchesAnyPublishersCallbackType = std::function<bool (const rmw_gid_t *)>;
/// Implemenation detail.
// TODO(ivanpauno): This can be moved to the base class. No reason to be here.
// Also get_intra_process_message_callback_ and matches_any_intra_process_publishers_.
void setup_intra_process(
uint64_t intra_process_subscription_id,
GetMessageCallbackType get_message_callback,
MatchesAnyPublishersCallbackType matches_any_publisher_callback,
IntraProcessManagerWeakPtr weak_ipm,
const rcl_subscription_options_t & intra_process_options)
{
std::string intra_process_topic_name = std::string(get_topic_name()) + "/_intra";
rcl_ret_t ret = rcl_subscription_init(
intra_process_subscription_handle_.get(),
node_handle_.get(),
rclcpp::type_support::get_intra_process_message_msg_type_support(),
intra_process_topic_name.c_str(),
&intra_process_options);
if (ret != RCL_RET_OK) {
if (ret == RCL_RET_TOPIC_NAME_INVALID) {
auto rcl_node_handle = node_handle_.get();
// this will throw on any validation problem
rcl_reset_error();
expand_topic_or_service_name(
intra_process_topic_name,
rcl_node_get_name(rcl_node_handle),
rcl_node_get_namespace(rcl_node_handle));
} }
any_callback_.dispatch_intra_process(msg, message_info);
rclcpp::exceptions::throw_from_rcl_error(ret, "could not create intra process subscription"); } else {
MessageUniquePtr msg;
take_intra_process_message(
ipm.publisher_id,
ipm.message_sequence,
intra_process_subscription_id_,
msg);
if (!msg) {
// This either occurred because the publisher no longer exists or the
// message requested is no longer being stored.
RCLCPP_WARN(get_logger("rclcpp"),
"Intra process message not longer being stored when trying to handle it");
return;
}
any_callback_.dispatch_intra_process(std::move(msg), message_info);
} }
intra_process_subscription_id_ = intra_process_subscription_id;
get_intra_process_message_callback_ = get_message_callback;
matches_any_intra_process_publishers_ = matches_any_publisher_callback;
weak_ipm_ = weak_ipm;
use_intra_process_ = true;
} }
/// Implemenation detail. /// Implemenation detail.
const std::shared_ptr<rcl_subscription_t> const std::shared_ptr<rcl_subscription_t>
get_intra_process_subscription_handle() const get_intra_process_subscription_handle() const
{ {
if (!get_intra_process_message_callback_) { if (!use_intra_process_) {
return nullptr; return nullptr;
} }
return intra_process_subscription_handle_; return intra_process_subscription_handle_;
} }
private: private:
void
take_intra_process_message(
uint64_t publisher_id,
uint64_t message_sequence,
uint64_t subscription_id,
MessageUniquePtr & message)
{
auto ipm = weak_ipm_.lock();
if (!ipm) {
throw std::runtime_error(
"intra process take called after destruction of intra process manager");
}
ipm->template take_intra_process_message<CallbackMessageT, Alloc>(
publisher_id, message_sequence, subscription_id, message);
}
void
take_intra_process_message(
uint64_t publisher_id,
uint64_t message_sequence,
uint64_t subscription_id,
ConstMessageSharedPtr & message)
{
auto ipm = weak_ipm_.lock();
if (!ipm) {
throw std::runtime_error(
"intra process take called after destruction of intra process manager");
}
ipm->template take_intra_process_message<CallbackMessageT, Alloc>(
publisher_id, message_sequence, subscription_id, message);
}
bool
matches_any_intra_process_publishers(const rmw_gid_t * sender_gid)
{
if (!use_intra_process_) {
return false;
}
auto ipm = weak_ipm_.lock();
if (!ipm) {
throw std::runtime_error(
"intra process publisher check called "
"after destruction of intra process manager");
}
return ipm->matches_any_publishers(sender_gid);
}
RCLCPP_DISABLE_COPY(Subscription) RCLCPP_DISABLE_COPY(Subscription)
AnySubscriptionCallback<CallbackMessageT, Alloc> any_callback_; AnySubscriptionCallback<CallbackMessageT, Alloc> any_callback_;
typename message_memory_strategy::MessageMemoryStrategy<CallbackMessageT, Alloc>::SharedPtr typename message_memory_strategy::MessageMemoryStrategy<CallbackMessageT, Alloc>::SharedPtr
message_memory_strategy_; message_memory_strategy_;
GetMessageCallbackType get_intra_process_message_callback_;
MatchesAnyPublishersCallbackType matches_any_intra_process_publishers_;
}; };
} // namespace rclcpp } // namespace rclcpp

View file

@ -0,0 +1,165 @@
// Copyright 2019 Open Source Robotics Foundation, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#ifndef RCLCPP__SUBSCRIPTION_BASE_HPP_
#define RCLCPP__SUBSCRIPTION_BASE_HPP_
#include <memory>
#include <string>
#include "rcl/subscription.h"
#include "rcl_interfaces/msg/intra_process_message.hpp"
#include "rmw/rmw.h"
#include "rclcpp/any_subscription_callback.hpp"
#include "rclcpp/macros.hpp"
#include "rclcpp/type_support_decl.hpp"
#include "rclcpp/visibility_control.hpp"
namespace rclcpp
{
namespace node_interfaces
{
class NodeTopicsInterface;
} // namespace node_interfaces
namespace intra_process_manager
{
/**
* IntraProcessManager is forward declared here, avoiding a circular inclusion between
* `intra_process_manager.hpp` and `subscription_base.hpp`.
*/
class IntraProcessManager;
}
/// Virtual base class for subscriptions. This pattern allows us to iterate over different template
/// specializations of Subscription, among other things.
class SubscriptionBase
{
public:
RCLCPP_SMART_PTR_DEFINITIONS_NOT_COPYABLE(SubscriptionBase)
/// Default constructor.
/**
* \param[in] node_handle The rcl representation of the node that owns this subscription.
* \param[in] type_support_handle rosidl type support struct, for the Message type of the topic.
* \param[in] topic_name Name of the topic to subscribe to.
* \param[in] subscription_options options for the subscription.
* \param[in] is_serialized is true if the message will be delivered still serialized
*/
RCLCPP_PUBLIC
SubscriptionBase(
std::shared_ptr<rcl_node_t> node_handle,
const rosidl_message_type_support_t & type_support_handle,
const std::string & topic_name,
const rcl_subscription_options_t & subscription_options,
bool is_serialized = false);
/// Default destructor.
RCLCPP_PUBLIC
virtual ~SubscriptionBase();
/// Get the topic that this subscription is subscribed on.
RCLCPP_PUBLIC
const char *
get_topic_name() const;
RCLCPP_PUBLIC
std::shared_ptr<rcl_subscription_t>
get_subscription_handle();
RCLCPP_PUBLIC
const std::shared_ptr<rcl_subscription_t>
get_subscription_handle() const;
RCLCPP_PUBLIC
virtual const std::shared_ptr<rcl_subscription_t>
get_intra_process_subscription_handle() const;
/// Borrow a new message.
/** \return Shared pointer to the fresh message. */
virtual std::shared_ptr<void>
create_message() = 0;
/// Borrow a new serialized message
/** \return Shared pointer to a rcl_message_serialized_t. */
virtual std::shared_ptr<rcl_serialized_message_t>
create_serialized_message() = 0;
/// Check if we need to handle the message, and execute the callback if we do.
/**
* \param[in] message Shared pointer to the message to handle.
* \param[in] message_info Metadata associated with this message.
*/
virtual void
handle_message(std::shared_ptr<void> & message, const rmw_message_info_t & message_info) = 0;
/// Return the message borrowed in create_message.
/** \param[in] message Shared pointer to the returned message. */
virtual void
return_message(std::shared_ptr<void> & message) = 0;
/// Return the message borrowed in create_serialized_message.
/** \param[in] message Shared pointer to the returned message. */
virtual void
return_serialized_message(std::shared_ptr<rcl_serialized_message_t> & message) = 0;
virtual void
handle_intra_process_message(
rcl_interfaces::msg::IntraProcessMessage & ipm,
const rmw_message_info_t & message_info) = 0;
const rosidl_message_type_support_t &
get_message_type_support_handle() const;
bool
is_serialized() const;
/// Get matching publisher count.
/** \return The number of publishers on this topic. */
RCLCPP_PUBLIC
size_t
get_publisher_count() const;
using IntraProcessManagerWeakPtr =
std::weak_ptr<rclcpp::intra_process_manager::IntraProcessManager>;
/// Implemenation detail.
void setup_intra_process(
uint64_t intra_process_subscription_id,
IntraProcessManagerWeakPtr weak_ipm,
const rcl_subscription_options_t & intra_process_options);
protected:
std::shared_ptr<rcl_subscription_t> intra_process_subscription_handle_;
std::shared_ptr<rcl_subscription_t> subscription_handle_;
std::shared_ptr<rcl_node_t> node_handle_;
bool use_intra_process_;
IntraProcessManagerWeakPtr weak_ipm_;
uint64_t intra_process_subscription_id_;
private:
RCLCPP_DISABLE_COPY(SubscriptionBase)
rosidl_message_type_support_t type_support_;
bool is_serialized_;
};
} // namespace rclcpp
#endif // RCLCPP__SUBSCRIPTION_BASE_HPP_

View file

@ -114,64 +114,6 @@ create_subscription_factory(
return sub_base_ptr; return sub_base_ptr;
}; };
// function that will setup intra process communications for the subscription
factory.setup_intra_process =
[message_alloc](
rclcpp::intra_process_manager::IntraProcessManager::SharedPtr ipm,
rclcpp::SubscriptionBase::SharedPtr subscription,
const rcl_subscription_options_t & subscription_options)
{
rclcpp::intra_process_manager::IntraProcessManager::WeakPtr weak_ipm = ipm;
uint64_t intra_process_subscription_id = ipm->add_subscription(subscription);
auto intra_process_options = rcl_subscription_get_default_options();
intra_process_options.allocator = rclcpp::allocator::get_rcl_allocator<CallbackMessageT>(
*message_alloc.get());
intra_process_options.qos = subscription_options.qos;
intra_process_options.ignore_local_publications = false;
// function that will be called to take a MessageT from the intra process manager
auto take_intra_process_message_func =
[weak_ipm](
uint64_t publisher_id,
uint64_t message_sequence,
uint64_t subscription_id,
typename rclcpp::Subscription<CallbackMessageT, Alloc>::MessageUniquePtr & message)
{
auto ipm = weak_ipm.lock();
if (!ipm) {
// TODO(wjwwood): should this just return silently? Or return with a logged warning?
throw std::runtime_error(
"intra process take called after destruction of intra process manager");
}
ipm->take_intra_process_message<CallbackMessageT, Alloc>(
publisher_id, message_sequence, subscription_id, message);
};
// function that is called to see if the publisher id matches any local publishers
auto matches_any_publisher_func =
[weak_ipm](const rmw_gid_t * sender_gid) -> bool
{
auto ipm = weak_ipm.lock();
if (!ipm) {
throw std::runtime_error(
"intra process publisher check called "
"after destruction of intra process manager");
}
return ipm->matches_any_publishers(sender_gid);
};
auto typed_sub_ptr = std::dynamic_pointer_cast<SubscriptionT>(subscription);
typed_sub_ptr->setup_intra_process(
intra_process_subscription_id,
take_intra_process_message_func,
matches_any_publisher_func,
weak_ipm,
intra_process_options
);
};
// end definition of factory function to setup intra process
// return the factory now that it is populated // return the factory now that it is populated
return factory; return factory;
} }

View file

@ -29,6 +29,21 @@ IntraProcessManager::IntraProcessManager(
IntraProcessManager::~IntraProcessManager() IntraProcessManager::~IntraProcessManager()
{} {}
uint64_t
IntraProcessManager::add_publisher(
rclcpp::PublisherBase::SharedPtr publisher,
size_t buffer_size)
{
auto id = IntraProcessManager::get_next_unique_id();
size_t size = buffer_size > 0 ? buffer_size : publisher->get_queue_size();
auto mrb = publisher->make_mapped_ring_buffer(size);
impl_->add_publisher(id, publisher, mrb, size);
if (!mrb) {
throw std::runtime_error("failed to create a mapped ring buffer");
}
return id;
}
uint64_t uint64_t
IntraProcessManager::add_subscription( IntraProcessManager::add_subscription(
rclcpp::SubscriptionBase::SharedPtr subscription) rclcpp::SubscriptionBase::SharedPtr subscription)

View file

@ -47,13 +47,9 @@ NodeTopics::create_publisher(
// Get the intra process manager instance for this context. // Get the intra process manager instance for this context.
auto ipm = context->get_sub_context<rclcpp::intra_process_manager::IntraProcessManager>(); auto ipm = context->get_sub_context<rclcpp::intra_process_manager::IntraProcessManager>();
// Register the publisher with the intra process manager. // Register the publisher with the intra process manager.
uint64_t intra_process_publisher_id = uint64_t intra_process_publisher_id = ipm->add_publisher(publisher);
publisher_factory.add_publisher_to_intra_process_manager(ipm.get(), publisher);
// Create a function to be called when publisher to do the intra process publish.
auto shared_publish_callback = publisher_factory.create_shared_publish_callback(ipm);
publisher->setup_intra_process( publisher->setup_intra_process(
intra_process_publisher_id, intra_process_publisher_id,
shared_publish_callback,
ipm, ipm,
publisher_options); publisher_options);
} }
@ -93,10 +89,11 @@ NodeTopics::create_subscription(
// Setup intra process publishing if requested. // Setup intra process publishing if requested.
if (use_intra_process) { if (use_intra_process) {
auto context = node_base_->get_context(); auto context = node_base_->get_context();
auto intra_process_manager = auto ipm =
context->get_sub_context<rclcpp::intra_process_manager::IntraProcessManager>(); context->get_sub_context<rclcpp::intra_process_manager::IntraProcessManager>();
subscription_factory.setup_intra_process( uint64_t intra_process_subscription_id = ipm->add_subscription(subscription);
intra_process_manager, subscription, subscription_options); subscription_options.ignore_local_publications = false;
subscription->setup_intra_process(intra_process_subscription_id, ipm, subscription_options);
} }
// Return the completed subscription. // Return the completed subscription.

View file

@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and // See the License for the specific language governing permissions and
// limitations under the License. // limitations under the License.
#include "rclcpp/publisher.hpp" #include "rclcpp/publisher_base.hpp"
#include <rmw/error_handling.h> #include <rmw/error_handling.h>
#include <rmw/rmw.h> #include <rmw/rmw.h>
@ -43,8 +43,7 @@ PublisherBase::PublisherBase(
const rosidl_message_type_support_t & type_support, const rosidl_message_type_support_t & type_support,
const rcl_publisher_options_t & publisher_options) const rcl_publisher_options_t & publisher_options)
: rcl_node_handle_(node_base->get_shared_rcl_node_handle()), : rcl_node_handle_(node_base->get_shared_rcl_node_handle()),
intra_process_is_enabled_(false), intra_process_publisher_id_(0), intra_process_is_enabled_(false), intra_process_publisher_id_(0)
store_intra_process_message_(nullptr)
{ {
rcl_ret_t ret = rcl_publisher_init( rcl_ret_t ret = rcl_publisher_init(
&publisher_handle_, &publisher_handle_,
@ -235,10 +234,16 @@ PublisherBase::operator==(const rmw_gid_t * gid) const
return result; return result;
} }
rclcpp::mapped_ring_buffer::MappedRingBufferBase::SharedPtr
PublisherBase::make_mapped_ring_buffer(size_t size) const
{
(void)size;
return nullptr;
}
void void
PublisherBase::setup_intra_process( PublisherBase::setup_intra_process(
uint64_t intra_process_publisher_id, uint64_t intra_process_publisher_id,
StoreMessageCallbackT store_callback,
IntraProcessManagerSharedPtr ipm, IntraProcessManagerSharedPtr ipm,
const rcl_publisher_options_t & intra_process_options) const rcl_publisher_options_t & intra_process_options)
{ {
@ -275,7 +280,6 @@ PublisherBase::setup_intra_process(
} }
intra_process_publisher_id_ = intra_process_publisher_id; intra_process_publisher_id_ = intra_process_publisher_id;
store_intra_process_message_ = store_callback;
weak_ipm_ = ipm; weak_ipm_ = ipm;
intra_process_is_enabled_ = true; intra_process_is_enabled_ = true;

View file

@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and // See the License for the specific language governing permissions and
// limitations under the License. // limitations under the License.
#include "rclcpp/subscription.hpp" #include "rclcpp/subscription_base.hpp"
#include <cstdio> #include <cstdio>
#include <memory> #include <memory>
@ -147,3 +147,34 @@ SubscriptionBase::get_publisher_count() const
} }
return inter_process_publisher_count; return inter_process_publisher_count;
} }
void SubscriptionBase::setup_intra_process(
uint64_t intra_process_subscription_id,
IntraProcessManagerWeakPtr weak_ipm,
const rcl_subscription_options_t & intra_process_options)
{
std::string intra_process_topic_name = std::string(get_topic_name()) + "/_intra";
rcl_ret_t ret = rcl_subscription_init(
intra_process_subscription_handle_.get(),
node_handle_.get(),
rclcpp::type_support::get_intra_process_message_msg_type_support(),
intra_process_topic_name.c_str(),
&intra_process_options);
if (ret != RCL_RET_OK) {
if (ret == RCL_RET_TOPIC_NAME_INVALID) {
auto rcl_node_handle = node_handle_.get();
// this will throw on any validation problem
rcl_reset_error();
expand_topic_or_service_name(
intra_process_topic_name,
rcl_node_get_name(rcl_node_handle),
rcl_node_get_namespace(rcl_node_handle));
}
rclcpp::exceptions::throw_from_rcl_error(ret, "could not create intra process subscription");
}
intra_process_subscription_id_ = intra_process_subscription_id;
weak_ipm_ = weak_ipm;
use_intra_process_ = true;
}

View file

@ -14,10 +14,13 @@
#include <memory> #include <memory>
#include <string> #include <string>
#include <utility>
#define RCLCPP_BUILDING_LIBRARY 1
#include "gtest/gtest.h" #include "gtest/gtest.h"
#include "rclcpp/allocator/allocator_common.hpp" #include "rclcpp/allocator/allocator_common.hpp"
#include "rclcpp/macros.hpp" #include "rclcpp/macros.hpp"
#include "rclcpp/mapped_ring_buffer.hpp"
#include "rmw/types.h" #include "rmw/types.h"
// Mock up publisher and subscription base to avoid needing an rmw impl. // Mock up publisher and subscription base to avoid needing an rmw impl.
@ -50,6 +53,14 @@ public:
return false; return false;
} }
virtual
mapped_ring_buffer::MappedRingBufferBase::SharedPtr
make_mapped_ring_buffer(size_t size) const
{
(void)size;
return nullptr;
}
std::string mock_topic_name; std::string mock_topic_name;
size_t mock_queue_size; size_t mock_queue_size;
}; };
@ -71,6 +82,15 @@ public:
allocator_ = std::make_shared<MessageAlloc>(); allocator_ = std::make_shared<MessageAlloc>();
} }
mapped_ring_buffer::MappedRingBufferBase::SharedPtr
make_mapped_ring_buffer(size_t size) const override
{
return mapped_ring_buffer::MappedRingBuffer<
T,
typename Publisher<T, Alloc>::MessageAlloc
>::make_shared(size, allocator_);
}
std::shared_ptr<MessageAlloc> get_allocator() std::shared_ptr<MessageAlloc> get_allocator()
{ {
return allocator_; return allocator_;
@ -109,10 +129,9 @@ public:
} // namespace mock } // namespace mock
} // namespace rclcpp } // namespace rclcpp
// Prevent rclcpp/publisher.hpp and rclcpp/subscription.hpp from being imported. // Prevent rclcpp/publisher_base.hpp and rclcpp/subscription.hpp from being imported.
#define RCLCPP__PUBLISHER_HPP_ #define RCLCPP__PUBLISHER_BASE_HPP_
#define RCLCPP__SUBSCRIPTION_HPP_ #define RCLCPP__SUBSCRIPTION_BASE_HPP_
#define RCLCPP_BUILDING_LIBRARY 1
// Force ipm to use our mock publisher class. // Force ipm to use our mock publisher class.
#define Publisher mock::Publisher #define Publisher mock::Publisher
#define PublisherBase mock::PublisherBase #define PublisherBase mock::PublisherBase
@ -155,10 +174,8 @@ TEST(TestIntraProcessManager, nominal) {
s1->mock_topic_name = "nominal1"; s1->mock_topic_name = "nominal1";
s1->mock_queue_size = 10; s1->mock_queue_size = 10;
auto p1_id = auto p1_id = ipm.add_publisher(p1);
ipm.add_publisher<rcl_interfaces::msg::IntraProcessMessage, std::allocator<void>>(p1); auto p2_id = ipm.add_publisher(p2);
auto p2_id =
ipm.add_publisher<rcl_interfaces::msg::IntraProcessMessage, std::allocator<void>>(p2);
auto s1_id = ipm.add_subscription(s1); auto s1_id = ipm.add_subscription(s1);
auto ipm_msg = std::make_shared<rcl_interfaces::msg::IntraProcessMessage>(); auto ipm_msg = std::make_shared<rcl_interfaces::msg::IntraProcessMessage>();
@ -169,14 +186,14 @@ TEST(TestIntraProcessManager, nominal) {
); );
auto p1_m1_original_address = unique_msg.get(); auto p1_m1_original_address = unique_msg.get();
auto p1_m1_id = ipm.store_intra_process_message(p1_id, unique_msg); auto p1_m1_id = ipm.store_intra_process_message(p1_id, std::move(unique_msg));
ASSERT_EQ(nullptr, unique_msg); ASSERT_EQ(nullptr, unique_msg);
ipm_msg->message_sequence = 43; ipm_msg->message_sequence = 43;
ipm_msg->publisher_id = 43; ipm_msg->publisher_id = 43;
unique_msg.reset(new rcl_interfaces::msg::IntraProcessMessage(*ipm_msg)); unique_msg.reset(new rcl_interfaces::msg::IntraProcessMessage(*ipm_msg));
auto p2_m1_id = ipm.store_intra_process_message(p2_id, unique_msg); auto p2_m1_id = ipm.store_intra_process_message(p2_id, std::move(unique_msg));
ASSERT_EQ(nullptr, unique_msg); ASSERT_EQ(nullptr, unique_msg);
ipm.take_intra_process_message(p2_id, p2_m1_id, s1_id, unique_msg); ipm.take_intra_process_message(p2_id, p2_m1_id, s1_id, unique_msg);
@ -198,26 +215,22 @@ TEST(TestIntraProcessManager, nominal) {
ipm_msg->publisher_id = 44; ipm_msg->publisher_id = 44;
unique_msg.reset(new rcl_interfaces::msg::IntraProcessMessage(*ipm_msg)); unique_msg.reset(new rcl_interfaces::msg::IntraProcessMessage(*ipm_msg));
ipm.store_intra_process_message(p1_id, unique_msg); ipm.store_intra_process_message(p1_id, std::move(unique_msg));
ASSERT_EQ(nullptr, unique_msg); ASSERT_EQ(nullptr, unique_msg);
ipm_msg->message_sequence = 45; ipm_msg->message_sequence = 45;
ipm_msg->publisher_id = 45; ipm_msg->publisher_id = 45;
unique_msg.reset(new rcl_interfaces::msg::IntraProcessMessage(*ipm_msg)); unique_msg.reset(new rcl_interfaces::msg::IntraProcessMessage(*ipm_msg));
ipm.store_intra_process_message(p1_id, unique_msg); ipm.store_intra_process_message(p1_id, std::move(unique_msg));
ASSERT_EQ(nullptr, unique_msg); ASSERT_EQ(nullptr, unique_msg);
ipm_msg->message_sequence = 46; ipm_msg->message_sequence = 46;
ipm_msg->publisher_id = 46; ipm_msg->publisher_id = 46;
unique_msg.reset(new rcl_interfaces::msg::IntraProcessMessage(*ipm_msg)); unique_msg.reset(new rcl_interfaces::msg::IntraProcessMessage(*ipm_msg));
ipm.store_intra_process_message(p1_id, unique_msg); ipm.store_intra_process_message(p1_id, std::move(unique_msg));
ASSERT_NE(nullptr, unique_msg); ASSERT_EQ(nullptr, unique_msg);
if (unique_msg) {
EXPECT_EQ(44ul, unique_msg->message_sequence);
EXPECT_EQ(44ul, unique_msg->publisher_id);
}
} }
/* /*
@ -240,8 +253,7 @@ TEST(TestIntraProcessManager, remove_publisher_before_trying_to_take) {
s1->mock_topic_name = "nominal1"; s1->mock_topic_name = "nominal1";
s1->mock_queue_size = 10; s1->mock_queue_size = 10;
auto p1_id = auto p1_id = ipm.add_publisher(p1);
ipm.add_publisher<rcl_interfaces::msg::IntraProcessMessage, std::allocator<void>>(p1);
auto s1_id = ipm.add_subscription(s1); auto s1_id = ipm.add_subscription(s1);
auto ipm_msg = std::make_shared<rcl_interfaces::msg::IntraProcessMessage>(); auto ipm_msg = std::make_shared<rcl_interfaces::msg::IntraProcessMessage>();
@ -251,7 +263,7 @@ TEST(TestIntraProcessManager, remove_publisher_before_trying_to_take) {
new rcl_interfaces::msg::IntraProcessMessage(*ipm_msg) new rcl_interfaces::msg::IntraProcessMessage(*ipm_msg)
); );
auto p1_m1_id = ipm.store_intra_process_message(p1_id, unique_msg); auto p1_m1_id = ipm.store_intra_process_message(p1_id, std::move(unique_msg));
ASSERT_EQ(nullptr, unique_msg); ASSERT_EQ(nullptr, unique_msg);
ipm.remove_publisher(p1_id); ipm.remove_publisher(p1_id);
@ -290,8 +302,7 @@ TEST(TestIntraProcessManager, removed_subscription_affects_take) {
s3->mock_topic_name = "nominal1"; s3->mock_topic_name = "nominal1";
s3->mock_queue_size = 10; s3->mock_queue_size = 10;
auto p1_id = auto p1_id = ipm.add_publisher(p1);
ipm.add_publisher<rcl_interfaces::msg::IntraProcessMessage, std::allocator<void>>(p1);
auto s1_id = ipm.add_subscription(s1); auto s1_id = ipm.add_subscription(s1);
auto s2_id = ipm.add_subscription(s2); auto s2_id = ipm.add_subscription(s2);
auto s3_id = ipm.add_subscription(s3); auto s3_id = ipm.add_subscription(s3);
@ -304,7 +315,7 @@ TEST(TestIntraProcessManager, removed_subscription_affects_take) {
); );
auto original_message_pointer = unique_msg.get(); auto original_message_pointer = unique_msg.get();
auto p1_m1_id = ipm.store_intra_process_message(p1_id, unique_msg); auto p1_m1_id = ipm.store_intra_process_message(p1_id, std::move(unique_msg));
ASSERT_EQ(nullptr, unique_msg); ASSERT_EQ(nullptr, unique_msg);
ipm.take_intra_process_message(p1_id, p1_m1_id, s1_id, unique_msg); ipm.take_intra_process_message(p1_id, p1_m1_id, s1_id, unique_msg);
@ -361,8 +372,7 @@ TEST(TestIntraProcessManager, multiple_subscriptions_one_publisher) {
s3->mock_topic_name = "nominal1"; s3->mock_topic_name = "nominal1";
s3->mock_queue_size = 10; s3->mock_queue_size = 10;
auto p1_id = auto p1_id = ipm.add_publisher(p1);
ipm.add_publisher<rcl_interfaces::msg::IntraProcessMessage, std::allocator<void>>(p1);
auto s1_id = ipm.add_subscription(s1); auto s1_id = ipm.add_subscription(s1);
auto s2_id = ipm.add_subscription(s2); auto s2_id = ipm.add_subscription(s2);
auto s3_id = ipm.add_subscription(s3); auto s3_id = ipm.add_subscription(s3);
@ -375,7 +385,7 @@ TEST(TestIntraProcessManager, multiple_subscriptions_one_publisher) {
); );
auto original_message_pointer = unique_msg.get(); auto original_message_pointer = unique_msg.get();
auto p1_m1_id = ipm.store_intra_process_message(p1_id, unique_msg); auto p1_m1_id = ipm.store_intra_process_message(p1_id, std::move(unique_msg));
ASSERT_EQ(nullptr, unique_msg); ASSERT_EQ(nullptr, unique_msg);
ipm.take_intra_process_message(p1_id, p1_m1_id, s1_id, unique_msg); ipm.take_intra_process_message(p1_id, p1_m1_id, s1_id, unique_msg);
@ -437,12 +447,9 @@ TEST(TestIntraProcessManager, multiple_publishers_one_subscription) {
s1->mock_topic_name = "nominal1"; s1->mock_topic_name = "nominal1";
s1->mock_queue_size = 10; s1->mock_queue_size = 10;
auto p1_id = auto p1_id = ipm.add_publisher(p1);
ipm.add_publisher<rcl_interfaces::msg::IntraProcessMessage, std::allocator<void>>(p1); auto p2_id = ipm.add_publisher(p2);
auto p2_id = auto p3_id = ipm.add_publisher(p3);
ipm.add_publisher<rcl_interfaces::msg::IntraProcessMessage, std::allocator<void>>(p2);
auto p3_id =
ipm.add_publisher<rcl_interfaces::msg::IntraProcessMessage, std::allocator<void>>(p3);
auto s1_id = ipm.add_subscription(s1); auto s1_id = ipm.add_subscription(s1);
auto ipm_msg = std::make_shared<rcl_interfaces::msg::IntraProcessMessage>(); auto ipm_msg = std::make_shared<rcl_interfaces::msg::IntraProcessMessage>();
@ -454,7 +461,7 @@ TEST(TestIntraProcessManager, multiple_publishers_one_subscription) {
); );
auto original_message_pointer1 = unique_msg.get(); auto original_message_pointer1 = unique_msg.get();
auto p1_m1_id = ipm.store_intra_process_message(p1_id, unique_msg); auto p1_m1_id = ipm.store_intra_process_message(p1_id, std::move(unique_msg));
ASSERT_EQ(nullptr, unique_msg); ASSERT_EQ(nullptr, unique_msg);
// Second publish // Second publish
@ -463,7 +470,7 @@ TEST(TestIntraProcessManager, multiple_publishers_one_subscription) {
unique_msg.reset(new rcl_interfaces::msg::IntraProcessMessage(*ipm_msg)); unique_msg.reset(new rcl_interfaces::msg::IntraProcessMessage(*ipm_msg));
auto original_message_pointer2 = unique_msg.get(); auto original_message_pointer2 = unique_msg.get();
auto p2_m1_id = ipm.store_intra_process_message(p2_id, unique_msg); auto p2_m1_id = ipm.store_intra_process_message(p2_id, std::move(unique_msg));
ASSERT_EQ(nullptr, unique_msg); ASSERT_EQ(nullptr, unique_msg);
// Third publish // Third publish
@ -472,7 +479,7 @@ TEST(TestIntraProcessManager, multiple_publishers_one_subscription) {
unique_msg.reset(new rcl_interfaces::msg::IntraProcessMessage(*ipm_msg)); unique_msg.reset(new rcl_interfaces::msg::IntraProcessMessage(*ipm_msg));
auto original_message_pointer3 = unique_msg.get(); auto original_message_pointer3 = unique_msg.get();
auto p3_m1_id = ipm.store_intra_process_message(p3_id, unique_msg); auto p3_m1_id = ipm.store_intra_process_message(p3_id, std::move(unique_msg));
ASSERT_EQ(nullptr, unique_msg); ASSERT_EQ(nullptr, unique_msg);
// First take // First take
@ -545,12 +552,9 @@ TEST(TestIntraProcessManager, multiple_publishers_multiple_subscription) {
s3->mock_topic_name = "nominal1"; s3->mock_topic_name = "nominal1";
s3->mock_queue_size = 10; s3->mock_queue_size = 10;
auto p1_id = auto p1_id = ipm.add_publisher(p1);
ipm.add_publisher<rcl_interfaces::msg::IntraProcessMessage, std::allocator<void>>(p1); auto p2_id = ipm.add_publisher(p2);
auto p2_id = auto p3_id = ipm.add_publisher(p3);
ipm.add_publisher<rcl_interfaces::msg::IntraProcessMessage, std::allocator<void>>(p2);
auto p3_id =
ipm.add_publisher<rcl_interfaces::msg::IntraProcessMessage, std::allocator<void>>(p3);
auto s1_id = ipm.add_subscription(s1); auto s1_id = ipm.add_subscription(s1);
auto s2_id = ipm.add_subscription(s2); auto s2_id = ipm.add_subscription(s2);
auto s3_id = ipm.add_subscription(s3); auto s3_id = ipm.add_subscription(s3);
@ -564,7 +568,7 @@ TEST(TestIntraProcessManager, multiple_publishers_multiple_subscription) {
); );
auto original_message_pointer1 = unique_msg.get(); auto original_message_pointer1 = unique_msg.get();
auto p1_m1_id = ipm.store_intra_process_message(p1_id, unique_msg); auto p1_m1_id = ipm.store_intra_process_message(p1_id, std::move(unique_msg));
ASSERT_EQ(nullptr, unique_msg); ASSERT_EQ(nullptr, unique_msg);
// Second publish // Second publish
@ -573,7 +577,7 @@ TEST(TestIntraProcessManager, multiple_publishers_multiple_subscription) {
unique_msg.reset(new rcl_interfaces::msg::IntraProcessMessage(*ipm_msg)); unique_msg.reset(new rcl_interfaces::msg::IntraProcessMessage(*ipm_msg));
auto original_message_pointer2 = unique_msg.get(); auto original_message_pointer2 = unique_msg.get();
auto p2_m1_id = ipm.store_intra_process_message(p2_id, unique_msg); auto p2_m1_id = ipm.store_intra_process_message(p2_id, std::move(unique_msg));
ASSERT_EQ(nullptr, unique_msg); ASSERT_EQ(nullptr, unique_msg);
// Third publish // Third publish
@ -582,7 +586,7 @@ TEST(TestIntraProcessManager, multiple_publishers_multiple_subscription) {
unique_msg.reset(new rcl_interfaces::msg::IntraProcessMessage(*ipm_msg)); unique_msg.reset(new rcl_interfaces::msg::IntraProcessMessage(*ipm_msg));
auto original_message_pointer3 = unique_msg.get(); auto original_message_pointer3 = unique_msg.get();
auto p3_m1_id = ipm.store_intra_process_message(p3_id, unique_msg); auto p3_m1_id = ipm.store_intra_process_message(p3_id, std::move(unique_msg));
ASSERT_EQ(nullptr, unique_msg); ASSERT_EQ(nullptr, unique_msg);
// First take // First take
@ -692,8 +696,7 @@ TEST(TestIntraProcessManager, ring_buffer_displacement) {
s1->mock_topic_name = "nominal1"; s1->mock_topic_name = "nominal1";
s1->mock_queue_size = 10; s1->mock_queue_size = 10;
auto p1_id = auto p1_id = ipm.add_publisher(p1);
ipm.add_publisher<rcl_interfaces::msg::IntraProcessMessage, std::allocator<void>>(p1);
auto s1_id = ipm.add_subscription(s1); auto s1_id = ipm.add_subscription(s1);
auto ipm_msg = std::make_shared<rcl_interfaces::msg::IntraProcessMessage>(); auto ipm_msg = std::make_shared<rcl_interfaces::msg::IntraProcessMessage>();
@ -703,8 +706,7 @@ TEST(TestIntraProcessManager, ring_buffer_displacement) {
new rcl_interfaces::msg::IntraProcessMessage(*ipm_msg) new rcl_interfaces::msg::IntraProcessMessage(*ipm_msg)
); );
auto original_message_pointer1 = unique_msg.get(); auto p1_m1_id = ipm.store_intra_process_message(p1_id, std::move(unique_msg));
auto p1_m1_id = ipm.store_intra_process_message(p1_id, unique_msg);
ASSERT_EQ(nullptr, unique_msg); ASSERT_EQ(nullptr, unique_msg);
ipm_msg->message_sequence = 43; ipm_msg->message_sequence = 43;
@ -712,7 +714,7 @@ TEST(TestIntraProcessManager, ring_buffer_displacement) {
unique_msg.reset(new rcl_interfaces::msg::IntraProcessMessage(*ipm_msg)); unique_msg.reset(new rcl_interfaces::msg::IntraProcessMessage(*ipm_msg));
auto original_message_pointer2 = unique_msg.get(); auto original_message_pointer2 = unique_msg.get();
auto p1_m2_id = ipm.store_intra_process_message(p1_id, unique_msg); auto p1_m2_id = ipm.store_intra_process_message(p1_id, std::move(unique_msg));
ASSERT_EQ(nullptr, unique_msg); ASSERT_EQ(nullptr, unique_msg);
ipm.take_intra_process_message(p1_id, p1_m2_id, s1_id, unique_msg); ipm.take_intra_process_message(p1_id, p1_m2_id, s1_id, unique_msg);
@ -728,14 +730,8 @@ TEST(TestIntraProcessManager, ring_buffer_displacement) {
ipm_msg->publisher_id = 44; ipm_msg->publisher_id = 44;
unique_msg.reset(new rcl_interfaces::msg::IntraProcessMessage(*ipm_msg)); unique_msg.reset(new rcl_interfaces::msg::IntraProcessMessage(*ipm_msg));
ipm.store_intra_process_message(p1_id, unique_msg); ipm.store_intra_process_message(p1_id, std::move(unique_msg));
EXPECT_NE(nullptr, unique_msg); // Should return the thing in the ring buffer it displaced. EXPECT_EQ(nullptr, unique_msg);
if (unique_msg) {
// This should have been the first published message.
EXPECT_EQ(42ul, unique_msg->message_sequence);
EXPECT_EQ(42ul, unique_msg->publisher_id);
EXPECT_EQ(original_message_pointer1, unique_msg.get());
}
unique_msg.reset(); unique_msg.reset();
// Since it just got displaced it should no longer be there to take. // Since it just got displaced it should no longer be there to take.
@ -759,8 +755,7 @@ TEST(TestIntraProcessManager, subscription_creation_race_condition) {
p1->mock_topic_name = "nominal1"; p1->mock_topic_name = "nominal1";
p1->mock_queue_size = 2; p1->mock_queue_size = 2;
auto p1_id = auto p1_id = ipm.add_publisher(p1);
ipm.add_publisher<rcl_interfaces::msg::IntraProcessMessage, std::allocator<void>>(p1);
auto ipm_msg = std::make_shared<rcl_interfaces::msg::IntraProcessMessage>(); auto ipm_msg = std::make_shared<rcl_interfaces::msg::IntraProcessMessage>();
ipm_msg->message_sequence = 42; ipm_msg->message_sequence = 42;
@ -769,7 +764,7 @@ TEST(TestIntraProcessManager, subscription_creation_race_condition) {
new rcl_interfaces::msg::IntraProcessMessage(*ipm_msg) new rcl_interfaces::msg::IntraProcessMessage(*ipm_msg)
); );
auto p1_m1_id = ipm.store_intra_process_message(p1_id, unique_msg); auto p1_m1_id = ipm.store_intra_process_message(p1_id, std::move(unique_msg));
ASSERT_EQ(nullptr, unique_msg); ASSERT_EQ(nullptr, unique_msg);
auto s1 = std::make_shared<rclcpp::mock::SubscriptionBase>(); auto s1 = std::make_shared<rclcpp::mock::SubscriptionBase>();
@ -808,7 +803,7 @@ TEST(TestIntraProcessManager, publisher_out_of_scope_take) {
p1->mock_topic_name = "nominal1"; p1->mock_topic_name = "nominal1";
p1->mock_queue_size = 2; p1->mock_queue_size = 2;
p1_id = ipm.add_publisher<rcl_interfaces::msg::IntraProcessMessage, std::allocator<void>>(p1); p1_id = ipm.add_publisher(p1);
auto ipm_msg = std::make_shared<rcl_interfaces::msg::IntraProcessMessage>(); auto ipm_msg = std::make_shared<rcl_interfaces::msg::IntraProcessMessage>();
ipm_msg->message_sequence = 42; ipm_msg->message_sequence = 42;
@ -817,7 +812,7 @@ TEST(TestIntraProcessManager, publisher_out_of_scope_take) {
new rcl_interfaces::msg::IntraProcessMessage(*ipm_msg) new rcl_interfaces::msg::IntraProcessMessage(*ipm_msg)
); );
p1_m1_id = ipm.store_intra_process_message(p1_id, unique_msg); p1_m1_id = ipm.store_intra_process_message(p1_id, std::move(unique_msg));
ASSERT_EQ(nullptr, unique_msg); ASSERT_EQ(nullptr, unique_msg);
// Explicitly remove publisher from ipm (emulate's publisher's destructor). // Explicitly remove publisher from ipm (emulate's publisher's destructor).
@ -847,7 +842,7 @@ TEST(TestIntraProcessManager, publisher_out_of_scope_store) {
p1->mock_topic_name = "nominal1"; p1->mock_topic_name = "nominal1";
p1->mock_queue_size = 2; p1->mock_queue_size = 2;
p1_id = ipm.add_publisher<rcl_interfaces::msg::IntraProcessMessage, std::allocator<void>>(p1); p1_id = ipm.add_publisher(p1);
} }
auto ipm_msg = std::make_shared<rcl_interfaces::msg::IntraProcessMessage>(); auto ipm_msg = std::make_shared<rcl_interfaces::msg::IntraProcessMessage>();
@ -857,6 +852,6 @@ TEST(TestIntraProcessManager, publisher_out_of_scope_store) {
new rcl_interfaces::msg::IntraProcessMessage(*ipm_msg) new rcl_interfaces::msg::IntraProcessMessage(*ipm_msg)
); );
EXPECT_THROW(ipm.store_intra_process_message(p1_id, unique_msg), std::runtime_error); EXPECT_THROW(ipm.store_intra_process_message(p1_id, std::move(unique_msg)), std::runtime_error);
ASSERT_EQ(nullptr, unique_msg); ASSERT_EQ(nullptr, unique_msg);
} }

View file

@ -12,12 +12,13 @@
// See the License for the specific language governing permissions and // See the License for the specific language governing permissions and
// limitations under the License. // limitations under the License.
#include <gtest/gtest.h> #include <memory>
#include <utility>
#include "gtest/gtest.h"
#define RCLCPP_BUILDING_LIBRARY 1 // Prevent including unavailable symbols #define RCLCPP_BUILDING_LIBRARY 1 // Prevent including unavailable symbols
#include <rclcpp/mapped_ring_buffer.hpp> #include "rclcpp/mapped_ring_buffer.hpp"
#include <memory>
/* /*
Tests get_copy and pop on an empty mrb. Tests get_copy and pop on an empty mrb.
@ -28,60 +29,90 @@ TEST(TestMappedRingBuffer, empty) {
// Getting or popping an empty buffer should result in a nullptr. // Getting or popping an empty buffer should result in a nullptr.
rclcpp::mapped_ring_buffer::MappedRingBuffer<char> mrb(1); rclcpp::mapped_ring_buffer::MappedRingBuffer<char> mrb(1);
std::unique_ptr<char> actual; std::unique_ptr<char> unique;
mrb.get_copy_at_key(1, actual); mrb.get(1, unique);
EXPECT_EQ(nullptr, actual); EXPECT_EQ(nullptr, unique);
mrb.pop_at_key(1, actual); mrb.pop(1, unique);
EXPECT_EQ(nullptr, unique);
std::shared_ptr<const char> shared;
mrb.get(1, shared);
EXPECT_EQ(nullptr, shared);
mrb.pop(1, shared);
EXPECT_EQ(nullptr, shared);
}
/*
Tests push_and_replace with a temporary object, and using
get and pop methods with shared_ptr signature.
*/
TEST(TestMappedRingBuffer, temporary_l_value_with_shared_get_pop) {
rclcpp::mapped_ring_buffer::MappedRingBuffer<char> mrb(2);
// Pass in value with temporary object
mrb.push_and_replace(1, std::shared_ptr<const char>(new char('a')));
std::shared_ptr<const char> actual;
mrb.get(1, actual);
EXPECT_EQ('a', *actual);
mrb.pop(1, actual);
EXPECT_EQ('a', *actual);
mrb.get(1, actual);
EXPECT_EQ(nullptr, actual); EXPECT_EQ(nullptr, actual);
} }
/* /*
Tests push_and_replace with a temporary object. Tests push_and_replace with a temporary object, and using
get and pop methods with unique_ptr signature.
*/ */
TEST(TestMappedRingBuffer, temporary_l_value) { TEST(TestMappedRingBuffer, temporary_l_value_with_unique_get_pop) {
rclcpp::mapped_ring_buffer::MappedRingBuffer<char> mrb(2); rclcpp::mapped_ring_buffer::MappedRingBuffer<char> mrb(2);
// Pass in value with temporary object // Pass in value with temporary object
mrb.push_and_replace(1, std::unique_ptr<char>(new char('a'))); mrb.push_and_replace(1, std::shared_ptr<const char>(new char('a')));
std::unique_ptr<char> actual; std::unique_ptr<char> actual;
mrb.get_copy_at_key(1, actual); mrb.get(1, actual);
EXPECT_EQ('a', *actual); EXPECT_EQ('a', *actual);
mrb.pop_at_key(1, actual); mrb.pop(1, actual);
EXPECT_EQ('a', *actual); EXPECT_EQ('a', *actual);
mrb.get_copy_at_key(1, actual); mrb.get(1, actual);
EXPECT_EQ(nullptr, actual); EXPECT_EQ(nullptr, actual);
} }
/* /*
Tests normal usage of the mrb. Tests normal usage of the mrb.
Using shared push_and_replace, get and pop methods.
*/ */
TEST(TestMappedRingBuffer, nominal) { TEST(TestMappedRingBuffer, nominal_push_shared_get_pop_shared) {
rclcpp::mapped_ring_buffer::MappedRingBuffer<char> mrb(2); rclcpp::mapped_ring_buffer::MappedRingBuffer<char> mrb(2);
std::unique_ptr<char> expected(new char('a')); std::shared_ptr<const char> expected(new char('a'));
// Store expected value's address for later comparison.
char * expected_orig = expected.get();
EXPECT_FALSE(mrb.push_and_replace(1, expected)); EXPECT_FALSE(mrb.push_and_replace(1, expected));
EXPECT_EQ(2, expected.use_count());
std::unique_ptr<char> actual; std::shared_ptr<const char> actual;
mrb.get_copy_at_key(1, actual); mrb.get(1, actual);
EXPECT_NE(nullptr, actual); EXPECT_NE(nullptr, actual);
if (actual) { if (actual) {
EXPECT_EQ('a', *actual); EXPECT_EQ('a', *actual);
} }
EXPECT_NE(expected_orig, actual.get()); EXPECT_EQ(expected, actual);
EXPECT_EQ(3, actual.use_count());
mrb.pop_at_key(1, actual); mrb.pop(1, actual);
EXPECT_NE(nullptr, actual); EXPECT_EQ(expected, actual);
if (actual) { if (actual) {
EXPECT_EQ('a', *actual); EXPECT_EQ('a', *actual);
} }
EXPECT_EQ(expected_orig, actual.get()); expected.reset();
EXPECT_TRUE(actual.unique());
mrb.get_copy_at_key(1, actual); mrb.get(1, actual);
EXPECT_EQ(nullptr, actual); EXPECT_EQ(nullptr, actual);
expected.reset(new char('a')); expected.reset(new char('a'));
@ -93,16 +124,16 @@ TEST(TestMappedRingBuffer, nominal) {
expected.reset(new char('c')); expected.reset(new char('c'));
EXPECT_TRUE(mrb.push_and_replace(3, expected)); EXPECT_TRUE(mrb.push_and_replace(3, expected));
mrb.get_copy_at_key(1, actual); mrb.get(1, actual);
EXPECT_EQ(nullptr, actual); EXPECT_EQ(nullptr, actual);
mrb.get_copy_at_key(2, actual); mrb.get(2, actual);
EXPECT_NE(nullptr, actual); EXPECT_NE(nullptr, actual);
if (actual) { if (actual) {
EXPECT_EQ('b', *actual); EXPECT_EQ('b', *actual);
} }
mrb.get_copy_at_key(3, actual); mrb.get(3, actual);
EXPECT_NE(nullptr, actual); EXPECT_NE(nullptr, actual);
if (actual) { if (actual) {
EXPECT_EQ('c', *actual); EXPECT_EQ('c', *actual);
@ -110,40 +141,167 @@ TEST(TestMappedRingBuffer, nominal) {
} }
/* /*
Tests get_ownership on a normal mrb. Tests normal usage of the mrb.
Using shared push_and_replace, unique get and pop methods.
*/ */
TEST(TestMappedRingBuffer, get_ownership) { TEST(TestMappedRingBuffer, nominal_push_shared_get_pop_unique) {
rclcpp::mapped_ring_buffer::MappedRingBuffer<char> mrb(2); rclcpp::mapped_ring_buffer::MappedRingBuffer<char> mrb(2);
std::unique_ptr<char> expected(new char('a')); std::shared_ptr<const char> expected(new char('a'));
// Store expected value's address for later comparison. const char * expected_orig = expected.get();
char * expected_orig = expected.get();
EXPECT_FALSE(mrb.push_and_replace(1, expected)); EXPECT_FALSE(mrb.push_and_replace(1, expected));
EXPECT_EQ(2, expected.use_count());
std::unique_ptr<char> actual; std::unique_ptr<char> actual;
mrb.get_copy_at_key(1, actual); mrb.get(1, actual);
EXPECT_NE(nullptr, actual); EXPECT_NE(nullptr, actual);
if (actual) { if (actual) {
EXPECT_EQ('a', *actual); EXPECT_EQ('a', *actual);
} }
EXPECT_NE(expected_orig, actual.get()); EXPECT_NE(expected_orig, actual.get());
mrb.pop(1, actual);
EXPECT_NE(expected_orig, actual.get());
if (actual) {
EXPECT_EQ('a', *actual);
}
mrb.get(1, actual);
EXPECT_EQ(nullptr, actual);
mrb.get_ownership_at_key(1, actual); EXPECT_FALSE(mrb.push_and_replace(1, expected));
expected.reset();
mrb.pop(1, actual);
EXPECT_NE(expected_orig, actual.get());
if (actual) {
EXPECT_EQ('a', *actual);
}
mrb.get(1, actual);
EXPECT_EQ(nullptr, actual);
expected.reset(new char('a'));
EXPECT_FALSE(mrb.push_and_replace(1, expected));
expected.reset(new char('b'));
EXPECT_FALSE(mrb.push_and_replace(2, expected));
expected.reset(new char('c'));
EXPECT_TRUE(mrb.push_and_replace(3, expected));
mrb.get(1, actual);
EXPECT_EQ(nullptr, actual);
mrb.get(2, actual);
EXPECT_NE(nullptr, actual);
if (actual) {
EXPECT_EQ('b', *actual);
}
mrb.get(3, actual);
EXPECT_NE(nullptr, actual);
if (actual) {
EXPECT_EQ('c', *actual);
}
}
/*
Tests normal usage of the mrb.
Using unique push_and_replace, get and pop methods.
*/
TEST(TestMappedRingBuffer, nominal_push_unique_get_pop_unique) {
rclcpp::mapped_ring_buffer::MappedRingBuffer<char> mrb(2);
std::unique_ptr<char> expected(new char('a'));
const char * expected_orig = expected.get();
EXPECT_FALSE(mrb.push_and_replace(1, std::move(expected)));
std::unique_ptr<char> actual;
mrb.get(1, actual);
EXPECT_NE(nullptr, actual);
if (actual) {
EXPECT_EQ('a', *actual);
}
EXPECT_NE(expected_orig, actual.get());
mrb.pop(1, actual);
if (actual) {
EXPECT_EQ('a', *actual);
}
EXPECT_EQ(expected_orig, actual.get());
mrb.get(1, actual);
EXPECT_EQ(nullptr, actual);
expected.reset(new char('a'));
EXPECT_FALSE(mrb.push_and_replace(1, std::move(expected)));
expected.reset(new char('b'));
EXPECT_FALSE(mrb.push_and_replace(2, std::move(expected)));
expected.reset(new char('c'));
EXPECT_TRUE(mrb.push_and_replace(3, std::move(expected)));
mrb.get(1, actual);
EXPECT_EQ(nullptr, actual);
mrb.get(2, actual);
EXPECT_NE(nullptr, actual);
if (actual) {
EXPECT_EQ('b', *actual);
}
mrb.get(3, actual);
EXPECT_NE(nullptr, actual);
if (actual) {
EXPECT_EQ('c', *actual);
}
}
/*
Tests normal usage of the mrb.
Using unique push_and_replace, shared get and pop methods.
*/
TEST(TestMappedRingBuffer, nominal_push_unique_get_pop_shared) {
rclcpp::mapped_ring_buffer::MappedRingBuffer<char> mrb(2);
std::unique_ptr<char> expected(new char('a'));
const char * expected_orig = expected.get();
EXPECT_FALSE(mrb.push_and_replace(1, std::move(expected)));
std::shared_ptr<const char> actual;
mrb.get(1, actual);
EXPECT_NE(nullptr, actual); EXPECT_NE(nullptr, actual);
if (actual) { if (actual) {
EXPECT_EQ('a', *actual); EXPECT_EQ('a', *actual);
} }
EXPECT_EQ(expected_orig, actual.get()); EXPECT_EQ(expected_orig, actual.get());
mrb.pop(1, actual);
if (actual) {
EXPECT_EQ('a', *actual);
}
EXPECT_EQ(expected_orig, actual.get());
mrb.get(1, actual);
EXPECT_EQ(nullptr, actual);
mrb.pop_at_key(1, actual); expected.reset(new char('a'));
EXPECT_FALSE(mrb.push_and_replace(1, std::move(expected)));
expected.reset(new char('b'));
EXPECT_FALSE(mrb.push_and_replace(2, std::move(expected)));
expected.reset(new char('c'));
EXPECT_TRUE(mrb.push_and_replace(3, std::move(expected)));
mrb.get(1, actual);
EXPECT_EQ(nullptr, actual);
mrb.get(2, actual);
EXPECT_NE(nullptr, actual); EXPECT_NE(nullptr, actual);
if (actual) { if (actual) {
EXPECT_EQ('a', *actual); // The value should be the same. EXPECT_EQ('b', *actual);
} }
EXPECT_NE(expected_orig, actual.get()); // Even though we pop'ed, we didn't get the original.
mrb.get_copy_at_key(1, actual); mrb.get(3, actual);
EXPECT_EQ(nullptr, actual); EXPECT_NE(nullptr, actual);
if (actual) {
EXPECT_EQ('c', *actual);
}
} }
/* /*
@ -152,22 +310,23 @@ TEST(TestMappedRingBuffer, get_ownership) {
TEST(TestMappedRingBuffer, non_unique_keys) { TEST(TestMappedRingBuffer, non_unique_keys) {
rclcpp::mapped_ring_buffer::MappedRingBuffer<char> mrb(2); rclcpp::mapped_ring_buffer::MappedRingBuffer<char> mrb(2);
std::unique_ptr<char> input(new char('a')); std::shared_ptr<const char> input(new char('a'));
mrb.push_and_replace(1, input); mrb.push_and_replace(1, input);
input.reset(new char('b')); input.reset(new char('b'));
// Different value, same key. // Different value, same key.
mrb.push_and_replace(1, input); mrb.push_and_replace(1, input);
input.reset();
std::unique_ptr<char> actual; std::unique_ptr<char> actual;
mrb.pop_at_key(1, actual); mrb.pop(1, actual);
EXPECT_NE(nullptr, actual); EXPECT_NE(nullptr, actual);
if (actual) { if (actual) {
EXPECT_EQ('a', *actual); EXPECT_EQ('a', *actual);
} }
actual = nullptr; actual = nullptr;
mrb.pop_at_key(1, actual); mrb.pop(1, actual);
EXPECT_NE(nullptr, actual); EXPECT_NE(nullptr, actual);
if (actual) { if (actual) {
EXPECT_EQ('b', *actual); EXPECT_EQ('b', *actual);

View file

@ -97,7 +97,7 @@ public:
* to the actual rclcpp Publisher base class * to the actual rclcpp Publisher base class
*/ */
virtual void virtual void
publish(const std::shared_ptr<MessageT> & msg) publish(const std::shared_ptr<const MessageT> & msg)
{ {
if (!enabled_) { if (!enabled_) {
RCLCPP_WARN(logger_, RCLCPP_WARN(logger_,
@ -106,26 +106,7 @@ public:
return; return;
} }
rclcpp::Publisher<MessageT, Alloc>::publish(msg); rclcpp::Publisher<MessageT, Alloc>::publish(*msg);
}
/// LifecyclePublisher publish function
/**
* The publish function checks whether the communication
* was enabled or disabled and forwards the message
* to the actual rclcpp Publisher base class
*/
virtual void
publish(std::shared_ptr<const MessageT> msg)
{
if (!enabled_) {
RCLCPP_WARN(logger_,
"Trying to publish message on the topic '%s', but the publisher is not activated",
this->get_topic_name());
return;
}
rclcpp::Publisher<MessageT, Alloc>::publish(msg);
} }
/// LifecyclePublisher publish function /// LifecyclePublisher publish function
@ -156,25 +137,6 @@ public:
this->publish(*msg); this->publish(*msg);
} }
/// LifecyclePublisher publish function
/**
* The publish function checks whether the communication
* was enabled or disabled and forwards the message
* to the actual rclcpp Publisher base class
*/
virtual void
publish(std::shared_ptr<const MessageT> & msg)
{
if (!enabled_) {
RCLCPP_WARN(logger_,
"Trying to publish message on the topic '%s', but the publisher is not activated",
this->get_topic_name());
return;
}
rclcpp::Publisher<MessageT, Alloc>::publish(msg);
}
virtual void virtual void
on_activate() on_activate()
{ {