Integrate topic statistics (#1072)

* Add SubscriberTopicStatistics class

Signed-off-by: Devin Bonnie <dbbonnie@amazon.com>

* Add SubscriberTopicStatistics Test

Signed-off-by: Devin Bonnie <dbbonnie@amazon.com>

* Address review comments

Signed-off-by: Devin Bonnie <dbbonnie@amazon.com>

* Modify constructor to allow a node to create necessary components

Signed-off-by: Devin Bonnie <dbbonnie@amazon.com>

* Fix docstring style

Signed-off-by: Devin Bonnie <dbbonnie@amazon.com>

* Remove SetPublisherTimer method

Signed-off-by: Devin Bonnie <dbbonnie@amazon.com>

* Change naming style to match rclcpp

Signed-off-by: Devin Bonnie <dbbonnie@amazon.com>

* Address style issues

Signed-off-by: Devin Bonnie <dbbonnie@amazon.com>

* Use rclcpp:Time

Signed-off-by: Devin Bonnie <dbbonnie@amazon.com>

* Address review comments

Signed-off-by: Devin Bonnie <dbbonnie@amazon.com>

* Remove unnecessary check for null publisher timer
Move anonymous namespace function to private class method

Signed-off-by: Devin Bonnie <dbbonnie@amazon.com>

* Update message dependency

Signed-off-by: Devin Bonnie <dbbonnie@amazon.com>

* Initial integration of Subscriber Topic Statistics

Signed-off-by: Devin Bonnie <dbbonnie@amazon.com>

* Fix nanoseconds used for Topic Stats

Signed-off-by: Devin Bonnie <dbbonnie@amazon.com>

* Add simple publishing test
Minor fixes

Signed-off-by: Devin Bonnie <dbbonnie@amazon.com>

* Add test utils header

Signed-off-by: Devin Bonnie <dbbonnie@amazon.com>

* Integrate with Topic Statistics options
Fixes after rebasing with master

Signed-off-by: Devin Bonnie <dbbonnie@amazon.com>

* Update after rebasing

Signed-off-by: Devin Bonnie <dbbonnie@amazon.com>

* Address minor review comments

Signed-off-by: Devin Bonnie <dbbonnie@amazon.com>

* Move Topic Statistics instantiation to create_subscription

Signed-off-by: Devin Bonnie <dbbonnie@amazon.com>

* Fix rebase issue
Fix topic statistics enable flag usage
Address minor formatting

Signed-off-by: Devin Bonnie <dbbonnie@amazon.com>

* Move new timer creation method to relevant header

Signed-off-by: Devin Bonnie <dbbonnie@amazon.com>

* Add timers interface to topic interface

Signed-off-by: Devin Bonnie <dbbonnie@amazon.com>

* Use new create timer method

Signed-off-by: Devin Bonnie <dbbonnie@amazon.com>

* Address review comments

Signed-off-by: Devin Bonnie <dbbonnie@amazon.com>
This commit is contained in:
Devin Bonnie 2020-04-22 16:09:41 -07:00 committed by GitHub
parent 4eab2a3c60
commit bb91b6c2ef
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
13 changed files with 419 additions and 27 deletions

View file

@ -15,15 +15,25 @@
#ifndef RCLCPP__CREATE_SUBSCRIPTION_HPP_ #ifndef RCLCPP__CREATE_SUBSCRIPTION_HPP_
#define RCLCPP__CREATE_SUBSCRIPTION_HPP_ #define RCLCPP__CREATE_SUBSCRIPTION_HPP_
#include <chrono>
#include <functional>
#include <memory> #include <memory>
#include <string> #include <string>
#include <utility> #include <utility>
#include "rclcpp/detail/resolve_enable_topic_statistics.hpp"
#include "rclcpp/node_interfaces/get_node_timers_interface.hpp"
#include "rclcpp/node_interfaces/get_node_topics_interface.hpp" #include "rclcpp/node_interfaces/get_node_topics_interface.hpp"
#include "rclcpp/node_interfaces/node_timers_interface.hpp"
#include "rclcpp/node_interfaces/node_topics_interface.hpp" #include "rclcpp/node_interfaces/node_topics_interface.hpp"
#include "rclcpp/create_timer.hpp"
#include "rclcpp/qos.hpp"
#include "rclcpp/subscription_factory.hpp" #include "rclcpp/subscription_factory.hpp"
#include "rclcpp/subscription_options.hpp" #include "rclcpp/subscription_options.hpp"
#include "rclcpp/qos.hpp" #include "rclcpp/timer.hpp"
#include "rclcpp/topic_statistics/subscription_topic_statistics.hpp"
#include "rmw/qos_profiles.h" #include "rmw/qos_profiles.h"
namespace rclcpp namespace rclcpp
@ -64,10 +74,46 @@ create_subscription(
using rclcpp::node_interfaces::get_node_topics_interface; using rclcpp::node_interfaces::get_node_topics_interface;
auto node_topics = get_node_topics_interface(std::forward<NodeT>(node)); auto node_topics = get_node_topics_interface(std::forward<NodeT>(node));
std::shared_ptr<rclcpp::topic_statistics::SubscriptionTopicStatistics<CallbackMessageT>>
subscription_topic_stats = nullptr;
if (rclcpp::detail::resolve_enable_topic_statistics(
options,
*node_topics->get_node_base_interface()))
{
std::shared_ptr<Publisher<statistics_msgs::msg::MetricsMessage>> publisher =
create_publisher<statistics_msgs::msg::MetricsMessage>(
node,
options.topic_stats_options.publish_topic,
qos);
subscription_topic_stats = std::make_shared<
rclcpp::topic_statistics::SubscriptionTopicStatistics<CallbackMessageT>
>(node_topics->get_node_base_interface()->get_name(), publisher);
auto sub_call_back = [subscription_topic_stats]() {
subscription_topic_stats->publish_message();
};
auto node_timer_interface = node_topics->get_node_timers_interface();
auto timer = create_wall_timer(
std::chrono::duration_cast<std::chrono::nanoseconds>(
options.topic_stats_options.publish_period),
sub_call_back,
options.callback_group,
node_topics->get_node_base_interface(),
node_timer_interface
);
subscription_topic_stats->set_publisher_timer(timer);
}
auto factory = rclcpp::create_subscription_factory<MessageT>( auto factory = rclcpp::create_subscription_factory<MessageT>(
std::forward<CallbackT>(callback), std::forward<CallbackT>(callback),
options, options,
msg_mem_strat msg_mem_strat,
subscription_topic_stats
); );
auto sub = node_topics->create_subscription(topic_name, factory, qos); auto sub = node_topics->create_subscription(topic_name, factory, qos);

View file

@ -15,6 +15,8 @@
#ifndef RCLCPP__CREATE_TIMER_HPP_ #ifndef RCLCPP__CREATE_TIMER_HPP_
#define RCLCPP__CREATE_TIMER_HPP_ #define RCLCPP__CREATE_TIMER_HPP_
#include <chrono>
#include <exception>
#include <memory> #include <memory>
#include <string> #include <string>
#include <utility> #include <utility>
@ -68,6 +70,46 @@ create_timer(
group); group);
} }
/// Convenience method to create a timer with node resources.
/**
*
* \tparam DurationRepT
* \tparam DurationT
* \tparam CallbackT
* \param period period to exectute callback
* \param callback callback to execute via the timer period
* \param group
* \param node_base
* \param node_timers
* \return
* \throws std::invalid argument if either node_base or node_timers
* are null
*/
template<typename DurationRepT, typename DurationT, typename CallbackT>
typename rclcpp::WallTimer<CallbackT>::SharedPtr
create_wall_timer(
std::chrono::duration<DurationRepT, DurationT> period,
CallbackT callback,
rclcpp::callback_group::CallbackGroup::SharedPtr group,
node_interfaces::NodeBaseInterface * node_base,
node_interfaces::NodeTimersInterface * node_timers)
{
if (node_base == nullptr) {
throw std::invalid_argument{"input node_base cannot be null"};
}
if (node_timers == nullptr) {
throw std::invalid_argument{"input node_timers cannot be null"};
}
auto timer = rclcpp::WallTimer<CallbackT>::make_shared(
std::chrono::duration_cast<std::chrono::nanoseconds>(period),
std::move(callback),
node_base->get_context());
node_timers->add_timer(timer, group);
return timer;
}
} // namespace rclcpp } // namespace rclcpp
#endif // RCLCPP__CREATE_TIMER_HPP_ #endif // RCLCPP__CREATE_TIMER_HPP_

View file

@ -42,7 +42,6 @@ resolve_enable_topic_statistics(const OptionsT & options, const NodeBaseT & node
break; break;
default: default:
throw std::runtime_error("Unrecognized EnableTopicStatistics value"); throw std::runtime_error("Unrecognized EnableTopicStatistics value");
break;
} }
return topic_stats_enabled; return topic_stats_enabled;

View file

@ -19,6 +19,7 @@
#include <rmw/rmw.h> #include <rmw/rmw.h>
#include <algorithm> #include <algorithm>
#include <chrono>
#include <cstdlib> #include <cstdlib>
#include <iostream> #include <iostream>
#include <limits> #include <limits>
@ -37,10 +38,12 @@
#include "rclcpp/create_client.hpp" #include "rclcpp/create_client.hpp"
#include "rclcpp/create_publisher.hpp" #include "rclcpp/create_publisher.hpp"
#include "rclcpp/create_service.hpp" #include "rclcpp/create_service.hpp"
#include "rclcpp/create_timer.hpp"
#include "rclcpp/create_subscription.hpp" #include "rclcpp/create_subscription.hpp"
#include "rclcpp/detail/resolve_enable_topic_statistics.hpp" #include "rclcpp/detail/resolve_enable_topic_statistics.hpp"
#include "rclcpp/parameter.hpp" #include "rclcpp/parameter.hpp"
#include "rclcpp/qos.hpp" #include "rclcpp/qos.hpp"
#include "rclcpp/timer.hpp"
#include "rclcpp/type_support_decl.hpp" #include "rclcpp/type_support_decl.hpp"
#include "rclcpp/visibility_control.hpp" #include "rclcpp/visibility_control.hpp"
@ -108,12 +111,12 @@ Node::create_wall_timer(
CallbackT callback, CallbackT callback,
rclcpp::callback_group::CallbackGroup::SharedPtr group) rclcpp::callback_group::CallbackGroup::SharedPtr group)
{ {
auto timer = rclcpp::WallTimer<CallbackT>::make_shared( return rclcpp::create_wall_timer(
std::chrono::duration_cast<std::chrono::nanoseconds>(period), period,
std::move(callback), std::move(callback),
this->node_base_->get_context()); group,
node_timers_->add_timer(timer, group); this->node_base_.get(),
return timer; this->node_timers_.get());
} }
template<typename ServiceT> template<typename ServiceT>

