diff --git a/rclcpp/CMakeLists.txt b/rclcpp/CMakeLists.txt index 2cc262d..14ea23d 100644 --- a/rclcpp/CMakeLists.txt +++ b/rclcpp/CMakeLists.txt @@ -14,6 +14,8 @@ if(AMENT_ENABLE_TESTING) find_package(ament_lint_auto REQUIRED) ament_lint_auto_find_test_dependencies() + find_package(rmw REQUIRED) + if(NOT WIN32) set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++11 -Wall -Wextra") endif() @@ -23,7 +25,9 @@ if(AMENT_ENABLE_TESTING) ament_add_gtest(test_mapped_ring_buffer test/test_mapped_ring_buffer.cpp) ament_add_gtest(test_intra_process_manager test/test_intra_process_manager.cpp) if(TARGET test_intra_process_manager) - target_include_directories(test_intra_process_manager PUBLIC "${rcl_interfaces_INCLUDE_DIRS}") + target_include_directories(test_intra_process_manager PUBLIC + "${rcl_interfaces_INCLUDE_DIRS}" + "${rmw_INCLUDE_DIRS}") endif() endif() diff --git a/rclcpp/include/rclcpp/any_subscription_callback.hpp b/rclcpp/include/rclcpp/any_subscription_callback.hpp new file mode 100644 index 0000000..3165608 --- /dev/null +++ b/rclcpp/include/rclcpp/any_subscription_callback.hpp @@ -0,0 +1,127 @@ +// Copyright 2014 Open Source Robotics Foundation, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef RCLCPP_RCLCPP_ANY_SUBSCRIPTION_CALLBACK_HPP_ +#define RCLCPP_RCLCPP_ANY_SUBSCRIPTION_CALLBACK_HPP_ + +#include + +#include +#include +#include + +#include + +namespace rclcpp +{ + +namespace any_subscription_callback +{ + +template +struct AnySubscriptionCallback +{ + using SharedPtrCallback = std::function &)>; + using SharedPtrWithInfoCallback = + std::function &, const rmw_message_info_t &)>; + using UniquePtrCallback = std::function &)>; + using UniquePtrWithInfoCallback = + std::function &, const rmw_message_info_t &)>; + + SharedPtrCallback shared_ptr_callback; + SharedPtrWithInfoCallback shared_ptr_with_info_callback; + UniquePtrCallback unique_ptr_callback; + UniquePtrWithInfoCallback unique_ptr_with_info_callback; + + AnySubscriptionCallback() + : shared_ptr_callback(nullptr), shared_ptr_with_info_callback(nullptr), + unique_ptr_callback(nullptr), unique_ptr_with_info_callback(nullptr) + {} + + AnySubscriptionCallback(const AnySubscriptionCallback &) = default; + + template::arity == 1 + >::type * = nullptr, + typename std::enable_if< + std::is_same< + typename function_traits::template argument_type<0>, + typename std::shared_ptr + >::value + >::type * = nullptr + > + void set(CallbackT callback) + { + shared_ptr_callback = callback; + } + + template::arity == 2 + >::type * = nullptr, + typename std::enable_if< + std::is_same< + typename function_traits::template argument_type<0>, + typename std::shared_ptr + >::value + >::type * = nullptr + > + void set(CallbackT callback) + { + shared_ptr_with_info_callback = callback; + } +/* + template::arity == 1 + >::type * = nullptr, + typename std::enable_if< + std::is_same< + typename function_traits::template argument_type<0>, + typename std::unique_ptr + >::value + >::type * = nullptr + > + void set(CallbackT callback) + { + static_assert(std::is_same< + typename function_traits::template argument_type<0>, + typename std::unique_ptr + >::value, "Not a unique pointer"); + unique_ptr_callback = callback; + } + + template::arity == 2 + >::type * = nullptr, + typename std::enable_if< + std::is_same< + typename function_traits::template argument_type<0>, + typename std::unique_ptr + >::value + >::type * = nullptr + > + void set(CallbackT callback) + { + unique_ptr_with_info_callback = callback; + } + */ +}; + +} /* namespace any_subscription_callback */ +} /* namespace rclcpp */ + +#endif /* RCLCPP_RCLCPP_ANY_SUBSCRIPTION_CALLBACK_HPP_ */ diff --git a/rclcpp/include/rclcpp/executor.hpp b/rclcpp/include/rclcpp/executor.hpp index 801259a..12d0aa4 100644 --- a/rclcpp/include/rclcpp/executor.hpp +++ b/rclcpp/include/rclcpp/executor.hpp @@ -88,7 +88,7 @@ public: * node was added, it will wake up. */ virtual void - add_node(rclcpp::node::Node::SharedPtr & node_ptr, bool notify = true) + add_node(rclcpp::node::Node::SharedPtr node_ptr, bool notify = true) { // Check to ensure node not already added for (auto & weak_node : weak_nodes_) { @@ -248,7 +248,8 @@ protected: rmw_take_with_info(subscription->subscription_handle_, message.get(), &taken, &message_info); if (ret == RMW_RET_OK) { if (taken) { - subscription->handle_message(message, &message_info.publisher_gid); + message_info.from_intra_process = false; + subscription->handle_message(message, message_info); } } else { fprintf(stderr, @@ -264,10 +265,16 @@ protected: { rcl_interfaces::msg::IntraProcessMessage ipm; bool taken = false; - rmw_ret_t status = rmw_take(subscription->intra_process_subscription_handle_, &ipm, &taken); + rmw_message_info_t message_info; + rmw_ret_t status = rmw_take_with_info( + subscription->intra_process_subscription_handle_, + &ipm, + &taken, + &message_info); if (status == RMW_RET_OK) { if (taken) { - subscription->handle_intra_process_message(ipm); + message_info.from_intra_process = true; + subscription->handle_intra_process_message(ipm, message_info); } } else { fprintf(stderr, diff --git a/rclcpp/include/rclcpp/function_traits.hpp b/rclcpp/include/rclcpp/function_traits.hpp new file mode 100644 index 0000000..0a82783 --- /dev/null +++ b/rclcpp/include/rclcpp/function_traits.hpp @@ -0,0 +1,63 @@ +// Copyright 2014 Open Source Robotics Foundation, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef RCLCPP_RCLCPP_FUNCTION_TRAITS_HPP_ +#define RCLCPP_RCLCPP_FUNCTION_TRAITS_HPP_ +#include + +namespace rclcpp +{ + +/* NOTE(esteve): + * We support service callbacks that can optionally take the request id, + * which should be possible with two overloaded create_service methods, + * but unfortunately std::function's constructor on VS2015 is too greedy, + * so we need a mechanism for checking the arity and the type of each argument + * in a callback function. + */ + + +template +struct function_traits +{ + static constexpr std::size_t arity = + function_traits::arity - 1; + + + template + using argument_type = + typename function_traits::template argument_type; +}; + +template +struct function_traits +{ + static constexpr std::size_t arity = sizeof ... (Args); + + template + using argument_type = typename std::tuple_element>::type; +}; + +template +struct function_traits: public function_traits +{}; + +template +struct function_traits + : public function_traits +{}; + +} /* namespace rclcpp */ + +#endif /* RCLCPP_RCLCPP_FUNCTION_TRAITS_HPP_ */ diff --git a/rclcpp/include/rclcpp/node.hpp b/rclcpp/include/rclcpp/node.hpp index 31d58c8..d82b130 100644 --- a/rclcpp/include/rclcpp/node.hpp +++ b/rclcpp/include/rclcpp/node.hpp @@ -29,6 +29,7 @@ #include #include #include +#include #include #include #include @@ -55,43 +56,6 @@ class Executor; namespace node { - -/* NOTE(esteve): - * We support service callbacks that can optionally take the request id, - * which should be possible with two overloaded create_service methods, - * but unfortunately std::function's constructor on VS2015 is too greedy, - * so we need a mechanism for checking the arity and the type of each argument - * in a callback function. - */ -template -struct function_traits -{ - static constexpr std::size_t arity = - function_traits::arity - 1; - - template - using argument_type = - typename function_traits::template argument_type; -}; - -template -struct function_traits -{ - static constexpr std::size_t arity = sizeof ... (Args); - - template - using argument_type = typename std::tuple_element>::type; -}; - -template -struct function_traits: public function_traits -{}; - -template -struct function_traits - : public function_traits -{}; - /// Node is the single point of entry for creating publishers and subscribers. class Node { @@ -153,12 +117,23 @@ public: Windows build breaks when static member function passed as default argument to msg_mem_strat, nullptr is a workaround. */ - template + template typename rclcpp::subscription::Subscription::SharedPtr create_subscription( const std::string & topic_name, const rmw_qos_profile_t & qos_profile, - std::function &)> callback, + CallbackT callback, + rclcpp::callback_group::CallbackGroup::SharedPtr group = nullptr, + bool ignore_local_publications = false, + typename rclcpp::message_memory_strategy::MessageMemoryStrategy::SharedPtr + msg_mem_strat = nullptr); + + template + typename rclcpp::subscription::Subscription::SharedPtr + create_subscription_with_unique_ptr_callback( + const std::string & topic_name, + const rmw_qos_profile_t & qos_profile, + typename rclcpp::subscription::AnySubscriptionCallback::UniquePtrCallback callback, rclcpp::callback_group::CallbackGroup::SharedPtr group = nullptr, bool ignore_local_publications = false, typename rclcpp::message_memory_strategy::MessageMemoryStrategy::SharedPtr @@ -182,11 +157,12 @@ public: * \param[in] callback User-defined callback function. * \param[in] group Callback group to execute this timer's callback in. */ - rclcpp::timer::WallTimer::SharedPtr - create_wall_timer( - std::chrono::duration period, - rclcpp::timer::CallbackType callback, - rclcpp::callback_group::CallbackGroup::SharedPtr group = nullptr); + // TODO(wjwwood): reenable this once I figure out why the demo doesn't build with it. + // rclcpp::timer::WallTimer::SharedPtr + // create_wall_timer( + // std::chrono::duration period, + // rclcpp::timer::CallbackType callback, + // rclcpp::callback_group::CallbackGroup::SharedPtr group = nullptr); using CallbackGroup = rclcpp::callback_group::CallbackGroup; using CallbackGroupWeakPtr = std::weak_ptr; @@ -261,6 +237,16 @@ private: publisher::Publisher::SharedPtr events_publisher_; + template + typename subscription::Subscription::SharedPtr + create_subscription_internal( + const std::string & topic_name, + const rmw_qos_profile_t & qos_profile, + rclcpp::subscription::AnySubscriptionCallback callback, + rclcpp::callback_group::CallbackGroup::SharedPtr group, + bool ignore_local_publications, + typename message_memory_strategy::MessageMemoryStrategy::SharedPtr msg_mem_strat); + template< typename ServiceT, typename FunctorT, diff --git a/rclcpp/include/rclcpp/node_impl.hpp b/rclcpp/include/rclcpp/node_impl.hpp index 42cf7f2..80c9325 100644 --- a/rclcpp/include/rclcpp/node_impl.hpp +++ b/rclcpp/include/rclcpp/node_impl.hpp @@ -153,7 +153,7 @@ Node::create_publisher( rclcpp::intra_process_manager::IntraProcessManager::WeakPtr weak_ipm = intra_process_manager; // *INDENT-OFF* auto shared_publish_callback = - [weak_ipm](uint64_t publisher_id, std::shared_ptr msg) -> uint64_t + [weak_ipm](uint64_t publisher_id, void * msg, const std::type_info & type_info) -> uint64_t { auto ipm = weak_ipm.lock(); if (!ipm) { @@ -161,8 +161,17 @@ Node::create_publisher( throw std::runtime_error( "intra process publish called after destruction of intra process manager"); } - auto typed_msg = std::static_pointer_cast(msg); - std::unique_ptr unique_msg(new MessageT(*typed_msg)); + if (!msg) { + throw std::runtime_error("cannot publisher msg which is a null pointer"); + } + auto & message_type_info = typeid(MessageT); + if (message_type_info != type_info) { + throw std::runtime_error( + std::string("published type '") + type_info.name() + + "' is incompatible from the publisher type '" + message_type_info.name() + "'"); + } + MessageT * typed_message_ptr = static_cast(msg); + std::unique_ptr unique_msg(typed_message_ptr); uint64_t message_seq = ipm->store_intra_process_message(publisher_id, unique_msg); return message_seq; }; @@ -188,12 +197,56 @@ Node::group_in_node(callback_group::CallbackGroup::SharedPtr & group) return group_belongs_to_this_node; } -template -typename subscription::Subscription::SharedPtr +template +typename rclcpp::subscription::Subscription::SharedPtr Node::create_subscription( const std::string & topic_name, const rmw_qos_profile_t & qos_profile, - std::function &)> callback, + CallbackT callback, + rclcpp::callback_group::CallbackGroup::SharedPtr group, + bool ignore_local_publications, + typename rclcpp::message_memory_strategy::MessageMemoryStrategy::SharedPtr + msg_mem_strat) +{ + rclcpp::subscription::AnySubscriptionCallback any_subscription_callback; + any_subscription_callback.set(callback); + return this->create_subscription_internal( + topic_name, + qos_profile, + any_subscription_callback, + group, + ignore_local_publications, + msg_mem_strat); +} + +template +typename rclcpp::subscription::Subscription::SharedPtr +Node::create_subscription_with_unique_ptr_callback( + const std::string & topic_name, + const rmw_qos_profile_t & qos_profile, + typename rclcpp::subscription::AnySubscriptionCallback::UniquePtrCallback callback, + rclcpp::callback_group::CallbackGroup::SharedPtr group, + bool ignore_local_publications, + typename rclcpp::message_memory_strategy::MessageMemoryStrategy::SharedPtr + msg_mem_strat) +{ + rclcpp::subscription::AnySubscriptionCallback any_subscription_callback; + any_subscription_callback.unique_ptr_callback = callback; + return this->create_subscription_internal( + topic_name, + qos_profile, + any_subscription_callback, + group, + ignore_local_publications, + msg_mem_strat); +} + +template +typename subscription::Subscription::SharedPtr +Node::create_subscription_internal( + const std::string & topic_name, + const rmw_qos_profile_t & qos_profile, + rclcpp::subscription::AnySubscriptionCallback callback, rclcpp::callback_group::CallbackGroup::SharedPtr group, bool ignore_local_publications, typename message_memory_strategy::MessageMemoryStrategy::SharedPtr msg_mem_strat) @@ -305,17 +358,18 @@ Node::create_wall_timer( return timer; } -rclcpp::timer::WallTimer::SharedPtr -Node::create_wall_timer( - std::chrono::duration period, - rclcpp::timer::CallbackType callback, - rclcpp::callback_group::CallbackGroup::SharedPtr group) -{ - return create_wall_timer( - std::chrono::duration_cast(period), - callback, - group); -} +// TODO(wjwwood): reenable this once I figure out why the demo doesn't build with it. +// rclcpp::timer::WallTimer::SharedPtr +// Node::create_wall_timer( +// std::chrono::duration period, +// rclcpp::timer::CallbackType callback, +// rclcpp::callback_group::CallbackGroup::SharedPtr group) +// { +// return create_wall_timer( +// std::chrono::duration_cast(period), +// callback, +// group); +// } template typename client::Client::SharedPtr diff --git a/rclcpp/include/rclcpp/parameter_client.hpp b/rclcpp/include/rclcpp/parameter_client.hpp index 0971356..a825441 100644 --- a/rclcpp/include/rclcpp/parameter_client.hpp +++ b/rclcpp/include/rclcpp/parameter_client.hpp @@ -229,7 +229,7 @@ public: template typename rclcpp::subscription::Subscription::SharedPtr - on_parameter_event(FunctorT & callback) + on_parameter_event(FunctorT callback) { return node_->create_subscription( "parameter_events", rmw_qos_profile_parameter_events, callback); diff --git a/rclcpp/include/rclcpp/publisher.hpp b/rclcpp/include/rclcpp/publisher.hpp index 014cf18..e9504ad 100644 --- a/rclcpp/include/rclcpp/publisher.hpp +++ b/rclcpp/include/rclcpp/publisher.hpp @@ -19,6 +19,7 @@ #include #include +#include #include #include @@ -103,31 +104,73 @@ public: */ template void - publish(std::shared_ptr & msg) + publish(std::unique_ptr & msg) { - rmw_ret_t status; - status = rmw_publish(publisher_handle_, msg.get()); - if (status != RMW_RET_OK) { - // *INDENT-OFF* (prevent uncrustify from making unecessary indents here) - throw std::runtime_error( - std::string("failed to publish message: ") + rmw_get_error_string_safe()); - // *INDENT-ON* - } + this->do_inter_process_publish(msg.get()); if (store_intra_process_message_) { - uint64_t message_seq = store_intra_process_message_(intra_process_publisher_id_, msg); + // Take the pointer from the unique_msg, release it and pass as a void * + // to the ipm. The ipm should then capture it again as a unique_ptr of + // the correct type. + // TODO(wjwwood): + // investigate how to transfer the custom deleter (if there is one) + // from the incoming unique_ptr through to the ipm's unique_ptr. + // See: http://stackoverflow.com/questions/11002641/dynamic-casting-for-unique-ptr + MessageT * msg_ptr = msg.get(); + msg.release(); + uint64_t message_seq; + { + std::lock_guard lock(intra_process_publish_mutex_); + message_seq = + store_intra_process_message_(intra_process_publisher_id_, msg_ptr, typeid(MessageT)); + } rcl_interfaces::msg::IntraProcessMessage ipm; ipm.publisher_id = intra_process_publisher_id_; ipm.message_sequence = message_seq; - status = rmw_publish(intra_process_publisher_handle_, &ipm); + auto status = rmw_publish(intra_process_publisher_handle_, &ipm); if (status != RMW_RET_OK) { // *INDENT-OFF* (prevent uncrustify from making unecessary indents here) throw std::runtime_error( std::string("failed to publish intra process message: ") + rmw_get_error_string_safe()); // *INDENT-ON* } + } else { + // Always destroy the message, even if we don't consume it, for consistency. + msg.reset(); } } + template + void + publish(const std::shared_ptr & msg) + { + // Avoid allocating when not using intra process. + if (!store_intra_process_message_) { + // In this case we're not using intra process. + return this->do_inter_process_publish(msg.get()); + } + // Otherwise we have to allocate memory in a unique_ptr and pass it along. + // TODO(wjwwood): + // The intra process manager should probably also be able to store + // shared_ptr's and do the "smart" thing based on other intra process + // subscriptions. For now call the other publish(). + std::unique_ptr unique_msg(new MessageT(*msg.get())); + return this->publish(unique_msg); + } + + template + void + publish(const MessageT & msg) + { + // Avoid allocating when not using intra process. + if (!store_intra_process_message_) { + // In this case we're not using intra process. + return this->do_inter_process_publish(msg.get()); + } + // Otherwise we have to allocate memory in a unique_ptr and pass it along. + std::unique_ptr unique_msg(new MessageT(msg)); + return this->publish(unique_msg); + } + /// Get the topic that this publisher publishes on. // \return The topic name. const std::string & @@ -197,13 +240,27 @@ public: return result; } - typedef std::function)> StoreSharedMessageCallbackT; + typedef std::function StoreMessageCallbackT; protected: + template + void + do_inter_process_publish(MessageT * msg) + { + auto status = rmw_publish(publisher_handle_, msg); + if (status != RMW_RET_OK) { + // *INDENT-OFF* (prevent uncrustify from making unecessary indents here) + throw std::runtime_error( + std::string("failed to publish message: ") + rmw_get_error_string_safe()); + // *INDENT-ON* + } + } + + void setup_intra_process( uint64_t intra_process_publisher_id, - StoreSharedMessageCallbackT callback, + StoreMessageCallbackT callback, rmw_publisher_t * intra_process_publisher_handle) { intra_process_publisher_id_ = intra_process_publisher_id; @@ -230,11 +287,13 @@ private: size_t queue_size_; uint64_t intra_process_publisher_id_; - StoreSharedMessageCallbackT store_intra_process_message_; + StoreMessageCallbackT store_intra_process_message_; rmw_gid_t rmw_gid_; rmw_gid_t intra_process_rmw_gid_; + std::mutex intra_process_publish_mutex_; + }; } /* namespace publisher */ diff --git a/rclcpp/include/rclcpp/rclcpp.hpp b/rclcpp/include/rclcpp/rclcpp.hpp index fb7b772..df72462 100644 --- a/rclcpp/include/rclcpp/rclcpp.hpp +++ b/rclcpp/include/rclcpp/rclcpp.hpp @@ -39,6 +39,18 @@ const std::chrono::nanoseconds operator"" _s(long double s) std::chrono::duration(s)); } +const std::chrono::nanoseconds +operator"" _ms(unsigned long long ms) +{ + return std::chrono::milliseconds(ms); +} +const std::chrono::nanoseconds +operator"" _ms(long double ms) +{ + return std::chrono::duration_cast( + std::chrono::duration(ms)); +} + const std::chrono::nanoseconds operator"" _ns(unsigned long long ns) { diff --git a/rclcpp/include/rclcpp/subscription.hpp b/rclcpp/include/rclcpp/subscription.hpp index e73bf9d..d2dd094 100644 --- a/rclcpp/include/rclcpp/subscription.hpp +++ b/rclcpp/include/rclcpp/subscription.hpp @@ -27,6 +27,7 @@ #include #include +#include namespace rclcpp { @@ -106,19 +107,21 @@ public: /// Borrow a new message. // \return Shared pointer to the fresh message. virtual std::shared_ptr create_message() = 0; - /// Check if we need to handle the message, and execute the callback if we do. /** * \param[in] message Shared pointer to the message to handle. - * \param[in] sender_id Global identifier of the entity that sent this message. + * \param[in] message_info Metadata associated with this message. */ - virtual void handle_message(std::shared_ptr & message, const rmw_gid_t * sender_id) = 0; + virtual void handle_message( + std::shared_ptr & message, + const rmw_message_info_t & message_info) = 0; /// Return the message borrowed in create_message. // \param[in] Shared pointer to the returned message. virtual void return_message(std::shared_ptr & message) = 0; - - virtual void handle_intra_process_message(rcl_interfaces::msg::IntraProcessMessage & ipm) = 0; + virtual void handle_intra_process_message( + rcl_interfaces::msg::IntraProcessMessage & ipm, + const rmw_message_info_t & message_info) = 0; protected: rmw_subscription_t * intra_process_subscription_handle_; @@ -135,6 +138,8 @@ private: }; +using namespace any_subscription_callback; + /// Subscription implementation, templated on the type of message this subscription receives. template class Subscription : public SubscriptionBase @@ -142,7 +147,6 @@ class Subscription : public SubscriptionBase friend class rclcpp::node::Node; public: - using CallbackType = std::function &)>; RCLCPP_SMART_PTR_DEFINITIONS(Subscription); /// Default constructor. @@ -160,11 +164,11 @@ public: rmw_subscription_t * subscription_handle, const std::string & topic_name, bool ignore_local_publications, - CallbackType callback, + AnySubscriptionCallback callback, typename message_memory_strategy::MessageMemoryStrategy::SharedPtr memory_strategy = message_memory_strategy::MessageMemoryStrategy::create_default()) : SubscriptionBase(node_handle, subscription_handle, topic_name, ignore_local_publications), - callback_(callback), + any_callback_(callback), message_memory_strategy_(memory_strategy), get_intra_process_message_callback_(nullptr), matches_any_intra_process_publishers_(nullptr) @@ -189,17 +193,29 @@ public: return message_memory_strategy_->borrow_message(); } - void handle_message(std::shared_ptr & message, const rmw_gid_t * sender_id) + void handle_message(std::shared_ptr & message, const rmw_message_info_t & message_info) { if (matches_any_intra_process_publishers_) { - if (matches_any_intra_process_publishers_(sender_id)) { + if (matches_any_intra_process_publishers_(&message_info.publisher_gid)) { // In this case, the message will be delivered via intra process and // we should ignore this copy of the message. return; } } auto typed_message = std::static_pointer_cast(message); - callback_(typed_message); + if (any_callback_.shared_ptr_callback) { + any_callback_.shared_ptr_callback(typed_message); + } else if (any_callback_.shared_ptr_with_info_callback) { + any_callback_.shared_ptr_with_info_callback(typed_message, message_info); + } else if (any_callback_.unique_ptr_callback) { + std::unique_ptr unique_msg(new MessageT(*typed_message)); + any_callback_.unique_ptr_callback(unique_msg); + } else if (any_callback_.unique_ptr_with_info_callback) { + std::unique_ptr unique_msg(new MessageT(*typed_message)); + any_callback_.unique_ptr_with_info_callback(unique_msg, message_info); + } else { + throw std::runtime_error("unexpected message without any callback set"); + } } void return_message(std::shared_ptr & message) @@ -208,7 +224,9 @@ public: message_memory_strategy_->return_message(typed_message); } - void handle_intra_process_message(rcl_interfaces::msg::IntraProcessMessage & ipm) + void handle_intra_process_message( + rcl_interfaces::msg::IntraProcessMessage & ipm, + const rmw_message_info_t & message_info) { if (!get_intra_process_message_callback_) { // throw std::runtime_error( @@ -229,8 +247,19 @@ public: // TODO(wjwwood): should we notify someone of this? log error, log warning? return; } - typename MessageT::SharedPtr shared_msg = std::move(msg); - callback_(shared_msg); + if (any_callback_.shared_ptr_callback) { + typename MessageT::SharedPtr shared_msg = std::move(msg); + any_callback_.shared_ptr_callback(shared_msg); + } else if (any_callback_.shared_ptr_with_info_callback) { + typename MessageT::SharedPtr shared_msg = std::move(msg); + any_callback_.shared_ptr_with_info_callback(shared_msg, message_info); + } else if (any_callback_.unique_ptr_callback) { + any_callback_.unique_ptr_callback(msg); + } else if (any_callback_.unique_ptr_with_info_callback) { + any_callback_.unique_ptr_with_info_callback(msg, message_info); + } else { + throw std::runtime_error("unexpected message without any callback set"); + } } private: @@ -254,7 +283,7 @@ private: RCLCPP_DISABLE_COPY(Subscription); - CallbackType callback_; + AnySubscriptionCallback any_callback_; typename message_memory_strategy::MessageMemoryStrategy::SharedPtr message_memory_strategy_; diff --git a/rclcpp/include/rclcpp/utilities.hpp b/rclcpp/include/rclcpp/utilities.hpp index e0d2a11..fb913ba 100644 --- a/rclcpp/include/rclcpp/utilities.hpp +++ b/rclcpp/include/rclcpp/utilities.hpp @@ -192,7 +192,8 @@ sleep_for(const std::chrono::nanoseconds & nanoseconds) // TODO: determine if posix's nanosleep(2) is more efficient here std::unique_lock lock(::g_interrupt_mutex); auto cvs = ::g_interrupt_condition_variable.wait_for(lock, nanoseconds); - return cvs == std::cv_status::no_timeout; + // Return true if the timeout elapsed successfully, otherwise false. + return cvs != std::cv_status::no_timeout; } } /* namespace utilities */ diff --git a/rclcpp/package.xml b/rclcpp/package.xml index 50fb412..afae9eb 100644 --- a/rclcpp/package.xml +++ b/rclcpp/package.xml @@ -16,6 +16,7 @@ ament_cmake_gtest ament_lint_auto ament_lint_common + rmw ament_cmake