clean up publisher and subscription creation logic (#867)

* streamline creation of publishers after removing deprecated API

Signed-off-by: William Woodall <william@osrfoundation.org>

* use deduced template arguments to cleanup create_subscription

Signed-off-by: William Woodall <william@osrfoundation.org>

* add missing file

Signed-off-by: William Woodall <william@osrfoundation.org>

* streamline creation of subscriptions after removing deprecated API

Signed-off-by: William Woodall <william@osrfoundation.org>

* small subscription code cleanup to match publisher's style

Signed-off-by: William Woodall <william@osrfoundation.org>

* some fixes to rclcpp_lifecycle to match rclcpp

Signed-off-by: William Woodall <william@osrfoundation.org>

* add README to the rclcpp/detail include directory

Signed-off-by: William Woodall <william@osrfoundation.org>

* fixup SubscriptionBase's use of visibility macros

Signed-off-by: William Woodall <william@osrfoundation.org>

* reapply function to create default options, as it is still needed on Windows

Signed-off-by: William Woodall <william@osrfoundation.org>

* address review comments

Signed-off-by: William Woodall <william@osrfoundation.org>

* workaround cppcheck 1.89 syntax error

Signed-off-by: William Woodall <william@osrfoundation.org>
This commit is contained in:
William Woodall 2019-09-25 16:22:06 -07:00 committed by GitHub
parent 7728d8a38c
commit 81049c42c0
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
22 changed files with 381 additions and 297 deletions

View file

@ -21,9 +21,9 @@
#include <vector>
#include "rclcpp/client.hpp"
#include "rclcpp/publisher.hpp"
#include "rclcpp/publisher_base.hpp"
#include "rclcpp/service.hpp"
#include "rclcpp/subscription.hpp"
#include "rclcpp/subscription_base.hpp"
#include "rclcpp/timer.hpp"
#include "rclcpp/visibility_control.hpp"
#include "rclcpp/waitable.hpp"

View file

@ -37,7 +37,7 @@ namespace rclcpp
template<
typename MessageT,
typename AllocatorT = std::allocator<void>,
typename PublisherT = ::rclcpp::Publisher<MessageT, AllocatorT>,
typename PublisherT = rclcpp::Publisher<MessageT, AllocatorT>,
typename NodeT>
std::shared_ptr<PublisherT>
create_publisher(
@ -46,43 +46,23 @@ create_publisher(
const rclcpp::QoS & qos,
const rclcpp::PublisherOptionsWithAllocator<AllocatorT> & options = (
rclcpp::PublisherOptionsWithAllocator<AllocatorT>()
))
)
)
{
// Extract the NodeTopicsInterface from the NodeT.
using rclcpp::node_interfaces::get_node_topics_interface;
auto node_topics = get_node_topics_interface(node);
std::shared_ptr<AllocatorT> allocator = options.allocator;
if (!allocator) {
allocator = std::make_shared<AllocatorT>();
}
bool use_intra_process;
switch (options.use_intra_process_comm) {
case IntraProcessSetting::Enable:
use_intra_process = true;
break;
case IntraProcessSetting::Disable:
use_intra_process = false;
break;
case IntraProcessSetting::NodeDefault:
use_intra_process = node_topics->get_node_base_interface()->get_use_intra_process_default();
break;
default:
throw std::runtime_error("Unrecognized IntraProcessSetting value");
break;
}
// TODO(wjwwood): convert all of the interfaces to use QoS and PublisherOptionsBase
// Create the publisher.
auto pub = node_topics->create_publisher(
topic_name,
rclcpp::create_publisher_factory<MessageT, AllocatorT, PublisherT>(
options.event_callbacks,
allocator
),
options.template to_rcl_publisher_options<MessageT>(qos),
use_intra_process
rclcpp::create_publisher_factory<MessageT, AllocatorT, PublisherT>(options),
qos
);
// Add the publisher to the node topics interface.
node_topics->add_publisher(pub, options.callback_group);
return std::dynamic_pointer_cast<PublisherT>(pub);
}

View file

@ -42,6 +42,10 @@ template<
typename CallbackMessageT =
typename rclcpp::subscription_traits::has_message_type<CallbackT>::type,
typename SubscriptionT = rclcpp::Subscription<CallbackMessageT, AllocatorT>,
typename MessageMemoryStrategyT = rclcpp::message_memory_strategy::MessageMemoryStrategy<
CallbackMessageT,
AllocatorT
>,
typename NodeT>
typename std::shared_ptr<SubscriptionT>
create_subscription(
@ -52,48 +56,21 @@ create_subscription(
const rclcpp::SubscriptionOptionsWithAllocator<AllocatorT> & options = (
rclcpp::SubscriptionOptionsWithAllocator<AllocatorT>()
),
typename rclcpp::message_memory_strategy::MessageMemoryStrategy<
CallbackMessageT, AllocatorT>::SharedPtr
msg_mem_strat = nullptr)
typename MessageMemoryStrategyT::SharedPtr msg_mem_strat = (
MessageMemoryStrategyT::create_default()
)
)
{
using rclcpp::node_interfaces::get_node_topics_interface;
auto node_topics = get_node_topics_interface(std::forward<NodeT>(node));
if (!msg_mem_strat) {
using rclcpp::message_memory_strategy::MessageMemoryStrategy;
msg_mem_strat = MessageMemoryStrategy<CallbackMessageT, AllocatorT>::create_default();
}
auto factory = rclcpp::create_subscription_factory<MessageT>(
std::forward<CallbackT>(callback),
options,
msg_mem_strat
);
std::shared_ptr<AllocatorT> allocator = options.allocator;
if (!allocator) {
allocator = std::make_shared<AllocatorT>();
}
auto factory = rclcpp::create_subscription_factory
<MessageT, CallbackT, AllocatorT, CallbackMessageT, SubscriptionT>(
std::forward<CallbackT>(callback), options.event_callbacks, msg_mem_strat, allocator);
bool use_intra_process;
switch (options.use_intra_process_comm) {
case IntraProcessSetting::Enable:
use_intra_process = true;
break;
case IntraProcessSetting::Disable:
use_intra_process = false;
break;
case IntraProcessSetting::NodeDefault:
use_intra_process = node_topics->get_node_base_interface()->get_use_intra_process_default();
break;
default:
throw std::runtime_error("Unrecognized IntraProcessSetting value");
break;
}
// TODO(wjwwood): convert all of the interfaces to use QoS and SubscriptionOptionsBase
auto sub = node_topics->create_subscription(
topic_name,
factory,
options.template to_rcl_subscription_options<MessageT>(qos),
use_intra_process);
auto sub = node_topics->create_subscription(topic_name, factory, qos);
node_topics->add_subscription(sub, options.callback_group);
return std::dynamic_pointer_cast<SubscriptionT>(sub);
}

