This commit is contained in:
Niklas Halle 2025-04-02 15:51:09 +02:00
parent 98cc5a6a3f
commit 50f099dbfd
6 changed files with 12237 additions and 12145 deletions

View file

@ -70,5 +70,12 @@
"typeindex": "cpp",
"typeinfo": "cpp",
"variant": "cpp"
}
},
"cmake.ignoreCMakeListsMissing": true,
"python.autoComplete.extraPaths": [
"/opt/ros/foxy/lib/python3.8/site-packages"
],
"python.analysis.extraPaths": [
"/opt/ros/foxy/lib/python3.8/site-packages"
]
}

File diff suppressed because one or more lines are too long

File diff suppressed because it is too large Load diff

View file

@ -25,6 +25,7 @@ set(EXPERIMENTS
casestudy_2024ours_latency
casestudy_2024ours_executor2executor
casestudy_example
casestudy_fire_drone
)
# Function to create experiment targets

View file

@ -0,0 +1,172 @@
#include "casestudy_tools/experiment.hpp"
#include "casestudy_tools/primes_workload.hpp"
#include "priority_executor/priority_executor.hpp"
#include "priority_executor/priority_memory_strategy.hpp"
#include <rclcpp/rclcpp.hpp>
#include <atomic>
#include <chrono>
#include <memory>
#include <thread>
#include <iostream>
#include <sys/prctl.h>
// bool should_do_task = true;
std::atomic<bool> should_do_task(true);
void run_one_executor(
std::function<void(std::atomic<bool>&)> const& exec_fun,
int const idx) {
(void)idx;
// cpu_set_t cpuset;
// CPU_ZERO(&cpuset);
// CPU_SET(0, &cpuset);
// CPU_SET(1, &cpuset);
// CPU_SET(2, &cpuset);
// CPU_SET(3, &cpuset);
// int result = sched_setaffinity(0, sizeof(cpu_set_t), &cpuset);
// if (result != 0)
// {
// std::cout << "ros_experiment: sched_setaffinity failed: " << result << ": " << strerror(errno) << std::endl;
// }
// set this process to second highest priority
struct sched_param param;
param.sched_priority = sched_get_priority_max(SCHED_FIFO) - 2;
int const result = sched_setscheduler(0, SCHED_FIFO, &param);
if (result != 0) {
std::cerr << "ros_experiment: sched_setscheduler failed: " << result
<< ": " << strerror(errno) << std::endl;
std::cerr << "This operation requires root privileges. Please run the program with sufficient permissions."
<< std::endl;
#ifndef WIN_DOCKER_IS_BROKEN
exit(EXIT_FAILURE);
#else
// Windows Docker workaround: just print the error and continue
std::cerr << "Continuing without setting scheduler, fuck Windows" << std::endl;
#endif
}
// Set the process name to "ros_experiment" for easier identification during debugging or system monitoring.
prctl(PR_SET_NAME, "ros_experiment", 0, 0, 0);
exec_fun(should_do_task);
}
int ros_experiment(
int const argc,
char** const argv,
std::string const& file_name,
ExperimentConfig config) {
rclcpp::init(argc, argv);
// we have to create a node to process any passed parameters
auto const node = rclcpp::Node::make_shared("experiment_parameters");
node->declare_parameter("executor_type", "edf");
std::string executor_type;
node->get_parameter("executor_type", executor_type);
std::cout << "using executor type: " << executor_type << std::endl;
config.executor_type = executor_type;
// Enable parallel mode for the experiment configuration.
// When set to true, this allows multiple executors to run concurrently,
// leveraging multi-threading to improve performance on multi-core systems.
config.parallel_mode = true;
config.nodes.push_back(node);
Experiment experiment(config);
// std::string result_log = experiment.run(should_do_task);
auto const exec_funs = experiment.getRunFunctions();
std::cout << "got " << exec_funs.size() << " executors" << std::endl;
std::vector<std::thread> exec_threads;
int i = 0;
experiment.resetTimers();
for (auto const& exec_fun : exec_funs) {
exec_threads.emplace_back(run_one_executor, exec_fun, i++);
}
for (auto& t : exec_threads) {
t.join();
}
std::string const outputname = "casestudy_example";
experiment.writeLogsToFile(file_name, outputname);
/*
// Create a PriorityMemoryStrategy instance
auto strat = std::make_shared<priority_executor::PriorityMemoryStrategy<>>();
// Atomic flags to control the chains
std::atomic<bool> fire_detection_running{true};
std::atomic<bool> image_processing_running{true};
std::atomic<bool> data_streaming_running{true};
std::atomic<bool> flight_control_running{true};
// Launch chains in separate threads
// Run for a fixed duration (e.g., 10 seconds)
std::this_thread::sleep_for(std::chrono::seconds(10));
// Stop all chains
fire_detection_running.store(false);
image_processing_running.store(false);
data_streaming_running.store(false);
flight_control_running.store(false);
// Join threads
fire_detection_thread.join();
image_processing_thread.join();
data_streaming_thread.join();
flight_control_thread.join();
std::cout << "All chains stopped. Exiting..." << std::endl;
*/
rclcpp::shutdown();
return 0;
}
int main(int argc, char* argv[]) {
// calibrate the dummy load for the current system
calibrate_dummy_load();
ExperimentConfig config;
config.chain_lengths = {2, 2};
config.node_ids = {{0, 1}, {2, 3}};
config.node_priorities = {1, 0, 3, 2};
config.chain_timer_control = {0, 1};
config.node_runtimes = {10, 10, 10, 10};
// node 0 has a period of 80, and is the only timer
config.chain_periods = {100, 100};
config.node_executor_assignments = {};
config.parallel_mode = true;
config.cores = 2;
sanity_check_config(config);
std::thread ros_task([&]() {
try {
ros_experiment(argc, argv, "cs_example", config);
} catch (std::exception const& e) {
std::cerr << "Exception in ros_experiment: " << e.what() << std::endl;
} catch (...) {
std::cerr << "Unknown exception in ros_experiment." << std::endl;
}
});
std::cout << "tasks started" << std::endl;
ros_task.join();
std::cout << "tasks joined" << std::endl;
return 0;
}

