Added count matching api and intra-process subscriber count (#628)

* Added count matching api to publishers. Also, internal method to count intra-process subscriptions

Signed-off-by: ivanpauno <ivanpauno@ekumenlabs.com>

* Addressed PR comments

Signed-off-by: ivanpauno <ivanpauno@ekumenlabs.com>

* Corrected error checking in publisher interprocess subscription count api. Minimal modifications in test

Signed-off-by: ivanpauno <ivanpauno@ekumenlabs.com>

* Moved intraprocess subscription count api to public. Started removing publishers and subscribers from ipm.

Signed-off-by: ivanpauno <ivanpauno@ekumenlabs.com>

* Added publisher count api in subscription class

Signed-off-by: ivanpauno <ivanpauno@ekumenlabs.com>

* Addressed PR comments

Signed-off-by: ivanpauno <ivanpauno@ekumenlabs.com>

* Addressed PR comments

Signed-off-by: ivanpauno <ivanpauno@ekumenlabs.com>

* Solved Wreorder

Signed-off-by: ivanpauno <ivanpauno@ekumenlabs.com>
This commit is contained in:
ivanpauno 2019-02-21 14:18:06 -03:00 committed by GitHub
parent ef5f3d3fc1
commit 8743bcb0a1
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
12 changed files with 508 additions and 5 deletions

View file

@ -248,6 +248,16 @@ if(BUILD_TESTING)
)
target_link_libraries(test_publisher ${PROJECT_NAME})
endif()
ament_add_gtest(test_publisher_subscription_count_api test/test_publisher_subscription_count_api.cpp)
if(TARGET test_publisher_subscription_count_api)
target_include_directories(test_publisher_subscription_count_api PUBLIC
${rcl_interfaces_INCLUDE_DIRS}
${rmw_INCLUDE_DIRS}
${rosidl_generator_cpp_INCLUDE_DIRS}
${rosidl_typesupport_cpp_INCLUDE_DIRS}
)
target_link_libraries(test_publisher_subscription_count_api ${PROJECT_NAME})
endif()
ament_add_gtest(test_rate test/test_rate.cpp)
if(TARGET test_rate)
target_include_directories(test_rate PUBLIC
@ -290,6 +300,16 @@ if(BUILD_TESTING)
)
target_link_libraries(test_subscription ${PROJECT_NAME})
endif()
ament_add_gtest(test_subscription_publisher_count_api test/test_subscription_publisher_count_api.cpp)
if(TARGET test_subscription_publisher_count_api)
target_include_directories(test_subscription_publisher_count_api PUBLIC
${rcl_interfaces_INCLUDE_DIRS}
${rmw_INCLUDE_DIRS}
${rosidl_generator_cpp_INCLUDE_DIRS}
${rosidl_typesupport_cpp_INCLUDE_DIRS}
)
target_link_libraries(test_subscription_publisher_count_api ${PROJECT_NAME})
endif()
find_package(test_msgs REQUIRED)
ament_add_gtest(test_subscription_traits test/test_subscription_traits.cpp)
if(TARGET test_subscription_traits)

View file

@ -346,6 +346,11 @@ public:
bool
matches_any_publishers(const rmw_gid_t * id) const;
/// Return the number of intraprocess subscriptions to a topic, given the publisher id.
RCLCPP_PUBLIC
size_t
get_subscription_count(uint64_t intra_process_publisher_id) const;
private:
RCLCPP_PUBLIC
static uint64_t

View file

@ -80,6 +80,9 @@ public:
virtual bool
matches_any_publishers(const rmw_gid_t * id) const = 0;
virtual size_t
get_subscription_count(uint64_t intra_process_publisher_id) const = 0;
private:
RCLCPP_DISABLE_COPY(IntraProcessManagerImplBase)
};
@ -248,6 +251,26 @@ public:
return false;
}
size_t
get_subscription_count(uint64_t intra_process_publisher_id) const
{
auto publisher_it = publishers_.find(intra_process_publisher_id);
if (publisher_it == publishers_.end()) {
// Publisher is either invalid or no longer exists.
return 0;
}
auto publisher = publisher_it->second.publisher.lock();
if (!publisher) {
throw std::runtime_error("publisher has unexpectedly gone out of scope");
}
auto sub_map_it = subscription_ids_by_topic_.find(publisher->get_topic_name());
if (sub_map_it == subscription_ids_by_topic_.end()) {
// No intraprocess subscribers
return 0;
}
return sub_map_it->second.size();
}
private:
RCLCPP_DISABLE_COPY(IntraProcessManagerImpl)