View file

@ -0,0 +1,3 @@
Notice that headers in this folder should only provide symbols in the rclcpp::detail namespace.
Also that these headers are not considered part of the public API and are subject to change without notice.

View file

@ -0,0 +1,55 @@
// Copyright 2019 Open Source Robotics Foundation, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#ifndef RCLCPP__DETAIL__RESOLVE_USE_INTRA_PROCESS_HPP_
#define RCLCPP__DETAIL__RESOLVE_USE_INTRA_PROCESS_HPP_
#include <stdexcept>
#include "rclcpp/intra_process_setting.hpp"
namespace rclcpp
{
namespace detail
{
template<typename OptionsT, typename NodeBaseT>
bool
resolve_use_intra_process(const OptionsT & options, const NodeBaseT & node_base)
{
bool use_intra_process;
switch (options.use_intra_process_comm) {
case IntraProcessSetting::Enable:
use_intra_process = true;
break;
case IntraProcessSetting::Disable:
use_intra_process = false;
break;
case IntraProcessSetting::NodeDefault:
use_intra_process = node_base.get_use_intra_process_default();
break;
default:
throw std::runtime_error("Unrecognized IntraProcessSetting value");
break;
}
return use_intra_process;
}
} // namespace detail
} // namespace rclcpp
#endif // RCLCPP__DETAIL__RESOLVE_USE_INTRA_PROCESS_HPP_

View file

