wipcontinued external observation framework. got some output already, but not correct yet (i think)

This commit is contained in:
Niklas Halle 2025-04-07 14:05:16 +00:00
parent 5c40743697
commit 3899d56cca
12 changed files with 38734 additions and 13177 deletions

View file

@ -69,7 +69,8 @@
"cinttypes": "cpp",
"typeindex": "cpp",
"typeinfo": "cpp",
"variant": "cpp"
"variant": "cpp",
"codecvt": "cpp"
},
"cmake.ignoreCMakeListsMissing": true,
"python.autoComplete.extraPaths": [

File diff suppressed because one or more lines are too long

File diff suppressed because it is too large Load diff

File diff suppressed because it is too large Load diff

View file

@ -0,0 +1,43 @@
[
{"entry": {"operation": "start_work", "chain": 0, "node": "CustomListenerNode/CustomListenerNode-wildfire_talk", "count": 0}, "time": 18545986},
{"entry": {"operation": "end_work", "chain": 0, "node": "CustomListenerNode/CustomListenerNode-wildfire_talk", "count": 0, "duration_us": 19}, "time": 18545986},
{"entry": {"operation": "start_work", "chain": 0, "node": "CustomListenerNode/CustomListenerNode-wildfire_talk", "count": 0}, "time": 18546487},
{"entry": {"operation": "end_work", "chain": 0, "node": "CustomListenerNode/CustomListenerNode-wildfire_talk", "count": 0, "duration_us": 91}, "time": 18546487},
{"entry": {"operation": "start_work", "chain": 0, "node": "CustomListenerNode/CustomListenerNode-wildfire_talk", "count": 0}, "time": 18546987},
{"entry": {"operation": "end_work", "chain": 0, "node": "CustomListenerNode/CustomListenerNode-wildfire_talk", "count": 0, "duration_us": 30}, "time": 18546987},
{"entry": {"operation": "start_work", "chain": 0, "node": "CustomListenerNode/CustomListenerNode-wildfire_talk", "count": 0}, "time": 18547487},
{"entry": {"operation": "end_work", "chain": 0, "node": "CustomListenerNode/CustomListenerNode-wildfire_talk", "count": 0, "duration_us": 76}, "time": 18547487},
{"entry": {"operation": "start_work", "chain": 0, "node": "CustomListenerNode/CustomListenerNode-wildfire_talk", "count": 0}, "time": 18547986},
{"entry": {"operation": "end_work", "chain": 0, "node": "CustomListenerNode/CustomListenerNode-wildfire_talk", "count": 0, "duration_us": 46}, "time": 18547986},
{"entry": {"operation": "start_work", "chain": 0, "node": "CustomListenerNode/CustomListenerNode-wildfire_talk", "count": 0}, "time": 18548488},
{"entry": {"operation": "end_work", "chain": 0, "node": "CustomListenerNode/CustomListenerNode-wildfire_talk", "count": 0, "duration_us": 61}, "time": 18548488},
{"entry": {"operation": "start_work", "chain": 0, "node": "CustomListenerNode/CustomListenerNode-wildfire_talk", "count": 0}, "time": 18548986},
{"entry": {"operation": "end_work", "chain": 0, "node": "CustomListenerNode/CustomListenerNode-wildfire_talk", "count": 0, "duration_us": 23}, "time": 18548986},
{"entry": {"operation": "start_work", "chain": 0, "node": "CustomListenerNode/CustomListenerNode-wildfire_talk", "count": 0}, "time": 18549487},
{"entry": {"operation": "end_work", "chain": 0, "node": "CustomListenerNode/CustomListenerNode-wildfire_talk", "count": 0, "duration_us": 73}, "time": 18549487},
{"entry": {"operation": "start_work", "chain": 0, "node": "CustomListenerNode/CustomListenerNode-wildfire_talk", "count": 0}, "time": 18549987},
{"entry": {"operation": "end_work", "chain": 0, "node": "CustomListenerNode/CustomListenerNode-wildfire_talk", "count": 0, "duration_us": 13}, "time": 18549987},
{"entry": {"operation": "start_work", "chain": 0, "node": "CustomListenerNode/CustomListenerNode-wildfire_talk", "count": 0}, "time": 18550486},
{"entry": {"operation": "end_work", "chain": 0, "node": "CustomListenerNode/CustomListenerNode-wildfire_talk", "count": 0, "duration_us": 141}, "time": 18550487},
{"entry": {"operation": "start_work", "chain": 0, "node": "CustomTalkerNode/CustomTalkerNode-timer", "count": 0}, "time": 18545986},
{"entry": {"operation": "end_work", "chain": 0, "node": "CustomTalkerNode/CustomTalkerNode-timer", "count": 0, "duration_us": 255}, "time": 18545986},
{"entry": {"operation": "start_work", "chain": 0, "node": "CustomTalkerNode/CustomTalkerNode-timer", "count": 0}, "time": 18546486},
{"entry": {"operation": "end_work", "chain": 0, "node": "CustomTalkerNode/CustomTalkerNode-timer", "count": 0, "duration_us": 459}, "time": 18546486},
{"entry": {"operation": "start_work", "chain": 0, "node": "CustomTalkerNode/CustomTalkerNode-timer", "count": 0}, "time": 18546986},
{"entry": {"operation": "end_work", "chain": 0, "node": "CustomTalkerNode/CustomTalkerNode-timer", "count": 0, "duration_us": 264}, "time": 18546986},
{"entry": {"operation": "start_work", "chain": 0, "node": "CustomTalkerNode/CustomTalkerNode-timer", "count": 0}, "time": 18547486},
{"entry": {"operation": "end_work", "chain": 0, "node": "CustomTalkerNode/CustomTalkerNode-timer", "count": 0, "duration_us": 338}, "time": 18547486},
{"entry": {"operation": "start_work", "chain": 0, "node": "CustomTalkerNode/CustomTalkerNode-timer", "count": 0}, "time": 18547986},
{"entry": {"operation": "end_work", "chain": 0, "node": "CustomTalkerNode/CustomTalkerNode-timer", "count": 0, "duration_us": 153}, "time": 18547986},
{"entry": {"operation": "start_work", "chain": 0, "node": "CustomTalkerNode/CustomTalkerNode-timer", "count": 0}, "time": 18548487},
{"entry": {"operation": "end_work", "chain": 0, "node": "CustomTalkerNode/CustomTalkerNode-timer", "count": 0, "duration_us": 227}, "time": 18548488},
{"entry": {"operation": "start_work", "chain": 0, "node": "CustomTalkerNode/CustomTalkerNode-timer", "count": 0}, "time": 18548986},
{"entry": {"operation": "end_work", "chain": 0, "node": "CustomTalkerNode/CustomTalkerNode-timer", "count": 0, "duration_us": 78}, "time": 18548986},
{"entry": {"operation": "start_work", "chain": 0, "node": "CustomTalkerNode/CustomTalkerNode-timer", "count": 0}, "time": 18549486},
{"entry": {"operation": "end_work", "chain": 0, "node": "CustomTalkerNode/CustomTalkerNode-timer", "count": 0, "duration_us": 294}, "time": 18549486},
{"entry": {"operation": "start_work", "chain": 0, "node": "CustomTalkerNode/CustomTalkerNode-timer", "count": 0}, "time": 18549986},
{"entry": {"operation": "end_work", "chain": 0, "node": "CustomTalkerNode/CustomTalkerNode-timer", "count": 0, "duration_us": 260}, "time": 18549986},
{"entry": {"operation": "start_work", "chain": 0, "node": "CustomTalkerNode/CustomTalkerNode-timer", "count": 0}, "time": 18550486},
{"entry": {"operation": "end_work", "chain": 0, "node": "CustomTalkerNode/CustomTalkerNode-timer", "count": 0, "duration_us": 183}, "time": 18550486},
{"entry": {"operation": "start_work", "chain": 0, "node": "CustomTalkerNode/CustomTalkerNode-timer", "count": 0}, "time": 18550986},
{"entry": {"operation": "end_work", "chain": 0, "node": "CustomTalkerNode/CustomTalkerNode-timer", "count": 0, "duration_us": 395}, "time": 18550986}]

View file

@ -61,8 +61,8 @@ int ros_experiment(
int const argc,
char** const argv,
std::string const& file_name,
ExperimentConfig config) {
ExperimentConfig config)
{
rclcpp::init(argc, argv);
// we have to create a node to process any passed parameters

View file

@ -1,22 +1,23 @@
cmake_minimum_required(VERSION 3.5)
project(existing_system_monitor)
project(existing_system_monitor VERSION 0.1.0)
# Default to C++14
if(NOT CMAKE_CXX_STANDARD)
set(CMAKE_CXX_STANDARD 17)
endif()
# Set C++ standards
set(CMAKE_CXX_STANDARD 17)
set(CMAKE_CXX_STANDARD_REQUIRED ON)
set(CMAKE_CXX_EXTENSIONS OFF)
# Compiler options
if(CMAKE_COMPILER_IS_GNUCXX OR CMAKE_CXX_COMPILER_ID MATCHES "Clang")
add_compile_options(-Wall -Wextra -Wpedantic)
endif()
# Find dependencies
find_package(ament_cmake REQUIRED)
find_package(rclcpp REQUIRED)
find_package(casestudy_tools REQUIRED)
find_package(simple_timer REQUIRED)
include_directories(include)
# Library targets
add_library(${PROJECT_NAME} src/instrumented_node.cpp)
target_include_directories(${PROJECT_NAME} PUBLIC
$<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}/include>
@ -27,44 +28,42 @@ ament_target_dependencies(${PROJECT_NAME}
simple_timer
)
# Executable targets
add_executable(uas src/uas.cpp)
target_include_directories(uas PUBLIC
$<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}/include>
$<INSTALL_INTERFACE:include>
)
ament_target_dependencies(uas
simple_timer
casestudy_tools
rclcpp
)
target_link_libraries(uas ${PROJECT_NAME})
# Testing
if(BUILD_TESTING)
find_package(ament_lint_auto REQUIRED)
ament_lint_auto_find_test_dependencies()
endif()
# Installation
install(
DIRECTORY include/
DESTINATION include
)
install(TARGETS
${PROJECT_NAME}
install(
TARGETS ${PROJECT_NAME} uas
EXPORT export_${PROJECT_NAME}
LIBRARY DESTINATION lib
ARCHIVE DESTINATION lib
RUNTIME DESTINATION bin
RUNTIME DESTINATION lib/${PROJECT_NAME}
INCLUDES DESTINATION include
)
# add executable for uas.cpp
add_library(uas src/uas.cpp src/instrumented_node.cpp)
ament_target_dependencies(uas
rclcpp
casestudy_tools
simple_timer
)
target_link_libraries(uas
${PROJECT_NAME}
)
target_include_directories(uas PUBLIC
$<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}/include>
$<INSTALL_INTERFACE:include>)
install(TARGETS
uas
EXPORT export_${PROJECT_NAME}
RUNTIME DESTINATION bin
)
# Export and package configuration
ament_export_include_directories(include)
ament_export_libraries(${PROJECT_NAME})
ament_export_targets(export_${PROJECT_NAME})
ament_export_targets(export_${PROJECT_NAME} HAS_LIBRARY_TARGET)
ament_export_dependencies(rclcpp casestudy_tools simple_timer)
ament_package()

View file

@ -29,11 +29,15 @@ public:
template<typename... Args>
void after_callback(Args&&... args);
friend class MetricsCollector;
protected:
node_time_logger logger_;
private:
std::string node_name_;
std::string callback_name_;
int chain_id_;
node_time_logger logger_;
std::chrono::high_resolution_clock::time_point start_time_;
};
@ -98,14 +102,24 @@ public:
// Class to hold metrics collection and analysis
class MetricsCollector {
public:
struct Config {
std::string executor_type = "default";
};
static MetricsCollector& get_instance();
void register_callback(std::shared_ptr<InstrumentedCallback> callback);
void write_logs_to_file(const std::string& file_name, const std::string& experiment_name);
void set_config(Config config) { config_ = config; }
private:
MetricsCollector() = default;
MetricsCollector() {
logger_ = create_logger();
}
std::unordered_map<std::string, std::shared_ptr<InstrumentedCallback>> callbacks_;
node_time_logger logger_;
Config config_;
std::string buildLogs(std::vector<node_time_logger> node_logs = {});
};
} // namespace existing_system_monitor

