add boosted flag (int) to the experiment pipeline

This commit is contained in:
Niklas Halle 2025-06-16 14:52:39 +02:00
parent 5b60f1b821
commit 23c3cd5ceb
4 changed files with 87 additions and 11 deletions

View file

@ -11,14 +11,15 @@ LOCAL_PROJECT_DIR="/home/niklas/ROS-Dynamic-Executor-Experiments"
# -------------------------------------- # --------------------------------------
usage() { usage() {
echo "Usage: $0 <ros|edf|sem> <single|multi> <timed|direct> [time_in_seconds] [no_rebuild]" echo "Usage: $0 <ros|edf|sem> <single|multi> <timed|direct> [boosted] [time_in_seconds] [no_rebuild]"
echo "Example: $0 ros multi timed 10" echo "Example: $0 ros multi timed 10"
echo "This script builds the full_topology package, runs a trace, and copies the results to a remote server." echo "This script builds the full_topology package, runs a trace, and copies the results to a remote server."
echo "Parameters:" echo "Parameters:"
echo " <ros|edf|sem> : Scheduler type (ros, edf, sem)" echo " <ros|edf|sem> : Scheduler type (ros, edf, sem)"
echo " <single|multi> : Threading type (single, multi)" echo " <single|multi> : Threading type (single, multi)"
echo " <timed|direct> : Fusion type (timed, direct)" echo " <timed|direct> : Fusion type (timed, direct)"
echo " <time_in_seconds> : Length of the trace (default: 10)" echo " [boosted] : Optional flag to enable boosted mode (default: 1000, equals to disabled)"
echo " [time_in_seconds] : Length of the trace (default: 10)"
echo " [no_rebuild] : Optional flag to skip rebuilding the package (true/false, default: false)" echo " [no_rebuild] : Optional flag to skip rebuilding the package (true/false, default: false)"
exit 1 exit 1
} }
@ -30,19 +31,29 @@ fi
SCHEDULER_TYPE="$1" SCHEDULER_TYPE="$1"
THREADING_TYPE="$2" THREADING_TYPE="$2"
FUSION_TYPE="$3" FUSION_TYPE="$3"
TRACE_LENGTH="${4:-10}" BOOSTED="${4:-1000}" # Default to 1000 if not provided
NO_REBUILD="${5:-false}" TRACE_LENGTH="${5:-10}"
NO_REBUILD="${6:-false}"
# When boosted is not set to 1000, we assume it is enabled
if [[ "$BOOSTED" != "1000" ]]; then
BOOSTED_FLAG="_boosted_${BOOSTED}"
else
BOOSTED_ENABLED=""
fi
# Determine defines based on parameters # Determine defines based on parameters
CMAKE_ARGS=() CMAKE_ARGS=()
[[ "$SCHEDULER_TYPE" == "ros" ]] && CMAKE_ARGS+=("-DROS_DEFAULT_EXECUTOR=ON") [[ "$SCHEDULER_TYPE" == "ros" ]] && CMAKE_ARGS+=("-DROS_DEFAULT_EXECUTOR=ON")
[[ "$SCHEDULER_TYPE" == "edf" ]] && CMAKE_ARGS+=("-DEDF_PRIORITY_EXECUTOR=ON") [[ "$SCHEDULER_TYPE" == "edf" ]] && CMAKE_ARGS+=("-DEDF_PRIORITY_EXECUTOR=ON")
[[ "$SCHEDULER_TYPE" == "sem" ]] && CMAKE_ARGS+=("-DSEMANTIC_EXECUTOR=ON") [[ "$SCHEDULER_TYPE" == "sem" ]] && CMAKE_ARGS+=("-DEDF_PRIORITY_EXECUTOR=ON") && CMAKE_ARGS+=("-DSEMANTIC_EXECUTOR=ON")
[[ "$THREADING_TYPE" == "multi" ]] && CMAKE_ARGS+=("-DMULTI_THREADED=ON") [[ "$THREADING_TYPE" == "multi" ]] && CMAKE_ARGS+=("-DMULTI_THREADED=ON")
[[ "$FUSION_TYPE" == "timed" ]] && CMAKE_ARGS+=("-DUSE_TIMER_IN_FUSION_NODES=ON") [[ "$FUSION_TYPE" == "timed" ]] && CMAKE_ARGS+=("-DUSE_TIMER_IN_FUSION_NODES=ON")
# We always hand the boosted value down
CMAKE_ARGS+=("-DBOOSTED=$BOOSTED")
TRACE_NAME="${SCHEDULER_TYPE}_${THREADING_TYPE}_${FUSION_TYPE}_${TRACE_LENGTH}" TRACE_NAME="${SCHEDULER_TYPE}_${THREADING_TYPE}_${FUSION_TYPE}_${TRACE_LENGTH}${BOOSTED_FLAG}"
cd "$LOCAL_PROJECT_DIR" cd "$LOCAL_PROJECT_DIR"

