From 8cd5b03b033f557f24c7cc2cfbd3ab22ac0dd827 Mon Sep 17 00:00:00 2001 From: Niklas Halle Date: Mon, 9 Jun 2025 15:24:34 +0200 Subject: [PATCH] got ros and edf to work, built startup script for reliable experimenting --- build.sh | 8 +- build_run_copy.sh | 71 ++- src/full_topology/CMakeLists.txt | 32 +- .../launch/trace_full_topology.launch.py | 19 +- src/full_topology/package.xml | 1 + src/full_topology/src/full_topology.cpp | 498 +++++++++++------- src/full_topology/src/nodes.hpp | 203 +++++++ src/full_topology/src/simple.cpp | 125 +++++ src/ros_edf | 2 +- 9 files changed, 745 insertions(+), 214 deletions(-) create mode 100644 src/full_topology/src/nodes.hpp create mode 100644 src/full_topology/src/simple.cpp diff --git a/build.sh b/build.sh index e04b80d..2fb9f8c 100755 --- a/build.sh +++ b/build.sh @@ -1,5 +1,7 @@ #!/usr/bin/env bash +export RMW_IMPLEMENTATION=rmw_cyclonedds_cpp + colcon build --packages-select tracetools --allow-overriding tracetools && \ source install/setup.bash && \ colcon build --packages-select cyclonedds --allow-overriding cyclonedds && \ @@ -10,7 +12,9 @@ colcon build --packages-select tracetools --allow-overriding tracetools && \ source install/setup.bash && \ colcon build --packages-select rclcpp --allow-overriding rclcpp && \ source install/setup.bash && \ - colcon build --packages-up-to full_topology && \ + colcon build --packages-select simple_timer priority_executor && \ + source install/setup.bash && \ + colcon build --packages-up-to ros2trace ros2trace_analysis full_topology && \ source install/setup.bash -export RMW_IMPLEMENTATION=rmw_cyclonedds_cpp +export RMW_IMPLEMENTATION=rmw_cyclonedds_cpp \ No newline at end of file diff --git a/build_run_copy.sh b/build_run_copy.sh index 3e3e598..5a6d7bc 100755 --- a/build_run_copy.sh +++ b/build_run_copy.sh @@ -1,9 +1,66 @@ #!/usr/bin/env bash -colcon build --packages-select full_topology && \ - source install/setup.bash && \ - TRACE_ID=$(taskset -c 2,3 ros2 launch full_topology trace_full_topology.launch.py length:=100 | \ - grep -oP '(?<=Writing tracing session to: /home/niklas/ROS-Dynamic-Executor-Experiments/analysis/tracing/full_topology_tracing-)\d+') && \ - ros2 trace-analysis convert "/home/niklas/ROS-Dynamic-Executor-Experiments/analysis/tracing/full_topology_tracing-${TRACE_ID}/ust" && \ - scp -r "analysis/tracing/full_topology_tracing-${TRACE_ID}/" 192.168.162.249:/home/niklas/dataflow-analysis/traces/ && \ - echo "Trace analysis \"${TRACE_ID}\" complete and copied to remote server." +set -e + +# ----------- CONFIGURABLES ----------- +REMOTE_USER="niklas" +REMOTE_HOST="192.168.162.249" +REMOTE_PATH="/home/niklas/dataflow-analysis/traces" +TRACING_BASE="analysis/tracing" +LOCAL_PROJECT_DIR="/home/niklas/ROS-Dynamic-Executor-Experiments" +# -------------------------------------- + +usage() { + echo "Usage: $0 [time_in_seconds]" + exit 1 +} + +if [[ $# -lt 3 ]]; then + usage +fi + +SCHEDULER_TYPE="$1" +THREADING_TYPE="$2" +FUSION_TYPE="$3" +TRACE_LENGTH="${4:-10}" + +# Determine defines based on parameters +CMAKE_ARGS=() +[[ "$SCHEDULER_TYPE" == "ros" ]] && CMAKE_ARGS+=("-DROS_DEFAULT_EXECUTOR=ON") +[[ "$SCHEDULER_TYPE" == "edf" ]] && CMAKE_ARGS+=("-DEDF_PRIORITY_EXECUTOR=ON") +[[ "$SCHEDULER_TYPE" == "sem" ]] && CMAKE_ARGS+=("-DSEMANTIC_EXECUTOR=ON") + +[[ "$THREADING_TYPE" == "multi" ]] && CMAKE_ARGS+=("-DMULTI_THREADED=ON") +[[ "$FUSION_TYPE" == "timed" ]] && CMAKE_ARGS+=("-DUSE_TIMER_IN_FUSION_NODES=ON") + +TRACE_NAME="${SCHEDULER_TYPE}_${THREADING_TYPE}_${FUSION_TYPE}_${TRACE_LENGTH}" + +cd "$LOCAL_PROJECT_DIR" + +colcon build --packages-select full_topology --cmake-args "${CMAKE_ARGS[@]}" + +source install/setup.bash + +TRACE_ID=$(taskset -c 2,3 ros2 launch full_topology trace_full_topology.launch.py "length:=${TRACE_LENGTH}" | \ + grep -oP '(?<=Writing tracing session to: /home/niklas/ROS-Dynamic-Executor-Experiments/analysis/tracing/)[^\s]+') + +if [[ -z "$TRACE_ID" ]]; then + echo "[ERROR] Failed to get tracing session directory. Aborting." + exit 2 +fi + +TIMESTAMP_SUFFIX="${TRACE_ID#trace_full_topology}" +NEW_TRACE_ID="${TRACE_NAME}${TIMESTAMP_SUFFIX}" +TRACE_ORIG_PATH="${TRACING_BASE}/${TRACE_ID}" +TRACE_NEW_PATH="${TRACING_BASE}/${NEW_TRACE_ID}" + +ros2 trace-analysis convert "${TRACE_ORIG_PATH}/ust" + +if [[ -e "$TRACE_NEW_PATH" ]]; then + rm -rf "$TRACE_NEW_PATH" +fi +mv "$TRACE_ORIG_PATH" "$TRACE_NEW_PATH" + +scp -r "$TRACE_NEW_PATH/" "$REMOTE_USER@$REMOTE_HOST:$REMOTE_PATH/" + +echo "[SUCCESS] Trace analysis '$NEW_TRACE_ID' complete and copied to remote server." diff --git a/src/full_topology/CMakeLists.txt b/src/full_topology/CMakeLists.txt index b3f816f..0ae7f0c 100644 --- a/src/full_topology/CMakeLists.txt +++ b/src/full_topology/CMakeLists.txt @@ -12,14 +12,43 @@ if(CMAKE_COMPILER_IS_GNUCXX OR CMAKE_CXX_COMPILER_ID MATCHES "Clang") endif() # find dependencies +find_package(priority_executor REQUIRED) find_package(ament_cmake REQUIRED) find_package(rclcpp REQUIRED) find_package(std_msgs REQUIRED) +# Of the following three, only one should be one at any given time. If none is set, ros default will be used +option(ROS_DEFAULT_EXECUTOR "Use ROS default executor" OFF) +option(EDF_PRIORITY_EXECUTOR "Use EDF priority executor" OFF) +option(SEMANTIC_EXECUTOR "Use semantic executor" OFF) + +option(MULTI_THREADED "Use multi-threading" OFF) +option(USE_TIMER_IN_FUSION_NODES "Use timers in fusion nodes" OFF) + +if(ROS_DEFAULT_EXECUTOR) + add_compile_definitions(ROS_DEFAULT_EXECUTOR) +endif() + +if(EDF_PRIORITY_EXECUTOR) + add_compile_definitions(EDF_PRIORITY_EXECUTOR) +endif() + +if(SEMANTIC_EXECUTOR) + add_compile_definitions(SEMANTIC_EXECUTOR) +endif() + +if(MULTI_THREADED) + add_compile_definitions(MULTI_THREADED) +endif() + +if(USE_TIMER_IN_FUSION_NODES) + add_compile_definitions(USE_TIMER_IN_FUSION_NODES) +endif() + add_compile_definitions(RCLCPP_ENABLE_TRACEPOINTS) add_executable(${PROJECT_NAME} src/full_topology.cpp) -#add_executable(${PROJECT_NAME} src/test.cpp) +#add_executable(${PROJECT_NAME} src/simple.cpp) target_include_directories(${PROJECT_NAME} PUBLIC $ $ @@ -27,6 +56,7 @@ target_include_directories(${PROJECT_NAME} PUBLIC # Add dependencies with ament ament_target_dependencies(${PROJECT_NAME} + priority_executor rclcpp std_msgs tracetools diff --git a/src/full_topology/launch/trace_full_topology.launch.py b/src/full_topology/launch/trace_full_topology.launch.py index 2a40d95..073a56f 100644 --- a/src/full_topology/launch/trace_full_topology.launch.py +++ b/src/full_topology/launch/trace_full_topology.launch.py @@ -17,7 +17,7 @@ def generate_launch_description(): return launch.LaunchDescription([ length_arg, Trace( - session_name='full_topology_tracing', + session_name='trace_full_topology', append_timestamp=True, base_path=target_path, events_ust=[ @@ -29,23 +29,8 @@ def generate_launch_description(): 'rclcpp:*callback*', 'rclcpp:*executor*', 'rmw_cyclonedds:*', - # explicit list of message‑flow events - 'ros2:rclcpp_publish', - 'ros2:rclcpp_subscription_callback', - 'ros2:rclcpp_timer_callback', - 'ros2:rclcpp_executor_execute', - 'ros2:rclcpp_intra_publish', - 'ros2:rmw_take', - 'ros2:callback_start', - 'ros2:callback_end', - # ---- (optional) add whatever else you like ---- - # scheduling / executor / memory events: - 'ros2:rclcpp_executor_*', - 'ros2:*callback_added', - ], - events_kernel=[ - 'sched_switch', 'sched_wakeup' ], + events_kernel=[], ), Node( package='full_topology', diff --git a/src/full_topology/package.xml b/src/full_topology/package.xml index 0afe9fe..f1f6db4 100644 --- a/src/full_topology/package.xml +++ b/src/full_topology/package.xml @@ -15,6 +15,7 @@ tracetools_launch + priority_executor rclcpp std_msgs tracetools diff --git a/src/full_topology/src/full_topology.cpp b/src/full_topology/src/full_topology.cpp index a392bb7..966a707 100644 --- a/src/full_topology/src/full_topology.cpp +++ b/src/full_topology/src/full_topology.cpp @@ -9,222 +9,244 @@ // the order here matters #include "tracetools/utils.hpp" -#include +#include "rclcpp/rclcpp.hpp" #include "std_msgs/msg/string.hpp" -/** Whether to use timers in fusion nodes */ -//#define USE_TIMER_IN_FUSION_NODES +#include "nodes.hpp" -using namespace std::chrono_literals; +#include "priority_executor/priority_executor.hpp" +#include "priority_executor/multithread_priority_executor.hpp" +#include "priority_executor/priority_memory_strategy.hpp" -constexpr bool DEBUG = false; +#define PROCENTUAL_INPUT_JITTER 0.05 +#define PROCENTUAL_PROCESSING_JITTER 0.1 -// A generic sensor node that publishes at a given rate + jitter -#define DEFINE_SENSOR_NODE_CLASS(NAME) \ -class NAME : public rclcpp::Node \ -{ \ -public: \ - NAME(std::string const &name, \ - std::string const &topic, \ - int rate_hz, \ - double jitter_ms_ = 0.0) \ - : Node(name) \ - { \ - publisher_ = create_publisher(topic, 10); \ - auto period = std::chrono::milliseconds(1000 / rate_hz); \ - timer_ = create_wall_timer(period, [this, name, jitter_ms_]() \ - { \ - double jitter = 0.0; \ - if (jitter_ms_ > 0.0) { \ - static thread_local std::mt19937 gen(std::random_device{}()); \ - std::normal_distribution<> d(0.0, jitter_ms_); \ - jitter = d(gen); \ - } \ - /* Ensure total delay is non-negative */ \ - int total_delay = std::max(0, static_cast(std::round(jitter))); \ - std::this_thread::sleep_for(std::chrono::milliseconds(total_delay)); \ - auto msg = std_msgs::msg::String(); \ - msg.data = name + std::string(" publishes"); \ - if (DEBUG) { \ - RCLCPP_INFO(get_logger(), "%s", msg.data.c_str()); \ - } \ - publisher_->publish(msg); }); \ - } \ - \ -private: \ - rclcpp::Publisher::SharedPtr publisher_; \ - rclcpp::TimerBase::SharedPtr timer_; \ -}; +#define STR_HELPER(x) #x +#define STR(x) STR_HELPER(x) -// A generic single-input processing node with a fixed delay +/- jitter -#define DEFINE_PROCESSING_NODE_CLASS(NAME) \ -class NAME : public rclcpp::Node \ -{ \ -public: \ - NAME(const std::string & name, \ - const std::string & input_topic, \ - const std::string & output_topic, \ - int delay_ms, \ - double jitter_ms = 0.0) \ - : Node(name), delay_ms_(delay_ms), jitter_ms_(jitter_ms) \ - { \ - subscription_ = create_subscription( \ - input_topic, 10, \ - [this, name](std_msgs::msg::String::SharedPtr msg) \ - { \ - if (DEBUG) { \ - RCLCPP_INFO(get_logger(), "%s received: %s", name.c_str(), msg->data.c_str()); \ - } \ - /* Simulate processing delay */ \ - double jitter = 0.0; \ - if (jitter_ms_ > 0.0) { \ - static thread_local std::mt19937 gen(std::random_device{}());\ - std::normal_distribution<> d(0.0, jitter_ms_); \ - jitter = d(gen); \ - } \ - std::this_thread::sleep_for(std::chrono::milliseconds((int)(delay_ms_ + jitter))); \ - auto out = std_msgs::msg::String(); \ - out.data = msg->data + " -> " + name; \ - publisher_->publish(out); \ - }); \ - publisher_ = create_publisher(output_topic, 10); \ - } \ -protected: \ - int delay_ms_; \ - double jitter_ms_; \ - rclcpp::Subscription::SharedPtr subscription_; \ - rclcpp::Publisher::SharedPtr publisher_; \ -}; +#ifndef DEBUG + #define PRINT_BEFORE(CHAIN_ID, NAME) \ + std::cout << "Chain " << CHAIN_ID << " before: " << CHAIN_DURATION(CHAIN_ID) << " ms" << std::endl; + #define PRINT_AFTER(CHAIN_ID, NAME) \ + std::cout << "Chain " << CHAIN_ID << " after: " << CHAIN_DURATION(CHAIN_ID) << " ms" << std::endl; +#else + #define PRINT_BEFORE(CHAIN_ID, NAME) + #define PRINT_AFTER(CHAIN_ID, NAME) +#endif -// A generic single-input processing node with a fixed delay +/- jitter -#define DEFINE_MULTI_PROCESSING_NODE_CLASS(NAME) \ -class NAME : public rclcpp::Node \ -{ \ -public: \ - NAME(std::string const &name, \ - std::vector const &input_topics, \ - std::string const &output_topic, \ - int delay_ms, \ - double jitter_ms = 0.0) \ - : Node(name), delay_ms_(delay_ms), jitter_ms_(jitter_ms) \ - { \ - for (auto &topic : input_topics) \ - { \ - auto sub = create_subscription( \ - topic, 10, \ - [this, name](std_msgs::msg::String::SharedPtr msg) \ - { \ - /* simple fusion: log each arrival */ \ - if (DEBUG) \ - { \ - RCLCPP_INFO(get_logger(), "%s fusing input: %s", name.c_str(), msg->data.c_str()); \ - } \ - /* Simulate processing delay */ \ - double jitter = 0.0; \ - if (jitter_ms_ > 0.0) { \ - static thread_local std::mt19937 gen(std::random_device{}()); \ - std::normal_distribution<> d(0.0, jitter_ms_); \ - jitter = d(gen); \ - } \ - std::this_thread::sleep_for(std::chrono::milliseconds((int)(delay_ms_ + jitter))); \ - auto out = std_msgs::msg::String(); \ - out.data = msg->data + " -> " + name; \ - if (DEBUG) { \ - RCLCPP_INFO(get_logger(), "%s publishes fused message", name.c_str()); \ - } \ - publisher_->publish(out); \ - }); \ - subscriptions_.emplace_back(sub); \ - } \ - publisher_ = create_publisher(output_topic, 10); \ - \ - } \ -protected: \ - std::vector::SharedPtr> subscriptions_; \ - rclcpp::Publisher::SharedPtr publisher_; \ - int delay_ms_; \ - double jitter_ms_; \ -}; +#ifdef EDF_PRIORITY_EXECUTOR + #define CHAIN_DURATION(CHAIN_ID) chain_duration_##CHAIN_ID + #define DECLARE_CHAIN(CHAIN_ID) double CHAIN_DURATION(CHAIN_ID) = 0.0; + #define SUM_CHAIN_DURATION(CHAIN_ID, NAME) \ + PRINT_BEFORE(CHAIN_ID, NAME); \ + CHAIN_DURATION(CHAIN_ID) += NAME##_duration + NAME##_jitter; \ + PRINT_AFTER(CHAIN_ID, NAME); +#else + #define DECLARE_CHAIN(CHAIN_ID) + #define SUM_CHAIN_DURATION(CHAIN_ID, NAME) +#endif -DEFINE_SENSOR_NODE_CLASS(CameraNode) -DEFINE_PROCESSING_NODE_CLASS(DebayerNode) -DEFINE_PROCESSING_NODE_CLASS(RadiometricNode) -DEFINE_PROCESSING_NODE_CLASS(GeometricNode) -DEFINE_PROCESSING_NODE_CLASS(MappingNode) +#define MAKE_INPUT_NODE(CHAIN_ID, NAME, CLASS, TOPIC, PERIOD) \ + auto NAME##_duration = (PERIOD); \ + auto NAME##_jitter = PROCENTUAL_INPUT_JITTER * (PERIOD); \ + auto NAME = std::make_shared(std::string("input_" STR(NAME) "_node"), TOPIC, NAME##_duration, NAME##_jitter); \ + SUM_CHAIN_DURATION(CHAIN_ID, NAME) -DEFINE_PROCESSING_NODE_CLASS(SmokeClassifierNode) +#define MAKE_PROCESSING_NODE(CHAIN_ID, NAME, CLASS, INPUT_TOPIC, OUTPUT_TOPIC, DURATION) \ + auto NAME##_duration = (DURATION); \ + auto NAME##_jitter = PROCENTUAL_PROCESSING_JITTER * (DURATION); \ + auto NAME = std::make_shared(std::string(STR(NAME) "_node"), INPUT_TOPIC, OUTPUT_TOPIC, NAME##_duration, NAME##_jitter); \ + SUM_CHAIN_DURATION(CHAIN_ID, NAME) -DEFINE_SENSOR_NODE_CLASS(GPSNode) -DEFINE_SENSOR_NODE_CLASS(IMUNode) -DEFINE_SENSOR_NODE_CLASS(BaroNode) +#define MAKE_OUTPUT_NODE(CHAIN_ID, NAME, CLASS, INPUT_TOPIC, OUTPUT_TOPIC, DURATION) \ + auto NAME##_duration = (DURATION); \ + auto NAME##_jitter = PROCENTUAL_PROCESSING_JITTER * (DURATION); \ + auto NAME = std::make_shared(std::string("output_" STR(NAME) "_node"), INPUT_TOPIC, OUTPUT_TOPIC, NAME##_duration, NAME##_jitter); \ + SUM_CHAIN_DURATION(CHAIN_ID, NAME) -DEFINE_PROCESSING_NODE_CLASS(FusionNodeS) -DEFINE_MULTI_PROCESSING_NODE_CLASS(FusionNode) +#ifdef USE_TIMER_IN_FUSION_NODES + #define REGISTER_MULTI_PROCESSING_NODE(NAME, CHAIN_PERIOD, CHAIN_ID) \ + for (auto const &sub : NAME->subscriptions_) { \ + strategy->set_executable_deadline(sub->get_subscription_handle(), CHAIN_PERIOD, priority_executor::ExecutableType::SUBSCRIPTION, CHAIN_ID); \ + } \ + strategy->set_executable_deadline(NAME->timer_->get_timer_handle(), CHAIN_PERIOD, priority_executor::ExecutableType::TIMER, CHAIN_ID); +#else + #define REGISTER_MULTI_PROCESSING_NODE(NAME, CHAIN_PERIOD, CHAIN_ID) \ + for (auto const &sub : NAME->subscriptions_) { \ + strategy->set_executable_deadline(sub->get_subscription_handle(), CHAIN_PERIOD, priority_executor::ExecutableType::SUBSCRIPTION, CHAIN_ID); \ + } +#endif -DEFINE_SENSOR_NODE_CLASS(LidarNode) +int edf_usage_example(int argc, char **argv); -DEFINE_SENSOR_NODE_CLASS(CommandNode) - -DEFINE_PROCESSING_NODE_CLASS(FlightManagementNodeS) - -DEFINE_MULTI_PROCESSING_NODE_CLASS(FlightManagementNode) - -DEFINE_PROCESSING_NODE_CLASS(ControlNode) - -DEFINE_PROCESSING_NODE_CLASS(TelemetryNodeS) -DEFINE_MULTI_PROCESSING_NODE_CLASS(TelemetryNode) - -DEFINE_PROCESSING_NODE_CLASS(RadioNode) - -#define PROCENTUAL_JITTER 0.1 -#define DELAY_JITTER_PAIR(DELAY) DELAY, PROCENTUAL_JITTER * DELAY int main(int argc, char **argv) { rclcpp::init(argc, argv); + + DECLARE_CHAIN(0); // Chain 1: Image2Ground Mapping + DECLARE_CHAIN(1); // Chain 2: Semantic Scheduler Adaptation + DECLARE_CHAIN(2); // Chain 3: Flight Control + DECLARE_CHAIN(3); // Chain 4: Data Storage/Streaming + +#if defined(SEMANTIC_EXECUTOR) + std::cerr << "Not yet implemented" << std::endl; + return -1; +#elif defined(EDF_PRIORITY_EXECUTOR) + // Create a priority executor with a custom memory strategy + auto strategy = std::make_shared>(); + rclcpp::ExecutorOptions options; + options.memory_strategy = strategy; + #ifdef MULTI_THREADED + auto executor = priority_executor::MultithreadTimedExecutor(options); + #else + auto executor = priority_executor::TimedExecutor(options); + #endif + + // Must be set to post_execute can set new deadlines + executor.prio_memory_strategy_ = strategy; +#else // default: ROS + #ifdef MULTI_THREADED + // Create a multi-threaded executor to handle all nodes rclcpp::executors::MultiThreadedExecutor executor; + #else + // Create a single-threaded executor to handle all nodes + rclcpp::executors::SingleThreadedExecutor executor; + #endif +#endif // Chain 1: Image2Ground Mapping - auto camera = std::make_shared("input_camera_node", "/input/camera/raw", DELAY_JITTER_PAIR(5)); // Sensor read, low latency - auto debayer = std::make_shared("debayer_node", "/input/camera/raw", "/camera/debayered", DELAY_JITTER_PAIR(18)); // Moderate, image conversion - auto radiometric = std::make_shared("radiometric_node", "/camera/debayered", "/camera/radiometric", DELAY_JITTER_PAIR(22)); // Heavier image math - auto geometric = std::make_shared("geometric_node", "/camera/radiometric", "/camera/geometric", DELAY_JITTER_PAIR(12)); // Perspective transform, medium - auto mapping = std::make_shared("output_mapping_node", "/camera/geometric", "/output/camera/mapped", DELAY_JITTER_PAIR(16)); // Warping/stitching, moderate - + MAKE_INPUT_NODE(0, cameraA, CameraNode, "/input/cameraA/raw", 200); // 60 FPS camera, fast + MAKE_PROCESSING_NODE(0, debayerA, DebayerNode, "/input/cameraA/raw", "/cameraA/debayered", 18); // Moderate, image conversion + MAKE_PROCESSING_NODE(0, radiometricA, RadiometricNode, "/cameraA/debayered", "/cameraA/radiometric", 22); // Heavier image math + MAKE_PROCESSING_NODE(0, geometric, GeometricNode, "/cameraA/radiometric", "/cameraA/geometric", 12); // Perspective transform, medium + MAKE_OUTPUT_NODE(0, mapping, MappingNode, "/cameraA/geometric", "/output/cameraA/mapped", 16); // Warping/stitching, moderate + // Chain 2: Semantic Scheduler Adaptation - auto smoke = std::make_shared("output_smoke_classifier_node", "/camera/radiometric", "/output/classifier/classification", DELAY_JITTER_PAIR(35)); // CNN/ML classifier, can be high + MAKE_INPUT_NODE(1, cameraB, CameraNode, "/input/cameraB/raw", 150); // 60 FPS camera, fast + MAKE_PROCESSING_NODE(1, debayerB, DebayerNode, "/input/cameraB/raw", "/cameraB/debayered", 18); // Moderate, image conversion + MAKE_PROCESSING_NODE(1, radiometricB, RadiometricNode, "/cameraB/debayered", "/cameraB/radiometric", 22); // Heavier image math + MAKE_OUTPUT_NODE(1, smoke_classifier, SmokeClassifierNode, "/cameraB/radiometric", "/output/classifier/classification", 35); // CNN/ML classifier, can be high // Chain 3: Flight Control - auto gps = std::make_shared("input_gps_node", "/input/gps/fix", DELAY_JITTER_PAIR(7)); // Fast, low latency sensor - auto imu = std::make_shared("input_imu_node", "/input/imu/data", DELAY_JITTER_PAIR(5)); // Fast, low latency sensor - auto baro = std::make_shared("input_baro_node", "/input/baro/alt", DELAY_JITTER_PAIR(6)); // Fast, low latency sensor - auto fusion = std::make_shared( - "sensor_fusion_node", std::vector{"/input/gps/fix", "/input/imu/data", "/input/baro/alt"}, "/sensors/fused", DELAY_JITTER_PAIR(14)); // Sensor fusion, not trivial but not heavy - - auto lidar = std::make_shared("input_lidar_node", "/input/lidar/scan", DELAY_JITTER_PAIR(10)); // LIDAR driver, can be fast - auto cmd = std::make_shared("input_operator_cmd_node", "/input/operator/commands", DELAY_JITTER_PAIR(8)); // Operator commands, fast - auto mgmt = std::make_shared( - "flight_mgmt_node", std::vector{"/sensors/fused", "/input/lidar/scan", "/input/operator/commands"}, "/flight/plan", DELAY_JITTER_PAIR(18)); // Path planning, moderate - auto control = std::make_shared("output_flight_control_node", "/flight/plan", "/output/flight/cmd", DELAY_JITTER_PAIR(12)); // PID controller, fast but non-zero + std::vector fusionA_input_topics = {"/input/gpsA/fix", "/input/imuA/data", "/input/baroA/alt"}; + MAKE_INPUT_NODE(2, gpsA, GPSNode, "/input/gpsA/fix", 100); // Fast, low latency sensor + MAKE_INPUT_NODE(2, imuA, IMUNode, "/input/imuA/data", 100); // Fast, low latency sensor + MAKE_INPUT_NODE(2, baroA, BaroNode, "/input/baroA/alt", 100); // Fast, low latency sensor + MAKE_PROCESSING_NODE(2, fusionA, FusionNode, fusionA_input_topics, "/sensorsA/fused", 14); // Sensor fusion, not trivial but not heavy + + MAKE_INPUT_NODE(2, lidar, LidarNode, "/input/lidar/scan", 100); // LIDAR driver, can be fast + MAKE_INPUT_NODE(2, cmd, CommandNode, "/input/operator/commands", 100); // Operator commands, fast + std::vector mgmt_input_topics = {"/sensorsA/fused", "/input/lidar/scan", "/input/operator/commands"}; + MAKE_PROCESSING_NODE(2, mgmt, FlightManagementNode, mgmt_input_topics, "/flight/plan", 18); // Path planning, moderate + //MAKE_PROCESSING_NODE(2, mgmt, FlightManagementNodeS, "/input/operator/commands", "/flight/plan", 18); // Path planning, moderate + MAKE_OUTPUT_NODE(2, control, ControlNode, "/flight/plan", "/output/flight/cmd", 12); // PID controller, fast but non-zero // Chain 4: Data Storage/Streaming - auto telemetry = std::make_shared( - "telemetry_node", std::vector{"/sensors/fused", "/output/camera/mapped"}, "/telemetry/data", DELAY_JITTER_PAIR(6)); // Quick serialization - auto radio = std::make_shared("output_radio_tx_node", "/telemetry/data", "/output/telemetry/radio", DELAY_JITTER_PAIR(3)); // Output, almost instant + std::vector fusionB_input_topics = {"/input/gpsB/fix", "/input/imuB/data", "/input/baroB/alt"}; + MAKE_INPUT_NODE(3, gpsB, GPSNode, "/input/gpsB/fix", 100); // Fast, low latency sensor + MAKE_INPUT_NODE(3, imuB, IMUNode, "/input/imuB/data", 100); // Fast, low latency sensor + MAKE_INPUT_NODE(3, baroB, BaroNode, "/input/baroB/alt", 100); // Fast, low latency sensor + MAKE_PROCESSING_NODE(3, fusionB, FusionNode, fusionB_input_topics, "/sensorsB/fused", 14); // Sensor fusion, not trivial but not heavy + //MAKE_PROCESSING_NODE(3, fusionB, FusionNodeS, "/input/baroB/alt", "/sensorsB/fused", 14); // Sensor fusion, not trivial but not heavy - // Add all to executor - executor.add_node(camera); - executor.add_node(debayer); - executor.add_node(radiometric); + std::vector telemetry_input_topics = {"/sensorsB/fused", "/output/cameraA/mapped"}; + MAKE_PROCESSING_NODE(3, telemetry, TelemetryNode, telemetry_input_topics, "/telemetry/data", 6); // Quick serialization, low latency + //MAKE_PROCESSING_NODE(3, telemetry, TelemetryNodeS, "/sensorsB/fused", "/telemetry/data", 6); // Quick serialization, low latency + MAKE_OUTPUT_NODE(3, radio, RadioNode, "/telemetry/data", "/output/telemetry/radio", 3); // Output, almost instant + +#if defined(EDF_PRIORITY_EXECUTOR) + int chain_period; + int chain_id; + + // --- + chain_id = 0; + chain_period = 1000; + { + // the new funcitons in PriorityMemoryStrategy accept the handle of the + // timer/subscription as the first argument + strategy->set_executable_deadline(cameraA->timer_->get_timer_handle(), chain_period, priority_executor::ExecutableType::TIMER, chain_id); + // you _must_ set the timer_handle for each chain + strategy->get_priority_settings(cameraA->timer_->get_timer_handle())->timer_handle = cameraA->timer_; + // you _must_ mark the first executable in the chain + strategy->set_first_in_chain(cameraA->timer_->get_timer_handle()); + + // Set the executable deadlines for each node + strategy->set_executable_deadline(debayerA->subscription_->get_subscription_handle(), chain_period, priority_executor::ExecutableType::SUBSCRIPTION, chain_id); + strategy->set_executable_deadline(radiometricA->subscription_->get_subscription_handle(), chain_period, priority_executor::ExecutableType::SUBSCRIPTION, chain_id); + strategy->set_executable_deadline(geometric->subscription_->get_subscription_handle(), chain_period, priority_executor::ExecutableType::SUBSCRIPTION, chain_id); + strategy->set_executable_deadline(mapping->subscription_->get_subscription_handle(), chain_period, priority_executor::ExecutableType::SUBSCRIPTION, chain_id); + + // you _must_ mark the last executable in the chain (used to keep track of different instances of the same chain) + strategy->set_last_in_chain(mapping->subscription_->get_subscription_handle()); +} + + // --- + chain_id = 1; + chain_period = 1000; + { + strategy->set_executable_deadline(cameraB->timer_->get_timer_handle(), chain_period, priority_executor::ExecutableType::TIMER, chain_id); + strategy->get_priority_settings(cameraB->timer_->get_timer_handle())->timer_handle = cameraB->timer_; + strategy->set_first_in_chain(cameraB->timer_->get_timer_handle()); + strategy->set_executable_deadline(debayerB->subscription_->get_subscription_handle(), chain_period, priority_executor::ExecutableType::SUBSCRIPTION, chain_id); + strategy->set_executable_deadline(radiometricB->subscription_->get_subscription_handle(), chain_period, priority_executor::ExecutableType::SUBSCRIPTION, chain_id); + strategy->set_executable_deadline(smoke_classifier->subscription_->get_subscription_handle(), chain_period, priority_executor::ExecutableType::SUBSCRIPTION, chain_id); + strategy->set_last_in_chain(smoke_classifier->subscription_->get_subscription_handle()); + } + + // --- + chain_id = 2; + chain_period = 1000; + { + strategy->set_executable_deadline(cmd->timer_->get_timer_handle(), chain_period, priority_executor::ExecutableType::TIMER, chain_id); + strategy->get_priority_settings(cmd->timer_->get_timer_handle())->timer_handle = cmd->timer_; + strategy->set_first_in_chain(cmd->timer_->get_timer_handle()); + strategy->set_executable_deadline(baroA->timer_->get_timer_handle(), chain_period, priority_executor::ExecutableType::TIMER, chain_id); + strategy->set_executable_deadline(imuA->timer_->get_timer_handle(), chain_period, priority_executor::ExecutableType::TIMER, chain_id); + strategy->set_executable_deadline(gpsA->timer_->get_timer_handle(), chain_period, priority_executor::ExecutableType::TIMER, chain_id); + REGISTER_MULTI_PROCESSING_NODE(fusionA, chain_period, chain_id); + strategy->set_executable_deadline(lidar->timer_->get_timer_handle(), chain_period, priority_executor::ExecutableType::TIMER, chain_id); + REGISTER_MULTI_PROCESSING_NODE(mgmt, chain_period, chain_id); + strategy->set_executable_deadline(control->subscription_->get_subscription_handle(), chain_period, priority_executor::ExecutableType::SUBSCRIPTION, chain_id); + strategy->set_last_in_chain(control->subscription_->get_subscription_handle()); + } + + // --- + chain_id = 3; + chain_period = 1000; + { + strategy->set_executable_deadline(baroB->timer_->get_timer_handle(), chain_period, priority_executor::ExecutableType::TIMER, chain_id); + strategy->get_priority_settings(baroB->timer_->get_timer_handle())->timer_handle = baroB->timer_; + strategy->set_first_in_chain(baroB->timer_->get_timer_handle()); + strategy->set_executable_deadline(imuB->timer_->get_timer_handle(), chain_period, priority_executor::ExecutableType::TIMER, chain_id); + strategy->set_executable_deadline(gpsB->timer_->get_timer_handle(), chain_period, priority_executor::ExecutableType::TIMER, chain_id); + REGISTER_MULTI_PROCESSING_NODE(fusionB, chain_period, chain_id); + REGISTER_MULTI_PROCESSING_NODE(telemetry, chain_period, chain_id); + strategy->set_executable_deadline(radio->subscription_->get_subscription_handle(), chain_period, priority_executor::ExecutableType::SUBSCRIPTION, chain_id); + strategy->set_last_in_chain(radio->subscription_->get_subscription_handle()); + } +#endif + + executor.add_node(cameraA); + executor.add_node(debayerA); + executor.add_node(radiometricA); executor.add_node(geometric); executor.add_node(mapping); - executor.add_node(smoke); + executor.add_node(cameraB); + executor.add_node(debayerB); + executor.add_node(radiometricB); + executor.add_node(smoke_classifier); - executor.add_node(gps); - executor.add_node(imu); - executor.add_node(baro); - executor.add_node(fusion); + executor.add_node(gpsA); + executor.add_node(imuA); + executor.add_node(baroA); + executor.add_node(fusionA); + + executor.add_node(gpsB); + executor.add_node(imuB); + executor.add_node(baroB); + executor.add_node(fusionB); executor.add_node(telemetry); executor.add_node(radio); @@ -239,3 +261,107 @@ int main(int argc, char **argv) rclcpp::shutdown(); return 0; } + +// ----------------------------------------------- + +// re-create the classic talker-listener example with two listeners +class Talker : public rclcpp::Node +{ +public: + Talker() : Node("talker") + { + // Create a publisher on the "chatter" topic with 10 msg queue size. + pub_ = this->create_publisher("chatter", 10); + // Create a timer of period 1s that calls our callback member function. + timer_ = this->create_wall_timer(std::chrono::seconds(1), + std::bind(&Talker::timer_callback, this)); + } + // the timer must be public + rclcpp::TimerBase::SharedPtr timer_; + +private: + void timer_callback() + { + std_msgs::msg::String msg; + msg.data = "Hello World!"; + RCLCPP_INFO(this->get_logger(), "Publishing: '%s'", msg.data.c_str()); + pub_->publish(msg); + } + rclcpp::Publisher::SharedPtr pub_; +}; + +class Listener : public rclcpp::Node +{ +public: + Listener(std::string name) : Node(name) + { + // Create a subscription on the "chatter" topic with the default callback + // method. + pub_ = this->create_publisher("/out", 10); + sub_ = this->create_subscription( + "chatter", 10, + std::bind(&Listener::callback, this, std::placeholders::_1)); + } + // the publisher must be public + rclcpp::Subscription::SharedPtr sub_; + rclcpp::Publisher::SharedPtr pub_; + +private: + void callback(const std_msgs::msg::String::SharedPtr msg) + { + RCLCPP_INFO(this->get_logger(), "I heard: '%s'", msg->data.c_str()); + std_msgs::msg::String msgn; + msgn.data = "Hello World!"; + RCLCPP_INFO(this->get_logger(), "Publishing: 'I heard: %s'", msgn.data.c_str()); + pub_->publish(msgn); + } +}; + +int edf_usage_example(int argc, char **argv) { + rclcpp::init(argc, argv); + + auto talker = std::make_shared(); + auto listener1 = std::make_shared("listener1"); + //auto listener2 = std::make_shared("listener2"); + rclcpp::ExecutorOptions options; + + auto strategy = std::make_shared>(); + options.memory_strategy = strategy; + auto executor = new priority_executor::TimedExecutor(options); + + // must be set to post_execute can set new deadlines + executor->prio_memory_strategy_ = strategy; + + + // the new funcitons in PriorityMemoryStrategy accept the handle of the + // timer/subscription as the first argument + strategy->set_executable_deadline(talker->timer_->get_timer_handle(), 1000, + priority_executor::ExecutableType::TIMER, 0); + // you _must_ set the timer_handle for each chain + strategy->get_priority_settings(talker->timer_->get_timer_handle())->timer_handle = talker->timer_; + // you _must_ mark the first executable in the chain + strategy->set_first_in_chain(talker->timer_->get_timer_handle()); + // set the same period and chain_id for each callback in the chain + strategy->set_executable_deadline(listener1->sub_->get_subscription_handle(), + 1000, priority_executor::ExecutableType::SUBSCRIPTION, 0); + //strategy->set_executable_deadline(listener2->sub_->get_subscription_handle(), 1000, priority_executor::ExecutableType::SUBSCRIPTION, 0); + // you _must_ mark the last executable in the chain (used to keep track of different instances of the same chain) + strategy->set_last_in_chain(listener1->sub_->get_subscription_handle()); + // add all the nodes to the executor + executor->add_node(talker); + executor->add_node(listener1); + //executor->add_node(listener2); + + // if the executor behaves unexpectedly, you can print the priority settings to make sure they are correct + std::cout << *strategy->get_priority_settings( + talker->timer_->get_timer_handle()) + << std::endl; + std::cout << *strategy->get_priority_settings( + listener1->sub_->get_subscription_handle()) + << std::endl; + //std::cout << *strategy->get_priority_settings(listener2->sub_->get_subscription_handle()) << std::endl; + executor->spin(); + + rclcpp::shutdown(); + return 0; +} \ No newline at end of file diff --git a/src/full_topology/src/nodes.hpp b/src/full_topology/src/nodes.hpp new file mode 100644 index 0000000..935460d --- /dev/null +++ b/src/full_topology/src/nodes.hpp @@ -0,0 +1,203 @@ +#pragma once + +#include +#include +#include +#include +#include +#include + +using namespace std::chrono_literals; + +#include "rclcpp/rclcpp.hpp" +#include "std_msgs/msg/string.hpp" + +// A generic sensor node that publishes at a given rate + jitter +#define DEFINE_SENSOR_NODE_CLASS(NAME) \ +class NAME : public rclcpp::Node \ +{ \ +public: \ + NAME(std::string const &name, \ + std::string const &topic, \ + int period_ms, \ + double jitter_ms_ = 0.0) \ + : Node(name) \ + { \ + publisher_ = create_publisher(topic, 10); \ + timer_ = create_wall_timer(std::chrono::milliseconds(period_ms), [this, name, jitter_ms_]() \ + { \ + double jitter = 0.0; \ + if (jitter_ms_ > 0.0) { \ + static thread_local std::mt19937 gen(std::random_device{}()); \ + std::normal_distribution<> d(0.0, jitter_ms_); \ + jitter = d(gen); \ + } \ + /* Ensure total delay is non-negative */ \ + int total_delay = std::max(0, static_cast(std::round(jitter))); \ + std::this_thread::sleep_for(std::chrono::milliseconds(total_delay)); \ + auto msg = std_msgs::msg::String(); \ + msg.data = name + std::string(" publishes"); \ + publisher_->publish(msg); }); \ + } \ + \ + rclcpp::Publisher::SharedPtr publisher_; \ + rclcpp::TimerBase::SharedPtr timer_; \ +}; + +// A generic single-input processing node with a fixed delay +/- jitter +#define DEFINE_PROCESSING_NODE_CLASS(NAME) \ +class NAME : public rclcpp::Node \ +{ \ +public: \ + NAME(const std::string & name, \ + const std::string & input_topic, \ + const std::string & output_topic, \ + int delay_ms, \ + double jitter_ms = 0.0) \ + : Node(name), delay_ms_(delay_ms), jitter_ms_(jitter_ms) \ + { \ + subscription_ = create_subscription( \ + input_topic, 10, \ + [this, name](std_msgs::msg::String::SharedPtr msg) \ + { \ + /* Simulate processing delay */ \ + double jitter = 0.0; \ + if (jitter_ms_ > 0.0) { \ + static thread_local std::mt19937 gen(std::random_device{}());\ + std::normal_distribution<> d(0.0, jitter_ms_); \ + jitter = d(gen); \ + } \ + std::this_thread::sleep_for(std::chrono::milliseconds((int)(delay_ms_ + jitter))); \ + auto out = std_msgs::msg::String(); \ + out.data = msg->data + " -> " + name; \ + publisher_->publish(out); \ + }); \ + publisher_ = create_publisher(output_topic, 10); \ + } \ + \ + rclcpp::Subscription::SharedPtr subscription_; \ + rclcpp::Publisher::SharedPtr publisher_; \ +protected: \ + int delay_ms_; \ + double jitter_ms_; \ +}; + +// A generic single-input processing node with a fixed delay +/- jitter +#ifdef USE_TIMER_IN_FUSION_NODES + #define DEFINE_MULTI_PROCESSING_NODE_CLASS(NAME) \ +class NAME : public rclcpp::Node \ +{ \ +public: \ + NAME(std::string const &name, \ + std::vector const &input_topics, \ + std::string const &output_topic, \ + int delay_ms, \ + double jitter_ms = 0.0) \ + : Node(name), delay_ms_(delay_ms), jitter_ms_(jitter_ms) \ + { \ + for (auto &topic : input_topics) \ + { \ + auto sub = create_subscription( \ + topic, 10, \ + [this, name](std_msgs::msg::String::SharedPtr msg) \ + { \ + }); \ + subscriptions_.emplace_back(sub); \ + } \ + publisher_ = create_publisher(output_topic, 10); \ + /* Also publish fused result periodically */ \ + timer_ = create_wall_timer(std::chrono::milliseconds(delay_ms_), [this, name]() \ + { \ + /* Simulate processing delay */ \ + double jitter = 0.0; \ + if (jitter_ms_ > 0.0) { \ + static thread_local std::mt19937 gen(std::random_device{}()); \ + std::normal_distribution<> d(0.0, jitter_ms_); \ + jitter = d(gen); \ + } \ + std::this_thread::sleep_for(std::chrono::milliseconds((int)jitter)); \ + auto out = std_msgs::msg::String(); \ + out.data = name + std::string(" fused"); \ + publisher_->publish(out); \ + }); \ + } \ + \ + std::vector::SharedPtr> subscriptions_; \ + rclcpp::Publisher::SharedPtr publisher_; \ + rclcpp::TimerBase::SharedPtr timer_; \ +protected: \ + int delay_ms_; \ + double jitter_ms_; \ +}; +#else + #define DEFINE_MULTI_PROCESSING_NODE_CLASS(NAME) \ +class NAME : public rclcpp::Node \ +{ \ +public: \ + NAME(std::string const &name, \ + std::vector const &input_topics, \ + std::string const &output_topic, \ + int delay_ms, \ + double jitter_ms = 0.0) \ + : Node(name), delay_ms_(delay_ms), jitter_ms_(jitter_ms) \ + { \ + for (auto &topic : input_topics) \ + { \ + auto sub = create_subscription( \ + topic, 10, \ + [this, name](std_msgs::msg::String::SharedPtr msg) \ + { \ + /* Simulate processing delay */ \ + double jitter = 0.0; \ + if (jitter_ms_ > 0.0) { \ + static thread_local std::mt19937 gen(std::random_device{}()); \ + std::normal_distribution<> d(0.0, jitter_ms_); \ + jitter = d(gen); \ + } \ + std::this_thread::sleep_for(std::chrono::milliseconds((int)(delay_ms_ + jitter))); \ + auto out = std_msgs::msg::String(); \ + out.data = msg->data + " -> " + name; \ + publisher_->publish(out); \ + }); \ + subscriptions_.emplace_back(sub); \ + } \ + publisher_ = create_publisher(output_topic, 10); \ + \ + } \ + \ + std::vector::SharedPtr> subscriptions_; \ + rclcpp::Publisher::SharedPtr publisher_; \ +protected: \ + int delay_ms_; \ + double jitter_ms_; \ +}; +#endif + +DEFINE_SENSOR_NODE_CLASS(CameraNode) +DEFINE_PROCESSING_NODE_CLASS(DebayerNode) +DEFINE_PROCESSING_NODE_CLASS(RadiometricNode) +DEFINE_PROCESSING_NODE_CLASS(GeometricNode) +DEFINE_PROCESSING_NODE_CLASS(MappingNode) + +DEFINE_PROCESSING_NODE_CLASS(SmokeClassifierNode) + +DEFINE_SENSOR_NODE_CLASS(GPSNode) +DEFINE_SENSOR_NODE_CLASS(IMUNode) +DEFINE_SENSOR_NODE_CLASS(BaroNode) + +DEFINE_PROCESSING_NODE_CLASS(FusionNodeS) +DEFINE_MULTI_PROCESSING_NODE_CLASS(FusionNode) + +DEFINE_SENSOR_NODE_CLASS(LidarNode) + +DEFINE_SENSOR_NODE_CLASS(CommandNode) + +DEFINE_PROCESSING_NODE_CLASS(FlightManagementNodeS) +DEFINE_MULTI_PROCESSING_NODE_CLASS(FlightManagementNode) + +DEFINE_PROCESSING_NODE_CLASS(ControlNode) + +DEFINE_PROCESSING_NODE_CLASS(TelemetryNodeS) +DEFINE_MULTI_PROCESSING_NODE_CLASS(TelemetryNode) + +DEFINE_PROCESSING_NODE_CLASS(RadioNode) \ No newline at end of file diff --git a/src/full_topology/src/simple.cpp b/src/full_topology/src/simple.cpp new file mode 100644 index 0000000..b6c39c4 --- /dev/null +++ b/src/full_topology/src/simple.cpp @@ -0,0 +1,125 @@ +// full_topology.cpp + +#include +#include +#include +#include +#include +#include +// the order here matters +#include "tracetools/utils.hpp" + +#include "rclcpp/rclcpp.hpp" +#include "std_msgs/msg/string.hpp" + +#include "config.hpp" +#include "nodes.hpp" + +#include "priority_executor/priority_executor.hpp" +#include "priority_executor/multithread_priority_executor.hpp" +#include "priority_executor/priority_memory_strategy.hpp" + +#define PROCENTUAL_INPUT_JITTER 0.05 +#define PROCENTUAL_PROCESSING_JITTER 0.1 + +#define STR_HELPER(x) #x +#define STR(x) STR_HELPER(x) + +#ifndef DEBUG + #define PRINT_BEFORE(CHAIN_ID, NAME) \ + std::cout << "Chain " << CHAIN_ID << " before: " << CHAIN_DURATION(CHAIN_ID) << " ms" << std::endl; + #define PRINT_AFTER(CHAIN_ID, NAME) \ + std::cout << "Chain " << CHAIN_ID << " after: " << CHAIN_DURATION(CHAIN_ID) << " ms" << std::endl; +#else + #define PRINT_BEFORE(CHAIN_ID, NAME) + #define PRINT_AFTER(CHAIN_ID, NAME) +#endif + +#define CHAIN_DURATION(CHAIN_ID) chain_duration_##CHAIN_ID +#define DECLARE_CHAIN(CHAIN_ID) double CHAIN_DURATION(CHAIN_ID) = 0.0; +#define SUM_CHAIN_DURATION(CHAIN_ID, NAME) \ +PRINT_BEFORE(CHAIN_ID, NAME); \ +CHAIN_DURATION(CHAIN_ID) += NAME##_duration + NAME##_jitter; \ +PRINT_AFTER(CHAIN_ID, NAME); + +#define MAKE_INPUT_NODE(CHAIN_ID, NAME, CLASS, TOPIC, PERIOD) \ + auto NAME##_duration = (PERIOD); \ + auto NAME##_jitter = PROCENTUAL_INPUT_JITTER * (PERIOD); \ + auto NAME = std::make_shared(std::string("input_" STR(NAME) "_node"), TOPIC, NAME##_duration, NAME##_jitter); \ + SUM_CHAIN_DURATION(CHAIN_ID, NAME) + +#define MAKE_PROCESSING_NODE(CHAIN_ID, NAME, CLASS, INPUT_TOPIC, OUTPUT_TOPIC, DURATION) \ + auto NAME##_duration = (DURATION); \ + auto NAME##_jitter = PROCENTUAL_PROCESSING_JITTER * (DURATION); \ + auto NAME = std::make_shared(std::string(STR(NAME) "_node"), INPUT_TOPIC, OUTPUT_TOPIC, NAME##_duration, NAME##_jitter); \ + SUM_CHAIN_DURATION(CHAIN_ID, NAME) + +#define MAKE_OUTPUT_NODE(CHAIN_ID, NAME, CLASS, INPUT_TOPIC, OUTPUT_TOPIC, DURATION) \ + auto NAME##_duration = (DURATION); \ + auto NAME##_jitter = PROCENTUAL_PROCESSING_JITTER * (DURATION); \ + auto NAME = std::make_shared(std::string("output_" STR(NAME) "_node"), INPUT_TOPIC, OUTPUT_TOPIC, NAME##_duration, NAME##_jitter); \ + SUM_CHAIN_DURATION(CHAIN_ID, NAME) + + +int main(int argc, char **argv) +{ + DECLARE_CHAIN(0); // Chain 1: Image2Ground Mapping + + rclcpp::init(argc, argv); + + // Create a priority executor with a custom memory strategy + auto strategy = std::make_shared>(); + rclcpp::ExecutorOptions options; + options.memory_strategy = strategy; + auto executor = priority_executor::MultithreadTimedExecutor(options); + + // Must be set to post_execute can set new deadlines + executor.prio_memory_strategy_ = strategy; + + // chain id, node name, node class, input topic, output topic, duration in ms + MAKE_INPUT_NODE( 0, cameraA , CameraNode , /*has no input topic*/ "/input/cameraA/raw" , 1000.0/2.0); // 60 FPS camera, fast + MAKE_PROCESSING_NODE(0, debayerA , DebayerNode , "/input/cameraA/raw" , "/cameraA/debayered" , 18); // Moderate, image conversion + MAKE_PROCESSING_NODE(0, radiometricA, RadiometricNode, "/cameraA/debayered" , "/cameraA/radiometric" , 22); // Heavier image math + MAKE_PROCESSING_NODE(0, geometric , GeometricNode , "/cameraA/radiometric", "/cameraA/geometric" , 12); // Perspective transform, medium + MAKE_OUTPUT_NODE( 0, mapping , MappingNode , "/cameraA/geometric" , "/output/cameraA/mapped", 16); // Warping/stitching, moderate + + // the new funcitons in PriorityMemoryStrategy accept the handle of the + // timer/subscription as the first argument + strategy->set_executable_deadline(cameraA->timer_->get_timer_handle() , 1000, priority_executor::ExecutableType::TIMER , 0); + // you _must_ set the timer_handle for each chain + strategy->get_priority_settings(cameraA->timer_->get_timer_handle())->timer_handle = cameraA->timer_; + // you _must_ mark the first executable in the chain + strategy->set_first_in_chain(cameraA->timer_->get_timer_handle()); + + // Set the executable deadlines for each node + strategy->set_executable_deadline(debayerA->subscription_->get_subscription_handle() , 1000, priority_executor::ExecutableType::SUBSCRIPTION, 0); + strategy->set_executable_deadline(radiometricA->subscription_->get_subscription_handle(), 1000, priority_executor::ExecutableType::SUBSCRIPTION, 0); + strategy->set_executable_deadline(geometric->subscription_->get_subscription_handle() , 1000, priority_executor::ExecutableType::SUBSCRIPTION, 0); + strategy->set_executable_deadline(mapping->subscription_->get_subscription_handle() , 1000, priority_executor::ExecutableType::SUBSCRIPTION, 0); + + // you _must_ mark the last executable in the chain (used to keep track of different instances of the same chain) + strategy->set_last_in_chain(mapping->subscription_->get_subscription_handle()); + + executor.add_node(cameraA); + executor.add_node(debayerA); + executor.add_node(radiometricA); + executor.add_node(geometric); + executor.add_node(mapping); + + std::cout << *strategy->get_priority_settings(cameraA->timer_->get_timer_handle()) + << std::endl; + std::cout << *strategy->get_priority_settings(debayerA->subscription_->get_subscription_handle()) + << std::endl; + std::cout << *strategy->get_priority_settings(radiometricA->subscription_->get_subscription_handle()) + << std::endl; + std::cout << *strategy->get_priority_settings(geometric->subscription_->get_subscription_handle()) + << std::endl; + std::cout << *strategy->get_priority_settings(mapping->subscription_->get_subscription_handle()) + << std::endl; + + executor.spin(); + + rclcpp::shutdown(); + + return 0; +} diff --git a/src/ros_edf b/src/ros_edf index c44b7b8..5f73198 160000 --- a/src/ros_edf +++ b/src/ros_edf @@ -1 +1 @@ -Subproject commit c44b7b8c3c527a8d16739349b50340a7de9b6f51 +Subproject commit 5f73198adbabb3a34985fb5e5c7f84b5e4149a86