Compare commits

..

1 commit
main ... orig

Author SHA1 Message Date
Kurt Wilson
20c1f68d41
add simple timer package needed to build 2024-11-09 14:32:52 -05:00
22 changed files with 1951 additions and 2554 deletions

View file

@ -1 +1 @@
This contains the priority_executor package that adds an Executor with deadline and chain-aware priority support. The `usage_example.cpp` file shows the needed steps to use the executor. This contains the priority_executor package that adds an Executor with deadline and chain-aware priority support. The current usage example is out-of-date

View file

@ -1,55 +1,71 @@
cmake_minimum_required(VERSION 3.8) cmake_minimum_required(VERSION 3.5)
project(priority_executor VERSION 0.1.0) project(priority_executor)
# Set C++ standards # Default to C99
set(CMAKE_CXX_STANDARD 17) if(NOT CMAKE_C_STANDARD)
set(CMAKE_CXX_STANDARD_REQUIRED ON) set(CMAKE_C_STANDARD 99)
set(CMAKE_CXX_EXTENSIONS OFF) endif()
# Default to C++14
if(NOT CMAKE_CXX_STANDARD)
set(CMAKE_CXX_STANDARD 14)
endif()
# Compiler options
if(CMAKE_COMPILER_IS_GNUCXX OR CMAKE_CXX_COMPILER_ID MATCHES "Clang") if(CMAKE_COMPILER_IS_GNUCXX OR CMAKE_CXX_COMPILER_ID MATCHES "Clang")
add_compile_options(-Wall -Wextra -Wpedantic) add_compile_options(-Wall -Wextra -Wpedantic)
endif() endif()
# Find dependencies # find dependencies
find_package(ament_cmake REQUIRED) find_package(ament_cmake REQUIRED)
find_package(rclcpp REQUIRED) find_package(rclcpp REQUIRED)
find_package(rcl REQUIRED) find_package(rcl REQUIRED)
find_package(rmw REQUIRED)
find_package(std_msgs REQUIRED) find_package(std_msgs REQUIRED)
find_package(std_srvs REQUIRED) find_package(std_srvs REQUIRED)
find_package(simple_timer REQUIRED) find_package(simple_timer REQUIRED)
find_package(nlohmann_json REQUIRED)
# Library targets add_library(priority_executor src/priority_executor.cpp src/priority_memory_strategy.cpp)
add_library(priority_executor
src/priority_executor.cpp
src/priority_memory_strategy.cpp
src/performance_monitor.cpp
src/default_executor.cpp
src/usage_example.cpp
src/multithread_priority_executor.cpp
)
target_include_directories(priority_executor PUBLIC target_include_directories(priority_executor PUBLIC
$<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}/include> $<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}/include>
$<INSTALL_INTERFACE:include> $<INSTALL_INTERFACE:include>
) )
# target_link_libraries(priority_executor
# simple_timer
# )
ament_target_dependencies(priority_executor ament_target_dependencies(priority_executor
rmw rmw
rclcpp rclcpp
rcl rcl
simple_timer simple_timer
nlohmann_json
) )
# Example executable add_library(multithread_priority_executor src/multithread_priority_executor.cpp src/priority_memory_strategy.cpp)
add_executable(usage_example target_include_directories(multithread_priority_executor PUBLIC
src/usage_example.cpp
)
target_include_directories(usage_example PUBLIC
$<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}/include> $<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}/include>
$<INSTALL_INTERFACE:include> $<INSTALL_INTERFACE:include>
) )
ament_target_dependencies(multithread_priority_executor
rmw
rclcpp
rcl
simple_timer
)
add_library(default_executor src/default_executor.cpp)
target_include_directories(default_executor PUBLIC
$<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}/include>
$<INSTALL_INTERFACE:include>
)
ament_target_dependencies(default_executor
rmw
rclcpp
rcl
simple_timer
)
add_executable(usage_example src/usage_example.cpp)
target_include_directories(usage_example PUBLIC
$<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}/include>
$<INSTALL_INTERFACE:include>)
target_link_libraries(usage_example target_link_libraries(usage_example
priority_executor priority_executor
) )
@ -57,37 +73,36 @@ ament_target_dependencies(usage_example
rclcpp rclcpp
std_msgs std_msgs
std_srvs std_srvs
simple_timer
) )
# Testing
if(BUILD_TESTING)
find_package(ament_lint_auto REQUIRED)
ament_lint_auto_find_test_dependencies()
endif()
# Installation
install( install(
DIRECTORY include/ DIRECTORY include/
DESTINATION include DESTINATION include
) )
install( install(TARGETS usage_example
TARGETS priority_executor DESTINATION lib/${PROJECT_NAME}
EXPORT export_${PROJECT_NAME} )
install(TARGETS priority_executor multithread_priority_executor default_executor
ARCHIVE DESTINATION lib ARCHIVE DESTINATION lib
LIBRARY DESTINATION lib LIBRARY DESTINATION lib
RUNTIME DESTINATION bin RUNTIME DESTINATION bin
INCLUDES DESTINATION include
) )
install( if(BUILD_TESTING)
TARGETS usage_example find_package(ament_lint_auto REQUIRED)
RUNTIME DESTINATION lib/${PROJECT_NAME} # the following line skips the linter which checks for copyrights
) # uncomment the line when a copyright and license is not present in all source files
#set(ament_cmake_copyright_FOUND TRUE)
# the following line skips cpplint (only works in a git repo)
# uncomment the line when this package is not in a git repo
#set(ament_cmake_cpplint_FOUND TRUE)
ament_lint_auto_find_test_dependencies()
endif()
# Export and package configuration
ament_export_include_directories(include) ament_export_include_directories(include)
ament_export_targets(export_${PROJECT_NAME} HAS_LIBRARY_TARGET) ament_export_libraries(priority_executor)
ament_export_dependencies(rclcpp rcl rmw simple_timer nlohmann_json) ament_export_libraries(multithread_priority_executor)
ament_export_libraries(default_executor)
ament_package() ament_package()

View file

@ -12,74 +12,72 @@
// See the License for the specific language governing permissions and // See the License for the specific language governing permissions and
// limitations under the License. // limitations under the License.
#ifndef PRIORITY_EXECUTOR__DEFAULT_EXECUTOR_HPP_ #ifndef RTIS_DEFAULT_EXECUTOR
#define PRIORITY_EXECUTOR__DEFAULT_EXECUTOR_HPP_ #define RTIS_DEFAULT_EXECUTOR
#include <rmw/rmw.h> #include <rmw/rmw.h>
#include <set>
#include <memory>
#include <cassert> #include <cassert>
#include <cstdlib> #include <cstdlib>
#include <unordered_map> #include <memory>
#include <vector>
#include <set>
#include "rclcpp/executor.hpp" #include "rclcpp/executor.hpp"
#include "rclcpp/rclcpp.hpp"
#include "rclcpp/macros.hpp" #include "rclcpp/macros.hpp"
#include "rclcpp/memory_strategies.hpp"
#include "rclcpp/node.hpp"
#include "rclcpp/utilities.hpp"
#include "rclcpp/rate.hpp"
#include "rclcpp/visibility_control.hpp" #include "rclcpp/visibility_control.hpp"
#include "rclcpp/detail/mutex_two_priorities.hpp" #include "rclcpp/detail/mutex_two_priorities.hpp"
#include "priority_executor/priority_memory_strategy.hpp"
#include "simple_timer/rt-sched.hpp" #include "simple_timer/rt-sched.hpp"
namespace priority_executor { class RTISTimed
{
class RTISTimed {
public: public:
node_time_logger logger_; node_time_logger logger_;
}; };
class ROSDefaultMultithreadedExecutor : public rclcpp::Executor, public RTISTimed { class ROSDefaultMultithreadedExecutor : public rclcpp::Executor, public RTISTimed
{
public: public:
RCLCPP_SMART_PTR_DEFINITIONS(ROSDefaultMultithreadedExecutor)
RCLCPP_PUBLIC RCLCPP_PUBLIC
explicit ROSDefaultMultithreadedExecutor( explicit ROSDefaultMultithreadedExecutor(
rclcpp::ExecutorOptions const &options = rclcpp::ExecutorOptions(), const rclcpp::ExecutorOptions &options = rclcpp::ExecutorOptions(), int number_of_threads = 2, std::chrono::nanoseconds next_exec_timeout = std::chrono::nanoseconds(-1));
int number_of_threads = 2,
std::chrono::nanoseconds next_exec_timeout = std::chrono::nanoseconds(-1));
RCLCPP_PUBLIC size_t get_number_of_threads(); RCLCPP_PUBLIC size_t get_number_of_threads();
RCLCPP_PUBLIC void spin() override; RCLCPP_PUBLIC void spin() override;
bool get_next_executable(rclcpp::AnyExecutable &any_executable, std::chrono::nanoseconds timeout = std::chrono::nanoseconds(-1));
bool get_next_executable(rclcpp::AnyExecutable& any_executable,
std::chrono::nanoseconds timeout = std::chrono::nanoseconds(-1));
protected: protected:
RCLCPP_PUBLIC void run(size_t thread_number); RCLCPP_PUBLIC void run(size_t thread_number);
private: private:
RCLCPP_DISABLE_COPY(ROSDefaultMultithreadedExecutor)
size_t number_of_threads_; size_t number_of_threads_;
std::set<rclcpp::TimerBase::SharedPtr> scheduled_timers_; std::set<rclcpp::TimerBase::SharedPtr> scheduled_timers_;
std::chrono::nanoseconds next_exec_timeout_{std::chrono::nanoseconds(-1)};
static std::unordered_map<ROSDefaultMultithreadedExecutor *, static std::unordered_map<ROSDefaultMultithreadedExecutor *,
std::shared_ptr<rclcpp::detail::MutexTwoPriorities>> std::shared_ptr<rclcpp::detail::MutexTwoPriorities>>
wait_mutex_set_; wait_mutex_set_;
static std::mutex shared_wait_mutex_; static std::mutex shared_wait_mutex_;
std::chrono::nanoseconds next_exec_timeout_ = std::chrono::nanoseconds(-1);
}; };
/// Single-threaded executor implementation. /// Single-threaded executor implementation.
/** /**
* This is the default executor created by rclcpp::spin. * This is the default executor created by rclcpp::spin.
*/ */
class ROSDefaultExecutor : public rclcpp::Executor, public RTISTimed { class ROSDefaultExecutor : public rclcpp::Executor, public RTISTimed
{
public: public:
RCLCPP_SMART_PTR_DEFINITIONS(ROSDefaultExecutor) RCLCPP_SMART_PTR_DEFINITIONS(ROSDefaultExecutor)
/// Default constructor. See the default constructor for Executor. /// Default constructor. See the default constructor for Executor.
RCLCPP_PUBLIC RCLCPP_PUBLIC
explicit ROSDefaultExecutor( explicit ROSDefaultExecutor(
rclcpp::ExecutorOptions const &options = rclcpp::ExecutorOptions()); const rclcpp::ExecutorOptions &options = rclcpp::ExecutorOptions());
/// Default destructor. /// Default destructor.
RCLCPP_PUBLIC RCLCPP_PUBLIC
@ -94,18 +92,16 @@ public:
* \throws std::runtime_error when spin() called while already spinning * \throws std::runtime_error when spin() called while already spinning
*/ */
RCLCPP_PUBLIC RCLCPP_PUBLIC
void spin() override; void
spin() override;
bool get_next_executable(rclcpp::AnyExecutable& any_executable, bool get_next_executable(rclcpp::AnyExecutable &any_executable, std::chrono::nanoseconds timeout = std::chrono::nanoseconds(-1));
std::chrono::nanoseconds timeout = std::chrono::nanoseconds(-1));
RCLCPP_PUBLIC RCLCPP_PUBLIC
void wait_for_work(std::chrono::nanoseconds timeout = std::chrono::nanoseconds(-1)); void
wait_for_work(std::chrono::nanoseconds timeout = std::chrono::nanoseconds(-1));
private: private:
RCLCPP_DISABLE_COPY(ROSDefaultExecutor) RCLCPP_DISABLE_COPY(ROSDefaultExecutor)
}; };
} // namespace priority_executor #endif // RCLCPP__EXECUTORS__SINGLE_THREADED_EXECUTOR_HPP_
#endif // PRIORITY_EXECUTOR__DEFAULT_EXECUTOR_HPP_

View file

