Merge pull request #165 from ros2/intra_process_lock

Make intra-process manager thread safe, rename IPMState to IPMImpl
This commit is contained in:
Jackie Kay 2015-12-03 09:34:06 -08:00
commit f73ebcb8d7
6 changed files with 38 additions and 33 deletions

View file

@ -24,7 +24,7 @@ set(${PROJECT_NAME}_SRCS
src/rclcpp/executors/multi_threaded_executor.cpp src/rclcpp/executors/multi_threaded_executor.cpp
src/rclcpp/executors/single_threaded_executor.cpp src/rclcpp/executors/single_threaded_executor.cpp
src/rclcpp/intra_process_manager.cpp src/rclcpp/intra_process_manager.cpp
src/rclcpp/intra_process_manager_state.cpp src/rclcpp/intra_process_manager_impl.cpp
src/rclcpp/memory_strategies.cpp src/rclcpp/memory_strategies.cpp
src/rclcpp/memory_strategy.cpp src/rclcpp/memory_strategy.cpp
src/rclcpp/parameter.cpp src/rclcpp/parameter.cpp

View file

@ -27,7 +27,7 @@
#include <set> #include <set>
#include "rclcpp/allocator/allocator_deleter.hpp" #include "rclcpp/allocator/allocator_deleter.hpp"
#include "rclcpp/intra_process_manager_state.hpp" #include "rclcpp/intra_process_manager_impl.hpp"
#include "rclcpp/mapped_ring_buffer.hpp" #include "rclcpp/mapped_ring_buffer.hpp"
#include "rclcpp/macros.hpp" #include "rclcpp/macros.hpp"
#include "rclcpp/publisher.hpp" #include "rclcpp/publisher.hpp"
@ -127,7 +127,7 @@ public:
RCLCPP_PUBLIC RCLCPP_PUBLIC
explicit IntraProcessManager( explicit IntraProcessManager(
IntraProcessManagerStateBase::SharedPtr state = create_default_state()); IntraProcessManagerImplBase::SharedPtr state = create_default_impl());
RCLCPP_PUBLIC RCLCPP_PUBLIC
virtual ~IntraProcessManager(); virtual ~IntraProcessManager();
@ -189,7 +189,7 @@ public:
auto mrb = mapped_ring_buffer::MappedRingBuffer<MessageT, auto mrb = mapped_ring_buffer::MappedRingBuffer<MessageT,
typename publisher::Publisher<MessageT, Alloc>::MessageAlloc>::make_shared( typename publisher::Publisher<MessageT, Alloc>::MessageAlloc>::make_shared(
size, publisher->get_allocator()); size, publisher->get_allocator());
state_->add_publisher(id, publisher, mrb, size); impl_->add_publisher(id, publisher, mrb, size);
return id; return id;
} }
@ -242,7 +242,7 @@ public:
using MRBMessageAlloc = typename std::allocator_traits<Alloc>::template rebind_alloc<MessageT>; using MRBMessageAlloc = typename std::allocator_traits<Alloc>::template rebind_alloc<MessageT>;
using TypedMRB = typename mapped_ring_buffer::MappedRingBuffer<MessageT, MRBMessageAlloc>; using TypedMRB = typename mapped_ring_buffer::MappedRingBuffer<MessageT, MRBMessageAlloc>;
uint64_t message_seq = 0; uint64_t message_seq = 0;
mapped_ring_buffer::MappedRingBufferBase::SharedPtr buffer = state_->get_publisher_info_for_id( mapped_ring_buffer::MappedRingBufferBase::SharedPtr buffer = impl_->get_publisher_info_for_id(
intra_process_publisher_id, message_seq); intra_process_publisher_id, message_seq);
typename TypedMRB::SharedPtr typed_buffer = std::static_pointer_cast<TypedMRB>(buffer); typename TypedMRB::SharedPtr typed_buffer = std::static_pointer_cast<TypedMRB>(buffer);
if (!typed_buffer) { if (!typed_buffer) {
@ -254,7 +254,7 @@ public:
// TODO(wjwwood): do something when a message was displaced. log debug? // TODO(wjwwood): do something when a message was displaced. log debug?
(void)did_replace; // Avoid unused variable warning. (void)did_replace; // Avoid unused variable warning.
state_->store_intra_process_message(intra_process_publisher_id, message_seq); impl_->store_intra_process_message(intra_process_publisher_id, message_seq);
// Return the message sequence which is sent to the subscription. // Return the message sequence which is sent to the subscription.
return message_seq; return message_seq;
@ -308,7 +308,7 @@ public:
message = nullptr; message = nullptr;
size_t target_subs_size = 0; size_t target_subs_size = 0;
mapped_ring_buffer::MappedRingBufferBase::SharedPtr buffer = state_->take_intra_process_message( mapped_ring_buffer::MappedRingBufferBase::SharedPtr buffer = impl_->take_intra_process_message(
intra_process_publisher_id, intra_process_publisher_id,
message_sequence_number, message_sequence_number,
requesting_subscriptions_intra_process_id, requesting_subscriptions_intra_process_id,
@ -338,7 +338,7 @@ private:
static uint64_t static uint64_t
get_next_unique_id(); get_next_unique_id();
IntraProcessManagerStateBase::SharedPtr state_; IntraProcessManagerImplBase::SharedPtr impl_;
}; };
} // namespace intra_process_manager } // namespace intra_process_manager

View file

@ -12,8 +12,8 @@
// See the License for the specific language governing permissions and // See the License for the specific language governing permissions and
// limitations under the License. // limitations under the License.
#ifndef RCLCPP__INTRA_PROCESS_MANAGER_STATE_HPP_ #ifndef RCLCPP__INTRA_PROCESS_MANAGER_IMPL_HPP_
#define RCLCPP__INTRA_PROCESS_MANAGER_STATE_HPP_ #define RCLCPP__INTRA_PROCESS_MANAGER_IMPL_HPP_
#include <algorithm> #include <algorithm>
#include <atomic> #include <atomic>
@ -21,6 +21,7 @@
#include <limits> #include <limits>
#include <map> #include <map>
#include <memory> #include <memory>
#include <mutex>
#include <set> #include <set>
#include <string> #include <string>
#include <unordered_map> #include <unordered_map>
@ -37,13 +38,13 @@ namespace rclcpp
namespace intra_process_manager namespace intra_process_manager
{ {
class IntraProcessManagerStateBase class IntraProcessManagerImplBase
{ {
public: public:
RCLCPP_SMART_PTR_DEFINITIONS_NOT_COPYABLE(IntraProcessManagerStateBase); RCLCPP_SMART_PTR_DEFINITIONS_NOT_COPYABLE(IntraProcessManagerImplBase);
IntraProcessManagerStateBase() = default; IntraProcessManagerImplBase() = default;
~IntraProcessManagerStateBase() = default; ~IntraProcessManagerImplBase() = default;
virtual void virtual void
add_subscription(uint64_t id, subscription::SubscriptionBase::SharedPtr subscription) = 0; add_subscription(uint64_t id, subscription::SubscriptionBase::SharedPtr subscription) = 0;
@ -77,15 +78,15 @@ public:
matches_any_publishers(const rmw_gid_t * id) const = 0; matches_any_publishers(const rmw_gid_t * id) const = 0;
private: private:
RCLCPP_DISABLE_COPY(IntraProcessManagerStateBase); RCLCPP_DISABLE_COPY(IntraProcessManagerImplBase);
}; };
template<typename Allocator = std::allocator<void>> template<typename Allocator = std::allocator<void>>
class IntraProcessManagerState : public IntraProcessManagerStateBase class IntraProcessManagerImpl : public IntraProcessManagerImplBase
{ {
public: public:
IntraProcessManagerState() = default; IntraProcessManagerImpl() = default;
~IntraProcessManagerState() = default; ~IntraProcessManagerImpl() = default;
void void
add_subscription(uint64_t id, subscription::SubscriptionBase::SharedPtr subscription) add_subscription(uint64_t id, subscription::SubscriptionBase::SharedPtr subscription)
@ -152,6 +153,7 @@ public:
void void
store_intra_process_message(uint64_t intra_process_publisher_id, uint64_t message_seq) store_intra_process_message(uint64_t intra_process_publisher_id, uint64_t message_seq)
{ {
std::lock_guard<std::mutex> lock(runtime_mutex_);
auto it = publishers_.find(intra_process_publisher_id); auto it = publishers_.find(intra_process_publisher_id);
if (it == publishers_.end()) { if (it == publishers_.end()) {
throw std::runtime_error("store_intra_process_message called with invalid publisher id"); throw std::runtime_error("store_intra_process_message called with invalid publisher id");
@ -184,6 +186,7 @@ public:
size_t & size size_t & size
) )
{ {
std::lock_guard<std::mutex> lock(runtime_mutex_);
PublisherInfo * info; PublisherInfo * info;
{ {
auto it = publishers_.find(intra_process_publisher_id); auto it = publishers_.find(intra_process_publisher_id);
@ -233,7 +236,7 @@ public:
} }
private: private:
RCLCPP_DISABLE_COPY(IntraProcessManagerState); RCLCPP_DISABLE_COPY(IntraProcessManagerImpl);
template<typename T> template<typename T>
using RebindAlloc = typename std::allocator_traits<Allocator>::template rebind_alloc<T>; using RebindAlloc = typename std::allocator_traits<Allocator>::template rebind_alloc<T>;
@ -270,13 +273,15 @@ private:
RebindAlloc<std::pair<const uint64_t, PublisherInfo>>>; RebindAlloc<std::pair<const uint64_t, PublisherInfo>>>;
PublisherMap publishers_; PublisherMap publishers_;
std::mutex runtime_mutex_;
}; };
RCLCPP_PUBLIC RCLCPP_PUBLIC
IntraProcessManagerStateBase::SharedPtr IntraProcessManagerImplBase::SharedPtr
create_default_state(); create_default_impl();
} // namespace intra_process_manager } // namespace intra_process_manager
} // namespace rclcpp } // namespace rclcpp
#endif // RCLCPP__INTRA_PROCESS_MANAGER_STATE_HPP_ #endif // RCLCPP__INTRA_PROCESS_MANAGER_IMPL_HPP_

View file

@ -22,8 +22,8 @@ namespace intra_process_manager
static std::atomic<uint64_t> _next_unique_id {1}; static std::atomic<uint64_t> _next_unique_id {1};
IntraProcessManager::IntraProcessManager( IntraProcessManager::IntraProcessManager(
rclcpp::intra_process_manager::IntraProcessManagerStateBase::SharedPtr state) rclcpp::intra_process_manager::IntraProcessManagerImplBase::SharedPtr impl)
: state_(state) : impl_(impl)
{} {}
IntraProcessManager::~IntraProcessManager() IntraProcessManager::~IntraProcessManager()
@ -34,26 +34,26 @@ IntraProcessManager::add_subscription(
rclcpp::subscription::SubscriptionBase::SharedPtr subscription) rclcpp::subscription::SubscriptionBase::SharedPtr subscription)
{ {
auto id = IntraProcessManager::get_next_unique_id(); auto id = IntraProcessManager::get_next_unique_id();
state_->add_subscription(id, subscription); impl_->add_subscription(id, subscription);
return id; return id;
} }
void void
IntraProcessManager::remove_subscription(uint64_t intra_process_subscription_id) IntraProcessManager::remove_subscription(uint64_t intra_process_subscription_id)
{ {
state_->remove_subscription(intra_process_subscription_id); impl_->remove_subscription(intra_process_subscription_id);
} }
void void
IntraProcessManager::remove_publisher(uint64_t intra_process_publisher_id) IntraProcessManager::remove_publisher(uint64_t intra_process_publisher_id)
{ {
state_->remove_publisher(intra_process_publisher_id); impl_->remove_publisher(intra_process_publisher_id);
} }
bool bool
IntraProcessManager::matches_any_publishers(const rmw_gid_t * id) const IntraProcessManager::matches_any_publishers(const rmw_gid_t * id) const
{ {
return state_->matches_any_publishers(id); return impl_->matches_any_publishers(id);
} }
uint64_t uint64_t

View file

@ -12,12 +12,12 @@
// See the License for the specific language governing permissions and // See the License for the specific language governing permissions and
// limitations under the License. // limitations under the License.
#include "rclcpp/intra_process_manager_state.hpp" #include "rclcpp/intra_process_manager_impl.hpp"
#include <memory> #include <memory>
rclcpp::intra_process_manager::IntraProcessManagerStateBase::SharedPtr rclcpp::intra_process_manager::IntraProcessManagerImplBase::SharedPtr
rclcpp::intra_process_manager::create_default_state() rclcpp::intra_process_manager::create_default_impl()
{ {
return std::make_shared<IntraProcessManagerState<>>(); return std::make_shared<IntraProcessManagerImpl<>>();
} }

View file

@ -124,7 +124,7 @@ public:
#define PublisherBase mock::PublisherBase #define PublisherBase mock::PublisherBase
#define SubscriptionBase mock::SubscriptionBase #define SubscriptionBase mock::SubscriptionBase
#include "../src/rclcpp/intra_process_manager.cpp" #include "../src/rclcpp/intra_process_manager.cpp"
#include "../src/rclcpp/intra_process_manager_state.cpp" #include "../src/rclcpp/intra_process_manager_impl.cpp"
#undef SubscriptionBase #undef SubscriptionBase
#undef Publisher #undef Publisher
#undef PublisherBase #undef PublisherBase