support default executor instrumentation

This commit is contained in:
Kurt Wilson 2023-10-10 15:29:20 -04:00
parent 65b0cf2072
commit daffbf55b8
6 changed files with 158 additions and 27 deletions

View file

@ -21,6 +21,7 @@
#include <cstdlib>
#include <memory>
#include <vector>
#include <set>
#include "rclcpp/executor.hpp"
#include "rclcpp/rclcpp.hpp"
@ -30,33 +31,38 @@
#include "rclcpp/utilities.hpp"
#include "rclcpp/rate.hpp"
#include "rclcpp/visibility_control.hpp"
#include "rclcpp/detail/mutex_two_priorities.hpp"
#include "priority_executor/priority_memory_strategy.hpp"
#include "simple_timer/rt-sched.hpp"
class RTISTimed
{
public:
node_time_logger logger_ = create_logger();
node_time_logger logger_;
};
class ROSDefaultMultithreadedExecutor : public rclcpp::executors::MultiThreadedExecutor, public RTISTimed
class ROSDefaultMultithreadedExecutor : public rclcpp::Executor, public RTISTimed
{
public:
RCLCPP_SMART_PTR_DEFINITIONS(ROSDefaultMultithreadedExecutor)
/// Default constructor. See the default constructor for Executor.
RCLCPP_PUBLIC explicit ROSDefaultMultithreadedExecutor(
const rclcpp::ExecutorOptions &options = rclcpp::ExecutorOptions(),
size_t number_of_threads = 0,
bool yield_before_execute = false,
std::chrono::nanoseconds timeout = std::chrono::nanoseconds(-1)) : rclcpp::executors::MultiThreadedExecutor(options, number_of_threads, yield_before_execute, timeout){};
/// Default destructor.
RCLCPP_PUBLIC
virtual ~ROSDefaultMultithreadedExecutor();
explicit ROSDefaultMultithreadedExecutor(
const rclcpp::ExecutorOptions &options = rclcpp::ExecutorOptions(), int number_of_threads = 2, std::chrono::nanoseconds next_exec_timeout = std::chrono::nanoseconds(-1));
RCLCPP_PUBLIC
RCLCPP_PUBLIC size_t get_number_of_threads();
RCLCPP_PUBLIC void spin() override;
bool get_next_executable(rclcpp::AnyExecutable &any_executable, std::chrono::nanoseconds timeout = std::chrono::nanoseconds(-1));
protected:
RCLCPP_PUBLIC void run(size_t thread_number);
private:
size_t number_of_threads_;
std::set<rclcpp::TimerBase::SharedPtr> scheduled_timers_;
static std::unordered_map<ROSDefaultMultithreadedExecutor *,
std::shared_ptr<rclcpp::detail::MutexTwoPriorities>>
wait_mutex_set_;
static std::mutex shared_wait_mutex_;
std::chrono::nanoseconds next_exec_timeout_ = std::chrono::nanoseconds(-1);
};
/// Single-threaded executor implementation.

View file

