Compare commits

...
Sign in to create a new pull request.

1 commit

Author SHA1 Message Date
9fea6a3950 wip - likely stupid 2025-04-01 16:03:45 +02:00
19 changed files with 1410 additions and 1471 deletions

View file

@ -1,36 +1,34 @@
cmake_minimum_required(VERSION 3.5) cmake_minimum_required(VERSION 3.8)
project(priority_executor) project(priority_executor VERSION 0.1.0)
# Default to C99 # Set C++ standards
if(NOT CMAKE_C_STANDARD) set(CMAKE_CXX_STANDARD 17)
set(CMAKE_C_STANDARD 99) set(CMAKE_CXX_STANDARD_REQUIRED ON)
endif() set(CMAKE_CXX_EXTENSIONS OFF)
# Default to C++14
if(NOT CMAKE_CXX_STANDARD)
set(CMAKE_CXX_STANDARD 14)
endif()
# Compiler options
if(CMAKE_COMPILER_IS_GNUCXX OR CMAKE_CXX_COMPILER_ID MATCHES "Clang") if(CMAKE_COMPILER_IS_GNUCXX OR CMAKE_CXX_COMPILER_ID MATCHES "Clang")
add_compile_options(-Wall -Wextra -Wpedantic) add_compile_options(-Wall -Wextra -Wpedantic)
endif() endif()
# find dependencies # Find dependencies
find_package(ament_cmake REQUIRED) find_package(ament_cmake REQUIRED)
find_package(rclcpp REQUIRED) find_package(rclcpp REQUIRED)
find_package(rcl REQUIRED) find_package(rcl REQUIRED)
find_package(rmw REQUIRED)
find_package(std_msgs REQUIRED) find_package(std_msgs REQUIRED)
find_package(std_srvs REQUIRED) find_package(std_srvs REQUIRED)
find_package(simple_timer REQUIRED) find_package(simple_timer REQUIRED)
add_library(priority_executor src/priority_executor.cpp src/priority_memory_strategy.cpp) # Library targets
add_library(priority_executor
src/priority_executor.cpp
src/priority_memory_strategy.cpp
)
target_include_directories(priority_executor PUBLIC target_include_directories(priority_executor PUBLIC
$<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}/include> $<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}/include>
$<INSTALL_INTERFACE:include> $<INSTALL_INTERFACE:include>
) )
# target_link_libraries(priority_executor
# simple_timer
# )
ament_target_dependencies(priority_executor ament_target_dependencies(priority_executor
rmw rmw
rclcpp rclcpp
@ -38,7 +36,10 @@ ament_target_dependencies(priority_executor
simple_timer simple_timer
) )
add_library(multithread_priority_executor src/multithread_priority_executor.cpp src/priority_memory_strategy.cpp) add_library(multithread_priority_executor
src/multithread_priority_executor.cpp
src/priority_memory_strategy.cpp
)
target_include_directories(multithread_priority_executor PUBLIC target_include_directories(multithread_priority_executor PUBLIC
$<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}/include> $<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}/include>
$<INSTALL_INTERFACE:include> $<INSTALL_INTERFACE:include>
@ -50,7 +51,9 @@ ament_target_dependencies(multithread_priority_executor
simple_timer simple_timer
) )
add_library(default_executor src/default_executor.cpp) add_library(default_executor
src/default_executor.cpp
)
target_include_directories(default_executor PUBLIC target_include_directories(default_executor PUBLIC
$<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}/include> $<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}/include>
$<INSTALL_INTERFACE:include> $<INSTALL_INTERFACE:include>
@ -62,48 +65,54 @@ ament_target_dependencies(default_executor
simple_timer simple_timer
) )
add_executable(usage_example src/usage_example.cpp) # Example executable
add_executable(usage_example
src/usage_example.cpp
)
target_include_directories(usage_example PUBLIC target_include_directories(usage_example PUBLIC
$<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}/include> $<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}/include>
$<INSTALL_INTERFACE:include>) $<INSTALL_INTERFACE:include>
)
target_link_libraries(usage_example target_link_libraries(usage_example
priority_executor priority_executor
multithread_priority_executor
default_executor
) )
ament_target_dependencies(usage_example ament_target_dependencies(usage_example
rclcpp rclcpp
std_msgs std_msgs
std_srvs std_srvs
simple_timer
) )
# Testing
if(BUILD_TESTING)
find_package(ament_lint_auto REQUIRED)
ament_lint_auto_find_test_dependencies()
endif()
# Installation
install( install(
DIRECTORY include/ DIRECTORY include/
DESTINATION include DESTINATION include
) )
install(TARGETS usage_example install(
DESTINATION lib/${PROJECT_NAME} TARGETS priority_executor multithread_priority_executor default_executor
) EXPORT export_${PROJECT_NAME}
install(TARGETS priority_executor multithread_priority_executor default_executor
ARCHIVE DESTINATION lib ARCHIVE DESTINATION lib
LIBRARY DESTINATION lib LIBRARY DESTINATION lib
RUNTIME DESTINATION bin RUNTIME DESTINATION bin
INCLUDES DESTINATION include
) )
if(BUILD_TESTING)
find_package(ament_lint_auto REQUIRED)
# the following line skips the linter which checks for copyrights
# uncomment the line when a copyright and license is not present in all source files
#set(ament_cmake_copyright_FOUND TRUE)
# the following line skips cpplint (only works in a git repo)
# uncomment the line when this package is not in a git repo
#set(ament_cmake_cpplint_FOUND TRUE)
ament_lint_auto_find_test_dependencies()
endif()
install(
TARGETS usage_example
RUNTIME DESTINATION lib/${PROJECT_NAME}
)
# Export and package configuration
ament_export_include_directories(include) ament_export_include_directories(include)
ament_export_libraries(priority_executor) ament_export_targets(export_${PROJECT_NAME} HAS_LIBRARY_TARGET)
ament_export_libraries(multithread_priority_executor) ament_export_dependencies(rclcpp rcl rmw simple_timer)
ament_export_libraries(default_executor)
ament_export_dependencies(rclcpp rcl simple_timer)
ament_package() ament_package()

View file

