Merge pull request #73 from ros2/intra_process

Implement intra process communications for Pub/Sub
This commit is contained in:
William Woodall 2015-08-21 13:29:26 -07:00
commit 7f016cdff8
15 changed files with 1968 additions and 47 deletions

View file

@ -3,6 +3,7 @@ cmake_minimum_required(VERSION 2.8.3)
project(rclcpp)
find_package(ament_cmake REQUIRED)
find_package(rcl_interfaces REQUIRED)
ament_export_dependencies(rmw)
ament_export_dependencies(rcl_interfaces)
@ -12,6 +13,18 @@ ament_export_include_directories(include)
if(AMENT_ENABLE_TESTING)
find_package(ament_lint_auto REQUIRED)
ament_lint_auto_find_test_dependencies()
if(NOT WIN32)
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++11 -Wall -Wextra")
endif()
include_directories(include)
ament_add_gtest(test_mapped_ring_buffer test/test_mapped_ring_buffer.cpp)
ament_add_gtest(test_intra_process_manager test/test_intra_process_manager.cpp)
if(TARGET test_intra_process_manager)
target_include_directories(test_intra_process_manager PUBLIC "${rcl_interfaces_INCLUDE_DIRS}")
endif()
endif()
ament_package(

View file

@ -33,6 +33,7 @@ struct AnyExecutable
{}
// Only one of the following pointers will be set.
rclcpp::subscription::SubscriptionBase::SharedPtr subscription;
rclcpp::subscription::SubscriptionBase::SharedPtr subscription_intra_process;
rclcpp::timer::TimerBase::SharedPtr timer;
rclcpp::service::ServiceBase::SharedPtr service;
rclcpp::client::ClientBase::SharedPtr client;

View file

@ -15,16 +15,24 @@
#ifndef RCLCPP_RCLCPP_CONTEXT_HPP_
#define RCLCPP_RCLCPP_CONTEXT_HPP_
#include <memory>
#include <rclcpp/macros.hpp>
#include <iostream>
#include <memory>
#include <mutex>
#include <typeinfo>
#include <typeindex>
#include <unordered_map>
#include <rmw/rmw.h>
namespace rclcpp
{
namespace context
{
/* ROS Context */
class Context
{
public:
@ -32,9 +40,38 @@ public:
Context() {}
template<typename SubContext, typename ... Args>
std::shared_ptr<SubContext>
get_sub_context(Args && ... args)
{
std::lock_guard<std::mutex> lock(mutex_);
std::type_index type_i(typeid(SubContext));
std::shared_ptr<SubContext> sub_context;
auto it = sub_contexts_.find(type_i);
if (it == sub_contexts_.end()) {
// It doesn't exist yet, make it
// *INDENT-OFF* (prevent uncrustify from making unecessary indents here)
sub_context = std::shared_ptr<SubContext>(
new SubContext(std::forward<Args>(args) ...),
[] (SubContext * sub_context_ptr) {
delete sub_context_ptr;
});
// *INDENT-ON*
sub_contexts_[type_i] = sub_context;
} else {
// It exists, get it out and cast it.
sub_context = std::static_pointer_cast<SubContext>(it->second);
}
return sub_context;
}
private:
RCLCPP_DISABLE_COPY(Context);
std::unordered_map<std::type_index, std::shared_ptr<void>> sub_contexts_;
std::mutex mutex_;
};
} /* namespace context */

View file

@ -33,6 +33,13 @@ public:
};
DefaultContext::SharedPtr
get_global_default_context()
{
static DefaultContext::SharedPtr default_context = DefaultContext::make_shared();
return default_context;
}
} // namespace default_context
} // namespace contexts
} // namespace rclcpp

View file

