got ros and edf to work, built startup script for reliable experimenting
This commit is contained in:
parent
bf3f7ff485
commit
8cd5b03b03
9 changed files with 745 additions and 214 deletions
6
build.sh
6
build.sh
|
@ -1,5 +1,7 @@
|
|||
#!/usr/bin/env bash
|
||||
|
||||
export RMW_IMPLEMENTATION=rmw_cyclonedds_cpp
|
||||
|
||||
colcon build --packages-select tracetools --allow-overriding tracetools && \
|
||||
source install/setup.bash && \
|
||||
colcon build --packages-select cyclonedds --allow-overriding cyclonedds && \
|
||||
|
@ -10,7 +12,9 @@ colcon build --packages-select tracetools --allow-overriding tracetools && \
|
|||
source install/setup.bash && \
|
||||
colcon build --packages-select rclcpp --allow-overriding rclcpp && \
|
||||
source install/setup.bash && \
|
||||
colcon build --packages-up-to full_topology && \
|
||||
colcon build --packages-select simple_timer priority_executor && \
|
||||
source install/setup.bash && \
|
||||
colcon build --packages-up-to ros2trace ros2trace_analysis full_topology && \
|
||||
source install/setup.bash
|
||||
|
||||
export RMW_IMPLEMENTATION=rmw_cyclonedds_cpp
|
|
@ -1,9 +1,66 @@
|
|||
#!/usr/bin/env bash
|
||||
|
||||
colcon build --packages-select full_topology && \
|
||||
source install/setup.bash && \
|
||||
TRACE_ID=$(taskset -c 2,3 ros2 launch full_topology trace_full_topology.launch.py length:=100 | \
|
||||
grep -oP '(?<=Writing tracing session to: /home/niklas/ROS-Dynamic-Executor-Experiments/analysis/tracing/full_topology_tracing-)\d+') && \
|
||||
ros2 trace-analysis convert "/home/niklas/ROS-Dynamic-Executor-Experiments/analysis/tracing/full_topology_tracing-${TRACE_ID}/ust" && \
|
||||
scp -r "analysis/tracing/full_topology_tracing-${TRACE_ID}/" 192.168.162.249:/home/niklas/dataflow-analysis/traces/ && \
|
||||
echo "Trace analysis \"${TRACE_ID}\" complete and copied to remote server."
|
||||
set -e
|
||||
|
||||
# ----------- CONFIGURABLES -----------
|
||||
REMOTE_USER="niklas"
|
||||
REMOTE_HOST="192.168.162.249"
|
||||
REMOTE_PATH="/home/niklas/dataflow-analysis/traces"
|
||||
TRACING_BASE="analysis/tracing"
|
||||
LOCAL_PROJECT_DIR="/home/niklas/ROS-Dynamic-Executor-Experiments"
|
||||
# --------------------------------------
|
||||
|
||||
usage() {
|
||||
echo "Usage: $0 <ros|edf|sem> <single|multi> <timed|direct> [time_in_seconds]"
|
||||
exit 1
|
||||
}
|
||||
|
||||
if [[ $# -lt 3 ]]; then
|
||||
usage
|
||||
fi
|
||||
|
||||
SCHEDULER_TYPE="$1"
|
||||
THREADING_TYPE="$2"
|
||||
FUSION_TYPE="$3"
|
||||
TRACE_LENGTH="${4:-10}"
|
||||
|
||||
# Determine defines based on parameters
|
||||
CMAKE_ARGS=()
|
||||
[[ "$SCHEDULER_TYPE" == "ros" ]] && CMAKE_ARGS+=("-DROS_DEFAULT_EXECUTOR=ON")
|
||||
[[ "$SCHEDULER_TYPE" == "edf" ]] && CMAKE_ARGS+=("-DEDF_PRIORITY_EXECUTOR=ON")
|
||||
[[ "$SCHEDULER_TYPE" == "sem" ]] && CMAKE_ARGS+=("-DSEMANTIC_EXECUTOR=ON")
|
||||
|
||||
[[ "$THREADING_TYPE" == "multi" ]] && CMAKE_ARGS+=("-DMULTI_THREADED=ON")
|
||||
[[ "$FUSION_TYPE" == "timed" ]] && CMAKE_ARGS+=("-DUSE_TIMER_IN_FUSION_NODES=ON")
|
||||
|
||||
TRACE_NAME="${SCHEDULER_TYPE}_${THREADING_TYPE}_${FUSION_TYPE}_${TRACE_LENGTH}"
|
||||
|
||||
cd "$LOCAL_PROJECT_DIR"
|
||||
|
||||
colcon build --packages-select full_topology --cmake-args "${CMAKE_ARGS[@]}"
|
||||
|
||||
source install/setup.bash
|
||||
|
||||
TRACE_ID=$(taskset -c 2,3 ros2 launch full_topology trace_full_topology.launch.py "length:=${TRACE_LENGTH}" | \
|
||||
grep -oP '(?<=Writing tracing session to: /home/niklas/ROS-Dynamic-Executor-Experiments/analysis/tracing/)[^\s]+')
|
||||
|
||||
if [[ -z "$TRACE_ID" ]]; then
|
||||
echo "[ERROR] Failed to get tracing session directory. Aborting."
|
||||
exit 2
|
||||
fi
|
||||
|
||||
TIMESTAMP_SUFFIX="${TRACE_ID#trace_full_topology}"
|
||||
NEW_TRACE_ID="${TRACE_NAME}${TIMESTAMP_SUFFIX}"
|
||||
TRACE_ORIG_PATH="${TRACING_BASE}/${TRACE_ID}"
|
||||
TRACE_NEW_PATH="${TRACING_BASE}/${NEW_TRACE_ID}"
|
||||
|
||||
ros2 trace-analysis convert "${TRACE_ORIG_PATH}/ust"
|
||||
|
||||
if [[ -e "$TRACE_NEW_PATH" ]]; then
|
||||
rm -rf "$TRACE_NEW_PATH"
|
||||
fi
|
||||
mv "$TRACE_ORIG_PATH" "$TRACE_NEW_PATH"
|
||||
|
||||
scp -r "$TRACE_NEW_PATH/" "$REMOTE_USER@$REMOTE_HOST:$REMOTE_PATH/"
|
||||
|
||||
echo "[SUCCESS] Trace analysis '$NEW_TRACE_ID' complete and copied to remote server."
|
||||
|
|
|
@ -12,14 +12,43 @@ if(CMAKE_COMPILER_IS_GNUCXX OR CMAKE_CXX_COMPILER_ID MATCHES "Clang")
|
|||
endif()
|
||||
|
||||
# find dependencies
|
||||
find_package(priority_executor REQUIRED)
|
||||
find_package(ament_cmake REQUIRED)
|
||||
find_package(rclcpp REQUIRED)
|
||||
find_package(std_msgs REQUIRED)
|
||||
|
||||
# Of the following three, only one should be one at any given time. If none is set, ros default will be used
|
||||
option(ROS_DEFAULT_EXECUTOR "Use ROS default executor" OFF)
|
||||
option(EDF_PRIORITY_EXECUTOR "Use EDF priority executor" OFF)
|
||||
option(SEMANTIC_EXECUTOR "Use semantic executor" OFF)
|
||||
|
||||
option(MULTI_THREADED "Use multi-threading" OFF)
|
||||
option(USE_TIMER_IN_FUSION_NODES "Use timers in fusion nodes" OFF)
|
||||
|
||||
if(ROS_DEFAULT_EXECUTOR)
|
||||
add_compile_definitions(ROS_DEFAULT_EXECUTOR)
|
||||
endif()
|
||||
|
||||
if(EDF_PRIORITY_EXECUTOR)
|
||||
add_compile_definitions(EDF_PRIORITY_EXECUTOR)
|
||||
endif()
|
||||
|
||||
if(SEMANTIC_EXECUTOR)
|
||||
add_compile_definitions(SEMANTIC_EXECUTOR)
|
||||
endif()
|
||||
|
||||
if(MULTI_THREADED)
|
||||
add_compile_definitions(MULTI_THREADED)
|
||||
endif()
|
||||
|
||||
if(USE_TIMER_IN_FUSION_NODES)
|
||||
add_compile_definitions(USE_TIMER_IN_FUSION_NODES)
|
||||
endif()
|
||||
|
||||
add_compile_definitions(RCLCPP_ENABLE_TRACEPOINTS)
|
||||
|
||||
add_executable(${PROJECT_NAME} src/full_topology.cpp)
|
||||
#add_executable(${PROJECT_NAME} src/test.cpp)
|
||||
#add_executable(${PROJECT_NAME} src/simple.cpp)
|
||||
target_include_directories(${PROJECT_NAME} PUBLIC
|
||||
$<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}/include>
|
||||
$<INSTALL_INTERFACE:include>
|
||||
|
@ -27,6 +56,7 @@ target_include_directories(${PROJECT_NAME} PUBLIC
|
|||
|
||||
# Add dependencies with ament
|
||||
ament_target_dependencies(${PROJECT_NAME}
|
||||
priority_executor
|
||||
rclcpp
|
||||
std_msgs
|
||||
tracetools
|
||||
|
|
|
@ -17,7 +17,7 @@ def generate_launch_description():
|
|||
return launch.LaunchDescription([
|
||||
length_arg,
|
||||
Trace(
|
||||
session_name='full_topology_tracing',
|
||||
session_name='trace_full_topology',
|
||||
append_timestamp=True,
|
||||
base_path=target_path,
|
||||
events_ust=[
|
||||
|
@ -29,23 +29,8 @@ def generate_launch_description():
|
|||
'rclcpp:*callback*',
|
||||
'rclcpp:*executor*',
|
||||
'rmw_cyclonedds:*',
|
||||
# explicit list of message‑flow events
|
||||
'ros2:rclcpp_publish',
|
||||
'ros2:rclcpp_subscription_callback',
|
||||
'ros2:rclcpp_timer_callback',
|
||||
'ros2:rclcpp_executor_execute',
|
||||
'ros2:rclcpp_intra_publish',
|
||||
'ros2:rmw_take',
|
||||
'ros2:callback_start',
|
||||
'ros2:callback_end',
|
||||
# ---- (optional) add whatever else you like ----
|
||||
# scheduling / executor / memory events:
|
||||
'ros2:rclcpp_executor_*',
|
||||
'ros2:*callback_added',
|
||||
],
|
||||
events_kernel=[
|
||||
'sched_switch', 'sched_wakeup'
|
||||
],
|
||||
events_kernel=[],
|
||||
),
|
||||
Node(
|
||||
package='full_topology',
|
||||
|
|
|
@ -15,6 +15,7 @@
|
|||
<depend>tracetools_launch</depend>
|
||||
|
||||
<!-- Build Dependencies -->
|
||||
<depend>priority_executor</depend>
|
||||
<depend>rclcpp</depend>
|
||||
<depend>std_msgs</depend>
|
||||
<depend>tracetools</depend>
|
||||
|
|
|
@ -9,222 +9,244 @@
|
|||
// the order here matters
|
||||
#include "tracetools/utils.hpp"
|
||||
|
||||
#include <rclcpp/rclcpp.hpp>
|
||||
#include "rclcpp/rclcpp.hpp"
|
||||
#include "std_msgs/msg/string.hpp"
|
||||
|
||||
/** Whether to use timers in fusion nodes */
|
||||
//#define USE_TIMER_IN_FUSION_NODES
|
||||
#include "nodes.hpp"
|
||||
|
||||
using namespace std::chrono_literals;
|
||||
#include "priority_executor/priority_executor.hpp"
|
||||
#include "priority_executor/multithread_priority_executor.hpp"
|
||||
#include "priority_executor/priority_memory_strategy.hpp"
|
||||
|
||||
constexpr bool DEBUG = false;
|
||||
#define PROCENTUAL_INPUT_JITTER 0.05
|
||||
#define PROCENTUAL_PROCESSING_JITTER 0.1
|
||||
|
||||
// A generic sensor node that publishes at a given rate + jitter
|
||||
#define DEFINE_SENSOR_NODE_CLASS(NAME) \
|
||||
class NAME : public rclcpp::Node \
|
||||
{ \
|
||||
public: \
|
||||
NAME(std::string const &name, \
|
||||
std::string const &topic, \
|
||||
int rate_hz, \
|
||||
double jitter_ms_ = 0.0) \
|
||||
: Node(name) \
|
||||
{ \
|
||||
publisher_ = create_publisher<std_msgs::msg::String>(topic, 10); \
|
||||
auto period = std::chrono::milliseconds(1000 / rate_hz); \
|
||||
timer_ = create_wall_timer(period, [this, name, jitter_ms_]() \
|
||||
{ \
|
||||
double jitter = 0.0; \
|
||||
if (jitter_ms_ > 0.0) { \
|
||||
static thread_local std::mt19937 gen(std::random_device{}()); \
|
||||
std::normal_distribution<> d(0.0, jitter_ms_); \
|
||||
jitter = d(gen); \
|
||||
#define STR_HELPER(x) #x
|
||||
#define STR(x) STR_HELPER(x)
|
||||
|
||||
#ifndef DEBUG
|
||||
#define PRINT_BEFORE(CHAIN_ID, NAME) \
|
||||
std::cout << "Chain " << CHAIN_ID << " before: " << CHAIN_DURATION(CHAIN_ID) << " ms" << std::endl;
|
||||
#define PRINT_AFTER(CHAIN_ID, NAME) \
|
||||
std::cout << "Chain " << CHAIN_ID << " after: " << CHAIN_DURATION(CHAIN_ID) << " ms" << std::endl;
|
||||
#else
|
||||
#define PRINT_BEFORE(CHAIN_ID, NAME)
|
||||
#define PRINT_AFTER(CHAIN_ID, NAME)
|
||||
#endif
|
||||
|
||||
#ifdef EDF_PRIORITY_EXECUTOR
|
||||
#define CHAIN_DURATION(CHAIN_ID) chain_duration_##CHAIN_ID
|
||||
#define DECLARE_CHAIN(CHAIN_ID) double CHAIN_DURATION(CHAIN_ID) = 0.0;
|
||||
#define SUM_CHAIN_DURATION(CHAIN_ID, NAME) \
|
||||
PRINT_BEFORE(CHAIN_ID, NAME); \
|
||||
CHAIN_DURATION(CHAIN_ID) += NAME##_duration + NAME##_jitter; \
|
||||
PRINT_AFTER(CHAIN_ID, NAME);
|
||||
#else
|
||||
#define DECLARE_CHAIN(CHAIN_ID)
|
||||
#define SUM_CHAIN_DURATION(CHAIN_ID, NAME)
|
||||
#endif
|
||||
|
||||
#define MAKE_INPUT_NODE(CHAIN_ID, NAME, CLASS, TOPIC, PERIOD) \
|
||||
auto NAME##_duration = (PERIOD); \
|
||||
auto NAME##_jitter = PROCENTUAL_INPUT_JITTER * (PERIOD); \
|
||||
auto NAME = std::make_shared<CLASS>(std::string("input_" STR(NAME) "_node"), TOPIC, NAME##_duration, NAME##_jitter); \
|
||||
SUM_CHAIN_DURATION(CHAIN_ID, NAME)
|
||||
|
||||
#define MAKE_PROCESSING_NODE(CHAIN_ID, NAME, CLASS, INPUT_TOPIC, OUTPUT_TOPIC, DURATION) \
|
||||
auto NAME##_duration = (DURATION); \
|
||||
auto NAME##_jitter = PROCENTUAL_PROCESSING_JITTER * (DURATION); \
|
||||
auto NAME = std::make_shared<CLASS>(std::string(STR(NAME) "_node"), INPUT_TOPIC, OUTPUT_TOPIC, NAME##_duration, NAME##_jitter); \
|
||||
SUM_CHAIN_DURATION(CHAIN_ID, NAME)
|
||||
|
||||
#define MAKE_OUTPUT_NODE(CHAIN_ID, NAME, CLASS, INPUT_TOPIC, OUTPUT_TOPIC, DURATION) \
|
||||
auto NAME##_duration = (DURATION); \
|
||||
auto NAME##_jitter = PROCENTUAL_PROCESSING_JITTER * (DURATION); \
|
||||
auto NAME = std::make_shared<CLASS>(std::string("output_" STR(NAME) "_node"), INPUT_TOPIC, OUTPUT_TOPIC, NAME##_duration, NAME##_jitter); \
|
||||
SUM_CHAIN_DURATION(CHAIN_ID, NAME)
|
||||
|
||||
#ifdef USE_TIMER_IN_FUSION_NODES
|
||||
#define REGISTER_MULTI_PROCESSING_NODE(NAME, CHAIN_PERIOD, CHAIN_ID) \
|
||||
for (auto const &sub : NAME->subscriptions_) { \
|
||||
strategy->set_executable_deadline(sub->get_subscription_handle(), CHAIN_PERIOD, priority_executor::ExecutableType::SUBSCRIPTION, CHAIN_ID); \
|
||||
} \
|
||||
/* Ensure total delay is non-negative */ \
|
||||
int total_delay = std::max(0, static_cast<int>(std::round(jitter))); \
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(total_delay)); \
|
||||
auto msg = std_msgs::msg::String(); \
|
||||
msg.data = name + std::string(" publishes"); \
|
||||
if (DEBUG) { \
|
||||
RCLCPP_INFO(get_logger(), "%s", msg.data.c_str()); \
|
||||
} \
|
||||
publisher_->publish(msg); }); \
|
||||
} \
|
||||
\
|
||||
private: \
|
||||
rclcpp::Publisher<std_msgs::msg::String>::SharedPtr publisher_; \
|
||||
rclcpp::TimerBase::SharedPtr timer_; \
|
||||
};
|
||||
strategy->set_executable_deadline(NAME->timer_->get_timer_handle(), CHAIN_PERIOD, priority_executor::ExecutableType::TIMER, CHAIN_ID);
|
||||
#else
|
||||
#define REGISTER_MULTI_PROCESSING_NODE(NAME, CHAIN_PERIOD, CHAIN_ID) \
|
||||
for (auto const &sub : NAME->subscriptions_) { \
|
||||
strategy->set_executable_deadline(sub->get_subscription_handle(), CHAIN_PERIOD, priority_executor::ExecutableType::SUBSCRIPTION, CHAIN_ID); \
|
||||
}
|
||||
#endif
|
||||
|
||||
// A generic single-input processing node with a fixed delay +/- jitter
|
||||
#define DEFINE_PROCESSING_NODE_CLASS(NAME) \
|
||||
class NAME : public rclcpp::Node \
|
||||
{ \
|
||||
public: \
|
||||
NAME(const std::string & name, \
|
||||
const std::string & input_topic, \
|
||||
const std::string & output_topic, \
|
||||
int delay_ms, \
|
||||
double jitter_ms = 0.0) \
|
||||
: Node(name), delay_ms_(delay_ms), jitter_ms_(jitter_ms) \
|
||||
{ \
|
||||
subscription_ = create_subscription<std_msgs::msg::String>( \
|
||||
input_topic, 10, \
|
||||
[this, name](std_msgs::msg::String::SharedPtr msg) \
|
||||
{ \
|
||||
if (DEBUG) { \
|
||||
RCLCPP_INFO(get_logger(), "%s received: %s", name.c_str(), msg->data.c_str()); \
|
||||
} \
|
||||
/* Simulate processing delay */ \
|
||||
double jitter = 0.0; \
|
||||
if (jitter_ms_ > 0.0) { \
|
||||
static thread_local std::mt19937 gen(std::random_device{}());\
|
||||
std::normal_distribution<> d(0.0, jitter_ms_); \
|
||||
jitter = d(gen); \
|
||||
} \
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds((int)(delay_ms_ + jitter))); \
|
||||
auto out = std_msgs::msg::String(); \
|
||||
out.data = msg->data + " -> " + name; \
|
||||
publisher_->publish(out); \
|
||||
}); \
|
||||
publisher_ = create_publisher<std_msgs::msg::String>(output_topic, 10); \
|
||||
} \
|
||||
protected: \
|
||||
int delay_ms_; \
|
||||
double jitter_ms_; \
|
||||
rclcpp::Subscription<std_msgs::msg::String>::SharedPtr subscription_; \
|
||||
rclcpp::Publisher<std_msgs::msg::String>::SharedPtr publisher_; \
|
||||
};
|
||||
int edf_usage_example(int argc, char **argv);
|
||||
|
||||
// A generic single-input processing node with a fixed delay +/- jitter
|
||||
#define DEFINE_MULTI_PROCESSING_NODE_CLASS(NAME) \
|
||||
class NAME : public rclcpp::Node \
|
||||
{ \
|
||||
public: \
|
||||
NAME(std::string const &name, \
|
||||
std::vector<std::string> const &input_topics, \
|
||||
std::string const &output_topic, \
|
||||
int delay_ms, \
|
||||
double jitter_ms = 0.0) \
|
||||
: Node(name), delay_ms_(delay_ms), jitter_ms_(jitter_ms) \
|
||||
{ \
|
||||
for (auto &topic : input_topics) \
|
||||
{ \
|
||||
auto sub = create_subscription<std_msgs::msg::String>( \
|
||||
topic, 10, \
|
||||
[this, name](std_msgs::msg::String::SharedPtr msg) \
|
||||
{ \
|
||||
/* simple fusion: log each arrival */ \
|
||||
if (DEBUG) \
|
||||
{ \
|
||||
RCLCPP_INFO(get_logger(), "%s fusing input: %s", name.c_str(), msg->data.c_str()); \
|
||||
} \
|
||||
/* Simulate processing delay */ \
|
||||
double jitter = 0.0; \
|
||||
if (jitter_ms_ > 0.0) { \
|
||||
static thread_local std::mt19937 gen(std::random_device{}()); \
|
||||
std::normal_distribution<> d(0.0, jitter_ms_); \
|
||||
jitter = d(gen); \
|
||||
} \
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds((int)(delay_ms_ + jitter))); \
|
||||
auto out = std_msgs::msg::String(); \
|
||||
out.data = msg->data + " -> " + name; \
|
||||
if (DEBUG) { \
|
||||
RCLCPP_INFO(get_logger(), "%s publishes fused message", name.c_str()); \
|
||||
} \
|
||||
publisher_->publish(out); \
|
||||
}); \
|
||||
subscriptions_.emplace_back(sub); \
|
||||
} \
|
||||
publisher_ = create_publisher<std_msgs::msg::String>(output_topic, 10); \
|
||||
\
|
||||
} \
|
||||
protected: \
|
||||
std::vector<rclcpp::Subscription<std_msgs::msg::String>::SharedPtr> subscriptions_; \
|
||||
rclcpp::Publisher<std_msgs::msg::String>::SharedPtr publisher_; \
|
||||
int delay_ms_; \
|
||||
double jitter_ms_; \
|
||||
};
|
||||
|
||||
DEFINE_SENSOR_NODE_CLASS(CameraNode)
|
||||
DEFINE_PROCESSING_NODE_CLASS(DebayerNode)
|
||||
DEFINE_PROCESSING_NODE_CLASS(RadiometricNode)
|
||||
DEFINE_PROCESSING_NODE_CLASS(GeometricNode)
|
||||
DEFINE_PROCESSING_NODE_CLASS(MappingNode)
|
||||
|
||||
DEFINE_PROCESSING_NODE_CLASS(SmokeClassifierNode)
|
||||
|
||||
DEFINE_SENSOR_NODE_CLASS(GPSNode)
|
||||
DEFINE_SENSOR_NODE_CLASS(IMUNode)
|
||||
DEFINE_SENSOR_NODE_CLASS(BaroNode)
|
||||
|
||||
DEFINE_PROCESSING_NODE_CLASS(FusionNodeS)
|
||||
DEFINE_MULTI_PROCESSING_NODE_CLASS(FusionNode)
|
||||
|
||||
DEFINE_SENSOR_NODE_CLASS(LidarNode)
|
||||
|
||||
DEFINE_SENSOR_NODE_CLASS(CommandNode)
|
||||
|
||||
DEFINE_PROCESSING_NODE_CLASS(FlightManagementNodeS)
|
||||
|
||||
DEFINE_MULTI_PROCESSING_NODE_CLASS(FlightManagementNode)
|
||||
|
||||
DEFINE_PROCESSING_NODE_CLASS(ControlNode)
|
||||
|
||||
DEFINE_PROCESSING_NODE_CLASS(TelemetryNodeS)
|
||||
DEFINE_MULTI_PROCESSING_NODE_CLASS(TelemetryNode)
|
||||
|
||||
DEFINE_PROCESSING_NODE_CLASS(RadioNode)
|
||||
|
||||
#define PROCENTUAL_JITTER 0.1
|
||||
#define DELAY_JITTER_PAIR(DELAY) DELAY, PROCENTUAL_JITTER * DELAY
|
||||
|
||||
int main(int argc, char **argv)
|
||||
{
|
||||
rclcpp::init(argc, argv);
|
||||
|
||||
DECLARE_CHAIN(0); // Chain 1: Image2Ground Mapping
|
||||
DECLARE_CHAIN(1); // Chain 2: Semantic Scheduler Adaptation
|
||||
DECLARE_CHAIN(2); // Chain 3: Flight Control
|
||||
DECLARE_CHAIN(3); // Chain 4: Data Storage/Streaming
|
||||
|
||||
#if defined(SEMANTIC_EXECUTOR)
|
||||
std::cerr << "Not yet implemented" << std::endl;
|
||||
return -1;
|
||||
#elif defined(EDF_PRIORITY_EXECUTOR)
|
||||
// Create a priority executor with a custom memory strategy
|
||||
auto strategy = std::make_shared<priority_executor::PriorityMemoryStrategy<>>();
|
||||
rclcpp::ExecutorOptions options;
|
||||
options.memory_strategy = strategy;
|
||||
#ifdef MULTI_THREADED
|
||||
auto executor = priority_executor::MultithreadTimedExecutor(options);
|
||||
#else
|
||||
auto executor = priority_executor::TimedExecutor(options);
|
||||
#endif
|
||||
|
||||
// Must be set to post_execute can set new deadlines
|
||||
executor.prio_memory_strategy_ = strategy;
|
||||
#else // default: ROS
|
||||
#ifdef MULTI_THREADED
|
||||
// Create a multi-threaded executor to handle all nodes
|
||||
rclcpp::executors::MultiThreadedExecutor executor;
|
||||
#else
|
||||
// Create a single-threaded executor to handle all nodes
|
||||
rclcpp::executors::SingleThreadedExecutor executor;
|
||||
#endif
|
||||
#endif
|
||||
|
||||
// Chain 1: Image2Ground Mapping
|
||||
auto camera = std::make_shared<CameraNode>("input_camera_node", "/input/camera/raw", DELAY_JITTER_PAIR(5)); // Sensor read, low latency
|
||||
auto debayer = std::make_shared<DebayerNode>("debayer_node", "/input/camera/raw", "/camera/debayered", DELAY_JITTER_PAIR(18)); // Moderate, image conversion
|
||||
auto radiometric = std::make_shared<RadiometricNode>("radiometric_node", "/camera/debayered", "/camera/radiometric", DELAY_JITTER_PAIR(22)); // Heavier image math
|
||||
auto geometric = std::make_shared<GeometricNode>("geometric_node", "/camera/radiometric", "/camera/geometric", DELAY_JITTER_PAIR(12)); // Perspective transform, medium
|
||||
auto mapping = std::make_shared<MappingNode>("output_mapping_node", "/camera/geometric", "/output/camera/mapped", DELAY_JITTER_PAIR(16)); // Warping/stitching, moderate
|
||||
MAKE_INPUT_NODE(0, cameraA, CameraNode, "/input/cameraA/raw", 200); // 60 FPS camera, fast
|
||||
MAKE_PROCESSING_NODE(0, debayerA, DebayerNode, "/input/cameraA/raw", "/cameraA/debayered", 18); // Moderate, image conversion
|
||||
MAKE_PROCESSING_NODE(0, radiometricA, RadiometricNode, "/cameraA/debayered", "/cameraA/radiometric", 22); // Heavier image math
|
||||
MAKE_PROCESSING_NODE(0, geometric, GeometricNode, "/cameraA/radiometric", "/cameraA/geometric", 12); // Perspective transform, medium
|
||||
MAKE_OUTPUT_NODE(0, mapping, MappingNode, "/cameraA/geometric", "/output/cameraA/mapped", 16); // Warping/stitching, moderate
|
||||
|
||||
// Chain 2: Semantic Scheduler Adaptation
|
||||
auto smoke = std::make_shared<SmokeClassifierNode>("output_smoke_classifier_node", "/camera/radiometric", "/output/classifier/classification", DELAY_JITTER_PAIR(35)); // CNN/ML classifier, can be high
|
||||
MAKE_INPUT_NODE(1, cameraB, CameraNode, "/input/cameraB/raw", 150); // 60 FPS camera, fast
|
||||
MAKE_PROCESSING_NODE(1, debayerB, DebayerNode, "/input/cameraB/raw", "/cameraB/debayered", 18); // Moderate, image conversion
|
||||
MAKE_PROCESSING_NODE(1, radiometricB, RadiometricNode, "/cameraB/debayered", "/cameraB/radiometric", 22); // Heavier image math
|
||||
MAKE_OUTPUT_NODE(1, smoke_classifier, SmokeClassifierNode, "/cameraB/radiometric", "/output/classifier/classification", 35); // CNN/ML classifier, can be high
|
||||
|
||||
// Chain 3: Flight Control
|
||||
auto gps = std::make_shared<GPSNode>("input_gps_node", "/input/gps/fix", DELAY_JITTER_PAIR(7)); // Fast, low latency sensor
|
||||
auto imu = std::make_shared<IMUNode>("input_imu_node", "/input/imu/data", DELAY_JITTER_PAIR(5)); // Fast, low latency sensor
|
||||
auto baro = std::make_shared<BaroNode>("input_baro_node", "/input/baro/alt", DELAY_JITTER_PAIR(6)); // Fast, low latency sensor
|
||||
auto fusion = std::make_shared<FusionNode>(
|
||||
"sensor_fusion_node", std::vector<std::string>{"/input/gps/fix", "/input/imu/data", "/input/baro/alt"}, "/sensors/fused", DELAY_JITTER_PAIR(14)); // Sensor fusion, not trivial but not heavy
|
||||
std::vector<std::string> fusionA_input_topics = {"/input/gpsA/fix", "/input/imuA/data", "/input/baroA/alt"};
|
||||
MAKE_INPUT_NODE(2, gpsA, GPSNode, "/input/gpsA/fix", 100); // Fast, low latency sensor
|
||||
MAKE_INPUT_NODE(2, imuA, IMUNode, "/input/imuA/data", 100); // Fast, low latency sensor
|
||||
MAKE_INPUT_NODE(2, baroA, BaroNode, "/input/baroA/alt", 100); // Fast, low latency sensor
|
||||
MAKE_PROCESSING_NODE(2, fusionA, FusionNode, fusionA_input_topics, "/sensorsA/fused", 14); // Sensor fusion, not trivial but not heavy
|
||||
|
||||
auto lidar = std::make_shared<LidarNode>("input_lidar_node", "/input/lidar/scan", DELAY_JITTER_PAIR(10)); // LIDAR driver, can be fast
|
||||
auto cmd = std::make_shared<CommandNode>("input_operator_cmd_node", "/input/operator/commands", DELAY_JITTER_PAIR(8)); // Operator commands, fast
|
||||
auto mgmt = std::make_shared<FlightManagementNode>(
|
||||
"flight_mgmt_node", std::vector<std::string>{"/sensors/fused", "/input/lidar/scan", "/input/operator/commands"}, "/flight/plan", DELAY_JITTER_PAIR(18)); // Path planning, moderate
|
||||
auto control = std::make_shared<ControlNode>("output_flight_control_node", "/flight/plan", "/output/flight/cmd", DELAY_JITTER_PAIR(12)); // PID controller, fast but non-zero
|
||||
MAKE_INPUT_NODE(2, lidar, LidarNode, "/input/lidar/scan", 100); // LIDAR driver, can be fast
|
||||
MAKE_INPUT_NODE(2, cmd, CommandNode, "/input/operator/commands", 100); // Operator commands, fast
|
||||
std::vector<std::string> mgmt_input_topics = {"/sensorsA/fused", "/input/lidar/scan", "/input/operator/commands"};
|
||||
MAKE_PROCESSING_NODE(2, mgmt, FlightManagementNode, mgmt_input_topics, "/flight/plan", 18); // Path planning, moderate
|
||||
//MAKE_PROCESSING_NODE(2, mgmt, FlightManagementNodeS, "/input/operator/commands", "/flight/plan", 18); // Path planning, moderate
|
||||
MAKE_OUTPUT_NODE(2, control, ControlNode, "/flight/plan", "/output/flight/cmd", 12); // PID controller, fast but non-zero
|
||||
|
||||
// Chain 4: Data Storage/Streaming
|
||||
auto telemetry = std::make_shared<TelemetryNode>(
|
||||
"telemetry_node", std::vector<std::string>{"/sensors/fused", "/output/camera/mapped"}, "/telemetry/data", DELAY_JITTER_PAIR(6)); // Quick serialization
|
||||
auto radio = std::make_shared<RadioNode>("output_radio_tx_node", "/telemetry/data", "/output/telemetry/radio", DELAY_JITTER_PAIR(3)); // Output, almost instant
|
||||
std::vector<std::string> fusionB_input_topics = {"/input/gpsB/fix", "/input/imuB/data", "/input/baroB/alt"};
|
||||
MAKE_INPUT_NODE(3, gpsB, GPSNode, "/input/gpsB/fix", 100); // Fast, low latency sensor
|
||||
MAKE_INPUT_NODE(3, imuB, IMUNode, "/input/imuB/data", 100); // Fast, low latency sensor
|
||||
MAKE_INPUT_NODE(3, baroB, BaroNode, "/input/baroB/alt", 100); // Fast, low latency sensor
|
||||
MAKE_PROCESSING_NODE(3, fusionB, FusionNode, fusionB_input_topics, "/sensorsB/fused", 14); // Sensor fusion, not trivial but not heavy
|
||||
//MAKE_PROCESSING_NODE(3, fusionB, FusionNodeS, "/input/baroB/alt", "/sensorsB/fused", 14); // Sensor fusion, not trivial but not heavy
|
||||
|
||||
// Add all to executor
|
||||
executor.add_node(camera);
|
||||
executor.add_node(debayer);
|
||||
executor.add_node(radiometric);
|
||||
std::vector<std::string> telemetry_input_topics = {"/sensorsB/fused", "/output/cameraA/mapped"};
|
||||
MAKE_PROCESSING_NODE(3, telemetry, TelemetryNode, telemetry_input_topics, "/telemetry/data", 6); // Quick serialization, low latency
|
||||
//MAKE_PROCESSING_NODE(3, telemetry, TelemetryNodeS, "/sensorsB/fused", "/telemetry/data", 6); // Quick serialization, low latency
|
||||
MAKE_OUTPUT_NODE(3, radio, RadioNode, "/telemetry/data", "/output/telemetry/radio", 3); // Output, almost instant
|
||||
|
||||
#if defined(EDF_PRIORITY_EXECUTOR)
|
||||
int chain_period;
|
||||
int chain_id;
|
||||
|
||||
// ---
|
||||
chain_id = 0;
|
||||
chain_period = 1000;
|
||||
{
|
||||
// the new funcitons in PriorityMemoryStrategy accept the handle of the
|
||||
// timer/subscription as the first argument
|
||||
strategy->set_executable_deadline(cameraA->timer_->get_timer_handle(), chain_period, priority_executor::ExecutableType::TIMER, chain_id);
|
||||
// you _must_ set the timer_handle for each chain
|
||||
strategy->get_priority_settings(cameraA->timer_->get_timer_handle())->timer_handle = cameraA->timer_;
|
||||
// you _must_ mark the first executable in the chain
|
||||
strategy->set_first_in_chain(cameraA->timer_->get_timer_handle());
|
||||
|
||||
// Set the executable deadlines for each node
|
||||
strategy->set_executable_deadline(debayerA->subscription_->get_subscription_handle(), chain_period, priority_executor::ExecutableType::SUBSCRIPTION, chain_id);
|
||||
strategy->set_executable_deadline(radiometricA->subscription_->get_subscription_handle(), chain_period, priority_executor::ExecutableType::SUBSCRIPTION, chain_id);
|
||||
strategy->set_executable_deadline(geometric->subscription_->get_subscription_handle(), chain_period, priority_executor::ExecutableType::SUBSCRIPTION, chain_id);
|
||||
strategy->set_executable_deadline(mapping->subscription_->get_subscription_handle(), chain_period, priority_executor::ExecutableType::SUBSCRIPTION, chain_id);
|
||||
|
||||
// you _must_ mark the last executable in the chain (used to keep track of different instances of the same chain)
|
||||
strategy->set_last_in_chain(mapping->subscription_->get_subscription_handle());
|
||||
}
|
||||
|
||||
// ---
|
||||
chain_id = 1;
|
||||
chain_period = 1000;
|
||||
{
|
||||
strategy->set_executable_deadline(cameraB->timer_->get_timer_handle(), chain_period, priority_executor::ExecutableType::TIMER, chain_id);
|
||||
strategy->get_priority_settings(cameraB->timer_->get_timer_handle())->timer_handle = cameraB->timer_;
|
||||
strategy->set_first_in_chain(cameraB->timer_->get_timer_handle());
|
||||
strategy->set_executable_deadline(debayerB->subscription_->get_subscription_handle(), chain_period, priority_executor::ExecutableType::SUBSCRIPTION, chain_id);
|
||||
strategy->set_executable_deadline(radiometricB->subscription_->get_subscription_handle(), chain_period, priority_executor::ExecutableType::SUBSCRIPTION, chain_id);
|
||||
strategy->set_executable_deadline(smoke_classifier->subscription_->get_subscription_handle(), chain_period, priority_executor::ExecutableType::SUBSCRIPTION, chain_id);
|
||||
strategy->set_last_in_chain(smoke_classifier->subscription_->get_subscription_handle());
|
||||
}
|
||||
|
||||
// ---
|
||||
chain_id = 2;
|
||||
chain_period = 1000;
|
||||
{
|
||||
strategy->set_executable_deadline(cmd->timer_->get_timer_handle(), chain_period, priority_executor::ExecutableType::TIMER, chain_id);
|
||||
strategy->get_priority_settings(cmd->timer_->get_timer_handle())->timer_handle = cmd->timer_;
|
||||
strategy->set_first_in_chain(cmd->timer_->get_timer_handle());
|
||||
strategy->set_executable_deadline(baroA->timer_->get_timer_handle(), chain_period, priority_executor::ExecutableType::TIMER, chain_id);
|
||||
strategy->set_executable_deadline(imuA->timer_->get_timer_handle(), chain_period, priority_executor::ExecutableType::TIMER, chain_id);
|
||||
strategy->set_executable_deadline(gpsA->timer_->get_timer_handle(), chain_period, priority_executor::ExecutableType::TIMER, chain_id);
|
||||
REGISTER_MULTI_PROCESSING_NODE(fusionA, chain_period, chain_id);
|
||||
strategy->set_executable_deadline(lidar->timer_->get_timer_handle(), chain_period, priority_executor::ExecutableType::TIMER, chain_id);
|
||||
REGISTER_MULTI_PROCESSING_NODE(mgmt, chain_period, chain_id);
|
||||
strategy->set_executable_deadline(control->subscription_->get_subscription_handle(), chain_period, priority_executor::ExecutableType::SUBSCRIPTION, chain_id);
|
||||
strategy->set_last_in_chain(control->subscription_->get_subscription_handle());
|
||||
}
|
||||
|
||||
// ---
|
||||
chain_id = 3;
|
||||
chain_period = 1000;
|
||||
{
|
||||
strategy->set_executable_deadline(baroB->timer_->get_timer_handle(), chain_period, priority_executor::ExecutableType::TIMER, chain_id);
|
||||
strategy->get_priority_settings(baroB->timer_->get_timer_handle())->timer_handle = baroB->timer_;
|
||||
strategy->set_first_in_chain(baroB->timer_->get_timer_handle());
|
||||
strategy->set_executable_deadline(imuB->timer_->get_timer_handle(), chain_period, priority_executor::ExecutableType::TIMER, chain_id);
|
||||
strategy->set_executable_deadline(gpsB->timer_->get_timer_handle(), chain_period, priority_executor::ExecutableType::TIMER, chain_id);
|
||||
REGISTER_MULTI_PROCESSING_NODE(fusionB, chain_period, chain_id);
|
||||
REGISTER_MULTI_PROCESSING_NODE(telemetry, chain_period, chain_id);
|
||||
strategy->set_executable_deadline(radio->subscription_->get_subscription_handle(), chain_period, priority_executor::ExecutableType::SUBSCRIPTION, chain_id);
|
||||
strategy->set_last_in_chain(radio->subscription_->get_subscription_handle());
|
||||
}
|
||||
#endif
|
||||
|
||||
executor.add_node(cameraA);
|
||||
executor.add_node(debayerA);
|
||||
executor.add_node(radiometricA);
|
||||
executor.add_node(geometric);
|
||||
executor.add_node(mapping);
|
||||
|
||||
executor.add_node(smoke);
|
||||
executor.add_node(cameraB);
|
||||
executor.add_node(debayerB);
|
||||
executor.add_node(radiometricB);
|
||||
executor.add_node(smoke_classifier);
|
||||
|
||||
executor.add_node(gps);
|
||||
executor.add_node(imu);
|
||||
executor.add_node(baro);
|
||||
executor.add_node(fusion);
|
||||
executor.add_node(gpsA);
|
||||
executor.add_node(imuA);
|
||||
executor.add_node(baroA);
|
||||
executor.add_node(fusionA);
|
||||
|
||||
executor.add_node(gpsB);
|
||||
executor.add_node(imuB);
|
||||
executor.add_node(baroB);
|
||||
executor.add_node(fusionB);
|
||||
|
||||
executor.add_node(telemetry);
|
||||
executor.add_node(radio);
|
||||
|
@ -239,3 +261,107 @@ int main(int argc, char **argv)
|
|||
rclcpp::shutdown();
|
||||
return 0;
|
||||
}
|
||||
|
||||
// -----------------------------------------------
|
||||
|
||||
// re-create the classic talker-listener example with two listeners
|
||||
class Talker : public rclcpp::Node
|
||||
{
|
||||
public:
|
||||
Talker() : Node("talker")
|
||||
{
|
||||
// Create a publisher on the "chatter" topic with 10 msg queue size.
|
||||
pub_ = this->create_publisher<std_msgs::msg::String>("chatter", 10);
|
||||
// Create a timer of period 1s that calls our callback member function.
|
||||
timer_ = this->create_wall_timer(std::chrono::seconds(1),
|
||||
std::bind(&Talker::timer_callback, this));
|
||||
}
|
||||
// the timer must be public
|
||||
rclcpp::TimerBase::SharedPtr timer_;
|
||||
|
||||
private:
|
||||
void timer_callback()
|
||||
{
|
||||
std_msgs::msg::String msg;
|
||||
msg.data = "Hello World!";
|
||||
RCLCPP_INFO(this->get_logger(), "Publishing: '%s'", msg.data.c_str());
|
||||
pub_->publish(msg);
|
||||
}
|
||||
rclcpp::Publisher<std_msgs::msg::String>::SharedPtr pub_;
|
||||
};
|
||||
|
||||
class Listener : public rclcpp::Node
|
||||
{
|
||||
public:
|
||||
Listener(std::string name) : Node(name)
|
||||
{
|
||||
// Create a subscription on the "chatter" topic with the default callback
|
||||
// method.
|
||||
pub_ = this->create_publisher<std_msgs::msg::String>("/out", 10);
|
||||
sub_ = this->create_subscription<std_msgs::msg::String>(
|
||||
"chatter", 10,
|
||||
std::bind(&Listener::callback, this, std::placeholders::_1));
|
||||
}
|
||||
// the publisher must be public
|
||||
rclcpp::Subscription<std_msgs::msg::String>::SharedPtr sub_;
|
||||
rclcpp::Publisher<std_msgs::msg::String>::SharedPtr pub_;
|
||||
|
||||
private:
|
||||
void callback(const std_msgs::msg::String::SharedPtr msg)
|
||||
{
|
||||
RCLCPP_INFO(this->get_logger(), "I heard: '%s'", msg->data.c_str());
|
||||
std_msgs::msg::String msgn;
|
||||
msgn.data = "Hello World!";
|
||||
RCLCPP_INFO(this->get_logger(), "Publishing: 'I heard: %s'", msgn.data.c_str());
|
||||
pub_->publish(msgn);
|
||||
}
|
||||
};
|
||||
|
||||
int edf_usage_example(int argc, char **argv) {
|
||||
rclcpp::init(argc, argv);
|
||||
|
||||
auto talker = std::make_shared<Talker>();
|
||||
auto listener1 = std::make_shared<Listener>("listener1");
|
||||
//auto listener2 = std::make_shared<Listener>("listener2");
|
||||
rclcpp::ExecutorOptions options;
|
||||
|
||||
auto strategy = std::make_shared<priority_executor::PriorityMemoryStrategy<>>();
|
||||
options.memory_strategy = strategy;
|
||||
auto executor = new priority_executor::TimedExecutor(options);
|
||||
|
||||
// must be set to post_execute can set new deadlines
|
||||
executor->prio_memory_strategy_ = strategy;
|
||||
|
||||
|
||||
// the new funcitons in PriorityMemoryStrategy accept the handle of the
|
||||
// timer/subscription as the first argument
|
||||
strategy->set_executable_deadline(talker->timer_->get_timer_handle(), 1000,
|
||||
priority_executor::ExecutableType::TIMER, 0);
|
||||
// you _must_ set the timer_handle for each chain
|
||||
strategy->get_priority_settings(talker->timer_->get_timer_handle())->timer_handle = talker->timer_;
|
||||
// you _must_ mark the first executable in the chain
|
||||
strategy->set_first_in_chain(talker->timer_->get_timer_handle());
|
||||
// set the same period and chain_id for each callback in the chain
|
||||
strategy->set_executable_deadline(listener1->sub_->get_subscription_handle(),
|
||||
1000, priority_executor::ExecutableType::SUBSCRIPTION, 0);
|
||||
//strategy->set_executable_deadline(listener2->sub_->get_subscription_handle(), 1000, priority_executor::ExecutableType::SUBSCRIPTION, 0);
|
||||
// you _must_ mark the last executable in the chain (used to keep track of different instances of the same chain)
|
||||
strategy->set_last_in_chain(listener1->sub_->get_subscription_handle());
|
||||
// add all the nodes to the executor
|
||||
executor->add_node(talker);
|
||||
executor->add_node(listener1);
|
||||
//executor->add_node(listener2);
|
||||
|
||||
// if the executor behaves unexpectedly, you can print the priority settings to make sure they are correct
|
||||
std::cout << *strategy->get_priority_settings(
|
||||
talker->timer_->get_timer_handle())
|
||||
<< std::endl;
|
||||
std::cout << *strategy->get_priority_settings(
|
||||
listener1->sub_->get_subscription_handle())
|
||||
<< std::endl;
|
||||
//std::cout << *strategy->get_priority_settings(listener2->sub_->get_subscription_handle()) << std::endl;
|
||||
executor->spin();
|
||||
|
||||
rclcpp::shutdown();
|
||||
return 0;
|
||||
}
|
203
src/full_topology/src/nodes.hpp
Normal file
203
src/full_topology/src/nodes.hpp
Normal file
|
@ -0,0 +1,203 @@
|
|||
#pragma once
|
||||
|
||||
#include <chrono>
|
||||
#include <memory>
|
||||
#include <random>
|
||||
#include <string>
|
||||
#include <thread>
|
||||
#include <vector>
|
||||
|
||||
using namespace std::chrono_literals;
|
||||
|
||||
#include "rclcpp/rclcpp.hpp"
|
||||
#include "std_msgs/msg/string.hpp"
|
||||
|
||||
// A generic sensor node that publishes at a given rate + jitter
|
||||
#define DEFINE_SENSOR_NODE_CLASS(NAME) \
|
||||
class NAME : public rclcpp::Node \
|
||||
{ \
|
||||
public: \
|
||||
NAME(std::string const &name, \
|
||||
std::string const &topic, \
|
||||
int period_ms, \
|
||||
double jitter_ms_ = 0.0) \
|
||||
: Node(name) \
|
||||
{ \
|
||||
publisher_ = create_publisher<std_msgs::msg::String>(topic, 10); \
|
||||
timer_ = create_wall_timer(std::chrono::milliseconds(period_ms), [this, name, jitter_ms_]() \
|
||||
{ \
|
||||
double jitter = 0.0; \
|
||||
if (jitter_ms_ > 0.0) { \
|
||||
static thread_local std::mt19937 gen(std::random_device{}()); \
|
||||
std::normal_distribution<> d(0.0, jitter_ms_); \
|
||||
jitter = d(gen); \
|
||||
} \
|
||||
/* Ensure total delay is non-negative */ \
|
||||
int total_delay = std::max(0, static_cast<int>(std::round(jitter))); \
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(total_delay)); \
|
||||
auto msg = std_msgs::msg::String(); \
|
||||
msg.data = name + std::string(" publishes"); \
|
||||
publisher_->publish(msg); }); \
|
||||
} \
|
||||
\
|
||||
rclcpp::Publisher<std_msgs::msg::String>::SharedPtr publisher_; \
|
||||
rclcpp::TimerBase::SharedPtr timer_; \
|
||||
};
|
||||
|
||||
// A generic single-input processing node with a fixed delay +/- jitter
|
||||
#define DEFINE_PROCESSING_NODE_CLASS(NAME) \
|
||||
class NAME : public rclcpp::Node \
|
||||
{ \
|
||||
public: \
|
||||
NAME(const std::string & name, \
|
||||
const std::string & input_topic, \
|
||||
const std::string & output_topic, \
|
||||
int delay_ms, \
|
||||
double jitter_ms = 0.0) \
|
||||
: Node(name), delay_ms_(delay_ms), jitter_ms_(jitter_ms) \
|
||||
{ \
|
||||
subscription_ = create_subscription<std_msgs::msg::String>( \
|
||||
input_topic, 10, \
|
||||
[this, name](std_msgs::msg::String::SharedPtr msg) \
|
||||
{ \
|
||||
/* Simulate processing delay */ \
|
||||
double jitter = 0.0; \
|
||||
if (jitter_ms_ > 0.0) { \
|
||||
static thread_local std::mt19937 gen(std::random_device{}());\
|
||||
std::normal_distribution<> d(0.0, jitter_ms_); \
|
||||
jitter = d(gen); \
|
||||
} \
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds((int)(delay_ms_ + jitter))); \
|
||||
auto out = std_msgs::msg::String(); \
|
||||
out.data = msg->data + " -> " + name; \
|
||||
publisher_->publish(out); \
|
||||
}); \
|
||||
publisher_ = create_publisher<std_msgs::msg::String>(output_topic, 10); \
|
||||
} \
|
||||
\
|
||||
rclcpp::Subscription<std_msgs::msg::String>::SharedPtr subscription_; \
|
||||
rclcpp::Publisher<std_msgs::msg::String>::SharedPtr publisher_; \
|
||||
protected: \
|
||||
int delay_ms_; \
|
||||
double jitter_ms_; \
|
||||
};
|
||||
|
||||
// A generic single-input processing node with a fixed delay +/- jitter
|
||||
#ifdef USE_TIMER_IN_FUSION_NODES
|
||||
#define DEFINE_MULTI_PROCESSING_NODE_CLASS(NAME) \
|
||||
class NAME : public rclcpp::Node \
|
||||
{ \
|
||||
public: \
|
||||
NAME(std::string const &name, \
|
||||
std::vector<std::string> const &input_topics, \
|
||||
std::string const &output_topic, \
|
||||
int delay_ms, \
|
||||
double jitter_ms = 0.0) \
|
||||
: Node(name), delay_ms_(delay_ms), jitter_ms_(jitter_ms) \
|
||||
{ \
|
||||
for (auto &topic : input_topics) \
|
||||
{ \
|
||||
auto sub = create_subscription<std_msgs::msg::String>( \
|
||||
topic, 10, \
|
||||
[this, name](std_msgs::msg::String::SharedPtr msg) \
|
||||
{ \
|
||||
}); \
|
||||
subscriptions_.emplace_back(sub); \
|
||||
} \
|
||||
publisher_ = create_publisher<std_msgs::msg::String>(output_topic, 10); \
|
||||
/* Also publish fused result periodically */ \
|
||||
timer_ = create_wall_timer(std::chrono::milliseconds(delay_ms_), [this, name]() \
|
||||
{ \
|
||||
/* Simulate processing delay */ \
|
||||
double jitter = 0.0; \
|
||||
if (jitter_ms_ > 0.0) { \
|
||||
static thread_local std::mt19937 gen(std::random_device{}()); \
|
||||
std::normal_distribution<> d(0.0, jitter_ms_); \
|
||||
jitter = d(gen); \
|
||||
} \
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds((int)jitter)); \
|
||||
auto out = std_msgs::msg::String(); \
|
||||
out.data = name + std::string(" fused"); \
|
||||
publisher_->publish(out); \
|
||||
}); \
|
||||
} \
|
||||
\
|
||||
std::vector<rclcpp::Subscription<std_msgs::msg::String>::SharedPtr> subscriptions_; \
|
||||
rclcpp::Publisher<std_msgs::msg::String>::SharedPtr publisher_; \
|
||||
rclcpp::TimerBase::SharedPtr timer_; \
|
||||
protected: \
|
||||
int delay_ms_; \
|
||||
double jitter_ms_; \
|
||||
};
|
||||
#else
|
||||
#define DEFINE_MULTI_PROCESSING_NODE_CLASS(NAME) \
|
||||
class NAME : public rclcpp::Node \
|
||||
{ \
|
||||
public: \
|
||||
NAME(std::string const &name, \
|
||||
std::vector<std::string> const &input_topics, \
|
||||
std::string const &output_topic, \
|
||||
int delay_ms, \
|
||||
double jitter_ms = 0.0) \
|
||||
: Node(name), delay_ms_(delay_ms), jitter_ms_(jitter_ms) \
|
||||
{ \
|
||||
for (auto &topic : input_topics) \
|
||||
{ \
|
||||
auto sub = create_subscription<std_msgs::msg::String>( \
|
||||
topic, 10, \
|
||||
[this, name](std_msgs::msg::String::SharedPtr msg) \
|
||||
{ \
|
||||
/* Simulate processing delay */ \
|
||||
double jitter = 0.0; \
|
||||
if (jitter_ms_ > 0.0) { \
|
||||
static thread_local std::mt19937 gen(std::random_device{}()); \
|
||||
std::normal_distribution<> d(0.0, jitter_ms_); \
|
||||
jitter = d(gen); \
|
||||
} \
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds((int)(delay_ms_ + jitter))); \
|
||||
auto out = std_msgs::msg::String(); \
|
||||
out.data = msg->data + " -> " + name; \
|
||||
publisher_->publish(out); \
|
||||
}); \
|
||||
subscriptions_.emplace_back(sub); \
|
||||
} \
|
||||
publisher_ = create_publisher<std_msgs::msg::String>(output_topic, 10); \
|
||||
\
|
||||
} \
|
||||
\
|
||||
std::vector<rclcpp::Subscription<std_msgs::msg::String>::SharedPtr> subscriptions_; \
|
||||
rclcpp::Publisher<std_msgs::msg::String>::SharedPtr publisher_; \
|
||||
protected: \
|
||||
int delay_ms_; \
|
||||
double jitter_ms_; \
|
||||
};
|
||||
#endif
|
||||
|
||||
DEFINE_SENSOR_NODE_CLASS(CameraNode)
|
||||
DEFINE_PROCESSING_NODE_CLASS(DebayerNode)
|
||||
DEFINE_PROCESSING_NODE_CLASS(RadiometricNode)
|
||||
DEFINE_PROCESSING_NODE_CLASS(GeometricNode)
|
||||
DEFINE_PROCESSING_NODE_CLASS(MappingNode)
|
||||
|
||||
DEFINE_PROCESSING_NODE_CLASS(SmokeClassifierNode)
|
||||
|
||||
DEFINE_SENSOR_NODE_CLASS(GPSNode)
|
||||
DEFINE_SENSOR_NODE_CLASS(IMUNode)
|
||||
DEFINE_SENSOR_NODE_CLASS(BaroNode)
|
||||
|
||||
DEFINE_PROCESSING_NODE_CLASS(FusionNodeS)
|
||||
DEFINE_MULTI_PROCESSING_NODE_CLASS(FusionNode)
|
||||
|
||||
DEFINE_SENSOR_NODE_CLASS(LidarNode)
|
||||
|
||||
DEFINE_SENSOR_NODE_CLASS(CommandNode)
|
||||
|
||||
DEFINE_PROCESSING_NODE_CLASS(FlightManagementNodeS)
|
||||
DEFINE_MULTI_PROCESSING_NODE_CLASS(FlightManagementNode)
|
||||
|
||||
DEFINE_PROCESSING_NODE_CLASS(ControlNode)
|
||||
|
||||
DEFINE_PROCESSING_NODE_CLASS(TelemetryNodeS)
|
||||
DEFINE_MULTI_PROCESSING_NODE_CLASS(TelemetryNode)
|
||||
|
||||
DEFINE_PROCESSING_NODE_CLASS(RadioNode)
|
125
src/full_topology/src/simple.cpp
Normal file
125
src/full_topology/src/simple.cpp
Normal file
|
@ -0,0 +1,125 @@
|
|||
// full_topology.cpp
|
||||
|
||||
#include <chrono>
|
||||
#include <memory>
|
||||
#include <random>
|
||||
#include <string>
|
||||
#include <thread>
|
||||
#include <vector>
|
||||
// the order here matters
|
||||
#include "tracetools/utils.hpp"
|
||||
|
||||
#include "rclcpp/rclcpp.hpp"
|
||||
#include "std_msgs/msg/string.hpp"
|
||||
|
||||
#include "config.hpp"
|
||||
#include "nodes.hpp"
|
||||
|
||||
#include "priority_executor/priority_executor.hpp"
|
||||
#include "priority_executor/multithread_priority_executor.hpp"
|
||||
#include "priority_executor/priority_memory_strategy.hpp"
|
||||
|
||||
#define PROCENTUAL_INPUT_JITTER 0.05
|
||||
#define PROCENTUAL_PROCESSING_JITTER 0.1
|
||||
|
||||
#define STR_HELPER(x) #x
|
||||
#define STR(x) STR_HELPER(x)
|
||||
|
||||
#ifndef DEBUG
|
||||
#define PRINT_BEFORE(CHAIN_ID, NAME) \
|
||||
std::cout << "Chain " << CHAIN_ID << " before: " << CHAIN_DURATION(CHAIN_ID) << " ms" << std::endl;
|
||||
#define PRINT_AFTER(CHAIN_ID, NAME) \
|
||||
std::cout << "Chain " << CHAIN_ID << " after: " << CHAIN_DURATION(CHAIN_ID) << " ms" << std::endl;
|
||||
#else
|
||||
#define PRINT_BEFORE(CHAIN_ID, NAME)
|
||||
#define PRINT_AFTER(CHAIN_ID, NAME)
|
||||
#endif
|
||||
|
||||
#define CHAIN_DURATION(CHAIN_ID) chain_duration_##CHAIN_ID
|
||||
#define DECLARE_CHAIN(CHAIN_ID) double CHAIN_DURATION(CHAIN_ID) = 0.0;
|
||||
#define SUM_CHAIN_DURATION(CHAIN_ID, NAME) \
|
||||
PRINT_BEFORE(CHAIN_ID, NAME); \
|
||||
CHAIN_DURATION(CHAIN_ID) += NAME##_duration + NAME##_jitter; \
|
||||
PRINT_AFTER(CHAIN_ID, NAME);
|
||||
|
||||
#define MAKE_INPUT_NODE(CHAIN_ID, NAME, CLASS, TOPIC, PERIOD) \
|
||||
auto NAME##_duration = (PERIOD); \
|
||||
auto NAME##_jitter = PROCENTUAL_INPUT_JITTER * (PERIOD); \
|
||||
auto NAME = std::make_shared<CLASS>(std::string("input_" STR(NAME) "_node"), TOPIC, NAME##_duration, NAME##_jitter); \
|
||||
SUM_CHAIN_DURATION(CHAIN_ID, NAME)
|
||||
|
||||
#define MAKE_PROCESSING_NODE(CHAIN_ID, NAME, CLASS, INPUT_TOPIC, OUTPUT_TOPIC, DURATION) \
|
||||
auto NAME##_duration = (DURATION); \
|
||||
auto NAME##_jitter = PROCENTUAL_PROCESSING_JITTER * (DURATION); \
|
||||
auto NAME = std::make_shared<CLASS>(std::string(STR(NAME) "_node"), INPUT_TOPIC, OUTPUT_TOPIC, NAME##_duration, NAME##_jitter); \
|
||||
SUM_CHAIN_DURATION(CHAIN_ID, NAME)
|
||||
|
||||
#define MAKE_OUTPUT_NODE(CHAIN_ID, NAME, CLASS, INPUT_TOPIC, OUTPUT_TOPIC, DURATION) \
|
||||
auto NAME##_duration = (DURATION); \
|
||||
auto NAME##_jitter = PROCENTUAL_PROCESSING_JITTER * (DURATION); \
|
||||
auto NAME = std::make_shared<CLASS>(std::string("output_" STR(NAME) "_node"), INPUT_TOPIC, OUTPUT_TOPIC, NAME##_duration, NAME##_jitter); \
|
||||
SUM_CHAIN_DURATION(CHAIN_ID, NAME)
|
||||
|
||||
|
||||
int main(int argc, char **argv)
|
||||
{
|
||||
DECLARE_CHAIN(0); // Chain 1: Image2Ground Mapping
|
||||
|
||||
rclcpp::init(argc, argv);
|
||||
|
||||
// Create a priority executor with a custom memory strategy
|
||||
auto strategy = std::make_shared<priority_executor::PriorityMemoryStrategy<>>();
|
||||
rclcpp::ExecutorOptions options;
|
||||
options.memory_strategy = strategy;
|
||||
auto executor = priority_executor::MultithreadTimedExecutor(options);
|
||||
|
||||
// Must be set to post_execute can set new deadlines
|
||||
executor.prio_memory_strategy_ = strategy;
|
||||
|
||||
// chain id, node name, node class, input topic, output topic, duration in ms
|
||||
MAKE_INPUT_NODE( 0, cameraA , CameraNode , /*has no input topic*/ "/input/cameraA/raw" , 1000.0/2.0); // 60 FPS camera, fast
|
||||
MAKE_PROCESSING_NODE(0, debayerA , DebayerNode , "/input/cameraA/raw" , "/cameraA/debayered" , 18); // Moderate, image conversion
|
||||
MAKE_PROCESSING_NODE(0, radiometricA, RadiometricNode, "/cameraA/debayered" , "/cameraA/radiometric" , 22); // Heavier image math
|
||||
MAKE_PROCESSING_NODE(0, geometric , GeometricNode , "/cameraA/radiometric", "/cameraA/geometric" , 12); // Perspective transform, medium
|
||||
MAKE_OUTPUT_NODE( 0, mapping , MappingNode , "/cameraA/geometric" , "/output/cameraA/mapped", 16); // Warping/stitching, moderate
|
||||
|
||||
// the new funcitons in PriorityMemoryStrategy accept the handle of the
|
||||
// timer/subscription as the first argument
|
||||
strategy->set_executable_deadline(cameraA->timer_->get_timer_handle() , 1000, priority_executor::ExecutableType::TIMER , 0);
|
||||
// you _must_ set the timer_handle for each chain
|
||||
strategy->get_priority_settings(cameraA->timer_->get_timer_handle())->timer_handle = cameraA->timer_;
|
||||
// you _must_ mark the first executable in the chain
|
||||
strategy->set_first_in_chain(cameraA->timer_->get_timer_handle());
|
||||
|
||||
// Set the executable deadlines for each node
|
||||
strategy->set_executable_deadline(debayerA->subscription_->get_subscription_handle() , 1000, priority_executor::ExecutableType::SUBSCRIPTION, 0);
|
||||
strategy->set_executable_deadline(radiometricA->subscription_->get_subscription_handle(), 1000, priority_executor::ExecutableType::SUBSCRIPTION, 0);
|
||||
strategy->set_executable_deadline(geometric->subscription_->get_subscription_handle() , 1000, priority_executor::ExecutableType::SUBSCRIPTION, 0);
|
||||
strategy->set_executable_deadline(mapping->subscription_->get_subscription_handle() , 1000, priority_executor::ExecutableType::SUBSCRIPTION, 0);
|
||||
|
||||
// you _must_ mark the last executable in the chain (used to keep track of different instances of the same chain)
|
||||
strategy->set_last_in_chain(mapping->subscription_->get_subscription_handle());
|
||||
|
||||
executor.add_node(cameraA);
|
||||
executor.add_node(debayerA);
|
||||
executor.add_node(radiometricA);
|
||||
executor.add_node(geometric);
|
||||
executor.add_node(mapping);
|
||||
|
||||
std::cout << *strategy->get_priority_settings(cameraA->timer_->get_timer_handle())
|
||||
<< std::endl;
|
||||
std::cout << *strategy->get_priority_settings(debayerA->subscription_->get_subscription_handle())
|
||||
<< std::endl;
|
||||
std::cout << *strategy->get_priority_settings(radiometricA->subscription_->get_subscription_handle())
|
||||
<< std::endl;
|
||||
std::cout << *strategy->get_priority_settings(geometric->subscription_->get_subscription_handle())
|
||||
<< std::endl;
|
||||
std::cout << *strategy->get_priority_settings(mapping->subscription_->get_subscription_handle())
|
||||
<< std::endl;
|
||||
|
||||
executor.spin();
|
||||
|
||||
rclcpp::shutdown();
|
||||
|
||||
return 0;
|
||||
}
|
|
@ -1 +1 @@
|
|||
Subproject commit c44b7b8c3c527a8d16739349b50340a7de9b6f51
|
||||
Subproject commit 5f73198adbabb3a34985fb5e5c7f84b5e4149a86
|
Loading…
Add table
Add a link
Reference in a new issue