@ -12,8 +12,8 @@
// See the License for the specific language governing permissions and // See the License for the specific language governing permissions and
// limitations under the License. // limitations under the License.
#ifndef RTIS_DEFAULT_EXECUTOR #ifndef PRIORITY_EXECUTOR__DEFAULT_EXECUTOR_HPP_
#define RTIS_DEFAULT_EXECUTOR #define PRIORITY_EXECUTOR__DEFAULT_EXECUTOR_HPP_
#include <rmw/rmw.h> #include <rmw/rmw.h>
@ -21,6 +21,7 @@
#include <cstdlib> #include <cstdlib>
#include <memory> #include <memory>
#include <set> #include <set>
#include <unordered_map>
#include "rclcpp/executor.hpp" #include "rclcpp/executor.hpp"
#include "rclcpp/macros.hpp" #include "rclcpp/macros.hpp"
@ -28,73 +29,78 @@
#include "rclcpp/detail/mutex_two_priorities.hpp" #include "rclcpp/detail/mutex_two_priorities.hpp"
#include "simple_timer/rt-sched.hpp" #include "simple_timer/rt-sched.hpp"
class RTISTimed namespace priority_executor {
{
class RTISTimed {
public: public:
node_time_logger logger_; node_time_logger logger_;
}; };
class ROSDefaultMultithreadedExecutor : public rclcpp::Executor, public RTISTimed class ROSDefaultMultithreadedExecutor : public rclcpp::Executor, public RTISTimed {
{
public: public:
RCLCPP_PUBLIC RCLCPP_SMART_PTR_DEFINITIONS(ROSDefaultMultithreadedExecutor)
explicit ROSDefaultMultithreadedExecutor(
const rclcpp::ExecutorOptions &options = rclcpp::ExecutorOptions(), int number_of_threads = 2, std::chrono::nanoseconds next_exec_timeout = std::chrono::nanoseconds(-1));
RCLCPP_PUBLIC size_t get_number_of_threads(); explicit ROSDefaultMultithreadedExecutor(
RCLCPP_PUBLIC void spin() override; rclcpp::ExecutorOptions const& options = rclcpp::ExecutorOptions(),
bool get_next_executable(rclcpp::AnyExecutable &any_executable, std::chrono::nanoseconds timeout = std::chrono::nanoseconds(-1)); int number_of_threads = 2,
std::chrono::nanoseconds next_exec_timeout = std::chrono::nanoseconds(-1));
size_t get_number_of_threads();
void spin() override;
bool get_next_executable(rclcpp::AnyExecutable& any_executable,
std::chrono::nanoseconds timeout = std::chrono::nanoseconds(-1));
protected: protected:
RCLCPP_PUBLIC void run(size_t thread_number); void run(size_t thread_number);
private: private:
size_t number_of_threads_; RCLCPP_DISABLE_COPY(ROSDefaultMultithreadedExecutor)
std::set<rclcpp::TimerBase::SharedPtr> scheduled_timers_;
static std::unordered_map<ROSDefaultMultithreadedExecutor *, size_t number_of_threads_;
std::shared_ptr<rclcpp::detail::MutexTwoPriorities>> std::set<rclcpp::TimerBase::SharedPtr> scheduled_timers_;
wait_mutex_set_; std::chrono::nanoseconds next_exec_timeout_{std::chrono::nanoseconds(-1)};
static std::mutex shared_wait_mutex_;
std::chrono::nanoseconds next_exec_timeout_ = std::chrono::nanoseconds(-1); static std::unordered_map<ROSDefaultMultithreadedExecutor*,
std::shared_ptr<rclcpp::detail::MutexTwoPriorities>>
wait_mutex_set_;
static std::mutex shared_wait_mutex_;
}; };
/// Single-threaded executor implementation. /// Single-threaded executor implementation.
/** /**
* This is the default executor created by rclcpp::spin. * This is the default executor created by rclcpp::spin.
*/ */
class ROSDefaultExecutor : public rclcpp::Executor, public RTISTimed class ROSDefaultExecutor : public rclcpp::Executor, public RTISTimed {
{
public: public:
RCLCPP_SMART_PTR_DEFINITIONS(ROSDefaultExecutor) RCLCPP_SMART_PTR_DEFINITIONS(ROSDefaultExecutor)
/// Default constructor. See the default constructor for Executor. /// Default constructor. See the default constructor for Executor.
RCLCPP_PUBLIC explicit ROSDefaultExecutor(
explicit ROSDefaultExecutor( rclcpp::ExecutorOptions const& options = rclcpp::ExecutorOptions());
const rclcpp::ExecutorOptions &options = rclcpp::ExecutorOptions());
/// Default destructor. /// Default destructor.
RCLCPP_PUBLIC virtual ~ROSDefaultExecutor();
virtual ~ROSDefaultExecutor();
/// Single-threaded implementation of spin. /// Single-threaded implementation of spin.
/** /**
* This function will block until work comes in, execute it, and then repeat * This function will block until work comes in, execute it, and then repeat
* the process until canceled. * the process until canceled.
* It may be interrupt by a call to rclcpp::Executor::cancel() or by ctrl-c * It may be interrupt by a call to rclcpp::Executor::cancel() or by ctrl-c
* if the associated context is configured to shutdown on SIGINT. * if the associated context is configured to shutdown on SIGINT.
* \throws std::runtime_error when spin() called while already spinning * \throws std::runtime_error when spin() called while already spinning
*/ */
RCLCPP_PUBLIC void spin() override;
void
spin() override;
bool get_next_executable(rclcpp::AnyExecutable &any_executable, std::chrono::nanoseconds timeout = std::chrono::nanoseconds(-1));
RCLCPP_PUBLIC bool get_next_executable(rclcpp::AnyExecutable& any_executable,
void std::chrono::nanoseconds timeout = std::chrono::nanoseconds(-1));
wait_for_work(std::chrono::nanoseconds timeout = std::chrono::nanoseconds(-1));
void wait_for_work(std::chrono::nanoseconds timeout = std::chrono::nanoseconds(-1));
private: private:
RCLCPP_DISABLE_COPY(ROSDefaultExecutor) RCLCPP_DISABLE_COPY(ROSDefaultExecutor)
}; };
#endif // RCLCPP__EXECUTORS__SINGLE_THREADED_EXECUTOR_HPP_ } // namespace priority_executor
#endif // PRIORITY_EXECUTOR__DEFAULT_EXECUTOR_HPP_

View file

@ -1,34 +1,53 @@
#ifndef RTIS_MULTITHREAD_EXECUTOR #ifndef PRIORITY_EXECUTOR__MULTITHREAD_PRIORITY_EXECUTOR_HPP_
#define RTIS_MULTITHREAD_EXECUTOR #define PRIORITY_EXECUTOR__MULTITHREAD_PRIORITY_EXECUTOR_HPP_
#include <priority_executor/priority_executor.hpp> #include <chrono>
#include "rclcpp/detail/mutex_two_priorities.hpp" #include <memory>
#include <mutex>
#include <set> #include <set>
#include <thread>
#include <unordered_map>
namespace timed_executor #include "priority_executor/default_executor.hpp"
{ #include "priority_executor/priority_executor.hpp"
class MultithreadTimedExecutor : public TimedExecutor #include "rclcpp/executor.hpp"
{ #include "rclcpp/macros.hpp"
public: #include "rclcpp/memory_strategies.hpp"
#include "rclcpp/node.hpp"
#include "rclcpp/rate.hpp"
#include "rclcpp/utilities.hpp"
#include "rclcpp/visibility_control.hpp"
namespace priority_executor {
class MultithreadTimedExecutor : public TimedExecutor {
public:
RCLCPP_PUBLIC RCLCPP_PUBLIC
explicit MultithreadTimedExecutor( explicit MultithreadTimedExecutor(
const rclcpp::ExecutorOptions &options = rclcpp::ExecutorOptions(), std::string name = "unnamed executor", int number_of_threads = 2, std::chrono::nanoseconds next_exec_timeout = std::chrono::nanoseconds(-1)); rclcpp::ExecutorOptions const &options = rclcpp::ExecutorOptions(),
std::string const &name = "unnamed executor",
int number_of_threads = 2,
std::chrono::nanoseconds next_exec_timeout = std::chrono::nanoseconds(-1));
RCLCPP_PUBLIC virtual ~MultithreadTimedExecutor() = default;
RCLCPP_PUBLIC size_t get_number_of_threads();
RCLCPP_PUBLIC void spin() override; RCLCPP_PUBLIC void spin() override;
protected: protected:
RCLCPP_PUBLIC void run(size_t thread_number); RCLCPP_PUBLIC void run(size_t _thread_number);
private: private:
size_t number_of_threads_;
std::set<rclcpp::TimerBase::SharedPtr> scheduled_timers_; std::set<rclcpp::TimerBase::SharedPtr> scheduled_timers_;
static std::unordered_map<MultithreadTimedExecutor *, int number_of_threads_;
std::shared_ptr<rclcpp::detail::MutexTwoPriorities>> std::chrono::nanoseconds next_exec_timeout_ = std::chrono::nanoseconds(-1);
node_time_logger logger_;
static std::unordered_map<MultithreadTimedExecutor*,
std::shared_ptr<rclcpp::detail::MutexTwoPriorities>>
wait_mutex_set_; wait_mutex_set_;
static std::mutex shared_wait_mutex_; static std::mutex shared_wait_mutex_;
std::chrono::nanoseconds next_exec_timeout_ = std::chrono::nanoseconds(-1); };
};
}
#endif } // namespace priority_executor
#endif // PRIORITY_EXECUTOR__MULTITHREAD_PRIORITY_EXECUTOR_HPP_

View file

@ -12,8 +12,8 @@
// See the License for the specific language governing permissions and // See the License for the specific language governing permissions and
// limitations under the License. // limitations under the License.
#ifndef RTIS_TIMED_EXECUTOR #ifndef PRIORITY_EXECUTOR__PRIORITY_EXECUTOR_HPP_
#define RTIS_TIMED_EXECUTOR #define PRIORITY_EXECUTOR__PRIORITY_EXECUTOR_HPP_
#include <rmw/rmw.h> #include <rmw/rmw.h>
@ -25,59 +25,53 @@
#include "rclcpp/macros.hpp" #include "rclcpp/macros.hpp"
#include "rclcpp/visibility_control.hpp" #include "rclcpp/visibility_control.hpp"
#include "priority_executor/priority_memory_strategy.hpp" #include "priority_executor/priority_memory_strategy.hpp"
#include <priority_executor/default_executor.hpp> #include "priority_executor/default_executor.hpp"
namespace timed_executor
{
/// Single-threaded executor implementation. namespace priority_executor {
/**
/// Single-threaded executor implementation.
/**
* This is the default executor created by rclcpp::spin. * This is the default executor created by rclcpp::spin.
*/ */
class TimedExecutor : public rclcpp::Executor, public RTISTimed class TimedExecutor : public rclcpp::Executor, public RTISTimed {
{ public:
public:
RCLCPP_SMART_PTR_DEFINITIONS(TimedExecutor) RCLCPP_SMART_PTR_DEFINITIONS(TimedExecutor)
/// Default constructor. See the default constructor for Executor. /// Default constructor. See the default constructor for Executor.
RCLCPP_PUBLIC
explicit TimedExecutor( explicit TimedExecutor(
const rclcpp::ExecutorOptions &options = rclcpp::ExecutorOptions(), std::string name = "unnamed executor"); rclcpp::ExecutorOptions const& options = rclcpp::ExecutorOptions(),
std::string name = "unnamed executor");
/// Default destructor. /// Default destructor.
RCLCPP_PUBLIC
virtual ~TimedExecutor(); virtual ~TimedExecutor();
/// Single-threaded implementation of spin. /// Single-threaded implementation of spin.
/** /**
* This function will block until work comes in, execute it, and then repeat * This function will block until work comes in, execute it, and then repeat
* the process until canceled. * the process until canceled.
* It may be interrupt by a call to rclcpp::Executor::cancel() or by ctrl-c * It may be interrupt by a call to rclcpp::Executor::cancel() or by ctrl-c
* if the associated context is configured to shutdown on SIGINT. * if the associated context is configured to shutdown on SIGINT.
* \throws std::runtime_error when spin() called while already spinning * \throws std::runtime_error when spin() called while already spinning
*/ */
RCLCPP_PUBLIC void spin() override;
void
spin() override;
std::string name; std::string name;
void set_use_priorities(bool use_prio); void set_use_priorities(bool use_prio);
std::shared_ptr<PriorityMemoryStrategy<>> prio_memory_strategy_ = nullptr; std::shared_ptr<PriorityMemoryStrategy<>> prio_memory_strategy_{nullptr};
protected: protected:
bool bool get_next_executable(rclcpp::AnyExecutable& any_executable,
get_next_executable(rclcpp::AnyExecutable &any_executable, std::chrono::nanoseconds timeout = std::chrono::nanoseconds(-1)); std::chrono::nanoseconds timeout = std::chrono::nanoseconds(-1));
private: private:
RCLCPP_DISABLE_COPY(TimedExecutor) RCLCPP_DISABLE_COPY(TimedExecutor)
void
wait_for_work(std::chrono::nanoseconds timeout);
bool void wait_for_work(std::chrono::nanoseconds timeout);
get_next_ready_executable(rclcpp::AnyExecutable &any_executable); bool get_next_ready_executable(rclcpp::AnyExecutable& any_executable);
bool use_priorities = true; bool use_priorities{true};
}; };
} // namespace timed_executor } // namespace priority_executor
#endif // RCLCPP__EXECUTORS__SINGLE_THREADED_EXECUTOR_HPP_ #endif // PRIORITY_EXECUTOR__PRIORITY_EXECUTOR_HPP_

View file

@ -1,19 +1,5 @@
// Copyright 2015 Open Source Robotics Foundation, Inc. #ifndef PRIORITY_EXECUTOR__PRIORITY_MEMORY_STRATEGY_HPP_
// #define PRIORITY_EXECUTOR__PRIORITY_MEMORY_STRATEGY_HPP_
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#ifndef RTIS_PRIORITY_STRATEGY
#define RTIS_PRIORITY_STRATEGY
#include <cstdint> #include <cstdint>
#include <deque> #include <deque>
@ -24,383 +10,153 @@
#include <vector> #include <vector>
#include "rcl/allocator.h" #include "rcl/allocator.h"
#include "rclcpp/allocator/allocator_common.hpp" #include "rclcpp/allocator/allocator_common.hpp"
#include "rclcpp/memory_strategy.hpp" #include "rclcpp/memory_strategy.hpp"
#include "simple_timer/rt-sched.hpp" #include "simple_timer/rt-sched.hpp"
/// Delegate for handling memory allocations while the Executor is executing. namespace priority_executor {
/**
* By default, the memory strategy dynamically allocates memory for structures
* that come in from the rmw implementation after the executor waits for work,
* based on the number of entities that come through.
*/
enum ExecutableType enum class ExecutableType {
{ SUBSCRIPTION,
SUBSCRIPTION, SERVICE,
SERVICE, CLIENT,
CLIENT, TIMER,
TIMER, WAITABLE
WAITABLE
}; };
enum ExecutableScheduleType std::ostream& operator << (std::ostream& os, const ExecutableType& obj);
{
DEADLINE = 0, enum class ExecutableScheduleType {
CHAIN_AWARE_PRIORITY, DEADLINE = 0,
CHAIN_INDEPENDENT_PRIORITY, // not used here CHAIN_AWARE_PRIORITY,
DEFAULT, // not used here CHAIN_INDEPENDENT_PRIORITY, // not used here
DEFAULT, // not used here
}; };
class PriorityExecutable std::ostream& operator << (std::ostream& os, const ExecutableScheduleType& obj);
{
class PriorityExecutable {
public: public:
std::shared_ptr<const void> handle; static size_t num_executables;
ExecutableType type;
bool can_be_run = false;
std::shared_ptr<rclcpp::Waitable> waitable;
ExecutableScheduleType sched_type;
int priority = 0; explicit PriorityExecutable(
long period = 1000; // milliseconds std::shared_ptr<const void> h,
int p,
ExecutableType t,
ExecutableScheduleType sched_type = ExecutableScheduleType::CHAIN_INDEPENDENT_PRIORITY);
bool is_first_in_chain = false; PriorityExecutable();
bool is_last_in_chain = false;
// chain aware deadlines
std::deque<uint64_t> *deadlines = nullptr;
std::shared_ptr<rclcpp::TimerBase> timer_handle;
// just used for logging
int chain_id = 0;
// chain aware priority void dont_run();
int counter = 0; void allow_run();
void increment_counter();
bool operator==(PriorityExecutable const& other) const;
friend std::ostream& operator<<(std::ostream& os, PriorityExecutable const& pe);
PriorityExecutable( std::shared_ptr<const void> handle;
std::shared_ptr<const void> h, int p, ExecutableType t, ExecutableType type;
ExecutableScheduleType sched_type = CHAIN_INDEPENDENT_PRIORITY); bool can_be_run = false;
std::shared_ptr<rclcpp::Waitable> waitable;
ExecutableScheduleType sched_type;
void dont_run(); int priority = 0;
long period = 1000; // milliseconds
void allow_run(); bool is_first_in_chain = false;
bool is_last_in_chain = false;
PriorityExecutable(); std::deque<uint64_t>* deadlines = nullptr; // chain aware deadlines
std::shared_ptr<rclcpp::TimerBase> timer_handle;
void increment_counter(); int chain_id = 0; // just used for logging
int counter = 0; // chain aware priority
bool operator==(const PriorityExecutable &other) const; int executable_id = 0;
std::string name = "";
static size_t num_executables;
int executable_id = 0;
std::string name = "";
// stream operator for debug
friend std::ostream &operator<<(std::ostream &os,
const PriorityExecutable &pe)
{
os << "sched_type: " << pe.sched_type << ", ";
if (pe.sched_type == DEADLINE)
{
os << "period: " << pe.period << ", ";
}
os << "priority: " << pe.priority << ", ";
os << "executable_id: " << pe.executable_id << ", ";
os << "chain_id: " << pe.chain_id << ", ";
os << "is_first_in_chain: " << pe.is_first_in_chain << ", ";
os << "is_last_in_chain: " << pe.is_last_in_chain << ", ";
return os;
}
}; };
class PriorityExecutableComparator class PriorityExecutableComparator {
{
public: public:
bool operator()(const PriorityExecutable *p1, const PriorityExecutable *p2); bool operator()(PriorityExecutable const* p1, PriorityExecutable const* p2) const;
}; };
template <typename Alloc = std::allocator<void>> template <typename Alloc = std::allocator<void>>
class PriorityMemoryStrategy : public rclcpp::memory_strategy::MemoryStrategy class PriorityMemoryStrategy : public rclcpp::memory_strategy::MemoryStrategy {
{
public: public:
RCLCPP_SMART_PTR_DEFINITIONS(PriorityMemoryStrategy<Alloc>) RCLCPP_SMART_PTR_DEFINITIONS(PriorityMemoryStrategy<Alloc>)
using VoidAllocTraits = using VoidAllocTraits = typename rclcpp::allocator::AllocRebind<void*, Alloc>;
typename rclcpp::allocator::AllocRebind<void *, Alloc>; using VoidAlloc = typename VoidAllocTraits::allocator_type;
using VoidAlloc = typename VoidAllocTraits::allocator_type;
explicit PriorityMemoryStrategy(std::shared_ptr<Alloc> allocator) explicit PriorityMemoryStrategy(std::shared_ptr<Alloc> allocator);
{ PriorityMemoryStrategy();
allocator_ = std::make_shared<VoidAlloc>(*allocator.get());
logger_ = create_logger();
}
PriorityMemoryStrategy() node_time_logger logger_;
{
allocator_ = std::make_shared<VoidAlloc>();
logger_ = create_logger();
}
node_time_logger logger_;
void // Override methods
add_guard_condition(const rcl_guard_condition_t *guard_condition) override; void add_guard_condition(const rcl_guard_condition_t* guard_condition) override;
void remove_guard_condition(const rcl_guard_condition_t* guard_condition) override;
void clear_handles() override;
void remove_null_handles(rcl_wait_set_t* wait_set) override;
bool collect_entities(const WeakNodeList& weak_nodes) override;
void add_waitable_handle(const rclcpp::Waitable::SharedPtr& waitable) override;
bool add_handles_to_wait_set(rcl_wait_set_t* wait_set) override;
void // Class-specific methods
remove_guard_condition(const rcl_guard_condition_t *guard_condition) override; void get_next_executable(rclcpp::AnyExecutable& any_exec, const WeakNodeList& weak_nodes);
void post_execute(rclcpp::AnyExecutable any_exec, int thread_id = -1);
void clear_handles() override; // Override virtual methods
void get_next_subscription(rclcpp::AnyExecutable& any_exec, const WeakNodeList& weak_nodes) override;
void get_next_service(rclcpp::AnyExecutable& any_exec, const WeakNodeList& weak_nodes) override;
void get_next_client(rclcpp::AnyExecutable& any_exec, const WeakNodeList& weak_nodes) override;
void get_next_timer(rclcpp::AnyExecutable& any_exec, const WeakNodeList& weak_nodes) override;
void get_next_waitable(rclcpp::AnyExecutable& any_exec, const WeakNodeList& weak_nodes) override;
void remove_null_handles(rcl_wait_set_t *wait_set) override; // Priority handling methods
std::shared_ptr<PriorityExecutable> get_priority_settings(std::shared_ptr<const void> executable);
rcl_allocator_t get_allocator() override;
bool collect_entities(const WeakNodeList &weak_nodes) override; // Counter methods
size_t number_of_ready_subscriptions() const override;
size_t number_of_ready_services() const override;
size_t number_of_ready_events() const override;
size_t number_of_ready_clients() const override;
size_t number_of_guard_conditions() const override;
size_t number_of_ready_timers() const override;
size_t number_of_waitables() const override;
void // Executable priority methods
add_waitable_handle(const rclcpp::Waitable::SharedPtr &waitable) override; void set_executable_priority(std::shared_ptr<const void> handle, int priority, ExecutableType t);
void set_executable_priority(std::shared_ptr<const void> handle, int priority, ExecutableType t,
bool add_handles_to_wait_set(rcl_wait_set_t *wait_set) override; ExecutableScheduleType sc, int chain_index);
void set_executable_deadline(std::shared_ptr<const void> handle, int period, ExecutableType t,
void get_next_executable(rclcpp::AnyExecutable &any_exec, int chain_id = 0, std::string name = "");
const WeakNodeList &weak_nodes); int get_priority(std::shared_ptr<const void> executable);
void set_first_in_chain(std::shared_ptr<const void> exec_handle);
/** void set_last_in_chain(std::shared_ptr<const void> exec_handle);
* thread-id is used for logging. if -1, then don't log the thread id void assign_deadlines_queue(std::shared_ptr<const void> exec_handle, std::deque<uint64_t>* deadlines);
*/ std::shared_ptr<std::deque<uint64_t>> get_chain_deadlines(int chain_id);
void post_execute(rclcpp::AnyExecutable any_exec, int thread_id = -1);
void get_next_subscription(rclcpp::AnyExecutable &any_exec,
const WeakNodeList &weak_nodes) override;
void get_next_service(rclcpp::AnyExecutable &any_exec,
const WeakNodeList &weak_nodes) override;
void get_next_client(rclcpp::AnyExecutable &any_exec,
const WeakNodeList &weak_nodes) override;
void get_next_timer(rclcpp::AnyExecutable &any_exec,
const WeakNodeList &weak_nodes) override;
void get_next_waitable(rclcpp::AnyExecutable &any_exec,
const WeakNodeList &weak_nodes) override;
std::shared_ptr<PriorityExecutable>
get_priority_settings(std::shared_ptr<const void> executable)
{
auto search = priority_map.find(executable);
if (search != priority_map.end())
{
return search->second;
}
else
{
return nullptr;
}
}
rcl_allocator_t get_allocator() override
{
return rclcpp::allocator::get_rcl_allocator<void *, VoidAlloc>(
*allocator_.get());
}
size_t number_of_ready_subscriptions() const override
{
size_t number_of_subscriptions = subscription_handles_.size();
// std::cout << "ready_raw: " << number_of_subscriptions << std::endl;
for (auto waitable : waitable_handles_)
{
number_of_subscriptions += waitable->get_number_of_ready_subscriptions();
}
return number_of_subscriptions;
}
size_t number_of_ready_services() const override
{
size_t number_of_services = service_handles_.size();
for (auto waitable : waitable_handles_)
{
number_of_services += waitable->get_number_of_ready_services();
}
return number_of_services;
}
size_t number_of_ready_events() const override
{
size_t number_of_events = 0;
for (auto waitable : waitable_handles_)
{
number_of_events += waitable->get_number_of_ready_events();
}
return number_of_events;
}
size_t number_of_ready_clients() const override
{
size_t number_of_clients = client_handles_.size();
for (auto waitable : waitable_handles_)
{
number_of_clients += waitable->get_number_of_ready_clients();
}
return number_of_clients;
}
size_t number_of_guard_conditions() const override
{
size_t number_of_guard_conditions = guard_conditions_.size();
for (auto waitable : waitable_handles_)
{
number_of_guard_conditions +=
waitable->get_number_of_ready_guard_conditions();
}
return number_of_guard_conditions;
}
size_t number_of_ready_timers() const override
{
size_t number_of_timers = timer_handles_.size();
for (auto waitable : waitable_handles_)
{
number_of_timers += waitable->get_number_of_ready_timers();
}
return number_of_timers;
}
size_t number_of_waitables() const override
{
return waitable_handles_.size();
}
void set_executable_priority(std::shared_ptr<const void> handle, int priority,
ExecutableType t)
{
// TODO: any sanity checks should go here
priority_map[handle] =
std::make_shared<PriorityExecutable>(handle, priority, t);
}
void set_executable_priority(std::shared_ptr<const void> handle, int priority,
ExecutableType t, ExecutableScheduleType sc,
int chain_index)
{
// TODO: any sanity checks should go here
priority_map[handle] =
std::make_shared<PriorityExecutable>(handle, priority, t, sc);
priority_map[handle]->chain_id = chain_index;
}
void set_executable_deadline(std::shared_ptr<const void> handle, int period,
ExecutableType t, int chain_id = 0,
std::string name = "")
{
// TODO: any sanity checks should go here
priority_map[handle] =
std::make_shared<PriorityExecutable>(handle, period, t, DEADLINE);
priority_map[handle]->chain_id = chain_id;
priority_map[handle]->name = name;
// is there a deadline queue for this chain id?
auto search = chain_deadlines.find(chain_id);
if (search == chain_deadlines.end())
{
chain_deadlines[chain_id] = std::make_shared<std::deque<uint64_t>>();
}
priority_map[handle]->deadlines = chain_deadlines[chain_id].get();
}
int get_priority(std::shared_ptr<const void> executable)
{
auto search = priority_map.find(executable);
if (search != priority_map.end())
{
return search->second->priority;
}
else
{
return 0;
}
}
void set_first_in_chain(std::shared_ptr<const void> exec_handle)
{
get_priority_settings(exec_handle)->is_first_in_chain = true;
}
void set_last_in_chain(std::shared_ptr<const void> exec_handle)
{
get_priority_settings(exec_handle)->is_last_in_chain = true;
}
void assign_deadlines_queue(std::shared_ptr<const void> exec_handle,
std::deque<uint64_t> *deadlines)
{
get_priority_settings(exec_handle)->deadlines = deadlines;
}
std::shared_ptr<std::deque<uint64_t>> get_chain_deadlines(int chain_id)
{
auto search = chain_deadlines.find(chain_id);
if (search != chain_deadlines.end())
{
return search->second;
}
else
{
return nullptr;
}
}
private: private:
std::shared_ptr<PriorityExecutable> std::shared_ptr<PriorityExecutable> get_and_reset_priority(std::shared_ptr<const void> executable,
get_and_reset_priority(std::shared_ptr<const void> executable, ExecutableType t);
ExecutableType t)
{
// PriorityExecutable *p = get_priority_settings(executable);
std::shared_ptr<PriorityExecutable> p = get_priority_settings(executable);
if (p == nullptr)
{
// priority_map[executable] = PriorityExecutable(executable, 0, t);
priority_map[executable] =
std::make_shared<PriorityExecutable>(executable, 0, t);
p = priority_map[executable];
}
// p->can_be_run = true;
return p;
}
template <typename T> template <typename T>
using VectorRebind = std::vector< using VectorRebind = std::vector<T, typename std::allocator_traits<Alloc>::template rebind_alloc<T>>;
T, typename std::allocator_traits<Alloc>::template rebind_alloc<T>>;
VectorRebind<const rcl_guard_condition_t *> guard_conditions_; // Member variables
VectorRebind<const rcl_guard_condition_t*> guard_conditions_;
VectorRebind<std::shared_ptr<const rcl_subscription_t>> subscription_handles_;
VectorRebind<std::shared_ptr<const rcl_service_t>> service_handles_;
VectorRebind<std::shared_ptr<const rcl_client_t>> client_handles_;
VectorRebind<std::shared_ptr<const rcl_timer_t>> timer_handles_;
VectorRebind<std::shared_ptr<rclcpp::Waitable>> waitable_handles_;
std::shared_ptr<VoidAlloc> allocator_;
std::map<std::shared_ptr<const void>, std::shared_ptr<PriorityExecutable>> priority_map;
std::map<int, std::shared_ptr<std::deque<uint64_t>>> chain_deadlines;
std::set<const PriorityExecutable*, PriorityExecutableComparator> all_executables_;
VectorRebind<std::shared_ptr<const rcl_subscription_t>> subscription_handles_; void add_executable_to_queue(std::shared_ptr<PriorityExecutable> p);
VectorRebind<std::shared_ptr<const rcl_service_t>> service_handles_;
VectorRebind<std::shared_ptr<const rcl_client_t>> client_handles_;
VectorRebind<std::shared_ptr<const rcl_timer_t>> timer_handles_;
VectorRebind<std::shared_ptr<rclcpp::Waitable>> waitable_handles_;
std::shared_ptr<VoidAlloc> allocator_;
// TODO: evaluate using node/subscription namespaced strings as keys
// holds *all* handle->priority mappings
std::map<std::shared_ptr<const void>, std::shared_ptr<PriorityExecutable>>
priority_map;
std::map<int, std::shared_ptr<std::deque<uint64_t>>> chain_deadlines;
// hold *only valid* executable+priorities
// std::priority_queue<const PriorityExecutable *, std::vector<const
// PriorityExecutable *>, PriorityExecutableComparator> all_executables_;
std::set<const PriorityExecutable *, PriorityExecutableComparator>
all_executables_;
// priority queue doesn't allow iteration. fortunately, std::map is sorted by
// key, so we can replace the priority queue with a map the key will be the
// priority. the value doesn't matter. std::map<const PriorityExecutable *,
// int, PriorityExecutableComparator> all_executables_ = std::map<const
// PriorityExecutable *, int,
// PriorityExecutableComparator>(PriorityExecutableComparator());
void add_executable_to_queue(std::shared_ptr<PriorityExecutable> p);
}; };
#endif // RCLCPP__STRATEGIES__ALLOCATOR_MEMORY_STRATEGY_HPP_ } // namespace priority_executor
#endif // PRIORITY_EXECUTOR__PRIORITY_MEMORY_STRATEGY_HPP_

View file

@ -2,15 +2,18 @@
<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?> <?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
<package format="3"> <package format="3">
<name>priority_executor</name> <name>priority_executor</name>
<version>0.0.0</version> <version>0.1.0</version>
<description>TODO: Package description</description> <description>
<maintainer email="kurt4wilson@gmail.com">kurt</maintainer> ROS 2 package implementing priority-based executors for real-time task scheduling
<license>TODO: License declaration</license> </description>
<maintainer email="your.email@example.com">ROS-Dynamic-Executor Team</maintainer>
<license>Apache License 2.0</license>
<buildtool_depend>ament_cmake</buildtool_depend> <buildtool_depend>ament_cmake</buildtool_depend>
<depend>rclcpp</depend> <depend>rclcpp</depend>
<depend>rcl</depend> <depend>rcl</depend>
<depend>rmw</depend>
<depend>std_msgs</depend> <depend>std_msgs</depend>
<depend>std_srvs</depend> <depend>std_srvs</depend>
<depend>simple_timer</depend> <depend>simple_timer</depend>

View file

@ -13,255 +13,253 @@
// limitations under the License. // limitations under the License.
#include "priority_executor/default_executor.hpp" #include "priority_executor/default_executor.hpp"
#include <chrono>
#include <functional>
#include <memory>
#include <mutex>
#include <sstream>
#include <thread>
#include <unordered_map>
#include "rcpputils/scope_exit.hpp" #include "rcpputils/scope_exit.hpp"
#include "rclcpp/any_executable.hpp" #include "rclcpp/any_executable.hpp"
#include "simple_timer/rt-sched.hpp" #include "simple_timer/rt-sched.hpp"
ROSDefaultExecutor::ROSDefaultExecutor(const rclcpp::ExecutorOptions &options) namespace priority_executor {
: rclcpp::Executor(options)
{ std::unordered_map<ROSDefaultMultithreadedExecutor*,
logger_ = create_logger(); std::shared_ptr<rclcpp::detail::MutexTwoPriorities>>
ROSDefaultMultithreadedExecutor::wait_mutex_set_;
std::mutex ROSDefaultMultithreadedExecutor::shared_wait_mutex_;
ROSDefaultExecutor::ROSDefaultExecutor(rclcpp::ExecutorOptions const& options)
: rclcpp::Executor(options) {
logger_ = create_logger();
} }
ROSDefaultExecutor::~ROSDefaultExecutor() {} ROSDefaultExecutor::~ROSDefaultExecutor() {}
void ROSDefaultExecutor::wait_for_work(std::chrono::nanoseconds timeout) void ROSDefaultExecutor::wait_for_work(std::chrono::nanoseconds timeout) {
{
{
std::unique_lock<std::mutex> lock(memory_strategy_mutex_);
// Collect the subscriptions and timers to be waited on
memory_strategy_->clear_handles();
bool has_invalid_weak_nodes = memory_strategy_->collect_entities(weak_nodes_);
// Clean up any invalid nodes, if they were detected
if (has_invalid_weak_nodes)
{ {
auto node_it = weak_nodes_.begin(); std::unique_lock<std::mutex> lock(memory_strategy_mutex_);
auto gc_it = guard_conditions_.begin();
while (node_it != weak_nodes_.end()) // Collect the subscriptions and timers to be waited on
{ memory_strategy_->clear_handles();
if (node_it->expired()) bool const has_invalid_weak_nodes = memory_strategy_->collect_entities(weak_nodes_);
{
node_it = weak_nodes_.erase(node_it); // Clean up any invalid nodes, if they were detected
memory_strategy_->remove_guard_condition(*gc_it); if (has_invalid_weak_nodes) {
gc_it = guard_conditions_.erase(gc_it); auto node_it = weak_nodes_.begin();
auto gc_it = guard_conditions_.begin();
while (node_it != weak_nodes_.end()) {
if (node_it->expired()) {
node_it = weak_nodes_.erase(node_it);
memory_strategy_->remove_guard_condition(*gc_it);
gc_it = guard_conditions_.erase(gc_it);
} else {
++node_it;
++gc_it;
}
}
} }
else // clear wait set
{ rcl_ret_t ret = rcl_wait_set_clear(&wait_set_);
++node_it; if (ret != RCL_RET_OK) {
++gc_it; rclcpp::exceptions::throw_from_rcl_error(ret, "Couldn't clear wait set");
}
// The size of waitables are accounted for in size of the other entities
ret = rcl_wait_set_resize(
&wait_set_, memory_strategy_->number_of_ready_subscriptions(),
memory_strategy_->number_of_guard_conditions(),
memory_strategy_->number_of_ready_timers(),
memory_strategy_->number_of_ready_clients(),
memory_strategy_->number_of_ready_services(),
memory_strategy_->number_of_ready_events());
if (RCL_RET_OK != ret) {
rclcpp::exceptions::throw_from_rcl_error(ret, "Couldn't resize the wait set");
}
if (!memory_strategy_->add_handles_to_wait_set(&wait_set_)) {
throw std::runtime_error("Couldn't fill wait set");
} }
}
} }
// clear wait set rcl_ret_t const status =
rcl_ret_t ret = rcl_wait_set_clear(&wait_set_); rcl_wait(&wait_set_, std::chrono::duration_cast<std::chrono::nanoseconds>(timeout).count());
if (ret != RCL_RET_OK) if (status == RCL_RET_WAIT_SET_EMPTY) {
{ RCUTILS_LOG_WARN_NAMED(
rclcpp::exceptions::throw_from_rcl_error(ret, "Couldn't clear wait set"); "rclcpp",
"empty wait set received in rcl_wait(). This should never happen.");
} else if (status != RCL_RET_OK && status != RCL_RET_TIMEOUT) {
using rclcpp::exceptions::throw_from_rcl_error;
throw_from_rcl_error(status, "rcl_wait() failed");
} }
// The size of waitables are accounted for in size of the other entities // check the null handles in the wait set and remove them from the handles in memory strategy
ret = rcl_wait_set_resize( // for callback-based entities
&wait_set_, memory_strategy_->number_of_ready_subscriptions(), memory_strategy_->remove_null_handles(&wait_set_);
memory_strategy_->number_of_guard_conditions(), memory_strategy_->number_of_ready_timers(),
memory_strategy_->number_of_ready_clients(), memory_strategy_->number_of_ready_services(),
memory_strategy_->number_of_ready_events());
if (RCL_RET_OK != ret)
{
rclcpp::exceptions::throw_from_rcl_error(ret, "Couldn't resize the wait set");
}
if (!memory_strategy_->add_handles_to_wait_set(&wait_set_))
{
throw std::runtime_error("Couldn't fill wait set");
}
}
rcl_ret_t status =
rcl_wait(&wait_set_, std::chrono::duration_cast<std::chrono::nanoseconds>(timeout).count());
if (status == RCL_RET_WAIT_SET_EMPTY)
{
RCUTILS_LOG_WARN_NAMED(
"rclcpp",
"empty wait set received in rcl_wait(). This should never happen.");
}
else if (status != RCL_RET_OK && status != RCL_RET_TIMEOUT)
{
using rclcpp::exceptions::throw_from_rcl_error;
throw_from_rcl_error(status, "rcl_wait() failed");
}
// check the null handles in the wait set and remove them from the handles in memory strategy
// for callback-based entities
memory_strategy_->remove_null_handles(&wait_set_);
} }
bool ROSDefaultExecutor::get_next_executable(rclcpp::AnyExecutable &any_executable, std::chrono::nanoseconds timeout) bool ROSDefaultExecutor::get_next_executable(rclcpp::AnyExecutable& any_executable,
{ std::chrono::nanoseconds timeout) {
bool success = false; bool success = false;
// Check to see if there are any subscriptions or timers needing service // Check to see if there are any subscriptions or timers needing service
// TODO(wjwwood): improve run to run efficiency of this function // TODO(wjwwood): improve run to run efficiency of this function
// sched_yield(); // sched_yield();
wait_for_work(timeout);
success = get_next_ready_executable(any_executable);
return success;
}
void ROSDefaultExecutor::spin()
{
if (spinning.exchange(true))
{
throw std::runtime_error("spin() called while already spinning");
}
RCPPUTILS_SCOPE_EXIT(this->spinning.store(false););
while (rclcpp::ok(this->context_) && spinning.load())
{
rclcpp::AnyExecutable any_executable;
if (get_next_executable(any_executable))
{
execute_any_executable(any_executable);
}
}
}
bool ROSDefaultMultithreadedExecutor::get_next_executable(rclcpp::AnyExecutable &any_executable, std::chrono::nanoseconds timeout)
{
bool success = false;
// Check to see if there are any subscriptions or timers needing service
// TODO(wjwwood): improve run to run efficiency of this function
// try to get an executable
// record the start time
auto start = std::chrono::steady_clock::now();
success = get_next_ready_executable(any_executable);
// and the end time
auto end = std::chrono::steady_clock::now();
std::stringstream oss;
oss << "{\"operation\":\"get_next_executable\", \"result\":\"" << success << "\", \"duration\":\"" << std::chrono::duration_cast<std::chrono::microseconds>(end - start).count() << "\"}";
log_entry(logger_, oss.str());
// If there are none
if (!success)
{
// Wait for subscriptions or timers to work on
// queue refresh
start = std::chrono::steady_clock::now();
wait_for_work(timeout); wait_for_work(timeout);
// and the end time
end = std::chrono::steady_clock::now();
auto wait_duration = std::chrono::duration_cast<std::chrono::microseconds>(end - start).count();
oss.str("");
oss << "{\"operation\":\"wait_for_work\", \"result\":\"" << success << "\", \"wait_duration\":\"" << wait_duration << "\"}";
log_entry(logger_, oss.str());
if (!spinning.load())
{
return false;
}
// Try again
start = std::chrono::steady_clock::now();
success = get_next_ready_executable(any_executable); success = get_next_ready_executable(any_executable);
// and the end time return success;
end = std::chrono::steady_clock::now();
oss.str("");
oss << "{\"operation\":\"get_next_executable\", \"result\":\"" << success << "\", \"duration\":\"" << std::chrono::duration_cast<std::chrono::microseconds>(end - start).count() << "\"}";
log_entry(logger_, oss.str());
}
return success;
} }
std::unordered_map<ROSDefaultMultithreadedExecutor *, std::shared_ptr<rclcpp::detail::MutexTwoPriorities>> ROSDefaultMultithreadedExecutor::wait_mutex_set_; void ROSDefaultExecutor::spin() {
std::mutex ROSDefaultMultithreadedExecutor::shared_wait_mutex_; if (spinning.exchange(true)) {
throw std::runtime_error("spin() called while already spinning");
}
RCPPUTILS_SCOPE_EXIT(this->spinning.store(false););
while (rclcpp::ok(this->context_) && spinning.load()) {
rclcpp::AnyExecutable any_executable;
if (get_next_executable(any_executable)) {
execute_any_executable(any_executable);
}
}
}
size_t ROSDefaultMultithreadedExecutor::get_number_of_threads() {
return number_of_threads_;
}
ROSDefaultMultithreadedExecutor::ROSDefaultMultithreadedExecutor( ROSDefaultMultithreadedExecutor::ROSDefaultMultithreadedExecutor(
const rclcpp::ExecutorOptions &options, rclcpp::ExecutorOptions const& options,
int number_of_threads, std::chrono::nanoseconds next_exec_timeout) int number_of_threads,
: Executor(options) std::chrono::nanoseconds next_exec_timeout)
{ : Executor(options) {
std::lock_guard<std::mutex> wait_lock(ROSDefaultMultithreadedExecutor::shared_wait_mutex_); std::lock_guard<std::mutex> wait_lock(ROSDefaultMultithreadedExecutor::shared_wait_mutex_);
wait_mutex_set_[this] = std::make_shared<rclcpp::detail::MutexTwoPriorities>(); wait_mutex_set_[this] = std::make_shared<rclcpp::detail::MutexTwoPriorities>();
number_of_threads_ = number_of_threads; number_of_threads_ = number_of_threads;
next_exec_timeout_ = next_exec_timeout; next_exec_timeout_ = next_exec_timeout;
logger_ = create_logger(); logger_ = create_logger();
} }
void ROSDefaultMultithreadedExecutor::spin() void ROSDefaultMultithreadedExecutor::spin() {
{ if (spinning.exchange(true)) {
if (spinning.exchange(true)) throw std::runtime_error("spin() called while already spinning");
{
throw std::runtime_error("spin() called while already spinning");
}
std::vector<std::thread> threads;
size_t thread_id = 0;
{
auto wait_mutex = ROSDefaultMultithreadedExecutor::wait_mutex_set_[this];
auto low_priority_wait_mutex = wait_mutex->get_low_priority_lockable();
std::lock_guard<rclcpp::detail::MutexTwoPriorities::LowPriorityLockable> wait_lock(low_priority_wait_mutex);
for (; thread_id < number_of_threads_ - 1; ++thread_id)
{
auto func = std::bind(&ROSDefaultMultithreadedExecutor::run, this, thread_id);
threads.emplace_back(func);
} }
}
run(thread_id);
for (auto &thread : threads)
{
thread.join();
}
}
void ROSDefaultMultithreadedExecutor::run(__attribute__((unused)) size_t _thread_number) { std::vector<std::thread> threads;
while (rclcpp::ok(this->context_) && spinning.load()) size_t thread_id = 0;
{
rclcpp::AnyExecutable any_exec;
{ {
auto wait_mutex = ROSDefaultMultithreadedExecutor::wait_mutex_set_[this]; auto wait_mutex = ROSDefaultMultithreadedExecutor::wait_mutex_set_[this];
auto low_priority_wait_mutex = wait_mutex->get_low_priority_lockable(); auto low_priority_wait_mutex = wait_mutex->get_low_priority_lockable();
std::lock_guard<rclcpp::detail::MutexTwoPriorities::LowPriorityLockable> wait_lock(low_priority_wait_mutex); std::lock_guard<rclcpp::detail::MutexTwoPriorities::LowPriorityLockable>
if (!rclcpp::ok(this->context_) || !spinning.load()) wait_lock(low_priority_wait_mutex);
{ for (; thread_id < number_of_threads_ - 1; ++thread_id) {
return; auto const func = std::bind(&ROSDefaultMultithreadedExecutor::run, this, thread_id);
} threads.emplace_back(func);
if (!get_next_executable(any_exec, next_exec_timeout_))
{
continue;
}
if (any_exec.timer)
{
// Guard against multiple threads getting the same timer.
if (scheduled_timers_.count(any_exec.timer) != 0)
{
// Make sure that any_exec's callback group is reset before
// the lock is released.
if (any_exec.callback_group)
{
any_exec.callback_group->can_be_taken_from().store(true);
}
continue;
} }
scheduled_timers_.insert(any_exec.timer);
}
} }
// if (yield_before_execute_) run(thread_id);
// { for (auto& thread : threads) {
// std::this_thread::yield(); thread.join();
// }
execute_any_executable(any_exec);
if (any_exec.timer)
{
auto wait_mutex = ROSDefaultMultithreadedExecutor::wait_mutex_set_[this];
auto high_priority_wait_mutex = wait_mutex->get_high_priority_lockable();
std::lock_guard<rclcpp::detail::MutexTwoPriorities::HighPriorityLockable> wait_lock(high_priority_wait_mutex);
auto it = scheduled_timers_.find(any_exec.timer);
if (it != scheduled_timers_.end())
{
scheduled_timers_.erase(it);
}
} }
// Clear the callback_group to prevent the AnyExecutable destructor from
// resetting the callback group `can_be_taken_from`
any_exec.callback_group.reset();
}
} }
void ROSDefaultMultithreadedExecutor::run(size_t thread_number) {
while (rclcpp::ok(this->context_) && spinning.load()) {
rclcpp::AnyExecutable any_exec;
{
auto wait_mutex = ROSDefaultMultithreadedExecutor::wait_mutex_set_[this];
auto low_priority_wait_mutex = wait_mutex->get_low_priority_lockable();
std::lock_guard<rclcpp::detail::MutexTwoPriorities::LowPriorityLockable>
wait_lock(low_priority_wait_mutex);
if (!rclcpp::ok(this->context_) || !spinning.load()) {
return;
}
if (!get_next_executable(any_exec, next_exec_timeout_)) {
continue;
}
if (any_exec.timer) {
// Guard against multiple threads getting the same timer.
if (scheduled_timers_.count(any_exec.timer) != 0) {
// Make sure that any_exec's callback group is reset before
// the lock is released.
if (any_exec.callback_group) {
any_exec.callback_group->can_be_taken_from().store(true);
}
continue;
}
scheduled_timers_.insert(any_exec.timer);
}
}
execute_any_executable(any_exec);
if (any_exec.timer) {
auto wait_mutex = ROSDefaultMultithreadedExecutor::wait_mutex_set_[this];
auto high_priority_wait_mutex = wait_mutex->get_high_priority_lockable();
std::lock_guard<rclcpp::detail::MutexTwoPriorities::HighPriorityLockable>
wait_lock(high_priority_wait_mutex);
auto const it = scheduled_timers_.find(any_exec.timer);
if (it != scheduled_timers_.end()) {
scheduled_timers_.erase(it);
}
}
// Clear the callback_group to prevent the AnyExecutable destructor from
// resetting the callback group `can_be_taken_from`
any_exec.callback_group.reset();
}
}
bool ROSDefaultMultithreadedExecutor::get_next_executable(rclcpp::AnyExecutable& any_executable,
std::chrono::nanoseconds timeout) {
bool success = false;
// Check to see if there are any subscriptions or timers needing service
// TODO(wjwwood): improve run to run efficiency of this function
// try to get an executable
// record the start time
auto const start = std::chrono::steady_clock::now();
success = get_next_ready_executable(any_executable);
// and the end time
auto const end = std::chrono::steady_clock::now();
std::stringstream oss;
oss << "{\"operation\":\"get_next_executable\", \"result\":\"" << success
<< "\", \"duration\":\""
<< std::chrono::duration_cast<std::chrono::microseconds>(end - start).count()
<< "\"}";
log_entry(logger_, oss.str());
// If there are none
if (!success) {
// Wait for subscriptions or timers to work on
// queue refresh
auto const wait_start = std::chrono::steady_clock::now();
wait_for_work(timeout);
// and the end time
auto const wait_end = std::chrono::steady_clock::now();
auto const wait_duration = std::chrono::duration_cast<std::chrono::microseconds>(
wait_end - wait_start).count();
oss.str("");
oss << "{\"operation\":\"wait_for_work\", \"result\":\"" << success
<< "\", \"wait_duration\":\"" << wait_duration << "\"}";
log_entry(logger_, oss.str());
if (!spinning.load()) {
return false;
}
// Try again
auto const retry_start = std::chrono::steady_clock::now();
success = get_next_ready_executable(any_executable);
// and the end time
auto const retry_end = std::chrono::steady_clock::now();
oss.str("");
oss << "{\"operation\":\"get_next_executable\", \"result\":\"" << success
<< "\", \"duration\":\""
<< std::chrono::duration_cast<std::chrono::microseconds>(
retry_end - retry_start).count()
<< "\"}";
log_entry(logger_, oss.str());
}
return success;
}
} // namespace priority_executor

View file

@ -1,114 +1,103 @@
#include "priority_executor/multithread_priority_executor.hpp" #include "priority_executor/multithread_priority_executor.hpp"
namespace timed_executor
{ #include <chrono>
std::unordered_map<MultithreadTimedExecutor *, std::shared_ptr<rclcpp::detail::MutexTwoPriorities>> #include <functional>
MultithreadTimedExecutor::wait_mutex_set_; #include <memory>
std::mutex MultithreadTimedExecutor::shared_wait_mutex_; #include <mutex>
MultithreadTimedExecutor::MultithreadTimedExecutor( #include <sstream>
const rclcpp::ExecutorOptions &options, #include <thread>
std::string name, #include <unordered_map>
int number_of_threads, std::chrono::nanoseconds next_exec_timeout)
: TimedExecutor(options, name) #include "rcpputils/scope_exit.hpp"
{ #include "rclcpp/any_executable.hpp"
std::lock_guard<std::mutex> wait_lock(MultithreadTimedExecutor::shared_wait_mutex_); #include "simple_timer/rt-sched.hpp"
wait_mutex_set_[this] = std::make_shared<rclcpp::detail::MutexTwoPriorities>();
number_of_threads_ = number_of_threads; namespace priority_executor {
next_exec_timeout_ = next_exec_timeout;
logger_ = create_logger(); std::unordered_map<MultithreadTimedExecutor*,
std::shared_ptr<rclcpp::detail::MutexTwoPriorities>>
MultithreadTimedExecutor::wait_mutex_set_;
std::mutex MultithreadTimedExecutor::shared_wait_mutex_;
MultithreadTimedExecutor::MultithreadTimedExecutor(
rclcpp::ExecutorOptions const& options,
std::string const &name,
int number_of_threads,
std::chrono::nanoseconds next_exec_timeout)
: TimedExecutor(options, name) {
std::lock_guard<std::mutex> wait_lock(MultithreadTimedExecutor::shared_wait_mutex_);
wait_mutex_set_[this] = std::make_shared<rclcpp::detail::MutexTwoPriorities>();
number_of_threads_ = number_of_threads;
next_exec_timeout_ = next_exec_timeout;
logger_ = create_logger();
}
void MultithreadTimedExecutor::spin() {
if (spinning.exchange(true)) {
throw std::runtime_error("spin() called while already spinning");
} }
size_t MultithreadTimedExecutor::get_number_of_threads() std::vector<std::thread> threads;
size_t thread_id = 0;
{ {
return number_of_threads_; auto wait_mutex = MultithreadTimedExecutor::wait_mutex_set_[this];
} auto low_priority_wait_mutex = wait_mutex->get_low_priority_lockable();
std::lock_guard<rclcpp::detail::MutexTwoPriorities::LowPriorityLockable>
void MultithreadTimedExecutor::spin() wait_lock(low_priority_wait_mutex);
{ for (; thread_id < number_of_threads_ - 1; ++thread_id) {
if (spinning.exchange(true)) auto const func = std::bind(&MultithreadTimedExecutor::run, this, thread_id);
{ threads.emplace_back(func);
throw std::runtime_error("spin() called while already spinning");
} }
}
run(thread_id);
for (auto& thread : threads) {
thread.join();
}
}
std::vector<std::thread> threads; void MultithreadTimedExecutor::run(size_t _thread_number) {
size_t thread_id = 0; while (rclcpp::ok(this->context_) && spinning.load()) {
rclcpp::AnyExecutable any_exec;
{ {
auto wait_mutex = MultithreadTimedExecutor::wait_mutex_set_[this]; auto wait_mutex = MultithreadTimedExecutor::wait_mutex_set_[this];
auto low_priority_wait_mutex = wait_mutex->get_low_priority_lockable(); auto low_priority_wait_mutex = wait_mutex->get_low_priority_lockable();
std::lock_guard<rclcpp::detail::MutexTwoPriorities::LowPriorityLockable> wait_lock(low_priority_wait_mutex); std::lock_guard<rclcpp::detail::MutexTwoPriorities::LowPriorityLockable>
for (; thread_id < number_of_threads_ - 1; ++thread_id) wait_lock(low_priority_wait_mutex);
{ if (!rclcpp::ok(this->context_) || !spinning.load()) {
auto func = std::bind(&MultithreadTimedExecutor::run, this, thread_id); return;
threads.emplace_back(func);
} }
} if (!get_next_executable(any_exec, next_exec_timeout_)) {
run(thread_id); continue;
for (auto &thread : threads) }
{ if (any_exec.timer) {
thread.join(); // Guard against multiple threads getting the same timer.
} if (scheduled_timers_.count(any_exec.timer) != 0) {
} // Make sure that any_exec's callback group is reset before
// the lock is released.
void MultithreadTimedExecutor::run(size_t thread_number) if (any_exec.callback_group) {
{ any_exec.callback_group->can_be_taken_from().store(true);
// set affinity }
cpu_set_t cpuset;
CPU_ZERO(&cpuset);
CPU_SET(thread_number, &cpuset);
int rc = pthread_setaffinity_np(pthread_self(), sizeof(cpu_set_t), &cpuset);
if (rc != 0)
{
std::cout << "Error calling pthread_setaffinity_np: " << rc << "\n";
}
while (rclcpp::ok(this->context_) && spinning.load())
{
rclcpp::AnyExecutable any_executable;
{
auto wait_mutex = MultithreadTimedExecutor::wait_mutex_set_[this];
auto low_priority_wait_mutex = wait_mutex->get_low_priority_lockable();
std::lock_guard<rclcpp::detail::MutexTwoPriorities::LowPriorityLockable> wait_lock(low_priority_wait_mutex);
if (!rclcpp::ok(this->context_) || !spinning.load())
{
return;
}
if (!get_next_executable(any_executable, next_exec_timeout_))
{
continue; continue;
} }
if (any_executable.timer) scheduled_timers_.insert(any_exec.timer);
{
if (scheduled_timers_.count(any_executable.timer) != 0)
{
if (any_executable.callback_group)
{
any_executable.callback_group->can_be_taken_from().store(true);
}
continue;
}
scheduled_timers_.insert(any_executable.timer);
}
}
execute_any_executable(any_executable);
if (any_executable.timer)
{
auto wait_mutex = MultithreadTimedExecutor::wait_mutex_set_[this];
auto high_priority_wait_mutex = wait_mutex->get_high_priority_lockable();
std::lock_guard<rclcpp::detail::MutexTwoPriorities::HighPriorityLockable> wait_lock(high_priority_wait_mutex);
auto it = scheduled_timers_.find(any_executable.timer);
if (it != scheduled_timers_.end())
{
scheduled_timers_.erase(it);
}
}
any_executable.callback_group.reset();
if (prio_memory_strategy_ != nullptr)
{
auto wait_mutex = MultithreadTimedExecutor::wait_mutex_set_[this];
auto low_priority_wait_mutex = wait_mutex->get_low_priority_lockable();
std::lock_guard<rclcpp::detail::MutexTwoPriorities::LowPriorityLockable> wait_lock(low_priority_wait_mutex);
// std::shared_ptr<PriorityMemoryStrategy<>> prio_memory_strategy_ = std::dynamic_pointer_cast<PriorityMemoryStrategy>(memory_strategy_);
prio_memory_strategy_->post_execute(any_executable, thread_number);
} }
} }
execute_any_executable(any_exec);
if (any_exec.timer) {
auto wait_mutex = MultithreadTimedExecutor::wait_mutex_set_[this];
auto high_priority_wait_mutex = wait_mutex->get_high_priority_lockable();
std::lock_guard<rclcpp::detail::MutexTwoPriorities::HighPriorityLockable>
wait_lock(high_priority_wait_mutex);
auto const it = scheduled_timers_.find(any_exec.timer);
if (it != scheduled_timers_.end()) {
scheduled_timers_.erase(it);
}
}
// Clear the callback_group to prevent the AnyExecutable destructor from
// resetting the callback group `can_be_taken_from`
any_exec.callback_group.reset();
} }
} }
} // namespace priority_executor

View file

@ -19,236 +19,226 @@
#include "rclcpp/utilities.hpp" #include "rclcpp/utilities.hpp"
#include <memory> #include <memory>
#include <sched.h> #include <sched.h>
// for sleep #include <unistd.h> // for sleep
#include <unistd.h>
namespace timed_executor namespace priority_executor
{ {
TimedExecutor::TimedExecutor(const rclcpp::ExecutorOptions &options, std::string name) TimedExecutor::TimedExecutor(rclcpp::ExecutorOptions const& options, std::string name)
: rclcpp::Executor(options) : rclcpp::Executor(options) {
{ this->name = std::move(name);
this->name = name;
logger_ = create_logger(); logger_ = create_logger();
} }
TimedExecutor::~TimedExecutor() {} TimedExecutor::~TimedExecutor() = default;
void void TimedExecutor::spin() {
TimedExecutor::spin() if (spinning.exchange(true)) {
{ throw std::runtime_error("spin() called while already spinning");
if (spinning.exchange(true))
{
throw std::runtime_error("spin() called while already spinning");
} }
RCLCPP_SCOPE_EXIT(this->spinning.store(false);); RCLCPP_SCOPE_EXIT(this->spinning.store(false););
while (rclcpp::ok(this->context_) && spinning.load())
{
rclcpp::AnyExecutable any_executable;
// std::cout<<memory_strategy_->number_of_ready_timers()<<std::endl;
// std::cout << "spinning " << this->name << std::endl;
// size_t ready = memory_strategy_->number_of_ready_subscriptions();
// std::cout << "ready:" << ready << std::endl;
if (get_next_executable(any_executable, std::chrono::nanoseconds(-1))) while (rclcpp::ok(this->context_) && spinning.load()) {
{ rclcpp::AnyExecutable any_executable;
execute_any_executable(any_executable);
// make sure memory_strategy_ is an instance of PriorityMemoryStrategy if (get_next_executable(any_executable, std::chrono::nanoseconds(-1))) {
if (prio_memory_strategy_!=nullptr) execute_any_executable(any_executable);
{ if (prio_memory_strategy_ != nullptr) {
prio_memory_strategy_->post_execute(any_executable); prio_memory_strategy_->post_execute(any_executable);
}
} }
}
} }
RCLCPP_INFO(rclcpp::get_logger("priority_executor"), "priority executor shutdown"); RCLCPP_INFO(rclcpp::get_logger("priority_executor"), "priority executor shutdown");
} }
bool TimedExecutor::get_next_executable(rclcpp::AnyExecutable &any_executable, std::chrono::nanoseconds timeout) bool TimedExecutor::get_next_executable(
{ rclcpp::AnyExecutable& any_executable,
std::chrono::nanoseconds timeout) {
bool success = false; bool success = false;
// Check to see if there are any subscriptions or timers needing service
// TODO(wjwwood): improve run to run efficiency of this function auto const start = std::chrono::steady_clock::now();
// sched_yield();
// sleep for 10us
// usleep(20);
auto start = std::chrono::steady_clock::now();
wait_for_work(timeout); wait_for_work(timeout);
auto end = std::chrono::steady_clock::now(); auto const end = std::chrono::steady_clock::now();
auto wait_duration = std::chrono::duration_cast<std::chrono::microseconds>(end - start); auto const wait_duration = std::chrono::duration_cast<std::chrono::microseconds>(end - start);
std::ostringstream oss; std::ostringstream oss;
oss << "{\"operation\": \"wait_for_work\", \"wait_duration\": " << wait_duration.count() << "}"; oss << "{\"operation\": \"wait_for_work\", \"wait_duration\": " << wait_duration.count() << "}";
log_entry(logger_, oss.str()); log_entry(logger_, oss.str());
start = std::chrono::steady_clock::now(); auto const exec_start = std::chrono::steady_clock::now();
success = get_next_ready_executable(any_executable); success = get_next_ready_executable(any_executable);
end = std::chrono::steady_clock::now(); auto const exec_end = std::chrono::steady_clock::now();
auto get_next_duration = std::chrono::duration_cast<std::chrono::microseconds>(end - start); auto const get_next_duration = std::chrono::duration_cast<std::chrono::microseconds>(exec_end - exec_start);
oss.str(""); oss.str("");
oss << "{\"operation\": \"get_next_executable\", \"duration\": " << get_next_duration.count() << ", \"result\": " << success << "}"; oss << "{\"operation\": \"get_next_executable\", \"duration\": " << get_next_duration.count()
<< ", \"result\": " << success << "}";
log_entry(logger_, oss.str()); log_entry(logger_, oss.str());
return success; return success;
} }
// TODO: since we're calling this more often, clean it up a bit // TODO: since we're calling this more often, clean it up a bit
void void
TimedExecutor::wait_for_work(std::chrono::nanoseconds timeout) TimedExecutor::wait_for_work(std::chrono::nanoseconds timeout)
{ {
{ {
std::unique_lock<std::mutex> lock(memory_strategy_mutex_); std::unique_lock<std::mutex> lock(memory_strategy_mutex_);
// Collect the subscriptions and timers to be waited on // Collect the subscriptions and timers to be waited on
memory_strategy_->clear_handles(); memory_strategy_->clear_handles();
bool has_invalid_weak_nodes = memory_strategy_->collect_entities(weak_nodes_); bool has_invalid_weak_nodes = memory_strategy_->collect_entities(weak_nodes_);
// Clean up any invalid nodes, if they were detected // Clean up any invalid nodes, if they were detected
if (has_invalid_weak_nodes) if (has_invalid_weak_nodes)
{
auto node_it = weak_nodes_.begin();
auto gc_it = guard_conditions_.begin();
while (node_it != weak_nodes_.end())
{ {
if (node_it->expired()) auto node_it = weak_nodes_.begin();
{ auto gc_it = guard_conditions_.begin();
node_it = weak_nodes_.erase(node_it); while (node_it != weak_nodes_.end())
memory_strategy_->remove_guard_condition(*gc_it); {
gc_it = guard_conditions_.erase(gc_it); if (node_it->expired())
} {
else node_it = weak_nodes_.erase(node_it);
{ memory_strategy_->remove_guard_condition(*gc_it);
++node_it; gc_it = guard_conditions_.erase(gc_it);
++gc_it; }
} else
{
++node_it;
++gc_it;
}
}
}
// clear wait set
rcl_ret_t ret = rcl_wait_set_clear(&wait_set_);
if (ret != RCL_RET_OK)
{
rclcpp::exceptions::throw_from_rcl_error(ret, "Couldn't clear wait set");
} }
}
// clear wait set
rcl_ret_t ret = rcl_wait_set_clear(&wait_set_);
if (ret != RCL_RET_OK)
{
rclcpp::exceptions::throw_from_rcl_error(ret, "Couldn't clear wait set");
}
// The size of waitables are accounted for in size of the other entities // The size of waitables are accounted for in size of the other entities
ret = rcl_wait_set_resize( ret = rcl_wait_set_resize(
&wait_set_, memory_strategy_->number_of_ready_subscriptions(), &wait_set_, memory_strategy_->number_of_ready_subscriptions(),
memory_strategy_->number_of_guard_conditions(), memory_strategy_->number_of_ready_timers(), memory_strategy_->number_of_guard_conditions(), memory_strategy_->number_of_ready_timers(),
memory_strategy_->number_of_ready_clients(), memory_strategy_->number_of_ready_services(), memory_strategy_->number_of_ready_clients(), memory_strategy_->number_of_ready_services(),
memory_strategy_->number_of_ready_events()); memory_strategy_->number_of_ready_events());
if (RCL_RET_OK != ret) if (RCL_RET_OK != ret)
{ {
rclcpp::exceptions::throw_from_rcl_error(ret, "Couldn't resize the wait set"); rclcpp::exceptions::throw_from_rcl_error(ret, "Couldn't resize the wait set");
} }
if (!memory_strategy_->add_handles_to_wait_set(&wait_set_)) if (!memory_strategy_->add_handles_to_wait_set(&wait_set_))
{ {
throw std::runtime_error("Couldn't fill wait set"); throw std::runtime_error("Couldn't fill wait set");
} }
} }
rcl_ret_t status = rcl_ret_t status =
rcl_wait(&wait_set_, std::chrono::duration_cast<std::chrono::nanoseconds>(timeout).count()); rcl_wait(&wait_set_, std::chrono::duration_cast<std::chrono::nanoseconds>(timeout).count());
if (status == RCL_RET_WAIT_SET_EMPTY) if (status == RCL_RET_WAIT_SET_EMPTY)
{ {
RCUTILS_LOG_WARN_NAMED( RCUTILS_LOG_WARN_NAMED(
"rclcpp", "rclcpp",
"empty wait set received in rcl_wait(). This should never happen."); "empty wait set received in rcl_wait(). This should never happen.");
} }
else if (status != RCL_RET_OK && status != RCL_RET_TIMEOUT) else if (status != RCL_RET_OK && status != RCL_RET_TIMEOUT)
{ {
using rclcpp::exceptions::throw_from_rcl_error; using rclcpp::exceptions::throw_from_rcl_error;
throw_from_rcl_error(status, "rcl_wait() failed"); throw_from_rcl_error(status, "rcl_wait() failed");
} }
// check the null handles in the wait set and remove them from the handles in memory strategy // check the null handles in the wait set and remove them from the handles in memory strategy
// for callback-based entities // for callback-based entities
memory_strategy_->remove_null_handles(&wait_set_); memory_strategy_->remove_null_handles(&wait_set_);
} }
bool bool
TimedExecutor::get_next_ready_executable(rclcpp::AnyExecutable &any_executable) TimedExecutor::get_next_ready_executable(rclcpp::AnyExecutable &any_executable)
{ {
bool success = false; bool success = false;
if (use_priorities) if (use_priorities)
{ {
std::shared_ptr<PriorityMemoryStrategy<>> strat = std::dynamic_pointer_cast<PriorityMemoryStrategy<>>(memory_strategy_); std::shared_ptr<PriorityMemoryStrategy<>> strat = std::dynamic_pointer_cast<PriorityMemoryStrategy<>>(memory_strategy_);
strat->get_next_executable(any_executable, weak_nodes_); strat->get_next_executable(any_executable, weak_nodes_);
if (any_executable.timer || any_executable.subscription || any_executable.service || any_executable.client || any_executable.waitable) if (any_executable.timer || any_executable.subscription || any_executable.service || any_executable.client || any_executable.waitable)
{ {
success = true; success = true;
} }
} }
else else
{ {
// Check the timers to see if there are any that are ready // Check the timers to see if there are any that are ready
memory_strategy_->get_next_timer(any_executable, weak_nodes_); memory_strategy_->get_next_timer(any_executable, weak_nodes_);
if (any_executable.timer) if (any_executable.timer)
{
std::cout << "got timer" << std::endl;
success = true;
}
if (!success)
{
// Check the subscriptions to see if there are any that are ready
memory_strategy_->get_next_subscription(any_executable, weak_nodes_);
if (any_executable.subscription)
{ {
// std::cout << "got subs" << std::endl; std::cout << "got timer" << std::endl;
success = true; success = true;
} }
} if (!success)
if (!success)
{
// Check the services to see if there are any that are ready
memory_strategy_->get_next_service(any_executable, weak_nodes_);
if (any_executable.service)
{ {
std::cout << "got serv" << std::endl; // Check the subscriptions to see if there are any that are ready
success = true; memory_strategy_->get_next_subscription(any_executable, weak_nodes_);
if (any_executable.subscription)
{
// std::cout << "got subs" << std::endl;
success = true;
}
} }
} if (!success)
if (!success)
{
// Check the clients to see if there are any that are ready
memory_strategy_->get_next_client(any_executable, weak_nodes_);
if (any_executable.client)
{ {
std::cout << "got client" << std::endl; // Check the services to see if there are any that are ready
success = true; memory_strategy_->get_next_service(any_executable, weak_nodes_);
if (any_executable.service)
{
std::cout << "got serv" << std::endl;
success = true;
}
} }
} if (!success)
if (!success)
{
// Check the waitables to see if there are any that are ready
memory_strategy_->get_next_waitable(any_executable, weak_nodes_);
if (any_executable.waitable)
{ {
std::cout << "got wait" << std::endl; // Check the clients to see if there are any that are ready
success = true; memory_strategy_->get_next_client(any_executable, weak_nodes_);
if (any_executable.client)
{
std::cout << "got client" << std::endl;
success = true;
}
}
if (!success)
{
// Check the waitables to see if there are any that are ready
memory_strategy_->get_next_waitable(any_executable, weak_nodes_);
if (any_executable.waitable)
{
std::cout << "got wait" << std::endl;
success = true;
}
} }
}
} }
// At this point any_exec should be valid with either a valid subscription // At this point any_exec should be valid with either a valid subscription
// or a valid timer, or it should be a null shared_ptr // or a valid timer, or it should be a null shared_ptr
if (success) if (success)
{ {
// If it is valid, check to see if the group is mutually exclusive or // If it is valid, check to see if the group is mutually exclusive or
// not, then mark it accordingly // not, then mark it accordingly
using rclcpp::callback_group::CallbackGroupType; using rclcpp::callback_group::CallbackGroupType;
if ( if (
any_executable.callback_group && any_executable.callback_group &&
any_executable.callback_group->type() == rclcpp::CallbackGroupType::MutuallyExclusive) any_executable.callback_group->type() == rclcpp::CallbackGroupType::MutuallyExclusive)
{ {
// It should not have been taken otherwise // It should not have been taken otherwise
assert(any_executable.callback_group->can_be_taken_from().load()); assert(any_executable.callback_group->can_be_taken_from().load());
// Set to false to indicate something is being run from this group // Set to false to indicate something is being run from this group
// This is reset to true either when the any_exec is executed or when the // This is reset to true either when the any_exec is executed or when the
// any_exec is destructued // any_exec is destructued
any_executable.callback_group->can_be_taken_from().store(false); any_executable.callback_group->can_be_taken_from().store(false);
} }
} }
// If there is no ready executable, return false // If there is no ready executable, return false
return success; return success;
} }
void TimedExecutor::set_use_priorities(bool use_prio) void TimedExecutor::set_use_priorities(bool use_prio)
{ {
use_priorities = use_prio; use_priorities = use_prio;
} }
} // namespace timed_executor } // namespace timed_executor

View file

@ -4,233 +4,479 @@
#include <rclcpp/logging.hpp> #include <rclcpp/logging.hpp>
#include <semaphore.h> #include <semaphore.h>
#include <sstream> #include <sstream>
using namespace priority_executor;
template<typename Alloc>
PriorityMemoryStrategy<Alloc>::PriorityMemoryStrategy(std::shared_ptr<Alloc> allocator)
{
allocator_ = std::make_shared<VoidAlloc>(*allocator.get());
logger_ = create_logger();
}
template<>
PriorityMemoryStrategy<>::PriorityMemoryStrategy()
{
allocator_ = std::make_shared<VoidAlloc>();
logger_ = create_logger();
}
std::ostream& priority_executor::operator<<(std::ostream& os, const ExecutableType& obj)
{
os << static_cast<std::underlying_type<ExecutableType>::type>(obj);
return os;
}
std::ostream& priority_executor::operator<<(std::ostream& os, const ExecutableScheduleType& obj)
{
os << static_cast<std::underlying_type<ExecutableScheduleType>::type>(obj);
return os;
}
std::ostream &operator<<(std::ostream &os, const PriorityExecutable &pe)
{
os << "sched_type: " << pe.sched_type << ", ";
if (pe.sched_type == ExecutableScheduleType::DEADLINE)
{
os << "period: " << pe.period << ", ";
}
os << "priority: " << pe.priority << ", ";
os << "executable_id: " << pe.executable_id << ", ";
os << "chain_id: " << pe.chain_id << ", ";
os << "is_first_in_chain: " << pe.is_first_in_chain << ", ";
os << "is_last_in_chain: " << pe.is_last_in_chain << ", ";
return os;
}
size_t PriorityExecutable::num_executables; size_t PriorityExecutable::num_executables;
PriorityExecutable::PriorityExecutable(std::shared_ptr<const void> h, int p, template<>
ExecutableType t, std::shared_ptr<PriorityExecutable>
ExecutableScheduleType sched_type) { PriorityMemoryStrategy<>::get_priority_settings(std::shared_ptr<const void> executable)
std::cout << "priority_executable constructor called" << std::endl; {
std::cout << "type: " << t << std::endl; auto search = priority_map.find(executable);
handle = h; if (search != priority_map.end())
type = t; {
if (sched_type == CHAIN_INDEPENDENT_PRIORITY || return search->second;
sched_type == CHAIN_AWARE_PRIORITY) {
priority = p;
} else if (sched_type == DEADLINE) {
period = p;
// as a tiebreaker
priority = num_executables;
} }
this->sched_type = sched_type; return nullptr;
this->executable_id = num_executables;
num_executables += 1;
} }
PriorityExecutable::PriorityExecutable() { template<>
handle = nullptr; std::shared_ptr<PriorityExecutable>
priority = 0; PriorityMemoryStrategy<>::get_and_reset_priority(std::shared_ptr<const void> executable,
type = SUBSCRIPTION; ExecutableType t)
{
std::shared_ptr<PriorityExecutable> p = get_priority_settings(executable);
if (p == nullptr)
{
priority_map[executable] =
std::make_shared<PriorityExecutable>(executable, 0, t);
p = priority_map[executable];
}
return p;
} }
void PriorityExecutable::dont_run() { this->can_be_run = false; } template<>
rcl_allocator_t PriorityMemoryStrategy<>::get_allocator()
{
return rclcpp::allocator::get_rcl_allocator<void *, VoidAlloc>(
*allocator_.get());
}
void PriorityExecutable::allow_run() { this->can_be_run = true; } template<>
size_t PriorityMemoryStrategy<>::number_of_ready_subscriptions() const
{
size_t number_of_subscriptions = subscription_handles_.size();
// std::cout << "ready_raw: " << number_of_subscriptions << std::endl;
for (auto waitable : waitable_handles_)
{
number_of_subscriptions += waitable->get_number_of_ready_subscriptions();
}
return number_of_subscriptions;
}
void PriorityExecutable::increment_counter() { this->counter += 1; } template<>
size_t PriorityMemoryStrategy<>::number_of_ready_services() const
{
size_t number_of_services = service_handles_.size();
for (auto waitable : waitable_handles_)
{
number_of_services += waitable->get_number_of_ready_services();
}
return number_of_services;
}
bool PriorityExecutable::operator==(const PriorityExecutable &other) const { template<>
std::cout << "PriorityExecutable::operator== called" << std::endl; size_t PriorityMemoryStrategy<>::number_of_ready_events() const
if (this->handle == other.handle) { {
return true; size_t number_of_events = 0;
} else { for (auto waitable : waitable_handles_)
return false; {
number_of_events += waitable->get_number_of_ready_events();
}
return number_of_events;
}
template<>
size_t PriorityMemoryStrategy<>::number_of_ready_clients() const
{
size_t number_of_clients = client_handles_.size();
for (auto waitable : waitable_handles_)
{
number_of_clients += waitable->get_number_of_ready_clients();
}
return number_of_clients;
}
template<>
size_t PriorityMemoryStrategy<>::number_of_guard_conditions() const
{
size_t number_of_guard_conditions = guard_conditions_.size();
for (auto waitable : waitable_handles_)
{
number_of_guard_conditions +=
waitable->get_number_of_ready_guard_conditions();
}
return number_of_guard_conditions;
}
template<>
size_t PriorityMemoryStrategy<>::number_of_ready_timers() const
{
size_t number_of_timers = timer_handles_.size();
for (auto waitable : waitable_handles_)
{
number_of_timers += waitable->get_number_of_ready_timers();
}
return number_of_timers;
}
template<>
size_t PriorityMemoryStrategy<>::number_of_waitables() const
{
return waitable_handles_.size();
}
template<>
void PriorityMemoryStrategy<>::set_executable_priority(std::shared_ptr<const void> handle, int priority,
ExecutableType t)
{
// TODO: any sanity checks should go here
priority_map[handle] =
std::make_shared<PriorityExecutable>(handle, priority, t);
}
template<>
void PriorityMemoryStrategy<>::set_executable_priority(std::shared_ptr<const void> handle, int priority,
ExecutableType t, ExecutableScheduleType sc,
int chain_index)
{
// TODO: any sanity checks should go here
priority_map[handle] =
std::make_shared<PriorityExecutable>(handle, priority, t, sc);
priority_map[handle]->chain_id = chain_index;
}
template<>
void PriorityMemoryStrategy<>::set_executable_deadline(std::shared_ptr<const void> handle, int period,
ExecutableType t, int chain_id,
std::string name)
{
// TODO: any sanity checks should go here
priority_map[handle] =
std::make_shared<PriorityExecutable>(handle, period, t, ExecutableScheduleType::DEADLINE);
priority_map[handle]->chain_id = chain_id;
priority_map[handle]->name = name;
// is there a deadline queue for this chain id?
auto search = chain_deadlines.find(chain_id);
if (search == chain_deadlines.end())
{
chain_deadlines[chain_id] = std::make_shared<std::deque<uint64_t>>();
}
priority_map[handle]->deadlines = chain_deadlines[chain_id].get();
}
template<>
int PriorityMemoryStrategy<>::get_priority(std::shared_ptr<const void> executable)
{
auto search = priority_map.find(executable);
if (search != priority_map.end())
{
return search->second->priority;
}
else
{
return 0;
} }
} }
bool PriorityExecutableComparator::operator()(const PriorityExecutable *p1, template<>
const PriorityExecutable *p2) { void PriorityMemoryStrategy<>::set_first_in_chain(std::shared_ptr<const void> exec_handle)
// since this will be used in a std::set, also check for equality {
if (p1 == nullptr || p2 == nullptr) { get_priority_settings(exec_handle)->is_first_in_chain = true;
// TODO: realistic value }
std::cout << "PriorityExecutableComparator::operator() called with nullptr"
<< std::endl;
return false;
}
if (p1->handle == p2->handle) {
return false;
}
if (p1->executable_id == p2->executable_id) {
return false;
}
if (p1->sched_type != p2->sched_type) { template<>
// in order: DEADLINE, CHAIN_AWARE, CHAIN_INDEPENDENT void PriorityMemoryStrategy<>::set_last_in_chain(std::shared_ptr<const void> exec_handle)
return p1->sched_type < p2->sched_type; {
} get_priority_settings(exec_handle)->is_last_in_chain = true;
if (p1->sched_type == CHAIN_INDEPENDENT_PRIORITY) { }
if (p1->priority == p2->priority) {
return p1->executable_id < p2->executable_id;
}
// lower value runs first
return p1->priority < p2->priority;
}
if (p1->sched_type == CHAIN_AWARE_PRIORITY) {
if (p1->priority == p2->priority) {
return p1->executable_id < p2->executable_id;
}
return p1->priority < p2->priority;
}
if (p1->sched_type == DEADLINE) {
// TODO: use the counter logic here as well
uint64_t p1_deadline = 0; template<>
uint64_t p2_deadline = 0; void PriorityMemoryStrategy<>::assign_deadlines_queue(std::shared_ptr<const void> exec_handle,
if (p1->deadlines != nullptr && !p1->deadlines->empty()) { std::deque<uint64_t> *deadlines)
p1_deadline = p1->deadlines->front(); {
get_priority_settings(exec_handle)->deadlines = deadlines;
}
template<>
std::shared_ptr<std::deque<uint64_t>> PriorityMemoryStrategy<>::get_chain_deadlines(int chain_id)
{
auto search = chain_deadlines.find(chain_id);
if (search != chain_deadlines.end())
{
return search->second;
}
else
{
return nullptr;
}
}
PriorityExecutable::PriorityExecutable(
std::shared_ptr<void const> h,
int p,
ExecutableType t,
ExecutableScheduleType sched_type) {
handle = std::move(h);
std::cout << "priority_executable constructor called" << std::endl;
std::cout << "type: " << t << std::endl;
type = t;
if (sched_type == ExecutableScheduleType::CHAIN_INDEPENDENT_PRIORITY
|| sched_type == ExecutableScheduleType::CHAIN_AWARE_PRIORITY) {
priority = p;
} else if (sched_type == ExecutableScheduleType::DEADLINE) {
period = p;
// as a tiebreaker
priority = num_executables;
} }
if (p2->deadlines != nullptr && !p2->deadlines->empty()) {
p2_deadline = p2->deadlines->front(); this->sched_type = sched_type;
this->executable_id = num_executables;
num_executables += 1;
}
PriorityExecutable::PriorityExecutable()
: handle(nullptr)
, type(ExecutableType::SUBSCRIPTION)
, priority(0) {
}
void PriorityExecutable::dont_run() {
this->can_be_run = false;
}
void PriorityExecutable::allow_run() {
this->can_be_run = true;
}
void PriorityExecutable::increment_counter() {
this->counter += 1;
}
bool PriorityExecutable::operator==(PriorityExecutable const& other) const {
std::cout << "PriorityExecutable::operator== called" << std::endl;
return this->handle == other.handle;
}
bool PriorityExecutableComparator::operator()(
PriorityExecutable const *p1,
PriorityExecutable const *p2) const
{
if (p1 == nullptr || p2 == nullptr) {
return false;
} }
if (p1_deadline == p2_deadline || p1->deadlines == p2->deadlines) {
// this looks bad and is bad, BUT if (p1->handle == p2->handle || p1->executable_id == p2->executable_id) {
// if we tell std::set these are equal, only one will be added, and we return false;
// will lose the other. we need _something_ to make them unique since we'd
// rather finish a chain than start a new one, select the higher id return
// p1->executable_id > p2->executable_id;
// return p1->priority < p2->priority;
if (p1->priority != p2->priority) {
return p1->priority < p2->priority;
}
return p1->executable_id < p2->executable_id;
} }
if (p1_deadline == 0) {
p1_deadline = std::numeric_limits<uint64_t>::max(); if (p1->sched_type != p2->sched_type) {
// in order: DEADLINE, CHAIN_AWARE, CHAIN_INDEPENDENT
return p1->sched_type < p2->sched_type;
} }
if (p2_deadline == 0) {
p2_deadline = std::numeric_limits<uint64_t>::max(); if (p1->sched_type == ExecutableScheduleType::CHAIN_INDEPENDENT_PRIORITY) {
return p1->priority == p2->priority ?
p1->executable_id < p2->executable_id :
p1->priority < p2->priority;
} }
return p1_deadline < p2_deadline;
} else { if (p1->sched_type == ExecutableScheduleType::CHAIN_AWARE_PRIORITY) {
return p1->priority == p2->priority ?
p1->executable_id < p2->executable_id :
// tie-breaker: lower value runs first
p1->priority < p2->priority;
}
if (p1->sched_type == ExecutableScheduleType::DEADLINE) {
uint64_t p1_deadline = 0;
uint64_t p2_deadline = 0;
if (p1->deadlines != nullptr && !p1->deadlines->empty()) {
p1_deadline = p1->deadlines->front();
}
if (p2->deadlines != nullptr && !p2->deadlines->empty()) {
p2_deadline = p2->deadlines->front();
}
if (p1_deadline == p2_deadline || p1->deadlines == p2->deadlines) {
// this looks bad and is bad, BUT
// if we tell std::set these are equal, only one will be added, and we
// will lose the other. we need _something_ to make them unique since we'd
// rather finish a chain than start a new one, select the higher id return
// p1->executable_id > p2->executable_id;
// return p1->priority < p2->priority;
if (p1->priority != p2->priority) {
return p1->priority < p2->priority;
}
return p1->executable_id < p2->executable_id;
}
if (p1_deadline == 0) {
p1_deadline = std::numeric_limits<uint64_t>::max();
}
if (p2_deadline == 0) {
p2_deadline = std::numeric_limits<uint64_t>::max();
}
return p1_deadline < p2_deadline;
}
std::cout << "invalid compare opration on priority_exec" << std::endl; std::cout << "invalid compare opration on priority_exec" << std::endl;
return false; return false;
}
} }
template <> template <>
void PriorityMemoryStrategy<>::add_guard_condition( void PriorityMemoryStrategy<>::add_guard_condition(
const rcl_guard_condition_t *guard_condition) { rcl_guard_condition_t const* guard_condition) {
for (const auto &existing_guard_condition : guard_conditions_) { for (auto const& existing_guard_condition : guard_conditions_) {
if (existing_guard_condition == guard_condition) { if (existing_guard_condition == guard_condition) {
return; return;
}
} }
} guard_conditions_.push_back(guard_condition);
guard_conditions_.push_back(guard_condition);
} }
template <> template <>
void PriorityMemoryStrategy<>::remove_guard_condition( void PriorityMemoryStrategy<>::remove_guard_condition(
const rcl_guard_condition_t *guard_condition) { rcl_guard_condition_t const* guard_condition) {
for (auto it = guard_conditions_.begin(); it != guard_conditions_.end(); for (auto it = guard_conditions_.begin(); it != guard_conditions_.end(); ++it) {
++it) { if (*it == guard_condition) {
if (*it == guard_condition) { guard_conditions_.erase(it);
guard_conditions_.erase(it); break;
break; }
} }
}
} }
template <> template <>
void PriorityMemoryStrategy<>::add_executable_to_queue( void PriorityMemoryStrategy<>::add_executable_to_queue(
std::shared_ptr<PriorityExecutable> e) { std::shared_ptr<PriorityExecutable> e) {
// e may have changed. remove and re-add // e may have changed. remove and re-add
all_executables_.erase(e.get()); all_executables_.erase(e.get());
// convert from shared_ptr to raw pointer // convert from shared_ptr to raw pointer
all_executables_.insert(e.get()); all_executables_.insert(e.get());
} }
template <> void PriorityMemoryStrategy<>::clear_handles() { template <>
subscription_handles_.clear(); void PriorityMemoryStrategy<>::clear_handles() {
service_handles_.clear(); subscription_handles_.clear();
client_handles_.clear(); service_handles_.clear();
timer_handles_.clear(); client_handles_.clear();
waitable_handles_.clear(); timer_handles_.clear();
waitable_handles_.clear();
// all_executables_ = std::priority_queue<const PriorityExecutable *, // all_executables_ = std::priority_queue<const PriorityExecutable *,
// std::vector<const PriorityExecutable *>, PriorityExecutableComparator>(); // std::vector<const PriorityExecutable *>, PriorityExecutableComparator>();
// all_executables_.clear(); // all_executables_.clear();
// all_executables_ = std::map<const PriorityExecutable *, int, // all_executables_ = std::map<const PriorityExecutable *, int,
// PriorityExecutableComparator>(PriorityExecutableComparator()); // PriorityExecutableComparator>(PriorityExecutableComparator());
} }
template <> template <>
void PriorityMemoryStrategy<>::remove_null_handles(rcl_wait_set_t *wait_set) { void PriorityMemoryStrategy<>::remove_null_handles(rcl_wait_set_t* wait_set) {
// TODO(jacobperron): Check if wait set sizes are what we expect them to be? // TODO(jacobperron): Check if wait set sizes are what we expect them to be?
// e.g. wait_set->size_of_clients == client_handles_.size() // e.g. wait_set->size_of_clients == client_handles_.size()
// Important to use subscription_handles_.size() instead of wait set's size // Important to use subscription_handles_.size() instead of wait set's size
// since there may be more subscriptions in the wait set due to Waitables // since there may be more subscriptions in the wait set due to Waitables
// added to the end. The same logic applies for other entities. // added to the end. The same logic applies for other entities.
for (size_t i = 0; i < subscription_handles_.size(); ++i) { for (size_t i = 0; i < subscription_handles_.size(); ++i) {
if (!wait_set->subscriptions[i]) { if (!wait_set->subscriptions[i]) {
priority_map[subscription_handles_[i]]->dont_run(); priority_map[subscription_handles_[i]]->dont_run();
subscription_handles_[i].reset(); subscription_handles_[i].reset();
} else { } else {
priority_map[subscription_handles_[i]]->allow_run(); priority_map[subscription_handles_[i]]->allow_run();
}
} }
}
for (size_t i = 0; i < service_handles_.size(); ++i) { for (size_t i = 0; i < service_handles_.size(); ++i) {
if (!wait_set->services[i]) { if (!wait_set->services[i]) {
priority_map[service_handles_[i]]->dont_run(); priority_map[service_handles_[i]]->dont_run();
service_handles_[i].reset(); service_handles_[i].reset();
} else { } else {
priority_map[service_handles_[i]]->allow_run(); priority_map[service_handles_[i]]->allow_run();
}
} }
}
for (size_t i = 0; i < client_handles_.size(); ++i) { for (size_t i = 0; i < client_handles_.size(); ++i) {
if (!wait_set->clients[i]) { if (!wait_set->clients[i]) {
priority_map[client_handles_[i]]->dont_run(); priority_map[client_handles_[i]]->dont_run();
client_handles_[i].reset(); client_handles_[i].reset();
} else { } else {
priority_map[client_handles_[i]]->allow_run(); priority_map[client_handles_[i]]->allow_run();
}
} }
}
for (size_t i = 0; i < timer_handles_.size(); ++i) { for (size_t i = 0; i < timer_handles_.size(); ++i) {
if (!wait_set->timers[i]) { if (!wait_set->timers[i]) {
priority_map[timer_handles_[i]]->dont_run(); priority_map[timer_handles_[i]]->dont_run();
timer_handles_[i].reset(); timer_handles_[i].reset();
} else { } else {
priority_map[timer_handles_[i]]->allow_run(); priority_map[timer_handles_[i]]->allow_run();
}
} }
}
for (size_t i = 0; i < waitable_handles_.size(); ++i) { for (size_t i = 0; i < waitable_handles_.size(); ++i) {
if (!waitable_handles_[i]->is_ready(wait_set)) { if (!waitable_handles_[i]->is_ready(wait_set)) {
priority_map[waitable_handles_[i]]->dont_run(); priority_map[waitable_handles_[i]]->dont_run();
waitable_handles_[i].reset(); waitable_handles_[i].reset();
} else { } else {
priority_map[waitable_handles_[i]]->allow_run(); priority_map[waitable_handles_[i]]->allow_run();
}
} }
}
subscription_handles_.erase(std::remove(subscription_handles_.begin(), subscription_handles_.erase(std::remove(subscription_handles_.begin(),
subscription_handles_.end(), nullptr), subscription_handles_.end(), nullptr),
subscription_handles_.end()); subscription_handles_.end());
service_handles_.erase( service_handles_.erase(
std::remove(service_handles_.begin(), service_handles_.end(), nullptr), std::remove(service_handles_.begin(), service_handles_.end(), nullptr),
service_handles_.end()); service_handles_.end());
client_handles_.erase( client_handles_.erase(
std::remove(client_handles_.begin(), client_handles_.end(), nullptr), std::remove(client_handles_.begin(), client_handles_.end(), nullptr),
client_handles_.end()); client_handles_.end());
timer_handles_.erase( timer_handles_.erase(
std::remove(timer_handles_.begin(), timer_handles_.end(), nullptr), std::remove(timer_handles_.begin(), timer_handles_.end(), nullptr),
timer_handles_.end()); timer_handles_.end());
waitable_handles_.erase( waitable_handles_.erase(
std::remove(waitable_handles_.begin(), waitable_handles_.end(), nullptr), std::remove(waitable_handles_.begin(), waitable_handles_.end(), nullptr),
waitable_handles_.end()); waitable_handles_.end());
} }
template <> template <>
@ -256,33 +502,33 @@ bool PriorityMemoryStrategy<>::collect_entities(
auto subscription_handle = subscription->get_subscription_handle(); auto subscription_handle = subscription->get_subscription_handle();
subscription_handles_.push_back(subscription_handle); subscription_handles_.push_back(subscription_handle);
add_executable_to_queue( add_executable_to_queue(
get_and_reset_priority(subscription_handle, SUBSCRIPTION)); get_and_reset_priority(subscription_handle, ExecutableType::SUBSCRIPTION));
return false; return false;
}); });
group->find_service_ptrs_if( group->find_service_ptrs_if(
[this](const rclcpp::ServiceBase::SharedPtr &service) { [this](const rclcpp::ServiceBase::SharedPtr &service) {
add_executable_to_queue( add_executable_to_queue(
get_and_reset_priority(service->get_service_handle(), SERVICE)); get_and_reset_priority(service->get_service_handle(), ExecutableType::SERVICE));
service_handles_.push_back(service->get_service_handle()); service_handles_.push_back(service->get_service_handle());
return false; return false;
}); });
group->find_client_ptrs_if( group->find_client_ptrs_if(
[this](const rclcpp::ClientBase::SharedPtr &client) { [this](const rclcpp::ClientBase::SharedPtr &client) {
add_executable_to_queue( add_executable_to_queue(
get_and_reset_priority(client->get_client_handle(), CLIENT)); get_and_reset_priority(client->get_client_handle(), ExecutableType::CLIENT));
client_handles_.push_back(client->get_client_handle()); client_handles_.push_back(client->get_client_handle());
return false; return false;
}); });
group->find_timer_ptrs_if( group->find_timer_ptrs_if(
[this](const rclcpp::TimerBase::SharedPtr &timer) { [this](const rclcpp::TimerBase::SharedPtr &timer) {
add_executable_to_queue( add_executable_to_queue(
get_and_reset_priority(timer->get_timer_handle(), TIMER)); get_and_reset_priority(timer->get_timer_handle(), ExecutableType::TIMER));
timer_handles_.push_back(timer->get_timer_handle()); timer_handles_.push_back(timer->get_timer_handle());
return false; return false;
}); });
group->find_waitable_ptrs_if( group->find_waitable_ptrs_if(
[this](const rclcpp::Waitable::SharedPtr &waitable) { [this](const rclcpp::Waitable::SharedPtr &waitable) {
add_executable_to_queue(get_and_reset_priority(waitable, WAITABLE)); add_executable_to_queue(get_and_reset_priority(waitable, ExecutableType::WAITABLE));
waitable_handles_.push_back(waitable); waitable_handles_.push_back(waitable);
return false; return false;
}); });
@ -376,7 +622,7 @@ void PriorityMemoryStrategy<>::get_next_executable(
} }
ExecutableType type = next_exec->type; ExecutableType type = next_exec->type;
switch (type) { switch (type) {
case SUBSCRIPTION: { case ExecutableType::SUBSCRIPTION: {
std::shared_ptr<const rcl_subscription_t> subs_handle = std::shared_ptr<const rcl_subscription_t> subs_handle =
std::static_pointer_cast<const rcl_subscription_t>(next_exec->handle); std::static_pointer_cast<const rcl_subscription_t>(next_exec->handle);
auto subscription = get_subscription_by_handle(subs_handle, weak_nodes); auto subscription = get_subscription_by_handle(subs_handle, weak_nodes);
@ -408,7 +654,7 @@ void PriorityMemoryStrategy<>::get_next_executable(
// subscription->get_topic_name() << std::endl; // subscription->get_topic_name() << std::endl;
} }
} break; } break;
case SERVICE: { case ExecutableType::SERVICE: {
std::shared_ptr<const rcl_service_t> service_handle = std::shared_ptr<const rcl_service_t> service_handle =
std::static_pointer_cast<const rcl_service_t>(next_exec->handle); std::static_pointer_cast<const rcl_service_t>(next_exec->handle);
auto service = get_service_by_handle(service_handle, weak_nodes); auto service = get_service_by_handle(service_handle, weak_nodes);
@ -433,7 +679,7 @@ void PriorityMemoryStrategy<>::get_next_executable(
// service->get_service_name() << std::endl; // service->get_service_name() << std::endl;
} }
} break; } break;
case CLIENT: { case ExecutableType::CLIENT: {
std::shared_ptr<const rcl_client_t> client_handle = std::shared_ptr<const rcl_client_t> client_handle =
std::static_pointer_cast<const rcl_client_t>(next_exec->handle); std::static_pointer_cast<const rcl_client_t>(next_exec->handle);
auto client = get_client_by_handle(client_handle, weak_nodes); auto client = get_client_by_handle(client_handle, weak_nodes);
@ -458,7 +704,7 @@ void PriorityMemoryStrategy<>::get_next_executable(
// client->get_service_name() << std::endl; // client->get_service_name() << std::endl;
} }
} break; } break;
case TIMER: { case ExecutableType::TIMER: {
std::shared_ptr<const rcl_timer_t> timer_handle = std::shared_ptr<const rcl_timer_t> timer_handle =
std::static_pointer_cast<const rcl_timer_t>(next_exec->handle); std::static_pointer_cast<const rcl_timer_t>(next_exec->handle);
auto timer = get_timer_by_handle(timer_handle, weak_nodes); auto timer = get_timer_by_handle(timer_handle, weak_nodes);
@ -487,7 +733,7 @@ void PriorityMemoryStrategy<>::get_next_executable(
any_exec.node_base = get_node_by_group(group, weak_nodes); any_exec.node_base = get_node_by_group(group, weak_nodes);
} }
} break; } break;
case WAITABLE: { case ExecutableType::WAITABLE: {
std::shared_ptr<rclcpp::Waitable> waitable_handle = std::shared_ptr<rclcpp::Waitable> waitable_handle =
std::static_pointer_cast<rclcpp::Waitable>(next_exec->waitable); std::static_pointer_cast<rclcpp::Waitable>(next_exec->waitable);
auto waitable = waitable_handle; auto waitable = waitable_handle;
@ -516,7 +762,7 @@ void PriorityMemoryStrategy<>::get_next_executable(
continue; continue;
// break; // break;
} }
if (next_exec->is_first_in_chain && next_exec->sched_type == DEADLINE) { if (next_exec->is_first_in_chain && next_exec->sched_type == ExecutableScheduleType::DEADLINE) {
// std::cout << "running first in chain deadline" << std::endl; // std::cout << "running first in chain deadline" << std::endl;
} }
/* std::ostringstream oss; /* std::ostringstream oss;
@ -576,7 +822,8 @@ void PriorityMemoryStrategy<>::post_execute(rclcpp::AnyExecutable any_exec,
// callback has executed // callback has executed
// std::cout<< "running callback. first? " << next_exec->is_first_in_chain << // std::cout<< "running callback. first? " << next_exec->is_first_in_chain <<
// " type " << next_exec->sched_type << std::endl; // " type " << next_exec->sched_type << std::endl;
if (next_exec->is_first_in_chain && next_exec->sched_type != DEADLINE) { if (next_exec->is_first_in_chain
&& next_exec->sched_type != ExecutableScheduleType::DEADLINE) {
/* /*
timespec current_time; timespec current_time;
clock_gettime(CLOCK_MONOTONIC_RAW, &current_time); clock_gettime(CLOCK_MONOTONIC_RAW, &current_time);
@ -590,7 +837,8 @@ void PriorityMemoryStrategy<>::post_execute(rclcpp::AnyExecutable any_exec,
std::to_string(time_until_next_call) << std::endl; std::to_string(time_until_next_call) << std::endl;
*/ */
} }
if (next_exec->is_last_in_chain && next_exec->sched_type == DEADLINE) { if (next_exec->is_last_in_chain
&& next_exec->sched_type == ExecutableScheduleType::DEADLINE) {
// did we make the deadline? // did we make the deadline?
timespec current_time; timespec current_time;
clock_gettime(CLOCK_MONOTONIC_RAW, &current_time); clock_gettime(CLOCK_MONOTONIC_RAW, &current_time);
@ -678,8 +926,8 @@ void PriorityMemoryStrategy<>::post_execute(rclcpp::AnyExecutable any_exec,
if (!next_exec->deadlines->empty()) if (!next_exec->deadlines->empty())
next_exec->deadlines->pop_front(); next_exec->deadlines->pop_front();
} }
if (next_exec->sched_type == CHAIN_AWARE_PRIORITY || if (next_exec->sched_type == ExecutableScheduleType::CHAIN_AWARE_PRIORITY
next_exec->sched_type == DEADLINE) { || next_exec->sched_type == ExecutableScheduleType::DEADLINE) {
// this is safe, since we popped it earlier // this is safe, since we popped it earlier
// get a mutable reference // get a mutable reference
// TODO: find a cleaner way to do this // TODO: find a cleaner way to do this
@ -856,4 +1104,4 @@ void PriorityMemoryStrategy<>::get_next_waitable(
// Else, the waitable is no longer valid, remove it and continue // Else, the waitable is no longer valid, remove it and continue
it = waitable_handles_.erase(it); it = waitable_handles_.erase(it);
} }
} }

