From 23c3cd5ceba8a475605f2b3ffc22661c8a28b948 Mon Sep 17 00:00:00 2001 From: Niklas Halle Date: Mon, 16 Jun 2025 14:52:39 +0200 Subject: [PATCH] add boosted flag (int) to the experiment pipeline --- build_run_copy.sh | 23 ++++++++--- src/full_topology/CMakeLists.txt | 4 ++ src/full_topology/src/full_topology.cpp | 16 ++++--- src/full_topology/src/nodes.hpp | 55 +++++++++++++++++++++++++ 4 files changed, 87 insertions(+), 11 deletions(-) diff --git a/build_run_copy.sh b/build_run_copy.sh index 6a3003b..701bead 100755 --- a/build_run_copy.sh +++ b/build_run_copy.sh @@ -11,14 +11,15 @@ LOCAL_PROJECT_DIR="/home/niklas/ROS-Dynamic-Executor-Experiments" # -------------------------------------- usage() { - echo "Usage: $0 [time_in_seconds] [no_rebuild]" + echo "Usage: $0 [boosted] [time_in_seconds] [no_rebuild]" echo "Example: $0 ros multi timed 10" echo "This script builds the full_topology package, runs a trace, and copies the results to a remote server." echo "Parameters:" echo " : Scheduler type (ros, edf, sem)" echo " : Threading type (single, multi)" echo " : Fusion type (timed, direct)" - echo " : Length of the trace (default: 10)" + echo " [boosted] : Optional flag to enable boosted mode (default: 1000, equals to disabled)" + echo " [time_in_seconds] : Length of the trace (default: 10)" echo " [no_rebuild] : Optional flag to skip rebuilding the package (true/false, default: false)" exit 1 } @@ -30,19 +31,29 @@ fi SCHEDULER_TYPE="$1" THREADING_TYPE="$2" FUSION_TYPE="$3" -TRACE_LENGTH="${4:-10}" -NO_REBUILD="${5:-false}" +BOOSTED="${4:-1000}" # Default to 1000 if not provided +TRACE_LENGTH="${5:-10}" +NO_REBUILD="${6:-false}" + +# When boosted is not set to 1000, we assume it is enabled +if [[ "$BOOSTED" != "1000" ]]; then + BOOSTED_FLAG="_boosted_${BOOSTED}" +else + BOOSTED_ENABLED="" +fi # Determine defines based on parameters CMAKE_ARGS=() [[ "$SCHEDULER_TYPE" == "ros" ]] && CMAKE_ARGS+=("-DROS_DEFAULT_EXECUTOR=ON") [[ "$SCHEDULER_TYPE" == "edf" ]] && CMAKE_ARGS+=("-DEDF_PRIORITY_EXECUTOR=ON") -[[ "$SCHEDULER_TYPE" == "sem" ]] && CMAKE_ARGS+=("-DSEMANTIC_EXECUTOR=ON") +[[ "$SCHEDULER_TYPE" == "sem" ]] && CMAKE_ARGS+=("-DEDF_PRIORITY_EXECUTOR=ON") && CMAKE_ARGS+=("-DSEMANTIC_EXECUTOR=ON") [[ "$THREADING_TYPE" == "multi" ]] && CMAKE_ARGS+=("-DMULTI_THREADED=ON") [[ "$FUSION_TYPE" == "timed" ]] && CMAKE_ARGS+=("-DUSE_TIMER_IN_FUSION_NODES=ON") +# We always hand the boosted value down +CMAKE_ARGS+=("-DBOOSTED=$BOOSTED") -TRACE_NAME="${SCHEDULER_TYPE}_${THREADING_TYPE}_${FUSION_TYPE}_${TRACE_LENGTH}" +TRACE_NAME="${SCHEDULER_TYPE}_${THREADING_TYPE}_${FUSION_TYPE}_${TRACE_LENGTH}${BOOSTED_FLAG}" cd "$LOCAL_PROJECT_DIR" diff --git a/src/full_topology/CMakeLists.txt b/src/full_topology/CMakeLists.txt index b2f50c7..0d9217e 100644 --- a/src/full_topology/CMakeLists.txt +++ b/src/full_topology/CMakeLists.txt @@ -50,6 +50,10 @@ if(USE_TIMER_IN_FUSION_NODES) add_compile_definitions(USE_TIMER_IN_FUSION_NODES) endif() +set(BOOSTED "1000" CACHE STRING "Boosted value for C++") +message(NOTICE "BOOSTED: ${BOOSTED}") +add_compile_definitions(BOOSTED=${BOOSTED}) + add_compile_definitions(RCLCPP_ENABLE_TRACEPOINTS) add_executable(${PROJECT_NAME} src/full_topology.cpp) diff --git a/src/full_topology/src/full_topology.cpp b/src/full_topology/src/full_topology.cpp index dd1f88a..8f88960 100644 --- a/src/full_topology/src/full_topology.cpp +++ b/src/full_topology/src/full_topology.cpp @@ -87,10 +87,7 @@ int main(int argc, char **argv) DECLARE_CHAIN(2); // Chain 3: Flight Control DECLARE_CHAIN(3); // Chain 4: Data Storage/Streaming -#if defined(SEMANTIC_EXECUTOR) - std::cerr << "Not yet implemented" << std::endl; - return -1; -#elif defined(EDF_PRIORITY_EXECUTOR) +#if defined(EDF_PRIORITY_EXECUTOR) // Create a priority executor with a custom memory strategy auto strategy = std::make_shared>(); rclcpp::ExecutorOptions options; @@ -185,7 +182,7 @@ int main(int argc, char **argv) // --- chain_id = 1; - chain_period = 1000; + chain_period = BOOSTED; { strategy->set_executable_deadline(cameraB->timer_->get_timer_handle(), chain_period, priority_executor::ExecutableType::TIMER, chain_id); strategy->get_priority_settings(cameraB->timer_->get_timer_handle())->timer_handle = cameraB->timer_; @@ -194,6 +191,15 @@ int main(int argc, char **argv) strategy->set_executable_deadline(radiometricB->subscription_->get_subscription_handle(), chain_period, priority_executor::ExecutableType::SUBSCRIPTION, chain_id); strategy->set_executable_deadline(smoke_classifier->subscription_->get_subscription_handle(), chain_period, priority_executor::ExecutableType::SUBSCRIPTION, chain_id); strategy->set_last_in_chain(smoke_classifier->subscription_->get_subscription_handle()); +#if defined(SEMANTIC_EXECUTOR) + smoke_classifier->chain_id_ = chain_id; + smoke_classifier->strategy_ = strategy; + smoke_classifier->executables_.reserve(4); + smoke_classifier->executables_.push_back({cameraB->timer_->get_timer_handle(), priority_executor::ExecutableType::TIMER}); + smoke_classifier->executables_.push_back({debayerB->subscription_->get_subscription_handle(), priority_executor::ExecutableType::SUBSCRIPTION}); + smoke_classifier->executables_.push_back({radiometricB->subscription_->get_subscription_handle(), priority_executor::ExecutableType::SUBSCRIPTION}); + smoke_classifier->executables_.push_back({smoke_classifier->subscription_->get_subscription_handle(), priority_executor::ExecutableType::SUBSCRIPTION}); +#endif } // --- diff --git a/src/full_topology/src/nodes.hpp b/src/full_topology/src/nodes.hpp index c073aea..f7b59b3 100644 --- a/src/full_topology/src/nodes.hpp +++ b/src/full_topology/src/nodes.hpp @@ -12,6 +12,10 @@ using namespace std::chrono_literals; #include "rclcpp/rclcpp.hpp" #include "std_msgs/msg/string.hpp" +#include "priority_executor/priority_executor.hpp" +#include "priority_executor/multithread_priority_executor.hpp" +#include "priority_executor/priority_memory_strategy.hpp" + // A generic sensor node that publishes at a given rate + jitter #define DEFINE_SENSOR_NODE_CLASS(NAME) \ class NAME : public rclcpp::Node \ @@ -180,7 +184,58 @@ DEFINE_PROCESSING_NODE_CLASS(RadiometricNode) DEFINE_PROCESSING_NODE_CLASS(GeometricNode) DEFINE_PROCESSING_NODE_CLASS(MappingNode) +#ifdef SEMANTIC_EXECUTOR +class SmokeClassifierNode : public rclcpp::Node +{ +public: + SmokeClassifierNode(const std::string & name, + const std::string & input_topic, + const std::string & output_topic, + int delay_ms, + double jitter_ms = 0.0) + : Node(name), delay_ms_(delay_ms), jitter_ms_(jitter_ms) + { + subscription_ = create_subscription( + input_topic, 10, + [this, name](std_msgs::msg::String::SharedPtr msg) + { + auto now = std::chrono::steady_clock::now(); + auto elapsed = std::chrono::duration_cast(now - construction_time_).count(); + if (elapsed > 10) { + for (auto const & [exec, type] : executables_) { + strategy_->set_executable_deadline(exec, 10, type, chain_id_); + } + } + double jitter = 0.0; + if (jitter_ms_ > 0.0) { + static thread_local std::mt19937 gen(std::random_device{}());\ + std::normal_distribution<> d(0.0, jitter_ms_); + jitter = d(gen); + } + std::this_thread::sleep_for(std::chrono::milliseconds((int)(delay_ms_ + jitter))); + auto out = std_msgs::msg::String(); + out.data = msg->data + " -> " + name; + publisher_->publish(out); + }); + publisher_ = create_publisher(output_topic, 10); + } + + rclcpp::Subscription::SharedPtr subscription_; + rclcpp::Publisher::SharedPtr publisher_; + + // keep a strategy pointer and a vector of executables + std::shared_ptr > strategy_; + std::vector, priority_executor::ExecutableType> > executables_; + int chain_id_ = -1; +protected: + int delay_ms_; + double jitter_ms_; + // record our construction time stamp + std::chrono::steady_clock::time_point construction_time_ = std::chrono::steady_clock::now(); +}; +#else DEFINE_PROCESSING_NODE_CLASS(SmokeClassifierNode) +#endif DEFINE_SENSOR_NODE_CLASS(GPSNode) DEFINE_SENSOR_NODE_CLASS(IMUNode)