@ -192,16 +192,18 @@ public:
* \param[in] msg_mem_strat The message memory strategy to use for allocating messages.
* \return Shared pointer to the created subscription.
*/
/* TODO(jacquelinekay):
Windows build breaks when static member function passed as default
argument to msg_mem_strat, nullptr is a workaround.
*/
template<
typename MessageT,
typename CallbackT,
typename AllocatorT = std::allocator<void>,
typename SubscriptionT = rclcpp::Subscription<
typename rclcpp::subscription_traits::has_message_type<CallbackT>::type, AllocatorT>>
typename CallbackMessageT =
typename rclcpp::subscription_traits::has_message_type<CallbackT>::type,
typename SubscriptionT = rclcpp::Subscription<CallbackMessageT, AllocatorT>,
typename MessageMemoryStrategyT = rclcpp::message_memory_strategy::MessageMemoryStrategy<
CallbackMessageT,
AllocatorT
>
>
std::shared_ptr<SubscriptionT>
create_subscription(
const std::string & topic_name,
@ -209,10 +211,10 @@ public:
CallbackT && callback,
const SubscriptionOptionsWithAllocator<AllocatorT> & options =
SubscriptionOptionsWithAllocator<AllocatorT>(),
typename rclcpp::message_memory_strategy::MessageMemoryStrategy<
typename rclcpp::subscription_traits::has_message_type<CallbackT>::type, AllocatorT
>::SharedPtr
msg_mem_strat = nullptr);
typename MessageMemoryStrategyT::SharedPtr msg_mem_strat = (
MessageMemoryStrategyT::create_default()
)
);
/// Create a timer.
/**

View file

@ -83,16 +83,16 @@ template<
typename MessageT,
typename CallbackT,
typename AllocatorT,
typename SubscriptionT>
typename CallbackMessageT,
typename SubscriptionT,
typename MessageMemoryStrategyT>
std::shared_ptr<SubscriptionT>
Node::create_subscription(
const std::string & topic_name,
const rclcpp::QoS & qos,
CallbackT && callback,
const SubscriptionOptionsWithAllocator<AllocatorT> & options,
typename rclcpp::message_memory_strategy::MessageMemoryStrategy<
typename rclcpp::subscription_traits::has_message_type<CallbackT>::type, AllocatorT>::SharedPtr
msg_mem_strat)
typename MessageMemoryStrategyT::SharedPtr msg_mem_strat)
{
return rclcpp::create_subscription<MessageT>(
*this,

View file

@ -49,8 +49,7 @@ public:
create_publisher(
const std::string & topic_name,
const rclcpp::PublisherFactory & publisher_factory,
const rcl_publisher_options_t & publisher_options,
bool use_intra_process) override;
const rclcpp::QoS & qos) override;
RCLCPP_PUBLIC
void
@ -63,8 +62,7 @@ public:
create_subscription(
const std::string & topic_name,
const rclcpp::SubscriptionFactory & subscription_factory,
const rcl_subscription_options_t & subscription_options,
bool use_intra_process) override;
const rclcpp::QoS & qos) override;
RCLCPP_PUBLIC
void

View file

@ -50,8 +50,7 @@ public:
create_publisher(
const std::string & topic_name,
const rclcpp::PublisherFactory & publisher_factory,
const rcl_publisher_options_t & publisher_options,
bool use_intra_process) = 0;
const rclcpp::QoS & qos) = 0;
RCLCPP_PUBLIC
virtual
@ -66,8 +65,7 @@ public:
create_subscription(
const std::string & topic_name,
const rclcpp::SubscriptionFactory & subscription_factory,
const rcl_subscription_options_t & subscription_options,
bool use_intra_process) = 0;
const rclcpp::QoS & qos) = 0;
RCLCPP_PUBLIC
virtual

View file

@ -32,9 +32,12 @@
#include "rclcpp/allocator/allocator_common.hpp"
#include "rclcpp/allocator/allocator_deleter.hpp"
#include "rclcpp/detail/resolve_use_intra_process.hpp"
#include "rclcpp/intra_process_manager.hpp"
#include "rclcpp/macros.hpp"
#include "rclcpp/node_interfaces/node_base_interface.hpp"
#include "rclcpp/publisher_base.hpp"
#include "rclcpp/publisher_options.hpp"
#include "rclcpp/type_support_decl.hpp"
#include "rclcpp/visibility_control.hpp"
@ -42,40 +45,78 @@ namespace rclcpp
{
/// A publisher publishes messages of any type to a topic.
template<typename MessageT, typename Alloc = std::allocator<void>>
template<typename MessageT, typename AllocatorT = std::allocator<void>>
class Publisher : public PublisherBase
{
public:
using MessageAllocTraits = allocator::AllocRebind<MessageT, Alloc>;
using MessageAlloc = typename MessageAllocTraits::allocator_type;
using MessageDeleter = allocator::Deleter<MessageAlloc, MessageT>;
using MessageAllocatorTraits = allocator::AllocRebind<MessageT, AllocatorT>;
using MessageAllocator = typename MessageAllocatorTraits::allocator_type;
using MessageDeleter = allocator::Deleter<MessageAllocator, MessageT>;
using MessageUniquePtr = std::unique_ptr<MessageT, MessageDeleter>;
using MessageSharedPtr = std::shared_ptr<const MessageT>;
RCLCPP_SMART_PTR_DEFINITIONS(Publisher<MessageT, Alloc>)
RCLCPP_SMART_PTR_DEFINITIONS(Publisher<MessageT, AllocatorT>)
Publisher(
rclcpp::node_interfaces::NodeBaseInterface * node_base,
const std::string & topic,
const rcl_publisher_options_t & publisher_options,
const PublisherEventCallbacks & event_callbacks,
const std::shared_ptr<MessageAlloc> & allocator)
const rclcpp::QoS & qos,
const rclcpp::PublisherOptionsWithAllocator<AllocatorT> & options)
: PublisherBase(
node_base,
topic,
*rosidl_typesupport_cpp::get_message_type_support_handle<MessageT>(),
publisher_options),
message_allocator_(allocator)
options.template to_rcl_publisher_options<MessageT>(qos)),
options_(options),
message_allocator_(new MessageAllocator(*options.get_allocator().get()))
{
allocator::set_allocator_for_deleter(&message_deleter_, message_allocator_.get());
if (event_callbacks.deadline_callback) {
if (options_.event_callbacks.deadline_callback) {
this->add_event_handler(
event_callbacks.deadline_callback,
options_.event_callbacks.deadline_callback,
RCL_PUBLISHER_OFFERED_DEADLINE_MISSED);
}
if (event_callbacks.liveliness_callback) {
this->add_event_handler(event_callbacks.liveliness_callback, RCL_PUBLISHER_LIVELINESS_LOST);
if (options_.event_callbacks.liveliness_callback) {
this->add_event_handler(
options_.event_callbacks.liveliness_callback,
RCL_PUBLISHER_LIVELINESS_LOST);
}
// Setup continues in the post construction method, post_init_setup().
}
/// Called post construction, so that construction may continue after shared_from_this() works.
virtual
void
post_init_setup(
rclcpp::node_interfaces::NodeBaseInterface * node_base,
const std::string & topic,
const rclcpp::QoS & qos,
const rclcpp::PublisherOptionsWithAllocator<AllocatorT> & options)
{
// Topic is unused for now.
(void)topic;
// If needed, setup intra process communication.
if (rclcpp::detail::resolve_use_intra_process(options_, *node_base)) {
auto context = node_base->get_context();
// Get the intra process manager instance for this context.
auto ipm = context->get_sub_context<rclcpp::intra_process_manager::IntraProcessManager>();
// Register the publisher with the intra process manager.
if (qos.get_rmw_qos_profile().history == RMW_QOS_POLICY_HISTORY_KEEP_ALL) {
throw std::invalid_argument(
"intraprocess communication is not allowed with keep all history qos policy");
}
if (qos.get_rmw_qos_profile().depth == 0) {
throw std::invalid_argument(
"intraprocess communication is not allowed with a zero qos history depth value");
}
uint64_t intra_process_publisher_id = ipm->add_publisher(this->shared_from_this());
this->setup_intra_process(
intra_process_publisher_id,
ipm,
options.template to_rcl_publisher_options<MessageT>(qos));
}
}
@ -87,7 +128,7 @@ public:
{
return mapped_ring_buffer::MappedRingBuffer<
MessageT,
typename Publisher<MessageT, Alloc>::MessageAlloc
typename Publisher<MessageT, AllocatorT>::MessageAllocator
>::make_shared(size, this->get_allocator());
}
@ -138,8 +179,8 @@ public:
// Otherwise we have to allocate memory in a unique_ptr and pass it along.
// As the message is not const, a copy should be made.
// A shared_ptr<const MessageT> could also be constructed here.
auto ptr = MessageAllocTraits::allocate(*message_allocator_.get(), 1);
MessageAllocTraits::construct(*message_allocator_.get(), ptr, msg);
auto ptr = MessageAllocatorTraits::allocate(*message_allocator_.get(), 1);
MessageAllocatorTraits::construct(*message_allocator_.get(), ptr, msg);
MessageUniquePtr unique_msg(ptr, message_deleter_);
this->publish(std::move(unique_msg));
}
@ -150,7 +191,8 @@ public:
return this->do_serialized_publish(&serialized_msg);
}
std::shared_ptr<MessageAlloc> get_allocator() const
std::shared_ptr<MessageAllocator>
get_allocator() const
{
return message_allocator_;
}
@ -224,7 +266,7 @@ protected:
throw std::runtime_error("cannot publisher msg which is a null pointer");
}
uint64_t message_seq =
ipm->template store_intra_process_message<MessageT, Alloc>(publisher_id, msg);
ipm->template store_intra_process_message<MessageT, AllocatorT>(publisher_id, msg);
return message_seq;
}
@ -242,11 +284,13 @@ protected:
throw std::runtime_error("cannot publisher msg which is a null pointer");
}
uint64_t message_seq =
ipm->template store_intra_process_message<MessageT, Alloc>(publisher_id, std::move(msg));
ipm->template store_intra_process_message<MessageT, AllocatorT>(publisher_id, std::move(msg));
return message_seq;
}
std::shared_ptr<MessageAlloc> message_allocator_;
const rclcpp::PublisherOptionsWithAllocator<AllocatorT> options_;
std::shared_ptr<MessageAllocator> message_allocator_;
MessageDeleter message_deleter_;
};

View file

@ -52,7 +52,7 @@ namespace intra_process_manager
class IntraProcessManager;
}
class PublisherBase
class PublisherBase : public std::enable_shared_from_this<PublisherBase>
{
friend ::rclcpp::node_interfaces::NodeTopicsInterface;
@ -184,8 +184,9 @@ public:
/// Implementation utility function that creates a typed mapped ring buffer.
RCLCPP_PUBLIC
virtual
mapped_ring_buffer::MappedRingBufferBase::SharedPtr
virtual make_mapped_ring_buffer(size_t size) const;
make_mapped_ring_buffer(size_t size) const;
/// Implementation utility function used to setup intra process publishing after creation.
RCLCPP_PUBLIC

View file

@ -24,6 +24,7 @@
#include "rosidl_typesupport_cpp/message_type_support.hpp"
#include "rclcpp/publisher.hpp"
#include "rclcpp/publisher_options.hpp"
#include "rclcpp/intra_process_manager.hpp"
#include "rclcpp/node_interfaces/node_base_interface.hpp"
#include "rclcpp/visibility_control.hpp"
@ -41,6 +42,9 @@ namespace rclcpp
* called from a templated "create_publisher" method on the Node class, and
* is passed to the non-templated "create_publisher" method on the NodeTopics
* class where it is used to create and setup the Publisher.
*
* It also handles the two step construction of Publishers, first calling
* the constructor and then the post_init_setup() method.
*/
struct PublisherFactory
{
@ -49,38 +53,32 @@ struct PublisherFactory
rclcpp::PublisherBase::SharedPtr(
rclcpp::node_interfaces::NodeBaseInterface * node_base,
const std::string & topic_name,
const rcl_publisher_options_t & publisher_options)>;
const rclcpp::QoS & qos
)>;
PublisherFactoryFunction create_typed_publisher;
const PublisherFactoryFunction create_typed_publisher;
};
/// Return a PublisherFactory with functions setup for creating a PublisherT<MessageT, Alloc>.
template<typename MessageT, typename Alloc, typename PublisherT>
/// Return a PublisherFactory with functions setup for creating a PublisherT<MessageT, AllocatorT>.
template<typename MessageT, typename AllocatorT, typename PublisherT>
PublisherFactory
create_publisher_factory(
const PublisherEventCallbacks & event_callbacks,
std::shared_ptr<Alloc> allocator)
create_publisher_factory(const rclcpp::PublisherOptionsWithAllocator<AllocatorT> & options)
{
PublisherFactory factory;
PublisherFactory factory {
// factory function that creates a MessageT specific PublisherT
factory.create_typed_publisher =
[event_callbacks, allocator](
[options](
rclcpp::node_interfaces::NodeBaseInterface * node_base,
const std::string & topic_name,
const rcl_publisher_options_t & publisher_options
const rclcpp::QoS & qos
) -> std::shared_ptr<PublisherT>
{
auto options_copy = publisher_options;
auto message_alloc = std::make_shared<typename PublisherT::MessageAlloc>(*allocator.get());
options_copy.allocator = allocator::get_rcl_allocator<MessageT>(*message_alloc.get());
return std::make_shared<PublisherT>(
node_base,
topic_name,
options_copy,
event_callbacks,
message_alloc);
auto publisher = std::make_shared<PublisherT>(node_base, topic_name, qos, options);
// This is used for setting up things like intra process comms which
// require this->shared_from_this() which cannot be called from
// the constructor.
publisher->post_init_setup(node_base, topic_name, qos, options);
return publisher;
}
};
// return the factory now that it is populated

View file

@ -19,16 +19,21 @@
#include <string>
#include <vector>
#include "rclcpp/callback_group.hpp"
#include "rcl/publisher.h"
#include "rclcpp/allocator/allocator_common.hpp"
#include "rclcpp/intra_process_setting.hpp"
#include "rclcpp/qos.hpp"
#include "rclcpp/qos_event.hpp"
#include "rclcpp/visibility_control.hpp"
#include "rcl/publisher.h"
namespace rclcpp
{
namespace callback_group
{
class CallbackGroup;
} // namespace callback_group
/// Non-templated part of PublisherOptionsWithAllocator<Allocator>.
struct PublisherOptionsBase
{
@ -39,7 +44,7 @@ struct PublisherOptionsBase
PublisherEventCallbacks event_callbacks;
/// Callback group in which the waitable items from the publisher should be placed.
rclcpp::callback_group::CallbackGroup::SharedPtr callback_group;
std::shared_ptr<rclcpp::callback_group::CallbackGroup> callback_group;
};
/// Structure containing optional configuration for Publishers.
@ -64,11 +69,26 @@ struct PublisherOptionsWithAllocator : public PublisherOptionsBase
rcl_publisher_options_t result;
using AllocatorTraits = std::allocator_traits<Allocator>;
using MessageAllocatorT = typename AllocatorTraits::template rebind_alloc<MessageT>;
auto message_alloc = std::make_shared<MessageAllocatorT>(*allocator.get());
result.allocator = allocator::get_rcl_allocator<MessageT>(*message_alloc);
auto message_alloc = std::make_shared<MessageAllocatorT>(*this->get_allocator().get());
result.allocator = rclcpp::allocator::get_rcl_allocator<MessageT>(*message_alloc);
result.qos = qos.get_rmw_qos_profile();
return result;
}
/// Get the allocator, creating one if needed.
std::shared_ptr<Allocator>
get_allocator() const
{
if (!this->allocator) {
// TODO(wjwwood): I would like to use the commented line instead, but
// cppcheck 1.89 fails with:
// Syntax Error: AST broken, binary operator '>' doesn't have two operands.
// return std::make_shared<Allocator>();
std::shared_ptr<Allocator> tmp(new Allocator());
return tmp;
}
return this->allocator;
}
};
using PublisherOptions = PublisherOptionsWithAllocator<std::allocator<void>>;

View file

@ -32,13 +32,16 @@
#include "rcl_interfaces/msg/intra_process_message.hpp"
#include "rclcpp/any_subscription_callback.hpp"
#include "rclcpp/detail/resolve_use_intra_process.hpp"
#include "rclcpp/exceptions.hpp"
#include "rclcpp/expand_topic_or_service_name.hpp"
#include "rclcpp/intra_process_manager.hpp"
#include "rclcpp/logging.hpp"
#include "rclcpp/macros.hpp"
#include "rclcpp/message_memory_strategy.hpp"
#include "rclcpp/node_interfaces/node_base_interface.hpp"
#include "rclcpp/subscription_base.hpp"
#include "rclcpp/subscription_options.hpp"
#include "rclcpp/subscription_traits.hpp"
#include "rclcpp/type_support_decl.hpp"
#include "rclcpp/visibility_control.hpp"
@ -55,15 +58,19 @@ class NodeTopicsInterface;
/// Subscription implementation, templated on the type of message this subscription receives.
template<
typename CallbackMessageT,
typename Alloc = std::allocator<void>>
typename AllocatorT = std::allocator<void>,
typename MessageMemoryStrategyT = rclcpp::message_memory_strategy::MessageMemoryStrategy<
CallbackMessageT,
AllocatorT
>>
class Subscription : public SubscriptionBase
{
friend class rclcpp::node_interfaces::NodeTopicsInterface;
public:
using MessageAllocTraits = allocator::AllocRebind<CallbackMessageT, Alloc>;
using MessageAlloc = typename MessageAllocTraits::allocator_type;
using MessageDeleter = allocator::Deleter<MessageAlloc, CallbackMessageT>;
using MessageAllocatorTraits = allocator::AllocRebind<CallbackMessageT, AllocatorT>;
using MessageAllocator = typename MessageAllocatorTraits::allocator_type;
using MessageDeleter = allocator::Deleter<MessageAllocator, CallbackMessageT>;
using ConstMessageSharedPtr = std::shared_ptr<const CallbackMessageT>;
using MessageUniquePtr = std::unique_ptr<CallbackMessageT, MessageDeleter>;
@ -71,46 +78,66 @@ public:
/// Default constructor.
/**
* The constructor for a subscription is almost never called directly. Instead, subscriptions
* should be instantiated through Node::create_subscription.
* \param[in] node_handle rcl representation of the node that owns this subscription.
* The constructor for a subscription is almost never called directly.
* Instead, subscriptions should be instantiated through the function
* rclcpp::create_subscription().
*
* \param[in] node_base NodeBaseInterface pointer that is used in part of the setup.
* \param[in] type_support_handle rosidl type support struct, for the Message type of the topic.
* \param[in] topic_name Name of the topic to subscribe to.
* \param[in] subscription_options options for the subscription.
* \param[in] callback User defined callback to call when a message is received.
* \param[in] memory_strategy The memory strategy to be used for managing message memory.
* \param[in] options options for the subscription.
* \param[in] message_memory_strategy The memory strategy to be used for managing message memory.
*/
Subscription(
std::shared_ptr<rcl_node_t> node_handle,
rclcpp::node_interfaces::NodeBaseInterface * node_base,
const rosidl_message_type_support_t & type_support_handle,
const std::string & topic_name,
const rcl_subscription_options_t & subscription_options,
AnySubscriptionCallback<CallbackMessageT, Alloc> callback,
const SubscriptionEventCallbacks & event_callbacks,
typename message_memory_strategy::MessageMemoryStrategy<CallbackMessageT, Alloc>::SharedPtr
memory_strategy = message_memory_strategy::MessageMemoryStrategy<CallbackMessageT,
Alloc>::create_default())
const rclcpp::QoS & qos,
AnySubscriptionCallback<CallbackMessageT, AllocatorT> callback,
const rclcpp::SubscriptionOptionsWithAllocator<AllocatorT> & options,
typename MessageMemoryStrategyT::SharedPtr message_memory_strategy)
: SubscriptionBase(
node_handle,
node_base,
type_support_handle,
topic_name,
subscription_options,
options.template to_rcl_subscription_options<CallbackMessageT>(qos),
rclcpp::subscription_traits::is_serialized_subscription_argument<CallbackMessageT>::value),
any_callback_(callback),
message_memory_strategy_(memory_strategy)
message_memory_strategy_(message_memory_strategy)
{
if (event_callbacks.deadline_callback) {
if (options.event_callbacks.deadline_callback) {
this->add_event_handler(
event_callbacks.deadline_callback,
options.event_callbacks.deadline_callback,
RCL_SUBSCRIPTION_REQUESTED_DEADLINE_MISSED);
}
if (event_callbacks.liveliness_callback) {
if (options.event_callbacks.liveliness_callback) {
this->add_event_handler(
event_callbacks.liveliness_callback,
options.event_callbacks.liveliness_callback,
RCL_SUBSCRIPTION_LIVELINESS_CHANGED);
}
}
/// Called after construction to continue setup that requires shared_from_this().
void
post_init_setup(
rclcpp::node_interfaces::NodeBaseInterface * node_base,
const rclcpp::QoS & qos,
const rclcpp::SubscriptionOptionsWithAllocator<AllocatorT> & options)
{
// Setup intra process publishing if requested.
if (rclcpp::detail::resolve_use_intra_process(options, *node_base)) {
auto context = node_base->get_context();
using rclcpp::intra_process_manager::IntraProcessManager;
auto ipm = context->get_sub_context<IntraProcessManager>();
uint64_t intra_process_subscription_id = ipm->add_subscription(this->shared_from_this());
this->setup_intra_process(
intra_process_subscription_id,
ipm,
options.template to_rcl_subscription_options<CallbackMessageT>(qos));
}
}
/// Support dynamically setting the message memory strategy.
/**
* Behavior may be undefined if called while the subscription could be executing.
@ -118,7 +145,7 @@ public:
*/
void set_message_memory_strategy(
typename message_memory_strategy::MessageMemoryStrategy<CallbackMessageT,
Alloc>::SharedPtr message_memory_strategy)
AllocatorT>::SharedPtr message_memory_strategy)
{
message_memory_strategy_ = message_memory_strategy;
}
@ -237,7 +264,7 @@ private:
throw std::runtime_error(
"intra process take called after destruction of intra process manager");
}
ipm->template take_intra_process_message<CallbackMessageT, Alloc>(
ipm->template take_intra_process_message<CallbackMessageT, AllocatorT>(
publisher_id, message_sequence, subscription_id, message);
}
@ -253,7 +280,7 @@ private:
throw std::runtime_error(
"intra process take called after destruction of intra process manager");
}
ipm->template take_intra_process_message<CallbackMessageT, Alloc>(
ipm->template take_intra_process_message<CallbackMessageT, AllocatorT>(
publisher_id, message_sequence, subscription_id, message);
}
@ -274,8 +301,8 @@ private:
RCLCPP_DISABLE_COPY(Subscription)
AnySubscriptionCallback<CallbackMessageT, Alloc> any_callback_;
typename message_memory_strategy::MessageMemoryStrategy<CallbackMessageT, Alloc>::SharedPtr
AnySubscriptionCallback<CallbackMessageT, AllocatorT> any_callback_;
typename message_memory_strategy::MessageMemoryStrategy<CallbackMessageT, AllocatorT>::SharedPtr
message_memory_strategy_;
};

