misc fixed & improvements. auto-set chain deadline queue. update usage example

This commit is contained in:
Kurt Wilson 2025-03-22 19:25:01 -04:00
parent 99531e6c36
commit 1b8277bcf5
No known key found for this signature in database
13 changed files with 1541 additions and 1205 deletions

View file

@ -105,4 +105,5 @@ ament_export_include_directories(include)
ament_export_libraries(priority_executor) ament_export_libraries(priority_executor)
ament_export_libraries(multithread_priority_executor) ament_export_libraries(multithread_priority_executor)
ament_export_libraries(default_executor) ament_export_libraries(default_executor)
ament_export_dependencies(rclcpp rcl simple_timer)
ament_package() ament_package()

View file

@ -15,11 +15,13 @@
#ifndef RTIS_PRIORITY_STRATEGY #ifndef RTIS_PRIORITY_STRATEGY
#define RTIS_PRIORITY_STRATEGY #define RTIS_PRIORITY_STRATEGY
#include <memory> #include <cstdint>
#include <vector>
#include <time.h>
#include <set>
#include <deque> #include <deque>
#include <memory>
#include <semaphore.h>
#include <set>
#include <time.h>
#include <vector>
#include "rcl/allocator.h" #include "rcl/allocator.h"
@ -30,317 +32,375 @@
/// Delegate for handling memory allocations while the Executor is executing. /// Delegate for handling memory allocations while the Executor is executing.
/** /**
* By default, the memory strategy dynamically allocates memory for structures that come in from * By default, the memory strategy dynamically allocates memory for structures
* the rmw implementation after the executor waits for work, based on the number of entities that * that come in from the rmw implementation after the executor waits for work,
* come through. * based on the number of entities that come through.
*/ */
enum ExecutableType enum ExecutableType
{ {
SUBSCRIPTION, SUBSCRIPTION,
SERVICE, SERVICE,
CLIENT, CLIENT,
TIMER, TIMER,
WAITABLE WAITABLE
}; };
enum ExecutableScheduleType enum ExecutableScheduleType
{ {
DEADLINE = 0, DEADLINE = 0,
CHAIN_AWARE_PRIORITY, CHAIN_AWARE_PRIORITY,
CHAIN_INDEPENDENT_PRIORITY, // not used here CHAIN_INDEPENDENT_PRIORITY, // not used here
DEFAULT, // not used here DEFAULT, // not used here
}; };
class PriorityExecutable class PriorityExecutable
{ {
public: public:
std::shared_ptr<const void> handle; std::shared_ptr<const void> handle;
ExecutableType type; ExecutableType type;
bool can_be_run = false; bool can_be_run = false;
std::shared_ptr<rclcpp::Waitable> waitable; std::shared_ptr<rclcpp::Waitable> waitable;
ExecutableScheduleType sched_type; ExecutableScheduleType sched_type;
int priority; int priority = 0;
long period = 1000; // milliseconds long period = 1000; // milliseconds
bool is_first_in_chain = false; bool is_first_in_chain = false;
bool is_last_in_chain = false; bool is_last_in_chain = false;
// chain aware deadlines // chain aware deadlines
std::deque<uint64_t> *deadlines = nullptr; std::deque<uint64_t> *deadlines = nullptr;
std::shared_ptr<rclcpp::TimerBase> timer_handle; std::shared_ptr<rclcpp::TimerBase> timer_handle;
// just used for logging // just used for logging
int chain_id = 0; int chain_id = 0;
// chain aware priority // chain aware priority
int counter = 0; int counter = 0;
PriorityExecutable(std::shared_ptr<const void> h, int p, ExecutableType t, ExecutableScheduleType sched_type = CHAIN_INDEPENDENT_PRIORITY); PriorityExecutable(
std::shared_ptr<const void> h, int p, ExecutableType t,
ExecutableScheduleType sched_type = CHAIN_INDEPENDENT_PRIORITY);
void dont_run(); void dont_run();
void allow_run(); void allow_run();
PriorityExecutable(); PriorityExecutable();
void increment_counter(); void increment_counter();
bool operator==(const PriorityExecutable &other) const; bool operator==(const PriorityExecutable &other) const;
static size_t num_executables; static size_t num_executables;
int executable_id = 0; int executable_id = 0;
std::string name = ""; std::string name = "";
// stream operator for debug
friend std::ostream &operator<<(std::ostream &os,
const PriorityExecutable &pe)
{
os << "sched_type: " << pe.sched_type << ", ";
if (pe.sched_type == DEADLINE)
{
os << "period: " << pe.period << ", ";
}
os << "priority: " << pe.priority << ", ";
os << "executable_id: " << pe.executable_id << ", ";
os << "chain_id: " << pe.chain_id << ", ";
os << "is_first_in_chain: " << pe.is_first_in_chain << ", ";
os << "is_last_in_chain: " << pe.is_last_in_chain << ", ";
return os;
}
}; };
class PriorityExecutableComparator class PriorityExecutableComparator
{ {
public: public:
bool operator()(const PriorityExecutable *p1, const PriorityExecutable *p2); bool operator()(const PriorityExecutable *p1, const PriorityExecutable *p2);
}; };
template <typename Alloc = std::allocator<void>> template <typename Alloc = std::allocator<void>>
class PriorityMemoryStrategy : public rclcpp::memory_strategy::MemoryStrategy class PriorityMemoryStrategy : public rclcpp::memory_strategy::MemoryStrategy
{ {
public: public:
RCLCPP_SMART_PTR_DEFINITIONS(PriorityMemoryStrategy<Alloc>) RCLCPP_SMART_PTR_DEFINITIONS(PriorityMemoryStrategy<Alloc>)
using VoidAllocTraits = typename rclcpp::allocator::AllocRebind<void *, Alloc>; using VoidAllocTraits =
using VoidAlloc = typename VoidAllocTraits::allocator_type; typename rclcpp::allocator::AllocRebind<void *, Alloc>;
using VoidAlloc = typename VoidAllocTraits::allocator_type;
explicit PriorityMemoryStrategy(std::shared_ptr<Alloc> allocator) explicit PriorityMemoryStrategy(std::shared_ptr<Alloc> allocator)
{
allocator_ = std::make_shared<VoidAlloc>(*allocator.get());
logger_ = create_logger();
}
PriorityMemoryStrategy()
{
allocator_ = std::make_shared<VoidAlloc>();
logger_ = create_logger();
}
node_time_logger logger_;
void
add_guard_condition(const rcl_guard_condition_t *guard_condition) override;
void
remove_guard_condition(const rcl_guard_condition_t *guard_condition) override;
void clear_handles() override;
void remove_null_handles(rcl_wait_set_t *wait_set) override;
bool collect_entities(const WeakNodeList &weak_nodes) override;
void
add_waitable_handle(const rclcpp::Waitable::SharedPtr &waitable) override;
bool add_handles_to_wait_set(rcl_wait_set_t *wait_set) override;
void get_next_executable(rclcpp::AnyExecutable &any_exec,
const WeakNodeList &weak_nodes);
/**
* thread-id is used for logging. if -1, then don't log the thread id
*/
void post_execute(rclcpp::AnyExecutable any_exec, int thread_id = -1);
void get_next_subscription(rclcpp::AnyExecutable &any_exec,
const WeakNodeList &weak_nodes) override;
void get_next_service(rclcpp::AnyExecutable &any_exec,
const WeakNodeList &weak_nodes) override;
void get_next_client(rclcpp::AnyExecutable &any_exec,
const WeakNodeList &weak_nodes) override;
void get_next_timer(rclcpp::AnyExecutable &any_exec,
const WeakNodeList &weak_nodes) override;
void get_next_waitable(rclcpp::AnyExecutable &any_exec,
const WeakNodeList &weak_nodes) override;
std::shared_ptr<PriorityExecutable>
get_priority_settings(std::shared_ptr<const void> executable)
{
auto search = priority_map.find(executable);
if (search != priority_map.end())
{ {
allocator_ = std::make_shared<VoidAlloc>(*allocator.get()); return search->second;
logger_ = create_logger();
} }
else
PriorityMemoryStrategy()
{ {
allocator_ = std::make_shared<VoidAlloc>(); return nullptr;
logger_ = create_logger();
} }
node_time_logger logger_; }
void add_guard_condition(const rcl_guard_condition_t *guard_condition) override; rcl_allocator_t get_allocator() override
{
return rclcpp::allocator::get_rcl_allocator<void *, VoidAlloc>(
*allocator_.get());
}
void remove_guard_condition(const rcl_guard_condition_t *guard_condition) override; size_t number_of_ready_subscriptions() const override
{
void clear_handles() override; size_t number_of_subscriptions = subscription_handles_.size();
// std::cout << "ready_raw: " << number_of_subscriptions << std::endl;
void remove_null_handles(rcl_wait_set_t *wait_set) override; for (auto waitable : waitable_handles_)
bool collect_entities(const WeakNodeList &weak_nodes) override;
void add_waitable_handle(const rclcpp::Waitable::SharedPtr &waitable) override;
bool add_handles_to_wait_set(rcl_wait_set_t *wait_set) override;
void
get_next_executable(
rclcpp::AnyExecutable &any_exec,
const WeakNodeList &weak_nodes);
/**
* thread-id is used for logging. if -1, then don't log the thread id
*/
void post_execute(rclcpp::AnyExecutable any_exec, int thread_id = -1);
void
get_next_subscription(
rclcpp::AnyExecutable &any_exec,
const WeakNodeList &weak_nodes) override;
void
get_next_service(
rclcpp::AnyExecutable &any_exec,
const WeakNodeList &weak_nodes) override;
void
get_next_client(rclcpp::AnyExecutable &any_exec, const WeakNodeList &weak_nodes) override;
void
get_next_timer(
rclcpp::AnyExecutable &any_exec,
const WeakNodeList &weak_nodes) override;
void
get_next_waitable(rclcpp::AnyExecutable &any_exec, const WeakNodeList &weak_nodes) override;
std::shared_ptr<PriorityExecutable> get_priority_settings(std::shared_ptr<const void> executable)
{ {
auto search = priority_map.find(executable); number_of_subscriptions += waitable->get_number_of_ready_subscriptions();
if (search != priority_map.end())
{
return search->second;
}
else
{
return nullptr;
}
} }
return number_of_subscriptions;
}
rcl_allocator_t get_allocator() override size_t number_of_ready_services() const override
{
size_t number_of_services = service_handles_.size();
for (auto waitable : waitable_handles_)
{ {
return rclcpp::allocator::get_rcl_allocator<void *, VoidAlloc>(*allocator_.get()); number_of_services += waitable->get_number_of_ready_services();
} }
return number_of_services;
}
size_t number_of_ready_subscriptions() const override size_t number_of_ready_events() const override
{
size_t number_of_events = 0;
for (auto waitable : waitable_handles_)
{ {
size_t number_of_subscriptions = subscription_handles_.size(); number_of_events += waitable->get_number_of_ready_events();
// std::cout << "ready_raw: " << number_of_subscriptions << std::endl;
for (auto waitable : waitable_handles_)
{
number_of_subscriptions += waitable->get_number_of_ready_subscriptions();
}
return number_of_subscriptions;
} }
return number_of_events;
}
size_t number_of_ready_services() const override size_t number_of_ready_clients() const override
{
size_t number_of_clients = client_handles_.size();
for (auto waitable : waitable_handles_)
{ {
size_t number_of_services = service_handles_.size(); number_of_clients += waitable->get_number_of_ready_clients();
for (auto waitable : waitable_handles_)
{
number_of_services += waitable->get_number_of_ready_services();
}
return number_of_services;
} }
return number_of_clients;
}
size_t number_of_ready_events() const override size_t number_of_guard_conditions() const override
{
size_t number_of_guard_conditions = guard_conditions_.size();
for (auto waitable : waitable_handles_)
{ {
size_t number_of_events = 0; number_of_guard_conditions +=
for (auto waitable : waitable_handles_) waitable->get_number_of_ready_guard_conditions();
{
number_of_events += waitable->get_number_of_ready_events();
}
return number_of_events;
} }
return number_of_guard_conditions;
}
size_t number_of_ready_clients() const override size_t number_of_ready_timers() const override
{
size_t number_of_timers = timer_handles_.size();
for (auto waitable : waitable_handles_)
{ {
size_t number_of_clients = client_handles_.size(); number_of_timers += waitable->get_number_of_ready_timers();
for (auto waitable : waitable_handles_)
{
number_of_clients += waitable->get_number_of_ready_clients();
}
return number_of_clients;
} }
return number_of_timers;
}
size_t number_of_guard_conditions() const override size_t number_of_waitables() const override
{ {
size_t number_of_guard_conditions = guard_conditions_.size(); return waitable_handles_.size();
for (auto waitable : waitable_handles_) }
{
number_of_guard_conditions += waitable->get_number_of_ready_guard_conditions();
}
return number_of_guard_conditions;
}
size_t number_of_ready_timers() const override void set_executable_priority(std::shared_ptr<const void> handle, int priority,
{ ExecutableType t)
size_t number_of_timers = timer_handles_.size(); {
for (auto waitable : waitable_handles_) // TODO: any sanity checks should go here
{ priority_map[handle] =
number_of_timers += waitable->get_number_of_ready_timers(); std::make_shared<PriorityExecutable>(handle, priority, t);
} }
return number_of_timers; void set_executable_priority(std::shared_ptr<const void> handle, int priority,
} ExecutableType t, ExecutableScheduleType sc,
int chain_index)
{
// TODO: any sanity checks should go here
priority_map[handle] =
std::make_shared<PriorityExecutable>(handle, priority, t, sc);
priority_map[handle]->chain_id = chain_index;
}
size_t number_of_waitables() const override void set_executable_deadline(std::shared_ptr<const void> handle, int period,
ExecutableType t, int chain_id = 0,
std::string name = "")
{
// TODO: any sanity checks should go here
priority_map[handle] =
std::make_shared<PriorityExecutable>(handle, period, t, DEADLINE);
priority_map[handle]->chain_id = chain_id;
priority_map[handle]->name = name;
// is there a deadline queue for this chain id?
auto search = chain_deadlines.find(chain_id);
if (search == chain_deadlines.end())
{ {
return waitable_handles_.size(); chain_deadlines[chain_id] = std::make_shared<std::deque<uint64_t>>();
} }
priority_map[handle]->deadlines = chain_deadlines[chain_id].get();
}
void set_executable_priority(std::shared_ptr<const void> handle, int priority, ExecutableType t) int get_priority(std::shared_ptr<const void> executable)
{
auto search = priority_map.find(executable);
if (search != priority_map.end())
{ {
// TODO: any sanity checks should go here return search->second->priority;
priority_map[handle] = std::make_shared<PriorityExecutable>(handle, priority, t);
} }
void set_executable_priority(std::shared_ptr<const void> handle, int priority, ExecutableType t, ExecutableScheduleType sc, int chain_index) else
{ {
// TODO: any sanity checks should go here return 0;
priority_map[handle] = std::make_shared<PriorityExecutable>(handle, priority, t, sc);
priority_map[handle]->chain_id = chain_index;
} }
}
void set_executable_deadline(std::shared_ptr<const void> handle, int period, ExecutableType t, int chain_id = 0, std::string name = "") void set_first_in_chain(std::shared_ptr<const void> exec_handle)
{ {
// TODO: any sanity checks should go here get_priority_settings(exec_handle)->is_first_in_chain = true;
priority_map[handle] = std::make_shared<PriorityExecutable>(handle, period, t, DEADLINE); }
priority_map[handle]->chain_id = chain_id;
priority_map[handle]->name = name;
}
int get_priority(std::shared_ptr<const void> executable) void set_last_in_chain(std::shared_ptr<const void> exec_handle)
{ {
auto search = priority_map.find(executable); get_priority_settings(exec_handle)->is_last_in_chain = true;
if (search != priority_map.end()) }
{
return search->second->priority;
}
else
{
return 0;
}
}
void assign_deadlines_queue(std::shared_ptr<const void> exec_handle,
std::deque<uint64_t> *deadlines)
{
get_priority_settings(exec_handle)->deadlines = deadlines;
}
void set_first_in_chain(std::shared_ptr<const void> exec_handle) std::shared_ptr<std::deque<uint64_t>> get_chain_deadlines(int chain_id)
{
auto search = chain_deadlines.find(chain_id);
if (search != chain_deadlines.end())
{ {
get_priority_settings(exec_handle)->is_first_in_chain = true; return search->second;
} }
else
void set_last_in_chain(std::shared_ptr<const void> exec_handle)
{ {
get_priority_settings(exec_handle)->is_last_in_chain = true; return nullptr;
}
void assign_deadlines_queue(std::shared_ptr<const void> exec_handle, std::deque<uint64_t> *deadlines)
{
get_priority_settings(exec_handle)->deadlines = deadlines;
} }
}
private: private:
std::shared_ptr<PriorityExecutable> get_and_reset_priority(std::shared_ptr<const void> executable, ExecutableType t) std::shared_ptr<PriorityExecutable>
get_and_reset_priority(std::shared_ptr<const void> executable,
ExecutableType t)
{
// PriorityExecutable *p = get_priority_settings(executable);
std::shared_ptr<PriorityExecutable> p = get_priority_settings(executable);
if (p == nullptr)
{ {
// PriorityExecutable *p = get_priority_settings(executable); // priority_map[executable] = PriorityExecutable(executable, 0, t);
std::shared_ptr<PriorityExecutable> p = get_priority_settings(executable); priority_map[executable] =
if (p == nullptr) std::make_shared<PriorityExecutable>(executable, 0, t);
{ p = priority_map[executable];
// priority_map[executable] = PriorityExecutable(executable, 0, t);
priority_map[executable] = std::make_shared<PriorityExecutable>(executable, 0, t);
p = priority_map[executable];
}
// p->can_be_run = true;
return p;
} }
// p->can_be_run = true;
return p;
}
template <typename T> template <typename T>
using VectorRebind = using VectorRebind = std::vector<
std::vector<T, typename std::allocator_traits<Alloc>::template rebind_alloc<T>>; T, typename std::allocator_traits<Alloc>::template rebind_alloc<T>>;
VectorRebind<const rcl_guard_condition_t *> guard_conditions_; VectorRebind<const rcl_guard_condition_t *> guard_conditions_;
VectorRebind<std::shared_ptr<const rcl_subscription_t>> subscription_handles_; VectorRebind<std::shared_ptr<const rcl_subscription_t>> subscription_handles_;
VectorRebind<std::shared_ptr<const rcl_service_t>> service_handles_; VectorRebind<std::shared_ptr<const rcl_service_t>> service_handles_;
VectorRebind<std::shared_ptr<const rcl_client_t>> client_handles_; VectorRebind<std::shared_ptr<const rcl_client_t>> client_handles_;
VectorRebind<std::shared_ptr<const rcl_timer_t>> timer_handles_; VectorRebind<std::shared_ptr<const rcl_timer_t>> timer_handles_;
VectorRebind<std::shared_ptr<rclcpp::Waitable>> waitable_handles_; VectorRebind<std::shared_ptr<rclcpp::Waitable>> waitable_handles_;
std::shared_ptr<VoidAlloc> allocator_; std::shared_ptr<VoidAlloc> allocator_;
// TODO: evaluate using node/subscription namespaced strings as keys // TODO: evaluate using node/subscription namespaced strings as keys
// holds *all* handle->priority mappings // holds *all* handle->priority mappings
std::map<std::shared_ptr<const void>, std::shared_ptr<PriorityExecutable>> priority_map; std::map<std::shared_ptr<const void>, std::shared_ptr<PriorityExecutable>>
priority_map;
// hold *only valid* executable+priorities std::map<int, std::shared_ptr<std::deque<uint64_t>>> chain_deadlines;
// std::priority_queue<const PriorityExecutable *, std::vector<const PriorityExecutable *>, PriorityExecutableComparator> all_executables_;
std::set<const PriorityExecutable *, PriorityExecutableComparator> all_executables_;
// priority queue doesn't allow iteration. fortunately, std::map is sorted by key, so we can replace the priority queue with a map // hold *only valid* executable+priorities
// the key will be the priority. the value doesn't matter. // std::priority_queue<const PriorityExecutable *, std::vector<const
// std::map<const PriorityExecutable *, int, PriorityExecutableComparator> all_executables_ = std::map<const PriorityExecutable *, int, PriorityExecutableComparator>(PriorityExecutableComparator()); // PriorityExecutable *>, PriorityExecutableComparator> all_executables_;
void add_executable_to_queue(std::shared_ptr<PriorityExecutable> p); std::set<const PriorityExecutable *, PriorityExecutableComparator>
all_executables_;
// priority queue doesn't allow iteration. fortunately, std::map is sorted by
// key, so we can replace the priority queue with a map the key will be the
// priority. the value doesn't matter. std::map<const PriorityExecutable *,
// int, PriorityExecutableComparator> all_executables_ = std::map<const
// PriorityExecutable *, int,
// PriorityExecutableComparator>(PriorityExecutableComparator());
void add_executable_to_queue(std::shared_ptr<PriorityExecutable> p);
}; };
#endif // RCLCPP__STRATEGIES__ALLOCATOR_MEMORY_STRATEGY_HPP_ #endif // RCLCPP__STRATEGIES__ALLOCATOR_MEMORY_STRATEGY_HPP_

