From 67151def413b836c036250d3ea05d05fbcb59192 Mon Sep 17 00:00:00 2001 From: Jackie Kay Date: Tue, 1 Dec 2015 11:45:52 -0800 Subject: [PATCH] Finish renaming, add mutex to store and take --- rclcpp/CMakeLists.txt | 2 +- .../include/rclcpp/intra_process_manager.hpp | 14 ++++---- .../rclcpp/intra_process_manager_impl.hpp | 33 +++++++++++-------- rclcpp/src/rclcpp/intra_process_manager.cpp | 12 +++---- .../src/rclcpp/intra_process_manager_impl.cpp | 8 ++--- rclcpp/test/test_intra_process_manager.cpp | 2 +- 6 files changed, 38 insertions(+), 33 deletions(-) diff --git a/rclcpp/CMakeLists.txt b/rclcpp/CMakeLists.txt index daa5973..c342640 100644 --- a/rclcpp/CMakeLists.txt +++ b/rclcpp/CMakeLists.txt @@ -24,7 +24,7 @@ set(${PROJECT_NAME}_SRCS src/rclcpp/executors/multi_threaded_executor.cpp src/rclcpp/executors/single_threaded_executor.cpp src/rclcpp/intra_process_manager.cpp - src/rclcpp/intra_process_manager_state.cpp + src/rclcpp/intra_process_manager_impl.cpp src/rclcpp/memory_strategies.cpp src/rclcpp/memory_strategy.cpp src/rclcpp/parameter.cpp diff --git a/rclcpp/include/rclcpp/intra_process_manager.hpp b/rclcpp/include/rclcpp/intra_process_manager.hpp index 96c682d..0edec72 100644 --- a/rclcpp/include/rclcpp/intra_process_manager.hpp +++ b/rclcpp/include/rclcpp/intra_process_manager.hpp @@ -27,7 +27,7 @@ #include #include "rclcpp/allocator/allocator_deleter.hpp" -#include "rclcpp/intra_process_manager_state.hpp" +#include "rclcpp/intra_process_manager_impl.hpp" #include "rclcpp/mapped_ring_buffer.hpp" #include "rclcpp/macros.hpp" #include "rclcpp/publisher.hpp" @@ -127,7 +127,7 @@ public: RCLCPP_PUBLIC explicit IntraProcessManager( - IntraProcessManagerStateBase::SharedPtr state = create_default_state()); + IntraProcessManagerImplBase::SharedPtr state = create_default_impl()); RCLCPP_PUBLIC virtual ~IntraProcessManager(); @@ -189,7 +189,7 @@ public: auto mrb = mapped_ring_buffer::MappedRingBuffer::MessageAlloc>::make_shared( size, publisher->get_allocator()); - state_->add_publisher(id, publisher, mrb, size); + impl_->add_publisher(id, publisher, mrb, size); return id; } @@ -242,7 +242,7 @@ public: using MRBMessageAlloc = typename std::allocator_traits::template rebind_alloc; using TypedMRB = typename mapped_ring_buffer::MappedRingBuffer; uint64_t message_seq = 0; - mapped_ring_buffer::MappedRingBufferBase::SharedPtr buffer = state_->get_publisher_info_for_id( + mapped_ring_buffer::MappedRingBufferBase::SharedPtr buffer = impl_->get_publisher_info_for_id( intra_process_publisher_id, message_seq); typename TypedMRB::SharedPtr typed_buffer = std::static_pointer_cast(buffer); if (!typed_buffer) { @@ -254,7 +254,7 @@ public: // TODO(wjwwood): do something when a message was displaced. log debug? (void)did_replace; // Avoid unused variable warning. - state_->store_intra_process_message(intra_process_publisher_id, message_seq); + impl_->store_intra_process_message(intra_process_publisher_id, message_seq); // Return the message sequence which is sent to the subscription. return message_seq; @@ -308,7 +308,7 @@ public: message = nullptr; size_t target_subs_size = 0; - mapped_ring_buffer::MappedRingBufferBase::SharedPtr buffer = state_->take_intra_process_message( + mapped_ring_buffer::MappedRingBufferBase::SharedPtr buffer = impl_->take_intra_process_message( intra_process_publisher_id, message_sequence_number, requesting_subscriptions_intra_process_id, @@ -338,7 +338,7 @@ private: static uint64_t get_next_unique_id(); - IntraProcessManagerStateBase::SharedPtr state_; + IntraProcessManagerImplBase::SharedPtr impl_; }; } // namespace intra_process_manager diff --git a/rclcpp/include/rclcpp/intra_process_manager_impl.hpp b/rclcpp/include/rclcpp/intra_process_manager_impl.hpp index 0649505..8c4754c 100644 --- a/rclcpp/include/rclcpp/intra_process_manager_impl.hpp +++ b/rclcpp/include/rclcpp/intra_process_manager_impl.hpp @@ -12,8 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. -#ifndef RCLCPP__INTRA_PROCESS_MANAGER_STATE_HPP_ -#define RCLCPP__INTRA_PROCESS_MANAGER_STATE_HPP_ +#ifndef RCLCPP__INTRA_PROCESS_MANAGER_IMPL_HPP_ +#define RCLCPP__INTRA_PROCESS_MANAGER_IMPL_HPP_ #include #include @@ -21,6 +21,7 @@ #include #include #include +#include #include #include #include @@ -37,13 +38,13 @@ namespace rclcpp namespace intra_process_manager { -class IntraProcessManagerStateBase +class IntraProcessManagerImplBase { public: - RCLCPP_SMART_PTR_DEFINITIONS_NOT_COPYABLE(IntraProcessManagerStateBase); + RCLCPP_SMART_PTR_DEFINITIONS_NOT_COPYABLE(IntraProcessManagerImplBase); - IntraProcessManagerStateBase() = default; - ~IntraProcessManagerStateBase() = default; + IntraProcessManagerImplBase() = default; + ~IntraProcessManagerImplBase() = default; virtual void add_subscription(uint64_t id, subscription::SubscriptionBase::SharedPtr subscription) = 0; @@ -77,15 +78,15 @@ public: matches_any_publishers(const rmw_gid_t * id) const = 0; private: - RCLCPP_DISABLE_COPY(IntraProcessManagerStateBase); + RCLCPP_DISABLE_COPY(IntraProcessManagerImplBase); }; template> -class IntraProcessManagerState : public IntraProcessManagerStateBase +class IntraProcessManagerImpl : public IntraProcessManagerImplBase { public: - IntraProcessManagerState() = default; - ~IntraProcessManagerState() = default; + IntraProcessManagerImpl() = default; + ~IntraProcessManagerImpl() = default; void add_subscription(uint64_t id, subscription::SubscriptionBase::SharedPtr subscription) @@ -152,6 +153,7 @@ public: void store_intra_process_message(uint64_t intra_process_publisher_id, uint64_t message_seq) { + std::lock_guard lock(runtime_mutex_); auto it = publishers_.find(intra_process_publisher_id); if (it == publishers_.end()) { throw std::runtime_error("store_intra_process_message called with invalid publisher id"); @@ -184,6 +186,7 @@ public: size_t & size ) { + std::lock_guard lock(runtime_mutex_); PublisherInfo * info; { auto it = publishers_.find(intra_process_publisher_id); @@ -233,7 +236,7 @@ public: } private: - RCLCPP_DISABLE_COPY(IntraProcessManagerState); + RCLCPP_DISABLE_COPY(IntraProcessManagerImpl); template using RebindAlloc = typename std::allocator_traits::template rebind_alloc; @@ -270,13 +273,15 @@ private: RebindAlloc>>; PublisherMap publishers_; + + std::mutex runtime_mutex_; }; RCLCPP_PUBLIC -IntraProcessManagerStateBase::SharedPtr -create_default_state(); +IntraProcessManagerImplBase::SharedPtr +create_default_impl(); } // namespace intra_process_manager } // namespace rclcpp -#endif // RCLCPP__INTRA_PROCESS_MANAGER_STATE_HPP_ +#endif // RCLCPP__INTRA_PROCESS_MANAGER_IMPL_HPP_ diff --git a/rclcpp/src/rclcpp/intra_process_manager.cpp b/rclcpp/src/rclcpp/intra_process_manager.cpp index b919f4c..7f2a7b7 100644 --- a/rclcpp/src/rclcpp/intra_process_manager.cpp +++ b/rclcpp/src/rclcpp/intra_process_manager.cpp @@ -22,8 +22,8 @@ namespace intra_process_manager static std::atomic _next_unique_id {1}; IntraProcessManager::IntraProcessManager( - rclcpp::intra_process_manager::IntraProcessManagerStateBase::SharedPtr state) -: state_(state) + rclcpp::intra_process_manager::IntraProcessManagerImplBase::SharedPtr impl) +: impl_(impl) {} IntraProcessManager::~IntraProcessManager() @@ -34,26 +34,26 @@ IntraProcessManager::add_subscription( rclcpp::subscription::SubscriptionBase::SharedPtr subscription) { auto id = IntraProcessManager::get_next_unique_id(); - state_->add_subscription(id, subscription); + impl_->add_subscription(id, subscription); return id; } void IntraProcessManager::remove_subscription(uint64_t intra_process_subscription_id) { - state_->remove_subscription(intra_process_subscription_id); + impl_->remove_subscription(intra_process_subscription_id); } void IntraProcessManager::remove_publisher(uint64_t intra_process_publisher_id) { - state_->remove_publisher(intra_process_publisher_id); + impl_->remove_publisher(intra_process_publisher_id); } bool IntraProcessManager::matches_any_publishers(const rmw_gid_t * id) const { - return state_->matches_any_publishers(id); + return impl_->matches_any_publishers(id); } uint64_t diff --git a/rclcpp/src/rclcpp/intra_process_manager_impl.cpp b/rclcpp/src/rclcpp/intra_process_manager_impl.cpp index 113623f..2fa1d2f 100644 --- a/rclcpp/src/rclcpp/intra_process_manager_impl.cpp +++ b/rclcpp/src/rclcpp/intra_process_manager_impl.cpp @@ -12,12 +12,12 @@ // See the License for the specific language governing permissions and // limitations under the License. -#include "rclcpp/intra_process_manager_state.hpp" +#include "rclcpp/intra_process_manager_impl.hpp" #include -rclcpp::intra_process_manager::IntraProcessManagerStateBase::SharedPtr -rclcpp::intra_process_manager::create_default_state() +rclcpp::intra_process_manager::IntraProcessManagerImplBase::SharedPtr +rclcpp::intra_process_manager::create_default_impl() { - return std::make_shared>(); + return std::make_shared>(); } diff --git a/rclcpp/test/test_intra_process_manager.cpp b/rclcpp/test/test_intra_process_manager.cpp index 6d511d9..a39f36a 100644 --- a/rclcpp/test/test_intra_process_manager.cpp +++ b/rclcpp/test/test_intra_process_manager.cpp @@ -124,7 +124,7 @@ public: #define PublisherBase mock::PublisherBase #define SubscriptionBase mock::SubscriptionBase #include "../src/rclcpp/intra_process_manager.cpp" -#include "../src/rclcpp/intra_process_manager_state.cpp" +#include "../src/rclcpp/intra_process_manager_impl.cpp" #undef SubscriptionBase #undef Publisher #undef PublisherBase