commit 803f778e6c355cfb0cb04aacad60cf2108cdd79e Author: Kurt Wilson Date: Sat Mar 22 19:42:17 2025 -0400 add casestudy files diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..2f674c6 --- /dev/null +++ b/Dockerfile @@ -0,0 +1,5 @@ +FROM osrf/ros:foxy-desktop + +RUN apt update && apt install -y ninja-build clang-18 + +RUN echo "source /opt/ros/foxy/setup.bash" >> /root/.bashrc diff --git a/colcon_defaults.yaml b/colcon_defaults.yaml new file mode 100755 index 0000000..72df3fd --- /dev/null +++ b/colcon_defaults.yaml @@ -0,0 +1,15 @@ +{ + "build":{ + # "symlink-install": true, + "cmake-args": [ + "-DCMAKE_BUILD_TYPE=RelWithDebInfo", + "-DCMAKE_EXPORT_COMPILE_COMMANDS=True", + "-GNinja", + # clang + "-DCMAKE_C_COMPILER=clang-18", + "-DCMAKE_CXX_COMPILER=clang++-18", + ], + "event-handlers": ["console_cohesion+"], + # "mixin": "clang", + } +} \ No newline at end of file diff --git a/src/casestudy/CMakeLists.txt b/src/casestudy/CMakeLists.txt new file mode 100755 index 0000000..963fb8c --- /dev/null +++ b/src/casestudy/CMakeLists.txt @@ -0,0 +1,95 @@ +cmake_minimum_required(VERSION 3.5) +project(casestudy) + +# Default to C99 +if(NOT CMAKE_C_STANDARD) + set(CMAKE_C_STANDARD 99) +endif() + +# Default to C++14 +if(NOT CMAKE_CXX_STANDARD) + set(CMAKE_CXX_STANDARD 14) +endif() + +if(CMAKE_COMPILER_IS_GNUCXX OR CMAKE_CXX_COMPILER_ID MATCHES "Clang") + add_compile_options(-Wall -Wextra -Wpedantic) +endif() + +# find dependencies +find_package(ament_cmake REQUIRED) +# uncomment the following section in order to fill in +# further dependencies manually. +# find_package( REQUIRED) +find_package(priority_executor REQUIRED) +find_package(simple_timer REQUIRED) +find_package(rclcpp REQUIRED) +find_package(jsoncpp REQUIRED) +find_package(casestudy_tools REQUIRED) + +# add_library(test_nodes src/test_nodes.cpp) +# target_include_directories(test_nodes PUBLIC +# $ +# $ +# ) +# ament_target_dependencies(test_nodes +# rclcpp +# simple_timer +# casestudy_tools +# ) + + +list(APPEND experiments + casestudy_2023customfile_singlethread + casestudy_2024ours_latency + casestudy_2024ours_executor2executor + casestudy_example +) + +function(new_experiment experiment_name) + add_executable(${experiment_name} src/${experiment_name}.cpp) + target_include_directories(${experiment_name} PUBLIC + $ + $ + ) + ament_target_dependencies(${experiment_name} + rclcpp + priority_executor + simple_timer + casestudy_tools + ) +endfunction() + +foreach(experiment ${experiments}) + new_experiment(${experiment}) +endforeach() + +target_link_libraries(casestudy_2023customfile_singlethread + jsoncpp +) + + +install(TARGETS + ${experiments} + DESTINATION lib/${PROJECT_NAME} +) + + +install( + DIRECTORY include/ + DESTINATION include +) + + +if(BUILD_TESTING) + find_package(ament_lint_auto REQUIRED) + # the following line skips the linter which checks for copyrights + # uncomment the line when a copyright and license is not present in all source files + #set(ament_cmake_copyright_FOUND TRUE) + # the following line skips cpplint (only works in a git repo) + # uncomment the line when this package is not in a git repo + #set(ament_cmake_cpplint_FOUND TRUE) + ament_lint_auto_find_test_dependencies() +endif() + +ament_export_include_directories(include) +ament_package() diff --git a/src/casestudy/package.xml b/src/casestudy/package.xml new file mode 100755 index 0000000..917a316 --- /dev/null +++ b/src/casestudy/package.xml @@ -0,0 +1,22 @@ + + + + casestudy + 0.0.0 + TODO: Package description + vboxuser + TODO: License declaration + + ament_cmake + + ament_lint_auto + ament_lint_common + + priority_executor + simple_timer + casestudy_tools + + + ament_cmake + + diff --git a/src/casestudy/src/casestudy_2023customfile_singlethread.cpp b/src/casestudy/src/casestudy_2023customfile_singlethread.cpp new file mode 100755 index 0000000..1b6d1a9 --- /dev/null +++ b/src/casestudy/src/casestudy_2023customfile_singlethread.cpp @@ -0,0 +1,213 @@ +#include "rclcpp/rclcpp.hpp" +#include "priority_executor/priority_executor.hpp" +#include "priority_executor/priority_memory_strategy.hpp" +#include "casestudy_tools/test_nodes.hpp" +#include +#include +#include +#include "std_msgs/msg/string.hpp" +#include "casestudy_tools/primes_workload.hpp" +#include +#include "casestudy_tools/primes_workload.hpp" +#include "casestudy_tools/experiment.hpp" +#include "jsoncpp/json/reader.h" + +// bool should_do_task = true; +std::atomic should_do_task(true); + +void run_one_executor(std::function &)> exec_fun, int idx) +{ + (void)idx; + // set this process to highest priority + struct sched_param param; + param.sched_priority = sched_get_priority_max(SCHED_FIFO) - 1; + int result = sched_setscheduler(0, SCHED_FIFO, ¶m); + if (result != 0) + { + std::cout << "ros_experiment: sched_setscheduler failed: " << result << ": " << strerror(errno) << std::endl; + } + + // set process name + prctl(PR_SET_NAME, "ros_experiment", 0, 0, 0); + + exec_fun(should_do_task); +} + +int ros_experiment(int argc, char **arg, std::string file_name, ExperimentConfig config) +{ + + rclcpp::init(argc, arg); + // we have to create a node to process any passed parameters + auto node = rclcpp::Node::make_shared("experiment_node"); + node->declare_parameter("executor_type", "edf"); + + std::string executor_type; + node->get_parameter("executor_type", executor_type); + std::cout << "using executor type: " << executor_type << std::endl; + + // TODO: move experiment configuration to main + config.executor_type = executor_type; + config.nodes.push_back(node); + + Experiment experiment(config); + // std::string result_log = experiment.run(should_do_task); + std::vector &)>> exec_funs = experiment.getRunFunctions(); + std::cout << "got " << exec_funs.size() << " executors" << std::endl; + std::vector exec_threads; + int i = 0; + experiment.resetTimers(); + for (auto exec_fun : exec_funs) + { + exec_threads.push_back(std::thread(run_one_executor, exec_fun, i)); + i++; + } + + for (auto &t : exec_threads) + { + t.join(); + } + + experiment.writeLogsToFile(file_name, "2023customfile_singlethread"); + + return 0; +} + +int main(int argc, char *argv[]) +{ + std::ifstream input_file("taskset.json"); // Replace with actual filename + + if (!input_file.is_open()) + { + std::cerr << "Error opening file!" << std::endl; + return 1; + } + + Json::Value root; + Json::Reader reader; + bool parsingSuccessful = reader.parse(input_file, root); + + if (!parsingSuccessful) + { + std::cerr << "Error parsing file!" << std::endl; + return 1; + } + + // set ourselves to the second highest priority + struct sched_param param; + param.sched_priority = sched_get_priority_max(SCHED_FIFO) - 2; + int result = sched_setscheduler(0, SCHED_FIFO, ¶m); + if (result != 0) + { + std::cout << "ros_experiment: sched_setscheduler failed: " << result << ": " << strerror(errno) << std::endl; + } + // calibrate the dummy load + // Naive way to calibrate dummy workload for current system + int dummy_load_calib = 1; + while (1) + { + timeval ctime, ftime; + int duration_us; + gettimeofday(&ctime, NULL); + dummy_load(100); // 100ms + gettimeofday(&ftime, NULL); + duration_us = (ftime.tv_sec - ctime.tv_sec) * 1000000 + (ftime.tv_usec - ctime.tv_usec); + RCLCPP_INFO(rclcpp::get_logger("rclcpp"), "dummy_load_calib: %d (duration_us: %d ns)", dummy_load_calib, duration_us); + if (abs(duration_us - 100 * 1000) < 500) + { // error margin: 500us + break; + } + dummy_load_calib = 100 * 1000 * dummy_load_calib / duration_us; + if (dummy_load_calib <= 0) + dummy_load_calib = 1; + DummyLoadCalibration::setCalibration(dummy_load_calib); + } + // DummyLoadCalibration::setCalibration(2900); + ExperimentConfig config; + + // config.chain_lengths = {4, 3, 4}; + for (uint i = 0; i < root["chain_lengths"].size(); i++) + { + config.chain_lengths.push_back(root["chain_lengths"][i].asUInt()); + } + + // config.node_ids = {{0, 1, 2, 3}, {4, 5, 6}, {7, 8, 9, 10}}; + for (uint i = 0; i < root["node_ids"].size(); i++) + { + std::vector node_ids_row; + for (uint j = 0; j < root["node_ids"][i].size(); j++) + { + node_ids_row.push_back(root["node_ids"][i][j].asUInt()); + } + config.node_ids.push_back(node_ids_row); + } + // config.node_priorities = {3, 2, 1, 0, 6, 5, 4, 10, 9, 8, 7}; // PICAS-style prioritiesA + for (uint i = 0; i < root["node_priorities"].size(); i++) + { + config.node_priorities.push_back(root["node_priorities"][i].asUInt()); + } + + // disabled for custom + config.num_groups = 0; + // config.group_memberships = {0, 0, 1, 2, 0, 0, 1, 0, 0, 1, 2}; + + // config.chain_timer_control = {0, 1, 2}; + for (uint i = 0; i < root["chain_timer_control"].size(); i++) + { + config.chain_timer_control.push_back(root["chain_timer_control"][i].asInt()); + } + + // config.node_runtimes = {5, 5, 5, 5, // chain 1 + // 10, 20, 5, // chain 2 + // 10, 10, 15, 20}; // chain 3 + for (uint i = 0; i < root["node_runtimes"].size(); i++) + { + config.node_runtimes.push_back(root["node_runtimes"][i].asDouble()); + } + // config.chain_periods = {50, 100, 110}; + for (uint i = 0; i < root["chain_periods"].size(); i++) + { + config.chain_periods.push_back(root["chain_periods"][i].asInt()); + } + // config.node_executor_assignments = {}; // empty - multicore mode + for (uint i = 0; i < root["node_executor_assignments"].size(); i++) + { + config.node_executor_assignments.push_back(root["node_executor_assignments"][i].asUInt() - 1); + } + // single threaded mode - assign executors to cpu cores + for (uint i = 0; i < root["executor_to_cpu_core"].size(); i++) + { + config.executor_to_cpu_assignments.push_back(root["executor_to_cpu_core"][i].asUInt() - 1); + } + + config.parallel_mode = false; + config.cores = 4; + + // sanity_check_config(config); + + // from 35/65 to 100/0 in increments of 5 + // for (int i = 0; i <= (100 - 35) / 5; i++) + // { + // for now, run with 100% ros + int i = (100 - 35) / 5; + int ros_budget = 35 + i * 5; + int other_task_budget = 65 - i * 5; + std::cout << "ROS budget: " << ros_budget << " Other task budget: " << other_task_budget << std::endl; + // do other task in a separate thread + should_do_task.store(true); + double other_task_time = 80 * other_task_budget / 100.0; + std::thread other_task(do_other_task, other_task_time, 80, std::ref(should_do_task)); + + // do ros task + + std::string file_name = "ros_" + std::to_string(ros_budget); + // ros_experiment(argc, argv, file_name); + std::thread ros_task(ros_experiment, argc, argv, file_name, config); + std::cout << "tasks started" << std::endl; + ros_task.join(); + other_task.join(); + std::cout << "tasks joined" << std::endl; + + // // run once + // break; + // } +} \ No newline at end of file diff --git a/src/casestudy/src/casestudy_2024ours_executor2executor.cpp b/src/casestudy/src/casestudy_2024ours_executor2executor.cpp new file mode 100755 index 0000000..63f35a9 --- /dev/null +++ b/src/casestudy/src/casestudy_2024ours_executor2executor.cpp @@ -0,0 +1,215 @@ +#include "casestudy_tools/experiment.hpp" +#include "casestudy_tools/primes_workload.hpp" +#include +#include +#include + +// bool should_do_task = true; +std::atomic should_do_task(true); + +void run_one_executor(std::function &)> exec_fun, + int idx) { + (void)idx; + // set this process to highest priority + // struct sched_param param; + // param.sched_priority = sched_get_priority_max(SCHED_FIFO) - 1; + // int result = sched_setscheduler(0, SCHED_FIFO, ¶m); + // if (result != 0) + // { + // std::cout << "ros_experiment: sched_setscheduler failed: " << result << + // ": " << strerror(errno) << std::endl; + // } + sched_attr rt_attr; + rt_attr.size = sizeof(rt_attr); + rt_attr.sched_flags = 0; + rt_attr.sched_nice = 0; + rt_attr.sched_priority = 99; + rt_attr.sched_policy = SCHED_FIFO; + rt_attr.sched_runtime = 0; + rt_attr.sched_period = 0; + int result = sched_setattr(0, &rt_attr, 0); + if (result != 0) { + std::cout << "executor task: could not set scheduler: " << result << ": " + << strerror(errno) << std::endl; + } + + // set process name + prctl(PR_SET_NAME, "ros_experiment", 0, 0, 0); + + exec_fun(should_do_task); +} + +int ros_experiment(rclcpp::Node::SharedPtr node, std::string file_name, + ExperimentConfig config) { + + std::string executor_type; + node->get_parameter("executor_type", executor_type); + int num_subscribers; + node->get_parameter("num_subscribers", num_subscribers); + int num_chains; + node->get_parameter("num_chains", num_chains); + std::cout << "using executor type: " << executor_type << std::endl; + + // TODO: move experiment configuration to main + config.executor_type = executor_type; + config.nodes.push_back(node); + + Experiment experiment(config); + // std::string result_log = experiment.run(should_do_task); + std::vector &)>> exec_funs = + experiment.getRunFunctions(); + std::cout << "got " << exec_funs.size() << " executors" << std::endl; + std::vector exec_threads; + int i = 0; + experiment.resetTimers(); + for (auto exec_fun : exec_funs) { + exec_threads.push_back(std::thread(run_one_executor, exec_fun, i)); + i++; + } + + for (auto &t : exec_threads) { + t.join(); + } + + std::string outputname = "2024ours_executor2executor"; + outputname += + "_" + std::to_string(num_subscribers) + "_" + std::to_string(num_chains); + experiment.writeLogsToFile(file_name, outputname); + + return 0; +} + +int main(int argc, char *argv[]) { + rclcpp::init(argc, argv); + // we have to create a node to process any passed parameters + auto node = rclcpp::Node::make_shared("experiment_node"); + // auto node = rclcpp::Node::make_shared("experiment_node", + // rclcpp::NodeOptions().use_intra_process_comms(true)); + node->declare_parameter("executor_type", "edf"); + node->declare_parameter("num_subscribers", 3); + node->declare_parameter("num_chains", 1); + node->declare_parameter("runtime", 0); + + int num_s; + node->get_parameter("num_subscribers", num_s); + int runtime; + node->get_parameter("runtime", runtime); + int num_chains; + node->get_parameter("num_chains", num_chains); + + // set ourselves to the second highest priority + // struct sched_param param; + // param.sched_priority = sched_get_priority_max(SCHED_FIFO) - 2; + // int result = sched_setscheduler(0, SCHED_FIFO, ¶m); + sched_attr rt_attr; + rt_attr.size = sizeof(rt_attr); + rt_attr.sched_policy = SCHED_FIFO; + // rt_attr.sched_priority = sched_get_priority_max(SCHED_FIFO) - 2; + rt_attr.sched_priority = 99; + int result = sched_setattr(0, &rt_attr, 0); + + if (result != 0) { + std::cout << "ros_experiment: sched_setscheduler failed: " << result << ": " + << strerror(errno) << std::endl; + } + // calibrate the dummy load + // Naive way to calibrate dummy workload for current system + int dummy_load_calib = 1; + while (1) { + timeval ctime, ftime; + int duration_us; + gettimeofday(&ctime, NULL); + dummy_load(100); // 100ms + gettimeofday(&ftime, NULL); + duration_us = (ftime.tv_sec - ctime.tv_sec) * 1000000 + + (ftime.tv_usec - ctime.tv_usec); + RCLCPP_INFO(rclcpp::get_logger("rclcpp"), + "dummy_load_calib: %d (duration_us: %d ns)", dummy_load_calib, + duration_us); + if (abs(duration_us - 100 * 1000) < 500) { // error margin: 500us + break; + } + dummy_load_calib = 100 * 1000 * dummy_load_calib / duration_us; + if (dummy_load_calib <= 0) + dummy_load_calib = 1; + DummyLoadCalibration::setCalibration(dummy_load_calib); + } + // DummyLoadCalibration::setCalibration(2900); + ExperimentConfig config; + + // runtimes and timers subject to change + config.chain_lengths = {}; + config.node_ids = {}; + config.chain_timer_control = {}; + config.chain_periods = {}; + config.node_runtimes = {}; + config.node_executor_assignments = {}; + int node_id = 0; + for (int c = 0; c < num_chains; c++) { + config.node_ids.push_back({}); + for (int i = 0; i < num_s + 1; i++) { + config.node_ids[c].push_back(node_id); + if (i == 0) { + // first node is the publisher, and goes in the first executor + config.node_executor_assignments.push_back(0); + } else { + // all other nodes go in the second executor + config.node_executor_assignments.push_back(1); + } + node_id++; + } + for (int i = 0; i < num_s + 1; i++) { + config.node_priorities.push_back(i); + } + config.chain_lengths.push_back(num_s+1); + config.chain_periods.push_back(10); + for (int i = 0; i < num_s + 1; i++) { + config.node_runtimes.push_back(runtime); + } + config.chain_timer_control.push_back(c); + } + + config.executor_to_cpu_assignments = {0, 1}; + + std::cout << "node ids: " << std::endl; + for (size_t i = 0; i < config.node_ids.size(); i++) { + for (size_t j = 0; j < config.node_ids[i].size(); j++) { + std::cout << config.node_ids[i][j] << " "; + } + std::cout << std::endl; + } + + + config.num_groups = 0; + config.group_memberships = {}; // no groups + + config.parallel_mode = false; + config.cores = 1; + config.special_cases.push_back("2024ours_latency"); + + // sanity_check_config(config); + + // from 35/65 to 100/0 in increments of 5 + // for (int i = 0; i <= (100 - 35) / 5; i++) + // { + // for now, run with 100% ros + int i = (100 - 35) / 5; + int ros_budget = 35 + i * 5; + int other_task_budget = 65 - i * 5; + std::cout << "ROS budget: " << ros_budget + << " Other task budget: " << other_task_budget << std::endl; + // do other task in a separate thread + should_do_task.store(true); + + // do ros task + std::string file_name = "ros_" + std::to_string(ros_budget); + // ros_experiment(argc, argv, file_name); + std::thread ros_task(ros_experiment, node, file_name, config); + std::cout << "tasks started" << std::endl; + ros_task.join(); + std::cout << "tasks joined" << std::endl; + + // // run once + // break; + // } +} \ No newline at end of file diff --git a/src/casestudy/src/casestudy_2024ours_latency.cpp b/src/casestudy/src/casestudy_2024ours_latency.cpp new file mode 100755 index 0000000..fdccb61 --- /dev/null +++ b/src/casestudy/src/casestudy_2024ours_latency.cpp @@ -0,0 +1,208 @@ +#include "casestudy_tools/experiment.hpp" +#include "casestudy_tools/primes_workload.hpp" +#include +#include +#include + +// bool should_do_task = true; +std::atomic should_do_task(true); + + +void run_one_executor(std::function &)> exec_fun, + int idx) { + (void)idx; + // set this process to highest priority + // struct sched_param param; + // param.sched_priority = sched_get_priority_max(SCHED_FIFO) - 1; + // int result = sched_setscheduler(0, SCHED_FIFO, ¶m); + // if (result != 0) + // { + // std::cout << "ros_experiment: sched_setscheduler failed: " << result << + // ": " << strerror(errno) << std::endl; + // } + sched_attr rt_attr; + rt_attr.size = sizeof(rt_attr); + rt_attr.sched_flags = 0; + rt_attr.sched_nice = 0; + rt_attr.sched_priority = 99; + rt_attr.sched_policy = SCHED_FIFO; + rt_attr.sched_runtime = 0; + rt_attr.sched_period = 0; + int result = sched_setattr(0, &rt_attr, 0); + if (result != 0) { + std::cout << "executor task: could not set scheduler: " << result << ": " + << strerror(errno) << std::endl; + } + + // set process name + prctl(PR_SET_NAME, "ros_experiment", 0, 0, 0); + + exec_fun(should_do_task); +} + +int ros_experiment(rclcpp::Node::SharedPtr node, std::string file_name, + ExperimentConfig config) { + + std::string executor_type; + node->get_parameter("executor_type", executor_type); + int num_subscribers; + node->get_parameter("num_subscribers", num_subscribers); + int num_chains; + node->get_parameter("num_chains", num_chains); + std::cout << "using executor type: " << executor_type << std::endl; + + // TODO: move experiment configuration to main + config.executor_type = executor_type; + config.nodes.push_back(node); + + Experiment experiment(config); + // std::string result_log = experiment.run(should_do_task); + std::vector &)>> exec_funs = + experiment.getRunFunctions(); + std::cout << "got " << exec_funs.size() << " executors" << std::endl; + std::vector exec_threads; + int i = 0; + experiment.resetTimers(); + for (auto exec_fun : exec_funs) { + exec_threads.push_back(std::thread(run_one_executor, exec_fun, i)); + i++; + } + + for (auto &t : exec_threads) { + t.join(); + } + + std::string outputname = "2024ours_latency"; + outputname += + "_" + std::to_string(num_subscribers) + "_" + std::to_string(num_chains); + experiment.writeLogsToFile(file_name, outputname); + + return 0; +} + +int main(int argc, char *argv[]) { + rclcpp::init(argc, argv); + // we have to create a node to process any passed parameters + auto node = rclcpp::Node::make_shared("experiment_node"); + // auto node = rclcpp::Node::make_shared("experiment_node", + // rclcpp::NodeOptions().use_intra_process_comms(true)); + node->declare_parameter("executor_type", "edf"); + node->declare_parameter("num_subscribers", 3); + node->declare_parameter("num_chains", 1); + node->declare_parameter("runtime", 0); + + int num_s; + node->get_parameter("num_subscribers", num_s); + int runtime; + node->get_parameter("runtime", runtime); + int num_chains; + node->get_parameter("num_chains", num_chains); + + // set ourselves to the second highest priority + // struct sched_param param; + // param.sched_priority = sched_get_priority_max(SCHED_FIFO) - 2; + // int result = sched_setscheduler(0, SCHED_FIFO, ¶m); + sched_attr rt_attr; + rt_attr.size = sizeof(rt_attr); + rt_attr.sched_policy = SCHED_FIFO; + // rt_attr.sched_priority = sched_get_priority_max(SCHED_FIFO) - 2; + rt_attr.sched_priority = 99; + int result = sched_setattr(0, &rt_attr, 0); + + if (result != 0) { + std::cout << "ros_experiment: sched_setscheduler failed: " << result << ": " + << strerror(errno) << std::endl; + } + // calibrate the dummy load + // Naive way to calibrate dummy workload for current system + int dummy_load_calib = 1; + while (1) { + timeval ctime, ftime; + int duration_us; + gettimeofday(&ctime, NULL); + dummy_load(100); // 100ms + gettimeofday(&ftime, NULL); + duration_us = (ftime.tv_sec - ctime.tv_sec) * 1000000 + + (ftime.tv_usec - ctime.tv_usec); + RCLCPP_INFO(rclcpp::get_logger("rclcpp"), + "dummy_load_calib: %d (duration_us: %d ns)", dummy_load_calib, + duration_us); + if (abs(duration_us - 100 * 1000) < 500) { // error margin: 500us + break; + } + dummy_load_calib = 100 * 1000 * dummy_load_calib / duration_us; + if (dummy_load_calib <= 0) + dummy_load_calib = 1; + DummyLoadCalibration::setCalibration(dummy_load_calib); + } + // DummyLoadCalibration::setCalibration(2900); + ExperimentConfig config; + + // runtimes and timers subject to change + config.chain_lengths = {}; + config.node_ids = {}; + config.chain_timer_control = {}; + config.chain_periods = {}; + config.node_runtimes = {}; + int node_id = 0; + for (int c = 0; c < num_chains; c++) { + config.node_ids.push_back({}); + for (int i = 0; i < num_s + 1; i++) { + config.node_ids[c].push_back(node_id); + node_id++; + } + for (int i = 0; i < num_s + 1; i++) { + config.node_priorities.push_back(i); + } + config.chain_lengths.push_back(num_s+1); + config.chain_periods.push_back(10); + for (int i = 0; i < num_s + 1; i++) { + config.node_runtimes.push_back(runtime); + } + config.chain_timer_control.push_back(c); + } + + std::cout << "node ids: " << std::endl; + for (size_t i = 0; i < config.node_ids.size(); i++) { + for (size_t j = 0; j < config.node_ids[i].size(); j++) { + std::cout << config.node_ids[i][j] << " "; + } + std::cout << std::endl; + } + + + config.num_groups = 0; + config.group_memberships = {}; // no groups + + config.node_executor_assignments = {}; + // config.node_executor_assignments = {0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}; + config.parallel_mode = false; + config.cores = 1; + config.special_cases.push_back("2024ours_latency"); + + // sanity_check_config(config); + + // from 35/65 to 100/0 in increments of 5 + // for (int i = 0; i <= (100 - 35) / 5; i++) + // { + // for now, run with 100% ros + int i = (100 - 35) / 5; + int ros_budget = 35 + i * 5; + int other_task_budget = 65 - i * 5; + std::cout << "ROS budget: " << ros_budget + << " Other task budget: " << other_task_budget << std::endl; + // do other task in a separate thread + should_do_task.store(true); + + // do ros task + std::string file_name = "ros_" + std::to_string(ros_budget); + // ros_experiment(argc, argv, file_name); + std::thread ros_task(ros_experiment, node, file_name, config); + std::cout << "tasks started" << std::endl; + ros_task.join(); + std::cout << "tasks joined" << std::endl; + + // // run once + // break; + // } +} \ No newline at end of file diff --git a/src/casestudy/src/casestudy_example.cpp b/src/casestudy/src/casestudy_example.cpp new file mode 100644 index 0000000..0ee1bbd --- /dev/null +++ b/src/casestudy/src/casestudy_example.cpp @@ -0,0 +1,132 @@ +#include "rclcpp/rclcpp.hpp" +#include "priority_executor/priority_executor.hpp" +#include "priority_executor/priority_memory_strategy.hpp" +#include "casestudy_tools/test_nodes.hpp" +#include +#include +#include +#include "std_msgs/msg/string.hpp" +#include "casestudy_tools/primes_workload.hpp" +#include +#include "casestudy_tools/primes_workload.hpp" +#include "casestudy_tools/experiment.hpp" + +// bool should_do_task = true; +std::atomic should_do_task(true); + +void run_one_executor(std::function &)> exec_fun, int idx) +{ + (void)idx; + // cpu_set_t cpuset; + // CPU_ZERO(&cpuset); + // CPU_SET(0, &cpuset); + // CPU_SET(1, &cpuset); + // CPU_SET(2, &cpuset); + // CPU_SET(3, &cpuset); + + // int result = sched_setaffinity(0, sizeof(cpu_set_t), &cpuset); + // if (result != 0) + // { + // std::cout << "ros_experiment: sched_setaffinity failed: " << result << ": " << strerror(errno) << std::endl; + // } + + // set this process to second highest priority + struct sched_param param; + param.sched_priority = sched_get_priority_max(SCHED_FIFO) - 2; + int result = sched_setscheduler(0, SCHED_FIFO, ¶m); + if (result != 0) + { + std::cout << "ros_experiment: sched_setscheduler failed: " << result << ": " << strerror(errno) << std::endl; + } + + // set process name + prctl(PR_SET_NAME, "ros_experiment", 0, 0, 0); + + exec_fun(should_do_task); +} + +int ros_experiment(int argc, char **arg, std::string file_name, ExperimentConfig config) +{ + + rclcpp::init(argc, arg); + // we have to create a node to process any passed parameters + auto node = rclcpp::Node::make_shared("experiment_parameters"); + node->declare_parameter("executor_type", "edf"); + + std::string executor_type; + node->get_parameter("executor_type", executor_type); + std::cout << "using executor type: " << executor_type << std::endl; + + // TODO: move experiment configuration to main + config.executor_type = executor_type; + config.parallel_mode = true; + config.nodes.push_back(node); + + Experiment experiment(config); + // std::string result_log = experiment.run(should_do_task); + std::vector &)>> exec_funs = experiment.getRunFunctions(); + std::cout << "got " << exec_funs.size() << " executors" << std::endl; + std::vector exec_threads; + int i = 0; + experiment.resetTimers(); + for (auto exec_fun : exec_funs) + { + exec_threads.push_back(std::thread(run_one_executor, exec_fun, i)); + i++; + } + + for (auto &t : exec_threads) + { + t.join(); + } + std::string outputname = "casestudy_example"; + experiment.writeLogsToFile(file_name, outputname); + + return 0; +} + +int main(int argc, char *argv[]) +{ + + // calibrate the dummy load + // Naive way to calibrate dummy workload for current system + int dummy_load_calib = 1; + while (1) + { + timeval ctime, ftime; + int duration_us; + gettimeofday(&ctime, NULL); + dummy_load(100); // 100ms + gettimeofday(&ftime, NULL); + duration_us = (ftime.tv_sec - ctime.tv_sec) * 1000000 + (ftime.tv_usec - ctime.tv_usec); + RCLCPP_INFO(rclcpp::get_logger("rclcpp"), "dummy_load_calib: %d (duration_us: %d ns)", dummy_load_calib, duration_us); + if (abs(duration_us - 100 * 1000) < 500) + { // error margin: 500us + break; + } + dummy_load_calib = 100 * 1000 * dummy_load_calib / duration_us; + if (dummy_load_calib <= 0) + dummy_load_calib = 1; + DummyLoadCalibration::setCalibration(dummy_load_calib); + } + // DummyLoadCalibration::setCalibration(2900); + ExperimentConfig config; + config.chain_lengths = {2, 2}; + config.node_ids = {{0, 1}, {2, 3}}; + config.node_priorities = {1, 0, 3, 2}; + config.chain_timer_control = {0, 1}; + + config.node_runtimes = {10, 10, 10, 10}; + // node 0 has a period of 80, and is the only timer + config.chain_periods = {100, 100}; + config.node_executor_assignments = {}; + config.parallel_mode = true; + config.cores = 2; + + sanity_check_config(config); + + std::thread ros_task(ros_experiment, argc, argv, "cs_example", config); + std::cout << "tasks started" << std::endl; + ros_task.join(); + std::cout << "tasks joined" << std::endl; +} \ No newline at end of file diff --git a/src/casestudy_tools/CMakeLists.txt b/src/casestudy_tools/CMakeLists.txt new file mode 100755 index 0000000..fa5be69 --- /dev/null +++ b/src/casestudy_tools/CMakeLists.txt @@ -0,0 +1,92 @@ +cmake_minimum_required(VERSION 3.5) +project(casestudy_tools) + +# Default to C99 +if(NOT CMAKE_C_STANDARD) + set(CMAKE_C_STANDARD 99) +endif() + +# Default to C++14 +if(NOT CMAKE_CXX_STANDARD) + set(CMAKE_CXX_STANDARD 14) +endif() + +if(CMAKE_COMPILER_IS_GNUCXX OR CMAKE_CXX_COMPILER_ID MATCHES "Clang") + add_compile_options(-Wall -Wextra -Wpedantic) +endif() + +# find dependencies +find_package(ament_cmake REQUIRED) +find_package(rclcpp REQUIRED) +find_package(priority_executor REQUIRED) +find_package(simple_timer REQUIRED) + +if(BUILD_TESTING) + find_package(ament_lint_auto REQUIRED) + # the following line skips the linter which checks for copyrights + # uncomment the line when a copyright and license is not present in all source files + #set(ament_cmake_copyright_FOUND TRUE) + # the following line skips cpplint (only works in a git repo) + # uncomment the line when this package is not in a git repo + #set(ament_cmake_cpplint_FOUND TRUE) + ament_lint_auto_find_test_dependencies() +endif() + +add_library(primes_workload src/primes_workload.cpp) +target_include_directories(primes_workload PUBLIC + $ + $ +) +ament_target_dependencies(primes_workload + simple_timer + rclcpp + priority_executor + +) + +add_library(test_nodes src/test_nodes.cpp) +target_include_directories(test_nodes PUBLIC + $ + $ +) +target_link_libraries(test_nodes primes_workload) +ament_target_dependencies(test_nodes + rclcpp + simple_timer +) + +add_library(experiment_host src/experiment.cpp) +target_include_directories(experiment_host PUBLIC + $ + $ +) +target_link_libraries(experiment_host + primes_workload + test_nodes +) +ament_target_dependencies(experiment_host + rclcpp + simple_timer + priority_executor +) + + +install( + DIRECTORY include/ + DESTINATION include +) + +install( + TARGETS primes_workload test_nodes experiment_host + EXPORT export_${PROJECT_NAME} + ARCHIVE DESTINATION lib + LIBRARY DESTINATION lib + RUNTIME DESTINATION bin + INCLUDES DESTINATION include +) + +ament_export_include_directories(include) +# ament_export_libraries(primes_workload test_nodes experiment_host) +ament_export_targets(export_${PROJECT_NAME} HAS_LIBRARY_TARGET) +ament_export_dependencies(rclcpp priority_executor simple_timer) +ament_package() diff --git a/src/casestudy_tools/include/casestudy_tools/experiment.hpp b/src/casestudy_tools/include/casestudy_tools/experiment.hpp new file mode 100755 index 0000000..b7da6ab --- /dev/null +++ b/src/casestudy_tools/include/casestudy_tools/experiment.hpp @@ -0,0 +1,140 @@ +#ifndef RTIS_EXPERIMENT_HOST +#define RTIS_EXPERIMENT_HOST + +#include +#include +#include +#include "priority_executor/priority_executor.hpp" +#include "casestudy_tools/test_nodes.hpp" +#include "simple_timer/rt-sched.hpp" + + +struct ExperimentConfig +{ + std::vector chain_lengths; + // TODO: rename to callback_ids + std::vector> node_ids; + // TODO: rename to callback_runtimes + std::vector node_runtimes; + // TODO: rename to callback_priorities + std::vector node_priorities; + std::vector chain_periods; + // for each chain, the index of the timer node + std::vector chain_timer_control; + + uint num_groups = 0; + std::vector group_memberships; + + // TODO: change to enum + // can be "default", "picas", "edf" + std::string executor_type = "default"; + + // if this is empty, then the nodes will be assigned to a single executor + // TODO: rename this to callback_executor_assignments + std::vector node_executor_assignments; + // used in single-threaded mode to assign executors to cores + std::vector executor_to_cpu_assignments; + + bool parallel_mode = false; + size_t cores = 2; + + // rclcpp::Node::SharedPtr node; + // nodes to host the callbacks. In parallel (one executor using multiple threads) mode, there is one node and one executor. The node is provided by the experiment provider + // in singlethread (each executor uses one thread), there is one node per executor. Additional nodes may be created by the experiment host + std::vector nodes; + std::vector special_cases = {}; +}; + +struct experiment_executor +{ + std::shared_ptr executor; + std::shared_ptr> strat; + + std::shared_ptr default_executor; +}; + +class Experiment +{ +public: + Experiment(ExperimentConfig config); + + /** + * @brief Run the experiment + * runs in the current thread + * @param should_do_task a flag that can be set to false to stop the experiment. The flag will be set to false when the experiment is complete. + */ + std::string run(std::atomic &should_do_task); + + + /** + * @brief Get the functions required to run the experiment + * returns a vector of functions. Each function will run a different executor. It's up to the caller to run these functions in different threads. + * The caller may set the thread properties (e.g. priority) before running the function. + * Each function accepts a flag that can be set to false to stop the experiment. The flag will be set to false when the experiment is complete. + */ + std::vector &)>> getRunFunctions(); + + static size_t numExecutorsRequired(const ExperimentConfig &config); + + /** + * get the executor (and strategy) for a given executor index (0 by default) + */ + experiment_executor getExecutor(int executor_idx = 0); + + node_time_logger getInternalLogger(); + + /** + * collect logs from the executors. Also add any logs from passed in loggers + * @param optional array of loggers to add to the logs + */ + std::string buildLogs(std::vector loggers = {}); + + void resetTimers(); + + void writeLogsToFile(std::string filename, std::string experiment_name); + +private: + ExperimentConfig config; + std::vector executors; + + // values to keep track of runtime values + + // chain-indexed vector of deques of deadlines + std::vector *> chain_deadlines_deque; + // maps node_id to node + std::map> nodes; + // maps chain_id to timer + std::map> chain_timer_handles; + + /** + * @brief Create the executors using the type and amount set in the config + * + */ + void createExecutors(); + + /** + * @brief Create a single executor using the type set in the config + * + * @return experiment_executor + */ + experiment_executor createSingleExecutor(uint executor_num = 0); + + /** + * @brief Get the executor for a given node id + * @param node_id if the node_executor_assignments is empty, then this parameter is ignored, and the first executor is returned + */ + experiment_executor getExecutorForNode(int node_id=0); + + void createNodesAndAssignProperties(); + + + void setInitialDeadlines(); + + + node_time_logger _logger; +}; + +int do_other_task(double work_time, uint period, std::atomic &should_do_task); +void sanity_check_config(ExperimentConfig config); + +#endif // RTIS_EXPERIMENT_HOST \ No newline at end of file diff --git a/src/casestudy_tools/include/casestudy_tools/primes_workload.hpp b/src/casestudy_tools/include/casestudy_tools/primes_workload.hpp new file mode 100755 index 0000000..a7d2373 --- /dev/null +++ b/src/casestudy_tools/include/casestudy_tools/primes_workload.hpp @@ -0,0 +1,22 @@ +#ifndef RTIS_PRIMES_WORKLOAD +#define RTIS_PRIMES_WORKLOAD +#include +#include +#include +#include +typedef double ktimeunit; +ktimeunit nth_prime_silly(double millis = 100); +void dummy_load(int load_ms); +// create a global variable to store the calibration value +class DummyLoadCalibration +{ +public: + static int getCalibration(); + static void setCalibration(int calib); + +private: + static int calibration; +}; + +ktimeunit get_thread_time(struct timespec *currTime); +#endif \ No newline at end of file diff --git a/src/casestudy_tools/include/casestudy_tools/test_nodes.hpp b/src/casestudy_tools/include/casestudy_tools/test_nodes.hpp new file mode 100755 index 0000000..f8da7a4 --- /dev/null +++ b/src/casestudy_tools/include/casestudy_tools/test_nodes.hpp @@ -0,0 +1,61 @@ +#ifndef RTIS_TEST_NODES +#define RTIS_TEST_NODES +#include +#include "rclcpp/rclcpp.hpp" +#include "std_msgs/msg/string.hpp" +#include "std_msgs/msg/int32.hpp" +#include "simple_timer/rt-sched.hpp" + +class CaseStudyNode +{ + public: node_time_logger logger; + // inherit constructor + CaseStudyNode(std::string publish_topic, rclcpp::Node::SharedPtr node) + { + this->name = publish_topic; + this->node = node; + // seed random number generator + srand(time(NULL)); + } + + bool use_random_runtime = false; + double normal_runtime_scale = 0.8; + double over_runtime_scale = 1.2; + double runtime_over_chance = 0.1; + std::string get_name(); + rclcpp::CallbackGroup::SharedPtr callback_group_; + + private: + rclcpp::Node::SharedPtr node; + std::string name; +}; + +#define COUNT_MAX 500 + +class PublisherNode : public CaseStudyNode{ + public: + PublisherNode(std::string publish_topic, double runtime, int period, int chain, rclcpp::Node::SharedPtr node, rclcpp::CallbackGroup::SharedPtr callback_group = nullptr); + rclcpp::TimerBase::SharedPtr timer_; + int count_max_ = COUNT_MAX; + + private: + rclcpp::Publisher::SharedPtr pub_; + double runtime_; + int period_; + int chain_; +}; + +class WorkerNode : public CaseStudyNode{ + public: + WorkerNode(std::string subscribe_topic, std::string publish_topic, double runtime, int period, int chain, rclcpp::Node::SharedPtr node, rclcpp::CallbackGroup::SharedPtr callback_group = nullptr); + rclcpp::Subscription::SharedPtr sub_; + int count_max_ = COUNT_MAX; + + private: + double runtime_; + int period_; + int chain_; + rclcpp::Publisher::SharedPtr pub_; +}; + +#endif // RTIS_TEST_NODES \ No newline at end of file diff --git a/src/casestudy_tools/package.xml b/src/casestudy_tools/package.xml new file mode 100755 index 0000000..82d3b67 --- /dev/null +++ b/src/casestudy_tools/package.xml @@ -0,0 +1,22 @@ + + + + casestudy_tools + 0.0.0 + TODO: Package description + vboxuser + TODO: License declaration + + ament_cmake + + rclcpp + priority_executor + simple_timer + + ament_lint_auto + ament_lint_common + + + ament_cmake + + diff --git a/src/casestudy_tools/src/experiment.cpp b/src/casestudy_tools/src/experiment.cpp new file mode 100755 index 0000000..f76a6ad --- /dev/null +++ b/src/casestudy_tools/src/experiment.cpp @@ -0,0 +1,612 @@ +#include "casestudy_tools/experiment.hpp" +#include "casestudy_tools/primes_workload.hpp" +#include "casestudy_tools/test_nodes.hpp" +#include "priority_executor/multithread_priority_executor.hpp" +#include "priority_executor/priority_executor.hpp" +#include "priority_executor/priority_memory_strategy.hpp" + +#include +#include +#include +#include +#include +#include +#include + +Experiment::Experiment(ExperimentConfig config) : config(config) { + _logger = create_logger(); +} + +void sanity_check_config(ExperimentConfig config) { + // make sure chain_lengths and node_ids are the same size + if (config.chain_lengths.size() != config.node_ids.size()) { + std::cout << "ros_experiment: chain_lengths.size()= " + << config.chain_lengths.size() + << " != node_ids.size()= " << config.node_ids.size() << std::endl; + exit(1); + } + // make sure each chain_lengths is the same size as the corresponding node_ids + for (uint32_t i = 0; i < config.chain_lengths.size(); i++) { + if (config.chain_lengths[i] != config.node_ids[i].size()) { + std::cout << "ros_experiment: chain_lengths[" << i + << "]= " << config.chain_lengths[i] << " != node_ids[" << i + << "].size()= " << config.node_ids[i].size() << std::endl; + exit(1); + } + } + // make sure chain_timer_control is the same size as chain_lengths + if (config.chain_timer_control.size() != config.chain_lengths.size()) { + std::cout << "ros_experiment: chain_timer_control.size()= " + << config.chain_timer_control.size() + << " != chain_lengths.size()= " << config.chain_lengths.size() + << std::endl; + exit(1); + } + std::set all_node_ids = std::set(); + for (uint32_t i = 0; i < config.node_ids.size(); i++) { + for (uint32_t j = 0; j < config.node_ids[i].size(); j++) { + all_node_ids.insert(config.node_ids[i][j]); + } + } + // make sure we have the right number of node_priorities and node_runtimes + if (all_node_ids.size() != config.node_priorities.size()) { + std::cout << "ros_experiment: all_node_ids.size()= " << all_node_ids.size() + << " != node_priorities.size()= " << config.node_priorities.size() + << std::endl; + exit(1); + } + if (all_node_ids.size() != config.node_runtimes.size()) { + std::cout << "ros_experiment: all_node_ids.size()= " << all_node_ids.size() + << " != node_runtimes.size()= " << config.node_runtimes.size() + << std::endl; + exit(1); + } +} + +std::string Experiment::run(std::atomic &should_do_task) { + // TODO: split into setup and run, so that run can be re-used in + // Experiment::getRunFunctions + if (config.node_executor_assignments.size() != 0 || + numExecutorsRequired(config) != 1) { + std::cerr + << "called Experiment::run with node_executor_assignments.size() != 0" + << std::endl; + return ""; + } + + createExecutors(); + + createNodesAndAssignProperties(); + + setInitialDeadlines(); + // reset all timers + resetTimers(); + // in this function (run), there is only one executor, so retrieve it now + experiment_executor executor = getExecutorForNode(); + if (config.executor_type == "edf" || config.executor_type == "picas") { + executor.executor->spin(); + } else if (config.executor_type == "default") { + executor.default_executor->spin(); + } + std::cout << "spin done" << std::endl; + should_do_task.store(false); + + return buildLogs(); +} + +std::vector &)>> +Experiment::getRunFunctions() { + // do pre-run setup + createExecutors(); + createNodesAndAssignProperties(); + + // create a function for each executor + std::vector &)>> run_functions; + for (size_t i = 0; i < numExecutorsRequired(config); i++) { + // get the executor for this function + experiment_executor executor = executors[i]; + run_functions.push_back([this, + executor](std::atomic &should_do_task) { + if (config.executor_type == "edf" || config.executor_type == "picas") { + executor.executor->spin(); + } else if (config.executor_type == "default") { + executor.default_executor->spin(); + } + std::cout << "spin done" << std::endl; + should_do_task.store(false); + }); + } + + // do these last, since they're time-sensitive + setInitialDeadlines(); + // resetTimers(); + + return run_functions; +} + +size_t Experiment::numExecutorsRequired(const ExperimentConfig &config) { + // get the number of unique values in the node_executor_assignments + if (config.node_executor_assignments.size() == 0) { + return 1; + } + std::set unique_executors; + for (auto &assignment : config.node_executor_assignments) { + unique_executors.insert(assignment); + } + return unique_executors.size(); +} + +experiment_executor Experiment::getExecutor(int executor_idx) { + return executors[executor_idx]; +} + +void Experiment::createExecutors() { + if (config.node_executor_assignments.size() == 0) { + // create a single executor + executors.push_back(createSingleExecutor(0)); + return; + } + + // depending on config.executor_to_cpu_assignments and config.parallel_mode, + // we may need to create the host nodes + if (config.executor_to_cpu_assignments.size() != 0 && config.parallel_mode) { + std::cerr << "executor_to_cpu_assignments not supported for parallel mode" + << std::endl; + } + + if (config.executor_to_cpu_assignments.size() != 0 && + config.executor_to_cpu_assignments.size() != + numExecutorsRequired(config)) { + std::cerr + << "executor_to_cpu_assignments.size() != numExecutorsRequired(config)" + << std::endl; + exit(1); + } + + // create the required host nodes - re-use any existing nodes + for (uint i = config.nodes.size(); + i < config.executor_to_cpu_assignments.size(); i++) { + // create a node for each executor + rclcpp::Node::SharedPtr node = + std::make_shared("node_" + std::to_string(i)); + config.nodes.push_back(node); + } + std::cout << "created " << config.nodes.size() << " nodes" << std::endl; + + int num_executors = numExecutorsRequired(config); + std::cout << "creating " << num_executors << " executors" << std::endl; + for (int i = 0; i < num_executors; i++) { + executors.push_back(createSingleExecutor(i)); + } +} + +experiment_executor Experiment::createSingleExecutor(uint executor_num) { + experiment_executor executor = {nullptr, nullptr, nullptr}; + + if (config.executor_type == "edf" || config.executor_type == "picas") { + executor.strat = std::make_shared>(); + rclcpp::ExecutorOptions options; + options.memory_strategy = executor.strat; + if (config.parallel_mode) { + executor.executor = + std::make_shared( + options, "priority_executor", config.cores); + } else { + executor.executor = + std::make_shared(options); + } + executor.executor->prio_memory_strategy_ = executor.strat; + if (config.node_executor_assignments.size() == 0) { + // add all nodes to the executor + for (auto &node : config.nodes) { + executor.executor->add_node(node); + } + } else { + // add only this executor's nodes + executor.executor->add_node(config.nodes[executor_num]); + } + } else if (config.executor_type == "default") { + if (config.parallel_mode) { + // executor.default_executor = + // std::make_shared(rclcpp::ExecutorOptions(), + // config.cores); + executor.default_executor = + std::make_shared( + rclcpp::ExecutorOptions(), config.cores); + } else { + executor.default_executor = std::make_shared(); + } + if (config.node_executor_assignments.size() == 0) { + // add all nodes to the executor + for (auto &node : config.nodes) { + executor.default_executor->add_node(node); + } + } else { + // add only this executor's nodes + executor.default_executor->add_node(config.nodes[executor_num]); + } + } else { + // RCLCPP_ERROR(node->get_logger(), "Unknown executor type: %s", + // executor_type.c_str()); + return executor; + } + return executor; +} + +experiment_executor Experiment::getExecutorForNode(int node_id) { + if (config.node_executor_assignments.size() == 0) { + return executors[0]; + } else { + int executor_idx = config.node_executor_assignments[node_id]; + std::cout << "node " << node_id << " assigned to executor " << executor_idx + << std::endl; + return executors[executor_idx]; + } +} + +void Experiment::createNodesAndAssignProperties() { + + // BEGIN SPECIAL CASES + std::vector>> all_nodes; + rclcpp::CallbackGroup::SharedPtr cb_group0; + // LEGACY GROUP BEHAVIOR + bool jiang2022_cb = false; + bool jiang2022_cs2 = false; + // one-to-many test. TODO: add generic api for multiple one->many + bool ours2024_latency = false; + for (auto &special_case : config.special_cases) { + if (special_case == "jiang2022_cb") { + jiang2022_cb = true; + } else if (special_case == "2022jiang2") { + jiang2022_cs2 = true; + } else if (special_case == "2024ours_latency") { + std::cout << "using 2024ours_latency special case" << std::endl; + ours2024_latency = true; + } + } + if (jiang2022_cb || jiang2022_cs2) { + // cb_group0 = + // config.node->create_callback_group(rclcpp::CallbackGroupType::MutuallyExclusive); + } + // END SPECIAL CASES + + std::vector cb_groups; + if (config.num_groups != 0 && + config.executor_to_cpu_assignments.size() != 0) { + std::cout << "WARNING: num_groups and executor_to_cpu_assignments are both " + "set. This is not supported. Ignoring num_groups." + << std::endl; + } + for (uint32_t i = 0; i < config.num_groups; i++) { + cb_groups.push_back(config.nodes[0]->create_callback_group( + rclcpp::CallbackGroupType::MutuallyExclusive)); + } + + // for each chain + for (uint32_t chain_id = 0; chain_id < config.node_ids.size(); chain_id++) { + all_nodes.push_back(std::vector>()); + + // for each node in the chain + bool first_node = true; + for (uint32_t node_chain_idx = 0; + node_chain_idx < config.node_ids[chain_id].size(); node_chain_idx++) { + uint32_t node_id = config.node_ids[chain_id][node_chain_idx]; + // has this node been created yet? + if (nodes.find(node_id) != nodes.end()) { + // if it has, then this node exists in another chain + // if it's a timer node, we should add it to the timer handle map + continue; + } + experiment_executor executor = getExecutorForNode(node_id); + std::shared_ptr handle; + ExecutableType type; + // is this the first node in the chain? + if (node_chain_idx == 0) { + // create a timer node + rclcpp::CallbackGroup::SharedPtr cb_group = nullptr; + if (jiang2022_cs2) { + std::cout << "jiang2022_cs2: placing all nodes in cb_group0" + << std::endl; + cb_group = cb_group0; + } + if (config.num_groups != 0 && config.group_memberships[node_id] != 0) { + cb_group = cb_groups[config.group_memberships[node_id] - 1]; + } + std::shared_ptr node; + if (config.node_executor_assignments.size() == 0) { + node = std::make_shared( + "node_" + std::to_string(node_id), config.node_runtimes[node_id], + config.chain_periods[chain_id], chain_id, config.nodes[0], + cb_group); + } else { + node = std::make_shared( + "node_" + std::to_string(node_id), config.node_runtimes[node_id], + config.chain_periods[chain_id], chain_id, + config.nodes[config.node_executor_assignments[node_id]], + cb_group); + } + handle = node->timer_->get_timer_handle(); + nodes[node_id] = node; + type = ExecutableType::TIMER; + // this_chain_timer_handle = node->timer_; + std::cout << "chain timer handle set: " << chain_id << std::endl; + chain_timer_handles[chain_id] = node->timer_; + + all_nodes[chain_id].push_back(node); + } else { + // create a subscriber node + int subscription_id = config.node_ids[chain_id][node_chain_idx - 1]; + rclcpp::CallbackGroup::SharedPtr cb_group; + if (jiang2022_cb && ((chain_id == 1 && node_chain_idx == 1) || + (chain_id == 2 && node_chain_idx == 1))) { + cb_group = cb_group0; + std::cout << "adding c: " << chain_id << " n: " << node_id + << " to cb group 0" << std::endl; + } else { + cb_group = nullptr; + } + + if (config.num_groups != 0 && config.group_memberships[node_id] != 0) { + cb_group = cb_groups[config.group_memberships[node_id] - 1]; + } + + std::shared_ptr node; + if (ours2024_latency) { + uint32_t first_node_id = config.node_ids[chain_id][0]; + node = std::make_shared( + "node_" + std::to_string(first_node_id), + "node_" + std::to_string(node_id), config.node_runtimes[node_id], + config.chain_periods[chain_id], chain_id, config.nodes[0], + cb_group); + } else if (config.node_executor_assignments.size() == 0) { + node = std::make_shared( + "node_" + std::to_string(subscription_id), + "node_" + std::to_string(node_id), config.node_runtimes[node_id], + config.chain_periods[chain_id], chain_id, config.nodes[0], + cb_group); + } else { + node = std::make_shared( + "node_" + std::to_string(subscription_id), + "node_" + std::to_string(node_id), config.node_runtimes[node_id], + config.chain_periods[chain_id], chain_id, + config.nodes[config.node_executor_assignments[node_id]], + cb_group); + } + handle = node->sub_->get_subscription_handle(); + nodes[node_id] = node; + type = ExecutableType::SUBSCRIPTION; + + all_nodes[chain_id].push_back(node); + } + if (config.executor_type == "edf") { + std::string cb_name = "node_" + std::to_string(node_id) + "_cb"; + executor.strat->set_executable_deadline( + handle, config.chain_periods[chain_id], type, chain_id, cb_name); + // executor.executor->add_node(nodes[node_id]); + // make sure timer handle exists. if not, we did something wrong + if (chain_timer_handles.find(config.chain_timer_control[chain_id]) == + chain_timer_handles.end()) { + std::cerr << "chain timer handle not found for chain " << chain_id + << ": " << config.chain_timer_control[chain_id] + << std::endl; + } + executor.strat->get_priority_settings(handle)->timer_handle = + chain_timer_handles[config.chain_timer_control[chain_id]]; + executor.strat->get_priority_settings(handle)->priority = + config.node_priorities[node_id]; // use priority as tiebreaker + if (first_node) { + executor.strat->set_first_in_chain(handle); + first_node = false; + } + // is this the last node in the chain? + if (node_chain_idx == config.node_ids[chain_id].size() - 1) { + executor.strat->set_last_in_chain(handle); + } + } else if (config.executor_type == "picas") { + // executor.executor->add_node(nodes[node_id]); + executor.strat->set_executable_priority( + handle, config.node_priorities[node_id], type, + ExecutableScheduleType::CHAIN_AWARE_PRIORITY, chain_id); + std::cout << "node " << node_id << " has priority " + << config.node_priorities[node_id] << std::endl; + } else if (config.executor_type == "default") { + // executor.default_executor->add_node(nodes[node_id]); + } + // TODO: other types + } + } +} + +std::string Experiment::buildLogs(std::vector node_logs) { + std::stringstream output_file; + + output_file << "[" << std::endl; + // for each node + for (auto node : nodes) { + // print the node's logs + node_time_logger logger = node.second->logger; + for (auto &log : *(logger.recorded_times)) { + output_file << "{\"entry\": " << log.first + << ", \"time\": " << log.second << "}," << std::endl; + } + } + + if (config.executor_type == "edf" || config.executor_type == "picas") { + for (auto &executor : executors) { + node_time_logger strat_log = executor.strat->logger_; + for (auto &log : *(strat_log.recorded_times)) { + // std::cout << log.first << " " << log.second << std::endl; + output_file << "{\"entry\": " << log.first + << ", \"time\": " << log.second << "}," << std::endl; + } + + // executor may be a subclass of RTISTimed, so we can get its logger + if (dynamic_cast(executor.executor.get())) { + node_time_logger executor_log = + dynamic_cast(executor.executor.get())->logger_; + for (auto &log : *(executor_log.recorded_times)) { + output_file << "{\"entry\": " << log.first + << ", \"time\": " << log.second << "}," << std::endl; + } + } else { + std::cout << "executor is not RTISTimed" << std::endl; + } + } + } else { + // a default executor + for (auto &executor : executors) { + // experimental: executor.default_executor's type is rclcpp::Executor, but + // here it's guaranteed to inherit from RTISTimed we can get a logger from + // it. for safety, we check the type first + if (dynamic_cast(executor.default_executor.get())) { + node_time_logger default_log = + dynamic_cast(executor.default_executor.get())->logger_; + for (auto &log : *(default_log.recorded_times)) { + output_file << "{\"entry\": " << log.first + << ", \"time\": " << log.second << "}," << std::endl; + } + std::cout << "recovered logs from default executor" << std::endl; + } else { + std::cout << "default executor is not RTISTimed" << std::endl; + } + } + } + + // add internal logs + for (auto &log_entry : *(_logger.recorded_times)) { + output_file << "{\"entry\": " << log_entry.first + << ", \"time\": " << log_entry.second << "}," << std::endl; + } + + // add logs from argument + for (auto &log : node_logs) { + for (auto &log_entry : *(log.recorded_times)) { + output_file << "{\"entry\": " << log_entry.first + << ", \"time\": " << log_entry.second << "}," << std::endl; + } + } + // remove the last comma + output_file.seekp(-2, std::ios_base::end); + output_file << "]" << std::endl; + + return output_file.str(); +} + +void Experiment::setInitialDeadlines() { + timespec current_time; + clock_gettime(CLOCK_MONOTONIC_RAW, ¤t_time); + uint64_t millis = + (current_time.tv_sec * 1000UL) + (current_time.tv_nsec / 1000000); + // we need a logger to record the first deadline, which is generated here + // TODO: generate the first deadline in the executor (or node) itself + if (config.executor_type == "edf") { + for (uint32_t chain_index = 0; chain_index < config.chain_lengths.size(); + chain_index++) { + size_t executor_idx; + if (config.node_executor_assignments.size() == 0) { + executor_idx = 0; + } else { + executor_idx = config.node_executor_assignments[chain_index]; + } + std::shared_ptr> deadlines = + executors[executor_idx].strat->get_chain_deadlines(chain_index); + deadlines->push_back(millis + config.chain_periods[chain_index] * 2); + std::stringstream oss; + oss << "{\"operation\": \"next_deadline\", \"chain_id\": " << chain_index + << ", \"deadline\": " + << millis + config.chain_periods[chain_index] * 2 << "}"; + // std::cout<<"initial deadline: "<logger_, + // oss.str()); + log_entry(_logger, oss.str()); + } + } +} + +void Experiment::resetTimers() { + for (auto timer : chain_timer_handles) { + std::cout << "resetting timer " << timer.first << std::endl; + if (timer.second == nullptr) { + std::cout << "timer is null" << std::endl; + } else { + timer.second->reset(); + /* int64_t old_period; + // set the period to 0 for force immediate execution + rcl_timer_exchange_period(timer.second->get_timer_handle().get(), 0, + &old_period); + // timer.second->get_timer_handle() */ + } + } + // sleep 1 seconds to let every timer reset + // std::this_thread::sleep_for(std::chrono::seconds(1)); +} + +void Experiment::writeLogsToFile(std::string file_name, + std::string experiment_name) { + std::ofstream output_file; + std::string name = experiment_name; + // make sure results directory exists. we don't have access to std::filesystem + // here :( + std::string command = "mkdir -p results/" + name; + int result = system(command.c_str()); + if (result != 0) { + std::cout << "could not create results directory: " << result << ": " + << strerror(errno) << std::endl; + } + output_file.open("results/" + name + "/" + file_name + "_" + + this->config.executor_type + ".json"); + output_file << buildLogs(); + output_file.close(); + std::cout << "results written to results/" + name + "/" + file_name + "_" + + this->config.executor_type + ".json" + << std::endl; +} + +int do_other_task(double work_time, uint period, + std::atomic &should_do_task) { + if (work_time == 0) { + std::cout << "other task: work time is 0, exiting" << std::endl; + return 0; + } + // pin to CPU 0 with highest priority + cpu_set_t cpuset; + CPU_ZERO(&cpuset); + CPU_SET(0, &cpuset); + int result = sched_setaffinity(0, sizeof(cpu_set_t), &cpuset); + if (result != 0) { + std::cout << "other task: could not set affinity: " << result << ": " + << strerror(errno) << std::endl; + } + + struct sched_param param; + param.sched_priority = sched_get_priority_max(SCHED_FIFO) - 1; + sched_setscheduler(0, SCHED_FIFO, ¶m); + if (result != 0) { + std::cout << "other task: could not set scheduler: " << result << ": " + << strerror(errno) << std::endl; + } + + prctl(PR_SET_NAME, "other_task", 0, 0, 0); + + // do every period milliseconds + do { + timespec current_time; + clock_gettime(CLOCK_MONOTONIC_RAW, ¤t_time); + uint64_t millis_start = (current_time.tv_sec * (uint64_t)1000) + + (current_time.tv_nsec / 1000000); + + // do work + // nth_prime_silly(work_time); + dummy_load(work_time); + + // sleep until next period + clock_gettime(CLOCK_MONOTONIC_RAW, ¤t_time); + uint64_t millis_end = (current_time.tv_sec * (uint64_t)1000) + + (current_time.tv_nsec / 1000000); + uint64_t sleep_time = period - (millis_end - millis_start); + std::this_thread::sleep_for(std::chrono::milliseconds(sleep_time)); + } while (should_do_task); + std::cout << "other task done" << std::endl; + return 0; +} \ No newline at end of file diff --git a/src/casestudy_tools/src/primes_workload.cpp b/src/casestudy_tools/src/primes_workload.cpp new file mode 100755 index 0000000..7b8a915 --- /dev/null +++ b/src/casestudy_tools/src/primes_workload.cpp @@ -0,0 +1,67 @@ +#include "casestudy_tools/primes_workload.hpp" +#include +ktimeunit nth_prime_silly(double millis) +{ + // struct tms this_thread_times; + struct timespec currTime; + int sum = 0; + int i; + int j; + ktimeunit const start_cpu_time = get_thread_time(&currTime); + ktimeunit last_iter_time = 0; + ktimeunit last_iter_start_time = start_cpu_time; + for (i = 2; i < 4294967296 - 1; i++) + { + // times(&this_thread_times); + ktimeunit cum_time = get_thread_time(&currTime); + last_iter_time = cum_time - last_iter_start_time; + last_iter_start_time = cum_time; + if ((cum_time - start_cpu_time + last_iter_time*2) > millis) + { + break; + } + for (j = 2; j < i; j++) + { + sum += j; + } + if (cum_time - start_cpu_time > millis) + { + std::cout << "Warning: Time limit exceeded" << std::endl; + } + } + // std::cout<< "took " << (get_thread_time(&currTime) - start_cpu_time) << "ms, allowed " << millis << "ms" << std::endl; + return get_thread_time(&currTime) - start_cpu_time; +} +ktimeunit get_thread_time(struct timespec *currTime) +{ + // clockid_t threadClockId; + // pthread_getcpuclockid(pthread_self(), &threadClockId); + // clock_gettime(threadClockId, currTime); + // return currTime->tv_nsec / 1000000.0 + currTime->tv_sec * 1000.0; + + // use the wall clock instead + clock_gettime(CLOCK_MONOTONIC_RAW, currTime); + return currTime->tv_nsec / 1000000.0 + currTime->tv_sec * 1000.0; +} + +#define DUMMY_LOAD_ITER 100000 + +void dummy_load(int load_ms) { + int i, j; + int dummy_load_calib = DummyLoadCalibration::getCalibration(); + // dummy_load_calib is for 100ms + dummy_load_calib = dummy_load_calib * load_ms / 100; + for (j = 0; j < dummy_load_calib; j++) + for (i = 0 ; i < DUMMY_LOAD_ITER; i++) + __asm__ volatile ("nop"); +} + +void DummyLoadCalibration::setCalibration(int calib) { + DummyLoadCalibration::calibration = calib; +} + +int DummyLoadCalibration::getCalibration() { + return DummyLoadCalibration::calibration; +} + +int DummyLoadCalibration::calibration = 1; \ No newline at end of file diff --git a/src/casestudy_tools/src/test_nodes.cpp b/src/casestudy_tools/src/test_nodes.cpp new file mode 100755 index 0000000..b1e925a --- /dev/null +++ b/src/casestudy_tools/src/test_nodes.cpp @@ -0,0 +1,174 @@ +#include "casestudy_tools/test_nodes.hpp" +#include "casestudy_tools/primes_workload.hpp" +#include "rclcpp/rclcpp.hpp" +#include "simple_timer/rt-sched.hpp" +#include +#include +#include + +std::string CaseStudyNode::get_name() { return name; } + +#define BUFFER_LENGTH 0 + +rclcpp::QoS get_qos() +{ + if (BUFFER_LENGTH == 0) + { + return rclcpp::QoS(rclcpp::KeepAll()); + } + else + { + return rclcpp::QoS(rclcpp::KeepLast(BUFFER_LENGTH)); + } +} + +void timespec_diff(timespec *start, timespec *end, timespec *result) +{ + if ((end->tv_nsec - start->tv_nsec) < 0) + { + result->tv_sec = end->tv_sec - start->tv_sec - 1; + result->tv_nsec = 1000000000 + end->tv_nsec - start->tv_nsec; + } + else + { + result->tv_sec = end->tv_sec - start->tv_sec; + result->tv_nsec = end->tv_nsec - start->tv_nsec; + } +} + +PublisherNode::PublisherNode(std::string publish_topic, double runtime, + int period, int chain, + rclcpp::Node::SharedPtr node, + rclcpp::CallbackGroup::SharedPtr callback_group) + : CaseStudyNode(publish_topic, node), runtime_(runtime), period_(period), + chain_(chain) +{ + logger = create_logger(); + pub_ = node->create_publisher(publish_topic, get_qos()); + auto timer_callback = [this]() -> void + { + std::ostringstream ss; + ss << "{\"operation\": \"start_work\", \"chain\": " << chain_ + << ", \"node\": \"" << get_name() << "\", \"count\": " << count_max_; + std::chrono::nanoseconds time_until_trigger = timer_->time_until_trigger(); + std::chrono::microseconds time_until_trigger_us = + std::chrono::duration_cast( + time_until_trigger); + ss << ", \"next_release_us\": " << time_until_trigger_us.count() << "}"; + log_entry(logger, ss.str()); + // nth_prime_silly(runtime_); + double used_runtime = runtime_; + if (use_random_runtime) + { + if (rand() % 100 < runtime_over_chance * 100) + { + used_runtime *= over_runtime_scale; + } + else + { + used_runtime *= normal_runtime_scale; + } + } + // RCLCPP_INFO(rclcpp::get_logger(get_name()), "running %s with runtime %f", + // get_name().c_str(), used_runtime); + struct timespec start_time; + clock_gettime(CLOCK_MONOTONIC, &start_time); + + dummy_load(used_runtime); + std_msgs::msg::Int32 msg; + msg.data = count_max_; + // RCLCPP_INFO(this->get_logger(), "Publishing: '%s'", msg.data.c_str()); + pub_->publish(msg); + // TEST: can we take more than one msg from DDS? + // pub_->publish(msg); + ss.str(""); + ss << "{\"operation\": \"end_work\", \"chain\": " << chain_ + << ", \"node\": \"" << get_name() << "\", \"count\": " << count_max_ << ", \"target_runtime\": " << runtime_ + << "}"; + log_entry(logger, ss.str()); + count_max_--; + if (count_max_ == 0) + { + rclcpp::shutdown(); + } +/* struct timespec end_time; + clock_gettime(CLOCK_MONOTONIC, &end_time); + struct timespec diff; + timespec_diff(&start_time, &end_time, &diff); + size_t diff_ns = diff.tv_sec * 1000000000 + diff.tv_nsec; + size_t diff_ms = diff_ns / 1000000; + RCLCPP_INFO(rclcpp::get_logger(get_name()), "done running %s, took %lu ms", + get_name().c_str(), diff_ms); */ + }; + if (callback_group != nullptr) + { + callback_group_ = callback_group; + } + else + { + callback_group_ = + node->create_callback_group(rclcpp::CallbackGroupType::Reentrant); + } + timer_ = node->create_wall_timer(std::chrono::milliseconds(period), + timer_callback, callback_group_); +} + +WorkerNode::WorkerNode(std::string subscribe_topic, std::string publish_topic, + double runtime, int period, int chain, + rclcpp::Node::SharedPtr node, + rclcpp::CallbackGroup::SharedPtr callback_group) + : CaseStudyNode(publish_topic, node), runtime_(runtime), period_(period), + chain_(chain) +{ + logger = create_logger(); + auto callback = [this](const std_msgs::msg::Int32::SharedPtr msg) -> void + { + // prevent unused variable warning + (void)msg; + + std::ostringstream ss; + ss << "{\"operation\": \"start_work\", \"chain\": " << chain_ + << ", \"node\": \"" << get_name() << "\", \"count\": " << msg->data + << "}"; + log_entry(logger, ss.str()); + // RCLCPP_INFO(this->get_logger(), "I heard: '%s'", msg->data.c_str()); + // nth_prime_silly(runtime_); + double used_runtime = runtime_; + if (use_random_runtime) + { + if (rand() % 100 < runtime_over_chance * 100) + { + used_runtime *= over_runtime_scale; + } + else + { + used_runtime *= normal_runtime_scale; + } + } + dummy_load(used_runtime); + // RCLCPP_INFO(this->get_logger(), "Result: %d", result); + auto new_msg = std_msgs::msg::Int32(); + new_msg.data = msg->data; + pub_->publish(new_msg); + ss.str(""); + ss << "{\"operation\": \"end_work\", \"chain\": " << chain_ + << ", \"node\": \"" << get_name() << "\", \"count\": " << msg->data + << "}"; + log_entry(logger, ss.str()); + }; + rclcpp::SubscriptionOptions sub_options; + if (callback_group) + { + callback_group_ = callback_group; + } + else + { + callback_group_ = + node->create_callback_group(rclcpp::CallbackGroupType::Reentrant); + } + + sub_options.callback_group = callback_group_; + sub_ = node->create_subscription( + subscribe_topic, get_qos(), callback, sub_options); + pub_ = node->create_publisher(publish_topic, get_qos()); +}