add template parameter to publisher
This commit is contained in:
		
							parent
							
								
									89541ea10f
								
							
						
					
					
						commit
						743d9d0e76
					
				
					 5 changed files with 174 additions and 139 deletions
				
			
		| 
						 | 
					@ -190,7 +190,8 @@ public:
 | 
				
			||||||
   */
 | 
					   */
 | 
				
			||||||
  template<typename MessageT>
 | 
					  template<typename MessageT>
 | 
				
			||||||
  uint64_t
 | 
					  uint64_t
 | 
				
			||||||
  add_publisher(publisher::Publisher::SharedPtr publisher, size_t buffer_size = 0)
 | 
					  add_publisher(typename publisher::Publisher<MessageT>::SharedPtr publisher,
 | 
				
			||||||
 | 
					    size_t buffer_size = 0)
 | 
				
			||||||
  {
 | 
					  {
 | 
				
			||||||
    auto id = IntraProcessManager::get_next_unique_id();
 | 
					    auto id = IntraProcessManager::get_next_unique_id();
 | 
				
			||||||
    publishers_[id].publisher = publisher;
 | 
					    publishers_[id].publisher = publisher;
 | 
				
			||||||
| 
						 | 
					@ -419,7 +420,7 @@ private:
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    PublisherInfo() = default;
 | 
					    PublisherInfo() = default;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    publisher::Publisher::WeakPtr publisher;
 | 
					    publisher::PublisherBase::WeakPtr publisher;
 | 
				
			||||||
    std::atomic<uint64_t> sequence_number;
 | 
					    std::atomic<uint64_t> sequence_number;
 | 
				
			||||||
    mapped_ring_buffer::MappedRingBufferBase::SharedPtr buffer;
 | 
					    mapped_ring_buffer::MappedRingBufferBase::SharedPtr buffer;
 | 
				
			||||||
    std::unordered_map<uint64_t, std::set<uint64_t>> target_subscriptions_by_message_sequence;
 | 
					    std::unordered_map<uint64_t, std::set<uint64_t>> target_subscriptions_by_message_sequence;
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -99,7 +99,7 @@ public:
 | 
				
			||||||
   * \return Shared pointer to the created publisher.
 | 
					   * \return Shared pointer to the created publisher.
 | 
				
			||||||
   */
 | 
					   */
 | 
				
			||||||
  template<typename MessageT>
 | 
					  template<typename MessageT>
 | 
				
			||||||
  rclcpp::publisher::Publisher::SharedPtr
 | 
					  typename rclcpp::publisher::Publisher<MessageT>::SharedPtr
 | 
				
			||||||
  create_publisher(
 | 
					  create_publisher(
 | 
				
			||||||
    const std::string & topic_name, const rmw_qos_profile_t & qos_profile);
 | 
					    const std::string & topic_name, const rmw_qos_profile_t & qos_profile);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
| 
						 | 
					@ -235,7 +235,7 @@ private:
 | 
				
			||||||
 | 
					
 | 
				
			||||||
  std::map<std::string, rclcpp::parameter::ParameterVariant> parameters_;
 | 
					  std::map<std::string, rclcpp::parameter::ParameterVariant> parameters_;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
  publisher::Publisher::SharedPtr events_publisher_;
 | 
					  publisher::Publisher<rcl_interfaces::msg::ParameterEvent>::SharedPtr events_publisher_;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
  template<typename MessageT>
 | 
					  template<typename MessageT>
 | 
				
			||||||
  typename subscription::Subscription<MessageT>::SharedPtr
 | 
					  typename subscription::Subscription<MessageT>::SharedPtr
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -114,7 +114,7 @@ Node::create_callback_group(
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
template<typename MessageT>
 | 
					template<typename MessageT>
 | 
				
			||||||
publisher::Publisher::SharedPtr
 | 
					typename publisher::Publisher<MessageT>::SharedPtr
 | 
				
			||||||
Node::create_publisher(
 | 
					Node::create_publisher(
 | 
				
			||||||
  const std::string & topic_name, const rmw_qos_profile_t & qos_profile)
 | 
					  const std::string & topic_name, const rmw_qos_profile_t & qos_profile)
 | 
				
			||||||
{
 | 
					{
 | 
				
			||||||
| 
						 | 
					@ -130,7 +130,7 @@ Node::create_publisher(
 | 
				
			||||||
    // *INDENT-ON*
 | 
					    // *INDENT-ON*
 | 
				
			||||||
  }
 | 
					  }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
  auto publisher = publisher::Publisher::make_shared(
 | 
					  auto publisher = publisher::Publisher<MessageT>::make_shared(
 | 
				
			||||||
    node_handle_, publisher_handle, topic_name, qos_profile.depth);
 | 
					    node_handle_, publisher_handle, topic_name, qos_profile.depth);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
  if (use_intra_process_comms_) {
 | 
					  if (use_intra_process_comms_) {
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -40,14 +40,10 @@ class Node;
 | 
				
			||||||
namespace publisher
 | 
					namespace publisher
 | 
				
			||||||
{
 | 
					{
 | 
				
			||||||
 | 
					
 | 
				
			||||||
/// A publisher publishes messages of any type to a topic.
 | 
					class PublisherBase
 | 
				
			||||||
class Publisher
 | 
					 | 
				
			||||||
{
 | 
					{
 | 
				
			||||||
  friend rclcpp::node::Node;
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
public:
 | 
					public:
 | 
				
			||||||
  RCLCPP_SMART_PTR_DEFINITIONS(Publisher);
 | 
					  RCLCPP_SMART_PTR_DEFINITIONS(PublisherBase);
 | 
				
			||||||
 | 
					 | 
				
			||||||
  /// Default constructor.
 | 
					  /// Default constructor.
 | 
				
			||||||
  /**
 | 
					  /**
 | 
				
			||||||
   * Typically, a publisher is not created through this method, but instead is created through a
 | 
					   * Typically, a publisher is not created through this method, but instead is created through a
 | 
				
			||||||
| 
						 | 
					@ -57,7 +53,7 @@ public:
 | 
				
			||||||
   * \param[in] topic The topic that this publisher publishes on.
 | 
					   * \param[in] topic The topic that this publisher publishes on.
 | 
				
			||||||
   * \param[in] queue_size The maximum number of unpublished messages to queue.
 | 
					   * \param[in] queue_size The maximum number of unpublished messages to queue.
 | 
				
			||||||
   */
 | 
					   */
 | 
				
			||||||
  Publisher(
 | 
					  PublisherBase(
 | 
				
			||||||
    std::shared_ptr<rmw_node_t> node_handle,
 | 
					    std::shared_ptr<rmw_node_t> node_handle,
 | 
				
			||||||
    rmw_publisher_t * publisher_handle,
 | 
					    rmw_publisher_t * publisher_handle,
 | 
				
			||||||
    std::string topic,
 | 
					    std::string topic,
 | 
				
			||||||
| 
						 | 
					@ -77,7 +73,7 @@ public:
 | 
				
			||||||
  }
 | 
					  }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
  /// Default destructor.
 | 
					  /// Default destructor.
 | 
				
			||||||
  virtual ~Publisher()
 | 
					  virtual ~PublisherBase()
 | 
				
			||||||
  {
 | 
					  {
 | 
				
			||||||
    if (intra_process_publisher_handle_) {
 | 
					    if (intra_process_publisher_handle_) {
 | 
				
			||||||
      if (rmw_destroy_publisher(node_handle_.get(), intra_process_publisher_handle_)) {
 | 
					      if (rmw_destroy_publisher(node_handle_.get(), intra_process_publisher_handle_)) {
 | 
				
			||||||
| 
						 | 
					@ -97,98 +93,6 @@ public:
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
  }
 | 
					  }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
  /// Send a message to the topic for this publisher.
 | 
					 | 
				
			||||||
  /**
 | 
					 | 
				
			||||||
   * This function is templated on the input message type, MessageT.
 | 
					 | 
				
			||||||
   * \param[in] msg A shared pointer to the message to send.
 | 
					 | 
				
			||||||
   */
 | 
					 | 
				
			||||||
  template<typename MessageT>
 | 
					 | 
				
			||||||
  void
 | 
					 | 
				
			||||||
  publish(std::unique_ptr<MessageT> & msg)
 | 
					 | 
				
			||||||
  {
 | 
					 | 
				
			||||||
    this->do_inter_process_publish<MessageT>(msg.get());
 | 
					 | 
				
			||||||
    if (store_intra_process_message_) {
 | 
					 | 
				
			||||||
      // Take the pointer from the unique_msg, release it and pass as a void *
 | 
					 | 
				
			||||||
      // to the ipm. The ipm should then capture it again as a unique_ptr of
 | 
					 | 
				
			||||||
      // the correct type.
 | 
					 | 
				
			||||||
      // TODO(wjwwood):
 | 
					 | 
				
			||||||
      //   investigate how to transfer the custom deleter (if there is one)
 | 
					 | 
				
			||||||
      //   from the incoming unique_ptr through to the ipm's unique_ptr.
 | 
					 | 
				
			||||||
      //   See: http://stackoverflow.com/questions/11002641/dynamic-casting-for-unique-ptr
 | 
					 | 
				
			||||||
      MessageT * msg_ptr = msg.get();
 | 
					 | 
				
			||||||
      msg.release();
 | 
					 | 
				
			||||||
      uint64_t message_seq;
 | 
					 | 
				
			||||||
      {
 | 
					 | 
				
			||||||
        std::lock_guard<std::mutex> lock(intra_process_publish_mutex_);
 | 
					 | 
				
			||||||
        message_seq =
 | 
					 | 
				
			||||||
          store_intra_process_message_(intra_process_publisher_id_, msg_ptr, typeid(MessageT));
 | 
					 | 
				
			||||||
      }
 | 
					 | 
				
			||||||
      rcl_interfaces::msg::IntraProcessMessage ipm;
 | 
					 | 
				
			||||||
      ipm.publisher_id = intra_process_publisher_id_;
 | 
					 | 
				
			||||||
      ipm.message_sequence = message_seq;
 | 
					 | 
				
			||||||
      auto status = rmw_publish(intra_process_publisher_handle_, &ipm);
 | 
					 | 
				
			||||||
      if (status != RMW_RET_OK) {
 | 
					 | 
				
			||||||
        // *INDENT-OFF* (prevent uncrustify from making unecessary indents here)
 | 
					 | 
				
			||||||
        throw std::runtime_error(
 | 
					 | 
				
			||||||
          std::string("failed to publish intra process message: ") + rmw_get_error_string_safe());
 | 
					 | 
				
			||||||
        // *INDENT-ON*
 | 
					 | 
				
			||||||
      }
 | 
					 | 
				
			||||||
    } else {
 | 
					 | 
				
			||||||
      // Always destroy the message, even if we don't consume it, for consistency.
 | 
					 | 
				
			||||||
      msg.reset();
 | 
					 | 
				
			||||||
    }
 | 
					 | 
				
			||||||
  }
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
  template<typename MessageT>
 | 
					 | 
				
			||||||
  void
 | 
					 | 
				
			||||||
  publish(const std::shared_ptr<MessageT> & msg)
 | 
					 | 
				
			||||||
  {
 | 
					 | 
				
			||||||
    // Avoid allocating when not using intra process.
 | 
					 | 
				
			||||||
    if (!store_intra_process_message_) {
 | 
					 | 
				
			||||||
      // In this case we're not using intra process.
 | 
					 | 
				
			||||||
      return this->do_inter_process_publish(msg.get());
 | 
					 | 
				
			||||||
    }
 | 
					 | 
				
			||||||
    // Otherwise we have to allocate memory in a unique_ptr and pass it along.
 | 
					 | 
				
			||||||
    // TODO(wjwwood):
 | 
					 | 
				
			||||||
    //   The intra process manager should probably also be able to store
 | 
					 | 
				
			||||||
    //   shared_ptr's and do the "smart" thing based on other intra process
 | 
					 | 
				
			||||||
    //   subscriptions. For now call the other publish().
 | 
					 | 
				
			||||||
    std::unique_ptr<MessageT> unique_msg(new MessageT(*msg.get()));
 | 
					 | 
				
			||||||
    return this->publish(unique_msg);
 | 
					 | 
				
			||||||
  }
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
  template<typename MessageT>
 | 
					 | 
				
			||||||
  void
 | 
					 | 
				
			||||||
  publish(std::shared_ptr<const MessageT> msg)
 | 
					 | 
				
			||||||
  {
 | 
					 | 
				
			||||||
    // Avoid allocating when not using intra process.
 | 
					 | 
				
			||||||
    if (!store_intra_process_message_) {
 | 
					 | 
				
			||||||
      // In this case we're not using intra process.
 | 
					 | 
				
			||||||
      return this->do_inter_process_publish(msg.get());
 | 
					 | 
				
			||||||
    }
 | 
					 | 
				
			||||||
    // Otherwise we have to allocate memory in a unique_ptr and pass it along.
 | 
					 | 
				
			||||||
    // TODO(wjwwood):
 | 
					 | 
				
			||||||
    //   The intra process manager should probably also be able to store
 | 
					 | 
				
			||||||
    //   shared_ptr's and do the "smart" thing based on other intra process
 | 
					 | 
				
			||||||
    //   subscriptions. For now call the other publish().
 | 
					 | 
				
			||||||
    std::unique_ptr<MessageT> unique_msg(new MessageT(*msg.get()));
 | 
					 | 
				
			||||||
    return this->publish(unique_msg);
 | 
					 | 
				
			||||||
  }
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
  template<typename MessageT>
 | 
					 | 
				
			||||||
  void
 | 
					 | 
				
			||||||
  publish(const MessageT & msg)
 | 
					 | 
				
			||||||
  {
 | 
					 | 
				
			||||||
    // Avoid allocating when not using intra process.
 | 
					 | 
				
			||||||
    if (!store_intra_process_message_) {
 | 
					 | 
				
			||||||
      // In this case we're not using intra process.
 | 
					 | 
				
			||||||
      return this->do_inter_process_publish(msg.get());
 | 
					 | 
				
			||||||
    }
 | 
					 | 
				
			||||||
    // Otherwise we have to allocate memory in a unique_ptr and pass it along.
 | 
					 | 
				
			||||||
    std::unique_ptr<MessageT> unique_msg(new MessageT(msg));
 | 
					 | 
				
			||||||
    return this->publish(unique_msg);
 | 
					 | 
				
			||||||
  }
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
  /// Get the topic that this publisher publishes on.
 | 
					  /// Get the topic that this publisher publishes on.
 | 
				
			||||||
  // \return The topic name.
 | 
					  // \return The topic name.
 | 
				
			||||||
  const std::string &
 | 
					  const std::string &
 | 
				
			||||||
| 
						 | 
					@ -261,20 +165,6 @@ public:
 | 
				
			||||||
  typedef std::function<uint64_t(uint64_t, void *, const std::type_info &)> StoreMessageCallbackT;
 | 
					  typedef std::function<uint64_t(uint64_t, void *, const std::type_info &)> StoreMessageCallbackT;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
protected:
 | 
					protected:
 | 
				
			||||||
  template<typename MessageT>
 | 
					 | 
				
			||||||
  void
 | 
					 | 
				
			||||||
  do_inter_process_publish(MessageT * msg)
 | 
					 | 
				
			||||||
  {
 | 
					 | 
				
			||||||
    auto status = rmw_publish(publisher_handle_, msg);
 | 
					 | 
				
			||||||
    if (status != RMW_RET_OK) {
 | 
					 | 
				
			||||||
      // *INDENT-OFF* (prevent uncrustify from making unecessary indents here)
 | 
					 | 
				
			||||||
      throw std::runtime_error(
 | 
					 | 
				
			||||||
        std::string("failed to publish message: ") + rmw_get_error_string_safe());
 | 
					 | 
				
			||||||
      // *INDENT-ON*
 | 
					 | 
				
			||||||
    }
 | 
					 | 
				
			||||||
  }
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
  void
 | 
					  void
 | 
				
			||||||
  setup_intra_process(
 | 
					  setup_intra_process(
 | 
				
			||||||
    uint64_t intra_process_publisher_id,
 | 
					    uint64_t intra_process_publisher_id,
 | 
				
			||||||
| 
						 | 
					@ -295,7 +185,6 @@ protected:
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
  }
 | 
					  }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
private:
 | 
					 | 
				
			||||||
  std::shared_ptr<rmw_node_t> node_handle_;
 | 
					  std::shared_ptr<rmw_node_t> node_handle_;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
  rmw_publisher_t * publisher_handle_;
 | 
					  rmw_publisher_t * publisher_handle_;
 | 
				
			||||||
| 
						 | 
					@ -311,6 +200,127 @@ private:
 | 
				
			||||||
  rmw_gid_t intra_process_rmw_gid_;
 | 
					  rmw_gid_t intra_process_rmw_gid_;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
  std::mutex intra_process_publish_mutex_;
 | 
					  std::mutex intra_process_publish_mutex_;
 | 
				
			||||||
 | 
					};
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					/// A publisher publishes messages of any type to a topic.
 | 
				
			||||||
 | 
					template<typename MessageT>
 | 
				
			||||||
 | 
					class Publisher : public PublisherBase
 | 
				
			||||||
 | 
					{
 | 
				
			||||||
 | 
					  friend rclcpp::node::Node;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					public:
 | 
				
			||||||
 | 
					  RCLCPP_SMART_PTR_DEFINITIONS(Publisher<MessageT>);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					  Publisher(
 | 
				
			||||||
 | 
					    std::shared_ptr<rmw_node_t> node_handle,
 | 
				
			||||||
 | 
					    rmw_publisher_t * publisher_handle,
 | 
				
			||||||
 | 
					    std::string topic,
 | 
				
			||||||
 | 
					    size_t queue_size)
 | 
				
			||||||
 | 
					  : PublisherBase(node_handle, publisher_handle, topic, queue_size)
 | 
				
			||||||
 | 
					  {}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					  /// Send a message to the topic for this publisher.
 | 
				
			||||||
 | 
					  /**
 | 
				
			||||||
 | 
					   * This function is templated on the input message type, MessageT.
 | 
				
			||||||
 | 
					   * \param[in] msg A shared pointer to the message to send.
 | 
				
			||||||
 | 
					   */
 | 
				
			||||||
 | 
					  void
 | 
				
			||||||
 | 
					  publish(std::unique_ptr<MessageT> & msg)
 | 
				
			||||||
 | 
					  {
 | 
				
			||||||
 | 
					    this->do_inter_process_publish(msg.get());
 | 
				
			||||||
 | 
					    if (store_intra_process_message_) {
 | 
				
			||||||
 | 
					      // Take the pointer from the unique_msg, release it and pass as a void *
 | 
				
			||||||
 | 
					      // to the ipm. The ipm should then capture it again as a unique_ptr of
 | 
				
			||||||
 | 
					      // the correct type.
 | 
				
			||||||
 | 
					      // TODO(wjwwood):
 | 
				
			||||||
 | 
					      //   investigate how to transfer the custom deleter (if there is one)
 | 
				
			||||||
 | 
					      //   from the incoming unique_ptr through to the ipm's unique_ptr.
 | 
				
			||||||
 | 
					      //   See: http://stackoverflow.com/questions/11002641/dynamic-casting-for-unique-ptr
 | 
				
			||||||
 | 
					      MessageT * msg_ptr = msg.get();
 | 
				
			||||||
 | 
					      msg.release();
 | 
				
			||||||
 | 
					      uint64_t message_seq;
 | 
				
			||||||
 | 
					      {
 | 
				
			||||||
 | 
					        std::lock_guard<std::mutex> lock(intra_process_publish_mutex_);
 | 
				
			||||||
 | 
					        message_seq =
 | 
				
			||||||
 | 
					          store_intra_process_message_(intra_process_publisher_id_, msg_ptr, typeid(MessageT));
 | 
				
			||||||
 | 
					      }
 | 
				
			||||||
 | 
					      rcl_interfaces::msg::IntraProcessMessage ipm;
 | 
				
			||||||
 | 
					      ipm.publisher_id = intra_process_publisher_id_;
 | 
				
			||||||
 | 
					      ipm.message_sequence = message_seq;
 | 
				
			||||||
 | 
					      auto status = rmw_publish(intra_process_publisher_handle_, &ipm);
 | 
				
			||||||
 | 
					      if (status != RMW_RET_OK) {
 | 
				
			||||||
 | 
					        // *INDENT-OFF* (prevent uncrustify from making unecessary indents here)
 | 
				
			||||||
 | 
					        throw std::runtime_error(
 | 
				
			||||||
 | 
					          std::string("failed to publish intra process message: ") + rmw_get_error_string_safe());
 | 
				
			||||||
 | 
					        // *INDENT-ON*
 | 
				
			||||||
 | 
					      }
 | 
				
			||||||
 | 
					    } else {
 | 
				
			||||||
 | 
					      // Always destroy the message, even if we don't consume it, for consistency.
 | 
				
			||||||
 | 
					      msg.reset();
 | 
				
			||||||
 | 
					    }
 | 
				
			||||||
 | 
					  }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					  void
 | 
				
			||||||
 | 
					  publish(const std::shared_ptr<MessageT> & msg)
 | 
				
			||||||
 | 
					  {
 | 
				
			||||||
 | 
					    // Avoid allocating when not using intra process.
 | 
				
			||||||
 | 
					    if (!store_intra_process_message_) {
 | 
				
			||||||
 | 
					      // In this case we're not using intra process.
 | 
				
			||||||
 | 
					      return this->do_inter_process_publish(msg.get());
 | 
				
			||||||
 | 
					    }
 | 
				
			||||||
 | 
					    // Otherwise we have to allocate memory in a unique_ptr and pass it along.
 | 
				
			||||||
 | 
					    // TODO(wjwwood):
 | 
				
			||||||
 | 
					    //   The intra process manager should probably also be able to store
 | 
				
			||||||
 | 
					    //   shared_ptr's and do the "smart" thing based on other intra process
 | 
				
			||||||
 | 
					    //   subscriptions. For now call the other publish().
 | 
				
			||||||
 | 
					    std::unique_ptr<MessageT> unique_msg(new MessageT(*msg.get()));
 | 
				
			||||||
 | 
					    return this->publish(unique_msg);
 | 
				
			||||||
 | 
					  }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					  void
 | 
				
			||||||
 | 
					  publish(std::shared_ptr<const MessageT> msg)
 | 
				
			||||||
 | 
					  {
 | 
				
			||||||
 | 
					    // Avoid allocating when not using intra process.
 | 
				
			||||||
 | 
					    if (!store_intra_process_message_) {
 | 
				
			||||||
 | 
					      // In this case we're not using intra process.
 | 
				
			||||||
 | 
					      return this->do_inter_process_publish(msg.get());
 | 
				
			||||||
 | 
					    }
 | 
				
			||||||
 | 
					    // Otherwise we have to allocate memory in a unique_ptr and pass it along.
 | 
				
			||||||
 | 
					    // TODO(wjwwood):
 | 
				
			||||||
 | 
					    //   The intra process manager should probably also be able to store
 | 
				
			||||||
 | 
					    //   shared_ptr's and do the "smart" thing based on other intra process
 | 
				
			||||||
 | 
					    //   subscriptions. For now call the other publish().
 | 
				
			||||||
 | 
					    std::unique_ptr<MessageT> unique_msg(new MessageT(*msg.get()));
 | 
				
			||||||
 | 
					    return this->publish(unique_msg);
 | 
				
			||||||
 | 
					  }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					  void
 | 
				
			||||||
 | 
					  publish(const MessageT & msg)
 | 
				
			||||||
 | 
					  {
 | 
				
			||||||
 | 
					    // Avoid allocating when not using intra process.
 | 
				
			||||||
 | 
					    if (!store_intra_process_message_) {
 | 
				
			||||||
 | 
					      // In this case we're not using intra process.
 | 
				
			||||||
 | 
					      return this->do_inter_process_publish(msg.get());
 | 
				
			||||||
 | 
					    }
 | 
				
			||||||
 | 
					    // Otherwise we have to allocate memory in a unique_ptr and pass it along.
 | 
				
			||||||
 | 
					    std::unique_ptr<MessageT> unique_msg(new MessageT(msg));
 | 
				
			||||||
 | 
					    return this->publish(unique_msg);
 | 
				
			||||||
 | 
					  }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					protected:
 | 
				
			||||||
 | 
					  void
 | 
				
			||||||
 | 
					  do_inter_process_publish(MessageT * msg)
 | 
				
			||||||
 | 
					  {
 | 
				
			||||||
 | 
					    auto status = rmw_publish(publisher_handle_, msg);
 | 
				
			||||||
 | 
					    if (status != RMW_RET_OK) {
 | 
				
			||||||
 | 
					      // *INDENT-OFF* (prevent uncrustify from making unecessary indents here)
 | 
				
			||||||
 | 
					      throw std::runtime_error(
 | 
				
			||||||
 | 
					        std::string("failed to publish message: ") + rmw_get_error_string_safe());
 | 
				
			||||||
 | 
					      // *INDENT-ON*
 | 
				
			||||||
 | 
					    }
 | 
				
			||||||
 | 
					  }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
};
 | 
					};
 | 
				
			||||||
 | 
					
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -27,12 +27,12 @@ namespace publisher
 | 
				
			||||||
namespace mock
 | 
					namespace mock
 | 
				
			||||||
{
 | 
					{
 | 
				
			||||||
 | 
					
 | 
				
			||||||
class Publisher
 | 
					class PublisherBase
 | 
				
			||||||
{
 | 
					{
 | 
				
			||||||
public:
 | 
					public:
 | 
				
			||||||
  RCLCPP_SMART_PTR_DEFINITIONS(Publisher);
 | 
					  RCLCPP_SMART_PTR_DEFINITIONS(PublisherBase);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
  Publisher()
 | 
					  PublisherBase()
 | 
				
			||||||
  : mock_topic_name(""), mock_queue_size(0) {}
 | 
					  : mock_topic_name(""), mock_queue_size(0) {}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
  const std::string & get_topic_name() const
 | 
					  const std::string & get_topic_name() const
 | 
				
			||||||
| 
						 | 
					@ -55,6 +55,13 @@ public:
 | 
				
			||||||
  size_t mock_queue_size;
 | 
					  size_t mock_queue_size;
 | 
				
			||||||
};
 | 
					};
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					template<typename T>
 | 
				
			||||||
 | 
					class Publisher : public PublisherBase
 | 
				
			||||||
 | 
					{
 | 
				
			||||||
 | 
					public:
 | 
				
			||||||
 | 
					  RCLCPP_SMART_PTR_DEFINITIONS(Publisher<T>);
 | 
				
			||||||
 | 
					};
 | 
				
			||||||
 | 
					
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
| 
						 | 
					@ -96,10 +103,12 @@ public:
 | 
				
			||||||
#define RCLCPP_RCLCPP_SUBSCRIPTION_HPP_
 | 
					#define RCLCPP_RCLCPP_SUBSCRIPTION_HPP_
 | 
				
			||||||
// Force ipm to use our mock publisher class.
 | 
					// Force ipm to use our mock publisher class.
 | 
				
			||||||
#define Publisher mock::Publisher
 | 
					#define Publisher mock::Publisher
 | 
				
			||||||
 | 
					#define PublisherBase mock::PublisherBase
 | 
				
			||||||
#define SubscriptionBase mock::SubscriptionBase
 | 
					#define SubscriptionBase mock::SubscriptionBase
 | 
				
			||||||
#include <rclcpp/intra_process_manager.hpp>
 | 
					#include <rclcpp/intra_process_manager.hpp>
 | 
				
			||||||
#undef SubscriptionBase
 | 
					#undef SubscriptionBase
 | 
				
			||||||
#undef Publisher
 | 
					#undef Publisher
 | 
				
			||||||
 | 
					#undef PublisherBase
 | 
				
			||||||
 | 
					
 | 
				
			||||||
#include <rcl_interfaces/msg/intra_process_message.hpp>
 | 
					#include <rcl_interfaces/msg/intra_process_message.hpp>
 | 
				
			||||||
 | 
					
 | 
				
			||||||
| 
						 | 
					@ -116,11 +125,13 @@ public:
 | 
				
			||||||
TEST(TestIntraProcessManager, nominal) {
 | 
					TEST(TestIntraProcessManager, nominal) {
 | 
				
			||||||
  rclcpp::intra_process_manager::IntraProcessManager ipm;
 | 
					  rclcpp::intra_process_manager::IntraProcessManager ipm;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
  auto p1 = std::make_shared<rclcpp::publisher::mock::Publisher>();
 | 
					  auto p1 =
 | 
				
			||||||
 | 
					    std::make_shared<rclcpp::publisher::mock::Publisher<rcl_interfaces::msg::IntraProcessMessage>>();
 | 
				
			||||||
  p1->mock_topic_name = "nominal1";
 | 
					  p1->mock_topic_name = "nominal1";
 | 
				
			||||||
  p1->mock_queue_size = 2;
 | 
					  p1->mock_queue_size = 2;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
  auto p2 = std::make_shared<rclcpp::publisher::mock::Publisher>();
 | 
					  auto p2 =
 | 
				
			||||||
 | 
					    std::make_shared<rclcpp::publisher::mock::Publisher<rcl_interfaces::msg::IntraProcessMessage>>();
 | 
				
			||||||
  p2->mock_topic_name = "nominal2";
 | 
					  p2->mock_topic_name = "nominal2";
 | 
				
			||||||
  p2->mock_queue_size = 10;
 | 
					  p2->mock_queue_size = 10;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
| 
						 | 
					@ -201,7 +212,8 @@ TEST(TestIntraProcessManager, nominal) {
 | 
				
			||||||
TEST(TestIntraProcessManager, remove_publisher_before_trying_to_take) {
 | 
					TEST(TestIntraProcessManager, remove_publisher_before_trying_to_take) {
 | 
				
			||||||
  rclcpp::intra_process_manager::IntraProcessManager ipm;
 | 
					  rclcpp::intra_process_manager::IntraProcessManager ipm;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
  auto p1 = std::make_shared<rclcpp::publisher::mock::Publisher>();
 | 
					  auto p1 =
 | 
				
			||||||
 | 
					    std::make_shared<rclcpp::publisher::mock::Publisher<rcl_interfaces::msg::IntraProcessMessage>>();
 | 
				
			||||||
  p1->mock_topic_name = "nominal1";
 | 
					  p1->mock_topic_name = "nominal1";
 | 
				
			||||||
  p1->mock_queue_size = 10;
 | 
					  p1->mock_queue_size = 10;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
| 
						 | 
					@ -240,7 +252,8 @@ TEST(TestIntraProcessManager, remove_publisher_before_trying_to_take) {
 | 
				
			||||||
TEST(TestIntraProcessManager, removed_subscription_affects_take) {
 | 
					TEST(TestIntraProcessManager, removed_subscription_affects_take) {
 | 
				
			||||||
  rclcpp::intra_process_manager::IntraProcessManager ipm;
 | 
					  rclcpp::intra_process_manager::IntraProcessManager ipm;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
  auto p1 = std::make_shared<rclcpp::publisher::mock::Publisher>();
 | 
					  auto p1 =
 | 
				
			||||||
 | 
					    std::make_shared<rclcpp::publisher::mock::Publisher<rcl_interfaces::msg::IntraProcessMessage>>();
 | 
				
			||||||
  p1->mock_topic_name = "nominal1";
 | 
					  p1->mock_topic_name = "nominal1";
 | 
				
			||||||
  p1->mock_queue_size = 10;
 | 
					  p1->mock_queue_size = 10;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
| 
						 | 
					@ -308,7 +321,8 @@ TEST(TestIntraProcessManager, removed_subscription_affects_take) {
 | 
				
			||||||
TEST(TestIntraProcessManager, multiple_subscriptions_one_publisher) {
 | 
					TEST(TestIntraProcessManager, multiple_subscriptions_one_publisher) {
 | 
				
			||||||
  rclcpp::intra_process_manager::IntraProcessManager ipm;
 | 
					  rclcpp::intra_process_manager::IntraProcessManager ipm;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
  auto p1 = std::make_shared<rclcpp::publisher::mock::Publisher>();
 | 
					  auto p1 =
 | 
				
			||||||
 | 
					    std::make_shared<rclcpp::publisher::mock::Publisher<rcl_interfaces::msg::IntraProcessMessage>>();
 | 
				
			||||||
  p1->mock_topic_name = "nominal1";
 | 
					  p1->mock_topic_name = "nominal1";
 | 
				
			||||||
  p1->mock_queue_size = 10;
 | 
					  p1->mock_queue_size = 10;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
| 
						 | 
					@ -377,15 +391,18 @@ TEST(TestIntraProcessManager, multiple_subscriptions_one_publisher) {
 | 
				
			||||||
TEST(TestIntraProcessManager, multiple_publishers_one_subscription) {
 | 
					TEST(TestIntraProcessManager, multiple_publishers_one_subscription) {
 | 
				
			||||||
  rclcpp::intra_process_manager::IntraProcessManager ipm;
 | 
					  rclcpp::intra_process_manager::IntraProcessManager ipm;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
  auto p1 = std::make_shared<rclcpp::publisher::mock::Publisher>();
 | 
					  auto p1 =
 | 
				
			||||||
 | 
					    std::make_shared<rclcpp::publisher::mock::Publisher<rcl_interfaces::msg::IntraProcessMessage>>();
 | 
				
			||||||
  p1->mock_topic_name = "nominal1";
 | 
					  p1->mock_topic_name = "nominal1";
 | 
				
			||||||
  p1->mock_queue_size = 10;
 | 
					  p1->mock_queue_size = 10;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
  auto p2 = std::make_shared<rclcpp::publisher::mock::Publisher>();
 | 
					  auto p2 =
 | 
				
			||||||
 | 
					    std::make_shared<rclcpp::publisher::mock::Publisher<rcl_interfaces::msg::IntraProcessMessage>>();
 | 
				
			||||||
  p2->mock_topic_name = "nominal1";
 | 
					  p2->mock_topic_name = "nominal1";
 | 
				
			||||||
  p2->mock_queue_size = 10;
 | 
					  p2->mock_queue_size = 10;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
  auto p3 = std::make_shared<rclcpp::publisher::mock::Publisher>();
 | 
					  auto p3 =
 | 
				
			||||||
 | 
					    std::make_shared<rclcpp::publisher::mock::Publisher<rcl_interfaces::msg::IntraProcessMessage>>();
 | 
				
			||||||
  p3->mock_topic_name = "nominal1";
 | 
					  p3->mock_topic_name = "nominal1";
 | 
				
			||||||
  p3->mock_queue_size = 10;
 | 
					  p3->mock_queue_size = 10;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
| 
						 | 
					@ -468,15 +485,18 @@ TEST(TestIntraProcessManager, multiple_publishers_one_subscription) {
 | 
				
			||||||
TEST(TestIntraProcessManager, multiple_publishers_multiple_subscription) {
 | 
					TEST(TestIntraProcessManager, multiple_publishers_multiple_subscription) {
 | 
				
			||||||
  rclcpp::intra_process_manager::IntraProcessManager ipm;
 | 
					  rclcpp::intra_process_manager::IntraProcessManager ipm;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
  auto p1 = std::make_shared<rclcpp::publisher::mock::Publisher>();
 | 
					  auto p1 =
 | 
				
			||||||
 | 
					    std::make_shared<rclcpp::publisher::mock::Publisher<rcl_interfaces::msg::IntraProcessMessage>>();
 | 
				
			||||||
  p1->mock_topic_name = "nominal1";
 | 
					  p1->mock_topic_name = "nominal1";
 | 
				
			||||||
  p1->mock_queue_size = 10;
 | 
					  p1->mock_queue_size = 10;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
  auto p2 = std::make_shared<rclcpp::publisher::mock::Publisher>();
 | 
					  auto p2 =
 | 
				
			||||||
 | 
					    std::make_shared<rclcpp::publisher::mock::Publisher<rcl_interfaces::msg::IntraProcessMessage>>();
 | 
				
			||||||
  p2->mock_topic_name = "nominal1";
 | 
					  p2->mock_topic_name = "nominal1";
 | 
				
			||||||
  p2->mock_queue_size = 10;
 | 
					  p2->mock_queue_size = 10;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
  auto p3 = std::make_shared<rclcpp::publisher::mock::Publisher>();
 | 
					  auto p3 =
 | 
				
			||||||
 | 
					    std::make_shared<rclcpp::publisher::mock::Publisher<rcl_interfaces::msg::IntraProcessMessage>>();
 | 
				
			||||||
  p3->mock_topic_name = "nominal1";
 | 
					  p3->mock_topic_name = "nominal1";
 | 
				
			||||||
  p3->mock_queue_size = 10;
 | 
					  p3->mock_queue_size = 10;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
| 
						 | 
					@ -626,7 +646,8 @@ TEST(TestIntraProcessManager, multiple_publishers_multiple_subscription) {
 | 
				
			||||||
TEST(TestIntraProcessManager, ring_buffer_displacement) {
 | 
					TEST(TestIntraProcessManager, ring_buffer_displacement) {
 | 
				
			||||||
  rclcpp::intra_process_manager::IntraProcessManager ipm;
 | 
					  rclcpp::intra_process_manager::IntraProcessManager ipm;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
  auto p1 = std::make_shared<rclcpp::publisher::mock::Publisher>();
 | 
					  auto p1 =
 | 
				
			||||||
 | 
					    std::make_shared<rclcpp::publisher::mock::Publisher<rcl_interfaces::msg::IntraProcessMessage>>();
 | 
				
			||||||
  p1->mock_topic_name = "nominal1";
 | 
					  p1->mock_topic_name = "nominal1";
 | 
				
			||||||
  p1->mock_queue_size = 2;
 | 
					  p1->mock_queue_size = 2;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
| 
						 | 
					@ -694,7 +715,8 @@ TEST(TestIntraProcessManager, ring_buffer_displacement) {
 | 
				
			||||||
TEST(TestIntraProcessManager, subscription_creation_race_condition) {
 | 
					TEST(TestIntraProcessManager, subscription_creation_race_condition) {
 | 
				
			||||||
  rclcpp::intra_process_manager::IntraProcessManager ipm;
 | 
					  rclcpp::intra_process_manager::IntraProcessManager ipm;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
  auto p1 = std::make_shared<rclcpp::publisher::mock::Publisher>();
 | 
					  auto p1 =
 | 
				
			||||||
 | 
					    std::make_shared<rclcpp::publisher::mock::Publisher<rcl_interfaces::msg::IntraProcessMessage>>();
 | 
				
			||||||
  p1->mock_topic_name = "nominal1";
 | 
					  p1->mock_topic_name = "nominal1";
 | 
				
			||||||
  p1->mock_queue_size = 2;
 | 
					  p1->mock_queue_size = 2;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
| 
						 | 
					@ -740,7 +762,8 @@ TEST(TestIntraProcessManager, publisher_out_of_scope_take) {
 | 
				
			||||||
  uint64_t p1_id;
 | 
					  uint64_t p1_id;
 | 
				
			||||||
  uint64_t p1_m1_id;
 | 
					  uint64_t p1_m1_id;
 | 
				
			||||||
  {
 | 
					  {
 | 
				
			||||||
    auto p1 = std::make_shared<rclcpp::publisher::mock::Publisher>();
 | 
					    auto p1 =
 | 
				
			||||||
 | 
					      std::make_shared<rclcpp::publisher::mock::Publisher<rcl_interfaces::msg::IntraProcessMessage>>();
 | 
				
			||||||
    p1->mock_topic_name = "nominal1";
 | 
					    p1->mock_topic_name = "nominal1";
 | 
				
			||||||
    p1->mock_queue_size = 2;
 | 
					    p1->mock_queue_size = 2;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
| 
						 | 
					@ -777,7 +800,8 @@ TEST(TestIntraProcessManager, publisher_out_of_scope_store) {
 | 
				
			||||||
 | 
					
 | 
				
			||||||
  uint64_t p1_id;
 | 
					  uint64_t p1_id;
 | 
				
			||||||
  {
 | 
					  {
 | 
				
			||||||
    auto p1 = std::make_shared<rclcpp::publisher::mock::Publisher>();
 | 
					    auto p1 =
 | 
				
			||||||
 | 
					      std::make_shared<rclcpp::publisher::mock::Publisher<rcl_interfaces::msg::IntraProcessMessage>>();
 | 
				
			||||||
    p1->mock_topic_name = "nominal1";
 | 
					    p1->mock_topic_name = "nominal1";
 | 
				
			||||||
    p1->mock_queue_size = 2;
 | 
					    p1->mock_queue_size = 2;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
		Loading…
	
	Add table
		Add a link
		
	
		Reference in a new issue