@ -1,37 +1,18 @@
#ifndef PRIORITY_EXECUTOR__MULTITHREAD_PRIORITY_EXECUTOR_HPP_ #ifndef RTIS_MULTITHREAD_EXECUTOR
#define PRIORITY_EXECUTOR__MULTITHREAD_PRIORITY_EXECUTOR_HPP_ #define RTIS_MULTITHREAD_EXECUTOR
#include <priority_executor/priority_executor.hpp>
#include "rclcpp/detail/mutex_two_priorities.hpp"
#include <set> #include <set>
#include <mutex>
#include <chrono>
#include <memory>
#include <thread>
#include <unordered_map>
#include "priority_executor/default_executor.hpp" namespace timed_executor
#include "priority_executor/priority_executor.hpp" {
class MultithreadTimedExecutor : public TimedExecutor
#include "rclcpp/executor.hpp" {
#include "rclcpp/macros.hpp"
#include "rclcpp/memory_strategies.hpp"
#include "rclcpp/node.hpp"
#include "rclcpp/rate.hpp"
#include "rclcpp/utilities.hpp"
#include "rclcpp/visibility_control.hpp"
namespace priority_executor {
class MultithreadTimedExecutor : public TimedExecutor {
public: public:
RCLCPP_PUBLIC RCLCPP_PUBLIC
explicit MultithreadTimedExecutor( explicit MultithreadTimedExecutor(
rclcpp::ExecutorOptions const &options = rclcpp::ExecutorOptions(), const rclcpp::ExecutorOptions &options = rclcpp::ExecutorOptions(), std::string name = "unnamed executor", int number_of_threads = 2, std::chrono::nanoseconds next_exec_timeout = std::chrono::nanoseconds(-1));
std::string const &name = "unnamed executor",
int number_of_threads = 2,
std::chrono::nanoseconds next_exec_timeout = std::chrono::nanoseconds(-1));
RCLCPP_PUBLIC
virtual ~MultithreadTimedExecutor() = default;
RCLCPP_PUBLIC size_t get_number_of_threads(); RCLCPP_PUBLIC size_t get_number_of_threads();
RCLCPP_PUBLIC void spin() override; RCLCPP_PUBLIC void spin() override;
@ -40,17 +21,14 @@ protected:
RCLCPP_PUBLIC void run(size_t thread_number); RCLCPP_PUBLIC void run(size_t thread_number);
private: private:
RCLCPP_DISABLE_COPY(MultithreadTimedExecutor)
std::set<rclcpp::TimerBase::SharedPtr> scheduled_timers_;
size_t number_of_threads_; size_t number_of_threads_;
std::chrono::nanoseconds next_exec_timeout_{std::chrono::nanoseconds(-1)}; std::set<rclcpp::TimerBase::SharedPtr> scheduled_timers_;
static std::unordered_map<MultithreadTimedExecutor *, static std::unordered_map<MultithreadTimedExecutor *,
std::shared_ptr<rclcpp::detail::MutexTwoPriorities>> std::shared_ptr<rclcpp::detail::MutexTwoPriorities>>
wait_mutex_set_; wait_mutex_set_;
static std::mutex shared_wait_mutex_; static std::mutex shared_wait_mutex_;
std::chrono::nanoseconds next_exec_timeout_ = std::chrono::nanoseconds(-1);
}; };
}
} // namespace priority_executor #endif
#endif // PRIORITY_EXECUTOR__MULTITHREAD_PRIORITY_EXECUTOR_HPP_

View file

@ -1,237 +0,0 @@
#ifndef PERFORMANCE_MONITOR_HPP_
#define PERFORMANCE_MONITOR_HPP_
#include <atomic>
#include <chrono>
#include <deque>
#include <mutex>
#include <string>
#include <unordered_map>
#include <vector>
#include <nlohmann/json.hpp>
#include "rclcpp/rclcpp.hpp"
#include "simple_timer/rt-sched.hpp"
namespace priority_executor {
enum class PerformanceEventType {
CALLBACK_READY,
CALLBACK_START,
CALLBACK_END,
DEADLINE_MISSED,
DEADLINE_MET,
CHAIN_START,
CHAIN_END
};
struct PerformanceEvent {
uint64_t timestamp; // High-resolution timestamp (microseconds)
PerformanceEventType type; // Type of event
std::string node_name; // Name of the node
std::string callback_name; // Name of the callback
int chain_id{-1}; // Chain identifier
bool is_first_in_chain{false}; // First callback in chain
bool is_last_in_chain{false}; // Last callback in chain
uint64_t deadline{0}; // Deadline (if applicable)
uint64_t processing_time{0}; // Time spent executing (if applicable)
int executor_id{0}; // Executor identifier
std::string additional_data; // JSON string for extensible data
// Helper method to extract callback type from additional_data
std::string get_callback_type() const {
if (additional_data.empty()) return "unknown";
try {
nlohmann::json json = nlohmann::json::parse(additional_data);
if (json.contains("callback_type")) {
return json["callback_type"].get<std::string>();
}
} catch (...) {}
return "unknown";
}
// Helper method to extract thread info from additional_data
int get_thread_id() const {
if (additional_data.empty()) return -1;
try {
nlohmann::json json = nlohmann::json::parse(additional_data);
if (json.contains("thread_info") && json["thread_info"].is_object() &&
json["thread_info"].contains("thread_id")) {
return json["thread_info"]["thread_id"].get<int>();
}
// Direct thread_id if present
if (json.contains("thread_id")) {
return json["thread_id"].get<int>();
}
} catch (...) {}
return -1;
}
// Helper method to create a descriptive identifier for this callback
std::string get_descriptive_id() const {
std::stringstream ss;
ss << node_name;
if (!callback_name.empty()) {
ss << "/" << callback_name;
}
if (chain_id >= 0) {
ss << " (chain:" << chain_id;
if (is_first_in_chain) ss << " start";
if (is_last_in_chain) ss << " end";
ss << ")";
}
return ss.str();
}
// Convert additional_data to JSON object
nlohmann::json get_additional_data_json() const {
try {
if (!additional_data.empty()) {
return nlohmann::json::parse(additional_data);
}
} catch (...) {}
return nlohmann::json::object();
}
};
class PerformanceMonitor {
public:
static PerformanceMonitor& getInstance();
void recordEvent(const PerformanceEvent& event);
void setBufferSize(size_t size);
void setAutoDumpThreshold(size_t threshold);
void setDumpPath(const std::string& path);
bool dumpToFile(const std::string& filename);
void enable(bool enabled = true);
void clear();
// Helper method to set monitoring options in one call
void setMonitoringOptions(size_t buffer_size, size_t auto_dump_threshold,
const std::string& dump_path) {
setBufferSize(buffer_size);
setAutoDumpThreshold(auto_dump_threshold);
setDumpPath(dump_path);
}
// Get all events for a specific callback
std::vector<PerformanceEvent> getEventsForCallback(const std::string& node_name,
const std::string& callback_name) const {
std::vector<PerformanceEvent> result;
std::lock_guard<std::mutex> lock(event_mutex_);
for (const auto& event : event_buffer_) {
if (event.node_name == node_name && event.callback_name == callback_name) {
result.push_back(event);
}
}
return result;
}
// Get all events for a specific chain
std::vector<PerformanceEvent> getEventsForChain(int chain_id) const {
std::vector<PerformanceEvent> result;
std::lock_guard<std::mutex> lock(event_mutex_);
for (const auto& event : event_buffer_) {
if (event.chain_id == chain_id) {
result.push_back(event);
}
}
return result;
}
// Get all events by type
std::vector<PerformanceEvent> getEventsByType(PerformanceEventType type) const {
std::vector<PerformanceEvent> result;
std::lock_guard<std::mutex> lock(event_mutex_);
for (const auto& event : event_buffer_) {
if (event.type == type) {
result.push_back(event);
}
}
return result;
}
// Get a formatted report of callbacks and their execution times
std::string getCallbackExecutionReport() const {
std::lock_guard<std::mutex> lock(event_mutex_);
std::stringstream ss;
ss << "Callback Execution Report:\n";
ss << "=========================\n\n";
// Maps to store total execution time and count per callback
std::unordered_map<std::string, uint64_t> total_time;
std::unordered_map<std::string, int> call_count;
std::unordered_map<std::string, uint64_t> max_time;
std::unordered_map<std::string, uint64_t> min_time;
// Process all CALLBACK_END events (they contain processing_time)
for (const auto& event : event_buffer_) {
if (event.type == PerformanceEventType::CALLBACK_END && event.processing_time > 0) {
std::string callback_id = event.get_descriptive_id();
total_time[callback_id] += event.processing_time;
call_count[callback_id]++;
max_time[callback_id] = std::max(max_time[callback_id], event.processing_time);
if (min_time[callback_id] == 0 || event.processing_time < min_time[callback_id]) {
min_time[callback_id] = event.processing_time;
}
}
}
// Print results sorted by callback id
std::vector<std::string> callback_ids;
for (const auto& entry : call_count) {
callback_ids.push_back(entry.first);
}
std::sort(callback_ids.begin(), callback_ids.end());
for (const auto& id : callback_ids) {
double avg_time = static_cast<double>(total_time[id]) / call_count[id];
ss << id << "\n";
ss << " Calls: " << call_count[id] << "\n";
ss << " Avg time: " << avg_time << " µs\n";
ss << " Min time: " << min_time[id] << " µs\n";
ss << " Max time: " << max_time[id] << " µs\n";
ss << " Total time: " << total_time[id] << " µs\n\n";
}
return ss.str();
}
private:
PerformanceMonitor();
~PerformanceMonitor() = default;
PerformanceMonitor(const PerformanceMonitor&) = delete;
PerformanceMonitor& operator=(const PerformanceMonitor&) = delete;
PerformanceMonitor(PerformanceMonitor&&) = delete;
PerformanceMonitor& operator=(PerformanceMonitor&&) = delete;
mutable std::mutex event_mutex_;
std::vector<PerformanceEvent> event_buffer_;
size_t buffer_size_{10000};
size_t auto_dump_threshold_{0};
std::string dump_path_{"performance_logs"};
std::atomic<bool> enabled_{true};
std::string eventsToJson() const;
void checkAndAutoDump();
};
} // namespace priority_executor
#endif // PERFORMANCE_MONITOR_HPP_

View file

