Backports #1516 and follow-up fix #1628 Patched to keep ABI compatibility by using static class variables to store the mutex two priorities instances. Signed-off-by: hsgwa <hasegawa@isp.co.jp>
This commit is contained in:
parent
791c23afe5
commit
d12ed36e89
5 changed files with 208 additions and 4 deletions
|
@ -39,6 +39,7 @@ set(${PROJECT_NAME}_SRCS
|
||||||
src/rclcpp/clock.cpp
|
src/rclcpp/clock.cpp
|
||||||
src/rclcpp/context.cpp
|
src/rclcpp/context.cpp
|
||||||
src/rclcpp/contexts/default_context.cpp
|
src/rclcpp/contexts/default_context.cpp
|
||||||
|
src/rclcpp/detail/mutex_two_priorities.cpp
|
||||||
src/rclcpp/detail/rmw_implementation_specific_payload.cpp
|
src/rclcpp/detail/rmw_implementation_specific_payload.cpp
|
||||||
src/rclcpp/detail/rmw_implementation_specific_publisher_payload.cpp
|
src/rclcpp/detail/rmw_implementation_specific_publisher_payload.cpp
|
||||||
src/rclcpp/detail/rmw_implementation_specific_subscription_payload.cpp
|
src/rclcpp/detail/rmw_implementation_specific_subscription_payload.cpp
|
||||||
|
|
76
rclcpp/include/rclcpp/detail/mutex_two_priorities.hpp
Normal file
76
rclcpp/include/rclcpp/detail/mutex_two_priorities.hpp
Normal file
|
@ -0,0 +1,76 @@
|
||||||
|
// Copyright 2021 Open Source Robotics Foundation, Inc.
|
||||||
|
//
|
||||||
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
// you may not use this file except in compliance with the License.
|
||||||
|
// You may obtain a copy of the License at
|
||||||
|
//
|
||||||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
//
|
||||||
|
// Unless required by applicable law or agreed to in writing, software
|
||||||
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
// See the License for the specific language governing permissions and
|
||||||
|
// limitations under the License.
|
||||||
|
|
||||||
|
#ifndef RCLCPP__DETAIL__MUTEX_TWO_PRIORITIES_HPP_
|
||||||
|
#define RCLCPP__DETAIL__MUTEX_TWO_PRIORITIES_HPP_
|
||||||
|
|
||||||
|
#include <condition_variable>
|
||||||
|
#include <mutex>
|
||||||
|
|
||||||
|
namespace rclcpp
|
||||||
|
{
|
||||||
|
namespace detail
|
||||||
|
{
|
||||||
|
/// \internal A mutex that has two locking mechanism, one with higher priority than the other.
|
||||||
|
/**
|
||||||
|
* After the current mutex owner release the lock, a thread that used the high
|
||||||
|
* priority mechanism will have priority over threads that used the low priority mechanism.
|
||||||
|
*/
|
||||||
|
class MutexTwoPriorities
|
||||||
|
{
|
||||||
|
public:
|
||||||
|
class HighPriorityLockable
|
||||||
|
{
|
||||||
|
public:
|
||||||
|
explicit HighPriorityLockable(MutexTwoPriorities & parent);
|
||||||
|
|
||||||
|
void lock();
|
||||||
|
|
||||||
|
void unlock();
|
||||||
|
|
||||||
|
private:
|
||||||
|
MutexTwoPriorities & parent_;
|
||||||
|
};
|
||||||
|
|
||||||
|
class LowPriorityLockable
|
||||||
|
{
|
||||||
|
public:
|
||||||
|
explicit LowPriorityLockable(MutexTwoPriorities & parent);
|
||||||
|
|
||||||
|
void lock();
|
||||||
|
|
||||||
|
void unlock();
|
||||||
|
|
||||||
|
private:
|
||||||
|
MutexTwoPriorities & parent_;
|
||||||
|
};
|
||||||
|
|
||||||
|
HighPriorityLockable
|
||||||
|
get_high_priority_lockable();
|
||||||
|
|
||||||
|
LowPriorityLockable
|
||||||
|
get_low_priority_lockable();
|
||||||
|
|
||||||
|
private:
|
||||||
|
std::condition_variable hp_cv_;
|
||||||
|
std::condition_variable lp_cv_;
|
||||||
|
std::mutex cv_mutex_;
|
||||||
|
size_t hp_waiting_count_{0u};
|
||||||
|
bool data_taken_{false};
|
||||||
|
};
|
||||||
|
|
||||||
|
} // namespace detail
|
||||||
|
} // namespace rclcpp
|
||||||
|
|
||||||
|
#endif // RCLCPP__DETAIL__MUTEX_TWO_PRIORITIES_HPP_
|
|
@ -22,6 +22,7 @@
|
||||||
#include <thread>
|
#include <thread>
|
||||||
#include <unordered_map>
|
#include <unordered_map>
|
||||||
|
|
||||||
|
#include "rclcpp/detail/mutex_two_priorities.hpp"
|
||||||
#include "rclcpp/executor.hpp"
|
#include "rclcpp/executor.hpp"
|
||||||
#include "rclcpp/macros.hpp"
|
#include "rclcpp/macros.hpp"
|
||||||
#include "rclcpp/memory_strategies.hpp"
|
#include "rclcpp/memory_strategies.hpp"
|
||||||
|
@ -81,12 +82,17 @@ protected:
|
||||||
private:
|
private:
|
||||||
RCLCPP_DISABLE_COPY(MultiThreadedExecutor)
|
RCLCPP_DISABLE_COPY(MultiThreadedExecutor)
|
||||||
|
|
||||||
std::mutex wait_mutex_;
|
std::mutex wait_mutex_; // Unused. Leave it for ABI compatibility.
|
||||||
size_t number_of_threads_;
|
size_t number_of_threads_;
|
||||||
bool yield_before_execute_;
|
bool yield_before_execute_;
|
||||||
std::chrono::nanoseconds next_exec_timeout_;
|
std::chrono::nanoseconds next_exec_timeout_;
|
||||||
|
|
||||||
std::set<TimerBase::SharedPtr> scheduled_timers_;
|
std::set<TimerBase::SharedPtr> scheduled_timers_;
|
||||||
|
static std::unordered_map<MultiThreadedExecutor *,
|
||||||
|
std::shared_ptr<detail::MutexTwoPriorities>> wait_mutex_set_;
|
||||||
|
static std::mutex shared_wait_mutex_;
|
||||||
|
// These variables are declared as static variables for ABI-compatibiliity.
|
||||||
|
// And they mimic member variables needed to backport from master.
|
||||||
};
|
};
|
||||||
|
|
||||||
} // namespace executors
|
} // namespace executors
|
||||||
|
|
104
rclcpp/src/rclcpp/detail/mutex_two_priorities.cpp
Normal file
104
rclcpp/src/rclcpp/detail/mutex_two_priorities.cpp
Normal file
|
@ -0,0 +1,104 @@
|
||||||
|
// Copyright 2021 Open Source Robotics Foundation, Inc.
|
||||||
|
//
|
||||||
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
// you may not use this file except in compliance with the License.
|
||||||
|
// You may obtain a copy of the License at
|
||||||
|
//
|
||||||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
//
|
||||||
|
// Unless required by applicable law or agreed to in writing, software
|
||||||
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
// See the License for the specific language governing permissions and
|
||||||
|
// limitations under the License.
|
||||||
|
|
||||||
|
#include "rclcpp/detail/mutex_two_priorities.hpp"
|
||||||
|
|
||||||
|
#include <mutex>
|
||||||
|
|
||||||
|
namespace rclcpp
|
||||||
|
{
|
||||||
|
namespace detail
|
||||||
|
{
|
||||||
|
|
||||||
|
using LowPriorityLockable = MutexTwoPriorities::LowPriorityLockable;
|
||||||
|
using HighPriorityLockable = MutexTwoPriorities::HighPriorityLockable;
|
||||||
|
|
||||||
|
HighPriorityLockable::HighPriorityLockable(MutexTwoPriorities & parent)
|
||||||
|
: parent_(parent)
|
||||||
|
{}
|
||||||
|
|
||||||
|
void
|
||||||
|
HighPriorityLockable::lock()
|
||||||
|
{
|
||||||
|
std::unique_lock<std::mutex> guard{parent_.cv_mutex_};
|
||||||
|
if (parent_.data_taken_) {
|
||||||
|
++parent_.hp_waiting_count_;
|
||||||
|
while (parent_.data_taken_) {
|
||||||
|
parent_.hp_cv_.wait(guard);
|
||||||
|
}
|
||||||
|
--parent_.hp_waiting_count_;
|
||||||
|
}
|
||||||
|
parent_.data_taken_ = true;
|
||||||
|
}
|
||||||
|
|
||||||
|
void
|
||||||
|
HighPriorityLockable::unlock()
|
||||||
|
{
|
||||||
|
bool notify_lp{false};
|
||||||
|
{
|
||||||
|
std::lock_guard<std::mutex> guard{parent_.cv_mutex_};
|
||||||
|
parent_.data_taken_ = false;
|
||||||
|
notify_lp = 0u == parent_.hp_waiting_count_;
|
||||||
|
}
|
||||||
|
if (notify_lp) {
|
||||||
|
parent_.lp_cv_.notify_one();
|
||||||
|
} else {
|
||||||
|
parent_.hp_cv_.notify_one();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
LowPriorityLockable::LowPriorityLockable(MutexTwoPriorities & parent)
|
||||||
|
: parent_(parent)
|
||||||
|
{}
|
||||||
|
|
||||||
|
void
|
||||||
|
LowPriorityLockable::lock()
|
||||||
|
{
|
||||||
|
std::unique_lock<std::mutex> guard{parent_.cv_mutex_};
|
||||||
|
while (parent_.data_taken_ || parent_.hp_waiting_count_) {
|
||||||
|
parent_.lp_cv_.wait(guard);
|
||||||
|
}
|
||||||
|
parent_.data_taken_ = true;
|
||||||
|
}
|
||||||
|
|
||||||
|
void
|
||||||
|
LowPriorityLockable::unlock()
|
||||||
|
{
|
||||||
|
bool notify_lp{false};
|
||||||
|
{
|
||||||
|
std::lock_guard<std::mutex> guard{parent_.cv_mutex_};
|
||||||
|
parent_.data_taken_ = false;
|
||||||
|
notify_lp = 0u == parent_.hp_waiting_count_;
|
||||||
|
}
|
||||||
|
if (notify_lp) {
|
||||||
|
parent_.lp_cv_.notify_one();
|
||||||
|
} else {
|
||||||
|
parent_.hp_cv_.notify_one();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
HighPriorityLockable
|
||||||
|
MutexTwoPriorities::get_high_priority_lockable()
|
||||||
|
{
|
||||||
|
return HighPriorityLockable{*this};
|
||||||
|
}
|
||||||
|
|
||||||
|
LowPriorityLockable
|
||||||
|
MutexTwoPriorities::get_low_priority_lockable()
|
||||||
|
{
|
||||||
|
return LowPriorityLockable{*this};
|
||||||
|
}
|
||||||
|
|
||||||
|
} // namespace detail
|
||||||
|
} // namespace rclcpp
|
|
@ -17,13 +17,19 @@
|
||||||
#include <chrono>
|
#include <chrono>
|
||||||
#include <functional>
|
#include <functional>
|
||||||
#include <memory>
|
#include <memory>
|
||||||
|
#include <unordered_map>
|
||||||
#include <vector>
|
#include <vector>
|
||||||
|
|
||||||
#include "rclcpp/utilities.hpp"
|
#include "rclcpp/utilities.hpp"
|
||||||
#include "rclcpp/scope_exit.hpp"
|
#include "rclcpp/scope_exit.hpp"
|
||||||
|
|
||||||
|
using rclcpp::detail::MutexTwoPriorities;
|
||||||
using rclcpp::executors::MultiThreadedExecutor;
|
using rclcpp::executors::MultiThreadedExecutor;
|
||||||
|
|
||||||
|
std::unordered_map<MultiThreadedExecutor *, std::shared_ptr<MutexTwoPriorities>>
|
||||||
|
MultiThreadedExecutor::wait_mutex_set_;
|
||||||
|
std::mutex MultiThreadedExecutor::shared_wait_mutex_;
|
||||||
|
|
||||||
MultiThreadedExecutor::MultiThreadedExecutor(
|
MultiThreadedExecutor::MultiThreadedExecutor(
|
||||||
const rclcpp::ExecutorOptions & options,
|
const rclcpp::ExecutorOptions & options,
|
||||||
size_t number_of_threads,
|
size_t number_of_threads,
|
||||||
|
@ -33,6 +39,11 @@ MultiThreadedExecutor::MultiThreadedExecutor(
|
||||||
yield_before_execute_(yield_before_execute),
|
yield_before_execute_(yield_before_execute),
|
||||||
next_exec_timeout_(next_exec_timeout)
|
next_exec_timeout_(next_exec_timeout)
|
||||||
{
|
{
|
||||||
|
{
|
||||||
|
std::lock_guard<std::mutex> wait_lock(
|
||||||
|
MultiThreadedExecutor::shared_wait_mutex_);
|
||||||
|
wait_mutex_set_[this] = std::make_shared<MutexTwoPriorities>();
|
||||||
|
}
|
||||||
number_of_threads_ = number_of_threads ? number_of_threads : std::thread::hardware_concurrency();
|
number_of_threads_ = number_of_threads ? number_of_threads : std::thread::hardware_concurrency();
|
||||||
if (number_of_threads_ == 0) {
|
if (number_of_threads_ == 0) {
|
||||||
number_of_threads_ = 1;
|
number_of_threads_ = 1;
|
||||||
|
@ -51,7 +62,9 @@ MultiThreadedExecutor::spin()
|
||||||
std::vector<std::thread> threads;
|
std::vector<std::thread> threads;
|
||||||
size_t thread_id = 0;
|
size_t thread_id = 0;
|
||||||
{
|
{
|
||||||
std::lock_guard<std::mutex> wait_lock(wait_mutex_);
|
auto wait_mutex = MultiThreadedExecutor::wait_mutex_set_[this];
|
||||||
|
auto low_priority_wait_mutex = wait_mutex->get_low_priority_lockable();
|
||||||
|
std::lock_guard<MutexTwoPriorities::LowPriorityLockable> wait_lock(low_priority_wait_mutex);
|
||||||
for (; thread_id < number_of_threads_ - 1; ++thread_id) {
|
for (; thread_id < number_of_threads_ - 1; ++thread_id) {
|
||||||
auto func = std::bind(&MultiThreadedExecutor::run, this, thread_id);
|
auto func = std::bind(&MultiThreadedExecutor::run, this, thread_id);
|
||||||
threads.emplace_back(func);
|
threads.emplace_back(func);
|
||||||
|
@ -76,7 +89,9 @@ MultiThreadedExecutor::run(size_t)
|
||||||
while (rclcpp::ok(this->context_) && spinning.load()) {
|
while (rclcpp::ok(this->context_) && spinning.load()) {
|
||||||
rclcpp::AnyExecutable any_exec;
|
rclcpp::AnyExecutable any_exec;
|
||||||
{
|
{
|
||||||
std::lock_guard<std::mutex> wait_lock(wait_mutex_);
|
auto wait_mutex = MultiThreadedExecutor::wait_mutex_set_[this];
|
||||||
|
auto low_priority_wait_mutex = wait_mutex->get_low_priority_lockable();
|
||||||
|
std::lock_guard<MutexTwoPriorities::LowPriorityLockable> wait_lock(low_priority_wait_mutex);
|
||||||
if (!rclcpp::ok(this->context_) || !spinning.load()) {
|
if (!rclcpp::ok(this->context_) || !spinning.load()) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
@ -103,7 +118,9 @@ MultiThreadedExecutor::run(size_t)
|
||||||
execute_any_executable(any_exec);
|
execute_any_executable(any_exec);
|
||||||
|
|
||||||
if (any_exec.timer) {
|
if (any_exec.timer) {
|
||||||
std::lock_guard<std::mutex> wait_lock(wait_mutex_);
|
auto wait_mutex = MultiThreadedExecutor::wait_mutex_set_[this];
|
||||||
|
auto high_priority_wait_mutex = wait_mutex->get_high_priority_lockable();
|
||||||
|
std::lock_guard<MutexTwoPriorities::HighPriorityLockable> wait_lock(high_priority_wait_mutex);
|
||||||
auto it = scheduled_timers_.find(any_exec.timer);
|
auto it = scheduled_timers_.find(any_exec.timer);
|
||||||
if (it != scheduled_timers_.end()) {
|
if (it != scheduled_timers_.end()) {
|
||||||
scheduled_timers_.erase(it);
|
scheduled_timers_.erase(it);
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue