add casestudy files
This commit is contained in:
commit
803f778e6c
16 changed files with 2095 additions and 0 deletions
95
src/casestudy/CMakeLists.txt
Executable file
95
src/casestudy/CMakeLists.txt
Executable file
|
@ -0,0 +1,95 @@
|
|||
cmake_minimum_required(VERSION 3.5)
|
||||
project(casestudy)
|
||||
|
||||
# Default to C99
|
||||
if(NOT CMAKE_C_STANDARD)
|
||||
set(CMAKE_C_STANDARD 99)
|
||||
endif()
|
||||
|
||||
# Default to C++14
|
||||
if(NOT CMAKE_CXX_STANDARD)
|
||||
set(CMAKE_CXX_STANDARD 14)
|
||||
endif()
|
||||
|
||||
if(CMAKE_COMPILER_IS_GNUCXX OR CMAKE_CXX_COMPILER_ID MATCHES "Clang")
|
||||
add_compile_options(-Wall -Wextra -Wpedantic)
|
||||
endif()
|
||||
|
||||
# find dependencies
|
||||
find_package(ament_cmake REQUIRED)
|
||||
# uncomment the following section in order to fill in
|
||||
# further dependencies manually.
|
||||
# find_package(<dependency> REQUIRED)
|
||||
find_package(priority_executor REQUIRED)
|
||||
find_package(simple_timer REQUIRED)
|
||||
find_package(rclcpp REQUIRED)
|
||||
find_package(jsoncpp REQUIRED)
|
||||
find_package(casestudy_tools REQUIRED)
|
||||
|
||||
# add_library(test_nodes src/test_nodes.cpp)
|
||||
# target_include_directories(test_nodes PUBLIC
|
||||
# $<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}/include>
|
||||
# $<INSTALL_INTERFACE:include>
|
||||
# )
|
||||
# ament_target_dependencies(test_nodes
|
||||
# rclcpp
|
||||
# simple_timer
|
||||
# casestudy_tools
|
||||
# )
|
||||
|
||||
|
||||
list(APPEND experiments
|
||||
casestudy_2023customfile_singlethread
|
||||
casestudy_2024ours_latency
|
||||
casestudy_2024ours_executor2executor
|
||||
casestudy_example
|
||||
)
|
||||
|
||||
function(new_experiment experiment_name)
|
||||
add_executable(${experiment_name} src/${experiment_name}.cpp)
|
||||
target_include_directories(${experiment_name} PUBLIC
|
||||
$<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}/include>
|
||||
$<INSTALL_INTERFACE:include>
|
||||
)
|
||||
ament_target_dependencies(${experiment_name}
|
||||
rclcpp
|
||||
priority_executor
|
||||
simple_timer
|
||||
casestudy_tools
|
||||
)
|
||||
endfunction()
|
||||
|
||||
foreach(experiment ${experiments})
|
||||
new_experiment(${experiment})
|
||||
endforeach()
|
||||
|
||||
target_link_libraries(casestudy_2023customfile_singlethread
|
||||
jsoncpp
|
||||
)
|
||||
|
||||
|
||||
install(TARGETS
|
||||
${experiments}
|
||||
DESTINATION lib/${PROJECT_NAME}
|
||||
)
|
||||
|
||||
|
||||
install(
|
||||
DIRECTORY include/
|
||||
DESTINATION include
|
||||
)
|
||||
|
||||
|
||||
if(BUILD_TESTING)
|
||||
find_package(ament_lint_auto REQUIRED)
|
||||
# the following line skips the linter which checks for copyrights
|
||||
# uncomment the line when a copyright and license is not present in all source files
|
||||
#set(ament_cmake_copyright_FOUND TRUE)
|
||||
# the following line skips cpplint (only works in a git repo)
|
||||
# uncomment the line when this package is not in a git repo
|
||||
#set(ament_cmake_cpplint_FOUND TRUE)
|
||||
ament_lint_auto_find_test_dependencies()
|
||||
endif()
|
||||
|
||||
ament_export_include_directories(include)
|
||||
ament_package()
|
22
src/casestudy/package.xml
Executable file
22
src/casestudy/package.xml
Executable file
|
@ -0,0 +1,22 @@
|
|||
<?xml version="1.0"?>
|
||||
<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
|
||||
<package format="3">
|
||||
<name>casestudy</name>
|
||||
<version>0.0.0</version>
|
||||
<description>TODO: Package description</description>
|
||||
<maintainer email="vboxuser@todo.todo">vboxuser</maintainer>
|
||||
<license>TODO: License declaration</license>
|
||||
|
||||
<buildtool_depend>ament_cmake</buildtool_depend>
|
||||
|
||||
<test_depend>ament_lint_auto</test_depend>
|
||||
<test_depend>ament_lint_common</test_depend>
|
||||
|
||||
<depend>priority_executor</depend>
|
||||
<depend>simple_timer</depend>
|
||||
<depend>casestudy_tools</depend>
|
||||
|
||||
<export>
|
||||
<build_type>ament_cmake</build_type>
|
||||
</export>
|
||||
</package>
|
213
src/casestudy/src/casestudy_2023customfile_singlethread.cpp
Executable file
213
src/casestudy/src/casestudy_2023customfile_singlethread.cpp
Executable file
|
@ -0,0 +1,213 @@
|
|||
#include "rclcpp/rclcpp.hpp"
|
||||
#include "priority_executor/priority_executor.hpp"
|
||||
#include "priority_executor/priority_memory_strategy.hpp"
|
||||
#include "casestudy_tools/test_nodes.hpp"
|
||||
#include <string>
|
||||
#include <fstream>
|
||||
#include <unistd.h>
|
||||
#include "std_msgs/msg/string.hpp"
|
||||
#include "casestudy_tools/primes_workload.hpp"
|
||||
#include <sys/prctl.h>
|
||||
#include "casestudy_tools/primes_workload.hpp"
|
||||
#include "casestudy_tools/experiment.hpp"
|
||||
#include "jsoncpp/json/reader.h"
|
||||
|
||||
// bool should_do_task = true;
|
||||
std::atomic<bool> should_do_task(true);
|
||||
|
||||
void run_one_executor(std::function<void(std::atomic<bool> &)> exec_fun, int idx)
|
||||
{
|
||||
(void)idx;
|
||||
// set this process to highest priority
|
||||
struct sched_param param;
|
||||
param.sched_priority = sched_get_priority_max(SCHED_FIFO) - 1;
|
||||
int result = sched_setscheduler(0, SCHED_FIFO, ¶m);
|
||||
if (result != 0)
|
||||
{
|
||||
std::cout << "ros_experiment: sched_setscheduler failed: " << result << ": " << strerror(errno) << std::endl;
|
||||
}
|
||||
|
||||
// set process name
|
||||
prctl(PR_SET_NAME, "ros_experiment", 0, 0, 0);
|
||||
|
||||
exec_fun(should_do_task);
|
||||
}
|
||||
|
||||
int ros_experiment(int argc, char **arg, std::string file_name, ExperimentConfig config)
|
||||
{
|
||||
|
||||
rclcpp::init(argc, arg);
|
||||
// we have to create a node to process any passed parameters
|
||||
auto node = rclcpp::Node::make_shared("experiment_node");
|
||||
node->declare_parameter("executor_type", "edf");
|
||||
|
||||
std::string executor_type;
|
||||
node->get_parameter("executor_type", executor_type);
|
||||
std::cout << "using executor type: " << executor_type << std::endl;
|
||||
|
||||
// TODO: move experiment configuration to main
|
||||
config.executor_type = executor_type;
|
||||
config.nodes.push_back(node);
|
||||
|
||||
Experiment experiment(config);
|
||||
// std::string result_log = experiment.run(should_do_task);
|
||||
std::vector<std::function<void(std::atomic<bool> &)>> exec_funs = experiment.getRunFunctions();
|
||||
std::cout << "got " << exec_funs.size() << " executors" << std::endl;
|
||||
std::vector<std::thread> exec_threads;
|
||||
int i = 0;
|
||||
experiment.resetTimers();
|
||||
for (auto exec_fun : exec_funs)
|
||||
{
|
||||
exec_threads.push_back(std::thread(run_one_executor, exec_fun, i));
|
||||
i++;
|
||||
}
|
||||
|
||||
for (auto &t : exec_threads)
|
||||
{
|
||||
t.join();
|
||||
}
|
||||
|
||||
experiment.writeLogsToFile(file_name, "2023customfile_singlethread");
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
int main(int argc, char *argv[])
|
||||
{
|
||||
std::ifstream input_file("taskset.json"); // Replace with actual filename
|
||||
|
||||
if (!input_file.is_open())
|
||||
{
|
||||
std::cerr << "Error opening file!" << std::endl;
|
||||
return 1;
|
||||
}
|
||||
|
||||
Json::Value root;
|
||||
Json::Reader reader;
|
||||
bool parsingSuccessful = reader.parse(input_file, root);
|
||||
|
||||
if (!parsingSuccessful)
|
||||
{
|
||||
std::cerr << "Error parsing file!" << std::endl;
|
||||
return 1;
|
||||
}
|
||||
|
||||
// set ourselves to the second highest priority
|
||||
struct sched_param param;
|
||||
param.sched_priority = sched_get_priority_max(SCHED_FIFO) - 2;
|
||||
int result = sched_setscheduler(0, SCHED_FIFO, ¶m);
|
||||
if (result != 0)
|
||||
{
|
||||
std::cout << "ros_experiment: sched_setscheduler failed: " << result << ": " << strerror(errno) << std::endl;
|
||||
}
|
||||
// calibrate the dummy load
|
||||
// Naive way to calibrate dummy workload for current system
|
||||
int dummy_load_calib = 1;
|
||||
while (1)
|
||||
{
|
||||
timeval ctime, ftime;
|
||||
int duration_us;
|
||||
gettimeofday(&ctime, NULL);
|
||||
dummy_load(100); // 100ms
|
||||
gettimeofday(&ftime, NULL);
|
||||
duration_us = (ftime.tv_sec - ctime.tv_sec) * 1000000 + (ftime.tv_usec - ctime.tv_usec);
|
||||
RCLCPP_INFO(rclcpp::get_logger("rclcpp"), "dummy_load_calib: %d (duration_us: %d ns)", dummy_load_calib, duration_us);
|
||||
if (abs(duration_us - 100 * 1000) < 500)
|
||||
{ // error margin: 500us
|
||||
break;
|
||||
}
|
||||
dummy_load_calib = 100 * 1000 * dummy_load_calib / duration_us;
|
||||
if (dummy_load_calib <= 0)
|
||||
dummy_load_calib = 1;
|
||||
DummyLoadCalibration::setCalibration(dummy_load_calib);
|
||||
}
|
||||
// DummyLoadCalibration::setCalibration(2900);
|
||||
ExperimentConfig config;
|
||||
|
||||
// config.chain_lengths = {4, 3, 4};
|
||||
for (uint i = 0; i < root["chain_lengths"].size(); i++)
|
||||
{
|
||||
config.chain_lengths.push_back(root["chain_lengths"][i].asUInt());
|
||||
}
|
||||
|
||||
// config.node_ids = {{0, 1, 2, 3}, {4, 5, 6}, {7, 8, 9, 10}};
|
||||
for (uint i = 0; i < root["node_ids"].size(); i++)
|
||||
{
|
||||
std::vector<uint32_t> node_ids_row;
|
||||
for (uint j = 0; j < root["node_ids"][i].size(); j++)
|
||||
{
|
||||
node_ids_row.push_back(root["node_ids"][i][j].asUInt());
|
||||
}
|
||||
config.node_ids.push_back(node_ids_row);
|
||||
}
|
||||
// config.node_priorities = {3, 2, 1, 0, 6, 5, 4, 10, 9, 8, 7}; // PICAS-style prioritiesA
|
||||
for (uint i = 0; i < root["node_priorities"].size(); i++)
|
||||
{
|
||||
config.node_priorities.push_back(root["node_priorities"][i].asUInt());
|
||||
}
|
||||
|
||||
// disabled for custom
|
||||
config.num_groups = 0;
|
||||
// config.group_memberships = {0, 0, 1, 2, 0, 0, 1, 0, 0, 1, 2};
|
||||
|
||||
// config.chain_timer_control = {0, 1, 2};
|
||||
for (uint i = 0; i < root["chain_timer_control"].size(); i++)
|
||||
{
|
||||
config.chain_timer_control.push_back(root["chain_timer_control"][i].asInt());
|
||||
}
|
||||
|
||||
// config.node_runtimes = {5, 5, 5, 5, // chain 1
|
||||
// 10, 20, 5, // chain 2
|
||||
// 10, 10, 15, 20}; // chain 3
|
||||
for (uint i = 0; i < root["node_runtimes"].size(); i++)
|
||||
{
|
||||
config.node_runtimes.push_back(root["node_runtimes"][i].asDouble());
|
||||
}
|
||||
// config.chain_periods = {50, 100, 110};
|
||||
for (uint i = 0; i < root["chain_periods"].size(); i++)
|
||||
{
|
||||
config.chain_periods.push_back(root["chain_periods"][i].asInt());
|
||||
}
|
||||
// config.node_executor_assignments = {}; // empty - multicore mode
|
||||
for (uint i = 0; i < root["node_executor_assignments"].size(); i++)
|
||||
{
|
||||
config.node_executor_assignments.push_back(root["node_executor_assignments"][i].asUInt() - 1);
|
||||
}
|
||||
// single threaded mode - assign executors to cpu cores
|
||||
for (uint i = 0; i < root["executor_to_cpu_core"].size(); i++)
|
||||
{
|
||||
config.executor_to_cpu_assignments.push_back(root["executor_to_cpu_core"][i].asUInt() - 1);
|
||||
}
|
||||
|
||||
config.parallel_mode = false;
|
||||
config.cores = 4;
|
||||
|
||||
// sanity_check_config(config);
|
||||
|
||||
// from 35/65 to 100/0 in increments of 5
|
||||
// for (int i = 0; i <= (100 - 35) / 5; i++)
|
||||
// {
|
||||
// for now, run with 100% ros
|
||||
int i = (100 - 35) / 5;
|
||||
int ros_budget = 35 + i * 5;
|
||||
int other_task_budget = 65 - i * 5;
|
||||
std::cout << "ROS budget: " << ros_budget << " Other task budget: " << other_task_budget << std::endl;
|
||||
// do other task in a separate thread
|
||||
should_do_task.store(true);
|
||||
double other_task_time = 80 * other_task_budget / 100.0;
|
||||
std::thread other_task(do_other_task, other_task_time, 80, std::ref(should_do_task));
|
||||
|
||||
// do ros task
|
||||
|
||||
std::string file_name = "ros_" + std::to_string(ros_budget);
|
||||
// ros_experiment(argc, argv, file_name);
|
||||
std::thread ros_task(ros_experiment, argc, argv, file_name, config);
|
||||
std::cout << "tasks started" << std::endl;
|
||||
ros_task.join();
|
||||
other_task.join();
|
||||
std::cout << "tasks joined" << std::endl;
|
||||
|
||||
// // run once
|
||||
// break;
|
||||
// }
|
||||
}
|
215
src/casestudy/src/casestudy_2024ours_executor2executor.cpp
Executable file
215
src/casestudy/src/casestudy_2024ours_executor2executor.cpp
Executable file
|
@ -0,0 +1,215 @@
|
|||
#include "casestudy_tools/experiment.hpp"
|
||||
#include "casestudy_tools/primes_workload.hpp"
|
||||
#include <string>
|
||||
#include <sys/prctl.h>
|
||||
#include <unistd.h>
|
||||
|
||||
// bool should_do_task = true;
|
||||
std::atomic<bool> should_do_task(true);
|
||||
|
||||
void run_one_executor(std::function<void(std::atomic<bool> &)> exec_fun,
|
||||
int idx) {
|
||||
(void)idx;
|
||||
// set this process to highest priority
|
||||
// struct sched_param param;
|
||||
// param.sched_priority = sched_get_priority_max(SCHED_FIFO) - 1;
|
||||
// int result = sched_setscheduler(0, SCHED_FIFO, ¶m);
|
||||
// if (result != 0)
|
||||
// {
|
||||
// std::cout << "ros_experiment: sched_setscheduler failed: " << result <<
|
||||
// ": " << strerror(errno) << std::endl;
|
||||
// }
|
||||
sched_attr rt_attr;
|
||||
rt_attr.size = sizeof(rt_attr);
|
||||
rt_attr.sched_flags = 0;
|
||||
rt_attr.sched_nice = 0;
|
||||
rt_attr.sched_priority = 99;
|
||||
rt_attr.sched_policy = SCHED_FIFO;
|
||||
rt_attr.sched_runtime = 0;
|
||||
rt_attr.sched_period = 0;
|
||||
int result = sched_setattr(0, &rt_attr, 0);
|
||||
if (result != 0) {
|
||||
std::cout << "executor task: could not set scheduler: " << result << ": "
|
||||
<< strerror(errno) << std::endl;
|
||||
}
|
||||
|
||||
// set process name
|
||||
prctl(PR_SET_NAME, "ros_experiment", 0, 0, 0);
|
||||
|
||||
exec_fun(should_do_task);
|
||||
}
|
||||
|
||||
int ros_experiment(rclcpp::Node::SharedPtr node, std::string file_name,
|
||||
ExperimentConfig config) {
|
||||
|
||||
std::string executor_type;
|
||||
node->get_parameter("executor_type", executor_type);
|
||||
int num_subscribers;
|
||||
node->get_parameter("num_subscribers", num_subscribers);
|
||||
int num_chains;
|
||||
node->get_parameter("num_chains", num_chains);
|
||||
std::cout << "using executor type: " << executor_type << std::endl;
|
||||
|
||||
// TODO: move experiment configuration to main
|
||||
config.executor_type = executor_type;
|
||||
config.nodes.push_back(node);
|
||||
|
||||
Experiment experiment(config);
|
||||
// std::string result_log = experiment.run(should_do_task);
|
||||
std::vector<std::function<void(std::atomic<bool> &)>> exec_funs =
|
||||
experiment.getRunFunctions();
|
||||
std::cout << "got " << exec_funs.size() << " executors" << std::endl;
|
||||
std::vector<std::thread> exec_threads;
|
||||
int i = 0;
|
||||
experiment.resetTimers();
|
||||
for (auto exec_fun : exec_funs) {
|
||||
exec_threads.push_back(std::thread(run_one_executor, exec_fun, i));
|
||||
i++;
|
||||
}
|
||||
|
||||
for (auto &t : exec_threads) {
|
||||
t.join();
|
||||
}
|
||||
|
||||
std::string outputname = "2024ours_executor2executor";
|
||||
outputname +=
|
||||
"_" + std::to_string(num_subscribers) + "_" + std::to_string(num_chains);
|
||||
experiment.writeLogsToFile(file_name, outputname);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
int main(int argc, char *argv[]) {
|
||||
rclcpp::init(argc, argv);
|
||||
// we have to create a node to process any passed parameters
|
||||
auto node = rclcpp::Node::make_shared("experiment_node");
|
||||
// auto node = rclcpp::Node::make_shared("experiment_node",
|
||||
// rclcpp::NodeOptions().use_intra_process_comms(true));
|
||||
node->declare_parameter("executor_type", "edf");
|
||||
node->declare_parameter("num_subscribers", 3);
|
||||
node->declare_parameter("num_chains", 1);
|
||||
node->declare_parameter("runtime", 0);
|
||||
|
||||
int num_s;
|
||||
node->get_parameter("num_subscribers", num_s);
|
||||
int runtime;
|
||||
node->get_parameter("runtime", runtime);
|
||||
int num_chains;
|
||||
node->get_parameter("num_chains", num_chains);
|
||||
|
||||
// set ourselves to the second highest priority
|
||||
// struct sched_param param;
|
||||
// param.sched_priority = sched_get_priority_max(SCHED_FIFO) - 2;
|
||||
// int result = sched_setscheduler(0, SCHED_FIFO, ¶m);
|
||||
sched_attr rt_attr;
|
||||
rt_attr.size = sizeof(rt_attr);
|
||||
rt_attr.sched_policy = SCHED_FIFO;
|
||||
// rt_attr.sched_priority = sched_get_priority_max(SCHED_FIFO) - 2;
|
||||
rt_attr.sched_priority = 99;
|
||||
int result = sched_setattr(0, &rt_attr, 0);
|
||||
|
||||
if (result != 0) {
|
||||
std::cout << "ros_experiment: sched_setscheduler failed: " << result << ": "
|
||||
<< strerror(errno) << std::endl;
|
||||
}
|
||||
// calibrate the dummy load
|
||||
// Naive way to calibrate dummy workload for current system
|
||||
int dummy_load_calib = 1;
|
||||
while (1) {
|
||||
timeval ctime, ftime;
|
||||
int duration_us;
|
||||
gettimeofday(&ctime, NULL);
|
||||
dummy_load(100); // 100ms
|
||||
gettimeofday(&ftime, NULL);
|
||||
duration_us = (ftime.tv_sec - ctime.tv_sec) * 1000000 +
|
||||
(ftime.tv_usec - ctime.tv_usec);
|
||||
RCLCPP_INFO(rclcpp::get_logger("rclcpp"),
|
||||
"dummy_load_calib: %d (duration_us: %d ns)", dummy_load_calib,
|
||||
duration_us);
|
||||
if (abs(duration_us - 100 * 1000) < 500) { // error margin: 500us
|
||||
break;
|
||||
}
|
||||
dummy_load_calib = 100 * 1000 * dummy_load_calib / duration_us;
|
||||
if (dummy_load_calib <= 0)
|
||||
dummy_load_calib = 1;
|
||||
DummyLoadCalibration::setCalibration(dummy_load_calib);
|
||||
}
|
||||
// DummyLoadCalibration::setCalibration(2900);
|
||||
ExperimentConfig config;
|
||||
|
||||
// runtimes and timers subject to change
|
||||
config.chain_lengths = {};
|
||||
config.node_ids = {};
|
||||
config.chain_timer_control = {};
|
||||
config.chain_periods = {};
|
||||
config.node_runtimes = {};
|
||||
config.node_executor_assignments = {};
|
||||
int node_id = 0;
|
||||
for (int c = 0; c < num_chains; c++) {
|
||||
config.node_ids.push_back({});
|
||||
for (int i = 0; i < num_s + 1; i++) {
|
||||
config.node_ids[c].push_back(node_id);
|
||||
if (i == 0) {
|
||||
// first node is the publisher, and goes in the first executor
|
||||
config.node_executor_assignments.push_back(0);
|
||||
} else {
|
||||
// all other nodes go in the second executor
|
||||
config.node_executor_assignments.push_back(1);
|
||||
}
|
||||
node_id++;
|
||||
}
|
||||
for (int i = 0; i < num_s + 1; i++) {
|
||||
config.node_priorities.push_back(i);
|
||||
}
|
||||
config.chain_lengths.push_back(num_s+1);
|
||||
config.chain_periods.push_back(10);
|
||||
for (int i = 0; i < num_s + 1; i++) {
|
||||
config.node_runtimes.push_back(runtime);
|
||||
}
|
||||
config.chain_timer_control.push_back(c);
|
||||
}
|
||||
|
||||
config.executor_to_cpu_assignments = {0, 1};
|
||||
|
||||
std::cout << "node ids: " << std::endl;
|
||||
for (size_t i = 0; i < config.node_ids.size(); i++) {
|
||||
for (size_t j = 0; j < config.node_ids[i].size(); j++) {
|
||||
std::cout << config.node_ids[i][j] << " ";
|
||||
}
|
||||
std::cout << std::endl;
|
||||
}
|
||||
|
||||
|
||||
config.num_groups = 0;
|
||||
config.group_memberships = {}; // no groups
|
||||
|
||||
config.parallel_mode = false;
|
||||
config.cores = 1;
|
||||
config.special_cases.push_back("2024ours_latency");
|
||||
|
||||
// sanity_check_config(config);
|
||||
|
||||
// from 35/65 to 100/0 in increments of 5
|
||||
// for (int i = 0; i <= (100 - 35) / 5; i++)
|
||||
// {
|
||||
// for now, run with 100% ros
|
||||
int i = (100 - 35) / 5;
|
||||
int ros_budget = 35 + i * 5;
|
||||
int other_task_budget = 65 - i * 5;
|
||||
std::cout << "ROS budget: " << ros_budget
|
||||
<< " Other task budget: " << other_task_budget << std::endl;
|
||||
// do other task in a separate thread
|
||||
should_do_task.store(true);
|
||||
|
||||
// do ros task
|
||||
std::string file_name = "ros_" + std::to_string(ros_budget);
|
||||
// ros_experiment(argc, argv, file_name);
|
||||
std::thread ros_task(ros_experiment, node, file_name, config);
|
||||
std::cout << "tasks started" << std::endl;
|
||||
ros_task.join();
|
||||
std::cout << "tasks joined" << std::endl;
|
||||
|
||||
// // run once
|
||||
// break;
|
||||
// }
|
||||
}
|
208
src/casestudy/src/casestudy_2024ours_latency.cpp
Executable file
208
src/casestudy/src/casestudy_2024ours_latency.cpp
Executable file
|
@ -0,0 +1,208 @@
|
|||
#include "casestudy_tools/experiment.hpp"
|
||||
#include "casestudy_tools/primes_workload.hpp"
|
||||
#include <string>
|
||||
#include <sys/prctl.h>
|
||||
#include <unistd.h>
|
||||
|
||||
// bool should_do_task = true;
|
||||
std::atomic<bool> should_do_task(true);
|
||||
|
||||
|
||||
void run_one_executor(std::function<void(std::atomic<bool> &)> exec_fun,
|
||||
int idx) {
|
||||
(void)idx;
|
||||
// set this process to highest priority
|
||||
// struct sched_param param;
|
||||
// param.sched_priority = sched_get_priority_max(SCHED_FIFO) - 1;
|
||||
// int result = sched_setscheduler(0, SCHED_FIFO, ¶m);
|
||||
// if (result != 0)
|
||||
// {
|
||||
// std::cout << "ros_experiment: sched_setscheduler failed: " << result <<
|
||||
// ": " << strerror(errno) << std::endl;
|
||||
// }
|
||||
sched_attr rt_attr;
|
||||
rt_attr.size = sizeof(rt_attr);
|
||||
rt_attr.sched_flags = 0;
|
||||
rt_attr.sched_nice = 0;
|
||||
rt_attr.sched_priority = 99;
|
||||
rt_attr.sched_policy = SCHED_FIFO;
|
||||
rt_attr.sched_runtime = 0;
|
||||
rt_attr.sched_period = 0;
|
||||
int result = sched_setattr(0, &rt_attr, 0);
|
||||
if (result != 0) {
|
||||
std::cout << "executor task: could not set scheduler: " << result << ": "
|
||||
<< strerror(errno) << std::endl;
|
||||
}
|
||||
|
||||
// set process name
|
||||
prctl(PR_SET_NAME, "ros_experiment", 0, 0, 0);
|
||||
|
||||
exec_fun(should_do_task);
|
||||
}
|
||||
|
||||
int ros_experiment(rclcpp::Node::SharedPtr node, std::string file_name,
|
||||
ExperimentConfig config) {
|
||||
|
||||
std::string executor_type;
|
||||
node->get_parameter("executor_type", executor_type);
|
||||
int num_subscribers;
|
||||
node->get_parameter("num_subscribers", num_subscribers);
|
||||
int num_chains;
|
||||
node->get_parameter("num_chains", num_chains);
|
||||
std::cout << "using executor type: " << executor_type << std::endl;
|
||||
|
||||
// TODO: move experiment configuration to main
|
||||
config.executor_type = executor_type;
|
||||
config.nodes.push_back(node);
|
||||
|
||||
Experiment experiment(config);
|
||||
// std::string result_log = experiment.run(should_do_task);
|
||||
std::vector<std::function<void(std::atomic<bool> &)>> exec_funs =
|
||||
experiment.getRunFunctions();
|
||||
std::cout << "got " << exec_funs.size() << " executors" << std::endl;
|
||||
std::vector<std::thread> exec_threads;
|
||||
int i = 0;
|
||||
experiment.resetTimers();
|
||||
for (auto exec_fun : exec_funs) {
|
||||
exec_threads.push_back(std::thread(run_one_executor, exec_fun, i));
|
||||
i++;
|
||||
}
|
||||
|
||||
for (auto &t : exec_threads) {
|
||||
t.join();
|
||||
}
|
||||
|
||||
std::string outputname = "2024ours_latency";
|
||||
outputname +=
|
||||
"_" + std::to_string(num_subscribers) + "_" + std::to_string(num_chains);
|
||||
experiment.writeLogsToFile(file_name, outputname);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
int main(int argc, char *argv[]) {
|
||||
rclcpp::init(argc, argv);
|
||||
// we have to create a node to process any passed parameters
|
||||
auto node = rclcpp::Node::make_shared("experiment_node");
|
||||
// auto node = rclcpp::Node::make_shared("experiment_node",
|
||||
// rclcpp::NodeOptions().use_intra_process_comms(true));
|
||||
node->declare_parameter("executor_type", "edf");
|
||||
node->declare_parameter("num_subscribers", 3);
|
||||
node->declare_parameter("num_chains", 1);
|
||||
node->declare_parameter("runtime", 0);
|
||||
|
||||
int num_s;
|
||||
node->get_parameter("num_subscribers", num_s);
|
||||
int runtime;
|
||||
node->get_parameter("runtime", runtime);
|
||||
int num_chains;
|
||||
node->get_parameter("num_chains", num_chains);
|
||||
|
||||
// set ourselves to the second highest priority
|
||||
// struct sched_param param;
|
||||
// param.sched_priority = sched_get_priority_max(SCHED_FIFO) - 2;
|
||||
// int result = sched_setscheduler(0, SCHED_FIFO, ¶m);
|
||||
sched_attr rt_attr;
|
||||
rt_attr.size = sizeof(rt_attr);
|
||||
rt_attr.sched_policy = SCHED_FIFO;
|
||||
// rt_attr.sched_priority = sched_get_priority_max(SCHED_FIFO) - 2;
|
||||
rt_attr.sched_priority = 99;
|
||||
int result = sched_setattr(0, &rt_attr, 0);
|
||||
|
||||
if (result != 0) {
|
||||
std::cout << "ros_experiment: sched_setscheduler failed: " << result << ": "
|
||||
<< strerror(errno) << std::endl;
|
||||
}
|
||||
// calibrate the dummy load
|
||||
// Naive way to calibrate dummy workload for current system
|
||||
int dummy_load_calib = 1;
|
||||
while (1) {
|
||||
timeval ctime, ftime;
|
||||
int duration_us;
|
||||
gettimeofday(&ctime, NULL);
|
||||
dummy_load(100); // 100ms
|
||||
gettimeofday(&ftime, NULL);
|
||||
duration_us = (ftime.tv_sec - ctime.tv_sec) * 1000000 +
|
||||
(ftime.tv_usec - ctime.tv_usec);
|
||||
RCLCPP_INFO(rclcpp::get_logger("rclcpp"),
|
||||
"dummy_load_calib: %d (duration_us: %d ns)", dummy_load_calib,
|
||||
duration_us);
|
||||
if (abs(duration_us - 100 * 1000) < 500) { // error margin: 500us
|
||||
break;
|
||||
}
|
||||
dummy_load_calib = 100 * 1000 * dummy_load_calib / duration_us;
|
||||
if (dummy_load_calib <= 0)
|
||||
dummy_load_calib = 1;
|
||||
DummyLoadCalibration::setCalibration(dummy_load_calib);
|
||||
}
|
||||
// DummyLoadCalibration::setCalibration(2900);
|
||||
ExperimentConfig config;
|
||||
|
||||
// runtimes and timers subject to change
|
||||
config.chain_lengths = {};
|
||||
config.node_ids = {};
|
||||
config.chain_timer_control = {};
|
||||
config.chain_periods = {};
|
||||
config.node_runtimes = {};
|
||||
int node_id = 0;
|
||||
for (int c = 0; c < num_chains; c++) {
|
||||
config.node_ids.push_back({});
|
||||
for (int i = 0; i < num_s + 1; i++) {
|
||||
config.node_ids[c].push_back(node_id);
|
||||
node_id++;
|
||||
}
|
||||
for (int i = 0; i < num_s + 1; i++) {
|
||||
config.node_priorities.push_back(i);
|
||||
}
|
||||
config.chain_lengths.push_back(num_s+1);
|
||||
config.chain_periods.push_back(10);
|
||||
for (int i = 0; i < num_s + 1; i++) {
|
||||
config.node_runtimes.push_back(runtime);
|
||||
}
|
||||
config.chain_timer_control.push_back(c);
|
||||
}
|
||||
|
||||
std::cout << "node ids: " << std::endl;
|
||||
for (size_t i = 0; i < config.node_ids.size(); i++) {
|
||||
for (size_t j = 0; j < config.node_ids[i].size(); j++) {
|
||||
std::cout << config.node_ids[i][j] << " ";
|
||||
}
|
||||
std::cout << std::endl;
|
||||
}
|
||||
|
||||
|
||||
config.num_groups = 0;
|
||||
config.group_memberships = {}; // no groups
|
||||
|
||||
config.node_executor_assignments = {};
|
||||
// config.node_executor_assignments = {0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0};
|
||||
config.parallel_mode = false;
|
||||
config.cores = 1;
|
||||
config.special_cases.push_back("2024ours_latency");
|
||||
|
||||
// sanity_check_config(config);
|
||||
|
||||
// from 35/65 to 100/0 in increments of 5
|
||||
// for (int i = 0; i <= (100 - 35) / 5; i++)
|
||||
// {
|
||||
// for now, run with 100% ros
|
||||
int i = (100 - 35) / 5;
|
||||
int ros_budget = 35 + i * 5;
|
||||
int other_task_budget = 65 - i * 5;
|
||||
std::cout << "ROS budget: " << ros_budget
|
||||
<< " Other task budget: " << other_task_budget << std::endl;
|
||||
// do other task in a separate thread
|
||||
should_do_task.store(true);
|
||||
|
||||
// do ros task
|
||||
std::string file_name = "ros_" + std::to_string(ros_budget);
|
||||
// ros_experiment(argc, argv, file_name);
|
||||
std::thread ros_task(ros_experiment, node, file_name, config);
|
||||
std::cout << "tasks started" << std::endl;
|
||||
ros_task.join();
|
||||
std::cout << "tasks joined" << std::endl;
|
||||
|
||||
// // run once
|
||||
// break;
|
||||
// }
|
||||
}
|
132
src/casestudy/src/casestudy_example.cpp
Normal file
132
src/casestudy/src/casestudy_example.cpp
Normal file
|
@ -0,0 +1,132 @@
|
|||
#include "rclcpp/rclcpp.hpp"
|
||||
#include "priority_executor/priority_executor.hpp"
|
||||
#include "priority_executor/priority_memory_strategy.hpp"
|
||||
#include "casestudy_tools/test_nodes.hpp"
|
||||
#include <string>
|
||||
#include <fstream>
|
||||
#include <unistd.h>
|
||||
#include "std_msgs/msg/string.hpp"
|
||||
#include "casestudy_tools/primes_workload.hpp"
|
||||
#include <sys/prctl.h>
|
||||
#include "casestudy_tools/primes_workload.hpp"
|
||||
#include "casestudy_tools/experiment.hpp"
|
||||
|
||||
// bool should_do_task = true;
|
||||
std::atomic<bool> should_do_task(true);
|
||||
|
||||
void run_one_executor(std::function<void(std::atomic<bool> &)> exec_fun, int idx)
|
||||
{
|
||||
(void)idx;
|
||||
// cpu_set_t cpuset;
|
||||
// CPU_ZERO(&cpuset);
|
||||
// CPU_SET(0, &cpuset);
|
||||
// CPU_SET(1, &cpuset);
|
||||
// CPU_SET(2, &cpuset);
|
||||
// CPU_SET(3, &cpuset);
|
||||
|
||||
// int result = sched_setaffinity(0, sizeof(cpu_set_t), &cpuset);
|
||||
// if (result != 0)
|
||||
// {
|
||||
// std::cout << "ros_experiment: sched_setaffinity failed: " << result << ": " << strerror(errno) << std::endl;
|
||||
// }
|
||||
|
||||
// set this process to second highest priority
|
||||
struct sched_param param;
|
||||
param.sched_priority = sched_get_priority_max(SCHED_FIFO) - 2;
|
||||
int result = sched_setscheduler(0, SCHED_FIFO, ¶m);
|
||||
if (result != 0)
|
||||
{
|
||||
std::cout << "ros_experiment: sched_setscheduler failed: " << result << ": " << strerror(errno) << std::endl;
|
||||
}
|
||||
|
||||
// set process name
|
||||
prctl(PR_SET_NAME, "ros_experiment", 0, 0, 0);
|
||||
|
||||
exec_fun(should_do_task);
|
||||
}
|
||||
|
||||
int ros_experiment(int argc, char **arg, std::string file_name, ExperimentConfig config)
|
||||
{
|
||||
|
||||
rclcpp::init(argc, arg);
|
||||
// we have to create a node to process any passed parameters
|
||||
auto node = rclcpp::Node::make_shared("experiment_parameters");
|
||||
node->declare_parameter("executor_type", "edf");
|
||||
|
||||
std::string executor_type;
|
||||
node->get_parameter("executor_type", executor_type);
|
||||
std::cout << "using executor type: " << executor_type << std::endl;
|
||||
|
||||
// TODO: move experiment configuration to main
|
||||
config.executor_type = executor_type;
|
||||
config.parallel_mode = true;
|
||||
config.nodes.push_back(node);
|
||||
|
||||
Experiment experiment(config);
|
||||
// std::string result_log = experiment.run(should_do_task);
|
||||
std::vector<std::function<void(std::atomic<bool> &)>> exec_funs = experiment.getRunFunctions();
|
||||
std::cout << "got " << exec_funs.size() << " executors" << std::endl;
|
||||
std::vector<std::thread> exec_threads;
|
||||
int i = 0;
|
||||
experiment.resetTimers();
|
||||
for (auto exec_fun : exec_funs)
|
||||
{
|
||||
exec_threads.push_back(std::thread(run_one_executor, exec_fun, i));
|
||||
i++;
|
||||
}
|
||||
|
||||
for (auto &t : exec_threads)
|
||||
{
|
||||
t.join();
|
||||
}
|
||||
std::string outputname = "casestudy_example";
|
||||
experiment.writeLogsToFile(file_name, outputname);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
int main(int argc, char *argv[])
|
||||
{
|
||||
|
||||
// calibrate the dummy load
|
||||
// Naive way to calibrate dummy workload for current system
|
||||
int dummy_load_calib = 1;
|
||||
while (1)
|
||||
{
|
||||
timeval ctime, ftime;
|
||||
int duration_us;
|
||||
gettimeofday(&ctime, NULL);
|
||||
dummy_load(100); // 100ms
|
||||
gettimeofday(&ftime, NULL);
|
||||
duration_us = (ftime.tv_sec - ctime.tv_sec) * 1000000 + (ftime.tv_usec - ctime.tv_usec);
|
||||
RCLCPP_INFO(rclcpp::get_logger("rclcpp"), "dummy_load_calib: %d (duration_us: %d ns)", dummy_load_calib, duration_us);
|
||||
if (abs(duration_us - 100 * 1000) < 500)
|
||||
{ // error margin: 500us
|
||||
break;
|
||||
}
|
||||
dummy_load_calib = 100 * 1000 * dummy_load_calib / duration_us;
|
||||
if (dummy_load_calib <= 0)
|
||||
dummy_load_calib = 1;
|
||||
DummyLoadCalibration::setCalibration(dummy_load_calib);
|
||||
}
|
||||
// DummyLoadCalibration::setCalibration(2900);
|
||||
ExperimentConfig config;
|
||||
config.chain_lengths = {2, 2};
|
||||
config.node_ids = {{0, 1}, {2, 3}};
|
||||
config.node_priorities = {1, 0, 3, 2};
|
||||
config.chain_timer_control = {0, 1};
|
||||
|
||||
config.node_runtimes = {10, 10, 10, 10};
|
||||
// node 0 has a period of 80, and is the only timer
|
||||
config.chain_periods = {100, 100};
|
||||
config.node_executor_assignments = {};
|
||||
config.parallel_mode = true;
|
||||
config.cores = 2;
|
||||
|
||||
sanity_check_config(config);
|
||||
|
||||
std::thread ros_task(ros_experiment, argc, argv, "cs_example", config);
|
||||
std::cout << "tasks started" << std::endl;
|
||||
ros_task.join();
|
||||
std::cout << "tasks joined" << std::endl;
|
||||
}
|
92
src/casestudy_tools/CMakeLists.txt
Executable file
92
src/casestudy_tools/CMakeLists.txt
Executable file
|
@ -0,0 +1,92 @@
|
|||
cmake_minimum_required(VERSION 3.5)
|
||||
project(casestudy_tools)
|
||||
|
||||
# Default to C99
|
||||
if(NOT CMAKE_C_STANDARD)
|
||||
set(CMAKE_C_STANDARD 99)
|
||||
endif()
|
||||
|
||||
# Default to C++14
|
||||
if(NOT CMAKE_CXX_STANDARD)
|
||||
set(CMAKE_CXX_STANDARD 14)
|
||||
endif()
|
||||
|
||||
if(CMAKE_COMPILER_IS_GNUCXX OR CMAKE_CXX_COMPILER_ID MATCHES "Clang")
|
||||
add_compile_options(-Wall -Wextra -Wpedantic)
|
||||
endif()
|
||||
|
||||
# find dependencies
|
||||
find_package(ament_cmake REQUIRED)
|
||||
find_package(rclcpp REQUIRED)
|
||||
find_package(priority_executor REQUIRED)
|
||||
find_package(simple_timer REQUIRED)
|
||||
|
||||
if(BUILD_TESTING)
|
||||
find_package(ament_lint_auto REQUIRED)
|
||||
# the following line skips the linter which checks for copyrights
|
||||
# uncomment the line when a copyright and license is not present in all source files
|
||||
#set(ament_cmake_copyright_FOUND TRUE)
|
||||
# the following line skips cpplint (only works in a git repo)
|
||||
# uncomment the line when this package is not in a git repo
|
||||
#set(ament_cmake_cpplint_FOUND TRUE)
|
||||
ament_lint_auto_find_test_dependencies()
|
||||
endif()
|
||||
|
||||
add_library(primes_workload src/primes_workload.cpp)
|
||||
target_include_directories(primes_workload PUBLIC
|
||||
$<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}/include>
|
||||
$<INSTALL_INTERFACE:include>
|
||||
)
|
||||
ament_target_dependencies(primes_workload
|
||||
simple_timer
|
||||
rclcpp
|
||||
priority_executor
|
||||
|
||||
)
|
||||
|
||||
add_library(test_nodes src/test_nodes.cpp)
|
||||
target_include_directories(test_nodes PUBLIC
|
||||
$<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}/include>
|
||||
$<INSTALL_INTERFACE:include>
|
||||
)
|
||||
target_link_libraries(test_nodes primes_workload)
|
||||
ament_target_dependencies(test_nodes
|
||||
rclcpp
|
||||
simple_timer
|
||||
)
|
||||
|
||||
add_library(experiment_host src/experiment.cpp)
|
||||
target_include_directories(experiment_host PUBLIC
|
||||
$<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}/include>
|
||||
$<INSTALL_INTERFACE:include>
|
||||
)
|
||||
target_link_libraries(experiment_host
|
||||
primes_workload
|
||||
test_nodes
|
||||
)
|
||||
ament_target_dependencies(experiment_host
|
||||
rclcpp
|
||||
simple_timer
|
||||
priority_executor
|
||||
)
|
||||
|
||||
|
||||
install(
|
||||
DIRECTORY include/
|
||||
DESTINATION include
|
||||
)
|
||||
|
||||
install(
|
||||
TARGETS primes_workload test_nodes experiment_host
|
||||
EXPORT export_${PROJECT_NAME}
|
||||
ARCHIVE DESTINATION lib
|
||||
LIBRARY DESTINATION lib
|
||||
RUNTIME DESTINATION bin
|
||||
INCLUDES DESTINATION include
|
||||
)
|
||||
|
||||
ament_export_include_directories(include)
|
||||
# ament_export_libraries(primes_workload test_nodes experiment_host)
|
||||
ament_export_targets(export_${PROJECT_NAME} HAS_LIBRARY_TARGET)
|
||||
ament_export_dependencies(rclcpp priority_executor simple_timer)
|
||||
ament_package()
|
140
src/casestudy_tools/include/casestudy_tools/experiment.hpp
Executable file
140
src/casestudy_tools/include/casestudy_tools/experiment.hpp
Executable file
|
@ -0,0 +1,140 @@
|
|||
#ifndef RTIS_EXPERIMENT_HOST
|
||||
#define RTIS_EXPERIMENT_HOST
|
||||
|
||||
#include <vector>
|
||||
#include <cstdint>
|
||||
#include <string>
|
||||
#include "priority_executor/priority_executor.hpp"
|
||||
#include "casestudy_tools/test_nodes.hpp"
|
||||
#include "simple_timer/rt-sched.hpp"
|
||||
|
||||
|
||||
struct ExperimentConfig
|
||||
{
|
||||
std::vector<uint32_t> chain_lengths;
|
||||
// TODO: rename to callback_ids
|
||||
std::vector<std::vector<uint32_t>> node_ids;
|
||||
// TODO: rename to callback_runtimes
|
||||
std::vector<double> node_runtimes;
|
||||
// TODO: rename to callback_priorities
|
||||
std::vector<uint32_t> node_priorities;
|
||||
std::vector<int> chain_periods;
|
||||
// for each chain, the index of the timer node
|
||||
std::vector<int> chain_timer_control;
|
||||
|
||||
uint num_groups = 0;
|
||||
std::vector<int> group_memberships;
|
||||
|
||||
// TODO: change to enum
|
||||
// can be "default", "picas", "edf"
|
||||
std::string executor_type = "default";
|
||||
|
||||
// if this is empty, then the nodes will be assigned to a single executor
|
||||
// TODO: rename this to callback_executor_assignments
|
||||
std::vector<uint32_t> node_executor_assignments;
|
||||
// used in single-threaded mode to assign executors to cores
|
||||
std::vector<uint32_t> executor_to_cpu_assignments;
|
||||
|
||||
bool parallel_mode = false;
|
||||
size_t cores = 2;
|
||||
|
||||
// rclcpp::Node::SharedPtr node;
|
||||
// nodes to host the callbacks. In parallel (one executor using multiple threads) mode, there is one node and one executor. The node is provided by the experiment provider
|
||||
// in singlethread (each executor uses one thread), there is one node per executor. Additional nodes may be created by the experiment host
|
||||
std::vector<rclcpp::Node::SharedPtr> nodes;
|
||||
std::vector<std::string> special_cases = {};
|
||||
};
|
||||
|
||||
struct experiment_executor
|
||||
{
|
||||
std::shared_ptr<timed_executor::TimedExecutor> executor;
|
||||
std::shared_ptr<PriorityMemoryStrategy<>> strat;
|
||||
|
||||
std::shared_ptr<rclcpp::Executor> default_executor;
|
||||
};
|
||||
|
||||
class Experiment
|
||||
{
|
||||
public:
|
||||
Experiment(ExperimentConfig config);
|
||||
|
||||
/**
|
||||
* @brief Run the experiment
|
||||
* runs in the current thread
|
||||
* @param should_do_task a flag that can be set to false to stop the experiment. The flag will be set to false when the experiment is complete.
|
||||
*/
|
||||
std::string run(std::atomic<bool> &should_do_task);
|
||||
|
||||
|
||||
/**
|
||||
* @brief Get the functions required to run the experiment
|
||||
* returns a vector of functions. Each function will run a different executor. It's up to the caller to run these functions in different threads.
|
||||
* The caller may set the thread properties (e.g. priority) before running the function.
|
||||
* Each function accepts a flag that can be set to false to stop the experiment. The flag will be set to false when the experiment is complete.
|
||||
*/
|
||||
std::vector<std::function<void(std::atomic<bool> &)>> getRunFunctions();
|
||||
|
||||
static size_t numExecutorsRequired(const ExperimentConfig &config);
|
||||
|
||||
/**
|
||||
* get the executor (and strategy) for a given executor index (0 by default)
|
||||
*/
|
||||
experiment_executor getExecutor(int executor_idx = 0);
|
||||
|
||||
node_time_logger getInternalLogger();
|
||||
|
||||
/**
|
||||
* collect logs from the executors. Also add any logs from passed in loggers
|
||||
* @param optional array of loggers to add to the logs
|
||||
*/
|
||||
std::string buildLogs(std::vector<node_time_logger> loggers = {});
|
||||
|
||||
void resetTimers();
|
||||
|
||||
void writeLogsToFile(std::string filename, std::string experiment_name);
|
||||
|
||||
private:
|
||||
ExperimentConfig config;
|
||||
std::vector<experiment_executor> executors;
|
||||
|
||||
// values to keep track of runtime values
|
||||
|
||||
// chain-indexed vector of deques of deadlines
|
||||
std::vector<std::deque<uint64_t> *> chain_deadlines_deque;
|
||||
// maps node_id to node
|
||||
std::map<uint32_t, std::shared_ptr<CaseStudyNode>> nodes;
|
||||
// maps chain_id to timer
|
||||
std::map<uint32_t, std::shared_ptr<rclcpp::TimerBase>> chain_timer_handles;
|
||||
|
||||
/**
|
||||
* @brief Create the executors using the type and amount set in the config
|
||||
*
|
||||
*/
|
||||
void createExecutors();
|
||||
|
||||
/**
|
||||
* @brief Create a single executor using the type set in the config
|
||||
*
|
||||
* @return experiment_executor
|
||||
*/
|
||||
experiment_executor createSingleExecutor(uint executor_num = 0);
|
||||
|
||||
/**
|
||||
* @brief Get the executor for a given node id
|
||||
* @param node_id if the node_executor_assignments is empty, then this parameter is ignored, and the first executor is returned
|
||||
*/
|
||||
experiment_executor getExecutorForNode(int node_id=0);
|
||||
|
||||
void createNodesAndAssignProperties();
|
||||
|
||||
|
||||
void setInitialDeadlines();
|
||||
|
||||
|
||||
node_time_logger _logger;
|
||||
};
|
||||
|
||||
int do_other_task(double work_time, uint period, std::atomic<bool> &should_do_task);
|
||||
void sanity_check_config(ExperimentConfig config);
|
||||
|
||||
#endif // RTIS_EXPERIMENT_HOST
|
22
src/casestudy_tools/include/casestudy_tools/primes_workload.hpp
Executable file
22
src/casestudy_tools/include/casestudy_tools/primes_workload.hpp
Executable file
|
@ -0,0 +1,22 @@
|
|||
#ifndef RTIS_PRIMES_WORKLOAD
|
||||
#define RTIS_PRIMES_WORKLOAD
|
||||
#include <time.h>
|
||||
#include <sys/time.h>
|
||||
#include <thread>
|
||||
#include <cstdio>
|
||||
typedef double ktimeunit;
|
||||
ktimeunit nth_prime_silly(double millis = 100);
|
||||
void dummy_load(int load_ms);
|
||||
// create a global variable to store the calibration value
|
||||
class DummyLoadCalibration
|
||||
{
|
||||
public:
|
||||
static int getCalibration();
|
||||
static void setCalibration(int calib);
|
||||
|
||||
private:
|
||||
static int calibration;
|
||||
};
|
||||
|
||||
ktimeunit get_thread_time(struct timespec *currTime);
|
||||
#endif
|
61
src/casestudy_tools/include/casestudy_tools/test_nodes.hpp
Executable file
61
src/casestudy_tools/include/casestudy_tools/test_nodes.hpp
Executable file
|
@ -0,0 +1,61 @@
|
|||
#ifndef RTIS_TEST_NODES
|
||||
#define RTIS_TEST_NODES
|
||||
#include <string>
|
||||
#include "rclcpp/rclcpp.hpp"
|
||||
#include "std_msgs/msg/string.hpp"
|
||||
#include "std_msgs/msg/int32.hpp"
|
||||
#include "simple_timer/rt-sched.hpp"
|
||||
|
||||
class CaseStudyNode
|
||||
{
|
||||
public: node_time_logger logger;
|
||||
// inherit constructor
|
||||
CaseStudyNode(std::string publish_topic, rclcpp::Node::SharedPtr node)
|
||||
{
|
||||
this->name = publish_topic;
|
||||
this->node = node;
|
||||
// seed random number generator
|
||||
srand(time(NULL));
|
||||
}
|
||||
|
||||
bool use_random_runtime = false;
|
||||
double normal_runtime_scale = 0.8;
|
||||
double over_runtime_scale = 1.2;
|
||||
double runtime_over_chance = 0.1;
|
||||
std::string get_name();
|
||||
rclcpp::CallbackGroup::SharedPtr callback_group_;
|
||||
|
||||
private:
|
||||
rclcpp::Node::SharedPtr node;
|
||||
std::string name;
|
||||
};
|
||||
|
||||
#define COUNT_MAX 500
|
||||
|
||||
class PublisherNode : public CaseStudyNode{
|
||||
public:
|
||||
PublisherNode(std::string publish_topic, double runtime, int period, int chain, rclcpp::Node::SharedPtr node, rclcpp::CallbackGroup::SharedPtr callback_group = nullptr);
|
||||
rclcpp::TimerBase::SharedPtr timer_;
|
||||
int count_max_ = COUNT_MAX;
|
||||
|
||||
private:
|
||||
rclcpp::Publisher<std_msgs::msg::Int32>::SharedPtr pub_;
|
||||
double runtime_;
|
||||
int period_;
|
||||
int chain_;
|
||||
};
|
||||
|
||||
class WorkerNode : public CaseStudyNode{
|
||||
public:
|
||||
WorkerNode(std::string subscribe_topic, std::string publish_topic, double runtime, int period, int chain, rclcpp::Node::SharedPtr node, rclcpp::CallbackGroup::SharedPtr callback_group = nullptr);
|
||||
rclcpp::Subscription<std_msgs::msg::Int32>::SharedPtr sub_;
|
||||
int count_max_ = COUNT_MAX;
|
||||
|
||||
private:
|
||||
double runtime_;
|
||||
int period_;
|
||||
int chain_;
|
||||
rclcpp::Publisher<std_msgs::msg::Int32>::SharedPtr pub_;
|
||||
};
|
||||
|
||||
#endif // RTIS_TEST_NODES
|
22
src/casestudy_tools/package.xml
Executable file
22
src/casestudy_tools/package.xml
Executable file
|
@ -0,0 +1,22 @@
|
|||
<?xml version="1.0"?>
|
||||
<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
|
||||
<package format="3">
|
||||
<name>casestudy_tools</name>
|
||||
<version>0.0.0</version>
|
||||
<description>TODO: Package description</description>
|
||||
<maintainer email="kurt4wilson@gmail.com">vboxuser</maintainer>
|
||||
<license>TODO: License declaration</license>
|
||||
|
||||
<buildtool_depend>ament_cmake</buildtool_depend>
|
||||
|
||||
<depend>rclcpp</depend>
|
||||
<depend>priority_executor</depend>
|
||||
<depend>simple_timer</depend>
|
||||
|
||||
<test_depend>ament_lint_auto</test_depend>
|
||||
<test_depend>ament_lint_common</test_depend>
|
||||
|
||||
<export>
|
||||
<build_type>ament_cmake</build_type>
|
||||
</export>
|
||||
</package>
|
612
src/casestudy_tools/src/experiment.cpp
Executable file
612
src/casestudy_tools/src/experiment.cpp
Executable file
|
@ -0,0 +1,612 @@
|
|||
#include "casestudy_tools/experiment.hpp"
|
||||
#include "casestudy_tools/primes_workload.hpp"
|
||||
#include "casestudy_tools/test_nodes.hpp"
|
||||
#include "priority_executor/multithread_priority_executor.hpp"
|
||||
#include "priority_executor/priority_executor.hpp"
|
||||
#include "priority_executor/priority_memory_strategy.hpp"
|
||||
|
||||
#include <cstddef>
|
||||
#include <deque>
|
||||
#include <fstream>
|
||||
#include <iostream>
|
||||
#include <memory>
|
||||
#include <sys/prctl.h>
|
||||
#include <vector>
|
||||
|
||||
Experiment::Experiment(ExperimentConfig config) : config(config) {
|
||||
_logger = create_logger();
|
||||
}
|
||||
|
||||
void sanity_check_config(ExperimentConfig config) {
|
||||
// make sure chain_lengths and node_ids are the same size
|
||||
if (config.chain_lengths.size() != config.node_ids.size()) {
|
||||
std::cout << "ros_experiment: chain_lengths.size()= "
|
||||
<< config.chain_lengths.size()
|
||||
<< " != node_ids.size()= " << config.node_ids.size() << std::endl;
|
||||
exit(1);
|
||||
}
|
||||
// make sure each chain_lengths is the same size as the corresponding node_ids
|
||||
for (uint32_t i = 0; i < config.chain_lengths.size(); i++) {
|
||||
if (config.chain_lengths[i] != config.node_ids[i].size()) {
|
||||
std::cout << "ros_experiment: chain_lengths[" << i
|
||||
<< "]= " << config.chain_lengths[i] << " != node_ids[" << i
|
||||
<< "].size()= " << config.node_ids[i].size() << std::endl;
|
||||
exit(1);
|
||||
}
|
||||
}
|
||||
// make sure chain_timer_control is the same size as chain_lengths
|
||||
if (config.chain_timer_control.size() != config.chain_lengths.size()) {
|
||||
std::cout << "ros_experiment: chain_timer_control.size()= "
|
||||
<< config.chain_timer_control.size()
|
||||
<< " != chain_lengths.size()= " << config.chain_lengths.size()
|
||||
<< std::endl;
|
||||
exit(1);
|
||||
}
|
||||
std::set<int> all_node_ids = std::set<int>();
|
||||
for (uint32_t i = 0; i < config.node_ids.size(); i++) {
|
||||
for (uint32_t j = 0; j < config.node_ids[i].size(); j++) {
|
||||
all_node_ids.insert(config.node_ids[i][j]);
|
||||
}
|
||||
}
|
||||
// make sure we have the right number of node_priorities and node_runtimes
|
||||
if (all_node_ids.size() != config.node_priorities.size()) {
|
||||
std::cout << "ros_experiment: all_node_ids.size()= " << all_node_ids.size()
|
||||
<< " != node_priorities.size()= " << config.node_priorities.size()
|
||||
<< std::endl;
|
||||
exit(1);
|
||||
}
|
||||
if (all_node_ids.size() != config.node_runtimes.size()) {
|
||||
std::cout << "ros_experiment: all_node_ids.size()= " << all_node_ids.size()
|
||||
<< " != node_runtimes.size()= " << config.node_runtimes.size()
|
||||
<< std::endl;
|
||||
exit(1);
|
||||
}
|
||||
}
|
||||
|
||||
std::string Experiment::run(std::atomic<bool> &should_do_task) {
|
||||
// TODO: split into setup and run, so that run can be re-used in
|
||||
// Experiment::getRunFunctions
|
||||
if (config.node_executor_assignments.size() != 0 ||
|
||||
numExecutorsRequired(config) != 1) {
|
||||
std::cerr
|
||||
<< "called Experiment::run with node_executor_assignments.size() != 0"
|
||||
<< std::endl;
|
||||
return "";
|
||||
}
|
||||
|
||||
createExecutors();
|
||||
|
||||
createNodesAndAssignProperties();
|
||||
|
||||
setInitialDeadlines();
|
||||
// reset all timers
|
||||
resetTimers();
|
||||
// in this function (run), there is only one executor, so retrieve it now
|
||||
experiment_executor executor = getExecutorForNode();
|
||||
if (config.executor_type == "edf" || config.executor_type == "picas") {
|
||||
executor.executor->spin();
|
||||
} else if (config.executor_type == "default") {
|
||||
executor.default_executor->spin();
|
||||
}
|
||||
std::cout << "spin done" << std::endl;
|
||||
should_do_task.store(false);
|
||||
|
||||
return buildLogs();
|
||||
}
|
||||
|
||||
std::vector<std::function<void(std::atomic<bool> &)>>
|
||||
Experiment::getRunFunctions() {
|
||||
// do pre-run setup
|
||||
createExecutors();
|
||||
createNodesAndAssignProperties();
|
||||
|
||||
// create a function for each executor
|
||||
std::vector<std::function<void(std::atomic<bool> &)>> run_functions;
|
||||
for (size_t i = 0; i < numExecutorsRequired(config); i++) {
|
||||
// get the executor for this function
|
||||
experiment_executor executor = executors[i];
|
||||
run_functions.push_back([this,
|
||||
executor](std::atomic<bool> &should_do_task) {
|
||||
if (config.executor_type == "edf" || config.executor_type == "picas") {
|
||||
executor.executor->spin();
|
||||
} else if (config.executor_type == "default") {
|
||||
executor.default_executor->spin();
|
||||
}
|
||||
std::cout << "spin done" << std::endl;
|
||||
should_do_task.store(false);
|
||||
});
|
||||
}
|
||||
|
||||
// do these last, since they're time-sensitive
|
||||
setInitialDeadlines();
|
||||
// resetTimers();
|
||||
|
||||
return run_functions;
|
||||
}
|
||||
|
||||
size_t Experiment::numExecutorsRequired(const ExperimentConfig &config) {
|
||||
// get the number of unique values in the node_executor_assignments
|
||||
if (config.node_executor_assignments.size() == 0) {
|
||||
return 1;
|
||||
}
|
||||
std::set<int> unique_executors;
|
||||
for (auto &assignment : config.node_executor_assignments) {
|
||||
unique_executors.insert(assignment);
|
||||
}
|
||||
return unique_executors.size();
|
||||
}
|
||||
|
||||
experiment_executor Experiment::getExecutor(int executor_idx) {
|
||||
return executors[executor_idx];
|
||||
}
|
||||
|
||||
void Experiment::createExecutors() {
|
||||
if (config.node_executor_assignments.size() == 0) {
|
||||
// create a single executor
|
||||
executors.push_back(createSingleExecutor(0));
|
||||
return;
|
||||
}
|
||||
|
||||
// depending on config.executor_to_cpu_assignments and config.parallel_mode,
|
||||
// we may need to create the host nodes
|
||||
if (config.executor_to_cpu_assignments.size() != 0 && config.parallel_mode) {
|
||||
std::cerr << "executor_to_cpu_assignments not supported for parallel mode"
|
||||
<< std::endl;
|
||||
}
|
||||
|
||||
if (config.executor_to_cpu_assignments.size() != 0 &&
|
||||
config.executor_to_cpu_assignments.size() !=
|
||||
numExecutorsRequired(config)) {
|
||||
std::cerr
|
||||
<< "executor_to_cpu_assignments.size() != numExecutorsRequired(config)"
|
||||
<< std::endl;
|
||||
exit(1);
|
||||
}
|
||||
|
||||
// create the required host nodes - re-use any existing nodes
|
||||
for (uint i = config.nodes.size();
|
||||
i < config.executor_to_cpu_assignments.size(); i++) {
|
||||
// create a node for each executor
|
||||
rclcpp::Node::SharedPtr node =
|
||||
std::make_shared<rclcpp::Node>("node_" + std::to_string(i));
|
||||
config.nodes.push_back(node);
|
||||
}
|
||||
std::cout << "created " << config.nodes.size() << " nodes" << std::endl;
|
||||
|
||||
int num_executors = numExecutorsRequired(config);
|
||||
std::cout << "creating " << num_executors << " executors" << std::endl;
|
||||
for (int i = 0; i < num_executors; i++) {
|
||||
executors.push_back(createSingleExecutor(i));
|
||||
}
|
||||
}
|
||||
|
||||
experiment_executor Experiment::createSingleExecutor(uint executor_num) {
|
||||
experiment_executor executor = {nullptr, nullptr, nullptr};
|
||||
|
||||
if (config.executor_type == "edf" || config.executor_type == "picas") {
|
||||
executor.strat = std::make_shared<PriorityMemoryStrategy<>>();
|
||||
rclcpp::ExecutorOptions options;
|
||||
options.memory_strategy = executor.strat;
|
||||
if (config.parallel_mode) {
|
||||
executor.executor =
|
||||
std::make_shared<timed_executor::MultithreadTimedExecutor>(
|
||||
options, "priority_executor", config.cores);
|
||||
} else {
|
||||
executor.executor =
|
||||
std::make_shared<timed_executor::TimedExecutor>(options);
|
||||
}
|
||||
executor.executor->prio_memory_strategy_ = executor.strat;
|
||||
if (config.node_executor_assignments.size() == 0) {
|
||||
// add all nodes to the executor
|
||||
for (auto &node : config.nodes) {
|
||||
executor.executor->add_node(node);
|
||||
}
|
||||
} else {
|
||||
// add only this executor's nodes
|
||||
executor.executor->add_node(config.nodes[executor_num]);
|
||||
}
|
||||
} else if (config.executor_type == "default") {
|
||||
if (config.parallel_mode) {
|
||||
// executor.default_executor =
|
||||
// std::make_shared<ROSDefaultMultithreadedExecutor>(rclcpp::ExecutorOptions(),
|
||||
// config.cores);
|
||||
executor.default_executor =
|
||||
std::make_shared<rclcpp::executors::MultiThreadedExecutor>(
|
||||
rclcpp::ExecutorOptions(), config.cores);
|
||||
} else {
|
||||
executor.default_executor = std::make_shared<ROSDefaultExecutor>();
|
||||
}
|
||||
if (config.node_executor_assignments.size() == 0) {
|
||||
// add all nodes to the executor
|
||||
for (auto &node : config.nodes) {
|
||||
executor.default_executor->add_node(node);
|
||||
}
|
||||
} else {
|
||||
// add only this executor's nodes
|
||||
executor.default_executor->add_node(config.nodes[executor_num]);
|
||||
}
|
||||
} else {
|
||||
// RCLCPP_ERROR(node->get_logger(), "Unknown executor type: %s",
|
||||
// executor_type.c_str());
|
||||
return executor;
|
||||
}
|
||||
return executor;
|
||||
}
|
||||
|
||||
experiment_executor Experiment::getExecutorForNode(int node_id) {
|
||||
if (config.node_executor_assignments.size() == 0) {
|
||||
return executors[0];
|
||||
} else {
|
||||
int executor_idx = config.node_executor_assignments[node_id];
|
||||
std::cout << "node " << node_id << " assigned to executor " << executor_idx
|
||||
<< std::endl;
|
||||
return executors[executor_idx];
|
||||
}
|
||||
}
|
||||
|
||||
void Experiment::createNodesAndAssignProperties() {
|
||||
|
||||
// BEGIN SPECIAL CASES
|
||||
std::vector<std::vector<std::shared_ptr<CaseStudyNode>>> all_nodes;
|
||||
rclcpp::CallbackGroup::SharedPtr cb_group0;
|
||||
// LEGACY GROUP BEHAVIOR
|
||||
bool jiang2022_cb = false;
|
||||
bool jiang2022_cs2 = false;
|
||||
// one-to-many test. TODO: add generic api for multiple one->many
|
||||
bool ours2024_latency = false;
|
||||
for (auto &special_case : config.special_cases) {
|
||||
if (special_case == "jiang2022_cb") {
|
||||
jiang2022_cb = true;
|
||||
} else if (special_case == "2022jiang2") {
|
||||
jiang2022_cs2 = true;
|
||||
} else if (special_case == "2024ours_latency") {
|
||||
std::cout << "using 2024ours_latency special case" << std::endl;
|
||||
ours2024_latency = true;
|
||||
}
|
||||
}
|
||||
if (jiang2022_cb || jiang2022_cs2) {
|
||||
// cb_group0 =
|
||||
// config.node->create_callback_group(rclcpp::CallbackGroupType::MutuallyExclusive);
|
||||
}
|
||||
// END SPECIAL CASES
|
||||
|
||||
std::vector<rclcpp::CallbackGroup::SharedPtr> cb_groups;
|
||||
if (config.num_groups != 0 &&
|
||||
config.executor_to_cpu_assignments.size() != 0) {
|
||||
std::cout << "WARNING: num_groups and executor_to_cpu_assignments are both "
|
||||
"set. This is not supported. Ignoring num_groups."
|
||||
<< std::endl;
|
||||
}
|
||||
for (uint32_t i = 0; i < config.num_groups; i++) {
|
||||
cb_groups.push_back(config.nodes[0]->create_callback_group(
|
||||
rclcpp::CallbackGroupType::MutuallyExclusive));
|
||||
}
|
||||
|
||||
// for each chain
|
||||
for (uint32_t chain_id = 0; chain_id < config.node_ids.size(); chain_id++) {
|
||||
all_nodes.push_back(std::vector<std::shared_ptr<CaseStudyNode>>());
|
||||
|
||||
// for each node in the chain
|
||||
bool first_node = true;
|
||||
for (uint32_t node_chain_idx = 0;
|
||||
node_chain_idx < config.node_ids[chain_id].size(); node_chain_idx++) {
|
||||
uint32_t node_id = config.node_ids[chain_id][node_chain_idx];
|
||||
// has this node been created yet?
|
||||
if (nodes.find(node_id) != nodes.end()) {
|
||||
// if it has, then this node exists in another chain
|
||||
// if it's a timer node, we should add it to the timer handle map
|
||||
continue;
|
||||
}
|
||||
experiment_executor executor = getExecutorForNode(node_id);
|
||||
std::shared_ptr<const void> handle;
|
||||
ExecutableType type;
|
||||
// is this the first node in the chain?
|
||||
if (node_chain_idx == 0) {
|
||||
// create a timer node
|
||||
rclcpp::CallbackGroup::SharedPtr cb_group = nullptr;
|
||||
if (jiang2022_cs2) {
|
||||
std::cout << "jiang2022_cs2: placing all nodes in cb_group0"
|
||||
<< std::endl;
|
||||
cb_group = cb_group0;
|
||||
}
|
||||
if (config.num_groups != 0 && config.group_memberships[node_id] != 0) {
|
||||
cb_group = cb_groups[config.group_memberships[node_id] - 1];
|
||||
}
|
||||
std::shared_ptr<PublisherNode> node;
|
||||
if (config.node_executor_assignments.size() == 0) {
|
||||
node = std::make_shared<PublisherNode>(
|
||||
"node_" + std::to_string(node_id), config.node_runtimes[node_id],
|
||||
config.chain_periods[chain_id], chain_id, config.nodes[0],
|
||||
cb_group);
|
||||
} else {
|
||||
node = std::make_shared<PublisherNode>(
|
||||
"node_" + std::to_string(node_id), config.node_runtimes[node_id],
|
||||
config.chain_periods[chain_id], chain_id,
|
||||
config.nodes[config.node_executor_assignments[node_id]],
|
||||
cb_group);
|
||||
}
|
||||
handle = node->timer_->get_timer_handle();
|
||||
nodes[node_id] = node;
|
||||
type = ExecutableType::TIMER;
|
||||
// this_chain_timer_handle = node->timer_;
|
||||
std::cout << "chain timer handle set: " << chain_id << std::endl;
|
||||
chain_timer_handles[chain_id] = node->timer_;
|
||||
|
||||
all_nodes[chain_id].push_back(node);
|
||||
} else {
|
||||
// create a subscriber node
|
||||
int subscription_id = config.node_ids[chain_id][node_chain_idx - 1];
|
||||
rclcpp::CallbackGroup::SharedPtr cb_group;
|
||||
if (jiang2022_cb && ((chain_id == 1 && node_chain_idx == 1) ||
|
||||
(chain_id == 2 && node_chain_idx == 1))) {
|
||||
cb_group = cb_group0;
|
||||
std::cout << "adding c: " << chain_id << " n: " << node_id
|
||||
<< " to cb group 0" << std::endl;
|
||||
} else {
|
||||
cb_group = nullptr;
|
||||
}
|
||||
|
||||
if (config.num_groups != 0 && config.group_memberships[node_id] != 0) {
|
||||
cb_group = cb_groups[config.group_memberships[node_id] - 1];
|
||||
}
|
||||
|
||||
std::shared_ptr<WorkerNode> node;
|
||||
if (ours2024_latency) {
|
||||
uint32_t first_node_id = config.node_ids[chain_id][0];
|
||||
node = std::make_shared<WorkerNode>(
|
||||
"node_" + std::to_string(first_node_id),
|
||||
"node_" + std::to_string(node_id), config.node_runtimes[node_id],
|
||||
config.chain_periods[chain_id], chain_id, config.nodes[0],
|
||||
cb_group);
|
||||
} else if (config.node_executor_assignments.size() == 0) {
|
||||
node = std::make_shared<WorkerNode>(
|
||||
"node_" + std::to_string(subscription_id),
|
||||
"node_" + std::to_string(node_id), config.node_runtimes[node_id],
|
||||
config.chain_periods[chain_id], chain_id, config.nodes[0],
|
||||
cb_group);
|
||||
} else {
|
||||
node = std::make_shared<WorkerNode>(
|
||||
"node_" + std::to_string(subscription_id),
|
||||
"node_" + std::to_string(node_id), config.node_runtimes[node_id],
|
||||
config.chain_periods[chain_id], chain_id,
|
||||
config.nodes[config.node_executor_assignments[node_id]],
|
||||
cb_group);
|
||||
}
|
||||
handle = node->sub_->get_subscription_handle();
|
||||
nodes[node_id] = node;
|
||||
type = ExecutableType::SUBSCRIPTION;
|
||||
|
||||
all_nodes[chain_id].push_back(node);
|
||||
}
|
||||
if (config.executor_type == "edf") {
|
||||
std::string cb_name = "node_" + std::to_string(node_id) + "_cb";
|
||||
executor.strat->set_executable_deadline(
|
||||
handle, config.chain_periods[chain_id], type, chain_id, cb_name);
|
||||
// executor.executor->add_node(nodes[node_id]);
|
||||
// make sure timer handle exists. if not, we did something wrong
|
||||
if (chain_timer_handles.find(config.chain_timer_control[chain_id]) ==
|
||||
chain_timer_handles.end()) {
|
||||
std::cerr << "chain timer handle not found for chain " << chain_id
|
||||
<< ": " << config.chain_timer_control[chain_id]
|
||||
<< std::endl;
|
||||
}
|
||||
executor.strat->get_priority_settings(handle)->timer_handle =
|
||||
chain_timer_handles[config.chain_timer_control[chain_id]];
|
||||
executor.strat->get_priority_settings(handle)->priority =
|
||||
config.node_priorities[node_id]; // use priority as tiebreaker
|
||||
if (first_node) {
|
||||
executor.strat->set_first_in_chain(handle);
|
||||
first_node = false;
|
||||
}
|
||||
// is this the last node in the chain?
|
||||
if (node_chain_idx == config.node_ids[chain_id].size() - 1) {
|
||||
executor.strat->set_last_in_chain(handle);
|
||||
}
|
||||
} else if (config.executor_type == "picas") {
|
||||
// executor.executor->add_node(nodes[node_id]);
|
||||
executor.strat->set_executable_priority(
|
||||
handle, config.node_priorities[node_id], type,
|
||||
ExecutableScheduleType::CHAIN_AWARE_PRIORITY, chain_id);
|
||||
std::cout << "node " << node_id << " has priority "
|
||||
<< config.node_priorities[node_id] << std::endl;
|
||||
} else if (config.executor_type == "default") {
|
||||
// executor.default_executor->add_node(nodes[node_id]);
|
||||
}
|
||||
// TODO: other types
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
std::string Experiment::buildLogs(std::vector<node_time_logger> node_logs) {
|
||||
std::stringstream output_file;
|
||||
|
||||
output_file << "[" << std::endl;
|
||||
// for each node
|
||||
for (auto node : nodes) {
|
||||
// print the node's logs
|
||||
node_time_logger logger = node.second->logger;
|
||||
for (auto &log : *(logger.recorded_times)) {
|
||||
output_file << "{\"entry\": " << log.first
|
||||
<< ", \"time\": " << log.second << "}," << std::endl;
|
||||
}
|
||||
}
|
||||
|
||||
if (config.executor_type == "edf" || config.executor_type == "picas") {
|
||||
for (auto &executor : executors) {
|
||||
node_time_logger strat_log = executor.strat->logger_;
|
||||
for (auto &log : *(strat_log.recorded_times)) {
|
||||
// std::cout << log.first << " " << log.second << std::endl;
|
||||
output_file << "{\"entry\": " << log.first
|
||||
<< ", \"time\": " << log.second << "}," << std::endl;
|
||||
}
|
||||
|
||||
// executor may be a subclass of RTISTimed, so we can get its logger
|
||||
if (dynamic_cast<RTISTimed *>(executor.executor.get())) {
|
||||
node_time_logger executor_log =
|
||||
dynamic_cast<RTISTimed *>(executor.executor.get())->logger_;
|
||||
for (auto &log : *(executor_log.recorded_times)) {
|
||||
output_file << "{\"entry\": " << log.first
|
||||
<< ", \"time\": " << log.second << "}," << std::endl;
|
||||
}
|
||||
} else {
|
||||
std::cout << "executor is not RTISTimed" << std::endl;
|
||||
}
|
||||
}
|
||||
} else {
|
||||
// a default executor
|
||||
for (auto &executor : executors) {
|
||||
// experimental: executor.default_executor's type is rclcpp::Executor, but
|
||||
// here it's guaranteed to inherit from RTISTimed we can get a logger from
|
||||
// it. for safety, we check the type first
|
||||
if (dynamic_cast<RTISTimed *>(executor.default_executor.get())) {
|
||||
node_time_logger default_log =
|
||||
dynamic_cast<RTISTimed *>(executor.default_executor.get())->logger_;
|
||||
for (auto &log : *(default_log.recorded_times)) {
|
||||
output_file << "{\"entry\": " << log.first
|
||||
<< ", \"time\": " << log.second << "}," << std::endl;
|
||||
}
|
||||
std::cout << "recovered logs from default executor" << std::endl;
|
||||
} else {
|
||||
std::cout << "default executor is not RTISTimed" << std::endl;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// add internal logs
|
||||
for (auto &log_entry : *(_logger.recorded_times)) {
|
||||
output_file << "{\"entry\": " << log_entry.first
|
||||
<< ", \"time\": " << log_entry.second << "}," << std::endl;
|
||||
}
|
||||
|
||||
// add logs from argument
|
||||
for (auto &log : node_logs) {
|
||||
for (auto &log_entry : *(log.recorded_times)) {
|
||||
output_file << "{\"entry\": " << log_entry.first
|
||||
<< ", \"time\": " << log_entry.second << "}," << std::endl;
|
||||
}
|
||||
}
|
||||
// remove the last comma
|
||||
output_file.seekp(-2, std::ios_base::end);
|
||||
output_file << "]" << std::endl;
|
||||
|
||||
return output_file.str();
|
||||
}
|
||||
|
||||
void Experiment::setInitialDeadlines() {
|
||||
timespec current_time;
|
||||
clock_gettime(CLOCK_MONOTONIC_RAW, ¤t_time);
|
||||
uint64_t millis =
|
||||
(current_time.tv_sec * 1000UL) + (current_time.tv_nsec / 1000000);
|
||||
// we need a logger to record the first deadline, which is generated here
|
||||
// TODO: generate the first deadline in the executor (or node) itself
|
||||
if (config.executor_type == "edf") {
|
||||
for (uint32_t chain_index = 0; chain_index < config.chain_lengths.size();
|
||||
chain_index++) {
|
||||
size_t executor_idx;
|
||||
if (config.node_executor_assignments.size() == 0) {
|
||||
executor_idx = 0;
|
||||
} else {
|
||||
executor_idx = config.node_executor_assignments[chain_index];
|
||||
}
|
||||
std::shared_ptr<std::deque<uint64_t>> deadlines =
|
||||
executors[executor_idx].strat->get_chain_deadlines(chain_index);
|
||||
deadlines->push_back(millis + config.chain_periods[chain_index] * 2);
|
||||
std::stringstream oss;
|
||||
oss << "{\"operation\": \"next_deadline\", \"chain_id\": " << chain_index
|
||||
<< ", \"deadline\": "
|
||||
<< millis + config.chain_periods[chain_index] * 2 << "}";
|
||||
// std::cout<<"initial deadline: "<<millis +
|
||||
// config.chain_periods[chain_index] * 2<<std::endl; std::cout<<"current
|
||||
// time: "<<millis<<std::endl; log_entry(executor.strat->logger_,
|
||||
// oss.str());
|
||||
log_entry(_logger, oss.str());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void Experiment::resetTimers() {
|
||||
for (auto timer : chain_timer_handles) {
|
||||
std::cout << "resetting timer " << timer.first << std::endl;
|
||||
if (timer.second == nullptr) {
|
||||
std::cout << "timer is null" << std::endl;
|
||||
} else {
|
||||
timer.second->reset();
|
||||
/* int64_t old_period;
|
||||
// set the period to 0 for force immediate execution
|
||||
rcl_timer_exchange_period(timer.second->get_timer_handle().get(), 0,
|
||||
&old_period);
|
||||
// timer.second->get_timer_handle() */
|
||||
}
|
||||
}
|
||||
// sleep 1 seconds to let every timer reset
|
||||
// std::this_thread::sleep_for(std::chrono::seconds(1));
|
||||
}
|
||||
|
||||
void Experiment::writeLogsToFile(std::string file_name,
|
||||
std::string experiment_name) {
|
||||
std::ofstream output_file;
|
||||
std::string name = experiment_name;
|
||||
// make sure results directory exists. we don't have access to std::filesystem
|
||||
// here :(
|
||||
std::string command = "mkdir -p results/" + name;
|
||||
int result = system(command.c_str());
|
||||
if (result != 0) {
|
||||
std::cout << "could not create results directory: " << result << ": "
|
||||
<< strerror(errno) << std::endl;
|
||||
}
|
||||
output_file.open("results/" + name + "/" + file_name + "_" +
|
||||
this->config.executor_type + ".json");
|
||||
output_file << buildLogs();
|
||||
output_file.close();
|
||||
std::cout << "results written to results/" + name + "/" + file_name + "_" +
|
||||
this->config.executor_type + ".json"
|
||||
<< std::endl;
|
||||
}
|
||||
|
||||
int do_other_task(double work_time, uint period,
|
||||
std::atomic<bool> &should_do_task) {
|
||||
if (work_time == 0) {
|
||||
std::cout << "other task: work time is 0, exiting" << std::endl;
|
||||
return 0;
|
||||
}
|
||||
// pin to CPU 0 with highest priority
|
||||
cpu_set_t cpuset;
|
||||
CPU_ZERO(&cpuset);
|
||||
CPU_SET(0, &cpuset);
|
||||
int result = sched_setaffinity(0, sizeof(cpu_set_t), &cpuset);
|
||||
if (result != 0) {
|
||||
std::cout << "other task: could not set affinity: " << result << ": "
|
||||
<< strerror(errno) << std::endl;
|
||||
}
|
||||
|
||||
struct sched_param param;
|
||||
param.sched_priority = sched_get_priority_max(SCHED_FIFO) - 1;
|
||||
sched_setscheduler(0, SCHED_FIFO, ¶m);
|
||||
if (result != 0) {
|
||||
std::cout << "other task: could not set scheduler: " << result << ": "
|
||||
<< strerror(errno) << std::endl;
|
||||
}
|
||||
|
||||
prctl(PR_SET_NAME, "other_task", 0, 0, 0);
|
||||
|
||||
// do every period milliseconds
|
||||
do {
|
||||
timespec current_time;
|
||||
clock_gettime(CLOCK_MONOTONIC_RAW, ¤t_time);
|
||||
uint64_t millis_start = (current_time.tv_sec * (uint64_t)1000) +
|
||||
(current_time.tv_nsec / 1000000);
|
||||
|
||||
// do work
|
||||
// nth_prime_silly(work_time);
|
||||
dummy_load(work_time);
|
||||
|
||||
// sleep until next period
|
||||
clock_gettime(CLOCK_MONOTONIC_RAW, ¤t_time);
|
||||
uint64_t millis_end = (current_time.tv_sec * (uint64_t)1000) +
|
||||
(current_time.tv_nsec / 1000000);
|
||||
uint64_t sleep_time = period - (millis_end - millis_start);
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(sleep_time));
|
||||
} while (should_do_task);
|
||||
std::cout << "other task done" << std::endl;
|
||||
return 0;
|
||||
}
|
67
src/casestudy_tools/src/primes_workload.cpp
Executable file
67
src/casestudy_tools/src/primes_workload.cpp
Executable file
|
@ -0,0 +1,67 @@
|
|||
#include "casestudy_tools/primes_workload.hpp"
|
||||
#include <iostream>
|
||||
ktimeunit nth_prime_silly(double millis)
|
||||
{
|
||||
// struct tms this_thread_times;
|
||||
struct timespec currTime;
|
||||
int sum = 0;
|
||||
int i;
|
||||
int j;
|
||||
ktimeunit const start_cpu_time = get_thread_time(&currTime);
|
||||
ktimeunit last_iter_time = 0;
|
||||
ktimeunit last_iter_start_time = start_cpu_time;
|
||||
for (i = 2; i < 4294967296 - 1; i++)
|
||||
{
|
||||
// times(&this_thread_times);
|
||||
ktimeunit cum_time = get_thread_time(&currTime);
|
||||
last_iter_time = cum_time - last_iter_start_time;
|
||||
last_iter_start_time = cum_time;
|
||||
if ((cum_time - start_cpu_time + last_iter_time*2) > millis)
|
||||
{
|
||||
break;
|
||||
}
|
||||
for (j = 2; j < i; j++)
|
||||
{
|
||||
sum += j;
|
||||
}
|
||||
if (cum_time - start_cpu_time > millis)
|
||||
{
|
||||
std::cout << "Warning: Time limit exceeded" << std::endl;
|
||||
}
|
||||
}
|
||||
// std::cout<< "took " << (get_thread_time(&currTime) - start_cpu_time) << "ms, allowed " << millis << "ms" << std::endl;
|
||||
return get_thread_time(&currTime) - start_cpu_time;
|
||||
}
|
||||
ktimeunit get_thread_time(struct timespec *currTime)
|
||||
{
|
||||
// clockid_t threadClockId;
|
||||
// pthread_getcpuclockid(pthread_self(), &threadClockId);
|
||||
// clock_gettime(threadClockId, currTime);
|
||||
// return currTime->tv_nsec / 1000000.0 + currTime->tv_sec * 1000.0;
|
||||
|
||||
// use the wall clock instead
|
||||
clock_gettime(CLOCK_MONOTONIC_RAW, currTime);
|
||||
return currTime->tv_nsec / 1000000.0 + currTime->tv_sec * 1000.0;
|
||||
}
|
||||
|
||||
#define DUMMY_LOAD_ITER 100000
|
||||
|
||||
void dummy_load(int load_ms) {
|
||||
int i, j;
|
||||
int dummy_load_calib = DummyLoadCalibration::getCalibration();
|
||||
// dummy_load_calib is for 100ms
|
||||
dummy_load_calib = dummy_load_calib * load_ms / 100;
|
||||
for (j = 0; j < dummy_load_calib; j++)
|
||||
for (i = 0 ; i < DUMMY_LOAD_ITER; i++)
|
||||
__asm__ volatile ("nop");
|
||||
}
|
||||
|
||||
void DummyLoadCalibration::setCalibration(int calib) {
|
||||
DummyLoadCalibration::calibration = calib;
|
||||
}
|
||||
|
||||
int DummyLoadCalibration::getCalibration() {
|
||||
return DummyLoadCalibration::calibration;
|
||||
}
|
||||
|
||||
int DummyLoadCalibration::calibration = 1;
|
174
src/casestudy_tools/src/test_nodes.cpp
Executable file
174
src/casestudy_tools/src/test_nodes.cpp
Executable file
|
@ -0,0 +1,174 @@
|
|||
#include "casestudy_tools/test_nodes.hpp"
|
||||
#include "casestudy_tools/primes_workload.hpp"
|
||||
#include "rclcpp/rclcpp.hpp"
|
||||
#include "simple_timer/rt-sched.hpp"
|
||||
#include <cstddef>
|
||||
#include <ctime>
|
||||
#include <rclcpp/logger.hpp>
|
||||
|
||||
std::string CaseStudyNode::get_name() { return name; }
|
||||
|
||||
#define BUFFER_LENGTH 0
|
||||
|
||||
rclcpp::QoS get_qos()
|
||||
{
|
||||
if (BUFFER_LENGTH == 0)
|
||||
{
|
||||
return rclcpp::QoS(rclcpp::KeepAll());
|
||||
}
|
||||
else
|
||||
{
|
||||
return rclcpp::QoS(rclcpp::KeepLast(BUFFER_LENGTH));
|
||||
}
|
||||
}
|
||||
|
||||
void timespec_diff(timespec *start, timespec *end, timespec *result)
|
||||
{
|
||||
if ((end->tv_nsec - start->tv_nsec) < 0)
|
||||
{
|
||||
result->tv_sec = end->tv_sec - start->tv_sec - 1;
|
||||
result->tv_nsec = 1000000000 + end->tv_nsec - start->tv_nsec;
|
||||
}
|
||||
else
|
||||
{
|
||||
result->tv_sec = end->tv_sec - start->tv_sec;
|
||||
result->tv_nsec = end->tv_nsec - start->tv_nsec;
|
||||
}
|
||||
}
|
||||
|
||||
PublisherNode::PublisherNode(std::string publish_topic, double runtime,
|
||||
int period, int chain,
|
||||
rclcpp::Node::SharedPtr node,
|
||||
rclcpp::CallbackGroup::SharedPtr callback_group)
|
||||
: CaseStudyNode(publish_topic, node), runtime_(runtime), period_(period),
|
||||
chain_(chain)
|
||||
{
|
||||
logger = create_logger();
|
||||
pub_ = node->create_publisher<std_msgs::msg::Int32>(publish_topic, get_qos());
|
||||
auto timer_callback = [this]() -> void
|
||||
{
|
||||
std::ostringstream ss;
|
||||
ss << "{\"operation\": \"start_work\", \"chain\": " << chain_
|
||||
<< ", \"node\": \"" << get_name() << "\", \"count\": " << count_max_;
|
||||
std::chrono::nanoseconds time_until_trigger = timer_->time_until_trigger();
|
||||
std::chrono::microseconds time_until_trigger_us =
|
||||
std::chrono::duration_cast<std::chrono::microseconds>(
|
||||
time_until_trigger);
|
||||
ss << ", \"next_release_us\": " << time_until_trigger_us.count() << "}";
|
||||
log_entry(logger, ss.str());
|
||||
// nth_prime_silly(runtime_);
|
||||
double used_runtime = runtime_;
|
||||
if (use_random_runtime)
|
||||
{
|
||||
if (rand() % 100 < runtime_over_chance * 100)
|
||||
{
|
||||
used_runtime *= over_runtime_scale;
|
||||
}
|
||||
else
|
||||
{
|
||||
used_runtime *= normal_runtime_scale;
|
||||
}
|
||||
}
|
||||
// RCLCPP_INFO(rclcpp::get_logger(get_name()), "running %s with runtime %f",
|
||||
// get_name().c_str(), used_runtime);
|
||||
struct timespec start_time;
|
||||
clock_gettime(CLOCK_MONOTONIC, &start_time);
|
||||
|
||||
dummy_load(used_runtime);
|
||||
std_msgs::msg::Int32 msg;
|
||||
msg.data = count_max_;
|
||||
// RCLCPP_INFO(this->get_logger(), "Publishing: '%s'", msg.data.c_str());
|
||||
pub_->publish(msg);
|
||||
// TEST: can we take more than one msg from DDS?
|
||||
// pub_->publish(msg);
|
||||
ss.str("");
|
||||
ss << "{\"operation\": \"end_work\", \"chain\": " << chain_
|
||||
<< ", \"node\": \"" << get_name() << "\", \"count\": " << count_max_ << ", \"target_runtime\": " << runtime_
|
||||
<< "}";
|
||||
log_entry(logger, ss.str());
|
||||
count_max_--;
|
||||
if (count_max_ == 0)
|
||||
{
|
||||
rclcpp::shutdown();
|
||||
}
|
||||
/* struct timespec end_time;
|
||||
clock_gettime(CLOCK_MONOTONIC, &end_time);
|
||||
struct timespec diff;
|
||||
timespec_diff(&start_time, &end_time, &diff);
|
||||
size_t diff_ns = diff.tv_sec * 1000000000 + diff.tv_nsec;
|
||||
size_t diff_ms = diff_ns / 1000000;
|
||||
RCLCPP_INFO(rclcpp::get_logger(get_name()), "done running %s, took %lu ms",
|
||||
get_name().c_str(), diff_ms); */
|
||||
};
|
||||
if (callback_group != nullptr)
|
||||
{
|
||||
callback_group_ = callback_group;
|
||||
}
|
||||
else
|
||||
{
|
||||
callback_group_ =
|
||||
node->create_callback_group(rclcpp::CallbackGroupType::Reentrant);
|
||||
}
|
||||
timer_ = node->create_wall_timer(std::chrono::milliseconds(period),
|
||||
timer_callback, callback_group_);
|
||||
}
|
||||
|
||||
WorkerNode::WorkerNode(std::string subscribe_topic, std::string publish_topic,
|
||||
double runtime, int period, int chain,
|
||||
rclcpp::Node::SharedPtr node,
|
||||
rclcpp::CallbackGroup::SharedPtr callback_group)
|
||||
: CaseStudyNode(publish_topic, node), runtime_(runtime), period_(period),
|
||||
chain_(chain)
|
||||
{
|
||||
logger = create_logger();
|
||||
auto callback = [this](const std_msgs::msg::Int32::SharedPtr msg) -> void
|
||||
{
|
||||
// prevent unused variable warning
|
||||
(void)msg;
|
||||
|
||||
std::ostringstream ss;
|
||||
ss << "{\"operation\": \"start_work\", \"chain\": " << chain_
|
||||
<< ", \"node\": \"" << get_name() << "\", \"count\": " << msg->data
|
||||
<< "}";
|
||||
log_entry(logger, ss.str());
|
||||
// RCLCPP_INFO(this->get_logger(), "I heard: '%s'", msg->data.c_str());
|
||||
// nth_prime_silly(runtime_);
|
||||
double used_runtime = runtime_;
|
||||
if (use_random_runtime)
|
||||
{
|
||||
if (rand() % 100 < runtime_over_chance * 100)
|
||||
{
|
||||
used_runtime *= over_runtime_scale;
|
||||
}
|
||||
else
|
||||
{
|
||||
used_runtime *= normal_runtime_scale;
|
||||
}
|
||||
}
|
||||
dummy_load(used_runtime);
|
||||
// RCLCPP_INFO(this->get_logger(), "Result: %d", result);
|
||||
auto new_msg = std_msgs::msg::Int32();
|
||||
new_msg.data = msg->data;
|
||||
pub_->publish(new_msg);
|
||||
ss.str("");
|
||||
ss << "{\"operation\": \"end_work\", \"chain\": " << chain_
|
||||
<< ", \"node\": \"" << get_name() << "\", \"count\": " << msg->data
|
||||
<< "}";
|
||||
log_entry(logger, ss.str());
|
||||
};
|
||||
rclcpp::SubscriptionOptions sub_options;
|
||||
if (callback_group)
|
||||
{
|
||||
callback_group_ = callback_group;
|
||||
}
|
||||
else
|
||||
{
|
||||
callback_group_ =
|
||||
node->create_callback_group(rclcpp::CallbackGroupType::Reentrant);
|
||||
}
|
||||
|
||||
sub_options.callback_group = callback_group_;
|
||||
sub_ = node->create_subscription<std_msgs::msg::Int32>(
|
||||
subscribe_topic, get_qos(), callback, sub_options);
|
||||
pub_ = node->create_publisher<std_msgs::msg::Int32>(publish_topic, get_qos());
|
||||
}
|
Loading…
Add table
Add a link
Reference in a new issue