Merge pull request #64 from ros2/msg_memory_strategy

Create MessageMemoryStrategy for subscribers
This commit is contained in:
Jackie Kay 2015-07-24 15:06:45 -07:00
commit e16cef54cc
5 changed files with 164 additions and 6 deletions

View file

@ -0,0 +1,53 @@
// Copyright 2015 Open Source Robotics Foundation, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#ifndef RCLCPP_RCLCPP_MSG_MEMORY_STRATEGY_HPP_
#define RCLCPP_RCLCPP_MSG_MEMORY_STRATEGY_HPP_
#include <memory>
#include <rclcpp/macros.hpp>
namespace rclcpp
{
namespace message_memory_strategy
{
template<typename MessageT>
class MessageMemoryStrategy
{
public:
RCLCPP_MAKE_SHARED_DEFINITIONS(MessageMemoryStrategy);
static SharedPtr create_default()
{
return SharedPtr(new MessageMemoryStrategy<MessageT>);
}
virtual std::shared_ptr<MessageT> borrow_message()
{
return std::shared_ptr<MessageT>(new MessageT);
}
virtual void return_message(std::shared_ptr<MessageT> & msg)
{
msg.reset();
}
};
} /* message_memory_strategy */
} /* rclcpp */
#endif /* RCLCPP_RCLCPP_MSG_MEMORY_STRATEGY_HPP_ */

View file

@ -29,6 +29,7 @@
#include <rclcpp/client.hpp> #include <rclcpp/client.hpp>
#include <rclcpp/context.hpp> #include <rclcpp/context.hpp>
#include <rclcpp/macros.hpp> #include <rclcpp/macros.hpp>
#include <rclcpp/message_memory_strategy.hpp>
#include <rclcpp/parameter.hpp> #include <rclcpp/parameter.hpp>
#include <rclcpp/publisher.hpp> #include <rclcpp/publisher.hpp>
#include <rclcpp/service.hpp> #include <rclcpp/service.hpp>
@ -120,6 +121,7 @@ public:
create_publisher(const std::string & topic_name, size_t queue_size); create_publisher(const std::string & topic_name, size_t queue_size);
/* Create and return a Subscription. */ /* Create and return a Subscription. */
template<typename MessageT> template<typename MessageT>
typename rclcpp::subscription::Subscription<MessageT>::SharedPtr typename rclcpp::subscription::Subscription<MessageT>::SharedPtr
create_subscription( create_subscription(
@ -127,7 +129,10 @@ public:
size_t queue_size, size_t queue_size,
std::function<void(const std::shared_ptr<MessageT> &)> callback, std::function<void(const std::shared_ptr<MessageT> &)> callback,
rclcpp::callback_group::CallbackGroup::SharedPtr group = nullptr, rclcpp::callback_group::CallbackGroup::SharedPtr group = nullptr,
bool ignore_local_publications = false); bool ignore_local_publications = false,
typename rclcpp::message_memory_strategy::MessageMemoryStrategy<MessageT>::SharedPtr
msg_mem_strat =
rclcpp::message_memory_strategy::MessageMemoryStrategy<MessageT>::create_default());
/* Create a timer. */ /* Create a timer. */
rclcpp::timer::WallTimer::SharedPtr rclcpp::timer::WallTimer::SharedPtr

View file

