Merge pull request #193 from ros2/issue_192
potential fix for issue 192
This commit is contained in:
		
						commit
						69f7bca85d
					
				
					 5 changed files with 21 additions and 12 deletions
				
			
		| 
						 | 
					@ -66,7 +66,7 @@ public:
 | 
				
			||||||
  get_service_ptrs() const;
 | 
					  get_service_ptrs() const;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
  RCLCPP_PUBLIC
 | 
					  RCLCPP_PUBLIC
 | 
				
			||||||
  const std::vector<rclcpp::client::ClientBase::SharedPtr> &
 | 
					  const std::vector<rclcpp::client::ClientBase::WeakPtr> &
 | 
				
			||||||
  get_client_ptrs() const;
 | 
					  get_client_ptrs() const;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
  RCLCPP_PUBLIC
 | 
					  RCLCPP_PUBLIC
 | 
				
			||||||
| 
						 | 
					@ -100,7 +100,7 @@ private:
 | 
				
			||||||
  std::vector<rclcpp::subscription::SubscriptionBase::WeakPtr> subscription_ptrs_;
 | 
					  std::vector<rclcpp::subscription::SubscriptionBase::WeakPtr> subscription_ptrs_;
 | 
				
			||||||
  std::vector<rclcpp::timer::TimerBase::WeakPtr> timer_ptrs_;
 | 
					  std::vector<rclcpp::timer::TimerBase::WeakPtr> timer_ptrs_;
 | 
				
			||||||
  std::vector<rclcpp::service::ServiceBase::SharedPtr> service_ptrs_;
 | 
					  std::vector<rclcpp::service::ServiceBase::SharedPtr> service_ptrs_;
 | 
				
			||||||
  std::vector<rclcpp::client::ClientBase::SharedPtr> client_ptrs_;
 | 
					  std::vector<rclcpp::client::ClientBase::WeakPtr> client_ptrs_;
 | 
				
			||||||
  std::atomic_bool can_be_taken_from_;
 | 
					  std::atomic_bool can_be_taken_from_;
 | 
				
			||||||
};
 | 
					};
 | 
				
			||||||
 | 
					
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -60,7 +60,7 @@ public:
 | 
				
			||||||
  virtual std::shared_ptr<void> create_response() = 0;
 | 
					  virtual std::shared_ptr<void> create_response() = 0;
 | 
				
			||||||
  virtual std::shared_ptr<void> create_request_header() = 0;
 | 
					  virtual std::shared_ptr<void> create_request_header() = 0;
 | 
				
			||||||
  virtual void handle_response(
 | 
					  virtual void handle_response(
 | 
				
			||||||
    std::shared_ptr<void> & request_header, std::shared_ptr<void> & response) = 0;
 | 
					    std::shared_ptr<void> request_header, std::shared_ptr<void> response) = 0;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
private:
 | 
					private:
 | 
				
			||||||
  RCLCPP_DISABLE_COPY(ClientBase);
 | 
					  RCLCPP_DISABLE_COPY(ClientBase);
 | 
				
			||||||
