Merge pull request #98 from ros2/intra_process_img
Add missing API for the intra process demo
This commit is contained in:
		
						commit
						65300405be
					
				
					 10 changed files with 433 additions and 92 deletions
				
			
		
							
								
								
									
										127
									
								
								rclcpp/include/rclcpp/any_subscription_callback.hpp
									
										
									
									
									
										Normal file
									
								
							
							
						
						
									
										127
									
								
								rclcpp/include/rclcpp/any_subscription_callback.hpp
									
										
									
									
									
										Normal file
									
								
							| 
						 | 
				
			
			@ -0,0 +1,127 @@
 | 
			
		|||
// Copyright 2014 Open Source Robotics Foundation, Inc.
 | 
			
		||||
//
 | 
			
		||||
// Licensed under the Apache License, Version 2.0 (the "License");
 | 
			
		||||
// you may not use this file except in compliance with the License.
 | 
			
		||||
// You may obtain a copy of the License at
 | 
			
		||||
//
 | 
			
		||||
//     http://www.apache.org/licenses/LICENSE-2.0
 | 
			
		||||
//
 | 
			
		||||
// Unless required by applicable law or agreed to in writing, software
 | 
			
		||||
// distributed under the License is distributed on an "AS IS" BASIS,
 | 
			
		||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | 
			
		||||
// See the License for the specific language governing permissions and
 | 
			
		||||
// limitations under the License.
 | 
			
		||||
 | 
			
		||||
#ifndef RCLCPP_RCLCPP_ANY_SUBSCRIPTION_CALLBACK_HPP_
 | 
			
		||||
#define RCLCPP_RCLCPP_ANY_SUBSCRIPTION_CALLBACK_HPP_
 | 
			
		||||
 | 
			
		||||
#include <rclcpp/function_traits.hpp>
 | 
			
		||||
 | 
			
		||||
#include <functional>
 | 
			
		||||
#include <memory>
 | 
			
		||||
#include <type_traits>
 | 
			
		||||
 | 
			
		||||
#include <rmw/types.h>
 | 
			
		||||
 | 
			
		||||
