fixes to address comments and CI failures

This commit is contained in:
William Woodall 2015-08-19 13:10:15 -07:00
parent aedc494f8f
commit 12b939cf5a
10 changed files with 264 additions and 181 deletions

View file

@ -22,8 +22,10 @@ if(AMENT_ENABLE_TESTING)
ament_add_gtest(test_mapped_ring_buffer test/test_mapped_ring_buffer.cpp) ament_add_gtest(test_mapped_ring_buffer test/test_mapped_ring_buffer.cpp)
ament_add_gtest(test_intra_process_manager test/test_intra_process_manager.cpp) ament_add_gtest(test_intra_process_manager test/test_intra_process_manager.cpp)
if(TARGET test_intra_process_manager)
target_include_directories(test_intra_process_manager PUBLIC "${rcl_interfaces_INCLUDE_DIRS}") target_include_directories(test_intra_process_manager PUBLIC "${rcl_interfaces_INCLUDE_DIRS}")
endif() endif()
endif()
ament_package( ament_package(
CONFIG_EXTRAS rclcpp-extras.cmake CONFIG_EXTRAS rclcpp-extras.cmake

View file

@ -51,11 +51,13 @@ public:
auto it = sub_contexts_.find(type_i); auto it = sub_contexts_.find(type_i);
if (it == sub_contexts_.end()) { if (it == sub_contexts_.end()) {
// It doesn't exist yet, make it // It doesn't exist yet, make it
// *INDENT-OFF* (prevent uncrustify from making unecessary indents here)
sub_context = std::shared_ptr<SubContext>( sub_context = std::shared_ptr<SubContext>(
new SubContext(std::forward<Args>(args) ...), new SubContext(std::forward<Args>(args) ...),
[] (SubContext * sub_context_ptr) { [] (SubContext * sub_context_ptr) {
delete sub_context_ptr; delete sub_context_ptr;
}); });
// *INDENT-ON*
sub_contexts_[type_i] = sub_context; sub_contexts_[type_i] = sub_context;
} else { } else {
// It exists, get it out and cast it. // It exists, get it out and cast it.

View file

@ -21,8 +21,9 @@
#include <rclcpp/subscription.hpp> #include <rclcpp/subscription.hpp>
#include <algorithm> #include <algorithm>
#include <cassert> #include <atomic>
#include <cstdint> #include <cstdint>
#include <exception>
#include <limits> #include <limits>
#include <map> #include <map>
#include <unordered_map> #include <unordered_map>
@ -115,6 +116,7 @@ class IntraProcessManager
{ {
private: private:
RCLCPP_DISABLE_COPY(IntraProcessManager); RCLCPP_DISABLE_COPY(IntraProcessManager);
public: public:
RCLCPP_SMART_PTR_DEFINITIONS(IntraProcessManager); RCLCPP_SMART_PTR_DEFINITIONS(IntraProcessManager);
@ -129,8 +131,8 @@ public:
* *
* This method will allocate memory. * This method will allocate memory.
* *
* /param subscription the Subscription to register. * \param subscription the Subscription to register.
* /return an unsigned 64-bit integer which is the subscription's unique id. * \return an unsigned 64-bit integer which is the subscription's unique id.
*/ */
uint64_t uint64_t
add_subscription(subscription::SubscriptionBase::SharedPtr subscription) add_subscription(subscription::SubscriptionBase::SharedPtr subscription)
@ -144,7 +146,7 @@ public:
/// Unregister a subscription using the subscription's unique id. /// Unregister a subscription using the subscription's unique id.
/* This method does not allocate memory. /* This method does not allocate memory.
* *
* /param intra_process_subscription_id id of the subscription to remove. * \param intra_process_subscription_id id of the subscription to remove.
*/ */
void void
remove_subscription(uint64_t intra_process_subscription_id) remove_subscription(uint64_t intra_process_subscription_id)
@ -180,9 +182,9 @@ public:
* *
* This method will allocate memory. * This method will allocate memory.
* *
* /param publisher publisher to be registered with the manager. * \param publisher publisher to be registered with the manager.
* /param buffer_size if 0 (default) a size is calculated based on the QoS. * \param buffer_size if 0 (default) a size is calculated based on the QoS.
* /return an unsigned 64-bit integer which is the publisher's unique id. * \return an unsigned 64-bit integer which is the publisher's unique id.
*/ */
template<typename MessageT> template<typename MessageT>
uint64_t uint64_t
@ -192,7 +194,9 @@ public:
publishers_[id].publisher = publisher; publishers_[id].publisher = publisher;
size_t size = buffer_size > 0 ? buffer_size : publisher->get_queue_size(); size_t size = buffer_size > 0 ? buffer_size : publisher->get_queue_size();
// As long as the size of the ring buffer is less than the max sequence number, we're safe. // As long as the size of the ring buffer is less than the max sequence number, we're safe.
assert(size <= std::numeric_limits<uint64_t>::max()); if (size > std::numeric_limits<uint64_t>::max()) {
throw std::invalid_argument("the calculated buffer size is too large");
}
publishers_[id].sequence_number.store(0); publishers_[id].sequence_number.store(0);
publishers_[id].buffer = mapped_ring_buffer::MappedRingBuffer<MessageT>::make_shared(size); publishers_[id].buffer = mapped_ring_buffer::MappedRingBuffer<MessageT>::make_shared(size);
publishers_[id].target_subscriptions_by_message_sequence.reserve(size); publishers_[id].target_subscriptions_by_message_sequence.reserve(size);
@ -202,7 +206,7 @@ public:
/// Unregister a publisher using the publisher's unique id. /// Unregister a publisher using the publisher's unique id.
/* This method does not allocate memory. /* This method does not allocate memory.
* *
* /param intra_process_publisher_id id of the publisher to remove. * \param intra_process_publisher_id id of the publisher to remove.
*/ */
void void
remove_publisher(uint64_t intra_process_publisher_id) remove_publisher(uint64_t intra_process_publisher_id)
@ -236,9 +240,9 @@ public:
* *
* This method does allocate memory. * This method does allocate memory.
* *
* /param intra_process_publisher_id the id of the publisher of this message. * \param intra_process_publisher_id the id of the publisher of this message.
* /param message the message that is being stored. * \param message the message that is being stored.
* /return the message sequence number. * \return the message sequence number.
*/ */
template<typename MessageT> template<typename MessageT>
uint64_t uint64_t
@ -250,7 +254,7 @@ public:
if (it == publishers_.end()) { if (it == publishers_.end()) {
throw std::runtime_error("store_intra_process_message called with invalid publisher id"); throw std::runtime_error("store_intra_process_message called with invalid publisher id");
} }
publisher_info & info = it->second; PublisherInfo & info = it->second;
// Calculate the next message sequence number. // Calculate the next message sequence number.
uint64_t message_seq = info.sequence_number.fetch_add(1, std::memory_order_relaxed); uint64_t message_seq = info.sequence_number.fetch_add(1, std::memory_order_relaxed);
// Insert the message into the ring buffer using the message_seq to identify it. // Insert the message into the ring buffer using the message_seq to identify it.
@ -309,10 +313,10 @@ public:
* *
* This method may allocate memory to copy the stored message. * This method may allocate memory to copy the stored message.
* *
* /param intra_process_publisher_id the id of the message's publisher. * \param intra_process_publisher_id the id of the message's publisher.
* /param message_sequence_number the sequence number of the message. * \param message_sequence_number the sequence number of the message.
* /param requesting_subscriptions_intra_process_id the subscription's id. * \param requesting_subscriptions_intra_process_id the subscription's id.
* /param message the message typed unique_ptr used to return the message. * \param message the message typed unique_ptr used to return the message.
*/ */
template<typename MessageT> template<typename MessageT>
void void
@ -323,7 +327,7 @@ public:
std::unique_ptr<MessageT> & message) std::unique_ptr<MessageT> & message)
{ {
message = nullptr; message = nullptr;
publisher_info * info; PublisherInfo * info;
{ {
auto it = publishers_.find(intra_process_publisher_id); auto it = publishers_.find(intra_process_publisher_id);
if (it == publishers_.end()) { if (it == publishers_.end()) {
@ -377,9 +381,11 @@ private:
// So around 585 million years. Even at 1 GHz, it would take 585 years. // So around 585 million years. Even at 1 GHz, it would take 585 years.
// I think it's safe to avoid trying to handle overflow. // I think it's safe to avoid trying to handle overflow.
// If we roll over then it's most likely a bug. // If we roll over then it's most likely a bug.
// *INDENT-OFF* (prevent uncrustify from making unecessary indents here)
throw std::overflow_error( throw std::overflow_error(
"exhausted the unique id's for publishers and subscribers in this process " "exhausted the unique id's for publishers and subscribers in this process "
"(congratulations your computer is either extremely fast or extremely old)"); "(congratulations your computer is either extremely fast or extremely old)");
// *INDENT-ON*
} }
return next_id; return next_id;
} }
@ -389,11 +395,11 @@ private:
std::unordered_map<uint64_t, subscription::SubscriptionBase::WeakPtr> subscriptions_; std::unordered_map<uint64_t, subscription::SubscriptionBase::WeakPtr> subscriptions_;
std::map<std::string, std::set<uint64_t>> subscription_ids_by_topic_; std::map<std::string, std::set<uint64_t>> subscription_ids_by_topic_;
struct publisher_info struct PublisherInfo
{ {
RCLCPP_DISABLE_COPY(publisher_info); RCLCPP_DISABLE_COPY(PublisherInfo);
publisher_info() = default; PublisherInfo() = default;
publisher::Publisher::WeakPtr publisher; publisher::Publisher::WeakPtr publisher;
std::atomic<uint64_t> sequence_number; std::atomic<uint64_t> sequence_number;
@ -401,7 +407,7 @@ private:
std::unordered_map<uint64_t, std::set<uint64_t>> target_subscriptions_by_message_sequence; std::unordered_map<uint64_t, std::set<uint64_t>> target_subscriptions_by_message_sequence;
}; };
std::unordered_map<uint64_t, publisher_info> publishers_; std::unordered_map<uint64_t, PublisherInfo> publishers_;
}; };

View file

@ -59,9 +59,10 @@ public:
/// Constructor. /// Constructor.
/* The constructor will allocate memory while reserving space. /* The constructor will allocate memory while reserving space.
* *
* /param size size of the ring buffer; must be positive and non-zero. * \param size size of the ring buffer; must be positive and non-zero.
*/ */
MappedRingBuffer(size_t size) : elements_(size), head_(0) MappedRingBuffer(size_t size)
: elements_(size), head_(0)
{ {
if (size == 0) { if (size == 0) {
throw std::invalid_argument("size must be a positive, non-zero value"); throw std::invalid_argument("size must be a positive, non-zero value");
@ -75,8 +76,10 @@ public:
* *
* The key is not guaranteed to be unique, see the class docs for more. * The key is not guaranteed to be unique, see the class docs for more.
* *
* /param key the key associated with the stored value * The contents of value before the method is called are discarded.
* /param value if the key is found, the value is stored in this parameter *
* \param key the key associated with the stored value
* \param value if the key is found, the value is stored in this parameter
*/ */
void void
get_copy_at_key(uint64_t key, std::unique_ptr<T> & value) get_copy_at_key(uint64_t key, std::unique_ptr<T> & value)
@ -102,8 +105,10 @@ public:
* originally stored object, since it was returned by the first call to this * originally stored object, since it was returned by the first call to this
* method. * method.
* *
* /param key the key associated with the stored value * The contents of value before the method is called are discarded.
* /param value if the key is found, the value is stored in this parameter *
* \param key the key associated with the stored value
* \param value if the key is found, the value is stored in this parameter
*/ */
void void
get_ownership_at_key(uint64_t key, std::unique_ptr<T> & value) get_ownership_at_key(uint64_t key, std::unique_ptr<T> & value)
@ -125,8 +130,10 @@ public:
* *
* The key is not guaranteed to be unique, see the class docs for more. * The key is not guaranteed to be unique, see the class docs for more.
* *
* /param key the key associated with the stored value * The contents of value before the method is called are discarded.
* /param value if the key is found, the value is stored in this parameter *
* \param key the key associated with the stored value
* \param value if the key is found, the value is stored in this parameter
*/ */
void void
pop_at_key(uint64_t key, std::unique_ptr<T> & value) pop_at_key(uint64_t key, std::unique_ptr<T> & value)
@ -147,8 +154,8 @@ public:
* After insertion, if a pair was replaced, then value will contain ownership * After insertion, if a pair was replaced, then value will contain ownership
* of that displaced value. Otherwise it will be a nullptr. * of that displaced value. Otherwise it will be a nullptr.
* *
* /param key the key associated with the value to be stored * \param key the key associated with the value to be stored
* /param value the value to store, and optionally the value displaced * \param value the value to store, and optionally the value displaced
*/ */
bool bool
push_and_replace(uint64_t key, std::unique_ptr<T> & value) push_and_replace(uint64_t key, std::unique_ptr<T> & value)
@ -188,9 +195,11 @@ private:
typename std::vector<element>::iterator typename std::vector<element>::iterator
get_iterator_of_key(uint64_t key) get_iterator_of_key(uint64_t key)
{ {
// *INDENT-OFF* (prevent uncrustify from making unecessary indents here)
auto it = std::find_if(elements_.begin(), elements_.end(), [key](element & e) -> bool { auto it = std::find_if(elements_.begin(), elements_.end(), [key](element & e) -> bool {
return e.key == key && e.in_use; return e.key == key && e.in_use;
}); });
// *INDENT-ON*
return it; return it;
} }

View file

@ -200,7 +200,7 @@ public:
private: private:
RCLCPP_DISABLE_COPY(Node); RCLCPP_DISABLE_COPY(Node);
static const rosidl_message_type_support_t * ipm_ts; static const rosidl_message_type_support_t * ipm_ts_;
bool bool
group_in_node(callback_group::CallbackGroup::SharedPtr & group); group_in_node(callback_group::CallbackGroup::SharedPtr & group);
@ -312,10 +312,8 @@ private:
} }
}; };
const rosidl_message_type_support_t * Node::ipm_ts = const rosidl_message_type_support_t * Node::ipm_ts_ =
rosidl_generator_cpp::get_message_type_support_handle< rosidl_generator_cpp::get_message_type_support_handle<rcl_interfaces::msg::IntraProcessMessage>();
rcl_interfaces::msg::IntraProcessMessage
>();
} /* namespace node */ } /* namespace node */
} /* namespace rclcpp */ } /* namespace rclcpp */

