wip Fri Apr 4 04:25:13 PM CEST 2025

This commit is contained in:
Niklas Halle 2025-04-04 16:25:13 +02:00
parent 917db081b7
commit 5c40743697
16 changed files with 1675 additions and 123 deletions

File diff suppressed because one or more lines are too long

View file

View file

@ -111,11 +111,11 @@ int main(int argc, char* argv[]) {
for (uint j = 0; j < root["node_ids"][i].size(); j++) {
node_ids_row.push_back(root["node_ids"][i][j].asUInt());
}
config.node_ids.push_back(std::move(node_ids_row));
config.callback_ids.push_back(std::move(node_ids_row));
}
for (uint i = 0; i < root["node_priorities"].size(); i++) {
config.node_priorities.push_back(root["node_priorities"][i].asUInt());
config.callback_priorities.push_back(root["node_priorities"][i].asUInt());
}
config.num_groups = 0;
@ -125,7 +125,7 @@ int main(int argc, char* argv[]) {
}
for (uint i = 0; i < root["node_runtimes"].size(); i++) {
config.node_runtimes.push_back(root["node_runtimes"][i].asDouble());
config.callback_runtimes.push_back(root["node_runtimes"][i].asDouble());
}
for (uint i = 0; i < root["chain_periods"].size(); i++) {
@ -133,7 +133,7 @@ int main(int argc, char* argv[]) {
}
for (uint i = 0; i < root["node_executor_assignments"].size(); i++) {
config.node_executor_assignments.push_back(root["node_executor_assignments"][i].asUInt() - 1);
config.callback_executor_assignments.push_back(root["node_executor_assignments"][i].asUInt() - 1);
}
for (uint i = 0; i < root["executor_to_cpu_core"].size(); i++) {

View file

@ -122,28 +122,28 @@ int main(int argc, char* argv[]) {
// runtimes and timers subject to change
config.chain_lengths = {};
config.node_ids = {};
config.callback_ids = {};
config.chain_timer_control = {};
config.chain_periods = {};
config.node_runtimes = {};
config.node_executor_assignments = {};
config.callback_runtimes = {};
config.callback_executor_assignments = {};
int node_id = 0;
for (int c = 0; c < num_chains; c++) {
config.node_ids.push_back({});
config.callback_ids.push_back({});
for (int i = 0; i < num_s + 1; i++) {
config.node_ids[c].push_back(node_id);
config.callback_ids[c].push_back(node_id);
if (i == 0) {
// first node is the publisher, and goes in the first executor
config.node_executor_assignments.push_back(0);
config.callback_executor_assignments.push_back(0);
} else {
// all other nodes go in the second executor
config.node_executor_assignments.push_back(1);
config.callback_executor_assignments.push_back(1);
}
node_id++;
}
for (int i = 0; i < num_s + 1; i++) {
config.node_priorities.push_back(i);
config.node_runtimes.push_back(runtime);
config.callback_priorities.push_back(i);
config.callback_runtimes.push_back(runtime);
}
config.chain_lengths.push_back(num_s+1);
config.chain_periods.push_back(10);
@ -153,9 +153,9 @@ int main(int argc, char* argv[]) {
config.executor_to_cpu_assignments = {0, 1};
std::cout << "node ids: " << std::endl;
for (size_t i = 0; i < config.node_ids.size(); i++) {
for (size_t j = 0; j < config.node_ids[i].size(); j++) {
std::cout << config.node_ids[i][j] << " ";
for (size_t i = 0; i < config.callback_ids.size(); i++) {
for (size_t j = 0; j < config.callback_ids[i].size(); j++) {
std::cout << config.callback_ids[i][j] << " ";
}
std::cout << std::endl;
}

View file

@ -117,32 +117,32 @@ int main(int argc, char* argv[]) {
// runtimes and timers subject to change
config.chain_lengths = {};
config.node_ids = {};
config.callback_ids = {};
config.chain_timer_control = {};
config.chain_periods = {};
config.node_runtimes = {};
config.callback_runtimes = {};
int node_id = 0;
for (int c = 0; c < num_chains; c++) {
config.node_ids.push_back({});
config.callback_ids.push_back({});
for (int i = 0; i < num_s + 1; i++) {
config.node_ids[c].push_back(node_id);
config.callback_ids[c].push_back(node_id);
node_id++;
}
for (int i = 0; i < num_s + 1; i++) {
config.node_priorities.push_back(i);
config.callback_priorities.push_back(i);
}
config.chain_lengths.push_back(num_s + 1);
config.chain_periods.push_back(10);
for (int i = 0; i < num_s + 1; i++) {
config.node_runtimes.push_back(runtime);
config.callback_runtimes.push_back(runtime);
}
config.chain_timer_control.push_back(c);
}
std::cout << "node ids: " << std::endl;
for (size_t i = 0; i < config.node_ids.size(); i++) {
for (size_t j = 0; j < config.node_ids[i].size(); j++) {
std::cout << config.node_ids[i][j] << " ";
for (size_t i = 0; i < config.callback_ids.size(); i++) {
for (size_t j = 0; j < config.callback_ids[i].size(); j++) {
std::cout << config.callback_ids[i][j] << " ";
}
std::cout << std::endl;
}
@ -150,7 +150,7 @@ int main(int argc, char* argv[]) {
config.num_groups = 0;
config.group_memberships = {}; // no groups
config.node_executor_assignments = {};
config.callback_executor_assignments = {};
// config.node_executor_assignments = {0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0};
config.parallel_mode = false;
config.cores = 1;

View file

@ -108,14 +108,14 @@ int main(int argc, char* argv[]) {
ExperimentConfig config;
config.chain_lengths = {2, 2};
config.node_ids = {{0, 1}, {2, 3}};
config.node_priorities = {1, 0, 3, 2};
config.callback_ids = {{0, 1}, {2, 3}};
config.callback_priorities = {1, 0, 3, 2};
config.chain_timer_control = {0, 1};
config.node_runtimes = {10, 10, 10, 10};
config.callback_runtimes = {10, 10, 10, 10};
// node 0 has a period of 80, and is the only timer
config.chain_periods = {100, 100};
config.node_executor_assignments = {};
config.callback_executor_assignments = {};
config.parallel_mode = true;
config.cores = 2;

View file

@ -97,7 +97,7 @@ int ros_experiment(
t.join();
}
std::string const outputname = "casestudy_example";
std::string const outputname = "casestudy_wildfire_drone";
experiment.writeLogsToFile(file_name, outputname);
/*
@ -140,17 +140,34 @@ int main(int argc, char* argv[]) {
calibrate_dummy_load();
ExperimentConfig config;
config.chain_lengths = {2, 2};
config.node_ids = {{0, 1}, {2, 3}};
config.node_priorities = {1, 0, 3, 2};
config.chain_timer_control = {0, 1};
// call back id: 0 1 2 3 4 5 6 7 8 9 10 11 12 13
config.callback_priorities = { 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0};
// duration of the callback in ms
config.callback_runtimes = { 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10};
config.node_runtimes = {10, 10, 10, 10};
// node 0 has a period of 80, and is the only timer
config.chain_periods = {100, 100};
config.node_executor_assignments = {};
config.parallel_mode = true;
config.cores = 2;
// we have four chains where
// - the first is 1 + numOf(radiometric corrections) + 1 long
// - the second is 1 + numOf(radiometric corrections) + numOf(geometric corrections) + 1 long
// - the third is 3 long
// - the fourth is 4 long
config.callback_ids = {
{1, 2, 3, 0},
{1, 2, 3, 4, 5, 6},
{1, 2, 3, 4, 5, 6, 9, 7, 8},
{9, 12, 13, 10, 11}
};
config.chain_lengths = {4, 6, 9, 5};
// period of the timer of the starting node in ms - thus basically sensor frequency
config.chain_periods = {100, 100, 100, 100};
config.chain_timer_control = {0, 0, 0, 0};
// TODO: ?
config.callback_executor_assignments = {};
config.parallel_mode = false;
config.cores = 1;
// TODO: you can add nodes to the config - what does it do? do we want that?
sanity_check_config(config);

View file

@ -11,14 +11,18 @@
struct ExperimentConfig
{
std::vector<uint32_t> chain_lengths;
// TODO: rename to callback_ids
std::vector<std::vector<uint32_t>> node_ids;
// TODO: rename to callback_runtimes
std::vector<double> node_runtimes;
// TODO: rename to callback_priorities
std::vector<uint32_t> node_priorities;
std::vector<std::vector<uint32_t>> callback_ids;
// duration of the callback in ms
std::vector<double> callback_runtimes;
// used for picas or in edf as tie breaker
std::vector<uint32_t> callback_priorities;
// period of the timer of the starting node in ms
std::vector<int> chain_periods;
// for each chain, the index of the timer node
// for each chain, the index of the timer node - TODO: why would I ever want this to be non 0?
std::vector<int> chain_timer_control;
uint num_groups = 0;
@ -29,8 +33,8 @@ struct ExperimentConfig
std::string executor_type = "default";
// if this is empty, then the nodes will be assigned to a single executor
// TODO: rename this to callback_executor_assignments
std::vector<uint32_t> node_executor_assignments;
std::vector<uint32_t> callback_executor_assignments;
// used in single-threaded mode to assign executors to cores
std::vector<uint32_t> executor_to_cpu_assignments;

View file

@ -1,17 +1,20 @@
#include "casestudy_tools/experiment.hpp"
#include "casestudy_tools/primes_workload.hpp"
#include "casestudy_tools/test_nodes.hpp"
#include "priority_executor/multithread_priority_executor.hpp"
#include "casestudy_tools/primes_workload.hpp"
#include "priority_executor/priority_executor.hpp"
#include "priority_executor/priority_memory_strategy.hpp"
#include "priority_executor/multithread_priority_executor.hpp"
#include <cstddef>
#include <deque>
#include <fstream>
#include <iostream>
#include <memory>
#include <sys/prctl.h>
#include <vector>
#include <fstream>
#include <cstddef>
#include <iostream>
#include <filesystem>
#include <sys/prctl.h>
Experiment::Experiment(ExperimentConfig config)
: config(std::move(config)) {
@ -20,19 +23,19 @@ Experiment::Experiment(ExperimentConfig config)
void sanity_check_config(ExperimentConfig const &config) {
// make sure chain_lengths and node_ids are the same size
if (config.chain_lengths.size() != config.node_ids.size()) {
if (config.chain_lengths.size() != config.callback_ids.size()) {
std::cout << "ros_experiment: chain_lengths.size()= "
<< config.chain_lengths.size()
<< " != node_ids.size()= " << config.node_ids.size() << std::endl;
<< " != node_ids.size()= " << config.callback_ids.size() << std::endl;
exit(1);
}
// make sure each chain_lengths is the same size as the corresponding node_ids
for (uint32_t i = 0; i < config.chain_lengths.size(); i++) {
if (config.chain_lengths[i] != config.node_ids[i].size()) {
if (config.chain_lengths[i] != config.callback_ids[i].size()) {
std::cout << "ros_experiment: chain_lengths[" << i
<< "]= " << config.chain_lengths[i] << " != node_ids[" << i
<< "].size()= " << config.node_ids[i].size() << std::endl;
<< "].size()= " << config.callback_ids[i].size() << std::endl;
exit(1);
}
}
@ -47,22 +50,22 @@ void sanity_check_config(ExperimentConfig const &config) {
}
std::set<int> all_node_ids;
for (auto const &chain : config.node_ids) {
for (auto const &chain : config.callback_ids) {
for (auto const &node_id : chain) {
all_node_ids.insert(node_id);
}
}
// make sure we have the right number of node_priorities and node_runtimes
if (all_node_ids.size() != config.node_priorities.size()) {
if (all_node_ids.size() != config.callback_priorities.size()) {
std::cout << "ros_experiment: all_node_ids.size()= " << all_node_ids.size()
<< " != node_priorities.size()= " << config.node_priorities.size()
<< " != node_priorities.size()= " << config.callback_priorities.size()
<< std::endl;
exit(1);
}
if (all_node_ids.size() != config.node_runtimes.size()) {
if (all_node_ids.size() != config.callback_runtimes.size()) {
std::cout << "ros_experiment: all_node_ids.size()= " << all_node_ids.size()
<< " != node_runtimes.size()= " << config.node_runtimes.size()
<< " != node_runtimes.size()= " << config.callback_runtimes.size()
<< std::endl;
exit(1);
}
@ -71,7 +74,7 @@ void sanity_check_config(ExperimentConfig const &config) {
std::string Experiment::run(std::atomic<bool>& should_do_task) {
// TODO: split into setup and run, so that run can be re-used in
// Experiment::getRunFunctions
if (!config.node_executor_assignments.empty()
if (!config.callback_executor_assignments.empty()
|| numExecutorsRequired(config) != 1) {
std::cerr << "called Experiment::run with non-empty node_executor_assignments"
<< std::endl;
@ -124,12 +127,12 @@ Experiment::getRunFunctions() {
}
size_t Experiment::numExecutorsRequired(ExperimentConfig const &config) {
if (config.node_executor_assignments.empty()) {
if (config.callback_executor_assignments.empty()) {
return 1;
}
std::set<int> unique_executors;
for (auto const &assignment : config.node_executor_assignments) {
for (auto const &assignment : config.callback_executor_assignments) {
unique_executors.insert(assignment);
}
return unique_executors.size();
@ -140,7 +143,7 @@ experiment_executor Experiment::getExecutor(int const executor_idx) {
}
void Experiment::createExecutors() {
if (config.node_executor_assignments.empty()) {
if (config.callback_executor_assignments.empty()) {
// Create a single executor
executors.push_back(createSingleExecutor(0));
return;
@ -161,12 +164,10 @@ void Experiment::createExecutors() {
exit(1);
}
// Create the required host nodes - re-use any existing nodes
for (uint i = config.nodes.size();
i < config.executor_to_cpu_assignments.size(); i++) {
// Create the required host nodes - re-use any existing nodes (by not starting at 0)
for (uint i = config.nodes.size(); i < config.executor_to_cpu_assignments.size(); i++) {
// create a node for each executor
auto const node = std::make_shared<rclcpp::Node>(
"node_" + std::to_string(i));
auto const node = std::make_shared<rclcpp::Node>("node_" + std::to_string(i));
config.nodes.push_back(node);
}
std::cout << "created " << config.nodes.size() << " nodes" << std::endl;
@ -195,7 +196,7 @@ experiment_executor Experiment::createSingleExecutor(uint const executor_num) {
executor.executor->prio_memory_strategy_ = executor.strat;
if (config.node_executor_assignments.empty()) {
if (config.callback_executor_assignments.empty()) {
// Add all nodes to the executor
for (auto const &node : config.nodes) {
executor.executor->add_node(node);
@ -215,7 +216,7 @@ experiment_executor Experiment::createSingleExecutor(uint const executor_num) {
executor.default_executor = std::make_shared<priority_executor::ROSDefaultExecutor>();
}
if (config.node_executor_assignments.empty()) {
if (config.callback_executor_assignments.empty()) {
// Add all nodes to the executor
for (auto const &node : config.nodes) {
executor.default_executor->add_node(node);
@ -232,11 +233,11 @@ experiment_executor Experiment::createSingleExecutor(uint const executor_num) {
}
experiment_executor Experiment::getExecutorForNode(int const node_id) {
if (config.node_executor_assignments.empty()) {
if (config.callback_executor_assignments.empty()) {
return executors[0];
}
int const executor_idx = config.node_executor_assignments[node_id];
int const executor_idx = config.callback_executor_assignments[node_id];
std::cout << "node " << node_id << " assigned to executor " << executor_idx
<< std::endl;
return executors[executor_idx];
@ -280,14 +281,14 @@ void Experiment::createNodesAndAssignProperties() {
}
// for each chain
for (uint32_t chain_id = 0; chain_id < config.node_ids.size(); chain_id++) {
for (uint32_t chain_id = 0; chain_id < config.callback_ids.size(); chain_id++) {
all_nodes.push_back(std::vector<std::shared_ptr<CaseStudyNode>>());
// for each node in the chain
bool first_node = true;
for (uint32_t node_chain_idx = 0;
node_chain_idx < config.node_ids[chain_id].size(); node_chain_idx++) {
uint32_t const node_id = config.node_ids[chain_id][node_chain_idx];
node_chain_idx < config.callback_ids[chain_id].size(); node_chain_idx++) {
uint32_t const node_id = config.callback_ids[chain_id][node_chain_idx];
// has this node been created yet?
if (nodes.find(node_id) != nodes.end()) {
// if it has, then this node exists in another chain
@ -310,16 +311,16 @@ void Experiment::createNodesAndAssignProperties() {
cb_group = cb_groups[config.group_memberships[node_id] - 1];
}
std::shared_ptr<PublisherNode> node;
if (config.node_executor_assignments.empty()) {
if (config.callback_executor_assignments.empty()) {
node = std::make_shared<PublisherNode>(
"node_" + std::to_string(node_id), config.node_runtimes[node_id],
"node_" + std::to_string(node_id), config.callback_runtimes[node_id],
config.chain_periods[chain_id], chain_id, config.nodes[0],
cb_group);
} else {
node = std::make_shared<PublisherNode>(
"node_" + std::to_string(node_id), config.node_runtimes[node_id],
"node_" + std::to_string(node_id), config.callback_runtimes[node_id],
config.chain_periods[chain_id], chain_id,
config.nodes[config.node_executor_assignments[node_id]],
config.nodes[config.callback_executor_assignments[node_id]],
cb_group);
}
handle = node->timer_->get_timer_handle();
@ -332,7 +333,7 @@ void Experiment::createNodesAndAssignProperties() {
all_nodes[chain_id].push_back(node);
} else {
// create a subscriber node
int const subscription_id = config.node_ids[chain_id][node_chain_idx - 1];
int const subscription_id = config.callback_ids[chain_id][node_chain_idx - 1];
rclcpp::CallbackGroup::SharedPtr cb_group;
if (jiang2022_cb && ((chain_id == 1 && node_chain_idx == 1) ||
(chain_id == 2 && node_chain_idx == 1))) {
@ -349,24 +350,24 @@ void Experiment::createNodesAndAssignProperties() {
std::shared_ptr<WorkerNode> node;
if (ours2024_latency) {
uint32_t const first_node_id = config.node_ids[chain_id][0];
uint32_t const first_node_id = config.callback_ids[chain_id][0];
node = std::make_shared<WorkerNode>(
"node_" + std::to_string(first_node_id),
"node_" + std::to_string(node_id), config.node_runtimes[node_id],
"node_" + std::to_string(node_id), config.callback_runtimes[node_id],
config.chain_periods[chain_id], chain_id, config.nodes[0],
cb_group);
} else if (config.node_executor_assignments.empty()) {
} else if (config.callback_executor_assignments.empty()) {
node = std::make_shared<WorkerNode>(
"node_" + std::to_string(subscription_id),
"node_" + std::to_string(node_id), config.node_runtimes[node_id],
"node_" + std::to_string(node_id), config.callback_runtimes[node_id],
config.chain_periods[chain_id], chain_id, config.nodes[0],
cb_group);
} else {
node = std::make_shared<WorkerNode>(
"node_" + std::to_string(subscription_id),
"node_" + std::to_string(node_id), config.node_runtimes[node_id],
"node_" + std::to_string(node_id), config.callback_runtimes[node_id],
config.chain_periods[chain_id], chain_id,
config.nodes[config.node_executor_assignments[node_id]],
config.nodes[config.callback_executor_assignments[node_id]],
cb_group);
}
handle = node->sub_->get_subscription_handle();
@ -390,22 +391,22 @@ void Experiment::createNodesAndAssignProperties() {
executor.strat->get_priority_settings(handle)->timer_handle =
chain_timer_handles[config.chain_timer_control[chain_id]];
executor.strat->get_priority_settings(handle)->priority =
config.node_priorities[node_id]; // use priority as tiebreaker
config.callback_priorities[node_id]; // use priority as tiebreaker
if (first_node) {
executor.strat->set_first_in_chain(handle);
first_node = false;
}
// is this the last node in the chain?
if (node_chain_idx == config.node_ids[chain_id].size() - 1) {
if (node_chain_idx == config.callback_ids[chain_id].size() - 1) {
executor.strat->set_last_in_chain(handle);
}
} else if (config.executor_type == "picas") {
// executor.executor->add_node(nodes[node_id]);
executor.strat->set_executable_priority(
handle, config.node_priorities[node_id], type,
handle, config.callback_priorities[node_id], type,
priority_executor::ExecutableScheduleType::CHAIN_AWARE_PRIORITY, chain_id);
std::cout << "node " << node_id << " has priority "
<< config.node_priorities[node_id] << std::endl;
<< config.callback_priorities[node_id] << std::endl;
} else if (config.executor_type == "default") {
// executor.default_executor->add_node(nodes[node_id]);
}
@ -498,10 +499,10 @@ void Experiment::setInitialDeadlines() {
for (uint32_t chain_index = 0; chain_index < config.chain_lengths.size();
chain_index++) {
size_t executor_idx;
if (config.node_executor_assignments.empty()) {
if (config.callback_executor_assignments.empty()) {
executor_idx = 0;
} else {
executor_idx = config.node_executor_assignments[chain_index];
executor_idx = config.callback_executor_assignments[chain_index];
}
std::shared_ptr<std::deque<uint64_t>> deadlines =
executors[executor_idx].strat->get_chain_deadlines(chain_index);
@ -538,24 +539,26 @@ void Experiment::resetTimers() {
}
void Experiment::writeLogsToFile(std::string const &file_name,
std::string const &experiment_name) {
std::ofstream output_file;
std::string const name = experiment_name;
// make sure results directory exists. we don't have access to std::filesystem
// here :(
std::string const command = "mkdir -p results/" + name;
int const result = system(command.c_str());
if (result != 0) {
std::cout << "could not create results directory: " << result << ": "
<< strerror(errno) << std::endl;
std::string const &experiment_name) {
std::filesystem::path results_dir = "results";
std::filesystem::path exp_dir = results_dir / experiment_name;
// Create directories if they don't exist
try {
std::filesystem::create_directories(exp_dir);
} catch (std::filesystem::filesystem_error const &e) {
std::cout << "Could not create results directory: " << e.what() << std::endl;
return;
}
output_file.open("results/" + name + "/" + file_name + "_" +
this->config.executor_type + ".json");
std::string output_filename = (file_name + "_" + this->config.executor_type + ".json");
std::filesystem::path output_path = exp_dir / output_filename;
std::ofstream output_file(output_path);
output_file << buildLogs();
output_file.close();
std::cout << "results written to results/" + name + "/" + file_name + "_" +
this->config.executor_type + ".json"
<< std::endl;
std::cout << "results written to " << output_path.string() << std::endl;
}
int do_other_task(double const work_time, uint const period,

View file

@ -139,6 +139,7 @@ WorkerNode::WorkerNode(
// RCLCPP_INFO(this->get_logger(), "I heard: '%s'", msg->data.c_str());
// nth_prime_silly(runtime_);
// TODO: investigate jitter generation
double used_runtime = runtime_;
if (use_random_runtime) {
if (rand() % 100 < runtime_over_chance * 100) {

View file

@ -0,0 +1,70 @@
cmake_minimum_required(VERSION 3.5)
project(existing_system_monitor)
# Default to C++14
if(NOT CMAKE_CXX_STANDARD)
set(CMAKE_CXX_STANDARD 17)
endif()
if(CMAKE_COMPILER_IS_GNUCXX OR CMAKE_CXX_COMPILER_ID MATCHES "Clang")
add_compile_options(-Wall -Wextra -Wpedantic)
endif()
find_package(ament_cmake REQUIRED)
find_package(rclcpp REQUIRED)
find_package(casestudy_tools REQUIRED)
find_package(simple_timer REQUIRED)
include_directories(include)
add_library(${PROJECT_NAME} src/instrumented_node.cpp)
target_include_directories(${PROJECT_NAME} PUBLIC
$<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}/include>
$<INSTALL_INTERFACE:include>)
ament_target_dependencies(${PROJECT_NAME}
rclcpp
casestudy_tools
simple_timer
)
install(
DIRECTORY include/
DESTINATION include
)
install(TARGETS
${PROJECT_NAME}
EXPORT export_${PROJECT_NAME}
LIBRARY DESTINATION lib
ARCHIVE DESTINATION lib
RUNTIME DESTINATION bin
INCLUDES DESTINATION include
)
# add executable for uas.cpp
add_library(uas src/uas.cpp src/instrumented_node.cpp)
ament_target_dependencies(uas
rclcpp
casestudy_tools
simple_timer
)
target_link_libraries(uas
${PROJECT_NAME}
)
target_include_directories(uas PUBLIC
$<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}/include>
$<INSTALL_INTERFACE:include>)
install(TARGETS
uas
EXPORT export_${PROJECT_NAME}
RUNTIME DESTINATION bin
)
ament_export_include_directories(include)
ament_export_libraries(${PROJECT_NAME})
ament_export_targets(export_${PROJECT_NAME})
ament_package()

View file

@ -0,0 +1,113 @@
#ifndef EXISTING_SYSTEM_MONITOR_INSTRUMENTED_NODE_HPP
#define EXISTING_SYSTEM_MONITOR_INSTRUMENTED_NODE_HPP
#include <string>
#include <memory>
#include <functional>
#include <unordered_map>
#include <chrono>
#include "rclcpp/rclcpp.hpp"
#include "simple_timer/rt-sched.hpp"
namespace existing_system_monitor {
// Base class for instrumented callbacks
class InstrumentedCallback {
public:
InstrumentedCallback(std::string const &node_name, std::string const &callback_name, int chain_id)
: node_name_(node_name)
, callback_name_(callback_name)
, chain_id_(chain_id) {
logger_ = create_logger();
}
virtual ~InstrumentedCallback() = default;
template<typename... Args>
void before_callback(Args&&... args);
template<typename... Args>
void after_callback(Args&&... args);
private:
std::string node_name_;
std::string callback_name_;
int chain_id_;
node_time_logger logger_;
std::chrono::high_resolution_clock::time_point start_time_;
};
// Wrapper for a subscription callback
template<typename MessageT>
class InstrumentedSubscriptionCallback : public InstrumentedCallback {
public:
using CallbackT = std::function<void(typename MessageT::SharedPtr)>;
InstrumentedSubscriptionCallback(
const std::string& node_name,
const std::string& topic_name,
int chain_id,
CallbackT original_callback);
void callback(typename MessageT::SharedPtr msg) {
before_callback();
original_callback_(msg);
after_callback();
}
private:
CallbackT original_callback_;
};
// Wrapper for a timer callback
class InstrumentedTimerCallback : public InstrumentedCallback {
public:
using CallbackT = std::function<void()>;
InstrumentedTimerCallback(
const std::string& node_name,
const std::string& timer_name,
int chain_id,
CallbackT original_callback);
void callback();
private:
CallbackT original_callback_;
};
// Factory class for creating instrumented nodes
class NodeInstrumenter {
public:
static std::shared_ptr<rclcpp::Node> instrument_node(
std::shared_ptr<rclcpp::Node> original_node,
int chain_id = 0);
template<typename MessageT>
static typename rclcpp::Subscription<MessageT>::SharedPtr instrument_subscription(
std::shared_ptr<rclcpp::Node> node,
typename rclcpp::Subscription<MessageT>::SharedPtr original_subscription,
int chain_id = 0);
static rclcpp::TimerBase::SharedPtr instrument_timer(
std::shared_ptr<rclcpp::Node> node,
rclcpp::TimerBase::SharedPtr original_timer,
int chain_id = 0);
};
// Class to hold metrics collection and analysis
class MetricsCollector {
public:
static MetricsCollector& get_instance();
void register_callback(std::shared_ptr<InstrumentedCallback> callback);
void write_logs_to_file(const std::string& file_name, const std::string& experiment_name);
private:
MetricsCollector() = default;
std::unordered_map<std::string, std::shared_ptr<InstrumentedCallback>> callbacks_;
};
} // namespace existing_system_monitor
#endif // EXISTING_SYSTEM_MONITOR_INSTRUMENTED_NODE_HPP

View file

@ -0,0 +1,76 @@
#ifndef EXISTING_SYSTEM_MONITOR_NODE_ADAPTER_HPP
#define EXISTING_SYSTEM_MONITOR_NODE_ADAPTER_HPP
#include "rclcpp/rclcpp.hpp"
#include "existing_system_monitor/instrumented_node.hpp"
namespace existing_system_monitor {
/**
* Adapter class that wraps an existing ROS2 node with instrumentation
* Usage: instead of inheriting from rclcpp::Node, inherit from InstrumentedNode
*/
template <typename NodeT>
class InstrumentedNode : public NodeT {
public:
template <typename... Args>
InstrumentedNode(int chain_id, Args&&... args)
: NodeT(std::forward<Args>(args)...), chain_id_(chain_id) {
}
// Override subscription creation
template <typename MessageT, typename CallbackT>
typename rclcpp::Subscription<MessageT>::SharedPtr create_subscription(
const std::string& topic_name,
const rclcpp::QoS& qos,
CallbackT&& callback,
const rclcpp::SubscriptionOptions& options = rclcpp::SubscriptionOptions()) {
// Create a wrapper for the callback
auto instrumented_callback = std::make_shared<InstrumentedSubscriptionCallback<MessageT>>(
this->get_name(), topic_name, chain_id_,
std::forward<CallbackT>(callback));
// Create the subscription with the instrumented callback
auto subscription = NodeT::template create_subscription<MessageT>(
topic_name, qos,
[instrumented_callback](typename MessageT::SharedPtr msg) {
instrumented_callback->callback(msg);
},
options);
return subscription;
}
// Override timer creation
template <typename DurationT, typename CallbackT>
rclcpp::TimerBase::SharedPtr create_wall_timer(
DurationT period,
CallbackT&& callback,
rclcpp::CallbackGroup::SharedPtr group = nullptr) {
// Create a wrapper for the callback
auto instrumented_callback = std::make_shared<InstrumentedTimerCallback>(
this->get_name(), "timer", chain_id_,
std::forward<CallbackT>(callback));
// Create the timer with the instrumented callback
auto timer = NodeT::create_wall_timer(
period,
[instrumented_callback]() {
instrumented_callback->callback();
},
group);
return timer;
}
private:
int chain_id_;
};
} // namespace existing_system_monitor
#endif // EXISTING_SYSTEM_MONITOR_NODE_ADAPTER_HPP

View file

@ -0,0 +1,19 @@
<?xml version="1.0"?>
<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
<package format="3">
<name>existing_system_monitor</name>
<version>0.1.0</version>
<description>Tools for monitoring existing ROS2 systems</description>
<maintainer email="user@todo.todo">user</maintainer>
<license>Apache License 2.0</license>
<buildtool_depend>ament_cmake</buildtool_depend>
<depend>rclcpp</depend>
<depend>casestudy_tools</depend>
<depend>simple_timer</depend>
<export>
<build_type>ament_cmake</build_type>
</export>
</package>

View file

@ -0,0 +1,103 @@
#include <sstream>
#include <fstream>
#include "std_msgs/msg/string.hpp"
#include "existing_system_monitor/instrumented_node.hpp"
namespace existing_system_monitor {
template<typename... Args>
void InstrumentedCallback::before_callback(Args&&... args) {
// Record start time
start_time_ = std::chrono::high_resolution_clock::now();
// Log start of callback execution
std::ostringstream ss;
ss << "{\"operation\": \"start_work\", \"chain\": " << chain_id_
<< ", \"node\": \"" << node_name_ << "/" << callback_name_ << "\", \"count\": 0}";
log_entry(logger_, ss.str());
}
template<typename... Args>
void InstrumentedCallback::after_callback(Args&&... args) {
// Calculate execution time
auto end_time = std::chrono::high_resolution_clock::now();
auto duration = std::chrono::duration_cast<std::chrono::microseconds>(
end_time - start_time_).count();
// Log end of callback execution
std::ostringstream ss;
ss << "{\"operation\": \"end_work\", \"chain\": " << chain_id_
<< ", \"node\": \"" << node_name_ << "/" << callback_name_
<< "\", \"count\": 0, \"duration_us\": " << duration << "}";
log_entry(logger_, ss.str());
}
template<typename MessageT>
InstrumentedSubscriptionCallback<MessageT>::InstrumentedSubscriptionCallback(
const std::string& node_name,
const std::string& topic_name,
int chain_id,
CallbackT original_callback)
: InstrumentedCallback(node_name, topic_name, chain_id),
original_callback_(original_callback) {
}
InstrumentedTimerCallback::InstrumentedTimerCallback(
const std::string& node_name,
const std::string& timer_name,
int chain_id,
CallbackT original_callback)
: InstrumentedCallback(node_name, timer_name, chain_id),
original_callback_(original_callback) {
}
void InstrumentedTimerCallback::callback() {
before_callback();
original_callback_();
after_callback();
}
std::shared_ptr<rclcpp::Node> NodeInstrumenter::instrument_node(
std::shared_ptr<rclcpp::Node> original_node,
int chain_id) {
// For now, we just return the original node
// In a more advanced implementation, we might create a proxy node
return original_node;
}
template<typename MessageT>
typename rclcpp::Subscription<MessageT>::SharedPtr NodeInstrumenter::instrument_subscription(
std::shared_ptr<rclcpp::Node> node,
typename rclcpp::Subscription<MessageT>::SharedPtr original_subscription,
int chain_id) {
// To be implemented
return original_subscription;
}
rclcpp::TimerBase::SharedPtr NodeInstrumenter::instrument_timer(
std::shared_ptr<rclcpp::Node> node,
rclcpp::TimerBase::SharedPtr original_timer,
int chain_id) {
// To be implemented
return original_timer;
}
MetricsCollector& MetricsCollector::get_instance() {
static MetricsCollector instance;
return instance;
}
void MetricsCollector::register_callback(std::shared_ptr<InstrumentedCallback> callback) {
// To be implemented
}
void MetricsCollector::write_logs_to_file(const std::string& file_name, const std::string& experiment_name) {
// This would use similar code to the existing Experiment::writeLogsToFile method
}
template class InstrumentedSubscriptionCallback<std_msgs::msg::String>;
} // namespace existing_system_monitor

View file

@ -0,0 +1,132 @@
#include <chrono>
#include <memory>
#include <string>
#include "rclcpp/rclcpp.hpp"
#include "std_msgs/msg/string.hpp"
#include "casestudy_tools/primes_workload.hpp"
#include "existing_system_monitor/node_adapter.hpp"
#include "priority_executor/priority_executor.hpp"
#include "priority_executor/priority_memory_strategy.hpp"
using namespace std::chrono_literals;
// the topic
constexpr char const* TOPIC_NAME = "wildfire_talk";
// Modified Node - Inherit from InstrumentedNode instead of rclcpp::Node
class CustomTalkerNode : public existing_system_monitor::InstrumentedNode<rclcpp::Node> {
public:
// Pass chain_id to InstrumentedNode constructor
CustomTalkerNode(int chain_id, const std::string& node_name) : InstrumentedNode(chain_id, node_name) {
// The rest of the code stays the same!
timer_ = create_wall_timer(
std::chrono::milliseconds(100),
std::bind(&CustomTalkerNode::timer_callback, this));
publisher_ = this->create_publisher<std_msgs::msg::String>(TOPIC_NAME, 10);
timer_ = this->create_wall_timer(
500ms, std::bind(&CustomTalkerNode::timer_callback, this));
}
rclcpp::TimerBase::SharedPtr timer_;
private:
void timer_callback() {
auto message = std_msgs::msg::String();
message.data = "Hello, world! " + std::to_string(count_++);
RCLCPP_INFO(this->get_logger(), "Publishing: '%s'", message.data.c_str());
publisher_->publish(message);
}
rclcpp::Publisher<std_msgs::msg::String>::SharedPtr publisher_;
int count_;
};
// Modified Node - Inherit from InstrumentedNode instead of rclcpp::Node
class CustomListenerNode : public existing_system_monitor::InstrumentedNode<rclcpp::Node> {
public:
// Pass chain_id to InstrumentedNode constructor
CustomListenerNode(int chain_id, const std::string& node_name) : InstrumentedNode(chain_id, node_name) {
sub_ = this->create_subscription<std_msgs::msg::String>(
TOPIC_NAME, 10, std::bind(&CustomListenerNode::topic_callback, this, std::placeholders::_1));
}
rclcpp::Subscription<std_msgs::msg::String>::SharedPtr sub_;
private:
void topic_callback(const std_msgs::msg::String::SharedPtr msg) const {
RCLCPP_INFO(this->get_logger(), "I heard: '%s'", msg->data.c_str());
}
};
int main(int argc, char* argv[]) {
// calibrate the dummy load for the current system
calibrate_dummy_load();
rclcpp::init(argc, argv);
// Create nodes for the experiment
/*
auto talker = std::make_shared<existing_system_monitor::InstrumentedNode<CustomTalkerNode>>(0, "CustomTalkerNode");
auto listener1 = std::make_shared<existing_system_monitor::InstrumentedNode<CustomListenerNode>>(0, "CustomListenerNode");
auto listener2 = std::make_shared<existing_system_monitor::InstrumentedNode<CustomListenerNode>>(0, "CustomListenerNode");
*/
auto talker = std::make_shared<CustomTalkerNode>(0, "CustomTalkerNode");
auto listener1 = std::make_shared<CustomListenerNode>(0, "CustomListenerNode");
auto listener2 = std::make_shared<CustomListenerNode>(0, "CustomListenerNode");
rclcpp::ExecutorOptions options;
auto strategy = std::make_shared<priority_executor::PriorityMemoryStrategy<>>();
options.memory_strategy = strategy;
auto executor = new priority_executor::TimedExecutor(options);
// must be set to post_execute can set new deadlines
executor->prio_memory_strategy_ = strategy;
// the new funcitons in PriorityMemoryStrategy accept the handle of the
// timer/subscription as the first argument
strategy->set_executable_deadline(talker->timer_->get_timer_handle(), 1000,
priority_executor::ExecutableType::TIMER, 0);
// you _must_ set the timer_handle for each chain
strategy->get_priority_settings(talker->timer_->get_timer_handle())
->timer_handle = talker->timer_;
// you _must_ mark the first executable in the chain
strategy->set_first_in_chain(talker->timer_->get_timer_handle());
// set the same period and chain_id for each callback in the chain
strategy->set_executable_deadline(listener1->sub_->get_subscription_handle(),
1000, priority_executor::ExecutableType::SUBSCRIPTION, 0);
strategy->set_executable_deadline(listener2->sub_->get_subscription_handle(),
1000, priority_executor::ExecutableType::SUBSCRIPTION, 0);
// you _must_ mark the last executable in the chain (used to keep track of different instances of the same chain)
strategy->set_last_in_chain(listener2->sub_->get_subscription_handle());
// add all the nodes to the executor
executor->add_node(talker);
executor->add_node(listener1);
executor->add_node(listener2);
// if the executor behaves unexpectedly, you can print the priority settings to make sure they are correct
std::cout << *strategy->get_priority_settings(
talker->timer_->get_timer_handle())
<< std::endl;
std::cout << *strategy->get_priority_settings(
listener1->sub_->get_subscription_handle())
<< std::endl;
std::cout << *strategy->get_priority_settings(
listener2->sub_->get_subscription_handle())
<< std::endl;
executor->spin();
rclcpp::shutdown();
// Get all collected metrics and write them to a file
auto& collector = existing_system_monitor::MetricsCollector::get_instance();
collector.write_logs_to_file("my_system_analysis", "real_system_test");
return 0;
}