diff --git a/rclcpp/include/rclcpp/callback_group.hpp b/rclcpp/include/rclcpp/callback_group.hpp index bfe4431..088439e 100644 --- a/rclcpp/include/rclcpp/callback_group.hpp +++ b/rclcpp/include/rclcpp/callback_group.hpp @@ -22,6 +22,8 @@ #include #include +#include +#include namespace rclcpp { @@ -62,9 +64,23 @@ private: timer_ptrs_.push_back(timer_ptr); } + void + add_service(const service::ServiceBase::SharedPtr &service_ptr) + { + service_ptrs_.push_back(service_ptr); + } + + void + add_client(const client::ClientBase::SharedPtr &client_ptr) + { + client_ptrs_.push_back(client_ptr); + } + CallbackGroupType type_; std::vector subscription_ptrs_; std::vector timer_ptrs_; + std::vector service_ptrs_; + std::vector client_ptrs_; std::atomic_bool can_be_taken_from_; }; diff --git a/rclcpp/include/rclcpp/client.hpp b/rclcpp/include/rclcpp/client.hpp new file mode 100644 index 0000000..66976d5 --- /dev/null +++ b/rclcpp/include/rclcpp/client.hpp @@ -0,0 +1,149 @@ +/* Copyright 2014 Open Source Robotics Foundation, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef RCLCPP_RCLCPP_CLIENT_HPP_ +#define RCLCPP_RCLCPP_CLIENT_HPP_ + +#include + +#include +#include + +#include +#include +#include +#include +#include + +namespace rclcpp +{ + +// Forward declaration for friend statement +namespace node {class Node;} + +namespace client +{ + +class ClientBase +{ + friend class rclcpp::executor::Executor; +public: + RCLCPP_MAKE_SHARED_DEFINITIONS(ClientBase); + + ClientBase( + ros_middleware_interface::ClientHandle client_handle, + std::string &service_name) + : client_handle_(client_handle), service_name_(service_name) + {} + + std::string get_service_name() + { + return this->service_name_; + } + + ros_middleware_interface::ClientHandle get_client_handle() + { + return this->client_handle_; + } + + virtual std::shared_ptr create_response() = 0; + virtual std::shared_ptr create_request_header() = 0; + virtual void handle_response(std::shared_ptr &response, std::shared_ptr &req_id) = 0; + +private: + RCLCPP_DISABLE_COPY(ClientBase); + + ros_middleware_interface::ClientHandle client_handle_; + std::string service_name_; + +}; + +template +class Client : public ClientBase +{ +public: + typedef std::promise Promise; + typedef std::shared_ptr SharedPromise; + typedef std::shared_future SharedFuture; + + typedef std::function CallbackType; + + RCLCPP_MAKE_SHARED_DEFINITIONS(Client); + + Client(ros_middleware_interface::ClientHandle client_handle, + std::string& service_name) + : ClientBase(client_handle, service_name) + {} + + std::shared_ptr get_response(int64_t sequence_number) + { + auto pair = this->pending_requests_[sequence_number]; + return pair.second; + } + + std::shared_ptr create_response() + { + return std::shared_ptr(new typename ServiceT::Response()); + } + + std::shared_ptr create_request_header() + { + return std::shared_ptr(new ros_middleware_interface::RequestId()); + } + + void handle_response(std::shared_ptr &response, std::shared_ptr &req_id) + { + auto typed_req_id = std::static_pointer_cast(req_id); + auto typed_response = std::static_pointer_cast(response); + int64_t sequence_number = typed_req_id->sequence_number; + auto tuple = this->pending_requests_[sequence_number]; + auto call_promise = std::get<0>(tuple); + auto callback = std::get<1>(tuple); + auto future = std::get<2>(tuple); + this->pending_requests_.erase(sequence_number); + call_promise->set_value(typed_response); + callback(future); + } + + SharedFuture async_send_request( + typename ServiceT::Request::Ptr &request, + typename ServiceT::Response::Ptr &response) + { + return async_send_request(request, response, [] (SharedFuture f) { }); + } + + SharedFuture async_send_request( + typename ServiceT::Request::Ptr &request, + typename ServiceT::Response::Ptr &response, + CallbackType cb) + { + int64_t sequence_number = ::ros_middleware_interface::send_request(get_client_handle(), request.get()); + + SharedPromise call_promise = std::make_shared(); + SharedFuture f(call_promise->get_future()); + pending_requests_[sequence_number] = std::make_tuple(call_promise, cb, f); + return f; + } + +private: + RCLCPP_DISABLE_COPY(Client); + + std::map > pending_requests_; +}; + +} /* namespace client */ +} /* namespace rclcpp */ + +#endif /* RCLCPP_RCLCPP_CLIENT_HPP_ */ diff --git a/rclcpp/include/rclcpp/executor.hpp b/rclcpp/include/rclcpp/executor.hpp index 2d14094..4b1725b 100644 --- a/rclcpp/include/rclcpp/executor.hpp +++ b/rclcpp/include/rclcpp/executor.hpp @@ -106,6 +106,8 @@ protected: // Either the subscription or the timer will be set, but not both rclcpp::subscription::SubscriptionBase::SharedPtr subscription; rclcpp::timer::TimerBase::SharedPtr timer; + rclcpp::service::ServiceBase::SharedPtr service; + rclcpp::client::ClientBase::SharedPtr client; // These are used to keep the scope on the containing items rclcpp::callback_group::CallbackGroup::SharedPtr callback_group; rclcpp::node::Node::SharedPtr node; @@ -126,6 +128,14 @@ protected: { execute_subscription(any_exec->subscription); } + if (any_exec->service) + { + execute_service(any_exec->service); + } + if (any_exec->client) + { + execute_client(any_exec->client); + } // Reset the callback_group, regardless of type any_exec->callback_group->can_be_taken_from_.store(true); // Wake the wait, because it may need to be recalculated or work that @@ -161,6 +171,48 @@ protected: timer->callback_(); } + static void + execute_service( + rclcpp::service::ServiceBase::SharedPtr &service) + { + std::shared_ptr request = service->create_request(); + std::shared_ptr request_header = service->create_request_header(); + bool taken = ros_middleware_interface::take_request( + service->service_handle_, + request.get(), + request_header.get()); + if (taken) + { + service->handle_request(request, request_header); + } + else + { + std::cout << "[rclcpp::error] take failed for service on service: " + << service->get_service_name() + << std::endl; + } + } + + static void + execute_client( + rclcpp::client::ClientBase::SharedPtr &client) + { + std::shared_ptr response = client->create_response(); + std::shared_ptr request_header = client->create_request_header(); + bool taken = ros_middleware_interface::take_response( + client->client_handle_, + response.get(), + request_header.get()); + if (taken) + { + client->handle_response(response, request_header); + } + else + { + std::cout << "[rclcpp::error] take failed for service on client" << std::endl; + } + } + /*** Populate class storage from stored weak node pointers and wait. ***/ void @@ -170,6 +222,8 @@ protected: bool has_invalid_weak_nodes = false; std::vector subs; std::vector timers; + std::vector services; + std::vector clients; for (auto &weak_node : weak_nodes_) { auto node = weak_node.lock(); @@ -193,6 +247,14 @@ protected: { timers.push_back(timer); } + for (auto &service : group->service_ptrs_) + { + services.push_back(service); + } + for (auto &client : group->client_ptrs_) + { + clients.push_back(client); + } } } // Clean up any invalid nodes, if they were detected @@ -224,6 +286,49 @@ protected: subscription->subscription_handle_.data_; subscriber_handle_index += 1; } + + // Use the number of services to allocate memory in the handles + size_t number_of_services = services.size(); + ros_middleware_interface::ServiceHandles service_handles; + service_handles.service_count_ = number_of_services; + // TODO: Avoid redundant malloc's + service_handles.services_ = static_cast( + std::malloc(sizeof(void *) * number_of_services)); + if (service_handles.services_ == NULL) + { + // TODO: Use a different error here? maybe std::bad_alloc? + throw std::runtime_error("Could not malloc for service pointers."); + } + // Then fill the ServiceHandles with ready services + size_t service_handle_index = 0; + for (auto &service : services) + { + service_handles.services_[service_handle_index] = \ + service->service_handle_.data_; + service_handle_index += 1; + } + + // Use the number of clients to allocate memory in the handles + size_t number_of_clients = clients.size(); + ros_middleware_interface::ClientHandles client_handles; + client_handles.client_count_ = number_of_clients; + // TODO: Avoid redundant malloc's + client_handles.clients_ = static_cast( + std::malloc(sizeof(void *) * number_of_clients)); + if (client_handles.clients_ == NULL) + { + // TODO: Use a different error here? maybe std::bad_alloc? + throw std::runtime_error("Could not malloc for client pointers."); + } + // Then fill the ServiceHandles with ready clients + size_t client_handle_index = 0; + for (auto &client : clients) + { + client_handles.clients_[client_handle_index] = \ + client->client_handle_.data_; + client_handle_index += 1; + } + // Use the number of guard conditions to allocate memory in the handles // Add 2 to the number for the ctrl-c guard cond and the executor's size_t start_of_timer_guard_conds = 2; @@ -254,9 +359,12 @@ protected: timer->guard_condition_.data_; guard_cond_handle_index += 1; } + // Now wait on the waitable subscriptions and timers ros_middleware_interface::wait(subscriber_handles, guard_condition_handles, + service_handles, + client_handles, nonblocking); // If ctrl-c guard condition, return directly if (guard_condition_handles.guard_conditions_[0] != 0) @@ -264,6 +372,7 @@ protected: // Make sure to free memory // TODO: Remove theses when "Avoid redundant malloc's" todo is addressed std::free(subscriber_handles.subscribers_); + std::free(service_handles.services_); std::free(guard_condition_handles.guard_conditions_); return; } @@ -286,9 +395,29 @@ protected: guard_condition_handles_.push_back(handle); } } + // Then the services + for (size_t i = 0; i < number_of_services; ++i) + { + void *handle = service_handles.services_[i]; + if (handle) + { + service_handles_.push_back(handle); + } + } + // Then the clients + for (size_t i = 0; i < number_of_clients; ++i) + { + void *handle = client_handles.clients_[i]; + if (handle) + { + client_handles_.push_back(handle); + } + } + // Make sure to free memory // TODO: Remove theses when "Avoid redundant malloc's" todo is addressed std::free(subscriber_handles.subscribers_); + std::free(service_handles.services_); std::free(guard_condition_handles.guard_conditions_); } @@ -352,6 +481,65 @@ protected: return rclcpp::timer::TimerBase::SharedPtr(); } + rclcpp::service::ServiceBase::SharedPtr + get_service_by_handle(void * service_handle) + { + for (auto weak_node : weak_nodes_) + { + auto node = weak_node.lock(); + if (!node) + { + continue; + } + for (auto weak_group : node->callback_groups_) + { + auto group = weak_group.lock(); + if (!group) + { + continue; + } + for (auto service : group->service_ptrs_) + { + if (service->service_handle_.data_ == service_handle) + { + return service; + } + } + } + } + return rclcpp::service::ServiceBase::SharedPtr(); + } + + rclcpp::client::ClientBase::SharedPtr + get_client_by_handle(void * client_handle) + { + for (auto weak_node : weak_nodes_) + { + auto node = weak_node.lock(); + if (!node) + { + continue; + } + for (auto weak_group : node->callback_groups_) + { + auto group = weak_group.lock(); + if (!group) + { + continue; + } + for (auto client : group->client_ptrs_) + { + if (client->client_handle_.data_ == client_handle) + { + return client; + } + } + } + } + return rclcpp::client::ClientBase::SharedPtr(); + } + + void remove_subscriber_handle_from_subscriber_handles(void *handle) { @@ -364,6 +552,18 @@ protected: guard_condition_handles_.remove(handle); } + void + remove_service_handle_from_service_handles(void *handle) + { + service_handles_.remove(handle); + } + + void + remove_client_handle_from_client_handles(void *handle) + { + client_handles_.remove(handle); + } + rclcpp::node::Node::SharedPtr get_node_by_group(rclcpp::callback_group::CallbackGroup::SharedPtr &group) { @@ -516,6 +716,128 @@ protected: } } + rclcpp::callback_group::CallbackGroup::SharedPtr + get_group_by_service( + rclcpp::service::ServiceBase::SharedPtr &service) + { + for (auto &weak_node : weak_nodes_) + { + auto node = weak_node.lock(); + if (!node) + { + continue; + } + for (auto &weak_group : node->callback_groups_) + { + auto group = weak_group.lock(); + for (auto &serv : group->service_ptrs_) + { + if (serv == service) + { + return group; + } + } + } + } + return rclcpp::callback_group::CallbackGroup::SharedPtr(); + } + + void + get_next_service(std::shared_ptr &any_exec) + { + for (auto handle : service_handles_) + { + auto service = get_service_by_handle(handle); + if (service) + { + // Find the group for this handle and see if it can be serviced + auto group = get_group_by_service(service); + if (!group) + { + // Group was not found, meaning the service is not valid... + // Remove it from the ready list and continue looking + remove_service_handle_from_service_handles(handle); + continue; + } + if (!group->can_be_taken_from_.load()) + { + // Group is mutually exclusive and is being used, so skip it for now + // Leave it to be checked next time, but continue searching + continue; + } + // Otherwise it is safe to set and return the any_exec + any_exec->service = service; + any_exec->callback_group = group; + any_exec->node = get_node_by_group(group); + remove_service_handle_from_service_handles(handle); + return; + } + // Else, the service is no longer valid, remove it and continue + remove_service_handle_from_service_handles(handle); + } + } + + rclcpp::callback_group::CallbackGroup::SharedPtr + get_group_by_client( + rclcpp::client::ClientBase::SharedPtr &client) + { + for (auto &weak_node : weak_nodes_) + { + auto node = weak_node.lock(); + if (!node) + { + continue; + } + for (auto &weak_group : node->callback_groups_) + { + auto group = weak_group.lock(); + for (auto &cli : group->client_ptrs_) + { + if (cli == client) + { + return group; + } + } + } + } + return rclcpp::callback_group::CallbackGroup::SharedPtr(); + } + + void + get_next_client(std::shared_ptr &any_exec) + { + for (auto handle : client_handles_) + { + auto client = get_client_by_handle(handle); + if (client) + { + // Find the group for this handle and see if it can be serviced + auto group = get_group_by_client(client); + if (!group) + { + // Group was not found, meaning the service is not valid... + // Remove it from the ready list and continue looking + remove_client_handle_from_client_handles(handle); + continue; + } + if (!group->can_be_taken_from_.load()) + { + // Group is mutually exclusive and is being used, so skip it for now + // Leave it to be checked next time, but continue searching + continue; + } + // Otherwise it is safe to set and return the any_exec + any_exec->client = client; + any_exec->callback_group = group; + any_exec->node = get_node_by_group(group); + remove_client_handle_from_client_handles(handle); + return; + } + // Else, the service is no longer valid, remove it and continue + remove_client_handle_from_client_handles(handle); + } + } + std::shared_ptr get_next_ready_executable() { @@ -532,6 +854,18 @@ protected: { return any_exec; } + // Check the services to see if there are any that are ready + get_next_service(any_exec); + if (any_exec->service) + { + return any_exec; + } + // Check the clients to see if there are any that are ready + get_next_client(any_exec); + if (any_exec->client) + { + return any_exec; + } // If there is neither a ready timer nor subscription, return a null ptr any_exec.reset(); return any_exec; @@ -580,6 +914,10 @@ private: SubscriberHandles subscriber_handles_; typedef std::list GuardConditionHandles; GuardConditionHandles guard_condition_handles_; + typedef std::list ServiceHandles; + ServiceHandles service_handles_; + typedef std::list ClientHandles; + ClientHandles client_handles_; }; diff --git a/rclcpp/include/rclcpp/node.hpp b/rclcpp/include/rclcpp/node.hpp index 7b69631..cbae283 100644 --- a/rclcpp/include/rclcpp/node.hpp +++ b/rclcpp/include/rclcpp/node.hpp @@ -21,9 +21,11 @@ #include #include +#include #include #include #include +#include #include #include @@ -89,6 +91,22 @@ public: typedef std::weak_ptr CallbackGroupWeakPtr; typedef std::list CallbackGroupWeakPtrList; + /* Create and return a Client. */ + template + typename rclcpp::client::Client::SharedPtr + create_client( + std::string service_name, + rclcpp::callback_group::CallbackGroup::SharedPtr group=nullptr); + + /* Create and return a Service. */ + template + typename rclcpp::service::Service::SharedPtr + create_service( + std::string service_name, + std::function &, + std::shared_ptr&)> callback, + rclcpp::callback_group::CallbackGroup::SharedPtr group=nullptr); + private: RCLCPP_DISABLE_COPY(Node); @@ -106,6 +124,8 @@ private: size_t number_of_subscriptions_; size_t number_of_timers_; + size_t number_of_services_; + size_t number_of_clients_; }; diff --git a/rclcpp/include/rclcpp/node_impl.hpp b/rclcpp/include/rclcpp/node_impl.hpp index 22856df..f08d0bd 100644 --- a/rclcpp/include/rclcpp/node_impl.hpp +++ b/rclcpp/include/rclcpp/node_impl.hpp @@ -23,6 +23,7 @@ #include #include +#include #include @@ -41,7 +42,7 @@ Node::Node(std::string node_name) Node::Node(std::string node_name, context::Context::SharedPtr context) : name_(node_name), context_(context), - number_of_subscriptions_(0), number_of_timers_(0) + number_of_subscriptions_(0), number_of_timers_(0), number_of_services_(0) { node_handle_ = ::ros_middleware_interface::create_node(); using rclcpp::callback_group::CallbackGroupType; @@ -162,4 +163,83 @@ Node::create_wall_timer( group); } +template +typename client::Client::SharedPtr +Node::create_client( + std::string service_name, + rclcpp::callback_group::CallbackGroup::SharedPtr group) +{ + namespace rmi = ::ros_middleware_interface; + + auto &service_type_support_handle = rmi::get_service_type_support_handle(); + + auto client_handle = rmi::create_client(this->node_handle_, + service_type_support_handle, + service_name.c_str()); + + using namespace rclcpp::client; + + auto cli = Client::make_shared(client_handle, + service_name); + + auto cli_base_ptr = std::dynamic_pointer_cast(cli); + if (group) + { + if (!group_in_node(group)) + { + // TODO: use custom exception + throw std::runtime_error("Cannot create client, group not in node."); + } + group->add_client(cli_base_ptr); + } + else + { + default_callback_group_->add_client(cli_base_ptr); + } + number_of_clients_++; + + return cli; +} + +template +typename service::Service::SharedPtr +Node::create_service( + std::string service_name, + std::function &, + std::shared_ptr&)> callback, + rclcpp::callback_group::CallbackGroup::SharedPtr group) +{ + namespace rmi = ::ros_middleware_interface; + + auto &service_type_support_handle = rmi::get_service_type_support_handle(); + + auto service_handle = rmi::create_service(this->node_handle_, + service_type_support_handle, + service_name.c_str()); + + using namespace rclcpp::service; + + auto serv = Service::make_shared(service_handle, + service_name, + callback); + auto serv_base_ptr = std::dynamic_pointer_cast(serv); + + if (group) + { + if (!group_in_node(group)) + { + // TODO: use custom exception + throw std::runtime_error("Cannot create service, group not in node."); + } + group->add_service(serv_base_ptr); + } + else + { + default_callback_group_->add_service(serv_base_ptr); + } + number_of_services_++; + + return serv; +} + #endif /* RCLCPP_RCLCPP_NODE_IMPL_HPP_ */ diff --git a/rclcpp/include/rclcpp/service.hpp b/rclcpp/include/rclcpp/service.hpp new file mode 100644 index 0000000..04d644e --- /dev/null +++ b/rclcpp/include/rclcpp/service.hpp @@ -0,0 +1,122 @@ +/* Copyright 2014 Open Source Robotics Foundation, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef RCLCPP_RCLCPP_SERVICE_HPP_ +#define RCLCPP_RCLCPP_SERVICE_HPP_ + +#include + +#include +#include +#include + +#include + + +namespace rclcpp +{ + +// Forward declaration for friend statement +namespace node {class Node;} + +namespace service +{ + +class ServiceBase +{ + friend class rclcpp::executor::Executor; +public: + RCLCPP_MAKE_SHARED_DEFINITIONS(ServiceBase); + + ServiceBase( + ros_middleware_interface::ServiceHandle service_handle, + std::string &service_name) + : service_handle_(service_handle), service_name_(service_name) + {} + + std::string get_service_name() + { + return this->service_name_; + } + + ros_middleware_interface::ServiceHandle get_service_handle() + { + return this->service_handle_; + } + + virtual std::shared_ptr create_request() = 0; + virtual std::shared_ptr create_request_header() = 0; + virtual void handle_request(std::shared_ptr &request, std::shared_ptr &req_id) = 0; + +private: + RCLCPP_DISABLE_COPY(ServiceBase); + + ros_middleware_interface::ServiceHandle service_handle_; + std::string service_name_; + +}; + +template +class Service : public ServiceBase +{ +public: + typedef std::function< + void(const std::shared_ptr &, + std::shared_ptr&)> CallbackType; + RCLCPP_MAKE_SHARED_DEFINITIONS(Service); + + Service( + ros_middleware_interface::ServiceHandle service_handle, + std::string &service_name, + CallbackType callback) + : ServiceBase(service_handle, service_name), callback_(callback) + {} + + std::shared_ptr create_request() + { + return std::shared_ptr(new typename ServiceT::Request()); + } + + std::shared_ptr create_request_header() + { + return std::shared_ptr(new ros_middleware_interface::RequestId()); + } + + void handle_request(std::shared_ptr &request, std::shared_ptr &req_id) + { + auto typed_request = std::static_pointer_cast(request); + auto typed_req_id = std::static_pointer_cast(req_id); + auto response = std::shared_ptr(new typename ServiceT::Response); + callback_(typed_request, response); + send_response(typed_req_id, response); + } + + void send_response( + std::shared_ptr &req_id, + std::shared_ptr &response) + { + ::ros_middleware_interface::send_response(get_service_handle(), req_id.get(), response.get()); + } + +private: + RCLCPP_DISABLE_COPY(Service); + + CallbackType callback_; +}; + +} /* namespace service */ +} /* namespace rclcpp */ + +#endif /* RCLCPP_RCLCPP_SERVICE_HPP_ */