seperate chains completely, fix simple example
This commit is contained in:
		
							parent
							
								
									7397b124b6
								
							
						
					
					
						commit
						20c7abdff6
					
				
					 4 changed files with 83 additions and 55 deletions
				
			
		|  | @ -62,4 +62,4 @@ for time_mode in $TIMER_MODES; do | |||
|             run_test_suite "edf" "$boost" "$time_mode" "$thread_mode" | ||||
|         done | ||||
|     done | ||||
| done | ||||
| done | ||||
|  |  | |||
|  | @ -33,12 +33,17 @@ | |||
| #endif | ||||
| 
 | ||||
| #ifdef EDF_PRIORITY_EXECUTOR | ||||
|   #define CHAIN_DURATION(CHAIN_ID) chain_duration_##CHAIN_ID | ||||
|   #define DECLARE_CHAIN(CHAIN_ID) double CHAIN_DURATION(CHAIN_ID) = 0.0; | ||||
|   #define SUM_CHAIN_DURATION(CHAIN_ID, NAME) \ | ||||
|     PRINT_BEFORE(CHAIN_ID, NAME); \ | ||||
|     CHAIN_DURATION(CHAIN_ID) += NAME##_duration + NAME##_jitter; \ | ||||
|     PRINT_AFTER(CHAIN_ID, NAME); | ||||
|   #ifdef DEBUG | ||||
|     #define CHAIN_DURATION(CHAIN_ID) chain_duration_##CHAIN_ID | ||||
|     #define DECLARE_CHAIN(CHAIN_ID) double CHAIN_DURATION(CHAIN_ID) = 0.0; | ||||
|     #define SUM_CHAIN_DURATION(CHAIN_ID, NAME) \ | ||||
|       PRINT_BEFORE(CHAIN_ID, NAME); \ | ||||
|       CHAIN_DURATION(CHAIN_ID) += NAME##_duration + NAME##_jitter; \ | ||||
|       PRINT_AFTER(CHAIN_ID, NAME); | ||||
|   #else | ||||
|     #define DECLARE_CHAIN(CHAIN_ID) | ||||
|     #define SUM_CHAIN_DURATION(CHAIN_ID, NAME) | ||||
|   #endif | ||||
| #else | ||||
|   #define DECLARE_CHAIN(CHAIN_ID) | ||||
|   #define SUM_CHAIN_DURATION(CHAIN_ID, NAME) | ||||
|  | @ -114,49 +119,52 @@ int main(int argc, char **argv) | |||
|   #endif | ||||
| #endif | ||||
| 
 | ||||
|   int chain_id = 0; | ||||
|   // Chain 1: Image2Ground Mapping
 | ||||
|   MAKE_INPUT_NODE(0, cameraA, CameraNode, "/input/cameraA/raw", 200); // 60 FPS camera, fast
 | ||||
|   MAKE_PROCESSING_NODE(0, debayerA, DebayerNode, "/input/cameraA/raw", "/cameraA/debayered", 18); // Moderate, image conversion
 | ||||
|   MAKE_PROCESSING_NODE(0, radiometricA, RadiometricNode, "/cameraA/debayered", "/cameraA/radiometric", 22); // Heavier image math
 | ||||
|   MAKE_PROCESSING_NODE(0, geometric, GeometricNode, "/cameraA/radiometric", "/cameraA/geometric", 12); // Perspective transform, medium
 | ||||
|   MAKE_OUTPUT_NODE(0, mapping, MappingNode, "/cameraA/geometric", "/output/cameraA/mapped", 16); // Warping/stitching, moderate
 | ||||
|   MAKE_INPUT_NODE(chain_id, cameraA, CameraNode, "/input/cameraA/raw", 200); | ||||
|   MAKE_PROCESSING_NODE(chain_id, debayerA, DebayerNode, "/input/cameraA/raw", "/cameraA/debayered", 18); | ||||
|   MAKE_PROCESSING_NODE(chain_id, radiometricA, RadiometricNode, "/cameraA/debayered", "/cameraA/radiometric", 22); | ||||
|   MAKE_PROCESSING_NODE(chain_id, geometric, GeometricNode, "/cameraA/radiometric", "/cameraA/geometric", 12); | ||||
|   MAKE_OUTPUT_NODE(chain_id, mapping, MappingNode, "/cameraA/geometric", "/output/cameraA/mapped", 16); | ||||
|    | ||||
|   // Chain 2: Semantic Scheduler Adaptation
 | ||||
