Add received message age metric to topic statistics (#1080)

* Add received message age metric to topic statistics

Signed-off-by: Prajakta Gokhale <prajaktg@amazon.com>

* Add unit tests

Signed-off-by: Prajakta Gokhale <prajaktg@amazon.com>

* Add IMU messages in unit test

Signed-off-by: Prajakta Gokhale <prajaktg@amazon.com>

* Use system time instead of steady time
Test received message age stats values are greater than 0

Signed-off-by: Devin Bonnie <dbbonnie@amazon.com>

* Fix test warnings

Signed-off-by: Devin Bonnie <dbbonnie@amazon.com>

* Replace IMU messages with new dummy messages

Signed-off-by: Prajakta Gokhale <prajaktg@amazon.com>

* Remove outdated TODO and unused test variables

Signed-off-by: Prajakta Gokhale <prajaktg@amazon.com>

* Address review comments

Signed-off-by: Devin Bonnie <dbbonnie@amazon.com>

* Address review comments

Signed-off-by: Prajakta Gokhale <prajaktg@amazon.com>

* Re-add message with header for unit testing

Signed-off-by: Prajakta Gokhale <prajaktg@amazon.com>

* Address message review feedback

Signed-off-by: Devin Bonnie <dbbonnie@amazon.com>

* Remove extra newline

Signed-off-by: Prajakta Gokhale <prajaktg@amazon.com>

* Address more review feedback

Signed-off-by: Devin Bonnie <dbbonnie@amazon.com>

* Fix Windows failure

Signed-off-by: Devin Bonnie <dbbonnie@amazon.com>

* Only set append_library_dirs once

Signed-off-by: Devin Bonnie <dbbonnie@amazon.com>
This commit is contained in:
Prajakta Gokhale 2020-04-23 20:41:35 -07:00 committed by GitHub
parent df3c2ffa8a
commit 04f3c33de5
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
8 changed files with 229 additions and 54 deletions

View file

@ -12,6 +12,7 @@ find_package(rcpputils REQUIRED)
find_package(rcutils REQUIRED)
find_package(rmw REQUIRED)
find_package(rosgraph_msgs REQUIRED)
find_package(rosidl_default_generators REQUIRED)
find_package(rosidl_runtime_cpp REQUIRED)
find_package(rosidl_typesupport_c REQUIRED)
find_package(rosidl_typesupport_cpp REQUIRED)
@ -223,6 +224,14 @@ if(BUILD_TESTING)
add_definitions(-DTEST_RESOURCES_DIRECTORY="${CMAKE_CURRENT_BINARY_DIR}/test/resources")
rosidl_generate_interfaces(${PROJECT_NAME}_test_msgs
test/msg/Header.msg
test/msg/MessageWithHeader.msg
DEPENDENCIES builtin_interfaces
LIBRARY_NAME ${PROJECT_NAME}
SKIP_INSTALL
)
ament_add_gtest(test_client test/test_client.cpp)
if(TARGET test_client)
ament_target_dependencies(test_client
@ -610,9 +619,12 @@ if(BUILD_TESTING)
target_link_libraries(test_wait_set ${PROJECT_NAME})
endif()
ament_add_gtest(test_subscription_topic_statistics test/topic_statistics/test_subscription_topic_statistics.cpp)
ament_add_gtest(test_subscription_topic_statistics test/topic_statistics/test_subscription_topic_statistics.cpp
APPEND_LIBRARY_DIRS "${append_library_dirs}"
)
if(TARGET test_subscription_topic_statistics)
ament_target_dependencies(test_subscription_topic_statistics
"builtin_interfaces"
"libstatistics_collector"
"rcl_interfaces"
"rcutils"
@ -621,6 +633,7 @@ if(BUILD_TESTING)
"rosidl_typesupport_cpp"
"statistics_msgs"
"test_msgs")
rosidl_target_interfaces(test_subscription_topic_statistics ${PROJECT_NAME}_test_msgs "rosidl_typesupport_cpp")
target_link_libraries(test_subscription_topic_statistics ${PROJECT_NAME})
endif()

View file

@ -271,8 +271,8 @@ public:
if (subscription_topic_statistics_) {
const auto nanos = std::chrono::time_point_cast<std::chrono::nanoseconds>(
std::chrono::steady_clock::now());
const auto time = rclcpp::Time(nanos.time_since_epoch().count(), RCL_STEADY_TIME);
std::chrono::system_clock::now());
const auto time = rclcpp::Time(nanos.time_since_epoch().count());
subscription_topic_statistics_->handle_message(*typed_message, time);
}
}

View file