namespace rclcpp
 | 
			
		||||
{
 | 
			
		||||
 | 
			
		||||
namespace any_subscription_callback
 | 
			
		||||
{
 | 
			
		||||
 | 
			
		||||
template<typename MessageT>
 | 
			
		||||
struct AnySubscriptionCallback
 | 
			
		||||
{
 | 
			
		||||
  using SharedPtrCallback = std::function<void(const std::shared_ptr<MessageT> &)>;
 | 
			
		||||
  using SharedPtrWithInfoCallback =
 | 
			
		||||
      std::function<void(const std::shared_ptr<MessageT> &, const rmw_message_info_t &)>;
 | 
			
		||||
  using UniquePtrCallback = std::function<void(std::unique_ptr<MessageT> &)>;
 | 
			
		||||
  using UniquePtrWithInfoCallback =
 | 
			
		||||
      std::function<void(std::unique_ptr<MessageT> &, const rmw_message_info_t &)>;
 | 
			
		||||
 | 
			
		||||
  SharedPtrCallback shared_ptr_callback;
 | 
			
		||||
  SharedPtrWithInfoCallback shared_ptr_with_info_callback;
 | 
			
		||||
  UniquePtrCallback unique_ptr_callback;
 | 
			
		||||
  UniquePtrWithInfoCallback unique_ptr_with_info_callback;
 | 
			
		||||
 | 
			
		||||
  AnySubscriptionCallback()
 | 
			
		||||
  : shared_ptr_callback(nullptr), shared_ptr_with_info_callback(nullptr),
 | 
			
		||||
    unique_ptr_callback(nullptr), unique_ptr_with_info_callback(nullptr)
 | 
			
		||||
  {}
 | 
			
		||||
 | 
			
		||||
  AnySubscriptionCallback(const AnySubscriptionCallback &) = default;
 | 
			
		||||
 | 
			
		||||
  template<typename CallbackT,
 | 
			
		||||
  typename std::enable_if<
 | 
			
		||||
    function_traits<CallbackT>::arity == 1
 | 
			
		||||
  >::type * = nullptr,
 | 
			
		||||
  typename std::enable_if<
 | 
			
		||||
    std::is_same<
 | 
			
		||||
      typename function_traits<CallbackT>::template argument_type<0>,
 | 
			
		||||
      typename std::shared_ptr<MessageT>
 | 
			
		||||
    >::value
 | 
			
		||||
  >::type * = nullptr
 | 
			
		||||
  >
 | 
			
		||||
  void set(CallbackT callback)
 | 
			
		||||
  {
 | 
			
		||||
    shared_ptr_callback = callback;
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  template<typename CallbackT,
 | 
			
		||||
  typename std::enable_if<
 | 
			
		||||
    function_traits<CallbackT>::arity == 2
 | 
			
		||||
  >::type * = nullptr,
 | 
			
		||||
  typename std::enable_if<
 | 
			
		||||
    std::is_same<
 | 
			
		||||
      typename function_traits<CallbackT>::template argument_type<0>,
 | 
			
		||||
      typename std::shared_ptr<MessageT>
 | 
			
		||||
    >::value
 | 
			
		||||
  >::type * = nullptr
 | 
			
		||||
  >
 | 
			
		||||
  void set(CallbackT callback)
 | 
			
		||||
  {
 | 
			
		||||
    shared_ptr_with_info_callback = callback;
 | 
			
		||||
  }
 | 
			
		||||
/*
 | 
			
		||||
  template<typename CallbackT,
 | 
			
		||||
    typename std::enable_if<
 | 
			
		||||
      function_traits<CallbackT>::arity == 1
 | 
			
		||||
    >::type * = nullptr,
 | 
			
		||||
    typename std::enable_if<
 | 
			
		||||
      std::is_same<
 | 
			
		||||
        typename function_traits<CallbackT>::template argument_type<0>,
 | 
			
		||||
        typename std::unique_ptr<MessageT>
 | 
			
		||||
      >::value
 | 
			
		||||
    >::type * = nullptr
 | 
			
		||||
  >
 | 
			
		||||
  void set(CallbackT callback)
 | 
			
		||||
  {
 | 
			
		||||
    static_assert(std::is_same<
 | 
			
		||||
        typename function_traits<CallbackT>::template argument_type<0>,
 | 
			
		||||
        typename std::unique_ptr<MessageT>
 | 
			
		||||
      >::value, "Not a unique pointer");
 | 
			
		||||
    unique_ptr_callback = callback;
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  template<typename CallbackT,
 | 
			
		||||
    typename std::enable_if<
 | 
			
		||||
      function_traits<CallbackT>::arity == 2
 | 
			
		||||
    >::type * = nullptr,
 | 
			
		||||
    typename std::enable_if<
 | 
			
		||||
      std::is_same<
 | 
			
		||||
        typename function_traits<CallbackT>::template argument_type<0>,
 | 
			
		||||
        typename std::unique_ptr<MessageT>
 | 
			
		||||
      >::value
 | 
			
		||||
    >::type * = nullptr
 | 
			
		||||
  >
 | 
			
		||||
  void set(CallbackT callback)
 | 
			
		||||
  {
 | 
			
		||||
    unique_ptr_with_info_callback = callback;
 | 
			
		||||
  }
 | 
			
		||||
  */
 | 
			
		||||
};
 | 
			
		||||
 | 
			
		||||
} /* namespace any_subscription_callback */
 | 
			
		||||
} /* namespace rclcpp */
 | 
			
		||||
 | 
			
		||||
#endif /* RCLCPP_RCLCPP_ANY_SUBSCRIPTION_CALLBACK_HPP_ */
 | 
			
		||||
| 
						 | 
				
			
			@ -64,7 +64,7 @@ public:
 | 
			
		|||
  virtual void spin() = 0;
 | 
			
		||||
 | 
			
		||||
  virtual void
 | 
			
		||||
  add_node(rclcpp::node::Node::SharedPtr & node_ptr, bool notify = true)
 | 
			
		||||
  add_node(rclcpp::node::Node::SharedPtr node_ptr, bool notify = true)
 | 
			
		||||
  {
 | 
			
		||||
    // Check to ensure node not already added
 | 
			
		||||
    for (auto & weak_node : weak_nodes_) {
 | 
			
		||||
| 
						 | 
				
			
			@ -190,7 +190,8 @@ protected:
 | 
			
		|||
      rmw_take_with_info(subscription->subscription_handle_, message.get(), &taken, &message_info);
 | 
			
		||||
    if (ret == RMW_RET_OK) {
 | 
			
		||||
      if (taken) {
 | 
			
		||||
        subscription->handle_message(message, &message_info.publisher_gid);
 | 
			
		||||
        message_info.from_intra_process = false;
 | 
			
		||||
        subscription->handle_message(message, message_info);
 | 
			
		||||
      }
 | 
			
		||||
    } else {
 | 
			
		||||
      fprintf(stderr,
 | 
			
		||||
| 
						 | 
				
			
			@ -206,10 +207,16 @@ protected:
 | 
			
		|||
  {
 | 
			
		||||
    rcl_interfaces::msg::IntraProcessMessage ipm;
 | 
			
		||||
    bool taken = false;
 | 
			
		||||
    rmw_ret_t status = rmw_take(subscription->intra_process_subscription_handle_, &ipm, &taken);
 | 
			
		||||
    rmw_message_info_t message_info;
 | 
			
		||||
    rmw_ret_t status = rmw_take_with_info(
 | 
			
		||||
      subscription->intra_process_subscription_handle_,
 | 
			
		||||
      &ipm,
 | 
			
		||||
      &taken,
 | 
			
		||||
      &message_info);
 | 
			
		||||
    if (status == RMW_RET_OK) {
 | 
			
		||||
      if (taken) {
 | 
			
		||||
        subscription->handle_intra_process_message(ipm);
 | 
			
		||||
        message_info.from_intra_process = true;
 | 
			
		||||
        subscription->handle_intra_process_message(ipm, message_info);
 | 
			
		||||
      }
 | 
			
		||||
    } else {
 | 
			
		||||
      fprintf(stderr,
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
							
								
								
									
										63
									
								
								rclcpp/include/rclcpp/function_traits.hpp
									
										
									
									
									
										Normal file
									
								
							
							
						
						
									
										63
									
								
								rclcpp/include/rclcpp/function_traits.hpp
									
										
									
									
									
										Normal file
									
								
							| 
						 | 
				
			
			@ -0,0 +1,63 @@
 | 
			
		|||
// Copyright 2014 Open Source Robotics Foundation, Inc.
 | 
			
		||||
//
 | 
			
		||||
// Licensed under the Apache License, Version 2.0 (the "License");
 | 
			
		||||
// you may not use this file except in compliance with the License.
 | 
			
		||||
// You may obtain a copy of the License at
 | 
			
		||||
//
 | 
			
		||||
//     http://www.apache.org/licenses/LICENSE-2.0
 | 
			
		||||
//
 | 
			
		||||
// Unless required by applicable law or agreed to in writing, software
 | 
			
		||||
// distributed under the License is distributed on an "AS IS" BASIS,
 | 
			
		||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | 
			
		||||
// See the License for the specific language governing permissions and
 | 
			
		||||
// limitations under the License.
 | 
			
		||||
 | 
			
		||||
#ifndef RCLCPP_RCLCPP_FUNCTION_TRAITS_HPP_
 | 
			
		||||
#define RCLCPP_RCLCPP_FUNCTION_TRAITS_HPP_
 | 
			
		||||
#include <memory>
 | 
			
		||||
 | 
			
		||||
namespace rclcpp
 | 
			
		||||
{
 | 
			
		||||
 | 
			
		||||
/* NOTE(esteve):
 | 
			
		||||
 * We support service callbacks that can optionally take the request id,
 | 
			
		||||
 * which should be possible with two overloaded create_service methods,
 | 
			
		||||
 * but unfortunately std::function's constructor on VS2015 is too greedy,
 | 
			
		||||
 * so we need a mechanism for checking the arity and the type of each argument
 | 
			
		||||
 * in a callback function.
 | 
			
		||||
 */
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
template<typename FunctionT>
 | 
			
		||||
struct function_traits
 | 
			
		||||
{
 | 
			
		||||
  static constexpr std::size_t arity =
 | 
			
		||||
    function_traits<decltype( & FunctionT::operator())>::arity - 1;
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
  template<std::size_t N>
 | 
			
		||||
  using argument_type =
 | 
			
		||||
      typename function_traits<decltype( & FunctionT::operator())>::template argument_type<N + 1>;
 | 
			
		||||
};
 | 
			
		||||
 | 
			
		||||
template<typename ReturnTypeT, typename ... Args>
 | 
			
		||||
struct function_traits<ReturnTypeT(Args ...)>
 | 
			
		||||
{
 | 
			
		||||
  static constexpr std::size_t arity = sizeof ... (Args);
 | 
			
		||||
 | 
			
		||||
  template<std::size_t N>
 | 
			
		||||
  using argument_type = typename std::tuple_element<N, std::tuple<Args ...>>::type;
 | 
			
		||||
};
 | 
			
		||||
 | 
			
		||||
template<typename ReturnTypeT, typename ... Args>
 | 
			
		||||
struct function_traits<ReturnTypeT (*)(Args ...)>: public function_traits<ReturnTypeT(Args ...)>
 | 
			
		||||
{};
 | 
			
		||||
 | 
			
		||||
template<typename ClassT, typename ReturnTypeT, typename ... Args>
 | 
			
		||||
struct function_traits<ReturnTypeT (ClassT::*)(Args ...) const>
 | 
			
		||||
  : public function_traits<ReturnTypeT(ClassT &, Args ...)>
 | 
			
		||||
{};
 | 
			
		||||
 | 
			
		||||
} /* namespace rclcpp */
 | 
			
		||||
 | 
			
		||||
#endif /* RCLCPP_RCLCPP_FUNCTION_TRAITS_HPP_ */
 | 
			
		||||
| 
						 | 
				
			
			@ -29,6 +29,7 @@
 | 
			
		|||
#include <rclcpp/callback_group.hpp>
 | 
			
		||||
#include <rclcpp/client.hpp>
 | 
			
		||||
#include <rclcpp/context.hpp>
 | 
			
		||||
#include <rclcpp/function_traits.hpp>
 | 
			
		||||
#include <rclcpp/macros.hpp>
 | 
			
		||||
#include <rclcpp/message_memory_strategy.hpp>
 | 
			
		||||
#include <rclcpp/parameter.hpp>
 | 
			
		||||
| 
						 | 
				
			
			@ -56,42 +57,6 @@ class Executor;
 | 
			
		|||
namespace node
 | 
			
		||||
{
 | 
			
		||||
 | 
			
		||||
/* NOTE(esteve):
 | 
			
		||||
 * We support service callbacks that can optionally take the request id,
 | 
			
		||||
 * which should be possible with two overloaded create_service methods,
 | 
			
		||||
 * but unfortunately std::function's constructor on VS2015 is too greedy,
 | 
			
		||||
 * so we need a mechanism for checking the arity and the type of each argument
 | 
			
		||||
 * in a callback function.
 | 
			
		||||
 */
 | 
			
		||||
template<typename FunctionT>
 | 
			
		||||
struct function_traits
 | 
			
		||||
{
 | 
			
		||||
  static constexpr std::size_t arity =
 | 
			
		||||
    function_traits<decltype( & FunctionT::operator())>::arity - 1;
 | 
			
		||||
 | 
			
		||||
  template<std::size_t N>
 | 
			
		||||
  using argument_type =
 | 
			
		||||
      typename function_traits<decltype( & FunctionT::operator())>::template argument_type<N + 1>;
 | 
			
		||||
};
 | 
			
		||||
 | 
			
		||||
template<typename ReturnTypeT, typename ... Args>
 | 
			
		||||
struct function_traits<ReturnTypeT(Args ...)>
 | 
			
		||||
{
 | 
			
		||||
  static constexpr std::size_t arity = sizeof ... (Args);
 | 
			
		||||
 | 
			
		||||
  template<std::size_t N>
 | 
			
		||||
  using argument_type = typename std::tuple_element<N, std::tuple<Args ...>>::type;
 | 
			
		||||
};
 | 
			
		||||
 | 
			
		||||
template<typename ReturnTypeT, typename ... Args>
 | 
			
		||||
struct function_traits<ReturnTypeT (*)(Args ...)>: public function_traits<ReturnTypeT(Args ...)>
 | 
			
		||||
{};
 | 
			
		||||
 | 
			
		||||
template<typename ClassT, typename ReturnTypeT, typename ... Args>
 | 
			
		||||
struct function_traits<ReturnTypeT (ClassT::*)(Args ...) const>
 | 
			
		||||
  : public function_traits<ReturnTypeT(ClassT &, Args ...)>
 | 
			
		||||
{};
 | 
			
		||||
 | 
			
		||||
/* ROS Node Interface.
 | 
			
		||||
 *
 | 
			
		||||
 * This is the single point of entry for creating publishers and subscribers.
 | 
			
		||||
| 
						 | 
				
			
			@ -130,12 +95,23 @@ public:
 | 
			
		|||
     Windows build breaks when static member function passed as default
 | 
			
		||||
     argument to msg_mem_strat, nullptr is a workaround.
 | 
			
		||||
   */
 | 
			
		||||
  template<typename MessageT>
 | 
			
		||||
  template<typename MessageT, typename CallbackT>
 | 
			
		||||
  typename rclcpp::subscription::Subscription<MessageT>::SharedPtr
 | 
			
		||||
  create_subscription(
 | 
			
		||||
    const std::string & topic_name,
 | 
			
		||||
    const rmw_qos_profile_t & qos_profile,
 | 
			
		||||
    std::function<void(const std::shared_ptr<MessageT> &)> callback,
 | 
			
		||||
    CallbackT callback,
 | 
			
		||||
    rclcpp::callback_group::CallbackGroup::SharedPtr group = nullptr,
 | 
			
		||||
    bool ignore_local_publications = false,
 | 
			
		||||
    typename rclcpp::message_memory_strategy::MessageMemoryStrategy<MessageT>::SharedPtr
 | 
			
		||||
    msg_mem_strat = nullptr);
 | 
			
		||||
 | 
			
		||||
  template<typename MessageT>
 | 
			
		||||
  typename rclcpp::subscription::Subscription<MessageT>::SharedPtr
 | 
			
		||||
  create_subscription_with_unique_ptr_callback(
 | 
			
		||||
    const std::string & topic_name,
 | 
			
		||||
    const rmw_qos_profile_t & qos_profile,
 | 
			
		||||
    typename rclcpp::subscription::AnySubscriptionCallback<MessageT>::UniquePtrCallback callback,
 | 
			
		||||
    rclcpp::callback_group::CallbackGroup::SharedPtr group = nullptr,
 | 
			
		||||
    bool ignore_local_publications = false,
 | 
			
		||||
    typename rclcpp::message_memory_strategy::MessageMemoryStrategy<MessageT>::SharedPtr
 | 
			
		||||
| 
						 | 
				
			
			@ -148,11 +124,12 @@ public:
 | 
			
		|||
    rclcpp::timer::CallbackType callback,
 | 
			
		||||
    rclcpp::callback_group::CallbackGroup::SharedPtr group = nullptr);
 | 
			
		||||
 | 
			
		||||
  rclcpp::timer::WallTimer::SharedPtr
 | 
			
		||||
  create_wall_timer(
 | 
			
		||||
    std::chrono::duration<long double, std::nano> period,
 | 
			
		||||
    rclcpp::timer::CallbackType callback,
 | 
			
		||||
    rclcpp::callback_group::CallbackGroup::SharedPtr group = nullptr);
 | 
			
		||||
  // TODO(wjwwood): reenable this once I figure out why the demo doesn't build with it.
 | 
			
		||||
  // rclcpp::timer::WallTimer::SharedPtr
 | 
			
		||||
  // create_wall_timer(
 | 
			
		||||
  //   std::chrono::duration<long double, std::nano> period,
 | 
			
		||||
  //   rclcpp::timer::CallbackType callback,
 | 
			
		||||
  //   rclcpp::callback_group::CallbackGroup::SharedPtr group = nullptr);
 | 
			
		||||
 | 
			
		||||
  using CallbackGroup = rclcpp::callback_group::CallbackGroup;
 | 
			
		||||
  using CallbackGroupWeakPtr = std::weak_ptr<CallbackGroup>;
 | 
			
		||||
| 
						 | 
				
			
			@ -227,6 +204,16 @@ private:
 | 
			
		|||
 | 
			
		||||
  publisher::Publisher::SharedPtr events_publisher_;
 | 
			
		||||
 | 
			
		||||
  template<typename MessageT>
 | 
			
		||||
  typename subscription::Subscription<MessageT>::SharedPtr
 | 
			
		||||
  create_subscription_internal(
 | 
			
		||||
    const std::string & topic_name,
 | 
			
		||||
    const rmw_qos_profile_t & qos_profile,
 | 
			
		||||
    rclcpp::subscription::AnySubscriptionCallback<MessageT> callback,
 | 
			
		||||
    rclcpp::callback_group::CallbackGroup::SharedPtr group,
 | 
			
		||||
    bool ignore_local_publications,
 | 
			
		||||
    typename message_memory_strategy::MessageMemoryStrategy<MessageT>::SharedPtr msg_mem_strat);
 | 
			
		||||
 | 
			
		||||
  template<
 | 
			
		||||
    typename ServiceT,
 | 
			
		||||
    typename FunctorT,
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -153,7 +153,7 @@ Node::create_publisher(
 | 
			
		|||
    rclcpp::intra_process_manager::IntraProcessManager::WeakPtr weak_ipm = intra_process_manager;
 | 
			
		||||
    // *INDENT-OFF*
 | 
			
		||||
    auto shared_publish_callback =
 | 
			
		||||
      [weak_ipm](uint64_t publisher_id, std::shared_ptr<void> msg) -> uint64_t
 | 
			
		||||
      [weak_ipm](uint64_t publisher_id, void * msg, const std::type_info & type_info) -> uint64_t
 | 
			
		||||
    {
 | 
			
		||||
      auto ipm = weak_ipm.lock();
 | 
			
		||||
      if (!ipm) {
 | 
			
		||||
| 
						 | 
				
			
			@ -161,8 +161,17 @@ Node::create_publisher(
 | 
			
		|||
        throw std::runtime_error(
 | 
			
		||||
          "intra process publish called after destruction of intra process manager");
 | 
			
		||||
      }
 | 
			
		||||
      auto typed_msg = std::static_pointer_cast<const MessageT>(msg);
 | 
			
		||||
      std::unique_ptr<MessageT> unique_msg(new MessageT(*typed_msg));
 | 
			
		||||
      if (!msg) {
 | 
			
		||||
        throw std::runtime_error("cannot publisher msg which is a null pointer");
 | 
			
		||||
      }
 | 
			
		||||
      auto & message_type_info = typeid(MessageT);
 | 
			
		||||
      if (message_type_info != type_info) {
 | 
			
		||||
        throw std::runtime_error(
 | 
			
		||||
          std::string("published type '") + type_info.name() +
 | 
			
		||||
          "' is incompatible from the publisher type '" + message_type_info.name() + "'");
 | 
			
		||||
      }
 | 
			
		||||
      MessageT * typed_message_ptr = static_cast<MessageT *>(msg);
 | 
			
		||||
      std::unique_ptr<MessageT> unique_msg(typed_message_ptr);
 | 
			
		||||
      uint64_t message_seq = ipm->store_intra_process_message(publisher_id, unique_msg);
 | 
			
		||||
      return message_seq;
 | 
			
		||||
    };
 | 
			
		||||
| 
						 | 
				
			
			@ -188,12 +197,56 @@ Node::group_in_node(callback_group::CallbackGroup::SharedPtr & group)
 | 
			
		|||
  return group_belongs_to_this_node;
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
template<typename MessageT>
 | 
			
		||||
typename subscription::Subscription<MessageT>::SharedPtr
 | 
			
		||||
template<typename MessageT, typename CallbackT>
 | 
			
		||||
typename rclcpp::subscription::Subscription<MessageT>::SharedPtr
 | 
			
		||||
Node::create_subscription(
 | 
			
		||||
  const std::string & topic_name,
 | 
			
		||||
  const rmw_qos_profile_t & qos_profile,
 | 
			
		||||
  std::function<void(const std::shared_ptr<MessageT> &)> callback,
 | 
			
		||||
  CallbackT callback,
 | 
			
		||||
  rclcpp::callback_group::CallbackGroup::SharedPtr group,
 | 
			
		||||
  bool ignore_local_publications,
 | 
			
		||||
  typename rclcpp::message_memory_strategy::MessageMemoryStrategy<MessageT>::SharedPtr
 | 
			
		||||
  msg_mem_strat)
 | 
			
		||||
{
 | 
			
		||||
  rclcpp::subscription::AnySubscriptionCallback<MessageT> any_subscription_callback;
 | 
			
		||||
  any_subscription_callback.set(callback);
 | 
			
		||||
  return this->create_subscription_internal(
 | 
			
		||||
    topic_name,
 | 
			
		||||
    qos_profile,
 | 
			
		||||
    any_subscription_callback,
 | 
			
		||||
    group,
 | 
			
		||||
    ignore_local_publications,
 | 
			
		||||
    msg_mem_strat);
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
template<typename MessageT>
 | 
			
		||||
typename rclcpp::subscription::Subscription<MessageT>::SharedPtr
 | 
			
		||||
Node::create_subscription_with_unique_ptr_callback(
 | 
			
		||||
  const std::string & topic_name,
 | 
			
		||||
  const rmw_qos_profile_t & qos_profile,
 | 
			
		||||
  typename rclcpp::subscription::AnySubscriptionCallback<MessageT>::UniquePtrCallback callback,
 | 
			
		||||
  rclcpp::callback_group::CallbackGroup::SharedPtr group,
 | 
			
		||||
  bool ignore_local_publications,
 | 
			
		||||
  typename rclcpp::message_memory_strategy::MessageMemoryStrategy<MessageT>::SharedPtr
 | 
			
		||||
  msg_mem_strat)
 | 
			
		||||
{
 | 
			
		||||
  rclcpp::subscription::AnySubscriptionCallback<MessageT> any_subscription_callback;
 | 
			
		||||
  any_subscription_callback.unique_ptr_callback = callback;
 | 
			
		||||
  return this->create_subscription_internal(
 | 
			
		||||
    topic_name,
 | 
			
		||||
    qos_profile,
 | 
			
		||||
    any_subscription_callback,
 | 
			
		||||
    group,
 | 
			
		||||
    ignore_local_publications,
 | 
			
		||||
    msg_mem_strat);
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
template<typename MessageT>
 | 
			
		||||
typename subscription::Subscription<MessageT>::SharedPtr
 | 
			
		||||
Node::create_subscription_internal(
 | 
			
		||||
  const std::string & topic_name,
 | 
			
		||||
  const rmw_qos_profile_t & qos_profile,
 | 
			
		||||
  rclcpp::subscription::AnySubscriptionCallback<MessageT> callback,
 | 
			
		||||
  rclcpp::callback_group::CallbackGroup::SharedPtr group,
 | 
			
		||||
  bool ignore_local_publications,
 | 
			
		||||
  typename message_memory_strategy::MessageMemoryStrategy<MessageT>::SharedPtr msg_mem_strat)
 | 
			
		||||
| 
						 | 
				
			
			@ -305,17 +358,18 @@ Node::create_wall_timer(
 | 
			
		|||
  return timer;
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
rclcpp::timer::WallTimer::SharedPtr
 | 
			
		||||
Node::create_wall_timer(
 | 
			
		||||
  std::chrono::duration<long double, std::nano> period,
 | 
			
		||||
  rclcpp::timer::CallbackType callback,
 | 
			
		||||
  rclcpp::callback_group::CallbackGroup::SharedPtr group)
 | 
			
		||||
{
 | 
			
		||||
  return create_wall_timer(
 | 
			
		||||
    std::chrono::duration_cast<std::chrono::nanoseconds>(period),
 | 
			
		||||
    callback,
 | 
			
		||||
    group);
 | 
			
		||||
}
 | 
			
		||||
// TODO(wjwwood): reenable this once I figure out why the demo doesn't build with it.
 | 
			
		||||
// rclcpp::timer::WallTimer::SharedPtr
 | 
			
		||||
// Node::create_wall_timer(
 | 
			
		||||
//   std::chrono::duration<long double, std::nano> period,
 | 
			
		||||
//   rclcpp::timer::CallbackType callback,
 | 
			
		||||
//   rclcpp::callback_group::CallbackGroup::SharedPtr group)
 | 
			
		||||
// {
 | 
			
		||||
//   return create_wall_timer(
 | 
			
		||||
//     std::chrono::duration_cast<std::chrono::nanoseconds>(period),
 | 
			
		||||
//     callback,
 | 
			
		||||
//     group);
 | 
			
		||||
// }
 | 
			
		||||
 | 
			
		||||
template<typename ServiceT>
 | 
			
		||||
typename client::Client<ServiceT>::SharedPtr
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -229,7 +229,7 @@ public:
 | 
			
		|||
 | 
			
		||||
  template<typename FunctorT>
 | 
			
		||||
  typename rclcpp::subscription::Subscription<rcl_interfaces::msg::ParameterEvent>::SharedPtr
 | 
			
		||||
  on_parameter_event(FunctorT & callback)
 | 
			
		||||
  on_parameter_event(FunctorT callback)
 | 
			
		||||
  {
 | 
			
		||||
    return node_->create_subscription<rcl_interfaces::msg::ParameterEvent>(
 | 
			
		||||
      "parameter_events", rmw_qos_profile_parameter_events, callback);
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -19,6 +19,7 @@
 | 
			
		|||
 | 
			
		||||
#include <iostream>
 | 
			
		||||
#include <memory>
 | 
			
		||||
#include <mutex>
 | 
			
		||||
#include <sstream>
 | 
			
		||||
#include <string>
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -87,31 +88,73 @@ public:
 | 
			
		|||
 | 
			
		||||
  template<typename MessageT>
 | 
			
		||||
  void
 | 
			
		||||
  publish(std::shared_ptr<MessageT> & msg)
 | 
			
		||||
  publish(std::unique_ptr<MessageT> & msg)
 | 
			
		||||
  {
 | 
			
		||||
    rmw_ret_t status;
 | 
			
		||||
    status = rmw_publish(publisher_handle_, msg.get());
 | 
			
		||||
    if (status != RMW_RET_OK) {
 | 
			
		||||
      // *INDENT-OFF* (prevent uncrustify from making unecessary indents here)
 | 
			
		||||
      throw std::runtime_error(
 | 
			
		||||
        std::string("failed to publish message: ") + rmw_get_error_string_safe());
 | 
			
		||||
      // *INDENT-ON*
 | 
			
		||||
    }
 | 
			
		||||
    this->do_inter_process_publish<MessageT>(msg.get());
 | 
			
		||||
    if (store_intra_process_message_) {
 | 
			
		||||
      uint64_t message_seq = store_intra_process_message_(intra_process_publisher_id_, msg);
 | 
			
		||||
      // Take the pointer from the unique_msg, release it and pass as a void *
 | 
			
		||||
      // to the ipm. The ipm should then capture it again as a unique_ptr of
 | 
			
		||||
      // the correct type.
 | 
			
		||||
      // TODO(wjwwood):
 | 
			
		||||
      //   investigate how to transfer the custom deleter (if there is one)
 | 
			
		||||
      //   from the incoming unique_ptr through to the ipm's unique_ptr.
 | 
			
		||||
      //   See: http://stackoverflow.com/questions/11002641/dynamic-casting-for-unique-ptr
 | 
			
		||||
      MessageT * msg_ptr = msg.get();
 | 
			
		||||
      msg.release();
 | 
			
		||||
      uint64_t message_seq;
 | 
			
		||||
      {
 | 
			
		||||
        std::lock_guard<std::mutex> lock(intra_process_publish_mutex_);
 | 
			
		||||
        message_seq =
 | 
			
		||||
          store_intra_process_message_(intra_process_publisher_id_, msg_ptr, typeid(MessageT));
 | 
			
		||||
      }
 | 
			
		||||
      rcl_interfaces::msg::IntraProcessMessage ipm;
 | 
			
		||||
      ipm.publisher_id = intra_process_publisher_id_;
 | 
			
		||||
      ipm.message_sequence = message_seq;
 | 
			
		||||
      status = rmw_publish(intra_process_publisher_handle_, &ipm);
 | 
			
		||||
      auto status = rmw_publish(intra_process_publisher_handle_, &ipm);
 | 
			
		||||
      if (status != RMW_RET_OK) {
 | 
			
		||||
        // *INDENT-OFF* (prevent uncrustify from making unecessary indents here)
 | 
			
		||||
        throw std::runtime_error(
 | 
			
		||||
          std::string("failed to publish intra process message: ") + rmw_get_error_string_safe());
 | 
			
		||||
        // *INDENT-ON*
 | 
			
		||||
      }
 | 
			
		||||
    } else {
 | 
			
		||||
      // Always destroy the message, even if we don't consume it, for consistency.
 | 
			
		||||
      msg.reset();
 | 
			
		||||
    }
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  template<typename MessageT>
 | 
			
		||||
  void
 | 
			
		||||
  publish(const std::shared_ptr<MessageT> & msg)
 | 
			
		||||
  {
 | 
			
		||||
    // Avoid allocating when not using intra process.
 | 
			
		||||
    if (!store_intra_process_message_) {
 | 
			
		||||
      // In this case we're not using intra process.
 | 
			
		||||
      return this->do_inter_process_publish(msg.get());
 | 
			
		||||
    }
 | 
			
		||||
    // Otherwise we have to allocate memory in a unique_ptr and pass it along.
 | 
			
		||||
    // TODO(wjwwood):
 | 
			
		||||
    //   The intra process manager should probably also be able to store
 | 
			
		||||
    //   shared_ptr's and do the "smart" thing based on other intra process
 | 
			
		||||
    //   subscriptions. For now call the other publish().
 | 
			
		||||
    std::unique_ptr<MessageT> unique_msg(new MessageT(*msg.get()));
 | 
			
		||||
    return this->publish(unique_msg);
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  template<typename MessageT>
 | 
			
		||||
  void
 | 
			
		||||
  publish(const MessageT & msg)
 | 
			
		||||
  {
 | 
			
		||||
    // Avoid allocating when not using intra process.
 | 
			
		||||
    if (!store_intra_process_message_) {
 | 
			
		||||
      // In this case we're not using intra process.
 | 
			
		||||
      return this->do_inter_process_publish(msg.get());
 | 
			
		||||
    }
 | 
			
		||||
    // Otherwise we have to allocate memory in a unique_ptr and pass it along.
 | 
			
		||||
    std::unique_ptr<MessageT> unique_msg(new MessageT(msg));
 | 
			
		||||
    return this->publish(unique_msg);
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  const std::string &
 | 
			
		||||
  get_topic_name() const
 | 
			
		||||
  {
 | 
			
		||||
| 
						 | 
				
			
			@ -161,13 +204,27 @@ public:
 | 
			
		|||
    return result;
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  typedef std::function<uint64_t(uint64_t, std::shared_ptr<void>)> StoreSharedMessageCallbackT;
 | 
			
		||||
  typedef std::function<uint64_t(uint64_t, void *, const std::type_info &)> StoreMessageCallbackT;
 | 
			
		||||
 | 
			
		||||
protected:
 | 
			
		||||
  template<typename MessageT>
 | 
			
		||||
  void
 | 
			
		||||
  do_inter_process_publish(MessageT * msg)
 | 
			
		||||
  {
 | 
			
		||||
    auto status = rmw_publish(publisher_handle_, msg);
 | 
			
		||||
    if (status != RMW_RET_OK) {
 | 
			
		||||
      // *INDENT-OFF* (prevent uncrustify from making unecessary indents here)
 | 
			
		||||
      throw std::runtime_error(
 | 
			
		||||
        std::string("failed to publish message: ") + rmw_get_error_string_safe());
 | 
			
		||||
      // *INDENT-ON*
 | 
			
		||||
    }
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
  void
 | 
			
		||||
  setup_intra_process(
 | 
			
		||||
    uint64_t intra_process_publisher_id,
 | 
			
		||||
    StoreSharedMessageCallbackT callback,
 | 
			
		||||
    StoreMessageCallbackT callback,
 | 
			
		||||
    rmw_publisher_t * intra_process_publisher_handle)
 | 
			
		||||
  {
 | 
			
		||||
    intra_process_publisher_id_ = intra_process_publisher_id;
 | 
			
		||||
| 
						 | 
				
			
			@ -194,11 +251,13 @@ private:
 | 
			
		|||
  size_t queue_size_;
 | 
			
		||||
 | 
			
		||||
  uint64_t intra_process_publisher_id_;
 | 
			
		||||
  StoreSharedMessageCallbackT store_intra_process_message_;
 | 
			
		||||
  StoreMessageCallbackT store_intra_process_message_;
 | 
			
		||||
 | 
			
		||||
  rmw_gid_t rmw_gid_;
 | 
			
		||||
  rmw_gid_t intra_process_rmw_gid_;
 | 
			
		||||
 | 
			
		||||
  std::mutex intra_process_publish_mutex_;
 | 
			
		||||
 | 
			
		||||
};
 | 
			
		||||
 | 
			
		||||
} /* namespace publisher */
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -39,6 +39,18 @@ const std::chrono::nanoseconds operator"" _s(long double s)
 | 
			
		|||
    std::chrono::duration<long double>(s));
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
const std::chrono::nanoseconds
 | 
			
		||||
operator"" _ms(unsigned long long ms)
 | 
			
		||||
{
 | 
			
		||||
  return std::chrono::milliseconds(ms);
 | 
			
		||||
}
 | 
			
		||||
const std::chrono::nanoseconds
 | 
			
		||||
operator"" _ms(long double ms)
 | 
			
		||||
{
 | 
			
		||||
  return std::chrono::duration_cast<std::chrono::nanoseconds>(
 | 
			
		||||
    std::chrono::duration<long double, std::milli>(ms));
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
const std::chrono::nanoseconds
 | 
			
		||||
operator"" _ns(unsigned long long ns)
 | 
			
		||||
{
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -27,6 +27,7 @@
 | 
			
		|||
 | 
			
		||||
#include <rclcpp/macros.hpp>
 | 
			
		||||
#include <rclcpp/message_memory_strategy.hpp>
 | 
			
		||||
#include <rclcpp/any_subscription_callback.hpp>
 | 
			
		||||
 | 
			
		||||
namespace rclcpp
 | 
			
		||||
{
 | 
			
		||||
| 
						 | 
				
			
			@ -94,9 +95,13 @@ public:
 | 
			
		|||
  }
 | 
			
		||||
 | 
			
		||||
  virtual std::shared_ptr<void> create_message() = 0;
 | 
			
		||||
  virtual void handle_message(std::shared_ptr<void> & message, const rmw_gid_t * sender_id) = 0;
 | 
			
		||||
  virtual void handle_message(
 | 
			
		||||
    std::shared_ptr<void> & message,
 | 
			
		||||
    const rmw_message_info_t & message_info) = 0;
 | 
			
		||||
  virtual void return_message(std::shared_ptr<void> & message) = 0;
 | 
			
		||||
  virtual void handle_intra_process_message(rcl_interfaces::msg::IntraProcessMessage & ipm) = 0;
 | 
			
		||||
  virtual void handle_intra_process_message(
 | 
			
		||||
    rcl_interfaces::msg::IntraProcessMessage & ipm,
 | 
			
		||||
    const rmw_message_info_t & message_info) = 0;
 | 
			
		||||
 | 
			
		||||
protected:
 | 
			
		||||
  rmw_subscription_t * intra_process_subscription_handle_;
 | 
			
		||||
| 
						 | 
				
			
			@ -113,13 +118,14 @@ private:
 | 
			
		|||
 | 
			
		||||
};
 | 
			
		||||
 | 
			
		||||
using namespace any_subscription_callback;
 | 
			
		||||
 | 
			
		||||
template<typename MessageT>
 | 
			
		||||
class Subscription : public SubscriptionBase
 | 
			
		||||
{
 | 
			
		||||
  friend class rclcpp::node::Node;
 | 
			
		||||
 | 
			
		||||
public:
 | 
			
		||||
  using CallbackType = std::function<void(const std::shared_ptr<MessageT> &)>;
 | 
			
		||||
  RCLCPP_SMART_PTR_DEFINITIONS(Subscription);
 | 
			
		||||
 | 
			
		||||
  Subscription(
 | 
			
		||||
| 
						 | 
				
			
			@ -127,11 +133,11 @@ public:
 | 
			
		|||
    rmw_subscription_t * subscription_handle,
 | 
			
		||||
    const std::string & topic_name,
 | 
			
		||||
    bool ignore_local_publications,
 | 
			
		||||
    CallbackType callback,
 | 
			
		||||
    AnySubscriptionCallback<MessageT> callback,
 | 
			
		||||
    typename message_memory_strategy::MessageMemoryStrategy<MessageT>::SharedPtr memory_strategy =
 | 
			
		||||
    message_memory_strategy::MessageMemoryStrategy<MessageT>::create_default())
 | 
			
		||||
  : SubscriptionBase(node_handle, subscription_handle, topic_name, ignore_local_publications),
 | 
			
		||||
    callback_(callback),
 | 
			
		||||
    any_callback_(callback),
 | 
			
		||||
    message_memory_strategy_(memory_strategy),
 | 
			
		||||
    get_intra_process_message_callback_(nullptr),
 | 
			
		||||
    matches_any_intra_process_publishers_(nullptr)
 | 
			
		||||
| 
						 | 
				
			
			@ -148,17 +154,29 @@ public:
 | 
			
		|||
    return message_memory_strategy_->borrow_message();
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  void handle_message(std::shared_ptr<void> & message, const rmw_gid_t * sender_id)
 | 
			
		||||
  void handle_message(std::shared_ptr<void> & message, const rmw_message_info_t & message_info)
 | 
			
		||||
  {
 | 
			
		||||
    if (matches_any_intra_process_publishers_) {
 | 
			
		||||
      if (matches_any_intra_process_publishers_(sender_id)) {
 | 
			
		||||
      if (matches_any_intra_process_publishers_(&message_info.publisher_gid)) {
 | 
			
		||||
        // In this case, the message will be delivered via intra process and
 | 
			
		||||
        // we should ignore this copy of the message.
 | 
			
		||||
        return;
 | 
			
		||||
      }
 | 
			
		||||
    }
 | 
			
		||||
    auto typed_message = std::static_pointer_cast<MessageT>(message);
 | 
			
		||||
    callback_(typed_message);
 | 
			
		||||
    if (any_callback_.shared_ptr_callback) {
 | 
			
		||||
      any_callback_.shared_ptr_callback(typed_message);
 | 
			
		||||
    } else if (any_callback_.shared_ptr_with_info_callback) {
 | 
			
		||||
      any_callback_.shared_ptr_with_info_callback(typed_message, message_info);
 | 
			
		||||
    } else if (any_callback_.unique_ptr_callback) {
 | 
			
		||||
      std::unique_ptr<MessageT> unique_msg(new MessageT(*typed_message));
 | 
			
		||||
      any_callback_.unique_ptr_callback(unique_msg);
 | 
			
		||||
    } else if (any_callback_.unique_ptr_with_info_callback) {
 | 
			
		||||
      std::unique_ptr<MessageT> unique_msg(new MessageT(*typed_message));
 | 
			
		||||
      any_callback_.unique_ptr_with_info_callback(unique_msg, message_info);
 | 
			
		||||
    } else {
 | 
			
		||||
      throw std::runtime_error("unexpected message without any callback set");
 | 
			
		||||
    }
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  void return_message(std::shared_ptr<void> & message)
 | 
			
		||||
| 
						 | 
				
			
			@ -167,7 +185,9 @@ public:
 | 
			
		|||
    message_memory_strategy_->return_message(typed_message);
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  void handle_intra_process_message(rcl_interfaces::msg::IntraProcessMessage & ipm)
 | 
			
		||||
  void handle_intra_process_message(
 | 
			
		||||
    rcl_interfaces::msg::IntraProcessMessage & ipm,
 | 
			
		||||
    const rmw_message_info_t & message_info)
 | 
			
		||||
  {
 | 
			
		||||
    if (!get_intra_process_message_callback_) {
 | 
			
		||||
      // throw std::runtime_error(
 | 
			
		||||
| 
						 | 
				
			
			@ -188,8 +208,19 @@ public:
 | 
			
		|||
      // TODO(wjwwood): should we notify someone of this? log error, log warning?
 | 
			
		||||
      return;
 | 
			
		||||
    }
 | 
			
		||||
    typename MessageT::SharedPtr shared_msg = std::move(msg);
 | 
			
		||||
    callback_(shared_msg);
 | 
			
		||||
    if (any_callback_.shared_ptr_callback) {
 | 
			
		||||
      typename MessageT::SharedPtr shared_msg = std::move(msg);
 | 
			
		||||
      any_callback_.shared_ptr_callback(shared_msg);
 | 
			
		||||
    } else if (any_callback_.shared_ptr_with_info_callback) {
 | 
			
		||||
      typename MessageT::SharedPtr shared_msg = std::move(msg);
 | 
			
		||||
      any_callback_.shared_ptr_with_info_callback(shared_msg, message_info);
 | 
			
		||||
    } else if (any_callback_.unique_ptr_callback) {
 | 
			
		||||
      any_callback_.unique_ptr_callback(msg);
 | 
			
		||||
    } else if (any_callback_.unique_ptr_with_info_callback) {
 | 
			
		||||
      any_callback_.unique_ptr_with_info_callback(msg, message_info);
 | 
			
		||||
    } else {
 | 
			
		||||
      throw std::runtime_error("unexpected message without any callback set");
 | 
			
		||||
    }
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
private:
 | 
			
		||||
| 
						 | 
				
			
			@ -213,7 +244,7 @@ private:
 | 
			
		|||
 | 
			
		||||
  RCLCPP_DISABLE_COPY(Subscription);
 | 
			
		||||
 | 
			
		||||
  CallbackType callback_;
 | 
			
		||||
  AnySubscriptionCallback<MessageT> any_callback_;
 | 
			
		||||
  typename message_memory_strategy::MessageMemoryStrategy<MessageT>::SharedPtr
 | 
			
		||||
  message_memory_strategy_;
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -169,7 +169,8 @@ sleep_for(const std::chrono::nanoseconds & nanoseconds)
 | 
			
		|||
  // TODO: determine if posix's nanosleep(2) is more efficient here
 | 
			
		||||
  std::unique_lock<std::mutex> lock(::g_interrupt_mutex);
 | 
			
		||||
  auto cvs = ::g_interrupt_condition_variable.wait_for(lock, nanoseconds);
 | 
			
		||||
  return cvs == std::cv_status::no_timeout;
 | 
			
		||||
  // Return true if the timeout elapsed successfully, otherwise false.
 | 
			
		||||
  return cvs != std::cv_status::no_timeout;
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
} /* namespace utilities */
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
		Loading…
	
	Add table
		Add a link
		
	
		Reference in a new issue