*_raw function (#388)

* publish_raw function

* subscription traits

* listener raw

* rebased

* cleanup and linters

* explicit test for deleter in unique_ptr

* add missing copyright

* cleanup

* add rmw_serialize functions

* linters

* explicit differentiation between take and take_raw

* cleanup debug messages

* rename to rmw_message_init`

* address comments

* address review comments

* raw->serialized

* use size_t (#497)
This commit is contained in:
Karsten Knese 2018-06-16 10:36:00 +02:00 committed by GitHub
parent 1556b6edf4
commit ec17d68b41
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
18 changed files with 607 additions and 75 deletions

View file

@ -260,6 +260,17 @@ if(BUILD_TESTING)
${PROJECT_NAME}
)
endif()
ament_add_gtest(test_serialized_message_allocator test/test_serialized_message_allocator.cpp
ENV RCL_ASSERT_RMW_ID_MATCHES=${rmw_implementation})
if(TARGET test_serialized_message_allocator)
target_include_directories(test_serialized_message_allocator PUBLIC
${test_msgs_INCLUDE_DIRS}
)
target_link_libraries(test_serialized_message_allocator
${PROJECT_NAME}
${test_msgs_LIBRARIES}
)
endif()
ament_add_gtest(test_service test/test_service.cpp)
if(TARGET test_service)
target_include_directories(test_service PUBLIC
@ -280,6 +291,16 @@ if(BUILD_TESTING)
)
target_link_libraries(test_subscription ${PROJECT_NAME})
endif()
find_package(test_msgs REQUIRED)
ament_add_gtest(test_subscription_traits test/test_subscription_traits.cpp)
if(TARGET test_subscription_traits)
target_include_directories(test_subscription_traits PUBLIC
${rcl_INCLUDE_DIRS}
)
ament_target_dependencies(test_subscription_traits
"test_msgs"
)
endif()
ament_add_gtest(test_find_weak_nodes test/test_find_weak_nodes.cpp)
if(TARGET test_find_weak_nodes)
target_include_directories(test_find_weak_nodes PUBLIC

View file

@ -26,8 +26,13 @@
namespace rclcpp
{
template<typename MessageT, typename CallbackT, typename AllocatorT, typename SubscriptionT>
typename rclcpp::Subscription<MessageT, AllocatorT>::SharedPtr
template<
typename MessageT,
typename CallbackT,
typename AllocatorT,
typename CallbackMessageT,
typename SubscriptionT = rclcpp::Subscription<CallbackMessageT, AllocatorT>>
typename std::shared_ptr<SubscriptionT>
create_subscription(
rclcpp::node_interfaces::NodeTopicsInterface * node_topics,
const std::string & topic_name,
@ -36,7 +41,8 @@ create_subscription(
rclcpp::callback_group::CallbackGroup::SharedPtr group,
bool ignore_local_publications,
bool use_intra_process_comms,
typename rclcpp::message_memory_strategy::MessageMemoryStrategy<MessageT, AllocatorT>::SharedPtr
typename rclcpp::message_memory_strategy::MessageMemoryStrategy<
CallbackMessageT, AllocatorT>::SharedPtr
msg_mem_strat,
typename std::shared_ptr<AllocatorT> allocator)
{
@ -44,8 +50,8 @@ create_subscription(
subscription_options.qos = qos_profile;
subscription_options.ignore_local_publications = ignore_local_publications;
auto factory =
rclcpp::create_subscription_factory<MessageT, CallbackT, AllocatorT, SubscriptionT>(
auto factory = rclcpp::create_subscription_factory
<MessageT, CallbackT, AllocatorT, CallbackMessageT, SubscriptionT>(
std::forward<CallbackT>(callback), msg_mem_strat, allocator);
auto sub = node_topics->create_subscription(

View file

@ -18,10 +18,15 @@
#include <memory>
#include <stdexcept>
#include "rcl/types.h"
#include "rclcpp/allocator/allocator_common.hpp"
#include "rclcpp/exceptions.hpp"
#include "rclcpp/macros.hpp"
#include "rclcpp/visibility_control.hpp"
#include "rmw/serialized_message.h"
namespace rclcpp
{
namespace message_memory_strategy
@ -39,14 +44,29 @@ public:
using MessageAlloc = typename MessageAllocTraits::allocator_type;
using MessageDeleter = allocator::Deleter<MessageAlloc, MessageT>;
using SerializedMessageAllocTraits = allocator::AllocRebind<rcl_serialized_message_t, Alloc>;
using SerializedMessageAlloc = typename SerializedMessageAllocTraits::allocator_type;
using SerializedMessageDeleter =
allocator::Deleter<SerializedMessageAlloc, rcl_serialized_message_t>;
using BufferAllocTraits = allocator::AllocRebind<char, Alloc>;
using BufferAlloc = typename BufferAllocTraits::allocator_type;
using BufferDeleter = allocator::Deleter<BufferAlloc, char>;
MessageMemoryStrategy()
{
message_allocator_ = std::make_shared<MessageAlloc>();
serialized_message_allocator_ = std::make_shared<SerializedMessageAlloc>();
buffer_allocator_ = std::make_shared<BufferAlloc>();
rcutils_allocator_ = allocator::get_rcl_allocator<char, BufferAlloc>(*buffer_allocator_.get());
}
explicit MessageMemoryStrategy(std::shared_ptr<Alloc> allocator)
{
message_allocator_ = std::make_shared<MessageAlloc>(*allocator.get());
serialized_message_allocator_ = std::make_shared<SerializedMessageAlloc>(*allocator.get());
buffer_allocator_ = std::make_shared<BufferAlloc>(*allocator.get());
rcutils_allocator_ = allocator::get_rcl_allocator<char, BufferAlloc>(*buffer_allocator_.get());
}
/// Default factory method
@ -62,6 +82,37 @@ public:
return std::allocate_shared<MessageT, MessageAlloc>(*message_allocator_.get());
}
virtual std::shared_ptr<rcl_serialized_message_t> borrow_serialized_message(size_t capacity)
{
auto msg = new rcl_serialized_message_t;
*msg = rmw_get_zero_initialized_serialized_message();
auto ret = rmw_serialized_message_init(msg, capacity, &rcutils_allocator_);
if (ret != RCL_RET_OK) {
rclcpp::exceptions::throw_from_rcl_error(ret);
}
auto serialized_msg = std::shared_ptr<rcl_serialized_message_t>(msg,
[](rmw_serialized_message_t * msg) {
auto ret = rmw_serialized_message_fini(msg);
delete msg;
if (ret != RCL_RET_OK) {
rclcpp::exceptions::throw_from_rcl_error(ret, "leaking memory");
}
});
return serialized_msg;
}
virtual std::shared_ptr<rcl_serialized_message_t> borrow_serialized_message()
{
return borrow_serialized_message(default_buffer_capacity_);
}
virtual void set_default_buffer_capacity(size_t capacity)
{
default_buffer_capacity_ = capacity;
}
/// Release ownership of the message, which will deallocate it if it has no more owners.
/** \param[in] msg Shared pointer to the message we are returning. */
virtual void return_message(std::shared_ptr<MessageT> & msg)
@ -69,8 +120,22 @@ public:
msg.reset();
}
virtual void return_serialized_message(std::shared_ptr<rcl_serialized_message_t> & serialized_msg)
{
serialized_msg.reset();
}
std::shared_ptr<MessageAlloc> message_allocator_;
MessageDeleter message_deleter_;
std::shared_ptr<SerializedMessageAlloc> serialized_message_allocator_;
SerializedMessageDeleter serialized_message_deleter_;
std::shared_ptr<BufferAlloc> buffer_allocator_;
BufferDeleter buffer_deleter_;
size_t default_buffer_capacity_ = 0;
rcutils_allocator_t rcutils_allocator_;
};
} // namespace message_memory_strategy

View file

@ -53,6 +53,7 @@
#include "rclcpp/publisher.hpp"
#include "rclcpp/service.hpp"
#include "rclcpp/subscription.hpp"
#include "rclcpp/subscription_traits.hpp"
#include "rclcpp/time.hpp"
#include "rclcpp/timer.hpp"
#include "rclcpp/visibility_control.hpp"
@ -183,7 +184,8 @@ public:
typename MessageT,
typename CallbackT,
typename Alloc = std::allocator<void>,
typename SubscriptionT = rclcpp::Subscription<MessageT, Alloc>>
typename SubscriptionT = rclcpp::Subscription<
typename rclcpp::subscription_traits::has_message_type<CallbackT>::type, Alloc>>
std::shared_ptr<SubscriptionT>
create_subscription(
const std::string & topic_name,
@ -191,7 +193,8 @@ public:
const rmw_qos_profile_t & qos_profile = rmw_qos_profile_default,
rclcpp::callback_group::CallbackGroup::SharedPtr group = nullptr,
bool ignore_local_publications = false,
typename rclcpp::message_memory_strategy::MessageMemoryStrategy<MessageT, Alloc>::SharedPtr
typename rclcpp::message_memory_strategy::MessageMemoryStrategy<
typename rclcpp::subscription_traits::has_message_type<CallbackT>::type, Alloc>::SharedPtr
msg_mem_strat = nullptr,
std::shared_ptr<Alloc> allocator = nullptr);
@ -214,15 +217,17 @@ public:
typename MessageT,
typename CallbackT,
typename Alloc = std::allocator<void>,
typename SubscriptionT = rclcpp::Subscription<MessageT, Alloc>>
typename SubscriptionT = rclcpp::Subscription<
typename rclcpp::subscription_traits::has_message_type<CallbackT>::type, Alloc>>
std::shared_ptr<SubscriptionT>
create_subscription(
const std::string & topic_name,
size_t qos_history_depth,
CallbackT && callback,
size_t qos_history_depth,
rclcpp::callback_group::CallbackGroup::SharedPtr group = nullptr,
bool ignore_local_publications = false,
typename rclcpp::message_memory_strategy::MessageMemoryStrategy<MessageT, Alloc>::SharedPtr
typename rclcpp::message_memory_strategy::MessageMemoryStrategy<
typename rclcpp::subscription_traits::has_message_type<CallbackT>::type, Alloc>::SharedPtr
msg_mem_strat = nullptr,
std::shared_ptr<Alloc> allocator = nullptr);

View file

@ -82,7 +82,11 @@ Node::create_publisher(
allocator);
}
template<typename MessageT, typename CallbackT, typename Alloc, typename SubscriptionT>
template<
typename MessageT,
typename CallbackT,
typename Alloc,
typename SubscriptionT>
std::shared_ptr<SubscriptionT>
Node::create_subscription(
const std::string & topic_name,
@ -90,20 +94,23 @@ Node::create_subscription(
const rmw_qos_profile_t & qos_profile,
rclcpp::callback_group::CallbackGroup::SharedPtr group,
bool ignore_local_publications,
typename rclcpp::message_memory_strategy::MessageMemoryStrategy<MessageT, Alloc>::SharedPtr
typename rclcpp::message_memory_strategy::MessageMemoryStrategy<
typename rclcpp::subscription_traits::has_message_type<CallbackT>::type, Alloc>::SharedPtr
msg_mem_strat,
std::shared_ptr<Alloc> allocator)
{
using CallbackMessageT = typename rclcpp::subscription_traits::has_message_type<CallbackT>::type;
if (!allocator) {
allocator = std::make_shared<Alloc>();
}
if (!msg_mem_strat) {
using rclcpp::message_memory_strategy::MessageMemoryStrategy;
msg_mem_strat = MessageMemoryStrategy<MessageT, Alloc>::create_default();
msg_mem_strat = MessageMemoryStrategy<CallbackMessageT, Alloc>::create_default();
}
return rclcpp::create_subscription<MessageT, CallbackT, Alloc, SubscriptionT>(
return rclcpp::create_subscription<MessageT, CallbackT, Alloc, CallbackMessageT, SubscriptionT>(
this->node_topics_.get(),
topic_name,
std::forward<CallbackT>(callback),
@ -115,21 +122,26 @@ Node::create_subscription(
allocator);
}
template<typename MessageT, typename CallbackT, typename Alloc, typename SubscriptionT>
template<
typename MessageT,
typename CallbackT,
typename Alloc,
typename SubscriptionT>
std::shared_ptr<SubscriptionT>
Node::create_subscription(
const std::string & topic_name,
size_t qos_history_depth,
CallbackT && callback,
size_t qos_history_depth,
rclcpp::callback_group::CallbackGroup::SharedPtr group,
bool ignore_local_publications,
typename rclcpp::message_memory_strategy::MessageMemoryStrategy<MessageT, Alloc>::SharedPtr
typename rclcpp::message_memory_strategy::MessageMemoryStrategy<
typename rclcpp::subscription_traits::has_message_type<CallbackT>::type, Alloc>::SharedPtr
msg_mem_strat,
std::shared_ptr<Alloc> allocator)
{
rmw_qos_profile_t qos = rmw_qos_profile_default;
qos.depth = qos_history_depth;
return this->create_subscription<MessageT, CallbackT, Alloc, SubscriptionT>(
return this->create_subscription<MessageT>(
topic_name,
std::forward<CallbackT>(callback),
qos,

View file

@ -120,8 +120,9 @@ public:
auto msg_mem_strat =
MessageMemoryStrategy<rcl_interfaces::msg::ParameterEvent, Alloc>::create_default();
using rcl_interfaces::msg::ParameterEvent;
return rclcpp::create_subscription<
rcl_interfaces::msg::ParameterEvent, CallbackT, Alloc, SubscriptionT>(
ParameterEvent, CallbackT, Alloc, ParameterEvent, SubscriptionT>(
this->node_topics_interface_.get(),
"parameter_events",
std::forward<CallbackT>(callback),

View file

@ -205,11 +205,8 @@ public:
ipm.publisher_id = intra_process_publisher_id_;
ipm.message_sequence = message_seq;
auto status = rcl_publish(&intra_process_publisher_handle_, &ipm);
if (status != RCL_RET_OK) {
// *INDENT-OFF* (prevent uncrustify from making unnecessary indents here)
throw std::runtime_error(
std::string("failed to publish intra process message: ") + rcl_get_error_string_safe());
// *INDENT-ON*
if (RCL_RET_OK != status) {
rclcpp::exceptions::throw_from_rcl_error(status, "failed to publish intra process message");
}
} else {
// Always destroy the message, even if we don't consume it, for consistency.
@ -279,6 +276,19 @@ public:
return this->publish(*msg);
}
void
publish(const rcl_serialized_message_t * serialized_msg)
{
if (store_intra_process_message_) {
// TODO(Karsten1987): support serialized message passed by intraprocess
throw std::runtime_error("storing serialized messages in intra process is not supported yet");
}
auto status = rcl_publish_serialized_message(&publisher_handle_, serialized_msg);
if (RCL_RET_OK != status) {
rclcpp::exceptions::throw_from_rcl_error(status, "failed to publish serialized message");
}
}
std::shared_ptr<MessageAlloc> get_allocator() const
{
return message_allocator_;
@ -289,11 +299,8 @@ protected:
do_inter_process_publish(const MessageT * msg)
{
auto status = rcl_publish(&publisher_handle_, msg);
if (status != RCL_RET_OK) {
// *INDENT-OFF* (prevent uncrustify from making unnecessary indents here)
throw std::runtime_error(
std::string("failed to publish message: ") + rcl_get_error_string_safe());
// *INDENT-ON*
if (RCL_RET_OK != status) {
rclcpp::exceptions::throw_from_rcl_error(status, "failed to publish message");
}
}

View file

@ -29,12 +29,13 @@
#include "rcl_interfaces/msg/intra_process_message.hpp"
#include "rclcpp/any_subscription_callback.hpp"
#include "rclcpp/exceptions.hpp"
#include "rclcpp/expand_topic_or_service_name.hpp"
#include "rclcpp/macros.hpp"
#include "rclcpp/message_memory_strategy.hpp"
#include "rclcpp/any_subscription_callback.hpp"
#include "rclcpp/subscription_traits.hpp"
#include "rclcpp/type_support_decl.hpp"
#include "rclcpp/expand_topic_or_service_name.hpp"
#include "rclcpp/visibility_control.hpp"
namespace rclcpp
@ -64,7 +65,8 @@ public:
std::shared_ptr<rcl_node_t> node_handle,
const rosidl_message_type_support_t & type_support_handle,
const std::string & topic_name,
const rcl_subscription_options_t & subscription_options);
const rcl_subscription_options_t & subscription_options,
bool is_serialized = false);
/// Default destructor.
RCLCPP_PUBLIC
@ -91,6 +93,12 @@ public:
/** \return Shared pointer to the fresh message. */
virtual std::shared_ptr<void>
create_message() = 0;
/// Borrow a new serialized message
/** \return Shared pointer to a rcl_message_serialized_t. */
virtual std::shared_ptr<rcl_serialized_message_t>
create_serialized_message() = 0;
/// Check if we need to handle the message, and execute the callback if we do.
/**
* \param[in] message Shared pointer to the message to handle.
@ -104,11 +112,22 @@ public:
virtual void
return_message(std::shared_ptr<void> & message) = 0;
/// Return the message borrowed in create_serialized_message.
/** \param[in] message Shared pointer to the returned message. */
virtual void
return_serialized_message(std::shared_ptr<rcl_serialized_message_t> & message) = 0;
virtual void
handle_intra_process_message(
rcl_interfaces::msg::IntraProcessMessage & ipm,
const rmw_message_info_t & message_info) = 0;
const rosidl_message_type_support_t &
get_message_type_support_handle() const;
bool
is_serialized() const;
protected:
std::shared_ptr<rcl_subscription_t> intra_process_subscription_handle_;
std::shared_ptr<rcl_subscription_t> subscription_handle_;
@ -116,19 +135,24 @@ protected:
private:
RCLCPP_DISABLE_COPY(SubscriptionBase)
rosidl_message_type_support_t type_support_;
bool is_serialized_;
};
/// Subscription implementation, templated on the type of message this subscription receives.
template<typename MessageT, typename Alloc = std::allocator<void>>
template<
typename CallbackMessageT,
typename Alloc = std::allocator<void>>
class Subscription : public SubscriptionBase
{
friend class rclcpp::node_interfaces::NodeTopicsInterface;
public:
using MessageAllocTraits = allocator::AllocRebind<MessageT, Alloc>;
using MessageAllocTraits = allocator::AllocRebind<CallbackMessageT, Alloc>;
using MessageAlloc = typename MessageAllocTraits::allocator_type;
using MessageDeleter = allocator::Deleter<MessageAlloc, MessageT>;
using MessageUniquePtr = std::unique_ptr<MessageT, MessageDeleter>;
using MessageDeleter = allocator::Deleter<MessageAlloc, CallbackMessageT>;
using MessageUniquePtr = std::unique_ptr<CallbackMessageT, MessageDeleter>;
RCLCPP_SMART_PTR_DEFINITIONS(Subscription)
@ -144,17 +168,19 @@ public:
*/
Subscription(
std::shared_ptr<rcl_node_t> node_handle,
const rosidl_message_type_support_t & ts,
const std::string & topic_name,
const rcl_subscription_options_t & subscription_options,
AnySubscriptionCallback<MessageT, Alloc> callback,
typename message_memory_strategy::MessageMemoryStrategy<MessageT, Alloc>::SharedPtr
memory_strategy = message_memory_strategy::MessageMemoryStrategy<MessageT,
AnySubscriptionCallback<CallbackMessageT, Alloc> callback,
typename message_memory_strategy::MessageMemoryStrategy<CallbackMessageT, Alloc>::SharedPtr
memory_strategy = message_memory_strategy::MessageMemoryStrategy<CallbackMessageT,
Alloc>::create_default())
: SubscriptionBase(
node_handle,
*rosidl_typesupport_cpp::get_message_type_support_handle<MessageT>(),
ts,
topic_name,
subscription_options),
subscription_options,
rclcpp::subscription_traits::is_serialized_subscription_argument<CallbackMessageT>::value),
any_callback_(callback),
message_memory_strategy_(memory_strategy),
get_intra_process_message_callback_(nullptr),
@ -167,11 +193,12 @@ public:
* \param[in] message_memory_strategy Shared pointer to the memory strategy to set.
*/
void set_message_memory_strategy(
typename message_memory_strategy::MessageMemoryStrategy<MessageT,
typename message_memory_strategy::MessageMemoryStrategy<CallbackMessageT,
Alloc>::SharedPtr message_memory_strategy)
{
message_memory_strategy_ = message_memory_strategy;
}
std::shared_ptr<void> create_message()
{
/* The default message memory strategy provides a dynamically allocated message on each call to
@ -181,6 +208,11 @@ public:
return message_memory_strategy_->borrow_message();
}
std::shared_ptr<rcl_serialized_message_t> create_serialized_message()
{
return message_memory_strategy_->borrow_serialized_message();
}
void handle_message(std::shared_ptr<void> & message, const rmw_message_info_t & message_info)
{
if (matches_any_intra_process_publishers_) {
@ -190,7 +222,7 @@ public:
return;
}
}
auto typed_message = std::static_pointer_cast<MessageT>(message);
auto typed_message = std::static_pointer_cast<CallbackMessageT>(message);
any_callback_.dispatch(typed_message, message_info);
}
@ -198,10 +230,15 @@ public:
/** \param message message to be returned */
void return_message(std::shared_ptr<void> & message)
{
auto typed_message = std::static_pointer_cast<MessageT>(message);
auto typed_message = std::static_pointer_cast<CallbackMessageT>(message);
message_memory_strategy_->return_message(typed_message);
}
void return_serialized_message(std::shared_ptr<rcl_serialized_message_t> & message)
{
message_memory_strategy_->return_serialized_message(message);
}
void handle_intra_process_message(
rcl_interfaces::msg::IntraProcessMessage & ipm,
const rmw_message_info_t & message_info)
@ -278,8 +315,8 @@ public:
private:
RCLCPP_DISABLE_COPY(Subscription)
AnySubscriptionCallback<MessageT, Alloc> any_callback_;
typename message_memory_strategy::MessageMemoryStrategy<MessageT, Alloc>::SharedPtr
AnySubscriptionCallback<CallbackMessageT, Alloc> any_callback_;
typename message_memory_strategy::MessageMemoryStrategy<CallbackMessageT, Alloc>::SharedPtr
message_memory_strategy_;
GetMessageCallbackType get_intra_process_message_callback_;

View file

@ -25,6 +25,7 @@
#include "rosidl_typesupport_cpp/message_type_support.hpp"
#include "rclcpp/subscription.hpp"
#include "rclcpp/subscription_traits.hpp"
#include "rclcpp/intra_process_manager.hpp"
#include "rclcpp/node_interfaces/node_base_interface.hpp"
#include "rclcpp/visibility_control.hpp"
@ -65,22 +66,28 @@ struct SubscriptionFactory
};
/// Return a SubscriptionFactory with functions for creating a SubscriptionT<MessageT, Alloc>.
template<typename MessageT, typename CallbackT, typename Alloc, typename SubscriptionT>
template<
typename MessageT,
typename CallbackT,
typename Alloc,
typename CallbackMessageT,
typename SubscriptionT>
SubscriptionFactory
create_subscription_factory(
CallbackT && callback,
typename rclcpp::message_memory_strategy::MessageMemoryStrategy<MessageT, Alloc>::SharedPtr
typename rclcpp::message_memory_strategy::MessageMemoryStrategy<
CallbackMessageT, Alloc>::SharedPtr
msg_mem_strat,
std::shared_ptr<Alloc> allocator)
{
SubscriptionFactory factory;
using rclcpp::AnySubscriptionCallback;
AnySubscriptionCallback<MessageT, Alloc> any_subscription_callback(allocator);
AnySubscriptionCallback<CallbackMessageT, Alloc> any_subscription_callback(allocator);
any_subscription_callback.set(std::forward<CallbackT>(callback));
auto message_alloc =
std::make_shared<typename Subscription<MessageT, Alloc>::MessageAlloc>();
std::make_shared<typename Subscription<CallbackMessageT, Alloc>::MessageAlloc>();
// factory function that creates a MessageT specific SubscriptionT
factory.create_typed_subscription =
@ -91,13 +98,14 @@ create_subscription_factory(
) -> rclcpp::SubscriptionBase::SharedPtr
{
subscription_options.allocator =
rclcpp::allocator::get_rcl_allocator<MessageT>(*message_alloc.get());
rclcpp::allocator::get_rcl_allocator<CallbackMessageT>(*message_alloc.get());
using rclcpp::Subscription;
using rclcpp::SubscriptionBase;
auto sub = Subscription<MessageT, Alloc>::make_shared(
auto sub = Subscription<CallbackMessageT, Alloc>::make_shared(
node_base->get_shared_rcl_node_handle(),
*rosidl_typesupport_cpp::get_message_type_support_handle<MessageT>(),
topic_name,
subscription_options,
any_subscription_callback,
@ -117,7 +125,7 @@ create_subscription_factory(
uint64_t intra_process_subscription_id = ipm->add_subscription(subscription);
auto intra_process_options = rcl_subscription_get_default_options();
intra_process_options.allocator = rclcpp::allocator::get_rcl_allocator<MessageT>(
intra_process_options.allocator = rclcpp::allocator::get_rcl_allocator<CallbackMessageT>(
*message_alloc.get());
intra_process_options.qos = subscription_options.qos;
intra_process_options.ignore_local_publications = false;
@ -128,7 +136,7 @@ create_subscription_factory(
uint64_t publisher_id,
uint64_t message_sequence,
uint64_t subscription_id,
typename rclcpp::Subscription<MessageT, Alloc>::MessageUniquePtr & message)
typename rclcpp::Subscription<CallbackMessageT, Alloc>::MessageUniquePtr & message)
{
auto ipm = weak_ipm.lock();
if (!ipm) {
@ -136,7 +144,7 @@ create_subscription_factory(
throw std::runtime_error(
"intra process take called after destruction of intra process manager");
}
ipm->take_intra_process_message<MessageT, Alloc>(
ipm->take_intra_process_message<CallbackMessageT, Alloc>(
publisher_id, message_sequence, subscription_id, message);
};

View file

@ -0,0 +1,80 @@
// Copyright 2017 Open Source Robotics Foundation, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#ifndef RCLCPP__SUBSCRIPTION_TRAITS_HPP_
#define RCLCPP__SUBSCRIPTION_TRAITS_HPP_
#include <memory>
#include "rclcpp/function_traits.hpp"
namespace rclcpp
{
namespace subscription_traits
{
/*
* The current version of uncrustify has a misinterpretion here
* between `:` used for inheritance vs for initializer list
* The result is that whenever a templated struct is used,
* the colon has to be without any whitespace next to it whereas
* when no template is used, the colon has to be separated by a space.
* Cheers!
*/
template<typename T>
struct is_serialized_subscription_argument : std::false_type
{};
template<>
struct is_serialized_subscription_argument<rcl_serialized_message_t>: std::true_type
{};
template<>
struct is_serialized_subscription_argument<std::shared_ptr<rcl_serialized_message_t>>
: std::true_type
{};
template<typename T>
struct is_serialized_subscription : is_serialized_subscription_argument<T>
{};
template<typename CallbackT>
struct is_serialized_callback
: is_serialized_subscription_argument<
typename rclcpp::function_traits::function_traits<CallbackT>::template argument_type<0>>
{};
template<typename MessageT>
struct extract_message_type
{
using type = typename std::remove_cv<MessageT>::type;
};
template<typename MessageT>
struct extract_message_type<std::shared_ptr<MessageT>>: extract_message_type<MessageT>
{};
template<typename MessageT, typename Deleter>
struct extract_message_type<std::unique_ptr<MessageT, Deleter>>: extract_message_type<MessageT>
{};
template<typename CallbackT>
struct has_message_type : extract_message_type<
typename rclcpp::function_traits::function_traits<CallbackT>::template argument_type<0>>
{};
} // namespace subscription_traits
} // namespace rclcpp
#endif // RCLCPP__SUBSCRIPTION_TRAITS_HPP_

View file

@ -36,6 +36,7 @@
<test_depend>ament_lint_common</test_depend>
<test_depend>rmw</test_depend>
<test_depend>rmw_implementation_cmake</test_depend>
<test_depend>test_msgs</test_depend>
<export>
<build_type>ament_cmake</build_type>

View file

@ -281,22 +281,41 @@ void
Executor::execute_subscription(
rclcpp::SubscriptionBase::SharedPtr subscription)
{
std::shared_ptr<void> message = subscription->create_message();
rmw_message_info_t message_info;
message_info.from_intra_process = false;
auto ret = rcl_take(subscription->get_subscription_handle().get(),
if (subscription->is_serialized()) {
auto serialized_msg = subscription->create_serialized_message();
auto ret = rcl_take_serialized_message(
subscription->get_subscription_handle().get(),
serialized_msg.get(), &message_info);
if (RCL_RET_OK == ret) {
auto void_serialized_msg = std::static_pointer_cast<void>(serialized_msg);
subscription->handle_message(void_serialized_msg, message_info);
} else if (RCL_RET_SUBSCRIPTION_TAKE_FAILED != ret) {
RCUTILS_LOG_ERROR_NAMED(
"rclcpp",
"take_serialized failed for subscription on topic '%s': %s",
subscription->get_topic_name(), rcl_get_error_string_safe());
rcl_reset_error();
}
subscription->return_serialized_message(serialized_msg);
} else {
std::shared_ptr<void> message = subscription->create_message();
auto ret = rcl_take(
subscription->get_subscription_handle().get(),
message.get(), &message_info);
if (ret == RCL_RET_OK) {
message_info.from_intra_process = false;
subscription->handle_message(message, message_info);
} else if (ret != RCL_RET_SUBSCRIPTION_TAKE_FAILED) {
RCUTILS_LOG_ERROR_NAMED(
"rclcpp",
"take failed for subscription on topic '%s': %s",
subscription->get_topic_name(), rcl_get_error_string_safe());
rcl_reset_error();
if (RCL_RET_OK == ret) {
subscription->handle_message(message, message_info);
} else if (RCL_RET_SUBSCRIPTION_TAKE_FAILED != ret) {
RCUTILS_LOG_ERROR_NAMED(
"rclcpp",
"could not deserialize serialized message on topic '%s': %s",
subscription->get_topic_name(), rcl_get_error_string_safe());
rcl_reset_error();
}
subscription->return_message(message);
}
subscription->return_message(message);
}
void

View file

@ -399,8 +399,9 @@ SyncParametersClient::set_parameters(
{
auto f = async_parameters_client_->set_parameters(parameters);
auto node_base_interface = node_->get_node_base_interface();
using rclcpp::executors::spin_node_until_future_complete;
if (spin_node_until_future_complete(*executor_, node_->get_node_base_interface(), f) ==
if (spin_node_until_future_complete(*executor_, node_base_interface, f) ==
rclcpp::executor::FutureReturnCode::SUCCESS)
{
return f.get();

View file

@ -31,8 +31,11 @@ SubscriptionBase::SubscriptionBase(
std::shared_ptr<rcl_node_t> node_handle,
const rosidl_message_type_support_t & type_support_handle,
const std::string & topic_name,
const rcl_subscription_options_t & subscription_options)
: node_handle_(node_handle)
const rcl_subscription_options_t & subscription_options,
bool is_serialized)
: node_handle_(node_handle),
type_support_(type_support_handle),
is_serialized_(is_serialized)
{
std::weak_ptr<rcl_node_t> weak_node_handle(node_handle_);
auto custom_deletor = [weak_node_handle](rcl_subscription_t * rcl_subs)
@ -111,3 +114,15 @@ SubscriptionBase::get_intra_process_subscription_handle() const
{
return intra_process_subscription_handle_;
}
const rosidl_message_type_support_t &
SubscriptionBase::get_message_type_support_handle() const
{
return type_support_;
}
bool
SubscriptionBase::is_serialized() const
{
return is_serialized_;
}

View file

@ -74,7 +74,9 @@ void TimeSource::attachNode(
auto cb = std::bind(&TimeSource::clock_cb, this, std::placeholders::_1);
clock_subscription_ = rclcpp::create_subscription<MessageT, decltype(cb), Alloc, SubscriptionT>(
clock_subscription_ = rclcpp::create_subscription<
MessageT, decltype(cb), Alloc, MessageT, SubscriptionT
>(
node_topics_.get(),
topic_name,
std::move(cb),

View file

@ -0,0 +1,67 @@
// Copyright 2018 Open Source Robotics Foundation, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include <gtest/gtest.h>
#include <string>
#include <memory>
#include "rclcpp/rclcpp.hpp"
#include "rcl/types.h"
#include "test_msgs/msg/empty.hpp"
TEST(TestSerializedMessageAllocator, default_allocator) {
using DummyMessageT = float;
auto mem_strategy =
rclcpp::message_memory_strategy::MessageMemoryStrategy<DummyMessageT>::create_default();
auto msg0 = mem_strategy->borrow_serialized_message();
ASSERT_EQ(msg0->buffer_capacity, 0u);
mem_strategy->return_serialized_message(msg0);
auto msg100 = mem_strategy->borrow_serialized_message(100);
ASSERT_EQ(msg100->buffer_capacity, 100u);
mem_strategy->return_serialized_message(msg100);
auto msg200 = mem_strategy->borrow_serialized_message();
auto ret = rmw_serialized_message_resize(msg200.get(), 200);
ASSERT_EQ(RCL_RET_OK, ret);
EXPECT_EQ(0u, msg200->buffer_length);
EXPECT_EQ(200u, msg200->buffer_capacity);
mem_strategy->return_serialized_message(msg200);
auto msg1000 = mem_strategy->borrow_serialized_message(1000);
ASSERT_EQ(msg1000->buffer_capacity, 1000u);
ret = rmw_serialized_message_resize(msg1000.get(), 2000);
ASSERT_EQ(RCL_RET_OK, ret);
EXPECT_EQ(2000u, msg1000->buffer_capacity);
mem_strategy->return_serialized_message(msg1000);
}
TEST(TestSerializedMessageAllocator, borrow_from_subscription) {
rclcpp::init(0, NULL);
auto node = std::make_shared<rclcpp::Node>("test_serialized_message_allocator_node");
std::shared_ptr<rclcpp::SubscriptionBase> sub =
node->create_subscription<test_msgs::msg::Empty>("~/dummy_topic", [](
std::shared_ptr<test_msgs::msg::Empty> test_msg) {(void) test_msg;});
auto msg0 = sub->create_serialized_message();
EXPECT_EQ(0u, msg0->buffer_capacity);
sub->return_serialized_message(msg0);
rclcpp::shutdown();
}

View file

@ -0,0 +1,178 @@
// Copyright 2017 Open Source Robotics Foundation, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include <gtest/gtest.h>
#include <functional>
#include <memory>
#include <string>
#include "rcl/types.h"
#include "rclcpp/allocator/allocator_common.hpp"
#include "rclcpp/subscription_traits.hpp"
#include "test_msgs/msg/empty.hpp"
void serialized_callback_copy(rcl_serialized_message_t unused)
{
(void) unused;
}
void serialized_callback_shared_ptr(std::shared_ptr<rcl_serialized_message_t> unused)
{
(void) unused;
}
void not_serialized_callback(char * unused)
{
(void) unused;
}
void not_serialized_shared_ptr_callback(std::shared_ptr<char> unused)
{
(void) unused;
}
void not_serialized_unique_ptr_callback(
test_msgs::msg::Empty::UniquePtrWithDeleter<rclcpp::allocator::Deleter<std::allocator<void>,
test_msgs::msg::Empty>> unused)
{
(void) unused;
}
TEST(TestSubscriptionTraits, is_serialized_callback) {
// Test regular functions
auto cb1 = &serialized_callback_copy;
static_assert(
rclcpp::subscription_traits::is_serialized_callback<decltype(cb1)>::value == true,
"rcl_serialized_message_t in a first argument callback makes it a serialized callback");
auto cb2 = &serialized_callback_shared_ptr;
static_assert(
rclcpp::subscription_traits::is_serialized_callback<decltype(cb2)>::value == true,
"std::shared_ptr<rcl_serialized_message_t> in a callback makes it a serialized callback");
auto cb3 = &not_serialized_callback;
static_assert(
rclcpp::subscription_traits::is_serialized_callback<decltype(cb3)>::value == false,
"passing a char * is not a serialized callback");
auto cb4 = &not_serialized_shared_ptr_callback;
static_assert(
rclcpp::subscription_traits::is_serialized_callback<decltype(cb4)>::value == false,
"passing a std::shared_tr<char> is not a serialized callback");
auto cb5 = [](rcl_serialized_message_t unused) -> void
{
(void) unused;
};
static_assert(
rclcpp::subscription_traits::is_serialized_callback<decltype(cb5)>::value == true,
"rcl_serialized_message_t in a first argument callback makes it a serialized callback");
using MessageT = test_msgs::msg::Empty;
using MessageTAllocator = std::allocator<void>;
using MessageTDeallocator = rclcpp::allocator::Deleter<MessageTAllocator, MessageT>;
auto cb6 = [](MessageT::UniquePtrWithDeleter<MessageTDeallocator> unique_msg_ptr) -> void
{
(void) unique_msg_ptr;
};
static_assert(
rclcpp::subscription_traits::is_serialized_callback<decltype(cb6)>::value == false,
"passing a std::unique_ptr of test_msgs::msg::Empty is not a serialized callback");
auto cb7 = &not_serialized_unique_ptr_callback;
static_assert(
rclcpp::subscription_traits::is_serialized_callback<decltype(cb7)>::value == false,
"passing a fancy unique_ptr of test_msgs::msg::Empty is not a serialized callback");
}
TEST(TestSubscriptionTraits, callback_messages) {
static_assert(
std::is_same<
std::shared_ptr<char>,
rclcpp::function_traits::function_traits<
decltype(not_serialized_shared_ptr_callback)
>::template argument_type<0>
>::value, "wrong!");
static_assert(
std::is_same<
char,
rclcpp::subscription_traits::extract_message_type<
rclcpp::function_traits::function_traits<
decltype(not_serialized_shared_ptr_callback)
>::template argument_type<0>
>::type
>::value, "wrong!");
auto cb1 = &serialized_callback_copy;
static_assert(
std::is_same<
rcl_serialized_message_t,
rclcpp::subscription_traits::has_message_type<decltype(cb1)>::type>::value,
"serialized callback message type is rcl_serialized_message_t");
auto cb2 = &serialized_callback_shared_ptr;
static_assert(
std::is_same<
rcl_serialized_message_t,
rclcpp::subscription_traits::has_message_type<decltype(cb2)>::type>::value,
"serialized callback message type is rcl_serialized_message_t");
auto cb3 = &not_serialized_callback;
static_assert(
std::is_same<
char *,
rclcpp::subscription_traits::has_message_type<decltype(cb3)>::type>::value,
"not serialized callback message type is char");
auto cb4 = &not_serialized_shared_ptr_callback;
static_assert(
std::is_same<
char,
rclcpp::subscription_traits::has_message_type<decltype(cb4)>::type>::value,
"not serialized shared_ptr callback message type is std::shared_ptr<char>");
auto cb5 = [](rcl_serialized_message_t unused) -> void
{
(void) unused;
};
static_assert(
std::is_same<
rcl_serialized_message_t,
rclcpp::subscription_traits::has_message_type<decltype(cb5)>::type>::value,
"serialized callback message type is rcl_serialized_message_t");
using MessageT = test_msgs::msg::Empty;
using MessageTAllocator = std::allocator<MessageT>;
using MessageTDeallocator = rclcpp::allocator::Deleter<MessageTAllocator, MessageT>;
auto cb6 = [](std::unique_ptr<MessageT, MessageTDeallocator> unique_msg_ptr) -> void
{
(void) unique_msg_ptr;
};
static_assert(
std::is_same<
test_msgs::msg::Empty,
rclcpp::subscription_traits::has_message_type<decltype(cb6)>::type>::value,
"passing a std::unique_ptr of test_msgs::msg::Empty has message type Empty");
auto cb7 = &not_serialized_unique_ptr_callback;
static_assert(
std::is_same<
test_msgs::msg::Empty,
rclcpp::subscription_traits::has_message_type<decltype(cb7)>::type>::value,
"passing a fancy std::unique_ptr of test_msgs::msg::Empty has message type Empty");
}

View file

@ -297,11 +297,14 @@ public:
return RCL_RET_ERROR;
}
constexpr bool publish_update = true;
// keep the initial state to pass to a transition callback
State initial_state(state_machine_.current_state);
uint8_t transition_id = lifecycle_transition;
if (rcl_lifecycle_trigger_transition(&state_machine_, transition_id, true) != RCL_RET_OK) {
if (rcl_lifecycle_trigger_transition(
&state_machine_, transition_id, publish_update) != RCL_RET_OK)
{
RCUTILS_LOG_ERROR("Unable to start transition %u from current state %s: %s",
transition_id, state_machine_.current_state->label, rcl_get_error_string_safe())
return RCL_RET_ERROR;
@ -311,7 +314,7 @@ public:
state_machine_.current_state->id, initial_state);
if (rcl_lifecycle_trigger_transition(
&state_machine_, cb_return_code, true) != RCL_RET_OK)
&state_machine_, cb_return_code, publish_update) != RCL_RET_OK)
{
RCUTILS_LOG_ERROR("Failed to finish transition %u. Current state is now: %s",
transition_id, state_machine_.current_state->label)
@ -326,13 +329,17 @@ public:
state_machine_.current_state->id, initial_state);
if (error_resolved == lifecycle_msgs::msg::Transition::TRANSITION_CALLBACK_SUCCESS) {
// We call cleanup on the error state
if (rcl_lifecycle_trigger_transition(&state_machine_, error_resolved, true) != RCL_RET_OK) {
if (rcl_lifecycle_trigger_transition(
&state_machine_, error_resolved, publish_update) != RCL_RET_OK)
{
RCUTILS_LOG_ERROR("Failed to call cleanup on error state")
return RCL_RET_ERROR;
}
} else {
// We call shutdown on the error state
if (rcl_lifecycle_trigger_transition(&state_machine_, error_resolved, true) != RCL_RET_OK) {
if (rcl_lifecycle_trigger_transition(
&state_machine_, error_resolved, publish_update) != RCL_RET_OK)
{
RCUTILS_LOG_ERROR("Failed to call cleanup on error state")
return RCL_RET_ERROR;
}