diff --git a/src/priority_executor/CMakeLists.txt b/src/priority_executor/CMakeLists.txt index 4ce2108..e6ec3da 100755 --- a/src/priority_executor/CMakeLists.txt +++ b/src/priority_executor/CMakeLists.txt @@ -19,11 +19,16 @@ find_package(rmw REQUIRED) find_package(std_msgs REQUIRED) find_package(std_srvs REQUIRED) find_package(simple_timer REQUIRED) +find_package(nlohmann_json REQUIRED) # Library targets add_library(priority_executor src/priority_executor.cpp src/priority_memory_strategy.cpp + src/performance_monitor.cpp + src/default_executor.cpp + src/usage_example.cpp + src/multithread_priority_executor.cpp ) target_include_directories(priority_executor PUBLIC $ @@ -34,35 +39,7 @@ ament_target_dependencies(priority_executor rclcpp rcl simple_timer -) - -add_library(multithread_priority_executor - src/multithread_priority_executor.cpp - src/priority_memory_strategy.cpp -) -target_include_directories(multithread_priority_executor PUBLIC - $ - $ -) -ament_target_dependencies(multithread_priority_executor - rmw - rclcpp - rcl - simple_timer -) - -add_library(default_executor - src/default_executor.cpp -) -target_include_directories(default_executor PUBLIC - $ - $ -) -ament_target_dependencies(default_executor - rmw - rclcpp - rcl - simple_timer + nlohmann_json ) # Example executable @@ -75,8 +52,6 @@ target_include_directories(usage_example PUBLIC ) target_link_libraries(usage_example priority_executor - multithread_priority_executor - default_executor ) ament_target_dependencies(usage_example rclcpp @@ -98,7 +73,7 @@ install( ) install( - TARGETS priority_executor multithread_priority_executor default_executor + TARGETS priority_executor EXPORT export_${PROJECT_NAME} ARCHIVE DESTINATION lib LIBRARY DESTINATION lib @@ -114,5 +89,5 @@ install( # Export and package configuration ament_export_include_directories(include) ament_export_targets(export_${PROJECT_NAME} HAS_LIBRARY_TARGET) -ament_export_dependencies(rclcpp rcl rmw simple_timer) +ament_export_dependencies(rclcpp rcl rmw simple_timer nlohmann_json) ament_package() diff --git a/src/priority_executor/include/priority_executor/default_executor.hpp b/src/priority_executor/include/priority_executor/default_executor.hpp index bbe0222..090687f 100755 --- a/src/priority_executor/include/priority_executor/default_executor.hpp +++ b/src/priority_executor/include/priority_executor/default_executor.hpp @@ -17,10 +17,10 @@ #include +#include +#include #include #include -#include -#include #include #include "rclcpp/executor.hpp" @@ -40,19 +40,20 @@ class ROSDefaultMultithreadedExecutor : public rclcpp::Executor, public RTISTime public: RCLCPP_SMART_PTR_DEFINITIONS(ROSDefaultMultithreadedExecutor) + RCLCPP_PUBLIC explicit ROSDefaultMultithreadedExecutor( - rclcpp::ExecutorOptions const& options = rclcpp::ExecutorOptions(), + rclcpp::ExecutorOptions const &options = rclcpp::ExecutorOptions(), int number_of_threads = 2, std::chrono::nanoseconds next_exec_timeout = std::chrono::nanoseconds(-1)); - size_t get_number_of_threads(); - void spin() override; + RCLCPP_PUBLIC size_t get_number_of_threads(); + RCLCPP_PUBLIC void spin() override; bool get_next_executable(rclcpp::AnyExecutable& any_executable, std::chrono::nanoseconds timeout = std::chrono::nanoseconds(-1)); protected: - void run(size_t thread_number); + RCLCPP_PUBLIC void run(size_t thread_number); private: RCLCPP_DISABLE_COPY(ROSDefaultMultithreadedExecutor) @@ -76,10 +77,12 @@ public: RCLCPP_SMART_PTR_DEFINITIONS(ROSDefaultExecutor) /// Default constructor. See the default constructor for Executor. + RCLCPP_PUBLIC explicit ROSDefaultExecutor( - rclcpp::ExecutorOptions const& options = rclcpp::ExecutorOptions()); + rclcpp::ExecutorOptions const &options = rclcpp::ExecutorOptions()); /// Default destructor. + RCLCPP_PUBLIC virtual ~ROSDefaultExecutor(); /// Single-threaded implementation of spin. @@ -90,11 +93,13 @@ public: * if the associated context is configured to shutdown on SIGINT. * \throws std::runtime_error when spin() called while already spinning */ + RCLCPP_PUBLIC void spin() override; bool get_next_executable(rclcpp::AnyExecutable& any_executable, std::chrono::nanoseconds timeout = std::chrono::nanoseconds(-1)); + RCLCPP_PUBLIC void wait_for_work(std::chrono::nanoseconds timeout = std::chrono::nanoseconds(-1)); private: diff --git a/src/priority_executor/include/priority_executor/multithread_priority_executor.hpp b/src/priority_executor/include/priority_executor/multithread_priority_executor.hpp index 9a38263..a722b53 100755 --- a/src/priority_executor/include/priority_executor/multithread_priority_executor.hpp +++ b/src/priority_executor/include/priority_executor/multithread_priority_executor.hpp @@ -1,15 +1,16 @@ #ifndef PRIORITY_EXECUTOR__MULTITHREAD_PRIORITY_EXECUTOR_HPP_ #define PRIORITY_EXECUTOR__MULTITHREAD_PRIORITY_EXECUTOR_HPP_ +#include +#include #include #include -#include -#include #include #include #include "priority_executor/default_executor.hpp" #include "priority_executor/priority_executor.hpp" + #include "rclcpp/executor.hpp" #include "rclcpp/macros.hpp" #include "rclcpp/memory_strategies.hpp" @@ -29,18 +30,20 @@ public: int number_of_threads = 2, std::chrono::nanoseconds next_exec_timeout = std::chrono::nanoseconds(-1)); - RCLCPP_PUBLIC virtual ~MultithreadTimedExecutor() = default; + RCLCPP_PUBLIC + virtual ~MultithreadTimedExecutor() = default; + RCLCPP_PUBLIC size_t get_number_of_threads(); RCLCPP_PUBLIC void spin() override; protected: - RCLCPP_PUBLIC void run(size_t _thread_number); + RCLCPP_PUBLIC void run(size_t thread_number); private: + RCLCPP_DISABLE_COPY(MultithreadTimedExecutor) std::set scheduled_timers_; - int number_of_threads_; - std::chrono::nanoseconds next_exec_timeout_ = std::chrono::nanoseconds(-1); - node_time_logger logger_; + size_t number_of_threads_; + std::chrono::nanoseconds next_exec_timeout_{std::chrono::nanoseconds(-1)}; static std::unordered_map> diff --git a/src/priority_executor/include/priority_executor/performance_monitor.hpp b/src/priority_executor/include/priority_executor/performance_monitor.hpp new file mode 100644 index 0000000..beece1f --- /dev/null +++ b/src/priority_executor/include/priority_executor/performance_monitor.hpp @@ -0,0 +1,237 @@ +#ifndef PERFORMANCE_MONITOR_HPP_ +#define PERFORMANCE_MONITOR_HPP_ + +#include +#include +#include +#include +#include +#include +#include +#include +#include "rclcpp/rclcpp.hpp" +#include "simple_timer/rt-sched.hpp" + +namespace priority_executor { + +enum class PerformanceEventType { + CALLBACK_READY, + CALLBACK_START, + CALLBACK_END, + DEADLINE_MISSED, + DEADLINE_MET, + CHAIN_START, + CHAIN_END +}; + +struct PerformanceEvent { + uint64_t timestamp; // High-resolution timestamp (microseconds) + PerformanceEventType type; // Type of event + std::string node_name; // Name of the node + std::string callback_name; // Name of the callback + int chain_id{-1}; // Chain identifier + bool is_first_in_chain{false}; // First callback in chain + bool is_last_in_chain{false}; // Last callback in chain + uint64_t deadline{0}; // Deadline (if applicable) + uint64_t processing_time{0}; // Time spent executing (if applicable) + int executor_id{0}; // Executor identifier + std::string additional_data; // JSON string for extensible data + + // Helper method to extract callback type from additional_data + std::string get_callback_type() const { + if (additional_data.empty()) return "unknown"; + + try { + nlohmann::json json = nlohmann::json::parse(additional_data); + if (json.contains("callback_type")) { + return json["callback_type"].get(); + } + } catch (...) {} + + return "unknown"; + } + + // Helper method to extract thread info from additional_data + int get_thread_id() const { + if (additional_data.empty()) return -1; + + try { + nlohmann::json json = nlohmann::json::parse(additional_data); + if (json.contains("thread_info") && json["thread_info"].is_object() && + json["thread_info"].contains("thread_id")) { + return json["thread_info"]["thread_id"].get(); + } + // Direct thread_id if present + if (json.contains("thread_id")) { + return json["thread_id"].get(); + } + } catch (...) {} + + return -1; + } + + // Helper method to create a descriptive identifier for this callback + std::string get_descriptive_id() const { + std::stringstream ss; + ss << node_name; + + if (!callback_name.empty()) { + ss << "/" << callback_name; + } + + if (chain_id >= 0) { + ss << " (chain:" << chain_id; + if (is_first_in_chain) ss << " start"; + if (is_last_in_chain) ss << " end"; + ss << ")"; + } + + return ss.str(); + } + + // Convert additional_data to JSON object + nlohmann::json get_additional_data_json() const { + try { + if (!additional_data.empty()) { + return nlohmann::json::parse(additional_data); + } + } catch (...) {} + + return nlohmann::json::object(); + } +}; + +class PerformanceMonitor { +public: + static PerformanceMonitor& getInstance(); + + void recordEvent(const PerformanceEvent& event); + void setBufferSize(size_t size); + void setAutoDumpThreshold(size_t threshold); + void setDumpPath(const std::string& path); + bool dumpToFile(const std::string& filename); + void enable(bool enabled = true); + void clear(); + + // Helper method to set monitoring options in one call + void setMonitoringOptions(size_t buffer_size, size_t auto_dump_threshold, + const std::string& dump_path) { + setBufferSize(buffer_size); + setAutoDumpThreshold(auto_dump_threshold); + setDumpPath(dump_path); + } + + // Get all events for a specific callback + std::vector getEventsForCallback(const std::string& node_name, + const std::string& callback_name) const { + std::vector result; + std::lock_guard lock(event_mutex_); + + for (const auto& event : event_buffer_) { + if (event.node_name == node_name && event.callback_name == callback_name) { + result.push_back(event); + } + } + + return result; + } + + // Get all events for a specific chain + std::vector getEventsForChain(int chain_id) const { + std::vector result; + std::lock_guard lock(event_mutex_); + + for (const auto& event : event_buffer_) { + if (event.chain_id == chain_id) { + result.push_back(event); + } + } + + return result; + } + + // Get all events by type + std::vector getEventsByType(PerformanceEventType type) const { + std::vector result; + std::lock_guard lock(event_mutex_); + + for (const auto& event : event_buffer_) { + if (event.type == type) { + result.push_back(event); + } + } + + return result; + } + + // Get a formatted report of callbacks and their execution times + std::string getCallbackExecutionReport() const { + std::lock_guard lock(event_mutex_); + std::stringstream ss; + + ss << "Callback Execution Report:\n"; + ss << "=========================\n\n"; + + // Maps to store total execution time and count per callback + std::unordered_map total_time; + std::unordered_map call_count; + std::unordered_map max_time; + std::unordered_map min_time; + + // Process all CALLBACK_END events (they contain processing_time) + for (const auto& event : event_buffer_) { + if (event.type == PerformanceEventType::CALLBACK_END && event.processing_time > 0) { + std::string callback_id = event.get_descriptive_id(); + total_time[callback_id] += event.processing_time; + call_count[callback_id]++; + max_time[callback_id] = std::max(max_time[callback_id], event.processing_time); + if (min_time[callback_id] == 0 || event.processing_time < min_time[callback_id]) { + min_time[callback_id] = event.processing_time; + } + } + } + + // Print results sorted by callback id + std::vector callback_ids; + for (const auto& entry : call_count) { + callback_ids.push_back(entry.first); + } + + std::sort(callback_ids.begin(), callback_ids.end()); + + for (const auto& id : callback_ids) { + double avg_time = static_cast(total_time[id]) / call_count[id]; + ss << id << "\n"; + ss << " Calls: " << call_count[id] << "\n"; + ss << " Avg time: " << avg_time << " µs\n"; + ss << " Min time: " << min_time[id] << " µs\n"; + ss << " Max time: " << max_time[id] << " µs\n"; + ss << " Total time: " << total_time[id] << " µs\n\n"; + } + + return ss.str(); + } + +private: + PerformanceMonitor(); + ~PerformanceMonitor() = default; + + PerformanceMonitor(const PerformanceMonitor&) = delete; + PerformanceMonitor& operator=(const PerformanceMonitor&) = delete; + PerformanceMonitor(PerformanceMonitor&&) = delete; + PerformanceMonitor& operator=(PerformanceMonitor&&) = delete; + + mutable std::mutex event_mutex_; + std::vector event_buffer_; + size_t buffer_size_{10000}; + size_t auto_dump_threshold_{0}; + std::string dump_path_{"performance_logs"}; + std::atomic enabled_{true}; + + std::string eventsToJson() const; + void checkAndAutoDump(); +}; + +} // namespace priority_executor + +#endif // PERFORMANCE_MONITOR_HPP_ \ No newline at end of file diff --git a/src/priority_executor/include/priority_executor/priority_executor.hpp b/src/priority_executor/include/priority_executor/priority_executor.hpp index e485ced..62d7248 100755 --- a/src/priority_executor/include/priority_executor/priority_executor.hpp +++ b/src/priority_executor/include/priority_executor/priority_executor.hpp @@ -20,12 +20,15 @@ #include #include #include +#include #include "rclcpp/executor.hpp" #include "rclcpp/macros.hpp" #include "rclcpp/visibility_control.hpp" + #include "priority_executor/priority_memory_strategy.hpp" #include "priority_executor/default_executor.hpp" +#include "priority_executor/performance_monitor.hpp" namespace priority_executor { @@ -38,11 +41,13 @@ public: RCLCPP_SMART_PTR_DEFINITIONS(TimedExecutor) /// Default constructor. See the default constructor for Executor. + RCLCPP_PUBLIC explicit TimedExecutor( - rclcpp::ExecutorOptions const& options = rclcpp::ExecutorOptions(), + rclcpp::ExecutorOptions const &options = rclcpp::ExecutorOptions(), std::string name = "unnamed executor"); /// Default destructor. + RCLCPP_PUBLIC virtual ~TimedExecutor(); /// Single-threaded implementation of spin. @@ -53,12 +58,50 @@ public: * if the associated context is configured to shutdown on SIGINT. * \throws std::runtime_error when spin() called while already spinning */ + RCLCPP_PUBLIC void spin() override; std::string name; void set_use_priorities(bool use_prio); std::shared_ptr> prio_memory_strategy_{nullptr}; + // Performance monitoring control + void enableMonitoring(bool enable = true) { + PerformanceMonitor::getInstance().enable(enable); + } + + void setMonitoringOptions( + size_t buffer_size, + size_t auto_dump_threshold, + const std::string& dump_path) { + auto& monitor = PerformanceMonitor::getInstance(); + monitor.setBufferSize(buffer_size); + monitor.setAutoDumpThreshold(auto_dump_threshold); + monitor.setDumpPath(dump_path); + } + + // Get a formatted report of all callback execution times + std::string getCallbackExecutionReport() const { + auto& monitor = PerformanceMonitor::getInstance(); + return monitor.getCallbackExecutionReport(); + } + + // Manually trigger a log dump with a specific filename + bool dumpMonitoringData(const std::string& filename) { + auto& monitor = PerformanceMonitor::getInstance(); + return monitor.dumpToFile(filename); + } + + // Get executor ID for identification in logs + int getExecutorId() const { + return executor_id_; + } + + // Get executor name for identification in logs + const std::string& getExecutorName() const { + return name; + } + protected: bool get_next_executable(rclcpp::AnyExecutable& any_executable, std::chrono::nanoseconds timeout = std::chrono::nanoseconds(-1)); @@ -68,8 +111,14 @@ private: void wait_for_work(std::chrono::nanoseconds timeout); bool get_next_ready_executable(rclcpp::AnyExecutable& any_executable); + void recordExecutionEvent(const rclcpp::AnyExecutable& any_exec, + PerformanceEventType event_type, + uint64_t timestamp, + uint64_t processing_time = 0); bool use_priorities{true}; + static std::atomic next_executor_id_; + int executor_id_; }; } // namespace priority_executor diff --git a/src/priority_executor/include/priority_executor/priority_memory_strategy.hpp b/src/priority_executor/include/priority_executor/priority_memory_strategy.hpp index d71bafb..6033862 100755 --- a/src/priority_executor/include/priority_executor/priority_memory_strategy.hpp +++ b/src/priority_executor/include/priority_executor/priority_memory_strategy.hpp @@ -1,3 +1,17 @@ +// Copyright 2015 Open Source Robotics Foundation, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + #ifndef PRIORITY_EXECUTOR__PRIORITY_MEMORY_STRATEGY_HPP_ #define PRIORITY_EXECUTOR__PRIORITY_MEMORY_STRATEGY_HPP_ @@ -10,10 +24,19 @@ #include #include "rcl/allocator.h" + #include "rclcpp/allocator/allocator_common.hpp" #include "rclcpp/memory_strategy.hpp" + #include "simple_timer/rt-sched.hpp" +/// Delegate for handling memory allocations while the Executor is executing. +/** + * By default, the memory strategy dynamically allocates memory for structures + * that come in from the rmw implementation after the executor waits for work, + * based on the number of entities that come through. + */ + namespace priority_executor { enum class ExecutableType { @@ -24,7 +47,7 @@ enum class ExecutableType { WAITABLE }; -std::ostream& operator << (std::ostream& os, const ExecutableType& obj); +std::ostream& operator<<(std::ostream& os, const ExecutableType& obj); enum class ExecutableScheduleType { DEADLINE = 0, @@ -33,15 +56,15 @@ enum class ExecutableScheduleType { DEFAULT, // not used here }; -std::ostream& operator << (std::ostream& os, const ExecutableScheduleType& obj); +std::ostream& operator<<(std::ostream& os, const ExecutableScheduleType& obj); class PriorityExecutable { public: static size_t num_executables; explicit PriorityExecutable( - std::shared_ptr h, - int p, + std::shared_ptr h, + int p, ExecutableType t, ExecutableScheduleType sched_type = ExecutableScheduleType::CHAIN_INDEPENDENT_PRIORITY); @@ -50,8 +73,8 @@ public: void dont_run(); void allow_run(); void increment_counter(); - bool operator==(PriorityExecutable const& other) const; - friend std::ostream& operator<<(std::ostream& os, PriorityExecutable const& pe); + bool operator==(PriorityExecutable const &other) const; + friend std::ostream& operator<<(std::ostream& os, PriorityExecutable const &pe); std::shared_ptr handle; ExecutableType type; @@ -74,7 +97,7 @@ public: class PriorityExecutableComparator { public: - bool operator()(PriorityExecutable const* p1, PriorityExecutable const* p2) const; + bool operator()(PriorityExecutable const *p1, PriorityExecutable const *p2) const; }; template > diff --git a/src/priority_executor/package.xml b/src/priority_executor/package.xml index ee5a022..faaedb5 100755 --- a/src/priority_executor/package.xml +++ b/src/priority_executor/package.xml @@ -6,7 +6,7 @@ ROS 2 package implementing priority-based executors for real-time task scheduling - ROS-Dynamic-Executor Team + kurt Apache License 2.0 ament_cmake diff --git a/src/priority_executor/src/default_executor.cpp b/src/priority_executor/src/default_executor.cpp index 8e3962a..5210e23 100755 --- a/src/priority_executor/src/default_executor.cpp +++ b/src/priority_executor/src/default_executor.cpp @@ -14,12 +14,12 @@ #include "priority_executor/default_executor.hpp" -#include -#include -#include #include -#include +#include +#include #include +#include +#include #include #include "rcpputils/scope_exit.hpp" @@ -33,7 +33,7 @@ std::unordered_map #include @@ -7,6 +8,7 @@ #include #include #include +#include #include "rcpputils/scope_exit.hpp" #include "rclcpp/any_executable.hpp" @@ -20,7 +22,7 @@ std::unordered_mapcontext_) && spinning.load()) { rclcpp::AnyExecutable any_exec; { @@ -83,8 +108,160 @@ void MultithreadTimedExecutor::run(size_t _thread_number) { } } + // Record execution start with thread info + auto start = std::chrono::steady_clock::now(); + uint64_t start_time = std::chrono::duration_cast( + start.time_since_epoch()).count(); + + PerformanceEvent start_event; + start_event.timestamp = start_time; + start_event.type = PerformanceEventType::CALLBACK_START; + start_event.executor_id = getExecutorId(); + + if (any_exec.node_base) { + start_event.node_name = any_exec.node_base->get_name(); + + // Enhanced callback identification based on executable type + if (any_exec.timer) { + // For timer callbacks + std::stringstream ss; + + // retrieve the timer period + int64_t period; + rcl_timer_get_period(any_exec.timer->get_timer_handle().get(), &period); + + // convert period (which is in nanoseconds) to milliseconds + std::chrono::nanoseconds period_ns(period); + std::chrono::milliseconds period_ms = std::chrono::duration_cast(period_ns); + + ss << "timer_" << period_ms.count() << "ms"; + start_event.callback_name = ss.str(); + + // Add memory address as unique identifier + std::stringstream addr; + addr << "@" << any_exec.timer.get(); + nlohmann::json additional; + additional["callback_type"] = "timer"; + additional["timer_addr"] = addr.str(); + additional["thread_info"] = thread_info; + start_event.additional_data = additional.dump(); + } + else if (any_exec.subscription) { + // For subscription callbacks + start_event.callback_name = any_exec.subscription->get_topic_name(); + + nlohmann::json additional; + additional["callback_type"] = "subscription"; + additional["thread_info"] = thread_info; + start_event.additional_data = additional.dump(); + } + else if (any_exec.service) { + start_event.callback_name = any_exec.service->get_service_name(); + + nlohmann::json additional; + additional["callback_type"] = "service"; + additional["thread_info"] = thread_info; + start_event.additional_data = additional.dump(); + } + else if (any_exec.client) { + start_event.callback_name = "client_callback"; + + nlohmann::json additional; + additional["callback_type"] = "client"; + additional["thread_info"] = thread_info; + start_event.additional_data = additional.dump(); + } + else if (any_exec.waitable) { + start_event.callback_name = "waitable_callback"; + + nlohmann::json additional; + additional["callback_type"] = "waitable"; + additional["thread_info"] = thread_info; + start_event.additional_data = additional.dump(); + } + + // Add chain information + std::shared_ptr executable = nullptr; + if (prio_memory_strategy_) { + if (any_exec.timer) { + executable = prio_memory_strategy_->get_priority_settings( + any_exec.timer->get_timer_handle()); + } else if (any_exec.subscription) { + executable = prio_memory_strategy_->get_priority_settings( + any_exec.subscription->get_subscription_handle()); + } + } + + if (executable) { + start_event.chain_id = executable->chain_id; + start_event.is_first_in_chain = executable->is_first_in_chain; + start_event.is_last_in_chain = executable->is_last_in_chain; + + if (executable->deadlines && !executable->deadlines->empty()) { + start_event.deadline = executable->deadlines->front(); + } + + // Include the priority information + try { + nlohmann::json additional = start_event.additional_data.empty() ? + nlohmann::json() : nlohmann::json::parse(start_event.additional_data); + + additional["priority"] = executable->priority; + + if (executable->chain_id >= 0) { + additional["chain_id"] = executable->chain_id; + } + + if (executable->sched_type != ExecutableScheduleType::DEFAULT) { + additional["schedule_type"] = static_cast(executable->sched_type); + } + + start_event.additional_data = additional.dump(); + } catch (...) { + // If there was an error parsing the JSON, create new JSON object + nlohmann::json additional; + additional["priority"] = executable->priority; + additional["thread_info"] = thread_info; + start_event.additional_data = additional.dump(); + } + } + } else { + start_event.additional_data = thread_info_str; + } + + PerformanceMonitor::getInstance().recordEvent(start_event); + + // Execute the callback execute_any_executable(any_exec); + // Record execution completion + auto end = std::chrono::steady_clock::now(); + uint64_t end_time = std::chrono::duration_cast( + end.time_since_epoch()).count(); + uint64_t processing_time = end_time - start_time; + + // Create identical event structure but for end event + PerformanceEvent end_event = start_event; + end_event.timestamp = end_time; + end_event.type = PerformanceEventType::CALLBACK_END; + end_event.processing_time = processing_time; + + // Update the additional data to include processing time + try { + nlohmann::json additional = end_event.additional_data.empty() ? + nlohmann::json() : nlohmann::json::parse(end_event.additional_data); + additional["processing_time_us"] = processing_time; + end_event.additional_data = additional.dump(); + } catch (...) { + // If JSON parsing failed, create a new object + nlohmann::json additional; + additional["processing_time_us"] = processing_time; + additional["thread_info"] = thread_info; + end_event.additional_data = additional.dump(); + } + + PerformanceMonitor::getInstance().recordEvent(end_event); + if (any_exec.timer) { auto wait_mutex = MultithreadTimedExecutor::wait_mutex_set_[this]; auto high_priority_wait_mutex = wait_mutex->get_high_priority_lockable(); @@ -98,6 +275,13 @@ void MultithreadTimedExecutor::run(size_t _thread_number) { // Clear the callback_group to prevent the AnyExecutable destructor from // resetting the callback group `can_be_taken_from` any_exec.callback_group.reset(); + if (prio_memory_strategy_ != nullptr) + { + auto wait_mutex = MultithreadTimedExecutor::wait_mutex_set_[this]; + auto low_priority_wait_mutex = wait_mutex->get_low_priority_lockable(); + std::lock_guard wait_lock(low_priority_wait_mutex); + prio_memory_strategy_->post_execute(any_exec, _thread_number); + } } } } // namespace priority_executor diff --git a/src/priority_executor/src/performance_monitor.cpp b/src/priority_executor/src/performance_monitor.cpp new file mode 100644 index 0000000..1c5f569 --- /dev/null +++ b/src/priority_executor/src/performance_monitor.cpp @@ -0,0 +1,149 @@ +#include "priority_executor/performance_monitor.hpp" +#include +#include +#include +#include +#include + +namespace priority_executor { + +PerformanceMonitor& PerformanceMonitor::getInstance() { + static PerformanceMonitor instance; + return instance; +} + +PerformanceMonitor::PerformanceMonitor() { + event_buffer_.reserve(buffer_size_); + std::filesystem::create_directories(dump_path_); +} + +void PerformanceMonitor::recordEvent(const PerformanceEvent& event) { + if (!enabled_) return; + + { + std::lock_guard lock(event_mutex_); + event_buffer_.push_back(event); + + if (auto_dump_threshold_ > 0 && event_buffer_.size() >= auto_dump_threshold_) { + checkAndAutoDump(); + } + } +} + +void PerformanceMonitor::setBufferSize(size_t size) { + std::lock_guard lock(event_mutex_); + buffer_size_ = size; + event_buffer_.reserve(size); +} + +void PerformanceMonitor::setAutoDumpThreshold(size_t threshold) { + auto_dump_threshold_ = threshold; +} + +void PerformanceMonitor::setDumpPath(const std::string& path) { + dump_path_ = path; + std::filesystem::create_directories(dump_path_); +} + +void PerformanceMonitor::enable(bool enabled) { + enabled_ = enabled; +} + +void PerformanceMonitor::clear() { + std::lock_guard lock(event_mutex_); + event_buffer_.clear(); +} + +std::string PerformanceMonitor::eventsToJson() const { + nlohmann::json root = nlohmann::json::array(); + + for (const auto& event : event_buffer_) { + nlohmann::json event_json; + + event_json["timestamp"] = event.timestamp; + + switch (event.type) { + case PerformanceEventType::CALLBACK_READY: + event_json["type"] = "callback_ready"; + break; + case PerformanceEventType::CALLBACK_START: + event_json["type"] = "callback_start"; + break; + case PerformanceEventType::CALLBACK_END: + event_json["type"] = "callback_end"; + break; + case PerformanceEventType::DEADLINE_MISSED: + event_json["type"] = "deadline_missed"; + break; + case PerformanceEventType::DEADLINE_MET: + event_json["type"] = "deadline_met"; + break; + case PerformanceEventType::CHAIN_START: + event_json["type"] = "chain_start"; + break; + case PerformanceEventType::CHAIN_END: + event_json["type"] = "chain_end"; + break; + } + + event_json["node_name"] = event.node_name; + event_json["callback_name"] = event.callback_name; + event_json["chain_id"] = event.chain_id; + event_json["is_first_in_chain"] = event.is_first_in_chain; + event_json["is_last_in_chain"] = event.is_last_in_chain; + + if (event.deadline > 0) { + event_json["deadline"] = event.deadline; + } + + if (event.processing_time > 0) { + event_json["processing_time"] = event.processing_time; + } + + event_json["executor_id"] = event.executor_id; + + if (!event.additional_data.empty()) { + try { + event_json["additional_data"] = nlohmann::json::parse(event.additional_data); + } catch (...) { + event_json["additional_data"] = event.additional_data; + } + } + + root.push_back(event_json); + } + + return root.dump(2); // Indent with 2 spaces +} + +bool PerformanceMonitor::dumpToFile(const std::string& filename) { + std::lock_guard lock(event_mutex_); + + if (event_buffer_.empty()) { + return true; // Nothing to dump + } + + std::string full_path = dump_path_ + "/" + filename; + std::ofstream file(full_path); + if (!file.is_open()) { + return false; + } + + file << eventsToJson(); + event_buffer_.clear(); + return true; +} + +void PerformanceMonitor::checkAndAutoDump() { + if (event_buffer_.size() >= auto_dump_threshold_) { + auto now = std::chrono::system_clock::now(); + auto timestamp = std::chrono::duration_cast( + now.time_since_epoch()).count(); + + std::stringstream ss; + ss << "performance_log_" << timestamp << ".json"; + dumpToFile(ss.str()); + } +} + +} // namespace priority_executor \ No newline at end of file diff --git a/src/priority_executor/src/priority_executor.cpp b/src/priority_executor/src/priority_executor.cpp index e5f1fa0..f9eee3e 100755 --- a/src/priority_executor/src/priority_executor.cpp +++ b/src/priority_executor/src/priority_executor.cpp @@ -24,7 +24,7 @@ namespace priority_executor { -TimedExecutor::TimedExecutor(rclcpp::ExecutorOptions const& options, std::string name) +TimedExecutor::TimedExecutor(rclcpp::ExecutorOptions const &options, std::string name) : rclcpp::Executor(options) { this->name = std::move(name); logger_ = create_logger(); @@ -38,11 +38,19 @@ void TimedExecutor::spin() { } RCLCPP_SCOPE_EXIT(this->spinning.store(false);); - while (rclcpp::ok(this->context_) && spinning.load()) { + using namespace std::chrono_literals; + std::chrono::time_point start = std::chrono::steady_clock::now(); + + while (rclcpp::ok(this->context_) && spinning.load() && (std::chrono::steady_clock::now() - start) <= 5s) { rclcpp::AnyExecutable any_executable; + // std::cout<number_of_ready_timers()<name << std::endl; + // size_t ready = memory_strategy_->number_of_ready_subscriptions(); + // std::cout << "ready:" << ready << std::endl; if (get_next_executable(any_executable, std::chrono::nanoseconds(-1))) { execute_any_executable(any_executable); + // make sure memory_strategy_ is an instance of PriorityMemoryStrategy if (prio_memory_strategy_ != nullptr) { prio_memory_strategy_->post_execute(any_executable); } @@ -55,6 +63,11 @@ bool TimedExecutor::get_next_executable( rclcpp::AnyExecutable& any_executable, std::chrono::nanoseconds timeout) { bool success = false; + // Check to see if there are any subscriptions or timers needing service + // TODO(wjwwood): improve run to run efficiency of this function + // sched_yield(); + // sleep for 10us + // usleep(20); auto const start = std::chrono::steady_clock::now(); wait_for_work(timeout); diff --git a/src/priority_executor/src/priority_memory_strategy.cpp b/src/priority_executor/src/priority_memory_strategy.cpp index 0fbdf5a..68f6295 100755 --- a/src/priority_executor/src/priority_memory_strategy.cpp +++ b/src/priority_executor/src/priority_memory_strategy.cpp @@ -1,9 +1,11 @@ #include "priority_executor/priority_memory_strategy.hpp" #include "simple_timer/rt-sched.hpp" + #include -#include -#include #include +#include + +#include using namespace priority_executor; @@ -33,11 +35,10 @@ std::ostream& priority_executor::operator<<(std::ostream& os, const ExecutableSc return os; } -std::ostream &operator<<(std::ostream &os, const PriorityExecutable &pe) +std::ostream& priority_executor::operator<<(std::ostream &os, const PriorityExecutable &pe) { os << "sched_type: " << pe.sched_type << ", "; - if (pe.sched_type == ExecutableScheduleType::DEADLINE) - { + if (pe.sched_type == ExecutableScheduleType::DEADLINE) { os << "period: " << pe.period << ", "; } os << "priority: " << pe.priority << ", "; @@ -68,13 +69,16 @@ std::shared_ptr PriorityMemoryStrategy<>::get_and_reset_priority(std::shared_ptr executable, ExecutableType t) { + // PriorityExecutable *p = get_priority_settings(executable); std::shared_ptr p = get_priority_settings(executable); if (p == nullptr) { + // priority_map[executable] = PriorityExecutable(executable, 0, t); priority_map[executable] = std::make_shared(executable, 0, t); p = priority_map[executable]; } + // p->can_be_run = true; return p; } @@ -287,7 +291,7 @@ void PriorityExecutable::increment_counter() { this->counter += 1; } -bool PriorityExecutable::operator==(PriorityExecutable const& other) const { +bool PriorityExecutable::operator==(PriorityExecutable const &other) const { std::cout << "PriorityExecutable::operator== called" << std::endl; return this->handle == other.handle; } @@ -296,7 +300,11 @@ bool PriorityExecutableComparator::operator()( PriorityExecutable const *p1, PriorityExecutable const *p2) const { + // since this will be used in a std::set, also check for equality if (p1 == nullptr || p2 == nullptr) { + // TODO: realistic value + std::cout << "PriorityExecutableComparator::operator() called with nullptr" + << std::endl; return false; } @@ -311,18 +319,21 @@ bool PriorityExecutableComparator::operator()( if (p1->sched_type == ExecutableScheduleType::CHAIN_INDEPENDENT_PRIORITY) { return p1->priority == p2->priority ? - p1->executable_id < p2->executable_id : + p1->executable_id < p2->executable_id : + // tie-breaker: lower value runs first p1->priority < p2->priority; } if (p1->sched_type == ExecutableScheduleType::CHAIN_AWARE_PRIORITY) { return p1->priority == p2->priority ? - p1->executable_id < p2->executable_id : + p1->executable_id < p2->executable_id : // tie-breaker: lower value runs first p1->priority < p2->priority; } if (p1->sched_type == ExecutableScheduleType::DEADLINE) { + // TODO: use the counter logic here as well + uint64_t p1_deadline = 0; uint64_t p2_deadline = 0; @@ -361,8 +372,8 @@ bool PriorityExecutableComparator::operator()( template <> void PriorityMemoryStrategy<>::add_guard_condition( - rcl_guard_condition_t const* guard_condition) { - for (auto const& existing_guard_condition : guard_conditions_) { + rcl_guard_condition_t const *guard_condition) { + for (auto const &existing_guard_condition : guard_conditions_) { if (existing_guard_condition == guard_condition) { return; } @@ -372,7 +383,7 @@ void PriorityMemoryStrategy<>::add_guard_condition( template <> void PriorityMemoryStrategy<>::remove_guard_condition( - rcl_guard_condition_t const* guard_condition) { + rcl_guard_condition_t const *guard_condition) { for (auto it = guard_conditions_.begin(); it != guard_conditions_.end(); ++it) { if (*it == guard_condition) { guard_conditions_.erase(it); diff --git a/src/priority_executor/src/usage_example.cpp b/src/priority_executor/src/usage_example.cpp index 9ae1f31..b9c4b4c 100755 --- a/src/priority_executor/src/usage_example.cpp +++ b/src/priority_executor/src/usage_example.cpp @@ -1,43 +1,107 @@ -// Copyright 2015 Open Source Robotics Foundation, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#include -#include #include -#include +#include +#include -#include "priority_executor/default_executor.hpp" -#include "priority_executor/multithread_priority_executor.hpp" -#include "rclcpp/rclcpp.hpp" -#include "std_msgs/msg/string.hpp" +#include +#include -using namespace std::chrono_literals; +#include "priority_executor/priority_executor.hpp" +#include "priority_executor/priority_memory_strategy.hpp" -void chatter_callback(std_msgs::msg::String::SharedPtr const msg) { - RCLCPP_INFO(rclcpp::get_logger("rclcpp"), "I heard: '%s'", msg->data.c_str()); -} +// re-create the classic talker-listener example with two listeners +class Talker : public rclcpp::Node +{ +public: + Talker() : Node("talker") + { + // Create a publisher on the "chatter" topic with 10 msg queue size. + pub_ = this->create_publisher("chatter", 10); + // Create a timer of period 1s that calls our callback member function. + timer_ = this->create_wall_timer(std::chrono::seconds(1), + std::bind(&Talker::timer_callback, this)); + } + // the timer must be public + rclcpp::TimerBase::SharedPtr timer_; + +private: + void timer_callback() + { + std_msgs::msg::String msg; + msg.data = "Hello World!"; + RCLCPP_INFO(this->get_logger(), "Publishing: '%s'", msg.data.c_str()); + pub_->publish(msg); + } + rclcpp::Publisher::SharedPtr pub_; +}; + +class Listener : public rclcpp::Node +{ +public: + Listener(std::string name) : Node(name) + { + // Create a subscription on the "chatter" topic with the default callback + // method. + sub_ = this->create_subscription( + "chatter", 10, + std::bind(&Listener::callback, this, std::placeholders::_1)); + } + // the publisher must be public + rclcpp::Subscription::SharedPtr sub_; + +private: + void callback(const std_msgs::msg::String::SharedPtr msg) + { + RCLCPP_INFO(this->get_logger(), "I heard: '%s'", msg->data.c_str()); + } +}; int main(int argc, char* argv[]) { rclcpp::init(argc, argv); - auto node = rclcpp::Node::make_shared("minimal_subscriber"); + auto talker = std::make_shared(); + auto listener1 = std::make_shared("listener1"); + auto listener2 = std::make_shared("listener2"); + rclcpp::ExecutorOptions options; - auto subscription = node->create_subscription( - "topic", 10, chatter_callback); + auto strategy = std::make_shared>(); + options.memory_strategy = strategy; + auto executor = new priority_executor::TimedExecutor(options); - auto executor = std::make_shared(); - executor->add_node(node); + // must be set to post_execute can set new deadlines + executor->prio_memory_strategy_ = strategy; + + + // the new funcitons in PriorityMemoryStrategy accept the handle of the + // timer/subscription as the first argument + strategy->set_executable_deadline(talker->timer_->get_timer_handle(), 1000, + priority_executor::ExecutableType::TIMER, 0); + // you _must_ set the timer_handle for each chain + strategy->get_priority_settings(talker->timer_->get_timer_handle()) + ->timer_handle = talker->timer_; + // you _must_ mark the first executable in the chain + strategy->set_first_in_chain(talker->timer_->get_timer_handle()); + // set the same period and chain_id for each callback in the chain + strategy->set_executable_deadline(listener1->sub_->get_subscription_handle(), + 1000, priority_executor::ExecutableType::SUBSCRIPTION, 0); + strategy->set_executable_deadline(listener2->sub_->get_subscription_handle(), + 1000, priority_executor::ExecutableType::SUBSCRIPTION, 0); + // you _must_ mark the last executable in the chain (used to keep track of different instances of the same chain) + strategy->set_last_in_chain(listener2->sub_->get_subscription_handle()); + // add all the nodes to the executor + executor->add_node(talker); + executor->add_node(listener1); + executor->add_node(listener2); + + // if the executor behaves unexpectedly, you can print the priority settings to make sure they are correct + std::cout << *strategy->get_priority_settings( + talker->timer_->get_timer_handle()) + << std::endl; + std::cout << *strategy->get_priority_settings( + listener1->sub_->get_subscription_handle()) + << std::endl; + std::cout << *strategy->get_priority_settings( + listener2->sub_->get_subscription_handle()) + << std::endl; executor->spin(); rclcpp::shutdown(); diff --git a/src/simple_timer/package.xml b/src/simple_timer/package.xml index 3022371..f0e1c00 100644 --- a/src/simple_timer/package.xml +++ b/src/simple_timer/package.xml @@ -6,7 +6,7 @@ ROS 2 package providing timer functionality for the dynamic executor experiments - ROS-Dynamic-Executor Team + kurt Apache License 2.0 ament_cmake diff --git a/src/simple_timer/src/rt-sched.cpp b/src/simple_timer/src/rt-sched.cpp index f718941..a6955c8 100644 --- a/src/simple_timer/src/rt-sched.cpp +++ b/src/simple_timer/src/rt-sched.cpp @@ -33,6 +33,7 @@ int sched_getattr( void log_entry(node_time_logger logger, std::string const& text) { if (logger.recorded_times != nullptr) { logger.recorded_times->emplace_back(text, get_time_us() / 1000); + // std::cout<