@ -23,6 +23,7 @@
#include "libstatistics_collector/collector/generate_statistics_message.hpp"
#include "libstatistics_collector/moving_average_statistics/types.hpp"
#include "libstatistics_collector/topic_statistics_collector/constants.hpp"
#include "libstatistics_collector/topic_statistics_collector/received_message_age.hpp"
#include "libstatistics_collector/topic_statistics_collector/received_message_period.hpp"
#include "rcl/time.h"
@ -56,6 +57,9 @@ class SubscriptionTopicStatistics
using TopicStatsCollector =
libstatistics_collector::topic_statistics_collector::TopicStatisticsCollector<
CallbackMessageT>;
using ReceivedMessageAge =
libstatistics_collector::topic_statistics_collector::ReceivedMessageAgeCollector<
CallbackMessageT>;
using ReceivedMessagePeriod =
libstatistics_collector::topic_statistics_collector::ReceivedMessagePeriodCollector<
CallbackMessageT>;
@ -173,6 +177,10 @@ private:
*/
void bring_up()
{
auto received_message_age = std::make_unique<ReceivedMessageAge>();
received_message_age->Start();
subscriber_statistics_collectors_.emplace_back(std::move(received_message_age));
auto received_message_period = std::make_unique<ReceivedMessagePeriod>();
received_message_period->Start();
{

View file

@ -37,6 +37,7 @@
<test_depend>ament_lint_common</test_depend>
<test_depend>rmw</test_depend>
<test_depend>rmw_implementation_cmake</test_depend>
<test_depend>rosidl_default_generators</test_depend>
<test_depend>test_msgs</test_depend>
<export>

View file

@ -0,0 +1,3 @@
# Simple Header message with a timestamp field.
builtin_interfaces/Time stamp

View file

@ -0,0 +1,3 @@
# Message containing a simple Header field.
Header header

View file

@ -18,12 +18,14 @@
#include <chrono>
#include <iostream>
#include <memory>
#include <set>
#include <string>
#include <vector>
#include "libstatistics_collector/moving_average_statistics/types.hpp"
#include "rclcpp/create_publisher.hpp"
#include "rclcpp/msg/message_with_header.hpp"
#include "rclcpp/node.hpp"
#include "rclcpp/qos.hpp"
#include "rclcpp/rclcpp.hpp"
@ -49,6 +51,7 @@ constexpr const uint64_t kNoSamples{0};
constexpr const std::chrono::seconds kTestDuration{10};
} // namespace
using rclcpp::msg::MessageWithHeader;
using test_msgs::msg::Empty;
using rclcpp::topic_statistics::SubscriptionTopicStatistics;
using statistics_msgs::msg::MetricsMessage;
@ -96,21 +99,47 @@ public:
virtual ~EmptyPublisher() = default;
size_t get_number_published()
{
return number_published_.load();
}
private:
void publish_message()
{
++number_published_;
auto msg = Empty{};
publisher_->publish(msg);
}
rclcpp::Publisher<Empty>::SharedPtr publisher_;
std::atomic<size_t> number_published_{0};
rclcpp::TimerBase::SharedPtr publish_timer_;
};
/**
* MessageWithHeader publisher node: used to publish MessageWithHeader with `header` value set
*/
class MessageWithHeaderPublisher : public rclcpp::Node
{
public:
MessageWithHeaderPublisher(
const std::string & name, const std::string & topic,
const std::chrono::milliseconds & publish_period = std::chrono::milliseconds{100})
: Node(name)
{
publisher_ = create_publisher<MessageWithHeader>(topic, 10);
publish_timer_ = this->create_wall_timer(
publish_period, [this]() {
this->publish_message();
});
}
virtual ~MessageWithHeaderPublisher() = default;
private:
void publish_message()
{
auto msg = MessageWithHeader{};
// Subtract 1 sec from current time so the received message age is always > 0
msg.header.stamp = this->now() - rclcpp::Duration{1, 0};
publisher_->publish(msg);
}
rclcpp::Publisher<MessageWithHeader>::SharedPtr publisher_;
rclcpp::TimerBase::SharedPtr publish_timer_;
};
@ -127,8 +156,8 @@ public:
auto options = rclcpp::SubscriptionOptions();
options.topic_stats_options.state = rclcpp::TopicStatisticsState::Enable;
auto callback = [this](Empty::UniquePtr msg) {
this->receive_message(*msg);
auto callback = [](Empty::UniquePtr msg) {
(void) msg;
};
subscription_ = create_subscription<Empty,
std::function<void(Empty::UniquePtr)>>(
@ -140,12 +169,38 @@ public:
virtual ~EmptySubscriber() = default;
private:
void receive_message(const Empty &) const
{
}
rclcpp::Subscription<Empty>::SharedPtr subscription_;
};
/**
* MessageWithHeader subscriber node: used to create subscriber topic statistics requirements
*/
class MessageWithHeaderSubscriber : public rclcpp::Node
{
public:
MessageWithHeaderSubscriber(const std::string & name, const std::string & topic)
: Node(name)
{
// manually enable topic statistics via options
auto options = rclcpp::SubscriptionOptions();
options.topic_stats_options.state = rclcpp::TopicStatisticsState::Enable;
auto callback = [](MessageWithHeader::UniquePtr msg) {
(void) msg;
};
subscription_ = create_subscription<MessageWithHeader,
std::function<void(MessageWithHeader::UniquePtr)>>(
topic,
rclcpp::QoS(rclcpp::KeepAll()),
callback,
options);
}
virtual ~MessageWithHeaderSubscriber() = default;
private:
rclcpp::Subscription<MessageWithHeader>::SharedPtr subscription_;
};
/**
* Test fixture to bring up and teardown rclcpp
*/
@ -170,18 +225,18 @@ protected:
TEST_F(TestSubscriptionTopicStatisticsFixture, test_manual_construction)
{
// manually create publisher tied to the node
// Manually create publisher tied to the node
auto topic_stats_publisher =
empty_subscriber->create_publisher<MetricsMessage>(
kTestTopicStatisticsTopic,
10);
// construct a separate instance
// Construct a separate instance
auto sub_topic_stats = std::make_unique<TestSubscriptionTopicStatistics<Empty>>(
empty_subscriber->get_name(),
topic_stats_publisher);
// expect no data has been collected / no samples received
// Expect no data has been collected / no samples received
for (const auto & data : sub_topic_stats->get_current_collector_data()) {
EXPECT_TRUE(std::isnan(data.average));
EXPECT_TRUE(std::isnan(data.min));
@ -191,36 +246,54 @@ TEST_F(TestSubscriptionTopicStatisticsFixture, test_manual_construction)
}
}
TEST_F(TestSubscriptionTopicStatisticsFixture, test_receive_single_empty_stats_message)
TEST_F(TestSubscriptionTopicStatisticsFixture, test_receive_stats_for_message_no_header)
{
// create an empty publisher
// Create an empty publisher
auto empty_publisher = std::make_shared<EmptyPublisher>(
kTestPubNodeName,
kTestSubStatsTopic);
// empty_subscriber has a topic statistics instance as part of its subscription
// this will listen to and generate statistics for the empty message
// create a listener for topic statistics messages
// Create a listener for topic statistics messages
auto statistics_listener = std::make_shared<rclcpp::topic_statistics::MetricsMessageSubscriber>(
"test_receive_single_empty_stats_message_listener",
"/statistics");
"/statistics",
2);
rclcpp::executors::SingleThreadedExecutor ex;
ex.add_node(empty_publisher);
ex.add_node(statistics_listener);
ex.add_node(empty_subscriber);
// spin and get future
// Spin and get future
ex.spin_until_future_complete(
statistics_listener->GetFuture(),
kTestDuration);
// compare message counts, sample count should be the same as published and received count
EXPECT_EQ(1, statistics_listener->GetNumberOfMessagesReceived());
// Compare message counts, sample count should be the same as published and received count
EXPECT_EQ(2, statistics_listener->GetNumberOfMessagesReceived());
// check the received message and the data types
const auto received_message = statistics_listener->GetLastReceivedMessage();
for (const auto & stats_point : received_message.statistics) {
// Check the received message and the data types
const auto received_messages = statistics_listener->GetReceivedMessages();
EXPECT_EQ(2u, received_messages.size());
std::set<std::string> received_metrics;
for (const auto & msg : received_messages) {
received_metrics.insert(msg.metrics_source);
}
EXPECT_TRUE(received_metrics.find("message_age") != received_metrics.end());
EXPECT_TRUE(received_metrics.find("message_period") != received_metrics.end());
// Check the collected statistics for message period.
// Message age statistics will not be calculated because Empty messages
// don't have a `header` with timestamp.
for (const auto & msg : received_messages) {
if (msg.metrics_source != "message_period") {
continue;
}
for (const auto & stats_point : msg.statistics) {
const auto type = stats_point.data_type;
switch (type) {
case StatisticDataType::STATISTICS_DATA_TYPE_SAMPLE_COUNT:
@ -243,4 +316,77 @@ TEST_F(TestSubscriptionTopicStatisticsFixture, test_receive_single_empty_stats_m
static_cast<unsigned int>(type);
}
}
}
}
TEST_F(TestSubscriptionTopicStatisticsFixture, test_receive_stats_for_message_with_header)
{
// Create a MessageWithHeader publisher
auto msg_with_header_publisher = std::make_shared<MessageWithHeaderPublisher>(
kTestPubNodeName,
kTestSubStatsTopic);
// empty_subscriber has a topic statistics instance as part of its subscription
// this will listen to and generate statistics for the empty message
// Create a listener for topic statistics messages
auto statistics_listener = std::make_shared<rclcpp::topic_statistics::MetricsMessageSubscriber>(
"test_receive_stats_for_message_with_header",
"/statistics",
2);
auto msg_with_header_subscriber = std::make_shared<MessageWithHeaderSubscriber>(
kTestSubNodeName,
kTestSubStatsTopic);
rclcpp::executors::SingleThreadedExecutor ex;
ex.add_node(msg_with_header_publisher);
ex.add_node(statistics_listener);
ex.add_node(msg_with_header_subscriber);
// Spin and get future
ex.spin_until_future_complete(
statistics_listener->GetFuture(),
kTestDuration);
// Compare message counts, sample count should be the same as published and received count
EXPECT_EQ(2, statistics_listener->GetNumberOfMessagesReceived());
// Check the received message and the data types
const auto received_messages = statistics_listener->GetReceivedMessages();
EXPECT_EQ(2u, received_messages.size());
std::set<std::string> received_metrics;
for (const auto & msg : received_messages) {
received_metrics.insert(msg.metrics_source);
}
EXPECT_TRUE(received_metrics.find("message_age") != received_metrics.end());
EXPECT_TRUE(received_metrics.find("message_period") != received_metrics.end());
// Check the collected statistics for message period.
for (const auto & msg : received_messages) {
for (const auto & stats_point : msg.statistics) {
const auto type = stats_point.data_type;
switch (type) {
case StatisticDataType::STATISTICS_DATA_TYPE_SAMPLE_COUNT:
EXPECT_LT(0, stats_point.data) << "unexpected sample count";
break;
case StatisticDataType::STATISTICS_DATA_TYPE_AVERAGE:
EXPECT_LT(0, stats_point.data) << "unexpected avg";
break;
case StatisticDataType::STATISTICS_DATA_TYPE_MINIMUM:
EXPECT_LT(0, stats_point.data) << "unexpected min";
break;
case StatisticDataType::STATISTICS_DATA_TYPE_MAXIMUM:
EXPECT_LT(0, stats_point.data) << "unexpected max";
break;
case StatisticDataType::STATISTICS_DATA_TYPE_STDDEV:
EXPECT_LE(0, stats_point.data) << "unexpected stddev";
break;
default:
FAIL() << "received unknown statistics type: " << std::dec <<
static_cast<unsigned int>(type);
}
}
}
}

View file

@ -18,6 +18,7 @@
#include <memory>
#include <mutex>
#include <string>
#include <vector>
#include "statistics_msgs/msg/metrics_message.hpp"
@ -89,7 +90,7 @@ public:
MetricsMessageSubscriber(
const std::string & name,
const std::string & topic_name,
const int number_of_messages_to_receive = 1)
const int number_of_messages_to_receive = 2)
: rclcpp::Node(name),
number_of_messages_to_receive_(number_of_messages_to_receive)
{
@ -99,7 +100,7 @@ public:
subscription_ = create_subscription<MetricsMessage,
std::function<void(MetricsMessage::UniquePtr)>>(
topic_name,
0 /*history_depth*/,
10 /*history_depth*/,
callback);
}
@ -107,10 +108,10 @@ public:
* Acquires a mutex in order to get the last message received member.
* \return the last message received
*/
MetricsMessage GetLastReceivedMessage() const
std::vector<MetricsMessage> GetReceivedMessages() const
{
std::unique_lock<std::mutex> ulock{mutex_};
return last_received_message_;
return received_messages_;
}
/**
@ -132,13 +133,13 @@ private:
{
std::unique_lock<std::mutex> ulock{mutex_};
++num_messages_received_;
last_received_message_ = msg;
received_messages_.push_back(msg);
if (num_messages_received_ >= number_of_messages_to_receive_) {
PromiseSetter::SetPromise();
}
}
MetricsMessage last_received_message_;
std::vector<MetricsMessage> received_messages_;
rclcpp::Subscription<MetricsMessage>::SharedPtr subscription_;
mutable std::mutex mutex_;
std::atomic<int> num_messages_received_{0};