Finish renaming, add mutex to store and take

This commit is contained in:
Jackie Kay 2015-12-01 11:45:52 -08:00
parent 7fdbc4a89a
commit 67151def41
6 changed files with 38 additions and 33 deletions

View file

@ -24,7 +24,7 @@ set(${PROJECT_NAME}_SRCS
src/rclcpp/executors/multi_threaded_executor.cpp
src/rclcpp/executors/single_threaded_executor.cpp
src/rclcpp/intra_process_manager.cpp
src/rclcpp/intra_process_manager_state.cpp
src/rclcpp/intra_process_manager_impl.cpp
src/rclcpp/memory_strategies.cpp
src/rclcpp/memory_strategy.cpp
src/rclcpp/parameter.cpp

View file

@ -27,7 +27,7 @@
#include <set>
#include "rclcpp/allocator/allocator_deleter.hpp"
#include "rclcpp/intra_process_manager_state.hpp"
#include "rclcpp/intra_process_manager_impl.hpp"
#include "rclcpp/mapped_ring_buffer.hpp"
#include "rclcpp/macros.hpp"
#include "rclcpp/publisher.hpp"
@ -127,7 +127,7 @@ public:
RCLCPP_PUBLIC
explicit IntraProcessManager(
IntraProcessManagerStateBase::SharedPtr state = create_default_state());
IntraProcessManagerImplBase::SharedPtr state = create_default_impl());
RCLCPP_PUBLIC
virtual ~IntraProcessManager();
@ -189,7 +189,7 @@ public:
auto mrb = mapped_ring_buffer::MappedRingBuffer<MessageT,
typename publisher::Publisher<MessageT, Alloc>::MessageAlloc>::make_shared(
size, publisher->get_allocator());
state_->add_publisher(id, publisher, mrb, size);
impl_->add_publisher(id, publisher, mrb, size);
return id;
}
@ -242,7 +242,7 @@ public:
using MRBMessageAlloc = typename std::allocator_traits<Alloc>::template rebind_alloc<MessageT>;
using TypedMRB = typename mapped_ring_buffer::MappedRingBuffer<MessageT, MRBMessageAlloc>;
uint64_t message_seq = 0;
mapped_ring_buffer::MappedRingBufferBase::SharedPtr buffer = state_->get_publisher_info_for_id(
mapped_ring_buffer::MappedRingBufferBase::SharedPtr buffer = impl_->get_publisher_info_for_id(
intra_process_publisher_id, message_seq);
typename TypedMRB::SharedPtr typed_buffer = std::static_pointer_cast<TypedMRB>(buffer);
if (!typed_buffer) {
@ -254,7 +254,7 @@ public:
// TODO(wjwwood): do something when a message was displaced. log debug?
(void)did_replace; // Avoid unused variable warning.
state_->store_intra_process_message(intra_process_publisher_id, message_seq);
impl_->store_intra_process_message(intra_process_publisher_id, message_seq);
// Return the message sequence which is sent to the subscription.
return message_seq;
@ -308,7 +308,7 @@ public:
message = nullptr;
size_t target_subs_size = 0;
mapped_ring_buffer::MappedRingBufferBase::SharedPtr buffer = state_->take_intra_process_message(
mapped_ring_buffer::MappedRingBufferBase::SharedPtr buffer = impl_->take_intra_process_message(
intra_process_publisher_id,
message_sequence_number,
requesting_subscriptions_intra_process_id,
@ -338,7 +338,7 @@ private:
static uint64_t
get_next_unique_id();
IntraProcessManagerStateBase::SharedPtr state_;
IntraProcessManagerImplBase::SharedPtr impl_;
};
} // namespace intra_process_manager

View file

@ -12,8 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.
#ifndef RCLCPP__INTRA_PROCESS_MANAGER_STATE_HPP_
#define RCLCPP__INTRA_PROCESS_MANAGER_STATE_HPP_
#ifndef RCLCPP__INTRA_PROCESS_MANAGER_IMPL_HPP_
#define RCLCPP__INTRA_PROCESS_MANAGER_IMPL_HPP_
#include <algorithm>
#include <atomic>
@ -21,6 +21,7 @@
#include <limits>
#include <map>
#include <memory>
#include <mutex>
#include <set>
#include <string>
#include <unordered_map>
@ -37,13 +38,13 @@ namespace rclcpp
namespace intra_process_manager
{
class IntraProcessManagerStateBase
class IntraProcessManagerImplBase
{
public:
RCLCPP_SMART_PTR_DEFINITIONS_NOT_COPYABLE(IntraProcessManagerStateBase);
RCLCPP_SMART_PTR_DEFINITIONS_NOT_COPYABLE(IntraProcessManagerImplBase);
IntraProcessManagerStateBase() = default;
~IntraProcessManagerStateBase() = default;
IntraProcessManagerImplBase() = default;
~IntraProcessManagerImplBase() = default;
virtual void
add_subscription(uint64_t id, subscription::SubscriptionBase::SharedPtr subscription) = 0;
@ -77,15 +78,15 @@ public:
matches_any_publishers(const rmw_gid_t * id) const = 0;
private:
RCLCPP_DISABLE_COPY(IntraProcessManagerStateBase);
RCLCPP_DISABLE_COPY(IntraProcessManagerImplBase);
};
template<typename Allocator = std::allocator<void>>
class IntraProcessManagerState : public IntraProcessManagerStateBase
class IntraProcessManagerImpl : public IntraProcessManagerImplBase
{
public:
IntraProcessManagerState() = default;
~IntraProcessManagerState() = default;
IntraProcessManagerImpl() = default;
~IntraProcessManagerImpl() = default;
void
add_subscription(uint64_t id, subscription::SubscriptionBase::SharedPtr subscription)
@ -152,6 +153,7 @@ public:
void
store_intra_process_message(uint64_t intra_process_publisher_id, uint64_t message_seq)
{
std::lock_guard<std::mutex> lock(runtime_mutex_);
auto it = publishers_.find(intra_process_publisher_id);
if (it == publishers_.end()) {
throw std::runtime_error("store_intra_process_message called with invalid publisher id");
@ -184,6 +186,7 @@ public:
size_t & size
)
{
std::lock_guard<std::mutex> lock(runtime_mutex_);
PublisherInfo * info;
{
auto it = publishers_.find(intra_process_publisher_id);
@ -233,7 +236,7 @@ public:
}
private:
RCLCPP_DISABLE_COPY(IntraProcessManagerState);
RCLCPP_DISABLE_COPY(IntraProcessManagerImpl);
template<typename T>
using RebindAlloc = typename std::allocator_traits<Allocator>::template rebind_alloc<T>;
@ -270,13 +273,15 @@ private:
RebindAlloc<std::pair<const uint64_t, PublisherInfo>>>;
PublisherMap publishers_;
std::mutex runtime_mutex_;
};
RCLCPP_PUBLIC
IntraProcessManagerStateBase::SharedPtr
create_default_state();
IntraProcessManagerImplBase::SharedPtr
create_default_impl();
} // namespace intra_process_manager
} // namespace rclcpp
#endif // RCLCPP__INTRA_PROCESS_MANAGER_STATE_HPP_
#endif // RCLCPP__INTRA_PROCESS_MANAGER_IMPL_HPP_

View file

@ -22,8 +22,8 @@ namespace intra_process_manager
static std::atomic<uint64_t> _next_unique_id {1};
IntraProcessManager::IntraProcessManager(
rclcpp::intra_process_manager::IntraProcessManagerStateBase::SharedPtr state)
: state_(state)
rclcpp::intra_process_manager::IntraProcessManagerImplBase::SharedPtr impl)
: impl_(impl)
{}
IntraProcessManager::~IntraProcessManager()
@ -34,26 +34,26 @@ IntraProcessManager::add_subscription(
rclcpp::subscription::SubscriptionBase::SharedPtr subscription)
{
auto id = IntraProcessManager::get_next_unique_id();
state_->add_subscription(id, subscription);
impl_->add_subscription(id, subscription);
return id;
}
void
IntraProcessManager::remove_subscription(uint64_t intra_process_subscription_id)
{
state_->remove_subscription(intra_process_subscription_id);
impl_->remove_subscription(intra_process_subscription_id);
}
void
IntraProcessManager::remove_publisher(uint64_t intra_process_publisher_id)
{
state_->remove_publisher(intra_process_publisher_id);
impl_->remove_publisher(intra_process_publisher_id);
}
bool
IntraProcessManager::matches_any_publishers(const rmw_gid_t * id) const
{
return state_->matches_any_publishers(id);
return impl_->matches_any_publishers(id);
}
uint64_t

View file

@ -12,12 +12,12 @@
// See the License for the specific language governing permissions and
// limitations under the License.
#include "rclcpp/intra_process_manager_state.hpp"
#include "rclcpp/intra_process_manager_impl.hpp"
#include <memory>
rclcpp::intra_process_manager::IntraProcessManagerStateBase::SharedPtr
rclcpp::intra_process_manager::create_default_state()
rclcpp::intra_process_manager::IntraProcessManagerImplBase::SharedPtr
rclcpp::intra_process_manager::create_default_impl()
{
return std::make_shared<IntraProcessManagerState<>>();
return std::make_shared<IntraProcessManagerImpl<>>();
}

View file

@ -124,7 +124,7 @@ public:
#define PublisherBase mock::PublisherBase
#define SubscriptionBase mock::SubscriptionBase
#include "../src/rclcpp/intra_process_manager.cpp"
#include "../src/rclcpp/intra_process_manager_state.cpp"
#include "../src/rclcpp/intra_process_manager_impl.cpp"
#undef SubscriptionBase
#undef Publisher
#undef PublisherBase