@ -15,15 +15,16 @@
#ifndef RCLCPP_RCLCPP_EXECUTOR_HPP_
#define RCLCPP_RCLCPP_EXECUTOR_HPP_
#include <iostream>
#include <algorithm>
#include <cassert>
#include <cstdlib>
#include <iostream>
#include <list>
#include <memory>
#include <vector>
#include <rcl_interfaces/msg/intra_process_message.hpp>
#include <rclcpp/any_executable.hpp>
#include <rclcpp/macros.hpp>
#include <rclcpp/memory_strategy.hpp>
@ -159,6 +160,9 @@ protected:
if (any_exec->subscription) {
execute_subscription(any_exec->subscription);
}
if (any_exec->subscription_intra_process) {
execute_intra_process_subscription(any_exec->subscription_intra_process);
}
if (any_exec->service) {
execute_service(any_exec->service);
}
@ -194,6 +198,24 @@ protected:
subscription->return_message(message);
}
static void
execute_intra_process_subscription(
rclcpp::subscription::SubscriptionBase::SharedPtr & subscription)
{
rcl_interfaces::msg::IntraProcessMessage ipm;
bool taken = false;
rmw_ret_t status = rmw_take(subscription->intra_process_subscription_handle_, &ipm, &taken);
if (status == RMW_RET_OK) {
if (taken) {
subscription->handle_intra_process_message(ipm);
}
} else {
fprintf(stderr,
"[rclcpp::error] take failed for intra process subscription on topic '%s': %s\n",
subscription->get_topic_name().c_str(), rmw_get_error_string_safe());
}
}
static void
execute_timer(
rclcpp::timer::TimerBase::SharedPtr & timer)
@ -293,22 +315,24 @@ protected:
}));
}
// Use the number of subscriptions to allocate memory in the handles
size_t number_of_subscriptions = subs.size();
size_t max_number_of_subscriptions = subs.size() * 2; // Times two for intra-process.
rmw_subscriptions_t subscriber_handles;
subscriber_handles.subscriber_count = number_of_subscriptions;
subscriber_handles.subscriber_count = 0;
// TODO(wjwwood): Avoid redundant malloc's
subscriber_handles.subscribers =
memory_strategy_->borrow_handles(HandleType::subscription_handle, number_of_subscriptions);
subscriber_handles.subscribers = memory_strategy_->borrow_handles(
HandleType::subscription_handle, max_number_of_subscriptions);
if (subscriber_handles.subscribers == NULL) {
// TODO(wjwwood): Use a different error here? maybe std::bad_alloc?
throw std::runtime_error("Could not malloc for subscriber pointers.");
}
// Then fill the SubscriberHandles with ready subscriptions
size_t subscriber_handle_index = 0;
for (auto & subscription : subs) {
subscriber_handles.subscribers[subscriber_handle_index] = \
subscriber_handles.subscribers[subscriber_handles.subscriber_count++] =
subscription->subscription_handle_->data;
subscriber_handle_index += 1;
if (subscription->intra_process_subscription_handle_) {
subscriber_handles.subscribers[subscriber_handles.subscriber_count++] =
subscription->intra_process_subscription_handle_->data;
}
}
// Use the number of services to allocate memory in the handles
@ -414,7 +438,7 @@ protected:
}
// Add the new work to the class's list of things waiting to be executed
// Starting with the subscribers
for (size_t i = 0; i < number_of_subscriptions; ++i) {
for (size_t i = 0; i < subscriber_handles.subscriber_count; ++i) {
void * handle = subscriber_handles.subscribers[i];
if (handle) {
subscriber_handles_.push_back(handle);
@ -463,13 +487,18 @@ protected:
}
for (auto & weak_subscription : group->subscription_ptrs_) {
auto subscription = weak_subscription.lock();
if (subscription && subscription->subscription_handle_->data == subscriber_handle) {
return subscription;
if (subscription) {
if (subscription->subscription_handle_->data == subscriber_handle) {
return subscription;
}
if (subscription->intra_process_subscription_handle_->data == subscriber_handle) {
return subscription;
}
}
}
}
}
return rclcpp::subscription::SubscriptionBase::SharedPtr();
return nullptr;
}
rclcpp::service::ServiceBase::SharedPtr
@ -653,6 +682,11 @@ protected:
while (it != subscriber_handles_.end()) {
auto subscription = get_subscription_by_handle(*it);
if (subscription) {
// Figure out if this is for intra-process or not.
bool is_intra_process = false;
if (subscription->intra_process_subscription_handle_) {
is_intra_process = subscription->intra_process_subscription_handle_->data == *it;
}
// Find the group for this handle and see if it can be serviced
auto group = get_group_by_subscription(subscription);
if (!group) {
@ -668,7 +702,11 @@ protected:
continue;
}
// Otherwise it is safe to set and return the any_exec
any_exec->subscription = subscription;
if (is_intra_process) {
any_exec->subscription_intra_process = subscription;
} else {
any_exec->subscription = subscription;
}
any_exec->callback_group = group;
any_exec->node = get_node_by_group(group);
subscriber_handles_.erase(it++);
@ -804,7 +842,7 @@ protected:
}
// Check the subscriptions to see if there are any that are ready
get_next_subscription(any_exec);
if (any_exec->subscription) {
if (any_exec->subscription || any_exec->subscription_intra_process) {
return any_exec;
}
// Check the services to see if there are any that are ready

View file

@ -0,0 +1,419 @@
// Copyright 2015 Open Source Robotics Foundation, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#ifndef RCLCPP_RCLCPP_INTRA_PROCESS_MANAGER_HPP_
#define RCLCPP_RCLCPP_INTRA_PROCESS_MANAGER_HPP_
#include <rclcpp/mapped_ring_buffer.hpp>
#include <rclcpp/macros.hpp>
#include <rclcpp/publisher.hpp>
#include <rclcpp/subscription.hpp>
#include <algorithm>
#include <atomic>
#include <cstdint>
#include <exception>
#include <limits>
#include <map>
#include <unordered_map>
#include <set>
namespace rclcpp
{
namespace intra_process_manager
{
/// This class facilitates intra process communication between nodes.
/* This class is used in the creation of publishers and subscriptions.
* A singleton instance of this class is owned by a rclcpp::Context and a
* rclcpp::Node can use an associated Context to get an instance of this class.
* Nodes which do not have a common Context will not exchange intra process
* messages because they will not share access to an instance of this class.
*
* When a Node creates a publisher or subscription, it will register them
* with this class.
* The node will also hook into the publisher's publish call
* in order to do intra process related work.
*
* When a publisher is created, it advertises on the topic the user provided,
* as well as a "shadowing" topic of type rcl_interfaces/IntraProcessMessage.
* For instance, if the user specified the topic '/namespace/chatter', then the
* corresponding intra process topic might be '/namespace/chatter__intra'.
* The publisher is also allocated an id which is unique among all publishers
* and subscriptions in this process.
* Additionally, when registered with this class a ring buffer is created and
* owned by this class as a temporary place to hold messages destined for intra
* process subscriptions.
*
* When a subscription is created, it subscribes to the topic provided by the
* user as well as to the corresponding intra process topic.
* It is also gets a unique id from the singleton instance of this class which
* is unique among publishers and subscriptions.
*
* When the user publishes a message, the message is stored by calling
* store_intra_process_message on this class.
* The instance of that message is uniquely identified by a publisher id and a
* message sequence number.
* The publisher id, message sequence pair is unique with in the process.
* At that point a list of the id's of intra process subscriptions which have
* been registered with the singleton instance of this class are stored with
* the message instance so that delivery is only made to those subscriptions.
* Then an instance of rcl_interfaces/IntraProcessMessage is published to the
* intra process topic which is specific to the topic specified by the user.
*
* When an instance of rcl_interfaces/IntraProcessMessage is received by a
* subscription, then it is handled by calling take_intra_process_message
* on a singleton of this class.
* The subscription passes a publisher id, message sequence pair which
* uniquely identifies the message instance it was suppose to receive as well
* as the subscriptions unique id.
* If the message is still being held by this class and the subscription's id
* is in the list of intended subscriptions then the message is returned.
* If either of those predicates are not satisfied then the message is not
* returned and the subscription does not call the users callback.
*
* Since the publisher builds a list of destined subscriptions on publish, and
* other requests are ignored, this class knows how many times a message
* instance should be requested.
* The final time a message is requested, the ownership is passed out of this
* class and passed to the final subscription, effectively freeing space in
* this class's internal storage.
*
* Since a topic is being used to ferry notifications about new intra process
* messages between publishers and subscriptions, it is possible for that
* notification to be lost.
* It is also possible that a subscription which was available when publish was
* called will no longer exist once the notification gets posted.
* In both cases this might result in a message instance getting requested
* fewer times than expected.
* This is why the internal storage of this class is a ring buffer.
* That way if a message is orphaned it will eventually be dropped from storage
* when a new message instance is stored and will not result in a memory leak.
*
* However, since the storage system is finite, this also means that a message
* instance might get displaced by an incoming message instance before all
* interested parties have called take_intra_process_message.
* Because of this the size of the internal storage should be carefully
* considered.
*
* /TODO(wjwwood): update to include information about handling latching.
* /TODO(wjwwood): consider thread safety of the class.
*
* This class is neither CopyConstructable nor CopyAssignable.
*/
class IntraProcessManager
{
private:
RCLCPP_DISABLE_COPY(IntraProcessManager);
public:
RCLCPP_SMART_PTR_DEFINITIONS(IntraProcessManager);
IntraProcessManager() = default;
/// Register a subscription with the manager, returns subscriptions unique id.
/* In addition to generating a unique intra process id for the subscription,
* this method also stores the topic name of the subscription.
*
* This method is normally called during the creation of a subscription,
* but after it creates the internal intra process rmw_subscription_t.
*
* This method will allocate memory.
*
* \param subscription the Subscription to register.
* \return an unsigned 64-bit integer which is the subscription's unique id.
*/
uint64_t
add_subscription(subscription::SubscriptionBase::SharedPtr subscription)
{
auto id = IntraProcessManager::get_next_unique_id();
subscriptions_[id] = subscription;
subscription_ids_by_topic_[subscription->get_topic_name()].insert(id);
return id;
}
/// Unregister a subscription using the subscription's unique id.
/* This method does not allocate memory.
*
* \param intra_process_subscription_id id of the subscription to remove.
*/
void
remove_subscription(uint64_t intra_process_subscription_id)
{
subscriptions_.erase(intra_process_subscription_id);
for (auto & pair : subscription_ids_by_topic_) {
pair.second.erase(intra_process_subscription_id);
}
// Iterate over all publisher infos and all stored subscription id's and
// remove references to this subscription's id.
for (auto & publisher_pair : publishers_) {
for (auto & sub_pair : publisher_pair.second.target_subscriptions_by_message_sequence) {
sub_pair.second.erase(intra_process_subscription_id);
}
}
}
/// Register a publisher with the manager, returns the publisher unique id.
/* In addition to generating and returning a unique id for the publisher,
* this method creates internal ring buffer storage for "in-flight" intra
* process messages which are stored when store_intra_process_message is
* called with this publisher's unique id.
*
* The buffer_size must be less than or equal to the max uint64_t value.
* If the buffer_size is 0 then a buffer size is calculated using the
* publisher's QoS settings.
* The default is to use the depth field of the publisher's QoS.
* TODO(wjwwood): Consider doing depth *= 1.2, round up, or similar.
* TODO(wjwwood): Consider what to do for keep all.
*
* This method is templated on the publisher's message type so that internal
* storage of the same type can be allocated.
*
* This method will allocate memory.
*
* \param publisher publisher to be registered with the manager.
* \param buffer_size if 0 (default) a size is calculated based on the QoS.
* \return an unsigned 64-bit integer which is the publisher's unique id.
*/
template<typename MessageT>
uint64_t
add_publisher(publisher::Publisher::SharedPtr publisher, size_t buffer_size = 0)
{
auto id = IntraProcessManager::get_next_unique_id();
publishers_[id].publisher = publisher;
size_t size = buffer_size > 0 ? buffer_size : publisher->get_queue_size();
// As long as the size of the ring buffer is less than the max sequence number, we're safe.
if (size > std::numeric_limits<uint64_t>::max()) {
throw std::invalid_argument("the calculated buffer size is too large");
}
publishers_[id].sequence_number.store(0);
publishers_[id].buffer = mapped_ring_buffer::MappedRingBuffer<MessageT>::make_shared(size);
publishers_[id].target_subscriptions_by_message_sequence.reserve(size);
return id;
}
/// Unregister a publisher using the publisher's unique id.
/* This method does not allocate memory.
*
* \param intra_process_publisher_id id of the publisher to remove.
*/
void
remove_publisher(uint64_t intra_process_publisher_id)
{
publishers_.erase(intra_process_publisher_id);
}
/// Store a message in the manager, and return the message sequence number.
/* The given message is stored in internal storage using the given publisher
* id and the newly generated message sequence, which is also returned.
* The combination of publisher id and message sequence number can later
* be used with a subscription id to retrieve the message by calling
* take_intra_process_message.
* The number of times take_intra_process_message can be called with this
* unique pair of id's is determined by the number of subscriptions currently
* subscribed to the same topic and which share the same Context, i.e. once
* for each subscription which should receive the intra process message.
*
* The ownership of the incoming message is transfered to the internal
* storage in order to avoid copying the message data.
* Therefore, the message parameter will no longer contain the original
* message after calling this method.
* Instead it will either be a nullptr or it will contain the ownership of
* the message instance which was displaced.
* If the message parameter is not equal to nullptr after calling this method
* then a message was prematurely displaced, i.e. take_intra_process_message
* had not been called on it as many times as was expected.
*
* This method can throw an exception if the publisher id is not found or
* if the publisher shared_ptr given to add_publisher has gone out of scope.
*
* This method does allocate memory.
*
* \param intra_process_publisher_id the id of the publisher of this message.
* \param message the message that is being stored.
* \return the message sequence number.
*/
template<typename MessageT>
uint64_t
store_intra_process_message(
uint64_t intra_process_publisher_id,
std::unique_ptr<MessageT> & message)
{
auto it = publishers_.find(intra_process_publisher_id);
if (it == publishers_.end()) {
throw std::runtime_error("store_intra_process_message called with invalid publisher id");
}
PublisherInfo & info = it->second;
// Calculate the next message sequence number.
uint64_t message_seq = info.sequence_number.fetch_add(1, std::memory_order_relaxed);
// Insert the message into the ring buffer using the message_seq to identify it.
typedef typename mapped_ring_buffer::MappedRingBuffer<MessageT> TypedMRB;
typename TypedMRB::SharedPtr typed_buffer = std::static_pointer_cast<TypedMRB>(info.buffer);
bool did_replace = typed_buffer->push_and_replace(message_seq, message);
// TODO(wjwwood): do something when a message was displaced. log debug?
(void)did_replace; // Avoid unused variable warning.
// Figure out what subscriptions should receive the message.
auto publisher = info.publisher.lock();
if (!publisher) {
throw std::runtime_error("publisher has unexpectedly gone out of scope");
}
auto & destined_subscriptions = subscription_ids_by_topic_[publisher->get_topic_name()];
// Store the list for later comparison.
info.target_subscriptions_by_message_sequence[message_seq].clear();
std::copy(
destined_subscriptions.begin(), destined_subscriptions.end(),
// Memory allocation occurs in info.target_subscriptions_by_message_sequence[message_seq]
std::inserter(
info.target_subscriptions_by_message_sequence[message_seq],
// This ends up only being a hint to std::set, could also be .begin().
info.target_subscriptions_by_message_sequence[message_seq].end()
)
);
// Return the message sequence which is sent to the subscription.
return message_seq;
}
/// Take an intra process message.
/* The intra_process_publisher_id and message_sequence_number parameters
* uniquely identify a message instance, which should be taken.
*
* The requesting_subscriptions_intra_process_id parameter is used to make
* sure the requesting subscription was intended to receive this message
* instance.
* This check is made because it could happen that the requester
* comes up after the publish event, so it still receives the notification of
* a new intra process message, but it wasn't registered with the manager at
* the time of publishing, causing it to take when it wasn't intended.
* This should be avioded unless latching-like behavior is involved.
*
* The message parameter is used to store the taken message.
* On the last expected call to this method, the ownership is transfered out
* of internal storage and into the message parameter.
* On all previous calls a copy of the internally stored message is made and
* the ownership of the copy is transfered to the message parameter.
* TODO(wjwwood): update this documentation when latching is supported.
*
* The message parameter can be set to nullptr if:
*
* - The publisher id is not found.
* - The message sequence is not found for the given publisher id.
* - The requesting subscription's id is not in the list of intended takers.
* - The requesting subscription's id has been used before with this message.
*
* This method may allocate memory to copy the stored message.
*
* \param intra_process_publisher_id the id of the message's publisher.
* \param message_sequence_number the sequence number of the message.
* \param requesting_subscriptions_intra_process_id the subscription's id.
* \param message the message typed unique_ptr used to return the message.
*/
template<typename MessageT>
void
take_intra_process_message(
uint64_t intra_process_publisher_id,
uint64_t message_sequence_number,
uint64_t requesting_subscriptions_intra_process_id,
std::unique_ptr<MessageT> & message)
{
message = nullptr;
PublisherInfo * info;
{
auto it = publishers_.find(intra_process_publisher_id);
if (it == publishers_.end()) {
// Publisher is either invalid or no longer exists.
return;
}
info = &it->second;
}
// Figure out how many subscriptions are left.
std::set<uint64_t> * target_subs;
{
auto it = info->target_subscriptions_by_message_sequence.find(message_sequence_number);
if (it == info->target_subscriptions_by_message_sequence.end()) {
// Message is no longer being stored by this publisher.
return;
}
target_subs = &it->second;
}
{
auto it = std::find(
target_subs->begin(), target_subs->end(),
requesting_subscriptions_intra_process_id);
if (it == target_subs->end()) {
// This publisher id/message seq pair was not intended for this subscription.
return;
}
target_subs->erase(it);
}
typedef typename mapped_ring_buffer::MappedRingBuffer<MessageT> TypedMRB;
typename TypedMRB::SharedPtr typed_buffer = std::static_pointer_cast<TypedMRB>(info->buffer);
// Return a copy or the unique_ptr (ownership) depending on how many subscriptions are left.
if (target_subs->size()) {
// There are more subscriptions to serve, return a copy.
typed_buffer->get_copy_at_key(message_sequence_number, message);
} else {
// This is the last one to be returned, transfer ownership.
typed_buffer->pop_at_key(message_sequence_number, message);
}
}
private:
static uint64_t get_next_unique_id()
{
auto next_id = next_unique_id_.fetch_add(1, std::memory_order_relaxed);
// Check for rollover (we started at 1).
if (0 == next_id) {
// This puts a technical limit on the number of times you can add a publisher or subscriber.
// But even if you could add (and remove) them at 1 kHz (very optimistic rate)
// it would still be a very long time before you could exhaust the pool of id's:
// 2^64 / 1000 times per sec / 60 sec / 60 min / 24 hours / 365 days = 584,942,417 years
// So around 585 million years. Even at 1 GHz, it would take 585 years.
// I think it's safe to avoid trying to handle overflow.
// If we roll over then it's most likely a bug.
// *INDENT-OFF* (prevent uncrustify from making unecessary indents here)
throw std::overflow_error(
"exhausted the unique id's for publishers and subscribers in this process "
"(congratulations your computer is either extremely fast or extremely old)");
// *INDENT-ON*
}
return next_id;
}
static std::atomic<uint64_t> next_unique_id_;
std::unordered_map<uint64_t, subscription::SubscriptionBase::WeakPtr> subscriptions_;
std::map<std::string, std::set<uint64_t>> subscription_ids_by_topic_;
struct PublisherInfo
{
RCLCPP_DISABLE_COPY(PublisherInfo);
PublisherInfo() = default;
publisher::Publisher::WeakPtr publisher;
std::atomic<uint64_t> sequence_number;
mapped_ring_buffer::MappedRingBufferBase::SharedPtr buffer;
std::unordered_map<uint64_t, std::set<uint64_t>> target_subscriptions_by_message_sequence;
};
std::unordered_map<uint64_t, PublisherInfo> publishers_;
};
std::atomic<uint64_t> IntraProcessManager::next_unique_id_ {1};
} /* namespace intra_process_manager */
} /* namespace rclcpp */
#endif /* RCLCPP_RCLCPP_INTRA_PROCESS_MANAGER_HPP_ */

View file

@ -0,0 +1,214 @@
// Copyright 2015 Open Source Robotics Foundation, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#ifndef RCLCPP_RCLCPP_RING_BUFFER_HPP_
#define RCLCPP_RCLCPP_RING_BUFFER_HPP_
#include <rclcpp/macros.hpp>
#include <algorithm>
#include <cstddef>
#include <cstdint>
#include <memory>
#include <vector>
namespace rclcpp
{
namespace mapped_ring_buffer
{
class MappedRingBufferBase
{
public:
RCLCPP_SMART_PTR_DEFINITIONS(MappedRingBufferBase);
};
/// Ring buffer container of unique_ptr's of T, which can be accessed by a key.
/* T must be a CopyConstructable and CopyAssignable.
* This class can be used in a container by using the base class MappedRingBufferBase.
* This class must have a positive, non-zero size.
* This class cannot be resized nor can it reserve additional space after construction.
* This class is not CopyConstructable nor CopyAssignable.
*
* The key's are not guaranteed to be unique because push_and_replace does not
* check for colliding keys.
* It is up to the user to only use unique keys.
* A side effect of this is that when get_copy_at_key or pop_at_key are called,
* they return the first encountered instance of the key.
* But iteration does not begin with the ring buffer's head, and therefore
* there is no guarantee on which value is returned if a key is used multiple
* times.
*/
template<typename T>
class MappedRingBuffer : public MappedRingBufferBase
{
public:
RCLCPP_SMART_PTR_DEFINITIONS(MappedRingBuffer<T>);
/// Constructor.
/* The constructor will allocate memory while reserving space.
*
* \param size size of the ring buffer; must be positive and non-zero.
*/
MappedRingBuffer(size_t size)
: elements_(size), head_(0)
{
if (size == 0) {
throw std::invalid_argument("size must be a positive, non-zero value");
}
}
virtual ~MappedRingBuffer() {}
/// Return a copy of the value stored in the ring buffer at the given key.
/* The key is matched if an element in the ring buffer has a matching key.
* This method will allocate in order to return a copy.
*
* The key is not guaranteed to be unique, see the class docs for more.
*
* The contents of value before the method is called are discarded.
*
* \param key the key associated with the stored value
* \param value if the key is found, the value is stored in this parameter
*/
void
get_copy_at_key(uint64_t key, std::unique_ptr<T> & value)
{
auto it = get_iterator_of_key(key);
value = nullptr;
if (it != elements_.end() && it->in_use) {
value = std::unique_ptr<T>(new T(*it->value));
}
}
/// Return ownership of the value stored in the ring buffer, leaving a copy.
/* The key is matched if an element in the ring bufer has a matching key.
* This method will allocate in order to store a copy.
*
* The key is not guaranteed to be unique, see the class docs for more.
*
* The ownership of the currently stored object is returned, but a copy is
* made and stored in its place.
* This means that multiple calls to this function for a particular element
* will result in returning the copied and stored object not the original.
* This also means that later calls to pop_at_key will not return the
* originally stored object, since it was returned by the first call to this
* method.
*
* The contents of value before the method is called are discarded.
*
* \param key the key associated with the stored value
* \param value if the key is found, the value is stored in this parameter
*/
void
get_ownership_at_key(uint64_t key, std::unique_ptr<T> & value)
{
auto it = get_iterator_of_key(key);
value = nullptr;
if (it != elements_.end() && it->in_use) {
// Make a copy.
auto copy = std::unique_ptr<T>(new T(*it->value));
// Return the original.
value.swap(it->value);
// Store the copy.
it->value.swap(copy);
}
}
/// Return ownership of the value stored in the ring buffer at the given key.
/* The key is matched if an element in the ring buffer has a matching key.
*
* The key is not guaranteed to be unique, see the class docs for more.
*
* The contents of value before the method is called are discarded.
*
* \param key the key associated with the stored value
* \param value if the key is found, the value is stored in this parameter
*/
void
pop_at_key(uint64_t key, std::unique_ptr<T> & value)
{
auto it = get_iterator_of_key(key);
value = nullptr;
if (it != elements_.end() && it->in_use) {
value.swap(it->value);
it->in_use = false;
}
}
/// Insert a key-value pair, displacing an existing pair if necessary.
/* The key's uniqueness is not checked on insertion.
* It is up to the user to ensure the key is unique.
* This method should not allocate memory.
*
* After insertion, if a pair was replaced, then value will contain ownership
* of that displaced value. Otherwise it will be a nullptr.
*
* \param key the key associated with the value to be stored
* \param value the value to store, and optionally the value displaced
*/
bool
push_and_replace(uint64_t key, std::unique_ptr<T> & value)
{
bool did_replace = elements_[head_].in_use;
elements_[head_].key = key;
elements_[head_].value.swap(value);
elements_[head_].in_use = true;
head_ = (head_ + 1) % elements_.size();
return did_replace;
}
bool
push_and_replace(uint64_t key, std::unique_ptr<T> && value)
{
std::unique_ptr<T> temp = std::move(value);
return push_and_replace(key, temp);
}
/// Return true if the key is found in the ring buffer, otherwise false.
bool
has_key(uint64_t key)
{
return elements_.end() != get_iterator_of_key(key);
}
private:
RCLCPP_DISABLE_COPY(MappedRingBuffer<T>);
struct element
{
uint64_t key;
std::unique_ptr<T> value;
bool in_use;
};
typename std::vector<element>::iterator
get_iterator_of_key(uint64_t key)
{
// *INDENT-OFF* (prevent uncrustify from making unecessary indents here)
auto it = std::find_if(elements_.begin(), elements_.end(), [key](element & e) -> bool {
return e.key == key && e.in_use;
});
// *INDENT-ON*
return it;
}
std::vector<element> elements_;
size_t head_;
};
} /* namespace ring_buffer */
} /* namespace rclcpp */
#endif /* RCLCPP_RCLCPP_RING_BUFFER_HPP_ */

View file

@ -24,6 +24,7 @@
#include <rcl_interfaces/msg/parameter_descriptor.hpp>
#include <rcl_interfaces/msg/parameter_event.hpp>
#include <rcl_interfaces/msg/set_parameters_result.hpp>
#include <rosidl_generator_cpp/message_type_support.hpp>
#include <rclcpp/callback_group.hpp>
#include <rclcpp/client.hpp>
@ -103,9 +104,11 @@ public:
RCLCPP_SMART_PTR_DEFINITIONS(Node);
/* Create a node based on the node name. */
Node(const std::string & node_name);
Node(const std::string & node_name, bool use_intra_process_comms = false);
/* Create a node based on the node name and a rclcpp::context::Context. */
Node(const std::string & node_name, rclcpp::context::Context::SharedPtr context);
Node(
const std::string & node_name, rclcpp::context::Context::SharedPtr context,
bool use_intra_process_comms = false);
/* Get the name of the node. */
const std::string &
@ -197,6 +200,8 @@ public:
private:
RCLCPP_DISABLE_COPY(Node);
static const rosidl_message_type_support_t * ipm_ts_;
bool
group_in_node(callback_group::CallbackGroup::SharedPtr & group);
@ -214,6 +219,8 @@ private:
size_t number_of_services_;
size_t number_of_clients_;
bool use_intra_process_comms_;
mutable std::mutex mutex_;
std::map<std::string, rclcpp::parameter::ParameterVariant> parameters_;
@ -305,6 +312,9 @@ private:
}
};
const rosidl_message_type_support_t * Node::ipm_ts_ =
rosidl_generator_cpp::get_message_type_support_handle<rcl_interfaces::msg::IntraProcessMessage>();
} /* namespace node */
} /* namespace rclcpp */