View file

@ -36,7 +36,7 @@ namespace rclcpp
namespace node_interfaces
{
class NodeTopicsInterface;
class NodeBaseInterface;
} // namespace node_interfaces
namespace intra_process_manager
@ -50,14 +50,14 @@ class IntraProcessManager;
/// Virtual base class for subscriptions. This pattern allows us to iterate over different template
/// specializations of Subscription, among other things.
class SubscriptionBase
class SubscriptionBase : public std::enable_shared_from_this<SubscriptionBase>
{
public:
RCLCPP_SMART_PTR_DEFINITIONS_NOT_COPYABLE(SubscriptionBase)
/// Default constructor.
/**
* \param[in] node_handle The rcl representation of the node that owns this subscription.
* \param[in] node_base NodeBaseInterface pointer used in parts of the setup.
* \param[in] type_support_handle rosidl type support struct, for the Message type of the topic.
* \param[in] topic_name Name of the topic to subscribe to.
* \param[in] subscription_options options for the subscription.
@ -65,7 +65,7 @@ public:
*/
RCLCPP_PUBLIC
SubscriptionBase(
std::shared_ptr<rcl_node_t> node_handle,
rclcpp::node_interfaces::NodeBaseInterface * node_base,
const rosidl_message_type_support_t & type_support_handle,
const std::string & topic_name,
const rcl_subscription_options_t & subscription_options,
@ -114,12 +114,16 @@ public:
/// Borrow a new message.
/** \return Shared pointer to the fresh message. */
virtual std::shared_ptr<void>
RCLCPP_PUBLIC
virtual
std::shared_ptr<void>
create_message() = 0;
/// Borrow a new serialized message
/** \return Shared pointer to a rcl_message_serialized_t. */
virtual std::shared_ptr<rcl_serialized_message_t>
RCLCPP_PUBLIC
virtual
std::shared_ptr<rcl_serialized_message_t>
create_serialized_message() = 0;
/// Check if we need to handle the message, and execute the callback if we do.
@ -127,27 +131,37 @@ public:
* \param[in] message Shared pointer to the message to handle.
* \param[in] message_info Metadata associated with this message.
*/
virtual void
RCLCPP_PUBLIC
virtual
void
handle_message(std::shared_ptr<void> & message, const rmw_message_info_t & message_info) = 0;
/// Return the message borrowed in create_message.
/** \param[in] message Shared pointer to the returned message. */
virtual void
RCLCPP_PUBLIC
virtual
void
return_message(std::shared_ptr<void> & message) = 0;
/// Return the message borrowed in create_serialized_message.
/** \param[in] message Shared pointer to the returned message. */
virtual void
RCLCPP_PUBLIC
virtual
void
return_serialized_message(std::shared_ptr<rcl_serialized_message_t> & message) = 0;
virtual void
RCLCPP_PUBLIC
virtual
void
handle_intra_process_message(
rcl_interfaces::msg::IntraProcessMessage & ipm,
const rmw_message_info_t & message_info) = 0;
RCLCPP_PUBLIC
const rosidl_message_type_support_t &
get_message_type_support_handle() const;
RCLCPP_PUBLIC
bool
is_serialized() const;
@ -161,7 +175,9 @@ public:
std::weak_ptr<rclcpp::intra_process_manager::IntraProcessManager>;
/// Implemenation detail.
void setup_intra_process(
RCLCPP_PUBLIC
void
setup_intra_process(
uint64_t intra_process_subscription_id,
IntraProcessManagerWeakPtr weak_ipm,
const rcl_subscription_options_t & intra_process_options);

View file

@ -43,6 +43,9 @@ namespace rclcpp
* usually called from a templated "create_subscription" method of the Node
* class, and is passed to the non-templated "create_subscription" method of
* the NodeTopics class where it is used to create and setup the Subscription.
*
* It also handles the two step construction of Subscriptions, first calling
* the constructor and then the post_init_setup() method.
*/
struct SubscriptionFactory
{
@ -51,70 +54,61 @@ struct SubscriptionFactory
rclcpp::SubscriptionBase::SharedPtr(
rclcpp::node_interfaces::NodeBaseInterface * node_base,
const std::string & topic_name,
const rcl_subscription_options_t & subscription_options)>;
const rclcpp::QoS & qos)>;
SubscriptionFactoryFunction create_typed_subscription;
// Function that takes a MessageT from the intra process manager
using SetupIntraProcessFunction = std::function<
void (
rclcpp::intra_process_manager::IntraProcessManager::SharedPtr ipm,
rclcpp::SubscriptionBase::SharedPtr subscription,
const rcl_subscription_options_t & subscription_options)>;
SetupIntraProcessFunction setup_intra_process;
const SubscriptionFactoryFunction create_typed_subscription;
};
/// Return a SubscriptionFactory with functions for creating a SubscriptionT<MessageT, Alloc>.
template<
typename MessageT,
typename CallbackT,
typename Alloc,
typename CallbackMessageT,
typename SubscriptionT>
typename AllocatorT,
typename CallbackMessageT =
typename rclcpp::subscription_traits::has_message_type<CallbackT>::type,
typename SubscriptionT = rclcpp::Subscription<CallbackMessageT, AllocatorT>,
typename MessageMemoryStrategyT = rclcpp::message_memory_strategy::MessageMemoryStrategy<
CallbackMessageT,
AllocatorT
>>
SubscriptionFactory
create_subscription_factory(
CallbackT && callback,
const SubscriptionEventCallbacks & event_callbacks,
typename rclcpp::message_memory_strategy::MessageMemoryStrategy<
CallbackMessageT, Alloc>::SharedPtr
msg_mem_strat,
std::shared_ptr<Alloc> allocator)
const rclcpp::SubscriptionOptionsWithAllocator<AllocatorT> & options,
typename MessageMemoryStrategyT::SharedPtr msg_mem_strat)
{
SubscriptionFactory factory;
auto allocator = options.get_allocator();
using rclcpp::AnySubscriptionCallback;
AnySubscriptionCallback<CallbackMessageT, Alloc> any_subscription_callback(allocator);
AnySubscriptionCallback<CallbackMessageT, AllocatorT> any_subscription_callback(allocator);
any_subscription_callback.set(std::forward<CallbackT>(callback));
auto message_alloc =
std::make_shared<typename Subscription<CallbackMessageT, Alloc>::MessageAlloc>();
SubscriptionFactory factory {
// factory function that creates a MessageT specific SubscriptionT
factory.create_typed_subscription =
[allocator, msg_mem_strat, any_subscription_callback, event_callbacks, message_alloc](
[options, msg_mem_strat, any_subscription_callback](
rclcpp::node_interfaces::NodeBaseInterface * node_base,
const std::string & topic_name,
const rcl_subscription_options_t & subscription_options
const rclcpp::QoS & qos
) -> rclcpp::SubscriptionBase::SharedPtr
{
auto options_copy = subscription_options;
options_copy.allocator =
rclcpp::allocator::get_rcl_allocator<CallbackMessageT>(*message_alloc.get());
using rclcpp::Subscription;
using rclcpp::SubscriptionBase;
auto sub = Subscription<CallbackMessageT, Alloc>::make_shared(
node_base->get_shared_rcl_node_handle(),
auto sub = Subscription<CallbackMessageT, AllocatorT>::make_shared(
node_base,
*rosidl_typesupport_cpp::get_message_type_support_handle<MessageT>(),
topic_name,
options_copy,
qos,
any_subscription_callback,
event_callbacks,
options,
msg_mem_strat);
// This is used for setting up things like intra process comms which
// require this->shared_from_this() which cannot be called from
// the constructor.
sub->post_init_setup(node_base, qos, options);
auto sub_base_ptr = std::dynamic_pointer_cast<SubscriptionBase>(sub);
return sub_base_ptr;
}
};
// return the factory now that it is populated

View file

@ -70,6 +70,16 @@ struct SubscriptionOptionsWithAllocator : public SubscriptionOptionsBase
result.qos = qos.get_rmw_qos_profile();
return result;
}
/// Get the allocator, creating one if needed.
std::shared_ptr<Allocator>
get_allocator() const
{
if (!this->allocator) {
return std::make_shared<Allocator>();
}
return this->allocator;
}
};
using SubscriptionOptions = SubscriptionOptionsWithAllocator<std::allocator<void>>;

View file

@ -34,36 +34,10 @@ rclcpp::PublisherBase::SharedPtr
NodeTopics::create_publisher(
const std::string & topic_name,
const rclcpp::PublisherFactory & publisher_factory,
const rcl_publisher_options_t & publisher_options,
bool use_intra_process)
const rclcpp::QoS & qos)
{
// Create the MessageT specific Publisher using the factory, but store it as PublisherBase.
auto publisher = publisher_factory.create_typed_publisher(
node_base_, topic_name, publisher_options);
// Setup intra process publishing if requested.
if (use_intra_process) {
auto context = node_base_->get_context();
// Get the intra process manager instance for this context.
auto ipm = context->get_sub_context<rclcpp::intra_process_manager::IntraProcessManager>();
// Register the publisher with the intra process manager.
if (publisher_options.qos.history == RMW_QOS_POLICY_HISTORY_KEEP_ALL) {
throw std::invalid_argument(
"intraprocess communication is not allowed with keep all history qos policy");
}
if (publisher_options.qos.depth == 0) {
throw std::invalid_argument(
"intraprocess communication is not allowed with a zero qos history depth value");
}
uint64_t intra_process_publisher_id = ipm->add_publisher(publisher);
publisher->setup_intra_process(
intra_process_publisher_id,
ipm,
publisher_options);
}
// Return the completed publisher.
return publisher;
// Create the MessageT specific Publisher using the factory, but return it as PublisherBase.
return publisher_factory.create_typed_publisher(node_base_, topic_name, qos);
}
void
@ -99,25 +73,10 @@ rclcpp::SubscriptionBase::SharedPtr
NodeTopics::create_subscription(
const std::string & topic_name,
const rclcpp::SubscriptionFactory & subscription_factory,
const rcl_subscription_options_t & subscription_options,
bool use_intra_process)
const rclcpp::QoS & qos)
{
auto subscription = subscription_factory.create_typed_subscription(
node_base_, topic_name, subscription_options);
// Setup intra process publishing if requested.
if (use_intra_process) {
auto context = node_base_->get_context();
auto ipm =
context->get_sub_context<rclcpp::intra_process_manager::IntraProcessManager>();
uint64_t intra_process_subscription_id = ipm->add_subscription(subscription);
auto options_copy = subscription_options;
options_copy.ignore_local_publications = false;
subscription->setup_intra_process(intra_process_subscription_id, ipm, options_copy);
}
// Return the completed subscription.
return subscription;
// Create the MessageT specific Subscription using the factory, but return a SubscriptionBase.
return subscription_factory.create_typed_subscription(node_base_, topic_name, qos);
}
void

