Resolve erase race condition. (#406)

Fixes #401

Extend time source tests to allocate and deallocate callback handlers.
This commit is contained in:
Tully Foote 2017-11-21 20:44:28 -08:00 committed by GitHub
parent 5e565c7e75
commit 5e64191e10
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 118 additions and 11 deletions

View file

@ -130,15 +130,21 @@ Clock::get_triggered_callback_handlers(const TimeJump & jump)
{
std::vector<JumpHandler::SharedPtr> callbacks;
std::lock_guard<std::mutex> guard(callback_list_mutex_);
for (auto wjcb = active_jump_handlers_.begin(); wjcb != active_jump_handlers_.end(); wjcb++) {
if (auto jcb = wjcb->lock()) {
active_jump_handlers_.erase(
std::remove_if(
active_jump_handlers_.begin(),
active_jump_handlers_.end(),
[&callbacks, &jump](const std::weak_ptr<JumpHandler> & wjcb) {
if (auto jcb = wjcb.lock()) {
if (jcb->notice_threshold.is_exceeded(jump)) {
callbacks.push_back(jcb);
}
} else {
active_jump_handlers_.erase(wjcb);
}
return false;
}
// Lock failed so clear the weak pointer.
return true;
}),
active_jump_handlers_.end());
return callbacks;
}

View file

@ -15,6 +15,7 @@
#include <gtest/gtest.h>
#include <algorithm>
#include <chrono>
#include <limits>
#include <memory>
#include <string>
@ -26,6 +27,8 @@
#include "rclcpp/time.hpp"
#include "rclcpp/time_source.hpp"
using namespace std::chrono_literals;
class TestTimeSource : public ::testing::Test
{
protected:
@ -147,7 +150,7 @@ TEST_F(TestTimeSource, callbacks) {
auto ros_clock = std::make_shared<rclcpp::Clock>(RCL_ROS_TIME);
// Register a callback for time jumps
rclcpp::JumpHandler::SharedPtr callback_holder = ros_clock->create_jump_callback(
rclcpp::JumpHandler::SharedPtr callback_handler = ros_clock->create_jump_callback(
std::bind(&CallbackObject::pre_callback, &cbo, 1),
std::bind(&CallbackObject::post_callback, &cbo, std::placeholders::_1, 1),
jump_threshold);
@ -193,7 +196,7 @@ TEST_F(TestTimeSource, callbacks) {
// Change callbacks
rclcpp::JumpHandler::SharedPtr callback_holder2 = ros_clock->create_jump_callback(
rclcpp::JumpHandler::SharedPtr callback_handler2 = ros_clock->create_jump_callback(
std::bind(&CallbackObject::pre_callback, &cbo, 2),
std::bind(&CallbackObject::post_callback, &cbo, std::placeholders::_1, 2),
jump_threshold);
@ -224,6 +227,104 @@ TEST_F(TestTimeSource, callbacks) {
EXPECT_GT(t_high.nanoseconds(), t_out.nanoseconds());
}
void trigger_clock_changes(
rclcpp::node::Node::SharedPtr node)
{
auto clock_pub = node->create_publisher<builtin_interfaces::msg::Time>("clock",
rmw_qos_profile_default);
rclcpp::executors::SingleThreadedExecutor executor;
executor.add_node(node);
rclcpp::WallRate loop_rate(50);
for (int i = 0; i < 5; ++i) {
if (!rclcpp::ok()) {
break; // Break for ctrl-c
}
auto msg = std::make_shared<builtin_interfaces::msg::Time>();
msg->sec = i;
msg->nanosec = 1000;
clock_pub->publish(msg);
// std::cout << "Publishing: '" << msg->sec << ".000000" << msg->nanosec << "'" << std::endl;
executor.spin_once(1000000ns);
loop_rate.sleep();
}
}
TEST_F(TestTimeSource, callback_handler_erasure) {
CallbackObject cbo;
rclcpp::JumpThreshold jump_threshold;
jump_threshold.min_forward_ = 0;
jump_threshold.min_backward_ = 0;
jump_threshold.on_clock_change_ = true;
rclcpp::TimeSource ts(node);
auto ros_clock = std::make_shared<rclcpp::Clock>(RCL_ROS_TIME);
ts.attachClock(ros_clock);
EXPECT_FALSE(ros_clock->ros_time_is_active());
// Register a callback for time jumps
rclcpp::JumpHandler::SharedPtr callback_handler = ros_clock->create_jump_callback(
std::bind(&CallbackObject::pre_callback, &cbo, 1),
std::bind(&CallbackObject::post_callback, &cbo, std::placeholders::_1, 1),
jump_threshold);
// Second callback handler
rclcpp::JumpHandler::SharedPtr callback_handler2 = ros_clock->create_jump_callback(
std::bind(&CallbackObject::pre_callback, &cbo, 1),
std::bind(&CallbackObject::post_callback, &cbo, std::placeholders::_1, 1),
jump_threshold);
EXPECT_EQ(0, cbo.last_precallback_id_);
EXPECT_EQ(0, cbo.last_postcallback_id_);
EXPECT_FALSE(ros_clock->ros_time_is_active());
trigger_clock_changes(node);
auto t_low = rclcpp::Time(1, 0, RCL_ROS_TIME);
auto t_high = rclcpp::Time(10, 100000, RCL_ROS_TIME);
EXPECT_EQ(1, cbo.last_precallback_id_);
EXPECT_EQ(1, cbo.last_postcallback_id_);
// Now that we've recieved a message it should be active with parameter unset
EXPECT_TRUE(ros_clock->ros_time_is_active());
auto t_out = ros_clock->now();
EXPECT_NE(0UL, t_out.nanoseconds());
EXPECT_LT(t_low.nanoseconds(), t_out.nanoseconds());
EXPECT_GT(t_high.nanoseconds(), t_out.nanoseconds());
// Requeue a pointer in a new position
callback_handler = ros_clock->create_jump_callback(
std::bind(&CallbackObject::pre_callback, &cbo, 2),
std::bind(&CallbackObject::post_callback, &cbo, std::placeholders::_1, 2),
jump_threshold);
// Remove the last callback in the vector
callback_handler2.reset();
trigger_clock_changes(node);
EXPECT_EQ(2, cbo.last_precallback_id_);
EXPECT_EQ(2, cbo.last_postcallback_id_);
// Now that we've recieved a message it should be active with parameter unset
EXPECT_TRUE(ros_clock->ros_time_is_active());
t_out = ros_clock->now();
EXPECT_NE(0UL, t_out.nanoseconds());
EXPECT_LT(t_low.nanoseconds(), t_out.nanoseconds());
EXPECT_GT(t_high.nanoseconds(), t_out.nanoseconds());
}
TEST_F(TestTimeSource, parameter_activation) {
rclcpp::TimeSource ts(node);