View file

@ -15,9 +15,14 @@ namespace existing_system_monitor {
template <typename NodeT>
class InstrumentedNode : public NodeT {
public:
struct NodeIdentifier {
int chain_id;
std::string node_name;
};
template <typename... Args>
InstrumentedNode(int chain_id, Args&&... args)
: NodeT(std::forward<Args>(args)...), chain_id_(chain_id) {
InstrumentedNode(NodeIdentifier id, Args&&... args)
: NodeT(std::forward<Args>(args)...), chain_id_(id.chain_id), node_name_(id.node_name) {
}
// Override subscription creation
@ -26,20 +31,30 @@ public:
const std::string& topic_name,
const rclcpp::QoS& qos,
CallbackT&& callback,
const rclcpp::SubscriptionOptions& options = rclcpp::SubscriptionOptions()) {
const rclcpp::SubscriptionOptions& options = rclcpp::SubscriptionOptions())
{
// Create a wrapper for the callback
auto instrumented_callback = std::make_shared<InstrumentedSubscriptionCallback<MessageT>>(
this->get_name(), topic_name, chain_id_,
std::forward<CallbackT>(callback));
auto instrumented_callback =
std::make_shared<InstrumentedSubscriptionCallback<MessageT>>(
this->get_name(),
node_name_ + "-" + topic_name,
chain_id_,
std::forward<CallbackT>(callback)
);
// Register the callback with the MetricsCollector
MetricsCollector::get_instance().register_callback(instrumented_callback);
// Create the subscription with the instrumented callback
auto subscription = NodeT::template create_subscription<MessageT>(
topic_name, qos,
auto subscription =
NodeT::template create_subscription<MessageT>(
topic_name,
qos,
[instrumented_callback](typename MessageT::SharedPtr msg) {
instrumented_callback->callback(msg);
},
options);
options
);
return subscription;
}
@ -49,26 +64,36 @@ public:
rclcpp::TimerBase::SharedPtr create_wall_timer(
DurationT period,
CallbackT&& callback,
rclcpp::CallbackGroup::SharedPtr group = nullptr) {
rclcpp::CallbackGroup::SharedPtr group = nullptr)
{
// Create a wrapper for the callback
auto instrumented_callback = std::make_shared<InstrumentedTimerCallback>(
this->get_name(), "timer", chain_id_,
std::forward<CallbackT>(callback));
auto instrumented_callback =
std::make_shared<InstrumentedTimerCallback>(
this->get_name(),
node_name_ + "-timer",
chain_id_,
std::forward<CallbackT>(callback)
);
// Register the callback with the MetricsCollector
MetricsCollector::get_instance().register_callback(instrumented_callback);
// Create the timer with the instrumented callback
auto timer = NodeT::create_wall_timer(
auto timer =
NodeT::create_wall_timer(
period,
[instrumented_callback]() {
instrumented_callback->callback();
},
group);
group
);
return timer;
}
private:
int chain_id_;
std::string node_name_;
};
} // namespace existing_system_monitor