@ -121,7 +121,8 @@ Node::create_subscription(
size_t queue_size, size_t queue_size,
std::function<void(const std::shared_ptr<MessageT> &)> callback, std::function<void(const std::shared_ptr<MessageT> &)> callback,
rclcpp::callback_group::CallbackGroup::SharedPtr group, rclcpp::callback_group::CallbackGroup::SharedPtr group,
bool ignore_local_publications) bool ignore_local_publications,
typename message_memory_strategy::MessageMemoryStrategy<MessageT>::SharedPtr msg_mem_strat)
{ {
using rosidl_generator_cpp::get_message_type_support_handle; using rosidl_generator_cpp::get_message_type_support_handle;
auto type_support_handle = get_message_type_support_handle<MessageT>(); auto type_support_handle = get_message_type_support_handle<MessageT>();
@ -143,7 +144,8 @@ Node::create_subscription(
subscriber_handle, subscriber_handle,
topic_name, topic_name,
ignore_local_publications, ignore_local_publications,
callback); callback,
msg_mem_strat);
auto sub_base_ptr = std::dynamic_pointer_cast<SubscriptionBase>(sub); auto sub_base_ptr = std::dynamic_pointer_cast<SubscriptionBase>(sub);
if (group) { if (group) {
if (!group_in_node(group)) { if (!group_in_node(group)) {

View file

@ -0,0 +1,85 @@
// Copyright 2015 Open Source Robotics Foundation, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#ifndef RCLCPP_RCLCPP_MSG_POOL_MEMORY_STRATEGY_HPP_
#define RCLCPP_RCLCPP_MSG_POOL_MEMORY_STRATEGY_HPP_
#include <rclcpp/macros.hpp>
#include <rclcpp/message_memory_strategy.hpp>
namespace rclcpp
{
namespace strategies
{
namespace message_pool_memory_strategy
{
template<typename MessageT, size_t size,
typename std::enable_if<rosidl_generator_traits::has_fixed_size<MessageT>::value>::type * =
nullptr>
class MessagePoolMemoryStrategy
: public message_memory_strategy::MessageMemoryStrategy<MessageT>
{
public:
RCLCPP_MAKE_SHARED_DEFINITIONS(MessagePoolMemoryStrategy);
MessagePoolMemoryStrategy()
: next_array_index_(0)
{
for (size_t i = 0; i < size; ++i) {
pool_[i].msg_ptr_ = std::make_shared<MessageT>();
pool_[i].used = false;
}
}
std::shared_ptr<MessageT> borrow_message()
{
size_t current_index = next_array_index_;
next_array_index_ = (next_array_index_ + 1) % size;
if (pool_[current_index].used) {
throw std::runtime_error("Tried to access message that was still in use! Abort.");
}
pool_[current_index].msg_ptr_->~MessageT();
new (pool_[current_index].msg_ptr_.get())MessageT;
pool_[current_index].used = true;
return pool_[current_index].msg_ptr_;
}
void return_message(std::shared_ptr<MessageT> & msg)
{
for (size_t i = 0; i < size; ++i) {
if (pool_[i].msg_ptr_ == msg) {
pool_[i].used = false;
return;
}
}
throw std::runtime_error("Unrecognized message ptr in return_message.");
}
protected:
struct PoolMember
{
std::shared_ptr<MessageT> msg_ptr_;
bool used;
};
std::array<PoolMember, size> pool_;
size_t next_array_index_;
};
} /* message_pool_memory_strategy */
} /* strategies */
} /* rclcpp */
#endif /* RCLCPP_RCLCPP_MSG_POOL_MEMORY_STRATEGY_HPP_ */

View file

@ -25,6 +25,7 @@
#include <rmw/rmw.h> #include <rmw/rmw.h>
#include <rclcpp/macros.hpp> #include <rclcpp/macros.hpp>
#include <rclcpp/message_memory_strategy.hpp>
namespace rclcpp namespace rclcpp
{ {
@ -102,26 +103,38 @@ public:
rmw_subscription_t * subscription_handle, rmw_subscription_t * subscription_handle,
const std::string & topic_name, const std::string & topic_name,
bool ignore_local_publications, bool ignore_local_publications,
CallbackType callback) CallbackType callback,
typename message_memory_strategy::MessageMemoryStrategy<MessageT>::SharedPtr memory_strategy =
message_memory_strategy::MessageMemoryStrategy<MessageT>::create_default())
: SubscriptionBase(node_handle, subscription_handle, topic_name, ignore_local_publications), : SubscriptionBase(node_handle, subscription_handle, topic_name, ignore_local_publications),
callback_(callback) callback_(callback),
message_memory_strategy_(memory_strategy)
{} {}
void set_message_memory_strategy(
typename message_memory_strategy::MessageMemoryStrategy<MessageT>::SharedPtr message_memory_strategy)
{
message_memory_strategy_ = message_memory_strategy;
}
std::shared_ptr<void> create_message() std::shared_ptr<void> create_message()
{ {
return std::shared_ptr<void>(new MessageT()); return message_memory_strategy_->borrow_message();
} }
void handle_message(std::shared_ptr<void> & message) void handle_message(std::shared_ptr<void> & message)
{ {
auto typed_message = std::static_pointer_cast<MessageT>(message); auto typed_message = std::static_pointer_cast<MessageT>(message);
callback_(typed_message); callback_(typed_message);
message_memory_strategy_->return_message(typed_message);
} }
private: private:
RCLCPP_DISABLE_COPY(Subscription); RCLCPP_DISABLE_COPY(Subscription);
CallbackType callback_; CallbackType callback_;
typename message_memory_strategy::MessageMemoryStrategy<MessageT>::SharedPtr
message_memory_strategy_;
}; };