View file

@ -24,12 +24,14 @@
#include <stdexcept>
#include <string>
#include <rcl_interfaces/msg/intra_process_message.hpp>
#include <rmw/error_handling.h>
#include <rmw/rmw.h>
#include <rosidl_generator_cpp/message_type_support.hpp>
#include <rosidl_generator_cpp/service_type_support.hpp>
#include <rclcpp/contexts/default_context.hpp>
#include <rclcpp/intra_process_manager.hpp>
#include <rclcpp/parameter.hpp>
#ifndef RCLCPP_RCLCPP_NODE_HPP_
@ -39,15 +41,20 @@
using namespace rclcpp;
using namespace rclcpp::node;
using rclcpp::contexts::default_context::DefaultContext;
Node::Node(const std::string & node_name)
: Node(node_name, DefaultContext::make_shared())
Node::Node(const std::string & node_name, bool use_intra_process_comms)
: Node(
node_name,
rclcpp::contexts::default_context::get_global_default_context(),
use_intra_process_comms)
{}
Node::Node(const std::string & node_name, context::Context::SharedPtr context)
Node::Node(
const std::string & node_name,
context::Context::SharedPtr context,
bool use_intra_process_comms)
: name_(node_name), context_(context),
number_of_subscriptions_(0), number_of_timers_(0), number_of_services_(0)
number_of_subscriptions_(0), number_of_timers_(0), number_of_services_(0),
use_intra_process_comms_(use_intra_process_comms)
{
size_t domain_id = 0;
char * ros_domain_id = nullptr;
@ -80,6 +87,7 @@ Node::Node(const std::string & node_name, context::Context::SharedPtr context)
// *INDENT-ON*
}
// Initialize node handle shared_ptr with custom deleter.
// *INDENT-OFF*
node_handle_.reset(node, [](rmw_node_t * node) {
auto ret = rmw_destroy_node(node);
if (ret != RMW_RET_OK) {
@ -87,6 +95,7 @@ Node::Node(const std::string & node_name, context::Context::SharedPtr context)
stderr, "Error in destruction of rmw node handle: %s\n", rmw_get_error_string_safe());
}
});
// *INDENT-ON*
using rclcpp::callback_group::CallbackGroupType;
default_callback_group_ = create_callback_group(
@ -123,7 +132,47 @@ Node::create_publisher(
// *INDENT-ON*
}
return publisher::Publisher::make_shared(node_handle_, publisher_handle);
auto publisher = publisher::Publisher::make_shared(
node_handle_, publisher_handle, topic_name, qos_profile.depth);
if (use_intra_process_comms_) {
rmw_publisher_t * intra_process_publisher_handle = rmw_create_publisher(
node_handle_.get(), ipm_ts_, (topic_name + "__intra").c_str(), qos_profile);
if (!intra_process_publisher_handle) {
// *INDENT-OFF* (prevent uncrustify from making unecessary indents here)
throw std::runtime_error(
std::string("could not create intra process publisher: ") +
rmw_get_error_string_safe());
// *INDENT-ON*
}
auto intra_process_manager =
context_->get_sub_context<rclcpp::intra_process_manager::IntraProcessManager>();
uint64_t intra_process_publisher_id =
intra_process_manager->add_publisher<MessageT>(publisher);
rclcpp::intra_process_manager::IntraProcessManager::WeakPtr weak_ipm = intra_process_manager;
// *INDENT-OFF*
auto shared_publish_callback =
[weak_ipm](uint64_t publisher_id, std::shared_ptr<void> msg) -> uint64_t
{
auto ipm = weak_ipm.lock();
if (!ipm) {
// TODO(wjwwood): should this just return silently? Or maybe return with a logged warning?
throw std::runtime_error(
"intra process publish called after destruction of intra process manager");
}
auto typed_msg = std::static_pointer_cast<const MessageT>(msg);
std::unique_ptr<MessageT> unique_msg(new MessageT(*typed_msg));
uint64_t message_seq = ipm->store_intra_process_message(publisher_id, unique_msg);
return message_seq;
};
// *INDENT-ON*
publisher->setup_intra_process(
intra_process_publisher_id,
shared_publish_callback,
intra_process_publisher_handle);
}
return publisher;
}
bool
@ -163,8 +212,7 @@ Node::create_subscription(
if (!subscriber_handle) {
// *INDENT-OFF* (prevent uncrustify from making unecessary indents here)
throw std::runtime_error(
std::string("could not create subscription: ") +
rmw_get_error_string_safe());
std::string("could not create subscription: ") + rmw_get_error_string_safe());
// *INDENT-ON*
}
@ -178,10 +226,47 @@ Node::create_subscription(
callback,
msg_mem_strat);
auto sub_base_ptr = std::dynamic_pointer_cast<SubscriptionBase>(sub);
// Setup intra process.
if (use_intra_process_comms_) {
rmw_subscription_t * intra_process_subscriber_handle = rmw_create_subscription(
node_handle_.get(), ipm_ts_,
(topic_name + "__intra").c_str(), qos_profile, false);
if (!subscriber_handle) {
// *INDENT-OFF* (prevent uncrustify from making unecessary indents here)
throw std::runtime_error(
std::string("could not create intra process subscription: ") + rmw_get_error_string_safe());
// *INDENT-ON*
}
auto intra_process_manager =
context_->get_sub_context<rclcpp::intra_process_manager::IntraProcessManager>();
rclcpp::intra_process_manager::IntraProcessManager::WeakPtr weak_ipm = intra_process_manager;
uint64_t intra_process_subscription_id =
intra_process_manager->add_subscription(sub_base_ptr);
// *INDENT-OFF*
sub->setup_intra_process(
intra_process_subscription_id,
intra_process_subscriber_handle,
[weak_ipm](
uint64_t publisher_id,
uint64_t message_sequence,
uint64_t subscription_id,
std::unique_ptr<MessageT> & message)
{
auto ipm = weak_ipm.lock();
if (!ipm) {
// TODO(wjwwood): should this just return silently? Or maybe return with a logged warning?
throw std::runtime_error(
"intra process take called after destruction of intra process manager");
}
ipm->take_intra_process_message(publisher_id, message_sequence, subscription_id, message);
});
// *INDENT-ON*
}
// Assign to a group.
if (group) {
if (!group_in_node(group)) {
// TODO: use custom exception
throw std::runtime_error("Cannot create timer, group not in node.");
throw std::runtime_error("Cannot create subscription, group not in node.");
}
group->add_subscription(sub_base_ptr);
} else {

View file

@ -15,15 +15,18 @@
#ifndef RCLCPP_RCLCPP_PUBLISHER_HPP_
#define RCLCPP_RCLCPP_PUBLISHER_HPP_
#include <rclcpp/macros.hpp>
#include <iostream>
#include <memory>
#include <sstream>
#include <string>
#include <rcl_interfaces/msg/intra_process_message.hpp>
#include <rmw/impl/cpp/demangle.hpp>
#include <rmw/error_handling.h>
#include <rmw/rmw.h>
#include <rclcpp/macros.hpp>
namespace rclcpp
{
@ -38,23 +41,38 @@ namespace publisher
class Publisher
{
friend rclcpp::node::Node;
public:
RCLCPP_SMART_PTR_DEFINITIONS(Publisher);
Publisher(std::shared_ptr<rmw_node_t> node_handle, rmw_publisher_t * publisher_handle)
: node_handle_(node_handle), publisher_handle_(publisher_handle)
Publisher(
std::shared_ptr<rmw_node_t> node_handle,
rmw_publisher_t * publisher_handle,
std::string topic,
size_t queue_size)
: node_handle_(node_handle), publisher_handle_(publisher_handle),
intra_process_publisher_handle_(nullptr),
topic_(topic), queue_size_(queue_size),
intra_process_publisher_id_(0), store_intra_process_message_(nullptr)
{}
virtual ~Publisher()
{
if (intra_process_publisher_handle_) {
if (rmw_destroy_publisher(node_handle_.get(), intra_process_publisher_handle_)) {
fprintf(
stderr,
"Error in destruction of intra process rmw publisher handle: %s\n",
rmw_get_error_string_safe());
}
}
if (publisher_handle_) {
if (rmw_destroy_publisher(node_handle_.get(), publisher_handle_) != RMW_RET_OK) {
// *INDENT-OFF*
std::stringstream ss;
ss << "Error in destruction of rmw publisher handle: "
<< rmw_get_error_string_safe() << '\n';
// *INDENT-ON*
(std::cerr << ss.str()).flush();
fprintf(
stderr,
"Error in destruction of rmw publisher handle: %s\n",
rmw_get_error_string_safe());
}
}
}
@ -63,19 +81,70 @@ public:
void
publish(std::shared_ptr<MessageT> & msg)
{
rmw_ret_t status = rmw_publish(publisher_handle_, msg.get());
if (status != RMW_RET_OK) {
// *INDENT-OFF* (prevent uncrustify from making unecessary indents here)
throw std::runtime_error(
std::string("failed to publish message: ") + rmw_get_error_string_safe());
// *INDENT-ON*
rmw_ret_t status;
if (!store_intra_process_message_) {
// TODO(wjwwood): for now, make intra process and inter process mutually exclusive.
// Later we'll have them together, when we have a way to filter more efficiently.
status = rmw_publish(publisher_handle_, msg.get());
if (status != RMW_RET_OK) {
// *INDENT-OFF* (prevent uncrustify from making unecessary indents here)
throw std::runtime_error(
std::string("failed to publish message: ") + rmw_get_error_string_safe());
// *INDENT-ON*
}
}
if (store_intra_process_message_) {
uint64_t message_seq = store_intra_process_message_(intra_process_publisher_id_, msg);
rcl_interfaces::msg::IntraProcessMessage ipm;
ipm.publisher_id = intra_process_publisher_id_;
ipm.message_sequence = message_seq;
status = rmw_publish(intra_process_publisher_handle_, &ipm);
if (status != RMW_RET_OK) {
// *INDENT-OFF* (prevent uncrustify from making unecessary indents here)
throw std::runtime_error(
std::string("failed to publish intra process message: ") + rmw_get_error_string_safe());
// *INDENT-ON*
}
}
}
const std::string &
get_topic_name() const
{
return topic_;
}
size_t
get_queue_size() const
{
return queue_size_;
}
typedef std::function<uint64_t(uint64_t, std::shared_ptr<void>)> StoreSharedMessageCallbackT;
protected:
void
setup_intra_process(
uint64_t intra_process_publisher_id,
StoreSharedMessageCallbackT callback,
rmw_publisher_t * intra_process_publisher_handle)
{
intra_process_publisher_id_ = intra_process_publisher_id;
store_intra_process_message_ = callback;
intra_process_publisher_handle_ = intra_process_publisher_handle;
}
private:
std::shared_ptr<rmw_node_t> node_handle_;
rmw_publisher_t * publisher_handle_;
rmw_publisher_t * intra_process_publisher_handle_;
std::string topic_;
size_t queue_size_;
uint64_t intra_process_publisher_id_;
StoreSharedMessageCallbackT store_intra_process_message_;
};

View file

@ -21,6 +21,7 @@
#include <sstream>
#include <string>
#include <rcl_interfaces/msg/intra_process_message.hpp>
#include <rmw/error_handling.h>
#include <rmw/rmw.h>
@ -36,6 +37,11 @@ namespace executor
class Executor;
} // namespace executor
namespace node
{
class Node;
} // namespace node
namespace subscription
{
@ -51,7 +57,8 @@ public:
rmw_subscription_t * subscription_handle,
const std::string & topic_name,
bool ignore_local_publications)
: node_handle_(node_handle),
: intra_process_subscription_handle_(nullptr),
node_handle_(node_handle),
subscription_handle_(subscription_handle),
topic_name_(topic_name),
ignore_local_publications_(ignore_local_publications)
@ -70,6 +77,15 @@ public:
(std::cerr << ss.str()).flush();
}
}
if (intra_process_subscription_handle_) {
auto ret = rmw_destroy_subscription(node_handle_.get(), intra_process_subscription_handle_);
if (ret != RMW_RET_OK) {
std::stringstream ss;
ss << "Error in destruction of rmw intra process subscription handle: " <<
rmw_get_error_string_safe() << '\n';
(std::cerr << ss.str()).flush();
}
}
}
const std::string & get_topic_name() const
@ -80,6 +96,10 @@ public:
virtual std::shared_ptr<void> create_message() = 0;
virtual void handle_message(std::shared_ptr<void> & message) = 0;
virtual void return_message(std::shared_ptr<void> & message) = 0;
virtual void handle_intra_process_message(rcl_interfaces::msg::IntraProcessMessage & ipm) = 0;
protected:
rmw_subscription_t * intra_process_subscription_handle_;
private:
RCLCPP_DISABLE_COPY(SubscriptionBase);
@ -87,6 +107,7 @@ private:
std::shared_ptr<rmw_node_t> node_handle_;
rmw_subscription_t * subscription_handle_;
std::string topic_name_;
bool ignore_local_publications_;
@ -95,6 +116,8 @@ private:
template<typename MessageT>
class Subscription : public SubscriptionBase
{
friend class rclcpp::node::Node;
public:
using CallbackType = std::function<void(const std::shared_ptr<MessageT> &)>;
RCLCPP_SMART_PTR_DEFINITIONS(Subscription);
@ -135,13 +158,56 @@ public:
message_memory_strategy_->return_message(typed_message);
}
void handle_intra_process_message(rcl_interfaces::msg::IntraProcessMessage & ipm)
{
if (!get_intra_process_message_callback_) {
// throw std::runtime_error(
// "handle_intra_process_message called before setup_intra_process");
// TODO(wjwwood): for now, this could mean that intra process was just not enabled.
// However, this can only really happen if this node has it disabled, but the other doesn't.
return;
}
std::unique_ptr<MessageT> msg;
get_intra_process_message_callback_(
ipm.publisher_id,
ipm.message_sequence,
intra_process_subscription_id_,
msg);
if (!msg) {
// This either occurred because the publisher no longer exists or the
// message requested is no longer being stored.
// TODO(wjwwood): should we notify someone of this? log error, log warning?
return;
}
typename MessageT::SharedPtr shared_msg = std::move(msg);
callback_(shared_msg);
}
private:
typedef
std::function<
void (uint64_t, uint64_t, uint64_t, std::unique_ptr<MessageT> &)
> GetMessageCallbackType;
void setup_intra_process(
uint64_t intra_process_subscription_id,
rmw_subscription_t * intra_process_subscription,
GetMessageCallbackType callback)
{
intra_process_subscription_id_ = intra_process_subscription_id;
intra_process_subscription_handle_ = intra_process_subscription;
get_intra_process_message_callback_ = callback;
}
RCLCPP_DISABLE_COPY(Subscription);
CallbackType callback_;
typename message_memory_strategy::MessageMemoryStrategy<MessageT>::SharedPtr
message_memory_strategy_;
GetMessageCallbackType get_intra_process_message_callback_;
uint64_t intra_process_subscription_id_;
};
} /* namespace subscription */

