From 5e64191e10b4ecc46a8e2a5d4744cceeade49333 Mon Sep 17 00:00:00 2001 From: Tully Foote Date: Tue, 21 Nov 2017 20:44:28 -0800 Subject: [PATCH] Resolve erase race condition. (#406) Fixes #401 Extend time source tests to allocate and deallocate callback handlers. --- rclcpp/src/rclcpp/clock.cpp | 24 ++++--- rclcpp/test/test_time_source.cpp | 105 ++++++++++++++++++++++++++++++- 2 files changed, 118 insertions(+), 11 deletions(-) diff --git a/rclcpp/src/rclcpp/clock.cpp b/rclcpp/src/rclcpp/clock.cpp index 45a13ea..1ca8708 100644 --- a/rclcpp/src/rclcpp/clock.cpp +++ b/rclcpp/src/rclcpp/clock.cpp @@ -130,15 +130,21 @@ Clock::get_triggered_callback_handlers(const TimeJump & jump) { std::vector callbacks; std::lock_guard guard(callback_list_mutex_); - for (auto wjcb = active_jump_handlers_.begin(); wjcb != active_jump_handlers_.end(); wjcb++) { - if (auto jcb = wjcb->lock()) { - if (jcb->notice_threshold.is_exceeded(jump)) { - callbacks.push_back(jcb); - } - } else { - active_jump_handlers_.erase(wjcb); - } - } + active_jump_handlers_.erase( + std::remove_if( + active_jump_handlers_.begin(), + active_jump_handlers_.end(), + [&callbacks, &jump](const std::weak_ptr & wjcb) { + if (auto jcb = wjcb.lock()) { + if (jcb->notice_threshold.is_exceeded(jump)) { + callbacks.push_back(jcb); + } + return false; + } + // Lock failed so clear the weak pointer. + return true; + }), + active_jump_handlers_.end()); return callbacks; } diff --git a/rclcpp/test/test_time_source.cpp b/rclcpp/test/test_time_source.cpp index b5fa4f4..00f07b5 100644 --- a/rclcpp/test/test_time_source.cpp +++ b/rclcpp/test/test_time_source.cpp @@ -15,6 +15,7 @@ #include #include +#include #include #include #include @@ -26,6 +27,8 @@ #include "rclcpp/time.hpp" #include "rclcpp/time_source.hpp" +using namespace std::chrono_literals; + class TestTimeSource : public ::testing::Test { protected: @@ -147,7 +150,7 @@ TEST_F(TestTimeSource, callbacks) { auto ros_clock = std::make_shared(RCL_ROS_TIME); // Register a callback for time jumps - rclcpp::JumpHandler::SharedPtr callback_holder = ros_clock->create_jump_callback( + rclcpp::JumpHandler::SharedPtr callback_handler = ros_clock->create_jump_callback( std::bind(&CallbackObject::pre_callback, &cbo, 1), std::bind(&CallbackObject::post_callback, &cbo, std::placeholders::_1, 1), jump_threshold); @@ -193,7 +196,7 @@ TEST_F(TestTimeSource, callbacks) { // Change callbacks - rclcpp::JumpHandler::SharedPtr callback_holder2 = ros_clock->create_jump_callback( + rclcpp::JumpHandler::SharedPtr callback_handler2 = ros_clock->create_jump_callback( std::bind(&CallbackObject::pre_callback, &cbo, 2), std::bind(&CallbackObject::post_callback, &cbo, std::placeholders::_1, 2), jump_threshold); @@ -224,6 +227,104 @@ TEST_F(TestTimeSource, callbacks) { EXPECT_GT(t_high.nanoseconds(), t_out.nanoseconds()); } +void trigger_clock_changes( + rclcpp::node::Node::SharedPtr node) +{ + auto clock_pub = node->create_publisher("clock", + rmw_qos_profile_default); + + rclcpp::executors::SingleThreadedExecutor executor; + executor.add_node(node); + + rclcpp::WallRate loop_rate(50); + for (int i = 0; i < 5; ++i) { + if (!rclcpp::ok()) { + break; // Break for ctrl-c + } + auto msg = std::make_shared(); + msg->sec = i; + msg->nanosec = 1000; + clock_pub->publish(msg); + // std::cout << "Publishing: '" << msg->sec << ".000000" << msg->nanosec << "'" << std::endl; + executor.spin_once(1000000ns); + loop_rate.sleep(); + } +} + + +TEST_F(TestTimeSource, callback_handler_erasure) { + CallbackObject cbo; + rclcpp::JumpThreshold jump_threshold; + jump_threshold.min_forward_ = 0; + jump_threshold.min_backward_ = 0; + jump_threshold.on_clock_change_ = true; + + rclcpp::TimeSource ts(node); + auto ros_clock = std::make_shared(RCL_ROS_TIME); + ts.attachClock(ros_clock); + EXPECT_FALSE(ros_clock->ros_time_is_active()); + + // Register a callback for time jumps + rclcpp::JumpHandler::SharedPtr callback_handler = ros_clock->create_jump_callback( + std::bind(&CallbackObject::pre_callback, &cbo, 1), + std::bind(&CallbackObject::post_callback, &cbo, std::placeholders::_1, 1), + jump_threshold); + + // Second callback handler + rclcpp::JumpHandler::SharedPtr callback_handler2 = ros_clock->create_jump_callback( + std::bind(&CallbackObject::pre_callback, &cbo, 1), + std::bind(&CallbackObject::post_callback, &cbo, std::placeholders::_1, 1), + jump_threshold); + + + EXPECT_EQ(0, cbo.last_precallback_id_); + EXPECT_EQ(0, cbo.last_postcallback_id_); + + + EXPECT_FALSE(ros_clock->ros_time_is_active()); + + trigger_clock_changes(node); + + auto t_low = rclcpp::Time(1, 0, RCL_ROS_TIME); + auto t_high = rclcpp::Time(10, 100000, RCL_ROS_TIME); + + EXPECT_EQ(1, cbo.last_precallback_id_); + EXPECT_EQ(1, cbo.last_postcallback_id_); + + // Now that we've recieved a message it should be active with parameter unset + EXPECT_TRUE(ros_clock->ros_time_is_active()); + + auto t_out = ros_clock->now(); + + EXPECT_NE(0UL, t_out.nanoseconds()); + EXPECT_LT(t_low.nanoseconds(), t_out.nanoseconds()); + EXPECT_GT(t_high.nanoseconds(), t_out.nanoseconds()); + + // Requeue a pointer in a new position + callback_handler = ros_clock->create_jump_callback( + std::bind(&CallbackObject::pre_callback, &cbo, 2), + std::bind(&CallbackObject::post_callback, &cbo, std::placeholders::_1, 2), + jump_threshold); + + // Remove the last callback in the vector + callback_handler2.reset(); + + trigger_clock_changes(node); + + + EXPECT_EQ(2, cbo.last_precallback_id_); + EXPECT_EQ(2, cbo.last_postcallback_id_); + + // Now that we've recieved a message it should be active with parameter unset + EXPECT_TRUE(ros_clock->ros_time_is_active()); + + t_out = ros_clock->now(); + + EXPECT_NE(0UL, t_out.nanoseconds()); + EXPECT_LT(t_low.nanoseconds(), t_out.nanoseconds()); + EXPECT_GT(t_high.nanoseconds(), t_out.nanoseconds()); +} + TEST_F(TestTimeSource, parameter_activation) { rclcpp::TimeSource ts(node);