basic refactor

This commit is contained in:
Niklas Halle 2025-04-01 16:04:19 +02:00 committed by Niklas Halle
parent a6df3fcc41
commit abf3496c78
21 changed files with 13752 additions and 13232 deletions

74
.vscode/settings.json vendored Normal file
View file

@ -0,0 +1,74 @@
{
"files.associations": {
"cctype": "cpp",
"clocale": "cpp",
"cmath": "cpp",
"csignal": "cpp",
"cstdarg": "cpp",
"cstddef": "cpp",
"cstdio": "cpp",
"cstdlib": "cpp",
"cstring": "cpp",
"ctime": "cpp",
"cwchar": "cpp",
"cwctype": "cpp",
"array": "cpp",
"atomic": "cpp",
"strstream": "cpp",
"bit": "cpp",
"*.tcc": "cpp",
"bitset": "cpp",
"chrono": "cpp",
"compare": "cpp",
"complex": "cpp",
"concepts": "cpp",
"condition_variable": "cpp",
"cstdint": "cpp",
"deque": "cpp",
"list": "cpp",
"map": "cpp",
"set": "cpp",
"unordered_map": "cpp",
"unordered_set": "cpp",
"vector": "cpp",
"exception": "cpp",
"algorithm": "cpp",
"functional": "cpp",
"iterator": "cpp",
"memory": "cpp",
"memory_resource": "cpp",
"numeric": "cpp",
"optional": "cpp",
"random": "cpp",
"ratio": "cpp",
"string": "cpp",
"string_view": "cpp",
"system_error": "cpp",
"tuple": "cpp",
"type_traits": "cpp",
"utility": "cpp",
"fstream": "cpp",
"future": "cpp",
"initializer_list": "cpp",
"iomanip": "cpp",
"iosfwd": "cpp",
"iostream": "cpp",
"istream": "cpp",
"limits": "cpp",
"mutex": "cpp",
"new": "cpp",
"ostream": "cpp",
"ranges": "cpp",
"shared_mutex": "cpp",
"sstream": "cpp",
"stdexcept": "cpp",
"stop_token": "cpp",
"streambuf": "cpp",
"thread": "cpp",
"cfenv": "cpp",
"cinttypes": "cpp",
"typeindex": "cpp",
"typeinfo": "cpp",
"variant": "cpp"
}
}

125
README.md
View file

@ -1,3 +1,124 @@
Experiment files for the RTIS ros_rt executor. Clone with submodules on, or add the executor to the src folder.
# Dynamic Priority Scheduling for ROS-EDF Executor
Run `ros2 run casestudy casestudy_example` then run the `analysis/analysis.ipynb` notebook.
This repository contains experiments for the ROS-EDF executor with priority-based and deadline-based scheduling capabilities.
## Usage
Clone with submodules enabled:
```
git clone --recurse-submodules https://github.com/user/ROS-Dynamic-Executor-Experiments.git
```
Make sure you have sourced your ROS 2 environment.
Run the example:
```
ros2 run casestudy casestudy_example
```
Then run the analysis notebook:
```
jupyter notebook analysis/analysis.ipynb
```
---
## Dynamically Adjusting Scheduler Approaches
The PriorityMemoryStrategy supports different scheduling policies for ROS callbacks. To extend it with dynamically adjusting scheduling that reacts to semantic events, there are several viable approaches:
### Approach A: Custom PriorityExecutableComparator
Replace the existing comparator with a custom implementation:
**Pros:**
- Direct control over scheduling logic
- Clean separation of concerns
- Can implement complex scheduling policies
- Doesn't require modifying deadline values
**Cons:**
- Requires understanding and maintaining the comparison logic
- May need to add new fields to PriorityExecutable to track dynamic priorities
- Could become complex if multiple factors affect priority
### Approach B: Dynamic Deadline Adjustment
Keep the EDF comparator but adjust deadlines based on priority logic:
**Pros:**
- Works within the existing EDF framework
- Conceptually simple - just manipulate deadline values
- Doesn't require changing the core sorting mechanism
- Easier to debug (you can log deadline changes)
**Cons:**
- Potentially confusing semantics (using deadlines to represent non-deadline priorities)
- May interfere with actual deadline-based requirements
- Could lead to instability if not carefully managed
### Approach C: Event-Driven Priority Field
Add a dynamic boost factor field to PriorityExecutable:
```cpp
int dynamic_priority_boost = 0;
```
Then modify the comparator to consider this value:
```cpp
if (p1->sched_type == DEADLINE) {
// First compare deadline as usual
// Then use dynamic_priority_boost as a tiebreaker
if (p1_deadline == p2_deadline) {
return p1->dynamic_priority_boost > p2->dynamic_priority_boost;
}
return p1_deadline < p2_deadline;
}
```
**Pros:**
- Clearer semantics than manipulating deadlines
- Keeps deadline scheduling intact for real-time guarantees
- Easy to adjust at runtime
**Cons:**
- Requires modifying both PriorityExecutable and comparison logic
### Approach D: Priority Multiplier System
Implement a system where chains can have priority multipliers applied:
```cpp
float priority_multiplier = 1.0f;
```
And then in the comparator:
```cpp
// For priority-based scheduling
int effective_p1 = p1->priority * p1->priority_multiplier;
int effective_p2 = p2->priority * p2->priority_multiplier;
return effective_p1 < effective_p2;
```
**Pros:**
- Scales existing priorities rather than replacing them
- Preserves relative importance within chains
- Intuitive model for temporary priority boosts
**Cons:**
- May need to handle overflow/boundary cases
- Requires careful tuning of multiplier ranges
## Recommendation
Approach C (Event-Driven Priority Field) offers the best balance of:
1. Clean semantics
2. Minimal interference with existing scheduling logic
3. Clear separation between baseline priorities and dynamic adjustments
4. Straightforward implementation
This approach maintains real-time guarantees while enabling dynamic behaviors based on semantic events.

File diff suppressed because one or more lines are too long

File diff suppressed because it is too large Load diff

View file

