windows Sun Apr 13 20:58:23 WEDT 2025
This commit is contained in:
parent f53b7047f1
commit d7ecdf0108
6 changed files with 605 additions and 38 deletions
@@ -19,11 +19,16 @@ find_package(rmw REQUIRED)
find_package(std_msgs REQUIRED)
find_package(std_srvs REQUIRED)
find_package(simple_timer REQUIRED)
find_package(nlohmann_json REQUIRED)

# Library targets
add_library(priority_executor
  src/priority_executor.cpp
  src/priority_memory_strategy.cpp
  src/performance_monitor.cpp
  src/default_executor.cpp
  src/usage_example.cpp
  src/multithread_priority_executor.cpp
)
target_include_directories(priority_executor PUBLIC
  $<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}/include>
@@ -34,35 +39,7 @@ ament_target_dependencies(priority_executor
  rclcpp
  rcl
  simple_timer
)

add_library(multithread_priority_executor
  src/multithread_priority_executor.cpp
  src/priority_memory_strategy.cpp
)
target_include_directories(multithread_priority_executor PUBLIC
  $<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}/include>
  $<INSTALL_INTERFACE:include>
)
ament_target_dependencies(multithread_priority_executor
  rmw
  rclcpp
  rcl
  simple_timer
)

add_library(default_executor
  src/default_executor.cpp
)
target_include_directories(default_executor PUBLIC
  $<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}/include>
  $<INSTALL_INTERFACE:include>
)
ament_target_dependencies(default_executor
  rmw
  rclcpp
  rcl
  simple_timer
  nlohmann_json
)

# Example executable
@@ -75,8 +52,6 @@ target_include_directories(usage_example PUBLIC
)
target_link_libraries(usage_example
  priority_executor
  multithread_priority_executor
  default_executor
)
ament_target_dependencies(usage_example
  rclcpp
@@ -98,7 +73,7 @@ install(
)

install(
  TARGETS priority_executor multithread_priority_executor default_executor
  TARGETS priority_executor
  EXPORT export_${PROJECT_NAME}
  ARCHIVE DESTINATION lib
  LIBRARY DESTINATION lib
@@ -114,5 +89,5 @@ install(
# Export and package configuration
ament_export_include_directories(include)
ament_export_targets(export_${PROJECT_NAME} HAS_LIBRARY_TARGET)
ament_export_dependencies(rclcpp rcl rmw simple_timer)
ament_export_dependencies(rclcpp rcl rmw simple_timer nlohmann_json)
ament_package()

@@ -37,13 +37,13 @@ public:
    RCLCPP_PUBLIC void spin() override;

protected:
    RCLCPP_PUBLIC void run(size_t _thread_number);
    RCLCPP_PUBLIC void run(size_t thread_number);

private:
    RCLCPP_DISABLE_COPY(MultithreadTimedExecutor)
    std::set<rclcpp::TimerBase::SharedPtr> scheduled_timers_;
    size_t number_of_threads_;
    std::chrono::nanoseconds next_exec_timeout_ = std::chrono::nanoseconds(-1);
    node_time_logger logger_;
    std::chrono::nanoseconds next_exec_timeout_{std::chrono::nanoseconds(-1)};

    static std::unordered_map<MultithreadTimedExecutor*,
        std::shared_ptr<rclcpp::detail::MutexTwoPriorities>>

@@ -0,0 +1,237 @@
#ifndef PERFORMANCE_MONITOR_HPP_
#define PERFORMANCE_MONITOR_HPP_

#include <atomic>
#include <chrono>
#include <deque>
#include <mutex>
#include <string>
#include <unordered_map>
#include <vector>
#include <nlohmann/json.hpp>
#include "rclcpp/rclcpp.hpp"
#include "simple_timer/rt-sched.hpp"

namespace priority_executor {

enum class PerformanceEventType {
    CALLBACK_READY,
    CALLBACK_START,
    CALLBACK_END,
    DEADLINE_MISSED,
    DEADLINE_MET,
    CHAIN_START,
    CHAIN_END
};

struct PerformanceEvent {
    uint64_t timestamp;             // High-resolution timestamp (microseconds)
    PerformanceEventType type;      // Type of event
    std::string node_name;          // Name of the node
    std::string callback_name;      // Name of the callback
    int chain_id{-1};               // Chain identifier
    bool is_first_in_chain{false};  // First callback in chain
    bool is_last_in_chain{false};   // Last callback in chain
    uint64_t deadline{0};           // Deadline (if applicable)
    uint64_t processing_time{0};    // Time spent executing (if applicable)
    int executor_id{0};             // Executor identifier
    std::string additional_data;    // JSON string for extensible data

    // Helper method to extract callback type from additional_data
    std::string get_callback_type() const {
        if (additional_data.empty()) return "unknown";

        try {
            nlohmann::json json = nlohmann::json::parse(additional_data);
            if (json.contains("callback_type")) {
                return json["callback_type"].get<std::string>();
            }
        } catch (...) {}

        return "unknown";
    }

    // Helper method to extract thread info from additional_data
    int get_thread_id() const {
        if (additional_data.empty()) return -1;

        try {
            nlohmann::json json = nlohmann::json::parse(additional_data);
            if (json.contains("thread_info") && json["thread_info"].is_object() &&
                json["thread_info"].contains("thread_id")) {
                return json["thread_info"]["thread_id"].get<int>();
            }
            // Direct thread_id if present
            if (json.contains("thread_id")) {
                return json["thread_id"].get<int>();
            }
        } catch (...) {}

        return -1;
    }

    // Helper method to create a descriptive identifier for this callback
    std::string get_descriptive_id() const {
        std::stringstream ss;
        ss << node_name;

        if (!callback_name.empty()) {
            ss << "/" << callback_name;
        }

        if (chain_id >= 0) {
            ss << " (chain:" << chain_id;
            if (is_first_in_chain) ss << " start";
            if (is_last_in_chain) ss << " end";
            ss << ")";
        }

        return ss.str();
    }

    // Convert additional_data to JSON object
    nlohmann::json get_additional_data_json() const {
        try {
            if (!additional_data.empty()) {
                return nlohmann::json::parse(additional_data);
            }
        } catch (...) {}

        return nlohmann::json::object();
    }
};

class PerformanceMonitor {
public:
    static PerformanceMonitor& getInstance();

    void recordEvent(const PerformanceEvent& event);
    void setBufferSize(size_t size);
    void setAutoDumpThreshold(size_t threshold);
    void setDumpPath(const std::string& path);
    bool dumpToFile(const std::string& filename);
    void enable(bool enabled = true);
    void clear();

    // Helper method to set monitoring options in one call
    void setMonitoringOptions(size_t buffer_size, size_t auto_dump_threshold,
                              const std::string& dump_path) {
        setBufferSize(buffer_size);
        setAutoDumpThreshold(auto_dump_threshold);
        setDumpPath(dump_path);
    }

    // Get all events for a specific callback
    std::vector<PerformanceEvent> getEventsForCallback(const std::string& node_name,
                                                       const std::string& callback_name) const {
        std::vector<PerformanceEvent> result;
        std::lock_guard<std::mutex> lock(event_mutex_);

        for (const auto& event : event_buffer_) {
            if (event.node_name == node_name && event.callback_name == callback_name) {
                result.push_back(event);
            }
        }

        return result;
    }

    // Get all events for a specific chain
    std::vector<PerformanceEvent> getEventsForChain(int chain_id) const {
        std::vector<PerformanceEvent> result;
        std::lock_guard<std::mutex> lock(event_mutex_);

        for (const auto& event : event_buffer_) {
            if (event.chain_id == chain_id) {
                result.push_back(event);
            }
        }

        return result;
    }

    // Get all events by type
    std::vector<PerformanceEvent> getEventsByType(PerformanceEventType type) const {
        std::vector<PerformanceEvent> result;
        std::lock_guard<std::mutex> lock(event_mutex_);

        for (const auto& event : event_buffer_) {
            if (event.type == type) {
                result.push_back(event);
            }
        }

        return result;
    }

    // Get a formatted report of callbacks and their execution times
    std::string getCallbackExecutionReport() const {
        std::lock_guard<std::mutex> lock(event_mutex_);
        std::stringstream ss;

        ss << "Callback Execution Report:\n";
        ss << "=========================\n\n";

        // Maps to store total execution time and count per callback
        std::unordered_map<std::string, uint64_t> total_time;
        std::unordered_map<std::string, int> call_count;
        std::unordered_map<std::string, uint64_t> max_time;
        std::unordered_map<std::string, uint64_t> min_time;

        // Process all CALLBACK_END events (they contain processing_time)
        for (const auto& event : event_buffer_) {
            if (event.type == PerformanceEventType::CALLBACK_END && event.processing_time > 0) {
                std::string callback_id = event.get_descriptive_id();
                total_time[callback_id] += event.processing_time;
                call_count[callback_id]++;
                max_time[callback_id] = std::max(max_time[callback_id], event.processing_time);
                if (min_time[callback_id] == 0 || event.processing_time < min_time[callback_id]) {
                    min_time[callback_id] = event.processing_time;
                }
            }
        }

        // Print results sorted by callback id
        std::vector<std::string> callback_ids;
        for (const auto& entry : call_count) {
            callback_ids.push_back(entry.first);
        }

        std::sort(callback_ids.begin(), callback_ids.end());

        for (const auto& id : callback_ids) {
            double avg_time = static_cast<double>(total_time[id]) / call_count[id];
            ss << id << "\n";
            ss << " Calls: " << call_count[id] << "\n";
            ss << " Avg time: " << avg_time << " µs\n";
            ss << " Min time: " << min_time[id] << " µs\n";
            ss << " Max time: " << max_time[id] << " µs\n";
            ss << " Total time: " << total_time[id] << " µs\n\n";
        }

        return ss.str();
    }

private:
    PerformanceMonitor();
    ~PerformanceMonitor() = default;

    PerformanceMonitor(const PerformanceMonitor&) = delete;
    PerformanceMonitor& operator=(const PerformanceMonitor&) = delete;
    PerformanceMonitor(PerformanceMonitor&&) = delete;
    PerformanceMonitor& operator=(PerformanceMonitor&&) = delete;

    mutable std::mutex event_mutex_;
    std::vector<PerformanceEvent> event_buffer_;
    size_t buffer_size_{10000};
    size_t auto_dump_threshold_{0};
    std::string dump_path_{"performance_logs"};
    std::atomic<bool> enabled_{true};

    std::string eventsToJson() const;
    void checkAndAutoDump();
};

} // namespace priority_executor

#endif // PERFORMANCE_MONITOR_HPP_

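For orientation, here is a minimal usage sketch of the monitor API declared in the new header above. It is an illustration, not code from this commit: the node and callback names, the file name, and the recorded timing value are placeholders, and the timestamp follows the microsecond convention used by the executor instrumentation later in the commit.

#include "priority_executor/performance_monitor.hpp"

#include <chrono>
#include <iostream>

int main() {
    using priority_executor::PerformanceMonitor;
    using priority_executor::PerformanceEvent;
    using priority_executor::PerformanceEventType;

    // Configure the singleton: buffer size, auto-dump threshold (0 = manual dumps only), output directory.
    auto& monitor = PerformanceMonitor::getInstance();
    monitor.setMonitoringOptions(10000, 0, "performance_logs");

    // Record one hand-built event (placeholder node/callback names and timing).
    PerformanceEvent ev;
    ev.timestamp = std::chrono::duration_cast<std::chrono::microseconds>(
        std::chrono::steady_clock::now().time_since_epoch()).count();
    ev.type = PerformanceEventType::CALLBACK_END;
    ev.node_name = "example_node";
    ev.callback_name = "timer_100ms";
    ev.processing_time = 250;  // microseconds
    monitor.recordEvent(ev);

    // Print the aggregate report, then write the raw events out (dumpToFile clears the buffer).
    std::cout << monitor.getCallbackExecutionReport();
    monitor.dumpToFile("manual_dump.json");
    return 0;
}
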
@@ -20,6 +20,7 @@
#include <cassert>
#include <cstdlib>
#include <memory>
#include <string>

#include "rclcpp/executor.hpp"
#include "rclcpp/macros.hpp"
@@ -27,6 +28,7 @@

#include "priority_executor/priority_memory_strategy.hpp"
#include "priority_executor/default_executor.hpp"
#include "priority_executor/performance_monitor.hpp"

namespace priority_executor {

@@ -63,6 +65,43 @@ public:
    void set_use_priorities(bool use_prio);
    std::shared_ptr<PriorityMemoryStrategy<>> prio_memory_strategy_{nullptr};

    // Performance monitoring control
    void enableMonitoring(bool enable = true) {
        PerformanceMonitor::getInstance().enable(enable);
    }

    void setMonitoringOptions(
        size_t buffer_size,
        size_t auto_dump_threshold,
        const std::string& dump_path) {
        auto& monitor = PerformanceMonitor::getInstance();
        monitor.setBufferSize(buffer_size);
        monitor.setAutoDumpThreshold(auto_dump_threshold);
        monitor.setDumpPath(dump_path);
    }

    // Get a formatted report of all callback execution times
    std::string getCallbackExecutionReport() const {
        auto& monitor = PerformanceMonitor::getInstance();
        return monitor.getCallbackExecutionReport();
    }

    // Manually trigger a log dump with a specific filename
    bool dumpMonitoringData(const std::string& filename) {
        auto& monitor = PerformanceMonitor::getInstance();
        return monitor.dumpToFile(filename);
    }

    // Get executor ID for identification in logs
    int getExecutorId() const {
        return executor_id_;
    }

    // Get executor name for identification in logs
    const std::string& getExecutorName() const {
        return name;
    }

protected:
    bool get_next_executable(rclcpp::AnyExecutable& any_executable,
        std::chrono::nanoseconds timeout = std::chrono::nanoseconds(-1));

@@ -72,8 +111,14 @@ private:

    void wait_for_work(std::chrono::nanoseconds timeout);
    bool get_next_ready_executable(rclcpp::AnyExecutable& any_executable);
    void recordExecutionEvent(const rclcpp::AnyExecutable& any_exec,
        PerformanceEventType event_type,
        uint64_t timestamp,
        uint64_t processing_time = 0);

    bool use_priorities{true};
    static std::atomic<int> next_executor_id_;
    int executor_id_;
};

} // namespace priority_executor

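The hunk above wires the monitor into the executor's public interface. As a rough illustration of how a caller might drive these new methods, the sketch below assumes the class they were added to is the package's TimedExecutor (the class name and its header path are not visible in this hunk, so both are assumptions), and the sizes and file names are placeholders.

#include <iostream>
#include <string>

#include "priority_executor/priority_executor.hpp"  // assumed header path for the executor class

// Configure monitoring before spinning: 50k-event buffer, auto-dump every 10k events, placeholder directory.
void configure_monitoring(priority_executor::TimedExecutor& executor) {  // TimedExecutor is assumed
    executor.setMonitoringOptions(50000, 10000, "performance_logs");
    executor.enableMonitoring(true);
}

// After the executor has stopped, print the aggregate report and flush the remaining events.
void report_monitoring(priority_executor::TimedExecutor& executor) {
    std::cout << "Executor " << executor.getExecutorId() << " ("
              << executor.getExecutorName() << ")\n";
    std::cout << executor.getCallbackExecutionReport();
    executor.dumpMonitoringData("final_run.json");
}
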
@@ -1,4 +1,5 @@
#include "priority_executor/multithread_priority_executor.hpp"
#include "priority_executor/performance_monitor.hpp"

#include <chrono>
#include <functional>
@@ -7,6 +8,7 @@
#include <sstream>
#include <thread>
#include <unordered_map>
#include <nlohmann/json.hpp>

#include "rcpputils/scope_exit.hpp"
#include "rclcpp/any_executable.hpp"
@@ -58,6 +60,7 @@ void MultithreadTimedExecutor::spin() {
    for (auto& thread : threads) {
        thread.join();
    }
    spinning.store(false);
}

void MultithreadTimedExecutor::run(size_t _thread_number) {
@@ -68,9 +71,16 @@ void MultithreadTimedExecutor::run(size_t _thread_number) {
    int rc = pthread_setaffinity_np(pthread_self(), sizeof(cpu_set_t), &cpuset);
    if (rc != 0)
    {
        std::cout << "Error calling pthread_setaffinity_np: " << rc << "\n";
        RCLCPP_ERROR(rclcpp::get_logger("priority_executor"),
            "Error calling pthread_setaffinity_np: %d", rc);
    }

    // Create thread context information for logs
    nlohmann::json thread_info;
    thread_info["thread_id"] = _thread_number;
    thread_info["cpu_affinity"] = _thread_number;
    std::string thread_info_str = thread_info.dump();

    while (rclcpp::ok(this->context_) && spinning.load()) {
        rclcpp::AnyExecutable any_exec;
        {
@@ -98,8 +108,160 @@ void MultithreadTimedExecutor::run(size_t _thread_number) {
            }
        }

        // Record execution start with thread info
        auto start = std::chrono::steady_clock::now();
        uint64_t start_time = std::chrono::duration_cast<std::chrono::microseconds>(
            start.time_since_epoch()).count();

        PerformanceEvent start_event;
        start_event.timestamp = start_time;
        start_event.type = PerformanceEventType::CALLBACK_START;
        start_event.executor_id = getExecutorId();

        if (any_exec.node_base) {
            start_event.node_name = any_exec.node_base->get_name();

            // Enhanced callback identification based on executable type
            if (any_exec.timer) {
                // For timer callbacks
                std::stringstream ss;

                // retrieve the timer period
                int64_t period;
                rcl_timer_get_period(any_exec.timer->get_timer_handle().get(), &period);

                // convert period (which is in nanoseconds) to milliseconds
                std::chrono::nanoseconds period_ns(period);
                std::chrono::milliseconds period_ms = std::chrono::duration_cast<std::chrono::milliseconds>(period_ns);

                ss << "timer_" << period_ms.count() << "ms";
                start_event.callback_name = ss.str();

                // Add memory address as unique identifier
                std::stringstream addr;
                addr << "@" << any_exec.timer.get();
                nlohmann::json additional;
                additional["callback_type"] = "timer";
                additional["timer_addr"] = addr.str();
                additional["thread_info"] = thread_info;
                start_event.additional_data = additional.dump();
            }
            else if (any_exec.subscription) {
                // For subscription callbacks
                start_event.callback_name = any_exec.subscription->get_topic_name();

                nlohmann::json additional;
                additional["callback_type"] = "subscription";
                additional["thread_info"] = thread_info;
                start_event.additional_data = additional.dump();
            }
            else if (any_exec.service) {
                start_event.callback_name = any_exec.service->get_service_name();

                nlohmann::json additional;
                additional["callback_type"] = "service";
                additional["thread_info"] = thread_info;
                start_event.additional_data = additional.dump();
            }
            else if (any_exec.client) {
                start_event.callback_name = "client_callback";

                nlohmann::json additional;
                additional["callback_type"] = "client";
                additional["thread_info"] = thread_info;
                start_event.additional_data = additional.dump();
            }
            else if (any_exec.waitable) {
                start_event.callback_name = "waitable_callback";

                nlohmann::json additional;
                additional["callback_type"] = "waitable";
                additional["thread_info"] = thread_info;
                start_event.additional_data = additional.dump();
            }

            // Add chain information
            std::shared_ptr<PriorityExecutable> executable = nullptr;
            if (prio_memory_strategy_) {
                if (any_exec.timer) {
                    executable = prio_memory_strategy_->get_priority_settings(
                        any_exec.timer->get_timer_handle());
                } else if (any_exec.subscription) {
                    executable = prio_memory_strategy_->get_priority_settings(
                        any_exec.subscription->get_subscription_handle());
                }
            }

            if (executable) {
                start_event.chain_id = executable->chain_id;
                start_event.is_first_in_chain = executable->is_first_in_chain;
                start_event.is_last_in_chain = executable->is_last_in_chain;

                if (executable->deadlines && !executable->deadlines->empty()) {
                    start_event.deadline = executable->deadlines->front();
                }

                // Include the priority information
                try {
                    nlohmann::json additional = start_event.additional_data.empty() ?
                        nlohmann::json() : nlohmann::json::parse(start_event.additional_data);

                    additional["priority"] = executable->priority;

                    if (executable->chain_id >= 0) {
                        additional["chain_id"] = executable->chain_id;
                    }

                    if (executable->sched_type != ExecutableScheduleType::DEFAULT) {
                        additional["schedule_type"] = static_cast<int>(executable->sched_type);
                    }

                    start_event.additional_data = additional.dump();
                } catch (...) {
                    // If there was an error parsing the JSON, create new JSON object
                    nlohmann::json additional;
                    additional["priority"] = executable->priority;
                    additional["thread_info"] = thread_info;
                    start_event.additional_data = additional.dump();
                }
            }
        } else {
            start_event.additional_data = thread_info_str;
        }

        PerformanceMonitor::getInstance().recordEvent(start_event);

        // Execute the callback
        execute_any_executable(any_exec);

        // Record execution completion
        auto end = std::chrono::steady_clock::now();
        uint64_t end_time = std::chrono::duration_cast<std::chrono::microseconds>(
            end.time_since_epoch()).count();
        uint64_t processing_time = end_time - start_time;

        // Create identical event structure but for end event
        PerformanceEvent end_event = start_event;
        end_event.timestamp = end_time;
        end_event.type = PerformanceEventType::CALLBACK_END;
        end_event.processing_time = processing_time;

        // Update the additional data to include processing time
        try {
            nlohmann::json additional = end_event.additional_data.empty() ?
                nlohmann::json() : nlohmann::json::parse(end_event.additional_data);
            additional["processing_time_us"] = processing_time;
            end_event.additional_data = additional.dump();
        } catch (...) {
            // If JSON parsing failed, create a new object
            nlohmann::json additional;
            additional["processing_time_us"] = processing_time;
            additional["thread_info"] = thread_info;
            end_event.additional_data = additional.dump();
        }

        PerformanceMonitor::getInstance().recordEvent(end_event);

        if (any_exec.timer) {
            auto wait_mutex = MultithreadTimedExecutor::wait_mutex_set_[this];
            auto high_priority_wait_mutex = wait_mutex->get_high_priority_lockable();
@@ -118,7 +280,6 @@ void MultithreadTimedExecutor::run(size_t _thread_number) {
        auto wait_mutex = MultithreadTimedExecutor::wait_mutex_set_[this];
        auto low_priority_wait_mutex = wait_mutex->get_low_priority_lockable();
        std::lock_guard<rclcpp::detail::MutexTwoPriorities::LowPriorityLockable> wait_lock(low_priority_wait_mutex);
        // std::shared_ptr<PriorityMemoryStrategy<>> prio_memory_strategy_ = std::dynamic_pointer_cast<PriorityMemoryStrategy>(memory_strategy_);
        prio_memory_strategy_->post_execute(any_exec, _thread_number);
    }
}

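Since additional_data is just a JSON string, the fields written in run() above round-trip through the helpers declared in performance_monitor.hpp. A small illustrative check (the literal values are made up, not taken from the commit):

#include <cassert>

#include "priority_executor/performance_monitor.hpp"

int main() {
    priority_executor::PerformanceEvent ev;
    // Mimics what run() stores for a timer callback executed on worker thread 2.
    ev.additional_data =
        R"({"callback_type":"timer","thread_info":{"thread_id":2,"cpu_affinity":2}})";

    assert(ev.get_callback_type() == "timer");
    assert(ev.get_thread_id() == 2);
    return 0;
}
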
src/priority_executor/src/performance_monitor.cpp (new file, 149 lines)
@@ -0,0 +1,149 @@
#include "priority_executor/performance_monitor.hpp"
#include <filesystem>
#include <fstream>
#include <nlohmann/json.hpp>
#include <sstream>
#include <iomanip>

namespace priority_executor {

PerformanceMonitor& PerformanceMonitor::getInstance() {
    static PerformanceMonitor instance;
    return instance;
}

PerformanceMonitor::PerformanceMonitor() {
    event_buffer_.reserve(buffer_size_);
    std::filesystem::create_directories(dump_path_);
}

void PerformanceMonitor::recordEvent(const PerformanceEvent& event) {
    if (!enabled_) return;

    {
        std::lock_guard<std::mutex> lock(event_mutex_);
        event_buffer_.push_back(event);

        if (auto_dump_threshold_ > 0 && event_buffer_.size() >= auto_dump_threshold_) {
            checkAndAutoDump();
        }
    }
}

void PerformanceMonitor::setBufferSize(size_t size) {
    std::lock_guard<std::mutex> lock(event_mutex_);
    buffer_size_ = size;
    event_buffer_.reserve(size);
}

void PerformanceMonitor::setAutoDumpThreshold(size_t threshold) {
    auto_dump_threshold_ = threshold;
}

void PerformanceMonitor::setDumpPath(const std::string& path) {
    dump_path_ = path;
    std::filesystem::create_directories(dump_path_);
}

void PerformanceMonitor::enable(bool enabled) {
    enabled_ = enabled;
}

void PerformanceMonitor::clear() {
    std::lock_guard<std::mutex> lock(event_mutex_);
    event_buffer_.clear();
}

std::string PerformanceMonitor::eventsToJson() const {
    nlohmann::json root = nlohmann::json::array();

    for (const auto& event : event_buffer_) {
        nlohmann::json event_json;

        event_json["timestamp"] = event.timestamp;

        switch (event.type) {
            case PerformanceEventType::CALLBACK_READY:
                event_json["type"] = "callback_ready";
                break;
            case PerformanceEventType::CALLBACK_START:
                event_json["type"] = "callback_start";
                break;
            case PerformanceEventType::CALLBACK_END:
                event_json["type"] = "callback_end";
                break;
            case PerformanceEventType::DEADLINE_MISSED:
                event_json["type"] = "deadline_missed";
                break;
            case PerformanceEventType::DEADLINE_MET:
                event_json["type"] = "deadline_met";
                break;
            case PerformanceEventType::CHAIN_START:
                event_json["type"] = "chain_start";
                break;
            case PerformanceEventType::CHAIN_END:
                event_json["type"] = "chain_end";
                break;
        }

        event_json["node_name"] = event.node_name;
        event_json["callback_name"] = event.callback_name;
        event_json["chain_id"] = event.chain_id;
        event_json["is_first_in_chain"] = event.is_first_in_chain;
        event_json["is_last_in_chain"] = event.is_last_in_chain;

        if (event.deadline > 0) {
            event_json["deadline"] = event.deadline;
        }

        if (event.processing_time > 0) {
            event_json["processing_time"] = event.processing_time;
        }

        event_json["executor_id"] = event.executor_id;

        if (!event.additional_data.empty()) {
            try {
                event_json["additional_data"] = nlohmann::json::parse(event.additional_data);
            } catch (...) {
                event_json["additional_data"] = event.additional_data;
            }
        }

        root.push_back(event_json);
    }

    return root.dump(2); // Indent with 2 spaces
}

bool PerformanceMonitor::dumpToFile(const std::string& filename) {
    std::lock_guard<std::mutex> lock(event_mutex_);

    if (event_buffer_.empty()) {
        return true; // Nothing to dump
    }

    std::string full_path = dump_path_ + "/" + filename;
    std::ofstream file(full_path);
    if (!file.is_open()) {
        return false;
    }

    file << eventsToJson();
    event_buffer_.clear();
    return true;
}

void PerformanceMonitor::checkAndAutoDump() {
    if (event_buffer_.size() >= auto_dump_threshold_) {
        auto now = std::chrono::system_clock::now();
        auto timestamp = std::chrono::duration_cast<std::chrono::milliseconds>(
            now.time_since_epoch()).count();

        std::stringstream ss;
        ss << "performance_log_" << timestamp << ".json";
        dumpToFile(ss.str());
    }
}

} // namespace priority_executor
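The dump format produced by eventsToJson() above is a JSON array of event objects, so a log can be loaded back with the same library. A minimal reader sketch follows; the file path is a placeholder, and only fields that eventsToJson() always emits are read unconditionally.

#include <cstdint>
#include <fstream>
#include <iostream>
#include <string>
#include <nlohmann/json.hpp>

int main() {
    // Placeholder path; auto-dumped files are named performance_log_<ms-timestamp>.json by checkAndAutoDump().
    std::ifstream file("performance_logs/manual_dump.json");
    nlohmann::json events = nlohmann::json::parse(file);

    for (const auto& event : events) {
        // "processing_time" is only written when the event carried a non-zero value.
        if (event["type"] == "callback_end" && event.contains("processing_time")) {
            std::cout << event["node_name"].get<std::string>() << "/"
                      << event["callback_name"].get<std::string>() << ": "
                      << event["processing_time"].get<uint64_t>() << " us\n";
        }
    }
    return 0;
}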