|   MAKE_INPUT_NODE(1, cameraB, CameraNode, "/input/cameraB/raw", 150); // 60 FPS camera, fast
 | ||||
|   MAKE_PROCESSING_NODE(1, debayerB, DebayerNode, "/input/cameraB/raw", "/cameraB/debayered", 18); // Moderate, image conversion
 | ||||
|   MAKE_PROCESSING_NODE(1, radiometricB, RadiometricNode, "/cameraB/debayered", "/cameraB/radiometric", 22); // Heavier image math
 | ||||
|   MAKE_OUTPUT_NODE(1, smoke_classifier, SmokeClassifierNode, "/cameraB/radiometric", "/output/classifier/classification", 35); // CNN/ML classifier, can be high
 | ||||
|   chain_id = 1; | ||||
|   MAKE_INPUT_NODE(chain_id, cameraB, CameraNode, "/input/cameraB/raw", 150); | ||||
|   MAKE_PROCESSING_NODE(chain_id, debayerB, DebayerNode, "/input/cameraB/raw", "/cameraB/debayered", 18); | ||||
|   MAKE_PROCESSING_NODE(chain_id, radiometricB, RadiometricNode, "/cameraB/debayered", "/cameraB/radiometric", 22); | ||||
|   MAKE_OUTPUT_NODE(chain_id, smoke_classifier, SmokeClassifierNode, "/cameraB/radiometric", "/output/classifier/classification", 35); | ||||
| 
 | ||||
|   // Chain 3: Flight Control
 | ||||
|   chain_id = 2; | ||||
|   std::vector<std::string> fusionA_input_topics = {"/input/gpsA/fix", "/input/imuA/data", "/input/baroA/alt"}; | ||||
|   MAKE_INPUT_NODE(2, gpsA, GPSNode, "/input/gpsA/fix", 100);    // Fast, low latency sensor
 | ||||
|   MAKE_INPUT_NODE(2, imuA, IMUNode, "/input/imuA/data", 100);   // Fast, low latency sensor
 | ||||
|   MAKE_INPUT_NODE(2, baroA, BaroNode, "/input/baroA/alt", 100); // Fast, low latency sensor
 | ||||
|   MAKE_PROCESSING_NODE(2, fusionA, FusionNode, fusionA_input_topics, "/sensorsA/fused", 14); // Sensor fusion, not trivial but not heavy
 | ||||
|   MAKE_INPUT_NODE(chain_id, gpsA, GPSNode, "/input/gpsA/fix", 100); | ||||
|   MAKE_INPUT_NODE(chain_id, imuA, IMUNode, "/input/imuA/data", 100); | ||||
|   MAKE_INPUT_NODE(chain_id, baroA, BaroNode, "/input/baroA/alt", 100); | ||||
|   MAKE_PROCESSING_NODE(chain_id, fusionA, FusionNode, fusionA_input_topics, "/sensorsA/fused", 14); | ||||
|    | ||||
|   MAKE_INPUT_NODE(2, lidar, LidarNode, "/input/lidar/scan", 100); // LIDAR driver, can be fast
 | ||||
|   MAKE_INPUT_NODE(2, cmd, CommandNode, "/input/operator/commands", 100); // Operator commands, fast
 | ||||
|   MAKE_INPUT_NODE(chain_id, lidar, LidarNode, "/input/lidar/scan", 100); | ||||
|   MAKE_INPUT_NODE(chain_id, cmd, CommandNode, "/input/operator/commands", 100); | ||||
|   std::vector<std::string> mgmt_input_topics = {"/sensorsA/fused", "/input/lidar/scan", "/input/operator/commands"}; | ||||
|   MAKE_PROCESSING_NODE(2, mgmt, FlightManagementNode, mgmt_input_topics, "/flight/plan", 18); // Path planning, moderate
 | ||||