View file

@ -87,6 +87,7 @@ Node::Node(
// *INDENT-ON* // *INDENT-ON*
} }
// Initialize node handle shared_ptr with custom deleter. // Initialize node handle shared_ptr with custom deleter.
// *INDENT-OFF*
node_handle_.reset(node, [](rmw_node_t * node) { node_handle_.reset(node, [](rmw_node_t * node) {
auto ret = rmw_destroy_node(node); auto ret = rmw_destroy_node(node);
if (ret != RMW_RET_OK) { if (ret != RMW_RET_OK) {
@ -94,6 +95,7 @@ Node::Node(
stderr, "Error in destruction of rmw node handle: %s\n", rmw_get_error_string_safe()); stderr, "Error in destruction of rmw node handle: %s\n", rmw_get_error_string_safe());
} }
}); });
// *INDENT-ON*
using rclcpp::callback_group::CallbackGroupType; using rclcpp::callback_group::CallbackGroupType;
default_callback_group_ = create_callback_group( default_callback_group_ = create_callback_group(
@ -135,7 +137,7 @@ Node::create_publisher(
if (use_intra_process_comms_) { if (use_intra_process_comms_) {
rmw_publisher_t * intra_process_publisher_handle = rmw_create_publisher( rmw_publisher_t * intra_process_publisher_handle = rmw_create_publisher(
node_handle_.get(), ipm_ts, (topic_name + "__intra").c_str(), qos_profile); node_handle_.get(), ipm_ts_, (topic_name + "__intra").c_str(), qos_profile);
if (!intra_process_publisher_handle) { if (!intra_process_publisher_handle) {
// *INDENT-OFF* (prevent uncrustify from making unecessary indents here) // *INDENT-OFF* (prevent uncrustify from making unecessary indents here)
throw std::runtime_error( throw std::runtime_error(
@ -149,6 +151,7 @@ Node::create_publisher(
uint64_t intra_process_publisher_id = uint64_t intra_process_publisher_id =
intra_process_manager->add_publisher<MessageT>(publisher); intra_process_manager->add_publisher<MessageT>(publisher);
rclcpp::intra_process_manager::IntraProcessManager::WeakPtr weak_ipm = intra_process_manager; rclcpp::intra_process_manager::IntraProcessManager::WeakPtr weak_ipm = intra_process_manager;
// *INDENT-OFF*
auto shared_publish_callback = auto shared_publish_callback =
[weak_ipm](uint64_t publisher_id, std::shared_ptr<void> msg) -> uint64_t [weak_ipm](uint64_t publisher_id, std::shared_ptr<void> msg) -> uint64_t
{ {
@ -163,6 +166,7 @@ Node::create_publisher(
uint64_t message_seq = ipm->store_intra_process_message(publisher_id, unique_msg); uint64_t message_seq = ipm->store_intra_process_message(publisher_id, unique_msg);
return message_seq; return message_seq;
}; };
// *INDENT-ON*
publisher->setup_intra_process( publisher->setup_intra_process(
intra_process_publisher_id, intra_process_publisher_id,
shared_publish_callback, shared_publish_callback,
@ -225,7 +229,7 @@ Node::create_subscription(
// Setup intra process. // Setup intra process.
if (use_intra_process_comms_) { if (use_intra_process_comms_) {
rmw_subscription_t * intra_process_subscriber_handle = rmw_create_subscription( rmw_subscription_t * intra_process_subscriber_handle = rmw_create_subscription(
node_handle_.get(), ipm_ts, node_handle_.get(), ipm_ts_,
(topic_name + "__intra").c_str(), qos_profile, false); (topic_name + "__intra").c_str(), qos_profile, false);
if (!subscriber_handle) { if (!subscriber_handle) {
// *INDENT-OFF* (prevent uncrustify from making unecessary indents here) // *INDENT-OFF* (prevent uncrustify from making unecessary indents here)
@ -238,6 +242,7 @@ Node::create_subscription(
rclcpp::intra_process_manager::IntraProcessManager::WeakPtr weak_ipm = intra_process_manager; rclcpp::intra_process_manager::IntraProcessManager::WeakPtr weak_ipm = intra_process_manager;
uint64_t intra_process_subscription_id = uint64_t intra_process_subscription_id =
intra_process_manager->add_subscription(sub_base_ptr); intra_process_manager->add_subscription(sub_base_ptr);
// *INDENT-OFF*
sub->setup_intra_process( sub->setup_intra_process(
intra_process_subscription_id, intra_process_subscription_id,
intra_process_subscriber_handle, intra_process_subscriber_handle,
@ -255,6 +260,7 @@ Node::create_subscription(
} }
ipm->take_intra_process_message(publisher_id, message_sequence, subscription_id, message); ipm->take_intra_process_message(publisher_id, message_sequence, subscription_id, message);
}); });
// *INDENT-ON*
} }
// Assign to a group. // Assign to a group.
if (group) { if (group) {

View file

@ -42,6 +42,7 @@ namespace publisher
class Publisher class Publisher
{ {
friend rclcpp::node::Node; friend rclcpp::node::Node;
public: public:
RCLCPP_SMART_PTR_DEFINITIONS(Publisher); RCLCPP_SMART_PTR_DEFINITIONS(Publisher);
@ -107,7 +108,7 @@ public:
} }
} }
std::string const std::string &
get_topic_name() const get_topic_name() const
{ {
return topic_; return topic_;
@ -120,6 +121,7 @@ public:
} }
typedef std::function<uint64_t(uint64_t, std::shared_ptr<void>)> StoreSharedMessageCallbackT; typedef std::function<uint64_t(uint64_t, std::shared_ptr<void>)> StoreSharedMessageCallbackT;
protected: protected:
void void
setup_intra_process( setup_intra_process(

View file

@ -12,46 +12,76 @@
// See the License for the specific language governing permissions and // See the License for the specific language governing permissions and
// limitations under the License. // limitations under the License.
#include <memory>
#include <gtest/gtest.h> #include <gtest/gtest.h>
#include <rclcpp/macros.hpp> #include <rclcpp/macros.hpp>
// Mock up publisher and subscription base to avoid needing an rmw impl. // Mock up publisher and subscription base to avoid needing an rmw impl.
namespace rclcpp { namespace publisher { namespace mock { namespace rclcpp
{
namespace publisher
{
namespace mock
{
class Publisher class Publisher
{ {
public: public:
RCLCPP_SMART_PTR_DEFINITIONS(Publisher); RCLCPP_SMART_PTR_DEFINITIONS(Publisher);
Publisher() : mock_topic_name(""), mock_queue_size(0) {} Publisher()
: mock_topic_name(""), mock_queue_size(0) {}
std::string get_topic_name() const {return mock_topic_name;} const std::string & get_topic_name() const
size_t get_queue_size() const {return mock_queue_size;} {
return mock_topic_name;
}
size_t get_queue_size() const
{
return mock_queue_size;
}
std::string mock_topic_name; std::string mock_topic_name;
size_t mock_queue_size; size_t mock_queue_size;
}; };
}}} }
}
}
namespace rclcpp { namespace subscription { namespace mock { namespace rclcpp
{
namespace subscription
{
namespace mock
{
class SubscriptionBase class SubscriptionBase
{ {
public: public:
RCLCPP_SMART_PTR_DEFINITIONS(SubscriptionBase); RCLCPP_SMART_PTR_DEFINITIONS(SubscriptionBase);
SubscriptionBase() : mock_topic_name(""), mock_queue_size(0) {} SubscriptionBase()
: mock_topic_name(""), mock_queue_size(0) {}
std::string get_topic_name() const {return mock_topic_name;} const std::string & get_topic_name() const
size_t get_queue_size() const {return mock_queue_size;} {
return mock_topic_name;
}
size_t get_queue_size() const
{
return mock_queue_size;
}
std::string mock_topic_name; std::string mock_topic_name;
size_t mock_queue_size; size_t mock_queue_size;
}; };
}}} }
}
}
// Prevent rclcpp/publisher.hpp and rclcpp/subscription.hpp from being imported. // Prevent rclcpp/publisher.hpp and rclcpp/subscription.hpp from being imported.
#define RCLCPP_RCLCPP_PUBLISHER_HPP_ #define RCLCPP_RCLCPP_PUBLISHER_HPP_
@ -75,7 +105,7 @@ This tests the "normal" usage of the class:
- Asserts the message it got back was the one that went in (since there's only one subscription). - Asserts the message it got back was the one that went in (since there's only one subscription).
- Try's to take the message again, should fail. - Try's to take the message again, should fail.
*/ */
TEST(test_intra_process_manager, nominal) { TEST(TestIntraProcessManager, nominal) {
rclcpp::intra_process_manager::IntraProcessManager ipm; rclcpp::intra_process_manager::IntraProcessManager ipm;
auto p1 = std::make_shared<rclcpp::publisher::mock::Publisher>(); auto p1 = std::make_shared<rclcpp::publisher::mock::Publisher>();
@ -97,8 +127,9 @@ TEST(test_intra_process_manager, nominal) {
auto ipm_msg = std::make_shared<rcl_interfaces::msg::IntraProcessMessage>(); auto ipm_msg = std::make_shared<rcl_interfaces::msg::IntraProcessMessage>();
ipm_msg->message_sequence = 42; ipm_msg->message_sequence = 42;
ipm_msg->publisher_id = 42; ipm_msg->publisher_id = 42;
rcl_interfaces::msg::IntraProcessMessage::UniquePtr unique_msg; rcl_interfaces::msg::IntraProcessMessage::UniquePtr unique_msg(
unique_msg.reset(new rcl_interfaces::msg::IntraProcessMessage(*ipm_msg)); new rcl_interfaces::msg::IntraProcessMessage(*ipm_msg)
);
auto p1_m1_original_address = unique_msg.get(); auto p1_m1_original_address = unique_msg.get();
auto p1_m1_id = ipm.store_intra_process_message(p1_id, unique_msg); auto p1_m1_id = ipm.store_intra_process_message(p1_id, unique_msg);
@ -159,7 +190,7 @@ Simulates the case where a publisher is removed between publishing and the match
- Remove the publisher. - Remove the publisher.
- Try's to take the message, should fail since the publisher (and its storage) is gone. - Try's to take the message, should fail since the publisher (and its storage) is gone.
*/ */
TEST(test_intra_process_manager, remove_publisher_before_trying_to_take) { TEST(TestIntraProcessManager, remove_publisher_before_trying_to_take) {
rclcpp::intra_process_manager::IntraProcessManager ipm; rclcpp::intra_process_manager::IntraProcessManager ipm;
auto p1 = std::make_shared<rclcpp::publisher::mock::Publisher>(); auto p1 = std::make_shared<rclcpp::publisher::mock::Publisher>();
@ -176,8 +207,9 @@ TEST(test_intra_process_manager, remove_publisher_before_trying_to_take) {
auto ipm_msg = std::make_shared<rcl_interfaces::msg::IntraProcessMessage>(); auto ipm_msg = std::make_shared<rcl_interfaces::msg::IntraProcessMessage>();
ipm_msg->message_sequence = 42; ipm_msg->message_sequence = 42;
ipm_msg->publisher_id = 42; ipm_msg->publisher_id = 42;
rcl_interfaces::msg::IntraProcessMessage::UniquePtr unique_msg; rcl_interfaces::msg::IntraProcessMessage::UniquePtr unique_msg(
unique_msg.reset(new rcl_interfaces::msg::IntraProcessMessage(*ipm_msg)); new rcl_interfaces::msg::IntraProcessMessage(*ipm_msg)
);
auto p1_m1_id = ipm.store_intra_process_message(p1_id, unique_msg); auto p1_m1_id = ipm.store_intra_process_message(p1_id, unique_msg);
ASSERT_EQ(nullptr, unique_msg); ASSERT_EQ(nullptr, unique_msg);
@ -197,7 +229,7 @@ Tests whether or not removed subscriptions affect take behavior.
- Take with the final subscription, should work. - Take with the final subscription, should work.
- Assert the previous take returned ownership of the original object published. - Assert the previous take returned ownership of the original object published.
*/ */
TEST(test_intra_process_manager, removed_subscription_affects_take) { TEST(TestIntraProcessManager, removed_subscription_affects_take) {
rclcpp::intra_process_manager::IntraProcessManager ipm; rclcpp::intra_process_manager::IntraProcessManager ipm;
auto p1 = std::make_shared<rclcpp::publisher::mock::Publisher>(); auto p1 = std::make_shared<rclcpp::publisher::mock::Publisher>();
@ -224,8 +256,9 @@ TEST(test_intra_process_manager, removed_subscription_affects_take) {
auto ipm_msg = std::make_shared<rcl_interfaces::msg::IntraProcessMessage>(); auto ipm_msg = std::make_shared<rcl_interfaces::msg::IntraProcessMessage>();
ipm_msg->message_sequence = 42; ipm_msg->message_sequence = 42;
ipm_msg->publisher_id = 42; ipm_msg->publisher_id = 42;
rcl_interfaces::msg::IntraProcessMessage::UniquePtr unique_msg; rcl_interfaces::msg::IntraProcessMessage::UniquePtr unique_msg(
unique_msg.reset(new rcl_interfaces::msg::IntraProcessMessage(*ipm_msg)); new rcl_interfaces::msg::IntraProcessMessage(*ipm_msg)
);
auto original_message_pointer = unique_msg.get(); auto original_message_pointer = unique_msg.get();
auto p1_m1_id = ipm.store_intra_process_message(p1_id, unique_msg); auto p1_m1_id = ipm.store_intra_process_message(p1_id, unique_msg);
@ -264,7 +297,7 @@ This tests normal operation with multiple subscriptions and one publisher.
- Publish a message. - Publish a message.
- Take with each subscription, checking that the last takes the original back. - Take with each subscription, checking that the last takes the original back.
*/ */
TEST(test_intra_process_manager, multiple_subscriptions_one_publisher) { TEST(TestIntraProcessManager, multiple_subscriptions_one_publisher) {
rclcpp::intra_process_manager::IntraProcessManager ipm; rclcpp::intra_process_manager::IntraProcessManager ipm;
auto p1 = std::make_shared<rclcpp::publisher::mock::Publisher>(); auto p1 = std::make_shared<rclcpp::publisher::mock::Publisher>();
@ -291,8 +324,9 @@ TEST(test_intra_process_manager, multiple_subscriptions_one_publisher) {
auto ipm_msg = std::make_shared<rcl_interfaces::msg::IntraProcessMessage>(); auto ipm_msg = std::make_shared<rcl_interfaces::msg::IntraProcessMessage>();
ipm_msg->message_sequence = 42; ipm_msg->message_sequence = 42;
ipm_msg->publisher_id = 42; ipm_msg->publisher_id = 42;
rcl_interfaces::msg::IntraProcessMessage::UniquePtr unique_msg; rcl_interfaces::msg::IntraProcessMessage::UniquePtr unique_msg(
unique_msg.reset(new rcl_interfaces::msg::IntraProcessMessage(*ipm_msg)); new rcl_interfaces::msg::IntraProcessMessage(*ipm_msg)
);
auto original_message_pointer = unique_msg.get(); auto original_message_pointer = unique_msg.get();
auto p1_m1_id = ipm.store_intra_process_message(p1_id, unique_msg); auto p1_m1_id = ipm.store_intra_process_message(p1_id, unique_msg);
@ -332,7 +366,7 @@ This tests normal operation with multiple publishers and one subscription.
- Publish a message. - Publish a message.
- Take with each subscription, checking that the last takes the original back. - Take with each subscription, checking that the last takes the original back.
*/ */
TEST(test_intra_process_manager, multiple_publishers_one_subscription) { TEST(TestIntraProcessManager, multiple_publishers_one_subscription) {
rclcpp::intra_process_manager::IntraProcessManager ipm; rclcpp::intra_process_manager::IntraProcessManager ipm;
auto p1 = std::make_shared<rclcpp::publisher::mock::Publisher>(); auto p1 = std::make_shared<rclcpp::publisher::mock::Publisher>();
@ -360,8 +394,9 @@ TEST(test_intra_process_manager, multiple_publishers_one_subscription) {
// First publish // First publish
ipm_msg->message_sequence = 42; ipm_msg->message_sequence = 42;
ipm_msg->publisher_id = 42; ipm_msg->publisher_id = 42;
rcl_interfaces::msg::IntraProcessMessage::UniquePtr unique_msg; rcl_interfaces::msg::IntraProcessMessage::UniquePtr unique_msg(
unique_msg.reset(new rcl_interfaces::msg::IntraProcessMessage(*ipm_msg)); new rcl_interfaces::msg::IntraProcessMessage(*ipm_msg)
);
auto original_message_pointer1 = unique_msg.get(); auto original_message_pointer1 = unique_msg.get();
auto p1_m1_id = ipm.store_intra_process_message(p1_id, unique_msg); auto p1_m1_id = ipm.store_intra_process_message(p1_id, unique_msg);
@ -422,7 +457,7 @@ This tests normal operation with multiple publishers and subscriptions.
- Publish a message on each publisher. - Publish a message on each publisher.
- Take from each publisher with each subscription, checking the pointer. - Take from each publisher with each subscription, checking the pointer.
*/ */
TEST(test_intra_process_manager, multiple_publishers_multiple_subscription) { TEST(TestIntraProcessManager, multiple_publishers_multiple_subscription) {
rclcpp::intra_process_manager::IntraProcessManager ipm; rclcpp::intra_process_manager::IntraProcessManager ipm;
auto p1 = std::make_shared<rclcpp::publisher::mock::Publisher>(); auto p1 = std::make_shared<rclcpp::publisher::mock::Publisher>();
@ -460,8 +495,9 @@ TEST(test_intra_process_manager, multiple_publishers_multiple_subscription) {
// First publish // First publish
ipm_msg->message_sequence = 42; ipm_msg->message_sequence = 42;
ipm_msg->publisher_id = 42; ipm_msg->publisher_id = 42;
rcl_interfaces::msg::IntraProcessMessage::UniquePtr unique_msg; rcl_interfaces::msg::IntraProcessMessage::UniquePtr unique_msg(
unique_msg.reset(new rcl_interfaces::msg::IntraProcessMessage(*ipm_msg)); new rcl_interfaces::msg::IntraProcessMessage(*ipm_msg)
);
auto original_message_pointer1 = unique_msg.get(); auto original_message_pointer1 = unique_msg.get();
auto p1_m1_id = ipm.store_intra_process_message(p1_id, unique_msg); auto p1_m1_id = ipm.store_intra_process_message(p1_id, unique_msg);
@ -579,7 +615,7 @@ Tests displacing a message from the ring buffer before take is called.
- Publish a message. - Publish a message.
- Try to take the first message, should fail. - Try to take the first message, should fail.
*/ */
TEST(test_intra_process_manager, ring_buffer_displacement) { TEST(TestIntraProcessManager, ring_buffer_displacement) {
rclcpp::intra_process_manager::IntraProcessManager ipm; rclcpp::intra_process_manager::IntraProcessManager ipm;
auto p1 = std::make_shared<rclcpp::publisher::mock::Publisher>(); auto p1 = std::make_shared<rclcpp::publisher::mock::Publisher>();
@ -596,8 +632,9 @@ TEST(test_intra_process_manager, ring_buffer_displacement) {
auto ipm_msg = std::make_shared<rcl_interfaces::msg::IntraProcessMessage>(); auto ipm_msg = std::make_shared<rcl_interfaces::msg::IntraProcessMessage>();
ipm_msg->message_sequence = 42; ipm_msg->message_sequence = 42;
ipm_msg->publisher_id = 42; ipm_msg->publisher_id = 42;
rcl_interfaces::msg::IntraProcessMessage::UniquePtr unique_msg; rcl_interfaces::msg::IntraProcessMessage::UniquePtr unique_msg(
unique_msg.reset(new rcl_interfaces::msg::IntraProcessMessage(*ipm_msg)); new rcl_interfaces::msg::IntraProcessMessage(*ipm_msg)
);
auto original_message_pointer1 = unique_msg.get(); auto original_message_pointer1 = unique_msg.get();
auto p1_m1_id = ipm.store_intra_process_message(p1_id, unique_msg); auto p1_m1_id = ipm.store_intra_process_message(p1_id, unique_msg);
@ -646,7 +683,7 @@ Simulates race condition where a subscription is created after publish.
- Create a subscription on the same topic. - Create a subscription on the same topic.
- Try to take the message with the newly created subscription, should fail. - Try to take the message with the newly created subscription, should fail.
*/ */
TEST(test_intra_process_manager, subscription_creation_race_condition) { TEST(TestIntraProcessManager, subscription_creation_race_condition) {
rclcpp::intra_process_manager::IntraProcessManager ipm; rclcpp::intra_process_manager::IntraProcessManager ipm;
auto p1 = std::make_shared<rclcpp::publisher::mock::Publisher>(); auto p1 = std::make_shared<rclcpp::publisher::mock::Publisher>();
@ -658,8 +695,9 @@ TEST(test_intra_process_manager, subscription_creation_race_condition) {
auto ipm_msg = std::make_shared<rcl_interfaces::msg::IntraProcessMessage>(); auto ipm_msg = std::make_shared<rcl_interfaces::msg::IntraProcessMessage>();
ipm_msg->message_sequence = 42; ipm_msg->message_sequence = 42;
ipm_msg->publisher_id = 42; ipm_msg->publisher_id = 42;
rcl_interfaces::msg::IntraProcessMessage::UniquePtr unique_msg; rcl_interfaces::msg::IntraProcessMessage::UniquePtr unique_msg(
unique_msg.reset(new rcl_interfaces::msg::IntraProcessMessage(*ipm_msg)); new rcl_interfaces::msg::IntraProcessMessage(*ipm_msg)
);
auto p1_m1_id = ipm.store_intra_process_message(p1_id, unique_msg); auto p1_m1_id = ipm.store_intra_process_message(p1_id, unique_msg);
ASSERT_EQ(nullptr, unique_msg); ASSERT_EQ(nullptr, unique_msg);
@ -682,7 +720,7 @@ Simulates race condition where a publisher goes out of scope before take.
- Let the scope expire. - Let the scope expire.
- Try to take the message with the subscription, should fail. - Try to take the message with the subscription, should fail.
*/ */
TEST(test_intra_process_manager, publisher_out_of_scope_take) { TEST(TestIntraProcessManager, publisher_out_of_scope_take) {
rclcpp::intra_process_manager::IntraProcessManager ipm; rclcpp::intra_process_manager::IntraProcessManager ipm;
auto s1 = std::make_shared<rclcpp::subscription::mock::SubscriptionBase>(); auto s1 = std::make_shared<rclcpp::subscription::mock::SubscriptionBase>();
@ -703,8 +741,9 @@ TEST(test_intra_process_manager, publisher_out_of_scope_take) {
auto ipm_msg = std::make_shared<rcl_interfaces::msg::IntraProcessMessage>(); auto ipm_msg = std::make_shared<rcl_interfaces::msg::IntraProcessMessage>();
ipm_msg->message_sequence = 42; ipm_msg->message_sequence = 42;
ipm_msg->publisher_id = 42; ipm_msg->publisher_id = 42;
rcl_interfaces::msg::IntraProcessMessage::UniquePtr unique_msg; rcl_interfaces::msg::IntraProcessMessage::UniquePtr unique_msg(
unique_msg.reset(new rcl_interfaces::msg::IntraProcessMessage(*ipm_msg)); new rcl_interfaces::msg::IntraProcessMessage(*ipm_msg)
);
p1_m1_id = ipm.store_intra_process_message(p1_id, unique_msg); p1_m1_id = ipm.store_intra_process_message(p1_id, unique_msg);
ASSERT_EQ(nullptr, unique_msg); ASSERT_EQ(nullptr, unique_msg);
@ -725,7 +764,7 @@ Simulates race condition where a publisher goes out of scope before store.
- Let the scope expire. - Let the scope expire.
- Publish a message on the publisher in a scope, should throw. - Publish a message on the publisher in a scope, should throw.
*/ */
TEST(test_intra_process_manager, publisher_out_of_scope_store) { TEST(TestIntraProcessManager, publisher_out_of_scope_store) {
rclcpp::intra_process_manager::IntraProcessManager ipm; rclcpp::intra_process_manager::IntraProcessManager ipm;
uint64_t p1_id; uint64_t p1_id;
@ -740,8 +779,9 @@ TEST(test_intra_process_manager, publisher_out_of_scope_store) {
auto ipm_msg = std::make_shared<rcl_interfaces::msg::IntraProcessMessage>(); auto ipm_msg = std::make_shared<rcl_interfaces::msg::IntraProcessMessage>();
ipm_msg->message_sequence = 42; ipm_msg->message_sequence = 42;
ipm_msg->publisher_id = 42; ipm_msg->publisher_id = 42;
rcl_interfaces::msg::IntraProcessMessage::UniquePtr unique_msg; rcl_interfaces::msg::IntraProcessMessage::UniquePtr unique_msg(
unique_msg.reset(new rcl_interfaces::msg::IntraProcessMessage(*ipm_msg)); new rcl_interfaces::msg::IntraProcessMessage(*ipm_msg)
);
EXPECT_THROW(ipm.store_intra_process_message(p1_id, unique_msg), std::runtime_error); EXPECT_THROW(ipm.store_intra_process_message(p1_id, unique_msg), std::runtime_error);
ASSERT_EQ(nullptr, unique_msg); ASSERT_EQ(nullptr, unique_msg);

View file

@ -19,7 +19,7 @@
/* /*
Tests get_copy and pop on an empty mrb. Tests get_copy and pop on an empty mrb.
*/ */
TEST(test_mapped_ring_buffer, empty) { TEST(TestMappedRingBuffer, empty) {
// Cannot create a buffer of size zero. // Cannot create a buffer of size zero.
EXPECT_THROW(rclcpp::mapped_ring_buffer::MappedRingBuffer<char> mrb(0), std::invalid_argument); EXPECT_THROW(rclcpp::mapped_ring_buffer::MappedRingBuffer<char> mrb(0), std::invalid_argument);
// Getting or popping an empty buffer should result in a nullptr. // Getting or popping an empty buffer should result in a nullptr.
@ -36,7 +36,7 @@ TEST(test_mapped_ring_buffer, empty) {
/* /*
Tests push_and_replace with a temporary object. Tests push_and_replace with a temporary object.
*/ */
TEST(test_mapped_ring_buffer, temporary_l_value) { TEST(TestMappedRingBuffer, temporary_l_value) {
rclcpp::mapped_ring_buffer::MappedRingBuffer<char> mrb(2); rclcpp::mapped_ring_buffer::MappedRingBuffer<char> mrb(2);
// Pass in value with temporary object // Pass in value with temporary object
mrb.push_and_replace(1, std::unique_ptr<char>(new char('a'))); mrb.push_and_replace(1, std::unique_ptr<char>(new char('a')));
@ -55,7 +55,7 @@ TEST(test_mapped_ring_buffer, temporary_l_value) {
/* /*
Tests normal usage of the mrb. Tests normal usage of the mrb.
*/ */
TEST(test_mapped_ring_buffer, nominal) { TEST(TestMappedRingBuffer, nominal) {
rclcpp::mapped_ring_buffer::MappedRingBuffer<char> mrb(2); rclcpp::mapped_ring_buffer::MappedRingBuffer<char> mrb(2);
std::unique_ptr<char> expected(new char('a')); std::unique_ptr<char> expected(new char('a'));
// Store expected value's address for later comparison. // Store expected value's address for later comparison.
@ -66,12 +66,16 @@ TEST(test_mapped_ring_buffer, nominal) {
std::unique_ptr<char> actual; std::unique_ptr<char> actual;
mrb.get_copy_at_key(1, actual); mrb.get_copy_at_key(1, actual);
EXPECT_NE(nullptr, actual); EXPECT_NE(nullptr, actual);
if (actual) EXPECT_EQ('a', *actual); if (actual) {
EXPECT_EQ('a', *actual);
}
EXPECT_NE(expected_orig, actual.get()); EXPECT_NE(expected_orig, actual.get());
mrb.pop_at_key(1, actual); mrb.pop_at_key(1, actual);
EXPECT_NE(nullptr, actual); EXPECT_NE(nullptr, actual);
if (actual) EXPECT_EQ('a', *actual); if (actual) {
EXPECT_EQ('a', *actual);
}
EXPECT_EQ(expected_orig, actual.get()); EXPECT_EQ(expected_orig, actual.get());
mrb.get_copy_at_key(1, actual); mrb.get_copy_at_key(1, actual);
@ -91,17 +95,21 @@ TEST(test_mapped_ring_buffer, nominal) {
mrb.get_copy_at_key(2, actual); mrb.get_copy_at_key(2, actual);
EXPECT_NE(nullptr, actual); EXPECT_NE(nullptr, actual);
if (actual) EXPECT_EQ('b', *actual); if (actual) {
EXPECT_EQ('b', *actual);
}
mrb.get_copy_at_key(3, actual); mrb.get_copy_at_key(3, actual);
EXPECT_NE(nullptr, actual); EXPECT_NE(nullptr, actual);
if (actual) EXPECT_EQ('c', *actual); if (actual) {
EXPECT_EQ('c', *actual);
}
} }
/* /*
Tests get_ownership on a normal mrb. Tests get_ownership on a normal mrb.
*/ */
TEST(test_mapped_ring_buffer, get_ownership) { TEST(TestMappedRingBuffer, get_ownership) {
rclcpp::mapped_ring_buffer::MappedRingBuffer<char> mrb(2); rclcpp::mapped_ring_buffer::MappedRingBuffer<char> mrb(2);
std::unique_ptr<char> expected(new char('a')); std::unique_ptr<char> expected(new char('a'));
// Store expected value's address for later comparison. // Store expected value's address for later comparison.
@ -112,17 +120,23 @@ TEST(test_mapped_ring_buffer, get_ownership) {
std::unique_ptr<char> actual; std::unique_ptr<char> actual;
mrb.get_copy_at_key(1, actual); mrb.get_copy_at_key(1, actual);
EXPECT_NE(nullptr, actual); EXPECT_NE(nullptr, actual);
if (actual) EXPECT_EQ('a', *actual); if (actual) {
EXPECT_EQ('a', *actual);
}
EXPECT_NE(expected_orig, actual.get()); EXPECT_NE(expected_orig, actual.get());
mrb.get_ownership_at_key(1, actual); mrb.get_ownership_at_key(1, actual);
EXPECT_NE(nullptr, actual); EXPECT_NE(nullptr, actual);
if (actual) EXPECT_EQ('a', *actual); if (actual) {
EXPECT_EQ('a', *actual);
}
EXPECT_EQ(expected_orig, actual.get()); EXPECT_EQ(expected_orig, actual.get());
mrb.pop_at_key(1, actual); mrb.pop_at_key(1, actual);
EXPECT_NE(nullptr, actual); EXPECT_NE(nullptr, actual);
if (actual) EXPECT_EQ('a', *actual); // The value should be the same. if (actual) {
EXPECT_EQ('a', *actual); // The value should be the same.
}
EXPECT_NE(expected_orig, actual.get()); // Even though we pop'ed, we didn't get the original. EXPECT_NE(expected_orig, actual.get()); // Even though we pop'ed, we didn't get the original.
mrb.get_copy_at_key(1, actual); mrb.get_copy_at_key(1, actual);
@ -132,7 +146,7 @@ TEST(test_mapped_ring_buffer, get_ownership) {
/* /*
Tests the affect of reusing keys (non-unique keys) in a mrb. Tests the affect of reusing keys (non-unique keys) in a mrb.
*/ */
TEST(test_mapped_ring_buffer, non_unique_keys) { TEST(TestMappedRingBuffer, non_unique_keys) {
rclcpp::mapped_ring_buffer::MappedRingBuffer<char> mrb(2); rclcpp::mapped_ring_buffer::MappedRingBuffer<char> mrb(2);
std::unique_ptr<char> input(new char('a')); std::unique_ptr<char> input(new char('a'));
@ -145,10 +159,14 @@ TEST(test_mapped_ring_buffer, non_unique_keys) {
std::unique_ptr<char> actual; std::unique_ptr<char> actual;
mrb.pop_at_key(1, actual); mrb.pop_at_key(1, actual);
EXPECT_NE(nullptr, actual); EXPECT_NE(nullptr, actual);
if (actual) EXPECT_EQ('a', *actual); if (actual) {
EXPECT_EQ('a', *actual);
}
actual = nullptr; actual = nullptr;
mrb.pop_at_key(1, actual); mrb.pop_at_key(1, actual);
EXPECT_NE(nullptr, actual); EXPECT_NE(nullptr, actual);
if (actual) EXPECT_EQ('b', *actual); if (actual) {
EXPECT_EQ('b', *actual);
}
} }