From a151eeee02d7fc8cc34f9d8ac8d8d412632ac817 Mon Sep 17 00:00:00 2001 From: Kurt Wilson Date: Sat, 20 May 2023 15:30:26 -0400 Subject: [PATCH] add multithreaded executor. various improvements --- src/priority_executor/CMakeLists.txt | 15 +- .../multithread_priority_executor.hpp | 34 +++ .../priority_executor/priority_executor.hpp | 52 +---- .../priority_memory_strategy.hpp | 67 +++--- .../src/multithread_priority_executor.cpp | 113 ++++++++++ .../src/priority_executor.cpp | 14 +- .../src/priority_memory_strategy.cpp | 195 ++++++++++-------- 7 files changed, 323 insertions(+), 167 deletions(-) create mode 100644 src/priority_executor/include/priority_executor/multithread_priority_executor.hpp create mode 100644 src/priority_executor/src/multithread_priority_executor.cpp diff --git a/src/priority_executor/CMakeLists.txt b/src/priority_executor/CMakeLists.txt index 66827e9..8313ee4 100644 --- a/src/priority_executor/CMakeLists.txt +++ b/src/priority_executor/CMakeLists.txt @@ -38,6 +38,18 @@ ament_target_dependencies(priority_executor simple_timer ) +add_library(multithread_priority_executor src/multithread_priority_executor.cpp src/priority_memory_strategy.cpp) +target_include_directories(multithread_priority_executor PUBLIC + $ + $ +) +ament_target_dependencies(multithread_priority_executor + rmw + rclcpp + rcl + simple_timer +) + add_executable(usage_example src/usage_example.cpp) target_include_directories(usage_example PUBLIC $ @@ -60,7 +72,7 @@ install(TARGETS usage_example DESTINATION lib/${PROJECT_NAME} ) -install(TARGETS priority_executor +install(TARGETS priority_executor multithread_priority_executor ARCHIVE DESTINATION lib LIBRARY DESTINATION lib RUNTIME DESTINATION bin @@ -79,4 +91,5 @@ endif() ament_export_include_directories(include) ament_export_libraries(priority_executor) +ament_export_libraries(multithread_priority_executor) ament_package() diff --git a/src/priority_executor/include/priority_executor/multithread_priority_executor.hpp b/src/priority_executor/include/priority_executor/multithread_priority_executor.hpp new file mode 100644 index 0000000..d5e45e4 --- /dev/null +++ b/src/priority_executor/include/priority_executor/multithread_priority_executor.hpp @@ -0,0 +1,34 @@ +#ifndef RTIS_MULTITHREAD_EXECUTOR +#define RTIS_MULTITHREAD_EXECUTOR + +#include +#include "rclcpp/detail/mutex_two_priorities.hpp" +#include + +namespace timed_executor +{ + class MultithreadTimedExecutor : public TimedExecutor + { + public: + RCLCPP_PUBLIC + explicit MultithreadTimedExecutor( + const rclcpp::ExecutorOptions &options = rclcpp::ExecutorOptions(), std::string name = "unnamed executor", int number_of_threads = 2, std::chrono::nanoseconds next_exec_timeout = std::chrono::nanoseconds(-1)); + + RCLCPP_PUBLIC size_t get_number_of_threads(); + RCLCPP_PUBLIC void spin() override; + + protected: + RCLCPP_PUBLIC void run(size_t thread_number); + + private: + size_t number_of_threads_; + std::set scheduled_timers_; + static std::unordered_map> + wait_mutex_set_; + static std::mutex shared_wait_mutex_; + std::chrono::nanoseconds next_exec_timeout_ = std::chrono::microseconds(100); // 0.1 ms + }; +} + +#endif \ No newline at end of file diff --git a/src/priority_executor/include/priority_executor/priority_executor.hpp b/src/priority_executor/include/priority_executor/priority_executor.hpp index 3223af1..11a026f 100644 --- a/src/priority_executor/include/priority_executor/priority_executor.hpp +++ b/src/priority_executor/include/priority_executor/priority_executor.hpp @@ -63,21 +63,17 @@ namespace timed_executor RCLCPP_PUBLIC void spin() override; - unsigned long long get_max_runtime(void); std::string name; void set_use_priorities(bool use_prio); std::shared_ptr> prio_memory_strategy_ = nullptr; - private: - RCLCPP_DISABLE_COPY(TimedExecutor) - // TODO: remove these - unsigned long long maxRuntime = 0; - unsigned long long start_time = 0; - int recording = 0; - void execute_subscription(rclcpp::AnyExecutable subscription); + protected: bool get_next_executable(rclcpp::AnyExecutable &any_executable, std::chrono::nanoseconds timeout = std::chrono::nanoseconds(-1)); + + private: + RCLCPP_DISABLE_COPY(TimedExecutor) void wait_for_work(std::chrono::nanoseconds timeout); @@ -89,44 +85,4 @@ namespace timed_executor } // namespace timed_executor -static void -take_and_do_error_handling( - const char *action_description, - const char *topic_or_service_name, - std::function take_action, - std::function handle_action) -{ - bool taken = false; - try - { - taken = take_action(); - } - catch (const rclcpp::exceptions::RCLError &rcl_error) - { - RCLCPP_ERROR( - rclcpp::get_logger("rclcpp"), - "executor %s '%s' unexpectedly failed: %s", - action_description, - topic_or_service_name, - rcl_error.what()); - } - if (taken) - { - handle_action(); - } - else - { - // Message or Service was not taken for some reason. - // Note that this can be normal, if the underlying middleware needs to - // interrupt wait spuriously it is allowed. - // So in that case the executor cannot tell the difference in a - // spurious wake up and an entity actually having data until trying - // to take the data. - RCLCPP_DEBUG( - rclcpp::get_logger("rclcpp"), - "executor %s '%s' failed to take anything", - action_description, - topic_or_service_name); - } -} #endif // RCLCPP__EXECUTORS__SINGLE_THREADED_EXECUTOR_HPP_ diff --git a/src/priority_executor/include/priority_executor/priority_memory_strategy.hpp b/src/priority_executor/include/priority_executor/priority_memory_strategy.hpp index 12ee1c0..17a6b24 100644 --- a/src/priority_executor/include/priority_executor/priority_memory_strategy.hpp +++ b/src/priority_executor/include/priority_executor/priority_memory_strategy.hpp @@ -19,6 +19,7 @@ #include #include #include +#include #include "rcl/allocator.h" @@ -51,10 +52,10 @@ enum ExecutableType enum ExecutableScheduleType { - CHAIN_INDEPENDENT_PRIORITY, // not used here + DEADLINE = 0, CHAIN_AWARE_PRIORITY, - DEADLINE, - DEFAULT, // not used here + CHAIN_INDEPENDENT_PRIORITY, // not used here + DEFAULT, // not used here }; class PriorityExecutable @@ -62,7 +63,7 @@ class PriorityExecutable public: std::shared_ptr handle; ExecutableType type; - bool can_be_run = true; + bool can_be_run = false; std::shared_ptr waitable; ExecutableScheduleType sched_type; @@ -89,6 +90,11 @@ public: PriorityExecutable(); void increment_counter(); + + bool operator==(const PriorityExecutable &other) const; + + static size_t num_executables; + int executable_id = 0; }; class PriorityExecutableComparator @@ -138,7 +144,10 @@ public: rclcpp::AnyExecutable &any_exec, const WeakNodeList &weak_nodes); - void post_execute(rclcpp::AnyExecutable any_exec); + /** + * thread-id is used for logging. if -1, then don't log the thread id + */ + void post_execute(rclcpp::AnyExecutable any_exec, int thread_id = -1); void get_next_subscription( @@ -161,12 +170,12 @@ public: void get_next_waitable(rclcpp::AnyExecutable &any_exec, const WeakNodeList &weak_nodes) override; - PriorityExecutable *get_priority_settings(std::shared_ptr executable) + std::shared_ptr get_priority_settings(std::shared_ptr executable) { auto search = priority_map.find(executable); if (search != priority_map.end()) { - return &(search->second); + return search->second; } else { @@ -248,23 +257,20 @@ public: void set_executable_priority(std::shared_ptr handle, int priority, ExecutableType t) { // TODO: any sanity checks should go here - // priority_map.insert(executable, priority); - priority_map[handle] = PriorityExecutable(handle, priority, t); + priority_map[handle] = std::make_shared(handle, priority, t); } void set_executable_priority(std::shared_ptr handle, int priority, ExecutableType t, ExecutableScheduleType sc, int chain_index) { // TODO: any sanity checks should go here - // priority_map.insert(executable, priority); - priority_map[handle] = PriorityExecutable(handle, priority, t, sc); - priority_map[handle].chain_id = chain_index; + priority_map[handle] = std::make_shared(handle, priority, t, sc); + priority_map[handle]->chain_id = chain_index; } void set_executable_deadline(std::shared_ptr handle, int period, ExecutableType t, int chain_id = 0) { // TODO: any sanity checks should go here - // priority_map.insert(executable, priority); - priority_map[handle] = PriorityExecutable(handle, period, t, DEADLINE); - priority_map[handle].chain_id = chain_id; + priority_map[handle] = std::make_shared(handle, period, t, DEADLINE); + priority_map[handle]->chain_id = chain_id; } int get_priority(std::shared_ptr executable) @@ -272,7 +278,7 @@ public: auto search = priority_map.find(executable); if (search != priority_map.end()) { - return search->second.priority; + return search->second->priority; } else { @@ -283,30 +289,29 @@ public: void set_first_in_chain(std::shared_ptr exec_handle) { - PriorityExecutable *settings = get_priority_settings(exec_handle); - settings->is_first_in_chain = true; + get_priority_settings(exec_handle)->is_first_in_chain = true; } void set_last_in_chain(std::shared_ptr exec_handle) { - PriorityExecutable *settings = get_priority_settings(exec_handle); - settings->is_last_in_chain = true; + get_priority_settings(exec_handle)->is_last_in_chain = true; } void assign_deadlines_queue(std::shared_ptr exec_handle, std::deque *deadlines) { - PriorityExecutable *settings = get_priority_settings(exec_handle); - settings->deadlines = deadlines; + get_priority_settings(exec_handle)->deadlines = deadlines; } private: - PriorityExecutable *get_and_reset_priority(std::shared_ptr executable, ExecutableType t) + std::shared_ptr get_and_reset_priority(std::shared_ptr executable, ExecutableType t) { - PriorityExecutable *p = get_priority_settings(executable); + // PriorityExecutable *p = get_priority_settings(executable); + std::shared_ptr p = get_priority_settings(executable); if (p == nullptr) { - priority_map[executable] = PriorityExecutable(executable, 0, t); - p = &(priority_map[executable]); + // priority_map[executable] = PriorityExecutable(executable, 0, t); + priority_map[executable] = std::make_shared(executable, 0, t); + p = priority_map[executable]; } // p->can_be_run = true; return p; @@ -329,10 +334,16 @@ private: // TODO: evaluate using node/subscription namespaced strings as keys // holds *all* handle->priority mappings - std::unordered_map, PriorityExecutable> priority_map; + std::map, std::shared_ptr> priority_map; // hold *only valid* executable+priorities - std::priority_queue, PriorityExecutableComparator> all_executables_; + // std::priority_queue, PriorityExecutableComparator> all_executables_; + std::set all_executables_; + + // priority queue doesn't allow iteration. fortunately, std::map is sorted by key, so we can replace the priority queue with a map + // the key will be the priority. the value doesn't matter. + // std::map all_executables_ = std::map(PriorityExecutableComparator()); + void add_executable_to_queue(std::shared_ptr p); }; #endif // RCLCPP__STRATEGIES__ALLOCATOR_MEMORY_STRATEGY_HPP_ diff --git a/src/priority_executor/src/multithread_priority_executor.cpp b/src/priority_executor/src/multithread_priority_executor.cpp new file mode 100644 index 0000000..93c9b85 --- /dev/null +++ b/src/priority_executor/src/multithread_priority_executor.cpp @@ -0,0 +1,113 @@ +#include "priority_executor/multithread_priority_executor.hpp" +namespace timed_executor +{ + std::unordered_map> + MultithreadTimedExecutor::wait_mutex_set_; + std::mutex MultithreadTimedExecutor::shared_wait_mutex_; + MultithreadTimedExecutor::MultithreadTimedExecutor( + const rclcpp::ExecutorOptions &options, + std::string name, + int number_of_threads, std::chrono::nanoseconds next_exec_timeout) + : TimedExecutor(options, name) + { + std::lock_guard wait_lock(MultithreadTimedExecutor::shared_wait_mutex_); + wait_mutex_set_[this] = std::make_shared(); + number_of_threads_ = number_of_threads; + next_exec_timeout_ = next_exec_timeout; + } + + size_t MultithreadTimedExecutor::get_number_of_threads() + { + return number_of_threads_; + } + + void MultithreadTimedExecutor::spin() + { + if (spinning.exchange(true)) + { + throw std::runtime_error("spin() called while already spinning"); + } + + std::vector threads; + size_t thread_id = 0; + { + auto wait_mutex = MultithreadTimedExecutor::wait_mutex_set_[this]; + auto low_priority_wait_mutex = wait_mutex->get_low_priority_lockable(); + std::lock_guard wait_lock(low_priority_wait_mutex); + for (; thread_id < number_of_threads_ - 1; ++thread_id) + { + auto func = std::bind(&MultithreadTimedExecutor::run, this, thread_id); + threads.emplace_back(func); + } + } + run(thread_id); + for (auto &thread : threads) + { + thread.join(); + } + } + + void MultithreadTimedExecutor::run(size_t thread_number) + { + // set affinity + cpu_set_t cpuset; + CPU_ZERO(&cpuset); + CPU_SET(thread_number, &cpuset); + int rc = pthread_setaffinity_np(pthread_self(), sizeof(cpu_set_t), &cpuset); + if (rc != 0) + { + std::cout << "Error calling pthread_setaffinity_np: " << rc << "\n"; + } + + while (rclcpp::ok(this->context_) && spinning.load()) + { + rclcpp::AnyExecutable any_executable; + { + auto wait_mutex = MultithreadTimedExecutor::wait_mutex_set_[this]; + auto low_priority_wait_mutex = wait_mutex->get_low_priority_lockable(); + std::lock_guard wait_lock(low_priority_wait_mutex); + if (!rclcpp::ok(this->context_) || !spinning.load()) + { + return; + } + if (!get_next_executable(any_executable, next_exec_timeout_)) + { + continue; + } + if (any_executable.timer) + { + if (scheduled_timers_.count(any_executable.timer) != 0) + { + if (any_executable.callback_group) + { + any_executable.callback_group->can_be_taken_from().store(true); + } + continue; + } + scheduled_timers_.insert(any_executable.timer); + } + } + execute_any_executable(any_executable); + if (any_executable.timer) + { + auto wait_mutex = MultithreadTimedExecutor::wait_mutex_set_[this]; + auto high_priority_wait_mutex = wait_mutex->get_high_priority_lockable(); + std::lock_guard wait_lock(high_priority_wait_mutex); + auto it = scheduled_timers_.find(any_executable.timer); + if (it != scheduled_timers_.end()) + { + scheduled_timers_.erase(it); + } + } + any_executable.callback_group.reset(); + if (prio_memory_strategy_ != nullptr) + { + auto wait_mutex = MultithreadTimedExecutor::wait_mutex_set_[this]; + auto low_priority_wait_mutex = wait_mutex->get_low_priority_lockable(); + std::lock_guard wait_lock(low_priority_wait_mutex); + // std::shared_ptr> prio_memory_strategy_ = std::dynamic_pointer_cast(memory_strategy_); + prio_memory_strategy_->post_execute(any_executable, thread_number); + } + } + } +} diff --git a/src/priority_executor/src/priority_executor.cpp b/src/priority_executor/src/priority_executor.cpp index a4e8c63..3cf3d1a 100644 --- a/src/priority_executor/src/priority_executor.cpp +++ b/src/priority_executor/src/priority_executor.cpp @@ -20,6 +20,8 @@ #include "rclcpp/exceptions.hpp" #include #include +// for sleep +#include namespace timed_executor { @@ -60,19 +62,17 @@ namespace timed_executor std::cout << "shutdown" << std::endl; } - unsigned long long TimedExecutor::get_max_runtime(void) - { - return maxRuntime; - } - bool TimedExecutor::get_next_executable(rclcpp::AnyExecutable &any_executable, std::chrono::nanoseconds timeout) { bool success = false; // Check to see if there are any subscriptions or timers needing service // TODO(wjwwood): improve run to run efficiency of this function - sched_yield(); - wait_for_work(std::chrono::milliseconds(1)); + // sched_yield(); + // sleep for 10us + // usleep(20); + wait_for_work(timeout); success = get_next_ready_executable(any_executable); + // std::cout << "get_next_executable: " << success << std::endl; return success; } diff --git a/src/priority_executor/src/priority_memory_strategy.cpp b/src/priority_executor/src/priority_memory_strategy.cpp index a956a47..8f8ddf0 100644 --- a/src/priority_executor/src/priority_memory_strategy.cpp +++ b/src/priority_executor/src/priority_memory_strategy.cpp @@ -1,9 +1,12 @@ #include "priority_executor/priority_memory_strategy.hpp" #include "simple_timer/rt-sched.hpp" #include +size_t PriorityExecutable::num_executables; PriorityExecutable::PriorityExecutable(std::shared_ptr h, int p, ExecutableType t, ExecutableScheduleType sched_type) { + std::cout << "priority_executable constructor called" << std::endl; + std::cout << "type: " << t << std::endl; handle = h; type = t; if (sched_type == CHAIN_INDEPENDENT_PRIORITY || sched_type == CHAIN_AWARE_PRIORITY) @@ -15,6 +18,9 @@ PriorityExecutable::PriorityExecutable(std::shared_ptr h, int p, Exe period = p; } this->sched_type = sched_type; + + this->executable_id = num_executables; + num_executables += 1; } PriorityExecutable::PriorityExecutable() @@ -32,14 +38,7 @@ void PriorityExecutable::dont_run() void PriorityExecutable::allow_run() { - if (this->can_be_run) - { - // release has been detected - } - else - { - this->can_be_run = true; - } + this->can_be_run = true; } void PriorityExecutable::increment_counter() @@ -47,53 +46,58 @@ void PriorityExecutable::increment_counter() this->counter += 1; } +bool PriorityExecutable::operator==(const PriorityExecutable &other) const +{ + std::cout << "PriorityExecutable::operator== called" << std::endl; + if (this->handle == other.handle) + { + return true; + } + else + { + return false; + } +} + bool PriorityExecutableComparator::operator()(const PriorityExecutable *p1, const PriorityExecutable *p2) { + // since this will be used in a std::set, also check for equality if (p1 == nullptr || p2 == nullptr) { // TODO: realistic value - return 0; + std::cout << "PriorityExecutableComparator::operator() called with nullptr" << std::endl; + return false; } + if (p1->handle == p2->handle) + { + return false; + } + if (p1->executable_id == p2->executable_id) + { + return false; + } + if (p1->sched_type != p2->sched_type) { - if (p1->sched_type == DEADLINE) - { - return false; - } - else if (p2->sched_type == DEADLINE) - { - return true; - } - if (p1->sched_type == CHAIN_INDEPENDENT_PRIORITY) - { - return false; - } - else if (p2->sched_type == CHAIN_INDEPENDENT_PRIORITY) - { - return true; - } - else if (p1->sched_type == CHAIN_AWARE_PRIORITY) - { - return false; - } - else - { - return true; - } + // in order: DEADLINE, CHAIN_AWARE, CHAIN_INDEPENDENT + return p1->sched_type < p2->sched_type; } if (p1->sched_type == CHAIN_INDEPENDENT_PRIORITY) { + if (p1->priority == p2->priority) + { + return p1->executable_id < p2->executable_id; + } // lower value runs first - return p1->priority > p2->priority; + return p1->priority < p2->priority; } if (p1->sched_type == CHAIN_AWARE_PRIORITY) { - if (p1->priority != p2->priority) + if (p1->priority == p2->priority) { - return p1->priority > p2->priority; + return p1->executable_id < p2->executable_id; } - // return p1->counter > p2->counter; - return 0; + return p1->priority < p2->priority; } if (p1->sched_type == DEADLINE) { @@ -109,26 +113,27 @@ bool PriorityExecutableComparator::operator()(const PriorityExecutable *p1, cons { p2_deadline = p2->deadlines->front(); } + if (p1_deadline == p2_deadline) + { + // this looks bad and is bad, BUT + // if we tell std::set these are equal, only one will be added, and we will lose the other. + // we need _something_ to make them unique + return p1->executable_id < p2->executable_id; + } if (p1_deadline == 0) { - return true; + p1_deadline = std::numeric_limits::max(); } if (p2_deadline == 0) { - return false; + p2_deadline = std::numeric_limits::max(); } - if (p1_deadline == p2_deadline) - { - // these must be from the same chain - // settle the difference with the counter - return p1->counter > p2->counter; - } - return p1_deadline > p2_deadline; + return p1_deadline < p2_deadline; } else { std::cout << "invalid compare opration on priority_exec" << std::endl; - return 0; + return false; } } @@ -158,6 +163,15 @@ void PriorityMemoryStrategy<>::remove_guard_condition(const rcl_guard_condition_ } } +template <> +void PriorityMemoryStrategy<>::add_executable_to_queue(std::shared_ptr e) +{ + // e may have changed. remove and re-add + all_executables_.erase(e.get()); + // convert from shared_ptr to raw pointer + all_executables_.insert(e.get()); +} + template <> void PriorityMemoryStrategy<>::clear_handles() { @@ -167,14 +181,9 @@ void PriorityMemoryStrategy<>::clear_handles() timer_handles_.clear(); waitable_handles_.clear(); - // priority_queue doesn't have a clear function, so we swap it with an empty one. `empty` will go out of scope, and be cleared // all_executables_ = std::priority_queue, PriorityExecutableComparator>(); - std::priority_queue, PriorityExecutableComparator> empty; - std::swap(all_executables_, empty); - if (!all_executables_.empty()) - { - std::cout << "couldn't clear all exec" << std::endl; - } + // all_executables_.clear(); + // all_executables_ = std::map(PriorityExecutableComparator()); } template <> @@ -190,60 +199,60 @@ void PriorityMemoryStrategy<>::remove_null_handles(rcl_wait_set_t *wait_set) { if (!wait_set->subscriptions[i]) { - priority_map[subscription_handles_[i]].dont_run(); + priority_map[subscription_handles_[i]]->dont_run(); subscription_handles_[i].reset(); } else { - priority_map[subscription_handles_[i]].allow_run(); + priority_map[subscription_handles_[i]]->allow_run(); } } for (size_t i = 0; i < service_handles_.size(); ++i) { if (!wait_set->services[i]) { - priority_map[service_handles_[i]].dont_run(); + priority_map[service_handles_[i]]->dont_run(); service_handles_[i].reset(); } else { - priority_map[service_handles_[i]].allow_run(); + priority_map[service_handles_[i]]->allow_run(); } } for (size_t i = 0; i < client_handles_.size(); ++i) { if (!wait_set->clients[i]) { - priority_map[client_handles_[i]].dont_run(); + priority_map[client_handles_[i]]->dont_run(); client_handles_[i].reset(); } else { - priority_map[client_handles_[i]].allow_run(); + priority_map[client_handles_[i]]->allow_run(); } } for (size_t i = 0; i < timer_handles_.size(); ++i) { if (!wait_set->timers[i]) { - priority_map[timer_handles_[i]].dont_run(); + priority_map[timer_handles_[i]]->dont_run(); timer_handles_[i].reset(); } else { - priority_map[timer_handles_[i]].allow_run(); + priority_map[timer_handles_[i]]->allow_run(); } } for (size_t i = 0; i < waitable_handles_.size(); ++i) { if (!waitable_handles_[i]->is_ready(wait_set)) { - priority_map[waitable_handles_[i]].dont_run(); + priority_map[waitable_handles_[i]]->dont_run(); waitable_handles_[i].reset(); } else { - priority_map[waitable_handles_[i]].allow_run(); + priority_map[waitable_handles_[i]]->allow_run(); } } @@ -293,35 +302,34 @@ bool PriorityMemoryStrategy<>::collect_entities(const WeakNodeList &weak_nodes) { auto subscription_handle = subscription->get_subscription_handle(); subscription_handles_.push_back(subscription_handle); - all_executables_.push(get_and_reset_priority(subscription_handle, SUBSCRIPTION)); - + add_executable_to_queue(get_and_reset_priority(subscription_handle, SUBSCRIPTION)); return false; }); group->find_service_ptrs_if( [this](const rclcpp::ServiceBase::SharedPtr &service) { - all_executables_.push(get_and_reset_priority(service->get_service_handle(), SERVICE)); + add_executable_to_queue(get_and_reset_priority(service->get_service_handle(), SERVICE)); service_handles_.push_back(service->get_service_handle()); return false; }); group->find_client_ptrs_if( [this](const rclcpp::ClientBase::SharedPtr &client) { - all_executables_.push(get_and_reset_priority(client->get_client_handle(), CLIENT)); + add_executable_to_queue(get_and_reset_priority(client->get_client_handle(), CLIENT)); client_handles_.push_back(client->get_client_handle()); return false; }); group->find_timer_ptrs_if( [this](const rclcpp::TimerBase::SharedPtr &timer) { - all_executables_.push(get_and_reset_priority(timer->get_timer_handle(), TIMER)); + add_executable_to_queue(get_and_reset_priority(timer->get_timer_handle(), TIMER)); timer_handles_.push_back(timer->get_timer_handle()); return false; }); group->find_waitable_ptrs_if( [this](const rclcpp::Waitable::SharedPtr &waitable) { - all_executables_.push(get_and_reset_priority(waitable, WAITABLE)); + add_executable_to_queue(get_and_reset_priority(waitable, WAITABLE)); waitable_handles_.push_back(waitable); return false; }); @@ -420,10 +428,17 @@ void PriorityMemoryStrategy<>::get_next_executable( { const PriorityExecutable *next_exec = nullptr; - while (!all_executables_.empty()) + // std::cout << "all_executables_.size():" << all_executables_.size() << std::endl; + // print all_executables_ + // for (auto exec : all_executables_) + // { + // std::cout << exec << ": " << exec->handle << " : " << exec->sched_type << std::endl; + // } + + // while (!all_executables_.empty()) + for (auto exec : all_executables_) { - next_exec = all_executables_.top(); - all_executables_.pop(); + next_exec = exec; if (!next_exec->can_be_run) { continue; @@ -571,22 +586,26 @@ void PriorityMemoryStrategy<>::get_next_executable( } break; default: - // std::cout << "Unknown type from priority!!!" << std::endl; - break; + std::cout << "Unknown type from priority!!!" << std::endl; + continue; + // break; } if (next_exec->is_first_in_chain && next_exec->sched_type == DEADLINE) { // std::cout << "running first in chain deadline" << std::endl; } + // returning with an executable + // remove from all_executables_ map + all_executables_.erase(exec); return; } } template <> -void PriorityMemoryStrategy<>::post_execute(rclcpp::AnyExecutable any_exec) +void PriorityMemoryStrategy<>::post_execute(rclcpp::AnyExecutable any_exec, int thread_id) { - PriorityExecutable *next_exec = nullptr; + std::shared_ptr next_exec = nullptr; if (any_exec.subscription) { next_exec = get_priority_settings(any_exec.subscription->get_subscription_handle()); @@ -660,7 +679,7 @@ void PriorityMemoryStrategy<>::post_execute(rclcpp::AnyExecutable any_exec) int64_t time_diff = millis - this_deadline; int periods_late; // if time_diff is negative, we completed on time. add one period to the deadline - if (time_diff < 0) + if (time_diff <= 0) { periods_late = 0; next_deadline = this_deadline + next_exec->period; @@ -670,7 +689,7 @@ void PriorityMemoryStrategy<>::post_execute(rclcpp::AnyExecutable any_exec) else { periods_late = std::ceil(time_diff / (double)next_exec->period); - next_deadline = this_deadline + (periods_late + 1) * next_exec->period; + next_deadline = this_deadline + (periods_late)*next_exec->period; on_time = false; } // std::chrono::nanoseconds time_until = next_exec->timer_handle->time_until_trigger(); @@ -678,9 +697,19 @@ void PriorityMemoryStrategy<>::post_execute(rclcpp::AnyExecutable any_exec) // the next deadline is the current time plus the period, skipping periods that have already passed // RCLCPP_INFO(rclcpp::get_logger("rclcpp"), "next deadline: %d", next_deadline); - oss << "{\"operation\": \"next_deadline\", \"chain_id\": " << next_exec->chain_id << ", \"deadline\": " << next_deadline << ", \"on_time\": " << on_time << ", \"time_diff\": " << time_diff << ", \"periods_late\": " << periods_late << "}"; + oss << "{\"operation\": \"next_deadline\", \"chain_id\": " << next_exec->chain_id << ", \"deadline\": " << next_deadline << ", \"on_time\": " << on_time << ", \"time_diff\": " << time_diff << ", \"periods_late\": " << periods_late; + if (thread_id != -1) + { + oss << ", \"thread_id\": " << thread_id; + } + + oss << "}"; // oss << "{\"operation\": \"next_deadline\", \"chain_id\": " << next_exec->chain_id << ", \"deadline\": " << next_exec->deadlines->front() << "}"; log_entry(logger_, oss.str()); + if (time_diff < -next_exec->period) + { + // the deadline was much later than it needed to be + } next_exec->deadlines->push_back(next_deadline); // print chain id and deadline contents @@ -699,7 +728,7 @@ void PriorityMemoryStrategy<>::post_execute(rclcpp::AnyExecutable any_exec) // this is safe, since we popped it earlier // get a mutable reference // TODO: find a cleaner way to do this - PriorityExecutable *mut_executable = get_priority_settings(next_exec->handle); + std::shared_ptr mut_executable = get_priority_settings(next_exec->handle); // std::cout << "running chain aware cb" << std::endl; mut_executable->increment_counter(); } @@ -906,4 +935,4 @@ void PriorityMemoryStrategy<>:: // Else, the waitable is no longer valid, remove it and continue it = waitable_handles_.erase(it); } -} \ No newline at end of file +}