From 20c7abdff66348b9df9407d34e2eddd19c4f97aa Mon Sep 17 00:00:00 2001 From: Niklas Halle Date: Wed, 30 Jul 2025 15:10:07 +0200 Subject: [PATCH] seperate chains completely, fix simple example --- batch_run.sh | 2 +- src/full_topology/src/full_topology.cpp | 83 ++++++++++++++----------- src/full_topology/src/simple.cpp | 51 ++++++++++----- src/ros_edf | 2 +- 4 files changed, 83 insertions(+), 55 deletions(-) diff --git a/batch_run.sh b/batch_run.sh index 0d8b460..5650f54 100755 --- a/batch_run.sh +++ b/batch_run.sh @@ -62,4 +62,4 @@ for time_mode in $TIMER_MODES; do run_test_suite "edf" "$boost" "$time_mode" "$thread_mode" done done -done \ No newline at end of file +done diff --git a/src/full_topology/src/full_topology.cpp b/src/full_topology/src/full_topology.cpp index d5ba77a..e260777 100644 --- a/src/full_topology/src/full_topology.cpp +++ b/src/full_topology/src/full_topology.cpp @@ -33,12 +33,17 @@ #endif #ifdef EDF_PRIORITY_EXECUTOR - #define CHAIN_DURATION(CHAIN_ID) chain_duration_##CHAIN_ID - #define DECLARE_CHAIN(CHAIN_ID) double CHAIN_DURATION(CHAIN_ID) = 0.0; - #define SUM_CHAIN_DURATION(CHAIN_ID, NAME) \ - PRINT_BEFORE(CHAIN_ID, NAME); \ - CHAIN_DURATION(CHAIN_ID) += NAME##_duration + NAME##_jitter; \ - PRINT_AFTER(CHAIN_ID, NAME); + #ifdef DEBUG + #define CHAIN_DURATION(CHAIN_ID) chain_duration_##CHAIN_ID + #define DECLARE_CHAIN(CHAIN_ID) double CHAIN_DURATION(CHAIN_ID) = 0.0; + #define SUM_CHAIN_DURATION(CHAIN_ID, NAME) \ + PRINT_BEFORE(CHAIN_ID, NAME); \ + CHAIN_DURATION(CHAIN_ID) += NAME##_duration + NAME##_jitter; \ + PRINT_AFTER(CHAIN_ID, NAME); + #else + #define DECLARE_CHAIN(CHAIN_ID) + #define SUM_CHAIN_DURATION(CHAIN_ID, NAME) + #endif #else #define DECLARE_CHAIN(CHAIN_ID) #define SUM_CHAIN_DURATION(CHAIN_ID, NAME) @@ -114,49 +119,52 @@ int main(int argc, char **argv) #endif #endif + int chain_id = 0; // Chain 1: Image2Ground Mapping - MAKE_INPUT_NODE(0, cameraA, CameraNode, "/input/cameraA/raw", 200); // 60 FPS camera, fast - MAKE_PROCESSING_NODE(0, debayerA, DebayerNode, "/input/cameraA/raw", "/cameraA/debayered", 18); // Moderate, image conversion - MAKE_PROCESSING_NODE(0, radiometricA, RadiometricNode, "/cameraA/debayered", "/cameraA/radiometric", 22); // Heavier image math - MAKE_PROCESSING_NODE(0, geometric, GeometricNode, "/cameraA/radiometric", "/cameraA/geometric", 12); // Perspective transform, medium - MAKE_OUTPUT_NODE(0, mapping, MappingNode, "/cameraA/geometric", "/output/cameraA/mapped", 16); // Warping/stitching, moderate + MAKE_INPUT_NODE(chain_id, cameraA, CameraNode, "/input/cameraA/raw", 200); + MAKE_PROCESSING_NODE(chain_id, debayerA, DebayerNode, "/input/cameraA/raw", "/cameraA/debayered", 18); + MAKE_PROCESSING_NODE(chain_id, radiometricA, RadiometricNode, "/cameraA/debayered", "/cameraA/radiometric", 22); + MAKE_PROCESSING_NODE(chain_id, geometric, GeometricNode, "/cameraA/radiometric", "/cameraA/geometric", 12); + MAKE_OUTPUT_NODE(chain_id, mapping, MappingNode, "/cameraA/geometric", "/output/cameraA/mapped", 16); // Chain 2: Semantic Scheduler Adaptation - MAKE_INPUT_NODE(1, cameraB, CameraNode, "/input/cameraB/raw", 150); // 60 FPS camera, fast - MAKE_PROCESSING_NODE(1, debayerB, DebayerNode, "/input/cameraB/raw", "/cameraB/debayered", 18); // Moderate, image conversion - MAKE_PROCESSING_NODE(1, radiometricB, RadiometricNode, "/cameraB/debayered", "/cameraB/radiometric", 22); // Heavier image math - MAKE_OUTPUT_NODE(1, smoke_classifier, SmokeClassifierNode, "/cameraB/radiometric", "/output/classifier/classification", 35); // CNN/ML classifier, can be high + chain_id = 1; + MAKE_INPUT_NODE(chain_id, cameraB, CameraNode, "/input/cameraB/raw", 150); + MAKE_PROCESSING_NODE(chain_id, debayerB, DebayerNode, "/input/cameraB/raw", "/cameraB/debayered", 18); + MAKE_PROCESSING_NODE(chain_id, radiometricB, RadiometricNode, "/cameraB/debayered", "/cameraB/radiometric", 22); + MAKE_OUTPUT_NODE(chain_id, smoke_classifier, SmokeClassifierNode, "/cameraB/radiometric", "/output/classifier/classification", 35); // Chain 3: Flight Control + chain_id = 2; std::vector fusionA_input_topics = {"/input/gpsA/fix", "/input/imuA/data", "/input/baroA/alt"}; - MAKE_INPUT_NODE(2, gpsA, GPSNode, "/input/gpsA/fix", 100); // Fast, low latency sensor - MAKE_INPUT_NODE(2, imuA, IMUNode, "/input/imuA/data", 100); // Fast, low latency sensor - MAKE_INPUT_NODE(2, baroA, BaroNode, "/input/baroA/alt", 100); // Fast, low latency sensor - MAKE_PROCESSING_NODE(2, fusionA, FusionNode, fusionA_input_topics, "/sensorsA/fused", 14); // Sensor fusion, not trivial but not heavy + MAKE_INPUT_NODE(chain_id, gpsA, GPSNode, "/input/gpsA/fix", 100); + MAKE_INPUT_NODE(chain_id, imuA, IMUNode, "/input/imuA/data", 100); + MAKE_INPUT_NODE(chain_id, baroA, BaroNode, "/input/baroA/alt", 100); + MAKE_PROCESSING_NODE(chain_id, fusionA, FusionNode, fusionA_input_topics, "/sensorsA/fused", 14); - MAKE_INPUT_NODE(2, lidar, LidarNode, "/input/lidar/scan", 100); // LIDAR driver, can be fast - MAKE_INPUT_NODE(2, cmd, CommandNode, "/input/operator/commands", 100); // Operator commands, fast + MAKE_INPUT_NODE(chain_id, lidar, LidarNode, "/input/lidar/scan", 100); + MAKE_INPUT_NODE(chain_id, cmd, CommandNode, "/input/operator/commands", 100); std::vector mgmt_input_topics = {"/sensorsA/fused", "/input/lidar/scan", "/input/operator/commands"}; - MAKE_PROCESSING_NODE(2, mgmt, FlightManagementNode, mgmt_input_topics, "/flight/plan", 18); // Path planning, moderate - //MAKE_PROCESSING_NODE(2, mgmt, FlightManagementNodeS, "/input/operator/commands", "/flight/plan", 18); // Path planning, moderate - MAKE_OUTPUT_NODE(2, control, ControlNode, "/flight/plan", "/output/flight/cmd", 12); // PID controller, fast but non-zero + MAKE_PROCESSING_NODE(chain_id, mgmt, FlightManagementNode, mgmt_input_topics, "/flight/plan", 18); + //MAKE_PROCESSING_NODE(chain_id, mgmt, FlightManagementNodeS, "/input/operator/commands", "/flight/plan", 18); + MAKE_OUTPUT_NODE(chain_id, control, ControlNode, "/flight/plan", "/output/flight/cmd", 12); // Chain 4: Data Storage/Streaming + chain_id = 3; std::vector fusionB_input_topics = {"/input/gpsB/fix", "/input/imuB/data", "/input/baroB/alt"}; - MAKE_INPUT_NODE(3, gpsB, GPSNode, "/input/gpsB/fix", 100); // Fast, low latency sensor - MAKE_INPUT_NODE(3, imuB, IMUNode, "/input/imuB/data", 100); // Fast, low latency sensor - MAKE_INPUT_NODE(3, baroB, BaroNode, "/input/baroB/alt", 100); // Fast, low latency sensor - MAKE_PROCESSING_NODE(3, fusionB, FusionNode, fusionB_input_topics, "/sensorsB/fused", 14); // Sensor fusion, not trivial but not heavy - //MAKE_PROCESSING_NODE(3, fusionB, FusionNodeS, "/input/baroB/alt", "/sensorsB/fused", 14); // Sensor fusion, not trivial but not heavy + MAKE_INPUT_NODE(chain_id, gpsB, GPSNode, "/input/gpsB/fix", 100); // Fast, low latency sensor + MAKE_INPUT_NODE(chain_id, imuB, IMUNode, "/input/imuB/data", 100); // Fast, low latency sensor + MAKE_INPUT_NODE(chain_id, baroB, BaroNode, "/input/baroB/alt", 100); // Fast, low latency sensor + MAKE_PROCESSING_NODE(chain_id, fusionB, FusionNode, fusionB_input_topics, "/sensorsB/fused", 14); + //MAKE_PROCESSING_NODE(chain_id, fusionB, FusionNodeS, "/input/baroB/alt", "/sensorsB/fused", 14); - std::vector telemetry_input_topics = {"/sensorsB/fused", "/output/cameraA/mapped"}; - MAKE_PROCESSING_NODE(3, telemetry, TelemetryNode, telemetry_input_topics, "/telemetry/data", 6); // Quick serialization, low latency - //MAKE_PROCESSING_NODE(3, telemetry, TelemetryNodeS, "/sensorsB/fused", "/telemetry/data", 6); // Quick serialization, low latency - MAKE_OUTPUT_NODE(3, radio, RadioNode, "/telemetry/data", "/output/telemetry/radio", 3); // Output, almost instant + //std::vector telemetry_input_topics = {"/sensorsB/fused", "/output/cameraA/mapped"}; + //MAKE_PROCESSING_NODE(chain_id, telemetry, TelemetryNode, telemetry_input_topics, "/telemetry/data", 6); + MAKE_PROCESSING_NODE(chain_id, telemetryS, TelemetryNodeS, "/sensorsB/fused", "/telemetry/data", 6); + MAKE_OUTPUT_NODE(chain_id, radio, RadioNode, "/telemetry/data", "/output/telemetry/radio", 3); #if defined(EDF_PRIORITY_EXECUTOR) int chain_period; - int chain_id; // --- chain_id = 0; @@ -179,7 +187,7 @@ int main(int argc, char **argv) // you _must_ mark the last executable in the chain (used to keep track of different instances of the same chain) strategy->set_last_in_chain(mapping->subscription_->get_subscription_handle()); strategy->get_priority_settings(mapping->subscription_->get_subscription_handle())->timer_handle = cameraA->timer_; -} + } // --- chain_id = 1; @@ -233,7 +241,8 @@ int main(int argc, char **argv) strategy->set_executable_deadline(imuB->timer_->get_timer_handle(), chain_period, priority_executor::ExecutableType::TIMER, chain_id, "imuB"); strategy->set_executable_deadline(gpsB->timer_->get_timer_handle(), chain_period, priority_executor::ExecutableType::TIMER, chain_id, "gpsB"); REGISTER_MULTI_PROCESSING_NODE(fusionB, chain_period, chain_id); - REGISTER_MULTI_PROCESSING_NODE(telemetry, chain_period, chain_id); + //REGISTER_MULTI_PROCESSING_NODE(telemetry, chain_period, chain_id); + strategy->set_executable_deadline(telemetryS->subscription_->get_subscription_handle(), chain_period, priority_executor::ExecutableType::SUBSCRIPTION, chain_id, "telemetryS"); strategy->set_executable_deadline(radio->subscription_->get_subscription_handle(), chain_period, priority_executor::ExecutableType::SUBSCRIPTION, chain_id, "radio"); strategy->set_last_in_chain(radio->subscription_->get_subscription_handle()); strategy->get_priority_settings(radio->subscription_->get_subscription_handle())->timer_handle = baroB->timer_; @@ -261,7 +270,7 @@ int main(int argc, char **argv) executor.add_node(baroB); executor.add_node(fusionB); - executor.add_node(telemetry); + executor.add_node(telemetryS); executor.add_node(radio); executor.add_node(lidar); diff --git a/src/full_topology/src/simple.cpp b/src/full_topology/src/simple.cpp index b6c39c4..a42ae60 100644 --- a/src/full_topology/src/simple.cpp +++ b/src/full_topology/src/simple.cpp @@ -6,13 +6,12 @@ #include #include #include -// the order here matters + #include "tracetools/utils.hpp" #include "rclcpp/rclcpp.hpp" #include "std_msgs/msg/string.hpp" -#include "config.hpp" #include "nodes.hpp" #include "priority_executor/priority_executor.hpp" @@ -35,12 +34,17 @@ #define PRINT_AFTER(CHAIN_ID, NAME) #endif -#define CHAIN_DURATION(CHAIN_ID) chain_duration_##CHAIN_ID -#define DECLARE_CHAIN(CHAIN_ID) double CHAIN_DURATION(CHAIN_ID) = 0.0; -#define SUM_CHAIN_DURATION(CHAIN_ID, NAME) \ -PRINT_BEFORE(CHAIN_ID, NAME); \ -CHAIN_DURATION(CHAIN_ID) += NAME##_duration + NAME##_jitter; \ -PRINT_AFTER(CHAIN_ID, NAME); +#ifdef EDF_PRIORITY_EXECUTOR + #define CHAIN_DURATION(CHAIN_ID) chain_duration_##CHAIN_ID + #define DECLARE_CHAIN(CHAIN_ID) double CHAIN_DURATION(CHAIN_ID) = 0.0; + #define SUM_CHAIN_DURATION(CHAIN_ID, NAME) \ + PRINT_BEFORE(CHAIN_ID, NAME); \ + CHAIN_DURATION(CHAIN_ID) += NAME##_duration + NAME##_jitter; \ + PRINT_AFTER(CHAIN_ID, NAME); +#else + #define DECLARE_CHAIN(CHAIN_ID) + #define SUM_CHAIN_DURATION(CHAIN_ID, NAME) +#endif #define MAKE_INPUT_NODE(CHAIN_ID, NAME, CLASS, TOPIC, PERIOD) \ auto NAME##_duration = (PERIOD); \ @@ -60,6 +64,18 @@ PRINT_AFTER(CHAIN_ID, NAME); auto NAME = std::make_shared(std::string("output_" STR(NAME) "_node"), INPUT_TOPIC, OUTPUT_TOPIC, NAME##_duration, NAME##_jitter); \ SUM_CHAIN_DURATION(CHAIN_ID, NAME) +#ifdef USE_TIMER_IN_FUSION_NODES + #define REGISTER_MULTI_PROCESSING_NODE(NAME, CHAIN_PERIOD, CHAIN_ID) \ + for (auto const &sub : NAME->subscriptions_) { \ + strategy->set_executable_deadline(sub->get_subscription_handle(), CHAIN_PERIOD, priority_executor::ExecutableType::SUBSCRIPTION, CHAIN_ID, STR(NAME)); \ + } \ + strategy->set_executable_deadline(NAME->timer_->get_timer_handle(), CHAIN_PERIOD, priority_executor::ExecutableType::TIMER, CHAIN_ID, STR(NAME)); +#else + #define REGISTER_MULTI_PROCESSING_NODE(NAME, CHAIN_PERIOD, CHAIN_ID) \ + for (auto const &sub : NAME->subscriptions_) { \ + strategy->set_executable_deadline(sub->get_subscription_handle(), CHAIN_PERIOD, priority_executor::ExecutableType::SUBSCRIPTION, CHAIN_ID, STR(NAME)); \ + } +#endif int main(int argc, char **argv) { @@ -76,8 +92,10 @@ int main(int argc, char **argv) // Must be set to post_execute can set new deadlines executor.prio_memory_strategy_ = strategy; + double fps = 60.0; // Frames per second for the camera + double period = 1000.0 / fps; // Period in milliseconds // chain id, node name, node class, input topic, output topic, duration in ms - MAKE_INPUT_NODE( 0, cameraA , CameraNode , /*has no input topic*/ "/input/cameraA/raw" , 1000.0/2.0); // 60 FPS camera, fast + MAKE_INPUT_NODE( 0, cameraA , CameraNode , /*has no input topic*/ "/input/cameraA/raw" , period); // 60 FPS camera, fast MAKE_PROCESSING_NODE(0, debayerA , DebayerNode , "/input/cameraA/raw" , "/cameraA/debayered" , 18); // Moderate, image conversion MAKE_PROCESSING_NODE(0, radiometricA, RadiometricNode, "/cameraA/debayered" , "/cameraA/radiometric" , 22); // Heavier image math MAKE_PROCESSING_NODE(0, geometric , GeometricNode , "/cameraA/radiometric", "/cameraA/geometric" , 12); // Perspective transform, medium @@ -85,21 +103,22 @@ int main(int argc, char **argv) // the new funcitons in PriorityMemoryStrategy accept the handle of the // timer/subscription as the first argument - strategy->set_executable_deadline(cameraA->timer_->get_timer_handle() , 1000, priority_executor::ExecutableType::TIMER , 0); - // you _must_ set the timer_handle for each chain - strategy->get_priority_settings(cameraA->timer_->get_timer_handle())->timer_handle = cameraA->timer_; + strategy->set_executable_deadline(cameraA->timer_->get_timer_handle() , period, priority_executor::ExecutableType::TIMER , 0); // you _must_ mark the first executable in the chain strategy->set_first_in_chain(cameraA->timer_->get_timer_handle()); // Set the executable deadlines for each node - strategy->set_executable_deadline(debayerA->subscription_->get_subscription_handle() , 1000, priority_executor::ExecutableType::SUBSCRIPTION, 0); - strategy->set_executable_deadline(radiometricA->subscription_->get_subscription_handle(), 1000, priority_executor::ExecutableType::SUBSCRIPTION, 0); - strategy->set_executable_deadline(geometric->subscription_->get_subscription_handle() , 1000, priority_executor::ExecutableType::SUBSCRIPTION, 0); - strategy->set_executable_deadline(mapping->subscription_->get_subscription_handle() , 1000, priority_executor::ExecutableType::SUBSCRIPTION, 0); + strategy->set_executable_deadline(debayerA->subscription_->get_subscription_handle() , period, priority_executor::ExecutableType::SUBSCRIPTION, 0); + strategy->set_executable_deadline(radiometricA->subscription_->get_subscription_handle(), period, priority_executor::ExecutableType::SUBSCRIPTION, 0); + strategy->set_executable_deadline(geometric->subscription_->get_subscription_handle() , period, priority_executor::ExecutableType::SUBSCRIPTION, 0); + strategy->set_executable_deadline(mapping->subscription_->get_subscription_handle() , period, priority_executor::ExecutableType::SUBSCRIPTION, 0); // you _must_ mark the last executable in the chain (used to keep track of different instances of the same chain) strategy->set_last_in_chain(mapping->subscription_->get_subscription_handle()); + // you _must_ set the timer_handle of the last callback in the chain to the timer of the first node + strategy->get_priority_settings(mapping->subscription_->get_subscription_handle())->timer_handle = cameraA->timer_; + executor.add_node(cameraA); executor.add_node(debayerA); executor.add_node(radiometricA); diff --git a/src/ros_edf b/src/ros_edf index 5f73198..f62e64c 160000 --- a/src/ros_edf +++ b/src/ros_edf @@ -1 +1 @@ -Subproject commit 5f73198adbabb3a34985fb5e5c7f84b5e4149a86 +Subproject commit f62e64cb565b8c6f422c8033691c8099e075fe0f