View file

@ -27,6 +27,7 @@
#include <string.h>
#include <thread>
#include <rmw/error_handling.h>
#include <rmw/macros.h>
#include <rmw/rmw.h>

View file

@ -13,6 +13,7 @@
<build_depend>rcl_interfaces</build_depend>
<build_export_depend>rcl_interfaces</build_export_depend>
<test_depend>ament_cmake_gtest</test_depend>
<test_depend>ament_lint_auto</test_depend>
<test_depend>ament_lint_common</test_depend>

View file

@ -0,0 +1,788 @@
// Copyright 2015 Open Source Robotics Foundation, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include <memory>
#include <gtest/gtest.h>
#include <rclcpp/macros.hpp>
// Mock up publisher and subscription base to avoid needing an rmw impl.
namespace rclcpp
{
namespace publisher
{
namespace mock
{
class Publisher
{
public:
RCLCPP_SMART_PTR_DEFINITIONS(Publisher);
Publisher()
: mock_topic_name(""), mock_queue_size(0) {}
const std::string & get_topic_name() const
{
return mock_topic_name;
}
size_t get_queue_size() const
{
return mock_queue_size;
}
std::string mock_topic_name;
size_t mock_queue_size;
};
}
}
}
namespace rclcpp
{
namespace subscription
{
namespace mock
{
class SubscriptionBase
{
public:
RCLCPP_SMART_PTR_DEFINITIONS(SubscriptionBase);
SubscriptionBase()
: mock_topic_name(""), mock_queue_size(0) {}
const std::string & get_topic_name() const
{
return mock_topic_name;
}
size_t get_queue_size() const
{
return mock_queue_size;
}
std::string mock_topic_name;
size_t mock_queue_size;
};
}
}
}
// Prevent rclcpp/publisher.hpp and rclcpp/subscription.hpp from being imported.
#define RCLCPP_RCLCPP_PUBLISHER_HPP_
#define RCLCPP_RCLCPP_SUBSCRIPTION_HPP_
// Force ipm to use our mock publisher class.
#define Publisher mock::Publisher
#define SubscriptionBase mock::SubscriptionBase
#include <rclcpp/intra_process_manager.hpp>
#undef SubscriptionBase
#undef Publisher
#include <rcl_interfaces/msg/intra_process_message.hpp>
/*
This tests the "normal" usage of the class:
- Creates two publishers on different topics.
- Creates a subscription which matches one of them.
- Publishes on each publisher with different message content.
- Try's to take the message from the non-matching publish, should fail.
- Try's to take the message from the matching publish, should work.
- Asserts the message it got back was the one that went in (since there's only one subscription).
- Try's to take the message again, should fail.
*/
TEST(TestIntraProcessManager, nominal) {
rclcpp::intra_process_manager::IntraProcessManager ipm;
auto p1 = std::make_shared<rclcpp::publisher::mock::Publisher>();
p1->mock_topic_name = "nominal1";
p1->mock_queue_size = 2;
auto p2 = std::make_shared<rclcpp::publisher::mock::Publisher>();
p2->mock_topic_name = "nominal2";
p2->mock_queue_size = 10;
auto s1 = std::make_shared<rclcpp::subscription::mock::SubscriptionBase>();
s1->mock_topic_name = "nominal1";
s1->mock_queue_size = 10;
auto p1_id = ipm.add_publisher<rcl_interfaces::msg::IntraProcessMessage>(p1);
auto p2_id = ipm.add_publisher<rcl_interfaces::msg::IntraProcessMessage>(p2);
auto s1_id = ipm.add_subscription(s1);
auto ipm_msg = std::make_shared<rcl_interfaces::msg::IntraProcessMessage>();
ipm_msg->message_sequence = 42;
ipm_msg->publisher_id = 42;
rcl_interfaces::msg::IntraProcessMessage::UniquePtr unique_msg(
new rcl_interfaces::msg::IntraProcessMessage(*ipm_msg)
);
auto p1_m1_original_address = unique_msg.get();
auto p1_m1_id = ipm.store_intra_process_message(p1_id, unique_msg);
ASSERT_EQ(nullptr, unique_msg);
ipm_msg->message_sequence = 43;
ipm_msg->publisher_id = 43;
unique_msg.reset(new rcl_interfaces::msg::IntraProcessMessage(*ipm_msg));
auto p2_m1_id = ipm.store_intra_process_message(p2_id, unique_msg);
ASSERT_EQ(nullptr, unique_msg);
ipm.take_intra_process_message(p2_id, p2_m1_id, s1_id, unique_msg);
EXPECT_EQ(nullptr, unique_msg); // Should fail since p2 and s1 don't have the same topic.
unique_msg.reset();
ipm.take_intra_process_message(p1_id, p1_m1_id, s1_id, unique_msg);
EXPECT_NE(nullptr, unique_msg);
if (unique_msg) {
EXPECT_EQ(42ul, unique_msg->message_sequence);
EXPECT_EQ(42ul, unique_msg->publisher_id);
EXPECT_EQ(p1_m1_original_address, unique_msg.get());
}
ipm.take_intra_process_message(p1_id, p1_m1_id, s1_id, unique_msg);
EXPECT_EQ(nullptr, unique_msg); // Should fail, since the message was already taken.
ipm_msg->message_sequence = 44;
ipm_msg->publisher_id = 44;
unique_msg.reset(new rcl_interfaces::msg::IntraProcessMessage(*ipm_msg));
ipm.store_intra_process_message(p1_id, unique_msg);
ASSERT_EQ(nullptr, unique_msg);
ipm_msg->message_sequence = 45;
ipm_msg->publisher_id = 45;
unique_msg.reset(new rcl_interfaces::msg::IntraProcessMessage(*ipm_msg));
ipm.store_intra_process_message(p1_id, unique_msg);
ASSERT_EQ(nullptr, unique_msg);
ipm_msg->message_sequence = 46;
ipm_msg->publisher_id = 46;
unique_msg.reset(new rcl_interfaces::msg::IntraProcessMessage(*ipm_msg));
ipm.store_intra_process_message(p1_id, unique_msg);
ASSERT_NE(nullptr, unique_msg);
if (unique_msg) {
EXPECT_EQ(44ul, unique_msg->message_sequence);
EXPECT_EQ(44ul, unique_msg->publisher_id);
}
}
/*
Simulates the case where a publisher is removed between publishing and the matching take.
- Creates a publisher and subscription on the same topic.
- Publishes a message.
- Remove the publisher.
- Try's to take the message, should fail since the publisher (and its storage) is gone.
*/
TEST(TestIntraProcessManager, remove_publisher_before_trying_to_take) {
rclcpp::intra_process_manager::IntraProcessManager ipm;
auto p1 = std::make_shared<rclcpp::publisher::mock::Publisher>();
p1->mock_topic_name = "nominal1";
p1->mock_queue_size = 10;
auto s1 = std::make_shared<rclcpp::subscription::mock::SubscriptionBase>();
s1->mock_topic_name = "nominal1";
s1->mock_queue_size = 10;
auto p1_id = ipm.add_publisher<rcl_interfaces::msg::IntraProcessMessage>(p1);
auto s1_id = ipm.add_subscription(s1);
auto ipm_msg = std::make_shared<rcl_interfaces::msg::IntraProcessMessage>();
ipm_msg->message_sequence = 42;
ipm_msg->publisher_id = 42;
rcl_interfaces::msg::IntraProcessMessage::UniquePtr unique_msg(
new rcl_interfaces::msg::IntraProcessMessage(*ipm_msg)
);
auto p1_m1_id = ipm.store_intra_process_message(p1_id, unique_msg);
ASSERT_EQ(nullptr, unique_msg);
ipm.remove_publisher(p1_id);
ipm.take_intra_process_message(p1_id, p1_m1_id, s1_id, unique_msg);
EXPECT_EQ(nullptr, unique_msg); // Should fail, since the publisher is gone.
}
/*
Tests whether or not removed subscriptions affect take behavior.
- Creates a publisher and three subscriptions on the same topic.
- Publish a message, keep the original point for later comparison.
- Take with one subscription, should work.
- Remove a different subscription.
- Take with the final subscription, should work.
- Assert the previous take returned ownership of the original object published.
*/
TEST(TestIntraProcessManager, removed_subscription_affects_take) {
rclcpp::intra_process_manager::IntraProcessManager ipm;
auto p1 = std::make_shared<rclcpp::publisher::mock::Publisher>();
p1->mock_topic_name = "nominal1";
p1->mock_queue_size = 10;
auto s1 = std::make_shared<rclcpp::subscription::mock::SubscriptionBase>();
s1->mock_topic_name = "nominal1";
s1->mock_queue_size = 10;
auto s2 = std::make_shared<rclcpp::subscription::mock::SubscriptionBase>();
s2->mock_topic_name = "nominal1";
s2->mock_queue_size = 10;
auto s3 = std::make_shared<rclcpp::subscription::mock::SubscriptionBase>();
s3->mock_topic_name = "nominal1";
s3->mock_queue_size = 10;
auto p1_id = ipm.add_publisher<rcl_interfaces::msg::IntraProcessMessage>(p1);
auto s1_id = ipm.add_subscription(s1);
auto s2_id = ipm.add_subscription(s2);
auto s3_id = ipm.add_subscription(s3);
auto ipm_msg = std::make_shared<rcl_interfaces::msg::IntraProcessMessage>();
ipm_msg->message_sequence = 42;
ipm_msg->publisher_id = 42;
rcl_interfaces::msg::IntraProcessMessage::UniquePtr unique_msg(
new rcl_interfaces::msg::IntraProcessMessage(*ipm_msg)
);
auto original_message_pointer = unique_msg.get();
auto p1_m1_id = ipm.store_intra_process_message(p1_id, unique_msg);
ASSERT_EQ(nullptr, unique_msg);
ipm.take_intra_process_message(p1_id, p1_m1_id, s1_id, unique_msg);
EXPECT_NE(nullptr, unique_msg);
if (unique_msg) {
EXPECT_EQ(42ul, unique_msg->message_sequence);
EXPECT_EQ(42ul, unique_msg->publisher_id);
EXPECT_NE(original_message_pointer, unique_msg.get());
}
unique_msg.reset();
ipm.remove_subscription(s2_id);
// Take using s3, the remaining subscription.
ipm.take_intra_process_message(p1_id, p1_m1_id, s3_id, unique_msg);
EXPECT_NE(nullptr, unique_msg);
if (unique_msg) {
EXPECT_EQ(42ul, unique_msg->message_sequence);
EXPECT_EQ(42ul, unique_msg->publisher_id);
// Should match the original pointer since s2 was removed first.
EXPECT_EQ(original_message_pointer, unique_msg.get());
}
// Take using s2, should fail since s2 was removed.
unique_msg.reset();
ipm.take_intra_process_message(p1_id, p1_m1_id, s2_id, unique_msg);
EXPECT_EQ(nullptr, unique_msg);
}
/*
This tests normal operation with multiple subscriptions and one publisher.
- Creates a publisher and three subscriptions on the same topic.
- Publish a message.
- Take with each subscription, checking that the last takes the original back.
*/
TEST(TestIntraProcessManager, multiple_subscriptions_one_publisher) {
rclcpp::intra_process_manager::IntraProcessManager ipm;
auto p1 = std::make_shared<rclcpp::publisher::mock::Publisher>();
p1->mock_topic_name = "nominal1";
p1->mock_queue_size = 10;
auto s1 = std::make_shared<rclcpp::subscription::mock::SubscriptionBase>();
s1->mock_topic_name = "nominal1";
s1->mock_queue_size = 10;
auto s2 = std::make_shared<rclcpp::subscription::mock::SubscriptionBase>();
s2->mock_topic_name = "nominal1";
s2->mock_queue_size = 10;
auto s3 = std::make_shared<rclcpp::subscription::mock::SubscriptionBase>();
s3->mock_topic_name = "nominal1";
s3->mock_queue_size = 10;
auto p1_id = ipm.add_publisher<rcl_interfaces::msg::IntraProcessMessage>(p1);
auto s1_id = ipm.add_subscription(s1);
auto s2_id = ipm.add_subscription(s2);
auto s3_id = ipm.add_subscription(s3);
auto ipm_msg = std::make_shared<rcl_interfaces::msg::IntraProcessMessage>();
ipm_msg->message_sequence = 42;
ipm_msg->publisher_id = 42;
rcl_interfaces::msg::IntraProcessMessage::UniquePtr unique_msg(
new rcl_interfaces::msg::IntraProcessMessage(*ipm_msg)
);
auto original_message_pointer = unique_msg.get();
auto p1_m1_id = ipm.store_intra_process_message(p1_id, unique_msg);
ASSERT_EQ(nullptr, unique_msg);
ipm.take_intra_process_message(p1_id, p1_m1_id, s1_id, unique_msg);
EXPECT_NE(nullptr, unique_msg);
if (unique_msg) {
EXPECT_EQ(42ul, unique_msg->message_sequence);
EXPECT_EQ(42ul, unique_msg->publisher_id);
EXPECT_NE(original_message_pointer, unique_msg.get());
}
unique_msg.reset();
ipm.take_intra_process_message(p1_id, p1_m1_id, s2_id, unique_msg);
EXPECT_NE(nullptr, unique_msg);
if (unique_msg) {
EXPECT_EQ(42ul, unique_msg->message_sequence);
EXPECT_EQ(42ul, unique_msg->publisher_id);
EXPECT_NE(original_message_pointer, unique_msg.get());
}
unique_msg.reset();
ipm.take_intra_process_message(p1_id, p1_m1_id, s3_id, unique_msg);
EXPECT_NE(nullptr, unique_msg);
if (unique_msg) {
EXPECT_EQ(42ul, unique_msg->message_sequence);
EXPECT_EQ(42ul, unique_msg->publisher_id);
// Should match the original pointer.
EXPECT_EQ(original_message_pointer, unique_msg.get());
}
}
/*
This tests normal operation with multiple publishers and one subscription.
- Creates a publisher and three subscriptions on the same topic.
- Publish a message.
- Take with each subscription, checking that the last takes the original back.
*/
TEST(TestIntraProcessManager, multiple_publishers_one_subscription) {
rclcpp::intra_process_manager::IntraProcessManager ipm;
auto p1 = std::make_shared<rclcpp::publisher::mock::Publisher>();
p1->mock_topic_name = "nominal1";
p1->mock_queue_size = 10;
auto p2 = std::make_shared<rclcpp::publisher::mock::Publisher>();
p2->mock_topic_name = "nominal1";
p2->mock_queue_size = 10;
auto p3 = std::make_shared<rclcpp::publisher::mock::Publisher>();
p3->mock_topic_name = "nominal1";
p3->mock_queue_size = 10;
auto s1 = std::make_shared<rclcpp::subscription::mock::SubscriptionBase>();
s1->mock_topic_name = "nominal1";
s1->mock_queue_size = 10;
auto p1_id = ipm.add_publisher<rcl_interfaces::msg::IntraProcessMessage>(p1);
auto p2_id = ipm.add_publisher<rcl_interfaces::msg::IntraProcessMessage>(p2);
auto p3_id = ipm.add_publisher<rcl_interfaces::msg::IntraProcessMessage>(p3);
auto s1_id = ipm.add_subscription(s1);
auto ipm_msg = std::make_shared<rcl_interfaces::msg::IntraProcessMessage>();
// First publish
ipm_msg->message_sequence = 42;
ipm_msg->publisher_id = 42;
rcl_interfaces::msg::IntraProcessMessage::UniquePtr unique_msg(
new rcl_interfaces::msg::IntraProcessMessage(*ipm_msg)
);
auto original_message_pointer1 = unique_msg.get();
auto p1_m1_id = ipm.store_intra_process_message(p1_id, unique_msg);
ASSERT_EQ(nullptr, unique_msg);
// Second publish
ipm_msg->message_sequence = 43;
ipm_msg->publisher_id = 43;
unique_msg.reset(new rcl_interfaces::msg::IntraProcessMessage(*ipm_msg));
auto original_message_pointer2 = unique_msg.get();
auto p2_m1_id = ipm.store_intra_process_message(p2_id, unique_msg);
ASSERT_EQ(nullptr, unique_msg);
// Third publish
ipm_msg->message_sequence = 44;
ipm_msg->publisher_id = 44;
unique_msg.reset(new rcl_interfaces::msg::IntraProcessMessage(*ipm_msg));
auto original_message_pointer3 = unique_msg.get();
auto p3_m1_id = ipm.store_intra_process_message(p3_id, unique_msg);
ASSERT_EQ(nullptr, unique_msg);
// First take
ipm.take_intra_process_message(p1_id, p1_m1_id, s1_id, unique_msg);
EXPECT_NE(nullptr, unique_msg);
if (unique_msg) {
EXPECT_EQ(42ul, unique_msg->message_sequence);
EXPECT_EQ(42ul, unique_msg->publisher_id);
EXPECT_EQ(original_message_pointer1, unique_msg.get());
}
unique_msg.reset();
// Second take
ipm.take_intra_process_message(p2_id, p2_m1_id, s1_id, unique_msg);
EXPECT_NE(nullptr, unique_msg);
if (unique_msg) {
EXPECT_EQ(43ul, unique_msg->message_sequence);
EXPECT_EQ(43ul, unique_msg->publisher_id);
EXPECT_EQ(original_message_pointer2, unique_msg.get());
}
unique_msg.reset();
// Third take
ipm.take_intra_process_message(p3_id, p3_m1_id, s1_id, unique_msg);
EXPECT_NE(nullptr, unique_msg);
if (unique_msg) {
EXPECT_EQ(44ul, unique_msg->message_sequence);
EXPECT_EQ(44ul, unique_msg->publisher_id);
EXPECT_EQ(original_message_pointer3, unique_msg.get());
}
unique_msg.reset();
}
/*
This tests normal operation with multiple publishers and subscriptions.
- Creates three publishers and three subscriptions on the same topic.
- Publish a message on each publisher.
- Take from each publisher with each subscription, checking the pointer.
*/
TEST(TestIntraProcessManager, multiple_publishers_multiple_subscription) {
rclcpp::intra_process_manager::IntraProcessManager ipm;
auto p1 = std::make_shared<rclcpp::publisher::mock::Publisher>();
p1->mock_topic_name = "nominal1";
p1->mock_queue_size = 10;
auto p2 = std::make_shared<rclcpp::publisher::mock::Publisher>();
p2->mock_topic_name = "nominal1";
p2->mock_queue_size = 10;
auto p3 = std::make_shared<rclcpp::publisher::mock::Publisher>();
p3->mock_topic_name = "nominal1";
p3->mock_queue_size = 10;
auto s1 = std::make_shared<rclcpp::subscription::mock::SubscriptionBase>();
s1->mock_topic_name = "nominal1";
s1->mock_queue_size = 10;
auto s2 = std::make_shared<rclcpp::subscription::mock::SubscriptionBase>();
s2->mock_topic_name = "nominal1";
s2->mock_queue_size = 10;
auto s3 = std::make_shared<rclcpp::subscription::mock::SubscriptionBase>();
s3->mock_topic_name = "nominal1";
s3->mock_queue_size = 10;
auto p1_id = ipm.add_publisher<rcl_interfaces::msg::IntraProcessMessage>(p1);
auto p2_id = ipm.add_publisher<rcl_interfaces::msg::IntraProcessMessage>(p2);
auto p3_id = ipm.add_publisher<rcl_interfaces::msg::IntraProcessMessage>(p3);
auto s1_id = ipm.add_subscription(s1);
auto s2_id = ipm.add_subscription(s2);
auto s3_id = ipm.add_subscription(s3);
auto ipm_msg = std::make_shared<rcl_interfaces::msg::IntraProcessMessage>();
// First publish
ipm_msg->message_sequence = 42;
ipm_msg->publisher_id = 42;
rcl_interfaces::msg::IntraProcessMessage::UniquePtr unique_msg(
new rcl_interfaces::msg::IntraProcessMessage(*ipm_msg)
);
auto original_message_pointer1 = unique_msg.get();
auto p1_m1_id = ipm.store_intra_process_message(p1_id, unique_msg);
ASSERT_EQ(nullptr, unique_msg);
// Second publish
ipm_msg->message_sequence = 43;
ipm_msg->publisher_id = 43;
unique_msg.reset(new rcl_interfaces::msg::IntraProcessMessage(*ipm_msg));
auto original_message_pointer2 = unique_msg.get();
auto p2_m1_id = ipm.store_intra_process_message(p2_id, unique_msg);
ASSERT_EQ(nullptr, unique_msg);
// Third publish
ipm_msg->message_sequence = 44;
ipm_msg->publisher_id = 44;
unique_msg.reset(new rcl_interfaces::msg::IntraProcessMessage(*ipm_msg));
auto original_message_pointer3 = unique_msg.get();
auto p3_m1_id = ipm.store_intra_process_message(p3_id, unique_msg);
ASSERT_EQ(nullptr, unique_msg);
// First take
ipm.take_intra_process_message(p1_id, p1_m1_id, s1_id, unique_msg);
EXPECT_NE(nullptr, unique_msg);
if (unique_msg) {
EXPECT_EQ(42ul, unique_msg->message_sequence);
EXPECT_EQ(42ul, unique_msg->publisher_id);
EXPECT_NE(original_message_pointer1, unique_msg.get());
}
unique_msg.reset();
ipm.take_intra_process_message(p1_id, p1_m1_id, s2_id, unique_msg);
EXPECT_NE(nullptr, unique_msg);
if (unique_msg) {
EXPECT_EQ(42ul, unique_msg->message_sequence);
EXPECT_EQ(42ul, unique_msg->publisher_id);
EXPECT_NE(original_message_pointer1, unique_msg.get());
}
unique_msg.reset();
ipm.take_intra_process_message(p1_id, p1_m1_id, s3_id, unique_msg);
EXPECT_NE(nullptr, unique_msg);
if (unique_msg) {
EXPECT_EQ(42ul, unique_msg->message_sequence);
EXPECT_EQ(42ul, unique_msg->publisher_id);
EXPECT_EQ(original_message_pointer1, unique_msg.get()); // Final take.
}
unique_msg.reset();
// Second take
ipm.take_intra_process_message(p2_id, p2_m1_id, s1_id, unique_msg);
EXPECT_NE(nullptr, unique_msg);
if (unique_msg) {
EXPECT_EQ(43ul, unique_msg->message_sequence);
EXPECT_EQ(43ul, unique_msg->publisher_id);
EXPECT_NE(original_message_pointer2, unique_msg.get());
}
unique_msg.reset();
ipm.take_intra_process_message(p2_id, p2_m1_id, s2_id, unique_msg);
EXPECT_NE(nullptr, unique_msg);
if (unique_msg) {
EXPECT_EQ(43ul, unique_msg->message_sequence);
EXPECT_EQ(43ul, unique_msg->publisher_id);
EXPECT_NE(original_message_pointer2, unique_msg.get());
}
unique_msg.reset();
ipm.take_intra_process_message(p2_id, p2_m1_id, s3_id, unique_msg);
EXPECT_NE(nullptr, unique_msg);
if (unique_msg) {
EXPECT_EQ(43ul, unique_msg->message_sequence);
EXPECT_EQ(43ul, unique_msg->publisher_id);
EXPECT_EQ(original_message_pointer2, unique_msg.get());
}
unique_msg.reset();
// Third take
ipm.take_intra_process_message(p3_id, p3_m1_id, s1_id, unique_msg);
EXPECT_NE(nullptr, unique_msg);
if (unique_msg) {
EXPECT_EQ(44ul, unique_msg->message_sequence);
EXPECT_EQ(44ul, unique_msg->publisher_id);
EXPECT_NE(original_message_pointer3, unique_msg.get());
}
unique_msg.reset();
ipm.take_intra_process_message(p3_id, p3_m1_id, s2_id, unique_msg);
EXPECT_NE(nullptr, unique_msg);
if (unique_msg) {
EXPECT_EQ(44ul, unique_msg->message_sequence);
EXPECT_EQ(44ul, unique_msg->publisher_id);
EXPECT_NE(original_message_pointer3, unique_msg.get());
}
unique_msg.reset();
ipm.take_intra_process_message(p3_id, p3_m1_id, s3_id, unique_msg);
EXPECT_NE(nullptr, unique_msg);
if (unique_msg) {
EXPECT_EQ(44ul, unique_msg->message_sequence);
EXPECT_EQ(44ul, unique_msg->publisher_id);
EXPECT_EQ(original_message_pointer3, unique_msg.get());
}
unique_msg.reset();
}
/*
Tests displacing a message from the ring buffer before take is called.
- Creates a publisher (buffer_size = 2) and a subscription on the same topic.
- Publish a message on the publisher.
- Publish another message.
- Take the second message.
- Publish a message.
- Try to take the first message, should fail.
*/
TEST(TestIntraProcessManager, ring_buffer_displacement) {
rclcpp::intra_process_manager::IntraProcessManager ipm;
auto p1 = std::make_shared<rclcpp::publisher::mock::Publisher>();
p1->mock_topic_name = "nominal1";
p1->mock_queue_size = 2;
auto s1 = std::make_shared<rclcpp::subscription::mock::SubscriptionBase>();
s1->mock_topic_name = "nominal1";
s1->mock_queue_size = 10;
auto p1_id = ipm.add_publisher<rcl_interfaces::msg::IntraProcessMessage>(p1);
auto s1_id = ipm.add_subscription(s1);
auto ipm_msg = std::make_shared<rcl_interfaces::msg::IntraProcessMessage>();
ipm_msg->message_sequence = 42;
ipm_msg->publisher_id = 42;
rcl_interfaces::msg::IntraProcessMessage::UniquePtr unique_msg(
new rcl_interfaces::msg::IntraProcessMessage(*ipm_msg)
);
auto original_message_pointer1 = unique_msg.get();
auto p1_m1_id = ipm.store_intra_process_message(p1_id, unique_msg);
ASSERT_EQ(nullptr, unique_msg);
ipm_msg->message_sequence = 43;
ipm_msg->publisher_id = 43;
unique_msg.reset(new rcl_interfaces::msg::IntraProcessMessage(*ipm_msg));
auto original_message_pointer2 = unique_msg.get();
auto p1_m2_id = ipm.store_intra_process_message(p1_id, unique_msg);
ASSERT_EQ(nullptr, unique_msg);
ipm.take_intra_process_message(p1_id, p1_m2_id, s1_id, unique_msg);
EXPECT_NE(nullptr, unique_msg);
if (unique_msg) {
EXPECT_EQ(43ul, unique_msg->message_sequence);
EXPECT_EQ(43ul, unique_msg->publisher_id);
EXPECT_EQ(original_message_pointer2, unique_msg.get());
}
unique_msg.reset();
ipm_msg->message_sequence = 44;
ipm_msg->publisher_id = 44;
unique_msg.reset(new rcl_interfaces::msg::IntraProcessMessage(*ipm_msg));
ipm.store_intra_process_message(p1_id, unique_msg);
EXPECT_NE(nullptr, unique_msg); // Should return the thing in the ring buffer it displaced.
if (unique_msg) {
// This should have been the first published message.
EXPECT_EQ(42ul, unique_msg->message_sequence);
EXPECT_EQ(42ul, unique_msg->publisher_id);
EXPECT_EQ(original_message_pointer1, unique_msg.get());
}
unique_msg.reset();
// Since it just got displaced it should no longer be there to take.
ipm.take_intra_process_message(p1_id, p1_m1_id, s1_id, unique_msg);
EXPECT_EQ(nullptr, unique_msg);
}
/*
Simulates race condition where a subscription is created after publish.
- Creates a publisher.
- Publish a message on the publisher.
- Create a subscription on the same topic.
- Try to take the message with the newly created subscription, should fail.
*/
TEST(TestIntraProcessManager, subscription_creation_race_condition) {
rclcpp::intra_process_manager::IntraProcessManager ipm;
auto p1 = std::make_shared<rclcpp::publisher::mock::Publisher>();
p1->mock_topic_name = "nominal1";
p1->mock_queue_size = 2;
auto p1_id = ipm.add_publisher<rcl_interfaces::msg::IntraProcessMessage>(p1);
auto ipm_msg = std::make_shared<rcl_interfaces::msg::IntraProcessMessage>();
ipm_msg->message_sequence = 42;
ipm_msg->publisher_id = 42;
rcl_interfaces::msg::IntraProcessMessage::UniquePtr unique_msg(
new rcl_interfaces::msg::IntraProcessMessage(*ipm_msg)
);
auto p1_m1_id = ipm.store_intra_process_message(p1_id, unique_msg);
ASSERT_EQ(nullptr, unique_msg);
auto s1 = std::make_shared<rclcpp::subscription::mock::SubscriptionBase>();
s1->mock_topic_name = "nominal1";
s1->mock_queue_size = 10;
auto s1_id = ipm.add_subscription(s1);
ipm.take_intra_process_message(p1_id, p1_m1_id, s1_id, unique_msg);
EXPECT_EQ(nullptr, unique_msg);
}
/*
Simulates race condition where a publisher goes out of scope before take.
- Create a subscription.
- Creates a publisher on the same topic in a scope.
- Publish a message on the publisher in a scope.
- Let the scope expire.
- Try to take the message with the subscription, should fail.
*/
TEST(TestIntraProcessManager, publisher_out_of_scope_take) {
rclcpp::intra_process_manager::IntraProcessManager ipm;
auto s1 = std::make_shared<rclcpp::subscription::mock::SubscriptionBase>();
s1->mock_topic_name = "nominal1";
s1->mock_queue_size = 10;
auto s1_id = ipm.add_subscription(s1);
uint64_t p1_id;
uint64_t p1_m1_id;
{
auto p1 = std::make_shared<rclcpp::publisher::mock::Publisher>();
p1->mock_topic_name = "nominal1";
p1->mock_queue_size = 2;
p1_id = ipm.add_publisher<rcl_interfaces::msg::IntraProcessMessage>(p1);
auto ipm_msg = std::make_shared<rcl_interfaces::msg::IntraProcessMessage>();
ipm_msg->message_sequence = 42;
ipm_msg->publisher_id = 42;
rcl_interfaces::msg::IntraProcessMessage::UniquePtr unique_msg(
new rcl_interfaces::msg::IntraProcessMessage(*ipm_msg)
);
p1_m1_id = ipm.store_intra_process_message(p1_id, unique_msg);
ASSERT_EQ(nullptr, unique_msg);
// Explicitly remove publisher from ipm (emulate's publisher's destructor).
ipm.remove_publisher(p1_id);
}
rcl_interfaces::msg::IntraProcessMessage::UniquePtr unique_msg(nullptr);
// Should fail because the publisher is out of scope.
ipm.take_intra_process_message(p1_id, p1_m1_id, s1_id, unique_msg);
EXPECT_EQ(nullptr, unique_msg);
}
/*
Simulates race condition where a publisher goes out of scope before store.
- Creates a publisher in a scope.
- Let the scope expire.
- Publish a message on the publisher in a scope, should throw.
*/
TEST(TestIntraProcessManager, publisher_out_of_scope_store) {
rclcpp::intra_process_manager::IntraProcessManager ipm;
uint64_t p1_id;
{
auto p1 = std::make_shared<rclcpp::publisher::mock::Publisher>();
p1->mock_topic_name = "nominal1";
p1->mock_queue_size = 2;
p1_id = ipm.add_publisher<rcl_interfaces::msg::IntraProcessMessage>(p1);
}
auto ipm_msg = std::make_shared<rcl_interfaces::msg::IntraProcessMessage>();
ipm_msg->message_sequence = 42;
ipm_msg->publisher_id = 42;
rcl_interfaces::msg::IntraProcessMessage::UniquePtr unique_msg(
new rcl_interfaces::msg::IntraProcessMessage(*ipm_msg)
);
EXPECT_THROW(ipm.store_intra_process_message(p1_id, unique_msg), std::runtime_error);
ASSERT_EQ(nullptr, unique_msg);
}

