Check QoS policy when configuring intraprocess, skip interprocess publish when possible (#674)

* Only setup intraprocess if 'durability' qos policy is 'volatile'.

Signed-off-by: ivanpauno <ivanpauno@ekumenlabs.com>

* Skip interprocess publish when only having intraprocess subscriptions.

Signed-off-by: ivanpauno <ivanpauno@ekumenlabs.com>

* Add intraprocess configuration option at publisher/subscription level

Signed-off-by: ivanpauno <ivanpauno@ekumenlabs.com>

* Use get_actual_qos when setting-up intraprocess. Add test.

Signed-off-by: ivanpauno <ivanpauno@ekumenlabs.com>
This commit is contained in:
ivanpauno 2019-04-03 18:03:10 -03:00 committed by GitHub
parent 8783cdcf96
commit d11a10a583
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
5 changed files with 102 additions and 21 deletions

View file

@ -64,6 +64,17 @@
namespace rclcpp namespace rclcpp
{ {
/// Used as argument in create_publisher and create_subscriber.
enum class IntraProcessSetting
{
/// Explicitly enable intraprocess comm at publisher/subscription level.
Enable,
/// Explicitly disable intraprocess comm at publisher/subscription level.
Disable,
/// Take intraprocess configuration from the node.
NodeDefault
};
/// Node is the single point of entry for creating publishers and subscribers. /// Node is the single point of entry for creating publishers and subscribers.
class Node : public std::enable_shared_from_this<Node> class Node : public std::enable_shared_from_this<Node>
{ {
@ -152,7 +163,8 @@ public:
std::shared_ptr<PublisherT> std::shared_ptr<PublisherT>
create_publisher( create_publisher(
const std::string & topic_name, size_t qos_history_depth, const std::string & topic_name, size_t qos_history_depth,
std::shared_ptr<Alloc> allocator = nullptr); std::shared_ptr<Alloc> allocator = nullptr,
IntraProcessSetting use_intra_process_comm = IntraProcessSetting::NodeDefault);
/// Create and return a Publisher. /// Create and return a Publisher.
/** /**
@ -168,7 +180,8 @@ public:
create_publisher( create_publisher(
const std::string & topic_name, const std::string & topic_name,
const rmw_qos_profile_t & qos_profile = rmw_qos_profile_default, const rmw_qos_profile_t & qos_profile = rmw_qos_profile_default,
std::shared_ptr<Alloc> allocator = nullptr); std::shared_ptr<Alloc> allocator = nullptr,
IntraProcessSetting use_intra_process_comm = IntraProcessSetting::NodeDefault);
/// Create and return a Subscription. /// Create and return a Subscription.
/** /**
@ -201,7 +214,8 @@ public:
typename rclcpp::message_memory_strategy::MessageMemoryStrategy< typename rclcpp::message_memory_strategy::MessageMemoryStrategy<
typename rclcpp::subscription_traits::has_message_type<CallbackT>::type, Alloc>::SharedPtr typename rclcpp::subscription_traits::has_message_type<CallbackT>::type, Alloc>::SharedPtr
msg_mem_strat = nullptr, msg_mem_strat = nullptr,
std::shared_ptr<Alloc> allocator = nullptr); std::shared_ptr<Alloc> allocator = nullptr,
IntraProcessSetting use_intra_process_comm = IntraProcessSetting::NodeDefault);
/// Create and return a Subscription. /// Create and return a Subscription.
/** /**
@ -234,7 +248,8 @@ public:
typename rclcpp::message_memory_strategy::MessageMemoryStrategy< typename rclcpp::message_memory_strategy::MessageMemoryStrategy<
typename rclcpp::subscription_traits::has_message_type<CallbackT>::type, Alloc>::SharedPtr typename rclcpp::subscription_traits::has_message_type<CallbackT>::type, Alloc>::SharedPtr
msg_mem_strat = nullptr, msg_mem_strat = nullptr,
std::shared_ptr<Alloc> allocator = nullptr); std::shared_ptr<Alloc> allocator = nullptr,
IntraProcessSetting use_intra_process_comm = IntraProcessSetting::NodeDefault);
/// Create a timer. /// Create a timer.
/** /**

View file

@ -55,14 +55,16 @@ template<typename MessageT, typename Alloc, typename PublisherT>
std::shared_ptr<PublisherT> std::shared_ptr<PublisherT>
Node::create_publisher( Node::create_publisher(
const std::string & topic_name, size_t qos_history_depth, const std::string & topic_name, size_t qos_history_depth,
std::shared_ptr<Alloc> allocator) std::shared_ptr<Alloc> allocator,
IntraProcessSetting use_intra_process_comm)
{ {
if (!allocator) { if (!allocator) {
allocator = std::make_shared<Alloc>(); allocator = std::make_shared<Alloc>();
} }
rmw_qos_profile_t qos = rmw_qos_profile_default; rmw_qos_profile_t qos = rmw_qos_profile_default;
qos.depth = qos_history_depth; qos.depth = qos_history_depth;
return this->create_publisher<MessageT, Alloc, PublisherT>(topic_name, qos, allocator); return this->create_publisher<MessageT, Alloc, PublisherT>(topic_name, qos,
allocator, use_intra_process_comm);
} }
RCLCPP_LOCAL RCLCPP_LOCAL
@ -81,17 +83,33 @@ template<typename MessageT, typename Alloc, typename PublisherT>
std::shared_ptr<PublisherT> std::shared_ptr<PublisherT>
Node::create_publisher( Node::create_publisher(
const std::string & topic_name, const rmw_qos_profile_t & qos_profile, const std::string & topic_name, const rmw_qos_profile_t & qos_profile,
std::shared_ptr<Alloc> allocator) std::shared_ptr<Alloc> allocator, IntraProcessSetting use_intra_process_comm)
{ {
if (!allocator) { if (!allocator) {
allocator = std::make_shared<Alloc>(); allocator = std::make_shared<Alloc>();
} }
bool use_intra_process;
switch (use_intra_process_comm) {
case IntraProcessSetting::Enable:
use_intra_process = true;
break;
case IntraProcessSetting::Disable:
use_intra_process = false;
break;
case IntraProcessSetting::NodeDefault:
use_intra_process = this->get_node_options().use_intra_process_comms();
break;
default:
throw std::runtime_error("Unrecognized IntraProcessSetting value");
break;
}
return rclcpp::create_publisher<MessageT, Alloc, PublisherT>( return rclcpp::create_publisher<MessageT, Alloc, PublisherT>(
this->node_topics_.get(), this->node_topics_.get(),
extend_name_with_sub_namespace(topic_name, this->get_sub_namespace()), extend_name_with_sub_namespace(topic_name, this->get_sub_namespace()),
qos_profile, qos_profile,
this->get_node_options().use_intra_process_comms(), use_intra_process,
allocator); allocator);
} }
@ -110,7 +128,8 @@ Node::create_subscription(
typename rclcpp::message_memory_strategy::MessageMemoryStrategy< typename rclcpp::message_memory_strategy::MessageMemoryStrategy<
typename rclcpp::subscription_traits::has_message_type<CallbackT>::type, Alloc>::SharedPtr typename rclcpp::subscription_traits::has_message_type<CallbackT>::type, Alloc>::SharedPtr
msg_mem_strat, msg_mem_strat,
std::shared_ptr<Alloc> allocator) std::shared_ptr<Alloc> allocator,
IntraProcessSetting use_intra_process_comm)
{ {
using CallbackMessageT = typename rclcpp::subscription_traits::has_message_type<CallbackT>::type; using CallbackMessageT = typename rclcpp::subscription_traits::has_message_type<CallbackT>::type;
@ -123,6 +142,22 @@ Node::create_subscription(
msg_mem_strat = MessageMemoryStrategy<CallbackMessageT, Alloc>::create_default(); msg_mem_strat = MessageMemoryStrategy<CallbackMessageT, Alloc>::create_default();
} }
bool use_intra_process;
switch (use_intra_process_comm) {
case IntraProcessSetting::Enable:
use_intra_process = true;
break;
case IntraProcessSetting::Disable:
use_intra_process = false;
break;
case IntraProcessSetting::NodeDefault:
use_intra_process = this->get_node_options().use_intra_process_comms();
break;
default:
throw std::runtime_error("Unrecognized IntraProcessSetting value");
break;
}
return rclcpp::create_subscription<MessageT, CallbackT, Alloc, CallbackMessageT, SubscriptionT>( return rclcpp::create_subscription<MessageT, CallbackT, Alloc, CallbackMessageT, SubscriptionT>(
this->node_topics_.get(), this->node_topics_.get(),
extend_name_with_sub_namespace(topic_name, this->get_sub_namespace()), extend_name_with_sub_namespace(topic_name, this->get_sub_namespace()),
@ -130,7 +165,7 @@ Node::create_subscription(
qos_profile, qos_profile,
group, group,
ignore_local_publications, ignore_local_publications,
this->get_node_options().use_intra_process_comms(), use_intra_process,
msg_mem_strat, msg_mem_strat,
allocator); allocator);
} }
@ -150,7 +185,8 @@ Node::create_subscription(
typename rclcpp::message_memory_strategy::MessageMemoryStrategy< typename rclcpp::message_memory_strategy::MessageMemoryStrategy<
typename rclcpp::subscription_traits::has_message_type<CallbackT>::type, Alloc>::SharedPtr typename rclcpp::subscription_traits::has_message_type<CallbackT>::type, Alloc>::SharedPtr
msg_mem_strat, msg_mem_strat,
std::shared_ptr<Alloc> allocator) std::shared_ptr<Alloc> allocator,
IntraProcessSetting use_intra_process_comm)
{ {
rmw_qos_profile_t qos = rmw_qos_profile_default; rmw_qos_profile_t qos = rmw_qos_profile_default;
qos.depth = qos_history_depth; qos.depth = qos_history_depth;
@ -162,7 +198,8 @@ Node::create_subscription(
group, group,
ignore_local_publications, ignore_local_publications,
msg_mem_strat, msg_mem_strat,
allocator); allocator,
use_intra_process_comm);
} }
template<typename DurationRepT, typename DurationT, typename CallbackT> template<typename DurationRepT, typename DurationT, typename CallbackT>

View file

@ -183,7 +183,7 @@ protected:
using IntraProcessManagerWeakPtr = using IntraProcessManagerWeakPtr =
std::weak_ptr<rclcpp::intra_process_manager::IntraProcessManager>; std::weak_ptr<rclcpp::intra_process_manager::IntraProcessManager>;
bool use_intra_process_; bool intra_process_is_enabled_;
IntraProcessManagerWeakPtr weak_ipm_; IntraProcessManagerWeakPtr weak_ipm_;
uint64_t intra_process_publisher_id_; uint64_t intra_process_publisher_id_;
StoreMessageCallbackT store_intra_process_message_; StoreMessageCallbackT store_intra_process_message_;
@ -230,7 +230,11 @@ public:
virtual void virtual void
publish(std::unique_ptr<MessageT, MessageDeleter> & msg) publish(std::unique_ptr<MessageT, MessageDeleter> & msg)
{ {
bool inter_process_subscriptions_exist =
get_subscription_count() > get_intra_process_subscription_count();
if (!intra_process_is_enabled_ || inter_process_subscriptions_exist) {
this->do_inter_process_publish(msg.get()); this->do_inter_process_publish(msg.get());
}
if (store_intra_process_message_) { if (store_intra_process_message_) {
// Take the pointer from the unique_msg, release it and pass as a void * // Take the pointer from the unique_msg, release it and pass as a void *
// to the ipm. The ipm should then capture it again as a unique_ptr of // to the ipm. The ipm should then capture it again as a unique_ptr of

View file

@ -43,7 +43,7 @@ PublisherBase::PublisherBase(
const rosidl_message_type_support_t & type_support, const rosidl_message_type_support_t & type_support,
const rcl_publisher_options_t & publisher_options) const rcl_publisher_options_t & publisher_options)
: rcl_node_handle_(node_base->get_shared_rcl_node_handle()), : rcl_node_handle_(node_base->get_shared_rcl_node_handle()),
use_intra_process_(false), intra_process_publisher_id_(0), intra_process_is_enabled_(false), intra_process_publisher_id_(0),
store_intra_process_message_(nullptr) store_intra_process_message_(nullptr)
{ {
rcl_ret_t ret = rcl_publisher_init( rcl_ret_t ret = rcl_publisher_init(
@ -99,7 +99,7 @@ PublisherBase::~PublisherBase()
auto ipm = weak_ipm_.lock(); auto ipm = weak_ipm_.lock();
if (!use_intra_process_) { if (!intra_process_is_enabled_) {
return; return;
} }
if (!ipm) { if (!ipm) {
@ -183,13 +183,12 @@ size_t
PublisherBase::get_intra_process_subscription_count() const PublisherBase::get_intra_process_subscription_count() const
{ {
auto ipm = weak_ipm_.lock(); auto ipm = weak_ipm_.lock();
if (!use_intra_process_) { if (!intra_process_is_enabled_) {
return 0; return 0;
} }
if (!ipm) { if (!ipm) {
// TODO(ivanpauno): should this just return silently? Or maybe return with a warning? // TODO(ivanpauno): should this just return silently? Or maybe return with a warning?
// Same as wjwwood comment in publisher_factory create_shared_publish_callback. // Same as wjwwood comment in publisher_factory create_shared_publish_callback.
// If we don't raise an error here, use_intra_process_ flag is unnecessary.
throw std::runtime_error( throw std::runtime_error(
"intra process subscriber count called after " "intra process subscriber count called after "
"destruction of intra process manager"); "destruction of intra process manager");
@ -243,6 +242,11 @@ PublisherBase::setup_intra_process(
IntraProcessManagerSharedPtr ipm, IntraProcessManagerSharedPtr ipm,
const rcl_publisher_options_t & intra_process_options) const rcl_publisher_options_t & intra_process_options)
{ {
// Intraprocess configuration is not allowed with "durability" qos policy non "volatile".
if (this->get_actual_qos().durability != RMW_QOS_POLICY_DURABILITY_VOLATILE) {
throw exceptions::InvalidParametersException(
"intraprocess communication is not allowed with durability qos policy non-volatile");
}
const char * topic_name = this->get_topic_name(); const char * topic_name = this->get_topic_name();
if (!topic_name) { if (!topic_name) {
throw std::runtime_error("failed to get topic name"); throw std::runtime_error("failed to get topic name");
@ -273,7 +277,7 @@ PublisherBase::setup_intra_process(
intra_process_publisher_id_ = intra_process_publisher_id; intra_process_publisher_id_ = intra_process_publisher_id;
store_intra_process_message_ = store_callback; store_intra_process_message_ = store_callback;
weak_ipm_ = ipm; weak_ipm_ = ipm;
use_intra_process_ = true; intra_process_is_enabled_ = true;
// Life time of this object is tied to the publisher handle. // Life time of this object is tied to the publisher handle.
rmw_publisher_t * publisher_rmw_handle = rcl_publisher_get_rmw_handle( rmw_publisher_t * publisher_rmw_handle = rcl_publisher_get_rmw_handle(

View file

@ -30,9 +30,9 @@ protected:
rclcpp::init(0, nullptr); rclcpp::init(0, nullptr);
} }
void SetUp() void initialize(const rclcpp::NodeOptions & node_options = rclcpp::NodeOptions())
{ {
node = std::make_shared<rclcpp::Node>("my_node", "/ns"); node = std::make_shared<rclcpp::Node>("my_node", "/ns", node_options);
} }
void TearDown() void TearDown()
@ -69,6 +69,7 @@ protected:
Testing publisher construction and destruction. Testing publisher construction and destruction.
*/ */
TEST_F(TestPublisher, construction_and_destruction) { TEST_F(TestPublisher, construction_and_destruction) {
initialize();
using rcl_interfaces::msg::IntraProcessMessage; using rcl_interfaces::msg::IntraProcessMessage;
{ {
auto publisher = node->create_publisher<IntraProcessMessage>("topic"); auto publisher = node->create_publisher<IntraProcessMessage>("topic");
@ -81,6 +82,26 @@ TEST_F(TestPublisher, construction_and_destruction) {
} }
} }
/*
Testing publisher with intraprocess enabled and invalid QoS
*/
TEST_F(TestPublisher, intraprocess_with_invalid_qos) {
initialize(rclcpp::NodeOptions().use_intra_process_comms(true));
rmw_qos_profile_t qos = {
RMW_QOS_POLICY_HISTORY_KEEP_LAST,
1,
RMW_QOS_POLICY_RELIABILITY_RELIABLE,
RMW_QOS_POLICY_DURABILITY_TRANSIENT_LOCAL,
false
};
using rcl_interfaces::msg::IntraProcessMessage;
{
ASSERT_THROW(
{auto publisher = node->create_publisher<IntraProcessMessage>("topic", qos);},
rclcpp::exceptions::InvalidParametersException);
}
}
/* /*
Testing publisher construction and destruction for subnodes. Testing publisher construction and destruction for subnodes.
*/ */