| 
						 | 
					@ -111,13 +111,17 @@ public:
 | 
				
			||||||
    return std::shared_ptr<void>(new rmw_request_id_t);
 | 
					    return std::shared_ptr<void>(new rmw_request_id_t);
 | 
				
			||||||
  }
 | 
					  }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
  void handle_response(std::shared_ptr<void> & request_header, std::shared_ptr<void> & response)
 | 
					  void handle_response(std::shared_ptr<void> request_header, std::shared_ptr<void> response)
 | 
				
			||||||
  {
 | 
					  {
 | 
				
			||||||
 | 
					    std::lock_guard<std::mutex> lock(pending_requests_mutex_);
 | 
				
			||||||
    auto typed_request_header = std::static_pointer_cast<rmw_request_id_t>(request_header);
 | 
					    auto typed_request_header = std::static_pointer_cast<rmw_request_id_t>(request_header);
 | 
				
			||||||
    auto typed_response = std::static_pointer_cast<typename ServiceT::Response>(response);
 | 
					    auto typed_response = std::static_pointer_cast<typename ServiceT::Response>(response);
 | 
				
			||||||
    int64_t sequence_number = typed_request_header->sequence_number;
 | 
					    int64_t sequence_number = typed_request_header->sequence_number;
 | 
				
			||||||
    // TODO(esteve) this must check if the sequence_number is valid otherwise the
 | 
					    // TODO(esteve) this should throw instead since it is not expected to happen in the first place
 | 
				
			||||||
    // call_promise will be null
 | 
					    if (this->pending_requests_.count(sequence_number) == 0) {
 | 
				
			||||||
 | 
					      fprintf(stderr, "Received invalid sequence number. Ignoring...\n");
 | 
				
			||||||
 | 
					      return;
 | 
				
			||||||
 | 
					    }
 | 
				
			||||||
    auto tuple = this->pending_requests_[sequence_number];
 | 
					    auto tuple = this->pending_requests_[sequence_number];
 | 
				
			||||||
    auto call_promise = std::get<0>(tuple);
 | 
					    auto call_promise = std::get<0>(tuple);
 | 
				
			||||||
    auto callback = std::get<1>(tuple);
 | 
					    auto callback = std::get<1>(tuple);
 | 
				
			||||||
| 
						 | 
					@ -143,6 +147,7 @@ public:
 | 
				
			||||||
  >
 | 
					  >
 | 
				
			||||||
  SharedFuture async_send_request(SharedRequest request, CallbackT && cb)
 | 
					  SharedFuture async_send_request(SharedRequest request, CallbackT && cb)
 | 
				
			||||||
  {
 | 
					  {
 | 
				
			||||||
 | 
					    std::lock_guard<std::mutex> lock(pending_requests_mutex_);
 | 
				
			||||||
    int64_t sequence_number;
 | 
					    int64_t sequence_number;
 | 
				
			||||||
    if (RMW_RET_OK != rmw_send_request(get_client_handle(), request.get(), &sequence_number)) {
 | 
					    if (RMW_RET_OK != rmw_send_request(get_client_handle(), request.get(), &sequence_number)) {
 | 
				
			||||||
      // *INDENT-OFF* (prevent uncrustify from making unecessary indents here)
 | 
					      // *INDENT-OFF* (prevent uncrustify from making unecessary indents here)
 | 
				
			||||||
| 
						 | 
					@ -187,6 +192,7 @@ private:
 | 
				
			||||||
  RCLCPP_DISABLE_COPY(Client);
 | 
					  RCLCPP_DISABLE_COPY(Client);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
  std::map<int64_t, std::tuple<SharedPromise, CallbackType, SharedFuture>> pending_requests_;
 | 
					  std::map<int64_t, std::tuple<SharedPromise, CallbackType, SharedFuture>> pending_requests_;
 | 
				
			||||||
 | 
					  std::mutex pending_requests_mutex_;
 | 
				
			||||||
};
 | 
					};
 | 
				
			||||||
 | 
					
 | 
				
			||||||
}  // namespace client
 | 
					}  // namespace client
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -151,7 +151,8 @@ public:
 | 
				
			||||||
            services_.push_back(service);
 | 
					            services_.push_back(service);
 | 
				
			||||||
          }
 | 
					          }
 | 
				
			||||||
        }
 | 
					        }
 | 
				
			||||||
        for (auto & client : group->get_client_ptrs()) {
 | 
					        for (auto & weak_client : group->get_client_ptrs()) {
 | 
				
			||||||
 | 
					          auto client = weak_client.lock();
 | 
				
			||||||
          if (client) {
 | 
					          if (client) {
 | 
				
			||||||
            clients_.push_back(client);
 | 
					            clients_.push_back(client);
 | 
				
			||||||
          }
 | 
					          }
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -41,7 +41,7 @@ CallbackGroup::get_service_ptrs() const
 | 
				
			||||||
  return service_ptrs_;
 | 
					  return service_ptrs_;
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
const std::vector<rclcpp::client::ClientBase::SharedPtr> &
 | 
					const std::vector<rclcpp::client::ClientBase::WeakPtr> &
 | 
				
			||||||
CallbackGroup::get_client_ptrs() const
 | 
					CallbackGroup::get_client_ptrs() const
 | 
				
			||||||
{
 | 
					{
 | 
				
			||||||
  return client_ptrs_;
 | 
					  return client_ptrs_;
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -84,8 +84,9 @@ MemoryStrategy::get_client_by_handle(void * client_handle, const WeakNodeVector
 | 
				
			||||||
      if (!group) {
 | 
					      if (!group) {
 | 
				
			||||||
        continue;
 | 
					        continue;
 | 
				
			||||||
      }
 | 
					      }
 | 
				
			||||||
      for (auto & client : group->get_client_ptrs()) {
 | 
					      for (auto & weak_client : group->get_client_ptrs()) {
 | 
				
			||||||
        if (client->get_client_handle()->data == client_handle) {
 | 
					        auto client = weak_client.lock();
 | 
				
			||||||
 | 
					        if (client && client->get_client_handle()->data == client_handle) {
 | 
				
			||||||
          return client;
 | 
					          return client;
 | 
				
			||||||
        }
 | 
					        }
 | 
				
			||||||
      }
 | 
					      }
 | 
				
			||||||
| 
						 | 
					@ -182,8 +183,9 @@ MemoryStrategy::get_group_by_client(
 | 
				
			||||||
      if (!group) {
 | 
					      if (!group) {
 | 
				
			||||||
        continue;
 | 
					        continue;
 | 
				
			||||||
      }
 | 
					      }
 | 
				
			||||||
      for (auto & cli : group->get_client_ptrs()) {
 | 
					      for (auto & weak_client : group->get_client_ptrs()) {
 | 
				
			||||||
        if (cli == client) {
 | 
					        auto cli = weak_client.lock();
 | 
				
			||||||
 | 
					        if (cli && cli == client) {
 | 
				
			||||||
          return group;
 | 
					          return group;
 | 
				
			||||||
        }
 | 
					        }
 | 
				
			||||||
      }
 | 
					      }
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
		Loading…
	
	Add table
		Add a link
		
	
		Reference in a new issue