diff --git a/src/priority_executor/include/priority_executor/default_executor.hpp b/src/priority_executor/include/priority_executor/default_executor.hpp index 90f3038..3359e92 100644 --- a/src/priority_executor/include/priority_executor/default_executor.hpp +++ b/src/priority_executor/include/priority_executor/default_executor.hpp @@ -21,6 +21,7 @@ #include #include #include +#include #include "rclcpp/executor.hpp" #include "rclcpp/rclcpp.hpp" @@ -30,33 +31,38 @@ #include "rclcpp/utilities.hpp" #include "rclcpp/rate.hpp" #include "rclcpp/visibility_control.hpp" +#include "rclcpp/detail/mutex_two_priorities.hpp" #include "priority_executor/priority_memory_strategy.hpp" #include "simple_timer/rt-sched.hpp" class RTISTimed { public: - node_time_logger logger_ = create_logger(); + node_time_logger logger_; }; -class ROSDefaultMultithreadedExecutor : public rclcpp::executors::MultiThreadedExecutor, public RTISTimed +class ROSDefaultMultithreadedExecutor : public rclcpp::Executor, public RTISTimed { public: - RCLCPP_SMART_PTR_DEFINITIONS(ROSDefaultMultithreadedExecutor) - - /// Default constructor. See the default constructor for Executor. - RCLCPP_PUBLIC explicit ROSDefaultMultithreadedExecutor( - const rclcpp::ExecutorOptions &options = rclcpp::ExecutorOptions(), - size_t number_of_threads = 0, - bool yield_before_execute = false, - std::chrono::nanoseconds timeout = std::chrono::nanoseconds(-1)) : rclcpp::executors::MultiThreadedExecutor(options, number_of_threads, yield_before_execute, timeout){}; - - /// Default destructor. RCLCPP_PUBLIC - virtual ~ROSDefaultMultithreadedExecutor(); + explicit ROSDefaultMultithreadedExecutor( + const rclcpp::ExecutorOptions &options = rclcpp::ExecutorOptions(), int number_of_threads = 2, std::chrono::nanoseconds next_exec_timeout = std::chrono::nanoseconds(-1)); - RCLCPP_PUBLIC + RCLCPP_PUBLIC size_t get_number_of_threads(); + RCLCPP_PUBLIC void spin() override; bool get_next_executable(rclcpp::AnyExecutable &any_executable, std::chrono::nanoseconds timeout = std::chrono::nanoseconds(-1)); + +protected: + RCLCPP_PUBLIC void run(size_t thread_number); + +private: + size_t number_of_threads_; + std::set scheduled_timers_; + static std::unordered_map> + wait_mutex_set_; + static std::mutex shared_wait_mutex_; + std::chrono::nanoseconds next_exec_timeout_ = std::chrono::nanoseconds(-1); }; /// Single-threaded executor implementation. diff --git a/src/priority_executor/include/priority_executor/priority_executor.hpp b/src/priority_executor/include/priority_executor/priority_executor.hpp index 11a026f..a942d08 100644 --- a/src/priority_executor/include/priority_executor/priority_executor.hpp +++ b/src/priority_executor/include/priority_executor/priority_executor.hpp @@ -31,6 +31,7 @@ #include "rclcpp/rate.hpp" #include "rclcpp/visibility_control.hpp" #include "priority_executor/priority_memory_strategy.hpp" +#include namespace timed_executor { @@ -38,7 +39,7 @@ namespace timed_executor /** * This is the default executor created by rclcpp::spin. */ - class TimedExecutor : public rclcpp::Executor + class TimedExecutor : public rclcpp::Executor, public RTISTimed { public: RCLCPP_SMART_PTR_DEFINITIONS(TimedExecutor) diff --git a/src/priority_executor/src/default_executor.cpp b/src/priority_executor/src/default_executor.cpp index 43bbe3a..d1dfc1b 100644 --- a/src/priority_executor/src/default_executor.cpp +++ b/src/priority_executor/src/default_executor.cpp @@ -25,9 +25,6 @@ ROSDefaultExecutor::ROSDefaultExecutor(const rclcpp::ExecutorOptions &options) ROSDefaultExecutor::~ROSDefaultExecutor() {} -ROSDefaultMultithreadedExecutor::~ROSDefaultMultithreadedExecutor() {} - - void ROSDefaultExecutor::wait_for_work(std::chrono::nanoseconds timeout) { { @@ -134,9 +131,13 @@ bool ROSDefaultMultithreadedExecutor::get_next_executable(rclcpp::AnyExecutable // TODO(wjwwood): improve run to run efficiency of this function // try to get an executable + // record the start time + auto start = std::chrono::steady_clock::now(); success = get_next_ready_executable(any_executable); + // and the end time + auto end = std::chrono::steady_clock::now(); std::stringstream oss; - oss << "{\"operation\":\"get_next_executable\", \"result\":\"" << success << "\"}"; + oss << "{\"operation\":\"get_next_executable\", \"result\":\"" << success << "\", \"duration\":\"" << std::chrono::duration_cast(end - start).count() << "\"}"; log_entry(logger_, oss.str()); // If there are none @@ -144,15 +145,124 @@ bool ROSDefaultMultithreadedExecutor::get_next_executable(rclcpp::AnyExecutable { // Wait for subscriptions or timers to work on // queue refresh + start = std::chrono::steady_clock::now(); wait_for_work(timeout); + // and the end time + end = std::chrono::steady_clock::now(); + auto wait_duration = std::chrono::duration_cast(end - start).count(); + oss.str(""); + oss << "{\"operation\":\"wait_for_work\", \"result\":\"" << success << "\", \"wait_duration\":\"" << wait_duration << "\"}"; + log_entry(logger_, oss.str()); if (!spinning.load()) { return false; } // Try again + start = std::chrono::steady_clock::now(); success = get_next_ready_executable(any_executable); + // and the end time + end = std::chrono::steady_clock::now(); oss.str(""); - oss << "{\"operation\":\"wait_for_work\", \"result\":\"" << success << "\"}"; + oss << "{\"operation\":\"get_next_executable\", \"result\":\"" << success << "\", \"duration\":\"" << std::chrono::duration_cast(end - start).count() << "\"}"; + log_entry(logger_, oss.str()); } return success; } + +std::unordered_map> ROSDefaultMultithreadedExecutor::wait_mutex_set_; +std::mutex ROSDefaultMultithreadedExecutor::shared_wait_mutex_; + +ROSDefaultMultithreadedExecutor::ROSDefaultMultithreadedExecutor( + const rclcpp::ExecutorOptions &options, + int number_of_threads, std::chrono::nanoseconds next_exec_timeout) + : Executor(options) +{ + std::lock_guard wait_lock(ROSDefaultMultithreadedExecutor::shared_wait_mutex_); + wait_mutex_set_[this] = std::make_shared(); + number_of_threads_ = number_of_threads; + next_exec_timeout_ = next_exec_timeout; + logger_ = create_logger(); +} + +void ROSDefaultMultithreadedExecutor::spin() +{ + if (spinning.exchange(true)) + { + throw std::runtime_error("spin() called while already spinning"); + } + + std::vector threads; + size_t thread_id = 0; + { + auto wait_mutex = ROSDefaultMultithreadedExecutor::wait_mutex_set_[this]; + auto low_priority_wait_mutex = wait_mutex->get_low_priority_lockable(); + std::lock_guard wait_lock(low_priority_wait_mutex); + for (; thread_id < number_of_threads_ - 1; ++thread_id) + { + auto func = std::bind(&ROSDefaultMultithreadedExecutor::run, this, thread_id); + threads.emplace_back(func); + } + } + run(thread_id); + for (auto &thread : threads) + { + thread.join(); + } +} + +void ROSDefaultMultithreadedExecutor::run(size_t thread_number) +{ + while (rclcpp::ok(this->context_) && spinning.load()) + { + rclcpp::AnyExecutable any_exec; + { + auto wait_mutex = ROSDefaultMultithreadedExecutor::wait_mutex_set_[this]; + auto low_priority_wait_mutex = wait_mutex->get_low_priority_lockable(); + std::lock_guard wait_lock(low_priority_wait_mutex); + if (!rclcpp::ok(this->context_) || !spinning.load()) + { + return; + } + if (!get_next_executable(any_exec, next_exec_timeout_)) + { + continue; + } + if (any_exec.timer) + { + // Guard against multiple threads getting the same timer. + if (scheduled_timers_.count(any_exec.timer) != 0) + { + // Make sure that any_exec's callback group is reset before + // the lock is released. + if (any_exec.callback_group) + { + any_exec.callback_group->can_be_taken_from().store(true); + } + continue; + } + scheduled_timers_.insert(any_exec.timer); + } + } + // if (yield_before_execute_) + // { + // std::this_thread::yield(); + // } + + execute_any_executable(any_exec); + + if (any_exec.timer) + { + auto wait_mutex = ROSDefaultMultithreadedExecutor::wait_mutex_set_[this]; + auto high_priority_wait_mutex = wait_mutex->get_high_priority_lockable(); + std::lock_guard wait_lock(high_priority_wait_mutex); + auto it = scheduled_timers_.find(any_exec.timer); + if (it != scheduled_timers_.end()) + { + scheduled_timers_.erase(it); + } + } + // Clear the callback_group to prevent the AnyExecutable destructor from + // resetting the callback group `can_be_taken_from` + any_exec.callback_group.reset(); + } +} diff --git a/src/priority_executor/src/multithread_priority_executor.cpp b/src/priority_executor/src/multithread_priority_executor.cpp index 93c9b85..ed691c1 100644 --- a/src/priority_executor/src/multithread_priority_executor.cpp +++ b/src/priority_executor/src/multithread_priority_executor.cpp @@ -14,6 +14,7 @@ namespace timed_executor wait_mutex_set_[this] = std::make_shared(); number_of_threads_ = number_of_threads; next_exec_timeout_ = next_exec_timeout; + logger_ = create_logger(); } size_t MultithreadTimedExecutor::get_number_of_threads() diff --git a/src/priority_executor/src/priority_executor.cpp b/src/priority_executor/src/priority_executor.cpp index fae5463..feb902d 100644 --- a/src/priority_executor/src/priority_executor.cpp +++ b/src/priority_executor/src/priority_executor.cpp @@ -29,6 +29,7 @@ namespace timed_executor : rclcpp::Executor(options) { this->name = name; + logger_ = create_logger(); } TimedExecutor::~TimedExecutor() {} @@ -70,9 +71,21 @@ namespace timed_executor // sched_yield(); // sleep for 10us // usleep(20); + auto start = std::chrono::steady_clock::now(); wait_for_work(timeout); + auto end = std::chrono::steady_clock::now(); + auto wait_duration = std::chrono::duration_cast(end - start); + std::ostringstream oss; + oss << "{\"operation\": \"wait_for_work\", \"wait_duration\": " << wait_duration.count() << "}"; + log_entry(logger_, oss.str()); + + start = std::chrono::steady_clock::now(); success = get_next_ready_executable(any_executable); - // std::cout << "get_next_executable: " << success << std::endl; + end = std::chrono::steady_clock::now(); + auto get_next_duration = std::chrono::duration_cast(end - start); + oss.str(""); + oss << "{\"operation\": \"get_next_executable\", \"duration\": " << get_next_duration.count() << ", \"result\": " << success << "}"; + log_entry(logger_, oss.str()); return success; } diff --git a/src/priority_executor/src/priority_memory_strategy.cpp b/src/priority_executor/src/priority_memory_strategy.cpp index bd3e4ea..383d698 100644 --- a/src/priority_executor/src/priority_memory_strategy.cpp +++ b/src/priority_executor/src/priority_memory_strategy.cpp @@ -627,15 +627,15 @@ void PriorityMemoryStrategy<>::get_next_executable( // returning with an executable // remove from all_executables_ map all_executables_.erase(next_exec); - std::stringstream oss; - oss << "{\"operation\":\"get_next_executable\", \"result\":\"success\", \"type\":\"" << type << "\"}"; - log_entry(logger_, oss.str()); + // std::stringstream oss; + // oss << "{\"operation\":\"get_next_executable\", \"result\":\"success\", \"type\":\"" << type << "\"}"; + // log_entry(logger_, oss.str()); return; } // nothing found - std::stringstream oss; - oss << "{\"operation\":\"get_next_executable\", \"result\":\"nothing_found\"}"; - log_entry(logger_, oss.str()); + // std::stringstream oss; + // oss << "{\"operation\":\"get_next_executable\", \"result\":\"nothing_found\"}"; + // log_entry(logger_, oss.str()); } template <>