|   //MAKE_PROCESSING_NODE(2, mgmt, FlightManagementNodeS, "/input/operator/commands", "/flight/plan", 18); // Path planning, moderate
 | ||||
|   MAKE_OUTPUT_NODE(2, control, ControlNode, "/flight/plan", "/output/flight/cmd", 12); // PID controller, fast but non-zero
 | ||||
|   MAKE_PROCESSING_NODE(chain_id, mgmt, FlightManagementNode, mgmt_input_topics, "/flight/plan", 18); | ||||
|   //MAKE_PROCESSING_NODE(chain_id, mgmt, FlightManagementNodeS, "/input/operator/commands", "/flight/plan", 18);
 | ||||
|   MAKE_OUTPUT_NODE(chain_id, control, ControlNode, "/flight/plan", "/output/flight/cmd", 12); | ||||
| 
 | ||||
|   // Chain 4: Data Storage/Streaming
 | ||||
|   chain_id = 3; | ||||
|   std::vector<std::string> fusionB_input_topics = {"/input/gpsB/fix", "/input/imuB/data", "/input/baroB/alt"}; | ||||
|   MAKE_INPUT_NODE(3, gpsB, GPSNode, "/input/gpsB/fix", 100);    // Fast, low latency sensor
 | ||||
|   MAKE_INPUT_NODE(3, imuB, IMUNode, "/input/imuB/data", 100);   // Fast, low latency sensor
 | ||||
|   MAKE_INPUT_NODE(3, baroB, BaroNode, "/input/baroB/alt", 100); // Fast, low latency sensor
 | ||||
|   MAKE_PROCESSING_NODE(3, fusionB, FusionNode, fusionB_input_topics, "/sensorsB/fused", 14); // Sensor fusion, not trivial but not heavy
 | ||||
|   //MAKE_PROCESSING_NODE(3, fusionB, FusionNodeS, "/input/baroB/alt", "/sensorsB/fused", 14); // Sensor fusion, not trivial but not heavy
 | ||||
|   MAKE_INPUT_NODE(chain_id, gpsB, GPSNode, "/input/gpsB/fix", 100);    // Fast, low latency sensor
 | ||||
|   MAKE_INPUT_NODE(chain_id, imuB, IMUNode, "/input/imuB/data", 100);   // Fast, low latency sensor
 | ||||
|   MAKE_INPUT_NODE(chain_id, baroB, BaroNode, "/input/baroB/alt", 100); // Fast, low latency sensor
 | ||||
|   MAKE_PROCESSING_NODE(chain_id, fusionB, FusionNode, fusionB_input_topics, "/sensorsB/fused", 14); | ||||
|   //MAKE_PROCESSING_NODE(chain_id, fusionB, FusionNodeS, "/input/baroB/alt", "/sensorsB/fused", 14);
 | ||||
| 
 | ||||
|   std::vector<std::string> telemetry_input_topics = {"/sensorsB/fused", "/output/cameraA/mapped"}; | ||||
|   MAKE_PROCESSING_NODE(3, telemetry, TelemetryNode, telemetry_input_topics, "/telemetry/data", 6); // Quick serialization, low latency
 | ||||
|   //MAKE_PROCESSING_NODE(3, telemetry, TelemetryNodeS, "/sensorsB/fused", "/telemetry/data", 6); // Quick serialization, low latency
 | ||||
|   MAKE_OUTPUT_NODE(3, radio, RadioNode, "/telemetry/data", "/output/telemetry/radio", 3); // Output, almost instant
 | ||||
|   //std::vector<std::string> telemetry_input_topics = {"/sensorsB/fused", "/output/cameraA/mapped"};
 | ||||
|   //MAKE_PROCESSING_NODE(chain_id, telemetry, TelemetryNode, telemetry_input_topics, "/telemetry/data", 6);
 | ||||