View file

@ -8,7 +8,8 @@
#include "casestudy_tools/test_nodes.hpp"
#include "simple_timer/rt-sched.hpp"
struct ExperimentConfig {
struct ExperimentConfig
{
std::vector<uint32_t> chain_lengths;
// TODO: rename to callback_ids
std::vector<std::vector<uint32_t>> node_ids;
@ -44,13 +45,15 @@ struct ExperimentConfig {
std::vector<std::string> special_cases;
};
struct experiment_executor {
struct experiment_executor
{
std::shared_ptr<priority_executor::TimedExecutor> executor;
std::shared_ptr<priority_executor::PriorityMemoryStrategy<>> strat;
std::shared_ptr<rclcpp::Executor> default_executor;
};
class Experiment {
class Experiment
{
public:
explicit Experiment(ExperimentConfig config);
@ -60,7 +63,7 @@ public:
* The flag will be set to false when the experiment is complete.
* @return std::string containing experiment results
*/
std::string run(std::atomic<bool>& should_do_task);
std::string run(std::atomic<bool> &should_do_task);
/**
* @brief Get the functions required to run the experiment
@ -70,7 +73,7 @@ public:
* Each function accepts a flag that can be set to false to stop the experiment.
* The flag will be set to false when the experiment is complete.
*/
std::vector<std::function<void(std::atomic<bool>&)>> getRunFunctions();
std::vector<std::function<void(std::atomic<bool> &)>> getRunFunctions();
static size_t numExecutorsRequired(ExperimentConfig const &config);
@ -98,7 +101,7 @@ private:
std::vector<experiment_executor> executors;
// Chain-indexed vector of deques of deadlines
std::vector<std::deque<uint64_t>*> chain_deadlines_deque;
std::vector<std::deque<uint64_t> *> chain_deadlines_deque;
// Maps node_id to node
std::map<uint32_t, std::shared_ptr<CaseStudyNode>> nodes;
// Maps chain_id to timer
@ -130,7 +133,7 @@ private:
node_time_logger _logger;
};
int do_other_task(double work_time, uint period, std::atomic<bool>& should_do_task);
int do_other_task(double work_time, uint period, std::atomic<bool> &should_do_task);
void sanity_check_config(ExperimentConfig const &config);
#endif // RTIS_EXPERIMENT_HOST