@ -31,6 +31,7 @@
#include "rclcpp/rate.hpp"
#include "rclcpp/visibility_control.hpp"
#include "priority_executor/priority_memory_strategy.hpp"
#include <priority_executor/default_executor.hpp>
namespace timed_executor
{
@ -38,7 +39,7 @@ namespace timed_executor
/**
* This is the default executor created by rclcpp::spin.
*/
class TimedExecutor : public rclcpp::Executor
class TimedExecutor : public rclcpp::Executor, public RTISTimed
{
public:
RCLCPP_SMART_PTR_DEFINITIONS(TimedExecutor)

View file

@ -25,9 +25,6 @@ ROSDefaultExecutor::ROSDefaultExecutor(const rclcpp::ExecutorOptions &options)
ROSDefaultExecutor::~ROSDefaultExecutor() {}
ROSDefaultMultithreadedExecutor::~ROSDefaultMultithreadedExecutor() {}
void ROSDefaultExecutor::wait_for_work(std::chrono::nanoseconds timeout)
{
{
@ -134,9 +131,13 @@ bool ROSDefaultMultithreadedExecutor::get_next_executable(rclcpp::AnyExecutable
// TODO(wjwwood): improve run to run efficiency of this function
// try to get an executable
// record the start time
auto start = std::chrono::steady_clock::now();
success = get_next_ready_executable(any_executable);
// and the end time
auto end = std::chrono::steady_clock::now();
std::stringstream oss;
oss << "{\"operation\":\"get_next_executable\", \"result\":\"" << success << "\"}";
oss << "{\"operation\":\"get_next_executable\", \"result\":\"" << success << "\", \"duration\":\"" << std::chrono::duration_cast<std::chrono::microseconds>(end - start).count() << "\"}";
log_entry(logger_, oss.str());
// If there are none
@ -144,15 +145,124 @@ bool ROSDefaultMultithreadedExecutor::get_next_executable(rclcpp::AnyExecutable
{
// Wait for subscriptions or timers to work on
// queue refresh
start = std::chrono::steady_clock::now();
wait_for_work(timeout);
// and the end time
end = std::chrono::steady_clock::now();
auto wait_duration = std::chrono::duration_cast<std::chrono::microseconds>(end - start).count();
oss.str("");
oss << "{\"operation\":\"wait_for_work\", \"result\":\"" << success << "\", \"wait_duration\":\"" << wait_duration << "\"}";
log_entry(logger_, oss.str());
if (!spinning.load())
{
return false;
}
// Try again
start = std::chrono::steady_clock::now();
success = get_next_ready_executable(any_executable);
// and the end time
end = std::chrono::steady_clock::now();
oss.str("");
oss << "{\"operation\":\"wait_for_work\", \"result\":\"" << success << "\"}";
oss << "{\"operation\":\"get_next_executable\", \"result\":\"" << success << "\", \"duration\":\"" << std::chrono::duration_cast<std::chrono::microseconds>(end - start).count() << "\"}";
log_entry(logger_, oss.str());
}
return success;
}
std::unordered_map<ROSDefaultMultithreadedExecutor *, std::shared_ptr<rclcpp::detail::MutexTwoPriorities>> ROSDefaultMultithreadedExecutor::wait_mutex_set_;
std::mutex ROSDefaultMultithreadedExecutor::shared_wait_mutex_;
ROSDefaultMultithreadedExecutor::ROSDefaultMultithreadedExecutor(
const rclcpp::ExecutorOptions &options,
int number_of_threads, std::chrono::nanoseconds next_exec_timeout)
: Executor(options)
{
std::lock_guard<std::mutex> wait_lock(ROSDefaultMultithreadedExecutor::shared_wait_mutex_);
wait_mutex_set_[this] = std::make_shared<rclcpp::detail::MutexTwoPriorities>();
number_of_threads_ = number_of_threads;
next_exec_timeout_ = next_exec_timeout;
logger_ = create_logger();
}
void ROSDefaultMultithreadedExecutor::spin()
{
if (spinning.exchange(true))
{
throw std::runtime_error("spin() called while already spinning");
}
std::vector<std::thread> threads;
size_t thread_id = 0;
{
auto wait_mutex = ROSDefaultMultithreadedExecutor::wait_mutex_set_[this];
auto low_priority_wait_mutex = wait_mutex->get_low_priority_lockable();
std::lock_guard<rclcpp::detail::MutexTwoPriorities::LowPriorityLockable> wait_lock(low_priority_wait_mutex);
for (; thread_id < number_of_threads_ - 1; ++thread_id)
{
auto func = std::bind(&ROSDefaultMultithreadedExecutor::run, this, thread_id);
threads.emplace_back(func);
}
}
run(thread_id);
for (auto &thread : threads)
{
thread.join();
}
}
void ROSDefaultMultithreadedExecutor::run(size_t thread_number)
{
while (rclcpp::ok(this->context_) && spinning.load())
{
rclcpp::AnyExecutable any_exec;
{
auto wait_mutex = ROSDefaultMultithreadedExecutor::wait_mutex_set_[this];
auto low_priority_wait_mutex = wait_mutex->get_low_priority_lockable();
std::lock_guard<rclcpp::detail::MutexTwoPriorities::LowPriorityLockable> wait_lock(low_priority_wait_mutex);
if (!rclcpp::ok(this->context_) || !spinning.load())
{
return;
}
if (!get_next_executable(any_exec, next_exec_timeout_))
{
continue;
}
if (any_exec.timer)
{
// Guard against multiple threads getting the same timer.
if (scheduled_timers_.count(any_exec.timer) != 0)
{
// Make sure that any_exec's callback group is reset before
// the lock is released.
if (any_exec.callback_group)
{
any_exec.callback_group->can_be_taken_from().store(true);
}
continue;
}
scheduled_timers_.insert(any_exec.timer);
}
}
// if (yield_before_execute_)
// {
// std::this_thread::yield();
// }
execute_any_executable(any_exec);
if (any_exec.timer)
{
auto wait_mutex = ROSDefaultMultithreadedExecutor::wait_mutex_set_[this];
auto high_priority_wait_mutex = wait_mutex->get_high_priority_lockable();
std::lock_guard<rclcpp::detail::MutexTwoPriorities::HighPriorityLockable> wait_lock(high_priority_wait_mutex);
auto it = scheduled_timers_.find(any_exec.timer);
if (it != scheduled_timers_.end())
{
scheduled_timers_.erase(it);
}
}
// Clear the callback_group to prevent the AnyExecutable destructor from
// resetting the callback group `can_be_taken_from`
any_exec.callback_group.reset();
}
}

View file

@ -14,6 +14,7 @@ namespace timed_executor
wait_mutex_set_[this] = std::make_shared<rclcpp::detail::MutexTwoPriorities>();
number_of_threads_ = number_of_threads;
next_exec_timeout_ = next_exec_timeout;
logger_ = create_logger();
}
size_t MultithreadTimedExecutor::get_number_of_threads()

View file

@ -29,6 +29,7 @@ namespace timed_executor
: rclcpp::Executor(options)
{
this->name = name;
logger_ = create_logger();
}
TimedExecutor::~TimedExecutor() {}
@ -70,9 +71,21 @@ namespace timed_executor
// sched_yield();
// sleep for 10us
// usleep(20);
auto start = std::chrono::steady_clock::now();
wait_for_work(timeout);
auto end = std::chrono::steady_clock::now();
auto wait_duration = std::chrono::duration_cast<std::chrono::microseconds>(end - start);
std::ostringstream oss;
oss << "{\"operation\": \"wait_for_work\", \"wait_duration\": " << wait_duration.count() << "}";
log_entry(logger_, oss.str());
start = std::chrono::steady_clock::now();
success = get_next_ready_executable(any_executable);
// std::cout << "get_next_executable: " << success << std::endl;
end = std::chrono::steady_clock::now();
auto get_next_duration = std::chrono::duration_cast<std::chrono::microseconds>(end - start);
oss.str("");
oss << "{\"operation\": \"get_next_executable\", \"duration\": " << get_next_duration.count() << ", \"result\": " << success << "}";
log_entry(logger_, oss.str());
return success;
}

View file

@ -627,15 +627,15 @@ void PriorityMemoryStrategy<>::get_next_executable(
// returning with an executable
// remove from all_executables_ map
all_executables_.erase(next_exec);
std::stringstream oss;
oss << "{\"operation\":\"get_next_executable\", \"result\":\"success\", \"type\":\"" << type << "\"}";
log_entry(logger_, oss.str());
// std::stringstream oss;
// oss << "{\"operation\":\"get_next_executable\", \"result\":\"success\", \"type\":\"" << type << "\"}";
// log_entry(logger_, oss.str());
return;
}
// nothing found
std::stringstream oss;
oss << "{\"operation\":\"get_next_executable\", \"result\":\"nothing_found\"}";
log_entry(logger_, oss.str());
// std::stringstream oss;
// oss << "{\"operation\":\"get_next_executable\", \"result\":\"nothing_found\"}";
// log_entry(logger_, oss.str());
}
template <>