|   MAKE_PROCESSING_NODE(chain_id, telemetryS, TelemetryNodeS, "/sensorsB/fused", "/telemetry/data", 6); | ||||
|   MAKE_OUTPUT_NODE(chain_id, radio, RadioNode, "/telemetry/data", "/output/telemetry/radio", 3); | ||||
| 
 | ||||
| #if defined(EDF_PRIORITY_EXECUTOR) | ||||
|   int chain_period; | ||||
|   int chain_id; | ||||
| 
 | ||||
|   // ---
 | ||||
|   chain_id = 0; | ||||
|  | @ -179,7 +187,7 @@ int main(int argc, char **argv) | |||
|     // you _must_ mark the last executable in the chain (used to keep track of different instances of the same chain)
 | ||||
|     strategy->set_last_in_chain(mapping->subscription_->get_subscription_handle()); | ||||
|     strategy->get_priority_settings(mapping->subscription_->get_subscription_handle())->timer_handle = cameraA->timer_; | ||||
| } | ||||
|   } | ||||
| 
 | ||||
|   // ---
 | ||||
|   chain_id = 1; | ||||
|  | @ -233,7 +241,8 @@ int main(int argc, char **argv) | |||
|     strategy->set_executable_deadline(imuB->timer_->get_timer_handle(), chain_period, priority_executor::ExecutableType::TIMER, chain_id, "imuB"); | ||||
|     strategy->set_executable_deadline(gpsB->timer_->get_timer_handle(), chain_period, priority_executor::ExecutableType::TIMER, chain_id, "gpsB"); | ||||
|     REGISTER_MULTI_PROCESSING_NODE(fusionB, chain_period, chain_id); | ||||
|     REGISTER_MULTI_PROCESSING_NODE(telemetry, chain_period, chain_id); | ||||
|     //REGISTER_MULTI_PROCESSING_NODE(telemetry, chain_period, chain_id);
 | ||||
|     strategy->set_executable_deadline(telemetryS->subscription_->get_subscription_handle(), chain_period, priority_executor::ExecutableType::SUBSCRIPTION, chain_id, "telemetryS"); | ||||
|     strategy->set_executable_deadline(radio->subscription_->get_subscription_handle(), chain_period, priority_executor::ExecutableType::SUBSCRIPTION, chain_id, "radio"); | ||||
|     strategy->set_last_in_chain(radio->subscription_->get_subscription_handle()); | ||||
|     strategy->get_priority_settings(radio->subscription_->get_subscription_handle())->timer_handle = baroB->timer_; | ||||
|  | @ -261,7 +270,7 @@ int main(int argc, char **argv) | |||
|   executor.add_node(baroB); | ||||
|   executor.add_node(fusionB); | ||||
|    | ||||
|   executor.add_node(telemetry); | ||||
|   executor.add_node(telemetryS); | ||||
|   executor.add_node(radio); | ||||
| 
 | ||||
|   executor.add_node(lidar); | ||||
|  |  | |||
|  | @ -6,13 +6,12 @@ | |||
| #include <string> | ||||
| #include <thread> | ||||
| #include <vector> | ||||
| // the order here matters
 | ||||
| 
 | ||||
| #include "tracetools/utils.hpp" | ||||
| 
 | ||||
| #include "rclcpp/rclcpp.hpp" | ||||
| #include "std_msgs/msg/string.hpp" | ||||
| 
 | ||||
| #include "config.hpp" | ||||
| #include "nodes.hpp" | ||||
| 
 | ||||
| #include "priority_executor/priority_executor.hpp" | ||||
|  | @ -35,12 +34,17 @@ | |||
|   #define PRINT_AFTER(CHAIN_ID, NAME) | ||||
| #endif | ||||
| 
 | ||||