View file

@ -59,7 +59,7 @@ namespace timed_executor
} }
} }
} }
std::cout << "shutdown" << std::endl; RCLCPP_INFO(rclcpp::get_logger("priority_executor"), "priority executor shutdown");
} }
bool TimedExecutor::get_next_executable(rclcpp::AnyExecutable &any_executable, std::chrono::nanoseconds timeout) bool TimedExecutor::get_next_executable(rclcpp::AnyExecutable &any_executable, std::chrono::nanoseconds timeout)

File diff suppressed because it is too large Load diff

View file

@ -1,79 +1,98 @@
#include "rclcpp/rclcpp.hpp"
#include "priority_executor/priority_executor.hpp" #include "priority_executor/priority_executor.hpp"
#include "priority_executor/priority_memory_strategy.hpp" #include "priority_executor/priority_memory_strategy.hpp"
#include <string> #include "rclcpp/rclcpp.hpp"
#include <fstream>
#include <unistd.h>
#include "std_msgs/msg/string.hpp" #include "std_msgs/msg/string.hpp"
#include <fstream>
#include <string>
#include <unistd.h>
// re-create the classic talker-listener example with two listeners // re-create the classic talker-listener example with two listeners
class Talker : public rclcpp::Node class Talker : public rclcpp::Node
{ {
public: public:
Talker() : Node("talker") Talker() : Node("talker")
{ {
// Create a publisher on the "chatter" topic with 10 msg queue size. // Create a publisher on the "chatter" topic with 10 msg queue size.
pub_ = this->create_publisher<std_msgs::msg::String>("chatter", 10); pub_ = this->create_publisher<std_msgs::msg::String>("chatter", 10);
// Create a timer of period 1s that calls our callback member function. // Create a timer of period 1s that calls our callback member function.
timer_ = this->create_wall_timer(std::chrono::seconds(1), std::bind(&Talker::timer_callback, this)); timer_ = this->create_wall_timer(std::chrono::seconds(1),
} std::bind(&Talker::timer_callback, this));
// the timer must be public }
rclcpp::TimerBase::SharedPtr timer_; // the timer must be public
rclcpp::TimerBase::SharedPtr timer_;
private: private:
void timer_callback() void timer_callback()
{ {
// Create a message and publish it 10 times. std_msgs::msg::String msg;
std_msgs::msg::String msg; msg.data = "Hello World!";
msg.data = "Hello World!"; RCLCPP_INFO(this->get_logger(), "Publishing: '%s'", msg.data.c_str());
for (int i = 0; i < 10; ++i) pub_->publish(msg);
{ }
RCLCPP_INFO(this->get_logger(), "Publishing: '%s'", msg.data.c_str()); rclcpp::Publisher<std_msgs::msg::String>::SharedPtr pub_;
pub_->publish(msg);
}
}
rclcpp::Publisher<std_msgs::msg::String>::SharedPtr pub_;
}; };
class Listener : public rclcpp::Node class Listener : public rclcpp::Node
{ {
public: public:
Listener(std::string name) : Node(name) Listener(std::string name) : Node(name)
{ {
// Create a subscription on the "chatter" topic with the default callback method. // Create a subscription on the "chatter" topic with the default callback
sub_ = this->create_subscription<std_msgs::msg::String>("chatter", 10, std::bind(&Listener::callback, this, std::placeholders::_1)); // method.
} sub_ = this->create_subscription<std_msgs::msg::String>(
// the publisher must be public "chatter", 10,
rclcpp::Subscription<std_msgs::msg::String>::SharedPtr sub_; std::bind(&Listener::callback, this, std::placeholders::_1));
}
// the publisher must be public
rclcpp::Subscription<std_msgs::msg::String>::SharedPtr sub_;
private: private:
void callback(const std_msgs::msg::String::SharedPtr msg) void callback(const std_msgs::msg::String::SharedPtr msg)
{ {
RCLCPP_INFO(this->get_logger(), "I heard: '%s'", msg->data.c_str()); RCLCPP_INFO(this->get_logger(), "I heard: '%s'", msg->data.c_str());
} }
}; };
int main(int argc, char **argv) int main(int argc, char **argv)
{ {
rclcpp::init(argc, argv); rclcpp::init(argc, argv);
auto talker = std::make_shared<Talker>(); auto talker = std::make_shared<Talker>();
auto listener1 = std::make_shared<Listener>("listener1"); auto listener1 = std::make_shared<Listener>("listener1");
auto listener2 = std::make_shared<Listener>("listener2"); auto listener2 = std::make_shared<Listener>("listener2");
rclcpp::ExecutorOptions options; rclcpp::ExecutorOptions options;
auto strategy = std::make_shared<PriorityMemoryStrategy<>>(); auto strategy = std::make_shared<PriorityMemoryStrategy<>>();
options.memory_strategy = strategy; options.memory_strategy = strategy;
auto executor = new timed_executor::TimedExecutor(options); auto executor = new timed_executor::TimedExecutor(options);
// replace the above line with the following line to use the default executor // replace the above line with the following line to use the default executor
// which will intermix the execution of listener1 and listener2 // which will intermix the execution of listener1 and listener2
// auto executor = std::make_shared<rclcpp::executors::SingleThreadedExecutor>(options); // auto executor =
// std::make_shared<rclcpp::executors::SingleThreadedExecutor>(options);
strategy->set_executable_deadline(talker->timer_->get_timer_handle(), 100, TIMER); strategy->set_executable_deadline(talker->timer_->get_timer_handle(), 1000,
strategy->set_executable_deadline(listener1->sub_->get_subscription_handle(), 100, SUBSCRIPTION); TIMER);
strategy->set_executable_deadline(listener2->sub_->get_subscription_handle(), 50, SUBSCRIPTION); strategy->get_priority_settings(talker->timer_->get_timer_handle())
executor->add_node(talker); ->timer_handle = talker->timer_;
executor->add_node(listener1); strategy->set_first_in_chain(talker->timer_->get_timer_handle());
executor->add_node(listener2); strategy->get_priority_settings(talker->timer_->get_timer_handle());
strategy->set_executable_deadline(listener1->sub_->get_subscription_handle(),
1000, SUBSCRIPTION);
strategy->set_executable_deadline(listener2->sub_->get_subscription_handle(),
1000, SUBSCRIPTION);
strategy->set_last_in_chain(listener2->sub_->get_subscription_handle());
executor->add_node(talker);
executor->add_node(listener1);
executor->add_node(listener2);
executor->spin(); std::cout << *strategy->get_priority_settings(
talker->timer_->get_timer_handle())
<< std::endl;
std::cout << *strategy->get_priority_settings(
listener1->sub_->get_subscription_handle())
<< std::endl;
std::cout << *strategy->get_priority_settings(
listener2->sub_->get_subscription_handle())
<< std::endl;
executor->spin();
} }

View file

@ -0,0 +1,61 @@
cmake_minimum_required(VERSION 3.5)
project(simple_timer)
# Default to C99
if(NOT CMAKE_C_STANDARD)
set(CMAKE_C_STANDARD 99)
endif()
# Default to C++14
if(NOT CMAKE_CXX_STANDARD)
set(CMAKE_CXX_STANDARD 14)
endif()
if(CMAKE_COMPILER_IS_GNUCXX OR CMAKE_CXX_COMPILER_ID MATCHES "Clang")
add_compile_options(-Wall -Wextra -Wpedantic)
endif()
# find dependencies
find_package(ament_cmake REQUIRED)
# uncomment the following section in order to fill in
# further dependencies manually.
# find_package(<dependency> REQUIRED)
add_library(simple_timer src/cycle_timer.cpp src/period_timer.cpp)
target_include_directories(simple_timer PUBLIC
$<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}/include>
$<INSTALL_INTERFACE:include>
)
add_library(rt-sched src/rt-sched.cpp)
target_include_directories(rt-sched PUBLIC
$<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}/include>
$<INSTALL_INTERFACE:include>
)
if(BUILD_TESTING)
find_package(ament_lint_auto REQUIRED)
# the following line skips the linter which checks for copyrights
# uncomment the line when a copyright and license is not present in all source files
#set(ament_cmake_copyright_FOUND TRUE)
# the following line skips cpplint (only works in a git repo)
# uncomment the line when this package is not in a git repo
#set(ament_cmake_cpplint_FOUND TRUE)
ament_lint_auto_find_test_dependencies()
endif()
install(
DIRECTORY include/
DESTINATION include
)
install(
TARGETS simple_timer rt-sched
LIBRARY DESTINATION lib
ARCHIVE DESTINATION lib
RUNTIME DESTINATION bin
)
ament_export_include_directories(include)
ament_export_libraries(simple_timer)
ament_export_libraries(rt-sched)
ament_package()

