diff --git a/src/priority_executor/CMakeLists.txt b/src/priority_executor/CMakeLists.txt index e6ec3da..4ce2108 100755 --- a/src/priority_executor/CMakeLists.txt +++ b/src/priority_executor/CMakeLists.txt @@ -19,16 +19,11 @@ find_package(rmw REQUIRED) find_package(std_msgs REQUIRED) find_package(std_srvs REQUIRED) find_package(simple_timer REQUIRED) -find_package(nlohmann_json REQUIRED) # Library targets add_library(priority_executor src/priority_executor.cpp src/priority_memory_strategy.cpp - src/performance_monitor.cpp - src/default_executor.cpp - src/usage_example.cpp - src/multithread_priority_executor.cpp ) target_include_directories(priority_executor PUBLIC $ @@ -39,7 +34,35 @@ ament_target_dependencies(priority_executor rclcpp rcl simple_timer - nlohmann_json +) + +add_library(multithread_priority_executor + src/multithread_priority_executor.cpp + src/priority_memory_strategy.cpp +) +target_include_directories(multithread_priority_executor PUBLIC + $ + $ +) +ament_target_dependencies(multithread_priority_executor + rmw + rclcpp + rcl + simple_timer +) + +add_library(default_executor + src/default_executor.cpp +) +target_include_directories(default_executor PUBLIC + $ + $ +) +ament_target_dependencies(default_executor + rmw + rclcpp + rcl + simple_timer ) # Example executable @@ -52,6 +75,8 @@ target_include_directories(usage_example PUBLIC ) target_link_libraries(usage_example priority_executor + multithread_priority_executor + default_executor ) ament_target_dependencies(usage_example rclcpp @@ -73,7 +98,7 @@ install( ) install( - TARGETS priority_executor + TARGETS priority_executor multithread_priority_executor default_executor EXPORT export_${PROJECT_NAME} ARCHIVE DESTINATION lib LIBRARY DESTINATION lib @@ -89,5 +114,5 @@ install( # Export and package configuration ament_export_include_directories(include) ament_export_targets(export_${PROJECT_NAME} HAS_LIBRARY_TARGET) -ament_export_dependencies(rclcpp rcl rmw simple_timer nlohmann_json) +ament_export_dependencies(rclcpp rcl rmw simple_timer) ament_package() diff --git a/src/priority_executor/include/priority_executor/default_executor.hpp b/src/priority_executor/include/priority_executor/default_executor.hpp index 090687f..bbe0222 100755 --- a/src/priority_executor/include/priority_executor/default_executor.hpp +++ b/src/priority_executor/include/priority_executor/default_executor.hpp @@ -17,10 +17,10 @@ #include -#include -#include #include #include +#include +#include #include #include "rclcpp/executor.hpp" @@ -40,20 +40,19 @@ class ROSDefaultMultithreadedExecutor : public rclcpp::Executor, public RTISTime public: RCLCPP_SMART_PTR_DEFINITIONS(ROSDefaultMultithreadedExecutor) - RCLCPP_PUBLIC explicit ROSDefaultMultithreadedExecutor( - rclcpp::ExecutorOptions const &options = rclcpp::ExecutorOptions(), + rclcpp::ExecutorOptions const& options = rclcpp::ExecutorOptions(), int number_of_threads = 2, std::chrono::nanoseconds next_exec_timeout = std::chrono::nanoseconds(-1)); - RCLCPP_PUBLIC size_t get_number_of_threads(); - RCLCPP_PUBLIC void spin() override; + size_t get_number_of_threads(); + void spin() override; bool get_next_executable(rclcpp::AnyExecutable& any_executable, std::chrono::nanoseconds timeout = std::chrono::nanoseconds(-1)); protected: - RCLCPP_PUBLIC void run(size_t thread_number); + void run(size_t thread_number); private: RCLCPP_DISABLE_COPY(ROSDefaultMultithreadedExecutor) @@ -77,12 +76,10 @@ public: RCLCPP_SMART_PTR_DEFINITIONS(ROSDefaultExecutor) /// Default constructor. See the default constructor for Executor. - RCLCPP_PUBLIC explicit ROSDefaultExecutor( - rclcpp::ExecutorOptions const &options = rclcpp::ExecutorOptions()); + rclcpp::ExecutorOptions const& options = rclcpp::ExecutorOptions()); /// Default destructor. - RCLCPP_PUBLIC virtual ~ROSDefaultExecutor(); /// Single-threaded implementation of spin. @@ -93,13 +90,11 @@ public: * if the associated context is configured to shutdown on SIGINT. * \throws std::runtime_error when spin() called while already spinning */ - RCLCPP_PUBLIC void spin() override; bool get_next_executable(rclcpp::AnyExecutable& any_executable, std::chrono::nanoseconds timeout = std::chrono::nanoseconds(-1)); - RCLCPP_PUBLIC void wait_for_work(std::chrono::nanoseconds timeout = std::chrono::nanoseconds(-1)); private: diff --git a/src/priority_executor/include/priority_executor/multithread_priority_executor.hpp b/src/priority_executor/include/priority_executor/multithread_priority_executor.hpp index a722b53..9a38263 100755 --- a/src/priority_executor/include/priority_executor/multithread_priority_executor.hpp +++ b/src/priority_executor/include/priority_executor/multithread_priority_executor.hpp @@ -1,16 +1,15 @@ #ifndef PRIORITY_EXECUTOR__MULTITHREAD_PRIORITY_EXECUTOR_HPP_ #define PRIORITY_EXECUTOR__MULTITHREAD_PRIORITY_EXECUTOR_HPP_ -#include -#include #include #include +#include +#include #include #include #include "priority_executor/default_executor.hpp" #include "priority_executor/priority_executor.hpp" - #include "rclcpp/executor.hpp" #include "rclcpp/macros.hpp" #include "rclcpp/memory_strategies.hpp" @@ -30,20 +29,18 @@ public: int number_of_threads = 2, std::chrono::nanoseconds next_exec_timeout = std::chrono::nanoseconds(-1)); - RCLCPP_PUBLIC - virtual ~MultithreadTimedExecutor() = default; + RCLCPP_PUBLIC virtual ~MultithreadTimedExecutor() = default; - RCLCPP_PUBLIC size_t get_number_of_threads(); RCLCPP_PUBLIC void spin() override; protected: - RCLCPP_PUBLIC void run(size_t thread_number); + RCLCPP_PUBLIC void run(size_t _thread_number); private: - RCLCPP_DISABLE_COPY(MultithreadTimedExecutor) std::set scheduled_timers_; - size_t number_of_threads_; - std::chrono::nanoseconds next_exec_timeout_{std::chrono::nanoseconds(-1)}; + int number_of_threads_; + std::chrono::nanoseconds next_exec_timeout_ = std::chrono::nanoseconds(-1); + node_time_logger logger_; static std::unordered_map> diff --git a/src/priority_executor/include/priority_executor/performance_monitor.hpp b/src/priority_executor/include/priority_executor/performance_monitor.hpp deleted file mode 100644 index beece1f..0000000 --- a/src/priority_executor/include/priority_executor/performance_monitor.hpp +++ /dev/null @@ -1,237 +0,0 @@ -#ifndef PERFORMANCE_MONITOR_HPP_ -#define PERFORMANCE_MONITOR_HPP_ - -#include -#include -#include -#include -#include -#include -#include -#include -#include "rclcpp/rclcpp.hpp" -#include "simple_timer/rt-sched.hpp" - -namespace priority_executor { - -enum class PerformanceEventType { - CALLBACK_READY, - CALLBACK_START, - CALLBACK_END, - DEADLINE_MISSED, - DEADLINE_MET, - CHAIN_START, - CHAIN_END -}; - -struct PerformanceEvent { - uint64_t timestamp; // High-resolution timestamp (microseconds) - PerformanceEventType type; // Type of event - std::string node_name; // Name of the node - std::string callback_name; // Name of the callback - int chain_id{-1}; // Chain identifier - bool is_first_in_chain{false}; // First callback in chain - bool is_last_in_chain{false}; // Last callback in chain - uint64_t deadline{0}; // Deadline (if applicable) - uint64_t processing_time{0}; // Time spent executing (if applicable) - int executor_id{0}; // Executor identifier - std::string additional_data; // JSON string for extensible data - - // Helper method to extract callback type from additional_data - std::string get_callback_type() const { - if (additional_data.empty()) return "unknown"; - - try { - nlohmann::json json = nlohmann::json::parse(additional_data); - if (json.contains("callback_type")) { - return json["callback_type"].get(); - } - } catch (...) {} - - return "unknown"; - } - - // Helper method to extract thread info from additional_data - int get_thread_id() const { - if (additional_data.empty()) return -1; - - try { - nlohmann::json json = nlohmann::json::parse(additional_data); - if (json.contains("thread_info") && json["thread_info"].is_object() && - json["thread_info"].contains("thread_id")) { - return json["thread_info"]["thread_id"].get(); - } - // Direct thread_id if present - if (json.contains("thread_id")) { - return json["thread_id"].get(); - } - } catch (...) {} - - return -1; - } - - // Helper method to create a descriptive identifier for this callback - std::string get_descriptive_id() const { - std::stringstream ss; - ss << node_name; - - if (!callback_name.empty()) { - ss << "/" << callback_name; - } - - if (chain_id >= 0) { - ss << " (chain:" << chain_id; - if (is_first_in_chain) ss << " start"; - if (is_last_in_chain) ss << " end"; - ss << ")"; - } - - return ss.str(); - } - - // Convert additional_data to JSON object - nlohmann::json get_additional_data_json() const { - try { - if (!additional_data.empty()) { - return nlohmann::json::parse(additional_data); - } - } catch (...) {} - - return nlohmann::json::object(); - } -}; - -class PerformanceMonitor { -public: - static PerformanceMonitor& getInstance(); - - void recordEvent(const PerformanceEvent& event); - void setBufferSize(size_t size); - void setAutoDumpThreshold(size_t threshold); - void setDumpPath(const std::string& path); - bool dumpToFile(const std::string& filename); - void enable(bool enabled = true); - void clear(); - - // Helper method to set monitoring options in one call - void setMonitoringOptions(size_t buffer_size, size_t auto_dump_threshold, - const std::string& dump_path) { - setBufferSize(buffer_size); - setAutoDumpThreshold(auto_dump_threshold); - setDumpPath(dump_path); - } - - // Get all events for a specific callback - std::vector getEventsForCallback(const std::string& node_name, - const std::string& callback_name) const { - std::vector result; - std::lock_guard lock(event_mutex_); - - for (const auto& event : event_buffer_) { - if (event.node_name == node_name && event.callback_name == callback_name) { - result.push_back(event); - } - } - - return result; - } - - // Get all events for a specific chain - std::vector getEventsForChain(int chain_id) const { - std::vector result; - std::lock_guard lock(event_mutex_); - - for (const auto& event : event_buffer_) { - if (event.chain_id == chain_id) { - result.push_back(event); - } - } - - return result; - } - - // Get all events by type - std::vector getEventsByType(PerformanceEventType type) const { - std::vector result; - std::lock_guard lock(event_mutex_); - - for (const auto& event : event_buffer_) { - if (event.type == type) { - result.push_back(event); - } - } - - return result; - } - - // Get a formatted report of callbacks and their execution times - std::string getCallbackExecutionReport() const { - std::lock_guard lock(event_mutex_); - std::stringstream ss; - - ss << "Callback Execution Report:\n"; - ss << "=========================\n\n"; - - // Maps to store total execution time and count per callback - std::unordered_map total_time; - std::unordered_map call_count; - std::unordered_map max_time; - std::unordered_map min_time; - - // Process all CALLBACK_END events (they contain processing_time) - for (const auto& event : event_buffer_) { - if (event.type == PerformanceEventType::CALLBACK_END && event.processing_time > 0) { - std::string callback_id = event.get_descriptive_id(); - total_time[callback_id] += event.processing_time; - call_count[callback_id]++; - max_time[callback_id] = std::max(max_time[callback_id], event.processing_time); - if (min_time[callback_id] == 0 || event.processing_time < min_time[callback_id]) { - min_time[callback_id] = event.processing_time; - } - } - } - - // Print results sorted by callback id - std::vector callback_ids; - for (const auto& entry : call_count) { - callback_ids.push_back(entry.first); - } - - std::sort(callback_ids.begin(), callback_ids.end()); - - for (const auto& id : callback_ids) { - double avg_time = static_cast(total_time[id]) / call_count[id]; - ss << id << "\n"; - ss << " Calls: " << call_count[id] << "\n"; - ss << " Avg time: " << avg_time << " µs\n"; - ss << " Min time: " << min_time[id] << " µs\n"; - ss << " Max time: " << max_time[id] << " µs\n"; - ss << " Total time: " << total_time[id] << " µs\n\n"; - } - - return ss.str(); - } - -private: - PerformanceMonitor(); - ~PerformanceMonitor() = default; - - PerformanceMonitor(const PerformanceMonitor&) = delete; - PerformanceMonitor& operator=(const PerformanceMonitor&) = delete; - PerformanceMonitor(PerformanceMonitor&&) = delete; - PerformanceMonitor& operator=(PerformanceMonitor&&) = delete; - - mutable std::mutex event_mutex_; - std::vector event_buffer_; - size_t buffer_size_{10000}; - size_t auto_dump_threshold_{0}; - std::string dump_path_{"performance_logs"}; - std::atomic enabled_{true}; - - std::string eventsToJson() const; - void checkAndAutoDump(); -}; - -} // namespace priority_executor - -#endif // PERFORMANCE_MONITOR_HPP_ \ No newline at end of file diff --git a/src/priority_executor/include/priority_executor/priority_executor.hpp b/src/priority_executor/include/priority_executor/priority_executor.hpp index 62d7248..e485ced 100755 --- a/src/priority_executor/include/priority_executor/priority_executor.hpp +++ b/src/priority_executor/include/priority_executor/priority_executor.hpp @@ -20,15 +20,12 @@ #include #include #include -#include #include "rclcpp/executor.hpp" #include "rclcpp/macros.hpp" #include "rclcpp/visibility_control.hpp" - #include "priority_executor/priority_memory_strategy.hpp" #include "priority_executor/default_executor.hpp" -#include "priority_executor/performance_monitor.hpp" namespace priority_executor { @@ -41,13 +38,11 @@ public: RCLCPP_SMART_PTR_DEFINITIONS(TimedExecutor) /// Default constructor. See the default constructor for Executor. - RCLCPP_PUBLIC explicit TimedExecutor( - rclcpp::ExecutorOptions const &options = rclcpp::ExecutorOptions(), + rclcpp::ExecutorOptions const& options = rclcpp::ExecutorOptions(), std::string name = "unnamed executor"); /// Default destructor. - RCLCPP_PUBLIC virtual ~TimedExecutor(); /// Single-threaded implementation of spin. @@ -58,50 +53,12 @@ public: * if the associated context is configured to shutdown on SIGINT. * \throws std::runtime_error when spin() called while already spinning */ - RCLCPP_PUBLIC void spin() override; std::string name; void set_use_priorities(bool use_prio); std::shared_ptr> prio_memory_strategy_{nullptr}; - // Performance monitoring control - void enableMonitoring(bool enable = true) { - PerformanceMonitor::getInstance().enable(enable); - } - - void setMonitoringOptions( - size_t buffer_size, - size_t auto_dump_threshold, - const std::string& dump_path) { - auto& monitor = PerformanceMonitor::getInstance(); - monitor.setBufferSize(buffer_size); - monitor.setAutoDumpThreshold(auto_dump_threshold); - monitor.setDumpPath(dump_path); - } - - // Get a formatted report of all callback execution times - std::string getCallbackExecutionReport() const { - auto& monitor = PerformanceMonitor::getInstance(); - return monitor.getCallbackExecutionReport(); - } - - // Manually trigger a log dump with a specific filename - bool dumpMonitoringData(const std::string& filename) { - auto& monitor = PerformanceMonitor::getInstance(); - return monitor.dumpToFile(filename); - } - - // Get executor ID for identification in logs - int getExecutorId() const { - return executor_id_; - } - - // Get executor name for identification in logs - const std::string& getExecutorName() const { - return name; - } - protected: bool get_next_executable(rclcpp::AnyExecutable& any_executable, std::chrono::nanoseconds timeout = std::chrono::nanoseconds(-1)); @@ -111,14 +68,8 @@ private: void wait_for_work(std::chrono::nanoseconds timeout); bool get_next_ready_executable(rclcpp::AnyExecutable& any_executable); - void recordExecutionEvent(const rclcpp::AnyExecutable& any_exec, - PerformanceEventType event_type, - uint64_t timestamp, - uint64_t processing_time = 0); bool use_priorities{true}; - static std::atomic next_executor_id_; - int executor_id_; }; } // namespace priority_executor diff --git a/src/priority_executor/include/priority_executor/priority_memory_strategy.hpp b/src/priority_executor/include/priority_executor/priority_memory_strategy.hpp index 6033862..d71bafb 100755 --- a/src/priority_executor/include/priority_executor/priority_memory_strategy.hpp +++ b/src/priority_executor/include/priority_executor/priority_memory_strategy.hpp @@ -1,17 +1,3 @@ -// Copyright 2015 Open Source Robotics Foundation, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - #ifndef PRIORITY_EXECUTOR__PRIORITY_MEMORY_STRATEGY_HPP_ #define PRIORITY_EXECUTOR__PRIORITY_MEMORY_STRATEGY_HPP_ @@ -24,19 +10,10 @@ #include #include "rcl/allocator.h" - #include "rclcpp/allocator/allocator_common.hpp" #include "rclcpp/memory_strategy.hpp" - #include "simple_timer/rt-sched.hpp" -/// Delegate for handling memory allocations while the Executor is executing. -/** - * By default, the memory strategy dynamically allocates memory for structures - * that come in from the rmw implementation after the executor waits for work, - * based on the number of entities that come through. - */ - namespace priority_executor { enum class ExecutableType { @@ -47,7 +24,7 @@ enum class ExecutableType { WAITABLE }; -std::ostream& operator<<(std::ostream& os, const ExecutableType& obj); +std::ostream& operator << (std::ostream& os, const ExecutableType& obj); enum class ExecutableScheduleType { DEADLINE = 0, @@ -56,15 +33,15 @@ enum class ExecutableScheduleType { DEFAULT, // not used here }; -std::ostream& operator<<(std::ostream& os, const ExecutableScheduleType& obj); +std::ostream& operator << (std::ostream& os, const ExecutableScheduleType& obj); class PriorityExecutable { public: static size_t num_executables; explicit PriorityExecutable( - std::shared_ptr h, - int p, + std::shared_ptr h, + int p, ExecutableType t, ExecutableScheduleType sched_type = ExecutableScheduleType::CHAIN_INDEPENDENT_PRIORITY); @@ -73,8 +50,8 @@ public: void dont_run(); void allow_run(); void increment_counter(); - bool operator==(PriorityExecutable const &other) const; - friend std::ostream& operator<<(std::ostream& os, PriorityExecutable const &pe); + bool operator==(PriorityExecutable const& other) const; + friend std::ostream& operator<<(std::ostream& os, PriorityExecutable const& pe); std::shared_ptr handle; ExecutableType type; @@ -97,7 +74,7 @@ public: class PriorityExecutableComparator { public: - bool operator()(PriorityExecutable const *p1, PriorityExecutable const *p2) const; + bool operator()(PriorityExecutable const* p1, PriorityExecutable const* p2) const; }; template > diff --git a/src/priority_executor/package.xml b/src/priority_executor/package.xml index faaedb5..ee5a022 100755 --- a/src/priority_executor/package.xml +++ b/src/priority_executor/package.xml @@ -6,7 +6,7 @@ ROS 2 package implementing priority-based executors for real-time task scheduling - kurt + ROS-Dynamic-Executor Team Apache License 2.0 ament_cmake diff --git a/src/priority_executor/src/default_executor.cpp b/src/priority_executor/src/default_executor.cpp index 5210e23..8e3962a 100755 --- a/src/priority_executor/src/default_executor.cpp +++ b/src/priority_executor/src/default_executor.cpp @@ -14,12 +14,12 @@ #include "priority_executor/default_executor.hpp" -#include #include -#include -#include -#include #include +#include +#include +#include +#include #include #include "rcpputils/scope_exit.hpp" @@ -33,7 +33,7 @@ std::unordered_map #include @@ -8,7 +7,6 @@ #include #include #include -#include #include "rcpputils/scope_exit.hpp" #include "rclcpp/any_executable.hpp" @@ -22,7 +20,7 @@ std::unordered_mapcontext_) && spinning.load()) { rclcpp::AnyExecutable any_exec; { @@ -108,160 +83,8 @@ void MultithreadTimedExecutor::run(size_t _thread_number) { } } - // Record execution start with thread info - auto start = std::chrono::steady_clock::now(); - uint64_t start_time = std::chrono::duration_cast( - start.time_since_epoch()).count(); - - PerformanceEvent start_event; - start_event.timestamp = start_time; - start_event.type = PerformanceEventType::CALLBACK_START; - start_event.executor_id = getExecutorId(); - - if (any_exec.node_base) { - start_event.node_name = any_exec.node_base->get_name(); - - // Enhanced callback identification based on executable type - if (any_exec.timer) { - // For timer callbacks - std::stringstream ss; - - // retrieve the timer period - int64_t period; - rcl_timer_get_period(any_exec.timer->get_timer_handle().get(), &period); - - // convert period (which is in nanoseconds) to milliseconds - std::chrono::nanoseconds period_ns(period); - std::chrono::milliseconds period_ms = std::chrono::duration_cast(period_ns); - - ss << "timer_" << period_ms.count() << "ms"; - start_event.callback_name = ss.str(); - - // Add memory address as unique identifier - std::stringstream addr; - addr << "@" << any_exec.timer.get(); - nlohmann::json additional; - additional["callback_type"] = "timer"; - additional["timer_addr"] = addr.str(); - additional["thread_info"] = thread_info; - start_event.additional_data = additional.dump(); - } - else if (any_exec.subscription) { - // For subscription callbacks - start_event.callback_name = any_exec.subscription->get_topic_name(); - - nlohmann::json additional; - additional["callback_type"] = "subscription"; - additional["thread_info"] = thread_info; - start_event.additional_data = additional.dump(); - } - else if (any_exec.service) { - start_event.callback_name = any_exec.service->get_service_name(); - - nlohmann::json additional; - additional["callback_type"] = "service"; - additional["thread_info"] = thread_info; - start_event.additional_data = additional.dump(); - } - else if (any_exec.client) { - start_event.callback_name = "client_callback"; - - nlohmann::json additional; - additional["callback_type"] = "client"; - additional["thread_info"] = thread_info; - start_event.additional_data = additional.dump(); - } - else if (any_exec.waitable) { - start_event.callback_name = "waitable_callback"; - - nlohmann::json additional; - additional["callback_type"] = "waitable"; - additional["thread_info"] = thread_info; - start_event.additional_data = additional.dump(); - } - - // Add chain information - std::shared_ptr executable = nullptr; - if (prio_memory_strategy_) { - if (any_exec.timer) { - executable = prio_memory_strategy_->get_priority_settings( - any_exec.timer->get_timer_handle()); - } else if (any_exec.subscription) { - executable = prio_memory_strategy_->get_priority_settings( - any_exec.subscription->get_subscription_handle()); - } - } - - if (executable) { - start_event.chain_id = executable->chain_id; - start_event.is_first_in_chain = executable->is_first_in_chain; - start_event.is_last_in_chain = executable->is_last_in_chain; - - if (executable->deadlines && !executable->deadlines->empty()) { - start_event.deadline = executable->deadlines->front(); - } - - // Include the priority information - try { - nlohmann::json additional = start_event.additional_data.empty() ? - nlohmann::json() : nlohmann::json::parse(start_event.additional_data); - - additional["priority"] = executable->priority; - - if (executable->chain_id >= 0) { - additional["chain_id"] = executable->chain_id; - } - - if (executable->sched_type != ExecutableScheduleType::DEFAULT) { - additional["schedule_type"] = static_cast(executable->sched_type); - } - - start_event.additional_data = additional.dump(); - } catch (...) { - // If there was an error parsing the JSON, create new JSON object - nlohmann::json additional; - additional["priority"] = executable->priority; - additional["thread_info"] = thread_info; - start_event.additional_data = additional.dump(); - } - } - } else { - start_event.additional_data = thread_info_str; - } - - PerformanceMonitor::getInstance().recordEvent(start_event); - - // Execute the callback execute_any_executable(any_exec); - // Record execution completion - auto end = std::chrono::steady_clock::now(); - uint64_t end_time = std::chrono::duration_cast( - end.time_since_epoch()).count(); - uint64_t processing_time = end_time - start_time; - - // Create identical event structure but for end event - PerformanceEvent end_event = start_event; - end_event.timestamp = end_time; - end_event.type = PerformanceEventType::CALLBACK_END; - end_event.processing_time = processing_time; - - // Update the additional data to include processing time - try { - nlohmann::json additional = end_event.additional_data.empty() ? - nlohmann::json() : nlohmann::json::parse(end_event.additional_data); - additional["processing_time_us"] = processing_time; - end_event.additional_data = additional.dump(); - } catch (...) { - // If JSON parsing failed, create a new object - nlohmann::json additional; - additional["processing_time_us"] = processing_time; - additional["thread_info"] = thread_info; - end_event.additional_data = additional.dump(); - } - - PerformanceMonitor::getInstance().recordEvent(end_event); - if (any_exec.timer) { auto wait_mutex = MultithreadTimedExecutor::wait_mutex_set_[this]; auto high_priority_wait_mutex = wait_mutex->get_high_priority_lockable(); @@ -275,13 +98,6 @@ void MultithreadTimedExecutor::run(size_t _thread_number) { // Clear the callback_group to prevent the AnyExecutable destructor from // resetting the callback group `can_be_taken_from` any_exec.callback_group.reset(); - if (prio_memory_strategy_ != nullptr) - { - auto wait_mutex = MultithreadTimedExecutor::wait_mutex_set_[this]; - auto low_priority_wait_mutex = wait_mutex->get_low_priority_lockable(); - std::lock_guard wait_lock(low_priority_wait_mutex); - prio_memory_strategy_->post_execute(any_exec, _thread_number); - } } } } // namespace priority_executor diff --git a/src/priority_executor/src/performance_monitor.cpp b/src/priority_executor/src/performance_monitor.cpp deleted file mode 100644 index 1c5f569..0000000 --- a/src/priority_executor/src/performance_monitor.cpp +++ /dev/null @@ -1,149 +0,0 @@ -#include "priority_executor/performance_monitor.hpp" -#include -#include -#include -#include -#include - -namespace priority_executor { - -PerformanceMonitor& PerformanceMonitor::getInstance() { - static PerformanceMonitor instance; - return instance; -} - -PerformanceMonitor::PerformanceMonitor() { - event_buffer_.reserve(buffer_size_); - std::filesystem::create_directories(dump_path_); -} - -void PerformanceMonitor::recordEvent(const PerformanceEvent& event) { - if (!enabled_) return; - - { - std::lock_guard lock(event_mutex_); - event_buffer_.push_back(event); - - if (auto_dump_threshold_ > 0 && event_buffer_.size() >= auto_dump_threshold_) { - checkAndAutoDump(); - } - } -} - -void PerformanceMonitor::setBufferSize(size_t size) { - std::lock_guard lock(event_mutex_); - buffer_size_ = size; - event_buffer_.reserve(size); -} - -void PerformanceMonitor::setAutoDumpThreshold(size_t threshold) { - auto_dump_threshold_ = threshold; -} - -void PerformanceMonitor::setDumpPath(const std::string& path) { - dump_path_ = path; - std::filesystem::create_directories(dump_path_); -} - -void PerformanceMonitor::enable(bool enabled) { - enabled_ = enabled; -} - -void PerformanceMonitor::clear() { - std::lock_guard lock(event_mutex_); - event_buffer_.clear(); -} - -std::string PerformanceMonitor::eventsToJson() const { - nlohmann::json root = nlohmann::json::array(); - - for (const auto& event : event_buffer_) { - nlohmann::json event_json; - - event_json["timestamp"] = event.timestamp; - - switch (event.type) { - case PerformanceEventType::CALLBACK_READY: - event_json["type"] = "callback_ready"; - break; - case PerformanceEventType::CALLBACK_START: - event_json["type"] = "callback_start"; - break; - case PerformanceEventType::CALLBACK_END: - event_json["type"] = "callback_end"; - break; - case PerformanceEventType::DEADLINE_MISSED: - event_json["type"] = "deadline_missed"; - break; - case PerformanceEventType::DEADLINE_MET: - event_json["type"] = "deadline_met"; - break; - case PerformanceEventType::CHAIN_START: - event_json["type"] = "chain_start"; - break; - case PerformanceEventType::CHAIN_END: - event_json["type"] = "chain_end"; - break; - } - - event_json["node_name"] = event.node_name; - event_json["callback_name"] = event.callback_name; - event_json["chain_id"] = event.chain_id; - event_json["is_first_in_chain"] = event.is_first_in_chain; - event_json["is_last_in_chain"] = event.is_last_in_chain; - - if (event.deadline > 0) { - event_json["deadline"] = event.deadline; - } - - if (event.processing_time > 0) { - event_json["processing_time"] = event.processing_time; - } - - event_json["executor_id"] = event.executor_id; - - if (!event.additional_data.empty()) { - try { - event_json["additional_data"] = nlohmann::json::parse(event.additional_data); - } catch (...) { - event_json["additional_data"] = event.additional_data; - } - } - - root.push_back(event_json); - } - - return root.dump(2); // Indent with 2 spaces -} - -bool PerformanceMonitor::dumpToFile(const std::string& filename) { - std::lock_guard lock(event_mutex_); - - if (event_buffer_.empty()) { - return true; // Nothing to dump - } - - std::string full_path = dump_path_ + "/" + filename; - std::ofstream file(full_path); - if (!file.is_open()) { - return false; - } - - file << eventsToJson(); - event_buffer_.clear(); - return true; -} - -void PerformanceMonitor::checkAndAutoDump() { - if (event_buffer_.size() >= auto_dump_threshold_) { - auto now = std::chrono::system_clock::now(); - auto timestamp = std::chrono::duration_cast( - now.time_since_epoch()).count(); - - std::stringstream ss; - ss << "performance_log_" << timestamp << ".json"; - dumpToFile(ss.str()); - } -} - -} // namespace priority_executor \ No newline at end of file diff --git a/src/priority_executor/src/priority_executor.cpp b/src/priority_executor/src/priority_executor.cpp index f9eee3e..e5f1fa0 100755 --- a/src/priority_executor/src/priority_executor.cpp +++ b/src/priority_executor/src/priority_executor.cpp @@ -24,7 +24,7 @@ namespace priority_executor { -TimedExecutor::TimedExecutor(rclcpp::ExecutorOptions const &options, std::string name) +TimedExecutor::TimedExecutor(rclcpp::ExecutorOptions const& options, std::string name) : rclcpp::Executor(options) { this->name = std::move(name); logger_ = create_logger(); @@ -38,19 +38,11 @@ void TimedExecutor::spin() { } RCLCPP_SCOPE_EXIT(this->spinning.store(false);); - using namespace std::chrono_literals; - std::chrono::time_point start = std::chrono::steady_clock::now(); - - while (rclcpp::ok(this->context_) && spinning.load() && (std::chrono::steady_clock::now() - start) <= 5s) { + while (rclcpp::ok(this->context_) && spinning.load()) { rclcpp::AnyExecutable any_executable; - // std::cout<number_of_ready_timers()<name << std::endl; - // size_t ready = memory_strategy_->number_of_ready_subscriptions(); - // std::cout << "ready:" << ready << std::endl; if (get_next_executable(any_executable, std::chrono::nanoseconds(-1))) { execute_any_executable(any_executable); - // make sure memory_strategy_ is an instance of PriorityMemoryStrategy if (prio_memory_strategy_ != nullptr) { prio_memory_strategy_->post_execute(any_executable); } @@ -63,11 +55,6 @@ bool TimedExecutor::get_next_executable( rclcpp::AnyExecutable& any_executable, std::chrono::nanoseconds timeout) { bool success = false; - // Check to see if there are any subscriptions or timers needing service - // TODO(wjwwood): improve run to run efficiency of this function - // sched_yield(); - // sleep for 10us - // usleep(20); auto const start = std::chrono::steady_clock::now(); wait_for_work(timeout); diff --git a/src/priority_executor/src/priority_memory_strategy.cpp b/src/priority_executor/src/priority_memory_strategy.cpp index 68f6295..0fbdf5a 100755 --- a/src/priority_executor/src/priority_memory_strategy.cpp +++ b/src/priority_executor/src/priority_memory_strategy.cpp @@ -1,11 +1,9 @@ #include "priority_executor/priority_memory_strategy.hpp" #include "simple_timer/rt-sched.hpp" - #include -#include -#include - #include +#include +#include using namespace priority_executor; @@ -35,10 +33,11 @@ std::ostream& priority_executor::operator<<(std::ostream& os, const ExecutableSc return os; } -std::ostream& priority_executor::operator<<(std::ostream &os, const PriorityExecutable &pe) +std::ostream &operator<<(std::ostream &os, const PriorityExecutable &pe) { os << "sched_type: " << pe.sched_type << ", "; - if (pe.sched_type == ExecutableScheduleType::DEADLINE) { + if (pe.sched_type == ExecutableScheduleType::DEADLINE) + { os << "period: " << pe.period << ", "; } os << "priority: " << pe.priority << ", "; @@ -69,16 +68,13 @@ std::shared_ptr PriorityMemoryStrategy<>::get_and_reset_priority(std::shared_ptr executable, ExecutableType t) { - // PriorityExecutable *p = get_priority_settings(executable); std::shared_ptr p = get_priority_settings(executable); if (p == nullptr) { - // priority_map[executable] = PriorityExecutable(executable, 0, t); priority_map[executable] = std::make_shared(executable, 0, t); p = priority_map[executable]; } - // p->can_be_run = true; return p; } @@ -291,7 +287,7 @@ void PriorityExecutable::increment_counter() { this->counter += 1; } -bool PriorityExecutable::operator==(PriorityExecutable const &other) const { +bool PriorityExecutable::operator==(PriorityExecutable const& other) const { std::cout << "PriorityExecutable::operator== called" << std::endl; return this->handle == other.handle; } @@ -300,11 +296,7 @@ bool PriorityExecutableComparator::operator()( PriorityExecutable const *p1, PriorityExecutable const *p2) const { - // since this will be used in a std::set, also check for equality if (p1 == nullptr || p2 == nullptr) { - // TODO: realistic value - std::cout << "PriorityExecutableComparator::operator() called with nullptr" - << std::endl; return false; } @@ -319,21 +311,18 @@ bool PriorityExecutableComparator::operator()( if (p1->sched_type == ExecutableScheduleType::CHAIN_INDEPENDENT_PRIORITY) { return p1->priority == p2->priority ? - p1->executable_id < p2->executable_id : - // tie-breaker: lower value runs first + p1->executable_id < p2->executable_id : p1->priority < p2->priority; } if (p1->sched_type == ExecutableScheduleType::CHAIN_AWARE_PRIORITY) { return p1->priority == p2->priority ? - p1->executable_id < p2->executable_id : + p1->executable_id < p2->executable_id : // tie-breaker: lower value runs first p1->priority < p2->priority; } if (p1->sched_type == ExecutableScheduleType::DEADLINE) { - // TODO: use the counter logic here as well - uint64_t p1_deadline = 0; uint64_t p2_deadline = 0; @@ -372,8 +361,8 @@ bool PriorityExecutableComparator::operator()( template <> void PriorityMemoryStrategy<>::add_guard_condition( - rcl_guard_condition_t const *guard_condition) { - for (auto const &existing_guard_condition : guard_conditions_) { + rcl_guard_condition_t const* guard_condition) { + for (auto const& existing_guard_condition : guard_conditions_) { if (existing_guard_condition == guard_condition) { return; } @@ -383,7 +372,7 @@ void PriorityMemoryStrategy<>::add_guard_condition( template <> void PriorityMemoryStrategy<>::remove_guard_condition( - rcl_guard_condition_t const *guard_condition) { + rcl_guard_condition_t const* guard_condition) { for (auto it = guard_conditions_.begin(); it != guard_conditions_.end(); ++it) { if (*it == guard_condition) { guard_conditions_.erase(it); diff --git a/src/priority_executor/src/usage_example.cpp b/src/priority_executor/src/usage_example.cpp index b9c4b4c..9ae1f31 100755 --- a/src/priority_executor/src/usage_example.cpp +++ b/src/priority_executor/src/usage_example.cpp @@ -1,107 +1,43 @@ +// Copyright 2015 Open Source Robotics Foundation, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include +#include #include -#include -#include +#include -#include -#include +#include "priority_executor/default_executor.hpp" +#include "priority_executor/multithread_priority_executor.hpp" +#include "rclcpp/rclcpp.hpp" +#include "std_msgs/msg/string.hpp" -#include "priority_executor/priority_executor.hpp" -#include "priority_executor/priority_memory_strategy.hpp" +using namespace std::chrono_literals; -// re-create the classic talker-listener example with two listeners -class Talker : public rclcpp::Node -{ -public: - Talker() : Node("talker") - { - // Create a publisher on the "chatter" topic with 10 msg queue size. - pub_ = this->create_publisher("chatter", 10); - // Create a timer of period 1s that calls our callback member function. - timer_ = this->create_wall_timer(std::chrono::seconds(1), - std::bind(&Talker::timer_callback, this)); - } - // the timer must be public - rclcpp::TimerBase::SharedPtr timer_; - -private: - void timer_callback() - { - std_msgs::msg::String msg; - msg.data = "Hello World!"; - RCLCPP_INFO(this->get_logger(), "Publishing: '%s'", msg.data.c_str()); - pub_->publish(msg); - } - rclcpp::Publisher::SharedPtr pub_; -}; - -class Listener : public rclcpp::Node -{ -public: - Listener(std::string name) : Node(name) - { - // Create a subscription on the "chatter" topic with the default callback - // method. - sub_ = this->create_subscription( - "chatter", 10, - std::bind(&Listener::callback, this, std::placeholders::_1)); - } - // the publisher must be public - rclcpp::Subscription::SharedPtr sub_; - -private: - void callback(const std_msgs::msg::String::SharedPtr msg) - { - RCLCPP_INFO(this->get_logger(), "I heard: '%s'", msg->data.c_str()); - } -}; +void chatter_callback(std_msgs::msg::String::SharedPtr const msg) { + RCLCPP_INFO(rclcpp::get_logger("rclcpp"), "I heard: '%s'", msg->data.c_str()); +} int main(int argc, char* argv[]) { rclcpp::init(argc, argv); - auto talker = std::make_shared(); - auto listener1 = std::make_shared("listener1"); - auto listener2 = std::make_shared("listener2"); - rclcpp::ExecutorOptions options; + auto node = rclcpp::Node::make_shared("minimal_subscriber"); - auto strategy = std::make_shared>(); - options.memory_strategy = strategy; - auto executor = new priority_executor::TimedExecutor(options); + auto subscription = node->create_subscription( + "topic", 10, chatter_callback); - // must be set to post_execute can set new deadlines - executor->prio_memory_strategy_ = strategy; - - - // the new funcitons in PriorityMemoryStrategy accept the handle of the - // timer/subscription as the first argument - strategy->set_executable_deadline(talker->timer_->get_timer_handle(), 1000, - priority_executor::ExecutableType::TIMER, 0); - // you _must_ set the timer_handle for each chain - strategy->get_priority_settings(talker->timer_->get_timer_handle()) - ->timer_handle = talker->timer_; - // you _must_ mark the first executable in the chain - strategy->set_first_in_chain(talker->timer_->get_timer_handle()); - // set the same period and chain_id for each callback in the chain - strategy->set_executable_deadline(listener1->sub_->get_subscription_handle(), - 1000, priority_executor::ExecutableType::SUBSCRIPTION, 0); - strategy->set_executable_deadline(listener2->sub_->get_subscription_handle(), - 1000, priority_executor::ExecutableType::SUBSCRIPTION, 0); - // you _must_ mark the last executable in the chain (used to keep track of different instances of the same chain) - strategy->set_last_in_chain(listener2->sub_->get_subscription_handle()); - // add all the nodes to the executor - executor->add_node(talker); - executor->add_node(listener1); - executor->add_node(listener2); - - // if the executor behaves unexpectedly, you can print the priority settings to make sure they are correct - std::cout << *strategy->get_priority_settings( - talker->timer_->get_timer_handle()) - << std::endl; - std::cout << *strategy->get_priority_settings( - listener1->sub_->get_subscription_handle()) - << std::endl; - std::cout << *strategy->get_priority_settings( - listener2->sub_->get_subscription_handle()) - << std::endl; + auto executor = std::make_shared(); + executor->add_node(node); executor->spin(); rclcpp::shutdown(); diff --git a/src/simple_timer/package.xml b/src/simple_timer/package.xml index f0e1c00..3022371 100644 --- a/src/simple_timer/package.xml +++ b/src/simple_timer/package.xml @@ -6,7 +6,7 @@ ROS 2 package providing timer functionality for the dynamic executor experiments - kurt + ROS-Dynamic-Executor Team Apache License 2.0 ament_cmake diff --git a/src/simple_timer/src/rt-sched.cpp b/src/simple_timer/src/rt-sched.cpp index a6955c8..f718941 100644 --- a/src/simple_timer/src/rt-sched.cpp +++ b/src/simple_timer/src/rt-sched.cpp @@ -33,7 +33,6 @@ int sched_getattr( void log_entry(node_time_logger logger, std::string const& text) { if (logger.recorded_times != nullptr) { logger.recorded_times->emplace_back(text, get_time_us() / 1000); - // std::cout<