executor could take more than once incorrectly (#383)
* Baseline test and force threads to yield. * Add timer tracking for executor. * Add locking and test happy-path exit conditions. * Move logic to multi_threaded_executor * Address reviewer feedback by reducing scope of set. * Expand tolerance on testing. * comment fixup Otherwise it seemed to me like it would yield twice.
This commit is contained in:
parent
d33a46c3b6
commit
62c8c5b762
4 changed files with 151 additions and 3 deletions
|
@ -387,6 +387,14 @@ if(BUILD_TESTING)
|
||||||
"rcl")
|
"rcl")
|
||||||
target_link_libraries(test_utilities ${PROJECT_NAME})
|
target_link_libraries(test_utilities ${PROJECT_NAME})
|
||||||
endif()
|
endif()
|
||||||
|
|
||||||
|
ament_add_gtest(test_multi_threaded_executor test/executors/test_multi_threaded_executor.cpp
|
||||||
|
APPEND_LIBRARY_DIRS "${append_library_dirs}")
|
||||||
|
if(TARGET test_multi_threaded_executor)
|
||||||
|
ament_target_dependencies(test_multi_threaded_executor
|
||||||
|
"rcl")
|
||||||
|
target_link_libraries(test_multi_threaded_executor ${PROJECT_NAME})
|
||||||
|
endif()
|
||||||
endif()
|
endif()
|
||||||
|
|
||||||
ament_package(
|
ament_package(
|
||||||
|
|
|
@ -15,7 +15,9 @@
|
||||||
#ifndef RCLCPP__EXECUTORS__MULTI_THREADED_EXECUTOR_HPP_
|
#ifndef RCLCPP__EXECUTORS__MULTI_THREADED_EXECUTOR_HPP_
|
||||||
#define RCLCPP__EXECUTORS__MULTI_THREADED_EXECUTOR_HPP_
|
#define RCLCPP__EXECUTORS__MULTI_THREADED_EXECUTOR_HPP_
|
||||||
|
|
||||||
|
#include <memory>
|
||||||
#include <mutex>
|
#include <mutex>
|
||||||
|
#include <set>
|
||||||
#include <thread>
|
#include <thread>
|
||||||
#include <unordered_map>
|
#include <unordered_map>
|
||||||
|
|
||||||
|
@ -34,10 +36,22 @@ class MultiThreadedExecutor : public executor::Executor
|
||||||
public:
|
public:
|
||||||
RCLCPP_SMART_PTR_DEFINITIONS(MultiThreadedExecutor)
|
RCLCPP_SMART_PTR_DEFINITIONS(MultiThreadedExecutor)
|
||||||
|
|
||||||
|
/// Constructor for MultiThreadedExecutor.
|
||||||
|
/**
|
||||||
|
* For the yield_before_execute option, when true std::this_thread::yield()
|
||||||
|
* will be called after acquiring work (as an AnyExecutable) and
|
||||||
|
* releasing the spinning lock, but before executing the work.
|
||||||
|
* This is useful for reproducing some bugs related to taking work more than
|
||||||
|
* once.
|
||||||
|
*
|
||||||
|
* \param args common arguments for all executors
|
||||||
|
* \param yield_before_execute if true std::this_thread::yield() is called
|
||||||
|
*/
|
||||||
RCLCPP_PUBLIC
|
RCLCPP_PUBLIC
|
||||||
MultiThreadedExecutor(
|
MultiThreadedExecutor(
|
||||||
const executor::ExecutorArgs & args = rclcpp::executor::create_default_executor_arguments(),
|
const executor::ExecutorArgs & args = rclcpp::executor::create_default_executor_arguments(),
|
||||||
size_t number_of_threads = 0);
|
size_t number_of_threads = 0,
|
||||||
|
bool yield_before_execute = false);
|
||||||
|
|
||||||
RCLCPP_PUBLIC
|
RCLCPP_PUBLIC
|
||||||
virtual ~MultiThreadedExecutor();
|
virtual ~MultiThreadedExecutor();
|
||||||
|
@ -60,6 +74,10 @@ private:
|
||||||
|
|
||||||
std::mutex wait_mutex_;
|
std::mutex wait_mutex_;
|
||||||
size_t number_of_threads_;
|
size_t number_of_threads_;
|
||||||
|
bool yield_before_execute_;
|
||||||
|
|
||||||
|
std::mutex scheduled_timers_mutex_;
|
||||||
|
std::set<TimerBase::SharedPtr> scheduled_timers_;
|
||||||
};
|
};
|
||||||
|
|
||||||
} // namespace executors
|
} // namespace executors
|
||||||
|
|
|
@ -16,6 +16,7 @@
|
||||||
|
|
||||||
#include <chrono>
|
#include <chrono>
|
||||||
#include <functional>
|
#include <functional>
|
||||||
|
#include <memory>
|
||||||
#include <vector>
|
#include <vector>
|
||||||
|
|
||||||
#include "rclcpp/utilities.hpp"
|
#include "rclcpp/utilities.hpp"
|
||||||
|
@ -25,8 +26,9 @@ using rclcpp::executors::MultiThreadedExecutor;
|
||||||
|
|
||||||
MultiThreadedExecutor::MultiThreadedExecutor(
|
MultiThreadedExecutor::MultiThreadedExecutor(
|
||||||
const rclcpp::executor::ExecutorArgs & args,
|
const rclcpp::executor::ExecutorArgs & args,
|
||||||
size_t number_of_threads)
|
size_t number_of_threads,
|
||||||
: executor::Executor(args)
|
bool yield_before_execute)
|
||||||
|
: executor::Executor(args), yield_before_execute_(yield_before_execute)
|
||||||
{
|
{
|
||||||
number_of_threads_ = number_of_threads ? number_of_threads : std::thread::hardware_concurrency();
|
number_of_threads_ = number_of_threads ? number_of_threads : std::thread::hardware_concurrency();
|
||||||
if (number_of_threads_ == 0) {
|
if (number_of_threads_ == 0) {
|
||||||
|
@ -78,7 +80,27 @@ MultiThreadedExecutor::run(size_t)
|
||||||
if (!get_next_executable(any_exec)) {
|
if (!get_next_executable(any_exec)) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
if (any_exec.timer) {
|
||||||
|
// Guard against multiple threads getting the same timer.
|
||||||
|
std::lock_guard<std::mutex> lock(scheduled_timers_mutex_);
|
||||||
|
if (scheduled_timers_.count(any_exec.timer) != 0) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
scheduled_timers_.insert(any_exec.timer);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
if (yield_before_execute_) {
|
||||||
|
std::this_thread::yield();
|
||||||
|
}
|
||||||
|
|
||||||
execute_any_executable(any_exec);
|
execute_any_executable(any_exec);
|
||||||
|
|
||||||
|
if (any_exec.timer) {
|
||||||
|
std::lock_guard<std::mutex> lock(scheduled_timers_mutex_);
|
||||||
|
auto it = scheduled_timers_.find(any_exec.timer);
|
||||||
|
if (it != scheduled_timers_.end()) {
|
||||||
|
scheduled_timers_.erase(it);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
100
rclcpp/test/executors/test_multi_threaded_executor.cpp
Normal file
100
rclcpp/test/executors/test_multi_threaded_executor.cpp
Normal file
|
@ -0,0 +1,100 @@
|
||||||
|
// Copyright 2018 Open Source Robotics Foundation, Inc.
|
||||||
|
//
|
||||||
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
// you may not use this file except in compliance with the License.
|
||||||
|
// You may obtain a copy of the License at
|
||||||
|
//
|
||||||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
//
|
||||||
|
// Unless required by applicable law or agreed to in writing, software
|
||||||
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
// See the License for the specific language governing permissions and
|
||||||
|
// limitations under the License.
|
||||||
|
|
||||||
|
#include <gtest/gtest.h>
|
||||||
|
|
||||||
|
#include <chrono>
|
||||||
|
#include <string>
|
||||||
|
#include <memory>
|
||||||
|
|
||||||
|
#include "rclcpp/exceptions.hpp"
|
||||||
|
#include "rclcpp/node.hpp"
|
||||||
|
#include "rclcpp/rclcpp.hpp"
|
||||||
|
#include "rclcpp/executors.hpp"
|
||||||
|
|
||||||
|
#include "rcl_interfaces/msg/intra_process_message.hpp"
|
||||||
|
|
||||||
|
using namespace std::chrono_literals;
|
||||||
|
|
||||||
|
using rcl_interfaces::msg::IntraProcessMessage;
|
||||||
|
|
||||||
|
class TestMultiThreadedExecutor : public ::testing::Test
|
||||||
|
{
|
||||||
|
protected:
|
||||||
|
static void SetUpTestCase()
|
||||||
|
{
|
||||||
|
rclcpp::init(0, nullptr);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
/*
|
||||||
|
Test that timers are not taken multiple times when using reentrant callback groups.
|
||||||
|
*/
|
||||||
|
TEST_F(TestMultiThreadedExecutor, timer_over_take) {
|
||||||
|
#ifdef __linux__
|
||||||
|
// This seems to be the most effective way to force the bug to happen on Linux.
|
||||||
|
// This is unnecessary on MacOS, since the default scheduler causes it.
|
||||||
|
struct sched_param param;
|
||||||
|
param.sched_priority = 0;
|
||||||
|
if (sched_setscheduler(0, SCHED_BATCH, ¶m) != 0) {
|
||||||
|
perror("sched_setscheduler");
|
||||||
|
}
|
||||||
|
#endif
|
||||||
|
|
||||||
|
bool yield_before_execute = true;
|
||||||
|
|
||||||
|
rclcpp::executors::MultiThreadedExecutor executor(
|
||||||
|
rclcpp::executor::create_default_executor_arguments(), 2u, yield_before_execute);
|
||||||
|
|
||||||
|
ASSERT_GT(executor.get_number_of_threads(), 1u);
|
||||||
|
|
||||||
|
std::shared_ptr<rclcpp::Node> node =
|
||||||
|
std::make_shared<rclcpp::Node>("test_multi_threaded_executor_timer_over_take");
|
||||||
|
|
||||||
|
auto cbg = node->create_callback_group(rclcpp::callback_group::CallbackGroupType::Reentrant);
|
||||||
|
|
||||||
|
rclcpp::Clock system_clock(RCL_STEADY_TIME);
|
||||||
|
std::mutex last_mutex;
|
||||||
|
auto last = system_clock.now();
|
||||||
|
|
||||||
|
std::atomic_int timer_count {0};
|
||||||
|
|
||||||
|
auto timer_callback = [&timer_count, &executor, &system_clock, &last_mutex, &last]() {
|
||||||
|
const double PERIOD = 0.1f;
|
||||||
|
const double TOLERANCE = 0.01f;
|
||||||
|
|
||||||
|
rclcpp::Time now = system_clock.now();
|
||||||
|
timer_count++;
|
||||||
|
|
||||||
|
if (timer_count > 5) {
|
||||||
|
executor.cancel();
|
||||||
|
}
|
||||||
|
|
||||||
|
{
|
||||||
|
std::lock_guard<std::mutex> lock(last_mutex);
|
||||||
|
double diff = std::abs((now - last).nanoseconds()) / 1.0e9;
|
||||||
|
last = now;
|
||||||
|
|
||||||
|
if (diff < PERIOD - TOLERANCE || diff > PERIOD + TOLERANCE) {
|
||||||
|
executor.cancel();
|
||||||
|
ASSERT_TRUE(diff > PERIOD - TOLERANCE);
|
||||||
|
ASSERT_TRUE(diff < PERIOD + TOLERANCE);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
auto timer = node->create_wall_timer(100ms, timer_callback, cbg);
|
||||||
|
executor.add_node(node);
|
||||||
|
executor.spin();
|
||||||
|
}
|
Loading…
Add table
Add a link
Reference in a new issue