View file

@ -4,7 +4,7 @@
<name>existing_system_monitor</name>
<version>0.1.0</version>
<description>Tools for monitoring existing ROS2 systems</description>
<maintainer email="user@todo.todo">user</maintainer>
<maintainer email="niklas@niklashalle.net">niklas</maintainer>
<license>Apache License 2.0</license>
<buildtool_depend>ament_cmake</buildtool_depend>
@ -13,6 +13,9 @@
<depend>casestudy_tools</depend>
<depend>simple_timer</depend>
<test_depend>ament_lint_auto</test_depend>
<test_depend>ament_lint_common</test_depend>
<export>
<build_type>ament_cmake</build_type>
</export>

View file

@ -1,5 +1,6 @@
#include <sstream>
#include <fstream>
#include <filesystem>
#include "std_msgs/msg/string.hpp"
@ -91,11 +92,68 @@ MetricsCollector& MetricsCollector::get_instance() {
}
void MetricsCollector::register_callback(std::shared_ptr<InstrumentedCallback> callback) {
// To be implemented
// Create a unique key for the callback using its node name and callback name
std::string key = std::to_string(callback->chain_id_) + "_" +
callback->node_name_ + "_" +
callback->callback_name_;
callbacks_[key] = callback;
}
std::string MetricsCollector::buildLogs(std::vector<node_time_logger> node_logs) {
std::stringstream output_file;
output_file << "[" << std::endl;
// Add logs from MetricsCollector's logger
for (auto const& log : *(logger_.recorded_times)) {
output_file << "{\"entry\": " << log.first
<< ", \"time\": " << log.second << "}," << std::endl;
}
// Add logs from registered callbacks
for (auto const& [name, callback] : callbacks_) {
for (auto const& log : *(callback->logger_.recorded_times)) {
output_file << "{\"entry\": " << log.first
<< ", \"time\": " << log.second << "}," << std::endl;
}
}
// Add logs from additional node loggers passed in
for (auto const& logger : node_logs) {
for (auto const& log : *(logger.recorded_times)) {
output_file << "{\"entry\": " << log.first
<< ", \"time\": " << log.second << "}," << std::endl;
}
}
// Remove the last comma if there are entries
if (!logger_.recorded_times->empty() || !callbacks_.empty() || !node_logs.empty()) {
output_file.seekp(-2, std::ios_base::end);
}
output_file << "]" << std::endl;
return output_file.str();
}
void MetricsCollector::write_logs_to_file(const std::string& file_name, const std::string& experiment_name) {
// This would use similar code to the existing Experiment::writeLogsToFile method
std::filesystem::path results_dir = "results";
std::filesystem::path exp_dir = results_dir / experiment_name;
// Create directories if they don't exist
try {
std::filesystem::create_directories(exp_dir);
} catch (std::filesystem::filesystem_error const &e) {
std::cout << "Could not create results directory: " << e.what() << std::endl;
return;
}
std::filesystem::path output_path = exp_dir / (file_name + "_" + config_.executor_type + ".json");
std::ofstream output_file(output_path);
output_file << buildLogs();
output_file.close();
std::cout << "results written to " << output_path.string() << std::endl;
}
template class InstrumentedSubscriptionCallback<std_msgs::msg::String>;

