commit e3fc02ebb664c9690ba3671ace2afc49b8afb585 Author: Kurt Wilson Date: Tue Mar 29 13:51:58 2022 -0400 initial commit diff --git a/experiments/picas_priority_long.yaml b/experiments/picas_priority_long.yaml new file mode 100644 index 0000000..249b1b8 --- /dev/null +++ b/experiments/picas_priority_long.yaml @@ -0,0 +1,5 @@ +experiment_parameters: + ros__parameters: + experiment_name: "picas_priority_long" + schedule_type: "chain_priority" + count_max: 500 \ No newline at end of file diff --git a/experiments/picas_priority_short.yaml b/experiments/picas_priority_short.yaml new file mode 100644 index 0000000..077b2f5 --- /dev/null +++ b/experiments/picas_priority_short.yaml @@ -0,0 +1,5 @@ +experiment_parameters: + ros__parameters: + experiment_name: "picas_priority_short" + schedule_type: "chain_priority" + count_max: 20 \ No newline at end of file diff --git a/experiments/ros_default_long.yaml b/experiments/ros_default_long.yaml new file mode 100644 index 0000000..664ed51 --- /dev/null +++ b/experiments/ros_default_long.yaml @@ -0,0 +1,5 @@ +experiment_parameters: + ros__parameters: + experiment_name: "ros_default_long" + schedule_type: "default" + count_max: 500 \ No newline at end of file diff --git a/experiments/ros_default_short.yaml b/experiments/ros_default_short.yaml new file mode 100644 index 0000000..9a2259a --- /dev/null +++ b/experiments/ros_default_short.yaml @@ -0,0 +1,5 @@ +experiment_parameters: + ros__parameters: + experiment_name: "ros_default_short" + schedule_type: "default" + count_max: 20 \ No newline at end of file diff --git a/experiments/rtis_deadline_long.yaml b/experiments/rtis_deadline_long.yaml new file mode 100644 index 0000000..b282d72 --- /dev/null +++ b/experiments/rtis_deadline_long.yaml @@ -0,0 +1,5 @@ +experiment_parameters: + ros__parameters: + experiment_name: "rtis_deadline_long" + schedule_type: "deadline" + count_max: 500 \ No newline at end of file diff --git a/experiments/rtis_deadline_short.yaml b/experiments/rtis_deadline_short.yaml new file mode 100644 index 0000000..6c9cc5c --- /dev/null +++ b/experiments/rtis_deadline_short.yaml @@ -0,0 +1,7 @@ +experiment_parameters: + ros__parameters: + experiment_name: "rtis_deadline_short" + schedule_type: "deadline" + count_max: 20 + chain_deadlines: [1000, 1000] + chain_periods: [1000, 1000] \ No newline at end of file diff --git a/src/priority_executor/CMakeLists.txt b/src/priority_executor/CMakeLists.txt new file mode 100644 index 0000000..42275de --- /dev/null +++ b/src/priority_executor/CMakeLists.txt @@ -0,0 +1,126 @@ +cmake_minimum_required(VERSION 3.5) +project(priority_executor) + +# Default to C99 +if(NOT CMAKE_C_STANDARD) + set(CMAKE_C_STANDARD 99) +endif() + +# Default to C++14 +if(NOT CMAKE_CXX_STANDARD) + set(CMAKE_CXX_STANDARD 14) +endif() + +if(CMAKE_COMPILER_IS_GNUCXX OR CMAKE_CXX_COMPILER_ID MATCHES "Clang") + add_compile_options(-Wall -Wextra -Wpedantic) +endif() + +# find dependencies +find_package(ament_cmake REQUIRED) +find_package(rclcpp REQUIRED) +find_package(rcl REQUIRED) +find_package(std_msgs REQUIRED) +find_package(std_srvs REQUIRED) +find_package(simple_timer REQUIRED) + +add_library(priority_executor src/priority_executor.cpp) +target_include_directories(priority_executor PUBLIC + $ + $ +) +ament_target_dependencies(priority_executor + rmw + rclcpp + rcl + simple_timer +) + +add_library(test_nodes src/test_nodes.cpp) +target_include_directories(test_nodes PUBLIC + $ + $ +) +ament_target_dependencies(test_nodes + rmw + rclcpp + rcl + simple_timer + std_msgs +) +target_link_libraries(test_nodes + primes_workload +) + +add_library(primes_workload src/primes_workload.cpp) +target_include_directories(primes_workload PUBLIC + $ + $ +) +ament_target_dependencies(primes_workload + simple_timer +) + + +add_executable(test_publisher src/test_publisher.cpp) +target_include_directories(test_publisher PUBLIC + $ + $) +target_link_libraries(test_publisher + priority_executor + test_nodes + default_executor +) +ament_target_dependencies( + test_publisher + rclcpp + std_msgs + std_srvs + simple_timer +) + +install(TARGETS test_publisher priority_executor + DESTINATION lib/${PROJECT_NAME}) + +add_library(default_executor src/default_executor.cpp) +target_include_directories(default_executor PUBLIC + $ + $) +ament_target_dependencies(default_executor + rclcpp + simple_timer +) +target_link_libraries(default_executor + primes_workload +) + +add_executable(f1tenth_publisher src/f1tenth_test.cpp) +target_include_directories(test_publisher PUBLIC + $ + $) +target_link_libraries(f1tenth_publisher + priority_executor + test_nodes + default_executor +) +ament_target_dependencies(f1tenth_publisher + rclcpp + std_msgs + std_srvs + simple_timer +) + +install(TARGETS f1tenth_publisher priority_executor + DESTINATION lib/${PROJECT_NAME}) + +if(BUILD_TESTING) + find_package(ament_lint_auto REQUIRED) + # the following line skips the linter which checks for copyrights + # uncomment the line when a copyright and license is not present in all source files + #set(ament_cmake_copyright_FOUND TRUE) + # the following line skips cpplint (only works in a git repo) + # uncomment the line when this package is not in a git repo + #set(ament_cmake_cpplint_FOUND TRUE) + ament_lint_auto_find_test_dependencies() +endif() + +ament_package() diff --git a/src/priority_executor/include/priority_executor/default_executor.hpp b/src/priority_executor/include/priority_executor/default_executor.hpp new file mode 100644 index 0000000..8f72871 --- /dev/null +++ b/src/priority_executor/include/priority_executor/default_executor.hpp @@ -0,0 +1,71 @@ +// Copyright 2014 Open Source Robotics Foundation, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef RTIS_DEFAULT_EXECUTOR +#define RTIS_DEFAULT_EXECUTOR + +#include + +#include +#include +#include +#include + +#include "rclcpp/executor.hpp" +#include "rclcpp/macros.hpp" +#include "rclcpp/memory_strategies.hpp" +#include "rclcpp/node.hpp" +#include "rclcpp/utilities.hpp" +#include "rclcpp/rate.hpp" +#include "rclcpp/visibility_control.hpp" +#include "priority_executor/priority_memory_strategy.hpp" + +/// Single-threaded executor implementation. +/** + * This is the default executor created by rclcpp::spin. + */ +class ROSDefaultExecutor : public rclcpp::Executor +{ +public: + RCLCPP_SMART_PTR_DEFINITIONS(ROSDefaultExecutor) + std::unordered_map, PriorityExecutable> priority_map; + node_time_logger logger; + + /// Default constructor. See the default constructor for Executor. + RCLCPP_PUBLIC + explicit ROSDefaultExecutor( + const rclcpp::ExecutorOptions &options = rclcpp::ExecutorOptions()); + + /// Default destructor. + RCLCPP_PUBLIC + virtual ~ROSDefaultExecutor(); + + /// Single-threaded implementation of spin. + /** + * This function will block until work comes in, execute it, and then repeat + * the process until canceled. + * It may be interrupt by a call to rclcpp::Executor::cancel() or by ctrl-c + * if the associated context is configured to shutdown on SIGINT. + * \throws std::runtime_error when spin() called while already spinning + */ + RCLCPP_PUBLIC + void + spin() override; + bool get_next_executable(rclcpp::AnyExecutable &any_executable, std::chrono::nanoseconds timeout = std::chrono::nanoseconds(-1)); + +private: + RCLCPP_DISABLE_COPY(ROSDefaultExecutor) +}; + +#endif // RCLCPP__EXECUTORS__SINGLE_THREADED_EXECUTOR_HPP_ diff --git a/src/priority_executor/include/priority_executor/primes_workload.hpp b/src/priority_executor/include/priority_executor/primes_workload.hpp new file mode 100644 index 0000000..143e529 --- /dev/null +++ b/src/priority_executor/include/priority_executor/primes_workload.hpp @@ -0,0 +1,14 @@ +#ifndef RTIS_PRIMES_WORKLOAD +#define RTIS_PRIMES_WORKLOAD +#include +#include +#include +#include +typedef double ktimeunit; +ktimeunit nth_prime_silly(int n, double millis = 100); +ktimeunit get_thread_time(struct timespec *currTime); +// int main() +// { +// printf("%lf\n", nth_prime_silly(100000, 750)); +// } +#endif \ No newline at end of file diff --git a/src/priority_executor/include/priority_executor/priority_executor.hpp b/src/priority_executor/include/priority_executor/priority_executor.hpp new file mode 100644 index 0000000..9fff969 --- /dev/null +++ b/src/priority_executor/include/priority_executor/priority_executor.hpp @@ -0,0 +1,130 @@ +// Copyright 2014 Open Source Robotics Foundation, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef RTIS_TIMED_EXECUTOR +#define RTIS_TIMED_EXECUTOR + +#include + +#include +#include +#include +#include +#include + +#include "rclcpp/executor.hpp" +#include "rclcpp/macros.hpp" +#include "rclcpp/memory_strategies.hpp" +#include "rclcpp/node.hpp" +#include "rclcpp/utilities.hpp" +#include "rclcpp/rate.hpp" +#include "rclcpp/visibility_control.hpp" +namespace timed_executor +{ + + /// Single-threaded executor implementation. + /** + * This is the default executor created by rclcpp::spin. + */ + class TimedExecutor : public rclcpp::Executor + { + public: + RCLCPP_SMART_PTR_DEFINITIONS(TimedExecutor) + + /// Default constructor. See the default constructor for Executor. + RCLCPP_PUBLIC + explicit TimedExecutor( + const rclcpp::ExecutorOptions &options = rclcpp::ExecutorOptions(), std::string name = "unnamed executor"); + + /// Default destructor. + RCLCPP_PUBLIC + virtual ~TimedExecutor(); + + /// Single-threaded implementation of spin. + /** + * This function will block until work comes in, execute it, and then repeat + * the process until canceled. + * It may be interrupt by a call to rclcpp::Executor::cancel() or by ctrl-c + * if the associated context is configured to shutdown on SIGINT. + * \throws std::runtime_error when spin() called while already spinning + */ + RCLCPP_PUBLIC + void + spin() override; + unsigned long long get_max_runtime(void); + std::string name; + + void set_use_priorities(bool use_prio); + + private: + RCLCPP_DISABLE_COPY(TimedExecutor) + // TODO: remove these + unsigned long long maxRuntime = 0; + unsigned long long start_time = 0; + int recording = 0; + void execute_subscription(rclcpp::AnyExecutable subscription); + bool + get_next_executable(rclcpp::AnyExecutable &any_executable, std::chrono::nanoseconds timeout = std::chrono::nanoseconds(-1)); + void + wait_for_work(std::chrono::nanoseconds timeout); + + bool + get_next_ready_executable(rclcpp::AnyExecutable &any_executable); + + bool use_priorities = true; + }; + +} // namespace timed_executor + +static void +take_and_do_error_handling( + const char *action_description, + const char *topic_or_service_name, + std::function take_action, + std::function handle_action) +{ + bool taken = false; + try + { + taken = take_action(); + } + catch (const rclcpp::exceptions::RCLError &rcl_error) + { + RCLCPP_ERROR( + rclcpp::get_logger("rclcpp"), + "executor %s '%s' unexpectedly failed: %s", + action_description, + topic_or_service_name, + rcl_error.what()); + } + if (taken) + { + handle_action(); + } + else + { + // Message or Service was not taken for some reason. + // Note that this can be normal, if the underlying middleware needs to + // interrupt wait spuriously it is allowed. + // So in that case the executor cannot tell the difference in a + // spurious wake up and an entity actually having data until trying + // to take the data. + RCLCPP_DEBUG( + rclcpp::get_logger("rclcpp"), + "executor %s '%s' failed to take anything", + action_description, + topic_or_service_name); + } +} +#endif // RCLCPP__EXECUTORS__SINGLE_THREADED_EXECUTOR_HPP_ diff --git a/src/priority_executor/include/priority_executor/priority_memory_strategy.hpp b/src/priority_executor/include/priority_executor/priority_memory_strategy.hpp new file mode 100644 index 0000000..e2e26b2 --- /dev/null +++ b/src/priority_executor/include/priority_executor/priority_memory_strategy.hpp @@ -0,0 +1,1090 @@ +// Copyright 2015 Open Source Robotics Foundation, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef RTIS_PRIORITY_STRATEGY +#define RTIS_PRIORITY_STRATEGY + +#include +#include +#include +#include + +#include "rcl/allocator.h" + +#include "rclcpp/allocator/allocator_common.hpp" +#include "rclcpp/memory_strategy.hpp" +#include "rclcpp/node.hpp" +#include "rclcpp/visibility_control.hpp" + +#include "rcutils/logging_macros.h" + +#include "rmw/types.h" + +#include "simple_timer/rt-sched.hpp" + +/// Delegate for handling memory allocations while the Executor is executing. +/** + * By default, the memory strategy dynamically allocates memory for structures that come in from + * the rmw implementation after the executor waits for work, based on the number of entities that + * come through. + */ + +enum ExecutableType +{ + SUBSCRIPTION, + SERVICE, + CLIENT, + TIMER, + WAITABLE +}; + +enum ExecutableScheduleType +{ + CHAIN_INDEPENDENT_PRIORITY, // not used here + CHAIN_AWARE_PRIORITY, + DEADLINE, + DEFAULT, // not used here +}; + +class PriorityExecutable +{ +public: + std::shared_ptr handle; + ExecutableType type; + bool can_be_run = true; + std::shared_ptr waitable; + ExecutableScheduleType sched_type; + + int priority; + long period = 1000; // milliseconds + + bool is_first_in_chain = false; + bool is_last_in_chain = false; + // chain aware deadlines + std::deque *deadlines = nullptr; + std::shared_ptr timer_handle; + // just used for logging + int chain_id = 0; + + // chain aware priority + int counter = 0; + + PriorityExecutable(std::shared_ptr h, int p, ExecutableType t, ExecutableScheduleType sched_type = CHAIN_INDEPENDENT_PRIORITY) + { + handle = h; + type = t; + if (sched_type == CHAIN_INDEPENDENT_PRIORITY || sched_type == CHAIN_AWARE_PRIORITY) + { + priority = p; + } + else if (sched_type == DEADLINE) + { + period = p; + } + this->sched_type = sched_type; + } + + void dont_run() + { + this->can_be_run = false; + } + + void allow_run() + { + if (this->can_be_run) + { + // release has been detected + } + else + { + this->can_be_run = true; + } + } + + PriorityExecutable() + { + handle = nullptr; + priority = 0; + type = SUBSCRIPTION; + } + + void increment_counter() + { + this->counter += 1; + } +}; + +class PriorityExecutableComparator +{ +public: + bool operator()(const PriorityExecutable *p1, const PriorityExecutable *p2) + { + if (p1 == nullptr || p2 == nullptr) + { + // TODO: realistic value + return 0; + } + if (p1->sched_type != p2->sched_type) + { + if (p1->sched_type == DEADLINE) + { + return false; + } + else if (p2->sched_type == DEADLINE) + { + return true; + } + if (p1->sched_type == CHAIN_INDEPENDENT_PRIORITY) + { + return false; + } + else if (p2->sched_type == CHAIN_INDEPENDENT_PRIORITY) + { + return true; + } + else if (p1->sched_type == CHAIN_AWARE_PRIORITY) + { + return false; + } + else + { + return true; + } + } + if (p1->sched_type == CHAIN_INDEPENDENT_PRIORITY) + { + // lower value runs first + return p1->priority > p2->priority; + } + if (p1->sched_type == CHAIN_AWARE_PRIORITY) + { + if (p1->priority != p2->priority) + { + return p1->priority > p2->priority; + } + // return p1->counter > p2->counter; + return 0; + } + if (p1->sched_type == DEADLINE) + { + // TODO: use the counter logic here as well + + uint p1_deadline = 0; + uint p2_deadline = 0; + if (p1->deadlines != nullptr && !p1->deadlines->empty()) + { + p1_deadline = p1->deadlines->front(); + } + if (p2->deadlines != nullptr && !p2->deadlines->empty()) + { + p2_deadline = p2->deadlines->front(); + } + if (p1_deadline == 0) + { + return true; + } + if (p2_deadline == 0) + { + return false; + } + if (p1_deadline == p2_deadline) + { + // these must be from the same chain + // settle the difference with the counter + return p1->counter > p2->counter; + } + return p1_deadline > p2_deadline; + } + else + { + std::cout << "invalid compare opration on priority_exec" << std::endl; + return 0; + } + } +}; +template > +class PriorityMemoryStrategy : public rclcpp::memory_strategy::MemoryStrategy +{ +public: + RCLCPP_SMART_PTR_DEFINITIONS(PriorityMemoryStrategy) + + node_time_logger logger; + bool is_f1tenth = false; + + using VoidAllocTraits = typename rclcpp::allocator::AllocRebind; + using VoidAlloc = typename VoidAllocTraits::allocator_type; + + explicit PriorityMemoryStrategy(std::shared_ptr allocator) + { + allocator_ = std::make_shared(*allocator.get()); + } + + PriorityMemoryStrategy() + { + allocator_ = std::make_shared(); + } + + void add_guard_condition(const rcl_guard_condition_t *guard_condition) override + { + for (const auto &existing_guard_condition : guard_conditions_) + { + if (existing_guard_condition == guard_condition) + { + return; + } + } + guard_conditions_.push_back(guard_condition); + } + + void remove_guard_condition(const rcl_guard_condition_t *guard_condition) override + { + for (auto it = guard_conditions_.begin(); it != guard_conditions_.end(); ++it) + { + if (*it == guard_condition) + { + guard_conditions_.erase(it); + break; + } + } + } + + void clear_handles() override + { + subscription_handles_.clear(); + service_handles_.clear(); + client_handles_.clear(); + timer_handles_.clear(); + waitable_handles_.clear(); + + // priority_queue doesn't have a clear function, so we swap it with an empty one. `empty` will go out of scope, and be cleared + // all_executables_ = std::priority_queue, PriorityExecutableComparator>(); + std::priority_queue, PriorityExecutableComparator> empty; + std::swap(all_executables_, empty); + if (!all_executables_.empty()) + { + std::cout << "couldn't clear all exec" << std::endl; + } + } + + void remove_null_handles(rcl_wait_set_t *wait_set) override + { + // TODO(jacobperron): Check if wait set sizes are what we expect them to be? + // e.g. wait_set->size_of_clients == client_handles_.size() + + // Important to use subscription_handles_.size() instead of wait set's size since + // there may be more subscriptions in the wait set due to Waitables added to the end. + // The same logic applies for other entities. + for (size_t i = 0; i < subscription_handles_.size(); ++i) + { + if (!wait_set->subscriptions[i]) + { + priority_map[subscription_handles_[i]].dont_run(); + subscription_handles_[i].reset(); + } + else + { + priority_map[subscription_handles_[i]].allow_run(); + } + } + for (size_t i = 0; i < service_handles_.size(); ++i) + { + if (!wait_set->services[i]) + { + priority_map[service_handles_[i]].dont_run(); + service_handles_[i].reset(); + } + else + { + priority_map[service_handles_[i]].allow_run(); + } + } + for (size_t i = 0; i < client_handles_.size(); ++i) + { + if (!wait_set->clients[i]) + { + priority_map[client_handles_[i]].dont_run(); + client_handles_[i].reset(); + } + else + { + priority_map[client_handles_[i]].allow_run(); + } + } + for (size_t i = 0; i < timer_handles_.size(); ++i) + { + if (!wait_set->timers[i]) + { + priority_map[timer_handles_[i]].dont_run(); + timer_handles_[i].reset(); + } + else + { + priority_map[timer_handles_[i]].allow_run(); + } + } + for (size_t i = 0; i < waitable_handles_.size(); ++i) + { + if (!waitable_handles_[i]->is_ready(wait_set)) + { + priority_map[waitable_handles_[i]].dont_run(); + waitable_handles_[i].reset(); + } + else + { + priority_map[waitable_handles_[i]].allow_run(); + } + } + + subscription_handles_.erase( + std::remove(subscription_handles_.begin(), subscription_handles_.end(), nullptr), + subscription_handles_.end()); + + service_handles_.erase( + std::remove(service_handles_.begin(), service_handles_.end(), nullptr), + service_handles_.end()); + + client_handles_.erase( + std::remove(client_handles_.begin(), client_handles_.end(), nullptr), + client_handles_.end()); + + timer_handles_.erase( + std::remove(timer_handles_.begin(), timer_handles_.end(), nullptr), + timer_handles_.end()); + + waitable_handles_.erase( + std::remove(waitable_handles_.begin(), waitable_handles_.end(), nullptr), + waitable_handles_.end()); + } + + bool collect_entities(const WeakNodeList &weak_nodes) override + { + bool has_invalid_weak_nodes = false; + for (auto &weak_node : weak_nodes) + { + auto node = weak_node.lock(); + if (!node) + { + has_invalid_weak_nodes = true; + continue; + } + for (auto &weak_group : node->get_callback_groups()) + { + auto group = weak_group.lock(); + if (!group || !group->can_be_taken_from().load()) + { + continue; + } + group->find_subscription_ptrs_if( + [this](const rclcpp::SubscriptionBase::SharedPtr &subscription) + { + auto subscription_handle = subscription->get_subscription_handle(); + subscription_handles_.push_back(subscription_handle); + all_executables_.push(get_and_reset_priority(subscription_handle, SUBSCRIPTION)); + + return false; + }); + group->find_service_ptrs_if( + [this](const rclcpp::ServiceBase::SharedPtr &service) + { + all_executables_.push(get_and_reset_priority(service->get_service_handle(), SERVICE)); + service_handles_.push_back(service->get_service_handle()); + return false; + }); + group->find_client_ptrs_if( + [this](const rclcpp::ClientBase::SharedPtr &client) + { + all_executables_.push(get_and_reset_priority(client->get_client_handle(), CLIENT)); + client_handles_.push_back(client->get_client_handle()); + return false; + }); + group->find_timer_ptrs_if( + [this](const rclcpp::TimerBase::SharedPtr &timer) + { + all_executables_.push(get_and_reset_priority(timer->get_timer_handle(), TIMER)); + timer_handles_.push_back(timer->get_timer_handle()); + return false; + }); + group->find_waitable_ptrs_if( + [this](const rclcpp::Waitable::SharedPtr &waitable) + { + all_executables_.push(get_and_reset_priority(waitable, WAITABLE)); + waitable_handles_.push_back(waitable); + return false; + }); + } + } + return has_invalid_weak_nodes; + } + + void add_waitable_handle(const rclcpp::Waitable::SharedPtr &waitable) override + { + if (nullptr == waitable) + { + throw std::runtime_error("waitable object unexpectedly nullptr"); + } + waitable_handles_.push_back(waitable); + } + + bool add_handles_to_wait_set(rcl_wait_set_t *wait_set) override + { + for (auto subscription : subscription_handles_) + { + if (rcl_wait_set_add_subscription(wait_set, subscription.get(), NULL) != RCL_RET_OK) + { + RCUTILS_LOG_ERROR_NAMED( + "rclcpp", + "Couldn't add subscription to wait set: %s", rcl_get_error_string().str); + return false; + } + } + + for (auto client : client_handles_) + { + if (rcl_wait_set_add_client(wait_set, client.get(), NULL) != RCL_RET_OK) + { + RCUTILS_LOG_ERROR_NAMED( + "rclcpp", + "Couldn't add client to wait set: %s", rcl_get_error_string().str); + return false; + } + } + + for (auto service : service_handles_) + { + if (rcl_wait_set_add_service(wait_set, service.get(), NULL) != RCL_RET_OK) + { + RCUTILS_LOG_ERROR_NAMED( + "rclcpp", + "Couldn't add service to wait set: %s", rcl_get_error_string().str); + return false; + } + } + + for (auto timer : timer_handles_) + { + if (rcl_wait_set_add_timer(wait_set, timer.get(), NULL) != RCL_RET_OK) + { + RCUTILS_LOG_ERROR_NAMED( + "rclcpp", + "Couldn't add timer to wait set: %s", rcl_get_error_string().str); + return false; + } + } + + for (auto guard_condition : guard_conditions_) + { + if (rcl_wait_set_add_guard_condition(wait_set, guard_condition, NULL) != RCL_RET_OK) + { + RCUTILS_LOG_ERROR_NAMED( + "rclcpp", + "Couldn't add guard_condition to wait set: %s", + rcl_get_error_string().str); + return false; + } + } + + for (auto waitable : waitable_handles_) + { + if (!waitable->add_to_wait_set(wait_set)) + { + RCUTILS_LOG_ERROR_NAMED( + "rclcpp", + "Couldn't add waitable to wait set: %s", rcl_get_error_string().str); + return false; + } + } + return true; + } + + void + get_next_executable( + rclcpp::AnyExecutable &any_exec, + const WeakNodeList &weak_nodes) + { + const PriorityExecutable *next_exec = nullptr; + while (!all_executables_.empty()) + { + next_exec = all_executables_.top(); + all_executables_.pop(); + if (!next_exec->can_be_run) + { + continue; + } + ExecutableType type = next_exec->type; + switch (type) + { + case SUBSCRIPTION: + { + std::shared_ptr subs_handle = std::static_pointer_cast(next_exec->handle); + auto subscription = get_subscription_by_handle(subs_handle, weak_nodes); + if (subscription) + { + auto group = get_group_by_subscription(subscription, weak_nodes); + if (!group) + { + // Group was not found, meaning the waitable is not valid... + // Remove it from the ready list and continue looking + // it = subscription_handles_.erase(it); + continue; + } + if (!group->can_be_taken_from().load()) + { + // Group is mutually exclusive and is being used, so skip it for now + // Leave it to be checked next time, but continue searching + // ++it; + continue; + } + any_exec.callback_group = group; + any_exec.subscription = subscription; + any_exec.node_base = get_node_by_group(group, weak_nodes); + // std::cout << "Using new priority sub " << subscription->get_topic_name() << std::endl; + } + } + break; + case SERVICE: + { + std::shared_ptr service_handle = std::static_pointer_cast(next_exec->handle); + auto service = get_service_by_handle(service_handle, weak_nodes); + if (service) + { + auto group = get_group_by_service(service, weak_nodes); + if (!group) + { + // Group was not found, meaning the waitable is not valid... + // Remove it from the ready list and continue looking + // it = subscription_handles_.erase(it); + continue; + } + if (!group->can_be_taken_from().load()) + { + // Group is mutually exclusive and is being used, so skip it for now + // Leave it to be checked next time, but continue searching + // ++it; + continue; + } + any_exec.callback_group = group; + any_exec.service = service; + any_exec.node_base = get_node_by_group(group, weak_nodes); + // std::cout << "Using new priority service " << service->get_service_name() << std::endl; + } + } + break; + case CLIENT: + { + std::shared_ptr client_handle = std::static_pointer_cast(next_exec->handle); + auto client = get_client_by_handle(client_handle, weak_nodes); + if (client) + { + auto group = get_group_by_client(client, weak_nodes); + if (!group) + { + // Group was not found, meaning the waitable is not valid... + // Remove it from the ready list and continue looking + // it = subscription_handles_.erase(it); + continue; + } + if (!group->can_be_taken_from().load()) + { + // Group is mutually exclusive and is being used, so skip it for now + // Leave it to be checked next time, but continue searching + // ++it; + continue; + } + any_exec.callback_group = group; + any_exec.client = client; + any_exec.node_base = get_node_by_group(group, weak_nodes); + // std::cout << "Using new priority client " << client->get_service_name() << std::endl; + } + } + break; + case TIMER: + { + std::shared_ptr timer_handle = std::static_pointer_cast(next_exec->handle); + auto timer = get_timer_by_handle(timer_handle, weak_nodes); + if (timer) + { + auto group = get_group_by_timer(timer, weak_nodes); + if (!group) + { + // Group was not found, meaning the waitable is not valid... + // Remove it from the ready list and continue looking + // it = subscription_handles_.erase(it); + continue; + } + if (!group->can_be_taken_from().load()) + { + // Group is mutually exclusive and is being used, so skip it for now + // Leave it to be checked next time, but continue searching + // ++it; + continue; + } + any_exec.callback_group = group; + any_exec.timer = timer; + any_exec.node_base = get_node_by_group(group, weak_nodes); + } + } + break; + case WAITABLE: + { + std::shared_ptr waitable_handle = std::static_pointer_cast(next_exec->waitable); + auto waitable = waitable_handle; + if (waitable) + { + auto group = get_group_by_waitable(waitable, weak_nodes); + if (!group) + { + // Group was not found, meaning the waitable is not valid... + // Remove it from the ready list and continue looking + // it = subscription_handles_.erase(it); + continue; + } + if (!group->can_be_taken_from().load()) + { + // Group is mutually exclusive and is being used, so skip it for now + // Leave it to be checked next time, but continue searching + // ++it; + continue; + } + any_exec.callback_group = group; + any_exec.waitable = waitable; + any_exec.node_base = get_node_by_group(group, weak_nodes); + // std::cout << "Using new priority waitable" << std::endl; + } + } + break; + default: + // std::cout << "Unknown type from priority!!!" << std::endl; + break; + } + // callback is about to be released + if (next_exec->is_first_in_chain && next_exec->sched_type != DEADLINE) + { + timespec current_time; + clock_gettime(CLOCK_MONOTONIC_RAW, ¤t_time); + uint64_t millis = (current_time.tv_sec * (uint64_t)1000) + (current_time.tv_nsec / 1000000); + + auto timer = next_exec->timer_handle; + int64_t time_until_next_call = timer->time_until_trigger().count() / 1000000; + // std::cout << "end of chain. time until trigger: " << std::to_string(time_until_next_call) << std::endl; + log_entry(logger, "timer_" + std::to_string(next_exec->chain_id) + "_release_" + std::to_string(millis + time_until_next_call)); + if (next_exec->chain_id == 0 && is_f1tenth) + { + log_entry(logger, "timer_" + std::to_string(next_exec->chain_id + 1) + "_release_" + std::to_string(millis + time_until_next_call)); + } + } + if (next_exec->is_first_in_chain && next_exec->sched_type == DEADLINE) + { + if (next_exec->timer_handle == nullptr) + { + std::cout << "tried to use a chain without a timer handle!!!" << std::endl; + } + timespec current_time; + clock_gettime(CLOCK_MONOTONIC_RAW, ¤t_time); + uint64_t millis = (current_time.tv_sec * (uint64_t)1000) + (current_time.tv_nsec / 1000000); + auto timer = next_exec->timer_handle; + if (timer == nullptr) + { + std::cout << "somehow, this timer handle didn't have an associated timer" << std::endl; + } + int64_t time_until_next_call = timer->time_until_trigger().count() / 1000000; + // std::cout << "end of chain. time until trigger: " << std::to_string(time_until_next_call) << std::endl; + log_entry(logger, "timer_" + std::to_string(next_exec->chain_id) + "_release_" + std::to_string(millis + time_until_next_call)); + uint64_t next_deadline = millis + time_until_next_call + next_exec->period; + next_exec->deadlines->push_back(next_deadline); + log_entry(logger, "deadline_" + std::to_string(next_exec->chain_id) + "_" + std::to_string(next_deadline)); + // std::cout << "deadline set" << std::endl; + } + if (next_exec->is_last_in_chain && next_exec->sched_type == DEADLINE) + { + if (!next_exec->deadlines->empty() && (!is_f1tenth || next_exec->chain_id != 0)) + next_exec->deadlines->pop_front(); + } + if (next_exec->sched_type == CHAIN_AWARE_PRIORITY || next_exec->sched_type == DEADLINE) + { + // this is safe, since we popped it earlier + // get a mutable reference + // TODO: find a cleaner way to do this + PriorityExecutable *mut_executable = get_priority_settings(next_exec->handle); + // std::cout << "running chain aware cb" << std::endl; + mut_executable->increment_counter(); + } + return; + } + } + + void + get_next_subscription( + rclcpp::AnyExecutable &any_exec, + const WeakNodeList &weak_nodes) override + { + auto it = subscription_handles_.begin(); + while (it != subscription_handles_.end()) + { + auto subscription = get_subscription_by_handle(*it, weak_nodes); + if (subscription) + { + // Find the group for this handle and see if it can be serviced + auto group = get_group_by_subscription(subscription, weak_nodes); + if (!group) + { + // Group was not found, meaning the subscription is not valid... + // Remove it from the ready list and continue looking + it = subscription_handles_.erase(it); + continue; + } + if (!group->can_be_taken_from().load()) + { + // Group is mutually exclusive and is being used, so skip it for now + // Leave it to be checked next time, but continue searching + ++it; + continue; + } + // Otherwise it is safe to set and return the any_exec + any_exec.subscription = subscription; + any_exec.callback_group = group; + any_exec.node_base = get_node_by_group(group, weak_nodes); + subscription_handles_.erase(it); + return; + } + // Else, the subscription is no longer valid, remove it and continue + it = subscription_handles_.erase(it); + } + } + + void + get_next_service( + rclcpp::AnyExecutable &any_exec, + const WeakNodeList &weak_nodes) override + { + auto it = service_handles_.begin(); + while (it != service_handles_.end()) + { + auto service = get_service_by_handle(*it, weak_nodes); + if (service) + { + // Find the group for this handle and see if it can be serviced + auto group = get_group_by_service(service, weak_nodes); + if (!group) + { + // Group was not found, meaning the service is not valid... + // Remove it from the ready list and continue looking + it = service_handles_.erase(it); + continue; + } + if (!group->can_be_taken_from().load()) + { + // Group is mutually exclusive and is being used, so skip it for now + // Leave it to be checked next time, but continue searching + ++it; + continue; + } + // Otherwise it is safe to set and return the any_exec + any_exec.service = service; + any_exec.callback_group = group; + any_exec.node_base = get_node_by_group(group, weak_nodes); + service_handles_.erase(it); + return; + } + // Else, the service is no longer valid, remove it and continue + it = service_handles_.erase(it); + } + } + + void + get_next_client(rclcpp::AnyExecutable &any_exec, const WeakNodeList &weak_nodes) override + { + auto it = client_handles_.begin(); + while (it != client_handles_.end()) + { + auto client = get_client_by_handle(*it, weak_nodes); + if (client) + { + // Find the group for this handle and see if it can be serviced + auto group = get_group_by_client(client, weak_nodes); + if (!group) + { + // Group was not found, meaning the service is not valid... + // Remove it from the ready list and continue looking + it = client_handles_.erase(it); + continue; + } + if (!group->can_be_taken_from().load()) + { + // Group is mutually exclusive and is being used, so skip it for now + // Leave it to be checked next time, but continue searching + ++it; + continue; + } + // Otherwise it is safe to set and return the any_exec + any_exec.client = client; + any_exec.callback_group = group; + any_exec.node_base = get_node_by_group(group, weak_nodes); + client_handles_.erase(it); + return; + } + // Else, the service is no longer valid, remove it and continue + it = client_handles_.erase(it); + } + } + + void + get_next_timer( + rclcpp::AnyExecutable &any_exec, + const WeakNodeList &weak_nodes) override + { + auto it = timer_handles_.begin(); + while (it != timer_handles_.end()) + { + auto timer = get_timer_by_handle(*it, weak_nodes); + if (timer) + { + // Find the group for this handle and see if it can be serviced + auto group = get_group_by_timer(timer, weak_nodes); + if (!group) + { + // Group was not found, meaning the timer is not valid... + // Remove it from the ready list and continue looking + it = timer_handles_.erase(it); + continue; + } + if (!group->can_be_taken_from().load()) + { + // Group is mutually exclusive and is being used, so skip it for now + // Leave it to be checked next time, but continue searching + ++it; + continue; + } + // Otherwise it is safe to set and return the any_exec + any_exec.timer = timer; + any_exec.callback_group = group; + any_exec.node_base = get_node_by_group(group, weak_nodes); + timer_handles_.erase(it); + return; + } + // Else, the service is no longer valid, remove it and continue + it = timer_handles_.erase(it); + } + } + + void + get_next_waitable(rclcpp::AnyExecutable &any_exec, const WeakNodeList &weak_nodes) override + { + auto it = waitable_handles_.begin(); + while (it != waitable_handles_.end()) + { + auto waitable = *it; + if (waitable) + { + // Find the group for this handle and see if it can be serviced + auto group = get_group_by_waitable(waitable, weak_nodes); + if (!group) + { + // Group was not found, meaning the waitable is not valid... + // Remove it from the ready list and continue looking + it = waitable_handles_.erase(it); + continue; + } + if (!group->can_be_taken_from().load()) + { + // Group is mutually exclusive and is being used, so skip it for now + // Leave it to be checked next time, but continue searching + ++it; + continue; + } + // Otherwise it is safe to set and return the any_exec + any_exec.waitable = waitable; + any_exec.callback_group = group; + any_exec.node_base = get_node_by_group(group, weak_nodes); + waitable_handles_.erase(it); + return; + } + // Else, the waitable is no longer valid, remove it and continue + it = waitable_handles_.erase(it); + } + } + + rcl_allocator_t get_allocator() override + { + return rclcpp::allocator::get_rcl_allocator(*allocator_.get()); + } + + size_t number_of_ready_subscriptions() const override + { + size_t number_of_subscriptions = subscription_handles_.size(); + // std::cout << "ready_raw: " << number_of_subscriptions << std::endl; + for (auto waitable : waitable_handles_) + { + number_of_subscriptions += waitable->get_number_of_ready_subscriptions(); + } + return number_of_subscriptions; + } + + size_t number_of_ready_services() const override + { + size_t number_of_services = service_handles_.size(); + for (auto waitable : waitable_handles_) + { + number_of_services += waitable->get_number_of_ready_services(); + } + return number_of_services; + } + + size_t number_of_ready_events() const override + { + size_t number_of_events = 0; + for (auto waitable : waitable_handles_) + { + number_of_events += waitable->get_number_of_ready_events(); + } + return number_of_events; + } + + size_t number_of_ready_clients() const override + { + size_t number_of_clients = client_handles_.size(); + for (auto waitable : waitable_handles_) + { + number_of_clients += waitable->get_number_of_ready_clients(); + } + return number_of_clients; + } + + size_t number_of_guard_conditions() const override + { + size_t number_of_guard_conditions = guard_conditions_.size(); + for (auto waitable : waitable_handles_) + { + number_of_guard_conditions += waitable->get_number_of_ready_guard_conditions(); + } + return number_of_guard_conditions; + } + + size_t number_of_ready_timers() const override + { + size_t number_of_timers = timer_handles_.size(); + for (auto waitable : waitable_handles_) + { + number_of_timers += waitable->get_number_of_ready_timers(); + } + return number_of_timers; + } + + size_t number_of_waitables() const override + { + return waitable_handles_.size(); + } + + void set_executable_priority(std::shared_ptr handle, int priority, ExecutableType t) + { + // TODO: any sanity checks should go here + // priority_map.insert(executable, priority); + priority_map[handle] = PriorityExecutable(handle, priority, t); + } + void set_executable_priority(std::shared_ptr handle, int priority, ExecutableType t, ExecutableScheduleType sc, int chain_index) + { + // TODO: any sanity checks should go here + // priority_map.insert(executable, priority); + priority_map[handle] = PriorityExecutable(handle, priority, t, sc); + priority_map[handle].chain_id = chain_index; + } + + void set_executable_deadline(std::shared_ptr handle, int period, ExecutableType t, int chain_id) + { + // TODO: any sanity checks should go here + // priority_map.insert(executable, priority); + priority_map[handle] = PriorityExecutable(handle, period, t, DEADLINE); + priority_map[handle].chain_id = chain_id; + } + + int get_priority(std::shared_ptr executable) + { + auto search = priority_map.find(executable); + if (search != priority_map.end()) + { + return search->second.priority; + } + else + { + return 0; + } + } + + PriorityExecutable *get_priority_settings(std::shared_ptr executable) + { + auto search = priority_map.find(executable); + if (search != priority_map.end()) + { + return &(search->second); + } + else + { + return nullptr; + } + } + + void set_first_in_chain(std::shared_ptr exec_handle) + { + PriorityExecutable *settings = get_priority_settings(exec_handle); + settings->is_first_in_chain = true; + } + + void set_last_in_chain(std::shared_ptr exec_handle) + { + PriorityExecutable *settings = get_priority_settings(exec_handle); + settings->is_last_in_chain = true; + } + + void assign_deadlines_queue(std::shared_ptr exec_handle, std::deque *deadlines) + { + PriorityExecutable *settings = get_priority_settings(exec_handle); + settings->deadlines = deadlines; + } + +private: + PriorityExecutable *get_and_reset_priority(std::shared_ptr executable, ExecutableType t) + { + PriorityExecutable *p = get_priority_settings(executable); + if (p == nullptr) + { + priority_map[executable] = PriorityExecutable(executable, 0, t); + p = &(priority_map[executable]); + } + // p->can_be_run = true; + return p; + } + + template + using VectorRebind = + std::vector::template rebind_alloc>; + + VectorRebind guard_conditions_; + + VectorRebind> subscription_handles_; + VectorRebind> service_handles_; + VectorRebind> client_handles_; + VectorRebind> timer_handles_; + VectorRebind> waitable_handles_; + + std::shared_ptr allocator_; + + // TODO: evaluate using node/subscription namespaced strings as keys + + // holds *all* handle->priority mappings + std::unordered_map, PriorityExecutable> priority_map; + + // hold *only valid* executable+priorities + std::priority_queue, PriorityExecutableComparator> all_executables_; +}; + +#endif // RCLCPP__STRATEGIES__ALLOCATOR_MEMORY_STRATEGY_HPP_ diff --git a/src/priority_executor/include/priority_executor/test_nodes.hpp b/src/priority_executor/include/priority_executor/test_nodes.hpp new file mode 100644 index 0000000..84c3fa8 --- /dev/null +++ b/src/priority_executor/include/priority_executor/test_nodes.hpp @@ -0,0 +1,39 @@ +#include "rclcpp/rclcpp.hpp" +#include "simple_timer/rt-sched.hpp" +#include "std_msgs/msg/string.hpp" + +using namespace std::chrono_literals; +using std::placeholders::_1; + +class PublisherNode : public rclcpp::Node +{ +public: + PublisherNode(std::string publish_topic, int chain, int period, double runtime); + + rclcpp::TimerBase::SharedPtr timer_; + uint count_max = 20; + node_time_logger logger_; + +private: + rclcpp::Publisher::SharedPtr publisher_; + uint count_; + int chain; + double runtime; + int period; +}; +class DummyWorker : public rclcpp::Node +{ +public: + DummyWorker(const std::string &name, double runtime, int chain, int number, bool is_multichain = false); + node_time_logger logger_; + +private: + double runtime; + int number; + int chain; + rclcpp::Publisher::SharedPtr publisher_; + void topic_callback(const std_msgs::msg::String::SharedPtr msg) const; + +public: + rclcpp::Subscription::SharedPtr subscription_; +}; \ No newline at end of file diff --git a/src/priority_executor/package.xml b/src/priority_executor/package.xml new file mode 100644 index 0000000..15635ac --- /dev/null +++ b/src/priority_executor/package.xml @@ -0,0 +1,24 @@ + + + + priority_executor + 0.0.0 + TODO: Package description + kurt + TODO: License declaration + + ament_cmake + + rclcpp + rcl + std_msgs + std_srvs + simple_timer + + ament_lint_auto + ament_lint_common + + + ament_cmake + + diff --git a/src/priority_executor/src/default_executor.cpp b/src/priority_executor/src/default_executor.cpp new file mode 100644 index 0000000..4caaa22 --- /dev/null +++ b/src/priority_executor/src/default_executor.cpp @@ -0,0 +1,66 @@ +// Copyright 2015 Open Source Robotics Foundation, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "rcpputils/scope_exit.hpp" + +#include "rclcpp/any_executable.hpp" +#include "priority_executor/priority_memory_strategy.hpp" +#include "priority_executor/default_executor.hpp" +#include "priority_executor/primes_workload.hpp" + +ROSDefaultExecutor::ROSDefaultExecutor(const rclcpp::ExecutorOptions &options) + : rclcpp::Executor(options) {} + +ROSDefaultExecutor::~ROSDefaultExecutor() {} + +bool ROSDefaultExecutor::get_next_executable(rclcpp::AnyExecutable &any_executable, std::chrono::nanoseconds timeout) +{ + bool success = false; + // Check to see if there are any subscriptions or timers needing service + // TODO(wjwwood): improve run to run efficiency of this function + // sched_yield(); + wait_for_work(std::chrono::milliseconds(1)); + success = get_next_ready_executable(any_executable); + return success; +} + +void ROSDefaultExecutor::spin() +{ + if (spinning.exchange(true)) + { + throw std::runtime_error("spin() called while already spinning"); + } + RCPPUTILS_SCOPE_EXIT(this->spinning.store(false);); + while (rclcpp::ok(this->context_) && spinning.load()) + { + rclcpp::AnyExecutable any_executable; + if (get_next_executable(any_executable)) + { + if (any_executable.timer) + { + if (priority_map.find(any_executable.timer->get_timer_handle()) != priority_map.end()) + { + timespec current_time; + clock_gettime(CLOCK_MONOTONIC_RAW, ¤t_time); + uint64_t millis = (current_time.tv_sec * (uint64_t)1000) + (current_time.tv_nsec / 1000000); + PriorityExecutable next_exec = priority_map[any_executable.timer->get_timer_handle()]; + + auto timer = next_exec.timer_handle; + log_entry(logger, "timer_" + std::to_string(next_exec.chain_id) + "_release_" + std::to_string(millis + (timer->time_until_trigger().count() / 1000000))); + } + } + execute_any_executable(any_executable); + } + } +} diff --git a/src/priority_executor/src/f1tenth_test.cpp b/src/priority_executor/src/f1tenth_test.cpp new file mode 100644 index 0000000..9d32d43 --- /dev/null +++ b/src/priority_executor/src/f1tenth_test.cpp @@ -0,0 +1,330 @@ +#include "rclcpp/rclcpp.hpp" +#include "simple_timer/rt-sched.hpp" +#include "priority_executor/priority_memory_strategy.hpp" +#include "priority_executor/priority_executor.hpp" +#include "priority_executor/test_nodes.hpp" +#include "priority_executor/default_executor.hpp" +#include +#include +#include + +typedef struct +{ + std::shared_ptr executor; + std::shared_ptr> strat; + + std::shared_ptr default_executor; +} executor_strat; + +void spin_exec(executor_strat strat, int id, int index) +{ + cpu_set_t cpuset; + CPU_ZERO(&cpuset); + CPU_SET(id, &cpuset); + + pthread_t current_thread = pthread_self(); + int result; + if (result = pthread_setaffinity_np(current_thread, sizeof(cpu_set_t), &cpuset)) + { + std::cout << "problem setting cpu core" << std::endl; + std::cout << strerror(result) << std::endl; + } + sched_param sch_params; + + // experiment: RT threads have priority 99, all others 98 + if (index < 4) + { + sch_params.sched_priority = 99; + } + else + { + sch_params.sched_priority = 98; + } + // sch_params.sched_priority = 99 - index; + if (pthread_setschedparam(pthread_self(), SCHED_FIFO, &sch_params)) + { + RCLCPP_INFO(rclcpp::get_logger("rclcpp"), "spin_rt thread has an error."); + } + if (strat.executor != nullptr) + { + strat.executor->spin(); + } + else if (strat.default_executor != nullptr) + { + strat.default_executor->spin(); + } + else + { + std::cout << "spin_exec got a executor_strat with null values!" << std::endl; + } +} + +int main(int argc, char **argv) +{ + // read parameters + rclcpp::init(argc, argv); + std::cout << "starting..." << std::endl; + auto node = rclcpp::Node::make_shared("experiment_parameters"); + node->declare_parameter("experiment_name"); + node->declare_parameter("count_max"); + node->declare_parameter("schedule_type"); + + auto parameters_client = std::make_shared(node); + // parameters_client->wait_for_service(); + const std::string schedule_type_str = parameters_client->get_parameter("schedule_type", std::string("deadline")); + std::cout << schedule_type_str << std::endl; + int COUNT_MAX = parameters_client->get_parameter("count_max", 500); + ExecutableScheduleType schedule_type = DEFAULT; + if (schedule_type_str == "deadline") + { + schedule_type = DEADLINE; + } + else if (schedule_type_str == "chain_priority") + { + schedule_type = CHAIN_AWARE_PRIORITY; + } + else + { + schedule_type = DEFAULT; + } + + // create executors + std::vector executors; + const int NUM_EXECUTORS = 8; + std::cout << "creating executors" << std::endl; + for (int i = 0; i < NUM_EXECUTORS; i++) + { + executor_strat executor; + if (schedule_type == DEFAULT) + { + executor.default_executor = std::make_shared(); + executor.default_executor->logger = create_logger(); + } + else + { + + executor.strat = std::make_shared>(); + rclcpp::ExecutorOptions options; + options.memory_strategy = executor.strat; + executor.strat->logger = create_logger(); + executor.strat->is_f1tenth = true; + + executor.executor = std::make_shared(options); + executor.executor->set_use_priorities(true); + } + executors.push_back(executor); + } + std::cout << "executors created" << std::endl; + + std::vector chain_lengths = {2, 4, 4, 3, 4, 2, 2, 2, 2, 2, 2, 2}; + std::vector> chain_member_ids = {{1, 2}, {1, 3, 4, 5}, {6, 7, 8, 9}, {10, 11, 12}, {13, 14, 15, 16}, {17, 18}, {19, 20}, {21, 22}, {23, 24}, {25, 26}, {27, 28}, {29, 30}}; + std::vector> chain_priorities = {{1, 0}, {5, 4, 3, 2, 1}, {9, 8, 7, 6}, {12, 11, 10}, {16, 15, 14, 13}, {18, 17}, {20, 19}, {22, 21}, {24, 23}, {26, 25}, {28, 27}, {30, 29}}; + // assignments for ROS and EDF + std::vector> node_executor_assignment = {{0, 0}, {0, 1, 1, 1}, {2, 2, 2, 2}, {3, 3, 3}, {0, 0, 0, 0}, {1, 1}, {4, 4}, {5, 5}, {6, 6}, {7, 7}, {4, 4}, {5, 5}}; + std::vector executor_cpu_assignment = {0, 1, 2, 3, 0, 1, 2, 3}; + std::vector node_runtimes = {2.3, 16.1, 2.3, 2.2, 18.4, 9.1, 23.1, 7.9, 14.2, 17.9, 20.6, 17.9, 6.6, 1.7, 11.0, 6.6, 7.9, 1.7, 195.9, 33.2, 2.2, 33.2, 2.2, 33.2, 2.2, 33.2, 2.2, 33.2, 2.2, 33.2, 2.2}; + std::cout << std::to_string(node_runtimes.size()) << std::endl; + std::vector chain_periods = {80, 80, 100, 100, 160, 1000, 120, 120, 120, 120, 120, 120}; + std::vector chain_deadlines = {80, 80, 100, 100, 160, 1000, 120, 120, 120, 120, 120, 120}; + + std::vector>> nodes; + std::vector> publishers; + std::vector> workers; + std::vector *> chain_deadlines_deque; + // create nodes and assign to executors + uint64_t current_node_id = 0; + for (uint chain_index = 0; chain_index < chain_lengths.size(); chain_index++) + { + std::cout << "making chain " << std::to_string(chain_index) << std::endl; + std::shared_ptr this_chain_timer_handle; + std::deque *this_chain_deadlines_deque = new std::deque(); + nodes.push_back(std::vector>()); + for (uint cb_index = 0; cb_index < chain_lengths[chain_index]; cb_index++) + { + std::cout << "making node " << std::to_string(current_node_id) << " with runtime " << node_runtimes[current_node_id] << std::endl; + + executor_strat this_executor = executors[node_executor_assignment[chain_index][cb_index] % NUM_EXECUTORS]; + if (cb_index == 0) + { + // this should be a timer + std::shared_ptr publisher_node; + if (chain_index == 1) + { + // special case, re-use timer from index 0 + publisher_node = std::static_pointer_cast(nodes[0][0]); + this_chain_timer_handle = publisher_node->timer_; + // current_node_id--; + } + else + { + publisher_node = std::make_shared("topic_" + std::to_string(chain_index), chain_index, chain_periods[chain_index], node_runtimes[current_node_id]); + publishers.push_back(publisher_node); + publisher_node->count_max = COUNT_MAX; + if (schedule_type == DEADLINE) + { + assert(this_executor.strat != nullptr); + auto timer_handle = publisher_node->timer_->get_timer_handle(); + assert(timer_handle != nullptr); + this_executor.strat->set_executable_deadline(publisher_node->timer_->get_timer_handle(), chain_deadlines[chain_index], TIMER, chain_index); + } + else if (schedule_type == CHAIN_AWARE_PRIORITY) + { + this_executor.strat->set_executable_priority(publisher_node->timer_->get_timer_handle(), chain_priorities[chain_index][cb_index], TIMER, CHAIN_AWARE_PRIORITY, chain_index); + } + + if (schedule_type == DEFAULT) + { + this_executor.default_executor->add_node(publisher_node); + PriorityExecutable e; + e.chain_id = chain_index; + e.timer_handle = publisher_node->timer_; + this_executor.default_executor->priority_map[publisher_node->timer_->get_timer_handle()] = e; + } + else + { + this_executor.strat->set_first_in_chain(publisher_node->timer_->get_timer_handle()); + this_executor.strat->assign_deadlines_queue(publisher_node->timer_->get_timer_handle(), this_chain_deadlines_deque); + this_chain_timer_handle = publisher_node->timer_; + this_executor.strat->get_priority_settings(publisher_node->timer_->get_timer_handle())->timer_handle = this_chain_timer_handle; + this_executor.executor->add_node(publisher_node); + } + } + nodes[chain_index].push_back(std::static_pointer_cast(publisher_node)); + } + else + { + // this is a worker node + std::shared_ptr sub_node; + + if (chain_index == 1 && cb_index == 1) + { + sub_node = std::make_shared("chain_" + std::to_string(chain_index) + "_worker_" + std::to_string(cb_index), node_runtimes[current_node_id], chain_index, cb_index, true); + } + else + { + sub_node = std::make_shared("chain_" + std::to_string(chain_index) + "_worker_" + std::to_string(cb_index), node_runtimes[current_node_id], chain_index, cb_index); + } + workers.push_back(sub_node); + if (schedule_type == DEADLINE) + { + this_executor.strat->set_executable_deadline(sub_node->subscription_->get_subscription_handle(), chain_deadlines[chain_index], SUBSCRIPTION, chain_index); + } + else if (schedule_type == CHAIN_AWARE_PRIORITY) + { + this_executor.strat->set_executable_priority(sub_node->subscription_->get_subscription_handle(), chain_priorities[chain_index][cb_index], SUBSCRIPTION, CHAIN_AWARE_PRIORITY, chain_index); + } + + if (schedule_type == DEFAULT) + { + this_executor.default_executor->add_node(sub_node); + } + else + { + this_executor.executor->add_node(sub_node); + this_executor.strat->assign_deadlines_queue(sub_node->subscription_->get_subscription_handle(), this_chain_deadlines_deque); + if (cb_index == chain_lengths[chain_index] - 1) + { + this_executor.strat->set_last_in_chain(sub_node->subscription_->get_subscription_handle()); + this_executor.strat->get_priority_settings(sub_node->subscription_->get_subscription_handle())->timer_handle = this_chain_timer_handle; + } + } + nodes[chain_index].push_back(std::static_pointer_cast(sub_node)); + } + current_node_id++; + } + chain_deadlines_deque.push_back(this_chain_deadlines_deque); + } + std::cout << "initialized nodes" << std::endl; + node_time_logger logger = create_logger(); + timespec current_time; + clock_gettime(CLOCK_MONOTONIC_RAW, ¤t_time); + uint64_t millis = (current_time.tv_sec * (uint64_t)1000) + (current_time.tv_nsec / 1000000); + for (uint chain_index = 0; chain_index < chain_lengths.size(); chain_index++) + { + if (schedule_type == DEADLINE) + { + + log_entry(logger, "deadline_" + std::to_string(chain_index) + "_" + std::to_string(millis + chain_deadlines[chain_index])); + } + log_entry(logger, "timer_" + std::to_string(chain_index) + "_release_" + std::to_string(millis)); + chain_deadlines_deque[chain_index]->push_back(millis + chain_deadlines[chain_index]); + // chain_deadlines_deque[chain_index]->push_back(0); + } + + std::vector threads; + // start each executor on it's own thread + for (int i = 0; i < NUM_EXECUTORS; i++) + { + executor_strat strat = executors[i]; + auto func = std::bind(spin_exec, strat, executor_cpu_assignment[i], i); + threads.emplace_back(func); + } + for (auto &thread : threads) + { + thread.join(); + } + rclcpp::shutdown(); + + std::ofstream output_file; + std::string suffix = "_rtis_alloc"; + if (schedule_type == DEADLINE) + { + output_file.open("experiments/results/f1tenth_full" + std::to_string(NUM_EXECUTORS) + "c" + suffix + ".txt"); + } + else if (schedule_type == CHAIN_AWARE_PRIORITY) + { + output_file.open("experiments/results/f1tenth_full_chain" + std::to_string(NUM_EXECUTORS) + "c" + suffix + ".txt"); + } + else + { + output_file.open("experiments/results/f1tenth_default" + std::to_string(NUM_EXECUTORS) + "c" + suffix + ".txt"); + } + + std::vector> combined_logs; + for (auto &publisher : publishers) + { + for (auto &log : *(publisher->logger_.recorded_times)) + { + combined_logs.push_back(log); + } + } + + for (auto &worker : workers) + { + for (auto &log : *(worker->logger_.recorded_times)) + { + combined_logs.push_back(log); + } + } + + if (schedule_type == DEFAULT) + { + for (auto &executor : executors) + { + for (auto &log : *(executor.default_executor->logger.recorded_times)) + { + combined_logs.push_back(log); + } + } + } + else + { + for (auto &executor : executors) + { + for (auto &log : *(executor.strat->logger.recorded_times)) + { + combined_logs.push_back(log); + } + } + } + + std::sort(combined_logs.begin(), combined_logs.end(), [](const std::pair &a, const std::pair &b) + { return a.second < b.second; }); + + for (auto p : combined_logs) + { + output_file << p.second << " " << p.first << std::endl; + } + output_file.close(); + std::cout<<"data written"< millis) + { + break; + } + for (j = 2; j < i; j++) + { + sum += j; + } + } + return get_thread_time(&currTime) - start_cpu_time; +} +ktimeunit get_thread_time(struct timespec *currTime) +{ + clockid_t threadClockId; + pthread_getcpuclockid(pthread_self(), &threadClockId); + clock_gettime(threadClockId, currTime); + return currTime->tv_nsec / 1000000.0 + currTime->tv_sec * 1000.0; +} diff --git a/src/priority_executor/src/priority_executor.cpp b/src/priority_executor/src/priority_executor.cpp new file mode 100644 index 0000000..a4d0973 --- /dev/null +++ b/src/priority_executor/src/priority_executor.cpp @@ -0,0 +1,339 @@ +// Copyright 2015 Open Source Robotics Foundation, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "priority_executor/priority_executor.hpp" +#include "priority_executor/priority_memory_strategy.hpp" +#include "rclcpp/any_executable.hpp" +#include "rclcpp/scope_exit.hpp" +#include "simple_timer/rt-sched.hpp" +#include "rclcpp/utilities.hpp" +#include "rclcpp/exceptions.hpp" +#include +#include +namespace timed_executor +{ + + TimedExecutor::TimedExecutor(const rclcpp::ExecutorOptions &options, std::string name) + : rclcpp::Executor(options) + { + this->name = name; + } + + TimedExecutor::~TimedExecutor() {} + + void + TimedExecutor::spin() + { + if (spinning.exchange(true)) + { + throw std::runtime_error("spin() called while already spinning"); + } + RCLCPP_SCOPE_EXIT(this->spinning.store(false);); + while (rclcpp::ok(this->context_) && spinning.load()) + { + rclcpp::AnyExecutable any_executable; + // std::cout<number_of_ready_timers()<name << std::endl; + // size_t ready = memory_strategy_->number_of_ready_subscriptions(); + // std::cout << "ready:" << ready << std::endl; + + if (get_next_executable(any_executable)) + { + if (any_executable.subscription) + { + execute_subscription(any_executable); + } + else + { + execute_any_executable(any_executable); + } + } + } + std::cout << "shutdown" << std::endl; + } + + unsigned long long TimedExecutor::get_max_runtime(void) + { + return maxRuntime; + } + + void + TimedExecutor::execute_subscription(rclcpp::AnyExecutable executable) + { + rclcpp::SubscriptionBase::SharedPtr subscription = executable.subscription; + + rclcpp::MessageInfo message_info; + message_info.get_rmw_message_info().from_intra_process = false; + + if (subscription->is_serialized()) + { + // This is the case where a copy of the serialized message is taken from + // the middleware via inter-process communication. + + // if this should happen on another thread, we'd pass it to a thread here + std::shared_ptr serialized_msg = subscription->create_serialized_message(); + take_and_do_error_handling( + "taking a serialized message from topic", + subscription->get_topic_name(), + [&]() + { + auto result = subscription->take_serialized(*serialized_msg.get(), message_info); + // RCLCPP_INFO(rclcpp::get_logger(this->name), "at topic %s, serialized msg sent at %ld, and recieved at %ld", executable.node_base->get_name(), message_info.get_rmw_message_info().source_timestamp, message_info.get_rmw_message_info().received_timestamp); + return result; + }, + [&]() + { + auto void_serialized_msg = std::static_pointer_cast(serialized_msg); + subscription->handle_message(void_serialized_msg, message_info); + }); + subscription->return_serialized_message(serialized_msg); + } + else if (subscription->can_loan_messages()) + { + // This is the case where a loaned message is taken from the middleware via + // inter-process communication, given to the user for their callback, + // and then returned. + void *loaned_msg = nullptr; + // TODO(wjwwood): refactor this into methods on subscription when LoanedMessage + // is extened to support subscriptions as well. + take_and_do_error_handling( + "taking a loaned message from topic", + subscription->get_topic_name(), + [&]() + { + rcl_ret_t ret = rcl_take_loaned_message( + subscription->get_subscription_handle().get(), + &loaned_msg, + &message_info.get_rmw_message_info(), + nullptr); + if (RCL_RET_SUBSCRIPTION_TAKE_FAILED == ret) + { + return false; + } + else if (RCL_RET_OK != ret) + { + rclcpp::exceptions::throw_from_rcl_error(ret); + } + // RCLCPP_INFO(rclcpp::get_logger(this->name), "at topic %s, loaned msg sent at %ld, and recieved at %ld", executable.node_base->get_name(), message_info.get_rmw_message_info().source_timestamp, message_info.get_rmw_message_info().received_timestamp); + return true; + }, + [&]() + { subscription->handle_loaned_message(loaned_msg, message_info); }); + rcl_ret_t ret = rcl_return_loaned_message_from_subscription( + subscription->get_subscription_handle().get(), + loaned_msg); + if (RCL_RET_OK != ret) + { + RCLCPP_ERROR( + rclcpp::get_logger("rclcpp"), + "rcl_return_loaned_message_from_subscription() failed for subscription on topic '%s': %s", + subscription->get_topic_name(), rcl_get_error_string().str); + } + loaned_msg = nullptr; + } + else + { + // This case is taking a copy of the message data from the middleware via + // inter-process communication. + std::shared_ptr message = subscription->create_message(); + take_and_do_error_handling( + "taking a message from topic", + subscription->get_topic_name(), + [&]() + { + auto result = subscription->take_type_erased(message.get(), message_info); + // RCLCPP_INFO(rclcpp::get_logger(this->name), "at topic %s, IPC msg sent at %ld, and recieved at %ld", executable.node_base->get_name(), message_info.get_rmw_message_info().source_timestamp, message_info.get_rmw_message_info().received_timestamp); + return result; + }, + [&]() + { subscription->handle_message(message, message_info); }); + // this just deallocates + subscription->return_message(message); + } + } + bool TimedExecutor::get_next_executable(rclcpp::AnyExecutable &any_executable, std::chrono::nanoseconds timeout) + { + bool success = false; + // Check to see if there are any subscriptions or timers needing service + // TODO(wjwwood): improve run to run efficiency of this function + // sched_yield(); + wait_for_work(std::chrono::milliseconds(1)); + success = get_next_ready_executable(any_executable); + return success; + } + + // TODO: since we're calling this more often, clean it up a bit + void + TimedExecutor::wait_for_work(std::chrono::nanoseconds timeout) + { + { + std::unique_lock lock(memory_strategy_mutex_); + + // Collect the subscriptions and timers to be waited on + memory_strategy_->clear_handles(); + bool has_invalid_weak_nodes = memory_strategy_->collect_entities(weak_nodes_); + + // Clean up any invalid nodes, if they were detected + if (has_invalid_weak_nodes) + { + auto node_it = weak_nodes_.begin(); + auto gc_it = guard_conditions_.begin(); + while (node_it != weak_nodes_.end()) + { + if (node_it->expired()) + { + node_it = weak_nodes_.erase(node_it); + memory_strategy_->remove_guard_condition(*gc_it); + gc_it = guard_conditions_.erase(gc_it); + } + else + { + ++node_it; + ++gc_it; + } + } + } + // clear wait set + rcl_ret_t ret = rcl_wait_set_clear(&wait_set_); + if (ret != RCL_RET_OK) + { + rclcpp::exceptions::throw_from_rcl_error(ret, "Couldn't clear wait set"); + } + + // The size of waitables are accounted for in size of the other entities + ret = rcl_wait_set_resize( + &wait_set_, memory_strategy_->number_of_ready_subscriptions(), + memory_strategy_->number_of_guard_conditions(), memory_strategy_->number_of_ready_timers(), + memory_strategy_->number_of_ready_clients(), memory_strategy_->number_of_ready_services(), + memory_strategy_->number_of_ready_events()); + if (RCL_RET_OK != ret) + { + rclcpp::exceptions::throw_from_rcl_error(ret, "Couldn't resize the wait set"); + } + + if (!memory_strategy_->add_handles_to_wait_set(&wait_set_)) + { + throw std::runtime_error("Couldn't fill wait set"); + } + } + rcl_ret_t status = + rcl_wait(&wait_set_, std::chrono::duration_cast(timeout).count()); + if (status == RCL_RET_WAIT_SET_EMPTY) + { + RCUTILS_LOG_WARN_NAMED( + "rclcpp", + "empty wait set received in rcl_wait(). This should never happen."); + } + else if (status != RCL_RET_OK && status != RCL_RET_TIMEOUT) + { + using rclcpp::exceptions::throw_from_rcl_error; + throw_from_rcl_error(status, "rcl_wait() failed"); + } + + // check the null handles in the wait set and remove them from the handles in memory strategy + // for callback-based entities + memory_strategy_->remove_null_handles(&wait_set_); + } + bool + TimedExecutor::get_next_ready_executable(rclcpp::AnyExecutable &any_executable) + { + bool success = false; + if (use_priorities) + { + std::shared_ptr> strat = std::dynamic_pointer_cast>(memory_strategy_); + strat->get_next_executable(any_executable, weak_nodes_); + if (any_executable.timer || any_executable.subscription || any_executable.service || any_executable.client || any_executable.waitable) + { + success = true; + } + } + else + { + // Check the timers to see if there are any that are ready + memory_strategy_->get_next_timer(any_executable, weak_nodes_); + if (any_executable.timer) + { + std::cout << "got timer" << std::endl; + success = true; + } + if (!success) + { + // Check the subscriptions to see if there are any that are ready + memory_strategy_->get_next_subscription(any_executable, weak_nodes_); + if (any_executable.subscription) + { + // std::cout << "got subs" << std::endl; + success = true; + } + } + if (!success) + { + // Check the services to see if there are any that are ready + memory_strategy_->get_next_service(any_executable, weak_nodes_); + if (any_executable.service) + { + std::cout << "got serv" << std::endl; + success = true; + } + } + if (!success) + { + // Check the clients to see if there are any that are ready + memory_strategy_->get_next_client(any_executable, weak_nodes_); + if (any_executable.client) + { + std::cout << "got client" << std::endl; + success = true; + } + } + if (!success) + { + // Check the waitables to see if there are any that are ready + memory_strategy_->get_next_waitable(any_executable, weak_nodes_); + if (any_executable.waitable) + { + std::cout << "got wait" << std::endl; + success = true; + } + } + } + // At this point any_exec should be valid with either a valid subscription + // or a valid timer, or it should be a null shared_ptr + if (success) + { + // If it is valid, check to see if the group is mutually exclusive or + // not, then mark it accordingly + using rclcpp::callback_group::CallbackGroupType; + if ( + any_executable.callback_group && + any_executable.callback_group->type() == rclcpp::CallbackGroupType::MutuallyExclusive) + { + // It should not have been taken otherwise + assert(any_executable.callback_group->can_be_taken_from().load()); + // Set to false to indicate something is being run from this group + // This is reset to true either when the any_exec is executed or when the + // any_exec is destructued + any_executable.callback_group->can_be_taken_from().store(false); + } + } + // If there is no ready executable, return false + return success; + } + + void TimedExecutor::set_use_priorities(bool use_prio) + { + use_priorities = use_prio; + } + +} // namespace timed_executor diff --git a/src/priority_executor/src/test_nodes.cpp b/src/priority_executor/src/test_nodes.cpp new file mode 100644 index 0000000..9a91d82 --- /dev/null +++ b/src/priority_executor/src/test_nodes.cpp @@ -0,0 +1,81 @@ +#include "priority_executor/test_nodes.hpp" +#include "priority_executor/primes_workload.hpp" +#include "simple_timer/rt-sched.hpp" +#include +#include + +using namespace std::chrono_literals; +using std::placeholders::_1; +PublisherNode::PublisherNode(std::string publish_topic, int chain, int period, double runtime) + : Node("PublisherNode_" + publish_topic), count_(0) +{ + logger_ = create_logger(); + publisher_ = this->create_publisher(publish_topic, 1); + this->chain = chain; + this->runtime = runtime; + this->period = period; + std::cout << "creating timer " + << publish_topic << std::endl; + auto timer_callback = + [this]() -> void + { + if (this->count_ > this->count_max) + { + rclcpp::shutdown(); + return; + } + this->logger_.recorded_times->push_back(std::make_pair(std::string(this->get_name()) + "_publish_" + std::to_string(this->count_), get_time_us())); + this->logger_.recorded_times->push_back(std::make_pair("chain_" + std::to_string(this->chain) + "_worker_0_recv_MESSAGE" + std::to_string(this->count_), get_time_us())); + + double result = nth_prime_silly(100000, this->runtime); + + this->logger_.recorded_times->push_back(std::make_pair("chain_" + std::to_string(this->chain) + "_worker_0_processed_MESSAGE" + std::to_string(this->count_), get_time_us())); + auto message = std_msgs::msg::String(); + message.data = "MESSAGE" + std::to_string(this->count_++); + this->publisher_->publish(message); + + // RCLCPP_INFO(this->get_logger(), "I did work on: '%s', taking %lf ms", message.data.c_str(), result); + // usleep(600 * 1000); + }; + timer_ = this->create_wall_timer(std::chrono::milliseconds(period), timer_callback); +} + +DummyWorker::DummyWorker(const std::string &name, double runtime, int chain, int number, bool is_multichain) + : Node(name) +{ + this->runtime = runtime; + this->number = number; + this->chain = chain; + this->logger_ = create_logger(); + std::cout << "creating dummy worker " + << name << std::endl; + if (is_multichain) + { + subscription_ = this->create_subscription( + "topic_" + std::to_string(chain - 1), 1, std::bind(&DummyWorker::topic_callback, this, _1)); + } + else if (number == 1) + { + subscription_ = this->create_subscription( + "topic_" + std::to_string(chain), 1, std::bind(&DummyWorker::topic_callback, this, _1)); + } + else + { + subscription_ = this->create_subscription( + "chain_" + std::to_string(chain) + "_topic_" + std::to_string(number), 1, std::bind(&DummyWorker::topic_callback, this, _1)); + } + this->publisher_ = this->create_publisher("chain_" + std::to_string(chain) + "_topic_" + std::to_string(number + 1), 1); +} +void DummyWorker::topic_callback(const std_msgs::msg::String::SharedPtr msg) const +{ + this->logger_.recorded_times->push_back(std::make_pair(std::string(this->get_name()) + "_recv_" + msg->data, get_time_us())); + + double result = nth_prime_silly(100000, runtime); + + this->logger_.recorded_times->push_back(std::make_pair(std::string(this->get_name()) + "_processed_" + msg->data, get_time_us())); + // RCLCPP_INFO(this->get_logger(), "I did work on: '%s', taking %lf ms", msg->data.c_str(), result); + auto message = std_msgs::msg::String(); + message.data = msg->data; + this->publisher_->publish(message); + // usleep(600 * 1000); +} diff --git a/src/priority_executor/src/test_publisher.cpp b/src/priority_executor/src/test_publisher.cpp new file mode 100644 index 0000000..5e2cf03 --- /dev/null +++ b/src/priority_executor/src/test_publisher.cpp @@ -0,0 +1,298 @@ +#include "rclcpp/rclcpp.hpp" +#include "priority_executor/priority_executor.hpp" +#include "priority_executor/priority_memory_strategy.hpp" +#include "priority_executor/test_nodes.hpp" +#include +#include +#include "simple_timer/rt-sched.hpp" +#include "priority_executor/default_executor.hpp" +#include + +// clock_t times(struct tms *buf); + +std::vector get_parameter_array(std::shared_ptr node, std::string name, std::vector default_val) +{ + rclcpp::Parameter param_result(name, default_val); + node->get_parameter_or(name, param_result, param_result); + return param_result.as_integer_array(); +} + +int main(int argc, char **argv) +{ + cpu_set_t cpuset; + CPU_ZERO(&cpuset); + CPU_SET(0, &cpuset); + + pthread_t current_thread = pthread_self(); + if (pthread_setaffinity_np(current_thread, sizeof(cpu_set_t), &cpuset)) + { + std::cout << "problem setting cpu core" << std::endl; + } + sched_param sch_params; + sch_params.sched_priority = 98; + if (pthread_setschedparam(pthread_self(), SCHED_FIFO, &sch_params)) + { + RCLCPP_INFO(rclcpp::get_logger("rclcpp"), "spin_rt thread has an error."); + } + rclcpp::init(argc, argv); + + // https://design.ros2.org/articles/ros_command_line_arguments.html#multiple-parameter-assignments + auto node = rclcpp::Node::make_shared("experiment_parameters"); + node->declare_parameter("experiment_name"); + node->declare_parameter("count_max"); + node->declare_parameter("schedule_type"); + + node->declare_parameter("chain_lengths"); + node->declare_parameter("chain_periods"); + node->declare_parameter("chain_deadlines"); + node->declare_parameter("chain_runtimes"); + node->declare_parameter("chain_priorities"); + node->declare_parameter("chain_timer_runtimes"); + + auto parameters_client = std::make_shared(node); + parameters_client->wait_for_service(); + const std::string schedule_type_str = parameters_client->get_parameter("schedule_type", std::string("deadline")); + std::cout << schedule_type_str << std::endl; + ExecutableScheduleType schedule_type = DEFAULT; + if (schedule_type_str == "deadline") + { + schedule_type = DEADLINE; + } + else if (schedule_type_str == "chain_priority") + { + schedule_type = CHAIN_AWARE_PRIORITY; + } + else + { + schedule_type = DEFAULT; + } + + const std::vector chain_lengths = get_parameter_array(node, "chain_lengths", std::vector({3, 7})); + + const std::vector chain_periods = get_parameter_array(node, "chain_periods", std::vector({1000, 1000})); + const std::vector chain_deadlines = get_parameter_array(node, "chain_deadlines", std::vector({1000, 1000})); + const std::vector chain_runtimes = get_parameter_array(node, "chain_runtimes", std::vector({131, 131})); + const std::vector chain_timer_runtimes = get_parameter_array(node, "chain_timer_runtimes", std::vector({109, 109})); + + const std::vector chain_priorities = get_parameter_array(node, "chain_priorities", std::vector({1, 2})); + const uint NUM_CHAINS = chain_lengths.size(); + if (chain_lengths.size() > chain_periods.size()) + { + std::cout << "chain_periods shorter than chain_lengths" << std::endl; + exit(-1); + } + + if (chain_lengths.size() > chain_runtimes.size()) + { + std::cout << "chain_runtimes shorter than chain_lengths" << std::endl; + exit(-1); + } + if (chain_lengths.size() > chain_timer_runtimes.size()) + { + std::cout << "chain_timer_runtimes shorter than chain_lengths" << std::endl; + exit(-1); + } + if (schedule_type == DEADLINE) + { + if (chain_lengths.size() > chain_deadlines.size()) + { + std::cout << "chain_deadlines shorter than chain_lengths" << std::endl; + exit(-1); + } + } + else if (schedule_type == CHAIN_AWARE_PRIORITY) + { + if (chain_lengths.size() > chain_priorities.size()) + { + std::cout << "chain_priorities shorter than chain_lengths" << std::endl; + exit(-1); + } + } + + const uint COUNT_MAX = parameters_client->get_parameter("count_max", 20); + const std::string experiment_name = parameters_client->get_parameter("experiment_name", std::string("unnamed_experiment")); + + rclcpp::ExecutorOptions options; + // use a modified memorystrategy + std::shared_ptr> strat = std::make_shared>(); + strat->logger = create_logger(); + // publisher + options.memory_strategy = strat; + rclcpp::Executor *sub1_executor = nullptr; + + ROSDefaultExecutor *default_executor = nullptr; + if (schedule_type != DEFAULT) + { + timed_executor::TimedExecutor *rtis_executor = new timed_executor::TimedExecutor(options, "short_executor"); + rtis_executor->set_use_priorities(true); + sub1_executor = rtis_executor; + } + else if (schedule_type == DEFAULT) + { + default_executor = new ROSDefaultExecutor(); + default_executor->logger = create_logger(); + } + // stock ROS executor + // rclcpp::executors::SingleThreadedExecutor sub1_executor; + std::vector> workers; + std::vector *> chain_deadlines_deque; + std::vector> timers; + // TODO: make the chain layout configurable via rosparam + for (uint chain_index = 0; chain_index < NUM_CHAINS; chain_index++) + { + std::shared_ptr this_chain_timer_handle; + std::deque *this_chain_deadlines_deque = new std::deque(); + for (int cb_index = 0; cb_index < chain_lengths[chain_index]; cb_index++) + { + int total_prio = 0; + int this_chain_prio = 0; + if (schedule_type == CHAIN_AWARE_PRIORITY) + { + this_chain_prio = chain_priorities[chain_index]; + total_prio = chain_lengths[chain_index]; + for (uint eval_chain = 0; eval_chain < NUM_CHAINS; eval_chain++) + { + if (eval_chain == chain_index) + { + continue; + } + if (chain_priorities[eval_chain] < this_chain_prio) + { + total_prio += chain_lengths[eval_chain] * chain_priorities[eval_chain]; + } + } + } + if (cb_index == 0) + { + std::shared_ptr pubnode = std::make_shared("topic_" + std::to_string(chain_index), chain_index, chain_periods[chain_index], chain_timer_runtimes[chain_index]); + pubnode->count_max = COUNT_MAX; + if (schedule_type == DEADLINE) + { + strat->set_executable_deadline(pubnode->timer_->get_timer_handle(), chain_deadlines[chain_index], TIMER, chain_index); + } + else if (schedule_type == CHAIN_AWARE_PRIORITY) + { + std::cout << "creating prio timer on chain " << std::to_string(chain_index) << " with prio " << std::to_string(chain_index) << std::endl; + strat->set_executable_priority(pubnode->timer_->get_timer_handle(), chain_index, TIMER, CHAIN_AWARE_PRIORITY, chain_index); + } + if (schedule_type != DEFAULT) + { + strat->set_first_in_chain(pubnode->timer_->get_timer_handle()); + strat->assign_deadlines_queue(pubnode->timer_->get_timer_handle(), this_chain_deadlines_deque); + strat->get_priority_settings(pubnode->timer_->get_timer_handle())->timer_handle = pubnode->timer_; + this_chain_timer_handle = pubnode->timer_; + } + if (schedule_type == DEFAULT) + { + PriorityExecutable e; + e.chain_id = chain_index; + e.timer_handle = pubnode->timer_; + default_executor->priority_map[pubnode->timer_->get_timer_handle()] = e; + default_executor->add_node(pubnode); + } + else + { + sub1_executor->add_node(pubnode); + } + timers.push_back(pubnode); + } + else + { + auto sub1node = std::make_shared("chain_" + std::to_string(chain_index) + "_worker_" + std::to_string(cb_index), chain_runtimes[chain_index], chain_index, cb_index); + if (schedule_type == DEFAULT) + { + default_executor->add_node(sub1node); + } + else + { + sub1_executor->add_node(sub1node); + } + if (schedule_type == DEADLINE) + { + strat->set_executable_deadline(sub1node->subscription_->get_subscription_handle(), chain_deadlines[chain_index], SUBSCRIPTION, chain_index); + } + else if (schedule_type == CHAIN_AWARE_PRIORITY) + { + std::cout << "creating prio cb with prio " << std::to_string(chain_index) << std::endl; + strat->set_executable_priority(sub1node->subscription_->get_subscription_handle(), (chain_index), SUBSCRIPTION, CHAIN_AWARE_PRIORITY, chain_index); + } + if (schedule_type != DEFAULT) + { + strat->assign_deadlines_queue(sub1node->subscription_->get_subscription_handle(), this_chain_deadlines_deque); + if (cb_index == chain_lengths[chain_index] - 1) + { + strat->set_last_in_chain(sub1node->subscription_->get_subscription_handle()); + strat->get_priority_settings(sub1node->subscription_->get_subscription_handle())->timer_handle = this_chain_timer_handle; + } + } + workers.push_back(sub1node); + } + } + chain_deadlines_deque.push_back(this_chain_deadlines_deque); + } + std::cout << "initialized nodes" << std::endl; + node_time_logger logger = create_logger(); + // initialize first deadlines + timespec current_time; + clock_gettime(CLOCK_MONOTONIC_RAW, ¤t_time); + uint64_t millis = (current_time.tv_sec * (uint64_t)1000) + (current_time.tv_nsec / 1000000); + for (uint chain_index = 0; chain_index < NUM_CHAINS; chain_index++) + { + log_entry(logger, "deadline_" + std::to_string(chain_index) + "_" + std::to_string(millis + chain_deadlines[chain_index])); + log_entry(logger, "timer_" + std::to_string(chain_index) + "_release_" + std::to_string(millis)); + chain_deadlines_deque[chain_index]->push_back(millis + chain_deadlines[chain_index]); + // chain_deadlines_deque[chain_index]->push_back(0); + } + if (schedule_type == DEFAULT) + { + default_executor->spin(); + } + else + { + sub1_executor->spin(); + } + rclcpp::shutdown(); + + // combine logs from all chains + std::vector> combined_logs; + for (auto &worker : workers) + { + for (auto &log : *(worker->logger_.recorded_times)) + { + combined_logs.push_back(log); + } + } + for (auto &timer : timers) + { + for (auto &log : *(timer->logger_.recorded_times)) + { + combined_logs.push_back(log); + } + } + // add logs from strategy + for (auto &log : *(strat->logger.recorded_times)) + { + combined_logs.push_back(log); + } + + // if using the "default" executor, grab those logs too + if (schedule_type == DEFAULT) + { + for (auto &log : *(default_executor->logger.recorded_times)) + { + combined_logs.push_back(log); + } + } + + // sort logs + std::sort(combined_logs.begin(), combined_logs.end(), [](const std::pair &a, const std::pair &b) + { return a.second < b.second; }); + + std::ofstream output_file; + output_file.open("experiments/results/" + experiment_name + ".txt"); + for (auto p : combined_logs) + { + output_file << p.second << " " << p.first << std::endl; + } + output_file.close(); +} diff --git a/src/simple_timer/CMakeLists.txt b/src/simple_timer/CMakeLists.txt new file mode 100644 index 0000000..2bcb70d --- /dev/null +++ b/src/simple_timer/CMakeLists.txt @@ -0,0 +1,61 @@ +cmake_minimum_required(VERSION 3.5) +project(simple_timer) + +# Default to C99 +if(NOT CMAKE_C_STANDARD) + set(CMAKE_C_STANDARD 99) +endif() + +# Default to C++14 +if(NOT CMAKE_CXX_STANDARD) + set(CMAKE_CXX_STANDARD 14) +endif() + +if(CMAKE_COMPILER_IS_GNUCXX OR CMAKE_CXX_COMPILER_ID MATCHES "Clang") + add_compile_options(-Wall -Wextra -Wpedantic) +endif() + +# find dependencies +find_package(ament_cmake REQUIRED) +# uncomment the following section in order to fill in +# further dependencies manually. +# find_package( REQUIRED) + +add_library(simple_timer SHARED src/cycle_timer.cpp src/period_timer.cpp) +target_include_directories(simple_timer PUBLIC + $ + $ +) +add_library(rt-sched src/rt-sched.cpp) +target_include_directories(rt-sched PUBLIC + $ + $ +) + +if(BUILD_TESTING) + find_package(ament_lint_auto REQUIRED) + # the following line skips the linter which checks for copyrights + # uncomment the line when a copyright and license is not present in all source files + #set(ament_cmake_copyright_FOUND TRUE) + # the following line skips cpplint (only works in a git repo) + # uncomment the line when this package is not in a git repo + #set(ament_cmake_cpplint_FOUND TRUE) + ament_lint_auto_find_test_dependencies() +endif() + +install( + DIRECTORY include/ + DESTINATION include +) + +install( + TARGETS simple_timer rt-sched + LIBRARY DESTINATION lib + ARCHIVE DESTINATION lib + RUNTIME DESTINATION bin +) + +ament_export_include_directories(include) +ament_export_libraries(simple_timer) +ament_export_libraries(rt-sched) +ament_package() diff --git a/src/simple_timer/include/simple_timer/cycle_timer.hpp b/src/simple_timer/include/simple_timer/cycle_timer.hpp new file mode 100644 index 0000000..556c8d4 --- /dev/null +++ b/src/simple_timer/include/simple_timer/cycle_timer.hpp @@ -0,0 +1,23 @@ +#ifndef __CYCLE_TIMER__ +#define __CYCLE_TIMER__ + +#include +#include "simple_timer/rt-sched.hpp" +namespace simple_timer +{ + class CycleTimer + { + public: + CycleTimer(long start_delay=0); + void tick() ; + const u64 start_delay_time; + u64 start_time = 0; + u64 last_cycle_time = 0; + unsigned long max_diff = 0; + unsigned long min_diff = 0; + unsigned long last_diff = 0; + bool recording = false; + }; +} // namespace simple_timer + +#endif \ No newline at end of file diff --git a/src/simple_timer/include/simple_timer/period_timer.hpp b/src/simple_timer/include/simple_timer/period_timer.hpp new file mode 100644 index 0000000..bdc909c --- /dev/null +++ b/src/simple_timer/include/simple_timer/period_timer.hpp @@ -0,0 +1,25 @@ +#ifndef __PERIOD_TIMER__ +#define __PERIOD_TIMER__ + +#include +#include "simple_timer/rt-sched.hpp" +namespace simple_timer +{ + class PeriodTimer + { + public: + PeriodTimer(long start_delay = 0); + void start(); + void stop(); + const u64 start_delay_time; + u64 start_time = 0; + + u64 last_period_time = 0; + unsigned long max_period = 0; + unsigned long min_period = 0; + unsigned long last_period = 0; + bool recording = false; + }; +} // namespace simple_timer + +#endif \ No newline at end of file diff --git a/src/simple_timer/include/simple_timer/rt-sched.hpp b/src/simple_timer/include/simple_timer/rt-sched.hpp new file mode 100644 index 0000000..4f035b2 --- /dev/null +++ b/src/simple_timer/include/simple_timer/rt-sched.hpp @@ -0,0 +1,98 @@ +// SPDX-License-Identifier: GPL-2.0-or-later +/* + * rt-sched.h - sched_setattr() and sched_getattr() API + * (C) Dario Faggioli , 2009, 2010 + * Copyright (C) 2014 BMW Car IT GmbH, Daniel Wagner +#include +#include +#include +#include + +#ifndef SCHED_DEADLINE +#define SCHED_DEADLINE 6 +#endif + +#ifdef __x86_64__ +#define __NR_sched_setattr 314 +#define __NR_sched_getattr 315 +#endif + +#ifdef __i386__ +#define __NR_sched_setattr 351 +#define __NR_sched_getattr 352 +#endif + +#ifdef __arm__ +#ifndef __NR_sched_setattr +#define __NR_sched_setattr 380 +#endif +#ifndef __NR_sched_getattr +#define __NR_sched_getattr 381 +#endif +#endif + +#ifdef __tilegx__ +#define __NR_sched_setattr 274 +#define __NR_sched_getattr 275 +#endif + +typedef unsigned long long u64; +#define NS_TO_MS 1000000 +struct sched_attr { + uint32_t size; + uint32_t sched_policy; + uint64_t sched_flags; + + /* SCHED_NORMAL, SCHED_BATCH */ + int32_t sched_nice; + + /* SCHED_FIFO, SCHED_RR */ + uint32_t sched_priority; + + /* SCHED_DEADLINE */ + uint64_t sched_runtime; + uint64_t sched_deadline; + uint64_t sched_period; +}; + +int sched_setattr(pid_t pid, + const struct sched_attr *attr, + unsigned int flags); + +int sched_getattr(pid_t pid, + struct sched_attr *attr, + unsigned int size, + unsigned int flags); + +u64 get_time_us(void); + +typedef struct node_time_logger +{ + std::shared_ptr>> recorded_times; +} node_time_logger; + +void log_entry(node_time_logger logger, std::string text); +node_time_logger create_logger(); + +inline u64 get_time_us(void) +{ + struct timespec ts; + unsigned long long time; + + clock_gettime(CLOCK_MONOTONIC_RAW, &ts); + time = ts.tv_sec * 1000000; + time += ts.tv_nsec / 1000; + + return time; +} + +#endif /* __RT_SCHED_H__ */ diff --git a/src/simple_timer/package.xml b/src/simple_timer/package.xml new file mode 100644 index 0000000..f009c67 --- /dev/null +++ b/src/simple_timer/package.xml @@ -0,0 +1,18 @@ + + + + simple_timer + 0.0.0 + TODO: Package description + nvidia + TODO: License declaration + + ament_cmake + + ament_lint_auto + ament_lint_common + + + ament_cmake + + diff --git a/src/simple_timer/src/cycle_timer.cpp b/src/simple_timer/src/cycle_timer.cpp new file mode 100644 index 0000000..143f13b --- /dev/null +++ b/src/simple_timer/src/cycle_timer.cpp @@ -0,0 +1,43 @@ +#include "simple_timer/cycle_timer.hpp" + +namespace simple_timer +{ + CycleTimer::CycleTimer(long start_delay) : start_delay_time(start_delay * 1000) + { + } + + void CycleTimer::tick() + { + u64 current_wall_time = get_time_us(); + u64 time_diff = 0; + + if (!recording) + { + + if (start_time == 0) + { + start_time = current_wall_time; + } + else if (current_wall_time - start_time > start_delay_time) + { + recording = true; + last_cycle_time = current_wall_time; + start_time = current_wall_time; + } + } + else + { + time_diff = current_wall_time - last_cycle_time; + if (time_diff < min_diff || min_diff == 0) + { + min_diff = time_diff; + } + if (time_diff > max_diff || max_diff == 0) + { + max_diff = time_diff; + } + last_cycle_time = current_wall_time; + last_diff = time_diff; + } + } +} // namespace simple_timer diff --git a/src/simple_timer/src/period_timer.cpp b/src/simple_timer/src/period_timer.cpp new file mode 100644 index 0000000..9c9a9ce --- /dev/null +++ b/src/simple_timer/src/period_timer.cpp @@ -0,0 +1,56 @@ + +#include "simple_timer/period_timer.hpp" + +namespace simple_timer +{ + PeriodTimer::PeriodTimer(long start_delay) : start_delay_time(start_delay * 1000) + { + } + + void PeriodTimer::start() + { + u64 current_wall_time = get_time_us(); + + if (!recording) + { + + if (start_time == 0) + { + start_time = current_wall_time; + } + else if (current_wall_time - start_time > start_delay_time) + { + recording = true; + start_time = current_wall_time; + last_period_time = current_wall_time; + } + } + else + { + last_period_time = current_wall_time; + } + } + void PeriodTimer::stop() + { + u64 current_wall_time = get_time_us(); + u64 time_diff = 0; + + if (!recording) + { + return; + } + else + { + time_diff = current_wall_time - last_period_time; + if (time_diff < min_period || min_period == 0) + { + min_period = time_diff; + } + if (time_diff > max_period || max_period == 0) + { + max_period = time_diff; + } + last_period = time_diff; + } + } +} // namespace simple_timer diff --git a/src/simple_timer/src/rt-sched.cpp b/src/simple_timer/src/rt-sched.cpp new file mode 100644 index 0000000..a99cf22 --- /dev/null +++ b/src/simple_timer/src/rt-sched.cpp @@ -0,0 +1,46 @@ +// SPDX-License-Identifier: GPL-2.0-or-later +/* + * rt-sched.h - sched_setattr() and sched_getattr() API + * + * (C) Dario Faggioli , 2009, 2010 + * Copyright (C) 2014 BMW Car IT GmbH, Daniel Wagner +#include +#include + +#include "simple_timer/rt-sched.hpp" + +int sched_setattr(pid_t pid, + const struct sched_attr *attr, + unsigned int flags) +{ + return syscall(__NR_sched_setattr, pid, attr, flags); +} + +int sched_getattr(pid_t pid, + struct sched_attr *attr, + unsigned int size, + unsigned int flags) +{ + return syscall(__NR_sched_getattr, pid, attr, size, flags); +} + +void log_entry(node_time_logger logger, std::string text) +{ + if (logger.recorded_times != nullptr) + { + logger.recorded_times->push_back(std::make_pair(text, get_time_us())); + } +} + +node_time_logger create_logger() +{ + node_time_logger logger; + logger.recorded_times = std::make_shared>>(); + return logger; +} \ No newline at end of file