| #define CHAIN_DURATION(CHAIN_ID) chain_duration_##CHAIN_ID | ||||
| #define DECLARE_CHAIN(CHAIN_ID) double CHAIN_DURATION(CHAIN_ID) = 0.0; | ||||
| #define SUM_CHAIN_DURATION(CHAIN_ID, NAME) \ | ||||
| PRINT_BEFORE(CHAIN_ID, NAME); \ | ||||
| CHAIN_DURATION(CHAIN_ID) += NAME##_duration + NAME##_jitter; \ | ||||
| PRINT_AFTER(CHAIN_ID, NAME); | ||||
| #ifdef EDF_PRIORITY_EXECUTOR | ||||
|   #define CHAIN_DURATION(CHAIN_ID) chain_duration_##CHAIN_ID | ||||
|   #define DECLARE_CHAIN(CHAIN_ID) double CHAIN_DURATION(CHAIN_ID) = 0.0; | ||||
|   #define SUM_CHAIN_DURATION(CHAIN_ID, NAME) \ | ||||
|     PRINT_BEFORE(CHAIN_ID, NAME); \ | ||||
|     CHAIN_DURATION(CHAIN_ID) += NAME##_duration + NAME##_jitter; \ | ||||
|     PRINT_AFTER(CHAIN_ID, NAME); | ||||
| #else | ||||
|   #define DECLARE_CHAIN(CHAIN_ID) | ||||
|   #define SUM_CHAIN_DURATION(CHAIN_ID, NAME) | ||||
| #endif | ||||
| 
 | ||||