View file

@ -20,7 +20,7 @@ constexpr char const* TOPIC_NAME = "wildfire_talk";
class CustomTalkerNode : public existing_system_monitor::InstrumentedNode<rclcpp::Node> {
public:
// Pass chain_id to InstrumentedNode constructor
CustomTalkerNode(int chain_id, const std::string& node_name) : InstrumentedNode(chain_id, node_name) {
CustomTalkerNode(int chain_id, std::string const &node_name) : InstrumentedNode({chain_id, node_name}, node_name) {
// The rest of the code stays the same!
timer_ = create_wall_timer(
std::chrono::milliseconds(100),
@ -49,7 +49,7 @@ private:
class CustomListenerNode : public existing_system_monitor::InstrumentedNode<rclcpp::Node> {
public:
// Pass chain_id to InstrumentedNode constructor
CustomListenerNode(int chain_id, const std::string& node_name) : InstrumentedNode(chain_id, node_name) {
CustomListenerNode(int chain_id, const std::string& node_name) : InstrumentedNode({chain_id, node_name}, node_name) {
sub_ = this->create_subscription<std_msgs::msg::String>(
TOPIC_NAME, 10, std::bind(&CustomListenerNode::topic_callback, this, std::placeholders::_1));
}
@ -67,6 +67,11 @@ int main(int argc, char* argv[]) {
// calibrate the dummy load for the current system
calibrate_dummy_load();
existing_system_monitor::MetricsCollector::Config config;
config.executor_type = "edf";
auto& collector = existing_system_monitor::MetricsCollector::get_instance();
collector.set_config(config);
rclcpp::init(argc, argv);
// Create nodes for the experiment
@ -88,21 +93,16 @@ int main(int argc, char* argv[]) {
// must be set to post_execute can set new deadlines
executor->prio_memory_strategy_ = strategy;
// the new funcitons in PriorityMemoryStrategy accept the handle of the
// timer/subscription as the first argument
strategy->set_executable_deadline(talker->timer_->get_timer_handle(), 1000,
priority_executor::ExecutableType::TIMER, 0);
strategy->set_executable_deadline(talker->timer_->get_timer_handle(), 1000, priority_executor::ExecutableType::TIMER, 0);
// you _must_ set the timer_handle for each chain
strategy->get_priority_settings(talker->timer_->get_timer_handle())
->timer_handle = talker->timer_;
strategy->get_priority_settings(talker->timer_->get_timer_handle())->timer_handle = talker->timer_;
// you _must_ mark the first executable in the chain
strategy->set_first_in_chain(talker->timer_->get_timer_handle());
// set the same period and chain_id for each callback in the chain
strategy->set_executable_deadline(listener1->sub_->get_subscription_handle(),
1000, priority_executor::ExecutableType::SUBSCRIPTION, 0);
strategy->set_executable_deadline(listener2->sub_->get_subscription_handle(),
1000, priority_executor::ExecutableType::SUBSCRIPTION, 0);
strategy->set_executable_deadline(listener1->sub_->get_subscription_handle(), 1000, priority_executor::ExecutableType::SUBSCRIPTION, 0);
strategy->set_executable_deadline(listener2->sub_->get_subscription_handle(), 1000, priority_executor::ExecutableType::SUBSCRIPTION, 0);
// you _must_ mark the last executable in the chain (used to keep track of different instances of the same chain)
strategy->set_last_in_chain(listener2->sub_->get_subscription_handle());
// add all the nodes to the executor
@ -111,22 +111,15 @@ int main(int argc, char* argv[]) {
executor->add_node(listener2);
// if the executor behaves unexpectedly, you can print the priority settings to make sure they are correct
std::cout << *strategy->get_priority_settings(
talker->timer_->get_timer_handle())
<< std::endl;
std::cout << *strategy->get_priority_settings(
listener1->sub_->get_subscription_handle())
<< std::endl;
std::cout << *strategy->get_priority_settings(
listener2->sub_->get_subscription_handle())
<< std::endl;
std::cout << *strategy->get_priority_settings(talker->timer_->get_timer_handle()) << std::endl;
std::cout << *strategy->get_priority_settings(listener1->sub_->get_subscription_handle()) << std::endl;
std::cout << *strategy->get_priority_settings(listener2->sub_->get_subscription_handle()) << std::endl;
executor->spin();
rclcpp::shutdown();
// Get all collected metrics and write them to a file
auto& collector = existing_system_monitor::MetricsCollector::get_instance();
collector.write_logs_to_file("my_system_analysis", "real_system_test");
collector.write_logs_to_file("uas", "existing_system_monitor");
return 0;
}