add multithreaded executor. various improvements

This commit is contained in:
Kurt Wilson 2023-05-20 15:30:26 -04:00
parent 46641d7fea
commit a151eeee02
7 changed files with 323 additions and 167 deletions

View file

@ -38,6 +38,18 @@ ament_target_dependencies(priority_executor
simple_timer
)
add_library(multithread_priority_executor src/multithread_priority_executor.cpp src/priority_memory_strategy.cpp)
target_include_directories(multithread_priority_executor PUBLIC
$<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}/include>
$<INSTALL_INTERFACE:include>
)
ament_target_dependencies(multithread_priority_executor
rmw
rclcpp
rcl
simple_timer
)
add_executable(usage_example src/usage_example.cpp)
target_include_directories(usage_example PUBLIC
$<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}/include>
@ -60,7 +72,7 @@ install(TARGETS usage_example
DESTINATION lib/${PROJECT_NAME}
)
install(TARGETS priority_executor
install(TARGETS priority_executor multithread_priority_executor
ARCHIVE DESTINATION lib
LIBRARY DESTINATION lib
RUNTIME DESTINATION bin
@ -79,4 +91,5 @@ endif()
ament_export_include_directories(include)
ament_export_libraries(priority_executor)
ament_export_libraries(multithread_priority_executor)
ament_package()

View file

@ -0,0 +1,34 @@
#ifndef RTIS_MULTITHREAD_EXECUTOR
#define RTIS_MULTITHREAD_EXECUTOR
#include <priority_executor/priority_executor.hpp>
#include "rclcpp/detail/mutex_two_priorities.hpp"
#include <set>
namespace timed_executor
{
class MultithreadTimedExecutor : public TimedExecutor
{
public:
RCLCPP_PUBLIC
explicit MultithreadTimedExecutor(
const rclcpp::ExecutorOptions &options = rclcpp::ExecutorOptions(), std::string name = "unnamed executor", int number_of_threads = 2, std::chrono::nanoseconds next_exec_timeout = std::chrono::nanoseconds(-1));
RCLCPP_PUBLIC size_t get_number_of_threads();
RCLCPP_PUBLIC void spin() override;
protected:
RCLCPP_PUBLIC void run(size_t thread_number);
private:
size_t number_of_threads_;
std::set<rclcpp::TimerBase::SharedPtr> scheduled_timers_;
static std::unordered_map<MultithreadTimedExecutor *,
std::shared_ptr<rclcpp::detail::MutexTwoPriorities>>
wait_mutex_set_;
static std::mutex shared_wait_mutex_;
std::chrono::nanoseconds next_exec_timeout_ = std::chrono::microseconds(100); // 0.1 ms
};
}
#endif

View file

@ -63,21 +63,17 @@ namespace timed_executor
RCLCPP_PUBLIC
void
spin() override;
unsigned long long get_max_runtime(void);
std::string name;
void set_use_priorities(bool use_prio);
std::shared_ptr<PriorityMemoryStrategy<>> prio_memory_strategy_ = nullptr;
private:
RCLCPP_DISABLE_COPY(TimedExecutor)
// TODO: remove these
unsigned long long maxRuntime = 0;
unsigned long long start_time = 0;
int recording = 0;
void execute_subscription(rclcpp::AnyExecutable subscription);
protected:
bool
get_next_executable(rclcpp::AnyExecutable &any_executable, std::chrono::nanoseconds timeout = std::chrono::nanoseconds(-1));
private:
RCLCPP_DISABLE_COPY(TimedExecutor)
void
wait_for_work(std::chrono::nanoseconds timeout);
@ -89,44 +85,4 @@ namespace timed_executor
} // namespace timed_executor
static void
take_and_do_error_handling(
const char *action_description,
const char *topic_or_service_name,
std::function<bool()> take_action,
std::function<void()> handle_action)
{
bool taken = false;
try
{
taken = take_action();
}
catch (const rclcpp::exceptions::RCLError &rcl_error)
{
RCLCPP_ERROR(
rclcpp::get_logger("rclcpp"),
"executor %s '%s' unexpectedly failed: %s",
action_description,
topic_or_service_name,
rcl_error.what());
}
if (taken)
{
handle_action();
}
else
{
// Message or Service was not taken for some reason.
// Note that this can be normal, if the underlying middleware needs to
// interrupt wait spuriously it is allowed.
// So in that case the executor cannot tell the difference in a
// spurious wake up and an entity actually having data until trying
// to take the data.
RCLCPP_DEBUG(
rclcpp::get_logger("rclcpp"),
"executor %s '%s' failed to take anything",
action_description,
topic_or_service_name);
}
}
#endif // RCLCPP__EXECUTORS__SINGLE_THREADED_EXECUTOR_HPP_

View file

@ -19,6 +19,7 @@
#include <vector>
#include <queue>
#include <time.h>
#include <set>
#include "rcl/allocator.h"
@ -51,10 +52,10 @@ enum ExecutableType
enum ExecutableScheduleType
{
CHAIN_INDEPENDENT_PRIORITY, // not used here
DEADLINE = 0,
CHAIN_AWARE_PRIORITY,
DEADLINE,
DEFAULT, // not used here
CHAIN_INDEPENDENT_PRIORITY, // not used here
DEFAULT, // not used here
};
class PriorityExecutable
@ -62,7 +63,7 @@ class PriorityExecutable
public:
std::shared_ptr<const void> handle;
ExecutableType type;
bool can_be_run = true;
bool can_be_run = false;
std::shared_ptr<rclcpp::Waitable> waitable;
ExecutableScheduleType sched_type;
@ -89,6 +90,11 @@ public:
PriorityExecutable();
void increment_counter();
bool operator==(const PriorityExecutable &other) const;
static size_t num_executables;
int executable_id = 0;
};
class PriorityExecutableComparator
@ -138,7 +144,10 @@ public:
rclcpp::AnyExecutable &any_exec,
const WeakNodeList &weak_nodes);
void post_execute(rclcpp::AnyExecutable any_exec);
/**
* thread-id is used for logging. if -1, then don't log the thread id
*/
void post_execute(rclcpp::AnyExecutable any_exec, int thread_id = -1);
void
get_next_subscription(
@ -161,12 +170,12 @@ public:
void
get_next_waitable(rclcpp::AnyExecutable &any_exec, const WeakNodeList &weak_nodes) override;
PriorityExecutable *get_priority_settings(std::shared_ptr<const void> executable)
std::shared_ptr<PriorityExecutable> get_priority_settings(std::shared_ptr<const void> executable)
{
auto search = priority_map.find(executable);
if (search != priority_map.end())
{
return &(search->second);
return search->second;
}
else
{
@ -248,23 +257,20 @@ public:
void set_executable_priority(std::shared_ptr<const void> handle, int priority, ExecutableType t)
{
// TODO: any sanity checks should go here
// priority_map.insert(executable, priority);
priority_map[handle] = PriorityExecutable(handle, priority, t);
priority_map[handle] = std::make_shared<PriorityExecutable>(handle, priority, t);
}
void set_executable_priority(std::shared_ptr<const void> handle, int priority, ExecutableType t, ExecutableScheduleType sc, int chain_index)
{
// TODO: any sanity checks should go here
// priority_map.insert(executable, priority);
priority_map[handle] = PriorityExecutable(handle, priority, t, sc);
priority_map[handle].chain_id = chain_index;
priority_map[handle] = std::make_shared<PriorityExecutable>(handle, priority, t, sc);
priority_map[handle]->chain_id = chain_index;
}
void set_executable_deadline(std::shared_ptr<const void> handle, int period, ExecutableType t, int chain_id = 0)
{
// TODO: any sanity checks should go here
// priority_map.insert(executable, priority);
priority_map[handle] = PriorityExecutable(handle, period, t, DEADLINE);
priority_map[handle].chain_id = chain_id;
priority_map[handle] = std::make_shared<PriorityExecutable>(handle, period, t, DEADLINE);
priority_map[handle]->chain_id = chain_id;
}
int get_priority(std::shared_ptr<const void> executable)
@ -272,7 +278,7 @@ public:
auto search = priority_map.find(executable);
if (search != priority_map.end())
{
return search->second.priority;
return search->second->priority;
}
else
{
@ -283,30 +289,29 @@ public:
void set_first_in_chain(std::shared_ptr<const void> exec_handle)
{
PriorityExecutable *settings = get_priority_settings(exec_handle);
settings->is_first_in_chain = true;
get_priority_settings(exec_handle)->is_first_in_chain = true;
}
void set_last_in_chain(std::shared_ptr<const void> exec_handle)
{
PriorityExecutable *settings = get_priority_settings(exec_handle);
settings->is_last_in_chain = true;
get_priority_settings(exec_handle)->is_last_in_chain = true;
}
void assign_deadlines_queue(std::shared_ptr<const void> exec_handle, std::deque<uint64_t> *deadlines)
{
PriorityExecutable *settings = get_priority_settings(exec_handle);
settings->deadlines = deadlines;
get_priority_settings(exec_handle)->deadlines = deadlines;
}
private:
PriorityExecutable *get_and_reset_priority(std::shared_ptr<const void> executable, ExecutableType t)
std::shared_ptr<PriorityExecutable> get_and_reset_priority(std::shared_ptr<const void> executable, ExecutableType t)
{
PriorityExecutable *p = get_priority_settings(executable);
// PriorityExecutable *p = get_priority_settings(executable);
std::shared_ptr<PriorityExecutable> p = get_priority_settings(executable);
if (p == nullptr)
{
priority_map[executable] = PriorityExecutable(executable, 0, t);
p = &(priority_map[executable]);
// priority_map[executable] = PriorityExecutable(executable, 0, t);
priority_map[executable] = std::make_shared<PriorityExecutable>(executable, 0, t);
p = priority_map[executable];
}
// p->can_be_run = true;
return p;
@ -329,10 +334,16 @@ private:
// TODO: evaluate using node/subscription namespaced strings as keys
// holds *all* handle->priority mappings
std::unordered_map<std::shared_ptr<const void>, PriorityExecutable> priority_map;
std::map<std::shared_ptr<const void>, std::shared_ptr<PriorityExecutable>> priority_map;
// hold *only valid* executable+priorities
std::priority_queue<const PriorityExecutable *, std::vector<const PriorityExecutable *>, PriorityExecutableComparator> all_executables_;
// std::priority_queue<const PriorityExecutable *, std::vector<const PriorityExecutable *>, PriorityExecutableComparator> all_executables_;
std::set<const PriorityExecutable *, PriorityExecutableComparator> all_executables_;
// priority queue doesn't allow iteration. fortunately, std::map is sorted by key, so we can replace the priority queue with a map
// the key will be the priority. the value doesn't matter.
// std::map<const PriorityExecutable *, int, PriorityExecutableComparator> all_executables_ = std::map<const PriorityExecutable *, int, PriorityExecutableComparator>(PriorityExecutableComparator());
void add_executable_to_queue(std::shared_ptr<PriorityExecutable> p);
};
#endif // RCLCPP__STRATEGIES__ALLOCATOR_MEMORY_STRATEGY_HPP_

View file

@ -0,0 +1,113 @@
#include "priority_executor/multithread_priority_executor.hpp"
namespace timed_executor
{
std::unordered_map<MultithreadTimedExecutor *, std::shared_ptr<rclcpp::detail::MutexTwoPriorities>>
MultithreadTimedExecutor::wait_mutex_set_;
std::mutex MultithreadTimedExecutor::shared_wait_mutex_;
MultithreadTimedExecutor::MultithreadTimedExecutor(
const rclcpp::ExecutorOptions &options,
std::string name,
int number_of_threads, std::chrono::nanoseconds next_exec_timeout)
: TimedExecutor(options, name)
{
std::lock_guard<std::mutex> wait_lock(MultithreadTimedExecutor::shared_wait_mutex_);
wait_mutex_set_[this] = std::make_shared<rclcpp::detail::MutexTwoPriorities>();
number_of_threads_ = number_of_threads;
next_exec_timeout_ = next_exec_timeout;
}
size_t MultithreadTimedExecutor::get_number_of_threads()
{
return number_of_threads_;
}
void MultithreadTimedExecutor::spin()
{
if (spinning.exchange(true))
{
throw std::runtime_error("spin() called while already spinning");
}
std::vector<std::thread> threads;
size_t thread_id = 0;
{
auto wait_mutex = MultithreadTimedExecutor::wait_mutex_set_[this];
auto low_priority_wait_mutex = wait_mutex->get_low_priority_lockable();
std::lock_guard<rclcpp::detail::MutexTwoPriorities::LowPriorityLockable> wait_lock(low_priority_wait_mutex);
for (; thread_id < number_of_threads_ - 1; ++thread_id)
{
auto func = std::bind(&MultithreadTimedExecutor::run, this, thread_id);
threads.emplace_back(func);
}
}
run(thread_id);
for (auto &thread : threads)
{
thread.join();
}
}
void MultithreadTimedExecutor::run(size_t thread_number)
{
// set affinity
cpu_set_t cpuset;
CPU_ZERO(&cpuset);
CPU_SET(thread_number, &cpuset);
int rc = pthread_setaffinity_np(pthread_self(), sizeof(cpu_set_t), &cpuset);
if (rc != 0)
{
std::cout << "Error calling pthread_setaffinity_np: " << rc << "\n";
}
while (rclcpp::ok(this->context_) && spinning.load())
{
rclcpp::AnyExecutable any_executable;
{
auto wait_mutex = MultithreadTimedExecutor::wait_mutex_set_[this];
auto low_priority_wait_mutex = wait_mutex->get_low_priority_lockable();
std::lock_guard<rclcpp::detail::MutexTwoPriorities::LowPriorityLockable> wait_lock(low_priority_wait_mutex);
if (!rclcpp::ok(this->context_) || !spinning.load())
{
return;
}
if (!get_next_executable(any_executable, next_exec_timeout_))
{
continue;
}
if (any_executable.timer)
{
if (scheduled_timers_.count(any_executable.timer) != 0)
{
if (any_executable.callback_group)
{
any_executable.callback_group->can_be_taken_from().store(true);
}
continue;
}
scheduled_timers_.insert(any_executable.timer);
}
}
execute_any_executable(any_executable);
if (any_executable.timer)
{
auto wait_mutex = MultithreadTimedExecutor::wait_mutex_set_[this];
auto high_priority_wait_mutex = wait_mutex->get_high_priority_lockable();
std::lock_guard<rclcpp::detail::MutexTwoPriorities::HighPriorityLockable> wait_lock(high_priority_wait_mutex);
auto it = scheduled_timers_.find(any_executable.timer);
if (it != scheduled_timers_.end())
{
scheduled_timers_.erase(it);
}
}
any_executable.callback_group.reset();
if (prio_memory_strategy_ != nullptr)
{
auto wait_mutex = MultithreadTimedExecutor::wait_mutex_set_[this];
auto low_priority_wait_mutex = wait_mutex->get_low_priority_lockable();
std::lock_guard<rclcpp::detail::MutexTwoPriorities::LowPriorityLockable> wait_lock(low_priority_wait_mutex);
// std::shared_ptr<PriorityMemoryStrategy<>> prio_memory_strategy_ = std::dynamic_pointer_cast<PriorityMemoryStrategy>(memory_strategy_);
prio_memory_strategy_->post_execute(any_executable, thread_number);
}
}
}
}

View file

@ -20,6 +20,8 @@
#include "rclcpp/exceptions.hpp"
#include <memory>
#include <sched.h>
// for sleep
#include <unistd.h>
namespace timed_executor
{
@ -60,19 +62,17 @@ namespace timed_executor
std::cout << "shutdown" << std::endl;
}
unsigned long long TimedExecutor::get_max_runtime(void)
{
return maxRuntime;
}
bool TimedExecutor::get_next_executable(rclcpp::AnyExecutable &any_executable, std::chrono::nanoseconds timeout)
{
bool success = false;
// Check to see if there are any subscriptions or timers needing service
// TODO(wjwwood): improve run to run efficiency of this function
sched_yield();
wait_for_work(std::chrono::milliseconds(1));
// sched_yield();
// sleep for 10us
// usleep(20);
wait_for_work(timeout);
success = get_next_ready_executable(any_executable);
// std::cout << "get_next_executable: " << success << std::endl;
return success;
}

View file

@ -1,9 +1,12 @@
#include "priority_executor/priority_memory_strategy.hpp"
#include "simple_timer/rt-sched.hpp"
#include <sstream>
size_t PriorityExecutable::num_executables;
PriorityExecutable::PriorityExecutable(std::shared_ptr<const void> h, int p, ExecutableType t, ExecutableScheduleType sched_type)
{
std::cout << "priority_executable constructor called" << std::endl;
std::cout << "type: " << t << std::endl;
handle = h;
type = t;
if (sched_type == CHAIN_INDEPENDENT_PRIORITY || sched_type == CHAIN_AWARE_PRIORITY)
@ -15,6 +18,9 @@ PriorityExecutable::PriorityExecutable(std::shared_ptr<const void> h, int p, Exe
period = p;
}
this->sched_type = sched_type;
this->executable_id = num_executables;
num_executables += 1;
}
PriorityExecutable::PriorityExecutable()
@ -32,14 +38,7 @@ void PriorityExecutable::dont_run()
void PriorityExecutable::allow_run()
{
if (this->can_be_run)
{
// release has been detected
}
else
{
this->can_be_run = true;
}
this->can_be_run = true;
}
void PriorityExecutable::increment_counter()
@ -47,53 +46,58 @@ void PriorityExecutable::increment_counter()
this->counter += 1;
}
bool PriorityExecutable::operator==(const PriorityExecutable &other) const
{
std::cout << "PriorityExecutable::operator== called" << std::endl;
if (this->handle == other.handle)
{
return true;
}
else
{
return false;
}
}
bool PriorityExecutableComparator::operator()(const PriorityExecutable *p1, const PriorityExecutable *p2)
{
// since this will be used in a std::set, also check for equality
if (p1 == nullptr || p2 == nullptr)
{
// TODO: realistic value
return 0;
std::cout << "PriorityExecutableComparator::operator() called with nullptr" << std::endl;
return false;
}
if (p1->handle == p2->handle)
{
return false;
}
if (p1->executable_id == p2->executable_id)
{
return false;
}
if (p1->sched_type != p2->sched_type)
{
if (p1->sched_type == DEADLINE)
{
return false;
}
else if (p2->sched_type == DEADLINE)
{
return true;
}
if (p1->sched_type == CHAIN_INDEPENDENT_PRIORITY)
{
return false;
}
else if (p2->sched_type == CHAIN_INDEPENDENT_PRIORITY)
{
return true;
}
else if (p1->sched_type == CHAIN_AWARE_PRIORITY)
{
return false;
}
else
{
return true;
}
// in order: DEADLINE, CHAIN_AWARE, CHAIN_INDEPENDENT
return p1->sched_type < p2->sched_type;
}
if (p1->sched_type == CHAIN_INDEPENDENT_PRIORITY)
{
if (p1->priority == p2->priority)
{
return p1->executable_id < p2->executable_id;
}
// lower value runs first
return p1->priority > p2->priority;
return p1->priority < p2->priority;
}
if (p1->sched_type == CHAIN_AWARE_PRIORITY)
{
if (p1->priority != p2->priority)
if (p1->priority == p2->priority)
{
return p1->priority > p2->priority;
return p1->executable_id < p2->executable_id;
}
// return p1->counter > p2->counter;
return 0;
return p1->priority < p2->priority;
}
if (p1->sched_type == DEADLINE)
{
@ -109,26 +113,27 @@ bool PriorityExecutableComparator::operator()(const PriorityExecutable *p1, cons
{
p2_deadline = p2->deadlines->front();
}
if (p1_deadline == p2_deadline)
{
// this looks bad and is bad, BUT
// if we tell std::set these are equal, only one will be added, and we will lose the other.
// we need _something_ to make them unique
return p1->executable_id < p2->executable_id;
}
if (p1_deadline == 0)
{
return true;
p1_deadline = std::numeric_limits<uint64_t>::max();
}
if (p2_deadline == 0)
{
return false;
p2_deadline = std::numeric_limits<uint64_t>::max();
}
if (p1_deadline == p2_deadline)
{
// these must be from the same chain
// settle the difference with the counter
return p1->counter > p2->counter;
}
return p1_deadline > p2_deadline;
return p1_deadline < p2_deadline;
}
else
{
std::cout << "invalid compare opration on priority_exec" << std::endl;
return 0;
return false;
}
}
@ -158,6 +163,15 @@ void PriorityMemoryStrategy<>::remove_guard_condition(const rcl_guard_condition_
}
}
template <>
void PriorityMemoryStrategy<>::add_executable_to_queue(std::shared_ptr<PriorityExecutable> e)
{
// e may have changed. remove and re-add
all_executables_.erase(e.get());
// convert from shared_ptr to raw pointer
all_executables_.insert(e.get());
}
template <>
void PriorityMemoryStrategy<>::clear_handles()
{
@ -167,14 +181,9 @@ void PriorityMemoryStrategy<>::clear_handles()
timer_handles_.clear();
waitable_handles_.clear();
// priority_queue doesn't have a clear function, so we swap it with an empty one. `empty` will go out of scope, and be cleared
// all_executables_ = std::priority_queue<const PriorityExecutable *, std::vector<const PriorityExecutable *>, PriorityExecutableComparator>();
std::priority_queue<const PriorityExecutable *, std::vector<const PriorityExecutable *>, PriorityExecutableComparator> empty;
std::swap(all_executables_, empty);
if (!all_executables_.empty())
{
std::cout << "couldn't clear all exec" << std::endl;
}
// all_executables_.clear();
// all_executables_ = std::map<const PriorityExecutable *, int, PriorityExecutableComparator>(PriorityExecutableComparator());
}
template <>
@ -190,60 +199,60 @@ void PriorityMemoryStrategy<>::remove_null_handles(rcl_wait_set_t *wait_set)
{
if (!wait_set->subscriptions[i])
{
priority_map[subscription_handles_[i]].dont_run();
priority_map[subscription_handles_[i]]->dont_run();
subscription_handles_[i].reset();
}
else
{
priority_map[subscription_handles_[i]].allow_run();
priority_map[subscription_handles_[i]]->allow_run();
}
}
for (size_t i = 0; i < service_handles_.size(); ++i)
{
if (!wait_set->services[i])
{
priority_map[service_handles_[i]].dont_run();
priority_map[service_handles_[i]]->dont_run();
service_handles_[i].reset();
}
else
{
priority_map[service_handles_[i]].allow_run();
priority_map[service_handles_[i]]->allow_run();
}
}
for (size_t i = 0; i < client_handles_.size(); ++i)
{
if (!wait_set->clients[i])
{
priority_map[client_handles_[i]].dont_run();
priority_map[client_handles_[i]]->dont_run();
client_handles_[i].reset();
}
else
{
priority_map[client_handles_[i]].allow_run();
priority_map[client_handles_[i]]->allow_run();
}
}
for (size_t i = 0; i < timer_handles_.size(); ++i)
{
if (!wait_set->timers[i])
{
priority_map[timer_handles_[i]].dont_run();
priority_map[timer_handles_[i]]->dont_run();
timer_handles_[i].reset();
}
else
{
priority_map[timer_handles_[i]].allow_run();
priority_map[timer_handles_[i]]->allow_run();
}
}
for (size_t i = 0; i < waitable_handles_.size(); ++i)
{
if (!waitable_handles_[i]->is_ready(wait_set))
{
priority_map[waitable_handles_[i]].dont_run();
priority_map[waitable_handles_[i]]->dont_run();
waitable_handles_[i].reset();
}
else
{
priority_map[waitable_handles_[i]].allow_run();
priority_map[waitable_handles_[i]]->allow_run();
}
}
@ -293,35 +302,34 @@ bool PriorityMemoryStrategy<>::collect_entities(const WeakNodeList &weak_nodes)
{
auto subscription_handle = subscription->get_subscription_handle();
subscription_handles_.push_back(subscription_handle);
all_executables_.push(get_and_reset_priority(subscription_handle, SUBSCRIPTION));
add_executable_to_queue(get_and_reset_priority(subscription_handle, SUBSCRIPTION));
return false;
});
group->find_service_ptrs_if(
[this](const rclcpp::ServiceBase::SharedPtr &service)
{
all_executables_.push(get_and_reset_priority(service->get_service_handle(), SERVICE));
add_executable_to_queue(get_and_reset_priority(service->get_service_handle(), SERVICE));
service_handles_.push_back(service->get_service_handle());
return false;
});
group->find_client_ptrs_if(
[this](const rclcpp::ClientBase::SharedPtr &client)
{
all_executables_.push(get_and_reset_priority(client->get_client_handle(), CLIENT));
add_executable_to_queue(get_and_reset_priority(client->get_client_handle(), CLIENT));
client_handles_.push_back(client->get_client_handle());
return false;
});
group->find_timer_ptrs_if(
[this](const rclcpp::TimerBase::SharedPtr &timer)
{
all_executables_.push(get_and_reset_priority(timer->get_timer_handle(), TIMER));
add_executable_to_queue(get_and_reset_priority(timer->get_timer_handle(), TIMER));
timer_handles_.push_back(timer->get_timer_handle());
return false;
});
group->find_waitable_ptrs_if(
[this](const rclcpp::Waitable::SharedPtr &waitable)
{
all_executables_.push(get_and_reset_priority(waitable, WAITABLE));
add_executable_to_queue(get_and_reset_priority(waitable, WAITABLE));
waitable_handles_.push_back(waitable);
return false;
});
@ -420,10 +428,17 @@ void PriorityMemoryStrategy<>::get_next_executable(
{
const PriorityExecutable *next_exec = nullptr;
while (!all_executables_.empty())
// std::cout << "all_executables_.size():" << all_executables_.size() << std::endl;
// print all_executables_
// for (auto exec : all_executables_)
// {
// std::cout << exec << ": " << exec->handle << " : " << exec->sched_type << std::endl;
// }
// while (!all_executables_.empty())
for (auto exec : all_executables_)
{
next_exec = all_executables_.top();
all_executables_.pop();
next_exec = exec;
if (!next_exec->can_be_run)
{
continue;
@ -571,22 +586,26 @@ void PriorityMemoryStrategy<>::get_next_executable(
}
break;
default:
// std::cout << "Unknown type from priority!!!" << std::endl;
break;
std::cout << "Unknown type from priority!!!" << std::endl;
continue;
// break;
}
if (next_exec->is_first_in_chain && next_exec->sched_type == DEADLINE)
{
// std::cout << "running first in chain deadline" << std::endl;
}
// returning with an executable
// remove from all_executables_ map
all_executables_.erase(exec);
return;
}
}
template <>
void PriorityMemoryStrategy<>::post_execute(rclcpp::AnyExecutable any_exec)
void PriorityMemoryStrategy<>::post_execute(rclcpp::AnyExecutable any_exec, int thread_id)
{
PriorityExecutable *next_exec = nullptr;
std::shared_ptr<PriorityExecutable> next_exec = nullptr;
if (any_exec.subscription)
{
next_exec = get_priority_settings(any_exec.subscription->get_subscription_handle());
@ -660,7 +679,7 @@ void PriorityMemoryStrategy<>::post_execute(rclcpp::AnyExecutable any_exec)
int64_t time_diff = millis - this_deadline;
int periods_late;
// if time_diff is negative, we completed on time. add one period to the deadline
if (time_diff < 0)
if (time_diff <= 0)
{
periods_late = 0;
next_deadline = this_deadline + next_exec->period;
@ -670,7 +689,7 @@ void PriorityMemoryStrategy<>::post_execute(rclcpp::AnyExecutable any_exec)
else
{
periods_late = std::ceil(time_diff / (double)next_exec->period);
next_deadline = this_deadline + (periods_late + 1) * next_exec->period;
next_deadline = this_deadline + (periods_late)*next_exec->period;
on_time = false;
}
// std::chrono::nanoseconds time_until = next_exec->timer_handle->time_until_trigger();
@ -678,9 +697,19 @@ void PriorityMemoryStrategy<>::post_execute(rclcpp::AnyExecutable any_exec)
// the next deadline is the current time plus the period, skipping periods that have already passed
// RCLCPP_INFO(rclcpp::get_logger("rclcpp"), "next deadline: %d", next_deadline);
oss << "{\"operation\": \"next_deadline\", \"chain_id\": " << next_exec->chain_id << ", \"deadline\": " << next_deadline << ", \"on_time\": " << on_time << ", \"time_diff\": " << time_diff << ", \"periods_late\": " << periods_late << "}";
oss << "{\"operation\": \"next_deadline\", \"chain_id\": " << next_exec->chain_id << ", \"deadline\": " << next_deadline << ", \"on_time\": " << on_time << ", \"time_diff\": " << time_diff << ", \"periods_late\": " << periods_late;
if (thread_id != -1)
{
oss << ", \"thread_id\": " << thread_id;
}
oss << "}";
// oss << "{\"operation\": \"next_deadline\", \"chain_id\": " << next_exec->chain_id << ", \"deadline\": " << next_exec->deadlines->front() << "}";
log_entry(logger_, oss.str());
if (time_diff < -next_exec->period)
{
// the deadline was much later than it needed to be
}
next_exec->deadlines->push_back(next_deadline);
// print chain id and deadline contents
@ -699,7 +728,7 @@ void PriorityMemoryStrategy<>::post_execute(rclcpp::AnyExecutable any_exec)
// this is safe, since we popped it earlier
// get a mutable reference
// TODO: find a cleaner way to do this
PriorityExecutable *mut_executable = get_priority_settings(next_exec->handle);
std::shared_ptr<PriorityExecutable> mut_executable = get_priority_settings(next_exec->handle);
// std::cout << "running chain aware cb" << std::endl;
mut_executable->increment_counter();
}