View file

@ -45,6 +45,15 @@ namespace node_interfaces
class NodeTopicsInterface;
}
namespace intra_process_manager
{
/**
* NOTE(ivanpauno): IntraProcessManager is forward declared here, avoiding a circular inclusion between intra_process_manager.hpp and publisher.hpp.
* SharedPtr and WeakPtr of the IntraProcessManager are defined again here, to avoid a warning for accessing a member of a forward declared class.
*/
class IntraProcessManager;
}
class PublisherBase
{
friend ::rclcpp::node_interfaces::NodeTopicsInterface;
@ -107,6 +116,18 @@ public:
const rcl_publisher_t *
get_publisher_handle() const;
/// Get subscription count
/** \return The number of subscriptions. */
RCLCPP_PUBLIC
size_t
get_subscription_count() const;
/// Get intraprocess subscription count
/** \return The number of intraprocess subscriptions. */
RCLCPP_PUBLIC
size_t
get_intra_process_subscription_count() const;
/// Compare this publisher to a gid.
/**
* Note that this function calls the next function.
@ -128,13 +149,16 @@ public:
operator==(const rmw_gid_t * gid) const;
using StoreMessageCallbackT = std::function<uint64_t(uint64_t, void *, const std::type_info &)>;
using IntraProcessManagerSharedPtr =
std::shared_ptr<rclcpp::intra_process_manager::IntraProcessManager>;
/// Implementation utility function used to setup intra process publishing after creation.
RCLCPP_PUBLIC
void
setup_intra_process(
uint64_t intra_process_publisher_id,
StoreMessageCallbackT callback,
StoreMessageCallbackT store_callback,
IntraProcessManagerSharedPtr ipm,
const rcl_publisher_options_t & intra_process_options);
protected:
@ -143,6 +167,10 @@ protected:
rcl_publisher_t publisher_handle_ = rcl_get_zero_initialized_publisher();
rcl_publisher_t intra_process_publisher_handle_ = rcl_get_zero_initialized_publisher();
using IntraProcessManagerWeakPtr =
std::weak_ptr<rclcpp::intra_process_manager::IntraProcessManager>;
bool use_intra_process_;
IntraProcessManagerWeakPtr weak_ipm_;
uint64_t intra_process_publisher_id_;
StoreMessageCallbackT store_intra_process_message_;

View file

@ -46,6 +46,15 @@ namespace node_interfaces
class NodeTopicsInterface;
} // namespace node_interfaces
namespace intra_process_manager
{
/**
* NOTE(ivanpauno): IntraProcessManager is forward declared here, avoiding a circular inclusion between intra_process_manager.hpp and publisher.hpp.
* SharedPtr and WeakPtr of the IntraProcessManager are defined again here, to avoid a warning for accessing a member of a forward declared class.
*/
class IntraProcessManager;
}
/// Virtual base class for subscriptions. This pattern allows us to iterate over different template
/// specializations of Subscription, among other things.
class SubscriptionBase
@ -129,11 +138,23 @@ public:
bool
is_serialized() const;
/// Get matching publisher count
/** \return The number of publishers on this topic. */
RCLCPP_PUBLIC
size_t
get_publisher_count() const;
protected:
std::shared_ptr<rcl_subscription_t> intra_process_subscription_handle_;
std::shared_ptr<rcl_subscription_t> subscription_handle_;
std::shared_ptr<rcl_node_t> node_handle_;
using IntraProcessManagerWeakPtr =
std::weak_ptr<rclcpp::intra_process_manager::IntraProcessManager>;
bool use_intra_process_;
IntraProcessManagerWeakPtr weak_ipm_;
uint64_t intra_process_subscription_id_;
private:
RCLCPP_DISABLE_COPY(SubscriptionBase)
@ -272,10 +293,13 @@ public:
using MatchesAnyPublishersCallbackType = std::function<bool (const rmw_gid_t *)>;
/// Implemenation detail.
// TODO(ivanpauno): This can be moved to the base class. No reason to be here.
// Also get_intra_process_message_callback_ and matches_any_intra_process_publishers_.
void setup_intra_process(
uint64_t intra_process_subscription_id,
GetMessageCallbackType get_message_callback,
MatchesAnyPublishersCallbackType matches_any_publisher_callback,
IntraProcessManagerWeakPtr weak_ipm,
const rcl_subscription_options_t & intra_process_options)
{
std::string intra_process_topic_name = std::string(get_topic_name()) + "/_intra";
@ -302,6 +326,8 @@ public:
intra_process_subscription_id_ = intra_process_subscription_id;
get_intra_process_message_callback_ = get_message_callback;
matches_any_intra_process_publishers_ = matches_any_publisher_callback;
weak_ipm_ = weak_ipm;
use_intra_process_ = true;
}
/// Implemenation detail.
@ -323,7 +349,6 @@ private:
GetMessageCallbackType get_intra_process_message_callback_;
MatchesAnyPublishersCallbackType matches_any_intra_process_publishers_;
uint64_t intra_process_subscription_id_;
};
} // namespace rclcpp