View file

@ -22,6 +22,7 @@
#include "rclcpp/macros.hpp" #include "rclcpp/macros.hpp"
#include "rclcpp/node_interfaces/node_base_interface.hpp" #include "rclcpp/node_interfaces/node_base_interface.hpp"
#include "rclcpp/node_interfaces/node_timers_interface.hpp"
#include "rclcpp/node_interfaces/node_topics_interface.hpp" #include "rclcpp/node_interfaces/node_topics_interface.hpp"
#include "rclcpp/publisher.hpp" #include "rclcpp/publisher.hpp"
#include "rclcpp/publisher_factory.hpp" #include "rclcpp/publisher_factory.hpp"
@ -39,7 +40,9 @@ public:
RCLCPP_SMART_PTR_ALIASES_ONLY(NodeTopicsInterface) RCLCPP_SMART_PTR_ALIASES_ONLY(NodeTopicsInterface)
RCLCPP_PUBLIC RCLCPP_PUBLIC
explicit NodeTopics(rclcpp::node_interfaces::NodeBaseInterface * node_base); NodeTopics(
rclcpp::node_interfaces::NodeBaseInterface * node_base,
rclcpp::node_interfaces::NodeTimersInterface * node_timers);
RCLCPP_PUBLIC RCLCPP_PUBLIC
~NodeTopics() override; ~NodeTopics() override;
@ -74,10 +77,15 @@ public:
rclcpp::node_interfaces::NodeBaseInterface * rclcpp::node_interfaces::NodeBaseInterface *
get_node_base_interface() const override; get_node_base_interface() const override;
RCLCPP_PUBLIC
rclcpp::node_interfaces::NodeTimersInterface *
get_node_timers_interface() const override;
private: private:
RCLCPP_DISABLE_COPY(NodeTopics) RCLCPP_DISABLE_COPY(NodeTopics)
rclcpp::node_interfaces::NodeBaseInterface * node_base_; rclcpp::node_interfaces::NodeBaseInterface * node_base_;
rclcpp::node_interfaces::NodeTimersInterface * node_timers_;
}; };
} // namespace node_interfaces } // namespace node_interfaces

View file

@ -25,6 +25,7 @@
#include "rclcpp/callback_group.hpp" #include "rclcpp/callback_group.hpp"
#include "rclcpp/macros.hpp" #include "rclcpp/macros.hpp"
#include "rclcpp/node_interfaces/node_base_interface.hpp" #include "rclcpp/node_interfaces/node_base_interface.hpp"
#include "rclcpp/node_interfaces/node_timers_interface.hpp"
#include "rclcpp/publisher.hpp" #include "rclcpp/publisher.hpp"
#include "rclcpp/publisher_factory.hpp" #include "rclcpp/publisher_factory.hpp"
#include "rclcpp/subscription.hpp" #include "rclcpp/subscription.hpp"
@ -80,6 +81,11 @@ public:
virtual virtual
rclcpp::node_interfaces::NodeBaseInterface * rclcpp::node_interfaces::NodeBaseInterface *
get_node_base_interface() const = 0; get_node_base_interface() const = 0;
RCLCPP_PUBLIC
virtual
rclcpp::node_interfaces::NodeTimersInterface *
get_node_timers_interface() const = 0;
}; };
} // namespace node_interfaces } // namespace node_interfaces

