diff --git a/src/priority_executor/CMakeLists.txt b/src/priority_executor/CMakeLists.txt index 4ce2108..e6ec3da 100755 --- a/src/priority_executor/CMakeLists.txt +++ b/src/priority_executor/CMakeLists.txt @@ -19,11 +19,16 @@ find_package(rmw REQUIRED) find_package(std_msgs REQUIRED) find_package(std_srvs REQUIRED) find_package(simple_timer REQUIRED) +find_package(nlohmann_json REQUIRED) # Library targets add_library(priority_executor src/priority_executor.cpp src/priority_memory_strategy.cpp + src/performance_monitor.cpp + src/default_executor.cpp + src/usage_example.cpp + src/multithread_priority_executor.cpp ) target_include_directories(priority_executor PUBLIC $ @@ -34,35 +39,7 @@ ament_target_dependencies(priority_executor rclcpp rcl simple_timer -) - -add_library(multithread_priority_executor - src/multithread_priority_executor.cpp - src/priority_memory_strategy.cpp -) -target_include_directories(multithread_priority_executor PUBLIC - $ - $ -) -ament_target_dependencies(multithread_priority_executor - rmw - rclcpp - rcl - simple_timer -) - -add_library(default_executor - src/default_executor.cpp -) -target_include_directories(default_executor PUBLIC - $ - $ -) -ament_target_dependencies(default_executor - rmw - rclcpp - rcl - simple_timer + nlohmann_json ) # Example executable @@ -75,8 +52,6 @@ target_include_directories(usage_example PUBLIC ) target_link_libraries(usage_example priority_executor - multithread_priority_executor - default_executor ) ament_target_dependencies(usage_example rclcpp @@ -98,7 +73,7 @@ install( ) install( - TARGETS priority_executor multithread_priority_executor default_executor + TARGETS priority_executor EXPORT export_${PROJECT_NAME} ARCHIVE DESTINATION lib LIBRARY DESTINATION lib @@ -114,5 +89,5 @@ install( # Export and package configuration ament_export_include_directories(include) ament_export_targets(export_${PROJECT_NAME} HAS_LIBRARY_TARGET) -ament_export_dependencies(rclcpp rcl rmw simple_timer) +ament_export_dependencies(rclcpp rcl rmw simple_timer nlohmann_json) ament_package() diff --git a/src/priority_executor/include/priority_executor/multithread_priority_executor.hpp b/src/priority_executor/include/priority_executor/multithread_priority_executor.hpp index a895af6..a722b53 100755 --- a/src/priority_executor/include/priority_executor/multithread_priority_executor.hpp +++ b/src/priority_executor/include/priority_executor/multithread_priority_executor.hpp @@ -37,13 +37,13 @@ public: RCLCPP_PUBLIC void spin() override; protected: - RCLCPP_PUBLIC void run(size_t _thread_number); + RCLCPP_PUBLIC void run(size_t thread_number); private: + RCLCPP_DISABLE_COPY(MultithreadTimedExecutor) std::set scheduled_timers_; size_t number_of_threads_; - std::chrono::nanoseconds next_exec_timeout_ = std::chrono::nanoseconds(-1); - node_time_logger logger_; + std::chrono::nanoseconds next_exec_timeout_{std::chrono::nanoseconds(-1)}; static std::unordered_map> diff --git a/src/priority_executor/include/priority_executor/performance_monitor.hpp b/src/priority_executor/include/priority_executor/performance_monitor.hpp new file mode 100644 index 0000000..beece1f --- /dev/null +++ b/src/priority_executor/include/priority_executor/performance_monitor.hpp @@ -0,0 +1,237 @@ +#ifndef PERFORMANCE_MONITOR_HPP_ +#define PERFORMANCE_MONITOR_HPP_ + +#include +#include +#include +#include +#include +#include +#include +#include +#include "rclcpp/rclcpp.hpp" +#include "simple_timer/rt-sched.hpp" + +namespace priority_executor { + +enum class PerformanceEventType { + CALLBACK_READY, + CALLBACK_START, + CALLBACK_END, + DEADLINE_MISSED, + DEADLINE_MET, + CHAIN_START, + CHAIN_END +}; + +struct PerformanceEvent { + uint64_t timestamp; // High-resolution timestamp (microseconds) + PerformanceEventType type; // Type of event + std::string node_name; // Name of the node + std::string callback_name; // Name of the callback + int chain_id{-1}; // Chain identifier + bool is_first_in_chain{false}; // First callback in chain + bool is_last_in_chain{false}; // Last callback in chain + uint64_t deadline{0}; // Deadline (if applicable) + uint64_t processing_time{0}; // Time spent executing (if applicable) + int executor_id{0}; // Executor identifier + std::string additional_data; // JSON string for extensible data + + // Helper method to extract callback type from additional_data + std::string get_callback_type() const { + if (additional_data.empty()) return "unknown"; + + try { + nlohmann::json json = nlohmann::json::parse(additional_data); + if (json.contains("callback_type")) { + return json["callback_type"].get(); + } + } catch (...) {} + + return "unknown"; + } + + // Helper method to extract thread info from additional_data + int get_thread_id() const { + if (additional_data.empty()) return -1; + + try { + nlohmann::json json = nlohmann::json::parse(additional_data); + if (json.contains("thread_info") && json["thread_info"].is_object() && + json["thread_info"].contains("thread_id")) { + return json["thread_info"]["thread_id"].get(); + } + // Direct thread_id if present + if (json.contains("thread_id")) { + return json["thread_id"].get(); + } + } catch (...) {} + + return -1; + } + + // Helper method to create a descriptive identifier for this callback + std::string get_descriptive_id() const { + std::stringstream ss; + ss << node_name; + + if (!callback_name.empty()) { + ss << "/" << callback_name; + } + + if (chain_id >= 0) { + ss << " (chain:" << chain_id; + if (is_first_in_chain) ss << " start"; + if (is_last_in_chain) ss << " end"; + ss << ")"; + } + + return ss.str(); + } + + // Convert additional_data to JSON object + nlohmann::json get_additional_data_json() const { + try { + if (!additional_data.empty()) { + return nlohmann::json::parse(additional_data); + } + } catch (...) {} + + return nlohmann::json::object(); + } +}; + +class PerformanceMonitor { +public: + static PerformanceMonitor& getInstance(); + + void recordEvent(const PerformanceEvent& event); + void setBufferSize(size_t size); + void setAutoDumpThreshold(size_t threshold); + void setDumpPath(const std::string& path); + bool dumpToFile(const std::string& filename); + void enable(bool enabled = true); + void clear(); + + // Helper method to set monitoring options in one call + void setMonitoringOptions(size_t buffer_size, size_t auto_dump_threshold, + const std::string& dump_path) { + setBufferSize(buffer_size); + setAutoDumpThreshold(auto_dump_threshold); + setDumpPath(dump_path); + } + + // Get all events for a specific callback + std::vector getEventsForCallback(const std::string& node_name, + const std::string& callback_name) const { + std::vector result; + std::lock_guard lock(event_mutex_); + + for (const auto& event : event_buffer_) { + if (event.node_name == node_name && event.callback_name == callback_name) { + result.push_back(event); + } + } + + return result; + } + + // Get all events for a specific chain + std::vector getEventsForChain(int chain_id) const { + std::vector result; + std::lock_guard lock(event_mutex_); + + for (const auto& event : event_buffer_) { + if (event.chain_id == chain_id) { + result.push_back(event); + } + } + + return result; + } + + // Get all events by type + std::vector getEventsByType(PerformanceEventType type) const { + std::vector result; + std::lock_guard lock(event_mutex_); + + for (const auto& event : event_buffer_) { + if (event.type == type) { + result.push_back(event); + } + } + + return result; + } + + // Get a formatted report of callbacks and their execution times + std::string getCallbackExecutionReport() const { + std::lock_guard lock(event_mutex_); + std::stringstream ss; + + ss << "Callback Execution Report:\n"; + ss << "=========================\n\n"; + + // Maps to store total execution time and count per callback + std::unordered_map total_time; + std::unordered_map call_count; + std::unordered_map max_time; + std::unordered_map min_time; + + // Process all CALLBACK_END events (they contain processing_time) + for (const auto& event : event_buffer_) { + if (event.type == PerformanceEventType::CALLBACK_END && event.processing_time > 0) { + std::string callback_id = event.get_descriptive_id(); + total_time[callback_id] += event.processing_time; + call_count[callback_id]++; + max_time[callback_id] = std::max(max_time[callback_id], event.processing_time); + if (min_time[callback_id] == 0 || event.processing_time < min_time[callback_id]) { + min_time[callback_id] = event.processing_time; + } + } + } + + // Print results sorted by callback id + std::vector callback_ids; + for (const auto& entry : call_count) { + callback_ids.push_back(entry.first); + } + + std::sort(callback_ids.begin(), callback_ids.end()); + + for (const auto& id : callback_ids) { + double avg_time = static_cast(total_time[id]) / call_count[id]; + ss << id << "\n"; + ss << " Calls: " << call_count[id] << "\n"; + ss << " Avg time: " << avg_time << " µs\n"; + ss << " Min time: " << min_time[id] << " µs\n"; + ss << " Max time: " << max_time[id] << " µs\n"; + ss << " Total time: " << total_time[id] << " µs\n\n"; + } + + return ss.str(); + } + +private: + PerformanceMonitor(); + ~PerformanceMonitor() = default; + + PerformanceMonitor(const PerformanceMonitor&) = delete; + PerformanceMonitor& operator=(const PerformanceMonitor&) = delete; + PerformanceMonitor(PerformanceMonitor&&) = delete; + PerformanceMonitor& operator=(PerformanceMonitor&&) = delete; + + mutable std::mutex event_mutex_; + std::vector event_buffer_; + size_t buffer_size_{10000}; + size_t auto_dump_threshold_{0}; + std::string dump_path_{"performance_logs"}; + std::atomic enabled_{true}; + + std::string eventsToJson() const; + void checkAndAutoDump(); +}; + +} // namespace priority_executor + +#endif // PERFORMANCE_MONITOR_HPP_ \ No newline at end of file diff --git a/src/priority_executor/include/priority_executor/priority_executor.hpp b/src/priority_executor/include/priority_executor/priority_executor.hpp index 5923496..62d7248 100755 --- a/src/priority_executor/include/priority_executor/priority_executor.hpp +++ b/src/priority_executor/include/priority_executor/priority_executor.hpp @@ -20,6 +20,7 @@ #include #include #include +#include #include "rclcpp/executor.hpp" #include "rclcpp/macros.hpp" @@ -27,6 +28,7 @@ #include "priority_executor/priority_memory_strategy.hpp" #include "priority_executor/default_executor.hpp" +#include "priority_executor/performance_monitor.hpp" namespace priority_executor { @@ -63,6 +65,43 @@ public: void set_use_priorities(bool use_prio); std::shared_ptr> prio_memory_strategy_{nullptr}; + // Performance monitoring control + void enableMonitoring(bool enable = true) { + PerformanceMonitor::getInstance().enable(enable); + } + + void setMonitoringOptions( + size_t buffer_size, + size_t auto_dump_threshold, + const std::string& dump_path) { + auto& monitor = PerformanceMonitor::getInstance(); + monitor.setBufferSize(buffer_size); + monitor.setAutoDumpThreshold(auto_dump_threshold); + monitor.setDumpPath(dump_path); + } + + // Get a formatted report of all callback execution times + std::string getCallbackExecutionReport() const { + auto& monitor = PerformanceMonitor::getInstance(); + return monitor.getCallbackExecutionReport(); + } + + // Manually trigger a log dump with a specific filename + bool dumpMonitoringData(const std::string& filename) { + auto& monitor = PerformanceMonitor::getInstance(); + return monitor.dumpToFile(filename); + } + + // Get executor ID for identification in logs + int getExecutorId() const { + return executor_id_; + } + + // Get executor name for identification in logs + const std::string& getExecutorName() const { + return name; + } + protected: bool get_next_executable(rclcpp::AnyExecutable& any_executable, std::chrono::nanoseconds timeout = std::chrono::nanoseconds(-1)); @@ -72,8 +111,14 @@ private: void wait_for_work(std::chrono::nanoseconds timeout); bool get_next_ready_executable(rclcpp::AnyExecutable& any_executable); + void recordExecutionEvent(const rclcpp::AnyExecutable& any_exec, + PerformanceEventType event_type, + uint64_t timestamp, + uint64_t processing_time = 0); bool use_priorities{true}; + static std::atomic next_executor_id_; + int executor_id_; }; } // namespace priority_executor diff --git a/src/priority_executor/src/multithread_priority_executor.cpp b/src/priority_executor/src/multithread_priority_executor.cpp index 50f65f5..7619413 100755 --- a/src/priority_executor/src/multithread_priority_executor.cpp +++ b/src/priority_executor/src/multithread_priority_executor.cpp @@ -1,4 +1,5 @@ #include "priority_executor/multithread_priority_executor.hpp" +#include "priority_executor/performance_monitor.hpp" #include #include @@ -7,6 +8,7 @@ #include #include #include +#include #include "rcpputils/scope_exit.hpp" #include "rclcpp/any_executable.hpp" @@ -58,6 +60,7 @@ void MultithreadTimedExecutor::spin() { for (auto& thread : threads) { thread.join(); } + spinning.store(false); } void MultithreadTimedExecutor::run(size_t _thread_number) { @@ -68,9 +71,16 @@ void MultithreadTimedExecutor::run(size_t _thread_number) { int rc = pthread_setaffinity_np(pthread_self(), sizeof(cpu_set_t), &cpuset); if (rc != 0) { - std::cout << "Error calling pthread_setaffinity_np: " << rc << "\n"; + RCLCPP_ERROR(rclcpp::get_logger("priority_executor"), + "Error calling pthread_setaffinity_np: %d", rc); } + // Create thread context information for logs + nlohmann::json thread_info; + thread_info["thread_id"] = _thread_number; + thread_info["cpu_affinity"] = _thread_number; + std::string thread_info_str = thread_info.dump(); + while (rclcpp::ok(this->context_) && spinning.load()) { rclcpp::AnyExecutable any_exec; { @@ -98,8 +108,160 @@ void MultithreadTimedExecutor::run(size_t _thread_number) { } } + // Record execution start with thread info + auto start = std::chrono::steady_clock::now(); + uint64_t start_time = std::chrono::duration_cast( + start.time_since_epoch()).count(); + + PerformanceEvent start_event; + start_event.timestamp = start_time; + start_event.type = PerformanceEventType::CALLBACK_START; + start_event.executor_id = getExecutorId(); + + if (any_exec.node_base) { + start_event.node_name = any_exec.node_base->get_name(); + + // Enhanced callback identification based on executable type + if (any_exec.timer) { + // For timer callbacks + std::stringstream ss; + + // retrieve the timer period + int64_t period; + rcl_timer_get_period(any_exec.timer->get_timer_handle().get(), &period); + + // convert period (which is in nanoseconds) to milliseconds + std::chrono::nanoseconds period_ns(period); + std::chrono::milliseconds period_ms = std::chrono::duration_cast(period_ns); + + ss << "timer_" << period_ms.count() << "ms"; + start_event.callback_name = ss.str(); + + // Add memory address as unique identifier + std::stringstream addr; + addr << "@" << any_exec.timer.get(); + nlohmann::json additional; + additional["callback_type"] = "timer"; + additional["timer_addr"] = addr.str(); + additional["thread_info"] = thread_info; + start_event.additional_data = additional.dump(); + } + else if (any_exec.subscription) { + // For subscription callbacks + start_event.callback_name = any_exec.subscription->get_topic_name(); + + nlohmann::json additional; + additional["callback_type"] = "subscription"; + additional["thread_info"] = thread_info; + start_event.additional_data = additional.dump(); + } + else if (any_exec.service) { + start_event.callback_name = any_exec.service->get_service_name(); + + nlohmann::json additional; + additional["callback_type"] = "service"; + additional["thread_info"] = thread_info; + start_event.additional_data = additional.dump(); + } + else if (any_exec.client) { + start_event.callback_name = "client_callback"; + + nlohmann::json additional; + additional["callback_type"] = "client"; + additional["thread_info"] = thread_info; + start_event.additional_data = additional.dump(); + } + else if (any_exec.waitable) { + start_event.callback_name = "waitable_callback"; + + nlohmann::json additional; + additional["callback_type"] = "waitable"; + additional["thread_info"] = thread_info; + start_event.additional_data = additional.dump(); + } + + // Add chain information + std::shared_ptr executable = nullptr; + if (prio_memory_strategy_) { + if (any_exec.timer) { + executable = prio_memory_strategy_->get_priority_settings( + any_exec.timer->get_timer_handle()); + } else if (any_exec.subscription) { + executable = prio_memory_strategy_->get_priority_settings( + any_exec.subscription->get_subscription_handle()); + } + } + + if (executable) { + start_event.chain_id = executable->chain_id; + start_event.is_first_in_chain = executable->is_first_in_chain; + start_event.is_last_in_chain = executable->is_last_in_chain; + + if (executable->deadlines && !executable->deadlines->empty()) { + start_event.deadline = executable->deadlines->front(); + } + + // Include the priority information + try { + nlohmann::json additional = start_event.additional_data.empty() ? + nlohmann::json() : nlohmann::json::parse(start_event.additional_data); + + additional["priority"] = executable->priority; + + if (executable->chain_id >= 0) { + additional["chain_id"] = executable->chain_id; + } + + if (executable->sched_type != ExecutableScheduleType::DEFAULT) { + additional["schedule_type"] = static_cast(executable->sched_type); + } + + start_event.additional_data = additional.dump(); + } catch (...) { + // If there was an error parsing the JSON, create new JSON object + nlohmann::json additional; + additional["priority"] = executable->priority; + additional["thread_info"] = thread_info; + start_event.additional_data = additional.dump(); + } + } + } else { + start_event.additional_data = thread_info_str; + } + + PerformanceMonitor::getInstance().recordEvent(start_event); + + // Execute the callback execute_any_executable(any_exec); + // Record execution completion + auto end = std::chrono::steady_clock::now(); + uint64_t end_time = std::chrono::duration_cast( + end.time_since_epoch()).count(); + uint64_t processing_time = end_time - start_time; + + // Create identical event structure but for end event + PerformanceEvent end_event = start_event; + end_event.timestamp = end_time; + end_event.type = PerformanceEventType::CALLBACK_END; + end_event.processing_time = processing_time; + + // Update the additional data to include processing time + try { + nlohmann::json additional = end_event.additional_data.empty() ? + nlohmann::json() : nlohmann::json::parse(end_event.additional_data); + additional["processing_time_us"] = processing_time; + end_event.additional_data = additional.dump(); + } catch (...) { + // If JSON parsing failed, create a new object + nlohmann::json additional; + additional["processing_time_us"] = processing_time; + additional["thread_info"] = thread_info; + end_event.additional_data = additional.dump(); + } + + PerformanceMonitor::getInstance().recordEvent(end_event); + if (any_exec.timer) { auto wait_mutex = MultithreadTimedExecutor::wait_mutex_set_[this]; auto high_priority_wait_mutex = wait_mutex->get_high_priority_lockable(); @@ -118,7 +280,6 @@ void MultithreadTimedExecutor::run(size_t _thread_number) { auto wait_mutex = MultithreadTimedExecutor::wait_mutex_set_[this]; auto low_priority_wait_mutex = wait_mutex->get_low_priority_lockable(); std::lock_guard wait_lock(low_priority_wait_mutex); - // std::shared_ptr> prio_memory_strategy_ = std::dynamic_pointer_cast(memory_strategy_); prio_memory_strategy_->post_execute(any_exec, _thread_number); } } diff --git a/src/priority_executor/src/performance_monitor.cpp b/src/priority_executor/src/performance_monitor.cpp new file mode 100644 index 0000000..1c5f569 --- /dev/null +++ b/src/priority_executor/src/performance_monitor.cpp @@ -0,0 +1,149 @@ +#include "priority_executor/performance_monitor.hpp" +#include +#include +#include +#include +#include + +namespace priority_executor { + +PerformanceMonitor& PerformanceMonitor::getInstance() { + static PerformanceMonitor instance; + return instance; +} + +PerformanceMonitor::PerformanceMonitor() { + event_buffer_.reserve(buffer_size_); + std::filesystem::create_directories(dump_path_); +} + +void PerformanceMonitor::recordEvent(const PerformanceEvent& event) { + if (!enabled_) return; + + { + std::lock_guard lock(event_mutex_); + event_buffer_.push_back(event); + + if (auto_dump_threshold_ > 0 && event_buffer_.size() >= auto_dump_threshold_) { + checkAndAutoDump(); + } + } +} + +void PerformanceMonitor::setBufferSize(size_t size) { + std::lock_guard lock(event_mutex_); + buffer_size_ = size; + event_buffer_.reserve(size); +} + +void PerformanceMonitor::setAutoDumpThreshold(size_t threshold) { + auto_dump_threshold_ = threshold; +} + +void PerformanceMonitor::setDumpPath(const std::string& path) { + dump_path_ = path; + std::filesystem::create_directories(dump_path_); +} + +void PerformanceMonitor::enable(bool enabled) { + enabled_ = enabled; +} + +void PerformanceMonitor::clear() { + std::lock_guard lock(event_mutex_); + event_buffer_.clear(); +} + +std::string PerformanceMonitor::eventsToJson() const { + nlohmann::json root = nlohmann::json::array(); + + for (const auto& event : event_buffer_) { + nlohmann::json event_json; + + event_json["timestamp"] = event.timestamp; + + switch (event.type) { + case PerformanceEventType::CALLBACK_READY: + event_json["type"] = "callback_ready"; + break; + case PerformanceEventType::CALLBACK_START: + event_json["type"] = "callback_start"; + break; + case PerformanceEventType::CALLBACK_END: + event_json["type"] = "callback_end"; + break; + case PerformanceEventType::DEADLINE_MISSED: + event_json["type"] = "deadline_missed"; + break; + case PerformanceEventType::DEADLINE_MET: + event_json["type"] = "deadline_met"; + break; + case PerformanceEventType::CHAIN_START: + event_json["type"] = "chain_start"; + break; + case PerformanceEventType::CHAIN_END: + event_json["type"] = "chain_end"; + break; + } + + event_json["node_name"] = event.node_name; + event_json["callback_name"] = event.callback_name; + event_json["chain_id"] = event.chain_id; + event_json["is_first_in_chain"] = event.is_first_in_chain; + event_json["is_last_in_chain"] = event.is_last_in_chain; + + if (event.deadline > 0) { + event_json["deadline"] = event.deadline; + } + + if (event.processing_time > 0) { + event_json["processing_time"] = event.processing_time; + } + + event_json["executor_id"] = event.executor_id; + + if (!event.additional_data.empty()) { + try { + event_json["additional_data"] = nlohmann::json::parse(event.additional_data); + } catch (...) { + event_json["additional_data"] = event.additional_data; + } + } + + root.push_back(event_json); + } + + return root.dump(2); // Indent with 2 spaces +} + +bool PerformanceMonitor::dumpToFile(const std::string& filename) { + std::lock_guard lock(event_mutex_); + + if (event_buffer_.empty()) { + return true; // Nothing to dump + } + + std::string full_path = dump_path_ + "/" + filename; + std::ofstream file(full_path); + if (!file.is_open()) { + return false; + } + + file << eventsToJson(); + event_buffer_.clear(); + return true; +} + +void PerformanceMonitor::checkAndAutoDump() { + if (event_buffer_.size() >= auto_dump_threshold_) { + auto now = std::chrono::system_clock::now(); + auto timestamp = std::chrono::duration_cast( + now.time_since_epoch()).count(); + + std::stringstream ss; + ss << "performance_log_" << timestamp << ".json"; + dumpToFile(ss.str()); + } +} + +} // namespace priority_executor \ No newline at end of file