Remove thread from Timer, remove guard conditions for timers
This commit is contained in:
parent
a4154a2424
commit
44c296f7ab
3 changed files with 112 additions and 113 deletions
|
@ -251,7 +251,6 @@ protected:
|
||||||
// Collect the subscriptions and timers to be waited on
|
// Collect the subscriptions and timers to be waited on
|
||||||
bool has_invalid_weak_nodes = false;
|
bool has_invalid_weak_nodes = false;
|
||||||
std::vector<rclcpp::subscription::SubscriptionBase::SharedPtr> subs;
|
std::vector<rclcpp::subscription::SubscriptionBase::SharedPtr> subs;
|
||||||
std::vector<rclcpp::timer::TimerBase::SharedPtr> timers;
|
|
||||||
std::vector<rclcpp::service::ServiceBase::SharedPtr> services;
|
std::vector<rclcpp::service::ServiceBase::SharedPtr> services;
|
||||||
std::vector<rclcpp::client::ClientBase::SharedPtr> clients;
|
std::vector<rclcpp::client::ClientBase::SharedPtr> clients;
|
||||||
for (auto & weak_node : weak_nodes_) {
|
for (auto & weak_node : weak_nodes_) {
|
||||||
|
@ -271,12 +270,6 @@ protected:
|
||||||
subs.push_back(subscription);
|
subs.push_back(subscription);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
for (auto & weak_timer : group->timer_ptrs_) {
|
|
||||||
auto timer = weak_timer.lock();
|
|
||||||
if (timer) {
|
|
||||||
timers.push_back(timer);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
for (auto & service : group->service_ptrs_) {
|
for (auto & service : group->service_ptrs_) {
|
||||||
services.push_back(service);
|
services.push_back(service);
|
||||||
}
|
}
|
||||||
|
@ -349,10 +342,9 @@ protected:
|
||||||
client_handle_index += 1;
|
client_handle_index += 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
// Use the number of guard conditions to allocate memory in the handles
|
// The number of guard conditions is fixed at 2: 1 for the ctrl-c guard cond,
|
||||||
// Add 2 to the number for the ctrl-c guard cond and the executor's
|
// and one for the executor's guard cond (interrupt_guard_condition_)
|
||||||
size_t start_of_timer_guard_conds = 2;
|
size_t number_of_guard_conds = 2;
|
||||||
size_t number_of_guard_conds = timers.size() + start_of_timer_guard_conds;
|
|
||||||
rmw_guard_conditions_t guard_condition_handles;
|
rmw_guard_conditions_t guard_condition_handles;
|
||||||
guard_condition_handles.guard_condition_count = number_of_guard_conds;
|
guard_condition_handles.guard_condition_count = number_of_guard_conds;
|
||||||
guard_condition_handles.guard_conditions =
|
guard_condition_handles.guard_conditions =
|
||||||
|
@ -368,21 +360,25 @@ protected:
|
||||||
// Put the executor's guard condition in
|
// Put the executor's guard condition in
|
||||||
guard_condition_handles.guard_conditions[1] = \
|
guard_condition_handles.guard_conditions[1] = \
|
||||||
interrupt_guard_condition_->data;
|
interrupt_guard_condition_->data;
|
||||||
// Then fill the SubscriberHandles with ready subscriptions
|
|
||||||
size_t guard_cond_handle_index = start_of_timer_guard_conds;
|
|
||||||
for (auto & timer : timers) {
|
|
||||||
guard_condition_handles.guard_conditions[guard_cond_handle_index] = \
|
|
||||||
timer->guard_condition_->data;
|
|
||||||
guard_cond_handle_index += 1;
|
|
||||||
}
|
|
||||||
|
|
||||||
rmw_time_t * wait_timeout = NULL;
|
rmw_time_t * wait_timeout = NULL;
|
||||||
rmw_time_t rmw_timeout;
|
rmw_time_t rmw_timeout;
|
||||||
|
|
||||||
if (timeout >= std::chrono::duration<int64_t, T>::zero()) {
|
auto next_timer_duration = get_earliest_timer();
|
||||||
|
// If the next timer timeout must preempt the requested timeout
|
||||||
|
// or if the requested timeout blocks forever, and there exists a valid timer,
|
||||||
|
// replace the requested timeout with the next timeout.
|
||||||
|
bool has_valid_timer = next_timer_duration >= std::chrono::nanoseconds::zero();
|
||||||
|
if ((next_timer_duration < timeout ||
|
||||||
|
timeout < std::chrono::duration<int64_t, T>::zero()) && has_valid_timer)
|
||||||
|
{
|
||||||
|
rmw_timeout.sec =
|
||||||
|
std::chrono::duration_cast<std::chrono::seconds>(next_timer_duration).count();
|
||||||
|
rmw_timeout.nsec = next_timer_duration.count() % (1000 * 1000 * 1000);
|
||||||
|
wait_timeout = &rmw_timeout;
|
||||||
|
} else if (timeout >= std::chrono::duration<int64_t, T>::zero()) {
|
||||||
// Convert timeout representation to rmw_time
|
// Convert timeout representation to rmw_time
|
||||||
auto timeout_sec = std::chrono::duration_cast<std::chrono::seconds>(timeout);
|
rmw_timeout.sec = std::chrono::duration_cast<std::chrono::seconds>(timeout).count();
|
||||||
rmw_timeout.sec = timeout_sec.count();
|
|
||||||
rmw_timeout.nsec = std::chrono::duration_cast<std::chrono::nanoseconds>(timeout).count() %
|
rmw_timeout.nsec = std::chrono::duration_cast<std::chrono::nanoseconds>(timeout).count() %
|
||||||
(1000 * 1000 * 1000);
|
(1000 * 1000 * 1000);
|
||||||
wait_timeout = &rmw_timeout;
|
wait_timeout = &rmw_timeout;
|
||||||
|
@ -419,13 +415,6 @@ protected:
|
||||||
subscriber_handles_.push_back(handle);
|
subscriber_handles_.push_back(handle);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
// Then the timers, start with start_of_timer_guard_conds
|
|
||||||
for (size_t i = start_of_timer_guard_conds; i < number_of_guard_conds; ++i) {
|
|
||||||
void * handle = guard_condition_handles.guard_conditions[i];
|
|
||||||
if (handle) {
|
|
||||||
guard_condition_handles_.push_back(handle);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
// Then the services
|
// Then the services
|
||||||
for (size_t i = 0; i < number_of_services; ++i) {
|
for (size_t i = 0; i < number_of_services; ++i) {
|
||||||
void * handle = service_handles.services[i];
|
void * handle = service_handles.services[i];
|
||||||
|
@ -478,30 +467,6 @@ protected:
|
||||||
return rclcpp::subscription::SubscriptionBase::SharedPtr();
|
return rclcpp::subscription::SubscriptionBase::SharedPtr();
|
||||||
}
|
}
|
||||||
|
|
||||||
rclcpp::timer::TimerBase::SharedPtr
|
|
||||||
get_timer_by_handle(void * guard_condition_handle)
|
|
||||||
{
|
|
||||||
for (auto weak_node : weak_nodes_) {
|
|
||||||
auto node = weak_node.lock();
|
|
||||||
if (!node) {
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
for (auto weak_group : node->callback_groups_) {
|
|
||||||
auto group = weak_group.lock();
|
|
||||||
if (!group) {
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
for (auto weak_timer : group->timer_ptrs_) {
|
|
||||||
auto timer = weak_timer.lock();
|
|
||||||
if (timer && timer->guard_condition_->data == guard_condition_handle) {
|
|
||||||
return timer;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return rclcpp::timer::TimerBase::SharedPtr();
|
|
||||||
}
|
|
||||||
|
|
||||||
rclcpp::service::ServiceBase::SharedPtr
|
rclcpp::service::ServiceBase::SharedPtr
|
||||||
get_service_by_handle(void * service_handle)
|
get_service_by_handle(void * service_handle)
|
||||||
{
|
{
|
||||||
|
@ -548,7 +513,6 @@ protected:
|
||||||
return rclcpp::client::ClientBase::SharedPtr();
|
return rclcpp::client::ClientBase::SharedPtr();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
rclcpp::node::Node::SharedPtr
|
rclcpp::node::Node::SharedPtr
|
||||||
get_node_by_group(rclcpp::callback_group::CallbackGroup::SharedPtr & group)
|
get_node_by_group(rclcpp::callback_group::CallbackGroup::SharedPtr & group)
|
||||||
{
|
{
|
||||||
|
@ -598,35 +562,59 @@ protected:
|
||||||
void
|
void
|
||||||
get_next_timer(AnyExecutable::SharedPtr & any_exec)
|
get_next_timer(AnyExecutable::SharedPtr & any_exec)
|
||||||
{
|
{
|
||||||
auto it = guard_condition_handles_.begin();
|
for (auto & weak_node : weak_nodes_) {
|
||||||
while (it != guard_condition_handles_.end()) {
|
auto node = weak_node.lock();
|
||||||
auto timer = get_timer_by_handle(*it);
|
if (!node) {
|
||||||
if (timer) {
|
|
||||||
// Find the group for this handle and see if it can be serviced
|
|
||||||
auto group = get_group_by_timer(timer);
|
|
||||||
if (!group) {
|
|
||||||
// Group was not found, meaning the timer is not valid...
|
|
||||||
// Remove it from the ready list and continue looking
|
|
||||||
guard_condition_handles_.erase(it++);
|
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
if (!group->can_be_taken_from_.load()) {
|
for (auto & weak_group : node->callback_groups_) {
|
||||||
// Group is mutually exclusive and is being used, so skip it for now
|
auto group = weak_group.lock();
|
||||||
// Leave it to be checked next time, but continue searching
|
if (!group || !group->can_be_taken_from_.load()) {
|
||||||
++it;
|
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
// Otherwise it is safe to set and return the any_exec
|
for (auto & timer_ref : group->timer_ptrs_) {
|
||||||
|
auto timer = timer_ref.lock();
|
||||||
|
if (timer && timer->check_and_trigger()) {
|
||||||
any_exec->timer = timer;
|
any_exec->timer = timer;
|
||||||
any_exec->callback_group = group;
|
any_exec->callback_group = group;
|
||||||
any_exec->node = get_node_by_group(group);
|
node = get_node_by_group(group);
|
||||||
guard_condition_handles_.erase(it++);
|
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
// Else, the timer is no longer valid, remove it and continue
|
|
||||||
guard_condition_handles_.erase(it++);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
std::chrono::nanoseconds
|
||||||
|
get_earliest_timer()
|
||||||
|
{
|
||||||
|
std::chrono::nanoseconds latest = std::chrono::nanoseconds::max();
|
||||||
|
bool timers_empty = true;
|
||||||
|
for (auto & weak_node : weak_nodes_) {
|
||||||
|
auto node = weak_node.lock();
|
||||||
|
if (!node) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
for (auto & weak_group : node->callback_groups_) {
|
||||||
|
auto group = weak_group.lock();
|
||||||
|
if (!group || !group->can_be_taken_from_.load()) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
for (auto & timer_ref : group->timer_ptrs_) {
|
||||||
|
timers_empty = false;
|
||||||
|
// Check the expected trigger time
|
||||||
|
auto timer = timer_ref.lock();
|
||||||
|
if (timer && timer->time_until_trigger() < latest) {
|
||||||
|
latest = timer->time_until_trigger();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (timers_empty) {
|
||||||
|
return std::chrono::nanoseconds(-1);
|
||||||
|
}
|
||||||
|
return latest;
|
||||||
|
}
|
||||||
|
|
||||||
rclcpp::callback_group::CallbackGroup::SharedPtr
|
rclcpp::callback_group::CallbackGroup::SharedPtr
|
||||||
get_group_by_subscription(
|
get_group_by_subscription(
|
||||||
|
@ -867,8 +855,6 @@ private:
|
||||||
std::vector<std::weak_ptr<rclcpp::node::Node>> weak_nodes_;
|
std::vector<std::weak_ptr<rclcpp::node::Node>> weak_nodes_;
|
||||||
typedef std::list<void *> SubscriberHandles;
|
typedef std::list<void *> SubscriberHandles;
|
||||||
SubscriberHandles subscriber_handles_;
|
SubscriberHandles subscriber_handles_;
|
||||||
typedef std::list<void *> GuardConditionHandles;
|
|
||||||
GuardConditionHandles guard_condition_handles_;
|
|
||||||
typedef std::list<void *> ServiceHandles;
|
typedef std::list<void *> ServiceHandles;
|
||||||
ServiceHandles service_handles_;
|
ServiceHandles service_handles_;
|
||||||
typedef std::list<void *> ClientHandles;
|
typedef std::list<void *> ClientHandles;
|
||||||
|
|
|
@ -99,6 +99,11 @@ public:
|
||||||
last_interval_ = Clock::now();
|
last_interval_ = Clock::now();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
std::chrono::nanoseconds period() const
|
||||||
|
{
|
||||||
|
return period_;
|
||||||
|
}
|
||||||
|
|
||||||
private:
|
private:
|
||||||
RCLCPP_DISABLE_COPY(GenericRate);
|
RCLCPP_DISABLE_COPY(GenericRate);
|
||||||
|
|
||||||
|
|
|
@ -53,28 +53,12 @@ public:
|
||||||
TimerBase(std::chrono::nanoseconds period, CallbackType callback)
|
TimerBase(std::chrono::nanoseconds period, CallbackType callback)
|
||||||
: period_(period),
|
: period_(period),
|
||||||
callback_(callback),
|
callback_(callback),
|
||||||
guard_condition_(rmw_create_guard_condition()),
|
|
||||||
canceled_(false)
|
canceled_(false)
|
||||||
{
|
{
|
||||||
if (!guard_condition_) {
|
|
||||||
// TODO(wjwwood): use custom exception
|
|
||||||
throw std::runtime_error(
|
|
||||||
std::string("failed to create guard condition: ") +
|
|
||||||
rmw_get_error_string_safe()
|
|
||||||
);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
virtual ~TimerBase()
|
virtual ~TimerBase()
|
||||||
{
|
{
|
||||||
if (guard_condition_) {
|
|
||||||
if (rmw_destroy_guard_condition(guard_condition_) != RMW_RET_OK) {
|
|
||||||
std::stringstream ss;
|
|
||||||
ss << "Error in TimerBase destructor, rmw_destroy_guard_condition failed: " <<
|
|
||||||
rmw_get_error_string_safe() << '\n';
|
|
||||||
(std::cerr << ss.str()).flush();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
void
|
void
|
||||||
|
@ -83,12 +67,17 @@ public:
|
||||||
this->canceled_ = true;
|
this->canceled_ = true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
virtual std::chrono::nanoseconds
|
||||||
|
time_until_trigger() = 0;
|
||||||
|
|
||||||
virtual bool is_steady() = 0;
|
virtual bool is_steady() = 0;
|
||||||
|
|
||||||
|
// Interface for externally triggering the timer event
|
||||||
|
virtual bool check_and_trigger() = 0;
|
||||||
|
|
||||||
protected:
|
protected:
|
||||||
std::chrono::nanoseconds period_;
|
std::chrono::nanoseconds period_;
|
||||||
CallbackType callback_;
|
CallbackType callback_;
|
||||||
rmw_guard_condition_t * guard_condition_;
|
|
||||||
|
|
||||||
bool canceled_;
|
bool canceled_;
|
||||||
|
|
||||||
|
@ -106,29 +95,48 @@ public:
|
||||||
GenericTimer(std::chrono::nanoseconds period, CallbackType callback)
|
GenericTimer(std::chrono::nanoseconds period, CallbackType callback)
|
||||||
: TimerBase(period, callback), loop_rate_(period)
|
: TimerBase(period, callback), loop_rate_(period)
|
||||||
{
|
{
|
||||||
thread_ = std::thread(&GenericTimer<Clock>::run, this);
|
/* Subtracting the loop rate period ensures that the callback gets triggered
|
||||||
|
on the first call to check_and_trigger. */
|
||||||
|
last_triggered_time_ = Clock::now() - period;
|
||||||
}
|
}
|
||||||
|
|
||||||
virtual ~GenericTimer()
|
virtual ~GenericTimer()
|
||||||
{
|
{
|
||||||
cancel();
|
cancel();
|
||||||
thread_.join();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
void
|
// return: true to trigger callback on the next "execute_timer" call in executor
|
||||||
run()
|
bool
|
||||||
|
check_and_trigger()
|
||||||
{
|
{
|
||||||
while (rclcpp::utilities::ok() && !this->canceled_) {
|
if (canceled_) {
|
||||||
loop_rate_.sleep();
|
return false;
|
||||||
if (!rclcpp::utilities::ok()) {
|
|
||||||
return;
|
|
||||||
}
|
}
|
||||||
rmw_ret_t status = rmw_trigger_guard_condition(guard_condition_);
|
if (Clock::now() < last_triggered_time_) {
|
||||||
if (status != RMW_RET_OK) {
|
return false;
|
||||||
fprintf(stderr,
|
|
||||||
"[rclcpp::error] failed to trigger guard condition: %s\n", rmw_get_error_string_safe());
|
|
||||||
}
|
}
|
||||||
|
if (std::chrono::duration_cast<std::chrono::nanoseconds>(Clock::now() - last_triggered_time_) >=
|
||||||
|
loop_rate_.period())
|
||||||
|
{
|
||||||
|
last_triggered_time_ = Clock::now();
|
||||||
|
return true;
|
||||||
}
|
}
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
std::chrono::nanoseconds
|
||||||
|
time_until_trigger()
|
||||||
|
{
|
||||||
|
std::chrono::nanoseconds time_until_trigger;
|
||||||
|
// Calculate the time between the next trigger and the current time
|
||||||
|
if (last_triggered_time_ + loop_rate_.period() < Clock::now()) {
|
||||||
|
// time is overdue, need to trigger immediately
|
||||||
|
time_until_trigger = std::chrono::nanoseconds::zero();
|
||||||
|
} else {
|
||||||
|
time_until_trigger = std::chrono::duration_cast<std::chrono::nanoseconds>(
|
||||||
|
last_triggered_time_ - Clock::now()) + loop_rate_.period();
|
||||||
|
}
|
||||||
|
return time_until_trigger;
|
||||||
}
|
}
|
||||||
|
|
||||||
virtual bool
|
virtual bool
|
||||||
|
@ -140,8 +148,8 @@ public:
|
||||||
private:
|
private:
|
||||||
RCLCPP_DISABLE_COPY(GenericTimer);
|
RCLCPP_DISABLE_COPY(GenericTimer);
|
||||||
|
|
||||||
std::thread thread_;
|
|
||||||
rclcpp::rate::GenericRate<Clock> loop_rate_;
|
rclcpp::rate::GenericRate<Clock> loop_rate_;
|
||||||
|
std::chrono::time_point<Clock> last_triggered_time_;
|
||||||
|
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue