From 351eb96e8c6e9b2265f68afb3a0e14e99db9b1ef Mon Sep 17 00:00:00 2001 From: Dirk Thomas Date: Thu, 27 Aug 2015 14:07:18 -0700 Subject: [PATCH 1/2] declare dependency --- rclcpp/CMakeLists.txt | 6 +++++- rclcpp/package.xml | 1 + 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/rclcpp/CMakeLists.txt b/rclcpp/CMakeLists.txt index 2cc262d..14ea23d 100644 --- a/rclcpp/CMakeLists.txt +++ b/rclcpp/CMakeLists.txt @@ -14,6 +14,8 @@ if(AMENT_ENABLE_TESTING) find_package(ament_lint_auto REQUIRED) ament_lint_auto_find_test_dependencies() + find_package(rmw REQUIRED) + if(NOT WIN32) set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++11 -Wall -Wextra") endif() @@ -23,7 +25,9 @@ if(AMENT_ENABLE_TESTING) ament_add_gtest(test_mapped_ring_buffer test/test_mapped_ring_buffer.cpp) ament_add_gtest(test_intra_process_manager test/test_intra_process_manager.cpp) if(TARGET test_intra_process_manager) - target_include_directories(test_intra_process_manager PUBLIC "${rcl_interfaces_INCLUDE_DIRS}") + target_include_directories(test_intra_process_manager PUBLIC + "${rcl_interfaces_INCLUDE_DIRS}" + "${rmw_INCLUDE_DIRS}") endif() endif() diff --git a/rclcpp/package.xml b/rclcpp/package.xml index 50fb412..afae9eb 100644 --- a/rclcpp/package.xml +++ b/rclcpp/package.xml @@ -16,6 +16,7 @@ ament_cmake_gtest ament_lint_auto ament_lint_common + rmw ament_cmake From 51ecd3f3865bc2420f869997126bdc2c5c0cf5e7 Mon Sep 17 00:00:00 2001 From: William Woodall Date: Thu, 27 Aug 2015 21:35:19 -0700 Subject: [PATCH 2/2] adding callbacks and publish for unique_ptrs --- .../rclcpp/any_subscription_callback.hpp | 127 ++++++++++++++++++ rclcpp/include/rclcpp/executor.hpp | 15 ++- rclcpp/include/rclcpp/function_traits.hpp | 63 +++++++++ rclcpp/include/rclcpp/node.hpp | 73 +++++----- rclcpp/include/rclcpp/node_impl.hpp | 88 +++++++++--- rclcpp/include/rclcpp/parameter_client.hpp | 2 +- rclcpp/include/rclcpp/publisher.hpp | 87 ++++++++++-- rclcpp/include/rclcpp/rclcpp.hpp | 12 ++ rclcpp/include/rclcpp/subscription.hpp | 55 ++++++-- rclcpp/include/rclcpp/utilities.hpp | 3 +- 10 files changed, 433 insertions(+), 92 deletions(-) create mode 100644 rclcpp/include/rclcpp/any_subscription_callback.hpp create mode 100644 rclcpp/include/rclcpp/function_traits.hpp diff --git a/rclcpp/include/rclcpp/any_subscription_callback.hpp b/rclcpp/include/rclcpp/any_subscription_callback.hpp new file mode 100644 index 0000000..3165608 --- /dev/null +++ b/rclcpp/include/rclcpp/any_subscription_callback.hpp @@ -0,0 +1,127 @@ +// Copyright 2014 Open Source Robotics Foundation, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef RCLCPP_RCLCPP_ANY_SUBSCRIPTION_CALLBACK_HPP_ +#define RCLCPP_RCLCPP_ANY_SUBSCRIPTION_CALLBACK_HPP_ + +#include + +#include +#include +#include + +#include + +namespace rclcpp +{ + +namespace any_subscription_callback +{ + +template +struct AnySubscriptionCallback +{ + using SharedPtrCallback = std::function &)>; + using SharedPtrWithInfoCallback = + std::function &, const rmw_message_info_t &)>; + using UniquePtrCallback = std::function &)>; + using UniquePtrWithInfoCallback = + std::function &, const rmw_message_info_t &)>; + + SharedPtrCallback shared_ptr_callback; + SharedPtrWithInfoCallback shared_ptr_with_info_callback; + UniquePtrCallback unique_ptr_callback; + UniquePtrWithInfoCallback unique_ptr_with_info_callback; + + AnySubscriptionCallback() + : shared_ptr_callback(nullptr), shared_ptr_with_info_callback(nullptr), + unique_ptr_callback(nullptr), unique_ptr_with_info_callback(nullptr) + {} + + AnySubscriptionCallback(const AnySubscriptionCallback &) = default; + + template::arity == 1 + >::type * = nullptr, + typename std::enable_if< + std::is_same< + typename function_traits::template argument_type<0>, + typename std::shared_ptr + >::value + >::type * = nullptr + > + void set(CallbackT callback) + { + shared_ptr_callback = callback; + } + + template::arity == 2 + >::type * = nullptr, + typename std::enable_if< + std::is_same< + typename function_traits::template argument_type<0>, + typename std::shared_ptr + >::value + >::type * = nullptr + > + void set(CallbackT callback) + { + shared_ptr_with_info_callback = callback; + } +/* + template::arity == 1 + >::type * = nullptr, + typename std::enable_if< + std::is_same< + typename function_traits::template argument_type<0>, + typename std::unique_ptr + >::value + >::type * = nullptr + > + void set(CallbackT callback) + { + static_assert(std::is_same< + typename function_traits::template argument_type<0>, + typename std::unique_ptr + >::value, "Not a unique pointer"); + unique_ptr_callback = callback; + } + + template::arity == 2 + >::type * = nullptr, + typename std::enable_if< + std::is_same< + typename function_traits::template argument_type<0>, + typename std::unique_ptr + >::value + >::type * = nullptr + > + void set(CallbackT callback) + { + unique_ptr_with_info_callback = callback; + } + */ +}; + +} /* namespace any_subscription_callback */ +} /* namespace rclcpp */ + +#endif /* RCLCPP_RCLCPP_ANY_SUBSCRIPTION_CALLBACK_HPP_ */ diff --git a/rclcpp/include/rclcpp/executor.hpp b/rclcpp/include/rclcpp/executor.hpp index 3911d59..b931ad6 100644 --- a/rclcpp/include/rclcpp/executor.hpp +++ b/rclcpp/include/rclcpp/executor.hpp @@ -64,7 +64,7 @@ public: virtual void spin() = 0; virtual void - add_node(rclcpp::node::Node::SharedPtr & node_ptr, bool notify = true) + add_node(rclcpp::node::Node::SharedPtr node_ptr, bool notify = true) { // Check to ensure node not already added for (auto & weak_node : weak_nodes_) { @@ -190,7 +190,8 @@ protected: rmw_take_with_info(subscription->subscription_handle_, message.get(), &taken, &message_info); if (ret == RMW_RET_OK) { if (taken) { - subscription->handle_message(message, &message_info.publisher_gid); + message_info.from_intra_process = false; + subscription->handle_message(message, message_info); } } else { fprintf(stderr, @@ -206,10 +207,16 @@ protected: { rcl_interfaces::msg::IntraProcessMessage ipm; bool taken = false; - rmw_ret_t status = rmw_take(subscription->intra_process_subscription_handle_, &ipm, &taken); + rmw_message_info_t message_info; + rmw_ret_t status = rmw_take_with_info( + subscription->intra_process_subscription_handle_, + &ipm, + &taken, + &message_info); if (status == RMW_RET_OK) { if (taken) { - subscription->handle_intra_process_message(ipm); + message_info.from_intra_process = true; + subscription->handle_intra_process_message(ipm, message_info); } } else { fprintf(stderr, diff --git a/rclcpp/include/rclcpp/function_traits.hpp b/rclcpp/include/rclcpp/function_traits.hpp new file mode 100644 index 0000000..0a82783 --- /dev/null +++ b/rclcpp/include/rclcpp/function_traits.hpp @@ -0,0 +1,63 @@ +// Copyright 2014 Open Source Robotics Foundation, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef RCLCPP_RCLCPP_FUNCTION_TRAITS_HPP_ +#define RCLCPP_RCLCPP_FUNCTION_TRAITS_HPP_ +#include + +namespace rclcpp +{ + +/* NOTE(esteve): + * We support service callbacks that can optionally take the request id, + * which should be possible with two overloaded create_service methods, + * but unfortunately std::function's constructor on VS2015 is too greedy, + * so we need a mechanism for checking the arity and the type of each argument + * in a callback function. + */ + + +template +struct function_traits +{ + static constexpr std::size_t arity = + function_traits::arity - 1; + + + template + using argument_type = + typename function_traits::template argument_type; +}; + +template +struct function_traits +{ + static constexpr std::size_t arity = sizeof ... (Args); + + template + using argument_type = typename std::tuple_element>::type; +}; + +template +struct function_traits: public function_traits +{}; + +template +struct function_traits + : public function_traits +{}; + +} /* namespace rclcpp */ + +#endif /* RCLCPP_RCLCPP_FUNCTION_TRAITS_HPP_ */ diff --git a/rclcpp/include/rclcpp/node.hpp b/rclcpp/include/rclcpp/node.hpp index 03ff9de..04fb29e 100644 --- a/rclcpp/include/rclcpp/node.hpp +++ b/rclcpp/include/rclcpp/node.hpp @@ -29,6 +29,7 @@ #include #include #include +#include #include #include #include @@ -56,42 +57,6 @@ class Executor; namespace node { -/* NOTE(esteve): - * We support service callbacks that can optionally take the request id, - * which should be possible with two overloaded create_service methods, - * but unfortunately std::function's constructor on VS2015 is too greedy, - * so we need a mechanism for checking the arity and the type of each argument - * in a callback function. - */ -template -struct function_traits -{ - static constexpr std::size_t arity = - function_traits::arity - 1; - - template - using argument_type = - typename function_traits::template argument_type; -}; - -template -struct function_traits -{ - static constexpr std::size_t arity = sizeof ... (Args); - - template - using argument_type = typename std::tuple_element>::type; -}; - -template -struct function_traits: public function_traits -{}; - -template -struct function_traits - : public function_traits -{}; - /* ROS Node Interface. * * This is the single point of entry for creating publishers and subscribers. @@ -130,12 +95,23 @@ public: Windows build breaks when static member function passed as default argument to msg_mem_strat, nullptr is a workaround. */ - template + template typename rclcpp::subscription::Subscription::SharedPtr create_subscription( const std::string & topic_name, const rmw_qos_profile_t & qos_profile, - std::function &)> callback, + CallbackT callback, + rclcpp::callback_group::CallbackGroup::SharedPtr group = nullptr, + bool ignore_local_publications = false, + typename rclcpp::message_memory_strategy::MessageMemoryStrategy::SharedPtr + msg_mem_strat = nullptr); + + template + typename rclcpp::subscription::Subscription::SharedPtr + create_subscription_with_unique_ptr_callback( + const std::string & topic_name, + const rmw_qos_profile_t & qos_profile, + typename rclcpp::subscription::AnySubscriptionCallback::UniquePtrCallback callback, rclcpp::callback_group::CallbackGroup::SharedPtr group = nullptr, bool ignore_local_publications = false, typename rclcpp::message_memory_strategy::MessageMemoryStrategy::SharedPtr @@ -148,11 +124,12 @@ public: rclcpp::timer::CallbackType callback, rclcpp::callback_group::CallbackGroup::SharedPtr group = nullptr); - rclcpp::timer::WallTimer::SharedPtr - create_wall_timer( - std::chrono::duration period, - rclcpp::timer::CallbackType callback, - rclcpp::callback_group::CallbackGroup::SharedPtr group = nullptr); + // TODO(wjwwood): reenable this once I figure out why the demo doesn't build with it. + // rclcpp::timer::WallTimer::SharedPtr + // create_wall_timer( + // std::chrono::duration period, + // rclcpp::timer::CallbackType callback, + // rclcpp::callback_group::CallbackGroup::SharedPtr group = nullptr); using CallbackGroup = rclcpp::callback_group::CallbackGroup; using CallbackGroupWeakPtr = std::weak_ptr; @@ -227,6 +204,16 @@ private: publisher::Publisher::SharedPtr events_publisher_; + template + typename subscription::Subscription::SharedPtr + create_subscription_internal( + const std::string & topic_name, + const rmw_qos_profile_t & qos_profile, + rclcpp::subscription::AnySubscriptionCallback callback, + rclcpp::callback_group::CallbackGroup::SharedPtr group, + bool ignore_local_publications, + typename message_memory_strategy::MessageMemoryStrategy::SharedPtr msg_mem_strat); + template< typename ServiceT, typename FunctorT, diff --git a/rclcpp/include/rclcpp/node_impl.hpp b/rclcpp/include/rclcpp/node_impl.hpp index 42cf7f2..80c9325 100644 --- a/rclcpp/include/rclcpp/node_impl.hpp +++ b/rclcpp/include/rclcpp/node_impl.hpp @@ -153,7 +153,7 @@ Node::create_publisher( rclcpp::intra_process_manager::IntraProcessManager::WeakPtr weak_ipm = intra_process_manager; // *INDENT-OFF* auto shared_publish_callback = - [weak_ipm](uint64_t publisher_id, std::shared_ptr msg) -> uint64_t + [weak_ipm](uint64_t publisher_id, void * msg, const std::type_info & type_info) -> uint64_t { auto ipm = weak_ipm.lock(); if (!ipm) { @@ -161,8 +161,17 @@ Node::create_publisher( throw std::runtime_error( "intra process publish called after destruction of intra process manager"); } - auto typed_msg = std::static_pointer_cast(msg); - std::unique_ptr unique_msg(new MessageT(*typed_msg)); + if (!msg) { + throw std::runtime_error("cannot publisher msg which is a null pointer"); + } + auto & message_type_info = typeid(MessageT); + if (message_type_info != type_info) { + throw std::runtime_error( + std::string("published type '") + type_info.name() + + "' is incompatible from the publisher type '" + message_type_info.name() + "'"); + } + MessageT * typed_message_ptr = static_cast(msg); + std::unique_ptr unique_msg(typed_message_ptr); uint64_t message_seq = ipm->store_intra_process_message(publisher_id, unique_msg); return message_seq; }; @@ -188,12 +197,56 @@ Node::group_in_node(callback_group::CallbackGroup::SharedPtr & group) return group_belongs_to_this_node; } -template -typename subscription::Subscription::SharedPtr +template +typename rclcpp::subscription::Subscription::SharedPtr Node::create_subscription( const std::string & topic_name, const rmw_qos_profile_t & qos_profile, - std::function &)> callback, + CallbackT callback, + rclcpp::callback_group::CallbackGroup::SharedPtr group, + bool ignore_local_publications, + typename rclcpp::message_memory_strategy::MessageMemoryStrategy::SharedPtr + msg_mem_strat) +{ + rclcpp::subscription::AnySubscriptionCallback any_subscription_callback; + any_subscription_callback.set(callback); + return this->create_subscription_internal( + topic_name, + qos_profile, + any_subscription_callback, + group, + ignore_local_publications, + msg_mem_strat); +} + +template +typename rclcpp::subscription::Subscription::SharedPtr +Node::create_subscription_with_unique_ptr_callback( + const std::string & topic_name, + const rmw_qos_profile_t & qos_profile, + typename rclcpp::subscription::AnySubscriptionCallback::UniquePtrCallback callback, + rclcpp::callback_group::CallbackGroup::SharedPtr group, + bool ignore_local_publications, + typename rclcpp::message_memory_strategy::MessageMemoryStrategy::SharedPtr + msg_mem_strat) +{ + rclcpp::subscription::AnySubscriptionCallback any_subscription_callback; + any_subscription_callback.unique_ptr_callback = callback; + return this->create_subscription_internal( + topic_name, + qos_profile, + any_subscription_callback, + group, + ignore_local_publications, + msg_mem_strat); +} + +template +typename subscription::Subscription::SharedPtr +Node::create_subscription_internal( + const std::string & topic_name, + const rmw_qos_profile_t & qos_profile, + rclcpp::subscription::AnySubscriptionCallback callback, rclcpp::callback_group::CallbackGroup::SharedPtr group, bool ignore_local_publications, typename message_memory_strategy::MessageMemoryStrategy::SharedPtr msg_mem_strat) @@ -305,17 +358,18 @@ Node::create_wall_timer( return timer; } -rclcpp::timer::WallTimer::SharedPtr -Node::create_wall_timer( - std::chrono::duration period, - rclcpp::timer::CallbackType callback, - rclcpp::callback_group::CallbackGroup::SharedPtr group) -{ - return create_wall_timer( - std::chrono::duration_cast(period), - callback, - group); -} +// TODO(wjwwood): reenable this once I figure out why the demo doesn't build with it. +// rclcpp::timer::WallTimer::SharedPtr +// Node::create_wall_timer( +// std::chrono::duration period, +// rclcpp::timer::CallbackType callback, +// rclcpp::callback_group::CallbackGroup::SharedPtr group) +// { +// return create_wall_timer( +// std::chrono::duration_cast(period), +// callback, +// group); +// } template typename client::Client::SharedPtr diff --git a/rclcpp/include/rclcpp/parameter_client.hpp b/rclcpp/include/rclcpp/parameter_client.hpp index 0971356..a825441 100644 --- a/rclcpp/include/rclcpp/parameter_client.hpp +++ b/rclcpp/include/rclcpp/parameter_client.hpp @@ -229,7 +229,7 @@ public: template typename rclcpp::subscription::Subscription::SharedPtr - on_parameter_event(FunctorT & callback) + on_parameter_event(FunctorT callback) { return node_->create_subscription( "parameter_events", rmw_qos_profile_parameter_events, callback); diff --git a/rclcpp/include/rclcpp/publisher.hpp b/rclcpp/include/rclcpp/publisher.hpp index 21cf457..8501732 100644 --- a/rclcpp/include/rclcpp/publisher.hpp +++ b/rclcpp/include/rclcpp/publisher.hpp @@ -19,6 +19,7 @@ #include #include +#include #include #include @@ -87,31 +88,73 @@ public: template void - publish(std::shared_ptr & msg) + publish(std::unique_ptr & msg) { - rmw_ret_t status; - status = rmw_publish(publisher_handle_, msg.get()); - if (status != RMW_RET_OK) { - // *INDENT-OFF* (prevent uncrustify from making unecessary indents here) - throw std::runtime_error( - std::string("failed to publish message: ") + rmw_get_error_string_safe()); - // *INDENT-ON* - } + this->do_inter_process_publish(msg.get()); if (store_intra_process_message_) { - uint64_t message_seq = store_intra_process_message_(intra_process_publisher_id_, msg); + // Take the pointer from the unique_msg, release it and pass as a void * + // to the ipm. The ipm should then capture it again as a unique_ptr of + // the correct type. + // TODO(wjwwood): + // investigate how to transfer the custom deleter (if there is one) + // from the incoming unique_ptr through to the ipm's unique_ptr. + // See: http://stackoverflow.com/questions/11002641/dynamic-casting-for-unique-ptr + MessageT * msg_ptr = msg.get(); + msg.release(); + uint64_t message_seq; + { + std::lock_guard lock(intra_process_publish_mutex_); + message_seq = + store_intra_process_message_(intra_process_publisher_id_, msg_ptr, typeid(MessageT)); + } rcl_interfaces::msg::IntraProcessMessage ipm; ipm.publisher_id = intra_process_publisher_id_; ipm.message_sequence = message_seq; - status = rmw_publish(intra_process_publisher_handle_, &ipm); + auto status = rmw_publish(intra_process_publisher_handle_, &ipm); if (status != RMW_RET_OK) { // *INDENT-OFF* (prevent uncrustify from making unecessary indents here) throw std::runtime_error( std::string("failed to publish intra process message: ") + rmw_get_error_string_safe()); // *INDENT-ON* } + } else { + // Always destroy the message, even if we don't consume it, for consistency. + msg.reset(); } } + template + void + publish(const std::shared_ptr & msg) + { + // Avoid allocating when not using intra process. + if (!store_intra_process_message_) { + // In this case we're not using intra process. + return this->do_inter_process_publish(msg.get()); + } + // Otherwise we have to allocate memory in a unique_ptr and pass it along. + // TODO(wjwwood): + // The intra process manager should probably also be able to store + // shared_ptr's and do the "smart" thing based on other intra process + // subscriptions. For now call the other publish(). + std::unique_ptr unique_msg(new MessageT(*msg.get())); + return this->publish(unique_msg); + } + + template + void + publish(const MessageT & msg) + { + // Avoid allocating when not using intra process. + if (!store_intra_process_message_) { + // In this case we're not using intra process. + return this->do_inter_process_publish(msg.get()); + } + // Otherwise we have to allocate memory in a unique_ptr and pass it along. + std::unique_ptr unique_msg(new MessageT(msg)); + return this->publish(unique_msg); + } + const std::string & get_topic_name() const { @@ -161,13 +204,27 @@ public: return result; } - typedef std::function)> StoreSharedMessageCallbackT; + typedef std::function StoreMessageCallbackT; protected: + template + void + do_inter_process_publish(MessageT * msg) + { + auto status = rmw_publish(publisher_handle_, msg); + if (status != RMW_RET_OK) { + // *INDENT-OFF* (prevent uncrustify from making unecessary indents here) + throw std::runtime_error( + std::string("failed to publish message: ") + rmw_get_error_string_safe()); + // *INDENT-ON* + } + } + + void setup_intra_process( uint64_t intra_process_publisher_id, - StoreSharedMessageCallbackT callback, + StoreMessageCallbackT callback, rmw_publisher_t * intra_process_publisher_handle) { intra_process_publisher_id_ = intra_process_publisher_id; @@ -194,11 +251,13 @@ private: size_t queue_size_; uint64_t intra_process_publisher_id_; - StoreSharedMessageCallbackT store_intra_process_message_; + StoreMessageCallbackT store_intra_process_message_; rmw_gid_t rmw_gid_; rmw_gid_t intra_process_rmw_gid_; + std::mutex intra_process_publish_mutex_; + }; } /* namespace publisher */ diff --git a/rclcpp/include/rclcpp/rclcpp.hpp b/rclcpp/include/rclcpp/rclcpp.hpp index 6d388d1..5ba4aab 100644 --- a/rclcpp/include/rclcpp/rclcpp.hpp +++ b/rclcpp/include/rclcpp/rclcpp.hpp @@ -39,6 +39,18 @@ const std::chrono::nanoseconds operator"" _s(long double s) std::chrono::duration(s)); } +const std::chrono::nanoseconds +operator"" _ms(unsigned long long ms) +{ + return std::chrono::milliseconds(ms); +} +const std::chrono::nanoseconds +operator"" _ms(long double ms) +{ + return std::chrono::duration_cast( + std::chrono::duration(ms)); +} + const std::chrono::nanoseconds operator"" _ns(unsigned long long ns) { diff --git a/rclcpp/include/rclcpp/subscription.hpp b/rclcpp/include/rclcpp/subscription.hpp index 4952087..15a6a57 100644 --- a/rclcpp/include/rclcpp/subscription.hpp +++ b/rclcpp/include/rclcpp/subscription.hpp @@ -27,6 +27,7 @@ #include #include +#include namespace rclcpp { @@ -94,9 +95,13 @@ public: } virtual std::shared_ptr create_message() = 0; - virtual void handle_message(std::shared_ptr & message, const rmw_gid_t * sender_id) = 0; + virtual void handle_message( + std::shared_ptr & message, + const rmw_message_info_t & message_info) = 0; virtual void return_message(std::shared_ptr & message) = 0; - virtual void handle_intra_process_message(rcl_interfaces::msg::IntraProcessMessage & ipm) = 0; + virtual void handle_intra_process_message( + rcl_interfaces::msg::IntraProcessMessage & ipm, + const rmw_message_info_t & message_info) = 0; protected: rmw_subscription_t * intra_process_subscription_handle_; @@ -113,13 +118,14 @@ private: }; +using namespace any_subscription_callback; + template class Subscription : public SubscriptionBase { friend class rclcpp::node::Node; public: - using CallbackType = std::function &)>; RCLCPP_SMART_PTR_DEFINITIONS(Subscription); Subscription( @@ -127,11 +133,11 @@ public: rmw_subscription_t * subscription_handle, const std::string & topic_name, bool ignore_local_publications, - CallbackType callback, + AnySubscriptionCallback callback, typename message_memory_strategy::MessageMemoryStrategy::SharedPtr memory_strategy = message_memory_strategy::MessageMemoryStrategy::create_default()) : SubscriptionBase(node_handle, subscription_handle, topic_name, ignore_local_publications), - callback_(callback), + any_callback_(callback), message_memory_strategy_(memory_strategy), get_intra_process_message_callback_(nullptr), matches_any_intra_process_publishers_(nullptr) @@ -148,17 +154,29 @@ public: return message_memory_strategy_->borrow_message(); } - void handle_message(std::shared_ptr & message, const rmw_gid_t * sender_id) + void handle_message(std::shared_ptr & message, const rmw_message_info_t & message_info) { if (matches_any_intra_process_publishers_) { - if (matches_any_intra_process_publishers_(sender_id)) { + if (matches_any_intra_process_publishers_(&message_info.publisher_gid)) { // In this case, the message will be delivered via intra process and // we should ignore this copy of the message. return; } } auto typed_message = std::static_pointer_cast(message); - callback_(typed_message); + if (any_callback_.shared_ptr_callback) { + any_callback_.shared_ptr_callback(typed_message); + } else if (any_callback_.shared_ptr_with_info_callback) { + any_callback_.shared_ptr_with_info_callback(typed_message, message_info); + } else if (any_callback_.unique_ptr_callback) { + std::unique_ptr unique_msg(new MessageT(*typed_message)); + any_callback_.unique_ptr_callback(unique_msg); + } else if (any_callback_.unique_ptr_with_info_callback) { + std::unique_ptr unique_msg(new MessageT(*typed_message)); + any_callback_.unique_ptr_with_info_callback(unique_msg, message_info); + } else { + throw std::runtime_error("unexpected message without any callback set"); + } } void return_message(std::shared_ptr & message) @@ -167,7 +185,9 @@ public: message_memory_strategy_->return_message(typed_message); } - void handle_intra_process_message(rcl_interfaces::msg::IntraProcessMessage & ipm) + void handle_intra_process_message( + rcl_interfaces::msg::IntraProcessMessage & ipm, + const rmw_message_info_t & message_info) { if (!get_intra_process_message_callback_) { // throw std::runtime_error( @@ -188,8 +208,19 @@ public: // TODO(wjwwood): should we notify someone of this? log error, log warning? return; } - typename MessageT::SharedPtr shared_msg = std::move(msg); - callback_(shared_msg); + if (any_callback_.shared_ptr_callback) { + typename MessageT::SharedPtr shared_msg = std::move(msg); + any_callback_.shared_ptr_callback(shared_msg); + } else if (any_callback_.shared_ptr_with_info_callback) { + typename MessageT::SharedPtr shared_msg = std::move(msg); + any_callback_.shared_ptr_with_info_callback(shared_msg, message_info); + } else if (any_callback_.unique_ptr_callback) { + any_callback_.unique_ptr_callback(msg); + } else if (any_callback_.unique_ptr_with_info_callback) { + any_callback_.unique_ptr_with_info_callback(msg, message_info); + } else { + throw std::runtime_error("unexpected message without any callback set"); + } } private: @@ -213,7 +244,7 @@ private: RCLCPP_DISABLE_COPY(Subscription); - CallbackType callback_; + AnySubscriptionCallback any_callback_; typename message_memory_strategy::MessageMemoryStrategy::SharedPtr message_memory_strategy_; diff --git a/rclcpp/include/rclcpp/utilities.hpp b/rclcpp/include/rclcpp/utilities.hpp index 9e77976..77d6c4d 100644 --- a/rclcpp/include/rclcpp/utilities.hpp +++ b/rclcpp/include/rclcpp/utilities.hpp @@ -169,7 +169,8 @@ sleep_for(const std::chrono::nanoseconds & nanoseconds) // TODO: determine if posix's nanosleep(2) is more efficient here std::unique_lock lock(::g_interrupt_mutex); auto cvs = ::g_interrupt_condition_variable.wait_for(lock, nanoseconds); - return cvs == std::cv_status::no_timeout; + // Return true if the timeout elapsed successfully, otherwise false. + return cvs != std::cv_status::no_timeout; } } /* namespace utilities */