From 8332480c8510a09374c49be1e196a788041460f9 Mon Sep 17 00:00:00 2001 From: Esteve Fernandez Date: Tue, 13 Jan 2015 18:32:14 -0800 Subject: [PATCH] Added support for asynchronous clients --- rclcpp/include/rclcpp/callback_group.hpp | 8 ++ rclcpp/include/rclcpp/client.hpp | 91 +++++++++++-- rclcpp/include/rclcpp/executor.hpp | 166 +++++++++++++++++++++++ rclcpp/include/rclcpp/node.hpp | 5 +- rclcpp/include/rclcpp/node_impl.hpp | 22 ++- 5 files changed, 280 insertions(+), 12 deletions(-) diff --git a/rclcpp/include/rclcpp/callback_group.hpp b/rclcpp/include/rclcpp/callback_group.hpp index 82023e0..088439e 100644 --- a/rclcpp/include/rclcpp/callback_group.hpp +++ b/rclcpp/include/rclcpp/callback_group.hpp @@ -23,6 +23,7 @@ #include #include #include +#include namespace rclcpp { @@ -69,10 +70,17 @@ private: service_ptrs_.push_back(service_ptr); } + void + add_client(const client::ClientBase::SharedPtr &client_ptr) + { + client_ptrs_.push_back(client_ptr); + } + CallbackGroupType type_; std::vector subscription_ptrs_; std::vector timer_ptrs_; std::vector service_ptrs_; + std::vector client_ptrs_; std::atomic_bool can_be_taken_from_; }; diff --git a/rclcpp/include/rclcpp/client.hpp b/rclcpp/include/rclcpp/client.hpp index f9ca72c..f6635f7 100644 --- a/rclcpp/include/rclcpp/client.hpp +++ b/rclcpp/include/rclcpp/client.hpp @@ -21,7 +21,11 @@ #include #include +#include #include +#include +#include +#include namespace rclcpp { @@ -32,30 +36,99 @@ namespace node {class Node;} namespace client { +class ClientBase +{ + friend class rclcpp::executor::Executor; +public: + RCLCPP_MAKE_SHARED_DEFINITIONS(ClientBase); + + ClientBase( + ros_middleware_interface::ClientHandle client_handle, + std::string &service_name) + : client_handle_(client_handle), service_name_(service_name) + {} + + std::string get_service_name() + { + return this->service_name_; + } + + ros_middleware_interface::ClientHandle get_client_handle() + { + return this->client_handle_; + } + + virtual std::shared_ptr create_response() = 0; + virtual std::shared_ptr create_request_header() = 0; + virtual void handle_response(std::shared_ptr &response, std::shared_ptr &req_id) = 0; + +private: + RCLCPP_DISABLE_COPY(ClientBase); + + ros_middleware_interface::ClientHandle client_handle_; + std::string service_name_; + +}; + template -class Client +class Client : public ClientBase { public: + typedef std::promise Promise; + typedef std::shared_ptr SharedPromise; + typedef std::shared_future SharedFuture; + RCLCPP_MAKE_SHARED_DEFINITIONS(Client); Client(ros_middleware_interface::ClientHandle client_handle, std::string& service_name) - : client_handle_(client_handle), service_name_(service_name) + : ClientBase(client_handle, service_name) {} - ::ros_middleware_interface::ROS2_RETCODE_t send_request( - std::shared_ptr &req, - std::shared_ptr &res) + std::shared_ptr get_response(int64_t sequence_number) { - ::ros_middleware_interface::send_request(client_handle_, req.get()); + auto pair = this->pending_requests_[sequence_number]; + return pair.second; + } - return ::ros_middleware_interface::receive_response(client_handle_, res.get()); + std::shared_ptr create_response() + { + return std::shared_ptr(new typename ServiceT::Response()); + } + + std::shared_ptr create_request_header() + { + return std::shared_ptr(new ros_middleware_interface::RequestId()); + } + + void handle_response(std::shared_ptr &response, std::shared_ptr &req_id) + { + auto typed_req_id = std::static_pointer_cast(req_id); + auto typed_response = std::static_pointer_cast(response); + int64_t sequence_number = typed_req_id->sequence_number; + auto pair = this->pending_requests_[sequence_number]; + auto call_promise = pair.first; + this->pending_requests_.erase(sequence_number); + call_promise->set_value(typed_response); + } + + SharedFuture async_send_request( + typename ServiceT::Request::Ptr &request, + typename ServiceT::Response::Ptr &response) + { + int64_t sequence_number = ::ros_middleware_interface::send_request(get_client_handle(), request.get()); + + SharedPromise call_promise = std::make_shared(); + pending_requests_[sequence_number] = std::make_pair(call_promise, response); + + SharedFuture f(call_promise->get_future()); + return f; } private: - ros_middleware_interface::ClientHandle client_handle_; - std::string service_name_; + RCLCPP_DISABLE_COPY(Client); + std::map > pending_requests_; }; } /* namespace client */ diff --git a/rclcpp/include/rclcpp/executor.hpp b/rclcpp/include/rclcpp/executor.hpp index df38ffc..4b1725b 100644 --- a/rclcpp/include/rclcpp/executor.hpp +++ b/rclcpp/include/rclcpp/executor.hpp @@ -107,6 +107,7 @@ protected: rclcpp::subscription::SubscriptionBase::SharedPtr subscription; rclcpp::timer::TimerBase::SharedPtr timer; rclcpp::service::ServiceBase::SharedPtr service; + rclcpp::client::ClientBase::SharedPtr client; // These are used to keep the scope on the containing items rclcpp::callback_group::CallbackGroup::SharedPtr callback_group; rclcpp::node::Node::SharedPtr node; @@ -131,6 +132,10 @@ protected: { execute_service(any_exec->service); } + if (any_exec->client) + { + execute_client(any_exec->client); + } // Reset the callback_group, regardless of type any_exec->callback_group->can_be_taken_from_.store(true); // Wake the wait, because it may need to be recalculated or work that @@ -188,6 +193,26 @@ protected: } } + static void + execute_client( + rclcpp::client::ClientBase::SharedPtr &client) + { + std::shared_ptr response = client->create_response(); + std::shared_ptr request_header = client->create_request_header(); + bool taken = ros_middleware_interface::take_response( + client->client_handle_, + response.get(), + request_header.get()); + if (taken) + { + client->handle_response(response, request_header); + } + else + { + std::cout << "[rclcpp::error] take failed for service on client" << std::endl; + } + } + /*** Populate class storage from stored weak node pointers and wait. ***/ void @@ -198,6 +223,7 @@ protected: std::vector subs; std::vector timers; std::vector services; + std::vector clients; for (auto &weak_node : weak_nodes_) { auto node = weak_node.lock(); @@ -225,6 +251,10 @@ protected: { services.push_back(service); } + for (auto &client : group->client_ptrs_) + { + clients.push_back(client); + } } } // Clean up any invalid nodes, if they were detected @@ -278,6 +308,27 @@ protected: service_handle_index += 1; } + // Use the number of clients to allocate memory in the handles + size_t number_of_clients = clients.size(); + ros_middleware_interface::ClientHandles client_handles; + client_handles.client_count_ = number_of_clients; + // TODO: Avoid redundant malloc's + client_handles.clients_ = static_cast( + std::malloc(sizeof(void *) * number_of_clients)); + if (client_handles.clients_ == NULL) + { + // TODO: Use a different error here? maybe std::bad_alloc? + throw std::runtime_error("Could not malloc for client pointers."); + } + // Then fill the ServiceHandles with ready clients + size_t client_handle_index = 0; + for (auto &client : clients) + { + client_handles.clients_[client_handle_index] = \ + client->client_handle_.data_; + client_handle_index += 1; + } + // Use the number of guard conditions to allocate memory in the handles // Add 2 to the number for the ctrl-c guard cond and the executor's size_t start_of_timer_guard_conds = 2; @@ -313,6 +364,7 @@ protected: ros_middleware_interface::wait(subscriber_handles, guard_condition_handles, service_handles, + client_handles, nonblocking); // If ctrl-c guard condition, return directly if (guard_condition_handles.guard_conditions_[0] != 0) @@ -352,6 +404,15 @@ protected: service_handles_.push_back(handle); } } + // Then the clients + for (size_t i = 0; i < number_of_clients; ++i) + { + void *handle = client_handles.clients_[i]; + if (handle) + { + client_handles_.push_back(handle); + } + } // Make sure to free memory // TODO: Remove theses when "Avoid redundant malloc's" todo is addressed @@ -449,6 +510,36 @@ protected: return rclcpp::service::ServiceBase::SharedPtr(); } + rclcpp::client::ClientBase::SharedPtr + get_client_by_handle(void * client_handle) + { + for (auto weak_node : weak_nodes_) + { + auto node = weak_node.lock(); + if (!node) + { + continue; + } + for (auto weak_group : node->callback_groups_) + { + auto group = weak_group.lock(); + if (!group) + { + continue; + } + for (auto client : group->client_ptrs_) + { + if (client->client_handle_.data_ == client_handle) + { + return client; + } + } + } + } + return rclcpp::client::ClientBase::SharedPtr(); + } + + void remove_subscriber_handle_from_subscriber_handles(void *handle) { @@ -467,6 +558,12 @@ protected: service_handles_.remove(handle); } + void + remove_client_handle_from_client_handles(void *handle) + { + client_handles_.remove(handle); + } + rclcpp::node::Node::SharedPtr get_node_by_group(rclcpp::callback_group::CallbackGroup::SharedPtr &group) { @@ -680,6 +777,67 @@ protected: } } + rclcpp::callback_group::CallbackGroup::SharedPtr + get_group_by_client( + rclcpp::client::ClientBase::SharedPtr &client) + { + for (auto &weak_node : weak_nodes_) + { + auto node = weak_node.lock(); + if (!node) + { + continue; + } + for (auto &weak_group : node->callback_groups_) + { + auto group = weak_group.lock(); + for (auto &cli : group->client_ptrs_) + { + if (cli == client) + { + return group; + } + } + } + } + return rclcpp::callback_group::CallbackGroup::SharedPtr(); + } + + void + get_next_client(std::shared_ptr &any_exec) + { + for (auto handle : client_handles_) + { + auto client = get_client_by_handle(handle); + if (client) + { + // Find the group for this handle and see if it can be serviced + auto group = get_group_by_client(client); + if (!group) + { + // Group was not found, meaning the service is not valid... + // Remove it from the ready list and continue looking + remove_client_handle_from_client_handles(handle); + continue; + } + if (!group->can_be_taken_from_.load()) + { + // Group is mutually exclusive and is being used, so skip it for now + // Leave it to be checked next time, but continue searching + continue; + } + // Otherwise it is safe to set and return the any_exec + any_exec->client = client; + any_exec->callback_group = group; + any_exec->node = get_node_by_group(group); + remove_client_handle_from_client_handles(handle); + return; + } + // Else, the service is no longer valid, remove it and continue + remove_client_handle_from_client_handles(handle); + } + } + std::shared_ptr get_next_ready_executable() { @@ -702,6 +860,12 @@ protected: { return any_exec; } + // Check the clients to see if there are any that are ready + get_next_client(any_exec); + if (any_exec->client) + { + return any_exec; + } // If there is neither a ready timer nor subscription, return a null ptr any_exec.reset(); return any_exec; @@ -752,6 +916,8 @@ private: GuardConditionHandles guard_condition_handles_; typedef std::list ServiceHandles; ServiceHandles service_handles_; + typedef std::list ClientHandles; + ClientHandles client_handles_; }; diff --git a/rclcpp/include/rclcpp/node.hpp b/rclcpp/include/rclcpp/node.hpp index dcc91bd..cbae283 100644 --- a/rclcpp/include/rclcpp/node.hpp +++ b/rclcpp/include/rclcpp/node.hpp @@ -94,7 +94,9 @@ public: /* Create and return a Client. */ template typename rclcpp::client::Client::SharedPtr - create_client(std::string service_name); + create_client( + std::string service_name, + rclcpp::callback_group::CallbackGroup::SharedPtr group=nullptr); /* Create and return a Service. */ template @@ -123,6 +125,7 @@ private: size_t number_of_subscriptions_; size_t number_of_timers_; size_t number_of_services_; + size_t number_of_clients_; }; diff --git a/rclcpp/include/rclcpp/node_impl.hpp b/rclcpp/include/rclcpp/node_impl.hpp index 002360f..f08d0bd 100644 --- a/rclcpp/include/rclcpp/node_impl.hpp +++ b/rclcpp/include/rclcpp/node_impl.hpp @@ -165,7 +165,9 @@ Node::create_wall_timer( template typename client::Client::SharedPtr -Node::create_client(std::string service_name) +Node::create_client( + std::string service_name, + rclcpp::callback_group::CallbackGroup::SharedPtr group) { namespace rmi = ::ros_middleware_interface; @@ -180,6 +182,22 @@ Node::create_client(std::string service_name) auto cli = Client::make_shared(client_handle, service_name); + auto cli_base_ptr = std::dynamic_pointer_cast(cli); + if (group) + { + if (!group_in_node(group)) + { + // TODO: use custom exception + throw std::runtime_error("Cannot create client, group not in node."); + } + group->add_client(cli_base_ptr); + } + else + { + default_callback_group_->add_client(cli_base_ptr); + } + number_of_clients_++; + return cli; } @@ -211,7 +229,7 @@ Node::create_service( if (!group_in_node(group)) { // TODO: use custom exception - throw std::runtime_error("Cannot create timer, group not in node."); + throw std::runtime_error("Cannot create service, group not in node."); } group->add_service(serv_base_ptr); }