diff --git a/.vscode/settings.json b/.vscode/settings.json index 90df76a..c5df8dd 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -73,7 +73,8 @@ "codecvt": "cpp", "forward_list": "cpp", "valarray": "cpp", - "ddsi_sertopic.h": "c" + "ddsi_sertopic.h": "c", + "regex": "cpp" }, "cmake.ignoreCMakeListsMissing": true, "python.autoComplete.extraPaths": [ diff --git a/build.sh b/build.sh index cd45cf5..e04b80d 100755 --- a/build.sh +++ b/build.sh @@ -1,17 +1,16 @@ #!/usr/bin/env bash -set -eo pipefail +colcon build --packages-select tracetools --allow-overriding tracetools && \ + source install/setup.bash && \ + colcon build --packages-select cyclonedds --allow-overriding cyclonedds && \ + source install/setup.bash && \ + colcon build --packages-select rmw_cyclonedds_cpp --allow-overriding rmw_cyclonedds_cpp && \ + source install/setup.bash && \ + colcon build --packages-select rcl rcl_yaml_param_parser --allow-overriding rcl rcl_yaml_param_parser && \ + source install/setup.bash && \ + colcon build --packages-select rclcpp --allow-overriding rclcpp && \ + source install/setup.bash && \ + colcon build --packages-up-to full_topology && \ + source install/setup.bash -colcon build --packages-select tracetools -source install/setup.bash -colcon build --packages-select cyclonedds -source install/setup.bash -colcon build --packages-select rmw_cyclonedds_cpp -source install/setup.bash -colcon build --packages-select rcl rcl_yaml_param_parser -source install/setup.bash -colcon build --packages-select rclcpp -source install/setup.bash -colcon build -source install/setup.bash export RMW_IMPLEMENTATION=rmw_cyclonedds_cpp diff --git a/build_run_copy.sh b/build_run_copy.sh new file mode 100755 index 0000000..3e3e598 --- /dev/null +++ b/build_run_copy.sh @@ -0,0 +1,9 @@ +#!/usr/bin/env bash + +colcon build --packages-select full_topology && \ + source install/setup.bash && \ + TRACE_ID=$(taskset -c 2,3 ros2 launch full_topology trace_full_topology.launch.py length:=100 | \ + grep -oP '(?<=Writing tracing session to: /home/niklas/ROS-Dynamic-Executor-Experiments/analysis/tracing/full_topology_tracing-)\d+') && \ + ros2 trace-analysis convert "/home/niklas/ROS-Dynamic-Executor-Experiments/analysis/tracing/full_topology_tracing-${TRACE_ID}/ust" && \ + scp -r "analysis/tracing/full_topology_tracing-${TRACE_ID}/" 192.168.162.249:/home/niklas/dataflow-analysis/traces/ && \ + echo "Trace analysis \"${TRACE_ID}\" complete and copied to remote server." diff --git a/colcon_defaults.yaml b/colcon_defaults.yaml index 21a4048..4b18297 100755 --- a/colcon_defaults.yaml +++ b/colcon_defaults.yaml @@ -3,7 +3,7 @@ "symlink-install": true, "cmake-args": [ "-DCMAKE_BUILD_TYPE=RelWithDebInfo", - "-DCMAKE_EXPORT_COMPILE_COMMANDS=True", + "-DCMAKE_EXPORT_COMPILE_COMMANDS=ON", "-GNinja", # clang "-DCMAKE_C_COMPILER=clang-18", @@ -14,4 +14,4 @@ "event-handlers": ["console_cohesion+"], # "mixin": "clang", } -} \ No newline at end of file +} diff --git a/init.sh b/init.sh new file mode 100644 index 0000000..6df82ff --- /dev/null +++ b/init.sh @@ -0,0 +1,5 @@ +source /opt/ros/foxy/setup.bash +source ./venv38/bin/activate + +export MAKEFLAGS="-j 1" +export RMW_IMPLEMENTATION=rmw_cyclonedds_cpp diff --git a/src/casestudy/CMakeLists.txt b/src/casestudy/CMakeLists.txt deleted file mode 100755 index 8c9e324..0000000 --- a/src/casestudy/CMakeLists.txt +++ /dev/null @@ -1,78 +0,0 @@ -cmake_minimum_required(VERSION 3.8) -project(casestudy VERSION 0.1.0) - -# Set C++ standards -set(CMAKE_CXX_STANDARD 17) -set(CMAKE_CXX_STANDARD_REQUIRED ON) -set(CMAKE_CXX_EXTENSIONS OFF) - -# Compiler options -if(CMAKE_COMPILER_IS_GNUCXX OR CMAKE_CXX_COMPILER_ID MATCHES "Clang") - add_compile_options(-Wall -Wextra -Wpedantic) -endif() - -# Find dependencies -find_package(ament_cmake REQUIRED) -find_package(tracetools REQUIRED) -find_package(rclcpp REQUIRED) -find_package(priority_executor REQUIRED) -find_package(simple_timer REQUIRED) -find_package(jsoncpp REQUIRED) -find_package(casestudy_tools REQUIRED) - -# Define experiment targets -set(EXPERIMENTS - casestudy_2023customfile_singlethread - casestudy_2024ours_latency - casestudy_2024ours_executor2executor - casestudy_example - casestudy_monitored_example - casestudy_fire_drone -) - -# Function to create experiment targets -function(new_experiment experiment_name) - add_executable(${experiment_name} src/${experiment_name}.cpp) - target_include_directories(${experiment_name} PUBLIC - $ - $ - ) - - # Add dependencies with ament - ament_target_dependencies(${experiment_name} - tracetools - simple_timer - casestudy_tools - rclcpp - priority_executor - ) -endfunction() - -# Create all experiment targets -foreach(experiment ${EXPERIMENTS}) - new_experiment(${experiment}) -endforeach() - -# Special handling for targets with additional dependencies -target_link_libraries(casestudy_2023customfile_singlethread jsoncpp) - -# Installation -install( - TARGETS ${EXPERIMENTS} - RUNTIME DESTINATION lib/${PROJECT_NAME} -) - -install( - DIRECTORY include/ - DESTINATION include -) - -# Testing -if(BUILD_TESTING) - find_package(ament_lint_auto REQUIRED) - ament_lint_auto_find_test_dependencies() -endif() - -# Export and package configuration -ament_export_include_directories(include) -ament_package() diff --git a/src/casestudy/include/.gitignore b/src/casestudy/include/.gitignore deleted file mode 100644 index e14d801..0000000 --- a/src/casestudy/include/.gitignore +++ /dev/null @@ -1,3 +0,0 @@ -# We don't have content in this dir yet, but want it in git anyways -# So we add a non-empty .gitignore file, explicitly not ignoring itself -!.gitignore \ No newline at end of file diff --git a/src/casestudy/package.xml b/src/casestudy/package.xml deleted file mode 100755 index c82d8b6..0000000 --- a/src/casestudy/package.xml +++ /dev/null @@ -1,29 +0,0 @@ - - - - casestudy - 0.1.0 - - ROS 2 case study package for evaluating different executor implementations and timing behaviors - - kurt - Apache License 2.0 - - ament_cmake - - - tracetools - rclcpp - priority_executor - simple_timer - casestudy_tools - jsoncpp - - - ament_lint_auto - ament_lint_common - - - ament_cmake - - diff --git a/src/casestudy/src/casestudy_2023customfile_singlethread.cpp b/src/casestudy/src/casestudy_2023customfile_singlethread.cpp deleted file mode 100755 index 92f4960..0000000 --- a/src/casestudy/src/casestudy_2023customfile_singlethread.cpp +++ /dev/null @@ -1,164 +0,0 @@ -#include "rclcpp/rclcpp.hpp" -#include "priority_executor/priority_executor.hpp" -#include "priority_executor/priority_memory_strategy.hpp" -#include "casestudy_tools/test_nodes.hpp" -#include "std_msgs/msg/string.hpp" -#include "casestudy_tools/primes_workload.hpp" -#include "casestudy_tools/experiment.hpp" -#include "jsoncpp/json/reader.h" -#include -#include -#include -#include - -std::atomic should_do_task(true); - -void run_one_executor( - std::function&)> const& exec_fun, - int const idx) { - (void)idx; - - // set this process to highest priority - struct sched_param param; - param.sched_priority = sched_get_priority_max(SCHED_FIFO) - 1; - int const result = sched_setscheduler(0, SCHED_FIFO, ¶m); - if (result != 0) { - std::cout << "ros_experiment: sched_setscheduler failed: " << result - << ": " << strerror(errno) << std::endl; - } - - // set process name - prctl(PR_SET_NAME, "ros_experiment", 0, 0, 0); - exec_fun(should_do_task); -} - -int ros_experiment( - int const argc, - char** const argv, - std::string const& file_name, - ExperimentConfig config) { - - rclcpp::init(argc, argv); - - // we have to create a node to process any passed parameters - auto const node = rclcpp::Node::make_shared("experiment_node"); - node->declare_parameter("executor_type", "edf"); - - std::string executor_type; - node->get_parameter("executor_type", executor_type); - std::cout << "using executor type: " << executor_type << std::endl; - - config.executor_type = executor_type; - config.nodes.push_back(node); - - Experiment experiment(config); - auto const exec_funs = experiment.getRunFunctions(); - std::cout << "got " << exec_funs.size() << " executors" << std::endl; - - std::vector exec_threads; - int i = 0; - experiment.resetTimers(); - - for (auto const& exec_fun : exec_funs) { - exec_threads.emplace_back(run_one_executor, exec_fun, i++); - } - - for (auto& t : exec_threads) { - t.join(); - } - - experiment.writeLogsToFile(file_name, "2023customfile_singlethread"); - return 0; -} - -int main(int argc, char* argv[]) { - std::ifstream input_file("taskset.json"); - - if (!input_file.is_open()) { - std::cerr << "Error opening file!" << std::endl; - return 1; - } - - Json::Value root; - Json::Reader reader; - bool const parsingSuccessful = reader.parse(input_file, root); - - if (!parsingSuccessful) { - std::cerr << "Error parsing file!" << std::endl; - return 1; - } - - // set ourselves to the second highest priority - struct sched_param param; - param.sched_priority = sched_get_priority_max(SCHED_FIFO) - 2; - int const result = sched_setscheduler(0, SCHED_FIFO, ¶m); - if (result != 0) { - std::cout << "ros_experiment: sched_setscheduler failed: " << result - << ": " << strerror(errno) << std::endl; - } - - // calibrate the dummy load for the current system - calibrate_dummy_load(); - - ExperimentConfig config; - - for (uint i = 0; i < root["chain_lengths"].size(); i++) { - config.chain_lengths.push_back(root["chain_lengths"][i].asUInt()); - } - - for (uint i = 0; i < root["node_ids"].size(); i++) { - std::vector node_ids_row; - for (uint j = 0; j < root["node_ids"][i].size(); j++) { - node_ids_row.push_back(root["node_ids"][i][j].asUInt()); - } - config.callback_ids.push_back(std::move(node_ids_row)); - } - - for (uint i = 0; i < root["node_priorities"].size(); i++) { - config.callback_priorities.push_back(root["node_priorities"][i].asUInt()); - } - - config.num_groups = 0; - - for (uint i = 0; i < root["chain_timer_control"].size(); i++) { - config.chain_timer_control.push_back(root["chain_timer_control"][i].asInt()); - } - - for (uint i = 0; i < root["node_runtimes"].size(); i++) { - config.callback_runtimes.push_back(root["node_runtimes"][i].asDouble()); - } - - for (uint i = 0; i < root["chain_periods"].size(); i++) { - config.chain_periods.push_back(root["chain_periods"][i].asInt()); - } - - for (uint i = 0; i < root["node_executor_assignments"].size(); i++) { - config.callback_executor_assignments.push_back(root["node_executor_assignments"][i].asUInt() - 1); - } - - for (uint i = 0; i < root["executor_to_cpu_core"].size(); i++) { - config.executor_to_cpu_assignments.push_back(root["executor_to_cpu_core"][i].asUInt() - 1); - } - - config.parallel_mode = false; - config.cores = 4; - - int const i = (100 - 35) / 5; - int const ros_budget = 35 + i * 5; - int const other_task_budget = 65 - i * 5; - std::cout << "ROS budget: " << ros_budget - << " Other task budget: " << other_task_budget << std::endl; - - should_do_task.store(true); - double const other_task_time = 80 * other_task_budget / 100.0; - std::thread other_task(do_other_task, other_task_time, 80, std::ref(should_do_task)); - - std::string const file_name = "ros_" + std::to_string(ros_budget); - std::thread ros_task(ros_experiment, argc, argv, file_name, config); - std::cout << "tasks started" << std::endl; - ros_task.join(); - other_task.join(); - std::cout << "tasks joined" << std::endl; - - return 0; -} \ No newline at end of file diff --git a/src/casestudy/src/casestudy_2024ours_executor2executor.cpp b/src/casestudy/src/casestudy_2024ours_executor2executor.cpp deleted file mode 100755 index 368e493..0000000 --- a/src/casestudy/src/casestudy_2024ours_executor2executor.cpp +++ /dev/null @@ -1,197 +0,0 @@ -#include "casestudy_tools/experiment.hpp" -#include "casestudy_tools/primes_workload.hpp" -#include -#include -#include - -// bool should_do_task = true; -std::atomic should_do_task(true); - -void run_one_executor( - std::function&)> const& exec_fun, - int const idx) { - (void)idx; - // set this process to highest priority - // struct sched_param param; - // param.sched_priority = sched_get_priority_max(SCHED_FIFO) - 1; - // int result = sched_setscheduler(0, SCHED_FIFO, ¶m); - // if (result != 0) - // { - // std::cout << "ros_experiment: sched_setscheduler failed: " << result << - // ": " << strerror(errno) << std::endl; - // } - sched_attr rt_attr; - rt_attr.size = sizeof(rt_attr); - rt_attr.sched_flags = 0; - rt_attr.sched_nice = 0; - rt_attr.sched_priority = 99; - rt_attr.sched_policy = SCHED_FIFO; - rt_attr.sched_runtime = 0; - rt_attr.sched_period = 0; - - int const result = sched_setattr(0, &rt_attr, 0); - if (result != 0) { - std::cout << "executor task: could not set scheduler: " << result << ": " - << strerror(errno) << std::endl; - } - - // set process name - prctl(PR_SET_NAME, "ros_experiment", 0, 0, 0); - exec_fun(should_do_task); -} - -int ros_experiment( - rclcpp::Node::SharedPtr const node, - std::string const& file_name, - ExperimentConfig config) { - - std::string executor_type; - node->get_parameter("executor_type", executor_type); - int num_subscribers; - node->get_parameter("num_subscribers", num_subscribers); - int num_chains; - node->get_parameter("num_chains", num_chains); - std::cout << "using executor type: " << executor_type << std::endl; - - // TODO: move experiment configuration to main - config.executor_type = executor_type; - config.nodes.push_back(node); - - Experiment experiment(config); - // std::string result_log = experiment.run(should_do_task); - auto const exec_funs = experiment.getRunFunctions(); - std::cout << "got " << exec_funs.size() << " executors" << std::endl; - std::vector exec_threads; - int i = 0; - experiment.resetTimers(); - - for (auto const& exec_fun : exec_funs) { - exec_threads.emplace_back(run_one_executor, exec_fun, i++); - } - - for (auto& t : exec_threads) { - t.join(); - } - - std::string outputname = "2024ours_executor2executor_" + - std::to_string(num_subscribers) + "_" + - std::to_string(num_chains); - experiment.writeLogsToFile(file_name, outputname); - - return 0; -} - -int main(int argc, char* argv[]) { - rclcpp::init(argc, argv); - // we have to create a node to process any passed parameters - auto const node = rclcpp::Node::make_shared("experiment_node"); - // auto node = rclcpp::Node::make_shared("experiment_node", - // rclcpp::NodeOptions().use_intra_process_comms(true)); - node->declare_parameter("executor_type", "edf"); - node->declare_parameter("num_subscribers", 3); - node->declare_parameter("num_chains", 1); - node->declare_parameter("runtime", 0); - - int num_s; - node->get_parameter("num_subscribers", num_s); - int runtime; - node->get_parameter("runtime", runtime); - int num_chains; - node->get_parameter("num_chains", num_chains); - - // set ourselves to the second highest priority - // struct sched_param param; - // param.sched_priority = sched_get_priority_max(SCHED_FIFO) - 2; - // int result = sched_setscheduler(0, SCHED_FIFO, ¶m); - sched_attr rt_attr; - rt_attr.size = sizeof(rt_attr); - rt_attr.sched_policy = SCHED_FIFO; - // rt_attr.sched_priority = sched_get_priority_max(SCHED_FIFO) - 2; - rt_attr.sched_priority = 99; - int const result = sched_setattr(0, &rt_attr, 0); - - if (result != 0) { - std::cout << "ros_experiment: sched_setscheduler failed: " << result << ": " - << strerror(errno) << std::endl; - } - - // calibrate the dummy load for the current system - calibrate_dummy_load(); - - ExperimentConfig config; - - // runtimes and timers subject to change - config.chain_lengths = {}; - config.callback_ids = {}; - config.chain_timer_control = {}; - config.chain_periods = {}; - config.callback_runtimes = {}; - config.callback_executor_assignments = {}; - int node_id = 0; - for (int c = 0; c < num_chains; c++) { - config.callback_ids.push_back({}); - for (int i = 0; i < num_s + 1; i++) { - config.callback_ids[c].push_back(node_id); - if (i == 0) { - // first node is the publisher, and goes in the first executor - config.callback_executor_assignments.push_back(0); - } else { - // all other nodes go in the second executor - config.callback_executor_assignments.push_back(1); - } - node_id++; - } - for (int i = 0; i < num_s + 1; i++) { - config.callback_priorities.push_back(i); - config.callback_runtimes.push_back(runtime); - } - config.chain_lengths.push_back(num_s+1); - config.chain_periods.push_back(10); - config.chain_timer_control.push_back(c); - } - - config.executor_to_cpu_assignments = {0, 1}; - - std::cout << "node ids: " << std::endl; - for (size_t i = 0; i < config.callback_ids.size(); i++) { - for (size_t j = 0; j < config.callback_ids[i].size(); j++) { - std::cout << config.callback_ids[i][j] << " "; - } - std::cout << std::endl; - } - - - config.num_groups = 0; - config.group_memberships = {}; // no groups - - config.parallel_mode = false; - config.cores = 1; - config.special_cases.push_back("2024ours_latency"); - - // sanity_check_config(config); - - // from 35/65 to 100/0 in increments of 5 - // for (int i = 0; i <= (100 - 35) / 5; i++) - // { - // for now, run with 100% ros - int const i = (100 - 35) / 5; - int const ros_budget = 35 + i * 5; - int const other_task_budget = 65 - i * 5; - std::cout << "ROS budget: " << ros_budget - << " Other task budget: " << other_task_budget << std::endl; - // do other task in a separate thread - should_do_task.store(true); - - // do ros task - std::string const file_name = "ros_" + std::to_string(ros_budget); - // ros_experiment(argc, argv, file_name); - std::thread ros_task(ros_experiment, node, file_name, config); - std::cout << "tasks started" << std::endl; - ros_task.join(); - std::cout << "tasks joined" << std::endl; - - // // run once - // break; - // } - return 0; -} \ No newline at end of file diff --git a/src/casestudy/src/casestudy_2024ours_latency.cpp b/src/casestudy/src/casestudy_2024ours_latency.cpp deleted file mode 100755 index 373b939..0000000 --- a/src/casestudy/src/casestudy_2024ours_latency.cpp +++ /dev/null @@ -1,185 +0,0 @@ -#include "casestudy_tools/experiment.hpp" -#include "casestudy_tools/primes_workload.hpp" -#include -#include -#include - -// bool should_do_task = true; -std::atomic should_do_task(true); - -void run_one_executor(std::function&)> const& exec_fun, - int const idx) { - (void)idx; - // set this process to highest priority - // struct sched_param param; - // param.sched_priority = sched_get_priority_max(SCHED_FIFO) - 1; - // int result = sched_setscheduler(0, SCHED_FIFO, ¶m); - // if (result != 0) - // { - // std::cout << "ros_experiment: sched_setscheduler failed: " << result << - // ": " << strerror(errno) << std::endl; - // } - sched_attr rt_attr; - rt_attr.size = sizeof(rt_attr); - rt_attr.sched_flags = 0; - rt_attr.sched_nice = 0; - rt_attr.sched_priority = 99; - rt_attr.sched_policy = SCHED_FIFO; - rt_attr.sched_runtime = 0; - rt_attr.sched_period = 0; - int const result = sched_setattr(0, &rt_attr, 0); - if (result != 0) { - std::cout << "executor task: could not set scheduler: " << result << ": " - << strerror(errno) << std::endl; - } - - // set process name - prctl(PR_SET_NAME, "ros_experiment", 0, 0, 0); - - exec_fun(should_do_task); -} - -int ros_experiment(rclcpp::Node::SharedPtr const& node, std::string const& file_name, - ExperimentConfig config) { - std::string executor_type; - node->get_parameter("executor_type", executor_type); - int num_subscribers; - node->get_parameter("num_subscribers", num_subscribers); - int num_chains; - node->get_parameter("num_chains", num_chains); - std::cout << "using executor type: " << executor_type << std::endl; - - // TODO: move experiment configuration to main - config.executor_type = executor_type; - config.nodes.push_back(node); - - Experiment experiment(config); - // std::string result_log = experiment.run(should_do_task); - std::vector&)>> exec_funs = - experiment.getRunFunctions(); - std::cout << "got " << exec_funs.size() << " executors" << std::endl; - std::vector exec_threads; - int i = 0; - experiment.resetTimers(); - for (auto const& exec_fun : exec_funs) { - exec_threads.push_back(std::thread(run_one_executor, exec_fun, i)); - i++; - } - - for (auto& t : exec_threads) { - t.join(); - } - - std::string const outputname = "2024ours_latency_" + - std::to_string(num_subscribers) + "_" + std::to_string(num_chains); - experiment.writeLogsToFile(file_name, outputname); - - return 0; -} - -int main(int argc, char* argv[]) { - rclcpp::init(argc, argv); - // we have to create a node to process any passed parameters - auto const node = rclcpp::Node::make_shared("experiment_node"); - // auto node = rclcpp::Node::make_shared("experiment_node", - // rclcpp::NodeOptions().use_intra_process_comms(true)); - node->declare_parameter("executor_type", "edf"); - node->declare_parameter("num_subscribers", 3); - node->declare_parameter("num_chains", 1); - node->declare_parameter("runtime", 0); - - int num_s; - node->get_parameter("num_subscribers", num_s); - int runtime; - node->get_parameter("runtime", runtime); - int num_chains; - node->get_parameter("num_chains", num_chains); - - // set ourselves to the second highest priority - // struct sched_param param; - // param.sched_priority = sched_get_priority_max(SCHED_FIFO) - 2; - // int result = sched_setscheduler(0, SCHED_FIFO, ¶m); - sched_attr rt_attr; - rt_attr.size = sizeof(rt_attr); - rt_attr.sched_policy = SCHED_FIFO; - // rt_attr.sched_priority = sched_get_priority_max(SCHED_FIFO) - 2; - rt_attr.sched_priority = 99; - int const result = sched_setattr(0, &rt_attr, 0); - - if (result != 0) { - std::cout << "ros_experiment: sched_setscheduler failed: " << result << ": " - << strerror(errno) << std::endl; - } - // calibrate the dummy load for the current system - calibrate_dummy_load(); - - ExperimentConfig config; - - // runtimes and timers subject to change - config.chain_lengths = {}; - config.callback_ids = {}; - config.chain_timer_control = {}; - config.chain_periods = {}; - config.callback_runtimes = {}; - int node_id = 0; - for (int c = 0; c < num_chains; c++) { - config.callback_ids.push_back({}); - for (int i = 0; i < num_s + 1; i++) { - config.callback_ids[c].push_back(node_id); - node_id++; - } - for (int i = 0; i < num_s + 1; i++) { - config.callback_priorities.push_back(i); - } - config.chain_lengths.push_back(num_s + 1); - config.chain_periods.push_back(10); - for (int i = 0; i < num_s + 1; i++) { - config.callback_runtimes.push_back(runtime); - } - config.chain_timer_control.push_back(c); - } - - std::cout << "node ids: " << std::endl; - for (size_t i = 0; i < config.callback_ids.size(); i++) { - for (size_t j = 0; j < config.callback_ids[i].size(); j++) { - std::cout << config.callback_ids[i][j] << " "; - } - std::cout << std::endl; - } - - config.num_groups = 0; - config.group_memberships = {}; // no groups - - config.callback_executor_assignments = {}; - // config.node_executor_assignments = {0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}; - config.parallel_mode = false; - config.cores = 1; - config.special_cases.push_back("2024ours_latency"); - - // sanity_check_config(config); - - // from 35/65 to 100/0 in increments of 5 - // for (int i = 0; i <= (100 - 35) / 5; i++) - // { - // for now, run with 100% ros - int const i = (100 - 35) / 5; - int const ros_budget = 35 + i * 5; - int const other_task_budget = 65 - i * 5; - std::cout << "ROS budget: " << ros_budget - << " Other task budget: " << other_task_budget << std::endl; - // do other task in a separate thread - should_do_task.store(true); - - // do ros task - std::string const file_name = "ros_" + std::to_string(ros_budget); - // ros_experiment(argc, argv, file_name); - std::thread ros_task(ros_experiment, node, file_name, config); - std::cout << "tasks started" << std::endl; - ros_task.join(); - std::cout << "tasks joined" << std::endl; - - // // run once - // break; - // } - return 0; -} \ No newline at end of file diff --git a/src/casestudy/src/casestudy_example.cpp b/src/casestudy/src/casestudy_example.cpp deleted file mode 100644 index 776036c..0000000 --- a/src/casestudy/src/casestudy_example.cpp +++ /dev/null @@ -1,139 +0,0 @@ -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include "casestudy_tools/test_nodes.hpp" -#include "casestudy_tools/experiment.hpp" -#include "casestudy_tools/primes_workload.hpp" -#include "priority_executor/priority_executor.hpp" -#include "priority_executor/priority_memory_strategy.hpp" - -// bool should_do_task = true; -std::atomic should_do_task(true); - -void run_one_executor( - std::function&)> const& exec_fun, - int const idx) { - (void)idx; - // cpu_set_t cpuset; - // CPU_ZERO(&cpuset); - // CPU_SET(0, &cpuset); - // CPU_SET(1, &cpuset); - // CPU_SET(2, &cpuset); - // CPU_SET(3, &cpuset); - - // int result = sched_setaffinity(0, sizeof(cpu_set_t), &cpuset); - // if (result != 0) - // { - // std::cout << "ros_experiment: sched_setaffinity failed: " << result << ": " << strerror(errno) << std::endl; - // } - - // set this process to second highest priority - struct sched_param param; - param.sched_priority = sched_get_priority_max(SCHED_FIFO) - 2; - int const result = sched_setscheduler(0, SCHED_FIFO, ¶m); - - if (result != 0) { - std::cerr << "ros_experiment: sched_setscheduler failed: " << result - << ": " << strerror(errno) << std::endl; - std::cerr << "This operation requires root privileges. Please run the program with sufficient permissions." - << std::endl; -#ifndef WIN_DOCKER_IS_BROKEN - exit(EXIT_FAILURE); -#else - // Windows Docker workaround: just print the error and continue - std::cerr << "Continuing without setting scheduler, fuck Windows" << std::endl; -#endif - } - - // Set the process name to "ros_experiment" for easier identification during debugging or system monitoring. - prctl(PR_SET_NAME, "ros_experiment", 0, 0, 0); - exec_fun(should_do_task); -} - -int ros_experiment( - int const argc, - char** const argv, - std::string const& file_name, - ExperimentConfig config) { - - rclcpp::init(argc, argv); - - // we have to create a node to process any passed parameters - auto const node = rclcpp::Node::make_shared("experiment_parameters"); - node->declare_parameter("executor_type", "edf"); - - std::string executor_type; - node->get_parameter("executor_type", executor_type); - std::cout << "using executor type: " << executor_type << std::endl; - - config.executor_type = executor_type; - // Enable parallel mode for the experiment configuration. - // When set to true, this allows multiple executors to run concurrently, - // leveraging multi-threading to improve performance on multi-core systems. - config.parallel_mode = true; - config.nodes.push_back(node); - - Experiment experiment(config); - // std::string result_log = experiment.run(should_do_task); - auto const exec_funs = experiment.getRunFunctions(); - std::cout << "got " << exec_funs.size() << " executors" << std::endl; - - std::vector exec_threads; - int i = 0; - experiment.resetTimers(); - - for (auto const& exec_fun : exec_funs) { - exec_threads.emplace_back(run_one_executor, exec_fun, i++); - } - - for (auto& t : exec_threads) { - t.join(); - } - - std::string const outputname = "casestudy_example"; - experiment.writeLogsToFile(file_name, outputname); - - return 0; -} - -int main(int argc, char* argv[]) { - // calibrate the dummy load for the current system - calibrate_dummy_load(); - - ExperimentConfig config; - config.chain_lengths = {2, 2}; - config.callback_ids = {{0, 1}, {2, 3}}; - config.callback_priorities = {1, 0, 3, 2}; - config.chain_timer_control = {0, 1}; - - config.callback_runtimes = {10, 10, 10, 10}; - // node 0 has a period of 80, and is the only timer - config.chain_periods = {100, 100}; - config.callback_executor_assignments = {}; - config.parallel_mode = true; - config.cores = 2; - - sanity_check_config(config); - - std::thread ros_task([&]() { - try { - ros_experiment(argc, argv, "cs_example", config); - } catch (std::exception const& e) { - std::cerr << "Exception in ros_experiment: " << e.what() << std::endl; - } catch (...) { - std::cerr << "Unknown exception in ros_experiment." << std::endl; - } - }); - - std::cout << "tasks started" << std::endl; - ros_task.join(); - std::cout << "tasks joined" << std::endl; - - return 0; -} \ No newline at end of file diff --git a/src/casestudy/src/casestudy_fire_drone.cpp b/src/casestudy/src/casestudy_fire_drone.cpp deleted file mode 100644 index d78df79..0000000 --- a/src/casestudy/src/casestudy_fire_drone.cpp +++ /dev/null @@ -1,189 +0,0 @@ -#include "casestudy_tools/experiment.hpp" -#include "casestudy_tools/primes_workload.hpp" - -#include "priority_executor/priority_executor.hpp" -#include "priority_executor/priority_memory_strategy.hpp" - -#include - -#include -#include -#include -#include -#include - -#include - -// bool should_do_task = true; -std::atomic should_do_task(true); - -void run_one_executor( - std::function&)> const& exec_fun, - int const idx) { - (void)idx; - // cpu_set_t cpuset; - // CPU_ZERO(&cpuset); - // CPU_SET(0, &cpuset); - // CPU_SET(1, &cpuset); - // CPU_SET(2, &cpuset); - // CPU_SET(3, &cpuset); - - // int result = sched_setaffinity(0, sizeof(cpu_set_t), &cpuset); - // if (result != 0) - // { - // std::cout << "ros_experiment: sched_setaffinity failed: " << result << ": " << strerror(errno) << std::endl; - // } - - // set this process to second highest priority - struct sched_param param; - param.sched_priority = sched_get_priority_max(SCHED_FIFO) - 2; - int const result = sched_setscheduler(0, SCHED_FIFO, ¶m); - - if (result != 0) { - std::cerr << "ros_experiment: sched_setscheduler failed: " << result - << ": " << strerror(errno) << std::endl; - std::cerr << "This operation requires root privileges. Please run the program with sufficient permissions." - << std::endl; -#ifndef WIN_DOCKER_IS_BROKEN - exit(EXIT_FAILURE); -#else - // Windows Docker workaround: just print the error and continue - std::cerr << "Continuing without setting scheduler, fuck Windows" << std::endl; -#endif - } - - // Set the process name to "ros_experiment" for easier identification during debugging or system monitoring. - prctl(PR_SET_NAME, "ros_experiment", 0, 0, 0); - exec_fun(should_do_task); -} - -int ros_experiment( - int const argc, - char** const argv, - std::string const& file_name, - ExperimentConfig config) -{ - rclcpp::init(argc, argv); - - // we have to create a node to process any passed parameters - auto const node = rclcpp::Node::make_shared("experiment_parameters"); - node->declare_parameter("executor_type", "edf"); - - std::string executor_type; - node->get_parameter("executor_type", executor_type); - std::cout << "using executor type: " << executor_type << std::endl; - - config.executor_type = executor_type; - // Enable parallel mode for the experiment configuration. - // When set to true, this allows multiple executors to run concurrently, - // leveraging multi-threading to improve performance on multi-core systems. - config.parallel_mode = true; - config.nodes.push_back(node); - - Experiment experiment(config); - // std::string result_log = experiment.run(should_do_task); - auto const exec_funs = experiment.getRunFunctions(); - std::cout << "got " << exec_funs.size() << " executors" << std::endl; - - std::vector exec_threads; - int i = 0; - experiment.resetTimers(); - - for (auto const& exec_fun : exec_funs) { - exec_threads.emplace_back(run_one_executor, exec_fun, i++); - } - - for (auto& t : exec_threads) { - t.join(); - } - - std::string const outputname = "casestudy_wildfire_drone"; - experiment.writeLogsToFile(file_name, outputname); - -/* - // Create a PriorityMemoryStrategy instance - auto strat = std::make_shared>(); - - // Atomic flags to control the chains - std::atomic fire_detection_running{true}; - std::atomic image_processing_running{true}; - std::atomic data_streaming_running{true}; - std::atomic flight_control_running{true}; - - // Launch chains in separate threads - - - // Run for a fixed duration (e.g., 10 seconds) - std::this_thread::sleep_for(std::chrono::seconds(10)); - - // Stop all chains - fire_detection_running.store(false); - image_processing_running.store(false); - data_streaming_running.store(false); - flight_control_running.store(false); - - // Join threads - fire_detection_thread.join(); - image_processing_thread.join(); - data_streaming_thread.join(); - flight_control_thread.join(); - - std::cout << "All chains stopped. Exiting..." << std::endl; -*/ - - rclcpp::shutdown(); - return 0; -} - -int main(int argc, char* argv[]) { - // calibrate the dummy load for the current system - calibrate_dummy_load(); - - ExperimentConfig config; - // call back id: 0 1 2 3 4 5 6 7 8 9 10 11 12 13 - config.callback_priorities = { 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}; - // duration of the callback in ms - config.callback_runtimes = { 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10}; - - // we have four chains where - // - the first is 1 + numOf(radiometric corrections) + 1 long - // - the second is 1 + numOf(radiometric corrections) + numOf(geometric corrections) + 1 long - // - the third is 3 long - // - the fourth is 4 long - config.callback_ids = { - {1, 2, 3, 0}, - {1, 2, 3, 4, 5, 6}, - {1, 2, 3, 4, 5, 6, 9, 7, 8}, - {9, 12, 13, 10, 11} - }; - config.chain_lengths = {4, 6, 9, 5}; - - // period of the timer of the starting node in ms - thus basically sensor frequency - config.chain_periods = {100, 100, 100, 100}; - config.chain_timer_control = {0, 0, 0, 0}; - - // TODO: ? - config.callback_executor_assignments = {}; - config.parallel_mode = false; - config.cores = 1; - - // TODO: you can add nodes to the config - what does it do? do we want that? - - sanity_check_config(config); - - std::thread ros_task([&]() { - try { - ros_experiment(argc, argv, "cs_example", config); - } catch (std::exception const& e) { - std::cerr << "Exception in ros_experiment: " << e.what() << std::endl; - } catch (...) { - std::cerr << "Unknown exception in ros_experiment." << std::endl; - } - }); - - std::cout << "tasks started" << std::endl; - ros_task.join(); - std::cout << "tasks joined" << std::endl; - - return 0; -} diff --git a/src/casestudy/src/casestudy_monitored_example.cpp b/src/casestudy/src/casestudy_monitored_example.cpp deleted file mode 100644 index d6ead78..0000000 --- a/src/casestudy/src/casestudy_monitored_example.cpp +++ /dev/null @@ -1,175 +0,0 @@ -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include "casestudy_tools/test_nodes.hpp" -#include "casestudy_tools/experiment.hpp" -#include "casestudy_tools/primes_workload.hpp" -#include "priority_executor/priority_executor.hpp" -#include "priority_executor/priority_memory_strategy.hpp" - -std::atomic should_do_task(true); - -// Global performance monitoring configuration -constexpr size_t MONITOR_BUFFER_SIZE = 10000; -constexpr size_t MONITOR_AUTO_DUMP_THRESHOLD = 5000; -const std::string MONITOR_OUTPUT_DIR = "performance_logs/monitored_example"; - -void configure_executor_monitoring(std::shared_ptr executor, - const std::string& name) { - executor->setMonitoringOptions( - MONITOR_BUFFER_SIZE, - MONITOR_AUTO_DUMP_THRESHOLD, - MONITOR_OUTPUT_DIR + "/" + name - ); - - // Enable monitoring by default - executor->enableMonitoring(true); -} - -void run_one_executor( - std::function&)> const& exec_fun, - int const idx) { - // Set up process scheduling - struct sched_param param; - param.sched_priority = sched_get_priority_max(SCHED_FIFO) - 2; - int const result = sched_setscheduler(0, SCHED_FIFO, ¶m); - - if (result != 0) { - std::cerr << "ros_experiment: sched_setscheduler failed: " << result - << ": " << strerror(errno) << std::endl; - std::cerr << "This operation requires root privileges. Please run the program with sufficient permissions." - << std::endl; -#ifndef WIN_DOCKER_IS_BROKEN - exit(EXIT_FAILURE); -#else - std::cerr << "Continuing without setting scheduler, fuck Windows" << std::endl; -#endif - } - - prctl(PR_SET_NAME, "ros_experiment", 0, 0, 0); - exec_fun(should_do_task); -} - -int ros_experiment( - int const argc, - char** const argv, - std::string const& file_name, - ExperimentConfig config) { - - rclcpp::init(argc, argv); - - auto const node = rclcpp::Node::make_shared("experiment_parameters"); - node->declare_parameter("executor_type", "edf"); - - std::string executor_type; - node->get_parameter("executor_type", executor_type); - std::cout << "using executor type: " << executor_type << std::endl; - - config.executor_type = executor_type; - config.parallel_mode = true; - config.nodes.push_back(node); - - Experiment experiment(config); - auto const exec_funs = experiment.getRunFunctions(); - std::cout << "got " << exec_funs.size() << " executors" << std::endl; - - // Configure monitoring for each executor - int executor_id = 0; - std::vector> monitored_executors; - for (const auto& executor : experiment.getExecutors()) { - if (auto timed_exec = std::dynamic_pointer_cast(executor.executor)) { - std::string exec_name = "executor_" + std::to_string(executor_id++); - configure_executor_monitoring(timed_exec, exec_name); - monitored_executors.push_back(timed_exec); - } - } - - std::vector exec_threads; - int i = 0; - experiment.resetTimers(); - - for (auto const& exec_fun : exec_funs) { - exec_threads.emplace_back(run_one_executor, exec_fun, i++); - } - - for (auto& t : exec_threads) { - t.join(); - } - - // Write performance reports for each executor - std::ofstream report_file(MONITOR_OUTPUT_DIR + "/performance_report.txt"); - if (report_file.is_open()) { - report_file << "===========================\n"; - report_file << " Performance Monitor Report\n"; - report_file << "===========================\n\n"; - - for (const auto& executor : monitored_executors) { - report_file << "Executor: " << executor->getExecutorName() - << " (ID: " << executor->getExecutorId() << ")\n"; - report_file << "----------------------------------------\n"; - report_file << executor->getCallbackExecutionReport() << "\n\n"; - } - - report_file.close(); - std::cout << "Performance report written to " << MONITOR_OUTPUT_DIR - << "/performance_report.txt" << std::endl; - } else { - std::cerr << "Failed to open performance report file" << std::endl; - } - - // Ensure all performance logs are written before experiment logs - auto& monitor = priority_executor::PerformanceMonitor::getInstance(); - monitor.dumpToFile("final_dump.json"); - - std::string const outputname = "monitored_example"; - experiment.writeLogsToFile(file_name, outputname); - - return 0; -} - -int main(int argc, char* argv[]) { - // Create the performance logs directory - std::filesystem::create_directories(MONITOR_OUTPUT_DIR); - - // calibrate the dummy load for the current system - calibrate_dummy_load(); - - ExperimentConfig config; - // Set up two chains of callbacks to demonstrate monitoring - config.chain_lengths = {2, 2}; - config.callback_ids = {{0, 1}, {2, 3}}; - config.callback_priorities = {1, 0, 3, 2}; - config.chain_timer_control = {0, 1}; - - // Set different runtimes to see varying processing times in the logs - config.callback_runtimes = {10, 20, 15, 25}; - config.chain_periods = {100, 100}; - config.callback_executor_assignments = {}; - config.parallel_mode = true; - config.cores = 2; - - sanity_check_config(config); - - std::thread ros_task([&]() { - try { - ros_experiment(argc, argv, "monitored_example", config); - } catch (std::exception const& e) { - std::cerr << "Exception in ros_experiment: " << e.what() << std::endl; - } catch (...) { - std::cerr << "Unknown exception in ros_experiment." << std::endl; - } - }); - - std::cout << "tasks started" << std::endl; - ros_task.join(); - std::cout << "tasks joined" << std::endl; - - return 0; -} \ No newline at end of file diff --git a/src/casestudy_tools/CMakeLists.txt b/src/casestudy_tools/CMakeLists.txt deleted file mode 100755 index 6b4c541..0000000 --- a/src/casestudy_tools/CMakeLists.txt +++ /dev/null @@ -1,91 +0,0 @@ -cmake_minimum_required(VERSION 3.8) -project(casestudy_tools VERSION 0.1.0) - -# Set C++ standards -set(CMAKE_CXX_STANDARD 17) -set(CMAKE_CXX_STANDARD_REQUIRED ON) -set(CMAKE_CXX_EXTENSIONS OFF) - -# Compiler options -if(CMAKE_COMPILER_IS_GNUCXX OR CMAKE_CXX_COMPILER_ID MATCHES "Clang") - add_compile_options(-Wall -Wextra -Wpedantic) -endif() - -# Find dependencies -find_package(ament_cmake REQUIRED) -find_package(tracetools REQUIRED) -find_package(rclcpp REQUIRED) -find_package(priority_executor REQUIRED) -find_package(simple_timer REQUIRED) - -# Library targets -add_library(primes_workload - src/primes_workload.cpp -) -target_include_directories(primes_workload PUBLIC - $ - $ -) -ament_target_dependencies(primes_workload - tracetools - simple_timer - rclcpp - priority_executor -) - -add_library(test_nodes src/test_nodes.cpp) -target_include_directories(test_nodes PUBLIC - $ - $ -) -target_link_libraries(test_nodes primes_workload) -ament_target_dependencies(test_nodes - tracetools - rclcpp - simple_timer -) -add_library(experiment_host - src/experiment.cpp -) -target_include_directories(experiment_host PUBLIC - $ - $ -) -target_link_libraries(experiment_host - primes_workload - test_nodes -) -ament_target_dependencies(experiment_host - tracetools - rclcpp - simple_timer - priority_executor -) - -# Testing -if(BUILD_TESTING) - find_package(ament_lint_auto REQUIRED) - ament_lint_auto_find_test_dependencies() -endif() - -# Installation -install( - DIRECTORY include/ - DESTINATION include -) - -install( - TARGETS primes_workload test_nodes experiment_host - EXPORT export_${PROJECT_NAME} - ARCHIVE DESTINATION lib - LIBRARY DESTINATION lib - RUNTIME DESTINATION bin - INCLUDES DESTINATION include -) - -# Export and package configuration -ament_export_include_directories(include) -# ament_export_libraries(primes_workload test_nodes experiment_host) -ament_export_targets(export_${PROJECT_NAME} HAS_LIBRARY_TARGET) -ament_export_dependencies(rclcpp priority_executor simple_timer) -ament_package() diff --git a/src/casestudy_tools/include/casestudy_tools/experiment.hpp b/src/casestudy_tools/include/casestudy_tools/experiment.hpp deleted file mode 100755 index 90c2524..0000000 --- a/src/casestudy_tools/include/casestudy_tools/experiment.hpp +++ /dev/null @@ -1,149 +0,0 @@ -#ifndef RTIS_EXPERIMENT_HOST -#define RTIS_EXPERIMENT_HOST - -#include -#include -#include -#include "priority_executor/priority_executor.hpp" -#include "casestudy_tools/test_nodes.hpp" -#include "simple_timer/rt-sched.hpp" - -struct ExperimentConfig -{ - std::vector chain_lengths; - std::vector> callback_ids; - - // duration of the callback in ms - std::vector callback_runtimes; - - // used for picas or in edf as tie breaker - std::vector callback_priorities; - - // period of the timer of the starting node in ms - std::vector chain_periods; - - // for each chain, the index of the timer node - TODO: why would I ever want this to be non 0? - std::vector chain_timer_control; - - uint num_groups = 0; - std::vector group_memberships; - - // TODO: change to enum - // can be "default", "picas", "edf" - std::string executor_type = "default"; - - // if this is empty, then the nodes will be assigned to a single executor - std::vector callback_executor_assignments; - - // used in single-threaded mode to assign executors to cores - std::vector executor_to_cpu_assignments; - - bool parallel_mode = false; - size_t cores = 2; - - // nodes to host the callbacks. In parallel (one executor using multiple threads) mode, - // there is one node and one executor. The node is provided by the experiment provider. - // In singlethread (each executor uses one thread), there is one node per executor. - // Additional nodes may be created by the experiment host. - std::vector nodes; - std::vector special_cases; -}; - -struct experiment_executor -{ - std::shared_ptr executor; - std::shared_ptr> strat; - std::shared_ptr default_executor; -}; - -class Experiment -{ -public: - explicit Experiment(ExperimentConfig config); - - /** - * @brief Run the experiment in the current thread - * @param should_do_task a flag that can be set to false to stop the experiment. - * The flag will be set to false when the experiment is complete. - * @return std::string containing experiment results - */ - std::string run(std::atomic &should_do_task); - - /** - * @brief Get the functions required to run the experiment - * @return vector of functions. Each function will run a different executor. - * It's up to the caller to run these functions in different threads. - * The caller may set the thread properties (e.g. priority) before running the function. - * Each function accepts a flag that can be set to false to stop the experiment. - * The flag will be set to false when the experiment is complete. - */ - std::vector &)>> getRunFunctions(); - - static size_t numExecutorsRequired(ExperimentConfig const &config); - - /** - * @brief Get the executor (and strategy) for a given executor index - * @param executor_idx The index of the executor (defaults to 0) - * @return experiment_executor - */ - experiment_executor getExecutor(int executor_idx = 0); - - /** - * @brief Get the executors - * @return experiment_executors - */ - std::vector getExecutors(); - - node_time_logger getInternalLogger(); - - /** - * @brief Collect logs from the executors and optionally add logs from passed in loggers - * @param loggers Optional array of loggers to add to the logs - * @return std::string containing the combined logs - */ - std::string buildLogs(std::vector loggers = {}); - - void resetTimers(); - void writeLogsToFile(std::string const &filename, std::string const &experiment_name); - -private: - ExperimentConfig config; - std::vector executors; - - // Chain-indexed vector of deques of deadlines - std::vector *> chain_deadlines_deque; - // Maps node_id to node - std::map> nodes; - // Maps chain_id to timer - std::map> chain_timer_handles; - - /** - * @brief Create the executors using the type and amount set in the config - */ - void createExecutors(); - - /** - * @brief Create a single executor using the type set in the config - * @param executor_num The executor number to create - * @return experiment_executor - */ - experiment_executor createSingleExecutor(uint executor_num = 0); - - /** - * @brief Get the executor for a given node id - * @param node_id If node_executor_assignments is empty, this parameter is ignored - * and the first executor is returned - * @return experiment_executor - */ - experiment_executor getExecutorForNode(int node_id = 0); - - void createNodesAndAssignProperties(); - void setInitialDeadlines(); - - node_time_logger _logger; -}; - -int do_other_task(double work_time, uint period, std::atomic &should_do_task); -void sanity_check_config(ExperimentConfig const &config); - -#endif // RTIS_EXPERIMENT_HOST \ No newline at end of file diff --git a/src/casestudy_tools/include/casestudy_tools/primes_workload.hpp b/src/casestudy_tools/include/casestudy_tools/primes_workload.hpp deleted file mode 100755 index 058c738..0000000 --- a/src/casestudy_tools/include/casestudy_tools/primes_workload.hpp +++ /dev/null @@ -1,44 +0,0 @@ -#ifndef RTIS_PRIMES_WORKLOAD -#define RTIS_PRIMES_WORKLOAD - -#include -#include - -#include -#include - -// Parameters for the dummy load calibration -constexpr int TARGET_DURATION_MS = 100; -constexpr int MAX_ITERATIONS = 100; -constexpr int ACCEPTABLE_MARGIN_US = 500; - -/** - * @brief Dummy load calibration namespace. - * This namespace contains functions and variables for calibrating the dummy load. - * It uses an atomic integer to store the calibration factor. - * The calibration factor is used to adjust the workload of the dummy load function. - */ -namespace DummyLoadCalibration -{ - inline std::atomic calibration{1}; - - inline void setCalibration(int const calib); - - inline int getCalibration(); -} - -/** - * @brief Dummy load function to simulate a workload. - * This function performs a busy wait for a specified duration. - * It is used to calibrate the dummy load for the current system. - */ -void calibrate_dummy_load(); - -/** - * @brief Dummy load function to simulate a workload. - * This function performs a busy wait for a specified duration. - * It is used to calibrate the dummy load for the current system. - */ -void dummy_load(int const load_ms); - -#endif \ No newline at end of file diff --git a/src/casestudy_tools/include/casestudy_tools/test_nodes.hpp b/src/casestudy_tools/include/casestudy_tools/test_nodes.hpp deleted file mode 100755 index b5ea038..0000000 --- a/src/casestudy_tools/include/casestudy_tools/test_nodes.hpp +++ /dev/null @@ -1,66 +0,0 @@ -#ifndef RTIS_TEST_NODES -#define RTIS_TEST_NODES - -#include -#include "rclcpp/rclcpp.hpp" -#include "std_msgs/msg/string.hpp" -#include "std_msgs/msg/int32.hpp" -#include "simple_timer/rt-sched.hpp" - -class CaseStudyNode { -public: - node_time_logger logger; - // inherit constructor - CaseStudyNode(std::string const &publish_topic, rclcpp::Node::SharedPtr const &node) { - this->name = publish_topic; - this->node = node; - // seed random number generator - srand(time(nullptr)); - } - - bool use_random_runtime = false; - double normal_runtime_scale = 0.8; - double over_runtime_scale = 1.2; - double runtime_over_chance = 0.1; - std::string get_name(); - rclcpp::CallbackGroup::SharedPtr callback_group_; - -private: - rclcpp::Node::SharedPtr node; - std::string name; -}; - -#define COUNT_MAX 500 - -class PublisherNode : public CaseStudyNode { -public: - PublisherNode(std::string const &publish_topic, double runtime, int period, int chain, - rclcpp::Node::SharedPtr const &node, - rclcpp::CallbackGroup::SharedPtr const &callback_group = nullptr); - rclcpp::TimerBase::SharedPtr timer_; - int count_max_ = COUNT_MAX; - -private: - rclcpp::Publisher::SharedPtr pub_; - double runtime_; - int period_; - int chain_; -}; - -class WorkerNode : public CaseStudyNode { -public: - WorkerNode(std::string const &subscribe_topic, std::string const &publish_topic, - double runtime, int period, int chain, - rclcpp::Node::SharedPtr const &node, - rclcpp::CallbackGroup::SharedPtr const &callback_group = nullptr); - rclcpp::Subscription::SharedPtr sub_; - int count_max_ = COUNT_MAX; - -private: - double runtime_; - int period_; - int chain_; - rclcpp::Publisher::SharedPtr pub_; -}; - -#endif // RTIS_TEST_NODES \ No newline at end of file diff --git a/src/casestudy_tools/package.xml b/src/casestudy_tools/package.xml deleted file mode 100755 index 33bb1bd..0000000 --- a/src/casestudy_tools/package.xml +++ /dev/null @@ -1,25 +0,0 @@ - - - - casestudy_tools - 0.1.0 - - ROS 2 package providing utility tools and common functionality for dynamic executor case studies - - kurt - Apache License 2.0 - - ament_cmake - - tracetools - rclcpp - priority_executor - simple_timer - - ament_lint_auto - ament_lint_common - - - ament_cmake - - diff --git a/src/casestudy_tools/src/experiment.cpp b/src/casestudy_tools/src/experiment.cpp deleted file mode 100755 index dbbed77..0000000 --- a/src/casestudy_tools/src/experiment.cpp +++ /dev/null @@ -1,614 +0,0 @@ -#include "casestudy_tools/experiment.hpp" -#include "casestudy_tools/test_nodes.hpp" -#include "casestudy_tools/primes_workload.hpp" - -#include "priority_executor/priority_executor.hpp" -#include "priority_executor/priority_memory_strategy.hpp" -#include "priority_executor/multithread_priority_executor.hpp" - -#include -#include -#include -#include -#include -#include -#include -#include - - -Experiment::Experiment(ExperimentConfig config) - : config(std::move(config)) { - _logger = create_logger(); -} - -void sanity_check_config(ExperimentConfig const &config) { - // make sure chain_lengths and node_ids are the same size - if (config.chain_lengths.size() != config.callback_ids.size()) { - std::cout << "ros_experiment: chain_lengths.size()= " - << config.chain_lengths.size() - << " != node_ids.size()= " << config.callback_ids.size() << std::endl; - exit(1); - } - - // make sure each chain_lengths is the same size as the corresponding node_ids - for (uint32_t i = 0; i < config.chain_lengths.size(); i++) { - if (config.chain_lengths[i] != config.callback_ids[i].size()) { - std::cout << "ros_experiment: chain_lengths[" << i - << "]= " << config.chain_lengths[i] << " != node_ids[" << i - << "].size()= " << config.callback_ids[i].size() << std::endl; - exit(1); - } - } - - // make sure chain_timer_control is the same size as chain_lengths - if (config.chain_timer_control.size() != config.chain_lengths.size()) { - std::cout << "ros_experiment: chain_timer_control.size()= " - << config.chain_timer_control.size() - << " != chain_lengths.size()= " << config.chain_lengths.size() - << std::endl; - exit(1); - } - - std::set all_node_ids; - for (auto const &chain : config.callback_ids) { - for (auto const &node_id : chain) { - all_node_ids.insert(node_id); - } - } - - // make sure we have the right number of node_priorities and node_runtimes - if (all_node_ids.size() != config.callback_priorities.size()) { - std::cout << "ros_experiment: all_node_ids.size()= " << all_node_ids.size() - << " != node_priorities.size()= " << config.callback_priorities.size() - << std::endl; - exit(1); - } - if (all_node_ids.size() != config.callback_runtimes.size()) { - std::cout << "ros_experiment: all_node_ids.size()= " << all_node_ids.size() - << " != node_runtimes.size()= " << config.callback_runtimes.size() - << std::endl; - exit(1); - } -} - -std::string Experiment::run(std::atomic& should_do_task) { - // TODO: split into setup and run, so that run can be re-used in - // Experiment::getRunFunctions - if (!config.callback_executor_assignments.empty() - || numExecutorsRequired(config) != 1) { - std::cerr << "called Experiment::run with non-empty node_executor_assignments" - << std::endl; - return ""; - } - - createExecutors(); - createNodesAndAssignProperties(); - setInitialDeadlines(); - resetTimers(); - - // In this function (run), there is only one executor, so retrieve it now - experiment_executor executor = getExecutorForNode(); - if (config.executor_type == "edf" || config.executor_type == "picas") { - executor.executor->spin(); - } else if (config.executor_type == "default") { - executor.default_executor->spin(); - } - - std::cout << "spin done" << std::endl; - should_do_task.store(false); - return buildLogs(); -} - -std::vector&)>> -Experiment::getRunFunctions() { - // Do pre-run setup - createExecutors(); - createNodesAndAssignProperties(); - - // Create a function for each executor - std::vector&)>> run_functions; - for (size_t i = 0; i < numExecutorsRequired(config); i++) { - experiment_executor const executor = executors[i]; - run_functions.emplace_back( - [this, executor](std::atomic& should_do_task) { - if (config.executor_type == "edf" || config.executor_type == "picas") { - executor.executor->spin(); - } else if (config.executor_type == "default") { - executor.default_executor->spin(); - } - std::cout << "spin done" << std::endl; - should_do_task.store(false); - }); - } - - // Do these last since they're time-sensitive - setInitialDeadlines(); - return run_functions; -} - -size_t Experiment::numExecutorsRequired(ExperimentConfig const &config) { - if (config.callback_executor_assignments.empty()) { - return 1; - } - - std::set unique_executors; - for (auto const &assignment : config.callback_executor_assignments) { - unique_executors.insert(assignment); - } - return unique_executors.size(); -} - -experiment_executor Experiment::getExecutor(int const executor_idx) { - return executors[executor_idx]; -} - -std::vector Experiment::getExecutors() { - return executors; -} - -void Experiment::createExecutors() { - if (config.callback_executor_assignments.empty()) { - // Create a single executor - executors.push_back(createSingleExecutor(0)); - return; - } - - // Check configuration validity - // depending on config.executor_to_cpu_assignments and config.parallel_mode, - // we may need to create the host nodes - if (!config.executor_to_cpu_assignments.empty() && config.parallel_mode) { - std::cerr << "executor_to_cpu_assignments not supported for parallel mode" - << std::endl; - } - - if (!config.executor_to_cpu_assignments.empty() - && config.executor_to_cpu_assignments.size() != numExecutorsRequired(config)) { - std::cerr << "executor_to_cpu_assignments.size() != numExecutorsRequired(config)" - << std::endl; - exit(1); - } - - // Create the required host nodes - re-use any existing nodes (by not starting at 0) - for (uint i = config.nodes.size(); i < config.executor_to_cpu_assignments.size(); i++) { - // create a node for each executor - auto const node = std::make_shared("node_" + std::to_string(i)); - config.nodes.push_back(node); - } - std::cout << "created " << config.nodes.size() << " nodes" << std::endl; - - int const num_executors = numExecutorsRequired(config); - std::cout << "creating " << num_executors << " executors" << std::endl; - for (int i = 0; i < num_executors; i++) { - executors.push_back(createSingleExecutor(i)); - } -} - -experiment_executor Experiment::createSingleExecutor(uint const executor_num) { - experiment_executor executor = {nullptr, nullptr, nullptr}; - - if (config.executor_type == "edf" || config.executor_type == "picas") { - executor.strat = std::make_shared>(); - rclcpp::ExecutorOptions options; - options.memory_strategy = executor.strat; - - if (config.parallel_mode) { - executor.executor = std::make_shared( - options, "priority_executor", config.cores); - } else { - executor.executor = std::make_shared(options); - } - - executor.executor->prio_memory_strategy_ = executor.strat; - - if (config.callback_executor_assignments.empty()) { - // Add all nodes to the executor - for (auto const &node : config.nodes) { - executor.executor->add_node(node); - } - } else { - // Add only this executor's nodes - executor.executor->add_node(config.nodes[executor_num]); - } - } else if (config.executor_type == "default") { - if (config.parallel_mode) { - // executor.default_executor = - // std::make_shared(rclcpp::ExecutorOptions(), - // config.cores); - executor.default_executor = std::make_shared( - rclcpp::ExecutorOptions(), config.cores); - } else { - executor.default_executor = std::make_shared(); - } - - if (config.callback_executor_assignments.empty()) { - // Add all nodes to the executor - for (auto const &node : config.nodes) { - executor.default_executor->add_node(node); - } - } else { - // Add only this executor's nodes - executor.default_executor->add_node(config.nodes[executor_num]); - } - } - - // RCLCPP_ERROR(node->get_logger(), "Unknown executor type: %s", - // executor_type.c_str()); - return executor; -} - -experiment_executor Experiment::getExecutorForNode(int const node_id) { - if (config.callback_executor_assignments.empty()) { - return executors[0]; - } - - int const executor_idx = config.callback_executor_assignments[node_id]; - std::cout << "node " << node_id << " assigned to executor " << executor_idx - << std::endl; - return executors[executor_idx]; -} - -void Experiment::createNodesAndAssignProperties() { - // BEGIN SPECIAL CASES - std::vector>> all_nodes; - rclcpp::CallbackGroup::SharedPtr cb_group0; - // LEGACY GROUP BEHAVIOR - bool jiang2022_cb = false; - bool jiang2022_cs2 = false; - // one-to-many test. TODO: add generic api for multiple one->many - bool ours2024_latency = false; - for (auto &special_case : config.special_cases) { - if (special_case == "jiang2022_cb") { - jiang2022_cb = true; - } else if (special_case == "2022jiang2") { - jiang2022_cs2 = true; - } else if (special_case == "2024ours_latency") { - std::cout << "using 2024ours_latency special case" << std::endl; - ours2024_latency = true; - } - } - if (jiang2022_cb || jiang2022_cs2) { - // cb_group0 = - // config.node->create_callback_group(rclcpp::CallbackGroupType::MutuallyExclusive); - } - // END SPECIAL CASES - - std::vector cb_groups; - if (config.num_groups != 0 && - !config.executor_to_cpu_assignments.empty()) { - std::cout << "WARNING: num_groups and executor_to_cpu_assignments are both " - "set. This is not supported. Ignoring num_groups." - << std::endl; - } - for (uint32_t i = 0; i < config.num_groups; i++) { - cb_groups.push_back(config.nodes[0]->create_callback_group( - rclcpp::CallbackGroupType::MutuallyExclusive)); - } - - // for each chain - for (uint32_t chain_id = 0; chain_id < config.callback_ids.size(); chain_id++) { - all_nodes.push_back(std::vector>()); - - // for each node in the chain - bool first_node = true; - for (uint32_t node_chain_idx = 0; - node_chain_idx < config.callback_ids[chain_id].size(); node_chain_idx++) { - uint32_t const node_id = config.callback_ids[chain_id][node_chain_idx]; - // has this node been created yet? - if (nodes.find(node_id) != nodes.end()) { - // if it has, then this node exists in another chain - // if it's a timer node, we should add it to the timer handle map - continue; - } - experiment_executor executor = getExecutorForNode(node_id); - std::shared_ptr handle; - priority_executor::ExecutableType type; - // is this the first node in the chain? - if (node_chain_idx == 0) { - // create a timer node - rclcpp::CallbackGroup::SharedPtr cb_group = nullptr; - if (jiang2022_cs2) { - std::cout << "jiang2022_cs2: placing all nodes in cb_group0" - << std::endl; - cb_group = cb_group0; - } - if (config.num_groups != 0 && config.group_memberships[node_id] != 0) { - cb_group = cb_groups[config.group_memberships[node_id] - 1]; - } - std::shared_ptr node; - if (config.callback_executor_assignments.empty()) { - node = std::make_shared( - "node_" + std::to_string(node_id), config.callback_runtimes[node_id], - config.chain_periods[chain_id], chain_id, config.nodes[0], - cb_group); - } else { - node = std::make_shared( - "node_" + std::to_string(node_id), config.callback_runtimes[node_id], - config.chain_periods[chain_id], chain_id, - config.nodes[config.callback_executor_assignments[node_id]], - cb_group); - } - handle = node->timer_->get_timer_handle(); - nodes[node_id] = node; - type = priority_executor::ExecutableType::TIMER; - // this_chain_timer_handle = node->timer_; - std::cout << "chain timer handle set: " << chain_id << std::endl; - chain_timer_handles[chain_id] = node->timer_; - - all_nodes[chain_id].push_back(node); - } else { - // create a subscriber node - int const subscription_id = config.callback_ids[chain_id][node_chain_idx - 1]; - rclcpp::CallbackGroup::SharedPtr cb_group; - if (jiang2022_cb && ((chain_id == 1 && node_chain_idx == 1) || - (chain_id == 2 && node_chain_idx == 1))) { - cb_group = cb_group0; - std::cout << "adding c: " << chain_id << " n: " << node_id - << " to cb group 0" << std::endl; - } else { - cb_group = nullptr; - } - - if (config.num_groups != 0 && config.group_memberships[node_id] != 0) { - cb_group = cb_groups[config.group_memberships[node_id] - 1]; - } - - std::shared_ptr node; - if (ours2024_latency) { - uint32_t const first_node_id = config.callback_ids[chain_id][0]; - node = std::make_shared( - "node_" + std::to_string(first_node_id), - "node_" + std::to_string(node_id), config.callback_runtimes[node_id], - config.chain_periods[chain_id], chain_id, config.nodes[0], - cb_group); - } else if (config.callback_executor_assignments.empty()) { - node = std::make_shared( - "node_" + std::to_string(subscription_id), - "node_" + std::to_string(node_id), config.callback_runtimes[node_id], - config.chain_periods[chain_id], chain_id, config.nodes[0], - cb_group); - } else { - node = std::make_shared( - "node_" + std::to_string(subscription_id), - "node_" + std::to_string(node_id), config.callback_runtimes[node_id], - config.chain_periods[chain_id], chain_id, - config.nodes[config.callback_executor_assignments[node_id]], - cb_group); - } - handle = node->sub_->get_subscription_handle(); - nodes[node_id] = node; - type = priority_executor::ExecutableType::SUBSCRIPTION; - - all_nodes[chain_id].push_back(node); - } - if (config.executor_type == "edf") { - std::string const cb_name = "node_" + std::to_string(node_id) + "_cb"; - executor.strat->set_executable_deadline( - handle, config.chain_periods[chain_id], type, chain_id, cb_name); - // executor.executor->add_node(nodes[node_id]); - // make sure timer handle exists. if not, we did something wrong - if (chain_timer_handles.find(config.chain_timer_control[chain_id]) == - chain_timer_handles.end()) { - std::cerr << "chain timer handle not found for chain " << chain_id - << ": " << config.chain_timer_control[chain_id] - << std::endl; - } - executor.strat->get_priority_settings(handle)->timer_handle = - chain_timer_handles[config.chain_timer_control[chain_id]]; - executor.strat->get_priority_settings(handle)->priority = - config.callback_priorities[node_id]; // use priority as tiebreaker - if (first_node) { - executor.strat->set_first_in_chain(handle); - first_node = false; - } - // is this the last node in the chain? - if (node_chain_idx == config.callback_ids[chain_id].size() - 1) { - executor.strat->set_last_in_chain(handle); - } - } else if (config.executor_type == "picas") { - // executor.executor->add_node(nodes[node_id]); - executor.strat->set_executable_priority( - handle, config.callback_priorities[node_id], type, - priority_executor::ExecutableScheduleType::CHAIN_AWARE_PRIORITY, chain_id); - std::cout << "node " << node_id << " has priority " - << config.callback_priorities[node_id] << std::endl; - } else if (config.executor_type == "default") { - // executor.default_executor->add_node(nodes[node_id]); - } - // TODO: other types - } - } -} - -std::string Experiment::buildLogs(std::vector node_logs) { - std::stringstream output_file; - - output_file << "[" << std::endl; - // for each node - for (auto const &node : nodes) { - // print the node's logs - node_time_logger const logger = node.second->logger; - for (auto const &log : *(logger.recorded_times)) { - output_file << "{\"entry\": " << log.first - << ", \"time\": " << log.second << "}," << std::endl; - } - } - - if (config.executor_type == "edf" || config.executor_type == "picas") { - for (auto const &executor : executors) { - node_time_logger const strat_log = executor.strat->logger_; - for (auto const &log : *(strat_log.recorded_times)) { - // std::cout << log.first << " " << log.second << std::endl; - output_file << "{\"entry\": " << log.first - << ", \"time\": " << log.second << "}," << std::endl; - } - - // executor may be a subclass of RTISTimed, so we can get its logger - if (auto const *timed_executor = dynamic_cast(executor.executor.get())) { - node_time_logger const executor_log = timed_executor->logger_; - for (auto const &log : *(executor_log.recorded_times)) { - output_file << "{\"entry\": " << log.first - << ", \"time\": " << log.second << "}," << std::endl; - } - } else { - std::cout << "executor is not RTISTimed" << std::endl; - } - } - } else { - // a default executor - for (auto const &executor : executors) { - // experimental: executor.default_executor's type is rclcpp::Executor, but - // here it's guaranteed to inherit from RTISTimed we can get a logger from - // it. for safety, we check the type first - if (auto const* timed_executor = dynamic_cast(executor.default_executor.get())) { - node_time_logger const default_log = timed_executor->logger_; - for (auto const &log : *(default_log.recorded_times)) { - output_file << "{\"entry\": " << log.first - << ", \"time\": " << log.second << "}," << std::endl; - } - std::cout << "recovered logs from default executor" << std::endl; - } else { - std::cout << "default executor is not RTISTimed" << std::endl; - } - } - } - - // add internal logs - for (auto const &log_entry : *(_logger.recorded_times)) { - output_file << "{\"entry\": " << log_entry.first - << ", \"time\": " << log_entry.second << "}," << std::endl; - } - - // add logs from argument - for (auto const &log : node_logs) { - for (auto const &log_entry : *(log.recorded_times)) { - output_file << "{\"entry\": " << log_entry.first - << ", \"time\": " << log_entry.second << "}," << std::endl; - } - } - // remove the last comma - output_file.seekp(-2, std::ios_base::end); - output_file << "]" << std::endl; - - return output_file.str(); -} - -void Experiment::setInitialDeadlines() { - timespec current_time; - clock_gettime(CLOCK_MONOTONIC_RAW, ¤t_time); - uint64_t const millis = - (current_time.tv_sec * 1000UL) + (current_time.tv_nsec / 1000000); - // we need a logger to record the first deadline, which is generated here - // TODO: generate the first deadline in the executor (or node) itself - if (config.executor_type == "edf") { - for (uint32_t chain_index = 0; chain_index < config.chain_lengths.size(); - chain_index++) { - size_t executor_idx; - if (config.callback_executor_assignments.empty()) { - executor_idx = 0; - } else { - executor_idx = config.callback_executor_assignments[chain_index]; - } - std::shared_ptr> deadlines = - executors[executor_idx].strat->get_chain_deadlines(chain_index); - deadlines->push_back(millis + config.chain_periods[chain_index] * 2); - std::stringstream oss; - oss << "{\"operation\": \"next_deadline\", \"chain_id\": " << chain_index - << ", \"deadline\": " - << millis + config.chain_periods[chain_index] * 2 << "}"; - // std::cout<<"initial deadline: "<logger_, - // oss.str()); - log_entry(_logger, oss.str()); - } - } -} - -void Experiment::resetTimers() { - for (auto const &timer : chain_timer_handles) { - std::cout << "resetting timer " << timer.first << std::endl; - if (timer.second == nullptr) { - std::cout << "timer is null" << std::endl; - } else { - timer.second->reset(); - /* int64_t old_period; - // set the period to 0 for force immediate execution - rcl_timer_exchange_period(timer.second->get_timer_handle().get(), 0, - &old_period); - // timer.second->get_timer_handle() */ - } - } - // sleep 1 seconds to let every timer reset - // std::this_thread::sleep_for(std::chrono::seconds(1)); -} - -void Experiment::writeLogsToFile(std::string const &file_name, - std::string const &experiment_name) { - std::filesystem::path results_dir = "results"; - std::filesystem::path exp_dir = results_dir / experiment_name; - - // Create directories if they don't exist - try { - std::filesystem::create_directories(exp_dir); - } catch (std::filesystem::filesystem_error const &e) { - std::cout << "Could not create results directory: " << e.what() << std::endl; - return; - } - - std::string output_filename = (file_name + "_" + this->config.executor_type + ".json"); - std::filesystem::path output_path = exp_dir / output_filename; - - std::ofstream output_file(output_path); - output_file << buildLogs(); - output_file.close(); - - std::cout << "results written to " << output_path.string() << std::endl; -} - -int do_other_task(double const work_time, uint const period, - std::atomic& should_do_task) { - if (work_time == 0) { - std::cout << "other task: work time is 0, exiting" << std::endl; - return 0; - } - // pin to CPU 0 with highest priority - cpu_set_t cpuset; - CPU_ZERO(&cpuset); - CPU_SET(0, &cpuset); - int const result = sched_setaffinity(0, sizeof(cpu_set_t), &cpuset); - if (result != 0) { - std::cout << "other task: could not set affinity: " << result << ": " - << strerror(errno) << std::endl; - } - - struct sched_param param; - param.sched_priority = sched_get_priority_max(SCHED_FIFO) - 1; - sched_setscheduler(0, SCHED_FIFO, ¶m); - if (result != 0) { - std::cout << "other task: could not set scheduler: " << result << ": " - << strerror(errno) << std::endl; - } - - prctl(PR_SET_NAME, "other_task", 0, 0, 0); - - // do every period milliseconds - do { - timespec current_time; - clock_gettime(CLOCK_MONOTONIC_RAW, ¤t_time); - uint64_t const millis_start = (current_time.tv_sec * (uint64_t)1000) + - (current_time.tv_nsec / 1000000); - - // do work - // nth_prime_silly(work_time); - dummy_load(work_time); - - // sleep until next period - clock_gettime(CLOCK_MONOTONIC_RAW, ¤t_time); - uint64_t const millis_end = (current_time.tv_sec * (uint64_t)1000) + - (current_time.tv_nsec / 1000000); - uint64_t const sleep_time = period - (millis_end - millis_start); - std::this_thread::sleep_for(std::chrono::milliseconds(sleep_time)); - } while (should_do_task); - std::cout << "other task done" << std::endl; - return 0; -} \ No newline at end of file diff --git a/src/casestudy_tools/src/primes_workload.cpp b/src/casestudy_tools/src/primes_workload.cpp deleted file mode 100755 index fdec6af..0000000 --- a/src/casestudy_tools/src/primes_workload.cpp +++ /dev/null @@ -1,71 +0,0 @@ -#include -#include -#include -#include - -#include "casestudy_tools/primes_workload.hpp" - - -void DummyLoadCalibration::setCalibration(int const calib) { - calibration.store(calib, std::memory_order_relaxed); -} - -int DummyLoadCalibration::getCalibration() { - return calibration.load(std::memory_order_relaxed); -} - -void calibrate_dummy_load() -{ - int calibration_factor = 1; - for (int iteration = 0; iteration < MAX_ITERATIONS; ++iteration) - { - // Measure the time taken for the dummy load - auto const start_time = std::chrono::high_resolution_clock::now(); - dummy_load(TARGET_DURATION_MS); // intended workload - auto const end_time = std::chrono::high_resolution_clock::now(); - - // Calculate duration in microseconds - auto const duration_us = - std::chrono::duration_cast(end_time - start_time).count(); - - std::cout << "[Calibration] Iteration: " << iteration - << ", Calibration Factor: " << calibration_factor - << ", Measured Duration: " << duration_us << " µs" << std::endl; - - // Check if within acceptable margin - if (std::abs(duration_us - TARGET_DURATION_MS * 1000) < ACCEPTABLE_MARGIN_US) - { - std::cout << "[Calibration] Converged successfully." << std::endl; - break; - } - - // Adjust calibration factor proportionally - calibration_factor = (TARGET_DURATION_MS * 1000 * calibration_factor) / duration_us; - - if (calibration_factor <= 0) - { - calibration_factor = 1; - } - - DummyLoadCalibration::setCalibration(calibration_factor); - } -} - -void dummy_load(int const load_ms) -{ - int const calib_factor = DummyLoadCalibration::getCalibration(); - - // calibration factor corresponds to 100ms load, scale it - int const scaled_factor = calib_factor * load_ms / 100; - - volatile int dummy_var = 0; - constexpr int DUMMY_LOAD_ITER = 100'000; - - for (int j = 0; j < scaled_factor; ++j) - { - for (int i = 0; i < DUMMY_LOAD_ITER; ++i) - { - dummy_var += i; // clearer dummy operation (optimizations prevented by volatile) - } - } -} diff --git a/src/casestudy_tools/src/test_nodes.cpp b/src/casestudy_tools/src/test_nodes.cpp deleted file mode 100755 index bf44280..0000000 --- a/src/casestudy_tools/src/test_nodes.cpp +++ /dev/null @@ -1,180 +0,0 @@ -#include "casestudy_tools/test_nodes.hpp" -#include "casestudy_tools/primes_workload.hpp" -#include "rclcpp/rclcpp.hpp" -#include "simple_timer/rt-sched.hpp" -#include -#include -#include - -std::string CaseStudyNode::get_name() { - return name; -} - -#define BUFFER_LENGTH 0 - -rclcpp::QoS get_qos() { - if (BUFFER_LENGTH == 0) { - return rclcpp::QoS(rclcpp::KeepAll()); - } - return rclcpp::QoS(rclcpp::KeepLast(BUFFER_LENGTH)); -} - -void timespec_diff(timespec const* start, timespec const* end, timespec* result) { - if ((end->tv_nsec - start->tv_nsec) < 0) { - result->tv_sec = end->tv_sec - start->tv_sec - 1; - result->tv_nsec = 1000000000 + end->tv_nsec - start->tv_nsec; - } else { - result->tv_sec = end->tv_sec - start->tv_sec; - result->tv_nsec = end->tv_nsec - start->tv_nsec; - } -} - -PublisherNode::PublisherNode( - std::string const &publish_topic, - double const runtime, - int const period, - int const chain, - rclcpp::Node::SharedPtr const &node, - rclcpp::CallbackGroup::SharedPtr const &callback_group) - : CaseStudyNode(publish_topic, node) - , runtime_(runtime) - , period_(period) - , chain_(chain) { - - logger = create_logger(); - pub_ = node->create_publisher(publish_topic, get_qos()); - - auto const timer_callback = [this]() -> void { - std::ostringstream ss; - ss << "{\"operation\": \"start_work\", \"chain\": " << chain_ - << ", \"node\": \"" << get_name() << "\", \"count\": " << count_max_; - - std::chrono::nanoseconds const time_until_trigger = timer_->time_until_trigger(); - std::chrono::microseconds const time_until_trigger_us = - std::chrono::duration_cast(time_until_trigger); - - ss << ", \"next_release_us\": " << time_until_trigger_us.count() << "}"; - log_entry(logger, ss.str()); - - double used_runtime = runtime_; - if (use_random_runtime) { - if (rand() % 100 < runtime_over_chance * 100) { - used_runtime *= over_runtime_scale; - } else { - used_runtime *= normal_runtime_scale; - } - } - - // RCLCPP_INFO(rclcpp::get_logger(get_name()), "running %s with runtime %f", - // get_name().c_str(), used_runtime); - - struct timespec start_time; - clock_gettime(CLOCK_MONOTONIC, &start_time); - - dummy_load(used_runtime); - std_msgs::msg::Int32 msg; - msg.data = count_max_; - // RCLCPP_INFO(this->get_logger(), "Publishing: '%s'", msg.data.c_str()); - pub_->publish(msg); - // TEST: can we take more than one msg from DDS? - // pub_->publish(msg); - - ss.str(""); - ss << "{\"operation\": \"end_work\", \"chain\": " << chain_ - << ", \"node\": \"" << get_name() << "\", \"count\": " << count_max_ - << ", \"target_runtime\": " << runtime_ << "}"; - log_entry(logger, ss.str()); - - count_max_--; - if (count_max_ == 0) { - rclcpp::shutdown(); - } -/* struct timespec end_time; - clock_gettime(CLOCK_MONOTONIC, &end_time); - struct timespec diff; - timespec_diff(&start_time, &end_time, &diff); - size_t diff_ns = diff.tv_sec * 1000000000 + diff.tv_nsec; - size_t diff_ms = diff_ns / 1000000; - RCLCPP_INFO(rclcpp::get_logger(get_name()), "done running %s, took %lu ms", - get_name().c_str(), diff_ms); */ - }; - - if (callback_group != nullptr) { - callback_group_ = callback_group; - } else { - callback_group_ = node->create_callback_group( - rclcpp::CallbackGroupType::Reentrant); - } - - timer_ = node->create_wall_timer( - std::chrono::milliseconds(period), - timer_callback, - callback_group_); -} - -WorkerNode::WorkerNode( - std::string const &subscribe_topic, - std::string const &publish_topic, - double const runtime, - int const period, - int const chain, - rclcpp::Node::SharedPtr const &node, - rclcpp::CallbackGroup::SharedPtr const &callback_group) - : CaseStudyNode(publish_topic, node) - , runtime_(runtime) - , period_(period) - , chain_(chain) { - - logger = create_logger(); - - auto const callback = [this](std_msgs::msg::Int32::SharedPtr const msg) -> void { - // prevent unused variable warning - (void)msg; - - std::ostringstream ss; - ss << "{\"operation\": \"start_work\", \"chain\": " << chain_ - << ", \"node\": \"" << get_name() << "\", \"count\": " << msg->data - << "}"; - log_entry(logger, ss.str()); - // RCLCPP_INFO(this->get_logger(), "I heard: '%s'", msg->data.c_str()); - // nth_prime_silly(runtime_); - - // TODO: investigate jitter generation - double used_runtime = runtime_; - if (use_random_runtime) { - if (rand() % 100 < runtime_over_chance * 100) { - used_runtime *= over_runtime_scale; - } else { - used_runtime *= normal_runtime_scale; - } - } - - dummy_load(used_runtime); - // RCLCPP_INFO(this->get_logger(), "Result: %d", result); - auto new_msg = std_msgs::msg::Int32(); - new_msg.data = msg->data; - pub_->publish(new_msg); - - ss.str(""); - ss << "{\"operation\": \"end_work\", \"chain\": " << chain_ - << ", \"node\": \"" << get_name() << "\", \"count\": " << msg->data - << "}"; - log_entry(logger, ss.str()); - }; - - rclcpp::SubscriptionOptions sub_options; - if (callback_group) { - callback_group_ = callback_group; - } else { - callback_group_ = node->create_callback_group( - rclcpp::CallbackGroupType::Reentrant); - } - - sub_options.callback_group = callback_group_; - sub_ = node->create_subscription( - subscribe_topic, - get_qos(), - callback, - sub_options); - pub_ = node->create_publisher(publish_topic, get_qos()); -} diff --git a/src/full_topology/src/Untitled-1 b/src/full_topology/src/Untitled-1 new file mode 100644 index 0000000..7311e7b --- /dev/null +++ b/src/full_topology/src/Untitled-1 @@ -0,0 +1,51 @@ +// A generic single-input processing node with a fixed delay +/- jitter +#define DEFINE_MULTI_PROCESSING_NODE_CLASS(NAME) \ +class NAME : public rclcpp::Node\ +{\ +public:\ + NAME(std::string const &name,\ + std::vector const &input_topics,\ + std::string const &output_topic,\ + int delay_ms,\ + double jitter_ms = 0.0) \ + : Node(name)\ + {\ + for (auto &topic : input_topics)\ + {\ + auto sub = create_subscription(\ + topic, 10,\ + [this, name, delay_ms, jitter_ms](std_msgs::msg::String::SharedPtr msg) \ + {\ + /* simple fusion: log each arrival */\ + if (DEBUG)\ + {\ + RCLCPP_INFO(get_logger(), "%s fusing input: %s", name.c_str(), msg->data.c_str());\ + }\ +#ifndef USE_TIMER_IN_FUSION_NODES\ + // Simulate processing delay + std::this_thread::sleep_for(std::chrono::milliseconds(delay_ms)); + auto out = std_msgs::msg::String(); + out.data = msg->data + " -> " + name + std::string(" fused"); + if (DEBUG) { + RCLCPP_INFO(get_logger(), "%s publishes fused message", name.c_str()); + } + publisher_->publish(out); +#endif + }); + subscriptions_.emplace_back(sub); + } + publisher_ = create_publisher(output_topic, 10); + +#ifdef USE_TIMER_IN_FUSION_NODES + // Also publish fused result periodically + timer_ = create_wall_timer(50ms, [this, name, delay_ms]() + { + std::this_thread::sleep_for(std::chrono::milliseconds(delay_ms)); + auto out = std_msgs::msg::String(); + out.data = name + std::string(" fused"); + if (DEBUG) { + RCLCPP_INFO(get_logger(), "%s publishes fused message", name.c_str()); + } + publisher_->publish(out); }); +#endif + } \ No newline at end of file diff --git a/src/full_topology/src/full_topology.cpp b/src/full_topology/src/full_topology.cpp index d80fe33..a392bb7 100644 --- a/src/full_topology/src/full_topology.cpp +++ b/src/full_topology/src/full_topology.cpp @@ -2,239 +2,182 @@ #include #include +#include #include -#include #include +#include // the order here matters #include "tracetools/utils.hpp" #include #include "std_msgs/msg/string.hpp" +/** Whether to use timers in fusion nodes */ +//#define USE_TIMER_IN_FUSION_NODES + using namespace std::chrono_literals; constexpr bool DEBUG = false; -// A generic sensor node that publishes at a given rate -class SensorNode : public rclcpp::Node -{ -public: - SensorNode(std::string const &name, - std::string const &topic, - int rate_hz) - : Node(name) - { - publisher_ = create_publisher(topic, 10); - auto period = std::chrono::milliseconds(1000 / rate_hz); - timer_ = create_wall_timer(period, [this, name]() - { - auto msg = std_msgs::msg::String(); - msg.data = name + std::string(" publishes"); - if (DEBUG) { - RCLCPP_INFO(get_logger(), "%s", msg.data.c_str()); - } - publisher_->publish(msg); }); - } - -private: - rclcpp::Publisher::SharedPtr publisher_; - rclcpp::TimerBase::SharedPtr timer_; +// A generic sensor node that publishes at a given rate + jitter +#define DEFINE_SENSOR_NODE_CLASS(NAME) \ +class NAME : public rclcpp::Node \ +{ \ +public: \ + NAME(std::string const &name, \ + std::string const &topic, \ + int rate_hz, \ + double jitter_ms_ = 0.0) \ + : Node(name) \ + { \ + publisher_ = create_publisher(topic, 10); \ + auto period = std::chrono::milliseconds(1000 / rate_hz); \ + timer_ = create_wall_timer(period, [this, name, jitter_ms_]() \ + { \ + double jitter = 0.0; \ + if (jitter_ms_ > 0.0) { \ + static thread_local std::mt19937 gen(std::random_device{}()); \ + std::normal_distribution<> d(0.0, jitter_ms_); \ + jitter = d(gen); \ + } \ + /* Ensure total delay is non-negative */ \ + int total_delay = std::max(0, static_cast(std::round(jitter))); \ + std::this_thread::sleep_for(std::chrono::milliseconds(total_delay)); \ + auto msg = std_msgs::msg::String(); \ + msg.data = name + std::string(" publishes"); \ + if (DEBUG) { \ + RCLCPP_INFO(get_logger(), "%s", msg.data.c_str()); \ + } \ + publisher_->publish(msg); }); \ + } \ + \ +private: \ + rclcpp::Publisher::SharedPtr publisher_; \ + rclcpp::TimerBase::SharedPtr timer_; \ }; -// A generic single-input processing node with a fixed delay -class ProcessingNode : public rclcpp::Node -{ -public: - ProcessingNode(std::string const &name, - std::string const &input_topic, - std::string const &output_topic, - int delay_ms) - : Node(name) - { - subscription_ = create_subscription( - input_topic, 10, - [this, name, output_topic, delay_ms](std_msgs::msg::String::SharedPtr msg) - { - if (DEBUG) { - RCLCPP_INFO(get_logger(), "%s received: %s", name.c_str(), msg->data.c_str()); - } - std::this_thread::sleep_for(std::chrono::milliseconds(delay_ms)); - auto out = std_msgs::msg::String(); - out.data = msg->data + " -> " + name; - publisher_->publish(out); - }); - publisher_ = create_publisher(output_topic, 10); - } - -protected: - rclcpp::Subscription::SharedPtr subscription_; - rclcpp::Publisher::SharedPtr publisher_; +// A generic single-input processing node with a fixed delay +/- jitter +#define DEFINE_PROCESSING_NODE_CLASS(NAME) \ +class NAME : public rclcpp::Node \ +{ \ +public: \ + NAME(const std::string & name, \ + const std::string & input_topic, \ + const std::string & output_topic, \ + int delay_ms, \ + double jitter_ms = 0.0) \ + : Node(name), delay_ms_(delay_ms), jitter_ms_(jitter_ms) \ + { \ + subscription_ = create_subscription( \ + input_topic, 10, \ + [this, name](std_msgs::msg::String::SharedPtr msg) \ + { \ + if (DEBUG) { \ + RCLCPP_INFO(get_logger(), "%s received: %s", name.c_str(), msg->data.c_str()); \ + } \ + /* Simulate processing delay */ \ + double jitter = 0.0; \ + if (jitter_ms_ > 0.0) { \ + static thread_local std::mt19937 gen(std::random_device{}());\ + std::normal_distribution<> d(0.0, jitter_ms_); \ + jitter = d(gen); \ + } \ + std::this_thread::sleep_for(std::chrono::milliseconds((int)(delay_ms_ + jitter))); \ + auto out = std_msgs::msg::String(); \ + out.data = msg->data + " -> " + name; \ + publisher_->publish(out); \ + }); \ + publisher_ = create_publisher(output_topic, 10); \ + } \ +protected: \ + int delay_ms_; \ + double jitter_ms_; \ + rclcpp::Subscription::SharedPtr subscription_; \ + rclcpp::Publisher::SharedPtr publisher_; \ }; -// A processing node that fuses multiple inputs -class MultiInputProcessingNode : public rclcpp::Node -{ -public: - MultiInputProcessingNode(std::string const &name, - std::vector const &input_topics, - std::string const &output_topic, - int delay_ms) - : Node(name) - { - for (auto &topic : input_topics) - { - auto sub = create_subscription( - topic, 10, - [this, name, delay_ms](std_msgs::msg::String::SharedPtr msg) - { - // simple fusion: log each arrival - if (DEBUG) - { - RCLCPP_INFO(get_logger(), "%s fusing input: %s", name.c_str(), msg->data.c_str()); - } - }); - subscriptions_.push_back(sub); - } - publisher_ = create_publisher(output_topic, 10); - // Also publish fused result periodically - timer_ = create_wall_timer(50ms, [this, name, delay_ms]() - { - std::this_thread::sleep_for(std::chrono::milliseconds(delay_ms)); - auto out = std_msgs::msg::String(); - out.data = name + std::string(" fused"); - if (DEBUG) { - RCLCPP_INFO(get_logger(), "%s publishes fused message", name.c_str()); - } - publisher_->publish(out); }); - } - -protected: - std::vector::SharedPtr> subscriptions_; - rclcpp::Publisher::SharedPtr publisher_; - rclcpp::TimerBase::SharedPtr timer_; +// A generic single-input processing node with a fixed delay +/- jitter +#define DEFINE_MULTI_PROCESSING_NODE_CLASS(NAME) \ +class NAME : public rclcpp::Node \ +{ \ +public: \ + NAME(std::string const &name, \ + std::vector const &input_topics, \ + std::string const &output_topic, \ + int delay_ms, \ + double jitter_ms = 0.0) \ + : Node(name), delay_ms_(delay_ms), jitter_ms_(jitter_ms) \ + { \ + for (auto &topic : input_topics) \ + { \ + auto sub = create_subscription( \ + topic, 10, \ + [this, name](std_msgs::msg::String::SharedPtr msg) \ + { \ + /* simple fusion: log each arrival */ \ + if (DEBUG) \ + { \ + RCLCPP_INFO(get_logger(), "%s fusing input: %s", name.c_str(), msg->data.c_str()); \ + } \ + /* Simulate processing delay */ \ + double jitter = 0.0; \ + if (jitter_ms_ > 0.0) { \ + static thread_local std::mt19937 gen(std::random_device{}()); \ + std::normal_distribution<> d(0.0, jitter_ms_); \ + jitter = d(gen); \ + } \ + std::this_thread::sleep_for(std::chrono::milliseconds((int)(delay_ms_ + jitter))); \ + auto out = std_msgs::msg::String(); \ + out.data = msg->data + " -> " + name; \ + if (DEBUG) { \ + RCLCPP_INFO(get_logger(), "%s publishes fused message", name.c_str()); \ + } \ + publisher_->publish(out); \ + }); \ + subscriptions_.emplace_back(sub); \ + } \ + publisher_ = create_publisher(output_topic, 10); \ + \ + } \ +protected: \ + std::vector::SharedPtr> subscriptions_; \ + rclcpp::Publisher::SharedPtr publisher_; \ + int delay_ms_; \ + double jitter_ms_; \ }; -class CameraNode : public SensorNode -{ -public: - CameraNode(std::string const &name, std::string const &topic, int rate_hz) : SensorNode(name, topic, rate_hz) {} -}; +DEFINE_SENSOR_NODE_CLASS(CameraNode) +DEFINE_PROCESSING_NODE_CLASS(DebayerNode) +DEFINE_PROCESSING_NODE_CLASS(RadiometricNode) +DEFINE_PROCESSING_NODE_CLASS(GeometricNode) +DEFINE_PROCESSING_NODE_CLASS(MappingNode) -class DebayerNode : public ProcessingNode -{ -public: - DebayerNode(std::string const &name, std::string const &input_topic, std::string const &output_topic, int delay_ms) : ProcessingNode(name, input_topic, output_topic, delay_ms) {} -}; +DEFINE_PROCESSING_NODE_CLASS(SmokeClassifierNode) -class RadiometricNode : public ProcessingNode -{ -public: - RadiometricNode(std::string const &name, std::string const &input_topic, std::string const &output_topic, int delay_ms) : ProcessingNode(name, input_topic, output_topic, delay_ms) { - // Overwrite the subscription callback - subscription_ = create_subscription( - input_topic, 10, - [this, name, output_topic, delay_ms](std_msgs::msg::String::SharedPtr msg) - { - // Custom logic for RadiometricNode - if (DEBUG) { - RCLCPP_INFO(get_logger(), "%s received: %s", name.c_str(), msg->data.c_str()); - } +DEFINE_SENSOR_NODE_CLASS(GPSNode) +DEFINE_SENSOR_NODE_CLASS(IMUNode) +DEFINE_SENSOR_NODE_CLASS(BaroNode) - /* TODO - // random: 1 in 5 it will be 10s instead of delay_ms - if (rand() % 5 == 0) { - std::this_thread::sleep_for(std::chrono::milliseconds(10000)); - } else { - std::this_thread::sleep_for(std::chrono::milliseconds(delay_ms)); - }*/ +DEFINE_PROCESSING_NODE_CLASS(FusionNodeS) +DEFINE_MULTI_PROCESSING_NODE_CLASS(FusionNode) - // Custom transformation of data - auto out = std_msgs::msg::String(); - out.data = msg->data + " -> Radiometric processing complete"; - - // Publish the processed message - publisher_->publish(out); - }); - } -}; +DEFINE_SENSOR_NODE_CLASS(LidarNode) -class GeometricNode : public ProcessingNode -{ -public: - GeometricNode(std::string const &name, std::string const &input_topic, std::string const &output_topic, int delay_ms) : ProcessingNode(name, input_topic, output_topic, delay_ms) {} -}; +DEFINE_SENSOR_NODE_CLASS(CommandNode) -class MappingNode : public ProcessingNode -{ -public: - MappingNode(std::string const &name, std::string const &input_topic, std::string const &output_topic, int delay_ms) : ProcessingNode(name, input_topic, output_topic, delay_ms) {} -}; +DEFINE_PROCESSING_NODE_CLASS(FlightManagementNodeS) -class SmokeClassifierNode : public ProcessingNode -{ -public: - SmokeClassifierNode(std::string const &name, std::string const &input_topic, std::string const &output_topic, int delay_ms) : ProcessingNode(name, input_topic, output_topic, delay_ms) {} -}; +DEFINE_MULTI_PROCESSING_NODE_CLASS(FlightManagementNode) -class GPSNode : public SensorNode -{ -public: - GPSNode(std::string const &name, std::string const &topic, int rate_hz) : SensorNode(name, topic, rate_hz) {} -}; +DEFINE_PROCESSING_NODE_CLASS(ControlNode) -class IMUNode : public SensorNode -{ -public: - IMUNode(std::string const &name, std::string const &topic, int rate_hz) : SensorNode(name, topic, rate_hz) {} -}; +DEFINE_PROCESSING_NODE_CLASS(TelemetryNodeS) +DEFINE_MULTI_PROCESSING_NODE_CLASS(TelemetryNode) -class BaroNode : public SensorNode -{ -public: - BaroNode(std::string const &name, std::string const &topic, int rate_hz) : SensorNode(name, topic, rate_hz) {} -}; +DEFINE_PROCESSING_NODE_CLASS(RadioNode) -class FusionNode : public MultiInputProcessingNode -{ -public: - FusionNode(std::string const &name, std::vector const &input_topics, std::string const &output_topic, int delay_ms) : MultiInputProcessingNode(name, input_topics, output_topic, delay_ms) {} -}; - -class LidarNode : public SensorNode -{ -public: - LidarNode(std::string const &name, std::string const &topic, int rate_hz) : SensorNode(name, topic, rate_hz) {} -}; - -class CommandNode : public SensorNode -{ -public: - CommandNode(std::string const &name, std::string const &topic, int rate_hz) : SensorNode(name, topic, rate_hz) {} -}; - -class FlightManagementNode : public MultiInputProcessingNode -{ -public: - FlightManagementNode(std::string const &name, std::vector const &input_topics, std::string const &output_topic, int delay_ms) : MultiInputProcessingNode(name, input_topics, output_topic, delay_ms) {} -}; - -class ControlNode : public ProcessingNode -{ -public: - ControlNode(std::string const &name, std::string const &input_topic, std::string const &output_topic, int delay_ms) : ProcessingNode(name, input_topic, output_topic, delay_ms) {} -}; - -class TelemetryNode : public MultiInputProcessingNode -{ -public: - TelemetryNode(std::string const &name, std::vector const &input_topics, std::string const &output_topic, int delay_ms) : MultiInputProcessingNode(name, input_topics, output_topic, delay_ms) {} -}; - -class RadioNode : public ProcessingNode -{ -public: - RadioNode(std::string const &name, std::string const &input_topic, std::string const &output_topic, int delay_ms) : ProcessingNode(name, input_topic, output_topic, delay_ms) {} -}; +#define PROCENTUAL_JITTER 0.1 +#define DELAY_JITTER_PAIR(DELAY) DELAY, PROCENTUAL_JITTER * DELAY int main(int argc, char **argv) { @@ -242,32 +185,32 @@ int main(int argc, char **argv) rclcpp::executors::MultiThreadedExecutor executor; // Chain 1: Image2Ground Mapping - auto camera = std::make_shared("camera_node", "/camera/raw", 30); - auto debayer = std::make_shared("debayer_node", "/camera/raw", "/camera/debayered", 15); - auto radiometric = std::make_shared("radiometric_node", "/camera/debayered", "/camera/radiometric", 20); - auto geometric = std::make_shared("geometric_node", "/camera/radiometric", "/camera/geometric", 25); - auto mapping = std::make_shared("mapping_node", "/camera/geometric", "/camera/mapped", 30); + auto camera = std::make_shared("input_camera_node", "/input/camera/raw", DELAY_JITTER_PAIR(5)); // Sensor read, low latency + auto debayer = std::make_shared("debayer_node", "/input/camera/raw", "/camera/debayered", DELAY_JITTER_PAIR(18)); // Moderate, image conversion + auto radiometric = std::make_shared("radiometric_node", "/camera/debayered", "/camera/radiometric", DELAY_JITTER_PAIR(22)); // Heavier image math + auto geometric = std::make_shared("geometric_node", "/camera/radiometric", "/camera/geometric", DELAY_JITTER_PAIR(12)); // Perspective transform, medium + auto mapping = std::make_shared("output_mapping_node", "/camera/geometric", "/output/camera/mapped", DELAY_JITTER_PAIR(16)); // Warping/stitching, moderate // Chain 2: Semantic Scheduler Adaptation - auto smoke = std::make_shared("smoke_classifier_node", "/camera/radiometric", "/classifier/classification", 50); + auto smoke = std::make_shared("output_smoke_classifier_node", "/camera/radiometric", "/output/classifier/classification", DELAY_JITTER_PAIR(35)); // CNN/ML classifier, can be high // Chain 3: Flight Control - auto gps = std::make_shared("gps_node", "/gps/fix", 10); - auto imu = std::make_shared("imu_node", "/imu/data", 100); - auto baro = std::make_shared("baro_node", "/baro/alt", 20); + auto gps = std::make_shared("input_gps_node", "/input/gps/fix", DELAY_JITTER_PAIR(7)); // Fast, low latency sensor + auto imu = std::make_shared("input_imu_node", "/input/imu/data", DELAY_JITTER_PAIR(5)); // Fast, low latency sensor + auto baro = std::make_shared("input_baro_node", "/input/baro/alt", DELAY_JITTER_PAIR(6)); // Fast, low latency sensor auto fusion = std::make_shared( - "sensor_fusion_node", std::vector{"/gps/fix", "/imu/data", "/baro/alt"}, "/sensors/fused", 10); - auto lidar = std::make_shared("lidar_node", "/lidar/scan", 15); - // TODO: operator or autonomous flight plan - auto cmd = std::make_shared("operator_cmd_node", "/operator/commands", 1); - auto mgmt = std::make_shared( - "flight_mgmt_node", std::vector{"/sensors/fused", "/lidar/scan", "/operator/commands"}, "/flight/plan", 20); - auto control = std::make_shared("flight_control_node", "/flight/plan", "/flight/cmd", 25); + "sensor_fusion_node", std::vector{"/input/gps/fix", "/input/imu/data", "/input/baro/alt"}, "/sensors/fused", DELAY_JITTER_PAIR(14)); // Sensor fusion, not trivial but not heavy + + auto lidar = std::make_shared("input_lidar_node", "/input/lidar/scan", DELAY_JITTER_PAIR(10)); // LIDAR driver, can be fast + auto cmd = std::make_shared("input_operator_cmd_node", "/input/operator/commands", DELAY_JITTER_PAIR(8)); // Operator commands, fast + auto mgmt = std::make_shared( + "flight_mgmt_node", std::vector{"/sensors/fused", "/input/lidar/scan", "/input/operator/commands"}, "/flight/plan", DELAY_JITTER_PAIR(18)); // Path planning, moderate + auto control = std::make_shared("output_flight_control_node", "/flight/plan", "/output/flight/cmd", DELAY_JITTER_PAIR(12)); // PID controller, fast but non-zero // Chain 4: Data Storage/Streaming auto telemetry = std::make_shared( - "telemetry_node", std::vector{"/sensors/fused", "/camera/mapped"}, "/telemetry/data", 5); - auto radio = std::make_shared("radio_tx_node", "/telemetry/data", "/telemetry/radio", 10); + "telemetry_node", std::vector{"/sensors/fused", "/output/camera/mapped"}, "/telemetry/data", DELAY_JITTER_PAIR(6)); // Quick serialization + auto radio = std::make_shared("output_radio_tx_node", "/telemetry/data", "/output/telemetry/radio", DELAY_JITTER_PAIR(3)); // Output, almost instant // Add all to executor executor.add_node(camera); @@ -275,17 +218,21 @@ int main(int argc, char **argv) executor.add_node(radiometric); executor.add_node(geometric); executor.add_node(mapping); + executor.add_node(smoke); + executor.add_node(gps); executor.add_node(imu); executor.add_node(baro); executor.add_node(fusion); + + executor.add_node(telemetry); + executor.add_node(radio); + executor.add_node(lidar); executor.add_node(cmd); executor.add_node(mgmt); executor.add_node(control); - executor.add_node(telemetry); - executor.add_node(radio); executor.spin(); diff --git a/src/ros2_tracing b/src/ros2_tracing index 1b96054..e8637c9 160000 --- a/src/ros2_tracing +++ b/src/ros2_tracing @@ -1 +1 @@ -Subproject commit 1b9605494554d4d740d356efc43cf647c0fb9f66 +Subproject commit e8637c9043b9bc396135dbfa7068143bfa8a879c diff --git a/src/simple_topology/CMakeLists.txt b/src/simple_topology/CMakeLists.txt deleted file mode 100644 index bfc47b6..0000000 --- a/src/simple_topology/CMakeLists.txt +++ /dev/null @@ -1,55 +0,0 @@ -cmake_minimum_required(VERSION 3.5) -project(simple_topology VERSION 0.1.0) - -# Set C++ standards -set(CMAKE_CXX_STANDARD 17) -set(CMAKE_CXX_STANDARD_REQUIRED ON) -set(CMAKE_CXX_EXTENSIONS OFF) - -# Compiler options -if(CMAKE_COMPILER_IS_GNUCXX OR CMAKE_CXX_COMPILER_ID MATCHES "Clang") - add_compile_options(-Wall -Wextra -Wpedantic) -endif() - -# find dependencies -find_package(ament_cmake REQUIRED) -find_package(tracetools REQUIRED) -find_package(rclcpp REQUIRED) -find_package(std_msgs REQUIRED) - -add_executable(${PROJECT_NAME} src/simple_topology.cpp) -target_include_directories(${PROJECT_NAME} PUBLIC - $ - $ -) - -# Add dependencies with ament -ament_target_dependencies(${PROJECT_NAME} - tracetools - rclcpp - std_msgs -) - -# Installation -install( - TARGETS ${PROJECT_NAME} - RUNTIME DESTINATION lib/${PROJECT_NAME} -) -install( - DIRECTORY launch - DESTINATION share/${PROJECT_NAME} -) -install( - DIRECTORY include/ - DESTINATION include -) - -# Testing -if(BUILD_TESTING) - find_package(ament_lint_auto REQUIRED) - ament_lint_auto_find_test_dependencies() -endif() - -# Export and package configuration -ament_export_include_directories(include) -ament_package() \ No newline at end of file diff --git a/src/simple_topology/launch/trace_simple_topology.launch.py b/src/simple_topology/launch/trace_simple_topology.launch.py deleted file mode 100644 index e4c7bad..0000000 --- a/src/simple_topology/launch/trace_simple_topology.launch.py +++ /dev/null @@ -1,24 +0,0 @@ -from launch import LaunchDescription -from launch_ros.actions import Node -from tracetools_launch.action import Trace - -def generate_launch_description(): - return LaunchDescription([ - # Start tracing automatically - Trace( - session_name='ros2_tracing_session', - events_ust=[ - 'rclcpp:*', 'rcl:*', 'rmw:*', 'ros2:*', 'rclcpp:*callback*', 'rclcpp:*executor*' - ], - events_kernel=[ - 'sched_switch', 'sched_wakeup' - ], - ), - - # Your executable as a Node - Node( - package='simple_topology', - executable='simple_topology', - output='screen', - ), - ]) diff --git a/src/simple_topology/package.xml b/src/simple_topology/package.xml deleted file mode 100644 index 22cfbca..0000000 --- a/src/simple_topology/package.xml +++ /dev/null @@ -1,36 +0,0 @@ - - - - simple_topology - 0.1.0 - TODO: Package description - dev - TODO: License declaration - - ament_cmake - - launch - launch_ros - tracetools - tracetools_trace - tracetools_launch - - - rclcpp - std_msgs - - ament_copyright - ament_flake8 - ament_mypy - ament_pep257 - ament_xmllint - python3-pytest - - - ament_lint_auto - ament_lint_common - - - ament_cmake - - diff --git a/src/simple_topology/src/simple_topology.cpp b/src/simple_topology/src/simple_topology.cpp deleted file mode 100644 index 2165157..0000000 --- a/src/simple_topology/src/simple_topology.cpp +++ /dev/null @@ -1,132 +0,0 @@ -#include -#include -#include -#include - -#include "rclcpp/rclcpp.hpp" -#include "std_msgs/msg/string.hpp" - -using namespace std::chrono_literals; - -constexpr int RUN_DURATION_SECONDS = 5; - -// Nodes in the topology: -// - Chain 1: Image2Ground Mapping -// - Node 1: Camera Node (Sensor Node): Simulates a camera that publishes messages. -// - Node 2: De-Bayering Node (Processing Node): Processes the camera messages. Listens to Node 1. -// - Node 3: Radiometric Correction Node (Processing Node): Further processes the messages. Listens to Node 2. -// - Node 4: Geometric Correction Node (Processing Node): Applies geometric corrections. Listens to Node 3. -// - Node 5: Image to Ground Mapping Node (Processing Node): Maps the image to ground coordinates. Listens to Node 4. -// - Chain 2: Semantic Scheduler Adaptation -// - Node 6: Smoke/Fire Classification Node (Decision Node): Listens to Node 3 and makes scheduluer decisions based on the processed data. -// - Chain 3: Flight Control -// - Node 7: GPS Node (Sensor Node): Simulates a GPS sensor that publishes messages. -// - Node 8: IMU Node (Sensor Node): Simulates an IMU sensor that publishes messages. -// - Node 9: Barometric Altimeter Node (Sensor Node): Simulates a barometric altimeter that publishes messages. -// - Node 10: Sensor Fusion Node (Processing Node): Fuses data from the GPS, IMU, and barometric altimeter. Listens to Nodes 7, 8, and 9. -// - Node 11: LIDAR Node (Sensor Node): Simulates a LiDAR sensor that publishes messages. -// - Node 12: Operator Command Node (Sensor Node): Remote control or flight plan issuing node -// - Node 13: Flight Management Node (Processing Node): Manages flight operations. Listens to Node 10, 11, and 12. -// - Node 14: Flight Control Node (Processing Node): Controls the flight based on data from Node 13. Listens to Node 13. -// - Chain 4: Data Storage/Streaming -// - Node 15: Telemetry Node (Processing Node): Collects telemetry data. Listens to Node 10 and 5. -// - Node 16: Radio Transmitter Node (Processing Node): Transmits data. Listens to Node 15. - - -// --- Sensor Node --- -class SensorNode : public rclcpp::Node -{ -public: - SensorNode() : Node("sensor_node") - { - publisher_ = this->create_publisher("sensor_topic", 10); - timer_ = this->create_wall_timer( - 100ms, [this]() { publish_message(); }); - } - -private: - void publish_message() - { - auto message = std_msgs::msg::String(); - message.data = "Sensor reading"; - RCLCPP_INFO(this->get_logger(), "Publishing: '%s'", message.data.c_str()); - publisher_->publish(message); - } - - rclcpp::Publisher::SharedPtr publisher_; - rclcpp::TimerBase::SharedPtr timer_; -}; - -// --- Processing Node --- -class ProcessingNode : public rclcpp::Node -{ -public: - ProcessingNode() : Node("processing_node") - { - subscription_ = this->create_subscription( - "sensor_topic", 10, - [this](std_msgs::msg::String::SharedPtr msg) { process_message(msg); }); - publisher_ = this->create_publisher("processed_topic", 10); - } - -private: - void process_message(const std_msgs::msg::String::SharedPtr msg) - { - RCLCPP_INFO(this->get_logger(), "Processing: '%s'", msg->data.c_str()); - // Simulate processing delay - std::this_thread::sleep_for(10ms); - auto new_msg = std_msgs::msg::String(); - new_msg.data = msg->data + " -> processed"; - publisher_->publish(new_msg); - } - - rclcpp::Subscription::SharedPtr subscription_; - rclcpp::Publisher::SharedPtr publisher_; -}; - -// --- Decision Node --- -class DecisionNode : public rclcpp::Node -{ -public: - DecisionNode() : Node("decision_node") - { - subscription_ = this->create_subscription( - "processed_topic", 10, - [this](std_msgs::msg::String::SharedPtr msg) { decide(msg); }); - } - -private: - void decide(const std_msgs::msg::String::SharedPtr msg) - { - RCLCPP_INFO(this->get_logger(), "Deciding based on: '%s'", msg->data.c_str()); - // Future place for semantic detection (e.g., "if smoke detected, boost priority!") - } - - rclcpp::Subscription::SharedPtr subscription_; -}; - -// --- Main --- -int main(int argc, char * argv[]) -{ - rclcpp::init(argc, argv); - - auto sensor_node = std::make_shared(); - auto processing_node = std::make_shared(); - auto decision_node = std::make_shared(); - - rclcpp::executors::MultiThreadedExecutor executor; - executor.add_node(sensor_node); - executor.add_node(processing_node); - executor.add_node(decision_node); - - auto start_time = std::chrono::steady_clock::now(); - - while (rclcpp::ok() && (std::chrono::steady_clock::now() - start_time) < std::chrono::seconds(RUN_DURATION_SECONDS)) - { - executor.spin_some(); - std::this_thread::sleep_for(1ms); - } - - rclcpp::shutdown(); - return 0; -}