View file

@ -50,6 +50,10 @@ if(USE_TIMER_IN_FUSION_NODES)
add_compile_definitions(USE_TIMER_IN_FUSION_NODES) add_compile_definitions(USE_TIMER_IN_FUSION_NODES)
endif() endif()
set(BOOSTED "1000" CACHE STRING "Boosted value for C++")
message(NOTICE "BOOSTED: ${BOOSTED}")
add_compile_definitions(BOOSTED=${BOOSTED})
add_compile_definitions(RCLCPP_ENABLE_TRACEPOINTS) add_compile_definitions(RCLCPP_ENABLE_TRACEPOINTS)
add_executable(${PROJECT_NAME} src/full_topology.cpp) add_executable(${PROJECT_NAME} src/full_topology.cpp)

View file

@ -87,10 +87,7 @@ int main(int argc, char **argv)
DECLARE_CHAIN(2); // Chain 3: Flight Control DECLARE_CHAIN(2); // Chain 3: Flight Control
DECLARE_CHAIN(3); // Chain 4: Data Storage/Streaming DECLARE_CHAIN(3); // Chain 4: Data Storage/Streaming
#if defined(SEMANTIC_EXECUTOR) #if defined(EDF_PRIORITY_EXECUTOR)
std::cerr << "Not yet implemented" << std::endl;
return -1;
#elif defined(EDF_PRIORITY_EXECUTOR)
// Create a priority executor with a custom memory strategy // Create a priority executor with a custom memory strategy
auto strategy = std::make_shared<priority_executor::PriorityMemoryStrategy<>>(); auto strategy = std::make_shared<priority_executor::PriorityMemoryStrategy<>>();
rclcpp::ExecutorOptions options; rclcpp::ExecutorOptions options;
@ -185,7 +182,7 @@ int main(int argc, char **argv)
// --- // ---
chain_id = 1; chain_id = 1;
chain_period = 1000; chain_period = BOOSTED;
{ {
strategy->set_executable_deadline(cameraB->timer_->get_timer_handle(), chain_period, priority_executor::ExecutableType::TIMER, chain_id); strategy->set_executable_deadline(cameraB->timer_->get_timer_handle(), chain_period, priority_executor::ExecutableType::TIMER, chain_id);
strategy->get_priority_settings(cameraB->timer_->get_timer_handle())->timer_handle = cameraB->timer_; strategy->get_priority_settings(cameraB->timer_->get_timer_handle())->timer_handle = cameraB->timer_;
@ -194,6 +191,15 @@ int main(int argc, char **argv)
strategy->set_executable_deadline(radiometricB->subscription_->get_subscription_handle(), chain_period, priority_executor::ExecutableType::SUBSCRIPTION, chain_id); strategy->set_executable_deadline(radiometricB->subscription_->get_subscription_handle(), chain_period, priority_executor::ExecutableType::SUBSCRIPTION, chain_id);
strategy->set_executable_deadline(smoke_classifier->subscription_->get_subscription_handle(), chain_period, priority_executor::ExecutableType::SUBSCRIPTION, chain_id); strategy->set_executable_deadline(smoke_classifier->subscription_->get_subscription_handle(), chain_period, priority_executor::ExecutableType::SUBSCRIPTION, chain_id);
strategy->set_last_in_chain(smoke_classifier->subscription_->get_subscription_handle()); strategy->set_last_in_chain(smoke_classifier->subscription_->get_subscription_handle());
#if defined(SEMANTIC_EXECUTOR)
smoke_classifier->chain_id_ = chain_id;
smoke_classifier->strategy_ = strategy;
smoke_classifier->executables_.reserve(4);
smoke_classifier->executables_.push_back({cameraB->timer_->get_timer_handle(), priority_executor::ExecutableType::TIMER});
smoke_classifier->executables_.push_back({debayerB->subscription_->get_subscription_handle(), priority_executor::ExecutableType::SUBSCRIPTION});
smoke_classifier->executables_.push_back({radiometricB->subscription_->get_subscription_handle(), priority_executor::ExecutableType::SUBSCRIPTION});
smoke_classifier->executables_.push_back({smoke_classifier->subscription_->get_subscription_handle(), priority_executor::ExecutableType::SUBSCRIPTION});
#endif
} }
// --- // ---

