Use rcl_clock_t jump callbacks (#543)
* Use rcl_clock_t jump callbacks Relieves rclcpp::TimeSource responsibility of calling jump callbacks.
This commit is contained in:
parent
4653bfcce6
commit
a55e320e6e
4 changed files with 119 additions and 187 deletions
|
@ -31,53 +31,18 @@ namespace rclcpp
|
||||||
|
|
||||||
class TimeSource;
|
class TimeSource;
|
||||||
|
|
||||||
/// A struct to represent a timejump.
|
|
||||||
/**
|
|
||||||
* It represents the time jump duration and whether it changed clock type.
|
|
||||||
*/
|
|
||||||
struct TimeJump
|
|
||||||
{
|
|
||||||
typedef enum ClockChange_t
|
|
||||||
{
|
|
||||||
ROS_TIME_NO_CHANGE, ///< The time type before and after the jump is ROS_TIME
|
|
||||||
ROS_TIME_ACTIVATED, ///< The time type switched to ROS_TIME from SYSTEM_TIME
|
|
||||||
ROS_TIME_DEACTIVATED, ///< The time type switched to SYSTEM_TIME from ROS_TIME
|
|
||||||
SYSTEM_TIME_NO_CHANGE ///< The time type before and after the jump is SYSTEM_TIME
|
|
||||||
} ClockChange_t;
|
|
||||||
|
|
||||||
ClockChange_t jump_type_; ///< The change in clock_type if there is one.
|
|
||||||
rcl_duration_t delta_; ///< The change in time value.
|
|
||||||
};
|
|
||||||
|
|
||||||
/// A class to store a threshold for a TimeJump.
|
|
||||||
/**
|
|
||||||
* This class can be used to evaluate a time jump's magnitude.
|
|
||||||
*/
|
|
||||||
class JumpThreshold
|
|
||||||
{
|
|
||||||
public:
|
|
||||||
uint64_t min_forward_; ///< The minimum jump forward to be considered exceeded..
|
|
||||||
uint64_t min_backward_; ///< The minimum backwards jump to be considered exceeded.
|
|
||||||
bool on_clock_change_; ///< Whether to trigger on any clock type change.
|
|
||||||
|
|
||||||
// Test if the threshold is exceeded by a TimeJump
|
|
||||||
RCLCPP_PUBLIC
|
|
||||||
bool
|
|
||||||
is_exceeded(const TimeJump & jump);
|
|
||||||
};
|
|
||||||
|
|
||||||
class JumpHandler
|
class JumpHandler
|
||||||
{
|
{
|
||||||
public:
|
public:
|
||||||
RCLCPP_SMART_PTR_DEFINITIONS(JumpHandler)
|
RCLCPP_SMART_PTR_DEFINITIONS(JumpHandler)
|
||||||
JumpHandler(
|
JumpHandler(
|
||||||
std::function<void()> pre_callback,
|
std::function<void()> pre_callback,
|
||||||
std::function<void(TimeJump)> post_callback,
|
std::function<void(const rcl_time_jump_t &)> post_callback,
|
||||||
JumpThreshold & threshold);
|
const rcl_jump_threshold_t & threshold);
|
||||||
|
|
||||||
std::function<void()> pre_callback;
|
std::function<void()> pre_callback;
|
||||||
std::function<void(const TimeJump &)> post_callback;
|
std::function<void(const rcl_time_jump_t &)> post_callback;
|
||||||
JumpThreshold notice_threshold;
|
rcl_jump_threshold_t notice_threshold;
|
||||||
};
|
};
|
||||||
|
|
||||||
class Clock
|
class Clock
|
||||||
|
@ -116,32 +81,22 @@ public:
|
||||||
JumpHandler::SharedPtr
|
JumpHandler::SharedPtr
|
||||||
create_jump_callback(
|
create_jump_callback(
|
||||||
std::function<void()> pre_callback,
|
std::function<void()> pre_callback,
|
||||||
std::function<void(const TimeJump &)> post_callback,
|
std::function<void(const rcl_time_jump_t &)> post_callback,
|
||||||
JumpThreshold & threshold);
|
const rcl_jump_threshold_t & threshold);
|
||||||
|
|
||||||
private:
|
private:
|
||||||
// A method for TimeSource to get a list of callbacks to invoke while updating
|
// Invoke time jump callback
|
||||||
RCLCPP_PUBLIC
|
RCLCPP_PUBLIC
|
||||||
std::vector<JumpHandler::SharedPtr>
|
static void
|
||||||
get_triggered_callback_handlers(const TimeJump & jump);
|
on_time_jump(
|
||||||
|
const struct rcl_time_jump_t * time_jump,
|
||||||
// Invoke callbacks that are valid and outside threshold.
|
bool before_jump,
|
||||||
RCLCPP_PUBLIC
|
void * user_data);
|
||||||
static void invoke_prejump_callbacks(
|
|
||||||
const std::vector<JumpHandler::SharedPtr> & callbacks);
|
|
||||||
|
|
||||||
RCLCPP_PUBLIC
|
|
||||||
static void invoke_postjump_callbacks(
|
|
||||||
const std::vector<JumpHandler::SharedPtr> & callbacks,
|
|
||||||
const TimeJump & jump);
|
|
||||||
|
|
||||||
/// Internal storage backed by rcl
|
/// Internal storage backed by rcl
|
||||||
rcl_clock_t rcl_clock_;
|
rcl_clock_t rcl_clock_;
|
||||||
friend TimeSource; /// Allow TimeSource to access the rcl_clock_ datatype.
|
friend TimeSource; /// Allow TimeSource to access the rcl_clock_ datatype.
|
||||||
rcl_allocator_t allocator_;
|
rcl_allocator_t allocator_;
|
||||||
|
|
||||||
std::mutex callback_list_mutex_;
|
|
||||||
std::vector<std::weak_ptr<JumpHandler>> active_jump_handlers_;
|
|
||||||
};
|
};
|
||||||
|
|
||||||
} // namespace rclcpp
|
} // namespace rclcpp
|
||||||
|
|
|
@ -28,27 +28,11 @@
|
||||||
|
|
||||||
namespace rclcpp
|
namespace rclcpp
|
||||||
{
|
{
|
||||||
bool
|
|
||||||
JumpThreshold::is_exceeded(const TimeJump & jump)
|
|
||||||
{
|
|
||||||
if (on_clock_change_ &&
|
|
||||||
(jump.jump_type_ == TimeJump::ClockChange_t::ROS_TIME_ACTIVATED ||
|
|
||||||
jump.jump_type_ == TimeJump::ClockChange_t::ROS_TIME_DEACTIVATED))
|
|
||||||
{
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
if ((uint64_t)jump.delta_.nanoseconds > min_forward_ ||
|
|
||||||
(uint64_t)jump.delta_.nanoseconds < min_backward_)
|
|
||||||
{
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
JumpHandler::JumpHandler(
|
JumpHandler::JumpHandler(
|
||||||
std::function<void()> pre_callback,
|
std::function<void()> pre_callback,
|
||||||
std::function<void(TimeJump)> post_callback,
|
std::function<void(const rcl_time_jump_t &)> post_callback,
|
||||||
JumpThreshold & threshold)
|
const rcl_jump_threshold_t & threshold)
|
||||||
: pre_callback(pre_callback),
|
: pre_callback(pre_callback),
|
||||||
post_callback(post_callback),
|
post_callback(post_callback),
|
||||||
notice_threshold(threshold)
|
notice_threshold(threshold)
|
||||||
|
@ -115,66 +99,51 @@ Clock::get_clock_type()
|
||||||
return rcl_clock_.type;
|
return rcl_clock_.type;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void
|
||||||
|
Clock::on_time_jump(
|
||||||
|
const struct rcl_time_jump_t * time_jump,
|
||||||
|
bool before_jump,
|
||||||
|
void * user_data)
|
||||||
|
{
|
||||||
|
rclcpp::JumpHandler * handler = static_cast<rclcpp::JumpHandler *>(user_data);
|
||||||
|
if (before_jump && handler->pre_callback) {
|
||||||
|
handler->pre_callback();
|
||||||
|
} else if (!before_jump && handler->post_callback) {
|
||||||
|
handler->post_callback(*time_jump);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
rclcpp::JumpHandler::SharedPtr
|
rclcpp::JumpHandler::SharedPtr
|
||||||
Clock::create_jump_callback(
|
Clock::create_jump_callback(
|
||||||
std::function<void()> pre_callback,
|
std::function<void()> pre_callback,
|
||||||
std::function<void(const TimeJump &)> post_callback,
|
std::function<void(const rcl_time_jump_t &)> post_callback,
|
||||||
JumpThreshold & threshold)
|
const rcl_jump_threshold_t & threshold)
|
||||||
{
|
{
|
||||||
// JumpHandler jump_callback;
|
// Allocate a new jump handler
|
||||||
auto jump_callback =
|
auto handler = new rclcpp::JumpHandler(pre_callback, post_callback, threshold);
|
||||||
std::make_shared<rclcpp::JumpHandler>(pre_callback, post_callback, threshold);
|
if (nullptr == handler) {
|
||||||
{
|
rclcpp::exceptions::throw_from_rcl_error(RCL_RET_BAD_ALLOC, "Failed to allocate jump handler");
|
||||||
std::lock_guard<std::mutex> guard(callback_list_mutex_);
|
|
||||||
active_jump_handlers_.push_back(jump_callback);
|
|
||||||
}
|
|
||||||
return jump_callback;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
std::vector<JumpHandler::SharedPtr>
|
// Try to add the jump callback to the clock
|
||||||
Clock::get_triggered_callback_handlers(const TimeJump & jump)
|
rcl_ret_t ret = rcl_clock_add_jump_callback(&rcl_clock_, threshold,
|
||||||
{
|
rclcpp::Clock::on_time_jump, handler);
|
||||||
std::vector<JumpHandler::SharedPtr> callbacks;
|
if (RCL_RET_OK != ret) {
|
||||||
std::lock_guard<std::mutex> guard(callback_list_mutex_);
|
delete handler;
|
||||||
active_jump_handlers_.erase(
|
rclcpp::exceptions::throw_from_rcl_error(ret, "Failed to add time jump callback");
|
||||||
std::remove_if(
|
|
||||||
active_jump_handlers_.begin(),
|
|
||||||
active_jump_handlers_.end(),
|
|
||||||
[&callbacks, &jump](const std::weak_ptr<JumpHandler> & wjcb) {
|
|
||||||
if (auto jcb = wjcb.lock()) {
|
|
||||||
if (jcb->notice_threshold.is_exceeded(jump)) {
|
|
||||||
callbacks.push_back(jcb);
|
|
||||||
}
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
// Lock failed so clear the weak pointer.
|
|
||||||
return true;
|
|
||||||
}),
|
|
||||||
active_jump_handlers_.end());
|
|
||||||
return callbacks;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
void
|
// *INDENT-OFF*
|
||||||
Clock::invoke_prejump_callbacks(
|
// create shared_ptr that removes the callback automatically when all copies are destructed
|
||||||
const std::vector<JumpHandler::SharedPtr> & callbacks)
|
return rclcpp::JumpHandler::SharedPtr(handler, [this](rclcpp::JumpHandler * handler) noexcept {
|
||||||
{
|
rcl_ret_t ret = rcl_clock_remove_jump_callback(&rcl_clock_, rclcpp::Clock::on_time_jump,
|
||||||
for (const auto cb : callbacks) {
|
handler);
|
||||||
if (cb->pre_callback != nullptr) {
|
delete handler;
|
||||||
cb->pre_callback();
|
if (RCL_RET_OK != ret) {
|
||||||
}
|
RCUTILS_LOG_ERROR("Failed to remove time jump callback");
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
void
|
|
||||||
Clock::invoke_postjump_callbacks(
|
|
||||||
const std::vector<JumpHandler::SharedPtr> & callbacks,
|
|
||||||
const TimeJump & jump)
|
|
||||||
{
|
|
||||||
for (auto cb : callbacks) {
|
|
||||||
if (cb->post_callback != nullptr) {
|
|
||||||
cb->post_callback(jump);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
});
|
||||||
|
// *INDENT-ON*
|
||||||
}
|
}
|
||||||
|
|
||||||
} // namespace rclcpp
|
} // namespace rclcpp
|
||||||
|
|
|
@ -147,55 +147,19 @@ void TimeSource::set_clock(
|
||||||
const builtin_interfaces::msg::Time::SharedPtr msg, bool set_ros_time_enabled,
|
const builtin_interfaces::msg::Time::SharedPtr msg, bool set_ros_time_enabled,
|
||||||
std::shared_ptr<rclcpp::Clock> clock)
|
std::shared_ptr<rclcpp::Clock> clock)
|
||||||
{
|
{
|
||||||
// Compute diff
|
|
||||||
rclcpp::Time msg_time = rclcpp::Time(*msg);
|
|
||||||
rclcpp::Time now = clock->now();
|
|
||||||
auto diff = now - msg_time;
|
|
||||||
rclcpp::TimeJump jump;
|
|
||||||
jump.delta_.nanoseconds = diff.nanoseconds();
|
|
||||||
|
|
||||||
// Compute jump type
|
|
||||||
if (clock->ros_time_is_active()) {
|
|
||||||
if (set_ros_time_enabled) {
|
|
||||||
jump.jump_type_ = TimeJump::ClockChange_t::ROS_TIME_NO_CHANGE;
|
|
||||||
} else {
|
|
||||||
jump.jump_type_ = TimeJump::ClockChange_t::ROS_TIME_DEACTIVATED;
|
|
||||||
}
|
|
||||||
} else if (!clock->ros_time_is_active()) {
|
|
||||||
if (set_ros_time_enabled) {
|
|
||||||
jump.jump_type_ = TimeJump::ClockChange_t::ROS_TIME_ACTIVATED;
|
|
||||||
} else {
|
|
||||||
jump.jump_type_ = TimeJump::ClockChange_t::SYSTEM_TIME_NO_CHANGE;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if (jump.jump_type_ == TimeJump::ClockChange_t::SYSTEM_TIME_NO_CHANGE) {
|
|
||||||
// No change/no updates don't act.
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
auto active_callbacks = clock->get_triggered_callback_handlers(jump);
|
|
||||||
clock->invoke_prejump_callbacks(active_callbacks);
|
|
||||||
|
|
||||||
// Do change
|
// Do change
|
||||||
if (jump.jump_type_ == TimeJump::ClockChange_t::ROS_TIME_DEACTIVATED) {
|
if (!set_ros_time_enabled && clock->ros_time_is_active()) {
|
||||||
disable_ros_time(clock);
|
disable_ros_time(clock);
|
||||||
} else if (jump.jump_type_ == TimeJump::ClockChange_t::ROS_TIME_ACTIVATED) {
|
} else if (set_ros_time_enabled && !clock->ros_time_is_active()) {
|
||||||
enable_ros_time(clock);
|
enable_ros_time(clock);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (jump.jump_type_ == TimeJump::ClockChange_t::ROS_TIME_ACTIVATED ||
|
auto ret = rcl_set_ros_time_override(&(clock->rcl_clock_), rclcpp::Time(*msg).nanoseconds());
|
||||||
jump.jump_type_ == TimeJump::ClockChange_t::ROS_TIME_NO_CHANGE)
|
|
||||||
{
|
|
||||||
auto ret = rcl_set_ros_time_override(&(clock->rcl_clock_), msg_time.nanoseconds());
|
|
||||||
if (ret != RCL_RET_OK) {
|
if (ret != RCL_RET_OK) {
|
||||||
rclcpp::exceptions::throw_from_rcl_error(
|
rclcpp::exceptions::throw_from_rcl_error(
|
||||||
ret, "Failed to set ros_time_override_status");
|
ret, "Failed to set ros_time_override_status");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
// Post change callbacks
|
|
||||||
clock->invoke_postjump_callbacks(active_callbacks, jump);
|
|
||||||
}
|
|
||||||
|
|
||||||
void TimeSource::clock_cb(const rosgraph_msgs::msg::Clock::SharedPtr msg)
|
void TimeSource::clock_cb(const rosgraph_msgs::msg::Clock::SharedPtr msg)
|
||||||
{
|
{
|
||||||
|
|
|
@ -133,27 +133,30 @@ TEST_F(TestTimeSource, clock) {
|
||||||
class CallbackObject
|
class CallbackObject
|
||||||
{
|
{
|
||||||
public:
|
public:
|
||||||
CallbackObject()
|
int pre_callback_calls_ = 0;
|
||||||
: last_precallback_id_(0),
|
int last_precallback_id_ = 0;
|
||||||
last_postcallback_id_(0)
|
void pre_callback(int id)
|
||||||
{}
|
{
|
||||||
int last_precallback_id_;
|
last_precallback_id_ = id;
|
||||||
void pre_callback(int id) {last_precallback_id_ = id;}
|
++pre_callback_calls_;
|
||||||
|
}
|
||||||
|
|
||||||
int last_postcallback_id_;
|
int post_callback_calls_ = 0;
|
||||||
rclcpp::TimeJump last_timejump_;
|
int last_postcallback_id_ = 0;
|
||||||
void post_callback(const rclcpp::TimeJump & jump, int id)
|
rcl_time_jump_t last_timejump_;
|
||||||
|
void post_callback(const rcl_time_jump_t & jump, int id)
|
||||||
{
|
{
|
||||||
last_postcallback_id_ = id; last_timejump_ = jump;
|
last_postcallback_id_ = id; last_timejump_ = jump;
|
||||||
|
++post_callback_calls_;
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
TEST_F(TestTimeSource, callbacks) {
|
TEST_F(TestTimeSource, callbacks) {
|
||||||
CallbackObject cbo;
|
CallbackObject cbo;
|
||||||
rclcpp::JumpThreshold jump_threshold;
|
rcl_jump_threshold_t jump_threshold;
|
||||||
jump_threshold.min_forward_ = 0;
|
jump_threshold.min_forward.nanoseconds = 0;
|
||||||
jump_threshold.min_backward_ = 0;
|
jump_threshold.min_backward.nanoseconds = 0;
|
||||||
jump_threshold.on_clock_change_ = true;
|
jump_threshold.on_clock_change = true;
|
||||||
|
|
||||||
rclcpp::TimeSource ts(node);
|
rclcpp::TimeSource ts(node);
|
||||||
auto ros_clock = std::make_shared<rclcpp::Clock>(RCL_ROS_TIME);
|
auto ros_clock = std::make_shared<rclcpp::Clock>(RCL_ROS_TIME);
|
||||||
|
@ -212,7 +215,7 @@ TEST_F(TestTimeSource, callbacks) {
|
||||||
// Register a callback handler with only pre_callback
|
// Register a callback handler with only pre_callback
|
||||||
rclcpp::JumpHandler::SharedPtr callback_handler3 = ros_clock->create_jump_callback(
|
rclcpp::JumpHandler::SharedPtr callback_handler3 = ros_clock->create_jump_callback(
|
||||||
std::bind(&CallbackObject::pre_callback, &cbo, 3),
|
std::bind(&CallbackObject::pre_callback, &cbo, 3),
|
||||||
std::function<void(rclcpp::TimeJump)>(),
|
std::function<void(rcl_time_jump_t)>(),
|
||||||
jump_threshold);
|
jump_threshold);
|
||||||
|
|
||||||
trigger_clock_changes(node);
|
trigger_clock_changes(node);
|
||||||
|
@ -233,10 +236,10 @@ TEST_F(TestTimeSource, callbacks) {
|
||||||
|
|
||||||
TEST_F(TestTimeSource, callback_handler_erasure) {
|
TEST_F(TestTimeSource, callback_handler_erasure) {
|
||||||
CallbackObject cbo;
|
CallbackObject cbo;
|
||||||
rclcpp::JumpThreshold jump_threshold;
|
rcl_jump_threshold_t jump_threshold;
|
||||||
jump_threshold.min_forward_ = 0;
|
jump_threshold.min_forward.nanoseconds = 0;
|
||||||
jump_threshold.min_backward_ = 0;
|
jump_threshold.min_backward.nanoseconds = 0;
|
||||||
jump_threshold.on_clock_change_ = true;
|
jump_threshold.on_clock_change = true;
|
||||||
|
|
||||||
rclcpp::TimeSource ts(node);
|
rclcpp::TimeSource ts(node);
|
||||||
auto ros_clock = std::make_shared<rclcpp::Clock>(RCL_ROS_TIME);
|
auto ros_clock = std::make_shared<rclcpp::Clock>(RCL_ROS_TIME);
|
||||||
|
@ -356,3 +359,44 @@ TEST_F(TestTimeSource, parameter_activation) {
|
||||||
rclcpp::spin_some(node);
|
rclcpp::spin_some(node);
|
||||||
EXPECT_FALSE(ros_clock->ros_time_is_active());
|
EXPECT_FALSE(ros_clock->ros_time_is_active());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
TEST_F(TestTimeSource, no_pre_jump_callback) {
|
||||||
|
CallbackObject cbo;
|
||||||
|
rcl_jump_threshold_t jump_threshold;
|
||||||
|
jump_threshold.min_forward.nanoseconds = 0;
|
||||||
|
jump_threshold.min_backward.nanoseconds = 0;
|
||||||
|
jump_threshold.on_clock_change = true;
|
||||||
|
|
||||||
|
rclcpp::TimeSource ts(node);
|
||||||
|
auto ros_clock = std::make_shared<rclcpp::Clock>(RCL_ROS_TIME);
|
||||||
|
|
||||||
|
// Register a callback for time jumps
|
||||||
|
rclcpp::JumpHandler::SharedPtr callback_handler = ros_clock->create_jump_callback(
|
||||||
|
nullptr,
|
||||||
|
std::bind(&CallbackObject::post_callback, &cbo, std::placeholders::_1, 1),
|
||||||
|
jump_threshold);
|
||||||
|
|
||||||
|
ASSERT_EQ(0, cbo.last_precallback_id_);
|
||||||
|
ASSERT_EQ(0, cbo.last_postcallback_id_);
|
||||||
|
ts.attachClock(ros_clock);
|
||||||
|
|
||||||
|
// Activate ROS time
|
||||||
|
auto parameters_client = std::make_shared<rclcpp::SyncParametersClient>(node);
|
||||||
|
ASSERT_TRUE(parameters_client->wait_for_service(2s));
|
||||||
|
auto set_parameters_results = parameters_client->set_parameters({
|
||||||
|
rclcpp::Parameter("use_sim_time", true)
|
||||||
|
});
|
||||||
|
for (auto & result : set_parameters_results) {
|
||||||
|
EXPECT_TRUE(result.successful);
|
||||||
|
}
|
||||||
|
// SyncParametersClient returns when parameters have been set on the node_parameters interface,
|
||||||
|
// but it doesn't mean the on_parameter_event subscription in TimeSource has been called.
|
||||||
|
// Spin some to handle that subscription.
|
||||||
|
rclcpp::spin_some(node);
|
||||||
|
ASSERT_TRUE(ros_clock->ros_time_is_active());
|
||||||
|
|
||||||
|
EXPECT_EQ(0, cbo.last_precallback_id_);
|
||||||
|
EXPECT_EQ(0, cbo.pre_callback_calls_);
|
||||||
|
EXPECT_EQ(1, cbo.last_postcallback_id_);
|
||||||
|
EXPECT_EQ(1, cbo.post_callback_calls_);
|
||||||
|
}
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue