From 1b8277bcf5522d74c06d2a616e3d4e13abcbf71c Mon Sep 17 00:00:00 2001 From: Kurt Wilson Date: Sat, 22 Mar 2025 19:25:01 -0400 Subject: [PATCH] misc fixed & improvements. auto-set chain deadline queue. update usage example --- src/priority_executor/CMakeLists.txt | 1 + .../priority_memory_strategy.hpp | 510 ++--- .../src/priority_executor.cpp | 2 +- .../src/priority_memory_strategy.cpp | 1732 ++++++++--------- src/priority_executor/src/usage_example.cpp | 127 +- src/simple_timer/CMakeLists.txt | 61 + .../include/simple_timer/cycle_timer.hpp | 23 + .../include/simple_timer/period_timer.hpp | 25 + .../include/simple_timer/rt-sched.hpp | 98 + src/simple_timer/package.xml | 18 + src/simple_timer/src/cycle_timer.cpp | 43 + src/simple_timer/src/period_timer.cpp | 56 + src/simple_timer/src/rt-sched.cpp | 50 + 13 files changed, 1541 insertions(+), 1205 deletions(-) create mode 100644 src/simple_timer/CMakeLists.txt create mode 100644 src/simple_timer/include/simple_timer/cycle_timer.hpp create mode 100644 src/simple_timer/include/simple_timer/period_timer.hpp create mode 100644 src/simple_timer/include/simple_timer/rt-sched.hpp create mode 100644 src/simple_timer/package.xml create mode 100644 src/simple_timer/src/cycle_timer.cpp create mode 100644 src/simple_timer/src/period_timer.cpp create mode 100644 src/simple_timer/src/rt-sched.cpp diff --git a/src/priority_executor/CMakeLists.txt b/src/priority_executor/CMakeLists.txt index f65a112..0eacd31 100755 --- a/src/priority_executor/CMakeLists.txt +++ b/src/priority_executor/CMakeLists.txt @@ -105,4 +105,5 @@ ament_export_include_directories(include) ament_export_libraries(priority_executor) ament_export_libraries(multithread_priority_executor) ament_export_libraries(default_executor) +ament_export_dependencies(rclcpp rcl simple_timer) ament_package() diff --git a/src/priority_executor/include/priority_executor/priority_memory_strategy.hpp b/src/priority_executor/include/priority_executor/priority_memory_strategy.hpp index f56f263..487c2c4 100755 --- a/src/priority_executor/include/priority_executor/priority_memory_strategy.hpp +++ b/src/priority_executor/include/priority_executor/priority_memory_strategy.hpp @@ -15,11 +15,13 @@ #ifndef RTIS_PRIORITY_STRATEGY #define RTIS_PRIORITY_STRATEGY -#include -#include -#include -#include +#include #include +#include +#include +#include +#include +#include #include "rcl/allocator.h" @@ -30,317 +32,375 @@ /// Delegate for handling memory allocations while the Executor is executing. /** - * By default, the memory strategy dynamically allocates memory for structures that come in from - * the rmw implementation after the executor waits for work, based on the number of entities that - * come through. + * By default, the memory strategy dynamically allocates memory for structures + * that come in from the rmw implementation after the executor waits for work, + * based on the number of entities that come through. */ enum ExecutableType { - SUBSCRIPTION, - SERVICE, - CLIENT, - TIMER, - WAITABLE + SUBSCRIPTION, + SERVICE, + CLIENT, + TIMER, + WAITABLE }; enum ExecutableScheduleType { - DEADLINE = 0, - CHAIN_AWARE_PRIORITY, - CHAIN_INDEPENDENT_PRIORITY, // not used here - DEFAULT, // not used here + DEADLINE = 0, + CHAIN_AWARE_PRIORITY, + CHAIN_INDEPENDENT_PRIORITY, // not used here + DEFAULT, // not used here }; class PriorityExecutable { public: - std::shared_ptr handle; - ExecutableType type; - bool can_be_run = false; - std::shared_ptr waitable; - ExecutableScheduleType sched_type; + std::shared_ptr handle; + ExecutableType type; + bool can_be_run = false; + std::shared_ptr waitable; + ExecutableScheduleType sched_type; - int priority; - long period = 1000; // milliseconds + int priority = 0; + long period = 1000; // milliseconds - bool is_first_in_chain = false; - bool is_last_in_chain = false; - // chain aware deadlines - std::deque *deadlines = nullptr; - std::shared_ptr timer_handle; - // just used for logging - int chain_id = 0; + bool is_first_in_chain = false; + bool is_last_in_chain = false; + // chain aware deadlines + std::deque *deadlines = nullptr; + std::shared_ptr timer_handle; + // just used for logging + int chain_id = 0; - // chain aware priority - int counter = 0; + // chain aware priority + int counter = 0; - PriorityExecutable(std::shared_ptr h, int p, ExecutableType t, ExecutableScheduleType sched_type = CHAIN_INDEPENDENT_PRIORITY); + PriorityExecutable( + std::shared_ptr h, int p, ExecutableType t, + ExecutableScheduleType sched_type = CHAIN_INDEPENDENT_PRIORITY); - void dont_run(); + void dont_run(); - void allow_run(); + void allow_run(); - PriorityExecutable(); + PriorityExecutable(); - void increment_counter(); + void increment_counter(); - bool operator==(const PriorityExecutable &other) const; + bool operator==(const PriorityExecutable &other) const; - static size_t num_executables; - int executable_id = 0; + static size_t num_executables; + int executable_id = 0; - std::string name = ""; + std::string name = ""; + + // stream operator for debug + friend std::ostream &operator<<(std::ostream &os, + const PriorityExecutable &pe) + { + os << "sched_type: " << pe.sched_type << ", "; + if (pe.sched_type == DEADLINE) + { + os << "period: " << pe.period << ", "; + } + os << "priority: " << pe.priority << ", "; + os << "executable_id: " << pe.executable_id << ", "; + os << "chain_id: " << pe.chain_id << ", "; + os << "is_first_in_chain: " << pe.is_first_in_chain << ", "; + os << "is_last_in_chain: " << pe.is_last_in_chain << ", "; + + return os; + } }; class PriorityExecutableComparator { public: - bool operator()(const PriorityExecutable *p1, const PriorityExecutable *p2); + bool operator()(const PriorityExecutable *p1, const PriorityExecutable *p2); }; template > class PriorityMemoryStrategy : public rclcpp::memory_strategy::MemoryStrategy { public: - RCLCPP_SMART_PTR_DEFINITIONS(PriorityMemoryStrategy) + RCLCPP_SMART_PTR_DEFINITIONS(PriorityMemoryStrategy) - using VoidAllocTraits = typename rclcpp::allocator::AllocRebind; - using VoidAlloc = typename VoidAllocTraits::allocator_type; + using VoidAllocTraits = + typename rclcpp::allocator::AllocRebind; + using VoidAlloc = typename VoidAllocTraits::allocator_type; - explicit PriorityMemoryStrategy(std::shared_ptr allocator) + explicit PriorityMemoryStrategy(std::shared_ptr allocator) + { + allocator_ = std::make_shared(*allocator.get()); + logger_ = create_logger(); + } + + PriorityMemoryStrategy() + { + allocator_ = std::make_shared(); + logger_ = create_logger(); + } + node_time_logger logger_; + + void + add_guard_condition(const rcl_guard_condition_t *guard_condition) override; + + void + remove_guard_condition(const rcl_guard_condition_t *guard_condition) override; + + void clear_handles() override; + + void remove_null_handles(rcl_wait_set_t *wait_set) override; + + bool collect_entities(const WeakNodeList &weak_nodes) override; + + void + add_waitable_handle(const rclcpp::Waitable::SharedPtr &waitable) override; + + bool add_handles_to_wait_set(rcl_wait_set_t *wait_set) override; + + void get_next_executable(rclcpp::AnyExecutable &any_exec, + const WeakNodeList &weak_nodes); + + /** + * thread-id is used for logging. if -1, then don't log the thread id + */ + void post_execute(rclcpp::AnyExecutable any_exec, int thread_id = -1); + + void get_next_subscription(rclcpp::AnyExecutable &any_exec, + const WeakNodeList &weak_nodes) override; + + void get_next_service(rclcpp::AnyExecutable &any_exec, + const WeakNodeList &weak_nodes) override; + + void get_next_client(rclcpp::AnyExecutable &any_exec, + const WeakNodeList &weak_nodes) override; + + void get_next_timer(rclcpp::AnyExecutable &any_exec, + const WeakNodeList &weak_nodes) override; + + void get_next_waitable(rclcpp::AnyExecutable &any_exec, + const WeakNodeList &weak_nodes) override; + + std::shared_ptr + get_priority_settings(std::shared_ptr executable) + { + auto search = priority_map.find(executable); + if (search != priority_map.end()) { - allocator_ = std::make_shared(*allocator.get()); - logger_ = create_logger(); + return search->second; } - - PriorityMemoryStrategy() + else { - allocator_ = std::make_shared(); - logger_ = create_logger(); + return nullptr; } - node_time_logger logger_; + } - void add_guard_condition(const rcl_guard_condition_t *guard_condition) override; + rcl_allocator_t get_allocator() override + { + return rclcpp::allocator::get_rcl_allocator( + *allocator_.get()); + } - void remove_guard_condition(const rcl_guard_condition_t *guard_condition) override; - - void clear_handles() override; - - void remove_null_handles(rcl_wait_set_t *wait_set) override; - - bool collect_entities(const WeakNodeList &weak_nodes) override; - - void add_waitable_handle(const rclcpp::Waitable::SharedPtr &waitable) override; - - bool add_handles_to_wait_set(rcl_wait_set_t *wait_set) override; - - void - get_next_executable( - rclcpp::AnyExecutable &any_exec, - const WeakNodeList &weak_nodes); - - /** - * thread-id is used for logging. if -1, then don't log the thread id - */ - void post_execute(rclcpp::AnyExecutable any_exec, int thread_id = -1); - - void - get_next_subscription( - rclcpp::AnyExecutable &any_exec, - const WeakNodeList &weak_nodes) override; - - void - get_next_service( - rclcpp::AnyExecutable &any_exec, - const WeakNodeList &weak_nodes) override; - - void - get_next_client(rclcpp::AnyExecutable &any_exec, const WeakNodeList &weak_nodes) override; - - void - get_next_timer( - rclcpp::AnyExecutable &any_exec, - const WeakNodeList &weak_nodes) override; - - void - get_next_waitable(rclcpp::AnyExecutable &any_exec, const WeakNodeList &weak_nodes) override; - - std::shared_ptr get_priority_settings(std::shared_ptr executable) + size_t number_of_ready_subscriptions() const override + { + size_t number_of_subscriptions = subscription_handles_.size(); + // std::cout << "ready_raw: " << number_of_subscriptions << std::endl; + for (auto waitable : waitable_handles_) { - auto search = priority_map.find(executable); - if (search != priority_map.end()) - { - return search->second; - } - else - { - return nullptr; - } + number_of_subscriptions += waitable->get_number_of_ready_subscriptions(); } + return number_of_subscriptions; + } - rcl_allocator_t get_allocator() override + size_t number_of_ready_services() const override + { + size_t number_of_services = service_handles_.size(); + for (auto waitable : waitable_handles_) { - return rclcpp::allocator::get_rcl_allocator(*allocator_.get()); + number_of_services += waitable->get_number_of_ready_services(); } + return number_of_services; + } - size_t number_of_ready_subscriptions() const override + size_t number_of_ready_events() const override + { + size_t number_of_events = 0; + for (auto waitable : waitable_handles_) { - size_t number_of_subscriptions = subscription_handles_.size(); - // std::cout << "ready_raw: " << number_of_subscriptions << std::endl; - for (auto waitable : waitable_handles_) - { - number_of_subscriptions += waitable->get_number_of_ready_subscriptions(); - } - return number_of_subscriptions; + number_of_events += waitable->get_number_of_ready_events(); } + return number_of_events; + } - size_t number_of_ready_services() const override + size_t number_of_ready_clients() const override + { + size_t number_of_clients = client_handles_.size(); + for (auto waitable : waitable_handles_) { - size_t number_of_services = service_handles_.size(); - for (auto waitable : waitable_handles_) - { - number_of_services += waitable->get_number_of_ready_services(); - } - return number_of_services; + number_of_clients += waitable->get_number_of_ready_clients(); } + return number_of_clients; + } - size_t number_of_ready_events() const override + size_t number_of_guard_conditions() const override + { + size_t number_of_guard_conditions = guard_conditions_.size(); + for (auto waitable : waitable_handles_) { - size_t number_of_events = 0; - for (auto waitable : waitable_handles_) - { - number_of_events += waitable->get_number_of_ready_events(); - } - return number_of_events; + number_of_guard_conditions += + waitable->get_number_of_ready_guard_conditions(); } + return number_of_guard_conditions; + } - size_t number_of_ready_clients() const override + size_t number_of_ready_timers() const override + { + size_t number_of_timers = timer_handles_.size(); + for (auto waitable : waitable_handles_) { - size_t number_of_clients = client_handles_.size(); - for (auto waitable : waitable_handles_) - { - number_of_clients += waitable->get_number_of_ready_clients(); - } - return number_of_clients; + number_of_timers += waitable->get_number_of_ready_timers(); } + return number_of_timers; + } - size_t number_of_guard_conditions() const override - { - size_t number_of_guard_conditions = guard_conditions_.size(); - for (auto waitable : waitable_handles_) - { - number_of_guard_conditions += waitable->get_number_of_ready_guard_conditions(); - } - return number_of_guard_conditions; - } + size_t number_of_waitables() const override + { + return waitable_handles_.size(); + } - size_t number_of_ready_timers() const override - { - size_t number_of_timers = timer_handles_.size(); - for (auto waitable : waitable_handles_) - { - number_of_timers += waitable->get_number_of_ready_timers(); - } - return number_of_timers; - } + void set_executable_priority(std::shared_ptr handle, int priority, + ExecutableType t) + { + // TODO: any sanity checks should go here + priority_map[handle] = + std::make_shared(handle, priority, t); + } + void set_executable_priority(std::shared_ptr handle, int priority, + ExecutableType t, ExecutableScheduleType sc, + int chain_index) + { + // TODO: any sanity checks should go here + priority_map[handle] = + std::make_shared(handle, priority, t, sc); + priority_map[handle]->chain_id = chain_index; + } - size_t number_of_waitables() const override + void set_executable_deadline(std::shared_ptr handle, int period, + ExecutableType t, int chain_id = 0, + std::string name = "") + { + // TODO: any sanity checks should go here + priority_map[handle] = + std::make_shared(handle, period, t, DEADLINE); + priority_map[handle]->chain_id = chain_id; + priority_map[handle]->name = name; + // is there a deadline queue for this chain id? + auto search = chain_deadlines.find(chain_id); + if (search == chain_deadlines.end()) { - return waitable_handles_.size(); + chain_deadlines[chain_id] = std::make_shared>(); } + priority_map[handle]->deadlines = chain_deadlines[chain_id].get(); + } - void set_executable_priority(std::shared_ptr handle, int priority, ExecutableType t) + int get_priority(std::shared_ptr executable) + { + auto search = priority_map.find(executable); + if (search != priority_map.end()) { - // TODO: any sanity checks should go here - priority_map[handle] = std::make_shared(handle, priority, t); + return search->second->priority; } - void set_executable_priority(std::shared_ptr handle, int priority, ExecutableType t, ExecutableScheduleType sc, int chain_index) + else { - // TODO: any sanity checks should go here - priority_map[handle] = std::make_shared(handle, priority, t, sc); - priority_map[handle]->chain_id = chain_index; + return 0; } + } - void set_executable_deadline(std::shared_ptr handle, int period, ExecutableType t, int chain_id = 0, std::string name = "") - { - // TODO: any sanity checks should go here - priority_map[handle] = std::make_shared(handle, period, t, DEADLINE); - priority_map[handle]->chain_id = chain_id; - priority_map[handle]->name = name; - } + void set_first_in_chain(std::shared_ptr exec_handle) + { + get_priority_settings(exec_handle)->is_first_in_chain = true; + } - int get_priority(std::shared_ptr executable) - { - auto search = priority_map.find(executable); - if (search != priority_map.end()) - { - return search->second->priority; - } - else - { - return 0; - } - } + void set_last_in_chain(std::shared_ptr exec_handle) + { + get_priority_settings(exec_handle)->is_last_in_chain = true; + } + void assign_deadlines_queue(std::shared_ptr exec_handle, + std::deque *deadlines) + { + get_priority_settings(exec_handle)->deadlines = deadlines; + } - void set_first_in_chain(std::shared_ptr exec_handle) + std::shared_ptr> get_chain_deadlines(int chain_id) + { + auto search = chain_deadlines.find(chain_id); + if (search != chain_deadlines.end()) { - get_priority_settings(exec_handle)->is_first_in_chain = true; + return search->second; } - - void set_last_in_chain(std::shared_ptr exec_handle) + else { - get_priority_settings(exec_handle)->is_last_in_chain = true; - } - - void assign_deadlines_queue(std::shared_ptr exec_handle, std::deque *deadlines) - { - get_priority_settings(exec_handle)->deadlines = deadlines; + return nullptr; } + } private: - std::shared_ptr get_and_reset_priority(std::shared_ptr executable, ExecutableType t) + std::shared_ptr + get_and_reset_priority(std::shared_ptr executable, + ExecutableType t) + { + // PriorityExecutable *p = get_priority_settings(executable); + std::shared_ptr p = get_priority_settings(executable); + if (p == nullptr) { - // PriorityExecutable *p = get_priority_settings(executable); - std::shared_ptr p = get_priority_settings(executable); - if (p == nullptr) - { - // priority_map[executable] = PriorityExecutable(executable, 0, t); - priority_map[executable] = std::make_shared(executable, 0, t); - p = priority_map[executable]; - } - // p->can_be_run = true; - return p; + // priority_map[executable] = PriorityExecutable(executable, 0, t); + priority_map[executable] = + std::make_shared(executable, 0, t); + p = priority_map[executable]; } + // p->can_be_run = true; + return p; + } - template - using VectorRebind = - std::vector::template rebind_alloc>; + template + using VectorRebind = std::vector< + T, typename std::allocator_traits::template rebind_alloc>; - VectorRebind guard_conditions_; + VectorRebind guard_conditions_; - VectorRebind> subscription_handles_; - VectorRebind> service_handles_; - VectorRebind> client_handles_; - VectorRebind> timer_handles_; - VectorRebind> waitable_handles_; + VectorRebind> subscription_handles_; + VectorRebind> service_handles_; + VectorRebind> client_handles_; + VectorRebind> timer_handles_; + VectorRebind> waitable_handles_; - std::shared_ptr allocator_; + std::shared_ptr allocator_; - // TODO: evaluate using node/subscription namespaced strings as keys + // TODO: evaluate using node/subscription namespaced strings as keys - // holds *all* handle->priority mappings - std::map, std::shared_ptr> priority_map; + // holds *all* handle->priority mappings + std::map, std::shared_ptr> + priority_map; - // hold *only valid* executable+priorities - // std::priority_queue, PriorityExecutableComparator> all_executables_; - std::set all_executables_; + std::map>> chain_deadlines; - // priority queue doesn't allow iteration. fortunately, std::map is sorted by key, so we can replace the priority queue with a map - // the key will be the priority. the value doesn't matter. - // std::map all_executables_ = std::map(PriorityExecutableComparator()); - void add_executable_to_queue(std::shared_ptr p); + // hold *only valid* executable+priorities + // std::priority_queue, PriorityExecutableComparator> all_executables_; + std::set + all_executables_; + + // priority queue doesn't allow iteration. fortunately, std::map is sorted by + // key, so we can replace the priority queue with a map the key will be the + // priority. the value doesn't matter. std::map all_executables_ = std::map(PriorityExecutableComparator()); + void add_executable_to_queue(std::shared_ptr p); }; #endif // RCLCPP__STRATEGIES__ALLOCATOR_MEMORY_STRATEGY_HPP_ diff --git a/src/priority_executor/src/priority_executor.cpp b/src/priority_executor/src/priority_executor.cpp index e305c77..d207b3f 100755 --- a/src/priority_executor/src/priority_executor.cpp +++ b/src/priority_executor/src/priority_executor.cpp @@ -59,7 +59,7 @@ namespace timed_executor } } } - std::cout << "shutdown" << std::endl; + RCLCPP_INFO(rclcpp::get_logger("priority_executor"), "priority executor shutdown"); } bool TimedExecutor::get_next_executable(rclcpp::AnyExecutable &any_executable, std::chrono::nanoseconds timeout) diff --git a/src/priority_executor/src/priority_memory_strategy.cpp b/src/priority_executor/src/priority_memory_strategy.cpp index ba42379..0948470 100755 --- a/src/priority_executor/src/priority_memory_strategy.cpp +++ b/src/priority_executor/src/priority_memory_strategy.cpp @@ -1,977 +1,859 @@ #include "priority_executor/priority_memory_strategy.hpp" #include "simple_timer/rt-sched.hpp" +#include +#include +#include #include size_t PriorityExecutable::num_executables; -PriorityExecutable::PriorityExecutable(std::shared_ptr h, int p, ExecutableType t, ExecutableScheduleType sched_type) -{ - std::cout << "priority_executable constructor called" << std::endl; - std::cout << "type: " << t << std::endl; - handle = h; - type = t; - if (sched_type == CHAIN_INDEPENDENT_PRIORITY || sched_type == CHAIN_AWARE_PRIORITY) - { - priority = p; - } - else if (sched_type == DEADLINE) - { - period = p; - } - this->sched_type = sched_type; +PriorityExecutable::PriorityExecutable(std::shared_ptr h, int p, + ExecutableType t, + ExecutableScheduleType sched_type) { + std::cout << "priority_executable constructor called" << std::endl; + std::cout << "type: " << t << std::endl; + handle = h; + type = t; + if (sched_type == CHAIN_INDEPENDENT_PRIORITY || + sched_type == CHAIN_AWARE_PRIORITY) { + priority = p; + } else if (sched_type == DEADLINE) { + period = p; + // as a tiebreaker + priority = num_executables; + } + this->sched_type = sched_type; - this->executable_id = num_executables; - num_executables += 1; + this->executable_id = num_executables; + num_executables += 1; } -PriorityExecutable::PriorityExecutable() -{ - handle = nullptr; - priority = 0; - type = SUBSCRIPTION; +PriorityExecutable::PriorityExecutable() { + handle = nullptr; + priority = 0; + type = SUBSCRIPTION; } -void PriorityExecutable::dont_run() -{ +void PriorityExecutable::dont_run() { this->can_be_run = false; } - this->can_be_run = false; -} +void PriorityExecutable::allow_run() { this->can_be_run = true; } -void PriorityExecutable::allow_run() -{ - this->can_be_run = true; -} +void PriorityExecutable::increment_counter() { this->counter += 1; } -void PriorityExecutable::increment_counter() -{ - this->counter += 1; -} - -bool PriorityExecutable::operator==(const PriorityExecutable &other) const -{ - std::cout << "PriorityExecutable::operator== called" << std::endl; - if (this->handle == other.handle) - { - return true; - } - else - { - return false; - } -} - -bool PriorityExecutableComparator::operator()(const PriorityExecutable *p1, const PriorityExecutable *p2) -{ - // since this will be used in a std::set, also check for equality - if (p1 == nullptr || p2 == nullptr) - { - // TODO: realistic value - std::cout << "PriorityExecutableComparator::operator() called with nullptr" << std::endl; - return false; - } - if (p1->handle == p2->handle) - { - return false; - } - if (p1->executable_id == p2->executable_id) - { - return false; - } - - if (p1->sched_type != p2->sched_type) - { - // in order: DEADLINE, CHAIN_AWARE, CHAIN_INDEPENDENT - return p1->sched_type < p2->sched_type; - } - if (p1->sched_type == CHAIN_INDEPENDENT_PRIORITY) - { - if (p1->priority == p2->priority) - { - return p1->executable_id < p2->executable_id; - } - // lower value runs first - return p1->priority < p2->priority; - } - if (p1->sched_type == CHAIN_AWARE_PRIORITY) - { - if (p1->priority == p2->priority) - { - return p1->executable_id < p2->executable_id; - } - return p1->priority < p2->priority; - } - if (p1->sched_type == DEADLINE) - { - // TODO: use the counter logic here as well - - uint64_t p1_deadline = 0; - uint64_t p2_deadline = 0; - if (p1->deadlines != nullptr && !p1->deadlines->empty()) - { - p1_deadline = p1->deadlines->front(); - } - if (p2->deadlines != nullptr && !p2->deadlines->empty()) - { - p2_deadline = p2->deadlines->front(); - } - if (p1_deadline == p2_deadline || p1->deadlines == p2->deadlines) - { - // this looks bad and is bad, BUT - // if we tell std::set these are equal, only one will be added, and we will lose the other. - // we need _something_ to make them unique - // since we'd rather finish a chain than start a new one, select the higher id - // return p1->executable_id > p2->executable_id; - return p1->priority < p2->priority; - } - if (p1_deadline == 0) - { - p1_deadline = std::numeric_limits::max(); - } - if (p2_deadline == 0) - { - p2_deadline = std::numeric_limits::max(); - } - return p1_deadline < p2_deadline; - } - else - { - std::cout << "invalid compare opration on priority_exec" << std::endl; - return false; - } -} - -template <> -void PriorityMemoryStrategy<>::add_guard_condition(const rcl_guard_condition_t *guard_condition) -{ - for (const auto &existing_guard_condition : guard_conditions_) - { - if (existing_guard_condition == guard_condition) - { - return; - } - } - guard_conditions_.push_back(guard_condition); -} - -template <> -void PriorityMemoryStrategy<>::remove_guard_condition(const rcl_guard_condition_t *guard_condition) -{ - for (auto it = guard_conditions_.begin(); it != guard_conditions_.end(); ++it) - { - if (*it == guard_condition) - { - guard_conditions_.erase(it); - break; - } - } -} - -template <> -void PriorityMemoryStrategy<>::add_executable_to_queue(std::shared_ptr e) -{ - // e may have changed. remove and re-add - all_executables_.erase(e.get()); - // convert from shared_ptr to raw pointer - all_executables_.insert(e.get()); -} - -template <> -void PriorityMemoryStrategy<>::clear_handles() -{ - subscription_handles_.clear(); - service_handles_.clear(); - client_handles_.clear(); - timer_handles_.clear(); - waitable_handles_.clear(); - - // all_executables_ = std::priority_queue, PriorityExecutableComparator>(); - // all_executables_.clear(); - // all_executables_ = std::map(PriorityExecutableComparator()); -} - -template <> -void PriorityMemoryStrategy<>::remove_null_handles(rcl_wait_set_t *wait_set) -{ - // TODO(jacobperron): Check if wait set sizes are what we expect them to be? - // e.g. wait_set->size_of_clients == client_handles_.size() - - // Important to use subscription_handles_.size() instead of wait set's size since - // there may be more subscriptions in the wait set due to Waitables added to the end. - // The same logic applies for other entities. - for (size_t i = 0; i < subscription_handles_.size(); ++i) - { - if (!wait_set->subscriptions[i]) - { - priority_map[subscription_handles_[i]]->dont_run(); - subscription_handles_[i].reset(); - } - else - { - priority_map[subscription_handles_[i]]->allow_run(); - } - } - for (size_t i = 0; i < service_handles_.size(); ++i) - { - if (!wait_set->services[i]) - { - priority_map[service_handles_[i]]->dont_run(); - service_handles_[i].reset(); - } - else - { - priority_map[service_handles_[i]]->allow_run(); - } - } - for (size_t i = 0; i < client_handles_.size(); ++i) - { - if (!wait_set->clients[i]) - { - priority_map[client_handles_[i]]->dont_run(); - client_handles_[i].reset(); - } - else - { - priority_map[client_handles_[i]]->allow_run(); - } - } - for (size_t i = 0; i < timer_handles_.size(); ++i) - { - if (!wait_set->timers[i]) - { - priority_map[timer_handles_[i]]->dont_run(); - timer_handles_[i].reset(); - } - else - { - priority_map[timer_handles_[i]]->allow_run(); - } - } - for (size_t i = 0; i < waitable_handles_.size(); ++i) - { - if (!waitable_handles_[i]->is_ready(wait_set)) - { - priority_map[waitable_handles_[i]]->dont_run(); - waitable_handles_[i].reset(); - } - else - { - priority_map[waitable_handles_[i]]->allow_run(); - } - } - - subscription_handles_.erase( - std::remove(subscription_handles_.begin(), subscription_handles_.end(), nullptr), - subscription_handles_.end()); - - service_handles_.erase( - std::remove(service_handles_.begin(), service_handles_.end(), nullptr), - service_handles_.end()); - - client_handles_.erase( - std::remove(client_handles_.begin(), client_handles_.end(), nullptr), - client_handles_.end()); - - timer_handles_.erase( - std::remove(timer_handles_.begin(), timer_handles_.end(), nullptr), - timer_handles_.end()); - - waitable_handles_.erase( - std::remove(waitable_handles_.begin(), waitable_handles_.end(), nullptr), - waitable_handles_.end()); -} - -template <> -bool PriorityMemoryStrategy<>::collect_entities(const WeakNodeList &weak_nodes) -{ - - bool has_invalid_weak_nodes = false; - for (auto &weak_node : weak_nodes) - { - auto node = weak_node.lock(); - if (!node) - { - has_invalid_weak_nodes = true; - continue; - } - for (auto &weak_group : node->get_callback_groups()) - { - auto group = weak_group.lock(); - if (!group) - // if (!group || !group->can_be_taken_from().load()) - { - continue; - } - group->find_subscription_ptrs_if( - [this](const rclcpp::SubscriptionBase::SharedPtr &subscription) - { - auto subscription_handle = subscription->get_subscription_handle(); - subscription_handles_.push_back(subscription_handle); - add_executable_to_queue(get_and_reset_priority(subscription_handle, SUBSCRIPTION)); - return false; - }); - group->find_service_ptrs_if( - [this](const rclcpp::ServiceBase::SharedPtr &service) - { - add_executable_to_queue(get_and_reset_priority(service->get_service_handle(), SERVICE)); - service_handles_.push_back(service->get_service_handle()); - return false; - }); - group->find_client_ptrs_if( - [this](const rclcpp::ClientBase::SharedPtr &client) - { - add_executable_to_queue(get_and_reset_priority(client->get_client_handle(), CLIENT)); - client_handles_.push_back(client->get_client_handle()); - return false; - }); - group->find_timer_ptrs_if( - [this](const rclcpp::TimerBase::SharedPtr &timer) - { - add_executable_to_queue(get_and_reset_priority(timer->get_timer_handle(), TIMER)); - timer_handles_.push_back(timer->get_timer_handle()); - return false; - }); - group->find_waitable_ptrs_if( - [this](const rclcpp::Waitable::SharedPtr &waitable) - { - add_executable_to_queue(get_and_reset_priority(waitable, WAITABLE)); - waitable_handles_.push_back(waitable); - return false; - }); - } - } - return has_invalid_weak_nodes; -} - -template <> -void PriorityMemoryStrategy<>::add_waitable_handle(const rclcpp::Waitable::SharedPtr &waitable) -{ - - if (nullptr == waitable) - { - throw std::runtime_error("waitable object unexpectedly nullptr"); - } - waitable_handles_.push_back(waitable); -} - -template <> -bool PriorityMemoryStrategy<>::add_handles_to_wait_set(rcl_wait_set_t *wait_set) -{ - for (auto subscription : subscription_handles_) - { - if (rcl_wait_set_add_subscription(wait_set, subscription.get(), NULL) != RCL_RET_OK) - { - RCUTILS_LOG_ERROR_NAMED( - "rclcpp", - "Couldn't add subscription to wait set: %s", rcl_get_error_string().str); - return false; - } - } - - for (auto client : client_handles_) - { - if (rcl_wait_set_add_client(wait_set, client.get(), NULL) != RCL_RET_OK) - { - RCUTILS_LOG_ERROR_NAMED( - "rclcpp", - "Couldn't add client to wait set: %s", rcl_get_error_string().str); - return false; - } - } - - for (auto service : service_handles_) - { - if (rcl_wait_set_add_service(wait_set, service.get(), NULL) != RCL_RET_OK) - { - RCUTILS_LOG_ERROR_NAMED( - "rclcpp", - "Couldn't add service to wait set: %s", rcl_get_error_string().str); - return false; - } - } - - for (auto timer : timer_handles_) - { - if (rcl_wait_set_add_timer(wait_set, timer.get(), NULL) != RCL_RET_OK) - { - RCUTILS_LOG_ERROR_NAMED( - "rclcpp", - "Couldn't add timer to wait set: %s", rcl_get_error_string().str); - return false; - } - } - - for (auto guard_condition : guard_conditions_) - { - if (rcl_wait_set_add_guard_condition(wait_set, guard_condition, NULL) != RCL_RET_OK) - { - RCUTILS_LOG_ERROR_NAMED( - "rclcpp", - "Couldn't add guard_condition to wait set: %s", - rcl_get_error_string().str); - return false; - } - } - - for (auto waitable : waitable_handles_) - { - if (!waitable->add_to_wait_set(wait_set)) - { - RCUTILS_LOG_ERROR_NAMED( - "rclcpp", - "Couldn't add waitable to wait set: %s", rcl_get_error_string().str); - return false; - } - } +bool PriorityExecutable::operator==(const PriorityExecutable &other) const { + std::cout << "PriorityExecutable::operator== called" << std::endl; + if (this->handle == other.handle) { return true; + } else { + return false; + } +} + +bool PriorityExecutableComparator::operator()(const PriorityExecutable *p1, + const PriorityExecutable *p2) { + // since this will be used in a std::set, also check for equality + if (p1 == nullptr || p2 == nullptr) { + // TODO: realistic value + std::cout << "PriorityExecutableComparator::operator() called with nullptr" + << std::endl; + return false; + } + if (p1->handle == p2->handle) { + return false; + } + if (p1->executable_id == p2->executable_id) { + return false; + } + + if (p1->sched_type != p2->sched_type) { + // in order: DEADLINE, CHAIN_AWARE, CHAIN_INDEPENDENT + return p1->sched_type < p2->sched_type; + } + if (p1->sched_type == CHAIN_INDEPENDENT_PRIORITY) { + if (p1->priority == p2->priority) { + return p1->executable_id < p2->executable_id; + } + // lower value runs first + return p1->priority < p2->priority; + } + if (p1->sched_type == CHAIN_AWARE_PRIORITY) { + if (p1->priority == p2->priority) { + return p1->executable_id < p2->executable_id; + } + return p1->priority < p2->priority; + } + if (p1->sched_type == DEADLINE) { + // TODO: use the counter logic here as well + + uint64_t p1_deadline = 0; + uint64_t p2_deadline = 0; + if (p1->deadlines != nullptr && !p1->deadlines->empty()) { + p1_deadline = p1->deadlines->front(); + } + if (p2->deadlines != nullptr && !p2->deadlines->empty()) { + p2_deadline = p2->deadlines->front(); + } + if (p1_deadline == p2_deadline || p1->deadlines == p2->deadlines) { + // this looks bad and is bad, BUT + // if we tell std::set these are equal, only one will be added, and we + // will lose the other. we need _something_ to make them unique since we'd + // rather finish a chain than start a new one, select the higher id return + // p1->executable_id > p2->executable_id; + // return p1->priority < p2->priority; + if (p1->priority != p2->priority) { + return p1->priority < p2->priority; + } + return p1->executable_id < p2->executable_id; + } + if (p1_deadline == 0) { + p1_deadline = std::numeric_limits::max(); + } + if (p2_deadline == 0) { + p2_deadline = std::numeric_limits::max(); + } + return p1_deadline < p2_deadline; + } else { + std::cout << "invalid compare opration on priority_exec" << std::endl; + return false; + } +} + +template <> +void PriorityMemoryStrategy<>::add_guard_condition( + const rcl_guard_condition_t *guard_condition) { + for (const auto &existing_guard_condition : guard_conditions_) { + if (existing_guard_condition == guard_condition) { + return; + } + } + guard_conditions_.push_back(guard_condition); +} + +template <> +void PriorityMemoryStrategy<>::remove_guard_condition( + const rcl_guard_condition_t *guard_condition) { + for (auto it = guard_conditions_.begin(); it != guard_conditions_.end(); + ++it) { + if (*it == guard_condition) { + guard_conditions_.erase(it); + break; + } + } +} + +template <> +void PriorityMemoryStrategy<>::add_executable_to_queue( + std::shared_ptr e) { + // e may have changed. remove and re-add + all_executables_.erase(e.get()); + // convert from shared_ptr to raw pointer + all_executables_.insert(e.get()); +} + +template <> void PriorityMemoryStrategy<>::clear_handles() { + subscription_handles_.clear(); + service_handles_.clear(); + client_handles_.clear(); + timer_handles_.clear(); + waitable_handles_.clear(); + + // all_executables_ = std::priority_queue, PriorityExecutableComparator>(); + // all_executables_.clear(); + // all_executables_ = std::map(PriorityExecutableComparator()); +} + +template <> +void PriorityMemoryStrategy<>::remove_null_handles(rcl_wait_set_t *wait_set) { + // TODO(jacobperron): Check if wait set sizes are what we expect them to be? + // e.g. wait_set->size_of_clients == client_handles_.size() + + // Important to use subscription_handles_.size() instead of wait set's size + // since there may be more subscriptions in the wait set due to Waitables + // added to the end. The same logic applies for other entities. + for (size_t i = 0; i < subscription_handles_.size(); ++i) { + if (!wait_set->subscriptions[i]) { + priority_map[subscription_handles_[i]]->dont_run(); + subscription_handles_[i].reset(); + } else { + priority_map[subscription_handles_[i]]->allow_run(); + } + } + for (size_t i = 0; i < service_handles_.size(); ++i) { + if (!wait_set->services[i]) { + priority_map[service_handles_[i]]->dont_run(); + service_handles_[i].reset(); + } else { + priority_map[service_handles_[i]]->allow_run(); + } + } + for (size_t i = 0; i < client_handles_.size(); ++i) { + if (!wait_set->clients[i]) { + priority_map[client_handles_[i]]->dont_run(); + client_handles_[i].reset(); + } else { + priority_map[client_handles_[i]]->allow_run(); + } + } + for (size_t i = 0; i < timer_handles_.size(); ++i) { + if (!wait_set->timers[i]) { + priority_map[timer_handles_[i]]->dont_run(); + timer_handles_[i].reset(); + } else { + priority_map[timer_handles_[i]]->allow_run(); + } + } + for (size_t i = 0; i < waitable_handles_.size(); ++i) { + if (!waitable_handles_[i]->is_ready(wait_set)) { + priority_map[waitable_handles_[i]]->dont_run(); + waitable_handles_[i].reset(); + } else { + priority_map[waitable_handles_[i]]->allow_run(); + } + } + + subscription_handles_.erase(std::remove(subscription_handles_.begin(), + subscription_handles_.end(), nullptr), + subscription_handles_.end()); + + service_handles_.erase( + std::remove(service_handles_.begin(), service_handles_.end(), nullptr), + service_handles_.end()); + + client_handles_.erase( + std::remove(client_handles_.begin(), client_handles_.end(), nullptr), + client_handles_.end()); + + timer_handles_.erase( + std::remove(timer_handles_.begin(), timer_handles_.end(), nullptr), + timer_handles_.end()); + + waitable_handles_.erase( + std::remove(waitable_handles_.begin(), waitable_handles_.end(), nullptr), + waitable_handles_.end()); +} + +template <> +bool PriorityMemoryStrategy<>::collect_entities( + const WeakNodeList &weak_nodes) { + + bool has_invalid_weak_nodes = false; + for (auto &weak_node : weak_nodes) { + auto node = weak_node.lock(); + if (!node) { + has_invalid_weak_nodes = true; + continue; + } + for (auto &weak_group : node->get_callback_groups()) { + auto group = weak_group.lock(); + if (!group) + // if (!group || !group->can_be_taken_from().load()) + { + continue; + } + group->find_subscription_ptrs_if( + [this](const rclcpp::SubscriptionBase::SharedPtr &subscription) { + auto subscription_handle = subscription->get_subscription_handle(); + subscription_handles_.push_back(subscription_handle); + add_executable_to_queue( + get_and_reset_priority(subscription_handle, SUBSCRIPTION)); + return false; + }); + group->find_service_ptrs_if( + [this](const rclcpp::ServiceBase::SharedPtr &service) { + add_executable_to_queue( + get_and_reset_priority(service->get_service_handle(), SERVICE)); + service_handles_.push_back(service->get_service_handle()); + return false; + }); + group->find_client_ptrs_if( + [this](const rclcpp::ClientBase::SharedPtr &client) { + add_executable_to_queue( + get_and_reset_priority(client->get_client_handle(), CLIENT)); + client_handles_.push_back(client->get_client_handle()); + return false; + }); + group->find_timer_ptrs_if( + [this](const rclcpp::TimerBase::SharedPtr &timer) { + add_executable_to_queue( + get_and_reset_priority(timer->get_timer_handle(), TIMER)); + timer_handles_.push_back(timer->get_timer_handle()); + return false; + }); + group->find_waitable_ptrs_if( + [this](const rclcpp::Waitable::SharedPtr &waitable) { + add_executable_to_queue(get_and_reset_priority(waitable, WAITABLE)); + waitable_handles_.push_back(waitable); + return false; + }); + } + } + return has_invalid_weak_nodes; +} + +template <> +void PriorityMemoryStrategy<>::add_waitable_handle( + const rclcpp::Waitable::SharedPtr &waitable) { + + if (nullptr == waitable) { + throw std::runtime_error("waitable object unexpectedly nullptr"); + } + waitable_handles_.push_back(waitable); +} + +template <> +bool PriorityMemoryStrategy<>::add_handles_to_wait_set( + rcl_wait_set_t *wait_set) { + for (auto subscription : subscription_handles_) { + if (rcl_wait_set_add_subscription(wait_set, subscription.get(), NULL) != + RCL_RET_OK) { + RCUTILS_LOG_ERROR_NAMED("rclcpp", + "Couldn't add subscription to wait set: %s", + rcl_get_error_string().str); + return false; + } + } + + for (auto client : client_handles_) { + if (rcl_wait_set_add_client(wait_set, client.get(), NULL) != RCL_RET_OK) { + RCUTILS_LOG_ERROR_NAMED("rclcpp", "Couldn't add client to wait set: %s", + rcl_get_error_string().str); + return false; + } + } + + for (auto service : service_handles_) { + if (rcl_wait_set_add_service(wait_set, service.get(), NULL) != RCL_RET_OK) { + RCUTILS_LOG_ERROR_NAMED("rclcpp", "Couldn't add service to wait set: %s", + rcl_get_error_string().str); + return false; + } + } + + for (auto timer : timer_handles_) { + if (rcl_wait_set_add_timer(wait_set, timer.get(), NULL) != RCL_RET_OK) { + RCUTILS_LOG_ERROR_NAMED("rclcpp", "Couldn't add timer to wait set: %s", + rcl_get_error_string().str); + return false; + } + } + + for (auto guard_condition : guard_conditions_) { + if (rcl_wait_set_add_guard_condition(wait_set, guard_condition, NULL) != + RCL_RET_OK) { + RCUTILS_LOG_ERROR_NAMED("rclcpp", + "Couldn't add guard_condition to wait set: %s", + rcl_get_error_string().str); + return false; + } + } + + for (auto waitable : waitable_handles_) { + if (!waitable->add_to_wait_set(wait_set)) { + RCUTILS_LOG_ERROR_NAMED("rclcpp", "Couldn't add waitable to wait set: %s", + rcl_get_error_string().str); + return false; + } + } + return true; } template <> void PriorityMemoryStrategy<>::get_next_executable( - rclcpp::AnyExecutable &any_exec, - const WeakNodeList &weak_nodes) -{ + rclcpp::AnyExecutable &any_exec, const WeakNodeList &weak_nodes) { - const PriorityExecutable *next_exec = nullptr; - // std::cout << "all_executables_.size():" << all_executables_.size() << std::endl; - // log contents of all_executables_ - // std::cout << exec->name << ": " << exec->handle << " : " << exec->sched_type << std::endl; - std::set skip_groups; + const PriorityExecutable *next_exec = nullptr; + // std::cout << "all_executables_.size():" << all_executables_.size() << + // std::endl; log contents of all_executables_ std::cout << exec->name << ": " + // << exec->handle << " : " << exec->sched_type << std::endl; + std::set skip_groups; - // while (!all_executables_.empty()) - for (auto exec : all_executables_) - { - next_exec = exec; - if (!next_exec->can_be_run) - { - continue; - } - ExecutableType type = next_exec->type; - switch (type) - { - case SUBSCRIPTION: - { - std::shared_ptr subs_handle = std::static_pointer_cast(next_exec->handle); - auto subscription = get_subscription_by_handle(subs_handle, weak_nodes); - if (subscription) - { - auto group = get_group_by_subscription(subscription, weak_nodes); - if (!group) - { - // Group was not found, meaning the waitable is not valid... - // Remove it from the ready list and continue looking - // it = subscription_handles_.erase(it); - continue; - } - if (skip_groups.find(group) != skip_groups.end()) - { - // group was used at some point during this loop. skip it so we can re-check it next time - continue; - } - - if (!group->can_be_taken_from().load()) - { - // Group is mutually exclusive and is being used, so skip it for now - // Leave it to be checked next time, but continue searching - // ++it; - skip_groups.insert(group); - continue; - } - any_exec.callback_group = group; - any_exec.subscription = subscription; - any_exec.node_base = get_node_by_group(group, weak_nodes); - // std::cout << "Using new priority sub " << subscription->get_topic_name() << std::endl; - } - } - break; - case SERVICE: - { - std::shared_ptr service_handle = std::static_pointer_cast(next_exec->handle); - auto service = get_service_by_handle(service_handle, weak_nodes); - if (service) - { - auto group = get_group_by_service(service, weak_nodes); - if (!group) - { - // Group was not found, meaning the waitable is not valid... - // Remove it from the ready list and continue looking - // it = subscription_handles_.erase(it); - continue; - } - if (!group->can_be_taken_from().load()) - { - // Group is mutually exclusive and is being used, so skip it for now - // Leave it to be checked next time, but continue searching - // ++it; - continue; - } - any_exec.callback_group = group; - any_exec.service = service; - any_exec.node_base = get_node_by_group(group, weak_nodes); - // std::cout << "Using new priority service " << service->get_service_name() << std::endl; - } - } - break; - case CLIENT: - { - std::shared_ptr client_handle = std::static_pointer_cast(next_exec->handle); - auto client = get_client_by_handle(client_handle, weak_nodes); - if (client) - { - auto group = get_group_by_client(client, weak_nodes); - if (!group) - { - // Group was not found, meaning the waitable is not valid... - // Remove it from the ready list and continue looking - // it = subscription_handles_.erase(it); - continue; - } - if (!group->can_be_taken_from().load()) - { - // Group is mutually exclusive and is being used, so skip it for now - // Leave it to be checked next time, but continue searching - // ++it; - continue; - } - any_exec.callback_group = group; - any_exec.client = client; - any_exec.node_base = get_node_by_group(group, weak_nodes); - // std::cout << "Using new priority client " << client->get_service_name() << std::endl; - } - } - break; - case TIMER: - { - std::shared_ptr timer_handle = std::static_pointer_cast(next_exec->handle); - auto timer = get_timer_by_handle(timer_handle, weak_nodes); - if (timer) - { - auto group = get_group_by_timer(timer, weak_nodes); - if (!group) - { - // Group was not found, meaning the waitable is not valid... - // Remove it from the ready list and continue looking - // it = subscription_handles_.erase(it); - continue; - } - if (skip_groups.find(group) != skip_groups.end()) - { - // group was used at some point during this loop. skip it so we can re-check it next time - continue; - } - if (!group->can_be_taken_from().load()) - { - // Group is mutually exclusive and is being used, so skip it for now - // Leave it to be checked next time, but continue searching - // ++it; - skip_groups.insert(group); - continue; - } - any_exec.callback_group = group; - any_exec.timer = timer; - any_exec.node_base = get_node_by_group(group, weak_nodes); - } - } - break; - case WAITABLE: - { - std::shared_ptr waitable_handle = std::static_pointer_cast(next_exec->waitable); - auto waitable = waitable_handle; - if (waitable) - { - auto group = get_group_by_waitable(waitable, weak_nodes); - if (!group) - { - // Group was not found, meaning the waitable is not valid... - // Remove it from the ready list and continue looking - // it = subscription_handles_.erase(it); - continue; - } - if (!group->can_be_taken_from().load()) - { - // Group is mutually exclusive and is being used, so skip it for now - // Leave it to be checked next time, but continue searching - // ++it; - continue; - } - any_exec.callback_group = group; - any_exec.waitable = waitable; - any_exec.node_base = get_node_by_group(group, weak_nodes); - // std::cout << "Using new priority waitable" << std::endl; - } - } - break; - default: - std::cout << "Unknown type from priority!!!" << std::endl; - continue; - // break; - } - if (next_exec->is_first_in_chain && next_exec->sched_type == DEADLINE) - { - // std::cout << "running first in chain deadline" << std::endl; - } - /* std::ostringstream oss; - oss << "{\"operation\":\"get_next_executable\""; - // output names and handles of all_executables_ in json array - oss << ",\"all_executables\":["; - for (auto exec : all_executables_) - { - // if (exec->can_be_run) - oss << "{\"name\":\"" << exec->name << "\", \"sched_type\":\"" << exec->sched_type << "\", \"can_be_run\":\"" << exec->can_be_run << "\"},"; - } - // remove trailing comma - oss.seekp(-1, oss.cur); - oss << "]}"; - log_entry(logger_, oss.str()); - oss.str(""); - oss.clear(); - oss << "{\"operation\": \"select_task\", \"task\": \"" << exec->name << "\"}"; - log_entry(logger_, oss.str()); */ - - // returning with an executable - // remove from all_executables_ map - all_executables_.erase(next_exec); - // std::stringstream oss; - // oss << "{\"operation\":\"get_next_executable\", \"result\":\"success\", \"type\":\"" << type << "\"}"; - // log_entry(logger_, oss.str()); - return; + // while (!all_executables_.empty()) + for (auto exec : all_executables_) { + next_exec = exec; + if (!next_exec->can_be_run) { + continue; } - // nothing found + ExecutableType type = next_exec->type; + switch (type) { + case SUBSCRIPTION: { + std::shared_ptr subs_handle = + std::static_pointer_cast(next_exec->handle); + auto subscription = get_subscription_by_handle(subs_handle, weak_nodes); + if (subscription) { + auto group = get_group_by_subscription(subscription, weak_nodes); + if (!group) { + // Group was not found, meaning the waitable is not valid... + // Remove it from the ready list and continue looking + // it = subscription_handles_.erase(it); + continue; + } + if (skip_groups.find(group) != skip_groups.end()) { + // group was used at some point during this loop. skip it so we can + // re-check it next time + continue; + } + + if (!group->can_be_taken_from().load()) { + // Group is mutually exclusive and is being used, so skip it for now + // Leave it to be checked next time, but continue searching + // ++it; + skip_groups.insert(group); + continue; + } + any_exec.callback_group = group; + any_exec.subscription = subscription; + any_exec.node_base = get_node_by_group(group, weak_nodes); + // std::cout << "Using new priority sub " << + // subscription->get_topic_name() << std::endl; + } + } break; + case SERVICE: { + std::shared_ptr service_handle = + std::static_pointer_cast(next_exec->handle); + auto service = get_service_by_handle(service_handle, weak_nodes); + if (service) { + auto group = get_group_by_service(service, weak_nodes); + if (!group) { + // Group was not found, meaning the waitable is not valid... + // Remove it from the ready list and continue looking + // it = subscription_handles_.erase(it); + continue; + } + if (!group->can_be_taken_from().load()) { + // Group is mutually exclusive and is being used, so skip it for now + // Leave it to be checked next time, but continue searching + // ++it; + continue; + } + any_exec.callback_group = group; + any_exec.service = service; + any_exec.node_base = get_node_by_group(group, weak_nodes); + // std::cout << "Using new priority service " << + // service->get_service_name() << std::endl; + } + } break; + case CLIENT: { + std::shared_ptr client_handle = + std::static_pointer_cast(next_exec->handle); + auto client = get_client_by_handle(client_handle, weak_nodes); + if (client) { + auto group = get_group_by_client(client, weak_nodes); + if (!group) { + // Group was not found, meaning the waitable is not valid... + // Remove it from the ready list and continue looking + // it = subscription_handles_.erase(it); + continue; + } + if (!group->can_be_taken_from().load()) { + // Group is mutually exclusive and is being used, so skip it for now + // Leave it to be checked next time, but continue searching + // ++it; + continue; + } + any_exec.callback_group = group; + any_exec.client = client; + any_exec.node_base = get_node_by_group(group, weak_nodes); + // std::cout << "Using new priority client " << + // client->get_service_name() << std::endl; + } + } break; + case TIMER: { + std::shared_ptr timer_handle = + std::static_pointer_cast(next_exec->handle); + auto timer = get_timer_by_handle(timer_handle, weak_nodes); + if (timer) { + auto group = get_group_by_timer(timer, weak_nodes); + if (!group) { + // Group was not found, meaning the waitable is not valid... + // Remove it from the ready list and continue looking + // it = subscription_handles_.erase(it); + continue; + } + if (skip_groups.find(group) != skip_groups.end()) { + // group was used at some point during this loop. skip it so we can + // re-check it next time + continue; + } + if (!group->can_be_taken_from().load()) { + // Group is mutually exclusive and is being used, so skip it for now + // Leave it to be checked next time, but continue searching + // ++it; + skip_groups.insert(group); + continue; + } + any_exec.callback_group = group; + any_exec.timer = timer; + any_exec.node_base = get_node_by_group(group, weak_nodes); + } + } break; + case WAITABLE: { + std::shared_ptr waitable_handle = + std::static_pointer_cast(next_exec->waitable); + auto waitable = waitable_handle; + if (waitable) { + auto group = get_group_by_waitable(waitable, weak_nodes); + if (!group) { + // Group was not found, meaning the waitable is not valid... + // Remove it from the ready list and continue looking + // it = subscription_handles_.erase(it); + continue; + } + if (!group->can_be_taken_from().load()) { + // Group is mutually exclusive and is being used, so skip it for now + // Leave it to be checked next time, but continue searching + // ++it; + continue; + } + any_exec.callback_group = group; + any_exec.waitable = waitable; + any_exec.node_base = get_node_by_group(group, weak_nodes); + // std::cout << "Using new priority waitable" << std::endl; + } + } break; + default: + std::cout << "Unknown type from priority!!!" << std::endl; + continue; + // break; + } + if (next_exec->is_first_in_chain && next_exec->sched_type == DEADLINE) { + // std::cout << "running first in chain deadline" << std::endl; + } + /* std::ostringstream oss; + oss << "{\"operation\":\"get_next_executable\""; + // output names and handles of all_executables_ in json array + oss << ",\"all_executables\":["; + for (auto exec : all_executables_) + { + // if (exec->can_be_run) + oss << "{\"name\":\"" << exec->name << "\", \"sched_type\":\"" + << exec->sched_type << "\", \"can_be_run\":\"" << exec->can_be_run << + "\"},"; + } + // remove trailing comma + oss.seekp(-1, oss.cur); + oss << "]}"; + log_entry(logger_, oss.str()); + oss.str(""); + oss.clear(); + oss << "{\"operation\": \"select_task\", \"task\": \"" << exec->name + << "\"}"; log_entry(logger_, oss.str()); */ + + // returning with an executable + // remove from all_executables_ map + all_executables_.erase(next_exec); // std::stringstream oss; - // oss << "{\"operation\":\"get_next_executable\", \"result\":\"nothing_found\"}"; + // oss << "{\"operation\":\"get_next_executable\", \"result\":\"success\", + // \"type\":\"" << type << "\"}"; log_entry(logger_, oss.str()); + return; + } + // nothing found + // std::stringstream oss; + // oss << "{\"operation\":\"get_next_executable\", + // \"result\":\"nothing_found\"}"; log_entry(logger_, oss.str()); +} + +template <> +void PriorityMemoryStrategy<>::post_execute(rclcpp::AnyExecutable any_exec, + int thread_id) { + std::shared_ptr next_exec = nullptr; + if (any_exec.subscription) { + next_exec = + get_priority_settings(any_exec.subscription->get_subscription_handle()); + } else if (any_exec.service) { + next_exec = get_priority_settings(any_exec.service->get_service_handle()); + } else if (any_exec.client) { + next_exec = get_priority_settings(any_exec.client->get_client_handle()); + } else if (any_exec.timer) { + next_exec = get_priority_settings(any_exec.timer->get_timer_handle()); + } else if (any_exec.waitable) { + next_exec = get_priority_settings(any_exec.waitable); + } + if (next_exec == nullptr) { + RCLCPP_WARN(rclcpp::get_logger("rclcpp"), + "Could not find priority settings for handle"); + } + // callback has executed + // std::cout<< "running callback. first? " << next_exec->is_first_in_chain << + // " type " << next_exec->sched_type << std::endl; + if (next_exec->is_first_in_chain && next_exec->sched_type != DEADLINE) { + /* + timespec current_time; + clock_gettime(CLOCK_MONOTONIC_RAW, ¤t_time); + uint64_t millis = (current_time.tv_sec * (uint64_t)1000) + + (current_time.tv_nsec / 1000000); + + auto timer = next_exec->timer_handle; + int64_t time_until_next_call = timer->time_until_trigger().count() / + 1000000; + // std::cout << "end of chain. time until trigger: " << + std::to_string(time_until_next_call) << std::endl; + */ + } + if (next_exec->is_last_in_chain && next_exec->sched_type == DEADLINE) { + // did we make the deadline? + timespec current_time; + clock_gettime(CLOCK_MONOTONIC_RAW, ¤t_time); + uint64_t millis = + (current_time.tv_sec * 1000UL) + (current_time.tv_nsec / 1000000); + uint64_t this_deadline = next_exec->deadlines->front(); + // RCLCPP_INFO(rclcpp::get_logger("rclcpp"), "last, chain id: %d", + // next_exec->chain_id); for (auto deadline : *next_exec->deadlines) + // { + // RCLCPP_INFO(rclcpp::get_logger("rclcpp"), "deadline: %d", deadline); + // } + std::ostringstream oss; + // oss << "{\"operation\": \"deadline_check\", \"chain_id\": " << + // next_exec->chain_id << ", \"deadline\": " << + // next_exec->deadlines->front() << ", \"current_time\": " << millis << "}"; // log_entry(logger_, oss.str()); + + if (next_exec->timer_handle == nullptr) { + std::cout << "tried to use a chain without a timer handle!!!" + << std::endl; + } + auto timer = next_exec->timer_handle; + if (timer == nullptr) { + std::cout << "somehow, this timer handle didn't have an associated timer" + << std::endl; + } + // timespec current_time; + // clock_gettime(CLOCK_MONOTONIC_RAW, ¤t_time); + // uint64_t millis = (current_time.tv_sec * (uint64_t)1000) + + // (current_time.tv_nsec / 1000000); uint64_t this_deadline = + // next_exec->deadlines->front(); + uint64_t next_deadline = 0; + bool on_time; + // time_diff may be negative, to use a signed type to prevent overflow + int64_t time_diff = millis - this_deadline; + int periods_late; + // if time_diff is negative, we completed on time. add one period to the + // deadline + if (time_diff <= 0) { + periods_late = 0; + next_deadline = this_deadline + next_exec->period; + on_time = true; + } + // if time_diff is positive, we completed late. add one period for each + // period we were late + else { + periods_late = std::ceil(time_diff / (double)next_exec->period); + next_deadline = this_deadline + (periods_late)*next_exec->period; + on_time = false; + } + // std::chrono::nanoseconds time_until = + // next_exec->timer_handle->time_until_trigger(); next_deadline = millis + + // (time_until.count() / 1000000) + next_exec->period; the next deadline is + // the current time plus the period, skipping periods that have already + // passed + + // RCLCPP_INFO(rclcpp::get_logger("rclcpp"), "next deadline: %d", + // next_deadline); + oss << "{\"operation\": \"next_deadline\", \"chain_id\": " + << next_exec->chain_id << ", \"deadline\": " << next_deadline + << ", \"on_time\": " << on_time << ", \"time_diff\": " << time_diff + << ", \"periods_late\": " << periods_late; + if (thread_id != -1) { + oss << ", \"thread_id\": " << thread_id; + } + + oss << "}"; + // oss << "{\"operation\": \"next_deadline\", \"chain_id\": " << + // next_exec->chain_id << ", \"deadline\": " << + // next_exec->deadlines->front() << "}"; + log_entry(logger_, oss.str()); + if (time_diff < -next_exec->period) { + // the deadline was much later than it needed to be + } + next_exec->deadlines->push_back(next_deadline); + + // print chain id and deadline contents + // RCLCPP_INFO(rclcpp::get_logger("rclcpp"), "chain id: %d", + // next_exec->chain_id); for (auto deadline : *next_exec->deadlines) + // { + // RCLCPP_INFO(rclcpp::get_logger("rclcpp"), "deadline: %d", deadline); + // } + // std::cout << "deadline set" << std::endl; + + if (!next_exec->deadlines->empty()) + next_exec->deadlines->pop_front(); + } + if (next_exec->sched_type == CHAIN_AWARE_PRIORITY || + next_exec->sched_type == DEADLINE) { + // this is safe, since we popped it earlier + // get a mutable reference + // TODO: find a cleaner way to do this + std::shared_ptr mut_executable = + get_priority_settings(next_exec->handle); + // std::cout << "running chain aware cb" << std::endl; + mut_executable->increment_counter(); + } } template <> -void PriorityMemoryStrategy<>::post_execute(rclcpp::AnyExecutable any_exec, int thread_id) -{ +void PriorityMemoryStrategy<>::get_next_subscription( + rclcpp::AnyExecutable &any_exec, const WeakNodeList &weak_nodes) { - std::shared_ptr next_exec = nullptr; - if (any_exec.subscription) - { - next_exec = get_priority_settings(any_exec.subscription->get_subscription_handle()); - } - else if (any_exec.service) - { - next_exec = get_priority_settings(any_exec.service->get_service_handle()); - } - else if (any_exec.client) - { - next_exec = get_priority_settings(any_exec.client->get_client_handle()); - } - else if (any_exec.timer) - { - next_exec = get_priority_settings(any_exec.timer->get_timer_handle()); - } - else if (any_exec.waitable) - { - next_exec = get_priority_settings(any_exec.waitable); - } - if (next_exec == nullptr) - { - RCLCPP_WARN(rclcpp::get_logger("rclcpp"), "Could not find priority settings for handle"); - } - // callback has executed - // std::cout<< "running callback. first? " << next_exec->is_first_in_chain << " type " << next_exec->sched_type << std::endl; - if (next_exec->is_first_in_chain && next_exec->sched_type != DEADLINE) - { - /* - timespec current_time; - clock_gettime(CLOCK_MONOTONIC_RAW, ¤t_time); - uint64_t millis = (current_time.tv_sec * (uint64_t)1000) + (current_time.tv_nsec / 1000000); - - auto timer = next_exec->timer_handle; - int64_t time_until_next_call = timer->time_until_trigger().count() / 1000000; - // std::cout << "end of chain. time until trigger: " << std::to_string(time_until_next_call) << std::endl; - */ - } - if (next_exec->is_last_in_chain && next_exec->sched_type == DEADLINE) - { - // did we make the deadline? - timespec current_time; - clock_gettime(CLOCK_MONOTONIC_RAW, ¤t_time); - uint64_t millis = (current_time.tv_sec * 1000UL) + (current_time.tv_nsec / 1000000); - uint64_t this_deadline = next_exec->deadlines->front(); - // RCLCPP_INFO(rclcpp::get_logger("rclcpp"), "last, chain id: %d", next_exec->chain_id); - // for (auto deadline : *next_exec->deadlines) - // { - // RCLCPP_INFO(rclcpp::get_logger("rclcpp"), "deadline: %d", deadline); - // } - std::ostringstream oss; - // oss << "{\"operation\": \"deadline_check\", \"chain_id\": " << next_exec->chain_id << ", \"deadline\": " << next_exec->deadlines->front() << ", \"current_time\": " << millis << "}"; - // log_entry(logger_, oss.str()); - - if (next_exec->timer_handle == nullptr) - { - std::cout << "tried to use a chain without a timer handle!!!" << std::endl; - } - auto timer = next_exec->timer_handle; - if (timer == nullptr) - { - std::cout << "somehow, this timer handle didn't have an associated timer" << std::endl; - } - // timespec current_time; - // clock_gettime(CLOCK_MONOTONIC_RAW, ¤t_time); - // uint64_t millis = (current_time.tv_sec * (uint64_t)1000) + (current_time.tv_nsec / 1000000); - // uint64_t this_deadline = next_exec->deadlines->front(); - uint64_t next_deadline = 0; - bool on_time; - // time_diff may be negative, to use a signed type to prevent overflow - int64_t time_diff = millis - this_deadline; - int periods_late; - // if time_diff is negative, we completed on time. add one period to the deadline - if (time_diff <= 0) - { - periods_late = 0; - next_deadline = this_deadline + next_exec->period; - on_time = true; - } - // if time_diff is positive, we completed late. add one period for each period we were late - else - { - periods_late = std::ceil(time_diff / (double)next_exec->period); - next_deadline = this_deadline + (periods_late)*next_exec->period; - on_time = false; - } - // std::chrono::nanoseconds time_until = next_exec->timer_handle->time_until_trigger(); - // next_deadline = millis + (time_until.count() / 1000000) + next_exec->period; - // the next deadline is the current time plus the period, skipping periods that have already passed - - // RCLCPP_INFO(rclcpp::get_logger("rclcpp"), "next deadline: %d", next_deadline); - oss << "{\"operation\": \"next_deadline\", \"chain_id\": " << next_exec->chain_id << ", \"deadline\": " << next_deadline << ", \"on_time\": " << on_time << ", \"time_diff\": " << time_diff << ", \"periods_late\": " << periods_late; - if (thread_id != -1) - { - oss << ", \"thread_id\": " << thread_id; - } - - oss << "}"; - // oss << "{\"operation\": \"next_deadline\", \"chain_id\": " << next_exec->chain_id << ", \"deadline\": " << next_exec->deadlines->front() << "}"; - log_entry(logger_, oss.str()); - if (time_diff < -next_exec->period) - { - // the deadline was much later than it needed to be - } - next_exec->deadlines->push_back(next_deadline); - - // print chain id and deadline contents - // RCLCPP_INFO(rclcpp::get_logger("rclcpp"), "chain id: %d", next_exec->chain_id); - // for (auto deadline : *next_exec->deadlines) - // { - // RCLCPP_INFO(rclcpp::get_logger("rclcpp"), "deadline: %d", deadline); - // } - // std::cout << "deadline set" << std::endl; - - if (!next_exec->deadlines->empty()) - next_exec->deadlines->pop_front(); - } - if (next_exec->sched_type == CHAIN_AWARE_PRIORITY || next_exec->sched_type == DEADLINE) - { - // this is safe, since we popped it earlier - // get a mutable reference - // TODO: find a cleaner way to do this - std::shared_ptr mut_executable = get_priority_settings(next_exec->handle); - // std::cout << "running chain aware cb" << std::endl; - mut_executable->increment_counter(); - } -} - -template <> -void PriorityMemoryStrategy<>:: - get_next_subscription( - rclcpp::AnyExecutable &any_exec, - const WeakNodeList &weak_nodes) -{ - - auto it = subscription_handles_.begin(); - while (it != subscription_handles_.end()) - { - auto subscription = get_subscription_by_handle(*it, weak_nodes); - if (subscription) - { - // Find the group for this handle and see if it can be serviced - auto group = get_group_by_subscription(subscription, weak_nodes); - if (!group) - { - // Group was not found, meaning the subscription is not valid... - // Remove it from the ready list and continue looking - it = subscription_handles_.erase(it); - continue; - } - if (!group->can_be_taken_from().load()) - { - // Group is mutually exclusive and is being used, so skip it for now - // Leave it to be checked next time, but continue searching - ++it; - continue; - } - // Otherwise it is safe to set and return the any_exec - any_exec.subscription = subscription; - any_exec.callback_group = group; - any_exec.node_base = get_node_by_group(group, weak_nodes); - subscription_handles_.erase(it); - return; - } - // Else, the subscription is no longer valid, remove it and continue + auto it = subscription_handles_.begin(); + while (it != subscription_handles_.end()) { + auto subscription = get_subscription_by_handle(*it, weak_nodes); + if (subscription) { + // Find the group for this handle and see if it can be serviced + auto group = get_group_by_subscription(subscription, weak_nodes); + if (!group) { + // Group was not found, meaning the subscription is not valid... + // Remove it from the ready list and continue looking it = subscription_handles_.erase(it); + continue; + } + if (!group->can_be_taken_from().load()) { + // Group is mutually exclusive and is being used, so skip it for now + // Leave it to be checked next time, but continue searching + ++it; + continue; + } + // Otherwise it is safe to set and return the any_exec + any_exec.subscription = subscription; + any_exec.callback_group = group; + any_exec.node_base = get_node_by_group(group, weak_nodes); + subscription_handles_.erase(it); + return; } + // Else, the subscription is no longer valid, remove it and continue + it = subscription_handles_.erase(it); + } } template <> -void PriorityMemoryStrategy<>:: - get_next_service( - rclcpp::AnyExecutable &any_exec, - const WeakNodeList &weak_nodes) -{ - auto it = service_handles_.begin(); - while (it != service_handles_.end()) - { - auto service = get_service_by_handle(*it, weak_nodes); - if (service) - { - // Find the group for this handle and see if it can be serviced - auto group = get_group_by_service(service, weak_nodes); - if (!group) - { - // Group was not found, meaning the service is not valid... - // Remove it from the ready list and continue looking - it = service_handles_.erase(it); - continue; - } - if (!group->can_be_taken_from().load()) - { - // Group is mutually exclusive and is being used, so skip it for now - // Leave it to be checked next time, but continue searching - ++it; - continue; - } - // Otherwise it is safe to set and return the any_exec - any_exec.service = service; - any_exec.callback_group = group; - any_exec.node_base = get_node_by_group(group, weak_nodes); - service_handles_.erase(it); - return; - } - // Else, the service is no longer valid, remove it and continue +void PriorityMemoryStrategy<>::get_next_service( + rclcpp::AnyExecutable &any_exec, const WeakNodeList &weak_nodes) { + auto it = service_handles_.begin(); + while (it != service_handles_.end()) { + auto service = get_service_by_handle(*it, weak_nodes); + if (service) { + // Find the group for this handle and see if it can be serviced + auto group = get_group_by_service(service, weak_nodes); + if (!group) { + // Group was not found, meaning the service is not valid... + // Remove it from the ready list and continue looking it = service_handles_.erase(it); + continue; + } + if (!group->can_be_taken_from().load()) { + // Group is mutually exclusive and is being used, so skip it for now + // Leave it to be checked next time, but continue searching + ++it; + continue; + } + // Otherwise it is safe to set and return the any_exec + any_exec.service = service; + any_exec.callback_group = group; + any_exec.node_base = get_node_by_group(group, weak_nodes); + service_handles_.erase(it); + return; } + // Else, the service is no longer valid, remove it and continue + it = service_handles_.erase(it); + } } template <> -void PriorityMemoryStrategy<>:: - get_next_client( - rclcpp::AnyExecutable &any_exec, - const WeakNodeList &weak_nodes) -{ +void PriorityMemoryStrategy<>::get_next_client(rclcpp::AnyExecutable &any_exec, + const WeakNodeList &weak_nodes) { - auto it = client_handles_.begin(); - while (it != client_handles_.end()) - { - auto client = get_client_by_handle(*it, weak_nodes); - if (client) - { - // Find the group for this handle and see if it can be serviced - auto group = get_group_by_client(client, weak_nodes); - if (!group) - { - // Group was not found, meaning the service is not valid... - // Remove it from the ready list and continue looking - it = client_handles_.erase(it); - continue; - } - if (!group->can_be_taken_from().load()) - { - // Group is mutually exclusive and is being used, so skip it for now - // Leave it to be checked next time, but continue searching - ++it; - continue; - } - // Otherwise it is safe to set and return the any_exec - any_exec.client = client; - any_exec.callback_group = group; - any_exec.node_base = get_node_by_group(group, weak_nodes); - client_handles_.erase(it); - return; - } - // Else, the service is no longer valid, remove it and continue + auto it = client_handles_.begin(); + while (it != client_handles_.end()) { + auto client = get_client_by_handle(*it, weak_nodes); + if (client) { + // Find the group for this handle and see if it can be serviced + auto group = get_group_by_client(client, weak_nodes); + if (!group) { + // Group was not found, meaning the service is not valid... + // Remove it from the ready list and continue looking it = client_handles_.erase(it); + continue; + } + if (!group->can_be_taken_from().load()) { + // Group is mutually exclusive and is being used, so skip it for now + // Leave it to be checked next time, but continue searching + ++it; + continue; + } + // Otherwise it is safe to set and return the any_exec + any_exec.client = client; + any_exec.callback_group = group; + any_exec.node_base = get_node_by_group(group, weak_nodes); + client_handles_.erase(it); + return; } + // Else, the service is no longer valid, remove it and continue + it = client_handles_.erase(it); + } } template <> -void PriorityMemoryStrategy<>:: - get_next_timer( - rclcpp::AnyExecutable &any_exec, - const WeakNodeList &weak_nodes) -{ +void PriorityMemoryStrategy<>::get_next_timer(rclcpp::AnyExecutable &any_exec, + const WeakNodeList &weak_nodes) { - auto it = timer_handles_.begin(); - while (it != timer_handles_.end()) - { - auto timer = get_timer_by_handle(*it, weak_nodes); - if (timer) - { - // Find the group for this handle and see if it can be serviced - auto group = get_group_by_timer(timer, weak_nodes); - if (!group) - { - // Group was not found, meaning the timer is not valid... - // Remove it from the ready list and continue looking - it = timer_handles_.erase(it); - continue; - } - if (!group->can_be_taken_from().load()) - { - // Group is mutually exclusive and is being used, so skip it for now - // Leave it to be checked next time, but continue searching - ++it; - continue; - } - // Otherwise it is safe to set and return the any_exec - any_exec.timer = timer; - any_exec.callback_group = group; - any_exec.node_base = get_node_by_group(group, weak_nodes); - timer_handles_.erase(it); - return; - } - // Else, the service is no longer valid, remove it and continue + auto it = timer_handles_.begin(); + while (it != timer_handles_.end()) { + auto timer = get_timer_by_handle(*it, weak_nodes); + if (timer) { + // Find the group for this handle and see if it can be serviced + auto group = get_group_by_timer(timer, weak_nodes); + if (!group) { + // Group was not found, meaning the timer is not valid... + // Remove it from the ready list and continue looking it = timer_handles_.erase(it); + continue; + } + if (!group->can_be_taken_from().load()) { + // Group is mutually exclusive and is being used, so skip it for now + // Leave it to be checked next time, but continue searching + ++it; + continue; + } + // Otherwise it is safe to set and return the any_exec + any_exec.timer = timer; + any_exec.callback_group = group; + any_exec.node_base = get_node_by_group(group, weak_nodes); + timer_handles_.erase(it); + return; } + // Else, the service is no longer valid, remove it and continue + it = timer_handles_.erase(it); + } } template <> -void PriorityMemoryStrategy<>:: - get_next_waitable( - rclcpp::AnyExecutable &any_exec, - const WeakNodeList &weak_nodes) -{ - auto it = waitable_handles_.begin(); - while (it != waitable_handles_.end()) - { - auto waitable = *it; - if (waitable) - { - // Find the group for this handle and see if it can be serviced - auto group = get_group_by_waitable(waitable, weak_nodes); - if (!group) - { - // Group was not found, meaning the waitable is not valid... - // Remove it from the ready list and continue looking - it = waitable_handles_.erase(it); - continue; - } - if (!group->can_be_taken_from().load()) - { - // Group is mutually exclusive and is being used, so skip it for now - // Leave it to be checked next time, but continue searching - ++it; - continue; - } - // Otherwise it is safe to set and return the any_exec - any_exec.waitable = waitable; - any_exec.callback_group = group; - any_exec.node_base = get_node_by_group(group, weak_nodes); - waitable_handles_.erase(it); - return; - } - // Else, the waitable is no longer valid, remove it and continue +void PriorityMemoryStrategy<>::get_next_waitable( + rclcpp::AnyExecutable &any_exec, const WeakNodeList &weak_nodes) { + auto it = waitable_handles_.begin(); + while (it != waitable_handles_.end()) { + auto waitable = *it; + if (waitable) { + // Find the group for this handle and see if it can be serviced + auto group = get_group_by_waitable(waitable, weak_nodes); + if (!group) { + // Group was not found, meaning the waitable is not valid... + // Remove it from the ready list and continue looking it = waitable_handles_.erase(it); + continue; + } + if (!group->can_be_taken_from().load()) { + // Group is mutually exclusive and is being used, so skip it for now + // Leave it to be checked next time, but continue searching + ++it; + continue; + } + // Otherwise it is safe to set and return the any_exec + any_exec.waitable = waitable; + any_exec.callback_group = group; + any_exec.node_base = get_node_by_group(group, weak_nodes); + waitable_handles_.erase(it); + return; } + // Else, the waitable is no longer valid, remove it and continue + it = waitable_handles_.erase(it); + } } diff --git a/src/priority_executor/src/usage_example.cpp b/src/priority_executor/src/usage_example.cpp index 8750cdc..e6a8d3d 100755 --- a/src/priority_executor/src/usage_example.cpp +++ b/src/priority_executor/src/usage_example.cpp @@ -1,79 +1,98 @@ -#include "rclcpp/rclcpp.hpp" #include "priority_executor/priority_executor.hpp" #include "priority_executor/priority_memory_strategy.hpp" -#include -#include -#include +#include "rclcpp/rclcpp.hpp" #include "std_msgs/msg/string.hpp" +#include +#include +#include // re-create the classic talker-listener example with two listeners class Talker : public rclcpp::Node { public: - Talker() : Node("talker") - { - // Create a publisher on the "chatter" topic with 10 msg queue size. - pub_ = this->create_publisher("chatter", 10); - // Create a timer of period 1s that calls our callback member function. - timer_ = this->create_wall_timer(std::chrono::seconds(1), std::bind(&Talker::timer_callback, this)); - } - // the timer must be public - rclcpp::TimerBase::SharedPtr timer_; + Talker() : Node("talker") + { + // Create a publisher on the "chatter" topic with 10 msg queue size. + pub_ = this->create_publisher("chatter", 10); + // Create a timer of period 1s that calls our callback member function. + timer_ = this->create_wall_timer(std::chrono::seconds(1), + std::bind(&Talker::timer_callback, this)); + } + // the timer must be public + rclcpp::TimerBase::SharedPtr timer_; private: - void timer_callback() - { - // Create a message and publish it 10 times. - std_msgs::msg::String msg; - msg.data = "Hello World!"; - for (int i = 0; i < 10; ++i) - { - RCLCPP_INFO(this->get_logger(), "Publishing: '%s'", msg.data.c_str()); - pub_->publish(msg); - } - } - rclcpp::Publisher::SharedPtr pub_; + void timer_callback() + { + std_msgs::msg::String msg; + msg.data = "Hello World!"; + RCLCPP_INFO(this->get_logger(), "Publishing: '%s'", msg.data.c_str()); + pub_->publish(msg); + } + rclcpp::Publisher::SharedPtr pub_; }; class Listener : public rclcpp::Node { public: - Listener(std::string name) : Node(name) - { - // Create a subscription on the "chatter" topic with the default callback method. - sub_ = this->create_subscription("chatter", 10, std::bind(&Listener::callback, this, std::placeholders::_1)); - } - // the publisher must be public - rclcpp::Subscription::SharedPtr sub_; + Listener(std::string name) : Node(name) + { + // Create a subscription on the "chatter" topic with the default callback + // method. + sub_ = this->create_subscription( + "chatter", 10, + std::bind(&Listener::callback, this, std::placeholders::_1)); + } + // the publisher must be public + rclcpp::Subscription::SharedPtr sub_; private: - void callback(const std_msgs::msg::String::SharedPtr msg) - { - RCLCPP_INFO(this->get_logger(), "I heard: '%s'", msg->data.c_str()); - } + void callback(const std_msgs::msg::String::SharedPtr msg) + { + RCLCPP_INFO(this->get_logger(), "I heard: '%s'", msg->data.c_str()); + } }; int main(int argc, char **argv) { - rclcpp::init(argc, argv); - auto talker = std::make_shared(); - auto listener1 = std::make_shared("listener1"); - auto listener2 = std::make_shared("listener2"); - rclcpp::ExecutorOptions options; + rclcpp::init(argc, argv); + auto talker = std::make_shared(); + auto listener1 = std::make_shared("listener1"); + auto listener2 = std::make_shared("listener2"); + rclcpp::ExecutorOptions options; - auto strategy = std::make_shared>(); - options.memory_strategy = strategy; - auto executor = new timed_executor::TimedExecutor(options); - // replace the above line with the following line to use the default executor - // which will intermix the execution of listener1 and listener2 - // auto executor = std::make_shared(options); + auto strategy = std::make_shared>(); + options.memory_strategy = strategy; + auto executor = new timed_executor::TimedExecutor(options); + // replace the above line with the following line to use the default executor + // which will intermix the execution of listener1 and listener2 + // auto executor = + // std::make_shared(options); - strategy->set_executable_deadline(talker->timer_->get_timer_handle(), 100, TIMER); - strategy->set_executable_deadline(listener1->sub_->get_subscription_handle(), 100, SUBSCRIPTION); - strategy->set_executable_deadline(listener2->sub_->get_subscription_handle(), 50, SUBSCRIPTION); - executor->add_node(talker); - executor->add_node(listener1); - executor->add_node(listener2); + strategy->set_executable_deadline(talker->timer_->get_timer_handle(), 1000, + TIMER); + strategy->get_priority_settings(talker->timer_->get_timer_handle()) + ->timer_handle = talker->timer_; + strategy->set_first_in_chain(talker->timer_->get_timer_handle()); + strategy->get_priority_settings(talker->timer_->get_timer_handle()); + strategy->set_executable_deadline(listener1->sub_->get_subscription_handle(), + 1000, SUBSCRIPTION); + strategy->set_executable_deadline(listener2->sub_->get_subscription_handle(), + 1000, SUBSCRIPTION); + strategy->set_last_in_chain(listener2->sub_->get_subscription_handle()); + executor->add_node(talker); + executor->add_node(listener1); + executor->add_node(listener2); - executor->spin(); + std::cout << *strategy->get_priority_settings( + talker->timer_->get_timer_handle()) + << std::endl; + std::cout << *strategy->get_priority_settings( + listener1->sub_->get_subscription_handle()) + << std::endl; + std::cout << *strategy->get_priority_settings( + listener2->sub_->get_subscription_handle()) + << std::endl; + + executor->spin(); } diff --git a/src/simple_timer/CMakeLists.txt b/src/simple_timer/CMakeLists.txt new file mode 100644 index 0000000..f0630d9 --- /dev/null +++ b/src/simple_timer/CMakeLists.txt @@ -0,0 +1,61 @@ +cmake_minimum_required(VERSION 3.5) +project(simple_timer) + +# Default to C99 +if(NOT CMAKE_C_STANDARD) + set(CMAKE_C_STANDARD 99) +endif() + +# Default to C++14 +if(NOT CMAKE_CXX_STANDARD) + set(CMAKE_CXX_STANDARD 14) +endif() + +if(CMAKE_COMPILER_IS_GNUCXX OR CMAKE_CXX_COMPILER_ID MATCHES "Clang") + add_compile_options(-Wall -Wextra -Wpedantic) +endif() + +# find dependencies +find_package(ament_cmake REQUIRED) +# uncomment the following section in order to fill in +# further dependencies manually. +# find_package( REQUIRED) + +add_library(simple_timer src/cycle_timer.cpp src/period_timer.cpp) +target_include_directories(simple_timer PUBLIC + $ + $ +) +add_library(rt-sched src/rt-sched.cpp) +target_include_directories(rt-sched PUBLIC + $ + $ +) + +if(BUILD_TESTING) + find_package(ament_lint_auto REQUIRED) + # the following line skips the linter which checks for copyrights + # uncomment the line when a copyright and license is not present in all source files + #set(ament_cmake_copyright_FOUND TRUE) + # the following line skips cpplint (only works in a git repo) + # uncomment the line when this package is not in a git repo + #set(ament_cmake_cpplint_FOUND TRUE) + ament_lint_auto_find_test_dependencies() +endif() + +install( + DIRECTORY include/ + DESTINATION include +) + +install( + TARGETS simple_timer rt-sched + LIBRARY DESTINATION lib + ARCHIVE DESTINATION lib + RUNTIME DESTINATION bin +) + +ament_export_include_directories(include) +ament_export_libraries(simple_timer) +ament_export_libraries(rt-sched) +ament_package() diff --git a/src/simple_timer/include/simple_timer/cycle_timer.hpp b/src/simple_timer/include/simple_timer/cycle_timer.hpp new file mode 100644 index 0000000..556c8d4 --- /dev/null +++ b/src/simple_timer/include/simple_timer/cycle_timer.hpp @@ -0,0 +1,23 @@ +#ifndef __CYCLE_TIMER__ +#define __CYCLE_TIMER__ + +#include +#include "simple_timer/rt-sched.hpp" +namespace simple_timer +{ + class CycleTimer + { + public: + CycleTimer(long start_delay=0); + void tick() ; + const u64 start_delay_time; + u64 start_time = 0; + u64 last_cycle_time = 0; + unsigned long max_diff = 0; + unsigned long min_diff = 0; + unsigned long last_diff = 0; + bool recording = false; + }; +} // namespace simple_timer + +#endif \ No newline at end of file diff --git a/src/simple_timer/include/simple_timer/period_timer.hpp b/src/simple_timer/include/simple_timer/period_timer.hpp new file mode 100644 index 0000000..bdc909c --- /dev/null +++ b/src/simple_timer/include/simple_timer/period_timer.hpp @@ -0,0 +1,25 @@ +#ifndef __PERIOD_TIMER__ +#define __PERIOD_TIMER__ + +#include +#include "simple_timer/rt-sched.hpp" +namespace simple_timer +{ + class PeriodTimer + { + public: + PeriodTimer(long start_delay = 0); + void start(); + void stop(); + const u64 start_delay_time; + u64 start_time = 0; + + u64 last_period_time = 0; + unsigned long max_period = 0; + unsigned long min_period = 0; + unsigned long last_period = 0; + bool recording = false; + }; +} // namespace simple_timer + +#endif \ No newline at end of file diff --git a/src/simple_timer/include/simple_timer/rt-sched.hpp b/src/simple_timer/include/simple_timer/rt-sched.hpp new file mode 100644 index 0000000..4f035b2 --- /dev/null +++ b/src/simple_timer/include/simple_timer/rt-sched.hpp @@ -0,0 +1,98 @@ +// SPDX-License-Identifier: GPL-2.0-or-later +/* + * rt-sched.h - sched_setattr() and sched_getattr() API + * (C) Dario Faggioli , 2009, 2010 + * Copyright (C) 2014 BMW Car IT GmbH, Daniel Wagner +#include +#include +#include +#include + +#ifndef SCHED_DEADLINE +#define SCHED_DEADLINE 6 +#endif + +#ifdef __x86_64__ +#define __NR_sched_setattr 314 +#define __NR_sched_getattr 315 +#endif + +#ifdef __i386__ +#define __NR_sched_setattr 351 +#define __NR_sched_getattr 352 +#endif + +#ifdef __arm__ +#ifndef __NR_sched_setattr +#define __NR_sched_setattr 380 +#endif +#ifndef __NR_sched_getattr +#define __NR_sched_getattr 381 +#endif +#endif + +#ifdef __tilegx__ +#define __NR_sched_setattr 274 +#define __NR_sched_getattr 275 +#endif + +typedef unsigned long long u64; +#define NS_TO_MS 1000000 +struct sched_attr { + uint32_t size; + uint32_t sched_policy; + uint64_t sched_flags; + + /* SCHED_NORMAL, SCHED_BATCH */ + int32_t sched_nice; + + /* SCHED_FIFO, SCHED_RR */ + uint32_t sched_priority; + + /* SCHED_DEADLINE */ + uint64_t sched_runtime; + uint64_t sched_deadline; + uint64_t sched_period; +}; + +int sched_setattr(pid_t pid, + const struct sched_attr *attr, + unsigned int flags); + +int sched_getattr(pid_t pid, + struct sched_attr *attr, + unsigned int size, + unsigned int flags); + +u64 get_time_us(void); + +typedef struct node_time_logger +{ + std::shared_ptr>> recorded_times; +} node_time_logger; + +void log_entry(node_time_logger logger, std::string text); +node_time_logger create_logger(); + +inline u64 get_time_us(void) +{ + struct timespec ts; + unsigned long long time; + + clock_gettime(CLOCK_MONOTONIC_RAW, &ts); + time = ts.tv_sec * 1000000; + time += ts.tv_nsec / 1000; + + return time; +} + +#endif /* __RT_SCHED_H__ */ diff --git a/src/simple_timer/package.xml b/src/simple_timer/package.xml new file mode 100644 index 0000000..f009c67 --- /dev/null +++ b/src/simple_timer/package.xml @@ -0,0 +1,18 @@ + + + + simple_timer + 0.0.0 + TODO: Package description + nvidia + TODO: License declaration + + ament_cmake + + ament_lint_auto + ament_lint_common + + + ament_cmake + + diff --git a/src/simple_timer/src/cycle_timer.cpp b/src/simple_timer/src/cycle_timer.cpp new file mode 100644 index 0000000..143f13b --- /dev/null +++ b/src/simple_timer/src/cycle_timer.cpp @@ -0,0 +1,43 @@ +#include "simple_timer/cycle_timer.hpp" + +namespace simple_timer +{ + CycleTimer::CycleTimer(long start_delay) : start_delay_time(start_delay * 1000) + { + } + + void CycleTimer::tick() + { + u64 current_wall_time = get_time_us(); + u64 time_diff = 0; + + if (!recording) + { + + if (start_time == 0) + { + start_time = current_wall_time; + } + else if (current_wall_time - start_time > start_delay_time) + { + recording = true; + last_cycle_time = current_wall_time; + start_time = current_wall_time; + } + } + else + { + time_diff = current_wall_time - last_cycle_time; + if (time_diff < min_diff || min_diff == 0) + { + min_diff = time_diff; + } + if (time_diff > max_diff || max_diff == 0) + { + max_diff = time_diff; + } + last_cycle_time = current_wall_time; + last_diff = time_diff; + } + } +} // namespace simple_timer diff --git a/src/simple_timer/src/period_timer.cpp b/src/simple_timer/src/period_timer.cpp new file mode 100644 index 0000000..9c9a9ce --- /dev/null +++ b/src/simple_timer/src/period_timer.cpp @@ -0,0 +1,56 @@ + +#include "simple_timer/period_timer.hpp" + +namespace simple_timer +{ + PeriodTimer::PeriodTimer(long start_delay) : start_delay_time(start_delay * 1000) + { + } + + void PeriodTimer::start() + { + u64 current_wall_time = get_time_us(); + + if (!recording) + { + + if (start_time == 0) + { + start_time = current_wall_time; + } + else if (current_wall_time - start_time > start_delay_time) + { + recording = true; + start_time = current_wall_time; + last_period_time = current_wall_time; + } + } + else + { + last_period_time = current_wall_time; + } + } + void PeriodTimer::stop() + { + u64 current_wall_time = get_time_us(); + u64 time_diff = 0; + + if (!recording) + { + return; + } + else + { + time_diff = current_wall_time - last_period_time; + if (time_diff < min_period || min_period == 0) + { + min_period = time_diff; + } + if (time_diff > max_period || max_period == 0) + { + max_period = time_diff; + } + last_period = time_diff; + } + } +} // namespace simple_timer diff --git a/src/simple_timer/src/rt-sched.cpp b/src/simple_timer/src/rt-sched.cpp new file mode 100644 index 0000000..000c370 --- /dev/null +++ b/src/simple_timer/src/rt-sched.cpp @@ -0,0 +1,50 @@ +// SPDX-License-Identifier: GPL-2.0-or-later +/* + * rt-sched.h - sched_setattr() and sched_getattr() API + * + * (C) Dario Faggioli , 2009, 2010 + * Copyright (C) 2014 BMW Car IT GmbH, Daniel Wagner +#include +#include +#include + +#include "simple_timer/rt-sched.hpp" + +int sched_setattr(pid_t pid, + const struct sched_attr *attr, + unsigned int flags) +{ + return syscall(__NR_sched_setattr, pid, attr, flags); +} + +int sched_getattr(pid_t pid, + struct sched_attr *attr, + unsigned int size, + unsigned int flags) +{ + return syscall(__NR_sched_getattr, pid, attr, size, flags); +} + +void log_entry(node_time_logger logger, std::string text) +{ + if (logger.recorded_times != nullptr) + { + logger.recorded_times->push_back(std::make_pair(text, get_time_us() / 1000)); + // std::cout<>>(); + return logger; +} \ No newline at end of file