Compare commits

..

1 commit

Author SHA1 Message Date
9fea6a3950 wip - likely stupid 2025-04-01 16:03:45 +02:00
15 changed files with 107 additions and 821 deletions

View file

@ -19,16 +19,11 @@ find_package(rmw REQUIRED)
find_package(std_msgs REQUIRED) find_package(std_msgs REQUIRED)
find_package(std_srvs REQUIRED) find_package(std_srvs REQUIRED)
find_package(simple_timer REQUIRED) find_package(simple_timer REQUIRED)
find_package(nlohmann_json REQUIRED)
# Library targets # Library targets
add_library(priority_executor add_library(priority_executor
src/priority_executor.cpp src/priority_executor.cpp
src/priority_memory_strategy.cpp src/priority_memory_strategy.cpp
src/performance_monitor.cpp
src/default_executor.cpp
src/usage_example.cpp
src/multithread_priority_executor.cpp
) )
target_include_directories(priority_executor PUBLIC target_include_directories(priority_executor PUBLIC
$<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}/include> $<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}/include>
@ -39,7 +34,35 @@ ament_target_dependencies(priority_executor
rclcpp rclcpp
rcl rcl
simple_timer simple_timer
nlohmann_json )
add_library(multithread_priority_executor
src/multithread_priority_executor.cpp
src/priority_memory_strategy.cpp
)
target_include_directories(multithread_priority_executor PUBLIC
$<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}/include>
$<INSTALL_INTERFACE:include>
)
ament_target_dependencies(multithread_priority_executor
rmw
rclcpp
rcl
simple_timer
)
add_library(default_executor
src/default_executor.cpp
)
target_include_directories(default_executor PUBLIC
$<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}/include>
$<INSTALL_INTERFACE:include>
)
ament_target_dependencies(default_executor
rmw
rclcpp
rcl
simple_timer
) )
# Example executable # Example executable
@ -52,6 +75,8 @@ target_include_directories(usage_example PUBLIC
) )
target_link_libraries(usage_example target_link_libraries(usage_example
priority_executor priority_executor
multithread_priority_executor
default_executor
) )
ament_target_dependencies(usage_example ament_target_dependencies(usage_example
rclcpp rclcpp
@ -73,7 +98,7 @@ install(
) )
install( install(
TARGETS priority_executor TARGETS priority_executor multithread_priority_executor default_executor
EXPORT export_${PROJECT_NAME} EXPORT export_${PROJECT_NAME}
ARCHIVE DESTINATION lib ARCHIVE DESTINATION lib
LIBRARY DESTINATION lib LIBRARY DESTINATION lib
@ -89,5 +114,5 @@ install(
# Export and package configuration # Export and package configuration
ament_export_include_directories(include) ament_export_include_directories(include)
ament_export_targets(export_${PROJECT_NAME} HAS_LIBRARY_TARGET) ament_export_targets(export_${PROJECT_NAME} HAS_LIBRARY_TARGET)
ament_export_dependencies(rclcpp rcl rmw simple_timer nlohmann_json) ament_export_dependencies(rclcpp rcl rmw simple_timer)
ament_package() ament_package()

View file

@ -17,10 +17,10 @@
#include <rmw/rmw.h> #include <rmw/rmw.h>
#include <set>
#include <memory>
#include <cassert> #include <cassert>
#include <cstdlib> #include <cstdlib>
#include <memory>
#include <set>
#include <unordered_map> #include <unordered_map>
#include "rclcpp/executor.hpp" #include "rclcpp/executor.hpp"
@ -40,20 +40,19 @@ class ROSDefaultMultithreadedExecutor : public rclcpp::Executor, public RTISTime
public: public:
RCLCPP_SMART_PTR_DEFINITIONS(ROSDefaultMultithreadedExecutor) RCLCPP_SMART_PTR_DEFINITIONS(ROSDefaultMultithreadedExecutor)
RCLCPP_PUBLIC
explicit ROSDefaultMultithreadedExecutor( explicit ROSDefaultMultithreadedExecutor(
rclcpp::ExecutorOptions const &options = rclcpp::ExecutorOptions(), rclcpp::ExecutorOptions const& options = rclcpp::ExecutorOptions(),
int number_of_threads = 2, int number_of_threads = 2,
std::chrono::nanoseconds next_exec_timeout = std::chrono::nanoseconds(-1)); std::chrono::nanoseconds next_exec_timeout = std::chrono::nanoseconds(-1));
RCLCPP_PUBLIC size_t get_number_of_threads(); size_t get_number_of_threads();
RCLCPP_PUBLIC void spin() override; void spin() override;
bool get_next_executable(rclcpp::AnyExecutable& any_executable, bool get_next_executable(rclcpp::AnyExecutable& any_executable,
std::chrono::nanoseconds timeout = std::chrono::nanoseconds(-1)); std::chrono::nanoseconds timeout = std::chrono::nanoseconds(-1));
protected: protected:
RCLCPP_PUBLIC void run(size_t thread_number); void run(size_t thread_number);
private: private:
RCLCPP_DISABLE_COPY(ROSDefaultMultithreadedExecutor) RCLCPP_DISABLE_COPY(ROSDefaultMultithreadedExecutor)
@ -77,12 +76,10 @@ public:
RCLCPP_SMART_PTR_DEFINITIONS(ROSDefaultExecutor) RCLCPP_SMART_PTR_DEFINITIONS(ROSDefaultExecutor)
/// Default constructor. See the default constructor for Executor. /// Default constructor. See the default constructor for Executor.
RCLCPP_PUBLIC
explicit ROSDefaultExecutor( explicit ROSDefaultExecutor(
rclcpp::ExecutorOptions const &options = rclcpp::ExecutorOptions()); rclcpp::ExecutorOptions const& options = rclcpp::ExecutorOptions());
/// Default destructor. /// Default destructor.
RCLCPP_PUBLIC
virtual ~ROSDefaultExecutor(); virtual ~ROSDefaultExecutor();
/// Single-threaded implementation of spin. /// Single-threaded implementation of spin.
@ -93,13 +90,11 @@ public:
* if the associated context is configured to shutdown on SIGINT. * if the associated context is configured to shutdown on SIGINT.
* \throws std::runtime_error when spin() called while already spinning * \throws std::runtime_error when spin() called while already spinning
*/ */
RCLCPP_PUBLIC
void spin() override; void spin() override;
bool get_next_executable(rclcpp::AnyExecutable& any_executable, bool get_next_executable(rclcpp::AnyExecutable& any_executable,
std::chrono::nanoseconds timeout = std::chrono::nanoseconds(-1)); std::chrono::nanoseconds timeout = std::chrono::nanoseconds(-1));
RCLCPP_PUBLIC
void wait_for_work(std::chrono::nanoseconds timeout = std::chrono::nanoseconds(-1)); void wait_for_work(std::chrono::nanoseconds timeout = std::chrono::nanoseconds(-1));
private: private:

View file

@ -1,16 +1,15 @@
#ifndef PRIORITY_EXECUTOR__MULTITHREAD_PRIORITY_EXECUTOR_HPP_ #ifndef PRIORITY_EXECUTOR__MULTITHREAD_PRIORITY_EXECUTOR_HPP_
#define PRIORITY_EXECUTOR__MULTITHREAD_PRIORITY_EXECUTOR_HPP_ #define PRIORITY_EXECUTOR__MULTITHREAD_PRIORITY_EXECUTOR_HPP_
#include <set>
#include <mutex>
#include <chrono> #include <chrono>
#include <memory> #include <memory>
#include <mutex>
#include <set>
#include <thread> #include <thread>
#include <unordered_map> #include <unordered_map>
#include "priority_executor/default_executor.hpp" #include "priority_executor/default_executor.hpp"
#include "priority_executor/priority_executor.hpp" #include "priority_executor/priority_executor.hpp"
#include "rclcpp/executor.hpp" #include "rclcpp/executor.hpp"
#include "rclcpp/macros.hpp" #include "rclcpp/macros.hpp"
#include "rclcpp/memory_strategies.hpp" #include "rclcpp/memory_strategies.hpp"
@ -30,20 +29,18 @@ public:
int number_of_threads = 2, int number_of_threads = 2,
std::chrono::nanoseconds next_exec_timeout = std::chrono::nanoseconds(-1)); std::chrono::nanoseconds next_exec_timeout = std::chrono::nanoseconds(-1));
RCLCPP_PUBLIC RCLCPP_PUBLIC virtual ~MultithreadTimedExecutor() = default;
virtual ~MultithreadTimedExecutor() = default;
RCLCPP_PUBLIC size_t get_number_of_threads();
RCLCPP_PUBLIC void spin() override; RCLCPP_PUBLIC void spin() override;
protected: protected:
RCLCPP_PUBLIC void run(size_t thread_number); RCLCPP_PUBLIC void run(size_t _thread_number);
private: private:
RCLCPP_DISABLE_COPY(MultithreadTimedExecutor)
std::set<rclcpp::TimerBase::SharedPtr> scheduled_timers_; std::set<rclcpp::TimerBase::SharedPtr> scheduled_timers_;
size_t number_of_threads_; int number_of_threads_;
std::chrono::nanoseconds next_exec_timeout_{std::chrono::nanoseconds(-1)}; std::chrono::nanoseconds next_exec_timeout_ = std::chrono::nanoseconds(-1);
node_time_logger logger_;
static std::unordered_map<MultithreadTimedExecutor*, static std::unordered_map<MultithreadTimedExecutor*,
std::shared_ptr<rclcpp::detail::MutexTwoPriorities>> std::shared_ptr<rclcpp::detail::MutexTwoPriorities>>

View file

@ -1,237 +0,0 @@
#ifndef PERFORMANCE_MONITOR_HPP_
#define PERFORMANCE_MONITOR_HPP_
#include <atomic>
#include <chrono>
#include <deque>
#include <mutex>
#include <string>
#include <unordered_map>
#include <vector>
#include <nlohmann/json.hpp>
#include "rclcpp/rclcpp.hpp"
#include "simple_timer/rt-sched.hpp"
namespace priority_executor {
enum class PerformanceEventType {
CALLBACK_READY,
CALLBACK_START,
CALLBACK_END,
DEADLINE_MISSED,
DEADLINE_MET,
CHAIN_START,
CHAIN_END
};
struct PerformanceEvent {
uint64_t timestamp; // High-resolution timestamp (microseconds)
PerformanceEventType type; // Type of event
std::string node_name; // Name of the node
std::string callback_name; // Name of the callback
int chain_id{-1}; // Chain identifier
bool is_first_in_chain{false}; // First callback in chain
bool is_last_in_chain{false}; // Last callback in chain
uint64_t deadline{0}; // Deadline (if applicable)
uint64_t processing_time{0}; // Time spent executing (if applicable)
int executor_id{0}; // Executor identifier
std::string additional_data; // JSON string for extensible data
// Helper method to extract callback type from additional_data
std::string get_callback_type() const {
if (additional_data.empty()) return "unknown";
try {
nlohmann::json json = nlohmann::json::parse(additional_data);
if (json.contains("callback_type")) {
return json["callback_type"].get<std::string>();
}
} catch (...) {}
return "unknown";
}
// Helper method to extract thread info from additional_data
int get_thread_id() const {
if (additional_data.empty()) return -1;
try {
nlohmann::json json = nlohmann::json::parse(additional_data);
if (json.contains("thread_info") && json["thread_info"].is_object() &&
json["thread_info"].contains("thread_id")) {
return json["thread_info"]["thread_id"].get<int>();
}
// Direct thread_id if present
if (json.contains("thread_id")) {
return json["thread_id"].get<int>();
}
} catch (...) {}
return -1;
}
// Helper method to create a descriptive identifier for this callback
std::string get_descriptive_id() const {
std::stringstream ss;
ss << node_name;
if (!callback_name.empty()) {
ss << "/" << callback_name;
}
if (chain_id >= 0) {
ss << " (chain:" << chain_id;
if (is_first_in_chain) ss << " start";
if (is_last_in_chain) ss << " end";
ss << ")";
}
return ss.str();
}
// Convert additional_data to JSON object
nlohmann::json get_additional_data_json() const {
try {
if (!additional_data.empty()) {
return nlohmann::json::parse(additional_data);
}
} catch (...) {}
return nlohmann::json::object();
}
};
class PerformanceMonitor {
public:
static PerformanceMonitor& getInstance();
void recordEvent(const PerformanceEvent& event);
void setBufferSize(size_t size);
void setAutoDumpThreshold(size_t threshold);
void setDumpPath(const std::string& path);
bool dumpToFile(const std::string& filename);
void enable(bool enabled = true);
void clear();
// Helper method to set monitoring options in one call
void setMonitoringOptions(size_t buffer_size, size_t auto_dump_threshold,
const std::string& dump_path) {
setBufferSize(buffer_size);
setAutoDumpThreshold(auto_dump_threshold);
setDumpPath(dump_path);
}
// Get all events for a specific callback
std::vector<PerformanceEvent> getEventsForCallback(const std::string& node_name,
const std::string& callback_name) const {
std::vector<PerformanceEvent> result;
std::lock_guard<std::mutex> lock(event_mutex_);
for (const auto& event : event_buffer_) {
if (event.node_name == node_name && event.callback_name == callback_name) {
result.push_back(event);
}
}
return result;
}
// Get all events for a specific chain
std::vector<PerformanceEvent> getEventsForChain(int chain_id) const {
std::vector<PerformanceEvent> result;
std::lock_guard<std::mutex> lock(event_mutex_);
for (const auto& event : event_buffer_) {
if (event.chain_id == chain_id) {
result.push_back(event);
}
}
return result;
}
// Get all events by type
std::vector<PerformanceEvent> getEventsByType(PerformanceEventType type) const {
std::vector<PerformanceEvent> result;
std::lock_guard<std::mutex> lock(event_mutex_);
for (const auto& event : event_buffer_) {
if (event.type == type) {
result.push_back(event);
}
}
return result;
}
// Get a formatted report of callbacks and their execution times
std::string getCallbackExecutionReport() const {
std::lock_guard<std::mutex> lock(event_mutex_);
std::stringstream ss;
ss << "Callback Execution Report:\n";
ss << "=========================\n\n";
// Maps to store total execution time and count per callback
std::unordered_map<std::string, uint64_t> total_time;
std::unordered_map<std::string, int> call_count;
std::unordered_map<std::string, uint64_t> max_time;
std::unordered_map<std::string, uint64_t> min_time;
// Process all CALLBACK_END events (they contain processing_time)
for (const auto& event : event_buffer_) {
if (event.type == PerformanceEventType::CALLBACK_END && event.processing_time > 0) {
std::string callback_id = event.get_descriptive_id();
total_time[callback_id] += event.processing_time;
call_count[callback_id]++;
max_time[callback_id] = std::max(max_time[callback_id], event.processing_time);
if (min_time[callback_id] == 0 || event.processing_time < min_time[callback_id]) {
min_time[callback_id] = event.processing_time;
}
}
}
// Print results sorted by callback id
std::vector<std::string> callback_ids;
for (const auto& entry : call_count) {
callback_ids.push_back(entry.first);
}
std::sort(callback_ids.begin(), callback_ids.end());
for (const auto& id : callback_ids) {
double avg_time = static_cast<double>(total_time[id]) / call_count[id];
ss << id << "\n";
ss << " Calls: " << call_count[id] << "\n";
ss << " Avg time: " << avg_time << " µs\n";
ss << " Min time: " << min_time[id] << " µs\n";
ss << " Max time: " << max_time[id] << " µs\n";
ss << " Total time: " << total_time[id] << " µs\n\n";
}
return ss.str();
}
private:
PerformanceMonitor();
~PerformanceMonitor() = default;
PerformanceMonitor(const PerformanceMonitor&) = delete;
PerformanceMonitor& operator=(const PerformanceMonitor&) = delete;
PerformanceMonitor(PerformanceMonitor&&) = delete;
PerformanceMonitor& operator=(PerformanceMonitor&&) = delete;
mutable std::mutex event_mutex_;
std::vector<PerformanceEvent> event_buffer_;
size_t buffer_size_{10000};
size_t auto_dump_threshold_{0};
std::string dump_path_{"performance_logs"};
std::atomic<bool> enabled_{true};
std::string eventsToJson() const;
void checkAndAutoDump();
};
} // namespace priority_executor
#endif // PERFORMANCE_MONITOR_HPP_

View file

@ -20,15 +20,12 @@
#include <cassert> #include <cassert>
#include <cstdlib> #include <cstdlib>
#include <memory> #include <memory>
#include <string>
#include "rclcpp/executor.hpp" #include "rclcpp/executor.hpp"
#include "rclcpp/macros.hpp" #include "rclcpp/macros.hpp"
#include "rclcpp/visibility_control.hpp" #include "rclcpp/visibility_control.hpp"
#include "priority_executor/priority_memory_strategy.hpp" #include "priority_executor/priority_memory_strategy.hpp"
#include "priority_executor/default_executor.hpp" #include "priority_executor/default_executor.hpp"
#include "priority_executor/performance_monitor.hpp"
namespace priority_executor { namespace priority_executor {
@ -41,13 +38,11 @@ public:
RCLCPP_SMART_PTR_DEFINITIONS(TimedExecutor) RCLCPP_SMART_PTR_DEFINITIONS(TimedExecutor)
/// Default constructor. See the default constructor for Executor. /// Default constructor. See the default constructor for Executor.
RCLCPP_PUBLIC
explicit TimedExecutor( explicit TimedExecutor(
rclcpp::ExecutorOptions const &options = rclcpp::ExecutorOptions(), rclcpp::ExecutorOptions const& options = rclcpp::ExecutorOptions(),
std::string name = "unnamed executor"); std::string name = "unnamed executor");
/// Default destructor. /// Default destructor.
RCLCPP_PUBLIC
virtual ~TimedExecutor(); virtual ~TimedExecutor();
/// Single-threaded implementation of spin. /// Single-threaded implementation of spin.
@ -58,50 +53,12 @@ public:
* if the associated context is configured to shutdown on SIGINT. * if the associated context is configured to shutdown on SIGINT.
* \throws std::runtime_error when spin() called while already spinning * \throws std::runtime_error when spin() called while already spinning
*/ */
RCLCPP_PUBLIC
void spin() override; void spin() override;
std::string name; std::string name;
void set_use_priorities(bool use_prio); void set_use_priorities(bool use_prio);
std::shared_ptr<PriorityMemoryStrategy<>> prio_memory_strategy_{nullptr}; std::shared_ptr<PriorityMemoryStrategy<>> prio_memory_strategy_{nullptr};
// Performance monitoring control
void enableMonitoring(bool enable = true) {
PerformanceMonitor::getInstance().enable(enable);
}
void setMonitoringOptions(
size_t buffer_size,
size_t auto_dump_threshold,
const std::string& dump_path) {
auto& monitor = PerformanceMonitor::getInstance();
monitor.setBufferSize(buffer_size);
monitor.setAutoDumpThreshold(auto_dump_threshold);
monitor.setDumpPath(dump_path);
}
// Get a formatted report of all callback execution times
std::string getCallbackExecutionReport() const {
auto& monitor = PerformanceMonitor::getInstance();
return monitor.getCallbackExecutionReport();
}
// Manually trigger a log dump with a specific filename
bool dumpMonitoringData(const std::string& filename) {
auto& monitor = PerformanceMonitor::getInstance();
return monitor.dumpToFile(filename);
}
// Get executor ID for identification in logs
int getExecutorId() const {
return executor_id_;
}
// Get executor name for identification in logs
const std::string& getExecutorName() const {
return name;
}
protected: protected:
bool get_next_executable(rclcpp::AnyExecutable& any_executable, bool get_next_executable(rclcpp::AnyExecutable& any_executable,
std::chrono::nanoseconds timeout = std::chrono::nanoseconds(-1)); std::chrono::nanoseconds timeout = std::chrono::nanoseconds(-1));
@ -111,14 +68,8 @@ private:
void wait_for_work(std::chrono::nanoseconds timeout); void wait_for_work(std::chrono::nanoseconds timeout);
bool get_next_ready_executable(rclcpp::AnyExecutable& any_executable); bool get_next_ready_executable(rclcpp::AnyExecutable& any_executable);
void recordExecutionEvent(const rclcpp::AnyExecutable& any_exec,
PerformanceEventType event_type,
uint64_t timestamp,
uint64_t processing_time = 0);
bool use_priorities{true}; bool use_priorities{true};
static std::atomic<int> next_executor_id_;
int executor_id_;
}; };
} // namespace priority_executor } // namespace priority_executor

View file

@ -1,17 +1,3 @@
// Copyright 2015 Open Source Robotics Foundation, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#ifndef PRIORITY_EXECUTOR__PRIORITY_MEMORY_STRATEGY_HPP_ #ifndef PRIORITY_EXECUTOR__PRIORITY_MEMORY_STRATEGY_HPP_
#define PRIORITY_EXECUTOR__PRIORITY_MEMORY_STRATEGY_HPP_ #define PRIORITY_EXECUTOR__PRIORITY_MEMORY_STRATEGY_HPP_
@ -24,19 +10,10 @@
#include <vector> #include <vector>
#include "rcl/allocator.h" #include "rcl/allocator.h"
#include "rclcpp/allocator/allocator_common.hpp" #include "rclcpp/allocator/allocator_common.hpp"
#include "rclcpp/memory_strategy.hpp" #include "rclcpp/memory_strategy.hpp"
#include "simple_timer/rt-sched.hpp" #include "simple_timer/rt-sched.hpp"
/// Delegate for handling memory allocations while the Executor is executing.
/**
* By default, the memory strategy dynamically allocates memory for structures
* that come in from the rmw implementation after the executor waits for work,
* based on the number of entities that come through.
*/
namespace priority_executor { namespace priority_executor {
enum class ExecutableType { enum class ExecutableType {
@ -47,7 +24,7 @@ enum class ExecutableType {
WAITABLE WAITABLE
}; };
std::ostream& operator<<(std::ostream& os, const ExecutableType& obj); std::ostream& operator << (std::ostream& os, const ExecutableType& obj);
enum class ExecutableScheduleType { enum class ExecutableScheduleType {
DEADLINE = 0, DEADLINE = 0,
@ -56,15 +33,15 @@ enum class ExecutableScheduleType {
DEFAULT, // not used here DEFAULT, // not used here
}; };
std::ostream& operator<<(std::ostream& os, const ExecutableScheduleType& obj); std::ostream& operator << (std::ostream& os, const ExecutableScheduleType& obj);
class PriorityExecutable { class PriorityExecutable {
public: public:
static size_t num_executables; static size_t num_executables;
explicit PriorityExecutable( explicit PriorityExecutable(
std::shared_ptr<const void> h, std::shared_ptr<const void> h,
int p, int p,
ExecutableType t, ExecutableType t,
ExecutableScheduleType sched_type = ExecutableScheduleType::CHAIN_INDEPENDENT_PRIORITY); ExecutableScheduleType sched_type = ExecutableScheduleType::CHAIN_INDEPENDENT_PRIORITY);
@ -73,8 +50,8 @@ public:
void dont_run(); void dont_run();
void allow_run(); void allow_run();
void increment_counter(); void increment_counter();
bool operator==(PriorityExecutable const &other) const; bool operator==(PriorityExecutable const& other) const;
friend std::ostream& operator<<(std::ostream& os, PriorityExecutable const &pe); friend std::ostream& operator<<(std::ostream& os, PriorityExecutable const& pe);
std::shared_ptr<const void> handle; std::shared_ptr<const void> handle;
ExecutableType type; ExecutableType type;
@ -97,7 +74,7 @@ public:
class PriorityExecutableComparator { class PriorityExecutableComparator {
public: public:
bool operator()(PriorityExecutable const *p1, PriorityExecutable const *p2) const; bool operator()(PriorityExecutable const* p1, PriorityExecutable const* p2) const;
}; };
template <typename Alloc = std::allocator<void>> template <typename Alloc = std::allocator<void>>

View file

@ -6,7 +6,7 @@
<description> <description>
ROS 2 package implementing priority-based executors for real-time task scheduling ROS 2 package implementing priority-based executors for real-time task scheduling
</description> </description>
<maintainer email="kurt4wilson@gmail.com">kurt</maintainer> <maintainer email="your.email@example.com">ROS-Dynamic-Executor Team</maintainer>
<license>Apache License 2.0</license> <license>Apache License 2.0</license>
<buildtool_depend>ament_cmake</buildtool_depend> <buildtool_depend>ament_cmake</buildtool_depend>

View file

@ -14,12 +14,12 @@
#include "priority_executor/default_executor.hpp" #include "priority_executor/default_executor.hpp"
#include <mutex>
#include <chrono> #include <chrono>
#include <memory>
#include <thread>
#include <sstream>
#include <functional> #include <functional>
#include <memory>
#include <mutex>
#include <sstream>
#include <thread>
#include <unordered_map> #include <unordered_map>
#include "rcpputils/scope_exit.hpp" #include "rcpputils/scope_exit.hpp"
@ -33,7 +33,7 @@ std::unordered_map<ROSDefaultMultithreadedExecutor*,
ROSDefaultMultithreadedExecutor::wait_mutex_set_; ROSDefaultMultithreadedExecutor::wait_mutex_set_;
std::mutex ROSDefaultMultithreadedExecutor::shared_wait_mutex_; std::mutex ROSDefaultMultithreadedExecutor::shared_wait_mutex_;
ROSDefaultExecutor::ROSDefaultExecutor(rclcpp::ExecutorOptions const &options) ROSDefaultExecutor::ROSDefaultExecutor(rclcpp::ExecutorOptions const& options)
: rclcpp::Executor(options) { : rclcpp::Executor(options) {
logger_ = create_logger(); logger_ = create_logger();
} }
@ -130,7 +130,7 @@ size_t ROSDefaultMultithreadedExecutor::get_number_of_threads() {
} }
ROSDefaultMultithreadedExecutor::ROSDefaultMultithreadedExecutor( ROSDefaultMultithreadedExecutor::ROSDefaultMultithreadedExecutor(
rclcpp::ExecutorOptions const &options, rclcpp::ExecutorOptions const& options,
int number_of_threads, int number_of_threads,
std::chrono::nanoseconds next_exec_timeout) std::chrono::nanoseconds next_exec_timeout)
: Executor(options) { : Executor(options) {

View file

@ -1,5 +1,4 @@
#include "priority_executor/multithread_priority_executor.hpp" #include "priority_executor/multithread_priority_executor.hpp"
#include "priority_executor/performance_monitor.hpp"
#include <chrono> #include <chrono>
#include <functional> #include <functional>
@ -8,7 +7,6 @@
#include <sstream> #include <sstream>
#include <thread> #include <thread>
#include <unordered_map> #include <unordered_map>
#include <nlohmann/json.hpp>
#include "rcpputils/scope_exit.hpp" #include "rcpputils/scope_exit.hpp"
#include "rclcpp/any_executable.hpp" #include "rclcpp/any_executable.hpp"
@ -22,7 +20,7 @@ std::unordered_map<MultithreadTimedExecutor*,
std::mutex MultithreadTimedExecutor::shared_wait_mutex_; std::mutex MultithreadTimedExecutor::shared_wait_mutex_;
MultithreadTimedExecutor::MultithreadTimedExecutor( MultithreadTimedExecutor::MultithreadTimedExecutor(
rclcpp::ExecutorOptions const &options, rclcpp::ExecutorOptions const& options,
std::string const &name, std::string const &name,
int number_of_threads, int number_of_threads,
std::chrono::nanoseconds next_exec_timeout) std::chrono::nanoseconds next_exec_timeout)
@ -34,11 +32,6 @@ MultithreadTimedExecutor::MultithreadTimedExecutor(
logger_ = create_logger(); logger_ = create_logger();
} }
size_t MultithreadTimedExecutor::get_number_of_threads()
{
return number_of_threads_;
}
void MultithreadTimedExecutor::spin() { void MultithreadTimedExecutor::spin() {
if (spinning.exchange(true)) { if (spinning.exchange(true)) {
throw std::runtime_error("spin() called while already spinning"); throw std::runtime_error("spin() called while already spinning");
@ -60,27 +53,9 @@ void MultithreadTimedExecutor::spin() {
for (auto& thread : threads) { for (auto& thread : threads) {
thread.join(); thread.join();
} }
spinning.store(false);
} }
void MultithreadTimedExecutor::run(size_t _thread_number) { void MultithreadTimedExecutor::run(size_t _thread_number) {
// set affinity
cpu_set_t cpuset;
CPU_ZERO(&cpuset);
CPU_SET(_thread_number, &cpuset);
int rc = pthread_setaffinity_np(pthread_self(), sizeof(cpu_set_t), &cpuset);
if (rc != 0)
{
RCLCPP_ERROR(rclcpp::get_logger("priority_executor"),
"Error calling pthread_setaffinity_np: %d", rc);
}
// Create thread context information for logs
nlohmann::json thread_info;
thread_info["thread_id"] = _thread_number;
thread_info["cpu_affinity"] = _thread_number;
std::string thread_info_str = thread_info.dump();
while (rclcpp::ok(this->context_) && spinning.load()) { while (rclcpp::ok(this->context_) && spinning.load()) {
rclcpp::AnyExecutable any_exec; rclcpp::AnyExecutable any_exec;
{ {
@ -108,160 +83,8 @@ void MultithreadTimedExecutor::run(size_t _thread_number) {
} }
} }
// Record execution start with thread info
auto start = std::chrono::steady_clock::now();
uint64_t start_time = std::chrono::duration_cast<std::chrono::microseconds>(
start.time_since_epoch()).count();
PerformanceEvent start_event;
start_event.timestamp = start_time;
start_event.type = PerformanceEventType::CALLBACK_START;
start_event.executor_id = getExecutorId();
if (any_exec.node_base) {
start_event.node_name = any_exec.node_base->get_name();
// Enhanced callback identification based on executable type
if (any_exec.timer) {
// For timer callbacks
std::stringstream ss;
// retrieve the timer period
int64_t period;
rcl_timer_get_period(any_exec.timer->get_timer_handle().get(), &period);
// convert period (which is in nanoseconds) to milliseconds
std::chrono::nanoseconds period_ns(period);
std::chrono::milliseconds period_ms = std::chrono::duration_cast<std::chrono::milliseconds>(period_ns);
ss << "timer_" << period_ms.count() << "ms";
start_event.callback_name = ss.str();
// Add memory address as unique identifier
std::stringstream addr;
addr << "@" << any_exec.timer.get();
nlohmann::json additional;
additional["callback_type"] = "timer";
additional["timer_addr"] = addr.str();
additional["thread_info"] = thread_info;
start_event.additional_data = additional.dump();
}
else if (any_exec.subscription) {
// For subscription callbacks
start_event.callback_name = any_exec.subscription->get_topic_name();
nlohmann::json additional;
additional["callback_type"] = "subscription";
additional["thread_info"] = thread_info;
start_event.additional_data = additional.dump();
}
else if (any_exec.service) {
start_event.callback_name = any_exec.service->get_service_name();
nlohmann::json additional;
additional["callback_type"] = "service";
additional["thread_info"] = thread_info;
start_event.additional_data = additional.dump();
}
else if (any_exec.client) {
start_event.callback_name = "client_callback";
nlohmann::json additional;
additional["callback_type"] = "client";
additional["thread_info"] = thread_info;
start_event.additional_data = additional.dump();
}
else if (any_exec.waitable) {
start_event.callback_name = "waitable_callback";
nlohmann::json additional;
additional["callback_type"] = "waitable";
additional["thread_info"] = thread_info;
start_event.additional_data = additional.dump();
}
// Add chain information
std::shared_ptr<PriorityExecutable> executable = nullptr;
if (prio_memory_strategy_) {
if (any_exec.timer) {
executable = prio_memory_strategy_->get_priority_settings(
any_exec.timer->get_timer_handle());
} else if (any_exec.subscription) {
executable = prio_memory_strategy_->get_priority_settings(
any_exec.subscription->get_subscription_handle());
}
}
if (executable) {
start_event.chain_id = executable->chain_id;
start_event.is_first_in_chain = executable->is_first_in_chain;
start_event.is_last_in_chain = executable->is_last_in_chain;
if (executable->deadlines && !executable->deadlines->empty()) {
start_event.deadline = executable->deadlines->front();
}
// Include the priority information
try {
nlohmann::json additional = start_event.additional_data.empty() ?
nlohmann::json() : nlohmann::json::parse(start_event.additional_data);
additional["priority"] = executable->priority;
if (executable->chain_id >= 0) {
additional["chain_id"] = executable->chain_id;
}
if (executable->sched_type != ExecutableScheduleType::DEFAULT) {
additional["schedule_type"] = static_cast<int>(executable->sched_type);
}
start_event.additional_data = additional.dump();
} catch (...) {
// If there was an error parsing the JSON, create new JSON object
nlohmann::json additional;
additional["priority"] = executable->priority;
additional["thread_info"] = thread_info;
start_event.additional_data = additional.dump();
}
}
} else {
start_event.additional_data = thread_info_str;
}
PerformanceMonitor::getInstance().recordEvent(start_event);
// Execute the callback
execute_any_executable(any_exec); execute_any_executable(any_exec);
// Record execution completion
auto end = std::chrono::steady_clock::now();
uint64_t end_time = std::chrono::duration_cast<std::chrono::microseconds>(
end.time_since_epoch()).count();
uint64_t processing_time = end_time - start_time;
// Create identical event structure but for end event
PerformanceEvent end_event = start_event;
end_event.timestamp = end_time;
end_event.type = PerformanceEventType::CALLBACK_END;
end_event.processing_time = processing_time;
// Update the additional data to include processing time
try {
nlohmann::json additional = end_event.additional_data.empty() ?
nlohmann::json() : nlohmann::json::parse(end_event.additional_data);
additional["processing_time_us"] = processing_time;
end_event.additional_data = additional.dump();
} catch (...) {
// If JSON parsing failed, create a new object
nlohmann::json additional;
additional["processing_time_us"] = processing_time;
additional["thread_info"] = thread_info;
end_event.additional_data = additional.dump();
}
PerformanceMonitor::getInstance().recordEvent(end_event);
if (any_exec.timer) { if (any_exec.timer) {
auto wait_mutex = MultithreadTimedExecutor::wait_mutex_set_[this]; auto wait_mutex = MultithreadTimedExecutor::wait_mutex_set_[this];
auto high_priority_wait_mutex = wait_mutex->get_high_priority_lockable(); auto high_priority_wait_mutex = wait_mutex->get_high_priority_lockable();
@ -275,13 +98,6 @@ void MultithreadTimedExecutor::run(size_t _thread_number) {
// Clear the callback_group to prevent the AnyExecutable destructor from // Clear the callback_group to prevent the AnyExecutable destructor from
// resetting the callback group `can_be_taken_from` // resetting the callback group `can_be_taken_from`
any_exec.callback_group.reset(); any_exec.callback_group.reset();
if (prio_memory_strategy_ != nullptr)
{
auto wait_mutex = MultithreadTimedExecutor::wait_mutex_set_[this];
auto low_priority_wait_mutex = wait_mutex->get_low_priority_lockable();
std::lock_guard<rclcpp::detail::MutexTwoPriorities::LowPriorityLockable> wait_lock(low_priority_wait_mutex);
prio_memory_strategy_->post_execute(any_exec, _thread_number);
}
} }
} }
} // namespace priority_executor } // namespace priority_executor

View file

@ -1,149 +0,0 @@
#include "priority_executor/performance_monitor.hpp"
#include <filesystem>
#include <fstream>
#include <nlohmann/json.hpp>
#include <sstream>
#include <iomanip>
namespace priority_executor {
PerformanceMonitor& PerformanceMonitor::getInstance() {
static PerformanceMonitor instance;
return instance;
}
PerformanceMonitor::PerformanceMonitor() {
event_buffer_.reserve(buffer_size_);
std::filesystem::create_directories(dump_path_);
}
void PerformanceMonitor::recordEvent(const PerformanceEvent& event) {
if (!enabled_) return;
{
std::lock_guard<std::mutex> lock(event_mutex_);
event_buffer_.push_back(event);
if (auto_dump_threshold_ > 0 && event_buffer_.size() >= auto_dump_threshold_) {
checkAndAutoDump();
}
}
}
void PerformanceMonitor::setBufferSize(size_t size) {
std::lock_guard<std::mutex> lock(event_mutex_);
buffer_size_ = size;
event_buffer_.reserve(size);
}
void PerformanceMonitor::setAutoDumpThreshold(size_t threshold) {
auto_dump_threshold_ = threshold;
}
void PerformanceMonitor::setDumpPath(const std::string& path) {
dump_path_ = path;
std::filesystem::create_directories(dump_path_);
}
void PerformanceMonitor::enable(bool enabled) {
enabled_ = enabled;
}
void PerformanceMonitor::clear() {
std::lock_guard<std::mutex> lock(event_mutex_);
event_buffer_.clear();
}
std::string PerformanceMonitor::eventsToJson() const {
nlohmann::json root = nlohmann::json::array();
for (const auto& event : event_buffer_) {
nlohmann::json event_json;
event_json["timestamp"] = event.timestamp;
switch (event.type) {
case PerformanceEventType::CALLBACK_READY:
event_json["type"] = "callback_ready";
break;
case PerformanceEventType::CALLBACK_START:
event_json["type"] = "callback_start";
break;
case PerformanceEventType::CALLBACK_END:
event_json["type"] = "callback_end";
break;
case PerformanceEventType::DEADLINE_MISSED:
event_json["type"] = "deadline_missed";
break;
case PerformanceEventType::DEADLINE_MET:
event_json["type"] = "deadline_met";
break;
case PerformanceEventType::CHAIN_START:
event_json["type"] = "chain_start";
break;
case PerformanceEventType::CHAIN_END:
event_json["type"] = "chain_end";
break;
}
event_json["node_name"] = event.node_name;
event_json["callback_name"] = event.callback_name;
event_json["chain_id"] = event.chain_id;
event_json["is_first_in_chain"] = event.is_first_in_chain;
event_json["is_last_in_chain"] = event.is_last_in_chain;
if (event.deadline > 0) {
event_json["deadline"] = event.deadline;
}
if (event.processing_time > 0) {
event_json["processing_time"] = event.processing_time;
}
event_json["executor_id"] = event.executor_id;
if (!event.additional_data.empty()) {
try {
event_json["additional_data"] = nlohmann::json::parse(event.additional_data);
} catch (...) {
event_json["additional_data"] = event.additional_data;
}
}
root.push_back(event_json);
}
return root.dump(2); // Indent with 2 spaces
}
bool PerformanceMonitor::dumpToFile(const std::string& filename) {
std::lock_guard<std::mutex> lock(event_mutex_);
if (event_buffer_.empty()) {
return true; // Nothing to dump
}
std::string full_path = dump_path_ + "/" + filename;
std::ofstream file(full_path);
if (!file.is_open()) {
return false;
}
file << eventsToJson();
event_buffer_.clear();
return true;
}
void PerformanceMonitor::checkAndAutoDump() {
if (event_buffer_.size() >= auto_dump_threshold_) {
auto now = std::chrono::system_clock::now();
auto timestamp = std::chrono::duration_cast<std::chrono::milliseconds>(
now.time_since_epoch()).count();
std::stringstream ss;
ss << "performance_log_" << timestamp << ".json";
dumpToFile(ss.str());
}
}
} // namespace priority_executor

View file

@ -24,7 +24,7 @@
namespace priority_executor namespace priority_executor
{ {
TimedExecutor::TimedExecutor(rclcpp::ExecutorOptions const &options, std::string name) TimedExecutor::TimedExecutor(rclcpp::ExecutorOptions const& options, std::string name)
: rclcpp::Executor(options) { : rclcpp::Executor(options) {
this->name = std::move(name); this->name = std::move(name);
logger_ = create_logger(); logger_ = create_logger();
@ -38,19 +38,11 @@ void TimedExecutor::spin() {
} }
RCLCPP_SCOPE_EXIT(this->spinning.store(false);); RCLCPP_SCOPE_EXIT(this->spinning.store(false););
using namespace std::chrono_literals; while (rclcpp::ok(this->context_) && spinning.load()) {
std::chrono::time_point start = std::chrono::steady_clock::now();
while (rclcpp::ok(this->context_) && spinning.load() && (std::chrono::steady_clock::now() - start) <= 5s) {
rclcpp::AnyExecutable any_executable; rclcpp::AnyExecutable any_executable;
// std::cout<<memory_strategy_->number_of_ready_timers()<<std::endl;
// std::cout << "spinning " << this->name << std::endl;
// size_t ready = memory_strategy_->number_of_ready_subscriptions();
// std::cout << "ready:" << ready << std::endl;
if (get_next_executable(any_executable, std::chrono::nanoseconds(-1))) { if (get_next_executable(any_executable, std::chrono::nanoseconds(-1))) {
execute_any_executable(any_executable); execute_any_executable(any_executable);
// make sure memory_strategy_ is an instance of PriorityMemoryStrategy
if (prio_memory_strategy_ != nullptr) { if (prio_memory_strategy_ != nullptr) {
prio_memory_strategy_->post_execute(any_executable); prio_memory_strategy_->post_execute(any_executable);
} }
@ -63,11 +55,6 @@ bool TimedExecutor::get_next_executable(
rclcpp::AnyExecutable& any_executable, rclcpp::AnyExecutable& any_executable,
std::chrono::nanoseconds timeout) { std::chrono::nanoseconds timeout) {
bool success = false; bool success = false;
// Check to see if there are any subscriptions or timers needing service
// TODO(wjwwood): improve run to run efficiency of this function
// sched_yield();
// sleep for 10us
// usleep(20);
auto const start = std::chrono::steady_clock::now(); auto const start = std::chrono::steady_clock::now();
wait_for_work(timeout); wait_for_work(timeout);

View file

@ -1,11 +1,9 @@
#include "priority_executor/priority_memory_strategy.hpp" #include "priority_executor/priority_memory_strategy.hpp"
#include "simple_timer/rt-sched.hpp" #include "simple_timer/rt-sched.hpp"
#include <ctime> #include <ctime>
#include <sstream>
#include <semaphore.h>
#include <rclcpp/logging.hpp> #include <rclcpp/logging.hpp>
#include <semaphore.h>
#include <sstream>
using namespace priority_executor; using namespace priority_executor;
@ -35,10 +33,11 @@ std::ostream& priority_executor::operator<<(std::ostream& os, const ExecutableSc
return os; return os;
} }
std::ostream& priority_executor::operator<<(std::ostream &os, const PriorityExecutable &pe) std::ostream &operator<<(std::ostream &os, const PriorityExecutable &pe)
{ {
os << "sched_type: " << pe.sched_type << ", "; os << "sched_type: " << pe.sched_type << ", ";
if (pe.sched_type == ExecutableScheduleType::DEADLINE) { if (pe.sched_type == ExecutableScheduleType::DEADLINE)
{
os << "period: " << pe.period << ", "; os << "period: " << pe.period << ", ";
} }
os << "priority: " << pe.priority << ", "; os << "priority: " << pe.priority << ", ";
@ -69,16 +68,13 @@ std::shared_ptr<PriorityExecutable>
PriorityMemoryStrategy<>::get_and_reset_priority(std::shared_ptr<const void> executable, PriorityMemoryStrategy<>::get_and_reset_priority(std::shared_ptr<const void> executable,
ExecutableType t) ExecutableType t)
{ {
// PriorityExecutable *p = get_priority_settings(executable);
std::shared_ptr<PriorityExecutable> p = get_priority_settings(executable); std::shared_ptr<PriorityExecutable> p = get_priority_settings(executable);
if (p == nullptr) if (p == nullptr)
{ {
// priority_map[executable] = PriorityExecutable(executable, 0, t);
priority_map[executable] = priority_map[executable] =
std::make_shared<PriorityExecutable>(executable, 0, t); std::make_shared<PriorityExecutable>(executable, 0, t);
p = priority_map[executable]; p = priority_map[executable];
} }
// p->can_be_run = true;
return p; return p;
} }
@ -291,7 +287,7 @@ void PriorityExecutable::increment_counter() {
this->counter += 1; this->counter += 1;
} }
bool PriorityExecutable::operator==(PriorityExecutable const &other) const { bool PriorityExecutable::operator==(PriorityExecutable const& other) const {
std::cout << "PriorityExecutable::operator== called" << std::endl; std::cout << "PriorityExecutable::operator== called" << std::endl;
return this->handle == other.handle; return this->handle == other.handle;
} }
@ -300,11 +296,7 @@ bool PriorityExecutableComparator::operator()(
PriorityExecutable const *p1, PriorityExecutable const *p1,
PriorityExecutable const *p2) const PriorityExecutable const *p2) const
{ {
// since this will be used in a std::set, also check for equality
if (p1 == nullptr || p2 == nullptr) { if (p1 == nullptr || p2 == nullptr) {
// TODO: realistic value
std::cout << "PriorityExecutableComparator::operator() called with nullptr"
<< std::endl;
return false; return false;
} }
@ -319,21 +311,18 @@ bool PriorityExecutableComparator::operator()(
if (p1->sched_type == ExecutableScheduleType::CHAIN_INDEPENDENT_PRIORITY) { if (p1->sched_type == ExecutableScheduleType::CHAIN_INDEPENDENT_PRIORITY) {
return p1->priority == p2->priority ? return p1->priority == p2->priority ?
p1->executable_id < p2->executable_id : p1->executable_id < p2->executable_id :
// tie-breaker: lower value runs first
p1->priority < p2->priority; p1->priority < p2->priority;
} }
if (p1->sched_type == ExecutableScheduleType::CHAIN_AWARE_PRIORITY) { if (p1->sched_type == ExecutableScheduleType::CHAIN_AWARE_PRIORITY) {
return p1->priority == p2->priority ? return p1->priority == p2->priority ?
p1->executable_id < p2->executable_id : p1->executable_id < p2->executable_id :
// tie-breaker: lower value runs first // tie-breaker: lower value runs first
p1->priority < p2->priority; p1->priority < p2->priority;
} }
if (p1->sched_type == ExecutableScheduleType::DEADLINE) { if (p1->sched_type == ExecutableScheduleType::DEADLINE) {
// TODO: use the counter logic here as well
uint64_t p1_deadline = 0; uint64_t p1_deadline = 0;
uint64_t p2_deadline = 0; uint64_t p2_deadline = 0;
@ -372,8 +361,8 @@ bool PriorityExecutableComparator::operator()(
template <> template <>
void PriorityMemoryStrategy<>::add_guard_condition( void PriorityMemoryStrategy<>::add_guard_condition(
rcl_guard_condition_t const *guard_condition) { rcl_guard_condition_t const* guard_condition) {
for (auto const &existing_guard_condition : guard_conditions_) { for (auto const& existing_guard_condition : guard_conditions_) {
if (existing_guard_condition == guard_condition) { if (existing_guard_condition == guard_condition) {
return; return;
} }
@ -383,7 +372,7 @@ void PriorityMemoryStrategy<>::add_guard_condition(
template <> template <>
void PriorityMemoryStrategy<>::remove_guard_condition( void PriorityMemoryStrategy<>::remove_guard_condition(
rcl_guard_condition_t const *guard_condition) { rcl_guard_condition_t const* guard_condition) {
for (auto it = guard_conditions_.begin(); it != guard_conditions_.end(); ++it) { for (auto it = guard_conditions_.begin(); it != guard_conditions_.end(); ++it) {
if (*it == guard_condition) { if (*it == guard_condition) {
guard_conditions_.erase(it); guard_conditions_.erase(it);

View file

@ -1,107 +1,43 @@
// Copyright 2015 Open Source Robotics Foundation, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include <chrono>
#include <memory>
#include <string> #include <string>
#include <fstream> #include <thread>
#include <unistd.h>
#include <rclcpp/rclcpp.hpp> #include "priority_executor/default_executor.hpp"
#include <std_msgs/msg/string.hpp> #include "priority_executor/multithread_priority_executor.hpp"
#include "rclcpp/rclcpp.hpp"
#include "std_msgs/msg/string.hpp"
#include "priority_executor/priority_executor.hpp" using namespace std::chrono_literals;
#include "priority_executor/priority_memory_strategy.hpp"
// re-create the classic talker-listener example with two listeners void chatter_callback(std_msgs::msg::String::SharedPtr const msg) {
class Talker : public rclcpp::Node RCLCPP_INFO(rclcpp::get_logger("rclcpp"), "I heard: '%s'", msg->data.c_str());
{ }
public:
Talker() : Node("talker")
{
// Create a publisher on the "chatter" topic with 10 msg queue size.
pub_ = this->create_publisher<std_msgs::msg::String>("chatter", 10);
// Create a timer of period 1s that calls our callback member function.
timer_ = this->create_wall_timer(std::chrono::seconds(1),
std::bind(&Talker::timer_callback, this));
}
// the timer must be public
rclcpp::TimerBase::SharedPtr timer_;
private:
void timer_callback()
{
std_msgs::msg::String msg;
msg.data = "Hello World!";
RCLCPP_INFO(this->get_logger(), "Publishing: '%s'", msg.data.c_str());
pub_->publish(msg);
}
rclcpp::Publisher<std_msgs::msg::String>::SharedPtr pub_;
};
class Listener : public rclcpp::Node
{
public:
Listener(std::string name) : Node(name)
{
// Create a subscription on the "chatter" topic with the default callback
// method.
sub_ = this->create_subscription<std_msgs::msg::String>(
"chatter", 10,
std::bind(&Listener::callback, this, std::placeholders::_1));
}
// the publisher must be public
rclcpp::Subscription<std_msgs::msg::String>::SharedPtr sub_;
private:
void callback(const std_msgs::msg::String::SharedPtr msg)
{
RCLCPP_INFO(this->get_logger(), "I heard: '%s'", msg->data.c_str());
}
};
int main(int argc, char* argv[]) { int main(int argc, char* argv[]) {
rclcpp::init(argc, argv); rclcpp::init(argc, argv);
auto talker = std::make_shared<Talker>(); auto node = rclcpp::Node::make_shared("minimal_subscriber");
auto listener1 = std::make_shared<Listener>("listener1");
auto listener2 = std::make_shared<Listener>("listener2");
rclcpp::ExecutorOptions options;
auto strategy = std::make_shared<priority_executor::PriorityMemoryStrategy<>>(); auto subscription = node->create_subscription<std_msgs::msg::String>(
options.memory_strategy = strategy; "topic", 10, chatter_callback);
auto executor = new priority_executor::TimedExecutor(options);
// must be set to post_execute can set new deadlines auto executor = std::make_shared<priority_executor::ROSDefaultExecutor>();
executor->prio_memory_strategy_ = strategy; executor->add_node(node);
// the new funcitons in PriorityMemoryStrategy accept the handle of the
// timer/subscription as the first argument
strategy->set_executable_deadline(talker->timer_->get_timer_handle(), 1000,
priority_executor::ExecutableType::TIMER, 0);
// you _must_ set the timer_handle for each chain
strategy->get_priority_settings(talker->timer_->get_timer_handle())
->timer_handle = talker->timer_;
// you _must_ mark the first executable in the chain
strategy->set_first_in_chain(talker->timer_->get_timer_handle());
// set the same period and chain_id for each callback in the chain
strategy->set_executable_deadline(listener1->sub_->get_subscription_handle(),
1000, priority_executor::ExecutableType::SUBSCRIPTION, 0);
strategy->set_executable_deadline(listener2->sub_->get_subscription_handle(),
1000, priority_executor::ExecutableType::SUBSCRIPTION, 0);
// you _must_ mark the last executable in the chain (used to keep track of different instances of the same chain)
strategy->set_last_in_chain(listener2->sub_->get_subscription_handle());
// add all the nodes to the executor
executor->add_node(talker);
executor->add_node(listener1);
executor->add_node(listener2);
// if the executor behaves unexpectedly, you can print the priority settings to make sure they are correct
std::cout << *strategy->get_priority_settings(
talker->timer_->get_timer_handle())
<< std::endl;
std::cout << *strategy->get_priority_settings(
listener1->sub_->get_subscription_handle())
<< std::endl;
std::cout << *strategy->get_priority_settings(
listener2->sub_->get_subscription_handle())
<< std::endl;
executor->spin(); executor->spin();
rclcpp::shutdown(); rclcpp::shutdown();

View file

@ -6,7 +6,7 @@
<description> <description>
ROS 2 package providing timer functionality for the dynamic executor experiments ROS 2 package providing timer functionality for the dynamic executor experiments
</description> </description>
<maintainer email="kurt4wilson@gmail.com">kurt</maintainer> <maintainer email="your.email@example.com">ROS-Dynamic-Executor Team</maintainer>
<license>Apache License 2.0</license> <license>Apache License 2.0</license>
<buildtool_depend>ament_cmake</buildtool_depend> <buildtool_depend>ament_cmake</buildtool_depend>

View file

@ -33,7 +33,6 @@ int sched_getattr(
void log_entry(node_time_logger logger, std::string const& text) { void log_entry(node_time_logger logger, std::string const& text) {
if (logger.recorded_times != nullptr) { if (logger.recorded_times != nullptr) {
logger.recorded_times->emplace_back(text, get_time_us() / 1000); logger.recorded_times->emplace_back(text, get_time_us() / 1000);
// std::cout<<text<<std::endl;
} else { } else {
// TODO: report error // TODO: report error
} }