From f531b029285eab8de6dcba7243700a1befb0f137 Mon Sep 17 00:00:00 2001 From: William Woodall Date: Thu, 5 Nov 2015 18:55:51 -0800 Subject: [PATCH] split headers into cpp files --- ...any_executable.hpp_ => any_executable.hpp} | 0 ...callback_group.hpp_ => callback_group.hpp} | 0 .../rclcpp/{client.hpp_ => client.hpp} | 0 .../rclcpp/{context.hpp_ => context.hpp} | 0 ...fault_context.hpp_ => default_context.hpp} | 0 .../rclcpp/{executor.hpp_ => executor.hpp} | 0 .../rclcpp/{executors.hpp_ => executors.hpp} | 0 ...cutor.hpp_ => multi_threaded_executor.hpp} | 0 ...utor.hpp_ => single_threaded_executor.hpp} | 0 ...manager.hpp_ => intra_process_manager.hpp} | 0 ...e.hpp_ => intra_process_manager_state.hpp} | 0 ..._strategies.hpp_ => memory_strategies.hpp} | 0 ...mory_strategy.hpp_ => memory_strategy.hpp} | 0 .../rclcpp/{parameter.hpp_ => parameter.hpp} | 0 ...meter_client.hpp_ => parameter_client.hpp} | 0 ...ter_service.hpp_ => parameter_service.hpp} | 0 .../rclcpp/{publisher.hpp_ => publisher.hpp} | 0 .../rclcpp/{service.hpp_ => service.hpp} | 0 .../{subscription.hpp_ => subscription.hpp} | 0 .../include/rclcpp/{timer.hpp_ => timer.hpp} | 0 .../rclcpp/{utilities.hpp_ => utilities.hpp} | 0 rclcpp/src/rclcpp/any_executable.cpp | 48 ++ rclcpp/src/rclcpp/callback_group.cpp | 130 ++++ rclcpp/src/rclcpp/client.cpp | 164 +++++ rclcpp/src/rclcpp/context.cpp | 80 +++ .../src/rclcpp/contexts/default_context.cpp | 47 ++ rclcpp/src/rclcpp/executor.cpp | 616 ++++++++++++++++++ rclcpp/src/rclcpp/executors.cpp | 87 +++ .../executors/multi_threaded_executor.cpp | 106 +++ .../executors/single_threaded_executor.cpp | 73 +++ rclcpp/src/rclcpp/intra_process_manager.cpp | 377 +++++++++++ .../rclcpp/intra_process_manager_state.cpp | 270 ++++++++ rclcpp/src/rclcpp/memory_strategies.cpp | 36 + rclcpp/src/rclcpp/memory_strategy.cpp | 255 ++++++++ .../node_impl.hpp => src/rclcpp/node.cpp} | 0 rclcpp/src/rclcpp/parameter.cpp | 337 ++++++++++ rclcpp/src/rclcpp/parameter_client.cpp | 359 ++++++++++ rclcpp/src/rclcpp/parameter_service.cpp | 177 +++++ rclcpp/src/rclcpp/publisher.cpp | 354 ++++++++++ rclcpp/src/rclcpp/service.cpp | 158 +++++ rclcpp/src/rclcpp/subscription.cpp | 284 ++++++++ rclcpp/src/rclcpp/timer.cpp | 177 +++++ rclcpp/src/rclcpp/utilities.cpp | 226 +++++++ 43 files changed, 4361 insertions(+) rename rclcpp/include/rclcpp/{any_executable.hpp_ => any_executable.hpp} (100%) rename rclcpp/include/rclcpp/{callback_group.hpp_ => callback_group.hpp} (100%) rename rclcpp/include/rclcpp/{client.hpp_ => client.hpp} (100%) rename rclcpp/include/rclcpp/{context.hpp_ => context.hpp} (100%) rename rclcpp/include/rclcpp/contexts/{default_context.hpp_ => default_context.hpp} (100%) rename rclcpp/include/rclcpp/{executor.hpp_ => executor.hpp} (100%) rename rclcpp/include/rclcpp/{executors.hpp_ => executors.hpp} (100%) rename rclcpp/include/rclcpp/executors/{multi_threaded_executor.hpp_ => multi_threaded_executor.hpp} (100%) rename rclcpp/include/rclcpp/executors/{single_threaded_executor.hpp_ => single_threaded_executor.hpp} (100%) rename rclcpp/include/rclcpp/{intra_process_manager.hpp_ => intra_process_manager.hpp} (100%) rename rclcpp/include/rclcpp/{intra_process_manager_state.hpp_ => intra_process_manager_state.hpp} (100%) rename rclcpp/include/rclcpp/{memory_strategies.hpp_ => memory_strategies.hpp} (100%) rename rclcpp/include/rclcpp/{memory_strategy.hpp_ => memory_strategy.hpp} (100%) rename rclcpp/include/rclcpp/{parameter.hpp_ => parameter.hpp} (100%) rename rclcpp/include/rclcpp/{parameter_client.hpp_ => parameter_client.hpp} (100%) rename rclcpp/include/rclcpp/{parameter_service.hpp_ => parameter_service.hpp} (100%) rename rclcpp/include/rclcpp/{publisher.hpp_ => publisher.hpp} (100%) rename rclcpp/include/rclcpp/{service.hpp_ => service.hpp} (100%) rename rclcpp/include/rclcpp/{subscription.hpp_ => subscription.hpp} (100%) rename rclcpp/include/rclcpp/{timer.hpp_ => timer.hpp} (100%) rename rclcpp/include/rclcpp/{utilities.hpp_ => utilities.hpp} (100%) create mode 100644 rclcpp/src/rclcpp/any_executable.cpp create mode 100644 rclcpp/src/rclcpp/callback_group.cpp create mode 100644 rclcpp/src/rclcpp/client.cpp create mode 100644 rclcpp/src/rclcpp/context.cpp create mode 100644 rclcpp/src/rclcpp/contexts/default_context.cpp create mode 100644 rclcpp/src/rclcpp/executor.cpp create mode 100644 rclcpp/src/rclcpp/executors.cpp create mode 100644 rclcpp/src/rclcpp/executors/multi_threaded_executor.cpp create mode 100644 rclcpp/src/rclcpp/executors/single_threaded_executor.cpp create mode 100644 rclcpp/src/rclcpp/intra_process_manager.cpp create mode 100644 rclcpp/src/rclcpp/intra_process_manager_state.cpp create mode 100644 rclcpp/src/rclcpp/memory_strategies.cpp create mode 100644 rclcpp/src/rclcpp/memory_strategy.cpp rename rclcpp/{include/rclcpp/node_impl.hpp => src/rclcpp/node.cpp} (100%) create mode 100644 rclcpp/src/rclcpp/parameter.cpp create mode 100644 rclcpp/src/rclcpp/parameter_client.cpp create mode 100644 rclcpp/src/rclcpp/parameter_service.cpp create mode 100644 rclcpp/src/rclcpp/publisher.cpp create mode 100644 rclcpp/src/rclcpp/service.cpp create mode 100644 rclcpp/src/rclcpp/subscription.cpp create mode 100644 rclcpp/src/rclcpp/timer.cpp create mode 100644 rclcpp/src/rclcpp/utilities.cpp diff --git a/rclcpp/include/rclcpp/any_executable.hpp_ b/rclcpp/include/rclcpp/any_executable.hpp similarity index 100% rename from rclcpp/include/rclcpp/any_executable.hpp_ rename to rclcpp/include/rclcpp/any_executable.hpp diff --git a/rclcpp/include/rclcpp/callback_group.hpp_ b/rclcpp/include/rclcpp/callback_group.hpp similarity index 100% rename from rclcpp/include/rclcpp/callback_group.hpp_ rename to rclcpp/include/rclcpp/callback_group.hpp diff --git a/rclcpp/include/rclcpp/client.hpp_ b/rclcpp/include/rclcpp/client.hpp similarity index 100% rename from rclcpp/include/rclcpp/client.hpp_ rename to rclcpp/include/rclcpp/client.hpp diff --git a/rclcpp/include/rclcpp/context.hpp_ b/rclcpp/include/rclcpp/context.hpp similarity index 100% rename from rclcpp/include/rclcpp/context.hpp_ rename to rclcpp/include/rclcpp/context.hpp diff --git a/rclcpp/include/rclcpp/contexts/default_context.hpp_ b/rclcpp/include/rclcpp/contexts/default_context.hpp similarity index 100% rename from rclcpp/include/rclcpp/contexts/default_context.hpp_ rename to rclcpp/include/rclcpp/contexts/default_context.hpp diff --git a/rclcpp/include/rclcpp/executor.hpp_ b/rclcpp/include/rclcpp/executor.hpp similarity index 100% rename from rclcpp/include/rclcpp/executor.hpp_ rename to rclcpp/include/rclcpp/executor.hpp diff --git a/rclcpp/include/rclcpp/executors.hpp_ b/rclcpp/include/rclcpp/executors.hpp similarity index 100% rename from rclcpp/include/rclcpp/executors.hpp_ rename to rclcpp/include/rclcpp/executors.hpp diff --git a/rclcpp/include/rclcpp/executors/multi_threaded_executor.hpp_ b/rclcpp/include/rclcpp/executors/multi_threaded_executor.hpp similarity index 100% rename from rclcpp/include/rclcpp/executors/multi_threaded_executor.hpp_ rename to rclcpp/include/rclcpp/executors/multi_threaded_executor.hpp diff --git a/rclcpp/include/rclcpp/executors/single_threaded_executor.hpp_ b/rclcpp/include/rclcpp/executors/single_threaded_executor.hpp similarity index 100% rename from rclcpp/include/rclcpp/executors/single_threaded_executor.hpp_ rename to rclcpp/include/rclcpp/executors/single_threaded_executor.hpp diff --git a/rclcpp/include/rclcpp/intra_process_manager.hpp_ b/rclcpp/include/rclcpp/intra_process_manager.hpp similarity index 100% rename from rclcpp/include/rclcpp/intra_process_manager.hpp_ rename to rclcpp/include/rclcpp/intra_process_manager.hpp diff --git a/rclcpp/include/rclcpp/intra_process_manager_state.hpp_ b/rclcpp/include/rclcpp/intra_process_manager_state.hpp similarity index 100% rename from rclcpp/include/rclcpp/intra_process_manager_state.hpp_ rename to rclcpp/include/rclcpp/intra_process_manager_state.hpp diff --git a/rclcpp/include/rclcpp/memory_strategies.hpp_ b/rclcpp/include/rclcpp/memory_strategies.hpp similarity index 100% rename from rclcpp/include/rclcpp/memory_strategies.hpp_ rename to rclcpp/include/rclcpp/memory_strategies.hpp diff --git a/rclcpp/include/rclcpp/memory_strategy.hpp_ b/rclcpp/include/rclcpp/memory_strategy.hpp similarity index 100% rename from rclcpp/include/rclcpp/memory_strategy.hpp_ rename to rclcpp/include/rclcpp/memory_strategy.hpp diff --git a/rclcpp/include/rclcpp/parameter.hpp_ b/rclcpp/include/rclcpp/parameter.hpp similarity index 100% rename from rclcpp/include/rclcpp/parameter.hpp_ rename to rclcpp/include/rclcpp/parameter.hpp diff --git a/rclcpp/include/rclcpp/parameter_client.hpp_ b/rclcpp/include/rclcpp/parameter_client.hpp similarity index 100% rename from rclcpp/include/rclcpp/parameter_client.hpp_ rename to rclcpp/include/rclcpp/parameter_client.hpp diff --git a/rclcpp/include/rclcpp/parameter_service.hpp_ b/rclcpp/include/rclcpp/parameter_service.hpp similarity index 100% rename from rclcpp/include/rclcpp/parameter_service.hpp_ rename to rclcpp/include/rclcpp/parameter_service.hpp diff --git a/rclcpp/include/rclcpp/publisher.hpp_ b/rclcpp/include/rclcpp/publisher.hpp similarity index 100% rename from rclcpp/include/rclcpp/publisher.hpp_ rename to rclcpp/include/rclcpp/publisher.hpp diff --git a/rclcpp/include/rclcpp/service.hpp_ b/rclcpp/include/rclcpp/service.hpp similarity index 100% rename from rclcpp/include/rclcpp/service.hpp_ rename to rclcpp/include/rclcpp/service.hpp diff --git a/rclcpp/include/rclcpp/subscription.hpp_ b/rclcpp/include/rclcpp/subscription.hpp similarity index 100% rename from rclcpp/include/rclcpp/subscription.hpp_ rename to rclcpp/include/rclcpp/subscription.hpp diff --git a/rclcpp/include/rclcpp/timer.hpp_ b/rclcpp/include/rclcpp/timer.hpp similarity index 100% rename from rclcpp/include/rclcpp/timer.hpp_ rename to rclcpp/include/rclcpp/timer.hpp diff --git a/rclcpp/include/rclcpp/utilities.hpp_ b/rclcpp/include/rclcpp/utilities.hpp similarity index 100% rename from rclcpp/include/rclcpp/utilities.hpp_ rename to rclcpp/include/rclcpp/utilities.hpp diff --git a/rclcpp/src/rclcpp/any_executable.cpp b/rclcpp/src/rclcpp/any_executable.cpp new file mode 100644 index 0000000..1bdddb8 --- /dev/null +++ b/rclcpp/src/rclcpp/any_executable.cpp @@ -0,0 +1,48 @@ +// Copyright 2015 Open Source Robotics Foundation, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef RCLCPP_RCLCPP_ANY_EXECUTABLE_HPP_ +#define RCLCPP_RCLCPP_ANY_EXECUTABLE_HPP_ + +#include + +#include +#include + +namespace rclcpp +{ +namespace executor +{ + +struct AnyExecutable +{ + RCLCPP_SMART_PTR_DEFINITIONS(AnyExecutable); + AnyExecutable() + : subscription(0), timer(0), callback_group(0), node(0) + {} + // Only one of the following pointers will be set. + rclcpp::subscription::SubscriptionBase::SharedPtr subscription; + rclcpp::subscription::SubscriptionBase::SharedPtr subscription_intra_process; + rclcpp::timer::TimerBase::SharedPtr timer; + rclcpp::service::ServiceBase::SharedPtr service; + rclcpp::client::ClientBase::SharedPtr client; + // These are used to keep the scope on the containing items + rclcpp::callback_group::CallbackGroup::SharedPtr callback_group; + rclcpp::node::Node::SharedPtr node; +}; + +} /* executor */ +} /* rclcpp */ + +#endif diff --git a/rclcpp/src/rclcpp/callback_group.cpp b/rclcpp/src/rclcpp/callback_group.cpp new file mode 100644 index 0000000..ca050fa --- /dev/null +++ b/rclcpp/src/rclcpp/callback_group.cpp @@ -0,0 +1,130 @@ +// Copyright 2014 Open Source Robotics Foundation, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef RCLCPP_RCLCPP_CALLBACK_GROUP_HPP_ +#define RCLCPP_RCLCPP_CALLBACK_GROUP_HPP_ + +#include +#include +#include + +#include +#include +#include +#include + +namespace rclcpp +{ + +// Forward declarations for friend statement in class CallbackGroup +namespace node +{ +class Node; +} // namespace node + +namespace callback_group +{ + +enum class CallbackGroupType +{ + MutuallyExclusive, + Reentrant +}; + +class CallbackGroup +{ + friend class rclcpp::node::Node; + +public: + RCLCPP_SMART_PTR_DEFINITIONS(CallbackGroup); + + CallbackGroup(CallbackGroupType group_type) + : type_(group_type), can_be_taken_from_(true) + {} + + const std::vector & + get_subscription_ptrs() const + { + return subscription_ptrs_; + } + + const std::vector & + get_timer_ptrs() const + { + return timer_ptrs_; + } + + const std::vector & + get_service_ptrs() const + { + return service_ptrs_; + } + + const std::vector & + get_client_ptrs() const + { + return client_ptrs_; + } + + std::atomic_bool & can_be_taken_from() + { + return can_be_taken_from_; + } + + const CallbackGroupType & type() const + { + return type_; + } + +private: + RCLCPP_DISABLE_COPY(CallbackGroup); + + void + add_subscription( + const subscription::SubscriptionBase::SharedPtr subscription_ptr) + { + subscription_ptrs_.push_back(subscription_ptr); + } + + void + add_timer(const timer::TimerBase::SharedPtr timer_ptr) + { + timer_ptrs_.push_back(timer_ptr); + } + + void + add_service(const service::ServiceBase::SharedPtr service_ptr) + { + service_ptrs_.push_back(service_ptr); + } + + void + add_client(const client::ClientBase::SharedPtr client_ptr) + { + client_ptrs_.push_back(client_ptr); + } + + CallbackGroupType type_; + std::vector subscription_ptrs_; + std::vector timer_ptrs_; + std::vector service_ptrs_; + std::vector client_ptrs_; + std::atomic_bool can_be_taken_from_; + +}; + +} /* namespace callback_group */ +} /* namespace rclcpp */ + +#endif /* RCLCPP_RCLCPP_CALLBACK_GROUP_HPP_ */ diff --git a/rclcpp/src/rclcpp/client.cpp b/rclcpp/src/rclcpp/client.cpp new file mode 100644 index 0000000..7037363 --- /dev/null +++ b/rclcpp/src/rclcpp/client.cpp @@ -0,0 +1,164 @@ +// Copyright 2014 Open Source Robotics Foundation, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef RCLCPP_RCLCPP_CLIENT_HPP_ +#define RCLCPP_RCLCPP_CLIENT_HPP_ + +#include +#include +#include +#include +#include +#include + +#include +#include + +#include +#include + +namespace rclcpp +{ + +namespace client +{ + +class ClientBase +{ + +public: + RCLCPP_SMART_PTR_DEFINITIONS_NOT_COPYABLE(ClientBase); + + ClientBase( + std::shared_ptr node_handle, + rmw_client_t * client_handle, + const std::string & service_name) + : node_handle_(node_handle), client_handle_(client_handle), service_name_(service_name) + {} + + virtual ~ClientBase() + { + if (client_handle_) { + if (rmw_destroy_client(client_handle_) != RMW_RET_OK) { + fprintf(stderr, + "Error in destruction of rmw client handle: %s\n", rmw_get_error_string_safe()); + } + } + } + + const std::string & get_service_name() const + { + return this->service_name_; + } + + const rmw_client_t * get_client_handle() const + { + return this->client_handle_; + } + + virtual std::shared_ptr create_response() = 0; + virtual std::shared_ptr create_request_header() = 0; + virtual void handle_response( + std::shared_ptr & request_header, std::shared_ptr & response) = 0; + +private: + RCLCPP_DISABLE_COPY(ClientBase); + + std::shared_ptr node_handle_; + + rmw_client_t * client_handle_; + std::string service_name_; + +}; + +template +class Client : public ClientBase +{ +public: + using Promise = std::promise; + using SharedPromise = std::shared_ptr; + using SharedFuture = std::shared_future; + + using CallbackType = std::function; + + RCLCPP_SMART_PTR_DEFINITIONS(Client); + + Client( + std::shared_ptr node_handle, + rmw_client_t * client_handle, + const std::string & service_name) + : ClientBase(node_handle, client_handle, service_name) + {} + + std::shared_ptr create_response() + { + return std::shared_ptr(new typename ServiceT::Response()); + } + + std::shared_ptr create_request_header() + { + // TODO(wjwwood): This should probably use rmw_request_id's allocator. + // (since it is a C type) + return std::shared_ptr(new rmw_request_id_t); + } + + void handle_response(std::shared_ptr & request_header, std::shared_ptr & response) + { + auto typed_request_header = std::static_pointer_cast(request_header); + auto typed_response = std::static_pointer_cast(response); + int64_t sequence_number = typed_request_header->sequence_number; + // TODO this must check if the sequence_number is valid otherwise the call_promise will be null + auto tuple = this->pending_requests_[sequence_number]; + auto call_promise = std::get<0>(tuple); + auto callback = std::get<1>(tuple); + auto future = std::get<2>(tuple); + this->pending_requests_.erase(sequence_number); + call_promise->set_value(typed_response); + callback(future); + } + + SharedFuture async_send_request( + typename ServiceT::Request::SharedPtr request) + { + return async_send_request(request, [](SharedFuture) {}); + } + + SharedFuture async_send_request( + typename ServiceT::Request::SharedPtr request, + CallbackType cb) + { + int64_t sequence_number; + if (RMW_RET_OK != rmw_send_request(get_client_handle(), request.get(), &sequence_number)) { + // *INDENT-OFF* (prevent uncrustify from making unecessary indents here) + throw std::runtime_error( + std::string("failed to send request: ") + rmw_get_error_string_safe()); + // *INDENT-ON* + } + + SharedPromise call_promise = std::make_shared(); + SharedFuture f(call_promise->get_future()); + pending_requests_[sequence_number] = std::make_tuple(call_promise, cb, f); + return f; + } + +private: + RCLCPP_DISABLE_COPY(Client); + + std::map> pending_requests_; +}; + +} /* namespace client */ +} /* namespace rclcpp */ + +#endif /* RCLCPP_RCLCPP_CLIENT_HPP_ */ diff --git a/rclcpp/src/rclcpp/context.cpp b/rclcpp/src/rclcpp/context.cpp new file mode 100644 index 0000000..1c56cbb --- /dev/null +++ b/rclcpp/src/rclcpp/context.cpp @@ -0,0 +1,80 @@ +// Copyright 2014 Open Source Robotics Foundation, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef RCLCPP_RCLCPP_CONTEXT_HPP_ +#define RCLCPP_RCLCPP_CONTEXT_HPP_ + +#include + +#include + +#include +#include +#include +#include +#include + +#include + +namespace rclcpp +{ + +namespace context +{ + +class Context +{ +public: + RCLCPP_SMART_PTR_DEFINITIONS(Context); + + Context() {} + + template + std::shared_ptr + get_sub_context(Args && ... args) + { + std::lock_guard lock(mutex_); + + std::type_index type_i(typeid(SubContext)); + std::shared_ptr sub_context; + auto it = sub_contexts_.find(type_i); + if (it == sub_contexts_.end()) { + // It doesn't exist yet, make it + // *INDENT-OFF* (prevent uncrustify from making unecessary indents here) + sub_context = std::shared_ptr( + new SubContext(std::forward(args) ...), + [] (SubContext * sub_context_ptr) { + delete sub_context_ptr; + }); + // *INDENT-ON* + sub_contexts_[type_i] = sub_context; + } else { + // It exists, get it out and cast it. + sub_context = std::static_pointer_cast(it->second); + } + return sub_context; + } + +private: + RCLCPP_DISABLE_COPY(Context); + + std::unordered_map> sub_contexts_; + std::mutex mutex_; + +}; + +} /* namespace context */ +} /* namespace rclcpp */ + +#endif /* RCLCPP_RCLCPP_CONTEXT_HPP_ */ diff --git a/rclcpp/src/rclcpp/contexts/default_context.cpp b/rclcpp/src/rclcpp/contexts/default_context.cpp new file mode 100644 index 0000000..c10225e --- /dev/null +++ b/rclcpp/src/rclcpp/contexts/default_context.cpp @@ -0,0 +1,47 @@ +// Copyright 2014 Open Source Robotics Foundation, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef RCLCPP_RCLCPP_CONTEXTS_DEFAULT_CONTEXT_HPP_ +#define RCLCPP_RCLCPP_CONTEXTS_DEFAULT_CONTEXT_HPP_ + +#include + +namespace rclcpp +{ +namespace contexts +{ +namespace default_context +{ + +class DefaultContext : public rclcpp::context::Context +{ +public: + RCLCPP_SMART_PTR_DEFINITIONS(DefaultContext); + + DefaultContext() {} + +}; + +DefaultContext::SharedPtr +get_global_default_context() +{ + static DefaultContext::SharedPtr default_context = DefaultContext::make_shared(); + return default_context; +} + +} // namespace default_context +} // namespace contexts +} // namespace rclcpp + +#endif /* RCLCPP_RCLCPP_CONTEXTS_DEFAULT_CONTEXT_HPP_ */ diff --git a/rclcpp/src/rclcpp/executor.cpp b/rclcpp/src/rclcpp/executor.cpp new file mode 100644 index 0000000..cc3fe42 --- /dev/null +++ b/rclcpp/src/rclcpp/executor.cpp @@ -0,0 +1,616 @@ +// Copyright 2014 Open Source Robotics Foundation, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef RCLCPP__EXECUTOR_HPP_ +#define RCLCPP__EXECUTOR_HPP_ + +#include +#include +#include +#include +#include +#include +#include + +#include "rcl_interfaces/msg/intra_process_message.hpp" + +#include "rclcpp/any_executable.hpp" +#include "rclcpp/macros.hpp" +#include "rclcpp/memory_strategies.hpp" +#include "rclcpp/memory_strategy.hpp" +#include "rclcpp/node.hpp" +#include "rclcpp/utilities.hpp" + +namespace rclcpp +{ +namespace executor +{ + +/// Coordinate the order and timing of available communication tasks. +/** + * Executor provides spin functions (including spin_node_once and spin_some). + * It coordinates the nodes and callback groups by looking for available work and completing it, + * based on the threading or concurrency scheme provided by the subclass implementation. + * An example of available work is executing a subscription callback, or a timer callback. + * The executor structure allows for a decoupling of the communication graph and the execution + * model. + * See SingleThreadedExecutor and MultiThreadedExecutor for examples of execution paradigms. + */ +class Executor +{ +public: + RCLCPP_SMART_PTR_DEFINITIONS_NOT_COPYABLE(Executor); + + /// Default constructor. + // \param[in] ms The memory strategy to be used with this executor. + explicit Executor(memory_strategy::MemoryStrategy::SharedPtr ms = + memory_strategies::create_default_strategy()) + : interrupt_guard_condition_(rmw_create_guard_condition()), + memory_strategy_(ms) + { + } + + /// Default destructor. + virtual ~Executor() + { + // Try to deallocate the interrupt guard condition. + if (interrupt_guard_condition_ != nullptr) { + rmw_ret_t status = rmw_destroy_guard_condition(interrupt_guard_condition_); + if (status != RMW_RET_OK) { + fprintf(stderr, + "[rclcpp::error] failed to destroy guard condition: %s\n", rmw_get_error_string_safe()); + } + } + } + + /// Do work periodically as it becomes available to us. Blocking call, may block indefinitely. + // It is up to the implementation of Executor to implement spin. + virtual void spin() = 0; + + /// Add a node to the executor. + /** + * An executor can have zero or more nodes which provide work during `spin` functions. + * \param[in] node_ptr Shared pointer to the node to be added. + * \param[in] notify True to trigger the interrupt guard condition during this function. If + * the executor is blocked at the rmw layer while waiting for work and it is notified that a new + * node was added, it will wake up. + */ + virtual void + add_node(rclcpp::node::Node::SharedPtr node_ptr, bool notify = true) + { + // Check to ensure node not already added + for (auto & weak_node : weak_nodes_) { + auto node = weak_node.lock(); + if (node == node_ptr) { + // TODO(jacquelinekay): Use a different error here? + throw std::runtime_error("Cannot add node to executor, node already added."); + } + } + weak_nodes_.push_back(node_ptr); + if (notify) { + // Interrupt waiting to handle new node + rmw_ret_t status = rmw_trigger_guard_condition(interrupt_guard_condition_); + if (status != RMW_RET_OK) { + throw std::runtime_error(rmw_get_error_string_safe()); + } + } + } + + /// Remove a node from the executor. + /** + * \param[in] node_ptr Shared pointer to the node to remove. + * \param[in] notify True to trigger the interrupt guard condition and wake up the executor. + * This is useful if the last node was removed from the executor while the executor was blocked + * waiting for work in another thread, because otherwise the executor would never be notified. + */ + virtual void + remove_node(rclcpp::node::Node::SharedPtr node_ptr, bool notify = true) + { + bool node_removed = false; + weak_nodes_.erase( + std::remove_if( + weak_nodes_.begin(), weak_nodes_.end(), + [&](std::weak_ptr & i) + { + bool matched = (i.lock() == node_ptr); + node_removed |= matched; + return matched; + })); + if (notify) { + // If the node was matched and removed, interrupt waiting + if (node_removed) { + rmw_ret_t status = rmw_trigger_guard_condition(interrupt_guard_condition_); + if (status != RMW_RET_OK) { + throw std::runtime_error(rmw_get_error_string_safe()); + } + } + } + } + + /// Add a node to executor, execute the next available unit of work, and remove the node. + /** + * \param[in] node Shared pointer to the node to add. + * \param[in] timeout How long to wait for work to become available. Negative values cause + * spin_node_once to block indefinitely (the default behavior). A timeout of 0 causes this + * function to be non-blocking. + */ + template + void spin_node_once(rclcpp::node::Node::SharedPtr node, + std::chrono::duration timeout = std::chrono::duration(-1)) + { + this->add_node(node, false); + // non-blocking = true + auto any_exec = get_next_executable(timeout); + if (any_exec) { + execute_any_executable(any_exec); + } + this->remove_node(node, false); + } + + /// Add a node, complete all immediately available work, and remove the node. + /** + * \param[in] node Shared pointer to the node to add. + */ + void spin_node_some(rclcpp::node::Node::SharedPtr node) + { + this->add_node(node, false); + spin_some(); + this->remove_node(node, false); + } + + /// Complete all available queued work without blocking. + /** + * This function can be overridden. The default implementation is suitable for a + * single-threaded model of execution. + * Adding subscriptions, timers, services, etc. with blocking callbacks will cause this function + * to block (which may have unintended consequences). + */ + virtual void spin_some() + { + while (AnyExecutable::SharedPtr any_exec = + get_next_executable(std::chrono::milliseconds::zero())) + { + execute_any_executable(any_exec); + } + } + + /// Support dynamic switching of the memory strategy. + /** + * Switching the memory strategy while the executor is spinning in another threading could have + * unintended consequences. + * \param[in] memory_strategy Shared pointer to the memory strategy to set. + */ + void + set_memory_strategy(memory_strategy::MemoryStrategy::SharedPtr memory_strategy) + { + if (memory_strategy == nullptr) { + throw std::runtime_error("Received NULL memory strategy in executor."); + } + memory_strategy_ = memory_strategy; + } + +protected: + /// Find the next available executable and do the work associated with it. + /** \param[in] any_exec Union structure that can hold any executable type (timer, subscription, + * service, client). + */ + void + execute_any_executable(AnyExecutable::SharedPtr any_exec) + { + if (!any_exec) { + return; + } + if (any_exec->timer) { + execute_timer(any_exec->timer); + } + if (any_exec->subscription) { + execute_subscription(any_exec->subscription); + } + if (any_exec->subscription_intra_process) { + execute_intra_process_subscription(any_exec->subscription_intra_process); + } + if (any_exec->service) { + execute_service(any_exec->service); + } + if (any_exec->client) { + execute_client(any_exec->client); + } + // Reset the callback_group, regardless of type + any_exec->callback_group->can_be_taken_from().store(true); + // Wake the wait, because it may need to be recalculated or work that + // was previously blocked is now available. + rmw_ret_t status = rmw_trigger_guard_condition(interrupt_guard_condition_); + if (status != RMW_RET_OK) { + throw std::runtime_error(rmw_get_error_string_safe()); + } + } + + static void + execute_subscription( + rclcpp::subscription::SubscriptionBase::SharedPtr subscription) + { + std::shared_ptr message = subscription->create_message(); + bool taken = false; + rmw_message_info_t message_info; + auto ret = + rmw_take_with_info(subscription->get_subscription_handle(), + message.get(), &taken, &message_info); + if (ret == RMW_RET_OK) { + if (taken) { + message_info.from_intra_process = false; + subscription->handle_message(message, message_info); + } + } else { + fprintf(stderr, + "[rclcpp::error] take failed for subscription on topic '%s': %s\n", + subscription->get_topic_name().c_str(), rmw_get_error_string_safe()); + } + subscription->return_message(message); + } + + static void + execute_intra_process_subscription( + rclcpp::subscription::SubscriptionBase::SharedPtr subscription) + { + rcl_interfaces::msg::IntraProcessMessage ipm; + bool taken = false; + rmw_message_info_t message_info; + rmw_ret_t status = rmw_take_with_info( + subscription->get_intra_process_subscription_handle(), + &ipm, + &taken, + &message_info); + if (status == RMW_RET_OK) { + if (taken) { + message_info.from_intra_process = true; + subscription->handle_intra_process_message(ipm, message_info); + } + } else { + fprintf(stderr, + "[rclcpp::error] take failed for intra process subscription on topic '%s': %s\n", + subscription->get_topic_name().c_str(), rmw_get_error_string_safe()); + } + } + + static void + execute_timer( + rclcpp::timer::TimerBase::SharedPtr timer) + { + timer->execute_callback(); + } + + static void + execute_service( + rclcpp::service::ServiceBase::SharedPtr service) + { + std::shared_ptr request_header = service->create_request_header(); + std::shared_ptr request = service->create_request(); + bool taken = false; + rmw_ret_t status = rmw_take_request( + service->get_service_handle(), + request_header.get(), + request.get(), + &taken); + if (status == RMW_RET_OK) { + if (taken) { + service->handle_request(request_header, request); + } + } else { + fprintf(stderr, + "[rclcpp::error] take request failed for server of service '%s': %s\n", + service->get_service_name().c_str(), rmw_get_error_string_safe()); + } + } + + static void + execute_client( + rclcpp::client::ClientBase::SharedPtr client) + { + std::shared_ptr request_header = client->create_request_header(); + std::shared_ptr response = client->create_response(); + bool taken = false; + rmw_ret_t status = rmw_take_response( + client->get_client_handle(), + request_header.get(), + response.get(), + &taken); + if (status == RMW_RET_OK) { + if (taken) { + client->handle_response(request_header, response); + } + } else { + fprintf(stderr, + "[rclcpp::error] take response failed for client of service '%s': %s\n", + client->get_service_name().c_str(), rmw_get_error_string_safe()); + } + } + +/*** Populate class storage from stored weak node pointers and wait. ***/ + + template + void + wait_for_work(std::chrono::duration timeout = std::chrono::duration(-1)) + { + memory_strategy_->clear_active_entities(); + + // Collect the subscriptions and timers to be waited on + bool has_invalid_weak_nodes = memory_strategy_->collect_entities(weak_nodes_); + + // Clean up any invalid nodes, if they were detected + if (has_invalid_weak_nodes) { + weak_nodes_.erase( + remove_if(weak_nodes_.begin(), weak_nodes_.end(), + [](std::weak_ptr i) + { + return i.expired(); + })); + } + + // Use the number of subscriptions to allocate memory in the handles + rmw_subscriptions_t subscriber_handles; + subscriber_handles.subscriber_count = + memory_strategy_->fill_subscriber_handles(subscriber_handles.subscribers); + + rmw_services_t service_handles; + service_handles.service_count = + memory_strategy_->fill_service_handles(service_handles.services); + + rmw_clients_t client_handles; + client_handles.client_count = + memory_strategy_->fill_client_handles(client_handles.clients); + + // The number of guard conditions is fixed at 2: 1 for the ctrl-c guard cond, + // and one for the executor's guard cond (interrupt_guard_condition_) + size_t number_of_guard_conds = 2; + rmw_guard_conditions_t guard_condition_handles; + guard_condition_handles.guard_condition_count = number_of_guard_conds; + guard_condition_handles.guard_conditions = static_cast(guard_cond_handles_.data()); + if (guard_condition_handles.guard_conditions == NULL && + number_of_guard_conds > 0) + { + // TODO(wjwwood): Use a different error here? maybe std::bad_alloc? + throw std::runtime_error("Could not malloc for guard condition pointers."); + } + // Put the global ctrl-c guard condition in + assert(guard_condition_handles.guard_condition_count > 1); + guard_condition_handles.guard_conditions[0] = \ + rclcpp::utilities::get_global_sigint_guard_condition()->data; + // Put the executor's guard condition in + guard_condition_handles.guard_conditions[1] = \ + interrupt_guard_condition_->data; + + rmw_time_t * wait_timeout = NULL; + rmw_time_t rmw_timeout; + + auto next_timer_duration = get_earliest_timer(); + // If the next timer timeout must preempt the requested timeout + // or if the requested timeout blocks forever, and there exists a valid timer, + // replace the requested timeout with the next timeout. + bool has_valid_timer = next_timer_duration >= std::chrono::nanoseconds::zero(); + if ((next_timer_duration < timeout || + timeout < std::chrono::duration::zero()) && has_valid_timer) + { + rmw_timeout.sec = + std::chrono::duration_cast(next_timer_duration).count(); + rmw_timeout.nsec = next_timer_duration.count() % (1000 * 1000 * 1000); + wait_timeout = &rmw_timeout; + } else if (timeout >= std::chrono::duration::zero()) { + // Convert timeout representation to rmw_time + rmw_timeout.sec = std::chrono::duration_cast(timeout).count(); + rmw_timeout.nsec = std::chrono::duration_cast(timeout).count() % + (1000 * 1000 * 1000); + wait_timeout = &rmw_timeout; + } + + // Now wait on the waitable subscriptions and timers + rmw_ret_t status = rmw_wait( + &subscriber_handles, + &guard_condition_handles, + &service_handles, + &client_handles, + wait_timeout); + if (status != RMW_RET_OK && status != RMW_RET_TIMEOUT) { + throw std::runtime_error(rmw_get_error_string_safe()); + } + // If ctrl-c guard condition, return directly + if (guard_condition_handles.guard_conditions[0] != 0) { + // Make sure to free or clean memory + memory_strategy_->clear_handles(); + return; + } + + memory_strategy_->revalidate_handles(); + } + +/******************************/ + rclcpp::node::Node::SharedPtr + get_node_by_group(rclcpp::callback_group::CallbackGroup::SharedPtr group) + { + if (!group) { + return rclcpp::node::Node::SharedPtr(); + } + for (auto & weak_node : weak_nodes_) { + auto node = weak_node.lock(); + if (!node) { + continue; + } + for (auto & weak_group : node->get_callback_groups()) { + auto callback_group = weak_group.lock(); + if (callback_group == group) { + return node; + } + } + } + return rclcpp::node::Node::SharedPtr(); + } + + rclcpp::callback_group::CallbackGroup::SharedPtr + get_group_by_timer( + rclcpp::timer::TimerBase::SharedPtr timer) + { + for (auto & weak_node : weak_nodes_) { + auto node = weak_node.lock(); + if (!node) { + continue; + } + for (auto & weak_group : node->get_callback_groups()) { + auto group = weak_group.lock(); + if (!group) { + continue; + } + for (auto & weak_timer : group->get_timer_ptrs()) { + auto t = weak_timer.lock(); + if (t == timer) { + return group; + } + } + } + } + return rclcpp::callback_group::CallbackGroup::SharedPtr(); + } + + void + get_next_timer(AnyExecutable::SharedPtr any_exec) + { + for (auto & weak_node : weak_nodes_) { + auto node = weak_node.lock(); + if (!node) { + continue; + } + for (auto & weak_group : node->get_callback_groups()) { + auto group = weak_group.lock(); + if (!group || !group->can_be_taken_from().load()) { + continue; + } + for (auto & timer_ref : group->get_timer_ptrs()) { + auto timer = timer_ref.lock(); + if (timer && timer->check_and_trigger()) { + any_exec->timer = timer; + any_exec->callback_group = group; + node = get_node_by_group(group); + return; + } + } + } + } + } + + std::chrono::nanoseconds + get_earliest_timer() + { + std::chrono::nanoseconds latest = std::chrono::nanoseconds::max(); + bool timers_empty = true; + for (auto & weak_node : weak_nodes_) { + auto node = weak_node.lock(); + if (!node) { + continue; + } + for (auto & weak_group : node->get_callback_groups()) { + auto group = weak_group.lock(); + if (!group || !group->can_be_taken_from().load()) { + continue; + } + for (auto & timer_ref : group->get_timer_ptrs()) { + timers_empty = false; + // Check the expected trigger time + auto timer = timer_ref.lock(); + if (timer && timer->time_until_trigger() < latest) { + latest = timer->time_until_trigger(); + } + } + } + } + if (timers_empty) { + return std::chrono::nanoseconds(-1); + } + return latest; + } + + AnyExecutable::SharedPtr + get_next_ready_executable() + { + auto any_exec = memory_strategy_->instantiate_next_executable(); + // Check the timers to see if there are any that are ready, if so return + get_next_timer(any_exec); + if (any_exec->timer) { + return any_exec; + } + // Check the subscriptions to see if there are any that are ready + memory_strategy_->get_next_subscription(any_exec, weak_nodes_); + if (any_exec->subscription || any_exec->subscription_intra_process) { + return any_exec; + } + // Check the services to see if there are any that are ready + memory_strategy_->get_next_service(any_exec, weak_nodes_); + if (any_exec->service) { + return any_exec; + } + // Check the clients to see if there are any that are ready + memory_strategy_->get_next_client(any_exec, weak_nodes_); + if (any_exec->client) { + return any_exec; + } + // If there is no ready executable, return a null ptr + return nullptr; + } + + template + AnyExecutable::SharedPtr + get_next_executable(std::chrono::duration timeout = std::chrono::duration(-1)) + { + // Check to see if there are any subscriptions or timers needing service + // TODO(wjwwood): improve run to run efficiency of this function + auto any_exec = get_next_ready_executable(); + // If there are none + if (!any_exec) { + // Wait for subscriptions or timers to work on + wait_for_work(timeout); + // Try again + any_exec = get_next_ready_executable(); + } + // At this point any_exec should be valid with either a valid subscription + // or a valid timer, or it should be a null shared_ptr + if (any_exec) { + // If it is valid, check to see if the group is mutually exclusive or + // not, then mark it accordingly + if (any_exec->callback_group && any_exec->callback_group->type() == \ + callback_group::CallbackGroupType::MutuallyExclusive) + { + // It should not have been taken otherwise + assert(any_exec->callback_group->can_be_taken_from().load()); + // Set to false to indicate something is being run from this group + any_exec->callback_group->can_be_taken_from().store(false); + } + } + return any_exec; + } + + /// Guard condition for signaling the rmw layer to wake up for special events. + rmw_guard_condition_t * interrupt_guard_condition_; + + /// The memory strategy: an interface for handling user-defined memory allocation strategies. + memory_strategy::MemoryStrategy::SharedPtr memory_strategy_; + +private: + RCLCPP_DISABLE_COPY(Executor); + + std::vector> weak_nodes_; + std::array guard_cond_handles_; +}; + +} // namespace executor +} // namespace rclcpp + +#endif // RCLCPP__EXECUTOR_HPP_ diff --git a/rclcpp/src/rclcpp/executors.cpp b/rclcpp/src/rclcpp/executors.cpp new file mode 100644 index 0000000..045cc48 --- /dev/null +++ b/rclcpp/src/rclcpp/executors.cpp @@ -0,0 +1,87 @@ +// Copyright 2014 Open Source Robotics Foundation, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef RCLCPP_RCLCPP_EXECUTORS_HPP_ +#define RCLCPP_RCLCPP_EXECUTORS_HPP_ + +#include +#include +#include +#include +#include + +namespace rclcpp +{ +namespace executors +{ + +using rclcpp::executors::multi_threaded_executor::MultiThreadedExecutor; +using rclcpp::executors::single_threaded_executor::SingleThreadedExecutor; + +/// Return codes to be used with spin_until_future_complete. +/** + * SUCCESS: The future is complete and can be accessed with "get" without blocking. + * INTERRUPTED: The future is not complete, spinning was interrupted by Ctrl-C or another error. + * TIMEOUT: Spinning timed out. + */ +enum FutureReturnCode {SUCCESS, INTERRUPTED, TIMEOUT}; + +/// Spin (blocking) until the future is complete, until the function times out (if applicable), +/// or until rclcpp is interrupted. +/** + * \param[in] executor The executor which will spin the node. + * \param[in] node_ptr The node to spin. + * \param[in] future The future to wait on. If SUCCESS, the future is safe to access after this function + * \param[in] timeout Optional timeout parameter, which gets passed to Executor::spin_node_once. + -1 is block forever, 0 is non-blocking. + If the time spent inside the blocking loop exceeds this timeout, return a TIMEOUT return code. + * \return The return code, one of SUCCESS, INTERRUPTED, or TIMEOUT. + */ +template +FutureReturnCode +spin_node_until_future_complete( + rclcpp::executor::Executor & executor, rclcpp::node::Node::SharedPtr node_ptr, + std::shared_future & future, + std::chrono::duration timeout = std::chrono::duration(-1)) +{ + // TODO(wjwwood): does not work recursively right, can't call spin_node_until_future_complete + // inside a callback executed by an executor. + + // Check the future before entering the while loop. + // If the future is already complete, don't try to spin. + std::future_status status = future.wait_for(std::chrono::seconds(0)); + + auto start_time = std::chrono::system_clock::now(); + + while (status != std::future_status::ready && rclcpp::utilities::ok()) { + executor.spin_node_once(node_ptr, timeout); + if (timeout.count() >= 0) { + if (start_time + timeout < std::chrono::system_clock::now()) { + return TIMEOUT; + } + } + status = future.wait_for(std::chrono::seconds(0)); + } + + // If the future completed, and we weren't interrupted by ctrl-C, return the response + if (status == std::future_status::ready) { + return FutureReturnCode::SUCCESS; + } + return FutureReturnCode::INTERRUPTED; +} + +} // namespace executors +} // namespace rclcpp + +#endif /* RCLCPP_RCLCPP_EXECUTORS_HPP_ */ diff --git a/rclcpp/src/rclcpp/executors/multi_threaded_executor.cpp b/rclcpp/src/rclcpp/executors/multi_threaded_executor.cpp new file mode 100644 index 0000000..82b1f84 --- /dev/null +++ b/rclcpp/src/rclcpp/executors/multi_threaded_executor.cpp @@ -0,0 +1,106 @@ +// Copyright 2014 Open Source Robotics Foundation, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef RCLCPP__EXECUTORS__MULTI_THREADED_EXECUTOR_HPP_ +#define RCLCPP__EXECUTORS__MULTI_THREADED_EXECUTOR_HPP_ + +#include + +#include +#include +#include +#include +#include + +#include "rclcpp/executor.hpp" +#include "rclcpp/macros.hpp" +#include "rclcpp/node.hpp" +#include "rclcpp/utilities.hpp" + +namespace rclcpp +{ +namespace executors +{ +namespace multi_threaded_executor +{ + +class MultiThreadedExecutor : public executor::Executor +{ +public: + RCLCPP_SMART_PTR_DEFINITIONS(MultiThreadedExecutor); + + MultiThreadedExecutor(memory_strategy::MemoryStrategy::SharedPtr ms = + memory_strategies::create_default_strategy()) + : executor::Executor(ms) + { + number_of_threads_ = std::thread::hardware_concurrency(); + if (number_of_threads_ == 0) { + number_of_threads_ = 1; + } + } + + virtual ~MultiThreadedExecutor() {} + + void + spin() + { + std::vector threads; + { + std::lock_guard wait_lock(wait_mutex_); + size_t thread_id_ = 1; // Use a _ suffix to avoid shadowing `rclcpp::thread_id` + for (size_t i = number_of_threads_; i > 0; --i) { + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + auto func = std::bind(&MultiThreadedExecutor::run, this, thread_id_++); + threads.emplace_back(func); + } + } + for (auto & thread : threads) { + thread.join(); + } + } + + size_t + get_number_of_threads() + { + return number_of_threads_; + } + +private: + void run(size_t this_thread_id) + { + rclcpp::thread_id = this_thread_id; + while (rclcpp::utilities::ok()) { + executor::AnyExecutable::SharedPtr any_exec; + { + std::lock_guard wait_lock(wait_mutex_); + if (!rclcpp::utilities::ok()) { + return; + } + any_exec = get_next_executable(); + } + execute_any_executable(any_exec); + } + } + + RCLCPP_DISABLE_COPY(MultiThreadedExecutor); + + std::mutex wait_mutex_; + size_t number_of_threads_; +}; + +} // namespace multi_threaded_executor +} // namespace executors +} // namespace rclcpp + +#endif // RCLCPP__EXECUTORS__MULTI_THREADED_EXECUTOR_HPP_ diff --git a/rclcpp/src/rclcpp/executors/single_threaded_executor.cpp b/rclcpp/src/rclcpp/executors/single_threaded_executor.cpp new file mode 100644 index 0000000..be87e19 --- /dev/null +++ b/rclcpp/src/rclcpp/executors/single_threaded_executor.cpp @@ -0,0 +1,73 @@ +// Copyright 2014 Open Source Robotics Foundation, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef RCLCPP__EXECUTORS__SINGLE_THREADED_EXECUTOR_HPP_ +#define RCLCPP__EXECUTORS__SINGLE_THREADED_EXECUTOR_HPP_ + +#include + +#include +#include +#include +#include + +#include "rclcpp/executor.hpp" +#include "rclcpp/macros.hpp" +#include "rclcpp/memory_strategies.hpp" +#include "rclcpp/node.hpp" +#include "rclcpp/utilities.hpp" +#include "rclcpp/rate.hpp" + +namespace rclcpp +{ +namespace executors +{ +namespace single_threaded_executor +{ + +/// Single-threaded executor implementation +// This is the default executor created by rclcpp::spin. +class SingleThreadedExecutor : public executor::Executor +{ +public: + RCLCPP_SMART_PTR_DEFINITIONS(SingleThreadedExecutor); + + /// Default constructor. See the default constructor for Executor. + SingleThreadedExecutor(memory_strategy::MemoryStrategy::SharedPtr ms = + memory_strategies::create_default_strategy()) + : executor::Executor(ms) {} + + /// Default destrcutor. + virtual ~SingleThreadedExecutor() {} + + /// Single-threaded implementation of spin. + // This function will block until work comes in, execute it, and keep blocking. + // It will only be interrupt by a CTRL-C (managed by the global signal handler). + void spin() + { + while (rclcpp::utilities::ok()) { + auto any_exec = get_next_executable(); + execute_any_executable(any_exec); + } + } + +private: + RCLCPP_DISABLE_COPY(SingleThreadedExecutor); +}; + +} // namespace single_threaded_executor +} // namespace executors +} // namespace rclcpp + +#endif // RCLCPP__EXECUTORS__SINGLE_THREADED_EXECUTOR_HPP_ diff --git a/rclcpp/src/rclcpp/intra_process_manager.cpp b/rclcpp/src/rclcpp/intra_process_manager.cpp new file mode 100644 index 0000000..d49d91c --- /dev/null +++ b/rclcpp/src/rclcpp/intra_process_manager.cpp @@ -0,0 +1,377 @@ +// Copyright 2015 Open Source Robotics Foundation, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef RCLCPP__INTRA_PROCESS_MANAGER_HPP_ +#define RCLCPP__INTRA_PROCESS_MANAGER_HPP_ + +#include + +#include +#include +#include +#include +#include +#include +#include +#include + +#include "rclcpp/allocator/allocator_deleter.hpp" +#include "rclcpp/intra_process_manager_state.hpp" +#include "rclcpp/mapped_ring_buffer.hpp" +#include "rclcpp/macros.hpp" +#include "rclcpp/publisher.hpp" +#include "rclcpp/subscription.hpp" + +namespace rclcpp +{ +namespace intra_process_manager +{ + +/// This class facilitates intra process communication between nodes. +/* This class is used in the creation of publishers and subscriptions. + * A singleton instance of this class is owned by a rclcpp::Context and a + * rclcpp::Node can use an associated Context to get an instance of this class. + * Nodes which do not have a common Context will not exchange intra process + * messages because they will not share access to an instance of this class. + * + * When a Node creates a publisher or subscription, it will register them + * with this class. + * The node will also hook into the publisher's publish call + * in order to do intra process related work. + * + * When a publisher is created, it advertises on the topic the user provided, + * as well as a "shadowing" topic of type rcl_interfaces/IntraProcessMessage. + * For instance, if the user specified the topic '/namespace/chatter', then the + * corresponding intra process topic might be '/namespace/chatter__intra'. + * The publisher is also allocated an id which is unique among all publishers + * and subscriptions in this process. + * Additionally, when registered with this class a ring buffer is created and + * owned by this class as a temporary place to hold messages destined for intra + * process subscriptions. + * + * When a subscription is created, it subscribes to the topic provided by the + * user as well as to the corresponding intra process topic. + * It is also gets a unique id from the singleton instance of this class which + * is unique among publishers and subscriptions. + * + * When the user publishes a message, the message is stored by calling + * store_intra_process_message on this class. + * The instance of that message is uniquely identified by a publisher id and a + * message sequence number. + * The publisher id, message sequence pair is unique with in the process. + * At that point a list of the id's of intra process subscriptions which have + * been registered with the singleton instance of this class are stored with + * the message instance so that delivery is only made to those subscriptions. + * Then an instance of rcl_interfaces/IntraProcessMessage is published to the + * intra process topic which is specific to the topic specified by the user. + * + * When an instance of rcl_interfaces/IntraProcessMessage is received by a + * subscription, then it is handled by calling take_intra_process_message + * on a singleton of this class. + * The subscription passes a publisher id, message sequence pair which + * uniquely identifies the message instance it was suppose to receive as well + * as the subscriptions unique id. + * If the message is still being held by this class and the subscription's id + * is in the list of intended subscriptions then the message is returned. + * If either of those predicates are not satisfied then the message is not + * returned and the subscription does not call the users callback. + * + * Since the publisher builds a list of destined subscriptions on publish, and + * other requests are ignored, this class knows how many times a message + * instance should be requested. + * The final time a message is requested, the ownership is passed out of this + * class and passed to the final subscription, effectively freeing space in + * this class's internal storage. + * + * Since a topic is being used to ferry notifications about new intra process + * messages between publishers and subscriptions, it is possible for that + * notification to be lost. + * It is also possible that a subscription which was available when publish was + * called will no longer exist once the notification gets posted. + * In both cases this might result in a message instance getting requested + * fewer times than expected. + * This is why the internal storage of this class is a ring buffer. + * That way if a message is orphaned it will eventually be dropped from storage + * when a new message instance is stored and will not result in a memory leak. + * + * However, since the storage system is finite, this also means that a message + * instance might get displaced by an incoming message instance before all + * interested parties have called take_intra_process_message. + * Because of this the size of the internal storage should be carefully + * considered. + * + * /TODO(wjwwood): update to include information about handling latching. + * /TODO(wjwwood): consider thread safety of the class. + * + * This class is neither CopyConstructable nor CopyAssignable. + */ +class IntraProcessManager +{ +private: + RCLCPP_DISABLE_COPY(IntraProcessManager); + +public: + RCLCPP_SMART_PTR_DEFINITIONS(IntraProcessManager); + + explicit IntraProcessManager( + IntraProcessManagerStateBase::SharedPtr state = create_default_state() + ) + : state_(state) + { + } + + /// Register a subscription with the manager, returns subscriptions unique id. + /* In addition to generating a unique intra process id for the subscription, + * this method also stores the topic name of the subscription. + * + * This method is normally called during the creation of a subscription, + * but after it creates the internal intra process rmw_subscription_t. + * + * This method will allocate memory. + * + * \param subscription the Subscription to register. + * \return an unsigned 64-bit integer which is the subscription's unique id. + */ + uint64_t + add_subscription(subscription::SubscriptionBase::SharedPtr subscription) + { + auto id = IntraProcessManager::get_next_unique_id(); + state_->add_subscription(id, subscription); + return id; + } + + /// Unregister a subscription using the subscription's unique id. + /* This method does not allocate memory. + * + * \param intra_process_subscription_id id of the subscription to remove. + */ + void + remove_subscription(uint64_t intra_process_subscription_id) + { + state_->remove_subscription(intra_process_subscription_id); + } + + /// Register a publisher with the manager, returns the publisher unique id. + /* In addition to generating and returning a unique id for the publisher, + * this method creates internal ring buffer storage for "in-flight" intra + * process messages which are stored when store_intra_process_message is + * called with this publisher's unique id. + * + * The buffer_size must be less than or equal to the max uint64_t value. + * If the buffer_size is 0 then a buffer size is calculated using the + * publisher's QoS settings. + * The default is to use the depth field of the publisher's QoS. + * TODO(wjwwood): Consider doing depth *= 1.2, round up, or similar. + * TODO(wjwwood): Consider what to do for keep all. + * + * This method is templated on the publisher's message type so that internal + * storage of the same type can be allocated. + * + * This method will allocate memory. + * + * \param publisher publisher to be registered with the manager. + * \param buffer_size if 0 (default) a size is calculated based on the QoS. + * \return an unsigned 64-bit integer which is the publisher's unique id. + */ + template + uint64_t + add_publisher(typename publisher::Publisher::SharedPtr publisher, + size_t buffer_size = 0) + { + auto id = IntraProcessManager::get_next_unique_id(); + size_t size = buffer_size > 0 ? buffer_size : publisher->get_queue_size(); + auto mrb = mapped_ring_buffer::MappedRingBuffer::MessageAlloc>::make_shared( + size, publisher->get_allocator()); + state_->add_publisher(id, publisher, mrb, size); + return id; + } + + /// Unregister a publisher using the publisher's unique id. + /* This method does not allocate memory. + * + * \param intra_process_publisher_id id of the publisher to remove. + */ + void + remove_publisher(uint64_t intra_process_publisher_id) + { + state_->remove_publisher(intra_process_publisher_id); + } + + /// Store a message in the manager, and return the message sequence number. + /* The given message is stored in internal storage using the given publisher + * id and the newly generated message sequence, which is also returned. + * The combination of publisher id and message sequence number can later + * be used with a subscription id to retrieve the message by calling + * take_intra_process_message. + * The number of times take_intra_process_message can be called with this + * unique pair of id's is determined by the number of subscriptions currently + * subscribed to the same topic and which share the same Context, i.e. once + * for each subscription which should receive the intra process message. + * + * The ownership of the incoming message is transfered to the internal + * storage in order to avoid copying the message data. + * Therefore, the message parameter will no longer contain the original + * message after calling this method. + * Instead it will either be a nullptr or it will contain the ownership of + * the message instance which was displaced. + * If the message parameter is not equal to nullptr after calling this method + * then a message was prematurely displaced, i.e. take_intra_process_message + * had not been called on it as many times as was expected. + * + * This method can throw an exception if the publisher id is not found or + * if the publisher shared_ptr given to add_publisher has gone out of scope. + * + * This method does allocate memory. + * + * \param intra_process_publisher_id the id of the publisher of this message. + * \param message the message that is being stored. + * \return the message sequence number. + */ + template, + typename Deleter = std::default_delete> + uint64_t + store_intra_process_message( + uint64_t intra_process_publisher_id, + std::unique_ptr & message) + { + using MRBMessageAlloc = typename std::allocator_traits::template rebind_alloc; + using TypedMRB = typename mapped_ring_buffer::MappedRingBuffer; + uint64_t message_seq = 0; + mapped_ring_buffer::MappedRingBufferBase::SharedPtr buffer = state_->get_publisher_info_for_id( + intra_process_publisher_id, message_seq); + typename TypedMRB::SharedPtr typed_buffer = std::static_pointer_cast(buffer); + if (!typed_buffer) { + throw std::runtime_error("Typecast failed due to incorrect message type"); + } + + // Insert the message into the ring buffer using the message_seq to identify it. + bool did_replace = typed_buffer->push_and_replace(message_seq, message); + // TODO(wjwwood): do something when a message was displaced. log debug? + (void)did_replace; // Avoid unused variable warning. + + state_->store_intra_process_message(intra_process_publisher_id, message_seq); + + // Return the message sequence which is sent to the subscription. + return message_seq; + } + + /// Take an intra process message. + /* The intra_process_publisher_id and message_sequence_number parameters + * uniquely identify a message instance, which should be taken. + * + * The requesting_subscriptions_intra_process_id parameter is used to make + * sure the requesting subscription was intended to receive this message + * instance. + * This check is made because it could happen that the requester + * comes up after the publish event, so it still receives the notification of + * a new intra process message, but it wasn't registered with the manager at + * the time of publishing, causing it to take when it wasn't intended. + * This should be avioded unless latching-like behavior is involved. + * + * The message parameter is used to store the taken message. + * On the last expected call to this method, the ownership is transfered out + * of internal storage and into the message parameter. + * On all previous calls a copy of the internally stored message is made and + * the ownership of the copy is transfered to the message parameter. + * TODO(wjwwood): update this documentation when latching is supported. + * + * The message parameter can be set to nullptr if: + * + * - The publisher id is not found. + * - The message sequence is not found for the given publisher id. + * - The requesting subscription's id is not in the list of intended takers. + * - The requesting subscription's id has been used before with this message. + * + * This method may allocate memory to copy the stored message. + * + * \param intra_process_publisher_id the id of the message's publisher. + * \param message_sequence_number the sequence number of the message. + * \param requesting_subscriptions_intra_process_id the subscription's id. + * \param message the message typed unique_ptr used to return the message. + */ + template, + typename Deleter = std::default_delete> + void + take_intra_process_message( + uint64_t intra_process_publisher_id, + uint64_t message_sequence_number, + uint64_t requesting_subscriptions_intra_process_id, + std::unique_ptr & message) + { + using MRBMessageAlloc = typename std::allocator_traits::template rebind_alloc; + using TypedMRB = mapped_ring_buffer::MappedRingBuffer; + message = nullptr; + + size_t target_subs_size = 0; + mapped_ring_buffer::MappedRingBufferBase::SharedPtr buffer = state_->take_intra_process_message( + intra_process_publisher_id, + message_sequence_number, + requesting_subscriptions_intra_process_id, + target_subs_size + ); + typename TypedMRB::SharedPtr typed_buffer = std::static_pointer_cast(buffer); + if (!typed_buffer) { + return; + } + // Return a copy or the unique_ptr (ownership) depending on how many subscriptions are left. + if (target_subs_size) { + // There are more subscriptions to serve, return a copy. + typed_buffer->get_copy_at_key(message_sequence_number, message); + } else { + // This is the last one to be returned, transfer ownership. + typed_buffer->pop_at_key(message_sequence_number, message); + } + } + + /// Return true if the given rmw_gid_t matches any stored Publishers. + bool + matches_any_publishers(const rmw_gid_t * id) const + { + return state_->matches_any_publishers(id); + } + +private: + static uint64_t get_next_unique_id() + { + auto next_id = next_unique_id_.fetch_add(1, std::memory_order_relaxed); + // Check for rollover (we started at 1). + if (0 == next_id) { + // This puts a technical limit on the number of times you can add a publisher or subscriber. + // But even if you could add (and remove) them at 1 kHz (very optimistic rate) + // it would still be a very long time before you could exhaust the pool of id's: + // 2^64 / 1000 times per sec / 60 sec / 60 min / 24 hours / 365 days = 584,942,417 years + // So around 585 million years. Even at 1 GHz, it would take 585 years. + // I think it's safe to avoid trying to handle overflow. + // If we roll over then it's most likely a bug. + // *INDENT-OFF* (prevent uncrustify from making unecessary indents here) + throw std::overflow_error( + "exhausted the unique id's for publishers and subscribers in this process " + "(congratulations your computer is either extremely fast or extremely old)"); + // *INDENT-ON* + } + return next_id; + } + + static std::atomic next_unique_id_; + + IntraProcessManagerStateBase::SharedPtr state_; +}; + +std::atomic IntraProcessManager::next_unique_id_ {1}; + +} // namespace intra_process_manager +} // namespace rclcpp + +#endif // RCLCPP__INTRA_PROCESS_MANAGER_HPP_ diff --git a/rclcpp/src/rclcpp/intra_process_manager_state.cpp b/rclcpp/src/rclcpp/intra_process_manager_state.cpp new file mode 100644 index 0000000..8a4bc5e --- /dev/null +++ b/rclcpp/src/rclcpp/intra_process_manager_state.cpp @@ -0,0 +1,270 @@ +// Copyright 2015 Open Source Robotics Foundation, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef RCLCPP__INTRA_PROCESS_MANAGER_STATE_HPP_ +#define RCLCPP__INTRA_PROCESS_MANAGER_STATE_HPP_ + +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace rclcpp +{ +namespace intra_process_manager +{ + +class IntraProcessManagerStateBase +{ +public: + RCLCPP_SMART_PTR_DEFINITIONS_NOT_COPYABLE(IntraProcessManagerStateBase); + + virtual void + add_subscription(uint64_t id, subscription::SubscriptionBase::SharedPtr subscription) = 0; + + virtual void + remove_subscription(uint64_t intra_process_subscription_id) = 0; + + virtual void add_publisher(uint64_t id, + publisher::PublisherBase::WeakPtr publisher, + mapped_ring_buffer::MappedRingBufferBase::SharedPtr mrb, + size_t size) = 0; + + virtual void + remove_publisher(uint64_t intra_process_publisher_id) = 0; + + virtual mapped_ring_buffer::MappedRingBufferBase::SharedPtr + get_publisher_info_for_id( + uint64_t intra_process_publisher_id, + uint64_t & message_seq) = 0; + + virtual void + store_intra_process_message(uint64_t intra_process_publisher_id, uint64_t message_seq) = 0; + + virtual mapped_ring_buffer::MappedRingBufferBase::SharedPtr + take_intra_process_message(uint64_t intra_process_publisher_id, + uint64_t message_sequence_number, + uint64_t requesting_subscriptions_intra_process_id, + size_t & size) = 0; + + virtual bool + matches_any_publishers(const rmw_gid_t * id) const = 0; +}; + +template> +class IntraProcessManagerState : public IntraProcessManagerStateBase +{ +public: + void + add_subscription(uint64_t id, subscription::SubscriptionBase::SharedPtr subscription) + { + subscriptions_[id] = subscription; + subscription_ids_by_topic_[subscription->get_topic_name()].insert(id); + } + + void + remove_subscription(uint64_t intra_process_subscription_id) + { + subscriptions_.erase(intra_process_subscription_id); + for (auto & pair : subscription_ids_by_topic_) { + pair.second.erase(intra_process_subscription_id); + } + // Iterate over all publisher infos and all stored subscription id's and + // remove references to this subscription's id. + for (auto & publisher_pair : publishers_) { + for (auto & sub_pair : publisher_pair.second.target_subscriptions_by_message_sequence) { + sub_pair.second.erase(intra_process_subscription_id); + } + } + } + + void add_publisher(uint64_t id, + publisher::PublisherBase::WeakPtr publisher, + mapped_ring_buffer::MappedRingBufferBase::SharedPtr mrb, + size_t size) + { + publishers_[id].publisher = publisher; + // As long as the size of the ring buffer is less than the max sequence number, we're safe. + if (size > std::numeric_limits::max()) { + throw std::invalid_argument("the calculated buffer size is too large"); + } + publishers_[id].sequence_number.store(0); + + publishers_[id].buffer = mrb; + publishers_[id].target_subscriptions_by_message_sequence.reserve(size); + } + + void + remove_publisher(uint64_t intra_process_publisher_id) + { + publishers_.erase(intra_process_publisher_id); + } + + // return message_seq and mrb + mapped_ring_buffer::MappedRingBufferBase::SharedPtr + get_publisher_info_for_id( + uint64_t intra_process_publisher_id, + uint64_t & message_seq) + { + auto it = publishers_.find(intra_process_publisher_id); + if (it == publishers_.end()) { + throw std::runtime_error("store_intra_process_message called with invalid publisher id"); + } + PublisherInfo & info = it->second; + // Calculate the next message sequence number. + message_seq = info.sequence_number.fetch_add(1, std::memory_order_relaxed); + + return info.buffer; + } + + void + store_intra_process_message(uint64_t intra_process_publisher_id, uint64_t message_seq) + { + auto it = publishers_.find(intra_process_publisher_id); + if (it == publishers_.end()) { + throw std::runtime_error("store_intra_process_message called with invalid publisher id"); + } + PublisherInfo & info = it->second; + auto publisher = info.publisher.lock(); + if (!publisher) { + throw std::runtime_error("publisher has unexpectedly gone out of scope"); + } + + // Figure out what subscriptions should receive the message. + auto & destined_subscriptions = subscription_ids_by_topic_[publisher->get_topic_name()]; + // Store the list for later comparison. + info.target_subscriptions_by_message_sequence[message_seq].clear(); + std::copy( + destined_subscriptions.begin(), destined_subscriptions.end(), + // Memory allocation occurs in info.target_subscriptions_by_message_sequence[message_seq] + std::inserter( + info.target_subscriptions_by_message_sequence[message_seq], + // This ends up only being a hint to std::set, could also be .begin(). + info.target_subscriptions_by_message_sequence[message_seq].end() + ) + ); + } + + mapped_ring_buffer::MappedRingBufferBase::SharedPtr + take_intra_process_message(uint64_t intra_process_publisher_id, + uint64_t message_sequence_number, + uint64_t requesting_subscriptions_intra_process_id, + size_t & size + ) + { + PublisherInfo * info; + { + auto it = publishers_.find(intra_process_publisher_id); + if (it == publishers_.end()) { + // Publisher is either invalid or no longer exists. + return 0; + } + info = &it->second; + } + // Figure out how many subscriptions are left. + AllocSet * target_subs; + { + auto it = info->target_subscriptions_by_message_sequence.find(message_sequence_number); + if (it == info->target_subscriptions_by_message_sequence.end()) { + // Message is no longer being stored by this publisher. + return 0; + } + target_subs = &it->second; + } + { + auto it = std::find( + target_subs->begin(), target_subs->end(), + requesting_subscriptions_intra_process_id); + if (it == target_subs->end()) { + // This publisher id/message seq pair was not intended for this subscription. + return 0; + } + target_subs->erase(it); + } + size = target_subs->size(); + return info->buffer; + } + + bool + matches_any_publishers(const rmw_gid_t * id) const + { + for (auto & publisher_pair : publishers_) { + auto publisher = publisher_pair.second.publisher.lock(); + if (!publisher) { + continue; + } + if (*publisher.get() == id) { + return true; + } + } + return false; + } + +private: + template + using RebindAlloc = typename std::allocator_traits::template rebind_alloc; + + using AllocSet = std::set, RebindAlloc>; + using SubscriptionMap = std::unordered_map, std::equal_to, + RebindAlloc>>; + using IDTopicMap = std::map, RebindAlloc>>; + + SubscriptionMap subscriptions_; + + IDTopicMap subscription_ids_by_topic_; + + struct PublisherInfo + { + RCLCPP_DISABLE_COPY(PublisherInfo); + + PublisherInfo() = default; + + publisher::PublisherBase::WeakPtr publisher; + std::atomic sequence_number; + mapped_ring_buffer::MappedRingBufferBase::SharedPtr buffer; + + using TargetSubscriptionsMap = std::unordered_map, std::equal_to, + RebindAlloc>>; + TargetSubscriptionsMap target_subscriptions_by_message_sequence; + }; + + using PublisherMap = std::unordered_map, std::equal_to, + RebindAlloc>>; + + PublisherMap publishers_; +}; + +static IntraProcessManagerStateBase::SharedPtr create_default_state() +{ + return std::make_shared>(); +} + +} // namespace intra_process_manager +} // namespace rclcpp + +#endif // RCLCPP__INTRA_PROCESS_MANAGER_STATE_HPP_ diff --git a/rclcpp/src/rclcpp/memory_strategies.cpp b/rclcpp/src/rclcpp/memory_strategies.cpp new file mode 100644 index 0000000..f5a1a2a --- /dev/null +++ b/rclcpp/src/rclcpp/memory_strategies.cpp @@ -0,0 +1,36 @@ +// Copyright 2015 Open Source Robotics Foundation, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef RCLCPP__MEMORY_STRATEGIES_HPP_ +#define RCLCPP__MEMORY_STRATEGIES_HPP_ + +#include +#include + +namespace rclcpp +{ +namespace memory_strategies +{ + +using rclcpp::memory_strategies::allocator_memory_strategy::AllocatorMemoryStrategy; + +static memory_strategy::MemoryStrategy::SharedPtr create_default_strategy() +{ + return std::make_shared>(); +} + +} // namespace memory_strategies +} // namespace rclcpp + +#endif // RCLCPP__MEMORY_STRATEGIES_HPP_ diff --git a/rclcpp/src/rclcpp/memory_strategy.cpp b/rclcpp/src/rclcpp/memory_strategy.cpp new file mode 100644 index 0000000..895ea9d --- /dev/null +++ b/rclcpp/src/rclcpp/memory_strategy.cpp @@ -0,0 +1,255 @@ +// Copyright 2015 Open Source Robotics Foundation, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef RCLCPP__MEMORY_STRATEGY_HPP_ +#define RCLCPP__MEMORY_STRATEGY_HPP_ + +#include +#include + +#include "rclcpp/any_executable.hpp" +#include "rclcpp/macros.hpp" +#include "rclcpp/node.hpp" + +namespace rclcpp +{ +namespace memory_strategy +{ + +/// Delegate for handling memory allocations while the Executor is executing. +/** + * By default, the memory strategy dynamically allocates memory for structures that come in from + * the rmw implementation after the executor waits for work, based on the number of entities that + * come through. + */ +class MemoryStrategy +{ +public: + RCLCPP_SMART_PTR_DEFINITIONS_NOT_COPYABLE(MemoryStrategy); + using WeakNodeVector = std::vector>; + + // return the new number of subscribers + virtual size_t fill_subscriber_handles(void ** & ptr) = 0; + + // return the new number of services + virtual size_t fill_service_handles(void ** & ptr) = 0; + + // return the new number of clients + virtual size_t fill_client_handles(void ** & ptr) = 0; + + virtual void clear_active_entities() = 0; + + virtual void clear_handles() = 0; + virtual void revalidate_handles() = 0; + virtual bool collect_entities(const WeakNodeVector & weak_nodes) = 0; + + /// Provide a newly initialized AnyExecutable object. + // \return Shared pointer to the fresh executable. + virtual executor::AnyExecutable::SharedPtr instantiate_next_executable() = 0; + + virtual void + get_next_subscription(executor::AnyExecutable::SharedPtr any_exec, + const WeakNodeVector & weak_nodes) = 0; + + virtual void + get_next_service(executor::AnyExecutable::SharedPtr any_exec, + const WeakNodeVector & weak_nodes) = 0; + + virtual void + get_next_client(executor::AnyExecutable::SharedPtr any_exec, + const WeakNodeVector & weak_nodes) = 0; + + static rclcpp::subscription::SubscriptionBase::SharedPtr + get_subscription_by_handle(void * subscriber_handle, + const WeakNodeVector & weak_nodes) + { + for (auto & weak_node : weak_nodes) { + auto node = weak_node.lock(); + if (!node) { + continue; + } + for (auto & weak_group : node->get_callback_groups()) { + auto group = weak_group.lock(); + if (!group) { + continue; + } + for (auto & weak_subscription : group->get_subscription_ptrs()) { + auto subscription = weak_subscription.lock(); + if (subscription) { + if (subscription->get_subscription_handle()->data == subscriber_handle) { + return subscription; + } + if (subscription->get_intra_process_subscription_handle() && + subscription->get_intra_process_subscription_handle()->data == subscriber_handle) + { + return subscription; + } + } + } + } + } + return nullptr; + } + + static rclcpp::service::ServiceBase::SharedPtr + get_service_by_handle(void * service_handle, const WeakNodeVector & weak_nodes) + { + for (auto & weak_node : weak_nodes) { + auto node = weak_node.lock(); + if (!node) { + continue; + } + for (auto & weak_group : node->get_callback_groups()) { + auto group = weak_group.lock(); + if (!group) { + continue; + } + for (auto & service : group->get_service_ptrs()) { + if (service->get_service_handle()->data == service_handle) { + return service; + } + } + } + } + return nullptr; + } + + static rclcpp::client::ClientBase::SharedPtr + get_client_by_handle(void * client_handle, const WeakNodeVector & weak_nodes) + { + for (auto & weak_node : weak_nodes) { + auto node = weak_node.lock(); + if (!node) { + continue; + } + for (auto & weak_group : node->get_callback_groups()) { + auto group = weak_group.lock(); + if (!group) { + continue; + } + for (auto & client : group->get_client_ptrs()) { + if (client->get_client_handle()->data == client_handle) { + return client; + } + } + } + } + return nullptr; + } + + static rclcpp::node::Node::SharedPtr + get_node_by_group(rclcpp::callback_group::CallbackGroup::SharedPtr group, + const WeakNodeVector & weak_nodes) + { + if (!group) { + return nullptr; + } + for (auto & weak_node : weak_nodes) { + auto node = weak_node.lock(); + if (!node) { + continue; + } + for (auto & weak_group : node->get_callback_groups()) { + auto callback_group = weak_group.lock(); + if (callback_group == group) { + return node; + } + } + } + return nullptr; + } + + static rclcpp::callback_group::CallbackGroup::SharedPtr + get_group_by_subscription( + rclcpp::subscription::SubscriptionBase::SharedPtr subscription, + const WeakNodeVector & weak_nodes) + { + for (auto & weak_node : weak_nodes) { + auto node = weak_node.lock(); + if (!node) { + continue; + } + for (auto & weak_group : node->get_callback_groups()) { + auto group = weak_group.lock(); + if (!group) { + continue; + } + for (auto & weak_sub : group->get_subscription_ptrs()) { + auto sub = weak_sub.lock(); + if (sub == subscription) { + return group; + } + } + } + } + return nullptr; + } + + static rclcpp::callback_group::CallbackGroup::SharedPtr + get_group_by_service( + rclcpp::service::ServiceBase::SharedPtr service, + const WeakNodeVector & weak_nodes) + { + for (auto & weak_node : weak_nodes) { + auto node = weak_node.lock(); + if (!node) { + continue; + } + for (auto & weak_group : node->get_callback_groups()) { + auto group = weak_group.lock(); + if (!group) { + continue; + } + for (auto & serv : group->get_service_ptrs()) { + if (serv == service) { + return group; + } + } + } + } + return nullptr; + } + + static rclcpp::callback_group::CallbackGroup::SharedPtr + get_group_by_client(rclcpp::client::ClientBase::SharedPtr client, + const WeakNodeVector & weak_nodes) + { + for (auto & weak_node : weak_nodes) { + auto node = weak_node.lock(); + if (!node) { + continue; + } + for (auto & weak_group : node->get_callback_groups()) { + auto group = weak_group.lock(); + if (!group) { + continue; + } + for (auto & cli : group->get_client_ptrs()) { + if (cli == client) { + return group; + } + } + } + } + return nullptr; + } + + +}; + +} // namespace memory_strategy + +} // namespace rclcpp + +#endif // RCLCPP__MEMORY_STRATEGY_HPP_ diff --git a/rclcpp/include/rclcpp/node_impl.hpp b/rclcpp/src/rclcpp/node.cpp similarity index 100% rename from rclcpp/include/rclcpp/node_impl.hpp rename to rclcpp/src/rclcpp/node.cpp diff --git a/rclcpp/src/rclcpp/parameter.cpp b/rclcpp/src/rclcpp/parameter.cpp new file mode 100644 index 0000000..f10be34 --- /dev/null +++ b/rclcpp/src/rclcpp/parameter.cpp @@ -0,0 +1,337 @@ +// Copyright 2015 Open Source Robotics Foundation, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef RCLCPP_RCLCPP_PARAMETER_HPP_ +#define RCLCPP_RCLCPP_PARAMETER_HPP_ + +#include +#include +#include + +#include + +#include +#include +#include + +namespace rclcpp +{ + +namespace parameter +{ + +enum ParameterType +{ + PARAMETER_NOT_SET=rcl_interfaces::msg::ParameterType::PARAMETER_NOT_SET, + PARAMETER_BOOL=rcl_interfaces::msg::ParameterType::PARAMETER_BOOL, + PARAMETER_INTEGER=rcl_interfaces::msg::ParameterType::PARAMETER_INTEGER, + PARAMETER_DOUBLE=rcl_interfaces::msg::ParameterType::PARAMETER_DOUBLE, + PARAMETER_STRING=rcl_interfaces::msg::ParameterType::PARAMETER_STRING, + PARAMETER_BYTES=rcl_interfaces::msg::ParameterType::PARAMETER_BYTES, +}; + +// Structure to store an arbitrary parameter with templated get/set methods +class ParameterVariant +{ +public: + ParameterVariant() + : name_("") + { + value_.type = rcl_interfaces::msg::ParameterType::PARAMETER_NOT_SET; + } + explicit ParameterVariant(const std::string & name, const bool bool_value) + : name_(name) + { + value_.bool_value = bool_value; + value_.type = rcl_interfaces::msg::ParameterType::PARAMETER_BOOL; + } + explicit ParameterVariant(const std::string & name, const int int_value) + : name_(name) + { + value_.integer_value = int_value; + value_.type = rcl_interfaces::msg::ParameterType::PARAMETER_INTEGER; + } + explicit ParameterVariant(const std::string & name, const int64_t int_value) + : name_(name) + { + value_.integer_value = int_value; + value_.type = rcl_interfaces::msg::ParameterType::PARAMETER_INTEGER; + } + explicit ParameterVariant(const std::string & name, const float double_value) + : name_(name) + { + value_.double_value = double_value; + value_.type = rcl_interfaces::msg::ParameterType::PARAMETER_DOUBLE; + } + explicit ParameterVariant(const std::string & name, const double double_value) + : name_(name) + { + value_.double_value = double_value; + value_.type = rcl_interfaces::msg::ParameterType::PARAMETER_DOUBLE; + } + explicit ParameterVariant(const std::string & name, const std::string & string_value) + : name_(name) + { + value_.string_value = string_value; + value_.type = rcl_interfaces::msg::ParameterType::PARAMETER_STRING; + } + explicit ParameterVariant(const std::string & name, const char * string_value) + : ParameterVariant(name, std::string(string_value)) {} + explicit ParameterVariant(const std::string & name, const std::vector & bytes_value) + : name_(name) + { + value_.bytes_value = bytes_value; + value_.type = rcl_interfaces::msg::ParameterType::PARAMETER_BYTES; + } + + inline ParameterType get_type() const {return static_cast(value_.type); } + + inline std::string get_type_name() const + { + switch (get_type()) { + case rclcpp::parameter::ParameterType::PARAMETER_BOOL: + return "bool"; + case rclcpp::parameter::ParameterType::PARAMETER_INTEGER: + return "integer"; + case rclcpp::parameter::ParameterType::PARAMETER_DOUBLE: + return "double"; + case rclcpp::parameter::ParameterType::PARAMETER_STRING: + return "string"; + case rclcpp::parameter::ParameterType::PARAMETER_BYTES: + return "bytes"; + case rclcpp::parameter::ParameterType::PARAMETER_NOT_SET: + return "not set"; + default: + // *INDENT-OFF* (prevent uncrustify from making unecessary indents here) + throw std::runtime_error( + "Unexpected type from ParameterVariant: " + std::to_string(get_type())); + // *INDENT-ON* + } + } + + inline std::string get_name() const & {return name_; } + + inline rcl_interfaces::msg::ParameterValue get_parameter_value() const + { + return value_; + } + + template + typename std::enable_if::type + get_value() const + { + if (value_.type != rcl_interfaces::msg::ParameterType::PARAMETER_INTEGER) { + // TODO: use custom exception + throw std::runtime_error("Invalid type"); + } + return value_.integer_value; + } + template + typename std::enable_if::type + get_value() const + { + if (value_.type != rcl_interfaces::msg::ParameterType::PARAMETER_DOUBLE) { + // TODO: use custom exception + throw std::runtime_error("Invalid type"); + } + return value_.double_value; + } + template + typename std::enable_if::type + get_value() const + { + if (value_.type != rcl_interfaces::msg::ParameterType::PARAMETER_STRING) { + // TODO: use custom exception + throw std::runtime_error("Invalid type"); + } + return value_.string_value; + } + template + typename std::enable_if::type + get_value() const + { + if (value_.type != rcl_interfaces::msg::ParameterType::PARAMETER_BOOL) { + // TODO: use custom exception + throw std::runtime_error("Invalid type"); + } + return value_.bool_value; + } + template + typename std::enable_if &>::type + get_value() const + { + if (value_.type != rcl_interfaces::msg::ParameterType::PARAMETER_BYTES) { + // TODO: use custom exception + throw std::runtime_error("Invalid type"); + } + return value_.bytes_value; + } + + int64_t as_int() const {return get_value(); } + + double as_double() const {return get_value(); } + + const std::string & as_string() const {return get_value(); } + + bool as_bool() const {return get_value(); } + + const std::vector & as_bytes() const + { + return get_value(); + } + + static ParameterVariant from_parameter(const rcl_interfaces::msg::Parameter & parameter) + { + switch (parameter.value.type) { + case PARAMETER_BOOL: + return ParameterVariant(parameter.name, parameter.value.bool_value); + case PARAMETER_INTEGER: + return ParameterVariant(parameter.name, parameter.value.integer_value); + case PARAMETER_DOUBLE: + return ParameterVariant(parameter.name, parameter.value.double_value); + case PARAMETER_STRING: + return ParameterVariant(parameter.name, parameter.value.string_value); + case PARAMETER_BYTES: + return ParameterVariant(parameter.name, parameter.value.bytes_value); + case PARAMETER_NOT_SET: + throw std::runtime_error("Type from ParameterValue is not set"); + default: + // TODO(wjwwood): use custom exception + // *INDENT-OFF* (prevent uncrustify from making unecessary indents here) + throw std::runtime_error( + "Unexpected type from ParameterVariant: " + std::to_string(parameter.value.type)); + // *INDENT-ON* + } + } + + rcl_interfaces::msg::Parameter to_parameter() + { + rcl_interfaces::msg::Parameter parameter; + parameter.name = name_; + parameter.value = value_; + return parameter; + } + + std::string value_to_string() const + { + switch (get_type()) { + case rclcpp::parameter::ParameterType::PARAMETER_BOOL: + return as_bool() ? "true" : "false"; + case rclcpp::parameter::ParameterType::PARAMETER_INTEGER: + return std::to_string(as_int()); + case rclcpp::parameter::ParameterType::PARAMETER_DOUBLE: + return std::to_string(as_double()); + case rclcpp::parameter::ParameterType::PARAMETER_STRING: + return as_string(); + case rclcpp::parameter::ParameterType::PARAMETER_BYTES: + { + std::stringstream bytes; + bool first_byte = true; + bytes << "[" << std::hex; + for (auto & byte : as_bytes()) { + bytes << "0x" << byte; + if (!first_byte) { + bytes << ", "; + } else { + first_byte = false; + } + } + return bytes.str(); + } + case rclcpp::parameter::ParameterType::PARAMETER_NOT_SET: + return "not set"; + default: + // *INDENT-OFF* (prevent uncrustify from making unecessary indents here) + throw std::runtime_error( + "Unexpected type from ParameterVariant: " + std::to_string(get_type())); + // *INDENT-ON* + } + } + +private: + std::string name_; + rcl_interfaces::msg::ParameterValue value_; +}; + + +/* Return a json encoded version of the parameter intended for a dict. */ +std::string _to_json_dict_entry(const ParameterVariant & param) +{ + std::stringstream ss; + ss << "\"" << param.get_name() << "\": "; + ss << "{\"type\": \"" << param.get_type_name() << "\", "; + ss << "\"value\": \"" << param.value_to_string() << "\"}"; + return ss.str(); +} + + +} /* namespace parameter */ + +} /* namespace rclcpp */ + +namespace std +{ +/* Return a json encoded version of the parameter intended for a list. */ +inline std::string to_string(const rclcpp::parameter::ParameterVariant & param) +{ + std::stringstream ss; + ss << "{\"name\": \"" << param.get_name() << "\", "; + ss << "\"type\": \"" << param.get_type_name() << "\", "; + ss << "\"value\": \"" << param.value_to_string() << "\"}"; + return ss.str(); +} + +/* Return a json encoded version of a vector of parameters, as a string*/ +inline std::string to_string(const std::vector & parameters) +{ + std::stringstream ss; + ss << "{"; + bool first = true; + for (const auto & pv : parameters) { + if (first == false) { + ss << ", "; + } else { + first = false; + } + ss << rclcpp::parameter::_to_json_dict_entry(pv); + } + ss << "}"; + return ss.str(); +} + +} /* namespace std */ + +namespace rclcpp +{ +namespace parameter +{ + +std::ostream & operator<<(std::ostream & os, const rclcpp::parameter::ParameterVariant & pv) +{ + os << std::to_string(pv); + return os; +} + +std::ostream & operator<<(std::ostream & os, const std::vector & parameters) +{ + os << std::to_string(parameters); + return os; +} + +} /* namespace parameter */ + +} /* namespace rclcpp */ + +#endif /* RCLCPP_RCLCPP_PARAMETER_HPP_ */ diff --git a/rclcpp/src/rclcpp/parameter_client.cpp b/rclcpp/src/rclcpp/parameter_client.cpp new file mode 100644 index 0000000..66314bd --- /dev/null +++ b/rclcpp/src/rclcpp/parameter_client.cpp @@ -0,0 +1,359 @@ +// Copyright 2015 Open Source Robotics Foundation, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef RCLCPP_RCLCPP_PARAMETER_CLIENT_HPP_ +#define RCLCPP_RCLCPP_PARAMETER_CLIENT_HPP_ + +#include + +#include + +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace rclcpp +{ + +namespace parameter_client +{ + +class AsyncParametersClient +{ + +public: + RCLCPP_SMART_PTR_DEFINITIONS(AsyncParametersClient); + + AsyncParametersClient(const rclcpp::node::Node::SharedPtr node, + const std::string & remote_node_name = "") + : node_(node) + { + if (remote_node_name != "") { + remote_node_name_ = remote_node_name; + } else { + remote_node_name_ = node_->get_name(); + } + get_parameters_client_ = node_->create_client( + remote_node_name_ + "__get_parameters"); + get_parameter_types_client_ = node_->create_client( + remote_node_name_ + "__get_parameter_types"); + set_parameters_client_ = node_->create_client( + remote_node_name_ + "__set_parameters"); + list_parameters_client_ = node_->create_client( + remote_node_name_ + "__list_parameters"); + describe_parameters_client_ = node_->create_client( + remote_node_name_ + "__describe_parameters"); + } + + std::shared_future> + get_parameters( + const std::vector & names, + std::function>)> callback = nullptr) + { + auto promise_result = + std::make_shared>>(); + auto future_result = promise_result->get_future().share(); + + auto request = std::make_shared(); + request->names = names; + + get_parameters_client_->async_send_request( + request, + [request, promise_result, future_result, &callback]( + rclcpp::client::Client::SharedFuture cb_f) { + std::vector parameter_variants; + auto & pvalues = cb_f.get()->values; + + for (auto & pvalue : pvalues) { + auto i = &pvalue - &pvalues[0]; + rcl_interfaces::msg::Parameter parameter; + parameter.name = request->names[i]; + parameter.value = pvalue; + parameter_variants.push_back(rclcpp::parameter::ParameterVariant::from_parameter( + parameter)); + } + + promise_result->set_value(parameter_variants); + if (callback != nullptr) { + callback(future_result); + } + } + ); + + return future_result; + } + + std::shared_future> + get_parameter_types( + const std::vector & names, + std::function>)> callback = nullptr) + { + auto promise_result = + std::make_shared>>(); + auto future_result = promise_result->get_future().share(); + + auto request = std::make_shared(); + request->names = names; + + get_parameter_types_client_->async_send_request( + request, + [promise_result, future_result, &callback]( + rclcpp::client::Client::SharedFuture cb_f) { + std::vector types; + auto & pts = cb_f.get()->types; + for (auto & pt : pts) { + pts.push_back(static_cast(pt)); + } + promise_result->set_value(types); + if (callback != nullptr) { + callback(future_result); + } + } + ); + + return future_result; + } + + std::shared_future> + set_parameters( + const std::vector & parameters, + std::function>)> callback = + nullptr) + { + auto promise_result = + std::make_shared>>(); + auto future_result = promise_result->get_future().share(); + + auto request = std::make_shared(); + + std::transform(parameters.begin(), parameters.end(), std::back_inserter( + request->parameters), []( + rclcpp::parameter::ParameterVariant p) {return p.to_parameter(); }); + + set_parameters_client_->async_send_request( + request, + [promise_result, future_result, &callback]( + rclcpp::client::Client::SharedFuture cb_f) { + promise_result->set_value(cb_f.get()->results); + if (callback != nullptr) { + callback(future_result); + } + } + ); + + return future_result; + } + + std::shared_future + set_parameters_atomically( + const std::vector & parameters, + std::function)> callback = + nullptr) + { + auto promise_result = + std::make_shared>(); + auto future_result = promise_result->get_future().share(); + + auto request = std::make_shared(); + + std::transform(parameters.begin(), parameters.end(), std::back_inserter( + request->parameters), []( + rclcpp::parameter::ParameterVariant p) {return p.to_parameter(); }); + + set_parameters_atomically_client_->async_send_request( + request, + [promise_result, future_result, &callback]( + rclcpp::client::Client::SharedFuture cb_f) { + promise_result->set_value(cb_f.get()->result); + if (callback != nullptr) { + callback(future_result); + } + } + ); + + return future_result; + } + + std::shared_future + list_parameters( + const std::vector & prefixes, + uint64_t depth, + std::function)> callback = + nullptr) + { + auto promise_result = + std::make_shared>(); + auto future_result = promise_result->get_future().share(); + + auto request = std::make_shared(); + request->prefixes = prefixes; + request->depth = depth; + + list_parameters_client_->async_send_request( + request, + [promise_result, future_result, &callback]( + rclcpp::client::Client::SharedFuture cb_f) { + promise_result->set_value(cb_f.get()->result); + if (callback != nullptr) { + callback(future_result); + } + } + ); + + return future_result; + } + + template + typename rclcpp::subscription::Subscription::SharedPtr + on_parameter_event(FunctorT callback) + { + return node_->create_subscription( + "parameter_events", callback, rmw_qos_profile_parameter_events); + } + +private: + const rclcpp::node::Node::SharedPtr node_; + rclcpp::client::Client::SharedPtr get_parameters_client_; + rclcpp::client::Client::SharedPtr + get_parameter_types_client_; + rclcpp::client::Client::SharedPtr set_parameters_client_; + rclcpp::client::Client::SharedPtr + set_parameters_atomically_client_; + rclcpp::client::Client::SharedPtr list_parameters_client_; + rclcpp::client::Client::SharedPtr + describe_parameters_client_; + std::string remote_node_name_; +}; + +class SyncParametersClient +{ +public: + RCLCPP_SMART_PTR_DEFINITIONS(SyncParametersClient); + + SyncParametersClient( + rclcpp::node::Node::SharedPtr node) + : node_(node) + { + executor_ = std::make_shared(); + async_parameters_client_ = std::make_shared(node); + } + + SyncParametersClient( + rclcpp::executor::Executor::SharedPtr executor, + rclcpp::node::Node::SharedPtr node) + : executor_(executor), node_(node) + { + async_parameters_client_ = std::make_shared(node); + } + + std::vector + get_parameters(const std::vector & parameter_names) + { + auto f = async_parameters_client_->get_parameters(parameter_names); + if (rclcpp::executors::spin_node_until_future_complete(*executor_, node_, f) == + rclcpp::executors::FutureReturnCode::SUCCESS) + { + return f.get(); + } + // Return an empty vector if unsuccessful + return std::vector(); + } + + std::vector + get_parameter_types(const std::vector & parameter_names) + { + auto f = async_parameters_client_->get_parameter_types(parameter_names); + + if (rclcpp::executors::spin_node_until_future_complete(*executor_, node_, f) == + rclcpp::executors::FutureReturnCode::SUCCESS) + { + return f.get(); + } + return std::vector(); + } + + std::vector + set_parameters(const std::vector & parameters) + { + auto f = async_parameters_client_->set_parameters(parameters); + + if (rclcpp::executors::spin_node_until_future_complete(*executor_, node_, f) == + rclcpp::executors::FutureReturnCode::SUCCESS) + { + return f.get(); + } + return std::vector(); + } + + rcl_interfaces::msg::SetParametersResult + set_parameters_atomically(const std::vector & parameters) + { + auto f = async_parameters_client_->set_parameters_atomically(parameters); + + if (rclcpp::executors::spin_node_until_future_complete(*executor_, node_, f) == + rclcpp::executors::FutureReturnCode::SUCCESS) + { + return f.get(); + } + + throw std::runtime_error("Unable to get result of set parameters service call."); + } + + rcl_interfaces::msg::ListParametersResult + list_parameters( + const std::vector & parameter_prefixes, + uint64_t depth) + { + auto f = async_parameters_client_->list_parameters(parameter_prefixes, depth); + + if (rclcpp::executors::spin_node_until_future_complete(*executor_, node_, f) == + rclcpp::executors::FutureReturnCode::SUCCESS) + { + return f.get(); + } + + throw std::runtime_error("Unable to get result of list parameters service call."); + } + + template + typename rclcpp::subscription::Subscription::SharedPtr + on_parameter_event(FunctorT callback) + { + return async_parameters_client_->on_parameter_event(callback); + } + +private: + rclcpp::executor::Executor::SharedPtr executor_; + rclcpp::node::Node::SharedPtr node_; + AsyncParametersClient::SharedPtr async_parameters_client_; +}; + +} /* namespace parameter_client */ + +} /* namespace rclcpp */ + +#endif /* RCLCPP_RCLCPP_PARAMETER_CLIENT_HPP_ */ diff --git a/rclcpp/src/rclcpp/parameter_service.cpp b/rclcpp/src/rclcpp/parameter_service.cpp new file mode 100644 index 0000000..ffc2983 --- /dev/null +++ b/rclcpp/src/rclcpp/parameter_service.cpp @@ -0,0 +1,177 @@ +// Copyright 2015 Open Source Robotics Foundation, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef RCLCPP_RCLCPP_PARAMETER_SERVICE_HPP_ +#define RCLCPP_RCLCPP_PARAMETER_SERVICE_HPP_ + +#include + +#include + +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include + +namespace rclcpp +{ + +namespace parameter_service +{ + +class ParameterService +{ + +public: + RCLCPP_SMART_PTR_DEFINITIONS(ParameterService); + + ParameterService(const rclcpp::node::Node::SharedPtr node) + : node_(node) + { + std::weak_ptr captured_node = node_; + get_parameters_service_ = node_->create_service( + node_->get_name() + "__get_parameters", [captured_node]( + const std::shared_ptr, + const std::shared_ptr request, + std::shared_ptr response) + { + auto node = captured_node.lock(); + if (!node) { + return; + } + auto values = node->get_parameters(request->names); + for (auto & pvariant : values) { + response->values.push_back(pvariant.get_parameter_value()); + } + } + ); + + get_parameter_types_service_ = node_->create_service( + node_->get_name() + "__get_parameter_types", [captured_node]( + const std::shared_ptr, + const std::shared_ptr request, + std::shared_ptr response) + { + auto node = captured_node.lock(); + if (!node) { + return; + } + auto types = node->get_parameter_types(request->names); + std::transform(types.cbegin(), types.cend(), + std::back_inserter(response->types), [](const uint8_t & type) { + return static_cast(type); + }); + } + ); + + set_parameters_service_ = node_->create_service( + node_->get_name() + "__set_parameters", [captured_node]( + const std::shared_ptr, + const std::shared_ptr request, + std::shared_ptr response) + { + auto node = captured_node.lock(); + if (!node) { + return; + } + std::vector pvariants; + for (auto & p : request->parameters) { + pvariants.push_back(rclcpp::parameter::ParameterVariant::from_parameter(p)); + } + auto results = node->set_parameters(pvariants); + response->results = results; + } + ); + + set_parameters_atomically_service_ = + node_->create_service( + node_->get_name() + "__set_parameters_atomically", [captured_node]( + const std::shared_ptr, + const std::shared_ptr request, + std::shared_ptr response) + { + auto node = captured_node.lock(); + if (!node) { + return; + } + std::vector pvariants; + std::transform(request->parameters.cbegin(), request->parameters.cend(), + std::back_inserter(pvariants), + [](const rcl_interfaces::msg::Parameter & p) { + return rclcpp::parameter::ParameterVariant:: + from_parameter(p); + }); + auto result = node->set_parameters_atomically(pvariants); + response->result = result; + } + ); + + describe_parameters_service_ = node_->create_service( + node_->get_name() + "__describe_parameters", [captured_node]( + const std::shared_ptr, + const std::shared_ptr request, + std::shared_ptr response) + { + auto node = captured_node.lock(); + if (!node) { + return; + } + auto descriptors = node->describe_parameters(request->names); + response->descriptors = descriptors; + } + ); + + list_parameters_service_ = node_->create_service( + node_->get_name() + "__list_parameters", [captured_node]( + const std::shared_ptr, + const std::shared_ptr request, + std::shared_ptr response) + { + auto node = captured_node.lock(); + if (!node) { + return; + } + auto result = node->list_parameters(request->prefixes, request->depth); + response->result = result; + } + ); + + + } + +private: + const rclcpp::node::Node::SharedPtr node_; + rclcpp::service::Service::SharedPtr get_parameters_service_; + rclcpp::service::Service::SharedPtr + get_parameter_types_service_; + rclcpp::service::Service::SharedPtr set_parameters_service_; + rclcpp::service::Service::SharedPtr + set_parameters_atomically_service_; + rclcpp::service::Service::SharedPtr + describe_parameters_service_; + rclcpp::service::Service::SharedPtr list_parameters_service_; +}; + +} /* namespace parameter_service */ + +} /* namespace rclcpp */ + +#endif /* RCLCPP_RCLCPP_PARAMETER_SERVICE_HPP_ */ diff --git a/rclcpp/src/rclcpp/publisher.cpp b/rclcpp/src/rclcpp/publisher.cpp new file mode 100644 index 0000000..d0acec5 --- /dev/null +++ b/rclcpp/src/rclcpp/publisher.cpp @@ -0,0 +1,354 @@ +// Copyright 2014 Open Source Robotics Foundation, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef RCLCPP__PUBLISHER_HPP_ +#define RCLCPP__PUBLISHER_HPP_ + +#include +#include + +#include +#include +#include +#include +#include + +#include "rcl_interfaces/msg/intra_process_message.hpp" +#include "rmw/impl/cpp/demangle.hpp" + +#include "rclcpp/allocator/allocator_deleter.hpp" +#include "rclcpp/macros.hpp" + +namespace rclcpp +{ + +// Forward declaration for friend statement +namespace node +{ +class Node; +} // namespace node + +namespace publisher +{ + +class PublisherBase +{ +public: + RCLCPP_SMART_PTR_DEFINITIONS(PublisherBase); + /// Default constructor. + /** + * Typically, a publisher is not created through this method, but instead is created through a + * call to `Node::create_publisher`. + * \param[in] node_handle The corresponding rmw representation of the owner node. + * \param[in] publisher_handle The rmw publisher handle corresponding to this publisher. + * \param[in] topic The topic that this publisher publishes on. + * \param[in] queue_size The maximum number of unpublished messages to queue. + */ + PublisherBase( + std::shared_ptr node_handle, + rmw_publisher_t * publisher_handle, + std::string topic, + size_t queue_size) + : node_handle_(node_handle), publisher_handle_(publisher_handle), + intra_process_publisher_handle_(nullptr), + topic_(topic), queue_size_(queue_size), + intra_process_publisher_id_(0), store_intra_process_message_(nullptr) + { + // Life time of this object is tied to the publisher handle. + if (rmw_get_gid_for_publisher(publisher_handle_, &rmw_gid_) != RMW_RET_OK) { + // *INDENT-OFF* (prevent uncrustify from making unecessary indents here) + throw std::runtime_error( + std::string("failed to get publisher gid: ") + rmw_get_error_string_safe()); + // *INDENT-ON* + } + } + + /// Default destructor. + virtual ~PublisherBase() + { + if (intra_process_publisher_handle_) { + if (rmw_destroy_publisher(node_handle_.get(), intra_process_publisher_handle_)) { + fprintf( + stderr, + "Error in destruction of intra process rmw publisher handle: %s\n", + rmw_get_error_string_safe()); + } + } + if (publisher_handle_) { + if (rmw_destroy_publisher(node_handle_.get(), publisher_handle_) != RMW_RET_OK) { + fprintf( + stderr, + "Error in destruction of rmw publisher handle: %s\n", + rmw_get_error_string_safe()); + } + } + } + + /// Get the topic that this publisher publishes on. + // \return The topic name. + const std::string & + get_topic_name() const + { + return topic_; + } + + /// Get the queue size for this publisher. + // \return The queue size. + size_t + get_queue_size() const + { + return queue_size_; + } + + /// Get the global identifier for this publisher (used in rmw and by DDS). + // \return The gid. + const rmw_gid_t & + get_gid() const + { + return rmw_gid_; + } + + /// Get the global identifier for this publisher used by intra-process communication. + // \return The intra-process gid. + const rmw_gid_t & + get_intra_process_gid() const + { + return intra_process_rmw_gid_; + } + + /// Compare this publisher to a gid. + /** + * Note that this function calls the next function. + * \param[in] gid Reference to a gid. + * \return True if the publisher's gid matches the input. + */ + bool + operator==(const rmw_gid_t & gid) const + { + return *this == &gid; + } + + /// Compare this publisher to a pointer gid. + /** + * A wrapper for comparing this publisher's gid to the input using rmw_compare_gids_equal. + * \param[in] gid A pointer to a gid. + * \return True if this publisher's gid matches the input. + */ + bool + operator==(const rmw_gid_t * gid) const + { + bool result = false; + auto ret = rmw_compare_gids_equal(gid, &this->get_gid(), &result); + if (ret != RMW_RET_OK) { + throw std::runtime_error( + std::string("failed to compare gids: ") + rmw_get_error_string_safe()); + } + if (!result) { + ret = rmw_compare_gids_equal(gid, &this->get_intra_process_gid(), &result); + if (ret != RMW_RET_OK) { + throw std::runtime_error( + std::string("failed to compare gids: ") + rmw_get_error_string_safe()); + } + } + return result; + } + + typedef std::function StoreMessageCallbackT; + +protected: + void + setup_intra_process( + uint64_t intra_process_publisher_id, + StoreMessageCallbackT callback, + rmw_publisher_t * intra_process_publisher_handle) + { + intra_process_publisher_id_ = intra_process_publisher_id; + store_intra_process_message_ = callback; + intra_process_publisher_handle_ = intra_process_publisher_handle; + // Life time of this object is tied to the publisher handle. + auto ret = rmw_get_gid_for_publisher(intra_process_publisher_handle_, &intra_process_rmw_gid_); + if (ret != RMW_RET_OK) { + // *INDENT-OFF* (prevent uncrustify from making unecessary indents here) + throw std::runtime_error( + std::string("failed to create intra process publisher gid: ") + + rmw_get_error_string_safe()); + // *INDENT-ON* + } + } + + std::shared_ptr node_handle_; + + rmw_publisher_t * publisher_handle_; + rmw_publisher_t * intra_process_publisher_handle_; + + std::string topic_; + size_t queue_size_; + + uint64_t intra_process_publisher_id_; + StoreMessageCallbackT store_intra_process_message_; + + rmw_gid_t rmw_gid_; + rmw_gid_t intra_process_rmw_gid_; + + std::mutex intra_process_publish_mutex_; +}; + +/// A publisher publishes messages of any type to a topic. +template> +class Publisher : public PublisherBase +{ + friend rclcpp::node::Node; + +public: + using MessageAllocTraits = allocator::AllocRebind; + using MessageAlloc = typename MessageAllocTraits::allocator_type; + using MessageDeleter = allocator::Deleter; + using MessageUniquePtr = std::unique_ptr; + + RCLCPP_SMART_PTR_DEFINITIONS(Publisher); + + Publisher( + std::shared_ptr node_handle, + rmw_publisher_t * publisher_handle, + std::string topic, + size_t queue_size, + std::shared_ptr allocator) + : PublisherBase(node_handle, publisher_handle, topic, queue_size) + { + message_allocator_ = std::make_shared(*allocator.get()); + allocator::set_allocator_for_deleter(&message_deleter_, message_allocator_.get()); + } + + + /// Send a message to the topic for this publisher. + /** + * This function is templated on the input message type, MessageT. + * \param[in] msg A shared pointer to the message to send. + */ + void + publish(std::unique_ptr & msg) + { + this->do_inter_process_publish(msg.get()); + if (store_intra_process_message_) { + // Take the pointer from the unique_msg, release it and pass as a void * + // to the ipm. The ipm should then capture it again as a unique_ptr of + // the correct type. + // TODO(wjwwood): + // investigate how to transfer the custom deleter (if there is one) + // from the incoming unique_ptr through to the ipm's unique_ptr. + // See: http://stackoverflow.com/questions/11002641/dynamic-casting-for-unique-ptr + MessageT * msg_ptr = msg.get(); + msg.release(); + uint64_t message_seq; + { + std::lock_guard lock(intra_process_publish_mutex_); + message_seq = + store_intra_process_message_(intra_process_publisher_id_, msg_ptr, typeid(MessageT)); + } + rcl_interfaces::msg::IntraProcessMessage ipm; + ipm.publisher_id = intra_process_publisher_id_; + ipm.message_sequence = message_seq; + auto status = rmw_publish(intra_process_publisher_handle_, &ipm); + if (status != RMW_RET_OK) { + // *INDENT-OFF* (prevent uncrustify from making unecessary indents here) + throw std::runtime_error( + std::string("failed to publish intra process message: ") + rmw_get_error_string_safe()); + // *INDENT-ON* + } + } else { + // Always destroy the message, even if we don't consume it, for consistency. + msg.reset(); + } + } + + void + publish(const std::shared_ptr & msg) + { + // Avoid allocating when not using intra process. + if (!store_intra_process_message_) { + // In this case we're not using intra process. + return this->do_inter_process_publish(msg.get()); + } + // Otherwise we have to allocate memory in a unique_ptr and pass it along. + // TODO(wjwwood): + // The intra process manager should probably also be able to store + // shared_ptr's and do the "smart" thing based on other intra process + // subscriptions. For now call the other publish(). + auto ptr = MessageAllocTraits::allocate(*message_allocator_.get(), 1); + MessageAllocTraits::construct(*message_allocator_.get(), ptr, *msg.get()); + MessageUniquePtr unique_msg(ptr, message_deleter_); + return this->publish(unique_msg); + } + + void + publish(std::shared_ptr msg) + { + // Avoid allocating when not using intra process. + if (!store_intra_process_message_) { + // In this case we're not using intra process. + return this->do_inter_process_publish(msg.get()); + } + // Otherwise we have to allocate memory in a unique_ptr and pass it along. + // TODO(wjwwood): + // The intra process manager should probably also be able to store + // shared_ptr's and do the "smart" thing based on other intra process + // subscriptions. For now call the other publish(). + auto ptr = MessageAllocTraits::allocate(*message_allocator_.get(), 1); + MessageAllocTraits::construct(*message_allocator_.get(), ptr, *msg.get()); + MessageUniquePtr unique_msg(ptr, message_deleter_); + return this->publish(unique_msg); + } + + void + publish(const MessageT & msg) + { + // Avoid allocating when not using intra process. + if (!store_intra_process_message_) { + // In this case we're not using intra process. + return this->do_inter_process_publish(&msg); + } + // Otherwise we have to allocate memory in a unique_ptr and pass it along. + auto ptr = MessageAllocTraits::allocate(*message_allocator_.get(), 1); + MessageAllocTraits::construct(*message_allocator_.get(), ptr, msg); + MessageUniquePtr unique_msg(ptr, message_deleter_); + return this->publish(unique_msg); + } + + std::shared_ptr get_allocator() const + { + return message_allocator_; + } + +protected: + void + do_inter_process_publish(const MessageT * msg) + { + auto status = rmw_publish(publisher_handle_, msg); + if (status != RMW_RET_OK) { + // *INDENT-OFF* (prevent uncrustify from making unecessary indents here) + throw std::runtime_error( + std::string("failed to publish message: ") + rmw_get_error_string_safe()); + // *INDENT-ON* + } + } + + std::shared_ptr message_allocator_; + + MessageDeleter message_deleter_; +}; + +} // namespace publisher +} // namespace rclcpp + +#endif // RCLCPP__PUBLISHER_HPP_ diff --git a/rclcpp/src/rclcpp/service.cpp b/rclcpp/src/rclcpp/service.cpp new file mode 100644 index 0000000..2bfbb09 --- /dev/null +++ b/rclcpp/src/rclcpp/service.cpp @@ -0,0 +1,158 @@ +// Copyright 2014 Open Source Robotics Foundation, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef RCLCPP_RCLCPP_SERVICE_HPP_ +#define RCLCPP_RCLCPP_SERVICE_HPP_ + +#include +#include +#include +#include +#include + +#include +#include + +#include +#include + +namespace rclcpp +{ + +namespace service +{ + +class ServiceBase +{ + +public: + RCLCPP_SMART_PTR_DEFINITIONS_NOT_COPYABLE(ServiceBase); + + ServiceBase( + std::shared_ptr node_handle, + rmw_service_t * service_handle, + const std::string service_name) + : node_handle_(node_handle), service_handle_(service_handle), service_name_(service_name) + {} + + virtual ~ServiceBase() + { + if (service_handle_) { + if (rmw_destroy_service(service_handle_) != RMW_RET_OK) { + std::stringstream ss; + ss << "Error in destruction of rmw service_handle_ handle: " << + rmw_get_error_string_safe() << '\n'; + (std::cerr << ss.str()).flush(); + } + } + } + + std::string get_service_name() + { + return this->service_name_; + } + + const rmw_service_t * get_service_handle() + { + return this->service_handle_; + } + + virtual std::shared_ptr create_request() = 0; + virtual std::shared_ptr create_request_header() = 0; + virtual void handle_request( + std::shared_ptr request_header, + std::shared_ptr request) = 0; + +private: + RCLCPP_DISABLE_COPY(ServiceBase); + + std::shared_ptr node_handle_; + + rmw_service_t * service_handle_; + std::string service_name_; + +}; + +using namespace any_service_callback; + +template +class Service : public ServiceBase +{ +public: + using CallbackType = std::function< + void( + const std::shared_ptr, + std::shared_ptr)>; + + using CallbackWithHeaderType = std::function< + void( + const std::shared_ptr, + const std::shared_ptr, + std::shared_ptr)>; + RCLCPP_SMART_PTR_DEFINITIONS(Service); + + Service( + std::shared_ptr node_handle, + rmw_service_t * service_handle, + const std::string & service_name, + AnyServiceCallback any_callback) + : ServiceBase(node_handle, service_handle, service_name), any_callback_(any_callback) + {} + + Service() = delete; + + std::shared_ptr create_request() + { + return std::shared_ptr(new typename ServiceT::Request()); + } + + std::shared_ptr create_request_header() + { + // TODO(wjwwood): This should probably use rmw_request_id's allocator. + // (since it is a C type) + return std::shared_ptr(new rmw_request_id_t); + } + + void handle_request(std::shared_ptr request_header, std::shared_ptr request) + { + auto typed_request = std::static_pointer_cast(request); + auto typed_request_header = std::static_pointer_cast(request_header); + auto response = std::shared_ptr(new typename ServiceT::Response); + any_callback_.dispatch(typed_request_header, typed_request, response); + send_response(typed_request_header, response); + } + + void send_response( + std::shared_ptr req_id, + std::shared_ptr response) + { + rmw_ret_t status = rmw_send_response(get_service_handle(), req_id.get(), response.get()); + if (status != RMW_RET_OK) { + // *INDENT-OFF* (prevent uncrustify from making unecessary indents here) + throw std::runtime_error( + std::string("failed to send response: ") + rmw_get_error_string_safe()); + // *INDENT-ON* + } + } + +private: + RCLCPP_DISABLE_COPY(Service); + + AnyServiceCallback any_callback_; +}; + +} /* namespace service */ +} /* namespace rclcpp */ + +#endif /* RCLCPP_RCLCPP_SERVICE_HPP_ */ diff --git a/rclcpp/src/rclcpp/subscription.cpp b/rclcpp/src/rclcpp/subscription.cpp new file mode 100644 index 0000000..fb9c952 --- /dev/null +++ b/rclcpp/src/rclcpp/subscription.cpp @@ -0,0 +1,284 @@ +// Copyright 2014 Open Source Robotics Foundation, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef RCLCPP__SUBSCRIPTION_HPP_ +#define RCLCPP__SUBSCRIPTION_HPP_ + +#include +#include + +#include +#include +#include +#include +#include + +#include "rcl_interfaces/msg/intra_process_message.hpp" + +#include "rclcpp/macros.hpp" +#include "rclcpp/message_memory_strategy.hpp" +#include "rclcpp/any_subscription_callback.hpp" + +namespace rclcpp +{ + +namespace node +{ +class Node; +} // namespace node + +namespace subscription +{ + +/// Virtual base class for subscriptions. This pattern allows us to iterate over different template +/// specializations of Subscription, among other things. +class SubscriptionBase +{ +public: + RCLCPP_SMART_PTR_DEFINITIONS_NOT_COPYABLE(SubscriptionBase); + + /// Default constructor. + /** + * \param[in] node_handle The rmw representation of the node that owns this subscription. + * \param[in] topic_name Name of the topic to subscribe to. + * \param[in] ignore_local_publications True to ignore local publications (unused). + */ + SubscriptionBase( + std::shared_ptr node_handle, + rmw_subscription_t * subscription_handle, + const std::string & topic_name, + bool ignore_local_publications) + : intra_process_subscription_handle_(nullptr), + node_handle_(node_handle), + subscription_handle_(subscription_handle), + topic_name_(topic_name), + ignore_local_publications_(ignore_local_publications) + { + // To avoid unused private member warnings. + (void)ignore_local_publications_; + } + + /// Default destructor. + virtual ~SubscriptionBase() + { + if (subscription_handle_) { + if (rmw_destroy_subscription(node_handle_.get(), subscription_handle_) != RMW_RET_OK) { + std::stringstream ss; + ss << "Error in destruction of rmw subscription handle: " << + rmw_get_error_string_safe() << '\n'; + (std::cerr << ss.str()).flush(); + } + } + if (intra_process_subscription_handle_) { + auto ret = rmw_destroy_subscription(node_handle_.get(), intra_process_subscription_handle_); + if (ret != RMW_RET_OK) { + std::stringstream ss; + ss << "Error in destruction of rmw intra process subscription handle: " << + rmw_get_error_string_safe() << '\n'; + (std::cerr << ss.str()).flush(); + } + } + } + + /// Get the topic that this subscription is subscribed on. + const std::string & get_topic_name() const + { + return this->topic_name_; + } + + const rmw_subscription_t * get_subscription_handle() const + { + return subscription_handle_; + } + + const rmw_subscription_t * get_intra_process_subscription_handle() const + { + return intra_process_subscription_handle_; + } + + /// Borrow a new message. + // \return Shared pointer to the fresh message. + virtual std::shared_ptr create_message() = 0; + /// Check if we need to handle the message, and execute the callback if we do. + /** + * \param[in] message Shared pointer to the message to handle. + * \param[in] message_info Metadata associated with this message. + */ + virtual void handle_message( + std::shared_ptr & message, + const rmw_message_info_t & message_info) = 0; + + /// Return the message borrowed in create_message. + // \param[in] Shared pointer to the returned message. + virtual void return_message(std::shared_ptr & message) = 0; + virtual void handle_intra_process_message( + rcl_interfaces::msg::IntraProcessMessage & ipm, + const rmw_message_info_t & message_info) = 0; + +protected: + rmw_subscription_t * intra_process_subscription_handle_; + +private: + RCLCPP_DISABLE_COPY(SubscriptionBase); + + std::shared_ptr node_handle_; + + rmw_subscription_t * subscription_handle_; + + std::string topic_name_; + bool ignore_local_publications_; +}; + +using namespace any_subscription_callback; + +/// Subscription implementation, templated on the type of message this subscription receives. +template> +class Subscription : public SubscriptionBase +{ + friend class rclcpp::node::Node; + +public: + using MessageAllocTraits = allocator::AllocRebind; + using MessageAlloc = typename MessageAllocTraits::allocator_type; + using MessageDeleter = allocator::Deleter; + using MessageUniquePtr = std::unique_ptr; + + RCLCPP_SMART_PTR_DEFINITIONS(Subscription); + + /// Default constructor. + /** + * The constructor for a subscription is almost never called directly. Instead, subscriptions + * should be instantiated through Node::create_subscription. + * \param[in] node_handle rmw representation of the node that owns this subscription. + * \param[in] topic_name Name of the topic to subscribe to. + * \param[in] ignore_local_publications True to ignore local publications (unused). + * \param[in] callback User-defined callback to call when a message is received. + * \param[in] memory_strategy The memory strategy to be used for managing message memory. + */ + Subscription( + std::shared_ptr node_handle, + rmw_subscription_t * subscription_handle, + const std::string & topic_name, + bool ignore_local_publications, + AnySubscriptionCallback callback, + typename message_memory_strategy::MessageMemoryStrategy::SharedPtr memory_strategy = + message_memory_strategy::MessageMemoryStrategy::create_default()) + : SubscriptionBase(node_handle, subscription_handle, topic_name, ignore_local_publications), + any_callback_(callback), + message_memory_strategy_(memory_strategy), + get_intra_process_message_callback_(nullptr), + matches_any_intra_process_publishers_(nullptr) + { + } + + /// Support dynamically setting the message memory strategy. + /** + * Behavior may be undefined if called while the subscription could be executing. + * \param[in] message_memory_strategy Shared pointer to the memory strategy to set. + */ + void set_message_memory_strategy( + typename message_memory_strategy::MessageMemoryStrategy::SharedPtr message_memory_strategy) + { + message_memory_strategy_ = message_memory_strategy; + } + std::shared_ptr create_message() + { + /* The default message memory strategy provides a dynamically allocated message on each call to + * create_message, though alternative memory strategies that re-use a preallocated message may be + * used (see rclcpp/strategies/message_pool_memory_strategy.hpp). + */ + return message_memory_strategy_->borrow_message(); + } + + void handle_message(std::shared_ptr & message, const rmw_message_info_t & message_info) + { + if (matches_any_intra_process_publishers_) { + if (matches_any_intra_process_publishers_(&message_info.publisher_gid)) { + // In this case, the message will be delivered via intra process and + // we should ignore this copy of the message. + return; + } + } + auto typed_message = std::static_pointer_cast(message); + any_callback_.dispatch(typed_message, message_info); + } + + void return_message(std::shared_ptr & message) + { + auto typed_message = std::static_pointer_cast(message); + message_memory_strategy_->return_message(typed_message); + } + + void handle_intra_process_message( + rcl_interfaces::msg::IntraProcessMessage & ipm, + const rmw_message_info_t & message_info) + { + if (!get_intra_process_message_callback_) { + // throw std::runtime_error( + // "handle_intra_process_message called before setup_intra_process"); + // TODO(wjwwood): for now, this could mean that intra process was just not enabled. + // However, this can only really happen if this node has it disabled, but the other doesn't. + return; + } + MessageUniquePtr msg; + get_intra_process_message_callback_( + ipm.publisher_id, + ipm.message_sequence, + intra_process_subscription_id_, + msg); + if (!msg) { + // This either occurred because the publisher no longer exists or the + // message requested is no longer being stored. + // TODO(wjwwood): should we notify someone of this? log error, log warning? + return; + } + any_callback_.dispatch_intra_process(msg, message_info); + } + +private: + typedef + std::function< + void (uint64_t, uint64_t, uint64_t, MessageUniquePtr &) + > GetMessageCallbackType; + typedef std::function MatchesAnyPublishersCallbackType; + + void setup_intra_process( + uint64_t intra_process_subscription_id, + rmw_subscription_t * intra_process_subscription, + GetMessageCallbackType get_message_callback, + MatchesAnyPublishersCallbackType matches_any_publisher_callback) + { + intra_process_subscription_id_ = intra_process_subscription_id; + intra_process_subscription_handle_ = intra_process_subscription; + get_intra_process_message_callback_ = get_message_callback; + matches_any_intra_process_publishers_ = matches_any_publisher_callback; + } + + RCLCPP_DISABLE_COPY(Subscription); + + AnySubscriptionCallback any_callback_; + typename message_memory_strategy::MessageMemoryStrategy::SharedPtr + message_memory_strategy_; + + GetMessageCallbackType get_intra_process_message_callback_; + MatchesAnyPublishersCallbackType matches_any_intra_process_publishers_; + uint64_t intra_process_subscription_id_; +}; + +} // namespace subscription +} // namespace rclcpp + +#endif // RCLCPP__SUBSCRIPTION_HPP_ diff --git a/rclcpp/src/rclcpp/timer.cpp b/rclcpp/src/rclcpp/timer.cpp new file mode 100644 index 0000000..9a69cb8 --- /dev/null +++ b/rclcpp/src/rclcpp/timer.cpp @@ -0,0 +1,177 @@ +// Copyright 2014 Open Source Robotics Foundation, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef RCLCPP_RCLCPP_TIMER_HPP_ +#define RCLCPP_RCLCPP_TIMER_HPP_ + +#include +#include +#include +#include +#include + +#include +#include + +#include +#include +#include + +namespace rclcpp +{ + +namespace timer +{ + +using CallbackType = std::function; + +class TimerBase +{ + +public: + RCLCPP_SMART_PTR_DEFINITIONS_NOT_COPYABLE(TimerBase); + + TimerBase(std::chrono::nanoseconds period, CallbackType callback) + : period_(period), + callback_(callback), + canceled_(false) + { + } + + virtual ~TimerBase() + { + } + + void + cancel() + { + this->canceled_ = true; + } + + void execute_callback() const + { + callback_(); + } + + const CallbackType & get_callback() const + { + return callback_; + } + + /// Check how long the timer has until its next scheduled callback. + // \return A std::chrono::duration representing the relative time until the next callback. + virtual std::chrono::nanoseconds + time_until_trigger() = 0; + + /// Is the clock steady (i.e. is the time between ticks constant?) + // \return True if the clock used by this timer is steady. + virtual bool is_steady() = 0; + + /// Check if the timer needs to trigger the callback. + /** + * This function expects its caller to immediately trigger the callback after this function, + * since it maintains the last time the callback was triggered. + * \return True if the timer needs to trigger. + */ + virtual bool check_and_trigger() = 0; + +protected: + std::chrono::nanoseconds period_; + CallbackType callback_; + + bool canceled_; + +}; + +/// Generic timer templated on the clock type. Periodically executes a user-specified callback. +template +class GenericTimer : public TimerBase +{ + +public: + RCLCPP_SMART_PTR_DEFINITIONS(GenericTimer); + + /// Default constructor. + /** + * \param[in] period The interval at which the timer fires. + * \param[in] callback User-specified callback function. + */ + GenericTimer(std::chrono::nanoseconds period, CallbackType callback) + : TimerBase(period, callback), loop_rate_(period) + { + /* Subtracting the loop rate period ensures that the callback gets triggered + on the first call to check_and_trigger. */ + last_triggered_time_ = Clock::now() - period; + } + + /// Default destructor. + virtual ~GenericTimer() + { + // Stop the timer from running. + cancel(); + } + + bool + check_and_trigger() + { + if (canceled_) { + return false; + } + if (Clock::now() < last_triggered_time_) { + return false; + } + if (std::chrono::duration_cast(Clock::now() - last_triggered_time_) >= + loop_rate_.period()) + { + last_triggered_time_ = Clock::now(); + return true; + } + return false; + } + + std::chrono::nanoseconds + time_until_trigger() + { + std::chrono::nanoseconds time_until_trigger; + // Calculate the time between the next trigger and the current time + if (last_triggered_time_ + loop_rate_.period() < Clock::now()) { + // time is overdue, need to trigger immediately + time_until_trigger = std::chrono::nanoseconds::zero(); + } else { + time_until_trigger = std::chrono::duration_cast( + last_triggered_time_ - Clock::now()) + loop_rate_.period(); + } + return time_until_trigger; + } + + virtual bool + is_steady() + { + return Clock::is_steady; + } + +private: + RCLCPP_DISABLE_COPY(GenericTimer); + + rclcpp::rate::GenericRate loop_rate_; + std::chrono::time_point last_triggered_time_; + +}; + +using WallTimer = GenericTimer; + +} /* namespace timer */ +} /* namespace rclcpp */ + +#endif /* RCLCPP_RCLCPP_TIMER_HPP_ */ diff --git a/rclcpp/src/rclcpp/utilities.cpp b/rclcpp/src/rclcpp/utilities.cpp new file mode 100644 index 0000000..520818e --- /dev/null +++ b/rclcpp/src/rclcpp/utilities.cpp @@ -0,0 +1,226 @@ +// Copyright 2014 Open Source Robotics Foundation, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef RCLCPP_RCLCPP_UTILITIES_HPP_ +#define RCLCPP_RCLCPP_UTILITIES_HPP_ + +// TODO(wjwwood): remove +#include + +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include + +// Determine if sigaction is available +#if __APPLE__ || _POSIX_C_SOURCE >= 1 || _XOPEN_SOURCE || _POSIX_SOURCE +#define HAS_SIGACTION +#endif + +namespace +{ +/// Represent the status of the global interrupt signal. +volatile sig_atomic_t g_signal_status = 0; +/// Guard condition for interrupting the rmw implementation when the global interrupt signal fired. +rmw_guard_condition_t * g_sigint_guard_cond_handle = \ + rmw_create_guard_condition(); +/// Condition variable for timed sleep (see sleep_for). +std::condition_variable g_interrupt_condition_variable; +std::atomic g_is_interrupted(false); +/// Mutex for protecting the global condition variable. +std::mutex g_interrupt_mutex; + +#ifdef HAS_SIGACTION +struct sigaction old_action; +#else +void (* old_signal_handler)(int) = 0; +#endif + +/// Handle the interrupt signal. +/** When the interrupt signal fires, the signal handler notifies the condition variable to wake up + * and triggers the interrupt guard condition, so that all global threads managed by rclcpp + * are interrupted. + */ +void +#ifdef HAS_SIGACTION +signal_handler(int signal_value, siginfo_t * siginfo, void * context) +#else +signal_handler(int signal_value) +#endif +{ + // TODO(wjwwood): remove + std::cout << "signal_handler(" << signal_value << ")" << std::endl; +#ifdef HAS_SIGACTION + if (old_action.sa_flags & SA_SIGINFO) { + if (old_action.sa_sigaction != NULL) { + old_action.sa_sigaction(signal_value, siginfo, context); + } + } else { + // *INDENT-OFF* + if ( + old_action.sa_handler != NULL && // Is set + old_action.sa_handler != SIG_DFL && // Is not default + old_action.sa_handler != SIG_IGN) // Is not ignored + // *INDENT-ON* + { + old_action.sa_handler(signal_value); + } + } +#else + if (old_signal_handler) { + old_signal_handler(signal_value); + } +#endif + g_signal_status = signal_value; + rmw_ret_t status = rmw_trigger_guard_condition(g_sigint_guard_cond_handle); + if (status != RMW_RET_OK) { + fprintf(stderr, + "[rclcpp::error] failed to trigger guard condition: %s\n", rmw_get_error_string_safe()); + } + g_is_interrupted.store(true); + g_interrupt_condition_variable.notify_all(); +} +} // namespace + +namespace rclcpp +{ + +RMW_THREAD_LOCAL size_t thread_id = 0; + +namespace utilities +{ + +/// Initialize communications via the rmw implementation and set up a global signal handler. +/** + * \param[in] argc Number of arguments. + * \param[in] argv Argument vector. Will eventually be used for passing options to rclcpp. + */ +void +init(int argc, char * argv[]) +{ + (void)argc; + (void)argv; + g_is_interrupted.store(false); + rmw_ret_t status = rmw_init(); + if (status != RMW_RET_OK) { + // *INDENT-OFF* (prevent uncrustify from making unecessary indents here) + throw std::runtime_error( + std::string("failed to initialize rmw implementation: ") + rmw_get_error_string_safe()); + // *INDENT-ON* + } +#ifdef HAS_SIGACTION + struct sigaction action; + memset(&action, 0, sizeof(action)); + sigemptyset(&action.sa_mask); + action.sa_sigaction = ::signal_handler; + action.sa_flags = SA_SIGINFO; + ssize_t ret = sigaction(SIGINT, &action, &old_action); + if (ret == -1) +#else + ::old_signal_handler = std::signal(SIGINT, ::signal_handler); + if (::old_signal_handler == SIG_ERR) +#endif + { + const size_t error_length = 1024; + char error_string[error_length]; +#ifndef _WIN32 + auto rc = strerror_r(errno, error_string, error_length); + if (rc) { + // *INDENT-OFF* + throw std::runtime_error( + "Failed to set SIGINT signal handler: (" + std::to_string(errno) + + ") unable to retrieve error string"); + // *INDENT-ON* + } + +#else + strerror_s(error_string, error_length, errno); +#endif + // *INDENT-OFF* + throw std::runtime_error( + std::string("Failed to set SIGINT signal handler: (" + std::to_string(errno) + ")") + + error_string); + // *INDENT-ON* + } +} + +/// Check rclcpp's status. +// \return True if SIGINT hasn't fired yet, false otherwise. +bool +ok() +{ + return ::g_signal_status == 0; +} + +/// Notify the signal handler and rmw that rclcpp is shutting down. +void +shutdown() +{ + g_signal_status = SIGINT; + rmw_ret_t status = rmw_trigger_guard_condition(g_sigint_guard_cond_handle); + if (status != RMW_RET_OK) { + fprintf(stderr, + "[rclcpp::error] failed to trigger guard condition: %s\n", rmw_get_error_string_safe()); + } + g_is_interrupted.store(true); + g_interrupt_condition_variable.notify_all(); +} + + +/// Get a handle to the rmw guard condition that manages the signal handler. +rmw_guard_condition_t * +get_global_sigint_guard_condition() +{ + return ::g_sigint_guard_cond_handle; +} + +/// Use the global condition variable to block for the specified amount of time. +/** + * \param[in] nanoseconds A std::chrono::duration representing how long to sleep for. + * \return True if the condition variable did not timeout. + */ +bool +sleep_for(const std::chrono::nanoseconds & nanoseconds) +{ + // TODO: determine if posix's nanosleep(2) is more efficient here + std::chrono::nanoseconds time_left = nanoseconds; + { + std::unique_lock lock(::g_interrupt_mutex); + auto start = std::chrono::steady_clock::now(); + ::g_interrupt_condition_variable.wait_for(lock, nanoseconds); + time_left -= std::chrono::steady_clock::now() - start; + } + if (time_left > std::chrono::nanoseconds::zero() && !g_is_interrupted) { + return sleep_for(time_left); + } + // Return true if the timeout elapsed successfully, otherwise false. + return !g_is_interrupted; +} + +} /* namespace utilities */ +} /* namespace rclcpp */ + +#ifdef HAS_SIGACTION +#undef HAS_SIGACTION +#endif + +#endif /* RCLCPP_RCLCPP_UTILITIES_HPP_ */