| #define MAKE_INPUT_NODE(CHAIN_ID, NAME, CLASS, TOPIC, PERIOD) \ | ||||
|   auto NAME##_duration = (PERIOD); \ | ||||
|  | @ -60,6 +64,18 @@ PRINT_AFTER(CHAIN_ID, NAME); | |||
|   auto NAME = std::make_shared<CLASS>(std::string("output_" STR(NAME) "_node"), INPUT_TOPIC, OUTPUT_TOPIC, NAME##_duration, NAME##_jitter); \ | ||||
|   SUM_CHAIN_DURATION(CHAIN_ID, NAME) | ||||
| 
 | ||||
| #ifdef USE_TIMER_IN_FUSION_NODES | ||||
|   #define REGISTER_MULTI_PROCESSING_NODE(NAME, CHAIN_PERIOD, CHAIN_ID) \ | ||||
|   for (auto const &sub : NAME->subscriptions_) { \ | ||||
|     strategy->set_executable_deadline(sub->get_subscription_handle(), CHAIN_PERIOD, priority_executor::ExecutableType::SUBSCRIPTION, CHAIN_ID, STR(NAME)); \ | ||||
|   } \ | ||||
|   strategy->set_executable_deadline(NAME->timer_->get_timer_handle(), CHAIN_PERIOD, priority_executor::ExecutableType::TIMER, CHAIN_ID, STR(NAME)); | ||||
| #else | ||||
|   #define REGISTER_MULTI_PROCESSING_NODE(NAME, CHAIN_PERIOD, CHAIN_ID) \ | ||||
|   for (auto const &sub : NAME->subscriptions_) { \ | ||||
|     strategy->set_executable_deadline(sub->get_subscription_handle(), CHAIN_PERIOD, priority_executor::ExecutableType::SUBSCRIPTION, CHAIN_ID, STR(NAME)); \ | ||||
|   } | ||||
| #endif | ||||
| 
 | ||||
| int main(int argc, char **argv) | ||||
| { | ||||
|  | @ -76,8 +92,10 @@ int main(int argc, char **argv) | |||
|   // Must be set to post_execute can set new deadlines
 | ||||
|   executor.prio_memory_strategy_ = strategy; | ||||
| 
 | ||||
|   double fps = 60.0; // Frames per second for the camera
 | ||||
|   double period = 1000.0 / fps; // Period in milliseconds
 | ||||
|   //            chain id,    node name,      node class,            input topic,             output topic, duration in ms
 | ||||
|   MAKE_INPUT_NODE(     0, cameraA     , CameraNode     , /*has no input topic*/  "/input/cameraA/raw"    , 1000.0/2.0); // 60 FPS camera, fast
 | ||||
|   MAKE_INPUT_NODE(     0, cameraA     , CameraNode     , /*has no input topic*/  "/input/cameraA/raw"    , period); // 60 FPS camera, fast
 | ||||
|   MAKE_PROCESSING_NODE(0, debayerA    , DebayerNode    , "/input/cameraA/raw"  , "/cameraA/debayered"    , 18); // Moderate, image conversion
 | ||||
|   MAKE_PROCESSING_NODE(0, radiometricA, RadiometricNode, "/cameraA/debayered"  , "/cameraA/radiometric"  , 22); // Heavier image math
 | ||||
|   MAKE_PROCESSING_NODE(0, geometric   , GeometricNode  , "/cameraA/radiometric", "/cameraA/geometric"    , 12); // Perspective transform, medium
 | ||||
|  | @ -85,21 +103,22 @@ int main(int argc, char **argv) | |||
| 
 | ||||
|   // the new funcitons in PriorityMemoryStrategy accept the handle of the
 | ||||
|   // timer/subscription as the first argument
 | ||||
|   strategy->set_executable_deadline(cameraA->timer_->get_timer_handle()                   , 1000, priority_executor::ExecutableType::TIMER       , 0); | ||||
|   // you _must_ set the timer_handle for each chain
 | ||||
|   strategy->get_priority_settings(cameraA->timer_->get_timer_handle())->timer_handle = cameraA->timer_; | ||||
|   strategy->set_executable_deadline(cameraA->timer_->get_timer_handle()                   , period, priority_executor::ExecutableType::TIMER       , 0); | ||||
|   // you _must_ mark the first executable in the chain
 | ||||
|   strategy->set_first_in_chain(cameraA->timer_->get_timer_handle()); | ||||
| 
 | ||||
|   // Set the executable deadlines for each node
 | ||||
|   strategy->set_executable_deadline(debayerA->subscription_->get_subscription_handle()    , 1000, priority_executor::ExecutableType::SUBSCRIPTION, 0); | ||||
|   strategy->set_executable_deadline(radiometricA->subscription_->get_subscription_handle(), 1000, priority_executor::ExecutableType::SUBSCRIPTION, 0); | ||||
|   strategy->set_executable_deadline(geometric->subscription_->get_subscription_handle()   , 1000, priority_executor::ExecutableType::SUBSCRIPTION, 0); | ||||
|   strategy->set_executable_deadline(mapping->subscription_->get_subscription_handle()     , 1000, priority_executor::ExecutableType::SUBSCRIPTION, 0); | ||||
|   strategy->set_executable_deadline(debayerA->subscription_->get_subscription_handle()    , period, priority_executor::ExecutableType::SUBSCRIPTION, 0); | ||||
|   strategy->set_executable_deadline(radiometricA->subscription_->get_subscription_handle(), period, priority_executor::ExecutableType::SUBSCRIPTION, 0); | ||||
|   strategy->set_executable_deadline(geometric->subscription_->get_subscription_handle()   , period, priority_executor::ExecutableType::SUBSCRIPTION, 0); | ||||
|   strategy->set_executable_deadline(mapping->subscription_->get_subscription_handle()     , period, priority_executor::ExecutableType::SUBSCRIPTION, 0); | ||||
| 
 | ||||
|   // you _must_ mark the last executable in the chain (used to keep track of different instances of the same chain)
 | ||||
|   strategy->set_last_in_chain(mapping->subscription_->get_subscription_handle()); | ||||
| 
 | ||||
|   // you _must_ set the timer_handle of the last callback in the chain to the timer of the first node
 | ||||
|   strategy->get_priority_settings(mapping->subscription_->get_subscription_handle())->timer_handle = cameraA->timer_; | ||||
| 
 | ||||
|   executor.add_node(cameraA); | ||||
|   executor.add_node(debayerA); | ||||
|   executor.add_node(radiometricA); | ||||
|  |  | |||
|  | @ -1 +1 @@ | |||
| Subproject commit 5f73198adbabb3a34985fb5e5c7f84b5e4149a86 | ||||
| Subproject commit f62e64cb565b8c6f422c8033691c8099e075fe0f | ||||
		Loading…
	
	Add table
		Add a link
		
	
		Reference in a new issue