View file

@ -23,6 +23,7 @@
#include "rclcpp/expand_topic_or_service_name.hpp"
#include "rclcpp/intra_process_manager.hpp"
#include "rclcpp/logging.hpp"
#include "rclcpp/node_interfaces/node_base_interface.hpp"
#include "rmw/error_handling.h"
#include "rmw/rmw.h"
@ -30,18 +31,18 @@
using rclcpp::SubscriptionBase;
SubscriptionBase::SubscriptionBase(
std::shared_ptr<rcl_node_t> node_handle,
rclcpp::node_interfaces::NodeBaseInterface * node_base,
const rosidl_message_type_support_t & type_support_handle,
const std::string & topic_name,
const rcl_subscription_options_t & subscription_options,
bool is_serialized)
: node_handle_(node_handle),
: node_handle_(node_base->get_shared_rcl_node_handle()),
use_intra_process_(false),
intra_process_subscription_id_(0),
type_support_(type_support_handle),
is_serialized_(is_serialized)
{
auto custom_deletor = [node_handle](rcl_subscription_t * rcl_subs)
auto custom_deletor = [node_handle = this->node_handle_](rcl_subscription_t * rcl_subs)
{
if (rcl_subscription_fini(rcl_subs, node_handle.get()) != RCL_RET_OK) {
RCLCPP_ERROR(
@ -167,7 +168,8 @@ SubscriptionBase::get_publisher_count() const
return inter_process_publisher_count;
}
void SubscriptionBase::setup_intra_process(
void
SubscriptionBase::setup_intra_process(
uint64_t intra_process_subscription_id,
IntraProcessManagerWeakPtr weak_ipm,
const rcl_subscription_options_t & intra_process_options)

View file

@ -166,8 +166,9 @@ public:
create_publisher(
const std::string & topic_name,
const rclcpp::QoS & qos,
const PublisherOptionsWithAllocator<AllocatorT> & options =
const PublisherOptionsWithAllocator<AllocatorT> & options = (
create_default_publisher_options<AllocatorT>()
)
);
/// Create and return a Subscription.
@ -179,15 +180,17 @@ public:
* \param[in] msg_mem_strat The message memory strategy to use for allocating messages.
* \return Shared pointer to the created subscription.
*/
/* TODO(jacquelinekay):
Windows build breaks when static member function passed as default
argument to msg_mem_strat, nullptr is a workaround.
*/
template<
typename MessageT,
typename CallbackT,
typename AllocatorT = std::allocator<void>,
typename SubscriptionT = rclcpp::Subscription<MessageT, AllocatorT>>
typename CallbackMessageT =
typename rclcpp::subscription_traits::has_message_type<CallbackT>::type,
typename SubscriptionT = rclcpp::Subscription<MessageT, AllocatorT>,
typename MessageMemoryStrategyT = rclcpp::message_memory_strategy::MessageMemoryStrategy<
CallbackMessageT,
AllocatorT
>>
std::shared_ptr<SubscriptionT>
create_subscription(
const std::string & topic_name,
@ -195,10 +198,10 @@ public:
CallbackT && callback,
const SubscriptionOptionsWithAllocator<AllocatorT> & options =
create_default_subscription_options<AllocatorT>(),
typename rclcpp::message_memory_strategy::MessageMemoryStrategy<
typename rclcpp::subscription_traits::has_message_type<CallbackT>::type, AllocatorT
>::SharedPtr
msg_mem_strat = nullptr);
typename MessageMemoryStrategyT::SharedPtr msg_mem_strat = (
MessageMemoryStrategyT::create_default()
)
);
/// Create a timer.
/**

View file

@ -59,16 +59,16 @@ template<
typename MessageT,
typename CallbackT,
typename AllocatorT,
typename SubscriptionT>
typename CallbackMessageT,
typename SubscriptionT,
typename MessageMemoryStrategyT>
std::shared_ptr<SubscriptionT>
LifecycleNode::create_subscription(
const std::string & topic_name,
const rclcpp::QoS & qos,
CallbackT && callback,
const rclcpp::SubscriptionOptionsWithAllocator<AllocatorT> & options,
typename rclcpp::message_memory_strategy::MessageMemoryStrategy<
typename rclcpp::subscription_traits::has_message_type<CallbackT>::type, AllocatorT>::SharedPtr
msg_mem_strat)
typename MessageMemoryStrategyT::SharedPtr msg_mem_strat)
{
return rclcpp::create_subscription<MessageT>(
*this,

View file

@ -43,8 +43,7 @@ public:
/// brief child class of rclcpp Publisher class.
/**
* Overrides all publisher functions to check for
* enabled/disabled state.
* Overrides all publisher functions to check for enabled/disabled state.
*/
template<typename MessageT, typename Alloc = std::allocator<void>>
class LifecyclePublisher : public LifecyclePublisherInterface,
@ -61,11 +60,9 @@ public:
LifecyclePublisher(
rclcpp::node_interfaces::NodeBaseInterface * node_base,
const std::string & topic,
const rcl_publisher_options_t & publisher_options,
const rclcpp::PublisherEventCallbacks event_callbacks,
std::shared_ptr<MessageAlloc> allocator)
: rclcpp::Publisher<MessageT, Alloc>(
node_base, topic, publisher_options, event_callbacks, allocator),
const rclcpp::QoS & qos,
const rclcpp::PublisherOptionsWithAllocator<Alloc> & options)
: rclcpp::Publisher<MessageT, Alloc>(node_base, topic, qos, options),
enabled_(false),
logger_(rclcpp::get_logger("LifecyclePublisher"))
{