View file

@ -0,0 +1,172 @@
// Copyright 2015 Open Source Robotics Foundation, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include <gtest/gtest.h>
#include <rclcpp/mapped_ring_buffer.hpp>
/*
Tests get_copy and pop on an empty mrb.
*/
TEST(TestMappedRingBuffer, empty) {
// Cannot create a buffer of size zero.
EXPECT_THROW(rclcpp::mapped_ring_buffer::MappedRingBuffer<char> mrb(0), std::invalid_argument);
// Getting or popping an empty buffer should result in a nullptr.
rclcpp::mapped_ring_buffer::MappedRingBuffer<char> mrb(1);
std::unique_ptr<char> actual;
mrb.get_copy_at_key(1, actual);
EXPECT_EQ(nullptr, actual);
mrb.pop_at_key(1, actual);
EXPECT_EQ(nullptr, actual);
}
/*
Tests push_and_replace with a temporary object.
*/
TEST(TestMappedRingBuffer, temporary_l_value) {
rclcpp::mapped_ring_buffer::MappedRingBuffer<char> mrb(2);
// Pass in value with temporary object
mrb.push_and_replace(1, std::unique_ptr<char>(new char('a')));
std::unique_ptr<char> actual;
mrb.get_copy_at_key(1, actual);
EXPECT_EQ('a', *actual);
mrb.pop_at_key(1, actual);
EXPECT_EQ('a', *actual);
mrb.get_copy_at_key(1, actual);
EXPECT_EQ(nullptr, actual);
}
/*
Tests normal usage of the mrb.
*/
TEST(TestMappedRingBuffer, nominal) {
rclcpp::mapped_ring_buffer::MappedRingBuffer<char> mrb(2);
std::unique_ptr<char> expected(new char('a'));
// Store expected value's address for later comparison.
char * expected_orig = expected.get();
EXPECT_FALSE(mrb.push_and_replace(1, expected));
std::unique_ptr<char> actual;
mrb.get_copy_at_key(1, actual);
EXPECT_NE(nullptr, actual);
if (actual) {
EXPECT_EQ('a', *actual);
}
EXPECT_NE(expected_orig, actual.get());
mrb.pop_at_key(1, actual);
EXPECT_NE(nullptr, actual);
if (actual) {
EXPECT_EQ('a', *actual);
}
EXPECT_EQ(expected_orig, actual.get());
mrb.get_copy_at_key(1, actual);
EXPECT_EQ(nullptr, actual);
expected.reset(new char('a'));
EXPECT_FALSE(mrb.push_and_replace(1, expected));
expected.reset(new char('b'));
EXPECT_FALSE(mrb.push_and_replace(2, expected));
expected.reset(new char('c'));
EXPECT_TRUE(mrb.push_and_replace(3, expected));
mrb.get_copy_at_key(1, actual);
EXPECT_EQ(nullptr, actual);
mrb.get_copy_at_key(2, actual);
EXPECT_NE(nullptr, actual);
if (actual) {
EXPECT_EQ('b', *actual);
}
mrb.get_copy_at_key(3, actual);
EXPECT_NE(nullptr, actual);
if (actual) {
EXPECT_EQ('c', *actual);
}
}
/*
Tests get_ownership on a normal mrb.
*/
TEST(TestMappedRingBuffer, get_ownership) {
rclcpp::mapped_ring_buffer::MappedRingBuffer<char> mrb(2);
std::unique_ptr<char> expected(new char('a'));
// Store expected value's address for later comparison.
char * expected_orig = expected.get();
EXPECT_FALSE(mrb.push_and_replace(1, expected));
std::unique_ptr<char> actual;
mrb.get_copy_at_key(1, actual);
EXPECT_NE(nullptr, actual);
if (actual) {
EXPECT_EQ('a', *actual);
}
EXPECT_NE(expected_orig, actual.get());
mrb.get_ownership_at_key(1, actual);
EXPECT_NE(nullptr, actual);
if (actual) {
EXPECT_EQ('a', *actual);
}
EXPECT_EQ(expected_orig, actual.get());
mrb.pop_at_key(1, actual);
EXPECT_NE(nullptr, actual);
if (actual) {
EXPECT_EQ('a', *actual); // The value should be the same.
}
EXPECT_NE(expected_orig, actual.get()); // Even though we pop'ed, we didn't get the original.
mrb.get_copy_at_key(1, actual);
EXPECT_EQ(nullptr, actual);
}
/*
Tests the affect of reusing keys (non-unique keys) in a mrb.
*/
TEST(TestMappedRingBuffer, non_unique_keys) {
rclcpp::mapped_ring_buffer::MappedRingBuffer<char> mrb(2);
std::unique_ptr<char> input(new char('a'));
mrb.push_and_replace(1, input);
input.reset(new char('b'));
// Different value, same key.
mrb.push_and_replace(1, input);
std::unique_ptr<char> actual;
mrb.pop_at_key(1, actual);
EXPECT_NE(nullptr, actual);
if (actual) {
EXPECT_EQ('a', *actual);
}
actual = nullptr;
mrb.pop_at_key(1, actual);
EXPECT_NE(nullptr, actual);
if (actual) {
EXPECT_EQ('b', *actual);
}
}