View file

@ -18,6 +18,7 @@
#include <rmw/error_handling.h> #include <rmw/error_handling.h>
#include <rmw/rmw.h> #include <rmw/rmw.h>
#include <chrono>
#include <functional> #include <functional>
#include <iostream> #include <iostream>
#include <memory> #include <memory>
@ -25,7 +26,6 @@
#include <string> #include <string>
#include <utility> #include <utility>
#include "rcl/error_handling.h" #include "rcl/error_handling.h"
#include "rcl/subscription.h" #include "rcl/subscription.h"
@ -47,6 +47,7 @@
#include "rclcpp/type_support_decl.hpp" #include "rclcpp/type_support_decl.hpp"
#include "rclcpp/visibility_control.hpp" #include "rclcpp/visibility_control.hpp"
#include "rclcpp/waitable.hpp" #include "rclcpp/waitable.hpp"
#include "rclcpp/topic_statistics/subscription_topic_statistics.hpp"
#include "tracetools/tracetools.h" #include "tracetools/tracetools.h"
namespace rclcpp namespace rclcpp
@ -75,6 +76,8 @@ public:
using MessageDeleter = allocator::Deleter<MessageAllocator, CallbackMessageT>; using MessageDeleter = allocator::Deleter<MessageAllocator, CallbackMessageT>;
using ConstMessageSharedPtr = std::shared_ptr<const CallbackMessageT>; using ConstMessageSharedPtr = std::shared_ptr<const CallbackMessageT>;
using MessageUniquePtr = std::unique_ptr<CallbackMessageT, MessageDeleter>; using MessageUniquePtr = std::unique_ptr<CallbackMessageT, MessageDeleter>;
using SubscriptionTopicStatisticsSharedPtr =
std::shared_ptr<rclcpp::topic_statistics::SubscriptionTopicStatistics<CallbackMessageT>>;
RCLCPP_SMART_PTR_DEFINITIONS(Subscription) RCLCPP_SMART_PTR_DEFINITIONS(Subscription)
@ -98,7 +101,8 @@ public:
const rclcpp::QoS & qos, const rclcpp::QoS & qos,
AnySubscriptionCallback<CallbackMessageT, AllocatorT> callback, AnySubscriptionCallback<CallbackMessageT, AllocatorT> callback,
const rclcpp::SubscriptionOptionsWithAllocator<AllocatorT> & options, const rclcpp::SubscriptionOptionsWithAllocator<AllocatorT> & options,
typename MessageMemoryStrategyT::SharedPtr message_memory_strategy) typename MessageMemoryStrategyT::SharedPtr message_memory_strategy,
SubscriptionTopicStatisticsSharedPtr subscription_topic_statistics = nullptr)
: SubscriptionBase( : SubscriptionBase(
node_base, node_base,
type_support_handle, type_support_handle,
@ -180,6 +184,10 @@ public:
this->setup_intra_process(intra_process_subscription_id, ipm); this->setup_intra_process(intra_process_subscription_id, ipm);
} }
if (subscription_topic_statistics != nullptr) {
this->subscription_topic_statistics_ = std::move(subscription_topic_statistics);
}
TRACEPOINT( TRACEPOINT(
rclcpp_subscription_init, rclcpp_subscription_init,
(const void *)get_subscription_handle().get(), (const void *)get_subscription_handle().get(),
@ -260,6 +268,13 @@ public:
} }
auto typed_message = std::static_pointer_cast<CallbackMessageT>(message); auto typed_message = std::static_pointer_cast<CallbackMessageT>(message);
any_callback_.dispatch(typed_message, message_info); any_callback_.dispatch(typed_message, message_info);
if (subscription_topic_statistics_) {
const auto nanos = std::chrono::time_point_cast<std::chrono::nanoseconds>(
std::chrono::steady_clock::now());
const auto time = rclcpp::Time(nanos.time_since_epoch().count(), RCL_STEADY_TIME);
subscription_topic_statistics_->handle_message(*typed_message, time);
}
} }
void void
@ -307,6 +322,8 @@ private:
const rclcpp::SubscriptionOptionsWithAllocator<AllocatorT> options_; const rclcpp::SubscriptionOptionsWithAllocator<AllocatorT> options_;
typename message_memory_strategy::MessageMemoryStrategy<CallbackMessageT, AllocatorT>::SharedPtr typename message_memory_strategy::MessageMemoryStrategy<CallbackMessageT, AllocatorT>::SharedPtr
message_memory_strategy_; message_memory_strategy_;
/// Component which computes and publishes topic statistics for this subscriber
SubscriptionTopicStatisticsSharedPtr subscription_topic_statistics_{nullptr};
}; };
} // namespace rclcpp } // namespace rclcpp