@ -1,95 +1,74 @@
cmake_minimum_required(VERSION 3.5)
project(casestudy)
cmake_minimum_required(VERSION 3.8)
project(casestudy VERSION 0.1.0)
# Default to C99
if(NOT CMAKE_C_STANDARD)
set(CMAKE_C_STANDARD 99)
endif()
# Default to C++14
if(NOT CMAKE_CXX_STANDARD)
set(CMAKE_CXX_STANDARD 14)
endif()
# Set C++ standards
set(CMAKE_CXX_STANDARD 17)
set(CMAKE_CXX_STANDARD_REQUIRED ON)
set(CMAKE_CXX_EXTENSIONS OFF)
# Compiler options
if(CMAKE_COMPILER_IS_GNUCXX OR CMAKE_CXX_COMPILER_ID MATCHES "Clang")
add_compile_options(-Wall -Wextra -Wpedantic)
endif()
# find dependencies
# Find dependencies
find_package(ament_cmake REQUIRED)
# uncomment the following section in order to fill in
# further dependencies manually.
# find_package(<dependency> REQUIRED)
find_package(rclcpp REQUIRED)
find_package(priority_executor REQUIRED)
find_package(simple_timer REQUIRED)
find_package(rclcpp REQUIRED)
find_package(jsoncpp REQUIRED)
find_package(casestudy_tools REQUIRED)
# add_library(test_nodes src/test_nodes.cpp)
# target_include_directories(test_nodes PUBLIC
# $<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}/include>
# $<INSTALL_INTERFACE:include>
# )
# ament_target_dependencies(test_nodes
# rclcpp
# simple_timer
# casestudy_tools
# )
list(APPEND experiments
# Define experiment targets
set(EXPERIMENTS
casestudy_2023customfile_singlethread
casestudy_2024ours_latency
casestudy_2024ours_executor2executor
casestudy_example
)
# Function to create experiment targets
function(new_experiment experiment_name)
add_executable(${experiment_name} src/${experiment_name}.cpp)
target_include_directories(${experiment_name} PUBLIC
$<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}/include>
$<INSTALL_INTERFACE:include>
)
# Add dependencies with ament
ament_target_dependencies(${experiment_name}
rclcpp
priority_executor
simple_timer
casestudy_tools
rclcpp
priority_executor
)
endfunction()
foreach(experiment ${experiments})
# Create all experiment targets
foreach(experiment ${EXPERIMENTS})
new_experiment(${experiment})
endforeach()
target_link_libraries(casestudy_2023customfile_singlethread
jsoncpp
# Special handling for targets with additional dependencies
target_link_libraries(casestudy_2023customfile_singlethread jsoncpp)
# Installation
install(
TARGETS ${EXPERIMENTS}
RUNTIME DESTINATION lib/${PROJECT_NAME}
)
install(TARGETS
${experiments}
DESTINATION lib/${PROJECT_NAME}
)
install(
DIRECTORY include/
DESTINATION include
)
# Testing
if(BUILD_TESTING)
find_package(ament_lint_auto REQUIRED)
# the following line skips the linter which checks for copyrights
# uncomment the line when a copyright and license is not present in all source files
#set(ament_cmake_copyright_FOUND TRUE)
# the following line skips cpplint (only works in a git repo)
# uncomment the line when this package is not in a git repo
#set(ament_cmake_cpplint_FOUND TRUE)
ament_lint_auto_find_test_dependencies()
endif()
# Export and package configuration
ament_export_include_directories(include)
ament_package()

3
src/casestudy/include/.gitignore vendored Normal file
View file

@ -0,0 +1,3 @@
# We don't have content in this dir yet, but want it in git anyways
# So we add a non-empty .gitignore file, explicitly not ignoring itself
!.gitignore

View file

@ -2,19 +2,25 @@
<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
<package format="3">
<name>casestudy</name>
<version>0.0.0</version>
<description>TODO: Package description</description>
<maintainer email="vboxuser@todo.todo">vboxuser</maintainer>
<license>TODO: License declaration</license>
<version>0.1.0</version>
<description>
ROS 2 case study package for evaluating different executor implementations and timing behaviors
</description>
<maintainer email="kurt4wilson@gmail.com">kurt</maintainer>
<license>Apache License 2.0</license>
<buildtool_depend>ament_cmake</buildtool_depend>
<test_depend>ament_lint_auto</test_depend>
<test_depend>ament_lint_common</test_depend>
<!-- Build Dependencies -->
<depend>rclcpp</depend>
<depend>priority_executor</depend>
<depend>simple_timer</depend>
<depend>casestudy_tools</depend>
<depend>jsoncpp</depend>
<!-- Test Dependencies -->
<test_depend>ament_lint_auto</test_depend>
<test_depend>ament_lint_common</test_depend>
<export>
<build_type>ament_cmake</build_type>

View file

@ -2,193 +2,163 @@
#include "priority_executor/priority_executor.hpp"
#include "priority_executor/priority_memory_strategy.hpp"
#include "casestudy_tools/test_nodes.hpp"
#include <string>
#include <fstream>
#include <unistd.h>
#include "std_msgs/msg/string.hpp"
#include "casestudy_tools/primes_workload.hpp"
#include <sys/prctl.h>
#include "casestudy_tools/primes_workload.hpp"
#include "casestudy_tools/experiment.hpp"
#include "jsoncpp/json/reader.h"
#include <string>
#include <fstream>
#include <unistd.h>
#include <sys/prctl.h>
// bool should_do_task = true;
std::atomic<bool> should_do_task(true);
void run_one_executor(std::function<void(std::atomic<bool> &)> exec_fun, int idx)
{
(void)idx;
// set this process to highest priority
struct sched_param param;
param.sched_priority = sched_get_priority_max(SCHED_FIFO) - 1;
int result = sched_setscheduler(0, SCHED_FIFO, &param);
if (result != 0)
{
std::cout << "ros_experiment: sched_setscheduler failed: " << result << ": " << strerror(errno) << std::endl;
}
void run_one_executor(
std::function<void(std::atomic<bool>&)> const& exec_fun,
int const idx) {
(void)idx;
// set process name
prctl(PR_SET_NAME, "ros_experiment", 0, 0, 0);
exec_fun(should_do_task);
}
int ros_experiment(int argc, char **arg, std::string file_name, ExperimentConfig config)
{
rclcpp::init(argc, arg);
// we have to create a node to process any passed parameters
auto node = rclcpp::Node::make_shared("experiment_node");
node->declare_parameter("executor_type", "edf");
std::string executor_type;
node->get_parameter("executor_type", executor_type);
std::cout << "using executor type: " << executor_type << std::endl;
// TODO: move experiment configuration to main
config.executor_type = executor_type;
config.nodes.push_back(node);
Experiment experiment(config);
// std::string result_log = experiment.run(should_do_task);
std::vector<std::function<void(std::atomic<bool> &)>> exec_funs = experiment.getRunFunctions();
std::cout << "got " << exec_funs.size() << " executors" << std::endl;
std::vector<std::thread> exec_threads;
int i = 0;
experiment.resetTimers();
for (auto exec_fun : exec_funs)
{
exec_threads.push_back(std::thread(run_one_executor, exec_fun, i));
i++;
}
for (auto &t : exec_threads)
{
t.join();
}
experiment.writeLogsToFile(file_name, "2023customfile_singlethread");
return 0;
}
int main(int argc, char *argv[])
{
std::ifstream input_file("taskset.json"); // Replace with actual filename
if (!input_file.is_open())
{
std::cerr << "Error opening file!" << std::endl;
return 1;
}
Json::Value root;
Json::Reader reader;
bool parsingSuccessful = reader.parse(input_file, root);
if (!parsingSuccessful)
{
std::cerr << "Error parsing file!" << std::endl;
return 1;
}
// set ourselves to the second highest priority
struct sched_param param;
param.sched_priority = sched_get_priority_max(SCHED_FIFO) - 2;
int result = sched_setscheduler(0, SCHED_FIFO, &param);
if (result != 0)
{
std::cout << "ros_experiment: sched_setscheduler failed: " << result << ": " << strerror(errno) << std::endl;
}
// calibrate the dummy load for the current system
calibrate_dummy_load();
ExperimentConfig config;
// config.chain_lengths = {4, 3, 4};
for (uint i = 0; i < root["chain_lengths"].size(); i++)
{
config.chain_lengths.push_back(root["chain_lengths"][i].asUInt());
}
// config.node_ids = {{0, 1, 2, 3}, {4, 5, 6}, {7, 8, 9, 10}};
for (uint i = 0; i < root["node_ids"].size(); i++)
{
std::vector<uint32_t> node_ids_row;
for (uint j = 0; j < root["node_ids"][i].size(); j++)
{
node_ids_row.push_back(root["node_ids"][i][j].asUInt());
// set this process to highest priority
struct sched_param param;
param.sched_priority = sched_get_priority_max(SCHED_FIFO) - 1;
int const result = sched_setscheduler(0, SCHED_FIFO, &param);
if (result != 0) {
std::cout << "ros_experiment: sched_setscheduler failed: " << result
<< ": " << strerror(errno) << std::endl;
}
config.node_ids.push_back(node_ids_row);
}
// config.node_priorities = {3, 2, 1, 0, 6, 5, 4, 10, 9, 8, 7}; // PICAS-style prioritiesA
for (uint i = 0; i < root["node_priorities"].size(); i++)
{
config.node_priorities.push_back(root["node_priorities"][i].asUInt());
}
// disabled for custom
config.num_groups = 0;
// config.group_memberships = {0, 0, 1, 2, 0, 0, 1, 0, 0, 1, 2};
// config.chain_timer_control = {0, 1, 2};
for (uint i = 0; i < root["chain_timer_control"].size(); i++)
{
config.chain_timer_control.push_back(root["chain_timer_control"][i].asInt());
}
// config.node_runtimes = {5, 5, 5, 5, // chain 1
// 10, 20, 5, // chain 2
// 10, 10, 15, 20}; // chain 3
for (uint i = 0; i < root["node_runtimes"].size(); i++)
{
config.node_runtimes.push_back(root["node_runtimes"][i].asDouble());
}
// config.chain_periods = {50, 100, 110};
for (uint i = 0; i < root["chain_periods"].size(); i++)
{
config.chain_periods.push_back(root["chain_periods"][i].asInt());
}
// config.node_executor_assignments = {}; // empty - multicore mode
for (uint i = 0; i < root["node_executor_assignments"].size(); i++)
{
config.node_executor_assignments.push_back(root["node_executor_assignments"][i].asUInt() - 1);
}
// single threaded mode - assign executors to cpu cores
for (uint i = 0; i < root["executor_to_cpu_core"].size(); i++)
{
config.executor_to_cpu_assignments.push_back(root["executor_to_cpu_core"][i].asUInt() - 1);
}
config.parallel_mode = false;
config.cores = 4;
// sanity_check_config(config);
// from 35/65 to 100/0 in increments of 5
// for (int i = 0; i <= (100 - 35) / 5; i++)
// {
// for now, run with 100% ros
int i = (100 - 35) / 5;
int ros_budget = 35 + i * 5;
int other_task_budget = 65 - i * 5;
std::cout << "ROS budget: " << ros_budget << " Other task budget: " << other_task_budget << std::endl;
// do other task in a separate thread
should_do_task.store(true);
double other_task_time = 80 * other_task_budget / 100.0;
std::thread other_task(do_other_task, other_task_time, 80, std::ref(should_do_task));
// do ros task
std::string file_name = "ros_" + std::to_string(ros_budget);
// ros_experiment(argc, argv, file_name);
std::thread ros_task(ros_experiment, argc, argv, file_name, config);
std::cout << "tasks started" << std::endl;
ros_task.join();
other_task.join();
std::cout << "tasks joined" << std::endl;
// // run once
// break;
// }
// set process name
prctl(PR_SET_NAME, "ros_experiment", 0, 0, 0);
exec_fun(should_do_task);
}
int ros_experiment(
int const argc,
char** const argv,
std::string const& file_name,
ExperimentConfig config) {
rclcpp::init(argc, argv);
// we have to create a node to process any passed parameters
auto const node = rclcpp::Node::make_shared("experiment_node");
node->declare_parameter("executor_type", "edf");
std::string executor_type;
node->get_parameter("executor_type", executor_type);
std::cout << "using executor type: " << executor_type << std::endl;
config.executor_type = executor_type;
config.nodes.push_back(node);
Experiment experiment(config);
auto const exec_funs = experiment.getRunFunctions();
std::cout << "got " << exec_funs.size() << " executors" << std::endl;
std::vector<std::thread> exec_threads;
int i = 0;
experiment.resetTimers();
for (auto const& exec_fun : exec_funs) {
exec_threads.emplace_back(run_one_executor, exec_fun, i++);
}
for (auto& t : exec_threads) {
t.join();
}
experiment.writeLogsToFile(file_name, "2023customfile_singlethread");
return 0;
}
int main(int argc, char* argv[]) {
std::ifstream input_file("taskset.json");
if (!input_file.is_open()) {
std::cerr << "Error opening file!" << std::endl;
return 1;
}
Json::Value root;
Json::Reader reader;
bool const parsingSuccessful = reader.parse(input_file, root);
if (!parsingSuccessful) {
std::cerr << "Error parsing file!" << std::endl;
return 1;
}
// set ourselves to the second highest priority
struct sched_param param;
param.sched_priority = sched_get_priority_max(SCHED_FIFO) - 2;
int const result = sched_setscheduler(0, SCHED_FIFO, &param);
if (result != 0) {
std::cout << "ros_experiment: sched_setscheduler failed: " << result
<< ": " << strerror(errno) << std::endl;
}
// calibrate the dummy load for the current system
calibrate_dummy_load();
ExperimentConfig config;
for (uint i = 0; i < root["chain_lengths"].size(); i++) {
config.chain_lengths.push_back(root["chain_lengths"][i].asUInt());
}
for (uint i = 0; i < root["node_ids"].size(); i++) {
std::vector<uint32_t> node_ids_row;
for (uint j = 0; j < root["node_ids"][i].size(); j++) {
node_ids_row.push_back(root["node_ids"][i][j].asUInt());
}
config.node_ids.push_back(std::move(node_ids_row));
}
for (uint i = 0; i < root["node_priorities"].size(); i++) {
config.node_priorities.push_back(root["node_priorities"][i].asUInt());
}
config.num_groups = 0;
for (uint i = 0; i < root["chain_timer_control"].size(); i++) {
config.chain_timer_control.push_back(root["chain_timer_control"][i].asInt());
}
for (uint i = 0; i < root["node_runtimes"].size(); i++) {
config.node_runtimes.push_back(root["node_runtimes"][i].asDouble());
}
for (uint i = 0; i < root["chain_periods"].size(); i++) {
config.chain_periods.push_back(root["chain_periods"][i].asInt());
}
for (uint i = 0; i < root["node_executor_assignments"].size(); i++) {
config.node_executor_assignments.push_back(root["node_executor_assignments"][i].asUInt() - 1);
}
for (uint i = 0; i < root["executor_to_cpu_core"].size(); i++) {
config.executor_to_cpu_assignments.push_back(root["executor_to_cpu_core"][i].asUInt() - 1);
}
config.parallel_mode = false;
config.cores = 4;
int const i = (100 - 35) / 5;
int const ros_budget = 35 + i * 5;
int const other_task_budget = 65 - i * 5;
std::cout << "ROS budget: " << ros_budget
<< " Other task budget: " << other_task_budget << std::endl;
should_do_task.store(true);
double const other_task_time = 80 * other_task_budget / 100.0;
std::thread other_task(do_other_task, other_task_time, 80, std::ref(should_do_task));
std::string const file_name = "ros_" + std::to_string(ros_budget);
std::thread ros_task(ros_experiment, argc, argv, file_name, config);
std::cout << "tasks started" << std::endl;
ros_task.join();
other_task.join();
std::cout << "tasks joined" << std::endl;
return 0;
}

View file

@ -7,190 +7,191 @@
// bool should_do_task = true;
std::atomic<bool> should_do_task(true);
void run_one_executor(std::function<void(std::atomic<bool> &)> exec_fun,
int idx) {
(void)idx;
// set this process to highest priority
// struct sched_param param;
// param.sched_priority = sched_get_priority_max(SCHED_FIFO) - 1;
// int result = sched_setscheduler(0, SCHED_FIFO, &param);
// if (result != 0)
// {
// std::cout << "ros_experiment: sched_setscheduler failed: " << result <<
// ": " << strerror(errno) << std::endl;
// }
sched_attr rt_attr;
rt_attr.size = sizeof(rt_attr);
rt_attr.sched_flags = 0;
rt_attr.sched_nice = 0;
rt_attr.sched_priority = 99;
rt_attr.sched_policy = SCHED_FIFO;
rt_attr.sched_runtime = 0;
rt_attr.sched_period = 0;
int result = sched_setattr(0, &rt_attr, 0);
if (result != 0) {
std::cout << "executor task: could not set scheduler: " << result << ": "
<< strerror(errno) << std::endl;
}
void run_one_executor(
std::function<void(std::atomic<bool>&)> const& exec_fun,
int const idx) {
(void)idx;
// set this process to highest priority
// struct sched_param param;
// param.sched_priority = sched_get_priority_max(SCHED_FIFO) - 1;
// int result = sched_setscheduler(0, SCHED_FIFO, &param);
// if (result != 0)
// {
// std::cout << "ros_experiment: sched_setscheduler failed: " << result <<
// ": " << strerror(errno) << std::endl;
// }
sched_attr rt_attr;
rt_attr.size = sizeof(rt_attr);
rt_attr.sched_flags = 0;
rt_attr.sched_nice = 0;
rt_attr.sched_priority = 99;
rt_attr.sched_policy = SCHED_FIFO;
rt_attr.sched_runtime = 0;
rt_attr.sched_period = 0;
// set process name
prctl(PR_SET_NAME, "ros_experiment", 0, 0, 0);
int const result = sched_setattr(0, &rt_attr, 0);
if (result != 0) {
std::cout << "executor task: could not set scheduler: " << result << ": "
<< strerror(errno) << std::endl;
}
exec_fun(should_do_task);
// set process name
prctl(PR_SET_NAME, "ros_experiment", 0, 0, 0);
exec_fun(should_do_task);
}
int ros_experiment(rclcpp::Node::SharedPtr node, std::string file_name,
ExperimentConfig config) {
int ros_experiment(
rclcpp::Node::SharedPtr const node,
std::string const& file_name,
ExperimentConfig config) {
std::string executor_type;
node->get_parameter("executor_type", executor_type);
int num_subscribers;
node->get_parameter("num_subscribers", num_subscribers);
int num_chains;
node->get_parameter("num_chains", num_chains);
std::cout << "using executor type: " << executor_type << std::endl;
std::string executor_type;
node->get_parameter("executor_type", executor_type);
int num_subscribers;
node->get_parameter("num_subscribers", num_subscribers);
int num_chains;
node->get_parameter("num_chains", num_chains);
std::cout << "using executor type: " << executor_type << std::endl;
// TODO: move experiment configuration to main
config.executor_type = executor_type;
config.nodes.push_back(node);
// TODO: move experiment configuration to main
config.executor_type = executor_type;
config.nodes.push_back(node);
Experiment experiment(config);
// std::string result_log = experiment.run(should_do_task);
std::vector<std::function<void(std::atomic<bool> &)>> exec_funs =
experiment.getRunFunctions();
std::cout << "got " << exec_funs.size() << " executors" << std::endl;
std::vector<std::thread> exec_threads;
int i = 0;
experiment.resetTimers();
for (auto exec_fun : exec_funs) {
exec_threads.push_back(std::thread(run_one_executor, exec_fun, i));
i++;
}
Experiment experiment(config);
// std::string result_log = experiment.run(should_do_task);
auto const exec_funs = experiment.getRunFunctions();
std::cout << "got " << exec_funs.size() << " executors" << std::endl;
std::vector<std::thread> exec_threads;
int i = 0;
experiment.resetTimers();
for (auto &t : exec_threads) {
t.join();
}
for (auto const& exec_fun : exec_funs) {
exec_threads.emplace_back(run_one_executor, exec_fun, i++);
}
std::string outputname = "2024ours_executor2executor";
outputname +=
"_" + std::to_string(num_subscribers) + "_" + std::to_string(num_chains);
experiment.writeLogsToFile(file_name, outputname);
for (auto& t : exec_threads) {
t.join();
}
return 0;
std::string outputname = "2024ours_executor2executor_" +
std::to_string(num_subscribers) + "_" +
std::to_string(num_chains);
experiment.writeLogsToFile(file_name, outputname);
return 0;
}
int main(int argc, char *argv[]) {
rclcpp::init(argc, argv);
// we have to create a node to process any passed parameters
auto node = rclcpp::Node::make_shared("experiment_node");
// auto node = rclcpp::Node::make_shared("experiment_node",
// rclcpp::NodeOptions().use_intra_process_comms(true));
node->declare_parameter("executor_type", "edf");
node->declare_parameter("num_subscribers", 3);
node->declare_parameter("num_chains", 1);
node->declare_parameter("runtime", 0);
int main(int argc, char* argv[]) {
rclcpp::init(argc, argv);
// we have to create a node to process any passed parameters
auto const node = rclcpp::Node::make_shared("experiment_node");
// auto node = rclcpp::Node::make_shared("experiment_node",
// rclcpp::NodeOptions().use_intra_process_comms(true));
node->declare_parameter("executor_type", "edf");
node->declare_parameter("num_subscribers", 3);
node->declare_parameter("num_chains", 1);
node->declare_parameter("runtime", 0);
int num_s;
node->get_parameter("num_subscribers", num_s);
int runtime;
node->get_parameter("runtime", runtime);
int num_chains;
node->get_parameter("num_chains", num_chains);
int num_s;
node->get_parameter("num_subscribers", num_s);
int runtime;
node->get_parameter("runtime", runtime);
int num_chains;
node->get_parameter("num_chains", num_chains);
// set ourselves to the second highest priority
// struct sched_param param;
// param.sched_priority = sched_get_priority_max(SCHED_FIFO) - 2;
// int result = sched_setscheduler(0, SCHED_FIFO, &param);
sched_attr rt_attr;
rt_attr.size = sizeof(rt_attr);
rt_attr.sched_policy = SCHED_FIFO;
// rt_attr.sched_priority = sched_get_priority_max(SCHED_FIFO) - 2;
rt_attr.sched_priority = 99;
int result = sched_setattr(0, &rt_attr, 0);
// set ourselves to the second highest priority
// struct sched_param param;
// param.sched_priority = sched_get_priority_max(SCHED_FIFO) - 2;
// int result = sched_setscheduler(0, SCHED_FIFO, &param);
sched_attr rt_attr;
rt_attr.size = sizeof(rt_attr);
rt_attr.sched_policy = SCHED_FIFO;
// rt_attr.sched_priority = sched_get_priority_max(SCHED_FIFO) - 2;
rt_attr.sched_priority = 99;
int const result = sched_setattr(0, &rt_attr, 0);
if (result != 0) {
std::cout << "ros_experiment: sched_setscheduler failed: " << result << ": "
<< strerror(errno) << std::endl;
}
// calibrate the dummy load for the current system
calibrate_dummy_load();
ExperimentConfig config;
// runtimes and timers subject to change
config.chain_lengths = {};
config.node_ids = {};
config.chain_timer_control = {};
config.chain_periods = {};
config.node_runtimes = {};
config.node_executor_assignments = {};
int node_id = 0;
for (int c = 0; c < num_chains; c++) {
config.node_ids.push_back({});
for (int i = 0; i < num_s + 1; i++) {
config.node_ids[c].push_back(node_id);
if (i == 0) {
// first node is the publisher, and goes in the first executor
config.node_executor_assignments.push_back(0);
} else {
// all other nodes go in the second executor
config.node_executor_assignments.push_back(1);
}
node_id++;
if (result != 0) {
std::cout << "ros_experiment: sched_setscheduler failed: " << result << ": "
<< strerror(errno) << std::endl;
}
for (int i = 0; i < num_s + 1; i++) {
config.node_priorities.push_back(i);
// calibrate the dummy load for the current system
calibrate_dummy_load();
ExperimentConfig config;
// runtimes and timers subject to change
config.chain_lengths = {};
config.node_ids = {};
config.chain_timer_control = {};
config.chain_periods = {};
config.node_runtimes = {};
config.node_executor_assignments = {};
int node_id = 0;
for (int c = 0; c < num_chains; c++) {
config.node_ids.push_back({});
for (int i = 0; i < num_s + 1; i++) {
config.node_ids[c].push_back(node_id);
if (i == 0) {
// first node is the publisher, and goes in the first executor
config.node_executor_assignments.push_back(0);
} else {
// all other nodes go in the second executor
config.node_executor_assignments.push_back(1);
}
node_id++;
}
for (int i = 0; i < num_s + 1; i++) {
config.node_priorities.push_back(i);
config.node_runtimes.push_back(runtime);
}
config.chain_lengths.push_back(num_s+1);
config.chain_periods.push_back(10);
config.chain_timer_control.push_back(c);
}
config.chain_lengths.push_back(num_s+1);
config.chain_periods.push_back(10);
for (int i = 0; i < num_s + 1; i++) {
config.node_runtimes.push_back(runtime);
config.executor_to_cpu_assignments = {0, 1};
std::cout << "node ids: " << std::endl;
for (size_t i = 0; i < config.node_ids.size(); i++) {
for (size_t j = 0; j < config.node_ids[i].size(); j++) {
std::cout << config.node_ids[i][j] << " ";
}
std::cout << std::endl;
}
config.chain_timer_control.push_back(c);
}
config.executor_to_cpu_assignments = {0, 1};
std::cout << "node ids: " << std::endl;
for (size_t i = 0; i < config.node_ids.size(); i++) {
for (size_t j = 0; j < config.node_ids[i].size(); j++) {
std::cout << config.node_ids[i][j] << " ";
}
std::cout << std::endl;
}
config.num_groups = 0;
config.group_memberships = {}; // no groups
config.num_groups = 0;
config.group_memberships = {}; // no groups
config.parallel_mode = false;
config.cores = 1;
config.special_cases.push_back("2024ours_latency");
config.parallel_mode = false;
config.cores = 1;
config.special_cases.push_back("2024ours_latency");
// sanity_check_config(config);
// sanity_check_config(config);
// from 35/65 to 100/0 in increments of 5
// for (int i = 0; i <= (100 - 35) / 5; i++)
// {
// for now, run with 100% ros
int i = (100 - 35) / 5;
int ros_budget = 35 + i * 5;
int other_task_budget = 65 - i * 5;
std::cout << "ROS budget: " << ros_budget
<< " Other task budget: " << other_task_budget << std::endl;
// do other task in a separate thread
should_do_task.store(true);
// from 35/65 to 100/0 in increments of 5
// for (int i = 0; i <= (100 - 35) / 5; i++)
// {
// for now, run with 100% ros
int const i = (100 - 35) / 5;
int const ros_budget = 35 + i * 5;
int const other_task_budget = 65 - i * 5;
std::cout << "ROS budget: " << ros_budget
<< " Other task budget: " << other_task_budget << std::endl;
// do other task in a separate thread
should_do_task.store(true);
// do ros task
std::string file_name = "ros_" + std::to_string(ros_budget);
// ros_experiment(argc, argv, file_name);
std::thread ros_task(ros_experiment, node, file_name, config);
std::cout << "tasks started" << std::endl;
ros_task.join();
std::cout << "tasks joined" << std::endl;
// do ros task
std::string const file_name = "ros_" + std::to_string(ros_budget);
// ros_experiment(argc, argv, file_name);
std::thread ros_task(ros_experiment, node, file_name, config);
std::cout << "tasks started" << std::endl;
ros_task.join();
std::cout << "tasks joined" << std::endl;
// // run once
// break;
// }
// // run once
// break;
// }
return 0;
}

View file

@ -7,182 +7,179 @@
// bool should_do_task = true;
std::atomic<bool> should_do_task(true);
void run_one_executor(std::function<void(std::atomic<bool>&)> const& exec_fun,
int const idx) {
(void)idx;
// set this process to highest priority
// struct sched_param param;
// param.sched_priority = sched_get_priority_max(SCHED_FIFO) - 1;
// int result = sched_setscheduler(0, SCHED_FIFO, &param);
// if (result != 0)
// {
// std::cout << "ros_experiment: sched_setscheduler failed: " << result <<
// ": " << strerror(errno) << std::endl;
// }
sched_attr rt_attr;
rt_attr.size = sizeof(rt_attr);
rt_attr.sched_flags = 0;
rt_attr.sched_nice = 0;
rt_attr.sched_priority = 99;
rt_attr.sched_policy = SCHED_FIFO;
rt_attr.sched_runtime = 0;
rt_attr.sched_period = 0;
int const result = sched_setattr(0, &rt_attr, 0);
if (result != 0) {
std::cout << "executor task: could not set scheduler: " << result << ": "
<< strerror(errno) << std::endl;
}
void run_one_executor(std::function<void(std::atomic<bool> &)> exec_fun,
int idx) {
(void)idx;
// set this process to highest priority
// struct sched_param param;
// param.sched_priority = sched_get_priority_max(SCHED_FIFO) - 1;
// int result = sched_setscheduler(0, SCHED_FIFO, &param);
// if (result != 0)
// {
// std::cout << "ros_experiment: sched_setscheduler failed: " << result <<
// ": " << strerror(errno) << std::endl;
// }
sched_attr rt_attr;
rt_attr.size = sizeof(rt_attr);
rt_attr.sched_flags = 0;
rt_attr.sched_nice = 0;
rt_attr.sched_priority = 99;
rt_attr.sched_policy = SCHED_FIFO;
rt_attr.sched_runtime = 0;
rt_attr.sched_period = 0;
int result = sched_setattr(0, &rt_attr, 0);
if (result != 0) {
std::cout << "executor task: could not set scheduler: " << result << ": "
<< strerror(errno) << std::endl;
}
// set process name
prctl(PR_SET_NAME, "ros_experiment", 0, 0, 0);
// set process name
prctl(PR_SET_NAME, "ros_experiment", 0, 0, 0);
exec_fun(should_do_task);
exec_fun(should_do_task);
}
int ros_experiment(rclcpp::Node::SharedPtr node, std::string file_name,
ExperimentConfig config) {
int ros_experiment(rclcpp::Node::SharedPtr const& node, std::string const& file_name,
ExperimentConfig config) {
std::string executor_type;
node->get_parameter("executor_type", executor_type);
int num_subscribers;
node->get_parameter("num_subscribers", num_subscribers);
int num_chains;
node->get_parameter("num_chains", num_chains);
std::cout << "using executor type: " << executor_type << std::endl;
std::string executor_type;
node->get_parameter("executor_type", executor_type);
int num_subscribers;
node->get_parameter("num_subscribers", num_subscribers);
int num_chains;
node->get_parameter("num_chains", num_chains);
std::cout << "using executor type: " << executor_type << std::endl;
// TODO: move experiment configuration to main
config.executor_type = executor_type;
config.nodes.push_back(node);
// TODO: move experiment configuration to main
config.executor_type = executor_type;
config.nodes.push_back(node);
Experiment experiment(config);
// std::string result_log = experiment.run(should_do_task);
std::vector<std::function<void(std::atomic<bool>&)>> exec_funs =
experiment.getRunFunctions();
std::cout << "got " << exec_funs.size() << " executors" << std::endl;
std::vector<std::thread> exec_threads;
int i = 0;
experiment.resetTimers();
for (auto const& exec_fun : exec_funs) {
exec_threads.push_back(std::thread(run_one_executor, exec_fun, i));
i++;
}
Experiment experiment(config);
// std::string result_log = experiment.run(should_do_task);
std::vector<std::function<void(std::atomic<bool> &)>> exec_funs =
experiment.getRunFunctions();
std::cout << "got " << exec_funs.size() << " executors" << std::endl;
std::vector<std::thread> exec_threads;
int i = 0;
experiment.resetTimers();
for (auto exec_fun : exec_funs) {
exec_threads.push_back(std::thread(run_one_executor, exec_fun, i));
i++;
}
for (auto& t : exec_threads) {
t.join();
}
for (auto &t : exec_threads) {
t.join();
}
std::string const outputname = "2024ours_latency_" +
std::to_string(num_subscribers) + "_" + std::to_string(num_chains);
experiment.writeLogsToFile(file_name, outputname);
std::string outputname = "2024ours_latency";
outputname +=
"_" + std::to_string(num_subscribers) + "_" + std::to_string(num_chains);
experiment.writeLogsToFile(file_name, outputname);
return 0;
return 0;
}
int main(int argc, char *argv[]) {
rclcpp::init(argc, argv);
// we have to create a node to process any passed parameters
auto node = rclcpp::Node::make_shared("experiment_node");
// auto node = rclcpp::Node::make_shared("experiment_node",
// rclcpp::NodeOptions().use_intra_process_comms(true));
node->declare_parameter("executor_type", "edf");
node->declare_parameter("num_subscribers", 3);
node->declare_parameter("num_chains", 1);
node->declare_parameter("runtime", 0);
int main(int argc, char* argv[]) {
rclcpp::init(argc, argv);
// we have to create a node to process any passed parameters
auto const node = rclcpp::Node::make_shared("experiment_node");
// auto node = rclcpp::Node::make_shared("experiment_node",
// rclcpp::NodeOptions().use_intra_process_comms(true));
node->declare_parameter("executor_type", "edf");
node->declare_parameter("num_subscribers", 3);
node->declare_parameter("num_chains", 1);
node->declare_parameter("runtime", 0);
int num_s;
node->get_parameter("num_subscribers", num_s);
int runtime;
node->get_parameter("runtime", runtime);
int num_chains;
node->get_parameter("num_chains", num_chains);
int num_s;
node->get_parameter("num_subscribers", num_s);
int runtime;
node->get_parameter("runtime", runtime);
int num_chains;
node->get_parameter("num_chains", num_chains);
// set ourselves to the second highest priority
// struct sched_param param;
// param.sched_priority = sched_get_priority_max(SCHED_FIFO) - 2;
// int result = sched_setscheduler(0, SCHED_FIFO, &param);
sched_attr rt_attr;
rt_attr.size = sizeof(rt_attr);
rt_attr.sched_policy = SCHED_FIFO;
// rt_attr.sched_priority = sched_get_priority_max(SCHED_FIFO) - 2;
rt_attr.sched_priority = 99;
int result = sched_setattr(0, &rt_attr, 0);
// set ourselves to the second highest priority
// struct sched_param param;
// param.sched_priority = sched_get_priority_max(SCHED_FIFO) - 2;
// int result = sched_setscheduler(0, SCHED_FIFO, &param);
sched_attr rt_attr;
rt_attr.size = sizeof(rt_attr);
rt_attr.sched_policy = SCHED_FIFO;
// rt_attr.sched_priority = sched_get_priority_max(SCHED_FIFO) - 2;
rt_attr.sched_priority = 99;
int const result = sched_setattr(0, &rt_attr, 0);
if (result != 0) {
std::cout << "ros_experiment: sched_setscheduler failed: " << result << ": "
<< strerror(errno) << std::endl;
}
// calibrate the dummy load for the current system
calibrate_dummy_load();
ExperimentConfig config;
// runtimes and timers subject to change
config.chain_lengths = {};
config.node_ids = {};
config.chain_timer_control = {};
config.chain_periods = {};
config.node_runtimes = {};
int node_id = 0;
for (int c = 0; c < num_chains; c++) {
config.node_ids.push_back({});
for (int i = 0; i < num_s + 1; i++) {
config.node_ids[c].push_back(node_id);
node_id++;
if (result != 0) {
std::cout << "ros_experiment: sched_setscheduler failed: " << result << ": "
<< strerror(errno) << std::endl;
}
for (int i = 0; i < num_s + 1; i++) {
config.node_priorities.push_back(i);
// calibrate the dummy load for the current system
calibrate_dummy_load();
ExperimentConfig config;
// runtimes and timers subject to change
config.chain_lengths = {};
config.node_ids = {};
config.chain_timer_control = {};
config.chain_periods = {};
config.node_runtimes = {};
int node_id = 0;
for (int c = 0; c < num_chains; c++) {
config.node_ids.push_back({});
for (int i = 0; i < num_s + 1; i++) {
config.node_ids[c].push_back(node_id);
node_id++;
}
for (int i = 0; i < num_s + 1; i++) {
config.node_priorities.push_back(i);
}
config.chain_lengths.push_back(num_s + 1);
config.chain_periods.push_back(10);
for (int i = 0; i < num_s + 1; i++) {
config.node_runtimes.push_back(runtime);
}
config.chain_timer_control.push_back(c);
}
config.chain_lengths.push_back(num_s+1);
config.chain_periods.push_back(10);
for (int i = 0; i < num_s + 1; i++) {
config.node_runtimes.push_back(runtime);
std::cout << "node ids: " << std::endl;
for (size_t i = 0; i < config.node_ids.size(); i++) {
for (size_t j = 0; j < config.node_ids[i].size(); j++) {
std::cout << config.node_ids[i][j] << " ";
}
std::cout << std::endl;
}
config.chain_timer_control.push_back(c);
}
std::cout << "node ids: " << std::endl;
for (size_t i = 0; i < config.node_ids.size(); i++) {
for (size_t j = 0; j < config.node_ids[i].size(); j++) {
std::cout << config.node_ids[i][j] << " ";
}
std::cout << std::endl;
}
config.num_groups = 0;
config.group_memberships = {}; // no groups
config.node_executor_assignments = {};
// config.node_executor_assignments = {0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0};
config.parallel_mode = false;
config.cores = 1;
config.special_cases.push_back("2024ours_latency");
config.num_groups = 0;
config.group_memberships = {}; // no groups
// sanity_check_config(config);
config.node_executor_assignments = {};
// config.node_executor_assignments = {0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0};
config.parallel_mode = false;
config.cores = 1;
config.special_cases.push_back("2024ours_latency");
// from 35/65 to 100/0 in increments of 5
// for (int i = 0; i <= (100 - 35) / 5; i++)
// {
// for now, run with 100% ros
int const i = (100 - 35) / 5;
int const ros_budget = 35 + i * 5;
int const other_task_budget = 65 - i * 5;
std::cout << "ROS budget: " << ros_budget
<< " Other task budget: " << other_task_budget << std::endl;
// do other task in a separate thread
should_do_task.store(true);
// sanity_check_config(config);
// do ros task
std::string const file_name = "ros_" + std::to_string(ros_budget);
// ros_experiment(argc, argv, file_name);
std::thread ros_task(ros_experiment, node, file_name, config);
std::cout << "tasks started" << std::endl;
ros_task.join();
std::cout << "tasks joined" << std::endl;
// from 35/65 to 100/0 in increments of 5
// for (int i = 0; i <= (100 - 35) / 5; i++)
// {
// for now, run with 100% ros
int i = (100 - 35) / 5;
int ros_budget = 35 + i * 5;
int other_task_budget = 65 - i * 5;
std::cout << "ROS budget: " << ros_budget
<< " Other task budget: " << other_task_budget << std::endl;
// do other task in a separate thread
should_do_task.store(true);
// do ros task
std::string file_name = "ros_" + std::to_string(ros_budget);
// ros_experiment(argc, argv, file_name);
std::thread ros_task(ros_experiment, node, file_name, config);
std::cout << "tasks started" << std::endl;
ros_task.join();
std::cout << "tasks joined" << std::endl;
// // run once
// break;
// }
// // run once
// break;
// }
return 0;
}

View file

@ -3,13 +3,10 @@
#include <string>
#include <fstream>
#include <iostream>
#include <unistd.h>
#include <sys/prctl.h>
#include <rclcpp/rclcpp.hpp>
#include <std_msgs/msg/string.hpp>
#include "casestudy_tools/test_nodes.hpp"
#include "casestudy_tools/experiment.hpp"
#include "casestudy_tools/primes_workload.hpp"
@ -19,116 +16,119 @@
// bool should_do_task = true;
std::atomic<bool> should_do_task(true);
void run_one_executor(std::function<void(std::atomic<bool> &)> exec_fun, int idx)
{
(void)idx;
// cpu_set_t cpuset;
// CPU_ZERO(&cpuset);
// CPU_SET(0, &cpuset);
// CPU_SET(1, &cpuset);
// CPU_SET(2, &cpuset);
// CPU_SET(3, &cpuset);
void run_one_executor(
std::function<void(std::atomic<bool>&)> const& exec_fun,
int const idx) {
(void)idx;
// cpu_set_t cpuset;
// CPU_ZERO(&cpuset);
// CPU_SET(0, &cpuset);
// CPU_SET(1, &cpuset);
// CPU_SET(2, &cpuset);
// CPU_SET(3, &cpuset);
// int result = sched_setaffinity(0, sizeof(cpu_set_t), &cpuset);
// if (result != 0)
// {
// std::cout << "ros_experiment: sched_setaffinity failed: " << result << ": " << strerror(errno) << std::endl;
// }
// int result = sched_setaffinity(0, sizeof(cpu_set_t), &cpuset);
// if (result != 0)
// {
// std::cout << "ros_experiment: sched_setaffinity failed: " << result << ": " << strerror(errno) << std::endl;
// }
// set this process to second highest priority
struct sched_param param;
param.sched_priority = sched_get_priority_max(SCHED_FIFO) - 2;
int result = sched_setscheduler(0, SCHED_FIFO, &param);
if (result != 0)
{
std::cerr << "ros_experiment: sched_setscheduler failed: " << result << ": " << strerror(errno) << std::endl;
std::cerr << "This operation requires root privileges. Please run the program with sufficient permissions." << std::endl;
exit(EXIT_FAILURE);
}
// Set the process name to "ros_experiment" for easier identification during debugging or system monitoring.
prctl(PR_SET_NAME, "ros_experiment", 0, 0, 0);
// set this process to second highest priority
struct sched_param param;
param.sched_priority = sched_get_priority_max(SCHED_FIFO) - 2;
int const result = sched_setscheduler(0, SCHED_FIFO, &param);
exec_fun(should_do_task);
if (result != 0) {
std::cerr << "ros_experiment: sched_setscheduler failed: " << result
<< ": " << strerror(errno) << std::endl;
std::cerr << "This operation requires root privileges. Please run the program with sufficient permissions."
<< std::endl;
exit(EXIT_FAILURE);
}
// Set the process name to "ros_experiment" for easier identification during debugging or system monitoring.
prctl(PR_SET_NAME, "ros_experiment", 0, 0, 0);
exec_fun(should_do_task);
}
int ros_experiment(int argc, char **arg, std::string file_name, ExperimentConfig config)
{
int ros_experiment(
int const argc,
char** const argv,
std::string const& file_name,
ExperimentConfig config) {
rclcpp::init(argc, arg);
// we have to create a node to process any passed parameters
auto node = rclcpp::Node::make_shared("experiment_parameters");
node->declare_parameter("executor_type", "edf");
rclcpp::init(argc, argv);
std::string executor_type;
node->get_parameter("executor_type", executor_type);
std::cout << "using executor type: " << executor_type << std::endl;
// we have to create a node to process any passed parameters
auto const node = rclcpp::Node::make_shared("experiment_parameters");
node->declare_parameter("executor_type", "edf");
// TODO: move experiment configuration to main
config.executor_type = executor_type;
// Enable parallel mode for the experiment configuration.
// When set to true, this allows multiple executors to run concurrently,
// leveraging multi-threading to improve performance on multi-core systems.
config.parallel_mode = true;
config.nodes.push_back(node);
std::string executor_type;
node->get_parameter("executor_type", executor_type);
std::cout << "using executor type: " << executor_type << std::endl;
Experiment experiment(config);
// std::string result_log = experiment.run(should_do_task);
std::vector<std::function<void(std::atomic<bool> &)> > exec_funs = experiment.getRunFunctions();
std::cout << "got " << exec_funs.size() << " executors" << std::endl;
std::vector<std::thread> exec_threads;
int i = 0;
experiment.resetTimers();
for (auto exec_fun : exec_funs)
{
exec_threads.push_back(std::thread(run_one_executor, exec_fun, i));
i++;
}
config.executor_type = executor_type;
// Enable parallel mode for the experiment configuration.
// When set to true, this allows multiple executors to run concurrently,
// leveraging multi-threading to improve performance on multi-core systems.
config.parallel_mode = true;
config.nodes.push_back(node);
for (auto &t : exec_threads)
{
t.join();
}
std::string outputname = "casestudy_example";
experiment.writeLogsToFile(file_name, outputname);
Experiment experiment(config);
// std::string result_log = experiment.run(should_do_task);
auto const exec_funs = experiment.getRunFunctions();
std::cout << "got " << exec_funs.size() << " executors" << std::endl;
return 0;
std::vector<std::thread> exec_threads;
int i = 0;
experiment.resetTimers();
for (auto const& exec_fun : exec_funs) {
exec_threads.emplace_back(run_one_executor, exec_fun, i++);
}
for (auto& t : exec_threads) {
t.join();
}
std::string const outputname = "casestudy_example";
experiment.writeLogsToFile(file_name, outputname);
return 0;
}
int main(int argc, char *argv[])
{
// calibrate the dummy load for the current system
calibrate_dummy_load();
int main(int argc, char* argv[]) {
// calibrate the dummy load for the current system
calibrate_dummy_load();
ExperimentConfig config;
config.chain_lengths = {2, 2};
config.node_ids = {{0, 1}, {2, 3}};
config.node_priorities = {1, 0, 3, 2};
config.chain_timer_control = {0, 1};
ExperimentConfig config;
config.chain_lengths = {2, 2};
config.node_ids = {{0, 1}, {2, 3}};
config.node_priorities = {1, 0, 3, 2};
config.chain_timer_control = {0, 1};
config.node_runtimes = {10, 10, 10, 10};
// node 0 has a period of 80, and is the only timer
config.chain_periods = {100, 100};
config.node_executor_assignments = {};
config.parallel_mode = true;
config.cores = 2;
config.node_runtimes = {10, 10, 10, 10};
// node 0 has a period of 80, and is the only timer
config.chain_periods = {100, 100};
config.node_executor_assignments = {};
config.parallel_mode = true;
config.cores = 2;
sanity_check_config(config);
sanity_check_config(config);
std::thread ros_task([&]() {
try
{
ros_experiment(argc, argv, "cs_example", config);
}
catch (const std::exception &e)
{
std::cerr << "Exception in ros_experiment: " << e.what() << std::endl;
}
catch (...)
{
std::cerr << "Unknown exception in ros_experiment." << std::endl;
}
});
std::cout << "tasks started" << std::endl;
ros_task.join();
std::cout << "tasks joined" << std::endl;
std::thread ros_task([&]() {
try {
ros_experiment(argc, argv, "cs_example", config);
} catch (std::exception const& e) {
std::cerr << "Exception in ros_experiment: " << e.what() << std::endl;
} catch (...) {
std::cerr << "Unknown exception in ros_experiment." << std::endl;
}
});
std::cout << "tasks started" << std::endl;
ros_task.join();
std::cout << "tasks joined" << std::endl;
return 0;
}

View file

@ -1,38 +1,26 @@
cmake_minimum_required(VERSION 3.5)
project(casestudy_tools)
cmake_minimum_required(VERSION 3.8)
project(casestudy_tools VERSION 0.1.0)
# Default to C99
if(NOT CMAKE_C_STANDARD)
set(CMAKE_C_STANDARD 99)
endif()
# Default to C++14
if(NOT CMAKE_CXX_STANDARD)
set(CMAKE_CXX_STANDARD 14)
endif()
# Set C++ standards
set(CMAKE_CXX_STANDARD 17)
set(CMAKE_CXX_STANDARD_REQUIRED ON)
set(CMAKE_CXX_EXTENSIONS OFF)
# Compiler options
if(CMAKE_COMPILER_IS_GNUCXX OR CMAKE_CXX_COMPILER_ID MATCHES "Clang")
add_compile_options(-Wall -Wextra -Wpedantic)
endif()
# find dependencies
# Find dependencies
find_package(ament_cmake REQUIRED)
find_package(rclcpp REQUIRED)
find_package(priority_executor REQUIRED)
find_package(simple_timer REQUIRED)
if(BUILD_TESTING)
find_package(ament_lint_auto REQUIRED)
# the following line skips the linter which checks for copyrights
# uncomment the line when a copyright and license is not present in all source files
#set(ament_cmake_copyright_FOUND TRUE)
# the following line skips cpplint (only works in a git repo)
# uncomment the line when this package is not in a git repo
#set(ament_cmake_cpplint_FOUND TRUE)
ament_lint_auto_find_test_dependencies()
endif()
add_library(primes_workload src/primes_workload.cpp)
# Library targets
add_library(primes_workload
src/primes_workload.cpp
)
target_include_directories(primes_workload PUBLIC
$<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}/include>
$<INSTALL_INTERFACE:include>
@ -41,7 +29,6 @@ ament_target_dependencies(primes_workload
simple_timer
rclcpp
priority_executor
)
add_library(test_nodes src/test_nodes.cpp)
@ -54,8 +41,9 @@ ament_target_dependencies(test_nodes
rclcpp
simple_timer
)
add_library(experiment_host src/experiment.cpp)
add_library(experiment_host
src/experiment.cpp
)
target_include_directories(experiment_host PUBLIC
$<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}/include>
$<INSTALL_INTERFACE:include>
@ -70,7 +58,13 @@ ament_target_dependencies(experiment_host
priority_executor
)
# Testing
if(BUILD_TESTING)
find_package(ament_lint_auto REQUIRED)
ament_lint_auto_find_test_dependencies()
endif()
# Installation
install(
DIRECTORY include/
DESTINATION include
@ -85,6 +79,7 @@ install(
INCLUDES DESTINATION include
)
# Export and package configuration
ament_export_include_directories(include)
# ament_export_libraries(primes_workload test_nodes experiment_host)
ament_export_targets(export_${PROJECT_NAME} HAS_LIBRARY_TARGET)

View file

@ -8,9 +8,7 @@
#include "casestudy_tools/test_nodes.hpp"
#include "simple_timer/rt-sched.hpp"
struct ExperimentConfig
{
struct ExperimentConfig {
std::vector<uint32_t> chain_lengths;
// TODO: rename to callback_ids
std::vector<std::vector<uint32_t>> node_ids;
@ -38,103 +36,101 @@ struct ExperimentConfig
bool parallel_mode = false;
size_t cores = 2;
// rclcpp::Node::SharedPtr node;
// nodes to host the callbacks. In parallel (one executor using multiple threads) mode, there is one node and one executor. The node is provided by the experiment provider
// in singlethread (each executor uses one thread), there is one node per executor. Additional nodes may be created by the experiment host
// nodes to host the callbacks. In parallel (one executor using multiple threads) mode,
// there is one node and one executor. The node is provided by the experiment provider.
// In singlethread (each executor uses one thread), there is one node per executor.
// Additional nodes may be created by the experiment host.
std::vector<rclcpp::Node::SharedPtr> nodes;
std::vector<std::string> special_cases = {};
std::vector<std::string> special_cases;
};
struct experiment_executor
{
std::shared_ptr<timed_executor::TimedExecutor> executor;
std::shared_ptr<PriorityMemoryStrategy<>> strat;
std::shared_ptr<rclcpp::Executor> default_executor;
struct experiment_executor {
std::shared_ptr<priority_executor::TimedExecutor> executor;
std::shared_ptr<priority_executor::PriorityMemoryStrategy<>> strat;
std::shared_ptr<rclcpp::Executor> default_executor;
};
class Experiment
{
class Experiment {
public:
Experiment(ExperimentConfig config);
explicit Experiment(ExperimentConfig config);
/**
* @brief Run the experiment
* runs in the current thread
* @param should_do_task a flag that can be set to false to stop the experiment. The flag will be set to false when the experiment is complete.
*/
std::string run(std::atomic<bool> &should_do_task);
* @brief Run the experiment in the current thread
* @param should_do_task a flag that can be set to false to stop the experiment.
* The flag will be set to false when the experiment is complete.
* @return std::string containing experiment results
*/
std::string run(std::atomic<bool>& should_do_task);
/**
* @brief Get the functions required to run the experiment
* returns a vector of functions. Each function will run a different executor. It's up to the caller to run these functions in different threads.
* The caller may set the thread properties (e.g. priority) before running the function.
* Each function accepts a flag that can be set to false to stop the experiment. The flag will be set to false when the experiment is complete.
*/
std::vector<std::function<void(std::atomic<bool> &)>> getRunFunctions();
* @return vector of functions. Each function will run a different executor.
* It's up to the caller to run these functions in different threads.
* The caller may set the thread properties (e.g. priority) before running the function.
* Each function accepts a flag that can be set to false to stop the experiment.
* The flag will be set to false when the experiment is complete.
*/
std::vector<std::function<void(std::atomic<bool>&)>> getRunFunctions();
static size_t numExecutorsRequired(const ExperimentConfig &config);
static size_t numExecutorsRequired(ExperimentConfig const &config);
/**
* get the executor (and strategy) for a given executor index (0 by default)
*/
* @brief Get the executor (and strategy) for a given executor index
* @param executor_idx The index of the executor (defaults to 0)
* @return experiment_executor
*/
experiment_executor getExecutor(int executor_idx = 0);
node_time_logger getInternalLogger();
/**
* collect logs from the executors. Also add any logs from passed in loggers
* @param optional array of loggers to add to the logs
*/
* @brief Collect logs from the executors and optionally add logs from passed in loggers
* @param loggers Optional array of loggers to add to the logs
* @return std::string containing the combined logs
*/
std::string buildLogs(std::vector<node_time_logger> loggers = {});
void resetTimers();
void writeLogsToFile(std::string filename, std::string experiment_name);
void writeLogsToFile(std::string const &filename, std::string const &experiment_name);
private:
ExperimentConfig config;
std::vector<experiment_executor> executors;
// values to keep track of runtime values
// chain-indexed vector of deques of deadlines
std::vector<std::deque<uint64_t> *> chain_deadlines_deque;
// maps node_id to node
// Chain-indexed vector of deques of deadlines
std::vector<std::deque<uint64_t>*> chain_deadlines_deque;
// Maps node_id to node
std::map<uint32_t, std::shared_ptr<CaseStudyNode>> nodes;
// maps chain_id to timer
// Maps chain_id to timer
std::map<uint32_t, std::shared_ptr<rclcpp::TimerBase>> chain_timer_handles;
/**
* @brief Create the executors using the type and amount set in the config
*
*/
void createExecutors();
/**
* @brief Create a single executor using the type set in the config
*
* @param executor_num The executor number to create
* @return experiment_executor
*/
experiment_executor createSingleExecutor(uint executor_num = 0);
/**
* @brief Get the executor for a given node id
* @param node_id if the node_executor_assignments is empty, then this parameter is ignored, and the first executor is returned
*/
experiment_executor getExecutorForNode(int node_id=0);
* @param node_id If node_executor_assignments is empty, this parameter is ignored
* and the first executor is returned
* @return experiment_executor
*/
experiment_executor getExecutorForNode(int node_id = 0);
void createNodesAndAssignProperties();
void setInitialDeadlines();
node_time_logger _logger;
};
int do_other_task(double work_time, uint period, std::atomic<bool> &should_do_task);
void sanity_check_config(ExperimentConfig config);
int do_other_task(double work_time, uint period, std::atomic<bool>& should_do_task);
void sanity_check_config(ExperimentConfig const &config);
#endif // RTIS_EXPERIMENT_HOST
#endif // RTIS_EXPERIMENT_HOST

View file

@ -22,7 +22,7 @@ namespace DummyLoadCalibration
{
inline std::atomic<int> calibration{1};
inline void setCalibration(int calib);
inline void setCalibration(int const calib);
inline int getCalibration();
}
@ -39,6 +39,6 @@ void calibrate_dummy_load();
* This function performs a busy wait for a specified duration.
* It is used to calibrate the dummy load for the current system.
*/
void dummy_load(int load_ms);
void dummy_load(int const load_ms);
#endif

View file

@ -1,61 +1,66 @@
#ifndef RTIS_TEST_NODES
#define RTIS_TEST_NODES
#include <string>
#include "rclcpp/rclcpp.hpp"
#include "std_msgs/msg/string.hpp"
#include "std_msgs/msg/int32.hpp"
#include "simple_timer/rt-sched.hpp"
class CaseStudyNode
{
public: node_time_logger logger;
// inherit constructor
CaseStudyNode(std::string publish_topic, rclcpp::Node::SharedPtr node)
{
this->name = publish_topic;
this->node = node;
// seed random number generator
srand(time(NULL));
}
class CaseStudyNode {
public:
node_time_logger logger;
// inherit constructor
CaseStudyNode(std::string const &publish_topic, rclcpp::Node::SharedPtr const &node) {
this->name = publish_topic;
this->node = node;
// seed random number generator
srand(time(nullptr));
}
bool use_random_runtime = false;
double normal_runtime_scale = 0.8;
double over_runtime_scale = 1.2;
double runtime_over_chance = 0.1;
std::string get_name();
rclcpp::CallbackGroup::SharedPtr callback_group_;
bool use_random_runtime = false;
double normal_runtime_scale = 0.8;
double over_runtime_scale = 1.2;
double runtime_over_chance = 0.1;
std::string get_name();
rclcpp::CallbackGroup::SharedPtr callback_group_;
private:
rclcpp::Node::SharedPtr node;
std::string name;
private:
rclcpp::Node::SharedPtr node;
std::string name;
};
#define COUNT_MAX 500
class PublisherNode : public CaseStudyNode{
public:
PublisherNode(std::string publish_topic, double runtime, int period, int chain, rclcpp::Node::SharedPtr node, rclcpp::CallbackGroup::SharedPtr callback_group = nullptr);
rclcpp::TimerBase::SharedPtr timer_;
int count_max_ = COUNT_MAX;
class PublisherNode : public CaseStudyNode {
public:
PublisherNode(std::string const &publish_topic, double runtime, int period, int chain,
rclcpp::Node::SharedPtr const &node,
rclcpp::CallbackGroup::SharedPtr const &callback_group = nullptr);
rclcpp::TimerBase::SharedPtr timer_;
int count_max_ = COUNT_MAX;
private:
rclcpp::Publisher<std_msgs::msg::Int32>::SharedPtr pub_;
double runtime_;
int period_;
int chain_;
private:
rclcpp::Publisher<std_msgs::msg::Int32>::SharedPtr pub_;
double runtime_;
int period_;
int chain_;
};
class WorkerNode : public CaseStudyNode{
public:
WorkerNode(std::string subscribe_topic, std::string publish_topic, double runtime, int period, int chain, rclcpp::Node::SharedPtr node, rclcpp::CallbackGroup::SharedPtr callback_group = nullptr);
rclcpp::Subscription<std_msgs::msg::Int32>::SharedPtr sub_;
int count_max_ = COUNT_MAX;
class WorkerNode : public CaseStudyNode {
public:
WorkerNode(std::string const &subscribe_topic, std::string const &publish_topic,
double runtime, int period, int chain,
rclcpp::Node::SharedPtr const &node,
rclcpp::CallbackGroup::SharedPtr const &callback_group = nullptr);
rclcpp::Subscription<std_msgs::msg::Int32>::SharedPtr sub_;
int count_max_ = COUNT_MAX;
private:
double runtime_;
int period_;
int chain_;
rclcpp::Publisher<std_msgs::msg::Int32>::SharedPtr pub_;
private:
double runtime_;
int period_;
int chain_;
rclcpp::Publisher<std_msgs::msg::Int32>::SharedPtr pub_;
};
#endif // RTIS_TEST_NODES

View file

@ -2,10 +2,12 @@
<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
<package format="3">
<name>casestudy_tools</name>
<version>0.0.0</version>
<description>TODO: Package description</description>
<maintainer email="kurt4wilson@gmail.com">vboxuser</maintainer>
<license>TODO: License declaration</license>
<version>0.1.0</version>
<description>
ROS 2 package providing utility tools and common functionality for dynamic executor case studies
</description>
<maintainer email="kurt4wilson@gmail.com">kurt</maintainer>
<license>Apache License 2.0</license>
<buildtool_depend>ament_cmake</buildtool_depend>

File diff suppressed because it is too large Load diff

View file

@ -6,7 +6,7 @@
#include "casestudy_tools/primes_workload.hpp"
void DummyLoadCalibration::setCalibration(int calib) {
void DummyLoadCalibration::setCalibration(int const calib) {
calibration.store(calib, std::memory_order_relaxed);
}
@ -20,12 +20,12 @@ void calibrate_dummy_load()
for (int iteration = 0; iteration < MAX_ITERATIONS; ++iteration)
{
// Measure the time taken for the dummy load
auto start_time = std::chrono::high_resolution_clock::now();
auto const start_time = std::chrono::high_resolution_clock::now();
dummy_load(TARGET_DURATION_MS); // intended workload
auto end_time = std::chrono::high_resolution_clock::now();
auto const end_time = std::chrono::high_resolution_clock::now();
// Calculate duration in microseconds
auto duration_us =
auto const duration_us =
std::chrono::duration_cast<std::chrono::microseconds>(end_time - start_time).count();
std::cout << "[Calibration] Iteration: " << iteration
@ -51,12 +51,12 @@ void calibrate_dummy_load()
}
}
void dummy_load(int load_ms)
void dummy_load(int const load_ms)
{
const int calib_factor = DummyLoadCalibration::getCalibration();
int const calib_factor = DummyLoadCalibration::getCalibration();
// calibration factor corresponds to 100ms load, scale it
const int scaled_factor = calib_factor * load_ms / 100;
int const scaled_factor = calib_factor * load_ms / 100;
volatile int dummy_var = 0;
constexpr int DUMMY_LOAD_ITER = 100'000;

View file

@ -6,169 +6,174 @@
#include <ctime>
#include <rclcpp/logger.hpp>
std::string CaseStudyNode::get_name() { return name; }
std::string CaseStudyNode::get_name() {
return name;
}
#define BUFFER_LENGTH 0
rclcpp::QoS get_qos()
{
if (BUFFER_LENGTH == 0)
{
return rclcpp::QoS(rclcpp::KeepAll());
}
else
{
rclcpp::QoS get_qos() {
if (BUFFER_LENGTH == 0) {
return rclcpp::QoS(rclcpp::KeepAll());
}
return rclcpp::QoS(rclcpp::KeepLast(BUFFER_LENGTH));
}
}
void timespec_diff(timespec *start, timespec *end, timespec *result)
{
if ((end->tv_nsec - start->tv_nsec) < 0)
{
result->tv_sec = end->tv_sec - start->tv_sec - 1;
result->tv_nsec = 1000000000 + end->tv_nsec - start->tv_nsec;
}
else
{
result->tv_sec = end->tv_sec - start->tv_sec;
result->tv_nsec = end->tv_nsec - start->tv_nsec;
}
}
PublisherNode::PublisherNode(std::string publish_topic, double runtime,
int period, int chain,
rclcpp::Node::SharedPtr node,
rclcpp::CallbackGroup::SharedPtr callback_group)
: CaseStudyNode(publish_topic, node), runtime_(runtime), period_(period),
chain_(chain)
{
logger = create_logger();
pub_ = node->create_publisher<std_msgs::msg::Int32>(publish_topic, get_qos());
auto timer_callback = [this]() -> void
{
std::ostringstream ss;
ss << "{\"operation\": \"start_work\", \"chain\": " << chain_
<< ", \"node\": \"" << get_name() << "\", \"count\": " << count_max_;
std::chrono::nanoseconds time_until_trigger = timer_->time_until_trigger();
std::chrono::microseconds time_until_trigger_us =
std::chrono::duration_cast<std::chrono::microseconds>(
time_until_trigger);
ss << ", \"next_release_us\": " << time_until_trigger_us.count() << "}";
log_entry(logger, ss.str());
// nth_prime_silly(runtime_);
double used_runtime = runtime_;
if (use_random_runtime)
{
if (rand() % 100 < runtime_over_chance * 100)
{
used_runtime *= over_runtime_scale;
}
else
{
used_runtime *= normal_runtime_scale;
}
void timespec_diff(timespec const* start, timespec const* end, timespec* result) {
if ((end->tv_nsec - start->tv_nsec) < 0) {
result->tv_sec = end->tv_sec - start->tv_sec - 1;
result->tv_nsec = 1000000000 + end->tv_nsec - start->tv_nsec;
} else {
result->tv_sec = end->tv_sec - start->tv_sec;
result->tv_nsec = end->tv_nsec - start->tv_nsec;
}
// RCLCPP_INFO(rclcpp::get_logger(get_name()), "running %s with runtime %f",
// get_name().c_str(), used_runtime);
struct timespec start_time;
clock_gettime(CLOCK_MONOTONIC, &start_time);
dummy_load(used_runtime);
std_msgs::msg::Int32 msg;
msg.data = count_max_;
// RCLCPP_INFO(this->get_logger(), "Publishing: '%s'", msg.data.c_str());
pub_->publish(msg);
// TEST: can we take more than one msg from DDS?
// pub_->publish(msg);
ss.str("");
ss << "{\"operation\": \"end_work\", \"chain\": " << chain_
<< ", \"node\": \"" << get_name() << "\", \"count\": " << count_max_ << ", \"target_runtime\": " << runtime_
<< "}";
log_entry(logger, ss.str());
count_max_--;
if (count_max_ == 0)
{
rclcpp::shutdown();
}
/* struct timespec end_time;
clock_gettime(CLOCK_MONOTONIC, &end_time);
struct timespec diff;
timespec_diff(&start_time, &end_time, &diff);
size_t diff_ns = diff.tv_sec * 1000000000 + diff.tv_nsec;
size_t diff_ms = diff_ns / 1000000;
RCLCPP_INFO(rclcpp::get_logger(get_name()), "done running %s, took %lu ms",
get_name().c_str(), diff_ms); */
};
if (callback_group != nullptr)
{
callback_group_ = callback_group;
}
else
{
callback_group_ =
node->create_callback_group(rclcpp::CallbackGroupType::Reentrant);
}
timer_ = node->create_wall_timer(std::chrono::milliseconds(period),
timer_callback, callback_group_);
}
WorkerNode::WorkerNode(std::string subscribe_topic, std::string publish_topic,
double runtime, int period, int chain,
rclcpp::Node::SharedPtr node,
rclcpp::CallbackGroup::SharedPtr callback_group)
: CaseStudyNode(publish_topic, node), runtime_(runtime), period_(period),
chain_(chain)
{
logger = create_logger();
auto callback = [this](const std_msgs::msg::Int32::SharedPtr msg) -> void
{
// prevent unused variable warning
(void)msg;
PublisherNode::PublisherNode(
std::string const &publish_topic,
double const runtime,
int const period,
int const chain,
rclcpp::Node::SharedPtr const &node,
rclcpp::CallbackGroup::SharedPtr const &callback_group)
: CaseStudyNode(publish_topic, node)
, runtime_(runtime)
, period_(period)
, chain_(chain) {
std::ostringstream ss;
ss << "{\"operation\": \"start_work\", \"chain\": " << chain_
<< ", \"node\": \"" << get_name() << "\", \"count\": " << msg->data
<< "}";
log_entry(logger, ss.str());
// RCLCPP_INFO(this->get_logger(), "I heard: '%s'", msg->data.c_str());
// nth_prime_silly(runtime_);
double used_runtime = runtime_;
if (use_random_runtime)
{
if (rand() % 100 < runtime_over_chance * 100)
{
used_runtime *= over_runtime_scale;
}
else
{
used_runtime *= normal_runtime_scale;
}
logger = create_logger();
pub_ = node->create_publisher<std_msgs::msg::Int32>(publish_topic, get_qos());
auto const timer_callback = [this]() -> void {
std::ostringstream ss;
ss << "{\"operation\": \"start_work\", \"chain\": " << chain_
<< ", \"node\": \"" << get_name() << "\", \"count\": " << count_max_;
std::chrono::nanoseconds const time_until_trigger = timer_->time_until_trigger();
std::chrono::microseconds const time_until_trigger_us =
std::chrono::duration_cast<std::chrono::microseconds>(time_until_trigger);
ss << ", \"next_release_us\": " << time_until_trigger_us.count() << "}";
log_entry(logger, ss.str());
double used_runtime = runtime_;
if (use_random_runtime) {
if (rand() % 100 < runtime_over_chance * 100) {
used_runtime *= over_runtime_scale;
} else {
used_runtime *= normal_runtime_scale;
}
}
// RCLCPP_INFO(rclcpp::get_logger(get_name()), "running %s with runtime %f",
// get_name().c_str(), used_runtime);
struct timespec start_time;
clock_gettime(CLOCK_MONOTONIC, &start_time);
dummy_load(used_runtime);
std_msgs::msg::Int32 msg;
msg.data = count_max_;
// RCLCPP_INFO(this->get_logger(), "Publishing: '%s'", msg.data.c_str());
pub_->publish(msg);
// TEST: can we take more than one msg from DDS?
// pub_->publish(msg);
ss.str("");
ss << "{\"operation\": \"end_work\", \"chain\": " << chain_
<< ", \"node\": \"" << get_name() << "\", \"count\": " << count_max_
<< ", \"target_runtime\": " << runtime_ << "}";
log_entry(logger, ss.str());
count_max_--;
if (count_max_ == 0) {
rclcpp::shutdown();
}
/* struct timespec end_time;
clock_gettime(CLOCK_MONOTONIC, &end_time);
struct timespec diff;
timespec_diff(&start_time, &end_time, &diff);
size_t diff_ns = diff.tv_sec * 1000000000 + diff.tv_nsec;
size_t diff_ms = diff_ns / 1000000;
RCLCPP_INFO(rclcpp::get_logger(get_name()), "done running %s, took %lu ms",
get_name().c_str(), diff_ms); */
};
if (callback_group != nullptr) {
callback_group_ = callback_group;
} else {
callback_group_ = node->create_callback_group(
rclcpp::CallbackGroupType::Reentrant);
}
dummy_load(used_runtime);
// RCLCPP_INFO(this->get_logger(), "Result: %d", result);
auto new_msg = std_msgs::msg::Int32();
new_msg.data = msg->data;
pub_->publish(new_msg);
ss.str("");
ss << "{\"operation\": \"end_work\", \"chain\": " << chain_
<< ", \"node\": \"" << get_name() << "\", \"count\": " << msg->data
<< "}";
log_entry(logger, ss.str());
};
rclcpp::SubscriptionOptions sub_options;
if (callback_group)
{
callback_group_ = callback_group;
}
else
{
callback_group_ =
node->create_callback_group(rclcpp::CallbackGroupType::Reentrant);
}
sub_options.callback_group = callback_group_;
sub_ = node->create_subscription<std_msgs::msg::Int32>(
subscribe_topic, get_qos(), callback, sub_options);
pub_ = node->create_publisher<std_msgs::msg::Int32>(publish_topic, get_qos());
timer_ = node->create_wall_timer(
std::chrono::milliseconds(period),
timer_callback,
callback_group_);
}
WorkerNode::WorkerNode(
std::string const &subscribe_topic,
std::string const &publish_topic,
double const runtime,
int const period,
int const chain,
rclcpp::Node::SharedPtr const &node,
rclcpp::CallbackGroup::SharedPtr const &callback_group)
: CaseStudyNode(publish_topic, node)
, runtime_(runtime)
, period_(period)
, chain_(chain) {
logger = create_logger();
auto const callback = [this](std_msgs::msg::Int32::SharedPtr const msg) -> void {
// prevent unused variable warning
(void)msg;
std::ostringstream ss;
ss << "{\"operation\": \"start_work\", \"chain\": " << chain_
<< ", \"node\": \"" << get_name() << "\", \"count\": " << msg->data
<< "}";
log_entry(logger, ss.str());
// RCLCPP_INFO(this->get_logger(), "I heard: '%s'", msg->data.c_str());
// nth_prime_silly(runtime_);
double used_runtime = runtime_;
if (use_random_runtime) {
if (rand() % 100 < runtime_over_chance * 100) {
used_runtime *= over_runtime_scale;
} else {
used_runtime *= normal_runtime_scale;
}
}
dummy_load(used_runtime);
// RCLCPP_INFO(this->get_logger(), "Result: %d", result);
auto new_msg = std_msgs::msg::Int32();
new_msg.data = msg->data;
pub_->publish(new_msg);
ss.str("");
ss << "{\"operation\": \"end_work\", \"chain\": " << chain_
<< ", \"node\": \"" << get_name() << "\", \"count\": " << msg->data
<< "}";
log_entry(logger, ss.str());
};
rclcpp::SubscriptionOptions sub_options;
if (callback_group) {
callback_group_ = callback_group;
} else {
callback_group_ = node->create_callback_group(
rclcpp::CallbackGroupType::Reentrant);
}
sub_options.callback_group = callback_group_;
sub_ = node->create_subscription<std_msgs::msg::Int32>(
subscribe_topic,
get_qos(),
callback,
sub_options);
pub_ = node->create_publisher<std_msgs::msg::Int32>(publish_topic, get_qos());
}

@ -1 +1 @@
Subproject commit a2741afc9a204758ec047cb25c6a85c92b3298a6
Subproject commit 91c2a42d6bc0130cec32447373a0fec0def34a26