diff --git a/rclcpp/CMakeLists.txt b/rclcpp/CMakeLists.txt index 5189510..ef2f106 100644 --- a/rclcpp/CMakeLists.txt +++ b/rclcpp/CMakeLists.txt @@ -12,6 +12,7 @@ find_package(rcpputils REQUIRED) find_package(rcutils REQUIRED) find_package(rmw REQUIRED) find_package(rosgraph_msgs REQUIRED) +find_package(rosidl_default_generators REQUIRED) find_package(rosidl_runtime_cpp REQUIRED) find_package(rosidl_typesupport_c REQUIRED) find_package(rosidl_typesupport_cpp REQUIRED) @@ -223,6 +224,14 @@ if(BUILD_TESTING) add_definitions(-DTEST_RESOURCES_DIRECTORY="${CMAKE_CURRENT_BINARY_DIR}/test/resources") + rosidl_generate_interfaces(${PROJECT_NAME}_test_msgs + test/msg/Header.msg + test/msg/MessageWithHeader.msg + DEPENDENCIES builtin_interfaces + LIBRARY_NAME ${PROJECT_NAME} + SKIP_INSTALL + ) + ament_add_gtest(test_client test/test_client.cpp) if(TARGET test_client) ament_target_dependencies(test_client @@ -610,9 +619,12 @@ if(BUILD_TESTING) target_link_libraries(test_wait_set ${PROJECT_NAME}) endif() - ament_add_gtest(test_subscription_topic_statistics test/topic_statistics/test_subscription_topic_statistics.cpp) + ament_add_gtest(test_subscription_topic_statistics test/topic_statistics/test_subscription_topic_statistics.cpp + APPEND_LIBRARY_DIRS "${append_library_dirs}" + ) if(TARGET test_subscription_topic_statistics) ament_target_dependencies(test_subscription_topic_statistics + "builtin_interfaces" "libstatistics_collector" "rcl_interfaces" "rcutils" @@ -621,6 +633,7 @@ if(BUILD_TESTING) "rosidl_typesupport_cpp" "statistics_msgs" "test_msgs") + rosidl_target_interfaces(test_subscription_topic_statistics ${PROJECT_NAME}_test_msgs "rosidl_typesupport_cpp") target_link_libraries(test_subscription_topic_statistics ${PROJECT_NAME}) endif() diff --git a/rclcpp/include/rclcpp/subscription.hpp b/rclcpp/include/rclcpp/subscription.hpp index 91343aa..2b8108f 100644 --- a/rclcpp/include/rclcpp/subscription.hpp +++ b/rclcpp/include/rclcpp/subscription.hpp @@ -271,8 +271,8 @@ public: if (subscription_topic_statistics_) { const auto nanos = std::chrono::time_point_cast( - std::chrono::steady_clock::now()); - const auto time = rclcpp::Time(nanos.time_since_epoch().count(), RCL_STEADY_TIME); + std::chrono::system_clock::now()); + const auto time = rclcpp::Time(nanos.time_since_epoch().count()); subscription_topic_statistics_->handle_message(*typed_message, time); } } diff --git a/rclcpp/include/rclcpp/topic_statistics/subscription_topic_statistics.hpp b/rclcpp/include/rclcpp/topic_statistics/subscription_topic_statistics.hpp index 058ed45..37a2804 100644 --- a/rclcpp/include/rclcpp/topic_statistics/subscription_topic_statistics.hpp +++ b/rclcpp/include/rclcpp/topic_statistics/subscription_topic_statistics.hpp @@ -23,6 +23,7 @@ #include "libstatistics_collector/collector/generate_statistics_message.hpp" #include "libstatistics_collector/moving_average_statistics/types.hpp" #include "libstatistics_collector/topic_statistics_collector/constants.hpp" +#include "libstatistics_collector/topic_statistics_collector/received_message_age.hpp" #include "libstatistics_collector/topic_statistics_collector/received_message_period.hpp" #include "rcl/time.h" @@ -56,6 +57,9 @@ class SubscriptionTopicStatistics using TopicStatsCollector = libstatistics_collector::topic_statistics_collector::TopicStatisticsCollector< CallbackMessageT>; + using ReceivedMessageAge = + libstatistics_collector::topic_statistics_collector::ReceivedMessageAgeCollector< + CallbackMessageT>; using ReceivedMessagePeriod = libstatistics_collector::topic_statistics_collector::ReceivedMessagePeriodCollector< CallbackMessageT>; @@ -173,6 +177,10 @@ private: */ void bring_up() { + auto received_message_age = std::make_unique(); + received_message_age->Start(); + subscriber_statistics_collectors_.emplace_back(std::move(received_message_age)); + auto received_message_period = std::make_unique(); received_message_period->Start(); { diff --git a/rclcpp/package.xml b/rclcpp/package.xml index a5e09c0..4e7f15b 100644 --- a/rclcpp/package.xml +++ b/rclcpp/package.xml @@ -37,6 +37,7 @@ ament_lint_common rmw rmw_implementation_cmake + rosidl_default_generators test_msgs diff --git a/rclcpp/test/msg/Header.msg b/rclcpp/test/msg/Header.msg new file mode 100644 index 0000000..bfe5dc5 --- /dev/null +++ b/rclcpp/test/msg/Header.msg @@ -0,0 +1,3 @@ +# Simple Header message with a timestamp field. + +builtin_interfaces/Time stamp diff --git a/rclcpp/test/msg/MessageWithHeader.msg b/rclcpp/test/msg/MessageWithHeader.msg new file mode 100644 index 0000000..3bd1b45 --- /dev/null +++ b/rclcpp/test/msg/MessageWithHeader.msg @@ -0,0 +1,3 @@ +# Message containing a simple Header field. + +Header header diff --git a/rclcpp/test/topic_statistics/test_subscription_topic_statistics.cpp b/rclcpp/test/topic_statistics/test_subscription_topic_statistics.cpp index 37433f0..3a7e90c 100644 --- a/rclcpp/test/topic_statistics/test_subscription_topic_statistics.cpp +++ b/rclcpp/test/topic_statistics/test_subscription_topic_statistics.cpp @@ -18,12 +18,14 @@ #include #include #include +#include #include #include #include "libstatistics_collector/moving_average_statistics/types.hpp" #include "rclcpp/create_publisher.hpp" +#include "rclcpp/msg/message_with_header.hpp" #include "rclcpp/node.hpp" #include "rclcpp/qos.hpp" #include "rclcpp/rclcpp.hpp" @@ -49,6 +51,7 @@ constexpr const uint64_t kNoSamples{0}; constexpr const std::chrono::seconds kTestDuration{10}; } // namespace +using rclcpp::msg::MessageWithHeader; using test_msgs::msg::Empty; using rclcpp::topic_statistics::SubscriptionTopicStatistics; using statistics_msgs::msg::MetricsMessage; @@ -96,21 +99,47 @@ public: virtual ~EmptyPublisher() = default; - size_t get_number_published() - { - return number_published_.load(); - } - private: void publish_message() { - ++number_published_; auto msg = Empty{}; publisher_->publish(msg); } rclcpp::Publisher::SharedPtr publisher_; - std::atomic number_published_{0}; + rclcpp::TimerBase::SharedPtr publish_timer_; +}; + +/** + * MessageWithHeader publisher node: used to publish MessageWithHeader with `header` value set + */ +class MessageWithHeaderPublisher : public rclcpp::Node +{ +public: + MessageWithHeaderPublisher( + const std::string & name, const std::string & topic, + const std::chrono::milliseconds & publish_period = std::chrono::milliseconds{100}) + : Node(name) + { + publisher_ = create_publisher(topic, 10); + publish_timer_ = this->create_wall_timer( + publish_period, [this]() { + this->publish_message(); + }); + } + + virtual ~MessageWithHeaderPublisher() = default; + +private: + void publish_message() + { + auto msg = MessageWithHeader{}; + // Subtract 1 sec from current time so the received message age is always > 0 + msg.header.stamp = this->now() - rclcpp::Duration{1, 0}; + publisher_->publish(msg); + } + + rclcpp::Publisher::SharedPtr publisher_; rclcpp::TimerBase::SharedPtr publish_timer_; }; @@ -127,8 +156,8 @@ public: auto options = rclcpp::SubscriptionOptions(); options.topic_stats_options.state = rclcpp::TopicStatisticsState::Enable; - auto callback = [this](Empty::UniquePtr msg) { - this->receive_message(*msg); + auto callback = [](Empty::UniquePtr msg) { + (void) msg; }; subscription_ = create_subscription>( @@ -140,12 +169,38 @@ public: virtual ~EmptySubscriber() = default; private: - void receive_message(const Empty &) const - { - } rclcpp::Subscription::SharedPtr subscription_; }; +/** + * MessageWithHeader subscriber node: used to create subscriber topic statistics requirements + */ +class MessageWithHeaderSubscriber : public rclcpp::Node +{ +public: + MessageWithHeaderSubscriber(const std::string & name, const std::string & topic) + : Node(name) + { + // manually enable topic statistics via options + auto options = rclcpp::SubscriptionOptions(); + options.topic_stats_options.state = rclcpp::TopicStatisticsState::Enable; + + auto callback = [](MessageWithHeader::UniquePtr msg) { + (void) msg; + }; + subscription_ = create_subscription>( + topic, + rclcpp::QoS(rclcpp::KeepAll()), + callback, + options); + } + virtual ~MessageWithHeaderSubscriber() = default; + +private: + rclcpp::Subscription::SharedPtr subscription_; +}; + /** * Test fixture to bring up and teardown rclcpp */ @@ -170,18 +225,18 @@ protected: TEST_F(TestSubscriptionTopicStatisticsFixture, test_manual_construction) { - // manually create publisher tied to the node + // Manually create publisher tied to the node auto topic_stats_publisher = empty_subscriber->create_publisher( kTestTopicStatisticsTopic, 10); - // construct a separate instance + // Construct a separate instance auto sub_topic_stats = std::make_unique>( empty_subscriber->get_name(), topic_stats_publisher); - // expect no data has been collected / no samples received + // Expect no data has been collected / no samples received for (const auto & data : sub_topic_stats->get_current_collector_data()) { EXPECT_TRUE(std::isnan(data.average)); EXPECT_TRUE(std::isnan(data.min)); @@ -191,56 +246,147 @@ TEST_F(TestSubscriptionTopicStatisticsFixture, test_manual_construction) } } -TEST_F(TestSubscriptionTopicStatisticsFixture, test_receive_single_empty_stats_message) +TEST_F(TestSubscriptionTopicStatisticsFixture, test_receive_stats_for_message_no_header) { - // create an empty publisher + // Create an empty publisher auto empty_publisher = std::make_shared( kTestPubNodeName, kTestSubStatsTopic); // empty_subscriber has a topic statistics instance as part of its subscription // this will listen to and generate statistics for the empty message - // create a listener for topic statistics messages + // Create a listener for topic statistics messages auto statistics_listener = std::make_shared( "test_receive_single_empty_stats_message_listener", - "/statistics"); + "/statistics", + 2); rclcpp::executors::SingleThreadedExecutor ex; ex.add_node(empty_publisher); ex.add_node(statistics_listener); ex.add_node(empty_subscriber); - // spin and get future + // Spin and get future ex.spin_until_future_complete( statistics_listener->GetFuture(), kTestDuration); - // compare message counts, sample count should be the same as published and received count - EXPECT_EQ(1, statistics_listener->GetNumberOfMessagesReceived()); + // Compare message counts, sample count should be the same as published and received count + EXPECT_EQ(2, statistics_listener->GetNumberOfMessagesReceived()); - // check the received message and the data types - const auto received_message = statistics_listener->GetLastReceivedMessage(); - for (const auto & stats_point : received_message.statistics) { - const auto type = stats_point.data_type; - switch (type) { - case StatisticDataType::STATISTICS_DATA_TYPE_SAMPLE_COUNT: - EXPECT_LT(0, stats_point.data) << "unexpected sample count"; - break; - case StatisticDataType::STATISTICS_DATA_TYPE_AVERAGE: - EXPECT_LT(0, stats_point.data) << "unexpected avg"; - break; - case StatisticDataType::STATISTICS_DATA_TYPE_MINIMUM: - EXPECT_LT(0, stats_point.data) << "unexpected min"; - break; - case StatisticDataType::STATISTICS_DATA_TYPE_MAXIMUM: - EXPECT_LT(0, stats_point.data) << "unexpected max"; - break; - case StatisticDataType::STATISTICS_DATA_TYPE_STDDEV: - EXPECT_LT(0, stats_point.data) << "unexpected stddev"; - break; - default: - FAIL() << "received unknown statistics type: " << std::dec << - static_cast(type); + // Check the received message and the data types + const auto received_messages = statistics_listener->GetReceivedMessages(); + + EXPECT_EQ(2u, received_messages.size()); + + std::set received_metrics; + for (const auto & msg : received_messages) { + received_metrics.insert(msg.metrics_source); + } + EXPECT_TRUE(received_metrics.find("message_age") != received_metrics.end()); + EXPECT_TRUE(received_metrics.find("message_period") != received_metrics.end()); + + // Check the collected statistics for message period. + // Message age statistics will not be calculated because Empty messages + // don't have a `header` with timestamp. + for (const auto & msg : received_messages) { + if (msg.metrics_source != "message_period") { + continue; + } + for (const auto & stats_point : msg.statistics) { + const auto type = stats_point.data_type; + switch (type) { + case StatisticDataType::STATISTICS_DATA_TYPE_SAMPLE_COUNT: + EXPECT_LT(0, stats_point.data) << "unexpected sample count"; + break; + case StatisticDataType::STATISTICS_DATA_TYPE_AVERAGE: + EXPECT_LT(0, stats_point.data) << "unexpected avg"; + break; + case StatisticDataType::STATISTICS_DATA_TYPE_MINIMUM: + EXPECT_LT(0, stats_point.data) << "unexpected min"; + break; + case StatisticDataType::STATISTICS_DATA_TYPE_MAXIMUM: + EXPECT_LT(0, stats_point.data) << "unexpected max"; + break; + case StatisticDataType::STATISTICS_DATA_TYPE_STDDEV: + EXPECT_LT(0, stats_point.data) << "unexpected stddev"; + break; + default: + FAIL() << "received unknown statistics type: " << std::dec << + static_cast(type); + } + } + } +} + +TEST_F(TestSubscriptionTopicStatisticsFixture, test_receive_stats_for_message_with_header) +{ + // Create a MessageWithHeader publisher + auto msg_with_header_publisher = std::make_shared( + kTestPubNodeName, + kTestSubStatsTopic); + // empty_subscriber has a topic statistics instance as part of its subscription + // this will listen to and generate statistics for the empty message + + // Create a listener for topic statistics messages + auto statistics_listener = std::make_shared( + "test_receive_stats_for_message_with_header", + "/statistics", + 2); + + auto msg_with_header_subscriber = std::make_shared( + kTestSubNodeName, + kTestSubStatsTopic); + + rclcpp::executors::SingleThreadedExecutor ex; + ex.add_node(msg_with_header_publisher); + ex.add_node(statistics_listener); + ex.add_node(msg_with_header_subscriber); + + // Spin and get future + ex.spin_until_future_complete( + statistics_listener->GetFuture(), + kTestDuration); + + // Compare message counts, sample count should be the same as published and received count + EXPECT_EQ(2, statistics_listener->GetNumberOfMessagesReceived()); + + // Check the received message and the data types + const auto received_messages = statistics_listener->GetReceivedMessages(); + + EXPECT_EQ(2u, received_messages.size()); + + std::set received_metrics; + for (const auto & msg : received_messages) { + received_metrics.insert(msg.metrics_source); + } + EXPECT_TRUE(received_metrics.find("message_age") != received_metrics.end()); + EXPECT_TRUE(received_metrics.find("message_period") != received_metrics.end()); + + // Check the collected statistics for message period. + for (const auto & msg : received_messages) { + for (const auto & stats_point : msg.statistics) { + const auto type = stats_point.data_type; + switch (type) { + case StatisticDataType::STATISTICS_DATA_TYPE_SAMPLE_COUNT: + EXPECT_LT(0, stats_point.data) << "unexpected sample count"; + break; + case StatisticDataType::STATISTICS_DATA_TYPE_AVERAGE: + EXPECT_LT(0, stats_point.data) << "unexpected avg"; + break; + case StatisticDataType::STATISTICS_DATA_TYPE_MINIMUM: + EXPECT_LT(0, stats_point.data) << "unexpected min"; + break; + case StatisticDataType::STATISTICS_DATA_TYPE_MAXIMUM: + EXPECT_LT(0, stats_point.data) << "unexpected max"; + break; + case StatisticDataType::STATISTICS_DATA_TYPE_STDDEV: + EXPECT_LE(0, stats_point.data) << "unexpected stddev"; + break; + default: + FAIL() << "received unknown statistics type: " << std::dec << + static_cast(type); + } } } } diff --git a/rclcpp/test/topic_statistics/test_topic_stats_utils.hpp b/rclcpp/test/topic_statistics/test_topic_stats_utils.hpp index 0140f91..1399abb 100644 --- a/rclcpp/test/topic_statistics/test_topic_stats_utils.hpp +++ b/rclcpp/test/topic_statistics/test_topic_stats_utils.hpp @@ -18,6 +18,7 @@ #include #include #include +#include #include "statistics_msgs/msg/metrics_message.hpp" @@ -89,7 +90,7 @@ public: MetricsMessageSubscriber( const std::string & name, const std::string & topic_name, - const int number_of_messages_to_receive = 1) + const int number_of_messages_to_receive = 2) : rclcpp::Node(name), number_of_messages_to_receive_(number_of_messages_to_receive) { @@ -99,7 +100,7 @@ public: subscription_ = create_subscription>( topic_name, - 0 /*history_depth*/, + 10 /*history_depth*/, callback); } @@ -107,10 +108,10 @@ public: * Acquires a mutex in order to get the last message received member. * \return the last message received */ - MetricsMessage GetLastReceivedMessage() const + std::vector GetReceivedMessages() const { std::unique_lock ulock{mutex_}; - return last_received_message_; + return received_messages_; } /** @@ -132,13 +133,13 @@ private: { std::unique_lock ulock{mutex_}; ++num_messages_received_; - last_received_message_ = msg; + received_messages_.push_back(msg); if (num_messages_received_ >= number_of_messages_to_receive_) { PromiseSetter::SetPromise(); } } - MetricsMessage last_received_message_; + std::vector received_messages_; rclcpp::Subscription::SharedPtr subscription_; mutable std::mutex mutex_; std::atomic num_messages_received_{0};