@ -12,39 +12,42 @@
// See the License for the specific language governing permissions and // See the License for the specific language governing permissions and
// limitations under the License. // limitations under the License.
#ifndef PRIORITY_EXECUTOR__PRIORITY_EXECUTOR_HPP_ #ifndef RTIS_TIMED_EXECUTOR
#define PRIORITY_EXECUTOR__PRIORITY_EXECUTOR_HPP_ #define RTIS_TIMED_EXECUTOR
#include <rmw/rmw.h> #include <rmw/rmw.h>
#include <cassert> #include <cassert>
#include <cstdlib> #include <cstdlib>
#include <memory> #include <memory>
#include <string> #include <vector>
#include <time.h>
#include "rclcpp/executor.hpp" #include "rclcpp/executor.hpp"
#include "rclcpp/macros.hpp" #include "rclcpp/macros.hpp"
#include "rclcpp/memory_strategies.hpp"
#include "rclcpp/node.hpp"
#include "rclcpp/utilities.hpp"
#include "rclcpp/rate.hpp"
#include "rclcpp/visibility_control.hpp" #include "rclcpp/visibility_control.hpp"
#include "priority_executor/priority_memory_strategy.hpp" #include "priority_executor/priority_memory_strategy.hpp"
#include "priority_executor/default_executor.hpp" #include <priority_executor/default_executor.hpp>
#include "priority_executor/performance_monitor.hpp" namespace timed_executor
{
namespace priority_executor {
/// Single-threaded executor implementation. /// Single-threaded executor implementation.
/** /**
* This is the default executor created by rclcpp::spin. * This is the default executor created by rclcpp::spin.
*/ */
class TimedExecutor : public rclcpp::Executor, public RTISTimed { class TimedExecutor : public rclcpp::Executor, public RTISTimed
{
public: public:
RCLCPP_SMART_PTR_DEFINITIONS(TimedExecutor) RCLCPP_SMART_PTR_DEFINITIONS(TimedExecutor)
/// Default constructor. See the default constructor for Executor. /// Default constructor. See the default constructor for Executor.
RCLCPP_PUBLIC RCLCPP_PUBLIC
explicit TimedExecutor( explicit TimedExecutor(
rclcpp::ExecutorOptions const &options = rclcpp::ExecutorOptions(), const rclcpp::ExecutorOptions &options = rclcpp::ExecutorOptions(), std::string name = "unnamed executor");
std::string name = "unnamed executor");
/// Default destructor. /// Default destructor.
RCLCPP_PUBLIC RCLCPP_PUBLIC
@ -59,68 +62,28 @@ public:
* \throws std::runtime_error when spin() called while already spinning * \throws std::runtime_error when spin() called while already spinning
*/ */
RCLCPP_PUBLIC RCLCPP_PUBLIC
void spin() override; void
spin() override;
std::string name; std::string name;
void set_use_priorities(bool use_prio); void set_use_priorities(bool use_prio);
std::shared_ptr<PriorityMemoryStrategy<>> prio_memory_strategy_{nullptr}; std::shared_ptr<PriorityMemoryStrategy<>> prio_memory_strategy_ = nullptr;
// Performance monitoring control
void enableMonitoring(bool enable = true) {
PerformanceMonitor::getInstance().enable(enable);
}
void setMonitoringOptions(
size_t buffer_size,
size_t auto_dump_threshold,
const std::string& dump_path) {
auto& monitor = PerformanceMonitor::getInstance();
monitor.setBufferSize(buffer_size);
monitor.setAutoDumpThreshold(auto_dump_threshold);
monitor.setDumpPath(dump_path);
}
// Get a formatted report of all callback execution times
std::string getCallbackExecutionReport() const {
auto& monitor = PerformanceMonitor::getInstance();
return monitor.getCallbackExecutionReport();
}
// Manually trigger a log dump with a specific filename
bool dumpMonitoringData(const std::string& filename) {
auto& monitor = PerformanceMonitor::getInstance();
return monitor.dumpToFile(filename);
}
// Get executor ID for identification in logs
int getExecutorId() const {
return executor_id_;
}
// Get executor name for identification in logs
const std::string& getExecutorName() const {
return name;
}
protected: protected:
bool get_next_executable(rclcpp::AnyExecutable& any_executable, bool
std::chrono::nanoseconds timeout = std::chrono::nanoseconds(-1)); get_next_executable(rclcpp::AnyExecutable &any_executable, std::chrono::nanoseconds timeout = std::chrono::nanoseconds(-1));
private: private:
RCLCPP_DISABLE_COPY(TimedExecutor) RCLCPP_DISABLE_COPY(TimedExecutor)
void
wait_for_work(std::chrono::nanoseconds timeout);
void wait_for_work(std::chrono::nanoseconds timeout); bool
bool get_next_ready_executable(rclcpp::AnyExecutable& any_executable); get_next_ready_executable(rclcpp::AnyExecutable &any_executable);
void recordExecutionEvent(const rclcpp::AnyExecutable& any_exec,
PerformanceEventType event_type,
uint64_t timestamp,
uint64_t processing_time = 0);
bool use_priorities{true}; bool use_priorities = true;
static std::atomic<int> next_executor_id_;
int executor_id_;
}; };
} // namespace priority_executor } // namespace timed_executor
#endif // PRIORITY_EXECUTOR__PRIORITY_EXECUTOR_HPP_ #endif // RCLCPP__EXECUTORS__SINGLE_THREADED_EXECUTOR_HPP_

View file

@ -12,34 +12,37 @@
// See the License for the specific language governing permissions and // See the License for the specific language governing permissions and
// limitations under the License. // limitations under the License.
#ifndef PRIORITY_EXECUTOR__PRIORITY_MEMORY_STRATEGY_HPP_ #ifndef RTIS_PRIORITY_STRATEGY
#define PRIORITY_EXECUTOR__PRIORITY_MEMORY_STRATEGY_HPP_ #define RTIS_PRIORITY_STRATEGY
#include <cstdint>
#include <deque>
#include <memory> #include <memory>
#include <semaphore.h>
#include <set>
#include <time.h>
#include <vector> #include <vector>
#include <queue>
#include <time.h>
#include <set>
#include "rcl/allocator.h" #include "rcl/allocator.h"
#include "rclcpp/allocator/allocator_common.hpp" #include "rclcpp/allocator/allocator_common.hpp"
#include "rclcpp/memory_strategy.hpp" #include "rclcpp/memory_strategy.hpp"
#include "rclcpp/node.hpp"
#include "rclcpp/visibility_control.hpp"
#include "rcutils/logging_macros.h"
#include "rmw/types.h"
#include "simple_timer/rt-sched.hpp" #include "simple_timer/rt-sched.hpp"
/// Delegate for handling memory allocations while the Executor is executing. /// Delegate for handling memory allocations while the Executor is executing.
/** /**
* By default, the memory strategy dynamically allocates memory for structures * By default, the memory strategy dynamically allocates memory for structures that come in from
* that come in from the rmw implementation after the executor waits for work, * the rmw implementation after the executor waits for work, based on the number of entities that
* based on the number of entities that come through. * come through.
*/ */
namespace priority_executor { enum ExecutableType
{
enum class ExecutableType {
SUBSCRIPTION, SUBSCRIPTION,
SERVICE, SERVICE,
CLIENT, CLIENT,
@ -47,139 +50,303 @@ enum class ExecutableType {
WAITABLE WAITABLE
}; };
std::ostream& operator<<(std::ostream& os, const ExecutableType& obj); enum ExecutableScheduleType
{
enum class ExecutableScheduleType {
DEADLINE = 0, DEADLINE = 0,
CHAIN_AWARE_PRIORITY, CHAIN_AWARE_PRIORITY,
CHAIN_INDEPENDENT_PRIORITY, // not used here CHAIN_INDEPENDENT_PRIORITY, // not used here
DEFAULT, // not used here DEFAULT, // not used here
}; };
std::ostream& operator<<(std::ostream& os, const ExecutableScheduleType& obj); class PriorityExecutable
{
class PriorityExecutable {
public: public:
static size_t num_executables;
explicit PriorityExecutable(
std::shared_ptr<const void> h,
int p,
ExecutableType t,
ExecutableScheduleType sched_type = ExecutableScheduleType::CHAIN_INDEPENDENT_PRIORITY);
PriorityExecutable();
void dont_run();
void allow_run();
void increment_counter();
bool operator==(PriorityExecutable const &other) const;
friend std::ostream& operator<<(std::ostream& os, PriorityExecutable const &pe);
std::shared_ptr<const void> handle; std::shared_ptr<const void> handle;
ExecutableType type; ExecutableType type;
bool can_be_run = false; bool can_be_run = false;
std::shared_ptr<rclcpp::Waitable> waitable; std::shared_ptr<rclcpp::Waitable> waitable;
ExecutableScheduleType sched_type; ExecutableScheduleType sched_type;
int priority = 0; int priority;
long period = 1000; // milliseconds long period = 1000; // milliseconds
bool is_first_in_chain = false; bool is_first_in_chain = false;
bool is_last_in_chain = false; bool is_last_in_chain = false;
std::deque<uint64_t>* deadlines = nullptr; // chain aware deadlines // chain aware deadlines
std::deque<uint64_t> *deadlines = nullptr;
std::shared_ptr<rclcpp::TimerBase> timer_handle; std::shared_ptr<rclcpp::TimerBase> timer_handle;
int chain_id = 0; // just used for logging // just used for logging
int counter = 0; // chain aware priority int chain_id = 0;
// chain aware priority
int counter = 0;
PriorityExecutable(std::shared_ptr<const void> h, int p, ExecutableType t, ExecutableScheduleType sched_type = CHAIN_INDEPENDENT_PRIORITY);
void dont_run();
void allow_run();
PriorityExecutable();
void increment_counter();
bool operator==(const PriorityExecutable &other) const;
static size_t num_executables;
int executable_id = 0; int executable_id = 0;
std::string name = ""; std::string name = "";
}; };
class PriorityExecutableComparator { class PriorityExecutableComparator
{
public: public:
bool operator()(PriorityExecutable const *p1, PriorityExecutable const *p2) const; bool operator()(const PriorityExecutable *p1, const PriorityExecutable *p2);
}; };
template <typename Alloc = std::allocator<void>> template <typename Alloc = std::allocator<void>>
class PriorityMemoryStrategy : public rclcpp::memory_strategy::MemoryStrategy { class PriorityMemoryStrategy : public rclcpp::memory_strategy::MemoryStrategy
{
public: public:
RCLCPP_SMART_PTR_DEFINITIONS(PriorityMemoryStrategy<Alloc>) RCLCPP_SMART_PTR_DEFINITIONS(PriorityMemoryStrategy<Alloc>)
using VoidAllocTraits = typename rclcpp::allocator::AllocRebind<void *, Alloc>; using VoidAllocTraits = typename rclcpp::allocator::AllocRebind<void *, Alloc>;
using VoidAlloc = typename VoidAllocTraits::allocator_type; using VoidAlloc = typename VoidAllocTraits::allocator_type;
explicit PriorityMemoryStrategy(std::shared_ptr<Alloc> allocator); explicit PriorityMemoryStrategy(std::shared_ptr<Alloc> allocator)
PriorityMemoryStrategy(); {
allocator_ = std::make_shared<VoidAlloc>(*allocator.get());
logger_ = create_logger();
}
PriorityMemoryStrategy()
{
allocator_ = std::make_shared<VoidAlloc>();
logger_ = create_logger();
}
node_time_logger logger_; node_time_logger logger_;
// Override methods
void add_guard_condition(const rcl_guard_condition_t *guard_condition) override; void add_guard_condition(const rcl_guard_condition_t *guard_condition) override;
void remove_guard_condition(const rcl_guard_condition_t *guard_condition) override; void remove_guard_condition(const rcl_guard_condition_t *guard_condition) override;
void clear_handles() override; void clear_handles() override;
void remove_null_handles(rcl_wait_set_t *wait_set) override; void remove_null_handles(rcl_wait_set_t *wait_set) override;
bool collect_entities(const WeakNodeList &weak_nodes) override; bool collect_entities(const WeakNodeList &weak_nodes) override;
void add_waitable_handle(const rclcpp::Waitable::SharedPtr &waitable) override; void add_waitable_handle(const rclcpp::Waitable::SharedPtr &waitable) override;
bool add_handles_to_wait_set(rcl_wait_set_t *wait_set) override; bool add_handles_to_wait_set(rcl_wait_set_t *wait_set) override;
// Class-specific methods void
void get_next_executable(rclcpp::AnyExecutable& any_exec, const WeakNodeList& weak_nodes); get_next_executable(
rclcpp::AnyExecutable &any_exec,
const WeakNodeList &weak_nodes);
/**
* thread-id is used for logging. if -1, then don't log the thread id
*/
void post_execute(rclcpp::AnyExecutable any_exec, int thread_id = -1); void post_execute(rclcpp::AnyExecutable any_exec, int thread_id = -1);
// Override virtual methods void
void get_next_subscription(rclcpp::AnyExecutable& any_exec, const WeakNodeList& weak_nodes) override; get_next_subscription(
void get_next_service(rclcpp::AnyExecutable& any_exec, const WeakNodeList& weak_nodes) override; rclcpp::AnyExecutable &any_exec,
void get_next_client(rclcpp::AnyExecutable& any_exec, const WeakNodeList& weak_nodes) override; const WeakNodeList &weak_nodes) override;
void get_next_timer(rclcpp::AnyExecutable& any_exec, const WeakNodeList& weak_nodes) override;
void get_next_waitable(rclcpp::AnyExecutable& any_exec, const WeakNodeList& weak_nodes) override;
// Priority handling methods void
std::shared_ptr<PriorityExecutable> get_priority_settings(std::shared_ptr<const void> executable); get_next_service(
rcl_allocator_t get_allocator() override; rclcpp::AnyExecutable &any_exec,
const WeakNodeList &weak_nodes) override;
// Counter methods void
size_t number_of_ready_subscriptions() const override; get_next_client(rclcpp::AnyExecutable &any_exec, const WeakNodeList &weak_nodes) override;
size_t number_of_ready_services() const override;
size_t number_of_ready_events() const override;
size_t number_of_ready_clients() const override;
size_t number_of_guard_conditions() const override;
size_t number_of_ready_timers() const override;
size_t number_of_waitables() const override;
// Executable priority methods void
void set_executable_priority(std::shared_ptr<const void> handle, int priority, ExecutableType t); get_next_timer(
void set_executable_priority(std::shared_ptr<const void> handle, int priority, ExecutableType t, rclcpp::AnyExecutable &any_exec,
ExecutableScheduleType sc, int chain_index); const WeakNodeList &weak_nodes) override;
void set_executable_deadline(std::shared_ptr<const void> handle, int period, ExecutableType t,
int chain_id = 0, std::string name = ""); void
int get_priority(std::shared_ptr<const void> executable); get_next_waitable(rclcpp::AnyExecutable &any_exec, const WeakNodeList &weak_nodes) override;
void set_first_in_chain(std::shared_ptr<const void> exec_handle);
void set_last_in_chain(std::shared_ptr<const void> exec_handle); std::shared_ptr<PriorityExecutable> get_priority_settings(std::shared_ptr<const void> executable)
void assign_deadlines_queue(std::shared_ptr<const void> exec_handle, std::deque<uint64_t>* deadlines); {
std::shared_ptr<std::deque<uint64_t>> get_chain_deadlines(int chain_id); auto search = priority_map.find(executable);
if (search != priority_map.end())
{
return search->second;
}
else
{
return nullptr;
}
}
rcl_allocator_t get_allocator() override
{
return rclcpp::allocator::get_rcl_allocator<void *, VoidAlloc>(*allocator_.get());
}
size_t number_of_ready_subscriptions() const override
{
size_t number_of_subscriptions = subscription_handles_.size();
// std::cout << "ready_raw: " << number_of_subscriptions << std::endl;
for (auto waitable : waitable_handles_)
{
number_of_subscriptions += waitable->get_number_of_ready_subscriptions();
}
return number_of_subscriptions;
}
size_t number_of_ready_services() const override
{
size_t number_of_services = service_handles_.size();
for (auto waitable : waitable_handles_)
{
number_of_services += waitable->get_number_of_ready_services();
}
return number_of_services;
}
size_t number_of_ready_events() const override
{
size_t number_of_events = 0;
for (auto waitable : waitable_handles_)
{
number_of_events += waitable->get_number_of_ready_events();
}
return number_of_events;
}
size_t number_of_ready_clients() const override
{
size_t number_of_clients = client_handles_.size();
for (auto waitable : waitable_handles_)
{
number_of_clients += waitable->get_number_of_ready_clients();
}
return number_of_clients;
}
size_t number_of_guard_conditions() const override
{
size_t number_of_guard_conditions = guard_conditions_.size();
for (auto waitable : waitable_handles_)
{
number_of_guard_conditions += waitable->get_number_of_ready_guard_conditions();
}
return number_of_guard_conditions;
}
size_t number_of_ready_timers() const override
{
size_t number_of_timers = timer_handles_.size();
for (auto waitable : waitable_handles_)
{
number_of_timers += waitable->get_number_of_ready_timers();
}
return number_of_timers;
}
size_t number_of_waitables() const override
{
return waitable_handles_.size();
}
void set_executable_priority(std::shared_ptr<const void> handle, int priority, ExecutableType t)
{
// TODO: any sanity checks should go here
priority_map[handle] = std::make_shared<PriorityExecutable>(handle, priority, t);
}
void set_executable_priority(std::shared_ptr<const void> handle, int priority, ExecutableType t, ExecutableScheduleType sc, int chain_index)
{
// TODO: any sanity checks should go here
priority_map[handle] = std::make_shared<PriorityExecutable>(handle, priority, t, sc);
priority_map[handle]->chain_id = chain_index;
}
void set_executable_deadline(std::shared_ptr<const void> handle, int period, ExecutableType t, int chain_id = 0, std::string name = "")
{
// TODO: any sanity checks should go here
priority_map[handle] = std::make_shared<PriorityExecutable>(handle, period, t, DEADLINE);
priority_map[handle]->chain_id = chain_id;
priority_map[handle]->name = name;
}
int get_priority(std::shared_ptr<const void> executable)
{
auto search = priority_map.find(executable);
if (search != priority_map.end())
{
return search->second->priority;
}
else
{
return 0;
}
}
void set_first_in_chain(std::shared_ptr<const void> exec_handle)
{
get_priority_settings(exec_handle)->is_first_in_chain = true;
}
void set_last_in_chain(std::shared_ptr<const void> exec_handle)
{
get_priority_settings(exec_handle)->is_last_in_chain = true;
}
void assign_deadlines_queue(std::shared_ptr<const void> exec_handle, std::deque<uint64_t> *deadlines)
{
get_priority_settings(exec_handle)->deadlines = deadlines;
}
private: private:
std::shared_ptr<PriorityExecutable> get_and_reset_priority(std::shared_ptr<const void> executable, std::shared_ptr<PriorityExecutable> get_and_reset_priority(std::shared_ptr<const void> executable, ExecutableType t)
ExecutableType t); {
// PriorityExecutable *p = get_priority_settings(executable);
std::shared_ptr<PriorityExecutable> p = get_priority_settings(executable);
if (p == nullptr)
{
// priority_map[executable] = PriorityExecutable(executable, 0, t);
priority_map[executable] = std::make_shared<PriorityExecutable>(executable, 0, t);
p = priority_map[executable];
}
// p->can_be_run = true;
return p;
}
template <typename T> template <typename T>
using VectorRebind = std::vector<T, typename std::allocator_traits<Alloc>::template rebind_alloc<T>>; using VectorRebind =
std::vector<T, typename std::allocator_traits<Alloc>::template rebind_alloc<T>>;
// Member variables
VectorRebind<const rcl_guard_condition_t *> guard_conditions_; VectorRebind<const rcl_guard_condition_t *> guard_conditions_;
VectorRebind<std::shared_ptr<const rcl_subscription_t>> subscription_handles_; VectorRebind<std::shared_ptr<const rcl_subscription_t>> subscription_handles_;
VectorRebind<std::shared_ptr<const rcl_service_t>> service_handles_; VectorRebind<std::shared_ptr<const rcl_service_t>> service_handles_;
VectorRebind<std::shared_ptr<const rcl_client_t>> client_handles_; VectorRebind<std::shared_ptr<const rcl_client_t>> client_handles_;
VectorRebind<std::shared_ptr<const rcl_timer_t>> timer_handles_; VectorRebind<std::shared_ptr<const rcl_timer_t>> timer_handles_;
VectorRebind<std::shared_ptr<rclcpp::Waitable>> waitable_handles_; VectorRebind<std::shared_ptr<rclcpp::Waitable>> waitable_handles_;
std::shared_ptr<VoidAlloc> allocator_; std::shared_ptr<VoidAlloc> allocator_;
// TODO: evaluate using node/subscription namespaced strings as keys
// holds *all* handle->priority mappings
std::map<std::shared_ptr<const void>, std::shared_ptr<PriorityExecutable>> priority_map; std::map<std::shared_ptr<const void>, std::shared_ptr<PriorityExecutable>> priority_map;
std::map<int, std::shared_ptr<std::deque<uint64_t>>> chain_deadlines;
// hold *only valid* executable+priorities
// std::priority_queue<const PriorityExecutable *, std::vector<const PriorityExecutable *>, PriorityExecutableComparator> all_executables_;
std::set<const PriorityExecutable *, PriorityExecutableComparator> all_executables_; std::set<const PriorityExecutable *, PriorityExecutableComparator> all_executables_;
// priority queue doesn't allow iteration. fortunately, std::map is sorted by key, so we can replace the priority queue with a map
// the key will be the priority. the value doesn't matter.
// std::map<const PriorityExecutable *, int, PriorityExecutableComparator> all_executables_ = std::map<const PriorityExecutable *, int, PriorityExecutableComparator>(PriorityExecutableComparator());
void add_executable_to_queue(std::shared_ptr<PriorityExecutable> p); void add_executable_to_queue(std::shared_ptr<PriorityExecutable> p);
}; };
} // namespace priority_executor #endif // RCLCPP__STRATEGIES__ALLOCATOR_MEMORY_STRATEGY_HPP_
#endif // PRIORITY_EXECUTOR__PRIORITY_MEMORY_STRATEGY_HPP_

View file

@ -2,18 +2,15 @@
<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?> <?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
<package format="3"> <package format="3">
<name>priority_executor</name> <name>priority_executor</name>
<version>0.1.0</version> <version>0.0.0</version>
<description> <description>TODO: Package description</description>
ROS 2 package implementing priority-based executors for real-time task scheduling
</description>
<maintainer email="kurt4wilson@gmail.com">kurt</maintainer> <maintainer email="kurt4wilson@gmail.com">kurt</maintainer>
<license>Apache License 2.0</license> <license>TODO: License declaration</license>
<buildtool_depend>ament_cmake</buildtool_depend> <buildtool_depend>ament_cmake</buildtool_depend>
<depend>rclcpp</depend> <depend>rclcpp</depend>
<depend>rcl</depend> <depend>rcl</depend>
<depend>rmw</depend>
<depend>std_msgs</depend> <depend>std_msgs</depend>
<depend>std_srvs</depend> <depend>std_srvs</depend>
<depend>simple_timer</depend> <depend>simple_timer</depend>

View file

@ -13,51 +13,42 @@
// limitations under the License. // limitations under the License.
#include "priority_executor/default_executor.hpp" #include "priority_executor/default_executor.hpp"
#include <mutex>
#include <chrono>
#include <memory>
#include <thread>
#include <sstream>
#include <functional>
#include <unordered_map>
#include "rcpputils/scope_exit.hpp" #include "rcpputils/scope_exit.hpp"
#include "rclcpp/any_executable.hpp" #include "rclcpp/any_executable.hpp"
#include "simple_timer/rt-sched.hpp" #include "simple_timer/rt-sched.hpp"
namespace priority_executor { ROSDefaultExecutor::ROSDefaultExecutor(const rclcpp::ExecutorOptions &options)
: rclcpp::Executor(options)
std::unordered_map<ROSDefaultMultithreadedExecutor*, {
std::shared_ptr<rclcpp::detail::MutexTwoPriorities>>
ROSDefaultMultithreadedExecutor::wait_mutex_set_;
std::mutex ROSDefaultMultithreadedExecutor::shared_wait_mutex_;
ROSDefaultExecutor::ROSDefaultExecutor(rclcpp::ExecutorOptions const &options)
: rclcpp::Executor(options) {
logger_ = create_logger(); logger_ = create_logger();
} }
ROSDefaultExecutor::~ROSDefaultExecutor() {} ROSDefaultExecutor::~ROSDefaultExecutor() {}
void ROSDefaultExecutor::wait_for_work(std::chrono::nanoseconds timeout) { void ROSDefaultExecutor::wait_for_work(std::chrono::nanoseconds timeout)
{
{ {
std::unique_lock<std::mutex> lock(memory_strategy_mutex_); std::unique_lock<std::mutex> lock(memory_strategy_mutex_);
// Collect the subscriptions and timers to be waited on // Collect the subscriptions and timers to be waited on
memory_strategy_->clear_handles(); memory_strategy_->clear_handles();
bool const has_invalid_weak_nodes = memory_strategy_->collect_entities(weak_nodes_); bool has_invalid_weak_nodes = memory_strategy_->collect_entities(weak_nodes_);
// Clean up any invalid nodes, if they were detected // Clean up any invalid nodes, if they were detected
if (has_invalid_weak_nodes) { if (has_invalid_weak_nodes)
{
auto node_it = weak_nodes_.begin(); auto node_it = weak_nodes_.begin();
auto gc_it = guard_conditions_.begin(); auto gc_it = guard_conditions_.begin();
while (node_it != weak_nodes_.end()) { while (node_it != weak_nodes_.end())
if (node_it->expired()) { {
if (node_it->expired())
{
node_it = weak_nodes_.erase(node_it); node_it = weak_nodes_.erase(node_it);
memory_strategy_->remove_guard_condition(*gc_it); memory_strategy_->remove_guard_condition(*gc_it);
gc_it = guard_conditions_.erase(gc_it); gc_it = guard_conditions_.erase(gc_it);
} else { }
else
{
++node_it; ++node_it;
++gc_it; ++gc_it;
} }
@ -65,33 +56,37 @@ void ROSDefaultExecutor::wait_for_work(std::chrono::nanoseconds timeout) {
} }
// clear wait set // clear wait set
rcl_ret_t ret = rcl_wait_set_clear(&wait_set_); rcl_ret_t ret = rcl_wait_set_clear(&wait_set_);
if (ret != RCL_RET_OK) { if (ret != RCL_RET_OK)
{
rclcpp::exceptions::throw_from_rcl_error(ret, "Couldn't clear wait set"); rclcpp::exceptions::throw_from_rcl_error(ret, "Couldn't clear wait set");
} }
// The size of waitables are accounted for in size of the other entities // The size of waitables are accounted for in size of the other entities
ret = rcl_wait_set_resize( ret = rcl_wait_set_resize(
&wait_set_, memory_strategy_->number_of_ready_subscriptions(), &wait_set_, memory_strategy_->number_of_ready_subscriptions(),
memory_strategy_->number_of_guard_conditions(), memory_strategy_->number_of_guard_conditions(), memory_strategy_->number_of_ready_timers(),
memory_strategy_->number_of_ready_timers(), memory_strategy_->number_of_ready_clients(), memory_strategy_->number_of_ready_services(),
memory_strategy_->number_of_ready_clients(),
memory_strategy_->number_of_ready_services(),
memory_strategy_->number_of_ready_events()); memory_strategy_->number_of_ready_events());
if (RCL_RET_OK != ret) { if (RCL_RET_OK != ret)
{
rclcpp::exceptions::throw_from_rcl_error(ret, "Couldn't resize the wait set"); rclcpp::exceptions::throw_from_rcl_error(ret, "Couldn't resize the wait set");
} }
if (!memory_strategy_->add_handles_to_wait_set(&wait_set_)) { if (!memory_strategy_->add_handles_to_wait_set(&wait_set_))
{
throw std::runtime_error("Couldn't fill wait set"); throw std::runtime_error("Couldn't fill wait set");
} }
} }
rcl_ret_t const status = rcl_ret_t status =
rcl_wait(&wait_set_, std::chrono::duration_cast<std::chrono::nanoseconds>(timeout).count()); rcl_wait(&wait_set_, std::chrono::duration_cast<std::chrono::nanoseconds>(timeout).count());
if (status == RCL_RET_WAIT_SET_EMPTY) { if (status == RCL_RET_WAIT_SET_EMPTY)
{
RCUTILS_LOG_WARN_NAMED( RCUTILS_LOG_WARN_NAMED(
"rclcpp", "rclcpp",
"empty wait set received in rcl_wait(). This should never happen."); "empty wait set received in rcl_wait(). This should never happen.");
} else if (status != RCL_RET_OK && status != RCL_RET_TIMEOUT) { }
else if (status != RCL_RET_OK && status != RCL_RET_TIMEOUT)
{
using rclcpp::exceptions::throw_from_rcl_error; using rclcpp::exceptions::throw_from_rcl_error;
throw_from_rcl_error(status, "rcl_wait() failed"); throw_from_rcl_error(status, "rcl_wait() failed");
} }
@ -101,8 +96,8 @@ void ROSDefaultExecutor::wait_for_work(std::chrono::nanoseconds timeout) {
memory_strategy_->remove_null_handles(&wait_set_); memory_strategy_->remove_null_handles(&wait_set_);
} }
bool ROSDefaultExecutor::get_next_executable(rclcpp::AnyExecutable& any_executable, bool ROSDefaultExecutor::get_next_executable(rclcpp::AnyExecutable &any_executable, std::chrono::nanoseconds timeout)
std::chrono::nanoseconds timeout) { {
bool success = false; bool success = false;
// Check to see if there are any subscriptions or timers needing service // Check to see if there are any subscriptions or timers needing service
// TODO(wjwwood): improve run to run efficiency of this function // TODO(wjwwood): improve run to run efficiency of this function
@ -112,28 +107,76 @@ bool ROSDefaultExecutor::get_next_executable(rclcpp::AnyExecutable& any_executab
return success; return success;
} }
void ROSDefaultExecutor::spin() { void ROSDefaultExecutor::spin()
if (spinning.exchange(true)) { {
if (spinning.exchange(true))
{
throw std::runtime_error("spin() called while already spinning"); throw std::runtime_error("spin() called while already spinning");
} }
RCPPUTILS_SCOPE_EXIT(this->spinning.store(false);); RCPPUTILS_SCOPE_EXIT(this->spinning.store(false););
while (rclcpp::ok(this->context_) && spinning.load()) { while (rclcpp::ok(this->context_) && spinning.load())
{
rclcpp::AnyExecutable any_executable; rclcpp::AnyExecutable any_executable;
if (get_next_executable(any_executable)) { if (get_next_executable(any_executable))
{
execute_any_executable(any_executable); execute_any_executable(any_executable);
} }
} }
} }
size_t ROSDefaultMultithreadedExecutor::get_number_of_threads() { bool ROSDefaultMultithreadedExecutor::get_next_executable(rclcpp::AnyExecutable &any_executable, std::chrono::nanoseconds timeout)
return number_of_threads_; {
bool success = false;
// Check to see if there are any subscriptions or timers needing service
// TODO(wjwwood): improve run to run efficiency of this function
// try to get an executable
// record the start time
auto start = std::chrono::steady_clock::now();
success = get_next_ready_executable(any_executable);
// and the end time
auto end = std::chrono::steady_clock::now();
std::stringstream oss;
oss << "{\"operation\":\"get_next_executable\", \"result\":\"" << success << "\", \"duration\":\"" << std::chrono::duration_cast<std::chrono::microseconds>(end - start).count() << "\"}";
log_entry(logger_, oss.str());
// If there are none
if (!success)
{
// Wait for subscriptions or timers to work on
// queue refresh
start = std::chrono::steady_clock::now();
wait_for_work(timeout);
// and the end time
end = std::chrono::steady_clock::now();
auto wait_duration = std::chrono::duration_cast<std::chrono::microseconds>(end - start).count();
oss.str("");
oss << "{\"operation\":\"wait_for_work\", \"result\":\"" << success << "\", \"wait_duration\":\"" << wait_duration << "\"}";
log_entry(logger_, oss.str());
if (!spinning.load())
{
return false;
}
// Try again
start = std::chrono::steady_clock::now();
success = get_next_ready_executable(any_executable);
// and the end time
end = std::chrono::steady_clock::now();
oss.str("");
oss << "{\"operation\":\"get_next_executable\", \"result\":\"" << success << "\", \"duration\":\"" << std::chrono::duration_cast<std::chrono::microseconds>(end - start).count() << "\"}";
log_entry(logger_, oss.str());
}
return success;
} }
std::unordered_map<ROSDefaultMultithreadedExecutor *, std::shared_ptr<rclcpp::detail::MutexTwoPriorities>> ROSDefaultMultithreadedExecutor::wait_mutex_set_;
std::mutex ROSDefaultMultithreadedExecutor::shared_wait_mutex_;
ROSDefaultMultithreadedExecutor::ROSDefaultMultithreadedExecutor( ROSDefaultMultithreadedExecutor::ROSDefaultMultithreadedExecutor(
rclcpp::ExecutorOptions const &options, const rclcpp::ExecutorOptions &options,
int number_of_threads, int number_of_threads, std::chrono::nanoseconds next_exec_timeout)
std::chrono::nanoseconds next_exec_timeout) : Executor(options)
: Executor(options) { {
std::lock_guard<std::mutex> wait_lock(ROSDefaultMultithreadedExecutor::shared_wait_mutex_); std::lock_guard<std::mutex> wait_lock(ROSDefaultMultithreadedExecutor::shared_wait_mutex_);
wait_mutex_set_[this] = std::make_shared<rclcpp::detail::MutexTwoPriorities>(); wait_mutex_set_[this] = std::make_shared<rclcpp::detail::MutexTwoPriorities>();
number_of_threads_ = number_of_threads; number_of_threads_ = number_of_threads;
@ -141,8 +184,10 @@ ROSDefaultMultithreadedExecutor::ROSDefaultMultithreadedExecutor(
logger_ = create_logger(); logger_ = create_logger();
} }
void ROSDefaultMultithreadedExecutor::spin() { void ROSDefaultMultithreadedExecutor::spin()
if (spinning.exchange(true)) { {
if (spinning.exchange(true))
{
throw std::runtime_error("spin() called while already spinning"); throw std::runtime_error("spin() called while already spinning");
} }
@ -151,39 +196,45 @@ void ROSDefaultMultithreadedExecutor::spin() {
{ {
auto wait_mutex = ROSDefaultMultithreadedExecutor::wait_mutex_set_[this]; auto wait_mutex = ROSDefaultMultithreadedExecutor::wait_mutex_set_[this];
auto low_priority_wait_mutex = wait_mutex->get_low_priority_lockable(); auto low_priority_wait_mutex = wait_mutex->get_low_priority_lockable();
std::lock_guard<rclcpp::detail::MutexTwoPriorities::LowPriorityLockable> std::lock_guard<rclcpp::detail::MutexTwoPriorities::LowPriorityLockable> wait_lock(low_priority_wait_mutex);
wait_lock(low_priority_wait_mutex); for (; thread_id < number_of_threads_ - 1; ++thread_id)
for (; thread_id < number_of_threads_ - 1; ++thread_id) { {
auto const func = std::bind(&ROSDefaultMultithreadedExecutor::run, this, thread_id); auto func = std::bind(&ROSDefaultMultithreadedExecutor::run, this, thread_id);
threads.emplace_back(func); threads.emplace_back(func);
} }
} }
run(thread_id); run(thread_id);
for (auto& thread : threads) { for (auto &thread : threads)
{
thread.join(); thread.join();
} }
} }
void ROSDefaultMultithreadedExecutor::run(size_t thread_number) { void ROSDefaultMultithreadedExecutor::run(__attribute__((unused)) size_t _thread_number) {
while (rclcpp::ok(this->context_) && spinning.load()) { while (rclcpp::ok(this->context_) && spinning.load())
{
rclcpp::AnyExecutable any_exec; rclcpp::AnyExecutable any_exec;
{ {
auto wait_mutex = ROSDefaultMultithreadedExecutor::wait_mutex_set_[this]; auto wait_mutex = ROSDefaultMultithreadedExecutor::wait_mutex_set_[this];
auto low_priority_wait_mutex = wait_mutex->get_low_priority_lockable(); auto low_priority_wait_mutex = wait_mutex->get_low_priority_lockable();
std::lock_guard<rclcpp::detail::MutexTwoPriorities::LowPriorityLockable> std::lock_guard<rclcpp::detail::MutexTwoPriorities::LowPriorityLockable> wait_lock(low_priority_wait_mutex);
wait_lock(low_priority_wait_mutex); if (!rclcpp::ok(this->context_) || !spinning.load())
if (!rclcpp::ok(this->context_) || !spinning.load()) { {
return; return;
} }
if (!get_next_executable(any_exec, next_exec_timeout_)) { if (!get_next_executable(any_exec, next_exec_timeout_))
{
continue; continue;
} }
if (any_exec.timer) { if (any_exec.timer)
{
// Guard against multiple threads getting the same timer. // Guard against multiple threads getting the same timer.
if (scheduled_timers_.count(any_exec.timer) != 0) { if (scheduled_timers_.count(any_exec.timer) != 0)
{
// Make sure that any_exec's callback group is reset before // Make sure that any_exec's callback group is reset before
// the lock is released. // the lock is released.
if (any_exec.callback_group) { if (any_exec.callback_group)
{
any_exec.callback_group->can_be_taken_from().store(true); any_exec.callback_group->can_be_taken_from().store(true);
} }
continue; continue;
@ -191,16 +242,21 @@ void ROSDefaultMultithreadedExecutor::run(size_t thread_number) {
scheduled_timers_.insert(any_exec.timer); scheduled_timers_.insert(any_exec.timer);
} }
} }
// if (yield_before_execute_)
// {
// std::this_thread::yield();
// }
execute_any_executable(any_exec); execute_any_executable(any_exec);
if (any_exec.timer) { if (any_exec.timer)
{
auto wait_mutex = ROSDefaultMultithreadedExecutor::wait_mutex_set_[this]; auto wait_mutex = ROSDefaultMultithreadedExecutor::wait_mutex_set_[this];
auto high_priority_wait_mutex = wait_mutex->get_high_priority_lockable(); auto high_priority_wait_mutex = wait_mutex->get_high_priority_lockable();
std::lock_guard<rclcpp::detail::MutexTwoPriorities::HighPriorityLockable> std::lock_guard<rclcpp::detail::MutexTwoPriorities::HighPriorityLockable> wait_lock(high_priority_wait_mutex);
wait_lock(high_priority_wait_mutex); auto it = scheduled_timers_.find(any_exec.timer);
auto const it = scheduled_timers_.find(any_exec.timer); if (it != scheduled_timers_.end())
if (it != scheduled_timers_.end()) { {
scheduled_timers_.erase(it); scheduled_timers_.erase(it);
} }
} }
@ -209,57 +265,3 @@ void ROSDefaultMultithreadedExecutor::run(size_t thread_number) {
any_exec.callback_group.reset(); any_exec.callback_group.reset();
} }
} }
bool ROSDefaultMultithreadedExecutor::get_next_executable(rclcpp::AnyExecutable& any_executable,
std::chrono::nanoseconds timeout) {
bool success = false;
// Check to see if there are any subscriptions or timers needing service
// TODO(wjwwood): improve run to run efficiency of this function
// try to get an executable
// record the start time
auto const start = std::chrono::steady_clock::now();
success = get_next_ready_executable(any_executable);
// and the end time
auto const end = std::chrono::steady_clock::now();
std::stringstream oss;
oss << "{\"operation\":\"get_next_executable\", \"result\":\"" << success
<< "\", \"duration\":\""
<< std::chrono::duration_cast<std::chrono::microseconds>(end - start).count()
<< "\"}";
log_entry(logger_, oss.str());
// If there are none
if (!success) {
// Wait for subscriptions or timers to work on
// queue refresh
auto const wait_start = std::chrono::steady_clock::now();
wait_for_work(timeout);
// and the end time
auto const wait_end = std::chrono::steady_clock::now();
auto const wait_duration = std::chrono::duration_cast<std::chrono::microseconds>(
wait_end - wait_start).count();
oss.str("");
oss << "{\"operation\":\"wait_for_work\", \"result\":\"" << success
<< "\", \"wait_duration\":\"" << wait_duration << "\"}";
log_entry(logger_, oss.str());
if (!spinning.load()) {
return false;
}
// Try again
auto const retry_start = std::chrono::steady_clock::now();
success = get_next_ready_executable(any_executable);
// and the end time
auto const retry_end = std::chrono::steady_clock::now();
oss.str("");
oss << "{\"operation\":\"get_next_executable\", \"result\":\"" << success
<< "\", \"duration\":\""
<< std::chrono::duration_cast<std::chrono::microseconds>(
retry_end - retry_start).count()
<< "\"}";
log_entry(logger_, oss.str());
}
return success;
}
} // namespace priority_executor

View file

@ -1,32 +1,15 @@
#include "priority_executor/multithread_priority_executor.hpp" #include "priority_executor/multithread_priority_executor.hpp"
#include "priority_executor/performance_monitor.hpp" namespace timed_executor
{
#include <chrono> std::unordered_map<MultithreadTimedExecutor *, std::shared_ptr<rclcpp::detail::MutexTwoPriorities>>
#include <functional>
#include <memory>
#include <mutex>
#include <sstream>
#include <thread>
#include <unordered_map>
#include <nlohmann/json.hpp>
#include "rcpputils/scope_exit.hpp"
#include "rclcpp/any_executable.hpp"
#include "simple_timer/rt-sched.hpp"
namespace priority_executor {
std::unordered_map<MultithreadTimedExecutor*,
std::shared_ptr<rclcpp::detail::MutexTwoPriorities>>
MultithreadTimedExecutor::wait_mutex_set_; MultithreadTimedExecutor::wait_mutex_set_;
std::mutex MultithreadTimedExecutor::shared_wait_mutex_; std::mutex MultithreadTimedExecutor::shared_wait_mutex_;
MultithreadTimedExecutor::MultithreadTimedExecutor( MultithreadTimedExecutor::MultithreadTimedExecutor(
rclcpp::ExecutorOptions const &options, const rclcpp::ExecutorOptions &options,
std::string const &name, std::string name,
int number_of_threads, int number_of_threads, std::chrono::nanoseconds next_exec_timeout)
std::chrono::nanoseconds next_exec_timeout) : TimedExecutor(options, name)
: TimedExecutor(options, name) { {
std::lock_guard<std::mutex> wait_lock(MultithreadTimedExecutor::shared_wait_mutex_); std::lock_guard<std::mutex> wait_lock(MultithreadTimedExecutor::shared_wait_mutex_);
wait_mutex_set_[this] = std::make_shared<rclcpp::detail::MutexTwoPriorities>(); wait_mutex_set_[this] = std::make_shared<rclcpp::detail::MutexTwoPriorities>();
number_of_threads_ = number_of_threads; number_of_threads_ = number_of_threads;
@ -39,8 +22,10 @@ size_t MultithreadTimedExecutor::get_number_of_threads()
return number_of_threads_; return number_of_threads_;
} }
void MultithreadTimedExecutor::spin() { void MultithreadTimedExecutor::spin()
if (spinning.exchange(true)) { {
if (spinning.exchange(true))
{
throw std::runtime_error("spin() called while already spinning"); throw std::runtime_error("spin() called while already spinning");
} }
@ -49,239 +34,81 @@ void MultithreadTimedExecutor::spin() {
{ {
auto wait_mutex = MultithreadTimedExecutor::wait_mutex_set_[this]; auto wait_mutex = MultithreadTimedExecutor::wait_mutex_set_[this];
auto low_priority_wait_mutex = wait_mutex->get_low_priority_lockable(); auto low_priority_wait_mutex = wait_mutex->get_low_priority_lockable();
std::lock_guard<rclcpp::detail::MutexTwoPriorities::LowPriorityLockable> std::lock_guard<rclcpp::detail::MutexTwoPriorities::LowPriorityLockable> wait_lock(low_priority_wait_mutex);
wait_lock(low_priority_wait_mutex); for (; thread_id < number_of_threads_ - 1; ++thread_id)
for (; thread_id < number_of_threads_ - 1; ++thread_id) { {
auto const func = std::bind(&MultithreadTimedExecutor::run, this, thread_id); auto func = std::bind(&MultithreadTimedExecutor::run, this, thread_id);
threads.emplace_back(func); threads.emplace_back(func);
} }
} }
run(thread_id); run(thread_id);
for (auto& thread : threads) { for (auto &thread : threads)
{
thread.join(); thread.join();
} }
spinning.store(false);
} }
void MultithreadTimedExecutor::run(size_t _thread_number) { void MultithreadTimedExecutor::run(size_t thread_number)
{
// set affinity // set affinity
cpu_set_t cpuset; cpu_set_t cpuset;
CPU_ZERO(&cpuset); CPU_ZERO(&cpuset);
CPU_SET(_thread_number, &cpuset); CPU_SET(thread_number, &cpuset);
int rc = pthread_setaffinity_np(pthread_self(), sizeof(cpu_set_t), &cpuset); int rc = pthread_setaffinity_np(pthread_self(), sizeof(cpu_set_t), &cpuset);
if (rc != 0) if (rc != 0)
{ {
RCLCPP_ERROR(rclcpp::get_logger("priority_executor"), std::cout << "Error calling pthread_setaffinity_np: " << rc << "\n";
"Error calling pthread_setaffinity_np: %d", rc);
} }
// Create thread context information for logs while (rclcpp::ok(this->context_) && spinning.load())
nlohmann::json thread_info; {
thread_info["thread_id"] = _thread_number; rclcpp::AnyExecutable any_executable;
thread_info["cpu_affinity"] = _thread_number;
std::string thread_info_str = thread_info.dump();
while (rclcpp::ok(this->context_) && spinning.load()) {
rclcpp::AnyExecutable any_exec;
{ {
auto wait_mutex = MultithreadTimedExecutor::wait_mutex_set_[this]; auto wait_mutex = MultithreadTimedExecutor::wait_mutex_set_[this];
auto low_priority_wait_mutex = wait_mutex->get_low_priority_lockable(); auto low_priority_wait_mutex = wait_mutex->get_low_priority_lockable();
std::lock_guard<rclcpp::detail::MutexTwoPriorities::LowPriorityLockable> std::lock_guard<rclcpp::detail::MutexTwoPriorities::LowPriorityLockable> wait_lock(low_priority_wait_mutex);
wait_lock(low_priority_wait_mutex); if (!rclcpp::ok(this->context_) || !spinning.load())
if (!rclcpp::ok(this->context_) || !spinning.load()) { {
return; return;
} }
if (!get_next_executable(any_exec, next_exec_timeout_)) { if (!get_next_executable(any_executable, next_exec_timeout_))
{
continue; continue;
} }
if (any_exec.timer) { if (any_executable.timer)
// Guard against multiple threads getting the same timer. {
if (scheduled_timers_.count(any_exec.timer) != 0) { if (scheduled_timers_.count(any_executable.timer) != 0)
// Make sure that any_exec's callback group is reset before {
// the lock is released. if (any_executable.callback_group)
if (any_exec.callback_group) { {
any_exec.callback_group->can_be_taken_from().store(true); any_executable.callback_group->can_be_taken_from().store(true);
} }
continue; continue;
} }
scheduled_timers_.insert(any_exec.timer); scheduled_timers_.insert(any_executable.timer);
} }
} }
execute_any_executable(any_executable);
// Record execution start with thread info if (any_executable.timer)
auto start = std::chrono::steady_clock::now(); {
uint64_t start_time = std::chrono::duration_cast<std::chrono::microseconds>(
start.time_since_epoch()).count();
PerformanceEvent start_event;
start_event.timestamp = start_time;
start_event.type = PerformanceEventType::CALLBACK_START;
start_event.executor_id = getExecutorId();
if (any_exec.node_base) {
start_event.node_name = any_exec.node_base->get_name();
// Enhanced callback identification based on executable type
if (any_exec.timer) {
// For timer callbacks
std::stringstream ss;
// retrieve the timer period
int64_t period;
rcl_timer_get_period(any_exec.timer->get_timer_handle().get(), &period);
// convert period (which is in nanoseconds) to milliseconds
std::chrono::nanoseconds period_ns(period);
std::chrono::milliseconds period_ms = std::chrono::duration_cast<std::chrono::milliseconds>(period_ns);
ss << "timer_" << period_ms.count() << "ms";
start_event.callback_name = ss.str();
// Add memory address as unique identifier
std::stringstream addr;
addr << "@" << any_exec.timer.get();
nlohmann::json additional;
additional["callback_type"] = "timer";
additional["timer_addr"] = addr.str();
additional["thread_info"] = thread_info;
start_event.additional_data = additional.dump();
}
else if (any_exec.subscription) {
// For subscription callbacks
start_event.callback_name = any_exec.subscription->get_topic_name();
nlohmann::json additional;
additional["callback_type"] = "subscription";
additional["thread_info"] = thread_info;
start_event.additional_data = additional.dump();
}
else if (any_exec.service) {
start_event.callback_name = any_exec.service->get_service_name();
nlohmann::json additional;
additional["callback_type"] = "service";
additional["thread_info"] = thread_info;
start_event.additional_data = additional.dump();
}
else if (any_exec.client) {
start_event.callback_name = "client_callback";
nlohmann::json additional;
additional["callback_type"] = "client";
additional["thread_info"] = thread_info;
start_event.additional_data = additional.dump();
}
else if (any_exec.waitable) {
start_event.callback_name = "waitable_callback";
nlohmann::json additional;
additional["callback_type"] = "waitable";
additional["thread_info"] = thread_info;
start_event.additional_data = additional.dump();
}
// Add chain information
std::shared_ptr<PriorityExecutable> executable = nullptr;
if (prio_memory_strategy_) {
if (any_exec.timer) {
executable = prio_memory_strategy_->get_priority_settings(
any_exec.timer->get_timer_handle());
} else if (any_exec.subscription) {
executable = prio_memory_strategy_->get_priority_settings(
any_exec.subscription->get_subscription_handle());
}
}
if (executable) {
start_event.chain_id = executable->chain_id;
start_event.is_first_in_chain = executable->is_first_in_chain;
start_event.is_last_in_chain = executable->is_last_in_chain;
if (executable->deadlines && !executable->deadlines->empty()) {
start_event.deadline = executable->deadlines->front();
}
// Include the priority information
try {
nlohmann::json additional = start_event.additional_data.empty() ?
nlohmann::json() : nlohmann::json::parse(start_event.additional_data);
additional["priority"] = executable->priority;
if (executable->chain_id >= 0) {
additional["chain_id"] = executable->chain_id;
}
if (executable->sched_type != ExecutableScheduleType::DEFAULT) {
additional["schedule_type"] = static_cast<int>(executable->sched_type);
}
start_event.additional_data = additional.dump();
} catch (...) {
// If there was an error parsing the JSON, create new JSON object
nlohmann::json additional;
additional["priority"] = executable->priority;
additional["thread_info"] = thread_info;
start_event.additional_data = additional.dump();
}
}
} else {
start_event.additional_data = thread_info_str;
}
PerformanceMonitor::getInstance().recordEvent(start_event);
// Execute the callback
execute_any_executable(any_exec);
// Record execution completion
auto end = std::chrono::steady_clock::now();
uint64_t end_time = std::chrono::duration_cast<std::chrono::microseconds>(
end.time_since_epoch()).count();
uint64_t processing_time = end_time - start_time;
// Create identical event structure but for end event
PerformanceEvent end_event = start_event;
end_event.timestamp = end_time;
end_event.type = PerformanceEventType::CALLBACK_END;
end_event.processing_time = processing_time;
// Update the additional data to include processing time
try {
nlohmann::json additional = end_event.additional_data.empty() ?
nlohmann::json() : nlohmann::json::parse(end_event.additional_data);
additional["processing_time_us"] = processing_time;
end_event.additional_data = additional.dump();
} catch (...) {
// If JSON parsing failed, create a new object
nlohmann::json additional;
additional["processing_time_us"] = processing_time;
additional["thread_info"] = thread_info;
end_event.additional_data = additional.dump();
}
PerformanceMonitor::getInstance().recordEvent(end_event);
if (any_exec.timer) {
auto wait_mutex = MultithreadTimedExecutor::wait_mutex_set_[this]; auto wait_mutex = MultithreadTimedExecutor::wait_mutex_set_[this];
auto high_priority_wait_mutex = wait_mutex->get_high_priority_lockable(); auto high_priority_wait_mutex = wait_mutex->get_high_priority_lockable();
std::lock_guard<rclcpp::detail::MutexTwoPriorities::HighPriorityLockable> std::lock_guard<rclcpp::detail::MutexTwoPriorities::HighPriorityLockable> wait_lock(high_priority_wait_mutex);
wait_lock(high_priority_wait_mutex); auto it = scheduled_timers_.find(any_executable.timer);
auto const it = scheduled_timers_.find(any_exec.timer); if (it != scheduled_timers_.end())
if (it != scheduled_timers_.end()) { {
scheduled_timers_.erase(it); scheduled_timers_.erase(it);
} }
} }
// Clear the callback_group to prevent the AnyExecutable destructor from any_executable.callback_group.reset();
// resetting the callback group `can_be_taken_from`
any_exec.callback_group.reset();
if (prio_memory_strategy_ != nullptr) if (prio_memory_strategy_ != nullptr)
{ {
auto wait_mutex = MultithreadTimedExecutor::wait_mutex_set_[this]; auto wait_mutex = MultithreadTimedExecutor::wait_mutex_set_[this];
auto low_priority_wait_mutex = wait_mutex->get_low_priority_lockable(); auto low_priority_wait_mutex = wait_mutex->get_low_priority_lockable();
std::lock_guard<rclcpp::detail::MutexTwoPriorities::LowPriorityLockable> wait_lock(low_priority_wait_mutex); std::lock_guard<rclcpp::detail::MutexTwoPriorities::LowPriorityLockable> wait_lock(low_priority_wait_mutex);
prio_memory_strategy_->post_execute(any_exec, _thread_number); // std::shared_ptr<PriorityMemoryStrategy<>> prio_memory_strategy_ = std::dynamic_pointer_cast<PriorityMemoryStrategy>(memory_strategy_);
prio_memory_strategy_->post_execute(any_executable, thread_number);
}
} }
} }
} }
} // namespace priority_executor

View file

@ -1,149 +0,0 @@
#include "priority_executor/performance_monitor.hpp"
#include <filesystem>
#include <fstream>
#include <nlohmann/json.hpp>
#include <sstream>
#include <iomanip>
namespace priority_executor {
PerformanceMonitor& PerformanceMonitor::getInstance() {
static PerformanceMonitor instance;
return instance;
}
PerformanceMonitor::PerformanceMonitor() {
event_buffer_.reserve(buffer_size_);
std::filesystem::create_directories(dump_path_);
}
void PerformanceMonitor::recordEvent(const PerformanceEvent& event) {
if (!enabled_) return;
{
std::lock_guard<std::mutex> lock(event_mutex_);
event_buffer_.push_back(event);
if (auto_dump_threshold_ > 0 && event_buffer_.size() >= auto_dump_threshold_) {
checkAndAutoDump();
}
}
}
void PerformanceMonitor::setBufferSize(size_t size) {
std::lock_guard<std::mutex> lock(event_mutex_);
buffer_size_ = size;
event_buffer_.reserve(size);
}
void PerformanceMonitor::setAutoDumpThreshold(size_t threshold) {
auto_dump_threshold_ = threshold;
}
void PerformanceMonitor::setDumpPath(const std::string& path) {
dump_path_ = path;
std::filesystem::create_directories(dump_path_);
}
void PerformanceMonitor::enable(bool enabled) {
enabled_ = enabled;
}
void PerformanceMonitor::clear() {
std::lock_guard<std::mutex> lock(event_mutex_);
event_buffer_.clear();
}
std::string PerformanceMonitor::eventsToJson() const {
nlohmann::json root = nlohmann::json::array();
for (const auto& event : event_buffer_) {
nlohmann::json event_json;
event_json["timestamp"] = event.timestamp;
switch (event.type) {
case PerformanceEventType::CALLBACK_READY:
event_json["type"] = "callback_ready";
break;
case PerformanceEventType::CALLBACK_START:
event_json["type"] = "callback_start";
break;
case PerformanceEventType::CALLBACK_END:
event_json["type"] = "callback_end";
break;
case PerformanceEventType::DEADLINE_MISSED:
event_json["type"] = "deadline_missed";
break;
case PerformanceEventType::DEADLINE_MET:
event_json["type"] = "deadline_met";
break;
case PerformanceEventType::CHAIN_START:
event_json["type"] = "chain_start";
break;
case PerformanceEventType::CHAIN_END:
event_json["type"] = "chain_end";
break;
}
event_json["node_name"] = event.node_name;
event_json["callback_name"] = event.callback_name;
event_json["chain_id"] = event.chain_id;
event_json["is_first_in_chain"] = event.is_first_in_chain;
event_json["is_last_in_chain"] = event.is_last_in_chain;
if (event.deadline > 0) {
event_json["deadline"] = event.deadline;
}
if (event.processing_time > 0) {
event_json["processing_time"] = event.processing_time;
}
event_json["executor_id"] = event.executor_id;
if (!event.additional_data.empty()) {
try {
event_json["additional_data"] = nlohmann::json::parse(event.additional_data);
} catch (...) {
event_json["additional_data"] = event.additional_data;
}
}
root.push_back(event_json);
}
return root.dump(2); // Indent with 2 spaces
}
bool PerformanceMonitor::dumpToFile(const std::string& filename) {
std::lock_guard<std::mutex> lock(event_mutex_);
if (event_buffer_.empty()) {
return true; // Nothing to dump
}
std::string full_path = dump_path_ + "/" + filename;
std::ofstream file(full_path);
if (!file.is_open()) {
return false;
}
file << eventsToJson();
event_buffer_.clear();
return true;
}
void PerformanceMonitor::checkAndAutoDump() {
if (event_buffer_.size() >= auto_dump_threshold_) {
auto now = std::chrono::system_clock::now();
auto timestamp = std::chrono::duration_cast<std::chrono::milliseconds>(
now.time_since_epoch()).count();
std::stringstream ss;
ss << "performance_log_" << timestamp << ".json";
dumpToFile(ss.str());
}
}
} // namespace priority_executor

View file

@ -19,75 +19,72 @@
#include "rclcpp/utilities.hpp" #include "rclcpp/utilities.hpp"
#include <memory> #include <memory>
#include <sched.h> #include <sched.h>
#include <unistd.h> // for sleep // for sleep
#include <unistd.h>
namespace priority_executor namespace timed_executor
{ {
TimedExecutor::TimedExecutor(rclcpp::ExecutorOptions const &options, std::string name) TimedExecutor::TimedExecutor(const rclcpp::ExecutorOptions &options, std::string name)
: rclcpp::Executor(options) { : rclcpp::Executor(options)
this->name = std::move(name); {
this->name = name;
logger_ = create_logger(); logger_ = create_logger();
} }
TimedExecutor::~TimedExecutor() = default; TimedExecutor::~TimedExecutor() {}
void TimedExecutor::spin() { void
if (spinning.exchange(true)) { TimedExecutor::spin()
{
if (spinning.exchange(true))
{
throw std::runtime_error("spin() called while already spinning"); throw std::runtime_error("spin() called while already spinning");
} }
RCLCPP_SCOPE_EXIT(this->spinning.store(false);); RCLCPP_SCOPE_EXIT(this->spinning.store(false););
while (rclcpp::ok(this->context_) && spinning.load())
using namespace std::chrono_literals; {
std::chrono::time_point start = std::chrono::steady_clock::now();
while (rclcpp::ok(this->context_) && spinning.load() && (std::chrono::steady_clock::now() - start) <= 5s) {
rclcpp::AnyExecutable any_executable; rclcpp::AnyExecutable any_executable;
// std::cout<<memory_strategy_->number_of_ready_timers()<<std::endl; // std::cout<<memory_strategy_->number_of_ready_timers()<<std::endl;
// std::cout << "spinning " << this->name << std::endl; // std::cout << "spinning " << this->name << std::endl;
// size_t ready = memory_strategy_->number_of_ready_subscriptions(); // size_t ready = memory_strategy_->number_of_ready_subscriptions();
// std::cout << "ready:" << ready << std::endl; // std::cout << "ready:" << ready << std::endl;
if (get_next_executable(any_executable, std::chrono::nanoseconds(-1))) { if (get_next_executable(any_executable, std::chrono::nanoseconds(-1)))
{
execute_any_executable(any_executable); execute_any_executable(any_executable);
// make sure memory_strategy_ is an instance of PriorityMemoryStrategy // make sure memory_strategy_ is an instance of PriorityMemoryStrategy
if (prio_memory_strategy_ != nullptr) { if (prio_memory_strategy_!=nullptr)
{
prio_memory_strategy_->post_execute(any_executable); prio_memory_strategy_->post_execute(any_executable);
} }
} }
} }
RCLCPP_INFO(rclcpp::get_logger("priority_executor"), "priority executor shutdown"); std::cout << "shutdown" << std::endl;
} }
bool TimedExecutor::get_next_executable( bool TimedExecutor::get_next_executable(rclcpp::AnyExecutable &any_executable, std::chrono::nanoseconds timeout)
rclcpp::AnyExecutable& any_executable, {
std::chrono::nanoseconds timeout) {
bool success = false; bool success = false;
// Check to see if there are any subscriptions or timers needing service // Check to see if there are any subscriptions or timers needing service
// TODO(wjwwood): improve run to run efficiency of this function // TODO(wjwwood): improve run to run efficiency of this function
// sched_yield(); // sched_yield();
// sleep for 10us // sleep for 10us
// usleep(20); // usleep(20);
auto start = std::chrono::steady_clock::now();
auto const start = std::chrono::steady_clock::now();
wait_for_work(timeout); wait_for_work(timeout);
auto const end = std::chrono::steady_clock::now(); auto end = std::chrono::steady_clock::now();
auto const wait_duration = std::chrono::duration_cast<std::chrono::microseconds>(end - start); auto wait_duration = std::chrono::duration_cast<std::chrono::microseconds>(end - start);
std::ostringstream oss; std::ostringstream oss;
oss << "{\"operation\": \"wait_for_work\", \"wait_duration\": " << wait_duration.count() << "}"; oss << "{\"operation\": \"wait_for_work\", \"wait_duration\": " << wait_duration.count() << "}";
log_entry(logger_, oss.str()); log_entry(logger_, oss.str());
auto const exec_start = std::chrono::steady_clock::now(); start = std::chrono::steady_clock::now();
success = get_next_ready_executable(any_executable); success = get_next_ready_executable(any_executable);
auto const exec_end = std::chrono::steady_clock::now(); end = std::chrono::steady_clock::now();
auto const get_next_duration = std::chrono::duration_cast<std::chrono::microseconds>(exec_end - exec_start); auto get_next_duration = std::chrono::duration_cast<std::chrono::microseconds>(end - start);
oss.str(""); oss.str("");
oss << "{\"operation\": \"get_next_executable\", \"duration\": " << get_next_duration.count() oss << "{\"operation\": \"get_next_executable\", \"duration\": " << get_next_duration.count() << ", \"result\": " << success << "}";
<< ", \"result\": " << success << "}";
log_entry(logger_, oss.str()); log_entry(logger_, oss.str());
return success; return success;
} }

File diff suppressed because it is too large Load diff

View file

@ -1,12 +1,10 @@
#include "rclcpp/rclcpp.hpp"
#include "priority_executor/priority_executor.hpp"
#include "priority_executor/priority_memory_strategy.hpp"
#include <string> #include <string>
#include <fstream> #include <fstream>
#include <unistd.h> #include <unistd.h>
#include "std_msgs/msg/string.hpp"
#include <rclcpp/rclcpp.hpp>
#include <std_msgs/msg/string.hpp>
#include "priority_executor/priority_executor.hpp"
#include "priority_executor/priority_memory_strategy.hpp"
// re-create the classic talker-listener example with two listeners // re-create the classic talker-listener example with two listeners
class Talker : public rclcpp::Node class Talker : public rclcpp::Node
@ -17,8 +15,7 @@ public:
// Create a publisher on the "chatter" topic with 10 msg queue size. // Create a publisher on the "chatter" topic with 10 msg queue size.
pub_ = this->create_publisher<std_msgs::msg::String>("chatter", 10); pub_ = this->create_publisher<std_msgs::msg::String>("chatter", 10);
// Create a timer of period 1s that calls our callback member function. // Create a timer of period 1s that calls our callback member function.
timer_ = this->create_wall_timer(std::chrono::seconds(1), timer_ = this->create_wall_timer(std::chrono::seconds(1), std::bind(&Talker::timer_callback, this));
std::bind(&Talker::timer_callback, this));
} }
// the timer must be public // the timer must be public
rclcpp::TimerBase::SharedPtr timer_; rclcpp::TimerBase::SharedPtr timer_;
@ -26,11 +23,15 @@ public:
private: private:
void timer_callback() void timer_callback()
{ {
// Create a message and publish it 10 times.
std_msgs::msg::String msg; std_msgs::msg::String msg;
msg.data = "Hello World!"; msg.data = "Hello World!";
for (int i = 0; i < 10; ++i)
{
RCLCPP_INFO(this->get_logger(), "Publishing: '%s'", msg.data.c_str()); RCLCPP_INFO(this->get_logger(), "Publishing: '%s'", msg.data.c_str());
pub_->publish(msg); pub_->publish(msg);
} }
}
rclcpp::Publisher<std_msgs::msg::String>::SharedPtr pub_; rclcpp::Publisher<std_msgs::msg::String>::SharedPtr pub_;
}; };
@ -39,11 +40,8 @@ class Listener : public rclcpp::Node
public: public:
Listener(std::string name) : Node(name) Listener(std::string name) : Node(name)
{ {
// Create a subscription on the "chatter" topic with the default callback // Create a subscription on the "chatter" topic with the default callback method.
// method. sub_ = this->create_subscription<std_msgs::msg::String>("chatter", 10, std::bind(&Listener::callback, this, std::placeholders::_1));
sub_ = this->create_subscription<std_msgs::msg::String>(
"chatter", 10,
std::bind(&Listener::callback, this, std::placeholders::_1));
} }
// the publisher must be public // the publisher must be public
rclcpp::Subscription<std_msgs::msg::String>::SharedPtr sub_; rclcpp::Subscription<std_msgs::msg::String>::SharedPtr sub_;
@ -55,55 +53,27 @@ private:
} }
}; };
int main(int argc, char* argv[]) { int main(int argc, char **argv)
{
rclcpp::init(argc, argv); rclcpp::init(argc, argv);
auto talker = std::make_shared<Talker>(); auto talker = std::make_shared<Talker>();
auto listener1 = std::make_shared<Listener>("listener1"); auto listener1 = std::make_shared<Listener>("listener1");
auto listener2 = std::make_shared<Listener>("listener2"); auto listener2 = std::make_shared<Listener>("listener2");
rclcpp::ExecutorOptions options; rclcpp::ExecutorOptions options;
auto strategy = std::make_shared<priority_executor::PriorityMemoryStrategy<>>(); auto strategy = std::make_shared<PriorityMemoryStrategy<>>();
options.memory_strategy = strategy; options.memory_strategy = strategy;
auto executor = new priority_executor::TimedExecutor(options); auto executor = new timed_executor::TimedExecutor(options);
// replace the above line with the following line to use the default executor
// which will intermix the execution of listener1 and listener2
// auto executor = std::make_shared<rclcpp::executors::SingleThreadedExecutor>(options);
// must be set to post_execute can set new deadlines strategy->set_executable_deadline(talker->timer_->get_timer_handle(), 100, TIMER);
executor->prio_memory_strategy_ = strategy; strategy->set_executable_deadline(listener1->sub_->get_subscription_handle(), 100, SUBSCRIPTION);
strategy->set_executable_deadline(listener2->sub_->get_subscription_handle(), 50, SUBSCRIPTION);
// the new funcitons in PriorityMemoryStrategy accept the handle of the
// timer/subscription as the first argument
strategy->set_executable_deadline(talker->timer_->get_timer_handle(), 1000,
priority_executor::ExecutableType::TIMER, 0);
// you _must_ set the timer_handle for each chain
strategy->get_priority_settings(talker->timer_->get_timer_handle())
->timer_handle = talker->timer_;
// you _must_ mark the first executable in the chain
strategy->set_first_in_chain(talker->timer_->get_timer_handle());
// set the same period and chain_id for each callback in the chain
strategy->set_executable_deadline(listener1->sub_->get_subscription_handle(),
1000, priority_executor::ExecutableType::SUBSCRIPTION, 0);
strategy->set_executable_deadline(listener2->sub_->get_subscription_handle(),
1000, priority_executor::ExecutableType::SUBSCRIPTION, 0);
// you _must_ mark the last executable in the chain (used to keep track of different instances of the same chain)
strategy->set_last_in_chain(listener2->sub_->get_subscription_handle());
// add all the nodes to the executor
executor->add_node(talker); executor->add_node(talker);
executor->add_node(listener1); executor->add_node(listener1);
executor->add_node(listener2); executor->add_node(listener2);
// if the executor behaves unexpectedly, you can print the priority settings to make sure they are correct
std::cout << *strategy->get_priority_settings(
talker->timer_->get_timer_handle())
<< std::endl;
std::cout << *strategy->get_priority_settings(
listener1->sub_->get_subscription_handle())
<< std::endl;
std::cout << *strategy->get_priority_settings(
listener2->sub_->get_subscription_handle())
<< std::endl;
executor->spin(); executor->spin();
rclcpp::shutdown();
return 0;
} }

View file

@ -1,51 +1,48 @@
cmake_minimum_required(VERSION 3.8) cmake_minimum_required(VERSION 3.5)
project(simple_timer VERSION 0.1.0) project(simple_timer)
# Set C++ standards # Default to C99
set(CMAKE_CXX_STANDARD 17) if(NOT CMAKE_C_STANDARD)
set(CMAKE_CXX_STANDARD_REQUIRED ON) set(CMAKE_C_STANDARD 99)
set(CMAKE_CXX_EXTENSIONS OFF) endif()
# Default to C++14
if(NOT CMAKE_CXX_STANDARD)
set(CMAKE_CXX_STANDARD 14)
endif()
# Compiler options
if(CMAKE_COMPILER_IS_GNUCXX OR CMAKE_CXX_COMPILER_ID MATCHES "Clang") if(CMAKE_COMPILER_IS_GNUCXX OR CMAKE_CXX_COMPILER_ID MATCHES "Clang")
add_compile_options(-Wall -Wextra -Wpedantic) add_compile_options(-Wall -Wextra -Wpedantic)
endif() endif()
# Find dependencies # find dependencies
find_package(ament_cmake REQUIRED) find_package(ament_cmake REQUIRED)
find_package(rclcpp REQUIRED) # uncomment the following section in order to fill in
# further dependencies manually.
# find_package(<dependency> REQUIRED)
# Library targets add_library(simple_timer src/cycle_timer.cpp src/period_timer.cpp)
add_library(simple_timer
src/cycle_timer.cpp
src/period_timer.cpp
)
target_include_directories(simple_timer PUBLIC target_include_directories(simple_timer PUBLIC
$<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}/include> $<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}/include>
$<INSTALL_INTERFACE:include> $<INSTALL_INTERFACE:include>
) )
ament_target_dependencies(simple_timer add_library(rt-sched src/rt-sched.cpp)
rclcpp
)
add_library(rt-sched
src/rt-sched.cpp
)
target_include_directories(rt-sched PUBLIC target_include_directories(rt-sched PUBLIC
$<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}/include> $<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}/include>
$<INSTALL_INTERFACE:include> $<INSTALL_INTERFACE:include>
) )
ament_target_dependencies(rt-sched
rclcpp
)
# Testing
if(BUILD_TESTING) if(BUILD_TESTING)
find_package(ament_lint_auto REQUIRED) find_package(ament_lint_auto REQUIRED)
# the following line skips the linter which checks for copyrights
# uncomment the line when a copyright and license is not present in all source files
#set(ament_cmake_copyright_FOUND TRUE)
# the following line skips cpplint (only works in a git repo)
# uncomment the line when this package is not in a git repo
#set(ament_cmake_cpplint_FOUND TRUE)
ament_lint_auto_find_test_dependencies() ament_lint_auto_find_test_dependencies()
endif() endif()
# Installation
install( install(
DIRECTORY include/ DIRECTORY include/
DESTINATION include DESTINATION include
@ -53,15 +50,12 @@ install(
install( install(
TARGETS simple_timer rt-sched TARGETS simple_timer rt-sched
EXPORT export_${PROJECT_NAME}
LIBRARY DESTINATION lib LIBRARY DESTINATION lib
ARCHIVE DESTINATION lib ARCHIVE DESTINATION lib
RUNTIME DESTINATION bin RUNTIME DESTINATION bin
INCLUDES DESTINATION include
) )
# Export and package configuration
ament_export_include_directories(include) ament_export_include_directories(include)
ament_export_targets(export_${PROJECT_NAME} HAS_LIBRARY_TARGET) ament_export_libraries(simple_timer)
ament_export_dependencies(rclcpp) ament_export_libraries(rt-sched)
ament_package() ament_package()

View file

@ -3,15 +3,14 @@
#include <memory> #include <memory>
#include "simple_timer/rt-sched.hpp" #include "simple_timer/rt-sched.hpp"
namespace simple_timer
namespace simple_timer { {
class CycleTimer
class CycleTimer { {
public: public:
explicit CycleTimer(long const start_delay = 0); CycleTimer(long start_delay=0);
void tick() ; void tick() ;
const u64 start_delay_time;
u64 const start_delay_time;
u64 start_time = 0; u64 start_time = 0;
u64 last_cycle_time = 0; u64 last_cycle_time = 0;
unsigned long max_diff = 0; unsigned long max_diff = 0;
@ -19,7 +18,6 @@ public:
unsigned long last_diff = 0; unsigned long last_diff = 0;
bool recording = false; bool recording = false;
}; };
} // namespace simple_timer } // namespace simple_timer
#endif #endif

View file

@ -3,24 +3,23 @@
#include <memory> #include <memory>
#include "simple_timer/rt-sched.hpp" #include "simple_timer/rt-sched.hpp"
namespace simple_timer
namespace simple_timer { {
class PeriodTimer
class PeriodTimer { {
public: public:
explicit PeriodTimer(long const start_delay = 0); PeriodTimer(long start_delay = 0);
void start(); void start();
void stop(); void stop();
const u64 start_delay_time;
u64 const start_delay_time;
u64 start_time = 0; u64 start_time = 0;
u64 last_period_time = 0; u64 last_period_time = 0;
unsigned long max_period = 0; unsigned long max_period = 0;
unsigned long min_period = 0; unsigned long min_period = 0;
unsigned long last_period = 0; unsigned long last_period = 0;
bool recording = false; bool recording = false;
}; };
} // namespace simple_timer } // namespace simple_timer
#endif #endif

View file

@ -47,7 +47,6 @@
typedef unsigned long long u64; typedef unsigned long long u64;
#define NS_TO_MS 1000000 #define NS_TO_MS 1000000
struct sched_attr { struct sched_attr {
uint32_t size; uint32_t size;
uint32_t sched_policy; uint32_t sched_policy;
@ -66,26 +65,28 @@ struct sched_attr {
}; };
int sched_setattr(pid_t pid, int sched_setattr(pid_t pid,
sched_attr const* attr, const struct sched_attr *attr,
unsigned int flags); unsigned int flags);
int sched_getattr(pid_t pid, int sched_getattr(pid_t pid,
sched_attr* attr, struct sched_attr *attr,
unsigned int size, unsigned int size,
unsigned int flags); unsigned int flags);
u64 get_time_us(void); u64 get_time_us(void);
typedef struct node_time_logger { typedef struct node_time_logger
{
std::shared_ptr<std::vector<std::pair<std::string, u64>>> recorded_times; std::shared_ptr<std::vector<std::pair<std::string, u64>>> recorded_times;
} node_time_logger; } node_time_logger;
void log_entry(node_time_logger logger, std::string const& text); void log_entry(node_time_logger logger, std::string text);
node_time_logger create_logger(); node_time_logger create_logger();
inline u64 get_time_us(void) { inline u64 get_time_us(void)
{
struct timespec ts; struct timespec ts;
u64 time; unsigned long long time;
clock_gettime(CLOCK_MONOTONIC_RAW, &ts); clock_gettime(CLOCK_MONOTONIC_RAW, &ts);
time = ts.tv_sec * 1000000; time = ts.tv_sec * 1000000;

View file

@ -2,19 +2,13 @@
<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?> <?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
<package format="3"> <package format="3">
<name>simple_timer</name> <name>simple_timer</name>
<version>0.1.0</version> <version>0.0.0</version>
<description> <description>TODO: Package description</description>
ROS 2 package providing timer functionality for the dynamic executor experiments <maintainer email="kurt4wilson@gmail.com">nvidia</maintainer>
</description> <license>TODO: License declaration</license>
<maintainer email="kurt4wilson@gmail.com">kurt</maintainer>
<license>Apache License 2.0</license>
<buildtool_depend>ament_cmake</buildtool_depend> <buildtool_depend>ament_cmake</buildtool_depend>
<!-- Build Dependencies -->
<depend>rclcpp</depend>
<!-- Test Dependencies -->
<test_depend>ament_lint_auto</test_depend> <test_depend>ament_lint_auto</test_depend>
<test_depend>ament_lint_common</test_depend> <test_depend>ament_lint_common</test_depend>

View file

@ -1,34 +1,43 @@
#include "simple_timer/cycle_timer.hpp" #include "simple_timer/cycle_timer.hpp"
namespace simple_timer { namespace simple_timer
{
CycleTimer::CycleTimer(long const start_delay) CycleTimer::CycleTimer(long start_delay) : start_delay_time(start_delay * 1000)
: start_delay_time(start_delay * 1000) { {
} }
void CycleTimer::tick() { void CycleTimer::tick()
u64 const current_wall_time = get_time_us(); {
u64 current_wall_time = get_time_us();
u64 time_diff = 0; u64 time_diff = 0;
if (!recording) { if (!recording)
if (start_time == 0) { {
if (start_time == 0)
{
start_time = current_wall_time; start_time = current_wall_time;
} else if (current_wall_time - start_time > start_delay_time) { }
else if (current_wall_time - start_time > start_delay_time)
{
recording = true; recording = true;
last_cycle_time = current_wall_time; last_cycle_time = current_wall_time;
start_time = current_wall_time; start_time = current_wall_time;
} }
} else { }
else
{
time_diff = current_wall_time - last_cycle_time; time_diff = current_wall_time - last_cycle_time;
if (time_diff < min_diff || min_diff == 0) { if (time_diff < min_diff || min_diff == 0)
{
min_diff = time_diff; min_diff = time_diff;
} }
if (time_diff > max_diff || max_diff == 0) { if (time_diff > max_diff || max_diff == 0)
{
max_diff = time_diff; max_diff = time_diff;
} }
last_cycle_time = current_wall_time; last_cycle_time = current_wall_time;
last_diff = time_diff; last_diff = time_diff;
} }
} }
} // namespace simple_timer } // namespace simple_timer

View file

@ -1,43 +1,56 @@
#include "simple_timer/period_timer.hpp" #include "simple_timer/period_timer.hpp"
namespace simple_timer { namespace simple_timer
{
PeriodTimer::PeriodTimer(long const start_delay) PeriodTimer::PeriodTimer(long start_delay) : start_delay_time(start_delay * 1000)
: start_delay_time(start_delay * 1000) { {
} }
void PeriodTimer::start() { void PeriodTimer::start()
u64 const current_wall_time = get_time_us(); {
u64 current_wall_time = get_time_us();
if (!recording) { if (!recording)
if (start_time == 0) { {
if (start_time == 0)
{
start_time = current_wall_time; start_time = current_wall_time;
} else if (current_wall_time - start_time > start_delay_time) { }
else if (current_wall_time - start_time > start_delay_time)
{
recording = true; recording = true;
start_time = current_wall_time; start_time = current_wall_time;
last_period_time = current_wall_time; last_period_time = current_wall_time;
} }
} else { }
else
{
last_period_time = current_wall_time; last_period_time = current_wall_time;
} }
} }
void PeriodTimer::stop()
void PeriodTimer::stop() { {
u64 const current_wall_time = get_time_us(); u64 current_wall_time = get_time_us();
u64 time_diff = 0; u64 time_diff = 0;
if (!recording) { if (!recording)
{
return; return;
} }
else
{
time_diff = current_wall_time - last_period_time; time_diff = current_wall_time - last_period_time;
if (time_diff < min_period || min_period == 0) { if (time_diff < min_period || min_period == 0)
{
min_period = time_diff; min_period = time_diff;
} }
if (time_diff > max_period || max_period == 0) { if (time_diff > max_period || max_period == 0)
{
max_period = time_diff; max_period = time_diff;
} }
last_period = time_diff; last_period = time_diff;
} }
}
} // namespace simple_timer } // namespace simple_timer

View file

@ -13,33 +13,37 @@
#include <sys/syscall.h> #include <sys/syscall.h>
#include <time.h> #include <time.h>
#include <iostream> #include <iostream>
#include "simple_timer/rt-sched.hpp" #include "simple_timer/rt-sched.hpp"
int sched_setattr( int sched_setattr(pid_t pid,
pid_t pid, const struct sched_attr *attr,
sched_attr const* attr, unsigned int flags)
unsigned int flags) { {
return syscall(__NR_sched_setattr, pid, attr, flags); return syscall(__NR_sched_setattr, pid, attr, flags);
} }
int sched_getattr( int sched_getattr(pid_t pid,
pid_t pid, struct sched_attr *attr,
sched_attr* attr,
unsigned int size, unsigned int size,
unsigned int flags) { unsigned int flags)
{
return syscall(__NR_sched_getattr, pid, attr, size, flags); return syscall(__NR_sched_getattr, pid, attr, size, flags);
} }
void log_entry(node_time_logger logger, std::string const& text) { void log_entry(node_time_logger logger, std::string text)
if (logger.recorded_times != nullptr) { {
logger.recorded_times->emplace_back(text, get_time_us() / 1000); if (logger.recorded_times != nullptr)
{
logger.recorded_times->push_back(std::make_pair(text, get_time_us() / 1000));
// std::cout<<text<<std::endl; // std::cout<<text<<std::endl;
}else{ }else{
// TODO: report error // TODO: report error
} }
} }
node_time_logger create_logger() { node_time_logger create_logger()
{
node_time_logger logger; node_time_logger logger;
logger.recorded_times = std::make_shared<std::vector<std::pair<std::string, u64>>>(); logger.recorded_times = std::make_shared<std::vector<std::pair<std::string, u64>>>();
return logger; return logger;