View file

@ -166,6 +166,7 @@ create_subscription_factory(
intra_process_subscription_id,
take_intra_process_message_func,
matches_any_publisher_func,
weak_ipm,
intra_process_options
);
};

View file

@ -56,6 +56,12 @@ IntraProcessManager::matches_any_publishers(const rmw_gid_t * id) const
return impl_->matches_any_publishers(id);
}
size_t
IntraProcessManager::get_subscription_count(uint64_t intra_process_publisher_id) const
{
return impl_->get_subscription_count(intra_process_publisher_id);
}
uint64_t
IntraProcessManager::get_next_unique_id()
{

View file

@ -54,6 +54,7 @@ NodeTopics::create_publisher(
publisher->setup_intra_process(
intra_process_publisher_id,
shared_publish_callback,
ipm,
publisher_options);
}

View file

@ -30,6 +30,7 @@
#include "rclcpp/allocator/allocator_common.hpp"
#include "rclcpp/allocator/allocator_deleter.hpp"
#include "rclcpp/exceptions.hpp"
#include "rclcpp/intra_process_manager.hpp"
#include "rclcpp/macros.hpp"
#include "rclcpp/node.hpp"
#include "rclcpp/expand_topic_or_service_name.hpp"
@ -42,7 +43,8 @@ PublisherBase::PublisherBase(
const rosidl_message_type_support_t & type_support,
const rcl_publisher_options_t & publisher_options)
: rcl_node_handle_(node_base->get_shared_rcl_node_handle()),
intra_process_publisher_id_(0), store_intra_process_message_(nullptr)
use_intra_process_(false), intra_process_publisher_id_(0),
store_intra_process_message_(nullptr)
{
rcl_ret_t ret = rcl_publisher_init(
&publisher_handle_,
@ -94,6 +96,20 @@ PublisherBase::~PublisherBase()
rcl_get_error_string().str);
rcl_reset_error();
}
auto ipm = weak_ipm_.lock();
if (!use_intra_process_) {
return;
}
if (!ipm) {
// TODO(ivanpauno): should this raise an error?
RCLCPP_WARN(
rclcpp::get_logger("rclcpp"),
"Intra process manager died before than a publisher.");
return;
}
ipm->remove_publisher(intra_process_publisher_id_);
}
const char *
@ -138,6 +154,49 @@ PublisherBase::get_publisher_handle() const
return &publisher_handle_;
}
size_t
PublisherBase::get_subscription_count() const
{
size_t inter_process_subscription_count = 0;
rmw_ret_t status = rcl_publisher_get_subscription_count(
&publisher_handle_,
&inter_process_subscription_count);
if (RCL_RET_PUBLISHER_INVALID == status) {
rcl_reset_error(); /* next call will reset error message if not context */
if (rcl_publisher_is_valid_except_context(&publisher_handle_)) {
rcl_context_t * context = rcl_publisher_get_context(&publisher_handle_);
if (nullptr != context && !rcl_context_is_valid(context)) {
/* publisher is invalid due to context being shutdown */
return 0;
}
}
}
if (RCL_RET_OK != status) {
rclcpp::exceptions::throw_from_rcl_error(status, "failed to get get subscription count");
}
return inter_process_subscription_count;
}
size_t
PublisherBase::get_intra_process_subscription_count() const
{
auto ipm = weak_ipm_.lock();
if (!use_intra_process_) {
return 0;
}
if (!ipm) {
// TODO(ivanpauno): should this just return silently? Or maybe return with a warning?
// Same as wjwwood comment in publisher_factory create_shared_publish_callback.
// If we don't raise an error here, use_intra_process_ flag is unnecessary.
throw std::runtime_error(
"intra process subscriber count called after "
"destruction of intra process manager");
}
return ipm->get_subscription_count(intra_process_publisher_id_);
}
bool
PublisherBase::operator==(const rmw_gid_t & gid) const
{
@ -168,7 +227,8 @@ PublisherBase::operator==(const rmw_gid_t * gid) const
void
PublisherBase::setup_intra_process(
uint64_t intra_process_publisher_id,
StoreMessageCallbackT callback,
StoreMessageCallbackT store_callback,
IntraProcessManagerSharedPtr ipm,
const rcl_publisher_options_t & intra_process_options)
{
const char * topic_name = this->get_topic_name();
@ -199,7 +259,10 @@ PublisherBase::setup_intra_process(
}
intra_process_publisher_id_ = intra_process_publisher_id;
store_intra_process_message_ = callback;
store_intra_process_message_ = store_callback;
weak_ipm_ = ipm;
use_intra_process_ = true;
// Life time of this object is tied to the publisher handle.
rmw_publisher_t * publisher_rmw_handle = rcl_publisher_get_rmw_handle(
&intra_process_publisher_handle_);

View file

@ -20,6 +20,7 @@
#include "rclcpp/exceptions.hpp"
#include "rclcpp/expand_topic_or_service_name.hpp"
#include "rclcpp/intra_process_manager.hpp"
#include "rclcpp/logging.hpp"
#include "rmw/error_handling.h"
@ -34,6 +35,8 @@ SubscriptionBase::SubscriptionBase(
const rcl_subscription_options_t & subscription_options,
bool is_serialized)
: node_handle_(node_handle),
use_intra_process_(false),
intra_process_subscription_id_(0),
type_support_(type_support_handle),
is_serialized_(is_serialized)
{
@ -80,6 +83,18 @@ SubscriptionBase::SubscriptionBase(
SubscriptionBase::~SubscriptionBase()
{
if (!use_intra_process_) {
return;
}
auto ipm = weak_ipm_.lock();
if (!ipm) {
// TODO(ivanpauno): should this raise an error?
RCLCPP_WARN(
rclcpp::get_logger("rclcpp"),
"Intra process manager died before than a subscription.");
return;
}
ipm->remove_subscription(intra_process_subscription_id_);
}
const char *
@ -117,3 +132,18 @@ SubscriptionBase::is_serialized() const
{
return is_serialized_;
}
size_t
SubscriptionBase::get_publisher_count() const
{
size_t inter_process_publisher_count = 0;
rmw_ret_t status = rcl_subscription_get_publisher_count(
subscription_handle_.get(),
&inter_process_publisher_count);
if (RCL_RET_OK != status) {
rclcpp::exceptions::throw_from_rcl_error(status, "failed to get get publisher count");
}
return inter_process_publisher_count;
}

View file

@ -0,0 +1,178 @@
// Copyright 2019 Open Source Robotics Foundation, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include <gtest/gtest.h>
#include <iostream>
#include <string>
#include <memory>
#include "rclcpp/exceptions.hpp"
#include "rclcpp/publisher.hpp"
#include "rclcpp/rclcpp.hpp"
#include "rcl_interfaces/msg/intra_process_message.hpp"
using rcl_interfaces::msg::IntraProcessMessage;
/**
* Parameterized test.
* The first param are the NodeOptions used to create the nodes.
* The second param are the expect intraprocess count results.
*/
struct TestParameters
{
rclcpp::NodeOptions node_options[2];
uint64_t intraprocess_count_results[2];
std::string description;
};
std::ostream & operator<<(std::ostream & out, const TestParameters & params)
{
out << params.description;
return out;
}
class TestPublisherSubscriptionCount : public ::testing::TestWithParam<TestParameters>
{
public:
static void SetUpTestCase()
{
rclcpp::init(0, nullptr);
}
protected:
void SetUp() {}
void TearDown() {}
static std::chrono::milliseconds offset;
};
std::chrono::milliseconds TestPublisherSubscriptionCount::offset = std::chrono::milliseconds(2000);
void OnMessage(const rcl_interfaces::msg::IntraProcessMessage::SharedPtr msg)
{
(void)msg;
}
TEST_P(TestPublisherSubscriptionCount, increasing_and_decreasing_counts)
{
TestParameters parameters = GetParam();
rclcpp::Node::SharedPtr node = std::make_shared<rclcpp::Node>(
"my_node",
"/ns",
parameters.node_options[0]);
auto publisher = node->create_publisher<IntraProcessMessage>("/topic");
EXPECT_EQ(publisher->get_subscription_count(), 0u);
EXPECT_EQ(publisher->get_intra_process_subscription_count(), 0u);
{
auto sub = node->create_subscription<IntraProcessMessage>("/topic", &OnMessage);
rclcpp::sleep_for(offset);
EXPECT_EQ(publisher->get_subscription_count(), 1u);
EXPECT_EQ(
publisher->get_intra_process_subscription_count(),
parameters.intraprocess_count_results[0]);
{
rclcpp::Node::SharedPtr another_node = std::make_shared<rclcpp::Node>(
"another_node",
"/ns",
parameters.node_options[1]);
auto another_sub =
another_node->create_subscription<IntraProcessMessage>("/topic", &OnMessage);
rclcpp::sleep_for(offset);
EXPECT_EQ(publisher->get_subscription_count(), 2u);
EXPECT_EQ(
publisher->get_intra_process_subscription_count(),
parameters.intraprocess_count_results[1]);
}
rclcpp::sleep_for(offset);
EXPECT_EQ(publisher->get_subscription_count(), 1u);
EXPECT_EQ(
publisher->get_intra_process_subscription_count(),
parameters.intraprocess_count_results[0]);
}
/**
* Counts should be zero here, as all are subscriptions are out of scope.
* Subscriptions count checking is always preceeded with an sleep, as random failures had been
* detected without it. */
rclcpp::sleep_for(offset);
EXPECT_EQ(publisher->get_subscription_count(), 0u);
EXPECT_EQ(publisher->get_intra_process_subscription_count(), 0u);
}
auto get_new_context()
{
auto context = rclcpp::Context::make_shared();
context->init(0, nullptr);
return context;
}
TestParameters parameters[] = {
/*
Testing publisher subscription count api and internal process subscription count.
Two subscriptions in the same topic, both using intraprocess comm.
*/
{
{
rclcpp::NodeOptions().use_intra_process_comms(true),
rclcpp::NodeOptions().use_intra_process_comms(true)
},
{1u, 2u},
"two_subscriptions_intraprocess_comm"
},
/*
Testing publisher subscription count api and internal process subscription count.
Two subscriptions, one using intra-process comm and the other not using it.
*/
{
{
rclcpp::NodeOptions().use_intra_process_comms(true),
rclcpp::NodeOptions().use_intra_process_comms(false)
},
{1u, 1u},
"two_subscriptions_one_intraprocess_one_not"
},
/*
Testing publisher subscription count api and internal process subscription count.
Two contexts, both using intra-process.
*/
{
{
rclcpp::NodeOptions().use_intra_process_comms(true),
rclcpp::NodeOptions().context(get_new_context()).use_intra_process_comms(true)
},
{1u, 1u},
"two_subscriptions_in_two_contexts_with_intraprocess_comm"
},
/*
Testing publisher subscription count api and internal process subscription count.
Two contexts, both of them not using intra-process comm.
*/
{
{
rclcpp::NodeOptions().use_intra_process_comms(false),
rclcpp::NodeOptions().context(get_new_context()).use_intra_process_comms(false)
},
{0u, 0u},
"two_subscriptions_in_two_contexts_without_intraprocess_comm"
}
};
INSTANTIATE_TEST_CASE_P(
TestWithDifferentNodeOptions, TestPublisherSubscriptionCount,
::testing::ValuesIn(parameters),
::testing::PrintToStringParamName());

View file

@ -0,0 +1,123 @@
// Copyright 2019 Open Source Robotics Foundation, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include <gtest/gtest.h>
#include <iostream>
#include <string>
#include <memory>
#include "rclcpp/exceptions.hpp"
#include "rclcpp/rclcpp.hpp"
#include "rcl_interfaces/msg/intra_process_message.hpp"
using rcl_interfaces::msg::IntraProcessMessage;
struct TestParameters
{
rclcpp::NodeOptions node_options;
std::string description;
};
std::ostream & operator<<(std::ostream & out, const TestParameters & params)
{
out << params.description;
return out;
}
class TestSubscriptionPublisherCount : public ::testing::TestWithParam<TestParameters>
{
public:
static void SetUpTestCase()
{
rclcpp::init(0, nullptr);
}
protected:
void SetUp() {}
void TearDown() {}
static std::chrono::milliseconds offset;
};
std::chrono::milliseconds TestSubscriptionPublisherCount::offset = std::chrono::milliseconds(2000);
void OnMessage(const rcl_interfaces::msg::IntraProcessMessage::SharedPtr msg)
{
(void)msg;
}
TEST_P(TestSubscriptionPublisherCount, increasing_and_decreasing_counts)
{
rclcpp::NodeOptions node_options = GetParam().node_options;
rclcpp::Node::SharedPtr node = std::make_shared<rclcpp::Node>(
"my_node",
"/ns",
node_options);
auto subscription = node->create_subscription<IntraProcessMessage>("/topic", &OnMessage);
EXPECT_EQ(subscription->get_publisher_count(), 0u);
{
auto pub = node->create_publisher<IntraProcessMessage>("/topic");
rclcpp::sleep_for(offset);
EXPECT_EQ(subscription->get_publisher_count(), 1u);
{
rclcpp::Node::SharedPtr another_node = std::make_shared<rclcpp::Node>(
"another_node",
"/ns",
node_options);
auto another_pub =
another_node->create_publisher<IntraProcessMessage>("/topic");
rclcpp::sleep_for(offset);
EXPECT_EQ(subscription->get_publisher_count(), 2u);
}
rclcpp::sleep_for(offset);
EXPECT_EQ(subscription->get_publisher_count(), 1u);
}
rclcpp::sleep_for(offset);
EXPECT_EQ(subscription->get_publisher_count(), 0u);
}
auto get_new_context()
{
auto context = rclcpp::Context::make_shared();
context->init(0, nullptr);
return context;
}
TestParameters parameters[] = {
/*
Testing subscription publisher count api.
One context.
*/
{
rclcpp::NodeOptions(),
"one_context_test"
},
/*
Testing subscription publisher count api.
Two contexts.
*/
{
rclcpp::NodeOptions().context(get_new_context()),
"two_contexts_test"
}
};
INSTANTIATE_TEST_CASE_P(
TestWithDifferentNodeOptions,
TestSubscriptionPublisherCount,
testing::ValuesIn(parameters),
testing::PrintToStringParamName());