View file

@ -0,0 +1,23 @@
#ifndef __CYCLE_TIMER__
#define __CYCLE_TIMER__
#include <memory>
#include "simple_timer/rt-sched.hpp"
namespace simple_timer
{
class CycleTimer
{
public:
CycleTimer(long start_delay=0);
void tick() ;
const u64 start_delay_time;
u64 start_time = 0;
u64 last_cycle_time = 0;
unsigned long max_diff = 0;
unsigned long min_diff = 0;
unsigned long last_diff = 0;
bool recording = false;
};
} // namespace simple_timer
#endif

View file

@ -0,0 +1,25 @@
#ifndef __PERIOD_TIMER__
#define __PERIOD_TIMER__
#include <memory>
#include "simple_timer/rt-sched.hpp"
namespace simple_timer
{
class PeriodTimer
{
public:
PeriodTimer(long start_delay = 0);
void start();
void stop();
const u64 start_delay_time;
u64 start_time = 0;
u64 last_period_time = 0;
unsigned long max_period = 0;
unsigned long min_period = 0;
unsigned long last_period = 0;
bool recording = false;
};
} // namespace simple_timer
#endif

View file

@ -0,0 +1,98 @@
// SPDX-License-Identifier: GPL-2.0-or-later
/*
* rt-sched.h - sched_setattr() and sched_getattr() API
* (C) Dario Faggioli <raistlin@linux.it>, 2009, 2010
* Copyright (C) 2014 BMW Car IT GmbH, Daniel Wagner <daniel.wagner@bmw-carit.de
*/
/* This file is based on Dario Faggioli's libdl. Eventually it will be
replaced by a proper implemenation of this API. */
#ifndef __RT_SCHED_H__
#define __RT_SCHED_H__
#include <stdint.h>
#include <sys/types.h>
#include <mutex>
#include <vector>
#include <memory>
#ifndef SCHED_DEADLINE
#define SCHED_DEADLINE 6
#endif
#ifdef __x86_64__
#define __NR_sched_setattr 314
#define __NR_sched_getattr 315
#endif
#ifdef __i386__
#define __NR_sched_setattr 351
#define __NR_sched_getattr 352
#endif
#ifdef __arm__
#ifndef __NR_sched_setattr
#define __NR_sched_setattr 380
#endif
#ifndef __NR_sched_getattr
#define __NR_sched_getattr 381
#endif
#endif
#ifdef __tilegx__
#define __NR_sched_setattr 274
#define __NR_sched_getattr 275
#endif
typedef unsigned long long u64;
#define NS_TO_MS 1000000
struct sched_attr {
uint32_t size;
uint32_t sched_policy;
uint64_t sched_flags;
/* SCHED_NORMAL, SCHED_BATCH */
int32_t sched_nice;
/* SCHED_FIFO, SCHED_RR */
uint32_t sched_priority;
/* SCHED_DEADLINE */
uint64_t sched_runtime;
uint64_t sched_deadline;
uint64_t sched_period;
};
int sched_setattr(pid_t pid,
const struct sched_attr *attr,
unsigned int flags);
int sched_getattr(pid_t pid,
struct sched_attr *attr,
unsigned int size,
unsigned int flags);
u64 get_time_us(void);
typedef struct node_time_logger
{
std::shared_ptr<std::vector<std::pair<std::string, u64>>> recorded_times;
} node_time_logger;
void log_entry(node_time_logger logger, std::string text);
node_time_logger create_logger();
inline u64 get_time_us(void)
{
struct timespec ts;
unsigned long long time;
clock_gettime(CLOCK_MONOTONIC_RAW, &ts);
time = ts.tv_sec * 1000000;
time += ts.tv_nsec / 1000;
return time;
}
#endif /* __RT_SCHED_H__ */