View file

@ -32,6 +32,7 @@
#include "rclcpp/subscription_options.hpp" #include "rclcpp/subscription_options.hpp"
#include "rclcpp/subscription_traits.hpp" #include "rclcpp/subscription_traits.hpp"
#include "rclcpp/visibility_control.hpp" #include "rclcpp/visibility_control.hpp"
#include "rclcpp/topic_statistics/subscription_topic_statistics.hpp"
namespace rclcpp namespace rclcpp
{ {
@ -78,7 +79,10 @@ SubscriptionFactory
create_subscription_factory( create_subscription_factory(
CallbackT && callback, CallbackT && callback,
const rclcpp::SubscriptionOptionsWithAllocator<AllocatorT> & options, const rclcpp::SubscriptionOptionsWithAllocator<AllocatorT> & options,
typename MessageMemoryStrategyT::SharedPtr msg_mem_strat) typename MessageMemoryStrategyT::SharedPtr msg_mem_strat,
std::shared_ptr<rclcpp::topic_statistics::SubscriptionTopicStatistics<CallbackMessageT>>
subscription_topic_stats = nullptr
)
{ {
auto allocator = options.get_allocator(); auto allocator = options.get_allocator();
@ -88,7 +92,7 @@ create_subscription_factory(
SubscriptionFactory factory { SubscriptionFactory factory {
// factory function that creates a MessageT specific SubscriptionT // factory function that creates a MessageT specific SubscriptionT
[options, msg_mem_strat, any_subscription_callback]( [options, msg_mem_strat, any_subscription_callback, subscription_topic_stats](
rclcpp::node_interfaces::NodeBaseInterface * node_base, rclcpp::node_interfaces::NodeBaseInterface * node_base,
const std::string & topic_name, const std::string & topic_name,
const rclcpp::QoS & qos const rclcpp::QoS & qos
@ -104,7 +108,8 @@ create_subscription_factory(
qos, qos,
any_subscription_callback, any_subscription_callback,
options, options,
msg_mem_strat); msg_mem_strat,
subscription_topic_stats);
// This is used for setting up things like intra process comms which // This is used for setting up things like intra process comms which
// require this->shared_from_this() which cannot be called from // require this->shared_from_this() which cannot be called from
// the constructor. // the constructor.

View file

@ -108,7 +108,7 @@ Node::Node(
node_graph_(new rclcpp::node_interfaces::NodeGraph(node_base_.get())), node_graph_(new rclcpp::node_interfaces::NodeGraph(node_base_.get())),
node_logging_(new rclcpp::node_interfaces::NodeLogging(node_base_.get())), node_logging_(new rclcpp::node_interfaces::NodeLogging(node_base_.get())),
node_timers_(new rclcpp::node_interfaces::NodeTimers(node_base_.get())), node_timers_(new rclcpp::node_interfaces::NodeTimers(node_base_.get())),
node_topics_(new rclcpp::node_interfaces::NodeTopics(node_base_.get())), node_topics_(new rclcpp::node_interfaces::NodeTopics(node_base_.get(), node_timers_.get())),
node_services_(new rclcpp::node_interfaces::NodeServices(node_base_.get())), node_services_(new rclcpp::node_interfaces::NodeServices(node_base_.get())),
node_clock_(new rclcpp::node_interfaces::NodeClock( node_clock_(new rclcpp::node_interfaces::NodeClock(
node_base_, node_base_,

View file

@ -22,8 +22,10 @@ using rclcpp::exceptions::throw_from_rcl_error;
using rclcpp::node_interfaces::NodeTopics; using rclcpp::node_interfaces::NodeTopics;
NodeTopics::NodeTopics(rclcpp::node_interfaces::NodeBaseInterface * node_base) NodeTopics::NodeTopics(
: node_base_(node_base) rclcpp::node_interfaces::NodeBaseInterface * node_base,
rclcpp::node_interfaces::NodeTimersInterface * node_timers)
: node_base_(node_base), node_timers_(node_timers)
{} {}
NodeTopics::~NodeTopics() NodeTopics::~NodeTopics()
@ -121,3 +123,9 @@ NodeTopics::get_node_base_interface() const
{ {
return node_base_; return node_base_;
} }
rclcpp::node_interfaces::NodeTimersInterface *
NodeTopics::get_node_timers_interface() const
{
return node_timers_;
}

View file

@ -14,6 +14,9 @@
#include <gtest/gtest.h> #include <gtest/gtest.h>
#include <atomic>
#include <chrono>
#include <iostream>
#include <memory> #include <memory>
#include <string> #include <string>
#include <vector> #include <vector>
@ -24,23 +27,34 @@
#include "rclcpp/node.hpp" #include "rclcpp/node.hpp"
#include "rclcpp/qos.hpp" #include "rclcpp/qos.hpp"
#include "rclcpp/rclcpp.hpp" #include "rclcpp/rclcpp.hpp"
#include "rclcpp/subscription_options.hpp"
#include "rclcpp/topic_statistics/subscription_topic_statistics.hpp" #include "rclcpp/topic_statistics/subscription_topic_statistics.hpp"
#include "statistics_msgs/msg/metrics_message.hpp" #include "statistics_msgs/msg/metrics_message.hpp"
#include "statistics_msgs/msg/statistic_data_point.hpp"
#include "statistics_msgs/msg/statistic_data_type.hpp"
#include "test_msgs/msg/empty.hpp" #include "test_msgs/msg/empty.hpp"
#include "test_topic_stats_utils.hpp"
namespace namespace
{ {
constexpr const char kTestNodeName[]{"test_sub_stats_node"}; constexpr const char kTestPubNodeName[]{"test_pub_stats_node"};
constexpr const char kTestSubNodeName[]{"test_sub_stats_node"};
constexpr const char kTestSubStatsTopic[]{"/test_sub_stats_topic"}; constexpr const char kTestSubStatsTopic[]{"/test_sub_stats_topic"};
constexpr const char kTestTopicStatisticsTopic[]{"/test_topic_statistics_topic"}; constexpr const char kTestTopicStatisticsTopic[]{"/test_topic_statistics_topic"};
constexpr const uint64_t kNoSamples{0}; constexpr const uint64_t kNoSamples{0};
constexpr const std::chrono::milliseconds kTestStatsPublishPeriod{5000};
constexpr const std::chrono::seconds kTestDuration{10};
} // namespace } // namespace
using test_msgs::msg::Empty; using test_msgs::msg::Empty;
using statistics_msgs::msg::MetricsMessage;
using rclcpp::topic_statistics::SubscriptionTopicStatistics; using rclcpp::topic_statistics::SubscriptionTopicStatistics;
using statistics_msgs::msg::MetricsMessage;
using statistics_msgs::msg::StatisticDataPoint;
using statistics_msgs::msg::StatisticDataType;
using libstatistics_collector::moving_average_statistics::StatisticData; using libstatistics_collector::moving_average_statistics::StatisticData;
template<typename CallbackMessageT> template<typename CallbackMessageT>
@ -63,6 +77,44 @@ public:
} }
}; };
/**
* Empty publisher node: used to publish empty messages
*/
class EmptyPublisher : public rclcpp::Node
{
public:
EmptyPublisher(
const std::string & name, const std::string & topic,
const std::chrono::milliseconds & publish_period = std::chrono::milliseconds{100})
: Node(name)
{
publisher_ = create_publisher<Empty>(topic, 10);
publish_timer_ = this->create_wall_timer(
publish_period, [this]() {
this->publish_message();
});
}
virtual ~EmptyPublisher() = default;
size_t get_number_published()
{
return number_published_.load();
}
private:
void publish_message()
{
++number_published_;
auto msg = Empty{};
publisher_->publish(msg);
}
rclcpp::Publisher<Empty>::SharedPtr publisher_;
std::atomic<size_t> number_published_{0};
rclcpp::TimerBase::SharedPtr publish_timer_;
};
/** /**
* Empty subscriber node: used to create subscriber topic statistics requirements * Empty subscriber node: used to create subscriber topic statistics requirements
*/ */
@ -72,6 +124,10 @@ public:
EmptySubscriber(const std::string & name, const std::string & topic) EmptySubscriber(const std::string & name, const std::string & topic)
: Node(name) : Node(name)
{ {
// manually enable topic statistics via options
auto options = rclcpp::SubscriptionOptions();
options.topic_stats_options.state = rclcpp::TopicStatisticsState::Enable;
auto callback = [this](Empty::UniquePtr msg) { auto callback = [this](Empty::UniquePtr msg) {
this->receive_message(*msg); this->receive_message(*msg);
}; };
@ -79,16 +135,15 @@ public:
std::function<void(Empty::UniquePtr)>>( std::function<void(Empty::UniquePtr)>>(
topic, topic,
rclcpp::QoS(rclcpp::KeepAll()), rclcpp::QoS(rclcpp::KeepAll()),
callback); callback,
options);
} }
virtual ~EmptySubscriber() = default; virtual ~EmptySubscriber() = default;
private: private:
void receive_message(const Empty &) const void receive_message(const Empty &) const
{ {
} }
rclcpp::Subscription<Empty>::SharedPtr subscription_; rclcpp::Subscription<Empty>::SharedPtr subscription_;
}; };
@ -102,7 +157,7 @@ protected:
{ {
rclcpp::init(0 /* argc */, nullptr /* argv */); rclcpp::init(0 /* argc */, nullptr /* argv */);
empty_subscriber = std::make_shared<EmptySubscriber>( empty_subscriber = std::make_shared<EmptySubscriber>(
kTestNodeName, kTestSubNodeName,
kTestSubStatsTopic); kTestSubStatsTopic);
} }
@ -122,13 +177,11 @@ TEST_F(TestSubscriptionTopicStatisticsFixture, test_manual_construction)
kTestTopicStatisticsTopic, kTestTopicStatisticsTopic,
10); 10);
// construct the instance // construct a separate instance
auto sub_topic_stats = std::make_unique<TestSubscriptionTopicStatistics<Empty>>( auto sub_topic_stats = std::make_unique<TestSubscriptionTopicStatistics<Empty>>(
empty_subscriber->get_name(), empty_subscriber->get_name(),
topic_stats_publisher); topic_stats_publisher);
using libstatistics_collector::moving_average_statistics::StatisticData;
// expect no data has been collected / no samples received // expect no data has been collected / no samples received
for (const auto & data : sub_topic_stats->get_current_collector_data()) { for (const auto & data : sub_topic_stats->get_current_collector_data()) {
EXPECT_TRUE(std::isnan(data.average)); EXPECT_TRUE(std::isnan(data.average));
@ -138,3 +191,57 @@ TEST_F(TestSubscriptionTopicStatisticsFixture, test_manual_construction)
EXPECT_EQ(kNoSamples, data.sample_count); EXPECT_EQ(kNoSamples, data.sample_count);
} }
} }
TEST_F(TestSubscriptionTopicStatisticsFixture, test_receive_single_empty_stats_message)
{
// create an empty publisher
auto empty_publisher = std::make_shared<EmptyPublisher>(
kTestPubNodeName,
kTestSubStatsTopic);
// empty_subscriber has a topic statistics instance as part of its subscription
// this will listen to and generate statistics for the empty message
// create a listener for topic statistics messages
auto statistics_listener = std::make_shared<rclcpp::topic_statistics::MetricsMessageSubscriber>(
"test_receive_single_empty_stats_message_listener",
"/statistics");
rclcpp::executors::SingleThreadedExecutor ex;
ex.add_node(empty_publisher);
ex.add_node(statistics_listener);
ex.add_node(empty_subscriber);
// spin and get future
ex.spin_until_future_complete(
statistics_listener->GetFuture(),
kTestDuration);
// compare message counts, sample count should be the same as published and received count
EXPECT_EQ(1, statistics_listener->GetNumberOfMessagesReceived());
// check the received message and the data types
const auto received_message = statistics_listener->GetLastReceivedMessage();
for (const auto & stats_point : received_message.statistics) {
const auto type = stats_point.data_type;
switch (type) {
case StatisticDataType::STATISTICS_DATA_TYPE_SAMPLE_COUNT:
EXPECT_LT(0, stats_point.data) << "unexpected sample count";
break;
case StatisticDataType::STATISTICS_DATA_TYPE_AVERAGE:
EXPECT_LT(0, stats_point.data) << "unexpected avg";
break;
case StatisticDataType::STATISTICS_DATA_TYPE_MINIMUM:
EXPECT_LT(0, stats_point.data) << "unexpected min";
break;
case StatisticDataType::STATISTICS_DATA_TYPE_MAXIMUM:
EXPECT_LT(0, stats_point.data) << "unexpected max";
break;
case StatisticDataType::STATISTICS_DATA_TYPE_STDDEV:
EXPECT_LT(0, stats_point.data) << "unexpected stddev";
break;
default:
FAIL() << "received unknown statistics type: " << std::dec <<
static_cast<unsigned int>(type);
}
}
}

View file

@ -0,0 +1,151 @@
// Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include <atomic>
#include <functional>
#include <future>
#include <memory>
#include <mutex>
#include <string>
#include "statistics_msgs/msg/metrics_message.hpp"
#ifndef TOPIC_STATISTICS__TEST_TOPIC_STATS_UTILS_HPP_
#define TOPIC_STATISTICS__TEST_TOPIC_STATS_UTILS_HPP_
namespace rclcpp
{
namespace topic_statistics
{
using statistics_msgs::msg::MetricsMessage;
/**
* Provide an interface to wait for a promise to be satisfied via its future.
*/
class PromiseSetter
{
public:
/**
* Reassign the promise member and return it's future. Acquires a mutex in order
* to mutate member variables.
*
* \return the promise member's future, called upon PeriodicMeasurement
*/
std::shared_future<bool> GetFuture()
{
std::unique_lock<std::mutex> ulock{mutex_};
use_future_ = true;
promise_ = std::promise<bool>();
return promise_.get_future();
}
protected:
/**
* Set the promise to true, which signals the corresponding future. Acquires a mutex and sets
* the promise to true iff GetFuture was invoked before this.
*/
void SetPromise()
{
std::unique_lock<std::mutex> ulock{mutex_};
if (use_future_) {
// only set if GetFuture was called
promise_.set_value(true);
use_future_ = false; // the promise needs to be reassigned to set again
}
}
private:
mutable std::mutex mutex_;
std::promise<bool> promise_;
bool use_future_{false};
};
/**
* Node which listens for published MetricsMessages. This uses the PromiseSetter API
* in order to signal, via a future, that rclcpp should stop spinning upon
* message handling.
*/
class MetricsMessageSubscriber : public rclcpp::Node, public PromiseSetter
{
public:
/**
* Constructs a MetricsMessageSubscriber.
* \param name the node name
* \param name the topic name
* \param number of messages to receive to trigger the PromiseSetter future
*/
MetricsMessageSubscriber(
const std::string & name,
const std::string & topic_name,
const int number_of_messages_to_receive = 1)
: rclcpp::Node(name),
number_of_messages_to_receive_(number_of_messages_to_receive)
{
auto callback = [this](MetricsMessage::UniquePtr msg) {
this->MetricsMessageCallback(*msg);
};
subscription_ = create_subscription<MetricsMessage,
std::function<void(MetricsMessage::UniquePtr)>>(
topic_name,
0 /*history_depth*/,
callback);
}
/**
* Acquires a mutex in order to get the last message received member.
* \return the last message received
*/
MetricsMessage GetLastReceivedMessage() const
{
std::unique_lock<std::mutex> ulock{mutex_};
return last_received_message_;
}
/**
* Return the number of messages received by this subscriber.
* \return the number of messages received by the subscriber callback
*/
int GetNumberOfMessagesReceived() const
{
return num_messages_received_;
}
private:
/**
* Subscriber callback. Acquires a mutex to set the last message received and
* sets the promise to true.
* \param msg
*/
void MetricsMessageCallback(const MetricsMessage & msg)
{
std::unique_lock<std::mutex> ulock{mutex_};
++num_messages_received_;
last_received_message_ = msg;
if (num_messages_received_ >= number_of_messages_to_receive_) {
PromiseSetter::SetPromise();
}
}
MetricsMessage last_received_message_;
rclcpp::Subscription<MetricsMessage>::SharedPtr subscription_;
mutable std::mutex mutex_;
std::atomic<int> num_messages_received_{0};
const int number_of_messages_to_receive_;
};
} // namespace topic_statistics
} // namespace rclcpp
#endif // TOPIC_STATISTICS__TEST_TOPIC_STATS_UTILS_HPP_

View file

@ -67,7 +67,7 @@ LifecycleNode::LifecycleNode(
node_graph_(new rclcpp::node_interfaces::NodeGraph(node_base_.get())), node_graph_(new rclcpp::node_interfaces::NodeGraph(node_base_.get())),
node_logging_(new rclcpp::node_interfaces::NodeLogging(node_base_.get())), node_logging_(new rclcpp::node_interfaces::NodeLogging(node_base_.get())),
node_timers_(new rclcpp::node_interfaces::NodeTimers(node_base_.get())), node_timers_(new rclcpp::node_interfaces::NodeTimers(node_base_.get())),
node_topics_(new rclcpp::node_interfaces::NodeTopics(node_base_.get())), node_topics_(new rclcpp::node_interfaces::NodeTopics(node_base_.get(), node_timers_.get())),
node_services_(new rclcpp::node_interfaces::NodeServices(node_base_.get())), node_services_(new rclcpp::node_interfaces::NodeServices(node_base_.get())),
node_clock_(new rclcpp::node_interfaces::NodeClock( node_clock_(new rclcpp::node_interfaces::NodeClock(
node_base_, node_base_,