diff --git a/src/priority_executor/CMakeLists.txt b/src/priority_executor/CMakeLists.txt index 8313ee4..f65a112 100644 --- a/src/priority_executor/CMakeLists.txt +++ b/src/priority_executor/CMakeLists.txt @@ -50,6 +50,18 @@ ament_target_dependencies(multithread_priority_executor simple_timer ) +add_library(default_executor src/default_executor.cpp) +target_include_directories(default_executor PUBLIC + $ + $ +) +ament_target_dependencies(default_executor + rmw + rclcpp + rcl + simple_timer +) + add_executable(usage_example src/usage_example.cpp) target_include_directories(usage_example PUBLIC $ @@ -72,7 +84,7 @@ install(TARGETS usage_example DESTINATION lib/${PROJECT_NAME} ) -install(TARGETS priority_executor multithread_priority_executor +install(TARGETS priority_executor multithread_priority_executor default_executor ARCHIVE DESTINATION lib LIBRARY DESTINATION lib RUNTIME DESTINATION bin @@ -92,4 +104,5 @@ endif() ament_export_include_directories(include) ament_export_libraries(priority_executor) ament_export_libraries(multithread_priority_executor) +ament_export_libraries(default_executor) ament_package() diff --git a/src/priority_executor/include/priority_executor/default_executor.hpp b/src/priority_executor/include/priority_executor/default_executor.hpp index e5f56d6..90f3038 100644 --- a/src/priority_executor/include/priority_executor/default_executor.hpp +++ b/src/priority_executor/include/priority_executor/default_executor.hpp @@ -23,6 +23,7 @@ #include #include "rclcpp/executor.hpp" +#include "rclcpp/rclcpp.hpp" #include "rclcpp/macros.hpp" #include "rclcpp/memory_strategies.hpp" #include "rclcpp/node.hpp" @@ -30,17 +31,42 @@ #include "rclcpp/rate.hpp" #include "rclcpp/visibility_control.hpp" #include "priority_executor/priority_memory_strategy.hpp" +#include "simple_timer/rt-sched.hpp" + +class RTISTimed +{ +public: + node_time_logger logger_ = create_logger(); +}; + +class ROSDefaultMultithreadedExecutor : public rclcpp::executors::MultiThreadedExecutor, public RTISTimed +{ +public: + RCLCPP_SMART_PTR_DEFINITIONS(ROSDefaultMultithreadedExecutor) + + /// Default constructor. See the default constructor for Executor. + RCLCPP_PUBLIC explicit ROSDefaultMultithreadedExecutor( + const rclcpp::ExecutorOptions &options = rclcpp::ExecutorOptions(), + size_t number_of_threads = 0, + bool yield_before_execute = false, + std::chrono::nanoseconds timeout = std::chrono::nanoseconds(-1)) : rclcpp::executors::MultiThreadedExecutor(options, number_of_threads, yield_before_execute, timeout){}; + + /// Default destructor. + RCLCPP_PUBLIC + virtual ~ROSDefaultMultithreadedExecutor(); + + RCLCPP_PUBLIC + bool get_next_executable(rclcpp::AnyExecutable &any_executable, std::chrono::nanoseconds timeout = std::chrono::nanoseconds(-1)); +}; /// Single-threaded executor implementation. /** * This is the default executor created by rclcpp::spin. */ -class ROSDefaultExecutor : public rclcpp::Executor +class ROSDefaultExecutor : public rclcpp::Executor, public RTISTimed { public: RCLCPP_SMART_PTR_DEFINITIONS(ROSDefaultExecutor) - std::unordered_map, PriorityExecutable> priority_map; - node_time_logger logger; /// Default constructor. See the default constructor for Executor. RCLCPP_PUBLIC @@ -64,6 +90,10 @@ public: spin() override; bool get_next_executable(rclcpp::AnyExecutable &any_executable, std::chrono::nanoseconds timeout = std::chrono::nanoseconds(-1)); + RCLCPP_PUBLIC + void + wait_for_work(std::chrono::nanoseconds timeout = std::chrono::nanoseconds(-1)); + private: RCLCPP_DISABLE_COPY(ROSDefaultExecutor) }; diff --git a/src/priority_executor/src/default_executor.cpp b/src/priority_executor/src/default_executor.cpp index a305bc2..43bbe3a 100644 --- a/src/priority_executor/src/default_executor.cpp +++ b/src/priority_executor/src/default_executor.cpp @@ -12,25 +12,100 @@ // See the License for the specific language governing permissions and // limitations under the License. -#include "rcpputils/scope_exit.hpp" - -#include "rclcpp/any_executable.hpp" -#include "priority_executor/priority_memory_strategy.hpp" #include "priority_executor/default_executor.hpp" -#include "priority_executor/primes_workload.hpp" +#include "rcpputils/scope_exit.hpp" +#include "rclcpp/any_executable.hpp" +#include "simple_timer/rt-sched.hpp" ROSDefaultExecutor::ROSDefaultExecutor(const rclcpp::ExecutorOptions &options) - : rclcpp::Executor(options) {} + : rclcpp::Executor(options) +{ + logger_ = create_logger(); +} ROSDefaultExecutor::~ROSDefaultExecutor() {} +ROSDefaultMultithreadedExecutor::~ROSDefaultMultithreadedExecutor() {} + + +void ROSDefaultExecutor::wait_for_work(std::chrono::nanoseconds timeout) +{ + { + std::unique_lock lock(memory_strategy_mutex_); + + // Collect the subscriptions and timers to be waited on + memory_strategy_->clear_handles(); + bool has_invalid_weak_nodes = memory_strategy_->collect_entities(weak_nodes_); + + // Clean up any invalid nodes, if they were detected + if (has_invalid_weak_nodes) + { + auto node_it = weak_nodes_.begin(); + auto gc_it = guard_conditions_.begin(); + while (node_it != weak_nodes_.end()) + { + if (node_it->expired()) + { + node_it = weak_nodes_.erase(node_it); + memory_strategy_->remove_guard_condition(*gc_it); + gc_it = guard_conditions_.erase(gc_it); + } + else + { + ++node_it; + ++gc_it; + } + } + } + // clear wait set + rcl_ret_t ret = rcl_wait_set_clear(&wait_set_); + if (ret != RCL_RET_OK) + { + rclcpp::exceptions::throw_from_rcl_error(ret, "Couldn't clear wait set"); + } + + // The size of waitables are accounted for in size of the other entities + ret = rcl_wait_set_resize( + &wait_set_, memory_strategy_->number_of_ready_subscriptions(), + memory_strategy_->number_of_guard_conditions(), memory_strategy_->number_of_ready_timers(), + memory_strategy_->number_of_ready_clients(), memory_strategy_->number_of_ready_services(), + memory_strategy_->number_of_ready_events()); + if (RCL_RET_OK != ret) + { + rclcpp::exceptions::throw_from_rcl_error(ret, "Couldn't resize the wait set"); + } + + if (!memory_strategy_->add_handles_to_wait_set(&wait_set_)) + { + throw std::runtime_error("Couldn't fill wait set"); + } + } + rcl_ret_t status = + rcl_wait(&wait_set_, std::chrono::duration_cast(timeout).count()); + if (status == RCL_RET_WAIT_SET_EMPTY) + { + RCUTILS_LOG_WARN_NAMED( + "rclcpp", + "empty wait set received in rcl_wait(). This should never happen."); + } + else if (status != RCL_RET_OK && status != RCL_RET_TIMEOUT) + { + using rclcpp::exceptions::throw_from_rcl_error; + throw_from_rcl_error(status, "rcl_wait() failed"); + } + + // check the null handles in the wait set and remove them from the handles in memory strategy + // for callback-based entities + memory_strategy_->remove_null_handles(&wait_set_); +} + bool ROSDefaultExecutor::get_next_executable(rclcpp::AnyExecutable &any_executable, std::chrono::nanoseconds timeout) { bool success = false; // Check to see if there are any subscriptions or timers needing service // TODO(wjwwood): improve run to run efficiency of this function // sched_yield(); - wait_for_work(std::chrono::milliseconds(1)); + wait_for_work(timeout); success = get_next_ready_executable(any_executable); return success; } @@ -47,21 +122,37 @@ void ROSDefaultExecutor::spin() rclcpp::AnyExecutable any_executable; if (get_next_executable(any_executable)) { - if (any_executable.timer) - { - if (priority_map.find(any_executable.timer->get_timer_handle()) != priority_map.end()) - { - timespec current_time; - clock_gettime(CLOCK_MONOTONIC_RAW, ¤t_time); - uint64_t millis = (current_time.tv_sec * (uint64_t)1000) + (current_time.tv_nsec / 1000000); - PriorityExecutable next_exec = priority_map[any_executable.timer->get_timer_handle()]; - - auto timer = next_exec.timer_handle; - // TODO: this is really a fire - // log_entry(logger, "timer_" + std::to_string(next_exec.chain_id) + "_release_" + std::to_string(millis + (timer->time_until_trigger().count() / 1000000))); - } - } execute_any_executable(any_executable); } } -} \ No newline at end of file +} + +bool ROSDefaultMultithreadedExecutor::get_next_executable(rclcpp::AnyExecutable &any_executable, std::chrono::nanoseconds timeout) +{ + bool success = false; + // Check to see if there are any subscriptions or timers needing service + // TODO(wjwwood): improve run to run efficiency of this function + + // try to get an executable + success = get_next_ready_executable(any_executable); + std::stringstream oss; + oss << "{\"operation\":\"get_next_executable\", \"result\":\"" << success << "\"}"; + log_entry(logger_, oss.str()); + + // If there are none + if (!success) + { + // Wait for subscriptions or timers to work on + // queue refresh + wait_for_work(timeout); + if (!spinning.load()) + { + return false; + } + // Try again + success = get_next_ready_executable(any_executable); + oss.str(""); + oss << "{\"operation\":\"wait_for_work\", \"result\":\"" << success << "\"}"; + } + return success; +}