View file

@ -0,0 +1,18 @@
<?xml version="1.0"?>
<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
<package format="3">
<name>simple_timer</name>
<version>0.0.0</version>
<description>TODO: Package description</description>
<maintainer email="kurt4wilson@gmail.com">nvidia</maintainer>
<license>TODO: License declaration</license>
<buildtool_depend>ament_cmake</buildtool_depend>
<test_depend>ament_lint_auto</test_depend>
<test_depend>ament_lint_common</test_depend>
<export>
<build_type>ament_cmake</build_type>
</export>
</package>

View file

@ -0,0 +1,43 @@
#include "simple_timer/cycle_timer.hpp"
namespace simple_timer
{
CycleTimer::CycleTimer(long start_delay) : start_delay_time(start_delay * 1000)
{
}
void CycleTimer::tick()
{
u64 current_wall_time = get_time_us();
u64 time_diff = 0;
if (!recording)
{
if (start_time == 0)
{
start_time = current_wall_time;
}
else if (current_wall_time - start_time > start_delay_time)
{
recording = true;
last_cycle_time = current_wall_time;
start_time = current_wall_time;
}
}
else
{
time_diff = current_wall_time - last_cycle_time;
if (time_diff < min_diff || min_diff == 0)
{
min_diff = time_diff;
}
if (time_diff > max_diff || max_diff == 0)
{
max_diff = time_diff;
}
last_cycle_time = current_wall_time;
last_diff = time_diff;
}
}
} // namespace simple_timer