View file

@ -1,105 +1,45 @@
#include "priority_executor/priority_executor.hpp" // Copyright 2015 Open Source Robotics Foundation, Inc.
#include "priority_executor/priority_memory_strategy.hpp" //
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include <chrono>
#include <memory>
#include <string>
#include <thread>
#include "priority_executor/default_executor.hpp"
#include "priority_executor/multithread_priority_executor.hpp"
#include "rclcpp/rclcpp.hpp" #include "rclcpp/rclcpp.hpp"
#include "std_msgs/msg/string.hpp" #include "std_msgs/msg/string.hpp"
#include <fstream>
#include <string>
#include <unistd.h>
// re-create the classic talker-listener example with two listeners using namespace std::chrono_literals;
class Talker : public rclcpp::Node
{
public:
Talker() : Node("talker")
{
// Create a publisher on the "chatter" topic with 10 msg queue size.
pub_ = this->create_publisher<std_msgs::msg::String>("chatter", 10);
// Create a timer of period 1s that calls our callback member function.
timer_ = this->create_wall_timer(std::chrono::seconds(1),
std::bind(&Talker::timer_callback, this));
}
// the timer must be public
rclcpp::TimerBase::SharedPtr timer_;
private: void chatter_callback(std_msgs::msg::String::SharedPtr const msg) {
void timer_callback() RCLCPP_INFO(rclcpp::get_logger("rclcpp"), "I heard: '%s'", msg->data.c_str());
{ }
std_msgs::msg::String msg;
msg.data = "Hello World!"; int main(int argc, char* argv[]) {
RCLCPP_INFO(this->get_logger(), "Publishing: '%s'", msg.data.c_str()); rclcpp::init(argc, argv);
pub_->publish(msg);
} auto node = rclcpp::Node::make_shared("minimal_subscriber");
rclcpp::Publisher<std_msgs::msg::String>::SharedPtr pub_;
}; auto subscription = node->create_subscription<std_msgs::msg::String>(
"topic", 10, chatter_callback);
class Listener : public rclcpp::Node
{ auto executor = std::make_shared<priority_executor::ROSDefaultExecutor>();
public: executor->add_node(node);
Listener(std::string name) : Node(name) executor->spin();
{
// Create a subscription on the "chatter" topic with the default callback rclcpp::shutdown();
// method. return 0;
sub_ = this->create_subscription<std_msgs::msg::String>(
"chatter", 10,
std::bind(&Listener::callback, this, std::placeholders::_1));
}
// the publisher must be public
rclcpp::Subscription<std_msgs::msg::String>::SharedPtr sub_;
private:
void callback(const std_msgs::msg::String::SharedPtr msg)
{
RCLCPP_INFO(this->get_logger(), "I heard: '%s'", msg->data.c_str());
}
};
int main(int argc, char **argv)
{
rclcpp::init(argc, argv);
auto talker = std::make_shared<Talker>();
auto listener1 = std::make_shared<Listener>("listener1");
auto listener2 = std::make_shared<Listener>("listener2");
rclcpp::ExecutorOptions options;
auto strategy = std::make_shared<PriorityMemoryStrategy<>>();
options.memory_strategy = strategy;
auto executor = new timed_executor::TimedExecutor(options);
// must be set to post_execute can set new deadlines
executor->prio_memory_strategy_ = strategy;
// the new funcitons in PriorityMemoryStrategy accept the handle of the
// timer/subscription as the first argument
strategy->set_executable_deadline(talker->timer_->get_timer_handle(), 1000,
TIMER, 0);
// you _must_ set the timer_handle for each chain
strategy->get_priority_settings(talker->timer_->get_timer_handle())
->timer_handle = talker->timer_;
// you _must_ mark the first executable in the chain
strategy->set_first_in_chain(talker->timer_->get_timer_handle());
// set the same period and chain_id for each callback in the chain
strategy->set_executable_deadline(listener1->sub_->get_subscription_handle(),
1000, SUBSCRIPTION, 0);
strategy->set_executable_deadline(listener2->sub_->get_subscription_handle(),
1000, SUBSCRIPTION, 0);
// you _must_ mark the last executable in the chain (used to keep track of different instances of the same chain)
strategy->set_last_in_chain(listener2->sub_->get_subscription_handle());
// add all the nodes to the executor
executor->add_node(talker);
executor->add_node(listener1);
executor->add_node(listener2);
// if the executor behaves unexpectedly, you can print the priority settings to make sure they are correct
std::cout << *strategy->get_priority_settings(
talker->timer_->get_timer_handle())
<< std::endl;
std::cout << *strategy->get_priority_settings(
listener1->sub_->get_subscription_handle())
<< std::endl;
std::cout << *strategy->get_priority_settings(
listener2->sub_->get_subscription_handle())
<< std::endl;
executor->spin();
} }