View file

@ -12,6 +12,10 @@ using namespace std::chrono_literals;
#include "rclcpp/rclcpp.hpp" #include "rclcpp/rclcpp.hpp"
#include "std_msgs/msg/string.hpp" #include "std_msgs/msg/string.hpp"
#include "priority_executor/priority_executor.hpp"
#include "priority_executor/multithread_priority_executor.hpp"
#include "priority_executor/priority_memory_strategy.hpp"
// A generic sensor node that publishes at a given rate + jitter // A generic sensor node that publishes at a given rate + jitter
#define DEFINE_SENSOR_NODE_CLASS(NAME) \ #define DEFINE_SENSOR_NODE_CLASS(NAME) \
class NAME : public rclcpp::Node \ class NAME : public rclcpp::Node \
@ -180,7 +184,58 @@ DEFINE_PROCESSING_NODE_CLASS(RadiometricNode)
DEFINE_PROCESSING_NODE_CLASS(GeometricNode) DEFINE_PROCESSING_NODE_CLASS(GeometricNode)
DEFINE_PROCESSING_NODE_CLASS(MappingNode) DEFINE_PROCESSING_NODE_CLASS(MappingNode)
#ifdef SEMANTIC_EXECUTOR
class SmokeClassifierNode : public rclcpp::Node
{
public:
SmokeClassifierNode(const std::string & name,
const std::string & input_topic,
const std::string & output_topic,
int delay_ms,
double jitter_ms = 0.0)
: Node(name), delay_ms_(delay_ms), jitter_ms_(jitter_ms)
{
subscription_ = create_subscription<std_msgs::msg::String>(
input_topic, 10,
[this, name](std_msgs::msg::String::SharedPtr msg)
{
auto now = std::chrono::steady_clock::now();
auto elapsed = std::chrono::duration_cast<std::chrono::seconds>(now - construction_time_).count();
if (elapsed > 10) {
for (auto const & [exec, type] : executables_) {
strategy_->set_executable_deadline(exec, 10, type, chain_id_);
}
}
double jitter = 0.0;
if (jitter_ms_ > 0.0) {
static thread_local std::mt19937 gen(std::random_device{}());\
std::normal_distribution<> d(0.0, jitter_ms_);
jitter = d(gen);
}
std::this_thread::sleep_for(std::chrono::milliseconds((int)(delay_ms_ + jitter)));
auto out = std_msgs::msg::String();
out.data = msg->data + " -> " + name;
publisher_->publish(out);
});
publisher_ = create_publisher<std_msgs::msg::String>(output_topic, 10);
}
rclcpp::Subscription<std_msgs::msg::String>::SharedPtr subscription_;
rclcpp::Publisher<std_msgs::msg::String>::SharedPtr publisher_;
// keep a strategy pointer and a vector of executables
std::shared_ptr<priority_executor::PriorityMemoryStrategy<> > strategy_;
std::vector<std::pair<std::shared_ptr<const void>, priority_executor::ExecutableType> > executables_;
int chain_id_ = -1;
protected:
int delay_ms_;
double jitter_ms_;
// record our construction time stamp
std::chrono::steady_clock::time_point construction_time_ = std::chrono::steady_clock::now();
};
#else
DEFINE_PROCESSING_NODE_CLASS(SmokeClassifierNode) DEFINE_PROCESSING_NODE_CLASS(SmokeClassifierNode)
#endif
DEFINE_SENSOR_NODE_CLASS(GPSNode) DEFINE_SENSOR_NODE_CLASS(GPSNode)
DEFINE_SENSOR_NODE_CLASS(IMUNode) DEFINE_SENSOR_NODE_CLASS(IMUNode)