View file

@ -0,0 +1,56 @@
#include "simple_timer/period_timer.hpp"
namespace simple_timer
{
PeriodTimer::PeriodTimer(long start_delay) : start_delay_time(start_delay * 1000)
{
}
void PeriodTimer::start()
{
u64 current_wall_time = get_time_us();
if (!recording)
{
if (start_time == 0)
{
start_time = current_wall_time;
}
else if (current_wall_time - start_time > start_delay_time)
{
recording = true;
start_time = current_wall_time;
last_period_time = current_wall_time;
}
}
else
{
last_period_time = current_wall_time;
}
}
void PeriodTimer::stop()
{
u64 current_wall_time = get_time_us();
u64 time_diff = 0;
if (!recording)
{
return;
}
else
{
time_diff = current_wall_time - last_period_time;
if (time_diff < min_period || min_period == 0)
{
min_period = time_diff;
}
if (time_diff > max_period || max_period == 0)
{
max_period = time_diff;
}
last_period = time_diff;
}
}
} // namespace simple_timer

View file

@ -0,0 +1,50 @@
// SPDX-License-Identifier: GPL-2.0-or-later
/*
* rt-sched.h - sched_setattr() and sched_getattr() API
*
* (C) Dario Faggioli <raistlin@linux.it>, 2009, 2010
* Copyright (C) 2014 BMW Car IT GmbH, Daniel Wagner <daniel.wagner@bmw-carit.de
*/
/* This file is based on Dario Faggioli's libdl. Eventually it will be
replaced by a proper implemenation of this API. */
#include <unistd.h>
#include <sys/syscall.h>
#include <time.h>
#include <iostream>
#include "simple_timer/rt-sched.hpp"
int sched_setattr(pid_t pid,
const struct sched_attr *attr,
unsigned int flags)
{
return syscall(__NR_sched_setattr, pid, attr, flags);
}
int sched_getattr(pid_t pid,
struct sched_attr *attr,
unsigned int size,
unsigned int flags)
{
return syscall(__NR_sched_getattr, pid, attr, size, flags);
}
void log_entry(node_time_logger logger, std::string text)
{
if (logger.recorded_times != nullptr)
{
logger.recorded_times->push_back(std::make_pair(text, get_time_us() / 1000));
// std::cout<<text<<std::endl;
}else{
// TODO: report error
}
}
node_time_logger create_logger()
{
node_time_logger logger;
logger.recorded_times = std::make_shared<std::vector<std::pair<std::string, u64>>>();
return logger;
}