View file

@ -1,48 +1,51 @@
cmake_minimum_required(VERSION 3.5) cmake_minimum_required(VERSION 3.8)
project(simple_timer) project(simple_timer VERSION 0.1.0)
# Default to C99 # Set C++ standards
if(NOT CMAKE_C_STANDARD) set(CMAKE_CXX_STANDARD 17)
set(CMAKE_C_STANDARD 99) set(CMAKE_CXX_STANDARD_REQUIRED ON)
endif() set(CMAKE_CXX_EXTENSIONS OFF)
# Default to C++14
if(NOT CMAKE_CXX_STANDARD)
set(CMAKE_CXX_STANDARD 14)
endif()
# Compiler options
if(CMAKE_COMPILER_IS_GNUCXX OR CMAKE_CXX_COMPILER_ID MATCHES "Clang") if(CMAKE_COMPILER_IS_GNUCXX OR CMAKE_CXX_COMPILER_ID MATCHES "Clang")
add_compile_options(-Wall -Wextra -Wpedantic) add_compile_options(-Wall -Wextra -Wpedantic)
endif() endif()
# find dependencies # Find dependencies
find_package(ament_cmake REQUIRED) find_package(ament_cmake REQUIRED)
# uncomment the following section in order to fill in find_package(rclcpp REQUIRED)
# further dependencies manually.
# find_package(<dependency> REQUIRED)
add_library(simple_timer src/cycle_timer.cpp src/period_timer.cpp) # Library targets
add_library(simple_timer
src/cycle_timer.cpp
src/period_timer.cpp
)
target_include_directories(simple_timer PUBLIC target_include_directories(simple_timer PUBLIC
$<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}/include> $<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}/include>
$<INSTALL_INTERFACE:include> $<INSTALL_INTERFACE:include>
) )
add_library(rt-sched src/rt-sched.cpp) ament_target_dependencies(simple_timer
rclcpp
)
add_library(rt-sched
src/rt-sched.cpp
)
target_include_directories(rt-sched PUBLIC target_include_directories(rt-sched PUBLIC
$<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}/include> $<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}/include>
$<INSTALL_INTERFACE:include> $<INSTALL_INTERFACE:include>
) )
ament_target_dependencies(rt-sched
rclcpp
)
# Testing
if(BUILD_TESTING) if(BUILD_TESTING)
find_package(ament_lint_auto REQUIRED) find_package(ament_lint_auto REQUIRED)
# the following line skips the linter which checks for copyrights
# uncomment the line when a copyright and license is not present in all source files
#set(ament_cmake_copyright_FOUND TRUE)
# the following line skips cpplint (only works in a git repo)
# uncomment the line when this package is not in a git repo
#set(ament_cmake_cpplint_FOUND TRUE)
ament_lint_auto_find_test_dependencies() ament_lint_auto_find_test_dependencies()
endif() endif()
# Installation
install( install(
DIRECTORY include/ DIRECTORY include/
DESTINATION include DESTINATION include
@ -50,12 +53,15 @@ install(
install( install(
TARGETS simple_timer rt-sched TARGETS simple_timer rt-sched
EXPORT export_${PROJECT_NAME}
LIBRARY DESTINATION lib LIBRARY DESTINATION lib
ARCHIVE DESTINATION lib ARCHIVE DESTINATION lib
RUNTIME DESTINATION bin RUNTIME DESTINATION bin
INCLUDES DESTINATION include
) )
# Export and package configuration
ament_export_include_directories(include) ament_export_include_directories(include)
ament_export_libraries(simple_timer) ament_export_targets(export_${PROJECT_NAME} HAS_LIBRARY_TARGET)
ament_export_libraries(rt-sched) ament_export_dependencies(rclcpp)
ament_package() ament_package()

View file

@ -3,21 +3,23 @@
#include <memory> #include <memory>
#include "simple_timer/rt-sched.hpp" #include "simple_timer/rt-sched.hpp"
namespace simple_timer
{ namespace simple_timer {
class CycleTimer
{ class CycleTimer {
public: public:
CycleTimer(long start_delay=0); explicit CycleTimer(long const start_delay = 0);
void tick() ; void tick();
const u64 start_delay_time;
u64 start_time = 0; u64 const start_delay_time;
u64 last_cycle_time = 0; u64 start_time = 0;
unsigned long max_diff = 0; u64 last_cycle_time = 0;
unsigned long min_diff = 0; unsigned long max_diff = 0;
unsigned long last_diff = 0; unsigned long min_diff = 0;
bool recording = false; unsigned long last_diff = 0;
}; bool recording = false;
};
} // namespace simple_timer } // namespace simple_timer
#endif #endif

View file

@ -3,23 +3,24 @@
#include <memory> #include <memory>
#include "simple_timer/rt-sched.hpp" #include "simple_timer/rt-sched.hpp"
namespace simple_timer
{
class PeriodTimer
{
public:
PeriodTimer(long start_delay = 0);
void start();
void stop();
const u64 start_delay_time;
u64 start_time = 0;
u64 last_period_time = 0; namespace simple_timer {
unsigned long max_period = 0;
unsigned long min_period = 0; class PeriodTimer {
unsigned long last_period = 0; public:
bool recording = false; explicit PeriodTimer(long const start_delay = 0);
}; void start();
void stop();
u64 const start_delay_time;
u64 start_time = 0;
u64 last_period_time = 0;
unsigned long max_period = 0;
unsigned long min_period = 0;
unsigned long last_period = 0;
bool recording = false;
};
} // namespace simple_timer } // namespace simple_timer
#endif #endif

View file

@ -22,77 +22,76 @@
#endif #endif
#ifdef __x86_64__ #ifdef __x86_64__
#define __NR_sched_setattr 314 #define __NR_sched_setattr 314
#define __NR_sched_getattr 315 #define __NR_sched_getattr 315
#endif #endif
#ifdef __i386__ #ifdef __i386__
#define __NR_sched_setattr 351 #define __NR_sched_setattr 351
#define __NR_sched_getattr 352 #define __NR_sched_getattr 352
#endif #endif
#ifdef __arm__ #ifdef __arm__
#ifndef __NR_sched_setattr #ifndef __NR_sched_setattr
#define __NR_sched_setattr 380 #define __NR_sched_setattr 380
#endif #endif
#ifndef __NR_sched_getattr #ifndef __NR_sched_getattr
#define __NR_sched_getattr 381 #define __NR_sched_getattr 381
#endif #endif
#endif #endif
#ifdef __tilegx__ #ifdef __tilegx__
#define __NR_sched_setattr 274 #define __NR_sched_setattr 274
#define __NR_sched_getattr 275 #define __NR_sched_getattr 275
#endif #endif
typedef unsigned long long u64; typedef unsigned long long u64;
#define NS_TO_MS 1000000 #define NS_TO_MS 1000000
struct sched_attr { struct sched_attr {
uint32_t size; uint32_t size;
uint32_t sched_policy; uint32_t sched_policy;
uint64_t sched_flags; uint64_t sched_flags;
/* SCHED_NORMAL, SCHED_BATCH */ /* SCHED_NORMAL, SCHED_BATCH */
int32_t sched_nice; int32_t sched_nice;
/* SCHED_FIFO, SCHED_RR */ /* SCHED_FIFO, SCHED_RR */
uint32_t sched_priority; uint32_t sched_priority;
/* SCHED_DEADLINE */ /* SCHED_DEADLINE */
uint64_t sched_runtime; uint64_t sched_runtime;
uint64_t sched_deadline; uint64_t sched_deadline;
uint64_t sched_period; uint64_t sched_period;
}; };
int sched_setattr(pid_t pid, int sched_setattr(pid_t pid,
const struct sched_attr *attr, sched_attr const* attr,
unsigned int flags); unsigned int flags);
int sched_getattr(pid_t pid, int sched_getattr(pid_t pid,
struct sched_attr *attr, sched_attr* attr,
unsigned int size, unsigned int size,
unsigned int flags); unsigned int flags);
u64 get_time_us(void); u64 get_time_us(void);
typedef struct node_time_logger typedef struct node_time_logger {
{ std::shared_ptr<std::vector<std::pair<std::string, u64>>> recorded_times;
std::shared_ptr<std::vector<std::pair<std::string, u64>>> recorded_times;
} node_time_logger; } node_time_logger;
void log_entry(node_time_logger logger, std::string text); void log_entry(node_time_logger logger, std::string const& text);
node_time_logger create_logger(); node_time_logger create_logger();
inline u64 get_time_us(void) inline u64 get_time_us(void) {
{ struct timespec ts;
struct timespec ts; u64 time;
unsigned long long time;
clock_gettime(CLOCK_MONOTONIC_RAW, &ts); clock_gettime(CLOCK_MONOTONIC_RAW, &ts);
time = ts.tv_sec * 1000000; time = ts.tv_sec * 1000000;
time += ts.tv_nsec / 1000; time += ts.tv_nsec / 1000;
return time; return time;
} }
#endif /* __RT_SCHED_H__ */ #endif /* __RT_SCHED_H__ */

View file

@ -2,13 +2,19 @@
<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?> <?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
<package format="3"> <package format="3">
<name>simple_timer</name> <name>simple_timer</name>
<version>0.0.0</version> <version>0.1.0</version>
<description>TODO: Package description</description> <description>
<maintainer email="kurt4wilson@gmail.com">nvidia</maintainer> ROS 2 package providing timer functionality for the dynamic executor experiments
<license>TODO: License declaration</license> </description>
<maintainer email="your.email@example.com">ROS-Dynamic-Executor Team</maintainer>
<license>Apache License 2.0</license>
<buildtool_depend>ament_cmake</buildtool_depend> <buildtool_depend>ament_cmake</buildtool_depend>
<!-- Build Dependencies -->
<depend>rclcpp</depend>
<!-- Test Dependencies -->
<test_depend>ament_lint_auto</test_depend> <test_depend>ament_lint_auto</test_depend>
<test_depend>ament_lint_common</test_depend> <test_depend>ament_lint_common</test_depend>

View file

@ -1,43 +1,34 @@
#include "simple_timer/cycle_timer.hpp" #include "simple_timer/cycle_timer.hpp"
namespace simple_timer namespace simple_timer {
{
CycleTimer::CycleTimer(long start_delay) : start_delay_time(start_delay * 1000)
{
}
void CycleTimer::tick() CycleTimer::CycleTimer(long const start_delay)
{ : start_delay_time(start_delay * 1000) {
u64 current_wall_time = get_time_us(); }
void CycleTimer::tick() {
u64 const current_wall_time = get_time_us();
u64 time_diff = 0; u64 time_diff = 0;
if (!recording) if (!recording) {
{ if (start_time == 0) {
start_time = current_wall_time;
if (start_time == 0) } else if (current_wall_time - start_time > start_delay_time) {
{ recording = true;
start_time = current_wall_time; last_cycle_time = current_wall_time;
} start_time = current_wall_time;
else if (current_wall_time - start_time > start_delay_time) }
{ } else {
recording = true; time_diff = current_wall_time - last_cycle_time;
if (time_diff < min_diff || min_diff == 0) {
min_diff = time_diff;
}
if (time_diff > max_diff || max_diff == 0) {
max_diff = time_diff;
}
last_cycle_time = current_wall_time; last_cycle_time = current_wall_time;
start_time = current_wall_time; last_diff = time_diff;
}
} }
else }
{
time_diff = current_wall_time - last_cycle_time; } // namespace simple_timer
if (time_diff < min_diff || min_diff == 0)
{
min_diff = time_diff;
}
if (time_diff > max_diff || max_diff == 0)
{
max_diff = time_diff;
}
last_cycle_time = current_wall_time;
last_diff = time_diff;
}
}
} // namespace simple_timer

View file

@ -1,56 +1,43 @@
#include "simple_timer/period_timer.hpp" #include "simple_timer/period_timer.hpp"
namespace simple_timer namespace simple_timer {
{
PeriodTimer::PeriodTimer(long start_delay) : start_delay_time(start_delay * 1000)
{
}
void PeriodTimer::start() PeriodTimer::PeriodTimer(long const start_delay)
{ : start_delay_time(start_delay * 1000) {
u64 current_wall_time = get_time_us(); }
if (!recording) void PeriodTimer::start() {
{ u64 const current_wall_time = get_time_us();
if (start_time == 0) if (!recording) {
{ if (start_time == 0) {
start_time = current_wall_time; start_time = current_wall_time;
} } else if (current_wall_time - start_time > start_delay_time) {
else if (current_wall_time - start_time > start_delay_time) recording = true;
{ start_time = current_wall_time;
recording = true;
start_time = current_wall_time;
last_period_time = current_wall_time;
}
}
else
{
last_period_time = current_wall_time; last_period_time = current_wall_time;
} }
} else {
last_period_time = current_wall_time;
} }
void PeriodTimer::stop() }
{
u64 current_wall_time = get_time_us(); void PeriodTimer::stop() {
u64 const current_wall_time = get_time_us();
u64 time_diff = 0; u64 time_diff = 0;
if (!recording) if (!recording) {
{
return; return;
} }
else
{ time_diff = current_wall_time - last_period_time;
time_diff = current_wall_time - last_period_time; if (time_diff < min_period || min_period == 0) {
if (time_diff < min_period || min_period == 0)
{
min_period = time_diff; min_period = time_diff;
}
if (time_diff > max_period || max_period == 0)
{
max_period = time_diff;
}
last_period = time_diff;
} }
} if (time_diff > max_period || max_period == 0) {
} // namespace simple_timer max_period = time_diff;
}
last_period = time_diff;
}
} // namespace simple_timer

View file

@ -13,37 +13,32 @@
#include <sys/syscall.h> #include <sys/syscall.h>
#include <time.h> #include <time.h>
#include <iostream> #include <iostream>
#include "simple_timer/rt-sched.hpp" #include "simple_timer/rt-sched.hpp"
int sched_setattr(pid_t pid, int sched_setattr(
const struct sched_attr *attr, pid_t pid,
unsigned int flags) sched_attr const* attr,
{ unsigned int flags) {
return syscall(__NR_sched_setattr, pid, attr, flags); return syscall(__NR_sched_setattr, pid, attr, flags);
} }
int sched_getattr(pid_t pid, int sched_getattr(
struct sched_attr *attr, pid_t pid,
unsigned int size, sched_attr* attr,
unsigned int flags) unsigned int size,
{ unsigned int flags) {
return syscall(__NR_sched_getattr, pid, attr, size, flags); return syscall(__NR_sched_getattr, pid, attr, size, flags);
} }
void log_entry(node_time_logger logger, std::string text) void log_entry(node_time_logger logger, std::string const& text) {
{ if (logger.recorded_times != nullptr) {
if (logger.recorded_times != nullptr) logger.recorded_times->emplace_back(text, get_time_us() / 1000);
{ } else {
logger.recorded_times->push_back(std::make_pair(text, get_time_us() / 1000));
// std::cout<<text<<std::endl;
}else{
// TODO: report error // TODO: report error
} }
} }
node_time_logger create_logger() node_time_logger create_logger() {
{
node_time_logger logger; node_time_logger logger;
logger.recorded_times = std::make_shared<std::vector<std::pair<std::string, u64>>>(); logger.recorded_times = std::make_shared<std::vector<std::pair<std::string, u64>>>();
return logger; return logger;