templatize IPM state

This commit is contained in:
Jackie Kay 2015-10-26 17:01:40 -07:00
parent 4138e6be1d
commit c663d892a4
2 changed files with 308 additions and 109 deletions

View file

@ -15,6 +15,7 @@
#ifndef RCLCPP_RCLCPP_INTRA_PROCESS_MANAGER_HPP_
#define RCLCPP_RCLCPP_INTRA_PROCESS_MANAGER_HPP_
#include <rclcpp/intra_process_manager_state.hpp>
#include <rclcpp/mapped_ring_buffer.hpp>
#include <rclcpp/macros.hpp>
#include <rclcpp/publisher.hpp>
@ -24,7 +25,6 @@
#include <atomic>
#include <cstdint>
#include <exception>
#include <limits>
#include <map>
#include <unordered_map>
#include <set>
@ -122,7 +122,10 @@ private:
public:
RCLCPP_SMART_PTR_DEFINITIONS(IntraProcessManager);
IntraProcessManager() = default;
//IntraProcessManager() = default;
IntraProcessManager(IntraProcessManagerStateBase::SharedPtr state = create_default_state())
: state_(state) {
}
/// Register a subscription with the manager, returns subscriptions unique id.
/* In addition to generating a unique intra process id for the subscription,
@ -140,8 +143,7 @@ public:
add_subscription(subscription::SubscriptionBase::SharedPtr subscription)
{
auto id = IntraProcessManager::get_next_unique_id();
subscriptions_[id] = subscription;
subscription_ids_by_topic_[subscription->get_topic_name()].insert(id);
state_->add_subscription(id, subscription);
return id;
}
@ -153,17 +155,7 @@ public:
void
remove_subscription(uint64_t intra_process_subscription_id)
{
subscriptions_.erase(intra_process_subscription_id);
for (auto & pair : subscription_ids_by_topic_) {
pair.second.erase(intra_process_subscription_id);
}
// Iterate over all publisher infos and all stored subscription id's and
// remove references to this subscription's id.
for (auto & publisher_pair : publishers_) {
for (auto & sub_pair : publisher_pair.second.target_subscriptions_by_message_sequence) {
sub_pair.second.erase(intra_process_subscription_id);
}
}
state_->remove_subscription(intra_process_subscription_id);
}
/// Register a publisher with the manager, returns the publisher unique id.
@ -194,18 +186,11 @@ public:
size_t buffer_size = 0)
{
auto id = IntraProcessManager::get_next_unique_id();
publishers_[id].publisher = publisher;
size_t size = buffer_size > 0 ? buffer_size : publisher->get_queue_size();
// As long as the size of the ring buffer is less than the max sequence number, we're safe.
if (size > std::numeric_limits<uint64_t>::max()) {
throw std::invalid_argument("the calculated buffer size is too large");
}
publishers_[id].sequence_number.store(0);
publishers_[id].buffer =
mapped_ring_buffer::MappedRingBuffer<MessageT,
auto mrb = mapped_ring_buffer::MappedRingBuffer<MessageT,
typename publisher::Publisher<MessageT, Alloc>::MessageAlloc>::make_shared(
size, publisher->get_allocator());
publishers_[id].target_subscriptions_by_message_sequence.reserve(size);
state_->add_publisher(id, publisher, mrb, size);
return id;
}
@ -217,7 +202,7 @@ public:
void
remove_publisher(uint64_t intra_process_publisher_id)
{
publishers_.erase(intra_process_publisher_id);
state_->remove_publisher(intra_process_publisher_id);
}
/// Store a message in the manager, and return the message sequence number.
@ -258,37 +243,23 @@ public:
typename allocator::Deleter<typename std::allocator_traits<Alloc>::template rebind_alloc<MessageT>,
MessageT>> & message)
{
auto it = publishers_.find(intra_process_publisher_id);
if (it == publishers_.end()) {
throw std::runtime_error("store_intra_process_message called with invalid publisher id");
}
PublisherInfo & info = it->second;
// Calculate the next message sequence number.
uint64_t message_seq = info.sequence_number.fetch_add(1, std::memory_order_relaxed);
// Insert the message into the ring buffer using the message_seq to identify it.
using MRBMessageAlloc = typename std::allocator_traits<Alloc>::template rebind_alloc<MessageT>;
typedef typename mapped_ring_buffer::MappedRingBuffer<MessageT, MRBMessageAlloc> TypedMRB;
typename TypedMRB::SharedPtr typed_buffer = std::static_pointer_cast<TypedMRB>(info.buffer);
mapped_ring_buffer::MappedRingBufferBase::SharedPtr buffer;
auto message_seq = state_->get_publisher_info_for_id(intra_process_publisher_id, buffer);
typename TypedMRB::SharedPtr typed_buffer = std::static_pointer_cast<TypedMRB>(buffer);
if (!typed_buffer) {
throw std::runtime_error("Typecast failed due to incorrect message type");
}
// Insert the message into the ring buffer using the message_seq to identify it.
bool did_replace = typed_buffer->push_and_replace(message_seq, message);
// TODO(wjwwood): do something when a message was displaced. log debug?
(void)did_replace; // Avoid unused variable warning.
// Figure out what subscriptions should receive the message.
auto publisher = info.publisher.lock();
if (!publisher) {
throw std::runtime_error("publisher has unexpectedly gone out of scope");
}
auto & destined_subscriptions = subscription_ids_by_topic_[publisher->get_topic_name()];
// Store the list for later comparison.
info.target_subscriptions_by_message_sequence[message_seq].clear();
std::copy(
destined_subscriptions.begin(), destined_subscriptions.end(),
// Memory allocation occurs in info.target_subscriptions_by_message_sequence[message_seq]
std::inserter(
info.target_subscriptions_by_message_sequence[message_seq],
// This ends up only being a hint to std::set, could also be .begin().
info.target_subscriptions_by_message_sequence[message_seq].end()
)
);
state_->store_intra_process_message(intra_process_publisher_id, message_seq);
// Return the message sequence which is sent to the subscription.
return message_seq;
}
@ -337,41 +308,22 @@ public:
typename allocator::Deleter<typename std::allocator_traits<Alloc>::template rebind_alloc<MessageT>,
MessageT>> & message)
{
message = nullptr;
PublisherInfo * info;
{
auto it = publishers_.find(intra_process_publisher_id);
if (it == publishers_.end()) {
// Publisher is either invalid or no longer exists.
return;
}
info = &it->second;
}
// Figure out how many subscriptions are left.
std::set<uint64_t> * target_subs;
{
auto it = info->target_subscriptions_by_message_sequence.find(message_sequence_number);
if (it == info->target_subscriptions_by_message_sequence.end()) {
// Message is no longer being stored by this publisher.
return;
}
target_subs = &it->second;
}
{
auto it = std::find(
target_subs->begin(), target_subs->end(),
requesting_subscriptions_intra_process_id);
if (it == target_subs->end()) {
// This publisher id/message seq pair was not intended for this subscription.
return;
}
target_subs->erase(it);
}
using MRBMessageAlloc = typename std::allocator_traits<Alloc>::template rebind_alloc<MessageT>;
typedef typename mapped_ring_buffer::MappedRingBuffer<MessageT, MRBMessageAlloc> TypedMRB;
typename TypedMRB::SharedPtr typed_buffer = std::static_pointer_cast<TypedMRB>(info->buffer);
mapped_ring_buffer::MappedRingBufferBase::SharedPtr buffer;
size_t target_subs_size = state_->take_intra_process_message(
intra_process_publisher_id,
message_sequence_number,
requesting_subscriptions_intra_process_id,
buffer
);
typename TypedMRB::SharedPtr typed_buffer = std::static_pointer_cast<TypedMRB>(buffer);
if (!typed_buffer) {
return;
}
// Return a copy or the unique_ptr (ownership) depending on how many subscriptions are left.
if (target_subs->size()) {
if (target_subs_size) {
// There are more subscriptions to serve, return a copy.
typed_buffer->get_copy_at_key(message_sequence_number, message);
} else {
@ -384,16 +336,7 @@ public:
bool
matches_any_publishers(const rmw_gid_t * id) const
{
for (auto & publisher_pair : publishers_) {
auto publisher = publisher_pair.second.publisher.lock();
if (!publisher) {
continue;
}
if (*publisher.get() == id) {
return true;
}
}
return false;
return state_->matches_any_publishers(id);
}
private:
@ -420,22 +363,7 @@ private:
static std::atomic<uint64_t> next_unique_id_;
std::unordered_map<uint64_t, subscription::SubscriptionBase::WeakPtr> subscriptions_;
std::map<std::string, std::set<uint64_t>> subscription_ids_by_topic_;
struct PublisherInfo
{
RCLCPP_DISABLE_COPY(PublisherInfo);
PublisherInfo() = default;
publisher::PublisherBase::WeakPtr publisher;
std::atomic<uint64_t> sequence_number;
mapped_ring_buffer::MappedRingBufferBase::SharedPtr buffer;
std::unordered_map<uint64_t, std::set<uint64_t>> target_subscriptions_by_message_sequence;
};
std::unordered_map<uint64_t, PublisherInfo> publishers_;
IntraProcessManagerStateBase::SharedPtr state_;
};

View file

@ -0,0 +1,271 @@
// Copyright 2015 Open Source Robotics Foundation, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#ifndef RCLCPP__RCLCPP__INTRA_PROCESS_MANAGER_STATE_HPP_
#define RCLCPP__RCLCPP__INTRA_PROCESS_MANAGER_STATE_HPP_
#include <rclcpp/mapped_ring_buffer.hpp>
#include <rclcpp/publisher.hpp>
#include <rclcpp/subscription.hpp>
#include <algorithm>
#include <atomic>
#include <limits>
#include <memory>
#include <map>
#include <unordered_map>
#include <set>
namespace rclcpp
{
namespace intra_process_manager
{
class IntraProcessManagerStateBase {
public:
//RCLCPP_SMART_PTR_DEFINITIONS(IntraProcessManagerStateBase);
using SharedPtr = std::shared_ptr<IntraProcessManagerStateBase>;
virtual void
add_subscription(uint64_t id, subscription::SubscriptionBase::SharedPtr subscription) = 0;
virtual void
remove_subscription(uint64_t intra_process_subscription_id) = 0;
virtual void add_publisher(uint64_t id,
publisher::PublisherBase::WeakPtr publisher,
mapped_ring_buffer::MappedRingBufferBase::SharedPtr mrb,
size_t size) = 0;
virtual void
remove_publisher(uint64_t intra_process_publisher_id) = 0;
virtual uint64_t
get_publisher_info_for_id(
uint64_t intra_process_publisher_id,
mapped_ring_buffer::MappedRingBufferBase::SharedPtr & mrb) = 0;
virtual void
store_intra_process_message(uint64_t intra_process_publisher_id, uint64_t message_seq) = 0;
virtual size_t
take_intra_process_message(uint64_t intra_process_publisher_id,
uint64_t message_sequence_number,
uint64_t requesting_subscriptions_intra_process_id,
mapped_ring_buffer::MappedRingBufferBase::SharedPtr & mrb) = 0;
virtual bool
matches_any_publishers(const rmw_gid_t * id) const = 0;
};
template<typename Allocator = std::allocator<void>>
class IntraProcessManagerState : public IntraProcessManagerStateBase {
public:
void
add_subscription(uint64_t id, subscription::SubscriptionBase::SharedPtr subscription) {
subscriptions_[id] = subscription;
subscription_ids_by_topic_[subscription->get_topic_name()].insert(id);
}
void
remove_subscription(uint64_t intra_process_subscription_id) {
subscriptions_.erase(intra_process_subscription_id);
for (auto & pair : subscription_ids_by_topic_) {
pair.second.erase(intra_process_subscription_id);
}
// Iterate over all publisher infos and all stored subscription id's and
// remove references to this subscription's id.
for (auto & publisher_pair : publishers_) {
for (auto & sub_pair : publisher_pair.second.target_subscriptions_by_message_sequence) {
sub_pair.second.erase(intra_process_subscription_id);
}
}
}
void add_publisher(uint64_t id,
publisher::PublisherBase::WeakPtr publisher,
mapped_ring_buffer::MappedRingBufferBase::SharedPtr mrb,
size_t size) {
publishers_[id].publisher = publisher;
// As long as the size of the ring buffer is less than the max sequence number, we're safe.
if (size > std::numeric_limits<uint64_t>::max()) {
throw std::invalid_argument("the calculated buffer size is too large");
}
publishers_[id].sequence_number.store(0);
publishers_[id].buffer = mrb;
publishers_[id].target_subscriptions_by_message_sequence.reserve(size);
}
void
remove_publisher(uint64_t intra_process_publisher_id) {
publishers_.erase(intra_process_publisher_id);
}
// TODO
// return message_seq and mrb
uint64_t
get_publisher_info_for_id(
uint64_t intra_process_publisher_id,
mapped_ring_buffer::MappedRingBufferBase::SharedPtr & mrb)
{
auto it = publishers_.find(intra_process_publisher_id);
if (it == publishers_.end()) {
throw std::runtime_error("store_intra_process_message called with invalid publisher id");
}
PublisherInfo & info = it->second;
mrb = info.buffer;
// Calculate the next message sequence number.
uint64_t message_seq = info.sequence_number.fetch_add(1, std::memory_order_relaxed);
return message_seq;
}
void
store_intra_process_message(uint64_t intra_process_publisher_id, uint64_t message_seq)
{
auto it = publishers_.find(intra_process_publisher_id);
if (it == publishers_.end()) {
throw std::runtime_error("store_intra_process_message called with invalid publisher id");
}
PublisherInfo & info = it->second;
auto publisher = info.publisher.lock();
if (!publisher) {
throw std::runtime_error("publisher has unexpectedly gone out of scope");
}
// Figure out what subscriptions should receive the message.
auto & destined_subscriptions = subscription_ids_by_topic_[publisher->get_topic_name()];
// Store the list for later comparison.
info.target_subscriptions_by_message_sequence[message_seq].clear();
std::copy(
destined_subscriptions.begin(), destined_subscriptions.end(),
// Memory allocation occurs in info.target_subscriptions_by_message_sequence[message_seq]
std::inserter(
info.target_subscriptions_by_message_sequence[message_seq],
// This ends up only being a hint to std::set, could also be .begin().
info.target_subscriptions_by_message_sequence[message_seq].end()
)
);
}
size_t
take_intra_process_message(uint64_t intra_process_publisher_id,
uint64_t message_sequence_number,
uint64_t requesting_subscriptions_intra_process_id,
mapped_ring_buffer::MappedRingBufferBase::SharedPtr & mrb
)
{
mrb = nullptr;
PublisherInfo * info;
{
auto it = publishers_.find(intra_process_publisher_id);
if (it == publishers_.end()) {
// Publisher is either invalid or no longer exists.
return 0;
}
info = &it->second;
}
// Figure out how many subscriptions are left.
AllocSet * target_subs;
{
auto it = info->target_subscriptions_by_message_sequence.find(message_sequence_number);
if (it == info->target_subscriptions_by_message_sequence.end()) {
// Message is no longer being stored by this publisher.
return 0;
}
target_subs = &it->second;
}
{
auto it = std::find(
target_subs->begin(), target_subs->end(),
requesting_subscriptions_intra_process_id);
if (it == target_subs->end()) {
// This publisher id/message seq pair was not intended for this subscription.
return 0;
}
target_subs->erase(it);
}
mrb = info->buffer;
return target_subs->size();
}
bool
matches_any_publishers(const rmw_gid_t * id) const
{
for (auto & publisher_pair : publishers_) {
auto publisher = publisher_pair.second.publisher.lock();
if (!publisher) {
continue;
}
if (*publisher.get() == id) {
return true;
}
}
return false;
}
private:
template<typename T>
using RebindAlloc = typename std::allocator_traits<Allocator>::template rebind_alloc<T>;
//using AllocString = std::basic_string<char, std::char_traits<char>, RebindAlloc<char>>;
using AllocString = std::string;
using AllocSet = std::set<uint64_t, std::less<uint64_t>, RebindAlloc<uint64_t>>;
using SubscriptionMap = std::unordered_map<uint64_t, subscription::SubscriptionBase::WeakPtr,
std::hash<uint64_t>, std::equal_to<uint64_t>,
RebindAlloc<std::pair<const uint64_t, subscription::SubscriptionBase::WeakPtr>>>;
using IDTopicMap = std::map<AllocString, AllocSet,
std::less<AllocString>, RebindAlloc<std::pair<AllocString, AllocSet>>>;
SubscriptionMap subscriptions_;
IDTopicMap subscription_ids_by_topic_;
struct PublisherInfo
{
RCLCPP_DISABLE_COPY(PublisherInfo);
PublisherInfo() = default;
publisher::PublisherBase::WeakPtr publisher;
std::atomic<uint64_t> sequence_number;
mapped_ring_buffer::MappedRingBufferBase::SharedPtr buffer;
using TargetSubscriptionsMap = std::unordered_map<uint64_t, AllocSet,
std::hash<uint64_t>, std::equal_to<uint64_t>, RebindAlloc<std::pair<const uint64_t, AllocSet>>>;
TargetSubscriptionsMap target_subscriptions_by_message_sequence;
};
using PublisherMap = std::unordered_map<uint64_t, PublisherInfo,
std::hash<uint64_t>, std::equal_to<uint64_t>,
RebindAlloc<std::pair<const uint64_t, PublisherInfo>>>;
PublisherMap publishers_;
};
static IntraProcessManagerStateBase::SharedPtr create_default_state() {
return std::make_shared<IntraProcessManagerState<>>();
}
}
}
#endif